Dieser Artikel ist ein Spiegelartikel der maschinellen Übersetzung, bitte klicken Sie hier, um zum Originalartikel zu springen.

Ansehen: 129246|Antwort: 17

[Quelle] Kafka-Operationen für .NET/C#-Nachrichtenwarteschlangen [mit Quellcode]

[Link kopieren]
Veröffentlicht am 13.04.2021 11:45:31 | | | |
Kafka ist ein leistungsstarkes, verteiltes Messaging-System, das von LinkedIn entwickelt wurde und häufig in Szenarien wie Logsammlung, Streaming-Datenverarbeitung, Online- und Offline-Nachrichtenverteilung und mehr eingesetzt wird. Obwohl Kafaka nicht als traditionelles MQ konzipiert ist, kann es in den meisten Fällen traditionelle Messaging-Systeme wie ActiveMQ ersetzen.

Kafka organisiert den Nachrichtenfluss nach Themen, und der Server, der die Nachrichten speichert, heißt Broker; Konsumenten können ein oder mehrere Themen abonnieren. Um die Last auszugleichen, können die Nachrichten eines Themas in mehrere Partitionen unterteilt werden, und je mehr Partitionen, desto höher ist die Parallelität und der Durchsatz von Kafka.

Kafka-Cluster benötigen Zookeeper-Unterstützung, um Cluster zu implementieren, und Zookeeper ist bereits in der neuesten Kafka-Distribution enthalten, die gleichzeitig einen Zookeeper-Server und einen Kafka-Server starten oder andere bestehende Zookeeper-Cluster nutzen kann.

Im Gegensatz zur traditionellen MQ müssen Verbraucher einen Offset für sich behalten und bei Nachrichten von Kafka erst nach dem aktuellen Offset abrufen. Kafkas Scala/Java-Client implementiert diesen Teil der Logik bereits, indem er den Offset im Zookeeper speichert. Jeder Verbraucher kann eine ID wählen, und Nutzer mit derselben ID erhalten dieselbe Nachricht nur einmal.Wenn alle Konsumenten eines Themas dieselbe ID verwenden, handelt es sich um eine traditionelle Warteschlange. Wenn jeder Verbraucher eine andere ID verwendet, handelt es sich um ein traditionelles Pub-Sub.

Rezension:

ActiveMQ unter Windows zu Systemdiensten hinzufügen
https://www.itsvse.com/thread-6210-1-1.html

NummerVonPendingNachrichten, NachrichtenWarteschlange, Messag...
https://www.itsvse.com/thread-4954-1-1.html

Zusammenfassung der Informationen zu ActiveMQ und RabbitMQ
https://www.itsvse.com/thread-4659-1-1.html

CentOS ActiveMQ wird dem Dienst hinzugefügt
https://www.itsvse.com/thread-4617-1-1.html

Centos 6.2 64-Bit-Installation activemq Tutorial
https://www.itsvse.com/thread-4616-1-1.html

ActiveMQ5.15.3 startet fehl, und UnsupportedClassVersionError wird gemeldet
https://www.itsvse.com/thread-4615-1-1.html

Activemq Topic-Berechtigungseinstellungen
https://www.itsvse.com/thread-4495-1-1.html

Benutzer itsvse ist nicht autorisiert, von: ActiveMQ.Advisory.TempQueue,Activ... zu lesen.
https://www.itsvse.com/thread-4476-1-1.html

C# ActiveMQ-Client abonniert Quellcode
https://www.itsvse.com/thread-4470-1-1.html

.net/c# activemq, um das Verbindungskonto und Passwort festzulegen
https://www.itsvse.com/thread-4282-1-1.html

Setzen Sie Benutzernamen und Passwort für das ACTIVEMQ-Theme und die Warteschlange ein
https://www.itsvse.com/thread-4281-1-1.html

ActiveMQ ändert das Webmanagement-Passwort
https://www.itsvse.com/thread-4280-1-1.html

