Actor framework for Java, non-blocking, performant. Developed as a reaction to the Akka licence change. Akka is a huge framework with a large number of extensions including persistence, web serving, streaming and more. My needs are limited to core of Akka, namely in-memory actor support (including supervision) and that's what this library provides.
Why would you trust this library with plenty of concurrency-sensitive code? I've been involved in concurrency-sensitive projects since 2014 as a frequent contributor to RxJava 1.x and the author of numerous popular RxJava libraries (rxjava-extras, rxjava-jdbc, rxjava3-pool, rxjava-slf4j, rxjava-http, rxjava-aws). All these libraries involve a lot of concurrency-aware and performance-aware work so this is not new ground for me. Please raise an issue if you spot something.
Why do we need another Actor library on the JVM? I think there's space for an in-memory Actor library that has a small footprint (only dependency is slf4j-api for logging and jar is ~100K), is easy to use (the API is both simple and discoverable), offers a goodly number of useful features, and is thread-safe and performant.
- Discoverable concise API (Akka is not, partly due to the way it's evolved and a lot of Scala library stuff)
- Custom supervisors including the ability to retry processing later and/or restart the actor (recreate the Actor object)
- Parent-child actor hierarchies
- Dead letter actor
- SLF4J logging (add the implementation that you like)
- Akka stop semantics (stopping an actor stops its children first)
- Jar size 100K with only one logging dependency (slf4j)
Status: Published to Maven Central
TODO
- Lifecycle monitoring (DeathWatch) support. Later.
- Add Actor.preRestart, Actor.postRestart methods? Jury still out on this. NO
- Akka has a kill method that forces onMessage to throw an ActorKilledException. Not keen on this. NO
mvn clean install
Add this dependency to your pom.xml:
<dependency>
<groupId>com.github.davidmoten</groupId>
<artifactId>reels</artifactId>
<version>VERSION_HERE</version>
</dependency><
TODO
- Actor
- ActorRef
- Context
- Supervisor
- SupervisedActorRef
- Message
- Create a
Context
(ActorSystem
in Akka) object to create your actors and control their lifecycle Scheduler
s (Dispatcher
s in Akka) live outside yourContext
object and thus can be shared acrossContext
s (for greater efficiency))
The simplest way to create a Context
object is:
Context context = Context.create();
You can use a builder to configure the Context
:
Context context = Context
.builder()
.supervisor(supervisor)
.deadLetterActorFactory(factory)
.scheduler(scheduler)
.build();
Now use a Context
to create typed Actors:
Context context = Context.create();
ActorRef<String> a = context.matchAny(m -> System.out.println(m.content())).build();
a.tell("hi there");
Thread.sleep(1000);
ActorRef.tell
is asynchronous (with the default scheduler) so we wait with Thread.sleep
to see something happen. The result of course is that
"hi there" will be written to the console and it will generally happen on a different thread to the call of tell
.
Here's a "kitchen sink" example that demonstrates many options when creating actors:
Context context = Context.create();
// create a parent actor (you can setup heirarchies) using a builder
ActorRef<Number> a = context
.<Number>matchAny(m -> {
log.info("{}: parent received {}", m.self(), m.content());
m.self().child("b").tell(m.content(), m.self());
})
.name("a")
.scheduler(Scheduler.single())
.onStop(self -> log.info("{}: onStop", self))
.build();
// create a child actor of `a` using a builder
context
// specify a series of matches to apply to incoming message content
.<Number>matchEquals(1, m -> {
log.info("{}: equal matched, sender = {}", m.self(), m.sender());
m.sender().<Number>tell(9999));
})
.match(Integer.class, m -> log.info("{}: received integer {}", m.self(), m.content()))
.match(Double.class, m -> log.info("{}: received double {}", m.self(), m.content()))
.matchAny(m -> log.info("{}: received something else {}", m.self(), m.content()))
.name("b")
.onError(e -> log.error(e.getMessage(), e))
.preStart(self -> log.info("{}: preStart", self))
.onStop(self -> log.info("{}: onStop", self))
.scheduler(Scheduler.computation())
.parent(a)
.mailboxFactory(Mailbox.bounded(1000, true))
.supervisor((m, actor, e) -> {
log.error(e.getMessage(), e);
actor.pause(30, TimeUnit.SECONDS);
actor.retry();
})
.build();
a.tell(1);
a.tell(2);
a.tell(3.5);
a.tell(4f);
// give enough time to run (especially for b to respond to `a.tell(1)`
Thread.sleep(500);
context.shutdownGracefully().get(5000, TimeUnit.SECONDS);
Output:
2022-10-07T21:47:28:055 +1100 [ReelsSingle-1] INFO com.github.davidmoten.reels.ActorTest - a: parent received 1
2022-10-07T21:47:28:057 +1100 [ReelsSingle-1] INFO com.github.davidmoten.reels.ActorTest - a: parent received 2
2022-10-07T21:47:28:057 +1100 [ReelsComputation-1] INFO com.github.davidmoten.reels.ActorTest - b: preStart
2022-10-07T21:47:28:057 +1100 [ReelsComputation-1] INFO com.github.davidmoten.reels.ActorTest - b: equal matched, sender = Optional[a]
2022-10-07T21:47:28:058 +1100 [ReelsComputation-1] INFO com.github.davidmoten.reels.ActorTest - b: received integer 2
2022-10-07T21:47:28:059 +1100 [ReelsSingle-1] INFO com.github.davidmoten.reels.ActorTest - a: parent received 3.5
2022-10-07T21:47:28:059 +1100 [ReelsSingle-1] INFO com.github.davidmoten.reels.ActorTest - a: parent received 4.0
2022-10-07T21:47:28:059 +1100 [ReelsComputation-1] INFO com.github.davidmoten.reels.ActorTest - b: received double 3.5
2022-10-07T21:47:28:060 +1100 [ReelsSingle-1] INFO com.github.davidmoten.reels.ActorTest - a: parent received 9999
2022-10-07T21:47:28:060 +1100 [ReelsComputation-1] INFO com.github.davidmoten.reels.ActorTest - b: received something else 4.0
2022-10-07T21:47:28:060 +1100 [ReelsComputation-1] INFO com.github.davidmoten.reels.ActorTest - b: received integer 9999
2022-10-07T21:47:28:562 +1100 [ReelsComputation-1] INFO com.github.davidmoten.reels.ActorTest - b: onStop
2022-10-07T21:47:28:564 +1100 [ReelsSingle-1] INFO com.github.davidmoten.reels.ActorTest - a: onStop
Note that if you use a lambda and you make a reference to the enclosing class then the ActorRef will retain a reference to the enclosing object. We should be careful about this because that referred object will not be garbage collected till the ActorRef is. Consequently, if you create an actor from another actor then you should be conscious about this lifecycle link.
You can also create an Actor class yourself instead of using the builder. Implement Actor<T>
or extend AbstractActor<T>
. Suppose you create a class called MyActor
which extends AbstractActor<Integer>
. You can create an ActorRef for this class with the Context as follows:
ActorRef<Integer> actor = context.actorClass(MyActor.class).build();
or
ActorRef<Integer> actor = context.actorFactory(() -> new MyActor()).build();
Note that you can also use the actorClass builder method with constructor arguments too:
ActorRef<Integer> actor = context.actorClass(MyActor.class, "Fred Nurk", 1980).build();
This sends an anonymous message (the Actor wil be supplied ActorRef.none() as the sender) to an actor:
ActorRef<Thing> actor = ...
Thing thing = ...
actor.tell(thing);
To include a sender actor (for example as a reply-to actor):
ActorRef<Thing> actor = ...
ActorRef<Object> replyTo = ...
Thing thing = ...
actor.tell(thing, replyTo);
Here's an example of ask
where an actor does some calculation and returns an answer asynchronously (via a CompletableFuture
):
ActorRef<Integer> square =
context
.<Integer>matchAny(m -> m.reply(m.content() * m.content())
.build();
square
.ask(23)
.thenAccept(System.out::println)
.join();
Output:
529
When an error is thrown by your code in onMessage you have two options:
- catch and handle the error in-place and reset state as appropriate
- handle the error in a Supervisor, which has more functionality including the ability to restart (recreate the Actor instance), pause processing, and retry a message.
For example, a Supervisor for some JDBC work can check if the thrown exception is an instance of SQLTransientException and mark the message for a retry after a pause. If the thrown exception is an instance of SQLNonTransientConnectionException then we should retry with a new connection object. This unit test demonstrates this example.
If you don't specify your own Supervisor (either globally in the Context or specifically for your Actor) then the default Supervisor is used.
Note that the Supervisor is called also for errors thrown by Actor.preStart
and Actor.onStop
. However, those errors will arrive at the Supervisor wrapped in PreStartException
and OnStopException
respectively.
The default Supervisor for an actor is the Supervisor of its parent.
Supervisors themselves should not throw. If they do TODO.
- An Actor is created by a Context object. The Context object has an internal singleton root actor but is the parent for an Actor you create unless you provide it with an explicit parent.
- When an actor is disposed no more children can be created for it
- Dispose happens synchronously (the actor and all its children and descendants are disposed before the method returns)
- Restarting an actor from a supervisor will dispose all that actors children
Sending a message to an Actor is normally asynchronous. That is when you call actorRef.tell(msg)
what happens under the covers is that the message is placed on a concurrent queue for that actor and the actor is notified to process its queue using an executor.
Schedulers wrap executors and are designed to be efficient for particular use cases. Actors obtain a Worker from each Scheduler for the lifetime of the actor. The standard schedulers are
Scheduler.forkjoin()
, a singleton work-stealing pool of threads that is great for general purpose non-blocking work (wins benchmarks pretty handily and became twice as fast between Java 8 and Java 17). The default scheduler.Scheduler.computation()
is an alias forforkJoin()
and is for non-blocking workScheduler.computationSticky()
, a singleton pool of threads (size = number of processors) for non-blocking work. A Worker on this pool uses a randomly/round-robin assigned thread from the pool and that thread stays with the Worker till disposal of the Worker (that is a thread sticks to an actor). One thread can be in use by many Workers. Normally slower thanforkJoin()
.Scheduler.io()
, a singleton unbounded thread pool designed for blocking work, unused threads are disposed of by an evicting thread after 60s of inactivity. Each Worker has one thread (and each thread in this pool has only one Worker). This scheduler was adapted from RxJava 3.xSchedulers.io()
.Scheduler.single()
, a singleton scheduler that is based on a single thread executor serviceScheduler.newSingle()
, creates a new single-thread-based schedulerScheduler.fromExecutor(ExecutorService)
, creates a new scheduler based on the given ExecutorService. Use one of these with a pool for blocking work where you have a lot of actors (to limit context switching and thread memory use)Scheduler.test()
is for synchronous unit testing purposes and should not be mixed with asynchronous scheduler use in the same Context.Scheduler.immediate()
is for synchronous execution of all tasks, limited delayed scheduling, and should not be mixed with asynchronous scheduler use in the same Context
Make sure you use a blocking scheduler (especially Scheduler.io()
) for any blocking work like database calls, file system IO, network IO.
To run benchmarks:
mvn clean install -P benchmark
Benchmarking indicates that reels is faster than Akka for four aspects tested:
- ask performance (15% faster)
- hub and spoke contended performance (3x faster)
- random messages around a ring performance, some contention, uses actor lookups (5x faster)
- long sequential chain then return (8x faster)
Using JDK 17 on i5-6200U with 4 cores:
Benchmark Mode Cnt Score Error Units
Benchmarks.actorCreateAndStop thrpt 10 251437.893 ± 449.575 ops/s
Benchmarks.ask thrpt 10 8.063 ± 0.018 ops/s
Benchmarks.contendedConcurrencyComputationSticky thrpt 10 0.763 ± 0.050 ops/s
Benchmarks.contendedConcurrencyForkJoin thrpt 10 6.028 ± 0.248 ops/s
Benchmarks.contendedConcurrencyImmediate thrpt 10 10.072 ± 0.099 ops/s
Benchmarks.groupRandomMessagesComputationSticky thrpt 10 1.958 ± 0.045 ops/s
Benchmarks.groupRandomMessagesForkJoin thrpt 10 17.947 ± 0.694 ops/s
Benchmarks.groupRandomMessagesImmediate thrpt 10 32.813 ± 0.616 ops/s
Benchmarks.groupRandomMessagesIo thrpt 10 0.867 ± 0.019 ops/s
BenchmarksAkka.ask thrpt 10 6.978 ± 0.293 ops/s
BenchmarksAkka.contendedConcurrency thrpt 10 1.641 ± 0.105 ops/s
BenchmarksAkka.groupRandomMessages thrpt 10 3.711 ± 0.055 ops/s
Benchmarks.sequential ss 10 1.951 ± 0.326 s/op
BenchmarksAkka.sequential ss 10 20.868 ± 0.478 s/op