djangsters / redis-tasks Goto Github PK
View Code? Open in Web Editor NEWredis task runner
Home Page: https://redis-tasks.readthedocs.io/en/latest/
License: GNU Lesser General Public License v3.0
redis task runner
Home Page: https://redis-tasks.readthedocs.io/en/latest/
License: GNU Lesser General Public License v3.0
Just for fun I added time.sleep(1)
in taskwait():
(redis-tasks) guettli@yoga15:~/projects/redis-tasks$ git diff
diff --git a/tests/test_workerprocess.py b/tests/test_workerprocess.py
index 96920a3..463f52f 100644
--- a/tests/test_workerprocess.py
+++ b/tests/test_workerprocess.py
@@ -278,6 +278,7 @@ def test_execute_task(mocker, settings, time_mocker):
def taskwait():
with multiprocessing.connection.Client(os.environ['RT_TEST_SOCKET']) as conn:
conn.send("A")
+ time.sleep(1)
try:
time.sleep(10)
conn.send("C")
This results in this error:
__________________________________________________________________________ test_signal_shutdown_in_task __________________________________________________________________________
suprocess_socket = <multiprocessing.connection.Listener object at 0x7f0230fbd190>
def test_signal_shutdown_in_task(suprocess_socket):
queue = QueueFactory()
task = queue.enqueue_call(taskwait)
wp = WorkerProcess([queue])
process = multiprocessing.Process(target=wp.run)
process.start()
with suprocess_socket.accept() as taskconn:
assert taskconn.poll(1)
assert taskconn.recv() == "A"
os.kill(process.pid, signal.SIGTERM)
assert taskconn.poll(1)
> assert taskconn.recv() == "B"
tests/test_workerprocess.py:310:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/lib/python3.8/multiprocessing/connection.py:250: in recv
buf = self._recv_bytes()
/usr/lib/python3.8/multiprocessing/connection.py:414: in _recv_bytes
buf = self._recv(4)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <multiprocessing.connection.Connection object at 0x7f02319cf160>, size = 4, read = <built-in function read>
def _recv(self, size, read=_read):
buf = io.BytesIO()
handle = self._handle
remaining = size
while remaining > 0:
chunk = read(handle, remaining)
n = len(chunk)
if n == 0:
if remaining == size:
> raise EOFError
E EOFError
/usr/lib/python3.8/multiprocessing/connection.py:383: EOFError
------------------------------------------------------------------------------- Captured log call --------------------------------------------------------------------------------
INFO redis_tasks.task:task.py:145 Task tests.test_workerprocess.taskwait() [2cb18aa6-d9f6-4ebe-8f5a-e118925b7b57] enqueued
---------------------------------------------------------------------------- Captured stderr teardown ----------------------------------------------------------------------------
Process Process-6:
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/home/guettli/projects/redis-tasks/redis_tasks/worker_process.py", line 117, in run
self.maybe_shutdown()
File "/home/guettli/projects/redis-tasks/redis_tasks/worker_process.py", line 235, in maybe_shutdown
raise ShutdownRequested()
redis_tasks.worker_process.ShutdownRequested
============================================================================ short test summary info =============================================================================
FAILED tests/test_workerprocess.py::test_signal_shutdown_in_task - EOFError
=
Version:
Python 3.8.5 (default, Jan 27 2021, 15:41:15)
[GCC 9.3.0] on linux
It works on my machine if I use time.sleep(0.001)
, but fails with time.sleep(0.01)
(or higher values)
To be honest, I have not understood test_signal_shutdown_in_task
up to now.
I guess you want the task to be able to catch the SIGTERM and be able to do something, before the task stops to work.
Is this catching of WorkerShutdown
, then doing some clean-up before finishing a documented behavior? (I like this behavior)
If I run all tests, everything is fine:
(redis-tasks) guettli@yoga15:~/projects/redis-tasks$ pytest
=========================================== test session starts ============================================
platform linux -- Python 3.8.5, pytest-6.2.1, py-1.10.0, pluggy-0.13.1
rootdir: /home/guettli/projects/redis-tasks, configfile: tox.ini, testpaths: tests
plugins: mock-3.3.1, cov-2.9.0
collected 93 items
tests/test_cli.py ... [ 3%]
tests/test_conf.py .... [ 7%]
tests/test_maintenance.py s [ 8%]
tests/test_queue.py ............s [ 22%]
tests/test_registries.py ..... [ 27%]
tests/test_scheduler.py ..........sss... [ 45%]
tests/test_smear_dst.py ... [ 48%]
tests/test_task.py .......... [ 59%]
tests/test_task_execution.py ........... [ 70%]
tests/test_utils.py .... [ 75%]
tests/test_worker.py ...... [ 81%]
tests/test_workerprocess.py .......... [ 92%]
tests/test_workhorse.py .... [ 96%]
tests/contrib/test_graph.py ... [100%]
====================================== 88 passed, 5 skipped in 1.06s =======================================
But if I run one particular test, it fails:
(redis-tasks) guettli@yoga15:~/projects/redis-tasks$ pytest -k test_signal_shutdown_in_task
============================================================================== test session starts ===============================================================================
platform linux -- Python 3.8.5, pytest-6.2.1, py-1.10.0, pluggy-0.13.1
rootdir: /home/guettli/projects/redis-tasks, configfile: tox.ini, testpaths: tests
plugins: mock-3.3.1, cov-2.9.0
collected 93 items / 92 deselected / 1 selected
tests/test_workerprocess.py E [100%]
===================================================================================== ERRORS =====================================================================================
_________________________________________________________________ ERROR at setup of test_signal_shutdown_in_task _________________________________________________________________
tmpdir = local('/tmp/pytest-of-guettli/pytest-5/test_signal_shutdown_in_task0')
@pytest.fixture()
def suprocess_socket(tmpdir):
socket_file = str(tmpdir.join('socket'))
os.environ['RT_TEST_SOCKET'] = socket_file
> with multiprocessing.connection.Listener(socket_file) as listener:
E AttributeError: module 'multiprocessing' has no attribute 'connection'
tests/test_workerprocess.py:293: AttributeError
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.