TudoSobre.dev

O universo dev ao seu alcance.

TudoSobre.dev

O universo dev ao seu alcance.

Apache Kafka e .Net: como produzir e consumir mensagens

O Apache Kafka é uma ferramenta útil em vários cenários, como integrações de sistemas, processamento assíncrono, processamento de streams, microsserviços, etc. Bastante usado por muitas empresas, possui uma vasta comunidade. Neste artigo, vamos ver como usar o Apache Kafka no .Net, com exemplos de produção e consumo de mensagens.


Rodando o Kafka localmente usando o Docker Compose

É bastante simples rodar o Kafka localmente, usando o docker compose (é necessário ter o docker instalado). Crie um arquivo chamado docker-compose.yml e adicione o seguinte conteúdo:

services:
  broker:
    image: apache/kafka:latest
    hostname: broker
    container_name: broker
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
      KAFKA_LISTENERS: PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk

Depois, estando na mesma pasta onde está o arquivo docker-compose.yml, digite o seguinte comando:

 docker compose up -d

Em instantes o Kafka estará em execução na porta 9092.


Definição das aplicações

Vamos criar duas aplicações do tipo console, uma produtora e uma consumidora. A produtora enviará as mensagens para o tópico “mensagens” do Kafka; a consumidora ficará “escutando” o tópico “mensagens”, e quando uma mensagem chegar, ela será recebida e tratada.

Este será um cenário simples, mas que pode atender as necessidades em muitos casos reais, onde uma aplicação necessita enviar informações para outra.

Aplicação Produtora

Vamos criar a aplicação produtora de mensagens, ou seja, a aplicação que vai enviar mensagens através do Kafka. No Visual Studio 2022, crie um projeto do tipo Blank Solution e dê o nome de KafkaLab. Depois, adicione à Solution criada uma aplicação do tipo Console usando o .Net 9 (pode ser também a versão 6, 7 ou 8) e dê o nome de ProducerApp. Via Nuget, adicione o pacote Confluent.Kafka. Agora vamos para o código:

// APLICAÇÃO PRODUTORA
using Confluent.Kafka;

// Configuração do Kafka
var producerConfig = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
};
var producer = new ProducerBuilder<string, string>(producerConfig).Build();

Console.CancelKeyPress += Console_CancelKeyPress;

Console.WriteLine("Digite a mensagem a ser enviada.");
Console.WriteLine("- Tecle ENTER para enviar");
Console.WriteLine("- Tecle Ctrl + C para sair");
string topic = "mensagens";

while (true)
{
    var msg = Console.ReadLine();
    if (string.IsNullOrWhiteSpace(msg))
        continue;

    await producer.ProduceAsync(topic, new Message<string, string>() { Key = "", Value = msg });
}

void Console_CancelKeyPress(object? sender, ConsoleCancelEventArgs e)
{
    producer.Dispose();
    Environment.Exit(0);
}


Aplicação Consumidora

A aplicação consumidora será a que receberá as mensagens enviadas pela aplicação produtora. Em cenários reais, ela poderia atuar como uma aplicação que recebe os eventos gerados por outras aplicações e os processa, ou então seria a aplicação que executaria processos em background demandados por outras aplicações… Os casos de uso são infinitos! Vamos adicionar à Solution KafkaLab uma nova aplicação do tipo console igual a aplicação produtora, e dê o nome de ConsumerApp. Adicione também o pacote Confluent.Kafka. Segue o código:

// APLICAÇÃO CONSUMIDORA
using Confluent.Kafka;

// Configurando o consumer do Kafka
var consumerConfig = new ConsumerConfig()
{
    BootstrapServers = "localhost:9092",
    GroupId = "mensagens.grp.id",
    EnableAutoCommit = false 
};
var topic = "mensagens";
using var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
consumer.Subscribe(topic);

var source = new CancellationTokenSource();
CancellationToken cancellationToken = source.Token; 

Console.WriteLine("Aguardando mensagens...");
Console.WriteLine("- Tecle Ctrl + C para sair");

Console.CancelKeyPress += Console_CancelKeyPress;

while (true)
{
    try
    {
        // Obtém a mensagem quando ela chega
        var result = consumer.Consume(cancellationToken);

        if (result != null)
        {
            // Extraindo o texto enviado
            var message = result.Message.Value;
            Console.WriteLine($"[RECEBIDO] Mensagem: {message}");

            // Confirma que a mensagem foi processada com sucesso, e com isso
            // o Kafka move o offset para que esta mensagem não seja processada novamente.
            consumer.Commit();
        }
    }
    catch (OperationCanceledException)
    {
        break;
    }
    catch (ConsumeException e)
    {
        var errorMessage = $"[KAFKA] Consume error: {e.Error.Reason}";

        if (e.Error.IsFatal)
            break;
    }
    catch (Exception e)
    {
        var errorMessage = $"[KAFKA] Erro ao consumir mensagem: {e.Message}";
        Console.WriteLine($"[ERRO] {errorMessage}");
    }
}

// Ao digitar Control + C, o consumer é fechado e descartado, e o programa é encerrado.
void Console_CancelKeyPress(object? sender, ConsoleCancelEventArgs e)
{
    Console.WriteLine("Cancelando...");
    source.Cancel();
    consumer.Close();
    consumer.Dispose();
    Environment.Exit(0);
}


Executando

Para executar as duas aplicações simultaneamente, siga os seguintes passos:

Clique na opção “Configure Startup Projects” conforme imagem abaixo:

Será aberta uma janela para configurar a execução múltipla das aplicações, deixe conforme a imagem abaixo:

Após clicar em Ok, você pode executar as aplicações pressionando F5.

Examine o código, tente entender, faça alterações, experimentos, fuce bastante… Assim você vai aprender de verdade!

Em breve vamos criar duas APIs web se comunicando através do Kafka, fique atento!

Apache Kafka e .Net: como produzir e consumir mensagens

Deixe um comentário

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *

Rolar para o topo