Em um outro post, falei de uma biblioteca chamada Pandas que serve para fazer a transformação de dados em Python. Hoje, vamos falar de uma alternativa de orquestrar Pipelines. Vejamos a biblioteca Luigi.
Vamos imaginar que você tenha dividido o seu trabalho da seguinte maneira:
$ python extracao_dos_dados.py
$ python limpeza_dos_dados.py
$ python juncao_dos_dados.py
$ python fazer_coisas_com_os_dados.py
Estas atividades são muito comuns em projetos de dados. Cada script realiza uma atividade, tudo "organizadinho". Esse conjunto de atividades é conhecido, no jargão da engenharia de dados como Pipeline. Daí toda vez que você for executar a pipeline como um todo, terá que executar cada um dos scripts. Quando estamos falando de um ou dois conjuntos de dados beleza, mas sabemos que não é assim que funcionam as coisas. Para cada conjunto de dados teríamos 4 scripts para realizar o processo mencionado acima. A medida que as pipelines se multiplicam, a situação fica muito complicada.
Daí um colega sugere de botar todas essas atividades num script só e depois uma rápida refatoração, o script faz_tudo.py pode ficar assim:
if __name__ == '__main__':
extracao_dos_dados()
limpeza_dos_dados()
juncao_dos_dados()
fazer_coisas_com_os_dados()
Isso é bastante simples de executar: $ python faz_tudo.py
Poderíamos também colocar tudo em um script bash, que chamaria esses scripts todos, em sequência, mas as deficiências serão mais ou menos as mesmas.
Ao avançar para um pipeline pronto para produção, há mais alguns aspectos a serem considerados além do código de execução de tudo. Em particular, o tratamento de erros deve ser levado em consideração:
try:
extracao_dos_dados()
except ExtracaoDeDadosError as e:
# handle this
Mas se encadearmos todas as tarefas individuais, acabamos com uma árvore de Natal de try/except:
try:
extracao_dos_dados()
try:
limpeza_dos_dados()
try:
# Rapaz, aonde isso vai parar!
except EvenMoreErrors:
# ...
except limpeza_dos_dadosError as e:
# tratando limpeza_dos_dadosError
except extracao_dos_dadosError as e:
# tratando extracao_dos_dadosError
Outro aspecto importante a considerar é como retomar um pipeline. Por exemplo, se as primeiras tarefas forem concluídas, mas ocorrer um erro no meio do caminho, como executaremos novamente o pipeline sem executar novamente as etapas iniciais bem-sucedidas? Poderíamos tratar com if.
# testando se uma tarefa já foi executada com sucesso.
if not i_got_the_data_already():
# if not, run it
try:
limpeza_dos_dados()
except limpeza_dos_dadosError as e:...
Percebe o quanto pode ficar complicado esse controle das pipelines, caso a pipeline seja implementada desta forma?
Para resolver essa situação de uma maneira mais elegante, eis que surgem diversas ferramentas para tratar esse problema entre elas está o pacote Luigi.
Luigi é uma ferramenta Python para gerenciamento de fluxo de trabalho. Ele foi desenvolvido pela Spotify, para ajudar a construir pipelines de dados complexos de trabalhos em lote.
Alguns dos recursos úteis do Luigi incluem:
- Gerenciamento de dependência
- Pontos de verificação/recuperação de falhas
- Integração / parametrização CLI
- Visualização do gráfico de dependência
Existem dois conceitos principais para entender como podemos aplicar o Luigi ao nosso próprio pipeline de dados: Tarefas e Destinos. Uma tarefa é uma unidade de trabalho, projetada estendendo a classe luigi.Task e substituindo alguns métodos básicos. A saída de uma tarefa é um destino, que pode ser um arquivo no sistema de arquivos local, um arquivo no S3 da Amazon, ou ainda, algum dado em um banco de dados..
As dependências são definidas em termos de entradas e saídas, ou seja, se tarefa2 depende da tarefa1, significa que a saída da tarefa1 será a entrada da tarefa2.
Então vamos deixar de bla bla bla, partindo para a parte prática.
Para instalar o Luigi: $ pip install luigi
""" Trata-se de scripts que contem classes que extendem a
a classe luigi task
"""
import luigi
#métodos que precisam ser implmentados
class Tarefa1(luigi.Task):
#o método requires recebe as tarefas das quais ela depende.
# neste caso a tarefa 1 não depende de nenhuma outra tarefa
def requires(self):
return []
#o método output é a saída da tarefa que pode ou não virar entrada de
#outra tarefa.
def output(self):
return luigi.LocalTarget("numbers_up_to_10.txt")
#o método run é a implementação da tarefa o que ela faz.
def run(self):
with self.output().open('w') as f:
for i in range(1, 11):
f.write("{}\n".format(i))
class Tarefa2(luigi.Task):
#veja que a tarefa2 depende da tarefa 1, ou seja, a tarefa 1 tem
#que rodar primeiro para depois a tarefa dois rodar.
def requires(self):
return [Tarefa1()]
def output(self):
return luigi.LocalTarget("squares.txt")
def run(self):
with self.input()[0].open() as fin, self.output().open('w') as fout:
for line in fin:
n = int(line.strip())
out = n * n
fout.write("{}:{}\n".format(n, out))
if __name__ == '__main__':
luigi.run()
# para executar $ luigi -m run_luigi.py SquaredNumbers --local-scheduler
Este código apresenta duas tarefas: tarefa1 , que grava o número de 1 a 10 em um arquivo chamado numbers_up_to_10.txt , um número por linha, e tarefa2 , que lê esse arquivo e gera uma lista de pares number-square em squares.txt , também um par por linha.
Salve esse código em um arquivo chamado dag_luigi e execute o mesmo através da linha de comando.
$ luigi -m dag_luigi.py tarefa2 --local-scheduler
O primeiro argumento que estamos passando para o arquivo é o nome da última tarefa no pipeline que queremos executar. O segundo argumento simplesmente diz ao Luigi para usar um agendador local .
O Luigi cuidará de verificar as dependências entre tarefas, se a entrada de tarefa2 não estiver lá, então ele executará a tarefa tarefa1 primeiro, depois continuará com a execução.
Para criar uma tarefa, simplesmente precisamos criar uma classe que estenderá a classe luigi.Task e sobrescrever alguns métodos. Em particular:
- require() deve retornar a lista de dependências para uma determinada tarefa — em outras palavras, uma lista de tarefas
- output() deve retornar o destino da tarefa (por exemplo, um arquivo local, ou um a inserção dos dados em um Banco).
- run() deve conter a lógica para executar
Luigi verificará os valores de retorno de require() e output() e construirá o gráfico de dependência entre as tarefas.
Para ver o gráfico de dependências e acompanhar a execução das tarefas, o Luigi disponibiliza uma interface web que pode ser acessada através do endereço localhost:8082. Para disponibilizar essa interface rode no pronpt de comando a seguinte instrução em foreground $luigid ou em background $luigid --background
Interface web do luigi
Nem tudo são as mil maravilhas, o agendamento da execução da DAG é feita através do velho e bom Cron.
Existem outras alternativas e o apache Airflow é uma delas.
Bom pessoal, era isso. Obviamente tem mais coisa para ser estudada sobre essa biblioteca veja a documentação dela em https://luigi.readthedocs.io/en/stable/index.html