Salve, pessoal!

Tudo certo?

Você conhece o que é CDC (Change Data Capture)?

Basicamente, CDC é uma técnica que, com base na leitura dos logs de um determinado banco transacional, realiza a leitura desses logs e “captura” esse evento, permitindo que nós realizemos a ingestão desse evento (normalmente via streaming de dados), seja ele delete, update ou insert.

Hoje iremos conhecer o Debezium, uma ferramenta open source para captura de mudanças em bancos de dados que é amplamente utilizada dentro do mundo de engenharia de dados.

O que iremos fazer?

Prepararei um ambiente em kubernetes que terá:

  • Um pod rodando Postgres.
  • Um pod rodando o Kafka para simularmos um broker.
  • Um pod rodando o Kafka UI para vermos as mensagens.
  • Um pod rodando o Debezium onde iremos cadastrar nossos conectores.
  • Um pod rodando o Zookeeper (Usado com o Kafka).
  • Um pod com uma aplicação Python realizando a simulação das transações (insert, update e delete).

Você poderá achar todos os arquivos nesse repositório, caso queira realizar o experimento: https://github.com/jeffwsoares/cdc-postgres

Preparando o ambiente

Deixei no repositório um arquivo chamado Makefile contendo todos os passos para subirmos o ambiente.

Se você tem duvidas de como usar um arquivo do tipo makefile, dê uma olhada nesse link aqui: https://makefiletutorial.com/

Para esse tutorial é necessário que você tenha o kubernetes instalado, caso não tenha, sugiro ir na postagem seguinte que eu explico como fazer: https://thedataengineer.com.br/2022/11/22/iniciando-com-kubernetes-parte-1/

Feito isso, vamos ao primeiro comando que irá preparar os pods.

make k8s-setup

Para conferir se o processo está rodando, utilize o comando abaixo:

make k8s-get-all

Como resultado, você deve ter a saída abaixo:

*** Um ultimo ponto de atenção para o ambiente.

Eu precisei redefinir os recursos usados no minikube, pois o default é bem baixo para o ambiente que estamos fazendo. Caso precise, você pode seguir os comandos abaixo:

minikube stop
minikube config set memory 8192
minikube config set cpus 4
minikube start

Sobre o Debezium

Podemos utilizar diversos conectores dentro do Debezium para recuperar as mudanças ocorridas no banco de dados.

Dê uma olhada nos conectores suportados até aqui no momento dessa postagem: https://debezium.io/documentation/reference/stable/connectors/index.html

Uma vez que o pod esteja executando (demora um pouquinho para tudo estar on-line), podemos interagir com o Debezium utilizando a API Rest que a aplicação disponibiliza.

Para verificar se o Debezium já está rodando, podemos ver com o Kafka UI se os tópicos que ele utiliza para controle já estão criados.

Como estamos usando kubernetes, criei um atalho no Makefile que podemos usar para redirecionar o trafego do Kafka UI para o localhost da maquina, basta digitar o comando abaixo:

make k8s-kui-pf

Feito isso, podemos acessar o localhost:8080 para checarmos os tópicos.

*** Os tópicos do Debezium são criados conforme algumas variáveis de ambiente que passamos dentro da definição do Pod.

Veja o arquivo: dbz-connect-pod.yaml

Cadastrando o Connector no Debezium

Uma vez que temos os tópico do Debezium criados, vamos realizar o cadastro do nosso conector que será responsável por capturar as mudanças ocorridas no banco de dados.

Para o exemplo, vou usar apenas uma tabela chamada customers, mas podemos ter múltiplas tabelas dentro de um mesmo cadastro.

Dentro do Pod do app, temos um ConfigMap com a aplicação que cria a tabela e simula as transações.

Veja o arquivo: app-cm.yaml

Vamos usar o Postman para realizar o cadastro do nosso conector, então primeiramente iremos realizar o redirecionamento da porta do Debezium para o localhost:8083 usando nosso Makefile.

make k8s-dbz-pf

Feito isso, estamos preparados para cadastrar o conector via Postman.

Usando o json abaixo:

{
    "name": "postgres-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres-svc.ns-dbz-data-engineer.svc.cluster.local",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname" : "debezium_db",
        "database.server.name": "postgres",        
        "table.include.list": "public.CUSTOMERS",
        "plugin.name": "pgoutput"
    }
}

Os conectores tem diversas opções de parâmetros, recomento uma olhada na documentação documentação: https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-connector-properties

Agora temos o conector cadastrado! Veja:

Após um tempo, podemos ver o tópico (postgres.public.customers) criado no Kafka UI:

Quando clicamos em postgres.public.customers>mensagens no Kafka UI.

Podemos ver os dados que são inseridos, atualizados e deletados!

Basta vermos a propriedade “payload”, basicamente a logica é a seguinte:

No payload, temos as propriedades after e before. Cada uma das propriedade contém os dados que tiveram transações no banco.

Quando a propriedade before estiver com o valor “null”, significa que a transação é um insert.

Quando a propriedade after estiver com o valor “null”, significa que a transação é um delete.

Quando a propriedade after e before tiverem valores preenchidos, teremos o estado de como estava o registro e como ficou, no caso, um update.

Exemplo de dado inserido:

Exemplo de dado deletado.

Exemplo de dado atualizado.

Conclusão

Change Data Capture é um tópico muito interessante da engenharia de dados e é uma forma muito prática de realizar ingestão de dados. Poderíamos, por exemplo, plugar um SparkStreaming no Kafka e realizar essas ingestões em nrt.

Caso queira, você pode modificar os arquivos e testar em outros conectores, como Cassandra e Mongo, por exemplo!

Espero que você tenha gostado da postagem de hoje e que ela te ajude no seu dia a dia!

Um abraço!

🙂


Jefferson Soares

Olá! Sou Jefferson. Trabalho com: Dados, Dashboards, SQL, SAS, Python e muito mais! Criei esse cantinho para postar alguns conhecimentos. :)

0 0 votos
Article Rating
Inscrever-se
Notificar de
guest
0 Comentários
mais antigos
mais recentes Mais votado
Feedbacks embutidos
Ver todos os comentários