Agenda

A light-weight job scheduling library for Node.js

This was originally a fork of agenda.js. It differs from the original version in the following points:

  • Complete rewrite in TypeScript (fully typed!)
  • mongodb4 driver (supports mongodb 5.x)
  • Supports mongoDB sharding by name
  • touch() can have an optional progress parameter (0-100)
  • Bugfixes and improvements for locking & job processing (concurrency, lockLimit,..)
  • Breaking change: define() config parameter moved from 2nd position to 3rd
  • getRunningStats()
  • automatically waits for agenda to be connected before calling any database operations
  • uses a database abstraction layer behind the scenes
  • does not create a database index by default, you can set ensureIndex: true when initializing Agenda or run manually:
db.agendaJobs.createIndex({
    "name" : 1,
    "nextRunAt" : 1,
    "priority" : -1,
    "lockedAt" : 1,
    "disabled" : 1
}, { "name" : "findAndLockNextJobIndex" })

Agenda offers

  • Minimal overhead. Agenda aims to keep its code base small.
  • Mongo backed persistence layer.
  • Promises based API.
  • Scheduling with configurable priority, concurrency, repeating and persistence of job results.
  • Scheduling via cron or human readable syntax.
  • Event backed job queue that you can hook into.
  • Agenda-rest: optional standalone REST API.
  • Inversify-agenda - Some utilities for the development of agenda workers with Inversify.
  • Agendash: optional standalone web-interface.

Feature Comparison

Since there are a few job queue solutions, here is a table comparing them to help you choose the one that best suits your needs.

Feature Bull Bee Agenda
Backend redis redis mongo
Priorities
Concurrency
Delayed jobs
Global events
Rate Limiter
Pause/Resume
Sandboxed worker
Repeatable jobs
Atomic ops ~
Persistence
UI
REST API
Central (Scalable) Queue
Supports long running jobs
Optimized for Jobs / Messages Messages Jobs

Kudos for making the comparison chart go to the Bull maintainers.

Installation

Install via NPM

npm install @hokify/agenda

You will also need a working Mongo database (v4+) to point it to.

Example Usage

const mongoConnectionString = 'mongodb://127.0.0.1/agenda';

const agenda = new Agenda({ db: { address: mongoConnectionString } });

// Or override the default collection name:
// const agenda = new Agenda({db: {address: mongoConnectionString, collection: 'jobCollectionName'}});

// or pass additional connection options:
// const agenda = new Agenda({db: {address: mongoConnectionString, collection: 'jobCollectionName', options: {ssl: true}}});

// or pass in an existing mongodb-native MongoClient instance
// const agenda = new Agenda({mongo: myMongoClient});

agenda.define('delete old users', async job => {
	await User.remove({ lastLogIn: { $lt: twoDaysAgo } });
});

(async function () {
	// IIFE to give access to async/await
	await agenda.start();

	await agenda.every('3 minutes', 'delete old users');

	// Alternatively, you could also do:
	await agenda.every('*/3 * * * *', 'delete old users');
})();

agenda.define(
	'send email report',
	async job => {
		const { to } = job.attrs.data;
		await emailClient.send({
			to,
			from: 'noreply@example.com',
			subject: 'Email Report',
			body: '...'
		});
	},
	{ priority: 'high', concurrency: 10 }
);

(async function () {
	await agenda.start();
	await agenda.schedule('in 20 minutes', 'send email report', { to: 'admin@example.com' });
})();

(async function () {
	const weeklyReport = agenda.create('send email report', { to: 'admin@example.com' });
	await agenda.start();
	await weeklyReport.repeatEvery('1 week').save();
})();

Full documentation

See also https://hokify.github.io/agenda/

Agenda's basic control structure is an instance of an agenda. Agendas are mapped to a database collection and load the jobs from within.

Configuring an agenda

All configuration methods are chainable, meaning you can do something like:

const agenda = new Agenda();
agenda
  .database(...)
  .processEvery('3 minutes')
  ...;

Possible agenda config options:

{
	name: string;
	defaultConcurrency: number;
	processEvery: string | number;
	maxConcurrency: number;
	defaultLockLimit: number;
	lockLimit: number;
	defaultLockLifetime: number;
	ensureIndex: boolean;
	sort: SortOptionObject<IJobParameters>;
	db: {
		collection: string;
		address: string;
		options: MongoClientOptions;
	}
	mongo: Db;
}

Agenda uses Human Interval for specifying the intervals. It supports the following units:

seconds, minutes, hours, days, weeks, months (assumes 30 days), years (assumes 365 days)

More sophisticated examples

agenda.processEvery('one minute');
agenda.processEvery('1.5 minutes');
agenda.processEvery('3 days and 4 hours');
agenda.processEvery('3 days, 4 hours and 36 seconds');
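To make the arithmetic behind these strings concrete, here is a simplified sketch of how such an interval could be reduced to milliseconds. It is not the Human Interval implementation: the function name intervalToMs and the small word table are assumptions made for illustration, and only the unit lengths listed above (30-day months, 365-day years) are taken from this document.

```javascript
// Simplified illustration only; not the Human Interval implementation.
const UNIT_MS = {
	second: 1000,
	minute: 60 * 1000,
	hour: 60 * 60 * 1000,
	day: 24 * 60 * 60 * 1000,
	week: 7 * 24 * 60 * 60 * 1000,
	month: 30 * 24 * 60 * 60 * 1000, // a month is assumed to be 30 days
	year: 365 * 24 * 60 * 60 * 1000 // a year is assumed to be 365 days
};

// Hypothetical word table covering only a few spelled-out amounts.
const WORDS = { a: 1, an: 1, one: 1, two: 2, three: 3, four: 4, five: 5 };

function intervalToMs(interval) {
	// In this sketch, numbers are passed through unchanged.
	if (typeof interval === 'number') return interval;

	let total = 0;
	// Match "<amount> <unit>" pairs such as "3 days" or "1.5 minutes".
	const pattern = /(\d+(?:\.\d+)?|[a-z]+)\s+(second|minute|hour|day|week|month|year)s?\b/gi;
	for (const [, amount, unit] of interval.matchAll(pattern)) {
		const count = /^\d/.test(amount) ? parseFloat(amount) : WORDS[amount.toLowerCase()];
		if (count === undefined) throw new Error(`Unknown amount: ${amount}`);
		total += count * UNIT_MS[unit.toLowerCase()];
	}
	return total;
}

console.log(intervalToMs('one minute')); // 60000
console.log(intervalToMs('1.5 minutes')); // 90000
console.log(intervalToMs('3 days and 4 hours')); // 273600000
```

Connecting words such as "and" or commas are simply skipped, which is why the last two examples from the list above add up unit by unit.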

database(url, [collectionName], [MongoClientOptions])

Specifies the database at the url specified. If no collection name is given, agendaJobs is used.

By default useNewUrlParser and useUnifiedTopology are set to true.

agenda.database('localhost:27017/agenda-test', 'agendaJobs');

You can also specify it during instantiation.

const agenda = new Agenda({
	db: { address: 'localhost:27017/agenda-test', collection: 'agendaJobs' }
});

Agenda will emit a ready event (see Agenda Events) when properly connected to the database. It is safe to call agenda.start() without waiting for this event, as this is handled internally. If you're using the db options, or call database, then you may still need to listen for ready before saving jobs.

mongo(dbInstance, [collectionName])

Use an existing mongodb-native MongoClient/Db instance. This can help consolidate connections to a database. You can instead use .database to have agenda handle connecting for you.

You can also specify it during instantiation:

const agenda = new Agenda({ mongo: mongoClientInstance.db('agenda-test') });

Note that MongoClient.connect() returns a mongoClientInstance since node-mongodb-native 3.0.0, while it used to return a dbInstance that could then be directly passed to agenda.

name(name)

Sets the lastModifiedBy field to name in the jobs collection. Useful if you have multiple job processors (agendas) and want to see which job queue last ran the job.

agenda.name(os.hostname() + '-' + process.pid);

You can also specify it during instantiation

const agenda = new Agenda({ name: 'test queue' });

processEvery(interval)

Takes an interval, which can be either a plain JavaScript number or a string such as 3 minutes.

Specifies the frequency at which agenda will query the database looking for jobs that need to be processed. Agenda internally uses setTimeout to ensure that jobs run at (or within about 3 ms of) the right time.

Decreasing the frequency will result in fewer database queries, but more jobs being stored in memory.

Also worth noting is that if the job queue is shut down, any jobs stored in memory that haven't run will still be locked, meaning that you may have to wait for the lock to expire. The default processEvery interval is '5 seconds'.

agenda.processEvery('1 minute');

You can also specify it during instantiation

const agenda = new Agenda({ processEvery: '30 seconds' });

maxConcurrency(number)

Takes a number which specifies the max number of jobs that can be running at any given moment. By default it is 20.

agenda.maxConcurrency(20);

You can also specify it during instantiation

const agenda = new Agenda({ maxConcurrency: 20 });

defaultConcurrency(number)

Takes a number which specifies the default number of a specific job that can be running at any given moment. By default it is 5.

agenda.defaultConcurrency(5);

You can also specify it during instantiation

const agenda = new Agenda({ defaultConcurrency: 5 });

lockLimit(number)

Takes a number which specifies the max number of jobs that can be locked at any given moment. By default it is 0 for no max.

agenda.lockLimit(0);

You can also specify it during instantiation

const agenda = new Agenda({ lockLimit: 0 });

defaultLockLimit(number)

Takes a number which specifies the default number of a specific job that can be locked at any given moment. By default it is 0 for no max.

agenda.defaultLockLimit(0);

You can also specify it during instantiation

const agenda = new Agenda({ defaultLockLimit: 0 });

defaultLockLifetime(number)

Takes a number which specifies the default lock lifetime in milliseconds. By default it is 10 minutes. This can be overridden by specifying the lockLifetime option to a defined job.

A job will unlock if it is finished (i.e. the returned Promise resolves/rejects, or done is specified in the params and done() is called) before the lockLifetime. The lock is useful if the job crashes or times out.

agenda.defaultLockLifetime(10000);

You can also specify it during instantiation

const agenda = new Agenda({ defaultLockLifetime: 10000 });

sort(query)

Takes a query which specifies the sort query to be used for finding and locking the next job.

By default it is { nextRunAt: 1, priority: -1 }, which obeys a first in first out approach, with respect to priority.
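As a rough in-memory picture of what this sort object means, the sketch below orders plain objects the way a MongoDB sort of { nextRunAt: 1, priority: -1 } would: ascending by nextRunAt, then descending by priority for jobs due at the same time. The comparator compareJobs and the sample jobs are made up for illustration. Agenda itself hands the sort object to MongoDB and does not sort in JavaScript like this.

```javascript
// In-memory illustration of the ordering { nextRunAt: 1, priority: -1 }.
function compareJobs(a, b) {
	// nextRunAt ascending: the job that is due earlier comes first.
	if (a.nextRunAt.getTime() !== b.nextRunAt.getTime()) {
		return a.nextRunAt.getTime() - b.nextRunAt.getTime();
	}
	// priority descending: among jobs due at the same time, higher priority wins.
	return b.priority - a.priority;
}

// Sample data, invented for this sketch.
const jobs = [
	{ name: 'low, due 15:00', nextRunAt: new Date('2024-01-01T15:00:00Z'), priority: -10 },
	{ name: 'high, due 15:00', nextRunAt: new Date('2024-01-01T15:00:00Z'), priority: 10 },
	{ name: 'normal, due 14:55', nextRunAt: new Date('2024-01-01T14:55:00Z'), priority: 0 }
];

const ordered = [...jobs].sort(compareJobs).map(job => job.name);
console.log(ordered);
// [ 'normal, due 14:55', 'high, due 15:00', 'low, due 15:00' ]
```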

disableAutoIndex(boolean)

Optional. Disables the automatic creation of the default index on the jobs table. By default, Agenda creates an index to optimize its queries against Mongo while processing jobs.

This is useful if you want to use your own index in specific use-cases.

Agenda Events

An instance of an agenda will emit the following events:

  • ready - called when Agenda mongo connection is successfully opened and indices created. If you're passing agenda an existing connection, you shouldn't need to listen for this, as agenda.start() will not resolve until indices have been created. If you're using the db options, or call database, then you may still need to listen for the ready event before saving jobs. agenda.start() will still wait for the connection to be opened.
  • error - called when Agenda mongo connection process has thrown an error
await agenda.start();

Defining Job Processors

Before you can use a job, you must define its processing behavior.

define(jobName, fn, [options])

Defines a job with the name of jobName. When a job of jobName gets run, it will be passed to fn(job, done). To maintain asynchronous behavior, you may either provide a Promise-returning function in fn or provide done as a second parameter to fn. If done is specified in the function signature, you must call done() when you are done processing the job. If your function is synchronous or returns a Promise, you may omit done from the signature.

options is an optional argument which can overwrite the defaults. It can take the following:

  • concurrency: number maximum number of that job that can be running at once (per instance of agenda)
  • lockLimit: number maximum number of that job that can be locked at once (per instance of agenda)
  • lockLifetime: number interval in ms of how long the job stays locked for (see multiple job processors for more info). A job will automatically unlock once a returned promise resolves/rejects (or if done is specified in the signature and done() is called).
  • priority: (lowest|low|normal|high|highest|number) specifies the priority of the job. Higher priority jobs will run first. See the priority mapping below
  • shouldSaveResult: boolean flag that specifies whether the result of the job should also be stored in the database. Defaults to false

Priority mapping:

{
  highest: 20,
  high: 10,
  normal: 0,
  low: -10,
  lowest: -20
}
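A small sketch of how a priority option could be turned into a number using the mapping above. resolvePriority is a hypothetical helper, not Agenda's API. How Agenda treats names outside the table is not covered here, so this sketch simply rejects them.

```javascript
// Values copied from the priority mapping above.
const PRIORITY_MAP = { highest: 20, high: 10, normal: 0, low: -10, lowest: -20 };

// Hypothetical helper: resolves a priority option to a number.
function resolvePriority(priority) {
	// Numbers are used as they are.
	if (typeof priority === 'number') return priority;
	// Known names are looked up in the table.
	if (Object.prototype.hasOwnProperty.call(PRIORITY_MAP, priority)) {
		return PRIORITY_MAP[priority];
	}
	// Assumption of this sketch: unknown names are an error.
	throw new Error(`Unknown priority: ${priority}`);
}

console.log(resolvePriority('high')); // 10
console.log(resolvePriority('lowest')); // -20
console.log(resolvePriority(5)); // 5
```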

Async Job:

agenda.define('some long running job', async job => {
	const data = await doSomelengthyTask();
	await formatThatData(data);
	await sendThatData(data);
});

Async Job (using done):

agenda.define('some long running job', (job, done) => {
	doSomelengthyTask(data => {
		formatThatData(data);
		sendThatData(data);
		done();
	});
});

Sync Job:

agenda.define('say hello', job => {
	console.log('Hello!');
});

define() acts like an assignment: if define(jobName, ...) is called multiple times (e.g. every time your script starts), the definition in the last call will overwrite the previous one. Thus, if you define the jobName only once in your code, it's safe for that call to execute multiple times.
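The "acts like an assignment" behavior can be pictured with a plain Map keyed by job name. This is only a stand-in to show the overwrite semantics. The definitions registry and this define function are assumptions of the sketch, not Agenda's internals.

```javascript
// Hypothetical stand-in for a definitions registry keyed by job name.
const definitions = new Map();

function define(jobName, fn, options = {}) {
	// set() replaces any earlier entry stored under the same name.
	definitions.set(jobName, { fn, options });
}

define('say hello', () => 'first');
define('say hello', () => 'second', { priority: 'high' });

console.log(definitions.size); // 1
console.log(definitions.get('say hello').fn()); // second
```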

Creating Jobs

every(interval, name, [data], [options])

Runs job name at the given interval. Optionally, data and options can be passed in. Every creates a job of type single, which means that it will only create one job in the database, even if that line is run multiple times. This lets you put it in a file that may get run multiple times, such as webserver.js which may reboot from time to time.

interval can be a human-readable format String, a cron format String, or a Number.

data is an optional argument that will be passed to the processing function under job.attrs.data.

options is an optional argument that will be passed to job.repeatEvery. In order to use this argument, data must also be specified.

Returns the job.

agenda.define('printAnalyticsReport', async job => {
	const users = await User.doSomethingReallyIntensive();
	processUserData(users);
	console.log('I print a report!');
});

agenda.every('15 minutes', 'printAnalyticsReport');

Optionally, name can be an array of job names, which is convenient for scheduling different jobs at the same interval.

agenda.every('15 minutes', ['printAnalyticsReport', 'sendNotifications', 'updateUserRecords']);

In this case, every returns an array of jobs.

schedule(when, name, [data])

Schedules a job to run name once at a given time. when can be a Date or a String such as tomorrow at 5pm.

data is an optional argument that will be passed to the processing function under job.attrs.data.

Returns the job.

agenda.schedule('tomorrow at noon', 'printAnalyticsReport', { userCount: 100 });

Optionally, name can be an array of job names, similar to the every method.

agenda.schedule('tomorrow at noon', [
	'printAnalyticsReport',
	'sendNotifications',
	'updateUserRecords'
]);

In this case, schedule returns an array of jobs.

now(name, [data])

Schedules a job to run name once immediately.

data is an optional argument that will be passed to the processing function under job.attrs.data.

Returns the job.

agenda.now('do the hokey pokey');

create(jobName, data)

Returns an instance of a jobName with data. This does NOT save the job in the database. See below to learn how to manually work with jobs.

const job = agenda.create('printAnalyticsReport', { userCount: 100 });
await job.save();
console.log('Job successfully saved');

Managing Jobs

jobs(mongodb-native query, mongodb-native sort, mongodb-native limit, mongodb-native skip)

Lets you query (then sort, limit and skip the result) all of the jobs in the agenda jobs database. These are full mongodb-native find, sort, limit and skip commands. See mongodb-native's documentation for details.

const jobs = await agenda.jobs({ name: 'printAnalyticsReport' }, { data: -1 }, 3, 1);
// Work with jobs (see below)

cancel(mongodb-native query)

Cancels any jobs matching the passed mongodb-native query, and removes them from the database. Returns a Promise resolving to the number of cancelled jobs, or rejecting on error.

const numRemoved = await agenda.cancel({ name: 'printAnalyticsReport' });

This functionality can also be achieved by first retrieving all the jobs from the database using agenda.jobs(), looping through the resulting array and calling job.remove() on each. It is however preferable to use agenda.cancel() for this use case, as this ensures the operation is atomic.

disable(mongodb-native query)

Disables any jobs matching the passed mongodb-native query, preventing any matching jobs from being run by the Job Processor.

const numDisabled = await agenda.disable({ name: 'pollExternalService' });

Similar to agenda.cancel(), this functionality can be achieved with a combination of agenda.jobs() and job.disable().

enable(mongodb-native query)

Enables any jobs matching the passed mongodb-native query, allowing any matching jobs to be run by the Job Processor.

const numEnabled = await agenda.enable({ name: 'pollExternalService' });

Similar to agenda.cancel(), this functionality can be achieved with a combination of agenda.jobs() and job.enable().

purge()

Removes all jobs in the database without defined behaviors. Useful if you change a definition name and want to remove old jobs. Returns a Promise resolving to the number of removed jobs, or rejecting on error.

IMPORTANT: Do not run this before you finish defining all of your jobs. If you do, you will nuke your database of jobs.

const numRemoved = await agenda.purge();

Starting the job processor

To get agenda to start processing jobs from the database you must start it. This will schedule an interval (based on processEvery) to check for new jobs and run them. You can also stop the queue.

start

Starts the job queue processing, checking processEvery time to see if there are new jobs. Must be called after processEvery, and before any job scheduling (e.g. every).

stop

Stops the job queue processing. Unlocks currently running jobs.

This can be very useful for graceful shutdowns: currently running or grabbed jobs are abandoned and unlocked, so that other job queues can grab them, or this queue can pick them up should it start again. Here is an example of how to do a graceful shutdown.

async function graceful() {
	await agenda.stop();
	process.exit(0);
}

process.on('SIGTERM', graceful);
process.on('SIGINT', graceful);

Multiple job processors

Sometimes you may want to have multiple node instances / machines process from the same queue. Agenda supports a locking mechanism to ensure that multiple queues don't process the same job.

You can configure the locking mechanism by specifying lockLifetime as an interval when defining the job.

agenda.define(
	'someJob',
	(job, cb) => {
		// Do something in 10 seconds or less...
		cb(); // signal completion so the job unlocks
	},
	{ lockLifetime: 10000 }
);

This will ensure that no other job processor (this one included) attempts to run the job again for the next 10 seconds. If you have a particularly long running job, you will want to specify a longer lockLifetime.

By default it is 10 minutes. Typically you shouldn't have a job that runs for 10 minutes, so this is really insurance should the job queue crash before the job is unlocked.

When a job is finished (i.e. the returned promise resolves/rejects or done is specified in the signature and done() is called), it will automatically unlock.
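The idea behind lockLifetime can be sketched as a simple time comparison. canBeLocked is a hypothetical helper that only mirrors the description above: a job is available when it carries no lock, or when its lock is older than the lock lifetime. Agenda's real check happens in its database query.

```javascript
// Hypothetical helper: decides whether a processor may pick the job up.
function canBeLocked(lockedAt, lockLifetimeMs, now = new Date()) {
	// A job that is not locked at all is available.
	if (!lockedAt) return true;
	// Otherwise the lock only counts while it is younger than lockLifetime.
	return now.getTime() - lockedAt.getTime() >= lockLifetimeMs;
}

const lockedAt = new Date('2024-01-01T12:00:00Z');
const lockLifetime = 10000; // 10 seconds, as in the example above

console.log(canBeLocked(null, lockLifetime)); // true
console.log(canBeLocked(lockedAt, lockLifetime, new Date('2024-01-01T12:00:05Z'))); // false
console.log(canBeLocked(lockedAt, lockLifetime, new Date('2024-01-01T12:00:15Z'))); // true
```

In this picture, a crashed processor never clears lockedAt, so the job becomes available again only once the lifetime has passed.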

Manually working with a job

A job instance has many instance methods. All mutating methods must be followed with a call to await job.save() in order to persist the changes to the database.

repeatEvery(interval, [options])

Specifies an interval on which the job should repeat. The job runs at the time of defining as well as at the configured intervals, that is, "run now and in intervals".

interval can be a human-readable format String, a cron format String, or a Number.

options is an optional argument containing:

options.timezone: should be a string as accepted by moment-timezone and is considered when using an interval in the cron string format.

options.skipImmediate: true | false (default). Setting this to true will skip the immediate run, so the first run occurs only after the configured interval.

options.startDate: Date. The first run of the job will be on or after the start date.

options.endDate: Date. The job will not repeat after the endDate. It can run on the end date itself, but not after that.

options.skipDays: human-readable string ('2 days'). After each run, the job will skip the duration of skipDays.

job.repeatEvery('10 minutes');
await job.save();

job.repeatEvery('3 minutes', {
	skipImmediate: true
});
await job.save();

job.repeatEvery('0 6 * * *', {
	timezone: 'America/New_York'
});
await job.save();
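The effect of skipImmediate can be pictured with a small sketch. It assumes a fixed interval given in milliseconds (cron strings, timezones and the other options are not modelled), and firstRunAt is a hypothetical helper, not part of Agenda's API.

```javascript
// Hypothetical helper; models only a fixed interval given in milliseconds.
function firstRunAt(now, intervalMs, { skipImmediate = false } = {}) {
	// Default behavior: "run now and in intervals".
	if (!skipImmediate) return new Date(now.getTime());
	// skipImmediate: wait one full interval before the first run.
	return new Date(now.getTime() + intervalMs);
}

const now = new Date('2024-01-01T12:00:00Z');
const threeMinutes = 3 * 60 * 1000;

console.log(firstRunAt(now, threeMinutes).toISOString());
// 2024-01-01T12:00:00.000Z
console.log(firstRunAt(now, threeMinutes, { skipImmediate: true }).toISOString());
// 2024-01-01T12:03:00.000Z
```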

repeatAt(time)

Specifies a time when the job should repeat. Possible values

job.repeatAt('3:30pm');
await job.save();

schedule(time)

Specifies the next time at which the job should run.

job.schedule('tomorrow at 6pm');
await job.save();

priority(priority)

Specifies the priority weighting of the job. Can be a number or a string from the above priority table.

job.priority('low');
await job.save();

setShouldSaveResult(setShouldSaveResult)

Specifies whether the result of the job should also be stored in the database. Defaults to false.

job.setShouldSaveResult(true);
await job.save();

The data returned by the job will be available on the result attribute after it succeeded and got retrieved again from the database (e.g. via agenda.jobs(...) or through the success job event).

unique(properties, [options])

Ensures that only one instance of this job exists with the specified properties.

options is an optional argument which can overwrite the defaults. It can take the following:

  • insertOnly: boolean will prevent any properties from persisting if the job already exists. Defaults to false.

job.unique({ 'data.type': 'active', 'data.userId': '123', nextRunAt: date });
await job.save();

IMPORTANT: To avoid high CPU usage by MongoDB, make sure to create an index on the used fields, like data.type and data.userId for the example above.
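The keys passed to unique() are dotted paths into the job's attributes. As a rough in-memory picture of what "a job with the specified properties already exists" means, the sketch below compares such paths against a plain object. getPath and matchesUnique are hypothetical helpers written for this illustration. The actual check is performed against the database, which is why the index mentioned above matters.

```javascript
// Resolve a dotted path such as 'data.userId' against a plain object.
function getPath(obj, path) {
	return path
		.split('.')
		.reduce((value, key) => (value == null ? undefined : value[key]), obj);
}

// Hypothetical helper: true when an existing job matches every unique property.
// Only primitive values (strings, numbers, booleans) are compared in this sketch.
function matchesUnique(existingAttrs, uniqueProps) {
	return Object.entries(uniqueProps).every(
		([path, expected]) => getPath(existingAttrs, path) === expected
	);
}

// Sample job attributes, invented for this sketch.
const existing = { name: 'sync user', data: { type: 'active', userId: '123' } };

console.log(matchesUnique(existing, { 'data.type': 'active', 'data.userId': '123' })); // true
console.log(matchesUnique(existing, { 'data.type': 'active', 'data.userId': '456' })); // false
```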

fail(reason)

Sets job.attrs.failedAt to now, and sets job.attrs.failReason to reason.

Optionally, reason can be an error, in which case job.attrs.failReason will be set to error.message

job.fail('insufficient disk space');
// or
job.fail(new Error('insufficient disk space'));
await job.save();

run(callback)

Runs the given job and calls callback(err, job) upon completion. Normally you never need to call this manually.

job.run((err, job) => {
	console.log("I don't know why you would need to do this...");
});

save()

Saves the job.attrs into the database. Returns a Promise resolving to a Job instance, or rejecting on error.

try {
	await job.save();
	console.log('Successfully saved job to collection');
} catch (e) {
	console.error('Error saving job to collection');
}

remove()

Removes the job from the database. Returns a Promise resolving to the number of jobs removed, or rejecting on error.

try {
	await job.remove();
	console.log('Successfully removed job from collection');
} catch (e) {
	console.error('Error removing job from collection');
}

disable()

Disables the job. Upcoming runs won't execute.

enable()

Enables the job if it got disabled before. Upcoming runs will execute.

touch()

Resets the lock on the job. Useful to indicate that the job hasn't timed out when you have very long running jobs. The call returns a promise that resolves when the job's lock has been renewed.

agenda.define('super long job', async job => {
	await doSomeLongTask();
	await job.touch();
	await doAnotherLongTask();
	await job.touch();
	await finishOurLongTasks();
});

Job Queue Events

An instance of an agenda will emit the following events:

  • start - called just before a job starts
  • start:job name - called just before the specified job starts
agenda.on('start', job => {
	console.log('Job %s starting', job.attrs.name);
});
  • complete - called when a job finishes, regardless of if it succeeds or fails
  • complete:job name - called when a job finishes, regardless of if it succeeds or fails
agenda.on('complete', job => {
	console.log(`Job ${job.attrs.name} finished`);
});
  • success - called when a job finishes successfully
  • success:job name - called when a job finishes successfully
agenda.on('success:send email', job => {
	console.log(`Sent Email Successfully to ${job.attrs.data.to}`);
});
  • fail - called when a job throws an error
  • fail:job name - called when a job throws an error
agenda.on('fail:send email', (err, job) => {
	console.log(`Job failed with error: ${err.message}`);
});
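The naming pattern above (a general event plus an "event:job name" variant) can be tried out without a database using Node's built-in EventEmitter. The emitter and the emitJobEvent helper below are stand-ins written for this sketch. A real Agenda instance emits these events itself.

```javascript
const { EventEmitter } = require('events');

// Stand-in emitter; a real Agenda instance emits these events itself.
const emitter = new EventEmitter();
const seen = [];

// Listener for every job, and one for a specific job name.
emitter.on('success', job => seen.push(`any job: ${job.attrs.name}`));
emitter.on('success:send email', job => seen.push(`specific: ${job.attrs.name}`));

// Hypothetical helper showing the "<event>" plus "<event>:<job name>" pattern.
function emitJobEvent(event, job) {
	emitter.emit(event, job);
	emitter.emit(`${event}:${job.attrs.name}`, job);
}

emitJobEvent('success', { attrs: { name: 'send email' } });
emitJobEvent('success', { attrs: { name: 'other job' } });

console.log(seen);
// [ 'any job: send email', 'specific: send email', 'any job: other job' ]
```

The job-specific listener only fires for its own job name, which keeps per-job handling out of the general handlers.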

Frequently Asked Questions

What is the order in which jobs run?

Jobs are run with priority in a first in first out order (so they will be run in the order they were scheduled AND with respect to highest priority).

For example, if we have two jobs named "send-email" queued with the same priority, the first at 3:00 PM and the second at 3:05 PM, then the first job will run first if we start to send "send-email" jobs at 3:10 PM. However, if the first job has a priority of 5 and the second job has a priority of 10, then the second will run first (priority takes precedence) at 3:10 PM.

The default MongoDB sort object is { nextRunAt: 1, priority: -1 } and can be changed through the option sort when configuring Agenda.

What is the difference between lockLimit and maxConcurrency?

Agenda will lock jobs one by one, setting the lockedAt property in MongoDB, and creating an instance of the Job class which it caches into the _lockedJobs array. This defaults to having no limit, but can be managed using lockLimit. If all jobs will need to be run before agenda's next interval (set via agenda.processEvery), then agenda will attempt to lock all jobs.

Agenda will also pull jobs from _lockedJobs and into _runningJobs. These jobs are actively being worked on by user code, and this is limited by maxConcurrency (defaults to 20).

If you have multiple instances of agenda processing the same job definition with a fast repeat time you may find they get unevenly loaded. This is because they will compete to lock as many jobs as possible, even if they don't have enough concurrency to process them. This can be resolved by tweaking the maxConcurrency and lockLimit properties.
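The interplay of the two limits can be sketched with a very simplified model of a single processing pass. processPass is a hypothetical function, not Agenda's scheduler: it only counts how many due jobs end up running, locked but waiting, or left unlocked (and therefore still available to other instances).

```javascript
// Simplified model of one processing pass; not Agenda's actual scheduler.
function processPass(dueJobs, { lockLimit = 0, maxConcurrency = 20 } = {}) {
	// Step 1: lock due jobs one by one; a lockLimit of 0 means "no limit".
	const lockable = lockLimit > 0 ? Math.min(dueJobs.length, lockLimit) : dueJobs.length;
	const locked = dueJobs.slice(0, lockable);

	// Step 2: move locked jobs to "running" until maxConcurrency is reached.
	const running = locked.slice(0, maxConcurrency);
	const waiting = locked.slice(maxConcurrency);

	return {
		running: running.length,
		lockedButWaiting: waiting.length,
		notLocked: dueJobs.length - locked.length
	};
}

// 100 due jobs, invented for this sketch.
const dueJobs = Array.from({ length: 100 }, (_, i) => `job-${i}`);

console.log(processPass(dueJobs));
// { running: 20, lockedButWaiting: 80, notLocked: 0 }
console.log(processPass(dueJobs, { lockLimit: 30, maxConcurrency: 20 }));
// { running: 20, lockedButWaiting: 10, notLocked: 70 }
```

With no lockLimit, one instance holds all 100 jobs while only running 20. With a lockLimit of 30, 70 jobs stay unlocked for other instances.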

Sample Project Structure?

Agenda doesn't have a preferred project structure and leaves it to the user to choose how they would like to use it. That being said, you can check out the example project structure below.

Can I Donate?

Thanks! I'm flattered, but it's really not necessary. If you really want to, you can find my gittip here.

Web Interface?

Agenda itself does not have a web interface built in, but we do offer a stand-alone web interface, Agendash.

Mongo vs Redis

The decision to use Mongo instead of Redis is intentional. Redis is often used for non-essential data (such as sessions) and without configuration doesn't guarantee the same level of persistence as Mongo (should the server need to be restarted/crash).

Agenda decides to focus on persistence without requiring special configuration of Redis (thereby degrading the performance of the Redis server on non-critical data, such as sessions).

Ultimately if enough people want a Redis driver instead of Mongo, I will write one. (Please open an issue requesting it). For now, Agenda decided to focus on guaranteed persistence.

Spawning / forking processes

Ultimately Agenda can work from a single job queue across multiple machines, node processes, or forks. If you are interested in having more than one worker, Bars3s has written up a fantastic example of how one might do it:

const cluster = require('cluster');
const os = require('os');

const httpServer = require('./app/http-server');
const jobWorker = require('./app/job-worker');

const jobWorkers = [];
const webWorkers = [];

if (cluster.isMaster) {
	const cpuCount = os.cpus().length;
	// Create a worker for each CPU
	for (let i = 0; i < cpuCount; i += 1) {
		addJobWorker();
		addWebWorker();
	}

	cluster.on('exit', (worker, code, signal) => {
		if (jobWorkers.indexOf(worker.id) !== -1) {
			console.log(
				`job worker ${worker.process.pid} exited (signal: ${signal}). Trying to respawn...`
			);
			removeJobWorker(worker.id);
			addJobWorker();
		}

		if (webWorkers.indexOf(worker.id) !== -1) {
			console.log(
				`http worker ${worker.process.pid} exited (signal: ${signal}). Trying to respawn...`
			);
			removeWebWorker(worker.id);
			addWebWorker();
		}
	});
} else {
	if (process.env.web) {
		console.log(`start http server: ${cluster.worker.id}`);
		// Initialize the http server here
		httpServer.start();
	}

	if (process.env.job) {
		console.log(`start job server: ${cluster.worker.id}`);
		// Initialize the Agenda here
		jobWorker.start();
	}
}

function addWebWorker() {
	webWorkers.push(cluster.fork({ web: 1 }).id);
}

function addJobWorker() {
	jobWorkers.push(cluster.fork({ job: 1 }).id);
}

function removeWebWorker(id) {
	webWorkers.splice(webWorkers.indexOf(id), 1);
}

function removeJobWorker(id) {
	jobWorkers.splice(jobWorkers.indexOf(id), 1);
}

Recovering lost Mongo connections ("auto_reconnect")

Agenda is configured by default to automatically reconnect indefinitely, emitting an error event when no connection is available on each process tick, allowing you to restore the Mongo instance without having to restart the application.

However, if you are using an existing Mongo client you'll need to configure the reconnectTries and reconnectInterval connection settings manually, otherwise you'll find that Agenda will throw an error with the message "MongoDB connection is not recoverable, application restart required" if the connection cannot be recovered within 30 seconds.

Example Project Structure

Agenda will only process jobs that it has definitions for. This allows you to selectively choose which jobs a given agenda will process.

Consider the following project structure, which allows us to share models with the rest of our code base, and specify which jobs a worker processes, if any at all.

- server.js
- worker.js
lib/
  - agenda.js
  controllers/
    - user-controller.js
  jobs/
    - email.js
    - video-processing.js
    - image-processing.js
   models/
     - user-model.js
     - blog-post.model.js

Sample job processor (eg. jobs/email.js)

let email = require('some-email-lib'),
	User = require('../models/user-model.js');

module.exports = function (agenda) {
	agenda.define('registration email', async job => {
		const user = await User.get(job.attrs.data.userId);
		await email(user.email(), 'Thanks for registering', 'Thanks for registering ' + user.name());
	});

	agenda.define('reset password', async job => {
		// Etc
	});

	// More email related jobs
};

lib/agenda.js

const Agenda = require('agenda');

const connectionOpts = { db: { address: 'localhost:27017/agenda-test', collection: 'agendaJobs' } };

const agenda = new Agenda(connectionOpts);

const jobTypes = process.env.JOB_TYPES ? process.env.JOB_TYPES.split(',') : [];

jobTypes.forEach(type => {
	require('./jobs/' + type)(agenda);
});

if (jobTypes.length) {
	agenda.start(); // Returns a promise, which should be handled appropriately
}

module.exports = agenda;

lib/controllers/user-controller.js

const express = require('express');

let app = express(),
	User = require('../models/user-model'),
	agenda = require('../agenda.js'); // lib/agenda.js exports the agenda instance

app.post('/users', (req, res, next) => {
	const user = new User(req.body);
	user.save(err => {
		if (err) {
			return next(err);
		}
		agenda.now('registration email', { userId: user.primary() });
		res.status(201).send(user.toJson());
	});
});

worker.js

require('./lib/agenda.js');

Now you can do the following in your project:

node server.js

Fire up an instance with no JOB_TYPES, giving you the ability to schedule jobs, but not wasting resources processing jobs.

JOB_TYPES=email node server.js

Allow your http server to process email jobs.

JOB_TYPES=email node worker.js

Fire up an instance that processes email jobs.

JOB_TYPES=video-processing,image-processing node worker.js

Fire up an instance that processes video-processing/image-processing jobs. Good for a heavy hitting server.

Debugging Issues

If you think you have encountered a bug, please feel free to report it here:

Submit Issue

Please provide us with as much detail as possible, such as:

  • Agenda version
  • Environment (OSX, Linux, Windows, etc)
  • Small description of what happened
  • Any relevant stack trace
  • Agenda logs (see below)

To turn on logging, please set your DEBUG env variable like so:

  • OSX: DEBUG="agenda:*" ts-node src/index.ts
  • Linux: DEBUG="agenda:*" ts-node src/index.ts
  • Windows CMD: set DEBUG=agenda:*
  • Windows PowerShell: $env:DEBUG = "agenda:*"

While not necessary, attaching a text file with this debug information would be extremely useful in debugging certain issues and is encouraged.

Known Issues

"Multiple order-by items are not supported. Please specify a single order-by item."

When running Agenda on Azure Cosmos DB, you might run into this issue. It is caused by the sort query Agenda uses for finding and locking the next job. To fix this, you can pass a custom sort option: sort: { nextRunAt: 1 }
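As a configuration sketch, the option could sit alongside the connection settings. This assumes the constructor accepts sort next to db, as the sentence above suggests; the address and collection are the placeholder values used elsewhere in this README.

```javascript
// Options object intended for `new Agenda(agendaOptions)`.
const agendaOptions = {
  db: { address: 'localhost:27017/agenda-test', collection: 'agendaJobs' },
  // Single sort key, because Cosmos DB rejects multiple order-by items.
  sort: { nextRunAt: 1 }
};

console.log(agendaOptions);
```

With only nextRunAt in the sort, job priority presumably no longer influences which job is picked first, so treat this as a trade-off rather than a drop-in fix.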

Performance

It is recommended to set this index if you use agendash:

db.agendaJobs.createIndex({
    "nextRunAt" : -1,
    "lastRunAt" : -1,
    "lastFinishedAt" : -1
}, { "name" : "agendash2" })

If you have one job definition with thousands of instances, you can add this index to speed up the internal sorting query:

db.agendaJobs.createIndex({
    "name" : 1,
    "disabled" : 1,
    "lockedAt" : 1
}, { "name" : "findAndLockDeadJobs" })

Sandboxed Worker - use child processes

Jobs can be started in a child process. This helps, for example, to separate long-running jobs from the main process: if one process consumes too much memory and gets killed, it does not affect any others. To use this feature, several steps are required.

1.) Create a childWorker helper. The subprocess has a completely separate context, so there are no database connections or anything else that can be shared. Therefore you have to ensure that all required connections and initializations are done here too. You also have to load the correct job definition so that agenda knows what code it must execute. For this reason, three parameters are passed to the childWorker: name, jobId and the path to the job definition.

An example file can look like this:

childWorker.ts

import 'reflect-metadata';

process.on('message', message => {
  if (message === 'cancel') {
    process.exit(2);
  } else {
    console.log('got message', message);
  }
});

(async () => {
	const mongooseConnection = /** connect to database */

  /** do other required initializations */

  // get process arguments (name, jobId and path to agenda definition file)
	const [, , name, jobId, agendaDefinition] = process.argv;

  // set fancy process title
	process.title = `${process.title} (sub worker: ${name}/${jobId})`;

  // initialize Agenda in "forkedWorker" mode
	const agenda = new Agenda({ name: `subworker-${name}`, forkedWorker: true });
	// connect agenda (but do not start it)
	await agenda.mongo(mongooseConnection.db as any);

	if (!name || !jobId) {
		throw new Error(`invalid parameters: ${JSON.stringify(process.argv)}`);
	}

  // load job definition
  /** in this case the file is for example ../some/path/definitions.js
  with a content like:
  export default (agenda: Agenda, definitionOnly = false) => {
    agenda.define(
      'some job',
      async (notification: {
        attrs: { data: { dealId: string; orderId: TypeObjectId<IOrder> } };
      }) => {
        // do something
      }
    );

    if (!definitionOnly) {
        // here you can create scheduled jobs or other things
    }
  };
  */
	if (agendaDefinition) {
		const loadDefinition = await import(agendaDefinition);
		(loadDefinition.default || loadDefinition)(agenda, true);
	}

  // run this job now
	await agenda.runForkedJob(jobId);

  // disconnect database and exit
	process.exit(0);
})().catch(err => {
	console.error('err', err);
	if (process.send) {
		process.send(JSON.stringify(err));
	}
	process.exit(1);
});

Make sure that only job definitions are registered during this step; otherwise you create overhead (e.g. if you create new jobs inside the definition files). That's why I call the definition file with agenda and a second parameter that is set to true. If this parameter is true, I do not initialize any jobs (create jobs etc.).

2.) To use this, you have to enable it on a job. Set forkMode to true:

const job = agenda.create('some job', { meep: 1 });
job.forkMode(true);
await job.save();
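One detail in the childWorker example's catch handler is worth noting: JSON.stringify applied to an Error instance yields "{}", because message and stack are not enumerable properties. How the parent process consumes the message is not shown in this README, so the following is only a sketch under the assumption that the parent wants the message and stack; serializeError is a hypothetical helper, not part of Agenda.

```javascript
// Hypothetical helper: convert any thrown value into a plain object
// that survives JSON.stringify.
function serializeError(err) {
  if (err instanceof Error) {
    return { name: err.name, message: err.message, stack: err.stack };
  }
  return { message: String(err) };
}

const failure = new Error('job failed');
console.log(JSON.stringify(failure));                 // prints {}
console.log(JSON.stringify(serializeError(failure))); // includes name, message and stack
```

In the childWorker, process.send(JSON.stringify(serializeError(err))) would then carry the failure reason to the parent.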

Acknowledgements

License

The MIT License

agenda's People

Contributors

albert-iv, alexbeletsky, bars3s, bompus, camhenlin, dandv, dependabot[bot], emhagman, fiznool, harisvsulaiman, ilanbiala, jakeorr, jdiamond, joeframbach, koresar, leonardlin, loris, niftylettuce, nwkeeley, omgimalexis, pdfowler, renovate[bot], rschmukler, shakefu, simison, simllll, thebestnom, uzlopak, vkarpov15, wingsbob

agenda's Issues

ReferenceError: Agenda is not defined

I know this is a total noob question, but how do I include this? I keep getting

ReferenceError: Agenda is not defined

var agenda = new Agenda();

Optionally remove old jobs

When I decide to change one job name it happens that the old job still exists in Mongo and if I forgot to change the execution name on an event (let's imagine I have this job triggered from several places at my app), the old job gets executed instead of the new one.

'502 socket hang up error'

Hi,

Thanks for the great library.

I am using agenda to send a reminder email two days after some activity is added. Everything works fine on localhost, but after deploying the application with nodejitsu, I get the following error when calling an API that uses agenda:

502 socket hang up

After removing agenda from the API that uses agenda, everything works fine. Any idea why this error is occurring on deploying application?

Agenda Reruns Already Completed Jobs

It seems as though Agenda reruns jobs whenever they are defined, regardless of whether they were run before or not.

Take this example:

var mongoose        = require('mongoose');
var Agenda          = require('agenda');

var config = require('./configuration').env();

var agenda = new Agenda();
agenda.database(config.mongo);

agenda.define('testJob', function(job, done){
  console.log('testJob Running');
});

agenda.every('1 days', 'testJob');

agenda.start();

console.log('Agenda started');

When you start this program, the job runs once and waits. If you restart the server, the job gets run again, even though it was defined to be run daily and was marked as run and finished.

TestJob MongoDB Entry:

{
    "_id" : ObjectId("53149a7b6ad28fe1395c487e"),
    "data" : null,
    "failReason" : null,
    "failedAt" : null,
    "lastFinishedAt" : ISODate("2014-03-03T15:28:44.887Z"),
    "lastRunAt" : ISODate("2014-03-03T15:28:44.886Z"),
    "lockedAt" : null,
    "name" : "testJob",
    "nextRunAt" : ISODate("2014-03-04T15:28:44.886Z"),
    "priority" : 0,
    "repeatInterval" : "1 days",
    "type" : "single"
}

As we can see, the job is marked as run, and should be run next on March 4th (the next day). Is there a way of defining and setting a job up so that it doesn't automatically run?

I've also tried using the Crontab syntax ('0 5 * * 1-5'), but this job also gets run every time the server starts.

I want to define a job or two to run at a certain time every day and not have to worry about the server rerunning these daily jobs if the server restarts for some reason.

Tests are failing on master

I have tests failing locally. It looks strange, since Travis CI shows everything is green. I would appreciate suggestions:

> [email protected] test /Users/alexanderbeletsky/Development/Projects/agenda
> mocha


  ․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․

  63 passing (768ms)
  2 failing

  1) Agenda configuration methods database sets the database:
     TypeError: Cannot read property '_connect_args' of undefined
      at Context.<anonymous> (/Users/alexanderbeletsky/Development/Projects/agenda/test/agenda.js:26:33)
      at Test.Runnable.run (/Users/alexanderbeletsky/Development/Projects/agenda/node_modules/mocha/lib/runnable.js:211:32)
      at Runner.runTest (/Users/alexanderbeletsky/Development/Projects/agenda/node_modules/mocha/lib/runner.js:358:10)
      at /Users/alexanderbeletsky/Development/Projects/agenda/node_modules/mocha/lib/runner.js:404:12
      at next (/Users/alexanderbeletsky/Development/Projects/agenda/node_modules/mocha/lib/runner.js:284:14)
      at /Users/alexanderbeletsky/Development/Projects/agenda/node_modules/mocha/lib/runner.js:293:7
      at next (/Users/alexanderbeletsky/Development/Projects/agenda/node_modules/mocha/lib/runner.js:237:23)
      at Object._onImmediate (/Users/alexanderbeletsky/Development/Projects/agenda/node_modules/mocha/lib/runner.js:261:5)
      at processImmediate [as _immediateCallback] (timers.js:330:15)

  2) Agenda configuration methods database sets the collection:
     TypeError: Cannot read property '0' of undefined
      at Context.<anonymous> (/Users/alexanderbeletsky/Development/Projects/agenda/test/agenda.js:30:41)
      at Test.Runnable.run (/Users/alexanderbeletsky/Development/Projects/agenda/node_modules/mocha/lib/runnable.js:211:32)
      at Runner.runTest (/Users/alexanderbeletsky/Development/Projects/agenda/node_modules/mocha/lib/runner.js:358:10)
      at /Users/alexanderbeletsky/Development/Projects/agenda/node_modules/mocha/lib/runner.js:404:12
      at next (/Users/alexanderbeletsky/Development/Projects/agenda/node_modules/mocha/lib/runner.js:284:14)
      at /Users/alexanderbeletsky/Development/Projects/agenda/node_modules/mocha/lib/runner.js:293:7
      at next (/Users/alexanderbeletsky/Development/Projects/agenda/node_modules/mocha/lib/runner.js:237:23)
      at Object._onImmediate (/Users/alexanderbeletsky/Development/Projects/agenda/node_modules/mocha/lib/runner.js:261:5)
      at processImmediate [as _immediateCallback] (timers.js:330:15)



npm ERR! Test failed.  See above for more details.
npm ERR! not ok code 0

Rerunning jobs after server shutdown

Hey guys, Love the library. Thank you.

I just had a question about agenda and how it saves jobs.

Does saving a job mean that once the node server is shut down and restarted, it will continue to process the jobs saved in the database? If not, then what is the alternative solution?

I have tried saving jobs, but after restarting my node script, it does not run my old jobs. Here is my script:

var express = require('express')
var app = express()
var server = require('http').Server(app)
var io = require('socket.io')(server)


var Agenda = require('Agenda')
var agenda = new Agenda({ db: { address: 'localhost:27017/jobs' } })
agenda.start() // start agenda

io.on('connection', function (socket) {

    socket.on('createJob', function (data, callback) {
        var jobName = data.name;

        // create the job
        agenda.define(jobName, function (job, done) {

            // do stuff
            getReport();
            sendEmail();

            // save the job
            job.save(function (err) {
                if (!err) {
                    console.log("Saved");
                    done();
                }
            })

        })

        agenda.every('3 minutes', jobName, { data: 'random' });

    })

})

// start server
server.listen(3000, function () {
    console.log('Server started');
})

cron-style with every() not working

I want to run these jobs:

_agenda.every('1 minute', 'heartbeat', {}); // every minute
_agenda.every('35,36,37 * * * *',   'anotherjob', {}); // at 35 mins, at 36 mins, at 37 mins of each hour

Then I add a logger to monitor output:

_agenda.on('start', function (job) {
    console.log("> "+(new Date)+" : "+job.attrs.name+" // "+JSON.stringify(job.attrs.data));
});

So the output is:

> Mon Apr 07 2014 15:34:54 GMT-0300 (BRT) : heartbeat // {}
> Mon Apr 07 2014 15:35:54 GMT-0300 (BRT) : heartbeat // {}
> Mon Apr 07 2014 15:36:54 GMT-0300 (BRT) : heartbeat // {}
> Mon Apr 07 2014 15:37:54 GMT-0300 (BRT) : heartbeat // {}
> Mon Apr 07 2014 15:38:54 GMT-0300 (BRT) : heartbeat // {}

This means that my scheduled job cron-style doesn't work.

Am I doing something wrong?

purge() error on MongoDB 2.6.0

Sample code:

var Agenda, queue;

Agenda = require('agenda');

queue = new Agenda({
  db: {
    address: 'localhost:27017/test_DB'
  }
});

queue.purge(function(err, num) {
  if (err != null) {
   console.log("Error purging Agenda queue: " + err);
  }
});

Output of the script: key $not must not start with '$'.

question: every() same job with different data

I'm trying to run same job with different params.

_agenda.every('15,35,55 * * * *',   'myjob', {name:"john"});
_agenda.every('40 minutes',         'myjob', {name:"mark"});
_agenda.every('0 17,13,19 * * *',   'myjob', {name:"bill"});
_agenda.every('0 2,14 * * *',       'myjob', {name:"steve"});

I know every() saves only one doc in database. but why?

Is there any way to do what i'm trying without re-define same job function many times?

UUID Generation

I noticed you have a custom function generateUUID. Why not use node-uuid instead?

Math.random sometimes causes collisions, and node-uuid avoids them while taking advantage of the Node-JS crypto API to be more secure. It also supports timestamp based UUIDs, which might be more applicable to a scheduling library.

See the below, starting at line 169 in https://github.com/rschmukler/agenda/blob/master/lib/agenda.js

function generateUUID() {
   'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
     var r = Math.random()*16|0, v = c == 'x' ? r : (r&0x3|0x8);
     return v.toString(16);
    });
}
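For comparison, a minimal sketch of the same helper built on Node's own crypto module instead of Math.random. This is a swapped-in technique rather than the node-uuid approach proposed above, and it assumes Node.js 14.17 or later, where crypto.randomUUID is available. Note also that the snippet quoted above never returns the result of replace().

```javascript
const { randomUUID } = require('crypto');

// Version 4 UUID generated from a cryptographically secure random source.
function generateUUID() {
  return randomUUID();
}

console.log(generateUUID());
```

This only covers random (version 4) UUIDs; timestamp-based UUIDs would still need a library such as node-uuid.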

Recurring interval notation

FWIW: I have created date-frequency a while ago.

If you ever bump into limitations with the cron notation in the every method you offer, maybe it is worthwhile having a look at it. It is far from finished, but its goals are:

  • An ISO 8601 Durations based string notation. This is more readable than cron and is more extensible. For instance, cron doesn't offer n-th week of the year.
  • Allow "next occurrence after" and "occurrences between" lookups.

The project is of course open for changes/improvements should you ever consider it. It now only does what it needs to do for my own needs.

Feature Request: Record the server a job was last run on

I'm not quite sure how to structure this, but I'd like to be able to look up the job records, and see which server a job was last run on.

I'd like the hostname (os.hostname) and process id (process.pid) records. Perhaps to keep this flexible a callback could be provided which sets the server identifier for the record update?

This will help with troubleshooting - tracking what was run where.
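A rough sketch of what such an identifier callback could look like, using only Node's os and process modules. Both function names are hypothetical; nothing here is an existing Agenda API.

```javascript
const os = require('os');

// Hypothetical default: identify the worker by host name and process id.
function defaultServerIdentifier() {
  return `${os.hostname()}:${process.pid}`;
}

// Hypothetical hook: callers may pass their own function to override the default.
function resolveServerIdentifier(customIdentifier) {
  return typeof customIdentifier === 'function' ? customIdentifier() : defaultServerIdentifier();
}

console.log(resolveServerIdentifier());
console.log(resolveServerIdentifier(() => 'worker-eu-1'));
```

The returned string is what would be written to the job record on each run.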

Success event is not being propagated while async job finishes via done()

from README:

success - called when a job finishes successfully
success:job name - called when a job finishes successfully

agenda.once('success:send email', function(job) {
  console.log("Sent Email Successfully to: %s", job.attrs.data.to);
});

I use something like:

agenda.on('success', function(job) {
  console.log("Sent Email Successfully to: %s", job.attrs.data.to);
});

and this is not being executed, even though my jobs are not failing and I do receive the agenda.on('complete', ...) event.

  • There is a typo in the README: we should use agenda.on, not agenda.once.
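The difference between the two registration methods can be seen with a plain Node EventEmitter. This sketch assumes Agenda's events follow standard EventEmitter semantics; no Agenda instance is involved, and the counters exist only for illustration.

```javascript
const { EventEmitter } = require('events');

// Stand-in emitter; no Agenda instance is involved.
const emitter = new EventEmitter();
let onceCalls = 0;
let onCalls = 0;

emitter.once('success', () => { onceCalls += 1; }); // removed after the first event
emitter.on('success', () => { onCalls += 1; });     // stays registered

emitter.emit('success');
emitter.emit('success');
emitter.emit('success');

console.log({ onceCalls, onCalls }); // prints { onceCalls: 1, onCalls: 3 }
```

A once listener is therefore fine for a one-off check, while on is needed to observe every finished job.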

interface to view jobs

I am considering creating a small express middleware that will give you a big readonly view of the current jobs and their statuses.

Do you know if anything like this already exists?

issue with readme example

Hi, I took the example from the README and wrote a very simple tryAgenda.js:

Agenda = require('agenda');
var agenda = new Agenda({db: { address: 'localhost:27017/agenda-example'}});
agenda.define('a task', function(job, done) {
console.log("a task is running ... say hello");
});
agenda.every('1 minutes', 'a task');
agenda.start();

but at runtime it exits with error ...see trace http://pastebin.com/QC4JMK2t

any hints?

Prefill Job _id so it could be saved at the time of creation

Hey,

Agenda.prototype.every currently returns the job attributes, but not the id, which makes it hard to save a reference to that particular job. That could easily be remedied by creating the _id beforehand with new Mongo.ObjectID — possibly in the Job constructor.

Cheers

how to remove a job

For testing purposes, I created a job with a random name. Later in the code I renamed the job, and then I encountered an error: uncaught exception: Error: Attempted to run job undefined.

I don't see any API to remove the job or list all the jobs, or I missed it

Jobs as child processes?

It doesn't look like jobs are spun out as their own child processes. Was mainly wondering if this was a feature, or maybe I'm misreading the code somewhere. Thanks for the work!

.every() Jobs Do Not Get Marked As Complete

I'm not 100% sure yet, but it seems after the latest update .every() jobs never properly save being completed. Take this example:

var Agenda          = require('agenda');
var log             = function() { console.log.apply(this, arguments); };

var agenda = new Agenda();
agenda.database('mongodb://localhost/some-db').processEvery(100);
agenda.start();

agenda.define('testJob', function(job, done){
  done();
  log('testJob Running');
});

agenda.define('anotherJob', function(job, done) {
  done();
  log('anotherJob Running');
});

agenda.every('2 seconds', 'testJob');
agenda.now('anotherJob');


log('Agenda started');

log('finding jobs');

On a blank DB, you'll find these in your agendaJobs collection:

{
    "_id" : ObjectId("5319eb8f3b96f798eb0a3760"),
    "data" : null,
    "failReason" : null,
    "failedAt" : null,
    "lastFinishedAt" : null,
    "lastRunAt" : null,
    "lockedAt" : ISODate("2014-03-07T15:53:51.388Z"),
    "name" : "testJob",
    "nextRunAt" : ISODate("2014-03-07T15:53:51.376Z"),
    "priority" : 0,
    "repeatInterval" : "2 seconds",
    "type" : "single"
}
{
    "_id" : ObjectId("5319eb8ff86ba55343000001"),
    "data" : null,
    "failReason" : null,
    "failedAt" : null,
    "lastFinishedAt" : ISODate("2014-03-07T15:53:51.445Z"),
    "lastRunAt" : ISODate("2014-03-07T15:53:51.445Z"),
    "lockedAt" : null,
    "name" : "anotherJob",
    "nextRunAt" : null,
    "priority" : 0,
    "repeatInterval" : null,
    "type" : "normal"
}

"anotherJob" gets marked as complete, however "testJob" does not. It's pretty odd, I looked into the callback function to ensure the .save() actually updated the document.

    var jobCallback = function(err){
      if(err){

        self.fail(err);
        agenda.emit('fail', err, self);
        agenda.emit('fail:' + self.attrs.name, err, self);
      }else{
        agenda.emit('success', self);
        agenda.emit('success:' + self.attrs.name, self);
      }

      self.attrs.lastFinishedAt = new Date();
      self.attrs.lockedAt = null;
      self.save(function(saveErr, job){

        console.log('===     self.save     ===');
        console.log(arguments);

        cb && cb(err || saveErr, job);
      });
    };

You'll see the .save() callback's returned job is actually marked as completed, which is slightly baffling. Sorry for opening too many issues, I hope this helps!

Too many issues

Hi

I was testing agenda to use it in my project. To kick it off, I tried agenda.now() and encountered too many issues.

The sample test case I used prints the input string. I tried printing the numbers from 1 to n and observed these things:

  1. It did not print all the numbers; it missed some.
  2. Sometimes it printed numbers twice.
  3. After approximately 10 calls, the scheduler stops forever; I don't see any numbers printing, even if I provide input.
  4. There is at least a 2 second delay in printing the numbers.

Job progress reporting

Would it be practical to have a mechanism to report progress on a job to the database or via event emission?

Make job not run at first run?

var Agenda = require('agenda');

setInterval(function() {
  console.log(new Date)
}, 1000);

var agenda = new Agenda({db: { address: 'localhost:27017/agenda-example'}});
agenda.define('check this out yow', function(job, done) {
  console.log('checked!');
  done();
});
agenda.every('0 * * * *', 'check this out yow');
agenda.start();

I expect the check to run every hour (00:00, 01:00, ..., 23:00). Unfortunately, the job runs once as soon as the script starts, and then again when the cron pattern matches. Is there a way to keep the job from running when the script is executed?

Database Adapters

Can you allow extensibility to this package to hook up different data stores, so that we can add additional database adapters to it. I am looking for MYSQL support and if it was allowed, then we could use this package for different data sources.

Can the scheduling code know if a job ran / succeeded / failed?

I am considering porting my kue code over to agenda. Reading the docs, I do not see any way to schedule a job, wait for it to complete and then receive a job-complete event that tells you the status of the job that just ran.

I realize that this functionality is not needed in the case of recurring jobs, but it is very useful in apps that use jobs to do some heavy lifting during a request and then return some result of the job to the user.

One other question - or maybe feature request: If this type of event is supported, it is possible to pass back data from the job back to the code that initiated the job?

Updated job gets locked

In some use cases I need to update a job so that it repeats at a different interval:

agenda.jobs({
    name: 'some job'
}, function(err, jobs) {
    if(jobs.length == 0){
        //define job
        agenda.define('some job', { lockLifeTime: 1000 }, function(job, done) {
             // do something and call done();
        });
        agenda.every((interval * 60) - 5 + ' seconds', 'some job');
    }else{
        jobs[0].repeatEvery(repeatInterval + ' seconds').save();
        jobs[0].schedule('in ' + repeatInterval + ' seconds');
   }
});

However, once the job is updated, it is executed only once and then stays stuck in the following state:

{
    "_id" : ObjectId("53725383a27e17ac40300276"),
    "data" : null,
    "lastFinishedAt" : ISODate("2014-05-13T17:18:41.625Z"),
    "lastRunAt" : ISODate("2014-05-13T17:18:41.624Z"),
    "lockedAt" : ISODate("2014-05-13T17:19:09.744Z"),
    "name" : "5370c7e94f8ccf6913c87515",
    "nextRunAt" : ISODate("2014-05-13T17:19:36.624Z"),
    "priority" : 0,
    "repeatInterval" : "55 seconds",
    "type" : "single"
}

Why does the job get locked?

Multi-server lock on jobs

Having agenda be aware of other nodes in the system that connect to the jobs database will be helpful in the scenario you don't want a dedicated batch machine in your environment. This will prevent the same jobs being run twice in the same environment on different machines. (some sort of distributed lock mechanism with mongo?).

Question: Schedule and repeat

I need a job that is scheduled for a particular time and then repeated at that same time again. Something like

agenda.schedule('every saturday 12:00', 'send email');

So, it would run every saturday 12:00. Is it possible to do that?

agenda.now() returns the job.nextRunAt property, however does not save it in the database

var job = agenda.now('foo');
Inspecting the returned job shows that a nextRunAt is set accordingly.

Line #219 in test/agenda.js
describe('jobs', function() also verifies that nextRunAt is returned. However, this test does not verify what is written to the database, only what has been set up in the job before it is saved.

Line #179 in lib/agenda.js
When creating a job with .now() nextRunAt will always be <= now so props.nextRunAt will be deleted from the props object.

Line #184 in lib/agenda.js
update = { $set: props }; will never have a nextRunAt property with job.now()

Line #196 in /lib/agenda.js
Will insert this job with no nextRunAt properties.

This is currently what I am experiencing, all the job.now()'s are being entered into the database with no nextRunAt property.

** Edit - just wanted to say this is a great module and I appreciate the hard work put into it. I suspect this bug might have snuck in with the commits on 6/11 with the .every fix?

Processing items synchronously

Hello,

I have the following scenario: my queue contains three scheduled items (i.e. A, B, C). The external process that handles these items cannot be launched multiple times at the same time. For this reason, I would need Agenda to process A, wait until it's completed (or failed), then process B, wait, then finally C.

If I understand the documentation correctly, I would have to invoke Agenda like so using your example in the documentation:

agenda.define('some long running job', function(job, done) {
  doSomelengthyTask(function(data) {
    formatThatData(data);
    sendThatData(data);
    // done();
  });
});

I should remove done() because I want Agenda to process the scheduled items synchronously, correct?

Also, what is the content type and value of job and done? What should I be expecting back?

Thanks!

How are jobs persisted when server is restarted?

When I create a job to run every 10 minutes from a running node instance, it works fine. However when I restart the node server, the jobs are not started again.

While my app is initializing I do:

global.agenda = new require('agenda')({
    db: {
        address: 'USER:PASSWORD@localhost:27017/agenda',
        collection: 'jobs'
    },
    processEvery: '30 seconds',
    maxConcurrency: 100
});

global.agenda.start();

I bind a job to a model of my app when the model is created:

agenda.define(model.id, function(job, done) {
    //do something
    done();
}); 
agenda.every('10 minutes', model.id);

This works when the job is created and while nodejs is running. However when nodejs is restarted, the jobs do not run.

Am I missing something?

Need of `save` method?

I'm trying to understand the real need for the save method. With the save method, you write something like

var job = agenda.schedule('some', 'job');
job.repeatEvery('week');
job.save()

It's too verbose for my taste. It would look better with a kind of fluent interface.

agenda.schedule('some', 'job').repeatEvery('week');

Looks like methods repeatEvery, schedule, priority could do save operation inside, so you don't have to call it manually. What do you think?

"Saturday at Noon" syntax not actually supported

Hello, if you see my previous issue, I did some testing today and found that whenever I used syntax specifying the day and time like "Friday at 11:30pm" what agenda put in mongo was actually [today] at whatever time. So 'at 11:30pm' works, but specifying the day seems equivalent to not specifying it, so with 'Friday at 11:30pm' or 'AnyDay at 11:30pm' it just gets scheduled for today.

But that syntax would be an awesome feature and is in the Readme example. So if you have a moment, can you confirm that is not actually supported [yet], if you plan to support it in the future, or if you are actually certain it IS supported and somehow my test was not able to reproduce that functionality?

Thanks so much for your time.

Question; getting it working

So, is the package standalone? Do I need to run a mongodb service on the system?

var agenda = new Agenda({db: { address: 'localhost:27017/agenda-example'}});

agenda.define('delete old users', function(job, done) {
  User.remove({lastLogIn: { $lt: twoDaysAgo }}, done);
});

agenda.every('3 minutes', 'delete old users');

Trying the examples results in errors all over the place, so I'm wondering what I'm missing and what isn't explained in the docs.

Running into failed to connect to [localhost:27017], undefined function calls when I try different ways etc.

.every() Jobs Being Repeated

I know this isn't the most helpful issue opened, but after the last update pushed to npm I am seeing daily jobs get run every few hours.

I'm going to sit down tomorrow and get some test cases going, but just a heads up that something is up.

Mongodb authentication

If a mongodb instance is secured with username/password, how do I setup this in agenda?

Unlock jobs on server close

Right now the server is locking jobs and then not starting them with a setTimeout so that we get the exact run time. If you close agenda while it's in limbo, the job will likely be locked.

References #58 (I suspect).

Strange scheduling; possible time zone issue?

Hello, thanks so much for building this. It's really a big advantage over cron. I am not sure what happened; maybe this is somehow an issue with my time zone? Or maybe I did not quite understand how it works.

But I could swear that I scheduled something for Friday at 11:30pm.. unfortunately the server restarted on Wednesday at 6:30pm. Not sure how a time zone configuration could account for that, although it does end in :30.

Anyway I really appreciate a hint about how to fix this if anyone has a quick second. This is what is in my DB:

 db.agendaJobs.find()
{ "_id" : ObjectId("530430bd810cd93400000001"), "data" : {  }, "failReason" : null, "failedAt" : null, "lastFinishedAt" : ISODate("2014-02-19T23:30:08.737Z"), "lastRunAt" : ISODate("2014-02-19T23:30:04.706Z"), "lockedAt" : null, "name" : "restart scripto", "nextRunAt" : null, "priority" : 0, "repeatInterval" : null, "type" : "normal" }
{ "_id" : ObjectId("530430bd810cd93400000002"), "data" : {  }, "failReason" : null, "failedAt" : null, "lastFinishedAt" : ISODate("2014-02-19T23:30:04.737Z"), "lastRunAt" : ISODate("2014-02-19T23:30:04.710Z"), "lockedAt" : null, "name" : "restart scripto", "nextRunAt" : ISODate("2014-02-26T23:30:04.710Z"), "priority" : 0, "repeatInterval" : "1 week", "type" : "normal" }
{ "_id" : ObjectId("530430bd810cd93400000003"), "data" : {  }, "failReason" : null, "failedAt" : null, "lastFinishedAt" : ISODate("2014-02-19T23:52:44.436Z"), "lastRunAt" : ISODate("2014-02-19T23:50:04.958Z"), "lockedAt" : null, "name" : "backup", "nextRunAt" : null, "priority" : 0, "repeatInterval" : null, "type" : "normal" }
{ "_id" : ObjectId("530430bd810cd93400000004"), "data" : {  }, "failReason" : null, "failedAt" : null, "lastFinishedAt" : ISODate("2014-02-19T23:52:44.237Z"), "lastRunAt" : ISODate("2014-02-19T23:50:04.962Z"), "lockedAt" : null, "name" : "backup", "nextRunAt" : ISODate("2014-02-20T23:50:04.962Z"), "priority" : 0, "repeatInterval" : "1 day", "type" : "normal" }

And this is my code:

Agenda = require 'agenda'
childproc = require 'child_process'

restart = (job, done) ->
  e, o, er = childproc.exec! './stop'
  console.log "#{e}\n#{o}\n#{er}"

  prog = childproc.spawn 'bin/run.sh', [], {}
  prog.stderr.on 'data', (d) -> console.log d.toString()
  prog.stdout.on 'data', (d) -> console.log d.toString()
  console.log "started scripto"
  done?()
  null

backup = (job, done) ->
  e, o, er = childproc.exec! 'toffee backup.coffee'
  console.log "#{e}\n#{o}\n#{er}"
  done()
  null

restart {}

agenda = new Agenda {db: { address: 'localhost:27017/agenda' }}

agenda.define 'restart scripto', restart
agenda.define 'backup', backup

weeklyRestart = agenda.schedule 'Friday at 11:30pm', 'restart scripto', {}
weeklyRestart.repeatEvery('1 week').save()

nightlyBackup = agenda.schedule "at 11:50pm", 'backup', {}
nightlyBackup.repeatEvery('1 day').save()

agenda.start()

process.on 'uncaughtException', (er) ->
  console.log er

Thanks so much for any advice you have.

Running Jobs

Is there any way to determine what jobs are running right now?

Redis support

Redis support would be fantastic. I work for a 100% Redis outfit, and I'd like to use your module for building our scheduler system.

I know it muddies the waters somewhat, so it may be worth looking into JugglingDB (an ORM for Node with support for many databases, including Redis and Mongo). Usually I'd be loath to recommend an ORM, but this is one case that may suit such a suggestion.

Last run at 8 but locked at 10

I have two jobs running, and they both got stuck at 08:31:13 today and have been running for about 9 hours. It is strange that the lockedAt value is 10:41 while lastRunAt is 08:21 and nextRunAt is 08:31. Do you have any clue what's going on?

{
    "_id" : ObjectId("53867e6ea27e17ac40300357"),
    "data" : null,
    "lastFinishedAt" : ISODate("2014-05-29T08:21:18.834Z"),
    "lastModifiedBy" : null,
    "lastRunAt" : ISODate("2014-05-29T08:21:18.829Z"),
    "lockedAt" : ISODate("2014-05-29T10:41:22.142Z"),
    "name" : "53865113fd1bb38c07329133",
    "nextRunAt" : ISODate("2014-05-29T08:31:13.829Z"),
    "priority" : 0,
    "repeatInterval" : "595 seconds",
    "type" : "single"
}

{
    "_id" : ObjectId("53867e6ea27e17ac40300356"),
    "data" : null,
    "lastFinishedAt" : ISODate("2014-05-29T08:21:18.849Z"),
    "lastModifiedBy" : null,
    "lastRunAt" : ISODate("2014-05-29T08:21:18.846Z"),
    "lockedAt" : ISODate("2014-05-29T10:41:22.141Z"),
    "name" : "53862eb2599aed4707f7c66f",
    "nextRunAt" : ISODate("2014-05-29T08:31:13.846Z"),
    "priority" : 0,
    "repeatInterval" : "595 seconds",
    "type" : "single"
}

processEvery is set to 30 seconds.

It runs so many times?

Hi,

'use strict';

var agenda = require('agenda'),
    task = agenda({ db: { address: 'localhost:27017/agenda-example' } }),
    util = require('util');

task.define('FindInaPage', function(job, done) {
  util.log('found!');
  done();
});

task.every('0 35 * * * *', 'FindInaPage');
task.start();

Turns into:

$ node runner.js
3 Apr 20:34:59 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:00 - found!
3 Apr 20:35:01 - found!

That is so weird.

[Discussion] Consider a networked architecture

I am considering moving agenda into a networked architecture that handles things like job delegation among multiple workers. Currently this is handled by mongo and selective querying by workers.

This is spurred by comments by @coreybutler in #24.

Current System

Advantages

  • Simplified architecture: no master/slave distinction, no configuring of multiple workers, etc. Extremely easy to get started (single worker)
  • Robust: doesn't break if single worker goes down, or even if 3 workers go down. Each worker is independent

Disadvantages

  • Highly reliant on Mongo: Because things like locking, grabbing jobs, etc. are all done with queries, it is difficult to write new adapters. This gets worse if we want to add things like progress reporting.
  • Not as optimized: Currently we grab all the jobs that could be run by a single worker, which queues them and then runs them one at a time (obeying its concurrency). This means that a worker could queue (and grab) 5 jobs while only being able to run 3. Depending on timing, a second worker wouldn't grab the jobs that are left waiting. This could be fixed with better queries, locking, etc., but then we further commit to mongo
  • Events tied to workers: job events happen on the worker, but not ALL of the workers. This means that every worker must be able to handle the events, and that there's no way to handle an event for a job if you're not the worker that processed the job.
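To make the "Not as optimized" point concrete, here is a minimal in-memory sketch of the over-grabbing behaviour. It is only a model of the description above, not agenda's actual code: `grabJobs`, `simulate`, the `lockedBy` field and the worker names are all made up for illustration, and no database is involved.

```javascript
// Hypothetical model of the behaviour described above (not agenda's real code).
// A worker locks every unlocked job it sees, regardless of its concurrency.
function grabJobs(pending, workerName) {
  const grabbed = [];
  for (const job of pending) {
    if (job.lockedBy === null) {
      job.lockedBy = workerName; // stands in for the lock written to the database
      grabbed.push(job);
    }
  }
  return grabbed;
}

// Worker A grabs first with the given concurrency; worker B polls right after.
function simulate(jobCount, concurrency) {
  const pending = Array.from({ length: jobCount }, (_, i) => ({
    id: i + 1,
    lockedBy: null,
  }));

  const grabbedByA = grabJobs(pending, 'worker-a');
  const runningOnA = grabbedByA.slice(0, concurrency); // what A can actually run
  const waitingOnA = grabbedByA.slice(concurrency);    // locked but idle

  const grabbedByB = grabJobs(pending, 'worker-b');    // finds nothing left

  return {
    running: runningOnA.length,
    waiting: waitingOnA.length,
    availableToB: grabbedByB.length,
  };
}

console.log(simulate(5, 3));
// prints { running: 3, waiting: 2, availableToB: 0 }
```

With 5 jobs and a concurrency of 3, two jobs sit locked on the first worker while the second worker stays idle, which is the situation described in the bullet.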

New System

I am envisioning all workers communicating with each other via a socket-like interface. The first worker to connect will basically take a "master" role and handle all of the delegation, database querying, managing state, etc.

Workers can specify which jobs they will take, eg.

var agenda = new Agenda({server: 'agenda://127.0.0.1:29999'})

agenda.define('jobA', doJobA);
agenda.define('jobB', doJobB);
agenda.define('jobC', doJobC);

// worker A
agenda.start({only: ['jobA', 'jobB']})

// worker B
agenda.start({except: ['jobA']})
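As a rough sketch of how a "master" might use those options when delegating, assuming the `only` / `except` semantics suggested by the snippet above (neither option exists in agenda today, and `acceptsJob` and `eligibleWorkers` are hypothetical names):

```javascript
// Hypothetical sketch: `only` and `except` come from the idea above,
// they are not part of any released agenda API.
function acceptsJob(options, jobName) {
  if (options.only) return options.only.includes(jobName);
  if (options.except) return !options.except.includes(jobName);
  return true; // no filter given: the worker takes every job
}

// Options each worker would have passed to agenda.start() in the example.
const workers = {
  A: { only: ['jobA', 'jobB'] },
  B: { except: ['jobA'] },
};

// The master would consult this list before handing out a job.
function eligibleWorkers(jobName) {
  return Object.keys(workers).filter((name) =>
    acceptsJob(workers[name], jobName)
  );
}

console.log(eligibleWorkers('jobA')); // [ 'A' ]
console.log(eligibleWorkers('jobB')); // [ 'A', 'B' ]
console.log(eligibleWorkers('jobC')); // [ 'B' ]
```

Under these assumed semantics jobA can only go to worker A, jobC only to worker B, and jobB to either one.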

Advantages

  • simplified db adapters : With this, the database goes back to just storing data, not handling things like locking, messaging, etc. With this, we would (hopefully) be able to abstract it away into multiple adapters, allowing support for mongo, redis, postgres, and whatever people want
  • unified state : With things like writing a web-interface, it is currently difficult to get a full picture (how many workers are there, what are they working on, etc). This would create a "master" with an answer. This could then have its own query API (sockets would be fine) which would allow a web interface to connect to it and issue commands to get state, force processing of jobs, etc

Disadvantages

  • conceptually more complex: There's potentially more to understand, which could make it harder for new contributors to come into the project, as well as make it harder for people to use. Right now agenda seems pretty simple at first glance, and this could lose some of that
  • Massive Rewrite: Most of agenda would need to be re-written from scratch. It can be worth it, but it is a big undertaking
  • Could be more fragile: What happens when the master goes down? Yes, we can recover, but I envision it being a big pain.

Keep in mind this isn't even a proposal, just a half-baked first idea. I am hoping that people will chime in and help shape this, or tell me that it's a bad idea.

Feel free to chime in @coreybutler @bars3s @TomKaltz @mkoryak @moudy and anyone else who wants to make their opinion heard!

Removing Jobs

I had a look through the documentation and code but couldn't find any way to remove jobs?

I created a task and had it running every 10 seconds, but then decided I want to call it manually. However, it keeps running!

I tried Job#remove as mentioned in #26 but that doesn't exist:

agenda.jobs({name: 'Update Instagram'}, function(err, jobs) {
  if(err) throw err;
  jobs.forEach(function(job) {
    job.remove();
  });
});

[TypeError: Object #<Job> has no method 'remove']
