
logstash-input-sqs's Introduction

Logstash Plugin


This is a plugin for Logstash.

It is fully free and fully open source. The license is Apache 2.0, meaning you are pretty much free to use it however you want in whatever way.

Documentation

Logstash provides infrastructure to automatically generate documentation for this plugin. We use the asciidoc format to write documentation, so any comments in the source code will first be converted into asciidoc and then into html. All plugin documentation is placed under one central location.

Need Help?

Need help? Try #logstash on freenode IRC or the https://discuss.elastic.co/c/logstash discussion forum.

Developing

1. Plugin Development and Testing

Code

  • To get started, you'll need JRuby with the Bundler gem installed.

  • Create a new plugin or clone an existing one from the GitHub logstash-plugins organization. We also provide example plugins.

  • Install dependencies

bundle install

Test

  • Update your dependencies
bundle install
  • Run tests
bundle exec rspec

2. Running your unpublished Plugin in Logstash

2.1 Run in a local Logstash clone

  • Edit Logstash Gemfile and add the local plugin path, for example:
gem "logstash-filter-awesome", :path => "/your/local/logstash-filter-awesome"
  • Install plugin
# Logstash 2.3 and higher
bin/logstash-plugin install --no-verify

# Prior to Logstash 2.3
bin/plugin install --no-verify
  • Run Logstash with your plugin
bin/logstash -e 'filter {awesome {}}'

At this point any modifications to the plugin code will be applied to this local Logstash setup. After modifying the plugin, simply rerun Logstash.

2.2 Run in an installed Logstash

You can use the same 2.1 method to run your plugin in an installed Logstash by editing its Gemfile and pointing the :path to your local plugin development directory, or you can build the gem and install it using:

  • Build your plugin gem
gem build logstash-filter-awesome.gemspec
  • Install the plugin from the Logstash home
# Logstash 2.3 and higher
bin/logstash-plugin install --no-verify

# Prior to Logstash 2.3
bin/plugin install --no-verify
  • Start Logstash and proceed to test the plugin

Contributing

All contributions are welcome: ideas, patches, documentation, bug reports, complaints, and even something you drew up on a napkin.

Programming is not a required skill. Whatever you've seen about open source and maintainers or community members saying "send patches or die" - you will not see that here.

It is more important to the community that you are able to contribute.

For more information about contributing, see the CONTRIBUTING file.


logstash-input-sqs's Issues

plugin fails in case of (intermittent) DNS outage

On 7.6.2, a (temporary) DNS outage on AWS's side might lead to a Logstash shutdown as the exception propagates up:

[2020-11-13T10:02:23,947][ERROR][org.logstash.execution.WorkerLoop][main] Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.
org.jruby.exceptions.StandardError: (NetworkingError) Failed to open TCP connection to sqs.us-east-2.amazonaws.com:443 (initialize: name or service not known)
warning: thread "[main]>worker0" terminated with exception (report_on_exception is true):
java.lang.IllegalStateException: org.jruby.exceptions.StandardError: (NetworkingError) Failed to open TCP connection to sqs.us-east-2.amazonaws.com:443 (initialize: name or service not known)
        at org.logstash.execution.WorkerLoop.run(org/logstash/execution/WorkerLoop.java:85)
        at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:498)
        at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:441)
        at org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:305)
        at mnt.c.bash.logstash_minus_7_dot_6_dot_2.logstash_minus_core.lib.logstash.java_pipeline.start_workers(/mnt/c/bash/logstash-7.6.2/logstash-core/lib/logstash/java_pipeline.rb:262)

the wrapped error is expected to be a SocketError with message initialize: name or service not known
(coming from org/jruby/ext/socket/RubyTCPSocket.java's initialize method)

the plugin should handle these kinds of networking errors with a retry

essentially the same issue as #55 but I've reported a duplicate to make the trace easier to locate.
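A minimal sketch of the retry behaviour being asked for, with NetworkingError as a local stand-in for Seahorse::Client::NetworkingError; the method name, base sleep, and retry cap are illustrative, not the plugin's actual code:

```ruby
# Stand-in for Seahorse::Client::NetworkingError
class NetworkingError < StandardError; end

BACKOFF_BASE_SLEEP = 0.01 # seconds; a real plugin would use a larger base
BACKOFF_MAX_EXP    = 4    # cap the exponent so sleeps stay bounded

# Retry the given block with exponential backoff on networking errors
# instead of letting the exception propagate and kill the pipeline.
def run_with_networking_backoff(max_retries: 5)
  attempts = 0
  begin
    yield
  rescue NetworkingError
    attempts += 1
    raise if attempts > max_retries
    sleep(BACKOFF_BASE_SLEEP * (2 ** [attempts, BACKOFF_MAX_EXP].min))
    retry
  end
end

# Simulate a transient DNS outage: fail twice, then succeed.
calls  = 0
result = run_with_networking_backoff do
  calls += 1
  raise NetworkingError, "name or service not known" if calls < 3
  :polled
end
```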

Setting MAX_MESSAGES_TO_FETCH to 1 causes undefined method error

While troubleshooting why not all of my SQS messages were being processed, I tried setting MAX_MESSAGES_TO_FETCH to 1 and got the following.

{:timestamp=>"2015-10-14T23:20:06.700000+0000", :message=>"A plugin had an unrecoverable error. Will restart this plugin.\n Plugin: <LogStash::Inputs::SQS_S3 access_key_id=>"###", secret_access_key=>"###", queue=>"###", bucket=>"###", codec=>"###", debug=>false, threads=>1, region=>"us-east-1", delete=>false, interval=>60, temporary_directory=>"/tmp/logstash", s3_endpoint=>"s3.amazonaws.com", polling_frequency=>5>\n Error: undefined method `each' for #Aws::SQS::Types::Message:0x7f57c049", :level=>:error}

Null Pointer exception kill the input thread.

This process ran for a few hours without problems, then the input thread crashed with this error.

{:timestamp=>"2015-02-20T10:07:36.677000+0000", :message=>"Error reading SQS queue.", :error=>java.lang.NullPointerException, :queue=>"c4-elasticsearch-inbound-perf", :level=>:error, :file=>"logstash/inputs/sqs.rb", :line=>"167"}

Error reading SQS queue - Timeout::Error

I think the issue is not just checksums but any sort of error... (the plugin doesn't restart as expected, it just dies)

{:timestamp=>"2015-06-11T16:27:43.449000-0400", :message=>"Error reading SQS queue.", :error=>#<Timeout::Error: execution expired>, :queue=>"int-LogQueue", :level=>:error}

Logstash 1.4.2:

We've seen the timeout happen when the queue has around 1M messages.

I have 8 threads on the SQS input and looking through the log you can watch them all die:

{:timestamp=>"2015-06-11T12:53:22.236000-0400", :message=>"Error reading SQS queue.", :error=>#<Timeout::Error: execution expired>, :queue=>"int-LogQueue", :level=>:error}
{:timestamp=>"2015-06-11T13:05:04.023000-0400", :message=>"Error reading SQS queue.", :error=>#<Timeout::Error: execution expired>, :queue=>"int-LogQueue", :level=>:error}
{:timestamp=>"2015-06-11T13:10:38.453000-0400", :message=>"Error reading SQS queue.", :error=>#<Timeout::Error: execution expired>, :queue=>"int-LogQueue", :level=>:error}
{:timestamp=>"2015-06-11T13:19:27.005000-0400", :message=>"Error reading SQS queue.", :error=>#<Timeout::Error: execution expired>, :queue=>"int-LogQueue", :level=>:error}
{:timestamp=>"2015-06-11T13:22:55.558000-0400", :message=>"Error reading SQS queue.", :error=>#<Timeout::Error: execution expired>, :queue=>"int-LogQueue", :level=>:error}
{:timestamp=>"2015-06-11T13:25:58.123000-0400", :message=>"Error reading SQS queue.", :error=>#<Timeout::Error: execution expired>, :queue=>"int-LogQueue", :level=>:error}
{:timestamp=>"2015-06-11T13:41:36.046000-0400", :message=>"Error reading SQS queue.", :error=>#<Timeout::Error: execution expired>, :queue=>"int-LogQueue", :level=>:error}
{:timestamp=>"2015-06-11T13:55:24.643000-0400", :message=>"Error reading SQS queue.", :error=>#<Timeout::Error: execution expired>, :queue=>"int-LogQueue", :level=>:error}

Error reading SQS queue

(This issue was originally filed by @PraveenJayakrishnakumar at elastic/logstash#2190)


We are using the logstash indexer to read all logs from logstash-agent. But sometimes we get a checksum failed error. We are using Logstash version 1.4.3.

:message=>"Error reading SQS queue.", :error=>#<AWS::SQS::Errors::ChecksumError: 2 messages failed checksum verification>,

No tests / Tests missing

Hi,
this plugin has no tests at all; this should be amended soon so that, for example, changes in either the plugin or Logstash core itself are properly tested in the CI environment.

If you are giving this plugin some tests, feel free to update elastic/logstash#3740 so we keep track of it.

AWS region "ap-south-1" not supported

The new Asia Pacific (Mumbai) region has recently launched. Connecting to SQS from Logstash fails region validation.

Invalid setting for sqs input plugin:

  input {
    sqs {
      # This setting must be a ["us-east-1", "us-west-1", "us-west-2", "eu-central-1", "eu-west-1", "ap-southeast-1", "ap-southeast-2", "ap-northeast-1", "ap-northeast-2", "sa-east-1", "us-gov-west-1", "cn-north-1"]
      # Expected one of ["us-east-1", "us-west-1", "us-west-2", "eu-central-1", "eu-west-1", "ap-southeast-1", "ap-southeast-2", "ap-northeast-1", "ap-northeast-2", "sa-east-1", "us-gov-west-1", "cn-north-1"], got ["ap-south-1"]
      region => "ap-south-1"
      ...
    }
  } {:level=>:error}
fetched an invalid config {:config=>"input {\n   sqs {\n     queue => \"myqueue\"\n     access_key_id => \"%%%%%%\"\n     secret_access_key => \"%%%%%\"\n     region => \"ap-south-1\"\n     type => \"events\"\n   }\n\n}\n\noutput {\n\n      stdout { codec => rubydebug }\n\n      if [type]==\"events\"{\n                elasticsearch {\n                        protocol => \"http\"\n                        document_type => \"events\"\n                        host => [\"172.31.27.76\", \"172.31.27.77\"]\n                        index => \"events\"\n                }\n        }\n}\n\n\n", :reason=>"Something is wrong with your configuration.", :level=>:error}


provide (ECS) field name defaults

The plugin relies on user configuration of a number of fields; if possible we should provide ECS defaults:

  • id_field - event.id?
  • md5_field - [@metadata][input][sqs][event][hash][md5]?
  • sent_timestamp_field - @timestamp?

NOTE: there are no existing conflicts with ECS, since no defaults are provided for these fields.
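As an illustration, the opt-in configuration today looks roughly like the sketch below; the field names on the right-hand side are only a sketch of the ECS-style proposal, not shipped defaults:

```
input {
  sqs {
    queue  => "my-queue"
    region => "us-east-1"
    # explicitly configured today; the proposal is to default these
    # to ECS-style names such as the ones sketched here
    id_field             => "[@metadata][input][sqs][message_id]"
    md5_field            => "[@metadata][input][sqs][event][hash][md5]"
    sent_timestamp_field => "[@metadata][input][sqs][sent_at]"
  }
}
```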

Logstash hangs with multiple sqs inputs

(This issue was originally filed by @loganbhardy at elastic/logstash#2884)


Logstash 1.4.2

Logstash seems stable with a single SQS input but hangs after a short time if I add two or more to my config. There is nothing written to the logs when this occurs. The CPU gets pegged at 100% and messages are no longer picked up from the SQS queue. I was told in passing by an engineer at Elasticon that lowering the number of threads on the SQS input could help, but that had no effect. I've set threads as high as 8 with a single SQS input and things seemed stable. The problem only seems to occur if I add two or more SQS inputs. One other configuration of note is that I had to disable the SQS checksum validation in order to get around an issue there. (See issue #2190.) I should also note that I am running two central logstash servers with this config. I have not yet tried this with Logstash 1.5.

Here is my logstash config.

input {
  sqs {
    region => "us-west-1"
    queue => "logstash-app1"
    access_key_id => "MYACCESSID"
    secret_access_key => "MYACCESSKEY"
    threads => 1
    use_ssl => false
  }
  sqs {
    region => "us-west-1"
    queue => "logstash-app2"
    access_key_id => "MYACCESSID"
    secret_access_key => "MYACCESSKEY"
    threads => 1
    use_ssl => false
  }
  sqs {
    region => "us-west-1"
    queue => "logstash-app3"
    access_key_id => "MYACCESSID"
    secret_access_key => "MYACCESSKEY"
    threads => 1
    use_ssl => false
  }
  sqs {
    region => "us-west-1"
    queue => "logstash-app4"
    access_key_id => "MYACCESSID"
    secret_access_key => "MYACCESSKEY"
    threads => 1
    use_ssl => false
  }
}
filter {
  if [type] == "app1" or "app2" or "app3" or "app4" {
    mutate {
      gsub => [ "message", "\x1B[([0-9]{1,2}(;[0-9]{1,2})?)?[m|K]", "" ]
    }
    grok {
      match => [ "message", "(?m)%{TIMESTAMP_ISO8601:timestamp}\s-\s%{LOGLEVEL:level}:\s[(?[^]]+)]\s(?[^(]+)*((?[^:]+):%{INT:line})\s+%{GREEDYDATA:msg}" ]
      add_tag => [ "grokked" ]
    }
    date {
      match => [ "timestamp", "ISO8601" ]
      remove_field => [ "timestamp" ]
      add_tag => [ "dated" ]
    }
  }
}
output {
  elasticsearch {
    cluster => "es-metrics"
    host => "localhost"
    workers => 1
    document_id => "%{@uuid}"
  }
}

plugin stops polling on various exceptions

Two times lately, our connection to SQS was interrupted. The first was due to a connectivity error, the second to an internal error at Amazon.

The exception from the first time was lost.

However, the second time we got:

{:timestamp=>"2015-04-23T11:22:07.605000+0200", :message=>"Error reading SQS queue.", :error=>#<AWS::Errors::Base: <ServiceUnavailableException/>
>, :queue=>"sqs-transcoder", :level=>:error}

Suggest doing retries also on non-specific errors that end up here: https://github.com/logstash-plugins/logstash-input-sqs/blob/master/lib/logstash/inputs/sqs.rb#L170

Plugin drops data when multiple objects are returned from codec.decode (json_lines)

Bug report + code fix: https://discuss.elastic.co/t/sqs-input-plugin-only-pulls-one-json-object-from-sqs-payload/50352/3

TL;DR: when using a codec to create multiple events from a single SQS payload, only a single event would be ingested.

The way the code was structured, it would return on the first object codec.decode returned. By flattening some of the code, the json_lines codec worked again. Do you want a PR with the flattened fix, or do you have a different way you'd like to handle this?
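The flattening described above can be sketched with a toy stand-in codec: collect every event the codec yields instead of returning after the first one. All names here are illustrative, not the plugin's actual code:

```ruby
require "json"

# Toy codec: yields one decoded hash per newline-delimited JSON object,
# the way the json_lines codec yields one event per line.
class ToyJsonLinesCodec
  def decode(payload)
    payload.each_line do |line|
      line = line.strip
      next if line.empty?
      yield JSON.parse(line)
    end
  end
end

# Collect *every* event the codec yields rather than returning after
# the first one -- the early return was the bug described above.
def events_from_message(codec, body)
  events = []
  codec.decode(body) { |event| events << event }
  events
end

payload = %({"obj1": 1}\n{"obj2": 2}\n{"obj3": 3}\n)
events  = events_from_message(ToyJsonLinesCodec.new, payload)
```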

Socket error due to timeout kills the pipeline instead of going into backoff retry

  • Version: <= 3.1.2

Sometimes when the SQS plugin is running and encounters an error from the HTTP client library, it doesn't retry the connection but kills the pipeline.

stack trace
[2020-09-26T00:17:14,174][ERROR][logstash.javapipeline    ][main] A plugin had an unrecoverable error. Will restart this plugin.
  Pipeline_id:main
  Plugin: <LogStash::Inputs::SQS threads=>4, 
id=>"1e95d5586024e71da3c6c432de1c06d5ed6f9b05af26dfe45bfe945c182278f0", 
region=>"eu-west-1", queue=>"MonSysCloudwatchMetricsQueue", enable_metric=>true, 
codec=><LogStash::Codecs::JSON 
     id=>"json_6f7b71ca-94a4-4dcc-8b24-debafa66959d", 
    enable_metric=>true, 
    charset=>"UTF-8">, 
role_session_name=>"logstash", 
polling_frequency=>20
>
  Error: Net::OpenTimeout
  Exception: Seahorse::Client::NetworkingError
  Stack: uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/net/protocol.rb:41:in `ssl_socket_connect'
uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/net/http.rb:985:in `connect'
uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/net/http.rb:924:in `do_start'
uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/net/http.rb:919:in `start'
uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib/delegate.rb:83:in `method_missing'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/seahorse/client/net_http/connection_pool.rb:285:in `start_session'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/seahorse/client/net_http/connection_pool.rb:92:in `session_for'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/seahorse/client/net_http/handler.rb:119:in `session'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/seahorse/client/net_http/handler.rb:71:in `transmit'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/seahorse/client/net_http/handler.rb:45:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/seahorse/client/plugins/content_length.rb:12:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/aws-sdk-core/xml/error_handler.rb:8:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/aws-sdk-core/plugins/request_signer.rb:88:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/aws-sdk-core/plugins/helpful_socket_errors.rb:10:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/aws-sdk-core/plugins/retry_errors.rb:108:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/aws-sdk-core/plugins/retry_errors.rb:139:in `retry_request'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/aws-sdk-core/plugins/retry_errors.rb:122:in `retry_if_possible'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/aws-sdk-core/plugins/retry_errors.rb:110:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/aws-sdk-core/plugins/retry_errors.rb:139:in `retry_request'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/aws-sdk-core/plugins/retry_errors.rb:122:in `retry_if_possible'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/aws-sdk-core/plugins/retry_errors.rb:110:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/aws-sdk-core/plugins/retry_errors.rb:139:in `retry_request'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/aws-sdk-core/plugins/retry_errors.rb:122:in `retry_if_possible'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/aws-sdk-core/plugins/retry_errors.rb:110:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/aws-sdk-core/plugins/sqs_queue_urls.rb:13:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/aws-sdk-core/query/handler.rb:27:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/aws-sdk-core/plugins/user_agent.rb:12:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/aws-sdk-core/plugins/endpoint_pattern.rb:27:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/aws-sdk-core/plugins/endpoint_discovery.rb:67:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/seahorse/client/plugins/endpoint.rb:41:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/aws-sdk-core/plugins/param_validator.rb:21:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/seahorse/client/plugins/raise_response_errors.rb:14:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/aws-sdk-core/plugins/jsonvalue_converter.rb:20:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/aws-sdk-core/plugins/idempotency_token.rb:18:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/aws-sdk-core/plugins/param_converter.rb:20:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/seahorse/client/plugins/response_target.rb:21:in `call'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/seahorse/client/request.rb:70:in `send_request'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-core-2.11.429/lib/seahorse/client/base.rb:207:in `block in define_operation_methods'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-resources-2.11.429/lib/aws-sdk-resources/services/sqs/queue_poller.rb:390:in `send_request'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-resources-2.11.429/lib/aws-sdk-resources/services/sqs/queue_poller.rb:383:in `get_messages'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-resources-2.11.429/lib/aws-sdk-resources/services/sqs/queue_poller.rb:332:in `block in poll'
org/jruby/RubyKernel.java:1446:in `loop'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-resources-2.11.429/lib/aws-sdk-resources/services/sqs/queue_poller.rb:331:in `block in poll'
org/jruby/RubyKernel.java:1193:in `catch'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/aws-sdk-resources-2.11.429/lib/aws-sdk-resources/services/sqs/queue_poller.rb:330:in `poll'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-sqs-3.1.2/lib/logstash/inputs/sqs.rb:143:in `block in run'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-sqs-3.1.2/lib/logstash/inputs/sqs.rb:164:in `run_with_backoff'
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-sqs-3.1.2/lib/logstash/inputs/sqs.rb:142:in `run'
/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:328:in `inputworker'
/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:320:in `block in start_input'

The problem is that the client throws Seahorse::Client::NetworkingError in case of Net::OpenTimeout, but the rescue code

rescue Aws::SQS::Errors::ServiceError => e

handles only subclasses of Aws::SQS::Errors::ServiceError.
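A sketch of the broader rescue being suggested: treat the client's networking error the same as an SQS service error so both take the backoff path. The module definitions below are local stand-ins for the real AWS classes, and poll_once is illustrative, not the plugin's code:

```ruby
# Local stand-ins for the real AWS error classes
module Aws; module SQS; module Errors; class ServiceError < StandardError; end; end; end; end
module Seahorse; module Client; class NetworkingError < StandardError; end; end; end

# Rescue both families so networking errors also go through backoff
RETRYABLE_ERRORS = [Aws::SQS::Errors::ServiceError,
                    Seahorse::Client::NetworkingError].freeze

def poll_once
  yield
  :ok
rescue *RETRYABLE_ERRORS
  :retry # a real plugin would sleep with exponential backoff and poll again
end
```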

ap-south-1 is not a valid region

I have configured Glacier on CentOS 7, and after making the necessary changes in ./glacier-cmd I'm getting an error stating:
str: Invalid region. Available regions for Amazon Glacier are:
us-east-1 (US - Virginia)
us-west-1 (US - N. California)
us-west-2 (US - Oregon)
eu-west-1 (EU - Ireland)
eu-central-1 (EU - Frankfurt)
sa-east-1 (South America - Sao Paulo)
ap-northeast-1 (Asia-Pacific - Tokyo)
ap-southeast-1 (Asia Pacific (Singapore)
ap-southeast-2 (Asia-Pacific - Sydney)
Caused by: 1 exception
|| str: Invalid region code: ap-south-1

Unable to run simple SQS tests

Logstash Version 5.3.0
OS: mac OS Sierra 10.12.3
Config File:

input{
	sqs{
		queue => "queue"
	}
}

output {
	stdout { codec => json_lines }   
}

ERROR:

[2017-05-01T12:38:22,898][ERROR][logstash.pipeline ] Error registering plugin {:plugin=>"<LogStash::Inputs::SQS queue=>\"arn:aws:sqs:us-east-1:623692085147:gogo-billing-realtime-fig2\", id=>\"d4297e7362463b9e28ffd54d552eb1314e0502fe-1\", enable_metric=>true, codec=><LogStash::Codecs::JSON id=>\"json_5122233f-4067-409f-ab92-f60a58b82272\", enable_metric=>true, charset=>\"UTF-8\">, threads=>1, region=>\"us-east-1\", polling_frequency=>20>", :error=>"uninitialized constant Aws::Client::Errors"}

aws sqs receive-message --queue-url=https://sqs.us-east-1.amazonaws.com/id/gogo-queue works from the AWS CLI.

Testing logstash-input-sqs with ElasticMQ-SQS locally

I am developing an application that uses the Amazon Simple Queue Service (SQS), and I want to test locally without having to connect to the real SQS every time.

https://s3-eu-west-1.amazonaws.com/softwaremill-public/elasticmq-server-0.13.8.jar

Could you please add a queue_url option, for example http://sqs-local:80/queue/#{queue_name}, to allow polling an SQS queue on localhost?

Thanks
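A hedged sketch of what such a configuration could look like. The `endpoint` option name here is an assumption for illustration (newer versions of the AWS plugin mixin added an option along these lines; check your plugin version for actual support):

```
input {
  sqs {
    queue  => "queue1"
    region => "eu-west-1"
    # assumed option: points the SDK at a local ElasticMQ instead of AWS
    endpoint => "http://sqs-local:80"
  }
}
```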

SQS input failing to pull whole message when payload contains multiple json objects. Only first object returned.

Please post all product and debugging questions on our forum. Your questions will reach our wider community members there, and if we confirm that there is a bug, then we can open a new issue here.

Currently running into an issue with polling SQS. The SQS message contains multiple JSON objects but SQS is only returning the first object. The issue is described here as well; I was looking to see if the fix was merged but did not see it in the sqs.rb file.

https://discuss.elastic.co/t/sqs-input-plugin-only-pulls-one-json-object-from-sqs-payload/50352

stdout shows the first json in the message but I cannot get the other json objects.

For all general issues, please provide the following details for fast resolution:

  • Version: 3.1.1
  • Operating System: Centos 7.3
  • Config File (if you have sensitive info, please remove it):
    input {
      sqs {
        access_key_id =>
        secret_access_key =>
        region =>
        queue =>
        codec => "json"
      }
    }
    filter {
      mutate {
        add_field => {"awslogtype" => "AWSVPCFlowLogs"}
      }
    }

    output {
      stdout {codec => rubydebug}
      tcp {
        host => ""
        port => 514
        codec => json_lines
      }
    }

  • Sample Data:
    {"obj1": 1}
    {"obj2": 2}
    {"obj3": 3}

I have also tried
[{"obj1": 1},{"obj2": 2},{"obj3": 3}]
and
{"obj1": 1},{"obj2": 2},{"obj3": 3}
and
{"obj1": 1}{"obj2": 2}{"obj3": 3}

  • Steps to Reproduce:
    Just add the messages to an SQS queue, then look at the stdout.

Thanks

IAM policy sample missing required Actions in documentation

  • Version: current
  • Operating System:n/a

The documentation for both the sqs input and output plugins provide a sample IAM policy document which is missing Action keys listed directly above it in the docs:

The "consumer" identity must have the following permissions on the queue:
sqs:ChangeMessageVisibility
sqs:ChangeMessageVisibilityBatch
sqs:DeleteMessage
sqs:DeleteMessageBatch
sqs:GetQueueAttributes
sqs:GetQueueUrl
sqs:ListQueues
sqs:ReceiveMessage

yet the sample policy is missing DeleteMessage and DeleteMessageBatch:

{
  "Statement": [
    {
      "Action": [
        "sqs:ChangeMessageVisibility",
        "sqs:ChangeMessageVisibilityBatch",
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl",
        "sqs:ListQueues",
        "sqs:SendMessage",
        "sqs:SendMessageBatch"
      ],
      "Effect": "Allow",
      "Resource": [
        "arn:aws:sqs:us-east-1:123456789012:Logstash"
      ]
    }
  ]
}

There is a similar issue in the output plugin which I will open separately, and imo an issue with the error handling of AccessDenied errors, since Logstash doesn't log which action was being attempted, and CloudTrail doesn't log SQS API calls.

Feb 02 18:27:11 ip-10-202-5-90 logstash[410]: {:timestamp=>"2017-02-02T18:27:11.273000+0000", :message=>"Aws::SQS::Errors::ServiceError ... retrying SQS request with exponential backoff", :queue=>"logstash_logs_us-west-1_staging_logstash-test-emitter", :sleep_time=>1, :error=>#<Aws::SQS::Errors::AccessDenied: Access to the resource https://sqs.us-west-1.amazonaws.com/[REDACTED]/logstash_logs_us-west-1_staging_logstash-test-emitter is denied.>, :level=>:warn}

Logstash SQS too slow to process

(This issue was originally filed by @PraveenJayakrishnakumar at elastic/logstash#2515)


I observed that the Logstash SQS input is too slow to process the logs, and sometimes it breaks the process when it is handling more log messages.

The following error messages appear when it starts to process more logs:

  1. Errno::EBADF: Bad file descriptor - Bad file descriptor
  2. AWS::SQS::Errors::SignatureDoesNotMatch
  3. AWS::SQS::Errors::BatchRequestTooLong
  4. AWS::SQS::Errors::ChecksumError

Support for Message Attributes

Is it possible to fetch the message attributes with the plugin, or is there a workaround to get them?

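One possible shape for a workaround, sketched with a local stand-in for Aws::SQS::Types::Message. The decorate_with_attributes helper and the @metadata-style field layout are illustrative assumptions, not plugin API:

```ruby
# Local stand-in for Aws::SQS::Types::Message (attributes already fetched,
# e.g. by requesting message_attribute_names when polling)
Message = Struct.new(:body, :message_attributes)

# Copy each message attribute's string value onto the event; the
# field naming below is illustrative, not something the plugin defines.
def decorate_with_attributes(event, message)
  (message.message_attributes || {}).each do |name, attr|
    event["[@metadata][sqs][attributes][#{name}]"] = attr[:string_value]
  end
  event
end

msg   = Message.new('{"msg":"hi"}', { "trace_id" => { string_value: "abc-123" } })
event = decorate_with_attributes({}, msg)
```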

SQS Plugin seems to not work on docker

Hello !

We have an issue with the AWS SQS input plugin: we are trying to use the plugin on a Red Hat host from AWS with Logstash 6.5 installed by ourselves in a Docker environment. (We are not using AWS managed services.)

When we try to start the Docker container, we get these errors (with debug mode on):

[2019-01-07T17:57:58,715][DEBUG][logstash.agent           ] starting pipeline {:id=>"main"}
[2019-01-07T17:57:58,719][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>250}
[2019-01-07T17:57:58,729][ERROR][logstash.pipeline        ] Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash. {"exception"=>"undefined method `to_hash' for []:Array", "backtrace"=>["(eval):19:in `filter_func'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:398:in `filter_batch'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:379:in `worker_loop'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:342:in `start_workers'"]}
[2019-01-07T17:57:58,739][INFO ][logstash.inputs.sqs      ] Registering SQS input {:queue=>"SQS-Logstash-bit"}
[2019-01-07T17:57:58,750][ERROR][logstash.pipeline        ] Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash. {"exception"=>"undefined method `to_hash' for []:Array", "backtrace"=>["(eval):19:in `filter_func'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:398:in `filter_batch'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:379:in `worker_loop'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:342:in `start_workers'"]}
[2019-01-07T17:57:58,910][FATAL][logstash.runner          ] An unexpected error occurred! {:error=>#<NoMethodError: undefined method `to_hash' for []:Array>, :backtrace=>["(eval):19:in `filter_func'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:398:in `filter_batch'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:379:in `worker_loop'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:342:in `start_workers'"]}

We first thought of a problem with AWS connectivity or user rights, so we used the AWS CLI with the same credentials in the same Docker container to poll the SQS queue directly, expecting that to fail too, but... it worked perfectly.
We also tried to get the queue URL with the same credentials as the input configuration, and that worked as well.

Finally, we installed Logstash directly on the host and it works with the same configuration.

Any idea why ?

Here is our Logstash configuration:

input {
        sqs {
                id => "SQS-Logstash-bit"
                access_key_id => "SuperAccessKeyId"
                secret_access_key => "SuperSecretAccessKey"
                queue => "SQS-Logstash-bit"
                region => "eu-west-1"
                threads => 1
        }
}

filter {
}


output {
 stdout { codec => rubydebug {  metadata => true } }
}

sqs input with 20 threads drops avg throughput to 1/4th after ~2 hours

moved from logstash-plugins/logstash-output-sqs#2
original report from @abonuccelli-es


Logstash 1.4.2, pulling events from 2 SQS queues (assume an infinite number of events) and feeding into ES over the transport protocol with 1 worker, keeps throughput steady at 400 indexing requests per second towards ES; then after ~90 minutes indexing requests to ES drop to 100. (No issues whatsoever on the ES side.)

Reproduced on AWS

Logstash config

input { 
    sqs {
                access_key_id => "secret"
                secret_access_key => "secret"
                queue => "test1"
                region => "xxxxxxxx"
                threads => 20
            }

    sqs {
                access_key_id => "secret"
                secret_access_key => "secret"
                queue => "test2"
                region => "xxxxxxxxx"
                threads => 20
            }
} 

filter { 
}

output { 

        stdout { codec => rubydebug }

    elasticsearch {
                cluster => "xxxxxx"
                protocol => "transport"
                workers => 1
                host => "xxxxxxxxxxx"
                port => "9300"
                index => "xxxxxxx"

        }   
} 

LS debug log and output of

while true;do date >> /tmp/7113-stats.log;jstat  -gccapacity 13682 >> /tmp/7113-stats.log ;jstat -gc 13682 >> /tmp/7113-stats.log; sleep 5;done

at https://s3-eu-west-1.amazonaws.com/users.eu.elasticsearch.org/abonuccelli/7113-reproduced.tar.gz

Attaching es screenshots showing indexing requests going down

screen shot 2015-02-24 at 19 02 16
screen shot 2015-02-24 at 19 02 24

marvel time is GMT+1 (sudden drop plotted between ~18:45-18:47)
logs time is GMT

SignatureDoesNotMatch and MissingCredentialsError issues

Sporadically Logstash gets the following errors that break the polling loop, and at some point all threads stop pulling messages from the SQS queue:

logstash.log-20150218.gz:{:timestamp=>"2015-02-17T07:37:33.181000+0000", :message=>"Error reading SQS queue.", :error=>#<AWS::SQS::Errors::SignatureDoesNotMatch: The request signature we calculated does not match the signature you provided. Check your AWS Secret Access Key and signing method. Consult the service documentation for details.

logstash.log-20150223.gz:{:timestamp=>"2015-02-22T20:02:04.994000+0000", :message=>"Error reading SQS queue.", :error=>#<AWS::Errors::MissingCredentialsError:

I believe a similar problem has been reported against the Logstash SQS output.

Improve Debug Logging

Determining the source of any connection errors in this plugin is tricky: it emits little to no debug logging, which makes it hard to understand where issues are happening.

In addition to the lack of plugin-level logging, this plugin does not provide any way to enable wire-level tracing by passing additional_settings through to the underlying library used to connect to SQS - see https://www.elastic.co/guide/en/logstash/current/plugins-inputs-s3.html#plugins-inputs-s3-additional_settings for an example.

By including additional_settings, the user would be able to change the log level of the AWS SDK and also set http_wire_trace to trace the requests to/from AWS. See https://docs.aws.amazon.com/sdk-for-ruby/v2/api/Aws/SQS/Client.html#initialize-instance_method for details of the applicable settings.
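A hypothetical example of what such a configuration could look like, mirroring the s3 input's additional_settings option (this option does not currently exist in the sqs input; the queue name and settings shown are assumptions for illustration):

input {
    sqs {
        queue => "my-queue"
        region => "us-east-1"
        additional_settings => {
            "log_level"       => "debug"
            "http_wire_trace" => true
        }
    }
}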

Error: undefined method `message_count' for #<Aws::SQS::QueuePoller::PollerStats

A plugin had an unrecoverable error. Will restart this plugin.
  Plugin: <LogStash::Inputs::SQS access_key_id=>"AAAAAAA", secret_access_key=>"ZZZZZZ", queue=>"logstashbrokerqueue", region=>"us-east-1", threads=>4, debug=>false, codec=><LogStash::Codecs::JSON charset=>"UTF-8">, polling_frequency=>20>
  Error: undefined method `message_count' for #<Aws::SQS::QueuePoller::PollerStats:0x160b6c12>
  Exception: NoMethodError
  Stack: /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-sqs-1.1.0/lib/logstash/inputs/sqs.rb:148:in `run'
/opt/logstash/vendor/bundle/jruby/1.9/gems/aws-sdk-resources-2.1.33/lib/aws-sdk-resources/services/sqs/queue_poller.rb:416:in `yield_messages'
/opt/logstash/vendor/bundle/jruby/1.9/gems/aws-sdk-resources-2.1.33/lib/aws-sdk-resources/services/sqs/queue_poller.rb:405:in `process_messages'
org/jruby/RubyKernel.java:1242:in `catch'
/opt/logstash/vendor/bundle/jruby/1.9/gems/aws-sdk-resources-2.1.33/lib/aws-sdk-resources/services/sqs/queue_poller.rb:404:in `process_messages'
/opt/logstash/vendor/bundle/jruby/1.9/gems/aws-sdk-resources-2.1.33/lib/aws-sdk-resources/services/sqs/queue_poller.rb:336:in `poll'
org/jruby/RubyKernel.java:1479:in `loop'
/opt/logstash/vendor/bundle/jruby/1.9/gems/aws-sdk-resources-2.1.33/lib/aws-sdk-resources/services/sqs/queue_poller.rb:331:in `poll'
org/jruby/RubyKernel.java:1242:in `catch'
/opt/logstash/vendor/bundle/jruby/1.9/gems/aws-sdk-resources-2.1.33/lib/aws-sdk-resources/services/sqs/queue_poller.rb:330:in `poll'
/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-sqs-1.1.0/lib/logstash/inputs/sqs.rb:143:in `run'
org/jruby/RubyProc.java:271:in `call'
/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-sqs-1.1.0/lib/logstash/inputs/sqs.rb:166:in `run_with_backoff'
/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-sqs-1.1.0/lib/logstash/inputs/sqs.rb:142:in `run'
/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.5-java/lib/logstash/pipeline.rb:177:in `inputworker'
/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.5-java/lib/logstash/pipeline.rb:171:in `start_input' {:level=>:error, :file=>"logstash/pipeline.rb", :line=>"182", :method=>"inputworker"}

Support for us-east-2?

Any plans to support us-east-2? Should region list additions be configurable?

input {
    sqs {
        # This setting must be a ["us-east-1", "us-west-1", "us-west-2", "eu-central-1", "eu-west-1", "ap-southeast-1", "ap-southeast-2", "ap-northeast-1", "ap-northeast-2", "sa-east-1", "us-gov-west-1", "cn-north-1", "ap-south-1"]
        # Expected one of ["us-east-1", "us-west-1", "us-west-2", "eu-central-1", "eu-west-1", "ap-southeast-1", "ap-southeast-2", "ap-northeast-1", "ap-northeast-2", "sa-east-1", "us-gov-west-1", "cn-north-1", "ap-south-1"], got ["us-east-2"]
        region => "us-east-2"

SQS input plugin just stops working, no errors

Logstash information:

Please include the following information:

  1. Logstash version (e.g. bin/logstash --version) - 7.10.2
  2. Logstash installation source (e.g. built from source, with a package manager: DEB/RPM, expanded from tar or zip archive, docker) - installed with apt-get
  3. How is Logstash being run (e.g. as a service/service manager: systemd, upstart, etc. Via command line, docker/kubernetes) - via command line
  4. How was the Logstash Plugin installed - command line

JVM (e.g. java -version): 1.8.0_312

If the affected version of Logstash is 7.9 (or earlier), or if it is NOT using the bundled JDK or using the 'no-jdk' version in 7.10 (or higher), please provide the following information:

  1. JVM version (java -version)
  2. JVM installation source (e.g. from the Operating System's package manager, from source, etc).
  3. Value of the JAVA_HOME environment variable if set.

OS version (uname -a if on a Unix-like system): Linux ip-10-210-30-158 5.11.0-1028-aws #31~20.04.1-Ubuntu SMP Fri Jan 14 14:37:50 UTC 2022 x86_64 x86_64 x86_64 GNU/Linux

Description of the problem including expected versus actual behavior:
Logstash just stops consuming SQS messages from the configured queue. There are no errors in the logs; it consumes whatever is on the queue when it starts up, and then messages simply pile up on the queue until I restart Logstash.

Steps to reproduce:

Please include a minimal but complete recreation of the problem,
including (e.g.) pipeline definition(s), settings, locale, etc. The easier
you make it for us to reproduce it, the more likely that somebody will take
the time to look at it.

This is my dead-simple input:
input {
    sqs {
        access_key_id => "xxxxxxxxxxxxx"
        secret_access_key => "yyyyyyyyyyyyyy"
        queue => "places"
        region => "us-west-2"
    }
}

Provide logs (if relevant):

Warn messages repeatedly logged when polling_frequency is higher than 20.

Using version 3.0.3 with Logstash 5.4 on Red Hat Linux, setting polling_frequency => 30 results in the following WARN message being logged repeatedly:

[2017-05-04T22:05:23,818][WARN ][logstash.inputs.sqs ] Aws::SQS::Errors::ServiceError ... retrying SQS request with exponential backoff {:queue=>"SQS_QUEUE_NAME", :sleep_time=>1, :error=>#<Aws::SQS::Errors::InvalidParameterValue: Value 30 for parameter WaitTimeSeconds is invalid. Reason: Must be >= 0 and <= 20, if provided.>}

It appears that the configured value is passed straight through as WaitTimeSeconds rather than being capped at the SQS maximum of 20. Configuration is similar to the following:

input {
  sqs {
    queue => "SQS_QUEUE_NAME"
    region => "us-east-1"
    access_key_id => "SOME_KEYID"
    secret_access_key => "SOME_SECRET"
    polling_frequency => 30
  }
} 
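The error message above states the underlying constraint: SQS long polling rejects any WaitTimeSeconds outside 0..20. A minimal sketch (not the plugin's actual code; the helper name is hypothetical) of how a configured polling_frequency could be clamped before being handed to the poller:

```ruby
# SQS imposes a hard upper limit of 20 seconds on WaitTimeSeconds
# (long-poll duration), per the InvalidParameterValue error above.
MAX_SQS_WAIT_SECONDS = 20

# Clamp a user-supplied polling_frequency to the range SQS accepts.
def effective_wait_time(polling_frequency)
  polling_frequency.clamp(0, MAX_SQS_WAIT_SECONDS)
end

puts effective_wait_time(30)  # => 20 (clamped)
puts effective_wait_time(10)  # => 10 (passed through)
```

With a clamp like this, polling_frequency => 30 would no longer trigger the repeated ServiceError/backoff cycle, at the cost of silently polling every 20 seconds instead.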

Cross-account access not supported while using proxy_uri

Please post all product and debugging questions on our forum. Your questions will reach our wider community members there, and if we confirm that there is a bug, then we can open a new issue here.

For all general issues, please provide the following details for fast resolution:

  • Version:
  • Operating System: windows 10
  • Config File (if you have sensitive info, please remove it):
    input {
        sqs {
            region => "us-east-1"
            queue => "testingjk"
            access_key_id => "access key"
            secret_access_key => "secret key"
            role_arn => "arn:aws:iam::accountid:role/rolename"
            proxy_uri => "url"
        }
    }

output {
stdout { }
}

  • Sample Data:
  • Steps to Reproduce:

Plugin won't shutdown if no events coming through the SQS queue

I've noticed that if an SQS queue has no events flowing through it, then the plugin won't shut down.

The use case I have for a queue without events flowing through it is that I use SQS as a fallback transport, and only send data through it when my primary transport is down or having issues streaming.

Looking at the code, breaking out of the polling loop happens only when there are messages to loop through:
https://github.com/logstash-plugins/logstash-input-sqs/blob/master/lib/logstash/inputs/sqs.rb#L143

It seems there needs to be a before_request callback (http://docs.aws.amazon.com/sdkforruby/api/Aws/SQS/QueuePoller.html#before_request-instance_method) that checks whether the plugin has been stopped and throws :stop_polling, so the processing loop can be broken out of even when no events are flowing through the queue.
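A minimal sketch of the mechanism being proposed. The AWS SDK's QueuePoller is stood in for by a plain loop wrapped in catch (which is how the SDK implements :stop_polling internally), so the example is self-contained; the class and variable names are hypothetical, not the plugin's:

```ruby
# Sketch: a before_request callback lets a poller be stopped even when the
# queue is empty and no messages are ever yielded.
class FakePoller
  def before_request(&block)
    @before = block
  end

  # Mimics Aws::SQS::QueuePoller#poll: loops until :stop_polling is thrown.
  def poll
    catch(:stop_polling) do
      loop do
        @before&.call(nil)  # stats argument unused in this sketch
        # receive_message would happen here; with an empty queue, nothing
        # is yielded, so without the callback this loop never exits.
      end
    end
  end
end

stopped = false  # stands in for LogStash::Inputs::SQS#stop? state
poller = FakePoller.new
poller.before_request { |_stats| throw :stop_polling if stopped }

t = Thread.new { poller.poll }
stopped = true              # simulate the plugin's stop being requested
t.join(2)
puts t.alive? ? "still polling" : "stopped cleanly"  # prints "stopped cleanly"
```

The real fix would register an equivalent callback on the actual Aws::SQS::QueuePoller and check the plugin's stop flag inside it.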
