Code Monkey home page Code Monkey logo

reels's Introduction

reels


codecov
Maven Central

Actor framework for Java, non-blocking, performant. Developed as a reaction to the Akka licence change. Akka is a huge framework with a large number of extensions including persistence, web serving, streaming and more. My needs are limited to core of Akka, namely in-memory actor support (including supervision) and that's what this library provides.

Why would you trust this library with plenty of concurrency-sensitive code? I've been involved in concurrency-sensitive projects since 2014 as a frequent contributor to RxJava 1.x and the author of numerous popular RxJava libraries (rxjava-extras, rxjava-jdbc, rxjava3-pool, rxjava-slf4j, rxjava-http, rxjava-aws). All these libraries involve a lot of concurrency-aware and performance-aware work so this is not new ground for me. Please raise an issue if you spot something.

Why do we need another Actor library on the JVM? I think there's space for an in-memory Actor library that has a small footprint (only dependency is slf4j-api for logging and jar is ~100K), is easy to use (the API is both simple and discoverable), offers a goodly number of useful features, and is thread-safe and performant.

Features

  • Discoverable concise API (Akka is not, partly due to the way it's evolved and a lot of Scala library stuff)
  • Custom supervisors including the ability to retry processing later and/or restart the actor (recreate the Actor object)
  • Parent-child actor hierarchies
  • Dead letter actor
  • SLF4J logging (add the implementation that you like)
  • Akka stop semantics (stopping an actor stops its children first)
  • Jar size 100K with only one logging dependency (slf4j)

Status: Published to Maven Central

TODO

  • Lifecycle monitoring (DeathWatch) support. Later.
  • Add Actor.preRestart, Actor.postRestart methods? Jury still out on this. NO
  • Akka has a kill method that forces onMessage to throw an ActorKilledException. Not keen on this. NO

How to build

mvn clean install

Getting started

Add this dependency to your pom.xml:

<dependency>
  <groupId>com.github.davidmoten</groupId>
  <artifactId>reels</artifactId>
  <version>VERSION_HERE</version>
</dependency><

Glossary

TODO

  • Actor
  • ActorRef
  • Context
  • Supervisor
  • SupervisedActorRef
  • Message

Actor Lifecycle

Actor Lifecycle

Class Diagram

Class Diagram

Usage

Create a Context

  • Create a Context (ActorSystem in Akka) object to create your actors and control their lifecycle
  • Schedulers (Dispatchers in Akka) live outside your Context object and thus can be shared across Contexts (for greater efficiency))

The simplest way to create a Context object is:

Context context = Context.create();

You can use a builder to configure the Context:

Context context = Context
  .builder()
  .supervisor(supervisor)
  .deadLetterActorFactory(factory)
  .scheduler(scheduler)
  .build();

Create an Actor using matchers

Now use a Context to create typed Actors:

Context context = Context.create();
ActorRef<String> a = context.matchAny(m -> System.out.println(m.content())).build();
a.tell("hi there");
Thread.sleep(1000);

ActorRef.tell is asynchronous (with the default scheduler) so we wait with Thread.sleep to see something happen. The result of course is that "hi there" will be written to the console and it will generally happen on a different thread to the call of tell.

Here's a "kitchen sink" example that demonstrates many options when creating actors:

Context context = Context.create();

// create a parent actor (you can setup heirarchies) using a builder
ActorRef<Number> a = context 
    .<Number>matchAny(m -> {
        log.info("{}: parent received {}", m.self(), m.content());
        m.self().child("b").tell(m.content(), m.self());
    }) 
    .name("a") 
    .scheduler(Scheduler.single()) 
    .onStop(self -> log.info("{}: onStop", self)) 
    .build();

// create a child actor of `a` using a builder
context 
    // specify a series of matches to apply to incoming message content
    .<Number>matchEquals(1, m -> {
        log.info("{}: equal matched, sender = {}", m.self(), m.sender());
        m.sender().<Number>tell(9999));
    }) 
    .match(Integer.class, m -> log.info("{}: received integer {}", m.self(), m.content())) 
    .match(Double.class, m -> log.info("{}: received double {}", m.self(), m.content())) 
    .matchAny(m -> log.info("{}: received something else {}", m.self(), m.content())) 
    .name("b") 
    .onError(e -> log.error(e.getMessage(), e)) 
    .preStart(self -> log.info("{}: preStart", self)) 
    .onStop(self -> log.info("{}: onStop", self)) 
    .scheduler(Scheduler.computation()) 
    .parent(a) 
    .mailboxFactory(Mailbox.bounded(1000, true))
    .supervisor((m, actor, e) -> {
        log.error(e.getMessage(), e);
        actor.pause(30, TimeUnit.SECONDS);
        actor.retry();
    }) 
    .build();

