
akka-edge-rs's Introduction

Akka

The Akka family of projects is managed by teams at Lightbend with help from the community.

We believe that writing correct concurrent & distributed, resilient and elastic applications is too hard. Most of the time it's because we are using the wrong tools and the wrong level of abstraction.

Akka is here to change that.

Using the Actor Model we raise the abstraction level and provide a better platform to build correct concurrent and scalable applications. This model is a perfect match for the principles laid out in the Reactive Manifesto.

For resilience, we adopt the "Let it crash" model which the telecom industry has used with great success to build applications that self-heal and systems that never stop.

Actors also provide the abstraction for transparent distribution and the basis for truly scalable and fault-tolerant applications.

Learn more at akka.io.

Reference Documentation

The reference documentation is available at doc.akka.io, for Scala and Java.

Current versions of all Akka libraries

The current versions of all Akka libraries are listed on the Akka Dependencies page. Releases of the Akka core libraries in this repository are listed on the GitHub releases page.

Community

You can join these groups and chats to discuss and ask Akka related questions:

In addition to that, you may enjoy following:

Contributing

Contributions are very welcome!

If you see an issue that you'd like to see fixed, or want to shape some ideas, the best way to make it happen is to help out by submitting a pull request implementing it. We welcome contributions from everyone, even if you are not yet familiar with this project. We are happy to get you started and will guide you through the process once you've submitted your PR.

Refer to the CONTRIBUTING.md file for more details about the workflow, and general hints on how to prepare your pull request. You can also ask for clarifications or guidance in GitHub issues directly, or in the akka/dev chat if more real-time communication would be of benefit.

License

Akka is licensed under the Business Source License 1.1; please see the Akka License FAQ.

Tests and documentation are under a separate license, see the LICENSE file in each documentation and test root directory for details.

akka-edge-rs's People

Contributors

ennru, huntc, johanandren, patriknw, sebastian-alfers


Forkers

shogun-code

akka-edge-rs's Issues

Documentation objectives

What documentation should we consider? We have been creating Rust doc along the way, and that should provide a reasonable reference, accessible from crates.io. I'm thinking that the regular Akka documentation may also need to include akka-edge-rs.

Building project locally failing with protoc problems

Pretty sure I used to be able to build, but current main as of 29 Sep fails with:

Caused by:
  process didn't exit successfully: `/Users/johan/Code/Lightbend/Akka/akka-edge-rs/target/debug/build/akka-projection-rs-grpc-0b55aa6ca26e950f/build-script-build` (exit status: 1)
  --- stdout
  cargo:rerun-if-changed=akka/projection/grpc/event_consumer.proto
  cargo:rerun-if-changed=proto

  --- stderr
  Error: Custom { kind: Other, error: "protoc failed: google/protobuf/any.proto: File not found.\ngoogle/protobuf/timestamp.proto: File not found.\ngoogle/protobuf/empty.proto: File not found.\nakka/projection/grpc/event_producer.proto:7:1: Import \"google/protobuf/any.proto\" was not found or had errors.\nakka/projection/grpc/event_producer.proto:8:1: Import \"google/protobuf/timestamp.proto\" was not found or had errors.\nakka/projection/grpc/event_producer.proto:9:1: Import \"google/protobuf/empty.proto\" was not found or had errors.\nakka/projection/grpc/event_producer.proto:193:3: \"google.protobuf.Timestamp\" is not defined.\nakka/projection/grpc/event_producer.proto:222:3: \"google.protobuf.Any\" is not defined.\nakka/projection/grpc/event_producer.proto:227:3: \"google.protobuf.Any\" is not defined.\nakka/projection/grpc/event_producer.proto:251:3: \"google.protobuf.Timestamp\" is not defined.\nakka/projection/grpc/event_consumer.proto:7:1: Import \"google/protobuf/any.proto\" was not found or had errors.\nakka/projection/grpc/event_consumer.proto:8:1: Import \"google/protobuf/timestamp.proto\" was not found or had errors.\nakka/projection/grpc/event_consumer.proto:9:1: Import \"google/protobuf/empty.proto\" was not found or had errors.\nakka/projection/grpc/event_consumer.proto:10:1: Import \"akka/projection/grpc/event_producer.proto\" was not found or had errors.\nakka/projection/grpc/event_consumer.proto:42:5: \"Event\" is not defined.\nakka/projection/grpc/event_consumer.proto:43:5: \"FilteredEvent\" is not defined.\nakka/projection/grpc/event_consumer.proto:59:12: \"FilterCriteria\" is not defined.\n" }
warning: build failed, waiting for other jobs to finish...

Add example use cases

It would be good to add one or a few use cases to the use-cases.md page that are relevant for akka-edge-rs.

Difference in consistency boundary from regular Akka

In Akka Persistence, only the first effect is allowed to be a persist, and there is no direct API to do something async after receiving a command that then leads to a persist once that async task completes. This is on purpose, since validating a command against the state and then persisting an event will always be inside a consistency boundary.

If the command can trigger some async logic and then persist an event once it completes, the state may have changed in the meanwhile, breaking the boundary and making it possible to persist events that are not valid given the current state.

In the effect API here I think it is possible to do, for example, emit_event followed by and_then with some async work, followed by an and_then_emit_event, which will break the boundary.

If we want to leave it up to the user to make sure they don't shoot themselves in the foot, we should carefully document this difference.
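The broken-boundary scenario can be sketched with plain Rust, independent of the actual effect API (Account, validate, and apply are hypothetical names for illustration only):

```rust
// Illustrative only: simulates the race described above with plain structs.
// These types are NOT the akka-persistence-rs API.
struct Account {
    balance: i64,
}

enum Event {
    Withdrawn(i64),
}

fn validate(state: &Account, amount: i64) -> bool {
    state.balance >= amount
}

fn apply(state: &mut Account, event: &Event) {
    match event {
        Event::Withdrawn(a) => state.balance -= *a,
    }
}

fn race_demo() -> i64 {
    let mut state = Account { balance: 100 };
    // Command A validates against the current state...
    let a_ok = validate(&state, 100);
    // ...then suspends for some async work. Meanwhile command B runs the
    // full validate-then-persist cycle inside the consistency boundary.
    if validate(&state, 60) {
        apply(&mut state, &Event::Withdrawn(60));
    }
    // Command A resumes and persists based on its now-stale validation.
    if a_ok {
        apply(&mut state, &Event::Withdrawn(100));
    }
    state.balance // negative: the boundary was broken
}
```

The second withdrawal was valid when checked but not when persisted, which is exactly the invariant violation the Akka Persistence API shape prevents.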

An error occurred when processing an effect

When updating the temperature, I sometimes (not always) see this error:

[2023-10-18T08:15:12.946Z WARN  akka_persistence_rs::entity_manager] An error occurred when processing an effect for 1. Result: Err(IoError(Custom { kind: Other, error: "Problem emitting an event" })) Evicting it.
[2023-10-18T08:30:31.780Z WARN  akka_persistence_rs::entity_manager] An error occurred when processing an effect for 1. Result: Err(IoError(Custom { kind: Other, error: "Problem emitting an event" })) Evicting it.
[2023-10-18T08:30:35.731Z WARN  akka_persistence_rs::entity_manager] An error occurred when processing an effect for 1. Result: Err(IoError(Custom { kind: Other, error: "Problem emitting an event" })) Evicting it.
[2023-10-18T08:30:39.080Z WARN  akka_persistence_rs::entity_manager] An error occurred when processing an effect for 1. Result: Err(IoError(Custom { kind: Other, error: "Problem emitting an event" })) Evicting it.
[2023-10-18T08:30:42.084Z WARN  akka_persistence_rs::entity_manager] An error occurred when processing an effect for 1. Result: Err(IoError(Custom { kind: Other, error: "Problem emitting an event" })) Evicting it.
[2023-10-18T08:30:45.021Z WARN  akka_persistence_rs::entity_manager] An error occurred when processing an effect for 1. Result: Err(IoError(Custom { kind: Other, error: "Problem emitting an event" })) Evicting it.

It seems like the update is stored: I can see the last ten events from curl -v "127.0.0.1:8080/api/temperature/events/1", also after a restart of the iot-service.

Persist vs emit

In Akka Persistence the effect is persist(event); can we align with that instead of calling it emit(event) here, to keep it familiar?

Running the entity manager

The guide has the section "Running the entity manager". It should have a brief introduction to what the purpose of the entity manager is.

The section should also explain the parameters to fn task. They are probably in main.rs, which is not mentioned there.

ERROR hyper::server::tcp] accept error: Too many open files (os error 24)

A convoluted way to reproduce this:

  1. Start samples/grpc/restaurant-drone-deliveries-service-scala:
     sbt -Dconfig.resource=local1.conf run
  2. Start samples/grpc/local-drone-control-scala:
     sbt run
  3. Report a location:
     grpcurl -d '{"drone_id":"drone1", "coordinates": {"longitude": 18.07125, "latitude": 59.41834}, "altitude": 5}' -plaintext 127.0.0.1:8080 local.drones.DroneService.ReportLocation
  4. Stop local-drone-control-scala.
  5. Start iot-service-rs, which will connect to the "invalid" restaurant-drone-deliveries-service-scala.
  6. Idle for a few minutes.
  7. Report the location again, which iot-service-rs doesn't know about:
     grpcurl -d '{"drone_id":"drone1", "coordinates": {"longitude": 18.07125, "latitude": 59.41834}, "altitude": 5}' -plaintext 127.0.0.1:8080 local.drones.DroneService.ReportLocation

That results in "Too many open files" errors in iot-service-rs:

[2023-10-19T09:19:48.099Z ERROR hyper::server::tcp] accept error: Too many open files (os error 24)
[2023-10-19T09:19:49.102Z ERROR hyper::server::tcp] accept error: Too many open files (os error 24)
[2023-10-19T09:19:50.650Z ERROR hyper::server::tcp] accept error: Too many open files (os error 24)
[2023-10-19T09:19:53.648Z ERROR hyper::server::tcp] accept error: Too many open files (os error 24)
[2023-10-19T09:19:57.834Z ERROR hyper::server::tcp] accept error: Too many open files (os error 24)

Reply after emit without state doesn't compile

From the docs I had expected this to be possible:

emit_event(Event::SomeEvent { stuff }).reply(reply_to, some_reply)

But I get an error for the reply call saying "method not found in `EmitEvent<_>`"

Need to consider filters

What filtering functionality should we support for this initial work in terms of initial and adaptive filters?

Return a kill switch instead of having to create

For consumer::run and akka_projection_rs_grpc::producer::run:

Instead of the caller manually having to create one and pass it when starting those, could they create the kill switch and return it?
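A sketch of the suggested shape, assuming a minimal kill switch built on an AtomicBool (the real crates use a channel type; the names here are illustrative):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Hypothetical stand-in for the kill switch the projection tasks watch.
#[derive(Clone)]
struct KillSwitch(Arc<AtomicBool>);

impl KillSwitch {
    fn kill(&self) {
        self.0.store(true, Ordering::SeqCst);
    }
    fn is_killed(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

// Instead of `fn run(kill_switch: KillSwitch, ...)`, create it inside and
// return it to the caller.
fn run() -> KillSwitch {
    let ks = KillSwitch(Arc::new(AtomicBool::new(false)));
    let worker_ks = ks.clone();
    // A real implementation would spawn the projection task here and have it
    // poll `worker_ks.is_killed()` (or await a channel) to know when to stop.
    let _ = worker_ks;
    ks
}
```

The caller then only holds the returned handle and calls kill() on shutdown, rather than constructing and threading the switch through by hand.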

UI in the sample makes it more complicated

@johanandren and I had this opinion when we saw the UI PR, and we are both even more convinced when reading the guide. The UI code takes too much focus, and makes the initial installation more difficult.

It would be better to walk through the "core" features, step by step, without having to explain things like broadcast_event or the shared model.

Curl would be perfectly fine for interacting with the sample.

At the end of the guide, it could have a step explaining what would be needed to add UI.

Don't automatically skip events that cannot be deserialized

This looks like a difference from Akka JVM. There we don't automatically accept and ignore events with an invalid format. The reason is that it is most likely a mistake to not evolve the serialization format in a compatible way, and ignoring such events while continuing with other events can lead to data corruption on the consumer side.

Originally posted by @patriknw in #119 (comment)

Projections for SSE and views

The core persistence and projection libraries are able to target WASM by design (not the gRPC or commit log ones though; just the core two).

As discussed at the CAB, we could create a crate for use in WASM browser contexts where there is a source provider of Server Sent Events (SSE).

In addition, and not dependent on WASM, but certainly useful in a WASM context, perhaps we can have something similar to the Kalix View where we event source an entity that cannot be commanded. This is useful in a web UI where we consume state from a projection (eg over SSE) so that we can render from that state.

The combination of SSE and event sourcing read-only entities in the browser is what we do with our products and it results in a responsive UI, leading to what we believe is a great user experience.

Timestamp offset, deduplication and backtracking

I have hinted that the offset tracking is difficult, but we haven't gone into the details yet. Now that we have the base in place, it might be a good time to explain and discuss more.

We might find a simplified solution for akka-edge-rs, but this is how it works in JVM Projections.

Background

First, why do we have a timestamp offset?

A database sequence is not good because it becomes a bottleneck on the write path, and it doesn't give many guarantees about monotonically increasing without gaps anyway. We also wanted to design for distributed databases, where a global sequence number is not really an option.

Backtracking

Instead, we use a timestamp as the starting-point offset and then track individual sequence numbers for each persistence id on the consumer side. Since the timestamp isn't very exact (clock skew, delayed visibility, and so on) we scan back in time for potentially missed events. Events may therefore be emitted more than once and are deduplicated on the consumer side by comparing persistenceId/seqNr, giving exact deduplication of events that have already been processed.

In the end we provide at-least-once delivery with best-effort deduplication.

Events emitted by the backtracking don’t contain the event payload and the consumer can load the full envelope with the loadEnvelope query if it is not a duplicate.
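The consumer-side deduplication described above can be sketched as a map of the highest processed seqNr per persistenceId (names here are illustrative, not the actual akka-projection offset store API):

```rust
use std::collections::HashMap;

// Illustrative sketch: tracks the highest processed seq_nr per persistence
// id so that re-emitted events from backtracking can be dropped.
#[derive(Default)]
struct OffsetStore {
    latest_seq_nr: HashMap<String, u64>,
}

impl OffsetStore {
    /// Returns true if the event is new and should be processed.
    fn accept(&mut self, persistence_id: &str, seq_nr: u64) -> bool {
        match self.latest_seq_nr.get(persistence_id) {
            // Already processed: a duplicate from backtracking or PubSub.
            Some(&seen) if seq_nr <= seen => false,
            _ => {
                self.latest_seq_nr
                    .insert(persistence_id.to_string(), seq_nr);
                true
            }
        }
    }
}
```

The real store additionally caps the history to a time window and caches recent entries in memory, as the Offset store section below describes.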

Offset store

The offset store persists not only the latest processed timestamp, but also the latest seqNr for each persistenceId. The history is capped to a time window. For quick deduplication it has a cache in memory. https://github.com/akka/akka-projection/blob/main/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala

TimestampOffset

In addition to the UTC timestamp, the TimestampOffset has a seen map of persistenceId/seqNr. The map is populated when several events have the exact same timestamp.
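A rough, hypothetical shape of such an offset (field and method names are illustrative, not the actual TimestampOffset definition):

```rust
use std::collections::HashMap;

// Illustrative sketch of a timestamp offset with a `seen` map. When several
// events share the exact same timestamp, `seen` records which persistence
// id/seq_nr pairs at that timestamp are already processed.
struct TimestampOffset {
    timestamp_micros: u64,      // UTC timestamp of the last processed event
    seen: HashMap<String, u64>, // persistence_id -> seq_nr at that timestamp
}

impl TimestampOffset {
    fn new(timestamp_micros: u64) -> Self {
        Self {
            timestamp_micros,
            seen: HashMap::new(),
        }
    }

    fn observe(&mut self, timestamp_micros: u64, pid: &str, seq_nr: u64) {
        if timestamp_micros > self.timestamp_micros {
            // A strictly later timestamp supersedes everything seen so far.
            self.timestamp_micros = timestamp_micros;
            self.seen.clear();
        }
        self.seen.insert(pid.to_string(), seq_nr);
    }
}
```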

Events over PubSub

There is another source of duplicate events. Events are sent over the Akka Cluster when storing the event as a low latency path, see https://doc.akka.io/docs/akka-persistence-r2dbc/current/query.html#publish-events-for-lower-latency-of-eventsbyslices
Those events are also deduplicated on the consumer side by comparing the persistenceId/seqNr with processed offsets.

Consider the applicability of tags for the edge

From Patrik:

We have found that tags are a pretty good way to filter events (consumer-defined filters, evaluated on the producer side). Without tags those filters can only be on entity ids (exact or regex). I think we have been and still are uncertain about how useful consumer-defined filters are for the case when the edge is the producer. I guess the consumer would be interested in all events from the edge in most cases, and the edge itself decides what to publish (producer-defined filter). I would say that tag filters can be added when we have more important things in place. It also depends on effort; if it's a small thing to add tagging support we can just do it.

We now have a WithTags trait for envelopes. At present, it'll always return an empty vector.

Opaque error when commit log directory does not exist

More of an upstream issue maybe, but when handed a path that does not exist, the file log starts up without issue but later fails, leading to a pretty opaque "A problem occurred producing a envelope" error message from the CommitLogTopicAdapter.

It would be nice if the check was done on startup somewhere or, if that is not possible for some reason, it at least led to an error saying what is wrong.

Convenience for initial consumer filters

Not super important, but just to note it down in an issue.

On the Akka side we have an initial-consumer-filters, convenient for use cases where you have a set filter that belongs with the consuming service and never changes dynamically. Right now you have to do:

let expression = TopicMatcher::new(origin_id.clone()).unwrap();
let criteria = vec![
    exclude_all(),
    FilterCriteria::IncludeTopics {
        expressions: vec![expression.clone()],
    },
];
let (_consumer_filter_sender, consumer_filter_receiver) = watch::channel(criteria);
let source_provider = GrpcSourceProvider::new(|| event_producer.connect(), stream_id)
  .with_consumer_filters(consumer_filter_receiver);

With a convenience it could be something like the following; not a huge difference, but convenient:

let source_provider = GrpcSourceProvider::new(|| event_producer.connect(), stream_id)
  .with_initial_consumer_filters(vec![
    exclude_all(),
    FilterCriteria::IncludeTopics {
      expressions: vec![TopicMatcher::new(origin_id.clone()).unwrap()],
    },
  ]);

Unencrypted event storage

I can't find a way to build a CommitLogMarshaler without having to dig into event envelope construction details but that should be abstracted away from the users. What I find is implemented only for encrypted payloads in EncryptedCommitLogMarshaler.{decrypted_envelope,encrypted_producer_record}.

Not dealing with encryption makes getting started easier and must surely be something at least part of the potential user base would want?

Backend support for projecting persistent and ephemeral events to a UI

In our own edge applications, we prefer to push both persistent and ephemeral events from a backend service to the frontend. I believe that the scenario is important enough that we should be helping the developer support it at the backend.

The solution we've arrived at is similar to how a gRPC consumer at the edge consumes a stream of events from the remote producer. The end result is an improved user experience, as the frontend application becomes responsive. An example from our application is where real-time metering events from an electrical switchboard are streamed to the UI twice per second (animated GIF omitted).

Note that these metering events are not required to be persisted. However, the events that inform the UI of the "Monitor" being online, and its mains capacity are persisted. Both types of events are from the same entity. It would also make coding the UI more difficult if these events arrived out of order.

Most applications in this space poll for this type of metering data, from once per 5 minutes to 15 minutes in my experience. This leads to an inadequate user experience as viewing metering data close to real-time can be very useful, and sometimes very important.

Predefined marshaller

When getting started it would be good if we can figure out some common enough id and serialisation scheme and have a predefined marshaler that you can construct instead of having to implement one yourself. Implementing it yourself should still be possible of course to unlock the full capabilities.

I'd expect such a predefined marshaller to take the entity type string and possibly some event-to-ordinal lambda (or could we maybe even use the enum discriminant; is that too implicit?) in its constructor, then use non-encrypted (if possible) CBOR or JSON for storage.
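The event-to-ordinal parameter could be an explicit match rather than the built-in enum discriminant; an explicit match keeps ordinals stable across refactorings, which is the "too implicit" concern with discriminants. A hypothetical sketch with made-up event names:

```rust
// Illustrative event type; not the actual sample's events.
enum TemperatureEvent {
    Registered { secret: String },
    TemperatureRead { celsius: i32 },
}

/// Explicit, stable ordinal per variant, as a predefined marshaler
/// constructor might accept via a lambda. Adding or reordering variants
/// cannot silently change existing ordinals, unlike a raw discriminant.
fn ordinal(event: &TemperatureEvent) -> u32 {
    match event {
        TemperatureEvent::Registered { .. } => 0,
        TemperatureEvent::TemperatureRead { .. } => 1,
    }
}
```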

Is compaction enabled?

I tried to test compaction, but there is something I don't understand. I stopped the iot-service-scala, and updated the temperature over 100 times for the same entity. Then I expected that it would only keep the last 10 events. When starting iot-service-scala I still see all TemperatureRead events emitted over grpc and stored in the iot-service-scala db.

By adding println in on_event of the temperature entity I can also see that all events are replayed on startup.

Edge didn't reconnect

I don't have much information about how to reproduce this, but something is not right with the reconnect. I had stopped iot-service-scala, updated the temperature 1000 times, then started iot-service-scala, and it didn't receive the events. Nothing in the logs. Updated the temperature a few more times while both were running; nothing was emitted over grpc.

Restarted the iot-service-rs and then the events were received in iot-service-scala.

Too many open files

After trying the samples and restarting the JVM service a few times:

[2023-09-11T12:37:14.923Z DEBUG hyper::client::connect::http] connecting to 127.0.0.1:8101
[2023-09-11T12:37:22.669Z DEBUG hyper::client::connect::http] connecting to 127.0.0.1:8101
[2023-09-11T12:37:22.670Z DEBUG hyper::client::connect::http] connected to 127.0.0.1:8101
[2023-09-11T12:37:22.670Z DEBUG h2::client] binding client connection
[2023-09-11T12:37:22.670Z DEBUG h2::client] client connection bound
[2023-09-11T12:37:22.670Z DEBUG h2::codec::framed_write] send frame=Settings { flags: (0x0), enable_push: 0, initial_window_size: 2097152, max_frame_size: 16384 }
[2023-09-11T12:37:22.670Z DEBUG h2::proto::connection] Connection; peer=Client
[2023-09-11T12:37:22.670Z DEBUG h2::codec::framed_write] send frame=WindowUpdate { stream_id: StreamId(0), size_increment: 5177345 }
[2023-09-11T12:37:22.672Z DEBUG tower::buffer::worker] service.ready=true message=processing request
[2023-09-11T12:37:22.672Z DEBUG h2::codec::framed_write] send frame=Headers { stream_id: StreamId(1), flags: (0x4: END_HEADERS) }
[2023-09-11T12:37:22.672Z DEBUG h2::codec::framed_write] send frame=Data { stream_id: StreamId(1) }
[2023-09-11T12:37:22.725Z DEBUG h2::codec::framed_read] received frame=Settings { flags: (0x0), max_concurrent_streams: 256 }
[2023-09-11T12:37:22.725Z DEBUG h2::codec::framed_write] send frame=Settings { flags: (0x1: ACK) }
[2023-09-11T12:37:22.725Z DEBUG h2::codec::framed_read] received frame=Settings { flags: (0x1: ACK) }
[2023-09-11T12:37:22.725Z DEBUG h2::proto::settings] received settings ACK; applying Settings { flags: (0x0), enable_push: 0, initial_window_size: 2097152, max_frame_size: 16384 }
[2023-09-11T12:37:22.725Z DEBUG h2::codec::framed_read] received frame=WindowUpdate { stream_id: StreamId(0), size_increment: 9934465 }
[2023-09-11T12:37:22.780Z DEBUG h2::codec::framed_read] received frame=Headers { stream_id: StreamId(1), flags: (0x4: END_HEADERS) }
[2023-09-11T12:37:22.780Z DEBUG h2::codec::framed_read] received frame=WindowUpdate { stream_id: StreamId(1), size_increment: 446531 }
[2023-09-11T12:37:22.901Z DEBUG h2::codec::framed_read] received frame=Data { stream_id: StreamId(1) }
[2023-09-11T12:37:22.902Z DEBUG h2::codec::framed_read] received frame=Data { stream_id: StreamId(1) }
[2023-09-11T12:37:23.575Z WARN  streambed_logged] Error reading topic file: Too many open files (os error 24) - aborting subscription
[2023-09-11T12:37:23.575Z WARN  streambed_logged] Error reading topic file: Too many open files (os error 24) - aborting subscription
[2023-09-11T12:37:23.575Z WARN  streambed_logged] Error reading topic file: Too many open files (os error 24) - aborting subscription
[2023-09-11T12:37:23.575Z WARN  streambed_logged] Error reading topic file: Too many open files (os error 24) - aborting subscription
[2023-09-11T12:37:23.575Z WARN  streambed_logged] Error reading topic file: Too many open files (os error 24) - aborting subscription
[2023-09-11T12:37:23.575Z WARN  streambed_logged] Error reading topic file: Too many open files (os error 24) - aborting subscription
[2023-09-11T12:37:23.575Z WARN  streambed_logged] Error reading topic file: Too many open files (os error 24) - aborting subscription
[2023-09-11T12:37:23.575Z WARN  streambed_logged] Error reading topic file: Too many open files (os error 24) - aborting subscription
[2023-09-11T12:37:23.575Z WARN  streambed_logged] Error reading topic file: Too many open files (os error 24) - aborting subscription
[2023-09-11T12:37:23.575Z WARN  streambed_logged] Error reading topic file: Too many open files (os error 24) - aborting subscription
[2023-09-11T12:37:23.575Z WARN  streambed_logged] Error reading topic file: Too many open files (os error 24) - aborting subscription
[2023-09-11T12:37:23.575Z WARN  streambed_logged] Error reading topic file: Too many open files (os error 24) - aborting subscription
[2023-09-11T12:37:23.576Z WARN  streambed_logged] Error reading topic file: Too many open files (os error 24) - aborting subscription
[2023-09-11T12:37:23.576Z WARN  streambed_logged] Error reading topic file: Too many open files (os error 24) - aborting subscription
[2023-09-11T12:37:23.576Z WARN  streambed_logged] Error reading topic file: Too many open files (os error 24) - aborting subscription
[2023-09-11T12:37:23.576Z WARN  streambed_logged] Error reading topic file: Too many open files (os error 24) - aborting subscription
[2023-09-11T12:37:23.576Z WARN  streambed_logged] Error reading topic file: Too many open files (os error 24) - aborting subscription
[2023-09-11T12:37:23.576Z WARN  streambed_logged] Error reading topic file: Too many open files (os error 24) - aborting subscription
[2023-09-11T12:37:23.576Z WARN  streambed_logged] Error reading topic file: Too many open files (os error 24) - aborting subscription
[2023-09-11T12:37:23.576Z WARN  streambed_logged] Error reading topic file: Too many open files (os error 24) - aborting subscription

Can we hide Topics?

I know Topics come from Streambed, but do we need to expose that in the user API? Could we at least have some convenient convention where we have a 1:1 between topic and entity_type?

Emitting filtered event once per second

When leaving it running it seems to emit a filtered event once per second.

select persistence_id, seq_nr, db_timestamp, event_ser_id, event_ser_manifest, event_payload from event_journal order by db_timestamp;
 persistence_id | seq_nr |         db_timestamp          | event_ser_id |            event_ser_manifest            |                event_payload
----------------+--------+-------------------------------+--------------+------------------------------------------+----------------------------------------------
 Registration|1 |      1 | 2023-10-18 08:58:26.807942+02 |           33 | iot.registration.Registration$Registered | \xbf66736563726574bf6576616c756563666f6fffff
 Sensor|1       |      1 | 2023-10-18 09:25:08.837427+02 |           34 | F                                        | \x
 Sensor|1       |      2 | 2023-10-18 09:25:11.831705+02 |           34 | F                                        | \x
 Sensor|1       |      3 | 2023-10-18 09:26:11.940911+02 |           34 | F                                        | \x
 Sensor|1       |      4 | 2023-10-18 09:26:56.254831+02 |            2 | iot.temperature.proto.TemperatureRead    | \x0802
 Sensor|1       |      5 | 2023-10-18 09:27:12.22157+02  |           34 | F                                        | \x
 Sensor|1       |      6 | 2023-10-18 09:28:12.334963+02 |           34 | F                                        | \x
 Sensor|1       |      7 | 2023-10-18 09:29:12.451345+02 |           34 | F                                        | \x
 Sensor|1       |      8 | 2023-10-18 09:30:12.560615+02 |           34 | F                                        | \x
 Sensor|1       |      9 | 2023-10-18 09:31:12.659421+02 |           34 | F                                        | \x
 Sensor|1       |     10 | 2023-10-18 09:32:12.763068+02 |           34 | F                                        | \x
 Sensor|1       |     11 | 2023-10-18 09:33:12.869349+02 |           34 | F                                        | \x
 Sensor|1       |     12 | 2023-10-18 09:34:12.981261+02 |           34 | F                                        | \x
 Sensor|1       |     13 | 2023-10-18 09:35:13.090149+02 |           34 | F                                        | \x
 Sensor|1       |     14 | 2023-10-18 09:36:13.193917+02 |           34 | F                                        | \x
 Sensor|1       |     15 | 2023-10-18 09:37:13.296181+02 |           34 | F                                        | \x
 Sensor|1       |     16 | 2023-10-18 09:38:13.41136+02  |           34 | F                                        | \x
 Sensor|1       |     17 | 2023-10-18 09:39:14.519305+02 |           34 | F                                        | \x
 Sensor|1       |     18 | 2023-10-18 09:40:14.617701+02 |           34 | F                                        | \x
 Sensor|1       |     19 | 2023-10-18 09:41:14.722375+02 |           34 | F                                        | \x
 Sensor|1       |     20 | 2023-10-18 09:42:14.825526+02 |           34 | F                                        | \x
 Sensor|1       |     21 | 2023-10-18 09:43:14.933911+02 |           34 | F                                        | \x
 Sensor|1       |     22 | 2023-10-18 09:44:15.043617+02 |           34 | F                                        | \x
 Sensor|1       |     23 | 2023-10-18 09:45:15.156405+02 |           34 | F                                        | \x
 Sensor|1       |     24 | 2023-10-18 09:46:15.26491+02  |           34 | F                                        | \x
 Sensor|1       |     25 | 2023-10-18 09:47:15.386042+02 |           34 | F                                        | \x
 Sensor|1       |     26 | 2023-10-18 09:48:15.504482+02 |           34 | F                                        | \x
 Sensor|1       |     27 | 2023-10-18 09:49:15.613997+02 |           34 | F                                        | \x
 Sensor|1       |     28 | 2023-10-18 09:50:15.73658+02  |           34 | F                                        | \x
 Sensor|1       |     29 | 2023-10-18 09:51:15.852413+02 |           34 | F                                        | \x
 Sensor|1       |     30 | 2023-10-18 09:52:15.991798+02 |           34 | F                                        | \x
 Sensor|1       |     31 | 2023-10-18 09:53:16.112647+02 |           34 | F                                        | \x
 Sensor|1       |     32 | 2023-10-18 09:54:16.225129+02 |           34 | F                                        | \x
 Sensor|1       |     33 | 2023-10-18 09:55:16.338582+02 |           34 | F                                        | \x
 Sensor|1       |     34 | 2023-10-18 09:56:17.467419+02 |           34 | F                                        | \x
 Sensor|1       |     35 | 2023-10-18 09:57:17.576886+02 |           34 | F                                        | \x
 Sensor|1       |     36 | 2023-10-18 09:58:17.704509+02 |           34 | F                                        | \x
 Sensor|1       |     37 | 2023-10-18 09:59:17.829994+02 |           34 | F                                        | \x
 Sensor|1       |     38 | 2023-10-18 10:00:17.962988+02 |           34 | F                                        | \x
 Sensor|1       |     39 | 2023-10-18 10:01:18.084971+02 |           34 | F                                        | \x
 Sensor|1       |     40 | 2023-10-18 10:02:18.210927+02 |           34 | F                                        | \x
 Sensor|1       |     41 | 2023-10-18 10:03:18.341436+02 |           34 | F                                        | \x
 Sensor|1       |     42 | 2023-10-18 10:04:18.459105+02 |           34 | F                                        | \x
 Sensor|1       |     43 | 2023-10-18 10:05:18.588381+02 |           34 | F                                        | \x
 Sensor|1       |     44 | 2023-10-18 10:06:18.719669+02 |           34 | F                                        | \x
 Sensor|1       |     45 | 2023-10-18 10:07:18.850505+02 |           34 | F                                        | \x
 Sensor|1       |     46 | 2023-10-18 10:08:18.981718+02 |           34 | F                                        | \x
 Sensor|1       |     47 | 2023-10-18 10:09:19.104074+02 |           34 | F                                        | \x
 Sensor|1       |     48 | 2023-10-18 10:10:19.235849+02 |           34 | F                                        | \x
 Sensor|1       |     49 | 2023-10-18 10:11:19.371335+02 |           34 | F                                        | \x
 Sensor|1       |     50 | 2023-10-18 10:12:19.493649+02 |           34 | F                                        | \x
 Sensor|1       |     51 | 2023-10-18 10:13:19.61885+02  |           34 | F                                        | \x
 Sensor|1       |     52 | 2023-10-18 10:14:19.745095+02 |           34 | F                                        | \x
 Sensor|1       |     53 | 2023-10-18 10:14:51.814505+02 |            2 | iot.temperature.proto.TemperatureRead    | \x0803
 Sensor|1       |     54 | 2023-10-18 10:15:13.865504+02 |            2 | iot.temperature.proto.TemperatureRead    | \x0804
 Sensor|1       |     55 | 2023-10-18 10:22:54.964036+02 |           34 | F                                        | \x
 Sensor|1       |     56 | 2023-10-18 10:23:56.061048+02 |           34 | F                                        | \x
 Sensor|1       |     57 | 2023-10-18 10:24:56.187304+02 |           34 | F                                        | \x
 Sensor|1       |     58 | 2023-10-18 10:25:56.319631+02 |           34 | F                                        | \x

Out of the box unencrypted marshaller

I think it is fine to strongly recommend encrypted data at rest in the docs if we want, but we should still make it possible to create a marshaller without encryption without having to touch internal APIs (constructing the EventEnvelope yourself and having to care about the ProducerRecord encoding of data in the streambed store).

Return the channel from the entity manager

Could it make sense for entity_manager::run to return a Sender<Message<...>> instead of () as it does now, rather than the user having to create the sender/receiver pair?

When setting up the entity you'd always want that sender, so creating it is repeated logic for each entity just to be able to set the buffer size (which we could instead take as a parameter to run).
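A rough sketch of the proposed shape, using std::sync::mpsc stand-ins for the real channel and Message types (the names here are hypothetical, not the actual akka-edge-rs API):

```rust
use std::sync::mpsc::{self, SyncSender};
use std::thread;

// Hypothetical stand-in for Message<C> in the real entity manager API.
struct Message(String);

// Instead of the caller creating the sender/receiver pair, run creates the
// channel itself (taking the buffer size as a parameter) and returns the
// sender, removing the repeated setup logic at every call site.
fn run(buffer_size: usize) -> SyncSender<Message> {
    let (tx, rx) = mpsc::sync_channel::<Message>(buffer_size);
    thread::spawn(move || {
        // ... dispatch each received message to the entity here ...
        for _msg in rx {}
    });
    tx
}

fn main() {
    let sender = run(10);
    assert!(sender.send(Message("register".into())).is_ok());
    println!("sent");
}
```

The buffer size stays configurable, but only as an argument rather than as channel plumbing repeated per entity.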

Producer doesn't reconnect after wrong stream_id

How to reproduce:

  1. enable debug logs in iot-service-scala src/main/resources/logback.xml
    <logger name="akka.projection.grpc" level="DEBUG" />
    
  2. Change to another event stream in TemperatureEvents.scala
    val TemperatureEventsStreamId = "wrong-temperature-events"
    
  3. Start iot-service-scala
  4. Start iot-service-rs
  5. emit some temperature updates; these will not be accepted by iot-service-scala and the log will show:
    Event producer [edge-iot-service] wanted to push events for stream id [temperature-events] but that is not among the accepted stream ids
    
  6. keep iot-service-rs running
  7. stop iot-service-scala
  8. Change back to the right event stream in TemperatureEvents.scala
    val TemperatureEventsStreamId = "temperature-events"
    
  9. start iot-service-scala
  10. here we would expect iot-service-rs to reconnect and deliver the events, but nothing happens
  11. try a few more temperature updates; those will also not be delivered
  12. restart iot-service-rs; it will connect and deliver the events. The log shows:
    Event stream from [edge-iot-service] for stream id [temperature-events] started
    

Duplicate Registered events

Strange. I registered 10 entities in iot-service-scala. I happened to have a println in on_event in the temperature entity and noticed that there are many duplicate Registered events, with more being emitted over time while the systems are left running idle.

println output, where the first number is the entity id:

1 on_event: Registered foo
1 on_event: Registered foo
1 on_event: Registered foo
1 on_event: Registered foo
1 on_event: Registered foo
1 on_event: Registered foo
1 on_event: Registered foo
1 on_event: Registered foo
1 on_event: Registered foo
1 on_event: Registered foo
2 on_event: Registered foo2
2 on_event: Registered foo2
3 on_event: Registered foo2
4 on_event: Registered foo2
4 on_event: Registered foo2
5 on_event: Registered foo2
5 on_event: Registered foo2
6 on_event: Registered foo2
6 on_event: Registered foo2
7 on_event: Registered foo2
7 on_event: Registered foo2
3 on_event: Registered foo2
4 on_event: Registered foo2
5 on_event: Registered foo2
6 on_event: Registered foo2
7 on_event: Registered foo2
2 on_event: Registered foo2
3 on_event: Registered foo2
4 on_event: Registered foo2
5 on_event: Registered foo2
6 on_event: Registered foo2
7 on_event: Registered foo2
2 on_event: Registered foo2
3 on_event: Registered foo2
4 on_event: Registered foo2
5 on_event: Registered foo2
6 on_event: Registered foo2
7 on_event: Registered foo2
2 on_event: Registered foo2
3 on_event: Registered foo2
4 on_event: Registered foo2
5 on_event: Registered foo2
6 on_event: Registered foo2
7 on_event: Registered foo2
2 on_event: Registered foo2
3 on_event: Registered foo2
4 on_event: Registered foo2
5 on_event: Registered foo2
6 on_event: Registered foo2
7 on_event: Registered foo2
2 on_event: Registered foo2
3 on_event: Registered foo2
4 on_event: Registered foo2
5 on_event: Registered foo2
6 on_event: Registered foo2
7 on_event: Registered foo2
2 on_event: Registered foo2
3 on_event: Registered foo2
4 on_event: Registered foo2
5 on_event: Registered foo2
6 on_event: Registered foo2
7 on_event: Registered foo2
2 on_event: Registered foo2
3 on_event: Registered foo2
4 on_event: Registered foo2
5 on_event: Registered foo2
6 on_event: Registered foo2
7 on_event: Registered foo2
2 on_event: Registered foo2
3 on_event: Registered foo2
4 on_event: Registered foo2
5 on_event: Registered foo2
6 on_event: Registered foo2
7 on_event: Registered foo2
2 on_event: Registered foo2
3 on_event: Registered foo2
4 on_event: Registered foo2
5 on_event: Registered foo2
6 on_event: Registered foo2
7 on_event: Registered foo2
2 on_event: Registered foo2
3 on_event: Registered foo2
4 on_event: Registered foo2
5 on_event: Registered foo2
6 on_event: Registered foo2
7 on_event: Registered foo2
2 on_event: Registered foo2
3 on_event: Registered foo2
4 on_event: Registered foo2
5 on_event: Registered foo2
6 on_event: Registered foo2
7 on_event: Registered foo2
2 on_event: Registered foo2
3 on_event: Registered foo2
4 on_event: Registered foo2
5 on_event: Registered foo2
6 on_event: Registered foo2
7 on_event: Registered foo2
2 on_event: Registered foo2
3 on_event: Registered foo2
4 on_event: Registered foo2
5 on_event: Registered foo2
6 on_event: Registered foo2
7 on_event: Registered foo2
2 on_event: Registered foo2
3 on_event: Registered foo2
4 on_event: Registered foo2
5 on_event: Registered foo2
6 on_event: Registered foo2
7 on_event: Registered foo2
8 on_event: Registered foo2
9 on_event: Registered foo2
8 on_event: Registered foo2
10 on_event: Registered foo2
10 on_event: Registered foo2
9 on_event: Registered foo2
10 on_event: Registered foo2
8 on_event: Registered foo2
9 on_event: Registered foo2
10 on_event: Registered foo2
8 on_event: Registered foo2
9 on_event: Registered foo2
10 on_event: Registered foo2
8 on_event: Registered foo2
9 on_event: Registered foo2
10 on_event: Registered foo2
8 on_event: Registered foo2
9 on_event: Registered foo2
10 on_event: Registered foo2
8 on_event: Registered foo2
9 on_event: Registered foo2
10 on_event: Registered foo2
8 on_event: Registered foo2
9 on_event: Registered foo2
10 on_event: Registered foo2
8 on_event: Registered foo2

More effect DSL examples in docs

This might be a consequence of me not being familiar enough with Rust, but I found it hard to understand the various composed effect factory names. Warp, for example, has a section on its DSL on the doc start page: https://docs.rs/warp/latest/warp/#filters and also on the individual function pages: https://docs.rs/warp/latest/warp/filters/path/index.html

It would be nice if we could do something like that as well: on the landing page, the entity page, or maybe the effect page, plus the individual concrete effect function pages.

Persist multiple events

In Akka Persistence we have persist(List(events)) and also the varargs persist(event1, event2, ...).
Is that not possible here because the emit cannot be atomic with the streambed store?
If it is possible, we should have an effect for emitting multiple events as well.
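To illustrate the atomicity concern: if a batch-emit effect simply appended events one at a time to the log, a crash partway through would leave only a prefix of the batch persisted. A toy sketch (emit_events is hypothetical, not an existing effect, and a Vec stands in for the event log):

```rust
#[derive(Debug, Clone, PartialEq)]
enum Event {
    TemperatureRead { temperature: u32 },
}

// Hypothetical batch-emit: each event is appended individually, so the
// batch is only atomic if the underlying store can write all of them
// in a single append.
fn emit_events(log: &mut Vec<Event>, events: Vec<Event>) {
    for e in events {
        log.push(e); // a crash here leaves a partial batch behind
    }
}

fn main() {
    let mut log = Vec::new();
    emit_events(
        &mut log,
        vec![
            Event::TemperatureRead { temperature: 21 },
            Event::TemperatureRead { temperature: 22 },
        ],
    );
    assert_eq!(log.len(), 2);
    println!("{log:?}");
}
```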

Sample code / effect systems complexity

I assume that we aim this at Scala developers already familiar with Akka and its effect system, so this may be okay and might only need a bit of cleanup so it's easier on the eyes.

https://doc.akka.io/docs/akka-edge-preview/snapshot/guide-rs/3-temperature-entity.html

But I wonder how many Rust developers will intuitively get the and(then(...)) combination without us having a section on it:

    emit_event(Event::TemperatureRead { temperature })
        .and(then(
            move |behavior: &Self,
                  _new_state: Option<&State>,
                  prev_result: effect::Result| {
                behavior.send_broadcast_event(
                    prev_result,
                    broadcast_entity_id,
                    broadcast_temperature,
                )
            },
        ))
        .boxed()

Investigate using gRPC-Web in place of SSE for the frontend

Consider supporting gRPC-Web for browser communication in place of the Server Sent Events (SSE) we have in our example. By doing so, the machinery of projections can be shared between the frontend and backend, further reducing what the developer must learn.

The gRPC library we use (Tonic) supports gRPC-Web on the server-side out of the box.

There is also a gRPC-Web client for Rust on WebAssembly which looks to be feature-complete and active.

A consideration of gRPC-Web in general is that while server-side streaming is supported, client-side streaming is not. For a comprehensive outline on where gRPC-Web is going please read https://github.com/grpc/grpc-web/blob/master/doc/roadmap.md.

In general terms, client-side streaming should not be required by a frontend. However, the Akka gRPC consumer protocol does stream an initial request followed (optionally) by requests to update filters on the consumer side. Perhaps a variant of the EventsBySlices request that takes an InitReq in place of a StreamIn could be considered, foregoing the ability to adapt filters.

Marshal vs marshall

Not sure if this is a British vs American English spelling thing, but in Akka HTTP we use Marshaller and Unmarshaller, while here it is Marshaler. Can we align?

Two offset stores in play

When consuming a projection, we have two offset stores in play.

The first, commit-log-based offset store is provided to the GrpcSourceProvider:

    let event_producer = Channel::builder(event_producer_addr);
    let source_provider =
        GrpcSourceProvider::new(|| event_producer.connect(), stream_id, offset_store);

The source provider will use this offset store to validate each entity's seq_nr.

The next offset store is further down when running the projection:

    akka_projection_rs_storage::run(
        &secret_store,
        &offsets_key_secret_path,
        &state_storage_path,
        kill_switch,
        source_provider,
        handler,
        Duration::from_millis(100),
    )
    .await

... where the first three params and the last param are supplied so that the projection can both load and save the last offset processed, from and to a file.

The above can be simplified by supplying the offset store to just the projection, and also providing a load_event method on the source provider. This will then permit the projection to validate entity seq_nrs in place of the source provider doing it, which would also be consistent with the approach the JVM code takes.
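A rough sketch of the proposed shape, with hypothetical trait and type names: the projection holds the single offset store and calls back into the source provider to re-load an event when seq_nr validation demands it.

```rust
// Hypothetical envelope carrying the per-entity sequence number.
#[derive(Clone)]
struct EventEnvelope {
    persistence_id: String,
    seq_nr: u64,
}

// Proposed addition: the source provider exposes load_event so that the
// projection, owning the one offset store, can fetch a specific event
// itself when it detects a seq_nr gap.
trait SourceProvider {
    fn load_event(&self, persistence_id: &str, seq_nr: u64) -> Option<EventEnvelope>;
}

struct InMemorySource(Vec<EventEnvelope>);

impl SourceProvider for InMemorySource {
    fn load_event(&self, persistence_id: &str, seq_nr: u64) -> Option<EventEnvelope> {
        self.0
            .iter()
            .find(|e| e.persistence_id == persistence_id && e.seq_nr == seq_nr)
            .cloned()
    }
}

fn main() {
    let source = InMemorySource(vec![EventEnvelope {
        persistence_id: "Sensor|1".into(),
        seq_nr: 53,
    }]);
    // The projection validates seq_nrs and asks for any missed event.
    assert!(source.load_event("Sensor|1", 53).is_some());
    assert!(source.load_event("Sensor|1", 54).is_none());
    println!("ok");
}
```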

Look toward supporting native async traits

Creating this issue to capture the desire to support native async functions in traits, stabilized in Rust 1.75.

In essence, the change would be to remove usage of the async-trait crate and its associated macros.
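For illustration, with Rust 1.75+ a trait can declare async fn directly, with no #[async_trait] attribute and no boxed futures. The trait and executor below are illustrative only; the tiny executor polls once with a no-op waker, which is enough for a future that completes without ever returning Pending:

```rust
use std::future::Future;
use std::pin::pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Native async fn in trait (Rust 1.75+): no async-trait macro needed.
trait EventHandler {
    async fn process(&self, event: &str) -> usize;
}

struct Counter;

impl EventHandler for Counter {
    async fn process(&self, event: &str) -> usize {
        event.len()
    }
}

// A no-op waker built from a raw vtable, so we can poll without a runtime.
fn noop_raw_waker() -> RawWaker {
    fn clone(_: *const ()) -> RawWaker {
        noop_raw_waker()
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    RawWaker::new(std::ptr::null(), &VTABLE)
}

// Minimal executor: drives a future that is immediately ready.
fn block_on_ready<F: Future>(fut: F) -> F::Output {
    let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
    let mut cx = Context::from_waker(&waker);
    match pin!(fut).poll(&mut cx) {
        Poll::Ready(v) => v,
        Poll::Pending => unreachable!("future was not immediately ready"),
    }
}

fn main() {
    let n = block_on_ready(Counter.process("TemperatureRead"));
    assert_eq!(n, 15);
    println!("{n}");
}
```

In the real codebase a proper async runtime would drive the futures; the point is only that the trait declaration itself no longer needs the macro.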

Higher level api for typical usage

We have many channels that the user must create, which we then use to glue the different components together. That gives us great flexibility in combining components with each other, but at some point I think we should review it with the DX hat on and see what we can make easier for the normal cases.

Originally posted by @patriknw in #36 (comment)

In that PR we combine akka_projection_rs_grpc::producer with akka_projection_rs_storage. Maybe that could be combined into a higher level user api, which could create the channels.

Edge Sample: Error consuming events from JVM

When running the akka-edge-rs version of the local drone control (from https://github.com/lightbend/akka-projection-temp/pull/16 ) against the Scala restaurant-drone-deliveries, with an empty database, I get an error:

[2023-11-06T13:46:41Z INFO  warp::server] Server::run; addr=127.0.0.1:8080
[2023-11-06T13:46:41Z INFO  warp::server] listening on http://127.0.0.1:8080
[2023-11-06T13:46:45Z WARN  akka_projection_rs::consumer] Back track received for a future event: RestaurantDeliveries|restaurant1. Aborting stream.
[2023-11-06T13:46:49Z WARN  akka_projection_rs::consumer] Back track received for a future event: RestaurantDeliveries|restaurant1. Aborting stream.
[2023-11-06T13:46:52Z WARN  akka_projection_rs::consumer] Back track received for a future event: RestaurantDeliveries|restaurant1. Aborting stream.
[2023-11-06T13:46:56Z WARN  akka_projection_rs::consumer] Back track received for a future event: RestaurantDeliveries|restaurant1. Aborting stream.
[2023-11-06T13:47:00Z WARN  akka_projection_rs::consumer] Back track received for a future event: RestaurantDeliveries|restaurant1. Aborting stream.
[2023-11-06T13:47:03Z WARN  akka_projection_rs::consumer] Back track received for a future event: RestaurantDeliveries|restaurant1. Aborting stream.
...

Steps to reproduce:

  • start the restaurant-drone-deliveries-scala sample:
    • follow the sample README.md to set up a new, empty Postgres
    • start a single restaurant drone-deliveries service: sbt -Dconfig.resource=local1.conf run
  • start local-drone-control-rs:
    • mkdir ./target/eventlog
    • cargo run -- --cl-root-path=./target/eventlog
  • Register a restaurant in the cloud service: grpcurl -d '{"restaurant_id":"restaurant1","coordinates":{"latitude": 59.330324, "longitude": 18.039568}, "local_control_location_id": "sweden/stockholm/kungsholmen" }' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.SetUpRestaurant
  • Register a delivery for the restaurant: grpcurl -d '{"restaurant_id":"restaurant1","delivery_id": "order1","coordinates":{"latitude": 59.330841, "longitude": 18.038885}}' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.RegisterDelivery
  • As soon as the delivery is passed to local-drone-control-rs, it is stuck in the error retry loop

Optional return values from marshaller

Why are to_compaction_key, to_entity_id, envelope and producer_record returning Option? Shouldn't they all always produce a value or fail explicitly (so possibly Result<T, Error>, if not just the plain return value)?
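For comparison, the Result-based shape being suggested might look like the following. Marshaler, Error and the method signatures here are simplified stand-ins, not the real akka-edge-rs types:

```rust
#[derive(Debug, PartialEq)]
enum Error {
    UnknownEntity,
}

// Stand-in marshaller: every operation either produces a value or fails
// explicitly, rather than signalling failure through Option.
trait Marshaler {
    fn to_compaction_key(&self, entity_id: &str) -> Result<u64, Error>;
    fn to_entity_id(&self, record_key: u64) -> Result<String, Error>;
}

struct SensorMarshaler;

impl Marshaler for SensorMarshaler {
    fn to_compaction_key(&self, entity_id: &str) -> Result<u64, Error> {
        // An unparseable id is now a distinguishable error, not a None.
        entity_id.parse::<u64>().map_err(|_| Error::UnknownEntity)
    }

    fn to_entity_id(&self, record_key: u64) -> Result<String, Error> {
        Ok(record_key.to_string())
    }
}

fn main() {
    let m = SensorMarshaler;
    assert_eq!(m.to_compaction_key("1"), Ok(1));
    assert_eq!(m.to_compaction_key("not-a-number"), Err(Error::UnknownEntity));
    assert_eq!(m.to_entity_id(1), Ok("1".to_string()));
    println!("ok");
}
```

The caller can then distinguish "no value" from "failed", and use ? to propagate the error.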
