
Comments (8)

pawelkaczor commented on June 3, 2024

Ok, I will look into that. Thanks for reporting.

from akka-ddd.

pawelkaczor commented on June 3, 2024

I believe there is a cause for this, but I have no idea what it could be (based on the information you provided).


hengkysucanda commented on June 3, 2024

I noticed that the receptor has a capacity config, which defaults to 1000. I suspect the problem could be caused by the regular snapshotting interval config.


hengkysucanda commented on June 3, 2024

I think I can say with certainty that it is an issue with the capacity config. I have just enlarged it myself using a custom SagaSupport and SagaManager, and the Saga started working again afterward.


pawelkaczor commented on June 3, 2024

interval = config.capacity) // TODO: snapshoting interval should be configured independently of the receptor capacity ?
Perhaps the default snapshotting interval should be set to capacity - 1 (capacity minus one)...


hengkysucanda commented on June 3, 2024

If it is -1, does that mean it will never snapshot?

Here are some logs that also led me to conclude that the saga stops after processing 1000 events:

2016-05-17 12:51:02,064 [INFO] from co.styletheory.context.inventory.domain.write.InventoryWriteConfiguration$$anonfun$sagaManagerFactory$1$$anon$1 in styletheory-akka.actor.default-dispatcher-5 - Receptor-products--1941104551 subscribed to 'products' event stream from position: 1001.
2016-05-17 12:51:02,530 [INFO] from eventstore.tcp.ConnectionActor in styletheory-akka.actor.default-dispatcher-17 - Connected to eventstore/172.31.12.35:1113
2016-05-17 12:51:02,537 [INFO] from co.styletheory.context.userAndAccount.domain.write.UserWriteConfiguration$$anonfun$sagaManagerFactory$1$$anon$1 in styletheory-akka.actor.default-dispatcher-18 - Receptor-users-2137104516 subscribed to 'users' event stream from position: 4003.

I restarted the server several times, and each time it reports streaming from the positions above; they never increment. Not long after, a huge list of unconfirmed deliveries also shows up in the log.
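
To compare the start positions across restarts, the subscription lines can be pulled out of the log. This is a minimal sketch that assumes the log format shown above; `parseSubscription` is a hypothetical helper, not part of akka-ddd.

```javascript
// Hypothetical helper: extracts the receptor name, stream name and start
// position from a "subscribed to ... event stream" log line.
// Returns null when the line is not a subscription line.
function parseSubscription(line) {
  var pattern = /(Receptor-\S+) subscribed to '([^']+)' event stream from position: (\d+)/;
  var match = pattern.exec(line);
  if (!match) {
    return null;
  }
  return {
    receptor: match[1],
    stream: match[2],
    position: Number(match[3])
  };
}
```

Running this over the logs of two consecutive restarts and comparing `position` per stream would show whether the receptor ever advances.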

My users stream aggregates 4 different streams; the projection looks like this:

fromStreams(['$ce-Product', '$ce-User', '$ce-Invoice', '$ce-Shipment']).
when({
    'co.styletheory.context.userAndAccount.domain.contracts.user.AccountCreated' : function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.userAndAccount.domain.contracts.user.UserRegistered' : function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.userAndAccount.domain.contracts.user.SubscriptionCreated' : function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.userAndAccount.domain.contracts.user.UserCharged' : function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.invoice.domain.contracts.invoice.ChargeSucceeded' : function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.invoice.domain.contracts.invoice.InvoiceCreatedAndChargeSucceeded' : function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.inventory.domain.contracts.inventory.ProductBooked': function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.inventory.domain.contracts.inventory.ProductBookedForSwap': function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.inventory.domain.contracts.inventory.ProductUnbooked': function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.shipping.domain.contracts.shipping.ShipmentStatusUpdated': function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.shipping.domain.contracts.shipping.ShippingCreated': function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.userAndAccount.domain.contracts.user.BoxStatusUpdated': function(s,e) {
        linkTo('users', e);
    },
    'co.styletheory.context.inventory.domain.contracts.inventory.ProductReviewed': function(s,e) {
        linkTo('users', e);
    }
});
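
Since every handler above does the same thing, the handler map could also be generated from a list of event type names. This is only a sketch: `buildLinkHandlers` is a hypothetical helper, the event list is abbreviated to two entries for illustration, and it assumes the projection runtime accepts a handler object built at load time.

```javascript
// Hypothetical helper (not part of the Event Store projection API): builds a
// `when` handler map that links every listed event type to one target stream.
function buildLinkHandlers(eventTypes, targetStream, linkToFn) {
  var handlers = {};
  eventTypes.forEach(function (eventType) {
    handlers[eventType] = function (state, event) {
      linkToFn(targetStream, event);
    };
  });
  return handlers;
}

// Abbreviated for illustration; the real projection would list all 13 types.
var userEventTypes = [
  'co.styletheory.context.userAndAccount.domain.contracts.user.AccountCreated',
  'co.styletheory.context.userAndAccount.domain.contracts.user.UserRegistered'
];

// fromStreams and linkTo only exist inside the projection runtime, so the
// registration is guarded to keep the snippet loadable elsewhere.
if (typeof fromStreams === 'function') {
  fromStreams(['$ce-Product', '$ce-User', '$ce-Invoice', '$ce-Shipment'])
    .when(buildLinkHandlers(userEventTypes, 'users', linkTo));
}
```

This does not change what ends up in the users stream; it only removes the repetition, so it has no bearing on the stalled position itself.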

So I thought that 4 source streams * 1000 = 4000, which might explain the position (4003) reported for the users stream.


hengkysucanda commented on June 3, 2024

object UserSaga {
  sealed trait UserStatus extends SagaState[UserStatus] {
    def isNew = false
  }
  case object New extends UserStatus {
    override def isNew: Boolean = true
  }
  case object Verified extends UserStatus
  case object Subscribed extends UserStatus
  implicit object UserSagaConfig extends SagaConfig[UserSaga]("users") {
    def correlationIdResolver = {
      case UserRegistered(email, securePassword, grant_type, social_id, liked, disliked) => email
      case AccountCreated(userId, accountId, firstName, lastName, owner, stripeCustomerId, status) => userId
      case SubscriptionCreated(email, userId, starting_date, billing_date, subscription_id,  first_name, last_name, address, city, zip, phone) => email
      case ChargeSucceeded(invoiceId, customer, subscription, email, amount, currency, charge_id)  => email
      case ProductBooked(productUUID, product_id, email, user_id) => email
      case ProductBookedForSwap(productUUID, product_id, email, user_id) => email
      case ProductUnbooked(productUUID, productId, email, user_id) => email
      case ShipmentStatusUpdated(shipmentId, email, logistic_status, previous_status, shipping_status) => email
      case ShippingCreated(shippingId, address_id, address, building, unit_number, postal, city, recipient_user, recipient_email, recipient_phone, item, shipping_type, logistic_status, logistic_tracking_id) => recipient_email
      case BoxStatusUpdated(userId, status, logistic_status) => userId
      case ProductReviewed(productId, productParent, user_id, user_email, rate, comment) => user_email
      case UserCharged(email, subscriptionId, invoiceId, amount, currency, charge_id) => email
      case InvoiceCreatedAndChargeSucceeded(invoiceId, email, charge_id, customerId, subscriptionId, currency, subtotal, total, amountDue, periodStart, periodEnd) => email
    }
  }
}
class UserSaga(val pc: PassivationConfig, userOffice: ActorPath) extends ProcessManager[UserStatus] {
  override def officeId: OfficeId = UserSagaConfig
  startWhen {
    case registered: UserRegistered => New
  } andThen {
    case New => {
      case AccountCreated(userId, accountId, firstName, lastName, owner, stripeCustomerId, status) =>
        deliverCommand(userOffice, AddAccount(userId, accountId, stripeCustomerId))
        stay()
      case SubscriptionCreated(email, userId, starting_date, billing_date, subscription_id,  first_name, last_name, address, city, zip, phone) =>
        deliverCommand(userOffice, UpdateUser(email, Some(phone), Some(first_name + " " + last_name), None))
        goto(Subscribed)
      case ChargeSucceeded(invoiceId, customer, subscription, email, amount, currency, charge_id) =>
        deliverCommand(userOffice, ChargeUser(email, subscription, invoiceId, amount, currency, charge_id))
        goto(Subscribed)
      case InvoiceCreatedAndChargeSucceeded(invoiceId, email, charge_id, customerId, subscriptionId, currency, subtotal, total, amountDue, periodStart, periodEnd) =>
        deliverCommand(userOffice, ChargeUser(email, subscriptionId, invoiceId, total, currency, charge_id))
        goto(Subscribed)
      // TODO: to be removed later since only a subscribed user can book, or should deliver commands that produce events to cancel the booking on the inventory domain
      case ProductBooked(productUUID, product_id, email, user_id) =>
        deliverCommand(userOffice, BookProduct(email, productUUID))
        stay()
      case ProductBookedForSwap(productUUID, product_id, email, user_id) =>
        deliverCommand(userOffice, BookProductForSwapPurpose(email, productUUID))
        stay()
      case ProductUnbooked(productUUID, productId, email, user_id) =>
        deliverCommand(userOffice, UnbookProduct(email, productUUID))
        stay()
    }
    case Subscribed => {
      case ProductBooked(productUUID, product_id, email, user_id) =>
        deliverCommand(userOffice, BookProduct(email, productUUID))
        stay()
      case ProductUnbooked(productUUID, productId, email, user_id) =>
        deliverCommand(userOffice, UnbookProduct(email, productUUID))
        stay()
      case ProductBookedForSwap(productUUID, product_id, email, user_id) =>
        deliverCommand(userOffice, BookProductForSwapPurpose(email, productUUID))
        stay()
      case ShippingCreated(shippingId, address_id, address, floor, unit_number, postal, city, recipient_user, recipient_email, recipient_phone, item, shipping_type, logistic_status, logistic_tracking_id) =>
        val command = shipping_type match {
          case "Normal" => UpdateBoxShippingStatus(recipient_email, BoxStatus.PreparingForShipment.id, "Pending Pickup", s"$address, floor $floor unit $unit_number, $city $postal", shippingId)
          case "Return" => UpdateBoxShippingStatus(recipient_email, BoxStatus.ScheduledForReturn.id, "Pending Pickup", s"$address, floor $floor unit $unit_number, $city $postal", shippingId)
        }
        deliverCommand(userOffice, command)
        stay()
      case ShipmentStatusUpdated(shippingId, to_user, logistic_status, previous_status, status) =>
        val boxStatus = ShippingStatus(status) match {
          case ShippingStatus.Cancelled => BoxStatus.DeliveryFailed
          case ShippingStatus.Delivered => BoxStatus.Delivered
          case ShippingStatus.Failed => BoxStatus.DeliveryFailed
          case ShippingStatus.Sending => BoxStatus.Delivering
          case ShippingStatus.Staging => BoxStatus.PreparingForShipment
        }
        deliverCommand(userOffice, UpdateBoxStatus(to_user, boxStatus.id, logistic_status))
        stay()
      case UserCharged(email, subscriptionId, invoiceId, amount, currency, charge_id) =>
        deliverCommand(userOffice, UpdateUserStatus(email, UserStatus.Subscriber.id))
        stay()
      case ChargeSucceeded(invoiceId, customer, subscription, email, amount, currency, charge_id) =>
        deliverCommand(userOffice, ChargeUser(email, subscription, invoiceId, amount, currency, charge_id))
        stay()
      case InvoiceCreatedAndChargeSucceeded(invoiceId, email, charge_id, customerId, subscriptionId, currency, subtotal, total, amountDue, periodStart, periodEnd) =>
        deliverCommand(userOffice, ChargeUser(email, subscriptionId, invoiceId, total, currency, charge_id))
        stay()
      case ProductReviewed(productId, productParent, user_id, user_email, rate, comment) =>
        deliverCommand(userOffice, UserRateProduct(user_email, productId))
        stay()
    }
  }
}


pawelkaczor commented on June 3, 2024

Already fixed (see issue #24)

