Instalar astro cli
astro dev init
astro dev start ou stop
Retry, Orquestração, Varios pipelines para monitorar
Tasks na ordem correta
- Interativo
- Escalavel
- Customizavel
- Webserver
- Scheduler
- Metadata Database
- Executores (como a task será executada) [HOW]
- Worker [WHERE]
- One Node
- Tudo dentro do mesmo nó
- Multi Node
- Vários nós com as diversas aplicações (nesse caso, precisamos de um Redis)
- Nós diferentes para os workers
- DAG: pipeline
- Operators: é uma task na DAG
- Action operator: executa algo
- Transfer operator: permite transferir dados
- Senson operator: sensor que aguarda algo acontecer antes de mover para a próxima ação da dag.
- Task: instancia de um operador
- Task Instance: DAG + Task + Ponto no tempo
- Dependências: << ou >>
- Workflow: combinação de tudo
- UI
- CLI
- Rest API
executar o bash em um container
$ docker exec -it e23c9fd919ac /bin/bash
no airflow
airflow db init
airflow db up
airflow db reset (remove tudo do db)
airflow webserver
airflow scheduler
airflow celery worker
airflow dags pause ou unpause (semelhante ao botão da UI)
airflow dags trigger
airflow dags list
airflow dags list <nome>
airflow tasks test (executar um teste em task específica)
airflow dags backfill -s[start] -e[end] <nome/id da dag>(rerun as dags que não foram pausadas)
Por deafult o período da DAG é rodar diariamente (a cada 24h) 2 Parâmetros que obrigatóriamente precisamos definir
- start_date
- schedule_interval
- (opcional) end_date
Exemplo:
Primeira execução = start_date + schedule_interval
start_date: 01/01/2022 às 10h
schedule_interval: 10 min
A primeira execução será no dia 01/01 às 10h10\
- Novas DAGS Runs
Então o start_date se torna o execution_date e o 01/01 10h10 se torna o start_date
Os datetime por padrão são em UTC. Por padrão todas as dags não executadas entre o dia atual e o start date serão executadas quando o start_date for no passado. Não devemos colocar uma data dinâmica, exemplo datetime.now()
Padrões do airflow:
- "@daily"
- "@weekly"
- "@monthly"
- None (nunca é ativada automaticamente, somente manual ou por algum operador externo)
Define a exeução a cada intervalo RELATIVO, exemplo: schedule_interval = timedelta(hours=1)
- Executa a DAG a cada 7h
Por padrão o airflow irá executar as DAGs que não foram executadas entre o start_date e a data atual.
catchup=False or True
Podemos limitar o número de dags runs ativas. com max_active_runs = N
Catchup False e usar backfill com terminal
ou
Catchup True com max dags runs
Dividir as tasks entre operadores diferentes, removendo as dependências em caso de falha em uma das tasks.
1 operador -> 1 task Todas tasks precisam ter um ID único Retry N vezes antes de considerar a dag falha
Definir os args para todos os operadores
from airflow.operators.python import PythonOperator
acessar o contexto da execução: **kwargs
from airflow.operators.bash import BashOperator
from airflow.sensors.filesystem import FileSensor
upstream = X.set_upstream(Y) -> execute Y antes de X\
downstream = X.set_downstream(Y)-> execute X antes de Y\
Outra opção mais fácil e clara:
X >> Y = execute X depois Y
X << Y
X >> [Y, Z] = Eecute X antes de Y e Z
from airflow.models.baseoperator import chain, cross_downstream
-
Chain:
- chain(X,Y,Z) = x >> y >> z
-
Cross_downstream
- cross_downstream([X,Y], [Z,K])
Não é possível usar [X,Y] >> [Z,K]
- cross_downstream([X,Y], [Z,K])
- Temos limitações:
XCOM
-> Cross Comunication
Podemos usar o return
ou ti.xcom_push
Sendo ti = task instance object
Admin -> XCOMs -> Chave, Valor, Data, Task Id e Dag ID
Method xcom_pull exemplo:
my_xcom = ti.xcom_pull(key='return_value', task_ids=['download_data'])
xcom são limitados em tamanho já que são armazenados no airflow usando sqlite podemos armazenar até 2gb em 1 xcom. Não é uma boa prática armazenar dados aqui, melhor usar para armazenar estados, pequenos dados.
Podemos similuar falhas com o operador bash. Usando a UI podemos selecionar várias dags com o mesmo estado em caso de falha, por exemplo up_for_retry.
Browser -> Tasks Instance -> Filtros -> Selecionar todas -> Actions -> Clear Isso faz com que todas as tasks selecionadas sejam executadas novamente.
Args: email_on_failure: True | False email_on_retry: True | False email: '[email protected] on_failure_callback= código para usar em caso de falha
-
Sequential executor
São executados em sequência: util para debugar ou experimentar -
Tasks em paralelo
parallelism: define o número de máximo de tasks executados em paralelo na instancia toda do airflow. (default = 32)
dag_concurrency: número de tasks em uma dag que pode ser executadas em paralelo nas DAG Runs (default = 16)
max_active_runs_per_dag: número máximo de dag runs que podem ser rodadas ao mesmo tempo em uma determinada dag (default = 16)
parametro para uma dag: max_active_runs concurrency: limita o número de tasks executadas em paralelo em uma dag específica.
-
Celery Executor
- distributed task queue
- distribui as tasks entre várias maquinas Todas máquinas devem ter os mesmos pacotes e ambientes.
-
Kubernetes