Comments (2)
@avico78 glad you are enjoying the project so far; I wish I could update it more often as well.
To accomplish what you are describing, something like the following should work:
import asyncio

from fastapi import FastAPI
from easyjobs.workers.worker import EasyJobsWorker

server = FastAPI()

@server.on_event('startup')
async def setup():
    worker = await EasyJobsWorker.create(
        server,
        server_secret='abcd1234',
        manager_host='0.0.0.0',
        manager_port=8222,
        manager_secret='abcd1234',
        jobs_queue='ETL',
        max_tasks_per_worker=5
    )

    @worker.task(run_after=['task4'])
    async def task1():
        print("task1 - starting")
        await asyncio.sleep(5)
        print("task1 - finished")
        return "task1!"

    @worker.task()
    async def task2():
        print("task2 - starting")
        await asyncio.sleep(5)
        print("task2 - finished")
        return "task2!"

    @worker.task(run_before=['task2'])
    async def task3():
        print("task3 - starting")
        await asyncio.sleep(5)
        print("task3 - finished")
        return "task3!"

    @worker.task(run_before=['task3'])
    async def task4():
        print("task4 - starting")
        await asyncio.sleep(5)
        print("task4 - finished")
        return "task4"
run_after
- the named task is triggered once the current task completes.
run_before
- the named tasks (and their dependencies) are run before the current task completes.
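Both options, in other words, just add ordering edges to one dependency graph, and a valid run is any topological order of that graph. A minimal plain-Python sketch of the idea (an illustration of the concept only, not easyjobs internals):

```python
from collections import defaultdict, deque

def execution_order(before_edges):
    """before_edges: list of (earlier, later) pairs; returns one valid order."""
    later_of = defaultdict(list)
    indegree = defaultdict(int)
    nodes = set()
    for earlier, later in before_edges:
        later_of[earlier].append(later)
        indegree[later] += 1
        nodes.update((earlier, later))
    # Kahn's algorithm: repeatedly emit tasks with no unmet prerequisites.
    queue = deque(sorted(n for n in nodes if indegree[n] == 0))
    order = []
    while queue:
        n = queue.popleft()
        order.append(n)
        for m in later_of[n]:
            indegree[m] -= 1
            if indegree[m] == 0:
                queue.append(m)
    return order

# Edges implied by the decorators in the example above:
#   task1 run_after=['task4']  -> task1 before task4
#   task4 run_before=['task3'] -> task3 before task4
#   task3 run_before=['task2'] -> task2 before task3
edges = [('task1', 'task4'), ('task3', 'task4'), ('task2', 'task3')]
print(execution_order(edges))  # ['task1', 'task2', 'task3', 'task4']
```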
from easyjobs.
@codemation - thanks so much for your answer.
Do run_after and run_before both provide the same functionality for triggering dependencies / enforcing correct task order,
where run_after triggers "from the current task onward" and run_before triggers "all tasks up to the current one"?
for "run_after":
I tried triggering the tasks flow as you advised - it is not working ,
it seems each task starting the second one required at least one parameter inside it
and also return statement .
Not sure what is required and why but from some reason - it only work this way ,
Maybe there's reason for that as assumption each task must required some *args/**kwrags to preform tasks pipeline ...
If you could explain more how it works and what is required .
Working example for task1 --> task2:

import sys

@worker.task(run_after=['task2'], default_args=default_args)
async def task1(*args, **kwargs):  # actually works even without them for the first task
    func_name = sys._getframe().f_code.co_name
    print(f"{func_name} started")
    await asyncio.sleep(1)
    print(f"{func_name} finished")
    return {'data': None}

@worker.task()
async def task2(*args, **kwargs):
    func_name = sys._getframe().f_code.co_name
    print(f"{func_name} started")
    await asyncio.sleep(1)
    print(f"{func_name} finished")
    return {'data': None}
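One plausible explanation for the *args/**kwargs requirement (an assumption, not confirmed from the easyjobs source): the worker replays the stored job payload as positional and keyword arguments when it invokes a task, so a task defined with a bare signature raises a TypeError as soon as any payload is forwarded. A minimal sketch of that behavior:

```python
import asyncio

# Hypothetical dispatcher: forwards a stored job payload to the task,
# the way a job queue plausibly replays args/kwargs from the job record.
async def dispatch(task, payload):
    return await task(*payload.get('args', []), **payload.get('kwargs', {}))

async def strict():                 # no *args/**kwargs
    return 'ok'

async def flexible(*args, **kwargs):
    return 'ok'

async def main():
    payload = {'args': [], 'kwargs': {'data': {'test': 'data'}}}
    print(await dispatch(flexible, payload))   # prints: ok
    try:
        await dispatch(strict, payload)
    except TypeError as e:
        # strict() cannot accept the 'data' keyword from the payload
        print('strict failed:', e)

asyncio.run(main())
```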
for "run_before":
I tried below tasks setup:
task2 depends on task1
task1 depends on task0
Here I couldn't understand if it must require some parameter(ars/kwargs) as it doesn't work properly
for tasks which has more than one level of dependencies , meaning:
task2--> task1--> task0
where for one level it does work for:
task2 --> task1 && task0
see code for 2 levels(not working):
@worker.task()
async def task0(*args, **kwargs):
    func_name = sys._getframe().f_code.co_name
    print(f"{func_name} started")
    await asyncio.sleep(1)
    print(f"{func_name} finished")
    return {'data': None}

@worker.task(run_before=['task0'])
async def task1(*args, **kwargs):
    func_name = sys._getframe().f_code.co_name
    print(f"{func_name} started")
    await asyncio.sleep(1)
    print(f"{func_name} finished")
    return {'data': None}

@worker.task(run_before=['task1'], default_args=default_args)
async def task2(*args, **kwargs):
    func_name = sys._getframe().f_code.co_name
    print(f"{func_name} started")
    await asyncio.sleep(1)
    print(f"{func_name} finished")
    return {'data': None}

@worker.task()
async def pipeline():
    print("pipeline started")
    result = await task2(data={'test': 'data'})
    print(f"pipeline - result is {result} - finished")
    return result
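A possible explanation for the difference between the one-level and two-level setups (an assumption, since I have not read the scheduler source): run_before may only schedule the tasks named directly, without expanding those tasks' own run_before declarations. The chained case would then need a transitive expansion like this sketch:

```python
# Hypothetical: if run_before names are scheduled only one level deep,
# a chain task2 -> task1 -> task0 would miss task0. Expanding the
# declarations transitively yields the full set that must run first.
run_before = {
    'task0': [],
    'task1': ['task0'],
    'task2': ['task1'],
}

def expand(task, graph, seen=None):
    """All tasks that must run before `task`, direct and indirect."""
    seen = set() if seen is None else seen
    for dep in graph.get(task, []):
        if dep not in seen:
            seen.add(dep)
            expand(dep, graph, seen)
    return seen

print(sorted(expand('task2', run_before)))  # ['task0', 'task1']
```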
Running run_before with one level of dependencies:
task2 --> task1 && task0
Code:
@worker.task()
async def task0(*args, **kwargs):
    func_name = sys._getframe().f_code.co_name
    print(f"{func_name} started")
    await asyncio.sleep(1)
    print(f"{func_name} finished")
    return {'data': None}

@worker.task()
async def task1(*args, **kwargs):
    func_name = sys._getframe().f_code.co_name
    print(f"{func_name} started")
    await asyncio.sleep(1)
    print(f"{func_name} finished")
    return {'data': None}

@worker.task(run_before=['task1', 'task0'], default_args=default_args)
async def task2(*args, **kwargs):
    func_name = sys._getframe().f_code.co_name
    print(f"{func_name} started")
    await asyncio.sleep(1)
    print(f"{func_name} finished")
    return {'data': None}

@worker.task()
async def pipeline():
    print("pipeline started")
    result = await task2(data={'test': 'data'})
    print(f"pipeline - result is {result} - finished")
    return result
This works 1-3 times and then it fails:
05-02 16:40 EasyRpc-server /ws/jobs WARNING WORKER_MONITOR: working 3 / 5
pipline started
05-02 16:40 EasyRpc-server /ws/jobs WARNING worker 44090c30_ca1d_11ec_9ae2_8d908ace95c7 - pulled job {'job_id': '766817c0-ca1d-11ec-9ae2-8d908ace95c7', 'namespace': 'ETL', 'node_id': '44090c30_ca1d_11ec_9ae2_8d908ace95c7-REQ-5ea28ca6-ca1d-11ec-9ae2-8d908ace95c7', 'status': 'reserved', 'name': 'task2', 'args': {'args': []}, 'kwargs': {'data': {'test': 'data'}}, 'retry_policy': 'retry_once', 'on_failure': None, 'run_before': {'run_before': ['task1', 'task0']}, 'run_after': {'run_after': []}, 'last_update': '2022-05-02T16:40:43'}
05-02 16:40 EasyRpc-server /ws/jobs WARNING WORKER_MONITOR: working 4 / 5
task1 started
05-02 16:40 EasyRpc-server /ws/jobs WARNING worker 44090c30_ca1d_11ec_9ae2_8d908ace95c7 - pulled job {'job_id': '76801348-ca1d-11ec-9ae2-8d908ace95c7', 'namespace': 'ETL', 'node_id': '44090c30_ca1d_11ec_9ae2_8d908ace95c7-REQ-5f20d41c-ca1d-11ec-9ae2-8d908ace95c7', 'status': 'reserved', 'name': 'task0', 'args': {'args': []}, 'kwargs': {}, 'retry_policy': 'retry_once', 'on_failure': None, 'run_before': {'run_before': []}, 'run_after': {'run_after': []}, 'last_update': '2022-05-02T16:40:44'}
05-02 16:40 EasyRpc-server /ws/jobs WARNING WORKER_MONITOR: working 5 / 5
task0 started
task1 finished
task0 finished
05-02 16:40 EasyRpc-server /ws/jobs WARNING WORKER_MONITOR: working 4 / 5
pipline started
05-02 16:41 EasyRpc-server /ws/jobs WARNING worker 44090c30_ca1d_11ec_9ae2_8d908ace95c7 - pulled job {'job_id': '807ad806-ca1d-11ec-9ae2-8d908ace95c7', 'namespace': 'ETL', 'node_id': '44090c30_ca1d_11ec_9ae2_8d908ace95c7-REQ-779a17f6-ca1d-11ec-9ae2-8d908ace95c7', 'status': 'reserved', 'name': 'task2', 'args': {'args': []}, 'kwargs': {'data': {'test': 'data'}}, 'retry_policy': 'retry_once', 'on_failure': None, 'run_before': {'run_before': ['task1', 'task0']}, 'run_after': {'run_after': []}, 'last_update': '2022-05-02T16:41:00'}
05-02 16:41 EasyRpc-server /ws/jobs WARNING WORKER_MONITOR: working 5 / 5
05-02 16:41 EasyRpc-server /ws/jobs ERROR error with ws_sender
Traceback (most recent call last):
File "/testusers/env/test/nadavp/anaconda3/lib/python3.8/site-packages/easyrpc/proxy.py", line 320, in ws_sender
raise last_exception
easyrpc.exceptions.ServerConnectionError: (ServerConnectionError(...), 'Proxy -> Server connection error: server 0.0.0.0 - port: 8222')
05-02 16:41 EasyRpc-server /ws/jobs WARNING started connection to server 0.0.0.0:8222
05-02 16:41 asyncio ERROR Task was destroyed but it is pending!
task: <Task pending name='Task-32' coro=<EasyRpcProxy.get_upstream_registered_functions() done, defined at /testusers/env/test/nadavp/anaconda3/lib/python3.8/site-packages/easyrpc/proxy.py:224> wait_for=<Future cancelled>>
Also, sometimes the error below shows up - I couldn't understand why:
Traceback (most recent call last):
File "/testusers/env/test/nadavp/anaconda3/lib/python3.8/site-packages/easyrpc/proxy.py", line 320, in ws_sender
raise last_exception
easyrpc.exceptions.ServerConnectionError: (ServerConnectionError(...), 'Proxy -> Server connection error: server 0.0.0.0 - port: 8222')
Suggestions to add:
1. Add an endpoint for getting the task workflow tree (even a JSON view) -
this would help to see the dependencies visually.
2. For reloading changes I'm running the APIs as:
# manager:
python -m uvicorn --host 0.0.0.0 --port 8222 job_manager:server --reload
# worker:
python -m uvicorn --host 0.0.0.0 --port 8221 job_worker:server --workers=1 --reload
but reloading all the changes seems very slow;
maybe adding a reset & reload option would really help - or can you suggest any alternative?
3. It could be really interesting if this were a more generic solution for orchestrating tasks,
meaning a task could be configured dynamically, both its run_before/run_after and its functionality,
so the user could build up an ETL flow more dynamically with better usability.
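Suggestion 1 could be prototyped outside the library: keep a registry of each task's declared dependencies and render it as nested JSON. A hypothetical sketch (the WORKFLOW registry and its shape are assumptions, not an existing easyjobs structure):

```python
import json

# Hypothetical registry of declared dependencies; names mirror the
# example above and are NOT part of the easyjobs API.
WORKFLOW = {
    'task0': {'run_before': []},
    'task1': {'run_before': ['task0']},
    'task2': {'run_before': ['task1']},
}

def tree(task, registry):
    """Render a task's dependency tree as nested dicts (JSON-ready)."""
    return {
        'task': task,
        'run_before': [tree(d, registry) for d in registry[task]['run_before']],
    }

print(json.dumps(tree('task2', WORKFLOW), indent=2))
```

A GET route on the existing FastAPI manager could simply return this structure to give the JSON view of the workflow.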
I really think you came up with a great idea and I hope you continue developing this.