activemq Persistent Store ist Full
https://www.itsvse.com/thread-4125-1-1.html

.NET/C# Beispiel für eine ActiveMQ-Operation [Quellcode]
https://www.itsvse.com/thread-3907-1-1.html

Activemq-Benutzerberechtigungskonfiguration
https://www.itsvse.com/thread-3906-1-1.html

Der Unterschied zwischen activemq Queue und Topic ist, dass
https://www.itsvse.com/thread-3863-1-1.html

. .Net-Plattform
https://www.itsvse.com/thread-3452-1-1.html

Persistent-Abonnement-Einstellungen von ActiveMQ
https://www.itsvse.com/thread-3451-1-1.html

RabbitMQ BasicQos Consumer-Parallelverarbeitungslimit
https://www.itsvse.com/thread-4667-1-1.html

rabbitMQ Queue Queue Nachrichtenpersistenz [mit Quellcode]
https://www.itsvse.com/thread-4657-1-1.html

【Practice】rabbitMQ console zum Hinzufügen von Kontoinformationen
https://www.itsvse.com/thread-4655-1-1.html

Eine eingehende Analyse des Mechanismus der RabbitMQ-Nachrichtenantwort
https://www.itsvse.com/thread-4639-1-1.html

.net/c# RabbitMQ Verbindungsabbruch – Abschaltung und Wiederverbindung
https://www.itsvse.com/thread-4636-1-1.html

Einführung in die drei Austauschmodi (Fanout, Direct und Topic) von RabbitMQ
https://www.itsvse.com/thread-4635-1-1.html

【Practice】RabbitMQ installiert das Webmanagement-Plugin
https://www.itsvse.com/thread-4631-1-1.html

【Practical Combat】RabbitMQ Installationsanleitung unter Windows
https://www.itsvse.com/thread-4630-1-1.html
Kafka-Konsum

