oleksiyk / kafka
Apache Kafka 0.9 client for Node
License: MIT License
I was trying this out by running the group consumer example from README.md, so with the default round-robin strategy. I ran the consumer in two different processes, and the console messages seem reasonable (joining and electing leaders). But messages pushed through tend to show up in both consumers. Is this expected?
I was trying to understand why kafka-node was consuming faster than no-kafka and saw that kafka-node doesn't have a delay between fetch requests, yet no-kafka defaults to 1000ms for its fetch loop.
Was there a reason to set the default at 1000ms (seems high)? Can you think of any risks of setting it to 0? (It also looks like the official Java client fetches continuously.)
Producer / Consumers could benefit from a stream interface to increase interop with any NodeJS stream.
Thoughts?
Hi @oleksiyk, I've tried to run a producer + group consumer, but I'm running into some problems with the consumer. Using the same setup, the simple consumer works and I'm able to receive messages.
The error in question is:
[2016-02-03 23:04:34,155] ERROR Closing socket for /172.17.42.1 because of error (kafka.network.Processor)
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topics': java.nio.BufferUnderflowException
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:66)
at org.apache.kafka.common.requests.JoinGroupRequest.parse(JoinGroupRequest.java:85)
at kafka.api.JoinGroupRequestAndHeader$.readFrom(JoinGroupRequestAndHeader.scala:29)
at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:50)
at kafka.network.Processor.read(SocketServer.scala:450)
at kafka.network.Processor.run(SocketServer.scala:340)
at java.lang.Thread.run(Thread.java:745)
What I've done to setup the environment is:
$ docker run -d --name zookeeper jplock/zookeeper:3.4.6
$ docker run -d --name kafka --link zookeeper:zookeeper ches/kafka
$ ZK_IP=$(docker inspect --format '{{ .NetworkSettings.IPAddress }}' zookeeper)
$ KAFKA_IP=$(docker inspect --format '{{ .NetworkSettings.IPAddress }}' kafka)
$ docker run --rm ches/kafka kafka-topics.sh --create --topic test --replication-factor 1 --partitions 1 --zookeeper $ZK_IP:2181
Created topic "test".
My producer is:
var Kafka = require('no-kafka');
var producer = new Kafka.Producer({
connectionString: process.env.KAFKA_IP + ':9092'
});
return producer.init().then(function(){
return producer.send({
topic: 'test',
partition: 0,
message: {
value: 'Hello!'
}
});
}).then(console.log).then(process.exit);
And my group consumer:
var Kafka = require('no-kafka');
var consumer = new Kafka.GroupConsumer({
connectionString: process.env.KAFKA_IP + ':9092'
});
var strategies = {
strategy: 'TestStrategy',
subscriptions: ['test']
};
consumer.on('data', function (messageSet, topic, partition) {
messageSet.forEach(function (m) {
console.log(topic, partition, m.offset, m.message.value.toString('utf8'));
// process each message and commit its offset
consumer.commitOffset({topic: topic, partition: partition, offset: m.offset, metadata: 'optional'});
});
});
return consumer.init(strategies).then(function() {
// all done, now wait for messages in event listener
});
Running the consumer throws the following error: NoKafkaConnectionError (172.17.0.45:9092): Kafka server [172.17.0.45:9092] has closed connection, which is due to the above error taken from the Kafka error log file.
Any help or ideas would be appreciated!
Thanks for the awesome library!
Question applies for both Publisher and Subscriber. What would be the best way to:
Version: 2.0.1
Use-case: using SimpleConsumer to receive events
Platform: Windows
Kafka: 0.9.0
Stack-trace:
Warning: a promise was created in a Promise.map handler but was not returned from it
at [...]\node_modules\no-kafka\lib\base_consumer.js:69:23
From previous event:
at d:\dev\attractions-reboot.git\attractions-service\node_modules\no-kafka\lib\base_consumer.js:59:28
at processImmediate as _immediateCallback
It seems like base_consumer.js:69 should be prefixed by a "return"
Hi,
I'm playing around with this lib but I'm not sure how the group consumers are supposed to work. Is it just to group a few partitions based on the selected strategy?
How can I round robin one producer over multiple consumers? Must I increment the partition after every message on the producer side?
Thank you!
Regards,
Riaan
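On the round-robin question above: one way to spread messages from a single producer is to rotate the partition number on the producer side. The following is a minimal sketch, assuming the partition count is known up front. `createRoundRobin` is a hypothetical helper for illustration and is not part of no-kafka.

```javascript
// Hypothetical helper: returns a function that yields 0, 1, ..., n-1, 0, 1, ...
function createRoundRobin(partitionCount) {
    if (!Number.isInteger(partitionCount) || partitionCount < 1) {
        throw new Error('partitionCount must be a positive integer');
    }
    var next = 0;
    return function nextPartition() {
        var current = next;
        next = (next + 1) % partitionCount; // wrap around after the last partition
        return current;
    };
}

var nextPartition = createRoundRobin(3);
var picked = [nextPartition(), nextPartition(), nextPartition(), nextPartition()];
console.log(picked);
```

The returned value could then be used as the partition field of each producer.send call, as in the producer examples elsewhere on this page.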
When the connection disappears, or if it was never there in the first place, a NoKafkaConnectionError is thrown.
ERROR no-kafka-client Metadata request failed: NoKafkaConnectionError [127.0.0.1:9092]: Error: connect ECONNREFUSED
The retries.delay and retries.attempts options are not used in that case. The default value of 1000 is used instead.
https://github.com/oleksiyk/kafka/blob/master/lib/client.js#L155
Can submit a PR which uses the delay provided in the Producer options. Otherwise, please fix. Thanks!
Are there any plans to support connecting to brokers over SSL as described in the securing kafka docs?
The connection string now needs to be kafka://host:port, otherwise parsing the host and port fails:
lib/client.js
`
self.initialBrokers = self.options.connectionString.split(',').map(function (hostStr) {
var parsed = url.parse(hostStr);
var config = {
host: parsed.hostname,
port: parsed.port
};
return config.host && config.port ? new Connection(config) : undefined;
});
`
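One possible way to accept both forms of the connection string is to parse each entry explicitly instead of relying on url.parse. This is only a sketch: `parseBrokers` is a hypothetical helper, not the library's implementation, and it assumes plain hostnames or IPv4 addresses (no IPv6 literals).

```javascript
// Hypothetical helper: accepts 'host:port' and 'kafka://host:port' entries.
function parseBrokers(connectionString) {
    return connectionString.split(',').map(function (entry) {
        // Optional kafka:// prefix, then host, then a numeric port.
        var match = /^(?:kafka:\/\/)?([^:\/\s]+):(\d+)$/.exec(entry.trim());
        if (!match) {
            throw new Error('Invalid broker address: ' + entry);
        }
        return { host: match[1], port: parseInt(match[2], 10) };
    });
}

var brokers = parseBrokers('localhost:9092, kafka://localhost:9093');
console.log(brokers);
```

Throwing on a malformed entry, rather than silently returning undefined, makes a bad connection string visible at startup.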
start 2 kafka brokers (ports 9092, 9093)
create mytopic with replication-factor of 2
./bin/kafka-topics.sh --zookeeper localhost --topic mytopic --create --partitions 1 --replication-factor 2
start a consumer in a loop
var consumer = new Kafka.SimpleConsumer({connectionString: 'localhost:9092, localhost:9093'})
consumer.init().then(function () {
consumer.subscribe('mytopic', [], function (messageSet, topic, partition) {
messageSet.forEach(function (m) {
console.log(topic, partition, m.offset, m.message.value.toString('utf8'));
});
});
});
configure no-kafka producer with max attempts and delay long enough to survive rebalancing
var producer = new NoKafka.Producer({
connectionString: 'localhost:9092, localhost:9093',
clientId: 'producer',
timeout: 100,
retries: {
attempts: 10,
delay: 1000
}
});
run no-kafka producer in loop
var counter = 0;
setInterval(function() {
producer.send({
topic: 'mytopic',
message: { value: 'message #' + counter++ },
partition: 0
})
.catch(function(err) { console.log('promise rejection', err); })
.then(function(err) { console.log('promise fulfillment', err.error); });
}, 1000)
kill -9 whichever broker is the leader for mytopic:0
Observe that the messages are sent in reverse order when the new leader is elected
AFAIK, when sending keyed messages, each message should automatically be assigned to a partition based on its key, so I guess we don't need to specify the partition explicitly?
However, when sending keyed messages via no-kafka, it expects a partition, and I get the error Missing or invalid partition.
Is this expected?
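Until the partition can be omitted, the caller could derive it from the key. The sketch below is an assumption-laden illustration: `partitionForKey` is a hypothetical helper, and the hash is a simple 32-bit string hash chosen for brevity, not the hash used by the official Java client, so the resulting partitions will not match those chosen by other clients.

```javascript
// Hypothetical partitioner: maps a string key to a partition in [0, partitionCount).
// The hash is for illustration only and is NOT the official client's hash.
function partitionForKey(key, partitionCount) {
    var hash = 0;
    for (var i = 0; i < key.length; i++) {
        // Multiply-and-add, truncated to an unsigned 32-bit integer.
        hash = ((hash * 31) + key.charCodeAt(i)) >>> 0;
    }
    return hash % partitionCount;
}

var first = partitionForKey('user-42', 4);
var second = partitionForKey('user-42', 4);
console.log(first === second);
```

The property that matters for log compaction is that the same key always lands on the same partition, which holds here as long as the partition count does not change.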
I am trying a simple Producer example with no-kafka version 2.4.1. The entire code is:
var Kafka = require('no-kafka');
var producer = new Kafka.Producer()
return producer.init().then(function(){
return producer.send({
topic: 'kafka-test-topic',
partition: 0,
message: {
value: 'Hello!'
}
});
})
.then(function (result) {
console.log('the kafka s post send result is ', result)
});
And it throws this exception:
Unhandled rejection RangeError: Trying to access beyond buffer length
at checkOffset (buffer.js:582:11)
at Buffer.readInt32BE (buffer.js:667:5)
at _Reader.define.read (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/bin-protocol/lib/index.js:101:47)
at _Reader._read (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
at apply (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/lodash/lodash.js:427:27)
at _Reader.wrapper (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/lodash/lodash.js:4675:16)
at _Reader.Protocol.define.read (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/lib/protocol/produce.js:72:14)
at _Reader._read (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/bin-protocol/lib/reader.js:42:12)
at apply (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/lodash/lodash.js:426:27)
at _Reader.wrapper [as ProduceResponse] (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/lodash/lodash.js:4675:16)
at /home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/lib/client.js:294:67
at tryCatcher (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/bluebird/js/release/util.js:16:23)
at Promise._settlePromiseFromHandler (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/bluebird/js/release/promise.js:503:31)
at Promise._settlePromise (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/bluebird/js/release/promise.js:560:18)
at Promise._settlePromise0 (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/bluebird/js/release/promise.js:605:10)
at Promise._settlePromises (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/bluebird/js/release/promise.js:684:18)
at Async._drainQueue (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/bluebird/js/release/async.js:126:16)
at Async._drainQueues (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/bluebird/js/release/async.js:136:10)
at Async.drainQueues (/home/sk/ws/express/node-postgres-todo/node_modules/no-kafka/node_modules/bluebird/js/release/async.js:16:14)
at process._tickCallback (node.js:419:13)
Am I missing something here?
Hello @oleksiyk!
I'm moving the discussion from SOHU-Co/kafka-node#309 here. I've come to think that the author of kafka-node has no time or wish to reorganize his project for the good of everyone. I'm also seeing a kind of regression in the code and quality of the module after 0.3.0 was released. And your module seems very promising. So we'd like to help you with a roadmap, splitting tasks, covering with tests, review, etc. Are you interested or not? 😉
/cc @ismriv @paddy3883
If a producer suddenly loses connection to all brokers:
_try closure for each call to producer.send
updateMetadata (but doesn't throw any errors AKA "reject any promises")
Ideally (and maybe this is already possible), we could subscribe to (or catch) those failures after a circuit breaker pattern has been tripped and then stop calling producer.send
OR... producer.send would reject a promise after the circuit breaker has been tripped (might be the simplest and most intuitive).
If we don't have a mechanism for subscribing to connection failures, our no-kafka processes are just going to hit the VM memory limit until the broker(s) is/are available again.
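The circuit breaker idea above can be sketched on the caller's side without changing the library. Everything here is hypothetical (`CircuitBreaker`, `guardSend`, the threshold and cooldown parameters). The sketch also assumes failures surface as promise rejections, which other reports on this page suggest is not always the case for producer.send.

```javascript
// Hypothetical circuit breaker: opens after `threshold` consecutive failures
// and stays open for `cooldownMs` before allowing another attempt.
function CircuitBreaker(threshold, cooldownMs) {
    this.threshold = threshold;
    this.cooldownMs = cooldownMs;
    this.failures = 0;
    this.openedAt = null;
}

CircuitBreaker.prototype.isOpen = function (now) {
    if (this.openedAt === null) { return false; }
    return (now - this.openedAt) < this.cooldownMs;
};

CircuitBreaker.prototype.recordSuccess = function () {
    this.failures = 0;
    this.openedAt = null;
};

CircuitBreaker.prototype.recordFailure = function (now) {
    this.failures += 1;
    if (this.failures >= this.threshold) { this.openedAt = now; }
};

// Wraps any promise-returning send function (for example, a bound producer.send).
function guardSend(send, breaker) {
    return function (payload) {
        if (breaker.isOpen(Date.now())) {
            // Fail fast instead of queueing more work while brokers are down.
            return Promise.reject(new Error('Circuit open: not sending'));
        }
        return send(payload).then(function (result) {
            breaker.recordSuccess();
            return result;
        }, function (err) {
            breaker.recordFailure(Date.now());
            throw err;
        });
    };
}
```

Rejecting immediately while the breaker is open keeps the caller from accumulating pending sends, which is the memory concern raised above.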
I'm working on porting my application from kafka-node to no-kafka. So far everything has been much easier to use. Thanks!
I'm a little uncertain what the strategy property of the GroupConsumer init is for. I've looked through the tests and code a little bit, and it looks like it is just used as a key when storing multiple consumer strategies and handlers, but all of the tests use 'TestStrategy'. It would be a nice addition to the docs to explain what this is for.
How should I handle failure while processing messages?
It seems the client lib uses the offset of the last received message and not the latest committed offset.
See https://github.com/oleksiyk/kafka/blob/master/lib/base_consumer.js#L72
Is this a bug, or should it be handled by the application?
Shall we clearly state in LICENSE & README that this module is licensed under MIT? The only reference I find now is in package.json.
Do you plan supporting Zookeeper connection string? So that no-kafka will fetch Kafka hosts automatically?
Simply missing the tag for selecting the most recent version you have published.
When creating a new producer without options like in the example, my API crashes with this error:
[...]/node_modules/no-kafka/lib/producer.js:28
this.partitioner = options.partitioner;
^
TypeError: Cannot read property 'partitioner' of undefined
at new Producer ([...]/node_modules/no-kafka/lib/producer.js:28:31)
at ...
Do you plan to support subscription to multiple topics via pattern matching (as in the official Java API: http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.regex.Pattern,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener) )?
Or do I have to take care of it myself?
The current Producer doesn't support Keyed Messages. We use log compaction in a number of topics, and this would prevent us from fully moving to the new driver. Is this feature in the roadmap?
Great improvements in the last weeks @oleksiyk!
Request for some API's for topic introspection and management, such as:
Or do you feel that no-kafka is more a consumption/production solution? Interested in what the roadmap might be.
When calling .end(), there's lots of error logging that ends up in the console, such as:
2016-01-07T23:27:11.935Z ERROR Error: Kafka server has closed connection
[Error: Kafka server has closed connection]
at d:\Github\kafka-poc\node_modules\no-kafka\lib\connection.js:81:30
at Array.forEach (native)
at Connection._disconnect (d:\Github\kafka-poc\node_modules\no-kafka\lib\connection.js:80:16)
... blah blah
I was thinking perhaps for the .end() method we might pass a 'skipRejections' flag to _disconnect and make it not bother calling the explicit rejections for promises. Ordinarily on a disconnect we'd want to maintain state, but it seems redundant if we are doing a shutdown since there is nothing we should want users to care about on the instance after an .end(). Something like:
// events call directly to make sure internal state is maintained
Connection.prototype._disconnect = function (err, skipRejections) {
if (!this.connected) {
return;
}
this.socket.end();
this.connected = false;
if (!skipRejections) {
this.queue.forEach(function (t) {
t.reject(err ? err : new Error('Kafka server has closed connection'));
});
}
this.queue = [];
};
Then:
Connection.prototype.close = function () {
// console.log('Closing connection', this.host + ':' + this.port);
this.auto_connect = false;
this._disconnect(null, true);
};
I only see sample code of SSL use in producer, but not sure if it is a limitation in the current release
Is there any plan or possibility that no-kafka could support auto.create.topic as it used to?
Is there any way to force a consumer to close connection?
Something like what kafka-node does.
The use case is to be able to capture SIGINT|SIGTERM and close the GroupConsumer before bailing.
Thanks!
Thank you for the awesome module supporting so many neat features!
I'm struggling with its API though. It's not clear from the README what the producer.send() function options are.
message. Although, the message can have property message. So it's unclear what "message" is: either the first argument, or a property of the first argument.
message but messages. I assume, but am not sure, that the first argument can be an array.
options argument possible properties are not listed anywhere. All I see is the Producer options.
Could you update the README with the .send function options please?
Thank you so much
I see you updated the docs to suggest not using Promise.map on a messageSet in the Group Consumer handler (1a63120), but the commitOffset method signature still allows for an array of commits.
/**
* Commit (save) processed offsets to Kafka
*
* @param {Object|Array} commits [{topic, partition, offset, metadata}]
* @return {Promise}
*/
GroupConsumer.prototype.commitOffset = function (commits) {
var self = this;
return self.client.offsetCommitRequestV2(self.options.groupId, self.memberId, self.generationId,
self._prepareOffsetRequest('commit', commits));
};
Is it not safe, or otherwise undesirable, to batch commit the entire messageSet in the handler function for the same reason you suggest not doing concurrent commits of individual messages?
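For illustration, a single commit per messageSet could be built from the highest offset in the set. This is a sketch only: `commitForMessageSet` is a hypothetical helper, and whether such a batched commit is safe is exactly the open question above.

```javascript
// Hypothetical helper: builds one commit entry for a whole messageSet,
// using the highest offset seen. Returns null for an empty set.
function commitForMessageSet(messageSet, topic, partition) {
    if (messageSet.length === 0) { return null; }
    var highest = messageSet.reduce(function (max, m) {
        return m.offset > max ? m.offset : max;
    }, messageSet[0].offset);
    return { topic: topic, partition: partition, offset: highest };
}

var commit = commitForMessageSet(
    [{ offset: 10 }, { offset: 12 }, { offset: 11 }],
    'test',
    0
);
console.log(commit);
```

The resulting object has the same {topic, partition, offset} shape that the commitOffset doc comment above describes, so it could be passed once after every message in the set has been handled.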
In some cases it's desirable to have a custom/central logging component for an application that allows collection of messages from the external/third-party libraries (such as no-kafka). Suggestion here is to make the constructors of the no-kafka library accept an additional parameter on:
This parameter takes the form of an object with .log/.warn/.debug methods - and then defaults back to nice-simple-logger when no value for this property is passed. This allows consuming code to pass in custom logging handlers, such as:
const csmr = new GroupConsumer({
logger: {
warn: function (msg) {
// lorem ipsum
}
}
});
The object should be validated to ensure that all required operations are present as typeof === 'function' and then any missing members should be defaulted to passing through to NSL, allowing only specific types of events to be collected.
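The validation and defaulting described above might look roughly like this. `normalizeLogger` and `LOGGER_METHODS` are hypothetical names, and `console` stands in for nice-simple-logger so the sketch runs on its own.

```javascript
var LOGGER_METHODS = ['log', 'warn', 'debug'];

// Hypothetical helper: keeps custom methods that are functions and
// falls back to the default logger for anything missing or invalid.
function normalizeLogger(custom, fallback) {
    var result = {};
    LOGGER_METHODS.forEach(function (name) {
        var candidate = custom && custom[name];
        result[name] = typeof candidate === 'function'
            ? candidate.bind(custom)
            : fallback[name].bind(fallback);
    });
    return result;
}

var warnings = [];
var logger = normalizeLogger(
    { warn: function (msg) { warnings.push(msg); } },
    console // stand-in for the default logger in this sketch
);
logger.warn('disk almost full');
```

Only warn is intercepted here, while log and debug pass through to the fallback, which matches the goal of collecting specific event types.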
bithound / codeclimate would be a good addition
Just came into mind.
I'm wondering why the default producer timeout is set to 100ms.
This timeout is passed to the client and then on to the ProduceRequest call. According to the Kafka Protocol guide and the documentation page:
The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to meet the acknowledgment requirements the producer has specified with the acks configuration. If the requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout is measured on the server side and does not include the network latency of the request.
Returning an error to the client before all brokers ACK may lead developers to manually retry the write and cause data duplication.
The default Kafka setting is 30 seconds. Why set it so low in this driver?
Hello,
I've noticed when adding new GroupConsumers to a topic for watching, they start watching from the current position of the topic and not the start position. This means only new data from the time the consumer is added is reported on, which is often not the requirement (i.e. Adding a new consumer to read a log from start to finish, and then watch the tail).
Is there a flag or option to make the initial offset be 0 for a new consumer?
-Steve
Hey,
When writing unit tests, there's no obviously documented way to close the consumers. This is fine for how the tests are written for the library itself, but if you're running them as part of a gulp job then the tests will never let the node process exit, because there are callbacks hanging around.
-Steve
Hi Oleksiyk
I've been using no-kafka for a couple of months, and this is by far the best npm package for Kafka.
I've recently switched to 2.2.1 and noticed one small issue with base_consumer (used as SimpleConsumer).
Each time dataHandler is invoked i receive a warning "Updating offset because of OffsetOutOfRange"
It's probably nothing but I checked that since 2.2.1 I don't have to add +1 to offset when subscribing to kafka partition.
I noticed that +1 is still used in finally() handler in base_consumer::_fetch.
If this is not an issue then sorry for trouble.
Thanks for the great work
Tomasz
My server URL is configured with a path, like 'localhost:2121/kafka', but I found that no-kafka connects to the server via net.Socket and only uses the host and port.
Could you suggest any way to support it?
At first I was confused: whenever I opened my laptop, the error below always appeared in the console:
RangeError: Maximum call stack size exceeded
But just now, when I rejoined the wifi network, the error appeared again.
After testing a couple of times, I am sure that rejoining the network causes the error.
Here is some console output:
2016-03-04T08:53:59.064Z INFO group-name-56d94c8a737bb0fb808ade52 Joined group group-name generationId 3 as group-name-56d94c8a737bb0fb808ade52-2fc5cefc-c9b4-41d6-91e0-438075a26395
2016-03-04T08:53:59.382Z ERROR group-name-56d94c8a737bb0fb808ade55 Sending heartbeat failed: KafkaError: UnknownMemberId: Returned from group requests (offset commits/fetches, heartbeats, etc) when the memberId is not in the current generation.
{ [KafkaError: Returned from group requests (offset commits/fetches, heartbeats, etc) when the memberId is not in the current generation.]
name: 'KafkaError',
code: 'UnknownMemberId',
message: 'Returned from group requests (offset commits/fetches, heartbeats, etc) when the memberId is not in the current generation.' }
at Object.exports.byCode (/path/to/program/node_modules/no-kafka/lib/errors.js:84:12)
at _Reader.Protocol.define.read (/path/to/program/node_modules/no-kafka/lib/protocol/common.js:87:23)
at _Reader._read (/path/to/program/node_modules/bin-protocol/lib/reader.js:41:18)
at apply (/path/to/program/node_modules/bin-protocol/node_modules/lodash/lodash.js:419:27)
at _Reader.wrapper [as ErrorCode] (/path/to/program/node_modules/bin-protocol/node_modules/lodash/lodash.js:4499:16)
at _Reader.Protocol.define.read (/path/to/program/node_modules/no-kafka/lib/protocol/group_membership.js:275:14)
at _Reader._read (/path/to/program/node_modules/bin-protocol/lib/reader.js:41:18)
at apply (/path/to/program/node_modules/bin-protocol/node_modules/lodash/lodash.js:418:27)
at _Reader.wrapper [as HeartbeatResponse] (/path/to/program/node_modules/bin-protocol/node_modules/lodash/lodash.js:4499:16)
at /path/to/program/node_modules/no-kafka/lib/client.js:541:61
From previous event:
at /path/to/program/node_modules/no-kafka/lib/client.js:540:40
at processImmediate [as _immediateCallback] (timers.js:383:17)
From previous event:
at Client.heartbeatRequest (/path/to/program/node_modules/no-kafka/lib/client.js:531:48)
at GroupConsumer._heartbeat (/path/to/program/node_modules/no-kafka/lib/group_consumer.js:201:24)
at /path/to/program/node_modules/no-kafka/lib/group_consumer.js:209:25
at Timer.listOnTimeout (timers.js:92:15)
[at apply (/path/to/program/node_modules/no-kafka/node_modules/lodash/lodash.js:419:27)]
2016-03-04T08:54:00.060Z INFO group-name-56d94c8a737bb0fb808ade52 Rejoining group on RebalanceInProgress
/path/to/program/node_modules/bluebird/js/release/promise.js:533
Promise.prototype._settlePromise = function(promise, handler, receiver, value) {
^
RangeError: Maximum call stack size exceeded
at Promise._settlePromise (/path/to/program/node_modules/bluebird/js/release/promise.js:533:44)
at Promise._settlePromise0 (/path/to/program/node_modules/bluebird/js/release/promise.js:605:10)
at Promise._settlePromises (/path/to/program/node_modules/bluebird/js/release/promise.js:684:18)
at Promise._fulfill (/path/to/program/node_modules/bluebird/js/release/promise.js:629:18)
at Promise._resolveCallback (/path/to/program/node_modules/bluebird/js/release/promise.js:424:57)
at Promise._settlePromiseFromHandler (/path/to/program/node_modules/bluebird/js/release/promise.js:515:17)
I don't know whether this is related to the newest changelog of [email protected], which mentions something about stack overflow.
If I have a docker-compose configuration that has 3 containers I can reproduce a connection issue where nokafkaclient can't subscribe to anything:
services:
  zookeeper:
    image: "zookeeper:latest"
    ports:
      - "2181:2181"
  kafka:
    image: "kafka:latest"
    depends_on:
      - "zookeeper"
    ports:
      - "9092:9092"
  nokafkaclient:
    depends_on:
      - "zookeeper"
      - "kafka"
(the "depends_on" feature of docker-compose doesn't introduce delays or "waits", all it does is control order)
If docker-compose up nokafkaclient is run, the simple consumer's init promise resolves successfully, yet the promise returned by subscribe is rejected with an error:
This request is for a topic or partition that does not exist on this broker.
logLevel at 5, there's no debug output at all
restarting the nokafkaclient will resolve it.
If using the simple consumer, should the metadata be manually refreshed at certain times, events or errors?
Lines 66 to 73 in 067c48d
It seems like the finally handler there is incrementing offsets regardless of handler success. This means that if the handler chooses not to commit offsets to kafka due to failures in the handler itself, the consumer will still continue to consume messages, leading to potential divergence in persisted offsets and processed offsets.
What is the recommended way of not advancing the offset on failure?
Moreover, if a batch of payloads fails some of the way through, a handler can commit the last successful offset. In this scenario, shouldn't the consumer continue from the offset after this last successful one until the handler commits a greater offset?
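The behaviour asked for above, where the consumer resumes right after the last successfully handled message, can be sketched as follows. `nextOffsetAfterBatch` is a hypothetical helper, and the sketch assumes a synchronous handler that throws on failure. It does not reflect how base_consumer.js currently works.

```javascript
// Hypothetical helper: runs `handler` on each message in order and stops at the
// first failure. Returns the offset that should be fetched next.
function nextOffsetAfterBatch(messageSet, startOffset, handler) {
    var next = startOffset;
    for (var i = 0; i < messageSet.length; i++) {
        try {
            handler(messageSet[i]);
        } catch (err) {
            break; // do not advance past the failed message
        }
        next = messageSet[i].offset + 1; // advance only after success
    }
    return next;
}

var batch = [{ offset: 5 }, { offset: 6 }, { offset: 7 }];
var resumeAt = nextOffsetAfterBatch(batch, 5, function (m) {
    if (m.offset === 7) { throw new Error('handler failed'); }
});
console.log(resumeAt);
```

With this approach the failed message at offset 7 is fetched again on the next iteration instead of being skipped.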
When a Kafka leader is kill -9'd, the producer has inconsistent behavior.
producer.send() promise is fulfilled and no error is returned
producer.send() promise is rejected with UnknownTopicOrPartition. It looks like it might be coming from here: Line 228 in 067c48d
kill -9'd broker)
Assuming my observations are correct, I would further assume that Producer.prototype._send needs some kind of wrapper/retry around _prepareProduceRequest - but maybe this design was intentional.
Would it be possible to implement the solution to reuse the Client instance between Consumer and Producers?
For example, right now I'm looking at the topicMetadata
and need that before I can make one of my Consumers, so I thought it'd be nice to simply make a new Client, wait for it to init, and grab the metadata, and then reuse the same client for my new Consumer.
I've tried a few things, and can't seem to find anywhere this is implemented with the simple logger.
Would it be possible to use something like winston, or bunyan, and simply just give it the logger object, rather than a logger function?
In our tests, lz4 is very cpu friendly and allows us to achieve higher throughput than snappy or gzip.
start 2 kafka brokers (ports 9092, 9093)
create mytopic with replication-factor of 2
./bin/kafka-topics.sh --zookeeper localhost --topic mytopic --create --partitions 1 --replication-factor 2
configure no-kafka producer with max attempts of 1
var producer = new NoKafka.Producer({
connectionString: 'localhost:9092, localhost:9093',
clientId: 'producer',
timeout: 50,
retries: {
attempts: 1,
delay: 50
}
});
run no-kafka producer in loop
setInterval(function() {
producer.send({
topic: 'mytopic',
message: { value: 'value' },
partition: 0
})
.catch(function(err) { console.log('promise rejection', err); })
.then(function(err) { console.log('promise fulfillment', err.error); });
}, 1000)
kill -9 whichever broker is the leader for mytopic:0
witness the producer loop get into a dead/inert failed state
The simple consumer's fetch loop ensures that metadata is updated despite arbitrarily long broker outages.
The producer, however, gets into an inert state after a long leader election or broker outage (as in, the outage lasts longer than the default of 3 attempts with 1000ms delay), in which case the promise is fulfilled with a NoKafkaConnectionError (rather than rejected) and repeated calls to producer.send continue to fail (because the metadata is not updated).
If the original broker comes back online, the error, as expected, changes to NotLeaderForPartition (but still as a promise fulfillment rather than a promise rejection).
Do you have any recommendations on how best to handle this? I can think of a few:
attempts for the producer to something arbitrarily large
no-kafka to reject the promise allowing the caller to decide what to do (reconnect or fail permanently) (or... was it intentional to return the error via the promise fulfillment?) in either case, reconnect or fetch metadata on failure at caller's discretion
no-kafka
no-kafka to apply a throttled metadata update on ALL producer.send's
no-kafka to apply a throttled metadata update on ALL ERROR'D producer.send's (seemingly most sensible)
no-kafka to apply a circuit breaker like pattern on ALL ERROR'D producer.sends
Is version 0.8 supported?
There's a small glitch when closing a connection shortly after it opens, where Connection._receive ends up calling shift().resolve() on an undefined value.
The error stack looks like:
stream.js:74
throw er; // Unhandled stream error in pipe.
^
TypeError: Cannot read property 'resolve' of undefined
at Connection._receive (d:\Github\kafka-poc\node_modules\no-kafka\lib\connection.js:167:11)
at emitOne (events.js:77:13)
at Socket.emit (events.js:169:7)
at readableAddChunk (_stream_readable.js:146:16)
at Socket.Readable.push (_stream_readable.js:110:10)
at TCP.onread (net.js:523:20)
This issue is to collect ideas and details for further implementation.
Basically, what I propose is using two (configurable) characteristics which would control batching:
batchSize Number. Max size of the batch buffer. Once the current buffer hits this size, we send the batch to the broker. Default: 16384 or 16 KB. In environments where the amount of produce requests is unpredictable this value should be lowered.
batchLinger Number. Amount of time in ms to wait before sending the batch to the broker. If batchSize is not hit and the timeout expires, the batch is sent. Default: 0 meaning that such timeout is disabled.
Considerations:
highWaterMark option). As pointed out above, there are environments where one can't predict the amount of produce requests. But in such a case batchLinger can be set to 100.
batch object?
Suggested in #59
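To make the proposal concrete, here is a minimal sketch of how the two options could interact. `Batcher` and its `flush` callback are hypothetical and only illustrate the proposal; they are not an existing no-kafka API. As proposed, a batchLinger of 0 disables the timer, so in that mode a partially filled batch waits until it reaches batchSize or until send() is called explicitly.

```javascript
// Hypothetical batcher illustrating the proposed batchSize / batchLinger options.
function Batcher(options, flush) {
    this.batchSize = options.batchSize || 16384; // bytes
    this.batchLinger = options.batchLinger || 0; // ms, 0 disables the timer
    this.flush = flush;                          // called with an array of values
    this.buffer = [];
    this.bytes = 0;
    this.timer = null;
}

Batcher.prototype.add = function (value) {
    var self = this;
    self.buffer.push(value);
    self.bytes += Buffer.byteLength(value);
    if (self.bytes >= self.batchSize) {
        self.send(); // size limit reached: send right away
    } else if (self.batchLinger > 0 && self.timer === null) {
        // Start the linger timer when the first message of a batch arrives.
        self.timer = setTimeout(function () { self.send(); }, self.batchLinger);
    }
};

Batcher.prototype.send = function () {
    if (this.timer !== null) { clearTimeout(this.timer); this.timer = null; }
    if (this.buffer.length === 0) { return; }
    var batch = this.buffer;
    this.buffer = [];
    this.bytes = 0;
    this.flush(batch);
};

var sent = [];
var batcher = new Batcher({ batchSize: 10 }, function (batch) { sent.push(batch); });
batcher.add('12345'); // 5 bytes buffered, below batchSize
batcher.add('67890'); // 10 bytes buffered, batch is flushed
console.log(sent);
```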