

elawdio commented on July 24, 2024

Hi @stas, as @malthe just mentioned:

  1. I am not sharing one DB connection among all threads; I open a new connection to the DB for each thread (please refer to the '_create_queue_instance' function and its usage in my code example).
    So why should the first thread block the others?

  2. Although my code example doesn't use queue.get(block=False), it shouldn't block, because LISTEN is not enabled in my dev environment specifically. In any case, as mentioned in the previous point, I use a different connection for each thread.

  3. I need with queue: so that, in case of a failure of any kind, the item is returned to the queue.
    For example, if the process is shut down while an item is being handled, using with queue: ensures the item is not lost.
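The rollback-on-failure guarantee described in point 3 can be illustrated with a minimal sketch. It uses Python's built-in sqlite3 module as a stand-in for PostgreSQL, and the jobs table, process_one and pending helpers are hypothetical, not pq's API. A sqlite3 connection used as a context manager commits on normal exit and rolls back if the block raises, which mirrors what with queue: is meant to do for a dequeued item.

```python
import sqlite3
from typing import Callable


def make_db() -> sqlite3.Connection:
    # In-memory SQLite database standing in for PostgreSQL (illustration only).
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, payload TEXT)")
    with conn:
        conn.executemany("INSERT INTO jobs (payload) VALUES (?)", [("a",), ("b",)])
    return conn


def process_one(conn: sqlite3.Connection, handler: Callable[[str], None]) -> bool:
    """Dequeue one job and handle it inside a single transaction.

    Returns True if a job was handled and committed, False if the queue was
    empty or the handler failed (in which case the job stays queued).
    """
    try:
        with conn:  # commits on normal exit, rolls back if the block raises
            row = conn.execute(
                "SELECT id, payload FROM jobs ORDER BY id LIMIT 1"
            ).fetchone()
            if row is None:
                return False
            conn.execute("DELETE FROM jobs WHERE id = ?", (row[0],))
            handler(row[1])  # if this raises, the DELETE above is undone
        return True
    except Exception:
        # A real worker would log the error; the job is back in the table.
        return False


def pending(conn: sqlite3.Connection) -> int:
    return conn.execute("SELECT COUNT(*) FROM jobs").fetchone()[0]


if __name__ == "__main__":
    db = make_db()

    def crash(payload: str) -> None:
        raise RuntimeError("worker died while handling " + payload)

    process_one(db, crash)
    print(pending(db))  # 2: the failed job was rolled back into the queue
    process_one(db, print)  # prints "a", then commits
    print(pending(db))  # 1
```

Each call opens and closes its own transaction, so a failure only ever returns the one item that call was handling.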

stas commented on July 24, 2024

This looks more like a question. @elawdio, please check the test case for an example of how to handle the jobs in parallel:
https://github.com/malthe/pq/blob/master/pq/tests.py#L326

Also consider using a connection pooler if you're running jobs in parallel (examples are also available in the tests).

elawdio commented on July 24, 2024

Thanks for the reply :)
I went through the test case you mentioned, but I don't see how it relates to my use case.
The test case iterates through the queue within a single transaction, so a failure affects all of the items that were pulled during that transaction.
In my use case, I want a separate transaction for each item, so that if processing fails, only that item is returned to the queue.

As for the connection pooler, it won't make a difference, since I establish a fresh connection to Postgres for each thread (see the usage of '_create_queue_instance' in my code example).

Am I still missing something?

malthe commented on July 24, 2024

There is no table lock. In fact, the pq library uses advisory locks. In theory, one transaction finds the next queue item and holds an advisory lock on just that item until the transaction ends. But last time I checked, the library had some rough edges around transaction management:

  • Documentation could be better
  • There are some warts in the API
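To make the advisory-lock idea concrete, here is an in-memory model of transaction-scoped advisory locking, assuming semantics like PostgreSQL's pg_try_advisory_xact_lock: a try-lock that never waits, and locks that are released together when the owning transaction ends. The AdvisoryLockTable class and claim_next function are hypothetical and only model the idea; they do not reproduce pq's actual SQL.

```python
import threading
from typing import Dict, List, Optional


class AdvisoryLockTable:
    """Toy in-memory model of transaction-scoped advisory locks."""

    def __init__(self) -> None:
        self._mutex = threading.Lock()
        self._holders: Dict[int, object] = {}

    def try_xact_lock(self, key: int, txn: object) -> bool:
        # Never waits: returns False at once if another transaction holds the key.
        with self._mutex:
            holder = self._holders.get(key)
            if holder is None or holder is txn:
                self._holders[key] = txn
                return True
            return False

    def end_transaction(self, txn: object) -> None:
        # Commit or rollback: every lock owned by txn is released together.
        with self._mutex:
            for key in [k for k, t in self._holders.items() if t is txn]:
                del self._holders[key]


def claim_next(queue_ids: List[int], locks: AdvisoryLockTable, txn: object) -> Optional[int]:
    # Take the first queued item that no other open transaction has locked.
    for item_id in queue_ids:
        if locks.try_xact_lock(item_id, txn):
            return item_id
    return None


if __name__ == "__main__":
    locks = AdvisoryLockTable()
    t1, t2, t3 = object(), object(), object()
    print(claim_next([1, 2, 3], locks, t1))  # 1
    print(claim_next([1, 2, 3], locks, t2))  # 2: item 1 is skipped, not waited on
    locks.end_transaction(t1)  # e.g. t1 rolled back, so item 1 is claimable again
    print(claim_next([1, 2, 3], locks, t3))  # 1
```

In this model, concurrent workers never wait on each other's items; if they do block in practice, the waiting is happening somewhere other than the per-item lock.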

elawdio commented on July 24, 2024

Exactly. Before I started using this library, I was glad to see that it uses advisory locks, which is why this behavior surprised me.

Do you have an idea of what could be changed in my implementation, or in pq's, to make this work?
If the fix belongs in pq, I'd be glad to contribute a patch based on your guidance.

malthe commented on July 24, 2024

Back when #19 was reported, I took a fresh look at how the whole thing is set up and tried to implement changes to better support the presumably common scenario where you pull an item from the queue, make other database queries in the same transaction, and then either commit or abort the transaction.

But I struggled to get the test cases to pass and ran out of time.

elawdio commented on July 24, 2024

To be honest, I didn't completely understand from your last comment whether I can solve my problem with the current version of pq.
It seems that it cannot be done; am I right?

I understand why #19 can be complex, but do you think my specific problem can be solved easily? If so, as I mentioned before, I'll be glad to help.

malthe commented on July 24, 2024

@stas, why did you add LIMIT 2 here (362681e)?

It looks like that would limit the possible concurrency to 2.

elawdio commented on July 24, 2024

Thanks for helping me investigate this!

Why would that limit the concurrency to 2? Correct me if I am wrong, but the change to LIMIT 2 was made in the 'put' method, not in 'get'.

The problem I am raising is about being able to execute 'get' multiple times concurrently (in multiple threads), where each 'get' is executed inside a separate transaction (with queue: ... handle_item_logic(queue.get()) ...), without the threads blocking each other.

malthe commented on July 24, 2024

@elawdio, using SKIP LOCKED (available since PostgreSQL 9.5), this seems to work correctly (see https://blog.2ndquadrant.com/what-is-select-skip-locked-for-in-postgresql-9-5/).

Your test script now runs as expected. Can you confirm?
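Why SKIP LOCKED removes the blocking can be modelled in plain Python: each row gets a lock, and a worker's "transaction" holds the lock of the row it claimed until it finishes. A plain SELECT ... FOR UPDATE would wait on a locked row, whereas FOR UPDATE SKIP LOCKED moves on to the next unlocked one. This is an in-memory model of those semantics, not PostgreSQL or pq code; the Row class, the claim_skip_locked and run_workers helpers, and the queue table name in the comment are hypothetical. The barrier only exists to show that all workers hold their rows at the same time.

```python
import threading
from typing import List, Optional


class Row:
    def __init__(self, row_id: int) -> None:
        self.id = row_id
        self.lock = threading.Lock()  # stands in for the row-level lock


def claim_skip_locked(rows: List[Row]) -> Optional[Row]:
    # Models: SELECT id FROM queue ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED
    for row in rows:
        if row.lock.acquire(blocking=False):  # locked rows are skipped, not waited on
            return row
    return None


def run_workers(n_workers: int, n_rows: int) -> List[int]:
    """Each worker claims a row and holds it until every worker has claimed one."""
    rows = [Row(i) for i in range(n_rows)]
    # All workers must hold their rows at the same time to pass this barrier;
    # if claiming blocked on the first row, the barrier would time out instead.
    barrier = threading.Barrier(n_workers, timeout=5)
    claimed: List[int] = []
    claimed_lock = threading.Lock()

    def worker() -> None:
        row = claim_skip_locked(rows)
        if row is None:
            return
        try:
            barrier.wait()
            with claimed_lock:
                claimed.append(row.id)
        finally:
            row.lock.release()  # "commit": the row lock is released

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(claimed)


if __name__ == "__main__":
    print(run_workers(4, 10))  # [0, 1, 2, 3]
```

Replacing the non-blocking acquire with a blocking one on the first row would serialize the workers, which is the behavior reported at the start of this thread.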

elawdio commented on July 24, 2024

It works amazingly! Thank you very much!
When do you think it can be merged and released, so I can install it as a package?

stas commented on July 24, 2024

Hmm, @elawdio, I'm still not clear on what you're trying to achieve here. Please correct me if I'm missing something below.

First of all, I don't see why you'd put a blocking queue into a thread whose DB connection is shared with a bunch of other threads. This is clearly not safe, and it seems to be the reason why your first thread blocks the others (based on your logs, the get_ident() value is the same).

Another question I have is how and why you're fetching items from the queue. The Queue.get() method blocks by default (and yes, your DB connection will be in use while it waits; this is how LISTEN works, and it doesn't seem to have anything to do with transactions). If you want to reuse your connection, consider Queue.get(block=False); you're in an infinite loop anyway.
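The non-blocking alternative can be sketched as a polling loop. The sketch assumes, as a labelled assumption, that a non-blocking get returns None when nothing is ready; the drain function, its idle_sleep and max_idle parameters, and StubQueue are hypothetical, and the stub lets the example run without a database. A real worker would loop forever; max_idle exists only so the sketch terminates.

```python
import time
from collections import deque
from typing import Callable, List, Optional


def drain(get: Callable[..., Optional[object]],
          handle: Callable[[object], None],
          idle_sleep: float = 0.01,
          max_idle: int = 3) -> int:
    """Poll with non-blocking gets; stop after max_idle empty polls in a row."""
    handled = 0
    idle = 0
    while idle < max_idle:
        item = get(block=False)  # assumption: returns None when the queue is empty
        if item is None:
            idle += 1
            time.sleep(idle_sleep)  # back off; no wait is held on the connection
            continue
        idle = 0
        handle(item)
        handled += 1
    return handled


class StubQueue:
    """Hypothetical in-memory queue exposing get(block=...)."""

    def __init__(self, items: List[object]) -> None:
        self._items = deque(items)

    def get(self, block: bool = True) -> Optional[object]:
        if not self._items:
            if block:
                raise RuntimeError("stub cannot block")
            return None
        return self._items.popleft()


if __name__ == "__main__":
    print(drain(StubQueue(["x", "y"]).get, print, idle_sleep=0))  # prints x, y, then 2
```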

One more thing: when you use with queue:, you're starting a transaction as well. Just removing it will give different results.

A bit of context: we've been running pq in production, scaled over multiple processes with dozens of threads, and had no issues with transactions being blocked. My advice would be to strictly follow best practices for parallelism when designing your queue (as in, use a connection pool, use a separate connection pool inside the job handlers, and so on). What worked for us is to wrap every queue in a separate process and let it manage a set of threads. Obviously you can use processes all over, but that will cost you an expensive connection for every process (plus more resources and more stress on your DB).

stas commented on July 24, 2024

Quoting @malthe: "why did you add LIMIT 2 here (362681e)?"

The two items are used to calculate next_timeout. This way we can look ahead and say: don't bother running another query for this amount of time, since you won't get anything.
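A minimal sketch of that look-ahead: given the (at most) two earliest rows a LIMIT 2 query returns, the first is handed out if it is due, and the next row's scheduled time tells the caller how long it can wait before querying again. The split_lookahead name, the (item_id, scheduled_at) tuple layout and the use of epoch seconds are assumptions for illustration, not pq's actual implementation.

```python
from typing import List, Optional, Tuple

Row = Tuple[int, float]  # hypothetical layout: (item_id, scheduled_at in epoch seconds)


def split_lookahead(rows: List[Row], now: float) -> Tuple[Optional[Row], Optional[float]]:
    """Given up to two rows ordered by scheduled_at, return
    (item to hand out now, seconds until another query is worthwhile).

    A timeout of None means there is no further row to look ahead at.
    """
    remaining = list(rows)
    due: Optional[Row] = None
    if remaining and remaining[0][1] <= now:
        due = remaining.pop(0)
    next_timeout: Optional[float] = None
    if remaining:
        # Don't query again before the next row becomes due.
        next_timeout = max(0.0, remaining[0][1] - now)
    return due, next_timeout


if __name__ == "__main__":
    print(split_lookahead([(1, 10.0), (2, 15.0)], now=12.0))  # ((1, 10.0), 3.0)
```

With only one row fetched, the caller could hand out an item but would know nothing about when the next one becomes available, which is why a second row is needed.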

malthe commented on July 24, 2024

@stas, he's not sharing the connection; each thread opens its own connection.

That means something is wrong with our tests, because they didn't pick this up.

malthe commented on July 24, 2024

@stas, re LIMIT 2: gotcha. That's also supported by the code in this PR.

stas commented on July 24, 2024

Apologies; indeed, I missed the part where the connection is opened.

I started looking at the locks in the running database, and it looks like there's a race condition or something similar. There are 28 lock rows whose query is RELEASE SAVEPOINT pq. I still believe that with queue: is the issue here.

Take a look at this query while running your example:

SELECT 
  datname,
  relation,
  mode,
  granted,
  query,
  state,
  pg_locks.pid
FROM pg_stat_activity
JOIN pg_locks on pg_locks.pid = pg_stat_activity.pid
WHERE state != 'idle'
AND query NOT LIKE '% FROM pg_stat_activity %' 
;

It would be great if we could find a technical explanation for the current use case. Though again, I doubt that running pq in such a setup (one connection per queue in a thread) makes much sense. Threads are just a pain to manage.

malthe commented on July 24, 2024

@stas, truth be told, that's the whole point of pq: to be used in concurrent scenarios.

elawdio commented on July 24, 2024

May I ask when you plan to release a new version of pq that includes the SKIP LOCKED change?

malthe commented on July 24, 2024

This has now been fixed and is available in release 1.7.0.
