Fala, pessoal! Tudo certo?

Nessa postagem irei trazer alguns conceitos básicos de Pyspark.

Até para que possamos, posteriormente, ir avançando na ferramenta.

Trarei os seguintes conceitos:

  • Criando um ambiente de desenvolvimento.
  • Iniciando uma sessão PySpark.
  • Carregando o primeiro Dataframe.
  • Realizando uma query.
  • Salvando um Dataframe.
Lembrando que todo o código estará disponível no meu github:

https://github.com/jeffwsoares/jupyter-spark

Vamos lá?

Criando um ambiente de desenvolvimento

Para a criação do ambiente de desenvolvimento, criei uma imagem docker que contém o PySpark + Jupyterlab, que em minha opinião, é uma boa combinação de ferramentas.

FROM ubuntu:bionic

# Instalação Python
RUN apt-get update -y
RUN apt-get install python3 -y
RUN apt-get install python3-pip -y

# Instalando Jupyterlab
RUN pip3 install jupyterlab

# Instalando o PySpark
RUN pip3 install pyspark

# Instalando o Java
RUN apt-get install openjdk-8-jdk -y
ENV JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

# Portas
EXPOSE 8888

# Workdir
WORKDIR /opt/sandbox

# Entrypoint
ENTRYPOINT [ "jupyter-lab","--no-browser","--ip=\"0.0.0.0\"", "--allow-root" ]

Agora que vimos o Dockerfile, vamos criar um docker compose, assim, poderemos atribuir uma pasta para a persistência de dados e também um IP fixo para a aplicação.

# ----------------------------------------------
# Autor: Jefferson Soares
# Contato: contato@thedataengineer.com.br
# Descrição: Ambiente Jupyter + Spark
# ----------------------------------------------

version: '3.3' 
services:

  jupyter_tde:
    build:
      context: .
      dockerfile: Dockerfile
    hostname: jupyter
    volumes:        
      - ./data:/opt/sandbox    
    ports:      
      - "8888:8888"      
    networks:
      thedataengineer:
        ipv4_address: 192.10.0.111

networks:
  thedataengineer:
    ipam:
      driver: default
      config:
        - subnet: "192.10.0.0/24" 

Para realizar o build da nossa imagem, devemos entrar no diretório da aplicação e digitar o seguinte comando:

docker-compose build

Depois de buildar a imagem, podemos iniciar a aplicação com o comando abaixo:

docker-compose up

Agora, basta acessarmos endereço que aparece no terminal:

Iniciando uma sessão PySpark

Para iniciar uma sessão PySpark, podemos utilizar a sintaxe abaixo:

from pyspark.sql import SparkSession
spark=SparkSession.builder.master("local[*]").appName("thedataengineer.com.br").getOrCreate()

Carregando o primeiro Dataframe

Para o primeiro Dataframe, estou utilizando uma base do Kaggle sobre as cidades brasileiras.

Você pode baixar no seguinte link:

Brazilian Cities

Se você clonou o repositório, você pode colocar a base no diretório “data”, ou se preferir, pode fazer o upload através do próprio jupyter lab.

Uma vez realizada a carga da base, podemos criar um Dataframe com o comando abaixo:

df = spark.read.option("header",True).csv("BRAZIL_CITIES.csv", sep=";")

Basicamente, consideramos o header do arquivo como o nome das colunas, passamos o arquivo e o separador. 

Depois disso, já podemos executar.

Como podemos ver, os dados foram carregados com sucesso!

Realizando uma query

Agora, vamos realizar a primeira query, a sintaxe é bem simples!

No comando abaixo, é realizada uma query filtrando os casos do estado de São Paulo:

df.filter(df.STATE == 'SP').show(5)

Salvando um Dataframe

Para salvar um Dataframe, também é muito simples e temos diversos formatos, como csv, parquet ou orc.

Abaixo como salvar nossa base filtrada com o estado de São Paulo, nesses diversos formatos:

df.filter(df.STATE == 'SP') \
    .withColumnRenamed("IBGE_60+","IBGE_60") \
    .withColumnRenamed("IBGE_CROP_PRODUCTION_$","IBGE_CROP_PRODUCTION") \
    .withColumnRenamed("IDHM Ranking 2010","IDHM_Ranking_2010") \
    .withColumnRenamed(" GVA_TOTAL ","GVA_TOTAL") \
    .write.mode("overwrite").parquet("sao_paulo.parquet") 

df.filter(df.STATE == 'SP') \
    .withColumnRenamed("IBGE_60+","IBGE_60") \
    .withColumnRenamed("IBGE_CROP_PRODUCTION_$","IBGE_CROP_PRODUCTION") \
    .withColumnRenamed("IDHM Ranking 2010","IDHM_Ranking_2010") \
    .withColumnRenamed(" GVA_TOTAL ","GVA_TOTAL") \
    .write.mode("overwrite").save("sao_paulo.orc")

df.filter(df.STATE == 'SP') \
    .withColumnRenamed("IBGE_60+","IBGE_60") \
    .withColumnRenamed("IBGE_CROP_PRODUCTION_$","IBGE_CROP_PRODUCTION") \
    .withColumnRenamed("IDHM Ranking 2010","IDHM_Ranking_2010") \
    .withColumnRenamed(" GVA_TOTAL ","GVA_TOTAL") \
    .write.mode("overwrite").csv("sao_paulo.csv")

Como pode-se notar, tive que renomear algumas colunas antes de salvar, pois haviam caracteres especiais nos nomes.

Você pode olhar o schema do Dataframe com o comando abaixo:

df.printSchema()

Conclusão

Espero que tenham gostado dessa pequena introdução do PySpark!

Embora bem simples, isso já nos dá um norte de por onde seguir!

Uma boa fonte de informações é o site PySpark By Examples, lá tem muita informação, vale a pena conferir!

Por hoje é só!

Até logo! 🙂

Categorias: PySpark

Jefferson Soares

Olá! Sou Jefferson. Trabalho com: Dados, Dashboards, SQL, SAS, Python e muito mais! Criei esse cantinho para postar alguns conhecimentos. :)

0 0 votos
Article Rating
Inscrever-se
Notificar de
guest
1 Comentário
mais antigos
mais recentes Mais votado
Feedbacks embutidos
Ver todos os comentários
Ariana Vasconcelos
Ariana Vasconcelos
15 dias atrás

Adorei conhecer seu blog, tem muito artigos bem interessantes.

Última edição 9 horas atrás por Jefferson Soares