Questo articolo è un articolo speculare di traduzione automatica, clicca qui per saltare all'articolo originale.

Vista: 129246|Risposta: 17

[Fonte] Operazioni Kafka per code di messaggi .NET/C# [con codice sorgente]

[Copiato link]
Pubblicato su 13/04/2021 11:45:31 | | | |
Kafka è un sistema di messaggistica distribuito ad alte prestazioni sviluppato da LinkedIn, ampiamente utilizzato in scenari come la raccolta di log, l'elaborazione di dati in streaming, la distribuzione di messaggi online e offline, e altro ancora. Sebbene non sia progettato come un MQ tradizionale, Kafaka può sostituire i sistemi di messaggistica tradizionali come ActiveMQ nella maggior parte dei casi.

Kafka organizza il flusso dei messaggi per argomenti, e il server che contiene i messaggi si chiama broker, e i consumatori possono iscriversi a uno o più argomenti. Per bilanciare il carico, i messaggi di un argomento possono essere divisi in più partizioni, e più partizioni, maggiore è il parallelismo e la capacità di Kafka.

I cluster Kafka richiedono il supporto di zookeeper per implementarli, e zookeeper è già incluso nell'ultima distribuzione kafka, che può essere implementata per avviare contemporaneamente un server zookeeper e un server Kafka, oppure utilizzare altri cluster di zookeeper esistenti.

A differenza del MQ tradizionale, i consumatori devono mantenere un offset da soli, e quando ricevono messaggi da kafka, estraggono i messaggi solo dopo lo offset corrente. Il client scala/java di Kafka implementa già questa parte della logica salvando lo offset allo zookeeper. Ogni consumatore può scegliere un ID, e chi ha lo stesso ID riceverà lo stesso messaggio solo una volta.Se i consumatori di un argomento usano tutti lo stesso id, si tratta di una coda tradizionale. Se ogni consumatore usa un ID diverso, si tratta di un pub-sub tradizionale.

Recensione:

Aggiungi ActiveMQ ai servizi di sistema sotto Windows
https://www.itsvse.com/thread-6210-1-1.html

NumeroMessaggi Pendenti, Messaggi In Fila, Messaggi...
https://www.itsvse.com/thread-4954-1-1.html

Riepilogo delle informazioni su ActiveMQ e RabbitMQ
https://www.itsvse.com/thread-4659-1-1.html

CentOS ActiveMQ viene aggiunto al servizio
https://www.itsvse.com/thread-4617-1-1.html

Tutorial di installazione attiva a 64 bit di Centos 6.2
https://www.itsvse.com/thread-4616-1-1.html

ActiveMQ5.15.3 non si avvia e viene segnalato UnsupportedClassVersionError
https://www.itsvse.com/thread-4615-1-1.html

Impostazioni dei permessi degli argomenti di activemq
https://www.itsvse.com/thread-4495-1-1.html

L'utente itsvse non è autorizzato a leggere da: ActiveMQ.Advisory.TempQueue,Activ...
https://www.itsvse.com/thread-4476-1-1.html

Il client ActiveMQ in C# si abbona al codice sorgente
https://www.itsvse.com/thread-4470-1-1.html

.net/c# activemq per impostare l'account di connessione e la password
https://www.itsvse.com/thread-4282-1-1.html

Imposta il nome utente e la password per il tema ACTIVEMQ e la coda
https://www.itsvse.com/thread-4281-1-1.html

Activemq modifica la password di gestione del sito web
https://www.itsvse.com/thread-4280-1-1.html

activemq Persistent store è pieno
https://www.itsvse.com/thread-4125-1-1.html

.NET/C# Esempio di operazione ActiveMQ [Codice sorgente]
https://www.itsvse.com/thread-3907-1-1.html

Configurazione dei permessi utente di Activemq
https://www.itsvse.com/thread-3906-1-1.html

La differenza tra activemq Queue e Topic è che
https://www.itsvse.com/thread-3863-1-1.html

. Piattaforma .Net
https://www.itsvse.com/thread-3452-1-1.html

Impostazioni persistenti di abbonamento ActiveMQ
https://www.itsvse.com/thread-3451-1-1.html

