dashbitco / broadway_rabbitmq
A Broadway producer for RabbitMQ
Hi,
I'm happy to be working on an academic project involving criminal justice reform using Elixir and Broadway RabbitMQ!
This started out as an issue but as I did more and more research, it is more of an example.
However it is sort of an issue, in that it's not totally clear how dead-letter configuration works in the docs, and I think it would be helpful to add an example.
Here is the code--I wanted to share a method so that others may benefit. Happy to create a PR if that's helpful as well.
Thanks!
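This will send failed messages to the my_queue_error queue on the default exchange.
def declare_queues do
  {:ok, connection} = AMQP.Connection.open()
  {:ok, channel} = AMQP.Channel.open(connection)

  # Queue that receives the dead-lettered messages.
  {:ok, _} = AMQP.Queue.declare(channel, "my_queue_error", durable: true)

  # Messages rejected from my_queue without requeue are republished to the
  # default exchange ("") with the routing key "my_queue_error".
  {:ok, _} =
    AMQP.Queue.declare(channel, "my_queue",
      durable: true,
      arguments: [
        {"x-dead-letter-exchange", :longstr, ""},
        {"x-dead-letter-routing-key", :longstr, "my_queue_error"}
      ]
    )

  # Declarations are done; release the setup connection.
  AMQP.Connection.close(connection)
end

def handle_message(_, message, _) do
  try do
    # Decode the payload (message.data), not the %Broadway.Message{} itself.
    data = Jason.decode!(message.data, keys: :atoms)
    do_something_with!(data)
    message
  rescue
    # Jason.DecodeError is not a RuntimeError, so rescue any exception.
    e ->
      Broadway.Message.failed(message, Exception.message(e))
  end
end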
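The queue arguments above only take effect when a failed message is rejected without being requeued. A sketch of matching producer options follows. It assumes a broadway_rabbitmq version whose :declare option forwards :arguments to AMQP.Queue.declare/3, and whose :on_failure accepts :reject (as used elsewhere on this page). This is a config fragment for Broadway.start_link, not a complete pipeline.

```elixir
producer: [
  module:
    {BroadwayRabbitMQ.Producer,
     queue: "my_queue",
     # Redeclaring a queue with different arguments fails with
     # PRECONDITION_FAILED, so these must match declare_queues/0.
     declare: [
       durable: true,
       arguments: [
         {"x-dead-letter-exchange", :longstr, ""},
         {"x-dead-letter-routing-key", :longstr, "my_queue_error"}
       ]
     ],
     # :reject does not requeue, so RabbitMQ dead-letters the failed
     # message to my_queue_error instead of redelivering it.
     on_failure: :reject}
]
```

With :reject_and_requeue instead, the message would go back to my_queue rather than to the dead-letter queue.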
Hi there... I'm wondering if there's a way to pass in an already established connection to use for creating channels. (Please excuse my ignorance.)
When I bump up the concurrency it makes a new connection for each producer rather than creating a new channel on an existing connection. I'm getting close to maxing out my connections and wondering if there's a way to pass in an MFA to get an established connection to make a channel from.
It would be great if you could provide an updated release of broadway_rabbitmq that depends on the newest Broadway version.
Also, while the current version of broadway_rabbitmq is 0.3.0, there is no GitHub release for it, and the README suggests version ~> 0.1.0 as the dependency.
Hi guys, first I would like to thank you for this excellent library.
In my company we have many services that connect to a broker called ActiveMQ Artemis. Among other protocols, this broker supports version 1.0 of the AMQP protocol. What I would like to know is whether it would be possible to use this library to consume messages from that broker. I know that versions 0.9 and 1.0 of the AMQP protocol are completely different, but since RabbitMQ itself supports version 1.0 via a plugin, I was hopeful that this library could work in my use case.
I would love to replace the current services written in C# by Elixir but I am encountering this barrier of protocols. I cannot change the Broker as it is also used by partners.
Note: I tried to use the MQTT Broadway library, but without success.
Any help?
The Broadway docs for the Broadway.prepare_messages/2 callback state:
The length of the list of messages received by this callback is based on the min_demand/max_demand configuration in the processor.
However, in our application using BroadwayRabbitMQ.Producer, the list of messages always seems to contain exactly one message, regardless of the min_demand/max_demand configuration in the processor. It also doesn't seem to matter whether the RabbitMQ queue holds only a few messages or very many. Whenever I call IO.inspect(length(messages), label: "prepare_messages batch size") in my prepare_messages callback, it always prints prepare_messages batch size: 1.
Is this a bug in BroadwayRabbitMQ.Producer? Or have I misconfigured something? Or is there something else happening here that I'm not correctly understanding?
For reference, here is the Broadway topology for the pipeline in question:
[
  producers: [
    %{
      name: Integration.MQ.TrackingData.PlayerTrackingConsumer.Broadway.Producer,
      concurrency: 1
    }
  ],
  processors: [
    %{
      name: Integration.MQ.TrackingData.PlayerTrackingConsumer.Broadway.Processor_default,
      concurrency: 2,
      processor_key: :default
    }
  ],
  batchers: [
    %{
      name: Integration.MQ.TrackingData.PlayerTrackingConsumer.Broadway.BatchProcessor_derived_stats,
      concurrency: 4,
      batcher_key: :derived_stats,
      batcher_name: Integration.MQ.TrackingData.PlayerTrackingConsumer.Broadway.Batcher_derived_stats
    }
  ]
]
And below is the (elided) configuration being passed to Broadway.start_link. Note that :qos and most other optional configs are not being overridden. The processor is configured with min_demand: 10, max_demand: 20, which should result in a batch of 10 messages being passed to prepare_messages.
producer: [
  module: {BroadwayRabbitMQ.Producer,
    [
      bindings: [
        {"***elided exchange***",
         [routing_key: "***elided***"]}
      ],
      on_failure: :reject,
      metadata: [:delivery_tag, :exchange, :routing_key, :content_type,
                 :content_encoding, :headers, :correlation_id, :message_id,
                 :timestamp, :type, :app_id],
      declare: [durable: true],
      queue: "***elided***",
      connection: "***elided***"
    ]}
],
processors: [
  default: [
    concurrency: 2,
    min_demand: 10,
    max_demand: 20
  ]
],
batchers: [
  derived_stats: [
    concurrency: 4,
    batch_size: 20,
    batch_timeout: 1_000
  ]
]
Currently, each Broadway producer opens and monitors its own connection. This has the advantage of keeping producers completely independent from each other, each one maintaining its own state. However, RabbitMQ's documentation states that:
Each connection uses about 100 KB of RAM (and even more, if TLS is used). Thousands of connections can be a heavy burden on a RabbitMQ server. In the worst case, the server can crash due to out-of-memory. The AMQP protocol has a mechanism called channels that “multiplexes” a single TCP connection. It is recommended that each process only creates one TCP connection, and uses multiple channels in that connection for different threads.
So ideally, I believe we should have a separate process to maintain the connection. This process would be responsible for opening, monitoring and reopening the connection when necessary, using the backoff strategy chosen by the user.
Borrow implementation from db_connection
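Here is a minimal sketch of such a process. It is a plain GenServer under the hypothetical name ConnectionManager, not part of broadway_rabbitmq. It uses capped exponential backoff as a stand-in for "the backoff strategy chosen by the user". The connect function is injected so the sketch runs without a broker. With the amqp library it could be fn -> AMQP.Connection.open(url) end, in which case the manager monitors the pid field of the returned connection struct. Producers would then ask the manager for the connection and open their own channels on it.

```elixir
defmodule ConnectionManager do
  # Hypothetical sketch: owns one connection, monitors it, and reconnects
  # with capped exponential backoff. Not part of broadway_rabbitmq.
  use GenServer

  @min_backoff_ms 10
  @max_backoff_ms 5_000

  # connect_fun: 0-arity function returning {:ok, conn} | {:error, reason},
  # where conn is a pid or a map/struct with a :pid field.
  def start_link(connect_fun, opts \\ []) do
    GenServer.start_link(__MODULE__, connect_fun, opts)
  end

  # Returns {:ok, conn}, or {:error, :not_connected} while reconnecting.
  def get(server), do: GenServer.call(server, :get)

  @impl true
  def init(connect_fun) do
    # Connect asynchronously so starting the supervisor never blocks on the broker.
    send(self(), :connect)
    {:ok, %{connect_fun: connect_fun, conn: nil, ref: nil, backoff: @min_backoff_ms}}
  end

  @impl true
  def handle_call(:get, _from, %{conn: nil} = state),
    do: {:reply, {:error, :not_connected}, state}

  def handle_call(:get, _from, state), do: {:reply, {:ok, state.conn}, state}

  @impl true
  def handle_info(:connect, state) do
    case state.connect_fun.() do
      {:ok, conn} ->
        ref = Process.monitor(conn_pid(conn))
        {:noreply, %{state | conn: conn, ref: ref, backoff: @min_backoff_ms}}

      {:error, _reason} ->
        # Retry later, doubling the delay up to the cap.
        Process.send_after(self(), :connect, state.backoff)
        {:noreply, %{state | backoff: min(state.backoff * 2, @max_backoff_ms)}}
    end
  end

  def handle_info({:DOWN, ref, :process, _pid, _reason}, %{ref: ref} = state) do
    # The monitored connection died: forget it and reconnect right away.
    send(self(), :connect)
    {:noreply, %{state | conn: nil, ref: nil}}
  end

  # Ignore stale monitors and unrelated messages.
  def handle_info(_other, state), do: {:noreply, state}

  defp conn_pid(pid) when is_pid(pid), do: pid
  defp conn_pid(%{pid: pid}) when is_pid(pid), do: pid
end
```

Keeping the connection in one process and handing out only the reference keeps the per-producer state small. Each producer still owns and monitors its own channel.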
Thanks for the great work on this producer. Quick question:
The AMQP library on which this producer is based provides the AMQP.Exchange.declare/4 function to declare exchanges. Calling this function creates an exchange if it doesn't exist, or does nothing if the exchange exists.
I am trying to follow the docs to create a producer that binds an auto-generated queue to an exchange. I want the exchange to be created automatically when the producer starts, if it doesn't already exist, and I want it to be of type :topic.
Is there a way to do this? If not, is there a reason why the broadway_rabbitmq producer shouldn't declare exchanges?
Thanks.
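Newer broadway_rabbitmq releases have an :after_connect producer option (also mentioned further down this page). It receives the AMQP channel each time the producer connects, which makes it one place to declare the exchange. The config fragment below assumes that after_connect runs before the queue is declared and bound. The exchange name and routing key are placeholders.

```elixir
producer: [
  module:
    {BroadwayRabbitMQ.Producer,
     # "" asks the server to generate a queue name for each producer.
     queue: "",
     # Runs on the producer's channel every time it (re)connects and must
     # return :ok or {:error, reason}.
     after_connect: fn channel ->
       AMQP.Exchange.declare(channel, "my_exchange", :topic, durable: true)
     end,
     declare: [exclusive: true],
     bindings: [{"my_exchange", [routing_key: "events.#"]}]}
]
```

AMQP.Exchange.declare/4 does nothing when the exchange already exists with the same type and options. Running it from every producer on every reconnect should therefore be harmless.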
Right now, a BroadwayRabbitMQ producer can only connect to a single RabbitMQ node. Some SaaS platforms provide multiple nodes and only guarantee high uptime on a subset of them, not all. How can we solve this problem in BroadwayRabbitMQ? For example, one solution could be to allow passing a function to :connection that returns a URL (which could just pick one at random from a list). This solution is simple but not very flexible; it is only meant to illustrate what I mean.
Ultimately, this could be an additional use case for having multiple producers, where each producer can connect to a different RabbitMQ node. Thoughts?
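A variant of this idea tries each node in turn until one accepts the connection, instead of trusting a single random pick. The hypothetical MultiNodeConnect.connect_first/2 below is not a broadway_rabbitmq API. It takes the URL list and an injected connect function, returns the first success, and otherwise returns every failure.

```elixir
defmodule MultiNodeConnect do
  # Hypothetical helper: tries each URL in order and returns the first
  # successful connection, or the list of {url, reason} failures.
  def connect_first(urls, connect_fun) when is_list(urls) do
    result =
      Enum.reduce_while(urls, [], fn url, errors ->
        case connect_fun.(url) do
          {:ok, conn} -> {:halt, {:ok, conn}}
          {:error, reason} -> {:cont, [{url, reason} | errors]}
        end
      end)

    case result do
      {:ok, conn} -> {:ok, conn}
      # The accumulator collects failures newest first.
      errors when is_list(errors) -> {:error, Enum.reverse(errors)}
    end
  end
end
```

With amqp this could be called as MultiNodeConnect.connect_first(Enum.shuffle(urls), &AMQP.Connection.open/1). Shuffling gives the "choose at random" behaviour while still falling back to the remaining nodes.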
If a producer restarts during shutdown, it might receive messages even though there are no more consumers subscribed.
When I set declare: [no_wait: true], the producer keeps failing:
[error] Cannot connect to RabbitMQ broker: :ok
[error] Crashing because of unexpected error when connecting to RabbitMQ
[error] GenServer Rmq.Broadway.Producer_0 terminating
** (RuntimeError) unexpected error when connecting to RabbitMQ broker
(broadway_rabbitmq 0.6.5) lib/broadway_rabbitmq/producer.ex:605: BroadwayRabbitMQ.Producer.handle_connection_failure/2
(broadway_rabbitmq 0.6.5) lib/broadway_rabbitmq/producer.ex:428: BroadwayRabbitMQ.Producer.handle_info/2
(broadway 0.6.2) lib/broadway/topology/producer_stage.ex:228: Broadway.Topology.ProducerStage.handle_info/2
(gen_stage 1.1.0) lib/gen_stage.ex:2108: GenStage.noreply_callback/3
(stdlib 3.14) gen_server.erl:689: :gen_server.try_dispatch/4
(stdlib 3.14) gen_server.erl:765: :gen_server.handle_msg/6
(stdlib 3.14) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Last message: {:connect, :no_init_client}
elixir: 1.11.3
erlang 23.2.7
I believe the producer needs to wait for the declare operation to finish before trying to bind the queue to the exchange.
In the documentation example, multiple producers are used.
Isn't that always a bad idea for RabbitMQ (and for any other queue-like situation where a Broadway producer does nothing but pull from a finite set of items)? It seems to me that you'd always want producer concurrency: 1 in such cases.
I created this demo, which runs smoothly unless the producer concurrency is increased beyond 1, in which case performance suffers dramatically. The basic issue seems to be that every producer tries to satisfy the demand of every processor. As a result, a few processors get multiple messages (one from each producer) that just pile up in their mailboxes, while the other processors get no messages and sit idle.
As I updated the docs to show in #73, :reject_and_requeue can cause an infinite retry loop if the message is unprocessable. This happened to us in production, as my colleague @shamil614 figured out.
I would like to suggest a (breaking) change which could prevent this problem.
Rather than defaulting to :reject_and_requeue, the library could require the user to specify one of these options. This would prompt them to consider the trade-offs, and whether they want to set up a dead letter exchange and potentially republish those messages. (@shamil614 came up with a way to do that: republishing a fixed number of times with an increasing delay, using the RabbitMQ delayed message plugin to achieve a "back off" effect.)
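One way to make "republish a fixed number of times with an increasing delay" concrete is a pure decision function over the message headers. The sketch below is hypothetical and not @shamil614's implementation. It assumes the republishing code maintains an x-retry-count header, and that headers arrive as AMQP {name, type, value} tuples, or :undefined when absent. It returns the delay that would be sent in the delayed message plugin's x-delay header (milliseconds). The limit of 5 and the 1-second base delay are arbitrary. The actual republish, for example with AMQP.Basic.publish/5, is left out.

```elixir
defmodule RetryPolicy do
  # Hypothetical retry policy, not part of broadway_rabbitmq.
  @max_retries 5
  @base_delay_ms 1_000

  # Reads the assumed "x-retry-count" header; 0 when it is missing.
  def retry_count(:undefined), do: 0

  def retry_count(headers) when is_list(headers) do
    Enum.find_value(headers, 0, fn
      {"x-retry-count", _type, count} when is_integer(count) -> count
      _other -> nil
    end)
  end

  # Returns {:retry, next_count, delay_ms} or :dead_letter.
  # The delay doubles with each attempt: 1s, 2s, 4s, ...
  def decide(headers) do
    count = retry_count(headers)

    if count >= @max_retries do
      :dead_letter
    else
      {:retry, count + 1, @base_delay_ms * trunc(:math.pow(2, count))}
    end
  end
end
```

When decide/1 returns :dead_letter, rejecting without requeue hands the message to the dead letter exchange. Otherwise the message would be republished with the incremented count and acked.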
Library version 0.3.0
erlang 21.3.6
elixir 1.8.1
** (CaseClauseError) no case clause matching: {:error, :unknown_host}
(broadway_rabbitmq) lib/broadway_rabbitmq/producer.ex:278: BroadwayRabbitMQ.Producer.connect/1
(broadway_rabbitmq) lib/broadway_rabbitmq/producer.ex:187: BroadwayRabbitMQ.Producer.handle_info/2
(broadway) lib/broadway/producer.ex:103: Broadway.Producer.handle_info/2
(gen_stage) lib/gen_stage.ex:2082: GenStage.noreply_callback/3
(stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
(stdlib) gen_server.erl:711: :gen_server.handle_msg/6
(stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
An unknown host currently causes an exception in the Producer code.
I would expect this code to bubble up the error to my application code.
Any plans to publish a patch release with #129?
We need to:
Mention the official guide at the top, as the guide is a quick getting-started guide and not a complete reference (see SQS)
Have a section on acknowledgements. Say explicitly what we ack and what we don't ack. We can move the note about requeue into this section, but we should break it apart too. For example, it should say, in distinct paragraphs: "if requeue is always, then watch out for x and y", "if requeue is never, then watch out for z and w", etc. We should also talk about the redelivered metadata, and have a subsection on batching that says whether batching is necessary for RabbitMQ (see SQS)
We should have a section on configuring RabbitMQ to use metadata. I removed it from the README here (see SQS)
Right now, Broadway acknowledges messages at the end of the pipeline. This is good if you are using an "at least once" delivery model, where your messages are idempotent and can be redelivered to a consumer many times. However, it's problematic in an "at most once" delivery model where you want to avoid processing a message twice. In those cases, the common pattern is to ack a message as soon as it's received by a consumer so that even if processing fails the message is not requeued.
This could be supported with an option on the producer (since that's the thing that knows how to deal with the external system and knows how to ack messages):
producers: [
  default: [
    module: {BroadwayRabbitMQ.Producer, opts},
    stages: 1,
    # Proposed option (name still open): ack each message as soon as it is received.
    ack: :start_of_pipeline
  ]
],
I can't think of a good name for the option but you get the idea.
One thing that's not clear to me is the behaviour of requeue: :never (in the RabbitMQ producer). I get that if I manually fail the message, the :requeue option is used. But what happens if something crashes in my pipeline? The message will not be acked, and the :ack option would make sense there, right?
cc @josevalim @msaraiva
I have a requirement where I need access to the RabbitMQ queue size. The AMQP client returns this information from declare/3, but broadway_rabbitmq currently ignores it here.
Is this something that could be added to the message metadata or the producer state, so that I can access it in handle_message/3?
I'm happy to open a PR if this is something that would be helpful to add
Versions:
elixir: 1.12
OTP: 24.0
broadway_rabbitmq: 0.7
broadway: 1.0
amqp: 2.1.2
Suppose we have the following amqp configuration:
config :amqp,
  connections: [
    server: [
      url: "amqp://rabbitmq:rabbitmq@localhost"
    ]
  ],
  channels: [
    channel: [connection: :server]
  ]
With broadway producer configuration:
producer: [
  module: {
    BroadwayRabbitMQ.Producer,
    queue: "broadway_queue",
    qos: [prefetch_count: 50],
    declare: [durable: true],
    bindings: [{"outgoing-exchange", []}],
    on_failure: :reject
  },
  concurrency: 1
],
When run with iex -S mix, everything works as expected, with no errors or side effects.
When running a release, however, the following error keeps appearing in the log after it starts:
07:31:41.497 [error] Cannot connect to RabbitMQ broker: :econnrefused
07:31:57.607 [error] Cannot connect to RabbitMQ broker: :econnrefused
07:32:16.246 [error] Cannot connect to RabbitMQ broker: :econnrefused
07:32:38.095 [error] Cannot connect to RabbitMQ broker: :econnrefused
07:32:57.107 [error] Cannot connect to RabbitMQ broker: :econnrefused
I can confirm in the RabbitMQ management console that there is an open connection.
In a remote console:
> AMQP.Connection.open()
> {:error, :econnrefused}
However, running:
AMQP.Connection.open("amqp://rabbitmq:rabbitmq@localhost")
opens a connection successfully.
To my understanding, this happens because broadway and amqp try to use the same connection. What bothers me is that it works without any problems in development, even when connecting to the production RabbitMQ server over TLS.
Is this the case, or is there a bug?
The new amqp 3.0.0 change is a very nice one that helps with dropping lager and with OTP 24 compatibility. Is there any plan to make a release with that change soon? It seems significant enough to warrant a release :)
Possible values:
Today, the RabbitMQ producer here stops if it receives a basic_cancel
from the broker: https://github.com/plataformatec/broadway_rabbitmq/blob/1ba40e62f079ad2277d7ef6b253872f31065fb78/lib/broadway_rabbitmq/producer.ex#L154-L157
After speaking with @josevalim, it seems that the supervisor will restart the producer and all will proceed. We have two alternatives to improve the current situation:
Let the supervisor do its job: basically what we have today. We can use :shutdown as the reason. Using a supervisor as a restart mechanism is not the best approach IMO, as outlined in the famous post "It's all about the guarantees" (go read it if you haven't!).
Handle the reconnection internally. This means using a possible backoff mechanism. This will let us control the logic behind reconnecting, while the producer process will never go down and always act as the "connection manager" instead of the connection itself.
Currently, the use case I have for the broker sending a basic_cancel to the consumer is when a queue is deleted. If the RabbitMQ consumer is consuming from a queue and that queue is deleted (for example, to recreate it with different properties), the broker sends a basic_cancel to the consumer.
Let me know how I can help.
This is a feature request.
My use case is I have multiple rabbitmq queues across multiple rabbitmq instances.
I want to transform the data coming in, and publish it to elasticsearch in batches using the bulk api; https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
The challenge is that Broadway is moving to a model where it is limited to one producer, which means I'll have one batcher per queue. I need to listen on many Rabbit queues (let's say about 100), and the architecture forces me to use one Elasticsearch "stream" / bulk insert per queue.
So this would look like
<base supervisor>(1)
<broadway supervisor>(1)
<broadway instances per queue>(100)
I guess a workaround would be to have handle_batch in each instance call a single process, which then batches the batches from these 100 instances and flushes them to Elasticsearch at certain intervals. But it seems like there is a strong use case for one of:
Thoughts?
This relates to the discussion in #60, but I'm creating a new issue to address only the points about the resource the producer uses to connect to and communicate with RabbitMQ.
Just stating some things about RabbitMQ and how the AMQP lib models them:
%AMQP.Channel{} has a pid for the channel that can be monitored.
With that in mind, what do you think about adding a pool option to the producer config? That option would accept a 1-arity function or an MFA.
When a channel is needed, we check one out by calling the function, passing the producer pid to the pool manager.
This looks to be related to this issue rabbitmq/rabbitmq-server#11328.
Seems like it is fixed in the newest version of https://hex.pm/packages/rabbit_common but the dependency chain from here to there will need to be updated.
Hello guys,
I would like to use broadway_rabbitmq because I think it's a great package. However, I would also like to create my queue directly from my application, and to set up a few things before consuming it.
Why isn't it possible to implement a client using the BroadwayRabbitMQ.RabbitmqClient behaviour and pass it to Broadway.start_link/2?
def start_link(_opts) do
  Broadway.start_link(__MODULE__,
    name: __MODULE__,
    producers: [
      default: [
        module: {BroadwayRabbitMQ.Producer,
          # Custom client implementing the BroadwayRabbitMQ.RabbitmqClient behaviour.
          client: MyModule.Client,
          queue: "my_queue"
        },
        stages: 2
      ]
    ],
    processors: [
      default: [
        stages: 50
      ]
    ]
  )
end
From reading the code, I had the impression that this was possible because of this line: https://github.com/plataformatec/broadway_rabbitmq/blob/1ba40e62f079ad2277d7ef6b253872f31065fb78/lib/broadway_rabbitmq/producer.ex#L118
Is this something you plan to let users do?
Hi,
I am looking to install this, but I am running into an issue with the nimble_options dependency requirement. The broadway_rabbitmq package requires ~> 0.3.5 or ~> 0.4, but opentelemetry_phoenix (version 1.1.0) requires ~> 0.5. Could this dependency be updated here, or could the requirement be relaxed a bit more?
Thanks!
Hi there. I am a co-maintainer of the amqp library. I am currently preparing a major version release for the library.
Since broadway_rabbitmq seems to be the most popular library that uses amqp, I'd like to get your feedback before releasing 2.0.0.
As you can see in the release notes, there are a couple of breaking changes.
no_wait to nowait
amqp still supports the no_wait option, so you don't have to make any changes for this.
Connection name
The connection name has moved to the options parameter. You need to make a small change to BroadwayRabbitMQ.AmqpClient.setup_channel/1.
Since the option was introduced in 1.5, you will also need to change the dependency requirement like this:
{:amqp, "~> 1.5 or ~> 2.0"},
If this blocks you from supporting 2.0, I can consider dropping the change to Connection.open and keep supporting the connection name as a separate parameter. What do you think?
If you have any other questions and/or feedback, please let me know.
Thanks!
License
Copyright 2019 Plataformatec
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Producer.prepare_for_draining to ensure no more messages will be received
Versions:
broadway: 1.0.0
broadway_rabbitmq: 0.7.0
amqp: 2.1
elixir: 1.12
otp: 24.0.5
I have some long-running tasks that can exceed RabbitMQ's consumer_timeout, which closes the channel with this message:
09:38:26.044 [warn] AMQP channel went down with reason: {:shutdown, {:server_initiated_close, 406, "PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 7200000 ms. This timeout value can be configured, see consumers doc guide to learn more"}}
The expected behavior would be to re-establish the connection, kill the timed-out processors, and have RabbitMQ redeliver the messages.
The current behavior is that the GenServer is killed, and Broadway can no longer send messages to RabbitMQ. The only fix is to restart the Broadway process.
First of all, thank you for this amazing library!
What do you think about adding the ability to retrieve the AMQP Channel used by the Producer?
I'm aware that the Channel is already available in the message metadata, but I'm talking here about having a dedicated method to query the Channel (from the Producer state) outside of a message consumption.
I see several use-cases for this:
Publish RabbitMQ events: Broadway would be responsible for managing the AMQP Channel (connection, re-connection in case of failure, proper disconnection in case the process crashes, etc). This would save a lot of boilerplate code, avoiding the management of a dedicated Channel just to publish events.
Send RabbitMQ RPC requests: building an RPC client that sends RPC requests over the Channel managed by Broadway, with Broadway acting as the reply-queue consumer that handles RPC responses. This is essentially use case 1, with one added complication: we need to keep a mapping between each caller process PID and its RabbitMQ correlation ID, so that the RPC response can be sent back to the caller once it is consumed.
For now, I have worked around this with a dedicated RequestHandler process that stores the Channel. I send the Channel to RequestHandler using the Broadway after_connect option. But this feels quite hacky, to be honest.
Please tell me if you think this would be outside the scope of Broadway, as one could argue it is supposed to be essentially a consumer of messages, not a producer (publish events & RPC commands).
The docs state (emphasis mine):
Unlike the RabittMQ client that has a default :prefetch_count = 0, which disables back-pressure, BroadwayRabbitMQ overwrite the default value to 50 enabling the back-pressure mechanism. You can still define it as 0, however, if you do this, make sure the machine has enough resources to handle the number of messages coming from the broker.
However, setting prefetch_count to 0 currently doesn't work, because it causes buffer_size to be set to 0 as well, which prevents GenStage from starting up.
I think that when it is set to 0, buffer_size should be set to a reasonably high value, or better yet, be configurable.
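The suggestion could be implemented as an explicit validation step, sketched below as a hypothetical BufferSize.resolve/1 helper (not the producer's actual code). It takes the producer options and defaults prefetch_count to 50, as the quoted docs describe. It derives the buffer from prefetch_count when that value is positive; the multiplier of 2 is an assumption. When prefetch_count is 0, it requires an explicit :buffer_size.

```elixir
defmodule BufferSize do
  # Hypothetical helper, not the producer's actual implementation.
  @default_prefetch_count 50
  # Assumed multiplier for deriving the buffer from prefetch_count.
  @multiplier 2

  # Returns {:ok, size} or {:error, message}.
  def resolve(opts) when is_list(opts) do
    prefetch = get_in(opts, [:qos, :prefetch_count]) || @default_prefetch_count

    case Keyword.get(opts, :buffer_size) do
      # An explicit positive buffer size always wins.
      size when is_integer(size) and size > 0 -> {:ok, size}
      # Otherwise derive it from a positive prefetch_count.
      nil when prefetch > 0 -> {:ok, prefetch * @multiplier}
      # prefetch_count 0 disables back-pressure, so no size can be derived.
      nil -> {:error, ":buffer_size must be set when :prefetch_count is 0"}
      other -> {:error, "invalid :buffer_size: #{inspect(other)}"}
    end
  end
end
```

Returning an error at startup, rather than silently picking a large default, makes the user choose a bound consciously when back-pressure is disabled.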
The configure/3 callback was introduced in dashbitco/broadway#109. We need to implement it for the RabbitMQ producer.
@josevalim what's the plan for implementing it until we require the new version of Broadway that supports this? Just look at the callbacks exported by Broadway.Acknowledger?
I've had the experience that when we weren't setting up queue bindings before starting to consume the queue, the RabbitMQ broker would cease to route messages correctly.
We've since moved to excessively declaring every detail of the queues and exchanges, including the bindings, and the problems with routing have disappeared.
I think it should be possible to either define a list of bindings to the queue, or add a callback that can do it instead.
Examples:
[
  queue: "fancy_queue",
  declare: [durable: true],
  bindings: [{"some_exchange", routing_key: "only.for.fancy.queue"}]
]
Alternatively, with a callback, it could look like this:
[
  queue: "fancy_queue",
  declare: [durable: true],
  before_consume: &setup_fancy_queue/2
]
# ...
def setup_fancy_queue(channel, queue) do
  AMQP.Queue.bind(channel, queue, "some_exchange", routing_key: "only.for.fancy.queue")
end
Would it be possible for us to allow nimble_options 0.4.0 for this library?
We (at work) often have the necessity of defining complex RabbitMQ topologies of exchanges and queues. RabbitMQ is pretty clear on the pattern here: declare everything you need every time you connect since declaring is idempotent. We also have a hard requirement of trying to connect to multiple RabbitMQ URLs until one connection succeeds.
The code that we currently have to do things like declare an exchange and bind it to another exchange before declaring the consumer queue and starting the producer looks like this:
with {:ok, conn} <- AMQP.Connection.open(...),
     {:ok, chan} <- AMQP.Channel.open(conn),
     :ok <- declare_exchange(chan),
     :ok <- bind_exchange(chan) do
  Broadway.start_link(...)
end
This has a huge disadvantage: it makes our Broadway pipeline synchronous when starting. That is, the pipeline won't start unless RabbitMQ is available right away. This goes against the good principle of starting the process and connecting in the background with potential backoff (see this great article).
What we would like to do instead is to be able to declare the necessary RabbitMQ topology every time the producer connects to RabbitMQ. This will free us of the synchronization point and generally makes things more coherent.
To achieve this, we can go two ways: either we provide a way to pass in a generic piece of code that takes an AMQP channel, or we come up with a schema for additional options that lets us declare arbitrary topologies.
Note that we're only interested in topologies that are strictly related to the pipeline here. This means that we don't care about being able to declare additional queues for example, since the pipeline can only consume from a single queue.
The first option is to pass a generic piece of code that takes the AMQP channel. The API I propose is a new :rabbitmq_setup_fun option that takes an anonymous function or an MFA tuple.
rabbitmq_setup_fun: (AMQP.Channel.t() -> :ok | {:error, reason}) | {module(), atom(), [term()]}
An example of its usage:
rabbitmq_setup_fun: fn chan ->
  with :ok <- AMQP.Exchange.declare(chan, "my-exchange", :direct),
       :ok <- AMQP.Exchange.bind(chan, "other-exchange", "my-exchange", routing_key: "#"),
       do: :ok
end
or with MFA:
rabbitmq_setup_fun: {__MODULE__, :declare_exchange, [_routing_key = "#"]}
# In the module:
def declare_exchange(chan, routing_key) do
  with :ok <- AMQP.Exchange.declare(chan, "my-exchange", :direct),
       :ok <- AMQP.Exchange.bind(chan, "other-exchange", "my-exchange", routing_key: routing_key),
       do: :ok
end
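The calling convention of the proposed :rabbitmq_setup_fun works as follows. A 1-arity function receives the channel. For an MFA, the channel is prepended to the argument list, which is why declare_exchange/2 above takes chan first. The SetupFun module below is a hypothetical sketch of how the producer might invoke either form, not broadway_rabbitmq code. It also rejects any return value other than :ok or {:error, reason}.

```elixir
defmodule SetupFun do
  # Hypothetical helper: invokes a setup function the way the proposed
  # :rabbitmq_setup_fun option describes.

  # Anonymous function: called with the channel as its only argument.
  def call(fun, chan) when is_function(fun, 1), do: check(fun.(chan))

  # MFA tuple: the channel is prepended to the extra arguments,
  # so {M, :f, [a]} results in M.f(chan, a).
  def call({mod, fun, args}, chan) when is_atom(mod) and is_atom(fun) and is_list(args),
    do: check(apply(mod, fun, [chan | args]))

  # Only :ok and {:error, reason} are accepted as results.
  defp check(:ok), do: :ok
  defp check({:error, _reason} = error), do: error

  defp check(other) do
    raise ArgumentError, "expected :ok or {:error, reason}, got: #{inspect(other)}"
  end
end
```

Validating the return value lets the producer treat {:error, reason} as a failed connection attempt and retry, instead of continuing with a half-declared topology.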
The alternative to custom code is to provide more options that let us declare arbitrary topologies. The nice thing is that in RabbitMQ you can only have bindings that look like this:
exchange1 -> exchange2 -> ... -> exchangeN -> queue
That is, you can only bind to one queue at the end of the "exchanges pipeline". This means we only need to support two things:
We don't need to support exchange-to-queue bindings, since we already support those through the :bindings option.
What I propose is a new :declare_exchanges option to declare exchanges:
declare_exchanges: [{name :: String.t(), type :: :direct | :topic | :headers | :fanout, options :: keyword()}]
For example:
declare_exchanges: [
{"my-exchange", :topic, durable: true},
{"my-other-exchange", :headers, durable: true, internal: true}
]
I have two different proposals here.
Option 1: add a new :exchange_bindings option (to mirror the name of the already existing :bindings option).
exchange_bindings: [{source :: String.t(), dest :: String.t(), options :: keyword()}]
Option 2: modify the current :bindings option to support exchange-to-exchange bindings as well. Right now this option supports a list of {exchange_name :: String.t(), options :: keyword()}. What I propose is to switch to three-element tuples like the one above: {source :: String.t(), dest :: String.t(), options :: keyword()}. However, dest would have a special possible value, the atom :queue, which represents the queue given in the :queue option.
All thoughts are welcome, excited to have this discussion!
BroadwayRabbitMQ.AmqpClient validates connection params and accepts only a keyword list: https://github.com/plataformatec/broadway_rabbitmq/blob/3a7c7a68811dbcf723d8bc805e22a7eca27934ce/lib/broadway_rabbitmq/amqp_client.ex#L115
But AMQP.Connection.open also accepts a binary URL (https://hexdocs.pm/amqp/1.1.1/AMQP.Connection.html#open/1) in the form "amqp://user:pass@server/db".
Because of this limitation, I cannot set the RabbitMQ connection param from an environment variable.
Can we change it to also accept a URI? We can validate the URI format if required.
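Validating a URI could reuse Elixir's URI module. The hypothetical AmqpUri.to_opts/1 below is not part of broadway_rabbitmq or amqp. It turns an amqp:// or amqps:// URI into the keyword options AMQP.Connection.open/1 takes (host, port, virtual_host, username, password). The defaults chosen here are port 5672 for amqp, 5671 for amqps, and the "/" virtual host when the URI has no path.

```elixir
defmodule AmqpUri do
  # Hypothetical helper: parses an AMQP URI into connection keyword options,
  # returning {:error, :invalid_amqp_uri} for anything else.
  def to_opts(uri) when is_binary(uri) do
    case URI.parse(uri) do
      %URI{scheme: scheme, host: host} = parsed
      when scheme in ["amqp", "amqps"] and is_binary(host) and host != "" ->
        opts = [
          host: host,
          port: parsed.port || default_port(scheme),
          virtual_host: vhost(parsed.path)
        ]

        {:ok, opts ++ credentials(parsed.userinfo)}

      _other ->
        {:error, :invalid_amqp_uri}
    end
  end

  defp default_port("amqp"), do: 5672
  defp default_port("amqps"), do: 5671

  # No path (or a bare "/") means the default virtual host.
  defp vhost(nil), do: "/"
  defp vhost(""), do: "/"
  defp vhost("/"), do: "/"
  defp vhost("/" <> name), do: URI.decode(name)

  # "user:pass" is split once; credentials may be percent-encoded.
  defp credentials(nil), do: []

  defp credentials(userinfo) do
    case String.split(userinfo, ":", parts: 2) do
      [user, pass] -> [username: URI.decode(user), password: URI.decode(pass)]
      [user] -> [username: URI.decode(user)]
    end
  end
end
```

A validator built this way could accept either form: keyword lists pass through as before, and strings are parsed and rejected early if malformed.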
If the connection is lost after a message has been successfully processed but before it has been acknowledged, there's no way to acknowledge the message anymore, since the acknowledgement is bound to the channel that delivered the message. The documentation states that:
A channel only exists in the context of a connection and never on its own. When a connection is closed, so are all channels on it.
That means messages that were processed but not acknowledged will be requeued and processed more than once. However, the documentation also explains that:
If a message is delivered to a consumer and then requeued (because it was not acknowledged before the consumer connection dropped, for example) then RabbitMQ will set the redelivered flag on it when it is delivered again (whether to the same consumer or a different one). This is a hint that a consumer may have seen this message before (although that's not guaranteed, the message may have made it out of the broker but not into a consumer before the connection dropped)
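The redelivered flag is only a hint, and the delivery_tag changes on the new channel. Detecting a true duplicate therefore generally needs an identifier that survives redelivery, such as a message_id set by the publisher. The producer can expose message_id via :metadata, as in the configuration shown earlier on this page. The Dedup module below is a hypothetical, in-memory sketch of the check, not a broadway_rabbitmq feature.

```elixir
defmodule Dedup do
  # Hypothetical sketch: skips work for message IDs that were already
  # processed. `seen` is an in-memory MapSet, so it is lost on restart and
  # not shared between nodes; real code would need durable, shared storage
  # (for example, a unique index in the application's database).

  # Returns {{:processed, result}, seen} or {:skipped, seen}.
  def handle(seen, message_id, work_fun) do
    if MapSet.member?(seen, message_id) do
      {:skipped, seen}
    else
      result = work_fun.()
      # The ID is recorded only after work_fun returns; if it raises,
      # nothing is recorded and a redelivery is processed again.
      {{:processed, result}, MapSet.put(seen, message_id)}
    end
  end
end
```

If the connection drops after handle/3 records the ID but before the ack, the redelivered copy is skipped and can simply be acked.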
That raises a couple of questions:
Does broadway_rabbitmq have a built-in way to handle messages duplicated because of a lost connection?
Can we rely on the redelivered flag to avoid processing the message again? If so, how can we check whether the redelivered message was previously processed successfully? The new message will have a different delivery_tag
on a new channel, which makes it impossible to compare the messages. Is there another way?
Current documentation lists :message_count as an available metadata option for a message. I believe this is not possible with the current implementation. message_count is only available as metadata when doing basic.get (not recommended, by the way), while the current implementation uses basic.consume, which doesn't expose message_count as metadata.
Possible solutions:
Use AMQP.Queue.message_count/2 to populate the message_count metadata.
I can do a quick PR to fix it; I just need to know which approach is preferred.
Background :
At work we have a need to use AMQP 1.0 to consume messages from Azure Service Bus and IBM MQ via AMQP 1.0. We wanted to use broadway, so we made an offbroadway amqp10 plugin. The plugin is targeted for AMQP 1.0 in general vs just rabbitmq or a particular service.
We plan on open sourcing this, but before we do it was suggested by @wojtekmach to open an issue for discussion.
An AMQP 1.0 client could be added to broadway vs publishing a new offbroadway plugin. The question is, does it make sense?
Differences between AMQP 1.0 and AMQP < 1.0 aside, it makes sense in that this plugin could support rabbitmq installs that have AMQP 1.0 enabled (via rmq plugin).
On the other hand :
Interested to hear thoughts on all this.
Reference:
Hi, I have an application supervisor that starts the Broadway process:
opts = [strategy: :one_for_one, name: __MODULE__]
Supervisor.start_link([MessagesConsumer], opts)
and my process:
use Broadway

def start_link(_) do
  Broadway.start_link(__MODULE__,
    name: __MODULE__,
    producer: [
      module:
        {BroadwayRabbitMQ.Producer,
         queue: @queue,
         declare: QueueDeclareOptions.get(@queue),
         connection: connection(),
         on_failure: :reject_and_requeue,
         qos: [prefetch_count: 10],
         buffer_size: 10_000}
    ],
    processors: [default: [concurrency: 10]]
  )
end
When I kill my process via Process.exit(pid, :kill), my supervisor crashes completely:
09:37:44.222 [error] CRASH REPORT Process <0.664.0> with 0 neighbours crashed with reason: no match of right hand value {error,{already_started,<0.624.0>}} in 'Elixir.Broadway.Topology':init/1 line 37
...
09:37:44.246 [info] Application scenarios exited with reason: shutdown
As I understand it, the linked processes were not killed. How can I solve this problem?
mix.exs:
{:broadway_rabbitmq, "~> 0.6.5"},
Would it be possible to relax the amqp dependency from 1.1 to 1.3 or ~> 1?