Comments (4)
I think this is a nice solution for the case where you want to schedule a subsequent run only if the queue item is being "worked".
from pq.
You could use a cronjob to add items to the queue. You can then either use schedule_at
to defer execution to a later time or just rely on the cronjob schedule to make the queue item available for work.
from pq.
@ThibTrip I second @malthe here
Add the logic to self-schedule a job in the future, pq
will take care of it (if your job does too many things, create a dispatcher worker).
from pq.
Thanks for your quick answers
@stas This is the solution I went for pq
does not seem to save the timezone for schedule_at
(because I always provide schedule_at
as a datetime with UTC timezone).
import datetime
import pytz
from croniter import croniter # pip install croniter
from loguru import logger # pip install loguru
def requeue_job(queue, job, delete_job=False, engine=None):
"""
Requeues a job created with the use of the library pq.
This is a workaround for making recurring tasks.
The job must fullfill the following conditions:
* have a key called "cron" with a valid cronjob expression in its data
* have a datetime for its schedule_at attribute
**WARNING**: we assume the timezone of the datetime is UTC (pq does not
seem to save the TZ information in the attribute schedule_at).
Parameters
----------
queue : pq.Queue
job : pq.Job
delete_job : bool, default False
Whether to delete the job that's been passed (happens after requeuing it).
If True, an engine must be provided.
engine : sqlalchemy.engine.base.Engine or None, default None
Needed if delete_job is True
Examples
--------
>>> import datetime
>>>
>>> # let's assume you have defined a queue somewhere
>>> queue.put(data={'url':'https://www.github.com', 'cron':'0 2 * * *'},
... schedule_at=datetime.datetime.now().astimezone(datetime.timezone.utc))
>>>
>>> job = queue.get()
>>> # do something...
>>> requeue_job(queue=queue, job=job)
"""
# verify data
## type
data = job.data
if not isinstance(data, dict):
raise TypeError(f'Expecting job.data to be dict. Received type {type(data)} instead')
## content
if 'cron' not in data:
raise ValueError('Key "cron" is missing from job.data')
if not job.schedule_at:
raise ValueError('schedule_at must have been provided for the job (it is None)!')
# make sure schedule_at is timezoned
schedule_at = job.schedule_at
if schedule_at.tzinfo is None:
schedule_at = pytz.utc.localize(schedule_at)
# get next schedule based on cron expression
cron, start_time = data['cron'], job.schedule_at
iter = croniter(expr_format=cron, start_time=start_time)
schedule_at = iter.get_next(datetime.datetime)
logger.debug(f'Requeuing job {job.id}. Next execution: {schedule_at} (base_time: {start_time}, cron: {cron})')
# requeue job
new_job_id = queue.put(job.data, schedule_at=schedule_at)
# delete job
if delete_job:
if not engine:
raise ValueError('I need a sqlalchemy engine to drop jobs! You have not provided any')
engine.execute(f'DELETE FROM "{queue.name}" WHERE id=%(job_id)s;', job_id=job.id)
return new_job_id
from pq.
Related Issues (20)
- 1.6? HOT 2
- pq as a "jobs queue" - items can not be read concurrently when using transactional get HOT 19
- Support for schema qualified tables HOT 3
- Hardwired json.dumps introduces double encoding problem HOT 7
- connection pool exhausted HOT 3
- DuplicatePreparedStatement error HOT 10
- Prepared statement name not properly escaped HOT 1
- Question: why am I seeing "NOTICE: function pq_notify does not exist" in the logs? HOT 3
- Delete executed tasks?! HOT 1
- Wait on a job? HOT 3
- get() timeout not honoured HOT 1
- Any interest in porting to cockroach HOT 3
- Pipenv using old create.sql HOT 5
- Get id of current task HOT 2
- Performance with a very large queue HOT 9
- lost trigger when setting up multiple queue-tables within the same schema
- queue.put() inside a transaction sets enqueued_at to the transaction start time, not the current time HOT 4
- Adding name of the job producer HOT 2
- Dashboard for PQ HOT 3
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from pq.