Description:
The Avro Map extension is often referenced in conjunction with Siddhi's extension for Kafka. Unfortunately, none of the available examples for using the two together appear to work!
The culprit appears to be that Confluent's Avro support in Kafka interacts with its Schema Registry to assign a schema ID to each unique schema it sees, and prefixes every message it stores in Kafka with a 5-byte header (a magic byte followed by the 4-byte schema ID) just before the Avro-serialized body. Those five bytes are not stripped away before this mapper is invoked, and the extra 5-byte header throws the binary decoding off kilter.
The same incorrect decoding can be reproduced outside Siddhi: taking a message from an Avro-enabled topic and decoding it with the 5-byte header retained yields the identical wrong result, and removing those five out-of-band header bytes cures the problem.
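For reference, that 5-byte frame can be sketched in a few lines of Python (the helper name here is mine for illustration, not part of any Confluent library):

```python
import struct

def split_confluent_frame(raw: bytes):
    """Split a Confluent-framed Kafka value into (schema_id, avro_payload).

    The wire format is one magic byte (0x00), a 4-byte big-endian schema ID,
    and then the plain Avro-serialized body.
    """
    if len(raw) < 5 or raw[0] != 0x00:
        raise ValueError("not a Confluent-framed message")
    schema_id = struct.unpack(">I", raw[1:5])[0]
    return schema_id, raw[5:]

# A framed value carrying schema ID 42 and a one-byte Avro body:
schema_id, body = split_confluent_frame(b"\x00\x00\x00\x00\x2a\x04")
# schema_id == 42, body == b"\x04"
```

An Avro decoder handed `raw` instead of `raw[5:]` starts reading mid-frame, which is exactly the failure reported below.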
Affected Product Version:
2.0.6
OS, DB, other environment details and versions:
MacOS Mojave (10.14.4) running with Kafka 2.3.1's Docker Images on Docker Desktop Community 2.1.0.4
Steps to reproduce:
- Save this Avro schema to sample.avsc:
{"namespace":"sample.avro","type":"record","name":"avro","fields":[{"name":"name","type":"string"},{"name":"amount","type":"double"}]}
- Save an example document in JSON to sample.json:
{"name":"exnobhbyaitkfiayoyipianxshgkofnmpkef","amount":0.9629494268310558}
- Launch Docker containers with Kafka exposed on port 9092 and the Schema Registry on 8081. The following docker-compose file works for that purpose:
version: '3.6'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.1
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
      - ZOOKEEPER_SERVER_ID=1
      - ZOOKEEPER_TICK_TIME=2000
      - ZOOKEEPER_SYNC_LIMIT=2
  kafka:
    image: confluentinc/cp-kafka:5.3.1
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_KAFKA_HOST_NAME=kafka
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      - KAFKA_BROKER_ID=1
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:19092,LISTENER_HOST://0.0.0.0:9092
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:19092,LISTENER_HOST://localhost:9092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,LISTENER_HOST:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
  schema-registry:
    image: confluentinc/cp-schema-registry:5.3.1
    ports:
      - 8081:8081
    environment:
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=PLAINTEXT://kafka:19092
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
      - SCHEMA_REGISTRY_DEBUG=false
    links:
      - zookeeper
      - kafka
- Load our message into a topic:
cat sample.json | kafka-avro-console-producer --topic "kafka_result_topic" --broker-list localhost:9092 --property schema.registry.url=http://localhost:8081 --property value.schema="$(cat sample.avsc)"
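What kafka-avro-console-producer writes for each record is exactly the framed value described above: it registers the schema, receives an ID back, and prefixes the Avro body with it. A hedged sketch of that framing step (schema ID 1 is illustrative; the registry assigns the real one):

```python
import struct

def confluent_frame(schema_id: int, avro_body: bytes) -> bytes:
    # Magic byte 0x00, 4-byte big-endian schema ID, then the Avro body.
    return b"\x00" + struct.pack(">I", schema_id) + avro_body

framed = confluent_frame(1, b"avro-body")
# framed is always exactly 5 bytes longer than the Avro body itself
```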
- Create and run the following Siddhi app:
@App:name('KafkaSample')
@sink(type='log')
define stream logStream(name string, amount double);
@source(type='kafka', bootstrap.servers='localhost:9092', topic.list='kafka_result_topic', group.id='test', threading.option='single.threaded', is.binary.message='true', @map(type='avro', schema.def = '{"namespace":"sample.avro","type":"record","name":"avro","fields":[{"name":"name","type":"string"},{"name":"amount","type":"double"}]}'))
define stream LowProductionAlertStream(name string, amount double);
from LowProductionAlertStream
select *
insert into logStream;
- Observe the incorrect decoding in Siddhi's output log:
[java] [io.siddhi.core.stream.output.sink.LogSink] : KafkaSample : logStream : Event{timestamp=1573734234725, data=[, 1.410940917531979E224], isExpired=false}
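The empty name in that event is itself a clue: Avro strings are prefixed with a zigzag-encoded varint length, and the frame's leading magic byte 0x00 decodes as length 0, so the reader takes it for an empty string and then misinterprets the schema-ID bytes as part of the double. A simplified re-implementation of that first decode step (illustrative only, not Siddhi's actual code):

```python
def zigzag_varint_decode(data: bytes):
    """Decode one Avro zigzag-encoded varint; return (value, bytes_consumed)."""
    acc, shift = 0, 0
    for i, b in enumerate(data):
        acc |= (b & 0x7F) << shift
        if not b & 0x80:
            # Undo the zigzag mapping: 0->0, 1->-1, 2->1, ...
            return (acc >> 1) ^ -(acc & 1), i + 1
        shift += 7
    raise ValueError("truncated varint")

# Feeding it the start of a Confluent frame: the magic byte reads as length 0.
length, used = zigzag_varint_decode(b"\x00\x00\x00\x00\x2a")
# length == 0, used == 1 -> an "empty" name, with the 4 schema-ID bytes
# still ahead of the reader, which then corrupts the double that follows.
```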
- Extract the raw message from Kafka:
kafka-console-consumer --topic kafka_result_topic --offset 0 --partition 0 --max-messages 1 --bootstrap-server localhost:9092 > onemsg.dat
- Confirm that retaining Confluent's 5-byte Schema Registry header yields the same incorrect result:
avro-tools fragtojson --schema-file sample.avsc onemsg.dat
Observe that the name is similarly absent and the amount is still 1.410940917531979E224, not the expected 0.9629494268310558.
- Remove the five-byte header and try again:
old_sz=$(wc -c < onemsg.dat)
new_sz=$((old_sz - 5))
tail -c "$new_sz" onemsg.dat > trunc.dat
avro-tools fragtojson --schema-file sample.avsc trunc.dat
Observe that this time the 5-byte-shorter message deserializes to our original input!