logstash-plugins / logstash-filter-jdbc_streaming
A Logstash filter that can enrich events with data from a database
License: Apache License 2.0
I want to send more concurrent filter queries to speed up processing of large batches of events. I've been unable to find any documentation that might explain the behavior here. Can anybody clear things up for me?
Since Sequel supports connection pooling by default, exposing control over the pool in the pipeline configuration should be pretty straightforward.
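As a rough sketch of what "exposing pooling to the pipeline configuration" could look like: Sequel accepts `:max_connections` and `:pool_timeout` when connecting, so string-keyed settings from a config file could be mapped onto those options. The helper name `sequel_pool_options` and the constant are hypothetical, and this is not the plugin's actual code.

```ruby
# Hypothetical helper: turn string-keyed pipeline settings into the
# symbol-keyed options that Sequel's connection pool understands.
POOL_OPTION_KEYS = %w[max_connections pool_timeout].freeze

def sequel_pool_options(settings)
  settings
    .select { |key, _| POOL_OPTION_KEYS.include?(key) }
    .each_with_object({}) { |(key, value), opts| opts[key.to_sym] = Integer(value) }
end

opts = sequel_pool_options(
  "max_connections" => "8",
  "pool_timeout"    => 5,
  "statement"       => "select 1" # unrelated settings are ignored
)
# The result could then be passed along when connecting, for example:
#   Sequel.connect(jdbc_connection_string, opts)
```

A larger pool presumably only helps if several pipeline workers run the filter at the same time, since each lookup still occupies one connection for the duration of its query.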
This is my .conf file configuration
input{
jdbc {
jdbc_connection_string => "jdbc:sqlserver://testserver;databaseName=TEST_DEV;user=me;password=secret;"
jdbc_user => "me"
jdbc_password => "secret"
jdbc_driver_library => "../jre8/sqljdbc42.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
# Schedule in cron-type syntax; "* * * * *" runs every minute
schedule => "* * * * *"
statement => "SELECT * from PROPERTY_MASTER_ORS.C_B_LEASE_COMPARABLES"
# Uncomment and experiment for large tables
#jdbc_paging_enabled => true
#jdbc_page_size => 15000
#jdbc_fetch_size => 10000
}
}
filter {
jdbc_streaming {
jdbc_driver_library => "../jre8/sqljdbc42.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://testserver;databaseName=TEST_DEV;user=me;password=secret;"
jdbc_user => "me"
jdbc_password => "secret"
statement => "SELECT * from PROPERTY_MASTER_ORS.C_B_LEASE_COMP_FLR where lease_comaparable_id = :id"
parameters => { "id" =>"rowid_object"}
target => "lease_comp_floor"
}
}
output {
elasticsearch {
hosts => "http://estestcloud.cloudapp.net:9200"
index => "rpdi-es-1.0"
document_type => "lease-comp"
document_id => "%{rowid_object}"
}
stdout {
codec => rubydebug
}
}
It fetches a few records, and then I get the error below.
[2017-07-13T18:33:47,081][INFO ]logstash.inputs.jdbc SELECT * from PROPERTY_MASTER_ORS.C_B_LEASE_COMPARABLES
[2017-07-13T18:33:47,867][ERROR][logstash.pipeline ] t.ruby_set_field(JrubyEventExtLibrary.java:128)", "org.logstash.ext.JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.call(JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.gen)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:202)", "org.jruby.ast.CallTwoArgNode.interpret(CallTwoArgNode.java:59)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)", "org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)", "org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:225)", "org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:219)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:202)", "org.jruby.ast.FCallTwoArgNode.interpret(FCallTwoArgNode.java:38)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.IfNode.interpret(IfNode.java:118)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)", "org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)", "org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:204)", "org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:211)", "org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:177)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:183)", "org.jruby.ast.FCallOneArgBlockPassNode.interpret(FCallOneArgBlockPassNode.java:32)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)", "org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)", "org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:204)", "org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:211)", 
"org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:177)", "org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:188)", "org.jruby.ast.FCallOneArgBlockNode.interpret(FCallOneArgBlockNode.java:34)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)", "org.jruby.ast.IfNode.interpret(IfNode.java:118)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.evaluator.ASTInterpreter.INTERPRET_BLOCK(ASTInterpreter.java:112)", "org.jruby.runtime.Interpreted19Block.evalBlockBody(Interpreted19Block.java:206)", "org.jruby.runtime.Interpreted19Block.yield(Interpreted19Block.java:157)", "org.jruby.runtime.Block.yield(Block.java:142)", "org.jruby.RubyArray.eachCommon(RubyArray.java:1606)", "org.jruby.RubyArray.each(RubyArray.java:1613)", "org.jruby.RubyArray$INVOKER$i$0$0$each.call(RubyArray$INVOKER$i$0$0$each.gen)", "org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:143)", "org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:154)", "org.jruby.ast.CallNoArgBlockNode.interpret(CallNoArgBlockNode.java:64)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)", "org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)", "org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:182)", "org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:203)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:168)", "org.jruby.ast.CallOneArgNode.interpret(CallOneArgNode.java:57)", "org.jruby.ast.LocalAsgnNode.interpret(LocalAsgnNode.java:123)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)"}]
Any suggestions, please?
This is to report and track that the same issue, with the same workaround, occurs on Logstash 7.4.0 with the jdbc_streaming filter plugin as was reported for jdbc_static in logstash-plugins/logstash-filter-jdbc_static#47
In this case the issue occurred when using
jdbc_streaming {
jdbc_driver_library => "/usr/share/logstash/x-pack-sql-jdbc-7.4.0.jar"
The same workaround worked: remove the jdbc_driver_library setting (or pass an empty string) and place the JAR file at /usr/share/logstash/logstash-core/lib/jars/x-pack-sql-jdbc-7.4.0.jar
I use your plugin to enrich events with data from an external reputation database, which I access over JDBC. My goal is to visualize the enriched data in Kibana.
Here is the key snippet from my logstash.conf:
statement => "select feed_name, first_seen, last_added from feeds_aggregated where ip = :lookupIP"
parameters => { "lookupIP" => "ip" }
target => "[enrich][fia_nested]"
After Logstash processing, the enriched data has this structure in Elasticsearch:
"enrich": {
"fia_nested": [
{
"feed_name": "blocklist_de_apache",
"first_seen": "2018-07-19T14:16:03.714Z",
"last_added": "2018-07-19T14:16:03.714Z"
},
{
"feed_name": "urandomusto_http",
"first_seen": "2018-07-19T14:18:03.829Z",
"last_added": "2018-07-19T14:18:03.829Z"
}
]
Kibana cannot properly process nested objects (https://www.elastic.co/guide/en/kibana/current/nested-objects.html).
How can I control the structure of the result data using your plugin?
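One possible way to avoid the array of objects is to pivot the rows into one array per column. The sketch below is plain Ruby on hashes; the method name `rows_to_columns` is made up for illustration, and the sample rows reuse the values from the output above.

```ruby
# Turn an array of row hashes into one hash of arrays, so that each
# column becomes a plain multi-valued field instead of a nested object.
def rows_to_columns(rows)
  rows.each_with_object({}) do |row, columns|
    row.each { |name, value| (columns[name] ||= []) << value }
  end
end

rows = [
  { "feed_name" => "blocklist_de_apache", "first_seen" => "2018-07-19T14:16:03.714Z" },
  { "feed_name" => "urandomusto_http",    "first_seen" => "2018-07-19T14:18:03.829Z" }
]
flat = rows_to_columns(rows)
# flat["feed_name"] now holds both feed names in row order.
```

Inside a ruby filter the same method could be applied with `event.get("[enrich][fia_nested]")` and `event.set(...)`. Note that after pivoting, values from the same row are related only by their position in each array.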
Please provide a jdbc_password_filepath parameter for storing the password in a separate file. This is necessary to separate secrets from configuration.
I'd like to report a bug: the plugin does not recognize multiple JAR files in the class path. For instance, this is my config for the filter:
jdbc_streaming {
id => "via-categories-stream"
jdbc_user => "user"
jdbc_driver_library => "/usr/share/logstash/drivers/GoogleBigQueryJDBC42.jar,/usr/share/logstash/drivers/jackson-core-2.1.3.jar,/usr/share/logstash/drivers/google-oauth-client-1.24.1.jar,/usr/share/logstash/drivers/google-http-client-jackson2-1.24.1.jar,/usr/share/logstash/drivers/google-http-client-1.24.1.jar,/usr/share/logstash/drivers/google-api-services-bigquery-v2-rev400-1.24.1.jar,/usr/share/logstash/drivers/google-api-client-1.24.1.jar"
jdbc_driver_class => "com.simba.googlebigquery.jdbc42.Driver"
jdbc_connection_string => ".."
statement => "SELECT categoryId, countryId, brandId, supplierId FROM UNNEST ((SELECT filters FROM migration_search.categories WHERE categoryid=:id))"
parameters => { "id" => "categoryid"}
target => "filters"
}
And that's the exception I receive:
[2019-01-22T13:49:41,291][ERROR][logstash.filters.jdbcstreaming] Invalid setting for jdbc_streaming filter plugin:
filter {
jdbc_streaming {
# This setting must be a path
# File does not exist or cannot be opened /usr/share/logstash/drivers/GoogleBigQueryJDBC42.jar,/usr/share/logstash/drivers/jackson-core-2.1.3.jar,/usr/share/logstash/drivers/google-oauth-client-1.24.1.jar,/usr/share/logstash/drivers/google-http-client-jackson2-1.24.1.jar,/usr/share/logstash/drivers/google-http-client-1.24.1.jar,/usr/share/logstash/drivers/google-api-services-bigquery-v2-rev400-1.24.1.jar,/usr/share/logstash/drivers/google-api-client-1.24.1.jar
jdbc_driver_library => "/usr/share/logstash/drivers/GoogleBigQueryJDBC42.jar,/usr/share/logstash/drivers/jackson-core-2.1.3.jar,/usr/share/logstash/drivers/google-oauth-client-1.24.1.jar,/usr/share/logstash/drivers/google-http-client-jackson2-1.24.1.jar,/usr/share/logstash/drivers/google-http-client-1.24.1.jar,/usr/share/logstash/drivers/google-api-services-bigquery-v2-rev400-1.24.1.jar,/usr/share/logstash/drivers/google-api-client-1.24.1.jar"
...
}
}
The very same configuration works with the JDBC input plugin, so I assume the filter does not recognize multiple JARs in the classpath.
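The error message ("This setting must be a path") suggests the whole string is validated as a single path. A minimal sketch of the alternative, treating the value as a comma-separated list and checking each entry, might look like this. Both method names are hypothetical and this is not taken from the plugin source.

```ruby
# Hypothetical validation: treat the setting as a comma-separated list
# and check every entry, instead of checking the whole string as one path.
def split_driver_libraries(setting)
  setting.to_s.split(",").map(&:strip).reject(&:empty?)
end

# Returns the entries that do not point to an existing regular file.
def missing_driver_libraries(setting)
  split_driver_libraries(setting).reject { |path| File.file?(path) }
end

jars = split_driver_libraries(
  "/usr/share/logstash/drivers/GoogleBigQueryJDBC42.jar,/usr/share/logstash/drivers/jackson-core-2.1.3.jar"
)
# jars is a two-element array, one path per JAR.
```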
Outlining some requirements to ship v1. For reference purposes, the meta issue is here.
Currently as-is, the plugin will execute a SQL query per event, and store the full result set in a new field. This requires a query and network round trip for each event, which isn't ideal. We should make this more dynamic...
Enable the ability to lookup and enrich events from relational databases, commonly needed for users running CMDBs or using traditional data warehousing.
When Logstash first starts up, it should automatically query the DB and load the result set locally in cache. For fast retrieval, each row can probably be hashed by the primary or composite key. Since we always know the key for every event, it should be O(1) access.
filter {
jdbc {
# Setup JDBC connection
jdbc_driver_library => "/path/to/mysql-connector-java-5.1.34-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/mydatabase"
jdbc_user => "me"
jdbc_password => "secret"
# JDBC query statement; dataset to be cached for enrichment purposes
statement => "select * from WORLD.COUNTRY"
# Define the lookup keys used to lookup and enrich DB rows to Logstash events. It maps the DB column name(s)
# to the field name(s) in the event, essentially conducting a "join" on this key.
# Each lookup should return a single unique result, or else it will just take first result found for enrichment.
# Both primary and composite keys are supported. Composite keys will have multiple keys specified, like below.
# DB column names referenced must be available in the query result dataset.
# (Required) - Syntax: "event field name" => "DB column name"
lookup_keys => {
"code" => "country_code"
"org" => "organization"
}
# Define one or more DB columns to enrich the Logstash event with.
# DB column names referenced must be available in the query result dataset.
# The new Logstash event field name(s) will be the same as the respective column(s) it came from.
# (Optional) - if undefined, adds all fields (excluding lookup_key fields) to event top level.
target_fields => ["country_name", "country_details"]
# Cache settings
refresh_schedule => "0 * * * *" # Refresh cache every hour. Cron syntax like the JDBC input scheduler. (Optional) - default = never refresh
max_cache_size => "1MB" # (Optional) - need to find a good default for this, maybe 500MB?
}
}
If lookups fail, we can tag it "_jdbclookupfailure".
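To make the proposal concrete, here is a small sketch of the cache it describes: rows are indexed once by their (possibly composite) key, and each event is then enriched with a hash lookup. Events are plain hashes here, and `build_index` and `enrich` are illustrative names rather than plugin API; the `lookup_keys` direction follows the syntax comment in the proposal.

```ruby
# Build an in-memory index: composite key (array of column values) => row.
# lookup_keys maps event field name => DB column name.
def build_index(rows, lookup_keys)
  columns = lookup_keys.values
  rows.each_with_object({}) do |row, index|
    key = columns.map { |column| row[column] }
    index[key] ||= row # keep the first row when keys collide
  end
end

# Enrich one event (a plain Hash here) with an average O(1) lookup.
def enrich(event, index, lookup_keys, target_fields)
  key = lookup_keys.keys.map { |field| event[field] }
  row = index[key]
  if row.nil?
    (event["tags"] ||= []) << "_jdbclookupfailure"
  else
    target_fields.each { |column| event[column] = row[column] }
  end
  event
end

lookup_keys = { "code" => "country_code", "org" => "organization" }
rows = [
  { "country_code" => "NL", "organization" => "acme",
    "country_name" => "Netherlands", "country_details" => "EU member" }
]
index = build_index(rows, lookup_keys)
event = enrich({ "code" => "NL", "org" => "acme" }, index, lookup_keys,
               ["country_name", "country_details"])
```

Using the array of key values as the hash key keeps composite keys simple, at the cost of holding the whole result set in memory, which is why the proposal also needs a size limit.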
I'm using this filter to enhance reporting for suricata alerts, and I've recently discovered that when a blank result is returned, it appears to stop processing that particular event. Not sure if this is intended behavior, but if so, it could be beneficial to have a boolean that would allow the output variables to be set to, say, blank_result_returned when this happens.
When I bind an array like [1,2,3,4] into prepared_statement_bind_values, the SQL that actually executes contains '1,2,3,4'. That's bad news for IN in SQL.
What I want:
select .... where somefield IN (1,2,3,4)
But:
select ... where somefield IN('1,2,3,4')
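A common way around this, independent of this plugin, is to generate one placeholder per array element so that each value is bound separately. The sketch below only builds the SQL text and the flat bind list; `expand_in_clause` is a hypothetical helper, and it assumes the statement contains the literal text `IN (?)`.

```ruby
# Expand one "IN (?)" placeholder into one "?" per array element, so that
# each element is bound as its own value rather than as a single string.
def expand_in_clause(sql, values)
  raise ArgumentError, "values must not be empty" if values.empty?

  placeholders = Array.new(values.size, "?").join(", ")
  [sql.sub("IN (?)", "IN (#{placeholders})"), values]
end

sql, binds = expand_in_clause("select * from t where somefield IN (?)", [1, 2, 3, 4])
# sql now contains four placeholders and binds holds the four values.
```

The trade-off is that the statement text changes with the array length, so a single prepared statement cannot be reused for arrays of different sizes.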
I'm having issues getting it running with JRE11 and MSSQL JDBC.
Here are the errors:
[2019-04-11T10:12:52,426][ERROR][logstash.javapipeline ] Pipeline aborted due to error {:pipeline_id=>"main", :exception=>#<Sequel::AdapterNotFound: com.microsoft.sqlserver.jdbc.SQLServerDriver not loaded>, :backtrace=>["/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/sequel-5.19.0/lib/sequel/adapters/jdbc.rb:56:in `load_driver'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-jdbc_streaming-1.0.5/lib/logstash/plugin_mixins/jdbc_streaming.rb:52:in `prepare_jdbc_connection'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-jdbc_streaming-1.0.5/lib/logstash/filters/jdbc_streaming.rb:194:in `prepare_connected_jdbc_cache'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-jdbc_streaming-1.0.5/lib/logstash/filters/jdbc_streaming.rb:116:in `register'", "org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java:56:in `register'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:192:in `block in register_plugins'", "org/jruby/RubyArray.java:1792:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:191:in `register_plugins'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:447:in `maybe_setup_out_plugins'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:204:in `start_workers'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:146:in `run'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:105:in `block in start'"], :thread=>"#<Thread:0x612bbead@/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:102 run>"}
[2019-04-11T10:12:52,428][ERROR][logstash.agent ] Failed to execute action {:id=>:main, :action_type=>LogStash::ConvergeResult::FailedAction, :message=>"Could not execute action: PipelineAction::Create<main>, action_result: false", :backtrace=>nil}
And here is my config:
jdbc_streaming {
jdbc_validate_connection => true
jdbc_validation_timeout => 300
cache_expiration => 60
cache_size => 20000
jdbc_driver_library => '/etc/logstash/sqljdbc/mssql-jdbc-7.2.1.jre11.jar'
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://[Connection Removed]"
jdbc_user => '[User Removed]'
jdbc_password => "[Password Removed]"
statement => "[Query removed]"
parameters => { "IPAddy" => "queryip" }
target => "target"
Note that it does work with JRE 8 and the mssql-jdbc-7.2.1.jre8.jar JAR.
The current code executes the query for every event. To improve performance, we should provide an LRU cache to store the values returned after the first DB lookup. The following config should be exposed:
cache => true
cache_expiration => 5ms
cache_entries => 500
Users who don't want stale data can set cache => false or use an aggressive cache_expiration time.
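For illustration, an LRU cache with per-entry expiration can be sketched in a few lines of Ruby. The class name, its keyword arguments, and the injectable clock are assumptions made for this example; the plugin's real cache may be built differently.

```ruby
# Minimal LRU cache with per-entry expiration. Ruby hashes keep insertion
# order, so the first key is always the least recently used one.
class ExpiringLruCache
  def initialize(max_entries:, ttl_seconds:,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @max_entries = max_entries
    @ttl_seconds = ttl_seconds
    @clock = clock
    @entries = {}
    @lock = Mutex.new
  end

  # Returns the cached value, or runs the block (the DB lookup) on a miss
  # or when the cached entry has expired.
  def fetch(key)
    @lock.synchronize do
      entry = @entries.delete(key)
      if entry.nil? || entry[:expires_at] <= @clock.call
        entry = { value: yield, expires_at: @clock.call + @ttl_seconds }
      end
      @entries[key] = entry                          # re-insert as most recent
      @entries.shift if @entries.size > @max_entries # evict least recent
      entry[:value]
    end
  end
end

cache = ExpiringLruCache.new(max_entries: 500, ttl_seconds: 5)
row = cache.fetch("10.0.0.1") { { "feed_name" => "example" } } # block stands in for the query
```

This version holds the lock while the block runs, which keeps the code short but serializes lookups across workers; a production cache would likely release the lock during the query.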
For large and complex queries, it would be nice to load them from a file instead of having a multi-line SQL statement in the config file. This would also eliminate the annoyance of escaping quotes when using PostgreSQL and jsonb queries.
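The jdbc input plugin already offers a statement_filepath option for this purpose. A minimal sketch of the loading step, assuming a hypothetical helper named `load_statement`, could be:

```ruby
# Hypothetical helper: load the SQL text from a file so the pipeline
# config does not need a multi-line, quote-escaped statement.
def load_statement(path)
  sql = File.read(path, encoding: "UTF-8").strip
  raise ArgumentError, "statement file #{path} is empty" if sql.empty?

  sql
end
```

Because the SQL is read verbatim, quotes inside jsonb expressions need no escaping beyond what SQL itself requires.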
Logstash Version: 6.4 hosted on Oracle Linux Server release 6.8
Logstash-filter-jdbc_streaming Version: 1.0.4
MSSQL Driver: sqljdbc_6.0/enu/jre8/sqljdbc42.jar
Database: MSSQL 2017
input {
jdbc {
jdbc_driver_library => "/etc/logstash/ancillary/jdbc/drivers/sqljdbc_6.0/enu/jre8/sqljdbc42.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => ""
jdbc_user => ""
jdbc_password => ""
statement => ""
last_run_metadata_path => "/etc/logstash/ancillary/jdbc/.logstash_jdbc_last_run"
add_field => {
"[@metadata][elastic]" => "false"
"[@metadata][sql]" => "true"
"[@metadata][type]" => "inc"
"[@metadata][run]" => "full-pipeline"
}
}
}
filter {
jdbc_streaming {
id => "JDBCSTREAM"
jdbc_driver_library => "/etc/logstash/ancillary/jdbc/drivers/sqljdbc_6.0/enu/jre8/sqljdbc42.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => ""
jdbc_user => ""
jdbc_password => ""
statement => "SP_INCIDENT_BACKLOG_INFERENCE @INCIDENT=:inc, @RUN=:run"
parameters => { "inc" => "incident_id" "run" => "[@metadata][run]"}
target => "sql_results"
}
split { field => "sql_results" }
ruby {
code =>'
event.get("sql_results").each {|k,v| event.set(k,v)}
event.remove("sql_results")
event.remove("@timetamp")
'
}
}
output { stdout { codec => rubydebug } }
Wanted to get this logged as an issue so at least others can be aware of it. I have a discussion post going through the issue as well.
Missing Converter handling for full class name=org.jruby.RubyObjectVar3
That error led me to an old discussion topic: https://discuss.elastic.co/t/problem-with-timestamp-and-timezone/148622
which resulted in an issue being logged for the logstash-input-jdbc plugin:
logstash-plugins/logstash-input-jdbc#302
which in turn led me to guyboertje's comment pointing back to the logstash-filter-jdbc_streaming plugin: logstash-plugins/logstash-input-jdbc#302 (comment)
Given the conversation it seemed like this was a type issue, where one of my MSSQL types was not playing well with whatever this internal JRuby class (org.jruby.RubyObjectVar3) is doing. Going through some process of elimination I found the culprit. I had a single date type field being leveraged to remove the time value as it was not needed. All I had to do was cast this date type into our standard datetime2 type and everything was right with the world.
I am not sure if upgrading to a newer version of LS would solve this problem but currently that's not an option for me. Wanted to make sure this issue was documented in case someone else runs into it in the future.
Related POST with all the details: https://discuss.elastic.co/t/logstash-filter-jdbc-streaming-parameter-issue-calling-stored-procedure-mssql/209085/2?u=chris_lyons
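Besides casting in SQL, another option might be to normalize values in Ruby before they are stored on the event. The sketch below is a guess at that approach: the list of "safe" types is an assumption, the method names are invented, and whether `to_s` gives a useful string for the JRuby-internal object from the error is not something this example can confirm.

```ruby
require "date"

# Types assumed to be safe to store on an event as-is (an assumption for
# this sketch, not a list taken from Logstash).
SAFE_TYPES = [NilClass, TrueClass, FalseClass, String, Integer, Float, Time].freeze

# Convert date-like and unknown objects into plain strings; recurse into
# arrays and hashes; leave common scalar types untouched.
def normalize_value(value)
  case value
  when *SAFE_TYPES then value
  when Date        then value.iso8601 # also covers DateTime
  when Array       then value.map { |item| normalize_value(item) }
  when Hash        then value.transform_values { |item| normalize_value(item) }
  else value.to_s
  end
end

def normalize_rows(rows)
  rows.map { |row| normalize_value(row) }
end
```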
[ERROR] 2019-05-08 05:51:21.572 [[main]-pipeline-manager] javapipeline - Pipeline aborted due to error {:pipeline_id=>"main", :exception=>#<TypeError: failed to coerce jdk.internal.loader.ClassLoaders$AppClassLoader to java.net.URLClassLoader>, :backtrace=>["org/jruby/java/addons/KernelJavaAddons.java:29:in `to_java'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-jdbc_streaming-1.0.5/lib/logstash/plugin_mixins/jdbc_streaming.rb:48:in `prepare_jdbc_connection'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-jdbc_streaming-1.0.5/lib/logstash/filters/jdbc_streaming.rb:194:in `prepare_connected_jdbc_cache'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-jdbc_streaming-1.0.5/lib/logstash/filters/jdbc_streaming.rb:116:in `register'", "org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java:56:in `register'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:191:in `block in register_plugins'", "org/jruby/RubyArray.java:1792:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:190:in `register_plugins'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:446:in `maybe_setup_out_plugins'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:203:in `start_workers'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:145:in `run'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:104:in `block in start'"], :thread=>"#<Thread:0x7e04c1ec run>"}
[ERROR] 2019-05-08 05:51:21.597 [Converge PipelineAction::Create<main>] agent - Failed to execute action {:id=>:main, :action_type=>LogStash::ConvergeResult::FailedAction, :message=>"Could not execute action: PipelineAction::Create<main>, action_result: false", :backtrace=>nil}
My config is as follows:
filter {
json {
source => "message"
}
grok {
match => {
"topic" => "tele/%{DATA:mac_address}/SENSOR"
}
}
jdbc_streaming {
jdbc_connection_string => "jdbc:mysql://10.243.252.61:3306/mydb"
jdbc_driver_library => "/root/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_user => "mydb"
jdbc_password => "mydb"
statement => "SELECT email FROM chargingstation LEFT JOIN auth_user ON chargingstation.owner_id = auth_user.id where mac_address=:mac_address1"
parameters => { "mac_address1" => "mac_address" }
target => "email"
}
}
Hello,
I thought about a new option such as tag_on_success or tag_on_match. Right now we need the ability to tag an event only if the desired row was found. The common option add_tag is applied regardless of whether there was a match.
Currently the workaround is to append a mutate filter that checks for the absence of the tag from tag_on_default_use and, if it is absent, adds the "success tag", but that is somewhat ugly.
What do you think?
logstash (5.6.14 and 6.5.4)
logstash-filter-jdbc_streaming (1.0.4)
Hi there,
I get an error when connecting to a database server that refuses the connection. Unfortunately this error kills the whole pipeline and no further events are processed.
In my understanding, the tags from tag_on_failure should be applied in such an error case.
When I use a SQL query with a smalldatetime field, I get this error:
[ERROR][logstash.pipeline] Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.
{
"exception" => "Missing Valuefier handling for full class name=org.jruby.RubyObject, simple name=RubyObject"
"backtrace" => [
"org.logstash.Valuefier.convertNonCollection(Valuefier.java:51)"
"org.logstash.Valuefier.convert(Valuefier.java:90)"
"org.logstash.ConvertedMap$1.visit(ConvertedMap.java:43)"
"org.jruby.RubyHash.visitLimited(RubyHash.java:648)"
"org.jruby.RubyHash.visitAll(RubyHash.java:634)"
"org.logstash.ConvertedMap.newFromRubyHash(ConvertedMap.java:39)"
"org.logstash.Valuefier.convert(Valuefier.java:63)"
"org.logstash.ConvertedList.newFromRubyArray(ConvertedList.java:43)"
"org.logstash.Valuefier.convert(Valuefier.java:66)"
"org.logstash.ext.JrubyEventExtLibrary$RubyEvent.ruby_set_field(JrubyEventExtLibrary.java:128)"
"org.logstash.ext.JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.call(JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.gen)"
"org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:202)"
"org.jruby.ast.CallTwoArgNode.interpret(CallTwoArgNode.java:59)"
"org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)"
"org.jruby.ast.BlockNode.interpret(BlockNode.java:71)"
"org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)"
"org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:225)"
"org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:219)"
"org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:202)"
"org.jruby.ast.FCallTwoArgNode.interpret(FCallTwoArgNode.java:38)"
"org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)"
"org.jruby.ast.IfNode.interpret(IfNode.java:118)"
"org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)"
"org.jruby.ast.BlockNode.interpret(BlockNode.java:71)"
"org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)"
"org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:204)"
"org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:211)"
"org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:336)"
"org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:179)"
"org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:183)"
"org.jruby.ast.FCallOneArgBlockPassNode.interpret(FCallOneArgBlockPassNode.java:32)"
"org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)"
"org.jruby.ast.BlockNode.interpret(BlockNode.java:71)"
"org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)"
"org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:204)"
"org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:211)"
"org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:336)"
"org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:179)"
"org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:188)"
"org.jruby.ast.FCallOneArgBlockNode.interpret(FCallOneArgBlockNode.java:34)"
"org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)"
"org.jruby.ast.BlockNode.interpret(BlockNode.java:71)"
"org.jruby.ast.IfNode.interpret(IfNode.java:118)"
"org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)"
"org.jruby.evaluator.ASTInterpreter.INTERPRET_BLOCK(ASTInterpreter.java:112)"
"org.jruby.runtime.Interpreted19Block.evalBlockBody(Interpreted19Block.java:206)"
"org.jruby.runtime.Interpreted19Block.yield(Interpreted19Block.java:157)"
"org.jruby.runtime.Block.yield(Block.java:142)"
"org.jruby.RubyArray.eachCommon(RubyArray.java:1606)"
"org.jruby.RubyArray.each(RubyArray.java:1613)"
"org.jruby.RubyArray$INVOKER$i$0$0$each.call(RubyArray$INVOKER$i$0$0$each.gen)"
"org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:143)"
"org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:154)"
"org.jruby.ast.CallNoArgBlockNode.interpret(CallNoArgBlockNode.java:64)"
"org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)"
"org.jruby.ast.BlockNode.interpret(BlockNode.java:71)"
"org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)"
"org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:182)"
"org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:203)"
"org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:326)"
"org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:170)"
"org.jruby.ast.CallOneArgNode.interpret(CallOneArgNode.java:57)"
"org.jruby.ast.LocalAsgnNode.interpret(LocalAsgnNode.java:123)"
"org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)"
"org.jruby.ast.BlockNode.interpret(BlockNode.java:71)"
"org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)"
"org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:182)"
"org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:203)"
"org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:326)"
"org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:170)"
"org.jruby.ast.CallOneArgNode.interpret(CallOneArgNode.java:57)"
"org.jruby.ast.DAsgnNode.interpret(DAsgnNode.java:110)"
"org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)"
"org.jruby.ast.BlockNode.interpret(BlockNode.java:71)"
"org.jruby.evaluator.ASTInterpreter.INTERPRET_BLOCK(ASTInterpreter.java:112)"
"org.jruby.runtime.Interpreted19Block.evalBlockBody(Interpreted19Block.java:206)"
"org.jruby.runtime.Interpreted19Block.yield(Interpreted19Block.java:194)"
"org.jruby.runtime.Interpreted19Block.call(Interpreted19Block.java:125)"
"org.jruby.runtime.Block.call(Block.java:101)"
"org.jruby.RubyProc.call(RubyProc.java:300)"
"org.jruby.internal.runtime.methods.ProcMethod.call(ProcMethod.java:64)"
"org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:210)"
"org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:206)"
"org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:326)"
"org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:170)"
"rubyjit.LogStash::Pipeline$$filter_batch_e0d70a513c9b572e72031cc1bba279d4ffc82a801442407170.block_0$RUBY$__file__(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:353)"
"rubyjit$LogStash::Pipeline$$filter_batch_e0d70a513c9b572e72031cc1bba279d4ffc82a801442407170$block_0$RUBY$__file__.call(rubyjit$LogStash::Pipeline$$filter_batch_e0d70a513c9b572e72031cc1bba279d4ffc82a801442407170$block_0$RUBY$__file__)"
"org.jruby.runtime.CompiledBlock19.yield(CompiledBlock19.java:159)"
"org.jruby.runtime.CompiledBlock19.call(CompiledBlock19.java:87)"
"org.jruby.runtime.Block.call(Block.java:101)"
"org.jruby.RubyProc.call(RubyProc.java:300)"
"org.jruby.RubyProc.call19(RubyProc.java:281)"
"org.jruby.RubyProc$INVOKER$i$0$0$call19.call(RubyProc$INVOKER$i$0$0$call19.gen)"
"org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:210)"
"org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:206)"
"org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:168)"
"rubyjit.LogStash::Util::WrappedAckedQueue::ReadBatch$$each_460b01f11b82d55a13c70efd2aabe7c62fbc86591442407170.block_0$RUBY$__file__(/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_acked_queue.rb:259)"
"rubyjit$LogStash::Util::WrappedAckedQueue::ReadBatch$$each_460b01f11b82d55a13c70efd2aabe7c62fbc86591442407170$block_0$RUBY$__file__.call(rubyjit$LogStash::Util::WrappedAckedQueue::ReadBatch$$each_460b01f11b82d55a13c70efd2aabe7c62fbc86591442407170$block_0$RUBY$__file__)"
"org.jruby.runtime.CompiledBlock19.yield(CompiledBlock19.java:135)"
"org.jruby.runtime.Block.yield(Block.java:142)"
"org.jruby.RubyHash$13.visit(RubyHash.java:1355)"
"org.jruby.RubyHash.visitLimited(RubyHash.java:648)"
"org.jruby.RubyHash.visitAll(RubyHash.java:634)"
"org.jruby.RubyHash.iteratorVisitAll(RubyHash.java:1306)"
"org.jruby.RubyHash.each_pairCommon(RubyHash.java:1351)"
"org.jruby.RubyHash.each19(RubyHash.java:1342)"
"org.jruby.RubyHash$INVOKER$i$0$0$each19.call(RubyHash$INVOKER$i$0$0$each19.gen)"
"org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:143)"
"org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:154)"
"rubyjit.LogStash::Util::WrappedAckedQueue::ReadBatch$$each_460b01f11b82d55a13c70efd2aabe7c62fbc86591442407170.__file__(/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_acked_queue.rb:258)"
"rubyjit.LogStash::Util::WrappedAckedQueue::ReadBatch$$each_460b01f11b82d55a13c70efd2aabe7c62fbc86591442407170.__file__(/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_acked_queue.rb)"
"org.jruby.internal.runtime.methods.JittedMethod.call(JittedMethod.java:161)"
"org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:143)"
"org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:154)"
"rubyjit.LogStash::Pipeline$$filter_batch_e0d70a513c9b572e72031cc1bba279d4ffc82a801442407170.chained_0_rescue_1$RUBY$SYNTHETIC__file__(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:352)"
"rubyjit.LogStash::Pipeline$$filter_batch_e0d70a513c9b572e72031cc1bba279d4ffc82a801442407170.__file__(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb)"
"rubyjit.LogStash::Pipeline$$filter_batch_e0d70a513c9b572e72031cc1bba279d4ffc82a801442407170.__file__(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb)"
"org.jruby.internal.runtime.methods.JittedMethod.call(JittedMethod.java:181)"
"org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:326)"
"org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:170)"
"org.jruby.ast.FCallOneArgNode.interpret(FCallOneArgNode.java:36)"
"org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)"
"org.jruby.ast.BlockNode.interpret(BlockNode.java:71)"
"org.jruby.ast.WhileNode.interpret(WhileNode.java:131)"
"org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)"
"org.jruby.ast.BlockNode.interpret(BlockNode.java:71)"
"org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)"
"org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:225)"
"org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:219)"
"org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:346)"
"org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:204)"
"org.jruby.ast.FCallTwoArgNode.interpret(FCallTwoArgNode.java:38)"
"org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)"
"org.jruby.ast.BlockNode.interpret(BlockNode.java:71)"
"org.jruby.evaluator.ASTInterpreter.INTERPRET_BLOCK(ASTInterpreter.java:112)"
"org.jruby.runtime.Interpreted19Block.evalBlockBody(Interpreted19Block.java:206)"
"org.jruby.runtime.Interpreted19Block.yield(Interpreted19Block.java:194)"
"org.jruby.runtime.Interpreted19Block.call(Interpreted19Block.java:125)"
"org.jruby.runtime.Block.call(Block.java:101)"
"org.jruby.RubyProc.call(RubyProc.java:300)"
"org.jruby.RubyProc.call(RubyProc.java:230)"
"org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:99)"
"java.lang.Thread.run(Thread.java:745)"
]
}
Hello
After some time we get the following exception and data is lost.
Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash
"exception"=>java.sql.SQLRecoverableException: No more data to read from socket
Version
Oracle 11.2.0.4.0
jdbc_streaming {
jdbc_connection_string => "${xxx}"
jdbc_user => "${xxx}"
jdbc_password => "${xxx}"
jdbc_validate_connection => true
jdbc_driver_library => "/usr/share/logstash/ojdbc7-12.1.0.2.jar"
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
statement => "select xxx"
parameters => { "unr" => "xxx"}
target => "xxx"
}
To fix this, the plugin should handle network or SQL exceptions with a retry, or validate and refresh the connection when a SQL exception occurs.
Thanks
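The retry idea from the report above could be sketched as a small wrapper with a bounded number of attempts and a growing delay. `TransientDbError` is a stand-in error class defined only for this example, and `with_retries` is a hypothetical helper; in the plugin, the block would presumably run the query and the rescued class would be whatever the driver raises.

```ruby
# Stand-in for a recoverable database error (hypothetical, for this sketch).
class TransientDbError < StandardError; end

# Run the block, retrying on the listed error classes with a delay that
# doubles after each failed attempt. Re-raises after the last attempt.
def with_retries(attempts: 3, base_delay: 0.5, retry_on: [TransientDbError],
                 sleeper: ->(seconds) { sleep(seconds) })
  tries = 0
  begin
    tries += 1
    yield tries
  rescue *retry_on => e
    raise e if tries >= attempts

    sleeper.call(base_delay * (2**(tries - 1)))
    retry
  end
end

result = with_retries(attempts: 3, base_delay: 0.01) do |attempt|
  # A real caller would run the lookup here; this block fails once.
  raise TransientDbError, "No more data to read from socket" if attempt == 1

  "row"
end
```

Re-raising after the final attempt keeps the failure visible, so the caller can still tag the event or stop the pipeline instead of silently dropping data.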
In the jdbc input plugin, there is a parameter called jdbc_password_filepath, which is very convenient when deploying pipelines in Kubernetes, as the credentials for the JDBC connection can be mounted from cluster secrets.
It would be very helpful to have this parameter on the jdbc_streaming filter.