edgurgel / verk
A job processing system that just verks!
Home Page: https://hex.pm/packages/verk
License: MIT License
Can be useful for saving the history of completed jobs
These modules should offer "twins" of each function. Example: clear! and clear, where clear returns {:ok, something} or {:error, reason}, and clear! returns something or raises an error.
This should be done soon, as we don't want to break this API after we release 1.0.0.
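For illustration, a minimal sketch of the twin convention using a hypothetical Verk.Queue.clear (the module, the Redis key layout, and the Verk.Redis connection name are all assumptions):

defmodule Verk.Queue do
  @doc "Clears the queue, returning {:ok, count} or {:error, reason}."
  def clear(queue) do
    # DEL returns the number of keys removed, so this yields {:ok, count}
    Redix.command(Verk.Redis, ["DEL", "queue:#{queue}"])
  end

  @doc "Clears the queue, returning the count or raising on error."
  def clear!(queue) do
    case clear(queue) do
      {:ok, count} -> count
      {:error, reason} -> raise RuntimeError, message: inspect(reason)
    end
  end
end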
Several GenServers include operations in their init()
functions that require a connection to Redis, for example:
verk/lib/verk/schedule_manager.ex
Lines 31 to 34 in e39293e
verk/lib/verk/queue_manager.ex
Lines 84 to 88 in e39293e
This results in these GenServers failing to start when the Redis connection is not available. The effect is that if an application adds Verk.Supervisor
to their supervision tree, the application will not be able to start if Redis is not available.
This doesn't seem to be what we would want in most cases. Instead I'd expect my application to be able to handle Redis connection retries in case there was a temporary connection disruption during startup.
We could move function calls that rely on the Redis connection to be present out of init()
and instead do that work in a callback. This would allow for the work to fail and the GenServer to still start.
For example Verk.QueueManager.init
would look something like:
def init([queue_name]) do
  node_id = Application.get_env(:verk, :node_id, "1")
  # Defer Redis-dependent work to a :startup message handled below
  Process.send_after(self(), :startup, 0)
  state = %State{queue_name: queue_name, redis: nil, node_id: node_id}
  {:ok, state}
end

def handle_info(:startup, state) do
  {:ok, redis_url} = Application.fetch_env(:verk, :redis_url)
  {:ok, redis} = Redix.start_link(redis_url)
  Verk.Scripts.load(redis)
  # Structs are immutable, so build an updated state rather than assigning to state.redis
  state = %{state | redis: redis}
  Logger.info "Queue Manager started for queue #{state.queue_name}"
  {:noreply, state}
end
16:46 <jeregrine> is there any reason why I *shouldn't* manually start an application outside of my mix.exs? Say I want to make sure Repo is started before an application?
16:48 <ericmj> jeregrine: it sounds weird to have a dependency depend on your app
16:48 <jeregrine> ericmj: yea it is. We're using the worker queue "verk" and the worker starts jobs which require the Repo before the repo has started
16:48 <jeregrine> its kind of a race condition
16:49 <jeregrine> so whenever we boot we get a couple jobs that fail
16:49 <edgurgel> yeah we would need a way to "wait" till the system is up
16:49 <edgurgel> cause Verk itself holds the queues, workers.
16:49 <ericmj> maybe the verk supervisor should be started by the parent app
16:49 <edgurgel> so Verk starts and it tries to run the jobs as fast as possible
16:50 <edgurgel> yeah it would probably be a good solution
Add Verk.enqueue_at or something like this. The ScheduleManager would need to look for jobs on both the retry and schedule ZSETs.
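A rough sketch of what such a function could look like (the schedule key, the Verk.Redis connection name, and Unix-timestamp scores are assumptions borrowed from Sidekiq's layout):

def enqueue_at(%Verk.Job{} = job, %DateTime{} = perform_at) do
  # Score the encoded job by the Unix timestamp at which it should run
  score = DateTime.to_unix(perform_at)
  payload = Poison.encode!(job)
  Redix.command(Verk.Redis, ["ZADD", "schedule", to_string(score), payload])
end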
How can queues that were added dynamically with Verk.add_queue(:new, 10) be restored from Redis after an application restart? At the initial startup, the supervisor only reads the data from the config:
Line 19 in 062811e
Timex 3 is the latest major version, so Verk should probably support that. Opening this ticket to track that support.
Hey!
More of question than anything else, could you comment a bit on the differences between this library and exq?
Thanks!
I'm a noob to Elixir and looking for a job queue that I can adapt to my needs, and your project looks good.
After a quick look at your source, I'm wondering where new jobs get fetched from Redis.
Is the fetching done via the handle_info(:timeout, state) function in the Verk.WorkersManager module?
Many thanks for explaining.
Right now Verk does not use Redis to store stats about the jobs. We could easily "flush" data from the QueueStats handler to Redis keys.
We could keep track of how many failed and processed jobs per queue and also overall.
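A possible shape for such a flush (the stat:* key names mirror Sidekiq's convention; the function itself is hypothetical):

defp flush_stats(redis, queue, processed, failed) do
  # One round trip: bump the overall and per-queue counters together
  Redix.pipeline(redis, [
    ["INCRBY", "stat:processed", to_string(processed)],
    ["INCRBY", "stat:failed", to_string(failed)],
    ["INCRBY", "stat:processed:#{queue}", to_string(processed)],
    ["INCRBY", "stat:failed:#{queue}", to_string(failed)]
  ])
end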
It would be interesting if we could provide a way to fetch metadata about the job so that the worker could know which queue and job_id it's working on now.
My initial idea is to use the process dictionary, as it's just metadata related to the worker (process) and it will be cleaned up when the job is done or failed. It feels like the perfect use case!
Still need some thought around the API. Worker.metadata?
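A rough sketch of the process-dictionary idea (the :verk_job key and these function names are hypothetical):

def start_job(%Verk.Job{} = job) do
  # Stash metadata for the duration of the job
  Process.put(:verk_job, %{jid: job.jid, queue: job.queue})
end

def metadata do
  Process.get(:verk_job)
end

def finish_job do
  # Cleared when the job completes or fails
  Process.delete(:verk_job)
end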
We should probably do this in a way that we would have both running at the same time while users can move their handlers (if any) to the new architecture.
Why? GenEvent has lots of issues that we want to avoid. More info here: http://elixir-lang.org/blog/2016/07/14/announcing-genstage/
I have a few jobs that spawn a very large number of child jobs.
I want to run these initial "spawning" jobs only when the queues are at 0. How can I check that?
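For illustration, one way to check could be to ask Redis directly, assuming access to the Redix connection and a queue:<name> key layout (both assumptions):

def queue_empty?(queue) do
  # LLEN returns the number of jobs waiting in the queue's list
  {:ok, len} = Redix.command(Verk.Redis, ["LLEN", "queue:#{queue}"])
  len == 0
end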
Having read the documentation, I got the impression that the job queue that the workers receive jobs from is FIFO. In many applications, jobs have a priority associated with them. If I want to make the job queue type configurable (FIFO, priority queue), how much design change in verk do you think one would have to make?
My question is whether it's possible to process the incoming work in the order it arrives.
So if the first job is failing, the other ones have to wait in line for the first one to finish: only when the first job is finished is the second in line processed, and so on.
We need to find a way to report errors that happened with a job so the user can define/use their own error reporting system (Airbrake, Raygun, etc)
My initial idea was to create events using GenEvent and one of them would be "error happened". The same GenEvent could be used to handle metrics as well.
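For illustration, a minimal sketch of such a handler (the Verk.Events.JobFailed event shape is an assumption):

defmodule MyApp.ErrorReporter do
  use GenEvent

  def handle_event(%Verk.Events.JobFailed{job: job, exception: exception}, state) do
    # Forward to an external error tracker here (Airbrake, Raygun, etc.)
    IO.puts("Job #{job.jid} failed: #{Exception.message(exception)}")
    {:ok, state}
  end

  def handle_event(_event, state), do: {:ok, state}
end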
Is it possible to arrange that, until tasks from the highest-priority queue are completed, tasks from other queues are not started? Or to put a task at the top of the queue so that it runs with high priority, i.e. RPUSH instead of LPUSH in Verk.enqueue?
Hello!
I've just discovered this library, and I'm interested in how it compares to https://github.com/akira/exq
There were a few things I was unable to determine from the documentation and a very quick scan of the codebase.
It'd be great if the algorithm was documented so it's easier to make an informed decision when evaluating worker libs :)
Thanks,
Louis
https://github.com/edgurgel/verk/blob/master/priv/requeue_job_now.lua
local removed = redis.call("ZREM", KEYS[1], job)
if removed == 1 then
  redis.call("LPUSH", queue_key, job)
else
If the ZREM succeeds but something happens afterwards so that LPUSH is never called, we may never enqueue this job again. We would rather have 2 copies of a job than 0 jobs to be processed.
cc/ @keyan
I'm not sure if we want to try to LREM if the ZREM failed? What do you think?
perform_at = Timex.shift(Timex.DateTime.now, seconds: 30)
raises the error:
(UndefinedFunctionError) function Timex.DateTime.now/0 is undefined (module Timex.DateTime is not available)
It should use Timex.now instead.
Related to #61
I think we should investigate what's the right approach here. Maybe checking ExUnit's code on how they treat this case.
The log level for "start" and "done" for each job is info. I would like to not have these logs in production, as we have a high volume of jobs.
I thought about lowering my production log level to warn, but I would like to keep the Phoenix request log messages, and they are also at info.
Two solutions that would work for me:
- Change start and done to debug. I'm not sure if this would be ideal for others. I would keep fail at info or change it to warn.
- Make the level configurable for done and fail, but not start. This should be doable with Logger.log/3 (sketched after this list), though the docs recommend using the macros as they are optimized out at compile time.
Either way, I would change fail to log at warn. I'd be happy to write up a pull-request if there's agreement on a direction.
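A minimal sketch of the configurable-level idea (the :job_log_level config key is an assumption):

require Logger

# Log a job event at a level read from config, defaulting to :info
defp log_event(message) do
  level = Application.get_env(:verk, :job_log_level, :info)
  Logger.log(level, message)
end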
https://github.com/edgurgel/verk/blob/master/lib/verk/workers_manager.ex#L188 on version 0.9.13 (but the same would happen on 0.10) returns :full
If you need more details from our redis instance we can grab them for you.
This needs to be done when we enqueue through Verk.enqueue and when jobs move from the scheduled set to the queues. enqueued_at is just a Unix timestamp of the moment we enqueued.
I've got a number of errors in my log for
02:24:50.005 [error] Disconnected from Redis (redis.cache.amazonaws.com:6379): unknown POSIX error
The GenServer doesn't die, but it seems that 0.3.4 of redix fixed this. (See whatyouhide/redix#19)
Can you advise?
Sidekiq has a few options for testing workers: https://github.com/mperham/sidekiq/wiki/Testing
Do you have any plans to implement something similar or do you have a better approach to deal with workers when running tests?
For context: I'm running my Verk-backed application in Kubernetes. When I do a deploy Kubernetes issues a SIGTERM signal, waits 30 seconds, then issues a SIGKILL. There is also an option to run a custom script as a hook before issuing the SIGTERM.
I don't believe that the Erlang VM is handling the Unix signals in any special way, I think instead that eventually Verk is brutally killed, stopping any in-progress jobs in a potentially weird state. I'd like to have some way to gracefully stop Verk, perhaps by stopping dequeuing of jobs from Redis and letting it finish any in-progress jobs.
Does Verk have anything right now to support this? Can we somehow leverage GenServer callbacks to accomplish this?
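For illustration only, a minimal sketch of the trap-exit idea, assuming a process that could stop dequeuing and drain its workers (stop_dequeuing/1 and wait_for_running_jobs/1 are hypothetical helpers):

def init(state) do
  # Trap exits so terminate/2 runs when the supervisor shuts us down
  Process.flag(:trap_exit, true)
  {:ok, state}
end

def terminate(_reason, state) do
  stop_dequeuing(state)          # hypothetical: stop fetching jobs from Redis
  wait_for_running_jobs(state)   # hypothetical: block until in-progress jobs finish
  :ok
end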
We are always logging seconds:
[info] process_id=#PID<0.7160.0> NoOpWorker 4e500781cedd2f68fc13e1d6 done: 0 secs
It would be nice to see microseconds if the duration is less than 1 second. Basically, use the appropriate unit depending on the value.
I've just started Elixir, and I wonder what your reasoning was against using an Erlang built-in solution like Mnesia or DETS for this instead of Redis?
I hope that doesn't sound harsh or offend you; this is a great library.
I just want some answers, as it's been on my mind for quite a bit.
Right now Verk looks for new jobs every second. We should add some random jitter so that every queue does not try to fetch at the same time and they stay somewhat out of sync. This could be a problem if someone is running hundreds of queues.
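A minimal sketch of the jitter idea (the one-second base interval comes from this issue; the 10% jitter range and the function are assumptions):

defp next_poll_interval(base_interval \\ 1_000) do
  # Add up to +10% of random jitter so queues drift out of sync over time
  base_interval + :rand.uniform(div(base_interval, 10))
end

# Process.send_after(self(), :fetch_jobs, next_poll_interval())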
This will be useful to keep the code consistent.
https://github.com/rrrene/credo
Ideally we can hook PRs up to the Credo analysis.
We also need to discuss which rules we should follow, etc.
This may be related to #48, but I am trying to test my worker that references Verk.Worker.current_job and get an error.
For example, with this worker:
defmodule ExportWorker do
  def perform(random_id) do
    job_id = Verk.Worker.current_job.jid
  end
end
and this test:
test "performs" do
  result = ExportWorker.perform("abc123")
  assert result == :error
end
I get this error message:
1) test performs (ExportWorkerTest)
   test/workers/export_worker_test.exs:36
   ** (UndefinedFunctionError) undefined function :undefined.jid/0 (module :undefined is not available)
   stacktrace:
     :undefined.jid()
     (wombat_worker) ExportWorker.perform/2
     test/workers/export_worker_test.exs:37
Any ideas?
For myself, the use case is not wanting to retry certain types of jobs, and for some others I'd like to retry fewer times than the default.
I would be willing to add the feature if this is something that would be beneficial to have in Verk.
Hi, here are a few parts of my code from a controller:
campaign = Repo.get!(Campaign, id) |> Repo.preload(group: :users)
Verk.enqueue(%Verk.Job{queue: :default, class: "EmailWorker", args: [campaign], max_retry_count: 5})
And here's what I get inside the worker when inspecting the campaign: args\":[{\"group\":{},\"
The group values are available inside the controller but empty inside the worker.
I also added
@derive {Poison.Encoder, only: [:name, :subject, :body, :group]}
to the Campaign model and
@derive {Poison.Encoder, only: [:user]}
to the User model.
Any advice, please?
Thanks
I'm fairly new to Elixir so there might be an obvious answer to this, but is there an easy way to reuse the Redix connection that Verk opens up elsewhere in the code?
After too many retries, the job should be inserted into a set of dead jobs. The set should hold a maximum of X dead jobs.
X could be 1000 for now?
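A minimal sketch of how that could look (the dead key name and the trimming approach follow Sidekiq's convention; the function itself is hypothetical):

def add_dead(redis, job_payload, max_dead \\ 1000) do
  now = DateTime.to_unix(DateTime.utc_now())
  Redix.pipeline(redis, [
    # Score dead jobs by the time they died
    ["ZADD", "dead", to_string(now), job_payload],
    # Trim so the set never holds more than max_dead jobs (oldest removed first)
    ["ZREMRANGEBYRANK", "dead", "0", to_string(-max_dead - 1)]
  ])
end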
We are seeing this issue come up:
15:49:13.960 [info] 10 jobs readded to the queue php_queue:emails from inprogress list
15:49:13.963 [error] Manager terminating, reason: {%Poison.SyntaxError{message: "Unexpected end of input", token: nil}, [{Poison.Parser, :parse!, 2, [file: 'lib/poison/parser.ex', line: 54]}, {Poison, :decode!, 2, [file: 'lib/poison.ex', line: 83]}, {Verk.Job, :decode!, 1, [file: 'lib/verk/job.ex', line: 19]}, {Verk.WorkersManager, :"-handle_info/2-fun-0-", 3, [file: 'lib/verk/workers_manager.ex', line: 102]}, {Enum, :"-reduce/3-lists^foldl/2-0-", 3, [file: 'lib/enum.ex', line: 1623]}, {Verk.WorkersManager, :handle_info, 2, [file: 'lib/verk/workers_manager.ex', line: 102]}, {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 601]}, {:gen_server, :handle_msg, 5, [file: 'gen_server.erl', line: 667]}]}
15:49:13.964 [error] GenServer :"php_queue:emails.workers_manager" terminating
** (Poison.SyntaxError) Unexpected end of input
(poison) lib/poison/parser.ex:54: Poison.Parser.parse!/2
(poison) lib/poison.ex:83: Poison.decode!/2
(verk) lib/verk/job.ex:19: Verk.Job.decode!/1
(verk) lib/verk/workers_manager.ex:102: anonymous fn/3 in Verk.WorkersManager.handle_info/2
(elixir) lib/enum.ex:1623: Enum."-reduce/3-lists^foldl/2-0-"/3
(verk) lib/verk/workers_manager.ex:102: Verk.WorkersManager.handle_info/2
(stdlib) gen_server.erl:601: :gen_server.try_dispatch/4
(stdlib) gen_server.erl:667: :gen_server.handle_msg/5
Last message: :timeout
State: %Verk.WorkersManager.State{monitors: :"php_queue:emails.workers_manager", pool_name: :"php_queue:emails.pool", pool_size: 10, queue_manager_name: :"php_queue:emails.queue_manager", queue_name: :"php_queue:emails", timeout: 1000}
15:49:13.966 [info] Application <APPLICATION NAME> exited: shutdown
{"Kernel pid terminated",application_controller,"{application_terminated,<APPLICATION NAME>,shutdown}"}
Kernel pid terminated (application_controller) ({application_terminated,<APPLICATION NAME>,shutdown})
Crash dump is being written to: erl_crash.dump...done
It looks like what is happening is that a WorkersManager attempts to decode a job that probably has bad JSON; this makes the workers manager crash and eventually leads to the parent process crashing.
I am confused as to why this would result in our application crashing. We are supervising Verk in the suggested way; I would expect instead for the workers manager process to crash and be restarted by Verk.Supervisor. Here is our supervisor configuration:
import Supervisor.Spec, warn: false
tree = [supervisor(Verk.Supervisor, [])]
opts = [name: <APPLICATION_NAME>.Supervisor, strategy: :one_for_one]
Supervisor.start_link(tree, opts)
So:
- Should we use Poison.decode instead and then move jobs with bad JSON into a malformed key in Redis (see the sketch below)?
- These malformed jobs can't simply go to dead through DeadSet without modifications, because we rely on proper encoding in DeadSet.add/3.
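A sketch of the first option (decode_job/2 and the malformed key handling here are hypothetical):

defp decode_job(redis, payload) do
  case Poison.decode(payload, as: %Verk.Job{}) do
    {:ok, job} ->
      {:ok, job}

    {:error, _reason} ->
      # Park undecodable payloads instead of crashing the WorkersManager
      Redix.command(redis, ["LPUSH", "malformed", payload])
      :error
  end
end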
I think the option to "retry now" selected failed jobs would be useful, i.e. to run failed jobs after deploying a fixed codebase.
Thoughts?
Because class sounds a bit object-oriented, I guess it would be nice if it were replaced. I was thinking that module or worker would be a bit better.
Let me know what you think; maybe I can send a pull request with the changes.
9b80a3a#diff-3f12772bbd50c9b2d993f795fd835138: error being raised in lib/verk/worker.ex
What would be the impact of upgrading to Timex 2.0?
Right now the WorkersManager does a couple of tasks:
- asking for jobs from the QueueManager;
- acknowledging finished jobs to the QueueManager;
- requeueing the inprocess list of jobs that were left hanging from a previous execution.
This issue is a starting point to discuss what, and whether, we can split these concerns into different processes while keeping the same feature set we have now.
cc/ @mitchellhenke
We should include the backtrace (or at least some part of it) in the job when it fails and is scheduled to be retried.
If for some reason the number of "inprogress" jobs is too high, we may spend too long putting the jobs back on the queue and hit a timeout.
We should change the QueueManager to do this in multiple steps.
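A sketch of a batched approach (key names, the batch size, and the function itself are assumptions):

def requeue_step(redis, inprogress_key, queue_key, batch_size \\ 100) do
  moved =
    Enum.reduce_while(1..batch_size, 0, fn _, acc ->
      # Move one job at a time from inprogress back to the queue
      case Redix.command!(redis, ["RPOPLPUSH", inprogress_key, queue_key]) do
        nil -> {:halt, acc}
        _job -> {:cont, acc + 1}
      end
    end)

  # Return :more so the caller can schedule another step instead of blocking
  if moved == batch_size, do: :more, else: :done
end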
I was playing with this, and saw an issue if we disconnect Redis and then reconnect it.
Once Redis is back up, I'm getting this error:
Failed to fetch retry set. Error: {:error, %Redix.Error{message: "NOSCRIPT No matching script. Please use EVAL."}}
Looks like the Lua scripts need to be reloaded? Wonder if there is a way to listen to disconnect/reconnect events from Redix.
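One possible shape for this, assuming a retry wrapper around EVALSHA calls (evalsha_with_reload/2 is hypothetical; Verk.Scripts.load/1 is the loader mentioned earlier in this page):

defp evalsha_with_reload(redis, args) do
  case Redix.command(redis, ["EVALSHA" | args]) do
    {:error, %Redix.Error{message: "NOSCRIPT" <> _}} ->
      # Scripts were flushed (e.g. after a reconnect); reload and retry once
      Verk.Scripts.load(redis)
      Redix.command(redis, ["EVALSHA" | args])

    result ->
      result
  end
end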
Hi,
It is not very apparent from the docs whether the retry mechanism implements any kind of exponential backoff. If not, I believe it would be a good addition. Thanks.
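For reference, Sidekiq's published retry delay could be one model; a rough Elixir rendering (a sketch, not Verk's current behavior; :rand.uniform(30) gives 1..30 where Sidekiq uses 0..29):

# Delay in seconds before retry N: (N^4) + 15 + rand(30) * (N + 1)
def retry_delay(retry_count) do
  trunc(:math.pow(retry_count, 4)) + 15 + :rand.uniform(30) * (retry_count + 1)
end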
This line should be configurable: https://github.com/edgurgel/verk/blob/master/lib/verk/workers_manager.ex#L12
Right now we store the whole stacktrace, but we should limit it to the first N lines. Probably just a configuration option, with the default being... 5 lines?
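A minimal sketch (the :max_stacktrace_lines config key is an assumption; the default of 5 mirrors the suggestion above):

defp truncate_stacktrace(stacktrace) do
  # Keep only the first N entries before storing them on the job
  limit = Application.get_env(:verk, :max_stacktrace_lines, 5)
  Enum.take(stacktrace, limit)
end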
Hello, how can we configure the Verk task (worker) timeout?
We have an application with several tasks; all of them work fine, but some file transfers take longer than the task timeout and always fail.
How should we configure it?
We've tried workers_manager_timeout: 360000 but we still get a timeout:
[debug] Worker got down, reason: :timeout, [#Reference<0.0.6.1826>, #PID<0.760.0>]
[debug] Rumbl.VideoWorker 20668957681470793693 fail: 31 s
[error] Task #PID<0.782.0> started from #PID<0.760.0> terminating
** (stop) exited in: Task.Supervised.stream(30000)
Thanks