que-rb / que
A Ruby job queue that uses PostgreSQL's advisory locks for speed and reliability.
License: MIT License
I don't see a way to specify a queue via an environment variable for rake que:work, unless I missed it.
Not sure if it would be helpful to have a timestamp that indicates when the job was created, and possibly when it was last run as well.
I suppose you could figure that out from the retry count and the retry interval, though.
Thoughts?
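The estimate mentioned above can be worked through as arithmetic. This is only a rough sketch: it assumes Que's documented default retry delay of error_count ** 4 + 3 seconds, the helper names are made up for illustration, and a job with a custom retry interval would need a different formula.

```ruby
# Assumed default backoff: after the Nth failure, wait N**4 + 3 seconds.
def retry_delay(error_count)
  error_count**4 + 3
end

# Total seconds of backoff accumulated before the attempt that follows
# `error_count` failures. Ignores the time the job itself spent running.
def elapsed_backoff(error_count)
  (1..error_count).sum { |n| retry_delay(n) }
end

puts elapsed_backoff(3) # 4 + 19 + 84 seconds
```

This gives a lower bound on the job's age rather than its creation time, which is part of why an explicit timestamp column might still be worth having.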
pg_advisory_lock supports passing two int4s instead of one int8, which makes it possible to "namespace" the locks, either with a user-supplied number or by, e.g., using the oid of the que_jobs table.
This would allow running more than one Que per Postgres instance, or using Que alongside other code that uses Postgres advisory locks.
Thoughts?
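To make the two-key form concrete: the (int4, int4) variants of the advisory lock functions take signed 32-bit keys, while a table oid is an unsigned 32-bit value, so a namespace derived from an oid may need folding into the signed range. A minimal sketch follows; to_int4 and namespaced_lock_sql are hypothetical helpers, not part of Que, and the job id is assumed to fit in 32 bits (Que's job_id column is wider than that, so a real change would have to deal with overflow).

```ruby
# Reinterpret an unsigned 32-bit integer as a signed int4.
def to_int4(n)
  [n & 0xFFFF_FFFF].pack('L').unpack1('l')
end

# Build the two-argument advisory lock call for a namespace and a job id.
# Assumption: job_id fits in 32 bits; larger ids are truncated here.
def namespaced_lock_sql(namespace, job_id)
  format('SELECT pg_try_advisory_lock(%d, %d)', to_int4(namespace), to_int4(job_id))
end

puts namespaced_lock_sql(3_000_000_000, 42)
```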
We sometimes need to be able to queue jobs that should run even if the current transaction rolls back.
Right now, we use dblink to open the new connection.
I'm not sure if that's something that should be added to que or not.
Is there a more convenient way of running a job once (not enqueueing it with no delay) than MyJob.new({ job_class: MyJob.to_s }).run(some_id, other, arg)?
Hi, I have a question.
My environment is ...
An error occurred, as shown below.
I, [2014-09-10T11:50:33.594993 #18456] INFO -- : forked child re-executing...
I, [2014-09-10T11:50:34.096243 #18456] INFO -- : inherited addr=/app/appname/current/tmp/sockets/unicorn.sock fd=14
I, [2014-09-10T11:50:34.096457 #18456] INFO -- : Refreshing Gem list
I, [2014-09-10T11:50:42.183461 #18456] INFO -- : master process ready
I, [2014-09-10T11:50:42.186065 #18614] INFO -- : worker=0 ready
I, [2014-09-10T11:50:42.190885 #18621] INFO -- : worker=2 ready
I, [2014-09-10T11:50:42.193107 #18618] INFO -- : worker=1 ready
I, [2014-09-10T11:50:42.197371 #18635] INFO -- : worker=3 ready
E, [2014-09-10T11:50:42.546774 #18635] ERROR -- : listen loop error: "FATAL: remaining connection slots are reserved for non-replication superuser connections\n" (PG::ConnectionBad)
E, [2014-09-10T11:50:42.546853 #18635] ERROR -- : /app/appname/shared/bundle/ruby/2.1.0/gems/activerecord-4.1.5/lib/active_record/connection_adapters/postgresql_adapter.rb:888:in `initialize'
・・・
E, [2014-09-10T13:50:18.227689 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/activerecord-4.1.5/lib/active_record/connection_adapters/abstract/connection_pool.rb:445:in `checkout_new_connection'
E, [2014-09-10T13:50:18.227712 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/activerecord-4.1.5/lib/active_record/connection_adapters/abstract/connection_pool.rb:416:in `acquire_connection'
E, [2014-09-10T13:50:18.227735 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/activerecord-4.1.5/lib/active_record/connection_adapters/abstract/connection_pool.rb:351:in `block in checkout'
E, [2014-09-10T13:50:18.227770 #28140] ERROR -- : /usr/local/rbenv/versions/2.1.2/lib/ruby/2.1.0/monitor.rb:211:in `mon_synchronize'
E, [2014-09-10T13:50:18.227795 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/activerecord-4.1.5/lib/active_record/connection_adapters/abstract/connection_pool.rb:350:in `checkout'
E, [2014-09-10T13:50:18.227818 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/activerecord-4.1.5/lib/active_record/connection_adapters/abstract/connection_pool.rb:265:in `block in connection'
E, [2014-09-10T13:50:18.227842 #28140] ERROR -- : /usr/local/rbenv/versions/2.1.2/lib/ruby/2.1.0/monitor.rb:211:in `mon_synchronize'
E, [2014-09-10T13:50:18.227879 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/activerecord-4.1.5/lib/active_record/connection_adapters/abstract/connection_pool.rb:264:in `connection'
E, [2014-09-10T13:50:18.227901 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/activerecord-4.1.5/lib/active_record/connection_adapters/abstract/connection_pool.rb:294:in `with_connection'
E, [2014-09-10T13:50:18.227923 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/adapters/active_record.rb:30:in `checkout_activerecord_adapter'
E, [2014-09-10T13:50:18.227945 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/adapters/active_record.rb:5:in `checkout'
E, [2014-09-10T13:50:18.227967 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/job.rb:82:in `work'
E, [2014-09-10T13:50:18.227989 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/worker.rb:78:in `block in work_loop'
E, [2014-09-10T13:50:18.228011 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/worker.rb:73:in `loop'
E, [2014-09-10T13:50:18.228033 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/worker.rb:73:in `work_loop'
E, [2014-09-10T13:50:18.228055 #28140] ERROR -- : /app/reserven/shared/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/worker.rb:17:in `block in initialize'
My unicorn.rb is shown below.
When I change worker_processes to 4, the error occurs, but not always.
(When worker_processes is 2, the error doesn't occur.)
I found a Puma issue ("Restarting process with USR2 doesn't always work").
Is that issue related?
Or is this a problem in Que?
Thanks.
app_path = '/app/appname'
working_directory app_path + '/current'
pid app_path + '/current/tmp/pids/unicorn.pid'
preload_app true
GC.respond_to?(:copy_on_write_friendly=) and GC.copy_on_write_friendly = true
worker_processes 2
listen app_path + '/current/tmp/sockets/unicorn.sock', :backlog => 64
before_fork do |server, worker|
  old_pid = "#{ server.config[:pid] }.oldbin"
  # File.exist? replaces the deprecated File.exists?
  if File.exist?(old_pid) && server.pid != old_pid
    begin
      sig = (worker.nr + 1) >= server.worker_processes ? :QUIT : :TTOU
      Process.kill(sig, File.read(old_pid).to_i)
    end
  end
  defined?(ActiveRecord::Base) and
    ActiveRecord::Base.connection.disconnect!
end
after_fork do |server, worker|
  defined?(ActiveRecord::Base) and
    ActiveRecord::Base.establish_connection
  Que.mode = :async
end
Hi, I'm using this great gem. Thank you.
This is not a bug report, just a question.
I'm developing with the environment below.
I have confirmed that Que works very well
(with WEBrick, and with Apache and Passenger).
But then I changed my environment to use the unicorn gem (4.8.2).
Now Que creates the rows in the database, but the jobs are never worked.
How should I set up my environment?
'que' is a simple good name, but it is.
What do you think about this?
Hello, I just discovered this project and I've started to integrate it into our app. I really like its simplicity in setting up a new job, but I have one issue that I can't find any help on.
When I run 'rails server' it outputs every 5 seconds:
{"lib":"que","hostname":"louis.speedport.ip","pid":42723,"thread":2240071600,"event":"job_unavailable"}
However, at this point I haven't actually scheduled any Jobs and the queries:
Que.execute( "select * from que_jobs" )
Que.worker_states
Que.job_stats
return an empty Array.
When I do run a job it gets scheduled and executed correctly, but the constant spamming is somewhat annoying.
Our app currently runs on Rails 4.1.4 and uses Postgres 9.3.5.
Any help / hint to solve this would be appreciated.
Hi there,
I think Que is amazing. Nice work.
But I'm getting an ActiveRecord::ConnectionNotEstablished crash when using it on Heroku. Here's the backtrace:
$ heroku run 'rake db:migrate'
Running `rake db:migrate` attached to terminal... up, run.6525
rake aborted!
ActiveRecord::ConnectionNotEstablished: ActiveRecord::ConnectionNotEstablished
/app/vendor/bundle/ruby/2.1.0/gems/activerecord-4.1.3/lib/active_record/connection_handling.rb:109:in `connection_pool'
/app/vendor/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/adapters/active_record.rb:30:in `checkout_activerecord_adapter'
/app/vendor/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/adapters/active_record.rb:5:in `checkout'
/app/vendor/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/job.rb:82:in `work'
/app/vendor/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/worker.rb:78:in `block in work_loop'
/app/vendor/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/worker.rb:73:in `loop'
/app/vendor/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/worker.rb:73:in `work_loop'
/app/vendor/bundle/ruby/2.1.0/gems/que-0.8.1/lib/que/worker.rb:17:in `block in initialize'
(See full trace by running task with --trace)
My configuration:
config.que.worker_count = 1
config.que.wake_interval = 10.minutes
config.que.mode = :async
It works fine if I change it to config.que.mode = :off. This is a single Heroku web dyno. Everything works fine locally.
Any ideas?
https://bgentry.io/blog/2014/12/31/announcing-que-go
I'd love to see ports of que in more languages. Makes for a nice simple message queue between different applications.
See #74.
Rubinius 2.2 split a bunch of the standard library out into gems, and I'm not sure what's necessary in order to support it alongside older versions.
I have Que 0.7.3 in a Rails 4.1.1 application on MRI running Thin and using all default settings except for config.que.worker_count = 1 in my config/environments/development.rb. My ActiveRecord connection pool is set to 5 connections.
When I run rails s, the app starts up successfully only some of the time (1 in 10 tries on my last attempt). For the other 9, I received the following output:
/Users/jamieenglish/projects/ee2e/.bundle/gems/activerecord-4.1.1/lib/active_record/connection_handling.rb:109:in `connection_pool': ActiveRecord::ConnectionNotEstablished (ActiveRecord::ConnectionNotEstablished)
from /Users/jamieenglish/projects/ee2e/.bundle/gems/que-0.7.3/lib/que/adapters/active_record.rb:30:in `checkout_activerecord_adapter'
from /Users/jamieenglish/projects/ee2e/.bundle/gems/que-0.7.3/lib/que/adapters/active_record.rb:5:in `checkout'
from /Users/jamieenglish/projects/ee2e/.bundle/gems/que-0.7.3/lib/que/job.rb:82:in `work'
from /Users/jamieenglish/projects/ee2e/.bundle/gems/que-0.7.3/lib/que/worker.rb:76:in `block in work_loop'
from /Users/jamieenglish/projects/ee2e/.bundle/gems/que-0.7.3/lib/que/worker.rb:73:in `loop'
from /Users/jamieenglish/projects/ee2e/.bundle/gems/que-0.7.3/lib/que/worker.rb:73:in `work_loop'
from /Users/jamieenglish/projects/ee2e/.bundle/gems/que-0.7.3/lib/que/worker.rb:17:in `block in initialize'
This seems like a race condition: if I add sleep 2 to the top of Que::Worker#work_loop, all is fine.
Happy to provide a fix if pointed in the right direction.
I can't easily get Que to work under puma with multiple workers (single mode is fine). Que is getting instantiated in the root process and copied over into the child processes without reinitialization.
The docs say I can just set Que.mode = :async to make it work, but I also have to reinitialize the connection and the logger.
The remaining problem is that the workers are not woken up periodically, since the wrangler is in the wrong process. I need this for jobs added by cron jobs.
Am I doing something wrong? My current work-around feels brittle.
The interesting lines of my current workaround:
Que.connection= ::ActiveRecord # else we get a /que.rb:40:in `adapter': Que connection not established! (RuntimeError)
Que.logger = Rails.logger # else we don't get any log entries
Que.worker_count = que_workers # enable the que again
# Que.wake_interval = 5.seconds # crashes with ThreadError
Thread.new do
  loop do
    sleep 1
    Que.wake!
  end
end
My full configuration is:
require 'que'
puma_workers = Integer(ENV['PUMA_WORKERS'] || 2)
min_threads = Integer(ENV['MIN_THREADS'] || 8)
max_threads = Integer(ENV['MAX_THREADS'] || 8)
que_workers = Integer(ENV['QUE_WORKERS'] || 1)
Que.mode = :off
workers puma_workers
threads min_threads, max_threads
preload_app!
port ENV['PORT'] || 3000
environment ENV['RACK_ENV'] || 'development'
on_worker_boot do
  ActiveRecord::Base.connection_pool.disconnect!
  ActiveSupport.on_load(:active_record) do
    config = ActiveRecord::Base.configurations[Rails.env] ||
      Rails.application.config.database_configuration[Rails.env]
    config['pool'] = max_threads + que_workers
    ActiveRecord::Base.establish_connection(config)
    Que.connection= ::ActiveRecord # else we get a /que.rb:40:in `adapter': Que connection not established! (RuntimeError)
    Que.logger = Rails.logger # else we don't get any log entries
    Que.worker_count = que_workers # enable the que again
    # Que.wake_interval = 5.seconds # crashes with ThreadError
    Thread.new do
      loop do
        sleep 1
        Que.wake!
      end
    end
  end
end
Plus I have a Que.mode = :off in my application.rb to be safe.
I suppose it could either be configurable per job or globally.
Hi.
I found something strange.
After upgrading from Rails 4.1.5 to 4.1.6, I recreated the database with rake db:migrate:reset.
The que_jobs schema then came out broken, as shown below.
create_table "que_jobs", primary_key: "queue", force: true do |t|
  t.integer "priority", limit: 2, default: 100, null: false
  t.datetime "run_at", default: "now()", null: false
  t.integer "job_id", limit: 8, default: "nextval('que_jobs_job_id_seq'::regclass)", null: false
  t.text "job_class", null: false
  t.json "args", default: [], null: false
  t.integer "error_count", default: 0, null: false
  t.text "last_error"
end
I went back to Rails 4.1.5 and migrated again.
This time the schema was correct, as shown below.
Why? Is it my environment?
create_table "que_jobs", id: false, force: true do |t|
  t.integer "priority", limit: 2, default: 100, null: false
  t.datetime "run_at", default: "now()", null: false
  t.integer "job_id", limit: 8, default: "nextval('que_jobs_job_id_seq'::regclass)", null: false
  t.text "job_class", null: false
  t.json "args", default: [], null: false
  t.integer "error_count", default: 0, null: false
  t.text "last_error"
  t.text "queue", default: "", null: false
end
This appears to be a similar sort of race condition to #47. It seems the wrangler gets called when setting the mode, and because the environment files aren't run in after_initialize (obviously), unlike the railtie's setting of the mode, we get errors (but not always).
I don't think this needs a code fix, but a mention in the readme or docs might be helpful. Or at least someone can search issues and find this.
Thanks for this great gem!
Hi there,
Just taking a look at your project - looks cool.
It can even do this in the background of your web process - if you're running on Heroku, for example, you won't need to run a separate worker dyno.
How exactly would you accomplish this? Would you have to spawn the thread in a unicorn before_fork, or is there a much simpler method to load it on Rails boot?
Thanks!
I have a couple of different apps that listen to the same jobs table for jobs. For example, I have a process that's only responsible for syncing with Zendesk, and another that's only responsible for sending emails. The Zendesk process can't run any of the email jobs, and the email process can't run any of the Zendesk jobs.
I don't think it's possible now to have more than one application connecting to a que_job table -- all the worker processes will try to run all the jobs, whereas I want only the zendesk process to run the zendesk jobs, the email process to run the email jobs, etc.
Usually this is done via a queue column, I think.
Any time I try to use @attrs[:run_at], even in exactly the same manner as described in the docs for continuous jobs, I get an error:
"error":{"class":"TypeError","message":"no implicit conversion of ActiveSupport::Duration into String"}
Perhaps something has changed and the docs haven't caught up yet?
pretty simple to do so, just need different Gemfiles in the spec folder. I'll get to this in a bit.
Hello, just found this neat project. I currently use queue_classic to process jobs, which doesn't use advisory locks but does use NOTIFY/LISTEN for efficiently picking up new jobs. It looks like que currently uses polling to detect new jobs.
I see there has been some consideration for NOTIFY/LISTEN in #8 and #22 and the "experimental" branch. Just wondering if that's still a feature being considered? Polling works fine for my use case, but I'm just curious. Thanks!
class Job < Que::Job
  # Keyword arguments must be separated by a comma.
  def run(start_at:, end_at:)
    puts "start at: #{start_at}, end at: #{end_at}"
  end
end
Job.enqueue(start_at: 1.hour.ago, end_at: Time.now)
Right now, it makes start_at a hash containing {start_at: 1.hour.ago, end_at: Time.now}.
I took a stab at this, but it wasn't trivial. So I wanted to see what others thought first before spending more time on it.
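One possible approach, sketched outside of Que: since args are stored as JSON, a trailing hash comes back with string keys, so it could be split off and symbolized before run is called. split_args and ReportJob are hypothetical names for illustration, not Que's API, and note that times come back as strings after the JSON round trip.

```ruby
require 'json'

# Split deserialized args into positional args and keyword args.
# A trailing Hash is treated as keyword arguments (an assumption of this sketch).
def split_args(args)
  args = args.dup
  kwargs = args.last.is_a?(Hash) ? args.pop.transform_keys(&:to_sym) : {}
  [args, kwargs]
end

# Stand-in for a job class; it does not inherit from Que::Job.
class ReportJob
  def run(start_at:, end_at:)
    "start at: #{start_at}, end at: #{end_at}"
  end
end

# Simulate what comes back from the database after a JSON round trip.
stored = JSON.parse(JSON.generate([{ start_at: '2014-01-01', end_at: '2014-01-02' }]))
positional, keywords = split_args(stored)
puts ReportJob.new.run(*positional, **keywords)
```

The ambiguity this glosses over is a job that genuinely wants a positional hash as its last argument, which may be part of why the change is not trivial.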
I've been playing with Que and the ActiveJob wrapper in Rails 4.2. It looks really useful, so thanks to everyone who has contributed so far.
It took me a while to realise that an app's Que workers will only listen to one queue at a time, defaulting to ''. Once I'd figured out I could set the QUE_QUEUE environment variable I made some progress. However, I ran into trouble again when using the new deliver_later functionality in ActionMailer. The job it creates puts mails on the mailers queue. I've worked around this in my test app by putting all my jobs on the mailers queue, but it's not a great solution.
I can think of two solutions to this.
The simpler one would be to let QUE_QUEUE potentially be a list of queues separated by commas and tweak the :lock_job SQL to use WHERE queue in ...
A more complicated (but more flexible) solution would be to somehow let the app override set_up_workers and create its own workers listening to whatever queues were required. This may already be possible by setting worker_count to 0 and doing Que::Worker.workers.push Que::Worker.new('foo') as required. I'll test this out after I've finished writing this ticket.
Would you object to a PR allowing something like the following?
# One worker for the mailer queue and two for the default queue
Que.worker_configuration = { 'mailers' => 1, 'default' => 2 }
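Both ideas can be sketched in plain Ruby without touching Que itself. parse_queue_list and worker_plan are hypothetical names; the comma-separated QUE_QUEUE format and the worker_configuration hash are the proposals from this issue, not existing Que features.

```ruby
# Parse a comma-separated queue list such as "mailers,default".
# A blank or missing value falls back to the default queue name, ''.
def parse_queue_list(value)
  queues = value.to_s.split(',').map(&:strip).reject(&:empty?)
  queues.empty? ? [''] : queues
end

# Expand { 'mailers' => 1, 'default' => 2 } into one queue name per worker.
def worker_plan(configuration)
  configuration.flat_map { |queue, count| [queue] * count }
end

p parse_queue_list(ENV['QUE_QUEUE'])
p worker_plan('mailers' => 1, 'default' => 2)
```

Each entry of the plan would then correspond to one worker bound to that queue, along the lines of the Que::Worker.new('foo') idea above.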
The issues tracker is a weird place to bring this up, but anyway - I'm going to be giving a talk on Que (and the design and concepts behind using advisory locks for job queues in general, I suppose) at the NYC Postgres User Group meetup in a couple of weeks. I haven't sketched out exactly what I'll be talking about yet, but I'm open to input on:
I don't think the presentations at NYCPUG are usually recorded, but I'll be happy to upload my slides somewhere after it's done. Thanks!
Running rails generate que:install just installs the migration, right?
Isn't there something that needs to do Que.connection = ActiveRecord in a Rails app that uses que?
Hey there. I'm not entirely sure if this is something that I'm not doing correctly, but it's a problem that I keep encountering during development of a Rails app that I'm working on at the moment.
Whenever my schema is dumped, this is the entry for the Que table:
create_table "que_jobs", primary_key: "queue", force: true do |t|
  t.integer "priority", limit: 2, default: 100, null: false
  t.datetime "run_at", default: "now()", null: false
  t.integer "job_id", limit: 8, default: "nextval('que_jobs_job_id_seq'::regclass)", null: false
  t.text "job_class", null: false
  t.json "args", default: [], null: false
  t.integer "error_count", default: 0, null: false
  t.text "last_error"
end
The problem is when the schema is loaded (like, say, during testing), then I start getting errors from Que regarding the SQL queries and mismatched column types:
PG::DatatypeMismatch: ERROR: column "queue" is of type integer but expression is of type text
LINE 5: (coalesce($1, '')::text, coalesce($2, 100)::smallint, ...
^
HINT: You will need to rewrite or cast the expression.
The reason I'm sure it has to do with the schema format is that I can mitigate the problem by dropping the test DB and then recreating it via migrations, instead of loading it via rake db:schema:load.
I had a bit of a dig around the codebase, but I couldn't come up with any solid answers. Is it possible that Que sets something in the database which Rails cannot serialise to its Ruby schema format? Admittedly I'm just guessing here and grasping at straws, so any help would be appreciated! 😄
When I start puma with a configuration file like this:
threads 8,8
workers 3
preload_app!
It won't exit. It prints out:
Finishing Que's current jobs before exiting...
Finishing Que's current jobs before exiting...
Finishing Que's current jobs before exiting...
and hangs forever. I have to manually kill the ruby processes and when I kill the last of the 3 process it prints out:
Finishing Que's current jobs before exiting...
Que's jobs finished, exiting...
If I remove the workers 3 line, puma exits almost immediately.
https://travis-ci.org/joevandyk/que/builds/16960818
run 37 on 1.9.3 failed
jruby was failing because of uninitialized constant PG::SyntaxError in migrations.
Probably should go here: https://github.com/chanks/que/blob/master/docs/customizing_que.md
Say I upload a file that takes a minute to process. I'd like to insert a que job and have it run, and have rails refresh the page every so often and show the user the status of that job (10% done, 'finished', 'error', etc).
(I think you'd probably do this via a que_job_status table? Would probably be up to the user to set that up and keep it up to date)
5 seconds seems pretty high?
The polling query should be pretty quick. I have three worker processes set to 0.01 running and there's no noticeable postgresql cpu or disk activity.
Maybe set it to 1 or 0.25 or 0.1?
As the title suggests, running rake db:reset in a Rails app does not create que_jobs_job_id_seq. This results in 0s for job_id in the que_jobs table.
Issue #2 kind of got side-tracked with rubinius and jruby, so I am opening a new issue instead.
The current jruby branch uses jdbc-postgres and activerecord-jdbcpostgresql-adapter, but from what I can see it looks like it would be easier to use https://github.com/headius/jruby-pg, which aims to be a "drop-in" replacement for CRuby's pg driver.
Using that gem might heavily reduce the amount of code modification required to support jruby. (It is currently marked as an rc-level gem.)
At the least, describe what SQL should be run. I had to dig into the code to find the code that created the que_jobs table.
Had a que job that hung forever, possibly because of a problem unrelated to que.
That got me thinking: would it be useful to have an option for setting the maximum length of time a job could run? Obviously this could be handled easily by client code, but it could be a useful option to include in que.
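For the client-side version, here is a minimal sketch using Ruby's standard Timeout module. with_time_limit and the 0.1-second limit are illustrative only and not part of Que. Timeout works by raising an exception in the running thread, which can leave work half done, so it is a blunt tool.

```ruby
require 'timeout'

# Run the block, raising Timeout::Error if it takes longer than `seconds`.
def with_time_limit(seconds, &block)
  Timeout.timeout(seconds, &block)
end

begin
  # Stands in for the body of a job's run method.
  with_time_limit(0.1) { sleep 1 }
rescue Timeout::Error
  puts 'job exceeded its time limit'
end
```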
error when running 'rake que:work'
("class":"PG::UndefinedFunction","message":"ERROR: operator does not exist: integer = text\nLINE 7: WHERE queue = $1::text\n ^\nHINT: No operator matches the given name and argument type(s). You might need to add explicit type casts.\n")
I was able to resolve this by making the following change in /lib/que/sql.rb (line 10)
WHERE queue = $1::text
to
WHERE queue = $1::int4
Some of the attractiveness of que comes from the fact that it leans on Postgres, which can use SSL for transport. This is a huge plus imho, as you can't get this at all with a Redis backed system. This is a regular frustration of mine, especially when running workers in The Cloud (:tm:).
Anyway, mentioning that in your README might be a good idea, as I'm not sure it is immediately obvious to most people.
Thank you for this awesome gem!
I've a question. When I start my puma webserver with the following command:
bundle exec puma -e $RACK_ENV -p $PORT -C ./config/puma.rb
Que does not start in :async mode. I've added config.que.mode = :async to <Rails.env>.rb to enable async mode, but this also applies to the Rails console. Where do I enable Que.mode = :async only for the web server?
Cheers!
With rake que:work, I've been running into undefined constants and other things that indicate that Rails isn't fully loaded.
I noticed that resque here https://github.com/resque/resque/blob/1-x-stable/lib/resque/tasks.rb#L57 runs Rails.application.eager_load! on boot.
que should do something similar, right?
Hi there,
We use Que with Que.mode = :off so that we can process jobs in a dedicated worker process, via the rake que:work task. After upgrading to v0.8.0, this has stopped processing jobs for us.
I've reproduced this issue in a vanilla Rails 4.1 app. The steps I took were:
rails new test_que
Added gem "que" to the Gemfile and ran bundle
Added config/initializers/que.rb with Que.mode = :off as the only content
Added a tiny job class to test with, such as:
class TestJob < Que::Job
  def run
    puts "I'm done!"
  end
end
Ran rake que:work
Over in another terminal tab, opened rails console and entered TestJob.enqueue
Back in the tab with the rake task, I watch the output and see nothing new about processing that TestJob.
If I do all of these steps with gem "que", "0.7.3" in my Gemfile, then everything works. I run TestJob.enqueue and output about processing the job immediately appears in the shell running the rake que:work task.
Looking over your changes in v0.8.0, it looks like this might be the cause of the issue. It looks like the code for the worker to actually process the jobs only ever gets activated if Que.mode = :async. This would be fine if you wanted Que in :async mode, but when you want it :off, to allow the dedicated worker process, it seems like nothing ever happens.
Am I understanding all of this right? Would a fix be to change that conditional to check for either :async or :off as the Que mode? Anyway, I hope this is enough information for you to look into the issue. Thank you!
I have some jobs that process a lot of data. Using a forking worker model means that each process can start with a clean slate. If the process balloons to 1GB, that data is automatically cleaned up at the end of the job.
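A rough sketch of the fork-per-job idea on a Unix-like system. run_in_fork is a hypothetical helper, not something Que provides, and a real worker would also need to reopen its database connection in the child rather than share the parent's.

```ruby
# Run the block in a child process and report whether it succeeded.
# Memory allocated by the block is released when the child exits.
def run_in_fork
  pid = fork do
    begin
      yield
      exit!(0) # skip at_exit handlers inherited from the parent
    rescue StandardError
      exit!(1)
    end
  end
  Process.wait(pid)
  $?.success?
end

# The large array only ever exists in the child process.
puts run_in_fork { Array.new(100_000) { |i| i.to_s } }
```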
I see
I, [2014-01-12T20:07:10.774322 #27845] INFO -- : [Que] No jobs available...
in the output even when que is processing failing jobs. I think there should be some logs saying that the job failed?
I do see:
I, [2014-01-12T20:06:40.769155 #27845] INFO -- : [Que] Worked job in 0.7 ms: #<MyJob:0xbac4f300 @attrs={"priority"=>"1", "run_at"=>"2014-01-12 20:06:37.653061-08", "job_id"=>"7", "job_class"=>"MyJob", "args"=>"[]", "error_count"=>"0", :args=>[]}, @destroyed=true>
on job success.
The discussion in #77 derailed a bit, so I'm opening a new thread for this.
The idea is to use stored procedures for grabbing an item from the queue to avoid the extra round trip otherwise necessary to deal with the race condition. I ran some initial tests using a program I wrote just a while back, and the results suggest an improvement in performance:
(The program implements the same general algorithm, but it's not exactly the code Que uses.)
It's not clear that anyone needs queues this fast (we're already talking thousands of items per second), though, so I'm not sure if this is worth the effort in practice.
I would like to read a global variable, as in the code below.
class CreateContainer < Que::Job
  def run(region, options)
    $storages[region].directories.create key: options[:container_name], public: true
  end
end
But I got this error.
{"class":"NoMethodError","message":"undefined method `directories' for nil:NilClass"},"job":{"queue":"","priority":100,"run_at":"2014-09-22T08:21:12.274+00:00","job_id":6,"job_class":"CreateContainer","args":["ord",{"container_name":"test_container"}],"error_count":5}}
Could you give me any suggestions?
Haven't dug into this yet, but it happens repeatably for me on the latest master, running on ubuntu 12.04 on a vm:
$ rake
/usr/local/stow/ruby-2.0.0-p247/bin/ruby -S rspec ./spec/adapters/active_record_spec.rb ./spec/adapters/connection_pool_spec.rb ./spec/adapters/pg_spec.rb ./spec/adapters/sequel_spec.rb ./spec/unit/connection_spec.rb ./spec/unit/customization_spec.rb ./spec/unit/enqueue_spec.rb ./spec/unit/helper_spec.rb ./spec/unit/logging_spec.rb ./spec/unit/migrations_spec.rb ./spec/unit/pool_spec.rb ./spec/unit/states_spec.rb ./spec/unit/stats_spec.rb ./spec/unit/work_spec.rb ./spec/unit/worker_spec.rb
.F.FF. [THEN HANGS FOREVER]
I see about three failures, then it hangs forever. Apparently a few other test Ruby processes have been forked.
When I stop those other processes, I see the following:
Failures:
1) Que using the ActiveRecord adapter should wake up a Worker after queueing a job in async mode, waiting for a transaction to commit if necessary
Failure/Error: sleep_until { Que::Worker.workers.all?(&:sleeping?) && DB[:que_jobs].empty? }
RuntimeError:
Thing never happened!
# ./spec/support/helpers.rb:7:in `block in sleep_until'
# ./spec/support/helpers.rb:5:in `loop'
# ./spec/support/helpers.rb:5:in `sleep_until'
# ./spec/adapters/active_record_spec.rb:58:in `block (2 levels) in <top (required)>'
2) Que using the ActiveRecord adapter should instantiate args as ActiveSupport::HashWithIndifferentAccess
Failure/Error: $passed_args.first[:param].should == 2
NoMethodError:
undefined method `first' for nil:NilClass
# ./spec/adapters/active_record_spec.rb:37:in `block (2 levels) in <top (required)>'
3) Que using the ActiveRecord adapter should support Rails' special extensions for times
Failure/Error: DB[:que_jobs].get(:run_at).should be_within(3).of Time.now - 60
expected 2014-01-28 08:59:03 -0800 to be within 3 of 2014-01-28 16:59:03 -0800
# ./spec/adapters/active_record_spec.rb:46:in `block (2 levels) in <top (required)>'
4) Que using the ActiveRecord adapter behaves like a multi-threaded Que adapter should allow multiple workers to complete jobs simultaneously
Failure/Error: $q1.pop
SignalException:
SIGTERM
Shared Example Group: "a multi-threaded Que adapter" called from ./spec/adapters/active_record_spec.rb:14
# ./spec/support/shared_examples/multi_threaded_adapter.rb:31:in `block (2 levels) in <top (required)>'
5) Que using the ActiveRecord adapter behaves like a multi-threaded Que adapter behaves like a Que adapter should be able to queue and work a job
Failure/Error: result[:event].should == :job_worked
expected: :job_worked
got: :job_race_condition (using ==)
Diff:
@@ -1,2 +1,2 @@
-:job_worked
+:job_race_condition
Shared Example Group: "a Que adapter" called from ./spec/support/shared_examples/multi_threaded_adapter.rb:2
# ./spec/support/shared_examples/adapter.rb:15:in `block (2 levels) in <top (required)>'
Finished in 10.05 seconds
116 examples, 5 failures
Failed examples:
rspec ./spec/adapters/active_record_spec.rb:52 # Que using the ActiveRecord adapter should wake up a Worker after queueing a job in async mode, waiting for a transaction to commit if necessary
rspec ./spec/adapters/active_record_spec.rb:34 # Que using the ActiveRecord adapter should instantiate args as ActiveSupport::HashWithIndifferentAccess
rspec ./spec/adapters/active_record_spec.rb:41 # Que using the ActiveRecord adapter should support Rails' special extensions for times
rspec ./spec/support/shared_examples/multi_threaded_adapter.rb:28 # Que using the ActiveRecord adapter behaves like a multi-threaded Que adapter should allow multiple workers to complete jobs simultaneously
rspec ./spec/support/shared_examples/adapter.rb:12 # Que using the ActiveRecord adapter behaves like a multi-threaded Que adapter behaves like a Que adapter should be able to queue and work a job
Might be wise to implement these three things!
They definitely do help keep development as clean and mishap free as possible!
I believe the link for adding a codeclimate open source project is:
https://codeclimate.com/github/signup
Say a web process starts a job that can take a while, say we're generating a large file that gets pushed to s3.
I think this is a pretty common thing, would be nice to have a somewhat idiomatic document that explains how to do it with que. I'll see if I can work on it.
Would be nice to be able to report back job status (like "generating file", "uploading", "finalizing", etc)
I'm trying to build the same job queue but in Python. I see the queries in sql.rb, but it's not very clear to me how they operate, given my moderate understanding of SQL.
Could you please explain step by step how the job_lock query works and what guarantees it provides?