Este artigo é um artigo espelhado de tradução automática, por favor clique aqui para ir para o artigo original.

Vista: 129246|Resposta: 17

[Fonte] Operações Kafka para filas de mensagens .NET/C# [com código-fonte]

[Copiar link]
Publicado em 13/04/2021 11:45:31 | | | |
Kafka é um sistema de mensagens distribuídas de alto desempenho desenvolvido pelo LinkedIn, amplamente utilizado em cenários como coleta de logs, processamento de dados em streaming, distribuição de mensagens online e offline, entre outros. Embora não seja projetado como um MQ tradicional, o Kafaka pode substituir sistemas de mensagens tradicionais como o ActiveMQ na maioria dos casos.

O Kafka organiza o fluxo de mensagens por tópicos, e o servidor que armazena as mensagens é chamado de corretor, e os consumidores podem assinar um ou mais tópicos. Para equilibrar a carga, as mensagens de um tópico podem ser divididas em múltiplas partições, e quanto mais partições, maior o paralelismo e o throughput do Kafka.

Clusters Kafka exigem suporte zookeeper para implementar clusters, e zookeeper já está incluído na distribuição kafka mais recente, que pode ser implantada para iniciar um servidor zookeeper e um servidor Kafka ao mesmo tempo, ou usar outros clusters zookeeper existentes.

Diferente do MQ tradicional, os consumidores precisam manter um offset por conta própria e, ao receber mensagens do kafka, só puxam mensagens após o offset atual. O cliente scala/java do Kafka já implementa essa parte da lógica salvando o offset para o tratador do zoológico. Cada consumidor pode escolher um ID, e consumidores com o mesmo ID só receberão a mesma mensagem uma vez.Se os consumidores de um tópico usam todos o mesmo id, trata-se de uma Fila tradicional. Se cada consumidor usa um ID diferente, é um pub-sub tradicional.

Revisar:

Adicionar ActiveMQ aos serviços do sistema no Windows
https://www.itsvse.com/thread-6210-1-1.html

NúmeroDeMensagensPendentes, MensagensEnfileiradas, Mensagens...
https://www.itsvse.com/thread-4954-1-1.html

Resumo das informações sobre ActiveMQ e RabbitMQ
https://www.itsvse.com/thread-4659-1-1.html

O CentOS ActiveMQ é adicionado ao serviço
https://www.itsvse.com/thread-4617-1-1.html

Tutorial de instalação ativamq de 64 bits do Centos 6.2
https://www.itsvse.com/thread-4616-1-1.html

ActiveMQ5.15.3 falha em iniciar, e o UnsupportedClassVersionError é reportado
https://www.itsvse.com/thread-4615-1-1.html

Configurações de permissão de tópicos do activemq
https://www.itsvse.com/thread-4495-1-1.html

Usuário itsvse não está autorizado a ler em: ActiveMQ.Advisory.TempQueue,Activ...
https://www.itsvse.com/thread-4476-1-1.html

Cliente ActiveMQ em C# assina o código-fonte
https://www.itsvse.com/thread-4470-1-1.html

.net/c# activemq para definir a conta de conexão e a senha
https://www.itsvse.com/thread-4282-1-1.html

Defina o nome de usuário e a senha para o tema e a fila ACTIVEMQ
https://www.itsvse.com/thread-4281-1-1.html

Activemq modifica a senha de gerenciamento de sites
https://www.itsvse.com/thread-4280-1-1.html

activemq Armazenamento persistente está cheio
https://www.itsvse.com/thread-4125-1-1.html

Exemplo de operação .NET/C# ActiveMQ [Código-fonte]
https://www.itsvse.com/thread-3907-1-1.html

Configuração de permissão do usuário Activemq
https://www.itsvse.com/thread-3906-1-1.html

A diferença entre a fila ativamq e o tópico é que
https://www.itsvse.com/thread-3863-1-1.html

. Plataforma .Net
https://www.itsvse.com/thread-3452-1-1.html

Configurações persistentes de assinatura do ActiveMQ
https://www.itsvse.com/thread-3451-1-1.html

Limite de processamento paralelo para consumidores no RabbitMQ BasicQos
https://www.itsvse.com/thread-4667-1-1.html

