Code Monkey home page Code Monkey logo

pentaho-kafka-consumer's Introduction

pentaho-kafka-consumer

Apache Kafka consumer step plug-in for Pentaho Kettle.

Build Status

Screenshots

Using Apache Kafka Consumer in Kettle

Apache Kafka Compatibility

The consumer depends on Apache Kafka 0.8.1.1, which means that the broker must be of 0.8.x version or later.

If you want to build the plugin for a different Kafka version you have to modify the values of kafka.version and kafka.scala.version in the properties section of the pom.xml.

Maximum Duration Of Consumption

Note that the maximum duration of consumption is a limit on the duration of the entire step, not an individual read. This means that if you have a maximum duration of 5000ms, your transformation will stop after 5s, whether or not more data exists and independent of how fast each message is fetched from the topic. If you want to stop reading messages when the topic has no more messages, see the section on Empty topic handling.

Empty topic handling

If you want the step to halt when there are no more messages available on the topic, check the "Stop on empty topic" checkbox in the configuration dialog. The default timeout to wait for messages is 1000ms, but you can override this by setting the "consumer.timeout.ms" property in the dialog. If you configure a timeout without checking the box, an empty topic will be considered a failure case.

Installation

  1. Download pentaho-kafka-consumer Zip archive from latest release page.
  2. Extract downloaded archive into plugins/steps directory of your Pentaho Data Integration distribution.

Building from source code

mvn clean package

pentaho-kafka-consumer's People

Contributors

dchenbecker avatar dependabot[bot] avatar fhossfel avatar hemgov avatar nicolasreyrolle avatar spektom avatar thumbtack-etrapeznikov avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

pentaho-kafka-consumer's Issues

Consumer reads messages more than once

Hi, thank you for this plugin. I'm experiencing a behaviour I don't understand and I'd like to figure it out whether is there are errors on my business logic.
Here it is my skeleton:
skeleton
And I'm using mostly default values:
defaults

The transformation on the right is called every 1000 rows. It's about 500 rows per second and it is aligned with the message throughput. Sometimes we have to stop it for a while so, upon restart, there is a lag which can be consistent (400k message). This means that for half an hour we have kettle filling up server resources to catch up. It's fine and it works. But in those cases, and only in those where the server is at full capacity, the kafka consumer emits more messages than the actual number in the kafka topic. Eg:

  • kafka topic has 100k records. no new messages arriving (I can control this and I did to have a clean test)
  • we expect the consumer to emit 10k rows; then we expect it to wait indefinitely
  • the consumer actually emits 10080 messages, where those 80 are repetitions of messages already emitted, then it waits as expected
  • as a doublecheck: the subsequent transformation confirms that exactly 80 messages have an offset already registered

I understand this is acceptable in an at-least-once scenario, and it's ok, we discard duplicates, no harm. But I'm just curious about understanding how those repetitions are related with performances. Why this happens on "stress"? Is there any timeout I overlooked which makes the consumer "retry"?

Thank you
Virgilio

the plugin can't work in kettle 5.x

when i compiler it with kettle core 5.2.0.0, it's will success.but not be found in kettle 5.x menu.can you solve this problem ? thank you

Apache Kafka Consumer - using variables

Hi, is there a problem substituting a variable in the "Maximum duration of consumption" field instead of the actual value? I am using PDI 6.1.0.3 and the latest version of the kafka consumer. When I use a variable to this field it keeps getting removed and replaced with the value "0".

NoClassDefFoundError(s)

I got the Kafka consumer working but had to go find the following jars and include them in lib:

metrics-annotation-2.2.0.jar
scala-compiler-2.10.1.jar
scala-reflect-2.10.1.jar
slf4j-api-1.6.4.jar
slf4j-simple-1.6.4.jar
snappy-java-1.0.4.1.jar

Plus, the jar kafka_2.10-0.8.1.1.jar was referenced in plugin.xml but it was actually kafka_2.10-0.8.1.jar

Controlling Offsets

Is it possible to control when an offset is written by PDI, i.e., I would like to update the offset once the PDI transform has completed successfully rather than when it read the message.

