
Exq


Exq is a job processing library compatible with Resque / Sidekiq for the Elixir language.

  • Exq uses Redis as a store for background processing jobs.
  • Exq handles concurrency, job persistence, job retries, reliable queueing and tracking so you don't have to.
  • Jobs are persistent, so they will survive across node restarts.
  • You can use multiple Erlang nodes to process from the same pool of jobs.
  • Exq uses a format that is Resque/Sidekiq compatible.
    • This means you can use it to integrate with existing Rails / Django projects that also use a Resque-compatible background job system - typically with little or no change needed to your existing apps. However, you can also use Exq standalone.
    • You can also use the Sidekiq UI to view job statuses, as Exq is compatible with the Sidekiq stats format.
    • You can run both Exq and Toniq in the same app for different workers.
  • Exq supports an unlimited number of concurrently running jobs, or a configurable maximum per queue.
  • Exq supports job retries with exponential backoff.
  • Exq supports configurable middleware for customization / plugins.
  • Exq tracks several stats, including failed, busy, and processed jobs.
  • Exq stores in-progress jobs in a backup queue (using the Redis RPOPLPUSH command). This means that if the system or worker is restarted while a job is in progress, the job will be re-enqueued when the node restarts and not lost.
  • Exq provides an optional web UI that you can use to view several stats as well as rate of job processing.
  • When shutting down, Exq will attempt to let workers terminate gracefully, with a configurable timeout.
  • There is no time limit to how long a job can run for.

Do you need Exq?

While you may reach for Sidekiq / Resque / Celery by default when writing apps in other languages, Elixir already provides some good options through the language and platform. So before adding Exq or any Redis-backed queueing library to your application, make sure to get familiar with OTP and see whether it is enough for your needs. Redis-backed queueing libraries add infrastructure complexity as well as serialization / marshalling overhead, so make sure to evaluate whether you actually need one.

Some OTP-related documentation to look at:

If you need durable jobs, retries with exponential backoff, or dynamically scheduled future jobs - all able to survive application restarts - then an externally backed queueing library such as Exq could be a good fit.

Getting Started

Pre-requisite

This assumes you have an instance of Redis to use. The easiest way to install it on OSX is via brew:

> brew install redis

To start it:

> redis-server

Screencast on elixircasts.io:

If you prefer video instructions, check out the screencast on elixircasts.io which details how to install and use the Exq library: https://elixircasts.io/elixir-job-processing-with-exq

Installation

Add :exq to your mix.exs deps (replace version with the latest hex.pm package version):

defp deps do
  [
    # ... other deps
    {:exq, "~> 0.19.0"}
  ]
end

Then run mix deps.get.

Configuration

By default, Exq will use configuration from your config.exs file. You can use this to configure your Redis host, port, password, as well as namespace (which helps isolate the data in Redis). If you would like to specify your options as a Redis URL, that is also an option using the url config key (in which case you would not need to pass the other Redis options).

Configuration options may optionally be given in the {:system, "VARNAME"} format, which will resolve to the runtime environment value.
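
For example, a minimal sketch of both styles (the environment variable names here are just for illustration):

# Resolve Redis settings from the runtime environment
config :exq,
  host: {:system, "REDIS_HOST"},
  port: 6379,
  namespace: "exq"

# Or, alternatively, a single Redis URL (the other Redis options are then unnecessary)
config :exq,
  url: {:system, "REDIS_URL"}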

Other options include:

  • The queues list specifies which queues Exq will listen to for new jobs.
  • The concurrency setting lets you configure the number of concurrent workers that will be allowed, or :infinite to disable any throttling.
  • The name option allows you to customize Exq's registered name, similar to using Exq.start_link([name: Name]). The default is Exq.
  • If the start_on_application option is false, Exq won't be started automatically when your application boots. You can start it with Exq.start_link/1.
  • The shutdown_timeout is the number of milliseconds to wait for workers to finish processing jobs when the application is shutting down. It defaults to 5000 ms.
  • The mode option can be used to control which components of Exq are started. This is useful if you want to only enqueue jobs on one node and run the workers on a different node.
    • :default - starts worker, enqueuer and API.
    • :enqueuer - starts only the enqueuer.
    • :api - starts only the api.
    • [:api, :enqueuer] - starts both enqueuer and api.
  • The backoff option allows you to customize the backoff timing used when retrying a failed job. By default, an exponential backoff scaled by the job's retry_count is used. To change the default behavior, create a module that implements Exq.Backoff.Behaviour and set the backoff option to that module name (see the sketch after the example configuration below).
An example configuration:

config :exq,
  name: Exq,
  host: "127.0.0.1",
  port: 6379,
  password: "optional_redis_auth",
  namespace: "exq",
  concurrency: :infinite,
  queues: ["default"],
  poll_timeout: 50,
  scheduler_poll_timeout: 200,
  scheduler_enable: true,
  max_retries: 25,
  mode: :default,
  shutdown_timeout: 5000
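
For example, here is a minimal sketch of a custom backoff module. It assumes Exq.Backoff.Behaviour expects a backoff/1 callback that receives the failing job and returns a delay in seconds - check the behaviour's documentation for the exact signature:

defmodule MyApp.ConstantBackoff do
  @behaviour Exq.Backoff.Behaviour

  # Hypothetical strategy: always retry after 30 seconds,
  # regardless of the job's retry_count.
  def backoff(_job) do
    30
  end
end

Then point the backoff option at it:

config :exq,
  backoff: MyApp.ConstantBackoff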

Concurrency

Exq supports a concurrency setting per queue. You can apply the same concurrency option to every queue or specify it on a per-queue basis.

Concurrency for each queue will be set at 1000:

config :exq,
  host: "127.0.0.1",
  port: 6379,
  namespace: "exq",
  concurrency: 1000,
  queues: ["default"]

Concurrency for q1 is set at 10_000 while q2 is set at 10:

config :exq,
  host: "127.0.0.1",
  port: 6379,
  namespace: "exq",
  queues: [{"q1", 10_000}, {"q2", 10}]

Job Retries

Exq will automatically retry failed jobs, using exponential backoff timing similar to Sidekiq or delayed_job. It can be configured via these settings:

config :exq,
  host: "127.0.0.1",
  port: 6379,
  ...
  scheduler_enable: true,
  max_retries: 25

Note that scheduler_enable has to be set to true and max_retries should be greater than 0.

Dead Jobs

Any job that has failed more than max_retries times will be moved to the dead jobs queue. Dead jobs can be manually re-enqueued via the Sidekiq UI. The max size and timeout of the dead jobs queue can be configured via these settings:

config :exq,
  dead_max_jobs: 10_000,
  dead_timeout_in_seconds: 180 * 24 * 60 * 60 # 6 months

OTP Application

You can add Exq into your OTP application list, and it will start an instance of Exq along with your application startup. It will use the configuration from your config.exs file.

def application do
  [
    applications: [:logger, :exq],
    #other stuff...
  ]
end

When using Exq through OTP, it will register a process under the name Elixir.Exq; you can pass this atom wherever the Exq module expects a process name.

If you would like to control Exq startup, you can configure Exq to not start anything on application start. For example, if you are using Exq along with Phoenix, and your workers are accessing the database or other resources, it is recommended to disable Exq startup and manually add it to the supervision tree.

This can be done by setting start_on_application to false and adding it to your supervision tree:

config :exq,
  start_on_application: false

def start(_type, _args) do
  children = [
    # Start the Ecto repository
    MyApp.Repo,
    # Start the endpoint when the application starts
    MyApp.Endpoint,
    # Start the Exq supervisor
    Exq
  ]

  # Standard supervisor startup for a Phoenix / OTP application
  opts = [strategy: :one_for_one, name: MyApp.Supervisor]
  Supervisor.start_link(children, opts)
end

Sentinel

Exq uses the Redix client to communicate with the Redis server. The client can be configured to use Sentinel via redis_options. Note: this requires Redix 0.9.0+.

config :exq,
  redis_options: [
    sentinel: [sentinels: [[host: "127.0.0.1", port: 6666]], group: "exq"],
    database: 0,
    password: nil,
    timeout: 5000,
    name: Exq.Redis.Client,
    socket_opts: []
  ]

Using IEx

If you'd like to try Exq out in the IEx console, you can do so by typing:

> mix deps.get

and then:

> iex -S mix

Standalone Exq

You can run Exq standalone from the command line. To run it:

> mix do app.start, exq.run

Workers

Enqueuing jobs:

To enqueue jobs:

{:ok, ack} = Exq.enqueue(Exq, "default", MyWorker, ["arg1", "arg2"])

{:ok, ack} = Exq.enqueue(Exq, "default", "MyWorker", ["arg1", "arg2"])

# Don't retry this job (overrides :max_retries from config for this worker)
{:ok, ack} = Exq.enqueue(Exq, "default", MyWorker, ["arg1", "arg2"], max_retries: 0)
# Retry up to 10 times, overriding :max_retries from config
{:ok, ack} = Exq.enqueue(Exq, "default", MyWorker, ["arg1", "arg2"], max_retries: 10)

In this example, "arg1" will be passed as the first argument to your worker's perform function, "arg2" as the second argument, and so on.

You can also enqueue jobs without starting workers:

{:ok, sup} = Exq.Enqueuer.start_link([port: 6379])

{:ok, ack} = Exq.Enqueuer.enqueue(Exq.Enqueuer, "default", MyWorker, [])

You can also schedule jobs to start at a future time. You need to make sure scheduler_enable is set to true.

Schedule a job to start in 5 minutes (300 seconds):

{:ok, ack} = Exq.enqueue_in(Exq, "default", 300, MyWorker, ["arg1", "arg2"])

# If using `mode: [:enqueuer]`
{:ok, ack} = Exq.Enqueuer.enqueue_in(Exq.Enqueuer, "default", 300, MyWorker, ["arg1", "arg2"])

Schedule a job to start at a specific time in the future (in this example, 8 days from now, computed with Timex):

time = Timex.now() |> Timex.shift(days: 8)
{:ok, ack} = Exq.enqueue_at(Exq, "default", time, MyWorker, ["arg1", "arg2"])

# If using `mode: [:enqueuer]`
{:ok, ack} = Exq.Enqueuer.enqueue_at(Exq.Enqueuer, "default", time, MyWorker, ["arg1", "arg2"])

Creating Workers

To create a worker, create an Elixir module matching the worker name that will be enqueued. To process a job enqueued with "MyWorker", create a MyWorker module. Note that perform must also accept the same number of arguments that the job was enqueued with.

Here is an example of a worker:

defmodule MyWorker do
  def perform do
  end
end

We could enqueue a job to this worker:

{:ok, jid} = Exq.enqueue(Exq, "default", MyWorker, [])

The perform function will be called with matching args. For example:

{:ok, jid} = Exq.enqueue(Exq, "default", "MyWorker", [arg1, arg2])

Would match:

defmodule MyWorker do
  def perform(arg1, arg2) do
  end
end

Job data from worker

If you'd like to access job metadata from within a worker, you can call Exq.worker_job/0 inside the worker:

defmodule MyWorker do
  def perform(arg1, arg2) do
    # get job metadata
    job = Exq.worker_job()
  end
end

Dynamic queue subscriptions

The list of queues that are being monitored by Exq is determined by the config.exs file or the parameters passed to Exq.start_link. However, we can also dynamically add and remove queue subscriptions after Exq has started.

To subscribe to a new queue:

# last arg is optional and is the max concurrency for the queue
:ok = Exq.subscribe(Exq, "new_queue_name", 10)

To unsubscribe from a queue:

:ok = Exq.unsubscribe(Exq, "queue_to_unsubscribe")

To unsubscribe from all queues:

:ok = Exq.unsubscribe_all(Exq)

Middleware Support

If you'd like to customize worker execution and/or create plugins like Sidekiq/Resque have, Exq supports custom middleware. The first step would be to define the middleware in config.exs and add your middleware into the chain:

middleware: [Exq.Middleware.Stats, Exq.Middleware.Job, Exq.Middleware.Manager, Exq.Middleware.Unique, Exq.Middleware.Logger]

You can then create a module that implements the middleware behaviour and defines the before_work, after_processed_work and after_failed_work functions. You can also halt execution of the chain. For a simple example of a middleware implementation, see the Exq Logger Middleware.
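
As a rough sketch (this is not the actual Logger middleware, and the shape of the pipeline's assigns map is an assumption here), a custom middleware module could look something like this:

defmodule MyApp.Middleware.Timing do
  @behaviour Exq.Middleware.Behaviour
  require Logger

  # Each callback receives the middleware pipeline and must return it.

  def before_work(pipeline) do
    # Stash the start time in the pipeline's assigns map (assumed to be a plain map)
    %{pipeline | assigns: Map.put(pipeline.assigns, :started_at, System.monotonic_time(:millisecond))}
  end

  def after_processed_work(pipeline) do
    Logger.info("job finished in #{elapsed_ms(pipeline)} ms")
    pipeline
  end

  def after_failed_work(pipeline) do
    Logger.warning("job failed after #{elapsed_ms(pipeline)} ms")
    pipeline
  end

  defp elapsed_ms(pipeline) do
    System.monotonic_time(:millisecond) - pipeline.assigns.started_at
  end
end

To enable it, add MyApp.Middleware.Timing to the middleware list shown above; middleware typically run in the order listed.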

Using with Phoenix and Ecto

If you would like to use Exq alongside Phoenix and Ecto, add :exq to your mix.exs application list:

def application do
  [
    mod: {Chat, []},
    applications: [:phoenix, :phoenix_html, :cowboy, :logger, :exq]
  ]
end

Assuming you will be accessing the database from Exq workers, you will want to lower the concurrency level for those workers, since they share a finite pool of database connections and can back up and time out. You can lower this through the concurrency setting, or use a separate queue with lower concurrency just for database workers (see the example after the worker below). Inside your worker, you can then use the Repo to work with the database:

defmodule Worker do
  def perform do
    HelloPhoenix.Repo.insert!(%HelloPhoenix.User{name: "Hello", email: "[email protected]"})
  end
end
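
For example, a hypothetical setup with a dedicated low-concurrency queue for database-heavy workers (queue names and numbers are illustrative; keep the cap below your Repo pool size):

config :exq,
  # "default" handles general jobs; "db_work" is reserved for database workers
  queues: [{"default", 100}, {"db_work", 10}]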

Using alongside Sidekiq / Resque

To use Exq alongside Sidekiq / Resque, make sure the namespace configured in Exq matches the namespace you are using in Sidekiq. By default, Exq uses the exq namespace, so you will have to change one or the other.

Another option is to configure Sidekiq to use the exq namespace in the Sidekiq initializer of your Ruby project:

Sidekiq.configure_server do |config|
  config.redis = { url: 'redis://127.0.0.1:6379', namespace: 'exq' }
end

Sidekiq.configure_client do |config|
  config.redis = { url: 'redis://127.0.0.1:6379', namespace: 'exq' }
end

For an implementation example, see sineed's demo app illustrating Sidekiq to Exq communication.

If you would like to exclusively send some jobs from Sidekiq to Exq as your migration strategy, create queue(s) that are listened to only by Exq (and configure those in the queues section of the Exq config). Make sure they are not configured to be listened to in Sidekiq, otherwise Sidekiq will also take jobs off those queues. You can still enqueue jobs to those queues from Sidekiq even though it is not monitoring them:

Sidekiq::Client.push('queue' => 'elixir_queue', 'class' => 'ElixirWorker', 'args' => ['foo', 'bar'])

Security

By default, your Redis server could be open to the world: Redis ships with no password authentication, and some hosting companies leave the port accessible to the world. This means that anyone could read data on the queue as well as submit jobs to be run. Obviously this is not desired; please secure your Redis installation by following guides such as the Digital Ocean Redis Security Guide.

Node Recovery

A node can be stopped unexpectedly while processing jobs for various reasons, such as a deployment, a system crash, or an OOM kill. This can leave jobs stuck in the in-progress state. Exq comes with two mechanisms to handle this situation.

Same Node Recovery

Exq identifies each node using an identifier. By default, the machine's hostname is used as the identifier. When a node comes back online after a crash, it will first check whether there are any in-progress jobs for its identifier. Note that it will only re-enqueue jobs with the same identifier. In environments like Heroku or Kubernetes, where the hostname changes on each deployment, the default identifier can be overridden:

config :exq,
  node_identifier: MyApp.CustomNodeIdentifier

defmodule MyApp.CustomNodeIdentifier do
  @behaviour Exq.NodeIdentifier.Behaviour

  def node_id do
    # return the node ID, perhaps from an environment variable, etc.
    System.get_env("NODE_ID")
  end
end

Heartbeat

Same-node recovery is straightforward and works well if the number of worker nodes is fixed. Some use cases, however, require worker nodes to be autoscaled based on workload. In those situations, a node that goes down might not come back for a very long time.

The heartbeat mechanism helps in these cases. Each node registers a heartbeat at a regular interval. If a node misses 5 consecutive heartbeats, it is considered dead and all in-progress jobs belonging to that node are re-enqueued.

This feature is disabled by default and can be enabled using the following config:

config :exq,
    heartbeat_enable: true,
    heartbeat_interval: 60_000,
    missed_heartbeats_allowed: 5

Unique Jobs

There are many use cases where we want to avoid duplicate jobs. Exq provides a few job level options to handle these cases.

This feature is implemented using a lock abstraction. When you enqueue a job for the first time, a unique lock is created. The lock token is derived from the job's queue, class, and args, or from the unique_token value if provided. If you try to enqueue another job with the same args while the lock has not yet expired, you will get back {:conflict, jid}, where jid refers to the first successfully enqueued job.

The lock expiration is controlled by two options.

  • unique_for (seconds) controls the maximum duration a lock can be active. This option is mandatory to create a unique job, and the lock never outlives the expiration duration. In the case of a scheduled job, the expiration time is calculated as scheduled_time + unique_for.

  • unique_until allows you to clear the lock based on the job lifecycle. :success clears the lock on successful completion of the job or when the job becomes dead; :start clears the lock when the job is picked up for execution for the first time; :expiry clears the lock only when the expiration time set via unique_for is reached.

{:ok, jid} = Exq.enqueue(Exq, "default", MyWorker, ["arg1", "arg2"], unique_for: 60 * 60)
{:conflict, ^jid} = Exq.enqueue(Exq, "default", MyWorker, ["arg1", "arg2"], unique_for: 60 * 60)
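
And a sketch combining both options for a debounce-style job (the worker name, argument, and durations are just for illustration):

# The lock is released as soon as the job starts executing, so at most one
# sync job per user is ever waiting in the queue.
{:ok, _jid} =
  Exq.enqueue(Exq, "default", SyncUserWorker, [user_id],
    unique_for: 30 * 60,
    unique_until: :start
  )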

Example usages

  • Idempotency - Let's say you want to send a welcome email and want to make sure it's never sent more than once, even when the enqueue part might get retried due to timeout etc. Use a reasonable expiration duration (unique_for) that covers the retry period along with unique_until: :expiry.

  • Debounce - Let's say for any change to user data, you want to sync it to another system. If you just enqueue a job for each change, you might end up with unnecessary duplicate sync calls. Use unique_until: :start along with expiration time based on queue load. This will make sure you never have more than one job pending for a user in the queue.

  • Batch - Let's say you want to send a notification to a user, but want to wait for an hour and batch the notifications together. Schedule a job one hour in the future using enqueue_in and set unique_until: :success. This will make sure no other job gets enqueued until the scheduled job completes successfully.

Although Exq provides a unique jobs feature, try to make your workers as idempotent as possible. Unique jobs do not prevent your job from being retried on failure, so uniqueness is best effort, not a guarantee against duplicate execution. The uniqueness feature depends on the Exq.Middleware.Unique middleware; if you override the :middleware configuration, make sure to include it.

Enqueuing Many Jobs Atomically

Similar to database transactions, there are cases where you may want to enqueue or schedule many jobs atomically. A common use case is breaking a computationally heavy job into multiple smaller jobs so they can run concurrently. If you enqueue these jobs in a loop and a network, connectivity, or application error occurs partway through, you are left having to roll back the jobs that were already enqueued, which is a complicated process. To avoid this problem, Exq provides an enqueue_all function which guarantees atomicity.

{:ok, [{:ok, jid_1}, {:ok, jid_2}, {:ok, jid_3}]} = Exq.enqueue_all(Exq, [
  [job_1_queue, job_1_worker, job_1_args, job_1_options],
  [job_2_queue, job_2_worker, job_2_args, job_2_options],
  [job_3_queue, job_3_worker, job_3_args, job_3_options]
])

enqueue_all also supports scheduling jobs via the schedule key in the options passed for each job:

{:ok, [{:ok, jid_1}, {:ok, jid_2}, {:ok, jid_3}]} = Exq.enqueue_all(Exq, [
  [job_1_queue, job_1_worker, job_1_args, [schedule: {:in, 60 * 60}]],
  [job_2_queue, job_2_worker, job_2_args, [schedule: {:at, midnight}]],
  [job_3_queue, job_3_worker, job_3_args, []] # no schedule key is present, it is enqueued immediately
])

Web UI

Exq has a separate repo, exq_ui, which provides a web UI to monitor your workers.


See https://github.com/akira/exq_ui for more details.

Community Plugins

  • exq_scheduler - Exq Scheduler is a cron-like job scheduler for Exq; it is also compatible with Sidekiq and Resque.
  • exq_limit - ExqLimit implements different types of rate limiting for Exq queues.
  • exq_batch - ExqBatch provides a building block for creating complex workflows out of Exq jobs. A batch monitors a group of Exq jobs and creates a callback job when all of them have been processed.

Starting Exq manually

Typically, Exq will start as part of the application along with the configuration you have set. However, you can also start Exq manually and set your own configuration per instance.

Here is an example of how to start Exq manually:

{:ok, sup} = Exq.start_link

To connect with custom configuration options (if you need multiple instances of Exq, for example), you can pass options to start_link:

{:ok, sup} = Exq.start_link([host: "127.0.0.1", port: 6379, namespace: "x"])

By default, Exq will register itself under the Elixir.Exq atom. You can change this by passing in a name parameter:

{:ok, exq} = Exq.start_link(name: Exq.Custom)

Testing

The Exq.Mock module provides a few options for testing your workers:

# change queue_adapter in config/test.exs
config :exq,
  queue_adapter: Exq.Adapters.Queue.Mock

# start mock server in your test_helper.exs
Exq.Mock.start_link(mode: :redis)

Exq.Mock currently supports three modes. The default mode can be provided in the Exq.Mock.start_link call. The mode can be overridden for each test by calling Exq.Mock.set_mode(:fake).

redis

This can be used for integration testing. Doesn't support the async: true option.

fake

The jobs get enqueued in a local queue and never get executed. Exq.Mock.jobs() returns all the jobs. Supports async: true option.

inline

The jobs get executed in the same process. Supports async: true option.
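
As a rough sketch of a test using the fake mode (the exact shape of the job structs returned by Exq.Mock.jobs/0 is an assumption here - inspect them in your own suite):

defmodule MyWorkerTest do
  use ExUnit.Case, async: true

  setup do
    # Override the default mode for this test
    Exq.Mock.set_mode(:fake)
    :ok
  end

  test "enqueues a job without executing it" do
    {:ok, _jid} = Exq.enqueue(Exq, "default", MyWorker, ["arg1"])

    # In :fake mode the job sits in a local queue instead of running
    assert [job] = Exq.Mock.jobs()
    assert job.class == MyWorker
  end
end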

Donation

To donate, send to:

Bitcoin (BTC): 17j52Veb8qRmVKVvTDijVtmRXvTUpsAWHv Ethereum (ETH): 0xA0add27EBdB4394E15b7d1F84D4173aDE1b5fBB3

Questions? Issues?

For issues, please submit a GitHub issue with steps on how to reproduce the problem.

Contributions

Contributions are welcome. Tests are encouraged.

To run tests / ensure your changes have not caused any regressions:

mix test --no-start

To run the full suite, including failure conditions (can have some false negatives):

mix test --trace --include failure_scenarios:true --no-start

Maintainers

Anantha Kumaran / @ananthakumaran (Lead)

Contributors

Justin McNally (j-mcnally) (structtv), zhongwencool (zhongwencool), Joe Webb (ImJoeWebb), Chelsea Robb (chelsea), Nick Sanders (nicksanders), Nick Gal (nickgal), Ben Wilson (benwilson512), Mike Lawlor (disbelief), colbyh (colbyh), Udo Kramer (optikfluffel), Andreas Franzén (triptec), Josh Kalderimis (joshk), Daniel Perez (tuvistavie), Victor Rodrigues (rodrigues), Denis Tataurov (sineed), Joe Honzawa (Joe-noh), Aaron Jensen (aaronjensen), Andrew Vy (andrewvy), David Le (dl103), Roman Smirnov (romul), Thomas Athanas (typicalpixel), Wen Li (wli0503), Akshay (akki91), Rob Gilson (D1plo1d), edmz (edmz), and Benjamin Tan Wei Hao (benjamintanweihao).

Copyright and License

Copyright (c) 2014 Alex Kira

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.


exq's Issues

[Enhancement] Ability to limit the concurrency of a specific queue.

Would be nice for tasks such as video encoding to be able to only run one task in a specific queue at a time.

We could allow queues to be specified in the form [{"Default", 50}, {"Encoding", 1}]

Since the workers per queue would be tracked on a per instance basis, we can keep track of the number of running tasks in ETS rather than redis.

RFC: Middleware support

I think we have a pretty good base for a lot of use cases.

Personally I have branches that add specific functionality that I want but might not be perfect for everyone.

I would like to explore some sort of contrib libraries to add functionality without maintaining additional forks or shoehorning them into mainline.

This proposal is to add hooks into exq mainline that would allow for 3rd party libraries to add extensions.

I'd love thoughts related to how we can build a system to allow middleware similar to what sidekiq supports.

Stats should poll to update

Right now the chart updates but we should also be updating the stats at the top of the page with the latest stats.

Outdated version of timex causing issues on OSX

I've been trying get exq running on OSX Yosemite 10.10.5 and have run into an issue when trying to enqueue a job:

iex(7)> {:ok, jid} = Exq.enqueue(:exq, "default", Eventer, []) 
** (exit) exited in: GenServer.call(:exq, {:enqueue, "default", Eventer, []}, 5000)
    ** (EXIT) exited in: :gen_server.call(#PID<0.203.0>, :stop)
        ** (EXIT) no process
    (elixir) lib/gen_server.ex:356: GenServer.call/3
iex(7)> 
21:25:39.328 [error] GenServer :exq_enqueuer terminating
Last message: {:"$gen_cast", {:enqueue, {#PID<0.190.0>, #Reference<0.0.1.4934>}, "default", Eventer, []}}
State: %Exq.Enqueuer.Server.State{namespace: "exq", redis: #PID<0.203.0>, redis_owner: true}
** (exit) an exception was raised:
    ** (MatchError) no match of right hand side value: {:error, "No timezone found for: No timezone found for: AEST"}
        lib/timezone/timezone_dst.ex:20: Timex.Timezone.Dst.is_dst?/1
        lib/dateformat/formatter.ex:139: Timex.DateFormat.Formatters.Formatter.format_token/2
        lib/dateformat/formatters/default.ex:201: Timex.DateFormat.Formatters.DefaultFormatter.do_format/3
        lib/dateformat/formatters/default.ex:194: Timex.DateFormat.Formatters.DefaultFormatter.do_format/3
        lib/dateformat/formatter.ex:36: Timex.DateFormat.Formatters.Formatter.format!/3
        (exq) lib/exq/redis/job_queue.ex:124: Exq.Redis.JobQueue.to_job_json/3
        (exq) lib/exq/redis/job_queue.ex:37: Exq.Redis.JobQueue.enqueue/5
        (exq) lib/exq/enqueuer/server.ex:43: Exq.Enqueuer.Server.handle_cast/2

So it seems my timezone is being returned as "AEST" (Australian Eastern Standard Time). This isn't parsable by Timex, so the enqueuing fails.

In iex I get these results when using the version of Timex required by exq:

iex(5)> Timex.Timezone.Local.lookup
"AEST"
iex(6)> Timex.Timezone.local       
{:error, "No timezone found for: AEST"}
iex(7)> Timex.Timezone.Local.parse_tzfile("/etc/localtime")
{:ok, "AEST"}

When I setup a new project with just the latest version of Timex (0.19.5) the timezone seems to work correctly:

iex(2)> Timex.Timezone.local
%Timex.TimezoneInfo{abbreviation: "AEST",
 from: {:sunday, {{2015, 4, 5}, {2, 0, 0}}}, full_name: "Australia/Sydney",
 offset_std: 0, offset_utc: 600, until: {:sunday, {{2015, 10, 4}, {2, 0, 0}}}}
iex(3)> Timex.Timezone.Local.lookup
"Australia/Sydney"
iex(4)> Timex.Timezone.Local.parse_tzfile("/etc/localtime")
{:ok, "AEST"}

It seems to me that bumping the version of Timex should resolve the issue above. Thoughts?

Failure scenarios

Once we settle on actor / supervisor hierarchy, make sure these scenarios are handled and have specs:

Operation fails, but manager / workers should not crash if:

  • Enqueue fails due to Redis being down
  • Enqueue fails due to incorrect Redis datastructure type
  • Invalid JSON on dequeued job
  • Redis connection lost

Standalone Web UI doesn't start

Hi, I managed to start exq itself and it works great.
But web UI refuses to start with an error:

$ mix exq.ui --webport 4040

Started ExqUI on Port 4040
** (UndefinedFunctionError) undefined function: Exq.RouterPlug.init/1 (module Exq.RouterPlug is not available)
    Exq.RouterPlug.init([namespace: "", exqopts: [name: :exq_enq_ui, host: '127.0.0.1', webport: 4040]])
    lib/plug/adapters/cowboy.ex:155: Plug.Adapters.Cowboy.dispatch_for/2
    lib/plug/adapters/cowboy.ex:36: Plug.Adapters.Cowboy.args/4
    lib/plug/adapters/cowboy.ex:126: Plug.Adapters.Cowboy.run/4
    lib/mix/tasks/exq.ui.ex:22: Mix.Tasks.Exq.Ui.run/1
    (mix) lib/mix/cli.ex:55: Mix.CLI.run_task/2

Could you point me what is going wrong?

Spawn vs Apply in worker code.

I was doing some perusing of erlang and found

http://www.erlang.org/doc/reference_manual/processes.html

It looks like we can

  def dispatch_work(worker_module, method, args) do
    :erlang.spawn(String.to_atom("Elixir.#{worker_module}"), method, args)
  end

instead of

  def dispatch_work(worker_module, method, args) do
    :erlang.apply(String.to_atom("Elixir.#{worker_module}"), method, args)
  end

This will actually run the worker code as its own process. I'm not 100% sure what this does vs having a worker process, but wanted to throw it out there in case it could help clean up the API at all.

[Enhancement] Seperate the Queuing from the Manager

I keep running into places where I want to start Exq, but don't want it to spawn workers. The WebUI is a great example of this. I think it makes sense to be able to use Exq to connect and talk to Redis, however I don't want the UI process to actually run jobs.

If we kept Exq.start as the general queue link, we could create a new supervisor to dispatch workers. I think this relates to #11 and #6

[Enhancement] JobIds should be UUIDs stored in redis

I noticed today that job ids are currently expressed as the redis LIST index - is this really the best way? I feel like as the API matures we might want to do things like retry jobs and delete jobs, and this could cause race conditions if the index of a job changes. I'd like to look at how Sidekiq handles jobs and job ids and maybe take some inspiration. Thoughts / ideas?

If we go UUIDs i am a fan of this library: https://github.com/zyro/elixir-uuid

UI - allow requeue failed jobs

As an end user, I want to re-enqueue failed jobs in the UI

Acceptance Criteria

  • Add "Requeue" button in failed jobs UI view for each job
  • Clicking on "Requeue" button, requeues failed jobs

[Enhancement] WebUI

Would be nice to have an admin panel similar to sidekiq where you can see running tasks / workers etc.

We could develop it as a plug so it can be added to Phoenix and other frameworks using Plug.

[Enhancement] Create a standalone task runner.

Would be nice to have something similar to the sidekiq binary that starts the sidekiq task runner.

I think we could implement this as a separate mix task.

mix exq.run or something along those lines.

Short circuit enqueue directly to workers if there are workers ready to take jobs for queue

When this feature is enabled, send the job directly to workers if they are free, skipping the redis enqueue / dequeue.

Also add a configuration option: short_circuit_enabled

Acceptance Criteria:

  • If there are free workers for the specific queue (depends on concurrency setting)
  • If short_circuit_enabled is set to true in config
  • Then don't enqueue job to redis, send directly to worker
  • Same code path should be used except the enqueue / dequeue
  • Stats should still be tracked as usual
  • Should still be put in backup queue

[Enhancement] Task Scheduler

Can we have a way to run tasks at a certain time?

Basically this would set a time to run a task at and not dequeue an item for work until that time has passed.

Basically, when a task is queued we put a timestamp; by default that would be NOW, but it could also be any future DateTime. So if we build an interface where you say "run a task in 5 minutes", the timestamp would just be 5 minutes from now.

This would also allow us to build SideTiq like functionality where we could have recurring tasks by scheduling tasks based on their recurrence in the work checking loop.

Investigate performance benchmark

Using this as a quick and dirty benchmark:

    {:ok, pid} = Exq.start([host: '127.0.0.1', port: 6379])
    Exq.enqueue(pid, "default", "MyWorker2", ["arg1", "arg2"])
    for n <- 1..4000, do: Exq.enqueue(pid, "default", "MyWorker", ["arg1", "arg2"])
    :timer.sleep(:infinity)

seems like there were performance differences before / after #19. Some of this may be due to Poison decode / encode of direct structs, and then stats recording. Investigate these.

Error installing exq with Phoenix 0.16

Error caused by plug dependency:

Looking up alternatives for conflicting requirements on plug
  Activated version: 1.0.0
  From exq v0.2.0: >= 0.8.1 and < 1.0.0
  From phoenix_html v2.1.1: ~> 0.13 or ~> 1.0
** (Mix) Hex dependency resolution failed, relax the version requirements or unlock dependencies

WebUI Authentication

We should provide a first-class way to auth. For now I think basic auth is fine, but rather than rely on nginx or an upstream plug we should bake it in.

OTP application elixir 1.1.1

I'm trying to use exq with elixir 1.1.1

I keep getting this error when I try to start the app

=INFO REPORT==== 6-Oct-2015::06:39:11 ===
application: logger
exited: stopped
type: temporary
Could not start application exq: Exq.start(:normal, []) returned an error: shutdown: failed to start child: Exq.Manager.Server
** (EXIT) an exception was raised:
** (FunctionClauseError) no function clause matching in :eredis.start_link/6
(eredis) src/eredis.erl:48: :eredis.start_link('redis', "6379", 0, [], 100, 5000)
(exq) lib/exq/manager/server.ex:31: Exq.Manager.Server.init/1
(stdlib) gen_server.erl:328: :gen_server.init_it/6
(stdlib) proc_lib.erl:240: :proc_lib.init_p_do_apply/3

mix.exs:

defmodule AW.Mixfile do
  use Mix.Project

  def project do
    [app: :aw,
     version: "0.0.1",
     elixir: "~> 1.0",
     # build_embedded: Mix.env == :prod,
     # start_permanent: Mix.env == :prod,
     escript: [main_module: AW],
     deps: deps]
  end

  # Configuration for the OTP application
  #
  # Type `mix help compile.app` for more information
  def application do
    [applications: [:logger, :httpoison, :amqp, :postgrex, :exq]]
  end

  # Dependencies can be Hex packages:
  #
  #   {:mydep, "~> 0.3.0"}
  #
  # Or git/path repositories:
  #
  #   {:mydep, git: "https://github.com/elixir-lang/mydep.git", tag: "0.1.0"}
  #
  # Type `mix help deps` for more examples and options
  defp deps do
    [
      {:httpoison, "~> 0.7"},
      {:poison, "~> 1.4.0"},
      {:postgrex, "~> 0.8"},
      {:exjsx, "~> 3.2.0"},
      {:amqp, "0.1.1"},
      {:slugger, git: "https://github.com/triptec/slugger.git"},
      {:exq, git: "https://github.com/akira/exq.git", branch: "upgrade_timex"}
    ]
  end
end

[Enhancement] Record failures

We should track failures in Redis: one, to show a stack trace when problems occur, and two, to allow failures to be retried either manually or automatically.

Redis enhancement - add support for Redis URL

  • Support Redis URL for options instead of having to pass each Redis component separately (host, port, password, etc).
  • Still keep the separate options for people that would like to use those.

Configurable job retries

Retry failed jobs up to a limit. Allow configurable retries.

Sidekiq uses a "retry" key with sorted set / score to track this information. Here is a sample entry:

{"retry":true,"queue":"default","class":"Sidekiq::Extensions::DelayedClass","args":["---\n- !ruby/class 'Array'\n- :blah\n- - 1\n  - 2\n  - 3\n"],"jid":"441fc27d25a90f0b0588a1be","enqueued_at":1414340571.833299,"error_message":"undefined method `blah' for Array:Class","error_class":"NoMethodError","failed_at":"2014-10-26T16:23:10Z","retry_count":4,"retried_at":"2014-10-26T16:28:38Z"}

Will need to figure out how to configure retry period per worker.
