db-queue's Introduction

Database Queue

The library provides a worker-queue implementation on top of Java and a database.

The fintech company YooMoney uses db-queue in cases where reliability is a must-have requirement.

The project uses Semantic Versioning.
The library is available on Maven Central:

implementation 'ru.yoomoney.tech:db-queue:15.0.0'

Why?

There are several reasons:

  • You need a simple, efficient and flexible task-processing tool that supports delayed job execution.
  • You already have a database and don't want to introduce additional tools into your infrastructure.
  • You have a relatively small load. Db-queue can handle more than 1000 rps on a single table. Moreover, you can shard your database and get horizontal scalability.
  • You require strong guarantees for task delivery, processing and consistency.

Features

The library provides one-time tasks, that is, tasks that are executed once. If you need recurring (periodic) tasks, that is, tasks that are executed periodically, please look at the db-queue-scheduler library.

Usage

How does it work?

  1. You have a task that you want to process later.
  2. You tell QueueProducer to schedule the task.
  3. QueueProducer chooses a database shard.
  4. QueueProducer converts the task payload to a string representation through TaskPayloadTransformer.
  5. QueueProducer inserts the task into the database through QueueDao.
  6. ... the task is selected from the database at the specified time according to queue settings ...
  7. The task payload is converted to a typed representation through TaskPayloadTransformer.
  8. The task is passed to the QueueConsumer instance in order to be processed.
  9. You process the task and return processing result.
  10. ... the task is updated according to processing result and queue settings ...
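The steps above can be sketched with a small in-memory model. This is only an illustration of the flow, under the assumption that a list can stand in for the queue table: the class QueueFlowSketch, its Result enum and its method names are hypothetical and are not the db-queue API.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical in-memory stand-in for the flow above; NOT the db-queue API.
final class QueueFlowSketch<T> {

    enum Result { FINISH, FAIL }

    /** One stored row; the payload is kept as a string, as in the queue table. */
    private static final class Row {
        final String payload;
        Instant nextProcessAt;

        Row(String payload, Instant nextProcessAt) {
            this.payload = payload;
            this.nextProcessAt = nextProcessAt;
        }
    }

    private final List<Row> table = new ArrayList<>();
    private final Function<T, String> fromObject; // typed payload -> string
    private final Function<String, T> toObject;   // string -> typed payload

    QueueFlowSketch(Function<T, String> fromObject, Function<String, T> toObject) {
        this.fromObject = fromObject;
        this.toObject = toObject;
    }

    /** Steps 2-5: serialize the payload and insert a row that is due at executeAt. */
    void enqueue(T payload, Instant executeAt) {
        table.add(new Row(fromObject.apply(payload), executeAt));
    }

    /**
     * Steps 6-10: pick the earliest due row, deserialize it, run the consumer,
     * then delete the row on FINISH or postpone it to retryAt on FAIL.
     * Returns null when no row is due yet.
     */
    Result processOne(Instant now, Function<T, Result> consumer, Instant retryAt) {
        Row due = null;
        for (Row row : table) {
            boolean isDue = !row.nextProcessAt.isAfter(now);
            if (isDue && (due == null || row.nextProcessAt.isBefore(due.nextProcessAt))) {
                due = row;
            }
        }
        if (due == null) {
            return null;
        }
        Result result = consumer.apply(toObject.apply(due.payload));
        if (result == Result.FINISH) {
            table.remove(due);
        } else {
            due.nextProcessAt = retryAt;
        }
        return result;
    }

    int pendingTasks() {
        return table.size();
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        QueueFlowSketch<Integer> queue =
                new QueueFlowSketch<Integer>(n -> Integer.toString(n), s -> Integer.parseInt(s));
        queue.enqueue(42, now);
        Result result = queue.processOne(now, n -> Result.FINISH, now.plusSeconds(60));
        System.out.println(result + ", pending: " + queue.pendingTasks());
    }
}
```

In the real library the table lives in the database and the retry time comes from the queue settings; the sketch passes it explicitly to keep the example small.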

Code configuration

The main steps to configure the library:

Database configuration

As of now the library supports PostgreSQL, MSSQL, Oracle and H2 as the backing database. However, the library architecture makes it easy to add other relational databases that support transactions and the "for update skip locked" feature,
for example MySQL.
Feel free to add support for other databases via a pull request.
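To show what the "for update skip locked" requirement means in practice, here is a minimal sketch of a pick query in PostgreSQL syntax, wrapped in a Java helper. The class PickTaskSqlSketch and the exact column list are assumptions made for illustration; the queries db-queue actually issues may look different.

```java
// Hypothetical helper for illustration; db-queue builds its own queries internally.
final class PickTaskSqlSketch {

    /**
     * Returns a PostgreSQL-style query that locks and returns one due task.
     * SKIP LOCKED makes concurrent workers pass over rows that another
     * transaction has already locked instead of waiting for them.
     * The table name is concatenated, so it must come from trusted configuration.
     */
    static String pickTaskSql(String tableName) {
        return "SELECT id, payload, attempt FROM " + tableName
                + " WHERE queue_name = ? AND next_process_at <= now()"
                + " ORDER BY next_process_at ASC"
                + " LIMIT 1 FOR UPDATE SKIP LOCKED";
    }

    public static void main(String[] args) {
        System.out.println(pickTaskSql("queue_tasks"));
    }
}
```

A database without an equivalent of SKIP LOCKED would force workers to block on each other's row locks, which is why the feature is listed as a prerequisite.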

PostgreSQL

Create table (with index) where tasks will be stored.

CREATE TABLE queue_tasks (
  id                BIGSERIAL PRIMARY KEY,
  queue_name        TEXT NOT NULL,
  payload           TEXT,
  created_at        TIMESTAMP WITH TIME ZONE DEFAULT now(),
  next_process_at   TIMESTAMP WITH TIME ZONE DEFAULT now(),
  attempt           INTEGER                  DEFAULT 0,
  reenqueue_attempt INTEGER                  DEFAULT 0,
  total_attempt     INTEGER                  DEFAULT 0
);
CREATE INDEX queue_tasks_name_time_desc_idx
  ON queue_tasks USING btree (queue_name, next_process_at, id DESC);

You should always analyze your database workload before applying these recommendations. The settings depend heavily on your hardware and load.

  • Fill Factor

You need to set a low fill factor for the table so that the database can put row updates on the same page. The database then needs fewer random page writes. This technique also prevents fragmentation, so selects become more robust. The same rules apply to indexes. You can safely set the fill factor for tables and indexes to 30%.

Our production settings for frequently updated tasks table are:

CREATE TABLE queue_tasks (...) WITH (fillfactor=30)
CREATE INDEX ... ON queue_tasks USING btree (...) WITH (fillfactor=30)

  • Autovacuum

You need to make autovacuum more aggressive in order to eliminate dead tuples. Dead tuples lead to excessive page reads because they occupy space that could be reused by active tuples. Autovacuum can be configured in many ways; for example, you can set the scale factor to 1% or even lower.

Our production settings for frequently updated tasks tables are:

CREATE TABLE queue_tasks (...) WITH (
autovacuum_vacuum_cost_delay=5, 
autovacuum_vacuum_cost_limit=500,
autovacuum_vacuum_scale_factor=0.0001)
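
To get a feel for what such a scale factor does, the arithmetic can be worked through in a few lines. PostgreSQL triggers autovacuum on a table once the number of dead tuples exceeds the base threshold plus the scale factor multiplied by the number of tuples. The sketch below assumes autovacuum_vacuum_threshold is left at its PostgreSQL default of 50 (the settings above do not override it) and uses a table size of one million rows purely as an example; the class and parameter names are illustrative.

```java
// Worked arithmetic only; the class and parameter names are illustrative.
final class AutovacuumThresholdSketch {

    /** PostgreSQL rule: vacuum threshold = base threshold + scale factor * number of tuples. */
    static double vacuumThreshold(long baseThreshold, double scaleFactor, long tuples) {
        return baseThreshold + scaleFactor * tuples;
    }

    public static void main(String[] args) {
        long tuples = 1_000_000L; // assumed table size, for illustration only
        // PostgreSQL defaults: base threshold 50, scale factor 0.2
        System.out.println("defaults: " + vacuumThreshold(50, 0.2, tuples));
        // Scale factor from the settings above, default base threshold
        System.out.println("scale factor 0.0001: " + vacuumThreshold(50, 0.0001, tuples));
    }
}
```

For that table size, the defaults let about 200,050 dead tuples accumulate before autovacuum starts, while a scale factor of 0.0001 lowers the trigger to about 150.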

MSSQL

Create table (with index) where tasks will be stored.

CREATE TABLE queue_tasks (
  id                INT IDENTITY(1,1) NOT NULL,
  -- TEXT cannot be an index key column in SQL Server; the VARCHAR length is an assumption
  queue_name        VARCHAR(128) NOT NULL,
  payload           TEXT,
  created_at        DATETIMEOFFSET NOT NULL  DEFAULT SYSDATETIMEOFFSET(),
  next_process_at   DATETIMEOFFSET NOT NULL  DEFAULT SYSDATETIMEOFFSET(),
  attempt           INTEGER NOT NULL         DEFAULT 0,
  reenqueue_attempt INTEGER NOT NULL         DEFAULT 0,
  total_attempt     INTEGER NOT NULL         DEFAULT 0,
  PRIMARY KEY (id)
);
CREATE INDEX queue_tasks_name_time_desc_idx
  ON queue_tasks (queue_name, next_process_at, id DESC);

Oracle

Create table (with index) where tasks will be stored.

CREATE TABLE queue_tasks (
  id                NUMBER(38) NOT NULL PRIMARY KEY,
  queue_name        VARCHAR2(128) NOT NULL,
  payload           CLOB,
  created_at        TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
  next_process_at   TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
  attempt           NUMBER(38)                  DEFAULT 0,
  reenqueue_attempt NUMBER(38)                  DEFAULT 0,
  total_attempt     NUMBER(38)                  DEFAULT 0
);
CREATE INDEX queue_tasks_name_time_desc_idx
  ON queue_tasks (queue_name, next_process_at, id DESC);

Create a sequence and specify its name through QueueLocation.Builder.withIdSequence(String) or via id-sequence in the file config.

CREATE SEQUENCE tasks_seq;

H2 database

Create table (with index) where tasks will be stored.

CREATE TABLE queue_tasks (
  id                BIGSERIAL PRIMARY KEY,
  queue_name        VARCHAR(100) NOT NULL,
  payload           VARCHAR(100),
  created_at        TIMESTAMP WITH TIME ZONE DEFAULT now(),
  next_process_at   TIMESTAMP WITH TIME ZONE DEFAULT now(),
  attempt           INTEGER                  DEFAULT 0,
  reenqueue_attempt INTEGER                  DEFAULT 0,
  total_attempt     INTEGER                  DEFAULT 0
);
CREATE INDEX queue_tasks_name_time_desc_idx
  ON queue_tasks (queue_name, next_process_at, id DESC);

Modularity

The library is divided into several modules. Each module contains a minimal set of dependencies so that it can be easily integrated into any project.

  • db-queue-core module provides the base logic and requires the org.slf4j:slf4j-api library.
  • db-queue-spring module provides access to the database and requires Spring Framework: spring-jdbc and spring-tx. Other features of the Spring ecosystem are not used.
  • db-queue-brave module provides tracing support with the help of Brave.
  • db-queue-test module provides integration testing across all modules. It might help you figure out how to use the library in your code.

Project structure

You should provide implementation for interfaces in that package. The package contains classes which are involved in processing or enqueueing tasks.

Queue settings.

Additional classes for managing storage.

Registration and configuration.

  • internal

Internal classes. Not for public use.

Backward compatibility for classes in that package may be broken in any release.

db-queue's People

Contributors

axesipov, everplays, f0y, f0yme, gskoba, izebit, magdel, silron88, talbot, ul8422, yoomoney-robot

db-queue's Issues

MSSQL support

Even though I wanted to work on introducing priorities within one worker thread as we discussed in #1, the support for MSSQL got higher priority for us. So I focused on implementing mssql driver for db-queue.

So far, I have managed to implement the main classes: MssqlQueueDao, MssqlQueuePickTaskDao, and accompanying test classes. However, I want to clear up a couple of things before I make the PR:

  • are you willing to accept a PR to support MSSQL directly in db-queue?
  • a: if yes, how should we manage the databases in tests? right now, embedded postgres is used. However, that can't be the case for mssql. Should we assume that both of them will be executed externally? (via docker in case of travis-ci).
  • b: if no, because of the way that DatabaseDialect is implemented and used, users of library can not add support for other databases. I think we need to have an implementation registry of sort so dialects can be added dynamically.

Max retries

Hey there,

Despite looking into the code samples and available documentation I wasn't able to find a way to configure the max number of retries for a task. Is there functionality like this available?

Thanks in advance

Could you please move the DAO implementations to a different JAR file?

Hi,

I'd like to use this package but my environment uses a different DB access technology, without Spring.

Could you please move the DAOs to a different, Spring-specific DAO JAR? That way, I could write my own implementation of the DAO interface, without any dependency on Spring. This would also require that the factory class is not part of the DAO interface any more.

Cheers
Matthias

Dynamic change configuration in runtime [betweenTasksTimeout, noTaskTimeout, threadCount, etc]

Hi,

What do you think about making the dbqueue params configurable at runtime? It would be very useful, for example, to increase the thread count during high load and decrease it again when it is no longer needed, without restarting the application. Another example is setting betweenTasksTimeout to zero to drain the queue and restoring the old params once the queue is empty.

Thanks,
Leventsov Ivan

the state of noTaskTimeout and betweenTaskTimeout

Hi there,

I was looking into these two variables in https://github.com/yandex-money-tech/db-queue/blob/c1c16397be1ea83ceb4772c34ba5cf52ff7bdfca/src/main/java/ru/yandex/money/common/dbqueue/settings/QueueSettings.java#L27-L30 but from what I understand, they're not actually used anywhere in the code, i.e. the loop policy doesn't care about these values.

I believe you must have had ideas to use them somehow that you introduced them in the code. I might be able to work on this if you share those ideas.

Spring example

I'm having trouble understanding how to use this library in a "Spring way" with @Configuration... I've found a nice example on Maven for "db-queue-scheduler". Do you also have one for "db-queue"?

Is there any way to avoid duplicate job?

How to avoid duplicate creation of job based on "payload"? In other words, the requirement is if there is already a task running with same "payload" , it should not be scheduled/accepted in queue.

Multiple subscribers/consumers per queue

Hi,

I was looking into the project and was wondering is there a way to have multiple consumers/subscribers for single queue and single task?

I have in mind the case that I'm registering the task and have registered for the given queue 2 consumers in the same app and both receive/process enqueued task.

Thanks

QueueConsumer error handling

Could the QueueConsumer interface be updated to automatically return TaskExecutionResult.fail() on an uncaught exception? The pattern of

try {
   // Process
} catch (e: Exception) {
  // log exception
  // return TaskExecutionResult.fail()
}

isn't compatible with top-level Spring exception handlers and leads to the catch and log pattern.
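
One way to keep the try/catch out of every consumer is a small wrapper applied once. The following is a minimal sketch under stated assumptions: the class FailOnExceptionSketch, its Outcome enum and the use of java.util.function.Function as the handler type are hypothetical stand-ins and not the db-queue QueueConsumer or TaskExecutionResult types.

```java
import java.util.function.Function;

// Hypothetical stand-ins; the real db-queue QueueConsumer and TaskExecutionResult differ.
final class FailOnExceptionSketch {

    enum Outcome { FINISH, FAIL }

    /**
     * Wraps a handler so that an uncaught RuntimeException is logged and
     * reported as FAIL. Errors such as OutOfMemoryError are deliberately
     * not caught and keep propagating.
     */
    static <T> Function<T, Outcome> failOnException(Function<T, Outcome> handler) {
        return payload -> {
            try {
                return handler.apply(payload);
            } catch (RuntimeException e) {
                System.err.println("task failed: " + e);
                return Outcome.FAIL;
            }
        };
    }

    public static void main(String[] args) {
        Function<String, Outcome> safe =
                FailOnExceptionSketch.<String>failOnException(payload -> {
                    throw new IllegalStateException("cannot process " + payload);
                });
        System.out.println(safe.apply("example"));
    }
}
```

The handler itself stays free of catch-and-log code, and the conversion to a failed result happens in one place.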

Question about how a queue works

Is the same row in the queue table picked twice or more for task processing if the processing mode is USE EXTERNAL EXECUTOR and the task takes a long time to execute?
If so, how can this be avoided?

Add plain JDBC db-queue-jdbc as an alternative to db-queue-spring

Currently only an out of the box implementation is provided for Spring. A plain JDBC alternative would be great for non-Spring projects.

To avoid duplication, the DB-specific logic should probably be moved to the core, and only the different DB access implementations should be in separate modules.

Unstable Oracle tests

There is an insufficient number of connections in docker image for oracle. We were able to increase connections and sessions by rebuilding the docker image. This image is only available in internal storage.

Does the library work in clustered mode?

Hi, sorry for opening an issue but I didn't see another way to contact the maintainers.

I would like to use this lib as part of a larger monolith that can also operate in clustered mode.

If that is the case, can the library be configured to operate consistently across all nodes of the cluster
so that each node can both insert jobs and work on jobs without conflict?

Every node of the cluster is hooked up to the same SQL db.

Dbqueue tries to catch Error

Is there any reason why QueueLoop tries to catch Throwable that includes critical Errors such as OOM? It just eats them.
