Dit artikel is een spiegelartikel van machinevertaling, klik hier om naar het oorspronkelijke artikel te gaan.

Bekijken: 129246|Antwoord: 17

[Bron] Kafka-operaties voor .NET/C# berichtwachtrijen [met broncode]

[Link kopiëren]
Geplaatst op 13-04-2021 11:45:31 | | | |
Kafka is een high-performance, gedistribueerd berichtensysteem ontwikkeld door LinkedIn dat veel wordt gebruikt in scenario's zoals logverzameling, verwerking van streaming data, online en offline distributie van berichten, en meer. Hoewel Kafaka niet als een traditionele MQ is ontworpen, kan het in de meeste gevallen traditionele berichtensystemen zoals ActiveMQ vervangen.

Kafka organiseert de stroom van berichten per onderwerp, en de server die de berichten bewaart heet een broker, en consumenten kunnen zich abonneren op één of meer onderwerpen. Om de belasting te balanceren, kunnen de berichten van een onderwerp worden onderverdeeld in meerdere partities, en hoe meer partities, hoe hoger de paralleliteit en doorvoer van Kafka.

Kafka-clusters vereisen Zookeeper-ondersteuning om clusters te implementeren, en Zookeeper is al opgenomen in de nieuwste Kafka-distributie, die kan worden ingezet om tegelijkertijd een Zookeeper-server en een Kafka-server te starten, of om andere bestaande Zookeeper-clusters te gebruiken.

In tegenstelling tot traditionele MQ moeten consumenten zelf een offset bijhouden, en bij het ontvangen van berichten van Kafka trekken ze alleen berichten na de huidige offset. Kafka's scala/java-client implementeert dit deel van de logica al door de offset op te slaan naar de zookeeper. Elke consument kan een ID kiezen, en consumenten met hetzelfde ID ontvangen hetzelfde bericht slechts één keer.Als consumenten van een onderwerp allemaal dezelfde id gebruiken, is het een traditionele wachtrij. Als elke consument een ander ID gebruikt, is het een traditionele pub-sub.

Recensie:

Voeg ActiveMQ toe aan systeemservices onder Windows
https://www.itsvse.com/thread-6210-1-1.html

AantalAfPendingBerichten, BerichtenInlineer, Bericht...
https://www.itsvse.com/thread-4954-1-1.html

Samenvatting van informatie over ActiveMQ en RabbitMQ
https://www.itsvse.com/thread-4659-1-1.html

CentOS ActiveMQ is aan de dienst toegevoegd
https://www.itsvse.com/thread-4617-1-1.html

Centos 6.2 64-bit installatie activemq tutorial
https://www.itsvse.com/thread-4616-1-1.html

ActiveMQ5.15.3 start niet op, en UnsupportedClassVersionError wordt gerapporteerd
https://www.itsvse.com/thread-4615-1-1.html

Activemq onderwerpmachtigingen
https://www.itsvse.com/thread-4495-1-1.html

Gebruiker itsvse is niet bevoegd om te lezen van: ActiveMQ.Advisory.TempQueue, Activ...
https://www.itsvse.com/thread-4476-1-1.html

C# ActiveMQ-client abonneert zich op broncode
https://www.itsvse.com/thread-4470-1-1.html

.net/c# activemq om het verbindingsaccount en wachtwoord in te stellen
https://www.itsvse.com/thread-4282-1-1.html

Stel de gebruikersnaam en het wachtwoord in voor het ACTIVEMQ-thema en de wachtrij
https://www.itsvse.com/thread-4281-1-1.html

ActiveMQ wijzigt het wachtwoord voor websitebeheer
https://www.itsvse.com/thread-4280-1-1.html

activemq Persistent store is Volledig
https://www.itsvse.com/thread-4125-1-1.html

.NET/C# ActiveMQ-operatievoorbeeld [Broncode]
https://www.itsvse.com/thread-3907-1-1.html

Activemq gebruikersrechtconfiguratie
https://www.itsvse.com/thread-3906-1-1.html

Het verschil tussen activemq Queue en Topic is dat
https://www.itsvse.com/thread-3863-1-1.html

. .Net-platform
https://www.itsvse.com/thread-3452-1-1.html

ActiveMQ persistent abonnementsinstellingen
https://www.itsvse.com/thread-3451-1-1.html

RabbitMQ BasicQos consumenten parallelle verwerkingslimiet
https://www.itsvse.com/thread-4667-1-1.html

