Comments (12)

Ronniexie commented on August 20, 2024

Do I need to start the Schema Registry before running kafka-connect-mongodb?

tony-lijinwen commented on August 20, 2024

Hi Ronniexie,

Do you know how to use the jar? If so, please explain it as clearly as you can; I am new to Kafka.

Best regards,
Tony

patelliandrea commented on August 20, 2024

@Ronniexie yes, the Schema Registry has to be started beforehand. You need to start Zookeeper, Kafka, and then the Schema Registry.
When everything is running, the easiest way to start the connector is to copy the jar with dependencies into the confluent/share/java/confluent-common folder and then run a distributed Connect worker. Once the distributed worker is running, you can simply push a configuration to start the Mongo connector.
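
For example, with a Confluent 2.x installation the sequence might look like the sketch below (the script paths and the worker's default REST port 8083 are assumptions based on the stock distribution; the connector settings mirror the sink configuration shown later in this thread):

# start the stack, each command in its own terminal
./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
./bin/kafka-server-start ./etc/kafka/server.properties
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

# start a distributed Connect worker
./bin/connect-distributed ./etc/schema-registry/connect-avro-distributed.properties

# push the connector configuration over the Connect REST API
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "mongodb-sink-connector",
  "config": {
    "connector.class": "org.apache.kafka.connect.mongodb.MongodbSinkConnector",
    "tasks.max": "3",
    "host": "127.0.0.1",
    "port": "27017",
    "bulk.size": "100",
    "mongodb.database": "fafa",
    "mongodb.collections": "fafa123",
    "topics": "test2"
  }
}'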

Unfortunately, the connector only works with Confluent 2, since I haven't had time to update it for the new version. As soon as I have time, I'll update it.

tony-lijinwen commented on August 20, 2024

@patelliandrea, thanks for your reply, but when I follow the above steps I get the following errors:

org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:319)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:266)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
        at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
        at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Caused by: org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x10c7661(above 10ffff)  at char #1, byte #7)
Caused by: java.io.CharConversionException: Invalid UTF-32 character 0x10c7661(above 10ffff)  at char #1, byte #7)
        at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:189)
        at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:150)
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.loadMore(ReaderBasedJsonParser.java:153)
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:1854)
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:571)
        at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3604)
        at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3549)
        at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2161)
        at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:317)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:266)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
        at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
        at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Exception in thread "WorkerSinkTask-mongodb-sink-connector-0" org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
        (same stack trace as above)

My steps are as follows:

1. Use the kafka-avro-console-producer to add the schema and produce a record:

./bin/kafka-avro-console-producer \
             --broker-list localhost:9094 --topic test2 \
             --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

{"f1": "value1"}

2. The connect-file-sink.properties configuration is as follows:

name=mongodb-sink-connector
connector.class=org.apache.kafka.connect.mongodb.MongodbSinkConnector
tasks.max=3
host=127.0.0.1
port=27017
bulk.size=100
mongodb.database=fafa
mongodb.collections=fafa123
topics=test2

3. The connect-standalone.properties configuration is as follows:

bootstrap.servers=localhost:9094

# If I set the following values to true, it always reports "JsonDeserializer with schemas.enable
# requires \"schema\" and \"payload\" fields and may not contain additional fields", even when the
# message in the topic is:
# '{"schema": {"type": "struct", "optional": false, "fields": [{"type": "string", "optional": false, "field": "company"}]}, "payload": {"company": "debezium"}}'
# so I set them to false:
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

4. Then I use the following command to test the connector:

./bin/connect-standalone  ./etc/kafka/connect-standalone.properties   ./etc/kafka/connect-file-sink.properties

Do I need to do any more configuration?

tony-lijinwen commented on August 20, 2024

I resolved it. The problem was that I was using the wrong worker properties; I should use the following command:

./bin/connect-standalone  ./etc/schema-registry/connect-avro-standalone.properties   ./etc/kafka/connect-file-sink.properties
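
(For context: the key difference is that Confluent's Avro worker file configures the AvroConverter and points it at the Schema Registry, matching the kafka-avro-console-producer used in step 1, whereas the plain JsonConverter cannot decode Avro-encoded bytes, which is what produced the CharConversionException above. A representative excerpt, assuming the stock Confluent file; check your own copy for the exact values:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
)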

But I have another question: can I use this with Kafka without Confluent?

patelliandrea commented on August 20, 2024

Yes, you can; you have to use

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

instead of the AvroConverter.
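
For example, with schemas.enable=false on the converters, a plain JSON record produced with the stock console producer should flow straight through to the sink (a minimal sketch reusing the broker address and topic from earlier in this thread):

./bin/kafka-console-producer --broker-list localhost:9094 --topic test2
{"f1": "value1"}

With schemas.enable=true, each message would instead have to carry the "schema"/"payload" envelope shown earlier.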

xuxu0728 commented on August 20, 2024

Hi patelliandrea, does it support Confluent 3 now?

xuxu0728 commented on August 20, 2024

Hi patelliandrea! When I run it using your and Tony's method, I get this error:
[2017-08-11 17:30:21,473] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:100)
java.lang.NullPointerException
at org.apache.kafka.connect.connector.Connector.validate(Connector.java:133)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:257)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:157)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:94)
Please help!
PS: My Confluent version is 3.3.0.

aiodsunil commented on August 20, 2024

I am getting the same error:
java.lang.NullPointerException
at org.apache.kafka.connect.connector.Connector.validate(Connector.java:133)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:257)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:157)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:94)

Can someone help with this, please? I am also using Confluent version 3.3.0.
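
(This NullPointerException is consistent with the caveat earlier in the thread that the connector targets Confluent 2. Newer Kafka Connect releases validate a connector before starting it, along the lines of the following, paraphrased from the Connect framework rather than from this connector:

ConfigDef configDef = connector.config(); // a connector built for the old API can return null here
configDef.validate(connectorConfigs);     // NullPointerException when configDef is null

so a connector that does not implement config() fails with exactly this trace. This is one reading of the stack trace, not a confirmed fix.)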

r14152 commented on August 20, 2024

Whenever I run the MongoDB source connector, it shows this error:
[2017-11-16 06:11:54,968] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser:109)
[2017-11-16 06:11:54,968] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser:110)
[2017-11-16 06:11:54,975] ERROR WorkerSourceTask{id=mongodb-source-connector-0} Task failed initialization and will not be started. (org.apache.kafka.connect.runtime.WorkerSourceTask:126)
java.lang.ClassCastException: Non-string value found in original settings for key converter.class: null
at org.apache.kafka.common.config.AbstractConfig.originalsStrings(AbstractConfig.java:165)
at org.apache.kafka.connect.runtime.WorkerSourceTask.initialize(WorkerSourceTask.java:124)
at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:405)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.createConnectorTasks(StandaloneHerder.java:278)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.updateConnectorTasks(StandaloneHerder.java:302)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:183)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:102)
[2017-11-16 06:11:54,979] INFO [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:341)

AnthonyKoueik commented on August 20, 2024

Hello,

I have Kafka and MongoDB installed and running. Can anyone give me a small Java class example to send messages from MyTopic to Mongo? (I do not have Confluent, just plain Kafka.)

Thank you.
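
A minimal sketch of such a class, assuming the standard kafka-clients (2.x) and mongodb-driver-sync libraries; the broker address, group id, and database/collection names are placeholders, and the message values are assumed to be JSON strings:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;

import org.bson.Document;

public class TopicToMongo {
    public static void main(String[] args) {
        // plain Kafka consumer configuration, no Confluent components needed
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "topic-to-mongo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             MongoClient mongo = MongoClients.create("mongodb://127.0.0.1:27017")) {
            MongoCollection<Document> collection =
                    mongo.getDatabase("mydb").getCollection("mycollection");
            consumer.subscribe(Collections.singletonList("MyTopic"));
            while (true) {
                // poll Kafka and write each record into MongoDB
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // assumes the message value is a JSON string
                    collection.insertOne(Document.parse(record.value()));
                }
            }
        }
    }
}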

raror3 commented on August 20, 2024

Where do I see the logs for kafka-connect-mongodb? I put the jar in the lib directory in Kafka and restarted the Kafka instance, but new messages in Kafka are not getting sunk to Mongo.
Since I only want to sink data from Kafka into Mongo, I have only used the sink classes in the jar.
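
(Worth noting: dropping the jar into the broker's lib directory and restarting Kafka does not start the connector; connectors run inside a separate Connect worker process, not inside the broker. With plain Apache Kafka you would start a worker, for example:

./bin/connect-standalone.sh config/connect-standalone.properties mongodb-sink.properties

where mongodb-sink.properties is your sink configuration (the file name here is a placeholder). The connector's log output then appears on that worker's stdout, as configured by config/connect-log4j.properties.)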
