segunda-feira, 3 de outubro de 2022

ETL e análise de dados com Apache Spark (PySpark)

Já não se faz mais ETL como antigamente. O bom e velho Excel já não atende mais as necessidades das profissionais que trabalham com isso e deu lugar a ferramentas específicas para essa atividade (Talend, Apache Hop, SSIS). Até mesmo ferramentas de self service BI, possuem funcionalidades para realizar esse trabalho. Os Pythonistas de plantão já contavam com a Biblioteca Pandas para realizar esse trabalho, porém ao submeter um volume estrondoso de dados ao um script utilizando a dita cuja, percebe-se que ela não segura o rojão. Foi necessária a criação de uma ferramenta que possibilita-se o processamento distribuído dos dados. Surge então o ecossistema Hadoop, e mais a frente, o Apache Spark.

A ideia aqui é apresentar de forma prática a utilização do Spark, interagindo com ele através da SDK PySpark. O script lê 5 datasets, faz a conversão deste para dataframe e seguida, converte eles para views de modo a permitir a interação com os dados através de SQL.

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').getOrCreate()

clientes = spark.read.csv("C:\\Users\\gugum\\dados\\clientes.csv",
sep=';', inferSchema=True,header=True)
vendedores = spark.read.csv("C:\\Users\\gugum\\dados\\vendedores.csv",
sep=';', inferSchema=True,header=True)
produtos = spark.read.csv("C:\\Users\\gugum\\dados\\produtos.csv",
sep=';', inferSchema=True,header=True)
vendas = spark.read.csv("C:\\Users\\gugum\\dados\\vendas.csv",
sep=';', inferSchema=True,header=True)
itensvendas = spark.read.csv("C:\\Users\\gugum\\dados\\itensvenda.csv",
sep=';', inferSchema=True,header=True)

clientes.createOrReplaceTempView("clientesView")
produtos.createOrReplaceTempView("produtosView")
vendedores.createOrReplaceTempView("vendedoresView")
vendas.createOrReplaceTempView("vendasView")
itensvendas.createOrReplaceTempView("itensvendasView")

df_desnormalizado = spark.sql("""select
itensvendasView.IdProduto,
itensvendasView.Quantidade,
itensvendasView.ValorUnitario,
itensvendasView.ValorTotal,
itensvendasView.Desconto,
vendasView.IDVenda,
vendasView.Data,
vendasView.Total,
produtosView.IDProduto,
produtosView.Produto,
produtosView.Preco,
clientesView.Cliente,
clientesView.Estado,
clientesView.Sexo,
clientesView.Status,
vendedoresView.IDVendedor,
vendedoresView.Nome
from itensvendasView
inner join vendasView on vendasView.idvenda = itensvendasview.idvenda
inner join produtosView on produtosView.idproduto = itensvendasview.idproduto
inner join clientesView on clientesView.idcliente = vendasview.idcliente
inner join vendedoresView on vendedoresview.idvendedor  =  vendasview.idvendedor
""")

#vendas por cliente
teste2 = spark.sql("""select
distinct clientesView.cliente,
round(sum(itensvendasView.valorunitario)) as total_vendas
from itensvendasView
inner join vendasView on vendasView.idvenda = itensvendasview.idvenda
inner join produtosView on produtosView.idproduto = itensvendasview.idproduto
inner join clientesView on clientesView.idcliente = vendasview.idcliente
inner join vendedoresView on vendedoresview.idvendedor  =  vendasview.idvendedor
--where clientesView.cliente = 'Bernardete Águeda'
group by clientesview.cliente
order by total_vendas desc
""")
teste2.show(250)
df_desnormalizado.to_csv("vendas.csv")

Ao final é feita uma análise dos dados processados e em seguida, é gerado um arquivo CSV com dados desnormalizados.

Lembrando que esse código foi escrito para fins didáticos. Numa solução feita para ambiente produtivo, as tarefas seriam separadas, formando um Pipeline a ser orquestrado e startado por alguma ferramenta do tipo Job Scheduling.

Até a próxima pessoal!

Nenhum comentário: