
kafka-connect-mongodb's Introduction

Kafka Connect Mongodb

The connector can be used to load data in both directions: from MongoDB to Kafka (source) and from Kafka to MongoDB (sink).

Building

You can build the connector with Maven using the standard lifecycle phases:

mvn clean
mvn package

Source Connector

When the connector is run as a Source Connector, it reads data from the MongoDB oplog and publishes it to Kafka. Three types of operations are read from the oplog:

  • Insert
  • Update
  • Delete

For every message, a SourceRecord is created, having the following schema:

{
  "type": "record",
  "name": "schemaname",
  "fields": [
    {
      "name": "timestamp",
      "type": [
        "null",
        "int"
      ]
    },
    {
      "name": "order",
      "type": [
        "null",
        "int"
      ]
    },
    {
      "name": "operation",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "database",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "object",
      "type": [
        "null",
        "string"
      ]
    }
  ],
  "connect.name": "stillmongotesting"
}
  • timestamp: timestamp in seconds when the event happened
  • order: order of the event between events with the same timestamp
  • operation: type of operation the message represents. i: insert, u: update, d: delete
  • database: database in which the operation took place
  • object: inserted/updated/deleted object
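
For illustration, a record with this shape could be declared with Kafka Connect's SchemaBuilder roughly as in the sketch below. This is only a sketch of the field layout described above, not the connector's actual code; the schema name and sample values are made up.

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class SourceRecordSchemaSketch {
    public static void main(String[] args) {
        // Schema matching the field list above; the name follows {schema.name}_{database}_{collection}.
        Schema valueSchema = SchemaBuilder.struct()
                .name("mongodbschema_mydb_test1")
                .field("timestamp", Schema.OPTIONAL_INT32_SCHEMA)
                .field("order", Schema.OPTIONAL_INT32_SCHEMA)
                .field("operation", Schema.OPTIONAL_STRING_SCHEMA)
                .field("database", Schema.OPTIONAL_STRING_SCHEMA)
                .field("object", Schema.OPTIONAL_STRING_SCHEMA)
                .build();

        // A sample value for an insert ("i") event.
        Struct value = new Struct(valueSchema)
                .put("timestamp", 1522613398)
                .put("order", 1)
                .put("operation", "i")
                .put("database", "mydb.test1")
                .put("object", "{ \"label\" : \"Label\", \"amount\" : 100 }");

        System.out.println(value);
    }
}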

Sample Configuration

name=mongodb-source-connector
connector.class=org.apache.kafka.connect.mongodb.MongodbSourceConnector
tasks.max=1
uri=mongodb://127.0.0.1:27017
batch.size=100
schema.name=mongodbschema
topic.prefix=optionalprefix
databases=mydb.test1,mydb.test2,mydb.test3
  • name: name of the connector
  • connector.class: class of the implementation of the connector
  • tasks.max: maximum number of tasks to create
  • uri: MongoDB URI (required if host is not provided)
  • host: MongoDB host (required if uri is not provided)
  • port: MongoDB port (required if uri is not provided)
  • batch.size: maximum number of messages to write on Kafka at every poll() call
  • schema.name: name to use for the schema; it will be formatted as {schema.name}_{database}_{collection}
  • topic.prefix: optional prefix to prepend to the topic names. The topic name is formatted as {topic.prefix}_{database}_{collection}
  • converter.class: converter class used to transform a MongoDB oplog entry into a Kafka message. We recommend using org.apache.kafka.connect.mongodb.converter.JsonStructConverter, but for backward compatibility the default is org.apache.kafka.connect.mongodb.converter.StringStructConverter.
  • databases: comma-separated list of database.collection pairs from which to import data
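
To try the source connector, one common approach (also visible in the issue reports further below) is to put the packaged jar on the Connect worker's classpath or plugin path and start a standalone worker with a properties file like the one above, for example connect-standalone connect-standalone.properties mongodb-source.properties; the exact binary and paths depend on your Kafka or Confluent installation.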

Sink Connector

When the connector is run as a Sink Connector, it retrieves messages from Kafka and writes them to MongoDB collections. The structure of the written document is derived from the schema of the messages.
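
As an illustration of what "derived from the schema" means, a sink task typically walks the record's Struct fields and copies them into a BSON Document before inserting. The sketch below is an assumption about the general approach, not the connector's actual implementation.

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.bson.Document;

public class StructToDocumentSketch {
    // Copy every top-level field of a Connect Struct into a MongoDB Document.
    public static Document toDocument(Struct struct) {
        Document doc = new Document();
        for (Field field : struct.schema().fields()) {
            doc.append(field.name(), struct.get(field));
        }
        return doc;
    }
}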

Sample Configuration

name=mongodb-sink-connector
connector.class=org.apache.kafka.connect.mongodb.MongodbSinkConnector
tasks.max=1
uri=mongodb://127.0.0.1:27017
bulk.size=100
mongodb.database=databasetest
mongodb.collections=mydb_test1,mydb_test2,mydb_test3
converter.class=org.apache.kafka.connect.mongodb.converter.JsonStructConverter
topics=optionalprefix_mydb_test1,optionalprefix_mydb_test2,optionalprefix_mydb_test3
  • name: name of the connector
  • connector.class: class of the implementation of the connector
  • tasks.max: maximum number of tasks to create
  • uri: MongoDB URI (required if host is not provided)
  • host: MongoDB host (required if uri is not provided)
  • port: MongoDB port (required if uri is not provided)
  • bulk.size: maximum number of documents to write to MongoDB at every put() call
  • mongodb.database: database to use
  • mongodb.collections: comma-separated list of collections to which the documents are written
  • topics: comma-separated list of topics whose messages are written to MongoDB

The number of collections and the number of topics should be the same.
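
The sample configuration above suggests a positional, one-to-one mapping between the two lists (an assumption, since the README does not state it explicitly): the i-th topic is written to the i-th collection. A minimal sketch of such a mapping:

import java.util.LinkedHashMap;
import java.util.Map;

public class TopicCollectionMapping {
    // Pair each topic with the collection at the same position in the two comma-separated lists.
    public static Map<String, String> map(String topics, String collections) {
        String[] t = topics.split(",");
        String[] c = collections.split(",");
        if (t.length != c.length) {
            throw new IllegalArgumentException("topics and mongodb.collections must have the same length");
        }
        Map<String, String> mapping = new LinkedHashMap<>();
        for (int i = 0; i < t.length; i++) {
            mapping.put(t[i].trim(), c[i].trim());
        }
        return mapping;
    }
}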

kafka-connect-mongodb's People

Contributors

dgradl-fl, patelliandrea, sailxjx, spinatelli


kafka-connect-mongodb's Issues

List of Custom Schema

I am trying to change the library in a way that it gets only the object, but not as a String; instead, I want the record as JSON. Do you know how to create a Schema for an Array where the elements are CustomSchema?

This will give me a Schema for Array of String
SchemaBuilder.array(Schema.STRING_SCHEMA).schema();

I want the same, but for a custom schema.
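
For what it's worth, the Connect SchemaBuilder API accepts any element schema in array(), so an array of a custom struct schema can be built as in the sketch below; the field names here are hypothetical, chosen only for illustration.

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class ArrayOfStructSketch {
    public static void main(String[] args) {
        // A hypothetical custom element schema.
        Schema elementSchema = SchemaBuilder.struct()
                .name("CustomSchema")
                .field("key", Schema.STRING_SCHEMA)
                .field("value", Schema.OPTIONAL_INT64_SCHEMA)
                .build();

        // An array whose elements use the custom schema instead of Schema.STRING_SCHEMA.
        Schema arraySchema = SchemaBuilder.array(elementSchema).build();

        System.out.println(arraySchema.valueSchema().name()); // prints: CustomSchema
    }
}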

Connector Task State remains Unassigned

I have integrated the connector with the confluent platform and it sets up fine but the task state remains at Unassigned. I have tried restarting, pausing and resuming the connector as well as integrating it from scratch again but the status remains as follows :

{
  "name": "mongo",
  "connector": {
    "state": "RUNNING",
    "worker_id": "192.168.1.120:8083"
  },
  "tasks": [
    {
      "state": "UNASSIGNED",
      "id": 0,
      "worker_id": "192.168.1.120:8083"
    }
  ]
}

Can you tell me what the issue is here?

No _id when updating by query

I am using MongoDB as a source. When I update documents by query (e.g. with an update({amount: 100}, {$push: {values: 10}}), with or without the multi option), I cannot find the _ids of the updated documents in the messages produced by the connector. It looks like they are stored in an o2 field in the oplog.rs collection, while the messages only use the content of the o field.

In order to reproduce:

  • create an empty collection coll in a test database;
  • create a document such as {"label": "Label", "amount": 100, "values": [1]};
  • update it with db.getCollection('coll').update({amount: 100}, {$push: {values: 10}});

Notice that the document has been updated but there is no mention of its _id in the message, which looks like the following:

{ "schema": ...}, "payload": { "timestamp": 1522613398, "order": 1, "operation": "u", "database": "test.coll", "object": "{ \"$set\" : { \"values.1\" : 10.0 } }" } }

Can't build jar (tests fail)

I'm having trouble producing a jar to use, as mvn package is failing.

I've modified the port configuration in MongodbSourceConnectorTest.java and mongodb/MongodbSourceTaskTest.java to connect to local mongo, and ensured there is a rs0 replication group present in local mongo. I've also ensured there is a mydb database present.

It seems to be failing on the following line
https://github.com/DataReply/kafka-connect-mongodb/blob/master/src/test/java/org/apache/kafka/connect/mongodb/MongodbSourceTaskTest.java#L112

↪ mvn package
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building connect-mongodb 1.0
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ connect-mongodb ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /Users/kyle.white/universe/experimental/kafka/kafka-connect-mongodb/src/main/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.3:compile (default-compile) @ connect-mongodb ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ connect-mongodb ---
[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!
[INFO] skip non existing resourceDirectory /Users/kyle.white/universe/experimental/kafka/kafka-connect-mongodb/src/test/resources
[INFO]
[INFO] --- maven-compiler-plugin:3.3:testCompile (default-testCompile) @ connect-mongodb ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- maven-surefire-plugin:2.12.4:test (default-test) @ connect-mongodb ---
[INFO] Surefire report directory: /Users/kyle.white/universe/experimental/kafka/kafka-connect-mongodb/target/surefire-reports

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running org.apache.kafka.connect.mongodb.MongodbSourceConnectorTest
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.077 sec
Running org.apache.kafka.connect.mongodb.MongodbSourceTaskTest
Aug 16, 2016 11:50:28 AM com.mongodb.diagnostics.logging.JULLogger log
INFO: Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
Aug 16, 2016 11:50:28 AM com.mongodb.diagnostics.logging.JULLogger log
INFO: Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
Aug 16, 2016 11:50:28 AM com.mongodb.diagnostics.logging.JULLogger log
INFO: Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
------------------------EXCEPTION-------------------------
java.lang.NullPointerException
        at org.apache.kafka.connect.mongodb.MongodbSourceTaskTest.testInsertWithNullOffsets(MongodbSourceTaskTest.java:112)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at junit.framework.TestCase.runTest(TestCase.java:176)
        at junit.framework.TestCase.runBare(TestCase.java:141)
        at junit.framework.TestResult$1.protect(TestResult.java:122)
        at junit.framework.TestResult.runProtected(TestResult.java:142)
        at junit.framework.TestResult.run(TestResult.java:125)
        at junit.framework.TestCase.run(TestCase.java:129)
        at junit.framework.TestSuite.runTest(TestSuite.java:252)
        at junit.framework.TestSuite.run(TestSuite.java:247)
        at org.junit.internal.runners.JUnit38ClassRunner.run(JUnit38ClassRunner.java:86)
        at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252)
        at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141)
        at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
        at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
        at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
        at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115)
        at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)
Aug 16, 2016 11:50:28 AM com.mongodb.diagnostics.logging.JULLogger log
INFO: No server chosen by ReadPreferenceServerSelector{readPreference=primary} from cluster description ClusterDescription{type=UNKNOWN, connectionMode=SINGLE, all=[ServerDescription{address=localhost:27017, type=UNKNOWN, state=CONNECTING}]}. Waiting for 30000 ms before timing out
Aug 16, 2016 11:50:28 AM com.mongodb.diagnostics.logging.JULLogger log
INFO: No server chosen by ReadPreferenceServerSelector{readPreference=primary} from cluster description ClusterDescription{type=UNKNOWN, connectionMode=SINGLE, all=[ServerDescription{address=localhost:27017, type=UNKNOWN, state=CONNECTING}]}. Waiting for 30000 ms before timing out
Aug 16, 2016 11:50:28 AM com.mongodb.diagnostics.logging.JULLogger log
INFO: No server chosen by ReadPreferenceServerSelector{readPreference=primary} from cluster description ClusterDescription{type=UNKNOWN, connectionMode=SINGLE, all=[ServerDescription{address=localhost:27017, type=UNKNOWN, state=CONNECTING}]}. Waiting for 30000 ms before timing out
Aug 16, 2016 11:50:28 AM com.mongodb.diagnostics.logging.JULLogger log
INFO: Opened connection [connectionId{localValue:3, serverValue:5}] to localhost:27017
Aug 16, 2016 11:50:28 AM com.mongodb.diagnostics.logging.JULLogger log
INFO: Opened connection [connectionId{localValue:1, serverValue:3}] to localhost:27017
Aug 16, 2016 11:50:28 AM com.mongodb.diagnostics.logging.JULLogger log
INFO: Opened connection [connectionId{localValue:2, serverValue:4}] to localhost:27017
Aug 16, 2016 11:50:28 AM com.mongodb.diagnostics.logging.JULLogger log
INFO: Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=REPLICA_SET_GHOST, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 2, 8]}, minWireVersion=0, maxWireVersion=4, maxDocumentSize=16777216, roundTripTimeNanos=959849, setName='null', canonicalAddress=null, hosts=[], passives=[], arbiters=[], primary='null', tagSet=TagSet{[]}, electionId=null, setVersion=null}
Aug 16, 2016 11:50:28 AM com.mongodb.diagnostics.logging.JULLogger log
INFO: Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=REPLICA_SET_GHOST, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 2, 8]}, minWireVersion=0, maxWireVersion=4, maxDocumentSize=16777216, roundTripTimeNanos=1055668, setName='null', canonicalAddress=null, hosts=[], passives=[], arbiters=[], primary='null', tagSet=TagSet{[]}, electionId=null, setVersion=null}
Aug 16, 2016 11:50:28 AM com.mongodb.diagnostics.logging.JULLogger log
INFO: Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=REPLICA_SET_GHOST, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 2, 8]}, minWireVersion=0, maxWireVersion=4, maxDocumentSize=16777216, roundTripTimeNanos=1423390, setName='null', canonicalAddress=null, hosts=[], passives=[], arbiters=[], primary='null', tagSet=TagSet{[]}, electionId=null, setVersion=null}
Aug 16, 2016 11:50:28 AM com.mongodb.diagnostics.logging.JULLogger log
INFO: Opened connection [connectionId{localValue:5, serverValue:6}] to localhost:27017
Aug 16, 2016 11:50:28 AM com.mongodb.diagnostics.logging.JULLogger log
INFO: Opened connection [connectionId{localValue:6, serverValue:8}] to localhost:27017
Aug 16, 2016 11:50:28 AM com.mongodb.diagnostics.logging.JULLogger log
INFO: Opened connection [connectionId{localValue:4, serverValue:7}] to localhost:27017
Aug 16, 2016 11:50:28 AM com.mongodb.diagnostics.logging.JULLogger log
INFO: Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
Aug 16, 2016 11:50:28 AM com.mongodb.diagnostics.logging.JULLogger log
INFO: Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
Aug 16, 2016 11:50:28 AM com.mongodb.diagnostics.logging.JULLogger log
INFO: No server chosen by ReadPreferenceServerSelector{readPreference=primary} from cluster description ClusterDescription{type=UNKNOWN, connectionMode=SINGLE, all=[ServerDescription{address=localhost:27017, type=UNKNOWN, state=CONNECTING}]}. Waiting for 30000 ms before timing out
Aug 16, 2016 11:50:28 AM com.mongodb.diagnostics.logging.JULLogger log
INFO: Cluster created with settings {hosts=[localhost:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500}
Aug 16, 2016 11:50:28 AM com.mongodb.diagnostics.logging.JULLogger log
INFO: No server chosen by ReadPreferenceServerSelector{readPreference=primary} from cluster description ClusterDescription{type=UNKNOWN, connectionMode=SINGLE, all=[ServerDescription{address=localhost:27017, type=UNKNOWN, state=CONNECTING}]}. Waiting for 30000 ms before timing out
------------------------EXCEPTION-------------------------
java.lang.NullPointerException
        at org.apache.kafka.connect.mongodb.MongodbSourceTaskTest.testInsertWithOffsets(MongodbSourceTaskTest.java:148)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at junit.framework.TestCase.runTest(TestCase.java:176)
        at junit.framework.TestCase.runBare(TestCase.java:141)
        at junit.framework.TestResult$1.protect(TestResult.java:122)
        at junit.framework.TestResult.runProtected(TestResult.java:142)
        at junit.framework.TestResult.run(TestResult.java:125)Aug 16, 2016 11:50:28 AM com.mongodb.diagnostics.logging.JULLogger log
INFO: No server chosen by ReadPreferenceServerSelector{readPreference=primary} from cluster description ClusterDescription{type=UNKNOWN, connectionMode=SINGLE, all=[ServerDescription{address=localhost:27017, type=UNKNOWN, state=CONNECTING}]}. Waiting for 30000 ms before timing out

        at junit.framework.TestCase.run(TestCase.java:129)
        at junit.framework.TestSuite.runTest(TestSuite.java:252)
        at junit.framework.TestSuite.run(TestSuite.java:247)
        at org.junit.internal.runners.JUnit38ClassRunner.run(JUnit38ClassRunner.java:86)
        at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252)
        at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141)
        at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
        at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
        at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
        at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115)
Aug 16, 2016 11:50:28 AM com.mongodb.diagnostics.logging.JULLogger log
INFO: Opened connection [connectionId{localValue:7, serverValue:10}] to localhost:27017
        at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)
Tests run: 2, Failures: 2, Errors: 0, Skipped: 0, Time elapsed: 1.776 sec <<< FAILURE!
testInsertWithNullOffsets(org.apache.kafka.connect.mongodb.MongodbSourceTaskTest)  Time elapsed: 1.497 sec  <<< FAILURE!
java.lang.AssertionError
        at org.junit.Assert.fail(Assert.java:86)
        at org.junit.Assert.assertTrue(Assert.java:41)
        at org.junit.Assert.assertTrue(Assert.java:52)
        at org.apache.kafka.connect.mongodb.MongodbSourceTaskTest.testInsertWithNullOffsets(MongodbSourceTaskTest.java:136)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at junit.framework.TestCase.runTest(TestCase.java:176)
        at junit.framework.TestCase.runBare(TestCase.java:141)
        at junit.framework.TestResult$1.protect(TestResult.java:122)
        at junit.framework.TestResult.runProtected(TestResult.java:142)
        at junit.framework.TestResult.run(TestResult.java:125)
        at junit.framework.TestCase.run(TestCase.java:129)
        at junit.framework.TestSuite.runTest(TestSuite.java:252)
        at junit.framework.TestSuite.run(TestSuite.java:247)
        at org.junit.internal.runners.JUnit38ClassRunner.run(JUnit38ClassRunner.java:86)
        at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252)
        at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141)
        at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
        at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
        at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
        at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115)
        at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)

testInsertWithOffsets(org.apache.kafka.connect.mongodb.MongodbSourceTaskTest)  Time elapsed: 0.277 sec  <<< FAILURE!
java.lang.AssertionError
        at org.junit.Assert.fail(Assert.java:86)
        at org.junit.Assert.assertTrue(Assert.java:41)
        at org.junit.Assert.assertTrue(Assert.java:52)
        at org.apache.kafka.connect.mongodb.MongodbSourceTaskTest.testInsertWithOffsets(MongodbSourceTaskTest.java:172)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at junit.framework.TestCase.runTest(TestCase.java:176)
        at junit.framework.TestCase.runBare(TestCase.java:141)
        at junit.framework.TestResult$1.protect(TestResult.java:122)
        at junit.framework.TestResult.runProtected(TestResult.java:142)
        at junit.framework.TestResult.run(TestResult.java:125)
        at junit.framework.TestCase.run(TestCase.java:129)
        at junit.framework.TestSuite.runTest(TestSuite.java:252)
        at junit.framework.TestSuite.run(TestSuite.java:247)
        at org.junit.internal.runners.JUnit38ClassRunner.run(JUnit38ClassRunner.java:86)
        at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252)
        at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141)
        at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
        at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
        at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
        at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115)
        at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)

Aug 16, 2016 11:50:28 AM com.mongodb.diagnostics.logging.JULLogger log
INFO: Opened connection [connectionId{localValue:8, serverValue:11}] to localhost:27017
Aug 16, 2016 11:50:28 AM com.mongodb.diagnostics.logging.JULLogger log
INFO: Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=REPLICA_SET_GHOST, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 2, 8]}, minWireVersion=0, maxWireVersion=4, maxDocumentSize=16777216, roundTripTimeNanos=1451258, setName='null', canonicalAddress=null, hosts=[], passives=[], arbiters=[], primary='null', tagSet=TagSet{[]}, electionId=null, setVersion=null}
Aug 16, 2016 11:50:28 AM com.mongodb.diagnostics.logging.JULLogger log
INFO: Opened connection [connectionId{localValue:9, serverValue:12}] to localhost:27017
Aug 16, 2016 11:50:28 AM com.mongodb.diagnostics.logging.JULLogger log
INFO: Monitor thread successfully connected to server with description ServerDescription{address=localhost:27017, type=REPLICA_SET_GHOST, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 2, 8]}, minWireVersion=0, maxWireVersion=4, maxDocumentSize=16777216, roundTripTimeNanos=1815778, setName='null', canonicalAddress=null, hosts=[], passives=[], arbiters=[], primary='null', tagSet=TagSet{[]}, electionId=null, setVersion=null}

Results :

Failed tests:   testInsertWithNullOffsets(org.apache.kafka.connect.mongodb.MongodbSourceTaskTest)
  testInsertWithOffsets(org.apache.kafka.connect.mongodb.MongodbSourceTaskTest)

Tests run: 5, Failures: 2, Errors: 0, Skipped: 0

[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.951 s
[INFO] Finished at: 2016-08-16T11:50:28-04:00
[INFO] Final Memory: 12M/309M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.12.4:test (default-test) on project connect-mongodb: There are test failures.
[ERROR]
[ERROR] Please refer to /Users/kyle.white/universe/experimental/kafka/kafka-connect-mongodb/target/surefire-reports for the individual test results.
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException

Is there a way to prevent the JSON structure that is pulled from mongo from being destroyed?

Ok, so I love the connector - it makes getting data from Mongo to Kafka easy, however when the data gets pulled from mongo, it loses its own JSON structure. Is there a way to prevent this from happening?

Example:
My JSON looks like this in Mongo:

{ 
   "_id": "56feaa424f1249736af0ba4f",
   "ts":"Thu Mar 31 09:00:00 PDT 2016",
   "type":"hrlyEvtLvls",
   "events": [{
                     "TechDump":4,
                     "Reboot":174,
                     "FatalError":0
                  }]
}

However when I retrieve it from Kafka this is what it looks like in the payload object:

Document{{_id=56feaa424f1249736af0ba4f, ts=Thu Mar 31 09:00:00 PDT 2016, type=hrlyEvtLvls, events=Document{{TechDump=4, Reboot=174, FatalError=0}}}}

I don't have an easy way to reprocess the received data back into JSON, so is there some way I can prevent my JSON from being lost in the first place?
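
The Document{{...}} form above is what org.bson.Document.toString() produces, which suggests the default StringStructConverter is storing the object field as a plain string (see the converter.class note in the README above, which recommends JsonStructConverter instead). A small sketch of the difference, purely for illustration:

import org.bson.Document;

public class DocumentJsonSketch {
    public static void main(String[] args) {
        Document doc = new Document("_id", "56feaa424f1249736af0ba4f")
                .append("type", "hrlyEvtLvls");

        // toString() yields the Document{{...}} form seen in the payload above.
        System.out.println(doc);

        // toJson() keeps a valid JSON representation of the document.
        System.out.println(doc.toJson());
    }
}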

Example code?

Hi. I would love to try this. It is not clear how to run it. Is there an example for Sink and Source? It looks like it is configuration-driven.

How to skip error message with mongodb-connector?

The MongoDB connector throws this exception and halts execution even if a single message in a Kafka topic is erroneous. I found that kafka-console-consumer continues processing subsequent messages when the --skip-message-on-error option is used. Can the same behaviour be provided in the MongoDB connector?

Thanks.

[2016-08-03 15:17:43,192] ERROR Task mongodb-sink-connector-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: 
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:328)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:346)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Illegal unquoted character ((CTRL-CHAR, code 13)): has to be escaped using backslash to be included in string value
 at [Source: [B@584dfc1a; line: 2, column: 219]
Caused by: com.fasterxml.jackson.core.JsonParseException: Illegal unquoted character ((CTRL-CHAR, code 13)): has to be escaped using backslash to be included in string value
 at [Source: [B@584dfc1a; line: 2, column: 219]
        at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581)
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533)
        at com.fasterxml.jackson.core.base.ParserMinimalBase._throwUnquotedSpace(ParserMinimalBase.java:497)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString2(UTF8StreamJsonParser.java:2485)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishAndReturnString(UTF8StreamJsonParser.java:2414)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.getText(UTF8StreamJsonParser.java:285)
        at com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeObject(JsonNodeDeserializer.java:233)
        at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:69)
        at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:15)
        at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3736)
        at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2294)
        at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:326)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:346)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Failed to find any class that implements Connector

Hello.

I placed the .jar (with or without dependencies) into the plugins folder, but when I try to create the connector (via a POST request), it gives me an error:

{
    "error_code": 500,
    "message": "Failed to find any class that implements Connector and which name matches org.apache.kafka.connect.mongodb.MongodbSourceConnector, available connectors are: PluginDesc{klass=class com.datamountaineer.streamreactor.connect.mongodb.sink.MongoSinkConnector, name='com.datamountaineer.streamreactor.connect.mongodb.sink.MongoSinkConnector', version='null', encodedVersion=null, type=connector, typeName='connector', location='file:/usr/share/java/connector-plugins/kafka-connect-mongodb-0.3.0-3.3.0-all.jar'}, PluginDesc{klass=class io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, name='io.confluent.connect.elasticsearch.ElasticsearchSinkConnector', version='4.0.0', encodedVersion=4.0.0, type=sink, typeName='sink', location='file:/usr/share/java/connector-plugins/connector-elasticsearch/'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='1.0.0-cp1', encodedVersion=1.0.0-cp1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='1.0.0-cp1', encodedVersion=1.0.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.mongodb.MongodbSinkConnector, name='org.apache.kafka.connect.mongodb.MongodbSinkConnector', version='1.0.0-cp1', encodedVersion=1.0.0-cp1, type=sink, typeName='sink', location='file:/usr/share/java/connector-plugins/connect-mongodb-1.1-jar-with-dependencies.jar'}, PluginDesc{klass=class org.apache.kafka.connect.mongodb.MongodbSourceConnector, name='org.apache.kafka.connect.mongodb.MongodbSourceConnector', version='1.0.0-cp1', encodedVersion=1.0.0-cp1, type=source, typeName='source', location='file:/usr/share/java/connector-plugins/connect-mongodb-1.1-jar-with-dependencies.jar'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='1.0.0-cp1', encodedVersion=1.0.0-cp1, type=connector, typeName='connector', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='1.0.0-cp1', encodedVersion=1.0.0-cp1, type=sink, typeName='sink', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='1.0.0-cp1', encodedVersion=1.0.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='1.0.0-cp1', encodedVersion=1.0.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='1.0.0-cp1', encodedVersion=1.0.0-cp1, type=source, typeName='source', location='classpath'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='1.0.0-cp1', encodedVersion=1.0.0-cp1, type=source, typeName='source', location='classpath'}"
}

The class name which I specified in the "connector.class" property is present in the list of available connectors in the error message.

For comparison, the Mongo sink from this pack: https://github.com/Landoop/stream-reactor
works (and is created) fine.
kafka-connect-mongo also fails to create, with the same error.

Docker log: https://gist.github.com/vlad1777d/7cd9a8744fe2f6ca5e8c0a3f57834d89
Docker Compose .yml file: docker-compose.yml.txt

Build was successful:

[INFO] Building jar: /Users/vlad/Downloads/kafka-connect-mongodb-master/target/connect-mongodb-1.1-jar-with-dependencies.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 02:12 min
[INFO] Finished at: 2017-12-27T18:45:26+02:00
[INFO] Final Memory: 28M/208M
[INFO] ------------------------------------------------------------------------

MongoDB Source Connector not working.

I am getting the following issue whenever I try to start Kafka Connect using the MongoDB source connector.

[2022-02-22 16:18:48,215] INFO [mongodb-source-connector|task-0] Resuming the change stream after the previous offset: {"_data": "826214BF89000000022B0229296E04"} (com.mongodb.kafka.connect.source.MongoSourceTask:430)
[2022-02-22 16:18:49,330] INFO [mongodb-source-connector|task-0] [Producer clientId=connector-producer-mongodb-source-connector-0] Resetting the last seen epoch of partition My_DB.My_Data-0 to 0 since the associated topicId changed from null to 3IPCPHczQuOheIQoBAoxCQ (org.apache.kafka.clients.Metadata:402)
[2022-02-22 16:18:52,036] INFO [mongodb-source-connector|task-0|offsets] WorkerSourceTask{id=mongodb-source-connector-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:484)
[2022-02-22 16:19:02,048] INFO [mongodb-source-connector|task-0|offsets] WorkerSourceTask{id=mongodb-source-connector-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:484)
[2022-02-22 16:19:12,070] INFO [mongodb-source-connector|task-0|offsets] WorkerSourceTask{id=mongodb-source-connector-0} Either no records were produced by the task since the last offset commit, or every record has been filtered out by a transformation or dropped due to transformation or conversion errors. (org.apache.kafka.connect.runtime.WorkerSourceTask:484)

Cluster aware

Hi
I was wondering if the connector should not:

  1. be aware that there is more than one node in the replica set (so host/port should be a list, not a single entry), so that if the primary goes down we can still connect to the new primary (previously a secondary) and keep reading the oplog;
    and/or
  2. be aware of the fact that it's a sharded cluster and spawn one job per shard/replica set. In other words, it should first determine that it's connected to a mongos; the process should also be able to connect to multiple mongos instances (as the driver does) to be fault tolerant.

Makes sense?

Primary/Secondary server Internals with OpLog

Hey, this is more of a question than an issue.
I was looking into another connector, Debezium, and found a special case where they search for a primary server in the replica set. This solution doesn't handle this case. So, how does it affect things when we have multiple replica sets and there is a primary server failure?

Please correct me if I am wrong, but when the primary server handles an operation, it generates an oplog entry. This entry then gets synced to the secondary servers. In our case, since we maintain an offset, suppose we lose the connection to the primary. If we then switch to a secondary server's oplog, all the process will do is wait at that offset until the secondary server's oplog catches up.

Source Connector connected but not fetching data from mongodb

Hi
The Source Connector is connected but is not fetching data from MongoDB.
Please advise.

[2017-12-05 16:59:58,009] INFO Cluster created with settings {hosts=[127.0.0.1:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500} (org.mongodb.driver.cluster:71)
[2017-12-05 16:59:58,016] INFO Cluster description not yet available. Waiting for 30000 ms before timing out (org.mongodb.driver.cluster:71)
[2017-12-05 16:59:58,019] INFO Opened connection [connectionId{localValue:3, serverValue:282}] to 127.0.0.1:27017 (org.mongodb.driver.connection:71)
[2017-12-05 16:59:58,019] INFO Monitor thread successfully connected to server with description ServerDescription{address=127.0.0.1:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 4, 10]}, minWireVersion=0, maxWireVersion=5, maxDocumentSize=16777216, roundTripTimeNanos=280047} (org.mongodb.driver.cluster:71)
[2017-12-05 16:59:58,020] INFO WorkerSourceTask{id=mongodb-source-connector-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:158)
[2017-12-05 16:59:58,024] INFO Opened connection [connectionId{localValue:4, serverValue:283}] to 127.0.0.1:27017 (org.mongodb.driver.connection:71)
[2017-12-05 16:59:58,024] INFO Opened connection [connectionId{localValue:5, serverValue:284}] to 127.0.0.1:27017 (org.mongodb.driver.connection:71)
[2017-12-05 16:59:58,024] INFO Opened connection [connectionId{localValue:6, serverValue:285}] to 127.0.0.1:27017 (org.mongodb.driver.connection:71)
[2017-12-05 17:00:57,903] INFO WorkerSourceTask{id=mongodb-source-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:306)
[2017-12-05 17:00:57,904] INFO WorkerSourceTask{id=mongodb-source-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:323)

Best regards,

Loss of records in the insertion in mongo

Hi
I use the Mongo sink connector in distributed mode.
I send records from my producer to the Mongo database, but I lose 70% of the records;
only 30% end up in my Mongo database.

This is the configuration of my connector
curl -X POST -H "Content-Type: application/json" --data '{"name":"test-sink-mongo","config":{"connector.class":"org.apache.kafka.connect.mongodb.MongodbSinkConnector","tasks.max":"1","host":"10.0.0.25","port":"27017","bulk.size":"100","mongodb.database":"test","mongodb.collections":"collection-database-example","topics":"MONGO"}}' http://localhost:8083/connectors

Copying connect-mongodb-1.0.jar vs. connect-mongodb-1.0-jar-with-dependencies.jar to $CONFLUENT_HOME/share/java/confluent-common - different issues

Hello,

I use kafka-connect-mongo with Confluent 3.0.0 and MongoDB 3.2.8.

CASE 1:

If I copy connect-mongodb-1.0.jar to $CONFLUENT_HOME/share/java/confluent-common, then Zookeeper, the Kafka server, and the Schema Registry all run fine. But:

$CONFLUENT_HOME/bin/connect-standalone $CONFLUENT_HOME/etc/kafka/connect-standalone.properties $CONFLUENT_HOME/etc/kafka/connect-file-sink.properties

gives the error "java.lang.NoClassDefFoundError: org/bson/conversions/Bson"

Here is the extended output:

...
[2016-08-25 14:31:24,095] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:58)
[2016-08-25 14:31:24,099] INFO ConnectorConfig values:
connector.class = org.apache.kafka.connect.mongodb.MongodbSinkConnector
tasks.max = 3
name = mongodb-sink-connector
(org.apache.kafka.connect.runtime.ConnectorConfig:178)
[2016-08-25 14:31:24,105] INFO Creating connector mongodb-sink-connector of type org.apache.kafka.connect.mongodb.MongodbSinkConnector (org.apache.kafka.connect.runtime.Worker:168)
[2016-08-25 14:31:24,107] INFO Instantiated connector mongodb-sink-connector with version 0.10.0.0-cp1 of type org.apache.kafka.connect.mongodb.MongodbSinkConnector (org.apache.kafka.connect.runtime.Worker:176)
[2016-08-25 14:31:24,111] INFO Finished creating connector mongodb-sink-connector (org.apache.kafka.connect.runtime.Worker:181)
[2016-08-25 14:31:24,111] INFO SinkConnectorConfig values:
connector.class = org.apache.kafka.connect.mongodb.MongodbSinkConnector
tasks.max = 3
topics = [railstations_topic2]
name = mongodb-sink-connector
(org.apache.kafka.connect.runtime.SinkConnectorConfig:178)
[2016-08-25 14:31:24,117] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:100)
java.lang.NoClassDefFoundError: org/bson/conversions/Bson
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:663)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:418)
at org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:55)
at org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62)
at org.apache.kafka.connect.runtime.TaskConfig.(TaskConfig.java:52)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.createConnectorTasks(StandaloneHerder.java:275)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.updateConnectorTasks(StandaloneHerder.java:308)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:165)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:94)
Caused by: java.lang.ClassNotFoundException: org.bson.conversions.Bson
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 11 more
[2016-08-25 14:31:24,120] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:68)
...
[2016-08-25 14:31:24,168] ERROR Task not found: mongodb-sink-connector-0 (org.apache.kafka.connect.runtime.Worker:416)
Exception in thread "main" org.apache.kafka.connect.errors.ConnectException: Task not found: mongodb-sink-connector-0
at org.apache.kafka.connect.runtime.Worker.getTask(Worker.java:417)
at org.apache.kafka.connect.runtime.Worker.stopTasks(Worker.java:373)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.removeConnectorTasks(StandaloneHerder.java:290)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.stop(StandaloneHerder.java:83)
at org.apache.kafka.connect.runtime.Connect.stop(Connect.java:71)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:101)
[2016-08-25 14:32:03,114] INFO Reflections took 40002 ms to scan 261 urls, producing 12948 keys and 85634 values (org.reflections.Reflections:229)

CASE 2:

If I copy connect-mongodb-1.0-jar-with-dependencies.jar to $CONFLUENT_HOME/share/java/confluent-common, then Zookeeper and the Kafka server run fine, but the Schema Registry fails to start:

[2016-08-25 13:44:43,184] ERROR Server died unexpectedly: (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:51)
kafka.common.KafkaException: Failed to parse the broker info from zookeeper: {"jmx_port":-1,"timestamp":"1472124710146","endpoints":["PLAINTEXT://localhost:9092"],"host":"localhost","version":3,"port":9092}
at kafka.cluster.Broker$.createBroker(Broker.scala:101)
at kafka.utils.ZkUtils.getBrokerInfo(ZkUtils.scala:787)
at kafka.utils.ZkUtils$$anonfun$getAllBrokersInCluster$2.apply(ZkUtils.scala:162)
at kafka.utils.ZkUtils$$anonfun$getAllBrokersInCluster$2.apply(ZkUtils.scala:162)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.utils.ZkUtils.getAllBrokersInCluster(ZkUtils.scala:162)
at io.confluent.kafka.schemaregistry.storage.KafkaStore.(KafkaStore.java:109)
at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.(KafkaSchemaRegistry.java:136)
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:53)
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:37)
at io.confluent.rest.Application.createServer(Application.java:117)
at io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:43)
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.common.protocol.SecurityProtocol.forName(Ljava/lang/String;)Lorg/apache/kafka/common/protocol/SecurityProtocol;
at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:49)
at kafka.cluster.Broker$$anonfun$1.apply(Broker.scala:90)
at kafka.cluster.Broker$$anonfun$1.apply(Broker.scala:89)
at scala.collection.immutable.List.map(List.scala:273)
at kafka.cluster.Broker$.createBroker(Broker.scala:89)
... 16 more

Please give me some advice on how to complete this task successfully.

Best regards,
Camelia

Use Mongo as source and sink

I have not been able to get this working with MongoDB as source as well as sink. These servers are not part of a replica set, and there is a need to replay all events as they are.

Questions:

  1. I assume the schema registry is not applicable here. Please confirm.
  2. What is the definition of schema.name? Can any value be used here?

Following are the property files:

Source

name=mongodb-source-connector
connector.class=org.apache.kafka.connect.mongodb.MongodbSourceConnector
tasks.max=1
host=10.6.156.188
port=27017
batch.size=100
topic.prefix=testconn
schema.name=mongodbschema
databases=PocDB.myCollection

Sink

name=mongodb-sink-connector
connector.class=org.apache.kafka.connect.mongodb.MongodbSinkConnector
tasks.max=1
host=10.6.156.189
port=27017
bulk.size=100
mongodb.database=PocDB
mongodb.collections=myCollection
topics=testconn_PocDB_myCollection

Any help in this regard is appreciated.

Thanks

mongodb-source-connector

My connections are opened, but it is not flushing any records from the database:
INFO Kafka version : 1.1.0-cp1 (org.apache.kafka.common.utils.AppInfoParser:109)
[2018-05-08 17:10:24,265] INFO Kafka commitId : 93e03414f72c2485 (org.apache.kafka.common.utils.AppInfoParser:110)
[2018-05-08 17:10:24,281] INFO Created connector mongodb-source-connector-thirteen (org.apache.kafka.connect.cli.ConnectStandalone:104)
[2018-05-08 17:10:24,422] INFO Cluster created with settings {hosts=[127.0.0.1:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500} (org.mongodb.driver.cluster:71)
[2018-05-08 17:10:24,445] INFO Cluster description not yet available. Waiting for 30000 ms before timing out (org.mongodb.driver.cluster:71)
[2018-05-08 17:10:24,664] INFO Opened connection [connectionId{localValue:1, serverValue:6}] to 127.0.0.1:27017 (org.mongodb.driver.connection:71)
[2018-05-08 17:10:24,665] INFO Monitor thread successfully connected to server with description ServerDescription{address=127.0.0.1:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 6, 4]}, minWireVersion=0, maxWireVersion=6, maxDocumentSize=16777216, roundTripTimeNanos=550564} (org.mongodb.driver.cluster:71)
[2018-05-08 17:10:24,678] INFO Cluster created with settings {hosts=[127.0.0.1:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500} (org.mongodb.driver.cluster:71)
[2018-05-08 17:10:24,684] INFO Cluster description not yet available. Waiting for 30000 ms before timing out (org.mongodb.driver.cluster:71)
[2018-05-08 17:10:24,691] INFO Opened connection [connectionId{localValue:2, serverValue:7}] to 127.0.0.1:27017 (org.mongodb.driver.connection:71)
[2018-05-08 17:10:24,694] INFO Monitor thread successfully connected to server with description ServerDescription{address=127.0.0.1:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 6, 4]}, minWireVersion=0, maxWireVersion=6, maxDocumentSize=16777216, roundTripTimeNanos=1392005} (org.mongodb.driver.cluster:71)
[2018-05-08 17:10:24,699] INFO WorkerSourceTask{id=mongodb-source-connector-thirteen-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:165)
[2018-05-08 17:10:24,739] INFO Opened connection [connectionId{localValue:4, serverValue:8}] to 127.0.0.1:27017 (org.mongodb.driver.connection:71)
[2018-05-08 17:10:24,754] INFO Opened connection [connectionId{localValue:3, serverValue:9}] to 127.0.0.1:27017 (org.mongodb.driver.connection:71)
[2018-05-08 17:11:24,281] INFO WorkerSourceTask{id=mongodb-source-connector-thirteen-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-05-08 17:11:24,282] INFO WorkerSourceTask{id=mongodb-source-connector-thirteen-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
[2018-05-08 17:12:24,283] INFO WorkerSourceTask{id=mongodb-source-connector-thirteen-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-05-08 17:12:24,283] INFO WorkerSourceTask{id=mongodb-source-connector-thirteen-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)

My property file:

name=mongodb-source-connector-fifteen
connector.class=org.apache.kafka.connect.mongodb.MongodbSourceConnector
tasks.max=1
uri=mongodb://127.0.0.1:27017
host=127.0.0.1
port=27017
batch.size=100
schema.name=mongodbschema
topic.prefix=mongodb-kafka
converter.class=org.apache.kafka.connect.mongodb.converter.JsonStructConverter
databases=volumata.demography,volumata.mycollection

File Jar for Mongo and BSON

Hi, I added this project to Eclipse, but there are errors on the imports for org.mongo and org.bson.
Can anyone help me?
Thanks.
