Comments (19)
Hi @stas, as @malthe just mentioned:

- I am not sharing one DB connection string among all threads; I open a new connection to the DB for each thread (please refer to the `_create_queue_instance` function and its usage in my code example). So why should the first thread block the others?
- Although my code example doesn't use `queue.get(block=False)`, it shouldn't block, because LISTEN is not enabled in my dev environment. But in any case, as mentioned above, I use a different connection for each thread.
- I need the `with queue:` block so that in case of failure (of any kind) the item is returned to the queue. For example, if the process is shut down while handling an item, `with queue:` ensures the item is not lost.
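The return-to-queue behaviour described above can be sketched with a plain-Python stand-in (no database involved; `transactional_get` and `worker` are illustrative names, not part of pq's API, and in the real setup each worker would open its own Postgres connection):

```python
import queue
import threading
from contextlib import contextmanager

@contextmanager
def transactional_get(q):
    """Take one item and put it back if processing fails -- a plain-Python
    stand-in for the return-to-queue behaviour of pq's `with queue:` scope."""
    item = q.get(block=False)   # raises queue.Empty when nothing is left
    try:
        yield item
    except Exception:
        q.put(item)             # failure: return the item to the queue
        raise

def worker(q, results, lock):
    # in the real setup each worker opens its own DB connection here
    while True:
        try:
            with transactional_get(q) as item:
                with lock:
                    results.append(item)
        except queue.Empty:
            break

q = queue.Queue()
for i in range(10):
    q.put(i)

results, lock = [], threading.Lock()
threads = [threading.Thread(target=worker, args=(q, results, lock))
           for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(results))  # -> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Every item is processed exactly once; if a handler raised, its item would go back on the queue instead of being lost.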
from pq.
This looks more like a question. @elawdio, please check the test case for an example of how to handle jobs in parallel:
https://github.com/malthe/pq/blob/master/pq/tests.py#L326
Also consider using a connection pooler if you're running jobs in parallel (examples also available in the tests).
Thanks for the reply :)
I went through the test case that you mentioned, but I don't see how it relates to my use case.
The test case iterates through the queue in a single transaction, so if there's a failure, it will affect all of the items that were pulled during that transaction.
In my use case, I want to open a separate transaction for each item, so in case of a failure, only that item will be returned to the queue.
Regarding the connection pooler, it will not make a difference, as I establish a fresh connection to Postgres for each thread (see the usage of `_create_queue_instance` in my code example).
Am I still missing something?
There is no table lock. In fact, the pq library uses advisory locks. In theory, one transaction finds the next queue item and holds an advisory lock on just that item until the transaction ends. But last time I checked, the library has some rough edges around transaction management:
- Documentation could be better
- There are some warts in the API
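A simplified illustration of the advisory-lock approach described above (table and column names are made up here; pq's actual queries differ):

```sql
-- Claim the first item we can advisory-lock. pg_try_advisory_xact_lock
-- holds the lock until the surrounding transaction ends, so other
-- workers see the lock and should move on to a different row.
-- (Note: evaluating the lock function in WHERE can lock rows the
-- scan visits before LIMIT applies; this is a sketch, not production SQL.)
SELECT id, data
FROM jobs
WHERE pg_try_advisory_xact_lock(id)
ORDER BY id
LIMIT 1;
```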
Exactly. Before I started using this library, I was glad to see that it uses advisory locks, which is why I was surprised by this behavior.
Do you have an idea of what can be done in my implementation, or in pq's implementation, to make it work?
If it has something to do with pq, I'll be glad to contribute a new version based on your guidance.
Back when #19 was reported, I took a fresh look at how the whole thing is set up and tried to implement changes that would better support the presumably common scenario where you pull an item from the queue, make other database queries in the same transaction, and either commit or abort the transaction.
But I struggled to get the test cases to pass and ran out of time.
To be honest, I didn't completely understand from your last comment whether I can solve my problem with the current version of pq.
It seems like it cannot be done; am I right?
I understand why #19 can be complex, but do you think my specific problem can be solved easily? If so, as I mentioned before, I'll be glad to help.
@stas, why did you add `LIMIT 2` here: 362681e?
It looks like that would limit the possible concurrency to 2.
Thanks for helping me investigate this!
Why would that limit the concurrency to 2? Correct me if I am wrong, but the change to `LIMIT 2` was made in the `put` method, not in `get`.
The problem I am raising is about being able to execute `get` multiple times concurrently (in multiple threads), where each `get` is executed inside a separate transaction (`with queue: ... handle_item_logic(queue.get()) ...`), without the threads blocking each other.
@elawdio – using `SKIP LOCKED` (a feature available since PostgreSQL 9.5), this seems to work correctly (see https://blog.2ndquadrant.com/what-is-select-skip-locked-for-in-postgresql-9-5/).
Your test script now runs as expected. Can you confirm?
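For reference, the `SKIP LOCKED` dequeue pattern looks roughly like this (illustrative table and column names, not pq's actual SQL):

```sql
-- Each concurrent worker claims a different row: rows locked by other
-- transactions are skipped instead of waited on, so concurrent `get`s
-- in separate transactions no longer block each other.
SELECT id, data
FROM jobs
ORDER BY id
FOR UPDATE SKIP LOCKED
LIMIT 1;
```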
It works amazingly! Thank you very much!
When do you think it can be merged so I can use it as a package?
Hmm, @elawdio, I'm still not clear on what you're trying to achieve here. Please correct me if I'm missing something below...
First of all, I don't know why you'd want to place a blocking queue in a thread with a DB connection that is shared with a bunch of other threads. This is clearly not safe, and it seems to be the reason why your first thread is blocking the others (based on your logs, the `get_ident()` is the same).
Another question I have is how and why you're fetching items from the queue. The `Queue.get()` method will block by default (and yes, your DB connection will be in use; this is how LISTEN works, and it doesn't seem to have anything to do with transactions). If you want to reuse your connection, consider `Queue.get(block=False)`; you're in an infinite loop anyway.
One more thing: when you do `with queue:` you're starting a transaction as well. Just removing that will give different results.
A bit of context: we've been running pq in production, scaling over multiple processes with dozens of threads, and have had no issues with transactions being blocked. My advice would be to strictly follow best practices around parallelism when designing your queue (as in, use a connection pool, use a separate connection pool inside the job handlers, and so on). What worked for us is to wrap every queue into a separate process and let it manage a set of threads. Obviously you could use processes all over, but that would cost you an expensive connection for every process (plus more resources and stress on your DB).
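The layout described above (one manager per queue, fanning work out to a set of worker threads) can be sketched without a database. `QueueManager` is an illustrative name; in production each manager would be a separate OS process holding its own DB connection or pool, but plain threads are used here so the sketch runs anywhere:

```python
import queue
import threading

class QueueManager:
    """Illustrative sketch: one manager per queue owns the expensive
    shared resource (in production, a DB connection or pool) and
    dispatches work to a fixed set of worker threads."""

    def __init__(self, n_threads=4):
        self.in_q = queue.Queue()
        self.results = []
        self._lock = threading.Lock()
        self.threads = [threading.Thread(target=self._worker)
                        for _ in range(n_threads)]

    def _worker(self):
        while True:
            item = self.in_q.get()
            if item is None:                 # sentinel: stop this thread
                break
            with self._lock:
                self.results.append(item * 2)  # stand-in for the job handler

    def run(self, items):
        for item in items:
            self.in_q.put(item)
        for _ in self.threads:
            self.in_q.put(None)              # one sentinel per worker thread
        for t in self.threads:
            t.start()
        for t in self.threads:
            t.join()
        return sorted(self.results)

manager = QueueManager()
print(manager.run(range(8)))  # -> [0, 2, 4, 6, 8, 10, 12, 14]
```

The sentinel-per-thread shutdown keeps each worker's lifecycle explicit, mirroring the "let the process manage its set of threads" advice.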
The two items are used to calculate the `next_timeout`. This way we can look ahead and say: don't bother running another query for this amount of time, since you won't get anything.
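A rough sketch of that look-ahead logic (function and field names are made up for illustration; this is not pq's actual code):

```python
def next_timeout(scheduled_times, now):
    """Given the scheduled times of the next two queue items (the reason
    for fetching LIMIT 2), return how long to wait before polling again:
    0 if the second item is already due, otherwise the gap until it is
    due. Returns None when there is no second item to look ahead to."""
    if len(scheduled_times) < 2:
        return None        # nothing to look ahead to; wait for a NOTIFY
    return max(0, scheduled_times[1] - now)

# first item is handed out now; second is due 5s later -> sleep 5s
print(next_timeout([100, 105], now=100))  # -> 5
# second item is already overdue -> poll again immediately
print(next_timeout([100, 98], now=100))   # -> 0
```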
@stas – he's not sharing the connection, each thread opens its own connection.
Which means that there's something incorrect about our tests because they didn't pick this up.
@stas – re `LIMIT 2` – gotcha. That's also supported by the code in this PR.
Apologies, indeed, I missed the part where the connection is initiated.
I started looking at the locks in the running database, and it looks like there's a race condition or something: there are 28 locks with the query `RELEASE SAVEPOINT pq`. I still believe that `with queue:` is the issue here.
Take a look at this query while running your example:

```sql
SELECT
    datname,
    relation,
    mode,
    granted,
    query,
    state,
    pg_locks.pid
FROM pg_stat_activity
JOIN pg_locks ON pg_locks.pid = pg_stat_activity.pid
WHERE state != 'idle'
AND query NOT LIKE '% FROM pg_stat_activity %';
```
It would be great if we could find a technical explanation of the current use-case. Though again, I doubt running pq in such a setup makes any sense (one connection per queue in a thread). Threads are just a pain to manage.
@stas, well, truth be told, that's the whole point of pq – to be used in concurrent scenarios.
Can I kindly ask when you are going to release a new version of pq that includes the `SKIP LOCKED` change?
This has now been fixed and is available in release 1.7.0.