cqrs's Introduction

cqrs

A lightweight, opinionated CQRS and event sourcing framework targeting serverless architectures.

Command Query Responsibility Segregation (CQRS) is a pattern in Domain Driven Design that uses separate write and read models for application objects and interconnects them with events. Event sourcing uses the generated events as the source of truth for the state of the application.

Together these provide a number of benefits:

  • Removes coupling between tests and application logic allowing limitless refactoring.
  • Greater isolation of the aggregate.
  • Ability to create views that more accurately model our business environment.
  • A horizontally scalable read path.

Things that could be helpful:

Three backing data stores are supported:

cqrs's People

Contributors

danglotb, danieleades, davegarred, dependabot[bot], dmytrokyrychuk, eduardohki, frederikhors, gregthomas-srvrls, rogierknoester, serverlesstechnology

cqrs's Issues

Feature: Make aggregate id, aggregate type, event type, event version types parametric

I had a look at the serialization into postgres via postgres-es and the current implementation of

aggregate_id: string
aggregate_type: string
event_type: string
event_version: string

is quite limiting.

Speaking from experience with a production database five years in and holding a hundred billion events, queries on the event store get very slow because text is not a good base type. Keeping these types parametric would let users choose something more performant, such as uuid for IDs and Postgres enums for event and aggregate types (which adds some error potential when adding new events or aggregates, for the sake of performance).

The event version as currently used is also a bit problematic: the examples suggest a version string of the form "x.y.z", but strings are ordered lexicographically, so wouldn't "0.20.0" sort before "0.3.0"?
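
A minimal sketch of the ordering concern, using only the standard library. The helper name parse_version is hypothetical and not part of cqrs-es or postgres-es; it simply shows that comparing parsed numeric components gives the intended order, whereas comparing the raw strings does not.

```rust
/// Parses a dotted version such as "0.20.0" into numeric components.
/// Returns `None` unless there are exactly three numeric components.
/// (Hypothetical helper for illustration only.)
fn parse_version(s: &str) -> Option<(u32, u32, u32)> {
    let mut parts = s.split('.').map(|p| p.parse::<u32>().ok());
    let major = parts.next()??;
    let minor = parts.next()??;
    let patch = parts.next()??;
    if parts.next().is_some() {
        return None; // more than three components
    }
    Some((major, minor, patch))
}
```

Compared as strings, "0.20.0" sorts before "0.3.0" because '2' is less than '3'; compared as tuples, (0, 20, 0) sorts after (0, 3, 0).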

Allow searching for aggregate instance without aggregate ID

My team is happily utilizing this crate, however, we're a bit stuck at the moment with the following:

It seems like aggregate instances can only be retrieved from the store by their aggregate IDs(?). For example, if I have a BankAccount Aggregate:

pub struct BankAccount {
    account_id: String,
    first_name: String,
    last_name: String,
    email: String,
    balance: f64,
}

Then I can indeed load the specific BankAccount instance using account_id (from https://github.com/serverlesstechnology/cqrs-demo/blob/main/src/route_handler.rs#L11-L26):

pub async fn query_handler(
    Path(account_id): Path<String>,
    State(state): State<ApplicationState>,
) -> Response {
    let view = match state.account_query.load(&account_id).await {
        Ok(view) => view,
        Err(err) => {
            println!("Error: {:#?}\n", err);
            return (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response();
        }
    };
    match view {
        None => StatusCode::NOT_FOUND.into_response(),
        Some(account_view) => (StatusCode::OK, Json(account_view)).into_response(),
    }
}

But in some situations I might not have access to the account_id, but I still want to be able to find the BankAccount instance that I need, for example using an email address:

pub async fn query_handler_2(
    email: String,
    State(state): State<ApplicationState>,
) -> Response {
    let view = match state.account_query.find(&email).await {    // <--- use a 'find' method instead of 'load'.
        Ok(view) => view,
        Err(err) => {
            println!("Error: {:#?}\n", err);
            return (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response();
        }
    };
    match view {
        None => StatusCode::NOT_FOUND.into_response(),
        Some(account_view) => (StatusCode::OK, Json(account_view)).into_response(),
    }
}

So my main question is, what is the intended way of implementing a way to retrieve an Aggregate instance not by its ID, but rather by something else (email or something else)?

Otherwise, would it make sense to add a find/search method to the ViewRepository trait? If so, I would be more than happy to contribute this.
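
One way to picture such a lookup is a view store that maintains a secondary index from email to account ID. The sketch below is an in-memory illustration only: AccountViews, upsert and find_by_email are hypothetical names and are not part of the ViewRepository trait or of cqrs-es. With a SQL-backed view, the same idea would presumably be an indexed column or a query over the view payload.

```rust
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
struct BankAccountView {
    account_id: String,
    email: String,
}

/// Hypothetical in-memory view store with a secondary index on email.
#[derive(Default)]
struct AccountViews {
    by_id: HashMap<String, BankAccountView>,
    /// Maps email -> account_id.
    id_by_email: HashMap<String, String>,
}

impl AccountViews {
    /// Inserts or replaces a view and keeps the email index in sync.
    fn upsert(&mut self, view: BankAccountView) {
        // Drop the stale index entry if the email changed.
        if let Some(old) = self.by_id.get(&view.account_id) {
            if old.email != view.email {
                self.id_by_email.remove(&old.email);
            }
        }
        self.id_by_email
            .insert(view.email.clone(), view.account_id.clone());
        self.by_id.insert(view.account_id.clone(), view);
    }

    /// Lookup by primary key, analogous to `load` in the handler above.
    fn load(&self, account_id: &str) -> Option<&BankAccountView> {
        self.by_id.get(account_id)
    }

    /// Lookup through the secondary index.
    fn find_by_email(&self, email: &str) -> Option<&BankAccountView> {
        let id = self.id_by_email.get(email)?;
        self.by_id.get(id)
    }
}
```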

Question: Default trait for aggregate

Why is it necessary to have the Default trait for aggregates?
Is this to avoid an "Aggregate not found" error?

I have used CQRS in Haskell, and reading this Rust implementation is really neat and tidy, good work!
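
For context, a common reason for a Default bound in event-sourced designs is that an aggregate is rebuilt by starting from an empty value and applying its events in order. The sketch below shows that fold with simplified, hypothetical types (rehydrate is not a cqrs-es function); whether this is the exact motivation in this crate is an assumption.

```rust
#[derive(Debug, Default, PartialEq)]
struct BankAccount {
    opened: bool,
    balance: i64,
}

enum BankAccountEvent {
    AccountOpened,
    Deposited { amount: i64 },
    Withdrew { amount: i64 },
}

impl BankAccount {
    /// Applies one event to the current state.
    fn apply(&mut self, event: &BankAccountEvent) {
        match event {
            BankAccountEvent::AccountOpened => self.opened = true,
            BankAccountEvent::Deposited { amount } => self.balance += amount,
            BankAccountEvent::Withdrew { amount } => self.balance -= amount,
        }
    }
}

/// Rebuilds an aggregate from its event history, starting from `Default`.
/// With no events, the result is simply the default (empty) aggregate.
fn rehydrate(events: &[BankAccountEvent]) -> BankAccount {
    let mut aggregate = BankAccount::default();
    for event in events {
        aggregate.apply(event);
    }
    aggregate
}
```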

Alternative pattern for Event upcasting

I wanted to present an alternative pattern for event upcasting. This relies on serde's ability to deserialize untagged enum representations.

I've used this pattern before for backwards compatibility of configuration files.

The gist is to separate the internal representation of an event from its serialised representation. Its serialised representation is an untagged union of all historical versions of the Event. You then add an infallible conversion from the union to the current version, and let serde do the rest.

use serde::{Deserialize, Serialize};

mod legacy {
    //! Previous versions of the `Event` enum, for backwards compatibility
    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize)]
    pub enum V1 {}

    #[derive(Serialize, Deserialize)]
    pub enum V2 {}
}

// This is version 3 of the 'event'
#[derive(Serialize, Deserialize)]
#[serde(from = "EventRep")]
pub enum Event {}

#[derive(Serialize, Deserialize)]
#[serde(untagged)]
enum EventRep {
    V1(legacy::V1),
    V2(legacy::V2),
    V3(Event)
}

impl From<EventRep> for Event {
    fn from(value: EventRep) -> Self {
        match value {
            EventRep::V1(_) => todo!(),
            EventRep::V2(_) => todo!(),
            EventRep::V3(event) => event,
        }
    }
}

For fallible conversions, you could also use #[serde(try_from = "EventRep")].

Implementing upcasting this way simplifies the implementation of the framework, and removes the 'stringly' typed upcasting API in favour of a strongly-typed pattern. The downside is possibly more cognitive load on downstream users to implement this themselves and to get it right.

Obviously this is a breaking change, but I'm interested to get your thoughts.

I'd say it's likely to be possible to simplify some of the boilerplate with a derive macro, if such a thing doesn't already exist in the wild.

Either make EventStore public or have some kind of accessor?

Hi again! One use case I've run into recently is that I sometimes want to build an aggregate without a command to send it. It might be in lieu of a query where the query would be overkill, or as part of a query processor chain. To do this, I can call load_aggregate on the EventStore object and then call the aggregate() method. Unfortunately, there is no public handle to the EventStore member in the CqrsFramework struct, so I have to create duplicate EventStore objects that I pass along to my axum routes. This works fine, but it would be a lot nicer if either the EventStore was public or I could access it somehow. What are your thoughts?

Questions: deleting events and query by aggregate property

Is there a way to delete events from the database? I am asking because, for GDPR purposes, I need to be able to delete user data if requested.

Also, if I create a view of, say, a user (id, name, email) aggregate, can I query by name or email through the cqrs framework?
Of course, I can always create queries that write to the db with raw SQL and then query those with raw SQL as well.

load_all / `.iter()` on `ViewRepository`

Returning all entries from a ViewRepository, not only one by ID, would be a nice addition.
I want a REST API where /query/entry/ returns all entries and /query/entry/{id}/ returns one. The latter is easy to implement with ViewRepository; the former I'm struggling to implement with cqrs-es.

Modifications of Views

How can I modify View structs reliably? If I have a View for an Aggregate with thousands of instances and I decide to add an attribute to the view struct, the new attribute will only exist on the instances that have been updated after the code change. The only way to update all view instances would be to drop the entire view table and replay every event for the view query, right?

Possibility of queries losing an event.

        let committed_events = self
            .store
            .commit(resultant_events, aggregate_context, metadata)
            .await?;
        for processor in &self.queries {
            let dispatch_events = committed_events.as_slice();
            processor.dispatch(aggregate_id, dispatch_events).await;
        }

If the program crashes between committing and dispatching to the queries, the queries might never see the events. Is this on purpose? I would expect that, to guarantee at-least-once delivery, the dispatch should be done first and the commit later, but I'm somewhat of a noob.

About event replay and read model changes

A view payload {"name": "John"} is compatible with the view

struct ProductView {
    name: String,
}

But it will be incompatible if we change this read model to

struct ProductView {
    name: String,
    price: u32
}

It will then result in DeserializationError(Error("missing field events", line: 0, column: 0)). Is it safe to assume that we should add a deserialization fallback in case it fails, or even an explicit "view upcast" (I don't even know if this is technically correct) in the same fashion as the event upcast?

Incompatible payloads will exist from the moment the new read model is deployed until the events are replayed to rebuild the views, which can take some time to be fully replayed and updated.

About event replay, I see that there is only PersistedEventRepository::get_events (by single aggregate_id) and ViewRepository::update_view (for single view). I forked the repos to add (it's still WIP):

// PersistedEventRepository
async fn get_multiple_aggregate_events<A: Aggregate>(
    &self,
    aggregate_ids: Vec<&str>,
) -> Result<HashMap<String, Vec<SerializedEvent>>, PersistenceError>;

// ViewRepository
async fn update_views(&self, views: Vec<(V, ViewContext)>) -> Result<(), PersistenceError>;

So we can batch select and update aggregates and views. What are your thoughts?

`0.4.10` Fails to compile on `1.75`

Rust now has native support for async fn in traits and implementations.

For the time being, any attempt to compile with a cleaned up Cargo.lock results in the following:

method `handle` should be async because the method from the trait is async

Minimum working example:

  1. cargo new cqrs-es-test
  2. In cqrs-es-test run cargo add cqrs-es
  3. Finally, run cargo check.

Proposed solution

Remove dependency on async_trait. It handles more edge cases than the current rustc solution, but those edge cases are likely to be resolved upstream.

Consistency over multiple commands

cqrs_a.execute(id_a, command_a).await?;
cqrs_b.execute(id_b, command_b).await?;

If the server crashes right after executing the first line, only the aggregate of cqrs_a will be in the correct state and the state of cqrs_b's aggregate will be incorrect. Is there some way of ensuring that the resultant events of both commands are either all committed, if both commands succeed, or all dropped, if one of the commands fails?

How to compose a single view from multiple aggregates where an aggregate references another aggregate?

Hi,

Given a single view composed from two aggregate roots, Vendor and Product:

pub struct VendorView {
    pub id: String,
    pub name: String,
    pub products: Vec<VendorViewProduct>,
}

pub struct VendorViewProduct {
    pub id: String,
    pub name: String,
    pub price: u32,
}

where id is the aggregate ID of each aggregate root, and the actual view_id is VendorView.id copied from the Vendor. This view stores all products of a single vendor. The Product aggregate has a vendor_id field to reference the Vendor.

I also wondered whether Product should not be an aggregate root at all, but in many scenarios/commands the Product itself is the root to be referenced, with its own stock, prices and so on. So it seems that for this case the model is easier if Vendor and Product are separate aggregates.

To dispatch events to this view from separate CQRS instances of each aggregate we implement both View<Vendor> and View<Product> for VendorView. However we have no way to pass this vendor_id as view_id down to the Query because:

pub trait Query<A: Aggregate>: Send + Sync {
    async fn dispatch(&self, aggregate_id: &str, events: &[EventEnvelope<A>]);
}

// cqrs.rs
impl<A, ES> CqrsFramework<A, ES> {
    pub async fn execute_with_metadata(...) -> Result<(), AggregateError<A::Error>> {
        // here the aggregate_id is the Product's. Ok
        let aggregate_context = self.store.load_aggregate(aggregate_id).await?;
        // .....
        for processor in &self.queries {
            let dispatch_events = committed_events.as_slice();
            // for most queries, especially the ones exclusive to the Product, this is OK
            // but for VendorView this is not OK since Product.id != VendorView.id
            processor.dispatch(aggregate_id, dispatch_events).await;
        }
    }
}

One workaround I ended up doing was:

pub trait Query<A: Aggregate>: Send + Sync {
    async fn dispatch(&self, aggregate_id: &str, events: &[EventEnvelope<A>], secondary_id: Option<&str>);
}

pub trait Aggregate: Default + Serialize + DeserializeOwned + Sync + Send {
    fn secondary_id(&self) -> Option<String> {
        None
    }
}

impl Aggregate for Product {
    fn secondary_id(&self) -> Option<String> {
        Some(self.vendor_id.clone())
    }
}

impl<A, ES> CqrsFramework<A, ES> {
    pub async fn execute_with_metadata(...) -> Result<(), AggregateError<A::Error>> {
        // ..... I had to load_aggregate again to get the aggregate with the newly applied event
        let aggregate_context = self.store.load_aggregate(aggregate_id).await?;
        let aggregate = aggregate_context.aggregate();
        let secondary_id = aggregate.secondary_id();
       
        for processor in &self.queries {
            let dispatch_events = committed_events.as_slice();
            processor
                .dispatch(aggregate_id, dispatch_events, secondary_id.as_deref())
                .await;
        }
    }
}

Thanks!

Clone trait implementation for memory store

Hi there!

When I try to test an aggregate by using a MemStore and then load the resulting aggregate, I get this error:

let event_store = MemStore::<MyAggregate>::default();
let cqrs = CqrsFramework::new(event_store, vec![], services);

cqrs.execute(
    aggregate_id,
    command1,
)
.await
.unwrap();

let aggregate = event_store.load_aggregate(aggregate_id).await.unwrap();

252 |         let event_store = MemStore::<MyAggregate>::default();
    |             ----------- move occurs because `event_store` has type `MemStore<aggregate::MyAggregate>`, which does not implement the `Copy` trait
253 |         let cqrs = CqrsFramework::new(event_store, vec![], Box::new(services));
    |                                       ----------- value moved here
...
272 |         event_store.load_aggregate(aggregate_id).await.unwrap();
    |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ value borrowed here after move

I tried to copy the source of MemStore and use derive(Clone), then I can just clone the event_store and it works without any problem.

Is this intentional, or is there a better way to handle this use case?
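
The general shape of the Clone workaround can be sketched without the crate: a store whose state lives behind an Arc can be cloned cheaply, and every clone sees the same events. SharedStore and Framework below are hypothetical stand-ins, not the real MemStore or CqrsFramework, and whether MemStore itself is (or should be) Clone is not something this sketch settles.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for an in-memory event store.
/// Cloning copies the handle (the `Arc`), not the stored events.
#[derive(Clone, Default)]
struct SharedStore {
    events: Arc<Mutex<HashMap<String, Vec<String>>>>,
}

impl SharedStore {
    fn append(&self, aggregate_id: &str, event: &str) {
        let mut events = self.events.lock().unwrap();
        events
            .entry(aggregate_id.to_string())
            .or_default()
            .push(event.to_string());
    }

    fn load(&self, aggregate_id: &str) -> Vec<String> {
        let events = self.events.lock().unwrap();
        events.get(aggregate_id).cloned().unwrap_or_default()
    }
}

/// Hypothetical stand-in for a framework that takes ownership of its store.
struct Framework {
    store: SharedStore,
}

impl Framework {
    fn new(store: SharedStore) -> Self {
        Self { store }
    }

    /// Pretends to handle a command by recording one resulting event.
    fn execute(&self, aggregate_id: &str, command: &str) {
        self.store
            .append(aggregate_id, &format!("{command}-handled"));
    }
}
```

Passing store.clone() to Framework::new keeps the original handle usable in the test, so events written through the framework can be read back afterwards.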

Thanks!

How to check for aggregate existence?

If I understand correctly, implementations of Store are intended to return Default::default() in case the aggregate is not found.
Sometimes in handling commands (say, extending someone's fitness club card, where the card is an aggregate) you have to check for the entity's existence.

Workaround: make the aggregate an enum with two variants to simulate Option.
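
A minimal sketch of that workaround, using hypothetical types (FitnessCard, CardEvent and CardError are illustrative and not from the crate). The default variant stands for "no events yet", so a command handler can reject commands sent to a card that was never issued.

```rust
#[derive(Debug, Default, PartialEq)]
enum FitnessCard {
    /// No events exist for this aggregate ID yet.
    #[default]
    NotCreated,
    Active { valid_days: u32 },
}

#[derive(Debug, PartialEq)]
enum CardEvent {
    Issued { valid_days: u32 },
    Extended { days: u32 },
}

#[derive(Debug, PartialEq)]
enum CardError {
    NotFound,
}

impl FitnessCard {
    /// Command handler: extending only makes sense for an existing card.
    fn extend(&self, days: u32) -> Result<Vec<CardEvent>, CardError> {
        match self {
            FitnessCard::NotCreated => Err(CardError::NotFound),
            FitnessCard::Active { .. } => Ok(vec![CardEvent::Extended { days }]),
        }
    }

    /// Applies an event to the current state.
    fn apply(&mut self, event: CardEvent) {
        match event {
            CardEvent::Issued { valid_days } => *self = FitnessCard::Active { valid_days },
            CardEvent::Extended { days } => {
                // Extending a card that was never issued is ignored here.
                if let FitnessCard::Active { valid_days } = self {
                    *valid_days += days;
                }
            }
        }
    }
}
```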

The case where this is needed seems very common, so I would suggest to modify the framework to satisfy these needs.
But I have almost no experience in ES/CQRS, so I may be missing something.

Propagating the timestamp through to the EventEnvelope

Hi, I'm working on some views where I want to send the traditional created_at and updated_at fields to the front end. All the data for this can be easily derived from the timestamp field, so I'd like to use it, but it isn't surfaced by the EventEnvelope. I realise that currently there is no dependency on any DateTime libraries, so this isn't a trivial change, but it seems like the alternative is to store the timestamp in each event, which is less than ideal. Do you have any other thoughts on how this can be approached?

Thanks!

Support the TestFramework inside Cucumber tests

Hi serverlesstechnology,

Once again, I'm loving the framework and intend to promote it heavily very soon!

One major thing I need to achieve before doing so, is leveraging the TestFramework inside cucumber tests. Being able to use .feature files to setup BDD test scenarios is important to effectively communicate the tests with stakeholders who cannot read code.

For example, I'd love to be able to do something like this:

#[derive(World)]
pub struct BankAccountWorld {
    test_framework: AccountTestFramework,
}

impl Default for BankAccountWorld {
    fn default() -> Self {
        let services = BankAccountServices::new(Box::<MockBankAccountServices>::default());
        BankAccountWorld {
            test_framework: AccountTestFramework::with(services),
        }
    }
}

#[given(regex = r"^I want to be able to open a bank account with ID (\d+)$")]
pub fn given_i_want_to_be_able_to_open_a_bank_account_with_id(
    world: &mut BankAccountWorld,
    account_id: String,
) {
    world.test_framework = world
        .test_framework
        .given(vec![BankAccountEvent::AccountOpened {
            account_id: (account_id),
        }]);
}

However, the methods on the test framework build an AggregateTestExecutor and consume self instead of borrowing, so any means of storing state in the World struct becomes a bit of a hacky workaround.

    ...
    #[must_use]
    pub fn given(self, events: Vec<A::Event>) -> AggregateTestExecutor<A> {
        AggregateTestExecutor {
            events,
            service: self.service,
        }
    }

Could we perhaps implement a cleaner solution so that the given, when and then methods do not have to be chained and state can be stored between calls? If you have a preference for direction, I'd be happy to contribute if you prefer not to write the code yourself 👍
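
Until something like that exists, one possible way to hold a consuming builder inside a World is to store the current stage in an enum and move it out through the mutable borrow with std::mem::replace. The types below are hypothetical stand-ins that only mirror the consuming shape of given quoted above; they are not the crate's TestFramework or AggregateTestExecutor.

```rust
/// Hypothetical stand-in for the test framework (consumed by `given`).
struct TestFramework {
    service: String,
}

/// Hypothetical stand-in for the executor returned by `given`.
struct TestExecutor {
    events: Vec<String>,
    service: String,
}

impl TestFramework {
    fn given(self, events: Vec<String>) -> TestExecutor {
        TestExecutor {
            events,
            service: self.service,
        }
    }
}

/// The stage the scenario has reached; `Empty` is only a temporary placeholder.
enum Stage {
    Fresh(TestFramework),
    Given(TestExecutor),
    Empty,
}

struct World {
    stage: Stage,
}

impl World {
    /// Step function that only has `&mut self`, as cucumber steps do.
    fn given(&mut self, events: Vec<String>) {
        // Move the current stage out, leaving the placeholder behind,
        // then store whatever the consuming call returns.
        self.stage = match std::mem::replace(&mut self.stage, Stage::Empty) {
            Stage::Fresh(framework) => Stage::Given(framework.given(events)),
            other => other, // already past the "given" stage: leave unchanged
        };
    }
}
```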

Advised way to extend view retrieval functionality?

My understanding is that the only way to use the load functionality with a View Repository is by supplying the view ID, which must be used because of the necessity to implement the ViewRepository trait (below) required by the GenericQuery struct.

#[async_trait]
pub trait ViewRepository<V, A>: Send + Sync
where
    V: View<A>,
    A: Aggregate,
{
    /// Returns the current view instance.
    async fn load(&self, view_id: &str) -> Result<Option<V>, PersistenceError>;

    /// Returns the current view instance and context, used by the `GenericQuery` to update
    /// views with committed events.
    async fn load_with_context(
        &self,
        view_id: &str,
    ) -> Result<Option<(V, ViewContext)>, PersistenceError>;

    /// Updates the view instance and context, used by the `GenericQuery` to update
    /// views with committed events.
    async fn update_view(&self, view: V, context: ViewContext) -> Result<(), PersistenceError>;
}

But what if the view_id is not known upfront? For example, if I have a table user_query, where across all rows, the email field in the payload must be unique, how should I check if user_query table contains a row where the payload's email value does not match a provided input?

Am I meant to create a user service and add such methods to that service?

Control over view_id when creating a view

Related to #90

When we use CQRS, the main idea is to segregate commands and queries, right?
In this crate we have control over the aggregate_id when we are managing commands; however, when we build a view, the view_id is always populated with the aggregate_id of a specific aggregate.

For me, this second part breaks the main advantage of CQRS. First, I should be able to use events from two different aggregates. Second (as in the problem in issue #90), in many cases the query result (let's call it a view record) will not match 1:1 with an aggregate, nor have the same ID.

image
You can see in the picture how the Query and Command models are not 1:1 (and they should not be).

Example where aggregate id is different from view id:

  • Aggregate Portfolio
  • View UserPortfoliosOverview --> aggregated view of all portfolios for a specific user

The Portfolio aggregate commands will use a generated id for identity and will add the user_id as part of the metadata.
The view will use user_id as a key, and in every PortfolioCreated event will decide if it updates the view or not.

Example with different aggregates:

  • Aggregate Portfolio
  • Aggregate Markets
  • View PortfolioUsdValueView --> it will update the portfolio value when a NewPriceEvent is emitted by Markets or a NewAssetAdded is emitted by Portfolio

This case can be managed if we consider PortfolioUsdValue as an aggregate, but in reality it is just a view (users will never execute a command against it).
In this scenario we need to be able to receive events from both the Portfolio and Markets aggregates.

What I found

When we submit a command we are able to provide 3 parameters:

  • aggregate_id
  • payload
  • metadata

cqrs.execute_with_metadata(aggregate_id, payload, metadata)

However when we write a query we can only update the payload of the view record.

Looking at the implementation of execute_with_metadata, I see why the aggregate_id becomes the view_id (this is only one specific case, not the generally expected behavior of a query entity):

pub async fn execute_with_metadata(
    &self,
    aggregate_id: &str,
    command: A::Command,
    metadata: HashMap<String, String>,
) -> Result<(), AggregateError<A::Error>> {
    ...
    for processor in &self.queries {
        let dispatch_events = committed_events.as_slice();
        processor.dispatch(aggregate_id, dispatch_events).await;
    }
    Ok(())
}

The Query trait also has a couple of things I'm not fully aligned with:

  • it expects an Aggregate type (why not a ViewRecord or a QueryEntity type?)
  • there is no way to specify or change the view_id
  • the aggregate_id is already part of EventEnvelope, so there is no need for it as a parameter of the dispatch function

Current implementation:

#[async_trait]
pub trait Query<A: Aggregate>: Send + Sync {
    /// Events will be dispatched here immediately after being committed.
    async fn dispatch(&self, aggregate_id: &str, events: &[EventEnvelope<A>]);
}

Proposal

So maybe we could change this into something like:

#[async_trait]
pub trait Query<A: ViewRecord>: Send + Sync {
    /// no need for aggregate_id parameter
    async fn dispatch(&self, events: &[EventEnvelope<A>]);
}

#[async_trait]
pub trait ViewRecord: Default + Serialize + DeserializeOwned + Sync + Send {

    fn view_record_id() -> String;
    fn apply(&mut self, event: Self::Event);
}

And to avoid disrupting the current implementation, we can provide From implementations like:

impl From<Aggregate> for ViewRecord ...

Share your thoughts; I may be missing something.

Is this crate no longer maintained?

I noticed that there is a CQRS-ES2 crate that is forked from this project; that project has made big updates about ownership and makes large changes, so should this project be considered unmaintained and should we instead be using that other crate?

I guess what I'm really asking here is if the CQRS-ES2 crate should be considered the modern replacement of this one that we should actually use instead?

I'm seeking guidance on the right CQRS crate to use for a new project.

move the demo application into this repo

The demo application should probably be moved into this repo, and this repo modified to be a cargo workspace.

This ensures that this library and the demo application are updated in lockstep.

I had a quick go at this, but ran into some subtle version issues. I suspect postgres-es would also need to be added to the workspace to prevent multiple different versions of cqrs being pulled into the dependency tree.

Defining multiple aggregates in `ApplicationState`

The cqrs-demo shows how the aggregate BankAccount is used in the ApplicationState:

pub struct ApplicationState {
    pub cqrs: Arc<PostgresCqrs<BankAccount>>,
    pub account_query: Arc<PostgresViewRepository<BankAccountView, BankAccount>>,
}

The CqrsFramework<A, ES> can only hold one single Aggregate, so I assume I need to add all my aggregates as additional fields to the ApplicationState?

pub struct ApplicationState {
    pub bank_account: Arc<PostgresCqrs<BankAccount>>,
    pub bank_account_query: Arc<PostgresViewRepository<BankAccountView, BankAccount>>,
    pub other_aggregate: Arc<PostgresCqrs<OtherAggregate>>,
    pub other_aggregate_query: Arc<PostgresViewRepository<OtherAggregateView, OtherAggregate>>,
}

Is this the intended way of doing it?

Thanks in advance!

Garbage collect old events

Hi,

I've just read the book about this repo and am really excited about it!

However I am a bit worried about the size of the event store. Is there some kind of garbage collection for old events?

For instance let's say we have a system where only a known set of agents may write commands. For some time it makes sense to store the set of commands since due to networking latency, some commands may arrive delayed. However when we received some message from all writing agents, we may safely assume that the state of our system does not change at the point where we are sure to have received all commands to that point in time. Therefore we might calculate the state at that point, take it as new "start" and delete the events before, saving a lot of memory.
Kind of like a concept called snapshots I read about, but to save storage, not computation time.

Or am I overestimating the problem of indefinitely growing event/command logs?
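
The idea described above, folding old events into a stored state and then dropping them, can be sketched as follows. Everything here is hypothetical (Stream, compact and the toy Counter state are not part of cqrs-es), and the sketch says nothing about whether the crate's own snapshot support permits deleting events. Note that discarding events also discards the history needed to build new views or audit past changes.

```rust
#[derive(Clone, Debug, Default, PartialEq)]
struct Counter {
    total: i64,
}

#[derive(Clone, Debug, PartialEq)]
struct Event {
    sequence: u64,
    delta: i64,
}

/// Hypothetical event stream for one aggregate, with a compaction point.
#[derive(Debug, Default)]
struct Stream {
    /// State as of `snapshot_sequence`.
    snapshot: Counter,
    snapshot_sequence: u64,
    /// Events with sequence > snapshot_sequence, in ascending order.
    events: Vec<Event>,
}

impl Stream {
    fn append(&mut self, delta: i64) {
        let last = self
            .events
            .last()
            .map_or(self.snapshot_sequence, |e| e.sequence);
        self.events.push(Event { sequence: last + 1, delta });
    }

    /// Current state: the snapshot plus all retained events.
    fn current(&self) -> Counter {
        let mut state = self.snapshot.clone();
        for event in &self.events {
            state.total += event.delta;
        }
        state
    }

    /// Folds every event with sequence <= `up_to` into the snapshot and drops it.
    fn compact(&mut self, up_to: u64) {
        let keep_from = self.events.partition_point(|e| e.sequence <= up_to);
        for event in self.events.drain(..keep_from) {
            self.snapshot.total += event.delta;
            self.snapshot_sequence = event.sequence;
        }
    }
}
```

Compaction leaves the current state unchanged; only the ability to replay the dropped events is lost.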

Some questions and suggestions

First some basic suggestion: please enable Discussion for this project and/or make some Matrix channel etc. for users to hang out in, ask for help and socialize. Had it been already there, I would have just asked there. :)

I wanted to start using "event processing" (write stuff down in a log, and then have other stuff follow that log and react to it) in my Rust projects. I did not specifically think about ES/CQRS, but I think what I want to achieve might fit this model.

Your crate looks like a solid, more informed base and I went through the book, read through the demo code and some of the source code and I think I can find myself here. I like that it's lightweight, to the point and supports Postgres. Great job.

I basically want to write a CI/CD bot to power my fake Rust/Nix software shop. I'm thinking - I consume github webhooks as "Commands", I track aggregates like "PR" (PR, commits in it, all the pushes, all the builds).

Q1: Are Queries eager and reactive? I mean, on every new event, can I expect the "simple query" to be called eagerly, and in its code can I emit side effects such as starting a new CI build? Is that guaranteed? The "it's useless in production" comment (https://github.com/serverlesstechnology/cqrs-demo/blob/50029766363aa0ff324ccd264bdeed532e5a236e/src/queries.rs#L12) is a bit confusing. I guess the framework itself does not keep track of which events each query has already processed, so I would have to persist some "pointer" to skip emitting side effects for events I have already reacted to (some idempotency-like considerations), but otherwise that's it? BTW, you might want to consider building a concept of a "Reactor" or something that would wrap a Query and keep track of its position in the event stream automatically; it seems rather generic.
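
To make the "Reactor" idea concrete, here is a minimal sketch under simplifying assumptions: events are (sequence, payload) pairs from a single stream, and the checkpoint is kept in memory where a real system would persist it. Reactor is a hypothetical type, not something cqrs-es provides.

```rust
/// Hypothetical wrapper that skips events it has already reacted to.
struct Reactor<F: FnMut(u64, &str)> {
    /// Highest sequence number already handled (would be persisted in practice).
    last_seen: u64,
    /// The side effect to run for each new event.
    effect: F,
}

impl<F: FnMut(u64, &str)> Reactor<F> {
    fn new(last_seen: u64, effect: F) -> Self {
        Self { last_seen, effect }
    }

    /// Runs the effect for every event newer than the checkpoint.
    fn dispatch(&mut self, events: &[(u64, &str)]) {
        for &(sequence, payload) in events {
            if sequence <= self.last_seen {
                continue; // already handled, e.g. after a redelivery
            }
            (self.effect)(sequence, payload);
            // The checkpoint advances after the effect, so a crash between the
            // two lines would repeat the effect (at-least-once behaviour).
            self.last_seen = sequence;
        }
    }
}
```

Because the checkpoint moves only after the effect has run, the effect itself should be idempotent or otherwise tolerate an occasional repeat.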

Q2: How do I approach multiple event streams including splitting between multiple "microservices"? The demos are kind of simplistic and include only one aggregate. I might just need to read more literature, since it's more of an architecture question, but if you could give me a pointer or two, I would appreciate it.

I guess I can have multiple programs using cqrs library pointing at the same DB in Postgres and one (or even more) of them writing events, and all following new events. I do realize that I can have things like CDC with Kafka/Kinesis etc. but it's kind of heavy for my tiny internal tools use, at least for now.

Anyway, thanks for your work on this project, and sooner or later I'll figure it out and maybe post some links with results.

Use CloudEvents as standard event schema

CloudEvents aims to be the universal standard for event schemas.

There is an SDK for Rust.

It would be fantastic if we could ensure the events published by the framework are stored in the database as a CloudEvent type with the data in the data field.

Additionally, I would advise using the ULID crate for the ID if you're open to this suggestion.

Aggregate Commands may need Deserialize trait

Sending events over channels is possible because DomainEvents can be serialized and deserialized before sending and after receiving.

However, in cases where a command is to be sent, a channel sending and receiving aggregate commands cannot generally be constructed and has to be handled on a case by case basis for each aggregate command type.

One way to fix this is to add a constraint to the Aggregate trait such as:

#[async_trait]
pub trait Aggregate: Default + Serialize + DeserializeOwned + Sync + Send {
    /// Specifies the inbound command used to make changes in the state of the Aggregate.
    type Command: Serialize + DeserializeOwned + fmt::Debug + Sync + Send;
...
}

See #32

Forwarding events to message broker / outbox table

Firstly, thanks for this library. What you guys are doing is so valuable!

I've followed the book guide and implemented the bank service with Postgres persistence. My next question is:
How should events be forwarded to an event broker for other services to subscribe to?

I'm aware of the outbox pattern approach. Is there any recommended way I can accomplish this?
One simple way I can think of is to have a Query that forwards events: impl Query<BankAccount> for EventForwarder.
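
A rough sketch of that forwarding idea, with loud assumptions: the Query trait below is a simplified, synchronous stand-in defined only for this example (the real trait is async and generic over the aggregate), events are plain strings, and a standard-library channel stands in for a broker client.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Simplified stand-in for the framework's `Query` trait (assumption for this sketch).
trait Query {
    fn dispatch(&self, aggregate_id: &str, events: &[String]);
}

/// Forwards every dispatched event to a channel.
/// A real forwarder would publish to a message broker instead.
struct EventForwarder {
    outbox: Sender<(String, String)>,
}

impl Query for EventForwarder {
    fn dispatch(&self, aggregate_id: &str, events: &[String]) {
        for event in events {
            // A send error means the receiver is gone; this sketch ignores it.
            let _ = self.outbox.send((aggregate_id.to_string(), event.clone()));
        }
    }
}

/// Builds a forwarder together with the receiving end of its channel.
fn forwarder() -> (EventForwarder, Receiver<(String, String)>) {
    let (outbox, inbox) = channel();
    (EventForwarder { outbox }, inbox)
}
```

This is not a transactional outbox: forwarding happens after the commit, so a crash between the two would leave a committed event unforwarded. An outbox table written in the same transaction as the events would close that gap.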

Thanks!

Events lost on error?

Looking closely at the code in the CQRS execute_with_metadata call, it looks like errors returned by the 'aggregate.handle(command)' call can never be recovered from because of the use of '?', since we are not given a way to recover.

Is this intentional? Was the intention to put a hard barrier up and not allow the processing of errors? I'm under the impression that in Rust it's generally considered an anti-pattern to use this '?' flow because you're not able to correctly deal with errors due to the forced panic. Is my understanding wrong?

This seems like an error to me, because I would want to be able to handle a recoverable error in a clean way as part of this pipeline, or at least log it cleanly to a dead letter queue or something, but the crate doesn't allow me to even add a logger to the failure, since it will panic as soon as the result is returned.
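
For reference, the `?` operator in Rust does not panic: on an `Err` it returns that error from the enclosing function, so the caller still receives a `Result` it can match on. The sketch below uses hypothetical stand-ins (`handle`, `execute`, `BankError`) rather than the crate's real types, and assumes only that the framework call returns a `Result`, as its use of `?` implies.

```rust
#[derive(Debug, PartialEq)]
pub enum BankError {
    InsufficientFunds,
}

// Hypothetical stand-in for an aggregate's command handler.
pub fn handle(balance: u64, withdraw: u64) -> Result<u64, BankError> {
    if withdraw > balance {
        return Err(BankError::InsufficientFunds);
    }
    Ok(balance - withdraw)
}

// Hypothetical stand-in for the framework call: `?` returns the error early
// to whoever called `execute`; nothing panics.
pub fn execute(balance: u64, withdraw: u64) -> Result<u64, BankError> {
    let new_balance = handle(balance, withdraw)?;
    Ok(new_balance)
}

// The caller decides how to recover, log, or route the error elsewhere.
pub fn execute_with_fallback(balance: u64, withdraw: u64) -> u64 {
    match execute(balance, withdraw) {
        Ok(new_balance) => new_balance,
        // Recover by leaving the balance unchanged.
        Err(BankError::InsufficientFunds) => balance,
    }
}
```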

Why are aggregates aware of commands?

Moved to Discussions

I just realized this should be in the discussion area, sorry about that. Feel free to delete this.

Hi there. I'm just getting started with the framework, and I'm coming from the C#/.NET world of DDD/CQRS. I am enjoying the library, but I have a question about why aggregates handle commands.


.NET World

In .NET, from my experience, the CQRS is separated from the domain/aggregates.

  \domain\
     bank_account_aggregate.rs
     bank_account_events.rs
  \application\
     deposit_funds_command.rs
     deposit_funds_command_handler.rs
  \infrastructure\
     bank_account_repository.rs
     event_store.rs
  \api\
     bank_account_controller.rs

bank_account_aggregate.rs: implements all the business logic functions and emits events. It'll open the account, handle deposits, withdraw, etc... and then return events.

deposit_funds_command_handler.rs: Handles the deposit_funds_command.rs by using the bank_account_repository.rs to get the bank account from the data store then calls the deposit function on the aggregate and then handles emitting the returned event or error.

bank_account_controller.rs: Is the HTTP endpoint that receives/validates user input then creates the command to hand off to the command handler.

The benefit of this is that I can swap out any and all 3rd party dependencies, since the domain library should have no dependencies, and the business logic isn't cluttered with external calls, since the aggregate has everything it requires, either from its state or from the function parameters.


Hopefully that helps set the context for my question. It wouldn't be too much work for me to pull the business logic out of the aggregate handler, but I just want to know the reason for the aggregate being the command handler. Perhaps in your experience you've found the command handler isn't worth it, or maybe it helps with Rust lifetimes. I'm not criticizing the choice; I just want to understand the reasoning.

Thanks!

Empty event list being propagated to queries

When you return Ok(vec![]) from an aggregate's handle() function, the aggregate's queries are being executed even though there are no events to be dispatched. You could argue that it is the query's responsibility to ignore an empty list of events, but the current implementation of GenericQuery does not do that and will instead always load a view, increment its version and save it whenever the query's dispatch() function is called, irrespective of its inputs.

I think it makes sense to change the behavior of CqrsFramework, so that it doesn't propagate any empty event lists coming from the aggregate's handle() function.
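
The proposed behavior amounts to a guard in front of the dispatch loop. The sketch below shows that guard with hypothetical, simplified types: a synchronous `Query` trait, a `CountingQuery` that stands in for a view whose version is bumped on every dispatch, and a free function `publish` in place of the framework's internals.

```rust
use std::cell::Cell;

// Hypothetical, simplified stand-in for the crate's `Query` trait.
pub trait Query<E> {
    fn dispatch(&self, aggregate_id: &str, events: &[E]);
}

// Counts dispatch calls, standing in for a view that is loaded,
// version-incremented and saved on every dispatch.
pub struct CountingQuery {
    pub dispatches: Cell<u32>,
}

impl<E> Query<E> for CountingQuery {
    fn dispatch(&self, _aggregate_id: &str, _events: &[E]) {
        self.dispatches.set(self.dispatches.get() + 1);
    }
}

// Hypothetical stand-in for the framework's publish step.
pub fn publish<E>(queries: &[&dyn Query<E>], aggregate_id: &str, events: &[E]) {
    // Guard: a command that produced no events leaves every view untouched.
    if events.is_empty() {
        return;
    }
    for query in queries {
        query.dispatch(aggregate_id, events);
    }
}
```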

Aggregate::handle

Hello 👋 , thanks for creating this crate, this is a great improvement on domain_patterns!

I'm trying to learn about the library, but am a bit confused on an aspect of the design. Could you please explain why aggregates handle their own commands in this framework? And, how can infrastructure or domain services be used within an impl of this handle method without also tying the instantiation of an aggregate instance to those services? (particularly to avoid mocking those services to test the domain objects).

In my experience, aggregates are concerned only with the business logic, whereas it is an "application service" (a separate type/service) that handles commands against a particular aggregate. In order to do that, this application service queries repositories, shares references to domain services (e.g. for a domain object to use when performing a business operation), persists new events created by business operations (and invokes event handlers in non-ES systems), etc.

Any help you could provide to clarify would be appreciated, thanks 🙂

Contribution

I like this repository so much!! I'm a Rust developer. Is there a way I can contribute? Best.

Add `append_query` method to framework

Take two aggregates, AggregateA and AggregateB running on the same server. If these are related, a query on AggregateA can be used to trigger commands on AggregateB on certain events, but the reverse can't be configured since queries are injected via the constructor.

A method should be added to CqrsFramework to append additional queries to allow this.
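
One possible shape for such a method is a builder-style call that takes the framework by value and returns it with the extra query registered. The sketch below uses hypothetical, simplified types (`Framework`, a synchronous `Query` trait, a `Recorder` query) and is not the crate's actual signature.

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Hypothetical, simplified stand-in for the crate's `Query` trait.
pub trait Query<E> {
    fn dispatch(&self, aggregate_id: &str, events: &[E]);
}

// Hypothetical, simplified stand-in for `CqrsFramework`.
pub struct Framework<E> {
    queries: Vec<Box<dyn Query<E>>>,
}

impl<E> Framework<E> {
    // Queries known up front are still injected via the constructor.
    pub fn new(queries: Vec<Box<dyn Query<E>>>) -> Self {
        Self { queries }
    }

    // Registers one more query after construction.
    pub fn append_query(mut self, query: Box<dyn Query<E>>) -> Self {
        self.queries.push(query);
        self
    }

    pub fn query_count(&self) -> usize {
        self.queries.len()
    }

    pub fn publish(&self, aggregate_id: &str, events: &[E]) {
        for query in &self.queries {
            query.dispatch(aggregate_id, events);
        }
    }
}

// Example query that records what it saw. In the scenario above it would
// instead issue a command to the other aggregate's framework.
pub struct Recorder {
    pub log: Rc<RefCell<Vec<String>>>,
}

impl Query<String> for Recorder {
    fn dispatch(&self, aggregate_id: &str, events: &[String]) {
        for event in events {
            self.log.borrow_mut().push(format!("{}:{}", aggregate_id, event));
        }
    }
}
```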

Support for multiple event tables?

Hi, thanks for all your work. I'm learning a lot about how CQRS/ES works using your module and applying it against a real world modelling scenario. I saw in your book that you say:

A single table can be used for all aggregate events and this is usually the ideal setup for development and testing. For production systems it is recommended that each aggregate have a table solely dedicated to its events.

I looked through the code, but it appears that this isn't supported at this point unless I'm missing something. I didn't see anything else in the upcoming 0.3 commit log either, but I may have missed something. Is this in development or on the roadmap? Cheers!

CQRS no ES

Is there a way to use this crate as an AggregateSource CQRS Framework without the event sourcing?

I don't need the event history but only the aggregate current state persisted in my database.

Aggregate ID unavailable in command handler

Is there a reason why the aggregate ID is not being passed to the handle function of the aggregate trait? In a lot of situations this forces the inclusion of the ID in the aggregate's commands and events, so it can be stored inside the aggregate's struct. This is not ideal though because it adds unnecessary redundancy and verbosity, and implies that the ID is mutable (which it is not). It would be very helpful if the aggregate ID was available inside the handle function because then it would be possible to e.g. query relations of the aggregate instance via the aggregate services.
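
A minimal sketch of the requested shape follows. It assumes a hypothetical `handle` that receives the aggregate ID alongside the command and services; `Customer`, `OrderLookup` and the command, event and error types are illustrative names, not part of the crate.

```rust
use std::collections::HashMap;

// Hypothetical services object: open order counts keyed by customer ID.
pub struct OrderLookup {
    pub open_orders: HashMap<String, u32>,
}

pub enum CustomerCommand {
    Close,
}

#[derive(Debug, PartialEq)]
pub enum CustomerEvent {
    Closed,
}

#[derive(Debug, PartialEq)]
pub enum CustomerError {
    HasOpenOrders(u32),
}

// The aggregate does not store its own ID.
pub struct Customer;

impl Customer {
    // Hypothetical variant of `handle` that is given the aggregate ID,
    // so the command does not need to carry it.
    pub fn handle(
        &self,
        aggregate_id: &str,
        command: CustomerCommand,
        services: &OrderLookup,
    ) -> Result<Vec<CustomerEvent>, CustomerError> {
        match command {
            CustomerCommand::Close => {
                // Look up a relation of this instance through the services.
                let open = services.open_orders.get(aggregate_id).copied().unwrap_or(0);
                if open > 0 {
                    return Err(CustomerError::HasOpenOrders(open));
                }
                Ok(vec![CustomerEvent::Closed])
            }
        }
    }
}
```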

How can I replay all events in the order they were committed?

How can I replay all events in the order they were committed?

I am storing events from multiple aggregates in the same table, such as aggregates A, B, and C. The events are presented staggered in the table as follows:

  • A::E1
  • B::E1
  • C::E1
  • A::E2
  • C::E2
  • B::E2

However, QueryReplay requires an Aggregate type and only replays events for that specific aggregate. What I want is to replay all events in the order they were committed, regardless of which aggregate they belong to.

Consider these scenarios:

  1. A::E2 has a foreign key dependency on C::E1; if I only replay aggregate A's events, it will lead to a database error.
  2. If I replay C's events after B's events and they update some common fields, there may be conflicting updates.
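
The desired replay can be sketched as follows, under one loud assumption: every stored event carries a store-wide commit counter, called `global_sequence` here. Whether the crate's event tables expose such a column is not stated above, and `StoredEvent` and `replay_in_commit_order` are hypothetical names.

```rust
// Hypothetical row shape; `global_sequence` is an assumed store-wide commit counter.
#[derive(Debug, Clone, PartialEq)]
pub struct StoredEvent {
    pub global_sequence: u64,
    pub aggregate_type: String,
    pub event_name: String,
}

impl StoredEvent {
    pub fn new(global_sequence: u64, aggregate_type: &str, event_name: &str) -> Self {
        Self {
            global_sequence,
            aggregate_type: aggregate_type.to_string(),
            event_name: event_name.to_string(),
        }
    }
}

// Replays events from all aggregates in the order they were committed,
// regardless of how they were grouped when loaded.
pub fn replay_in_commit_order<F>(mut events: Vec<StoredEvent>, mut apply: F)
where
    F: FnMut(&StoredEvent),
{
    events.sort_by_key(|event| event.global_sequence);
    for event in &events {
        apply(event);
    }
}
```

With this ordering, C::E1 is always applied before A::E2, so the foreign key dependency in the first scenario is satisfied.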

Adding Queries and Views after events were emitted.

A Query only works within a single instance unless it is persisted somewhere. GenericQuery only works correctly if it was created before the first event.

There is QueryReplay, but that replay is all or nothing. That means adding a new query after a bunch of events were emitted is needlessly hard:

  • A persisted query could end up inconsistent if the event log is replayed but some events are emitted between the replay and plugging the query into CqrsFramework.
    • When not using GenericQuery (in my case I dump everything into a table and query that table from other places), I will need to truncate the table before replaying.
  • If I want to avoid persisting the Query, then I'm left with manually replaying the entire log every time I intend to read it.
  • Deciding whether to replay is also confusing: how do you tell whether the view was materialized or not?

Moreover: say I have 2 instances that use PostgreSQL as backing for events and views. If host-1 dies before it has dispatched all events to queries, and host-2 then emits some events, the view will be incorrect since the persistent view never received the updates from host-1.

Unless I'm missing something, I think PersistedEventRepository should have methods to stream events since some arbitrary sequence. Essentially PersistedEventRepository::get_last_events, but without aggregate_id.

This would allow having non-persistent queries that can be lazily updated.
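
A lazily updated view on top of such a method could look like the sketch below. Everything in it is hypothetical: `EventLog` is an in-memory stand-in for the repository, `events_since` is the proposed method, and the store-wide `sequence` number is an assumption.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct LoggedEvent {
    pub sequence: u64, // assumed store-wide sequence number
    pub amount: i64,
}

// In-memory stand-in for the event repository.
#[derive(Default)]
pub struct EventLog {
    events: Vec<LoggedEvent>,
}

impl EventLog {
    pub fn append(&mut self, amount: i64) {
        let sequence = self.events.len() as u64 + 1;
        self.events.push(LoggedEvent { sequence, amount });
    }

    // The proposed method: all events after `sequence`, across aggregates.
    pub fn events_since(&self, sequence: u64) -> Vec<LoggedEvent> {
        self.events
            .iter()
            .filter(|event| event.sequence > sequence)
            .cloned()
            .collect()
    }
}

// Non-persistent view that remembers how far it has read.
#[derive(Debug, Default, PartialEq)]
pub struct BalanceView {
    pub last_sequence: u64,
    pub balance: i64,
}

impl BalanceView {
    // Applies only the events the view has not seen yet.
    pub fn catch_up(&mut self, log: &EventLog) {
        for event in log.events_since(self.last_sequence) {
            self.balance += event.amount;
            self.last_sequence = event.sequence;
        }
    }
}
```

Because the view tracks its own checkpoint, calling `catch_up` before each read picks up events emitted by any host, and calling it again with nothing new changes nothing.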
