segunda-feira, 3 de outubro de 2022

ETL e análise de dados com Apache Spark (PySpark)

Já não se faz mais ETL como antigamente. O bom e velho Excel já não atende mais as necessidades das profissionais que trabalham com isso e deu lugar a ferramentas específicas para essa atividade (Talend, Apache Hop, SSIS). Até mesmo ferramentas de self service BI, possuem funcionalidades para realizar esse trabalho. Os Pythonistas de plantão já contavam com a Biblioteca Pandas para realizar esse trabalho, porém ao submeter um volume estrondoso de dados ao um script utilizando a dita cuja, percebe-se que ela não segura o rojão. Foi necessária a criação de uma ferramenta que possibilita-se o processamento distribuído dos dados. Surge então o ecossistema Hadoop, e mais a frente, o Apache Spark.

A ideia aqui é apresentar de forma prática a utilização do Spark, interagindo com ele através da SDK PySpark. O script lê 5 datasets, faz a conversão deste para dataframe e seguida, converte eles para views de modo a permitir a interação com os dados através de SQL.

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').getOrCreate()

clientes = spark.read.csv("C:\\Users\\gugum\\dados\\clientes.csv",
sep=';', inferSchema=True,header=True)
vendedores = spark.read.csv("C:\\Users\\gugum\\dados\\vendedores.csv",
sep=';', inferSchema=True,header=True)
produtos = spark.read.csv("C:\\Users\\gugum\\dados\\produtos.csv",
sep=';', inferSchema=True,header=True)
vendas = spark.read.csv("C:\\Users\\gugum\\dados\\vendas.csv",
sep=';', inferSchema=True,header=True)
itensvendas = spark.read.csv("C:\\Users\\gugum\\dados\\itensvenda.csv",
sep=';', inferSchema=True,header=True)

clientes.createOrReplaceTempView("clientesView")
produtos.createOrReplaceTempView("produtosView")
vendedores.createOrReplaceTempView("vendedoresView")
vendas.createOrReplaceTempView("vendasView")
itensvendas.createOrReplaceTempView("itensvendasView")

df_desnormalizado = spark.sql("""select
itensvendasView.IdProduto,
itensvendasView.Quantidade,
itensvendasView.ValorUnitario,
itensvendasView.ValorTotal,
itensvendasView.Desconto,
vendasView.IDVenda,
vendasView.Data,
vendasView.Total,
produtosView.IDProduto,
produtosView.Produto,
produtosView.Preco,
clientesView.Cliente,
clientesView.Estado,
clientesView.Sexo,
clientesView.Status,
vendedoresView.IDVendedor,
vendedoresView.Nome
from itensvendasView
inner join vendasView on vendasView.idvenda = itensvendasview.idvenda
inner join produtosView on produtosView.idproduto = itensvendasview.idproduto
inner join clientesView on clientesView.idcliente = vendasview.idcliente
inner join vendedoresView on vendedoresview.idvendedor  =  vendasview.idvendedor
""")

#vendas por cliente
teste2 = spark.sql("""select
distinct clientesView.cliente,
round(sum(itensvendasView.valorunitario)) as total_vendas
from itensvendasView
inner join vendasView on vendasView.idvenda = itensvendasview.idvenda
inner join produtosView on produtosView.idproduto = itensvendasview.idproduto
inner join clientesView on clientesView.idcliente = vendasview.idcliente
inner join vendedoresView on vendedoresview.idvendedor  =  vendasview.idvendedor
--where clientesView.cliente = 'Bernardete Águeda'
group by clientesview.cliente
order by total_vendas desc
""")
teste2.show(250)
df_desnormalizado.to_csv("vendas.csv")

Ao final é feita uma análise dos dados processados e em seguida, é gerado um arquivo CSV com dados desnormalizados.

Lembrando que esse código foi escrito para fins didáticos. Numa solução feita para ambiente produtivo, as tarefas seriam separadas, formando um Pipeline a ser orquestrado e startado por alguma ferramenta do tipo Job Scheduling.

