Code Monkey home page Code Monkey logo

airflow-spark's Introduction

Data Pipelines com Apache Airflow e Spark

Construção de um data pipeline para um processo de ELT seguindo o modelo batch. O processo roda uma vez ao dia e armazena os dados em um data lake no formato JSON. Esse repositório foi feito acompanhando o curso "Engenharia de dados: Conhecendo Apache Airflow" da Alura.

O objetivo da aplicação é extrair dados do Twitter e tranformar esses dados de forma que possam ser analisados da forma mais simples possível.

As principais ferramentas utilizadas são: Apache Airflow, para orquestrar os processos (agendar o horário de cada processo e monitorar sua execução); e Apache Spark para processamento distribuído.

  1. Rodando a aplicação
  2. Preparando o ambiente virtual
  3. Iniciado o Airflow
  4. Consumindo a API do Twitter
  5. Airflow

0 Rodando a aplicação

Após clonar o projeto, execute os comandos abaixo na raiz do diretório clonado:

  1. Instalar o pacote responsável por gerenciar os ambientes virtuais: apt-get install python3-venv
  2. Criar ambiente virtual: python3 -m venv .env
  3. Carregar ambiente virtual: source .env/bin/activate
  4. Ativar a variável de ambiente: export AIRFLOW_HOME=$(pwd)/airflow
  5. Iniciar o web service: airflow webserver
  6. Acessando o web service: http://localhost:8080
  7. Iniciar o agendador: airflow scheduler

1 Preparando o ambiente (virtual)

  • Instalação do pacote no sistema:
apt-get install python3-venv
  • Criação do ambiente dentro da pasta que vai ficar a aplicação:
mkdir airflow-spark
cd airflow-spark
python3 -m venv .env
  • Carregando o ambiente:
source .env/bin/activate
  • Antes de instalar o Airflow é necessário criar uma variável de ambiente que aponte para pasta onde ele vai ficar. É necessário manter essa variável ativa para que o Airflow não seja instalado na home do usuário, e depois de instalado, para que o terminal não vá la procurar pela aplicação.
export AIRFLOW_HOME=$(pwd)/airflow
  • Instalando o Apache Airflow:
pip install apache-airflow==1.10.14 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-1.10.14/constraints-3.7.txt"
  • Criando o banco de dados do Airflow:
airflow db init

Esse comando cria uma pasta chamada airflow com arquivos de configurações (airflow.cfg), um banco SQLite (airflow.db), pasta de logs e testes unitários (unittests.cfg).

↑ voltar ao topo

2 Iniciado o Airflow

Para começar a trabalhar com o Airflow devemos rodar o web service, que é a interface gráfica do Airflow:

airflow webserver

Após rodar o comando podemos observar algumas saídas no terminal como: o executor utilizado (sequencial, vem configurado por padrão), o local onde ficam armazenados os dags e a porta em que ele está rodando (Listening at).

Agora podemos acessar o web service no navegador através do endereço http://localhost:8080. Ao carregar podemos ver uma tabel com DAGs, suas informações de controle e alguns links. No topo da página existe um aviso informado que o scheduler (agendador) não está rodando, então executamos o comando abaixo para iniciar esse segundo serviço:

airflow scheduler

O agendador é o serviço que executa o heart beat, que fica observando quando um processo deve ser executado.

↑ voltar ao topo

3 Consumindo a API do Twitter

Vamos extrair os dados do Twitter consumindo sua API. Para isso é necessário primeiro criar um projeto no "portal do desenvolvedor" do Twitter. Em seguida começamos escrever o código Python a partir de um exemplo da própria API.

Criamos um arquivo chamado recent_search.py e colamos o código de exemplo (commit 8bba23a). Logo no começo vemos que o Bearer Token é obtido através de uma variável de ambiente do sistema operacional:

def auth():
    return os.environ.get("BEARER_TOKEN")

Podemos criar uma variável de ambiente para o _token_ou alterar o código para que essa informação seja recuperada de outro lugar (commit eee61a7). Só não é uma boa idéia deixar isso em plain text dentro do código.

Em seguida partimos para o método create_url() onde começamos alterando a variável query para que a busca seja feita em relação a "AluraOnline" e depois passamos para a variável tweet_fields, alterando os campos que devem ser retornados (a lista dos parâmetros disponíveis pode ser encontrada na documentação da API).

Também criamos outras duas variáveis, uma com campos referentes aos usuários e outra com as datas de início e fim da consulta. Lembrando que a API retorna apenas dados dos últimos sete dias para uma conta gratuita (commit 5ab8e41).

Neste ponto podemos testar esse código. Então se tudo funcionou corretamente, após executar o código Python, devemos ter como retorno um JSON com três itens: data, includes e meta. Em data é um vetor com os tweets e suas informações, includes possui a chave users com um vetor com as informações dos usuários. E por fim temos o item meta, que possui um campo chamado next_token indicando que existem mais dados para retornar e para retornar esses dados devemos implementar a paginação no código (commit 150ba91).

↑ voltar ao topo