a.tell(1);
a.tell(2);
a.tell(3.5);
a.tell(4f);

// give enough time to run (especially for b to respond to `a.tell(1)`
Thread.sleep(500);
context.shutdownGracefully().get(5000, TimeUnit.SECONDS);

Output:

2022-10-07T21:47:28:055 +1100 [ReelsSingle-1] INFO com.github.davidmoten.reels.ActorTest - a: parent received 1
2022-10-07T21:47:28:057 +1100 [ReelsSingle-1] INFO com.github.davidmoten.reels.ActorTest - a: parent received 2
2022-10-07T21:47:28:057 +1100 [ReelsComputation-1] INFO com.github.davidmoten.reels.ActorTest - b: preStart
2022-10-07T21:47:28:057 +1100 [ReelsComputation-1] INFO com.github.davidmoten.reels.ActorTest - b: equal matched, sender = Optional[a]
2022-10-07T21:47:28:058 +1100 [ReelsComputation-1] INFO com.github.davidmoten.reels.ActorTest - b: received integer 2
2022-10-07T21:47:28:059 +1100 [ReelsSingle-1] INFO com.github.davidmoten.reels.ActorTest - a: parent received 3.5
2022-10-07T21:47:28:059 +1100 [ReelsSingle-1] INFO com.github.davidmoten.reels.ActorTest - a: parent received 4.0
2022-10-07T21:47:28:059 +1100 [ReelsComputation-1] INFO com.github.davidmoten.reels.ActorTest - b: received double 3.5
2022-10-07T21:47:28:060 +1100 [ReelsSingle-1] INFO com.github.davidmoten.reels.ActorTest - a: parent received 9999
2022-10-07T21:47:28:060 +1100 [ReelsComputation-1] INFO com.github.davidmoten.reels.ActorTest - b: received something else 4.0
2022-10-07T21:47:28:060 +1100 [ReelsComputation-1] INFO com.github.davidmoten.reels.ActorTest - b: received integer 9999
2022-10-07T21:47:28:562 +1100 [ReelsComputation-1] INFO com.github.davidmoten.reels.ActorTest - b: onStop
2022-10-07T21:47:28:564 +1100 [ReelsSingle-1] INFO com.github.davidmoten.reels.ActorTest - a: onStop

A warning about using lambdas to create actors

Note that if you use a lambda and you make a reference to the enclosing class then the ActorRef will retain a reference to the enclosing object. We should be careful about this because that referred object will not be garbage collected till the ActorRef is. Consequently, if you create an actor from another actor then you should be conscious about this lifecycle link.

Create an Actor using your own class

You can also create an Actor class yourself instead of using the builder. Implement Actor<T> or extend AbstractActor<T>. Suppose you create a class called MyActor which extends AbstractActor<Integer>. You can create an ActorRef for this class with the Context as follows:

ActorRef<Integer> actor = context.actorClass(MyActor.class).build();

or

ActorRef<Integer> actor = context.actorFactory(() -> new MyActor()).build();

Note that you can also use the actorClass builder method with constructor arguments too:

ActorRef<Integer> actor = context.actorClass(MyActor.class, "Fred Nurk", 1980).build();

Send a message to an Actor

This sends an anonymous message (the Actor wil be supplied ActorRef.none() as the sender) to an actor:

ActorRef<Thing> actor = ...
Thing thing = ...
actor.tell(thing);

To include a sender actor (for example as a reply-to actor):

ActorRef<Thing> actor = ...
ActorRef<Object> replyTo = ...
Thing thing = ...
actor.tell(thing, replyTo);

ask

Here's an example of ask where an actor does some calculation and returns an answer asynchronously (via a CompletableFuture):

