
amazon-kinesis-producer's Introduction

Kinesis Producer Library

Introduction

The Amazon Kinesis Producer Library (KPL) performs many tasks common to creating efficient and reliable producers for Amazon Kinesis. By using the KPL, customers do not need to develop the same logic every time they create a new application for data ingestion.

For detailed information and installation instructions, see the article Developing Producer Applications for Amazon Kinesis Using the Amazon Kinesis Producer Library in the Amazon Kinesis Developer Guide.

Back-pressure

Please see this blog post for details about writing efficient and reliable producers with the KPL. It covers the overhead the KPL adds in various usage scenarios, including back-pressure considerations.

The KPL can consume enough memory to crash itself if it is pushed too many records without time to process them. As a protection against this, we ask that every customer implement back-pressure to protect the KPL process. Once the KPL accumulates too many records in its buffer, it spends most of its CPU cycles on record management rather than record processing, making the problem worse. Where this threshold lies depends heavily on your record sizes, request rates, configuration, and host CPU and memory limits.

When deciding the limits for your KPL instance, consider your maximum record size, maximum request-rate spikes, host memory availability, and record TTL. If you buffer requests before handing them to the KPL, take that into account as well, since it also puts memory pressure on the host. If the KPL buffer grows too large, the process may be forcibly killed due to memory exhaustion.

Sample Back-pressure implementation:

ClickEvent event = inputQueue.take();
String partitionKey = event.getSessionId();
String payload = event.getPayload();
ByteBuffer data = ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8));

// Back-pressure: wait for the KPL to drain below the limit before handing it
// another record, so its internal buffer cannot grow without bound.
while (kpl.getOutstandingRecordsCount() > MAX_RECORDS_IN_FLIGHT) {
    Thread.sleep(SLEEP_BACKOFF_IN_MS);
}
recordsPut.getAndIncrement();

ListenableFuture<UserRecordResult> f =
        kpl.addUserRecord(STREAM_NAME, partitionKey, data);
Futures.addCallback(f, new FutureCallback<UserRecordResult>() {
    ...
    ...

The sample above is provided as an example implementation only. Please take your own application and use cases into consideration before applying this logic.

Recommended Upgrade for All Users of 0.15.0 - 0.15.6 Amazon Kinesis Producer

⚠️ Users of versions 0.15.0 - 0.15.6 of the Amazon Kinesis Producer are strongly encouraged to upgrade to version 0.15.7. A bug identified in versions 0.15.0 - 0.15.6 causes a memory leak.

ℹ️ Amazon Kinesis Producer versions prior to 0.15.0 are not impacted.

Recommended Settings for Streams larger than 800 shards

The KPL is an application for ingesting data into your Kinesis Data Streams. As your streams grow, you may need to tune the KPL to accommodate the growing needs of your applications. Without optimized configuration, your KPL processes will see inefficient CPU usage and delays in writing records into KDS. For streams larger than 800 shards, we recommend the following settings:

  • ThreadingModel= “POOLED”
  • MetricsGranularity= “stream”
  • ThreadPoolSize=128

We recommend performing sufficient testing before applying these changes to production, as every customer has different usage patterns.
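In the Java wrapper, these settings map onto KinesisProducerConfiguration options. A minimal sketch, assuming the fluent setters available in recent KPL versions (setThreadingModel, setMetricsGranularity, setThreadPoolSize); the values shown are the recommendations above, not universal defaults:

import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration.ThreadingModel;

KinesisProducerConfiguration config = new KinesisProducerConfiguration()
        .setThreadingModel(ThreadingModel.POOLED)   // shared thread pool instead of a thread per request
        .setMetricsGranularity("stream")            // emit CloudWatch metrics per stream rather than per shard
        .setThreadPoolSize(128);                    // pool size used when the threading model is POOLED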

Required KPL Update – v0.15.0

KPL 0.15.0 now incorporates StreamARN in the Kinesis requests, such as PutRecords and ListShards, to take advantage of Kinesis Data Streams (KDS) enhanced availability as the result of service cellularization. Version 0.15.0 adds STS as the new dependency; by using STS, customers can benefit from StreamARN without modifying any code.

Required KPL Update – v0.14.0

KPL 0.14.0 now uses the ListShards API, making it easier for your Kinesis producer applications to scale. Kinesis Data Streams (KDS) enables you to scale your stream capacity without any changes to producers and consumers. After a scaling event, producer applications need to discover the new shard map. Version 0.14.0 replaces DescribeStream with the ListShards API for shard discovery. ListShards supports 100 TPS per stream, compared to DescribeStream's 10 TPS per account. For an account with 10 streams, KPL v0.14.0 therefore provides a 100x higher call rate for shard discovery, eliminating the need for a DescribeStream API limit increase when scaling. You can find more information on the ListShards API in the Kinesis Data Streams documentation.

Required Upgrade

Starting on February 9, 2018, Amazon Kinesis Data Streams will begin transitioning to certificates issued by Amazon Trust Services (ATS). To continue using the Kinesis Producer Library (KPL), you must upgrade the KPL to version 0.12.6 or later.

If you have further questions please open a GitHub Issue, or create a case with the AWS Support Center.

This is a restatement of the notice published in the Amazon Kinesis Data Streams Developer Guide.

Release Notes

0.15.10

  • #560 Reverting to remove a bug with using Stream ARN. Please stay tuned for a future release before using Stream ARN.
  • #526 Drop dependency on jaxb for converting binary arrays to hex
  • Update GSR dependency to 1.1.18 - To address a CVE vulnerability

0.15.9

  • #552 Add StreamARN parameter to support CAA
    • The StreamARN parameter can now be used to benefit from cross-account access for KPL requests.

0.15.8

  • #537 Update to latest version of Glue Schema Registry library

0.15.7

  • #498 Fix some memory leak cases in legacy code
    • Upgrade SDK version to avoid s2n_cleanup related memory leak
    • Fix resource cleanup on KPL end to avoid memory leak

0.15.6

  • #490 Updating aws cpp sdk version

0.15.5

  • #482 Remove the stream arn parameter when the next token is present

0.15.4

0.15.3

  • #478 Update AWS SDK CPP version

0.15.2

  • #471 Upgrade Java dependencies

0.15.1

  • #469 Use AWS CodeBuild to compile C++ binary

0.15.0

  • #465
    • Revert the upgrade of jakarta.xml.bind to be backward-compatible with Java8
    • Add more logs to verify that IMDSV2 is used correctly for getting region info for KPL running in EC2 instances
  • #463
    • Use sts to construct stream arn
    • Exit KPL if STS call fails to avoid dual mode
    • Deprecate IMDSv1 calls for obtaining EC2 metadata
  • #444
    • Update bootstrap.sh to work on three platforms

0.14.13

  • #440
    • Upgrade the dependencies used in bootstrap + Java dependencies
    • Correct the log level discrepancy for the warnings

0.14.12

  • #425 Fix build issues in CI
  • #424 Fix build issues in CI
  • #423 Upgrade GSR version to 1.1.9
  • #420 Fix cpp branch
  • #419 Fix aws-cpp branch
  • #418 Fix travis build
  • #416 Configure dependabot
  • #415 Fix travis build
  • #414 Fix travis build

0.14.11

  • #409 Bump protobuf-java from 3.11.4 to 3.16.1 in /java/amazon-kinesis-producer
  • #408 Update curl version from 7.77 to 7.81
  • #395 Configure dependabot
  • #391 Fixing travis build issues
  • #388 Fixing build issues due to stale CA certs

0.14.10

  • #386 Upgraded Glue schema registry from 1.1.1 to 1.1.5
  • #384 Upgraded logback-classic from 1.2.0 to 1.2.6
  • #323 Upgraded junit from 4.12 to 4.13.1

0.14.9

  • #370 Upgraded build script dependencies
    • Upgraded version of openssl from 1.0.1m to 1.0.2u
    • Upgraded version of boost from 1.61 to 1.76
    • Upgraded version of zlib from 1.2.8 to 1.2.11
  • #377 Added an optimization to filter out closed shards.

0.14.8

  • PR #331 Fixed a typo in README.md
  • PR #363 Upgrading hibernate-validator to 6.0.20.Final
  • PR #365 Upgrading logback-classic to 1.2.0
  • PR #367 Upgrading Glue Schema Registry to 1.1.1

0.14.7

  • PR #350 Upgrading Guava to 29.0-jre
  • PR #352 Upgrading Commons IO to 2.7
  • PR #351 Adding support for proxy configurations
  • PR #356 Fixing build issues in Travis CI

0.14.6

  • [PR #341] Updating Java SDK version in KPL to 1.11.960.

0.14.5

  • [PR #339] Fixing KPL not emitting Kinesis PutRecords call context metrics.

0.14.4

  • [PR #334] Add support for building multiple architectures, specifically arm64.
    • This now supports AWS Graviton based instances.
    • Bumped Boost slightly to a version that includes Arm support and added the architecture to the path for kinesis_producer.
  • [PR #335] Fixed logging for the native layer, allowing debug/trace logs to be enabled.

0.14.3

  • [PR #327] Adding support for timeout on user records at Java layer.
    • New optional KPL config parameter userRecordTimeoutInMillis, which can be used to time out records queued for processing at the Java layer.
  • [PR #328] Changing CloudWatch client retry strategy to use default SDK retry strategy with exponential backoff.
  • [PR #324] Adding KPL metric to track the time for oldest user record in processing at the java layer.
  • [PR #318] Fixing bug where KPL goes into a continuous retry storm if the stream is deleted and re-created.

0.14.2

  • [PR #320] Adding support for Glue Schema Registry.
    • Serialize and send schemas along with records, support for compression and auto-registration of schemas.
  • [PR #316] Bumping junit from 4.12 to 4.13.1
  • [PR #312] Adding new parameter in KPL config to allow cert path to be overridden.
  • [PR #310] Fixing bug so the executor service uses 4*num_cores threads.
  • [PR #307] Dependency Upgrade
    • Upgrade Guava to 26.0-jre
    • Update BOOST C++ Libraries link as cert expired on the older link

0.14.1

  • [PR #302] Dependency Upgrade
    • upgrade org.hibernate.validator:hibernate-validator 6.0.2.Final -> 6.0.18.Final
    • upgrade com.google.guava:guava 18.0 -> 24.1.1-jre
  • [PR #300] Fix Travis CI build issues
  • [PR #298] Upgrade google-protobuf to 3.11.4

0.14.0

  • Note: Windows platform will be unsupported going forward for this library.
  • [PR #280] When aggregation is enabled and all the buffer time is consumed for aggregating User records into Kinesis records, allow some additional buffer time for aggregating Kinesis Records into PutRecords calls.
  • [PR #260] Added endpoint for China Ningxia region (cn-northwest-1).
  • [PR #277] Changed mechanism to update the shard map
    • Switched to using ListShards instead of DescribeStream, as this is a more scalable API
    • Reduced the number of unnecessary shard map invalidations
    • Reduced the number of unnecessary update shard map calls
    • Reduced logging noise for aggregated records landing on an unexpected shard
  • [PR #276] Updated AWS SDK from 1.0.5 to 1.7.180
  • [PR #275] Improved the sample code so it can be run without editing.
  • [PR #274] Updated bootstrap.sh to build all dependencies and pack binaries into the jar.
  • [PR #273] Added compile flags to enable compiling aws-sdk-cpp with Gcc7.
  • [PR #229] Fixed bootstrap.sh to download dependent libraries directly from source.
  • [PR #246] [PR #264] Various Typos

0.13.1

  • Including windows binary for Apache 2.0 release.

0.13.0

  • [PR #256] Update KPL to Apache 2.0

0.12.11

Java

  • Bump up the version to 0.12.11.

Older release notes moved to CHANGELOG.md

Supported Platforms and Languages

The KPL is written in C++ and runs as a child process to the main user process. Precompiled native binaries are bundled with the Java release and are managed by the Java wrapper.

The Java package should run without the need to install any additional native libraries on the following operating systems:

  • Linux distributions with glibc 2.9 or later
  • Apple OS X 10.13 and later

Note the release is 64-bit only.

Sample Code

A sample java project is available in java/amazon-kinesis-sample.

Compiling the Native Code

Rather than compiling from source, Java developers are encouraged to use the KPL release in Maven, which includes pre-compiled native binaries for Linux and macOS.

To build the native components and bundle them into the jar, you can run ./bootstrap.sh, which downloads the dependencies, builds them, builds the native binaries, bundles them into the Java resources folder, and then builds the Java packages. This must be done on the platform on which you plan to execute the jars.

Using the Java Wrapper with the Compiled Native Binaries

There are two options. You can either pack the binaries into the jar, as we do for the official release, or you can deploy the native binaries separately and point the Java code at them.

Pointing the Java wrapper at a Custom Binary

The KinesisProducerConfiguration class provides an option setNativeExecutable(String val). You can use this to provide a path to the kinesis_producer[.exe] executable you have built. You have to use backslashes to delimit paths on Windows if giving a string literal.
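For example (a minimal sketch; the binary path shown is hypothetical):

import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;

public class CustomBinaryExample {
    public static void main(String[] args) {
        KinesisProducerConfiguration config = new KinesisProducerConfiguration()
                // Hypothetical path to a kinesis_producer binary built with bootstrap.sh.
                // On Windows, escape the backslashes in the string literal, e.g. "C:\\kpl\\kinesis_producer.exe".
                .setNativeExecutable("/opt/kpl/kinesis_producer");
        KinesisProducer producer = new KinesisProducer(config);
    }
}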


amazon-kinesis-producer's Issues

No implementation of get_session_token for EnvVarAwsCredentialsProvider

Hi everyone,

It looks like there is no implementation of get_session_token for any credential provider other than InstanceProfileAwsCredentialsProvider (see boost::optional<std::string> get_session_token() override;).

This would be really helpful to have for at least EnvVarAwsCredentialsProvider since I use session tokens for local development and it would also allow for the KPL to be used in Lambda since the credentials are passed there via environment variables.

I'd love to do a PR but I can't seem to build on my machine. Curl seems to become unhappy on my Arch laptop, but that's another issue.

curl: symbol lookup error: /usr/lib/libssh2.so.1: undefined symbol: EVP_cast5_cbc

thanks

No support for http proxies

I can't see any mention of http proxying anywhere in the Java bindings or in the underlying C++ code.

What I've gathered from a cursory glance is that the HTTP requests to Kinesis are happening in C++. I definitely don't feel confident enough to roll up my sleeves and crank out a PR to add support for http proxies in there ;)

IKVM of amazon-kinesis-producer-0.10.2.jar error

Hi there,

I am trying to IKVM the above jar file into a dll for use in .Net.
I have tried to convert it and it failed conversion with a long list of warnings that imported classes could not be found.
I am receiving the following error when trying to run the jar:
C:\Users\Andrew\Downloads\ikvmbin-7.2.4630.5\ikvm-7.2.4630.5\bin>ikvm -jar C:\Users\Andrew\Downloads\kinesis_maven_build\amazon-kinesis-producer-0.10.2.jar
Manifest doesn't contain a Main-Class.

My TeamCity Maven build fails as well with the following error:
"Failed to execute goal on project amazon-kinesis-producer: Could not resolve dependencies for project com.amazonaws:amazon-kinesis-producer:jar:0.10.2: The following artifacts could not be resolved: com.google.guava:guava:jar:18.0, commons-io:commons-io:jar:2.4, commons-lang:commons-lang:jar:2.6, commons-logging:commons-logging:jar:1.1.3, org.apache.httpcomponents:httpcore:jar:4.3.3, com.fasterxml.jackson.core:jackson-core:jar:2.5.3, joda-time:joda-time:jar:2.8.1: Could not transfer artifact com.google.guava:guava:jar:18.0 from/to central (http://repo.maven.apache.org/maven2): Connection reset"

When executing the tests.exe file after going through the "Building on Windows with MinGW" process, the tests seem to run fine and nothing errors out.

Kind Regards,
Andrew

Can't access cn-north-1 ?

Currently doesn't support BJS region ?

[error] [http_client.cc:148] Failed to open connection to kinesis.cn-north-1.amazonaws.com:443 : Host not found (authoritative)

Sending metrics to CloudWatch failed when metrics namespace contained a /

I set the metrics namespace to "FD/EventCollector" and added an additional metric dimension named "Environment". With this configuration I kept getting this error message from the KPL:

[2016-02-09 03:53:35.034485] [0x00007f18e54d3740] [error] [metrics_manager.cc:190] Metrics upload failed:
<ErrorResponse xmlns="http://monitoring.amazonaws.com/doc/2010-08-01/">
  <Error>
    <Type>Sender</Type>
    <Code>SignatureDoesNotMatch</Code>
    <Message>The request signature we calculated does not match the signature you provided. Check your AWS Secret Access Key and signing method. Consult the service documentation for details.

The Canonical String for this request should have been
'POST
/doc/2010-08-01/
Action=PutMetricData&amp;MetricData.member.1.Dimensions.member.1.Name=Environment&amp;MetricData.member.1.Dimensions.member.1.Value=...

I removed the / from the namespace and the problem stopped happening.

Operating system:

$ cat /etc/lsb-release
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=12.04
DISTRIB_CODENAME=precise
DISTRIB_DESCRIPTION="Ubuntu 12.04.5 LTS"
$ uname -a
Linux dbvhostweb 3.2.0-97-virtual #137-Ubuntu SMP Thu Dec 17 18:38:14 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux

MultiLangDaemon plans?

Is there a plan to add an MLD similar to the KCL daemon? I need to write a producer in Ruby and am deciding between using the API and the KPL, but the KPL would currently require JRuby :\

Failed to open connection to kinesis

Hi,

I got the following error messages:
[2015-12-12 03:48:30.050520] [0x00007f2aab010700] [error] [http_client.cc:148] Failed to open connection to kinesis.us-west-2.amazonaws.com:443 : Operation canceled
2015-12-12 05:36:39.968935 - error: Failed to open connection to kinesis.us-west-2.amazonaws.com:443 : Operation canceled
2015-12-12 05:36:57.007662 - error: Failed to open connection to kinesis.us-west-2.amazonaws.com:443 : Operation canceled
2015-12-12 05:37:03.335249 - error: Failed to open connection to kinesis.us-west-2.amazonaws.com:443 : Operation canceled

Can someone please advise what causes this error message?

Thanks,
--Gil.

Remove slf4j-simple from the deployed artifact

The maven artifact is deployed with a runtime dependency on the slf4j-simple implementation, which may cause a problem at runtime when the user of the library uses another slf4j implementation such as log4j or logback.

Please change the dependency scope of slf4j-simple to "test" in java/amazon-kinesis-producer/pom.xml

Producer fails to start silently

I had a problem with transitive dependencies requiring different versions of the Jackson library. I had an incorrect POM definition that included the older Jackson version, so when the auth chain called the ProfileCredentialsProvider, the call crashed with a LinkageError because the provider uses a non-existent method in that old version. So far this is expected behavior.

What is not expected is that the producer behaves as if it were working correctly, even when the daemon is not running. Moreover, the library doesn't generate any error logging at all. We had to enable remote debugging to discover that.

The problem is the try & catch in the Daemon constructor. I'm not saying that the constructor should catch the error, since Error classes should bubble up by definition, but there should be a way to inform the client code that the daemon failed to start, using an exception or something, don't you think? I mean, why is the fork happening inside an executor and not in the current thread? Or at least, it should check the future to get the exception if that is the case. Maybe I'm missing something regarding cross-platform compatibility or something like that.

With the current implementation the client code has no way of knowing that things have gone badly wrong, so the app can't act accordingly 👎

Native binary output not routed into SLF4J

Right now, any messages printed by the native binary (including error messages) are sent directly to the console - so they're lost if console output is unavailable (for example, when running via Boxfuse with logs sent to Graylog, as we happen to do). The proper solution would be to read stdout/stderr of the native process and output each line as a log message.

LD_LIBRARY_PATH is mangled for the native process

When launching the native process, LD_LIBRARY_PATH is set to the directory where the native executable is unpacked (or cleared completely, if a custom native executable location is specified). This leads to standard dynamically linked libraries (like libc and libm) being not available to the native process on some platforms (like Boxfuse). Additionally, simply copying (or symlinking) library binaries doesn't completely solve the problem - the executable starts, but then fails immediately with errors about AWS endpoint hosts being not found.

The solution is to either propagate LD_LIBRARY_PATH to the child process, or to add a property to KinesisProducerConfiguration to set the library path explicitly.

Error during socket read: End of file

I am trying to use the jar as a library and I am getting the following exception

E0612 22:21:52.243887 120803328 io_service_socket.h:196] Error during socket read: End of file; 0 bytes read so far

E0612 22:23:36.134788 120803328 io_service_socket.h:169] Error during socket write: Broken pipe; 0 bytes out of 816 written

Environment:

  • JRuby 1.7.12 , 1.9 mode
  • JDK "1.8.0_40-ea"
  • Mac OSX 10.10.3
  • amazon-kinesis-producer-0.9.0.jar
  • protobuf-java-2.6.1.jar
  • guava-18.0.jar
  • commons-lang-2.6.jar
  • commons-compress-1.9.jar
  • commons-io-2.4.jar
  • slf4j-simple-1.7.12.jar
  • slf4j-api-1.7.12.jar

code:

          java_import 'com.amazonaws.kinesis.producer.Configuration'
          java_import 'com.amazonaws.kinesis.producer.KinesisProducer'
          java_import 'java.nio.ByteBuffer'
          java_import 'com.google.common.util.concurrent.FutureCallback'
          java_import 'com.google.common.util.concurrent.Futures'

          kinesis_config = Configuration.new
          kinesis_config.setRegion('us-east-1');
          kinesis_config.setAwsAccessKeyId("xxxx");
          kinesis_config.setAwsSecretKey("xxxx");
          kinesis_config.setMaxConnections(1);
          kinesis_config.setRequestTimeout(60000);
          kinesis_config.setRecordMaxBufferedTime(101);
          @kinesis_producer = KinesisProducer.new(kinesis_config);

          data = {}
          data[:foo] = :bar
          b= ByteBuffer.wrap(data.to_json.to_java_bytes)
          f = @kinesis_producer.addUserRecord("stream_name",event_id.to_s,b)          

KPL sent records out of order

KPL sent records out of order even when maxConnections was set to 1 and no error/retry occurred. I cannot find details of the KPL's ordering guarantees in the documentation. Is there a way to ensure ordering of records sent from the KPL?

Disk-based persistence

Hi,

We're designing a system that will pump data to Kinesis and are concerned about temporary unavailability of the Kinesis service, going over capacity for let's say an hour, or failures of individual shards or network connections to them.

If I understand correctly, Kinesis Producer Library buffers everything in memory. This gives us a very very small buffer (time wise) as we will quickly run out of memory.

We're thinking of building a buffer in front of KPL, but it seems more efficient for this to be done inside the KPL, ideally somewhere between the reducer and the limiter, so at the shard level.

Has this been considered yet? How do most people handle this? Just pass to KPL, let it buffer in memory and cross fingers?

Thanks,
Jaka

//cc @apejcic

InvalidSignatureException for DescribeStream and PutRecords

Hi, I am experiencing an issue on OSX with the Java wrapper. Normally I know InvalidSignatureExceptions are due to bad keys, but I am using the exact same CredentialsProvider that I've specified in the KinesisProducerConfiguration as I am with the AmazonKinesisClient (in the SDK).

I can successfully make a DescribeStream call and a PutRecords call with AmazonKinesisClient but not with this library. I would provide more debugging info such as the outgoing request, but I am unable to determine where that's actually happening (the OSX binary?). I have also double checked my IAM permissions and I have all kinesis operations allowed.

[2015-11-18 16:42:10.787369] [0x0000000104081000] [info] [kinesis_producer.cc:79] Created pipeline for stream "curt_MixedEvent_Stream"
[2015-11-18 16:42:10.787472] [0x0000000104081000] [info] [shard_map.cc:83] Updating shard map for stream "curt_MixedEvent_Stream"
[2015-11-18 16:42:10.820452] [0x00007fff7eea2300] [error] [shard_map.cc:172] Shard map update for stream "curt_MixedEvent_Stream" failed: {"__type":"InvalidSignatureException","message":"The request signature we calculated does not match the signature you provided. Check your AWS Secret Access Key and signing method. Consult the service documentation for details.\n\nThe Canonical String for this request should have been\n'POST\n/\n\nconnection:Keep-Alive\ncontent-length:39\ncontent-type:application/x-amz-json-1.1\nhost:kinesis.us-east-1.amazonaws.com\nuser-agent:KinesisProducerLibrary/0.10.1 | Darwin | 14.5.0 | Darwin Kernel Version 14.5.0: Tue Sep 1 21:23:09 PDT 2015; root:xnu-2782.50.1~1/RELEASE_X86_64 | x86_64\nx-amz-date:20151118T214210Z\nx-amz-target:Kinesis_20131202.DescribeStream\n\nconnection;content-length;content-type;host;user-agent;x-amz-date;x-amz-target\n6b50969c2daf6a0562021da1188d7e4c5a4265580c9a04c35b74d31810ade5bd'\n\nThe String-to-Sign should have been\n'AWS4-HMAC-SHA256\n20151118T214210Z\n20151118/us-east-1/kinesis/aws4_request\n04eacd3ded9c591a5303499feb1412f65cd698eeb4169283e762ce8147ab7e66'\n"}; retrying in 1000 ms
[2015-11-18 16:42:11.012258] [0x0000000104081000] [error] [retrier.cc:59] PutRecords failed: {"__type":"InvalidSignatureException","message":"The request signature we calculated does not match the signature you provided. Check your AWS Secret Access Key and signing method. Consult the service documentation for details.\n\nThe Canonical String for this request should have been\n'POST\n/\n\nconnection:Keep-Alive\ncontent-length:21076\ncontent-type:application/x-amz-json-1.1\nhost:kinesis.us-east-1.amazonaws.com\nuser-agent:KinesisProducerLibrary/0.10.1 | Darwin | 14.5.0 | Darwin Kernel Version 14.5.0: Tue Sep 1 21:23:09 PDT 2015; root:xnu-2782.50.1~1/RELEASE_X86_64 | x86_64\nx-amz-date:20151118T214210Z\nx-amz-target:Kinesis_20131202.PutRecords\n\nconnection;content-length;content-type;host;user-agent;x-amz-date;x-amz-target\n8145baebe2f73d3ba0f2da60ce4db8e2cf8bb53727f4f7236ad8b78acb0ffd6e'\n\nThe String-to-Sign should have been\n'AWS4-HMAC-SHA256\n20151118T214210Z\n20151118/us-east-1/kinesis/aws4_request\n30d1d07bb192ddd60290476c51311a5f023e447770347b5b5780cd96f11b93e8'\n"}

I have confirmed the same issue on another OSX machine. I am using v0.10.1 for amazon-kinesis-producer and v1.10.34 for aws-java-sdk-kinesis.

Unable to trap failures

I am trying to use the jar as a library and I am unable to trap any errors.

I am using the following class

        class KinesisFutureCallback
          include FutureCallback
          def initialize logger
            @logger = logger
          end
          def onFailure(e)
            puts "failure"
            @logger.error "Failed in publishing event to Kinesis"
            @logger.error e
          end

          def onSuccess(r)
            puts "success"
            @logger.info "Published event to kinesis #{r}"
          end
        end
@kinesis_callback = KinesisFutureCallback.new(@logger)
            Futures.addCallback(
                @kinesis_producer.addUserRecord(
                    @kinesis_stream_name,
                    Time.now.to_i.to_s,
                    ByteBuffer.wrap({:foo => :bar}.to_json.to_java_bytes)
                ),
                @kinesis_callback)

I am able to trap success. But even if I try to turn off my internet connection I do not end up in the failure block. The library auto-retries. Also if I send a STOP signal to the application it does not terminate at that point. It keeps retrying.

Environment:

  • JRuby 1.7.12 , 1.9 mode
  • JDK "1.8.0_40-ea"
  • Mac OSX 10.10.3
  • amazon-kinesis-producer-0.9.0.jar
  • protobuf-java-2.6.1.jar
  • guava-18.0.jar
  • commons-lang-2.6.jar
  • commons-compress-1.9.jar
  • commons-io-2.4.jar
  • slf4j-simple-1.7.12.jar
  • slf4j-api-1.7.12.jar

Production goes to that of 1 shard after splitting 25 shards to 75

We recently increased the shards of our stream from 25 to 75. We were getting a great 25 KMPS with two workers. Immediately after the shard split completed, we are producing at only about the rate I'd expect for one shard, about 1 KMPS. I've even tried restarting our worker instances that use the KPL.

Kinesis API signatures should be recalculated when network is unavailable for > 5 minutes.

Ref: AWS Support case 1643879051.

When the network cable is removed from a Java/KPL client which pushes one payload every minute, after more than 5 minutes, the initial payloads raise:

[2016-02-04 14:39:19.910029] [0x000070000028d000] [error] [retrier.cc:59] PutRecords failed:
 {"__type":"InvalidSignatureException","message":
 "Signature expired: 20160204T133419Z is now earlier than 20160204T133420Z (20160204T133920Z - 5 min.)"}

We believe the KPL is trying the same PUT without recalculating the signature information. AWS Support are taking the view that this should be handled by the client (which is in fact the workaround) but I think it's a leaky abstraction - the KPL should do this, since signature handling is not the client's responsibility. Another factor is that the record TTL is set to an hour - I wouldn't expect a failure callback until that hour had elapsed.

Revisions:

  • amazon-kinesis-producer 0.10.2
  • aws-java-sdk-core 1.10.50

amazonaws.com.cn not supported on metrics_manager.h

The domain is amazonaws.com.cn when I use Kinesis in the China (Beijing) region.

The KPL should be configured with the region "cn-north-1". When I only set this region in the KPL configuration, the requests are sent to monitoring.cn-north-1.amazonaws.com and kinesis.cn-north-1.amazonaws.com, which do not exist.

I found that config.setCustomEndpoint could resolve this, but config.setCustomEndpoint sets the endpoint for both Kinesis and CloudWatch at once. When I set the custom endpoint to kinesis.cn-north-1.amazonaws.com.cn, the CloudWatch uploading no longer works.

Can someone resolve this issue? Either support separate endpoints for Kinesis and CloudWatch, or add a new method for the domain setting?
Thanks.

The following is the hardcoded code:

std::chrono::milliseconds upload_frequency = std::chrono::minutes(1),
std::chrono::milliseconds retry_frequency = std::chrono::seconds(10))
: executor_(std::move(executor)),
endpoint_(custom_endpoint.empty()
? "monitoring." + region + ".amazonaws.com"
: custom_endpoint),

Orphaned kinesis_producer native processes go berserk!

I'm not quite sure how to reproduce this, but I've had it happen on my local laptop for testing and also on a few boxes that I deployed a service to that uses KPL.

Anyway, if you somehow managed to kill the parent KPL process and not the native kinesis_producer binary, it sits around indefinitely and seems to go into a busy-wait spin. It starts chewing up basically a whole CPU doing nothing in particular.

I think it should be more robust than that right? Is it possible to detect that the IPC pipe is dead and just exit gracefully or something?

Java Process does not restart Daemon after crash

If the C++ child process dies, the Java process does nothing to restart it. Weird. So there appear to be no recovery options. For others having this problem, the best thing I can suggest is catching the DaemonException and then exiting the Java process so that something can restart it. But it appears you are pretty much dead in the water.

My hope is that the KPL team will have the Java process restart the child process when it crashes (so the java process does not have to be restarted).

KPL retries never seem to work

Until recently we were using the basic Kinesis API to write to our streams. This worked quite well, but occasionally we would lose messages due to instability in AWS Kinesis (usually transient 500 errors when writing to Kinesis)

Rather than writing our own retry logic to deal with this, we decided to use the KPL as recommended in the official AWS Kinesis documentation.

Unfortunately we now still lose messages as we see every retry fail (with opaque error messages). This seems to happen with about the same frequency as we were experiencing problems with the vanilla client library. This leads me to suspect that the KPL retries are simply not working properly.

Here are some examples of the kind of thing we see

2016-07-13 22:21:28.000 (UTC + 10) Message with partition key 81918 failed on attempt 1 with delay 1000 over duration 3339 with error End of file (Exception)
2016-07-13 22:21:28.000 (UTC + 10) Message with partition key 81918 failed on attempt 2 with delay -631325014 over duration 631352083 with error Expired while waiting in HttpClient queue (Exception)
2016-07-13 22:21:28.000 (UTC + 10) Message with partition key 81918 failed on attempt 3 with delay 0 over duration 0 with error Record has reached expiration (Expired)

Notice the negative value for the delay on the second attempt, which doesn't make a lot of sense. This looks like the initial error 'End of File' has broken something that caused all the retries to fail after that.

We encountered a few messages lost like that, and then messages started failing on the first attempt without encountering the 'End of File' error. For example:

2016-07-13 22:21:31.000 (UTC + 10) Message with partition key 81918 failed on attempt 1 with delay -631325016 over duration 631352083 with error Expired while waiting in HttpClient queue (Exception)
2016-07-13 22:21:31.000 (UTC + 10)  Message with partition key 81918 failed on attempt 2 with delay -631352083 over duration 631355084 with error Expired while waiting in HttpClient queue (Exception)
2016-07-13 22:21:31.000  (UTC + 10) Message with partition key 81918 failed on attempt 3 with delay 0 over duration 0 with error Record has reached expiration (Expired)

This keeps happening for a while before eventually, everything starts working again.

I can see a number of other people are encountering similar issues:
#30 shows a similar scenario where an initial error seems to cause subsequent failures with ' Expired while waiting in HttpClient queue' and a negative delay.
#12 also shows ' Expired while waiting in HttpClient queue' and a negative delay errors

These issues have been open for quite some time - Is anyone still supporting this library?

KPL native binaries consume all memory => crash

We updated our java application that sends data to Kinesis in order to use KPL (0.10.0) last week. Our application has crashed 3 times since the update. It crashes because it runs out of memory.

Memory is not exhausted by java application directly. It is consumed by external process that is spawned by KPL. It seems that KPL uses some native library that does not properly manage memory.

Env: Debian 7; Java 8.u51; KPL version 0.10.0

ExpiredTokenException

I have set up the KPL to post to Kinesis using an assumed role, with STSAssumeRoleSessionCredentialsProvider as the credentials provider.

Now, this worked well for some time, but I ended up after a few hours with these errors:

17:28:54.774 [kpl-callback-pool-0-thread-0] WARN  c.s.bui.kinesis.KinesisOutput - Record failed to put, partitionKey=42, attempts:
Delay after prev attempt: 1508 ms, Duration: 4 ms, Code: 400, Message: {"__type":"ExpiredTokenException","message":"The security token included in the request is expired"}
17:28:54.774 [kpl-callback-pool-0-thread-0] ERROR c.s.bui.kinesis.KinesisOutput - Exception while posting to kinesis
com.amazonaws.services.kinesis.producer.UserRecordFailedException: null
    at com.amazonaws.services.kinesis.producer.KinesisProducer$MessageHandler.onPutRecordResult(KinesisProducer.java:188) [amazon-kinesis-producer-0.10.2.jar:na]
    at com.amazonaws.services.kinesis.producer.KinesisProducer$MessageHandler.access$000(KinesisProducer.java:127) [amazon-kinesis-producer-0.10.2.jar:na]
    at com.amazonaws.services.kinesis.producer.KinesisProducer$MessageHandler$1.run(KinesisProducer.java:134) [amazon-kinesis-producer-0.10.2.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_72-internal]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_72-internal]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_72-internal]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_72-internal]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_72-internal]

Does the Java code renew credentials often enough and hand them over to the native daemon?

KinesisProducer.destroy() method seems not to stop all threads

It seems that calling destroy() doesn't kill all threads. I wrote a very small program, and it does not terminate even after calling destroy() on a KinesisProducer instance, even if I never publish messages on it. For now, I'm working around this issue by invoking System.exit() after calling destroy() when shutting down my application.
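A minimal sketch of that shutdown sequence (flushSync() and destroy() are existing KinesisProducer methods; the explicit System.exit() is the workaround described above, not an official fix):

import com.amazonaws.services.kinesis.producer.KinesisProducer;

final class ProducerShutdown {
    static void shutdown(KinesisProducer producer) {
        producer.flushSync();  // block until all buffered records are sent or have expired
        producer.destroy();    // stop the native child process and the Java-side threads
        System.exit(0);        // workaround from this issue: force exit if stray threads keep the JVM alive
    }
}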

Is there a plan to make a windows build of the native libraries?

I do my development on windows (I run ubuntu on the real servers), so I was wondering if there was a plan to make windows binaries and include them in the KPL release in Maven?

I saw some code in the library that extracts the binaries checking for windows, so I was wondering...

Thanks

Occasionally Get a BUNCH of this Opaque Error

Every once in a while I get a bunch of these at once (like hundreds). Unfortunately they are pretty opaque. I don't know what they mean nor what I am supposed to do in response. Is a retry possible in this case? Does a FATAL error mean I need to restart the native lib? So yeah, what does it mean, why did it happen, and what should I do?

java.lang.RuntimeException: EOF reached during read
    at com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:498)
    at com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:480)
    at com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:476)
    at com.amazonaws.services.kinesis.producer.Daemon.readSome(Daemon.java:519)
    at com.amazonaws.services.kinesis.producer.Daemon.receiveMessage(Daemon.java:241)
    at com.amazonaws.services.kinesis.producer.Daemon.access$500(Daemon.java:61) 
    at com.amazonaws.services.kinesis.producer.Daemon$3.run(Daemon.java:296) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_66]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_66]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_66]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_66]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_66]

0.10.0 Introduces "Could not retrieve credentials from anywhere" on any type of credentials provider

I have the simplest case of a producer, one that gets its config either from a StaticCredentialsProvider, or from a ClassPathCredentialsProvider with the AwsCredentials.properties file on the classpath, or from a DefaultCredentialsProvider after specifying:

System.setProperty("aws.secretKey", "XXX"); 
System.setProperty("aws.accessKeyId", "XXXX");

But every time I get the error in the daemon:
Could not retrieve credentials from anywhere

Seems like the args to the process builder are not correctly passed? Here is the stack trace, not that it helps:

[2015-10-12 23:54:26.443458] [0x00007f5cb52dc780] [error] [main.cc:156] Could not retrieve credentials from anywhere.
[pool-2-thread-1] ERROR com.amazonaws.services.kinesis.producer.KinesisProducer - Error in child process
com.amazonaws.services.kinesis.producer.IrrecoverableError: Child process exited with code 1
    at com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:502)
    at com.amazonaws.services.kinesis.producer.Daemon.fatalError(Daemon.java:480)
    at com.amazonaws.services.kinesis.producer.Daemon.startChildProcess(Daemon.java:460)
    at com.amazonaws.services.kinesis.producer.Daemon.access$100(Daemon.java:61)
    at com.amazonaws.services.kinesis.producer.Daemon$1.run(Daemon.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:744) 

Debugging shows that regardless of which credentials provider I specify, it is always handled correctly on the Java side, but then things get messy when the process builder protobufs the last argument to the process for the accessKey and the secretKey. This is always reproducible and happens on 0.10.0 and 0.10.1.

Support for proto3?

Would it be possible to move to proto3?

The reason I ask is that Google is adding native support for Ruby, but only for proto3. This is relevant for people using fluentd to send to Kinesis via the Kinesis output plugin, as fluentd plugins must be implemented in Ruby. The existing plugin works using a Ruby implementation of protobufs, but it suffers from performance problems: awslabs/aws-fluent-plugin-kinesis#66.

Exception caused in unrelated S3 test on 0.10.0 update

Not sure how this is caused by the upgrade to 0.10.0, but I only started seeing this with this update:
java.lang.AbstractMethodError: com.amazonaws.services.s3.internal.S3Signer.sign(Lcom/amazonaws/SignableRequest;Lcom/amazonaws/auth/AWSCredentials;)

'std::runtime_error' what(): EOF reached while reading from ipc channel

Hi,

After several hours of writing records into Kinesis stream we get the following error:

2015-10-22 00:22:27.679701] [0x00007f6ca799f700] [error] [io_service_socket.h:229] Error during socket read: Operation canceled; 0 bytes read so far (kinesis.us-west-2.amazonaws.com:443)
terminate called after throwing an instance of 'std::runtime_error'
what(): EOF reached while reading from ipc channel

We are using logstash plugin: logstash-output-kinesis which runs on top of KPL and this error stops our logstash from running.

I will appreciate any assistance!

Thanks,
--Gil.

KPL sends invalid HTTP requests to EC2 metadata service

Hello,

we've been trying to get the KPL to run using IAM roles and instance profiles instead of hardcoding a key pair. Whenever we try to run this we get the following errors:

[2016-08-29 08:52:50.816171] [0x00007f4f66f4d700] [error] [io_service_socket.h:229] Error during socket read: End of file; 0 bytes read so far (169.254.169.254:80)
[2016-08-29 08:52:50.917202] [0x00007f4f66f4d700] [error] [io_service_socket.h:229] Error during socket read: End of file; 0 bytes read so far (169.254.169.254:80)
[2016-08-29 08:52:51.058269] [0x00007f4f6884f740] [error] [main.cc:156] Could not retrieve credentials from anywhere.

This seems very strange because using wget from the place where the KPL is running to get credentials from the metadata server works fine. Therefore I cranked out tcpdump to figure out what's going on.

Example of the data stream of a working wget-request:

GET /latest/meta-data/iam/security-credentials/flink-worker HTTP/1.1
User-Agent: Wget/1.16 (linux-gnu)
Accept: */*
Host: 169.254.169.254
Connection: Keep-Alive

HTTP/1.1 200 OK
Server: EC2ws
Date: Mon, 29 Aug 2016 08:52:33 GMT
Content-Length: 551
Content-Type: text/plain; charset=utf-8

{"Code":"Success","LastUpdated":"2016-08-29T08:41:55Z","Type":"AWS-HMAC","AccessKeyId":"REDACTED","SecretAccessKey":"REDACTED","Token":"REDACTED"}

(Note that the KPL has to first do a call to retrieve the role name, which we already included).

Now for the fun part. Example of a request sent by the KPL:

GET /latest/meta-data/iam/security-credentials/ HTTP/1.1

HTTP/1.1 400 Bad Request
Content-Type: text/plain
Connection: close

400 Bad Request: missing required Host header

It seems to be sending invalid HTTP requests (the Host header is required). This seemed very strange to us because surely somebody must have run this code against the EC2 metadata server?

We took a look at the C++ source and ended up with this in the HTTP client code:

// http_request.cc
std::string HttpRequest::to_string() {
  generate_preamble();
  return preamble_ + data_;
}

void HttpRequest::generate_preamble() {
  std::stringstream ss;
  ss << method_ << " "
     << path_ << " "
     << http_version_
     << "\r\n";
  for (const auto& h : headers_) {
    ss << h.first << ":" << h.second << "\r\n";
  }
  ss << "\r\n";
  preamble_ = std::move(ss.str());
}

The Host header must be added manually while building up the request. However this never actually happens for the EC2 metadata calls, only for Kinesis itself and some metrics service:

~/s/amazon-kinesis-producer> ag -Q 'req.add_header("Host"'
aws/http/http.cc
57:  req.add_header("Host", "kinesis." + region + ".amazonaws.com");

aws/metrics/metrics_manager.cc
158:  req.add_header("Host", endpoint_);

This seems like a bug and should probably be addressed.

addUserRecord call throws DaemonException

Sometimes calling addUserRecord starts to throw:

com.amazonaws.services.kinesis.producer.DaemonException: The child process has been shutdown and can no longer accept messages.
    at com.amazonaws.services.kinesis.producer.Daemon.add(Daemon.java:171) ~[amazon-kinesis-producer-0.10.2.jar:na]
    at com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:467) ~[amazon-kinesis-producer-0.10.2.jar:na]
    at com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:338) ~[amazon-kinesis-producer-0.10.2.jar:na]

The KPL does not seem to recover from this. All further calls to addUserRecord also fail. Restarting the KPL Java process fixes the situation.

This seems to happen when the Kinesis stream is throttling requests, so my guess is that the native process can't write to the stream quickly enough and runs out of memory. If that's the case, my expectation would be that the native process should start to discard older data, and of course that if the native process dies the KPL recovers to a working state.
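One defensive pattern, sketched below under the assumption that recreating the producer is acceptable for your application (KinesisProducer, KinesisProducerConfiguration and DaemonException are the real classes; the recreate-and-retry logic only illustrates the workaround described above, it is not documented behavior):

import com.amazonaws.services.kinesis.producer.DaemonException;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.util.concurrent.ListenableFuture;
import java.nio.ByteBuffer;

class ResilientPutter {
    private final KinesisProducerConfiguration config;
    private KinesisProducer producer;

    ResilientPutter(KinesisProducerConfiguration config) {
        this.config = config;
        this.producer = new KinesisProducer(config);
    }

    synchronized ListenableFuture<UserRecordResult> put(String stream, String key, ByteBuffer data) {
        try {
            return producer.addUserRecord(stream, key, data);
        } catch (DaemonException e) {
            // The native child process has shut down; replace the producer and retry once.
            producer.destroy();
            producer = new KinesisProducer(config);
            return producer.addUserRecord(stream, key, data);
        }
    }
}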

InterruptedException: sleep interrupted during KPL shutdown

We have noticed that in our CI, and sometimes even in PROD, we see an InterruptedException being logged by the KPL during shutdown:

[pool-6-thread-6] WARN com.amazonaws.services.kinesis.producer.Daemon - Exception during updateCredentials
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at com.amazonaws.services.kinesis.producer.Daemon$5.run(Daemon.java:316)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

As I understand it, this is because updateCredentials sleeps for a number of milliseconds before trying to update credentials, and during shutdown this InterruptedException is raised.
It is not an error, but it does pollute the logs with stack traces, which gets especially noisy in CI.
I have raised a PR to fix the issue by not logging an InterruptedException during the normal shutdown process.
Can we please get some attention on this PR?

Unexpected performance

I'm seeing fewer puts per second than I would expect. I have a 25-shard stream and two stream producers running on m4.xlarge instances. When one instance is running I get about 25K #addUserRecord calls per second with aggregation enabled. Before the KPL I would get about 25K puts per second with about 25M bytes per second throughput.

With KPL I still get 25M bytes per second throughput but I only get about 200 puts per second throughput. I guess I expected this to be a lot higher with aggregation. I figured I'd get >25K puts per second instead of only ~200-300 puts per second.

Is this a documentation error, for example, can I not go any higher than 25M bytes per second with 25 shards?

recordTtl = Long.MAX_VALUE causes records to expire immediately & no standard exception message in UserRecordFailedException

I'm fairly new to Kinesis and any help is greatly appreciated.

I pretty much followed the code example on AWS to the letter, but every single callback fails with a null message. I'm pretty stuck now as I can't find anyone else having similar problems.

I ran the program on both Rackspace Linux and Mac OS X with the same result. I've also tried changing the partition key and the record content without success.

I'm using the latest 0.10.2 and installed using Maven.

Included the full stack trace below:

[2016-06-07 18:28:54.862884] [0x0000700000081000] [info] [kinesis_producer.cc:79] Created pipeline for stream "my-kinesis-stream"
[2016-06-07 18:28:54.863092] [0x0000700000081000] [info] [shard_map.cc:83] Updating shard map for stream "my-kinesis-stream"
[2016-06-07 18:28:54.988505] [0x00007fff7cc3f000] [info] [shard_map.cc:163] Successfully updated shard map for stream "my-kinesis-stream" found 1 shards
18:28:55.895 [kpl-callback-pool-0-thread-0] WARN c.s.kinesis.KinesisRecordProducer - Failed to send record '1465320533000' to Kinesis.
com.amazonaws.services.kinesis.producer.UserRecordFailedException: null
at com.amazonaws.services.kinesis.producer.KinesisProducer$MessageHandler.onPutRecordResult(KinesisProducer.java:188) [amazon-kinesis-producer-0.10.2.jar:na]
at com.amazonaws.services.kinesis.producer.KinesisProducer$MessageHandler.access$000(KinesisProducer.java:127) [amazon-kinesis-producer-0.10.2.jar:na]
at com.amazonaws.services.kinesis.producer.KinesisProducer$MessageHandler$1.run(KinesisProducer.java:134) [amazon-kinesis-producer-0.10.2.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_66]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_66]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_66]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_66]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_66]
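If the overflow hinted at in the issue title is indeed the cause, one hedged workaround is to configure a large but finite TTL rather than Long.MAX_VALUE (setRecordTtl is an existing KinesisProducerConfiguration setter; the one-hour value below is only an illustration, not a recommendation from the maintainers):

import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import java.util.concurrent.TimeUnit;

KinesisProducerConfiguration config = new KinesisProducerConfiguration()
        .setRegion("us-east-1")
        // Use a finite TTL; Long.MAX_VALUE may overflow internal deadline arithmetic.
        .setRecordTtl(TimeUnit.HOURS.toMillis(1));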

Upgrade to "Release 0.10.0" - Instability?

We were running our app using release 0.9.0 smoothly for weeks without issues. After upgrading the KPL to release 0.10.0 last week, our app started to fail ("outofmemory") after some hours of running OK. We noticed high CPU and memory usage on the KPL native process itself. We rolled back to version 0.9.0 and it started working flawlessly again. Is there a way to generate a log file for the KPL library so that we can retrieve more info?

[Java] Make KinesisProducerConfiguration serializable

Hi,
I'm currently working on integrating the Kinesis Producer into Apache Flink.
I found that the KinesisProducerConfiguration is not serializable. A quick look through the code didn't reveal any fields that would not be serializable (the AWSCredentialsProvider is probably the thing that will need some adaptation, but the rest is just strings and numbers).

It would be quite helpful for our implementation if the KinesisProducerConfiguration were serializable, so that users could pass their own producer config into Flink and we could send the config to all the parallel worker instances in the cluster.

Any thoughts?

FileLockInterruptionException when extracting native binaries

I see these exceptions frequently when my code first attempts to instantiate a KinesisProducer instance:

java.lang.RuntimeException: Could not copy native binaries to temp directory /tmp/amazon-kinesis-producer-native-binaries
        at com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:826)
        at com.amazonaws.services.kinesis.producer.KinesisProducer.<init>(KinesisProducer.java:236)
        at com.mypackage.MyKinesisWriter.run(MyKinesisWriter.java:32)
        at com.netflix.hystrix.HystrixCommand$1.call(HystrixCommand.java:294)
        at com.netflix.hystrix.HystrixCommand$1.call(HystrixCommand.java:289)
        at rx.Observable$1.call(Observable.java:145)
        at rx.Observable$1.call(Observable.java:137)
        at rx.Observable$1.call(Observable.java:145)
        at rx.Observable$1.call(Observable.java:137)
        at rx.Observable$1.call(Observable.java:145)
        at rx.Observable$1.call(Observable.java:137)
        at rx.Observable.unsafeSubscribe(Observable.java:7713)
        at com.netflix.hystrix.AbstractCommand$5.call(AbstractCommand.java:534)
        at com.netflix.hystrix.AbstractCommand$5.call(AbstractCommand.java:509)
        at rx.Observable.unsafeSubscribe(Observable.java:7713)
        at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62)
        at com.netflix.hystrix.strategy.concurrency.HystrixContexSchedulerAction$1.call(HystrixContexSchedulerAction.java:56)
        at com.netflix.hystrix.strategy.concurrency.HystrixContexSchedulerAction$1.call(HystrixContexSchedulerAction.java:47)
        at com.nike.trace.hystrix.TraceCallableWrapper.call(TraceCallableWrapper.java:32)
        at com.netflix.hystrix.strategy.concurrency.HystrixContexSchedulerAction.call(HystrixContexSchedulerAction.java:69)
        at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.channels.FileLockInterruptionException
        at sun.nio.ch.FileChannelImpl.lock(FileChannelImpl.java:1071)
        at java.nio.channels.FileChannel.lock(FileChannel.java:1052)
        at com.amazonaws.services.kinesis.producer.KinesisProducer.extractBinaries(KinesisProducer.java:818)
        ... 27 more

I've got a synchronized block around the method from which I'm calling the KinesisProducer constructor in an effort to make it thread-safe, but that doesn't help. I've also tried configuring my KinesisProducerConfiguration instance with a unique pathname to avoid file collisions, but I still get the above exception. I've done some Googling but haven't found any instances of anyone else reporting this error. My code attempts to re-try initialization of the KinesisProducer instance when this exception occurs, but it happens multiple times in a row.

Some other things to point out:

  • The class running this code is run using Hystrix
  • The application that is running this code is run in a cluster, and interestingly, I've noticed that this error will often only affect a single node in the cluster.

Exception during updateCredentials

Is it possible to avoid this error when calling producer.destroy() to end the program? It doesn't seem to cause any issues, but I always see this:
[pool-1-thread-6] WARN com.amazonaws.services.kinesis.producer.Daemon - Exception during updateCredentials
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at com.amazonaws.services.kinesis.producer.Daemon$5.run(Daemon.java:316)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
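The warning comes from the daemon's credential-refresh thread being interrupted by the shutdown, so it is expected noise rather than a real failure. A minimal sketch of a quieter shutdown sequence, assuming all you want is to drain the buffer before exiting:

    // Block until every buffered record has completed, then tear down the child process.
    producer.flushSync();
    producer.destroy();

If the WARN line itself is the only annoyance, raising the log level for com.amazonaws.services.kinesis.producer.Daemon in your logging configuration would also hide it.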

Using producer from C++

Is there any precedent for using this library directly from a C++ program? It seems like the code is fairly tied to being used through IPC rather than in-process. Would you recommend against using it in-process in a C++ application?

Internal Service Error caused subsequent kinesis requests to fail

Hi

We use the Kinesis Producer Library (v0.10.1) to push events to our Kinesis stream. On 10/20/2015 at 22:32 Pacific Time, we saw a steep rise in errors from the Kinesis Producer Library. The errors seem to have been triggered by an "Internal Service Error" on the Kinesis side. We didn't see any service outages on the AWS service health dashboard.
The errors went away only after we restarted our Tomcat server.

We also saw a spike in PutRecord latency at the same time, which seems to have been caused by the errors.
[Image: graph showing the PutRecord latency spike]

Can you please give us insight into why these errors would occur?

Initial error:

Error while logging Kinesis Record - attempts=5, attemptDetails={"errorMessage":"Internal service failure.","duration":5571,"errorCode":"InternalFailure","successful":false,"delay":10013},{"errorMessage":"Internal service failure.","duration":2213,"errorCode":"InternalFailure","successful":false,"delay":4425},{"errorMessage":"Internal service failure.","duration":41,"errorCode":"InternalFailure","successful":false,"delay":5002},{"errorMessage":"Expired while waiting in HttpClient queue","duration":55568920,"errorCode":"Exception","successful":false,"delay":-55566188},{"errorMessage":"Record has reached expiration","duration":0,"errorCode":"Expired","successful":false,"delay":0}

Subsequent Errors:
Error while logging Kinesis Record - attempts=6, attemptDetails={"errorMessage":"Internal service failure.","duration":5571,"errorCode":"InternalFailure","successful":false,"delay":9602},{"errorMessage":"Internal service failure.","duration":2213,"errorCode":"InternalFailure","successful":false,"delay":4425},{"errorMessage":"Internal service failure.","duration":41,"errorCode":"InternalFailure","successful":false,"delay":5002},{"errorMessage":"Expired while waiting in HttpClient queue","duration":55568920,"errorCode":"Exception","successful":false,"delay":-55566188},{"errorMessage":"Expired while waiting in HttpClient queue","duration":55569332,"errorCode":"Exception","successful":false,"delay":-55568920},{"errorMessage":"Record has reached expiration","duration":0,"errorCode":"Expired","successful":false,"delay":0}
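For anyone trying to capture this kind of attemptDetails output themselves: when a record ultimately fails, the failure callback receives a UserRecordFailedException whose result carries one Attempt per try. A minimal sketch (producer, STREAM_NAME, partitionKey, and data are placeholders from your own code):

    ListenableFuture<UserRecordResult> f = producer.addUserRecord(STREAM_NAME, partitionKey, data);
    Futures.addCallback(f, new FutureCallback<UserRecordResult>() {
        @Override
        public void onSuccess(UserRecordResult result) { }

        @Override
        public void onFailure(Throwable t) {
            if (t instanceof UserRecordFailedException) {
                UserRecordResult result = ((UserRecordFailedException) t).getResult();
                // Each Attempt records what Kinesis returned for one try at putting the record.
                for (Attempt a : result.getAttempts()) {
                    System.err.printf("errorCode=%s message=%s duration=%d delay=%d%n",
                            a.getErrorCode(), a.getErrorMessage(), a.getDuration(), a.getDelay());
                }
            }
        }
    });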

No exponential backoff?

We were observing some pretty insane amounts of network throughput hitting the reverse proxy we have in front of kinesis.us-west-1.amazonaws.com:

[Image: network throughput graph from the reverse proxy]

It turns out that our stream was saturated. As soon as we resharded, things settled down dramatically. I can only take this to mean that when the KPL runs into ProvisionedThroughputExceeded errors it simply starts retrying in a tight loop? Shouldn't there be some kind of backoff period (ideally configurable)?
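Until the library offers a configurable backoff, the submission side can at least stop feeding the KPL while it is saturated. A minimal sketch of exponential backoff on submission (MAX_IN_FLIGHT and the one-second cap are arbitrary placeholder values), in contrast to the fixed-sleep busy loop suggested elsewhere:

    // Back off exponentially instead of polling in a tight loop while the KPL's buffer is full.
    long backoffMs = 1;
    while (producer.getOutstandingRecordsCount() > MAX_IN_FLIGHT) {
        Thread.sleep(backoffMs);
        backoffMs = Math.min(backoffMs * 2, 1000); // cap the sleep at one second
    }
    producer.addUserRecord(streamName, partitionKey, data);

This only throttles what the application hands to the KPL; it does not change how the KPL itself retries throttled requests internally.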

KPL addUserRecord "SerializationException" with JSON string

I am getting errors when attempting to write to a Kinesis stream:
[2016-03-01 20:21:35.422459] [0x000048a4] [info] [kinesis_producer.cc:79] Created pipeline for stream ""MYSTREAMNAMEGOESHERE""
[2016-03-01 20:21:35.422459] [0x000048a4] [info] [shard_map.cc:83] Updating shard map for stream ""MYSTREAMNAMEGOESHERE""
[2016-03-01 20:21:35.508680] [0x00003b08] [error] [shard_map.cc:172] Shard map update for stream ""MYSTREAMNAMEGOESHERE""
failed: {"__type":"SerializationException"}; retrying in 1000 ms

Where dataPayload is a String that contains well-formed JSON data, and _producer is my KinesisProducer, my code is as follows:

    ByteBuffer data = ByteBuffer.wrap(dataPayload.getBytes("UTF-8"));      
    ListenableFuture<UserRecordResult> f = 
            _producer.addUserRecord(_streamName, deviceId, data);  

Do I need to do anything in particular to format the JSON data, or should the string value work?
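The payload should not need any special formatting: addUserRecord accepts arbitrary bytes, so a UTF-8 encoded JSON string is fine as-is. One thing worth checking, given that the shard-map log above prints the stream as ""MYSTREAMNAMEGOESHERE"" with doubled quotes: the configured stream name string may itself contain literal quote characters (for example, copied from a quoted config value), which could plausibly produce the SerializationException during the shard-map update. A purely illustrative sketch of a defensive check (configuredStreamName is a placeholder):

    // The stream name must match the Kinesis stream exactly, with no surrounding quote characters.
    String streamName = configuredStreamName.trim();
    if (streamName.startsWith("\"") && streamName.endsWith("\"")) {
        streamName = streamName.substring(1, streamName.length() - 1);
    }
    ByteBuffer data = ByteBuffer.wrap(dataPayload.getBytes("UTF-8")); // any byte payload is accepted
    ListenableFuture<UserRecordResult> f = _producer.addUserRecord(streamName, deviceId, data);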

Error during socket read: End of file; 0 bytes read so far

Hi,

I am running with KPL v0.10.0 and keep getting the following error message:
[error][io_service_socket.h:229] Error during socket read: End of file; 0 bytes read so far.
I am trying to run a Logstash plugin based on the KPL (logstash-output-kinesis).

Your support is much appreciated.

KPL back off is kind of bush league

The KPL authors suggest writing something like this when records are being produced at a higher rate than the KPL can submit them to Kinesis.

while (producer.getOutstandingRecordsCount() > MAX_BUFFERED) {
    Thread.sleep(1);
}

Using a busy loop to keep the KPL queue from getting too big seems like a novice approach. Since the Java code already has a callback and a ConcurrentHashMap, it could just as easily let us specify a maximum and provide a "blocking" version of the addUserRecord call. I am doing something like this to work around the issue:

  // MAX_BUFFERED_SIZE and producer are defined elsewhere in the application.
  private static final Semaphore backPressure = new Semaphore(MAX_BUFFERED_SIZE, true);

  private static final FutureCallback<UserRecordResult> CALLBACK = new FutureCallback<UserRecordResult>() {
    @Override
    public void onSuccess(UserRecordResult result) {
      backPressure.release();
    }

    @Override
    public void onFailure(Throwable t) {
      backPressure.release();
      . . .
    }
  };

  public void submit(String stream, String partitionKey, ByteBuffer data) throws InterruptedException {
    // Blocks until fewer than MAX_BUFFERED_SIZE records are in flight.
    backPressure.acquire();
    try {
      ListenableFuture<UserRecordResult> future = producer.addUserRecord(stream, partitionKey, data);
      Futures.addCallback(future, CALLBACK);
    } catch (RuntimeException e) {
      // addUserRecord failed synchronously, so the callback will never fire; release the permit here.
      backPressure.release();
      throw e;
    }
  }

It seems kind of lame that I have to do this with a semaphore when the KPL Java code could do it more efficiently and built in.
