Comments (8)
Ok, I will look into that. Thanks for reporting.
from akka-ddd.
I believe there is a cause for this, but I have no idea what it could be based on the information you provided.
from akka-ddd.
I noticed that the receptor has a capacity config, which defaults to 1000. I suspect the problem could be caused by the regular snapshotting interval config.
from akka-ddd.
I think I can say with certainty that it is an issue with the capacity config. I have just enlarged it myself using a custom SagaSupport and SagaManager, and the Saga started working again afterward.
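One plausible reading of this behaviour, sketched as a toy model: if a receptor keeps every event in a bounded buffer until it is confirmed, then events that are never confirmed eventually fill the buffer and nothing further is accepted. The sketch below is not akka-ddd's Receptor; `BoundedReceptor`, `CapacityDemo`, `isHandled` and the "Handled"/"Unused" event types are all hypothetical names invented for illustration, and the assumption that unused events are never confirmed is a guess, not something confirmed in this thread.

```scala
import scala.collection.mutable

// Hypothetical model, NOT akka-ddd's actual Receptor: a buffer that holds
// every event until it is confirmed and refuses new events once it is full.
final class BoundedReceptor(capacity: Int, isHandled: String => Boolean) {
  private val unconfirmed = mutable.Queue.empty[(Long, String)]
  private var position = 0L

  /** Returns true if the event was accepted, false if the buffer is full. */
  def offer(eventType: String): Boolean =
    if (unconfirmed.size >= capacity) false
    else {
      position += 1
      unconfirmed.enqueue((position, eventType))
      true
    }

  /** Confirms (and frees) only the events the saga actually handles. */
  def confirmHandled(): Unit = {
    unconfirmed.dequeueAll { case (_, eventType) => isHandled(eventType) }
    ()
  }

  def lastAcceptedPosition: Long = position
  def pending: Int = unconfirmed.size
}

object CapacityDemo {
  /** Feeds `total` events through the receptor, confirming after each one. */
  def run(capacity: Int, total: Int): BoundedReceptor = {
    // Assumption: every other event is of a type the saga does not handle.
    val receptor = new BoundedReceptor(capacity, _ == "Handled")
    (1 to total).foreach { i =>
      receptor.offer(if (i % 2 == 0) "Handled" else "Unused")
      receptor.confirmHandled()
    }
    receptor
  }

  def main(args: Array[String]): Unit = {
    val small = run(capacity = 1000, total = 5000)
    println(s"capacity 1000: last accepted position ${small.lastAcceptedPosition}, pending ${small.pending}")
    val large = run(capacity = 10000, total = 5000)
    println(s"capacity 10000: last accepted position ${large.lastAcceptedPosition}, pending ${large.pending}")
  }
}
```

In this model the small buffer stalls once 1000 unconfirmed events have piled up, while the larger one gets through all 5000 events. That matches the observation that enlarging the capacity helps, but it only postpones the stall if unconfirmed events keep accumulating.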
from akka-ddd.
If it is -1, does that mean it will never snapshot?
Here are some logs that also led me to conclude that the saga stops after processing 1000 events:
2016-05-17 12:51:02,064 [INFO] from co.styletheory.context.inventory.domain.write.InventoryWriteConfiguration$$anonfun$sagaManagerFactory$1$$anon$1 in styletheory-akka.actor.default-dispatcher-5 - Receptor-products--1941104551 subscribed to 'products' event stream from position: 1001.
2016-05-17 12:51:02,530 [INFO] from eventstore.tcp.ConnectionActor in styletheory-akka.actor.default-dispatcher-17 - Connected to eventstore/172.31.12.35:1113
2016-05-17 12:51:02,537 [INFO] from co.styletheory.context.userAndAccount.domain.write.UserWriteConfiguration$$anonfun$sagaManagerFactory$1$$anon$1 in styletheory-akka.actor.default-dispatcher-18 - Receptor-users-2137104516 subscribed to 'users' event stream from position: 4003.
I restarted the server several times, and it always reports streaming from the positions above; they never increment. Not long after, a huge list of unconfirmed deliveries also shows up in the log.
My users stream is aggregating 4 different streams, it looks like this:
fromStreams(['$ce-Product', '$ce-User', '$ce-Invoice', '$ce-Shipment']).
when({
    'co.styletheory.context.userAndAccount.domain.contracts.user.AccountCreated' : function(s,e) { linkTo('users', e); },
    'co.styletheory.context.userAndAccount.domain.contracts.user.UserRegistered' : function(s,e) { linkTo('users', e); },
    'co.styletheory.context.userAndAccount.domain.contracts.user.SubscriptionCreated' : function(s,e) { linkTo('users', e); },
    'co.styletheory.context.userAndAccount.domain.contracts.user.UserCharged' : function(s,e) { linkTo('users', e); },
    'co.styletheory.context.invoice.domain.contracts.invoice.ChargeSucceeded' : function(s,e) { linkTo('users', e); },
    'co.styletheory.context.invoice.domain.contracts.invoice.InvoiceCreatedAndChargeSucceeded' : function(s,e) { linkTo('users', e); },
    'co.styletheory.context.inventory.domain.contracts.inventory.ProductBooked': function(s,e) { linkTo('users', e); },
    'co.styletheory.context.inventory.domain.contracts.inventory.ProductBookedForSwap': function(s,e) { linkTo('users', e); },
    'co.styletheory.context.inventory.domain.contracts.inventory.ProductUnbooked': function(s,e) { linkTo('users', e); },
    'co.styletheory.context.shipping.domain.contracts.shipping.ShipmentStatusUpdated': function(s,e) { linkTo('users', e); },
    'co.styletheory.context.shipping.domain.contracts.shipping.ShippingCreated': function(s,e) { linkTo('users', e); },
    'co.styletheory.context.userAndAccount.domain.contracts.user.BoxStatusUpdated': function(s,e) { linkTo('users', e); },
    'co.styletheory.context.inventory.domain.contracts.inventory.ProductReviewed': function(s,e) { linkTo('users', e); }
});
So I thought that 4 different streams * 1000 = 4000 might explain the position reported for the users stream.
from akka-ddd.
object UserSaga {

  sealed trait UserStatus extends SagaState[UserStatus] {
    def isNew = false
  }
  case object New extends UserStatus {
    override def isNew: Boolean = true
  }
  case object Verified extends UserStatus
  case object Subscribed extends UserStatus

  implicit object UserSagaConfig extends SagaConfig[UserSaga]("users") {
    def correlationIdResolver = {
      case UserRegistered(email, securePassword, grant_type, social_id, liked, disliked) => email
      case AccountCreated(userId, accountId, firstName, lastName, owner, stripeCustomerId, status) => userId
      case SubscriptionCreated(email, userId, starting_date, billing_date, subscription_id,
                               first_name, last_name, address, city, zip, phone) => email
      case ChargeSucceeded(invoiceId, customer, subscription, email, amount, currency, charge_id) => email
      case ProductBooked(productUUID, product_id, email, user_id) => email
      case ProductBookedForSwap(productUUID, product_id, email, user_id) => email
      case ProductUnbooked(productUUID, productId, email, user_id) => email
      case ShipmentStatusUpdated(shipmentId, email, logistic_status, previous_status, shipping_status) => email
      case ShippingCreated(shippingId, address_id, address, building, unit_number, postal, city,
                           recipient_user, recipient_email, recipient_phone, item, shipping_type,
                           logistic_status, logistic_tracking_id) => recipient_email
      case BoxStatusUpdated(userId, status, logistic_status) => userId
      case ProductReviewed(productId, productParent, user_id, user_email, rate, comment) => user_email
      case UserCharged(email, subscriptionId, invoiceId, amount, currency, charge_id) => email
      case InvoiceCreatedAndChargeSucceeded(invoiceId, email, charge_id, customerId, subscriptionId,
                                            currency, subtotal, total, amountDue, periodStart, periodEnd) => email
    }
  }
}

class UserSaga(val pc: PassivationConfig, userOffice: ActorPath) extends ProcessManager[UserStatus] {

  override def officeId: OfficeId = UserSagaConfig

  startWhen {
    case registered: UserRegistered => New
  } andThen {
    case New => {
      case AccountCreated(userId, accountId, firstName, lastName, owner, stripeCustomerId, status) =>
        deliverCommand(userOffice, AddAccount(userId, accountId, stripeCustomerId))
        stay()
      case SubscriptionCreated(email, userId, starting_date, billing_date, subscription_id,
                               first_name, last_name, address, city, zip, phone) =>
        deliverCommand(userOffice, UpdateUser(email, Some(phone), Some(first_name + " " + last_name), None))
        goto(Subscribed)
      case ChargeSucceeded(invoiceId, customer, subscription, email, amount, currency, charge_id) =>
        deliverCommand(userOffice, ChargeUser(email, subscription, invoiceId, amount, currency, charge_id))
        goto(Subscribed)
      case InvoiceCreatedAndChargeSucceeded(invoiceId, email, charge_id, customerId, subscriptionId,
                                            currency, subtotal, total, amountDue, periodStart, periodEnd) =>
        deliverCommand(userOffice, ChargeUser(email, subscriptionId, invoiceId, total, currency, charge_id))
        goto(Subscribed)
      // TODO to be removed later since only subscribed user can book, or should deliver commands that produces events to cancel the booking on inventory domain
      case ProductBooked(productUUID, product_id, email, user_id) =>
        deliverCommand(userOffice, BookProduct(email, productUUID))
        stay()
      case ProductBookedForSwap(productUUID, product_id, email, user_id) =>
        deliverCommand(userOffice, BookProductForSwapPurpose(email, productUUID))
        stay()
      case ProductUnbooked(productUUID, productId, email, user_id) =>
        deliverCommand(userOffice, UnbookProduct(email, productUUID))
        stay()
    }
    case Subscribed => {
      case ProductBooked(productUUID, product_id, email, user_id) =>
        deliverCommand(userOffice, BookProduct(email, productUUID))
        stay()
      case ProductUnbooked(productUUID, productId, email, user_id) =>
        deliverCommand(userOffice, UnbookProduct(email, productUUID))
        stay()
      case ProductBookedForSwap(productUUID, product_id, email, user_id) =>
        deliverCommand(userOffice, BookProductForSwapPurpose(email, productUUID))
        stay()
      case ShippingCreated(shippingId, address_id, address, floor, unit_number, postal, city,
                           recipient_user, recipient_email, recipient_phone, item, shipping_type,
                           logistic_status, logistic_tracking_id) =>
        val command = shipping_type match {
          case "Normal" =>
            UpdateBoxShippingStatus(recipient_email, BoxStatus.PreparingForShipment.id, "Pending Pickup",
              s"$address, floor $floor unit $unit_number, $city $postal", shippingId)
          case "Return" =>
            UpdateBoxShippingStatus(recipient_email, BoxStatus.ScheduledForReturn.id, "Pending Pickup",
              s"$address, floor $floor unit $unit_number, $city $postal", shippingId)
        }
        deliverCommand(userOffice, command)
        stay()
      case ShipmentStatusUpdated(shippingId, to_user, logistic_status, previous_status, status) =>
        val boxStatus = ShippingStatus(status) match {
          case ShippingStatus.Cancelled => BoxStatus.DeliveryFailed
          case ShippingStatus.Delivered => BoxStatus.Delivered
          case ShippingStatus.Failed => BoxStatus.DeliveryFailed
          case ShippingStatus.Sending => BoxStatus.Delivering
          case ShippingStatus.Staging => BoxStatus.PreparingForShipment
        }
        deliverCommand(userOffice, UpdateBoxStatus(to_user, boxStatus.id, logistic_status))
        stay()
      case UserCharged(email, subscriptionId, invoiceId, amount, currency, charge_id) =>
        deliverCommand(userOffice, UpdateUserStatus(email, UserStatus.Subscriber.id))
        stay()
      case ChargeSucceeded(invoiceId, customer, subscription, email, amount, currency, charge_id) =>
        deliverCommand(userOffice, ChargeUser(email, subscription, invoiceId, amount, currency, charge_id))
        stay()
      case InvoiceCreatedAndChargeSucceeded(invoiceId, email, charge_id, customerId, subscriptionId,
                                            currency, subtotal, total, amountDue, periodStart, periodEnd) =>
        deliverCommand(userOffice, ChargeUser(email, subscriptionId, invoiceId, total, currency, charge_id))
        stay()
      case ProductReviewed(productId, productParent, user_id, user_email, rate, comment) =>
        deliverCommand(userOffice, UserRateProduct(user_email, productId))
        stay()
    }
  }
}
from akka-ddd.
Already fixed (see issue #24)
from akka-ddd.