Acest articol este un articol oglindă al traducerii automate, vă rugăm să faceți clic aici pentru a sări la articolul original.

Vedere: 129246|Răspunde: 17

[Sursă] Operații Kafka pentru cozi de mesaje .NET/C# [cu cod sursă]

[Copiază linkul]
Postat pe 13.04.2021 11:45:31 | | | |
Kafka este un sistem de mesagerie distribuit, de înaltă performanță, dezvoltat de LinkedIn, folosit pe scară largă în scenarii precum colectarea jurnalelor, procesarea datelor în streaming, distribuirea de mesaje online și offline și altele. Deși nu este proiectat ca un MQ tradițional, Kafaka poate înlocui sistemele tradiționale de mesagerie precum ActiveMQ în majoritatea cazurilor.

Kafka organizează fluxul mesajelor pe subiecte, iar serverul care stochează mesajele se numește broker, iar consumatorii se pot abona la unul sau mai multe subiecte. Pentru a echilibra încărcătura, mesajele unui subiect pot fi împărțite în mai multe partiții, iar cu cât sunt mai multe partiții, cu atât paralelismul și debitul Kafka sunt mai mari.

Clusterele Kafka necesită suport zookeeper pentru a implementa clustere, iar zookeeper este deja inclus în cea mai recentă distribuție kafka, care poate fi implementată pentru a începe simultan un server zookeeper și un server Kafka sau pentru a folosi alte clustere de zookeeper existente.

Spre deosebire de MQ tradițional, consumatorii trebuie să păstreze singuri un offset, iar când primesc mesaje de la kafka, trag mesaje doar după offset-ul curent. Clientul scala/java al Kafka implementează deja această parte a logicii salvând offset-ul către îngrijitor zoologic. Fiecare consumator poate alege un ID, iar consumatorii cu același ID vor primi același mesaj o singură dată.Dacă toți consumatorii unui subiect folosesc același ID, este o coadă tradițională. Dacă fiecare consumator folosește un ID diferit, este un pub-sub tradițional.

Recenzie:

Adaugă ActiveMQ la serviciile de sistem din Windows
https://www.itsvse.com/thread-6210-1-1.html

NumărulDeMesajeAșteptate, MesajeAliniate, Mesaje...
https://www.itsvse.com/thread-4954-1-1.html

Rezumatul informațiilor despre ActiveMQ și RabbitMQ
https://www.itsvse.com/thread-4659-1-1.html

CentOS ActiveMQ este adăugat serviciului
https://www.itsvse.com/thread-4617-1-1.html

Tutorial activmq pentru instalarea pe 64 de biți a Centos 6.2
https://www.itsvse.com/thread-4616-1-1.html

ActiveMQ5.15.3 nu pornește, iar UnsupportedClassVersionError este raportată
https://www.itsvse.com/thread-4615-1-1.html

Setări de permisiuni pentru subiecte activemq
https://www.itsvse.com/thread-4495-1-1.html

Utilizatorul itsvse nu este autorizat să citească din: ActiveMQ.Advisory.TempQueue,Activ...
https://www.itsvse.com/thread-4476-1-1.html

Clientul C# ActiveMQ se abonează la codul sursă
https://www.itsvse.com/thread-4470-1-1.html

.net/c# activemq pentru a seta contul de conexiune și parola
https://www.itsvse.com/thread-4282-1-1.html

Setează numele de utilizator și parola pentru tema și coada ACTIVEMQ
https://www.itsvse.com/thread-4281-1-1.html

activemq modifică parola de gestionare a site-ului
https://www.itsvse.com/thread-4280-1-1.html

activemq Persistent store este plin
https://www.itsvse.com/thread-4125-1-1.html

Exemplu de operație .NET/C# ActiveMQ [Cod sursă]
https://www.itsvse.com/thread-3907-1-1.html

Configurarea permisiunilor utilizatorului Activemq
https://www.itsvse.com/thread-3906-1-1.html

Diferența dintre activemq Queue și Topic este că
https://www.itsvse.com/thread-3863-1-1.html

. Platforma .Net
https://www.itsvse.com/thread-3452-1-1.html

