nathanmarz / storm-contrib
A collection of spouts, bolts, serializers, DSLs, and other goodies to use with Storm
License: Eclipse Public License 1.0
If it tries to connect to a port that is running something other than Kafka, the Kafka client can hang indefinitely. Instead, KafkaSpout (both the Storm and Trident versions) should connect in a future and give up after a timeout.
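A minimal sketch of the proposed fix, using only the JDK: run the potentially-hanging connect on a separate thread and bound the wait. The class and method names are illustrative, not part of the storm-kafka API; the `Callable` stands in for the actual SimpleConsumer connection attempt.

```java
import java.util.concurrent.*;

// Run a potentially-hanging connect attempt in a future and give up after a
// timeout, as the issue suggests KafkaSpout should do.
public class BoundedConnect {
    public static <T> T connectWithTimeout(Callable<T> connect, long timeoutMs)
            throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<T> future = pool.submit(connect);
        try {
            // Give up if the "broker" does not answer within the timeout.
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the hanging connect attempt
            throw new RuntimeException(
                    "Kafka connect timed out after " + timeoutMs + "ms", e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

The spout's `open()` would call something like this instead of connecting directly, so a port running a non-Kafka service fails the worker fast rather than hanging it forever.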
RDBMSDumperBolt doesn't work in cluster mode.
The connection-setup code must be moved from the constructor to the prepare method, leaving only the initialization of variables in the constructor.
It worked for me this way:
// Constructor after changes: only store configuration; no JDBC work here.
public RDBMSDumperBolt(String primaryKey, String tableName, ArrayList<String> columnNames,
        ArrayList<String> columnTypes, String dBUrl, String username, String password)
        throws SQLException {
    super();
    this.primaryKey = primaryKey;
    this.tableName = tableName;
    this.columnNames = columnNames;
    this.columnTypes = columnTypes;
    this.dBUrl = dBUrl;
    this.username = username;
    this.password = password;
}

// prepare() after changes: open the (non-serializable) connection on the worker.
@Override
public void prepare(Map stormConf, TopologyContext context) {
    try {
        con = connector.getConnection(dBUrl, username, password);
    } catch (Exception e) {
        e.printStackTrace();
    }
    communicator = new RDBMSCommunicator(con, primaryKey, tableName, columnNames, columnTypes);
}
Seeking to run storm-kafka against a newer Kafka codebase (0.8.x, 0.9.x); the codebase in contrib/storm-kafka is compatible with 0.7.x only.
Any chance of a port to the newer Kafka in the near term?
thanks!
I set my spout as OpaqueTransactionalKafkaSpout, and I have tried two test cases with the bolts used in the topology:
1) With the bolt extending BaseBatchBolt (I have two bolts), the problem is that the data is only processed in the first bolt's execute(); finishBatch() is never run.
I also looked at the demo in storm-starter and found that the MemoryTransactionTopology sets the max tuples in a batch, but OpaqueTransactionalKafkaSpout does not. I wonder whether that is the cause.
2) With the bolt extending BaseBasicBolt, some data fails and is re-consumed from Kafka.
Are there any solutions for using Kafka with Storm?
Looks like Twitter removed the Kafka client from their repo.
Getting "com.twitter#kafka_2.9.2;0.7.0: not found".
I am using TransactionalTridentKafkaSpout and transactional Trident state. It works correctly on a local cluster, but fails to get any messages when I deploy it to a remote cluster.
In both the local and remote clusters I am pointing TransactionalTridentKafkaSpout at the same ZooKeeper and Kafka, so I really don't know how this can happen.
I would like to see a no-args constructor available in KafkaSpout.
Some of the information set in SpoutConfig for my system needs to be injected (the ZooKeeper servers come from a property file). Without a no-args constructor, my topology has to be aware of this SpoutConfig and responsible for creating it.
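A stdlib sketch of the kind of injection being asked for: the ZooKeeper hosts are read from a property file instead of being hard-coded where the topology builds its SpoutConfig. The helper name and the `zk.servers` key are illustrative, not part of storm-kafka.

```java
import java.io.FileReader;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: load the ZooKeeper server list from a property file,
// so the topology code does not need to hard-code it into SpoutConfig.
public class SpoutConfigLoader {
    public static List<String> loadZkServers(String path) throws IOException {
        Properties props = new Properties();
        try (FileReader reader = new FileReader(path)) {
            props.load(reader);
        }
        // e.g. zk.servers=zk1.example.com,zk2.example.com
        String servers = props.getProperty("zk.servers", "localhost");
        return Arrays.asList(servers.split(","));
    }
}
```

With a no-args constructor on KafkaSpout, a list like this could be applied to the spout after construction instead of being baked into the topology class.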
If I have a number of spout implementations in a plant, and they are all using the KafkaSpout implementation, is there any means, or desire for a means (other than my own), to explicitly control the streamId the spout emits to?
Use case: using the documented "forceStartOffsetTime" at some future time, I may want to emit the recovery data (depending on the currency or priority of the originating request) down a "special" stream. It's not clear I could mimic this behavior with a separate spout, even if I could set the "emitTo" streamId.
Thoughts?
I run a Storm cluster and encountered the following problem; I hope someone can suggest a solution.
error info:
Client client-boss-1 [ERROR] connection attempt 2 to Netty-Client-storm3/192.168.172.133:6700 failed: java.net.ConnectException: connection denied: storm3/192.168.172
.133:6700
2017-07-26 11:12:30.349 o.a.s.u.StormBoundedExponentialBackoffRetry client-boss-1 [WARN] WILL SLEEP FOR 106ms (NOT MAX)
Trying to get storm-kafka set up and I'm running into an exception. Here is the full exception:
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
at kafka.utils.Utils$.read(Utils.scala:486)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:57)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:184)
at kafka.consumer.SimpleConsumer.liftedTree3$1(SimpleConsumer.scala:168)
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:158)
at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:66)
at storm.kafka.PartitionManager.<init>(PartitionManager.java:67)
at storm.kafka.StaticCoordinator.<init>(StaticCoordinator.java:24)
at storm.kafka.KafkaSpout.open(KafkaSpout.java:72)
at backtype.storm.daemon.executor$fn__3985$fn__3997.invoke(executor.clj:460)
at backtype.storm.util$async_loop$fn__465.invoke(util.clj:375)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:722)
Here is the topology I'm trying to submit. I removed all the bolts to narrow down the problem:
SpoutConfig spoutConfig = new SpoutConfig(
        new KafkaConfig.StaticHosts(hosts, 2),
        "syslog",
        "/kafkastorm",
        "discovery");
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka", kafkaSpout, 2);

Config conf = new Config();
conf.setDebug(true);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("syslog", conf, builder.createTopology());
I'm using these dependencies. I had to go to an older version of storm-kafka, since the newer ones depend on beta Storm versions. I am currently using Storm 0.8.2, so I wanted something that works with that.
<dependency>
    <groupId>storm</groupId>
    <artifactId>storm</artifactId>
    <version>0.8.2</version>
</dependency>
<dependency>
    <groupId>storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.8.0-wip4</version>
</dependency>
The only thing I can seem to find on the web is that I have to rebuild storm-kafka, but I'm wondering if there is another solution.
git submodule update
Cloning into 'storm-backport'...
Permission denied (publickey).
fatal: Could not read from remote repository.
Please make sure you have the correct access rights
and the repository exists.
Clone of '[email protected]:stormprocessor/storm-backport.git' into submodule path 'storm-backport' failed
Trying to read from a partitioned Kafka Topic w/ 3 partitions. Keep getting this. Any ideas?
Using the default TestTopology.
3848 [Thread-25] ERROR backtype.storm.daemon.task -
kafka.common.OffsetOutOfRangeException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
at java.lang.Class.newInstance0(Class.java:355)
at java.lang.Class.newInstance(Class.java:308)
at kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:53)
at kafka.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:60)
at kafka.javaapi.message.ByteBufferMessageSet$$anon$1.<init>(ByteBufferMessageSet.scala:43)
at kafka.javaapi.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:42)
at storm.kafka.KafkaUtils.emitPartitionBatchNew(KafkaUtils.java:34)
at storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:70)
at storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:59)
at backtype.storm.transactional.partitioned.OpaquePartitionedTransactionalSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTransactionalSpoutExecutor.java:77)
at backtype.storm.transactional.TransactionalSpoutBatchExecutor.execute(TransactionalSpoutBatchExecutor.java:47)
at backtype.storm.coordination.CoordinatedBolt.execute(CoordinatedBolt.java:290)
at backtype.storm.daemon.task$fn__3545$fn__3630.invoke(task.clj:499)
at backtype.storm.daemon.task$mk_task$iter__3386__3390$fn__3391$fn__3392$fn__3393.invoke(task.clj:247)
at clojure.lang.AFn.applyToHelper(AFn.java:159)
at clojure.lang.AFn.applyTo(AFn.java:151)
at clojure.core$apply.invoke(core.clj:540)
at backtype.storm.util$async_loop$fn__487.invoke(util.clj:271)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:662)
4113 [Thread-25] INFO backtype.storm.util - Halting process: ("Task died")
in this code piece:
SpoutConfig spoutConfig = new SpoutConfig(
        ImmutableList.of("kafkahost1", "kafkahost2"), // list of Kafka brokers
        8,             // number of partitions per host
        "clicks",      // topic to read from
        "/kafkastorm", // the root path in ZooKeeper for the spout to store the consumer offsets
        "discovery");  // an id for this consumer for storing the consumer offsets in ZooKeeper
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
Is the 'discovery' the consumer group? If not, how can I specify the consumer group?
At present the pom.xml contains these entries:
storm-contrib-core
storm-contrib-mongo
storm-contrib-sqs
Searching for a pom.xml, I get these contenders for actual modules:
storm-contrib/pom.xml
storm-contrib/storm-cassandra/examples/pom.xml
storm-contrib/storm-cassandra/pom.xml
storm-contrib/storm-clockspout/pom.xml
storm-contrib/storm-growl/pom.xml
storm-contrib/storm-hbase/pom.xml
storm-contrib/storm-jms/examples/pom.xml
storm-contrib/storm-jms/pom.xml
storm-contrib/storm-mongo/pom.xml
storm-contrib/storm-rdbms/pom.xml
storm-contrib/storm-signals/pom.xml
storm-contrib/storm-sqs/pom.xml
Are the @stormprocessor repos the correct ones to send pull requests for the submodules to? I'm a bit confused as to who is responsible for each one and who can approve pull requests. In #25 there was a suggested list but neither @nathanmarz nor @apetresc are listed as members of @stormprocessor.
I'd be happy to take on a larger role, but I figured I'd try to get a few pull requests through first, to make sure my vision is aligned with the project's.
Hi,
Are there any plans or attempts at supporting Kafka 0.8?
Thanks,
Danijel
When I run the given MapExample using Storm 0.8.1 and Cloudera Hadoop 0.20.2, there is a serialization issue.
Exception in thread "main" java.lang.RuntimeException: java.io.NotSerializableException: org.apache.hadoop.hdfs.DistributedFileSystem
at backtype.storm.utils.Utils.serialize(Utils.java:47)
at backtype.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:89)
at storm.state.example.MapExample.main(MapExample.java:126)
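The usual fix for a NotSerializableException at topology creation is to mark the non-serializable resource (here the Hadoop FileSystem) transient and open it in prepare()/open() on the worker, rather than constructing it before the topology is serialized. A minimal, stdlib-only sketch of the pattern; the `resource` field and `prepare()` method stand in for DistributedFileSystem and Storm's lifecycle hook:

```java
import java.io.Serializable;

// Sketch: a component holding a non-serializable resource. Marking the field
// transient lets the enclosing object be serialized (as Storm does when
// submitting a topology); the resource is re-created on the worker in prepare().
public class TransientResourceBolt implements Serializable {
    private final String resourcePath;   // serializable configuration
    private transient Object resource;   // stands in for DistributedFileSystem

    public TransientResourceBolt(String resourcePath) {
        this.resourcePath = resourcePath; // no resource opened here
    }

    // Called on the worker after deserialization (like Storm's prepare()).
    public void prepare() {
        this.resource = "opened:" + resourcePath; // e.g. FileSystem.get(conf)
    }

    public Object getResource() { return resource; }
    public String getResourcePath() { return resourcePath; }
}
```

After a serialize/deserialize round trip the transient field comes back null, which is why the open must happen in prepare() rather than the constructor.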
It runs correctly in local mode when I write to my MySQL database, but when I use cluster mode, it fails.
I submit my topology with StormSubmitter.submitTopology(mTopologyName, mTopologyConfig, mBuilder.createTopology());
It can create the table successfully, but it can't write anything to MySQL in RDBMSDumperBolt.
java.lang.RuntimeException: java.lang.NullPointerException
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:78) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
at backtype.storm.daemon.executor$eval3918$fn__3919$fn__3931$fn__3978.invoke(executor.clj:745) ~[na:na]
at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433) ~[na:na]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
Caused by: java.lang.NullPointerException: null
at storm.contrib.rdbms.RDBMSDumperBolt.execute(RDBMSDumperBolt.java:95) ~[stormjar.jar:na]
at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
at backtype.storm.daemon.executor$eval3918$fn__3919$tuple_action_fn__3921.invoke(executor.clj:630) ~[na:na]
at backtype.storm.daemon.executor$mk_task_receiver$fn__3839.invoke(executor.clj:398) ~[na:na]
at backtype.storm.disruptor$clojure_handler$reify__1560.onEvent(disruptor.clj:58) ~[na:na]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:104) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
... 6 common frames omitted
2014-05-29 06:11:15 b.s.d.executor [ERROR]
Can anyone help me solve this tough problem? Thanks a lot :)
Hi,
I have been trying to use kafka-spout with Clojure but cannot seem to get it to work. Do you have an example of making a spout in Clojure?
So far I have the following: https://gist.github.com/pixie79/7677546
Regards
Mark
PartitionManager resets to the offset of a failed message and upon nextTuple() starts re-transmitting all data from that offset forward. We modified this behavior to cache the data emitted and only re-transmit the failed data on nextTuple(). I would like to submit the changes for review/inclusion if others feel that this is worthwhile. The following changes were made:
* Add a HashMap<Long, List> for pending messages
* Add a LinkedList for offsets of failed messages
* Change fail(Long offset) to only add the offset to the failed-offset list
* Change ack(Long offset) to also remove the acked offset's entry from the HashMap
* Change next(SpoutOutputCollector collector) to re-emit failed messages if the list is not empty
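The bookkeeping those changes describe can be sketched with plain JDK collections (types simplified and names hypothetical; the real PartitionManager also talks to Kafka and ZooKeeper):

```java
import java.util.*;

// Simplified model of the proposed PartitionManager changes: cache emitted
// messages by offset, record failed offsets, and on next() re-emit only the
// failed messages instead of rewinding and re-reading everything from Kafka.
public class FailedOffsetCache {
    private final Map<Long, List<Object>> pending = new HashMap<>(); // offset -> emitted tuple
    private final LinkedList<Long> failed = new LinkedList<>();      // offsets to re-emit

    public void emitted(long offset, List<Object> tuple) {
        pending.put(offset, tuple);      // remember until acked
    }

    public void fail(long offset) {
        failed.add(offset);              // only record the offset; no rewind
    }

    public void ack(long offset) {
        pending.remove(offset);          // drop the cached copy
    }

    // Returns the next failed tuple to re-emit, or null if none failed.
    public List<Object> next() {
        while (!failed.isEmpty()) {
            Long offset = failed.poll();
            List<Object> tuple = pending.get(offset);
            if (tuple != null) {
                return tuple;            // re-emit just this message
            }
        }
        return null;
    }
}
```

The design trades memory (the pending cache) for not re-transmitting already-acked data after a single failure.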
Respectfully Submitted
Does the Storm Kafka spout even compile? The libs don't exist in the Twitter repo!
Regarding:
I have recently added this sort of functionality to one of my projects that uses storm-kafka. I'd like to contribute but my solution uses zkClient: https://github.com/sgroschupf/zkclient
Is that acceptable, or is the desired solution to use a ZooKeeper library rather than zkClient?
Most users checking this project out and following the wiki will immediately encounter:
storm-contrib$ git submodule update
Permission denied (publickey).
fatal: The remote end hung up unexpectedly
Unable to fetch in submodule path 'storm-backport'
Please provide a useful storm-contrib project to people who are not also committers for all of the sub-projects.
Under which license is this repository? MIT? Apache-2.0? Or something else?
Hi,
Is it possible to update storm-kafka on Clojars?
The last release of storm-kafka is really interesting but not available from Maven.
It also depends on storm 0.9.0-wip1, which is likewise not a Storm release available from Clojars.
I tried to push the libs to Clojars myself, but I'm a lein newbie and ran into problems when trying to "lein jar, pom" storm 0.9.0-wip1.
Regards