Até a próxima pessoal!

sábado, 1 de outubro de 2022

Orquestrando scripts python utilizando o pacote/biblioteca LUIGI

Em um outro post, falei de uma biblioteca chamada Pandas que serve para fazer a transformação de dados em Python. Hoje, vamos falar de uma alternativa de orquestrar Pipelines. Vejamos a biblioteca Luigi.



Vamos imaginar que você tenha dividido o seu trabalho da seguinte maneira:

$ python extracao_dos_dados.py

$ python limpeza_dos_dados.py

$ python juncao_dos_dados.py

$ python fazer_coisas_com_os_dados.py

Estas atividades são muito comuns em projetos de dados. Cada script realiza uma atividade, tudo "organizadinho". Esse conjunto de atividades é conhecido, no jargão da engenharia de dados como Pipeline. Daí toda vez que você for executar a pipeline como um todo, terá que executar cada um dos scripts. Quando estamos falando de um ou dois conjuntos de dados beleza, mas sabemos que não é assim que funcionam as coisas. Para cada conjunto de dados teríamos 4 scripts para realizar o processo mencionado acima. A medida que as pipelines se multiplicam, a situação fica muito complicada.

Daí um colega sugere de botar todas essas atividades num script só e depois uma rápida refatoração, o script faz_tudo.py pode ficar assim:

if __name__ == '__main__':

extracao_dos_dados()

limpeza_dos_dados()

juncao_dos_dados()

fazer_coisas_com_os_dados()

   

Isso é bastante simples de executar: $ python faz_tudo.py

Poderíamos também colocar tudo em um script bash, que chamaria esses scripts todos, em sequência, mas as deficiências serão mais ou menos as mesmas.

Ao avançar para um pipeline pronto para produção, há mais alguns aspectos a serem considerados além do código de execução de tudo. Em particular, o tratamento de erros deve ser levado em consideração:

try:

    extracao_dos_dados()

except ExtracaoDeDadosError as e:

    # handle this

Mas se encadearmos todas as tarefas individuais, acabamos com uma árvore de Natal de try/except:

try:

    extracao_dos_dados()

    try:

        limpeza_dos_dados()

        try:

            # Rapaz, aonde isso vai parar!

        except EvenMoreErrors:

            # ...

    except limpeza_dos_dadosError as e:

        # tratando limpeza_dos_dadosError

except extracao_dos_dadosError as e:

    # tratando extracao_dos_dadosError

Outro aspecto importante a considerar é como retomar um pipeline. Por exemplo, se as primeiras tarefas forem concluídas, mas ocorrer um erro no meio do caminho, como executaremos novamente o pipeline sem executar novamente as etapas iniciais bem-sucedidas? Poderíamos tratar com if.

# testando se uma tarefa já foi executada com sucesso.

if not i_got_the_data_already():

    # if not, run it

    try:

       limpeza_dos_dados()

    except limpeza_dos_dadosError as e:...

Percebe o quanto pode ficar complicado esse controle das pipelines, caso a pipeline seja implementada desta forma?

Para resolver essa situação de uma maneira mais elegante, eis que surgem diversas ferramentas para tratar esse problema entre elas está o pacote Luigi.

Luigi é uma ferramenta Python para gerenciamento de fluxo de trabalho. Ele foi desenvolvido pela Spotify, para ajudar a construir pipelines de dados complexos de trabalhos em lote. 

Alguns dos recursos úteis do Luigi incluem:

  • Gerenciamento de dependência
  • Pontos de verificação/recuperação de falhas
  • Integração / parametrização CLI
  • Visualização do gráfico de dependência