1. Konsumenten derselben group_id, nur ein Konsument kann Nachrichten konsumieren (Warteschlangen-Warteschlangen-Modus

2. Verbraucher verschiedener group_id erhalten dieselben Nachrichten

Vorteile der Kafka

Verteilt und hochgradig skalierbar. Kafka-Cluster können transparent skaliert werden, um neue Server zum Cluster hinzuzufügen.

Hohe Leistung. Kafkas Leistung übertrifft bei weitem die traditioneller MQ-Implementierungen wie ActiveMQ und RabbitMQ, insbesondere Kafka, das ebenfalls Batch-Operationen unterstützt. Das folgende Bild zeigt die Ergebnisse des Stresstests der Verbraucherleistung von LinkedIn:

Fehlertoleranz. Daten von jeder Partition in Kafka werden auf mehrere Server repliziert. Wenn ein Makler scheitert, informiert der ZooKeeper-Dienst den Produzenten und den Verbraucher, die dann zu einem anderen Makler wechseln.

Nachteile der Kafka:

Wiederholen Sie die Nachrichten. Kafka garantiert nur, dass jede Nachricht mindestens einmal zugestellt wird, und obwohl die Chancen gering sind, besteht die Wahrscheinlichkeit, dass eine Nachricht mehrfach übergeben wird.
Die Nachrichten sind außer Ordnung. Obwohl Nachrichten innerhalb einer Partition garantiert geordnet sind, ist bei mehreren Partitionen die Nachrichtenzustellung zwischen den Partitionen nicht garantiert.
Komplexität. Kafka benötigt die Unterstützung von Zookeeper-Clustern, und Themen erfordern meist manuelle Arbeit zur Erstellung, Bereitstellung und Wartung teurer als allgemeine Nachrichtenwarteschlangen

.NET/C#-Nachrichtenwarteschlange Kafka-Operationen

Zuerst verwenden Sie .NET Core 3.1, um zwei neue Konsolenprojekte zu erstellen, nämlich Kafka-Consumer und Kafka-Producer

Verwenden Sie nuget, um das Confluent.Kafka-Paket so zu referenzieren, mit folgendem Befehl:

GitHub-Adresse:Der Hyperlink-Login ist sichtbar.

Wir starten zuerst das Producer-Programm, und wenn wir zuerst den Consumer starten, erhalten wir folgenden Fehler:
Fehler aufgetreten: Broker: Unbekanntes Thema oder Partition

Dieser Artikel wird die Einstellungen umfassenEnableAutoOffsetStore ist falsch, das heißt, manuell den Versatzspeicher einzustellen (ähnlich einer manuellen Bestätigungsmeldung)

Verbraucher setzen OffsetStore nach dem Verbrauch nicht mehr

Versuche, den Producer zu verwenden, um zwei Nachrichten zu erzeugen, aktiviere den Verbraucherverbrauch, MaxPollIntervalMs = 10000 // 10 Sekunden ohne manuelle Einstellung, lass andere Clients konsumieren, natürlich wird es nicht innerhalb von 10 Sekunden von anderen Clients verbraucht.

MaxPollIntervalMs erklärt
Für fortgeschrittene Verbraucher ist die maximal erlaubte Zeit für Nachrichten zwischen Anrufen (zum Beispiel rd_kafka_consumer_poll()). Wenn dieses Intervall überschritten wird, gilt der Konsument als gescheitert und die Gruppe wird neu ausbalanciert, sodass die Partition einem anderen Mitglied der Konsumentengruppe zugewiesen wird. Warnung: Offset-Commits sind derzeit möglicherweise nicht möglich. Hinweis: Es wird empfohlen, für Anwendungen, die lange verarbeiten, "enable.auto.offset.store=false" zu setzen und dann den Offset explizit (mit offsets_store()) nach der Bearbeitung der Nachricht* zu speichern, um sicherzustellen, dass der Offset nicht automatisch vor Abschluss der Verarbeitung festgelegt wird. Überprüfen Sie einmal pro Sekunde im Abstand von zwei Sekunden. Weitere Informationen finden Sie unter KIP-62.

Die Darstellungen sind wie folgt:



OffsetStore wird festgelegt, nachdem der Verbraucher seine Ausgaben abgeschlossen hat

Code

Nachdem die Einrichtung abgeschlossen ist, warte 10 Sekunden, dann funktioniert es immer nochLetzte Nachricht erhalten(Wenn der Verbraucher sich mit dem Broker verbindet,Beginne den Verbrauch aus der Offset-PositionWenn c.Commit(cr) gesetzt ist; Die letzte Nachricht wird nicht wiederholt empfangen.

Quellcode anzeigen



Commit den Offset + 1 Commit und ruft schließlich Librdkafka.topic_partition_list_destroy(cOffsets) auf;

Der Hyperlink-Login ist sichtbar.
Der Hyperlink-Login ist sichtbar.

Setze eine andere GroupId

Versuche, eine andere GroupId über den Kommandozeilenparameter zu setzen und dann eine Nachricht über den Producer zu senden, wie im folgenden Bild gezeigt:



Sowohl clinet1 als auch client2Empfangen Sie historische Botschaften, und nachdem der Produzent eine Nachricht verschickt hat, werden beide fastNachrichten gleichzeitig empfangen

Neue Verbraucher erhalten nur neue Nachrichten

Wie sorgt man dafür, dass ein neuer Client nur neue Nachrichten erhält und historische Daten ignoriert?

Die Einstellungen sind wie folgt:

Wie unten gezeigt:



Produzentencode

Folgendermaßen:

Verbrauchercode

Folgendermaßen:

Quellcode-Download

Touristen, wenn ihr den versteckten Inhalt dieses Beitrags sehen wollt, bitteAntwort






Vorhergehend:.NET/C#-Ausnahme mit Tencent Enterprise Mailbox: Die Operation ist abgelaufen.
Nächster:NuGet löscht den Cache
 Vermieter| Veröffentlicht am 15.04.2021 09:31:05 |
Wenn der .NET Kafka-Client getrennt wird, wirft er keine Ausnahme und verbindet sich wieder, nachdem das Netzwerk normal ist
%4|1618450028.267| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Getrennt (nach 59926 ms im Zustand UP)
%3|1618450028.267| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Getrennt (nach 59926 ms im Zustand UP)
%3|1618450028.267| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Verbindung zu ipv4#192.168.1.175:9092 fehlgeschlagen: Unbekannter Fehler (nach 0 ms im Zustand CONNECT)
%3|1618450028.268| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Verbindung zu IPv4#192.168.1.175:9092 fehlgeschlagen: Unbekannter Fehler (nach 0 ms im Zustand CONNECT)
%3|1618450028.357| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Verbindung zu IPv4#192.168.1.175:9092 fehlgeschlagen: Unbekannter Fehler (nach 10 ms im Zustand CONNECT, 1 identischer Fehler unterdrückt)
%3|1618450028.357| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Verbindung zu IPv4#192.168.1.175:9092 fehlgeschlagen: Unbekannter Fehler (nach 10 ms im Zustand CONNECT, 1 identischer Fehler unterdrückt)
%3|1618450062.882| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Verbindung zu IPv4#192.168.1.175:9092 fehlgeschlagen: Unbekannter Fehler (nach 0 ms im Zustand CONNECT, 8 identische Fehler unterdrückt)
%3|1618450062.882| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Verbindung zu IPv4#192.168.1.175:9092 fehlgeschlagen: Unbekannter Fehler (nach 0 ms im Zustand CONNECT, 8 identische Fehler unterdrückt)
%3|1618450098.255| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Verbindung zu IPv4#192.168.1.175:9092 fehlgeschlagen: Unbekannter Fehler (nach 11 ms im Zustand CONNECT, 4 identische Fehler(n) unterdrückt)
%3|1618450098.255| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Verbindung zu IPv4#192.168.1.175:9092 fehlgeschlagen: Unbekannter Fehler (nach 11 ms im Zustand CONNECT, 4 identische Fehler unterdrückt)
%3|1618450138.243| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Verbindung zu ipv4#192.168.1.175:9092 fehlgeschlagen: Unbekannter Fehler (nach 0 ms im Zustand CONNECT, 4 identische Fehler(n) unterdrückt)
%3|1618450138.244| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Verbindung zu IPv4#192.168.1.175:9092 fehlgeschlagen: Unbekannter Fehler (nach 0 ms im Zustand CONNECT, 4 identische Fehler(n) unterdrückt)
%3|1618450168.254| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Verbindung zu ipv4#192.168.1.175:9092 fehlgeschlagen: Unbekannter Fehler (nach 10 ms im Zustand CONNECT, 3 identische Fehler(n) unterdrückt)
%3|1618450168.254| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Verbindung zu IPv4#192.168.1.175:9092 fehlgeschlagen: Unbekannter Fehler (nach 10 ms im Zustand CONNECT, 3 identische Fehler(n) unterdrückt)

 Vermieter| Veröffentlicht am 13.04.2021 16:26:58 |
Prinzip des Nachrichtenkonsums:

Im eigentlichen Produktionsprozess hat jedes Thema mehrere Partitionen, und der Vorteil mehrerer Partitionen besteht darin, dass einerseits die Möglichkeit, die Daten im Broker zu sharden, die Kapazität der Nachrichten effektiv reduziert und die IO-Leistung verbessert. Andererseits wird dasselbe Thema zur Verbesserung der Verbrauchskraft der Verbraucherseite in der Regel über mehrere Verbraucher konsumiert, also über den Lastverteilungsmechanismus der Verbraucherseite, was wir als Nächstes verstehen werden: Wie konsumieren Verbraucher Nachrichten im Fall mehrerer Partitionen und mehrerer Verbraucher? Kafka existiert im Konzept der Konsumentengruppen, das heißt, group.id dieselbe Art von Konsumenten, die zu einer Konsumentengruppe gehören, und alle Konsumenten der Gruppe koordinieren, um alle Partitionen des Abonnementthemas zu konsumieren. Natürlich kann jede Partition nur von Konsumenten derselben Konsumentengruppe konsumiert werden, also wie verteilen Verbraucher derselben Konsumentengruppe die Daten, in denen die Partition konsumiert werden soll? Ein einfaches Beispiel: Wenn es Partitionen gibt, die verlieren, das heißt, wenn die Anzahl der Partitonen der Anzahl der Komsumanten entspricht, entspricht jeder Komsummator einer Partition; wenn die Anzahl der Komsumanten größer ist als die Partitionen, funktioniert die zusätzliche Anzahl der Komsumanten nicht, im Gegenteil, es gibt Komsumatoren, die mehrere Partitionen verbrauchen.

Strategie zur Zuweisung der Zonierung:

In Kafka gibt es zwei Partitionsallokationsstrategien: eine ist Range (Standard) und die andere RoundRobin (Polling). Dies wird durch den Konfigurationsparameter partition.assignment.strategy des Komsumers gesetzt.


Sehen Sie alle Themen


Details zu einem Thema anzeigen




 Vermieter| Veröffentlicht am 08.05.2021 17:17:33 |
Kafka löscht Konsumentengruppen



Die Löschung der angeforderten Konsumentengruppen ("itsvse") war erfolgreich.


Folgende Fehler können gemeldet werden:

Error: Deletion of some consumer groups failed:
* Gruppe 'itsvse' konnte nicht gelöscht werden wegen: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: Die Gruppe ist nicht leer.
Lösung

Verbrauche alle Nachrichten oder setze einen Offset

Kafka stellt den Offset manuell ein
https://www.itsvse.com/thread-9641-1-1.html
Dann lösche es nochmal!

 Vermieter| Veröffentlicht am 13.04.2021 15:40:48 |
Power Shell Command



Jeder Verbraucherclient unterhält zwei Verbindungen zum Kafka-Dienst
 Vermieter| Veröffentlicht am 07.05.2021 12:37:06 |
Kafka, um die Anzahl der Themenstapel unter einer bestimmten Gruppe anzuzeigen

Veröffentlicht am 16.06.2021 12:41:09 |
Bitte fragen Sie, warum der Code nicht angezeigt werden kann~
 Vermieter| Veröffentlicht am 25.06.2021 10:50:06 |
Kafka erhält den Befehl zur Themengröße:



 Vermieter| Veröffentlicht am 18.07.2021 10:15:01 |
Kafka-Kommandozeile zum Erstellen von Themen:

Veröffentlicht am 03.09.2021 11:52:41 |
Es gibt immer noch viele Fallstricke in der Kafka, gelernt
Verzichtserklärung:
Alle von Code Farmer Network veröffentlichten Software, Programmiermaterialien oder Artikel dienen ausschließlich Lern- und Forschungszwecken; Die oben genannten Inhalte dürfen nicht für kommerzielle oder illegale Zwecke verwendet werden, andernfalls tragen die Nutzer alle Konsequenzen. Die Informationen auf dieser Seite stammen aus dem Internet, und Urheberrechtsstreitigkeiten haben nichts mit dieser Seite zu tun. Sie müssen die oben genannten Inhalte innerhalb von 24 Stunden nach dem Download vollständig von Ihrem Computer löschen. Wenn Ihnen das Programm gefällt, unterstützen Sie bitte echte Software, kaufen Sie die Registrierung und erhalten Sie bessere echte Dienstleistungen. Falls es eine Verletzung gibt, kontaktieren Sie uns bitte per E-Mail.

Mail To:help@itsvse.com