Continuando a serie do pipeline de dados, agora iremos fazer a preparação do ambiente em MySQL e realizar o processo de ingestão de dados com o Streamsets.

Nessa etapa iremos adicionar dois novos itens em nosso pipeline.

O primeiro é o docker-compose, que servirá de gerenciador permitindo a orquestração e algumas definições importantes, como a rede.

O segundo um script bash em linux, ele reproduzirá um sistema que tem como saída um arquivo em um diretório.

Basicamente iremos ao site https://brasil.io/dataset/covid19/caso/ baixar os dados de covid-19 com esse script shell.

Caso você não tenha acompanhado a postagem anterior, ela é importante e parte dessa serie. Você pode encontra-lá nesse link: postagem-1

Agora vamos lá! 🙂

Ambiente

Mais uma vez utilizarei o docker como ferramenta para realizar o setup do ambiente. Também vou utilizar docker-compose para realizar a iniciação do ambiente, assim, podemos garantir os IP’s que serão utilizados.

O script abaixo demonstra como criar um contêiner para rodar um banco de dados MySQL, vamos salvar esse script como mysql.dockerfile.

FROM mysql:5.7.32
ENV MYSQL_ROOT_PASSWORD=12345

Primeiro, vamos “buildar” a imagem do MySQL, o comando abaixo faz isso:

$ docker build -f mysql.dockerfile . -t mysql-streamsets

Devemos criar também um volume para a persistência dos dados. Para isso podemos utilizar o código abaixo:

$ docker volume create mysql-data

Assim, temos nosso volume e as imagens construídas, vamos iniciar os contêineres com o docker-compose. 😀

O script abaixo faz a preparação do ambiente, subindo os dois contêineres e definindo um endereço fixo de rede.

version: "3.3" 

services:
  streamsets:
    image: streamsets 
    networks:      
        net-data-pipeline:
            ipv4_address: 172.16.0.11
    ports: 
    - "18630:18630"
    volumes:
    - streamsets-data:/opt/streamsets

  mysql-db:
    image: mysql-streamsets
    networks:      
        net-data-pipeline:
            ipv4_address: 172.16.0.10
    ports: 
    - "3306:3306"
    volumes:
    - mysql-data:/var/lib/mysql

volumes: 
  streamsets-data:
  mysql-data:

networks:
    net-data-pipeline:
        driver: bridge        
        ipam:
            driver: default
            config:
              - subnet: 172.16.0.0/24

Salvamos esse script como o nome de env-docker-compose.yaml, vamos iniciar o docker-compose com o comando abaixo:

$ docker-compose -f env-docker-compose.yaml up

Agora é possível acessar o Streamsets através do seguinte endereço: http://172.16.0.11:18630/

Lembrando que o login e senha padrões são admin/admin 🙂

Script Shell

Com nosso ambiente rodando, vamos criar o script shell para baixar os dados de covid-19.

Primeiro precisamos acessar nosso contêiner onde está o Streamsets. Dentro do contêiner, iremos criar uma pasta onde iremos manter o shell script e também os dados baixados.

O script abaixo lista os contêineres rodando, podemos acessar o contêiner do Streamsets pelo ID.

$ docker container ls

O script abaixo acessa o contêiner pelo ID

$ docker exec -it ID_DO_CONTAINER /bin/bash

Vamos criar agora o diretório onde deixaremos o nosso script.

$ mkdir /opt/streamsets/app-data

Feito isso, utilizaremos o vim, editor de texto do linux via terminal, para criarmos e editarmos nosso arquivo.

Se digitarmos o comando abaixo, iremos criar o arquivo no diretório.

$ vi /opt/streamsets/app-data/pega_dados_covid.sh

Teremos a tela abaixo:

Para entrarmos no modo de inserção, devemos apertar a tecla “I”, a palavra “–INSERT–” deve aparecer no rodapé do terminal.

Agora, podemos colar o nosso código dentro do arquivo:

cd /opt/streamsets/app-data
wget https://data.brasil.io/dataset/covid19/caso_full.csv.gz
gunzip caso_full.csv.gz

Para salvarmos os dados devemos apertar “ESC” depois “:” e então escrever “wq”.

Agora, podemos executar nosso script.

$ sh /opt/streamsets/app-data/pega_dados_covid.sh

Esse passo está terminado, poderíamos, se fosse o caso, agendar uma tarefa dentro do crontab do linux, e fazer esse script executar 1x ao dia, ou 2x ao dia.

MySQL

Antes de criarmos o pipeline dentro do Streamsets, vamos criar a tabela onde iremos carregar os dados.

Para isso, vou usar o script abaixo dentro do MySQL.

CREATE DATABASE IF NOT EXISTS COVID19;

CREATE TABLE IF NOT EXISTS T_CARGA_COVID
(
    city varchar(50),
    city_ibge_code int,
    date date,
    epidemiological_week int,
    estimated_population int,
    estimated_population_2019 int,
    is_last BOOLEAN,
    is_repeated BOOLEAN,
    last_available_confirmed float,
    last_available_confirmed_per_100k_inhabitants float,
    last_available_date date,
    last_available_death_rate float,
    last_available_deaths float,
    order_for_place int,
    place_type varchar(10),
    state varchar(5),
    new_confirmed int,
    new_deaths int
);

Você pode usar qualquer interface para a comunicação, eu fiz através de um plugin do Visual Studio Code chamado “MySQL Client”.

Os dados para criar a conexão são:

Host: 172.16.0.10

Porta: 3306

Usuário: root

Senha: 12345

Streamsets

Acessamos o endereço do Streamsets: http://172.16.0.11:18630/