Existem dois conceitos principais para entender como podemos aplicar o Luigi ao nosso próprio pipeline de dados: Tarefas e Destinos. Uma tarefa é uma unidade de trabalho, projetada estendendo a classe luigi.Task e substituindo alguns métodos básicos. A saída de uma tarefa é um destino, que pode ser um arquivo no sistema de arquivos local, um arquivo no S3 da Amazon, ou ainda, algum dado em um banco de dados..

As dependências são definidas em termos de entradas e saídas, ou seja, se tarefa2 depende da tarefa1, significa que a saída da tarefa1 será a entrada da tarefa2.

Então vamos deixar de bla bla bla, partindo para a parte prática.

Para instalar o Luigi: $ pip install luigi

""" Trata-se de scripts que contem classes que extendem a
a classe luigi task
"""

import luigi
 
 #métodos que precisam ser implmentados
class Tarefa1(luigi.Task):
 #o método requires recebe as tarefas das quais ela depende.
 # neste caso a tarefa 1 não depende de nenhuma outra tarefa
    def requires(self):
        return []
 #o método output é a saída da tarefa que pode ou não virar entrada de
 #outra tarefa.
    def output(self):
        return luigi.LocalTarget("numbers_up_to_10.txt")
 #o método run é a implementação da tarefa o que ela faz.
    def run(self):
        with self.output().open('w') as f:
            for i in range(1, 11):
                f.write("{}\n".format(i))
 
class Tarefa2(luigi.Task):
    #veja que a tarefa2 depende da tarefa 1, ou seja, a tarefa 1 tem
    #que rodar primeiro para depois a tarefa dois rodar.
    def requires(self):
        return [Tarefa1()]
 
    def output(self):
        return luigi.LocalTarget("squares.txt")
 
    def run(self):
        with self.input()[0].open() as fin, self.output().open('w') as fout:
            for line in fin:
                n = int(line.strip())
                out = n * n
                fout.write("{}:{}\n".format(n, out))
                 
if __name__ == '__main__':
    luigi.run()

# para executar $ luigi -m run_luigi.py SquaredNumbers --local-scheduler

Este código apresenta duas tarefas: tarefa1 , que grava o número de 1 a 10 em um arquivo chamado numbers_up_to_10.txt , um número por linha, e tarefa2 , que lê esse arquivo e gera uma lista de pares number-square em squares.txt , também um par por linha.

Salve esse código em um arquivo chamado dag_luigi e execute o mesmo através da linha de comando.

$ luigi -m dag_luigi.py tarefa2 --local-scheduler

O primeiro argumento que estamos passando para o arquivo é o nome da última tarefa no pipeline que queremos executar. O segundo argumento simplesmente diz ao Luigi para usar um agendador local .

O Luigi cuidará de verificar as dependências entre tarefas, se a entrada de tarefa2 não estiver lá, então ele executará a tarefa tarefa1 primeiro, depois continuará com a execução.

Para criar uma tarefa, simplesmente precisamos criar uma classe que estenderá a classe luigi.Task e sobrescrever alguns métodos. Em particular:

  • require() deve retornar a lista de dependências para uma determinada tarefa — em outras palavras, uma lista de tarefas
  • output() deve retornar o destino da tarefa (por exemplo, um arquivo local, ou um a inserção dos dados em um Banco).
  • run() deve conter a lógica para executar

Luigi verificará os valores de retorno de require() e output() e construirá o gráfico de dependência entre as tarefas.

Para ver o gráfico de dependências e acompanhar a execução das tarefas,  o Luigi disponibiliza uma interface web que pode ser acessada através do endereço localhost:8082. Para disponibilizar essa interface rode no pronpt de comando a seguinte instrução em foreground $luigid ou em background $luigid --background

Interface web do luigi

Nem tudo são as mil maravilhas, o agendamento da execução da DAG é feita através do velho e bom Cron.

Existem outras alternativas e o apache Airflow é uma delas.

Bom pessoal, era isso. Obviamente tem mais coisa para ser estudada sobre essa biblioteca veja a documentação dela em https://luigi.readthedocs.io/en/stable/index.html