SSL Encryption

Is there a way for the plug-in to communicate (consume/produce) to Kafka server over SSL Protocol?

New client access for new versions of Kafka

I tried to modify your code, but failed to ask for help!
for example:
Properties props = new Properties();
props.put("bootstrap.servers", "locahost:9094");
props.put("group.id", "groupId");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topics));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(200);
for (ConsumerRecord<String, String> record : records) {
System.out.println(Thread.currentThread().getName() + " partition: "
+ record.partition() + " offset: " + record.offset()
+ " key:" + record.key() + " value:"
+ record.value());
}
}

Kafka Consumer Step Question

When I open the Kafka Consumer step and try to add a parameter, the changes do not persist after I close the step. This is on PDI 7, with the latest version of the steps. Is this by design? If not, is this a bug? I noticed the same issue with the Kafka Producer Step.

"nodename nor servname provide" error

Hi,

I'm getting the following error when starting the transformation - any idea what might be causing this?

2015/04/30 12:13:21 - Apache Kafka Consumer.0 - Creating Kafka consumer listening on zookeeper: localhost:2181
2015/04/30 12:13:21 - Apache Kafka Consumer.0 - ERROR (version 5.3.0.0-213, build 1 from 2015-02-02_12-17-08 by buildguy) : Error initializing step [Apache Kafka Consumer]
2015/04/30 12:13:21 - Apache Kafka Consumer.0 - ERROR (version 5.3.0.0-213, build 1 from 2015-02-02_12-17-08 by buildguy) : java.net.UnknownHostException: MRMIOMO1443: MRMIOMO1443: nodename nor servname provided, or not known
2015/04/30 12:13:21 - Apache Kafka Consumer.0 - at java.net.InetAddress.getLocalHost(InetAddress.java:1473)

Consumer not getting any message from kafka

Consumer not getting any message from kafka , i do see message when i run from kafka console consumer, but with PDI kafka consumer step doesn't receive any message also there is no error.

Below Properties i have tired

topic = test
Message Limit = 0 (default)
Read Timeout (ms) =1000 ( also tried with 0 and 100000)
group.id =group1
zookeeper.connect =localhost:2181
consumer.id =pentaho1 (also tied with default)
client.id =pentaho1 (also tied with default)
auto.commit.enable =false (also tied with default)
auto.offset.reset =smallest (also tied with default)

does the zookeerper server parameter allow to have multiple nodes ?

Our kafka instance is replicating the messages across multiple zk nodes, I was trying to provide a list of nodes separated by comma and I did try semicolon also however the component is taking everything as a big string, is it normal ? or does this component use something different to separate the nodes ?

process parition specific data

Is the key field same as the partition number? if not how i can I specify for consumer to read only specific partition data for ApacheKafkaConsumer step(version 1.7)

Using consumer with pdi 6.1.0.3 enterprise - java.lang.linkageError: loader constraint violation

Fresh kettle installation with cdh54 big-data-plugin. Kafka plugins install from marketplace.

The producer works great, but upon running the consumer, i get 2 errors. The first error is from the client itself, the second is what spoon.sh reports. Can you help identify what I'm doing wrong?

first error (client ui):
java.lang.NullPointerException at org.pentaho.di.trans.Trans.fireTransFinishedListeners(Trans.java:1466) at org.pentaho.di.trans.Trans.prepareExecution(Trans.java:1124) at org.pentaho.di.ui.spoon.trans.TransGraph.debug(TransGraph.java:3803) at org.pentaho.di.ui.spoon.delegates.SpoonTransformationDelegate.executeTransformation(SpoonTransformationDelegate.java:879) at org.pentaho.di.ui.spoon.Spoon$31$1.run(Spoon.java:8523) at org.eclipse.swt.widgets.RunnableLock.run(Unknown Source) at org.eclipse.swt.widgets.Synchronizer.runAsyncMessages(Unknown Source) at org.eclipse.swt.widgets.Display.runAsyncMessages(Unknown Source) at org.eclipse.swt.widgets.Display.readAndDispatch(Unknown Source) at org.pentaho.di.ui.spoon.Spoon.readAndDispatch(Spoon.java:1347) at org.pentaho.di.ui.spoon.Spoon.waitForDispose(Spoon.java:7989) at org.pentaho.di.ui.spoon.Spoon.start(Spoon.java:9269) at org.pentaho.di.ui.spoon.Spoon.main(Spoon.java:663) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.pentaho.commons.launcher.Launcher.main(Launcher.java:92)

