Code Monkey home page Code Monkey logo

faktory_worker_ruby's People

Contributors

ahacop avatar bryanrite avatar envek avatar ffiller avatar fnordfish avatar ibrahima avatar igneous avatar jagthedrummer avatar jpwinans avatar jweslley avatar motymichaely avatar mperham avatar msnexploder avatar nathanpalmer avatar neilturner77 avatar razenpok avatar scottrobertson avatar thebadmonkeydev avatar ttasanen avatar ttilberg avatar y-yagi avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar

faktory_worker_ruby's Issues

List of queues passed to Faktory::Launcher is ignored

In working on an ActiveJob adapter for Faktory I'm trying to start and stop workers programmatically using Faktory::Launcher. I can get workers to start and stop, but I can't ever seem to get them to watch any queue other than default.

I'm basically doing this:

require "faktory/launcher"
faktory = Faktory::Launcher.new(queues: ["blarf"],
                                 environment: "test",
                                 concurrency: 1,
                                 timeout: 1)
faktory.run

No matter what I put in the queues array it always acts like I've done queues: ["default"].

I'm running this against the branch from #7. I haven't been able to spot any changes there that I think might affect this but it's certainly possible I've missed something.

Here's the entire process I'm using to start/stop workers if it helps. (This code is adapted from the AJ test adapter for Sidekiq: https://github.com/rails/rails/blob/master/activejob/test/support/integration/adapters/sidekiq.rb)

  def start_workers
    continue_read, continue_write = IO.pipe
    death_read, death_write = IO.pipe

    @pid = fork do
      continue_read.close
      death_write.close

      # Faktory is not warning-clean :(
      $VERBOSE = false

      $stdin.reopen(File::NULL)
      $stdout.sync = true
      $stderr.sync = true

      logfile = Rails.root.join("log/faktory.log").to_s
      puts "logfile = #{logfile}"
      Faktory.logger = Logger.new(logfile)
      #Faktory::Logging.initialize_logger(logfile)

      self_read, self_write = IO.pipe
      trap "TERM" do
        self_write.puts("TERM")
      end

      Thread.new do
        begin
          death_read.read
        rescue Exception
        end
        self_write.puts("TERM")
      end

      require "faktory/cli"
      require "faktory/launcher"
      faktory = Faktory::Launcher.new(queues: ["blarf"],
                                       environment: "test",
                                       concurrency: 1,
                                       timeout: 1)
      #Faktory.average_scheduled_poll_interval = 0.5
      #Faktory.options[:poll_interval_average] = 1
      begin
        faktory.run
        continue_write.puts "started"
        while readable_io = IO.select([self_read])
          signal = readable_io.first[0].gets.strip
          raise Interrupt if signal == "TERM"
        end
      rescue Interrupt
      end

      puts "about to call faktory.stop"
      faktory.stop
      exit!
    end
    continue_write.close
    death_read.close
    @worker_lifeline = death_write

    raise "Failed to start worker" unless continue_read.gets == "started\n"
  end

  def stop_workers_hard
    if @pid
      Process.kill "TERM", @pid
      Process.wait @pid
    end
  end

Version 0.6.2?

Hey Mike, was just hoping you might cut a new version of this gem. In order to land an ActiveJob adapter other folks will need to be able to get at the testing extensions and the fix you made here: ad454bb

Can't push job

I can't seem to push jobs using the latest server and Ruby client. Am I missing something basic?

class MyJob
  include Faktory::Job

  def perform(a)
    # empty because I only want to push the job, not consume it, with Ruby
  end
end
MyJob.perform_async(1)
Faktory::CommandError: MALFORMED json: cannot unmarshal object into Go struct field Job.jobtype of type string
from /usr/local/bundle/gems/faktory_worker_ruby-0.7.0/lib/faktory/client.rb:220:in `result'

2 rails app with the same instance of Faktory

Hi.
I have 2 rails app in the same machine and I would like to enqueue jobs using the same instance of Faktory.
The problem is that when I run
bundle exec faktory-worker
on the first rails application, it loads also jobs enqueued by the other application and of course it crashes.
Is there a way to retrieve jobs only for a specific application? Or for an array of queues?
Thanks, Stefano

Faktory::Testing.fake! is still attempting to hit localhost....

I've added a line of code that calls perform_async on a job as part of an after_commit handler on one of my Active Record models, and now a bunch of unit tests are failing. I have attempted (in many locations) to call Faktory::Testing.fake!, yet it is still attempting to hit a faktory server rather than just mock things locally.

How can I properly mock Faktory for the purposes of unit testing (not integration testing)

Ruby 2.7: warning: Using the last argument as keyword parameters is deprecated

gems/ruby-2.7.1/gems/faktory_worker_ruby-1.0.0/lib/faktory/connection.rb:10: warning: Using the last argument as keyword parameters is deprecated; maybe ** should be added to the call
gems/ruby-2.7.1/gems/faktory_worker_ruby-1.0.0/lib/faktory/client.rb:47: warning: The called method `initialize' is defined here

Add -L option output the log

This is an issue when you're running the faktory-worker in a service cause you need the logs to live somewhere similar to Sidekiq.

Propsal:

bundle exec faktory-worker -L log/faktory.log

Jobs submitted by other jobs are going to the wrong queue

Hi there,

I have a queue for jobs that's fairly long-running ( > 90s) which goes through a large CSV file and writes its contents to a DB.

At some point in that job, I submit several more jobs (1 per row inserted) to another another set of workers on another queue.

For a while - I can't determine how long or why - jobs submitted by the long running worker will go to the correct queue. After something happens - again, I don't know what that is - the jobs are instead added under the same queue as the long running worker, completely ignoring the faktory_options queue: 'other_queue' setting.

These jobs getting added to the wrong queue clogs up the long running queue, and at the same time, delays the results of the other queue.

Here's a concrete example, this job below takes around 90s ± to run. There's normally a few dozen to process at a time...

class ParseBatchedCSVFileJob
  include Faktory::Job

  faktory_options queue: "default"

  def perform(file)
    return unless File.file?(file)

    job = ::ETL::CSVParser::Process.setup(file_path: file)

    ::ETL::CSVParser::Process.run(job)
  end
end

Somewhere in the above ETL::CSV::Parser code, there is more code that runs this line:

Rating::RateCDR.perform_async(cdr_id)

That class has the following faktory config and code.

module Rating
  class RateCDR
    class Error < StandardError; end

    include Faktory::Job
    extend ::LightService::Organizer

    faktory_options queue: "rating"

    def self.call(cdr)
      with(cdr: cdr).reduce(actions)
    end

    def self.actions
      [::Rating::AssociateRates, ::Rating::CalculateCharges]
    end

    def perform(cdr_id)
      cdr = CDR.find(cdr_id)

      response = self.class.call(cdr)
      raise(Error, response.message) if response.failure?
    end
  end
end

So, what happens is the main job happily processes itself in the default queue, and submits rating jobs per CDR to the rating queue, but after something happens, these jobs get diverted to the default queue as well.

Luckily all jobs are running in the context of a rails app, which has access to all the queue's job code, so even though a Rating job gets erroneously submitted to the default queue, the code can still be executed.

Do you have any idea why jobs aren't being submitted to the queue for which their class says they should go to?

Issue with unit testing

Hello,

Wanted to report a potential bug I just uncovered; it took me a while to understand what's going on as I'm not a ruby developer by trade (but getting there). I understand the testing for the ruby worker operates by using method redirection. We actually have some unit tests set up using this and it's been working well.

Issues popped up when we updated the faktory_worker_ruby gem to version 1.2.0 - our unit tests started erroring out saying that it couldn't connect to port 7419, which is odd because it shouldn't be hitting the network.

After diff'ing the source code for both gems, I believe the error is this: Within faktory_worker_ruby, the lib\faktory\client.rb file was changed to use the cgi gem, which then changed the main open call to now be open_socket. However, the lib\faktory\testing.rb file is still just redirecting the previous open call only. Hence, no matter what, the real open_socket call will fire and will try to contact the faktory server over the network in test mode, resulting in the error.

Docker Ruby does not auto-load `digest` module, required by production

Using Docker(ruby:2.4, ruby:2.5), the Digest module is not included and raises exception when trying to connect a client against a production Faktory. This does not happen in the development config, as it doesn't require a password.

rubyworker1_1  | 5 TID-gquxjs6bg INFO: Running in ruby 2.5.1p57 (2018-03-29 revision 63029) [x86_64-linux]
rubyworker1_1  | 5 TID-gquxjs6bg INFO: See LICENSE and the LGPL-3.0 for licensing details.
rubyworker1_1  | 5 TID-gquxll27o ERROR: Error fetching job: uninitialized constant Faktory::Client::Digest
rubyworker1_1  | 5 TID-gquxll27o ERROR: /usr/local/bundle/gems/faktory_worker_ruby-0.7.1/lib/faktory/client.rb:14:in `block in <class:Client>'
rubyworker1_1  | 5 TID-gquxll27o ERROR: /usr/local/bundle/gems/faktory_worker_ruby-0.7.1/lib/faktory/client.rb:174:in `open'

Reproduction:

Save this docker-compose.yml file and execute it with docker-compose up

version: '3'
services:
  faktory:
    image: contribsys/faktory
    environment:
      - FAKTORY_PASSWORD=password
    command: /faktory -b 0.0.0.0:7419 -e production

  example:
    image: ruby:2.5
    depends_on: [faktory]
    environment:
      - FAKTORY_URL=tcp://:password@faktory:7419
    command: bash -c "gem install faktory_worker_ruby && ruby -r faktory_worker_ruby -e Faktory::Client.new"

Advice on Integration Tests

I would like to get advice on how to perform integration test (more detail on what I mean below) that involves Rails and Faktory.

Set Up

I have a project that boils down to the following:

  • Rails Web Application with UI
  • Rails Faktory Worker
  • Golang daemon that creates Faktory jobs

Test Case

  1. Daemon sends jobs to faktory
  2. Faktory ruby worker processes, updates DB and sends ActionCable broadcast
  3. UI is updated

I am hoping to create integration tests that will run a the mentioned live and make use of the test API offered by rails.
In particular, I am wondering if it is possible to set up test instance of Faktory worker that accesses Minitest DB Transaction used in system test?

Thank you.

Should we remove blockless_inline_is_a_bad_idea_but_I_wanna_do_it_anyway?

The blockless_inline_is_a_bad_idea_but_I_wanna_do_it_anyway! method for testing was added when it looked like we'd need it to be able to get an ActiveJob adapter to pass the Rails test suite. Since the adapter now lives in this gem I'm not sure there's a compelling reason to keep that method around. Just thought I'd get a sanity check on that before submitting a PR.

Queue Size

Is there a way to fetch the current queue size?

We use a method in sidekiq to fetch the "least busy" queue from a subset of queues.

Unable to process jobs outside of ActiveJob

We're using Faktory in a setup where Rails handles the API and a couple of backend processes are written in Go. These backend processes are receiving their work through a Faktory instance.

In Rails we are pushing new jobs by utilizing (or misusing?) the ActiveJob classes. This allows us to have custom Faktory configuration per job (like queue name etc). But the Faktory adapter for ActiveJob wraps the name of the job before sending it to Faktory (ActiveJob::QueueAdapters::FaktoryAdapter::JobWrapper) so in Go we cannot register the correct handler for it. So currently we have a middleware "unwrapping" the name of the job;

class Faktory::Middleware::Unwrap
  class Client
    def call(payload, _pool)
      wrapped_job_klass = payload.dig('custom', 'wrapped')
      if wrapped_job_klass
        payload['jobtype'] = wrapped_job_klass
        payload['custom'] = payload['custom'].except('wrapped')
      end
      yield
    end
  end
end

I can imagine that more developers will run into this and it does not feel right with Faktory's strength to be a polyglot job processing framework. Thus it would be great to be able to configure the "wrapping" on Faktory itself instead of depending on custom middleware.

Faktory::Job.set ignores class-defined faktory_options

I'm not sure if this is intended behavior, but I was surprised by it.

In Getting Started, there's a code comment that suggests that use of Faktory::Job.set "dynamically override[s]" options. But, while it does, it also drops any other options that are defined on the Job. So, to use set you have to pass all the options to it, in order to override, say, just 1.

Here's some code that demonstrates the behavior, using the example job in the docs:

class SomeJob
  include Faktory::Job
  faktory_options queue: "critical", custom: { unique_for: 10.minutes.to_i }
  def perform(*)
  end
end

it 'fails to use the critical queue' do
  SomeJob.set(custom: { unique_for: nil }).perform_async(1)
  assert_equal 1, Faktory::Queues['default'].size
  assert_equal 0, Faktory::Queues['critical'].size
end

Reschedule job in current queue

Inside my Faktory::Job, am attempting to reschedule the current job (same options) in the future.

  def reschedule_job(options)
    self.class.perform_in(RESCHEDULE_DELAY_DURATION, options)
  end

But this reschedules in the default queue. I would like to reschedule in the same queue if possible, but I haven't found out how to determine the current queue (my job could be enqueued in multiple queues).

Is there a way for my job to know queue that the job was dequeued from?

Thanks In advance

getting Error: "unknown: No jobtype registered: RunnerWorker"

Hi,

I have an inconsistent issue with the ruby worker.

When calling specific worker sometimes I get the error "unknown: No jobtype registered: RunnerWorker"

The worker doesn't recognize the "RunnerWorker".

"RunnerWorker" is the only worker in the system and I start faktory-worker like this: "bundle exec faktory-worker --require ./workers/runner_worker.rb --queue runner-manager,1 --environment production"

Your help is appreciated

Josh.

Event for when retries are exhausted

I'm wondering if there is a way to perform an action after the retries have been exhausted. Similar to the sidekiq_retries_exhausted method. Is this possible with Faktory?

List of options for faktory_options

I cant seem to find a definitive list of all the options available to be used with faktory_options, is there one somewhere that can be pointed out?

Worker only checking default channel?

I have a worker defined in a file workers.rb:

class ChannelWorker
  include Faktory::Job

  faktory_options queue: 'channels', queues:['channels'], retry: 25, backtrace: 0

  def perform(username)
  ...
  end
end

I am trying to get this worker to pull work from the channels queue. While ChannelWorker.perform_async channel puts work into the channels queue, when running the worker, it does not get the work from the queue.

When removing the faktory_options line, the worker runs like normal, but only on the default queue, which I am trying to avoid so that I can have a queue per job type.

I checked the options that the worker currently has with ChannelWorker.get_faktory_options, which returns {"retry"=>25, "queue"=>"channels", "queues"=>["channels"], "backtrace"=>0}, leading me to believe that it should be pulling from the queue.

How to determine that worker process is alive and working?

Is there any way to check each worker process for its health? To be able to restart them in case if they will get stuck or freeze or anything else.

If worker isn't busy and isn't able to take new jobs, I want to kill it and restart.

It is primarily needed for deploying to Kubernetes as it is considered a best practice to always include liveness checks for containers in k8s pods. For example it may be any executable (e.g. shell script) that will return 0 if worker is healthy and something else if it is not. Also checks may be TCP or HTTP probes but I don't think that they fit for worker processes case.

Thank you for faktory!

Bulk push of jobs

I like to make a feature request for add support for pushing a large batch of jobs to the Faktory server, something similar to what Sidekiq offers with #push_bulk.

The network latency of enqueueing a job individually is starting to add up when dealing with a large data set.

Sample code

markers = ids.each_slice(10).to_a

markers.each do |m|
  SomeWorker.perform_async(m[0], m[-1])
end

Is there a workaround for something like this for the time being?

unique_for throwing Faktory::CommandError: json: unsupported value: floating point number

gem version 2.0.0

We recently added faktory_options custom: { unique_for: 1.hour.to_i } to one of our workers and we are now getting:
Faktory::CommandError: json: unsupported value: floating point number when trying to enqueue a job.

We've used the exact line faktory_options custom: { unique_for: 1.hour.to_i } in several other workers without an issue so this is perplexing.

image

module OurWorkersModule
  class OurWorker
      include Faktory::Job
  
      faktory_options custom: { unique_for: 1.hour.to_i }
      faktory_options queue: 'ourqueue'

....

Client Initialization with Multiple Faktories

Hello!

I was evaluating Faktory to replace our current job queue. Since Faktory server itself doesn't scale horizontally (yet), and our scale is very large, we decided to partition the jobs into two Faktory server instances (rather than different queues). Basically our web frontend will push jobs to Faktory-1 server, one cluster of workers will pull jobs off Faktory-1 server, do work, and potentially queue jobs on Faktory-2 server.

The issue we are having is that the way the Faktory::Client is initialized, you cannot override the server location if the environment variable is set. As long as FAKTORY_PROVIDER/FAKTORY_URL is set, Faktory::Client.new(url: ENV["FAKTORY_OTHER_URL"]) will never point to that server.

https://github.com/contribsys/faktory_worker_ruby/blob/master/lib/faktory/client.rb#L38

Could we change the initializer to be something like:

def initialize(url: uri_from_env || 'tcp://localhost:7419', debug: false)
  @debug = debug
  @location = URI(url)
  open
end

This would allow same default functionality but allow overriding the url if manually creating clients.

I can submit a PR if you 👍

undefined method `jobs' for nil:NilClass

Hey

Having an issue with batches on a few specific workers.

We have a batch, which enqueues jobs, and then those jobs can add to the batch. This has been working totally fine, but for some reason occasionally this happens.

undefined method `jobs' for nil:NilClass

That line is batch.jobs do

However, as you can see, there is a bid set. (it's in custom, is that right? We don't set that manually).

Screenshot 2020-01-20 at 15 39 44

jobtype in polyglot

Hi! I have jobs being enqueued with jobtype "AsyncWork.DocumentJob" and then the Ruby worker client is bombing trying to constantize that.

I thought I could make middleware to handle this, but the middleware gets the instantiated jobtype instead of the "payload".

It would be pretty easy to make a patch that gives the JSON payload to middleware and doesn't instantiate the job class until after middleware is run, but that's a potentially breaking change.

Another solution would be add :jobtype as an option to job classes like

class DocumentJob
  include Faktory::Job
  faktory_options jobtype: "AsyncJob.DocumentJob"
  ...
end

But that's tricky because we'd have to keep track of all job classes and I'm not sure how well that work with Rail's lazy loading of things.

Third option is have a global config for it?

Faktory.configure_worker do |config|
  config.jobtype_map = {
    "AsyncWork.DocumentJob" => "DocumentJob"
  }
end

What do you think? I would be happy to implement any of these or any other idea.

Release?

Hey there, any chance of getting a new release? Took me a while to figure out why Client.push wasn't working without a job id, since the code on GitHub clearly filled it in

NoMethodError (undefined method `wait_readable' for #<OpenSSL::SSL::SSLSocket:0x00007fc7641dea60>)

Faktory client fails to connect to Faktory server behind AWS Network Load Blanacer with TLS listener.

Establishing a simple Faktory client connection to a Faktory server behind an AWS Network Load Balancer with TLS listener fails:

require 'faktory'
# This is required or otherwise a NameError (uninitialized constant Faktory::Client::OpenSSL) error is raised
require 'openssl' 

client = Faktory::Client.new

The stacktrace:

Traceback (most recent call last):
        7: from ~/.rbenv/versions/2.7.2/bin/irb:23:in `<main>'
        6: from ~/.rbenv/versions/2.7.2/bin/irb:23:in `load'
        5: from ~/.rbenv/versions/2.7.2/lib/ruby/gems/2.7.0/gems/irb-1.2.6/exe/irb:11:in `<top (required)>'
        4: from (irb):2
        3: from (irb):2:in `new'
        2: from ~/.rbenv/versions/2.7.2/lib/ruby/gems/2.7.0/gems/faktory_worker_ruby-1.1.0/lib/faktory/client.rb:58:in `initialize'
        1: from ~/.rbenv/versions/2.7.2/lib/ruby/gems/2.7.0/gems/faktory_worker_ruby-1.1.0/lib/faktory/client.rb:247:in `open'
NameError (uninitialized constant Faktory::Client::OpenSSL)
Did you mean?  OpenStruct

NOTE: Establishing the same connection using Go client and Python client works as expected.


Gemfile:

source 'https://rubygems.org'

ruby '2.7.2'

gem 'faktory_worker_ruby'
gem 'openssl'

Gemfile.lock:

GEM
  remote: https://rubygems.org/
  specs:
    connection_pool (2.2.5)
    faktory_worker_ruby (1.1.0)
      connection_pool (~> 2.2, >= 2.2.2)
    openssl (2.2.0)

PLATFORMS
  ruby

DEPENDENCIES
  faktory_worker_ruby
  openssl

RUBY VERSION
   ruby 2.7.2p137

BUNDLED WITH
   2.1.4

Faktory::CommandError: ERR Invalid password

I'm trying to connect to a local faktory instance with a password and am not having any luck.

I started the server like this:

$ FAKTORY_PASSWORD=test faktory

Then if I try to connect without configuring anything I see this, as expected:

> MyJob.perform_async(42)
ArgumentError: Server requires password, but none has been configured

So then I set ENV vars like this:

FAKTORY_PROVIDER=MY_FAKTORY_URL
MY_FAKTORY_URL=tcp://:test@localhost:7419

And then when I try to connect I see this:

> MyJob.perform_async(42)
Faktory::CommandError: ERR Invalid password

Am I missing something about how to configure the ruby client with a password?

Set socket timeout

Need to set a default open/read/write 5 sec timeout and (less important) expose the option for config.

Bug without custom parameters

klass = job_hash['custom']['wrapped'] || job_hash["jobtype"]

This fails unless there is a 'custom' key in the hash. #dig would be the current obvious choice here - but that is Ruby 2.3 and higher.

Since you support 2.2.2 and higher maybe the easiest option is to just pull this https://github.com/marcandre/backports/blob/master/lib/backports/2.3.0/hash/dig.rb in from backports until you push up to 2.3 where it could go away. It's all of 10 lines so it's not a huge push. Obviously adding backports as a requirement would work as well but might be a little heavier lift.

Add support for testing configuration

It would be nice to be able to fake queueing jobs for testing like sidekiq does. I'm not sure the correct way to go about adding something like that but I'll certainly take a crack at it if no one else wants to.

Faktory::ParseError

Hi @mperham,

We've been consistently seeing occurrences of [Faktory::ParseError](https://github.com/contribsys/faktory_worker_ruby/blob/master/lib/faktory/client.rb#L341), for roughly 0.01% of our Jobs.

line.strip is always blank. The comment suggests that, "this is bad, indicates we need to reset the socket and start fresh".

Do you have any experience with what usually triggers these and any suggestion for an improvement we could PR up and submit to handle these Parse Errors? (or other recommendation on how to handle on our end)

Thanks!

Error fetching job: Invalid password - password with invalid URL characters

Issue migrated from contribsys/faktory#411


  • Which Faktory package and version?
    Faktory 1.6.1 Docker image (contribsys/faktory:1.6.1)

  • Which Faktory worker package and version?
    FWR 1.1.1

  • Please include any relevant worker configuration

  • Please include any relevant error messages or stacktraces

/usr/local/bundle/gems/faktory_worker_ruby-1.1.1/lib/faktory/client.rb:352:in `result': Invalid password\r (Faktory::CommandError)
	from /usr/local/bundle/gems/faktory_worker_ruby-1.1.1/lib/faktory/client.rb:362:in `ok'
	from /usr/local/bundle/gems/faktory_worker_ruby-1.1.1/lib/faktory/client.rb:294:in `open'
	from /usr/local/bundle/gems/faktory_worker_ruby-1.1.1/lib/faktory/client.rb:58:in `initialize'
	from -e:1:in `new'
	from -e:1:in `<main>'

Are you using an old version? No


I'm using Faktory docker image version 1.6.1 (contribsys/faktory:1.6.1) and FWR 1.1.1

When I set a password for Faktory (using the env var FAKTORY_PASSWORD on the server) and the password has an invalid URL character (eg: @), when I connect to the server with FWR, I get the invalid password error. If I use a password without any URL invalid characters, everything works fine.

In the FAKTORY_URL I used the url-encoded password.

This docker compose file reproduces the error. If the password is changed to password (in both FAKTORY_PASSWORD and FAKTORY_URL), FWR can fetch jobs.

version: '3'
services:
  faktory:
    image: contribsys/faktory
    environment:
      - FAKTORY_PASSWORD=p@ssword
    command: /faktory

  example:
    image: ruby:2.7.6
    depends_on: [faktory]
    environment:
      - FAKTORY_URL=tcp://:p%40ssword@faktory:7419
    command: bash -c "gem install faktory_worker_ruby && ruby -r faktory_worker_ruby -e Faktory::Client.new"

Maybe a url-decode is missing before computing the password hash??

Jobs are not being fetched for certain queues

We have an issue in which jobs are not being processed.

We are running a Faktory worker in Ruby 2.7.2 using a configuration file:

$ bundle exec faktory-worker -C config/faktory/worker.yml

The worker.yml file looks like this:

:queues:
  - jobs
:concurrency: 10
:timeout: 25

An example: JobWorker class:

class JobWorker
  include Faktory::Job

  def perform(arg1, arg2)
    puts "Arg1: #{arg1}"
    puts "Arg2: #{arg2}"
  end
end

Pushing a job to Faktory:

jid = SecureRandom.hex(12)
Faktory::Client.new.push(
  jid: jid,
  queue: "jobs",
  jobtype: "JobWorker",
  args: ["arg1", "arg2"]
)

The job is stuck in the jobs queue. I would expect the job to be processed by the worker.

I suspect it's related to the fact the JobWorker doesn't have a queue option but our requirement is to have JobWorker processed by several processes from different queues.

What am I missing?

Unique jobs do not work with ActiveJob

Trying to do the below task in order to create unique jobs, as per:
https://github.com/contribsys/faktory/wiki/Pro-Unique-Jobs

My job classes look something like:

class FaktoryJob < ActiveJob::Base
  self.queue_adapter = :faktory
  queue_as 'default'
end

class SleepJob < FaktoryJob
  faktory_options custom: { unique_for: 5 }

  def perform
    sleep 10.seconds
  end
end

It seems that there might be an issue with the custom object being passed in correctly utilising ActiveJob vs native Faktory::Jobs
As you can see two identical jobs are enqueued immediately after one another:

Screen Shot 2021-04-12 at 8 41 48 am

Pool increase in a release?

Is there a timing for the next release / can you cut one? We are having a hard time pulling the main branch to get this commit and having it boot correctly in our k8s cluster. We would rather not rewrite our config just to change it back when the release comes.

Errno::ECONNRESET: Connection reset by peer - No response

I'm trying to get going with this and am running into a weird error.

First I installed the faktory server via homebrew.

Then I start faktory locally:

$ faktory
Faktory 0.5.0
Copyright © 2017 Contributed Systems LLC
Licensed under the GNU Public License 3.0
INFO[2017-11-08T22:20:41.828681916-06:00] Initializing storage at /Users/jgreen/.faktory/db 
INFO[2017-11-08T22:20:41.850672271-06:00] Now listening at localhost:7419, press Ctrl-C to stop 
INFO[2017-11-08T22:20:41.850750997-06:00] Web server now listening on port 7420  

Then I installed this gem in my Gemfile

gem 'faktory_worker_ruby', git: 'https://github.com/contribsys/faktory_worker_ruby'

Then in app/jobs/my_job.rb I have this code:

require 'faktory'

class MyJob
  include Faktory::Job

  def perform(some_id)
    puts "we should do something with this id : #{some_id}"
  end
end

Then I open a Rails console and do:

> MyJob.perform_async(42)
Errno::ECONNRESET: Connection reset by peer - No response
	from (irb):4

The server console shows this when the error happens:

ERRO[2017-11-08T22:20:53.102064455-06:00] Invalid client data in AHOYInvalid client Wid

ActiveJob Adapter

So, apparently the Rails team isn't accepting new AJ adapters (rails/rails#32285) but are asking that the adapter code live in outside gems. Is this project (faktory_worker_ruby) the right place to make a PR for an AJ adapter, or should that be a completely seperate gem?

Client middleware is not used when pushing jobs from a worker

I have a job that may conditionally en-queue other jobs when it's running. I have found that the Client Middleware is not evoked when the code is run from within the worker.

I would expect that the client middleware would still get used even when used from inside faktory-worker (since it's still using the Faktory client to push more jobs out).

Here's a small demo app:

require 'faktory_worker_ruby'

class LoudMiddleware
  def call(*_)
    puts "BEFORE THE JOB!"
    puts "!!!!!!!!!!!!!!!\n"* 3
    yield
    puts "AFTER THE JOB!\n"
    puts "!!!!!!!!!!!!!!!\n"* 3

  end
end

Faktory.configure_client do |config|
  config.client_middleware do |chain|
    chain.add LoudMiddleware
  end
end


class MyJob
  include Faktory::Job
  def perform *args
    5.times {|i| SecondaryJob.perform_async i}
    puts "Doing #{args}"
  end
end

class SecondaryJob
  include Faktory::Job
  def perform *args
    puts "Wrapping up: #{args}"
  end
end

MyJob.perform_async 'some stuff'
$ ruby app.rb
# Enqueues a single job, and the middleware shouts as it pushes the job to faktory

$ faktory-worker -r ./app.rb
# Also enqueues that job, since it's at the end, but does not shout.
# Also, the processing of the job includes enqueing 5 more jobs, which also do not get middlewared.
ttilberg@timdesktop:~/dev/fkt_middleware$ ruby app.rb 
BEFORE THE JOB!
!!!!!!!!!!!!!!!
!!!!!!!!!!!!!!!
!!!!!!!!!!!!!!!
AFTER THE JOB!
!!!!!!!!!!!!!!!
!!!!!!!!!!!!!!!
!!!!!!!!!!!!!!!
ttilberg@timdesktop:~/dev/fkt_middleware$ faktory-worker -r ./app.rb 


                    ,,,,
            ,,,,    |  |
            |  |    |  |
            |  |    |  |
            |  |,,~~'  '~,          ___       _
       ,,~~''   ,~~,      '~,      / __)     | |      _
 ,,~~''   ,~~,  |  |        |    _| |__ _____| |  _ _| |_ ___   ____ _   _
 |  ,~~,  |  |  |  |        |   (_   __|____ | |_/ |_   _) _ \ / ___) | | |
 |  |  |  |  |  |  |        |     | |  / ___ |  _ (  | || |_| | |   | |_| |
 |  |__|  |__|  |__|        |     |_|  \_____|_| \_)  \__)___/|_|    \__  |
 |__________________________|                                       (____/


2019-05-24T19:51:19.067Z 15827 TID-gptgbogs0 INFO: Running in ruby 2.5.3p105 (2018-10-18 revision 65156) [x86_64-linux]
2019-05-24T19:51:19.067Z 15827 TID-gptgbogs0 INFO: See LICENSE and the LGPL-3.0 for licensing details.
2019-05-24T19:51:19.067Z 15827 TID-gptgbogs0 INFO: Starting processing, hit Ctrl-C to stop
2019-05-24T19:51:19.075Z 15827 TID-gptgejawo MyJob JID-63b98e5eea80d8594cd5d7e2 INFO: start
2019-05-24T19:51:19.075Z 15827 TID-gptgej9s4 MyJob JID-5b0871c9b745362a6cda047a INFO: start
2019-05-24T19:51:19.075Z 15827 TID-gptgej9m0 SecondaryJob JID-5e713e7cb88f0682cf001549 INFO: start
Wrapping up: [0]
2019-05-24T19:51:19.075Z 15827 TID-gptgej9m0 SecondaryJob JID-5e713e7cb88f0682cf001549 INFO: done: 0.0 sec
2019-05-24T19:51:19.075Z 15827 TID-gptgej9fw SecondaryJob JID-f17c4ac4a1111af538bccbd8 INFO: start
Wrapping up: [0]
2019-05-24T19:51:19.076Z 15827 TID-gptgej9fw SecondaryJob JID-f17c4ac4a1111af538bccbd8 INFO: done: 0.0 sec
2019-05-24T19:51:19.076Z 15827 TID-gptgej93o SecondaryJob JID-d75c75c75e66ca36b5ee6eea INFO: start
Wrapping up: [1]
2019-05-24T19:51:19.076Z 15827 TID-gptgej93o SecondaryJob JID-d75c75c75e66ca36b5ee6eea INFO: done: 0.0 sec
2019-05-24T19:51:19.077Z 15827 TID-gptgej8lc SecondaryJob JID-9408c4c062de490c8c4283e8 INFO: start
Doing ["do some stuff"]
Doing ["do some stuff"]
Wrapping up: [3]
2019-05-24T19:51:19.077Z 15827 TID-gptgej9m0 SecondaryJob JID-04a8ae11a5f3ba5669b02723 INFO: start
Wrapping up: [4]
2019-05-24T19:51:19.077Z 15827 TID-gptgej9m0 SecondaryJob JID-04a8ae11a5f3ba5669b02723 INFO: done: 0.0 sec
2019-05-24T19:51:19.077Z 15827 TID-gptgej9fw SecondaryJob JID-e5d1cac96085eca6dc4f1b9a INFO: start
Wrapping up: [4]
2019-05-24T19:51:19.077Z 15827 TID-gptgejawo MyJob JID-63b98e5eea80d8594cd5d7e2 INFO: done: 0.002 sec
2019-05-24T19:51:19.077Z 15827 TID-gptgej8f8 SecondaryJob JID-3b1e572fba828ed10d9c81c7 INFO: start
Wrapping up: [3]
2019-05-24T19:51:19.077Z 15827 TID-gptgej8f8 SecondaryJob JID-3b1e572fba828ed10d9c81c7 INFO: done: 0.001 sec
2019-05-24T19:51:19.076Z 15827 TID-gptgej8rg SecondaryJob JID-92e701ca2896b314bb304c91 INFO: start
Wrapping up: [2]
2019-05-24T19:51:19.078Z 15827 TID-gptgej8rg SecondaryJob JID-92e701ca2896b314bb304c91 INFO: done: 0.001 sec
2019-05-24T19:51:19.076Z 15827 TID-gptgej8xk SecondaryJob JID-7338f7c156be7b94353ab587 INFO: start
Wrapping up: [2]
2019-05-24T19:51:19.076Z 15827 TID-gptgej99s SecondaryJob JID-0ab401372a385d341923f55b INFO: start
Wrapping up: [1]
2019-05-24T19:51:19.077Z 15827 TID-gptgej9fw SecondaryJob JID-e5d1cac96085eca6dc4f1b9a INFO: done: 0.0 sec
2019-05-24T19:51:19.077Z 15827 TID-gptgej9s4 MyJob JID-5b0871c9b745362a6cda047a INFO: done: 0.002 sec
2019-05-24T19:51:19.077Z 15827 TID-gptgej8lc SecondaryJob JID-9408c4c062de490c8c4283e8 INFO: done: 0.0 sec
2019-05-24T19:51:19.078Z 15827 TID-gptgej8xk SecondaryJob JID-7338f7c156be7b94353ab587 INFO: done: 0.001 sec
2019-05-24T19:51:19.078Z 15827 TID-gptgej99s SecondaryJob JID-0ab401372a385d341923f55b INFO: done: 0.002 sec
^C2019-05-24T19:51:25.290Z 15827 TID-gptgbogs0 INFO: Shutting down
2019-05-24T19:51:25.290Z 15827 TID-gptgbogs0 INFO: Terminating quiet threads
2019-05-24T19:51:25.390Z 15827 TID-gptgbogs0 INFO: Pausing to allow threads to finish...
2019-05-24T19:51:28.393Z 15827 TID-gptgbogs0 INFO: Bye!

Implement push_bulk

Faktory 1.6 added PUSHB. Sidekiq just added support for lazy enumerators in perform_bulk. Pull the combination into FWR.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.