Persistência de Mensagens da Fila de Fila rabbitMQ [com código-fonte]
https://www.itsvse.com/thread-4657-1-1.html

【Practice】rabbitMQ console para adicionar informações da conta
https://www.itsvse.com/thread-4655-1-1.html

Uma análise aprofundada do mecanismo de resposta às mensagens do RabbitMQ
https://www.itsvse.com/thread-4639-1-1.html

.net/c# Desconexão e reconexão da conexão RabbitMQ
https://www.itsvse.com/thread-4636-1-1.html

Introdução aos três modos de troca (fanout, direct e topic) do RabbitMQ
https://www.itsvse.com/thread-4635-1-1.html

【Practice】RabbitMQ instala o plugin de gerenciamento web
https://www.itsvse.com/thread-4631-1-1.html

【Combate Prático】Tutorial de instalação do RabbitMQ no Windows
https://www.itsvse.com/thread-4630-1-1.html
Consumo de kafka

1. Consumidores do mesmo group_id, apenas um consumidor pode consumir mensagens (Modo fila de fila

2. Consumidores de diferentes group_id recebem as mesmas notícias

Vantagens do Kafka

Distribuído e altamente escalável. Clusters Kafka podem ser escalonados de forma transparente para adicionar novos servidores ao cluster.

Alto desempenho. O desempenho do Kafka supera muito o das implementações tradicionais do MQ, como ActiveMQ e RabbitMQ, especialmente o Kafka, que também suporta operações em lote. A imagem a seguir mostra os resultados do teste de estresse de desempenho do consumidor do LinkedIn:

Tolerância a falhas. Os dados de cada partição em Kafka são replicados para vários servidores. Quando um corretor falha, o serviço ZooKeeper notificará o produtor e o consumidor, que mudam para outro corretor.

Desvantagens do Kafka:

Repita mensagens. Kafka só garante que cada mensagem será entregue pelo menos uma vez, e embora as chances sejam pequenas, há a chance de que uma mensagem seja entregue várias vezes.
As notícias estão fora de ordem. Embora mensagens dentro de uma partição sejam garantidamente ordenadas, se um tópico tiver múltiplas partições, a entrega de mensagens entre partições não é garantida como ordenada.
Complexidade. Kafka requer o suporte de clusters de tratadores de zoológicos, e os tópicos geralmente exigem trabalho manual para criar, implantar e manter filas de mensagens mais caras do que filas de mensagens gerais

.NET/C# operações de Kafka na fila de mensagens .NET/C#

Primeiro, use o .NET Core 3.1 para criar dois novos projetos de console, a saber, Kafka-Consumer e Kafka-Producer

Use nuget para referenciar o pacote Confluent.Kafka assim, com o seguinte comando:

Endereço do GitHub:O login do hiperlink está visível.

Começamos o programa Produtor primeiro, e se começarmos o consumidor primeiro, receberemos o seguinte erro:
Erro ocorredo: Corretor: Tema ou partição desconhecida

Este artigo vai consumir configuraçõesEnableAutoOffsetStore é falso, ou seja, configurando manualmente o armazenamento de offset (semelhante a uma mensagem de confirmação manual)

Os consumidores não configuram a OffsetStore após o consumo

Tente usar o produtor para produzir duas mensagens, ative o consumo do consumidor, MaxPollIntervalMs = 10000 // 10 segundos sem configuração manual, permita que outros clientes consumam, claro, não será consumido por outros clientes em até 10 segundos

MaxPollIntervalMs explica
Para consumidores avançados, o tempo máximo permitido para consumir mensagens entre chamadas (por exemplo, rd_kafka_consumer_poll()). Se esse intervalo for ultrapassado, o consumidor é considerado como falhando e o grupo é rebalanceado para que a partição seja reatribuída a outro membro do grupo consumidor. Aviso: Commits offset podem não ser possíveis no momento. Nota: Recomenda-se definir "enable.auto.offset.store=false" para aplicações que estejam processando por muito tempo, e então armazenar explicitamente o offset (usando offsets_store()) após o processamento da mensagem* para garantir que o offset não seja automaticamente comprometido antes da conclusão do processamento. Verifique uma vez por segundo, em intervalos de dois. Para mais informações, veja KIP-62.

As renderizações são as seguintes:



OffsetStore é definido após o consumidor terminar de gastar

código

Depois que a configuração estiver concluída, espere 10 segundos e ele ainda vai funcionarRecebi a última mensagem(Quando o consumidor se conecta ao corretor,Comece o consumo a partir da posição de deslocamentoSe c.Commit(cr) for definido; A última mensagem não será recebida repetidamente.

Ver código-fonte



commit o offset + 1, e eventualmente chamar Librdkafka.topic_partition_list_destroy(cOffsets);

O login do hiperlink está visível.
O login do hiperlink está visível.

Defina um GroupID diferente

Tente definir um GroupId diferente via parâmetro de linha de comando e então envie uma mensagem pelo produtor, como mostrado na imagem a seguir:



Tanto clinet1 quanto client2Receber mensagens históricas, e depois que o produtor envia uma mensagem, ambos quase serãoReceber mensagens ao mesmo tempo

Novos consumidores só recebem novas mensagens

Como fazer com que um novo cliente receba apenas mensagens novas e ignore dados históricos?

As configurações são as seguintes:

Como mostrado abaixo:



Código do produtor

Como segue:

Código de consumo

Como segue:

Download do código-fonte

Turistas, se quiserem ver o conteúdo oculto deste post, por favorResposta






Anterior:.NET/C# Exceção usando a Caixa de Correio Empresarial Tencent: A operação expirou.
Próximo:NuGet limpa o cache
 Senhorio| Publicado em 15/04/2021 09:31:05 |
Quando o cliente .NET Kafka é desconectado, ele não lança uma exceção e se reconecta após a rede estar normal
%4|1618450028,267| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Desconectado (após 59926ms no estado UP)
%3|1618450028,267| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumidor-1: 192.168.1.175:9092/1: Desconectado (após 59926ms no estado UP)
%3|1618450028,267| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Conectar ao ipv4#192.168.1.175:9092 falhou: Erro desconhecido (após 0ms no estado CONNECT)
%3|1618450028,268| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumidor-1: 192.168.1.175:9092/1: Conectar ao ipv4#192.168.1.175:9092 falhou: Erro desconhecido (após 0ms no estado CONNECT)
%3|1618450028,357| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Conectar ao ipv4#192.168.1.175:9092 falhou: Erro desconhecido (após 10ms no estado CONNECT, 1 erro idêntico suprimido)
%3|1618450028,357| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumidor-1: 192.168.1.175:9092/1: Conectar ao ipv4#192.168.1.175:9092 falhou: Erro desconhecido (após 10ms no estado CONNECT, 1 erro idêntico suprimido)
%3|1618450062,882| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Conectar ao ipv4#192.168.1.175:9092 falhou: Erro desconhecido (após 0ms no estado CONNECT, 8 erro(s) idêntico(s) suprimido)
%3|1618450062,882| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumidor-1: 192.168.1.175:9092/1: Conecte ao ipv4#192.168.1.175:9092 falhou: Erro desconhecido (após 0ms no estado CONNECT, 8 erro(s) idêntico(s) suprimido)
%3|1618450098,255| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Conectar ao ipv4#192.168.1.175:9092 falhou: Erro desconhecido (após 11ms no estado CONNECT, 4 erro(s) idêntico(s) suprimido(s)
%3|1618450098,255| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumidor-1: 192.168.1.175:9092/1: Conectar ao ipv4#192.168.1.175:9092 falhou: Erro desconhecido (após 11ms no estado CONNECT, 4 erro(s) idêntico(s) suprimido)
%3|1618450138,243| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Conectar ao ipv4#192.168.1.175:9092 falhou: Erro desconhecido (após 0ms no estado CONNECT, 4 erro(s) idêntico(s) suprimido)
%3|1618450138,244| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumidor-1: 192.168.1.175:9092/1: Conecte ao ipv4#192.168.1.175:9092 falhou: Erro desconhecido (após 0ms no estado CONNECT, 4 erro(s) idêntico(s) suprimido)
%3|1618450168,254| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Conectar ao ipv4#192.168.1.175:9092 falhou: Erro desconhecido (após 10ms no estado CONNECT, 3 erro(s) idênticos suprimidos)
%3|1618450168,254| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumidor-1: 192.168.1.175:9092/1: Conectar ao ipv4#192.168.1.175:9092 falhou: Erro desconhecido (após 10ms no estado CONNECT, 3 erro(s) idêntico(s) suprimido)

 Senhorio| Publicado em 13/04/2021 16:26:58 |
Princípio do consumo de mensagens:

No processo de produção propriamente dito, cada tópico terá múltiplas partições, e a vantagem de múltiplas partições é que, por um lado, a capacidade de fragmentar os dados no broker reduz efetivamente a capacidade de mensagens e melhora o desempenho da IO. Por outro lado, para melhorar o poder de consumo do lado do consumidor, o mesmo tema geralmente será consumido por múltiplos consumidores, ou seja, o mecanismo de balanceamento de carga do lado do consumidor, que é o que entenderemos a seguir: como os consumidores consomem mensagens no caso de múltiplas partições e múltiplos consumidores? Kafka existe no conceito de grupos de consumidores, ou seja, group.id o mesmo tipo de consumidores, que pertencem a um grupo de consumidores, e todos os consumidores desse grupo coordenam para consumir todas as partições do tema de assinatura. Claro, cada partição só pode ser consumida por consumidores do mesmo grupo de consumidores, então como os consumidores do mesmo grupo de consumidores alocam os dados em qual partição deve ser consumida? Por exemplo, se houver partições perdendo, ou seja, quando o número de partitons é igual ao número de comumadores, cada comumador corresponde a uma partição; se o número de comumadores for maior que partições, então o número extra de comumadores não funcionará; pelo contrário, haverá comumadores consumindo múltiplas partições.

Estratégia de Atribuição de Zoneamento:

Na kafka, existem duas estratégias de alocação de partições, uma é Range (padrão) e a outra é RoundRobin (polling). Isso é definido pelo parâmetro partition.assignment.strategy de configuração do consumidor.


Veja todos os tópicos


Veja detalhes de um tema




 Senhorio| Publicado em 08/05/2021 17:17:33 |
Kafka exclui grupos de consumidores



A exclusão dos grupos de consumidores solicitados ('itsvse') foi bem-sucedida.


Os seguintes erros podem ser relatados:

Error: Deletion of some consumer groups failed:
* O grupo 'itsvse' não pôde ser excluído devido a: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: O grupo não está vazio.
solução

Consuma todas as mensagens ou defina um offset

Kafka define manualmente o deslocamento
https://www.itsvse.com/thread-9641-1-1.html
Depois, apague de novo!

 Senhorio| Publicado em 13/04/2021 15:40:48 |
Comando Power Shell



Cada cliente consumidor mantém 2 conexões com o serviço Kafka
 Senhorio| Publicado em 07/05/2021 12:37:06 |
Kafka, para visualizar o número de pilhas de tópicos em um grupo especificado

Publicado em 16/06/2021 12:41:09 |
Por favor, pergunte por que o código não pode ser visualizado~
 Senhorio| Publicado em 25/06/2021 10:50:06 |
Kafka recebe o comando de tamanho do tópico:



 Senhorio| Publicado em 18/07/2021 10:15:01 |
Linha de comando Kafka para criar tópicos:

Publicado em 03/09/2021 11:52:41 |
Ainda existem muitas armadilhas na kafka, aprendidas
Disclaimer:
Todo software, material de programação ou artigos publicados pela Code Farmer Network são apenas para fins de aprendizado e pesquisa; O conteúdo acima não deve ser usado para fins comerciais ou ilegais, caso contrário, os usuários terão todas as consequências. As informações deste site vêm da Internet, e disputas de direitos autorais não têm nada a ver com este site. Você deve deletar completamente o conteúdo acima do seu computador em até 24 horas após o download. Se você gosta do programa, por favor, apoie um software genuíno, compre o registro e obtenha serviços genuínos melhores. Se houver qualquer infração, por favor, entre em contato conosco por e-mail.

Mail To:help@itsvse.com