Cet article est un article miroir de traduction automatique, veuillez cliquer ici pour accéder à l’article original.

Vue: 129246|Répondre: 17

[Source] Opérations Kafka pour les files d’attente de messages .NET/C# [avec code source]

[Copié le lien]
Publié sur 13/04/2021 11:45:31 | | | |
Kafka est un système de messagerie distribué haute performance développé par LinkedIn, largement utilisé dans des situations telles que la collecte de journaux, le traitement de données en streaming, la distribution de messages en ligne et hors ligne, et bien d’autres. Bien que non conçu comme un MQ traditionnel, Kafaka peut remplacer les systèmes de messagerie traditionnels tels qu’ActiveMQ dans la plupart des cas.

Kafka organise le flux des messages par sujets, et le serveur qui contient les messages s’appelle un courtier, et les consommateurs peuvent s’abonner à un ou plusieurs sujets. Pour équilibrer la charge, les messages d’un sujet peuvent être divisés en plusieurs partitions, et plus il y a de partitions, plus le parallélisme et le débit de Kafka sont élevés.

Les clusters Kafka nécessitent un support zookeeper pour implémenter les clusters, et zookeeper est déjà inclus dans la dernière distribution kafka, qui peut être déployée pour démarrer simultanément un serveur zookeeper et un serveur Kafka, ou utiliser d’autres clusters zookeepers existants.

Contrairement au MQ traditionnel, les consommateurs doivent garder un décalage eux-mêmes, et lorsqu’ils reçoivent des messages de kafka, ne récupèrent les messages qu’après le décalage actuel. Le client scala/java de Kafka implémente déjà cette partie de la logique en enregistrant le décalage sur le zookeeper. Chaque consommateur peut choisir un identifiant, et les consommateurs avec le même identifiant ne recevront le même message qu’une seule fois.Si les consommateurs d’un sujet utilisent tous le même identifiant, il s’agit d’une file d’attente traditionnelle. Si chaque consommateur utilise un identifiant différent, c’est un pub-sub traditionnel.

Révision:

Ajouter ActiveMQ aux services système sous Windows
https://www.itsvse.com/thread-6210-1-1.html

NombreDeMessagesEn Attente, MessagesEnfile, Message...
https://www.itsvse.com/thread-4954-1-1.html

Résumé des informations sur ActiveMQ et RabbitMQ
https://www.itsvse.com/thread-4659-1-1.html

CentOS ActiveMQ est ajouté au service
https://www.itsvse.com/thread-4617-1-1.html

Tutoriel d’installation activemq 64 bits de Centos 6.2
https://www.itsvse.com/thread-4616-1-1.html

ActiveMQ5.15.3 ne démarre pas, et UnsupportedClassVersionError est signalée
https://www.itsvse.com/thread-4615-1-1.html

Paramètres d’autorisation des sujets Activemq
https://www.itsvse.com/thread-4495-1-1.html

L’utilisateur itsvse n’est pas autorisé à lire : ActiveMQ.Advisory.TempQueue,Activ...
https://www.itsvse.com/thread-4476-1-1.html

Le client C# ActiveMQ s’abonne au code source
https://www.itsvse.com/thread-4470-1-1.html

.net/c# activemq pour définir le compte de connexion et le mot de passe
https://www.itsvse.com/thread-4282-1-1.html

Définissez le nom d’utilisateur et le mot de passe pour le thème et la file d’attente ACTIVEMQ
https://www.itsvse.com/thread-4281-1-1.html

activemq modifie le mot de passe de gestion du site web
https://www.itsvse.com/thread-4280-1-1.html

activemq Le stockage persistant est complet
https://www.itsvse.com/thread-4125-1-1.html

Exemple d’opération ActiveMQ en .NET/C# [Code source]
https://www.itsvse.com/thread-3907-1-1.html

Configuration des permissions utilisateur d’Activemq
https://www.itsvse.com/thread-3906-1-1.html

La différence entre activemq Queue et Topic est que
https://www.itsvse.com/thread-3863-1-1.html

. Plateforme .Net
https://www.itsvse.com/thread-3452-1-1.html

Paramètres d’abonnement persistants ActiveMQ
https://www.itsvse.com/thread-3451-1-1.html

Limite de traitement parallèle grand public BasicQos
https://www.itsvse.com/thread-4667-1-1.html

Persistance des messages de file d’attente rabbitMQ [avec code source]
https://www.itsvse.com/thread-4657-1-1.html

【Entraînement】rabbitMQ console pour ajouter des informations de compte
https://www.itsvse.com/thread-4655-1-1.html

Une analyse approfondie du mécanisme de réponse aux messages de RabbitMQ
https://www.itsvse.com/thread-4639-1-1.html

