sqlalchemy-celery-beat's Introduction

sqlalchemy-celery-beat

A SQLAlchemy-based scheduler for Celery.

NOTE: This project was originally developed by AngelLiang to use SQLAlchemy as the database scheduler for Flask or FastAPI, much as django-celery-beat does for Django. I am continuing his work and maintaining a working solution.

Prerequisites

  • Python 3
  • celery >= 5.0
  • sqlalchemy >= 1.4

First, install celery and sqlalchemy; celery must be >= 5.0:

$ pip install sqlalchemy celery

Installing

Install from PyPI:

$ pip install sqlalchemy-celery-beat

Install from source by cloning this repository:

$ git clone git@github.com:farahats9/sqlalchemy-celery-beat.git
$ cd sqlalchemy-celery-beat
$ python setup.py install

Usage

After you have installed sqlalchemy_celery_beat, you can get started with the following steps.

This is a demo; the code is in the examples directory.

  1. start celery worker

    $ celery -A tasks worker -l info
    
  2. start the celery beat with DatabaseScheduler as scheduler:

    $ celery -A tasks beat -S sqlalchemy_celery_beat.schedulers:DatabaseScheduler -l info
    

    You can also use the shorthand argument -S sqlalchemy.

Description

After celery beat is started, by default it creates a SQLite database (schedule.db) in the current folder. You can use a tool such as SQLiteStudio to inspect it.

[Screenshot: sample rows from the PeriodicTask model's table in SQLite]

To change the schedule, update the data in schedule.db. sqlalchemy_celery_beat does not pick up such changes immediately: you must also set the last_update field of the first row in the celery_periodic_task_changed table to the current datetime. Celery beat then reloads the schedule at its next wake-up time.
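The manual refresh described above can be scripted. The following is a minimal sketch using Python's built-in sqlite3 module. It assumes the table and column names given above (celery_periodic_task_changed, last_update); the helper name touch_schedule_changed is hypothetical, and the timestamp format and the use of local time are assumptions that you should check against what your database actually stores.

```python
import sqlite3
from datetime import datetime
from typing import Optional


def touch_schedule_changed(conn: sqlite3.Connection,
                           when: Optional[datetime] = None) -> int:
    """Set last_update on the change-marker table and return the row count.

    A return value of 0 means the table holds no row to update yet.
    """
    # Assumption: timestamps are stored as 'YYYY-MM-DD HH:MM:SS[.ffffff]'.
    stamp = (when or datetime.now()).isoformat(sep=" ")
    cursor = conn.execute(
        "UPDATE celery_periodic_task_changed SET last_update = ?", (stamp,)
    )
    conn.commit()
    return cursor.rowcount


# Usage, once beat has created schedule.db in the current folder:
# conn = sqlite3.connect("schedule.db")
# touch_schedule_changed(conn)
```

When you change tasks through the ORM models instead of editing the database directly, this step should not be necessary, since the ORM events update the marker for you.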

Database Configuration

You can set the SQLAlchemy database URI when you configure Celery, for example:

from celery import Celery

celery = Celery('tasks')

beat_dburi = 'sqlite:///schedule.db'

celery.conf.update(
    {
        'beat_dburi': beat_dburi,
        'beat_schema': None  # you can make the scheduler tables under different schema (tested for postgresql, not available in sqlite)
    }
)

You can also use MySQL or PostgreSQL:

# MySQL: `pip install mysql-connector`
beat_dburi = 'mysql+mysqlconnector://root:[email protected]:3306/celery-schedule'

# PostgreSQL: `pip install psycopg2`
beat_dburi = 'postgresql+psycopg2://postgres:[email protected]:5432/celery-schedule'
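If the user name or password contains characters such as @ or /, they need to be URL-escaped before being placed in the URI. The sketch below builds a URI from its parts using only the standard library. The helper name build_dburi and the credentials shown are hypothetical placeholders, not values from this project.

```python
from urllib.parse import quote_plus


def build_dburi(driver: str, user: str, password: str,
                host: str, port: int, database: str) -> str:
    """Assemble a SQLAlchemy-style URI, escaping the user name and password."""
    return (
        f"{driver}://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )


# Placeholder credentials for illustration only.
beat_dburi = build_dburi(
    "postgresql+psycopg2", "postgres", "p@ss/word", "localhost", 5432,
    "celery-schedule",
)
print(beat_dburi)
# postgresql+psycopg2://postgres:p%40ss%2Fword@localhost:5432/celery-schedule
```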

Passing arguments to SQLAlchemy engine creation

You can pass arguments using the beat_engine_options key in the config dictionary. For example, to make the engine use echo=True and show verbose output:

celery.conf.update(
    {
        'beat_dburi': beat_dburi,
        'beat_engine_options': {
            'echo': True
        },
        ...
    }
)

You can use this to pass any options required by your DB driver. For more information about the available options, check the SQLAlchemy docs.
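As a further illustration, here is a sketch of a config fragment that sets a few connection-pool options. pool_pre_ping and pool_recycle are standard SQLAlchemy create_engine() parameters; the values are placeholders, and whether each option is useful depends on your driver and pool type, so treat this as a starting point rather than a recommendation.

```python
# Hypothetical settings; tune the values for your own database.
beat_config = {
    "beat_engine_options": {
        "echo": False,          # set to True to log every SQL statement
        "pool_pre_ping": True,  # test a connection before handing it out
        "pool_recycle": 3600,   # replace connections older than one hour
    },
}

# With a Celery app named `celery`, as in the examples above:
# celery.conf.update(beat_config)
```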

Example Code 1

View examples/base/tasks.py for details.

Run Worker in console 1

$ cd examples/base

# Celery < 5.0
$ celery worker -A tasks:celery -l info

# Celery >= 5.0
$ celery -A tasks:celery worker -l info

Run Beat in console 2

$ cd examples/base

# Celery < 5.0
$ celery beat -A tasks:celery -S tasks:DatabaseScheduler -l info

# Celery >= 5.0
$ celery -A tasks:celery beat -S tasks:DatabaseScheduler -l info

Example Code 2

Example creating interval-based periodic task

To create a periodic task executing at an interval you must first create the interval object:

>>> from sqlalchemy_celery_beat.models import PeriodicTask, IntervalSchedule, Period
>>> from sqlalchemy_celery_beat.session import SessionManager
>>> from celeryconfig import beat_dburi
>>> session_manager = SessionManager()
>>> session = session_manager.session_factory(beat_dburi)

# executes every 10 seconds.
>>> schedule = session.query(IntervalSchedule).filter_by(every=10, period=Period.SECONDS).first()
>>> if not schedule:
...     schedule = IntervalSchedule(every=10, period=Period.SECONDS)
...     session.add(schedule)
...     session.commit()

That's all the fields you need: a period type and the frequency.

You can choose between a specific set of periods:

  • Period.DAYS
  • Period.HOURS
  • Period.MINUTES
  • Period.SECONDS
  • Period.MICROSECONDS

note:

If you have multiple periodic tasks executing every 10 seconds,
then they should all point to the same schedule object.
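One way to make sure tasks share a schedule row is a small get-or-create helper, sketched below. The name get_or_create is hypothetical and not part of this package; it relies only on the session.query(...).filter_by(...).first() pattern already used above.

```python
def get_or_create(session, model, **fields):
    """Return the first row of `model` matching `fields`, creating it if absent.

    `session` is expected to behave like a SQLAlchemy ORM session.
    """
    instance = session.query(model).filter_by(**fields).first()
    if instance is None:
        instance = model(**fields)
        session.add(instance)
        session.commit()
    return instance


# Usage with the objects defined above (left as a comment because it needs
# a live database session):
# schedule = get_or_create(session, IntervalSchedule,
#                          every=10, period=Period.SECONDS)
```

Calling the helper twice with the same fields returns the same row, so every task created with that schedule points at one schedule object.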

Now that we have defined the schedule object, we can create the periodic task entry:

    >>> task = PeriodicTask(
    ...     schedule_model=schedule,            # we created this above.
    ...     name='Importing contacts',          # simply describes this periodic task.
    ...     task='proj.tasks.import_contacts',  # name of task.
    ... )
    >>> session.add(task)
    >>> session.commit()

Note that this is a very basic example. You can also specify the arguments and keyword arguments used to execute the task, the queue to send it to, and an expiry time.

Here's an example specifying the arguments; note that they must be JSON-serialized:

>>> import json
>>> from datetime import datetime, timedelta, UTC

>>> periodic_task = PeriodicTask(
...     schedule_model=schedule,                  # we created this above.
...     name='Importing contacts',          # simply describes this periodic task.
...     task='proj.tasks.import_contacts',  # name of task.
...     args=json.dumps(['arg1', 'arg2']),
...     kwargs=json.dumps({
...        'be_careful': True,
...     }),
...     expires=datetime.now(UTC) + timedelta(seconds=30)
... )
>>> session.add(periodic_task)
>>> session.commit()
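Because args and kwargs are stored as JSON text, it can help to validate their shape before saving. The helper below is a hypothetical sketch (encode_task_arguments is not part of this package): it checks that positional arguments form a list or tuple and keyword arguments form a dict, then returns the two JSON strings.

```python
import json


def encode_task_arguments(args=(), kwargs=None):
    """Return (args_json, kwargs_json) for the args and kwargs fields.

    Positional arguments must be a list or tuple and keyword arguments a
    dict, so each value reaches the task in the intended slot.
    """
    if not isinstance(args, (list, tuple)):
        raise TypeError("args must be a list or tuple of positional arguments")
    kwargs = {} if kwargs is None else kwargs
    if not isinstance(kwargs, dict):
        raise TypeError("kwargs must be a dict of keyword arguments")
    return json.dumps(list(args)), json.dumps(kwargs)


args_json, kwargs_json = encode_task_arguments(
    ["arg1", "arg2"], {"be_careful": True}
)
print(args_json)    # ["arg1", "arg2"]
print(kwargs_json)  # {"be_careful": true}
```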

Example creating crontab-based periodic task

A crontab schedule has the fields minute, hour, day_of_week, day_of_month and month_of_year. If you want the equivalent of a 30 * * * * crontab entry (execute at minute 30 of every hour), you specify:

>>> from sqlalchemy_celery_beat.models import PeriodicTask, CrontabSchedule
>>> schedule = CrontabSchedule(
...     minute='30',
...     hour='*',
...     day_of_week='*',
...     day_of_month='*',
...     month_of_year='*',
...     timezone='UTC',
... )

The crontab schedule is linked to a specific timezone using the 'timezone' input parameter.
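The meaning of the minute field is easy to misread: '30' matches one minute per hour, while '*/30' matches two. The following standalone sketch expands a minute field into the minutes it matches. It is an illustration of common crontab syntax only, not the parser that Celery or this package uses, and the function name is hypothetical.

```python
def expand_minute_field(field: str) -> set[int]:
    """Expand a crontab minute field into the set of minutes it matches.

    Supports '*', single numbers, ranges ('a-b'), steps ('*/n', 'a-b/n')
    and comma-separated combinations of these.
    """
    minutes: set[int] = set()
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            start, end = 0, 59
        elif "-" in base:
            low, high = base.split("-")
            start, end = int(low), int(high)
        else:
            start = end = int(base)
        minutes.update(range(start, end + 1, step))
    return minutes


print(sorted(expand_minute_field("30")))    # [30]
print(sorted(expand_minute_field("*/30")))  # [0, 30]
```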

To create a periodic task using this schedule, use the same approach as for the interval-based periodic task earlier in this document. The schedule_model property is a generic foreign-key implementation, which makes this easy and efficient:

>>> periodic_task = PeriodicTask(
...     schedule_model=schedule,
...     name='Importing contacts',
...     task='proj.tasks.import_contacts',
... )

What the previous code actually does is this:

>>> periodic_task = PeriodicTask(
...     schedule_id=schedule.id,
...     discriminator=schedule.discriminator,
...     name='Importing contacts',
...     task='proj.tasks.import_contacts',
... )

So you can either set discriminator and schedule_id yourself, or use the convenient schedule_model property, which populates them for you behind the scenes.
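Conceptually, the discriminator selects the schedule table and schedule_id selects the row within it. The sketch below illustrates that idea with plain dataclasses and dictionaries. It is not the package's implementation: the class names and the resolve_schedule helper are hypothetical, and the 'intervalschedule' discriminator value is assumed by analogy with 'crontabschedule'.

```python
from dataclasses import dataclass


@dataclass
class IntervalRow:
    id: int
    every: int
    period: str
    discriminator: str = "intervalschedule"  # assumed value


@dataclass
class CrontabRow:
    id: int
    minute: str
    hour: str
    discriminator: str = "crontabschedule"


def resolve_schedule(tables, discriminator, schedule_id):
    """Pick the table by discriminator, then the row by id."""
    return tables[discriminator][schedule_id]


tables = {
    "intervalschedule": {1: IntervalRow(id=1, every=10, period="seconds")},
    "crontabschedule": {1: CrontabRow(id=1, minute="30", hour="*")},
}

# Both tables contain a row with id 1; the discriminator disambiguates them.
print(resolve_schedule(tables, "crontabschedule", 1))
print(resolve_schedule(tables, "intervalschedule", 1))
```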

Temporarily disable a periodic task

You can use the enabled flag to temporarily disable a periodic task:

>>> periodic_task.enabled = False
>>> session.add(periodic_task)
>>> session.commit()

If you use a bulk operation to update or delete multiple tasks at the same time, the scheduler won't notice the changes until you call PeriodicTaskChanged.update_changed() or .update_from_session().

Example:

from sqlalchemy import update

from sqlalchemy_celery_beat.models import PeriodicTask, PeriodicTaskChanged
from sqlalchemy_celery_beat.session import SessionManager, session_cleanup
from celeryconfig import beat_dburi  # same URI used to configure Celery

session_manager = SessionManager()
session = session_manager.session_factory(beat_dburi)

with session_cleanup(session):
    stmt = update(PeriodicTask).where(PeriodicTask.name == 'task-123').values(enabled=False)

    # execute() fires no ORM events: the change is in the database,
    # but the scheduler does not know about it yet
    session.execute(stmt)
    session.commit()

    # mark the schedule as changed so the scheduler reloads the tasks
    PeriodicTaskChanged.update_from_session(session)

This is not needed when you are updating a specific object using session.add(task) because it will trigger the after_update, after_delete or after_insert events.

Example running periodic tasks

The periodic tasks still need 'workers' to execute them. So make sure the default Celery package is installed. (If not installed, please follow the installation instructions here: https://github.com/celery/celery)

Both the worker and beat services need to be running at the same time.

  1. Start a Celery worker service (specify your project name):

    $ celery -A [project-name] worker --loglevel=info
    
  2. As a separate process, start the beat service (specify the scheduler):

    $ celery -A [project-name] beat -l info --scheduler sqlalchemy_celery_beat.schedulers:DatabaseScheduler
    

Working on adding the following features

  • ✅ Add ClockedSchedule model
  • ✅ Implement a generic foreign key
  • ✅ More robust attribute validation on models
  • ✅ Add Tests
  • Add more examples
  • Support for Async drivers like asyncpg and psycopg3 async mode
  • Use Alembic migrations

Any help with the tasks above or feedback is appreciated 🙂

Acknowledgments

sqlalchemy-celery-beat's People

Contributors

angelliang, farahats9, gsingh42, manuwelakanade, mbronstein1, minsis, subhamd, xxxss

sqlalchemy-celery-beat's Issues

Confusing README

Hey, I was following the instructions in the README.md and they threw me for a spin.
I'll admit that is partially a skill issue on my end, but following the code you are led to the following example:

session_manager = SessionManager()
beat_dburi = 'sqlite:///test.db'
engine, Session = session_manager.create_session(beat_dburi)
session = Session()

schedule = session.query(IntervalSchedule).filter_by(every=10, period=Period.SECONDS).first()
if not schedule:
    schedule = IntervalSchedule(every=10, period=Period.SECONDS)
    session.add(schedule)
    session.commit()

task = PeriodicTask(
    schedule_model=schedule,
    name='Importing contacts',
    task='main.add',
    args='[60, 9]', 
)
session.add(task)
session.commit()

When you try to run this it fails. The version below works.

session_manager = SessionManager()
beat_dburi = 'sqlite:///test.db'
session = session_manager.session_factory(beat_dburi)

with session_cleanup(session):

    schedule = session.query(IntervalSchedule).filter_by(every=10, period=Period.SECONDS).first()
    if not schedule:
        schedule = IntervalSchedule(every=20, period=Period.SECONDS)
        session.add(schedule)
        session.commit()

    task = PeriodicTask(
        schedule_model=schedule,
        name='Importing contacts',
        task='main.add',
        args='[60, 9]', 
    )
    session.add(task)
    session.commit()

Question: Is it possible to not create celery tables on start?

My issue is that when deploying a project I first need to up my app and app_db container and migrate, with celery tables. And then only up the workers after I migrate.

Maybe there's a better way of doing the above anyway. Any help/clarification on this is appreciated as always.

Celery Beat Worker crashes when deleting a schedule

How I'm deleting a schedule:

async def delete_scheduled_task(task_name: str):
    session_manager = SessionManager()
    session = session_manager.session_factory(db_uri)

    with session_cleanup(session):
        stmt = delete(PeriodicTask).where(PeriodicTask.name == task_name)

        session.execute(stmt)
        session.commit()

        PeriodicTaskChanged.update_from_session(session)

The error:


celery_beat    | [2024-03-09 10:20:31,162: INFO/MainProcess] DatabaseScheduler: Schedule changed.
celery_beat    | [2024-03-09 10:20:31,184: CRITICAL/MainProcess] beat raised exception <class 'AttributeError'>: AttributeError("'NoneType' object has no attribute 'last_run_at'")
celery_beat    | Traceback (most recent call last):
celery_beat    |   File "/usr/local/lib/python3.11/site-packages/celery/apps/beat.py", line 113, in start_scheduler
celery_beat    |     service.start()
celery_beat    |   File "/usr/local/lib/python3.11/site-packages/celery/beat.py", line 643, in start
celery_beat    |     interval = self.scheduler.tick()
celery_beat    |                ^^^^^^^^^^^^^^^^^^^^^
celery_beat    |   File "/usr/local/lib/python3.11/site-packages/celery/beat.py", line 337, in tick
celery_beat    |     not self.schedules_equal(self.old_schedulers, self.schedule)):
celery_beat    |                                                   ^^^^^^^^^^^^^
celery_beat    |   File "/usr/local/lib/python3.11/site-packages/sqlalchemy_celery_beat/schedulers.py", line 429, in schedule
celery_beat    |     self.sync()
celery_beat    |   File "/usr/local/lib/python3.11/site-packages/sqlalchemy_celery_beat/schedulers.py", line 362, in sync
celery_beat    |     self.schedule[name].save()  # save to database
celery_beat    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^
celery_beat    |   File "/usr/local/lib/python3.11/site-packages/sqlalchemy_celery_beat/schedulers.py", line 179, in save
celery_beat    |     setattr(obj, field, getattr(self.model, field))
celery_beat    | AttributeError: 'NoneType' object has no attribute 'last_run_at'

For now I'm just disabling instead of deleting. I guess it's not a big issue and more of a curiosity. Have you run into this issue? Do you know what's going on with it?

Deprecation Warning

/usr/local/lib/python3.11/site-packages/sqlalchemy_celery_beat/session.py:36: MovedIn20Warning: The declarative_base() function is now available as sqlalchemy.orm.declarative_base(). (deprecated since: 2.0) (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)

Cron task is getting called twice

I have a function that gets called from a flask endpoint to insert/update a task and its schedule.

@app.task
def update_task_schedule(name):
    session = session_maker()

    schedule = CrontabSchedule.from_schedule(
        session,
        schedules.crontab(
            minute='22',
            hour='*',
            day_of_week='*',
            day_of_month='*',
            month_of_year='*',
        )
    )

    task_stmt = db.select(PeriodicTask).where(PeriodicTask.name == name)
    result = session.execute(task_stmt)
    task = result.first()

    if task is None:
        stmt = db.insert(PeriodicTask).values(
            name=name,
            task='tasks.celery.index',
            schedule_id=schedule.id,
            discriminator=schedule.discriminator)
    else:
        stmt = db.update(PeriodicTask).values(
            schedule_id=schedule.id,
            discriminator=schedule.discriminator).where(PeriodicTask.name == name)

    session.execute(stmt)
    session.commit()

    PeriodicTaskChanged.update_from_session(session)

After this function is called the task looks like this in the database:

'3','index','tasks.celery.index','[]','{}',NULL,NULL,NULL,'{}',NULL,NULL,NULL,'0',NULL,'1',NULL,'0','2024-02-15 15:21:27','','crontabschedule','11'

'11','22','*','*','*','*','UTC'

On the console I see the task being invoked twice:

INFO: 2024/02/15 11:22:00 Task tasks.celery.index[11418f46-42d5-4db5-9605-83831699b097] received
DEBUG: 2024/02/15 11:22:00 TaskPool: Apply <function fast_trace_task at 0x10ca44700> (args:('tasks.celery.index', '11418f46-42d5-4db5-9605-83831699b097', {'lang': 'py', 'task': 'tasks.celery.index', 'id': '11418f46-42d5-4db5-9605-83831699b097', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '11418f46-42d5-4db5-9605-83831699b097', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': '[email protected]', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}, 'properties': {'correlation_id': '11418f46-42d5-4db5-9605-83831699b097', 'reply_to': '6ddb9a64-3fe0-310e-948b-f79802c585a0', 'periodic_task_name': 'index', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': '3654ceb6-cb2b-430a-988d-d33d24e32e61'}, 'reply_to': '6ddb9a64-3fe0-310e-948b-f79802c585a0', 'correlation_id': '11418f46-42d5-4db5-9605-83831699b097', 'hostname': '[email protected]', 'delivery_info':... kwargs:{})

WARNING: 2024/02/15 11:22:00 INDEXING

INFO: 2024/02/15 11:22:00 Task tasks.celery.index[11418f46-42d5-4db5-9605-83831699b097] succeeded in 0.0007132080000360475s: None
INFO: 2024/02/15 11:22:00 Task tasks.celery.index[88c8b00f-920f-4763-9ca9-571f6746cb59] received
DEBUG: 2024/02/15 11:22:00 TaskPool: Apply <function fast_trace_task at 0x10ca44700> (args:('tasks.celery.index', '88c8b00f-920f-4763-9ca9-571f6746cb59', {'lang': 'py', 'task': 'tasks.celery.index', 'id': '88c8b00f-920f-4763-9ca9-571f6746cb59', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '88c8b00f-920f-4763-9ca9-571f6746cb59', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': '[email protected]', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}, 'properties': {'correlation_id': '88c8b00f-920f-4763-9ca9-571f6746cb59', 'reply_to': '5ccfc9a3-ed52-3606-b511-260e477914f9', 'periodic_task_name': 'index', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': '39dacb2b-c6a0-4e31-8179-256eebb46372'}, 'reply_to': '5ccfc9a3-ed52-3606-b511-260e477914f9', 'correlation_id': '88c8b00f-920f-4763-9ca9-571f6746cb59', 'hostname': '[email protected]', 'delivery_info':... kwargs:{})

WARNING: 2024/02/15 11:22:00 INDEXING

INFO: 2024/02/15 11:22:00 Task tasks.celery.index[88c8b00f-920f-4763-9ca9-571f6746cb59] succeeded in 0.0005478840000137097s: None

Question: How to relate celery_periodic_task to celery_task_meta

Essentially what I'm looking for is how to associate a Task with the results from its execution. To be able to show instances of failure and the current status of a Task.

I think this can be achieved by:
Option 1: linking celery_periodictask.id to the celery_taskmeta rows.
Option 2: giving celery_taskmeta.name the same value as celery_periodictask.name.

I'm just not sure how to carry out either option. Celery's documentation does not provide much insight.

Once again I'm probably missing something glaring. Sorry in advance, I'm getting to grips with Celery for the first time :).

Passing a dict as an argument to a task

Hey again, @farahats9. I'm trying to pass a dictionary as an argument to a scheduled task:

asd = {
    "x": 5,
    "y": 5
}

import json
task = PeriodicTask(
    schedule_model=schedule,
    name='Adding to 10',
    task='main.add',
    args=json.dumps(asd)
)

It's saved with a dict structure,
Screenshot 2024-03-04 at 16 27 34

But on the receiving end only the keys are provided.
Screenshot 2024-03-04 at 16 36 17

Is this a limitation imposed by sqlalchemy-celery-beat or by celery? Still not very sure where the line between the two is.
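A likely explanation, assuming the stored args value is unpacked positionally when the task is called (as Celery does with a task's args), is ordinary Python behaviour: unpacking a dict with * yields only its keys. The sketch below reproduces the effect with plain functions; show_positional is a hypothetical helper and add stands in for the main.add task.

```python
import json


def add(x, y):
    return x + y


def show_positional(*args):
    """Return the positional arguments exactly as the callee receives them."""
    return args


# Round-trip through JSON, as the stored args field would be.
stored = json.loads(json.dumps({"x": 5, "y": 5}))

# Unpacking a dict positionally iterates over its keys only.
print(show_positional(*stored))    # ('x', 'y')

# Passing the same dict as keyword arguments delivers the values.
print(add(**stored))               # 10

# Wrapping the dict in a list passes it as one positional argument.
print(show_positional(*[stored]))  # ({'x': 5, 'y': 5},)
```

Under that assumption, the dict belongs either in kwargs (kwargs=json.dumps(asd)) or inside a list in args (args=json.dumps([asd])), depending on the task's signature.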

'Asia/Shanghai' object has no attribute 'key'

When running celery, I was told that celery.backend_cleanup could not be added to the database because I changed the time zone. I don't know how to fix it.
Cannot add entry 'celery.backend_cleanup' to database schedule: AttributeError("'Asia/Shanghai' object has no attribute 'key'"). Contents: {'task': 'celery.backend_cleanup', 'schedule': <crontab: 0 4 * * * (m/h/d/dM/MY)>, 'options': {'expire_seconds': 43200}}

Here's my code:
app = CeleryApp('demo')
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False

I run Celery on Linux with the following command:
celery -A celery:app worker -B -l info

I hope someone can help me solve the problem, thank you!
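The error message suggests a timezone object that lacks a key attribute. zoneinfo.ZoneInfo objects expose their name as .key, whereas pytz timezones expose it as .zone (and pytz names each timezone class after its zone, which would explain the odd "'Asia/Shanghai' object" wording). The sketch below shows a defensive way to read the name from either kind of object. It is an illustration of the mismatch, not the package's fix; timezone_name and the stand-in classes are hypothetical.

```python
def timezone_name(tz) -> str:
    """Return the IANA name of a timezone object.

    zoneinfo.ZoneInfo exposes the name as `.key`; pytz timezones expose it
    as `.zone`. Fall back to str(tz) when neither attribute is present.
    """
    for attribute in ("key", "zone"):
        name = getattr(tz, attribute, None)
        if name:
            return name
    return str(tz)


class ZoneInfoLike:
    """Stand-in for zoneinfo.ZoneInfo('Asia/Shanghai')."""
    key = "Asia/Shanghai"


class PytzLike:
    """Stand-in for pytz.timezone('Asia/Shanghai')."""
    zone = "Asia/Shanghai"


print(timezone_name(ZoneInfoLike()))  # Asia/Shanghai
print(timezone_name(PytzLike()))      # Asia/Shanghai
```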

Crontab: hourly scheduled task executes every minute

👋 Hi! I would like to report an issue where a task configured to run every hour at minute 0 is being executed every minute instead:

from celery.schedules import crontab

[...]

config = {
    "beat_schedule": {
        "run_every_hour": {
            "task": "tasks.run_every_hour",
            "schedule": crontab(hour="*", minute=0),
        },
    },
}
celery.conf.update(config)

[...]

It seems there might be an issue in translating the schedule to the database:

mysql> select * from celery_crontabschedule;
+----+--------+------+-------------+--------------+---------------+----------+
| id | minute | hour | day_of_week | day_of_month | month_of_year | timezone |
+----+--------+------+-------------+--------------+---------------+----------+
|  1 | 0      | 4    | *           | *            | *             | UTC      |
|  2 | *      | *    | *           | *            | *             | UTC      |
+----+--------+------+-------------+--------------+---------------+----------+
mysql> select * from celery_periodictask where task = 'tasks.run_every_hour'\G
*************************** 1. row ***************************
             id: 2
           name: run_every_hour
           task: tasks.run_every_hour
           args: []
         kwargs: {}
          queue: NULL
       exchange: NULL
    routing_key: NULL
        headers: {}
       priority: NULL
        expires: NULL
 expire_seconds: NULL
        one_off: 0
     start_time: NULL
        enabled: 1
    last_run_at: 2024-06-11 12:18:00
total_run_count: 3
   date_changed: 2024-06-11 12:18:30
    description:
  discriminator: crontabschedule
    schedule_id: 2
1 row in set (0.01 sec)

I'm using version v0.7.1 using SQLAlchemy 1.4 and MySQL as backend. Thanks!