second error (spoon.sh command output).
2016/08/11 15:49:53 - consumer - Dispatching started for transformation [consumer] 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - ERROR (version 6.1.0.3-223, build 1 from 2016-06-16 15.11.37 by buildguy) : Error initializing step [Apache Kafka Consumer] 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - ERROR (version 6.1.0.3-223, build 1 from 2016-06-16 15.11.37 by buildguy) : java.lang.LinkageError: loader constraint violation: loader (instance of org/pentaho/di/core/plugins/KettleURLClassLoader) previously initiated loading for a different type with name "org/w3c/dom/Node" 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at java.lang.ClassLoader.defineClass1(Native Method) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at java.lang.ClassLoader.defineClass(ClassLoader.java:760) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at java.net.URLClassLoader.access$100(URLClassLoader.java:73) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at java.net.URLClassLoader$1.run(URLClassLoader.java:368) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at java.net.URLClassLoader$1.run(URLClassLoader.java:362) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at java.security.AccessController.doPrivileged(Native Method) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at java.net.URLClassLoader.findClass(URLClassLoader.java:361) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at org.pentaho.di.core.plugins.KettleURLClassLoader.loadClassFromThisLoader(KettleURLClassLoader.java:78) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at org.pentaho.di.core.plugins.KettleURLClassLoader.loadClass(KettleURLClassLoader.java:101) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at java.lang.Class.forName0(Native Method) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at java.lang.Class.forName(Class.java:264) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at org.apache.log4j.helpers.Loader.loadClass(Loader.java:182) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at org.apache.log4j.helpers.OptionConverter.instantiateByClassName(OptionConverter.java:326) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:472) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at org.apache.log4j.LogManager.<clinit>(LogManager.java:127) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at org.apache.log4j.Logger.getLogger(Logger.java:104) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at kafka.utils.Logging$class.logger(Logging.scala:24) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at kafka.utils.VerifiableProperties.logger$lzycompute(VerifiableProperties.scala:26) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at kafka.utils.VerifiableProperties.logger(VerifiableProperties.scala:26) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at kafka.utils.Logging$class.info(Logging.scala:67) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at kafka.utils.VerifiableProperties.info(VerifiableProperties.scala:26) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at kafka.utils.VerifiableProperties.verify(VerifiableProperties.scala:217) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at kafka.consumer.ConsumerConfig.<init>(ConsumerConfig.scala:95) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at com.ruckuswireless.pentaho.kafka.consumer.KafkaConsumerStep.init(KafkaConsumerStep.java:64) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at org.pentaho.di.trans.step.StepInitThread.run(StepInitThread.java:69) 2016/08/11 15:49:53 - Apache Kafka Consumer.0 - at java.lang.Thread.run(Thread.java:745) 2016/08/11 15:49:53 - consumer - ERROR (version 6.1.0.3-223, build 1 from 2016-06-16 15.11.37 by buildguy) : Step [Apache Kafka Consumer.0] failed to initialize!

Kafka reprocess messages

