
persist-queue's Introduction

persist-queue - A thread-safe, disk-based queue for Python


persist-queue implements a file-based queue and a series of sqlite3-based queues. The goal is to fulfill the following requirements:

  • Disk-based: each queued item is stored on disk so it survives a crash.
  • Thread-safe: can be used by multi-threaded producers and multi-threaded consumers.
  • Recoverable: Items can be read after process restart.
  • Green-compatible: can be used in greenlet or eventlet environments.

Neither queuelib nor python-pqueue could fulfill all of the above. After some experimentation, I found it hard to achieve these goals on top of their current implementations without huge code changes; this is the motivation for starting this project.

By default, persist-queue uses pickle for object serialization to support object instances. Most built-in types, such as int, dict, and list, can be persisted by persist-queue directly; to support customized objects, please refer to Pickling and unpickling extension types (Python 2) and Pickling Class Instances (Python 3).
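
For example, a simple custom class defined at module level can be enqueued directly, since pickle can serialize its instances. A minimal sketch; the class, path, and values are illustrative:

# custom_obj_example.py - illustrative only
from persistqueue import Queue

class Task(object):
    """A plain Python object; pickle can serialize it because the class
    is defined at module level and is therefore importable."""
    def __init__(self, name, retries=0):
        self.name = name
        self.retries = retries

q = Queue('task-queue-path')
q.put(Task('resize-image', retries=3))

item = q.get()
print(item.name, item.retries)   # resize-image 3
q.task_done()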

This project is based on the achievements of python-pqueue and queuelib.

Slack channels

Join the persist-queue <https://join.slack.com/t/persist-queue/shared_invite/enQtOTM0MDgzNTQ0MDg3LTNmN2IzYjQ1MDc0MDYzMjI4OGJmNmVkNWE3ZDBjYzg5MDc0OWUzZDJkYTkwODdkZmYwODdjNjUzMTk3MWExNDE> channel

Requirements

  • Python 3.5 or newer versions (refer to Deprecation for older Python versions)
  • Full support for Linux and macOS.
  • Windows support (with Caution if persistqueue.Queue is used).

Features

  • Multiple platforms support: Linux, macOS, Windows
  • Pure python
  • Both file-based queues and sqlite3-based queues are supported
  • File-based queue: multiple serialization protocols supported: pickle (default), msgpack, cbor, json

Deprecation

Installation

from pypi

pip install persist-queue
# for msgpack, cbor and mysql support, use following command
pip install "persist-queue[extra]"

from source code

git clone https://github.com/peter-wangxu/persist-queue
cd persist-queue
# for msgpack and cbor support, run 'pip install -r extra-requirements.txt' first
python setup.py install

Benchmark

Here is the time spent (in seconds) writing/reading 1000 items to disk, comparing the sqlite3-based and file-based queues.

  • Windows
    • OS: Windows 10
    • Disk: SATA3 SSD
    • RAM: 16 GiB
                 Write     Write/Read (1 task_done)    Write/Read (many task_done)
SQLite3 Queue    1.8880    2.0290                       3.5940
File Queue       4.9520    5.0560                       8.4900

Windows note: performance of the Windows file queue has improved dramatically since v0.4.1 (3-4x faster) due to the atomic renaming support.

  • Linux
    • OS: Ubuntu 16.04 (VM)
    • Disk: SATA3 SSD
    • RAM: 4 GiB
                 Write     Write/Read (1 task_done)    Write/Read (many task_done)
SQLite3 Queue    1.8282    1.8075                       2.8639
File Queue       0.9123    1.0411                       2.5104
  • Mac OS
    • OS: 10.14 (macOS Mojave)
    • Disk: PCIe SSD
    • RAM: 16 GiB
                 Write     Write/Read (1 task_done)    Write/Read (many task_done)
SQLite3 Queue    0.1879    0.2115                       0.3147
File Queue       0.5158    0.5357                       1.0446

note

  • The values above are in seconds for reading/writing 1000 items; lower is better
  • The results above were produced by:
python benchmark/run_benchmark.py 1000

To see the real performance on your host, run the script benchmark/run_benchmark.py:

python benchmark/run_benchmark.py <COUNT, default to 100>

Examples

Example usage with a SQLite3 based queue

>>> import persistqueue
>>> q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
>>> q.put('str1')
>>> q.put('str2')
>>> q.put('str3')
>>> q.get()
'str1'
>>> del q

Close the console, and then recreate the queue:

>>> import persistqueue
>>> q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
>>> q.get()
'str2'
>>>

New functions: Available since v0.8.0

  • shrink_disk_usage: performs a VACUUM against the sqlite database and rebuilds the database file; this usually takes a long time and frees a lot of disk space after get()
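
A minimal sketch of reclaiming disk space after draining a large queue (the path is illustrative):

>>> import persistqueue
>>> q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
>>> # ... put() and later get() a large number of items ...
>>> q.shrink_disk_usage()   # VACUUM and rebuild the database file to free disk space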

Example usage of SQLite3 based UniqueQ

This queue does not allow duplicate items.

>>> import persistqueue
>>> q = persistqueue.UniqueQ('mypath')
>>> q.put('str1')
>>> q.put('str1')
>>> q.size
1
>>> q.put('str2')
>>> q.size
2
>>>

Example usage of SQLite3 based SQLiteAckQueue/UniqueAckQ

The core functions:

  • put: add item to the queue. Returns id
  • get: get an item from the queue and mark it as unack. Returns item. Optional parameters (block, timeout, id, next_in_order, raw)
  • update: update an item. Returns id. Parameters (item); optional parameter if item is not in raw format (id)
  • ack: mark item as acked. Returns id. Parameters (item or id)
  • nack: there might be something wrong with the current consumer, so mark the item as ready again and a new consumer will get it. Returns id. Parameters (item or id)
  • ack_failed: there might be something wrong during processing, so just mark the item as failed. Returns id. Parameters (item or id)
  • clear_acked_data: performs a SQL DELETE against the sqlite database. It removes up to 1000 items whose status is AckStatus.acked while keeping the 1000 most recent (note: this does not shrink the file size on disk). Optional parameters (max_delete, keep_latest, clear_ack_failed)
  • shrink_disk_usage: performs a VACUUM against the sqlite database and rebuilds the database file; this usually takes a long time and frees a lot of disk space after clear_acked_data
  • queue: returns the database contents as a Python List[Dict]
  • active_size: the active size changes when an item is added (put) and completed (ack/ack_failed), unlike qsize, which changes when an item is pulled (get) or returned (nack).
>>> import persistqueue
>>> ackq = persistqueue.SQLiteAckQueue('path')
>>> ackq.put('str1')
>>> item = ackq.get()
>>> # Do something with the item
>>> ackq.ack(item) # If done with the item
>>> ackq.nack(item) # Else mark item as `nack` so that it can be proceeded again by any worker
>>> ackq.ack_failed(item) # Or else mark item as `ack_failed` to discard this item
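
A minimal sketch of the active_size/qsize distinction described above; whether each is exposed as a method or a property is an assumption here, and the commented values are illustrative:

>>> ackq2 = persistqueue.SQLiteAckQueue('path2')
>>> ackq2.put('str1')
>>> ackq2.qsize(), ackq2.active_size    # assumed: (1, 1) - queued, not yet completed
>>> item = ackq2.get()
>>> ackq2.qsize(), ackq2.active_size    # assumed: (0, 1) - get() lowers qsize, not active_size
>>> ackq2.ack(item)
>>> ackq2.qsize(), ackq2.active_size    # assumed: (0, 0) - ack() completes the item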

Parameters:

  • clear_acked_data
    • max_delete (defaults to 1000): This is the LIMIT. How many items to delete.
    • keep_latest (defaults to 1000): This is the OFFSET. How many recent items to keep.
    • clear_ack_failed (defaults to False): Clears the AckStatus.ack_failed in addition to the AckStatus.ack.
  • get
    • raw (defaults to False): Returns the metadata along with the record, which includes the id (pqid) and timestamp. On the SQLiteAckQueue, raw results can be passed to ack, nack, and ack_failed just like the normal return value.
    • id (defaults to None): Accepts an id or a raw item containing pqid. Will select the item based on the row id.
    • next_in_order (defaults to False): Requires the id attribute. This option tells the SQLiteAckQueue/UniqueAckQ to get the next item based on id, not the first available. This allows the user to get, nack, get, nack and progress down the queue, instead of continuing to get the same nack'd item over again.

raw example:

>>> q.put('val1')
>>> d = q.get(raw=True)
>>> print(d)
{'pqid': 1, 'data': 'val1', 'timestamp': 1616719225.012912}
>>> q.ack(d)

next_in_order example:

>>> q.put("val1")
>>> q.put("val2")
>>> q.put("val3")
>>> item = q.get()
>>> id = q.nack(item)
>>> item = q.get(id=id, next_in_order=True)
>>> print(item)
val2

Note:

  1. The SQLiteAckQueue always uses "auto_commit=True".
  2. The Queue could be set in non-block style, e.g. "SQLiteAckQueue.get(block=False, timeout=5)".
  3. UniqueAckQ only allows for unique items

Example usage with a file based queue

Parameters:

  • path: specifies the directory where enqueued data is persisted.
  • maxsize: indicates the maximum number of items stored in the queue; if maxsize <= 0, the queue is unlimited.
  • chunksize: indicates how many entries should exist in each chunk file on disk. When all entries in a chunk file have been dequeued by get(), the file is removed from the filesystem.
  • tempdir: indicates where temporary files should be stored. The tempdir has to be located on the same disk as the enqueued data in order to obtain atomic operations.
  • serializer: controls how enqueued data is serialized.
  • autosave: True or False. By default, changes are only persisted when task_done() is called. If autosave is enabled, info data is persisted immediately when get() is called. Adding data to the queue with put() always persists immediately regardless of this setting.
>>> from persistqueue import Queue
>>> q = Queue("mypath")
>>> q.put('a')
>>> q.put('b')
>>> q.put('c')
>>> q.get()
'a'
>>> q.task_done()

Close the Python console, then restart the queue from the same path:

>>> from persistqueue import Queue
>>> q = Queue('mypath')
>>> q.get()
'b'
>>> q.task_done()
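
The parameters above can be combined. A sketch using several of them together (the chunk size, directory names, and serializer choice are illustrative, not recommendations):

>>> import persistqueue
>>> import persistqueue.serializers.json
>>> q = persistqueue.Queue(
...     "another_path",
...     chunksize=100,                               # 100 entries per chunk file on disk
...     tempdir="another_path",                      # same disk as the data, for atomic renames
...     serializer=persistqueue.serializers.json,    # store items as JSON instead of pickle
...     autosave=True,                               # persist each get() immediately
... )
>>> q.put({'job': 1})
>>> q.get()
{'job': 1}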

Example usage with an auto-saving file based queue

Available since: v0.5.0

By default, items added to the queue are persisted during the put() call, and items removed from a queue are only persisted when task_done() is called.

>>> from persistqueue import Queue
>>> q = Queue("mypath")
>>> q.put('a')
>>> q.put('b')
>>> q.get()
'a'
>>> q.get()
'b'

After exiting and restarting the queue from the same path, we see the items remain in the queue, because task_done() wasn't called before.

>>> from persistqueue import Queue
>>> q = Queue('mypath')
>>> q.get()
'a'
>>> q.get()
'b'

This can be advantageous. For example, if your program crashes before finishing processing an item, it will remain in the queue after restarting. You can also spread out the task_done() calls for performance reasons to avoid lots of individual writes.

Using autosave=True on a file based queue will automatically save on every call to get(). Calling task_done() is not necessary, but may still be used to join() against the queue.

>>> from persistqueue import Queue
>>> q = Queue("mypath", autosave=True)
>>> q.put('a')
>>> q.put('b')
>>> q.get()
'a'

After exiting and restarting the queue from the same path, only the second item remains:

>>> from persistqueue import Queue
>>> q = Queue('mypath', autosave=True)
>>> q.get()
'b'

Example usage with a SQLite3 based dict

>>> from persistqueue import PDict
>>> q = PDict("testpath", "testname")
>>> q['key1'] = 123
>>> q['key2'] = 321
>>> q['key1']
123
>>> len(q)
2
>>> del q['key1']
>>> q['key1']
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "persistqueue\pdict.py", line 58, in __getitem__
    raise KeyError('Key: {} not exists.'.format(item))
KeyError: 'Key: key1 not exists.'

Close the console and restart the PDict

>>> from persistqueue import PDict
>>> q = PDict("testpath", "testname")
>>> q['key2']
321

Multi-thread usage for SQLite3 based queue

from threading import Thread

from persistqueue import FIFOSQLiteQueue

q = FIFOSQLiteQueue(path="./test", multithreading=True)

def worker():
    while True:
        item = q.get()
        do_work(item)  # do_work(), source() and num_worker_threads are application-defined placeholders

for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)

Multi-thread usage for Queue

from threading import Thread

from persistqueue import Queue

q = Queue()

def worker():
    while True:
        item = q.get()
        do_work(item)  # do_work(), source() and num_worker_threads are application-defined placeholders
        q.task_done()

for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done

Example usage with a MySQL based queue

Available since: v0.8.0

>>> import persistqueue
>>> db_conf = {
>>>     "host": "127.0.0.1",
>>>     "user": "user",
>>>     "passwd": "passw0rd",
>>>     "db_name": "testqueue",
>>>     # "name": "",
>>>     "port": 3306
>>> }
>>> q = persistqueue.MySQLQueue(name="testtable", **db_conf)
>>> q.put('str1')
>>> q.put('str2')
>>> q.put('str3')
>>> q.get()
'str1'
>>> del q

Close the console, and then recreate the queue:

>>> import persistqueue
>>> q = persistqueue.MySQLQueue(name="testtable", **db_conf)
>>> q.get()
'str2'
>>>

note

Due to the limitation of the file-based queue described in issue #89, task_done in one thread may acknowledge items in other threads that it should not. Consider the SQLiteAckQueue if you have such a requirement.

Serialization via msgpack/cbor/json

  • v0.4.1: Currently only available for file based Queue
  • v0.4.2: Also available for SQLite3 based Queues
>>> import persistqueue
>>> q = persistqueue.Queue('mypath', serializer=persistqueue.serializers.msgpack)
>>> # via cbor2
>>> # q = persistqueue.Queue('mypath', serializer=persistqueue.serializers.cbor2)
>>> # via json
>>> # q = Queue('mypath', serializer=persistqueue.serializers.json)
>>> q.get()
'b'
>>> q.task_done()

Explicit resource reclaim

For some reason, an application may require explicit reclamation of file handles or sql connections before the end of execution. In these cases, the user can simply call:

q = Queue()  # or q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
del q

to reclaim related file handles or sql connections.

Tips

task_done is required for both the file-based queue and the SQLite3-based queue (when auto_commit=False) to persist the cursor of the next get to disk.
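
A minimal sketch of this with a SQLite3-based queue created with auto_commit=False (the path is illustrative):

>>> import persistqueue
>>> q = persistqueue.SQLiteQueue('mypath2', auto_commit=False)
>>> q.put('str1')
>>> q.get()
'str1'
>>> q.task_done()   # without this call, 'str1' would reappear after a restart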

Performance impact

  • WAL

    Starting with v0.3.2, persistqueue leverages the sqlite3 built-in feature WAL (write-ahead log), which can improve performance significantly; general testing indicates that persistqueue is 2-4 times faster than the previous version.

  • auto_commit=False

    Since persistqueue v0.3.0, a new parameter auto_commit was introduced to tweak the performance of sqlite3-based queues as needed. When auto_commit=False is specified, the user needs to call queue.task_done() to persist the changes made to disk since the last task_done invocation.

  • pickle protocol selection

    From v0.3.6, persistqueue selects pickle protocol version 2 for Python 2 and protocol version 4 for Python 3, respectively. This selection only happens when the directory is not present when initializing the queue.

Tests

persist-queue uses tox to trigger tests.

  • Unit test
tox -e <PYTHON_VERSION>

Available <PYTHON_VERSION>: py27, py34, py35, py36, py37

  • PEP8 check
tox -e pep8

pyenv is usually a helpful tool to manage multiple versions of Python.

Caution

Currently, atomic renaming is supported on Windows but is still experimental. That is to say, the data in persistqueue.Queue could be left in an unreadable state if an incidental failure occurs during Queue.task_done.

DO NOT put any critical data in persistqueue.Queue on Windows.

Contribution

Simply fork this repo and send a PR for your code change (with tests to cover your change); remember to give your PR a title and description. I am willing to enhance this project with you :).

License

BSD

Contributors

FAQ

  • sqlite3.OperationalError: database is locked is raised.

persistqueue opens 2 connections to the db if multithreading=True; the SQLite database is locked until that transaction is committed. The timeout parameter specifies how long the connection should wait for the lock to go away before raising an exception. The default timeout is 10 seconds; increase the timeout when creating the queue if the above error occurs.
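
A minimal sketch of raising the timeout at queue creation; passing timeout as a constructor keyword follows the wording above but is an assumption here, and the value 30 is illustrative:

>>> import persistqueue
>>> # 'timeout' keyword assumed from the FAQ text; wait up to 30 seconds instead of 10
>>> q = persistqueue.SQLiteQueue('mypath', multithreading=True, timeout=30)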

  • sqlite3 based queues are not thread-safe.

The sqlite3 queues are heavily tested in multi-threaded environments; if you find they are not thread-safe, please make sure you set multithreading=True when initializing the queue before submitting a new issue :).


persist-queue's Issues

Request to fork and use persist-queue in task queue implementation.

I've been working on a similar project to create a task queue based on sqlite3. I found that your project actually covers all my use cases for the queue part, and I don't want to re-implement largely the same functionality.

I want to ask permission to fork your repo and start integrating the server and workers component.

Bug happens when storing queue in different file system

Hello, I got errors when storing queue data in a different file system (/tmp/ and my home folder are on different disks).

  • Lib version persist-queue 0.4.0
  • Code (filename pq.py):
import persistqueue

if __name__ == "__main__":
    pq = persistqueue.Queue('somewhere')
    pq.put(444)
  • Exception trace
Traceback (most recent call last):
  File "pq.py", line 5, in <module>
    pq.put(444)
  File "/home/dat/Workspace/ulti_monitor/venv/lib/python3.5/site-packages/persistqueue/queue.py", line 111, in put
    self._put(item)
  File "/home/dat/Workspace/ulti_monitor/venv/lib/python3.5/site-packages/persistqueue/queue.py", line 129, in _put
    self._saveinfo()
  File "/home/dat/Workspace/ulti_monitor/venv/lib/python3.5/site-packages/persistqueue/queue.py", line 223, in _saveinfo
    os.rename(tmpfn, self._infopath())
OSError: [Errno 18] Invalid cross-device link: '/tmp/tmpcq_fecb_' -> 'somewhere/info'

_qsize() discrepancy with multiple queue object sharing same dir

Not sure if it's a bug or mis-usage. In the case where two queue objects are defined with the same path, e.g.

q1 = Queue('path1')
q2 = Queue('path1')
q1.put(1)
q2.put(2)

Both q1.qsize() and q2.qsize() will be equal to 1, instead of 2, because qsize() calls _qsize(), which pulls the size directly from the member variable self.info['size'] and hence won't be aware of changes from another object.

Implement AckQueue

I want to try implementing an AckQueue with raw file storage from Queue and ack support from SQLiteAckQueue.

Motivation: I need to reliably retry if I get transient exceptions when processing my queue, which is implemented most directly with ack support. I also need high performance, and the raw file storage in Queue performs faster than SQLiteQueue for me.

Failed to persist data when specifying a path on a different device

We found some failures in OpenStack tests. In our environment, the Cinder service is running inside a docker container.

[root@samctl0 test]# tempest run --regex VolumesSnapshotTestJSON
......
{0} tearDownClass (tempest.api.volume.test_volumes_snapshots.VolumesSnapshotTestJSON) [0.000000s] ...
FAILED

Captured traceback:
~~~~~~~~~~~~~~~~~~~
    Traceback (most recent call last):
      File "/usr/lib/python2.7/site-packages/tempest/test.py", line 224, in tearDownClass
        six.reraise(etype, value, trace)
      File "/usr/lib/python2.7/site-packages/tempest/test.py", line 196, in tearDownClass
        teardown()
      File "/usr/lib/python2.7/site-packages/tempest/test.py", line 562, in resource_cleanup
        raise testtools.MultipleExceptions(*cleanup_errors)
    testtools.runtest.MultipleExceptions: (<class 'tempest.lib.exceptions.DeleteErrorException'>, Resource 9ea09b32-6189-4701-823d-1dd392c10f87 failed to delete and is in ERROR status, <traceback object at 0x7f4c712114d0>)

Cinder logs showed :

2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server   File "/usr/lib/python2.7/site-packages/cinder/volume/drivers/dell_emc/vnx/client.py", line 170, in delay_delete_lun
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server     self.queue.put(self.vnx.delete_lun, name=name)
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server   File "/usr/lib/python2.7/site-packages/storops/lib/tasks.py", line 47, in put
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server     self._q.put_nowait(item)
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server   File "/usr/lib/python2.7/site-packages/persistqueue/queue.py", line 124, in put_nowait
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server     return self.put(item, False)
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server   File "/usr/lib/python2.7/site-packages/persistqueue/queue.py", line 103, in put
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server     self._put(item)
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server   File "/usr/lib/python2.7/site-packages/persistqueue/queue.py", line 121, in _put
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server     self._saveinfo()
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server   File "/usr/lib/python2.7/site-packages/persistqueue/queue.py", line 216, in _saveinfo
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server     os.rename(tmpfn, self._infopath())
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server OSError: [Errno 18] Invalid cross-device link
2018-09-21 01:56:51.396 31 ERROR oslo_messaging.rpc.server 

From above log, tmpfn and self._infopath() passed to os.rename are two files on two devices.
In this case, tmpfn = /tmp/xxxxx, and self._infopath() = /var/lib/cinder/xxxxx

Inside docker, /tmp is an overlay filesystem, and /var/lib/cinder is on device /dev/sda2.

File-based queue is unreadable after system hard reset

Hello. I'm using the persist-queue library for storing python objects locally in a file and then interacting with them via multiple threads.
It works great for our purposes so far. However, in our current conditions, there are sometimes sudden system poweroffs / hard reboots, which sometimes happen exactly during the writing process to the local persistent-queue file.
This leads to the following content inside the file queue:

...
sb.(iobject.object
Object
p0
(dp1
S'FIELD_1'
p2
I0
sS'FIELD_2'
I0
sb.^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^
@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^
@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^
@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^
@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@
^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@sb.(iobject.object
Object
p0
(dp1
S'FIELD_1'
p2
I0
sS'FIELD_2'
...

Those @ signs also appear in the system syslog file, and indicate a failed write operation due to a sudden reset.

After the reset, when the program tries to get a value from the persistent-queue file containing those @ signs, the following exception appears:

message = PERSISTENT_QUEUE.get()
File "build/bdist.linux-armv7l/egg/persistqueue/queue.py", line 152, in get
item = self._get()
File "build/bdist.linux-armv7l/egg/persistqueue/queue.py", line 166, in _get
data = pickle.load(self.tailf)
File "/usr/lib/python2.7/pickle.py", line 1378, in load
return Unpickler(file).load()
File "/usr/lib/python2.7/pickle.py", line 858, in load
dispatch[key](self)
KeyError: 'Y'

This makes the entire file queue with all the objects inside completely unreadable. So, the persistent-queue is not quite "persistent" in that sense.

Is there any way to fix or to avoid this issue, specifically for the file-based queue?

Thanks!

Performance Comparison with DiskCache

Hi there -- author of python-diskcache here. I wonder what the performance of these two projects is like. DiskCache implements the collections.deque interface rather than queue.Queue but they are similar enough.

You may be curious to see the diskcache source for some speedup ideas. For example, you could go a bit faster with a higher pickle protocol version.

If you create a benchmark then please share results. I think your file-based Queue implementation may be faster.

Benchmarks are unclear to me

There are no units given for the benchmarks. Is it a time value (e.g. s), i.e. higher values are worse, lower values are better? Or is it some kind of throughput (e.g. Bytes/s), which means higher values are better, lower are worse?

ACK Queue: clear_acked_data() behavior

Playing around with the clear_acked_data() function, it seems to hang on to the last 1000 acked queue items. Why is that? I've already acked the data, yet disk space continues to be used.

Looking at the code in question:

    @sqlbase.with_conditional_transaction
    def clear_acked_data(self):
        sql = """DELETE FROM {table_name}
            WHERE {key_column} IN (
                SELECT _id FROM {table_name} WHERE status = ?
                ORDER BY {key_column} DESC
                LIMIT 1000 OFFSET {max_acked_length}
            )""".format(table_name=self._table_name,
                        key_column=self._key_column,
                        max_acked_length=self._MAX_ACKED_LENGTH)
        return sql, AckStatus.acked

It seems that self._MAX_ACKED_LENGTH is a private member constant. Can this not be made tunable by the user (e.g. a kwarg in __init__ for the class)?

I opened my resulting sqlite data files and manually ran:

DELETE FROM ack_queue_default WHERE status = 5;
VACUUM;

Which reduced the file size by several GB. Unless there is some edge case, surely you'd want to do something more like this?

    @sqlbase.with_conditional_transaction
    def clear_acked_data(self):
        sql = """DELETE FROM {table_name} WHERE status = ?""".format(table_name=self._table_name)
        return sql, AckStatus.acked

    @sqlbase.with_conditional_transaction
    def shrink_disk_usage(self):
        sql = """VACUUM"""
        return sql, None

`atomic_rename` failure on Windows Python2.7

I found the below error during the CI test of storops. It is reported by the storops appveyor tests: https://ci.appveyor.com/project/emc-openstack/storops/build/job/y6tctpnpe54j4is0

I am just wondering whether you met this before on your Windows Py27 testing?

The version of persist-queue is persist-queue==0.4.1.

___________________ TestPQueue.test_enqueue_expected_error ____________________
args = (<storops_test.lib.test_tasks.TestPQueue testMethod=test_enqueue_expected_error>,)
    @functools.wraps(func)
    @patch(target='storops.vnx.navi_command.'
                  'NaviCommand.execute_naviseccli',
           new=cli.mock_execute)
    def func_wrapper(*args):
>       return func(*args)
storops_test\vnx\cli_mock.py:106: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
storops_test\lib\test_tasks.py:96: in test_enqueue_expected_error
    self.q.put(fake_vnx.delete_hba, hba_uid=uid)
storops\lib\tasks.py:47: in put
    self._q.put_nowait(item)
.tox\py27\lib\site-packages\persistqueue\queue.py:185: in put_nowait
    return self.put(item, False)
.tox\py27\lib\site-packages\persistqueue\queue.py:161: in put
    self._put(item)
.tox\py27\lib\site-packages\persistqueue\queue.py:182: in _put
    self._saveinfo()
.tox\py27\lib\site-packages\persistqueue\queue.py:274: in _saveinfo
    atomic_rename(tmpfn, self._infopath())
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
src = 'c:\\users\\appveyor\\appdata\\local\\temp\\1\\tmperonkh'
dst = 'c:\users\appveyor\appdata\local\temp\1\tmpuqdetsstorops\info'
    def atomic_rename(src, dst):
        try:
            os.replace(src, dst)
        except AttributeError:  # python < 3.3
            import sys
    
            if sys.platform == 'win32':
                import ctypes
    
                if sys.version_info[0] == 2:
                    _str = unicode  # noqa
                    _bytes = str
                else:
                    _str = str
                    _bytes = bytes
    
                if isinstance(src, _str) and isinstance(dst, _str):
                    MoveFileEx = ctypes.windll.kernel32.MoveFileExW
                elif isinstance(src, _bytes) and isinstance(dst, _bytes):
                    MoveFileEx = ctypes.windll.kernel32.MoveFileExA
                else:
>                   raise ValueError("Both args must be bytes or unicode.")
E                   ValueError: Both args must be bytes or unicode.
.tox\py27\lib\site-packages\persistqueue\queue.py:44: ValueError
____________________ TestPQueue.test_enqueue_storops_error ____________________

queue length for SQLiteQueue is incorrect when running in multiple processes

Possibly expected behavior, but I think it's worth reporting, because the queue looks usable otherwise.

The queue size is set only once, on queue creation (self.total = self._count()), so if we have a producer in one process and a consumer in another process, we end up with a size in the negatives.

To reproduce, we need a producer and a consumer that's faster than the producer.

# producer process
import time
import persistqueue as Q; q = Q.SQLiteQueue('queue', multithreading=True)
while True: q.put('hi'); time.sleep(0.01)
# consumer process
import persistqueue as Q
from persistqueue.exceptions import Empty
q = Q.SQLiteQueue('queue', auto_commit=False, multithreading=True)


while True:
    try:
        q.qsize(), q.get(block=False); q.task_done()
    except Empty:
        pass

Calling q._count() returns the correct size, because it hits the DB, of course.

Unique items only (no duplicates)

Not an issue but wasn't sure where to post this. Would it be possible to implement a queue that allows no duplicate items? Kind of like a set. On second thought it could be an option for the existing queue implementation too. I would try to add it locally for me but not sure how to check if an item already exists in the queue!

Logger in SQLiteAckQueue produces UnicodeDecodeError as it tries to print item as a string (Python 2.7)

I stumbled upon a UnicodeDecodeError: 'ascii' codec can't decode byte... from the logger with certain values in SQLiteAckQueue + Python 2.7. After some investigation, it turns out that the problem lies with log.warning("Can't find item %s from unack cache", item) in combination with from __future__ import unicode_literals on Python 2.7.

Possible solutions:

  1. Remove from __future__ import unicode_literals
  2. Don't log item at all, but acknowledge the warning.

I am personally more in favor of the second approach as I don't think it is safe to be logging items in this situation. Maybe they contain sensitive information, can be very large, etc.

  • Generally how to trigger the exception:
from __future__ import unicode_literals
import logging
import struct

foo = struct.pack("i", 255)
logging.warn("%s", foo)
  • Example in SQLiteAckQueue:
import struct
import time

from persistqueue import SQLiteAckQueue

q = SQLiteAckQueue('testdata', auto_resume=True)
inflight_map = dict()

# Generate some messages.
q.put('test')
q.put(struct.pack("i", 255))
q.put('test2')

# Publish message over network. Store 'in flight' messages
for id in range(3):
    foo = q.get()
    inflight_map[id] = foo

# Some time passes and no acknowledgement. Republish them.
time.sleep(1)
q.resume_unack_tasks()
for id in range(4, 7):
    foo = q.get()
    inflight_map[id] = foo

# Some more time passes, and server confirms original messages. Acknowledge them.
# _unack_cache now does not contain the items, and triggers the log with the printing of item as a string.
time.sleep(1)
for id in range(3):
    bar = inflight_map[id]
    q.ack(bar)

Request for file name api for SQLiteQueue

I love this project and would like to request an option to set the db file name, as in my (our) projects we have multiple persistent queues and would like to keep them all in the same folder for simplicity.

Issues where persistqueue is not thread safe

Hi, I am using persist-queue in a multi-threaded environment where items are being added to and removed from a queue (UniqueQ) multiple times per second. Sometimes, I am seeing different threads obtain the same queue item (see below):

2018-05-23 19:16:16,960 DEBUG    [root] Processing /data/x/x/x/ready/x-csv.tbz2
2018-05-23 19:16:16,960 DEBUG    [root] Processing /data/x/x/x/ready/x-csv.tbz2

As you can see, they are pulling from the same queue but getting the same file (sanitized paths, of course).

Here's how I am creating my queue object:

q = persistqueue.UniqueQ(queue_path, auto_commit=True)

Using:

file_to_process = queue.get()

persist-queue==0.3.5

ACK Queue: readme example for SQLiteAckQueue missing clear_acked_data()

Unless I'm missing something, it seems the ACKQueue does not clean up after itself when you ACK the items. I discovered this function by looking over the unit tests. Is it expected that users call this function manually? It seems that clear_acked_data() should always be called after ackq.ack(item) if you don't want the sqlite DB to grow unbounded?

Feature request: peek head item

Hi. It'd be handy to be able to peek the head item in the queue so you can deal with it and only pop off the queue when you've finished.

The value of a persistent queue is lessened without this feature. For example, if the reader crashes while uploading the thing it read into the cloud, or the cloud service is offline so it fails to upload, etc. I believe this is how more monolithic stuff like Kafka does things - you commit when you've finished consuming so Kafka knows it can move on.

Queue stuck on same item after each get()

I have this queue running in over 200 instances of my application.
I checked the logs of one instance and I saw itemX was inserted once.

Later I get the item:
Globals.validated_profiles_queue.get(block=False)
this will return me "shahar.ha" which is itemX.
Then each time I do "get" I will find "shahar.ha" as the item!

The data files:

  • IN COMMENT

Please assist.

OS: Windows 7
NOT multithreaded
Simple put and get.

OSError when /tmp and actual destination are different devices

Hi,

If /tmp and the directory holding the queue are located on separate devices, you get the following error:

Traceback (most recent call last):
  File "push.py", line 51, in <module>
    main()
  File "push.py", line 47, in main
    q.put(b'a')
  File "/home/smetj/data/python/virtualenv/default/lib/python3.6/site-packages/persistqueue/queue.py", line 107, in put
    self._put(item)
  File "/home/smetj/data/python/virtualenv/default/lib/python3.6/site-packages/persistqueue/queue.py", line 125, in _put
    self._saveinfo()
  File "/home/smetj/data/python/virtualenv/default/lib/python3.6/site-packages/persistqueue/queue.py", line 219, in _saveinfo
    os.rename(tmpfn, self._infopath())
OSError: [Errno 18] Invalid cross-device link: '/tmp/tmp9r_95kn4' -> 'blah/info'

in-memory queue is broken

The multi-threading tests consistently crash Python when using the memory queue.

Needs investigation.

Sqlite3 database self-destructs if app aborted with ^C

Scenario: using persistqueue.SQLiteAckQueue, with a few messages in the queue but not yet consumed, the process is terminated with SIGTERM or ^C. Subsequent attempts to run the app and use .get() to pull a message off the queue are met with the exception OperationalError: Could not decode to UTF-8 column 'data' with text '?}q', which is because the sqlite3 database contains one or more rows with invalid data.

My quick and dirty solution:

--- Delete all the rows where the data field is less than 10 characters. My normal payload length is over 250 bytes so this is very effective at blowing away those rows with corrupted data. 
CREATE VIEW view_lengthchecker AS select *,LENGTH(data) 'ld' from ack_queue_default WHERE ld < 10;
DELETE FROM ack_queue_default WHERE _id IN (SELECT _id FROM view_lengthchecker);

task_done not behaving as I would expect

I've been playing with both the file and sqlite based queues. My understanding with both is that you can put items to the queue, you can get items, but only when you do a 'task_done' (per item in the case of the file based queue or once in the case of the sqlite based queue) does the item get removed. I could therefore 'get' what I like from the queue, but if I never did a task_done, I could kill my app, restart and all the items would still be there. (i.e. all the items that were put there originally) My experience of putting 10000 items and doing some 'get's without 'task_done' and then restarting is thus:

Queue: I'll get something like the following when I restart:
$python streamer.py
Traceback (most recent call last):
...
File ".../lib/python2.7/site-packages/persistqueue/queue.py", line 57, in init
self.tailf = self._openchunk(tnum)
File ".../lib/python2.7/site-packages/persistqueue/queue.py", line 192, in _openchunk
return open(self._qfile(number), mode)
IOError: [Errno 2] No such file or directory: '/var/tmp/queue/q00005'

SQLiteQueue: I initialise the queue with auto_commit=False, I consume many items and even though I never call task_done, the items I 'get' are no longer on the queue when I restart the script. (I would expect them all to still be there)

Please advise.

File based queue: how to clean old files?

I am using a file based queue and all is working as intended, except...

I would expect that after issuing queue.task_done() the file on disk would shrink, in order to reflect the actual size of the queue and, most importantly, to avoid files growing unbounded.

I failed to find a solution to this issue; can someone shed some light please?
Thanks!

Queue size

Hi,

Thank you for this great contribution! I was wondering whether there is any way to limit the queue size?

Global `enable_callback_tracebacks` call prevents importing under pypy3

I have a bunch of code that operates under both python3 and pypy3.

Pypy has a complete sqlite3 module, but they appear to not have implemented the non-DBAPI call enable_callback_tracebacks(). Currently, this is called unconditionally in multiple places on import of the persistqueue module.


durr@rwpscrape /m/S/S/ReadableWebProxy> pypy3
Python 3.5.3 (fdd60ed87e94, Apr 24 2018, 06:10:04)
[PyPy 6.0.0 with GCC 6.2.0 20160901] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>>> import sqlite3
>>>> sqlite3.enable_callback_tracebacks
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: module 'sqlite3' has no attribute 'enable_callback_tracebacks'

Setting aside the fact that this should really probably be called on the construction of a SQLiteQueue object, rather than globally on import (don't run code on import!), this is apparently basically the one thing preventing the module from functioning under pypy.

I went through and manually edited the files to instead feature:

if not '__pypy__' in sys.builtin_module_names:
    sqlite3.enable_callback_tracebacks(True)

and it fixes the issue (you do also need to import sys).

Can't test if the queue is empty?

Hi,

I like this module a lot; I'm just not getting something -- because this doesn't seem to implement queue.empty(), unlike the regular Python queue, I'm not sure how best to break out of a while loop, like:

    while True:
        if q_dir.empty():
            break
        else:
            path = q_dir.get()
            # do stuff
            q_dir.task_done()

Assuming I'm overlooking something simple. Thanks!

SQLiteAckQueue file size growth

I am using a SQLiteAckQueue between threads.

Every 1 second, the producer thread reads a JSON file that is ~20K bytes into a dict, which is put onto the queue, and then issues task_done().

The consumer thread blocks on get, then processes the queue-item, and when all goes well, calls ack, task_done, and finally clear_acked_data.

After about 8 hours, the queue/database file sizes are:

-rw-r--r-- 1 2005 2005 32899072 Dec 10 15:35 data.db
-rw-r--r-- 1 2005 2005    32768 Dec 10 15:35 data.db-shm
-rw-r--r-- 1 2005 2005  4185952 Dec 10 15:35 data.db-wal

The .db and .db-wal files continue to grow over time. Is this expected behavior, or am I doing something wrong?

Prior to discovering clear_acked_data via issues #78 and #77, these file sizes seemed to grow much faster, and eventually my application slowed to a crawl; AFAICT the persist-queue accesses to the database grew to unacceptable latencies.

rollback?

Is there an opposite of task_done() where a commit can be aborted?

Alternatively, is there a peek() function that will get the next value but not remove it from the queue?
My use case is moving a piece of data to another system, where I only want to remove it from the queue if it's successfully persisted to the next stage of processing.

Cannot use persist queue in docker when the persist path is mounted as a volume between host and container

Repeating subject: Cannot use persist queue in docker when the persist path is mounted as a volume between host and container.

The error:

...
2018-07-26T11:44:14.714846510Z   File "/usr/local/lib/python3.7/site-packages/persistqueue/queue.py", line 111, in put
2018-07-26T11:44:14.714850922Z     self._put(item)
2018-07-26T11:44:14.714854968Z   File "/usr/local/lib/python3.7/site-packages/persistqueue/queue.py", line 129, in _put
2018-07-26T11:44:14.714859295Z     self._saveinfo()
2018-07-26T11:44:14.714863277Z   File "/usr/local/lib/python3.7/site-packages/persistqueue/queue.py", line 223, in _saveinfo
2018-07-26T11:44:14.714867563Z     os.rename(tmpfn, self._infopath())
2018-07-26T11:44:14.714871686Z OSError: [Errno 18] Cross-device link: '/tmp/tmpki1agaqp' -> '<mounted_path>/incoming_requests_queue/info'

Thanks!

Using task_done() in multiple threads

I'd like to use Queue to store items to be processed by threads. However, if one of the items fails to get processed (and task_done is hence not called) it's still possible that the item is removed from the queue persistently (whereas one would expect it not to be, as is usual behaviour).

Example:

import threading
import time

from persistqueue import Queue

q = Queue("testq")


def worker1():
    print("getting from worker1")
    x = q.get()
    print("got", x, "from worker1")
    # processing goes here ... takes some time
    time.sleep(2)
    try:
        assert False, "something went wrong"
        q.task_done()
    except:
        print("something went wrong with worker1 in processing", x, "so not calling task_done")


def worker2():
    time.sleep(1)
    print("getting from worker2")
    x = q.get()
    print("got", x, "from worker2")
    # processing would happen here - but happens quicker than task1
    print("finished processing", x, "from worker2 so calling task_done")
    q.task_done()
    print("called task_done from worker2")


if __name__ == "__main__":

    q.put("a")
    q.put("b")

    t1 = threading.Thread(target=worker1)
    t1.start()
    t2 = threading.Thread(target=worker2)
    t2.start()
    t1.join()
    t2.join()
    print("reloading q")
    del q
    q = Queue("testq")
    print("qsize", q.qsize())

Output:

getting from worker1
got a from worker1
getting from worker2
got b from worker2
finished processing b from worker2 so calling task_done
called task_done from worker2
something went wrong with worker1 in processing a so not calling task_done
reloading q
qsize 0

As you can see, 'a' was permanently removed, even though task_done "wasn't" called. In other words, I'd expect to see qsize 1 as the output. Is there a way to achieve this, i.e. task_done only completes a specific task, not all tasks in all threads?

Bonus question: how do I also add 'a' back onto the in-memory queue (ignoring persistence)? I.e. the equivalent of SQLiteAckQueue.nack? The only way I see how would be reloading the queue from disk (in which case the get wouldn't have persisted) but this seems messy.

(Also, yes, I know of the SQLiteAckQueue which seems well-suited, but I'd prefer to use plain files if possible.)

In-Memory Database Issues

Cool project! I've incorporated it into changeme and was looking at switching the disk-based FIFO queue to be an in-memory queue. It looks like you've got some code indicating you're interested in supporting an in-memory database, but when I tried it out, I ran into a few bugs:

:memory: Directory Gets Created

The first issue I noticed was that when specifying :memory: as a path the code actually creates that directory. Test code:

>>> from persistqueue import FIFOSQLiteQueue
>>> q = FIFOSQLiteQueue(path=":memory:", multithreading=True, name='test')

Results:

# file \:memory\:/
:memory:/: directory

This can be fixed by adding a second condition to line 74 of sqlbase.py:

@@ -71,7 +71,7 @@ class SQLiteBase(object):

     def _init(self):
         """Initialize the tables in DB."""
-        if not os.path.exists(self.path):
+        if not os.path.exists(self.path) and not self.path == self._MEMORY:
             os.makedirs(self.path)
         log.debug('Initializing Sqlite3 Queue with path {}'.format(self.path))

No Such Table

After fixing the :memory: path issue, it looks like the table's not getting created correctly.

>>> from persistqueue import FIFOSQLiteQueue
>>> q = FIFOSQLiteQueue(path=':memory:', multithreading=True, name='test')
>>> q.put('foo')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "persistqueue/sqlqueue.py", line 36, in put
    self._insert_into(obj, _time.time())
  File "persistqueue/sqlbase.py", line 20, in _execute
    obj._putter.execute(stat, param)
sqlite3.OperationalError: no such table: queue_test

I haven't had a chance to track this bug down, but thought I'd let you know.

Cheers,
Zach

How to do vacuum for persistqueue.SQLiteQueue

I want to reduce the size of the file because it's seriously inflated even when the queue is empty, and I see no way to execute the SQL 'VACUUM' statement to reduce the queue size on disk.

How can I do that?
