Code Monkey home page Code Monkey logo

Comments (12)

kai3341 avatar kai3341 commented on August 23, 2024 2

@codesutras i think i'll solve this problem

from celery-pool-asyncio.

codesutras avatar codesutras commented on August 23, 2024 1

@kai3341 I've rechecked my code and fond that celery_poop_asyncio is working as expected. and here is Working example of it.

However, What I have observed is that this code is not compatible to work with redbeat scheduler. For the obvious reasons, we should have the capabilities to edit, remove the task in runtime. As redbeat provides those capabilities.

By any chance, are you planning to work on such features, or we would need to get in touch with @sibson to update redbeat source code to make it compatible with the coroutine task?

Meanwhile, I just wanted to express my thanks for the great work you guys have done for the community.

from celery-pool-asyncio.

zikphil avatar zikphil commented on August 23, 2024 1

It might be a hack, but I got it to work dirty by creating this file and using this class as a scheduler:

from celery import beat
from redbeat import RedBeatScheduler
from redbeat.schedulers import get_redis, logger


class AsyncSchedulerMixin:
    async def apply_async(self, entry, producer=None, advance=True, **kwargs):
        # Update time-stamps and run counts before we actually execute,
        # so we have that done if an exception is raised (doesn't schedule
        # forever.)
        entry = self.reserve(entry) if advance else entry
        task = self.app.tasks.get(entry.task)

        try:
            entry_args = [
                v() if isinstance(v, beat.BeatLazyFunc) else v
                for v in (entry.args or [])
            ]
            entry_kwargs = {
                k: v() if isinstance(v, beat.BeatLazyFunc) else v
                for k, v in entry.kwargs.items()
            }

            if task:
                return await task.apply_async(
                    entry_args, entry_kwargs,
                    producer=producer,
                    **entry.options,
                )
            else:
                return await self.send_task(
                    entry.task, entry_args, entry_kwargs,
                    producer=producer,
                    **entry.options,
                )
        except Exception as exc:  # pylint: disable=broad-except
            msg = "Couldn't apply scheduled task {entry.name}: {exc}".format(
                entry=entry,
                exc=exc,
            )
            beat.reraise(
                beat.SchedulingError,
                beat.SchedulingError(msg),
                beat.sys.exc_info()[2]
            )
        finally:
            self._tasks_since_sync += 1
            if self.should_sync():
                self._do_sync()

    async def apply_entry(self, entry, producer=None):
        beat.info(
            'Scheduler: Sending due task %s (%s)',
            entry.name,
            entry.task,
        )

        try:
            coro = self.apply_async(
                entry=entry,
                producer=producer,
                advance=False,
            )
            result = await coro
        except Exception as exc:  # pylint: disable=broad-except
            beat.error(
                'Message Error: %s\n%s',
                exc,
                beat.traceback.format_stack(),
                exc_info=True,
            )
        else:
            beat.debug(
                '%s sent. id->%s',
                entry.task,
                result.id,
            )


class RedBeatAsyncScheduler(AsyncSchedulerMixin, RedBeatScheduler):

    async def maybe_due(self, entry, **kwargs):
        is_due, next_time_to_run = entry.is_due()

        if is_due:
            logger.info('Scheduler: Sending due task %s (%s)', entry.name, entry.task)
            try:
                result = await self.apply_async(entry, **kwargs)
            except Exception as exc:
                logger.exception('Message Error: %s', exc)
            else:
                logger.debug('%s sent. id->%s', entry.task, result.id)
        return next_time_to_run

    async def tick(self, min=min, **kwargs):
        if self.lock:
            logger.debug('beat: Extending lock...')
            get_redis(self.app).pexpire(self.lock_key, int(self.lock_timeout * 1000))

        remaining_times = []
        try:
            for entry in self.schedule.values():
                next_time_to_run = await self.maybe_due(entry, **self._maybe_due_kwargs)
                if next_time_to_run:
                    remaining_times.append(next_time_to_run)
        except RuntimeError:
            logger.debug('beat: RuntimeError', exc_info=True)

        return min(remaining_times + [self.max_interval])

from celery-pool-asyncio.

kai3341 avatar kai3341 commented on August 23, 2024

I can't reproduce this exception. Can you create minimal project with single void task, where this problem will be reproduced?
Are you sure you are running beat on python3.7? I see /lib/python3.8/site-packages/ in the path.
It's wierd thing, but somewhy self.scheduler is not celery_pool_asyncio:PersistentScheduler.
Try to rebuild your virtualenv -- maybe it's cheap solution.

from celery-pool-asyncio.

codesutras avatar codesutras commented on August 23, 2024

I corrected python's version for this problem as 3.8 in my issue description. I'll share small project with a single task soon. Thanks for your quick reply.

from celery-pool-asyncio.

codesutras avatar codesutras commented on August 23, 2024

@codesutras i think i'll solve this problem

Perfect... Thanks a lot for taking it up.

from celery-pool-asyncio.

auvipy avatar auvipy commented on August 23, 2024

this will be a great thing!

from celery-pool-asyncio.

kai3341 avatar kai3341 commented on August 23, 2024

@auvipy now I have some time for it -- i'm on vacation =)
But i'm looking (working on) to issue #22 and I understand how many things require an attention before this pool will be ready to merge into celery upstream =(

from celery-pool-asyncio.

auvipy avatar auvipy commented on August 23, 2024

ohho

from celery-pool-asyncio.

zikphil avatar zikphil commented on August 23, 2024

@kai3341 Can I offer a bounty for this?

from celery-pool-asyncio.

kai3341 avatar kai3341 commented on August 23, 2024

@zikphil great job! Almost ok -- redis IO-bound operations most be wrapped to async/await via asgiref.sync_to_async
I hope i will be available next week

from celery-pool-asyncio.

kai3341 avatar kai3341 commented on August 23, 2024

@zikphil @codesutras please check #28

from celery-pool-asyncio.

Related Issues (16)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.