contribsys / faktory_worker_ruby
Faktory worker for Ruby
License: GNU Lesser General Public License v3.0
While working on an ActiveJob adapter for Faktory I'm trying to start and stop workers programmatically using Faktory::Launcher. I can get workers to start and stop, but I can never seem to get them to watch any queue other than default.
I'm basically doing this:
require "faktory/launcher"
faktory = Faktory::Launcher.new(queues: ["blarf"],
                                environment: "test",
                                concurrency: 1,
                                timeout: 1)
faktory.run
No matter what I put in the queues array it always acts like I've done queues: ["default"].
I'm running this against the branch from #7. I haven't been able to spot any changes there that I think might affect this but it's certainly possible I've missed something.
Here's the entire process I'm using to start/stop workers if it helps. (This code is adapted from the AJ test adapter for Sidekiq: https://github.com/rails/rails/blob/master/activejob/test/support/integration/adapters/sidekiq.rb)
def start_workers
  continue_read, continue_write = IO.pipe
  death_read, death_write = IO.pipe
  @pid = fork do
    continue_read.close
    death_write.close
    # Faktory is not warning-clean :(
    $VERBOSE = false
    $stdin.reopen(File::NULL)
    $stdout.sync = true
    $stderr.sync = true
    logfile = Rails.root.join("log/faktory.log").to_s
    puts "logfile = #{logfile}"
    Faktory.logger = Logger.new(logfile)
    #Faktory::Logging.initialize_logger(logfile)
    self_read, self_write = IO.pipe
    trap "TERM" do
      self_write.puts("TERM")
    end
    Thread.new do
      begin
        death_read.read
      rescue Exception
      end
      self_write.puts("TERM")
    end
    require "faktory/cli"
    require "faktory/launcher"
    faktory = Faktory::Launcher.new(queues: ["blarf"],
                                    environment: "test",
                                    concurrency: 1,
                                    timeout: 1)
    #Faktory.average_scheduled_poll_interval = 0.5
    #Faktory.options[:poll_interval_average] = 1
    begin
      faktory.run
      continue_write.puts "started"
      while readable_io = IO.select([self_read])
        signal = readable_io.first[0].gets.strip
        raise Interrupt if signal == "TERM"
      end
    rescue Interrupt
    end
    puts "about to call faktory.stop"
    faktory.stop
    exit!
  end
  continue_write.close
  death_read.close
  @worker_lifeline = death_write
  raise "Failed to start worker" unless continue_read.gets == "started\n"
end
def stop_workers_hard
  if @pid
    Process.kill "TERM", @pid
    Process.wait @pid
  end
end
Hey Mike, was just hoping you might cut a new version of this gem. In order to land an ActiveJob adapter other folks will need to be able to get at the testing extensions and the fix you made here: ad454bb
I can't seem to push jobs using the latest server and Ruby client. Am I missing something basic?
class MyJob
  include Faktory::Job
  def perform(a)
    # empty because I only want to push the job, not consume it, with Ruby
  end
end
MyJob.perform_async(1)
Faktory::CommandError: MALFORMED json: cannot unmarshal object into Go struct field Job.jobtype of type string
from /usr/local/bundle/gems/faktory_worker_ruby-0.7.0/lib/faktory/client.rb:220:in `result'
Hi.
I have two Rails apps on the same machine and I would like to enqueue jobs using the same instance of Faktory.
The problem is that when I run bundle exec faktory-worker on the first Rails application, it also loads jobs enqueued by the other application and of course it crashes.
Is there a way to retrieve jobs only for a specific application? Or for an array of queues?
Thanks, Stefano
I've added a line of code that calls perform_async on a job as part of an after_commit handler on one of my Active Record models, and now a bunch of unit tests are failing. I have attempted (in many locations) to call Faktory::Testing.fake!, yet it is still attempting to hit a faktory server rather than just mock things locally.
How can I properly mock Faktory for the purposes of unit testing (not integration testing)?
gems/ruby-2.7.1/gems/faktory_worker_ruby-1.0.0/lib/faktory/connection.rb:10: warning: Using the last argument as keyword parameters is deprecated; maybe ** should be added to the call
gems/ruby-2.7.1/gems/faktory_worker_ruby-1.0.0/lib/faktory/client.rb:47: warning: The called method `initialize' is defined here
This is an issue when you're running faktory-worker as a service, because you need the logs to live somewhere, similar to Sidekiq.
Proposal:
bundle exec faktory-worker -L log/faktory.log
Hi there,
I have a queue for jobs that are fairly long-running (> 90s); each job goes through a large CSV file and writes its contents to a DB.
At some point in that job, I submit several more jobs (1 per row inserted) to another set of workers on another queue.
For a while (I can't determine how long or why) jobs submitted by the long-running worker go to the correct queue. After something happens (again, I don't know what) the jobs are instead added to the same queue as the long-running worker, completely ignoring the faktory_options queue: 'other_queue' setting.
These jobs getting added to the wrong queue clog up the long-running queue and, at the same time, delay the results of the other queue.
Here's a concrete example, this job below takes around 90s ± to run. There's normally a few dozen to process at a time...
class ParseBatchedCSVFileJob
  include Faktory::Job
  faktory_options queue: "default"
  def perform(file)
    return unless File.file?(file)
    job = ::ETL::CSVParser::Process.setup(file_path: file)
    ::ETL::CSVParser::Process.run(job)
  end
end
Somewhere in the above ETL::CSVParser code, there is more code that runs this line:
Rating::RateCDR.perform_async(cdr_id)
That class has the following faktory config and code.
module Rating
  class RateCDR
    class Error < StandardError; end
    include Faktory::Job
    extend ::LightService::Organizer
    faktory_options queue: "rating"
    def self.call(cdr)
      with(cdr: cdr).reduce(actions)
    end
    def self.actions
      [::Rating::AssociateRates, ::Rating::CalculateCharges]
    end
    def perform(cdr_id)
      cdr = CDR.find(cdr_id)
      response = self.class.call(cdr)
      raise(Error, response.message) if response.failure?
    end
  end
end
So, what happens is the main job happily processes itself in the default queue and submits rating jobs per CDR to the rating queue, but after something happens, these jobs get diverted to the default queue as well.
Luckily all jobs are running in the context of a Rails app, which has access to all the queues' job code, so even though a Rating job gets erroneously submitted to the default queue, the code can still be executed.
Do you have any idea why jobs aren't being submitted to the queue that their class says they should go to?
Hello,
Wanted to report a potential bug I just uncovered; it took me a while to understand what's going on as I'm not a Ruby developer by trade (but getting there). I understand the testing for the Ruby worker operates by using method redirection. We actually have some unit tests set up using this and it's been working well.
Issues popped up when we updated the faktory_worker_ruby gem to version 1.2.0: our unit tests started erroring out, saying that they couldn't connect to port 7419, which is odd because they shouldn't be hitting the network.
After diffing the source code for both gem versions, I believe the error is this: within faktory_worker_ruby, the lib\faktory\client.rb file was changed to use the cgi gem, which then changed the main open call to now be open_socket. However, the lib\faktory\testing.rb file is still redirecting only the previous open call. Hence, no matter what, the real open_socket call will fire and try to contact the Faktory server over the network in test mode, resulting in the error.
Using Docker (ruby:2.4, ruby:2.5), the Digest module is not loaded, which raises an exception when trying to connect a client to a production Faktory. This does not happen in the development config, as it doesn't require a password.
rubyworker1_1 | 5 TID-gquxjs6bg INFO: Running in ruby 2.5.1p57 (2018-03-29 revision 63029) [x86_64-linux]
rubyworker1_1 | 5 TID-gquxjs6bg INFO: See LICENSE and the LGPL-3.0 for licensing details.
rubyworker1_1 | 5 TID-gquxll27o ERROR: Error fetching job: uninitialized constant Faktory::Client::Digest
rubyworker1_1 | 5 TID-gquxll27o ERROR: /usr/local/bundle/gems/faktory_worker_ruby-0.7.1/lib/faktory/client.rb:14:in `block in <class:Client>'
rubyworker1_1 | 5 TID-gquxll27o ERROR: /usr/local/bundle/gems/faktory_worker_ruby-0.7.1/lib/faktory/client.rb:174:in `open'
Reproduction:
Save this docker-compose.yml file and execute it with docker-compose up
version: '3'
services:
  faktory:
    image: contribsys/faktory
    environment:
      - FAKTORY_PASSWORD=password
    command: /faktory -b 0.0.0.0:7419 -e production
  example:
    image: ruby:2.5
    depends_on: [faktory]
    environment:
      - FAKTORY_URL=tcp://:password@faktory:7419
    command: bash -c "gem install faktory_worker_ruby && ruby -r faktory_worker_ruby -e Faktory::Client.new"
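The failure above comes down to the Digest constant not being loaded before the password handshake runs. A minimal sketch of salted, iterated SHA-256 hashing with an explicit require is below. The helper name hash_password and its arguments are hypothetical, and the exact scheme (password plus salt, hashed a given number of times, hex-encoded) is an assumption for illustration rather than a copy of the gem's code.

```ruby
require "digest" # without this, Digest::SHA256 is an uninitialized constant

# Hypothetical helper: hash password + salt with SHA-256, repeated
# `iterations` times, and return the result hex-encoded.
def hash_password(password, salt, iterations)
  data = password + salt
  iterations.times { data = Digest::SHA256.digest(data) }
  data.unpack1("H*")
end

puts hash_password("password", "somesalt", 3)
```

In the development config no password is required, so a code path like this is never reached, which would explain why the missing require only shows up in production.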
I would like to get advice on how to perform integration test (more detail on what I mean below) that involves Rails and Faktory.
Set Up
I have a project that boils down to the following:
Test Case
I am hoping to create integration tests that run against the setup mentioned above, live, and make use of the test API offered by Rails.
In particular, I am wondering whether it is possible to set up a test instance of a Faktory worker that can access the Minitest DB transaction used in system tests.
Thank you.
The blockless_inline_is_a_bad_idea_but_I_wanna_do_it_anyway! method for testing was added when it looked like we'd need it to be able to get an ActiveJob adapter to pass the Rails test suite. Since the adapter now lives in this gem I'm not sure there's a compelling reason to keep that method around. Just thought I'd get a sanity check on that before submitting a PR.
Looks like New Relic auto-detects Sidekiq workers, but I'm not seeing any way to instrument a Faktory worker. Is there any known way to accomplish this?
Is there a way to fetch the current queue size?
We use a method in sidekiq to fetch the "least busy" queue from a subset of queues.
We're using Faktory in a setup where Rails handles the API and a couple of backend processes are written in Go. These backend processes are receiving their work through a Faktory instance.
In Rails we are pushing new jobs by utilizing (or misusing?) the ActiveJob classes. This allows us to have custom Faktory configuration per job (like queue name etc). But the Faktory adapter for ActiveJob wraps the name of the job before sending it to Faktory (ActiveJob::QueueAdapters::FaktoryAdapter::JobWrapper) so in Go we cannot register the correct handler for it. So currently we have a middleware "unwrapping" the name of the job;
class Faktory::Middleware::Unwrap
  class Client
    def call(payload, _pool)
      wrapped_job_klass = payload.dig('custom', 'wrapped')
      if wrapped_job_klass
        payload['jobtype'] = wrapped_job_klass
        payload['custom'] = payload['custom'].except('wrapped')
      end
      yield
    end
  end
end
I can imagine that more developers will run into this, and it does not fit well with Faktory's strength as a polyglot job processing framework. Thus it would be great to be able to configure the "wrapping" in Faktory itself instead of depending on custom middleware.
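To see what the unwrapping step does to a payload, here is a standalone sketch that needs neither Rails nor a Faktory server. The unwrap helper, the SendEmailJob name, and the sample payload are hypothetical; only the custom.wrapped key and the jobtype field come from the report above.

```ruby
# Hypothetical standalone version of the unwrapping step.
def unwrap(payload)
  custom = payload["custom"] || {}
  wrapped = custom["wrapped"]
  return payload unless wrapped

  # Replace the wrapper class name with the wrapped one and drop the marker.
  payload.merge(
    "jobtype" => wrapped,
    "custom" => custom.reject { |key, _| key == "wrapped" }
  )
end

payload = {
  "jobtype" => "ActiveJob::QueueAdapters::FaktoryAdapter::JobWrapper",
  "custom" => { "wrapped" => "SendEmailJob" },
  "args" => [1]
}

puts unwrap(payload)["jobtype"]
```

This version returns a new hash instead of mutating the payload, and uses reject rather than except so it does not depend on ActiveSupport.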
The Web UI allows you to send "quiet" and "terminate" signals to a worker process. Implement these signal handlers.
The call to localhost:7419 results in an error:
Errno::ECONNREFUSED in StaticsController#home Connection refused - connect(2) for "localhost" port 7419
I guess I need a different setting for FAKTORY_URL, but I have tried a ton of different options. Any ideas?
I'm not sure if this is intended behavior, but I was surprised by it.
In Getting Started, there's a code comment that suggests that use of Faktory::Job.set "dynamically override[s]" options. But, while it does, it also drops any other options that are defined on the Job. So, to use set you have to pass all the options to it, in order to override, say, just 1.
Here's some code that demonstrates the behavior, using the example job in the docs:
class SomeJob
  include Faktory::Job
  faktory_options queue: "critical", custom: { unique_for: 10.minutes.to_i }
  def perform(*)
  end
end
it 'fails to use the critical queue' do
  SomeJob.set(custom: { unique_for: nil }).perform_async(1)
  assert_equal 1, Faktory::Queues['default'].size
  assert_equal 0, Faktory::Queues['critical'].size
end
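The behavior the reporter expected amounts to merging the per-call overrides on top of the class-level options rather than replacing them. A minimal sketch with plain hashes follows; CLASS_OPTIONS and effective_options are hypothetical names used only for illustration and are not part of the gem.

```ruby
# Class-level options, mirroring the faktory_options line in the example above
# (10 minutes expressed in seconds).
CLASS_OPTIONS = { "queue" => "critical", "custom" => { "unique_for" => 600 } }.freeze

# Hypothetical helper: overrides win, everything else is kept.
def effective_options(overrides)
  CLASS_OPTIONS.merge(overrides)
end

opts = effective_options({ "custom" => { "unique_for" => nil } })
puts opts["queue"]
```

With a merge like this, overriding custom alone would leave the queue set to critical instead of falling back to default.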
Inside my Faktory::Job, I am attempting to reschedule the current job (same options) in the future.
def reschedule_job(options)
  self.class.perform_in(RESCHEDULE_DELAY_DURATION, options)
end
But this reschedules in the default queue. I would like to reschedule in the same queue if possible, but I haven't found out how to determine the current queue (my job could be enqueued in multiple queues).
Is there a way for my job to know which queue it was dequeued from?
Thanks in advance
Using track status reports status as "unknown" whereas using track status from Go is reporting the correct status.
I have 2 different servers, 1 in Go and another in Ruby, and the Ruby one is taking the jobs meant for the Go worker. Is there a way to tell the Ruby Faktory worker service to listen to a specific queue only?
Thanks
Hi,
I have an inconsistent issue with the ruby worker.
When calling a specific worker, I sometimes get the error "unknown: No jobtype registered: RunnerWorker"
The worker doesn't recognize the "RunnerWorker".
"RunnerWorker" is the only worker in the system and I start faktory-worker like this: "bundle exec faktory-worker --require ./workers/runner_worker.rb --queue runner-manager,1 --environment production"
Your help is appreciated
Josh.
I'm wondering if there is a way to perform an action after the retries have been exhausted, similar to the sidekiq_retries_exhausted method. Is this possible with Faktory?
I can't seem to find a definitive list of all the options available to be used with faktory_options. Is there one somewhere that can be pointed out?
I have a worker defined in a file workers.rb:
class ChannelWorker
  include Faktory::Job
  faktory_options queue: 'channels', queues:['channels'], retry: 25, backtrace: 0
  def perform(username)
    ...
  end
end
I am trying to get this worker to pull work from the channels queue. While ChannelWorker.perform_async channel puts work into the channels queue, when running the worker, it does not get the work from the queue.
When removing the faktory_options line, the worker runs like normal, but only on the default queue, which I am trying to avoid so that I can have a queue per job type.
I checked the options that the worker currently has with ChannelWorker.get_faktory_options, which returns {"retry"=>25, "queue"=>"channels", "queues"=>["channels"], "backtrace"=>0}, leading me to believe that it should be pulling from the queue.
Is there any way to check each worker process for its health, so that it can be restarted if it gets stuck or freezes?
If a worker isn't busy and isn't able to take new jobs, I want to kill it and restart it.
It is primarily needed for deploying to Kubernetes as it is considered a best practice to always include liveness checks for containers in k8s pods. For example it may be any executable (e.g. shell script) that will return 0 if worker is healthy and something else if it is not. Also checks may be TCP or HTTP probes but I don't think that they fit for worker processes case.
Thank you for faktory!
I'd like to make a feature request to add support for pushing a large batch of jobs to the Faktory server, something similar to what Sidekiq offers with #push_bulk.
The network latency of enqueueing a job individually is starting to add up when dealing with a large data set.
Sample code
markers = ids.each_slice(10).to_a
markers.each do |m|
  SomeWorker.perform_async(m[0], m[-1])
end
Is there a workaround for something like this for the time being?
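For reference, here is what the slicing in the sample produces, using a made-up list of 25 ids. Each slice becomes one job covering the first and last id of that slice, so the number of pushes drops from one per id to one per slice.

```ruby
ids = (1..25).to_a # made-up ids for illustration

# One [first_id, last_id] pair per slice of 10 ids.
ranges = ids.each_slice(10).map { |slice| [slice.first, slice.last] }

ranges.each { |first_id, last_id| puts "#{first_id}..#{last_id}" }
```

Each pair still costs one network round trip, which is the latency the request is about.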
Since we're on GitHub, move to GitHub Actions. Sidekiq's configuration is here.
https://github.com/mperham/sidekiq/blob/master/.github/workflows/ci.yml
gem version 2.0.0
We recently added faktory_options custom: { unique_for: 1.hour.to_i } to one of our workers and we are now getting:
Faktory::CommandError: json: unsupported value: floating point number when trying to enqueue a job.
We've used the exact line faktory_options custom: { unique_for: 1.hour.to_i } in several other workers without an issue so this is perplexing.
module OurWorkersModule
  class OurWorker
    include Faktory::Job
    faktory_options custom: { unique_for: 1.hour.to_i }
    faktory_options queue: 'ourqueue'
    ....
Hello!
I was evaluating Faktory to replace our current job queue. Since Faktory server itself doesn't scale horizontally (yet), and our scale is very large, we decided to partition the jobs into two Faktory server instances (rather than different queues). Basically our web frontend will push jobs to Faktory-1 server, one cluster of workers will pull jobs off Faktory-1 server, do work, and potentially queue jobs on Faktory-2 server.
The issue we are having is that, the way Faktory::Client is initialized, you cannot override the server location if the environment variable is set. As long as FAKTORY_PROVIDER/FAKTORY_URL is set, Faktory::Client.new(url: ENV["FAKTORY_OTHER_URL"]) will never point to that server.
https://github.com/contribsys/faktory_worker_ruby/blob/master/lib/faktory/client.rb#L38
Could we change the initializer to be something like:
def initialize(url: uri_from_env || 'tcp://localhost:7419', debug: false)
  @debug = debug
  @location = URI(url)
  open
end
This would allow same default functionality but allow overriding the url if manually creating clients.
I can submit a PR if you 👍
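The lookup order being asked for (explicit argument first, then the environment, then the localhost default) can be sketched without touching the gem. resolve_url is a hypothetical helper, and the way FAKTORY_PROVIDER names the variable that holds the URL is assumed from how these variables are used elsewhere on this page.

```ruby
require "uri"

DEFAULT_URL = "tcp://localhost:7419"

# Hypothetical helper: an explicit url wins over anything in the environment.
def resolve_url(explicit = nil, env = ENV)
  return URI(explicit) if explicit

  # Assumption: FAKTORY_PROVIDER, when set, names the variable holding the URL.
  var = env["FAKTORY_PROVIDER"] || "FAKTORY_URL"
  URI(env[var] || DEFAULT_URL)
end

env = { "FAKTORY_URL" => "tcp://faktory-1:7419" }
puts resolve_url("tcp://faktory-2:7419", env).host
puts resolve_url(nil, env).host
```

Passing the environment in as a parameter keeps the sketch easy to exercise with a plain hash instead of the real ENV.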
Hey
Having an issue with batches on a few specific workers.
We have a batch, which enqueues jobs, and then those jobs can add to the batch. This has been working totally fine, but for some reason occasionally this happens.
undefined method `jobs' for nil:NilClass
That line is batch.jobs do
However, as you can see, there is a bid set. (it's in custom, is that right? We don't set that manually).
Hi! I have jobs being enqueued with jobtype "AsyncWork.DocumentJob" and then the Ruby worker client is bombing trying to constantize that.
I thought I could make middleware to handle this, but the middleware gets the instantiated jobtype instead of the "payload".
It would be pretty easy to make a patch that gives the JSON payload to middleware and doesn't instantiate the job class until after middleware is run, but that's a potentially breaking change.
Another solution would be to add :jobtype as an option to job classes like
class DocumentJob
  include Faktory::Job
  faktory_options jobtype: "AsyncJob.DocumentJob"
  ...
end
But that's tricky because we'd have to keep track of all job classes and I'm not sure how well that would work with Rails' lazy loading of things.
Third option is have a global config for it?
Faktory.configure_worker do |config|
  config.jobtype_map = {
    "AsyncWork.DocumentJob" => "DocumentJob"
  }
end
What do you think? I would be happy to implement any of these or any other idea.
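The third option boils down to a lookup table consulted before the class is resolved. A minimal sketch follows, where JOBTYPE_MAP and resolve_job_class are hypothetical names and nothing here is an existing Faktory setting.

```ruby
class DocumentJob; end

# Hypothetical mapping from the jobtype on the wire to a Ruby class name.
JOBTYPE_MAP = { "AsyncWork.DocumentJob" => "DocumentJob" }.freeze

# Fall back to the jobtype itself when no mapping exists.
def resolve_job_class(jobtype)
  Object.const_get(JOBTYPE_MAP.fetch(jobtype, jobtype))
end

puts resolve_job_class("AsyncWork.DocumentJob")
puts resolve_job_class("DocumentJob")
```

Because the map stores class names as strings, the class is only resolved when a job arrives, which sidesteps the lazy-loading concern raised for the per-class option.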
Hey there, any chance of getting a new release? Took me a while to figure out why Client.push wasn't working without a job id, since the code on GitHub clearly filled it in
Faktory client fails to connect to Faktory server behind AWS Network Load Balancer with TLS listener.
Establishing a simple Faktory client connection to a Faktory server behind an AWS Network Load Balancer with TLS listener fails:
require 'faktory'
# This is required or otherwise a NameError (uninitialized constant Faktory::Client::OpenSSL) error is raised
require 'openssl'
client = Faktory::Client.new
The stacktrace:
Traceback (most recent call last):
7: from ~/.rbenv/versions/2.7.2/bin/irb:23:in `<main>'
6: from ~/.rbenv/versions/2.7.2/bin/irb:23:in `load'
5: from ~/.rbenv/versions/2.7.2/lib/ruby/gems/2.7.0/gems/irb-1.2.6/exe/irb:11:in `<top (required)>'
4: from (irb):2
3: from (irb):2:in `new'
2: from ~/.rbenv/versions/2.7.2/lib/ruby/gems/2.7.0/gems/faktory_worker_ruby-1.1.0/lib/faktory/client.rb:58:in `initialize'
1: from ~/.rbenv/versions/2.7.2/lib/ruby/gems/2.7.0/gems/faktory_worker_ruby-1.1.0/lib/faktory/client.rb:247:in `open'
NameError (uninitialized constant Faktory::Client::OpenSSL)
Did you mean? OpenStruct
NOTE: Establishing the same connection using Go client and Python client works as expected.
Gemfile:
source 'https://rubygems.org'
ruby '2.7.2'
gem 'faktory_worker_ruby'
gem 'openssl'
Gemfile.lock:
GEM
  remote: https://rubygems.org/
  specs:
    connection_pool (2.2.5)
    faktory_worker_ruby (1.1.0)
      connection_pool (~> 2.2, >= 2.2.2)
    openssl (2.2.0)
PLATFORMS
  ruby
DEPENDENCIES
  faktory_worker_ruby
  openssl
RUBY VERSION
   ruby 2.7.2p137
BUNDLED WITH
   2.1.4
I'm trying to connect to a local faktory instance with a password and am not having any luck.
I started the server like this:
$ FAKTORY_PASSWORD=test faktory
Then if I try to connect without configuring anything I see this, as expected:
> MyJob.perform_async(42)
ArgumentError: Server requires password, but none has been configured
So then I set ENV vars like this:
FAKTORY_PROVIDER=MY_FAKTORY_URL
MY_FAKTORY_URL=tcp://:test@localhost:7419
And then when I try to connect I see this:
> MyJob.perform_async(42)
Faktory::CommandError: ERR Invalid password
Am I missing something about how to configure the ruby client with a password?
Need to set a default open/read/write 5 sec timeout and (less important) expose the option for config.
This fails unless there is a 'custom' key in the hash. #dig would be the current obvious choice here - but that is Ruby 2.3 and higher.
Since you support 2.2.2 and higher maybe the easiest option is to just pull this https://github.com/marcandre/backports/blob/master/lib/backports/2.3.0/hash/dig.rb in from backports until you push up to 2.3 where it could go away. It's all of 10 lines so it's not a huge push. Obviously adding backports as a requirement would work as well but might be a little heavier lift.
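If pulling in backports feels too heavy, the lookup can also be written without Hash#dig so that it works on Ruby 2.2. A minimal sketch; fetch_path is a hypothetical helper, not something in the gem, and the sample keys are only illustrative.

```ruby
# Hypothetical nil-safe nested lookup that does not rely on Hash#dig.
def fetch_path(hash, *keys)
  keys.reduce(hash) do |value, key|
    # Stop as soon as an intermediate value is missing or not a Hash.
    return nil unless value.is_a?(Hash)
    value[key]
  end
end

puts fetch_path({ "custom" => { "wrapped" => "SomeJob" } }, "custom", "wrapped")
p fetch_path({ "args" => [] }, "custom", "wrapped")
```

The second call shows the case from the report: a hash with no 'custom' key yields nil instead of raising.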
It would be nice to be able to fake queueing jobs for testing like sidekiq does. I'm not sure the correct way to go about adding something like that but I'll certainly take a crack at it if no one else wants to.
Hi @mperham,
We've been consistently seeing occurrences of [Faktory::ParseError](https://github.com/contribsys/faktory_worker_ruby/blob/master/lib/faktory/client.rb#L341), for roughly 0.01% of our Jobs.
line.strip is always blank. The comment suggests that "this is bad, indicates we need to reset the socket and start fresh".
Do you have any experience with what usually triggers these and any suggestion for an improvement we could PR up and submit to handle these Parse Errors? (or other recommendation on how to handle on our end)
Thanks!
Issue migrated from contribsys/faktory#411
Which Faktory package and version?
Faktory 1.6.1 Docker image (contribsys/faktory:1.6.1)
Which Faktory worker package and version?
FWR 1.1.1
Please include any relevant worker configuration
Please include any relevant error messages or stacktraces
/usr/local/bundle/gems/faktory_worker_ruby-1.1.1/lib/faktory/client.rb:352:in `result': Invalid password\r (Faktory::CommandError)
from /usr/local/bundle/gems/faktory_worker_ruby-1.1.1/lib/faktory/client.rb:362:in `ok'
from /usr/local/bundle/gems/faktory_worker_ruby-1.1.1/lib/faktory/client.rb:294:in `open'
from /usr/local/bundle/gems/faktory_worker_ruby-1.1.1/lib/faktory/client.rb:58:in `initialize'
from -e:1:in `new'
from -e:1:in `<main>'
Are you using an old version? No
I'm using Faktory docker image version 1.6.1 (contribsys/faktory:1.6.1) and FWR 1.1.1
When I set a password for Faktory (using the env var FAKTORY_PASSWORD on the server) and the password has an invalid URL character (e.g. @), when I connect to the server with FWR, I get the invalid password error. If I use a password without any invalid URL characters, everything works fine.
In the FAKTORY_URL I used the url-encoded password.
This docker compose file reproduces the error. If the password is changed to password (in both FAKTORY_PASSWORD and FAKTORY_URL), FWR can fetch jobs.
version: '3'
services:
  faktory:
    image: contribsys/faktory
    environment:
      - FAKTORY_PASSWORD=p@ssword
    command: /faktory
  example:
    image: ruby:2.7.6
    depends_on: [faktory]
    environment:
      - FAKTORY_URL=tcp://:p%40ssword@faktory:7419
    command: bash -c "gem install faktory_worker_ruby && ruby -r faktory_worker_ruby -e Faktory::Client.new"
Maybe a url-decode is missing before computing the password hash?
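The suspicion is easy to check with the standard library alone: URI leaves the password component percent-encoded, so it has to be decoded before it is compared or hashed. A minimal sketch, independent of the gem; whether the client performs this decoding is exactly the open question here.

```ruby
require "uri"

uri = URI("tcp://:p%40ssword@faktory:7419")

raw = uri.password                           # still percent-encoded
decoded = URI.decode_www_form_component(raw) # matches what the server was given

puts raw
puts decoded
```

Note that decode_www_form_component also turns a + into a space, which may matter for passwords that contain a plus sign.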
We have an issue in which jobs are not being processed.
We are running a Faktory worker in Ruby 2.7.2 using a configuration file:
$ bundle exec faktory-worker -C config/faktory/worker.yml
The worker.yml file looks like this:
:queues:
  - jobs
:concurrency: 10
:timeout: 25
An example JobWorker class:
class JobWorker
  include Faktory::Job
  def perform(arg1, arg2)
    puts "Arg1: #{arg1}"
    puts "Arg2: #{arg2}"
  end
end
Pushing a job to Faktory:
jid = SecureRandom.hex(12)
Faktory::Client.new.push(
  jid: jid,
  queue: "jobs",
  jobtype: "JobWorker",
  args: ["arg1", "arg2"]
)
The job is stuck in the jobs queue. I would expect the job to be processed by the worker.
I suspect it's related to the fact that JobWorker doesn't have a queue option, but our requirement is to have JobWorker processed by several processes from different queues.
What am I missing?
Trying to do the below task in order to create unique jobs, as per:
https://github.com/contribsys/faktory/wiki/Pro-Unique-Jobs
My job classes look something like:
class FaktoryJob < ActiveJob::Base
  self.queue_adapter = :faktory
  queue_as 'default'
end
class SleepJob < FaktoryJob
  faktory_options custom: { unique_for: 5 }
  def perform
    sleep 10.seconds
  end
end
It seems that there might be an issue with the custom object not being passed in correctly when using ActiveJob vs. native Faktory::Job classes.
As you can see, two identical jobs are enqueued immediately after one another:
Is there a timing for the next release / can you cut one? We are having a hard time pulling the main branch to get this commit and having it boot correctly in our k8s cluster. We would rather not rewrite our config just to change it back when the release comes.
I want to run my Faktory workers on a Kubernetes cluster.
The example at Fine Parallel Processing Using a Work Queue shows the worker exiting when the queue is empty.
Is there such an option in faktory-worker currently?
As a developer, I want to run my faktory_worker as a daemon and write its log to a logfile.
https://github.com/contribsys/faktory_worker_ruby/blob/master/lib/faktory/cli.rb#L31-L33
Based on what I have learned from the Sidekiq CLI, we can bring all the daemon stuff over, right? I have tried adding daemon config to our gem and it's working on my end, so I'm going to create an issue there. If you'll allow me to create a PR to add this, please let me know.
I'm trying to get going with this and am running into a weird error.
First I installed the faktory server via homebrew.
Then I start faktory locally:
$ faktory
Faktory 0.5.0
Copyright © 2017 Contributed Systems LLC
Licensed under the GNU Public License 3.0
INFO[2017-11-08T22:20:41.828681916-06:00] Initializing storage at /Users/jgreen/.faktory/db
INFO[2017-11-08T22:20:41.850672271-06:00] Now listening at localhost:7419, press Ctrl-C to stop
INFO[2017-11-08T22:20:41.850750997-06:00] Web server now listening on port 7420
Then I added this gem to my Gemfile:
gem 'faktory_worker_ruby', git: 'https://github.com/contribsys/faktory_worker_ruby'
Then in app/jobs/my_job.rb I have this code:
require 'faktory'
class MyJob
  include Faktory::Job
  def perform(some_id)
    puts "we should do something with this id : #{some_id}"
  end
end
Then I open a Rails console and do:
> MyJob.perform_async(42)
Errno::ECONNRESET: Connection reset by peer - No response
from (irb):4
The server console shows this when the error happens:
ERRO[2017-11-08T22:20:53.102064455-06:00] Invalid client data in AHOYInvalid client Wid
So, apparently the Rails team isn't accepting new AJ adapters (rails/rails#32285) but is asking that the adapter code live in outside gems. Is this project (faktory_worker_ruby) the right place to make a PR for an AJ adapter, or should that be a completely separate gem?
I have a job that may conditionally enqueue other jobs while it's running. I have found that the client middleware is not invoked when the code is run from within the worker.
I would expect that the client middleware would still get used even when used from inside faktory-worker (since it's still using the Faktory client to push more jobs out).
Here's a small demo app:
require 'faktory_worker_ruby'
class LoudMiddleware
  def call(*_)
    puts "BEFORE THE JOB!"
    puts "!!!!!!!!!!!!!!!\n" * 3
    yield
    puts "AFTER THE JOB!\n"
    puts "!!!!!!!!!!!!!!!\n" * 3
  end
end
Faktory.configure_client do |config|
  config.client_middleware do |chain|
    chain.add LoudMiddleware
  end
end
class MyJob
  include Faktory::Job
  def perform *args
    5.times {|i| SecondaryJob.perform_async i}
    puts "Doing #{args}"
  end
end
class SecondaryJob
  include Faktory::Job
  def perform *args
    puts "Wrapping up: #{args}"
  end
end
MyJob.perform_async 'some stuff'
$ ruby app.rb
# Enqueues a single job, and the middleware shouts as it pushes the job to faktory
$ faktory-worker -r ./app.rb
# Also enqueues that job, since it's at the end, but does not shout.
# Also, the processing of the job includes enqueuing 5 more jobs, which also do not go through the middleware.
ttilberg@timdesktop:~/dev/fkt_middleware$ ruby app.rb
BEFORE THE JOB!
!!!!!!!!!!!!!!!
!!!!!!!!!!!!!!!
!!!!!!!!!!!!!!!
AFTER THE JOB!
!!!!!!!!!!!!!!!
!!!!!!!!!!!!!!!
!!!!!!!!!!!!!!!
ttilberg@timdesktop:~/dev/fkt_middleware$ faktory-worker -r ./app.rb
,,,,
,,,, | |
| | | |
| | | |
| |,,~~' '~, ___ _
,,~~'' ,~~, '~, / __) | | _
,,~~'' ,~~, | | | _| |__ _____| | _ _| |_ ___ ____ _ _
| ,~~, | | | | | (_ __|____ | |_/ |_ _) _ \ / ___) | | |
| | | | | | | | | | / ___ | _ ( | || |_| | | | |_| |
| |__| |__| |__| | |_| \_____|_| \_) \__)___/|_| \__ |
|__________________________| (____/
2019-05-24T19:51:19.067Z 15827 TID-gptgbogs0 INFO: Running in ruby 2.5.3p105 (2018-10-18 revision 65156) [x86_64-linux]
2019-05-24T19:51:19.067Z 15827 TID-gptgbogs0 INFO: See LICENSE and the LGPL-3.0 for licensing details.
2019-05-24T19:51:19.067Z 15827 TID-gptgbogs0 INFO: Starting processing, hit Ctrl-C to stop
2019-05-24T19:51:19.075Z 15827 TID-gptgejawo MyJob JID-63b98e5eea80d8594cd5d7e2 INFO: start
2019-05-24T19:51:19.075Z 15827 TID-gptgej9s4 MyJob JID-5b0871c9b745362a6cda047a INFO: start
2019-05-24T19:51:19.075Z 15827 TID-gptgej9m0 SecondaryJob JID-5e713e7cb88f0682cf001549 INFO: start
Wrapping up: [0]
2019-05-24T19:51:19.075Z 15827 TID-gptgej9m0 SecondaryJob JID-5e713e7cb88f0682cf001549 INFO: done: 0.0 sec
2019-05-24T19:51:19.075Z 15827 TID-gptgej9fw SecondaryJob JID-f17c4ac4a1111af538bccbd8 INFO: start
Wrapping up: [0]
2019-05-24T19:51:19.076Z 15827 TID-gptgej9fw SecondaryJob JID-f17c4ac4a1111af538bccbd8 INFO: done: 0.0 sec
2019-05-24T19:51:19.076Z 15827 TID-gptgej93o SecondaryJob JID-d75c75c75e66ca36b5ee6eea INFO: start
Wrapping up: [1]
2019-05-24T19:51:19.076Z 15827 TID-gptgej93o SecondaryJob JID-d75c75c75e66ca36b5ee6eea INFO: done: 0.0 sec
2019-05-24T19:51:19.077Z 15827 TID-gptgej8lc SecondaryJob JID-9408c4c062de490c8c4283e8 INFO: start
Doing ["do some stuff"]
Doing ["do some stuff"]
Wrapping up: [3]
2019-05-24T19:51:19.077Z 15827 TID-gptgej9m0 SecondaryJob JID-04a8ae11a5f3ba5669b02723 INFO: start
Wrapping up: [4]
2019-05-24T19:51:19.077Z 15827 TID-gptgej9m0 SecondaryJob JID-04a8ae11a5f3ba5669b02723 INFO: done: 0.0 sec
2019-05-24T19:51:19.077Z 15827 TID-gptgej9fw SecondaryJob JID-e5d1cac96085eca6dc4f1b9a INFO: start
Wrapping up: [4]
2019-05-24T19:51:19.077Z 15827 TID-gptgejawo MyJob JID-63b98e5eea80d8594cd5d7e2 INFO: done: 0.002 sec
2019-05-24T19:51:19.077Z 15827 TID-gptgej8f8 SecondaryJob JID-3b1e572fba828ed10d9c81c7 INFO: start
Wrapping up: [3]
2019-05-24T19:51:19.077Z 15827 TID-gptgej8f8 SecondaryJob JID-3b1e572fba828ed10d9c81c7 INFO: done: 0.001 sec
2019-05-24T19:51:19.076Z 15827 TID-gptgej8rg SecondaryJob JID-92e701ca2896b314bb304c91 INFO: start
Wrapping up: [2]
2019-05-24T19:51:19.078Z 15827 TID-gptgej8rg SecondaryJob JID-92e701ca2896b314bb304c91 INFO: done: 0.001 sec
2019-05-24T19:51:19.076Z 15827 TID-gptgej8xk SecondaryJob JID-7338f7c156be7b94353ab587 INFO: start
Wrapping up: [2]
2019-05-24T19:51:19.076Z 15827 TID-gptgej99s SecondaryJob JID-0ab401372a385d341923f55b INFO: start
Wrapping up: [1]
2019-05-24T19:51:19.077Z 15827 TID-gptgej9fw SecondaryJob JID-e5d1cac96085eca6dc4f1b9a INFO: done: 0.0 sec
2019-05-24T19:51:19.077Z 15827 TID-gptgej9s4 MyJob JID-5b0871c9b745362a6cda047a INFO: done: 0.002 sec
2019-05-24T19:51:19.077Z 15827 TID-gptgej8lc SecondaryJob JID-9408c4c062de490c8c4283e8 INFO: done: 0.0 sec
2019-05-24T19:51:19.078Z 15827 TID-gptgej8xk SecondaryJob JID-7338f7c156be7b94353ab587 INFO: done: 0.001 sec
2019-05-24T19:51:19.078Z 15827 TID-gptgej99s SecondaryJob JID-0ab401372a385d341923f55b INFO: done: 0.002 sec
^C2019-05-24T19:51:25.290Z 15827 TID-gptgbogs0 INFO: Shutting down
2019-05-24T19:51:25.290Z 15827 TID-gptgbogs0 INFO: Terminating quiet threads
2019-05-24T19:51:25.390Z 15827 TID-gptgbogs0 INFO: Pausing to allow threads to finish...
2019-05-24T19:51:28.393Z 15827 TID-gptgbogs0 INFO: Bye!
Faktory 1.6 added PUSHB. Sidekiq just added support for lazy enumerators in perform_bulk. Pull the combination into FWR.