mongodb-cron's Introduction

mongodb-cron

MongoDB collection as crontab

This package offers a simple API for scheduling tasks and running recurring jobs on MongoDB collections. Any collection can be converted into a job queue or crontab list. It uses the officially supported Node.js driver for MongoDB. It's fast, minimizes processing overhead and it uses atomic commands to ensure safe job executions even in cluster environments.

This is a lightweight open source package for Node.js written in TypeScript. It's actively maintained, well tested and already used in production environments. The source code is available on GitHub where you can also find our issue tracker.

Installation

This is a module for Node.js and can be installed via npm. It depends on the mongodb package and uses promises.

$ npm install --save mongodb mongodb-cron

Example

Below is a simple example that shows the benefit of using this package in your Node.js projects.

Let's start by initializing the database connection.

import { MongoClient } from 'mongodb';

const mongo = await MongoClient.connect('mongodb://localhost:27017');
const db = mongo.db('test');

Continue by initializing and starting the worker.

import { MongoCron } from 'mongodb-cron';

const collection = db.collection('jobs');
const cron = new MongoCron({
  collection, // a collection where jobs are stored
  onDocument: async (doc) => console.log(doc), // triggered on job processing
  onError: async (err) => console.log(err), // triggered on error
});

cron.start(); // start processing

We can now create our first job.

const job = await collection.insertOne({
  sleepUntil: new Date('2016-01-01'), // ISO 8601 format (can include timezone)
});

When processing starts, the onDocument handler (defined earlier) is triggered. This is a very basic example, so please continue reading.

Documentation

The MongoCron class converts a collection into a job queue. Jobs are represented by the documents stored in a MongoDB collection. When cron is started it loops through the collection and processes available jobs one by one.

A job should have at least the sleepUntil field. Cron processes only documents where this field exists; other documents are ignored.

One-time Jobs

To create a one-time job we only need to define the required field sleepUntil. When this field is set to a date in the past, the processing starts immediately.

const job = await collection.insertOne({
  sleepUntil: new Date(),
});

When the processing of a document starts the sleepUntil field is updated to a new date in the future. This locks the document for a certain amount of time in which the processing must complete (lock duration is configurable). This mechanism prevents possible race conditions and ensures that a job is always processed by only one process at a time.

When the processing ends, the sleepUntil field is set to null.

If cron is unexpectedly interrupted during the processing of a job (e.g. server shutdown), the system automatically recovers and transparently restarts.
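
The locking rule described above can be modelled in memory. This is only a sketch under stated assumptions: the Job shape and the lockNext and complete helpers are hypothetical names for illustration, and a plain array stands in for the collection. In MongoDB the find-and-update step is a single atomic command; here it is simply sequential code.

```typescript
// In-memory model of the sleepUntil locking rule (not the library's code).
interface Job {
  id: number;
  sleepUntil: Date | null;
}

// Hypothetical helper: pick the first due job and push its sleepUntil into
// the future, which is what "locks" it against other workers.
function lockNext(jobs: Job[], now: Date, lockDurationMs: number): Job | null {
  const job = jobs.find(
    (j) => j.sleepUntil !== null && j.sleepUntil.getTime() <= now.getTime(),
  );
  if (!job) return null;
  const original: Job = { id: job.id, sleepUntil: job.sleepUntil };
  job.sleepUntil = new Date(now.getTime() + lockDurationMs);
  return original;
}

// Hypothetical helper: a finished one-time job gets sleepUntil = null.
function complete(jobs: Job[], id: number): void {
  const job = jobs.find((j) => j.id === id);
  if (job) job.sleepUntil = null;
}

const now = new Date('2016-01-01T00:00:00Z');
const lockMs = 600000; // same value as the default lockDuration
const jobs: Job[] = [{ id: 1, sleepUntil: new Date('2015-12-31T00:00:00Z') }];

const first = lockNext(jobs, now, lockMs); // worker A receives job 1
const second = lockNext(jobs, now, lockMs); // worker B finds nothing due

// If worker A crashes without completing, the job becomes due again once
// the lock has expired, so another worker can pick it up.
const retry = lockNext(jobs, new Date(now.getTime() + lockMs + 1), lockMs);

// A worker that finishes normally clears the field.
complete(jobs, 1);
const afterDone = lockNext(jobs, new Date(now.getTime() + 10 * lockMs), lockMs);
```

The retry step is why jobs should finish within the lock duration: a job that runs longer than its lock can be picked up a second time.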

Deferred Execution

We can schedule job execution for a particular time in the future by setting the sleepUntil field to a future date.

const job = await collection.insertOne({
  ...
  sleepUntil: new Date('2016-01-01'), // start on 2016-01-01
});

Recurring Jobs

By setting the interval field we define a recurring job.

const job = await collection.insertOne({
  ...
  interval: '* * * * * *', // every second
});

The interval above consists of 6 values.

* * * * * *
┬ ┬ ┬ ┬ ┬ ┬
│ │ │ │ │ └── day of week (0 - 7) (0 or 7 is Sun)
│ │ │ │ └──── month (1 - 12)
│ │ │ └────── day of month (1 - 31)
│ │ └──────── hour (0 - 23)
│ └────────── minute (0 - 59)
└──────────── second (0 - 59)
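
To make the field order concrete, the following sketch splits an interval string into its six named parts. The splitInterval helper and the IntervalFields type are hypothetical and exist only for illustration; the sketch does not validate ranges and says nothing about how the package itself parses intervals.

```typescript
// Names follow the diagram above, read left to right.
interface IntervalFields {
  second: string;
  minute: string;
  hour: string;
  dayOfMonth: string;
  month: string;
  dayOfWeek: string;
}

// Hypothetical helper: split a six-field interval into named parts.
function splitInterval(interval: string): IntervalFields {
  const parts = interval.trim().split(/\s+/);
  if (parts.length !== 6) {
    throw new Error(`expected 6 fields, got ${parts.length}`);
  }
  const [second, minute, hour, dayOfMonth, month, dayOfWeek] = parts as [
    string, string, string, string, string, string,
  ];
  return { second, minute, hour, dayOfMonth, month, dayOfWeek };
}

// Second 0, minute 0, hour 6, any day, any month, any weekday.
const daily = splitInterval('0 0 6 * * *');
console.log(daily.hour); // prints "6"
```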

A recurring job will repeat endlessly unless we limit that by setting the repeatUntil field. When a job expires, it stops repeating: its sleepUntil field is cleared, so the job is no longer processable.

const job = await collection.insertOne({
  ...
  interval: '* * * * * *',
  repeatUntil: new Date('2020-01-01'),
});

Auto-removable Jobs

A job can automatically remove itself from the collection when the processing completes. To configure that, we need to set the autoRemove field to true.

const job = await collection.insertOne({
  ...
  autoRemove: true,
});
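
The rules for one-time, recurring, expiring and auto-removable jobs can be summarized as one decision made after each run. The sketch below is our reading of the sections above, not the package's implementation: afterProcessing, JobState and Outcome are hypothetical names, nextRun stands for the next date derived from interval (null for a one-time job), and we assume autoRemove takes effect only once a job has no further run.

```typescript
type Outcome =
  | { action: 'reschedule'; sleepUntil: Date } // recurring job, still valid
  | { action: 'finish' } // sleepUntil is cleared, document stays
  | { action: 'remove' }; // document is deleted

interface JobState {
  repeatUntil?: Date;
  autoRemove?: boolean;
}

// Hypothetical helper: decide what happens to a job after it has run.
function afterProcessing(job: JobState, nextRun: Date | null): Outcome {
  const expired =
    nextRun !== null &&
    job.repeatUntil !== undefined &&
    nextRun.getTime() > job.repeatUntil.getTime();
  if (nextRun !== null && !expired) {
    return { action: 'reschedule', sleepUntil: nextRun };
  }
  // Assumption: removal applies only when no further run is scheduled.
  return job.autoRemove ? { action: 'remove' } : { action: 'finish' };
}

const limit = new Date('2020-01-01T00:00:00Z');
const oneTime = afterProcessing({}, null);
const oneTimeRemoved = afterProcessing({ autoRemove: true }, null);
const stillRepeating = afterProcessing(
  { repeatUntil: limit },
  new Date('2019-12-31T00:00:00Z'),
);
const expiredJob = afterProcessing(
  { repeatUntil: limit },
  new Date('2020-01-02T00:00:00Z'),
);
```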

API

new MongoCron({ collection, condition, onStart, onStop, onDocument, onIdle, onError, nextDelay, reprocessDelay, idleDelay, lockDuration, sleepUntilFieldPath, intervalFieldPath, repeatUntilFieldPath, autoRemoveFieldPath })

The core class for converting a MongoDB collection into a job queue.

| Option | Type | Required | Default | Description |
|--------|------|----------|---------|-------------|
| autoRemoveFieldPath | String | No | autoRemove | The autoRemove field path. |
| collection | Object | Yes | - | MongoDB collection object. |
| condition | Object | No | null | Additional query condition. |
| idleDelay | Integer | No | 0 | Number of milliseconds the worker waits before checking for new jobs after all jobs have been processed. |
| intervalFieldPath | String | No | interval | The interval field path. |
| lockDuration | Integer | No | 600000 | Number of milliseconds for which each job is locked (the job must complete within that time frame). |
| nextDelay | Integer | No | 0 | Delay (in milliseconds) before the next job is processed. |
| onDocument | Function/Promise | No | - | A method which is triggered when a document should be processed. |
| onError | Function/Promise | No | - | A method which is triggered in case of an error. |
| onIdle | Function/Promise | No | - | A method which is triggered when all jobs in a collection have been processed. |
| onStart | Function/Promise | No | - | A method which is triggered when the cron is started. |
| onStop | Function/Promise | No | - | A method which is triggered when the cron is stopped. |
| repeatUntilFieldPath | String | No | repeatUntil | The repeatUntil field path. |
| reprocessDelay | Integer | No | 0 | Number of milliseconds the worker waits before processing the same job again if the job is recurring. |
| sleepUntilFieldPath | String | No | sleepUntil | The sleepUntil field path. |

import { MongoClient } from 'mongodb';
import { MongoCron } from 'mongodb-cron';

const mongo = await MongoClient.connect('mongodb://localhost:27017/test');
const db = mongo.db('test');

const cron = new MongoCron({
  collection: db.collection('jobs'),
  onStart: async () => {},
  onStop: async () => {},
  onDocument: async (doc) => {},
  onIdle: async (doc) => {},
  onError: async (err) => {},
  nextDelay: 1000,
  reprocessDelay: 1000,
  idleDelay: 10000,
  lockDuration: 600000,
  sleepUntilFieldPath: 'cron.sleepUntil',
  intervalFieldPath: 'cron.interval',
  repeatUntilFieldPath: 'cron.repeatUntil',
  autoRemoveFieldPath: 'cron.autoRemove',
});

cron.start():Promise

Starts the cron processor.

cron.stop():Promise

Stops the cron processor.

cron.isRunning():Boolean

Returns true if the cron is started.

cron.isProcessing():Boolean

Returns true if cron is processing a document.

cron.isIdle():Boolean

Returns true if the cron is in idle state.

Processing Speed

Processing can slow down as more documents are added to the collection. We can maintain the speed by creating indexes.

await collection.createIndex({
  sleepUntil: 1, // the `sleepUntil` field path, set by the sleepUntilFieldPath
}, {
  sparse: true,
});

Don't forget to adjust the index definition when using your custom query condition.
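
As a rough sketch of that adjustment, the helper below builds an index definition from a custom sleepUntil field path and the fields used in a custom condition. buildIndexSpec and the tenantId field are hypothetical. Placing the condition fields first follows MongoDB's general guidance of putting equality-matched fields before range-matched ones; whether this helps depends on your actual condition.

```typescript
// Hypothetical helper: index keys for a custom field path and condition.
function buildIndexSpec(
  sleepUntilFieldPath: string = 'sleepUntil',
  conditionFields: string[] = [],
): Record<string, 1> {
  const spec: Record<string, 1> = {};
  for (const field of conditionFields) {
    spec[field] = 1; // fields matched by equality in the condition
  }
  spec[sleepUntilFieldPath] = 1; // field compared against the current date
  return spec;
}

const indexSpec = buildIndexSpec('cron.sleepUntil', ['tenantId']);
console.log(Object.keys(indexSpec)); // prints [ 'tenantId', 'cron.sleepUntil' ]

// Usage with a real collection would then look like:
//   await collection.createIndex(indexSpec, { sparse: true });
```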

Best Practice

  • Make your jobs idempotent and transactional. Idempotency means that your job can safely execute multiple times.
  • Run this package in cluster mode. Design your jobs in a way that you can run lots of them in parallel.
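
A minimal sketch of an idempotent handler follows. Everything in it is an assumption made for illustration: the invoiceId payload field, the chargeOnce helper, and the in-memory Set, which stands in for a durable store with a unique key.

```typescript
// Hypothetical payload: each job document names an invoice to charge.
interface InvoiceJob {
  _id: string;
  invoiceId: string;
}

const charged = new Set<string>(); // stand-in for a unique-keyed store
let chargeCount = 0;

// Performs the side effect only the first time an invoice is seen.
function chargeOnce(invoiceId: string): boolean {
  if (charged.has(invoiceId)) return false; // already done, safe to skip
  charged.add(invoiceId);
  chargeCount += 1;
  return true;
}

// Handler in the shape expected by the onDocument option.
const onDocument = async (doc: InvoiceJob): Promise<void> => {
  chargeOnce(doc.invoiceId);
};

// The same job delivered twice (e.g. after a lock expired) charges once.
void onDocument({ _id: 'job-1', invoiceId: 'inv-1' });
void onDocument({ _id: 'job-1', invoiceId: 'inv-1' });
console.log(chargeCount); // prints 1
```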

Licence

Copyright (c) 2016 Kristijan Sedlak <[email protected]>

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

mongodb-cron's People

Contributors

chdanielmueller, ciak0, gutnar, rajandhinoja, xpepermint

mongodb-cron's Issues

Any way to add a priority/hierarchy to each job?

Love this package BTW, been using it for a long time for my job processing however I'm now in need of a priority/hierarchy of jobs.

For example I want a certain job type to always execute immediately, before other job types - even if there's a backlog of jobs of other types.

Is this possible at all? I saw in another answer you wrote something about if you change sleepUntil to another value you can have different queues - not sure if this could be applied somehow?

Thanks!

Question: Idempotent jobs?

Couple of questions, all related to restarting instances:

The documentation says:

If cron is unexpectedly interrupted during the processing of a job (e.g. server shutdown), the system automatically recovers and transparently restarts.

  • Does this mean, when my app has a bunch of jobs defined and uses collection.create() every time it starts, it will not re-create those jobs, if they already exist?
  • How does mongodb-cron identify that the job I'm trying to create already exists? There doesn't seem to be an identifier I can set.
  • What about multiple instances, say if I have several replicas of my app running (e.g. in k8s or docker swarm)? Can mongodb-cron handle this seamlessly, when all the replicas are trying to access the job queue collection and process jobs at the same time?

Many thanks.

Question regarding CPU usage when cron activated

When I activate the cron in the application, the CPU usage of the process stays between 20% and 25%. If I disable the cron, the usage drops to 0.
The application is running in a test environment, and no other activity is happening.

Is this the expected behaviour?
I am using pm2 (v3.5.1) to monitor the CPU usage. ($ top -c shows the same CPU usage.)

Btw, thanks for your great work!

cron not activated when idleDelay flag set

When I add the idleDelay flag to the configuration and start the application, the worker does not process any queued jobs, even after the defined sleepUntil time has passed.

If the idleDelay flag is removed from the configuration, the job is processed correctly; however, the process consumes about 12% - 15% CPU while the cron is active.

I set all queued one-time jobs to be processed at 5pm every day, and I want the cron to become active approximately 5 minutes before the jobs start, so that CPU usage is reduced during the day.

Code snippet

// mongoose schema
const schema = new mongoose.Schema({
    ... ...
    sleepUntil: {type: Date, required: true, index: true, default: (5pm every day)},
    autoRemove: {type: Boolean, default: true} 

}, {collection: 'Queue'});


// cron setting
const config = {
    collection: db.collection('Queue'),
    onDocument: (async (doc) => { return console.info(doc); }),
    onStart: (async () => { return console.info('cron started.'); }),
    onStop: (async () => { return console.warn('cron has stopped.'); }),
    idleDelay: 60 * 1000  // 1 min delay
};
return new MongoCron(config);

Cron Interval

The sleepUntil property is required when defining cron jobs but never documented. I think it shouldn't be required to specifically declare it null if possible, if not then docs should be added.

Broken with MongoDB version 6

In mongo version 6, they have made breaking changes to findOneAndUpdate (https://github.com/mongodb/node-mongodb-native/blob/main/etc/notes/CHANGES_6.0.0.md#findoneandx-family-of-methods-will-now-return-only-the-found-document-or-null-by-default-includeresultmetadata-is-false-by-default).

Now it simply returns the document without any metadata, so in the following code res.value will be undefined; the method simply needs to return res.

Also, returnOriginal has been removed and should be replaced with returnDocument: 'before'.

  protected async lockNext() {
    const sleepUntil = moment().add(this.config.lockDuration, 'milliseconds').toDate();
    const currentDate = moment().toDate();

    const res = await this.getCollection().findOneAndUpdate({
      $and: [
        { [this.config.sleepUntilFieldPath]: { $exists: true, $ne: null }},
        { [this.config.sleepUntilFieldPath]: { $not: { $gt: currentDate } } },
        this.config.condition,
      ].filter((c) => !!c),
    }, {
      $set: { [this.config.sleepUntilFieldPath]: sleepUntil },
    }, {
      returnOriginal: true, // return original document to calculate next start based on the original value
    });
    return res.value;
  }

If you could please update this so we can upgrade to mongo 6 that would be appreciated! Thanks.
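
A minimal sketch of how the quoted call might be adapted, assuming version 6 of the Node.js driver. buildLockFilter and lockOptions are hypothetical names and not part of mongodb-cron; the filter mirrors the snippet above, and only the options and the return value differ.

```typescript
type Filter = Record<string, unknown>;

// Same filter as in the quoted lockNext(), extracted into a pure function.
function buildLockFilter(
  path: string,
  now: Date,
  condition?: Filter | null,
): Filter {
  return {
    $and: [
      { [path]: { $exists: true, $ne: null } },
      { [path]: { $not: { $gt: now } } },
      condition,
    ].filter((c) => !!c),
  };
}

// Options for findOneAndUpdate under driver v6:
// - returnDocument: 'before' replaces the removed returnOriginal: true
// - includeResultMetadata: false makes the call resolve to the document
//   itself (or null), so the caller returns `res` instead of `res.value`.
const lockOptions = {
  returnDocument: 'before' as const,
  includeResultMetadata: false,
};

const lockFilter = buildLockFilter('sleepUntil', new Date(), null);

// Usage with a real collection would then look like:
//   const res = await collection.findOneAndUpdate(
//     lockFilter, { $set: { sleepUntil: lockedUntil } }, lockOptions);
//   return res;
```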

mongodb-cron needs a small change to work with Azure Cosmos MongoAPI

Hello,

We want to use mongodb-cron for our project within Azure and the managed database Cosmos via the MongoAPI.

For that we have to use the condition option, as Cosmos always requires the partition key in a query/update/delete operation.

Unfortunately the condition is only used in lockNext() but not in the final update/delete in reschedule.

I would like to add the this.config.condition to all query statements regardless of find/delete/update/...

Does anyone see problems with that?
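
The proposal could be sketched as a small helper that merges the configured condition into every filter. withCondition and the partitionKey field are hypothetical names used only to illustrate the idea; this is not code from the package.

```typescript
type Query = Record<string, unknown>;

// Hypothetical helper: combine an operation's own filter with the
// user-supplied condition, if there is one.
function withCondition(filter: Query, condition?: Query | null): Query {
  return condition ? { $and: [filter, condition] } : filter;
}

// Filter for the final update or delete of a processed job.
const rescheduleFilter = withCondition(
  { _id: 'job-1' },
  { partitionKey: 'tenant-a' },
);

// Without a condition the filter is passed through unchanged.
const plainFilter = withCondition({ _id: 'job-1' }, null);
```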

Cron running twice for the same job.

I have a single job entry in the database. When the server time reaches the sleepUntil time, the job is executed twice inside the onDocument method.

Here are my settings:

MongoDB Schema

const schema = new Schema(
    {
      name: { type: String, required: true },
      sleepUntil: { type: Date, default: new Date(), required: true },
      interval: { type: String, required: true },
      data: { type: Schema.Types.Mixed }
    },
    {
      timestamps: true
    }
  );

MongoDB Entry

{
    "_id": ObjectId("65d37bbf7933cf20f1064ee3"),
    "name": "JOB_TO_RUN",
    "interval": "0 0 6 * * *",
    "sleepUntil": ISODate("2024-03-19T09:00:00.000Z"),
    "data": {},
    "updatedAt": ISODate("2024-03-01T15:15:14.647Z")
}

Cron job execution code

const { MongoCron } = require('mongodb-cron');

module.exports = async app => {
  const mongooseClient = app.get('mongooseClient');
  const collection = await mongooseClient.connections[0].collections['cron-jobs'];

  const cron = new MongoCron({
    collection,
    onStart: () => console.info('\t✓ Cron jobs.'),
    onDocument: doc => console.count(), // You will get 2 console logs here!
    onError: async err => console.error(err),
    nextDelay: 1000,
    reprocessDelay: 1000,
    idleDelay: 1000
  });

  cron.start();
};

Timezone / Summer time support

Hi @xpepermint ,

As far as I can observe the library always works with UTC timestamps in the sleepUntil and interval field.
Is it possible to configure the library at the current state to react to summer time changes?

If not here is my idea:
I can see that the library uses moment.
In my code I use moment with the timezone extension. (moment-timezone)
I suppose with changing the imports from import moment from 'moment'; to import moment from 'moment-timezone'; and adding a default timezone moment.tz.setDefault('Europe/Zurich'); the behaviour could be altered.
Of course this would need a new option in the constructor, e.g. new MongoCron({..., timezone: 'Europe/Zurich'})

I didn't test it and have not looked deep into the code.
What do you think?

Thanks,
Daniel
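
The effect described in this issue can be illustrated with the built-in Intl API alone. This sketch is independent of mongodb-cron and moment; localHour is a hypothetical helper. It shows that the same UTC hour falls on a different wall-clock hour in Europe/Zurich depending on the season.

```typescript
// Hypothetical helper: wall-clock hour of an instant in a given time zone.
function localHour(instant: Date, timeZone: string): number {
  const text = new Intl.DateTimeFormat('en-GB', {
    timeZone,
    hour: '2-digit',
    hour12: false,
  }).format(instant);
  return parseInt(text, 10);
}

// A job fixed at 06:00 UTC, observed in winter (UTC+1) and summer (UTC+2).
const winterHour = localHour(new Date('2024-01-15T06:00:00Z'), 'Europe/Zurich');
const summerHour = localHour(new Date('2024-07-15T06:00:00Z'), 'Europe/Zurich');

console.log(winterHour, summerHour); // prints 7 8
```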

[Question] How does this handle the case where the process isn't running at the time a job was supposed to run?

This may be something that's just supported inherently by cron, but before I start using this I would like to know how it's handled by this library:

If I schedule a job to run every week to create a new document but my server is down during that time period, does it run the next time the system is started? Does this keep track of when the next run time should be and, on bootup, check if there are any jobs past their next run time? Or do I need to handle that logic separately?
