
logstash-filter-jdbc_streaming's Issues

Enable pipeline to configure connection pooling

I want to send more concurrent filter queries to speed up processing of large batches of events. I've been unable to find any documentation that might explain the behavior here. Can anybody clear things up for me?

-- Justin McAleer, via discuss

Since Sequel supports connection pooling by default, exposing control over the pooling behaviour to the pipeline configuration should be pretty straightforward.
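
A minimal sketch of what that might look like, assuming a hypothetical passthrough of Sequel pool options (the sequel_opts setting and its keys are assumptions for illustration, not current plugin options; connection details are placeholders):

filter {
  jdbc_streaming {
    jdbc_driver_library => "/path/to/driver.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://testserver;databaseName=TEST_DEV"
    jdbc_user => "me"
    jdbc_password => "secret"
    statement => "SELECT name FROM users WHERE id = :id"
    parameters => { "id" => "user_id" }
    target => "user"
    # Hypothetical: forward pool settings to Sequel's built-in connection pool,
    # allowing more concurrent lookups across pipeline workers
    sequel_opts => {
      "max_connections" => 8
      "pool_timeout" => 5
    }
  }
}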

JDBC streaming filter does not seem to be working properly

This is my .conf file configuration:

input{
  jdbc {
    jdbc_connection_string => "jdbc:sqlserver://testserver;databaseName=TEST_DEV;user=me;password=secret;"
    jdbc_user => "me"
    jdbc_password => "secret"
    jdbc_driver_library => "../jre8/sqljdbc42.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    # Schedule is once per hour or customise in cron-type syntax
    schedule => "* * * * *"
    statement => "SELECT * from PROPERTY_MASTER_ORS.C_B_LEASE_COMPARABLES"
    # Uncomment and experiment for large tables
    #jdbc_paging_enabled => true
    #jdbc_page_size => 15000
    #jdbc_fetch_size => 10000
  }
}
filter {
  jdbc_streaming {
    jdbc_driver_library => "../jre8/sqljdbc42.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://testserver;databaseName=TEST_DEV;user=me;password=secret;"
    jdbc_user => "me"
    jdbc_password => "secret"
    statement => "SELECT * from PROPERTY_MASTER_ORS.C_B_LEASE_COMP_FLR where lease_comaparable_id = :id"
    parameters => { "id" => "rowid_object" }
    target => "lease_comp_floor"
  }
}

output {
  elasticsearch {
    hosts => "http://estestcloud.cloudapp.net:9200"
    index => "rpdi-es-1.0"
    document_type => "lease-comp"
    document_id => "%{rowid_object}"
  }
  stdout {
    codec => rubydebug
  }
}

It fetches a few records, and after that I get the error below.

[2017-07-13T18:33:47,081][INFO ]logstash.inputs.jdbc SELECT * from PROPERTY_MASTER_ORS.C_B_LEASE_COMPARABLES
[2017-07-13T18:33:47,867][ERROR][logstash.pipeline ] t.ruby_set_field(JrubyEventExtLibrary.java:128)", "org.logstash.ext.JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.call(JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.gen)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:202)", "org.jruby.ast.CallTwoArgNode.interpret(CallTwoArgNode.java:59)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)", "org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)", "org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:225)", "org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:219)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:202)", "org.jruby.ast.FCallTwoArgNode.interpret(FCallTwoArgNode.java:38)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.IfNode.interpret(IfNode.java:118)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)", "org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)", "org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:204)", "org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:211)", "org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:177)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:183)", "org.jruby.ast.FCallOneArgBlockPassNode.interpret(FCallOneArgBlockPassNode.java:32)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)", "org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)", "org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:204)", "org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:211)", "org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:177)", "org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:188)", "org.jruby.ast.FCallOneArgBlockNode.interpret(FCallOneArgBlockNode.java:34)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)", "org.jruby.ast.IfNode.interpret(IfNode.java:118)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.evaluator.ASTInterpreter.INTERPRET_BLOCK(ASTInterpreter.java:112)", "org.jruby.runtime.Interpreted19Block.evalBlockBody(Interpreted19Block.java:206)", "org.jruby.runtime.Interpreted19Block.yield(Interpreted19Block.java:157)", "org.jruby.runtime.Block.yield(Block.java:142)", "org.jruby.RubyArray.eachCommon(RubyArray.java:1606)", "org.jruby.RubyArray.each(RubyArray.java:1613)", "org.jruby.RubyArray$INVOKER$i$0$0$each.call(RubyArray$INVOKER$i$0$0$each.gen)", "org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:143)", "org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:154)", "org.jruby.ast.CallNoArgBlockNode.interpret(CallNoArgBlockNode.java:64)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)", "org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)", "org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:182)", "org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:203)", 
"org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:168)", "org.jruby.ast.CallOneArgNode.interpret(CallOneArgNode.java:57)", "org.jruby.ast.LocalAsgnNode.interpret(LocalAsgnNode.java:123)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)"}]

Any suggestions, please?

TypeError: failed to coerce jdk.internal.loader.ClassLoaders$AppClassLoader to java.net.URLClassLoader

This is just to report/track that the same issue, with the same workaround, occurs on Logstash 7.4.0 with the jdbc_streaming filter plugin as was reported for jdbc_static in logstash-plugins/logstash-filter-jdbc_static#47.

In this case the issue occurred when using

jdbc_streaming {
jdbc_driver_library => "/usr/share/logstash/x-pack-sql-jdbc-7.4.0.jar"

The same workaround worked: remove the jdbc_driver_library setting (or pass an empty string) and put the jar file directly at /usr/share/logstash/logstash-core/lib/jars/x-pack-sql-jdbc-7.4.0.jar.
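
A minimal sketch of the workaround, assuming the driver jar has already been copied into /usr/share/logstash/logstash-core/lib/jars/ (the driver class name, connection string, query, and field names below are placeholders for illustration):

jdbc_streaming {
  # Jar is picked up from logstash-core/lib/jars/, so the library setting is left empty
  jdbc_driver_library => ""
  # Assumed class name for the x-pack SQL JDBC driver
  jdbc_driver_class => "org.elasticsearch.xpack.sql.jdbc.EsDriver"
  jdbc_connection_string => "jdbc:es://localhost:9200"
  statement => "SELECT name FROM mytable WHERE id = :id"
  parameters => { "id" => "id" }
  target => "lookup_result"
}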

Multiple search results generate nested objects

I use your plugin to enrich events with data from an external reputation database. My goal is to process the data enriched from my external database via JDBC and visualize it in Kibana.

Here is a key snippet from my logstash.conf:

statement => "select feed_name, first_seen, last_added from feeds_aggregated where ip = :lookupIP"
parameters => { "lookupIP" => "ip" }
target => "[enrich][fia_nested]"

After Logstash processing I get enriched data with such structure in Elasticsearch:

"enrich": {
      "fia_nested": [
        {
          "feed_name": "blocklist_de_apache",
          "first_seen": "2018-07-19T14:16:03.714Z",
          "last_added": "2018-07-19T14:16:03.714Z"
        },
        {
          "feed_name": "urandomusto_http",
          "first_seen": "2018-07-19T14:18:03.829Z",
          "last_added": "2018-07-19T14:18:03.829Z"
        }
      ]
}

Kibana's restriction on nested objects does not allow them to be processed properly (https://www.elastic.co/guide/en/kibana/current/nested-objects.html).

How can I manage the result data structure using your plugin?
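
One way to avoid the nested array is to split the result set into separate events and promote the columns to top-level fields, the same split-plus-ruby pattern used elsewhere in these issues. A sketch, assuming one event per matching row is acceptable (connection settings omitted; field names follow the snippet above):

filter {
  jdbc_streaming {
    # jdbc connection settings omitted
    statement => "select feed_name, first_seen, last_added from feeds_aggregated where ip = :lookupIP"
    parameters => { "lookupIP" => "ip" }
    target => "[enrich][fia_nested]"
  }
  # Produce one event per matching row instead of one event holding an array of rows
  split { field => "[enrich][fia_nested]" }
  # Promote the row's columns to top-level fields and drop the nested copy
  ruby {
    code => '
      event.get("[enrich][fia_nested]").each { |k, v| event.set(k, v) }
      event.remove("[enrich][fia_nested]")
    '
  }
}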

Multiple JAR files in classpath

I'd like to report a bug: the plugin does not recognize multiple JAR files in the classpath. For instance, this is my config for the filter:

jdbc_streaming {
    id => "via-categories-stream"
    jdbc_user => "user"
    jdbc_driver_library => "/usr/share/logstash/drivers/GoogleBigQueryJDBC42.jar,/usr/share/logstash/drivers/jackson-core-2.1.3.jar,/usr/share/logstash/drivers/google-oauth-client-1.24.1.jar,/usr/share/logstash/drivers/google-http-client-jackson2-1.24.1.jar,/usr/share/logstash/drivers/google-http-client-1.24.1.jar,/usr/share/logstash/drivers/google-api-services-bigquery-v2-rev400-1.24.1.jar,/usr/share/logstash/drivers/google-api-client-1.24.1.jar"
    jdbc_driver_class => "com.simba.googlebigquery.jdbc42.Driver"
    jdbc_connection_string => ".."
    statement => "SELECT categoryId, countryId, brandId, supplierId FROM UNNEST ((SELECT filters FROM migration_search.categories WHERE categoryid=:id))"
    parameters => { "id" => "categoryid"}
    target => "filters"
}

And that's the exception I receive:

[2019-01-22T13:49:41,291][ERROR][logstash.filters.jdbcstreaming] Invalid setting for jdbc_streaming filter plugin:

filter {
  jdbc_streaming {
    # This setting must be a path
    # File does not exist or cannot be opened /usr/share/logstash/drivers/GoogleBigQueryJDBC42.jar,/usr/share/logstash/drivers/jackson-core-2.1.3.jar,/usr/share/logstash/drivers/google-oauth-client-1.24.1.jar,/usr/share/logstash/drivers/google-http-client-jackson2-1.24.1.jar,/usr/share/logstash/drivers/google-http-client-1.24.1.jar,/usr/share/logstash/drivers/google-api-services-bigquery-v2-rev400-1.24.1.jar,/usr/share/logstash/drivers/google-api-client-1.24.1.jar
    jdbc_driver_library => "/usr/share/logstash/drivers/GoogleBigQueryJDBC42.jar,/usr/share/logstash/drivers/jackson-core-2.1.3.jar,/usr/share/logstash/drivers/google-oauth-client-1.24.1.jar,/usr/share/logstash/drivers/google-http-client-jackson2-1.24.1.jar,/usr/share/logstash/drivers/google-http-client-1.24.1.jar,/usr/share/logstash/drivers/google-api-services-bigquery-v2-rev400-1.24.1.jar,/usr/share/logstash/drivers/google-api-client-1.24.1.jar"
    ...
  }
}

The very same configuration works with the JDBC input plugin, so I assume this filter doesn't recognize multiple JARs in the classpath.
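
A possible workaround while the filter only accepts a single path is to reference just the driver jar and let the supporting jars be loaded from Logstash's own jar directory; this is a sketch under the assumption that jars placed in /usr/share/logstash/logstash-core/lib/jars/ are visible to the default class loader:

jdbc_streaming {
    id => "via-categories-stream"
    jdbc_user => "user"
    # Only the Simba driver jar is referenced here; the Google client dependency jars
    # are copied into /usr/share/logstash/logstash-core/lib/jars/ instead.
    jdbc_driver_library => "/usr/share/logstash/drivers/GoogleBigQueryJDBC42.jar"
    jdbc_driver_class => "com.simba.googlebigquery.jdbc42.Driver"
    jdbc_connection_string => ".."
    statement => "SELECT categoryId, countryId, brandId, supplierId FROM UNNEST ((SELECT filters FROM migration_search.categories WHERE categoryid=:id))"
    parameters => { "id" => "categoryid" }
    target => "filters"
}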

DB Lookup Requirements

Outlining some requirements to ship v1. For reference purposes, the meta issue is here.

As it currently stands, the plugin executes one SQL query per event and stores the full result set in a new field. This requires a query and a network round trip for each event, which isn't ideal. We should make this more dynamic...

Goals

Enable the ability to look up data in relational databases and enrich events with it, which is commonly needed for users running CMDBs or using traditional data warehousing.

Features

  • Allow for caching of JDBC result datasets, hashed in-memory by a specific key for fast lookup and field enrichment
  • Lookups can be done with a primary key or composite key
  • Max cache size should be configurable
  • Periodic reloading of the cache (configured in seconds): a live refresh via a subsequent JDBC query

When Logstash first starts up, it should automatically query the DB and load the result set locally in cache. For fast retrieval, each row can probably be hashed by the primary or composite key. Since we always know the key for every event, it should be O(1) access.

Configuration

filter {
  jdbc {
    # Setup JDBC connection
    jdbc_driver_library => "/path/to/mysql-connector-java-5.1.34-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydatabase"
    jdbc_user => "me"
    jdbc_password => "secret"
    
    # JDBC query statement; dataset to be cached for enrichment purposes
    statement => "select * from WORLD.COUNTRY"

    # Define the lookup keys used to look up DB rows and enrich Logstash events. It maps the DB column name(s)
    # to the field name(s) in the event, essentially conducting a "join" on this key.
    # Each lookup should return a single unique result, or else it will just take the first result found for enrichment.
    # Both primary and composite keys are supported. Composite keys will have multiple keys specified, like below.
    # DB column names referenced must be available in the query result dataset.
    # (Required) - Syntax:  "event field name" => "DB column name"
    lookup_keys => {
      "code" => "country_code"
      "org" => "organization"
    }

    # Define one or more DB columns to enrich the Logstash event with.
    # DB column names referenced must be available in the query result dataset.
    # The new Logstash event field name(s) will be the same as the respective column(s) it came from.
    # (Optional) - if undefined, adds all fields (excluding lookup_key fields) to event top level.
    target_fields => ["country_name", "country_details"]

    # Cache settings
    refresh_schedule => "0 * * * *"  # Refresh cache every hour. Cron syntax like the JDBC input scheduler. (Optional) - default = never refresh
    max_cache_size => "1MB"  # (Optional) - need to find a good default for this, maybe 500MB?
  }
}

Failure Handling

If a lookup fails, we can tag the event with "_jdbclookupfailure".
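
Downstream stages could then route or annotate events that failed the lookup; a sketch using a standard Logstash conditional on the proposed tag (the mutate field name is illustrative):

filter {
  # ... jdbc lookup filter from the configuration above ...
  if "_jdbclookupfailure" in [tags] {
    # Handle events whose DB lookup failed, e.g. mark them for separate routing
    mutate { add_field => { "enrichment_status" => "lookup_failed" } }
  }
}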

Stops processing when blank results are returned

I'm using this filter to enhance reporting for Suricata alerts, and I've recently discovered that when a blank result is returned, processing of that particular event appears to stop. I'm not sure whether this is intended behavior, but if so, it could be beneficial to have a boolean option that would set the output field to, say, blank_result_returned when this happens.
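
Something close to the requested behavior may already be approximated with the filter's default handling; a sketch assuming the default_hash and tag_on_default_use settings work as their names suggest (connection settings omitted; field and tag names are illustrative):

jdbc_streaming {
    # jdbc connection settings omitted
    statement => "SELECT category FROM reputation WHERE ip = :ip"
    parameters => { "ip" => "src_ip" }
    target => "reputation"
    # Used when the query returns no rows, so the event keeps flowing with a marker value
    default_hash => { "category" => "blank_result_returned" }
    tag_on_default_use => ["_reputation_default"]
}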

prepared_statement_bind_values: binding an Array

When I bind an array like [1,2,3,4] into prepared_statement_bind_values, the SQL that actually executes receives the single string '1,2,3,4'. That's bad news for IN clauses in SQL.

What I want:

select .... where somefield IN (1,2,3,4)

But:

select ... where somefield IN('1,2,3,4')
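
For reference, a minimal configuration sketch that would reproduce this, assuming [ids] is an event field holding the array and that the prepared-statement settings are used with positional placeholders (the bind-value field-reference syntax and names shown are assumptions for illustration; connection settings omitted):

jdbc_streaming {
    # jdbc connection settings omitted
    use_prepared_statements => true
    prepared_statement_name => "lookup_by_ids"
    statement => "select name from things where somefield IN (?)"
    # [ids] holds e.g. [1, 2, 3, 4]; today the array is bound as the single string '1,2,3,4'
    prepared_statement_bind_values => ["[ids]"]
    target => "things"
}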

Issues running with MSSQL and JRE11

I'm having issues getting it running with JRE11 and MSSQL JDBC.

Here are the errors:

[2019-04-11T10:12:52,426][ERROR][logstash.javapipeline    ] Pipeline aborted due to error {:pipeline_id=>"main", :exception=>#<Sequel::AdapterNotFound: com.microsoft.sqlserver.jdbc.SQLServerDriver not loaded>, :backtrace=>["/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/sequel-5.19.0/lib/sequel/adapters/jdbc.rb:56:in `load_driver'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-jdbc_streaming-1.0.5/lib/logstash/plugin_mixins/jdbc_streaming.rb:52:in `prepare_jdbc_connection'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-jdbc_streaming-1.0.5/lib/logstash/filters/jdbc_streaming.rb:194:in `prepare_connected_jdbc_cache'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-jdbc_streaming-1.0.5/lib/logstash/filters/jdbc_streaming.rb:116:in `register'", "org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java:56:in `register'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:192:in `block in register_plugins'", "org/jruby/RubyArray.java:1792:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:191:in `register_plugins'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:447:in `maybe_setup_out_plugins'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:204:in `start_workers'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:146:in `run'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:105:in `block in start'"], :thread=>"#<Thread:0x612bbead@/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:102 run>"}
[2019-04-11T10:12:52,428][ERROR][logstash.agent           ] Failed to execute action {:id=>:main, :action_type=>LogStash::ConvergeResult::FailedAction, :message=>"Could not execute action: PipelineAction::Create<main>, action_result: false", :backtrace=>nil}

And here is my config:

        jdbc_streaming {
          jdbc_validate_connection => true
          jdbc_validation_timeout => 300
          cache_expiration => 60
          cache_size => 20000
          jdbc_driver_library => '/etc/logstash/sqljdbc/mssql-jdbc-7.2.1.jre11.jar'
          jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
          jdbc_connection_string => "jdbc:sqlserver://[Connection Removed]"
          jdbc_user => '[User Removed]'
          jdbc_password => "[Password Removed]"
          statement => "[Query removed]"
          parameters => { "IPAddy" => "queryip" }
          target => "target"
        }

Note that it does work with JRE8 and the mssql-jdbc-7.2.1.jre8.jar JAR.

Add LRU cache to this filter

The current code executes the query for every event. To improve performance, we should provide an LRU cache to store the values returned after the first DB lookup. The following config should be exposed:

cache => true
cache_expiration => 5ms
cache_entries => 500

Users who don't want stale data can set cache => false or set an aggressive cache_expiration time.
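
Putting it together, a sketch of a filter block using the option names proposed above (these are the proposal's names and may differ from what eventually ships; connection settings and the query are placeholders):

filter {
  jdbc_streaming {
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydatabase"
    jdbc_user => "me"
    jdbc_password => "secret"
    statement => "SELECT country_name FROM WORLD.COUNTRY WHERE code = :code"
    parameters => { "code" => "country_code" }
    target => "country"
    # Proposed LRU cache settings: reuse results of identical lookups instead of re-querying
    cache => true
    cache_expiration => 5.0   # seconds before an entry is considered stale
    cache_entries => 500      # maximum number of cached lookups
  }
}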

Support for loading query from file

Whenever there are large and complex queries, it would be nice to load them from a file instead of having a multi-line SQL statement in the config file. This would also eliminate the annoyance of escaped quotes when using PostgreSQL and jsonb queries.
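
A sketch of what this could look like, borrowing the statement_filepath option from the jdbc input plugin; it is hypothetical here, since the streaming filter does not currently offer it (path and field names are illustrative, connection settings omitted):

jdbc_streaming {
    # jdbc connection settings omitted
    # Hypothetical option, mirroring the jdbc input plugin: read the query from a file
    statement_filepath => "/etc/logstash/queries/enrich_order.sql"
    parameters => { "id" => "order_id" }
    target => "order_details"
}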

Missing Converter handling for full class name=org.jruby.RubyObjectVar3

Logstash Version: 6.4 hosted on Oracle Linux Server release 6.8
Logstash-filter-jdbc_streaming Version: 1.0.4
MSSQL Driver: sqljdbc_6.0/enu/jre8/sqljdbc42.jar
Database: MSSQL 2017

input {
  jdbc {
    jdbc_driver_library => "/etc/logstash/ancillary/jdbc/drivers/sqljdbc_6.0/enu/jre8/sqljdbc42.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => ""
    jdbc_user => ""
    jdbc_password => ""
    statement => ""
    last_run_metadata_path => "/etc/logstash/ancillary/jdbc/.logstash_jdbc_last_run"
    add_field => {
      "[@metadata][elastic]" => "false"
      "[@metadata][sql]" => "true"
      "[@metadata][type]" => "inc"
      "[@metadata][run]" => "full-pipeline"
    }
  }
}
filter {
    jdbc_streaming {
      id => "JDBCSTREAM"
      jdbc_driver_library => "/etc/logstash/ancillary/jdbc/drivers/sqljdbc_6.0/enu/jre8/sqljdbc42.jar"
      jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
      jdbc_connection_string => ""
      jdbc_user => ""
      jdbc_password => ""
      statement => "SP_INCIDENT_BACKLOG_INFERENCE @INCIDENT=:inc, @RUN=:run"
      parameters => { "inc" => "incident_id" "run" => "[@metadata][run]"}
      target => "sql_results"
    }
    split { field => "sql_results" }
    ruby {
      code =>'
        event.get("sql_results").each {|k,v| event.set(k,v)}
        event.remove("sql_results")
        event.remove("@timetamp")
      '
    }
}
output { stdout { codec => rubydebug } }

Wanted to get this logged as an issue so at least others can be aware of it. I have a discussion post going through the issue as well.

Missing Converter handling for full class name=org.jruby.RubyObjectVar3

This led me to an old discussion topic: https://discuss.elastic.co/t/problem-with-timestamp-and-timezone/148622

Which resulted in an issue being logged for the logstash-input-jdbc plugin:
logstash-plugins/logstash-input-jdbc#302

Which led me to guyboertje's comment pointing back to the logstash-filter-jdbc_streaming plugin: logstash-plugins/logstash-input-jdbc#302 (comment)

Given the conversation, it seemed like this was a type issue, where one of my MSSQL types was not playing well with whatever this internal JRuby class (org.jruby.RubyObjectVar3) is doing. Going through a process of elimination, I found the culprit: I had a single date-typed field that was only being used to strip the time value, which was not needed. All I had to do was cast this date type to our standard datetime2 type and everything was right with the world.
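
For illustration, the fix amounts to casting the column inside the query (or stored procedure) itself; a sketch with made-up column and table names, since the actual procedure is not shown in the issue (connection settings omitted):

filter {
  jdbc_streaming {
    # jdbc connection settings as above
    # Cast the plain DATE column to datetime2 so the result converts cleanly
    statement => "SELECT incident_id, CAST(opened_date AS datetime2) AS opened_date FROM incident WHERE incident_id = :inc"
    parameters => { "inc" => "incident_id" }
    target => "sql_results"
  }
}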

I am not sure if upgrading to a newer version of LS would solve this problem, but currently that's not an option for me. I wanted to make sure this issue was documented in case someone else runs into it in the future.

Related POST with all the details: https://discuss.elastic.co/t/logstash-filter-jdbc-streaming-parameter-issue-calling-stored-procedure-mssql/209085/2?u=chris_lyons

Unable to load plugin

[ERROR] 2019-05-08 05:51:21.572 [[main]-pipeline-manager] javapipeline - Pipeline aborted due to error {:pipeline_id=>"main", :exception=>#<TypeError: failed to coerce jdk.internal.loader.ClassLoaders$AppClassLoader to java.net.URLClassLoader>, :backtrace=>["org/jruby/java/addons/KernelJavaAddons.java:29:in to_java'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-jdbc_streaming-1.0.5/lib/logstash/plugin_mixins/jdbc_streaming.rb:48:in prepare_jdbc_connection'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-jdbc_streaming-1.0.5/lib/logstash/filters/jdbc_streaming.rb:194:in prepare_connected_jdbc_cache'", "/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-jdbc_streaming-1.0.5/lib/logstash/filters/jdbc_streaming.rb:116:in register'", "org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java:56:in register'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:191:in block in register_plugins'", "org/jruby/RubyArray.java:1792:in each'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:190:in register_plugins'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:446:in maybe_setup_out_plugins'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:203:in start_workers'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:145:in run'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:104:in block in start'"], :thread=>"#<Thread:0x7e04c1ec run>"} [ERROR] 2019-05-08 05:51:21.597 [Converge PipelineAction::Create<main>] agent - Failed to execute action {:id=>:main, :action_type=>LogStash::ConvergeResult::FailedAction, :message=>"Could not execute action: PipelineAction::Create<main>, action_result: false", :backtrace=>nil}

My config is as follows:

filter {
    json {
        source => "message"
    }
  
    grok {
        match => {
            "topic" => "tele/%{DATA:mac_address}/SENSOR"
        }
    }
    
    jdbc_streaming {
        jdbc_connection_string => "jdbc:mysql://10.243.252.61:3306/mydb"
        jdbc_driver_library => "/root/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_user => "mydb"
        jdbc_password => "mydb"
        statement => "SELECT email FROM chargingstation LEFT JOIN auth_user ON chargingstation.owner_id = auth_user.id  where mac_address=:mac_address1"
        parameters => { "mac_address1" => "mac_address" }
        target => "email"
    }
}

tag_on_success

Hello,

I thought about a new option, tag_on_success or tag_on_match or the like. Right now we need the ability to tag an event only if the desired row was found. The common option add_tag is applied regardless of a match or a mismatch.
Currently the workaround is to append a mutate filter that checks for the absence of the tag from tag_on_default_use and, if so, adds the "success tag", but that is somewhat ugly.
What do you think?
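
For reference, a sketch of the workaround described above, assuming tag_on_default_use is set to ["_lookup_nohit"] on the jdbc_streaming filter (tag names are illustrative):

filter {
  jdbc_streaming {
    # connection settings, statement, parameters, target omitted
    tag_on_default_use => ["_lookup_nohit"]
  }
  # Ugly workaround: infer success from the absence of the "no hit" tag
  if "_lookup_nohit" not in [tags] {
    mutate { add_tag => ["lookup_success"] }
  }
}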

Unhandled exception when connection fails

logstash (5.6.14 and 6.5.4)
logstash-filter-jdbc_streaming (1.0.4)

Hi there,

I get an error when connecting to a database server that refuses the connection. Unfortunately this error kills the whole pipeline and no further events are processed.
In my understanding, the tags from tag_on_failure should be applied in such an error case.

Error with SQL Server and smalldatetime field

When I use a SQL query with a smalldatetime field, I get this error:

[ERROR][logstash.pipeline] Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.

{
    "exception" => "Missing Valuefier handling for full class name=org.jruby.RubyObject, simple name=RubyObject"
    "backtrace" => [
      "org.logstash.Valuefier.convertNonCollection(Valuefier.java:51)"
      "org.logstash.Valuefier.convert(Valuefier.java:90)"
      "org.logstash.ConvertedMap$1.visit(ConvertedMap.java:43)"
      "org.jruby.RubyHash.visitLimited(RubyHash.java:648)"
      "org.jruby.RubyHash.visitAll(RubyHash.java:634)"
      "org.logstash.ConvertedMap.newFromRubyHash(ConvertedMap.java:39)"
      "org.logstash.Valuefier.convert(Valuefier.java:63)"
      "org.logstash.ConvertedList.newFromRubyArray(ConvertedList.java:43)"
      "org.logstash.Valuefier.convert(Valuefier.java:66)"
      "org.logstash.ext.JrubyEventExtLibrary$RubyEvent.ruby_set_field(JrubyEventExtLibrary.java:128)"
      "org.logstash.ext.JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.call(JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.gen)"
      "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:202)"
      "org.jruby.ast.CallTwoArgNode.interpret(CallTwoArgNode.java:59)"
      "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)"
      "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)"
      "org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)"
      "org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:225)"
      "org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:219)"
      "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:202)"
      "org.jruby.ast.FCallTwoArgNode.interpret(FCallTwoArgNode.java:38)"
      "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)"
      "org.jruby.ast.IfNode.interpret(IfNode.java:118)"
      "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)"
      "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)"
      "org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)"
      "org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:204)"
      "org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:211)"
      "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:336)"
      "org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:179)"
      "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:183)"
      "org.jruby.ast.FCallOneArgBlockPassNode.interpret(FCallOneArgBlockPassNode.java:32)"
      "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)"
      "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)"
      "org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)"
      "org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:204)"
      "org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:211)"
      "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:336)"
      "org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:179)"
      "org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:188)"
      "org.jruby.ast.FCallOneArgBlockNode.interpret(FCallOneArgBlockNode.java:34)"
      "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)"
      "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)"
      "org.jruby.ast.IfNode.interpret(IfNode.java:118)"
      "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)"
      "org.jruby.evaluator.ASTInterpreter.INTERPRET_BLOCK(ASTInterpreter.java:112)"
      "org.jruby.runtime.Interpreted19Block.evalBlockBody(Interpreted19Block.java:206)"
      "org.jruby.runtime.Interpreted19Block.yield(Interpreted19Block.java:157)"
      "org.jruby.runtime.Block.yield(Block.java:142)"
      "org.jruby.RubyArray.eachCommon(RubyArray.java:1606)"
      "org.jruby.RubyArray.each(RubyArray.java:1613)"
      "org.jruby.RubyArray$INVOKER$i$0$0$each.call(RubyArray$INVOKER$i$0$0$each.gen)"
      "org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:143)"
      "org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:154)"
      "org.jruby.ast.CallNoArgBlockNode.interpret(CallNoArgBlockNode.java:64)"
      "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)"
      "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)"
      "org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)"
      "org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:182)"
      "org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:203)"
      "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:326)"
      "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:170)"
      "org.jruby.ast.CallOneArgNode.interpret(CallOneArgNode.java:57)"
      "org.jruby.ast.LocalAsgnNode.interpret(LocalAsgnNode.java:123)"
      "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)"
      "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)"
      "org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)"
      "org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:182)"
      "org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:203)"
      "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:326)"
      "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:170)"
      "org.jruby.ast.CallOneArgNode.interpret(CallOneArgNode.java:57)"
      "org.jruby.ast.DAsgnNode.interpret(DAsgnNode.java:110)"
      "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)"
      "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)"
      "org.jruby.evaluator.ASTInterpreter.INTERPRET_BLOCK(ASTInterpreter.java:112)"
      "org.jruby.runtime.Interpreted19Block.evalBlockBody(Interpreted19Block.java:206)"
      "org.jruby.runtime.Interpreted19Block.yield(Interpreted19Block.java:194)"
      "org.jruby.runtime.Interpreted19Block.call(Interpreted19Block.java:125)"
      "org.jruby.runtime.Block.call(Block.java:101)"
      "org.jruby.RubyProc.call(RubyProc.java:300)"
      "org.jruby.internal.runtime.methods.ProcMethod.call(ProcMethod.java:64)"
      "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:210)"
      "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:206)"
      "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:326)"
      "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:170)"
      "rubyjit.LogStash::Pipeline$$filter_batch_e0d70a513c9b572e72031cc1bba279d4ffc82a801442407170.block_0$RUBY$__file__(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:353)"
      "rubyjit$LogStash::Pipeline$$filter_batch_e0d70a513c9b572e72031cc1bba279d4ffc82a801442407170$block_0$RUBY$__file__.call(rubyjit$LogStash::Pipeline$$filter_batch_e0d70a513c9b572e72031cc1bba279d4ffc82a801442407170$block_0$RUBY$__file__)"
      "org.jruby.runtime.CompiledBlock19.yield(CompiledBlock19.java:159)"
      "org.jruby.runtime.CompiledBlock19.call(CompiledBlock19.java:87)"
      "org.jruby.runtime.Block.call(Block.java:101)"
      "org.jruby.RubyProc.call(RubyProc.java:300)"
      "org.jruby.RubyProc.call19(RubyProc.java:281)"
      "org.jruby.RubyProc$INVOKER$i$0$0$call19.call(RubyProc$INVOKER$i$0$0$call19.gen)"
      "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:210)"
      "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:206)"
      "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:168)"
      "rubyjit.LogStash::Util::WrappedAckedQueue::ReadBatch$$each_460b01f11b82d55a13c70efd2aabe7c62fbc86591442407170.block_0$RUBY$__file__(/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_acked_queue.rb:259)"
      "rubyjit$LogStash::Util::WrappedAckedQueue::ReadBatch$$each_460b01f11b82d55a13c70efd2aabe7c62fbc86591442407170$block_0$RUBY$__file__.call(rubyjit$LogStash::Util::WrappedAckedQueue::ReadBatch$$each_460b01f11b82d55a13c70efd2aabe7c62fbc86591442407170$block_0$RUBY$__file__)"
      "org.jruby.runtime.CompiledBlock19.yield(CompiledBlock19.java:135)"
      "org.jruby.runtime.Block.yield(Block.java:142)"
      "org.jruby.RubyHash$13.visit(RubyHash.java:1355)"
      "org.jruby.RubyHash.visitLimited(RubyHash.java:648)"
      "org.jruby.RubyHash.visitAll(RubyHash.java:634)"
      "org.jruby.RubyHash.iteratorVisitAll(RubyHash.java:1306)"
      "org.jruby.RubyHash.each_pairCommon(RubyHash.java:1351)"
      "org.jruby.RubyHash.each19(RubyHash.java:1342)"
      "org.jruby.RubyHash$INVOKER$i$0$0$each19.call(RubyHash$INVOKER$i$0$0$each19.gen)"
      "org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:143)"
      "org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:154)"
      "rubyjit.LogStash::Util::WrappedAckedQueue::ReadBatch$$each_460b01f11b82d55a13c70efd2aabe7c62fbc86591442407170.__file__(/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_acked_queue.rb:258)"
      "rubyjit.LogStash::Util::WrappedAckedQueue::ReadBatch$$each_460b01f11b82d55a13c70efd2aabe7c62fbc86591442407170.__file__(/usr/share/logstash/logstash-core/lib/logstash/util/wrapped_acked_queue.rb)"
      "org.jruby.internal.runtime.methods.JittedMethod.call(JittedMethod.java:161)"
      "org.jruby.runtime.callsite.CachingCallSite.callBlock(CachingCallSite.java:143)"
      "org.jruby.runtime.callsite.CachingCallSite.callIter(CachingCallSite.java:154)"
      "rubyjit.LogStash::Pipeline$$filter_batch_e0d70a513c9b572e72031cc1bba279d4ffc82a801442407170.chained_0_rescue_1$RUBY$SYNTHETIC__file__(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:352)"
      "rubyjit.LogStash::Pipeline$$filter_batch_e0d70a513c9b572e72031cc1bba279d4ffc82a801442407170.__file__(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb)"
      "rubyjit.LogStash::Pipeline$$filter_batch_e0d70a513c9b572e72031cc1bba279d4ffc82a801442407170.__file__(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb)"
      "org.jruby.internal.runtime.methods.JittedMethod.call(JittedMethod.java:181)"
      "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:326)"
      "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:170)"
      "org.jruby.ast.FCallOneArgNode.interpret(FCallOneArgNode.java:36)"
      "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)"
      "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)"
      "org.jruby.ast.WhileNode.interpret(WhileNode.java:131)"
      "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)"
      "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)"
      "org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)"
      "org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:225)"
      "org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:219)"
      "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:346)"
      "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:204)"
      "org.jruby.ast.FCallTwoArgNode.interpret(FCallTwoArgNode.java:38)"
      "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)"
      "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)"
      "org.jruby.evaluator.ASTInterpreter.INTERPRET_BLOCK(ASTInterpreter.java:112)"
      "org.jruby.runtime.Interpreted19Block.evalBlockBody(Interpreted19Block.java:206)"
      "org.jruby.runtime.Interpreted19Block.yield(Interpreted19Block.java:194)"
      "org.jruby.runtime.Interpreted19Block.call(Interpreted19Block.java:125)"
      "org.jruby.runtime.Block.call(Block.java:101)"
      "org.jruby.RubyProc.call(RubyProc.java:300)"
      "org.jruby.RubyProc.call(RubyProc.java:230)"
      "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:99)"
      "java.lang.Thread.run(Thread.java:745)"
  ]
}

Oracle SQLRecoverableException

Hello

After some time we get the following exception and data is lost.

Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash
"exception"=>java.sql.SQLRecoverableException: No more data to read from socket

Version
Oracle 11.2.0.4.0

jdbc_streaming {
    jdbc_connection_string => "${xxx}"
    jdbc_user => "${xxx}"
    jdbc_password => "${xxx}"
    jdbc_validate_connection => true
    jdbc_driver_library => "/usr/share/logstash/ojdbc7-12.1.0.2.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    statement => "select xxx"
    parameters => { "unr" => "xxx"}
    target => "xxx"
}

To fix this, the plugin should handle network or SQL exceptions with a retry, or validate/refresh the connection when a SQL exception occurs.

Thanks

Add jdbc_password_filepath

In the jdbc input plugin there is a parameter called jdbc_password_filepath, which is very convenient when deploying pipelines in Kubernetes, as the credentials for the JDBC connection can be mounted from cluster secrets.

It would be very helpful to have this parameter on the jdbc_streaming filter.
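
A sketch of the requested usage, assuming the option were added to the filter with the same semantics as in the jdbc input plugin (the secret mount path, connection string, query, and field names are illustrative):

jdbc_streaming {
    jdbc_connection_string => "jdbc:postgresql://db:5432/enrichment"
    jdbc_user => "logstash"
    # Hypothetical option, mirroring the jdbc input plugin:
    # read the password from a file mounted from a Kubernetes secret
    jdbc_password_filepath => "/etc/logstash/secrets/jdbc_password"
    statement => "SELECT name FROM customers WHERE id = :id"
    parameters => { "id" => "customer_id" }
    target => "customer"
}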