ActorRef<Integer> square = 
    context
      .<Integer>matchAny(m -> m.reply(m.content() * m.content())
      .build();
square
  .ask(23)
  .thenAccept(System.out::println)
  .join();

Output:

529

Supervisors

When an error is thrown by your code in onMessage you have two options:

  • catch and handle the error in-place and reset state as appropriate
  • handle the error in a Supervisor, which has more functionality including the ability to restart (recreate the Actor instance), pause processing, and retry a message.

For example, a Supervisor for some JDBC work can check if the thrown exception is an instance of SQLTransientException and mark the message for a retry after a pause. If the thrown exception is an instance of SQLNonTransientConnectionException then we should retry with a new connection object. This unit test demonstrates this example.

If you don't specify your own Supervisor (either globally in the Context or specifically for your Actor) then the default Supervisor is used.

Note that the Supervisor is called also for errors thrown by Actor.preStart and Actor.onStop. However, those errors will arrive at the Supervisor wrapped in PreStartException and OnStopException respectively.

The default Supervisor for an actor is the Supervisor of its parent.

Supervisors themselves should not throw. If they do TODO.

Notes

  • An Actor is created by a Context object. The Context object has an internal singleton root actor but is the parent for an Actor you create unless you provide it with an explicit parent.
  • When an actor is disposed no more children can be created for it
  • Dispose happens synchronously (the actor and all its children and descendants are disposed before the method returns)
  • Restarting an actor from a supervisor will dispose all that actors children

Schedulers

Sending a message to an Actor is normally asynchronous. That is when you call actorRef.tell(msg) what happens under the covers is that the message is placed on a concurrent queue for that actor and the actor is notified to process its queue using an executor.

Schedulers wrap executors and are designed to be efficient for particular use cases. Actors obtain a Worker from each Scheduler for the lifetime of the actor. The standard schedulers are

  • Scheduler.forkjoin(), a singleton work-stealing pool of threads that is great for general purpose non-blocking work (wins benchmarks pretty handily and became twice as fast between Java 8 and Java 17). The default scheduler.
  • Scheduler.computation() is an alias for forkJoin() and is for non-blocking work
  • Scheduler.computationSticky(), a singleton pool of threads (size = number of processors) for non-blocking work. A Worker on this pool uses a randomly/round-robin assigned thread from the pool and that thread stays with the Worker till disposal of the Worker (that is a thread sticks to an actor). One thread can be in use by many Workers. Normally slower than forkJoin().
  • Scheduler.io(), a singleton unbounded thread pool designed for blocking work, unused threads are disposed of by an evicting thread after 60s of inactivity. Each Worker has one thread (and each thread in this pool has only one Worker). This scheduler was adapted from RxJava 3.x Schedulers.io().
  • Scheduler.single(), a singleton scheduler that is based on a single thread executor service
  • Scheduler.newSingle(), creates a new single-thread-based scheduler
  • Scheduler.fromExecutor(ExecutorService), creates a new scheduler based on the given ExecutorService. Use one of these with a pool for blocking work where you have a lot of actors (to limit context switching and thread memory use)
  • Scheduler.test() is for synchronous unit testing purposes and should not be mixed with asynchronous scheduler use in the same Context.
  • Scheduler.immediate() is for synchronous execution of all tasks, limited delayed scheduling, and should not be mixed with asynchronous scheduler use in the same Context

Blocking work

Make sure you use a blocking scheduler (especially Scheduler.io()) for any blocking work like database calls, file system IO, network IO.

Benchmarks

To run benchmarks:

mvn clean install -P benchmark

Benchmarking indicates that reels is faster than Akka for four aspects tested:

  • ask performance (15% faster)
  • hub and spoke contended performance (3x faster)
  • random messages around a ring performance, some contention, uses actor lookups (5x faster)
  • long sequential chain then return (8x faster)

Using JDK 17 on i5-6200U with 4 cores:

Benchmark                                          Mode  Cnt       Score     Error  Units
Benchmarks.actorCreateAndStop                     thrpt   10  251437.893 ± 449.575  ops/s
Benchmarks.ask                                    thrpt   10       8.063 ±   0.018  ops/s
Benchmarks.contendedConcurrencyComputationSticky  thrpt   10       0.763 ±   0.050  ops/s
Benchmarks.contendedConcurrencyForkJoin           thrpt   10       6.028 ±   0.248  ops/s
Benchmarks.contendedConcurrencyImmediate          thrpt   10      10.072 ±   0.099  ops/s
Benchmarks.groupRandomMessagesComputationSticky   thrpt   10       1.958 ±   0.045  ops/s
Benchmarks.groupRandomMessagesForkJoin            thrpt   10      17.947 ±   0.694  ops/s
Benchmarks.groupRandomMessagesImmediate           thrpt   10      32.813 ±   0.616  ops/s
Benchmarks.groupRandomMessagesIo                  thrpt   10       0.867 ±   0.019  ops/s
BenchmarksAkka.ask                                thrpt   10       6.978 ±   0.293  ops/s
BenchmarksAkka.contendedConcurrency               thrpt   10       1.641 ±   0.105  ops/s
BenchmarksAkka.groupRandomMessages                thrpt   10       3.711 ±   0.055  ops/s
Benchmarks.sequential                                ss   10       1.951 ±   0.326   s/op
BenchmarksAkka.sequential                            ss   10      20.868 ±   0.478   s/op

reels's People

Contributors

davidmoten avatar dependabot[bot] avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.