EctoJob

A transactional job queue built with Ecto and GenStage.

It is compatible with PostgreSQL and MySQL, with one major difference:

  • PostgreSQL: job queue updates are pushed to ecto_job through the PostgreSQL LISTEN/NOTIFY feature.
  • MySQL: job queue updates are detected through database polling.

Goals

  • Transactional job processing
  • Retries
  • Scheduled jobs
  • Multiple queues
  • Low latency concurrent processing
  • Avoid frequent database polling
  • Library of functions, not a full OTP application

Getting Started

Add :ecto_job to your dependencies:

  {:ecto_job, "~> 3.1"}
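
In a typical mix.exs this goes in the deps list, for example:

def deps do
  [
    {:ecto_job, "~> 3.1"}
  ]
end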

Installation

Add a migration to install the notification function and create a job queue table:

mix ecto.gen.migration create_job_queue

defmodule MyApp.Repo.Migrations.CreateJobQueue do
  use Ecto.Migration

  @ecto_job_version 3

  def up do
    EctoJob.Migrations.Install.up()
    EctoJob.Migrations.CreateJobTable.up("jobs", version: @ecto_job_version)
  end

  def down do
    EctoJob.Migrations.CreateJobTable.down("jobs")
    EctoJob.Migrations.Install.down()
  end
end

By default, a job holds a map of arbitrary data (which corresponds to a jsonb field in the table). If you want to store an arbitrary Elixir/Erlang term in the job (bytea in the table), you can set the params_type option:

def up do
  EctoJob.Migrations.Install.up()
  EctoJob.Migrations.CreateJobTable.up("jobs", version: @ecto_job_version, params_type: :binary)
end

Compatibility

EctoJob leverages PostgreSQL-specific features, such as the notification mechanism triggered when a new job is inserted into a queue.

However, a non-optimized version of EctoJob can be used on top of MySQL >= 8.0.1. Older versions of MySQL / MariaDB may not work because of the use of the following syntax:

  • FOR UPDATE SKIP LOCKED
  • Default value for datetime column

Upgrading to version 3.0

To upgrade your project to version 3.0 of ecto_job, you must add a migration to update the pre-existing job queue tables:

mix ecto.gen.migration update_job_queue

defmodule MyApp.Repo.Migrations.UpdateJobQueue do
  use Ecto.Migration
  @ecto_job_version 3

  def up do
    EctoJob.Migrations.UpdateJobTable.up(@ecto_job_version, "jobs")
  end

  def down do
    EctoJob.Migrations.UpdateJobTable.down(@ecto_job_version, "jobs")
  end
end

Add a module for the queue, mixing in EctoJob.JobQueue. This declares an Ecto.Schema to use with the table created in the migration, and a start_link function so the worker supervision tree can be started conveniently.

defmodule MyApp.JobQueue do
  use EctoJob.JobQueue, table_name: "jobs"
end

For jobs holding arbitrary Elixir/Erlang terms, add the :params_type option:

defmodule MyApp.JobQueue do
  use EctoJob.JobQueue, table_name: "jobs", params_type: :binary
end

Add a perform/2 function to the job queue module; this is where jobs from the queue will be dispatched.

defmodule MyApp.JobQueue do
  use EctoJob.JobQueue, table_name: "jobs"

  def perform(multi = %Ecto.Multi{}, job = %{}) do
    # job logic here
  end
end

Add your new JobQueue module to the application supervision tree to run the worker supervisor:

def start(_type, _args) do

  children = [
    MyApp.Repo,
    {MyApp.JobQueue, repo: MyApp.Repo, max_demand: 100}
  ]

  opts = [strategy: :one_for_one, name: MyApp.Supervisor]
  Supervisor.start_link(children, opts)
end

If you want to run the workers on a separate node from the enqueuers, just leave your JobQueue module out of that node's supervision tree.

Usage

Enqueueing jobs

Jobs are Ecto schemas, with each queue backed by a different table. A job can be inserted directly into the Repo by constructing it with the new/2 function:

%{"type" => "SendEmail", "address" => "[email protected]", "body" => "Welcome!"}
|> MyApp.JobQueue.new()
|> MyApp.Repo.insert()

For inserting an arbitrary Elixir/Erlang term (requires the :params_type option set to :binary):

{"SendEmail", "[email protected]", "Welcome!"}
|> MyApp.JobQueue.new()
|> MyApp.Repo.insert()

or with a struct:

%MyStruct{}
|> MyApp.JobQueue.new()
|> MyApp.Repo.insert()

A job can be inserted with optional params:

  • :schedule : runs the job at the given %DateTime{}. The default value is DateTime.utc_now().
  • :max_attempts : the maximum number of attempts for this job. The default value is 5.
  • :priority : an integer; jobs with lower numbers run first. The default value is 0.

%{"type" => "SendEmail", "address" => "[email protected]", "body" => "Welcome!"}
|> MyApp.JobQueue.new(max_attempts: 10)
|> MyApp.Repo.insert()

%{"type" => "SendEmail", "address" => "[email protected]", "body" => "Welcome!"}
|> MyApp.JobQueue.new(priority: 1)
|> MyApp.Repo.insert()

%{"type" => "SendEmail", "address" => "[email protected]", "body" => "Welcome!"}
|> MyApp.JobQueue.new(priority: 2, max_attempts: 2)
|> MyApp.Repo.insert()
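
To run a job at a later time, pass a %DateTime{} via the :schedule option described above; for example, 30 minutes from now:

%{"type" => "SendEmail", "address" => "[email protected]", "body" => "Welcome!"}
|> MyApp.JobQueue.new(schedule: DateTime.add(DateTime.utc_now(), 30 * 60, :second))
|> MyApp.Repo.insert()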

The primary benefit of EctoJob is the ability to enqueue and process jobs transactionally. To achieve this, a job can be added to an Ecto.Multi, along with other application updates, using the enqueue/3 function:

Ecto.Multi.new()
|> Ecto.Multi.insert(:add_user, User.insert_changeset(%{name: "Joe", email: "[email protected]"}))
|> MyApp.JobQueue.enqueue(:email_job, %{"type" => "SendEmail", "address" => "[email protected]", "body" => "Welcome!"})
|> MyApp.Repo.transaction()

Handling Jobs

All jobs sent to a queue are eventually dispatched to the perform/2 function defined in the queue module. The first argument supplied is an Ecto.Multi which has been initialized with a delete operation, marking the job as complete. The Ecto.Multi struct must be passed to the Ecto.Repo.transaction function to complete the job, along with any other application updates.

defmodule MyApp.JobQueue do
  use EctoJob.JobQueue, table_name: "jobs"

  def perform(multi = %Ecto.Multi{}, job = %{"type" => "SendEmail", "address" => address, "body" => body}) do
    multi
    |> Ecto.Multi.run(:send, fn _repo, _changes -> EmailService.send(address, body) end)
    |> Ecto.Multi.insert(:stats, %EmailSendStats{recipient: address})
    |> MyApp.Repo.transaction()
  end
end

When a queue handles multiple job types, it is useful to pattern match on the job and delegate to separate modules:

defmodule MyApp.JobQueue do
  use EctoJob.JobQueue, table_name: "jobs"

  def perform(multi = %Ecto.Multi{}, job = %{"type" => "SendEmail"}),      do: MyApp.SendEmail.perform(multi, job)
  def perform(multi = %Ecto.Multi{}, job = %{"type" => "CustomerReport"}), do: MyApp.CustomerReport.perform(multi, job)
  def perform(multi = %Ecto.Multi{}, job = %{"type" => "SyncWithCRM"}),    do: MyApp.CRMSync.perform(multi, job)
  # ...
end

Options

You can customize how often the table is polled for scheduled jobs. The default is 60_000 ms.

config :ecto_job, :poll_interval, 15_000

Control how long a job is reserved while waiting for a worker to pick it up, before the poller makes it available again for dispatch by the producer. The default is 60_000 ms.

config :ecto_job, :reservation_timeout, 15_000

Control the delay between retries following a job execution failure. Keep in mind that retries are reactivated by polling, so a job will be retried no sooner than the next poll_interval allows. The default is 30_000 ms (30 seconds).

config :ecto_job, :retry_timeout, 30_000

Control the timeout for job execution, after which an "IN_PROGRESS" job is assumed to have failed. The timeout begins when the job is picked up by a worker. As with retry_timeout, an expired job becomes available again no sooner than the next poll_interval allows. The default is 300_000 ms (5 minutes).

config :ecto_job, :execution_timeout, 300_000

You can control whether logs are on or off and the log level. The defaults are true and :info.

config :ecto_job, log: true,
                  log_level: :debug

See EctoJob.Config for configuration details.

How it works

Each job queue is represented as a PostgreSQL table and Ecto schema.

Jobs are added to the queue by inserting into the table, using Ecto.Repo.transaction to transactionally enqueue jobs with other application updates.

A GenStage producer responds to demand for jobs by efficiently pulling jobs from the queue in batches. When there are not enough jobs in the queue, the demand for jobs is buffered.

As jobs are inserted into the queue, pg_notify notifies the producer that new work is available, allowing the producer to dispatch jobs immediately if there is pending demand.

A GenStage ConsumerSupervisor subscribes to the producer, and spawns a new Task for each job.

The callback for each job receives an Ecto.Multi structure, pre-populated with a delete command to remove the job from the queue.

Application code then adds additional commands to the Ecto.Multi and submits it to the Repo with a call to transaction, ensuring that application updates are performed atomically with the job removal.

Scheduled jobs and Failed jobs are reactivated by polling the database once per minute.

Job Lifecycle

Jobs scheduled to run at a future time start in the "SCHEDULED" state. Scheduled jobs transition to "AVAILABLE" after the scheduled time has passed.

Jobs that are intended to run immediately start in an "AVAILABLE" state.

The producer will update a batch of jobs, setting the state to "RESERVED" with an expiry controlled by the reservation_timeout option described in Options above.

Once a consumer is given a job, it increments the attempt counter and updates the state to "IN_PROGRESS", with an initial timeout configurable as execution_timeout, defaulting to 5 minutes. If the job is being retried, the expiry will be initial timeout * the attempt counter.

If successful, the consumer deletes the job from the queue by committing the preloaded multi passed to the perform/2 job handler. If an exception is raised in the worker, or a processing attempt fails to commit the preloaded multi, the job is transitioned to the "RETRY" state and scheduled to run again after retry_timeout * attempt counter. If the process is killed or is otherwise unable to transition to "RETRY", the job will remain "IN_PROGRESS" until the execution_timeout expires.

Jobs in the "RESERVED" or "IN_PROGRESS" state past the expiry time will be returned to the "AVAILABLE" state.

Expired jobs in the "IN_PROGRESS" state with attempts >= MAX_ATTEMPTS move to a "FAILED" state. Failed jobs are kept in the database so that application developers can handle the failure.
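
In summary, the transitions described above are:

SCHEDULED    --(scheduled time passes)-------------------> AVAILABLE
AVAILABLE    --(producer reserves a batch)---------------> RESERVED
RESERVED     --(consumer picks up the job)---------------> IN_PROGRESS
IN_PROGRESS  --(perform/2 multi commits)-----------------> deleted from the queue
IN_PROGRESS  --(failure)--> RETRY --(retry delay)--------> AVAILABLE
RESERVED or IN_PROGRESS --(expiry passes)----------------> AVAILABLE
IN_PROGRESS  --(expired, attempts >= max_attempts)-------> FAILED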

Job Timeouts and Transactional Safety

When performing long-running jobs, or when configuring a short execution timeout, keep in mind that a job may be retried before the original attempt has finished; there is no proactive mechanism to cancel a running job.

If the initial attempt then finishes and tries to commit a result, and the commit includes the preloaded multi passed as the first parameter to perform/2, the optimistic lock will fail the transaction.

Where the job performs other side effects outside of the transaction, such as calls to external APIs or additional database writes, it is recommended that these implement their own idempotency guarantees, as they will not be rolled back when a job fails or is duplicated.
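
For example, a perform/2 callback can distinguish a successful commit from a conflicting one by matching on the transaction result. This is a sketch: EmailService is a hypothetical module assumed to return {:ok, result} or {:error, reason}.

defmodule MyApp.JobQueue do
  use EctoJob.JobQueue, table_name: "jobs"

  def perform(multi = %Ecto.Multi{}, job = %{"type" => "SendEmail", "address" => address, "body" => body}) do
    multi
    |> Ecto.Multi.run(:send, fn _repo, _changes -> EmailService.send(address, body) end)
    |> MyApp.Repo.transaction()
    |> case do
      {:ok, _changes} ->
        :ok

      {:error, _failed_step, _reason, _changes_so_far} ->
        # Either the send step returned an error, or the delete of the job row
        # conflicted because the job was already completed or retried elsewhere.
        # The EmailService.send side effect is not rolled back, so it must be
        # idempotent.
        :error
    end
  end
end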

Copyright and License

Copyright (c) 2017 Mike Buhot

This library is released under the MIT License. See the LICENSE.md file for further details.

ecto_job's Issues

Honor host application timezone configuration

Problem Description

With an Ecto.Repo configured to use a different time type, like:

config :my_app, MyApp.Repo,
  ... other config ...
  migration_timestamps: [
    type: :timestamptz,
    usec: true
  ],
... other config ...

the jobs table is created with inserted_at and updated_at columns of the configured type, as expected, but the expires and schedule columns use the library's own type (also expected). This behavior is completely normal, but it forces application developers to write and maintain code that handles time differently for this library than elsewhere in the application.

Expected Behavior

Libraries should "play nice" with existing Ecto.Migration and Ecto.Repo configuration options, and should honor the configuration of the host application's Repo with respect to time column type and autogeneration.

Workaround

It isn't possible to use an application-defined time type, so the only workaround available seems to be to create the table via the migration as usual, then remember to deal with time differently when working with the application's jobs.

Potential Solutions

Todo: Support ecto 3 and ecto_sql

Ecto 3 is coming in the next couple of days, and with it the recommended upgrade path for those using it for database interaction is to change the dependency from ecto to ecto_sql.

I think that to support backwards compatibility for (presumably) a long time, we'll want to introduce an optional dependency.

Reference commit and announcement.

I'll be able to take this on eventually, as I'm working on a new project that will use ecto_sql and will want to use this lib, if no one else gets to it before then.

Using Multiple Queues

Hi,

I am wondering if I can set max_demand per job type.

The scenario is this: I have two kinds of jobs, 1. sequential jobs (processed one at a time), and 2. parallel jobs (processed many at a time).

How can I use ecto_job to accommodate these kinds of jobs?
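
One way to express this with the documented API is to back each kind of job with its own queue module and table, and start the sequential queue with max_demand: 1. A sketch, with hypothetical module and table names (each table would need its own CreateJobTable migration, as shown in the Installation section):

defmodule MyApp.SequentialJobQueue do
  use EctoJob.JobQueue, table_name: "sequential_jobs"
end

defmodule MyApp.ParallelJobQueue do
  use EctoJob.JobQueue, table_name: "parallel_jobs"
end

# In the application supervision tree:
children = [
  MyApp.Repo,
  {MyApp.SequentialJobQueue, repo: MyApp.Repo, max_demand: 1},   # one job at a time
  {MyApp.ParallelJobQueue, repo: MyApp.Repo, max_demand: 100}    # many jobs concurrently
]

Note that max_demand: 1 limits concurrency per running supervisor; with multiple nodes running the same queue, you would still get one job per node.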

Requeue seems inconsistent with the general use of Ecto.Multi

One would typically want to handle a multi in the following pattern:

existing_multi
|> Ecto.Multi.merge(fn %{job: job} ->
  Ecto.Multi.new()
  |> __MODULE__.requeue("requeue_job", job)
end)
|> MyApp.Repo.transaction()
|> case do
  {:error, :non_failed_job} -> do_something
end

The above would also allow you to pass the error on to fallback controllers and such.

However, as the requeue function is currently implemented, it does not just register an error on the multi, but rather returns an error directly. This forces one to add an inelegant error-handling block midstream, something like the following.

existing_multi
|> Ecto.Multi.merge(fn %{job: job} ->
  Ecto.Multi.new()
  |> __MODULE__.requeue("requeue_job", job)
  |> case do
    {:error, :non_failed_job} ->
      Ecto.Multi.new()
      |> Ecto.Multi.error("requeue_job", {:error, :non_failed_job})

    any ->
      any
  end
end)
|> MyApp.Repo.transaction()
|> case do
  {:error, :non_failed_job} -> do_something
end

Any interest in using postgres notifications?

Hey folks!

I'm looking at switching out our home-grown Postgres-based job library for ecto_job. One thing we do, however, is have a trigger on the queue table that does a pg_notify to a jobs topic. Then there's a Postgrex.Notifications process that listens for these and uses them to trigger a job fetch.

Notably, this is NOT used to replace getting the rows out of the database with lock: "FOR UPDATE SKIP LOCKED". It does replace the polling, however, and decreases the overall latency.

Any interest in having a PR that adds this (perhaps optionally?) to ecto_job?

Maintainers Wanted!

@ramondelemos @lukyanov @jeanparpaillon I'd really like to get your open PRs merged.
Unfortunately I'm only able to work on EctoJob in my free time, and haven't been able to thoroughly test out the changes.

Would any of you like to become a co-maintainer of EctoJob?

It's generally not a lot of work - EctoJob has been fairly stable over the years. I just like to be careful with updates since a bug in a job queue library could have a big impact on users :)

error list for unsuccessful jobs

Dear @mbuhot, I am thinking about how to create a PR that deals with job errors, adding an error list to the job data structure, so that every unsuccessful attempt adds an error message to the job record. It would help to have good visibility into what is happening behind the scenes on every job attempt.

Please, could you send me some small guidelines on how to implement it?
Regards,
Henry

LogLevel in configuration

Hi,

I tried to set log_level in the configuration (according to the README), but it still logs like crazy. Is this functionality working?

I'm just trying to get rid of the rather useless recurring log messages in the iex shell.

Thanks!

hex.pm

Excellent job!
Let me suggest delivering a tagged version to hex.pm.

EctoJob UI

Following from the proof-of-concept in #46 this issue is to track the integration between ecto_job and a separate EctoJob UI from @AaronCowan.

  • Update README to link to ecto_job_ui project
  • Add any additional calls to pg_notify for job state changes

Publish 3.0 to hex.pm

It appears that 3.0 is not published to hex.pm. Is there a plan to do that soonish? I like that the indexes were added to some columns. Nice project btw!

EDIT: Is 3.0 still in dev?

pg_notify not working when testing

When using notifications in tests, it seems that pg notifications are not triggered.
In the dev environment, notifications work.
I guess this is related to the Ecto Sandbox.

OTP 21
Elixir 1.8.1
ecto_job 2.0.0
ecto 3.0.0
postgrex 0.14.1

mysql compatible version?

Is it planned to have a MySQL-compatible version of ecto_job?

Some projects (unfortunately) rely on MySQL. It would be useful for ecto_job to be compatible with MySQL, even with limited features (a poll-based job producer).

Base expiry constant is not clear

Hey. So I've been reading through your library to determine if it's appropriate to use in a production codebase that needs to handle fairly modest load but with strong safety guarantees. There's one piece of behaviour that I didn't understand exactly, which is the 300s base expiry value: where it comes from, whether it makes sense to have it configurable, and how it interacts with the rest of the processing lifecycle.

I understand that this 300s for the RESERVED expiry is the maximum time between when the job is dispatched from the Producer and when the Worker must begin processing. For this use case it seems like an arbitrary but reasonable value, if not a bit high, though it's not obvious what the failure conditions would be in GenStage and when it would make sense to retry earlier.

For the IN_PROGRESS expiry during attempt 1, which would be 300s, is it correct that the job would only be retried on the next poll after the expiry has passed, assuming it had failed?

The reason I ask is that 300s seems too long for a first retry attempt. As far as I can understand, the real retry timeout is whichever is smaller between the expiry and the poll interval. Does it make sense to you @mbuhot to make this configurable as well? Can you think of any unintended consequences of setting it to, say, 10s, other than burning through the retry attempts more quickly?

EctoJob just suddenly stops.

Hi,

We are experiencing that EctoJob just stops working.

Our setup:

  • We have a number of Elixir nodes (usually 3) set up to connect to one PostgreSQL database.

We keep experiencing that EctoJob just stops working, and we have to restart our Elixir nodes to make things work again.

When EctoJob stops working, we can observe unprocessed records (around 15-30) in the AVAILABLE state in the job_queue table.

Please recommend how to investigate this issue and prevent it from happening again.

DBConnection error when running test with :manual mode for sandbox

I have an application using EctoJob and it seems I'm getting errors when I'm using the manual sandbox mode.

This is very strange because I have the same setup in another app and it doesn't happen there. I can't seem to find what's wrong.

12:02:12.163 [error] GenServer #PID<0.427.0> terminating
** (DBConnection.OwnershipError) cannot find ownership process for #PID<0.426.0>.

See Ecto.Adapters.SQL.Sandbox docs for more information.
    (ecto_sql) lib/ecto/adapters/sql.ex:626: Ecto.Adapters.SQL.raise_sql_call_error/1
    (ecto_sql) lib/ecto/adapters/sql.ex:562: Ecto.Adapters.SQL.execute/5
    (ecto) lib/ecto/repo/queryable.ex:177: Ecto.Repo.Queryable.execute/4
    (ecto_job) lib/ecto_job/producer.ex:184: EctoJob.Producer.dispatch_jobs/2
    (gen_stage) lib/gen_stage.ex:2103: GenStage.noreply_callback/3
    (stdlib) gen_server.erl:637: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:711: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: {:"$gen_producer", {#PID<0.427.0>, #Reference<0.3185423543.3099852806.167003>}, {:ask, 100}}

test_helper.exs

ExUnit.start()

Ecto.Adapters.SQL.Sandbox.mode(Dispatch.Repo, :manual)

If I comment out the last line, everything works.

Configure max_demand and polling_interval similarly

max_demand is passed as a param to the start_link function, while polling_interval is configured through the application environment.

Unify the two approaches into a Config struct that takes values passed to start_link and falls back to the application environment.

completed jobs

Hi,
Please, is there any configuration or explicit message to send that marks jobs as completed?
After running my jobs, the state remains "IN_PROGRESS".

Do you think a configurable cleaner job should be added to clean up completed jobs after X time?

Non-concurrent queue

Hello,

can a queue be processed one job after another?

The use-case is generating invoices, where the transaction would fail if two invoices get the same number, so the jobs need to run one after another.

As I understand it, I could merge all the single jobs (per customer) into one big job to avoid possible transaction failures, but before that I wanted to explore whether there is a way to simply run them in sequence.

I can imagine this might be useful to other use-cases too.

Breaks with Repos configured to use UUIDs by default

Problem Description

With an Ecto.Repo configured to use UUIDs for new migrations by default, like:

config :my_app, MyApp.Repo,
  ... other config ...
  migration_primary_key: [
    id: :uuid,
    type: :binary_id,
    autogenerate: true
  ],
... other config ...

the jobs table is created with a primary-key id column of type UUID, as expected, but the application code expects an integer, preventing jobs from being created even when explicitly given a UUID.

Expected Behavior

Libraries should "play nice" with existing Ecto.Migration and Ecto.Repo configuration options, and should honor the configuration of the host application's Repo with respect to id column type and autogeneration.

Workaround

It isn't possible to use a UUID at all, so the only workaround available is to create the table with a primary key of type integer. The most straightforward way to do this is to simply comment out the relevant configuration before running the migration that creates the job queue table.

Potential Solutions

Negative Demand Errors

Hi there.

We've got EctoJob running on a reasonably busy queue and have recently started getting error reports such as these:

ERROR 2201W (invalid_row_count_in_limit_clause) LIMIT must not be negative

We were able to trace this back to a negative demand in the Producer state, for example:

%EctoJob.Producer.State{clock: &DateTime.utc_now/0, demand: -31, execution_timeout: 300000, notifications_listen_timeout: 5000, notifier: #PID<0.2601.0>, poll_interval: 60000, repo: Repo, reservation_timeout: 60000, schema: JobQueue}

After a bit of digging and debugging, we narrowed the error down to this part of the code: https://github.com/mbuhot/ecto_job/blob/master/lib/ecto_job/producer.ex#L184-L185

The count returned by JobQueue.reserve_available_jobs is greater than the demand, which results in the demand being updated to a negative number in the Producer's state. Then the following handle_demand fails.

We dug into this a little more because, looking at the JobQueue.reserve_available_jobs query, it seems this shouldn't be happening, since the limit is applied. Then we found this. Basically, there's an issue with using limit in a subquery of an update, which can result in the limit being ignored if the query planner chooses a Nested Loop in the execution plan.

The workaround for this is to use a CTE; however, Ecto doesn't support that. There is a PR on Ecto to add support for CTEs, after which you'd be able to apply the workaround (I hope). I couldn't find any other way to get Ecto to use a CTE.

Add to documentation used connections

Our application was using more connections than we had configured for our Repo. We later found out that every job queue uses an exclusive connection, which explained the unexpected number of connections.

The documentation should explain this use of connections somewhere, so that users can understand the behaviour of their applications.

Use clock from database rather than application?

Hi there, thanks for making EctoJob! I've been evaluating it recently to possibly use it at our company.

One of the things I noticed while skimming the source is the number of places where you're generating the current time in application code. This jumped out at me given the number of times I've seen issues caused by distributed application servers whose system clocks are not perfectly in sync, timezone misconfigurations, etc.

Given that Postgres has a now() function to fetch the current time from the start of the in-progress transaction, and given that this function wouldn't be susceptible to skewed clocks, I wondered whether you've given any thought to using the time from the database rather than always calling DateTime.utc_now(). It wouldn't necessarily protect you from issues arising from scheduled jobs, but it might eliminate whole classes of other potential problems.

perform later?

We started to use ecto_job on the project I am working on, but I cannot find out whether it's possible to schedule a job to run later (e.g. 30 minutes after it's scheduled, and not before). I'm very used to this feature from other job processing libraries such as Sidekiq. Is it supported? If not, are you planning on adding something like this?

Multi-node postgrex listener resulting in more queries than necessary.

We've been noticing some increased CPU usage that seems to be due to an abundance of calls to the database for available jobs.

We have in the neighborhood of hundreds of jobs inserted per minute. We have max_demand set to 100 and we currently have 2 nodes churning away. The workers seem to keep up with the queue, so we're almost always buffering demand.

The way I understand the issue, each insert into the table results in a call to dispatch_jobs on both nodes, so they are competing for these individual jobs. Also, due to the number of rows we're inserting and how the workers are almost always ahead of what's being queued, we end up doing this A LOT.

I think the desired behavior would be to let the jobs queue up a little more before dispatching. I'm wondering if I should just kill the pg_notify stuff in my implementation and rely on polling. Any better ideas?
