
siddhi-map-avro's Introduction

Siddhi Map Avro


The siddhi-map-avro extension is an extension to Siddhi that converts Avro messages to/from Siddhi events.

For information on Siddhi and its features, refer to the Siddhi Documentation.

Download

  • Versions 2.x and above with group id io.siddhi.extension.* from here.
  • Versions 1.x and lower with group id org.wso2.extension.siddhi.* from here.

Latest API Docs

The latest API docs version is 2.2.4.

Features

  • avro (Sink Mapper)

    This extension is a Siddhi event to Avro message output mapper. Transports that publish messages to an Avro sink can use this extension to convert Siddhi events to Avro messages.
    You can either specify the Avro schema or provide the schema registry URL and the schema reference ID as parameters in the stream definition.
    If no Avro schema is specified, a flat Avro schema of the 'record' type is generated with the stream attributes as schema fields (a usage sketch for both mappers follows this list).

  • avro (Source Mapper)

    This extension is an Avro message to Siddhi event input mapper. Transports that accept Avro messages can use this extension to convert incoming Avro messages to Siddhi events.
    The Avro schema to be used for creating Avro messages can be specified as a parameter in the stream definition.
    If no Avro schema is specified, a flat Avro schema of the 'record' type is generated with the stream attributes as schema fields.
    The generated or specified Avro schema is then used to convert Avro messages to Siddhi events (see the sketch below).
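
The following is a minimal, illustrative sketch (not from the project docs) of wiring both mappers into a Siddhi app from Java; the stream names, in-memory topics, and schema are hypothetical:

import io.siddhi.core.SiddhiAppRuntime;
import io.siddhi.core.SiddhiManager;

public class AvroMapperExample {
    public static void main(String[] args) {
        // Source mapper: incoming Avro messages on the 'stocks' topic are
        // converted to Siddhi events using the schema given in schema.def.
        // Sink mapper: no schema.def is given, so a flat 'record' schema is
        // generated from the OutStream attributes.
        String app =
            "@source(type='inMemory', topic='stocks', @map(type='avro', schema.def='" +
            "{\"type\":\"record\",\"name\":\"stock\",\"namespace\":\"stock.example\",\"fields\":[" +
            "{\"name\":\"symbol\",\"type\":\"string\"},{\"name\":\"price\",\"type\":\"double\"}]}')) " +
            "define stream StockStream (symbol string, price double); " +
            "@sink(type='inMemory', topic='out', @map(type='avro')) " +
            "define stream OutStream (symbol string, price double); " +
            "from StockStream select * insert into OutStream;";

        SiddhiManager siddhiManager = new SiddhiManager();
        SiddhiAppRuntime runtime = siddhiManager.createSiddhiAppRuntime(app);
        runtime.start();
        // ... publish Avro-encoded payloads to the 'stocks' in-memory topic ...
        runtime.shutdown();
        siddhiManager.shutdown();
    }
}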

Dependencies

There are no other dependencies needed for this extension.

Installation

To install this extension in various Siddhi execution environments, refer to the Siddhi documentation section on adding extensions.
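
For Maven users, a dependency sketch; the coordinates below are assumed from the io.siddhi.extension.* group id pattern and the 2.2.4 version noted above, so verify them against the download page:

<!-- Assumed coordinates; confirm against the published artifacts -->
<dependency>
    <groupId>io.siddhi.extension.map.avro</groupId>
    <artifactId>siddhi-map-avro</artifactId>
    <version>2.2.4</version>
</dependency>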

Support and Contribution

  • We encourage users to ask questions and get support via StackOverflow; make sure to add the siddhi tag to the question for a better response.

  • If you find any issues related to the extension, please report them on the issue tracker.

  • For production support and other contribution-related information, refer to the Siddhi Community documentation.


siddhi-map-avro's Issues

Schema definition

Description:

Switching from the design view to the source view after changing any element converts the triple quotes in the Avro schema definition into single quotes.

Affected Product Version:
v1.0.67

Avro schema lookup by name

Description:
I ran into a problem in my project. I recently implemented a schema registry to store all my auto-generated schemas, and I also use WSO2 to read from Kafka. However, I can only search by schema ID; a search-by-name option is missing.

Suggested Labels:
There is an option in the schema registry API to search by name:

curl -X GET http://localhost:8081/subjects/<SUBJECT_NAME>/versions/latest

https://docs.confluent.io/platform/current/schema-registry/develop/using.html


Creation of Siddhi app with AvroMapper throws ClassNotFoundException: feign.FeignException

Description:

The following stack trace is printed at Siddhi app creation time:

     [echo] CustomMapping will be overridden if eventDefinition is given.
     [echo] If eventDefinition is given, its mandatory to give a file path to the events as well.
     [echo] Following mentioned values will be applied if no value is given for optional values.
     [echo] Configure
     [echo]             -bootstrapServerList=localhost:9092(Optional)
     [echo]             -DtopicName=kafka_sample_topic(Optional)
     [echo]             -DpartitionNo=(Optional)
     [echo]             -DsequenceId=(Optional)
     [echo]             -Dkey=(Optional)
     [echo]             -DoptionalConfiguration=(Optional)
     [echo]             -DisBinaryMessage=false(Optional)
     [echo]             -Dtype=json(Optional) -Ddelay=1000(Optional) -DcustomMapping=false/true(Optional)
     [echo]             -DeventDefinition='{"customItem":{"address":"{0}","age":{1},"country":"{2}"}}'(Optional)
     [echo]             -DfilePath=(Optional)
     [echo]             -DcontinuouslyReadFile=false(optional [if noOfEventsToSend larger than events in file, make it true])
     [echo]             -DnoOfEventsToSend=5(Optional)
     [echo]         
     [java] [io.siddhi.distribution.sample.kafka.client.KafkaClient] : Initialize kafka producer client.
     [java] Exception in thread "main" io.siddhi.core.exception.SiddhiAppCreationException: Error on 'KafkaSink' @ Line: 19. Position: 18, near '@sink(type='kafka',
     [java] topic='kafka_avro_test',
     [java] bootstrap.servers='localhost:9092',
     [java] group.id='group',
     [java] is.binary.message='true',
     [java] @map(type='avro',schema.def="""
     [java]             {
     [java] 	            "type": "record",
     [java] 	            "name": "sweet",
     [java] 	            "namespace": "sweet.material",
     [java] 	            "fields": [{
     [java] 		            "name": "name",
     [java] 		            "type": "string"
     [java] 	            }, {
     [java] 		            "name": "amount",
     [java] 		            "type": "double"
     [java] 		        }]
     [java]             }"""))'. feign/FeignException
     [java] 	at io.siddhi.core.util.ExceptionUtil.populateQueryContext(ExceptionUtil.java:43)
     [java] 	at io.siddhi.core.util.parser.helper.DefinitionParserHelper.addEventSink(DefinitionParserHelper.java:588)
     [java] 	at io.siddhi.core.util.SiddhiAppRuntimeBuilder.defineStream(SiddhiAppRuntimeBuilder.java:119)
     [java] 	at io.siddhi.core.util.parser.SiddhiAppParser.defineStreamDefinitions(SiddhiAppParser.java:374)
     [java] 	at io.siddhi.core.util.parser.SiddhiAppParser.parse(SiddhiAppParser.java:230)
     [java] 	at io.siddhi.core.SiddhiManager.createSiddhiAppRuntime(SiddhiManager.java:85)
     [java] 	at io.siddhi.core.SiddhiManager.createSiddhiAppRuntime(SiddhiManager.java:95)
     [java] 	at io.siddhi.distribution.sample.kafka.client.KafkaClient.main(Unknown Source)
     [java] Caused by: java.lang.NoClassDefFoundError: feign/FeignException
     [java] 	at java.lang.Class.getDeclaredConstructors0(Native Method)
     [java] 	at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
     [java] 	at java.lang.Class.getConstructor0(Class.java:3075)
     [java] 	at java.lang.Class.newInstance(Class.java:412)
     [java] 	at io.siddhi.core.util.SiddhiClassLoader.loadClass(SiddhiClassLoader.java:32)
     [java] 	at io.siddhi.core.util.SiddhiClassLoader.loadExtensionImplementation(SiddhiClassLoader.java:48)
     [java] 	at io.siddhi.core.util.parser.helper.DefinitionParserHelper.addEventSink(DefinitionParserHelper.java:501)
     [java] 	... 6 more
     [java] Caused by: java.lang.ClassNotFoundException: feign.FeignException
     [java] 	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
     [java] 	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
     [java] 	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
     [java] 	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
     [java] 	... 13 more
     [java] Java Result: 1


Suggested Labels:
dependency issue

Affected Product Version:
2.0.2

Avro Map not compatible with Kafka I/O due to retention of schema registry header

Description:
The Avro Map extension is often referenced in conjunction with Siddhi's extension for Kafka. Unfortunately, none of the available examples for using these together seem to work!
The culprit appears to be that Confluent's support for Avro in Kafka utilizes an interaction with its Schema Registry to assign a schema ID to each unique schema it finds, and prefixes every message it stores in Kafka with that schema ID in a 5-byte header placed just before the Avro-serialized payload. Those five bytes are not stripped away before this mapper is invoked, and the extra 5-byte header throws the binary decoding off kilter.

It is possible to reproduce the same incorrect decoding by taking a message from an Avro-enabled topic and decoding it with the 5-byte header retained, and to cure the problem by removing those five out-of-band header bytes.
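
For context, Confluent's wire format places a magic byte (0) at byte 0 and the 4-byte big-endian schema ID at bytes 1-4, with the Avro-serialized payload following. Below is a minimal Java sketch of the workaround described above (a hypothetical helper, not part of this extension):

import java.io.IOException;
import java.util.Arrays;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

public class ConfluentHeaderStripper {
    // Hypothetical helper: drops the 5-byte Confluent header (magic byte +
    // 4-byte schema ID) and binary-decodes the remaining Avro payload.
    public static GenericRecord decode(byte[] kafkaValue, Schema schema) throws IOException {
        byte[] avroPayload = Arrays.copyOfRange(kafkaValue, 5, kafkaValue.length);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(avroPayload, null);
        return new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
    }
}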

Affected Product Version:
2.0.6

OS, DB, other environment details and versions:
MacOS Mojave (10.14.4) running with Kafka 2.3.1's Docker Images on Docker Desktop Community 2.1.0.4

Steps to reproduce:

  1. Save this Avro schema to sample.avsc:
{"namespace":"sample.avro","type":"record","name":"avro","fields":[{"name":"name","type":"string"},{"name":"amount","type":"double"}]}
  2. Save an example document in JSON to sample.json:
{"name":"exnobhbyaitkfiayoyipianxshgkofnmpkef","amount":0.9629494268310558}
  3. Launch Docker containers with Kafka exposed on port 9092 and the Schema Registry on 8081. The following docker-compose file works for that purpose:
version: '3.6'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.1
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
    environment:
     - ZOOKEEPER_CLIENT_PORT=2181
     - ZOOKEEPER_SERVER_ID=1
     - ZOOKEEPER_TICK_TIME=2000
     - ZOOKEEPER_SYNC_LIMIT=2
  kafka:
    image: confluentinc/cp-kafka:5.3.1
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
     - KAFKA_KAFKA_HOST_NAME=kafka
     - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
     - KAFKA_BROKER_ID=1
     - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:19092,LISTENER_HOST://0.0.0.0:9092
     - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:19092,LISTENER_HOST://localhost:9092
     - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,LISTENER_HOST:PLAINTEXT
     - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
  schema-registry:
    image: confluentinc/cp-schema-registry:5.3.1
    ports:
     - 8081:8081
    environment:
     - SCHEMA_REGISTRY_HOST_NAME=schema-registry
     - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=PLAINTEXT://kafka:19092
     - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
     - SCHEMA_REGISTRY_DEBUG=false
    links:
     - zookeeper
     - kafka
  4. Load our message to a topic:
cat sample.json | kafka-avro-console-producer --topic "kafka_result_topic" --broker-list localhost:9092 --property schema.registry.url=http://localhost:8081  --property value.schema=`cat sample.avsc`
  5. Create and run the following Siddhi app:
@App:name('KafkaSample')
@sink(type='log')
define stream logStream(name string, amount double);

@source(type='kafka', bootstrap.servers='localhost:9092', topic.list='kafka_result_topic', group.id='test', threading.option='single.threaded', is.binary.message='true', @map(type='avro', schema.def='{"namespace":"sample.avro","type":"record","name":"avro","fields":[{"name":"name","type":"string"},{"name":"amount","type":"double"}]}'))
define stream LowProductionAlertStream(name string, amount double);

from LowProductionAlertStream
select *
insert into logStream;
  6. Observe the incorrect decoding in Siddhi's output log:
     [java] [io.siddhi.core.stream.output.sink.LogSink] : KafkaSample : logStream : Event{timestamp=1573734234725, data=[, 1.410940917531979E224], isExpired=false}
  7. Extract the raw message from Kafka:
 kafka-console-consumer --topic kafka_result_topic --offset 0 --partition 0 --max-messages 1 --bootstrap-server localhost:9092 > onemsg.dat
  8. Confirm that retaining Confluent's 5-byte Schema Registry header yields the same incorrect result:
    avro-tools fragtojson --schema-file sample.avsc onemsg.dat
    Observe that the name is similarly absent and the amount is still 1.410940917531979E224, not the expected 0.9629494268310558
  9. Remove the five-byte header and try again:
old_sz=$(wc -c < onemsg.dat)
new_sz=$((old_sz - 5))
tail -c "$new_sz" onemsg.dat > trunc.dat
avro-tools fragtojson --schema-file sample.avsc trunc.dat

Observe that this time the 5-byte-shorter message deserializes to our original input!
