kafka-avro's People

Contributors

941design, bfncs, dapetcu21, dependabot[bot], eparreno, javierholguera, johnbenz13, macabu, oleksandrkrupko, pharoz, pleszczy, ricardohbin, scottchapman, scottwd9, thanpolas

kafka-avro's Issues

Unhandled rejection Error: connect ECONNREFUSED 127.0.0.1:80

Hi,

I am trying to run a simple demo.

var KafkaAvro = require('kafka-avro');
var kafkaAvro = new KafkaAvro({
    kafkaBroker: 'localhost:9092',
    schemaRegistry: 'http://localhost:8081'
});

// Query the Schema Registry for all topic-schema's
// fetch them and evaluate them.
kafkaAvro.init()
    .then(function() {
        console.log('Ready to use');
    });

I get this error at
/node_modules/kafka-avro/lib/schema-registry.js:30

this.schemaRegistryUrl = new URL(opts.schemaRegistryUrl + (opts.schemaRegistryUrl.endsWith('/') ? '' : '/'));

TypeError: URL is not a constructor

I changed require('url').URL to require('url').Url and that error went away.

But now I get this kind of error

[2018-09-14T17:43:26.591Z]  INFO: KafkaAvro/48426 on MBP.lan: init() :: Initializing KafkaAvro... (module=/kafka-avro.js)
[2018-09-14T17:43:26.596Z]  INFO: KafkaAvro/48426 on MBP.lan: init() :: Initializing SR, will fetch all schemas from SR... (module=/schema-registry.js)
[2018-09-14T17:43:26.596Z] DEBUG: KafkaAvro/48426 on MBP.lan: _fetchSchemas() :: Schemas refreshed (module=/schema-registry.js)
[2018-09-14T17:43:26.597Z] DEBUG: KafkaAvro/48426 on MBP.lan: _fetchAllSchemaTopics() :: Fetching all schemas using url: [object Object] (module=/schema-registry.js)

Unhandled rejection Error: connect ECONNREFUSED 127.0.0.1:80
    at Object.exports._errnoException (util.js:1022:11)
    at exports._exceptionWithHostPort (util.js:1045:20)
    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1087:14)

I am running the Confluent Docker images:

broker            /etc/confluent/docker/run        Up      0.0.0.0:29092->29092/tcp, 0.0.0.0:9092->9092/tcp
connect           /etc/confluent/docker/run        Up      0.0.0.0:8083->8083/tcp, 9092/tcp                
control-center    /etc/confluent/docker/run        Up      0.0.0.0:9021->9021/tcp                          
ksql-cli          /bin/sh                          Up                                                      
ksql-datagen      bash -c echo Waiting for K ...   Up                                                      
ksql-server       /etc/confluent/docker/run        Up      0.0.0.0:8088->8088/tcp                          
rest-proxy        /etc/confluent/docker/run        Up      0.0.0.0:8082->8082/tcp                          
schema-registry   /etc/confluent/docker/run        Up      0.0.0.0:8081->8081/tcp                          
zookeeper         /etc/confluent/docker/run        Up      0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp

SOLVED

Updating to Node.js 8.12.0 solved the problem.

are stream callbacks synchronous?

We have implemented kafka-avro in a platform that handles over a million events a day. During e2e tests we found that even though our callback functions perform async operations, processing of the next event does not wait for the previous callback to finish, and we get wrong calculations.
Are callbacks parallelized? Any help appreciated.
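If the 'data' callback is fired once per message without awaiting your handler (which the behavior described above suggests), ordering has to be enforced in the handler itself. A minimal sketch, not part of the kafka-avro API: consumer is assumed to come from kafkaAvro.getConsumer(), and handleMessage is your own async function.

let queue = Promise.resolve();

consumer.on('data', (message) => {
  // Chain each message onto the previous one's promise so the async
  // handlers run strictly one at a time, in arrival order.
  queue = queue
    .then(() => handleMessage(message))
    .catch((err) => console.error('handler failed:', err));
});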

Unhandled rejection Error: Local: Broker transport failure

Hi,

After initializing kafka-avro and getting the message ready, I get the following error:
Unhandled rejection Error: Local: Broker transport failure
at Function.createLibrdkafkaError [as create] (/Users/xxxx/mydevelopment/Kafka/node_modules/kafka-avro/node_modules/node-rdkafka/lib/error.js:254:10)
at /Users/xxxx/mydevelopment/Kafka/node_modules/kafka-avro/node_modules/node-rdkafka/lib/client.js:201:30
at /Users/xxxx/mydevelopment/Kafka/node_modules/kafka-avro/node_modules/node-rdkafka/lib/client.js:342:9
Can you kindly help?

Many thanks,
D Tanna

Authenticate with Confluent Schema Registry

I went over the documentation multiple times but couldn't find how to authenticate to the Confluent Schema Registry. Where do I put the registry api_key and api_secret in the configuration?
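The docs don't seem to mention a dedicated option for this. One thing worth trying - purely an assumption, relying on the underlying HTTP client honoring basic-auth credentials embedded in the URL, which is not confirmed by the kafka-avro docs - is to put the key/secret into the schemaRegistry URL itself:

// Sketch only: assumes userinfo in the URL is honored by the registry client.
// SR_API_KEY / SR_API_SECRET are placeholder environment variables.
const KafkaAvro = require('kafka-avro');

const API_KEY = process.env.SR_API_KEY;
const API_SECRET = process.env.SR_API_SECRET;

const kafkaAvro = new KafkaAvro({
  kafkaBroker: 'localhost:9092',
  schemaRegistry: 'https://' + encodeURIComponent(API_KEY) + ':' +
    encodeURIComponent(API_SECRET) + '@my-registry.example.com',
});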

Testing hangs on CI

Testing hangs on both CIs we've tried, CircleCI and Travis.

Both CIs hang without any error thrown, and the ultimate 10-minute timeout stops the build. As you can observe on CircleCI builds #20, #21, #22, #23, #24, the tests freeze at different points in the testing flow, typically after a few Produce-Consume tests. The same behavior is observed on the Travis CI builds.

I have pushed a build with node-rdkafka debug set to true; it is available here: https://circleci.com/gh/waldophotos/kafka-avro/29

On each test all the Consumers and Producers that were instantiated are being disconnected.

I'm pretty much stuck right now, not sure which direction to take. @webmakersteve if you have any ideas, suggestions I'd greatly appreciate them.

I'll keep this issue updated as I progress.

checkForAllVersions is lost in code refactoring

Previously, .then(this._checkForAllVersions); was used at the end of schema registration in the init logic.

In v1.0.0:

SchemaRegistry.prototype.init = Promise.method(function () {
  log.info('init() :: Initializing SR, will fetch all schemas from SR...');

  return this._fetchTopics()
    .bind(this)
    .then(this._storeTopics)
    .map(this._fetchLatestVersion, { concurrency: 10 })
    .filter(Boolean)
    .map(this._fetchSchema, { concurrency: 10 })
    .map(this._registerSchemaLatest)
    .then(this._checkForAllVersions);
});

In the latest version:

SchemaRegistry.prototype._fetchSchemas = function () {
  log.debug('_fetchSchemas() :: Schemas refreshed');
  return this._fetchTopics()
    .bind(this)
    .then(this._storeTopics)
    .map(this._fetchLatestVersion, { concurrency: 10 })
    .filter(Boolean)
    .map(this._fetchSchema, { concurrency: 10 })
    .map(this._registerSchemaLatest);
};

.then(this._checkForAllVersions); was lost, ultimately making that functionality dead code.

It looks like there's something else going on too, as just re-adding this line does not make deserialization work correctly.

Edit: in my case I also had to increase the concurrency to 50, which solved the problem. Please add the _checkForAllVersions logic back in the final code.
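A minimal sketch of the requested fix, based purely on the two versions quoted above, with the concurrency bumped to 50 as mentioned in the edit:

SchemaRegistry.prototype._fetchSchemas = function () {
  log.debug('_fetchSchemas() :: Schemas refreshed');
  return this._fetchTopics()
    .bind(this)
    .then(this._storeTopics)
    .map(this._fetchLatestVersion, { concurrency: 50 })
    .filter(Boolean)
    .map(this._fetchSchema, { concurrency: 50 })
    .map(this._registerSchemaLatest)
    // Re-attach the step that was lost in the refactoring.
    .then(this._checkForAllVersions);
};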

App doesn't exit when `fetchRefreshRate` option is used.

Steps:

var kafkaAvro = new KafkaAvro({
  kafkaBroker: brokers || 'localhost:29092',
  schemaRegistry: schemaRegistry || 'http://localhost:8081',
  fetchRefreshRate: 60
});

create a consumer and use it ...
Disconnect:

consumer.disconnect(() => {
  console.log("back from disconnect()");
});

The app doesn't exit.
When fetchRefreshRate is not used, the app exits after the consumer disconnects.
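A plausible explanation, not verified against the library source, is that fetchRefreshRate starts a repeating timer that keeps the Node event loop alive. In plain Node terms:

// Generic Node.js illustration, not kafka-avro code. refreshSchemas() is a
// hypothetical periodic job standing in for the schema refresh.
const timer = setInterval(refreshSchemas, 60 * 1000);

// A timer like this keeps the process alive until it is cleared...
// clearInterval(timer);
// ...or unref'd, so it can never be the only thing keeping the loop running:
timer.unref();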

parsed value not correctly serialized

data: {
value: <Buffer 00 00 00 00 07 02 48 35 44 35 37 45 42 33 41 2d 32 42 33 38 2d 34 34 46 39 2d 39 45 44 38 2d 46 30 35 42 32 38 30 44 34 43 33 41 00 00 00 00 b2 cf 5e ... 662 more bytes>,
size: 712,
key: <Buffer 53 74 72 75 63 74 7b 4f 62 6a 65 63 74 49 64 3d 35 44 35 37 45 42 33 41 2d 32 42 33 38 2d 34 34 46 39 2d 39 45 44 38 2d 46 30 35 42 32 38 30 44 34 43 ... 3 more bytes>,
topic: 'btdl.dbo.Transactions',
offset: 150937,
partition: 0,
timestamp: 1621764554821,
parsedKey: 'Struct{ObjectId=5D57EB3A-2B38-44F9-9ED8-F05B280D4C3A}',
parsed: '\x00\x00\x00\x00\x07\x02H5D57EB3A-2B38-44F9-9ED8-F05B280D4C3A\x00\x00\x00\x00��^A\x00\x00������Ё-\x00\x00\x00\x00\x00\x00\x00\x00\x00H�z\x14���@\x00\x00\x00\x00��^AHEAC8C12E-A090-4AB5-AF3B-F7F9976DD6A3\x00\x00\x00\x00\x00\x00\x00��\t\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00������Ё-\x02HF192521A-C1A4-40D1-9F81-0A9226A6EE6D\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02���Ԡ���-\x00\x02\x00\x00\x00\x00\x00��@\x00\x02��k\x00\x00\x02H5D57EB3A-2B38-44F9-9ED8-F05B280D4C3A\x00\x00\x00\x00��^A\x00\x00������Ё-\x00\x00\x00\x00\x00\x00\x00\x00\x00H�z\x14���@\x00\x00\x00\x00��^AHEAC8C12E-A090-4AB5-AF3B-F7F9976DD6A3\x02\x12BCA - BCA\x02@Hubungi Customer Service [BCA--]\x00\x02\x144270059789\x00\x00\x00��\t\x02\x1EG-63757351928-1\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x02\x0EA878974������Ё-\x02HF192521A-C1A4-40D1-9F81-0A9226A6EE6D\x00\x00\x02\x00\x02\x1C114.124.174.42\x02\x10mekibau3\x00\x00\x00\x00\x02���Ԡ���-\x02\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00��@\x00\x02��k\x00\x02\x18YOGA WARDAYA\x161.2.2.Final\x12sqlserver\bbtdl��զ�^\x00\n' +
'false\x1ECashMarket-BTDL\x06dbo\x18Transactions\x02,00001fdd:0000a818:004f\x02,00001fdd:0000a818:00ac\x02\x04\x02u\x02�����^\x00'
}

Unable to get relative time offset for "now minus X days"

For a number of services in my organization we are required to replay state from the last few days to get "clean".

We have messages that arrive very frequently (in the order of minutes) and events that are only to be seen daily, if at all. In order to calculate the state of the world we may need to start "yesterday at midnight" or "at the beginning of the calendar month" or "about 30 days ago".

In the Java APIs getOffsetsForTimes exists (read more), which gives a relative offset given a timestamp; our colleagues in the org who use Java make extensive use of this.

A similar API sort of exists on rdkafka itself (read more).

node-kafka provides kafka.Offsets(client) (read more)

None of these options in Node.js land are anywhere near as elegant as those in the Java world, and unfortunately I'm neither a Java programmer nor a Kafka expert; it's entirely possible that I'm just clueless about how to access this information via the kafkaAvro instance.

Any pointers are appreciated, else I can try and write this up on StackOverflow and hope for the best over there.
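For what it's worth, node-rdkafka consumers expose offsetsForTimes() in newer releases; whether it is reachable through the consumer kafka-avro hands back depends on which node-rdkafka version is pinned. A hedged sketch, with consumer assumed to come from kafkaAvro.getConsumer():

// Sketch only: assumes the underlying node-rdkafka consumer is recent enough
// to expose offsetsForTimes(). On input, `offset` carries the timestamp (ms).
const thirtyDaysAgo = Date.now() - 30 * 24 * 60 * 60 * 1000;

consumer.offsetsForTimes(
  [{ topic: 'my.topic', partition: 0, offset: thirtyDaysAgo }],
  5000, // timeout in ms
  (err, resolvedOffsets) => {
    if (err) throw err;
    // resolvedOffsets holds the earliest offset at or after the timestamp
    // for each partition; assign them to start "about 30 days ago".
    consumer.assign(resolvedOffsets);
  }
);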

Are producer streams supported?

In node-rdkafka, you can use ProducerStreams, similar to ConsumerStreams. I only see kafkaAvro.getConsumerStream, kafkaAvro.getConsumer and kafkaAvro.getProducer. Is there a way to get or create a ProducerStream?
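kafka-avro itself doesn't appear to expose one. A workaround - at the cost of doing the Avro encoding and Confluent magic-byte framing yourself - is to drop down to node-rdkafka's own write stream. A sketch, assuming a recent node-rdkafka and placeholder broker/topic names:

const Kafka = require('node-rdkafka');

// Writable stream that produces every chunk to the given topic. Note it takes
// raw Buffers, so any Avro/schema-id framing would be your responsibility.
const stream = Kafka.Producer.createWriteStream(
  { 'metadata.broker.list': 'localhost:9092' },
  {},
  { topic: 'my.topic' }
);

stream.write(Buffer.from('already-encoded message bytes'));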

ld: symbol(s) not found for architecture x86_64 when running npm install

Hello everyone,

I'm trying to install this module into my project but I get the following error

ld: symbol(s) not found for architecture x86_64
clang: error: linker command failed with exit code 1 (use -v to see invocation)
make[2]: *** [librdkafka.1.dylib] Error 1
make[1]: *** [libs] Error 2
make: *** [88679b727fa5c22c41a05490f93df34949b97955.intermediate] Error 2
rm 88679b727fa5c22c41a05490f93df34949b97955.intermediate

I'm on macOS High Sierra 10.13.6 (17G65).
My Node version is v8.10.0.
My npm version is 6.4.1.

What other information can I provide to you?
How can I help you to solve this?

Please let me know. Thank you

Avro validation on a version of schema

Hey,

I was unable to find a way to validate a message against a specific version of a schema that is not the latest one. Is there currently a way to do so with the Confluent Schema Registry, using either the version id or the schema id?

Edit:

Something like using a resolver for 2 different versions of a schema as asked here

kafkaAvro.init() causes ECONNREFUSED 127.0.0.1:80

When I follow the quickstart example in the repo README, I get the following error and stacktrace:

> kafkaAvro.init().then(function() { console.log("Ready to use"); });
Promise {
  _bitField: 2097152,
  _fulfillmentHandler0: undefined,
  _rejectionHandler0: undefined,
  _promise0: undefined,
  _receiver0: undefined,
  _boundTo:
   noop {
     schemaRegistryUrl: 'localhost:8081',
     valueSchemas: {},
     keySchemas: {},
     schemaMeta: {},
     schemaTypeById: {} } }
> Unhandled rejection Error: connect ECONNREFUSED 127.0.0.1:80
    at Object._errnoException (util.js:1041:11)
    at _exceptionWithHostPort (util.js:1064:20)
    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1153:14)
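One detail visible in the dump above: schemaRegistryUrl is 'localhost:8081' with no scheme, which is consistent with the client falling back to port 80. A config sketch with the scheme spelled out:

const KafkaAvro = require('kafka-avro');

const kafkaAvro = new KafkaAvro({
  kafkaBroker: 'localhost:9092',
  schemaRegistry: 'http://localhost:8081', // explicit scheme and port
});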

Check topic existence on Producer `_produceWrapper()`

Hi,

I am trying out a fairly simple example. I have the following topic and avro key and value schemas created:

test.topic

{
  subject: "test.topic-key",
  version: 1,
  id: 2,
  schema: "\"string\""
}

{
  subject: "test.topic-value",
  version: 1,
  id: 1,
  schema: "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"test.avro\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}"
}

When I attempt to produce a subsequent message with kafka-avro I get the following error:

Ready to use
A problem occurred when sending our message
Error: invalid "string": undefined
    at throwInvalidError (/mnt/d/dev/lambda-kafka/src/test2/node_modules/avsc/lib/types.js:2688:9)
    at StringType._write (/mnt/d/dev/lambda-kafka/src/test2/node_modules/avsc/lib/types.js:743:5)
    at RecordType.writeUser [as _write] (eval at RecordType._createWriter (/mnt/d/dev/lambda-kafka/src/test2/node_modules/avsc/lib/types.js:2005:10), :4:6)
    at RecordType.Type.encode (/mnt/d/dev/lambda-kafka/src/test2/node_modules/avsc/lib/types.js:294:8)
    at Object.magicByte.toMessageBuffer (/mnt/d/dev/lambda-kafka/src/test2/node_modules/kafka-avro/lib/magic-byte.js:29:18)
    at Ctor.Producer.serialize (/mnt/d/dev/lambda-kafka/src/test2/node_modules/kafka-avro/lib/kafka-producer.js:110:28)
    at Ctor.Producer._produceWrapper (/mnt/d/dev/lambda-kafka/src/test2/node_modules/kafka-avro/lib/kafka-producer.js:94:23)
    at kafkaAvro.getProducer.then.producer (/mnt/d/dev/lambda-kafka/src/test2/handler.js:28:30)
    at tryCatcher (/mnt/d/dev/lambda-kafka/src/test2/node_modules/bluebird/js/release/util.js:16:23)
    at Promise._settlePromiseFromHandler (/mnt/d/dev/lambda-kafka/src/test2/node_modules/bluebird/js/release/promise.js:512:31)
    at Promise._settlePromise (/mnt/d/dev/lambda-kafka/src/test2/node_modules/bluebird/js/release/promise.js:569:18)
    at Promise._settlePromise0 (/mnt/d/dev/lambda-kafka/src/test2/node_modules/bluebird/js/release/promise.js:614:10)
    at Promise._settlePromises (/mnt/d/dev/lambda-kafka/src/test2/node_modules/bluebird/js/release/promise.js:693:18)
    at Async._drainQueue (/mnt/d/dev/lambda-kafka/src/test2/node_modules/bluebird/js/release/async.js:133:16)
    at Async._drainQueues (/mnt/d/dev/lambda-kafka/src/test2/node_modules/bluebird/js/release/async.js:143:10)
    at Immediate.Async.drainQueues (/mnt/d/dev/lambda-kafka/src/test2/node_modules/bluebird/js/release/async.js:17:14)

Here is the code:

const KafkaAvro = require('kafka-avro');

let kafkaAvro = new KafkaAvro({
    kafkaBroker: 'localhost:9092',
    schemaRegistry: 'http://localhost:8081',
});

kafkaAvro.init()
    .then(function() {
        console.log('Ready to use');

        kafkaAvro.getProducer({
                'debug': 'all'
            })
            .then(producer => {
                let topicName = 'test.topic';

                let topic = producer.Topic(topicName, {
                    'request.required.acks': 1
                });

                let value = new Buffer('{ "name": "Beethooven"}');
                let key = '"key"';

                let partition = -1;

                try {
                    producer.produce(topic, partition, value, key);
                } catch (e) {
                    console.error('A problem occurred when sending our message');
                    console.error(e);
                }
            })
    });

Thanks
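For comparison, the README example quoted further down this page (under "Where is validation actually done?") passes the value as a plain object and lets kafka-avro do the Avro encoding, rather than handing it a pre-encoded Buffer. A sketch of the produce call in that style, with producer assumed to come from kafkaAvro.getProducer():

const topicName = 'test.topic';
const partition = -1; // let librdkafka pick the partition

const value = { name: 'Beethooven' }; // plain object matching test.topic-value
const key = 'key';                    // plain string matching test.topic-key

producer.produce(topicName, partition, value, key);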

avsc and the readiness of kafka-avro

Hi,

I am using node-rdkafka with avsc for Avro, but that doesn't seem to work. I was curious to know the status and readiness of kafka-avro; can you kindly let me know?

Many thanks,
D Tanna

SchemaRegistry.prototype.init() loads all the latest schemas into memory

Problem
Currently, when KafkaAvro.prototype.init() is called it loads all the latest schemas in the Schema Registry into memory. This might be OK for smaller clusters, but it doesn't scale once you get into the hundreds or thousands of schemas in the Registry, for several reasons:

  1. The most common use case for any application is to write to a few topics, not the majority of them.
  2. The Schema Registry service will be slammed on startup of a Node process, due to the REST API.
  3. (Really 2a) Doing thousands of network requests for all the subjects in the Registry will slow down a normally very quick-starting Node process.

For these reasons I'd like to suggest a change: the KafkaAvro constructor takes an optional array of topics via the config, and only those are preloaded from the Schema Registry. If no array is passed, it defaults to the current load-all-the-schemas initialization.

I'm working on a PR to implement what I suggested, but wanted to raise the issue now to see if there are any thoughts on this approach.

Proposed Solution Example

let kafkaAvro = new KafkaAvro({
    kafkaBrokers: ....,
    schemaRegistry: ....,
    topics: ['myTopicV1', 'myTopicV2']
});

// kafkaAvro.init() then kafkaAvro.getProducer()/.getConsumer(); ....

// There would also be an async method to allow loading topics from the SR after instantiation
kafkaAvro.loadTopics(['myTopicV3', ....]).then(() => {
    // do something with the new topics
});

Example consumer doesn't work

Hi,
I have a topic that is populated, called heartbeat. It contains this example data:

[
  {
    "topic": "heartbeat",
    "key": "aaaa",
    "value": "{\"deviceId\":\"aaaa\",\"timestamp\":\"2020-06-01T09:36:08\"}",
    "partition": 0,
    "offset": 0
  },
...
]

But when I run your example and just change the topic name, it never consumes anything and only prints Ready to use. Even when I push live data onto the topic, nothing shows up. What could be going wrong?

const KafkaAvro = require('kafka-avro');

const kafkaAvro = new KafkaAvro({
	kafkaBroker: 'localhost:9092',
	schemaRegistry: 'http://localhost:8081',
});

// Query the Schema Registry for all topic-schema's
// fetch them and evaluate them.
kafkaAvro.init()
	.then(function () {
		console.log('Ready to use');
	}).then(() => {
		kafkaAvro.getConsumer({
			'group.id': 'librd-test3',
			'socket.keepalive.enable': true,
			'enable.auto.commit': false,
			'auto.offset.reset': 'earliest'
		})
			// the "getConsumer()" method will return a bluebird promise.
			.then(function (consumer) {
				// Perform a consumer.connect()
				return new Promise(function (resolve, reject) {
					consumer.on('ready', function () {
						resolve(consumer);
					});

					consumer.connect({}, function (err) {
						if (err) {
							reject(err);
							return;
						}
						resolve(consumer); // depend on Promises' single resolve contract.
					});
				});
			})
			.then(function (consumer) {
				// Subscribe and consume.
				const topicName = 'heartbeat';
				consumer.subscribe([topicName]);
				consumer.consume();
				consumer.on('data', function (rawData) {
					console.log('data:', rawData);
				});
			});
	})

Investigate "Maximum call stack size exceeded" on Magic Byte

This appeared out of the blue, with a connected consumer but no actual topics defined to be consumed.

22:06:03.519Z ERROR waldo-core-api: GQL Error message -> Maximum call stack size exceeded
22:06:03.519Z ERROR waldo-core-api:
  GQL Error message -> RangeError: Maximum call stack size exceeded
      at Buffer.Uint8Array (native)
      at FastBuffer (buffer.js:11:5)
      at Buffer.slice (buffer.js:811:10)
      at allocate (buffer.js:173:23)
      at Function.Buffer.allocUnsafe (buffer.js:141:10)
      at new Buffer (buffer.js:78:19)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:23:13)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)
      at Object.magicByte.toMessageBuffer (/app/node_modules/kafka-avro/lib/magic-byte.js:30:22)

Why support producing non-Avro encoded messages?

First, thanks for putting this project together and fighting for transform interceptors in node-rdkafka. It is unfortunate that they see that as a scope-expanding feature and instead would rather have people fully wrap their extremely instrumented APIs.

Now to my question... I'm trying to understand the design decision behind allowing producing to topics that weren't registered in the schema registry (here). I really want to use this project as the underlying Node.js Kafka client library for my own business-logic-wrapping client library. Unfortunately, Avro schemas are required for every topic in our setup, and I feel a bit odd creating yet another producer wrapper just to undo that section of logic.

Producing messages doesn't encode magic byte and schema id

Using [email protected], I have a topic called test. I'm using the RecordNameStrategy and have registered a subject named test in the schema registry.

Producing my message:

class test { }

const producer = await kafkaAvro.getProducer()
const data = new test()
data.long_field = 10
producer.produce('test', -1, data, 'key');

This produces the message, but without the encoded schema.
The problem seems to stem from the fact that kafka-avro is opinionated about the subject name (and I didn't see it mentioned in the docs). It seems to expect all subjects to end with either -value or -key:

const parts = schemaTopic.split('-');

If it doesn't match that, it falls back and puts it in the keys schemas and doesn't put it in the values schemas:

if (schemaObj.schemaType.toLowerCase() === 'value') {

And then when it tries to produce the message it won't find the schema, as it's not in the values schemas dictionary, and it fails to encode it.
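If that reading is right, one workaround is to register the schema under a TopicNameStrategy-style subject, i.e. test-value, so kafka-avro files it under the value schemas. A sketch against the Schema Registry REST API; the record schema below is illustrative, based on the long_field used above, and the registry URL is a placeholder:

const axios = require('axios');

const schema = {
  type: 'record',
  name: 'test',
  fields: [{ name: 'long_field', type: 'long' }],
};

// Register the same schema under the "<topic>-value" subject.
axios.post(
  'http://localhost:8081/subjects/test-value/versions',
  { schema: JSON.stringify(schema) },
  { headers: { 'Content-Type': 'application/vnd.schemaregistry.v1+json' } }
).then((res) => console.log('registered schema id:', res.data.id));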

Schema validation on publish?

I am not clear on how this is done. I poked through the code, and it looks like you get the schema for the topic, but I don't see exactly how the validation is done.

Thanks in advance.
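From the stack traces elsewhere on this page, the validation effectively happens inside avsc at encode time: when the producer serializes the value, avsc throws if it doesn't match the schema. A standalone avsc illustration, assuming a recent avsc:

const avro = require('avsc');

const type = avro.Type.forSchema({
  type: 'record',
  name: 'User',
  fields: [{ name: 'name', type: 'string' }],
});

console.log(type.isValid({ name: 'John' })); // true
console.log(type.isValid({ name: 42 }));     // false

// Encoding an invalid value throws, e.g. 'invalid "string": 42'.
type.toBuffer({ name: 42 });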

Allow topics and subjects to be different

I just want to discuss this before creating a PR. You're assuming that topics match subjects, which is the most common approach but not always the case. In our app, for instance, we have a topic per city to which we send the same kind of messages; some people do this with partitions, but we chose this approach for our own reasons.

In your implementation it's assumed that topic and subject are the same. Do you think it would be worth implementing an interface like:

producer.produce(topic, partition, subject, value, key);

socket hangup error

So, I am getting this when I connect:

{ Error: socket hang up
    at createHangUpError (_http_client.js:313:15)
    at Socket.socketOnEnd (_http_client.js:409:23)
    at Socket.emit (events.js:187:15)
    at endReadableNT (_stream_readable.js:1086:12)
    at process._tickCallback (internal/process/next_tick.js:63:19)
  code: 'ECONNRESET',
  config: 
   { adapter: [Function: httpAdapter],
     transformRequest: { '0': [Function: transformRequest] },
     transformResponse: { '0': [Function: transformResponse] },
     timeout: 0,
     xsrfCookieName: 'XSRF-TOKEN',
     xsrfHeaderName: 'X-XSRF-TOKEN',
     maxContentLength: -1,
     validateStatus: [Function: validateStatus],
     headers: 
      { Accept: 'application/json, text/plain, */*',
        'User-Agent': 'axios/0.15.3' },
     method: 'get',
     url: 'http://10.3.0.42:18082/subjects',
     data: undefined },
  response: undefined }

My code looks like:

var KafkaAvro = require('kafka-avro');
var options = {
	  'group.id': 'kafka',
	  'ssl.ca.location': "./secrets/ca.crt",
	  'ssl.certificate.location': "./secrets/cert.pem",
	  'ssl.key.location': "./secrets/cert-key.pem",
	  'ssl.key.password': "changeit",
	  'security.protocol': "ssl",
	  'metadata.broker.list': "kafka-0.kafka.default:9093"
};

var topicName = 'actionable-event-topic';

var kafkaAvro = new KafkaAvro({
	kafkaBroker: 'kafka-0.kafka.default:9093',
	schemaRegistry: 'http://10.3.0.42:18082'
});

kafkaAvro.init().then(function() {
	console.log("Kafka Avro Ready to go!");
}).catch(err => {
	console.dir(err);
});

Not sure if this matters, but I am using SSL on the brokers

Kafka-avro refactoring

The current code is outdated and works in its own particular way: it behaves differently from other Avro implementations (like the Python and Java ones), as has already been discussed in several issues.

Should we write code following Confluent's CachedSchemaRegistryClient strategy as-is and remove all these fetch strategies, or keep the current fetch-based implementation?

Either way, the code style and the examples can be modernized, with async/await etc.

Our integration tests also need a lot of improvement, and we need some unit tests to make features safer.

But to do all this we would need a complete rewrite of the current code, almost a "new project".

What's the best strategy for a 2.0.0 release? GitHub now supports WIP pull requests - maybe that can help us here.

What do you think @thanpolas?

Missing timestamps on produced messages

Hi, and first off thanks for making this library.

Today I had a weird situation when I restarted my brokers and my producers to do an upgrade. After the cluster came back, I found that many of the messages being produced had -1 as their timestamp (using kafkacat). I am setting the timestamp specifically in the code:

producer.produce(topic, -1, value, key, eventTsMs);

I know that eventTsMs value is correct at this point, because the same variable also gets serialized inside the message value, and it is a normal timestamp in there.

Even long after the restart, one of the producers was creating null (-1) timestamped messages. Then I restarted it, and the timestamps went back to normal.

What do you think could have caused this? Should I be looking more at the broker or the producer? Is it likely to be in node-rdkafka? librdkafka?

Missing param in function magicByte.fromMessageBuffer

I found a missing param: the "@param {avsc.Type} type" documented on magicByte.fromMessageBuffer does not appear in its signature, even though kafka-consumer.js passes the type in.
Is that right?

In magic-byte.js

/**
 * Decode a confluent SR message with magic byte.
 *
 * @param {avsc.Type} type The topic's Avro decoder.
 * @param {Buffer} encodedMessage The incoming message.
 * @param {kafka-avro.SchemaRegistry} sr The local SR instance.
 * @return {Object} Object with:
 *   @param {number} schemaId The schema id.
 *   @param {Object} value The decoded avro value.
 */
magicByte.fromMessageBuffer = function (encodedMessage, sr) 

In kafka-consumer.js

/**
 * Deserialize an avro message.
 *
 * @param {avsc.Type} type Avro type instance.
 * @param {Object} message The raw message.
 * @param {boolean} isKey Whether the data is the key or value of the schema.
 * @return {Object} The deserialized object.
 */
Consumer.prototype.deserialize = function (type, message, isKey) {
  try {
    const deserializeType = isKey === true
      ? message.key
      : message.value;

    return magicByte.fromMessageBuffer(
      type,
      deserializeType,
      this.sr
    );
  } catch (err) {
    log.warn(`deserialize() :: Error deserializing on topic ${message.topic}`,
      'Raw value:', message.value, `Partition: ${message.partition} Offset:`,
      `${message.offset} Key: ${message.key} Exception:`, err);
    return null;
  }
};

Please beta test 0.8.0

Need beta testers for the new 0.8.0-beta.1 version which has been published on npm with the tag beta.

Changelog

  • v0.8.0-beta.1, 09 Nov 2017
    • Provides option to fetch all past versions of a topic (thank you CMTegner).
    • Provides option to select which topics should be fetched.

New features

New options when instantiating kafka-avro:

  • topics Array of Strings - You may optionally define specific topics to be fetched by kafka-avro, vs. fetching schemas for all topics, which is the default behavior.
  • fetchAllVersions Boolean - Set to true to fetch all versions for each topic; use it when schemas are updated often in your environment.
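A minimal sketch of instantiating kafka-avro with the two new options (option names taken from the changelog above; broker/registry/topic values are placeholders):

const KafkaAvro = require('kafka-avro');

const kafkaAvro = new KafkaAvro({
  kafkaBroker: 'localhost:9092',
  schemaRegistry: 'http://localhost:8081',
  topics: ['my.topic.one', 'my.topic.two'], // fetch schemas only for these topics
  fetchAllVersions: true,                   // fetch every version, not just the latest
});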

Test Cases

  1. Run kafka-avro as you normally would, the default behavior is to fetch the latest schema for all topics.
  2. Enable the fetchAllVersions option and produce on an older version from an external producer, see if the message gets properly parsed.
  3. Define specific topics to be fetched using the topics option, check that only specific topics are fetched.
  4. Use fetchAllVersions in combination with topics and check expected results (legacy producer + limited topics).

I am sorry I cannot run those tests myself; as mentioned in #3, testing is dodgy with the current tooling, and I have long since left the company that required this library, so my local setup isn't up to speed.

Once this version checks out I will proceed with publishing proper.

The 0.8.0-beta.1 version is on the schema-fetch-updates branch.

ping @giannisp @codeburke @CMTegner

Update circleCI to 2.0

CircleCI 1.0 will be deprecated on 08/31. The integration tests must be fixed before that date.

node-rdkafka version

Hi,

I see you use version 0.7.0-ALPHA.3 of node-rdkafka, whereas the current one is 2.0.0.

Do you plan to update kafka-avro to work with this new version of node-rdkafka, or at least with v1?

Key not being serialized

Is there any reason the key is not being serialized? The Python kafka-avro module serializes both key and value, whereas this Node module serializes only the value.

Error while JSON.parse when we have no topic schema

When our requested topic is not in this.sr.valueSchemas we can't decode message.value, and JSON.parse throws an error.
This is in the lib/kafka-consumer.js file:

if (!this.sr.valueSchemas[message.topic]) {
      log.warn('_onWrapper() :: Warning, consumer did not find topic on SR:',
        message.topic);

      message.parsed = JSON.parse(message.value.toString('utf-8'));

      cb(message);
      return;
    }

I don't think this is critical enough to stop the process, so I suggest the following code:

if (!this.sr.valueSchemas[message.topic]) {
      log.warn('_onWrapper() :: Warning, consumer did not find topic on SR:',
        message.topic);

      try {
        message.parsed = JSON.parse(message.value.toString('utf-8'));
      } catch (error) {
        console.warn('Warning, consumer did not find topic on SR: ', message.topic);
      }

      return;
    }

Custom parameter on fields?

Hi,

I am working on a new project that will use kafka with avro schemas. The lib you have developed here looks great! This is not an issue report - just a query/idea.

Now, in addition to using schemas I am considering encrypting some of the data (i.e. some fields) before it goes into Kafka. That is of course easy to do in both the producer and consumer of certain topics, but my instinct says: "Hmm, if we could define in the fields array of the schema definition which fields should be encrypted/decrypted, this could be 'automatic'."

I'm new to a lot of the details here, but I skimmed http://avro.apache.org/docs/current/spec.html and did not see any obvious way to add additional/optional parameters to a field.

Ideally, since we're contemplating per-record-id encryption, one would be able to say something like:
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "secretmessage", "type": "string", "encryptionKeyFieldName": "id"},
    {"name": "favorite_number", "type": ["int", "null"]},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

I.e. the data in "secretmessage" is encrypted, and the value of "encryptionKeyFieldName" is "id", so effectively we have a symmetric encryption key somewhere in the system mapping user.id to a key. The name "encryptionKeyFieldName" is not good; it's just to show/explain the idea.

Has anyone thought about this before? Any ways of adding such details to the schema without breaking the avro standard? All input is appreciated.

Invalid string undefined error when producing a message

Hi,

I've updated this issue (#24) with more info. It's not because the topic is not getting created; it's because of an error in the magic-byte.js code. I was hoping someone familiar with it would know right off the bat what could be going on.

undefined:1 error upon consuming first avro message

If the topic being consumed was recently created, the first message will always cause the application to crash. This seems to happen because the schema registry only has the first version of the schema after a message is sent to the topic (therefore, if the Node server is up before there is anything on the topic, it will not be able to load anything, not even schema changes that come afterwards).
The error itself occurs when trying to JSON.parse an Avro message (kafka-consumer.js:109), since the updated schema was not found.

Is there any way to work around this issue? From what I understood, the schemas are only loaded in one place (init), and there is no way to update them after the initial load is done. Or am I missing something?
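Possibly relevant: another issue on this page mentions a fetchRefreshRate option that appears to re-fetch schemas periodically, which might pick up schemas registered after init(). A sketch, with the exact semantics of the option assumed rather than confirmed:

const KafkaAvro = require('kafka-avro');

const kafkaAvro = new KafkaAvro({
  kafkaBroker: 'localhost:9092',
  schemaRegistry: 'http://localhost:8081',
  fetchRefreshRate: 60, // assumed: periodic schema re-fetch instead of init()-only
});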

Allow for configurable record name for produced messages

Hi,

We have our subject value naming strategy set to TopicRecordNameStrategy, but our record names are namespaced, so they are formatted with periods like com.etc.etc.RecordName. In the subject naming strategy module, it looks like the record name is taken from the constructor name, but class names can't have periods in them. Would it be possible to open that up so a record name can be specified when composing the subject name?

schema-registry.schemaMeta is unpredictable

Hi,

When the schema registry loads a schema it adds it to this.schemaMeta, keyed by the topic name schemaObj.topic. Unfortunately the content of schemaMeta is unpredictable if some topics also have a schema for their key.
Because two schemas are loaded for such a topic - one for the value and one for the key - each of them is added to this.schemaMeta under the same topic name, so only the last one wins.

As this.schemaMeta doesn't seem to be used by kafka-avro itself, I suggest only adding the value schema to this.schemaMeta; this is what the comment on this.schemaMeta suggests.

librdkafka.so.1 not found

I was trying to use kafka-avro for the first time but just requiring the package fails with this error message:

Code:
const KafkaAvro = require('kafka-avro');

Error:
Error: librdkafka.so.1: cannot open shared object file: No such file or directory
at Object.Module._extensions..node (module.js:664:18)
at Module.load (module.js:554:32)
at tryModuleLoad (module.js:497:12)
at Function.Module._load (module.js:489:3)
at Module.require (module.js:579:17)
at require (internal/module.js:11:18)

Passing no key fails

From what I understand, you should be able to NOT pass a key to Kafka and still have it create a message. Currently, if I don't pass anything (or pass an empty string), the produce fails.

In the code we pass null to the node-rdkafka produce method, and that is not allowed (see https://blizzard.github.io/node-rdkafka/current/Producer.html#produce). I propose that we always call _serializeType for the key (at

var sendKey = key
  ? this._serializeType(topicName, true, key)
  : null;

) so even if the key is not defined we produce an empty buffer.

Where is validation actually done?

Hi all,
I read in the docs that kafka-avro serialises and validates the data. So from your example I don't see anything stopping any invalid message from being sent:

kafkaAvro.getProducer({
  // Options listed below
})
    // "getProducer()" returns a Bluebird Promise.
    .then(function(producer) {
        var topicName = 'test';

        producer.on('disconnected', function(arg) {
          console.log('producer disconnected. ' + JSON.stringify(arg));
        });

        var value = {name:'John'};
        var key = 'key';

        // if partition is set to -1, librdkafka will use the default partitioner
        var partition = -1;
        producer.produce(topicName, partition, value, key);
    })

I registered Avro schemas with the Schema Registry, but whatever I send as a value gets sent to the topic. Is there a use case where the .produce method will actually fail or return an error because the data doesn't match the schema? Or do I have a basic misunderstanding of this?
Thanks in advance!
