
eventstore.jvm's Introduction

Warning

DEPRECATION NOTICE:

  • EventStoreDB version 23.10.x is the last OSS version to support the TCP-protocol-based client.
  • This project is no longer maintained. We recommend moving to EventStoreDB-Client-Java for ongoing updates and support.

Event Store JVM Client

Scala 2.13.7 / 2.12.15
Akka 2.6.17
Event Store v5.x, v20.x, v21.x, v22.x, and v23.x are supported

Please note that the TCP protocol is not supported on EventStoreDB v24.2 and higher; only the versions listed above are compatible.

We have two APIs available:

  • Calling methods on EsConnection

We use scala.concurrent.Future for asynchronous calls, which is not friendly enough for Java users. To keep Java developers happy without reinventing the wheel, we rely on the tools provided by the Akka team. Check it out:

final EsConnection connection = EsConnectionFactory.create(system);
final Future<Event> future    = connection.readEvent("my-stream", new EventNumber.Exact(0), false, null);
val connection = EsConnection(system)
val future     = connection(ReadEvent(EventStream.Id("my-stream"), EventNumber.First))
  • Sending messages to eventstore.ConnectionActor
final ActorRef connection = system.actorOf(ConnectionActor.getProps());
final ReadEvent readEvent = new ReadEventBuilder("my-stream").first().build();
connection.tell(readEvent, null);
val connection = system.actorOf(ConnectionActor.props())
connection ! ReadEvent(EventStream.Id("my-stream"), EventNumber.First)

Setup

Sbt

libraryDependencies += "com.geteventstore" %% "eventstore-client" % "7.4.0"

Maven

<dependency>
    <groupId>com.geteventstore</groupId>
    <artifactId>eventstore-client_${scala.version}</artifactId>
    <version>7.4.0</version>
</dependency>

Java examples

Read event

import java.net.InetSocketAddress;
import akka.actor.*;
import akka.actor.Status.Failure;
import akka.event.*;
import eventstore.j.*;
import eventstore.core.*;
import eventstore.akka.Settings;
import eventstore.akka.tcp.ConnectionActor;

public class ReadEventExample {

    public static void main(String[] args) {
        final ActorSystem system = ActorSystem.create();
        final Settings settings = new SettingsBuilder()
                .address(new InetSocketAddress("127.0.0.1", 1113))
                .defaultCredentials("admin", "changeit")
                .build();
        final ActorRef connection = system.actorOf(ConnectionActor.getProps(settings));
        final ActorRef readResult = system.actorOf(Props.create(ReadResult.class));

        final ReadEvent readEvent = new ReadEventBuilder("my-stream")
                .first()
                .resolveLinkTos(false)
                .requireMaster(true)
                .build();

        connection.tell(readEvent, readResult);
    }


    public static class ReadResult extends AbstractActor {
        final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .match(ReadEventCompleted.class, m -> {
                        final Event event = m.event();
                        log.info("event: {}", event);
                        context().system().terminate();
                    })
                    .match(Failure.class, f -> {
                        final EsException exception = (EsException) f.cause();
                        log.error(exception, exception.toString());
                        context().system().terminate();
                    })
                    .build();
        }
    }
}

Write event

import java.util.UUID;
import akka.actor.*;
import akka.event.*;
import eventstore.j.*;
import eventstore.core.*;
import eventstore.akka.tcp.ConnectionActor;

public class WriteEventExample {

    public static void main(String[] args) {

        final ActorSystem system   = ActorSystem.create();
        final ActorRef connection  = system.actorOf(ConnectionActor.getProps());
        final ActorRef writeResult = system.actorOf(Props.create(WriteResult.class));

        final EventData event = new EventDataBuilder("my-event")
                .eventId(UUID.randomUUID())
                .data("my event data")
                .metadata("my first event")
                .build();

        final WriteEvents writeEvents = new WriteEventsBuilder("my-stream")
                .addEvent(event)
                .expectAnyVersion()
                .build();

        connection.tell(writeEvents, writeResult);
    }

    public static class WriteResult extends AbstractActor {

        final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .match(WriteEventsCompleted.class, m -> {
                        log.info("range: {}, position: {}", m.numbersRange(), m.position());
                        context().system().terminate();
                    })
                    .match(Status.Failure.class, f -> {
                        final EsException exception = (EsException) f.cause();
                        log.error(exception, exception.toString());
                        context().system().terminate();
                    })
                    .build();
        }

    }
}

Subscribe to All

import java.io.Closeable;
import akka.actor.ActorSystem;
import eventstore.j.*;
import eventstore.core.IndexedEvent;
import eventstore.akka.SubscriptionObserver;

public class SubscribeToAllExample {
    public static void main(String[] args) {
        final ActorSystem system = ActorSystem.create();
        final EsConnection connection = EsConnectionFactory.create(system);
        final Closeable closeable = connection.subscribeToAll(new SubscriptionObserver<IndexedEvent>() {
            @Override
            public void onLiveProcessingStart(Closeable subscription) {
                system.log().info("live processing started");
            }

            @Override
            public void onEvent(IndexedEvent event, Closeable subscription) {
                system.log().info(event.toString());
            }

            @Override
            public void onError(Throwable e) {
                system.log().error(e.toString());
            }

            @Override
            public void onClose() {
                system.log().error("subscription closed");
            }
        }, false, null);
    }
}

Build event

import java.util.UUID;
import eventstore.core.EventData;
import eventstore.j.EventDataBuilder;

public class EventDataBuilderExample {

    final EventData empty = new EventDataBuilder("eventType").build();

    final EventData binary = new EventDataBuilder("binary")
            .eventId(UUID.randomUUID())
            .data(new byte[]{1, 2, 3, 4})
            .metadata(new byte[]{5, 6, 7, 8})
            .build();

    final EventData string = new EventDataBuilder("string")
            .eventId(UUID.randomUUID())
            .data("data")
            .metadata("metadata")
            .build();

    final EventData json = new EventDataBuilder("json")
            .eventId(UUID.randomUUID())
            .jsonData("{\"data\":\"data\"}")
            .jsonMetadata("{\"metadata\":\"metadata\"}")
            .build();
}

Scala examples

Read event

import java.net.InetSocketAddress
import _root_.akka.actor._
import _root_.akka.actor.Status.Failure
import eventstore.akka.tcp.ConnectionActor

object ReadEventExample extends App {
  val system = ActorSystem()

  val settings = Settings(
    address = new InetSocketAddress("127.0.0.1", 1113),
    defaultCredentials = Some(UserCredentials("admin", "changeit"))
  )

  val connection = system.actorOf(ConnectionActor.props(settings))
  implicit val readResult = system.actorOf(Props[ReadResult]())

  connection ! ReadEvent(EventStream.Id("my-stream"), EventNumber.First)

  class ReadResult extends Actor with ActorLogging {
    def receive = {
      case ReadEventCompleted(event) =>
        log.info("event: {}", event)
        shutdown()

      case Failure(e: EsException) =>
        log.error(e.toString)
        shutdown()
    }

    def shutdown(): Unit = { context.system.terminate(); () }
  }
}

Write event

import _root_.akka.actor.Status.Failure
import _root_.akka.actor.{ ActorLogging, Actor, Props, ActorSystem }
import eventstore.core.util.uuid.randomUuid
import eventstore.akka.tcp.ConnectionActor

object WriteEventExample extends App {

  val system      = ActorSystem()
  val connection  = system.actorOf(ConnectionActor.props())
  val event       = EventData("my-event", eventId = randomUuid, data = Content("my event data"), metadata = Content("my first event"))

  implicit val writeResult = system.actorOf(Props(WriteResult))

  connection ! WriteEvents(EventStream.Id("my-stream"), List(event))

  case object WriteResult extends Actor with ActorLogging {

    def receive = {
      case WriteEventsCompleted(range, position) =>
        log.info("range: {}, position: {}", range, position)
        shutdown()

      case Failure(e: EsException) =>
        log.error(e.toString)
        shutdown()
    }

    def shutdown(): Unit = { context.system.terminate();  () }
  }
}

Start transaction

import _root_.akka.actor.{ActorSystem, Props}
import eventstore.core.util.uuid.randomUuid
import eventstore.akka.tcp.ConnectionActor
import eventstore.akka.TransactionActor._

object StartTransactionExample extends App {
  val system = ActorSystem()
  val connection = system.actorOf(ConnectionActor.props(), "connection")

  val kickoff = Start(TransactionStart(EventStream.Id("my-stream")))
  val transaction = system.actorOf(TransactionActor.props(connection, kickoff), "transaction")
  implicit val transactionResult = system.actorOf(Props[TransactionResult], "result")

  val data = EventData("transaction-event", eventId = randomUuid)

  transaction ! GetTransactionId // replies with `TransactionId(transactionId)`
  transaction ! Write(data) // replies with `WriteCompleted`
  transaction ! Write(data) // replies with `WriteCompleted`
  transaction ! Write(data) // replies with `WriteCompleted`
  transaction ! Commit // replies with `CommitCompleted`
}

Count all events

import _root_.akka.actor._
import scala.concurrent.duration._
import eventstore.akka.tcp.ConnectionActor

object CountAll extends App {
  val system = ActorSystem()
  val connection = system.actorOf(ConnectionActor.props(), "connection")
  val countAll = system.actorOf(Props[CountAll](), "count-all")
  system.actorOf(SubscriptionActor.props(connection, countAll, None, None, Settings.Default), "subscription")
}

class CountAll extends Actor with ActorLogging {
  context.setReceiveTimeout(1.second)

  def receive = count(0)

  def count(n: Long, printed: Boolean = false): Receive = {
    case _: IndexedEvent       => context become count(n + 1)
    case LiveProcessingStarted => log.info("live processing started")
    case ReceiveTimeout if !printed =>
      log.info("count {}", n)
      context become count(n, printed = true)
  }
}

Future-like api

import _root_.akka.actor.ActorSystem
import scala.concurrent.Future
import eventstore.core.util.uuid.randomUuid

object EsConnectionExample extends App {
  val system = ActorSystem()

  import system.dispatcher

  val connection = EsConnection(system)
  val log = system.log

  val stream = EventStream.Id("my-stream")

  val readEvent: Future[ReadEventCompleted] = connection(ReadEvent(stream))
  readEvent foreach { x =>
    log.info(x.event.toString)
  }

  val readStreamEvents: Future[ReadStreamEventsCompleted] = connection(ReadStreamEvents(stream))
  readStreamEvents foreach { x =>
    log.info(x.events.toString())
  }

  val readAllEvents: Future[ReadAllEventsCompleted] = connection(ReadAllEvents(maxCount = 5))
  readAllEvents foreach { x =>
    log.info(x.events.toString())
  }

  val writeEvents: Future[WriteEventsCompleted] = connection(WriteEvents(stream, List(EventData("my-event", eventId = randomUuid))))
  writeEvents foreach { x =>
    log.info(x.numbersRange.toString)
  }
}

EventStoreExtension

The most common use case is a single Event Store connection per application. For this you can use our Akka extension, which makes sure there is a single instance of the connection actor.

EventStoreExtension(system).actor ! ReadEvent(EventStream.Id("stream"))
EventStoreExtension(system).connection(ReadEvent(EventStream.Id("stream")))
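
A minimal sketch of using the extension's Future-based connection (the stream name is illustrative, and eventstore imports are omitted here, as in the Scala examples above):

import _root_.akka.actor.ActorSystem

object EventStoreExtensionExample extends App {
  val system = ActorSystem()
  import system.dispatcher

  // the extension guarantees a single connection actor per ActorSystem
  val readEvent = EventStoreExtension(system).connection(ReadEvent(EventStream.Id("my-stream")))

  readEvent foreach { completed =>
    system.log.info(completed.event.toString) // ReadEventCompleted carries the read event
  }
}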

Streams

The client provides an Akka Streams interface for Event Store subscriptions. Two methods, allStreamsSource and streamSource, are available in both the Java and Scala APIs.

Here is a short example of how to use it:

List all streams

import _root_.akka.actor.ActorSystem

object ListAllStreamsExample extends App {
  implicit val system = ActorSystem()
  import system.dispatcher

  val connection = EventStoreExtension(system).connection
  val source = connection.streamSource(EventStream.System.`$streams`, infinite = false, resolveLinkTos = true)

  source
    .runForeach { x => println(x.streamId.streamId) }
    .onComplete { _ => system.terminate() }
}
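
A similar sketch using allStreamsSource (also used in the Reactive Streams example below), printing every event from $all; the argument defaults and the element type are left as assumptions here:

import _root_.akka.actor.ActorSystem

object PrintAllEventsExample extends App {
  implicit val system = ActorSystem()

  val connection = EventStoreExtension(system).connection

  connection.allStreamsSource()   // a source of events from the $all stream (defaults assumed)
    .runForeach(println)          // keeps consuming until the application is stopped
}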

Reactive Streams

You can use the generic Reactive Streams Publisher interface for Event Store subscriptions by converting an Akka Stream into a Publisher. See: Integrating Akka Streams with Reactive Streams

Here is a short example of how to accomplish that:

import _root_.akka.actor.ActorSystem
import _root_.akka.stream.scaladsl._
import org.reactivestreams.{Publisher, Subscriber}
import scala.concurrent.duration._

object MessagesPerSecondReactiveStreams extends App {
  implicit val system = ActorSystem()

  val connection = EventStoreExtension(system).connection

  val publisher: Publisher[String] = connection.allStreamsSource()
    .groupedWithin(Int.MaxValue, 1.second)
    .map { xs => f"${xs.size.toDouble / 1000}%2.1fk m/s" }
    .runWith(Sink.asPublisher(fanout = false))

  val subscriber: Subscriber[String] = Source.asSubscriber[String]
    .to(Sink.foreach(println))
    .run()

  publisher.subscribe(subscriber)
}

Configuration

Default client settings are defined in the core reference.conf and the client reference.conf. You can override them in your own application.conf placed in src/main/resources, the same way you may already do for Akka; we use the same approach and the same configuration library.
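
For example, a hedged sketch of such an override (the key names below are assumptions from memory; the authoritative names and defaults are in reference.conf):

# src/main/resources/application.conf -- illustrative only, verify keys against reference.conf
eventstore {
  # address of the Event Store node (assumed keys)
  address {
    host = "127.0.0.1"
    port = 1113
  }

  # default credentials used when none are supplied explicitly (assumed keys)
  credentials {
    login    = "admin"
    password = "changeit"
  }
}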

Cluster

It is possible to use the client against an Event Store cluster. For this you need to configure the client via the eventstore.cluster section in the core reference.conf or via ClusterSettings. Configuring through application.conf is the preferred option.
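
A hedged sketch of what an eventstore.cluster block might look like (the key names below are assumptions; verify them against the cluster section of the core reference.conf):

# illustrative only -- check key names against the eventstore.cluster section of reference.conf
eventstore {
  cluster {
    # seed the gossip protocol with known node endpoints (assumed key)
    gossip-seeds = ["192.168.0.1:2113", "192.168.0.2:2113"]

    # or discover nodes via DNS instead of static seeds (assumed key)
    # dns = "my-cluster.example.com"
  }
}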

eventstore.jvm's People

Contributors

ahjohannessen, bardurdam, danleech, dstranger, feynmanliang, gregoryyoung, jen20, michielboekhoff, miguelsantoszup, pawelkaczor, rsowald, scala-steward, t3hnar, w1am, ylorph, zacacj


eventstore.jvm's Issues

Uberjar with ESjvmclient and Akka fails on startup

The problem is in how Typesafe Config is used in both Akka and ESJvmClient. Both expect the configuration to be found in application.conf or reference.conf, but only a single reference.conf remains after the jars are merged into the uberjar.

ConfigFactory.load() should only be invoked when the configuration isn't supplied explicitly, as Akka does. Otherwise class initialization fails and the only remaining option is to supply the eventstore configuration block in the main application.conf file.

EDIT: Fails with an ExceptionInInitializerError because ConfigFactory.load is invoked in a static initializer when the eventstore.Settings$ class is loaded.
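
A common workaround on the packaging side (not from this issue thread, and assuming the uberjar is built with sbt-assembly) is to concatenate all reference.conf files instead of keeping only one of them:

// build.sbt -- sketch assuming the sbt-assembly plugin is enabled
assembly / assemblyMergeStrategy := {
  case "reference.conf" => MergeStrategy.concat // merge library defaults instead of dropping them
  case x =>
    val oldStrategy = (assembly / assemblyMergeStrategy).value
    oldStrategy(x)
}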

Reconnection to ES issue. EsException(NotHandled(NotReady))

After the application loses the connection to ES there are messages in the logs:

[INFO] [09/25/2014 09:46:31.026] [default-akka.actor.default-dispatcher-8] [akka://default/deadLetters] Message [akka.actor.Status$Failure] from Actor[akka://default/user/$a#-2045056086] to Actor[akka://default/deadLetters] was not delivered. [10] dead letters encountered, no more dead letters will be logged. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Then:

[WARN] [09/25/2014 09:46:31.032] [default-akka.actor.default-dispatcher-9] [akka://default/user/$a] connection lost to /192.168.33.10:1113: peer closed
[INFO] [09/25/2014 09:46:31.033] [default-akka.actor.default-dispatcher-9] [akka://default/user/$a] reconnecting to /192.168.33.10:1113 in 250 milliseconds
[ERROR] [09/25/2014 09:46:31.301] [default-akka.actor.default-dispatcher-4] [akka://default/user/$a] connection failed to /192.168.33.10:1113
[INFO] [09/25/2014 09:46:31.302] [default-akka.actor.default-dispatcher-4] [akka://default/user/$a] reconnecting to /192.168.33.10:1113 in 500 milliseconds
[ERROR] [09/25/2014 09:46:31.817] [default-akka.actor.default-dispatcher-4] [akka://default/user/$a] connection failed to /192.168.33.10:1113

And so on, until ES comes back up. Then the logs are appended with:

09:46:47.968 [default-akka.actor.default-dispatcher-4] ERROR l.b.n.a.e.c.a.EventStoreClientConnection - Error in subscription: lt.bas.nano.app.eventstore.connector.disruptor.DynamicDisruptor$1@7f9ea30e
eventstore.EsException: null
[WARN] [09/25/2014 09:46:47.974] [default-akka.actor.default-dispatcher-6] [akka://default/user/$a] can not deliver Failure(EsException(NotHandled(NotReady))), sender not found for correlationId: 9918ac56-04b9-4ec8-a02c-c8c8b5ccbec3
[ERROR] [09/25/2014 09:46:47.982] [default-akka.actor.default-dispatcher-10] [akka://default/user/$b] Monitored actor [Actor[akka://default/user/$c#-944501135]] terminated (akka.actor.DeathPactException)
09:46:47.984 [default-akka.actor.default-dispatcher-4] DEBUG l.b.n.a.e.c.a.EventStoreClientConnection - Closed: lt.bas.nano.app.eventstore.connector.disruptor.DynamicDisruptor$1@7f9ea30e, total: 0

Does this have anything to do with the line

// TODO reconnect on EsException(NotHandled(NotReady))

in ConnectionActor.scala?

Operation hasn't got response from server for PackOut

I am getting the exception below when the client application tries to read events from EventStore. The exception does not always occur. I am running the application inside a Docker container and use EventStore client version 2.0.3. I already tried setting the operation timeout property to 1m, but it doesn't solve the problem.

Operation hasn't got response from server for PackOut(ReadStreamEvents(Stream(auction),EventNumber(0),500,Forward,false,true),9a1c4692-6a1a-4021-bb41-b3c80ed9e748,Some(UserCredentials(admin,***)))

New messages for competing

On the competing branch there are a few new messages. Not much to implement: mostly the CRUD methods; subscription data shows up the same way as EventAppeared and can be acked/nacked. Just a placeholder for later.

EventStore.JVM example?

Is there a good EventStore.JVM example out there?
Different implementations of event sourcing use different naming (for example, a projection is the same as a query or a view).
Is there an architecture picture that shows which naming EventStore.JVM uses, and is that naming reflected in the code?

Exception when reading events from a missing stream

Is this by design?

This might also be happening because the jvmclient (0.1) protobuf spec is out of sync with my eventstore build (latest master). Should I be building the lib from master?

Eventstore API returns:

message ReadStreamEventsCompleted {

        enum ReadStreamResult {
                Success = 0;
                NoStream = 1;
                StreamDeleted = 2;
                NotModified = 3;
                Error = 4;
                AccessDenied = 5;
        }

        ...
}

so I would expect the operation to succeed with an appropriate ReadStreamResult.

build on windows fails (v8, due missing directory)

@robvdlv commented on Tue Feb 10 2015

Attempting a from-scratch build of dev branch @87e7c77e913377ce4619e47db6b5d5e6373688b5 on Windows 8.1. AFAIK I have met all prerequisites in terms of what needs to be installed and available on path. Using powershell to build, with full rights.

See below for details.
As far as I can see, there seems to be a directory name mismatch.

The build looks for folder <eventstore>\v8\build\gyp_v8.
What actually exists on disk however, is <eventstore>\v8\build\gyp.

C:\dev\oss\EventStore [dev...origin/dev]> .\build.cmd
C:\dev\oss\EventStore\src\libs\x64\js1.dll
Cannot run a 'quick' build - js1.dll is not found at C:\dev\oss\EventStore\src\libs\x64
Running full build instead
Build Configuration
-------------------
Target: full
Platform: x64
Configuration: release
Version: 0.0.0.0
Visual Studio Version will be autodetected
Additional Defines:


All dependencies already met.

No specific version of Visual Studio provided, using 2013
C:\dev\oss\EventStore\v8\third_party\python_26\python.exe: can't open file 'C:\dev\oss\EventStore\v8\build\gyp_v8': [Errno 2] No such file or directory
Exec: (C:\dev\oss\EventStore\v8)\nFailed executing  & $pythonExecutable $gypFile $vsVersionParameter $gypPlatformParameter
At C:\dev\oss\EventStore\scripts\build-windows\build-functions.ps1:38 char:9
+         throw ("Exec: (" + $pwd + ")\n" + $ErrorMessage)
+         ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    + CategoryInfo          : OperationStopped: (Exec: (C:\dev\o...tformParameter :String) [], RuntimeException
    + FullyQualifiedErrorId : Exec: (C:\dev\oss\EventStore\v8)\nFailed executing  & $pythonExecutable $gypFile $vsVersionParameter $gypPlatformParameter


@jen20 commented on Tue Feb 10 2015

Fairly sure I've seen this before when some of the dependencies are incomplete - we only check that there's a repository there (svn in the case of most of the dependencies), not that it's at the correct revision. Can you try deleting the entire v8 directory from the repository root and rerunning the build?


@robvdlv commented on Tue Feb 10 2015

Definitely. Will try your suggestion and let you know the outcome asap.


@robvdlv commented on Tue Feb 10 2015

James you were right.

Apparently SVN failed to checkout the repo successfully the first time. Cleanup and retry did the job.

Thanks!


@jen20 commented on Tue Feb 10 2015

This has become an increasingly frequent problem since Google seem unable to install a valid certificate on the site hosting the Chrome and V8 source... Hopefully you should be building now!

Rename this repo to EventStore.Akka or similar

To better describe what this is, it would be good to rename this repo in a similar vein to all the others - I'd suggest EventStore.Akka, though I'm definitely open to suggestions.

Likely this will only affect @t3hnar as someone with remotes set up - as detailed here, GitHub will forward remotes, but it's probably better to go through and change the remote paths to whatever new name is chosen.

Any progress on EventStore.Akka?

Position of written event is always zero

EventStore v3.5.0
Akka Client v2.2.1

When writing events using "ask(connection, message, writeTimeout)", but not immediately joining on the returned Future, I often find that the WriteEventsCompleted message has the same range and position every time:
Some(EventNumber.Range(0 to 3)), position: Some(Position(0))

Isn't position supposed to increase every time an event is written?

The client in this case is doing a data import, but I limit it to 100 outstanding Futures (i.e. even if there is no join on the Future there can only be max 100 Futures that are not yet completed, if that helps at all).

Subscribe to All Example

The "Subscribe to All" example uses an EsConnection and the implementation doesn't send messages like the other examples (read/write). Is it possible to send messages to subscribe so that I can share the same connection, used in read/write?

I tried something like this, but it doesn't work:

final ActorRef subscriptionObserver = _system.actorOf(Props.create(SubscriptionObserver.class));

final SubscribeTo subscribeTo = new SubscribeToBuilder()
        .toStream(stream)
        .build();

_connection.tell(subscribeTo, subscriptionObserver);

public static class SubscriptionObserver extends UntypedActor {
    public void onReceive(Object message) throws Exception {
        System.out.println("Got Message");
    }
}

Anonymous actors should be avoided if possible

While monitoring my app with Kamon I got confused by the names of anonymous actors showing up on the dashboard. The actors were constantly processing some messages. By examining the log I discovered that the actors were created by EventStore.JVM:

DEBUG akka://sales/user/$a/$a - started (eventstore.pipeline.TcpPipelineHandler@1f5502f7)
DEBUG akka://sales/user/$a/$a - now watched by Actor[akka://sales/user/$a#-1191817804]
DEBUG akka://sales/system/IO-TCP/selectors/$a/1 - now watched by Actor[akka://sales/user/$a#-1191817804]
DEBUG akka://sales/system/IO-TCP/selectors/$a/1 - now watched by Actor[akka://sales/user/$a/$a#1665270900]
DEBUG akka://sales/user - now supervising Actor[akka://sales/user/$b#-1183869946]
DEBUG akka://sales/user/$a/$a - now watched by Actor[akka://sales/system/IO-TCP/selectors/$a/1#1171102288]
DEBUG akka://sales/user/$b - started (eventstore.StreamPublisher@cf7dbde)
DEBUG akka://sales/user/$b - now watched by Actor[akka://sales/user/$a#-1191817804]
DEBUG akka://sales/user/$b - no longer watched by Actor[akka://sales/user/$a#-1191817804]
DEBUG akka://sales/user/$b - now watched by Actor[akka://sales/user/$a#-1191817804]

Is it possible to give some names to these actors?

build: reason for withSources

I noticed that the published jar has dependencies on a series of source artifacts. I believe the recommended approach is to use updateClassifiers in sbt in order to fetch sources and javadocs.

Is it possible that you could remove .map(_.withSources()) and publish a new version? The reason for asking is that I get a few of those sources jars while using sbt-native-packager to package my applications, and I would rather not do manual filtering on x-sources.jar for that.

    libraryDependencies ++= Seq(
      Akka.actor, Akka.testkit, protobuf,
      typesafeConfig, codec, mockito,
      Joda.time, Joda.convert,
      Specs2.core, Specs2.mock,
      Spray.json, Spray.client,
      AkkaStream.stream, AkkaStream.tck, AkkaStream.testkit,
      ReactiveStreams.streams, ReactiveStreams.tck).map(_.withSources())
  )

Require master flag taken from Default settings rather than connection settings

WriteEvents in Message.scala uses Settings.Default.requireMaster as default value for requireMaster and EsConnectionImpl does not pass an override.

For now we can work around this by using the config files, but as we have another config system in place, we'd really prefer that the setting from the Settings object passed to EsConnectionFactory.create be used (as expected).

This appears to affect all operations, not just writes.
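
A workaround sketch at the message level (assuming the case class exposes requireMaster as a named parameter, which the issue above implies, and with randomUuid imported as in the Scala write example): set the flag per request instead of relying on the defaults.

val write = WriteEvents(
  EventStream.Id("my-stream"),
  List(EventData("my-event", eventId = randomUuid)),
  requireMaster = false // override the Settings.Default-derived default explicitly
)
connection ! write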

Support for competing consumers

I've started looking at the new competing consumers / persistent subscriptions available in EventStore, but cannot find any sign of them in this JVM client. I'd love to be able to use them but will need to do so from Java - is there a plan/timeline for support?

Connecting to a subscription group and ack/nack would be enough for my project at the moment; creating subscription groups could be done separately.

A few comments

I took a brief look at the API; a few comments follow. They could perhaps be turned into separate tickets if you find them relevant.

  • I suggest that you use the Typesafe Config library and define the default configuration in reference.conf
  • Is scalabuff worth it? We used scalabuff in Akka cluster but switched to plain protobuf to reduce dependency complexity.
  • logging can be done more efficiently (reduced garbage) by using the {} placeholders, e.g. log.debug(s"add sender $sender for $x") should be log.debug("add sender {} for {}", sender, x)
  • Enumeration vs. objects and sealed traits. I think the trend is to not use Scala's Enumeration.
  • EventNumber Int. Is Int enough or should it be a Long? Maybe I don't understand its purpose.

Events continue to be received if subscription dropped before accepting SubscribeToAllCompleted

[ERROR] [03/14/2014 15:41:02.179] [default-akka.actor.default-dispatcher-15] [akka://default/user/$e] Monitored actor [Actor[akka://d
efault/user/$d#1527744792]] terminated (akka.actor.DeathPactException)
[WARN] [03/14/2014 15:41:02.179] [default-akka.actor.default-dispatcher-17] [akka://default/user/$a] can not deliver SubscribeToAllCo
mpleted(1908783972), sender not found for correlationId: 0a160195-9fbf-4cae-a561-38ab18627579

This subscription now receives events until the EventStore connection is closed.

[WARN] [03/14/2014 15:41:34.873] [default-akka.actor.default-dispatcher-3] [akka://default/user/$a] can not deliver StreamEventAppear
ed(IndexedEvent(EventRecord(SystemStream($stats-127.0.0.1:2113),EventNumber(128807),EventData($statsCollected,417467d0-7d30-563c-cef3-2a358ee4379d,Content(Byte
String(..),ContentType.Binary}),Content(ByteString(),ContentType.Binary}))),Position(1908813534,1908798809))), sender not found for correlationId: 0a160195-9fb
f-4cae-a561-38ab18627579
[WARN] [03/14/2014 15:42:04.911] [default-akka.actor.default-dispatcher-14] [akka://default/user/$a] can not deliver StreamEventAppea
red(IndexedEvent(EventRecord(SystemStream($stats-127.0.0.1:2113),EventNumber(128808),EventData($statsCollected,450d072c-3a2f-b323-26b8-7601c1275b8f,Content(Byt
eString(..),ContentType.Binary}),Content(ByteString(),ContentType.Binary}))),Position(1908828330,1908813596))), sender not found for correlationId: 0a160195-9f
bf-4cae-a561-38ab18627579

subscription to all

Hi,

I am trying to subscribe to events from GetEventStore and to select them using the streamId prefix I use for my aggregates (e.g. ticket and clientAccount).
When I run this code it works for the prefix "ticket" (cf. the isTicketEvent method), but I find no events for the prefix "clientAccount" (cf. isClientAccountEvent).
In the GetEventStore database I have streams starting with both the "ticket" and the "clientAccount" prefix.
The last case in the receive method is there to test whether I receive events other than "ticket", but it never matches.

Any idea what is happening?

Thanks for the response.

class AnstelEventListener(settings: Settings = EventstoreProdConnection.settings) extends Actor with ActorLogging {

  val connection = context.system.actorOf(ConnectionActor.props(settings), "connection")

  override def preStart() = {
    log.info("AnstelEventListenner actor started !")
    context.system.actorOf(SubscriptionActor.props(connection, self), "subscription")
  }

  override def receive = {
    case IndexedEvent(event, _) if isTicketEvent(event)        => log.info("ticket")

    case IndexedEvent(event, _) if isClientAccountEvent(event) => log.info("clientAccount")

    case IndexedEvent(event, _) if !event.streamId.streamId.startsWith("ticket") &&
                                   !event.streamId.streamId.startsWith("$") =>
      log.info("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$  " + event.streamId.streamId)
  }

  def isTicketEvent(event: Event): Boolean = event.streamId.streamId.startsWith("ticket")
  def isClientAccountEvent(event: Event): Boolean = event.streamId.streamId.startsWith("clientAccount")
}

Regarding subscription termination

Is it safe to assume that a subscription can be terminated by sending a Kill to an actor?

More generally, are there any supervision trees which will appear in the client (e.g. something that will restart the subscription when it's killed)?

EsConnection.subscribe catchup exhibits multiple problems

During startup of my application yesterday I got the following output on stdout. I subscribe to the $all stream, so it processes for about a minute before it reaches the live-events state.

My interpretation of the log output is:

  • The library is requesting such large batches that it triggers internal timeouts.
  • An actor then apparently finishes receiving its events but cannot deliver them anywhere.
  • Since it cannot deliver them anywhere, it prints them all (about 10 MB!) to stdout.

All of the points above seem a bit problematic to me. Do you agree?

I am using 'com.geteventstore:eventstore-client_2.11:2.2.0'.

The logged output is:


[INFO] [06/13/2016 16:34:26.978] [EventStoreActorSystem-akka.actor.default-dispatcher-4] [akka://EventStoreActorSystem/user/$a] Connected to #####
[WARN] [06/13/2016 16:35:10.767] [EventStoreActorSystem-akka.actor.default-dispatcher-13] [akka://EventStoreActorSystem/user/$a] Connection lost to #####: no heartbeat within 2 seconds
[INFO] [06/13/2016 16:35:11.056] [EventStoreActorSystem-akka.actor.default-dispatcher-2] [akka://EventStoreActorSystem/user/$a] Connected to #####
[INFO] [06/13/2016 16:35:11.746] [EventStoreActorSystem-akka.actor.default-dispatcher-3] [akka://EventStoreActorSystem/user/$a] closing connection to #####
[WARN] [06/13/2016 16:35:13.197] [EventStoreActorSystem-akka.actor.default-dispatcher-9] [akka://EventStoreActorSystem/user/$a] Connection lost to #####: connection actor died
[INFO] [06/13/2016 16:35:13.494] [EventStoreActorSystem-akka.actor.default-dispatcher-9] [akka://EventStoreActorSystem/user/$a] Connected to #####
[WARN] [06/13/2016 16:35:13.544] [EventStoreActorSystem-akka.actor.default-dispatcher-3] [akka://EventStoreActorSystem/user/$a] Connection lost to #####: peer closed
[INFO] [06/13/2016 16:35:13.842] [EventStoreActorSystem-akka.actor.default-dispatcher-13] [akka://EventStoreActorSystem/user/$a] Connected to #####
[WARN] [06/13/2016 16:35:16.550] [EventStoreActorSystem-akka.actor.default-dispatcher-9] [akka://EventStoreActorSystem/user/$a] Cannot deliver ReadAllEventsCompleted(List(IndexedEvent(EventRecord(SystemStream($stats-0.0.0.0:2113),EventNumber(13324),EventData($statsCollected,43f1f344-701f-d37a-5d16-66cc39e72f94,Content({
....
10 MB of logs here
....
},ContentType.Json),Content(ByteString(),ContentType.Json)),Some(2016-06-04T07:12:35.566+02:00)),Position(292328028))),Position(281767231),Position(292349164),Forward), client not found for correlationId: 76e3beeb-93c2-48b8-9611-59c37b362ed8
[WARN] [06/13/2016 16:35:16.556] [EventStoreActorSystem-akka.actor.default-dispatcher-9] [akka://EventStoreActorSystem/user/$a] Connection lost to #####: peer closed
[INFO] [06/13/2016 16:35:16.563] [EventStoreActorSystem-akka.actor.default-dispatcher-4] [akka://EventStoreActorSystem/user/$a/$d] Message [eventstore.pipeline.TcpPipelineHandler$Init$Command] from Actor[akka://EventStoreActorSystem/user/$a#-761697866] to Actor[akka://EventStoreActorSystem/user/$a/$d#-201202823] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [06/13/2016 16:35:16.850] [EventStoreActorSystem-akka.actor.default-dispatcher-9] [akka://EventStoreActorSystem/user/$a] Connected to #####

aborting connection (buffer overrun)

677
678
679
680
681
682
[INFO] [11/13/2014 10:31:47.252] [default-akka.actor.default-dispatcher-9] [akka://default/user/$a] closing connection to /172.16.1.71:1113
[INFO] [11/13/2014 10:31:47.252] [default-akka.actor.default-dispatcher-14] [akka://default/system/IO-TCP/selectors/$a/0] Message [akka.io.SelectionHandler$ChannelReadable$] from Actor[akka://default/deadLetters] to Actor[akka://default/system/IO-TCP/selectors/$a/0#-179609393] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
683
[INFO] [11/13/2014 10:31:47.254] [default-akka.actor.default-dispatcher-13] [akka://default/user/$a/$a] Message [eventstore.pipeline.TcpPipelineHandler$Init$Command] from Actor[akka://default/user/$a#-1492706652] to Actor[akka://default/user/$a/$a#-449091588] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
684
685

Subscribing to a stream containing links to deleted events crashes StreamSubscriptionActor

When subscribing with resolveLinkTos=true to a stream that contains event links to events that have been deleted, the StreamSubscriptionActor will terminate with an IllegalArgumentException when the event in question is parsed from the protobuf.

Exception text:

ERROR eventstore.StreamSubscriptionActor - eventstore.CommandNotExpectedException: Expected: ReadStreamEventsCompleted, actual: java.lang.IllegalArgumentException: requirement failed: streamId must not be empty

I can reliably reproduce the exception by writing an event of type "TestType" to a random stream in an ES instance with projections enabled, then deleting said stream and subscribing to "$et-TestType".

The problem seems to be that EventStore, upon encountering an unresolvable event, will create a ResolvedEvent instance as shown below (excerpt from EventStore.Core/Data/ResolvedEvent.cs):

public static ResolvedEvent ForFailedResolvedLink(EventRecord link, ReadEventResult resolveResult, long? commitPosition = null)
{
    return new ResolvedEvent(null, link, commitPosition, resolveResult);
}

The JVM client does not handle this case and just crashes when it encounters a ResolvedEvent that does not, in fact, contain a resolved event.

I would expect the subscription actor to simply ignore any events that are "unresolvable because deleted", at least when resolveLinkTos is enabled.

Including as dependency

I can't find how to add this project to libraryDependencies in my Play Framework build.sbt file.

Thanks for the help

CPU is at 10% even when the app is idle

I use EventStore.JVM 2.0.0 with the reference configuration, changing only the IP and port.
Establishing a connection with ES 3.0.1 causes CPU usage to jump from 0.1% to 10% (i7, 2 cores),
and it eats up RAM at a rate of ~1 MB/s.
The logs show only heartbeat messages going in and out.

Longer description:
EventStore is used with Akka Persistence (2.3.9).
The problem was discovered after updating to 2.0.0.
I've tried 3 scenarios to isolate the source of the issue:
AKKA + ES.Persistence (1 actor, empty journal) -> CPU ~10%, MEM ~130MB (~300MB after 2 minutes)
AKKA + LEVELDB -> CPU ~0.2%, MEM ~125MB const.
AKKA + LEVELDB + ESConnection (not used) -> CPU ~10%, MEM ~130MB (~300MB after 2 minutes)

My conclusion is that the problem is related to the established connection in EventStore.JVM:
CPU goes down to 0.1% when I turn off the DB while the app is running.

subscription actor & automatic reconnection

Hi.
When I use the subscription actor,

class EventListenner() extends Actor with ActorLogging {
  val connection = context.system.actorOf(ConnectionActor.props(), "connection")
  context.system.actorOf(SubscriptionActor.props(connection, self), "subscription")

  override def preStart() = {
    log.info("ViewProcess actor started !")
    connection ! SubscribeTo(EventStream.All) // TODO check reconnection
  }

  override def receive = { // TODO store the last event datetime for each stream category & send it to the database when the actor stops
    case IndexedEvent(event, _) if isTicketEvent(event) /*&& isNew(event)*/ => ???
    case _ =>
  }

  private def isTicketEvent(event: Event): Boolean = event.streamId.streamId.startsWith("ticket")
}

and GetEventstore shuts down and restarts, the subscription goes down on the client side and I don't know how to bring it back up automatically (currently I have to restart my app). :/

Do you have any tips?
Thanks for your response.

Subscription is lost after reconnect to ES

Subscription is created using EsConnection:

public Closeable subscribeToAllFrom(Position position, final EventObserver observer) {
        return connection.subscribeToAllFrom(new SubscriptionObserver<IndexedEvent>() {
            @Override
            public void onLiveProcessingStart(Closeable closeable) {
                numberOfSubscriptions.inc();
                logger.debug("Live processing: {}, total: {}", observer, numberOfSubscriptions.getCount());
            }

            @Override
            public void onEvent(IndexedEvent indexedEvent, Closeable closeable) {
                if (!isSystemEvent(indexedEvent)) {
                    try (Timer.Context ignored = allNonSystemEvents.time()) {
                        observer.onEvent(asSubscribed(indexedEvent));
                    }
                }
            }

            @Override
            public void onError(Throwable throwable) {
                logger.error("Error in subscription: " + observer, throwable);
            }

            @Override
            public void onClose() {
                numberOfSubscriptions.dec();
                logger.debug("Closed: {}, total: {}", observer, numberOfSubscriptions.getCount());
            }
        }, exactPosition(position), false, null);
    }

EventStore version: 3.0.0
client version: 1.0.1

After losing and reestablishing the connection to EventStore, the SubscriptionObserver no longer receives events. As I understand it, this is not intended behavior?

EventStoreExtension should not use Settings.Default

Hello,
I ran into problems using EventStoreExtension, because by default it configures the EsConnection with Settings.Default:

class EventStoreExtension(system: ActorSystem) extends Extension {

  def settings: Settings = Settings.Default

  val actor: ActorRef = system.actorOf(ConnectionActor.props(settings), "eventstore-connection")

  val connection: EsConnection = new EsConnection(actor, system, settings.operationTimeout)
}

Settings.Default is built as follows:

lazy val Default: Settings = Settings(ConfigFactory.load())

This means the configuration will not originate from the passed ActorSystem but from application.conf. I suggest creating these Settings from system.settings.config.
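
A sketch of the suggested change (Settings(config) is shown above; whether the extension can be adapted exactly like this is an assumption):

class EventStoreExtension(system: ActorSystem) extends Extension {

  // derive settings from the ActorSystem's own configuration instead of ConfigFactory.load()
  def settings: Settings = Settings(system.settings.config)

  val actor: ActorRef = system.actorOf(ConnectionActor.props(settings), "eventstore-connection")

  val connection: EsConnection = new EsConnection(actor, system, settings.operationTimeout)
}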

spray dependencies

Hi :)

Would it be possible to make the

  object Spray {
    val client = "io.spray" %% "spray-client" % "1.3.2"
    val json   = "io.spray" %% "spray-json" % "1.3.1"
  }

provided in order to avoid having unnecessary dependencies?
