
Comments (2)

codemation commented on June 10, 2024

@avico78 glad that you are enjoying the project so far, I wish I could update it more as well.

Accomplishing what you are describing should be doable via the following:

import asyncio

# assuming the import paths from the easyjobs README
from fastapi import FastAPI
from easyjobs.workers.worker import EasyJobsWorker

server = FastAPI()

@server.on_event('startup')
async def setup():
    worker = await EasyJobsWorker.create(
        server,
        server_secret='abcd1234',
        manager_host='0.0.0.0',
        manager_port=8222,
        manager_secret='abcd1234',
        jobs_queue='ETL',
        max_tasks_per_worker=5
    )

    @worker.task(run_after=['task4'])
    async def task1():
        print("task1 - starting")
        await asyncio.sleep(5)
        print("task1 - finished")
        return "task1!"

    @worker.task()
    async def task2():
        print("task2 - starting")
        await asyncio.sleep(5)
        print("task2 - finished")
        return "task2!"

    @worker.task(run_before=['task2'])
    async def task3():
        print("task3 - starting")
        await asyncio.sleep(5)
        print("task3 - finished")
        return "task3!"

    @worker.task(run_before=['task3'])
    async def task4():
        print("task4 - starting")
        await asyncio.sleep(5)
        print("task4 - finished")
        return "task4!"

run_after - the named task is triggered once the current task completes.
run_before - the named tasks (and their own dependencies) are run before the current task completes.
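As a toy illustration of the run_before semantics described above (this is only a sketch of the ordering, not the actual EasyJobs scheduler; `resolve_run_before` is a made-up helper), the dependencies can be thought of as resolved depth-first:

```python
# Toy model of run_before ordering -- NOT the EasyJobs implementation,
# just a sketch of the semantics: every run_before dependency (and its
# own dependencies) runs before the task itself.

def resolve_run_before(task, deps, order=None):
    """Return the execution order for `task` given a run_before map."""
    if order is None:
        order = []
    for dep in deps.get(task, []):
        resolve_run_before(dep, deps, order)
    if task not in order:
        order.append(task)
    return order

# task2 declares run_before=['task1'], task1 declares run_before=['task0']
deps = {'task2': ['task1'], 'task1': ['task0']}
print(resolve_run_before('task2', deps))  # ['task0', 'task1', 'task2']
```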

from easyjobs.

avico78 commented on June 10, 2024

@codemation - thanks so much for your answer,

Do both run_after and run_before provide the same functionality for triggering dependencies / the correct task order,
where run_after triggers "the current task onward" and run_before triggers "all tasks up to the current one"?

For "run_after":

I tried triggering the task flow as you advised, but it is not working.
It seems each task that starts the next one requires at least one parameter
and also a return statement.
I'm not sure what is required or why, but for some reason it only works this way.
Maybe the reason is an assumption that each task must accept some *args/**kwargs to take part in a task pipeline.
Could you explain more about how this works and what is required?
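If the manager stores each job with its (possibly empty) args/kwargs and the worker always invokes the task as `func(*args, **kwargs)`, that would explain the requirement. The following is only a guess at the mechanism, with made-up names (`run_job`, `no_params`, `flexible`), not the easyjobs internals:

```python
import asyncio

# Hypothetical sketch of why a queued task may need *args/**kwargs:
# the job is stored with its args/kwargs, and the worker always calls
# the task function with them, so a task declared without parameters
# fails as soon as any kwargs are passed along.

async def run_job(func, job):
    return await func(*job['args'], **job['kwargs'])

async def no_params():  # declared without parameters
    return 'ok'

async def flexible(*args, **kwargs):
    return 'ok'

job = {'args': [], 'kwargs': {'data': {'test': 'data'}}}

async def main():
    try:
        await run_job(no_params, job)
    except TypeError as e:
        print('no_params failed:', e)
    print(await run_job(flexible, job))  # prints: ok

asyncio.run(main())
```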

A working example for task1 --> task2:

    @worker.task(run_after=['task2'], default_args=default_args)
    async def task1(*args, **kwargs):  # actually works even without them for the first task
        func_name = sys._getframe().f_code.co_name
        print(f"{func_name} started")
        await asyncio.sleep(1)
        print(f"{func_name} finished")
        return {'data': None}

    @worker.task()
    async def task2(*args,**kwargs):
        func_name = sys._getframe().f_code.co_name
        print(f"{func_name} started")
        await asyncio.sleep(1)
        print(f"{func_name} finished")
        return {'data': None}

For "run_before":
I tried the task setup below:
task2 depends on task1
task1 depends on task0

Here I couldn't figure out whether some parameter (args/kwargs) is required, because it doesn't work properly
for tasks that have more than one level of dependencies, meaning:

task2 --> task1 --> task0

whereas for one level it does work:

task2 --> task1 && task0

See the code for two levels (not working):

    @worker.task()
    async def task0(*args,**kwargs):
        func_name = sys._getframe().f_code.co_name
        print(f"{func_name} started")
        await asyncio.sleep(1)
        print(f"{func_name} finished")
        return {'data': None}
        
    @worker.task(run_before=['task0'])
    async def task1(*args,**kwargs):
        func_name = sys._getframe().f_code.co_name
        print(f"{func_name} started")
        await asyncio.sleep(1)
        print(f"{func_name} finished")
        return {'data': None}

    @worker.task(run_before=['task1'], default_args=default_args)
    async def task2(*args,**kwargs):
        func_name = sys._getframe().f_code.co_name
        print(f"{func_name} started")
        await asyncio.sleep(1)
        print(f"{func_name} finished")
        return {'data': None}

    @worker.task()
    async def pipeline():
        print("pipeline started")
        result = await task2(data={'test': 'data'})
        print(f"pipeline - result is {result} - finished")
        return result

While running run_before with one level of dependencies:

task2 -> task1 && task0

Code:

    @worker.task()
    async def task0(*args,**kwargs):
        func_name = sys._getframe().f_code.co_name
        print(f"{func_name} started")
        await asyncio.sleep(1)
        print(f"{func_name} finished")
        return {'data': None}
        
    @worker.task()
    async def task1(*args,**kwargs):
        func_name = sys._getframe().f_code.co_name
        print(f"{func_name} started")
        await asyncio.sleep(1)
        print(f"{func_name} finished")
        return {'data': None}

    @worker.task(run_before=['task1', 'task0'], default_args=default_args)
    async def task2(*args,**kwargs):
        func_name = sys._getframe().f_code.co_name
        print(f"{func_name} started")
        await asyncio.sleep(1)
        print(f"{func_name} finished")
        return {'data': None}

    @worker.task()
    async def pipeline():
        print("pipeline started")
        result = await task2(data={'test': 'data'})
        print(f"pipeline - result is {result} - finished")
        return result

It works one to three times and then fails:

05-02 16:40 EasyRpc-server /ws/jobs WARNING  WORKER_MONITOR: working 3 / 5
pipline started
05-02 16:40 EasyRpc-server /ws/jobs WARNING  worker 44090c30_ca1d_11ec_9ae2_8d908ace95c7 - pulled job {'job_id': '766817c0-ca1d-11ec-9ae2-8d908ace95c7', 'namespace': 'ETL', 'node_id': '44090c30_ca1d_11ec_9ae2_8d908ace95c7-REQ-5ea28ca6-ca1d-11ec-9ae2-8d908ace95c7', 'status': 'reserved', 'name': 'task2', 'args': {'args': []}, 'kwargs': {'data': {'test': 'data'}}, 'retry_policy': 'retry_once', 'on_failure': None, 'run_before': {'run_before': ['task1', 'task0']}, 'run_after': {'run_after': []}, 'last_update': '2022-05-02T16:40:43'}
05-02 16:40 EasyRpc-server /ws/jobs WARNING  WORKER_MONITOR: working 4 / 5
task1 started
05-02 16:40 EasyRpc-server /ws/jobs WARNING  worker 44090c30_ca1d_11ec_9ae2_8d908ace95c7 - pulled job {'job_id': '76801348-ca1d-11ec-9ae2-8d908ace95c7', 'namespace': 'ETL', 'node_id': '44090c30_ca1d_11ec_9ae2_8d908ace95c7-REQ-5f20d41c-ca1d-11ec-9ae2-8d908ace95c7', 'status': 'reserved', 'name': 'task0', 'args': {'args': []}, 'kwargs': {}, 'retry_policy': 'retry_once', 'on_failure': None, 'run_before': {'run_before': []}, 'run_after': {'run_after': []}, 'last_update': '2022-05-02T16:40:44'}
05-02 16:40 EasyRpc-server /ws/jobs WARNING  WORKER_MONITOR: working 5 / 5
task0 started
task1 finished
task0 finished
05-02 16:40 EasyRpc-server /ws/jobs WARNING  WORKER_MONITOR: working 4 / 5
pipline started
05-02 16:41 EasyRpc-server /ws/jobs WARNING  worker 44090c30_ca1d_11ec_9ae2_8d908ace95c7 - pulled job {'job_id': '807ad806-ca1d-11ec-9ae2-8d908ace95c7', 'namespace': 'ETL', 'node_id': '44090c30_ca1d_11ec_9ae2_8d908ace95c7-REQ-779a17f6-ca1d-11ec-9ae2-8d908ace95c7', 'status': 'reserved', 'name': 'task2', 'args': {'args': []}, 'kwargs': {'data': {'test': 'data'}}, 'retry_policy': 'retry_once', 'on_failure': None, 'run_before': {'run_before': ['task1', 'task0']}, 'run_after': {'run_after': []}, 'last_update': '2022-05-02T16:41:00'}
05-02 16:41 EasyRpc-server /ws/jobs WARNING  WORKER_MONITOR: working 5 / 5
05-02 16:41 EasyRpc-server /ws/jobs ERROR    error with ws_sender
Traceback (most recent call last):
  File "/testusers/env/test/nadavp/anaconda3/lib/python3.8/site-packages/easyrpc/proxy.py", line 320, in ws_sender
    raise last_exception
easyrpc.exceptions.ServerConnectionError: (ServerConnectionError(...), 'Proxy -> Server connection error: server 0.0.0.0 - port: 8222')
05-02 16:41 EasyRpc-server /ws/jobs WARNING  started connection to server 0.0.0.0:8222
05-02 16:41 asyncio      ERROR    Task was destroyed but it is pending!
task: <Task pending name='Task-32' coro=<EasyRpcProxy.get_upstream_registered_functions() done, defined at /testusers/env/test/nadavp/anaconda3/lib/python3.8/site-packages/easyrpc/proxy.py:224> wait_for=<Future cancelled>>

Also, sometimes the error below appears; I couldn't understand why:

Traceback (most recent call last):
  File "/testusers/env/test/nadavp/anaconda3/lib/python3.8/site-packages/easyrpc/proxy.py", line 320, in ws_sender
    raise last_exception
easyrpc.exceptions.ServerConnectionError: (ServerConnectionError(...), 'Proxy -> Server connection error: server 0.0.0.0 - port: 8222')

Suggestions to add:

1. Add an endpoint for getting the task workflow tree (even a JSON view) -
this could help to visually see the dependencies.
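A minimal sketch of what such an endpoint could return (the `registry` dict and `workflow_tree` helper are hypothetical, standing in for whatever the manager actually tracks internally):

```python
import json

# Hypothetical sketch of suggestion 1: expose the registered
# run_before relationships as a JSON tree. The registry here is
# hard-coded for illustration; a real endpoint would read the
# manager's own task registry.

registry = {
    'task0': [],
    'task1': ['task0'],
    'task2': ['task1'],
}

def workflow_tree(task, registry):
    """Build a nested dict describing a task and its run_before deps."""
    return {
        'task': task,
        'run_before': [workflow_tree(dep, registry) for dep in registry[task]],
    }

print(json.dumps(workflow_tree('task2', registry), indent=2))
```

Returned from a GET endpoint on the manager, a tree like this would make the dependency chain visible at a glance.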

2. For reloading changes I'm running the APIs as:

# manager:
python -m uvicorn --host 0.0.0.0 --port 8222 job_manager:server --reload


# worker
python -m uvicorn --host 0.0.0.0 --port 8221 job_worker:server --workers=1 --reload

but it seems very slow to reload all changes;
maybe adding an option to reset & reload would really help - or can you suggest any other alternative?

3. It could be really interesting if this became a more generic solution for orchestrating tasks,
meaning a task could be configured dynamically for both run_before/run_after and also for its functionality,
so users could build up their ETL flows more dynamically with better usability.
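For example, a data-driven flow definition might look like the sketch below (`flow`, `make_task`, and `registry` are illustrative names only, not part of the easyjobs API; a real worker would still register each function via its own decorator):

```python
# Hypothetical sketch of suggestion 3: build the task flow from data
# instead of hard-coding decorators. Each step carries its own name
# and run_before list, so the flow can be assembled at runtime.

flow = [
    {'name': 'extract', 'run_before': []},
    {'name': 'transform', 'run_before': ['extract']},
    {'name': 'load', 'run_before': ['transform']},
]

def make_task(name):
    """Create a placeholder task function for the given step name."""
    def task(*args, **kwargs):
        return f'{name} done'
    task.__name__ = name
    return task

# Map each step name to its generated function and its dependencies.
registry = {step['name']: (make_task(step['name']), step['run_before'])
            for step in flow}

func, deps = registry['load']
print(func(), 'after', deps)  # load done after ['transform']
```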

I really think you came up with a great idea, and I hope you continue developing this.

