Comments (5)
Hi @mfontanini, any thoughts on this feature?
Also, on a somewhat unrelated note: perhaps you have more experience with large Kafka configurations. Assume you are publishing to and consuming from a large number of topics (say, on a single Kafka cluster), some with different message throughput. Do you think it's better for the application to have a single producer pushing to all {topics,partitions}, or one producer per topic in case some brokers are slower to ack? Same question for consumers: would it be better to have a single one pulling from all partitions of the various topics, or one per topic? Any experience with this kind of setup? The advantage of one producer/consumer per topic is that you can configure each producer/consumer object slightly differently, and you don't risk a slow broker affecting the other producers. In this case I would also assume a separate poll thread for each producer/consumer.
from cppkafka.
I'm not sure if I'm missing something, but it seems there's no way of overriding the default topic configuration once a producer/consumer has been instantiated. Suppose you produce to two different topics using the same producer, which is perfectly allowed by the API. The producer config takes a default topic config, but that applies across all topics. Producer::produce calls the rd_kafka_producev version of librdkafka, which does not specify any topic handle like the regular rd_kafka_produce does, hence you cannot override the topic config on a per-topic basis.
I also see that KafkaHandleBase has a way to get a topic with a new config, but that Topic class is not really used anywhere except for getting metadata. So if you want two different topic behaviors, I guess you have to instantiate two different producer/consumer classes?
from cppkafka.
I'm not sure about this, it looks kind of dirty. If you're worried about a function that handles a message not handling every exit point, can't you just create a dumb "auto committer" wrapper?
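    class AutoCommitter {
    public:
        AutoCommitter(Consumer& consumer, Message& msg) : consumer_(consumer), msg_(msg) { }
        // no move, copy, etc
        AutoCommitter(const AutoCommitter&) = delete;
        AutoCommitter& operator=(const AutoCommitter&) = delete;
        // commits the message when the wrapper goes out of scope
        ~AutoCommitter() {
            consumer_.commit(msg_);
        }
    private:
        Consumer& consumer_;
        Message& msg_;
    };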
I'd like to keep the message class as simple as possible. Also, doing that sort of logic in the destructor doesn't look very good (e.g. the fact that you have to check for uncaught_exception gives a hint of this).
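To illustrate the wrapper idea above, here is a minimal, self-contained sketch. FakeConsumer, FakeMessage, ScopedCommit, handle and run_handlers are hypothetical names introduced only so the example compiles without cppkafka or a broker; with the real library the two stand-in types would be cppkafka::Consumer and cppkafka::Message. It only aims to show that the commit runs on every exit path of the handler, including an early return and an exception. Swallowing errors in the destructor is an assumption made here to keep the destructor from throwing, not something the thread prescribes.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-ins for cppkafka::Message and cppkafka::Consumer.
struct FakeMessage {
    std::string payload;
};

struct FakeConsumer {
    std::vector<std::string> committed;
    void commit(const FakeMessage& msg) { committed.push_back(msg.payload); }
};

// Same shape as the AutoCommitter above, templated so it works with the stand-ins.
template <typename ConsumerT, typename MessageT>
class ScopedCommit {
public:
    ScopedCommit(ConsumerT& consumer, const MessageT& msg)
        : consumer_(consumer), msg_(msg) {}
    ScopedCommit(const ScopedCommit&) = delete;
    ScopedCommit& operator=(const ScopedCommit&) = delete;
    ~ScopedCommit() {
        // A real commit can fail; swallow the error so the destructor never throws.
        try { consumer_.commit(msg_); } catch (...) {}
    }
private:
    ConsumerT& consumer_;
    const MessageT& msg_;
};

// Handler with several exit points; the guard commits on all of them.
void handle(FakeConsumer& consumer, const FakeMessage& msg) {
    ScopedCommit<FakeConsumer, FakeMessage> guard(consumer, msg);
    if (msg.payload.empty()) {
        return;  // early return
    }
    if (msg.payload == "bad") {
        throw std::runtime_error("processing failed");
    }
    // normal processing would go here
}

// Drives the handler through all three exit paths and returns the commit count.
std::size_t run_handlers() {
    FakeConsumer consumer;
    handle(consumer, FakeMessage{"ok"});
    handle(consumer, FakeMessage{""});
    try {
        handle(consumer, FakeMessage{"bad"});
    } catch (const std::runtime_error&) {
        // the guard has already committed during stack unwinding
    }
    assert(consumer.committed.size() == 3);
    return consumer.committed.size();
}
```

Note that committing when the handler throws may or may not be what an application wants; a variant could skip the commit on failure, which is where the uncaught_exception check mentioned above would come in.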
Regarding consumers/producers, we use microservices at work so I mostly consume/produce from/to a single topic, hence I have a single consumer/producer. I think this depends on the application, but I don't think you should have a single application writing to a large number of topics. Personally I'd prefer having one consumer per topic (I don't think it makes too much difference for producers).
I'm not sure what behavior rdkafka has regarding topic configs and the producev function. When I first implemented this, I was using produce, as producev didn't exist. At the time, once you created a topic with a specific config, it would stick forever in that rdkafka producer/consumer handle. Doesn't producev end up looking for the topic handle under the hood? Meaning, if you create a topic with a specific config and then use producev, doesn't that still work as expected?
from cppkafka.
Hi, thanks for the feedback. Ok for the RAII commit stuff, it was just a thought.
As for the design, that was my gut feeling too regarding one consumer/producer per topic. I will test both scenarios and see. My preference would be to have a consumer/producer per config. If the config for a specific topic changes, then you spin off another producer or consumer. At the same time, it feels like a waste of resources to have multiple producers just because one topic setting is different.
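The "one producer per config" idea above could be sketched roughly as follows. This is only an illustration under stated assumptions: TopicSettings, StubProducer, ProducerPool and demo_pool are hypothetical names, and StubProducer stands in for a real producer object so the example builds without cppkafka. The point is that topics sharing the same settings reuse one producer, and only a differing setting creates another.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical per-topic settings, stored as property name -> value.
using TopicSettings = std::map<std::string, std::string>;

// Hypothetical stand-in for a producer built from one settings object.
struct StubProducer {
    explicit StubProducer(TopicSettings s) : settings(std::move(s)) {}
    TopicSettings settings;
};

class ProducerPool {
public:
    // Returns the producer for these settings, creating it on first use.
    std::shared_ptr<StubProducer> get(const TopicSettings& settings) {
        auto it = producers_.find(settings);
        if (it == producers_.end()) {
            it = producers_.emplace(settings,
                                    std::make_shared<StubProducer>(settings)).first;
        }
        return it->second;
    }

    std::size_t size() const { return producers_.size(); }

private:
    std::map<TopicSettings, std::shared_ptr<StubProducer>> producers_;
};

// Three topics, two distinct configs: returns the number of producers created.
std::size_t demo_pool() {
    ProducerPool pool;
    TopicSettings strict{{"request.required.acks", "-1"}};
    TopicSettings relaxed{{"request.required.acks", "1"}};

    auto a = pool.get(strict);   // e.g. a first topic
    auto b = pool.get(strict);   // a second topic with the same settings
    auto c = pool.get(relaxed);  // a third topic with a different setting

    assert(a == b);  // same settings share one producer
    assert(a != c);  // a different setting gets its own producer
    return pool.size();
}
```

With this shape, the resource cost grows with the number of distinct configs rather than the number of topics, which is the trade-off raised above.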
"Meaning, if you create a topic with a specific config and then use producev, doesn't that still work as expected?"
Well, based on the librdkafka documentation, you get a new topic handle with rd_kafka_topic_new and a specific config, and then when you use rd_kafka_produce and rd_kafka_consume you specify this handle and it will use that configuration instead of the default one. These are lower-level functions and not used much in the auto-rebalance scheme. Since rd_kafka_producev and rd_kafka_consumer_poll, which you are using, don't really take a topic handle, whatever topic you specify (via subscribe or in producev) will take the default config, so in your library it's unfortunately impossible to override a topic config.
from cppkafka.
rd_kafka_producev is the new version of rd_kafka_produce. The latter doesn't support newer options like message timestamps, so I presume under the hood it's still the same. As I said, when I checked this, the config for a topic was sticky. Meaning when you create a topic with a config, it just sticks. Even if you then ask for a handle for the same topic, it will still have the initial config you used, even if you don't provide the config on the second call. Are you sure that if you create a topic and then produce a message on it using rd_kafka_producev, it doesn't use your config? I know rd_kafka_producev doesn't take a topic object, but it should still somehow respect the config with which the topic was created; otherwise it is not a direct replacement for rd_kafka_produce and hence it's broken. rd_kafka_producev should be a replacement for rd_kafka_produce; otherwise there's no way to produce a message to a topic using e.g. a timestamp together with a non-default topic config.
from cppkafka.