Comments (11)

josevalim commented on July 17, 2024

Maybe we should add an option to resize a Broadway pipeline without shutting the producers down (everything else would be restarted though).

josevalim commented on July 17, 2024

As soon as things arrive, it starts sending them down. IIRC, prefetch_count is about how many items are in flight, not how many items to get before anything starts.

whatyouhide commented on July 17, 2024

This is similar to #5. We could allow passing an already established connection but it would require significant changes to the architecture here, because right now the producer manages disconnections and reconnections and generally the whole lifecycle of opening the connection and channel.

Also, Broadway is most often started in supervision trees, and in that case, it's unlikely you'd have the connection available already when starting the Broadway pipeline, making it a bit tricky to achieve this.

However, I do think we should consider how to improve the current situation, because yes, one connection per producer is not ideal. It would be better to have one channel per producer on top of a pool of connections, or something along those lines.

josevalim commented on July 17, 2024

Hi @hassox!

Our biggest concern with sharing connections is how to reason about failures. If we are going to do sharing, I would probably still prefer to have Broadway itself starting the pool, rather than accepting external connections.

Also, when bumping the concurrency, are you sure you have to bump the number of producers drastically? I would expect the processors to be the bottleneck. :)

hassox commented on July 17, 2024

Hey @josevalim

Fair. Is there any way that we can set up a pool for them? Tbh I'm a little confused about how the flow works. I have a large number of processors spun up atm, but I'm not seeing it move as quickly as I thought it would. There are three numbers that I think are at play:

  1. qos -> prefetch_count
  2. producer concurrency
  3. processors concurrency

I guess what I'm not clear about with these numbers is how the acks are working.

Let's say I have a prefetch count of 5, producer concurrency of 2, and processor concurrency of 100. Am I going to be able to get more than 10 concurrent processors running at a time (prefetch * producer)?

josevalim commented on July 17, 2024

Look at max_demand. By default we send 10 items to each processor. So if you have a prefetch_count of 5, it is likely that all 5 are going to the same processor. But even if you set max_demand: 1, so you send only a single item to each processor, you will have 5 * 2 items in flux at once, which means you will never get all 100 processors busy.

So you need to increase the prefetch_count and reduce max_demand. I would try setting the prefetch_count to the order of 100 or 1000.
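
To make the arithmetic concrete, here is a rough back-of-the-envelope sketch based on the numbers discussed above (my reading of the thread, not an official formula):

  # Numbers from the example in this thread:
  prefetch_count = 5
  producer_concurrency = 2

  # At most this many messages are in flight across the whole pipeline:
  max_in_flight = prefetch_count * producer_concurrency
  IO.inspect(max_in_flight)
  # => 10

  # With the default max_demand of 10, those 10 messages may all go to a
  # single processor; even with max_demand: 1, only 10 of the 100 processors
  # can be busy at once. Hence: raise prefetch_count, lower max_demand.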

hassox commented on July 17, 2024

Thank you!

So, just so I'm clear, would this work out?

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {
          BroadwayRabbitMQ.Producer,
          queue: MainApp.Amqp.Queues.transformed_queue(),
          connection: connection_options(),
          qos: [
            prefetch_count: 100
          ]
        },
        concurrency: 2
      ],
      processors: [
        default: [
          max_demand: 1,
          concurrency: 200
        ]
      ]
    )
  end

Would it wait until it had 100 to fetch, or will it take anything up to 100 that is available?

Thanks for taking the time to explain this mate.

fcevado commented on July 17, 2024

I think my issue relates to this one, but if it's required I'll open a new issue.
When I set the concurrency option for BroadwayRabbitMQ.Producer, it opens one connection per concurrent producer, which is not a good approach for RabbitMQ usage. From a CloudAMQP blog post:

Use long-lived connection
Each channel consumes a relatively small amount of memory on the client, compared to a connection. Too many connections can be a heavy burden on the RabbitMQ server memory usage. Try to keep long-lived connections and instead open and close channels more frequently, if required.
We recommend that each process only creates one TCP connection and uses multiple channels in that connection for different threads.
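
For reference, the "one connection, many channels" pattern the quote recommends looks roughly like this with the amqp Elixir library (a minimal sketch; the connection URL is assumed and error handling is omitted):

  # One long-lived TCP connection for the whole application...
  {:ok, connection} = AMQP.Connection.open("amqp://guest:guest@localhost")

  # ...and cheap channels opened on top of it, one per worker that needs one.
  {:ok, channel_a} = AMQP.Channel.open(connection)
  {:ok, channel_b} = AMQP.Channel.open(connection)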

I understand that providing a static connection for the Broadway pipeline to use is not good either, because in case of disconnection the pipeline would need to be stopped and restarted manually.

I think a good approach could be to pass a callback function as the :connection option so that BroadwayRabbitMQ.Producer can check out a channel. This way someone using this producer can choose the best way to handle those resources, and the library has a way to properly deal with failures. I think the callback function could return channel() | :retry | :stop, and it would only retry to reacquire a channel if the :backoff_strategy is not set to :stop.
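
To illustrate the shape of that proposal (purely hypothetical; this option does not exist in broadway_rabbitmq, and MyApp.ChannelPool is a stand-in for whatever pool the user runs), the callback could look something like:

  # Hypothetical checkout callback for the proposed :connection option.
  checkout_channel = fn ->
    case MyApp.ChannelPool.checkout() do
      {:ok, channel} -> channel      # reuse a channel on the shared connection
      {:error, :closed} -> :retry    # ask the producer to back off and retry
      {:error, :shutdown} -> :stop   # give up and stop the producer
    end
  end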

josevalim commented on July 17, 2024

Yes, it is related. Before we get to this point though, have you confirmed that bumping the producer concurrency is beneficial for you? Most times the bottleneck is not in the producer. In any case, a PR around this area would be welcome.

fcevado commented on July 17, 2024

@josevalim I'm trying to come up with a proper solution to scale up the consumption rate when messages pile up on a queue. The possible ways to handle it that I see are:

  • increase the number of consumers (producers from Broadway's perspective)
  • increase the processor concurrency

Broadway doesn't allow me to increase the concurrency of processors after the pipeline has started, so my solution would be starting a new pipeline with a higher processor concurrency.
I'm thinking of a solution like this (a rough sketch follows the list):

  • start a worker that handles connection and channel checkout
  • set up the Broadway pipelines under a DynamicSupervisor
  • whenever a certain threshold of ready messages is reached, I can start a new pipeline, reusing the connection already open to RabbitMQ
  • when the number of ready messages drops back below this threshold, I can stop the children of the DynamicSupervisor so I don't put pressure on RabbitMQ
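
A rough sketch of that idea, assuming a hypothetical MyApp.Pipeline module (a Broadway pipeline whose name and processor concurrency are configurable) and a plain DynamicSupervisor named MyApp.PipelineSupervisor started elsewhere in the supervision tree:

  # Hypothetical scaling helper; module and option names are made up for
  # this sketch and are not part of broadway_rabbitmq.
  defmodule MyApp.PipelineScaler do
    # Start one extra pipeline instance under the DynamicSupervisor.
    def scale_up(n) do
      child = {MyApp.Pipeline, name: :"pipeline_#{n}", processor_concurrency: 200}
      DynamicSupervisor.start_child(MyApp.PipelineSupervisor, child)
    end

    # Stop a previously started pipeline once the backlog has drained.
    def scale_down(pid) do
      DynamicSupervisor.terminate_child(MyApp.PipelineSupervisor, pid)
    end
  end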

whatyouhide commented on July 17, 2024

This is now possible with #118. ✅
