storm-contrib's People

Contributors

apetresc, boneill42, danbeaulieu, dependabot[bot], docdoc, git2samus, jayatitiwari, nathanmarz, ptgoetz, sorenmacbeth, surajwaghulde, tjun, tombrown52, xuwenhao

storm-contrib's Issues

KafkaSpout should timeout all connection issues

When trying to connect to a port that is running something other than Kafka, the Kafka client can hang indefinitely. Instead, KafkaSpout (both the Storm and Trident versions) should connect in a future and give up after a timeout.
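One way to implement the suggestion above is a minimal sketch like the following; the class name, timeout value, and the `openConsumer` callable are illustrative stand-ins, not part of storm-kafka's API:

```java
import java.util.concurrent.*;

public class TimedConnect {
    // Runs a blocking connect attempt in a future and gives up after timeoutMs.
    // The Callable stands in for whatever call actually opens the Kafka consumer.
    public static <T> T connectWithTimeout(Callable<T> openConsumer, long timeoutMs)
            throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<T> future = pool.submit(openConsumer);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the hung connection attempt
            throw new RuntimeException("connect timed out after " + timeoutMs + "ms", e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

The spout's open() could wrap its consumer construction in such a call so a non-Kafka endpoint fails fast instead of hanging the worker.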

storm-rdbms RDBMSDumperBolt doesn't work in cluster mode

RDBMSDumperBolt doesn't work in cluster mode.

The connection code must be moved from the constructor to the prepare method, leaving only the initialization of variables in the constructor. Bolts are serialized when the topology is submitted, so a live JDBC connection opened in the constructor does not survive deployment to the workers.

It worked for me this way:

// Constructor after changes: only serializable fields are set here
public RDBMSDumperBolt(String primaryKey, String tableName, ArrayList<String> columnNames,
        ArrayList<String> columnTypes, String dBUrl, String username, String password)
        throws SQLException {
    super();
    this.primaryKey = primaryKey;
    this.tableName = tableName;
    this.columnNames = columnNames;
    this.columnTypes = columnTypes;
    this.dBUrl = dBUrl;
    this.username = username;
    this.password = password;
}

// Prepare after changes: the JDBC connection is opened on the worker
@Override
public void prepare(Map stormConf, TopologyContext context) {
    try {
        con = connector.getConnection(dBUrl, username, password);
    } catch (Exception e) {
        e.printStackTrace();
    }
    communicator = new RDBMSCommunicator(con, primaryKey, tableName, columnNames, columnTypes);
}

storm-kafka in use

I set my spout to OpaqueTransactionalKafkaSpout, and I tested two cases for the bolt used in the topology:

1) With the bolt extending BaseBatchBolt (I have two bolts), the data is only processed in the first bolt's execute(); finishBatch() is never called. I also looked at the demos in storm-starter and found that the MemoryTransactionTopology sets the max tuples in a batch, but OpaqueTransactionalKafkaSpout does not. I wonder whether that is the cause.

2) With the bolt extending BaseBasicBolt, some data fails and is re-consumed from Kafka.

Are there any solutions for using Kafka with Storm?

Unresolved dependencies

Looks like Twitter removed the Kafka client from their repo.
Getting "com.twitter#kafka_2.9.2;0.7.0: not found"

KafkaSpout - no args constructor

I would like to see a no-args constructor available in KafkaSpout.

Some of the information to be set in SpoutConfig for my system needs to be injected, e.g. the ZooKeeper servers from a property file. Without a no-args constructor, my topology has to be aware of this SpoutConfig and be responsible for creating it.
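As a sketch of the wiring this request would enable, the configuration could be loaded from properties and handed to the spout separately; the `ZkProps` helper and the `zookeeper.servers` property name below are hypothetical, not part of storm-kafka:

```java
import java.io.StringReader;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: reads the ZooKeeper server list from a properties
// source so the topology never has to hard-code it into SpoutConfig.
public class ZkProps {
    public static List<String> zkServers(String propsText) throws Exception {
        Properties props = new Properties();
        props.load(new StringReader(propsText));
        String raw = props.getProperty("zookeeper.servers", "");
        return Arrays.asList(raw.split(","));
    }
}
```

With a no-args KafkaSpout constructor plus a setter for SpoutConfig, a loader like this could build the config outside the topology definition entirely.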

StreamID for kafka-spout emits?

If I have a number of spout implementations in a topology, and they all use the KafkaSpout implementation, is there any means (or desire for a means, other than my own) to explicitly control the streamId the spout emits to?

Use case: when using the documented "forceStartOffsetTime" at some future time, I may want to send the recovered input down a "special" stream, depending on the currency or priority of the originating request. It's not clear I could mimic this behavior with a separate spout, even if I could set the "emitTo" streamId.

Thoughts?

Storm cluster connection problem

When running my Storm cluster, I encountered the following problem, and I hope someone can suggest a solution.

Error info:
Client client-boss-1 [ERROR] connection attempt 2 to Netty-Client-storm3/192.168.172.133:6700 failed: java.net.ConnectException: connection denied: storm3/192.168.172.133:6700
2017-07-26 11:12:30.349 o.a.s.u.StormBoundedExponentialBackoffRetry client-boss-1 [WARN] WILL SLEEP FOR 106ms (NOT MAX)

storm-kafka: EOFException

Trying to get storm-kafka set up and I'm running into an exception. Here is the full exception:

java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
    at kafka.utils.Utils$.read(Utils.scala:486)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:57)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.consumer.SimpleConsumer.getResponse(SimpleConsumer.scala:184)
    at kafka.consumer.SimpleConsumer.liftedTree3$1(SimpleConsumer.scala:168)
    at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:158)
    at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:66)
    at storm.kafka.PartitionManager.<init>(PartitionManager.java:67)
    at storm.kafka.StaticCoordinator.<init>(StaticCoordinator.java:24)
    at storm.kafka.KafkaSpout.open(KafkaSpout.java:72)
    at backtype.storm.daemon.executor$fn__3985$fn__3997.invoke(executor.clj:460)
    at backtype.storm.util$async_loop$fn__465.invoke(util.clj:375)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:722)

Here is the topology I'm trying to submit. I removed all the bolts to narrow down the problem:

SpoutConfig spoutConfig = new SpoutConfig(
                new KafkaConfig.StaticHosts(hosts, 2),
                "syslog",
                "/kafkastorm",
                "discovery");

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka", kafkaSpout, 2);

Config conf = new Config();
conf.setDebug(true);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("syslog", conf, builder.createTopology());

I'm using these dependencies. I had to go to an older version of storm-kafka since the newer ones depend on beta Storm versions. I'm currently using Storm 0.8.2, so I wanted something that works with that.

        <dependency>
            <groupId>storm</groupId>
            <artifactId>storm</artifactId>
            <version>0.8.2</version>
        </dependency>
        <dependency>
            <groupId>storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.8.0-wip4</version>
        </dependency>

The only thing I can seem to find on the web is that I have to rebuild storm-kafka, but I'm wondering if there is another solution.

Not able to update the submodules

git submodule update
Cloning into 'storm-backport'...
Permission denied (publickey).
fatal: Could not read from remote repository.

Please make sure you have the correct access rights
and the repository exists.
Clone of '[email protected]:stormprocessor/storm-backport.git' into submodule path 'storm-backport' failed
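A common workaround, assuming the submodules are public and only the SSH `git@` URLs are the problem, is to rewrite them to anonymous HTTPS in `.gitmodules` before updating:

```shell
# Rewrite the SSH submodule URLs in .gitmodules to anonymous HTTPS,
# then re-sync the recorded URLs and fetch the submodules.
sed -i 's|git@github\.com:|https://github.com/|' .gitmodules
git submodule sync
git submodule update --init
```

This only helps if the submodule repositories actually exist and are publicly readable; it does not fix submodules that point at deleted or private repos.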

kafka.common.OffsetOutOfRangeException

Trying to read from a partitioned Kafka topic with 3 partitions. I keep getting this exception. Any ideas?

Using the default TestTopology.

3848 [Thread-25] ERROR backtype.storm.daemon.task -
kafka.common.OffsetOutOfRangeException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
at java.lang.Class.newInstance0(Class.java:355)
at java.lang.Class.newInstance(Class.java:308)
at kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:53)
at kafka.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:60)
at kafka.javaapi.message.ByteBufferMessageSet$$anon$1.(ByteBufferMessageSet.scala:43)
at kafka.javaapi.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:42)
at storm.kafka.KafkaUtils.emitPartitionBatchNew(KafkaUtils.java:34)
at storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:70)
at storm.kafka.OpaqueTransactionalKafkaSpout$Emitter.emitPartitionBatch(OpaqueTransactionalKafkaSpout.java:59)
at backtype.storm.transactional.partitioned.OpaquePartitionedTransactionalSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTransactionalSpoutExecutor.java:77)
at backtype.storm.transactional.TransactionalSpoutBatchExecutor.execute(TransactionalSpoutBatchExecutor.java:47)
at backtype.storm.coordination.CoordinatedBolt.execute(CoordinatedBolt.java:290)
at backtype.storm.daemon.task$fn__3545$fn__3630.invoke(task.clj:499)
at backtype.storm.daemon.task$mk_task$iter__3386__3390$fn__3391$fn__3392$fn__3393.invoke(task.clj:247)
at clojure.lang.AFn.applyToHelper(AFn.java:159)
at clojure.lang.AFn.applyTo(AFn.java:151)
at clojure.core$apply.invoke(core.clj:540)
at backtype.storm.util$async_loop$fn__487.invoke(util.clj:271)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:662)
4113 [Thread-25] INFO backtype.storm.util - Halting process: ("Task died")

How can I set the consumer group in storm-kafka?

in this code piece:

SpoutConfig spoutConfig = new SpoutConfig(
ImmutableList.of("kafkahost1", "kafkahost2"), // list of Kafka brokers
8, // number of partitions per host
"clicks", // topic to read from
"/kafkastorm", // the root path in Zookeeper for the spout to store the consumer offsets
"discovery"); // an id for this consumer for storing the consumer offsets in Zookeeper
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

Is 'discovery' the consumer group? If not, how can I specify the consumer group?
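For context, a sketch of how the id relates to offset storage; this mirrors the way storm-kafka composes its ZooKeeper paths from the last two SpoutConfig arguments, though the exact layout may differ between versions:

```java
public class OffsetPath {
    // Builds the ZooKeeper path where the spout keeps consumed offsets for a
    // partition. zkRoot ("/kafkastorm") and id ("discovery") correspond to the
    // last two SpoutConfig constructor arguments in the snippet above.
    public static String forPartition(String zkRoot, String id, int partition) {
        return zkRoot + "/" + id + "/partition_" + partition;
    }
}
```

Because offsets are tracked per id under zkRoot, the id effectively plays the consumer-group role: two topologies with different ids track offsets independently.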

Pom.xml's module list references nothing that exists

At present the pom.xml contains these entries:

storm-contrib-core
storm-contrib-mongo
storm-contrib-sqs

Searching for a pom.xml, I get these contenders for actual modules:

storm-contrib/pom.xml
storm-contrib/storm-cassandra/examples/pom.xml
storm-contrib/storm-cassandra/pom.xml
storm-contrib/storm-clockspout/pom.xml
storm-contrib/storm-growl/pom.xml
storm-contrib/storm-hbase/pom.xml
storm-contrib/storm-jms/examples/pom.xml
storm-contrib/storm-jms/pom.xml
storm-contrib/storm-mongo/pom.xml
storm-contrib/storm-rdbms/pom.xml
storm-contrib/storm-signals/pom.xml
storm-contrib/storm-sqs/pom.xml

Pull requests for submodules

Are the @stormprocessor repos the correct ones to send pull requests for the submodules to? I'm a bit confused as to who is responsible for each one and who can approve pull requests. In #25 there was a suggested list, but neither @nathanmarz nor @apetresc is listed as a member of @stormprocessor.

I'd be happy to jump into a larger role, but I figured I'd try to get a few pull requests through before offering to do so, to make sure my vision is aligned with the project's.

Serialization Issue in storm-state module for org.apache.hadoop.hdfs.DistributedFileSystem

When I run the given MapExample using Storm 0.8.1 and Cloudera Hadoop 0.20.2, there is a serialization issue.

Exception in thread "main" java.lang.RuntimeException: java.io.NotSerializableException: org.apache.hadoop.hdfs.DistributedFileSystem
at backtype.storm.utils.Utils.serialize(Utils.java:47)
at backtype.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:89)
at storm.state.example.MapExample.main(MapExample.java:126)

[storm-rdbms] java.lang.NullPointerException when using storm on cluster mode

It runs correctly in local mode when I write to my MySQL database, but when I run it in Storm cluster mode, it fails.

I submit my topology by StormSubmitter.submitTopology(mTopologyName,mTopologyConfig,mBuilder.createTopology());

It can create the table successfully, but RDBMSDumperBolt can't write anything to MySQL.

java.lang.RuntimeException: java.lang.NullPointerException
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:78) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
at backtype.storm.daemon.executor$eval3918$fn__3919$fn__3931$fn__3978.invoke(executor.clj:745) ~[na:na]
at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433) ~[na:na]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
Caused by: java.lang.NullPointerException: null
at storm.contrib.rdbms.RDBMSDumperBolt.execute(RDBMSDumperBolt.java:95) ~[stormjar.jar:na]
at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
at backtype.storm.daemon.executor$eval3918$fn__3919$tuple_action_fn__3921.invoke(executor.clj:630) ~[na:na]
at backtype.storm.daemon.executor$mk_task_receiver$fn__3839.invoke(executor.clj:398) ~[na:na]
at backtype.storm.disruptor$clojure_handler$reify__1560.onEvent(disruptor.clj:58) ~[na:na]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:104) ~[storm-core-0.9.1-incubating.jar:0.9.1-incubating]
... 6 common frames omitted
2014-05-29 06:11:15 b.s.d.executor [ERROR]

Can anyone help me solve this tough problem? Thanks a lot! :)

Kafka spout PartitionManager should emit failed tuples without replaying all data that follows

PartitionManager resets to the offset of a failed message and, upon nextTuple(), starts re-transmitting all data from that offset forward. We modified this behavior to cache the emitted data and re-transmit only the failed data on nextTuple(). I would like to submit the changes for review/inclusion if others feel this is worthwhile. The following changes were made:

  • Add a HashMap<Long, List> for pending messages
  • Add a LinkedList for offsets of failed messages
  • Change fail(Long offset) to only add the offset to the failed-offset list
  • Change ack(Long offset) to also remove the acked offset's entry from the HashMap
  • Change next(SpoutOutputCollector collector) to re-emit failed messages if the list is not empty
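The bookkeeping described above can be sketched roughly as follows; the class and method names are illustrative, and the real PartitionManager code differs:

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the proposed replay bookkeeping: cache each emitted
// tuple until acked, and replay only the offsets that actually failed.
public class ReplayCache {
    private final Map<Long, List<Object>> pending = new HashMap<>();
    private final LinkedList<Long> failedOffsets = new LinkedList<>();

    public void emitted(long offset, List<Object> tuple) {
        pending.put(offset, tuple); // cache until acked
    }

    public void ack(long offset) {
        pending.remove(offset); // acked data never needs replay
    }

    public void fail(long offset) {
        failedOffsets.add(offset); // mark for re-emit, keep the cached tuple
    }

    // Returns the next failed tuple to re-emit, or null if none are pending.
    public List<Object> nextFailed() {
        Long offset = failedOffsets.poll();
        return offset == null ? null : pending.get(offset);
    }
}
```

The trade-off is memory: every in-flight tuple stays cached until acked, instead of being re-fetched from Kafka on failure.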

Respectfully Submitted

Storm Kafka, reading brokers from Zookeeper

Regarding:

  • Discover Kafka brokers dynamically through Zookeeper instead of using a static list of brokers

I have recently added this sort of functionality to one of my projects that uses storm-kafka. I'd like to contribute it, but my solution uses zkClient: https://github.com/sgroschupf/zkclient
Is that acceptable, or is the desired solution to use a ZooKeeper library directly rather than zkClient?
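Whichever client library is used, the data read from ZooKeeper boils down to broker registration strings. A sketch of parsing them; the "host:port" format here is an assumption for illustration, not Kafka's exact znode layout:

```java
// Parses a "host:port" broker registration string as it might be read from a
// ZooKeeper znode during dynamic broker discovery. The format is illustrative.
public class BrokerInfo {
    public final String host;
    public final int port;

    public BrokerInfo(String registration) {
        String[] parts = registration.split(":");
        this.host = parts[0];
        this.port = Integer.parseInt(parts[1]);
    }
}
```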

Non-committers are unable to check out submodules.

Most users checking this project out and following the wiki will immediately encounter:

storm-contrib$ git submodule update
Permission denied (publickey).
fatal: The remote end hung up unexpectedly
Unable to fetch in submodule path 'storm-backport'

Please provide a usable storm-contrib project for people who are not also committers on all of the sub-projects.

License?

Under which license is this repository? MIT? Apache-2.0? Or something else?

Update maven dependencies for storm-kafka

Hi,
Is it possible to update storm-kafka on Clojars?
The last release of storm-kafka is really interesting but not available from Maven.
It also depends on storm 0.9.0-wip1, which is likewise not a Storm release available from Clojars.

I tried to push the libs to Clojars myself, but I'm a lein newbie and ran into problems when trying to "lein jar, pom" storm 0.9.0-wip1.

Regards
