pq's Introduction

PQ

A transactional queue system for PostgreSQL written in Python.

PQ does the job!

It allows you to push and pop items in and out of a queue in various ways and also provides two scheduling options: delayed processing and prioritization.

The system uses a single table that holds all jobs across queues; the specifics are easy to customize.

The system currently supports only the psycopg2 database driver - or psycopg2cffi for PyPy.

The basic queue implementation is similar to Ryan Smith's queue_classic library written in Ruby, but uses SKIP LOCKED for concurrency control.

In terms of performance, the implementation clocks in at about 1,000 operations per second. Using the PyPy interpreter, this scales linearly with the number of cores available.

Getting started

All functionality is encapsulated in a single class PQ.

class PQ(conn=None, pool=None, table="queue", schema=None)

The optional schema argument can be used to qualify the table with a schema if necessary.

Example usage:

from psycopg2 import connect
from pq import PQ

conn = connect('dbname=example user=postgres')
pq = PQ(conn)

For multi-threaded operation, use a connection pool such as psycopg2.pool.ThreadedConnectionPool.
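
For instance, a minimal pooled setup might look like this (the pool bounds and connection string are illustrative):

from psycopg2.pool import ThreadedConnectionPool
from pq import PQ

# Keep between 2 and 10 connections open; queue operations borrow one as needed.
pool = ThreadedConnectionPool(2, 10, 'dbname=example user=postgres')
pq = PQ(pool=pool)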

You probably want to make sure your database is created with the utf-8 encoding.

To create and configure the queue table, call the create() method.

pq.create()

Queues

The pq object exposes queues through Python's dictionary interface:

queue = pq['apples']

The queue object provides get and put methods as explained below; in addition, it works as a context manager that manages a transaction:

with queue as cursor:
    ...

The statements inside the context manager are either committed as a transaction or rolled back, atomically. This is useful when a queue is used to manage jobs because it allows you to retrieve a job from the queue, perform the job and write a result, with transactional semantics.
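
For example, a minimal sketch of that pattern, assuming a results table with job_id and output columns already exists and do_work is your own handler:

with queue as cursor:
    job = queue.get()
    if job is not None:
        result = do_work(job.data)  # hypothetical handler
        cursor.execute(
            "INSERT INTO results (job_id, output) VALUES (%s, %s)",
            (job.id, result),
        )
# Leaving the block commits the get and the insert together;
# an exception inside the block rolls both back.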

Methods

Use the put(data) method to insert an item into the queue. It takes a JSON-compatible object such as a Python dictionary:

queue.put({'kind': 'Cox'})
queue.put({'kind': 'Arthur Turner'})
queue.put({'kind': 'Golden Delicious'})

Items are pulled out of the queue using get(block=True). The default behavior is to block until an item is available, with a default timeout of one second, after which None is returned.

def eat(kind):
    print('umm, %s apples taste good.' % kind)

job = queue.get()
eat(**job.data)
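
If you don't want to wait at all, get can presumably also be called with block=False, returning None right away when the queue is empty:

job = queue.get(block=False)
if job is None:
    print('nothing to eat right now')
else:
    eat(**job.data)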

In addition to the data attribute, the job object provides further metadata, as illustrated by its string representation:

>>> job
<pq.Job id=77709 size=1 enqueued_at="2014-02-21T16:22:06Z" schedule_at=None>

The get operation is also available through iteration:

for job in queue:
    if job is None:
        break

    eat(**job.data)

The iterator blocks if no item is available. Again, there is a default timeout of one second, after which the iterator yields a value of None.

An application can then choose to break out of the loop, or wait again for an item to be ready.

for job in queue:
    if job is not None:
        eat(**job.data)

    # This is an infinite loop!

Scheduling

Items can be scheduled such that they're not pulled until a later time:

queue.put({'kind': 'Cox'}, '5m')

In this example, the item is ready for work five minutes later. The method also accepts datetime and timedelta objects.
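
For instance, the same five-minute delay expressed with a timedelta, and an absolute time expressed with a datetime (illustrative values):

from datetime import datetime, timedelta

queue.put({'kind': 'Cox'}, timedelta(minutes=5))
queue.put({'kind': 'Cox'}, datetime(2014, 2, 21, 16, 30))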

Priority

If some items are more important than others, a time expectation can be expressed:

queue.put({'kind': 'Cox'}, expected_at='5m')

This tells the queue processor to give priority to this item over an item expected at a later time, and conversely, to prefer an item with an earlier expected time. Note that items without a set priority are pulled last.

The scheduling and priority options can be combined:

queue.put({'kind': 'Cox'}, '1h', '2h')

This item won't be pulled out until after one hour, and even then, it's only processed subject to its priority of two hours.

Encoding and decoding

The task data is encoded and decoded into JSON using the built-in json module. If you want to use a different implementation or need to configure this, pass encode and/or decode arguments to the PQ constructor.
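
For example, a sketch that swaps in custom callables, assuming encode takes an item and returns a string while decode does the reverse:

import json

from pq import PQ

pq = PQ(
    conn,
    encode=lambda data: json.dumps(data, sort_keys=True),
    decode=json.loads,
)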

Pickles

If a queue name is provided as <name>/pickle (e.g. 'jobs/pickle'), items are automatically pickled and unpickled using Python's built-in cPickle module:

queue = pq['apples/pickle']

class Apple(object):
    def __init__(self, kind):
        self.kind = kind

queue.put(Apple('Cox'))

This allows you to store most objects without having to add any further serialization code.

The old pickle protocol 0 is used to ensure the pickled data is encoded as ASCII, which should be compatible with any database encoding. Note that the pickle data is still wrapped as a JSON string at the database level.

While using the pickle protocol is an easy way to serialize objects, advanced users might be better served by applying JSON serialization directly to the objects, for example via the object hook mechanism in the built-in json module or by subclassing JSONEncoder <https://docs.python.org/2/library/json.html#json.JSONEncoder>.

Tasks

pq comes with a higher-level API that helps to manage tasks.

from pq.tasks import PQ

pq = PQ(...)

queue = pq['default']

@queue.task(schedule_at='1h')
def eat(job_id, kind):
    print('umm, %s apples taste good.' % kind)

eat('Cox')

queue.work()

A task's jobs can optionally be rescheduled on failure:

@queue.task(schedule_at='1h', max_retries=2, retry_in='10s')
def eat(job_id, kind):
    # ...

Time expectations can be overridden when calling the task:

eat('Cox', _expected_at='2m', _schedule_at='1m')

** NOTE ** The first positional argument is the id of the job, i.e. the primary key of the record in PostgreSQL.

Thread-safety

All objects are thread-safe as long as a connection pool is provided where each thread receives its own database connection.
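
A sketch of such a setup (connection string, pool bounds, queue name and handler are illustrative):

import threading

from psycopg2.pool import ThreadedConnectionPool
from pq import PQ

pool = ThreadedConnectionPool(2, 6, 'dbname=example user=postgres')
pq = PQ(pool=pool)

def worker():
    queue = pq['apples']
    for job in queue:
        if job is None:
            break  # queue drained; stop this worker
        print('processing', job.data)

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()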

pq's People

Contributors

3manuek, billymillions-zz, coffenbacher, danilaeremin, dansamara, gotcha, jeanphix, kalekseev, malthe, messense, migurski, minyoung, ronnix, stas, xq5he

pq's Issues

Adding recurring tasks?

Hello,

is it possible to add recurring tasks, similar to a cronjob? I scrolled quickly through the issues and the source code and could not find any indication of that. If this functionality does not exist I'd be interested in easy workaround ideas 🤔.

The use case is a queue of URLs to be crawled from multiple computers regularly (e.g. everyday at 1 AM). To make things more difficult all URLs are crawled at different times (some at 1 AM, others at 2 AM etc.).

It seems this was implemented in the django-pq library, but it is unmaintained and there has not been a commit since 2014. Also, your library has been very reliable for me, so it'd be nice to use it for this as well 👍.
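
One easy workaround might be to have the worker re-enqueue each item roughly a day ahead once it has been processed; a sketch, where crawl is a hypothetical handler:

from datetime import timedelta

for job in queue:
    if job is None:
        continue
    crawl(job.data['url'])                              # hypothetical crawl handler
    queue.put(job.data, schedule_at=timedelta(days=1))  # run again in about 24 hours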

Specifications

  • Version: 1.9.0
  • Python version: 3.8.3

Support for schema qualified tables

Hi there,
I would like to place my queue table in a schema other than public.
When I set the table arg to a schema qualified table name, the following error occurs:

Traceback (most recent call last):
  File "worker.py", line 139, in <module>
    for job in queue:
  File "/Users/marcel/.pyenv/versions/2.7.15/lib/python2.7/site-packages/pq/__init__.py", line 86, in next
    return QueueIterator.__next__(self)
  File "/Users/marcel/.pyenv/versions/2.7.15/lib/python2.7/site-packages/pq/__init__.py", line 81, in __next__
    return self.queue.get(timeout=self.timeout)
  File "/Users/marcel/.pyenv/versions/2.7.15/lib/python2.7/site-packages/pq/__init__.py", line 176, in get
    cursor, block
  File "/Users/marcel/.pyenv/versions/2.7.15/lib/python2.7/site-packages/pq/utils.py", line 69, in wrapper
    cursor.execute("PREPARE %s AS\n%s" % (name, query), d)
psycopg2.ProgrammingError: syntax error at or near "."
LINE 1: PREPARE _pull_item_fsm.queue_test_fsm.queue AS

Is this possible?
Regards,
Marcel
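
For what it's worth, the PQ constructor described in the introduction accepts a schema argument, so a schema-qualified setup might look like this (the schema name is illustrative):

from psycopg2 import connect
from pq import PQ

conn = connect('dbname=example user=postgres')
pq = PQ(conn, table='queue', schema='myschema')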

asyncio variant would be great

Waiting for a new item from a queue should be awaitable, allowing other tasks to run in the meantime.

As there exists an asyncio version of the PostgreSQL driver, it should be technically possible.

As it would break Python 2.x compatibility, it would probably require either a fork or a new, backward-incompatible version.

Regarding "patches welcome": my current priority is to implement it for a single consumer consuming items in order of insertion. If I succeed with anything (a matter of my capacity and other priorities), I will leave a note here.

Question: why am I seeing "NOTICE: function pq_notify does not exist" in the logs?

I am running two consumers in Docker containers. The code to run the process is:

def main(queue_name, processor):
    """
    queue is blocking as long as there are no tasks.
    """
    for i in range(20):
        try:
            pq = PQ(database.connection())
            break
        except peewee.OperationalError as err:
            print(err)
            # the database is still not receiving connections
            if re.search("Connection refused", err.args[0]):
                time.sleep(1)
    try:
        pq.create()
    except errors.DuplicateTable:
        print("queue table already exists")

    queue = pq[queue_name]
    print("waiting for jobs ...")

    for job in queue:
        if job is not None:
            print("Processing job id %d" % job.id)
            try:
                processor(**job.data).process()
            except api.ViesError as E:
                queue.put(job.data, schedule_at='5m')
            except Exception as E:
                print("Something weired happend with that data: ", job.data)
                raise E

When I start a fresh postgresql container I see the following logs:

$ docker logs -f vatprocessor 
Sending emails via smtpd:25
processing vat queue
could not connect to server: Connection refused
	Is the server running on host "db" (172.18.0.2) and accepting
	TCP/IP connections on port 5432?

could not connect to server: Connection refused
	Is the server running on host "db" (172.18.0.2) and accepting
	TCP/IP connections on port 5432?

NOTICE:  function pq_notify() does not exist, skipping

waiting for jobs ...
NOTICE:  function pq_notify() does not exist, skipping

NOTICE:  function pq_notify() does not exist, skipping

NOTICE:  function pq_notify() does not exist, skipping

NOTICE:  function pq_notify() does not exist, skipping

NOTICE:  function pq_notify() does not exist, skipping

NOTICE:  function pq_notify() does not exist, skipping

NOTICE:  function pq_notify() does not exist, skipping

NOTICE:  function pq_notify() does not exist, skipping

NOTICE:  function pq_notify() does not exist, skipping

NOTICE:  function pq_notify() does not exist, skipping

Processing job id 1

If I stop the containers and restart them again, I no longer see the NOTICE message.
As far as I can tell my code works fine, despite the notice. I am just curious to know why I am seeing this message.
I believe it comes from the database itself, as I could not find any reference to it in the code.

get items sometimes doesn't work

Expected Behavior

After putting an item in the queue, get must always fetch the item in the queue

Actual Behavior

After putting an item in the queue, the get function sometimes, seemingly at random, does not fetch the item

Steps to Reproduce the Problem

  1. Put an item
  2. Get the item with a timeout, i.e. block until an item is available
  3. Although the item is available in the queue (I've checked the database manually to confirm that the item is indeed committed to the db), the get function blocks and does not return
  4. I must emphasize that this behavior occurs randomly, so these steps might not reproduce it for someone else.

Specifications

  • Version: 1.9.1
  • Python version: 3.11.6
  • Postgres version: 16.1

_pull_item query can be executed over non-existent q_names

When pulling items for a non-existent q_name, long-running queries with blocking enabled will hold unnecessary locks and introduce noise in LISTEN/NOTIFY.

The LISTEN/NOTIFY mechanism is nicely explained in "Listening connections aren't cheap". Also, the documentation includes an important note regarding the notify queue:

Although there is only one queue, notifications are treated as being
 database-local; this is done by including the sender's database OID
 in each notification message.  Listening backends ignore messages
 that don't match their database OID.

The above is important because it is a limitation for scaling queues on the same database, as it forces sharding into separate databases by q_name. This could be considered a separate issue; however, the mechanism is governed by the async.c internals and it won't change AFAIK.

Probably a q_name existence check could save failed attempts when starting _pull_item.

Performance with a very large queue

Expected Behavior

Expect ms-level times for retrieving the next entry off the queue.

Actual Behavior

Times of several full seconds as the queue size exceeds a couple million.

Steps to Reproduce the Problem

The only potential quirk is that this is against PostgreSQL 9.6. Version issues?

  1. Fill the queue.
  2. Attempt to pull records off the queue
  3. Time them

The bottleneck appeared to be the sort stage, which was not index-assisted in the plans. As the number of items in the queue grew, the sort took up more and more time.

https://github.com/malthe/pq/blob/master/pq/__init__.py#L308

Specifications

  • Version:
  • Python version: 3.6

Tasks PQ queue_class isn't used

The tasks PQ() class' queue_class attribute isn't being used during instantiation, since the base PQ()'s __init__() method hardcodes the default queue class to Queue (from __init__.py):

self.queue_class = kwargs.pop('queue_class', Queue)

A simple fix would be to add a queue_class = Queue to the base PQ and change the line in __init__() to:

self.queue_class = kwargs.pop('queue_class', self.queue_class)

PS: It might also be a good idea to rename the task varieties of PQ and Queue to TaskPQ and TaskQueue to avoid confusion (which might be noticeable from this report)

Delete executed tasks?!

This is merely a proposal, but should we consider adding optional support for auto-deletion of the tasks executed by the pq.tasks API?
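
In the meantime, executed items can presumably be purged by hand, since pulled items keep a dequeued_at timestamp; a sketch assuming the default table name queue and an open psycopg2 connection conn:

with conn.cursor() as cursor:
    cursor.execute("DELETE FROM queue WHERE dequeued_at IS NOT NULL")
conn.commit()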

Dashboard for PQ

Hi,

This is not an issue with PQ itself, please close if not of interest!

I've recently created a dashboard for pq, imaginatively named pq-dashboard. The intention was to create something generic like rq-dashboard, which people use with rq. It's available via pip install pq-dashboard as you would expect. It supports inspecting queues and items, searching / deleting / requeueing items, and understands how the task API works.

I'm raising this issue just to bring the dashboard to your attention and in case you have any advice for further development / feature suggestions. Something I'd like to do (which rq-dashboard has) is expose which worker processes are processing queue items - but this might be hard to do generically since AFAIK postgres doesn't provide an easy way of seeing who's listening to pq_notify.

Thanks,

Tom

Adding name of the job producer

Is it possible to add the name of the job producer? It would be very helpful.

from psycopg2 import connect
from pq import PQ

conn = connect('dbname=example user=postgres')
pq = PQ(conn, client_name='test_client')

The worker will see the name of the producer. And preferably, a submitted_by column would be added to the database schema.

>>> job
<pq.Job id=77709 submitted_by="test_client" size=1 enqueued_at="2014-02-21T16:22:06Z" schedule_at=None>

Doesn't work with PgBouncer

Unfortunately pq seems to be non-functional when used with PgBouncer, especially combined with threads. I haven't done any comprehensive testing yet, sorry.

  1. Prepared statements seem to persist between connections somehow, so errors like:
    psycopg2.ProgrammingError: prepared statement "_count_queue_test" already exists occur.

  2. Prepared statements will then continue to persist even if a new connection is created, including a completely new connection. Visible by running select * from pg_prepared_statements;

  3. I've also seen the opposite error:
    psycopg2.OperationalError: prepared statement "_pull_item_queue_test_queue" does not exist

Tested with both the included tests, and a small threaded example, where PgBouncer is running on port 6432:

import threading
import time
from pq import PQ
from psycopg2.pool import ThreadedConnectionPool

pool = ThreadedConnectionPool(2, 5, 'dbname=test host=localhost port=6432')
pq = PQ(pool=pool)
def source(queue):
    """thread worker function"""
    queue = queue['test']
    for i in range(5):
        queue.put({'num': i})
        time.sleep(0.5)

def sink(queue):
    queue = queue['test']
    for job in queue:
        if job is None:
            break

        print('sink got %s' % job.data)
    
mysource = threading.Thread(target=source, args=(pq,))
mysink = threading.Thread(target=sink, args=(pq,))

mysource.start()
mysink.start()

Pipenv using old create.sql

Expected Behavior

PQ.create() is idempotent

Actual Behavior

Subsequent calls to PQ.create() fail.

Steps to Reproduce the Problem

  1. pip install pipenv
  2. mkdir pq-pipenv && cd pq-pipenv
  3. pipenv --python 3.7
  4. pipenv install pq psycopg2-binary
  5. pipenv run pip freeze
  6. Create the following file test.py
from psycopg2 import connect
from pq import PQ

conn = connect('postgres://xx:xx@localhost:5432/xx')
pq = PQ(conn)
pq.create()
  7. Execute the script more than once: pipenv run python test.py.
[jodewey:~/pq-pipenv] % pipenv run python test.py
[jodewey:~/pq-pipenv] % pipenv run python test.py
Traceback (most recent call last):
  File "test.py", line 6, in <module>
    pq.create()
  File "/Users/jodewey/.local/share/virtualenvs/pq-pipenv-EeMTMI3R/lib/python3.7/site-packages/pq/__init__.py", line 63, in create
    cursor.execute(sql, {'name': Literal(queue.table)})
psycopg2.errors.DuplicateTable: relation "queue" already exists
CONTEXT:  SQL statement "CREATE TABLE queue (
  id          bigserial    PRIMARY KEY,
  enqueued_at timestamptz  NOT NULL DEFAULT current_timestamp,
  dequeued_at timestamptz,
  expected_at timestamptz,
  schedule_at timestamptz,
  q_name      text         NOT NULL CHECK (length(q_name) > 0),
  data        json         NOT NULL
)"
PL/pgSQL function inline_code_block line 3 at SQL statement

After further investigation, the create.sql file in the virtualenv is old. It is not the version shipped in 1.8.1.

[jodewey:~/pq-pipenv] % cat /Users/jodewey/.local/share/virtualenvs/pq-pipenv-EeMTMI3R/lib/python3.7/site-packages/pq/create.sql
do $$ begin

CREATE TABLE %(name)s (
  id          bigserial    PRIMARY KEY,
  enqueued_at timestamptz  NOT NULL DEFAULT current_timestamp,
  dequeued_at timestamptz,
  expected_at timestamptz,
  schedule_at timestamptz,
  q_name      text         NOT NULL CHECK (length(q_name) > 0),
  data        json         NOT NULL
);

end $$ language plpgsql;

create index priority_idx_%(name)s on %(name)s
    (schedule_at nulls first, expected_at nulls first, q_name)
    where dequeued_at is null
          and q_name = '%(name)s';

create index priority_idx_no_%(name)s on %(name)s
    (schedule_at nulls first, expected_at nulls first, q_name)
    where dequeued_at is null
          and q_name != '%(name)s';

drop function if exists pq_notify() cascade;

create function pq_notify() returns trigger as $$ begin
  perform pg_notify(new.q_name, '');
  return null;
end $$ language plpgsql;

create trigger pq_insert
after insert on %(name)s
for each row
execute procedure pq_notify();

Specifications

  • Version:
[jodewey:~/pq-pipenv] 1 % pipenv run pip freeze
pq==1.8.1
psycopg2-binary==2.8.5
  • Python version:
[jodewey:~/pq-pipenv] % python --version
Python 3.7.6

Get id of current task

Is there any mechanism for getting the task id inside the executor?

I can get it via the inspect module (reading the local variable job in Queue.perform), but that's not a very good approach.

I see several possible implementations:

  1. A thread-local variable and a job_id property. This is not safe in async functions.
  2. Same as 1) but with the contextvars module. This would require Python 3.7 or a backport module.
  3. Determine whether the function has a job_id keyword argument and pass it in that case.

I can make a pull request for any of the options once a decision is made about the implementation.

get() timeout not honoured

The timeout parameter to get() is sometimes sticky, sometimes not honoured.

Consider this sample:

# [...]
import logging; logging.getLogger('pq').setLevel(logging.DEBUG)
queues = pq.PQ(pool=pool)

q = queues['empty_queue_1']
q.get(timeout=5)
q.get(timeout=0)

q = queues['empty_queue_2']
q.get(timeout=0)
q.get(timeout=5)

Expected Behavior

The requested timeouts are honoured, i.e.

  • 5 seconds
  • 0 seconds (infinite? or same as block=False?)
  • 0 seconds
  • 5 seconds

Actual Behavior

Instead, the first timeout is used in all subsequent calls, although never less than 1 second.

2020-07-03 16:22:43.095 DEBUG [pq] timeout (5.000 seconds).
2020-07-03 16:22:48.154 DEBUG [pq] timeout (5.000 seconds).
2020-07-03 16:22:49.211 DEBUG [pq] timeout (1.000 seconds).
2020-07-03 16:22:50.271 DEBUG [pq] timeout (1.000 seconds).

Steps to Reproduce the Problem

Specifications

  • Version: 1.8.2-dev
  • Python version: 3.7.6

DuplicatePreparedStatement error

My application is based on peewee and bottle, and I am getting the following error:

Traceback (most recent call last):
  File "/usr/src/app/venv/src/bottle/bottle.py", line 1005, in _handle
    out = route.call(**args)
  File "/usr/src/app/venv/src/bottle/bottle.py", line 2017, in wrapper
    rv = callback(*a, **ka)
  File "/run/app/apps/backend/views.py", line 33, in _enable_cors
    return fn(*args, **kwargs)
  File "/run/app/apps/backend/views.py", line 60, in register
    User.create(**request.json)
  File "/usr/src/app/venv/lib/python3.6/site-packages/peewee.py", line 6235, in create
    inst.save(force_insert=True)
  File "/usr/src/app/venv/lib/python3.6/site-packages/playhouse/signals.py", line 72, in save
    post_save.send(self, created=created)
  File "/usr/src/app/venv/lib/python3.6/site-packages/playhouse/signals.py", line 51, in send
    responses.append((r, r(sender, instance, *args, **kwargs)))
  File "/run/app/apps/backend/models.py", line 57, in on_entry_save
    queue.put(model_as_dict)
  File "/usr/src/app/venv/lib/python3.6/site-packages/pq/__init__.py", line 232, in put
    utc_format(expected_at) if expected_at is not None else None,
  File "/usr/src/app/venv/lib/python3.6/site-packages/pq/utils.py", line 69, in wrapper
    cursor.execute("PREPARE %s AS\n%s" % (name, query), d)
psycopg2.errors.DuplicatePreparedStatement: prepared statement "_put_item_queue_vat" already exists

Can you maybe explain how to avoid this?

pq as a "jobs queue" - items can not be read concurrently when using transactional get

The use-case: running multiple threads that pull an item from the queue and handle it concurrently. In addition, in case of a failure (of any kind) in the handling logic, the item should be returned to the queue and be available in the next iteration (by opening a new transaction for each pull).

I realized that my jobs are running synchronously and that all threads are blocked when one thread works on an item; they are released only after it finishes handling the item and releases the transaction.

I believe it happens because there's an "UPDATE" SQL statement in the "_pull_item" method in "pq/__init__.py" which probably locks the table until the transaction is released.

Any ideas on how to make the instances run concurrently?

Code example:

import time
from datetime import datetime
from threading import Thread, get_ident
from typing import Callable
from pq import PQ, Queue
from psycopg2 import connect


def _create_queue_instance(queue_name: str):
    conn = connect('<CONN_STRING>')
    pq = PQ(conn)
    queue = pq[queue_name]
    return queue


def _job(queue: Queue, handler: Callable):
    while True:
        try:
            with queue:
                item = queue.get()
                if not item:
                    continue

                print(f'[{datetime.utcnow()}] {get_ident()} (thread) | {queue.name} (queue) | Starting to handle an item')
                handler(item.data)
                print(f'[{datetime.utcnow()}] {get_ident()} (thread) | {queue.name} (queue) | Done')
        except:
            print(f'An exception was thrown while handling an item for queue: {queue.name}')


def _simple_handler(item):
    print(item)
    print('sleeping for 10 seconds')
    time.sleep(10)


def main():
    # Inserting some items
    q = _create_queue_instance('test_queue')
    for i in range(5):
        q.put(f'item-{i}')

    # Starting the job instances
    threads = []
    for _ in range(5):
        q = _create_queue_instance('test_queue')
        t = Thread(target=_job,
                   args=(q, _simple_handler),
                   daemon=True)
        t.start()
        threads.append(t)

    # Blocking the app from ending.
    for t in threads:
        t.join()


if __name__ == '__main__':
    main()

The output I get:

[2019-03-31 12:45:26.840991] 123145582637056 (thread) | test_queue (queue) | Starting to handle an item
item-4
sleeping for 10 seconds
[2019-03-31 12:45:36.845939] 123145582637056 (thread) | test_queue (queue) | Done
[2019-03-31 12:45:36.854240] 123145582637056 (thread) | test_queue (queue) | Starting to handle an item
item-3
sleeping for 10 seconds
[2019-03-31 12:45:46.858404] 123145582637056 (thread) | test_queue (queue) | Done
[2019-03-31 12:45:46.866138] 123145582637056 (thread) | test_queue (queue) | Starting to handle an item
item-2
sleeping for 10 seconds
[2019-03-31 12:45:56.871011] 123145582637056 (thread) | test_queue (queue) | Done
[2019-03-31 12:45:56.878916] 123145582637056 (thread) | test_queue (queue) | Starting to handle an item
item-1
sleeping for 10 seconds
[2019-03-31 12:46:06.882754] 123145582637056 (thread) | test_queue (queue) | Done
[2019-03-31 12:46:06.891333] 123145582637056 (thread) | test_queue (queue) | Starting to handle an item
item-0
sleeping for 10 seconds
[2019-03-31 12:46:16.893778] 123145582637056 (thread) | test_queue (queue) | Done

Prepared statement name not properly escaped

Using arbitrary queue names without having to quote them (in base64 or whatever) does not currently work.

Example:

pq = PQ(pool=pool)
pq['two words'].put(1) 

Error:

SyntaxError: syntax error at or near "words"
LINE 1: PREPARE _put_item_queue_two words AS

A related potential problem is that

PQ(pool=pool, table='queue_two')['words']

and

PQ(pool=pool, table='queue')['two_words']

look like they would map to the same prepared statement name. But I'm not sure whether multiple tables are supposed to be supported?

Getting from an empty queue slower than checking its length

Thanks for creating pq, I've found it very useful in my work.

In my application, I observed that checking an empty queue took over 1 second, whereas checking the length of an empty queue was fast. My queue table had ~100 - 1000 rows for this test. Replacing every instance of:

item = queue.get()

with

if len(queue):
    item = queue.get()

improved my response times dramatically.

I'm not sure if this behavior is specific to my setup or a general issue, but I thought I would bring it to your attention in case it's helpful.

Thanks again for the excellent project.

Any interest in porting to cockroach

I was curious if you would be interested in porting to cockroach. As I understand it, cockroach has implemented SKIP LOCKED, and while I like pg, I am currently using cockroach because of its wonderful clustering capabilities.

I do understand this request is probably out of scope for what this project is doing, but was curious if you might be interested if it is minimal effort.

John

1.6?

Hi, thanks for writing this, love the lightweight and straightforward approach. Any update on the 1.6 release? I see the last one was more than 1 year ago.

Current approach doesn't work with two-phase commit

Because Postgres doesn't allow PREPARE of transactions that have notified:

cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY

whimper.

The notify wouldn't necessarily have to be in the same transaction as writing to the queue.

I'm going to try to hack around this:

  • Remove the trigger after calling create()

  • Modify my code to notify in an after commit hook.

No action for you at this time. This is a FYI.
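
For reference, a minimal sketch of the second idea, assuming the pq_insert trigger has been dropped, a queue named jobs, and an open psycopg2 connection conn:

# Commit the transaction that wrote to the queue first, then wake up
# listeners manually (normally the pq_insert trigger does this).
conn.commit()

with conn.cursor() as cursor:
    cursor.execute("SELECT pg_notify(%s, '')", ('jobs',))
conn.commit()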

queue.put() inside a transaction sets enqueued_at to the transaction start time, not the current time

Expected Behavior

I'd expect enqueued_at to represent the time at which the task was put into the queue table (e.g. CLOCK_TIMESTAMP()), since that is less arbitrary than the start time of the transaction (CURRENT_TIMESTAMP).

Actual Behavior

enqueued_at is set to the transaction start time by default.

Steps to Reproduce the Problem

with q as cursor:
    thing = q.get()
    # do some processing
    time.sleep(3)
    # put the task back on the queue
    q.put(thing.data)  # timestamp here is the transaction start time, not the current time

Specifications

  • Version: 1.9.0
  • Python version: 3.8
  • PostgreSQL version: 10.10

Wait on a job?

Is there any mechanism to wait for a job to complete? I assume not because there doesn't seem to be any job tracking beyond what time it was dequeued (rather than finished), but thought I'd ask in case I missed something.

lost trigger when setting up multiple queue-tables within the same schema

Expected Behavior

Have completely functional queue-tables when having multiple ones

Actual Behavior

The SQL script for setting up the table creates the trigger without a reference to the name of the table the trigger is associated with. That causes the earlier-defined table to lose its trigger when the second one is set up.

The solution seems relatively simple: in the SQL script, make the table name part of the trigger name.

Is interacting with existing transactions a feature?

I was disappointed to see that by default, put commits.

Reading the docs, I expected pq to only manage transactions when the queue was used as a context manager.

With a little digging in the code, I found that if I passed in a cursor:

pq = PQ(conn, cursor=cursor)
queue = pq['email']
queue.put(dict(to='[email protected]'))

The put worked in a savepoint.

Is this a feature or an accident of implementation?

Being able to use pq as part of larger transactions would be an extremely valuable feature. Triggering it by passing in a cursor seems like a bad way to express it. :)

Setup CI

We should set up either a Travis or Solano CI to make sure the builds are always passing.

connection pool exhausted

Hi,
I want to run multiple threads that work on jobs in a queue. If a job fails I want it to retry. This is my code:

from threading import Thread, current_thread
from psycopg2.pool import ThreadedConnectionPool
from pq import PQ

pool = ThreadedConnectionPool(4, 4, connection_string)
pq = PQ(pool=pool)

try:
    pq.create()
    print("created queue table")
except:
    print("queue table already exists")

job_queue = pq["queue"]

def worker():
  while True:
    try:
      with job_queue:
        for job in job_queue:
          if job is None:
            print("job is None")
            break
          # do stuff

    except Exception as e:
        print(f"worker {current_thread()} died because: {str(e)} -- restarting...")


for i in range(4):
    Thread(target=worker).start()

However, I'm getting connection pool exhausted exceptions. My code probably throws exceptions, but ideally it would be able to retry without any issues.

I already looked at the tests but couldn't find the issue.

Should len(queue) skip items scheduled in the future?

I'd like to use len(queue) to determine the size of a worker pool. Currently, Queue._count() returns the count of all tasks, including those scheduled in the future. This is different from the number of tasks currently available for work.

Can or should len(queue) be modified to count only tasks available to a worker? The addition of one conditional to the WHERE clause should be sufficient:

SELECT COUNT(*) FROM %(table)s
WHERE q_name = %(name)s AND dequeued_at IS NULL
  AND (schedule_at IS NULL OR schedule_at <= NOW())

Hardwired json.dumps introduces double encoding problem

When doing a put() to a Queue, there is a nested call to two different dumps functions:

https://github.com/malthe/pq/blob/master/pq/__init__.py#L226

The problematic expression is dumps(self.dumps(data)). The outer dumps is the stdlib json.dumps and the inner dumps is either the default lambda that does nothing, or a lambda that pickles the object.

There is a more pythonic approach than this double-dumping. A common pattern in Python is to provide a json.JSONEncoder subclass that can encode, for example, datetime objects or even pickles for that matter. Currently, the datetime must first be encoded by hand before being passed to put. Because the outer json.dumps is hardwired to the stdlib function, it cannot be overridden with a different JSONEncoder subclass.

A cleaner implementation would be to remove the double dumps and simply support passing in JSONEncoder subclasses. The default json.JSONEncoder would exactly mimic the current default, and a subclass, say, PickleJSONEncoder would implement an encode(obj) function that did exactly what the pickling lambdas do now. With this approach, I or anyone else could pass in a custom JSONEncoder that suited our needs.
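
To make the proposal concrete, here is a rough sketch of what such an encoder could look like; the class name and the __pickle__ wrapper are hypothetical, not part of pq's current API:

import json
import pickle

class PickleJSONEncoder(json.JSONEncoder):
    """Hypothetical encoder: anything json can't handle natively is stored as an ASCII pickle."""

    def default(self, obj):
        # Protocol 0 keeps the pickle ASCII-safe, mirroring pq's current behavior.
        return {'__pickle__': pickle.dumps(obj, protocol=0).decode('ascii')}

# Usage sketch: a single dumps call then handles otherwise non-JSON-serializable objects.
# json.dumps({'when': some_datetime}, cls=PickleJSONEncoder)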
