fang's Introduction

Fang

Background task processing library for Rust. It can use PostgreSQL, SQLite or MySQL as an asynchronous task queue.

Key Features

Here are some of fang's key features:

  • Async and threaded workers. Workers can be started in threads (threaded workers) or tokio tasks (async workers)
  • Scheduled tasks. Tasks can be scheduled at any time in the future
  • Periodic (CRON) tasks. Tasks can be scheduled using cron expressions
  • Unique tasks. Tasks are not duplicated in the queue if they are unique
  • Single-purpose workers. Tasks are stored in a single table, but workers can be restricted to tasks of a specific type
  • Retries. Tasks can be retried with a custom backoff mode

Installation

  1. Add this to your Cargo.toml

the Blocking feature

[dependencies]
fang = { version = "0.11.0-rc1" , features = ["blocking"], default-features = false }

the Asynk feature

  • PostgreSQL as a queue
[dependencies]
fang = { version = "0.11.0-rc1" , features = ["asynk-postgres"], default-features = false }
  • SQLite as a queue
[dependencies]
fang = { version = "0.11.0-rc1" , features = ["asynk-sqlite"], default-features = false }
  • MySQL as a queue
[dependencies]
fang = { version = "0.11.0-rc1" , features = ["asynk-mysql"], default-features = false }

the Asynk feature with derive macro

Replace {database} with your desired backend: postgres, sqlite or mysql.

[dependencies]
fang = { version = "0.11.0-rc1" , features = ["asynk-{database}", "derive-error" ], default-features = false }

All features

fang = { version = "0.11.0-rc1" }

Supports rustc 1.77+

  2. Create the fang_tasks table in the database. The migrations for each database can be found in fang/{database}-migrations, where {database} is postgres, mysql or sqlite.

Migrations can also be run from code by enabling the migrations-{database} feature, where {database} is the backend you use for the queue.

[dependencies]
fang = { version = "0.11.0-rc1" , features = ["asynk-postgres", "migrations-postgres" ], default-features = false }
use fang::run_migrations_postgres;
run_migrations_postgres(&mut connection).unwrap();

Usage

Defining a task

Blocking feature

Every task must implement the fang::Runnable trait, which fang uses to execute it.

If you have a custom error type, it is recommended to implement From<CustomError> for FangError. This way you can use the ? operator inside the run function of the fang::Runnable trait.

You can implement it easily with the ToFangError derive macro, which is only available with the derive-error feature.

use fang::FangError;
use fang::Queueable;
use fang::Runnable;
use fang::Scheduled;
use fang::ToFangError;
use fang::serde::{Deserialize, Serialize};
use fang::typetag;
use std::fmt::Debug;

#[derive(Debug, ToFangError)]
enum CustomError {
    ErrorOne(String),
    ErrorTwo(u32),
}

fn my_func(num: u16) -> Result<(), CustomError> {
    if num == 0 {
        return Err(CustomError::ErrorOne("is zero".to_string()));
    }

    if num > 500 {
        // ErrorTwo holds a u32, so widen the u16 first
        return Err(CustomError::ErrorTwo(num.into()));
    }

    Ok(())
}

#[derive(Serialize, Deserialize)]
#[serde(crate = "fang::serde")]
struct MyTask {
    pub number: u16,
}

#[typetag::serde]
impl Runnable for MyTask {
    fn run(&self, _queue: &dyn Queueable) -> Result<(), FangError> {
        println!("the number is {}", self.number);

        // The ? operator works here because the ToFangError derive
        // implements From<CustomError> for FangError.
        my_func(self.number)?;

        Ok(())
    }

    // If `uniq` is set to true and the task is already in the storage, it won't be inserted again.
    // The existing record will be returned for any insertion operation.
    fn uniq(&self) -> bool {
        true
    }

    // Useful if you want to filter tasks.
    // The default value is `common`.
    fn task_type(&self) -> String {
        "my_task".to_string()
    }

    // Useful if you would like to schedule tasks.
    // The default value is None (the task is not scheduled; it is executed as soon as it is inserted).
    fn cron(&self) -> Option<Scheduled> {
        let expression = "0/20 * * * Aug-Sep * 2022/1";
        Some(Scheduled::CronPattern(expression.to_string()))
    }

    // The maximum number of retries. Set it to 0 to make the task non-retriable.
    // The default value is 20.
    fn max_retries(&self) -> i32 {
        20
    }

    // Backoff mode for retries.
    fn backoff(&self, attempt: u32) -> u32 {
        u32::pow(2, attempt)
    }
}

As you can see from the example above, the trait implementation has the #[typetag::serde] attribute, which is used to deserialize the task.

The second parameter of the run function is a reference to a type that implements fang::Queueable. You can reuse it to manipulate the task queue, for example to add a new task during the current task's execution. If you don't need it, just ignore it.
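
Both task examples use u32::pow(2, attempt) as the backoff. The std-only sketch below shows what that policy produces and one way to bound it. The helpers capped_backoff and retry_delays are hypothetical, not fang APIs, and the sketch assumes that retry attempts are numbered from 0 and that the returned value is a delay in seconds.

```rust
/// Exponential backoff, 2^attempt, as in the task examples.
/// `saturating_pow` returns u32::MAX instead of overflowing, whereas
/// `u32::pow(2, attempt)` panics in debug builds once `attempt` reaches 32.
fn backoff(attempt: u32) -> u32 {
    2u32.saturating_pow(attempt)
}

/// Hypothetical capped variant: never wait longer than `cap`.
fn capped_backoff(attempt: u32, cap: u32) -> u32 {
    backoff(attempt).min(cap)
}

/// Delay before each retry of a task allowed `max_retries` retries,
/// assuming (not confirmed by the docs) that attempts are numbered from 0.
fn retry_delays(max_retries: u32, cap: u32) -> Vec<u32> {
    (0..max_retries)
        .map(|attempt| capped_backoff(attempt, cap))
        .collect()
}
```

Without a cap, and under the same assumptions, the last of 20 retries waits 2^19 seconds (about six days), which is one reason to bound the delay.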

Asynk feature

Every task must implement the fang::AsyncRunnable trait, which fang uses to execute it.

Be careful not to give two types implementing the AsyncRunnable trait the same name, because this causes a failure in the typetag crate.

use fang::AsyncRunnable;
use fang::FangError;
use fang::Scheduled;
use fang::asynk::async_queue::AsyncQueueable;
use fang::serde::{Deserialize, Serialize};
use fang::async_trait;
use fang::typetag;

#[derive(Serialize, Deserialize)]
#[serde(crate = "fang::serde")]
struct AsyncTask {
    pub number: u16,
}

#[typetag::serde]
#[async_trait]
impl AsyncRunnable for AsyncTask {
    async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), FangError> {
        Ok(())
    }

    // This function is optional.
    // The default task_type is `common`.
    fn task_type(&self) -> String {
        "my-task-type".to_string()
    }

    // If `uniq` is set to true and the task is already in the storage, it won't be inserted again.
    // The existing record will be returned for any insertion operation.
    fn uniq(&self) -> bool {
        true
    }

    // Useful if you would like to schedule tasks.
    // The default value is None (the task is not scheduled; it is executed as soon as it is inserted).
    fn cron(&self) -> Option<Scheduled> {
        let expression = "0/20 * * * Aug-Sep * 2022/1";
        Some(Scheduled::CronPattern(expression.to_string()))
    }

    // The maximum number of retries. Set it to 0 to make the task non-retriable.
    // The default value is 20.
    fn max_retries(&self) -> i32 {
        20
    }

    // Backoff mode for retries.
    fn backoff(&self, attempt: u32) -> u32 {
        u32::pow(2, attempt)
    }
}
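
The uniq comments in both examples describe insert-if-not-exists semantics. The in-memory sketch below models them with a hash of the serialized task. InMemoryQueue, TaskRecord and insert_uniq are hypothetical names; fang performs this check in the database (its fang_tasks table appears to carry a uniq_hash column), not in memory.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// A stored task row (hypothetical, reduced to what this sketch needs).
#[derive(Clone, Debug, PartialEq)]
struct TaskRecord {
    id: u64,
    metadata: String,
}

/// Hypothetical in-memory stand-in for the task table.
#[derive(Default)]
struct InMemoryQueue {
    next_id: u64,
    by_hash: HashMap<u64, TaskRecord>,
}

/// Hash of the serialized task, used as the uniqueness key.
/// (A real implementation would also compare metadata on a hash match.)
fn uniq_hash(metadata: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    metadata.hash(&mut hasher);
    hasher.finish()
}

impl InMemoryQueue {
    /// Inserts the task unless an identical one is already stored;
    /// in that case the existing record is returned instead.
    fn insert_uniq(&mut self, metadata: &str) -> TaskRecord {
        let key = uniq_hash(metadata);
        if let Some(existing) = self.by_hash.get(&key) {
            return existing.clone();
        }
        self.next_id += 1;
        let record = TaskRecord { id: self.next_id, metadata: metadata.to_string() };
        self.by_hash.insert(key, record.clone());
        record
    }
}
```

Because the check lives in a single process's memory, it only illustrates the intended semantics; it says nothing about concurrent inserts from several processes, which is the concern raised in the duplicates issue further down.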

In both modules, tasks can also be scheduled to run once: use the Scheduled::ScheduleOnce enum variant.

Datetimes and cron patterns are interpreted in UTC, so to schedule in a different timezone you have to apply its offset yourself.

Example:

If your timezone is UTC+2 and you want to schedule a task at 11:00 local time, use 9:00 UTC:

let expression = "0 0 9 * * * *";
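
The offset arithmetic can be wrapped in a small helper. This is a hypothetical function, not part of fang; it assumes the seven-field, seconds-first layout used by the expressions above (seconds, minutes, hours, day of month, month, day of week, year) and handles whole-hour offsets only.

```rust
/// Builds a daily cron expression that fires at `local_hour` in a timezone
/// `utc_offset_hours` away from UTC (hypothetical helper).
fn daily_cron_utc(local_hour: i32, utc_offset_hours: i32) -> String {
    // Cron patterns are interpreted in UTC, so subtract the offset and
    // wrap around midnight; rem_euclid never returns a negative hour.
    let utc_hour = (local_hour - utc_offset_hours).rem_euclid(24);
    format!("0 0 {} * * * *", utc_hour)
}
```

For 11:00 at UTC+2 this yields the expression shown above. When the conversion crosses midnight, the UTC day differs from the local day, which matters for patterns that restrict the day of week or month.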

Enqueuing a task

the Blocking feature

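To enqueue a task, use Queueable::insert_task:

use fang::Queue;
use fang::Queueable; // insert_task is a method of this trait

// create an r2d2 connection pool (`pool`)

// create a fang queue
let queue = Queue::builder().connection_pool(pool).build();

// MyTask from the blocking example above
let task_inserted = queue.insert_task(&MyTask { number: 1 }).unwrap();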

the Asynk feature

To enqueue a task, use AsyncQueueable::insert_task.

For the Postgres backend:

use fang::asynk::async_queue::{AsyncQueue, AsyncQueueable};
use fang::AsyncRunnable;

// Create an AsyncQueue
let max_pool_size: u32 = 2;

let mut queue = AsyncQueue::builder()
    // Postgres database url
    .uri("postgres://postgres:postgres@localhost/fang")
    // Max number of connections that are allowed
    .max_pool_size(max_pool_size)
    .build();

// Always connect first in order to perform any operation
queue.connect().await.unwrap();

Encryption is always used, via the rustls crate. We plan to add the possibility of disabling it in the future.

// AsyncTask from the first example
let task = AsyncTask { number: 8 };
let task_returned = queue
    .insert_task(&task as &dyn AsyncRunnable)
    .await
    .unwrap();

Starting workers

the Blocking feature

Every worker runs in a separate thread. In case of panic, they are always restarted.

Use WorkerPool to start workers. Use WorkerPool::builder to create your worker pool and run tasks.

use fang::WorkerPool;
use fang::Queue;

// create a Queue

let mut worker_pool = WorkerPool::<Queue>::builder()
    .queue(queue)
    .number_of_workers(3_u32)
    // run only tasks of a specific type (MyTask above uses "my_task")
    .task_type("my_task")
    .build();

worker_pool.start();
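
The restart-on-panic behaviour of threaded workers can be pictured with plain std threads: a supervisor joins the worker thread and spawns a fresh one whenever the join reports a panic. This is an illustrative sketch, not fang's implementation. supervise and max_restarts are hypothetical, and the restart limit exists only so the example terminates, since fang's workers are described as always restarting.

```rust
use std::thread;

/// Runs `work` on its own thread and respawns it after each panic,
/// giving up after `max_restarts` restarts. Returns the number of restarts.
fn supervise<F>(work: F, max_restarts: u32) -> u32
where
    F: Fn(u32) + Send + Clone + 'static,
{
    let mut restarts = 0;
    loop {
        let job = work.clone();
        let run_number = restarts;
        // JoinHandle::join returns Err if the spawned thread panicked.
        let outcome = thread::spawn(move || job(run_number)).join();
        match outcome {
            Ok(()) => return restarts,
            Err(_) if restarts < max_restarts => restarts += 1,
            Err(_) => return restarts,
        }
    }
}
```

A worker that panics on its first two runs and then succeeds is restarted twice; the panic messages still appear on stderr.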

the Asynk feature

Every worker runs in a separate tokio task. In case of panic, they are always restarted. Use AsyncWorkerPool to start workers.

use fang::asynk::async_worker_pool::AsyncWorkerPool;

// Need to create a queue
// Also insert some tasks

let mut pool: AsyncWorkerPool<AsyncQueue> = AsyncWorkerPool::builder()
    .number_of_workers(max_pool_size)
    .queue(queue.clone())
    // run only tasks of a specific type (AsyncTask above uses "my-task-type")
    .task_type("my-task-type")
    .build();

pool.start().await;
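
Both pools accept a task_type, which is how single-purpose workers share one table of tasks. The selection rule can be modelled as a filter over stored tasks. StoredTask and next_for_worker are hypothetical, and fang does this selection in SQL rather than in Rust; the sketch also assumes an exact string match, so the value passed to .task_type(...) presumably has to equal what the task's own task_type method returns.

```rust
/// A stored task as a worker might see it (hypothetical shape).
#[derive(Debug, Clone, PartialEq)]
struct StoredTask {
    id: u32,
    task_type: String,
}

/// The documented default when a task does not override `task_type`.
const DEFAULT_TASK_TYPE: &str = "common";

/// Returns the first task whose type matches the worker's configured type.
fn next_for_worker<'a>(tasks: &'a [StoredTask], worker_type: &str) -> Option<&'a StoredTask> {
    tasks.iter().find(|task| task.task_type == worker_type)
}
```

A near-miss such as "my_task_type" versus "my-task-type" matches nothing, so that worker would sit idle.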


Configuration

Blocking feature

Use the TypedBuilder-generated builder, WorkerPool::builder().

Asynk feature

Use the TypedBuilder-generated builder, AsyncWorkerPool::builder().

Configuring the type of workers

Configuring retention mode

By default, all successfully finished tasks are removed from the DB; failed tasks are kept.

There are three retention modes you can use:

pub enum RetentionMode {
    KeepAll,        // doesn't remove tasks
    RemoveAll,      // removes all tasks
    RemoveFinished, // default value
}

Set the retention mode with the worker pool's builder in both modules.
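
The three modes reduce to a simple removal rule applied once a task reaches a final state. The sketch mirrors the RetentionMode variants listed above, but TaskOutcome and should_remove are hypothetical names used only to spell out the documented behaviour.

```rust
/// Mirrors the three retention modes listed above.
#[derive(Clone, Copy, Debug, PartialEq)]
enum RetentionMode {
    KeepAll,
    RemoveAll,
    RemoveFinished,
}

/// Final outcome of a task once a worker is done with it (hypothetical).
#[derive(Clone, Copy, Debug, PartialEq)]
enum TaskOutcome {
    Finished,
    Failed,
}

/// Whether the task row would be deleted from the DB under `mode`.
fn should_remove(mode: RetentionMode, outcome: TaskOutcome) -> bool {
    match mode {
        RetentionMode::KeepAll => false,
        RetentionMode::RemoveAll => true,
        // Default: successful tasks are removed, failed ones are kept.
        RetentionMode::RemoveFinished => outcome == TaskOutcome::Finished,
    }
}
```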

Configuring sleep values

Blocking feature

You can use SleepParams to configure sleep values:

pub struct SleepParams {
    pub sleep_period: Duration,     // default value is 5 seconds
    pub max_sleep_period: Duration, // default value is 15 seconds
    pub min_sleep_period: Duration, // default value is 5 seconds
    pub sleep_step: Duration,       // default value is 5 seconds
}

If there are no tasks in the DB, a worker sleeps for sleep_period, and each time it finds no tasks this value increases by sleep_step until it reaches max_sleep_period. min_sleep_period is the initial value of sleep_period. The default values above are all expressed in seconds.

Use set_sleep_params to set it:

use fang::SleepParams;
use std::time::Duration;

let sleep_params = SleepParams {
    sleep_period: Duration::from_secs(2),
    max_sleep_period: Duration::from_secs(6),
    min_sleep_period: Duration::from_secs(2),
    sleep_step: Duration::from_secs(1),
};

Set sleep params with the worker pool's builder in both modules.
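
The sleep rule can be traced with a small std-only model. IdleBackoff and its methods are hypothetical stand-ins for fang's SleepParams handling. The model clamps at max_sleep_period and resets to min_sleep_period when a task is found; the reset is an assumption that the text above does not state.

```rust
use std::time::Duration;

/// Hypothetical model of a worker's idle sleep state.
struct IdleBackoff {
    sleep_period: Duration,
    max_sleep_period: Duration,
    min_sleep_period: Duration,
    sleep_step: Duration,
}

impl IdleBackoff {
    /// Starts at min_sleep_period, the documented initial value.
    fn new(max: Duration, min: Duration, step: Duration) -> Self {
        IdleBackoff { sleep_period: min, max_sleep_period: max, min_sleep_period: min, sleep_step: step }
    }

    /// Called when a poll finds no tasks: returns how long to sleep now,
    /// then grows the next sleep by sleep_step, never past max_sleep_period.
    fn on_empty_poll(&mut self) -> Duration {
        let current = self.sleep_period;
        self.sleep_period = (self.sleep_period + self.sleep_step).min(self.max_sleep_period);
        current
    }

    /// Assumed behaviour: finding a task resets the sleep to the minimum.
    fn on_task_found(&mut self) {
        self.sleep_period = self.min_sleep_period;
    }
}
```

With the example values (2 s minimum, 1 s step, 6 s maximum), six consecutive empty polls sleep for 2, 3, 4, 5, 6 and 6 seconds.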

Contributing

  1. Fork it!
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create a new Pull Request

Running tests locally

  • Install diesel_cli.
cargo install diesel_cli --no-default-features --features "postgres sqlite mysql"
  • Install Docker on your machine.

  • Install SQLite 3 on your machine.

  • Set up databases for testing.

make -j db
  • Run tests. make db does not need to be run in between each test cycle.
make -j tests
  • Run dirty/long tests.
make -j ignored
  • Take down databases.
make -j stop

The -j flag in the examples above enables parallelism for make; it is not necessary but highly recommended.

Authors

  • Ayrat Badykov (@ayrat555)

  • Pepe Márquez (@pxp9)

fang's People

Contributors

ayrat555, davidmhewitt, dependabot[bot], dopplerian, jess-sol, pxp9, v0idpwn

fang's Issues

Please, correct README

In "Defining a task" under "Usage", there is an erratum in the second paragraph after the image, where it says:
The second parameter of the run function is a is an struct...

Please correct the erratum, thanks 🌝

Add support for SQLite

I see that fang uses traits to implement AsyncQueueable. It would be great if SQLite were supported, as it allows building applications that just work without additional installation and configuration, and users can always upgrade to the better Postgres support if needed.

Thank you

Hey @ayrat555 and @pxp9,

I hope you're both doing well -- I just wanted to say thanks for this wonderful crate. It's because of efforts like this that the rust ecosystem can continue to grow and see more adoption.

Implement async worker

  1. Let's call it AsyncWorker. I think it will store AsyncQueue and some configuration parameters.

This struct can be created with TypedBuilder.

  2. Create a loop that fetches tasks and executes them.

  3. If there are no tasks, it should sleep.

Correct readme

In the "Enqueuing a task" Asynk feature part of the README, "depending of" does not exist; please correct it.

Create async background processing

Currently, the library spawns a thread for each background worker.

Using async processing it should be possible to spawn several workers in a single thread

Simplifying api

One thing I have found confusing is that both the producer and the consumer use AsyncRunnable, which has methods such as cron and uniq. This makes it unclear whether a value is taken from the producer or from the consumer. It would be great if the APIs were a bit more like faktory-rs.

You create a producer that enqueues jobs with all the metadata.

use faktory::{Producer, Job};
let mut p = Producer::connect(None).unwrap();
p.enqueue(Job::new("foobar", vec!["data1"])).unwrap();

and to consume you register the handler only.

use faktory::ConsumerBuilder;
use std::io;
let mut c = ConsumerBuilder::default();
c.register("foobar", |job| -> io::Result<()> {
    println!("{:?}", job);
    Ok(())
});
let mut c = c.connect(None).unwrap();
if let Err(e) = c.run(&["default"]) {
    println!("worker failed: {}", e);
}

One thing I like about faktory-rs is that it takes only a few lines of code. To add other features such as scheduling, the following could be used:

p.enqueue(Job::new("foobar", args)
    .on_queue("queue1")
    .schedule_at(Utc::now()));

Job that is enqueued should be a different struct than the job that was dequeued.

Duplicates are created for unique tasks when running fang on multiple processes

I was playing with async cron tasks from fang-examples.

I configured the task to be unique and to keep the history of executions.

After running the example on 2 parallel processes I noticed duplicated tasks in the DB and in the application logs.

insert_task_if_not_exist_query seems to not handle the case when multiple instances (processes) are running in parallel.

A possible solution to this is to do something like:

  1. Create a unique index like:
CREATE UNIQUE INDEX fang_task_uniqueness_idx ON fang_tasks (uniq_hash, state) WHERE (uniq_hash is not null and state in ('new', 'in_progress', 'retried'));
  2. Update insert_task_uniq.sql:
INSERT INTO "fang_tasks" ("metadata", "task_type" , "uniq_hash", "scheduled_at") VALUES ($1, $2 , $3, $4) ON CONFLICT DO NOTHING RETURNING *;

WARNING: the above SQL statements were not tested and might have SQL errors or not be optimal, so should not be treated as a working solution before testing.

This probably doesn't make sense to do if fang is not designed to run in multiple processes. In that case, it would be worth mentioning that fact somewhere in the docs (sorry if it is mentioned already and I missed it).

How can I list tasks in the queue?

I'm trying to list all the tasks (both complete and incomplete) in the worker queue, and I haven't managed to find an example anywhere. Is there an established way to do this--preferably in a way that will serialize to JSON?

use fang::fang_tasks::dsl::fang_tasks;

let root = warp::get()
    .and(warp::path::end())
    .map({
        move || index::WithTemplate {
            name: "index.html",
            value: json!({
                "builds": fang_tasks, // what should go here?
            }),
        }
    })
.map(handlebars);

Thanks so much for any help.

Please comment code on fang_examples

If those are supposed to be examples, they should come, at the very least, with some kind of brief commentary or explanation, as from the point of view of a new fang user it's hard to follow the code.

Define "background processing" in README, documentation and website

Define "background processing" in README, documentation and website.

Consider linking to, and/or incorporating content from, https://fang.badykov.com/blog/async-processing/

As someone new to systems and desktop/server application programming, I didn't properly understand the term "background processing" in this context (I only had a notion of what it might be). The readme and documentation currently assumes the reader already understands what "background processing" is and what it is for.

And it turns out you've already got a good definition in your first blog post.

In software engineering background processing is a common approach for solving several problems:

  • Carry out periodic tasks. For example, deliver notifications, update cached values.
  • Defer expensive work so your application stays responsive while performing calculations in the background

It may be that if I don't understand the term, I'm not the intended audience; however, now that I do understand what it's for, I can see myself using it. I think spelling it out would also benefit developers whose first language isn't English.

How to process/retrieve the result of a task and how to pass a non serializable context to the task?

Hello,

Thank you for your work. :)

I am wondering the following things:

  • how could someone retrieve the result of a task?
  • how could someone run a callback or call another service with the result of a task?
  • how could someone use clients or database pool from within a run function?

(Feel free to tell me if you would like me to split this into multiple issues.)

E.g. I would like to store the result inside the database or transmit the result to another service (in-memory) without doing an API call to localhost if I am running the workers within an API.
And I don't want to build the client/db pool for every task from environment variables.

async fn run(&self, queue: &mut dyn AsyncQueueable) -> Result<(), FangError>

Since the run signature returns Result<(), FangError>, I could not make my own AsyncWorker<AQueue> or AsyncWorkerPool<AQueue> that would handle the result of a task.

And since a task should be serializable, I cannot provide to it clients (database or otherwise) that I would not want serializable but that I would want to access in the run function.

I am thinking that the only way would be to attempt to access a global reference to a singleton or something alike, e.g.:

async fn run(&self, _queue: &mut dyn AsyncQueueable) -> Result<(), FangError> {
    // do stuff
    let result = ...;
    let pool: Pool<Postgres> = get_global_postgres_pool().expect("Postgres pool is not initialized");
    pool.acquire().await?.execute("<INSERT result into table>").await?;
    Ok(())
}

Though I am not sure that the compiler would allow me to do so.

  1. Did I miss something?
  2. Or do you currently use a workaround to achieve this behavior?
  3. Or would that kind of feature be out of scope of fang and you don't need this?

Maybe it is part or could be part of the following discussion #101 ?

I understand that this could/would considerably complicate the implementation, but no harm in asking 😇.
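
On the compiler question raised above: a process-wide value that is initialised once at startup and then read from inside run is accepted by the compiler when it is stored in a std::sync::OnceLock (stable since Rust 1.70). This is a sketch of one possible workaround, not a fang feature; DbPool is a hypothetical placeholder for a real pool type.

```rust
use std::sync::OnceLock;

/// Placeholder for a real connection pool type (hypothetical).
#[derive(Debug)]
struct DbPool {
    url: String,
}

/// Set once at startup, then readable from any thread or task.
static POOL: OnceLock<DbPool> = OnceLock::new();

/// Initialises the global pool; returns the rejected value as Err
/// if the pool was already set.
fn init_global_pool(url: &str) -> Result<(), DbPool> {
    POOL.set(DbPool { url: url.to_string() })
}

/// What a task's run body could call instead of rebuilding a pool.
fn get_global_pool() -> Option<&'static DbPool> {
    POOL.get()
}
```

The pool would be initialised in main before the worker pool starts, and tasks would handle the None case if it was not.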

Question: Can tasks be scheduled in distributed systems with multiple workers on different machines.

Looking at the documentation and blog-post as well as skimming through the code, I wasn't able to answer the question if fang allows scheduling tasks across distributed systems, with multiple workers on different machines.

Is there some synchronization to make this work and to prevent duplicate work across multiple workers, as well as a mechanism for another worker to pick up a task if the worker that started it dies before reporting back the task as finished?

Add the possibility to abort a task

Hello,

First of all, I would like to say that I love your library.
As you probably know, the rust ecosystem does not have (yet) a complete production-grade equivalent to celery/dramatiq for python.

For now, I would like to know if it is possible to add a feature for aborting a task (by task_id?), either a task not yet processed by a worker or a task that has not finished yet.

EDIT: Also, is there a way to create a client (for pushing tasks) without the task implementation?
In my case the worker is pretty heavy (lots of dependencies…) and I don't want to build my client with all these dependencies.

Thank you a lot !!!!

Translate docs to Spanish

Since you have a Spanish contributor, you may consider translating all of the documentation into Spanish to better integrate the Spanish-speaking community into the project.
