kinesis-storm-spout's People

Contributors

afitzgibbon, gauravgh, pfifer


kinesis-storm-spout's Issues

Unable to load AWS credentials from any provider in the chain

Hello Awslabs team,

I have configured the ~/.aws/credentials file with my AWS credentials.
Still, I'm getting the following exception:

9503 [Thread-21-kinesis-entry] ERROR backtype.storm.util - Async loop died!
com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117) ~[kinesis-storm-clickstream-sample-app-1.0.0-jar-with-dependencies.jar:na]
at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2460) ~[kinesis-storm-clickstream-sample-app-1.0.0-jar-with-dependencies.jar:na]
at com.amazonaws.services.kinesis.AmazonKinesisClient.describeStream(AmazonKinesisClient.java:860) ~[kinesis-storm-clickstream-sample-app-1.0.0-jar-with-dependencies.jar:na]
at com.amazonaws.services.kinesis.stormspout.KinesisHelper.getShardList(KinesisHelper.java:85) ~[kinesis-storm-clickstream-sample-app-1.0.0-jar-with-dependencies.jar:na]
at com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperStateManager.activate(ZookeeperStateManager.java:109) ~[kinesis-storm-clickstream-sample-app-1.0.0-jar-with-dependencies.jar:na]
at com.amazonaws.services.kinesis.stormspout.KinesisSpout.activate(KinesisSpout.java:125) ~[kinesis-storm-clickstream-sample-app-1.0.0-jar-with-dependencies.jar:na]
at backtype.storm.daemon.executor$fn__4654$fn__4669$fn__4698.invoke(executor.clj:563) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]

I have also tried manually exporting my credentials, but then it only works for a LocalMode Storm topology, not for a RemoteMode topology.

Thanks.
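A pattern that sometimes helps in cases like this (a sketch, not a confirmed fix; the stream name and ZooKeeper endpoint below are placeholders) is to hand the spout an explicit credentials provider, so that each worker JVM resolves credentials itself rather than relying on whatever was exported on the submitting machine:

```java
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.stormspout.KinesisSpout;
import com.amazonaws.services.kinesis.stormspout.KinesisSpoutConfig;

// Placeholders: replace the stream name and ZooKeeper endpoint with your own.
KinesisSpoutConfig config = new KinesisSpoutConfig("my-stream", "zk-host:2181");

// The default chain checks env vars, system properties, the profile file
// (~/.aws/credentials) and instance-profile credentials on each worker host.
AWSCredentialsProvider provider = new DefaultAWSCredentialsProviderChain();
KinesisSpout spout = new KinesisSpout(config, provider, new ClientConfiguration());
```

Note that in RemoteMode the chain is evaluated on every supervisor host, so whichever credential source you use has to exist there, not just on the machine that submits the topology.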

Unable to get Kinesis Shard List

Hi,

I am unable to get the Kinesis shard list in the ZookeeperStateManager activate() method in remote mode.

getShardList() fails at the following line of code:
ImmutableList shardList = ImmutableList.copyOf(shardListGetter.getShardList().keySet());

The last log line I got:
5815 [Thread-11-kinesis_spout] INFO com.amazonaws.services.kinesis.stormspout.KinesisHelper - Using us-west-1 region

It works fine in LocalMode.

Apache Storm - KinesisSpout throwing AmazonClientException backing off

2016-02-02 16:15:18 c.a.s.k.s.u.InfiniteConstantBackoffRetry [DEBUG] Caught exception of type com.amazonaws.AmazonClientException, backing off for 1000 ms.

I tested GET and PUT against the stream directly; both worked flawlessly. I have all three variants: Batch, Storm, and Spark.
Spark: used Kinesis streams - working
Batch: direct Get and Put - working
Storm: planning to use the KinesisSpout library from Kinesis. It is failing with no clue as to why.

Code:
final KinesisSpoutConfig config = new KinesisSpoutConfig(streamname, zookeeperurl);
config.withInitialPositionInStream(ipis);
config.withRegion(Regions.fromName(regionName));
config.withCheckpointIntervalMillis(Integer.parseInt(checkinterval));
config.withZookeeperPrefix("kinesis-zooprefix-" + name);

System.setProperty("aws.accessKeyId", key);
System.setProperty("aws.secretKey", keysecret);
SystemPropertiesCredentialsProvider scp = new SystemPropertiesCredentialsProvider();
final KinesisSpout spout = new KinesisSpoutConflux(config, scp, new ClientConfiguration());

What am I doing wrong?

Storm Logs:
2016-02-02 16:15:17 c.a.s.k.s.KinesisSpout [INFO] KinesisSpoutConflux[taskIndex=0] open() called with topoConfig task index 0 for processing stream Kinesis-Conflux
...
2016-02-02 16:15:17 c.a.s.k.s.KinesisSpout [DEBUG] KinesisSpoutConflux[taskIndex=0] activating. Starting to process stream Kinesis-Test
...
2016-02-02 16:15:17 c.a.s.k.s.KinesisHelper [INFO] Using us-east-1 region

I don't see "nextTuple" getting called.

My versions:
org.apache.storm : storm-hdfs : 0.9.3
com.amazonaws : kinesis-storm-spout : 1.1.1

Exception when number of worker process is more than one

Hi

When I raised the number of worker processes to more than one (topoConf.setNumWorkers(4);), I ran into an exception. Could you please help me out?

I've tried both local mode and remote mode (an AWS cluster). In both cases I faced the same problem in the spout.

Exception message:
java.lang.RuntimeException: java.lang.RuntimeException: java.io.NotSerializableException: java.nio.HeapByteBuffer
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90) ~[storm-core-0.9.0.1.jar:na]
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61) ~[storm-core-0.9.0.1.jar:na]
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62) ~[storm-core-0.9.0.1.jar:na]
at backtype.storm.disruptor$consume_loop_STAR_$fn__2975.invoke(disruptor.clj:74) ~[storm-core-0.9.0.1.jar:na]
at backtype.storm.util$async_loop$fn__444.invoke(util.clj:403) ~[storm-core-0.9.0.1.jar:na]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
at java.lang.Thread.run(Thread.java:695) [na:1.6.0_65]
Caused by: java.lang.RuntimeException: java.io.NotSerializableException: java.nio.HeapByteBuffer
at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:24) ~[storm-core-0.9.0.1.jar:na]
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:554) ~[kryo-2.17.jar:na]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:77) ~[kryo-2.17.jar:na]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18) ~[kryo-2.17.jar:na]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:472) ~[kryo-2.17.jar:na]
at backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:27) ~[storm-core-0.9.0.1.jar:na]
at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:27) ~[storm-core-0.9.0.1.jar:na]
at backtype.storm.daemon.worker$mk_transfer_fn$fn__5686$fn__5690.invoke(worker.clj:108) ~[storm-core-0.9.0.1.jar:na]
at backtype.storm.util$fast_list_map.invoke(util.clj:801) ~[storm-core-0.9.0.1.jar:na]
at backtype.storm.daemon.worker$mk_transfer_fn$fn__5686.invoke(worker.clj:108) ~[storm-core-0.9.0.1.jar:na]
at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3328.invoke(executor.clj:240) ~[storm-core-0.9.0.1.jar:na]
at backtype.storm.disruptor$clojure_handler$reify__2962.onEvent(disruptor.clj:43) ~[storm-core-0.9.0.1.jar:na]
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87) ~[storm-core-0.9.0.1.jar:na]
... 6 common frames omitted
Caused by: java.io.NotSerializableException: java.nio.HeapByteBuffer
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1165) ~[na:1.6.0_65]
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1535) ~[na:1.6.0_65]
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) ~[na:1.6.0_65]
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1413) ~[na:1.6.0_65]
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1159) ~[na:1.6.0_65]
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:329) ~[na:1.6.0_65]
at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:21) ~[storm-core-0.9.0.1.jar:na]
... 18 common frames omitted
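The root cause in the trace is Storm falling back to Java serialization for a java.nio.HeapByteBuffer inside an emitted tuple, and ByteBuffer is not Serializable. A common, project-independent fix (a sketch, not necessarily the fix the maintainers intended) is to copy the buffer into a plain byte[] before emitting, since byte arrays serialize cleanly; registering a custom Kryo serializer for ByteBuffer is the alternative. A minimal stdlib-only helper:

```java
import java.nio.ByteBuffer;

// Copies a ByteBuffer's remaining bytes into a byte[] that Storm/Kryo can
// serialize, without disturbing the buffer's position for other readers.
public final class ByteBufferUtil {
    private ByteBufferUtil() {}

    public static byte[] toBytes(ByteBuffer buffer) {
        ByteBuffer dup = buffer.duplicate(); // independent position/limit
        byte[] bytes = new byte[dup.remaining()];
        dup.get(bytes);
        return bytes;
    }
}
```

Emitting `new Values(ByteBufferUtil.toBytes(record.getData()))` instead of the raw ByteBuffer would sidestep the SerializableSerializer path entirely.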

kinesis-storm-spout for Ireland region

Hi

The current kinesis-storm-spout defaults to the N. Virginia region. However, I want to use it in the Ireland region, because the Kinesis service has been available there since July 1.

Could you please help me out with how to make the project work for the Ireland region as well?
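The region is configurable per topology via the spout config (another issue in this list uses config.withRegion(Regions.fromName(regionName))), so Ireland should be reachable by selecting eu-west-1. A minimal sketch, with placeholder stream and ZooKeeper values:

```java
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.stormspout.KinesisSpoutConfig;

// Placeholders: replace the stream name and ZooKeeper endpoint with your own.
KinesisSpoutConfig config = new KinesisSpoutConfig("my-stream", "zk-host:2181");

// Point the spout at the Ireland (eu-west-1) Kinesis endpoint.
config.withRegion(Regions.EU_WEST_1);
```

The stream itself must of course exist in eu-west-1, and the credentials in use must be valid for that region.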

com.amazonaws.AbortedException in KinesisShardGetter

We are periodically seeing these messages in our logs after our topology has been running for a number of hours; they are crashing the spout. We have forked the code hosted here to add support for pushing failed messages to an SQS dead-letter queue, but we don't think that is related. This issue is meant as a discussion; we will update it as we make progress finding the root cause. Any tips are appreciated.

  • A 10-shard Kinesis stream
  • Parallelism of 10 on the Kinesis spout

We have seen this issue across two of our topologies, so we don't think it's related to any of our business logic.

{
    "timestamp": "2015-06-25T17:36:22.375+0000",
    "level": "ERROR",
    "thread": "Thread-21-kinesis-spout",
    "logger": "com.amazonaws.services.kinesis.stormspout.KinesisShardGetter",
    "message": "KinesisShardGetter[shardId=shardId-000000000006] Caught exception when fetching records for shardId-000000000006",
    "context": "default",
    "exception": "com.amazonaws.AbortedException: \n\tat com.amazonaws.internal.SdkFilterInputStream.abortIfNeeded(SdkFilterInputStream.java:51)\n\tat com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:71)\n\tat com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:151)\n\tat com.fasterxml.jackson.core.json.UTF8StreamJsonParser.loadMore(UTF8StreamJsonParser.java:180)\n\tat com.fasterxml.jackson.core.base.ParserBase.loadMoreGuaranteed(ParserBase.java:460)\n\tat com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString2(UTF8StreamJsonParser.java:2306)\n\tat com.fasterxml.jackson.core.json.UTF8StreamJsonParser._finishString(UTF8StreamJsonParser.java:2287)\n\tat com.fasterxml.jackson.core.json.UTF8StreamJsonParser.getText(UTF8StreamJsonParser.java:286)\n\tat com.amazonaws.transform.JsonUnmarshallerContextImpl.readCurrentJsonTokenValue(JsonUnmarshallerContextImpl.java:129)\n\tat com.amazonaws.transform.JsonUnmarshallerContextImpl.readText(JsonUnmarshallerContextImpl.java:123)\n\tat com.amazonaws.transform.SimpleTypeJsonUnmarshallers$ByteBufferJsonUnmarshaller.unmarshall(SimpleTypeJsonUnmarshallers.java:185)\n\tat com.amazonaws.services.kinesis.model.transform.RecordJsonUnmarshaller.unmarshall(RecordJsonUnmarshaller.java:54)\n\tat com.amazonaws.services.kinesis.model.transform.RecordJsonUnmarshaller.unmarshall(RecordJsonUnmarshaller.java:31)\n\tat com.amazonaws.transform.ListUnmarshaller.unmarshallJsonToList(ListUnmarshaller.java:93)\n\tat com.amazonaws.transform.ListUnmarshaller.unmarshall(ListUnmarshaller.java:43)\n\tat com.amazonaws.services.kinesis.model.transform.GetRecordsResultJsonUnmarshaller.unmarshall(GetRecordsResultJsonUnmarshaller.java:50)\n\tat com.amazonaws.services.kinesis.model.transform.GetRecordsResultJsonUnmarshaller.unmarshall(GetRecordsResultJsonUnmarshaller.java:31)\n\tat com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:106)\n\tat 
com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:42)\n\tat com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:975)\n\tat com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:702)\n\tat com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:461)\n\tat com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:296)\n\tat com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2498)\n\tat com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1133)\n\tat com.amazonaws.services.kinesis.stormspout.KinesisShardGetter.safeGetRecords(KinesisShardGetter.java:175)\n\tat com.amazonaws.services.kinesis.stormspout.KinesisShardGetter.getNext(KinesisShardGetter.java:74)\n\tat com.amazonaws.services.kinesis.stormspout.BufferedGetter.rebuffer(BufferedGetter.java:126)\n\tat com.amazonaws.services.kinesis.stormspout.BufferedGetter.ensureBuffered(BufferedGetter.java:119)\n\tat com.amazonaws.services.kinesis.stormspout.BufferedGetter.getNext(BufferedGetter.java:71)\n\tat com.amazonaws.services.kinesis.stormspout.KinesisSpout.nextTuple(KinesisSpout.java:181)\n\tat backtype.storm.daemon.executor$fn__4654$fn__4669$fn__4698.invoke(executor.clj:565)\n\tat backtype.storm.util$async_loop$fn__458.invoke(util.clj:463)\n\tat clojure.lang.AFn.run(AFn.java:24)\n\tat java.lang.Thread.run(Thread.java:745)\n"
}

java.lang.NoClassDefFoundError: com/amazonaws/util/json/JSONObject

I am getting the following exception. May I know the reason?

java.lang.NoClassDefFoundError: com/amazonaws/util/json/JSONObject
at com.amazonaws.services.kinesis.leases.impl.Lease.toString(Lease.java:229)
at java.lang.String.valueOf(String.java:2849)
at java.lang.StringBuilder.append(StringBuilder.java:128)
at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseIfNotExists(LeaseManager.java:281)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncer.syncShardLeases(ShardSyncer.java:127)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncer.checkAndCreateLeasesForNewShards(ShardSyncer.java:88)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardSyncTask.call(ShardSyncTask.java:68)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.initialize(Worker.java:395)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:330)

"Cannot process events if state is not active" while deactivating

Hi

In our topology we have 1 kinesis-spout and we are observing the following error during topology deactivation:

2016-01-23 13:52:49 c.a.s.k.s.s.z.ZookeeperStateManager [INFO] ZookeeperStateManager[taskIndex=0]Advanced checkpoint for shardId-000000000000 to 49557085486395718085074646145964072477802219383101063170
2016-01-23 13:52:55 b.s.d.executor [INFO] Deactivating spout kinesisSpout:(16)
2016-01-23 13:52:55 c.a.s.k.s.s.z.ZookeeperStateManager [INFO] ZookeeperStateManager[taskIndex=0]Advanced checkpoint for shardId-000000000000 to 49557085486395718085074646148585023654726735571303006210
2016-01-23 13:52:55 o.a.z.ClientCnxn [ERROR] Error while calling watcher 
java.lang.IllegalStateException: Cannot process events if state is not active (a ZK connection is necessary).
        at com.google.common.base.Preconditions.checkState(Preconditions.java:176) ~[stormjar.jar:na]
        at com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperStateManager.process(ZookeeperStateManager.java:268) ~[stormjar.jar:na]
        at com.netflix.curator.framework.imps.NamespaceWatcher.process(NamespaceWatcher.java:38) ~[stormjar.jar:na]
        at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:522) [stormjar.jar:na]
        at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:498) [stormjar.jar:na]
2016-01-23 13:52:55 o.a.z.ZooKeeper [INFO] Session: 0x1518daf83170242 closed
2016-01-23 13:52:55 o.a.z.ClientCnxn [INFO] EventThread shut down

After looking at the source, we noticed that the ZK connection is closed as soon as the topology is deactivated. That means all tuples that are still being processed will be re-processed at the next startup.

We know that this follows "at least once" semantics, but is there any way to avoid closing the ZK connection until the topology stops? With that approach we could accept all incoming ACK events and commit the real status to ZooKeeper.

When I run this I get: Exception in thread "main" java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider

I'm trying to run against an AWS Kinesis stream, and I've included my AWSCredentials.properties file in the same directory as the sample.properties file.

I've examined the packaged jar file, and com/amazonaws/auth/AWSCredentialsProvider is present in it.

The error message is below. Any and all replies are appreciated!

Neha

ns-lang-2.5.jar:/opt/storm/lib/reflectasm-1.07-shaded.jar:/opt/storm/lib/jetty-util-6.1.26.jar:/opt/storm/lib/disruptor-2.10.1.jar:KinesisStormSpout14.jar:/opt/storm/conf:/opt/storm/bin -Dstorm.jar=KinesisStormSpout14.jar SampleTopology sample.properties RemoteMode
Exception in thread "main" java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2570)
at java.lang.Class.getMethod0(Class.java:2813)
at java.lang.Class.getMethod(Class.java:1663)
at sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)
Caused by: java.lang.ClassNotFoundException: com.amazonaws.auth.AWSCredentialsProvider
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 6 more
[vagrant@supervisor1 shared]$ Exception in thread "main" java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider

Number of spouts and shards

Hi
I've used the following configuration; however, the throughput is not as expected.

(Using the code sample from another open issue.)

Kinesis side:
Number of shards: 5
builder.setSpout("kinesis_spout", spout, 10);
conf.setNumWorkers(5);

  1. Can more than one spout task be used against a single shard (number of workers)? YES or NO?
  2. Does having more threads (10 threads for 5 workers) speed up processing?
  3. How can I enforce a Kinesis starting position even if ZooKeeper state exists?

Karthik
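On the first question, one point worth noting (hedged; it is based on how this spout assigns shards to tasks through ZooKeeper): each shard is owned by at most one spout task at a time, so spout tasks beyond the shard count simply sit idle. Under that assumption, a common starting point is to match spout parallelism to the shard count:

```java
// Assumption: one shard is assigned to at most one spout task, so with
// 5 shards a parallelism of 10 leaves roughly half the tasks idle.
int numShards = 5;
builder.setSpout("kinesis_spout", spout, numShards);
conf.setNumWorkers(numShards);
```

Throughput beyond that point usually comes from resharding the stream or parallelizing the downstream bolts, not from adding spout tasks.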

Is this project still active?

From an outside perspective, this project appears to be largely abandoned, considering the number of commits since inception, the lack of recent activity, and the outstanding, unaddressed open issues. For the benefit of those invested in Storm, could we please get a pulse on the project: whether or not it has a future, if there are plans to address "future work", etc.?

I realize and appreciate that perhaps Kinesis Analytics is the guided path, but any indications of commitment would be greatly appreciated!

Storm doesn't read data from Kinesis.

I'm using the sample project to try to read data from Kinesis. Everything looks fine; this is the log I got.

/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/bin/java -agentlib:jdwp=transport=dt_socket,address=127.0.0.1:60249,suspend=y,server=n -Dfile.encoding=UTF-8 -classpath "/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/deploy.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/ext/jaccess.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/javaws.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/jfxswt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/management-agent.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/plugin.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java
/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/lib/ant-javafx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/lib/dt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/lib/javafx-mx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/lib/jconsole.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/lib/packager.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/lib/sa-jdi.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/lib/tools.jar:/Users/ncharass/opensource/kinesis-storm-spout/target/classes:/Users/ncharass/.m2/repository/com/amazonaws/aws-java-sdk/1.7.13/aws-java-sdk-1.7.13.jar:/Users/ncharass/.m2/repository/commons-logging/commons-logging/1.1.1/commons-logging-1.1.1.jar:/Users/ncharass/.m2/repository/org/apache/httpcomponents/httpclient/4.2/httpclient-4.2.jar:/Users/ncharass/.m2/repository/org/apache/httpcomponents/httpcore/4.2/httpcore-4.2.jar:/Users/ncharass/.m2/repository/commons-codec/commons-codec/1.3/commons-codec-1.3.jar:/Users/ncharass/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.1.1/jackson-core-2.1.1.jar:/Users/ncharass/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.1.1/jackson-databind-2.1.1.jar:/Users/ncharass/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.1.1/jackson-annotations-2.1.1.jar:/Users/ncharass/.m2/repository/joda-time/joda-time/2.9.3/joda-time-2.9.3.jar:/Users/ncharass/.m2/repository/org/apache/commons/commons-lang3/3.0/commons-lang3-3.0.jar:/Users/ncharass/.m2/repository/com/google/guava/guava/13.0/guava-13.0.jar:/Users/ncharass/.m2/repository/org/apache/storm/storm-core/0.9.2-incubating/storm-core-0.9.2-incubating.jar:/Users/ncharass/.m2/repository/org/clojure/clojure/1.5.1/clojure-1.5.1.jar:/Users/ncharass/.m2/repository/clj-time/clj-time/0.4.1/clj-time-0.4.1.jar:/Users/ncharass/.m2/repository/compojure/compojure/1.1.3/compojure-1.1.3.jar:/Users/ncharass/.m2/repository/org/clojure/core.
incubator/0.1.0/core.incubator-0.1.0.jar:/Users/ncharass/.m2/repository/org/clojure/tools.macro/0.1.0/tools.macro-0.1.0.jar:/Users/ncharass/.m2/repository/clout/clout/1.0.1/clout-1.0.1.jar:/Users/ncharass/.m2/repository/ring/ring-core/1.1.5/ring-core-1.1.5.jar:/Users/ncharass/.m2/repository/commons-fileupload/commons-fileupload/1.2.1/commons-fileupload-1.2.1.jar:/Users/ncharass/.m2/repository/javax/servlet/servlet-api/2.5/servlet-api-2.5.jar:/Users/ncharass/.m2/repository/hiccup/hiccup/0.3.6/hiccup-0.3.6.jar:/Users/ncharass/.m2/repository/ring/ring-devel/0.3.11/ring-devel-0.3.11.jar:/Users/ncharass/.m2/repository/clj-stacktrace/clj-stacktrace/0.2.2/clj-stacktrace-0.2.2.jar:/Users/ncharass/.m2/repository/ring/ring-jetty-adapter/0.3.11/ring-jetty-adapter-0.3.11.jar:/Users/ncharass/.m2/repository/ring/ring-servlet/0.3.11/ring-servlet-0.3.11.jar:/Users/ncharass/.m2/repository/org/mortbay/jetty/jetty/6.1.26/jetty-6.1.26.jar:/Users/ncharass/.m2/repository/org/mortbay/jetty/servlet-api/2.5-20081211/servlet-api-2.5-20081211.jar:/Users/ncharass/.m2/repository/org/mortbay/jetty/jetty-util/6.1.26/jetty-util-6.1.26.jar:/Users/ncharass/.m2/repository/org/clojure/tools.logging/0.2.3/tools.logging-0.2.3.jar:/Users/ncharass/.m2/repository/org/clojure/math.numeric-tower/0.0.1/math.numeric-tower-0.0.1.jar:/Users/ncharass/.m2/repository/org/clojure/tools.cli/0.2.4/tools.cli-0.2.4.jar:/Users/ncharass/.m2/repository/commons-io/commons-io/2.4/commons-io-2.4.jar:/Users/ncharass/.m2/repository/org/apache/commons/commons-exec/1.1/commons-exec-1.1.jar:/Users/ncharass/.m2/repository/commons-lang/commons-lang/2.5/commons-lang-2.5.jar:/Users/ncharass/.m2/repository/org/apache/curator/curator-framework/2.4.0/curator-framework-2.4.0.jar:/Users/ncharass/.m2/repository/org/apache/curator/curator-client/2.4.0/curator-client-2.4.0.jar:/Users/ncharass/.m2/repository/com/googlecode/json-simple/json-simple/1.1/json-simple-1.1.jar:/Users/ncharass/.m2/repository/com/twitter/carbonite/1.4.0/carbonite-1.4.0
.jar:/Users/ncharass/.m2/repository/com/esotericsoftware/kryo/kryo/2.21/kryo-2.21.jar:/Users/ncharass/.m2/repository/com/esotericsoftware/reflectasm/reflectasm/1.07/reflectasm-1.07-shaded.jar:/Users/ncharass/.m2/repository/org/ow2/asm/asm/4.0/asm-4.0.jar:/Users/ncharass/.m2/repository/com/esotericsoftware/minlog/minlog/1.2/minlog-1.2.jar:/Users/ncharass/.m2/repository/org/objenesis/objenesis/1.2/objenesis-1.2.jar:/Users/ncharass/.m2/repository/com/twitter/chill-java/0.3.5/chill-java-0.3.5.jar:/Users/ncharass/.m2/repository/org/yaml/snakeyaml/1.11/snakeyaml-1.11.jar:/Users/ncharass/.m2/repository/com/googlecode/disruptor/disruptor/2.10.1/disruptor-2.10.1.jar:/Users/ncharass/.m2/repository/org/jgrapht/jgrapht-core/0.9.0/jgrapht-core-0.9.0.jar:/Users/ncharass/.m2/repository/ch/qos/logback/logback-classic/1.0.6/logback-classic-1.0.6.jar:/Users/ncharass/.m2/repository/ch/qos/logback/logback-core/1.0.6/logback-core-1.0.6.jar:/Users/ncharass/.m2/repository/org/slf4j/slf4j-api/1.6.5/slf4j-api-1.6.5.jar:/Users/ncharass/.m2/repository/org/slf4j/log4j-over-slf4j/1.6.6/log4j-over-slf4j-1.6.6.jar:/Users/ncharass/.m2/repository/io/netty/netty/3.6.3.Final/netty-3.6.3.Final.jar:/Users/ncharass/.m2/repository/com/netflix/curator/curator-framework/1.1.3/curator-framework-1.1.3.jar:/Users/ncharass/.m2/repository/com/netflix/curator/curator-client/1.1.3/curator-client-1.1.3.jar:/Users/ncharass/.m2/repository/org/apache/zookeeper/zookeeper/3.4.2/zookeeper-3.4.2.jar:/Users/ncharass/.m2/repository/org/slf4j/slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.jar:/Users/ncharass/.m2/repository/log4j/log4j/1.2.15/log4j-1.2.15.jar:/Users/ncharass/.m2/repository/javax/mail/mail/1.4/mail-1.4.jar:/Users/ncharass/.m2/repository/javax/activation/activation/1.1/activation-1.1.jar:/Users/ncharass/.m2/repository/jline/jline/0.9.94/jline-0.9.94.jar:/Users/ncharass/.m2/repository/org/jboss/netty/netty/3.2.2.Final/netty-3.2.2.Final.jar:/Applications/IntelliJ IDEA 15 CE.app/Contents/lib/idea_rt.jar" SampleTopology
Connected to the target VM, address: '127.0.0.1:60249', transport: 'socket'
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/ncharass/.m2/repository/ch/qos/logback/logback-classic/1.0.6/logback-classic-1.0.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/ncharass/.m2/repository/org/slf4j/slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
209  [main] INFO  SampleTopology - Using topology name SampleTopology
212  [main] INFO  SampleTopology - Using stream name sparrow-ci
212  [main] INFO  SampleTopology - Using initial position LATEST (if a checkpoint is not found).
212  [main] INFO  SampleTopology - Using recordRetryLimit 3
212  [main] INFO  SampleTopology - Using region us-east-1
212  [main] INFO  SampleTopology - Using zookeeper endpoint localhost:2181
212  [main] INFO  SampleTopology - Using zookeeper prefix kinesis_spout
SLF4J: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. 
SLF4J: See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
354  [main] INFO  SampleTopology - Using Kinesis stream: sparrow-ci
377  [main] INFO  SampleTopology - Starting sample storm topology in LocalMode ...
2190 [main] INFO  backtype.storm.zookeeper - Starting inprocess zookeeper at port 2000 and dir /var/folders/8w/mhcwr8bx4js2xwcbh66nmypx3kk1ll/T//cad2c8c6-08bc-4e23-b175-a55b1bac57a3
2323 [main] INFO  backtype.storm.daemon.nimbus - Starting Nimbus with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/var/folders/8w/mhcwr8bx4js2xwcbh66nmypx3kk1ll/T//3d256710-b2f0-4531-90b0-e54253f15998", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", 
"topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" [6700 6701 6702 6703], "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "topology.multilang.serializer" "backtype.storm.multilang.JsonSerializer", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.worker.receiver.thread.count" 1, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", "topology.max.task.parallelism" nil, "storm.messaging.netty.transfer.batch.size" 262144}
2326 [main] INFO  backtype.storm.daemon.nimbus - Using default scheduler
2380 [main] INFO  org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
2487 [main-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
2487 [ConnectionStateManager-0] WARN  org.apache.curator.framework.state.ConnectionStateManager - There are no ConnectionStateListeners registered.
2488 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state update: :connected:none
3518 [main] INFO  org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
3521 [main-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
3521 [ConnectionStateManager-0] WARN  org.apache.curator.framework.state.ConnectionStateManager - There are no ConnectionStateListeners registered.
3540 [main] INFO  org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
3543 [main-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
3543 [ConnectionStateManager-0] WARN  org.apache.curator.framework.state.ConnectionStateManager - There are no ConnectionStateListeners registered.
3543 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state update: :connected:none
3545 [main] INFO  org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
3546 [main] INFO  org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
3547 [main-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
3547 [ConnectionStateManager-0] WARN  org.apache.curator.framework.state.ConnectionStateManager - There are no ConnectionStateListeners registered.
3549 [main-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
3549 [ConnectionStateManager-0] WARN  org.apache.curator.framework.state.ConnectionStateManager - There are no ConnectionStateListeners registered.
3549 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state update: :connected:none
3550 [main] INFO  org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
3553 [main-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
3553 [ConnectionStateManager-0] WARN  org.apache.curator.framework.state.ConnectionStateManager - There are no ConnectionStateListeners registered.
3561 [main] INFO  backtype.storm.daemon.supervisor - Starting Supervisor with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/var/folders/8w/mhcwr8bx4js2xwcbh66nmypx3kk1ll/T//ed2de9de-62f4-4d0a-92a6-ec3583b91a98", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", 
"topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (1024 1025 1026), "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "topology.multilang.serializer" "backtype.storm.multilang.JsonSerializer", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.worker.receiver.thread.count" 1, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", "topology.max.task.parallelism" nil, "storm.messaging.netty.transfer.batch.size" 262144}
3573 [main] INFO  org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
3575 [main-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
3575 [ConnectionStateManager-0] WARN  org.apache.curator.framework.state.ConnectionStateManager - There are no ConnectionStateListeners registered.
3575 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state update: :connected:none
3577 [main] INFO  org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
3579 [main-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
3579 [ConnectionStateManager-0] WARN  org.apache.curator.framework.state.ConnectionStateManager - There are no ConnectionStateListeners registered.
3595 [main] INFO  backtype.storm.daemon.supervisor - Starting supervisor with id f039207b-d4ba-4acd-a7a8-b9072794a406 at host 192.168.1.2
3598 [main] INFO  backtype.storm.daemon.supervisor - Starting Supervisor with conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/var/folders/8w/mhcwr8bx4js2xwcbh66nmypx3kk1ll/T//4d463789-d881-43a6-aa02-8f8a5e6b301f", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", 
"topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (1027 1028 1029), "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "topology.multilang.serializer" "backtype.storm.multilang.JsonSerializer", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.worker.receiver.thread.count" 1, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", "topology.max.task.parallelism" nil, "storm.messaging.netty.transfer.batch.size" 262144}
3600 [main] INFO  org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
3602 [main-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
3602 [ConnectionStateManager-0] WARN  org.apache.curator.framework.state.ConnectionStateManager - There are no ConnectionStateListeners registered.
3602 [main-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state update: :connected:none
3604 [main] INFO  org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
3606 [main-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
3606 [ConnectionStateManager-0] WARN  org.apache.curator.framework.state.ConnectionStateManager - There are no ConnectionStateListeners registered.
3612 [main] INFO  backtype.storm.daemon.supervisor - Starting supervisor with id 11439b69-c60a-4479-b86b-46963888b4ec at host 192.168.1.2
3654 [main] INFO  backtype.storm.daemon.nimbus - Received topology submission for test_spout with conf {"topology.max.task.parallelism" nil, "topology.acker.executors" nil, "topology.kryo.register" nil, "topology.kryo.decorators" (), "topology.name" "test_spout", "storm.id" "test_spout-1-1462854622", "topology.debug" false, "topology.fall.back.on.java.serialization" true}
3669 [main] INFO  backtype.storm.daemon.nimbus - Activating test_spout: test_spout-1-1462854622
3729 [main] INFO  backtype.storm.scheduler.EvenScheduler - Available slots: (["11439b69-c60a-4479-b86b-46963888b4ec" 1027] ["11439b69-c60a-4479-b86b-46963888b4ec" 1028] ["11439b69-c60a-4479-b86b-46963888b4ec" 1029] ["f039207b-d4ba-4acd-a7a8-b9072794a406" 1024] ["f039207b-d4ba-4acd-a7a8-b9072794a406" 1025] ["f039207b-d4ba-4acd-a7a8-b9072794a406" 1026])
3743 [main] INFO  backtype.storm.daemon.nimbus - Setting new assignment for topology id test_spout-1-1462854622: #backtype.storm.daemon.common.Assignment{:master-code-dir "/var/folders/8w/mhcwr8bx4js2xwcbh66nmypx3kk1ll/T//3d256710-b2f0-4531-90b0-e54253f15998/nimbus/stormdist/test_spout-1-1462854622", :node->host {"11439b69-c60a-4479-b86b-46963888b4ec" "192.168.1.2"}, :executor->node+port {[4 4] ["11439b69-c60a-4479-b86b-46963888b4ec" 1027], [3 3] ["11439b69-c60a-4479-b86b-46963888b4ec" 1027], [2 2] ["11439b69-c60a-4479-b86b-46963888b4ec" 1027], [1 1] ["11439b69-c60a-4479-b86b-46963888b4ec" 1027], [5 5] ["11439b69-c60a-4479-b86b-46963888b4ec" 1027]}, :executor->start-time-secs {[5 5] 1462854622, [1 1] 1462854622, [2 2] 1462854622, [3 3] 1462854622, [4 4] 1462854622}}
4609 [Thread-5] INFO  backtype.storm.daemon.supervisor - Downloading code for storm id test_spout-1-1462854622 from /var/folders/8w/mhcwr8bx4js2xwcbh66nmypx3kk1ll/T//3d256710-b2f0-4531-90b0-e54253f15998/nimbus/stormdist/test_spout-1-1462854622
4811 [Thread-5] INFO  backtype.storm.daemon.supervisor - Extracting resources from jar at /Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/lib/ant-javafx.jar to /var/folders/8w/mhcwr8bx4js2xwcbh66nmypx3kk1ll/T//4d463789-d881-43a6-aa02-8f8a5e6b301f/supervisor/stormdist/test_spout-1-1462854622/resources
4825 [Thread-5] INFO  backtype.storm.daemon.supervisor - Finished downloading code for storm id test_spout-1-1462854622 from /var/folders/8w/mhcwr8bx4js2xwcbh66nmypx3kk1ll/T//3d256710-b2f0-4531-90b0-e54253f15998/nimbus/stormdist/test_spout-1-1462854622
4836 [Thread-6] INFO  backtype.storm.daemon.supervisor - Launching worker with assignment #backtype.storm.daemon.supervisor.LocalAssignment{:storm-id "test_spout-1-1462854622", :executors ([4 4] [3 3] [2 2] [1 1] [5 5])} for this supervisor 11439b69-c60a-4479-b86b-46963888b4ec on port 1027 with id 94ff415f-690b-417c-8b1e-4bc4a61c4e69
4837 [Thread-6] INFO  backtype.storm.daemon.worker - Launching worker for test_spout-1-1462854622 on 11439b69-c60a-4479-b86b-46963888b4ec:1027 with id 94ff415f-690b-417c-8b1e-4bc4a61c4e69 and conf {"dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/var/folders/8w/mhcwr8bx4js2xwcbh66nmypx3kk1ll/T//4d463789-d881-43a6-aa02-8f8a5e6b301f", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, 
"topology.error.throttle.interval.secs" 10, "zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (1027 1028 1029), "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "topology.multilang.serializer" "backtype.storm.multilang.JsonSerializer", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.worker.receiver.thread.count" 1, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", 
"topology.max.task.parallelism" nil, "storm.messaging.netty.transfer.batch.size" 262144}
4838 [Thread-6] INFO  org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
4841 [Thread-6-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
4841 [ConnectionStateManager-0] WARN  org.apache.curator.framework.state.ConnectionStateManager - There are no ConnectionStateListeners registered.
4841 [Thread-6-EventThread] INFO  backtype.storm.zookeeper - Zookeeper state update: :connected:none
4843 [Thread-6] INFO  org.apache.curator.framework.imps.CuratorFrameworkImpl - Starting
4845 [Thread-6-EventThread] INFO  org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
4846 [ConnectionStateManager-0] WARN  org.apache.curator.framework.state.ConnectionStateManager - There are no ConnectionStateListeners registered.
5033 [Thread-6] INFO  backtype.storm.daemon.executor - Loading executor kinesis_spout:[2 2]
5041 [Thread-6] INFO  backtype.storm.daemon.executor - Loaded executor tasks kinesis_spout:[2 2]
5053 [Thread-8-kinesis_spout] INFO  backtype.storm.daemon.executor - Opening spout kinesis_spout:(2)
5053 [Thread-6] INFO  backtype.storm.daemon.executor - Finished loading executor kinesis_spout:[2 2]
5060 [Thread-6] INFO  backtype.storm.daemon.executor - Loading executor kinesis_spout:[3 3]
5061 [Thread-6] INFO  backtype.storm.daemon.executor - Loaded executor tasks kinesis_spout:[3 3]
5063 [Thread-10-kinesis_spout] INFO  backtype.storm.daemon.executor - Opening spout kinesis_spout:(3)
5063 [Thread-6] INFO  backtype.storm.daemon.executor - Finished loading executor kinesis_spout:[3 3]
5066 [Thread-8-kinesis_spout] INFO  com.amazonaws.services.kinesis.stormspout.KinesisSpout - KinesisSpout[taskIndex=0] open() called with topoConfig task index 0 for processing stream sparrow-ci
5066 [Thread-10-kinesis_spout] INFO  com.amazonaws.services.kinesis.stormspout.KinesisSpout - KinesisSpout[taskIndex=1] open() called with topoConfig task index 1 for processing stream sparrow-ci
5066 [Thread-10-kinesis_spout] INFO  backtype.storm.daemon.executor - Opened spout kinesis_spout:(3)
5066 [Thread-8-kinesis_spout] INFO  backtype.storm.daemon.executor - Opened spout kinesis_spout:(2)
5067 [Thread-6] INFO  backtype.storm.daemon.executor - Loading executor print_bolt:[4 4]
5068 [Thread-6] INFO  backtype.storm.daemon.executor - Loaded executor tasks print_bolt:[4 4]
5068 [Thread-8-kinesis_spout] INFO  backtype.storm.daemon.executor - Activating spout kinesis_spout:(2)
5068 [Thread-10-kinesis_spout] INFO  backtype.storm.daemon.executor - Activating spout kinesis_spout:(3)
5072 [Thread-6] INFO  backtype.storm.daemon.executor - Finished loading executor print_bolt:[4 4]
5072 [Thread-12-print_bolt] INFO  backtype.storm.daemon.executor - Preparing bolt print_bolt:(4)
5075 [Thread-12-print_bolt] INFO  backtype.storm.daemon.executor - Prepared bolt print_bolt:(4)
5076 [Thread-6] INFO  backtype.storm.daemon.executor - Loading executor print_bolt:[5 5]
5077 [Thread-6] INFO  backtype.storm.daemon.executor - Loaded executor tasks print_bolt:[5 5]
5078 [Thread-6] INFO  backtype.storm.daemon.executor - Finished loading executor print_bolt:[5 5]
5078 [Thread-14-print_bolt] INFO  backtype.storm.daemon.executor - Preparing bolt print_bolt:(5)
5078 [Thread-14-print_bolt] INFO  backtype.storm.daemon.executor - Prepared bolt print_bolt:(5)
5082 [Thread-6] INFO  backtype.storm.daemon.executor - Loading executor __system:[-1 -1]
5082 [Thread-6] INFO  backtype.storm.daemon.executor - Loaded executor tasks __system:[-1 -1]
5084 [Thread-6] INFO  backtype.storm.daemon.executor - Finished loading executor __system:[-1 -1]
5084 [Thread-16-__system] INFO  backtype.storm.daemon.executor - Preparing bolt __system:(-1)
5087 [Thread-8-kinesis_spout] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
5087 [Thread-10-kinesis_spout] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl - Starting
5087 [Thread-16-__system] INFO  backtype.storm.daemon.executor - Prepared bolt __system:(-1)
5088 [Thread-6] INFO  backtype.storm.daemon.executor - Loading executor __acker:[1 1]
5089 [Thread-6] INFO  backtype.storm.daemon.executor - Loaded executor tasks __acker:[1 1]
5091 [Thread-6] INFO  backtype.storm.daemon.executor - Timeouts disabled for executor __acker:[1 1]
5091 [Thread-18-__acker] INFO  backtype.storm.daemon.executor - Preparing bolt __acker:(1)
5091 [Thread-6] INFO  backtype.storm.daemon.executor - Finished loading executor __acker:[1 1]
5091 [Thread-6] INFO  backtype.storm.daemon.worker - Launching receive-thread for 11439b69-c60a-4479-b86b-46963888b4ec:1027
5092 [Thread-18-__acker] INFO  backtype.storm.daemon.executor - Prepared bolt __acker:(1)
5096 [Thread-19-worker-receiver-thread-0] INFO  backtype.storm.messaging.loader - Starting receive-thread: [stormId: test_spout-1-1462854622, port: 1027, thread-id: 0 ]
5101 [Thread-6] INFO  backtype.storm.daemon.worker - Worker has topology config {"storm.id" "test_spout-1-1462854622", "dev.zookeeper.path" "/tmp/dev-storm-zookeeper", "topology.tick.tuple.freq.secs" nil, "topology.builtin.metrics.bucket.size.secs" 60, "topology.fall.back.on.java.serialization" true, "topology.max.error.report.per.interval" 5, "zmq.linger.millis" 0, "topology.skip.missing.kryo.registrations" true, "storm.messaging.netty.client_worker_threads" 1, "ui.childopts" "-Xmx768m", "storm.zookeeper.session.timeout" 20000, "nimbus.reassign" true, "topology.trident.batch.emit.interval.millis" 50, "storm.messaging.netty.flush.check.interval.ms" 10, "nimbus.monitor.freq.secs" 10, "logviewer.childopts" "-Xmx128m", "java.library.path" "/usr/local/lib:/opt/local/lib:/usr/lib", "topology.executor.send.buffer.size" 1024, "storm.local.dir" "/var/folders/8w/mhcwr8bx4js2xwcbh66nmypx3kk1ll/T//4d463789-d881-43a6-aa02-8f8a5e6b301f", "storm.messaging.netty.buffer_size" 5242880, "supervisor.worker.start.timeout.secs" 120, "topology.enable.message.timeouts" true, "nimbus.cleanup.inbox.freq.secs" 600, "nimbus.inbox.jar.expiration.secs" 3600, "drpc.worker.threads" 64, "topology.worker.shared.thread.pool.size" 4, "nimbus.host" "localhost", "storm.messaging.netty.min_wait_ms" 100, "storm.zookeeper.port" 2000, "transactional.zookeeper.port" nil, "topology.executor.receive.buffer.size" 1024, "transactional.zookeeper.servers" nil, "storm.zookeeper.root" "/storm", "storm.zookeeper.retry.intervalceiling.millis" 30000, "supervisor.enable" true, "storm.messaging.netty.server_worker_threads" 1, "storm.zookeeper.servers" ["localhost"], "transactional.zookeeper.root" "/transactional", "topology.acker.executors" nil, "topology.kryo.decorators" (), "topology.name" "test_spout", "topology.transfer.buffer.size" 1024, "topology.worker.childopts" nil, "drpc.queue.size" 128, "worker.childopts" "-Xmx768m", "supervisor.heartbeat.frequency.secs" 5, "topology.error.throttle.interval.secs" 10, 
"zmq.hwm" 0, "drpc.port" 3772, "supervisor.monitor.frequency.secs" 3, "drpc.childopts" "-Xmx768m", "topology.receiver.buffer.size" 8, "task.heartbeat.frequency.secs" 3, "topology.tasks" nil, "storm.messaging.netty.max_retries" 30, "topology.spout.wait.strategy" "backtype.storm.spout.SleepSpoutWaitStrategy", "nimbus.thrift.max_buffer_size" 1048576, "topology.max.spout.pending" nil, "storm.zookeeper.retry.interval" 1000, "topology.sleep.spout.wait.strategy.time.ms" 1, "nimbus.topology.validator" "backtype.storm.nimbus.DefaultTopologyValidator", "supervisor.slots.ports" (1027 1028 1029), "topology.debug" false, "nimbus.task.launch.secs" 120, "nimbus.supervisor.timeout.secs" 60, "topology.kryo.register" nil, "topology.message.timeout.secs" 30, "task.refresh.poll.secs" 10, "topology.workers" 1, "supervisor.childopts" "-Xmx256m", "nimbus.thrift.port" 6627, "topology.stats.sample.rate" 0.05, "worker.heartbeat.frequency.secs" 1, "topology.tuple.serializer" "backtype.storm.serialization.types.ListDelegateSerializer", "topology.disruptor.wait.strategy" "com.lmax.disruptor.BlockingWaitStrategy", "topology.multilang.serializer" "backtype.storm.multilang.JsonSerializer", "nimbus.task.timeout.secs" 30, "storm.zookeeper.connection.timeout" 15000, "topology.kryo.factory" "backtype.storm.serialization.DefaultKryoFactory", "drpc.invocations.port" 3773, "logviewer.port" 8000, "zmq.threads" 1, "storm.zookeeper.retry.times" 5, "topology.worker.receiver.thread.count" 1, "storm.thrift.transport" "backtype.storm.security.auth.SimpleTransportPlugin", "topology.state.synchronization.timeout.secs" 60, "supervisor.worker.timeout.secs" 30, "nimbus.file.copy.expiration.secs" 600, "storm.messaging.transport" "backtype.storm.messaging.netty.Context", "logviewer.appender.name" "A1", "storm.messaging.netty.max_wait_ms" 1000, "drpc.request.timeout.secs" 600, "storm.local.mode.zmq" false, "ui.port" 8080, "nimbus.childopts" "-Xmx1024m", "storm.cluster.mode" "local", "topology.max.task.parallelism" 
nil, "storm.messaging.netty.transfer.batch.size" 262144}
5101 [Thread-6] INFO  backtype.storm.daemon.worker - Worker 94ff415f-690b-417c-8b1e-4bc4a61c4e69 for storm test_spout-1-1462854622 on 11439b69-c60a-4479-b86b-46963888b4ec:1027 has finished loading
5459 [Thread-8-kinesis_spout] INFO  com.amazonaws.services.kinesis.stormspout.KinesisHelper - Using us-east-1 region
5459 [Thread-10-kinesis_spout] INFO  com.amazonaws.services.kinesis.stormspout.KinesisHelper - Using us-east-1 region

However, the spout does not appear to receive any data from the stream.

Here's the code I use to generate data in Node.js:

'use strict';

const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis({
  region: 'us-east-1'
});

module.exports = {

  add: () => {
    let params = {
      Data: 'test',
      PartitionKey: 'partitionKey'
    };

    let records = [];
    for(let i = 0; i < 300; i++) {
      records.push(params);
    }

    let payload = {
      Records: records,
      StreamName: 'sparrow-ci'
    };

    kinesis.putRecords(payload, (err, data) => {
      if (err) {
        console.log(err, err.stack);
        return;
      }
      // A non-zero FailedRecordCount in the response means some records
      // were rejected and should be retried.
      console.log(data);
    });
  }

};

I can see the data arriving in Kinesis by looking at CloudWatch, but the spout does not appear to read any of it.

Update Apache Storm version

The Apache Storm version used in the code is 0.9.2.
Apache Storm is now at 1.x, which renamed a number of packages; for example, backtype.storm.topology no longer exists.
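For reference, the 1.x rename is largely mechanical: backtype.storm.* became org.apache.storm.* (and storm.trident.* became org.apache.storm.trident.*, as far as I can tell). A throwaway sketch of the prefix rewrite for import lines - the class and method names here are made up for illustration:

```java
// Illustrative only: the Storm 1.x package rename expressed as a prefix rewrite.
public class StormPackageRename {

    // Rewrites a pre-1.0 Storm import line to its 1.x equivalent.
    public static String rewrite(String importLine) {
        return importLine.replace("backtype.storm.", "org.apache.storm.");
    }

    public static void main(String[] args) {
        System.out.println(rewrite("import backtype.storm.topology.TopologyBuilder;"));
        // -> import org.apache.storm.topology.TopologyBuilder;
    }
}
```

Non-Storm imports are left untouched, so the rewrite can be applied blindly across a source tree.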

Support kinesis stream resharding

The README mentions future work to support resharding (close, merge, split) events. I'm curious whether any work has been done toward this goal, or whether such work is currently planned. I may also be in a position to develop this feature in the near future, so if there are no current plans, any insights would be welcome.

Checkpointing and LATEST position.

I'm seeing behavior that confuses me. In both cases below I've set the initial position in the shard to LATEST, i.e. my spout config uses:

withInitialPositionInStream(          
    com.amazonaws.services.kinesis.stormspout.InitialPositionInStream.LATEST
);

Case 1: run my topology locally using the Zookeeper instance created by LocalCluster - I get the latest records.

Case 2: run my topology locally using a Zookeeper instance I've set up separately - I get records that are 10 hours old.

I'd expect that if I choose the LATEST position in the shard, I shouldn't be getting such old data. Is there something I'm missing?
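One likely explanation (not confirmed from the spout source, just how I understand its checkpointing): the spout stores per-shard sequence numbers in Zookeeper, and on startup a stored checkpoint takes precedence over the configured initial position, so LATEST only applies to shards with no prior state. LocalCluster's embedded Zookeeper dies with the process, while an external Zookeeper keeps the old checkpoints across runs. A sketch of that precedence, with illustrative names (this is not the spout's actual code):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: a stored Zookeeper checkpoint wins over the configured initial
// position, so LATEST is only used for shards with no prior state.
public class ShardStartPosition {

    public static String startFor(String shardId,
                                  Map<String, String> zkCheckpoints,
                                  String initialPosition) {
        String checkpoint = zkCheckpoints.get(shardId);
        // Resume from the checkpointed sequence number if one exists.
        return (checkpoint != null) ? "AFTER_SEQUENCE_NUMBER:" + checkpoint
                                    : initialPosition;
    }

    public static void main(String[] args) {
        Map<String, String> state = new HashMap<>();
        state.put("shardId-000000000000", "49545");  // stale checkpoint from a previous run
        System.out.println(startFor("shardId-000000000000", state, "LATEST"));
        System.out.println(startFor("shardId-000000000001", state, "LATEST"));
    }
}
```

If that is the cause, clearing (or changing) the spout's Zookeeper prefix path before redeploying should make LATEST take effect again.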

No Leader Found Issue

13:34:46.087 [main] INFO o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -6903501841317391034:-8252491722746197288
13:34:46.161 [main] WARN o.a.s.v.ConfigValidation - storm.messaging.netty.max_retries is a deprecated config please see class org.apache.storm.Config.STORM_MESSAGING_NETTY_MAX_RETRIES for more information.
13:34:46.192 [main] WARN o.a.s.u.StormBoundedExponentialBackoffRetry - WILL SLEEP FOR 2001ms (NOT MAX)
13:34:48.193 [main] WARN o.a.s.u.StormBoundedExponentialBackoffRetry - WILL SLEEP FOR 2003ms (NOT MAX)
13:34:50.197 [main] WARN o.a.s.u.StormBoundedExponentialBackoffRetry - WILL SLEEP FOR 2007ms (NOT MAX)
13:34:52.205 [main] WARN o.a.s.u.StormBoundedExponentialBackoffRetry - WILL SLEEP FOR 2015ms (NOT MAX)
13:34:54.221 [main] WARN o.a.s.u.StormBoundedExponentialBackoffRetry - WILL SLEEP FOR 2029ms (NOT MAX)
Exception in thread "main" org.apache.storm.utils.NimbusLeaderNotFoundException: Could not find leader nimbus from seed hosts [localhost]. Did you specify a valid list of nimbus hosts for config nimbus.seeds?
at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(NimbusClient.java:141)
at org.apache.storm.utils.NimbusClient.getConfiguredClient(NimbusClient.java:83)
at org.apache.storm.blobstore.NimbusBlobStore.prepare(NimbusBlobStore.java:268)
at org.apache.storm.StormSubmitter.getListOfKeysFromBlobStore(StormSubmitter.java:599)
at org.apache.storm.StormSubmitter.validateConfs(StormSubmitter.java:565)
at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:211)
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:391)
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:163)
at org.apache.storm.kinesis.spout.test.KinesisSpoutTopology.main(KinesisSpoutTopology.java:69)
13:34:56.251 [main] WARN o.a.s.u.NimbusClient - Ignoring exception while trying to get leader nimbus info from localhost. will retry with a different seed host.
java.lang.RuntimeException: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused (Connection refused)
at org.apache.storm.security.auth.ThriftClient.reconnect(ThriftClient.java:110) ~[storm-client-2.0.0-20170519.203612-2.jar:2.0.0-SNAPSHOT]
at org.apache.storm.security.auth.ThriftClient.(ThriftClient.java:70) ~[storm-client-2.0.0-20170519.203612-2.jar:2.0.0-SNAPSHOT]
at org.apache.storm.utils.NimbusClient.(NimbusClient.java:158) ~[storm-client-2.0.0-20170519.203612-2.jar:2.0.0-SNAPSHOT]
at org.apache.storm.utils.NimbusClient.getConfiguredClientAs(NimbusClient.java:113) [storm-client-2.0.0-20170519.203612-2.jar:2.0.0-SNAPSHOT]
at org.apache.storm.utils.NimbusClient.getConfiguredClient(NimbusClient.java:83) [storm-client-2.0.0-20170519.203612-2.jar:2.0.0-SNAPSHOT]
at org.apache.storm.blobstore.NimbusBlobStore.prepare(NimbusBlobStore.java:268) [storm-client-2.0.0-20170519.203612-2.jar:2.0.0-SNAPSHOT]
at org.apache.storm.StormSubmitter.getListOfKeysFromBlobStore(StormSubmitter.java:599) [storm-client-2.0.0-20170519.203612-2.jar:2.0.0-SNAPSHOT]
at org.apache.storm.StormSubmitter.validateConfs(StormSubmitter.java:565) [storm-client-2.0.0-20170519.203612-2.jar:2.0.0-SNAPSHOT]
at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:211) [storm-client-2.0.0-20170519.203612-2.jar:2.0.0-SNAPSHOT]
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:391) [storm-client-2.0.0-20170519.203612-2.jar:2.0.0-SNAPSHOT]
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:163) [storm-client-2.0.0-20170519.203612-2.jar:2.0.0-SNAPSHOT]
at org.apache.storm.kinesis.spout.test.KinesisSpoutTopology.main(KinesisSpoutTopology.java:69) [test-classes/:?]
Caused by: java.lang.RuntimeException: org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused (Connection refused)
at org.apache.storm.security.auth.TBackoffConnect.retryNext(TBackoffConnect.java:64) ~[storm-client-2.0.0-20170519.203612-2.jar:2.0.0-SNAPSHOT]
at org.apache.storm.security.auth.TBackoffConnect.doConnectWithRetry(TBackoffConnect.java:56) ~[storm-client-2.0.0-20170519.203612-2.jar:2.0.0-SNAPSHOT]
at org.apache.storm.security.auth.ThriftClient.reconnect(ThriftClient.java:102) ~[storm-client-2.0.0-20170519.203612-2.jar:2.0.0-SNAPSHOT]
... 11 more
Caused by: org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused (Connection refused)
at org.apache.thrift.transport.TSocket.open(TSocket.java:226) ~[libthrift-0.9.3.jar:0.9.3]
at org.apache.thrift.transport.TFramedTransport.open(TFramedTransport.java:81) ~[libthrift-0.9.3.jar:0.9.3]
at org.apache.storm.security.auth.SimpleTransportPlugin.connect(SimpleTransportPlugin.java:105) ~[storm-client-2.0.0-20170519.203612-2.jar:2.0.0-SNAPSHOT]
at org.apache.storm.security.auth.TBackoffConnect.doConnectWithRetry(TBackoffConnect.java:53) ~[storm-client-2.0.0-20170519.203612-2.jar:2.0.0-SNAPSHOT]
at org.apache.storm.security.auth.ThriftClient.reconnect(ThriftClient.java:102) ~[storm-client-2.0.0-20170519.203612-2.jar:2.0.0-SNAPSHOT]
... 11 more
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:1.8.0_131]
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[?:1.8.0_131]
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[?:1.8.0_131]
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[?:1.8.0_131]
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[?:1.8.0_131]
at java.net.Socket.connect(Socket.java:589) ~[?:1.8.0_131]
at org.apache.thrift.transport.TSocket.open(TSocket.java:221) ~[libthrift-0.9.3.jar:0.9.3]
at org.apache.thrift.transport.TFramedTransport.open(TFramedTransport.java:81) ~[libthrift-0.9.3.jar:0.9.3]
at org.apache.storm.security.auth.SimpleTransportPlugin.connect(SimpleTransportPlugin.java:105) ~[storm-client-2.0.0-20170519.203612-2.jar:2.0.0-SNAPSHOT]
at org.apache.storm.security.auth.TBackoffConnect.doConnectWithRetry(TBackoffConnect.java:53) ~[storm-client-2.0.0-20170519.203612-2.jar:2.0.0-SNAPSHOT]
at org.apache.storm.security.auth.ThriftClient.reconnect(ThriftClient.java:102) ~[storm-client-2.0.0-20170519.203612-2.jar:2.0.0-SNAPSHOT]
... 11 more

What should I use for zookeeperPrefix value in my sample.properties?

Should I use my value /storm, from my Storm cluster's storm.zookeeper.root setting?

Or do I just make up a value like "kinesis_spout", which isn't an existing directory but just an arbitrary name?

Prefix/path for storing spout state in Zookeeper

zookeeperPrefix = kinesis_spout

Thanks for any replies.
Sam Glover
Austin, Texas

spout slows down

Not sure what I'm doing wrong. I have my topology configured with a parallelism hint of 10 for the spout. I created a stream with 10 shards to test. I've tried having the producer partition the records in the following ways:

UUID - so lots of partitions
% 10 - so that it is distributed evenly among shards
% 30
% 2

No matter what partition key I choose, the data seems to scream into Storm... I'll get ~5-8k records in and then it slows to a halt. When I say halt, I mean only 10s-20s of records trickling through. Then it will burst again for only a couple hundred and slow down again.

I feel like I'm not understanding something. Any help would be appreciated.

Getting the NoNode error for /kinesis_spout

23349 [Thread-18-kinesis_spout-executor[2 2]] ERROR c.a.s.k.s.s.z.ZookeeperStateManager - ZookeeperStateManager[taskIndex=0] something went wrong while initializing Zookeeper shardList. Assuming it is unsafe to continue.
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /kinesis_spout

Not sure where it went wrong. I was trying to run a simple Storm application in Docker on EC2.

OUT OF ORDER Inserts with Kinesis Spout and large batch sizes

We believe we found the root issue. The code issues getRecords calls to Amazon in batches of a configurable size and expects the list of records to be returned in order. For example, let's say we are requesting in batches of 1000 records. Amazon does provide the next 1000 sequence numbers, but the list that is returned is not necessarily sorted. This causes the out-of-order inserts. The solution we have in place is to sort the result that Amazon returns.

Test case to prove the behavior:
https://gist.github.com/geota/ed47ecdead08ab0cab66

Will send a PR soon

"""
Processed 900 with no out of order inserts
Requesting records
java.lang.RuntimeException: OUT OF ORDER INSERT: last seq 49552833805435751671064410149559486681498920731233222658 is AFTER curent seq: 49552833805435751671064410147127127932433940966597984258
at com.amazonaws.services.kinesis.stormspout.KinesisHelperTest.test(KinesisHelperTest.java:48)
"""

when I run this I get error "Exception in thread "main" java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider"

I'm trying to run against an AWS Kinesis stream, and I've included my AWSCredentials.properties file in the same directory as the sample.properties file.

I've examined the packaged jar file, and the class com/amazonaws/auth/AWSCredentialsProvider is present in it.

I'm using Eclipse Java EE IDE for Web Developers to build a runnable jar file, packaging in all dependencies.
Version: Kepler Service Release 2
Build id: 20140224-0627

Below is the error message. Any and all replies are appreciated!

Sam

ns-lang-2.5.jar:/opt/storm/lib/reflectasm-1.07-shaded.jar:/opt/storm/lib/jetty-util-6.1.26.jar:/opt/storm/lib/disruptor-2.10.1.jar:KinesisStormSpout14.jar:/opt/storm/conf:/opt/storm/bin -Dstorm.jar=KinesisStormSpout14.jar SampleTopology sample.properties RemoteMode
Exception in thread "main" java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2570)
at java.lang.Class.getMethod0(Class.java:2813)
at java.lang.Class.getMethod(Class.java:1663)
at sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)
Caused by: java.lang.ClassNotFoundException: com.amazonaws.auth.AWSCredentialsProvider
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 6 more
[vagrant@supervisor1 shared]$ Exception in thread "main" java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider

How to increase the number of actual working spouts?

Hi
I've used the following configuration. However, the throughput is not as expected.

Kinesis side:
Number of shards: 5
Around 500,000 rows in the Kinesis stream.
My topology:
builder.setSpout("kinesis_spout", spout, 10);
BoltDeclarer bolt = builder.setBolt("redis", new KinesisSplitLog(), 50).setNumTasks(200);
bolt.shuffleGrouping("kinesis_spout");

and the configuration:
conf.setNumAckers(0);
conf.setDebug(false);
conf.setNumWorkers(5);
conf.setMaxSpoutPending(5000);

I used TRIM_HORIZON

Result:

  1. Only 3 (sometimes 2) spouts fetch records from Kinesis, though the shard count is 5.
  2. The processing rate is ~2000 records/sec in the very first minute. The rate gradually decreases, and after 2-3 minutes it drops to 200-300 records/sec.

My question is:

  1. How can all 5 spouts fetch records concurrently?
  2. My required rate is 10,000 records/sec. What topology configuration and spout/bolt parallelism would reach this requirement?

Thanks in advance
Sunoyon
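For context: the spout assigns whole shards to spout tasks, so a parallelism hint above the shard count leaves the extra tasks idle, and 5 shards can keep at most 5 spout tasks busy. A plausible round-robin shard-to-task assignment (a simplified sketch, not the spout's actual state-manager code; the method name is hypothetical) looks like this:

```java
import java.util.ArrayList;
import java.util.List;

public class ShardAssignment {
    // Round-robin assignment of shards to spout task indices:
    // task i handles every shard whose position in the shard list
    // is congruent to i modulo the number of tasks. With 5 shards
    // and 10 tasks, half the tasks receive nothing and sit idle.
    public static List<String> shardsForTask(List<String> shards,
                                             int taskIndex, int totalTasks) {
        List<String> assigned = new ArrayList<>();
        for (int i = 0; i < shards.size(); i++) {
            if (i % totalTasks == taskIndex) {
                assigned.add(shards.get(i));
            }
        }
        return assigned;
    }
}
```

Under this model, raising sustained throughput generally means adding shards (and matching the spout parallelism to the shard count) rather than adding more spout tasks.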

java.lang.RuntimeException: java.lang.NullPointerException in InflightRecordTracker

We are seeing periodically these error messages in our logs. Brain dumping this issue here for now, will follow up with more information as we debug this issue.

In our KinesisSpout fail function, we call "Record record = stateManager.getInflightRecord(shardId, seqNum);" which leads to a NullPointerException.

{
    "timestamp": "2015-06-26T05:08:58.820+0000",
    "level": "ERROR",
    "thread": "Thread-91-kinesis-spout",
    "logger": "backtype.storm.util",
    "message": "Async loop died!",
    "context": "default",
    "exception": "java.lang.RuntimeException: java.lang.NullPointerException\n\tat backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)\n\tat backtype.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:87)\n\tat backtype.storm.disruptor$consume_batch.invoke(disruptor.clj:76)\n\tat backtype.storm.daemon.executor$fn__4654$fn__4669$fn__4698.invoke(executor.clj:542)\n\tat backtype.storm.util$async_loop$fn__458.invoke(util.clj:463)\n\tat clojure.lang.AFn.run(AFn.java:24)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: java.lang.NullPointerException: null\n\tat com.amazonaws.services.kinesis.stormspout.state.zookeeper.InflightRecordTracker.getInflightRecord(InflightRecordTracker.java:210)\n\tat com.amazonaws.services.kinesis.stormspout.state.zookeeper.LocalShardState.getInflightRecord(LocalShardState.java:92)\n\tat com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperStateManager.getInflightRecord(ZookeeperStateManager.java:229)\n\tat com.amazonaws.services.kinesis.stormspout.KinesisSpout.fail(KinesisSpout.java:260)\n\tat backtype.storm.daemon.executor$fail_spout_msg.invoke(executor.clj:372)\n\tat backtype.storm.daemon.executor$fn$reify__4657.expire(executor.clj:432)\n\tat backtype.storm.utils.RotatingMap.rotate(RotatingMap.java:73)\n\tat backtype.storm.daemon.executor$fn__4654$tuple_action_fn__4660.invoke(executor.clj:437)\n\tat backtype.storm.daemon.executor$mk_task_receiver$fn__4645.invoke(executor.clj:404)\n\tat backtype.storm.disruptor$clojure_handler$reify__1446.onEvent(disruptor.clj:58)\n\tat backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)\n\t... 6 common frames omitted\n",
    "source_tag": "tail.dat-cer-sup.workerlog",
    "@timestamp": "2015-06-26T05:08:58+00:00"
  }
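The stack trace suggests a record being failed after it has already been acked or pruned from the tracker, so the lookup returns null and a later dereference throws. A defensive pattern can be sketched with a simplified stand-in for InflightRecordTracker (this is not the library's actual code, just an illustration of the null-safe lookup):

```java
import java.util.HashMap;
import java.util.Map;

public class InflightTracker<R> {
    private final Map<String, R> inflight = new HashMap<>();

    // Record a tuple as in flight, keyed by its sequence number.
    public void track(String seqNum, R record) {
        inflight.put(seqNum, record);
    }

    // An ack removes the record from the in-flight set.
    public void ack(String seqNum) {
        inflight.remove(seqNum);
    }

    // Null-safe retry lookup: a fail() arriving after the record was
    // acked (or pruned by a retry limit) simply finds nothing, instead
    // of letting the caller dereference null and kill the async loop.
    public R getInflightRecord(String seqNum) {
        return inflight.get(seqNum); // may be null; caller must check
    }

    public boolean isInflight(String seqNum) {
        return inflight.containsKey(seqNum);
    }
}
```

With this shape, the spout's fail() handler would check isInflight (or a null return) before scheduling a retry.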

Exception in ZookeeperShardState.java getLastCommittedSeqNum(shardId) function

I am getting the following exception when running my topology:

10442 [Thread-14-spout] INFO com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperStateManager - ZookeeperStateManager[taskIndex=0]Activating with shardList [shardId-000000000000, shardId-000000000001, shardId-000000000002, shardId-000000000003, shardId-000000000004, shardId-000000000005, shardId-000000000006, shardId-000000000007, shardId-000000000008, shardId-000000000009, shardId-000000000010, shardId-000000000011, shardId-000000000012, shardId-000000000013, shardId-000000000014, shardId-000000000015, shardId-000000000016, shardId-000000000017, shardId-000000000018, shardId-000000000019]
10443 [Thread-16-spout] INFO com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperStateManager - ZookeeperStateManager[taskIndex=0]Activating with shardList [shardId-000000000000, shardId-000000000001, shardId-000000000002, shardId-000000000003, shardId-000000000004, shardId-000000000005, shardId-000000000006, shardId-000000000007, shardId-000000000008, shardId-000000000009, shardId-000000000010, shardId-000000000011, shardId-000000000012, shardId-000000000013, shardId-000000000014, shardId-000000000015, shardId-000000000016, shardId-000000000017, shardId-000000000018, shardId-000000000019]
10456 [Thread-16-spout] INFO com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperShardState - ShardList already initialized in Zookeeper. Assuming it is valid.
10456 [Thread-14-spout] INFO com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperShardState - ShardList already initialized in Zookeeper. Assuming it is valid.
10594 [Thread-16-spout] INFO com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperStateManager - ZookeeperStateManager[taskIndex=1] Got shardList: [shardId-000000000000, shardId-000000000001, shardId-000000000002, shardId-000000000003, shardId-000000000004, shardId-000000000005, shardId-000000000006, shardId-000000000007, shardId-000000000008, shardId-000000000009, shardId-000000000010, shardId-000000000011, shardId-000000000012, shardId-000000000013, shardId-000000000014, shardId-000000000015, shardId-000000000016, shardId-000000000017, shardId-000000000018, shardId-000000000019]
10604 [Thread-14-spout] INFO com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperStateManager - ZookeeperStateManager[taskIndex=0] Got shardList: [shardId-000000000000, shardId-000000000001, shardId-000000000002, shardId-000000000003, shardId-000000000004, shardId-000000000005, shardId-000000000006, shardId-000000000007, shardId-000000000008, shardId-000000000009, shardId-000000000010, shardId-000000000011, shardId-000000000012, shardId-000000000013, shardId-000000000014, shardId-000000000015, shardId-000000000016, shardId-000000000017, shardId-000000000018, shardId-000000000019]
10606 [Thread-16-spout] INFO com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperShardState - No shard state for shardId-000000000001
10606 [Thread-14-spout] INFO com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperShardState - No shard state for shardId-000000000000
10609 [Thread-16-spout] INFO com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperShardState - No shard state for shardId-000000000003
10609 [Thread-14-spout] INFO com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperShardState - No shard state for shardId-000000000002
10610 [Thread-14-spout] INFO com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperShardState - No shard state for shardId-000000000004
10610 [Thread-16-spout] INFO com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperShardState - No shard state for shardId-000000000005
10611 [Thread-16-spout] INFO com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperShardState - No shard state for shardId-000000000007
10611 [Thread-14-spout] INFO com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperShardState - No shard state for shardId-000000000006
10611 [Thread-14-spout] INFO com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperShardState - No shard state for shardId-000000000008
10611 [Thread-16-spout] INFO com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperShardState - No shard state for shardId-000000000009
10612 [Thread-16-spout] INFO com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperShardState - No shard state for shardId-000000000011
10612 [Thread-14-spout] INFO com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperShardState - No shard state for shardId-000000000010
10613 [Thread-14-spout] INFO com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperShardState - No shard state for shardId-000000000012
10613 [Thread-16-spout] INFO com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperShardState - No shard state for shardId-000000000013
10614 [Thread-16-spout] INFO com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperShardState - No shard state for shardId-000000000015
10614 [Thread-14-spout] INFO com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperShardState - No shard state for shardId-000000000014
10614 [Thread-16-spout] INFO com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperShardState - No shard state for shardId-000000000017
10614 [Thread-14-spout] INFO com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperShardState - No shard state for shardId-000000000016
10615 [Thread-14-spout] INFO com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperShardState - No shard state for shardId-000000000018
10615 [Thread-16-spout] INFO com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperShardState - No shard state for shardId-000000000019

zookeeper connection issue in kinesis spout

I am trying to use the Kinesis spout in my Storm topology, but when I run my topology there is a connection exception in ZookeeperStateManager.java (line 104). This is the sample KinesisSpoutConfig I am using:
topologyName = SampleTopology
streamName = AssociatesRealTimeClicks-NA

Use TRIM_HORIZON to start processing from the oldest available record in the shard (if checkpoint is not present)

initialPositionInStream = LATEST
recordRetryLimit = 3
zookeeperEndpoint = localhost:2181

Prefix/path for storing spout state in Zookeeper

zookeeperPrefix = kinesis_spout
SymmetricKeyMaterialSetName = com.amazon.associates.analytics.test.credentialsodin
redisEndpoint = redis-cluster-endpoint
regionName = us-east-1

Feature request: support for de-aggregation

Hello! I was wondering what your views are on de-aggregation. There doesn't seem to be any support in the current version; is this something you have considered?

Best regards
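For context, KPL-aggregated records carry a 4-byte magic prefix (0xF3 0x89 0x9A 0xC2) followed by a protobuf body and a trailing MD5 checksum, so a spout-side de-aggregation step would first need to detect that prefix. A minimal detection sketch (the magic constant follows the published KPL aggregated-record format; the protobuf unpacking itself is omitted):

```java
public class KplAggregation {
    // Magic bytes that prefix every KPL-aggregated Kinesis record,
    // per the KPL aggregated-record format specification.
    private static final byte[] MAGIC = {
        (byte) 0xF3, (byte) 0x89, (byte) 0x9A, (byte) 0xC2
    };

    // Returns true when the record payload starts with the KPL
    // aggregation magic, i.e. it must be de-aggregated before use.
    public static boolean isAggregated(byte[] data) {
        if (data == null || data.length < MAGIC.length) {
            return false;
        }
        for (int i = 0; i < MAGIC.length; i++) {
            if (data[i] != MAGIC[i]) {
                return false;
            }
        }
        return true;
    }
}
```

Rather than re-implementing the unpacking, the spout could likely delegate to the de-aggregation helpers that ship with the Kinesis Client Library once a record tests positive.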

KinesisSpout throwing IllegalArgumentException

I'm encountering an exception when I submit the SampleTopology with a KinesisSpout to a cluster.

com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Can not set java.util.List field com.amazonaws.auth.AWSCredentialsProviderChain.credentialsProviders to KinesisStormClickstreamApp.CustomCredentialsProviderChain
Serialization trace:
credentialsProviders (KinesisStormClickstreamApp.CustomCredentialsProviderChain)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at com.amazonaws.services.kinesis.stormspout.SerializationHelper.kryoDeserializeObject(SerializationHelper.java:74)
at com.amazonaws.services.kinesis.stormspout.KinesisHelper.getKinesisCredsProvider(KinesisHelper.java:123)
at com.amazonaws.services.kinesis.stormspout.KinesisHelper.makeNewKinesisClient(KinesisHelper.java:108)
at com.amazonaws.services.kinesis.stormspout.KinesisHelper.getSharedkinesisClient(KinesisHelper.java:116)
at com.amazonaws.services.kinesis.stormspout.KinesisHelper.getShardList(KinesisHelper.java:85)
at com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperStateManager.activate(ZookeeperStateManager.java:109)
at com.amazonaws.services.kinesis.stormspout.KinesisSpout.activate(KinesisSpout.java:125)
at backtype.storm.daemon.executor$fn__6579$fn__6594$fn__6623.invoke(executor.clj:563)
at backtype.storm.util$async_loop$fn__459.invoke(util.clj:463)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Can not set java.util.List field com.amazonaws.auth.AWSCredentialsProviderChain.credentialsProviders to KinesisStormClickstreamApp.CustomCredentialsProviderChain
at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:164)
at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:168)
at sun.reflect.UnsafeObjectFieldAccessorImpl.set(UnsafeObjectFieldAccessorImpl.java:81)
at java.lang.reflect.Field.set(Field.java:741)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:619)

The sample is here.
https://github.com/awslabs/aws-big-data-blog/tree/master/aws-blog-kinesis-storm-clickstream-app
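The IllegalArgumentException comes from Kryo's FieldSerializer reflectively assigning the deserialized object into the private credentialsProviders field of AWSCredentialsProviderChain; Field.set rejects the assignment when the value (or the target, e.g. after a worker-side classloader mismatch) is incompatible with the declared field. The underlying reflection behavior can be reproduced without Kryo or the AWS SDK (the Chain type below is a stand-in, not the SDK class):

```java
import java.lang.reflect.Field;
import java.util.List;

public class FieldMismatchDemo {
    // Stand-in for AWSCredentialsProviderChain's private List field.
    static class Chain {
        private List<String> credentialsProviders;
    }

    // Reflectively sets the List field, mirroring what Kryo's
    // FieldSerializer does during deserialization. Returns false when
    // Field.set rejects the value with IllegalArgumentException,
    // which is the failure mode seen in the Kryo trace above.
    public static boolean setFieldTo(Object target, Object value) {
        try {
            Field f = Chain.class.getDeclaredField("credentialsProviders");
            f.setAccessible(true);
            f.set(target, value); // throws IllegalArgumentException on type mismatch
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }
}
```

In practice this points at keeping the custom credentials provider chain (and its Kryo registration) consistent across all worker classpaths, rather than at a bug in the field itself.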
