Comments (7)
IMHO, technically it could be added as a separate provider with access through apache-airflow-client, but like every new provider it should come through the "Accepting new community providers" process. So I would recommend starting a discussion on the dev list.
Could you use the HTTP Sensor?
OK, thanks. Just to illustrate what the sensor would look like, here's a code snippet of a hypothetical call to the sensor. If it makes sense to you we can move on :-)
```python
wait_for_single_task = ExternalAPITaskSensor(
    task_id="wait_for_single_task",
    external_dag_id="secondary_dag",
    external_task_id="task_a",
    allowed_states=["success"],
    failed_states=["failed"],
    skipped_states=["skipped"],
    http_conn_id="http_default",
    execution_date="{{ execution_date }}",
    deferrable=True,
)

wait_for_multiple_tasks = ExternalAPITaskSensor(
    task_id="wait_for_multiple_tasks",
    external_dag_id="secondary_dag",
    external_task_ids=["task_b", "task_c"],
    allowed_states=["success"],
    failed_states=["failed"],
    skipped_states=["skipped"],
    http_conn_id="http_default",
    execution_date="{{ execution_date }}",
    deferrable=True,
)

wait_for_task_group = ExternalAPITaskSensor(
    task_id="wait_for_task_group",
    external_dag_id="secondary_dag",
    external_task_group_id="group_1",
    allowed_states=["success"],
    failed_states=["failed"],
    skipped_states=["skipped"],
    http_conn_id="http_default",
    execution_date="{{ execution_date }}",
    deferrable=True,
)
```
I'm not sure this can be addressed using the HttpSensor, because we need more than one API call: `/api/v1/dags/{self.external_dag_id}/dagRuns` to count the records matching the given states, and `/api/v1/dags/{self.external_dag_id}/dagRuns/{dag_run_id}/taskInstances` to monitor the external task IDs.
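To make that concrete, here is a rough sketch of the poke logic the sensor would implement on top of those two endpoints. All names are hypothetical, and the function operates on already-fetched JSON payloads rather than performing the HTTP calls itself:

```python
# Hypothetical poke logic for the proposed ExternalAPITaskSensor, combining
# the JSON payloads of the two REST endpoints mentioned above.

def poke_external_tasks(dag_runs_payload, task_instances_payload,
                        task_ids, allowed_states):
    """Return True once every watched external task is in an allowed state.

    dag_runs_payload:       body of /dags/{dag_id}/dagRuns
    task_instances_payload: body of /dags/{dag_id}/dagRuns/{run_id}/taskInstances
    """
    # First call: make sure at least one dag run exists to monitor.
    if not dag_runs_payload.get("dag_runs"):
        return False  # nothing to monitor yet, keep poking

    # Second call: collect the current states of the watched task IDs.
    states = {
        ti["task_id"]: ti["state"]
        for ti in task_instances_payload.get("task_instances", [])
        if ti["task_id"] in task_ids
    }
    # Done only when every watched task has reported an allowed state.
    return set(task_ids) <= states.keys() and all(
        s in allowed_states for s in states.values()
    )
```

Handling of `failed_states` / `skipped_states` and the actual HTTP requests are omitted here; the point is only that two distinct responses have to be combined per poke.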
I'm still waiting for your response. Thank you all. @Taragolis @rawwar @RNHTTR ?
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.
Hi there! Thanks for your response and please correct me if I'm wrong about the HttpSensor implementation.
The idea behind the ExternalAPITaskSensor is to address specific needs that go beyond what the HttpSensor can offer. While the HttpSensor provides a basic mechanism to poll an API endpoint, it doesn't really grasp the concept of external tasks and DAGs in the Airflow world.
The ExternalAPITaskSensor would retain the context of external tasks and DAGs, providing a seamless way to monitor specific DAG runs or tasks in another Airflow instance via the API. It would leverage attributes like external_dag_id, execution_date, allowed_states, skipped_states, and failed_states, which are unique to external DAGs and tasks.
Here's why I think this sensor is necessary:

- **State Awareness and Exception Management:** It would handle different states (like success, skipped, and failed) and raise exceptions (e.g., AirflowFailException, AirflowSkipException) when a specific state is reached. This ensures that task dependencies are managed correctly.
- **Advanced Error Handling:** It would differentiate between transient network errors (retries with a threshold) and API-related issues, providing more robust error handling.
- **Deferrable Execution:** The sensor would be able to defer execution to a custom WorkflowTrigger.
In summary, while the HttpSensor is great for basic API polling, it lacks the features needed to monitor external tasks and DAGs properly. The ExternalAPITaskSensor would encapsulate this logic, making it a valuable tool for workflows that rely on task dependencies across multiple DAGs or external Airflow instances (same concept as ExternalTaskSensor).
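The state-awareness point could be sketched roughly like this. The exception classes below are standalone stand-ins for Airflow's real `AirflowFailException` / `AirflowSkipException`, so the snippet runs without Airflow installed; everything else is hypothetical:

```python
# Sketch of the state evaluation the proposed sensor would perform.

class ExternalTaskFailed(Exception):
    """Stand-in for airflow.exceptions.AirflowFailException."""

class ExternalTaskSkipped(Exception):
    """Stand-in for airflow.exceptions.AirflowSkipException."""

def evaluate_external_states(states, allowed_states, failed_states, skipped_states):
    """Map observed external task states to a sensor outcome.

    Returns True (done) when all tasks are in an allowed state,
    False to keep poking, and raises when any task failed or was skipped.
    """
    if any(s in failed_states for s in states):
        raise ExternalTaskFailed("an external task reached a failed state")
    if any(s in skipped_states for s in states):
        raise ExternalTaskSkipped("an external task was skipped")
    return bool(states) and all(s in allowed_states for s in states)
```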
I hope this explanation makes the purpose clear. Let me know if you have any questions or suggestions!
> State Awareness and Exception Management: It would handle different states (like success, skipped, and failed) and raise exceptions (e.g., AirflowFailException, AirflowSkipException) when a specific state is reached. This ensures that task dependencies are managed correctly.
Can't that be achieved with the HttpSensor's `response_check` parameter? This allows you to pass a function to the HttpSensor that evaluates the response for some condition. So, if you query the `taskInstances` endpoint of the Airflow REST API, you can configure your `response_check` accordingly. You could then use the existing TI states to determine how to respond to a given state.
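A minimal illustration of that approach might look like this. The watched task IDs are baked in for brevity, and the function only assumes a response object with a `.json()` method (as the HttpSensor passes through from `requests`):

```python
# Hypothetical response_check for an HttpSensor polling the stable REST API's
# taskInstances endpoint of an external Airflow instance.

WATCHED = {"task_b", "task_c"}

def external_tasks_done(response):
    """Return True once every watched task instance reports state 'success'."""
    tis = response.json().get("task_instances", [])
    states = {ti["task_id"]: ti["state"] for ti in tis if ti["task_id"] in WATCHED}
    return WATCHED <= states.keys() and all(
        s == "success" for s in states.values()
    )
```

In a DAG this would be passed as `response_check=external_tasks_done` to an `HttpSensor` whose `endpoint` points at `api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances` (connection ID and run ID left as placeholders here).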
> Advanced Error Handling: It would differentiate between transient network errors (retries with a threshold) and API-related issues, providing more robust error handling.

I believe this can also be done both with HttpSensor parameters (for example `tcp_keep_alive_count`) and normal Airflow retries.
> Deferrable Execution: The sensor would be able to defer execution to a custom WorkflowTrigger.

The HttpSensor already supports deferrable execution.
This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.