An Apache Airflow plugin that exposes custom REST endpoints, providing richer DAG and task management capabilities for Airflow. See the "Using the REST API" section for all the APIs this plugin adds.
- Download plugins zip from:
https://github.corp.ebay.com/linhgao/airflow-rest-api-plugin/archive/master.zip
- Check the plugins_folder configuration in airflow.cfg; configure it if it is not set.
- Unzip the plugins zip and move rest_api_plugin.py and the templates folder to the plugins_folder directory.
unzip airflow-rest-api-plugin-master.zip
cp -r airflow-rest-api-plugin-master/plugins/* {AIRFLOW_PLUGINS_FOLDER}
- Start the Airflow webserver and scheduler services.
airflow webserver -p 8080
airflow scheduler
- /plugins
- /rest_api_plugin.py - Airflow plugin module that implements the custom REST endpoints
- /templates - front-end pages for the plugin, integrated into the Airflow UI
- /tests - unit tests for the plugin
- /LICENSE - airflow-rest-api-plugin license
Airflow does not enable permission authentication by default, in which case the following configuration can be ignored. If you need to add RBAC authorization verification, refer to the configuration below.
Airflow has supported RBAC since version 1.10.4.
RBAC stands for Role-Based Access Control. In RBAC, permissions are associated with roles, and users gain permissions by being assigned the appropriate roles. This greatly simplifies permission management: administration becomes hierarchical, with permissions assigned to roles and roles assigned to users, making the permission design clear and easy to manage.
In this plugin, permission verification is combined with JWT (JSON Web Token).
- Set rbac = True in airflow.cfg.
- Run the airflow initdb command, which initializes the RBAC-related tables such as ab_user, ab_role, and ab_permission.
- When Airflow RBAC is enabled for the first time, run the airflow create_user command to add users to ab_user, specifying the role with -r (or --role). Example:
airflow create_user --role Admin --username linhgao --email [email protected] --firstname linhua --lastname gao --password airflow
- Use the username and password to log in to the Airflow web UI.
The plugin enables JWT-token-based authentication for Airflow 1.10.4 or higher when RBAC support is enabled.
curl -X POST http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/security/login -H "Content-Type: application/json" -d '{"username":"username", "password":"password", "refresh":true, "provider": "db"}'
curl -X POST http://localhost:8080/api/v1/security/login -H "Content-Type: application/json" -d '{"username":"linhgao", "password":"airflow", "refresh":true, "provider": "db"}'
{
"access_token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE2MDQyMTc4MzgsIm5iZiI6MTYwNDIxNzgzOCwianRpIjoiMTI4ZDE2OGQtMTZiOC00NzU0LWJiY2EtMTEyN2E2ZTNmZWRlIiwiZXhwIjoxNjA0MjE4NzM4LCJpZGVudGl0eSI6MSwiZnJlc2giOnRydWUsInR5cGUiOiJhY2Nlc3MifQ.xSWIE4lR-_0Qcu58OiSy-X0XBxuCd_59ic-9TB7cP9Y",
"refresh_token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE2MDQyMTc4MzgsIm5iZiI6MTYwNDIxNzgzOCwianRpIjoiZjA5NTNkODEtNWY4Ni00YjY0LThkMzAtYzg5NTYzMmFkMTkyIiwiZXhwIjoxNjA2ODA5ODM4LCJpZGVudGl0eSI6MSwidHlwZSI6InJlZnJlc2gifQ.VsiRr8_ulCoQ-3eAbcFz4dQm-y6732QR6OmYXsy4HLk"
}
By default, the JWT access token is valid for 15 minutes and the refresh token is valid for 30 days. You can renew the access token with the refresh token as shown below.
curl -X POST "http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/api/v1/security/refresh" -H 'Authorization: Bearer <refresh_token>'
curl -X POST "http://localhost:8080/api/v1/security/refresh" -H 'Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE2MDQyMTc4MzgsIm5iZiI6MTYwNDIxNzgzOCwianRpIjoiZjA5NTNkODEtNWY4Ni00YjY0LThkMzAtYzg5NTYzMmFkMTkyIiwiZXhwIjoxNjA2ODA5ODM4LCJpZGVudGl0eSI6MSwidHlwZSI6InJlZnJlc2gifQ.VsiRr8_ulCoQ-3eAbcFz4dQm-y6732QR6OmYXsy4HLk'
{
"access_token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE2MDQyODQ2OTksIm5iZiI6MTYwNDI4NDY5OSwianRpIjoiZDhhN2IzMmYtMWE5Zi00Y2E5LWFhM2ItNDEwMmU3ZmMyMzliIiwiZXhwIjoxNjA0Mjg1NTk5LCJpZGVudGl0eSI6MSwiZnJlc2giOmZhbHNlLCJ0eXBlIjoiYWNjZXNzIn0.qY2e-bNSgOY-YboinOoGqLfKX9aQkdRjo025mZwBadA"
}
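The login and refresh responses are plain JSON, so extracting the tokens needs nothing beyond the standard library. A minimal sketch (the helper name and sample body are illustrative, not part of the plugin):

```python
import json

def extract_tokens(response_body):
    """Parse a /api/v1/security/login (or /refresh) response body.

    Returns (access_token, refresh_token); the refresh endpoint
    only returns an access_token, so refresh_token may be None.
    """
    data = json.loads(response_body)
    return data["access_token"], data.get("refresh_token")

# Sample body matching the response shape above (tokens truncated).
body = '{"access_token": "eyJ0...", "refresh_token": "eyJh..."}'
access, refresh = extract_tokens(body)
```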
When calling the rest_api_plugin endpoints, pass the JWT access token in the Authorization header; requests without it are rejected with:
{"msg":"Missing Authorization Header"}
Examples:
curl -X GET -H 'Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE2MDQyODQ2OTksIm5iZiI6MTYwNDI4NDY5OSwianRpIjoiZDhhN2IzMmYtMWE5Zi00Y2E5LWFhM2ItNDEwMmU3ZmMyMzliIiwiZXhwIjoxNjA0Mjg1NTk5LCJpZGVudGl0eSI6MSwiZnJlc2giOmZhbHNlLCJ0eXBlIjoiYWNjZXNzIn0.qY2e-bNSgOY-YboinOoGqLfKX9aQkdRjo025mZwBadA' http://localhost:8080/rest_api/api\?api\=dag_state\&dag_id\=dag_test\&run_id\=manual__2020-10-28T17%3A36%3A28.838356%2B00%3A00
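The same authenticated call can be assembled with the standard library; urlencode handles the run_id escaping that the curl example does by hand. This is a hypothetical helper sketch, assuming a local deployment at AIRFLOW_BASE and a placeholder token:

```python
import urllib.parse
import urllib.request

# Assumed base URL for your deployment (use /admin/rest_api without RBAC).
AIRFLOW_BASE = "http://localhost:8080"

def build_plugin_request(access_token, api, **params):
    """Build an authenticated GET request for a rest_api endpoint.

    urlencode escapes characters such as ':' and '+' in run_id
    values like 'manual__2020-10-28T17:36:28.838356+00:00'.
    """
    query = urllib.parse.urlencode({"api": api, **params})
    url = f"{AIRFLOW_BASE}/rest_api/api?{query}"
    return urllib.request.Request(
        url, headers={"Authorization": f"Bearer {access_token}"}
    )

req = build_plugin_request(
    "<access_token>", "dag_state",
    dag_id="dag_test",
    run_id="manual__2020-10-28T17:36:28.838356+00:00",
)
# Send against a live webserver with urllib.request.urlopen(req).
```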
Once you deploy the plugin and restart the webserver, you can start using the REST API. Below you will see the endpoints that are supported. In addition, you can also interact with the REST API from the Airflow webserver. When you reload the page, you will see a link under the Admin tab called "REST API". Clicking the link navigates you to the following URL:
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/admin/rest_api/
Note: if RBAC is enabled, use http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/rest_api/ instead.
This web page lists the supported endpoints and provides a form for submitting test requests to them.
- deploy_dag
- refresh_all_dags
- delete_dag
- upload_file
- dag_state
- task_instance_detail
- restart_failed_task
- kill_running_tasks
- run_task_instance
- skip_task_instance
- Deploy a new DAG and refresh it into the session.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/admin/rest_api/api?api=deploy_dag
- POST
- dag_file - file - Python file to upload and deploy.
- force (optional) - boolean - Whether to overwrite the file if it already exists.
- pause (optional) - boolean - The DAG will be forced to be paused.
- unpause (optional) - boolean - The DAG will be forced to be unpaused.
curl -X POST -H 'Content-Type: multipart/form-data' -F 'dag_file=@dag_test.py' -F 'force=on' -F 'unpause=true' http://localhost:8080/admin/rest_api/api?api=deploy_dag
{
"message": "DAG File [<module 'module.name' from '/Users/linhgao/airflow/dags/dag_test.py'>] has been uploaded",
"status": "success"
}
- Refresh all DAGs from the dag_folder into the session.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/admin/rest_api/api?api=refresh_all_dags
- GET
- None
curl -X GET http://localhost:8080/admin/rest_api/api?api=refresh_all_dags
{
"message": "All DAGs are now up to date",
"status": "success"
}
- Delete a DAG by dag_id.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/admin/rest_api/api?api=delete_dag&dag_id=value
- GET
- dag_id - string - The id of the DAG.
curl -X GET http://localhost:8080/admin/rest_api/api?api=delete_dag&dag_id=dag_test
{
"message": "DAG [dag_test] deleted",
"status": "success"
}
- Upload a new file to the specified folder.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/admin/rest_api/api?api=upload_file
- POST
- file - file - The file to upload.
- force (optional) - boolean - Whether to overwrite the file if it already exists.
- path (optional) - string - The destination path of the file.
curl -X POST -H 'Content-Type: multipart/form-data' -F 'file=@dag_test.txt' -F 'force=on' http://localhost:8080/admin/rest_api/api?api=upload_file
{
"message": "File [/Users/linhgao/airflow/dags/dag_test.txt] has been uploaded",
"status": "success"
}
- Get the status of a dag run.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/admin/rest_api/api?api=dag_state&dag_id=value&run_id=value
- GET
- dag_id - string - The id of the DAG.
- run_id - string - The id of the dagRun.
curl -X GET http://localhost:8080/admin/rest_api/api?api=dag_state&dag_id=dag_test&run_id=manual__2020-10-28T16%3A15%3A19.427214%2B00%3A00
{
"state": "success",
"startDate": "2020-10-28T16:15:19.436693+0000",
"endDate": "2020-10-28T16:21:36.245696+0000",
"status": "success"
}
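A common use of dag_state is to poll until the run finishes. A minimal sketch of that loop, with the state lookup injected as a callable so it can be exercised without a live server (in practice it would issue the authenticated dag_state GET and return the "state" field of the response):

```python
import time

# States after which a dag run will not change again.
TERMINAL_STATES = {"success", "failed"}

def wait_for_dag_run(fetch_state, interval=0, max_polls=100):
    """Poll fetch_state() until the dag run reaches a terminal state."""
    for _ in range(max_polls):
        state = fetch_state()
        if state in TERMINAL_STATES:
            return state
        time.sleep(interval)
    raise TimeoutError("dag run did not finish in time")

# Fake fetcher standing in for the dag_state endpoint:
# two 'running' responses, then 'success'.
states = iter(["running", "running", "success"])
result = wait_for_dag_run(lambda: next(states))
```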
- Get detailed information about a task instance.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/admin/rest_api/api?api=task_instance_detail&dag_id=value&run_id=value&task_id=value
- GET
- dag_id - string - The id of the DAG.
- run_id - string - The id of the dagRun.
- task_id - string - The id of the task.
curl -X GET http://localhost:8080/admin/rest_api/api?api=task_instance_detail&dag_id=dag_test&run_id=manual__2020-10-28T16%3A31%3A17.247035%2B00%3A00&task_id=task_test
{
"taskId": "task_test",
"dagId": "dag_test",
"state": "success",
"tryNumber": null,
"maxTries": null,
"startDate": "2020-10-28T16:31:57.882329+0000",
"endDate": "2020-10-28T16:31:57.882329+0000",
"duration": null,
"status": "success"
}
- Restart failed tasks together with their downstream tasks.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/admin/rest_api/api?api=restart_failed_task&dag_id=value&run_id=value
- GET
- dag_id - string - The id of the DAG.
- run_id - string - The id of the dagRun.
curl -X GET http://localhost:8080/admin/rest_api/api?api=restart_failed_task&dag_id=dag_test&run_id=manual__2020-10-28T16%3A31%3A17.247035%2B00%3A00
{
"failed_task_count": 2,
"clear_task_count": 6,
"status": "success"
}
- Kill running tasks whose state is in ['none', 'running'].
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/admin/rest_api/api?api=kill_running_tasks&dag_id=value&run_id=value&task_id=value
- GET
- dag_id - string - The id of the DAG.
- run_id - string - The id of the dagRun.
- task_id - string - If task_id is empty, kill all tasks; otherwise kill only the given task.
curl -X GET http://localhost:8080/admin/rest_api/api?api=kill_running_tasks&dag_id=dag_test&run_id=manual__2020-10-28T16%3A31%3A17.247035%2B00%3A00&task_id=task_test
{
"status": "success"
}
- Create a dagRun and run the specified tasks; the remaining tasks are skipped.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/admin/rest_api/api?api=run_task_instance
- POST
- dag_id - string - The id of the DAG.
- run_id - string - The id of the dagRun.
- tasks - string - The ids of the tasks to run; separate multiple task ids with commas.
- conf - string - The conf for the created dagRun.
curl -X POST -F 'dag_id=dag_test' -F 'run_id=manual__2020-10-28T17:36:28.838356+00:00' -F 'tasks=task_test_3,task_test_4,task_test_6' http://localhost:8080/admin/rest_api/api?api=run_task_instance
{
"execution_date": "2020-10-28T17:39:14.941060+0000",
"status": "success"
}
- Skip a task instance and its downstream tasks.
http://{AIRFLOW_HOST}:{AIRFLOW_PORT}/admin/rest_api/api?api=skip_task_instance&dag_id=value&run_id=value&task_id=value
- GET
- dag_id - string - The id of the DAG.
- run_id - string - The id of the dagRun.
- task_id - string - The id of the task.
curl -X GET http://localhost:8080/admin/rest_api/api?api=skip_task_instance&dag_id=dag_test&run_id=manual__2020-10-28T17%3A43%3A10.053716%2B00%3A00&task_id=task_test_2
{
"status": "success"
}
To view the unit-test coverage, you need to install coverage, which can be done with the following command:
pip install coverage
Run the tests/test_rest_api_plugins.py file to generate a .coverage file:
coverage run tests/test_rest_api_plugins.py
Generate the HTML report:
coverage html
Open the htmlcov/index.html file in a browser to view the unit-test coverage.
Bugs and new features should be submitted using GitHub issues. Please include a detailed description and the expected behaviour. If you would like to submit a change yourself, follow these steps.
- Fork it.
- Create a branch (git checkout -b fix-for-that-thing)
- Commit a failing test (git commit -am "adds a failing test to demonstrate that thing")
- Commit a fix that makes the test pass (git commit -am "fixes that thing")
- Push to the branch (git push origin fix-for-that-thing)
- Open a [Pull Request](github repo)
Please keep your branch up to date by rebasing upstream changes from master.
- Airflow configuration documentation
- Contact email
- sky - [email protected]
- linhua - [email protected]
This plugin implements custom Airflow interfaces, making up for the limited flexibility of the official Airflow API while meeting business needs.