Fala, pessoal! Tudo certo?

Espero que sim!

Recentemente adquiri o livro “97 things every data engineer should know”, o livro retrata 97 dicas de diversos engenheiros de dados ao redor do mundo, e é bem curiosa a forma como eles realizaram a escrita do livro, onde você pode abrir o livro em qualquer uma das páginas e simplesmente pescar uma dica.

Ele não trás em seu conteúdo algo encadeado com uma ordem logica do mais simples ao complexo, lendo-o, me senti em um papo com cada um dos engenheiros que me traziam essas dicas.

E uma dessas dicas, vem da engenheira Emily Riederer, na pagina 28 do livro, sobre “nome de colunas como contrato”.

Pesquisando sobre ela, achei um artigo em seu blog muito mais completo do que o livros nos trás.

Sugiro dar uma olhada no artigo: https://emilyriederer.netlify.app/post/column-name-contracts/, e também dar uma olhada no que ela vem produzindo por lá, o conteúdo é muito legal!

O que seriam nomes de colunas como contrato?

No desenvolvimento de software, temos diversos padrões de nomenclatura que nos auxiliam no entendimento de uma função, programa, arquivo, etc.

Por exemplo, quando usamos o pytest para escrever testes para nossa aplicação, usamos obrigatoriamente o prefixo “test” antes da função, conforme o exemplo abaixo:

def inc(x):
    return x + 1

def test_answer():
    assert inc(3) == 5

Claro, no caso dos testes, o próprio pytest nos “obriga” a usar o prefixo, mas creio que deu para ilustrar o exemplo da utilização de padrões para uma determinada situação.

Do mesmo modo, informações padronizadas nos nomes das colunas podem trazer um grande entendimento sobre o tipo de dados que temos dentro dela, bem como quais são as validações que podemos realizar e indo um pouco mais longe, até mesmo criar validações padronizadas dado o nome da coluna.

Padrão de colunas em três níveis

A autora, separa em 3 níveis o padrão de nomenclatura, veja:

  1. Tipo da coluna.
  2. Assunto da coluna.
  3. Detalhes da coluna.

Com isso podemos contar a “historia” da coluna, veja o exemplo abaixo:

Só de olharmos o nome da coluna, sabemos que é a data de inicio da proposta, certo? 🙂

Sabendo disso, poderíamos executar testes para essa coluna de acordo com o nosso contexto, por exemplo:

  • Essa coluna sempre tem a data preenchida (not null).
  • Essa coluna sempre tem a data menor ou igual a coluna DT_FINAL_PROPOSTA.

Sei que parece meio obvio, mas são essas pequenas coisas que irão dar qualidade em nossos dados lá no final.

Normalmente, poderíamos criar constraints dentro do nosso banco para garantir essas regras, mas digamos que recebemos essas informações em csv ou txt e carregamos em nosso data lake?

O ideal é criarmos testes para ter o feedback rápido de que algo está errado.

Exemplo de testes

Vamos aproveitar o exemplo acima e desenhar alguns testes para essa tabela? Vou usar o pyspark + pytest para formular alguns testes para a base.

Você pode encontrar os scripts utilizados no github do blog: https://github.com/thedataengineerblog/column-names-as-contracts

Basicamente, criei uma função (transform) que realiza um simples cast, colocando as tipagens dos dados. O normal é termos algumas regras de negocio, pois o objetivo de testes é o feedback rápido.

Abaixo a função que realiza a transformação dos dados:

def transform(df):
    df = (
        df
        .withColumn("ID_NUM_PROPOSTA", col("ID_NUM_PROPOSTA").cast('integer'))
        .withColumn("DT_INICIO_PROPOSTA", to_date("DT_INICIO_PROPOSTA"))
        .withColumn("DT_FIM_PROPOSTA", to_date("DT_FIM_PROPOSTA"))
        .withColumn("VL_PROPOSTA", col("VL_PROPOSTA").cast('double'))
    )
    return df

Então criei algumas funções que irão testar nossas premissas de qualidade.

@pytest.fixture()
def table():    
    spark = SparkSession.builder.getOrCreate()
    
    df = (
        spark
        .read
        .option("header", True)
        .csv("data/proposta.csv", sep=";")
    )    
        
    yield transform(df)
    
class TestTablePropostas():

    def test_if_exists_dt_init_greater_than_dt_end(self, table):        
        qtd = table.filter(col("DT_INICIO_PROPOSTA")>col("DT_FIM_PROPOSTA") ).count()
        assert qtd == 0

    def test_if_exists_nulls(self, table):
        cols = [
                    'ID_NUM_PROPOSTA',
                    'DT_INICIO_PROPOSTA',
                    'DT_FIM_PROPOSTA',
                    'VL_PROPOSTA',
              ]        
        for c in cols:
            assert 0 == table.filter(col(c).isNull()).count(), f"{c}: contains nulls"

Dentro do repo, temos 2 bases. Uma com erros e outra sem erros.

Primeiro vamos testar a sem erros.

Você pode alterar a base na função table.

Para executar o teste, basta estar com o ambiente ativado (existem alguns passos no repo para a preparação do ambiente) e digitar “pytest” em seu console.

Como pode-se ver, na base sem erros, nossos testes passaram tranquilamente.

Agora, mudamos a base para com erros e executamos o pytest novamente:

Como pudemos ver, os dois testes falharam e tivemos o feedback rápido da qualidade do processo o que poderia evitar que esses dados inconsistentes parassem no nosso lake.

Conclusão

Eu já havia visto esse tipo de padrão anteriormente no trabalho, mas nunca vi estruturado da maneira que a Emilly Riederer estruturou.

Espero que tenham gostado dessa dica, eu gostei bastante de fazer esse post e entendo o quanto que isso pode impactar na qualidade de nosso pipeline.

Obrigado por lêr e até logo!

🙂


Jefferson Soares

Olá! Sou Jefferson. Trabalho com: Dados, Dashboards, SQL, SAS, Python e muito mais! Criei esse cantinho para postar alguns conhecimentos. :)

0 0 votes
Article Rating
Subscribe
Notify of
guest
0 Comentários
Inline Feedbacks
View all comments