Salve, pessoal!
Tudo certo?
Você conhece o que é CDC (Change Data Capture)?
Basicamente, CDC é uma técnica que, com base na leitura dos logs de um determinado banco transacional, realiza a leitura desses logs e “captura” esse evento, permitindo que nós realizemos a ingestão desse evento (normalmente via streaming de dados), seja ele delete, update ou insert.
Hoje iremos conhecer o Debezium, uma ferramenta open source para captura de mudanças em bancos de dados que é amplamente utilizada dentro do mundo de engenharia de dados.
O que iremos fazer?
Prepararei um ambiente em kubernetes que terá:
- Um pod rodando Postgres.
- Um pod rodando o Kafka para simularmos um broker.
- Um pod rodando o Kafka UI para vermos as mensagens.
- Um pod rodando o Debezium onde iremos cadastrar nossos conectores.
- Um pod rodando o Zookeeper (Usado com o Kafka).
- Um pod com uma aplicação Python realizando a simulação das transações (insert, update e delete).
Você poderá achar todos os arquivos nesse repositório, caso queira realizar o experimento: https://github.com/jeffwsoares/cdc-postgres
Preparando o ambiente
Deixei no repositório um arquivo chamado Makefile contendo todos os passos para subirmos o ambiente.
Se você tem duvidas de como usar um arquivo do tipo makefile, dê uma olhada nesse link aqui: https://makefiletutorial.com/
Para esse tutorial é necessário que você tenha o kubernetes instalado, caso não tenha, sugiro ir na postagem seguinte que eu explico como fazer: https://thedataengineer.com.br/2022/11/22/iniciando-com-kubernetes-parte-1/
Feito isso, vamos ao primeiro comando que irá preparar os pods.
make k8s-setup
Para conferir se o processo está rodando, utilize o comando abaixo:
make k8s-get-all
Como resultado, você deve ter a saída abaixo:
*** Um ultimo ponto de atenção para o ambiente.
Eu precisei redefinir os recursos usados no minikube, pois o default é bem baixo para o ambiente que estamos fazendo. Caso precise, você pode seguir os comandos abaixo:
minikube stop
minikube config set memory 8192
minikube config set cpus 4
minikube start
Sobre o Debezium
Podemos utilizar diversos conectores dentro do Debezium para recuperar as mudanças ocorridas no banco de dados.
Dê uma olhada nos conectores suportados até aqui no momento dessa postagem: https://debezium.io/documentation/reference/stable/connectors/index.html
Uma vez que o pod esteja executando (demora um pouquinho para tudo estar on-line), podemos interagir com o Debezium utilizando a API Rest que a aplicação disponibiliza.
Para verificar se o Debezium já está rodando, podemos ver com o Kafka UI se os tópicos que ele utiliza para controle já estão criados.
Como estamos usando kubernetes, criei um atalho no Makefile que podemos usar para redirecionar o trafego do Kafka UI para o localhost da maquina, basta digitar o comando abaixo:
make k8s-kui-pf
Feito isso, podemos acessar o localhost:8080 para checarmos os tópicos.
*** Os tópicos do Debezium são criados conforme algumas variáveis de ambiente que passamos dentro da definição do Pod.
Veja o arquivo: dbz-connect-pod.yaml
Cadastrando o Connector no Debezium
Uma vez que temos os tópico do Debezium criados, vamos realizar o cadastro do nosso conector que será responsável por capturar as mudanças ocorridas no banco de dados.
Para o exemplo, vou usar apenas uma tabela chamada customers, mas podemos ter múltiplas tabelas dentro de um mesmo cadastro.
Dentro do Pod do app, temos um ConfigMap com a aplicação que cria a tabela e simula as transações.
Veja o arquivo: app-cm.yaml
Vamos usar o Postman para realizar o cadastro do nosso conector, então primeiramente iremos realizar o redirecionamento da porta do Debezium para o localhost:8083 usando nosso Makefile.
make k8s-dbz-pf
Feito isso, estamos preparados para cadastrar o conector via Postman.
Usando o json abaixo:
{
"name": "postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres-svc.ns-dbz-data-engineer.svc.cluster.local",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "debezium_db",
"database.server.name": "postgres",
"table.include.list": "public.CUSTOMERS",
"plugin.name": "pgoutput"
}
}
Os conectores tem diversas opções de parâmetros, recomento uma olhada na documentação documentação: https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-connector-properties
Agora temos o conector cadastrado! Veja:
Após um tempo, podemos ver o tópico (postgres.public.customers) criado no Kafka UI:
Quando clicamos em postgres.public.customers>mensagens no Kafka UI.
Podemos ver os dados que são inseridos, atualizados e deletados!
Basta vermos a propriedade “payload”, basicamente a logica é a seguinte:
No payload, temos as propriedades after e before. Cada uma das propriedade contém os dados que tiveram transações no banco.
Quando a propriedade before estiver com o valor “null”, significa que a transação é um insert.
Quando a propriedade after estiver com o valor “null”, significa que a transação é um delete.
Quando a propriedade after e before tiverem valores preenchidos, teremos o estado de como estava o registro e como ficou, no caso, um update.
Exemplo de dado inserido:
Exemplo de dado deletado.
Exemplo de dado atualizado.
Conclusão
Change Data Capture é um tópico muito interessante da engenharia de dados e é uma forma muito prática de realizar ingestão de dados. Poderíamos, por exemplo, plugar um SparkStreaming no Kafka e realizar essas ingestões em nrt.
Caso queira, você pode modificar os arquivos e testar em outros conectores, como Cassandra e Mongo, por exemplo!
Espero que você tenha gostado da postagem de hoje e que ela te ajude no seu dia a dia!
Um abraço!
🙂