eligosource / eventsourced Goto Github PK
View Code? Open in Web Editor NEWA library for building reliable, scalable and distributed event-sourced applications in Scala
License: Apache License 2.0
A library for building reliable, scalable and distributed event-sourced applications in Scala
License: Apache License 2.0
Only a single journal implementation based on on LevelDB and leveldbjni is supported at the moment. Hence the library can not be used on Windows (see leveldbjni issue #3).
Currently, only one destination in a multicast destination scenario needs to confirm a message before the message is deleted/marked for deletion.
It would be very useful to support a scenario in which all multicast destinations need to confirm the message before it is deleted. For example, if a message needs to be sent to multiple external systems, and one system is down, then the message would remain in the journal, for the destination actor that sends to that system only.
I would like to use eventsourced in an OSGi environment. I was able to add the OSGi meta-data to both leveldbjni, and eventsourced, and make it work correctly. However, it would be great if this could be applied upstream.
Unfortunately, the sbt-osgi plugin seems to be very flaky -- see my post here on the sbt mailing list as I could not get sbt to add the OSGi meta-data:
https://groups.google.com/d/msg/simple-build-tool/LC0rLHK2jfc/du8os8wmxE0J
I will continue to follow up on this and submit a pull request once I have it working.
Should be enforced on EventsourcingExtension
level.
For configurability of certain settings such as some timeouts, event serializer, journal settings, redelivery policies etc.
To achieve journal high-availability and horizontal scalabilty.
Based on the new API introduced with #16
... but do not timestamp messages written by reliable channels.
See LevelDB documentation, section Checksums.
Configurable redelivery policies for reliable output channels.
Hi, I am not sure if this is the right place.
I checked out the recent wip-akka-2.1 branch. Started SBT and did a compile. So far looks fine but running "test:run-nobootcp org.eligosource.eventsourced.guide.FirstSteps" I get
about
[info] This is sbt 0.12.1
[info] The current project is {file:/D:/Development/Playground/EventSourced/wip-
aka-2_1/eventsourced/}eventsourced
[info] The current project is built against Scala 2.10.0-RC2
[info] Available Plugins: org.sbtidea.SbtIdeaPlugin
[info] sbt, sbt plugins, and build definitions are using Scala 2.9.2test:run-nobootcp org.eligosource.eventsourced.guide.FirstSteps
[warn] Credentials file C:\Users\Weissmann.sbt.credentials does not exist
Uncaught error from thread [guide-akka.actor.default-dispatcher-3] shutting down
JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[guide]
java.lang.UnsatisfiedLinkError: Could not load library. Reasons: [no leveldbjni3
2-1.2 in java.library.path, no leveldbjni-1.2 in java.library.path, no leveldbjn
i in java.library.path]
at org.fusesource.hawtjni.runtime.Library.doLoad(Library.java:184)
at org.fusesource.hawtjni.runtime.Library.load(Library.java:142)
at org.fusesource.leveldbjni.JniDBFactory.(JniDBFactory.java:48)
at org.eligosource.eventsourced.journal.LeveldbJournalPS.(LeveldbJournalPS.scala:47)
at org.eligosource.eventsourced.journal.LeveldbJournal$$anonfun$processorStructured$2.apply(LeveldbJournal.scala:45)
at org.eligosource.eventsourced.journal.LeveldbJournal$$anonfun$processorStructured$2.apply(LeveldbJournal.scala:45)
at akka.actor.ActorCell.newActor(ActorCell.scala:456)
at akka.actor.ActorCell.create(ActorCell.scala:474)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:346)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:256)
at akka.dispatch.Mailbox.run(Mailbox.scala:211)
at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:502)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:262)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
[trace] Stack trace suppressed: run last *:run-nobootcp for the full output.
error Run failed
[error] Total time: 2 s, completed 09.11.2012 13:57:05
Journal and output channels could concurrently add messages to a processor mailbox (concurrent replay() and deliver()). Delivery must only be started once journal has added all replayed messages to processor mailboxes.
The below code gives me very strange behavior.
import scala.concurrent._
import scala.concurrent.duration._
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import org.eligosource.eventsourced.core._
import org.eligosource.eventsourced.journal.LeveldbJournalProps
object QFXSystem {
implicit val system = ActorSystem("example")
implicit val timeout = Timeout(5 seconds)
import system.dispatcher
val journalDir = new java.io.File("target/example-1")
val journal = Journal(LeveldbJournalProps(journalDir))
val extension = EventsourcingExtension(system, journal)
val processor = extension.processorOf(Props(new OrderProcessor with Emitter with Eventsourced { val id = 0 }))
val riskEngine = extension.processorOf(Props(new OrderConsumer(processor) with Receiver with Confirm with Eventsourced { val id = 1 }))
extension.recover()
class OrderProcessor extends Actor { this: Emitter =>
def receive = {
case x => println (x)
}
}
class OrderConsumer(val consumer: ActorRef) extends Actor { this: Receiver =>
def receive = {
case x => println(x)
}
}
}
Exceptions follow:
Exception in thread "main" java.lang.ExceptionInInitializerError
at com.fxq.qfx.QFXSystem.main(QFXSystem.scala)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [5 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:96)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:100)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.eligosource.eventsourced.core.EventsourcingExtension.processorOf(EventsourcingExtension.scala:110)
at com.fxq.qfx.QFXSystem$.(QFXSystem.scala:24)
at com.fxq.qfx.QFXSystem$.(QFXSystem.scala)
... 1 more
Uncaught error from thread [example-akka.actor.default-dispatcher-2] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[example]
java.lang.NoClassDefFoundError: Could not initialize class com.fxq.qfx.QFXSystem$
at com.fxq.qfx.QFXSystem$$anonfun$2$$anon$2.(QFXSystem.scala:24)
at com.fxq.qfx.QFXSystem$$anonfun$2.apply(QFXSystem.scala:24)
at com.fxq.qfx.QFXSystem$$anonfun$2.apply(QFXSystem.scala:24)
at akka.actor.ActorCell.newActor(ActorCell.scala:461)
at akka.actor.ActorCell.create(ActorCell.scala:479)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:351)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:256)
at akka.dispatch.Mailbox.run(Mailbox.scala:211)
at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:502)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:262)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
[ERROR] [01/06/2013 15:02:19.742] [example-akka.actor.default-dispatcher-2] [ActorSystem(example)] Uncaught error from thread [example-akka.actor.default-dispatcher-2] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
java.lang.NoClassDefFoundError: Could not initialize class com.fxq.qfx.QFXSystem$
at com.fxq.qfx.QFXSystem$$anonfun$2$$anon$2.(QFXSystem.scala:24)
at com.fxq.qfx.QFXSystem$$anonfun$2.apply(QFXSystem.scala:24)
at com.fxq.qfx.QFXSystem$$anonfun$2.apply(QFXSystem.scala:24)
at akka.actor.ActorCell.newActor(ActorCell.scala:461)
at akka.actor.ActorCell.create(ActorCell.scala:479)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:351)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:256)
at akka.dispatch.Mailbox.run(Mailbox.scala:211)
at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:502)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:262)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
Any ideas what I'm doing wrong?
Such as fsync, for example
Separate sub-project for each journal implementation.
Let applications specify the rate at which event messages are replayed.
Allows destinations to reply to senders during recovery:
val sender1: ActorRef = ...
val sender2: ActorRef = ...
val channel: ActorRef = ...
channel ! Deliver // recover channel
sender1
and sender2
will receive replies if previous application runs did
channel tell (..., sender1)
channel tell (..., sender2)
without having made a delivery confirmation. If previous application runs where asking channels
channel ? ...
there won't be a future to reply to after recovery and the reply will go to deadLetters
. This enhancement will also support storage of sender references before a reliable channel is initialized.
Hi,
Is it possible to develop this as part of type safe like Slick? Perhaps you could convince them to take this on.
S
P.S. I believe is it best to make the production journal an embeddable pure Java implementations. Perhaps even implement one as part of the project itself.
restartMax
optionDeliveryStopped
event to event stream when channel finally stops deliveryEventsourcingExtension.deliver(Int)
Hi, I get the following exception on the OrderExample (wip-akka-2.1 branch) :
Exception in thread "main" c487df01-d62d-4807-8c83-1367c8e9acebakka.actor.InvalidActorNameException: illegal actor name 'validation requests', must conform to (?:[-\w:@&=+,.!';]|%\p{XDigit}{2})(?:[-\w:@&=+,.!'$;]|%\p{XDigit}{2})*
at akka.actor.dungeon.Children$class.checkName(Children.scala:166)
at akka.actor.dungeon.Children$class.attachChild(Children.scala:43)
at akka.actor.ActorCell.attachChild(ActorCell.scala:301)
at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:513)
at org.eligosource.eventsourced.core.ReliableChannelProps.createChannel(EventsourcingExtension.scala:404)
at org.eligosource.eventsourced.core.EventsourcingExtension.channelOf(EventsourcingExtension.scala:187)
at org.eligosource.eventsourced.example.OrderExample$delayedInit$body.apply(OrderExample.scala:43)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:309)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
at org.eligosource.eventsourced.example.OrderExample$.main(OrderExample.scala:28)
at org.eligosource.eventsourced.example.OrderExample.main(OrderExample.scala)
Hi, I am new to this, so this might make no sense at all, but is it possible to use wildcards for a channel ?
I try to figure out if the following scenario is possible in this environment (like in akka-camel, there it is possible) :
Publish - Subscribe with Wildcards :
extension.channelOf(DefaultChannelProps(1, listener1).withName("level1.level2.level3.*"))
extension.channelOf(DefaultChannelProps(2, listener2).withName("level1.>"))
extension.channelOf(DefaultChannelProps(3, listener3).withName("level1.level2.level3.level4"))
....
emitter("level1") sendEvent AnyEvent -> listener2 acts
....
emitter("level1.level2.level3.level5") sendEvent AnyEvent -> listener1 and listener2 acts
....
Cheers, Rob.
Maybe based on Akka Cluster
The subject says it all: we're seeing
java.lang.UnsupportedClassVersionError: org/eligosource/eventsourced/core/JournalProtocol$CommandProtocol : Unsupported major.minor version 51.0
at runtime using the recently-published 0.5-SNAPSHOT for Scala 2.10.0-RC2 and Akka 2.1.0-RC2 (thanks again for that)!
Hello Martin. Your project is really great!
But I have a problem.
I have two actors - Deposit 1 and Deposit 2. And two messages - changeDepositAmount(100) for Deposit 1 and changeDepositAmount(-100) for Deposit 2.
Then Deposit 1 successfully complete operation and save message to journal. But Deposit 2 has not enough money and fail operation. For system consistency I want to rollback first message.
What is right way to do it? Could you please help me?
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.