Os dados para login são:

Usuário: admin

Senha: admin

Bom, antes de iniciarmos o pipeline, vamos registrar o Streamsets. Isso será útil, pois, poderemos baixar outras opções dentro da plataforma.

Vamos no ícone de engrenagens e em “Activation”.

A janela abaixo será exibida, preenchemos os dados e eles enviam uma chave para ativação no e-mail.

A tela abaixo aparecerá e então colocamos a chave recebida no e-mail e ativamos o produto.

Prontinho! Agora vamos ao que interessa. 🙂

Pipeline

Primeiro vamos realizar a instalação de um pacote. Esse pacote é a conexão JDBC, que usaremos para conectar ao banco de dados.

Em “Package Manager”, o ícone do pacote, encontramos os pacotes já instalados. Dentro dele, podemos fazer a pesquisa por “JDBC”.

Abaixo o pacote será exibido:

Nos três pontos, do lado direito, podemos encontrar a opção de instalação do pacote, clicamos e então iniciamos a instalação.

Depois da instalação, você pode reiniciar o serviço para que as alterações fiquem vigentes.

Ufa! Agora, vamos criar nosso pipeline! 🙂

Vamos em “CREATE NEW PIPELINE”, conforme a tela abaixo.

Agora, preenchemos o titulo, descrição e um label, caso queiramos.

Concluindo esse passo, o Streamsets entra no pipeline, então vemos a seguinte janela:

Como podem reparar na imagem, no canto direito temos diversos recursos para utilização, utilizaremos para ler o diretório o recurso “Directory”, pesquisando seu nome em “Type to search” .

O ícone do “Directory” será adicionado ao nosso pipeline, em suas configurações, na parte de baixo, devemos colocar os seguintes parâmetros:

General

Name: Dados Covid-19

Files

Files Directory: /opt/streamsets/app-data

File Name Pattern: caso_full.csv

Data Format

Data Format: Delimited

Delimiter Format Type: Default CSV (ignores empty lines)

Header Line: With Header Line

Lines to Skip: 0

Em “Advanced Options”:

Parse NULLs: Deixar verdadeiro

NULL constant: Deixar vazio

Quando terminamos a configuração, ainda há alguns warnings, mas isso ocorre porque não destinamos o dado para lugar algum. Então é normal.

Agora, podemos adicionar o JDBC, que irá levar os dados para o MySQL.

Então vamos novamente em “Type to search” pesquisamos “JDBC producer” e adicionamos no pipeline.

Adicionamos os parâmetros abaixo:

General

Name: Ingestão

Description: Ingestão de dados no MySQL

JDBC

JDBC Connection String: jdbc:mysql://172.16.0.10:3306/COVID19

Table Name: T_CARGA_COVID

Também devemos habilitar as configurações avançadas, isso exibirá uma guia chamada “Advanced”.

Credentials

Username: root

Password: 12345

Advanced

Init Query: TRUNCATE TABLE COVID19.T_CARGA_COVID

Como a base de dados é completa, devemos primeiro limpar a tabela antes de realizar uma nova carga, então precisamos utilizar essa opção “Init Query” para fazer um truncate.

Termos o resultado abaixo:

Podemos observar que nos dois icones temos um circulo, então realizamos a ligação do Directory com o JDBC producer através dele, clicando no circulo do Directory e arrastando até o JDBC producer, ficando com o resultado abaixo:

Nesse momento, não temos mais warnings em nosso pipeline, porém, precisamos realizar o upload do arquivo .jar do conector do MySQL.

Podemos baixar no seguinte link: https://dev.mysql.com/downloads/connector/j/

Depois de baixar, descompactamos, e na opção “External Libraries” do JDBC producer, realizamos o upload.

Pronto! Depois disso podemos fazer o “start” do pipeline.

O próprio Streamsets gera um dashboard sobre o processo, como você pode conferir abaixo:

Se houver algum erro, poderemos ver no próprio gráfico. Como podemos observar, não tivemos erros no pipeline.

Importante: Essa tabela de carga contém tanto os dados consolidados por estado quanto os dados da cidade, então eu recomendo que a documentação seja lida antes de utilizar.

Documentação: https://github.com/turicas/covid19-br/blob/master/faq.md

Resumo

Nessa postagem fizemos:

  • Criamos um contêiner rodando o MySQL.
  • Criamos um volume para que os dados sejam persistidos.
  • Criamos um arquivo de docker-compose para coordenar os contêineres e fixar o seus IP’s.
  • Criamos um programa em shellscritp que realiza o download dos dados de COVID-19.
  • Instalamos bibliotecas externas no Streamsets.
  • Registramos a aplicação do Streamsets.
  • Criamos um pipeline que lê um arquivo CSV de um diretório e carrega em uma tabela no MySQL (Processo de ingestão de dados).

Bastante coisa, não? 😀

O Streamsets é bem completo e podemos montar muitas opções de pipeline. Poderíamos buscar dados de um tópico Kafka, buscar de outro banco de dados, ler uma API, buscar dados de um servidor SFTP, entre outros…

Recomento que leiam a documentação que é bem rica.

Documentação: https://streamsets.com/documentation/controlhub/latest/help/controlhub/UserGuide/GettingStarted/DPM.html

No próximo post veremos como instalar e configurar o Apache Superset.

Então até lá! 🙂


Jefferson Soares

Olá! Sou Jefferson. Trabalho com: Dados, Dashboards, SQL, SAS, Python e muito mais! Criei esse cantinho para postar alguns conhecimentos. :)

0 0 votes
Article Rating
Subscribe
Notify of
guest
0 Comentários
Inline Feedbacks
View all comments