
aws-fluent-plugin-kinesis's Introduction

Fluent plugin for Amazon Kinesis


A fluentd output plugin to send events to Amazon Kinesis Data Streams and Amazon Data Firehose. The plugin also supports KPL Aggregated Record Format.

This gem includes the following three output plugins:

  • kinesis_streams
  • kinesis_firehose
  • kinesis_streams_aggregated

The plugin is also described in the official Fluentd documentation.

Note: This README is for the latest v3. Plugin v3 is almost compatible with v2. If you use v1, see v1 README.

Installation

Simply use RubyGems:

$ gem install fluent-plugin-kinesis --no-document

If you would like to build and install the plugin yourself, you can do so as follows:

$ git clone https://github.com/awslabs/aws-fluent-plugin-kinesis.git
$ cd aws-fluent-plugin-kinesis
$ bundle install
$ bundle exec rake build
$ bundle exec rake install

# If using fluent-package (td-agent), use td-agent-gem
$ td-agent-gem install pkg/fluent-plugin-kinesis

Requirements

fluent-plugin-kinesis    fluentd       ruby
>= 3.5.0                 >= 0.14.22    >= 2.4.2
>= 3.2.0 && < 3.5.0      >= 0.14.22    >= 2.3
>= 3.0.0 && < 3.2.0      >= 0.14.10    >= 2.1
>= 2.0.0 && < 3.0.0      >= 0.12.35    >= 2.1
< 2.0.0                  >= 0.10.58    >= 2.0

Getting Started

When you run this plugin on Amazon EC2 instances or container services, use instance profiles to assume an IAM role. If you want to use specific credentials, see Credentials.

kinesis_streams

In your Fluentd configuration, use @type kinesis_streams. The configuration would look like this:

<match *>
  @type kinesis_streams
  region us-east-1
  stream_name YOUR_STREAM
  partition_key key  # Otherwise, use random partition key
</match>

For more details, see Configuration: kinesis_streams.

kinesis_firehose

In your Fluentd configuration, use @type kinesis_firehose. The configuration would look like this:

<match *>
  @type kinesis_firehose
  region us-east-1
  delivery_stream_name YOUR_STREAM
</match>

For more details, see Configuration: kinesis_firehose.

kinesis_streams_aggregated

In your Fluentd configuration, use @type kinesis_streams_aggregated. The configuration would look like this:

<match *>
  @type kinesis_streams_aggregated
  region us-east-1
  stream_name YOUR_STREAM
  # Unlike kinesis_streams, there is no way to use dynamic partition key.
  # fixed_partition_key or random.
</match>

For more details, see Configuration: kinesis_streams_aggregated.

Configuration

Configuration: Plugin

Configuration: kinesis_streams

The following parameters are kinesis_streams specific configurations.

stream_name

Name of the stream to put data into.

As of Fluentd v1, you can use placeholders for this stream_name parameter. Note that chunk keys are required in your buffer section attributes for placeholders to work.

The following configuration shows kinesis_streams output plugin that applies extract_placeholders on stream_name:

# chunk_key: tag
# ${tag} will be replaced with actual tag string
<match *>
  @type kinesis_streams
  stream_name ${tag}-stream

  <buffer tag>
    # ...
  </buffer>
</match>

The value of timekey in buffer chunk keys can be extracted using strptime placeholders like this:

# chunk_key: tag and time
<match *>
  @type kinesis_streams
  stream_name ${tag}-stream-%Y%m%d

  <buffer tag, time>
    # ...
  </buffer>
</match>

You can also use a custom placeholder like this:

# chunk_key: $.kubernetes.annotations.kinesis_stream
<match *>
  @type kinesis_streams
  stream_name "${$.kubernetes.annotations.kinesis_stream}"

  <buffer $.kubernetes.annotations.kinesis_stream>
    # ...
  </buffer>
</match>

For more details, see Placeholders in Config: Buffer Section.

partition_key

The name of a field in the JSON record whose value is used as the partition key. Default is nil, which means the partition key is generated randomly.
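
For example, a record like {"user_id": "123", "event": "click"} with the sketch below would use "123" as its partition key (the user_id field name is illustrative):

<match *>
  @type kinesis_streams
  region us-east-1
  stream_name YOUR_STREAM
  # Use the value of the "user_id" field as the partition key;
  # records without that field fall back to a random partition key.
  partition_key user_id
</match>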

Configuration: kinesis_firehose

The following parameters are kinesis_firehose specific configurations.

delivery_stream_name

Name of the delivery stream to put data into.

As of Fluentd v1, placeholders are supported. For more details, see stream_name for the kinesis_streams plugin and Placeholders in Config: Buffer Section.

append_new_line

Boolean. Default true. If enabled, the plugin appends a newline character (\n) to each serialized record.
Before appending \n, the plugin calls chomp and removes the separator from the end of each record, as if chomp_record were true. Therefore, you don't need to enable the chomp_record option when you use the kinesis_firehose output with the default configuration (append_new_line true). If you set append_new_line to false, you can choose chomp_record false (the default) or true (format compatible with plugin v2).
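
For example, to get records without a trailing newline but in the plugin v2 compatible format, a sketch like this disables append_new_line and enables chomp_record:

<match *>
  @type kinesis_firehose
  region us-east-1
  delivery_stream_name YOUR_STREAM
  append_new_line false
  chomp_record true  # v2-compatible format
</match>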

Configuration: kinesis_streams_aggregated

The following parameters are kinesis_streams_aggregated specific configurations.

stream_name (kinesis_streams_aggregated)

Name of the stream to put data into.

As of Fluentd v1, placeholders are supported. For more details, see stream_name for the kinesis_streams plugin and Placeholders in Config: Buffer Section.

fixed_partition_key

A fixed partition key value. Default is nil, which means the partition key is generated randomly. Note that all records go to a single shard if you specify this option.
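
For example, the following sketch sends every aggregated record under one partition key (the key value tag-a is illustrative):

<match *>
  @type kinesis_streams_aggregated
  region us-east-1
  stream_name YOUR_STREAM
  fixed_partition_key tag-a  # all records go to a single shard
</match>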

Configuration: Credentials

To put records into Amazon Kinesis Data Streams or Amazon Data Firehose, you need to provide AWS security credentials. Without credentials in the config file, this plugin automatically fetches credentials using the same chain as the AWS SDK for Ruby (environment variables, shared profile, or instance profile).

This plugin uses almost the same configuration as fluent-plugin-s3, but also supports several additional parameters, such as aws_ses_token for temporary credentials.

AWS key and secret authentication

These parameters are required when your agent is not running on an EC2 instance with an IAM role. When using an IAM role, make sure to configure instance_profile_credentials instead. Usage can be found below.

aws_key_id (required)

AWS access key id.

aws_sec_key (required)

AWS secret key.

aws_ses_token

AWS session token. This parameter is optional; provide it when using MFA or temporary credentials and your agent is not running on an EC2 instance with an IAM role.

aws_iam_retries

The number of attempts to make (with exponential backoff) when loading instance profile credentials from the EC2 metadata service using an IAM role. Defaults to 5 retries.
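
Putting these parameters together, a minimal sketch of static credentials looks like this (the placeholder values are illustrative; prefer instance profiles where possible):

<match *>
  @type kinesis_streams
  region us-east-1
  stream_name YOUR_STREAM
  aws_key_id YOUR_AWS_ACCESS_KEY_ID
  aws_sec_key YOUR_AWS_SECRET_KEY
  # Only needed for temporary credentials or MFA:
  aws_ses_token YOUR_AWS_SESSION_TOKEN
</match>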

<assume_role_credentials> section

Typically, you can use AssumeRole for cross-account access or federation.

<match *>
  @type kinesis_streams

  <assume_role_credentials>
    role_arn          ROLE_ARN
    role_session_name ROLE_SESSION_NAME
  </assume_role_credentials>
</match>

role_arn (required)

The Amazon Resource Name (ARN) of the role to assume.

role_session_name (required)

An identifier for the assumed role session.

policy

An IAM policy in JSON format.

duration_seconds

The duration, in seconds, of the role session. The value can range from 900 seconds (15 minutes) to 3600 seconds (1 hour). By default, the value is set to 3600 seconds.

external_id

A unique identifier that is used by third parties when assuming roles in their customers' accounts.

sts_http_proxy

Proxy URL for requests to the AWS STS API. This needs to be set up independently from the global http_proxy parameter, for the use case in which requests to the Kinesis API go via a Kinesis VPC endpoint but requests to the STS API have to go through an HTTP proxy. It should be added to the assume_role_credentials section in the following format:

sts_http_proxy http://[username:password]@hostname:port

sts_endpoint_url

STS API endpoint URL. This can be used to override the default global STS API endpoint of sts.amazonaws.com. Regional endpoints may be preferred to reduce latency, and are required when using a PrivateLink VPC endpoint for STS API calls.
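
For example, a sketch combining both options (the proxy host and regional endpoint below are illustrative):

<match *>
  @type kinesis_streams
  region us-east-1
  stream_name YOUR_STREAM

  <assume_role_credentials>
    role_arn          ROLE_ARN
    role_session_name ROLE_SESSION_NAME
    # Route STS calls through an HTTP proxy while Kinesis calls use a VPC endpoint
    sts_http_proxy    http://proxy.example.com:3128
    # Use a regional (or PrivateLink) STS endpoint instead of sts.amazonaws.com
    sts_endpoint_url  https://sts.us-east-1.amazonaws.com
  </assume_role_credentials>
</match>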

<web_identity_credentials> section

Similar to assume_role_credentials, but for use in EKS.

<match *>
  @type kinesis_streams

  <web_identity_credentials>
    role_arn          ROLE_ARN
    role_session_name ROLE_SESSION_NAME
    web_identity_token_file AWS_WEB_IDENTITY_TOKEN_FILE
  </web_identity_credentials>
</match>

role_arn (required)

The Amazon Resource Name (ARN) of the role to assume.

role_session_name (required)

An identifier for the assumed role session.

web_identity_token_file (required)

The absolute path to the file on disk containing the OIDC token.

policy

An IAM policy in JSON format.

duration_seconds

The duration, in seconds, of the role session. The value can range from 900 seconds (15 minutes) to 43200 seconds (12 hours). By default, the value is set to 3600 seconds (1 hour).

<instance_profile_credentials> section

Retrieves temporary security credentials via an HTTP request. This is useful on EC2 instances.

<match *>
  @type kinesis_streams

  <instance_profile_credentials>
    ip_address IP_ADDRESS
    port       PORT
  </instance_profile_credentials>
</match>

retries

Number of times to retry when retrieving credentials. Default is 5.

ip_address

Default is 169.254.169.254.

port

Default is 80.

http_open_timeout

Default is 5.

http_read_timeout

Default is 5.

<shared_credentials> section

This loads AWS access credentials from a local ini file. This is useful for local development.

<match *>
  @type kinesis_streams

  <shared_credentials>
    path         PATH
    profile_name PROFILE_NAME
  </shared_credentials>
</match>

path

Path to the shared file. Defaults to "#{Dir.home}/.aws/credentials".

profile_name

Defaults to 'default', or the value of the AWS_PROFILE environment variable if set.

<process_credentials> section

This loads AWS access credentials from an external process.

<match *>
  @type kinesis_streams

  <process_credentials>
    process CMD
  </process_credentials>
</match>

process (required)

Command to be executed as an external process.

Configuration: Performance

<buffer> section

Use Fluentd buffering and flushing parameters to optimize throughput. With Fluentd v1+ (td-agent v3+), write these configurations in the buffer section like this:

<match *>
  @type kinesis_streams

  <buffer>
    flush_interval 1
    chunk_limit_size 1m
    flush_thread_interval 0.1
    flush_thread_burst_interval 0.01
    flush_thread_count 15
  </buffer>
</match>

For more details, see Config: Buffer Section. Note that each parameter should be adjusted to your system.

Configuration: Batch Request

retries_on_batch_request

Integer, default 8. The plugin puts multiple records to Amazon Kinesis Data Streams in batches using PutRecords. A subset of records in a batch may fail for reasons documented in the Kinesis Service API Reference for PutRecords. Failed records are retried retries_on_batch_request times. If a record still fails after all retries, an error log is emitted.

reset_backoff_if_success

Boolean, default true. If enabled, each retry checks how many records succeeded in the previous batch request and resets the exponential backoff if any succeeded. Because a batch request can span multiple shards, simple exponential backoff for the whole batch request doesn't work well in some cases.

batch_request_max_count

Integer, default 500. The maximum number of records per batch request built from a record chunk. It can't exceed the default value because that is the API limit.

Default:

  • kinesis_streams: 500
  • kinesis_firehose: 500
  • kinesis_streams_aggregated: 100,000

batch_request_max_size

Integer. The maximum size of a batch request built from a record chunk. It can't exceed the default value because that is the API limit.

Default:

  • kinesis_streams: 5 MB
  • kinesis_firehose: 4 MB
  • kinesis_streams_aggregated: 1 MB

drop_failed_records_after_batch_request_retries

Boolean, default true.

If drop_failed_records_after_batch_request_retries is enabled (default), the plugin will drop failed records when batch request fails after retrying max times configured as retries_on_batch_request. This dropping can be monitored from monitor_agent or fluent-plugin-prometheus as retry_count or num_errors metrics.

If drop_failed_records_after_batch_request_retries is disabled, the plugin raises an error and returns the chunk to the Fluentd buffer when a batch request fails after the maximum retries. Fluentd then retries sending the chunk records according to the retry config in Buffer Section. Note that this retrying may create duplicate records, since the PutRecords API of Kinesis Data Streams and the PutRecordBatch API of Kinesis Data Firehose may return a partially successful response.

monitor_num_of_batch_request_retries

Boolean, default false. If enabled, the plugin will increment retry_count monitoring metrics after internal retrying to send batch request. This configuration enables you to monitor ProvisionedThroughputExceededException from monitor_agent or fluent-plugin-prometheus. Note that retry_count metrics will be counted by the plugin in addition to original Fluentd buffering mechanism if monitor_num_of_batch_request_retries is enabled.
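
The batch request options above can be combined. For example, the following sketch retries more aggressively, surfaces retries as metrics, and returns failed chunks to the buffer instead of dropping records:

<match *>
  @type kinesis_streams
  region us-east-1
  stream_name YOUR_STREAM
  retries_on_batch_request 10
  reset_backoff_if_success true
  drop_failed_records_after_batch_request_retries false
  monitor_num_of_batch_request_retries true
</match>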

Configuration: Format

<format> section

This plugin uses Fluent::TextFormatter to serialize each record to a string. See formatter.rb for more details. By default, it uses the json formatter, equivalent to specifying the following:

<match *>
  @type kinesis_streams

  <format>
    @type json
  </format>
</match>

For other configurations of json formatter, see json formatter plugin.

<inject> section

This plugin uses Fluent::TimeFormatter and other injection configurations. See inject.rb for more details.

For example, the config below will add time field whose value is event time with nanosecond and tag field whose value is its tag.

<match *>
  @type kinesis_streams

  <inject>
    time_key time
    tag_key tag
  </inject>
</match>

By default, time_type string and time_format %Y-%m-%dT%H:%M:%S.%N%z are already set, which matches the Elasticsearch sub-second format. However, you can use any configuration.

In addition, there are some format related options:

data_key

If your record contains a field whose string value should be sent to Amazon Kinesis directly (without the formatter), use this parameter to specify that field. In that case, all fields other than data_key are discarded and never sent to Amazon Kinesis. Default is nil, which means the whole record is formatted and sent.
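
For example, with the sketch below, a record like {"data": "raw line", "host": "web-1"} would send only the string "raw line" to the stream and discard the host field (the field names are illustrative):

<match *>
  @type kinesis_streams
  region us-east-1
  stream_name YOUR_STREAM
  data_key data  # send the value of the "data" field as-is
</match>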

compression

Specifies the compression method for each record's data. Currently accepted options are zlib and gzip. Otherwise, no compression is performed.
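
For example, to compress each record's data with zlib:

<match *>
  @type kinesis_streams
  region us-east-1
  stream_name YOUR_STREAM
  compression zlib
</match>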

log_truncate_max_size

Integer, default 1024. When emitting the log entry, the message will be truncated by this size to avoid infinite loop when the log is also sent to Kinesis. The value 0 means no truncation.

chomp_record

Boolean. Default false. If enabled, the plugin calls chomp and removes the separator from the end of each record. This option provides a format compatible with plugin v2. See #142 for more details.
When you use the kinesis_firehose output, the append_new_line option is true by default. If append_new_line is enabled, the plugin calls chomp, as if chomp_record were true, before appending \n to each record. Therefore, you don't need to enable the chomp_record option when you use kinesis_firehose with the default configuration. If you set append_new_line to false, you can choose chomp_record false (the default) or true (format compatible with plugin v2).

Configuration: AWS SDK

region

AWS region of your stream, in a form like us-east-1 or us-west-2. Refer to Regions and Endpoints in the AWS General Reference for supported regions. Default is nil, which means the region is read from the AWS_REGION environment variable.

max_record_size

The upper limit on the size of each record. Default is 1 MB, which is the Kinesis limit.

http_proxy

HTTP proxy for API calling. Default nil.

endpoint

API endpoint URL, for testing. Default nil.

ssl_verify_peer

Boolean. Default true. Disable if you do not want to verify the SSL connection, for testing.

debug

Boolean. Enable if you need to debug Amazon Data Firehose API call. Default is false.

Development

To launch fluentd process with this plugin for development, follow the steps below:

git clone https://github.com/awslabs/aws-fluent-plugin-kinesis.git
cd aws-fluent-plugin-kinesis
make # installs gem dependencies
bundle exec fluentd -c /path/to/fluent.conf

To launch using specified version of Fluentd, use BUNDLE_GEMFILE environment variable:

BUNDLE_GEMFILE=$PWD/gemfiles/Gemfile.td-agent-3.3.0 bundle exec fluentd -c /path/to/fluent.conf

Contributing

Bug reports and pull requests are welcome on GitHub.


aws-fluent-plugin-kinesis's Issues

Should count as success if any records shipped (?)

Consider this situation: shard is close to saturated. One fluentd node gets unlucky, and has to retry a few times (can happen just because of poor distribution). Because it's retrying, the next chunk to ship is much bigger (i.e. up to the capacity of put_records).

Now it has a bigger chunk, it's much more likely to fail to ship (more records, and more likely to hit capacity). Therefore it will need even more retries to succeed, and because of the exponential back-off will take a long time to do so. i.e. the throughput of this particular fluentd node will be much lower than any others, because the more records you try to ship in a particular put_records attempt the longer you will spend backing off.

Suggested fix: set retry_count to 0 if any records were pushed, which means the sleep will be ~0.5 secs (the smallest possible sleep).

Also, would be great to log how many records were pushed at this point (and how many were attempted).

Plugin putting one record at a time to Kinesis

I am experimenting with putting our data to Kinesis with fluentd, but it seems this plugin is putting one record at a time to Kinesis and waiting for the response, basically making it useless for high-traffic data volumes.

Nothing too fancy in my fluentd.conf:

<source>
    type tail
    path var/spool/tracking-logs/%Y-%m-%d_%H-%M.events
    pos_file var/spool/tracking-logs/events.log.pos
    tag kinesis.eventtracker
    read_from_head true
    format json
    refresh_interval 5
</source>

# Kinesis
<match kinesis.eventtracker>
type kinesis

stream_name eventStream

aws_key_id ID
aws_sec_key KEY

region us-east-1
debug true
#partition_key sessionId
partition_key timestamp
</match>

Basically, in a 5 minute interval roughly 800 records are put into the stream, making throughput something like 3 records per second, way below the 1000 records per second that a single Kinesis shard is able to process. Switching debug mode on/off doesn't affect performance. Is there a way to fire more events at Kinesis, or make it perform async calls?

Any hints of how to improve performance or is this an underlying problem related to AWS SDK for Ruby?

Misleading README notes on "detach_process" & "num_threads"

Greetings!

Do the configuration options "detach_process" & "num_threads" still do anything after the re-write? The README still contains a section on using these options, but grep-ing through the code base, the only references I can find for these parameters are in the old, deprecated output plugin. Links:

README section - Improving Throughput to Amazon Kinesis:

Code references in the deprecated output plugin:

If these config parameters don't do anything for the new plugins, they should probably be removed from the README to avoid confusion.

Also in that case, they should be removed from the benchmark/*.conf files, ex: https://github.com/awslabs/aws-fluent-plugin-kinesis/blob/master/benchmark/producer.conf#L18-L19

No implicit conversion of String into Integer error loop when record not in json

We are getting the following error:

2015-10-21 15:14:33 +0000 [warn]: temporarily failed to flush the buffer. next_retry=2015-10-21 15:14:34 +0000 error_class="TypeError" error="no implicit conversion of String into Integer" plugin_id="object:3fb566013c1c"
2015-10-21 15:14:33 +0000 [warn]: /usr/local/bundle/gems/fluent-plugin-kinesis-0.3.6/lib/fluent/plugin/out_kinesis.rb:124:in `[]'
2015-10-21 15:14:33 +0000 [warn]: /usr/local/bundle/gems/fluent-plugin-kinesis-0.3.6/lib/fluent/plugin/out_kinesis.rb:124:in `block in write'
2015-10-21 15:14:33 +0000 [warn]: /usr/local/bundle/gems/fluentd-0.12.15/lib/fluent/buffer.rb:117:in `each'
2015-10-21 15:14:33 +0000 [warn]: /usr/local/bundle/gems/fluentd-0.12.15/lib/fluent/buffer.rb:117:in `block in msgpack_each'
2015-10-21 15:14:33 +0000 [warn]: /usr/local/bundle/gems/fluentd-0.12.15/lib/fluent/plugin/buf_file.rb:64:in `open'
2015-10-21 15:14:33 +0000 [warn]: /usr/local/bundle/gems/fluentd-0.12.15/lib/fluent/buffer.rb:114:in `msgpack_each'
2015-10-21 15:14:33 +0000 [warn]: /usr/local/bundle/gems/fluent-plugin-kinesis-0.3.6/lib/fluent/plugin/out_kinesis.rb:123:in `each'
2015-10-21 15:14:33 +0000 [warn]: /usr/local/bundle/gems/fluent-plugin-kinesis-0.3.6/lib/fluent/plugin/out_kinesis.rb:123:in `find_all'
2015-10-21 15:14:33 +0000 [warn]: /usr/local/bundle/gems/fluent-plugin-kinesis-0.3.6/lib/fluent/plugin/out_kinesis.rb:123:in `write'
2015-10-21 15:14:33 +0000 [warn]: /usr/local/bundle/gems/fluentd-0.12.15/lib/fluent/buffer.rb:325:in `write_chunk'
2015-10-21 15:14:33 +0000 [warn]: /usr/local/bundle/gems/fluentd-0.12.15/lib/fluent/buffer.rb:304:in `pop'
2015-10-21 15:14:33 +0000 [warn]: /usr/local/bundle/gems/fluentd-0.12.15/lib/fluent/output.rb:321:in `try_flush'
2015-10-21 15:14:33 +0000 [warn]: /usr/local/bundle/gems/fluentd-0.12.15/lib/fluent/output.rb:140:in `run'

The error is raised in write, on the following lines referencing record['data']:

data_list = chunk.to_enum(:msgpack_each).find_all{|record|
  unless record_exceeds_max_size?(record['data'])

This is likely due to the record not being a JSON object but an array, so [] is expecting an integer. The elasticsearch plugin avoids this issue with a guard that checks that the record is a hash.

next unless record.is_a? Hash

On restart of the process, it seems to be looping on this error for error while trying to process the buffer.

instance profile credentials expiring

The plugin (I'm using kinesis_producer) does not seem fetch fresh tokens from the metadata service regularly.

2016-03-27 05:20:10 +0000 [warn]: temporarily failed to flush the buffer. next_retry=2016-03-27 05:29:17 +0000 error_class="StandardError" error="{:attempts=>[{:delay=>112, :dura
tion=>6, :success=>false, :error_code=>\"400\", :error_message=>\"{\\\"__type\\\":\\\"ExpiredTokenException\\\",\\\"message\\\":\\\"The security token included in the request is
expired\\\"}\"}], :success=>false}" plugin_id="object:3ff76076a31c"

The credentials seem to be fetched once at plugin startup in https://github.com/awslabs/aws-fluent-plugin-kinesis/blob/v1.0.0/lib/fluent/plugin/kinesis_helper/credentials.rb#L47. The name of the method default_credentials_provider is suggesting that it returns a provider, instead it returns credentials.

The credentials are then passed in to the client which has a loop to refresh the credentials https://github.com/awslabs/aws-fluent-plugin-kinesis/blob/v1.0.0/lib/kinesis_producer/daemon.rb#L183-L192. After ~6 hours, the credentials are no longer valid and the producer stalls.

Shouldn't this use the provider instead and dereference the credentials property regularly?

[edited to replace master with v1.0.0 in the file links]

Derive stream name from fluentd tag

We have an application where we would like to route data to different Kinesis streams based on the fluentd tag. This is doable by specifying a different <match></match> configuration for each tag, but it means that there will be a separate KPL process for each configuration. And if we have 20 different streams, that means 20 instances of the KPL process, which wastes resources.

Since the KPL requires the stream name to be specified with each record, the fluentd plugin can dynamically determine the stream name for each record it processes. One possible way to do this would be to allow users to specify a stream name prefix, and a regex to extract other data from the tag. A simpler (and more efficient) alternative would be a simple concatenation.

I'm happy to provide a PR for this functionality, if there is interest in accepting it.

Gem Dependency Conflict

Hi, after plugin installation I am getting following error:

/opt/td-agent/embedded/lib/ruby/site_ruby/2.1.0/rubygems/specification.rb:2064:in `raise_if_conflicts': Unable to activate aws-sdk-2.1.36, because aws-sdk-resources-2.2.3 conflicts with aws-sdk-resources (= 2.1.36) (Gem::LoadError)
    from /opt/td-agent/embedded/lib/ruby/site_ruby/2.1.0/rubygems/specification.rb:1262:in `activate'
    from /opt/td-agent/embedded/lib/ruby/site_ruby/2.1.0/rubygems/specification.rb:1298:in `block in activate_dependencies'
    from /opt/td-agent/embedded/lib/ruby/site_ruby/2.1.0/rubygems/specification.rb:1284:in `each'
    from /opt/td-agent/embedded/lib/ruby/site_ruby/2.1.0/rubygems/specification.rb:1284:in `activate_dependencies'
    from /opt/td-agent/embedded/lib/ruby/site_ruby/2.1.0/rubygems/specification.rb:1266:in `activate'
    from /opt/td-agent/embedded/lib/ruby/site_ruby/2.1.0/rubygems.rb:196:in `rescue in try_activate'
    from /opt/td-agent/embedded/lib/ruby/site_ruby/2.1.0/rubygems.rb:193:in `try_activate'
    from /opt/td-agent/embedded/lib/ruby/site_ruby/2.1.0/rubygems/core_ext/kernel_require.rb:132:in `rescue in require'
    from /opt/td-agent/embedded/lib/ruby/site_ruby/2.1.0/rubygems/core_ext/kernel_require.rb:144:in `require'
    from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-firehose-0.1.1/lib/fluent/plugin/out_kinesis_firehose.rb:1:in `<top (required)>'
    from /opt/td-agent/embedded/lib/ruby/site_ruby/2.1.0/rubygems/core_ext/kernel_require.rb:126:in `require'
    from /opt/td-agent/embedded/lib/ruby/site_ruby/2.1.0/rubygems/core_ext/kernel_require.rb:126:in `require'
    from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.17/lib/fluent/plugin.rb:168:in `block in try_load_plugin'
    from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.17/lib/fluent/plugin.rb:166:in `each'
    from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.17/lib/fluent/plugin.rb:166:in `try_load_plugin'
    from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.17/lib/fluent/plugin.rb:126:in `new_impl'
    from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.17/lib/fluent/plugin.rb:57:in `new_output'
    from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.17/lib/fluent/plugin/out_copy.rb:41:in `block in configure'
    from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.17/lib/fluent/plugin/out_copy.rb:34:in `each'
    from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.17/lib/fluent/plugin/out_copy.rb:34:in `configure'
    from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.17/lib/fluent/agent.rb:129:in `add_match'
    from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.17/lib/fluent/agent.rb:60:in `block in configure'
    from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.17/lib/fluent/agent.rb:54:in `each'
    from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.17/lib/fluent/agent.rb:54:in `configure'
    from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.17/lib/fluent/root_agent.rb:82:in `configure'
    from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.17/lib/fluent/engine.rb:116:in `configure'
    from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.17/lib/fluent/engine.rb:90:in `run_configure'
    from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.17/lib/fluent/supervisor.rb:497:in `run_configure'
    from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.17/lib/fluent/supervisor.rb:145:in `block in start'
    from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.17/lib/fluent/supervisor.rb:336:in `call'
    from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.17/lib/fluent/supervisor.rb:336:in `main_process'
    from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.17/lib/fluent/supervisor.rb:311:in `block in supervise'
    from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.17/lib/fluent/supervisor.rb:310:in `fork'
    from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.17/lib/fluent/supervisor.rb:310:in `supervise'
    from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.17/lib/fluent/supervisor.rb:141:in `start'
    from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.17/lib/fluent/command/fluentd.rb:171:in `<top (required)>'
    from /opt/td-agent/embedded/lib/ruby/site_ruby/2.1.0/rubygems/core_ext/kernel_require.rb:73:in `require'
    from /opt/td-agent/embedded/lib/ruby/site_ruby/2.1.0/rubygems/core_ext/kernel_require.rb:73:in `require'
    from /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.17/bin/fluentd:6:in `<top (required)>'
    from /opt/td-agent/embedded/bin/fluentd:23:in `load'
    from /opt/td-agent/embedded/bin/fluentd:23:in `<top (required)>'
    from /usr/sbin/td-agent:7:in `load'
    from /usr/sbin/td-agent:7:in `<main>'
[FAILED]
2016-07-14 11:19:28 -0400 [info]: process finished code=256
2016-07-14 11:19:28 -0400 [warn]: process died within 1 second. exit.
^C[1]+ Exit 1 service td-agent start

This is td-agent version:

/etc/td-agent/conf.d]# yum info td-agent
Loaded plugins: priorities, update-motd, upgrade-helper
959 packages excluded due to repository priority protections
Installed Packages
Name : td-agent
Arch : x86_64
Version : 2.2.1
Release : 0.el2015
Size : 199 M
Repo : installed
From repo : treasuredata
Summary : Treasure Agent: A data collector for Treasure Data
URL : http://treasuredata.com
License : unknown
Description : Treasure Agent: A data collector for Treasure Data

Could you please help about this?

Thanks

Http_proxy

Can I use an IP address instead of a DNS name, as our internal network doesn't allow name resolution?

Potential memory leak in the plugin

Hey, I have been using the kinesis plugin to ship logs in production. However, we found that the memory usage of fluentd keeps going up even though I'm not using memory buffering (I'm using buffer_type file). This leads to fluentd getting killed by the OOM killer. I'm not 100% sure what causes the leak; do you have any insights? I'm using ruby2.2 and kinesis plugin 0.3.5.

Emits entire record on error

There are a couple of situations in this plugin where the record data is logged on failure. If the fluentd logs are also sent to the Kinesis stream, this can cause problems, since the reason the record failed might be due to the record itself (e.g. it is too large), and an even larger record will then be generated.

Thus an endless loop of 'failed' records ensues, each one progressively larger.

Perhaps the record can be trimmed before being logged?
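One way to bound those logs, as a rough sketch (the helper name and the 1 KB limit are illustrative, not part of the plugin):

```ruby
# Sketch: trim a record's data before logging it on failure, so failure
# logs stay bounded even when the record itself is oversized.
MAX_LOGGED_BYTES = 1024  # illustrative limit, not a plugin setting

def trimmed_for_log(data, limit = MAX_LOGGED_BYTES)
  return data if data.bytesize <= limit
  # keep the first `limit` bytes and note how much was dropped
  data.byteslice(0, limit) + "...(#{data.bytesize} bytes total, truncated)"
end

oversized = "x" * 5000
puts trimmed_for_log(oversized)  # first 1024 bytes plus a short suffix
```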

http_proxy error: urgent help required

HTTP proxy settings are giving this error; please help.

2016-03-22 13:23:32 +1100 [warn]: temporarily failed to flush the buffer. next_retry=2016-03-22 13:23:48 +1100 error_class="Net::HTTPServerException" error="407 "authenticationrequired"" plugin_id="object:3f81117b0aa8"
2016-03-22 13:23:32 +1100 [warn]: suppressed same stacktrace
2016-03-22 13:23:48 +1100 [warn]: temporarily failed to flush the buffer. next_retry=2016-03-22 13:24:16 +1100 error_class="Net::HTTPServerException" error="407 "authenticationrequired"" plugin_id="object:3f81117b0aa8"
2016-03-22 13:23:48 +1100 [warn]: suppressed same stacktrace
2016-03-22 13:24:17 +1100 [warn]: temporarily failed to flush the buffer. next_retry=2016-03-22 13:25:24 +1100 error_class="Net::HTTPServerException" error="407 "authenticationrequired"" plugin_id="object:3f81117b0aa8"
2016-03-22 13:24:17 +1100 [warn]: suppressed same stacktrace
2016-03-22 13:25:24 +1100 [warn]: temporarily failed to flush the buffer. next_retry=2016-03-22 13:27:22 +1100 error_class="Net::HTTPServerException" error="407 "authenticationrequired"" plugin_id="object:3f81117b0aa8"
2016-03-22 13:25:24 +1100 [warn]: suppressed same stacktrace
2016-03-22 13:27:22 +1100 [warn]: temporarily failed to flush the buffer. next_retry=2016-03-22 13:31:07 +1100 error_class="Net::HTTPServerException" error="407 "authenticationrequired"" plugin_id="object:3f81117b0aa8"
2016-03-22 13:27:22 +1100 [warn]: suppressed same stacktrace
2016-03-22 13:30:03 +1100 [info]: shutting down fluentd
2016-03-22 13:30:03 +1100 [info]: shutting down input type="forward" plugin_id="object:3f811335afd8"
2016-03-22 13:30:03 +1100 [info]: shutting down input type="http" plugin_id="object:3f8113355894"
2016-03-22 13:30:03 +1100 [info]: shutting down input type="debug_agent" plugin_id="object:3f81133489c8"
2016-03-22 13:30:03 +1100 [info]: shutting down input type="tail" plugin_id="object:3f8113361108"
2016-03-22 13:30:04 +1100 [info]: shutting down output type="stdout" plugin_id="object:3f811336bdc4"
2016-03-22 13:30:04 +1100 [info]: shutting down output type="kinesis_streams" plugin_id="object:3f81117b0aa8"
2016-03-22 13:30:04 +1100 [warn]: before_shutdown failed error="407 "authenticationrequired""
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/2.1.0/net/http/response.rb:119:in `error!'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/2.1.0/net/http/response.rb:128:in `value'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/2.1.0/net/http.rb:915:in `connect'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/2.1.0/net/http.rb:863:in `do_start'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/2.1.0/net/http.rb:858:in `start'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/seahorse/client/net_http/connection_pool.rb:292:in `start_session'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/seahorse/client/net_http/connection_pool.rb:104:in `session_for'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/seahorse/client/net_http/handler.rb:109:in `session'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/seahorse/client/net_http/handler.rb:61:in `transmit'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/seahorse/client/net_http/handler.rb:35:in `call'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/seahorse/client/plugins/content_length.rb:12:in `call'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/aws-sdk-core/json/error_handler.rb:8:in `call'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/aws-sdk-core/plugins/request_signer.rb:84:in `call'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/aws-sdk-core/plugins/retry_errors.rb:87:in `call'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/aws-sdk-core/json/handler.rb:11:in `call'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/aws-sdk-core/plugins/user_agent.rb:12:in `call'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/seahorse/client/plugins/endpoint.rb:41:in `call'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/aws-sdk-core/plugins/param_validator.rb:21:in `call'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/seahorse/client/plugins/raise_response_errors.rb:14:in `call'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/aws-sdk-core/plugins/param_converter.rb:20:in `call'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/seahorse/client/plugins/response_target.rb:21:in `call'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/seahorse/client/request.rb:70:in `send_request'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/seahorse/client/base.rb:207:in `block (2 levels) in define_operation_methods'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-1.0.0/lib/fluent/plugin/out_kinesis_streams.rb:41:in `batch_request'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-1.0.0/lib/fluent/plugin/kinesis_helper/api.rb:79:in `batch_request_with_retry'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-1.0.0/lib/fluent/plugin/out_kinesis_streams.rb:26:in `block in write'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-1.0.0/lib/fluent/plugin/out_kinesis_streams.rb:25:in `each'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-1.0.0/lib/fluent/plugin/out_kinesis_streams.rb:25:in `write'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.22/lib/fluent/buffer.rb:351:in `write_chunk'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.22/lib/fluent/buffer.rb:330:in `pop'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.22/lib/fluent/plugin/buf_memory.rb:100:in `block in before_shutdown'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/2.1.0/monitor.rb:211:in `mon_synchronize'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.22/lib/fluent/plugin/buf_memory.rb:96:in `before_shutdown'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.22/lib/fluent/output.rb:413:in `before_shutdown'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.22/lib/fluent/output.rb:169:in `block in run'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.22/lib/fluent/output.rb:168:in `synchronize'
2016-03-22 13:30:04 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.22/lib/fluent/output.rb:168:in `run'
2016-03-22 13:30:04 +1100 [info]: process finished code=0
2016-03-22 13:30:05 +1100 [info]: reading config file path="/etc/td-agent/td-agent.conf"
2016-03-22 13:30:05 +1100 [info]: starting fluentd-0.12.22
2016-03-22 13:30:05 +1100 [info]: gem 'fluent-mixin-config-placeholders' version '0.3.1'
2016-03-22 13:30:05 +1100 [info]: gem 'fluent-mixin-plaintextformatter' version '0.2.6'
2016-03-22 13:30:05 +1100 [info]: gem 'fluent-plugin-kinesis' version '1.0.0'
2016-03-22 13:30:05 +1100 [info]: gem 'fluent-plugin-kinesis' version '0.4.0'
2016-03-22 13:30:05 +1100 [info]: gem 'fluent-plugin-mongo' version '0.7.12'
2016-03-22 13:30:05 +1100 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '1.5.4'
2016-03-22 13:30:05 +1100 [info]: gem 'fluent-plugin-s3' version '0.6.5'
2016-03-22 13:30:05 +1100 [info]: gem 'fluent-plugin-scribe' version '0.10.14'
2016-03-22 13:30:05 +1100 [info]: gem 'fluent-plugin-td' version '0.10.28'
2016-03-22 13:30:05 +1100 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.2'
2016-03-22 13:30:05 +1100 [info]: gem 'fluent-plugin-webhdfs' version '0.4.1'
2016-03-22 13:30:05 +1100 [info]: gem 'fluentd' version '0.12.22'
2016-03-22 13:30:05 +1100 [info]: gem 'fluentd' version '0.12.20'
2016-03-22 13:30:05 +1100 [info]: adding match pattern="**" type="kinesis_streams"
2016-03-22 13:30:05 +1100 [info]: adding match pattern="debug.**" type="stdout"
2016-03-22 13:30:05 +1100 [info]: adding source type="tail"
2016-03-22 13:30:05 +1100 [info]: adding source type="forward"
2016-03-22 13:30:05 +1100 [info]: adding source type="http"
2016-03-22 13:30:05 +1100 [info]: adding source type="debug_agent"
2016-03-22 13:30:05 +1100 [info]: using configuration file:
<match **>
  @type kinesis_streams
  stream_name FluentDHeartbeat
  aws_key_id xxxxxx
  aws_sec_key xxxxxx
  http_proxy http://172.17.100.181:9090
  debug true
  region ap-southeast-2
  flush_interval 5s
  num_threads 1
</match>
<source>
  type tail
  path /var/log/td-agent/heartbeat.log
  tag heartbeat.access
  format none
  pos_file /var/log/td-agent/heartbeat.log.pos
</source>
<match debug.**>
  type stdout
</match>
<source>
  type forward
</source>
<source>
  type http
  port 8888
</source>
<source>
  type debug_agent
  bind 127.0.0.1
  port 24230
</source>

2016-03-22 13:30:05 +1100 [info]: following tail of /var/log/td-agent/heartbeat.log
2016-03-22 13:30:05 +1100 [info]: listening fluent socket on 0.0.0.0:24224
2016-03-22 13:30:05 +1100 [info]: listening dRuby uri="druby://127.0.0.1:24230" object="Engine"
D, [2016-03-22T13:30:46.512589 #15576] DEBUG -- : [Aws::Kinesis::Client 0 0.018289 0 retries] put_records(stream_name:"FluentDHeartbeat",records:[{data:"{\"message\":\"Tue Mar 22 13:30:43 AEDT 2016\"}",partition_key:"ad2ec04a08f7d96ed93cb74215f4349d"}]) Net::HTTPServerException 407 "authenticationrequired"

2016-03-22 13:30:46 +1100 [warn]: temporarily failed to flush the buffer. next_retry=2016-03-22 13:30:47 +1100 error_class="Net::HTTPServerException" error="407 "authenticationrequired"" plugin_id="object:3fe9db3aabd8"
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/2.1.0/net/http/response.rb:119:in `error!'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/2.1.0/net/http/response.rb:128:in `value'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/2.1.0/net/http.rb:915:in `connect'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/2.1.0/net/http.rb:863:in `do_start'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/2.1.0/net/http.rb:858:in `start'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/seahorse/client/net_http/connection_pool.rb:292:in `start_session'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/seahorse/client/net_http/connection_pool.rb:104:in `session_for'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/seahorse/client/net_http/handler.rb:109:in `session'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/seahorse/client/net_http/handler.rb:61:in `transmit'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/seahorse/client/net_http/handler.rb:35:in `call'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/seahorse/client/plugins/content_length.rb:12:in `call'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/aws-sdk-core/json/error_handler.rb:8:in `call'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/aws-sdk-core/plugins/request_signer.rb:84:in `call'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/aws-sdk-core/plugins/retry_errors.rb:87:in `call'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/aws-sdk-core/json/handler.rb:11:in `call'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/aws-sdk-core/plugins/user_agent.rb:12:in `call'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/seahorse/client/plugins/endpoint.rb:41:in `call'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/aws-sdk-core/plugins/param_validator.rb:21:in `call'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/aws-sdk-core/plugins/logging.rb:39:in `call'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/seahorse/client/plugins/raise_response_errors.rb:14:in `call'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/aws-sdk-core/plugins/param_converter.rb:20:in `call'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/seahorse/client/plugins/response_target.rb:21:in `call'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/seahorse/client/request.rb:70:in `send_request'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.14/lib/seahorse/client/base.rb:207:in `block (2 levels) in define_operation_methods'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-1.0.0/lib/fluent/plugin/out_kinesis_streams.rb:41:in `batch_request'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-1.0.0/lib/fluent/plugin/kinesis_helper/api.rb:79:in `batch_request_with_retry'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-1.0.0/lib/fluent/plugin/out_kinesis_streams.rb:26:in `block in write'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-1.0.0/lib/fluent/plugin/out_kinesis_streams.rb:25:in `each'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-1.0.0/lib/fluent/plugin/out_kinesis_streams.rb:25:in `write'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.22/lib/fluent/buffer.rb:351:in `write_chunk'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.22/lib/fluent/buffer.rb:330:in `pop'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.22/lib/fluent/output.rb:338:in `try_flush'
2016-03-22 13:30:46 +1100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.22/lib/fluent/output.rb:149:in `run'
D, [2016-03-22T13:30:47.418011 #15576] DEBUG -- : [Aws::Kinesis::Client 0 0.015474 0 retries] put_records(stream_name:"FluentDHeartbeat",records:[{data:"{\"message\":\"Tue Mar 22 13:30:43 AEDT 2016\"}",partition_key:"c23b30f2ae9e08c8b56b2cd1a9ed3132"}]) Net::HTTPServerException 407 "authenticationrequired"

2016-03-22 13:30:47 +1100 [warn]: temporarily failed to flush the buffer. next_retry=2016-03-22 13:30:49 +1100 error_class="Net::HTTPServerException" error="407 "authenticationrequired"" plugin_id="object:3fe9db3aabd8"
2016-03-22 13:30:47 +1100 [warn]: suppressed same stacktrace
D, [2016-03-22T13:30:49.487253 #15576] DEBUG -- : [Aws::Kinesis::Client 0 0.016633 0 retries] put_records(stream_name:"FluentDHeartbeat",records:[{data:"{\"message\":\"Tue Mar 22 13:30:43 AEDT 2016\"}",partition_key:"f20318016147ab2690b0fe79f151c273"}]) Net::HTTPServerException 407 "authenticationrequired"

2016-03-22 13:30:49 +1100 [warn]: temporarily failed to flush the buffer. next_retry=2016-03-22 13:30:53 +1100 error_class="Net::HTTPServerException" error="407 "authenticationrequired"" plugin_id="object:3fe9db3aabd8"
2016-03-22 13:30:49 +1100 [warn]: suppressed same stacktrace
D, [2016-03-22T13:30:53.465222 #15576] DEBUG -- : [Aws::Kinesis::Client 0 0.047881 0 retries] put_records(stream_name:"FluentDHeartbeat",records:[{data:"{\"message\":\"Tue Mar 22 13:30:43 AEDT 2016\"}",partition_key:"74722665315a868587d6228d2a9e97ad"}]) Net::HTTPServerException 407 "authenticationrequired"

2016-03-22 13:30:53 +1100 [warn]: temporarily failed to flush the buffer. next_retry=2016-03-22 13:31:00 +1100 error_class="Net::HTTPServerException" error="407 "authenticationrequired"" plugin_id="object:3fe9db3aabd8"
2016-03-22 13:30:53 +1100 [warn]: suppressed same stacktrace
D, [2016-03-22T13:31:00.990256 #15576] DEBUG -- : [Aws::Kinesis::Client 0 0.018406 0 retries] put_records(stream_name:"FluentDHeartbeat",records:[{data:"{\"message\":\"Tue Mar 22 13:30:43 AEDT 2016\"}",partition_key:"9e847c6a4655c8d448275cf8b296d3e2"}]) Net::HTTPServerException 407 "authenticationrequired"

2016-03-22 13:31:00 +1100 [warn]: temporarily failed to flush the buffer. next_retry=2016-03-22 13:31:15 +1100 error_class="Net::HTTPServerException" error="407 "authenticationrequired"" plugin_id="object:3fe9db3aabd8"
2016-03-22 13:31:00 +1100 [warn]: suppressed same stacktrace

Cannot install plugin

I installed the plugin for td-agent as described in the documentation, but when I run "td-agent -c file.conf" it says the kinesis plugin could not be found. What am I missing?

Can't push it from CentOS

We installed td-agent and configured it as:

<source>
type tail
path /var/log/nginx/access.json.log
format json
tag nginx.access
</source>

## File output
## match tag=local.** and write to file
<match **>
type kinesis
stream_name teste-es
aws_key_id MYID
aws_sec_key MYSECKEY
region us-east-1
partition_key nginx
</match>

After a few requests to nginx, we keep seeing these errors and no messages are produced to the Kinesis stream.

2015-11-18 16:29:04 +0000 [warn]: temporarily failed to flush the buffer. next_retry=2015-11-18 16:29:05 +0000 error_class="Aws::Kinesis::Errors::ValidationException" error="3 validation errors detected: Value '' at 'records.1.member.partitionKey' failed to satisfy constraint: Member must have length greater than or equal to 1; Value '' at 'records.2.member.partitionKey' failed to satisfy constraint: Member must have length greater than or equal to 1; Value '' at 'records.3.member.partitionKey' failed to satisfy constraint: Member must have length greater than or equal to 1" plugin_id="object:3f9730e90f14"
  2015-11-18 16:29:04 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.0/lib/seahorse/client/plugins/raise_response_errors.rb:15:in `call'
  2015-11-18 16:29:04 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.0/lib/aws-sdk-core/plugins/param_converter.rb:20:in `call'
  2015-11-18 16:29:04 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.0/lib/seahorse/client/plugins/response_target.rb:21:in `call'
  2015-11-18 16:29:04 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.0/lib/seahorse/client/request.rb:70:in `send_request'
  2015-11-18 16:29:04 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.2.0/lib/seahorse/client/base.rb:207:in `block (2 levels) in define_operation_methods'
  2015-11-18 16:29:04 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-0.4.0/lib/fluent/plugin/out_kinesis.rb:271:in `put_records_with_retry'
  2015-11-18 16:29:04 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-0.4.0/lib/fluent/plugin/out_kinesis.rb:144:in `block in write'
  2015-11-18 16:29:04 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-0.4.0/lib/fluent/plugin/out_kinesis.rb:143:in `each'
  2015-11-18 16:29:04 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-0.4.0/lib/fluent/plugin/out_kinesis.rb:143:in `write'
  2015-11-18 16:29:04 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.12/lib/fluent/buffer.rb:325:in `write_chunk'
  2015-11-18 16:29:04 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.12/lib/fluent/buffer.rb:304:in `pop'
  2015-11-18 16:29:04 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.12/lib/fluent/output.rb:321:in `try_flush'
  2015-11-18 16:29:04 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.12/lib/fluent/output.rb:140:in `run'
2015-11-18 16:29:05 +0000 [warn]: temporarily failed to flush the buffer. next_retry=2015-11-18 16:29:07 +0000 error_class="Aws::Kinesis::Errors::ValidationException" error="3 validation errors detected: Value '' at 'records.1.member.partitionKey' failed to satisfy constraint: Member must have length greater than or equal to 1; Value '' at 'records.2.member.partitionKey' failed to satisfy constraint: Member must have length greater than or equal to 1; Value '' at 'records.3.member.partitionKey' failed to satisfy constraint: Member must have length greater than or equal to 1" plugin_id="object:3f9730e90f14"
  2015-11-18 16:29:05 +0000 [warn]: suppressed same stacktrace
2015-11-18 16:29:07 +0000 [warn]: temporarily failed to flush the buffer. next_retry=2015-11-18 16:29:11 +0000 error_class="Aws::Kinesis::Errors::ValidationException" error="3 validation errors detected: Value '' at 'records.1.member.partitionKey' failed to satisfy constraint: Member must have length greater than or equal to 1; Value '' at 'records.2.member.partitionKey' failed to satisfy constraint: Member must have length greater than or equal to 1; Value '' at 'records.3.member.partitionKey' failed to satisfy constraint: Member must have length greater than or equal to 1" plugin_id="object:3f9730e90f14"
  2015-11-18 16:29:07 +0000 [warn]: suppressed same stacktrace
2015-11-18 16:29:11 +0000 [warn]: temporarily failed to flush the buffer. next_retry=2015-11-18 16:29:19 +0000 error_class="Aws::Kinesis::Errors::ValidationException" error="3 validation errors detected: Value '' at 'records.1.member.partitionKey' failed to satisfy constraint: Member must have length greater than or equal to 1; Value '' at 'records.2.member.partitionKey' failed to satisfy constraint: Member must have length greater than or equal to 1; Value '' at 'records.3.member.partitionKey' failed to satisfy constraint: Member must have length greater than or equal to 1" plugin_id="object:3f9730e90f14"
  2015-11-18 16:29:11 +0000 [warn]: suppressed same stacktrace

The log format being pushed from nginx is:


    log_format le_json '{ "time": "$time_iso8601", '
       '"remote_addr": "$remote_addr", '
       '"remote_user": "$remote_user", '
       '"body_bytes_sent": "$body_bytes_sent", '
       '"request_time": "$request_time", '
       '"status": "$status", '
       '"request": "$request", '
       '"request_method": "$request_method", '
       '"http_referrer": "$http_referer", '
       '"http_user_agent": "$http_user_agent", '
       '"http_x_forwarded_for": "$http_x_forwarded_for" }';

    access_log /var/log/nginx/access.json.log le_json;

and the ruby --version gives me ruby 1.9.3p551 (2014-11-13 revision 48407) [x86_64-linux]
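For context on the validation errors above: in this plugin, partition_key names a field of each record, and that field's value becomes the Kinesis partition key. Here `partition_key nginx` yields an empty key because the records have no "nginx" field. A sketch of a config using a field that does exist in the nginx JSON above (remote_addr is just one possible choice):

```
<match **>
  type kinesis
  stream_name teste-es
  region us-east-1
  # must name a field present in every record; remote_addr exists
  # in the nginx log_format shown above
  partition_key remote_addr
</match>
```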

KPL format aggregated logs

Here is a quick hack of this project that does KPL-format aggregated logs. I wrote it as a separate plugin rather than a patch, because format/emit end up being different again, there are already a lot of ifs for not much code, and aggregation was semi-rejected in #24. Nevertheless, maybe you want this as part of this library?

https://github.com/atlassian/fluent-plugin-kinesis-aggregation

Reason:
https://github.com/atlassian/fluent-plugin-kinesis-aggregation#before-you-start

The important part of the code:
https://github.com/atlassian/fluent-plugin-kinesis-aggregation/blob/master/lib/fluent/plugin/out_kinesis-aggregation.rb#L107
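For context, a KPL aggregated record is a protobuf-encoded AggregatedRecord message framed by a 4-byte magic number and trailed by an MD5 digest of the protobuf body. A minimal sketch of just the framing (the protobuf body here is a stub, not a real AggregatedRecord):

```ruby
require 'digest/md5'

# KPL aggregation container layout: magic bytes, then the protobuf-encoded
# AggregatedRecord message, then the MD5 digest of that protobuf body.
KPL_MAGIC = "\xF3\x89\x9A\xC2".b

def kpl_container(protobuf_body)
  KPL_MAGIC + protobuf_body + Digest::MD5.digest(protobuf_body)
end

# Stub body; real code would serialize an AggregatedRecord protobuf here.
body = "stub-protobuf-bytes".b
container = kpl_container(body)
container.bytesize  # 4 magic bytes + body + 16 digest bytes
```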

deploy version 1.0 plugin on system without internet

Hi All -

We're getting the below error message when we try to download the plugin on a temp server; the idea is to move those gems to the actual server, which has no internet connection.

It worked with version 0.4, but gives the below error message for version 1.0.

Question: should I just grab activesupport manually, or could this be something bigger than simply grabbing a gem manually? Please advise.

Step 1 grab td-agent

curl -O http://packages.treasuredata.com.s3.amazonaws.com/2/redhat/5/i386/td-agent-2.2.1-0.el5.i386.rpm

Step 2 Install td-agent
rpm -ivh td-agent-2.2.1-0.el5.i386.rpm

Step 3 grab all the td-agent-related gems

td-agent-gem install fluent-plugin-kinesis -i repo --no-rdoc --no-ri

While doing step 3, it failed as follows:

ERROR: Error installing fluent-plugin-kinesis:
activesupport requires Ruby version >= 2.2.2.

error_class="TypeError" error="can't convert StringIO into String" when sending record to Kinesis

Hi,

I am running into an error_class="TypeError" error="can't convert StringIO into String" failure, with the following trace:

2014-12-23 01:53:20 +0000 [warn]: plugin/out_kinesis.rb:119:block in write: {:stream_name=>"tm_logger", :data=>"{\"json\":\"nevin is a dict\",\"time\":\"2014-12-23T01:53:17Z\",\"tag\":\"kinesis.test\"}", :partition_key=>"7fcba520-4120-435a-af24-8e2b40dda171"}
2014-12-23 01:53:20 +0000 [warn]: fluent/output.rb:369:rescue in try_flush: failed to flush the buffer. error_class="TypeError" error="can't convert StringIO into String" plugin_id="object:13be4c0"
2014-12-23 01:53:20 +0000 [warn]: fluent/output.rb:370:rescue in try_flush: retry count exceededs limit.
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/1.9.1/base64.rb:65:in `pack'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/1.9.1/base64.rb:65:in `strict_encode64'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/aws-sdk-core-2.0.16/lib/aws-sdk-core/json/builder.rb:57:in `format'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/aws-sdk-core-2.0.16/lib/aws-sdk-core/json/builder.rb:24:in `block in structure'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/aws-sdk-core-2.0.16/lib/aws-sdk-core/json/builder.rb:20:in `each'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/aws-sdk-core-2.0.16/lib/aws-sdk-core/json/builder.rb:20:in `with_object'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/aws-sdk-core-2.0.16/lib/aws-sdk-core/json/builder.rb:20:in `structure'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/aws-sdk-core-2.0.16/lib/aws-sdk-core/json/builder.rb:53:in `format'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/aws-sdk-core-2.0.16/lib/aws-sdk-core/json/builder.rb:11:in `to_json'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/aws-sdk-core-2.0.16/lib/aws-sdk-core/json/rpc_body_handler.rb:20:in `build_json'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/aws-sdk-core-2.0.16/lib/aws-sdk-core/json/rpc_body_handler.rb:8:in `call'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/aws-sdk-core-2.0.16/lib/aws-sdk-core/json/rpc_headers_handler.rb:10:in `call'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/aws-sdk-core-2.0.16/lib/aws-sdk-core/plugins/user_agent.rb:12:in `call'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/aws-sdk-core-2.0.16/lib/seahorse/client/plugins/restful_bindings.rb:13:in `call'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/aws-sdk-core-2.0.16/lib/seahorse/client/plugins/endpoint.rb:35:in `call'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/aws-sdk-core-2.0.16/lib/seahorse/client/logging/handler.rb:10:in `call'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/aws-sdk-core-2.0.16/lib/seahorse/client/plugins/param_validation.rb:22:in `call'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/aws-sdk-core-2.0.16/lib/seahorse/client/plugins/raise_response_errors.rb:14:in `call'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/aws-sdk-core-2.0.16/lib/seahorse/client/plugins/param_conversion.rb:22:in `call'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/aws-sdk-core-2.0.16/lib/aws-sdk-core/plugins/response_paging.rb:10:in `call'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/aws-sdk-core-2.0.16/lib/seahorse/client/request.rb:70:in `send_request'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/aws-sdk-core-2.0.16/lib/seahorse/client/base.rb:215:in `block (2 levels) in define_operation_methods'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/fluent-plugin-kinesis-0.2.0/lib/fluent/plugin/out_kinesis.rb:120:in `block in write'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/fluentd-0.10.58/lib/fluent/plugin/buf_memory.rb:62:in `feed_each'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/fluentd-0.10.58/lib/fluent/plugin/buf_memory.rb:62:in `msgpack_each'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/fluent-plugin-kinesis-0.2.0/lib/fluent/plugin/out_kinesis.rb:98:in `write'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/fluentd-0.10.58/lib/fluent/buffer.rb:300:in `write_chunk'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/fluentd-0.10.58/lib/fluent/buffer.rb:280:in `pop'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/fluentd-0.10.58/lib/fluent/output.rb:315:in `try_flush'
  2014-12-23 01:53:20 +0000 [warn]: /home/fluent/.rbenv/versions/1.9.3-p551/lib/ruby/gems/1.9.1/gems/fluentd-0.10.58/lib/fluent/output.rb:136:in `run'

I added an extra log message at the top of the write(chunk) function in out_kinesis.rb, printing the data_to_put variable.

The problem was also described by someone else here: http://stackoverflow.com/questions/27578440/td-agent-is-not-not-forwarding-messages-to-kinesis-using-kinesis-plugin#_=_

Thanks!
Daniel Gollas

"kinesis_producer" results in 2x CPU consumption as "kinesis_streams"

Hi there,

I work at Twilio (a major AWS customer) and we're using fluentd & Kinesis as part of a log aggregation solution for our microservices.

I was very excited to see the 1.0 release of this plugin which added support for the C-based KPL log-shipper. Unfortunately in our testing, the KPL-based "kinesis_producer" output plugin seems to cause fluentd's "ruby2.0" process to consume >2x the CPU utilization vs using the "kinesis_streams" plugin. Ex: At 1,000 messages/second we see ~10% CPU usage with "kinesis_streams" and ~24% CPU% with "kinesis_producer".

Any thoughts/comments? Is this expected? Any thoughts on why the CPU consumption for the "ruby2.0" process with "kinesis_producer" would be so much higher?

Thanks,
Dan

More configuration for AssumeRole

The current assume_role_credentials parameter section supports only limited configuration for AssumeRole. For example, aws_key_id and aws_sec_key are not passed to the STS client, and the duration is fixed.

It would be better to have more flexibility in the AssumeRole configuration, but no issue has been reported so far. If there is any request for AssumeRole, please comment here.

aws-fluent-plugin-kinesis on aws-sdk-core-2.5.1

Hello amazing AWS team,

Happy Monday! I hope your week is off to a great start! We noticed there is a small issue with the new AWS Ruby SDK from last week: the variable region can't be resolved after 2.5.1 (https://github.com/aws/aws-sdk-ruby/blob/master/aws-sdk-core/lib/aws-sdk-core/credential_provider_chain.rb#L76). Wondering if this might be something we could resolve on the fluentd plugin's end?

2016-08-05 09:08:26 +0000 [warn]: temporarily failed to flush the buffer. next_retry=2016-08-05 09:08:27 +0000 error_class="NoMethodError" error="undefined method `region' for #<#<Class:0x007fb8401d0fb0>:0x007fb8401d0e70>" plugin_id="object:3fdc23fd232c"
2016-08-05 09:08:26 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.5.1/lib/aws-sdk-core/credential_provider_chain.rb:76:in `assume_role_credentials'
2016-08-05 09:08:26 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.5.1/lib/aws-sdk-core/credential_provider_chain.rb:12:in `block in resolve'
2016-08-05 09:08:26 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.5.1/lib/aws-sdk-core/credential_provider_chain.rb:11:in `each'
2016-08-05 09:08:26 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.5.1/lib/aws-sdk-core/credential_provider_chain.rb:11:in `resolve'
2016-08-05 09:08:26 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-1.1.0/lib/fluent/plugin/kinesis_helper/credentials.rb:43:in `default_credentials_provider'
2016-08-05 09:08:26 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-1.1.0/lib/fluent/plugin/kinesis_helper/credentials.rb:34:in `credentials'
2016-08-05 09:08:26 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-1.1.0/lib/fluent/plugin/kinesis_helper/api.rb:42:in `client_options'
2016-08-05 09:08:26 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-1.1.0/lib/fluent/plugin/kinesis_helper/client.rb:19:in `client'
2016-08-05 09:08:26 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-1.1.0/lib/fluent/plugin/out_kinesis_firehose.rb:38:in `batch_request'
2016-08-05 09:08:26 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-1.1.0/lib/fluent/plugin/kinesis_helper/api.rb:79:in `batch_request_with_retry'
2016-08-05 09:08:26 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-1.1.0/lib/fluent/plugin/out_kinesis_firehose.rb:26:in `block in write'
2016-08-05 09:08:26 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-1.1.0/lib/fluent/plugin/out_kinesis_firehose.rb:25:in `each'
2016-08-05 09:08:26 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-1.1.0/lib/fluent/plugin/out_kinesis_firehose.rb:25:in `write'
2016-08-05 09:08:26 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/buffer.rb:325:in `write_chunk'
2016-08-05 09:08:26 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/buffer.rb:304:in `pop'
2016-08-05 09:08:26 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/output.rb:321:in `try_flush'
2016-08-05 09:08:26 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/output.rb:140:in `run'

Urgent query on deploying the plugin without internet connection

Thanks for your outstanding help in other queries.

I've got a question about deploying your plugin on a source that has no internet connection...

Let me describe the situation: the source is in a totally closed area, so we need to download td-agent and your plugin on a separate box and then move the two to the source. But your plugin requires other packages, for example when running this:

./fluent-gem install fluent-plugin-kinesis

This will fetch some files (do these need to be manually downloaded?):
Fetching: jmespath-1.1.3.gem
Fetching: aws-sdk-core-2.2.20.gem
Fetching: fluent-plugin-kinesis-0.4.0.gem

So our understanding is that we need to manually download the files and build the plugin before moving them to the source. So we did the below:

curl -O https://rubygems.org/gems/aws-sdk-core/versions/2.2.20

and then ran td-agent-gem with 2.2.20, but it didn't work.

So you see we are in dependency hell. What would be your recommendation for building the plugin as one package so we can deploy it to the source (which doesn't have an internet connection)?

Revisit backoff logic

Was looking through the backoff logic after we hit some "ProvisionedThroughputExceededException" errors. We see the following issues:

From:

def calc(count)
  (2 ** count) * scaling_factor
end

def scaling_factor
  0.3 + (0.5 - rand) * 0.1
end

  • scaling_factor is always going to be a number between 0.25 and 0.35. That is not a very wide spread.
  • Given that the default number of retries is 3 and the scaling factor is relatively narrow, calc(count) for counts {0,1,2} is going to return backoffs of approximately 0.3, 0.6, and 1.2 seconds, respectively, about 2 seconds in total. Therefore a temporary (~5 second) spike in traffic could cause records to be dropped via a ProvisionedThroughputExceededException with the default configuration.

We're getting around this by setting retries_on_batch_request to 7 in order to give ourselves 30+ seconds of retries, but I think that one could reasonably argue that the defaults here for backoffs & retries are not great.
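The quoted functions make it easy to compute the implied schedule. A quick sketch (using the 0.3 midpoint for the random scaling factor):

```ruby
# Sleep before retry i is (2 ** i) * scaling_factor, where scaling_factor
# varies between 0.25 and 0.35; 0.3 is used here as the midpoint.
def backoff_schedule(retries, factor = 0.3)
  (0...retries).map { |i| (2 ** i) * factor }
end

default = backoff_schedule(3)  # => [0.3, 0.6, 1.2], roughly 2 seconds in total
longer  = backoff_schedule(7)  # total is about 38 seconds, hence the 30+ seconds above
```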

Plugin incompatible with new fluentd release 0.14

The kinesis plugin is not compatible with the new release of fluentd (0.14).
Startup fails with the following message:

2016-06-01 09:37:04 +0000 [error]: unexpected error error="uninitialized constant Fluent::DetachMultiProcessMixin"
2016-06-01 09:37:04 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-1.0.1/lib/fluent/plugin/kinesis_helper.rb:23:in `module:KinesisHelper'

The use of detach_process in an output plugin was called out as bad practice by fluentd developer tagomoris and will be deprecated in a future release:

output plugins using detach_process looks very curious from viewpoint of fluentd core developer...

Cannot handle record in UTF-8 format

When a record is in UTF-8 format, the plugin raises Encoding::UndefinedConversionError. The following is the td-agent log:

  2015-01-16 07:01:06 +0000 [warn]: emit transaction failed  error_class=Encoding::UndefinedConversionError error=#<Encoding::UndefinedConversionError: "\xE5" from ASCII-8BIT to UTF-8>
    2015-01-16 07:01:06 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-0.3.0/lib/fluent/plugin/out_kinesis.rb:100:in `encode'
    2015-01-16 07:01:06 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-0.3.0/lib/fluent/plugin/out_kinesis.rb:100:in `to_json'
    2015-01-16 07:01:06 +0000 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-0.3.0/lib/fluent/plugin/out_kinesis.rb:100:in `format'
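The failure can be reproduced outside fluentd. Tailed bytes arrive tagged as ASCII-8BIT (binary) even when they are really UTF-8, and transcoding binary to UTF-8, which serialization effectively attempts, raises exactly this error. A hedged sketch, not the plugin's actual code path:

```ruby
require 'json'

# A tailed log line arrives tagged ASCII-8BIT even when the bytes are UTF-8.
raw = "\xE5\xAF\xBF".b  # the UTF-8 bytes of "寿", tagged as binary

# Transcoding binary to UTF-8 (effectively what to_json attempts) fails:
error = begin
  raw.encode(Encoding::UTF_8)
  nil
rescue Encoding::UndefinedConversionError => e
  e  # "\xE5" from ASCII-8BIT to UTF-8, as in the log above
end

# Hedged workaround: relabel the bytes as UTF-8 (no transcoding) before
# serializing, assuming the input really is UTF-8.
json = { 'message' => raw.force_encoding(Encoding::UTF_8) }.to_json
```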

cloudwatch

Great to see there is heaps of CloudWatch support.

Correct me if I'm wrong: is it only available for the producer option, not the kinesis_streams type?

Metrics (buffer_queue_length and buffer_total_queued_size) not available with detached processes

Fluentd metrics are useful for checking for contention in output plugin buffers.
buffer_queue_length and buffer_total_queued_size can be checked against maximum values to ensure that no buffer chunk is about to be dropped.

When using detach_process, the metrics are no longer available. Is it possible to aggregate metrics across all detached processes?

Detaching processes may be the only way to ensure high throughput, because increasing only the number of threads is not enough: 1 process with 50 threads is slower than 5 processes with 10 threads each.

error_class=Encoding::UndefinedConversionError error="\"\\xC2\" from ASCII-8BIT to UTF-8"

Hello,

I'm having an issue only when I'm using the kinesis plugin with my logs; I'm seeing messages like error="\"\xC2\" from ASCII-8BIT to UTF-8" in my logs.

<source>
  force_encoding('UTF-8')
  type  tail
  format  json
  path  /to/some/file
  tag  somelog
  pos_file /some/position/file.whatever
</source>

<match **>
  type kinesis
  random_partition_key
  aws_key_id xxxxxx
  aws_sec_key xxxxxxxx
  region us-east-1
  stream_name xxxxx
  flush_interval 
</match>

ruby 2.2.3p173 (2015-08-18 revision 51636) [x86_64-linux]
aws-sdk-core (2.1.28)
bigdecimal (1.2.6)
cool.io (1.4.1)
fluent-plugin-kinesis (0.3.6)
fluentd (0.12.16)
http_parser.rb (0.6.0)
io-console (0.4.3)
jmespath (1.1.3)
json (1.8.1)
minitest (5.4.3)
msgpack (0.5.12)
multi_json (1.11.2)
power_assert (0.2.2)
psych (2.0.8)
rake (10.4.2)
rdoc (4.2.0)
sigdump (0.2.3)
string-scrub (0.0.5)
test-unit (3.0.8)
thread_safe (0.3.5)
tzinfo (1.2.2)
tzinfo-data (1.2015.7)
yajl-ruby (1.2.1)

any help or suggestions would be appreciated.

SSL issue

When we use a proxy, this is what will happen, right?

fluent-plugin-kinesis writes to our proxy in cleartext (no SSL) and then the proxy sends it on to Kinesis via SSL, correct?

So do we need to handle SSL from the proxy server to the AWS Kinesis regional endpoint?

Parallelism and throughput question

I set up fluentd to ingest nginx logs into Kinesis using this plugin and I am trying to configure it to keep up with the throughput. I have around 30-40 log events per second, which seems reasonable.

I have the Kinesis stream set for 2 shards, which should be enough and the Amazon monitoring does not show any issues (write capacity and read capacity usage are reading near zero).

I set up two EC2 xlarge machines. The nginx processes and disk usage have absolutely no problems keeping up; nginx logs show in real time. Each machine has td-agent running. Tweaking this seems to be where the problem is. I have tried detach_process 1 through 4 and all kinds of values for num_threads. I also tried setting random_partition_key true.

No matter what I try the CPU is not saturated and the ruby processes can't seem to keep up. During higher loads (around 40 events per second, or 20 per second per machine) it falls to 20 or 30 minutes behind.

Are there any recommended settings tweaks that I can try, or possibly more machines for network throughput, or is this expected for this load to kinesis?
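For reference, the knobs mentioned in this thread all live in the match section. A purely illustrative starting point (the values here are hypothetical, not recommendations):

```
<match nginx.**>
  type kinesis
  stream_name YOUR_STREAM
  region us-east-1
  random_partition_key true
  # parallelism knobs discussed in this thread
  detach_process 2
  num_threads 15
  # flush smaller chunks more often
  buffer_chunk_limit 1m
  flush_interval 1s
</match>
```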

fluentd agent and plugin

Hi,

Referring to your previous answer around supporting http_proxy.

There might be some confusion here, so let me clarify.

First, our assumption was that aws-fluent-plugin-kinesis is based on the KPL; that's why we asked whether this http_proxy is part of the KPL code.

Second, let me tell you how we think of deploying -

  1. fluentd agent to collect data
  2. and your plugin to send data to kinesis stream
  3. And we'll have a proxy in between.

So the first question is: if we use your plugin, is the proxy supported? And the second question is: is your plugin based on the KPL, or is it actually written on the AWS SDK for Ruby? If it is based on the KPL, do we need both Java (1.7) and a Ruby interpreter on the source system for your agent to run?

Please let us know.

Thanks.

Removing message and time

Hi,

How can I remove the message and time fields below, keeping only the JSON {"transactionId":"221620499",....} in the message forwarded to Kinesis Firehose?

{
"message": "\r{"transactionId":"221620499",....}",
"time": "2016-07-20T10:58:50-04:00"
}

Thanks

Unclear error messages being logged: could it be more detailed?

Hi there,
we have recently observed a relatively high number of stack traces like the following one on our fluent-kinesis agents:

2015-07-31 13:00:08 +0100 [warn]: temporarily failed to flush the buffer. next_retry=2015-07-31 13:00:03 +0100 error_class="Aws::Kinesis::Errors::InternalFailure" error="" plugin_id="object:156192f6c0d0"
2015-07-31 13:00:08 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/seahorse/client/plugins/raise_response_errors.rb:15:in `call' 
2015-07-31 13:00:08 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/seahorse/client/plugins/param_conversion.rb:22:in `call' 
2015-07-31 13:00:08 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/aws-sdk-core/plugins/response_paging.rb:10:in `call' 
2015-07-31 13:00:08 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/seahorse/client/plugins/response_target.rb:18:in `call' 
2015-07-31 13:00:08 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/seahorse/client/request.rb:70:in `send_request' 
2015-07-31 13:00:08 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/seahorse/client/base.rb:216:in `block (2 levels) in define_operation_methods' 
2015-07-31 13:00:08 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-0.3.1.2/lib/fluent/plugin/out_kinesis.rb:232:in `put_records_with_retry' 
2015-07-31 13:00:08 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-0.3.1.2/lib/fluent/plugin/out_kinesis.rb:249:in `put_records_with_retry' 
2015-07-31 13:00:08 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-0.3.1.2/lib/fluent/plugin/out_kinesis.rb:128:in `block in write' 
2015-07-31 13:00:08 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-0.3.1.2/lib/fluent/plugin/out_kinesis.rb:127:in `each' 
2015-07-31 13:00:08 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-0.3.1.2/lib/fluent/plugin/out_kinesis.rb:127:in `write' 
2015-07-31 13:00:08 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/buffer.rb:325:in `write_chunk' 
2015-07-31 13:00:08 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/buffer.rb:304:in `pop' 
2015-07-31 13:00:08 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/output.rb:321:in `try_flush' 
2015-07-31 13:00:08 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/output.rb:140:in `run'

We have seen this happening around the same time as other errors occurred, most likely related to a DNS issue. Here is that other stack trace:

2015-08-12 17:58:05 +0100 [warn]: temporarily failed to flush the buffer. next_retry=2015-08-12 17:58:03 +0100 error_class="Seahorse::Client::NetworkingError" error="getaddrinfo: Temporary failure in name resolution" plugin_id="object:1581b1d6e0dc" 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/2.1.0/net/http.rb:879:in `initialize'
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/2.1.0/net/http.rb:879:in `open'
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/2.1.0/net/http.rb:879:in `block in connect' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/2.1.0/timeout.rb:91:in `block in timeout' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/2.1.0/timeout.rb:101:in `call'
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/2.1.0/timeout.rb:101:in `timeout'
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/2.1.0/net/http.rb:878:in `connect'
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/2.1.0/net/http.rb:863:in `do_start'
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/2.1.0/net/http.rb:858:in `start'
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/seahorse/client/net_http/connection_pool.rb:292:in `start_session' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/seahorse/client/net_http/connection_pool.rb:104:in `session_for' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/seahorse/client/net_http/handler.rb:108:in `session' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/seahorse/client/net_http/handler.rb:60:in `transmit' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/seahorse/client/net_http/handler.rb:34:in `call' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/seahorse/client/plugins/content_length.rb:12:in `call' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/aws-sdk-core/json/error_handler.rb:8:in `call' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/aws-sdk-core/plugins/request_signer.rb:78:in `call' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/aws-sdk-core/plugins/retry_errors.rb:88:in `call' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/aws-sdk-core/plugins/retry_errors.rb:119:in `retry_request' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/aws-sdk-core/plugins/retry_errors.rb:102:in `retry_if_possible' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/aws-sdk-core/plugins/retry_errors.rb:90:in `call' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/aws-sdk-core/plugins/retry_errors.rb:119:in `retry_request' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/aws-sdk-core/plugins/retry_errors.rb:102:in `retry_if_possible' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/aws-sdk-core/plugins/retry_errors.rb:90:in `call' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/aws-sdk-core/plugins/retry_errors.rb:119:in `retry_request' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/aws-sdk-core/plugins/retry_errors.rb:102:in `retry_if_possible' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/aws-sdk-core/plugins/retry_errors.rb:90:in `call' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/aws-sdk-core/json/rpc_body_handler.rb:9:in `call' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/aws-sdk-core/json/rpc_headers_handler.rb:10:in `call' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/aws-sdk-core/plugins/user_agent.rb:12:in `call' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/seahorse/client/plugins/restful_bindings.rb:13:in `call' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/seahorse/client/plugins/endpoint.rb:31:in `call' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/seahorse/client/plugins/param_validation.rb:22:in `call' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/seahorse/client/plugins/raise_response_errors.rb:14:in `call' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/seahorse/client/plugins/param_conversion.rb:22:in `call' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/aws-sdk-core/plugins/response_paging.rb:10:in `call' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/seahorse/client/plugins/response_target.rb:18:in `call' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/seahorse/client/request.rb:70:in `send_request' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/aws-sdk-core-2.0.45/lib/seahorse/client/base.rb:216:in `block (2 levels) in define_operation_methods' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-0.3.1.2/lib/fluent/plugin/out_kinesis.rb:232:in `put_records_with_retry' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-0.3.1.2/lib/fluent/plugin/out_kinesis.rb:128:in `block in write' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-0.3.1.2/lib/fluent/plugin/out_kinesis.rb:127:in `each' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-0.3.1.2/lib/fluent/plugin/out_kinesis.rb:127:in `write' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/buffer.rb:325:in `write_chunk' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/buffer.rb:304:in `pop' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/output.rb:321:in `try_flush' 
2015-08-12 17:58:05 +0100 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.7/lib/fluent/output.rb:140:in `run'

It would be nice from an end-user perspective to have a better sense of why the agent gave up.

Thanks in advance!

Using KMS encrypted secret key

We have the Kinesis stream that we are writing to in a different account than the clients, and given that the aws-sdk doesn't support using the full ARN for the stream name, we have to distribute access and secret keys instead of IAM roles, which is less than ideal. Would you be open to a feature that allows passing a KMS-encrypted secret key using a different variable name, e.g. kms_aws_sec_key? We could use an IAM role policy to decrypt the key and thus avoid leaking the secret. I'd be happy to contribute a PR for that.

Data is base64 encoded twice

The data put to Kinesis seems to be base64 encoded twice. When using https://github.com/awslabs/amazon-kinesis-client-python to get data from a Kinesis stream put by this plugin, I need to use the following Python code to get my JSON data correctly.

data = base64.b64decode(base64.b64decode(record.get('data')))

If I use https://github.com/boto/boto to put data into Kinesis, the following code is sufficient to get my JSON data correctly.

data = base64.b64decode(record.get('data'))

The following is content of td-agent.conf

<match extracted.*.*>
  type kinesis

  stream_name <censored>
  aws_key_id <censored>
  aws_sec_key <censored>
  region <censored>

  random_partition_key true
</match>
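The symptom is easy to reproduce in isolation (a hedged illustration of double encoding, not the plugin's actual code path):

```ruby
require 'base64'

payload = '{"key":"value"}'

# If the record is base64-encoded once by the producer and once more in
# transit, the consumer has to peel off two layers to recover the JSON:
double = Base64.strict_encode64(Base64.strict_encode64(payload))

once  = Base64.strict_decode64(double)  # still Base64 text, not JSON yet
twice = Base64.strict_decode64(once)    # the original JSON payload
```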

Consider removing kinesis connection from 'start'

If Kinesis is for some reason inaccessible when this plugin starts, the whole fluentd process crashes (exceptions thrown in start aren't handled, it seems).

In my opinion this is not ideal behaviour, both because there might be other outputs that should keep functioning in the absence of Kinesis, and because if Kinesis comes back, fluentd should have been buffering in the meantime.

Consider, for instance, the fluentd elasticsearch plugin's approach: https://github.com/uken/fluent-plugin-elasticsearch/blob/master/lib/fluent/plugin/out_elasticsearch.rb
(nothing is done in 'start')

What do you think?
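A minimal, hypothetical sketch of the lazy-initialization pattern this suggests (class and method names are illustrative, not the plugin's):

```ruby
# Defer client construction from #start to first use, so a Kinesis outage at
# boot surfaces as a retriable flush error instead of crashing fluentd.
class LazyClientOutput
  def initialize(&factory)
    @factory = factory  # e.g. -> { Aws::Kinesis::Client.new(region: region) }
    @client  = nil
  end

  def start
    # Intentionally no network calls here.
  end

  # Memoized: the connection is only established on the first flush, where
  # fluentd's buffering and retry machinery can handle failures.
  def client
    @client ||= @factory.call
  end
end
```

With this shape, `start` succeeds even while Kinesis is down, and the first write that calls `client` can fail and be retried from the buffer.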

Support for batching messages into a single kinesis record

Hi

It would be cool to be able to batch a number of log messages per Kinesis put record in some way, since Kinesis supports up to a 1 MB payload and a single log message can easily be under 1 KB. It seems wasteful on high-throughput systems to use one Kinesis record per log message.

If you create such a feature, it could be provided by a config option and limited by number of log messages and/or size, with a minimum send timeout.

Formatting of the data blob would be an issue here (e.g. multiple JSON records per message). Perhaps there could also be a config option for this: whether it goes into a bigger JSON object (records unordered), a JSON array, or simply the same format but line-separated.

Thanks
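The line-separated variant suggested above could be sketched roughly as follows (the 1 MB limit and the packing policy are illustrative assumptions):

```ruby
# Hedged sketch of the proposal: pack newline-separated log messages into
# as few Kinesis records as possible without exceeding the payload limit.
MAX_RECORD_BYTES = 1 * 1024 * 1024  # 1 MB, per the issue above

def pack_messages(messages, limit = MAX_RECORD_BYTES)
  records = []
  current = +''
  messages.each do |msg|
    line = msg + "\n"
    # Start a new record when the next line would overflow the current one.
    if !current.empty? && current.bytesize + line.bytesize > limit
      records << current
      current = +''
    end
    current << line
  end
  records << current unless current.empty?
  records
end
```

An oversized single message still becomes its own record here; a real implementation would also need the minimum send timeout mentioned above.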

Fluentd Conflict

We have recently encountered this error:

"Unable to activate fluent-plugin-kinesis-0.3.0, because fluentd-0.12.2 conflicts with fluentd (< 0.11, >= 0.10.53) (Gem::LoadError),"

Is there any reason why the plugin is only compatible with fluentd versions below 0.11? Please advise.

Thanks.

[1.0RC] kinesis_producer plugin causes an error on td-agent

The kinesis_producer plugin causes an error on td-agent.
It looks like it is related to @kinesis_producer.credentials_refresh_delay.

  • Ubuntu Server 14.04 Trusty
  • td-agent 2.3.1

install instruction

$ curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-trusty-td-agent2.sh | sh
$ sudo td-agent-gem install fluent-plugin-kinesis --pre

full td-agent.log

2016-03-10 06:56:47 +0000 [info]: reading config file path="/etc/td-agent/td-agent.conf"
2016-03-10 06:56:47 +0000 [info]: starting fluentd-0.12.20
2016-03-10 06:56:47 +0000 [info]: gem 'fluent-mixin-config-placeholders' version '0.3.1'
2016-03-10 06:56:47 +0000 [info]: gem 'fluent-mixin-plaintextformatter' version '0.2.6'
2016-03-10 06:56:47 +0000 [info]: gem 'fluent-plugin-kinesis' version '1.0.0.rc3'
2016-03-10 06:56:47 +0000 [info]: gem 'fluent-plugin-mongo' version '0.7.12'
2016-03-10 06:56:47 +0000 [info]: gem 'fluent-plugin-rewrite-tag-filter' version '1.5.4'
2016-03-10 06:56:47 +0000 [info]: gem 'fluent-plugin-s3' version '0.6.5'
2016-03-10 06:56:47 +0000 [info]: gem 'fluent-plugin-scribe' version '0.10.14'
2016-03-10 06:56:47 +0000 [info]: gem 'fluent-plugin-td' version '0.10.28'
2016-03-10 06:56:47 +0000 [info]: gem 'fluent-plugin-td-monitoring' version '0.2.2'
2016-03-10 06:56:47 +0000 [info]: gem 'fluent-plugin-webhdfs' version '0.4.1'
2016-03-10 06:56:47 +0000 [info]: gem 'fluentd' version '0.12.20'
2016-03-10 06:56:47 +0000 [info]: adding match pattern="td.*.*" type="tdlog"
2016-03-10 06:56:47 +0000 [info]: adding match pattern="debug.**" type="stdout"
2016-03-10 06:56:47 +0000 [info]: adding match pattern="**" type="kinesis_producer"
2016-03-10 06:56:47 +0000 [info]: adding source type="forward"
2016-03-10 06:56:47 +0000 [info]: adding source type="http"
2016-03-10 06:56:47 +0000 [info]: adding source type="debug_agent"
2016-03-10 06:56:47 +0000 [info]: adding source type="tail"
2016-03-10 06:56:47 +0000 [info]: using configuration file: <ROOT>
  <match td.*.*>
    type tdlog
    apikey xxxxxx
    auto_create_table
    buffer_type file
    buffer_path /var/log/td-agent/buffer/td
    <secondary>
      type file
      path /var/log/td-agent/failed_records
      buffer_path /var/log/td-agent/failed_records.*
    </secondary>
  </match>
  <match debug.**>
    type stdout
  </match>
  <source>
    type forward
  </source>
  <source>
    type http
    port 8888
  </source>
  <source>
    type debug_agent
    bind 127.0.0.1
    port 24230
  </source>
  <source>
    @type tail
    path /var/log/apache2/access.log
    pos_file /var/log/td-agent/httpd-access.log.pos
    tag apache.access
    format apache2
  </source>
  <match **>
    @type kinesis_producer
    region us-west-2
    stream_name takipone-test
  </match>
</ROOT>
2016-03-10 06:56:47 +0000 [error]: unexpected error error_class=NoMethodError error=#<NoMethodError: undefined method `credentials_refresh_delay' for nil:NilClass>
  2016-03-10 06:56:47 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-1.0.0.rc3/lib/fluent/plugin/kinesis_helper/kpl.rb:44:in `client_options'
  2016-03-10 06:56:47 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-1.0.0.rc3/lib/fluent/plugin/kinesis_helper/client.rb:19:in `client'
  2016-03-10 06:56:47 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-1.0.0.rc3/lib/fluent/plugin/kinesis_helper/kpl.rb:29:in `block in start'
  2016-03-10 06:56:47 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/process.rb:485:in `call'
  2016-03-10 06:56:47 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/process.rb:485:in `detach_multi_process'
  2016-03-10 06:56:47 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluent-plugin-kinesis-1.0.0.rc3/lib/fluent/plugin/kinesis_helper/kpl.rb:27:in `start'
  2016-03-10 06:56:47 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/agent.rb:67:in `block in start'
  2016-03-10 06:56:47 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/agent.rb:66:in `each'
  2016-03-10 06:56:47 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/agent.rb:66:in `start'
  2016-03-10 06:56:47 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/root_agent.rb:104:in `start'
  2016-03-10 06:56:47 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/engine.rb:225:in `start'
  2016-03-10 06:56:47 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/engine.rb:175:in `run'
  2016-03-10 06:56:47 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/supervisor.rb:597:in `run_engine'
  2016-03-10 06:56:47 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/supervisor.rb:148:in `block in start'
  2016-03-10 06:56:47 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/supervisor.rb:352:in `call'
  2016-03-10 06:56:47 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/supervisor.rb:352:in `main_process'
  2016-03-10 06:56:47 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/supervisor.rb:325:in `block in supervise'
  2016-03-10 06:56:47 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/supervisor.rb:324:in `fork'
  2016-03-10 06:56:47 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/supervisor.rb:324:in `supervise'
  2016-03-10 06:56:47 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/supervisor.rb:142:in `start'
  2016-03-10 06:56:47 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/lib/fluent/command/fluentd.rb:171:in `<top (required)>'
  2016-03-10 06:56:47 +0000 [error]: /opt/td-agent/embedded/lib/ruby/site_ruby/2.1.0/rubygems/core_ext/kernel_require.rb:54:in `require'
  2016-03-10 06:56:47 +0000 [error]: /opt/td-agent/embedded/lib/ruby/site_ruby/2.1.0/rubygems/core_ext/kernel_require.rb:54:in `require'
  2016-03-10 06:56:47 +0000 [error]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.20/bin/fluentd:6:in `<top (required)>'
  2016-03-10 06:56:47 +0000 [error]: /opt/td-agent/embedded/bin/fluentd:23:in `load'
  2016-03-10 06:56:47 +0000 [error]: /opt/td-agent/embedded/bin/fluentd:23:in `<top (required)>'
  2016-03-10 06:56:47 +0000 [error]: /usr/sbin/td-agent:7:in `load'
  2016-03-10 06:56:47 +0000 [error]: /usr/sbin/td-agent:7:in `<main>'
2016-03-10 06:56:47 +0000 [info]: shutting down fluentd
2016-03-10 06:56:47 +0000 [info]: shutting down output type="tdlog" plugin_id="object:3fca814a9558"
2016-03-10 06:56:47 +0000 [info]: shutting down output type="stdout" plugin_id="object:3fca83e3ee18"
2016-03-10 06:56:47 +0000 [info]: process finished code=0
2016-03-10 06:56:47 +0000 [warn]: process died within 1 second. exit.

KPL records have the same partition key

I use the kinesis_producer type without setting partition_key.
Non-aggregated records get a random partition key, but the aggregated records all have the same partition key "a".
Is this behavior correct?
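If the fixed key is not wanted, an explicit partition_key can be set in the match section. This is a sketch that assumes the v1 kinesis_producer type accepts the same partition_key option documented for kinesis_streams; key is a hypothetical field name in the event record:

```
<match *>
  @type kinesis_producer
  region us-east-1
  stream_name YOUR_STREAM
  partition_key key  # derive the partition key from the event's "key" field
</match>
```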

Here is an excerpt of the output of aws kinesis get-records:

        {
            "Data": "eyJob3N0IjoiMTI3LjAuMC4xIiwidXNlciI6bnVsbCwibWV0aG9kIjoiR0VUIiwicGF0aCI6Ii8iLCJjb2RlIjoyMDAsInNpemUiOjExNzY0LCJyZWZlcmVyIjpudWxsLCJhZ2VudCI6ImN1cmwvNy4zNS4wIn0=",
            "PartitionKey": "a11392987a071f957ab3505bfc25cace",
            "ApproximateArrivalTimestamp": 1458397403.342,
            "SequenceNumber": "49559948046087405716686625491085803253240564075691245570"
        },
        {
            "Data": "84mawgogNDJmZTg3OWMzOGQ2YmQ5YjM4ZTAyODk5Y2IwZDBhOTAKIDRlMzQ2NDFjYmEwZGFkOGZlZGFiMGNiN2Y3NzVlMjRiCiBmZDg0NGRkZTUyNWI3OTVlYjA3NGIyNmY4ZTlhMzRhOBp/CAAae3siaG9zdCI6IjEyNy4wLjAuMSIsInVzZXIiOm51bGwsIm1ldGhvZCI6IkdFVCIsInBhdGgiOiIvIiwiY29kZSI6MjAwLCJzaXplIjoxMTc4MywicmVmZXJlciI6bnVsbCwiYWdlbnQiOiJBcGFjaGVCZW5jaC8yLjMifRp/CAEae3siaG9zdCI6IjEyNy4wLjAuMSIsInVzZXIiOm51bGwsIm1ldGhvZCI6IkdFVCIsInBhdGgiOiIvIiwiY29kZSI6MjAwLCJzaXplIjoxMTc4MywicmVmZXJlciI6bnVsbCwiYWdlbnQiOiJBcGFjaGVCZW5jaC8yLjMifRp/CAIae3siaG9zdCI6IjEyNy4wLjAuMSIsInVzZXIiOm51bGwsIm1ldGhvZCI6IkdFVCIsInBhdGgiOiIvIiwiY29kZSI6MjAwLCJzaXplIjoxMTc4MywicmVmZXJlciI6bnVsbCwiYWdlbnQiOiJBcGFjaGVCZW5jaC8yLjMifWI6nYO/8w3iHbNp9OPICuY=",
            "PartitionKey": "a",
            "ApproximateArrivalTimestamp": 1458397523.517,
            "SequenceNumber": "49559948046087405716686625493830064863765780548611473410"
        },
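As a side note for anyone inspecting such records: the Data field of the non-aggregated record is simply the Base64-encoded event JSON, while the aggregated record (the one with "PartitionKey": "a") is in the KPL aggregated protobuf format and needs a deaggregator (for example, the libraries in the awslabs kinesis-aggregation repository) to unpack. A quick sketch of decoding the plain record from the excerpt above:

```ruby
require "base64"
require "json"

# Data field of the non-aggregated record in the get-records output above
data = "eyJob3N0IjoiMTI3LjAuMC4xIiwidXNlciI6bnVsbCwibWV0aG9kIjoiR0VUIiwicGF0aCI6Ii8iLCJjb2RlIjoyMDAsInNpemUiOjExNzY0LCJyZWZlcmVyIjpudWxsLCJhZ2VudCI6ImN1cmwvNy4zNS4wIn0="

# Base64-decode and parse the JSON event
record = JSON.parse(Base64.decode64(data))
puts record["agent"]  # => "curl/7.35.0"
```

Running the same decode on the "a"-keyed record yields protobuf bytes, not JSON, which is how you can tell it is a KPL aggregate.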