Setări persistente de abonament ActiveMQ
https://www.itsvse.com/thread-3451-1-1.html

Limita procesării paralele pentru consumatori RabbitMQ BasicQos
https://www.itsvse.com/thread-4667-1-1.html

Persistența mesajelor rabbitMQ Queue Queue [cu cod sursă]
https://www.itsvse.com/thread-4657-1-1.html

【Practice】rabbitMQ consolă pentru a adăuga informații despre cont
https://www.itsvse.com/thread-4655-1-1.html

O analiză aprofundată a mecanismului de răspuns la mesaje RabbitMQ
https://www.itsvse.com/thread-4639-1-1.html

.net/c# Deconectare conexiune RabbitMQ - deconectare și reconectare
https://www.itsvse.com/thread-4636-1-1.html

Introducere în cele trei moduri de schimb (fanout, direct și topic) ale RabbitMQ
https://www.itsvse.com/thread-4635-1-1.html

【Practice】RabbitMQ instalează pluginul de management web
https://www.itsvse.com/thread-4631-1-1.html

【Practical Combat】Tutorial de instalare RabbitMQ sub Windows
https://www.itsvse.com/thread-4630-1-1.html
Consum kafka

1. Consumatorii de același group_id, doar un singur consumator poate consuma mesaje (Modul coadă de coadă

2. Consumatorii din group_id diferite primesc aceleași vești

Avantajele Kafka

Distribuit și foarte scalabil. Clusterele Kafka pot fi scalate transparent pentru a adăuga servere noi în cluster.

Performanță ridicată. Performanța Kafka depășește cu mult implementările tradiționale MQ, cum ar fi ActiveMQ și RabbitMQ, în special Kafka, care suportă și operațiuni batch. Imaginea următoare arată rezultatele testului de stres de performanță al consumatorilor de la LinkedIn:

Toleranță la greșeli. Datele din fiecare partiție din Kafka sunt replicate către mai multe servere. Când un broker eșuează, serviciul ZooKeeper va notifica producătorul și consumatorul, care trec la un alt broker.

Dezavantaje ale Kafka:

Repetă mesajele. Kafka garantează că fiecare mesaj va fi livrat cel puțin o dată, iar deși șansele sunt mici, există șansa ca un mesaj să fie livrat de mai multe ori.
Știrile sunt nepotrivite. Deși mesajele din interiorul unei partiții sunt garantate să fie ordonate, dacă un subiect are mai multe partiții, livrarea mesajelor între partiții nu este garantată să fie ordonată.
Complexitate. Kafka necesită sprijinul clusterelor de îngrijitori de grădini zoologice, iar subiectele necesită de obicei muncă manuală pentru a fi create, implementate și menținute mai costisitoare decât cozile generale de mesaje

.NET/C# operații Kafka din coada mesajelor .NET/C#

În primul rând, folosește .NET Core 3.1 pentru a crea două noi proiecte de consolă, și anume Kafka-Consumer și Kafka-Producer

Folosește nuget pentru a face referire la pachetul Confluent.Kafka astfel, cu următoarea comandă:

Adresă GitHub:Autentificarea cu hyperlink este vizibilă.

Începem mai întâi programul Producer, iar dacă începem mai întâi cu consumatorul, vom primi următoarea eroare:
A apărut o eroare: Broker: Subiect sau partiție necunoscută

Acest articol va consuma setărileEnableAutoOffsetStore este fals, adică setarea manuală a stocării offset (similar unui mesaj de confirmare manuală)

Consumatorii nu setează OffsetStore după consum

Încearcă să folosești producătorul pentru a produce două mesaje, activează consumul consumatorilor, MaxPollIntervalMs = 10000 // 10 secunde fără setare manuală, permite altor clienți să consume, desigur, nu va fi consumat de alți clienți în 10 secunde

MaxPollIntervalMs explică
Pentru consumatorii avansați, timpul maxim permis pentru a consuma mesajele între apeluri (de exemplu, rd_kafka_consumer_poll()). Dacă acest interval este depășit, consumatorul este considerat că a eșuat și grupul este reechilibrat astfel încât partiția să fie realocată unui alt membru al grupului de consumatori. Atenție: S-ar putea să nu fie posibile commit-uri offset în acest moment. Notă: Se recomandă să se seteze "enable.auto.offset.store=false" pentru aplicațiile care sunt procesate de mult timp, apoi să se stocheze explicit offset-ul (folosind offsets_store()) după procesarea mesajului* pentru a se asigura că offset-ul nu este automat confirmat înainte de finalizarea procesării. Verifică o dată pe secundă, la intervale de câte două. Pentru mai multe informații, vezi KIP-62.

Randările sunt următoarele:



OffsetStore este setat după ce consumatorul termină de cheltuit

cod

După ce configurarea este completă, așteaptă 10 secunde și tot va funcționaAm primit ultimul mesaj(Când consumatorul se conectează cu brokerul,Începe consumul din poziția de offsetDacă c.Commit(cr) este setat; Ultimul mesaj nu va fi primit în mod repetat.

Vezi codul sursă



să commit offset-ul + 1 și în cele din urmă să apeleze Librdkafka.topic_partition_list_destroy(cOffsets);

Autentificarea cu hyperlink este vizibilă.
Autentificarea cu hyperlink este vizibilă.

Setează un alt GroupID

Încearcă să setezi un alt GroupId prin parametrul de linie de comandă, apoi trimite un mesaj prin producător, așa cum se arată în imaginea următoare:



Atât clinet1, cât și client2Primește mesaje istorice, iar după ce producătorul trimite un mesaj, amândoi aproape că vor fiPrimește mesaje în același timp

Consumatorii noi primesc doar mesaje noi

Cum faci ca un client nou să primească doar mesaje noi și să ignore datele istorice?

Setările sunt următoarele:

Așa cum se arată mai jos:



Cod producător

Așa cum urmează:

Cod pentru consumatori

Așa cum urmează:

Descărcare codului sursă

Turiști, dacă vreți să vedeți conținutul ascuns al acestei postări, vă rogRăspunde






Precedent:.NET/C# Excepție folosind Tencent Enterprise Mailbox: Operațiunea a expirat.
Următor:NuGet golește cache-ul
 Proprietarul| Postat pe 15.04.2021 09:31:05 |
Când clientul .NET Kafka este deconectat, nu afișează nicio excepție și se reconectează după ce rețeaua este normală
%4|1618450028.267| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Deconectat (după 59926ms în starea UP)
%3|1618450028,267| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Deconectat (după 59926ms în starea UP)
%3|1618450028,267| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed : Eroare necunoscută (după 0ms în starea CONNECT)
%3|1618450028.268| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed : Eroare necunoscută (după 0ms în starea CONNECT)
%3|1618450028.357| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect la ipv4#192.168.1.175:9092 eșuat: Eroare necunoscută (după 10ms în starea CONNECT, 1 eroare identică suprimată)
%3|1618450028.357| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed : Eroare necunoscută (după 10ms în starea CONNECT, 1 eroare identică suprimată)
%3|1618450062,882| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed : Eroare necunoscută (după 0ms în starea CONNECT, 8 erori identice suprimate)
%3|1618450062,882| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed : Eroare necunoscută (după 0ms în starea CONNECT, 8 eroare identice suprimate)
%3|1618450098.255| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed : Eroare necunoscută (după 11ms în stare CONNECT, 4 eroare identice suprimate)
%3|1618450098.255| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed : Eroare necunoscută (după 11ms în starea CONNECT, 4 eroare identice suprimate)
%3|1618450138,243| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed : Eroare necunoscută (după 0ms în starea CONNECT, 4 eroare identice suprimate)
%3|1618450138,244| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect la ipv4#192.168.1.175:9092 eșuat: Eroare necunoscută (după 0ms în starea CONNECT, 4 eroare identice suprimate)
%3|1618450168,254| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed : Eroare necunoscută (după 10ms în starea CONNECT, 3 eroare identice suprimate)
%3|1618450168,254| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect la ipv4#192.168.1.175:9092 a eșuat: Eroare necunoscută (după 10ms în starea CONNECT, 3 erori identice suprimate)

 Proprietarul| Postat pe 13.04.2021 16:26:58 |
Principiul consumului mesajelor:

În procesul efectiv de producție, fiecare subiect va avea mai multe partiții, iar avantajul mai multor partiții este că, pe de o parte, capacitatea de a fragmenta datele pe broker reduce efectiv capacitatea mesajelor și îmbunătățește performanța IO-ului. Pe de altă parte, pentru a îmbunătăți puterea de consum a părții consumatorilor, același subiect va fi în general consumat prin mai mulți consumatori, adică mecanismul de echilibrare a încărcării al părții consumatorilor, ceea ce vom înțelege în continuare: cum consumă consumatorii mesajele în cazul mai multor partiții și mai mulți consumatori? Kafka există în conceptul de grupuri de consumatori, adică group.id același tip de consumatori, care aparțin unui grup de consumatori, iar toți consumatorii din grup se coordonează pentru a consuma toate partițiile subiectului de abonament. Desigur, fiecare partiție poate fi consumată doar de consumatori din același grup de consumatori, deci cum alocă consumatorii din același grup datele în care partiție ar trebui consumată? De exemplu simplu, dacă există partiții care pierd, adică când numărul de partitoni este același cu numărul de comumatori, fiecare comumator corespunde unei partiții, dacă numărul de comumatori este mai mare decât partițiile, atunci numărul suplimentar de comumatori nu va funcționa, dimpotrivă, vor exista comumatori care vor consuma mai multe partiții.

Strategia de alocare a zonei:

În kafka, există două strategii de alocare a partițiilor, una este Range (implicit) și cealaltă este RoundRobin (sondaje). Aceasta este setată de parametrul de configurare partition.assignment.strategy al comsumerului.


Vezi toate subiectele


Vezi detalii pentru un subiect




 Proprietarul| Postat pe 08.05.2021 17:17:33 |
Kafka șterge grupurile de consumatori



Ștergerea grupurilor de consumatori solicitate ("itsvse") a fost un succes.


Următoarele erori pot fi raportate:

Error: Deletion of some consumer groups failed:
* Grupul 'itsvse' nu a putut fi șters cauza: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: Grupul nu este gol.
soluție

Consumă toate mesajele sau setează un offset

Kafka setează manual offset-ul
https://www.itsvse.com/thread-9641-1-1.html
Apoi, șterge-l din nou!

 Proprietarul| Postat pe 13.04.2021 15:40:48 |
Comanda Power Shell



Fiecare client consumator menține 2 conexiuni la serviciul Kafka
 Proprietarul| Postat pe 07.05.2021 12:37:06 |
kafka, pentru a vedea numărul de teancuri de subiecte dintr-un grup specificat

Postat pe 16.06.2021 12:41:09 |
Vă rog să întrebați de ce nu poate fi vizualizat codul~
 Proprietarul| Postat pe 25.06.2021 10:50:06 |
Kafka primește comanda de mărime a subiectului:



 Proprietarul| Postat pe 18.07.2021 10:15:01 |
Linia de comandă Kafka pentru a crea subiecte:

Postat pe 03.09.2021 11:52:41 |
Încă există multe capcane în kafka, învățate
Disclaimer:
Tot software-ul, materialele de programare sau articolele publicate de Code Farmer Network sunt destinate exclusiv scopurilor de învățare și cercetare; Conținutul de mai sus nu va fi folosit în scopuri comerciale sau ilegale, altfel utilizatorii vor suporta toate consecințele. Informațiile de pe acest site provin de pe Internet, iar disputele privind drepturile de autor nu au legătură cu acest site. Trebuie să ștergi complet conținutul de mai sus de pe calculatorul tău în termen de 24 de ore de la descărcare. Dacă îți place programul, te rugăm să susții software-ul autentic, să cumperi înregistrarea și să primești servicii autentice mai bune. Dacă există vreo încălcare, vă rugăm să ne contactați prin e-mail.

Mail To:help@itsvse.com