Follow this Medium blog for a detailed understanding of how BullMQ works.
This is a simple backend implementation of a message queue and its powerful features. I am using BullMQ, a Node.js library built on top of Redis.
The application will have
- 3+2 queues
- 3 producers
- 3+2 workers
Each producer generates a new task every 3 seconds and adds it to one of the queues. A worker picks a task from its queue and draws a random number between 5 and 10, which stands in for the time (in seconds) the task takes.
Each worker can listen to one queue only.
If the random number is greater than 8, we treat it as an error and push the task into an error_recovery queue. One worker works on the error_recovery queue and follows the same random-number logic, but here a task has only a 20% chance of success; the remaining 80% are stashed in a dead_queue.
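The routing rules above can be sketched as a pure function, independent of BullMQ. This is only a minimal sketch under assumptions: the names `Route`, `randomDuration`, and `routeJob` are hypothetical, the duration is assumed to be drawn uniformly from 5 to 10, and the random source is injectable so the rules can be checked deterministically.

```typescript
// Where a job ends up after a worker has handled it.
type Route = "done" | "error_recovery" | "dead_queue";

// Simulated processing time in seconds, drawn uniformly from [5, 10).
function randomDuration(rand: () => number = Math.random): number {
  return 5 + rand() * 5;
}

// Decide the next stop for a job that was processed on `queueName`.
function routeJob(queueName: string, rand: () => number = Math.random): Route {
  if (queueName === "error_recovery") {
    // Recovery succeeds 20% of the time; the rest goes to dead_queue.
    return rand() < 0.2 ? "done" : "dead_queue";
  }
  // Regular queues: a duration above 8 is treated as an error.
  return randomDuration(rand) > 8 ? "error_recovery" : "done";
}
```

With a uniform draw, a regular queue sends roughly 40% of its tasks to error_recovery, since durations above 8 cover two fifths of the 5 to 10 range.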
# Basic setup
npm init
tsc --init
npm i bullmq ioredis
- Running Redis and redis-cli in Docker
docker run --name redis_container -p 6379:6379 redis
docker exec -it <redis_container_id> redis-cli
I found out that:
- You can reuse a processor (the function passed to a worker) across different queues, but not the worker itself.
- Whenever a worker picks a job, it locks it so that no other worker can access it, and as far as I can tell there is no way to bypass this.
So, I will use the processor concept and choose the processing logic for each worker based on job.queueName.
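One way to picture a shared processor that branches on job.queueName is sketched below. It is not the project's actual code: `JobLike` is a minimal stand-in for a BullMQ job, and the handler names (`recover`, `bury`, `work`) and `pickHandler` are hypothetical.

```typescript
// Minimal stand-in for the parts of a BullMQ job this sketch needs.
interface JobLike {
  queueName: string;
  data: unknown;
}

type Processor = (job: JobLike) => Promise<string>;

// Hypothetical per-queue handlers; the real work would go here.
const recover: Processor = async (job) => `recovering ${JSON.stringify(job.data)}`;
const bury: Processor = async (job) => `archiving ${JSON.stringify(job.data)}`;
const work: Processor = async (job) => `processing ${JSON.stringify(job.data)}`;

const handlers = new Map<string, Processor>([
  ["error_recovery", recover],
  ["dead_queue", bury],
]);

// Look up the handler for a queue, falling back to the regular one.
function pickHandler(queueName: string): Processor {
  return handlers.get(queueName) ?? work;
}

// The single processor that every worker can share.
const sharedProcessor: Processor = (job) => pickHandler(job.queueName)(job);
```

Under these assumptions, each queue would still get its own worker, but all of them could be handed the same `sharedProcessor` function.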
A worker is instantiated with the Worker class, and the work itself is performed in the process function. Process functions are meant to be asynchronous, either using the async keyword or returning a promise.
const worker = new Worker('queue_name', processor, { connection });
Jobs in BullMQ go through various states in their lifecycle:
- waiting: Added to the queue, waiting to be processed
- delayed: Scheduled for a later time, either through the delay option or while waiting out a retry backoff after a failure
- active: Being processed by a worker
- completed: Successfully processed
- failed: Processing threw an error and no retry attempts remain
- paused: The queue is paused, so its waiting jobs are not picked up
While adding jobs, we can pass job options to control lifecycle behavior:
await queue.add(jobName, data, options)
Some common options are:
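- delay: Delay the job by x ms before it can be processed
- attempts: Total number of attempts for a job, including the first; by default a failed job is not retried
- backoff: Backoff strategy (fixed or exponential) used between retries
- lifo: Use LIFO order instead of FIFO
- priority: Numeric priority value; a lower number is processed sooner, and 1 is the highest priority
- repeat: Repeat the job on a cron pattern or at a fixed interval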
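To make the interplay of attempts and backoff concrete, here is a small sketch that lists how long a job would wait before each retry. `RetryOptions` and `retryDelays` are hypothetical names, and the sketch assumes the exponential strategy simply doubles the base delay on every retry, without jitter.

```typescript
// Shape loosely mirrors the `attempts` and `backoff` job options.
interface RetryOptions {
  attempts: number; // total attempts, including the first one
  backoff: { type: "fixed" | "exponential"; delay: number }; // delay in ms
}

// List the wait (in ms) before each retry of a job that keeps failing.
function retryDelays(opts: RetryOptions): number[] {
  const delays: number[] = [];
  // The first attempt runs right away; a retry follows each failed attempt
  // as long as attempts are left.
  for (let attemptsMade = 1; attemptsMade < opts.attempts; attemptsMade++) {
    const wait =
      opts.backoff.type === "fixed"
        ? opts.backoff.delay
        : Math.pow(2, attemptsMade - 1) * opts.backoff.delay;
    delays.push(wait);
  }
  return delays;
}

const schedule = retryDelays({
  attempts: 4,
  backoff: { type: "exponential", delay: 1000 },
});
console.log(schedule); // [ 1000, 2000, 4000 ]
```

The same object shape, `{ attempts: 4, backoff: { type: 'exponential', delay: 1000 } }`, is what would be passed as the options argument of queue.add.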
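BullMQ emits events as a job moves through its lifecycle. Job objects themselves are not event emitters; we listen on a Worker (for the jobs that worker processes) or on a QueueEvents instance (for the whole queue).
Common job events (all available through QueueEvents; a Worker emits a subset such as active, progress, completed, and failed):
- waiting
- active
- stalled (the worker lost its lock on an active job, for example because its process crashed or was blocked)
- progress (progress updated)
- completed
- failed
- paused
- resumed
- removed
worker.on('completed', (job) => {
// Job completed
})
worker.on('failed', (job, err) => {
// Job failed with error
})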