4 Airflow

  • projeto open source
  • google (clound composer) e aws
  • dag (gráficos aciclicos direcionados)
    • conjunto de passos conectados, com inicio e fim
      • podem ser executados de forma sequencial ou paralela
      • são executados através de operadores
        • são nodes de um DAG
        • caracteristicas de um operador: indepotência, isosalmento e atomicidade
        • 3 tipos de operadores
    • como limpar os dags default da instalação?
  • serviços
    • webservice: ui, monitoramento
    • scheduler: agendador, heartbeat (fica observando quando os dags devem executar), executores (sequencial, local, cluster)

4.1 Criando uma conexão

O Airflow permite acessar diversas fontes de dados externas e geralmente é necessário alguma credencial para esses acessos, usuário e senha para um banco de dados ou um token para uma API.

O Airflow possui um "repositório" que centraliza todas as conexões para que qualquer DAG possa utiliza-lás. O web service nos fornece um CRUD para as conexões, no menu Admin -> Connections.

Ao acessar a página de create podemos preencher os seguintes campos do formulário para criar a conexão com a API do Twitter:

  • Conn Id: twitter_default
  • Conn Type: HTTP
  • Host: https://api.twitter.com
  • Extra: {"Authorization": "Bearer <coloque o token aqui>"}

4.2 Criando um hook

A forma mais adequada de se conectar a alguma ferramenta externa é através de uma interface comum que seja desacoplada do pipeline. No Airflow, essas interfaces são chamadas de hooks. Nos hooks são colocados todo código necessário para interagir com a ferramenta bem como a definição da conexão que deve ser usada.

Na seção 1 (ou 2) após criarmos o Banco do Airflow, foi criada uma estrutura de pastas e arquivos de configuração, sendo a raiz uma pasta chamada airflow. Dentro dessa pasta vamos criar o arquivo plugins/hooks/twitter_hook.py e dentro desse arquivo escrevemos nossa classe de hook (commit b790f86).

Como o hook vai interagir com uma API, faz mais sentido herdarmos uma classe mais adequada para essa finalidade. Partimos do código de exemplo escrito na seção 3 e o modificamos de acordo com a classe base (commit 5e513a2).

4.3 Criando um operador

Dentro da pasta plugins vamos criar uma nova pasta chamada operator e dentro dela o arquivo twitter_operator.py. Da mesma forma como os hooks, operadores são classes que seguem uma estrutura já definida e herdam uma classe base.

Obrigatóriamente devemos implementar o método execute recebendo um parâmetro chamado context, que traz alguns parâmetros do Airflow. Este é o método chamado para executar o DAG.

No método __init__ aplicamos o decorator apply_defaults, que facilita o envio/captura de parâmetros padrâo das DAGs(?), definimos os mesmo parâmetros recebidos pelo hook e chamamos o método super (commit 70f9b98).

No passo anterior escrevemos um hook responsável por interagir com a API do Twitter, então agora vamos conectá-lo ao operador. Dessa forma teremos uma tarefa responsável por buscar os dados no Twitter e tendo as características de um operador. Por hora vamos apenas retornar os dados na saída padrão, então apenas importamos a classe de hook que escrevemos, a instânciamos no método execute e iteramos sobre o retorno do método run(). Para testar o operador devemos criar uma tarefa (DAG) de testes (commit 34b6548), e ao executar o código devemos esperar o mesmo comportamento da seçao 3.

Agora vamos alterar a classe para que os dados coletados sejam armazenados em arquivos. Começamos adicionando um novo parâmetro ao construtor da classe do operador, que receberá o caminho em que o arquivo deve ser salvo. Em seguida alteramos a parte "printava" os dados para que eles sejam enviado para um arquivo. De modo a garantir que o caminho do arquivo sempre exista, implementamos o método create_parent_folder.

Sempre que instânciamos uma tarefa no Airflow podemos ter acesso a dados específicos da execução através das macros e eu posso receber essas informações para cada um dos meus parâmetros usando jinja template. Devemos indicar em quais parâmetros isso será permitido e fazemos isso através do template_fields. O template_fields é uma lista com os parâmtros que queremos permitir esse acesso e é definido como um parâmetro para a classe. Para que o jinja template seja tranformado, ao invés de executar o operador vou executar a tarefa fictícia, pois ela irá fazer isso(?). Agora quando executamos o código deve ser criado um arquivo JSON com os dados coletados (commit faa2f3e).

↑ voltar ao topo

Apêndice

Data pipelines

Um data pipeline é uma cadeia de processos (workflows) que manipula dados seguindo os formatos ETL ou ELT - E de extration (extração), T para transformation (tranfosrmação) e L de loading (armazenamento). Ou seja, servem para movimentar dados de uma ou mais fontes, para um ou mais destinos, (e se necessário) transformando-os durante o processo.

Data pipelines podem seguir dois modelos de funcionamento: batch e streaming. Batch significa lote, neste formato os dados são primeiro agrupados e então processados. O tamanho dos lotes e a frequência (a cada segundos ou até mesmo meses) com que seram extraídos vai depender da infraestrutura e da finalidade.

Por sua vez, streaming significa fluxo. Neste formato os dados são extraídos seguindo um fluxo constante e o processamento ocorre em tempo real conforme os dados vão chegando.

↑ voltar ao topo

airflow-spark's People

Contributors

brnocesar avatar

Watchers

 avatar  avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.