Comments (11)
Maybe we should add an option to resize a Broadway pipeline without shutting the producers down (everything else would be restarted, though).
from broadway_rabbitmq.
As soon as things arrive, it starts sending them down. IIRC, prefetch_count is about how many items are in flight, not how many items to get before anything starts.
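A rough way to picture the prefetch_count behaviour described above: the broker keeps delivering while the number of unacknowledged messages on a consumer's channel is below prefetch_count, so it never waits for the window to fill. The module below is a hypothetical, pure-Elixir model of that rule (not broadway_rabbitmq or AMQP client code). It assumes per-consumer prefetch and a positive prefetch_count.

```elixir
defmodule PrefetchWindow do
  @moduledoc """
  Hypothetical model of the AMQP basic.qos prefetch window: how many more
  messages the broker may push to a consumer right now.
  Not part of broadway_rabbitmq.
  """

  # ready:    messages waiting in the queue
  # unacked:  messages already delivered but not yet acknowledged
  # prefetch: the prefetch_count set via qos
  @spec deliverable(non_neg_integer(), non_neg_integer(), pos_integer()) :: non_neg_integer()
  def deliverable(ready, unacked, prefetch) do
    # Delivery is limited by free slots in the window, never by a minimum batch size.
    free_slots = max(prefetch - unacked, 0)
    min(ready, free_slots)
  end
end

# Only 3 messages are ready: all 3 can be sent right away, no waiting for 100.
PrefetchWindow.deliverable(3, 0, 100)
# => 3
```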
This is similar to #5. We could allow passing an already established connection, but it would require significant changes to the architecture here, because right now the producer manages disconnections and reconnections and, generally, the whole lifecycle of opening the connection and channel.
Also, Broadway is most often started in supervision trees, and in that case, it's unlikely you'd have the connection available already when starting the Broadway pipeline, making it a bit tricky to achieve this.
However, I think we should think about how to improve the situation now, because one connection per producer is indeed not ideal; it would be better to have one channel per producer and a pool of connections, or something like that.
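One way to picture "one channel per producer and a pool of connections": N producers share a smaller, fixed set of connections, and each producer opens its own channel on the connection it is assigned. The sketch below only computes a hypothetical round-robin assignment in plain Elixir; the module name, function name and pool_size are illustrative, and it does not open any AMQP connections or reflect broadway_rabbitmq's actual design.

```elixir
defmodule ConnectionAssignment do
  @moduledoc """
  Hypothetical sketch: assign each producer (by index) to one of a fixed
  pool of connections, round-robin. Each producer would then open its own
  channel on that connection.
  """

  @spec assign(pos_integer(), pos_integer()) :: %{non_neg_integer() => non_neg_integer()}
  def assign(producer_count, pool_size) when producer_count > 0 and pool_size > 0 do
    # Producer i uses connection rem(i, pool_size).
    Map.new(0..(producer_count - 1), fn i -> {i, rem(i, pool_size)} end)
  end
end

ConnectionAssignment.assign(5, 2)
# => %{0 => 0, 1 => 1, 2 => 0, 3 => 1, 4 => 0}
```

With 5 producers and 2 connections, the broker sees 2 TCP connections carrying 5 channels instead of 5 connections.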
Hi @hassox!
Our biggest concern with sharing connections is how to reason about failures. If we do go for sharing, I would still prefer to have Broadway itself start the pool, rather than accepting external connections.
Also, when bumping the concurrency, are you sure you have to bump the number of producers drastically? I would expect the processors to be the bottleneck. :)
Hey @josevalim
Fair. Is there any way we can set up a pool for them? To be honest, I'm a little confused about how the flow works. I have a large number of processors spun up at the moment, but I'm not seeing messages move as quickly as I thought they would. There are three numbers that I think are at play:
- qos -> prefetch_count
- producer concurrency
- processors concurrency
I guess what I'm not clear about is how the acks work with these numbers.
Let's say I have a prefetch count of 5, producer concurrency of 2 and processor concurrency of 100. Will I be able to get more than 10 processors (prefetch * producers) running at a time?
Look at max_demand. By default, we send 10 items to each processor, so with a prefetch_count of 5 it is likely that all 5 go to the same processor. But even if you set max_demand: 1, so that each processor receives only a single item, you will have 5 * 2 items in flight at once, which means you will never get all 100 processors busy.
So you need to increase prefetch_count and reduce max_demand. I would try setting prefetch_count to the order of 100 or 1000.
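The arithmetic above can be written down as a small pure function. This is a hypothetical back-of-the-envelope helper, not a Broadway API. It assumes each producer's channel holds at most prefetch_count unacknowledged messages, and it only gives an upper bound on busy processors, since with max_demand above 1 messages can be batched onto fewer processors.

```elixir
defmodule InFlight do
  @moduledoc """
  Hypothetical helper relating prefetch_count, producer concurrency and
  processor concurrency. Not part of Broadway or broadway_rabbitmq.
  """

  @doc "Messages that can be unacknowledged at once across all producers."
  def in_flight(prefetch_count, producer_concurrency),
    do: prefetch_count * producer_concurrency

  @doc """
  Upper bound on processors busy at the same time: each busy processor
  holds at least one in-flight message. With max_demand > 1 the real
  number may be lower, because messages get batched per processor.
  """
  def max_busy_processors(prefetch_count, producer_concurrency, processor_concurrency) do
    min(processor_concurrency, in_flight(prefetch_count, producer_concurrency))
  end
end

# The scenario from the question: prefetch 5, 2 producers, 100 processors.
InFlight.max_busy_processors(5, 2, 100)
# => 10

# prefetch 100, 2 producers, 200 processors: every processor can be busy.
InFlight.max_busy_processors(100, 2, 200)
# => 200
```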
Thank you!
So, just so I'm clear, would this work?
```elixir
def start_link(_opts) do
  Broadway.start_link(__MODULE__,
    name: __MODULE__,
    producer: [
      module: {
        BroadwayRabbitMQ.Producer,
        queue: MainApp.Amqp.Queues.transformed_queue(),
        connection: connection_options(),
        qos: [
          prefetch_count: 100
        ]
      },
      concurrency: 2
    ],
    processors: [
      default: [
        max_demand: 1,
        concurrency: 200
      ]
    ]
  )
end
```
Would it wait until it had 100 messages to fetch, or will it take anything up to 100 that is available?
Thanks for taking the time to explain this, mate.
I think my issue relates to this one, but if needed I'll open a new issue.
When I set the concurrency option for BroadwayRabbitMQ.Producer, it opens one connection per concurrent producer, which is not a good approach for RabbitMQ usage. From a CloudAMQP blog post:
Use long-lived connection
Each channel consumes a relatively small amount of memory on the client, compared to a connection. Too many connections can be a heavy burden on the RabbitMQ server memory usage. Try to keep long-lived connections and instead open and close channels more frequently, if required.
We recommend that each process only creates one TCP connection and uses multiple channels in that connection for different threads.
I understand that providing a static connection for the Broadway pipeline to use is not good either, because in case of disconnection the pipeline would need to be stopped and restarted manually.
I think a good approach could be to pass a callback function in the :connection option so that BroadwayRabbitMQ.Producer can check out a channel. This way, users of this producer can choose how best to manage those resources, and the library still has a way to deal with failures properly. The callback function could return channel() | :retry | :stop, and the producer would only retry acquiring a channel if :backoff_strategy is not set to :stop.
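To make the proposal concrete, here is a hypothetical sketch of how a producer might interpret the callback's result. None of these names exist in broadway_rabbitmq: the module, next_action/2, the action atoms, and treating the backoff setting as a plain atom (for example :exp, or :stop) are all assumptions for illustration.

```elixir
defmodule ChannelCheckout do
  @moduledoc """
  Hypothetical sketch of the proposed :connection callback contract.
  The callback returns a channel, :retry or :stop; next_action/2 decides
  what a producer would do next. Not an actual broadway_rabbitmq API.
  """

  @type checkout_result :: :retry | :stop | term()
  @type action :: {:use, term()} | :schedule_retry | :stop

  @spec next_action(checkout_result(), atom()) :: action()
  # The callback asked the producer to give up.
  def next_action(:stop, _backoff_strategy), do: :stop

  # Retry only when the backoff setting is something other than :stop.
  def next_action(:retry, :stop), do: :stop
  def next_action(:retry, _backoff_strategy), do: :schedule_retry

  # Anything else is treated as a checked-out channel.
  def next_action(channel, _backoff_strategy), do: {:use, channel}
end

ChannelCheckout.next_action(:retry, :exp)
# => :schedule_retry
ChannelCheckout.next_action(:retry, :stop)
# => :stop
```

Keeping this decision in a pure function leaves the pooling strategy entirely to the user's callback, while the producer keeps ownership of the retry and failure handling, which matches the concern about reasoning about failures raised earlier in the thread.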
Yes, it is related. Before we get to that point, though, have you confirmed that bumping the producer concurrency is beneficial for you? Most of the time the bottleneck is not in the producer. In any case, a PR around this area would be welcome.
@josevalim I'm trying to come up with a proper solution to scale up the consumption rate when messages pile up on a queue. The possible ways I see to handle it are:
- increase the number of consumers (producers, from Broadway's perspective)
- increase the processors' concurrency
Broadway doesn't allow me to increase processor concurrency after the pipeline has started. My solution would be to start a new pipeline with a higher processor concurrency.
I'm thinking of a solution like this:
- start a worker that handles the connection and channel checkout
- set up the Broadway pipeline under a DynamicSupervisor
- whenever a certain threshold of ready messages is reached, start a new pipeline, reusing the connection already open to RabbitMQ
- when the number of ready messages falls below this threshold, stop the children of the DynamicSupervisor so I don't put pressure on RabbitMQ
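The threshold logic in the plan above can be isolated into a pure function that a monitoring process could call periodically. This is a hypothetical sketch: the module and function names, the separate high/low thresholds (a gap between them avoids starting and stopping pipelines on every small fluctuation), and the max_pipelines cap are assumptions, not anything Broadway provides. Actually starting or stopping a pipeline would be done by the caller, for example with DynamicSupervisor.start_child/2 and DynamicSupervisor.terminate_child/2.

```elixir
defmodule PipelineScaler do
  @moduledoc """
  Hypothetical scaling decision for extra Broadway pipelines, based on the
  number of ready messages in the queue. Pure function; the caller performs
  the actual DynamicSupervisor start/terminate.
  """

  @type decision :: :start_pipeline | :stop_pipeline | :noop

  @spec decide(non_neg_integer(), non_neg_integer(), keyword()) :: decision()
  def decide(ready, extra_pipelines, opts) do
    high = Keyword.fetch!(opts, :high)
    low = Keyword.fetch!(opts, :low)
    cap = Keyword.get(opts, :max_pipelines, 4)

    cond do
      # Backlog is large: add a pipeline, up to the cap.
      ready >= high and extra_pipelines < cap -> :start_pipeline
      # Backlog drained: remove an extra pipeline to relieve RabbitMQ.
      ready <= low and extra_pipelines > 0 -> :stop_pipeline
      true -> :noop
    end
  end
end

opts = [high: 10_000, low: 1_000, max_pipelines: 4]
PipelineScaler.decide(25_000, 0, opts)  # => :start_pipeline
PipelineScaler.decide(500, 2, opts)     # => :stop_pipeline
PipelineScaler.decide(5_000, 2, opts)   # => :noop
```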
This is now possible with #118. ✅