Fala, pessoal! Tudo certo?

Nessa postagem irei trazer alguns conceitos básicos de Pyspark.

Até para que possamos, posteriormente, ir avançando na ferramenta.

Trarei os seguintes conceitos:

  • Criando um ambiente de desenvolvimento.
  • Iniciando uma sessão PySpark.
  • Carregando o primeiro Dataframe.
  • Realizando uma query.
  • Salvando um Dataframe.
Lembrando que todo o código estará disponível no meu github:

https://github.com/jeffwsoares/jupyter-spark

Vamos lá?

Criando um ambiente de desenvolvimento

Para a criação do ambiente de desenvolvimento, criei uma imagem docker que contém o PySpark + Jupyterlab, que em minha opinião, é uma boa combinação de ferramentas.

FROM ubuntu:bionic

# Instalação Python
RUN apt-get update -y
RUN apt-get install python3 -y
RUN apt-get install python3-pip -y

# Instalando Jupyterlab
RUN pip3 install jupyterlab

# Instalando o PySpark
RUN pip3 install pyspark

# Instalando o Java
RUN apt-get install openjdk-8-jdk -y
ENV JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

# Portas
EXPOSE 8888

# Workdir
WORKDIR /opt/sandbox

# Entrypoint
ENTRYPOINT [ "jupyter-lab","--no-browser","--ip=\"0.0.0.0\"", "--allow-root" ]

Agora que vimos o Dockerfile, vamos criar um docker compose, assim, poderemos atribuir uma pasta para a persistência de dados e também um IP fixo para a aplicação.

# ----------------------------------------------
# Autor: Jefferson Soares
# Contato: contato@thedataengineer.com.br
# Descrição: Ambiente Jupyter + Spark
# ----------------------------------------------

version: '3.3' 
services:

  jupyter_tde:
    build:
      context: .
      dockerfile: Dockerfile
    hostname: jupyter
    volumes:        
      - ./data:/opt/sandbox    
    ports:      
      - "8888:8888"      
    networks:
      thedataengineer:
        ipv4_address: 192.10.0.111

networks:
  thedataengineer:
    ipam:
      driver: default
      config:
        - subnet: "192.10.0.0/24" 

Para realizar o build da nossa imagem, devemos entrar no diretório da aplicação e digitar o seguinte comando:

docker-compose build

Depois de buildar a imagem, podemos iniciar a aplicação com o comando abaixo:

docker-compose up

Agora, basta acessarmos endereço que aparece no terminal:

Iniciando uma sessão PySpark

Para iniciar uma sessão PySpark, podemos utilizar a sintaxe abaixo:

from pyspark.sql import SparkSession
spark=SparkSession.builder.master("local[*]").appName("thedataengineer.com.br").getOrCreate()

Carregando o primeiro Dataframe

Para o primeiro Dataframe, estou utilizando uma base do Kaggle sobre as cidades brasileiras.

Você pode baixar no seguinte link:

Brazilian Cities

Se você clonou o repositório, você pode colocar a base no diretório “data”, ou se preferir, pode fazer o upload através do próprio jupyter lab.

Uma vez realizada a carga da base, podemos criar um Dataframe com o comando abaixo:

df = spark.read.option("header",True).csv("BRAZIL_CITIES.csv", sep=";")

Basicamente, consideramos o header do arquivo como o nome das colunas, passamos o arquivo e o separador. 

Depois disso, já podemos executar.

Como podemos ver, os dados foram carregados com sucesso!

Realizando uma query

Agora, vamos realizar a primeira query, a sintaxe é bem simples!

No comando abaixo, é realizada uma query filtrando os casos do estado de São Paulo:

df.filter(df.STATE == 'SP').show(5)

Salvando um Dataframe

Para salvar um Dataframe, também é muito simples e temos diversos formatos, como csv, parquet ou orc.

Abaixo como salvar nossa base filtrada com o estado de São Paulo, nesses diversos formatos:

df.filter(df.STATE == 'SP') \
    .withColumnRenamed("IBGE_60+","IBGE_60") \
    .withColumnRenamed("IBGE_CROP_PRODUCTION_$","IBGE_CROP_PRODUCTION") \
    .withColumnRenamed("IDHM Ranking 2010","IDHM_Ranking_2010") \
    .withColumnRenamed(" GVA_TOTAL ","GVA_TOTAL") \
    .write.mode("overwrite").parquet("sao_paulo.parquet") 

df.filter(df.STATE == 'SP') \
    .withColumnRenamed("IBGE_60+","IBGE_60") \
    .withColumnRenamed("IBGE_CROP_PRODUCTION_$","IBGE_CROP_PRODUCTION") \
    .withColumnRenamed("IDHM Ranking 2010","IDHM_Ranking_2010") \
    .withColumnRenamed(" GVA_TOTAL ","GVA_TOTAL") \
    .write.mode("overwrite").save("sao_paulo.orc")

df.filter(df.STATE == 'SP') \
    .withColumnRenamed("IBGE_60+","IBGE_60") \
    .withColumnRenamed("IBGE_CROP_PRODUCTION_$","IBGE_CROP_PRODUCTION") \
    .withColumnRenamed("IDHM Ranking 2010","IDHM_Ranking_2010") \
    .withColumnRenamed(" GVA_TOTAL ","GVA_TOTAL") \
    .write.mode("overwrite").csv("sao_paulo.csv")

Como pode-se notar, tive que renomear algumas colunas antes de salvar, pois haviam caracteres especiais nos nomes.

Você pode olhar o schema do Dataframe com o comando abaixo:

df.printSchema()

Conclusão

Espero que tenham gostado dessa pequena introdução do PySpark!

Embora bem simples, isso já nos dá um norte de por onde seguir!

Uma boa fonte de informações é o site PySpark By Examples, lá tem muita informação, vale a pena conferir!

Por hoje é só!

Até logo! 🙂

Categorias: PySpark

Jefferson Soares

Olá! Sou Jefferson. Trabalho com: Dados, Dashboards, SQL, SAS, Python e muito mais! Criei esse cantinho para postar alguns conhecimentos. :)

0 0 votes
Article Rating
Subscribe
Notify of
guest
0 Comentários
Inline Feedbacks
View all comments