Este artículo es un artículo espejo de traducción automática, por favor haga clic aquí para saltar al artículo original.

Vista: 129246|Respuesta: 17

[Fuente] Operaciones de Kafka para colas de mensajes .NET/C# [con código fuente]

[Copiar enlace]
Publicado en 13/4/2021 11:45:31 | | | |
Kafka es un sistema de mensajería distribuida de alto rendimiento desarrollado por LinkedIn que se utiliza ampliamente en escenarios como la recogida de registros, procesamiento de datos en streaming, distribución de mensajes online y offline, y más. Aunque no está diseñado como un MQ tradicional, Kafaka puede reemplazar a los sistemas de mensajería tradicionales como ActiveMQ en la mayoría de los casos.

Kafka organiza el flujo de mensajes por temas, y el servidor que contiene los mensajes se llama broker, y los consumidores pueden suscribirse a uno o más temas. Para equilibrar la carga, los mensajes de un tema pueden dividirse en múltiples particiones, y cuantas más particiones, mayor será el paralelismo y el rendimiento de Kafka.

Los clústeres Kafka requieren soporte de zookeeper para implementarlos, y zookeeper ya está incluido en la última distribución kafka, que puede desplegarse para iniciar un servidor zookeeper y un servidor Kafka al mismo tiempo, o usar otros clústeres de zookeepers existentes.

A diferencia del MQ tradicional, los consumidores deben mantener un offset por sí mismos y, al recibir mensajes de kafka, solo extraen mensajes después del offset actual. El cliente scala/java de Kafka ya implementa esta parte de la lógica guardando el offset para el cuidador del zoo. Cada consumidor puede elegir un ID, y los consumidores con el mismo ID solo recibirán el mismo mensaje una vez.Si los consumidores de un tema usan todos el mismo id, es una Cola tradicional. Si cada consumidor usa un ID diferente, es un pub-sub tradicional.

Revisión:

Añadir ActiveMQ a los servicios del sistema en Windows
https://www.itsvse.com/thread-6210-1-1.html

NúmeroDeMensajesPendientes, MensajesEnqueueados, Mensajes...
https://www.itsvse.com/thread-4954-1-1.html

Resumen de información sobre ActiveMQ y RabbitMQ
https://www.itsvse.com/thread-4659-1-1.html

Se añade CentOS ActiveMQ al servicio
https://www.itsvse.com/thread-4617-1-1.html

Tutorial de instalación activa de Centos 6.2 64 bits
https://www.itsvse.com/thread-4616-1-1.html

ActiveMQ5.15.3 no se inicia y se reporta el error UnsupportedClassVersionError
https://www.itsvse.com/thread-4615-1-1.html

Configuración de permisos de temas de activemq
https://www.itsvse.com/thread-4495-1-1.html

El usuario itsvse no está autorizado para leer de: ActiveMQ.Advisory.TempQueue,Activ...
https://www.itsvse.com/thread-4476-1-1.html

El cliente ActiveMQ en C# se suscribe al código fuente
https://www.itsvse.com/thread-4470-1-1.html

.net/c# activemq para establecer la cuenta de conexión y la contraseña
https://www.itsvse.com/thread-4282-1-1.html

Establezca el nombre de usuario y la contraseña para el tema y la cola de ACTIVEMQ
https://www.itsvse.com/thread-4281-1-1.html

activemq modifica la contraseña de gestión del sitio web
https://www.itsvse.com/thread-4280-1-1.html

activemq Persistent store está lleno
https://www.itsvse.com/thread-4125-1-1.html

.NET/C# Ejemplo de operación ActiveMQ [Código fuente]
https://www.itsvse.com/thread-3907-1-1.html

Configuración de permisos de usuario de Activemq
https://www.itsvse.com/thread-3906-1-1.html

La diferencia entre la cola activmq y el tema es que
https://www.itsvse.com/thread-3863-1-1.html

. Plataforma .Net
https://www.itsvse.com/thread-3452-1-1.html

Configuración persistente de suscripción de ActiveMQ
https://www.itsvse.com/thread-3451-1-1.html

