Kafka est un système de messagerie distribué haute performance développé par LinkedIn, largement utilisé dans des situations telles que la collecte de journaux, le traitement de données en streaming, la distribution de messages en ligne et hors ligne, et bien d’autres. Bien que non conçu comme un MQ traditionnel, Kafaka peut remplacer les systèmes de messagerie traditionnels tels qu’ActiveMQ dans la plupart des cas.
Kafka organise le flux des messages par sujets, et le serveur qui contient les messages s’appelle un courtier, et les consommateurs peuvent s’abonner à un ou plusieurs sujets. Pour équilibrer la charge, les messages d’un sujet peuvent être divisés en plusieurs partitions, et plus il y a de partitions, plus le parallélisme et le débit de Kafka sont élevés.
Les clusters Kafka nécessitent un support zookeeper pour implémenter les clusters, et zookeeper est déjà inclus dans la dernière distribution kafka, qui peut être déployée pour démarrer simultanément un serveur zookeeper et un serveur Kafka, ou utiliser d’autres clusters zookeepers existants.
Contrairement au MQ traditionnel, les consommateurs doivent garder un décalage eux-mêmes, et lorsqu’ils reçoivent des messages de kafka, ne récupèrent les messages qu’après le décalage actuel. Le client scala/java de Kafka implémente déjà cette partie de la logique en enregistrant le décalage sur le zookeeper. Chaque consommateur peut choisir un identifiant, et les consommateurs avec le même identifiant ne recevront le même message qu’une seule fois.Si les consommateurs d’un sujet utilisent tous le même identifiant, il s’agit d’une file d’attente traditionnelle. Si chaque consommateur utilise un identifiant différent, c’est un pub-sub traditionnel.
Révision:
Consommation de kafka
1. Les consommateurs du même group_id, un seul consommateur peut consommer des messages (Mode file d’attente)
2. Les consommateurs de group_id différents reçoivent la même nouvelle
Avantages de Kafka
Distribué et très évolutif. Les clusters Kafka peuvent être adaptés de manière transparente pour ajouter de nouveaux serveurs au cluster.
Haute performance. Les performances de Kafka dépassent largement celles des implémentations MQ traditionnelles telles qu’ActiveMQ et RabbitMQ, en particulier Kafka, qui prend également en charge les opérations batch. L’image suivante montre les résultats du test de résistance des consommateurs sur LinkedIn :
Tolérance aux fautes. Les données de chaque partition dans Kafka sont répliquées vers plusieurs serveurs. Lorsqu’un courtier échoue, le service ZooKeeper en informe le producteur et le consommateur, qui passent à un autre courtier.
Inconvénients de Kafka :
Répétez les messages. Kafka ne garantit que chaque message sera livré au moins une fois, et même si les chances sont minces, il y a une chance qu’un message soit livré plusieurs fois. Les nouvelles sont hors de propos. Bien que les messages à l’intérieur d’une partition soient garantis d’être ordonnés, si un sujet possède plusieurs partitions, la transmission des messages entre partitions n’est pas garantie d’être ordonnée. Complexité. Kafka nécessite le support de groupes de soigneurs de zoos, et les sujets nécessitent généralement un travail manuel pour créer, déployer et maintenir plus coûteux que les files d’attente de messages générales
Opérations Kafka de file de messages .NET/C#
Tout d’abord, utilisez .NET Core 3.1 pour créer deux nouveaux projets de console, à savoir Kafka-Consumer et Kafka-Producer
Utilisez nuget pour référencer le package Confluent.Kafka de cette façon, avec la commande suivante :
Adresse GitHub :La connexion hyperlientérée est visible.
Nous lançons d’abord le programme Producteur, et si nous lanceons le consommateur en premier, nous obtiendrons l’erreur suivante :
Erreur survenue : Courtier : sujet ou partition inconnu Cet article va consommer les réglagesEnableAutoOffsetStore est fauxc’est-à-dire en réglant manuellement le stockage décalé (similaire à un message de confirmation manuelle)
Les consommateurs ne définissent pas OffsetStore après consommation
Essayez d’utiliser le producteur pour produire deux messages, activez la consommation des consommateurs, MaxPollIntervalMs = 10000 // 10 secondes sans réglage manuel, laissez les autres clients consommer, bien sûr, cela ne sera pas consommé par d’autres clients dans les 10 secondes
MaxPollIntervalMs explique
Pour les consommateurs avancés, le temps maximal autorisé à consommer les messages entre les appels (par exemple, rd_kafka_consumer_poll()). Si cet intervalle est dépassé, le consommateur est considéré comme ayant échoué et le groupe est rééquilibré afin que la partition soit réattribuée à un autre membre du groupe de consommateurs. Avertissement : les commits de décalage peuvent ne pas être possibles pour le moment. Note : Il est recommandé de définir « enable.auto.offset.store=false » pour les applications en traitement prolongé, puis de stocker explicitement le décalage (en utilisant offsets_store()) après le traitement du message* afin de s’assurer que le décalage n’est pas automatiquement engagé avant la fin du traitement. Vérifiez une fois par seconde à des intervalles de deux. Pour plus d’informations, voir KIP-62. Les rendus sont les suivants :
OffsetStore est défini après que le consommateur a fini de dépenser
code
Une fois la configuration terminée, attendez 10 secondes et ça continueraReçu le dernier message(Lorsque le consommateur se connecte au courtier,Commencez la consommation depuis la position décaléeSi c.Commit(cr) est défini ; Le dernier message ne sera pas reçu à répétition.
Voir le code source
commit le décalage + 1, puis finalement appeler Librdkafka.topic_partition_list_destroy(cOffsets) ;
La connexion hyperlientérée est visible.
La connexion hyperlientérée est visible.
Définir un GroupID différent
Essayez de définir un GroupId différent via le paramètre en ligne de commande, puis envoyez un message via le producteur, comme montré sur l’image suivante :
Clinet1 et client2Recevoir des messages historiques, et après que le producteur ait envoyé un message, les deux seront presqueRecevoir des messages en même temps。
Les nouveaux consommateurs ne reçoivent que de nouveaux messages
Comment faire en sorte qu’un nouveau client ne reçoive que de nouveaux messages et ignore les données historiques ?
Les réglages sont les suivants :
Comme montré ci-dessous :
Code du producteur
Comme suit:
Code grand public
Comme suit:
Téléchargement du code source
Touristes, si vous voulez voir le contenu caché de ce post, s’il vous plaît Répondre
|