rabbitMQ Queue Queue Message Persistence [met broncode]
https://www.itsvse.com/thread-4657-1-1.html

【Practice】rabbitMQ console om accountinformatie toe te voegen
https://www.itsvse.com/thread-4655-1-1.html

Een diepgaande analyse van het mechanisme van RabbitMQ-berichtrespons
https://www.itsvse.com/thread-4639-1-1.html

.net/c# RabbitMQ verbinding verbreken - verbinding verbreken en herverbinden
https://www.itsvse.com/thread-4636-1-1.html

Introductie tot de drie uitwisselingsmodi (fanout, direct en topic) van RabbitMQ
https://www.itsvse.com/thread-4635-1-1.html

【Practice】RabbitMQ installeert de webbeheer-plugin
https://www.itsvse.com/thread-4631-1-1.html

【Practical Combat】RabbitMQ installatietutorial onder Windows
https://www.itsvse.com/thread-4630-1-1.html
Kafkaconsumptie

1. Consumenten van dezelfde group_id, slechts één consument kan berichten consumeren (wachtrijmodus

2. Consumenten van verschillende group_id ontvangen hetzelfde nieuws

Voordelen van Kafka

Gedistribueerd en zeer schaalbaar. Kafka-clusters kunnen transparant worden opgeschaald om nieuwe servers aan het cluster toe te voegen.

Hoge prestaties. De prestaties van Kafka overtreffen die van traditionele MQ-implementaties zoals ActiveMQ en RabbitMQ ruimschoots, met name Kafka, dat ook batchbewerkingen ondersteunt. De volgende afbeelding toont de resultaten van LinkedIn's consumentenprestatie-stresstest:

Fouttolerantie. Data van elke partitie in Kafka wordt gerepliceerd naar meerdere servers. Wanneer een makelaar faalt, zal de ZooKeeper-dienst de producent en de consument informeren, waarna ze overstappen naar een andere makelaar.

Nadelen van Kafka:

Herhaal berichten. Kafka garandeert slechts dat elk bericht minstens één keer wordt afgeleverd, en hoewel de kans klein is, is er een kans dat een bericht meerdere keren wordt bezorgd.
Het nieuws is niet in orde. Hoewel berichten binnen een partitie gegarandeerd ordelijk zijn, is het niet gegarandeerd dat de levering tussen partities ordelijk is als een onderwerp meerdere partities heeft.
Complexiteit. Kafka heeft ondersteuning nodig van zookeeper-clusters, en onderwerpen vereisen meestal handmatige arbeid om te creëren, uitrollen en onderhouden duurder dan algemene berichtwachtrijen

.NET/C# berichtenwachtrij Kafka-operaties

Gebruik eerst .NET Core 3.1 om twee nieuwe consoleprojecten te creëren, namelijk Kafka-Consumer en Kafka-Producer

Gebruik nuget om het Confluent.Kafka-pakket zo te refereren, met het volgende commando:

GitHub-adres:De hyperlink-login is zichtbaar.

We starten eerst het Producer-programma, en als we eerst de consument starten, krijgen we de volgende foutmelding:
Fout opgetreden: Broker: Onbekend onderwerp of partitie

Dit artikel zal de instellingen opnemenEnableAutoOffsetStore is onjuist, dat wil zeggen, handmatig instellen van de offsetopslag (vergelijkbaar met een handmatige bevestigingsmelding)

Consumenten stellen OffsetStore niet in na consumptie

Probeer de producer te gebruiken om twee berichten te produceren, zet consumentenverbruik aan, MaxPollIntervalMs = 10000 // 10 seconden zonder handmatige instellingen, laat andere clients het consumeren, natuurlijk wordt het niet binnen 10 seconden door andere clients verbruikt

MaxPollIntervalMs legt het uit
Voor gevorderde gebruikers is de maximale toegestane tijd om berichten tussen gesprekken te verwerken (bijvoorbeeld rd_kafka_consumer_poll()). Als dit interval wordt overschreden, wordt de consument als mislukt beschouwd en wordt de groep opnieuw gebalanceerd zodat de partitie wordt toegewezen aan een ander lid van de consumentengroep. Waarschuwing: Offset commits zijn op dit moment mogelijk niet mogelijk. Opmerking: Het wordt aanbevolen om "enable.auto.offset.store=false" in te stellen voor applicaties die lange tijd aan het verwerken zijn, en vervolgens expliciet de offset op te slaan (met offsets_store()) nadat het bericht is verwerkt* om te garanderen dat de offset niet automatisch wordt gecommitteerd voordat de verwerking is voltooid. Controleer één keer per seconde met tussenpozen van twee. Voor meer informatie, zie KIP-62.

De renderings zijn als volgt:



OffsetStore wordt ingesteld nadat de consument heeft besteed

code

Wacht na de installatie 10 seconden en dan gebeurt het nog steedsHet laatste bericht ontvangen(Wanneer de consument verbinding maakt met de makelaar,Start de consumptie vanuit de offsetpositieAls c.Commit(cr) is ingesteld; Het laatste bericht wordt niet herhaaldelijk ontvangen.

Bekijk broncode



commit de offset + 1 commit, en roep uiteindelijk Librdkafka.topic_partition_list_destroy(cOffsets) aan;

De hyperlink-login is zichtbaar.
De hyperlink-login is zichtbaar.

Stel een andere GroupId in

Probeer een andere GroupId in te stellen via de commandoregelparameter, en stuur vervolgens een bericht via de producer, zoals te zien is in de volgende afbeelding:



Zowel clinet1 als client2Ontvang historische berichten, en nadat de producent een bericht heeft gestuurd, zullen ze bijna allebeiTegelijkertijd berichten ontvangen

Nieuwe consumenten ontvangen alleen nieuwe berichten

Hoe zorg je ervoor dat een nieuwe klant alleen nieuwe berichten ontvangt en historische data negeert?

De instellingen zijn als volgt:

Zoals hieronder getoond:



Producentcode

Als volgt:

Consumentencode

Als volgt:

Broncode downloaden

Toeristen, als jullie de verborgen inhoud van dit bericht willen zien, alsjeblieftAntwoord






Vorig:.NET/C# Uitzondering met Tencent Enterprise Mailbox: De operatie is afgelopen.
Volgend:NuGet wist de cache
 Huisbaas| Geplaatst op 15-04-2021 09:31:05 |
Wanneer de .NET Kafka-client wordt losgekoppeld, wordt er geen uitzondering gegooid en wordt het opnieuw verbonden nadat het netwerk normaal is
%4|1618450028.267| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Losgekoppeld (na 59926ms in staat UP)
%3|1618450028.267| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Losgekoppeld (na 59926ms in staat UP)
%3|1618450028.267| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Verbinding met ipv4#192.168.1.175:9092 mislukt: Onbekende fout (na 0ms in toestand CONNECT)
%3|1618450028.268| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Verbind met ipv4#192.168.1.175:9092 mislukt: Onbekende fout (na 0ms in toestand CONNECT)
%3|1618450028.357| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Verbinding maken met ipv4#192.168.1.175:9092 mislukt: Onbekende fout (na 10ms in staat CONNECT, 1 identieke fout(en) onderdrukt)
%3|1618450028.357| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Verbinding met IPv4#192.168.1.175:9092 mislukt: Onbekende fout (na 10ms in toestand CONNECT, 1 identieke fout(en) onderdrukt)
%3|1618450062.882| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Verbinding met IPv4#192.168.1.175:9092 mislukt: Onbekende fout (na 0ms in toestand CONNECT, 8 identieke fout(en) onderdrukt)
%3|1618450062.882| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Verbinding met ipv4#192.168.1.175:9092 mislukt: Onbekende fout (na 0ms in toestand CONNECT, 8 identieke fout(en) onderdrukt)
%3|1618450098.255| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Verbinding maken met ipv4#192.168.1.175:9092 mislukt: Onbekende fout (na 11ms in toestand CONNECT, 4 identieke fout(en) onderdrukt)
%3|1618450098.255| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Verbind met ipv4#192.168.1.175:9092 mislukt: Onbekende fout (na 11ms in toestand CONNECT, 4 identieke fout(en) onderdrukt)
%3|1618450138.243| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Verbinding met IPv4#192.168.1.175:9092 mislukt: Onbekende fout (na 0ms in toestand CONNECT, 4 identieke fout(en) onderdrukt)
%3|1618450138.244| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Verbind met ipv4#192.168.1.175:9092 mislukt: Onbekende fout (na 0ms in toestand CONNECT, 4 identieke fout(en) onderdrukt)
%3|1618450168.254| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Verbinding maken met ipv4#192.168.1.175:9092 mislukt: Onbekende fout (na 10ms in staat CONNECT, 3 identieke fout(en) onderdrukt)
%3|1618450168.254| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Verbind met IPv4#192.168.1.175:9092 mislukt: Onbekende fout (na 10ms in toestand CONNECT, 3 identieke fout(en) onderdrukt)

 Huisbaas| Geplaatst op 13-04-2021 16:26:58 |
Principe van berichtconsumptie:

In het daadwerkelijke productieproces zal elk onderwerp meerdere partities hebben, en het voordeel van meerdere partities is dat enerzijds de mogelijkheid om de data op de broker te sharden effectief de capaciteit van berichten vermindert en de IO-prestaties verbetert. Aan de andere kant, om de consumptiekracht van de consumentenkant te verbeteren, wordt hetzelfde onderwerp meestal via meerdere consumenten gebruikt, dat wil zeggen het load balancing-mechanisme van de consumentenkant, wat we als volgende zullen begrijpen: hoe consumeren consumenten boodschappen in het geval van meerdere partities en meerdere consumenten? Kafka bestaat in het concept van consumentengroepen, dat wil zeggen group.id hetzelfde soort consumenten, die tot een consumentengroep behoren, en alle consumenten in die groep coördineren om alle partities van het abonnementsonderwerp te consumeren. Natuurlijk kan elke partitie alleen worden gebruikt door consumenten in dezelfde consumentengroep, dus hoe verdelen consumenten in dezelfde consumentengroep de data waarin de partitie gebruikt moet worden? Als eenvoudig voorbeeld: als er partities verliezen, dat wil zeggen, wanneer het aantal partitonen gelijk is aan het aantal comsumers, elke comsumer overeenkomt met een partitie; als het aantal comsumers groter is dan de partities, dan werkt het extra aantal comsumers niet, integendeel, er zullen comsumers meerdere partities verbruiken.

Strategie voor bestemmingsplannen:

In Kafka zijn er twee partitie-allocatiestrategieën: één is Range (standaard) en de andere is RoundRobin (polling). Dit wordt ingesteld door de configuratieparameter partition.assignment.strategy van de comsumer.


Bekijk alle onderwerpen


Bekijk details voor een onderwerp




 Huisbaas| Geplaatst op 08-05-2021 17:17:33 |
Kafka verwijdert consumentengroepen



Het verwijderen van gevraagde consumentengroepen ('itsvse') was succesvol.


De volgende fouten kunnen worden gemeld:

Error: Deletion of some consumer groups failed:
* Groep 'itsvse' kon niet worden verwijderd wegens: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: De groep is niet leeg.
oplossing

Consumeer alle berichten, of stel een offset in

Kafka stelt handmatig de offset in
https://www.itsvse.com/thread-9641-1-1.html
Verwijder het dan weer!

 Huisbaas| Geplaatst op 13-04-2021 15:40:48 |
Power Shell Command



Elke consumentenclient onderhoudt 2 verbindingen met de Kafka-dienst
 Huisbaas| Geplaatst op 07-05-2021 12:37:06 |
Kafka, om het aantal stapels onderwerpen onder een bepaalde groep te bekijken

Geplaatst op 16-06-2021 12:41:09 |
Vraag alsjeblieft waarom de code niet bekeken kan worden~
 Huisbaas| Geplaatst op 25-06-2021 10:50:06 |
Kafka krijgt het commando voor onderwerpgrootte:



 Huisbaas| Geplaatst op 18-07-2021 10:15:01 |
Kafka-opdrachtregel om onderwerpen te maken:

Geplaatst op 03-09-2021 11:52:41 |
Er zijn nog steeds veel valkuilen in Kafka, geleerd
Disclaimer:
Alle software, programmeermaterialen of artikelen die door Code Farmer Network worden gepubliceerd, zijn uitsluitend bedoeld voor leer- en onderzoeksdoeleinden; De bovenstaande inhoud mag niet worden gebruikt voor commerciële of illegale doeleinden, anders dragen gebruikers alle gevolgen. De informatie op deze site komt van het internet, en auteursrechtconflicten hebben niets met deze site te maken. Je moet bovenstaande inhoud volledig van je computer verwijderen binnen 24 uur na het downloaden. Als je het programma leuk vindt, steun dan de echte software, koop registratie en krijg betere echte diensten. Als er sprake is van een inbreuk, neem dan contact met ons op via e-mail.

Mail To:help@itsvse.com