apache / dolphinscheduler-sdk-python
Apache DolphinScheduler Python API, aka PyDolphinscheduler.
Home Page: https://dolphinscheduler.apache.org/python/main
License: Apache License 2.0
I can get the project of a workflow through workflow.project, but how can I find all workflows in a given project? I do not see any related methods or attributes of project in this document. How can I do that?
There are many conversion functions in the Task class:
timeout
timeout_flag
is_cache
resource_list
environment_code
I think we can find a better way to deal with them.
Support Flink SQL & Flink Stream task
https://github.com/apache/dolphinscheduler-sdk-python/blob/main/examples/yaml_define/Sql.yaml
examples/yaml_define/example_sql.sql
I failed to execute the SQL sample. How do I add delimiters for non-query SQL?
It looks like there are multiple SQL statements in example_sql.sql.
I read in the documentation that the statements are parsed automatically if no delimiter is specified, but in my test, not specifying a delimiter raises an error.
Looking forward to your reply!
The workflow parameter timeout is of type int, but the task parameter timeout is a timedelta.
Because the task timeout is a timedelta, tasks that set a timeout cannot be defined in YAML.
I plan to change the timeout type to Union[timedelta, int], which both unifies the timeout type and makes it convenient to define tasks in Python and in YAML.
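Below is a minimal sketch of the proposed Union[timedelta, int] handling. It assumes that a plain int means minutes, the unit DolphinScheduler timeouts are expressed in, and that the gateway receives whole minutes. normalize_timeout is a hypothetical helper, not part of PyDolphinscheduler.

```python
from datetime import timedelta
from typing import Optional, Union


def normalize_timeout(value: Optional[Union[timedelta, int]]) -> int:
    """Return a timeout in whole minutes (hypothetical helper).

    An int is assumed to already be minutes, which is also what a YAML
    definition would naturally contain. A timedelta is converted, and
    partial minutes are truncated. None means "no timeout" (0).
    """
    if value is None:
        return 0
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(value, bool):
        raise TypeError("timeout must be an int or timedelta, not bool")
    if isinstance(value, timedelta):
        minutes = int(value.total_seconds() // 60)
    elif isinstance(value, int):
        minutes = value
    else:
        raise TypeError(f"unsupported timeout type: {type(value).__name__}")
    if minutes < 0:
        raise ValueError("timeout must not be negative")
    return minutes


print(normalize_timeout(timedelta(hours=1)))  # 60
print(normalize_timeout(30))  # 30
```

Both the workflow and the task could route their timeout argument through one such function, so Python code and YAML files end up with the same value.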
Python API for apache/dolphinscheduler#13194
I pushed the tasks from YAML.
How do I create a HIVECLI task?
We should only need to declare the tenant once, when we create a user, instead of every time we create a workflow.
log:
raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling t.createOrUpdateWorkflow. Trace:
py4j.Py4JException: Method createOrUpdateWorkflow([class java.lang.String, class java.lang.String, class java.lang.String, class java.lang.String, class java.lang.String, class java.lang.String, class java.lang.String, class java.lang.Integer, class java.lang.Integer, class java.lang.String, class java.lang.Integer, class java.lang.String, class java.lang.String, null, class java.lang.String]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
cause:
self.gateway.entry_point.createOrUpdateWorkflow() does not have the onlinescheduler parameter.
Sometimes we create a PR based on a change in apache/dolphinscheduler, but dolphinscheduler-sdk-python CI can only run against the head commit of the dev branch. We should find a way to run against a specific apache/dolphinscheduler PR to make our CI smoother. I have a POC in 93e7de4; it works but is not pretty.
It would be perfect if we could set the PR number from a comment or a commit message instead of changing the CI constants directly.
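One way to do this is to read an upstream PR number from a marker line in the commit message or comment. The sketch below assumes a hypothetical "ds-pr: <number>" marker, which is not an existing convention. In CI, the message could be obtained with git log -1 --pretty=%B. The sketch relies on the fact that GitHub exposes every PR head as refs/pull/<number>/head.

```python
import re
from typing import Optional

# Hypothetical marker line, e.g. "ds-pr: 13194" or "DS-PR: #13194".
PR_MARKER = re.compile(r"^\s*ds-pr:\s*#?(\d+)\s*$", re.IGNORECASE | re.MULTILINE)


def find_upstream_pr(message: str) -> Optional[int]:
    """Return the apache/dolphinscheduler PR number named in a commit
    message or PR comment, or None when no marker is present."""
    match = PR_MARKER.search(message)
    return int(match.group(1)) if match else None


def upstream_ref(message: str) -> str:
    """Choose the upstream ref CI should fetch and build against."""
    pr = find_upstream_pr(message)
    # Fall back to the dev branch when no PR is requested.
    return f"refs/pull/{pr}/head" if pr is not None else "dev"


print(upstream_ref("Fix task\n\nds-pr: 13194\n"))  # refs/pull/13194/head
```

CI would then fetch that ref from apache/dolphinscheduler instead of relying on a hard-coded constant.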
dolphinscheduler version: 3.1.4
pydolphinscheduler version: 4.0.1
problem:
py4j.protocol.Py4JError: An error occurred while calling t.createOrUpdateWorkflow. Trace:
py4j.Py4JException: Method createOrUpdateWorkflow([class java.lang.String, class java.lang.String, class java.lang.String, class java.lang.String, class java.lang.String, class java.lang.String, class java.lang.String, class java.lang.Integer, class java.lang.Integer, class java.lang.String, class java.lang.Integer, class java.lang.String, class java.lang.String, null, class java.lang.String]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
Hi,
I have installed Apache DolphinScheduler successfully on Ubuntu and am able to create shell and HTTP tasks. However, when I create a Python task and try to run it, it gives me this error: /opt/soft/python: No such file or directory
OS environment: Ubuntu 22.04 (Jammy Jellyfish)
Python: 3.10.4
I have also installed pydolphinscheduler and tried to run some Python code from the pydolphinscheduler/examples directory from the CLI, but I face the same issue.
Kindly suggest how to successfully run a Python task in the Apache DolphinScheduler web UI.
For example
from pydolphinscheduler.core import Workflow
to
from pydolphinscheduler import Workflow
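A shorter import path is usually achieved by re-exporting names in the package's top-level __init__.py. The self-contained demo below builds a throwaway package called demo_pkg in a temporary directory. demo_pkg is a hypothetical stand-in and does not reflect PyDolphinscheduler's real layout. The demo shows that the old path and the new path both resolve to the same class.

```python
import importlib
import sys
import tempfile
from pathlib import Path

FILES = {
    # The submodule that actually defines the class.
    "demo_pkg/core/workflow.py": "class Workflow:\n    pass\n",
    # The intermediate package keeps the existing import path working.
    "demo_pkg/core/__init__.py": "from demo_pkg.core.workflow import Workflow\n",
    # The top-level re-export enables `from demo_pkg import Workflow`.
    "demo_pkg/__init__.py": (
        "from demo_pkg.core import Workflow\n"
        "__all__ = ['Workflow']\n"
    ),
}

root = tempfile.mkdtemp()
for rel_path, source in FILES.items():
    path = Path(root, rel_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(source)

sys.path.insert(0, root)
importlib.invalidate_caches()

short = importlib.import_module("demo_pkg").Workflow  # new, short path
long_ = importlib.import_module("demo_pkg.core").Workflow  # existing path
print(short is long_)  # True
```

Keeping the old path importable lets existing user scripts keep working while the short path becomes the documented one.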
Currently task.timeout_notify_strategy is not used in our code.
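In version 3.1.6, the workflow is generated normally but the resource list is empty. After I manually specify the resources and run it, the task does not start, and the log shows "task instance host is empty".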
Currently, each time we submit code from the Python API to DolphinScheduler, a new version is added even if no task, workflow, or relation has changed. It would be better to check whether any model has changed and add a new version only when it has.
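A minimal sketch of change detection compares a stable fingerprint of the serialized definition with the fingerprint from the last submission. It assumes that the definition can be expressed as JSON-serializable data and that the previous fingerprint is stored somewhere, for example alongside the workflow. Both function names are hypothetical.

```python
import hashlib
import json
from typing import Any, Dict, Optional


def definition_fingerprint(definition: Dict[str, Any]) -> str:
    """Hash a workflow definition (tasks, relations, parameters).

    sort_keys makes the JSON independent of dict insertion order, so the
    same definition always produces the same digest.
    """
    canonical = json.dumps(definition, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def needs_new_version(definition: Dict[str, Any], last_fingerprint: Optional[str]) -> bool:
    """Return True only when the definition differs from the last submit."""
    return definition_fingerprint(definition) != last_fingerprint
```

Fields that change on every run without changing behavior, such as generated codes or version numbers, would need to be left out of the dict before hashing. Otherwise every submit would still look like a change. List order still matters, which is intended when the order of tasks is meaningful.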
dolphinscheduler version: 3.1.4
apache-dolphinscheduler: 4.0.0
In the API server config file application.yaml, the token is set:
On the client (Windows 11), the same auth_token is set:
but the result is an authentication error.
client log:
E:\python_env\mysql\lib\site-packages\pydolphinscheduler\java_gateway.py:316: UserWarning: Using unmatched version of pydolphinscheduler (version 4.0.0) and Java gateway (version unknown) may cause errors. We strongly recommend you to find the matched version (check: https://pypi.org/project/apache-dolphinscheduler)
gateway = GatewayEntryPoint()
Traceback (most recent call last):
File "E:\python_env\mysql\lib\site-packages\py4j\java_gateway.py", line 982, in _get_connection
connection = self.deque.pop()
IndexError: pop from an empty deque
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "e:\program\shell\test.py", line 11, in <module>
task_parent = Shell(name="task_parent", command="echo hello pydolphinscheduler")
File "E:\python_env\mysql\lib\site-packages\pydolphinscheduler\tasks\shell.py", line 58, in __init__
super().__init__(name, TaskType.SHELL, *args, **kwargs)
File "E:\python_env\mysql\lib\site-packages\pydolphinscheduler\java_gateway.py", line 139, in get_code_and_version
return self.gateway.entry_point.getCodeAndVersion(
File "E:\python_env\mysql\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
answer = self.gateway_client.send_command(command)
File "E:\python_env\mysql\lib\site-packages\py4j\java_gateway.py", line 1036, in send_command
connection = self._get_connection()
File "E:\python_env\mysql\lib\site-packages\py4j\java_gateway.py", line 984, in _get_connection
connection = self._create_connection()
File "E:\python_env\mysql\lib\site-packages\py4j\java_gateway.py", line 990, in _create_connection
connection.start()
File "E:\python_env\mysql\lib\site-packages\py4j\java_gateway.py", line 1136, in start
self._authenticate_connection()
File "E:\python_env\mysql\lib\site-packages\py4j\java_gateway.py", line 1158, in _authenticate_connection
raise Py4JAuthenticationError(
py4j.protocol.Py4JAuthenticationError: Failed to authenticate with gateway server.
api-server log:
[INFO] 2023-04-21 11:07:22.636 +0800 py4j.GatewayConnection:[227] - Gateway Connection ready to receive messages
[INFO] 2023-04-21 11:07:22.636 +0800 py4j.GatewayServer:[519] - Connection Started
[ERROR] 2023-04-21 11:07:22.637 +0800 py4j.GatewayConnection:[251] - Authentication error.
py4j.Py4JAuthenticationException: Client authentication unsuccessful.
at py4j.commands.AuthCommand.execute(AuthCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:236)
at java.lang.Thread.run(Thread.java:748)
[INFO] 2023-04-21 11:07:22.637 +0800 py4j.GatewayConnection:[164] - Connection Stopped
[INFO] 2023-04-21 11:07:22.662 +0800 py4j.GatewayServer:[519] - Connection Started
[INFO] 2023-04-21 11:07:22.662 +0800 py4j.GatewayConnection:[227] - Gateway Connection ready to receive messages
[ERROR] 2023-04-21 11:07:22.662 +0800 py4j.GatewayConnection:[251] - Authentication error.
py4j.Py4JAuthenticationException: Client authentication unsuccessful.
at py4j.commands.AuthCommand.execute(AuthCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:236)
at java.lang.Thread.run(Thread.java:748)
[INFO] 2023-04-21 11:07:22.663 +0800 py4j.GatewayConnection:[164] - Connection Stopped
I generated a token like 13cc1637a92432cf06598c7dfe06d2f6 on the website
and set it through an environment variable:
os.environ['PYDS_JAVA_GATEWAY_AUTH_TOKEN'] = '13cc1637a92432cf06598c7dfe06d2f6'
and got an authentication failure log:
'2023-02-23 21:00:02,825 [30520] DEBUG [MainThread] py4j.java_gateway java_gateway.py:1206 Command to send: A
13cc1637a92432cf06598c7dfe06d2f6
'
'2023-02-23 21:00:02,862 [30520] DEBUG [MainThread] py4j.java_gateway java_gateway.py:1218 Answer received: !xsAuthentication error: bad auth token received.'
'2023-02-23 21:00:02,862 [30520] ERROR [MainThread] py4j.java_gateway java_gateway.py:1138 Cannot authenticate with gateway server.'
The default token works:
'2023-02-23 21:10:22,701 [23660] DEBUG [MainThread] py4j.java_gateway java_gateway.py:1206 Command to send: A
jwUDzpLsNKEFER4*a8gruBH_GsAurNxU7A@Xc
'
'2023-02-23 21:10:22,934 [23660] DEBUG [MainThread] py4j.java_gateway java_gateway.py:1218 Answer received: !yv'
Is anything wrong?
Version Info:
Client side:
Python 3.10.9
apache-dolphinscheduler 4.0.0
OpenJDK 11 (Windows)
Server side:
OpenJDK 11 (yum install java-11-openjdk)
dolphinscheduler 3.1.3
dolphinscheduler-sdk-python/src/pydolphinscheduler/core/workflow.py
Lines 123 to 125 in 1867f1e
We can configure the alert mechanism through workflow parameters, but currently there is no documentation describing it.
You can use the link https://github.com/apache/dolphinscheduler/issues?q=is%3Aopen+is%3Aissue+label%3APython to see all our old issues in the repository apache/dolphinscheduler.
In #1 and apache/dolphinscheduler#12779, we separated the Python API code into this new repository, but some of the existing issues in https://github.com/apache/dolphinscheduler/issues?q=is%3Aopen+is%3Aissue+label%3APython cannot be migrated. We decided to keep them in the repository https://github.com/apache/dolphinscheduler, but all new Python API issues should be filed in this new repository.
Anyone interested in fixing the issues at https://github.com/apache/dolphinscheduler/issues?q=is%3Aopen+is%3Aissue+label%3APython is welcome.
See more details in the mail thread: https://lists.apache.org/thread/4z7l5l54c4d81smjlk1n8nq380p9f0oo
In DolphinScheduler 3.2.0, TaskDefinition has two non-null fields: isCache and taskExecuteType.
If the former is null, the workflow cannot be submitted; if the latter is null, the task will be retried with infinite fault tolerance.
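A minimal sketch fills both fields with defaults before the definition is sent. It assumes that isCache uses DolphinScheduler's Flag enum (YES/NO) and that taskExecuteType uses its TaskExecuteType enum (BATCH/STREAM), so NO and BATCH are the assumed safe defaults. with_required_fields is a hypothetical helper.

```python
from typing import Any, Dict

# Assumed defaults for the two non-null 3.2.0 fields.
TASK_DEFINITION_DEFAULTS = {
    "isCache": "NO",
    "taskExecuteType": "BATCH",
}


def with_required_fields(task_definition: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the task definition with missing or None
    non-null fields filled in, leaving explicit values untouched."""
    result = dict(task_definition)
    for key, default in TASK_DEFINITION_DEFAULTS.items():
        if result.get(key) is None:
            result[key] = default
    return result
```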
For example
If we define a Shell task with a command like this:
task = Shell(name="example", command="${setValue(a=1)}")
then we can automatically add the output local_params {"prop": "a", "direct": "OUT", "type": "VARCHAR", "value": ""}
The Python, SQL, and Procedure tasks can do the same.
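A minimal sketch scans the task text for ${setValue(key=...)} and derives the OUT parameters from it. output_params_from_command is a hypothetical function. How it would be wired into Task's local_params is an assumption. The same scan would apply to the text of Python, SQL, and Procedure tasks.

```python
import re
from typing import Dict, List

# Matches ${setValue(key=...)} and captures the key; the value is not
# needed to declare the output parameter.
SET_VALUE = re.compile(r"\$\{setValue\(\s*([A-Za-z_][\w.-]*)\s*=")


def output_params_from_command(command: str) -> List[Dict[str, str]]:
    """Build OUT local_params for every key assigned with setValue."""
    params = []
    seen = set()
    for prop in SET_VALUE.findall(command):
        if prop not in seen:  # declare each output parameter only once
            seen.add(prop)
            params.append({"prop": prop, "direct": "OUT", "type": "VARCHAR", "value": ""})
    return params


print(output_params_from_command("${setValue(a=1)}"))
```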
environment = Environment(name="python-numpy", config="export PYTHON_HOME=/xxx/xxx/python3")
task = Python(name="123", definition="import numpy as np", environment_name=environment)
If python-numpy exists, update the environment; if not, create a new environment.
When running python script.py, a log like the one below is emitted:
Auth token is default token, highly recommend add a token in production, especially you deploy in public network.
We should add more logging configuration for it and make the message clearer.
We cannot activate the DolphinScheduler resource center in Docker because we have to change common.properties. I tried to change it via Java properties in 34aa71b but failed. We should activate the resource center so we can test that the resource functions work.
ref: #15 (comment)
Our base task supports resource limits such as CPU and memory, so we should add them to our base task.
But some of our tasks do not support them, so maybe we should add a Python mixin class to handle them.
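A minimal sketch of the mixin idea follows. The field names cpuQuota and memoryMax, and the -1 "unlimited" convention, are assumptions based on DolphinScheduler's task definition. ResourceLimitMixin, BaseTask, and ShellTask are hypothetical stand-ins, not PyDolphinscheduler classes.

```python
from typing import Any, Dict, Optional


class ResourceLimitMixin:
    """Adds CPU and memory limits to tasks that support them."""

    def init_resource_limit(self, cpu_quota: Optional[int] = None, memory_max: Optional[int] = None) -> None:
        # -1 is assumed to mean "no limit".
        self.cpu_quota = -1 if cpu_quota is None else cpu_quota
        self.memory_max = -1 if memory_max is None else memory_max

    def resource_limit_fields(self) -> Dict[str, Any]:
        return {"cpuQuota": self.cpu_quota, "memoryMax": self.memory_max}


class BaseTask:
    """Stand-in for the real Task base class."""

    def __init__(self, name: str) -> None:
        self.name = name

    def get_define(self) -> Dict[str, Any]:
        return {"name": self.name}


class ShellTask(ResourceLimitMixin, BaseTask):
    """Only task types that support limits mix them in."""

    def __init__(self, name: str, cpu_quota: Optional[int] = None, memory_max: Optional[int] = None) -> None:
        super().__init__(name)  # resolves to BaseTask.__init__ via the MRO
        self.init_resource_limit(cpu_quota, memory_max)

    def get_define(self) -> Dict[str, Any]:
        return {**super().get_define(), **self.resource_limit_fields()}


print(ShellTask("shell", cpu_quota=50).get_define())
```

Because the mixin defines no __init__, it stays out of the base class's constructor chain. Tasks that do not support limits simply do not inherit it.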
Currently it looks like this:
http = Http(
    name="http",
    url="http://www.google.com",
    http_method="GET",
    http_params=[
        {"prop": "abc", "httpParametersType": "PARAMETER", "value": "def"}
    ],
)
It should be more Pythonic.
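A minimal sketch lets users pass plain dicts and expands them into the current list-of-dicts format. The PARAMETER value comes from the example above. HEADERS and BODY are assumed to be the other accepted httpParametersType values. build_http_params is a hypothetical helper.

```python
from typing import Dict, List, Optional


def build_http_params(
    params: Optional[Dict[str, str]] = None,
    headers: Optional[Dict[str, str]] = None,
    body: Optional[Dict[str, str]] = None,
) -> List[Dict[str, str]]:
    """Expand plain dicts into the list-of-dicts format Http expects today."""
    result = []
    for kind, mapping in (("PARAMETER", params), ("HEADERS", headers), ("BODY", body)):
        for prop, value in (mapping or {}).items():
            result.append({"prop": prop, "httpParametersType": kind, "value": value})
    return result


print(build_http_params(params={"abc": "def"}))
```

With this, the example would become http_params=build_http_params(params={"abc": "def"}). Http could also accept params, headers, and body keywords directly and call the helper internally.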
dolphinscheduler version: 3.2.0
pydolphinscheduler version: 4.0.4
[root@vmware-k8s-master dolphinescheduler]# python3 tutorial.py
Auth token is default token, highly recommend add a token in production, especially you deploy in public network.
/usr/local/python3/lib/python3.6/site-packages/pydolphinscheduler/java_gateway.py:324: UserWarning: Using unmatched version of pydolphinscheduler (version 4.0.4) and Java gateway (version 3.2.0) may cause errors. We strongly recommend you to find the matched version (check: https://pypi.org/project/apache-dolphinscheduler)
gateway = GatewayEntryPoint()
Traceback (most recent call last):
File "tutorial.py", line 74, in <module>
workflow.run()
File "/usr/local/python3/lib/python3.6/site-packages/pydolphinscheduler/core/workflow.py", line 395, in run
self.submit()
File "/usr/local/python3/lib/python3.6/site-packages/pydolphinscheduler/core/workflow.py", line 456, in submit
None,
File "/usr/local/python3/lib/python3.6/site-packages/pydolphinscheduler/java_gateway.py", line 297, in create_or_update_workflow
execution_type,
File "/usr/local/python3/lib/python3.6/site-packages/py4j/java_gateway.py", line 1323, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/local/python3/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling t.createOrUpdateWorkflow.
: ServiceException(code=10001)
at org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl.generateTaskRelationList(ProcessDefinitionServiceImpl.java:489)
at org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl.createProcessDefinition(ProcessDefinitionServiceImpl.java:301)
at org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl$$FastClassBySpringCGLIB$$e8e34ed9.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:793)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:708)
at org.apache.dolphinscheduler.api.service.impl.ProcessDefinitionServiceImpl$$EnhancerBySpringCGLIB$$1c9dd086.createProcessDefinition(<generated>)
at org.apache.dolphinscheduler.api.python.PythonGateway.createOrUpdateWorkflow(PythonGateway.java:275)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Currently, our package does not provide a changelog; we should add one to our website https://dolphinscheduler.apache.org/python/main/index.html
In apache/dolphinscheduler#10911, we added a new mechanism for reading resources from other systems, but the YAML definition also has its own file resource syntax:
dolphinscheduler-sdk-python/examples/yaml_define/Sagemaker.yaml
Lines 24 to 27 in 6cd9784
and we should unify the two.
Sometimes we want our workflow to run successfully locally and also work in the production environment, using variables in both cases.
For example, locally:
the_var = "only-run-local"
but after we submit it to DolphinScheduler, we want it to be replaced by the parameter:
the_var = ${var-name}
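A minimal sketch selects between the local value and a ${name} placeholder based on a switch. The environment variable DS_SUBMIT_MODE and the function param are hypothetical. They are not PyDolphinscheduler settings.

```python
import os


def param(name: str, local: str) -> str:
    """Return the local value when running locally, or a ${name}
    placeholder for DolphinScheduler to substitute at run time.

    DS_SUBMIT_MODE=1 is a hypothetical switch meaning "this script is
    being submitted to DolphinScheduler".
    """
    if os.environ.get("DS_SUBMIT_MODE") == "1":
        return "${" + name + "}"
    return local


the_var = param("var-name", local="only-run-local")
```

Locally, the_var is "only-run-local". When the script is submitted with the switch set, the_var is the literal text ${var-name}, which DolphinScheduler resolves from the workflow or task parameters.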
For example, if I have a workspace like this: https://github.com/pytorch/examples/tree/main/word_language_model
we can submit the task:
# [start workflow]
from pydolphinscheduler.core import Workflow
from pydolphinscheduler.tasks import Shell
with Workflow(
    name="auto_resources_example",
) as workflow:
    # [start use_exists_resources]
    task_use_resource = Shell(
        name="example",
        command="python main.py --cuda",
        # Proposed parameter: upload the local workspace automatically.
        auto_resource=True,
    )
    # [end use_exists_resources]
    workflow.run()
# [end workflow]
Then all the files will be uploaded to the resource center.
There is an even better way to track file versions: if the workspace is a git repository, we only record the repository, the version, and the diff.
Then we can reproduce the workspace in DolphinScheduler (inspired by ClearML).
So, after developing a project locally, we can submit it directly to DolphinScheduler and run it as-is, apart from some inconsistent environment variables.
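A minimal sketch of the file-collection step that the proposed auto_resource option would need walks the workspace and skips directories that are not worth uploading. The exclude list and the function name collect_workspace_files are assumptions. The sketch only lists the files and does not call any upload API.

```python
import os
from pathlib import Path
from typing import Iterable, List

# Directories usually not worth uploading (an assumed default).
DEFAULT_EXCLUDES = {".git", "__pycache__", ".venv", ".idea"}


def collect_workspace_files(root: str, excludes: Iterable[str] = DEFAULT_EXCLUDES) -> List[str]:
    """List files under root as relative POSIX paths, skipping excluded
    directories, in a deterministic order."""
    excluded = set(excludes)
    files = []
    for dirpath, dirnames, filenames in os.walk(root):
        # Prune in place so os.walk does not descend into excluded dirs.
        dirnames[:] = sorted(d for d in dirnames if d not in excluded)
        for filename in sorted(filenames):
            full = Path(dirpath, filename)
            files.append(full.relative_to(root).as_posix())
    return files
```

The git-based alternative would replace this list with the remote URL, the commit, and the output of git diff. The deterministic order is what makes the uploaded set easy to compare between submits.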
When we want to use the local_params parameter in class Task, we have to write:
local_params = { "prop": "n", "direct": "IN", "type": "VARCHAR", "value": "${n}" }
That is not Pythonic and even a little ugly; we should make it more Pythonic and easier to use.
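A minimal sketch of a more Pythonic interface wraps each entry in a small dataclass and adds a keyword shorthand for input parameters. LocalParam and in_params are hypothetical names. The dict they produce matches the format shown above.

```python
from dataclasses import asdict, dataclass
from typing import Dict, List


@dataclass
class LocalParam:
    """Typed wrapper for one local_params entry (hypothetical)."""

    prop: str
    value: str = ""
    direct: str = "IN"  # "IN" or "OUT"
    type: str = "VARCHAR"

    def to_dict(self) -> Dict[str, str]:
        return asdict(self)


def in_params(**values: str) -> List[Dict[str, str]]:
    """Shorthand: in_params(n="${n}") instead of writing the dict by hand."""
    return [LocalParam(prop=key, value=value).to_dict() for key, value in values.items()]


print(in_params(n="${n}"))
```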
Python 3.6 reached end of life a year ago (see https://endoflife.date/python).
Only the task definition specific to each task is detected
My pydolphinscheduler version is 4.0.3, DolphinScheduler is 3.1.5, and my OS is Ubuntu 22.10. I also use HDFS for the resource center.
When I ran the code from the official documentation,
something went wrong. I was able to upload main.py and dependence.py without a problem, but my workflow did not run. The log says the host instance does not exist.
# [start workflow]
from pydolphinscheduler.core import Workflow
from pydolphinscheduler.core.resource import Resource
from pydolphinscheduler.tasks import Shell
dependence = "dependence.py"
main = "main.py"
with Workflow(
    name="multi_resources_example",
    # [start create_new_resources]
    resource_list=[
        Resource(
            name=dependence,
            content="from datetime import datetime\nnow = datetime.now()",
        ),
        Resource(name=main, content="from dependence import now\nprint(now)"),
    ],
    # [end create_new_resources]
) as workflow:
    # [start use_exists_resources]
    task_use_resource = Shell(
        name="use-resource",
        command=f"python {main}",
        resource_list=[
            dependence,
            main,
        ],
    )
    # [end use_exists_resources]
    workflow.run()
# [end workflow]
Change the above usage to the following:
# [start workflow]
from pydolphinscheduler.core import Workflow
from pydolphinscheduler.core.resource import Resource, LocalResource
from pydolphinscheduler.tasks import Shell
dependence = "dependence.py"
main = "main.py"
with Workflow(
    name="multi_resources_example",
) as workflow:
    # [start use_exists_resources]
    task_use_resource = Shell(
        name="use-resource",
        command=f"python {main}",
        resource_list=[
            LocalResource('dependence.py'),
            LocalResource('main.py'),
            LocalResource('*.py'),
        ],
    )
    # [end use_exists_resources]
    workflow.run()
# [end workflow]
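The proposed LocalResource entries mix plain file names with glob patterns. In the example, '*.py' also matches 'dependence.py' and 'main.py', so the proposed LocalResource would need to resolve patterns and drop duplicates. The minimal sketch below shows that resolution step under those assumptions. expand_local_resources is a hypothetical helper, and LocalResource is not an existing class.

```python
import glob
import os
from typing import Iterable, List


def expand_local_resources(patterns: Iterable[str], root: str = ".") -> List[str]:
    """Resolve file names and glob patterns into an ordered,
    de-duplicated list of file paths relative to root."""
    resolved: List[str] = []
    seen = set()
    for pattern in patterns:
        matches = sorted(m for m in glob.glob(os.path.join(root, pattern)) if os.path.isfile(m))
        if not matches:
            # Fail early instead of silently submitting a task without files.
            raise FileNotFoundError(f"no local file matches {pattern!r}")
        for match in matches:
            rel = os.path.relpath(match, root)
            if rel not in seen:
                seen.add(rel)
                resolved.append(rel)
    return resolved
```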
Hey, I found this inconsistency when I tried to update the schedule of my workflow to the DolphinScheduler web UI. The document says 0 to 6 represents Sunday to Saturday (here I want the workflow to run on Monday, so I should set 1),
while in the DolphinScheduler web UI I have to set 2 in my Python code to do that.
So I wonder whether the PyDolphinScheduler document is incorrect here, since I find that 1 to 7 actually represents Sunday to Saturday.
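The reporter's observation matches Quartz-style cron. DolphinScheduler schedules use seven-field Quartz-style expressions, such as 0 0 0 * * ? * elsewhere on this page. Quartz numbers day-of-week 1 to 7 with 1 = Sunday, whereas Unix cron numbers it 0 to 6 with 0 = Sunday, and also accepts 7 for Sunday. The function names below are hypothetical. The sketch converts between the two numberings and shows the name-based form, which avoids the off-by-one entirely.

```python
# Quartz day-of-week: 1=SUN ... 7=SAT. Unix cron: 0=SUN ... 6=SAT (7 is also SUN).
QUARTZ_NAMES = ["SUN", "MON", "TUE", "WED", "THU", "FRI", "SAT"]


def unix_to_quartz_dow(day: int) -> int:
    """Convert a Unix cron day-of-week number to Quartz numbering."""
    if not 0 <= day <= 7:
        raise ValueError("Unix cron day-of-week must be 0-7")
    return day % 7 + 1


def quartz_dow_name(day: int) -> str:
    """Return the Quartz day name for a Quartz day number (1-7)."""
    if not 1 <= day <= 7:
        raise ValueError("Quartz day-of-week must be 1-7")
    return QUARTZ_NAMES[day - 1]


print(unix_to_quartz_dow(1))  # 2, i.e. Monday in Quartz numbering
```

For example, "every Monday at midnight" can be written as 0 0 0 ? * MON *. In Quartz, the day-of-month field must be ? when day-of-week is given.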
We can now upload files to the resource center, but if we cannot manage resource paths, errors may occur, for example when different files have the same name.
I think we can upload each file to a dedicated folder, such as PYDS/workflow_name/workflow_version/task_name/task_version.
But this would waste a lot of space storing the same files repeatedly.
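A minimal sketch contrasts the proposed versioned layout with content addressing, one common way to avoid storing identical files twice. Content addressing is a swapped-in technique and was not part of the proposal. The PYDS/blobs/ prefix and all names are hypothetical. The in-memory ResourceIndex only records what would be uploaded and does not talk to a resource center.

```python
import hashlib
from typing import Dict, Set


def versioned_path(workflow: str, workflow_version: int, task: str, task_version: int, filename: str) -> str:
    """The layout proposed above: unique per task version, but identical
    files are stored again for every version."""
    return f"PYDS/{workflow}/{workflow_version}/{task}/{task_version}/{filename}"


def content_addressed_path(content: bytes, filename: str) -> str:
    """Alternative: key the stored file by its SHA-256 digest."""
    digest = hashlib.sha256(content).hexdigest()
    return f"PYDS/blobs/{digest[:2]}/{digest}/{filename}"


class ResourceIndex:
    """Maps logical (versioned) paths to stored blob paths."""

    def __init__(self) -> None:
        self.refs: Dict[str, str] = {}
        self.uploaded: Set[str] = set()

    def add(self, logical_path: str, content: bytes, filename: str) -> str:
        blob = content_addressed_path(content, filename)
        if blob not in self.uploaded:
            # A real client would upload the file here, exactly once.
            self.uploaded.add(blob)
        self.refs[logical_path] = blob
        return blob
```

Each task version still gets its own logical path, which keeps files with the same name apart. Unchanged files across versions point to one stored copy.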
We should rename the class keyword from process definition to workflow.
Sometimes users may want to overwrite workflow parameters when triggering a workflow from the CLI,
for example:
with ProcessDefinition(
    name="tutorial",
    schedule="0 0 0 * * ? *",
    start_time="2021-01-01",
    tenant="tenant_exists",
) as pd:
and then run in bash:
python tutorial.py --tenant=new-tenant
to replace tenant="tenant_exists" with tenant="new-tenant" in the file tutorial.py.
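A minimal sketch builds --key=value options from the workflow's default keyword arguments using the standard library's argparse. The function apply_cli_overrides is hypothetical and is not a PyDolphinscheduler feature. Values arriving from the command line are strings. Options the script does not know about are left alone through parse_known_args.

```python
import argparse
from typing import Any, Dict, List, Optional


def apply_cli_overrides(defaults: Dict[str, Any], argv: Optional[List[str]] = None) -> Dict[str, Any]:
    """Return workflow keyword arguments with --key=value overrides applied.

    Only keys present in defaults become options (start_time maps to
    --start-time). argv=None means sys.argv[1:].
    """
    parser = argparse.ArgumentParser(add_help=False)
    for key, value in defaults.items():
        parser.add_argument(f"--{key.replace('_', '-')}", dest=key, default=value)
    known, _unknown = parser.parse_known_args(argv)
    return vars(known)


if __name__ == "__main__":
    workflow_kwargs = apply_cli_overrides({
        "name": "tutorial",
        "schedule": "0 0 0 * * ? *",
        "start_time": "2021-01-01",
        "tenant": "tenant_exists",
    })
    # e.g. `with ProcessDefinition(**workflow_kwargs) as pd: ...`
    print(workflow_kwargs)
```

Running python tutorial.py --tenant=new-tenant would then pass tenant="new-tenant" while keeping the other defaults.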