Kafka é um sistema de mensagens distribuídas de alto desempenho desenvolvido pelo LinkedIn, amplamente utilizado em cenários como coleta de logs, processamento de dados em streaming, distribuição de mensagens online e offline, entre outros. Embora não seja projetado como um MQ tradicional, o Kafaka pode substituir sistemas de mensagens tradicionais como o ActiveMQ na maioria dos casos.
O Kafka organiza o fluxo de mensagens por tópicos, e o servidor que armazena as mensagens é chamado de corretor, e os consumidores podem assinar um ou mais tópicos. Para equilibrar a carga, as mensagens de um tópico podem ser divididas em múltiplas partições, e quanto mais partições, maior o paralelismo e o throughput do Kafka.
Clusters Kafka exigem suporte zookeeper para implementar clusters, e zookeeper já está incluído na distribuição kafka mais recente, que pode ser implantada para iniciar um servidor zookeeper e um servidor Kafka ao mesmo tempo, ou usar outros clusters zookeeper existentes.
Diferente do MQ tradicional, os consumidores precisam manter um offset por conta própria e, ao receber mensagens do kafka, só puxam mensagens após o offset atual. O cliente scala/java do Kafka já implementa essa parte da lógica salvando o offset para o tratador do zoológico. Cada consumidor pode escolher um ID, e consumidores com o mesmo ID só receberão a mesma mensagem uma vez.Se os consumidores de um tópico usam todos o mesmo id, trata-se de uma Fila tradicional. Se cada consumidor usa um ID diferente, é um pub-sub tradicional.
Revisar:
Consumo de kafka
1. Consumidores do mesmo group_id, apenas um consumidor pode consumir mensagens (Modo fila de fila)
2. Consumidores de diferentes group_id recebem as mesmas notícias
Vantagens do Kafka
Distribuído e altamente escalável. Clusters Kafka podem ser escalonados de forma transparente para adicionar novos servidores ao cluster.
Alto desempenho. O desempenho do Kafka supera muito o das implementações tradicionais do MQ, como ActiveMQ e RabbitMQ, especialmente o Kafka, que também suporta operações em lote. A imagem a seguir mostra os resultados do teste de estresse de desempenho do consumidor do LinkedIn:
Tolerância a falhas. Os dados de cada partição em Kafka são replicados para vários servidores. Quando um corretor falha, o serviço ZooKeeper notificará o produtor e o consumidor, que mudam para outro corretor.
Desvantagens do Kafka:
Repita mensagens. Kafka só garante que cada mensagem será entregue pelo menos uma vez, e embora as chances sejam pequenas, há a chance de que uma mensagem seja entregue várias vezes. As notícias estão fora de ordem. Embora mensagens dentro de uma partição sejam garantidamente ordenadas, se um tópico tiver múltiplas partições, a entrega de mensagens entre partições não é garantida como ordenada. Complexidade. Kafka requer o suporte de clusters de tratadores de zoológicos, e os tópicos geralmente exigem trabalho manual para criar, implantar e manter filas de mensagens mais caras do que filas de mensagens gerais
.NET/C# operações de Kafka na fila de mensagens .NET/C#
Primeiro, use o .NET Core 3.1 para criar dois novos projetos de console, a saber, Kafka-Consumer e Kafka-Producer
Use nuget para referenciar o pacote Confluent.Kafka assim, com o seguinte comando:
Endereço do GitHub:O login do hiperlink está visível.
Começamos o programa Produtor primeiro, e se começarmos o consumidor primeiro, receberemos o seguinte erro:
Erro ocorredo: Corretor: Tema ou partição desconhecida Este artigo vai consumir configuraçõesEnableAutoOffsetStore é falso, ou seja, configurando manualmente o armazenamento de offset (semelhante a uma mensagem de confirmação manual)
Os consumidores não configuram a OffsetStore após o consumo
Tente usar o produtor para produzir duas mensagens, ative o consumo do consumidor, MaxPollIntervalMs = 10000 // 10 segundos sem configuração manual, permita que outros clientes consumam, claro, não será consumido por outros clientes em até 10 segundos
MaxPollIntervalMs explica
Para consumidores avançados, o tempo máximo permitido para consumir mensagens entre chamadas (por exemplo, rd_kafka_consumer_poll()). Se esse intervalo for ultrapassado, o consumidor é considerado como falhando e o grupo é rebalanceado para que a partição seja reatribuída a outro membro do grupo consumidor. Aviso: Commits offset podem não ser possíveis no momento. Nota: Recomenda-se definir "enable.auto.offset.store=false" para aplicações que estejam processando por muito tempo, e então armazenar explicitamente o offset (usando offsets_store()) após o processamento da mensagem* para garantir que o offset não seja automaticamente comprometido antes da conclusão do processamento. Verifique uma vez por segundo, em intervalos de dois. Para mais informações, veja KIP-62. As renderizações são as seguintes:
OffsetStore é definido após o consumidor terminar de gastar
código
Depois que a configuração estiver concluída, espere 10 segundos e ele ainda vai funcionarRecebi a última mensagem(Quando o consumidor se conecta ao corretor,Comece o consumo a partir da posição de deslocamentoSe c.Commit(cr) for definido; A última mensagem não será recebida repetidamente.
Ver código-fonte
commit o offset + 1, e eventualmente chamar Librdkafka.topic_partition_list_destroy(cOffsets);
O login do hiperlink está visível.
O login do hiperlink está visível.
Defina um GroupID diferente
Tente definir um GroupId diferente via parâmetro de linha de comando e então envie uma mensagem pelo produtor, como mostrado na imagem a seguir:
Tanto clinet1 quanto client2Receber mensagens históricas, e depois que o produtor envia uma mensagem, ambos quase serãoReceber mensagens ao mesmo tempo。
Novos consumidores só recebem novas mensagens
Como fazer com que um novo cliente receba apenas mensagens novas e ignore dados históricos?
As configurações são as seguintes:
Como mostrado abaixo:
Código do produtor
Como segue:
Código de consumo
Como segue:
Download do código-fonte
Turistas, se quiserem ver o conteúdo oculto deste post, por favor Resposta
|