Construção de um data pipeline para um processo de ELT seguindo o modelo batch. O processo roda uma vez ao dia e armazena os dados em um data lake no formato JSON. Esse repositório foi feito acompanhando o curso "Engenharia de dados: Conhecendo Apache Airflow" da Alura.
O objetivo da aplicação é extrair dados do Twitter e tranformar esses dados de forma que possam ser analisados da forma mais simples possível.
As principais ferramentas utilizadas são: Apache Airflow, para orquestrar os processos (agendar o horário de cada processo e monitorar sua execução); e Apache Spark para processamento distribuído.
- Rodando a aplicação
- Preparando o ambiente virtual
- Iniciado o Airflow
- Consumindo a API do Twitter
- Airflow
Após clonar o projeto, execute os comandos abaixo na raiz do diretório clonado:
- Instalar o pacote responsável por gerenciar os ambientes virtuais:
apt-get install python3-venv
- Criar ambiente virtual:
python3 -m venv .env
- Carregar ambiente virtual:
source .env/bin/activate
- Ativar a variável de ambiente:
export AIRFLOW_HOME=$(pwd)/airflow
- Iniciar o web service:
airflow webserver
- Acessando o web service:
http://localhost:8080
- Iniciar o agendador:
airflow scheduler
- Instalação do pacote no sistema:
apt-get install python3-venv
- Criação do ambiente dentro da pasta que vai ficar a aplicação:
mkdir airflow-spark
cd airflow-spark
python3 -m venv .env
- Carregando o ambiente:
source .env/bin/activate
- Antes de instalar o Airflow é necessário criar uma variável de ambiente que aponte para pasta onde ele vai ficar. É necessário manter essa variável ativa para que o Airflow não seja instalado na home do usuário, e depois de instalado, para que o terminal não vá la procurar pela aplicação.
export AIRFLOW_HOME=$(pwd)/airflow
- Instalando o Apache Airflow:
pip install apache-airflow==1.10.14 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-1.10.14/constraints-3.7.txt"
- Criando o banco de dados do Airflow:
airflow db init
Esse comando cria uma pasta chamada airflow
com arquivos de configurações (airflow.cfg
), um banco SQLite (airflow.db
), pasta de logs e testes unitários (unittests.cfg
).
Para começar a trabalhar com o Airflow devemos rodar o web service, que é a interface gráfica do Airflow:
airflow webserver
Após rodar o comando podemos observar algumas saídas no terminal como: o executor utilizado (sequencial, vem configurado por padrão), o local onde ficam armazenados os dags e a porta em que ele está rodando (Listening at).
Agora podemos acessar o web service no navegador através do endereço http://localhost:8080
. Ao carregar podemos ver uma tabel com DAGs, suas informações de controle e alguns links. No topo da página existe um aviso informado que o scheduler (agendador) não está rodando, então executamos o comando abaixo para iniciar esse segundo serviço:
airflow scheduler
O agendador é o serviço que executa o heart beat, que fica observando quando um processo deve ser executado.
Vamos extrair os dados do Twitter consumindo sua API. Para isso é necessário primeiro criar um projeto no "portal do desenvolvedor" do Twitter. Em seguida começamos escrever o código Python a partir de um exemplo da própria API.
Criamos um arquivo chamado recent_search.py
e colamos o código de exemplo (commit 8bba23a). Logo no começo vemos que o Bearer Token é obtido através de uma variável de ambiente do sistema operacional:
def auth():
return os.environ.get("BEARER_TOKEN")
Podemos criar uma variável de ambiente para o _token_ou alterar o código para que essa informação seja recuperada de outro lugar (commit eee61a7). Só não é uma boa idéia deixar isso em plain text dentro do código.
Em seguida partimos para o método create_url()
onde começamos alterando a variável query
para que a busca seja feita em relação a "AluraOnline"
e depois passamos para a variável tweet_fields
, alterando os campos que devem ser retornados (a lista dos parâmetros disponíveis pode ser encontrada na documentação da API).
Também criamos outras duas variáveis, uma com campos referentes aos usuários e outra com as datas de início e fim da consulta. Lembrando que a API retorna apenas dados dos últimos sete dias para uma conta gratuita (commit 5ab8e41).
Neste ponto podemos testar esse código. Então se tudo funcionou corretamente, após executar o código Python, devemos ter como retorno um JSON com três itens: data
, includes
e meta
. Em data
é um vetor com os tweets e suas informações, includes
possui a chave users
com um vetor com as informações dos usuários. E por fim temos o item meta
, que possui um campo chamado next_token
indicando que existem mais dados para retornar e para retornar esses dados devemos implementar a paginação no código (commit 150ba91).
- projeto open source
- google (clound composer) e aws
- dag (gráficos aciclicos direcionados)
- conjunto de passos conectados, com inicio e fim
- podem ser executados de forma sequencial ou paralela
- são executados através de operadores
- são nodes de um DAG
- caracteristicas de um operador: indepotência, isosalmento e atomicidade
- 3 tipos de operadores
- como limpar os dags default da instalação?
- conjunto de passos conectados, com inicio e fim
- serviços
- webservice: ui, monitoramento
- scheduler: agendador, heartbeat (fica observando quando os dags devem executar), executores (sequencial, local, cluster)
O Airflow permite acessar diversas fontes de dados externas e geralmente é necessário alguma credencial para esses acessos, usuário e senha para um banco de dados ou um token para uma API.
O Airflow possui um "repositório" que centraliza todas as conexões para que qualquer DAG possa utiliza-lás. O web service nos fornece um CRUD para as conexões, no menu Admin -> Connections.
Ao acessar a página de create podemos preencher os seguintes campos do formulário para criar a conexão com a API do Twitter:
- Conn Id: twitter_default
- Conn Type: HTTP
- Host: https://api.twitter.com
- Extra: {"Authorization": "Bearer
<coloque o token aqui>
"}
A forma mais adequada de se conectar a alguma ferramenta externa é através de uma interface comum que seja desacoplada do pipeline. No Airflow, essas interfaces são chamadas de hooks. Nos hooks são colocados todo código necessário para interagir com a ferramenta bem como a definição da conexão que deve ser usada.
Na seção 1 (ou 2) após criarmos o Banco do Airflow, foi criada uma estrutura de pastas e arquivos de configuração, sendo a raiz uma pasta chamada airflow
. Dentro dessa pasta vamos criar o arquivo plugins/hooks/twitter_hook.py
e dentro desse arquivo escrevemos nossa classe de hook (commit b790f86).
Como o hook vai interagir com uma API, faz mais sentido herdarmos uma classe mais adequada para essa finalidade. Partimos do código de exemplo escrito na seção 3 e o modificamos de acordo com a classe base (commit 5e513a2).
Dentro da pasta plugins
vamos criar uma nova pasta chamada operator
e dentro dela o arquivo twitter_operator.py
. Da mesma forma como os hooks, operadores são classes que seguem uma estrutura já definida e herdam uma classe base.
Obrigatóriamente devemos implementar o método execute
recebendo um parâmetro chamado context
, que traz alguns parâmetros do Airflow. Este é o método chamado para executar o DAG.
No método __init__
aplicamos o decorator apply_defaults
, que facilita o envio/captura de parâmetros padrâo das DAGs(?), definimos os mesmo parâmetros recebidos pelo hook e chamamos o método super
(commit 70f9b98).
No passo anterior escrevemos um hook responsável por interagir com a API do Twitter, então agora vamos conectá-lo ao operador. Dessa forma teremos uma tarefa responsável por buscar os dados no Twitter e tendo as características de um operador. Por hora vamos apenas retornar os dados na saída padrão, então apenas importamos a classe de hook que escrevemos, a instânciamos no método execute
e iteramos sobre o retorno do método run()
. Para testar o operador devemos criar uma tarefa (DAG) de testes (commit 34b6548), e ao executar o código devemos esperar o mesmo comportamento da seçao 3.
Agora vamos alterar a classe para que os dados coletados sejam armazenados em arquivos. Começamos adicionando um novo parâmetro ao construtor da classe do operador, que receberá o caminho em que o arquivo deve ser salvo. Em seguida alteramos a parte "printava" os dados para que eles sejam enviado para um arquivo. De modo a garantir que o caminho do arquivo sempre exista, implementamos o método create_parent_folder
.
Sempre que instânciamos uma tarefa no Airflow podemos ter acesso a dados específicos da execução através das macros e eu posso receber essas informações para cada um dos meus parâmetros usando jinja template. Devemos indicar em quais parâmetros isso será permitido e fazemos isso através do template_fields
. O template_fields
é uma lista com os parâmtros que queremos permitir esse acesso e é definido como um parâmetro para a classe. Para que o jinja template seja tranformado, ao invés de executar o operador vou executar a tarefa fictícia, pois ela irá fazer isso(?). Agora quando executamos o código deve ser criado um arquivo JSON com os dados coletados (commit faa2f3e).
Um data pipeline é uma cadeia de processos (workflows) que manipula dados seguindo os formatos ETL ou ELT - E de extration (extração), T para transformation (tranfosrmação) e L de loading (armazenamento). Ou seja, servem para movimentar dados de uma ou mais fontes, para um ou mais destinos, (e se necessário) transformando-os durante o processo.
Data pipelines podem seguir dois modelos de funcionamento: batch e streaming. Batch significa lote, neste formato os dados são primeiro agrupados e então processados. O tamanho dos lotes e a frequência (a cada segundos ou até mesmo meses) com que seram extraídos vai depender da infraestrutura e da finalidade.
Por sua vez, streaming significa fluxo. Neste formato os dados são extraídos seguindo um fluxo constante e o processamento ocorre em tempo real conforme os dados vão chegando.