
celery-pool-asyncio's People

Contributors

alanjds, kai3341, pyup-bot, sbneto


celery-pool-asyncio's Issues

Celery 5 Compatibility

Hello,

I am noticing multiple errors when using this plugin with Celery 5. Are there any plans to support it in the future?

Celery group tasks

  • Celery Executor version: 4.4.6
  • Python version: 3.8
  • Operating System: Ubuntu 19.10

Description

Hi, I would like to ask how Celery task groups work here, and whether they work at all with this pool. I tried to use groups, but unfortunately nothing worked. Sample code and errors are attached.

What I Did

tasks = await group(tasks).delay()
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/celery_pool_asyncio/tracer.py", line 197, in trace_task
R = retval = coro_task.result()
File "/back/core/tasks/tasks.py", line 40, in tasks
tasks = await group(tasks).delay()
TypeError: object GroupResult can't be used in 'await' expression
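For what it's worth, `group(...).delay()` returns a `GroupResult`, which is a plain result handle rather than an awaitable, so the `await` fails as shown. A pure-asyncio sketch of the pattern the task seems to want (fan out subtasks and wait for all of them) looks like this; `subtask` here is a hypothetical coroutine standing in for the Celery task, not part of the library:

```python
import asyncio


async def subtask(n):
    # Hypothetical stand-in for an async Celery task body.
    await asyncio.sleep(0)
    return n * 2


async def run_group(values):
    # Fan out the subtasks and await all of them, which is what a
    # Celery group is usually expected to do.
    return await asyncio.gather(*(subtask(v) for v in values))


print(asyncio.run(run_group([1, 2, 3])))  # → [2, 4, 6]
```

With the real library, the `GroupResult` would still have to be consumed through its own (non-await) API rather than awaited directly.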

How to get a celery task result?

  • Celery Executor version: celery[redis]==4.3.1
  • Python version: 3.7
  • Operating System: macOS Mojave

Description

I want to use this library with Sanic for asyncio.

I used the Redis result backend like this:

from celery import Celery

app = Celery("task", broker="amqp://guest:guest@localhost", backend="redis://localhost:6379/0")

My celery task.py:

import asyncio


@app.task(name="test")
async def my_task():
    print("HERE")
    await asyncio.sleep(5)
    print("HERE")

    return "HERE"


async def main():
    async_result = await my_task.delay()
    return await async_result.get()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    print(loop.run_until_complete(main()))

But it raises a TypeError like this:

Traceback (most recent call last):
  File "<input>", line 1, in <module>
  File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_bundle/pydev_umd.py", line 197, in runfile
    pydev_imports.execfile(filename, global_vars, local_vars)  # execute the script
  File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "/Users/<project>/core/task.py", line 30, in <module>
    print(loop.run_until_complete(main()))
  File "/Users/gim-uichan/.pyenv/versions/3.7.1/lib/python3.7/asyncio/base_events.py", line 573, in run_until_complete
    return future.result()
  File "/Users/<project>/core/task.py", line 25, in main
    return await async_result.get()
  File "/Users/<project>/worker-venv/lib/python3.7/site-packages/celery_pool_asyncio/asynchronous.py", line 33, in wait_for_pending
    async for _ in self._wait_for_pending(result, **kwargs):
  File "/Users/<project>/worker-venv/lib/python3.7/site-packages/celery_pool_asyncio/asynchronous.py", line 17, in _wait_for_pending
    on_interval=on_interval):
  File "/Users/<project>/worker-venv/lib/python3.7/site-packages/celery_pool_asyncio/asynchronous.py", line 42, in drain_events_until
    on_interval=on_interval,
  File "/Users/<project>/worker-venv/lib/python3.7/site-packages/celery_pool_asyncio/drainer.py", line 23, in drain_events_until
    yield await self.wait_for(p, wait, timeout=1)
  File "/Users/<project>/worker-venv/lib/python3.7/site-packages/celery_pool_asyncio/drainer.py", line 32, in wait_for
    async for _ in wait(timeout=timeout):
TypeError: 'async for' requires an object with __aiter__ method, got NoneType

Can I get a celery task result using this library?

If I don't configure a celery result backend, it raises a NotImplementedError like this:

NotImplementedError: No result backend is configured.
Please see the documentation for more information.

I read your example, but the example does not work either.

What I Did

Please tell me how to get a celery task result.

Running a group signature

  • Celery Executor version: celery 4.3.0
  • Python version: 3.8
  • Operating System: Ubuntu 20.04

Description

I'm trying to use group to execute a list of subtasks inside a celery task.

What I Did

I tried lots of variants in order to catch the result from the group. I'm not sure whether I am doing something wrong, but either way, group execution doesn't work. I saw in previous issues that this was fixed, but it's not working for me; I'm not sure whether I'm doing it incorrectly or it's not actually fixed.

tasks = [task.s(var1, var2) for var1 in vars1]
group = group(tasks)
group_result = addr_group().get()

and then getting:
TypeError: 'coroutine' object is not subscriptable

I tried using await, apply_async, delay, and combinations of them (await plus apply_async), and even iterated over the AsyncResult objects generated by the group, trying to fetch the result with .get(); none of them worked. Using await is not allowed, and every time I try to get a result from the GroupResult or a single AsyncResult I get the error reported above.

Auto retry on exception feature of celery not working.

  • Celery Executor version: 5.2.7
  • Python version: 3.11.2
  • Operating System: python:3.11-slim (debian)

Description

I'm trying to use the auto retry on exception feature of celery tasks. I've tried every way listed in the celery docs to get it to work, and it always fails.
Docs link

What I Did

import celery


class RetryTest(celery.Task):
    autoretry_for = (RuntimeError,)
    max_retries = 5
    retry_backoff = True
    retry_backoff_max = 60
    retry_jitter = True


@celery_app.task(name="task_retrytest", bind=True, base=RetryTest)
def task_retrytest(self: RetryTest):
    raise RuntimeError("test")
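As a point of comparison, Celery's autoretry is roughly a retry loop with exponential backoff wrapped around the task body. The following is a minimal stdlib-only sketch of those semantics (not the plugin's or Celery's actual code; all names here are illustrative):

```python
import random
import time


def retry_on(exc_types, max_retries=5, backoff_max=60, jitter=True):
    """Rough stand-in for autoretry_for/retry_backoff semantics."""
    def decorator(fn):
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except exc_types:
                    if attempt == max_retries:
                        raise  # retries exhausted; propagate the error
                    delay = min(2 ** attempt, backoff_max)
                    if jitter:
                        # Spread retries out to avoid thundering herds.
                        delay = random.uniform(0, delay)
                    time.sleep(delay)
        return wrapper
    return decorator
```

The bug report suggests the pool's tracer re-raises the original exception instead of honoring this retry path.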

Here is the trace from the worker. Sorry, I don't have a better way to extract this from the docker logs.

celeryworker  | [2023-04-05 13:17:05,970: INFO/SpawnProcess-148] Task task_retrytest[6a08dbdc-76b2-42d9-8b6f-847183ff0936] received
celeryworker  | [2023-04-05 13:17:05,988: WARNING/ForkPoolWorker-6] --- Logging error ---
celeryworker  | [2023-04-05 13:17:05,990: WARNING/ForkPoolWorker-6] Traceback (most recent call last):
celeryworker  | [2023-04-05 13:17:05,991: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery_aio_pool/tracer.py", line 237, in trace_task
celeryworker  |     R = retval = AsyncIOPool.run_in_pool(fun, *args, **kwargs)
celeryworker  |                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:05,991: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery_aio_pool/pool.py", line 222, in run_in_pool
celeryworker  |     return worker_pool.run(
celeryworker  |            ^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:05,991: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery_aio_pool/pool.py", line 203, in run
celeryworker  |     raise error
celeryworker  | [2023-04-05 13:17:05,991: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/asyncio/threads.py", line 25, in to_thread
celeryworker  |     return await loop.run_in_executor(None, func_call)
celeryworker  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:05,992: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/concurrent/futures/thread.py", line 58, in run
celeryworker  |     result = self.fn(*self.args, **self.kwargs)
celeryworker  |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:05,992: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/app/task.py", line 392, in __call__
celeryworker  |     return self.run(*args, **kwargs)
celeryworker  |            ^^^^^^^^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:05,993: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/app/autoretry.py", line 54, in run
celeryworker  |     ret = task.retry(exc=exc, **retry_kwargs)
celeryworker  |           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:05,993: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/app/task.py", line 701, in retry
celeryworker  |     raise_with_context(exc or Retry('Task can be retried', None))
celeryworker  | [2023-04-05 13:17:05,993: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/app/autoretry.py", line 34, in run
celeryworker  |     return task._orig_run(*args, **kwargs)
celeryworker  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:05,993: WARNING/ForkPoolWorker-6]   File "/app/hawkeye/tasks/amazon/ad/report/advertisedproduct.py", line 60, in task_retrytest
celeryworker  |     raise ADAPIRetriableException("test")
celeryworker  | [2023-04-05 13:17:05,994: WARNING/ForkPoolWorker-6] adapi_pydantic.base.exceptions.ADAPIRetriableException: test
celeryworker  | [2023-04-05 13:17:05,996: WARNING/ForkPoolWorker-6] 
celeryworker  | During handling of the above exception, another exception occurred:
celeryworker  | [2023-04-05 13:17:05,996: WARNING/ForkPoolWorker-6] Traceback (most recent call last):
celeryworker  | [2023-04-05 13:17:05,999: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/logging/__init__.py", line 1110, in emit
celeryworker  |     msg = self.format(record)
celeryworker  |           ^^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:05,999: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/logging/__init__.py", line 953, in format
celeryworker  |     return fmt.format(record)
celeryworker  |            ^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:06,000: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/utils/log.py", line 146, in format
celeryworker  |     msg = super().format(record)
celeryworker  |           ^^^^^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:06,000: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/logging/__init__.py", line 695, in format
celeryworker  |     record.exc_text = self.formatException(record.exc_info)
celeryworker  |                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:06,001: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/utils/log.py", line 142, in formatException
celeryworker  |     r = super().formatException(ei)
celeryworker  |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:06,001: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/logging/__init__.py", line 645, in formatException
celeryworker  |     traceback.print_exception(ei[0], ei[1], tb, None, sio)
celeryworker  | [2023-04-05 13:17:06,001: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/traceback.py", line 124, in print_exception
celeryworker  |     te = TracebackException(type(value), value, tb, limit=limit, compact=True)
celeryworker  |          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:06,001: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/traceback.py", line 690, in __init__
celeryworker  |     self.stack = StackSummary._extract_from_extended_frame_gen(
celeryworker  |                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:06,001: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/traceback.py", line 416, in _extract_from_extended_frame_gen
celeryworker  |     for f, (lineno, end_lineno, colno, end_colno) in frame_gen:
celeryworker  | [2023-04-05 13:17:06,002: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/traceback.py", line 353, in _walk_tb_with_full_positions
celeryworker  |     positions = _get_code_position(tb.tb_frame.f_code, tb.tb_lasti)
celeryworker  |                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:06,002: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/traceback.py", line 366, in _get_code_position
celeryworker  |     positions_gen = code.co_positions()
celeryworker  |                     ^^^^^^^^^^^^^^^^^
celeryworker  | [2023-04-05 13:17:06,002: WARNING/ForkPoolWorker-6] AttributeError: '_Code' object has no attribute 'co_positions'
celeryworker  | [2023-04-05 13:17:06,002: WARNING/ForkPoolWorker-6] Call stack:
celeryworker  | [2023-04-05 13:17:06,009: WARNING/ForkPoolWorker-6]   File "<string>", line 1, in <module>
celeryworker  | [2023-04-05 13:17:06,010: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/multiprocessing/spawn.py", line 120, in spawn_main
celeryworker  |     exitcode = _main(fd, parent_sentinel)
celeryworker  | [2023-04-05 13:17:06,010: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/multiprocessing/spawn.py", line 133, in _main
celeryworker  |     return self._bootstrap(parent_sentinel)
celeryworker  | [2023-04-05 13:17:06,010: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
celeryworker  |     self.run()
celeryworker  | [2023-04-05 13:17:06,010: WARNING/ForkPoolWorker-6]   File "/usr/local/lib/python3.11/multiprocessing/process.py", line 108, in run
celeryworker  |     self._target(*self._args, **self._kwargs)
celeryworker  | [2023-04-05 13:17:06,010: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/watchgod/cli.py", line 51, in run_function
celeryworker  |     func()
celeryworker  | [2023-04-05 13:17:06,010: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/__main__.py", line 15, in main
celeryworker  |     sys.exit(_main())
celeryworker  | [2023-04-05 13:17:06,010: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/bin/celery.py", line 217, in main
celeryworker  |     return celery(auto_envvar_prefix="CELERY")
celeryworker  | [2023-04-05 13:17:06,010: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/click/core.py", line 1130, in __call__
celeryworker  |     return self.main(*args, **kwargs)
celeryworker  | [2023-04-05 13:17:06,010: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/click/core.py", line 1055, in main
celeryworker  |     rv = self.invoke(ctx)
celeryworker  | [2023-04-05 13:17:06,011: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/click/core.py", line 1657, in invoke
celeryworker  |     return _process_result(sub_ctx.command.invoke(sub_ctx))
celeryworker  | [2023-04-05 13:17:06,011: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/click/core.py", line 1404, in invoke
celeryworker  |     return ctx.invoke(self.callback, **ctx.params)
celeryworker  | [2023-04-05 13:17:06,011: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/click/core.py", line 760, in invoke
celeryworker  |     return __callback(*args, **kwargs)
celeryworker  | [2023-04-05 13:17:06,011: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/click/decorators.py", line 26, in new_func
celeryworker  |     return f(get_current_context(), *args, **kwargs)
celeryworker  | [2023-04-05 13:17:06,011: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/bin/base.py", line 134, in caller
celeryworker  |     return f(ctx, *args, **kwargs)
celeryworker  | [2023-04-05 13:17:06,011: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/bin/worker.py", line 351, in worker
celeryworker  |     worker.start()
celeryworker  | [2023-04-05 13:17:06,011: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/worker/worker.py", line 203, in start
celeryworker  |     self.blueprint.start(self)
celeryworker  | [2023-04-05 13:17:06,012: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/bootsteps.py", line 116, in start
celeryworker  |     step.start(parent)
celeryworker  | [2023-04-05 13:17:06,012: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/bootsteps.py", line 365, in start
celeryworker  |     return self.obj.start()
celeryworker  | [2023-04-05 13:17:06,012: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/concurrency/base.py", line 129, in start
celeryworker  |     self.on_start()
celeryworker  | [2023-04-05 13:17:06,012: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/concurrency/prefork.py", line 109, in on_start
celeryworker  |     P = self._pool = Pool(processes=self.limit,
celeryworker  | [2023-04-05 13:17:06,012: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/concurrency/asynpool.py", line 463, in __init__
celeryworker  |     super().__init__(processes, *args, **kwargs)
celeryworker  | [2023-04-05 13:17:06,012: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/billiard/pool.py", line 1046, in __init__
celeryworker  |     self._create_worker_process(i)
celeryworker  | [2023-04-05 13:17:06,012: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/concurrency/asynpool.py", line 480, in _create_worker_process
celeryworker  |     return super()._create_worker_process(i)
celeryworker  | [2023-04-05 13:17:06,012: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/billiard/pool.py", line 1158, in _create_worker_process
celeryworker  |     w.start()
celeryworker  | [2023-04-05 13:17:06,013: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/billiard/process.py", line 124, in start
celeryworker  |     self._popen = self._Popen(self)
celeryworker  | [2023-04-05 13:17:06,013: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/billiard/context.py", line 333, in _Popen
celeryworker  |     return Popen(process_obj)
celeryworker  | [2023-04-05 13:17:06,013: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/billiard/popen_fork.py", line 24, in __init__
celeryworker  |     self._launch(process_obj)
celeryworker  | [2023-04-05 13:17:06,013: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/billiard/popen_fork.py", line 79, in _launch
celeryworker  |     code = process_obj._bootstrap()
celeryworker  | [2023-04-05 13:17:06,013: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/billiard/process.py", line 327, in _bootstrap
celeryworker  |     self.run()
celeryworker  | [2023-04-05 13:17:06,013: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/billiard/process.py", line 114, in run
celeryworker  |     self._target(*self._args, **self._kwargs)
celeryworker  | [2023-04-05 13:17:06,013: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/billiard/pool.py", line 292, in __call__
celeryworker  |     sys.exit(self.workloop(pid=pid))
celeryworker  | [2023-04-05 13:17:06,014: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/billiard/pool.py", line 362, in workloop
celeryworker  |     result = (True, prepare_result(fun(*args, **kwargs)))
celeryworker  | [2023-04-05 13:17:06,014: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/app/trace.py", line 649, in fast_trace_task
celeryworker  |     R, I, T, Rstr = tasks[task].__trace__(
celeryworker  | [2023-04-05 13:17:06,014: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery_aio_pool/tracer.py", line 254, in trace_task
celeryworker  |     I, R, state, retval = on_error(task_request, exc)
celeryworker  | [2023-04-05 13:17:06,014: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery_aio_pool/tracer.py", line 154, in on_error
celeryworker  |     R = I.handle_error_state(
celeryworker  | [2023-04-05 13:17:06,014: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/app/trace.py", line 178, in handle_error_state
celeryworker  |     return {
celeryworker  | [2023-04-05 13:17:06,014: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/app/trace.py", line 237, in handle_failure
celeryworker  |     self._log_error(task, req, einfo)
celeryworker  | [2023-04-05 13:17:06,014: WARNING/ForkPoolWorker-6]   File "/opt/pysetup/.venv/lib/python3.11/site-packages/celery/app/trace.py", line 265, in _log_error
celeryworker  |     logger.log(policy.severity, policy.format.strip(), context,

Signal task_pre_run not working

Hello,

If I make this signal function:

from celery.signals import task_prerun


@task_prerun.connect
async def task_pre_run(task_id, task, args, **kwargs):
    print(task_id)

When I schedule a job, it hangs forever while running this handler, and I have to kill the celery process to restart.

celery.signals does not allow to run corofunctions

This code should work:

import asyncio
from celery_pool_asyncio import pool
from celery.signals import worker_init


@worker_init.connect
def startup_regular(sender, **kwargs):
    print('regular functions works')

@worker_init.connect
async def startup_async(sender, **kwargs):
    await asyncio.sleep(1)
    print('coroutine functions works too')
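Until coroutine receivers are supported, a common workaround is a small synchronous wrapper that drives the coroutine to completion itself. This is a sketch, not the library's API, and it assumes the signal fires in a thread with no event loop already running:

```python
import asyncio
import functools


def sync_receiver(async_fn):
    # Adapt a coroutine function so a synchronous signal dispatcher
    # (which simply calls its receivers) can invoke it.
    @functools.wraps(async_fn)
    def wrapper(*args, **kwargs):
        # Runs the coroutine on a fresh event loop and returns its result.
        return asyncio.run(async_fn(*args, **kwargs))
    return wrapper


@sync_receiver
async def startup_async(sender, **kwargs):
    await asyncio.sleep(0)
    return "coroutine ran"
```

If the signal can fire while the pool's own loop is running, the coroutine would instead have to be submitted to that loop (e.g. via `asyncio.run_coroutine_threadsafe`) rather than `asyncio.run`.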

Celery `redbeat` scheduler compatibility

  • Celery Executor version: 4.4.2
  • Python version: 3.8
  • Operating System: Ubuntu 18.0.4

Description

I get the exception below while starting celery beat with --scheduler celery_pool_asyncio:PersistentScheduler, and it does not send tasks to the worker.

Celery beat should start normally and send tasks to the workers.

What I Did

I executed the command below and got an exception:

celery beat -A my.app --scheduler celery_pool_asyncio:PersistentScheduler

erval -> 5.00 seconds (5s)
[2020-05-26 21:57:14,472: DEBUG/MainProcess] Setting default socket timeout to 30
[2020-05-26 21:57:14,473: INFO/MainProcess] beat: Starting...
[2020-05-26 21:57:14,498: DEBUG/MainProcess] Current schedule:

[2020-05-26 21:57:14,499: DEBUG/MainProcess] beat: Ticking with max interval->5.00 minutes
[2020-05-26 21:57:14,500: DEBUG/MainProcess] Using selector: EpollSelector
[2020-05-26 21:57:14,504: WARNING/MainProcess] Traceback (most recent call last):
[2020-05-26 21:57:14,504: WARNING/MainProcess] File "/home/jayant29/anaconda3/envs/trade_env/lib/python3.8/site-packages/celery_pool_asyncio/beat.py", line 23, in Service__async_start
    await self.async_run()
[2020-05-26 21:57:14,504: WARNING/MainProcess] File "/home/jayant29/anaconda3/envs/trade_env/lib/python3.8/site-packages/celery_pool_asyncio/beat.py", line 10, in Service__async_run
    interval = await self.scheduler.tick()
[2020-05-26 21:57:14,505: WARNING/MainProcess] TypeError: object int can't be used in 'await' expression

Sentry SDK compatibility bug

  • Celery Executor version: 4.4.5
  • Python version: 3.8.3
  • Operating System: in-Docker execution
  • celery-pool-asyncio 0.2.0
  • celery-decorator-taskcls 0.1.4
  • Sentry SDK: 0.14.4

Description

The async pool conflicts with Sentry SDK threading when trying to start Celery beat (with the --beat flag). I am not confident that this error is directly connected with the celery-pool-asyncio library, but the bug exists.

There are also some interesting cases:

  • the asyncio pool works correctly with the Sentry SDK without the beat command
  • the asyncio pool works correctly with the Sentry SDK with the beat command if the CeleryIntegration plugin is initialized

So there is only one case that causes the problem:

  • asyncio pool + celery beat + Sentry SDK without the CeleryIntegration plugin

What I Did

Celery command:
celery -A celery_app worker --beat --loglevel=INFO -P celery_pool_asyncio:TaskPool

Traceback:

[2020-06-08 14:37:08,085: INFO/MainProcess] Connected to amqp://**:**@rabbitmq:5672//
[2020-06-08 14:37:08,126: INFO/MainProcess] mingle: searching for neighbors
[2020-06-08 14:37:09,198: INFO/MainProcess] mingle: all alone
[2020-06-08 14:37:09,235: INFO/MainProcess] celery@3491f822d9e5 ready.
[2020-06-08 14:37:12,945: INFO/Beat] beat: Starting...
[2020-06-08 14:37:12,973: WARNING/Beat] Exception in thread
[2020-06-08 14:37:12,974: WARNING/Beat] Thread-1
[2020-06-08 14:37:12,975: WARNING/Beat] :
[2020-06-08 14:37:12,975: WARNING/Beat] Traceback (most recent call last):
[2020-06-08 14:37:12,977: WARNING/Beat] File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
[2020-06-08 14:37:12,978: WARNING/Beat] self.run()
[2020-06-08 14:37:12,978: WARNING/Beat] File "/usr/local/lib/python3.8/site-packages/sentry_sdk/integrations/threading.py", line 69, in run
[2020-06-08 14:37:12,981: WARNING/Beat] reraise(*_capture_exception())
[2020-06-08 14:37:12,981: WARNING/Beat] File "/usr/local/lib/python3.8/site-packages/sentry_sdk/_compat.py", line 57, in reraise
[2020-06-08 14:37:12,982: WARNING/Beat] raise value
[2020-06-08 14:37:12,983: WARNING/Beat] File "/usr/local/lib/python3.8/site-packages/sentry_sdk/integrations/threading.py", line 67, in run
[2020-06-08 14:37:12,984: WARNING/Beat] return old_run_func(self, *a, **kw)
[2020-06-08 14:37:12,984: WARNING/Beat] File "/usr/local/lib/python3.8/threading.py", line 870, in run
[2020-06-08 14:37:12,986: WARNING/Beat] self._target(*self._args, **self._kwargs)
[2020-06-08 14:37:12,987: WARNING/Beat] File "/usr/local/lib/python3.8/asyncio/base_events.py", line 570, in run_forever
[2020-06-08 14:37:12,988: WARNING/Beat] self._run_once()
[2020-06-08 14:37:12,988: WARNING/Beat] File "/usr/local/lib/python3.8/asyncio/base_events.py", line 1823, in _run_once
[2020-06-08 14:37:12,988: WARNING/Beat] event_list = self._selector.select(timeout)
[2020-06-08 14:37:12,990: WARNING/Beat] File "/usr/local/lib/python3.8/selectors.py", line 468, in select
[2020-06-08 14:37:12,991: WARNING/Beat] fd_event_list = self._selector.poll(timeout, max_ev)
[2020-06-08 14:37:12,992: WARNING/Beat] OSError
[2020-06-08 14:37:12,992: WARNING/Beat] :
[2020-06-08 14:37:12,992: WARNING/Beat] [Errno 9] Bad file descriptor

Does not log task result

  • Celery Executor version: 4.3.0 & 4.4.7
  • Python version: 3.8.5
  • Operating System: macOS

Description


What I Did

celery worker -A worker -l INFO -E -P celery_pool_asyncio:TaskPool
macOS-10.14.6-x86_64-i386-64bit 2020-09-03 14:42:40

[config]
.> app:         __main__:0x1046f0be0
.> transport:   redis://localhost:6379/0
.> results:     redis://localhost:6379/1
.> concurrency: 256 (executors)
.> task events: ON

[queues]
.> celery           exchange=celery(direct) key=celery


[tasks]
  . worker.tasks.test_task

[2020-09-03 14:42:40,574: INFO/MainProcess] Connected to redis://localhost:6379/0
[2020-09-03 14:42:40,584: INFO/MainProcess] mingle: searching for neighbors
[2020-09-03 14:42:41,604: INFO/MainProcess] mingle: all alone
[2020-09-03 14:42:41,612: INFO/MainProcess] [email protected] ready.
[2020-09-03 14:42:42,956: INFO/MainProcess] Received task: worker.tasks.test_task[96f78697-85fd-4c4f-81d2-f6634c5dcac1]

The task was received, but the task result is not logged.

Expected

[2020-09-03 14:48:46,271: INFO/MainProcess] Received task: worker.tasks.test_task[4a1c1cf0-da45-4ca3-baa8-743b1270aeaf]
[2020-09-03 14:48:46,278: INFO/ForkPoolWorker-4] Task worker.tasks.test_task[4a1c1cf0-da45-4ca3-baa8-743b1270aeaf] succeeded in 0.005072645000000264s: 'test task return test'

Fails with Python 3.6

  • Celery Executor version:
  • Python version: 3.6.11
  • Operating System: mac

Description

It fails with the error below.

Traceback (most recent call last):
  File "/Users/durai/work/venv/harvester/lib/python3.6/site-packages/celery_pool_asyncio/tracer.py", line 184, in trace_task
    coro_task = asyncio.create_task(coro)
AttributeError: module 'asyncio' has no attribute 'create_task'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/durai/work/venv/harvester/lib/python3.6/site-packages/celery_pool_asyncio/tracer.py", line 246, in trace_task
    await coro_utils.await_anyway(waiter_task)
UnboundLocalError: local variable 'waiter_task' referenced before assignment

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/durai/work/venv/harvester/lib/python3.6/site-packages/celery_pool_asyncio/tracer.py", line 324, in trace_task
    await coro_utils.await_anyway(coro_task)
UnboundLocalError: local variable 'coro_task' referenced before assignment
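The root cause is the first error: `asyncio.create_task()` only appeared in Python 3.7, and the later `UnboundLocalError`s are fallout from that. On 3.6 the equivalent spelling is `asyncio.ensure_future()`, so a version-tolerant helper (a sketch, not the library's fix) could look like:

```python
import asyncio


def schedule(coro):
    # asyncio.create_task() appeared in Python 3.7; on 3.6 the
    # equivalent call is asyncio.ensure_future(), which schedules
    # the coroutine on the current event loop the same way.
    if hasattr(asyncio, "create_task"):
        return asyncio.create_task(coro)
    return asyncio.ensure_future(coro)


async def sample():
    return 42


async def main():
    # schedule() must be called while the loop is running,
    # exactly like asyncio.create_task().
    return await schedule(sample())
```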

django async views compatibility

there is a report in celery that Django eventlet patch is broken in celery. I would like to explore django 3.1+ async features ompatibilites.

Long running tasks blocking Kombu heartbeats

I used the latest commit of celery-pool-asyncio and [email protected] (latest supported).

It looks like long-running tasks block Kombu: it does not send AMQP heartbeats, so RabbitMQ kills the connection.

Runtime logs are full of:

ConnectionResetError: [Errno 54] Connection reset by peer
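One mitigation, assuming the task body is under your control, is to yield to the event loop periodically so the connection's heartbeat coroutine gets a chance to run. A pure-asyncio sketch (all names here are illustrative):

```python
import asyncio


def heavy_sync_step(item):
    pass  # placeholder for the real CPU-bound work


async def long_running_job(items):
    processed = 0
    for item in items:
        heavy_sync_step(item)
        processed += 1
        if processed % 100 == 0:
            # Yield control so other coroutines (for example the
            # AMQP heartbeat) can run between chunks of work.
            await asyncio.sleep(0)
    return processed
```

This only helps when the task is a loop that can be chunked; a single long blocking call would still starve the loop and would need to be moved to a thread or process executor instead.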

RuntimeError: await wasn't used with future

  • Celery Executor version: 4.4.7
  • Python version: 3.8
  • Operating System: Ubuntu 18.04

Description

I use this library and sometimes this error appears; I can't figure it out on my own (although I follow all the recommendations from the README). I would really like to solve this problem.

What I Did

I just ran a task with delay and ...

RuntimeError: await wasn't used with future
  File "celery_pool_asyncio/tracer.py", line 199, in trace_task
    R = retval = await coro_utils.send_exception(
  File "celery_pool_asyncio/coro_utils.py", line 19, in send_exception
    await coro.throw(exception)
