
djjob's Introduction

DJJob

DJJob allows PHP web applications to process long-running tasks asynchronously. It is a PHP port of delayed_job (developed at Shopify) and has been used in production at SeatGeek since April 2010.

Like delayed_job, DJJob uses a jobs table for persisting and tracking pending, in-progress, and failed jobs.

Requirements

  • PHP5
  • PDO (Ships with PHP >= 5.1)
  • (Optional) PCNTL library

Setup

Import the sql database table.

mysql db < jobs.sql

The jobs table structure looks like:

CREATE TABLE `jobs` (
`id` INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
`handler` TEXT NOT NULL,
`queue` VARCHAR(255) NOT NULL DEFAULT 'default',
`attempts` INT UNSIGNED NOT NULL DEFAULT 0,
`run_at` DATETIME NULL,
`locked_at` DATETIME NULL,
`locked_by` VARCHAR(255) NULL,
`failed_at` DATETIME NULL,
`error` TEXT NULL,
`created_at` DATETIME NOT NULL
) ENGINE = INNODB;

You may need to use BLOB as the column type for handler if you are passing in serialized blobs of data instead of record ids. For more information, see this link. This may be the case for errors such as the following: unserialize(): Error at offset 2010 of 2425 bytes

Tell DJJob how to connect to your database:

DJJob::configure([
    'driver' => 'mysql',
    'host' => '127.0.0.1',
    'dbname' => 'djjob',
    'user' => 'root',
    'password' => 'topsecret',
]);

Usage

Jobs are PHP objects that respond to a method perform. Jobs are serialized and stored in the database.

<?php
// Job class
class HelloWorldJob {
    // Declared explicitly so the property is serialized with the job
    public $name;

    public function __construct($name) {
        $this->name = $name;
    }
    public function perform() {
        echo "Hello {$this->name}!\n";
    }
}

// enqueue a new job
DJJob::enqueue(new HelloWorldJob("delayed_job"));

Unlike delayed_job, DJJob does not have the concept of task priority (not yet at least). Instead, it supports multiple queues. By default, jobs are placed on the "default" queue. You can specify an alternative queue like:

DJJob::enqueue(new SignupEmailJob("[email protected]"), "email");

At SeatGeek, we run an email-specific queue. Emails have a sendLater method which places a job on the email queue. Here's a simplified version of our base Email class:

class Email {
    // Declared explicitly so the property is serialized with the job
    public $recipient;

    public function __construct($recipient) {
        $this->recipient = $recipient;
    }
    public function send() {
        // do some expensive work to build the email: geolocation, etc..
        // use mail api to send this email
    }
    public function perform() {
        $this->send();
    }
    public function sendLater() {
        DJJob::enqueue($this, "email");
    }
}

Because Email has a perform method, all instances of the email class are also jobs.

Running the jobs

Running a worker is as simple as:

$worker = new DJWorker($options);
$worker->start();

Initializing your environment, connecting to the database, etc. is up to you. We use symfony's task system to run workers. Here's an example of our jobs:worker task:

<?php
class jobsWorkerTask extends sfPropelBaseTask {
  protected function configure() {
    $this->namespace        = 'jobs';
    $this->name             = 'worker';
    $this->briefDescription = '';
    $this->detailedDescription = <<<EOF
The [jobs:worker|INFO] task runs jobs created by the DJJob system.
Call it with:

  [php symfony jobs:worker|INFO]
EOF;
    $this->addArgument('application', sfCommandArgument::OPTIONAL, 'The application name', 'customer');
    $this->addOption('env', null, sfCommandOption::PARAMETER_REQUIRED, 'The environment', 'dev');
    $this->addOption('connection', null, sfCommandOption::PARAMETER_REQUIRED, 'The connection name', 'propel');
    $this->addOption('queue', null, sfCommandOption::PARAMETER_REQUIRED, 'The queue to pull jobs from', 'default');
    $this->addOption('count', null, sfCommandOption::PARAMETER_REQUIRED, 'The number of jobs to run before exiting (0 for unlimited)', 0);
    $this->addOption('sleep', null, sfCommandOption::PARAMETER_REQUIRED, 'Seconds to sleep after finding no new jobs', 5);
  }

  protected function execute($arguments = array(), $options = array()) {
    // Database initialization
    $databaseManager = new sfDatabaseManager($this->configuration);
    $connection = Propel::getConnection($options['connection'] ? $options['connection'] : '');

    $worker = new DJWorker($options);
    $worker->start();
  }
}

The worker will exit if the database has any connectivity problems. We use god to manage our workers, including restarting them when they exit for any reason.

Changes

  • Eliminated Propel dependency by switching to PDO

djjob's People

Contributors

andreweq, asuth, atimmer, erwaller, iansltx, jfloff, josegonzalez, jstayton, lukebaker, michaeldauria, mmcev106, philfreo, rjocoleman, shijialee, spiasecki, tim-wakeless, zackkitzmiller


djjob's Issues

One test is failing

Hi,

the test/database.php is failing because the worker has invalid configuration.

It is:

$worker = new DJWorker(array("count" => 5, "max_attempts" => 2, "sleep" => 10));

while it should be:

$worker = new DJWorker(array("count" => 6, "max_attempts" => 2, "sleep" => 10));

Indeed, you create 4 OK jobs + 1 failing job with 2 attempts. That is a total of 6 runs.

self::$jobsTable Is Never Set When DJJob::setConnection() Is Called

Based on the default functionality of the CakeDjjob behavior, CakeDjjob calls DJJob::setConnection() without setting self::$jobsTable. It might be useful if DJJob::setConnection() had an optional parameter named $jobsTable that defaults to 'jobs', with a line inside DJJob::setConnection() that sets self::$jobsTable = $jobsTable. Right now, since self::$jobsTable is empty, I'm getting errors (screenshot attached).

When running many simultaneous workers, a job can be run twice

We have a lot of workers running simultaneously (15 for our production site), and sometimes a job gets run twice.

What happens is (at least I think):

  • Job Worker 1 selects job 1.
  • Job Worker 2 selects job 1.
  • Job Worker 1 locks job 1
  • Job Worker 2 locks job 1
  • Both do job 1.

Can this happen? I've noticed this several times already that a job was executed twice.

Greetings,
Sebastian
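The select-then-lock sequence described above leaves a window between the two steps. One common way to close such a window is to make the lock a single conditional UPDATE and let only the worker whose UPDATE actually changed the row proceed. The following is a minimal sketch under that assumption, not DJJob's actual implementation: claimJob() is a hypothetical helper, and an in-memory SQLite table stands in for the MySQL jobs table so the example runs on its own.

```php
<?php
// Hypothetical sketch: claim a job with one conditional UPDATE.
// Column names follow the jobs table from the README; SQLite is used
// only to keep the example self-contained.
$db = new PDO('sqlite::memory:');
$db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$db->exec('CREATE TABLE jobs (
    id INTEGER PRIMARY KEY,
    handler TEXT NOT NULL,
    locked_at TEXT NULL,
    locked_by TEXT NULL
)');
$db->exec("INSERT INTO jobs (id, handler) VALUES (1, 'payload')");

// claimJob() is an illustrative name, not part of DJJob.
function claimJob(PDO $db, int $jobId, string $workerName): bool {
    $stmt = $db->prepare(
        'UPDATE jobs SET locked_at = :now, locked_by = :worker
         WHERE id = :id AND locked_at IS NULL'
    );
    $stmt->execute([
        ':now'    => gmdate('Y-m-d H:i:s'),
        ':worker' => $workerName,
        ':id'     => $jobId,
    ]);
    // Only the worker whose UPDATE changed the row owns the job.
    return $stmt->rowCount() === 1;
}

$first  = claimJob($db, 1, 'worker-1'); // takes the lock
$second = claimJob($db, 1, 'worker-2'); // row is already locked
var_dump($first, $second);
```

Because the check (locked_at IS NULL) and the write happen in one statement, two workers cannot both see the job as unlocked and both lock it.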

Change log output format to ISO 8601

Small change, would change the log format though:

protected static function log($mesg, $severity = self::CRITICAL) {
    if ($severity >= self::$log_level) {
        printf("[%s] %s\n", date('c'), $mesg);
    }
}

Thoughts? I will have this in a pull request in a bit.

Ability to fork prior to performing a given job to increase mainline worker stability

There are a few job types that we've been having intermittent issues with, primarily due to timeout errors on third party services, which will hang/terminate the worker despite having exception handling. If we could set a flag within the job object so that those jobs would fork before executing, their failure would be less destructive to the overall worker process.

Granted, this is a bit of a major change to the library, but it should be doable without too much more code. If we don't convert to another queueing system soon I'll implement this on my fork.
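As a rough illustration of the idea, the sketch below runs a job's perform() in a forked child and lets the parent read the exit status. It assumes the PCNTL extension and the CLI; runInChild() and FlakyJob are hypothetical names, and nothing here reflects how DJJob itself is implemented.

```php
<?php
// Hypothetical sketch: isolate a job in a child process so that a crash
// inside perform() does not take down the parent worker.
// Requires the PCNTL extension (CLI only).

class FlakyJob {
    public function perform() {
        // Simulate a job that terminates the process abruptly.
        exit(3);
    }
}

// runInChild() is an illustrative helper, not part of DJJob.
function runInChild($job): bool {
    $pid = pcntl_fork();
    if ($pid === -1) {
        throw new RuntimeException('could not fork');
    }
    if ($pid === 0) {
        // Child: do the work, then exit with a status the parent can read.
        try {
            $job->perform();
            exit(0);
        } catch (Throwable $e) {
            exit(1);
        }
    }
    // Parent: wait for the child; a non-zero or abnormal exit is a failure.
    pcntl_waitpid($pid, $status);
    return pcntl_wifexited($status) && pcntl_wexitstatus($status) === 0;
}

$ok = runInChild(new FlakyJob());
echo $ok ? "job succeeded\n" : "job failed, worker still alive\n";
```

One caveat with this approach: a database connection opened before the fork is shared by parent and child, so a real worker would likely need to reconnect in one of the two processes.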

Adding Priority to Queues

Hi,

We were trying to add priority to the Jobs queue, and we noticed that you use the "shuffle" method to "avoid deadlock" issues. What kind of deadlock issues are we talking about here?

Thanks
Rishav

convert datetime usage to unix timestamps

Curious if you'd accept a patch that does this.

My main reason is that it makes timezones irrelevant. I have some machines where the job server and the database server aren't in the same timezone.

  • Since MySQL datetimes don't contain timezone information, correctly scheduling a job to run in the future requires conversion from PHP to MySQL timezone, which isn't simple to do because of all the various config options which can be involved.
  • Monitoring the amount of time a job has been on a queue is complex for the same reason.

All the existing functionality could be easily replicated with unix timestamps. Thoughts?
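To make the proposal concrete, here is a small sketch of scheduling and age checks done purely on integer timestamps. The helper names (runAtFromDelay, isDue, secondsOnQueue) are hypothetical and only illustrate the arithmetic; they are not part of DJJob.

```php
<?php
// Illustrative sketch: with integer Unix timestamps, scheduling and
// monitoring reduce to integer arithmetic with no timezone conversion.

// Hypothetical helpers, not DJJob API.
function runAtFromDelay(int $now, int $delaySeconds): int {
    return $now + $delaySeconds;
}

function isDue(int $runAt, int $now): bool {
    return $runAt <= $now;
}

function secondsOnQueue(int $createdAt, int $now): int {
    return $now - $createdAt;
}

$now   = 1700000000;                  // fixed "current time" for the example
$runAt = runAtFromDelay($now, 3600);  // run one hour from now

// Changing the PHP timezone has no effect on the comparisons.
date_default_timezone_set('America/New_York');
$dueAfterAnHour = isDue($runAt, $now + 3600);
date_default_timezone_set('Asia/Tokyo');
$dueTooEarly = isDue($runAt, $now + 3599);

$age = secondsOnQueue($now, $now + 90);
var_dump($dueAfterAnHour, $dueTooEarly, $age);
```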

Handling jobs that have been locked for too long

It seems like we should have some way to specify if jobs should "time out" if they've been locked for too long.

I'm not exactly sure why but occasionally I get some jobs that get stuck as locked (no error) and I believe the process has died.

@illoyd has this commit as one option:
illoyd@3140b2a

Rather than having a single long-running DJ Worker script that never ends, it's important to "restart" the djworker each time you deploy code that could possibly affect the jobs. For example, if you change a class that a job interacts with, you need to restart the djworker so that it "sees" the new PHP code for that class.

To solve this right now during deployment we are killing our djworker, which I'm guessing is what causes some of them to get locked and never unlocked. Even with pcntl stuff installed, I'm not sure it's properly receiving/handling the signal.

How should this be handled?
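One possible shape for a lock timeout is a periodic query that clears locks older than a cutoff. The sketch below assumes that approach; releaseStaleLocks() is a hypothetical helper (not the linked commit and not DJJob API), and an in-memory SQLite table stands in for the jobs table.

```php
<?php
// Hypothetical sketch: release locks that have been held longer than a
// configurable number of seconds.
$db = new PDO('sqlite::memory:');
$db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$db->exec('CREATE TABLE jobs (
    id INTEGER PRIMARY KEY,
    locked_at TEXT NULL,
    locked_by TEXT NULL
)');
$db->exec("INSERT INTO jobs VALUES (1, '2014-01-01 11:00:00', 'worker-1')");
$db->exec("INSERT INTO jobs VALUES (2, '2014-01-01 11:59:00', 'worker-2')");

// releaseStaleLocks() is an illustrative name, not part of DJJob.
function releaseStaleLocks(PDO $db, int $now, int $maxLockSeconds): int {
    // 'Y-m-d H:i:s' strings compare correctly as plain text.
    $cutoff = gmdate('Y-m-d H:i:s', $now - $maxLockSeconds);
    $stmt = $db->prepare(
        'UPDATE jobs SET locked_at = NULL, locked_by = NULL
         WHERE locked_at IS NOT NULL AND locked_at < :cutoff'
    );
    $stmt->execute([':cutoff' => $cutoff]);
    return $stmt->rowCount(); // number of jobs unlocked
}

$now = strtotime('2014-01-01 12:00:00 UTC');
$released = releaseStaleLocks($db, $now, 600); // 10-minute timeout
echo "released {$released} stale lock(s)\n";
```

The timeout would need to be longer than the slowest legitimate job, otherwise a job that is still running could be unlocked and picked up a second time.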

When using DJJob in other CLI scripts to enqueue jobs, the SQL connection might be down

We have a Twitter Streaming API CLI daemon that inserts jobs into our database when a tweet comes in.

At some point DJJob seemed to have lost the connection to the database (or the connection simply timed out) after the Twitter script had been running for a long time. I saw

* [JOB] failed to enqueue new job

log entries.

Maybe we should add some logic to DJJob to automatically reconnect to the database, when the connection was lost?
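A reconnect could look roughly like the wrapper below: hold the connection lazily, and on a PDOException drop it, open a new one, and retry once. ReconnectingDb and its methods are hypothetical names used only for this sketch, and the failure is simulated rather than produced by a real timeout.

```php
<?php
// Hypothetical sketch: retry a database operation once on a fresh connection.
class ReconnectingDb {
    private $connect;     // callable that returns a new PDO
    private $pdo = null;  // current connection, opened lazily

    public function __construct(callable $connect) {
        $this->connect = $connect;
    }

    private function connection(): PDO {
        if ($this->pdo === null) {
            $this->pdo = ($this->connect)();
        }
        return $this->pdo;
    }

    public function run(callable $operation) {
        try {
            return $operation($this->connection());
        } catch (PDOException $e) {
            $this->pdo = null;                       // drop the dead connection
            return $operation($this->connection());  // one retry on a new one
        }
    }
}

$connects = 0;
$db = new ReconnectingDb(function () use (&$connects) {
    $connects++;
    $pdo = new PDO('sqlite::memory:');
    $pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
    return $pdo;
});

$calls = 0;
$result = $db->run(function (PDO $pdo) use (&$calls) {
    $calls++;
    if ($calls === 1) {
        // Simulate the connection having gone away on the first attempt.
        throw new PDOException('server has gone away (simulated)');
    }
    return $pdo->query('SELECT 1')->fetchColumn();
});
echo "connections opened: {$connects}\n";
```

Retrying a write blindly carries a risk: if the first attempt actually reached the database before the error surfaced, the job could be enqueued twice.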

PHP Warning: unserialize(): Function unserialize_jobs() hasn't defined the class it was called for in /app/plugins/djjob/DJJob.php on line 274

This is the error I get when trying to execute cake worker run after a job has been enqueued. I am using the latest version of your Djjob project together with https://github.com/josegonzalez/cake_djjob.

I don't know why you are using unserialize(). You only appear to use it in one place; might as well do the logic right there instead of using that quirky override, but maybe I'm wrong.

Any ideas?

Project license

Can you clarify the license djjob is released under? Thanks much!

DJWorker Options Help

Please explain the DJWorker options:

$options = array_merge(array(
    "queue" => "default",
    "count" => 0,
    "sleep" => 5,
    "max_attempts" => 5,
    "fail_on_output" => false
), $options);
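For what it is worth, the symfony task shown in the README describes three of these keys: queue is the queue to pull jobs from, count is the number of jobs to run before exiting (0 for unlimited), and sleep is the number of seconds to sleep after finding no new jobs. The other two keys, max_attempts and fail_on_output, are not described on this page. The sketch below only demonstrates how array_merge() overlays caller-supplied values on defaults like these; the $userOptions values are made up for illustration.

```php
<?php
// Illustrative: later arrays passed to array_merge() override earlier
// ones for matching string keys, so anything not supplied keeps its default.
$defaults = array(
    "queue" => "default",
    "count" => 0,
    "sleep" => 5,
    "max_attempts" => 5,
    "fail_on_output" => false,
);

// Hypothetical caller-supplied options.
$userOptions = array("queue" => "email", "count" => 10);

$options = array_merge($defaults, $userOptions);
print_r($options);
```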

Worker Details

Hello, how is it possible to check whether a worker is running, and how many workers are running?
How do I stop a worker?
How do I start a worker in single mode (if a worker is already running, do not allow another worker to start)?

Question: Do I have to require every job class before running the job?

Hi,

I got the problem solved while composing this question, but thought I should post it anyway. Maybe there is a better way of doing things.

original question

I am trying to use djjob in my codeigniter code. I have done some tests and got it working, or so I think. But I have one problem that I can't figure out with my limited php knowledge, so I thought I would ask here.

I have a function that does job enqueue and the job is created in the database. code as such:

    public function test() {
        require_once APPPATH. 'addons/djjobs/HelloWorldJob.php';
        $this->load->library('DJJob_lib');
        $this->djjob_lib->enqueue(new HelloWorldJob('test me'));
    }

DJJob_lib is a simple wrapper class that initiates database connection etc..

I have another function that starts the job, code:

    public function dj_worker($sleep = 5, $queue = 'default', $count = 0, $max_attempts = 5) {
        $this->load->library('DJJob_lib');
        // require_once APPPATH. 'addons/djjobs/HelloWorldJob.php';
        $this->djjob_lib->start(array("count" => $count, "max_attempts" => $max_attempts, "sleep" => $sleep, "queue" => $queue));
    }

NB: note the commented-out require_once line in the above code.

I discovered that I have to require_once in order to have djjob start the queued job. Otherwise, I am getting unserialize(): Function spl_autoload_call() hasn't defined the class it was called for error when running the dj_worker function.

In the end, I have to do the following:

    public function dj_worker($sleep = 5, $queue = 'default', $count = 0, $max_attempts = 5) {
        $this->load->library('DJJob_lib'); 
        ini_set('unserialize_callback_func', 'callback_spl');
        function callback_spl($className)  {
                //var_dump(__FUNCTION__ . ' : ' . $className);
                require_once APPPATH. 'addons/djjobs/' . $className . '.php';
        }
        $this->djjob_lib->start(array("count" => $count, "max_attempts" => $max_attempts, "sleep" => $sleep, "queue" => $queue));
    }

Above solves my problem.
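A possible alternative to declaring a callback function inside the method is to register an autoloader once, since unserialize() tries autoloading for classes that are not yet defined. The sketch below assumes one class per file named after the class; the temporary directory and DemoJob class are made up so the example is runnable on its own.

```php
<?php
// Hypothetical sketch: let an autoloader locate job classes on demand.
// The directory and DemoJob class exist only for this example.
$jobDir = sys_get_temp_dir() . '/djjob_autoload_demo_' . getmypid();
if (!is_dir($jobDir)) {
    mkdir($jobDir);
}
file_put_contents($jobDir . '/DemoJob.php', '<?php
class DemoJob {
    public $name;
    public function __construct($name) { $this->name = $name; }
    public function perform() { return "Hello {$this->name}!"; }
}');

spl_autoload_register(function ($className) use ($jobDir) {
    // Accept only plain class names to avoid loading arbitrary paths.
    if (!preg_match('/^\w+$/', $className)) {
        return;
    }
    $file = $jobDir . '/' . $className . '.php';
    if (is_file($file)) {
        require_once $file;
    }
});

// A serialized DemoJob, as it might be stored in the handler column.
$handler = 'O:7:"DemoJob":1:{s:4:"name";s:4:"test";}';
$job = unserialize($handler); // DemoJob is loaded by the autoloader here
$output = $job->perform();
echo $output, "\n";

// Clean up the temporary files created for the example.
unlink($jobDir . '/DemoJob.php');
rmdir($jobDir);
```

With this in place the worker does not need a require_once per job class, and the autoloader is registered only once even if the worker method is called repeatedly.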

END

Publishing this project on packagist.org

When someone wants to install this Magento extension via Composer, it will not be possible by running composer require seatgeek/djjob. That's because each new version should be tagged, and currently there is no tag, meaning no stable release is available.

Things to do:

  • add "version": "1.0.0" to composer.json
  • create a Git tag 1.0.0 from the current master branch
  • enable auto update to let packagist publish future releases automatically

I've created a fork and published it here: https://packagist.org/packages/elvetemedve/djjob
I'll remove it as soon as the original package is available on packagist.
