Hi Alexander!
I'm very much open to new tests, better docs, and other improvements.
from kafka.
As for API compatibility with the Java library... I have some concerns, as I would prefer to keep things simple where possible. Different languages, different styles. Anyway, I'm open to discussion :)
@oleksiyk Well, there I was talking about compatibility in things like options and the way things are handled, meaning synchronization, etc. A Node.js Kafka client solves the same tasks, but in the Node way. Okay then, I'll come back soon with a list of ideas.
Ok, let's get more concrete. These would be the top priorities for me:
Compression: We use Kafka in production, and use snappy compression. The driver should support none, gzip & snappy.
Replace consumer fire & forget emitter with promise or function callback: when you have a producer way faster than the consumer, or a topic with many messages to be processed, the consumer will keep receiving messages without making sure they have been processed (and committed), eventually crashing the application due to memory limitations.
```js
// base_consumer.js
self.emit('data', p.messageSet, p.topic, p.partition);

// application
consumer.on('data', function (messageSet, topic, partition) { ... });
```
Committing offsets when a messageSet has been processed does not guarantee that previous message sets have been fully processed, and a crash of the app would mean losing messages.
My proposal is something along the lines of:
```js
// base_consumer.js
self.onMessageSet(p.messageSet, p.topic, p.partition).then(function () { ... });

// application
function onMessageSet(messageSet, topic, partition) {
  // process messages ...
  // the consumer would only send another fetch request after this resolves,
  // and it could even auto-commit offsets (if configured as an option)
  return Promise.resolve();
}
```
The above is just a rough idea, but the main point is that no more fetch requests are sent to the Kafka broker until the application has acknowledged consumption of the messages.
Ability to create topics: Not a must, but useful when unit testing.
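The promise-gated idea above can be sketched end to end. Everything here is illustrative, not the driver's actual API: `runConsumer` and the mocked `fetch` are invented to show the backpressure mechanism in isolation.

```js
// Sketch of a promise-gated fetch loop: the next fetch is only issued once
// the application's handler has resolved, giving natural backpressure.
// All names here are illustrative, not the real driver API.
'use strict';

function runConsumer(fetch, onMessageSet) {
  // fetch() -> Promise<{ messageSet, topic, partition } | null>
  return fetch().then(function (p) {
    if (!p) return; // no more data
    return onMessageSet(p.messageSet, p.topic, p.partition)
      .then(function () {
        // handler resolved: this is where offsets could be auto-committed,
        // then the next fetch request is sent
        return runConsumer(fetch, onMessageSet);
      });
  });
}

// --- usage with a mocked broker ---
var batches = [[1, 2], [3, 4]];
var processed = [];

function mockFetch() {
  var next = batches.shift();
  return Promise.resolve(next
    ? { messageSet: next, topic: 'test', partition: 0 }
    : null);
}

runConsumer(mockFetch, function (messageSet, topic, partition) {
  processed.push.apply(processed, messageSet);
  return Promise.resolve(); // acknowledge: safe to fetch more
}).then(function () {
  console.log('processed:', processed.join(','));
});
```

The key property is that a slow handler slows the fetch loop down instead of letting message sets pile up in memory.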
> Replace consumer fire & forget emitter with promise or function callback

That's at the top of the roadmap.
> Compression

That one I have at a lower priority.
> Ability to create topics

There is no API call to create a topic. There is a server option autocreateTopics (or something like that) which will make Kafka create a topic when its metadata is first requested, but the topic is created with default parameters, which is useless. Unless there is a specific API call to create/manage a topic, I'm against this. You can always call spawn(kafka-topics.sh..) in unit tests.
> Thats on top of roadmap

That's great to hear. Would you create an issue so we can elaborate there? Would you take this feature yourself?
> Something I have with a lower priority

I'm happy with kicking off some work around compression, especially snappy, since it's the one we currently use.
Regarding creation of topics, fair enough. Not having an API sucks, so I'm actually in favor of not having a function that only kind of works.
I'm not into the source yet, guys.
Another thing is buffering of requests. Is it already implemented, or still to be done? I guess it would require changes in Client. It's a tricky thing if the client implementation is not ready for it from the start (and some people have tried even stranger things to work around the problem: SOHU-Co/kafka-node#301).
> Another thing is buffering of requests.

Which requests exactly? The Kafka protocol is all about bulk operations, so I really don't see any need for additional buffering. Producer.send() allows you to send multiple messages at once. It's very easy to buffer them in your own code and then send a bulk of messages every second, or however you like.
BaseConsumer also groups all fetch requests into one large bulk fetch request.
Am I missing something?
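The do-it-yourself buffering Oleksiy describes could look roughly like this. BatchProducer and its options are invented for illustration; the only assumption taken from the thread is a send() that accepts multiple messages at once.

```js
// Illustrative client-side batcher: collect messages and flush them in one
// bulk send() call, either when `batchSize` is reached or after `lingerMs`.
// All names and options here are made up for the sketch.
'use strict';

function BatchProducer(send, options) {
  this.send = send;                          // send(messages) -> Promise
  this.batchSize = options.batchSize || 100;
  this.lingerMs = options.lingerMs || 1000;
  this.buffer = [];
  this.timer = null;
}

BatchProducer.prototype.push = function (message) {
  this.buffer.push(message);
  if (this.buffer.length >= this.batchSize) return this.flush();
  if (!this.timer) {
    this.timer = setTimeout(this.flush.bind(this), this.lingerMs);
  }
};

BatchProducer.prototype.flush = function () {
  clearTimeout(this.timer);
  this.timer = null;
  if (!this.buffer.length) return Promise.resolve();
  var batch = this.buffer;
  this.buffer = [];
  return this.send(batch); // one bulk request instead of many small ones
};

// --- usage with a stubbed send() ---
var sent = [];
var producer = new BatchProducer(function (messages) {
  sent.push(messages);
  return Promise.resolve();
}, { batchSize: 3, lingerMs: 50 });

producer.push('a');
producer.push('b');
producer.push('c');            // hits batchSize -> flushed immediately
console.log(sent[0].join('')); // abc
```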
Ouch, sorry. Batching. Or it's me missing something. Let me check the Kafka docs.
> Would you take this feature yourself?
Yes, I would take "Replace consumer fire & forget emitter with promise" myself. I will open an issue soon.
Correct, I was talking about batching. The 0.9.x docs have these settings: batch.size and linger.ms. It's not super-important to me ATM, but it would still be good to have. I could take this.
Batching is also interesting; not crucial for me yet either, but if @estliberitas takes that one, it would be great.
An interesting use case for buffering, though, is when the broker is temporarily unreachable, for example during rolling maintenance. As far as I know, the current producer will retry 3 times, with a 1-second interval, which may not be enough to discover the new leader. The guys at Uber implemented something along these lines in a now-unmaintained Kafka driver, where you could set a limit on the time and/or number of messages allowed to be buffered before erroring.
Right, 3 times but with increasing delay, for a total of 6 seconds. This can probably be moved to options.
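Moved to options, the retry logic might be sketched like this. The option names (attempts, initialDelay, factor) are hypothetical, not the driver's real configuration.

```js
// Sketch of configurable retry with increasing delay, instead of a
// hard-coded 3 attempts. Option names here are hypothetical.
'use strict';

function retry(fn, options) {
  var attempts = options.attempts || 3;
  var delay = options.initialDelay || 1000;
  var factor = options.factor || 2; // delay grows by this factor each attempt

  function attempt(n, wait) {
    return fn().catch(function (err) {
      if (n + 1 >= attempts) throw err; // out of attempts: give up
      return new Promise(function (resolve) {
        setTimeout(resolve, wait);
      }).then(function () {
        return attempt(n + 1, wait * factor);
      });
    });
  }

  return attempt(0, delay);
}

// --- usage: a "broker" that fails twice, then succeeds ---
var calls = 0;
retry(function () {
  calls += 1;
  return calls < 3
    ? Promise.reject(new Error('NoKafkaConnectionError'))
    : Promise.resolve('ok');
}, { attempts: 5, initialDelay: 10 }).then(function (result) {
  console.log(result, 'after', calls, 'attempts'); // ok after 3 attempts
});
```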
Better to move it to options, really, because otherwise someday we may run into a situation like node-zookeeper-client, where reconnection is handled so internally that you can't affect its behavior.
Alright, I will work on batching this weekend, or more probably at the start of next week.
Btw, @carlessistare @paddy3883 @jfkoch are you interested in participating, guys? 😉
@estliberitas count me in. Should we move this conversation to a different forum? Like gitter?
@jfkoch Well, it's already a different forum ))
Yes, count me in too!
@jfkoch Good idea, but I don't think it's time for that yet. We're not exactly discussing adding promises to Node.js core here (GitHub already truncates that discussion, AFAIK, because of the huge number of messages).
@estliberitas Yes, I am interested in participating, even if I am not much into the code yet.
If you take the producer batch feature, you may be interested in taking a look at my 2 PRs (one merged, the other pending) for kafka-node: SOHU-Co/kafka-node#262 SOHU-Co/kafka-node#314
That said, is there any other priority feature I could take care of?
@carlessistare Sure I will, thanks.
/cc @oleksiyk Please read the message above 😉
Hi, guys. I've been a bit too busy on these business days. Will work on batching this weekend for sure.
Actually, I'm going to start on batching as well. So there's probably no need for you to.
Once it's complete, it will go into the 2.0 branch. There are some minor bugs pending, and then I'm ready to release 2.0.
Okay. Btw, did you test gzip compression under heavy load? Average event loop tick time would be a meaningful metric. I'm asking because the Sync methods may cause real trouble by blocking the event loop.
> Btw, did you test gzip compression under big load?

Not yet. What kind of problem should I expect?

> Event loop tick avg time would make a sense.

What does that mean?
Since Node runs JS in one thread, calling many *Sync() methods, especially ones that do a lot of computation, blocks the JS thread. Because Node is built on libuv's event loop, every synchronous call blocks the current event loop tick: timers don't fire and connections are not accepted while JS is executing (timers run at the start of the loop iteration, async I/O enters the JS environment after that, and setImmediate() callbacks fire after all of that, if I recall correctly). This is the big price you pay for doing too much synchronous computation in Node.
This means any other business logic requiring fast interaction (WebSocket keep-alives, ordinary HTTP requests) may be affected and start returning timeouts or similar errors.
Yeah, I understand that. But all the buffers have already been received. It doesn't matter whether you call ungzip() and wait for the callback or call ungzipSync(): the same buffer will be processed in the same synchronous way inside zlib. It would only matter if you were waiting for more buffers to arrive from a socket or a file handle, for example.
The only difference is that ungzip() executes in one of the 4 (default) libuv I/O threads, while ungzipSync() runs on the JS thread.
I'll make a benchmark this weekend to get raw numbers. I didn't do this for kafka-node, but your child definitely deserves such care 👍
I'm happy to provide two ways of compression, sync and async. I suspect sync will be much faster for smaller Kafka messages (and/or a large number of partitions per topic). Async might be better when compressed messages exceed some size (>200KB?).
It would be nice to have a benchmark suite. Interested in creating one?
Sure.
Great!
Please select the 2.0 branch as the target for the PR.
Both sync and async compression are now supported: c3a2378, with the asyncCompression boolean option.
Hi guys. Sorry I didn't notify you earlier. I'll be a bit out of the game for some time because of a relocation.
So where does this leave stewardship of this module? Should new features target this repo or the original?
@tizzo This is actually an original, different implementation; it's up to you which one to work on.