Limite di elaborazione parallela per consumatori RabbitMQ BasicQos
https://www.itsvse.com/thread-4667-1-1.html

Persistenza dei messaggi della coda rabbitMQ [con codice sorgente]
https://www.itsvse.com/thread-4657-1-1.html

【Pratica】rabbitMQ console per aggiungere informazioni sull'account
https://www.itsvse.com/thread-4655-1-1.html

Un'analisi approfondita del meccanismo di risposta ai messaggi di RabbitMQ
https://www.itsvse.com/thread-4639-1-1.html

Disconnessione e riconnessione RabbitMQ in .net/c#
https://www.itsvse.com/thread-4636-1-1.html

Introduzione alle tre modalità di scambio (fanout, direct e topic) di RabbitMQ
https://www.itsvse.com/thread-4635-1-1.html

【Practice】RabbitMQ installa il plugin di gestione web
https://www.itsvse.com/thread-4631-1-1.html

【Combattimento Pratico】Tutorial di installazione di RabbitMQ su Windows
https://www.itsvse.com/thread-4630-1-1.html
Consumo di kafka

1. I consumatori dello stesso group_id, solo un consumatore può consumare messaggi (Modalità coda in coda

2. I consumatori di group_id diversi ricevono le stesse notizie

Vantaggi di Kafka

Distribuito e altamente scalabile. I cluster Kafka possono essere scalati in modo trasparente per aggiungere nuovi server al cluster.

Alte prestazioni. Le prestazioni di Kafka superano di gran lunga quelle delle implementazioni tradizionali di MQ come ActiveMQ e RabbitMQ, in particolare Kafka, che supporta anche operazioni batch. L'immagine seguente mostra i risultati del test di stress sulle prestazioni dei consumatori di LinkedIn:

Tolleranza ai difetti. I dati di ogni partizione in Kafka vengono replicati su diversi server. Quando un broker fallisce, il servizio ZooKeeper avviserà il produttore e il consumatore, che passano a un altro broker.

Svantaggi di Kafka:

Ripeti i messaggi. Kafka garantisce che ogni messaggio venga consegnato almeno una volta e, sebbene le probabilità siano basse, c'è la possibilità che un messaggio venga consegnato più volte.
Le notizie sono fuori ordine. Sebbene i messaggi all'interno di una partizione siano garantiti ordinati, se un argomento ha più partizioni, la consegna dei messaggi tra partizioni non è garantita che sia ordinata.
Complessità. Kafka richiede il supporto di cluster di allevatori di zoo, e gli argomenti di solito richiedono lavoro manuale per creare, distribuire e mantenere più costosi delle code di messaggi generali

.NET/C# coda di messaggi operazioni Kafka

Per prima cosa, usa .NET Core 3.1 per creare due nuovi progetti per console, ovvero Kafka-Consumer e Kafka-Producer

Usa nuget per fare riferimento al pacchetto Confluent.Kafka in questo modo, con il seguente comando:

Indirizzo GitHub:Il login del link ipertestuale è visibile.

Iniziamo per primi il programma Producer, e se iniziamo per primi il consumatore, riceveremo il seguente errore:
Errore avvenuto: Broker: Argomento o partizione sconosciuta

Questo articolo approfondirà le impostazioniEnableAutoOffsetStore è falso, cioè, impostando manualmente la memoria offset (simile a un messaggio di conferma manuale)

I consumatori non impostano OffsetStore dopo il consumo

Prova a usare il produttore per produrre due messaggi, attiva il consumo del consumatore, MaxPollIntervalMs = 10000 // 10 secondi senza impostazione manuale, permette agli altri client di consumare, ovviamente non verrà consumato dagli altri clienti entro 10 secondi

MaxPollIntervalMs spiega
Per i consumatori avanzati, il tempo massimo consentito per consumare i messaggi tra una chiamata e l'altra (ad esempio, rd_kafka_consumer_poll()). Se questo intervallo viene superato, il consumatore viene considerato guasto e il gruppo viene ribilanciato in modo che la partizione venga riassegnata a un altro membro del gruppo di consumatori. Attenzione: Al momento i commit offset potrebbero non essere possibili. Nota: Si raccomanda di impostare "enable.auto.offset.store=false" per le applicazioni che sono in elaborazione a lungo, e poi memorizzare esplicitamente l'offset (usando offsets_store()) dopo l'elaborazione del messaggio* per assicurarsi che l'offset non venga automaticamente validato prima che l'elaborazione sia completata. Controlla una volta al secondo a intervalli di due minuti. Per ulteriori informazioni, vedi KIP-62.

Le rappresentazioni sono le seguenti:



OffsetStore viene impostato dopo che il consumatore ha finito di spendere

codice

Dopo che la configurazione è completata, aspetta 10 secondi e continuerà a funzionareRicevuto l'ultimo messaggio(Quando il consumatore si collega al broker,Inizia il consumo dalla posizione offsetSe c.Commit(cr) è impostato; L'ultimo messaggio non verrà ricevuto ripetutamente.

Visualizza codice sorgente



commit dello offset + 1, e infine chiamare Librdkafka.topic_partition_list_destroy(cOffsets);

Il login del link ipertestuale è visibile.
Il login del link ipertestuale è visibile.

Imposta un GroupID diverso

Prova a impostare un GroupId diverso tramite il parametro della riga di comando, e poi invia un messaggio tramite il produttore, come mostrato nell'immagine seguente:



Sia clinet1 che client2Ricevi messaggi storici, e dopo che il produttore invia un messaggio, entrambi saranno quasiRicevi messaggi contemporaneamente

I nuovi consumatori ricevono solo nuovi messaggi

Come si fa a far sì che un nuovo cliente riceva solo nuovi messaggi ignorando i dati storici?

Le impostazioni sono le seguenti:

Come mostrato di seguito:



Codice produttore

Come segue:

Codice per i consumatori

Come segue:

Download del codice sorgente

Turisti, se volete vedere il contenuto nascosto di questo post, vi pregoRisposta






Precedente:.NET/C# Eccezione usando Tencent Enterprise Mailbox: L'operazione è in scaduta.
Prossimo:NuGet svuota la cache
 Padrone di casa| Pubblicato su 15/04/2021 09:31:05 |
Quando il client .NET Kafka viene disconnesso, non rilascia un'eccezione e si riconnette dopo che la rete è normale
%4|1618450028,267| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Disconnesso (dopo 59926ms in stato UP)
%3|1618450028.267| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Disconnesso (dopo 59926ms nello stato UP)
%3|1618450028.267| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connessione a ipv4#192.168.1.175:9092 fallito: Errore sconosciuto (dopo 0ms nello stato CONNECT)
%3|1618450028,268| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connetti a ipv4#192.168.1.175:9092 fallito: Errore sconosciuto (dopo 0ms nello stato CONNECT)
%3|1618450028.357| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connettersi a ipv4#192.168.1.175:9092 fallito: Errore sconosciuto (dopo 10ms nello stato CONNECT, 1 errore identico soppresso)
%3|1618450028.357| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed : Errore sconosciuto (dopo 10ms nello stato CONNECT, 1 errore identico soppresso)
%3|1618450062,882| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connessione a ipv4#192.168.1.175:9092 fallito: Errore sconosciuto (dopo 0ms nello stato CONNECT, 8 errori identici soppressi)
%3|1618450062,882| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 fallito: Errore sconosciuto (dopo 0ms nello stato CONNECT, 8 errori identici soppressi)
%3|1618450098.255| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connettiti a ipv4#192.168.1.175:9092 fallito: Errore sconosciuto (dopo 11ms nello stato CONNECT, 4 errori identici soppressi)
%3|1618450098.255| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed : Errore sconosciuto (dopo 11ms nello stato CONNECT, 4 errori identici soppressi)
%3|1618450138,243| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connessione a ipv4#192.168.1.175:9092 fallito: Errore sconosciuto (dopo 0ms nello stato CONNECT, 4 errori identici soppressi)
%3|1618450138,244| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connetti a ipv4#192.168.1.175:9092 fallito: Errore sconosciuto (dopo 0ms nello stato CONNECT, 4 errori identici soppressi)
%3|1618450168,254| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connetti a ipv4#192.168.1.175:9092 fallito: Errore sconosciuto (dopo 10ms nello stato CONNECT, 3 errori identici soppressi)
%3|1618450168,254| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 fallito: Errore sconosciuto (dopo 10ms nello stato CONNECT, 3 errori identici soppressi)

 Padrone di casa| Pubblicato su 13/04/2021 16:26:58 |
Principio del consumo dei messaggi:

Nel processo produttivo vero e proprio, ogni argomento avrà più partizioni, e il vantaggio di più partizioni è che, da un lato, la capacità di sharding dei dati sul broker riduce efficacemente la capacità dei messaggi e migliora le prestazioni di IO. D'altra parte, per migliorare la potenza di consumo del lato consumatore, lo stesso argomento viene generalmente affrontato attraverso più consumatori, cioè il meccanismo di bilanciamento del carico del lato consumatore, che è ciò che capiremo dopo: come consumano i consumatori i messaggi nel caso di più partizioni e più consumatori? Kafka esiste nel concetto di gruppi di consumatori, cioè group.id lo stesso tipo di consumatori, che appartengono a un gruppo di consumatori, e tutti i consumatori del gruppo si coordinano per consumare tutte le partizioni dell'argomento dell'abbonamento. Ovviamente, ogni partizione può essere consumata solo dai consumatori dello stesso gruppo di consumatori, quindi come fanno i consumatori dello stesso gruppo di consumatori a allocare i dati in cui la partizione dovrebbe essere consumata? Per un esempio semplice, se ci sono partizioni che perdono, cioè quando il numero di partitoni è uguale al numero di comsumori, ogni sommatore corrisponde a una partizione; se il numero di sommatori è superiore a partizioni, allora il numero extra di sommatori non funzionerà; al contrario, ci saranno sommatori che consumano più partizioni.

Strategia per l'assegnazione urbanistica:

In kafka, esistono due strategie di allocazione di partizione, una è Range (predefinito) e l'altra è RoundRobin (polling). Questo è impostato dal parametro di configurazione partition.assignment.strategy del sommer.


Vedi tutti gli argomenti


Visualizza i dettagli di un argomento




 Padrone di casa| Pubblicato su 08/05/2021 17:17:33 |
Kafka elimina i gruppi di consumatori



La cancellazione dei gruppi di consumatori richiesti ('itsvse') è stata un successo.


I seguenti errori possono essere segnalati:

Error: Deletion of some consumer groups failed:
* Il gruppo 'itsvse' non è stato cancellato a causa di: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: Il gruppo non è vuoto.
soluzione

Consuma tutti i messaggi o imposta un offset

Kafka imposta manualmente l'offset
https://www.itsvse.com/thread-9641-1-1.html
Poi, cancellala di nuovo!

 Padrone di casa| Pubblicato su 13/04/2021 15:40:48 |
Comando Power Shell



Ogni cliente consumer mantiene 2 connessioni al servizio Kafka
 Padrone di casa| Pubblicato su 07/05/2021 12:37:06 |
kafka, per visualizzare il numero di pile di argomenti sotto un gruppo specificato

Pubblicato su 16/06/2021 12:41:09 |
Per favore, chiedi perché il codice non può essere visualizzato~
 Padrone di casa| Pubblicato su 25/06/2021 10:50:06 |
Kafka ottiene il comando dimensione del topic:



 Padrone di casa| Pubblicato su 18/07/2021 10:15:01 |
Linea di comando Kafka per creare argomenti:

Pubblicato su 03/09/2021 11:52:41 |
Ci sono ancora molte insidie nella kafka, apprese
Disconoscimento:
Tutto il software, i materiali di programmazione o gli articoli pubblicati dalla Code Farmer Network sono destinati esclusivamente all'apprendimento e alla ricerca; I contenuti sopra elencati non devono essere utilizzati per scopi commerciali o illegali, altrimenti gli utenti dovranno sostenere tutte le conseguenze. Le informazioni su questo sito provengono da Internet, e le controversie sul copyright non hanno nulla a che fare con questo sito. Devi eliminare completamente i contenuti sopra elencati dal tuo computer entro 24 ore dal download. Se ti piace il programma, ti preghiamo di supportare software autentico, acquistare la registrazione e ottenere servizi autentici migliori. In caso di violazione, vi preghiamo di contattarci via email.

Mail To:help@itsvse.com