Comments (9)

guyboertje commented on August 28, 2024

@stanuku

From @dmitrymurashenkov above...

We had 10 pipelines with several filters each and this setup sometimes gave bursts of 100 open connections which were too many for our DB.

Looking at the Logstash and Sequel code: each pipeline is autonomous, and each filter plugin instance (as seen in the config) declares itself threadsafe or not (as determined by its authors). Threadsafe filters are reused across workers (but not across pipelines) and are assumed to be safe to call in parallel from every worker thread. On my MacBook, the jdbc_streaming filter uses a threaded connection pool with 4 connections by default.

Each worker takes a batch of events from the queue (the inputs feed newly minted events into the queue) and feeds the events from the batch through each filter sequentially, based on the conditional logic in the config (if any). This means that if you have two jdbc_streaming filters one after the other, then only one will be executing a statement at any one time per worker thread.
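
To make the worker model concrete, here is a small runnable Ruby simulation. The stub lambdas and names are mine, not Logstash internals (the real plugin API calls at these points are the filters' multi_filter and the outputs' multi_receive):

    # Each "worker" thread drains batches from a shared queue and runs every
    # filter over the batch strictly in sequence, so two jdbc_streaming
    # filters never execute concurrently within one worker thread.
    queue = Queue.new
    4.times { |i| queue << Array.new(125) { |j| { id: i * 125 + j } } } # 4 batches of events

    filters = [
      ->(batch) { batch.each { |e| e[:field_1] = "lookup-1:#{e[:id]}" } }, # stands in for jdbc_streaming #1
      ->(batch) { batch.each { |e| e[:field_2] = "lookup-2:#{e[:id]}" } }  # stands in for jdbc_streaming #2
    ]

    workers = Array.new(2) do
      Thread.new do
        loop do
          batch = (queue.pop(true) rescue break)  # take the next batch, stop when drained
          filters.each { |f| f.call(batch) }      # filters run one after another
          # the output stage would receive the batch here (multi_receive)
        end
      end
    end
    workers.each(&:join)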

Simultaneous execution of a statement by multiple worker threads is probable (up to the 4-connection limit, the default pool size), but the degree of overlap depends on how synchronised the worker loops become, since each loop is subject to variable delays as it executes the filters and output(s).
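
As a minimal runnable sketch of that pool behaviour (plain Sequel on MRI with the sequel and sqlite3 gems; an in-memory SQLite database stands in for the plugin's JDBC connection):

    require 'sequel'

    # Sequel's threaded connection pool defaults to :max_connections => 4,
    # which is the cap the jdbc_streaming filter inherits.
    db = Sequel.sqlite(max_connections: 4)

    # Eight "worker" threads contend for the four pooled connections: at most
    # four statements execute simultaneously; the rest block waiting on the pool.
    threads = Array.new(8) do
      Thread.new { db.fetch('SELECT 1 AS one').all }
    end
    threads.each(&:join)
    puts db.pool.max_size # => 4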

Thinking about a worst-case scenario, imagine that a jdbc_streaming filter is used to look up user details from a user id held in an event, and that a stream of 2500 events all refer to the same user id. The exact same statement will be executed 2500 times - this is clearly very wasteful, but the jdbc_streaming filter is designed with volatility in mind, i.e. for lookup source tables that change often and unpredictably. The jdbc_static filter is designed with non-volatile lookup sources in mind (though a refresh schedule allows for periodic refill with the latest changes). However, jdbc_static is less useful when the lookup sources are very large. Some mix-and-match options can mitigate downloading millions of lookup records in jdbc_static: use jdbc_static for the "hot" 20000 ids and a conditional jdbc_streaming filter for the "cold" rest.
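
Here is a plain-Ruby sketch of that hot/cold split (runnable with the sequel and sqlite3 gems; the table, columns, and id ranges are made up, an in-memory SQLite database stands in for the remote Oracle DB, and a plain Hash plays the role of jdbc_static's local lookup table):

    require 'sequel'

    remote_db = Sequel.sqlite
    remote_db.create_table(:users) { Integer :id; String :name }
    remote_db[:users].import([:id, :name], (1..100).map { |i| [i, "user-#{i}"] })

    # Load the "hot" ids once, as jdbc_static's loader would (refreshed on a schedule).
    hot_ids   = (1..20).to_a
    hot_users = remote_db[:users].where(id: hot_ids).to_hash(:id)

    # The conditional jdbc_streaming filter corresponds to the fallback branch.
    lookup = lambda do |id|
      hot_users[id] || remote_db[:users].where(id: id).first # "cold" path queries the DB
    end

    p lookup.call(5)  # served from the preloaded hot set
    p lookup.call(42) # falls through to the database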

To test whether a bigger pool size will improve throughput, or whether a smaller pool size will put less load on the DB, you can modify the jdbc_streaming filter source code in place. Replace line 53 of the file vendor/bundle/jruby/2.3.0/gems/logstash-filter-jdbc_streaming-1.0.4/lib/logstash/plugin_mixins/jdbc_streaming.rb with these 3 lines:

    opts = { :user => @jdbc_user, :password => @jdbc_password.nil? ? nil : @jdbc_password.value }
    opts[:max_connections] = 9 # number of workers plus 1
    @database = Sequel.connect(@jdbc_connection_string, opts)

If you do this test, please report your findings back here.

dmitrymurashenkov commented on August 28, 2024

+1 for the issue. It seems that several connections are opened and kept in the pool, but only one of them is used at any time.

stanuku commented on August 28, 2024

Any updates please?

guyboertje commented on August 28, 2024

Is the problem here that one filter is serially accessed by each worker?

We have discussed batch event processing in enhancement filters before. No conclusive approach was decided upon.
One strategy that I am interested in exploring is distinct value processing, which assumes that batches contain clusters of events that will look up the same value, e.g. an IP address or product id. This could also be called cache by batch (the cache is cleared after each batch).
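
A minimal sketch of that idea (hypothetical helper, not plugin code; runnable with the sequel and sqlite3 gems):

    require 'sequel'

    db = Sequel.sqlite
    db.create_table(:users) { Integer :id; String :name }
    db[:users].insert(id: 1, name: 'alice')

    # "Cache by batch": lookups are deduplicated within one batch, so 2500
    # events sharing a user id cost one query instead of 2500. Letting the
    # cache go out of scope at batch end preserves the volatility assumption.
    def filter_batch(events, db)
      batch_cache = Hash.new do |cache, id|
        cache[id] = db[:users].where(id: id).first # one round trip per distinct id
      end
      events.each { |event| event[:user] = batch_cache[event[:id]] }
      events
    end

    batch = Array.new(2500) { { id: 1 } }  # 2500 events, one distinct id
    p filter_batch(batch, db).first[:user] # => {:id=>1, :name=>"alice"}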

dmitrymurashenkov commented on August 28, 2024

@guyboertje In our particular case the problem was an unexpected number of connections to the database and no way to control it.

We had 10 pipelines with several filters each and this setup sometimes gave bursts of 100 open connections which were too many for our DB.

The best solution for this case would be the ability to configure a connection pool shared by the filters across all pipelines, but that may lead to starvation, and I am not sure it is technically possible to implement. So the bare minimum is to simply allow setting the number of connections in each filter's connection pool: right now it uses 4 connections per filter by default and cannot be capped at 1 connection per filter.
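
For reference, what such a setting would map to at the Sequel level (a sketch, with an in-memory SQLite database standing in for the real one):

    require 'sequel'

    # A pool capped at a single connection: the filter can never hold more
    # than one DB session, and all of its statements execute serially.
    db = Sequel.sqlite(max_connections: 1)
    puts db.pool.max_size # => 1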

stanuku commented on August 28, 2024

@guyboertje Thank you for your response.

Is the problem here that one filter is serially accessed by each worker?

Yes. In our case we have one pipeline with 5 workers and a batch size of 5000.

  • Input - JDBC plugin connecting to an Oracle DB; each event (input data row) has a unique key and is independent of the others.
  • Filters - Two JDBC Streaming filters that retrieve additional data (from the same Oracle DB) using key value(s) from the event.
  • Output - To Elasticsearch

It is a simple setup, I guess; however, we noticed that the process freezes for records > 50. We have close to 2.5 million rows from the input SQL query; if we don't include the JDBC streaming filters, the process completes in a jiffy. We suspect that the JDBC streaming is causing some bottleneck (sequential executions?), but we are not sure whether the contention is on the Oracle connection or something else.

We have discussed batch event processing in enhancement filters before. No conclusive approach was decided upon.

Would this approach help with the JDBC streaming filter?

One strategy that I am interested in exploring is distinct value processing, which assumes that batches contain clusters of events that will look up the same value, e.g. an IP address or product id. This could also be called cache by batch (the cache is cleared after each batch).

Sorry, but I'm not too familiar with the architecture of Logstash, i.e. how the workers get distributed among the batches, etc.

Any help or insight into our situation would be appreciated.

I'm including a trimmed-down snippet of our config:

input {
  jdbc {
    clean_run => false
    jdbc_driver_library => "C:\logstash\ojdbc8.jar"
    jdbc_driver_class => "Java::oracle.jdbc.OracleDriver"
    jdbc_connection_string => "jdbc:oracle:thin:hostserver.abc.xyz/service"
    jdbc_user => "xxxx"
    jdbc_password => "xxxxx"

    # Connection pool configuration.
    # Validate connection before use.
    jdbc_validate_connection => true
    # How often to validate a connection (in seconds)
    jdbc_validation_timeout => 120
    #jdbc_validation_query => "SELECT 1 FROM DUAL"
    # Maximum number of times to try connecting to database
    connection_retry_attempts => 5
    # Number of seconds to sleep between connection attempts
    connection_retry_attempts_wait_time => 2
    lowercase_column_names => true

    # General/vendor-specific Sequel configuration options.
    # https://github.com/jeremyevans/sequel/blob/master/doc/opening_databases.rdoc
    sequel_opts => {
      login_timeout => "60"
      prefetch_rows => "5000"
      jdbc_properties => {
        "defaultRowPrefetch" => "5000"
        "loginTimeout" => "60"
        "inactiveConnectionTimeout" => "120"
        "timeoutCheckInterval" => "120"
        "tcpKeepAlive" => "true"
        "oracle.net.READ_TIMEOUT" => "5000"
        "validationQuery" => "SELECT 1 FROM DUAL"
      }
    }

    # SQL query
    statement => "SELECT id, abc, def, xyz FROM table"
  }
}

filter {
  jdbc_streaming {
    ...
    jdbc_connection_string => "jdbc:oracle:thin:hostserver.abc.xyz/service"
    ...
    target => "target_field_1"
    use_cache => "false"
    cache_expiration => "5"
    cache_size => "0"
    statement => "SELECT blah FROM TABLE_1 WHERE ID = :id"
    parameters => { "id" => "id" }
  }

  jdbc_streaming {
    ...
    jdbc_connection_string => "jdbc:oracle:thin:hostserver.abc.xyz/service"
    ...
    target => "target_field_2"
    use_cache => "false"
    cache_expiration => "5"
    cache_size => "0"
    statement => "SELECT blah, blah, blah FROM TABLE_2 WHERE ID = :id"
    parameters => { "id" => "id" }
  }

  # Add field first to handle if there is any null code
  mutate { add_field => { "[xyz][code]" => "" } }

  if [code] {
    mutate { replace => { "[xyz][code]" => "%{code}" } }
  }
}

output {
  elasticsearch {
    hosts => xxxx
    index => "indexname"
    http_compression => true
  }
}

guyboertje commented on August 28, 2024

Ahh OK. So you mean global connection pooling. I don't think this can be done in Logstash as each filter is seen as an autonomous transformation engine working on a single event at a time.

I have seen similar questions some time back; at that time I searched for any kind of jdbc db proxy. I searched again today and found:

  1. sqlrelay
  2. HA-JDBC

Both are actively developed and open source. I have not tested either of them. Please report back any conclusions if you decide to evaluate them and/or use one in production.

In either the original direct setup or a proxied (pooled) indirect one, we still have work to do to understand how the various jdbc plugins will react to waiting for their turn at execution, and to timing out while they wait. I must admit that this is a less well understood facet of the jdbc plugin behaviour.

stanuku commented on August 28, 2024

@guyboertje

OK. That leads to a follow-up question: do (or can) the workers process filters in parallel? I.e. let's suppose I have 5 workers, each processing an input event (so 5 in all) in parallel; for each of those events, will an autonomous instance of the filter be created to process the event?

Thanks for the pointers on JDBC connection pooling. Depending on the answer to my question above, we can narrow down where the main bottleneck is: if the filters can't process multiple events in parallel, then connection pooling becomes secondary (but a bottleneck nonetheless).

NETESudarshan commented on August 28, 2024

@guyboertje Much appreciated! Will keep you posted.