.net/c# Déconnexion de connexion RabbitMQ - déconnexion et reconnexion
https://www.itsvse.com/thread-4636-1-1.html

Introduction aux trois modes d’échange (fanout, direct et topic) de RabbitMQ
https://www.itsvse.com/thread-4635-1-1.html

【Practice】RabbitMQ installe le plugin de gestion web
https://www.itsvse.com/thread-4631-1-1.html

【Combat pratique】Tutoriel d’installation de RabbitMQ sous Windows
https://www.itsvse.com/thread-4630-1-1.html
Consommation de kafka

1. Les consommateurs du même group_id, un seul consommateur peut consommer des messages (Mode file d’attente

2. Les consommateurs de group_id différents reçoivent la même nouvelle

Avantages de Kafka

Distribué et très évolutif. Les clusters Kafka peuvent être adaptés de manière transparente pour ajouter de nouveaux serveurs au cluster.

Haute performance. Les performances de Kafka dépassent largement celles des implémentations MQ traditionnelles telles qu’ActiveMQ et RabbitMQ, en particulier Kafka, qui prend également en charge les opérations batch. L’image suivante montre les résultats du test de résistance des consommateurs sur LinkedIn :

Tolérance aux fautes. Les données de chaque partition dans Kafka sont répliquées vers plusieurs serveurs. Lorsqu’un courtier échoue, le service ZooKeeper en informe le producteur et le consommateur, qui passent à un autre courtier.

Inconvénients de Kafka :

Répétez les messages. Kafka ne garantit que chaque message sera livré au moins une fois, et même si les chances sont minces, il y a une chance qu’un message soit livré plusieurs fois.
Les nouvelles sont hors de propos. Bien que les messages à l’intérieur d’une partition soient garantis d’être ordonnés, si un sujet possède plusieurs partitions, la transmission des messages entre partitions n’est pas garantie d’être ordonnée.
Complexité. Kafka nécessite le support de groupes de soigneurs de zoos, et les sujets nécessitent généralement un travail manuel pour créer, déployer et maintenir plus coûteux que les files d’attente de messages générales

Opérations Kafka de file de messages .NET/C#

Tout d’abord, utilisez .NET Core 3.1 pour créer deux nouveaux projets de console, à savoir Kafka-Consumer et Kafka-Producer

Utilisez nuget pour référencer le package Confluent.Kafka de cette façon, avec la commande suivante :

Adresse GitHub :La connexion hyperlientérée est visible.

Nous lançons d’abord le programme Producteur, et si nous lanceons le consommateur en premier, nous obtiendrons l’erreur suivante :
Erreur survenue : Courtier : sujet ou partition inconnu

Cet article va consommer les réglagesEnableAutoOffsetStore est fauxc’est-à-dire en réglant manuellement le stockage décalé (similaire à un message de confirmation manuelle)

Les consommateurs ne définissent pas OffsetStore après consommation

Essayez d’utiliser le producteur pour produire deux messages, activez la consommation des consommateurs, MaxPollIntervalMs = 10000 // 10 secondes sans réglage manuel, laissez les autres clients consommer, bien sûr, cela ne sera pas consommé par d’autres clients dans les 10 secondes

MaxPollIntervalMs explique
Pour les consommateurs avancés, le temps maximal autorisé à consommer les messages entre les appels (par exemple, rd_kafka_consumer_poll()). Si cet intervalle est dépassé, le consommateur est considéré comme ayant échoué et le groupe est rééquilibré afin que la partition soit réattribuée à un autre membre du groupe de consommateurs. Avertissement : les commits de décalage peuvent ne pas être possibles pour le moment. Note : Il est recommandé de définir « enable.auto.offset.store=false » pour les applications en traitement prolongé, puis de stocker explicitement le décalage (en utilisant offsets_store()) après le traitement du message* afin de s’assurer que le décalage n’est pas automatiquement engagé avant la fin du traitement. Vérifiez une fois par seconde à des intervalles de deux. Pour plus d’informations, voir KIP-62.

Les rendus sont les suivants :



OffsetStore est défini après que le consommateur a fini de dépenser

code

Une fois la configuration terminée, attendez 10 secondes et ça continueraReçu le dernier message(Lorsque le consommateur se connecte au courtier,Commencez la consommation depuis la position décaléeSi c.Commit(cr) est défini ; Le dernier message ne sera pas reçu à répétition.

Voir le code source



commit le décalage + 1, puis finalement appeler Librdkafka.topic_partition_list_destroy(cOffsets) ;

La connexion hyperlientérée est visible.
La connexion hyperlientérée est visible.

Définir un GroupID différent

Essayez de définir un GroupId différent via le paramètre en ligne de commande, puis envoyez un message via le producteur, comme montré sur l’image suivante :



Clinet1 et client2Recevoir des messages historiques, et après que le producteur ait envoyé un message, les deux seront presqueRecevoir des messages en même temps

Les nouveaux consommateurs ne reçoivent que de nouveaux messages

Comment faire en sorte qu’un nouveau client ne reçoive que de nouveaux messages et ignore les données historiques ?

Les réglages sont les suivants :

Comme montré ci-dessous :



Code du producteur

Comme suit:

Code grand public

Comme suit:

Téléchargement du code source

Touristes, si vous voulez voir le contenu caché de ce post, s’il vous plaîtRépondre






Précédent:Exception .NET/C# utilisant la boîte aux lettres Tencent Enterprise : L’opération est expirée.
Prochain:NuGet vide le cache
 Propriétaire| Publié sur 15/04/2021 09:31:05 |
Lorsque le client .NET Kafka est déconnecté, il ne lance pas d’exception et se reconnecte une fois le réseau normal
%4|1618450028,267| FAIL|rdkafka#consumer-1| [thrd :192.168.1.175:9092/bootstrap] : 192.168.1.175:9092/1 : Déconnecté (après 59926ms en état UP)
%3|1618450028,267| ERROR|rdkafka#consumer-1| [thrd :app] : rdkafka#consumer-1 : 192.168.1.175:9092/1 : Déconnecté (après 59926ms dans l’état UP)
%3|1618450028,267| FAIL|rdkafka#consumer-1| [thrd :192.168.1.175:9092/bootstrap] : 192.168.1.175:9092/1 : Connect to ipv4#192.168.1.175:9092 failed : Erreur inconnue (après 0 ms dans l’état CONNECT)
%3|1618450028,268| ERROR|rdkafka#consumer-1| [thrd :app] : rdkafka#consumer-1 : 192.168.1.175:9092/1 : Connect to ipv4#192.168.1.175:9092 failed : Erreur inconnue (après 0 ms dans l’état CONNECT)
%3|1618450028,357| FAIL|rdkafka#consumer-1| [thrd :192.168.1.175:9092/bootstrap] : 192.168.1.175:9092/1 : Connect to ipv4#192.168.1.175:9092 failed : Erreur inconnue (après 10 ms en état CONNECT, 1(s) erreur(s) identique(s) supprimée)
%3|1618450028,357| ERROR|rdkafka#consumer-1| [thrd :app] : rdkafka#consumer-1 : 192.168.1.175:9092/1 : Connect to ipv4#192.168.1.175:9092 failed : Erreur inconnue (après 10 ms dans l’état CONNECT, 1 ou plusieurs erreur(s) identique(s) supprimée)
%3|1618450062,882| FAIL|rdkafka#consumer-1| [thrd :192.168.1.175:9092/bootstrap] : 192.168.1.175:9092/1 : Connect to ipv4#192.168.1.175:9092 failed : Erreur inconnue (après 0 ms dans l’état CONNECT, 8 erreurs identiques supprimées)
%3|1618450062,882| ERROR|rdkafka#consumer-1| [thrd :app] : rdkafka#consumer-1 : 192.168.1.175:9092/1 : Connect to ipv4#192.168.1.175:9092 failed : Erreur inconnue (après 0 ms dans l’état CONNECT, 8 erreurs identiques supprimées)
%3|1618450098,255| FAIL|rdkafka#consumer-1| [thrd :192.168.1.175:9092/bootstrap] : 192.168.1.175:9092/1 : Connect to ipv4#192.168.1.175:9092 failed : Erreur inconnue (après 11 ms dans l’état CONNECT, 4 erreurs identiques supprimées)
%3|1618450098,255| ERROR|rdkafka#consumer-1| [thrd :app] : rdkafka#consumer-1 : 192.168.1.175:9092/1 : Connect to ipv4#192.168.1.175:9092 failed : Erreur inconnue (après 11 ms en état CONNECT, 4 erreurs identiques supprimées)
%3|1618450138,243| FAIL|rdkafka#consumer-1| [thrd :192.168.1.175:9092/bootstrap] : 192.168.1.175:9092/1 : Connect to ipv4#192.168.1.175:9092 failed : Erreur inconnue (après 0 ms dans l’état CONNECT, 4 erreurs identiques supprimées)
%3|1618450138,244| ERROR|rdkafka#consumer-1| [thrd :app] : rdkafka#consumer-1 : 192.168.1.175:9092/1 : Connect to ipv4#192.168.1.175:9092 failed : Erreur inconnue (après 0 ms dans l’état CONNECT, 4 erreurs identiques supprimées)
%3|1618450168,254| FAIL|rdkafka#consumer-1| [thrd :192.168.1.175:9092/bootstrap] : 192.168.1.175:9092/1 : Connect to ipv4#192.168.1.175:9092 failed : Erreur inconnue (après 10 ms dans l’état CONNECT, 3 erreurs identiques supprimées)
%3|1618450168,254| ERROR|rdkafka#consumer-1| [thrd :app] : rdkafka#consumer-1 : 192.168.1.175:9092/1 : Connect to ipv4#192.168.1.175:9092 failed : Erreur inconnue (après 10 ms dans l’état CONNECT, 3 erreurs identiques supprimées)

 Propriétaire| Publié sur 13/04/2021 16:26:58 |
Principe de la consommation des messages :

Dans le processus de production proprement dit, chaque sujet comportera plusieurs partitions, et l’avantage de plusieurs partitions est que, d’une part, la capacité à fragmenter les données sur le courtier réduit effectivement la capacité des messages et améliore les performances des E/S. D’un autre côté, afin d’améliorer le pouvoir de consommation du côté consommateur, le même sujet sera généralement abordé par plusieurs consommateurs, c’est-à-dire le mécanisme d’équilibrage de charge du côté consommateur, ce que nous comprendrons ensuite : comment les consommateurs consomment-ils les messages dans le cas de multiples partitions et plusieurs consommateurs ? Kafka existe dans le concept de groupes de consommateurs, c’est-à-dire group.id le même type de consommateurs, qui appartiennent à un groupe de consommateurs, et tous les consommateurs du groupe se coordonnent pour consommer toutes les partitions du sujet d’abonnement. Bien sûr, chaque partition ne peut être consommée que par les consommateurs du même groupe de consommateurs, alors comment les consommateurs du même groupe de consommateurs répartissent-ils les données dans lesquelles la partition doit être consommée ? Pour un exemple simple, s’il y a des partitions qui perdent, c’est-à-dire que lorsque le nombre de partitons est égal au nombre de sommeurs, chaque comsumer correspond à une partition, si le nombre de comsumers est supérieur à des partitions, alors le nombre supplémentaire de comsumers ne fonctionnera pas, au contraire, il y aura des comsumeurs consommant plusieurs partitions.

Stratégie d’attribution de zonage :

Dans kafka, il existe deux stratégies d’allocation de partition, l’une est la plage (par défaut) et l’autre le RoundRobin (polling). Ceci est défini par le paramètre configuration, partition.assignment.strategy, du sommeur.


Voir tous les sujets


Voir les détails d’un sujet




 Propriétaire| Publié sur 08/05/2021 17:17:33 |
Kafka supprime les groupes de consommateurs



La suppression des groupes de consommateurs demandés (« itsvse ») a été réussie.


Les erreurs suivantes peuvent être signalées :

Error: Deletion of some consumer groups failed:
* Le groupe 'itsvse' n’a pas pu être supprimé à cause de : java.util.concurrent.ExecutionException : org.apache.kafka.common.errors.GroupNotEmptyException : Le groupe n’est pas vide.
solution

Consommez tous les messages, ou définissez un décalage

Kafka règle manuellement le décalage
https://www.itsvse.com/thread-9641-1-1.html
Alors, supprimez-le à nouveau !

 Propriétaire| Publié sur 13/04/2021 15:40:48 |
Commande Power Shell



Chaque client grand public maintient 2 connexions au service Kafka
 Propriétaire| Publié sur 07/05/2021 12:37:06 |
kafka, pour voir le nombre de piles de sujets sous un groupe spécifié

Publié sur 16/06/2021 12:41:09 |
Veuillez demander pourquoi le code ne peut pas être consulté~
 Propriétaire| Publié sur 25/06/2021 10:50:06 |
Kafka reçoit la commande taille du sujet :



 Propriétaire| Publié sur 18/07/2021 10:15:01 |
Ligne de commande Kafka pour créer des sujets :

Publié sur 03/09/2021 11:52:41 |
Il y a encore beaucoup d’écueils dans la kafka, appris
Démenti:
Tous les logiciels, supports de programmation ou articles publiés par Code Farmer Network sont uniquement destinés à l’apprentissage et à la recherche ; Le contenu ci-dessus ne doit pas être utilisé à des fins commerciales ou illégales, sinon les utilisateurs assumeront toutes les conséquences. Les informations sur ce site proviennent d’Internet, et les litiges de droits d’auteur n’ont rien à voir avec ce site. Vous devez supprimer complètement le contenu ci-dessus de votre ordinateur dans les 24 heures suivant le téléchargement. Si vous aimez le programme, merci de soutenir un logiciel authentique, d’acheter l’immatriculation et d’obtenir de meilleurs services authentiques. En cas d’infraction, veuillez nous contacter par e-mail.

Mail To:help@itsvse.com