Límite de procesamiento paralelo para consumidores en RabbitMQ BasicQos
https://www.itsvse.com/thread-4667-1-1.html

Persistencia de mensajes en la cola rabbitMQ [con código fuente]
https://www.itsvse.com/thread-4657-1-1.html

【Practica】rabbitMQ para añadir información de cuenta
https://www.itsvse.com/thread-4655-1-1.html

Un análisis en profundidad del mecanismo de respuesta a mensajes de RabbitMQ
https://www.itsvse.com/thread-4639-1-1.html

Desconexión de conexión .net/c# RabbitMQ - desconexión y reconexión
https://www.itsvse.com/thread-4636-1-1.html

Introducción a los tres modos de intercambio (fanout, directo y tema) de RabbitMQ
https://www.itsvse.com/thread-4635-1-1.html

【Practice】RabbitMQ instala el plugin de gestión web
https://www.itsvse.com/thread-4631-1-1.html

【Combate práctico】Tutorial de instalación de RabbitMQ en Windows
https://www.itsvse.com/thread-4630-1-1.html
Consumo de kafka

1. Consumidores del mismo group_id, solo un consumidor puede consumir mensajes (Modo cola de cola

2. Los consumidores de diferentes group_id reciben las mismas noticias

Ventajas de Kafka

Distribuido y altamente escalable. Los clústeres Kafka pueden escalarse de forma transparente para añadir nuevos servidores al clúster.

Alto rendimiento. El rendimiento de Kafka supera ampliamente al de implementaciones tradicionales de MQ como ActiveMQ y RabbitMQ, especialmente Kafka, que también soporta operaciones por lotes. La siguiente imagen muestra los resultados de la prueba de estrés de rendimiento del consumidor de LinkedIn:

Tolerancia a fallos. Los datos de cada partición en Kafka se replican a varios servidores. Cuando un intermediario falla, el servicio ZooKeeper notificará al productor y al consumidor, quienes cambian a otro intermediario.

Desventajas de Kafka:

Repite mensajes. Kafka solo garantiza que cada mensaje se entregue al menos una vez, y aunque las probabilidades son bajas, existe la posibilidad de que un mensaje se entregue varias veces.
Las noticias están fuera de lugar. Aunque los mensajes dentro de una partición están garantizados como ordenados, si un tema tiene múltiples particiones, la entrega de mensajes entre particiones no garantiza que sea ordenada.
Complejidad. Kafka requiere el soporte de grupos de cuidadores, y los temas suelen requerir trabajo manual para crear, desplegar y mantenerse, más costoso que las colas generales de mensajes

.NET/C# cola de mensajes Operaciones de Kafka

Primero, utiliza .NET Core 3.1 para crear dos nuevos proyectos de consola, a saber, Kafka-Consumer y Kafka-Producer

Usa nuget para referenciar el paquete Confluent.Kafka así, con el siguiente comando:

Dirección de GitHub:El inicio de sesión del hipervínculo es visible.

Empezamos primero el programa Productor, y si empezamos primero con el consumidor, obtendremos el siguiente error:
Se ha producido un error: Broker: Tema o partición desconocida

Este artículo consumirá los ajustesEnableAutoOffsetStore es falso, es decir, configurar manualmente el almacenamiento de desplazamiento (similar a un mensaje de confirmación manual)

Los consumidores no configuran OffsetStore después de consumirlo

Intenta usar el productor para producir dos mensajes, activa el consumo del consumidor, MaxPollIntervalMs = 10000 // 10 segundos sin ajuste manual, permite que otros clientes consuman, por supuesto, no será consumido por otros clientes en 10 segundos

MaxPollIntervalMs explica
Para los consumidores avanzados, el tiempo máximo permitido para consumir mensajes entre llamadas (por ejemplo, rd_kafka_consumer_poll()). Si se supera este intervalo, se considera que el consumidor ha fallado y el grupo se reequilibra para que la partición se reasigne a otro miembro del grupo de consumidores. Advertencia: Puede que los commits de offset no sean posibles en este momento. Nota: Se recomienda establecer "enable.auto.offset.store=false" para aplicaciones que estén procesando durante mucho tiempo, y luego almacenar explícitamente el desplazamiento (usando offsets_store()) después de que el mensaje esté procesado* para asegurarse de que el desplazamiento no se comprometa automáticamente antes de que el procesamiento se complete. Revisa una vez por segundo a intervalos de dos. Para más información, véase KIP-62.

Las representaciones son las siguientes:



OffsetStore se establece después de que el consumidor termina de gastar

código

Después de completar la configuración, espera 10 segundos y seguirá funcionandoRecibido el último mensaje(Cuando el consumidor se conecta con el corredor,Comienza el consumo desde la posición de desplazamientoSi se establece c.Commit(cr); El último mensaje no se recibirá repetidamente.

Ver código fuente



commit el offset + 1, y finalmente llamar a Librdkafka.topic_partition_list_destroy(cOffsets);

El inicio de sesión del hipervínculo es visible.
El inicio de sesión del hipervínculo es visible.

Establece un GroupID diferente

Prueba a establecer un GroupId diferente mediante el parámetro de línea de comandos y luego envía un mensaje a través del productor, como se muestra en la siguiente imagen:



Tanto clinet1 como client2Recibir mensajes históricos, y después de que el productor envíe un mensaje, ambos casi seránRecibir mensajes al mismo tiempo

Los nuevos consumidores solo reciben mensajes nuevos

¿Cómo haces que un cliente nuevo reciba solo mensajes nuevos e ignore los datos históricos?

La configuración es la siguiente:

Como se muestra a continuación:



Código de productor

Como sigue:

Código de consumo

Como sigue:

Descarga del código fuente

Turistas, si queréis ver el contenido oculto de esta publicación, por favorRespuesta






Anterior:Excepción .NET/C# usando el buzón empresarial de Tencent: La operación ha expirado.
Próximo:NuGet limpia la caché
 Propietario| Publicado en 15/4/2021 9:31:05 |
Cuando el cliente .NET Kafka se desconecta, no lanza ninguna excepción y se reconecta una vez que la red está normal
%4|1618450028,267| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Desconectado (tras 59926ms en estado UP)
%3|1618450028,267| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumidor-1: 192.168.1.175:9092/1: Desconectado (tras 59926ms en estado UP)
%3|1618450028,267| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Conectar a IPv4#192.168.1.175:9092 fallido: Error desconocido (tras 0 ms en estado CONNECT)
%3|1618450028,268| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Conectar a IPv4#192.168.1.175:9092 fallido: Error desconocido (tras 0 ms en estado CONNECT)
%3|1618450028,357| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Conectar a ipv4#192.168.1.175:9092 fallido: Error desconocido (tras 10 ms en estado CONNECT, 1 error idéntico suprimido)
%3|1618450028,357| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 fallido: Error desconocido (tras 10 ms en estado CONNECT, 1 error idéntico suprimido)
%3|1618450062,882| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Conectar a ipv4#192.168.1.175:9092 fallido: Error desconocido (tras 0ms en estado CONNECT, 8 error(es) idénticos suprimidos)
%3|1618450062,882| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 fallido: Error desconocido (tras 0ms en estado CONNECT, 8 error(es) idéntico(s) suprimidos)
%3|1618450098.255| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Conectar a IPv4#192.168.1.175:9092 fallido: Error desconocido (tras 11 ms en estado CONNECT, 4 error(es) idénticos suprimidos)
%3|1618450098.255| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 fallido: Error desconocido (tras 11ms en estado CONNECT, 4 error(es) idéntico(s) suprimidos)
%3|1618450138,243| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Conectar a IPv4#192.168.1.175:9092 fallido: Error desconocido (tras 0 ms en estado CONNECT, 4 error(es) idéntico(s) suprimidos)
%3|1618450138,244| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 fallido: Error desconocido (tras 0ms en estado CONNECT, 4 error(es) idéntico(s) suprimidos)
%3|1618450168,254| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Conectar a ipv4#192.168.1.175:9092 fallido: Error desconocido (tras 10 ms en estado CONNECT, 3 error(es) idénticos suprimidos)
%3|1618450168,254| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 fallido: Error desconocido (tras 10 ms en estado CONNECT, 3 error(es) idéntico(s) suprimido)

 Propietario| Publicado en 13/4/2021 16:26:58 |
Principio del consumo de mensajes:

En el proceso de producción propiamente dicho, cada tema tendrá múltiples particiones, y la ventaja de varias particiones es que, por un lado, la capacidad de fragmentar los datos en el broker reduce efectivamente la capacidad de los mensajes y mejora el rendimiento de las E/S. Por otro lado, para mejorar el poder de consumo del lado del consumidor, el mismo tema generalmente se abordará a través de múltiples consumidores, es decir, el mecanismo de balanceo de carga del lado del consumidor, que es lo que entenderemos a continuación, ¿cómo consumen los consumidores los mensajes en el caso de múltiples particiones y múltiples consumidores? Kafka existe en el concepto de grupos de consumidores, es decir, group.id el mismo tipo de consumidores, que pertenecen a un grupo de consumidores, y todos los consumidores del grupo se coordinan para consumir todas las particiones del tema de suscripción. Por supuesto, cada partición solo puede ser consumida por consumidores del mismo grupo de consumidores, así que ¿cómo asignan los consumidores del mismo grupo los datos en qué partición debe consumirse? Por ejemplo sencillo, si hay particiones que pierden, es decir, cuando el número de partitones es igual al número de comsumers, cada comsumer corresponde a una partición; si el número de comsumers es mayor que particiones, entonces el número extra de comsumers no funcionará; al contrario, habrá comsumers que consumen múltiples particiones.

Estrategia de asignación de zonificación:

En kafka, existen dos estrategias de asignación de particiones: una es Rango (por defecto) y la otra es RoundRobin (sondeo). Esto se establece mediante el parámetro partition.assignment.strategy de configuración del consumidor.


Ver todos los temas


Ver detalles de un tema




 Propietario| Publicado en 8/5/2021 17:17:33 |
Kafka elimina grupos de consumidores



La eliminación de grupos de consumidores solicitados ('itsvse') fue exitosa.


Se pueden reportar los siguientes errores:

Error: Deletion of some consumer groups failed:
* El grupo 'itsvse' no pudo ser eliminado por: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: El grupo no está vacío.
solución

Consume todos los mensajes o establece un desplazamiento

Kafka ajusta manualmente el desplazamiento
https://www.itsvse.com/thread-9641-1-1.html
¡Entonces, bórrala otra vez!

 Propietario| Publicado en 13/4/2021 15:40:48 |
Comando Power Shell



Cada cliente consumidor mantiene 2 conexiones al servicio Kafka
 Propietario| Publicado en 7/5/2021 12:37:06 |
kafka, para ver el número de pilas de temas bajo un grupo especificado

Publicado en 16/6/2021 12:41:09 |
Por favor, pregunta por qué no se puede ver el código~
 Propietario| Publicado en 25/6/2021 10:50:06 |
Kafka recibe el comando de tamaño del tema:



 Propietario| Publicado en 18/7/2021 10:15:01 |
Kafka en línea de comandos para crear temas:

Publicado en 3/9/2021 11:52:41 |
Todavía hay muchos inconvenientes en la kafka, aprendidos
Renuncia:
Todo el software, materiales de programación o artículos publicados por Code Farmer Network son únicamente para fines de aprendizaje e investigación; El contenido anterior no se utilizará con fines comerciales o ilegales; de lo contrario, los usuarios asumirán todas las consecuencias. La información de este sitio proviene de Internet, y las disputas de derechos de autor no tienen nada que ver con este sitio. Debes eliminar completamente el contenido anterior de tu ordenador en un plazo de 24 horas desde la descarga. Si te gusta el programa, por favor apoya el software genuino, compra el registro y obtén mejores servicios genuinos. Si hay alguna infracción, por favor contáctanos por correo electrónico.

Mail To:help@itsvse.com