
akka-persistence-dynamodb's Introduction

Name

Junichi Kato (加藤 潤一)

Date of Birth

February 3, 1972

Main Skills

  • Scala/Rust
    • In my day job I design and implement distributed systems with Scala/Akka.
    • As a hobby I mostly play with Rust.
  • Domain-Driven Design
    • Over 10 years of hands-on adoption and practice in the field, including introducing it to many teams.
  • CQRS/Event Sourcing
  • Reactive systems

Book Reviews

Recent Work History

  • Chatwork Co., Ltd. 2014/7–
    • Tech lead
  • GREE, Inc. 2013/3–2014/6
    • Senior engineer
  • Dwango Co., Ltd. 2011/8–2013/2

Software Design Support Services

Study sessions, roundtables, and consultations are free the first time. See the following for details.

https://utopian-cyclamen-728.notion.site/106c8956bc33489b86874694ba9589fd

Contact

If you would like to engage my services, please contact [email protected].

Links

Github Sponsor

https://github.com/sponsors/j5ik2o


akka-persistence-dynamodb's People

Contributors

dependabot[bot], dlagnohed, j5ik2o, j5ik2o-bot[bot], mergify[bot], mmccuiston, pjfanning, renovate[bot], s10myk4, scala-steward, scotartt, vladimir-lu


akka-persistence-dynamodb's Issues

[NotIssue] Question on Sequence based and PartitionId based key schemes

Hi,

I am not sure where I can ask this question, so I open it here.

I am trying to wrap my head around the two key schemes for events: the default sequence number based and the partition id based as explained in the write sharding section of the usage document.

Given an entity MyEntity with id 12345, five events for that entity with sequence numbers 1–5, and 2 (the default) write shards configured, I understand that:

Under the sequence-number-based scheme, the partition and sort keys for the events will be as follows:

Partition Key = ${persistenceId}-${sequenceNumber % shardCount}
Sort Key = ${sequenceNumber}

seq  partition key      sort key
1    MyEntity-12345-1   1
2    MyEntity-12345-0   2
3    MyEntity-12345-1   3
4    MyEntity-12345-0   4
5    MyEntity-12345-1   5

This means that half of the events will end up in one DynamoDB partition and the other half in another.

It also means that, when querying the events for that entity in the order they occurred, you need to query both partitions independently and merge the results based on the sort key.

Under the PartitionId-based scheme, the keys will be as follows (assuming ${md5(persistenceId.reverse) % shardCount} is 0):

Partition Key = ${persistenceId.prefix}-${md5(persistenceId.reverse) % shardCount}
Sort Key = ${persistenceId.body}-${sequenceNumber}

seq  partition key  sort key
1    MyEntity-0     MyEntity-12345-1
2    MyEntity-0     MyEntity-12345-2
3    MyEntity-0     MyEntity-12345-3
4    MyEntity-0     MyEntity-12345-4
5    MyEntity-0     MyEntity-12345-5

This means that half of the entities of the MyEntity type will be in one DynamoDB partition and the other half in another.

It also means that all events of a MyEntity entity with a given id will be in the same DynamoDB partition, so querying back all events in the order they occurred requires only one query against one partition.
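The two key schemes can be sketched in plain Scala. This is an illustrative sketch, not the plugin's actual code: in particular, the way the MD5 digest is mapped to a shard number below is an assumption made for illustration, so only the shape of the keys should be read from it.

```scala
import java.security.MessageDigest

val shardCount = 2

// Pattern 1: sequence-number-based write sharding.
def seqNrKeys(persistenceId: String, seqNr: Long): (String, String) =
  (s"$persistenceId-${seqNr % shardCount}", seqNr.toString)

// Pattern 2: partition-id-based write sharding. Mapping the MD5 digest to a
// shard via BigInt is an assumption here; the plugin may differ in detail.
def partitionIdKeys(prefix: String, body: String, seqNr: Long): (String, String) = {
  val persistenceId = s"$prefix-$body"
  val digest = MessageDigest.getInstance("MD5").digest(persistenceId.reverse.getBytes("UTF-8"))
  val shard = BigInt(digest).mod(BigInt(shardCount)).toInt
  (s"$prefix-$shard", s"$persistenceId-$seqNr")
}

// Events of one entity alternate across shards under pattern 1...
(1L to 5L).foreach(n => println(seqNrKeys("MyEntity-12345", n)))
// ...but always land in the same shard (same partition key) under pattern 2.
(1L to 5L).foreach(n => println(partitionIdKeys("MyEntity", "12345", n)))
```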

The documentation states:

If you want to load events chronologically in DynamoDB Streams, choose pattern-2.

pattern-2 being the PartitionId-based approach. I thought that events in a DynamoDB stream appear in the order of the modifications happening on the table, regardless of partition, per the DynamoDB Streams docs:

DynamoDB Streams captures a time-ordered sequence of item-level modifications in any DynamoDB table and stores this information in a log for up to 24 hours

So how exactly does the key scheme affect the ordering on the table stream?

Side question: what is the reverse in persistenceId.reverse for?

Thx

eventsByTag does not produce new events

DynamoDBReadJournal's eventsByTag only reads the current events

ex:

  // Imports assumed by this repro. The read-journal import path follows the
  // plugin's package layout; verify it against the version in use.
  import java.util.UUID

  import scala.concurrent.duration._

  import akka.actor.typed.ActorSystem
  import akka.actor.typed.scaladsl.Behaviors
  import akka.persistence.query.{ Offset, PersistenceQuery }
  import akka.persistence.typed.PersistenceId
  import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior }
  import com.github.j5ik2o.akka.persistence.dynamodb.query.scaladsl.DynamoDBReadJournal
  import org.slf4j.LoggerFactory

  val logger = LoggerFactory.getLogger("Main")

  sealed trait Event
  final case class NoteAdded(text: String) extends Event

  sealed trait Command
  final case class AddNote(text: String) extends Command

  def noteBehaviour(id: String) = EventSourcedBehavior[Command, Event, Unit](
    persistenceId = PersistenceId.of("notes", id),
    emptyState = (),
    commandHandler = {
      case (state, AddNote(text)) =>
        Effect.persist(NoteAdded(text)).thenRun { _ =>
          logger.info("{}: Added note: [{}]", id, text)
        }
    },
    eventHandler = {
      case _ => ()
    }
  ).withTagger(_ => Set("notes"))

  val guardian = Behaviors.setup[String] { ctx =>
    Behaviors.withTimers { timerFactory =>
      val note1 = ctx.spawn(noteBehaviour("1"), "1")

      timerFactory.startTimerAtFixedRate(UUID.randomUUID().toString, 5.seconds)

      Behaviors.receive[String] { (ctx, str) =>
        note1 ! AddNote(str)
        Behaviors.same[String]
      }
    }
  }

  implicit val system = ActorSystem[String](guardian, "TestSystem", config)
  implicit val ec = system.executionContext


  val readJournal: DynamoDBReadJournal =
    PersistenceQuery(system).readJournalFor[DynamoDBReadJournal](DynamoDBReadJournal.Identifier)

  readJournal.eventsByTag("notes", Offset.noOffset)
    .runForeach(e => logger.info("Event: [{}]", e))

results in:

2021-06-04 11:14:07,895 [INFO ] | Main$:	1: Added note: [dce35195-6e36-41ed-8eaf-58f8ba7e40b5] | persistencePhase=persist-evt, akkaAddress=akka://TestSystem, akkaSource=akka://TestSystem/user/1, sourceActorSystem=TestSystem, persistenceId=notes|1
2021-06-04 11:14:12,723 [INFO ] | Main$:	1: Added note: [dce35195-6e36-41ed-8eaf-58f8ba7e40b5] | persistencePhase=persist-evt, akkaAddress=akka://TestSystem, akkaSource=akka://TestSystem/user/1, sourceActorSystem=TestSystem, persistenceId=notes|1
2021-06-04 11:14:13,906 [INFO ] | Main$:	Event: [EventEnvelope(Sequence(1),notes|1,1,NoteAdded(dce35195-6e36-41ed-8eaf-58f8ba7e40b5),0,None)] | 
2021-06-04 11:14:13,907 [INFO ] | Main$:	Event: [EventEnvelope(Sequence(2),notes|1,2,NoteAdded(dce35195-6e36-41ed-8eaf-58f8ba7e40b5),0,None)] | 
2021-06-04 11:14:17,759 [INFO ] | Main$:	1: Added note: [dce35195-6e36-41ed-8eaf-58f8ba7e40b5] | persistencePhase=persist-evt, akkaAddress=akka://TestSystem, akkaSource=akka://TestSystem/user/1, sourceActorSystem=TestSystem, persistenceId=notes|1
2021-06-04 11:14:22,718 [INFO ] | Main$:	1: Added note: [dce35195-6e36-41ed-8eaf-58f8ba7e40b5] | persistencePhase=persist-evt, akkaAddress=akka://TestSystem, akkaSource=akka://TestSystem/user/1, sourceActorSystem=TestSystem, persistenceId=notes|1
2021-06-04 11:14:27,738 [INFO ] | Main$:	1: Added note: [dce35195-6e36-41ed-8eaf-58f8ba7e40b5] | persistencePhase=persist-evt, akkaAddress=akka://TestSystem, akkaSource=akka://TestSystem/user/1, sourceActorSystem=TestSystem, persistenceId=notes|1
2021-06-04 11:14:32,726 [INFO ] | Main$:	1: Added note: [dce35195-6e36-41ed-8eaf-58f8ba7e40b5] | persistencePhase=persist-evt, akkaAddress=akka://TestSystem, akkaSource=akka://TestSystem/user/1, sourceActorSystem=TestSystem, persistenceId=notes|1
2021-06-04 11:14:37,736 [INFO ] | Main$:	1: Added note: [dce35195-6e36-41ed-8eaf-58f8ba7e40b5] | persistencePhase=persist-evt, akkaAddress=akka://TestSystem, akkaSource=akka://TestSystem/user/1, sourceActorSystem=TestSystem, persistenceId=notes|1
2021-06-04 11:14:42,722 [INFO ] | Main$:	1: Added note: [dce35195-6e36-41ed-8eaf-58f8ba7e40b5] | persistencePhase=persist-evt, akkaAddress=akka://TestSystem, akkaSource=akka://TestSystem/user/1, sourceActorSystem=TestSystem, persistenceId=notes|1
2021-06-04 11:14:47,731 [INFO ] | Main$:	1: Added note: [dce35195-6e36-41ed-8eaf-58f8ba7e40b5] | persistencePhase=persist-evt, akkaAddress=akka://TestSystem, akkaSource=akka://TestSystem/user/1, sourceActorSystem=TestSystem, persistenceId=notes|1
2021-06-04 11:14:52,735 [INFO ] | Main$:	1: Added note: [dce35195-6e36-41ed-8eaf-58f8ba7e40b5] | persistencePhase=persist-evt, akkaAddress=akka://TestSystem, akkaSource=akka://TestSystem/user/1, sourceActorSystem=TestSystem, persistenceId=notes|1

Dependency Dashboard

This issue lists Renovate updates and detected dependencies. Read the Dependency Dashboard docs to learn more.

Rate-Limited

These updates are currently rate-limited. Click on a checkbox below to force their creation now.

  • chore(deps): Update dependency com.amazonaws:aws-java-sdk-dynamodb to v1.12.692
  • chore(deps): Update dependency sbt/sbt to v1.9.9
  • chore(deps): Update testcontainers-java monorepo to v1.19.7 (org.testcontainers:kafka, org.testcontainers:localstack, org.testcontainers:testcontainers)
  • chore(deps): Update testcontainersScalaVersion to v0.41.3 (com.dimafeng:testcontainers-scala, com.dimafeng:testcontainers-scala-kafka, com.dimafeng:testcontainers-scala-localstack, com.dimafeng:testcontainers-scala-scalatest)
  • chore(deps): Update dependency ch.epfl.scala:sbt-scalafix to v0.12.0
  • chore(deps): Update dependency ch.qos.logback:logback-classic to v1.5.3
  • chore(deps): Update dependency com.fasterxml.jackson.module:jackson-module-scala to v2.17.0
  • chore(deps): Update dependency software.amazon.awssdk:dynamodb to v2.25.22
  • chore(deps): Update mathieudutour/github-tag-action action to v6.2
  • 🔐 Create all rate-limited PRs at once 🔐

Open

These updates have all been created already. Click a checkbox below to force a retry/rebase of any.

Detected dependencies

asdf
.tool-versions
  • python 3.12.2
docker-compose
docker-compose.yml
dockerfile
tools/dynamodb-setup/Dockerfile
github-actions
.github/workflows/bump-version.yml
  • actions/checkout v4
  • mathieudutour/github-tag-action v6.1
  • actions/create-release v1
.github/workflows/ci.yml
  • actions/checkout v4
  • actions/setup-java v4
  • actions/checkout v4
  • actions/setup-java v4
.github/workflows/release.yml
  • actions/checkout v4
  • actions/setup-java v4
  • olafurpg/setup-gpg v3
.github/workflows/snapshot.yml
  • actions/checkout v4
  • actions/setup-java v4
  • olafurpg/setup-gpg v3
pip_requirements
docs/requirements.txt
  • alabaster ==0.7.16
  • Babel ==2.14.0
  • certifi ==2023.11.17
  • charset-normalizer ==3.3.2
  • docutils ==0.20.1
  • idna ==3.6
  • imagesize ==1.4.1
  • Jinja2 ==3.1.3
  • markdown-it-py ==3.0.0
  • MarkupSafe ==2.1.5
  • mdit-py-plugins ==0.4.0
  • mdurl ==0.1.2
  • mistune ==3.0.2
  • myst-parser ==2.0.0
  • packaging ==23.2
  • Pygments ==2.17.2
  • pyparsing ==3.1.2
  • pytz ==2023.4
  • PyYAML ==6.0.1
  • requests ==2.31.0
  • snowballstemmer ==2.2.0
  • Sphinx ==7.2.6
  • sphinx-bootstrap-theme ==0.8.1
  • sphinx-rtd-theme ==1.3.0
  • sphinxcontrib-applehelp ==1.0.8
  • sphinxcontrib-devhelp ==1.0.6
  • sphinxcontrib-htmlhelp ==2.0.5
  • sphinxcontrib-jsmath ==1.0.1
  • sphinxcontrib-qthelp ==1.0.7
  • sphinxcontrib-serializinghtml ==1.1.10
  • typing_extensions ==4.10.0
  • urllib3 ==2.2.1
sbt
build.sbt
  • com.github.j5ik2o:docker-controller-scala-scalatest 1.15.34
  • com.github.j5ik2o:docker-controller-scala-dynamodb-local 1.15.34
  • software.amazon.dax:amazon-dax-client 2.0.4
  • com.github.j5ik2o:docker-controller-scala-scalatest 1.15.34
  • com.github.j5ik2o:docker-controller-scala-dynamodb-local 1.15.34
project/Dependencies.scala
  • com.iheart:ficus 1.5.2
  • org.slf4j:slf4j-api 1.7.36
  • org.slf4j:jul-to-slf4j 1.7.36
  • com.fasterxml.jackson.module:jackson-module-scala 2.16.1
  • software.amazon.awssdk:dynamodb 2.23.18
  • ch.qos.logback:logback-classic 1.4.14
  • com.amazonaws:aws-java-sdk-dynamodb 1.12.653
  • com.amazonaws:amazon-dax-client 1.0.230341.0
  • org.testcontainers:testcontainers 1.19.4
  • org.testcontainers:localstack 1.19.4
  • org.testcontainers:kafka 1.19.4
  • com.dimafeng:testcontainers-scala 0.41.2
  • com.dimafeng:testcontainers-scala-scalatest 0.41.2
  • com.dimafeng:testcontainers-scala-kafka 0.41.2
  • com.dimafeng:testcontainers-scala-localstack 0.41.2
  • net.java.dev.jna:jna 5.14.0
  • org.scala-lang.modules:scala-collection-compat 2.11.0
project/build.properties
  • sbt/sbt 1.9.8
project/plugins.sbt
  • org.scalameta:sbt-scalafmt 2.5.2
  • com.github.sbt:sbt-ci-release 1.5.12
  • pl.project13.scala:sbt-jmh 0.4.7
  • ch.epfl.scala:sbt-scalafix 0.11.1

  • Check this box to trigger a request for Renovate to run again on this repository

MessageDigest exception under high load

Under high load, this exception is sporadically thrown:

ERROR c.g.j.a.p.d.journal.DynamoDBJournal - occurred error
java.lang.ArrayIndexOutOfBoundsException: arraycopy: length -15 is negative
    at java.base/sun.security.provider.DigestBase.engineUpdate(DigestBase.java:118)
    at java.base/java.security.MessageDigest$Delegate.engineUpdate(MessageDigest.java:613)
    at java.base/java.security.MessageDigest.update(MessageDigest.java:355)
    at java.base/java.security.MessageDigest.digest(MessageDigest.java:430)
    at com.github.j5ik2o.akka.persistence.dynamodb.journal.PartitionKeyResolver$Mod.resolve(PartitionKeyResolver.scala:46)
    at com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.WriteJournalDaoImpl.$anonfun$singlePutJournalRowFlow$2(WriteJournalDaoImpl.scala:670)
    at com.github.j5ik2o.akka.persistence.dynamodb.journal.dao.WriteJournalDaoImpl.$anonfun$singlePutJournalRowFlow$2$adapted(WriteJournalDaoImpl.scala:669)

Could this be related to https://bugs.openjdk.java.net/browse/JDK-8009635?
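For context, java.security.MessageDigest instances are stateful and not thread-safe, so this symptom is consistent with a single digest instance being updated concurrently from multiple threads (the linked JDK issue discusses the same failure mode). A common mitigation, sketched here under the assumption that the resolver currently shares one instance, is to keep one digest per thread:

```scala
import java.security.MessageDigest

// MessageDigest is stateful: concurrent update() calls on a shared instance can
// corrupt its internal buffer offsets, producing exactly the negative-length
// arraycopy seen above. One instance per thread avoids the race while still
// avoiding a getInstance call per digest.
val md5: ThreadLocal[MessageDigest] =
  ThreadLocal.withInitial(() => MessageDigest.getInstance("MD5"))

def md5Digest(input: Array[Byte]): Array[Byte] = {
  val digest = md5.get()
  digest.reset() // drop any state left by a previous call on this thread
  digest.digest(input)
}
```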

Documentation in scaladex shows incorrect table name

Hi,

I created the DynamoDB tables using the documentation at https://index.scala-lang.org/j5ik2o/akka-persistence-dynamodb/akka-persistence-dynamodb-snapshot/1.1.4?target=_2.13. In its "DynamoDB Setup" section the snapshot table name is given as "Snapshots" (note the trailing "s"). Since I created my table with the name "Snapshots", I got this error:

Caused by: java.util.concurrent.CompletionException: software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException: Requested resource not found (Service: DynamoDb, Status Code: 400, Request ID: A9T7T1IG5DRFO2B6KTSUMM3ECNVV4KQNSO5AEMVJF66Q9ASUAAJG, Extended Request ID: null)

I had changed my config to use "Snapshots", but looking at SnapshotPluginConfig.scala I saw that the default is "Snapshot".

I think fixing the documentation on the scaladex site would be enough.

Thanks

Suggested DynamoDB setup doesn't work with default configuration

Version v1.1.3 with the suggested DynamoDB structure (pkey and skey as the hash and range keys) does not work with the plugin's default configuration. At the same time, it works with the legacy DB structure under the default configuration (no override for sort-key-column-name, and legacy-config-layout not set).

"dynamo-db-journal" is confusingly close to "dynamodb-journal"

Non-blocker.

This is, equivalently, an issue with "dynamodb-journal". I'd prefer it if both had clearly distinct names, e.g. something like "j5ik2o-dynamodb-journal" and "akka-dynamodb-journal" respectively.

Using a generic name like "dynamodb-journal" is not appropriate IMO. There are several ways, beyond the current implementations, that Akka persistence could be implemented on top of DynamoDB. I'd prefer a distinct prefix for each; this helps a system use multiple persistence plugins without confusion.

That said, I see no issues with the implementations all having the package name "akka-persistence-dynamodb" as long as they are clearly from different organizations. The plugin identifier, however, does not have a similar organization scope.

In addition, without some mechanism to alias the prior plugin identifiers, this kind of name change is going to be a challenge. Not sure if such a thing exists.

Deserialization of DynamoDB event's binary messages

Hello, I am trying to write a lambda that reacts to a DynamoDB trigger on the Journal table created with akka-persistence-dynamodb. I tried deserialising the binary message from record.getDynamodb.getNewImage.get("message").getB with Apache Commons SerializationUtils, but it fails with the following exception:

java.io.StreamCorruptedException: invalid stream header: 0AEE0108: org.apache.commons.lang3.SerializationException

This is my lambda function:

package org.acme.project.sessionquery

import com.amazonaws.services.lambda.runtime.events.DynamodbEvent
import com.amazonaws.services.lambda.runtime.{Context, RequestHandler}
import org.acme.project.sessionquery.api._
import org.apache.commons.lang3.SerializationUtils

import scala.annotation.unused
import scala.jdk.CollectionConverters.CollectionHasAsScala

@unused
class Service extends RequestHandler[DynamodbEvent, Response] {
  override def handleRequest(event: DynamodbEvent, context: Context): Response = {
    val records = event.getRecords.asScala
    records.foreach { record =>
      record.getEventName match {
        case "INSERT" =>
          val messageBuffer = record.getDynamodb.getNewImage.get("message").getB
          val deserializedObject = SerializationUtils.deserialize(messageBuffer.array())
          println(s"Deserialized object: $deserializedObject")
        case _ => () // ignore MODIFY/REMOVE events to avoid a MatchError
      }
    }
    Ok
  }
}

I guess it has something to do with how akka-persistence-dynamodb serializes the event before storing it in the Journal table, although I have no idea where to look for a reference. Any ideas on how to tackle this?
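One observation that may help: the "invalid stream header 0AEE0108" is the giveaway. Java serialization streams always begin with the magic bytes AC ED 00 05, which is what SerializationUtils expects, so the stored payload was not produced by Java serialization at all. The plugin stores whatever Akka's configured serializer emits for the event, so the bytes need to be decoded with the same Akka serializer configuration as the writing application (e.g. via Akka's SerializationExtension), not with raw Java deserialization. A quick pure-Scala check of the magic bytes:

```scala
import java.io.{ ByteArrayOutputStream, ObjectOutputStream }

// Java serialization starts with STREAM_MAGIC 0xACED, STREAM_VERSION 0x0005.
// Anything else (e.g. the 0A EE 01 08 seen in the exception) is some other
// format, hence the StreamCorruptedException from SerializationUtils.
def isJavaSerialized(bytes: Array[Byte]): Boolean =
  bytes.length >= 2 && bytes(0) == 0xAC.toByte && bytes(1) == 0xED.toByte

val baos = new ByteArrayOutputStream()
val oos  = new ObjectOutputStream(baos)
oos.writeObject("hello")
oos.close()

println(isJavaSerialized(baos.toByteArray))                            // true
println(isJavaSerialized(Array(0x0A, 0xEE, 0x01, 0x08).map(_.toByte))) // false
```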

Unable to run with dynamodb-local due to missing region configuration

When trying to run with dynamodb-local inside an integration test, I get the following exception:

akka.actor.ActorInitializationException: akka://system-persistence-test/system/dynamo-db-journal: exception during creation
    at akka.actor.ActorInitializationException$.apply(Actor.scala:202)
    at akka.actor.ActorCell.create(ActorCell.scala:696)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:547)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:569)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:293)
    at akka.dispatch.Mailbox.run(Mailbox.scala:228)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.reflect.InvocationTargetException: null
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    at akka.util.Reflect$.instantiate(Reflect.scala:68)
    at akka.actor.ArgsReflectConstructor.produce(IndirectActorProducer.scala:101)
    at akka.actor.Props.newActor(Props.scala:212)
    at akka.actor.ActorCell.newActor(ActorCell.scala:648)
    at akka.actor.ActorCell.create(ActorCell.scala:674)
    ... 9 common frames omitted
Caused by: software.amazon.awssdk.core.exception.SdkClientException: Unable to load region from any of the providers in the chain software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain@1ed92427: [software.amazon.awssdk.regions.providers.SystemSettingsRegionProvider@a2d6269: Unable to load region from system settings. Region must be specified either via environment variable (AWS_REGION) or  system property (aws.region)., software.amazon.awssdk.regions.providers.AwsProfileRegionProvider@1ed30f08: No region provided in profile: default, software.amazon.awssdk.regions.providers.InstanceProfileRegionProvider@3420df8e: Unable to contact EC2 metadata service.]
    at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:97)
    at software.amazon.awssdk.regions.providers.AwsRegionProviderChain.getRegion(AwsRegionProviderChain.java:70)
    at software.amazon.awssdk.regions.providers.LazyAwsRegionProvider.getRegion(LazyAwsRegionProvider.java:45)
    at software.amazon.awssdk.awscore.client.builder.AwsDefaultClientBuilder.regionFromDefaultProvider(AwsDefaultClientBuilder.java:178)
    at software.amazon.awssdk.awscore.client.builder.AwsDefaultClientBuilder.resolveRegion(AwsDefaultClientBuilder.java:167)
    at software.amazon.awssdk.awscore.client.builder.AwsDefaultClientBuilder.lambda$mergeChildDefaults$0(AwsDefaultClientBuilder.java:111)
    at software.amazon.awssdk.utils.builder.SdkBuilder.applyMutation(SdkBuilder.java:61)
    at software.amazon.awssdk.core.client.config.SdkClientConfiguration.merge(SdkClientConfiguration.java:66)
    at software.amazon.awssdk.awscore.client.builder.AwsDefaultClientBuilder.mergeChildDefaults(AwsDefaultClientBuilder.java:111)
    at software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder.asyncClientConfiguration(SdkDefaultClientBuilder.java:166)
    at software.amazon.awssdk.services.dynamodb.DefaultDynamoDbAsyncClientBuilder.buildClient(DefaultDynamoDbAsyncClientBuilder.java:34)
    at software.amazon.awssdk.services.dynamodb.DefaultDynamoDbAsyncClientBuilder.buildClient(DefaultDynamoDbAsyncClientBuilder.java:22)
    at software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder.build(SdkDefaultClientBuilder.java:119)
    at com.github.j5ik2o.akka.persistence.dynamodb.journal.DynamoDBJournal.<init>(DynamoDBJournal.scala:72)
    ... 18 common frames omitted

The AWS SDK source has some hint (here) that the region must be set in the client builder when using local DynamoDB. This could be exposed as a config option; I'll make a PR.

DynamoDB table setup

I've set up my table with 'pkey' as the partition key and 'persistence-id' as the sort key, based on this description in the readme:

columns-def {
    partition-key-column-name = "pkey"
    persistence-id-column-name = "persistence-id"
    sequence-nr-column-name = "sequence-nr"
    deleted-column-name = "deleted"
    message-column-name = "message"
    ordering-column-name = "ordering"
    tags-column-name = "tags"
  }

but I get this error when accessing it

Caused by: software.amazon.awssdk.services.dynamodb.model.DynamoDbException: Query condition missed key schema element: pkey (Service: DynamoDb, Status Code: 400, Request ID: 2UV7F9V42VNLUV8HE01TAKCGB3VV4KQNSO5AEMVJF66Q9ASUAAJG)

How should one set up the DynamoDB tables?

Journal entries do not delete with soft-delete=false

When soft-delete is set to false and the retention criteria creates a snapshot while removing old entries, an error occurs. This is the error in the logs:

Failed to delete events to sequence number [20] due to: requirement failed: Invalid value

The source of this is that V2JournalRowReadDriver.highestSequenceNr returns 0, which triggers the failure at WriteJournalDaoImpl:187. The error comes from the structure of the query created by V2JournalRowReadDriver.createHighestSequenceNrRequest: the FilterExpression on the deleted flag is applied after the limit == 1, so if the single item read does not pass the filter, the response is empty. I made some changes to fix this method, but I ran into additional errors when testing them once the actual delete query was executed. I also saw that during the delete process the full journal rows are read in order to list the sequence numbers. A projection expression should be used so the full byte payload can be skipped, which would greatly improve performance. I included that in these changes, but there are other places that can be optimized as well.
V2JournalRowReadDriver:

  def highestSequenceNr(
      persistenceId: PersistenceId,
      fromSequenceNr: Option[SequenceNumber] = None,
      deleted: Option[Boolean] = None
  ): Source[Long, NotUsed] = {
    val queryRequest = createHighestSequenceNrRequest(persistenceId, fromSequenceNr, deleted)
    Source
      .single(queryRequest)
      .via(streamClient.queryFlow)
      .flatMapConcat { response =>
        if (response.sdkHttpResponse().isSuccessful) {
          val result = for {
            items <- Option(response.items())
            head <- items.asScala.headOption
            sequence <- Option(head.get(pluginConfig.columnsDefConfig.sequenceNrColumnName))
          } yield sequence.n().toLong
          Source.single(result.getOrElse(0L))
        } else {
          val statusCode = response.sdkHttpResponse().statusCode()
          val statusText = response.sdkHttpResponse().statusText()
          Source.failed(new IOException(s"statusCode: $statusCode" + statusText.asScala.fold("")(s => s", $s")))
        }
      }.withAttributes(logLevels)
  }
  private def createHighestSequenceNrRequest(
      persistenceId: PersistenceId,
      fromSequenceNr: Option[SequenceNumber] = None,
      deleted: Option[Boolean] = None
  ): QueryRequest = {
    val limit = deleted.map(_ => Int.MaxValue).getOrElse(1)
    QueryRequest
      .builder()
      .tableName(pluginConfig.tableName)
      .indexName(pluginConfig.getJournalRowsIndexName)
      .keyConditionExpression(
        fromSequenceNr.map(_ => "#pid = :id and #snr >= :nr").orElse(Some("#pid = :id")).orNull
      )
      .filterExpression(deleted.map(_ => "#d = :flg").orNull)
      .projectionExpression((Seq("#snr") ++ deleted.map(_ => "#d")).mkString(","))
      .expressionAttributeNames(
        (Map(
          "#pid" -> pluginConfig.columnsDefConfig.persistenceIdColumnName,
          "#snr" -> pluginConfig.columnsDefConfig.sequenceNrColumnName) ++
          deleted.map(_ => Map("#d" -> pluginConfig.columnsDefConfig.deletedColumnName)).getOrElse(Map.empty)
          ).asJava
      )
      .expressionAttributeValues(
        (Map(
          ":id" -> AttributeValue.builder().s(persistenceId.asString).build()
        ) ++ deleted
          .map(d => Map(":flg" -> AttributeValue.builder().bool(d).build())).getOrElse(Map.empty) ++ fromSequenceNr
          .map(nr => Map(":nr" -> AttributeValue.builder().n(nr.asString).build())).getOrElse(Map.empty)).asJava
      ).scanIndexForward(false)
      .limit(limit)
      .build()
  }
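The limit-before-filter behavior described in this issue can be reproduced in plain Scala. This is a minimal sketch with a hypothetical Row type, illustrating only DynamoDB's documented semantics: Limit caps the number of items read, and FilterExpression is applied afterwards.

```scala
// Hypothetical journal row, newest-first as with scanIndexForward(false).
final case class Row(sequenceNr: Long, deleted: Boolean)

def highestSequenceNr(rowsNewestFirst: Seq[Row], limit: Int): Long =
  rowsNewestFirst
    .take(limit)        // DynamoDB applies Limit first...
    .filter(!_.deleted) // ...then the FilterExpression
    .headOption
    .map(_.sequenceNr)
    .getOrElse(0L)

val rows = Seq(Row(5, deleted = true), Row(4, deleted = false), Row(3, deleted = false))

// With limit = 1, the only row read is the deleted one, so the result is 0
// even though non-deleted rows exist; a large limit finds seqNr 4.
println(highestSequenceNr(rows, limit = 1))            // 0
println(highestSequenceNr(rows, limit = Int.MaxValue)) // 4
```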

configuration question

The configuration example in the readme doesn't seem to include settings that specify where the DynamoDB endpoint is. Is that because you use the default configuration for the DynamoDB client?

That is:

DynamoDbClient client = DynamoDbClient.create();

which means the environment must be populated with the default environment variables, e.g.

AWS_ACCESS_KEY_ID=your_access_key_id 
AWS_SECRET_ACCESS_KEY=your_secret_access_key
AWS_REGION=your_aws_region

or via any of the other methods listed on the AWS SDK documentation site?
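For what it's worth, the plugin's client section can also carry static credentials and an endpoint override directly in the config, which avoids relying on the environment. The key names below are recalled from the plugin's README/reference.conf and should be verified against the version in use; note that a region must still be resolvable by the AWS SDK (see the dynamodb-local issue above):

```hocon
j5ik2o.dynamo-db-journal {
  dynamo-db-client {
    access-key-id     = "x"
    secret-access-key = "x"
    endpoint          = "http://127.0.0.1:8000/"
  }
}
```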

Query plugin used to read "eventsByTag" from the journal is complaining about Bad Request

When I tried to use the "eventsByTag" feature in the Query plugin, I got an error with the following log message:

[info] [2020-02-04 13:26:34,641] [WARN] [com.github.j5ik2o.akka.persistence.dynamodb.query.JournalSequenceActor] [bla-akka.actor.default-dispatcher-47] [akka://[email protected]:25520/system/j5ik2o.dynamo-db-read-journal.akka-persistence-dynamodb-journal-sequence-actor] - Failed to query max ordering id because of java.util.concurrent.CompletionException: software.amazon.awssdk.services.dynamodb.model.DynamoDbException: Either the KeyConditions or KeyConditionExpression parameter must be specified in the request. (Service: DynamoDb, Status Code: 400, Request ID: 6281d9df-6c6d-4f6f-b733-569e6b2cd603), retrying in 1 minute

Then I looked at the code and found that there is a query in ReadJournalDaoImpl.scala which does not define the KeyConditions or KeyConditionExpression parameter. One of them is required, so DynamoDB rejects every such query as a Bad Request. The code is:

val queryRequest = QueryRequest
  .builder()
  .tableName(tableName)
  .select(Select.SPECIFIC_ATTRIBUTES)
  .attributesToGet(columnsDefConfig.orderingColumnName)
  .limit(queryBatchSize)
  .exclusiveStartKeyAsScala(lastEvaluatedKey)
  .build()