I am using the Kafka Consumer Plugin for Pentaho CE and would appreciate your help in its usage. I would like to know if any of you were in a situation where pentaho failed and you lost any messages (based on the official docs there's no way to read the message twice, am I wrong ?). If this situation occurs how do you capture these messages so you can reprocess them?

Some questions

Thank you for contributing this step to the marketplace! I am writing up a blog entry as we speak demonstrating both the consumer and producer you've created. I have a few questions and wanted to see if I've missed something or if this all makes sense.

When I first run the plugins published on the marketplace, I get the following exception:

2015/12/23 11:58:18 - Apache Kafka Consumer.0 - ERROR (version 6.0.0.0-353, build 1 from 2015-10-07 13.27.43 by buildguy) : Error initializing step [Apache Kafka Consumer]
2015/12/23 11:58:18 - Apache Kafka Consumer.0 - ERROR (version 6.0.0.0-353, build 1 from 2015-10-07 13.27.43 by buildguy) : java.lang.NoClassDefFoundError: kafka/consumer/ConsumerConfig

Is this expected? To resolve the issue I placed all the lib/jar files in Spoon's main classpath. Do you have another approach? I am running Kettle 6.0 on Windows.

After I resolved that issue, I wasn't able to get the consumer to read any messages from Kafka. I tried setting various properties like auto.offset.reset=smallest, but I was unable to get anything out of Kafka. Also the transformation would stop quickly after running it. I am running Kafka 0.9 so maybe that had something to do with it.

To resolve that issue I checked out the latest source and updated the plugin to include the latest code. I noticed the new feature "Stop on empty topic", so figured there were changes that might be good to try out.

With the latest update, I still needed to copy the lib jars into the main classpath, after doing that I was able to successfully run the step!

Also with those changes I was able to get the producer working as well, I didn't need to get the latest version of the producer from github, the marketplace version worked against Kafka 0.9 just fine.

Would it be possible to update the marketplace with the latest version of the plugin?

Thank you again for contributing these steps!

Will

Kafka 0.10 support is missing

I am using Pentaho Data Integration 6.1 and trying to connect to Kafka (2.10-0.10.0.0). I download the consumer from here and put it in plugins/steps folder. While running, I'm getting this error:

2016/08/01 09:12:00 - Apache Kafka Consumer.0 - ERROR (version 6.1.0.1-196, build 1 from 2016-04-07 12.08.49 by buildguy) : Error initializing step [Apache Kafka Consumer]
2016/08/01 09:12:00 - Apache Kafka Consumer.0 - ERROR (version 6.1.0.1-196, build 1 from 2016-04-07 12.08.49 by buildguy) : java.lang.NoClassDefFoundError: Could not initialize class org.apache.log4j.LogManager
2016/08/01 09:12:00 - Apache Kafka Consumer.0 - at org.apache.log4j.Logger.getLogger(Logger.java:104)
2016/08/01 09:12:00 - Apache Kafka Consumer.0 - at kafka.utils.Logging$class.logger(Logging.scala:24)
2016/08/01 09:12:00 - Apache Kafka Consumer.0 - at kafka.utils.VerifiableProperties.logger$lzycompute(VerifiableProperties.scala:26)
2016/08/01 09:12:00 - Apache Kafka Consumer.0 - at kafka.utils.VerifiableProperties.logger(VerifiableProperties.scala:26)
2016/08/01 09:12:00 - Apache Kafka Consumer.0 - at kafka.utils.Logging$class.info(Logging.scala:67)
2016/08/01 09:12:00 - Apache Kafka Consumer.0 - at kafka.utils.VerifiableProperties.info(VerifiableProperties.scala:26)
2016/08/01 09:12:00 - Apache Kafka Consumer.0 - at kafka.utils.VerifiableProperties.verify(VerifiableProperties.scala:217)
2016/08/01 09:12:00 - Apache Kafka Consumer.0 - at kafka.consumer.ConsumerConfig.(ConsumerConfig.scala:95)
2016/08/01 09:12:00 - Apache Kafka Consumer.0 - at com.ruckuswireless.pentaho.kafka.consumer.KafkaConsumerStep.init(KafkaConsumerStep.java:64)
2016/08/01 09:12:00 - Apache Kafka Consumer.0 - at org.pentaho.di.trans.step.StepInitThread.run(StepInitThread.java:69)
2016/08/01 09:12:00 - Apache Kafka Consumer.0 - at java.lang.Thread.run(Unknown Source)
2016/08/01 09:12:00 - kafkaConsumer - ERROR (version 6.1.0.1-196, build 1 from 2016-04-07 12.08.49 by buildguy) : Step [Apache Kafka Consumer.0] failed to initialize!
2016/08/01 09:12:00 - Spoon - ERROR (version 6.1.0.1-196, build 1 from 2016-04-07 12.08.49 by buildguy) : kafkaConsumer: preparing transformation execution failed
2016/08/01 09:12:00 - Spoon - ERROR (version 6.1.0.1-196, build 1 from 2016-04-07 12.08.49 by buildguy) : org.pentaho.di.core.exception.KettleException:
2016/08/01 09:12:00 - Spoon - We failed to initialize at least one step. Execution can not begin!
2016/08/01 09:12:00 - Spoon -
2016/08/01 09:12:00 - Spoon -
2016/08/01 09:12:00 - Spoon - at org.pentaho.di.trans.Trans.prepareExecution(Trans.java:1142)
2016/08/01 09:12:00 - Spoon - at org.pentaho.di.ui.spoon.trans.TransGraph$29.run(TransGraph.java:4035)
2016/08/01 09:12:00 - Spoon - at java.lang.Thread.run(Unknown Source)

Am I missing anything? Thanks.

after kafka connect JavaScript have question

I think after kafka-consumer use JavaScript handle received json message ,open JavaScript have question:
org.eclipse.swt.SWTException: Failed to execute runnable (java.lang.IllegalArgumentException: Argument cannot be null)
at org.eclipse.swt.SWT.error(Unknown Source)
at org.eclipse.swt.SWT.error(Unknown Source)
at org.eclipse.swt.widgets.Synchronizer.runAsyncMessages(Unknown Source)
at org.eclipse.swt.widgets.Display.runAsyncMessages(Unknown Source)
at org.eclipse.swt.widgets.Display.readAndDispatch(Unknown Source)
at org.pentaho.di.ui.trans.steps.scriptvalues_mod.ScriptValuesModDialog.open(ScriptValuesModDialog.java:703)
at org.pentaho.di.ui.spoon.delegates.SpoonStepsDelegate.editStep(SpoonStepsDelegate.java:127)
at org.pentaho.di.ui.spoon.Spoon.editStep(Spoon.java:8789)
at org.pentaho.di.ui.spoon.trans.TransGraph.editStep(TransGraph.java:3179)
at org.pentaho.di.ui.spoon.trans.TransGraph.mouseDoubleClick(TransGraph.java:775)
at org.eclipse.swt.widgets.TypedListener.handleEvent(Unknown Source)
at org.eclipse.swt.widgets.EventTable.sendEvent(Unknown Source)
at org.eclipse.swt.widgets.Display.sendEvent(Unknown Source)
at org.eclipse.swt.widgets.Widget.sendEvent(Unknown Source)
at org.eclipse.swt.widgets.Display.runDeferredEvents(Unknown Source)
at org.eclipse.swt.widgets.Display.readAndDispatch(Unknown Source)
at org.pentaho.di.ui.spoon.Spoon.readAndDispatch(Spoon.java:1359)
at org.pentaho.di.ui.spoon.Spoon.waitForDispose(Spoon.java:7990)
at org.pentaho.di.ui.spoon.Spoon.start(Spoon.java:9290)
at org.pentaho.di.ui.spoon.Spoon.main(Spoon.java:685)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.pentaho.commons.launcher.Launcher.main(Launcher.java:92)
Caused by: java.lang.IllegalArgumentException: Argument cannot be null
at org.eclipse.swt.SWT.error(Unknown Source)
at org.eclipse.swt.SWT.error(Unknown Source)
at org.eclipse.swt.SWT.error(Unknown Source)
at org.eclipse.swt.widgets.Widget.error(Unknown Source)
at org.eclipse.swt.widgets.TreeItem.setText(Unknown Source)
at org.eclipse.swt.widgets.TreeItem.setText(Unknown Source)
at org.pentaho.di.ui.trans.steps.scriptvalues_mod.ScriptValuesModDialog$16.run(ScriptValuesModDialog.java:1739)
at org.eclipse.swt.widgets.RunnableLock.run(Unknown Source)
... 23 more

kafka 2.11.010 err

producer kafka_2.11-0.10.0.0,using consumer default lib,can't received anything。
message is :2017-04-20 14:42:51 [IFF:56]-[INFO] topicName:BI_LOG JSONObjectString={"appid":"SMS","channel":"UP"}
config as this
Topic name:BI_LOG
Target key field name:message
Target key field name:a

if changed lib kafka_2.10-0.10.0.0.jar 、kafka-clients-0.10.0.0.jar ,running has error log
2017/04/20 15:26:19 - Spoon - Transformation opened.
2017/04/20 15:26:19 - Spoon - Launching transformation [pisp]...
2017/04/20 15:26:19 - Spoon - Started the transformation execution.
2017/04/20 15:26:19 - pisp - Dispatching started for transformation [pisp]
2017/04/20 15:26:20 - Apache Kafka Consumer.0 - Creating Kafka consumer listening on zookeeper: 10.121.30.90:2181
2017/04/20 15:26:31 - Apache Kafka Consumer.0 - ERROR (version 7.0.0.0-25, build 1 from 2016-11-05 15.35.36 by buildguy) : Error initializing step [Apache Kafka Consumer]
2017/04/20 15:26:31 - Apache Kafka Consumer.0 - ERROR (version 7.0.0.0-25, build 1 from 2016-11-05 15.35.36 by buildguy) : kafka.common.ConsumerRebalanceFailedException: group_team-PC-1492673180099-4ac86678 can't rebalance after 4 retries
2017/04/20 15:26:31 - Apache Kafka Consumer.0 - at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:670)
2017/04/20 15:26:31 - Apache Kafka Consumer.0 - at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:977)
2017/04/20 15:26:31 - Apache Kafka Consumer.0 - at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:264)
2017/04/20 15:26:31 - Apache Kafka Consumer.0 - at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:85)
2017/04/20 15:26:31 - Apache Kafka Consumer.0 - at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97)
2017/04/20 15:26:31 - Apache Kafka Consumer.0 - at com.ruckuswireless.pentaho.kafka.consumer.KafkaConsumerStep.init(KafkaConsumerStep.java:71)
2017/04/20 15:26:31 - Apache Kafka Consumer.0 - at org.pentaho.di.trans.step.StepInitThread.run(StepInitThread.java:69)
2017/04/20 15:26:31 - Apache Kafka Consumer.0 - at java.lang.Thread.run(Thread.java:745)
2017/04/20 15:26:31 - pisp - ERROR (version 7.0.0.0-25, build 1 from 2016-11-05 15.35.36 by buildguy) : Step [Apache Kafka Consumer.0] failed to initialize!
2017/04/20 15:26:31 - Spoon - ERROR (version 7.0.0.0-25, build 1 from 2016-11-05 15.35.36 by buildguy) : pisp: preparing transformation execution failed
2017/04/20 15:26:31 - Spoon - ERROR (version 7.0.0.0-25, build 1 from 2016-11-05 15.35.36 by buildguy) : org.pentaho.di.core.exception.KettleException:
2017/04/20 15:26:31 - Spoon - We failed to initialize at least one step. Execution can not begin!
2017/04/20 15:26:31 - Spoon -
2017/04/20 15:26:31 - Spoon -
2017/04/20 15:26:31 - Spoon - at org.pentaho.di.trans.Trans.prepareExecution(Trans.java:1144)
2017/04/20 15:26:31 - Spoon - at org.pentaho.di.ui.spoon.trans.TransGraph$29.run(TransGraph.java:4156)
2017/04/20 15:26:31 - Spoon - at java.lang.Thread.run(Thread.java:745)

Sub-transforms

Do you know if it is possible to pass messages retrieved from Kafka down to a sub-transform or can they only be worked upon in the same transform as the consumer? I've tried passing them down but can't seem to get it to work.

consumer can't work?

I've been using the latest version of consumer, but I still can't read the data, and no errors were reported in kettle.Why does this happen?And I've added the associated jars to the lib directory, as will did.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.