Ten artykuł jest lustrzanym artykułem tłumaczenia maszynowego, kliknij tutaj, aby przejść do oryginalnego artykułu.

Widok: 129246|Odpowiedź: 17

[Źródło] Operacje Kafki dla kolejek komunikatów .NET/C# [z kodem źródłowym]

[Skopiuj link]
Opublikowano 13.04.2021 11:45:31 | | | |
Kafka to wysokowydajny, rozproszony system komunikacji opracowany przez LinkedIn, szeroko wykorzystywany w takich sytuacjach jak zbieranie logów, przetwarzanie danych strumieniowych, dystrybucja wiadomości online i offline oraz wiele innych. Chociaż nie jest zaprojektowany jako tradycyjny MQ, Kafaka może w większości przypadków zastąpić tradycyjne systemy komunikacji, takie jak ActiveMQ.

Kafka organizuje przepływ wiadomości według tematów, a serwer, który je przechowuje, nazywany jest brokerem, a konsumenci mogą subskrybować jeden lub więcej tematów. Aby zrównoważyć obciążenie, przekazy tematu można podzielić na wiele partycji, a im więcej partycji, tym większa równoległość i przepustowość Kafki.

Klastry Kafka wymagają wsparcia Zookeeper do implementacji klastrów, a Zookeeper jest już włączony do najnowszej dystrybucji Kafka, którą można wdrożyć do uruchomienia serwera Zookeeper i serwera Kafka jednocześnie, lub korzystać z innych istniejących klastrów Zookeeper.

W przeciwieństwie do tradycyjnego MQ, konsumenci muszą samodzielnie przechowywać offset, a gdy otrzymują wiadomości od Kafki, pobierają wiadomości dopiero po aktualnym offsetie. Klient Scala/Java Kafki już implementuje tę część logiki, zapisując offset do Zookeepera. Każdy konsument może wybrać identyfikator, a użytkownicy tego samego identyfikatora otrzymają tę samą wiadomość tylko raz.Jeśli wszyscy konsumenci tematu używają tego samego identyfikatora, jest to tradycyjna kolejka. Jeśli każdy konsument używa innego ID, jest to tradycyjny pub-sub.

Recenzja:

Dodaj ActiveMQ do usług systemowych pod Windows
https://www.itsvse.com/thread-6210-1-1.html

LiczbaOczekujących wiadomości, wiadomości w kolejce, wiadomości...
https://www.itsvse.com/thread-4954-1-1.html

Podsumowanie informacji o ActiveMQ i RabbitMQ
https://www.itsvse.com/thread-4659-1-1.html

Do usługi dodano CentOS ActiveMQ
https://www.itsvse.com/thread-4617-1-1.html

Centos 6.2 64-bitowa instalacja activemq
https://www.itsvse.com/thread-4616-1-1.html

ActiveMQ5.15.3 nie uruchamia się, a zgłaszany jest UnsupportedClassVersionError
https://www.itsvse.com/thread-4615-1-1.html

Ustawienia uprawnień tematu activemq
https://www.itsvse.com/thread-4495-1-1.html

Użytkownik nie ma upoważnienia do odczytu z: ActiveMQ.Advisory.TempQueue,Activ...
https://www.itsvse.com/thread-4476-1-1.html

Klient C# ActiveMQ subskrybuje kod źródłowy
https://www.itsvse.com/thread-4470-1-1.html

.net/c# activemq do ustawienia konta połączenia i hasła
https://www.itsvse.com/thread-4282-1-1.html

Ustaw nazwę użytkownika i hasło dla motywu i kolejki ACTIVEMQ
https://www.itsvse.com/thread-4281-1-1.html

ActiveMQ modyfikuje hasło do zarządzania stroną
https://www.itsvse.com/thread-4280-1-1.html

activemq Persistent store jest pełny
https://www.itsvse.com/thread-4125-1-1.html

.NET/C# Przykład operacji ActiveMQ [Kod źródłowy]
https://www.itsvse.com/thread-3907-1-1.html

Konfiguracja uprawnień użytkownika Activemq
https://www.itsvse.com/thread-3906-1-1.html

Różnica między kolejką activemq a tematem polega na tym, że
https://www.itsvse.com/thread-3863-1-1.html

. Platforma .Net
https://www.itsvse.com/thread-3452-1-1.html

Stałe ustawienia subskrypcji ActiveMQ
https://www.itsvse.com/thread-3451-1-1.html

RabbitMQ BasicQos konsumenckie ograniczenia przetwarzania równoległego
https://www.itsvse.com/thread-4667-1-1.html

rabbitMQ Kolejka Kolejka Trwałość Wiadomości [z kodem źródłowym]
https://www.itsvse.com/thread-4657-1-1.html

【Practice】rabbitMQ console to add info account
https://www.itsvse.com/thread-4655-1-1.html

Dogłębna analiza mechanizmu odpowiedzi na wiadomości RabbitMQ
https://www.itsvse.com/thread-4639-1-1.html

.net/c# Rozłączenie połączenia RabbitMQ – rozłączenie i ponowne połączenie
https://www.itsvse.com/thread-4636-1-1.html

Wprowadzenie do trzech trybów wymiany (fanout, direct i topic) w RabbitMQ
https://www.itsvse.com/thread-4635-1-1.html

【Practice】RabbitMQ instaluje wtyczkę zarządzania siecią
https://www.itsvse.com/thread-4631-1-1.html

【Praktyczna walka】Tutorial instalacji RabbitMQ pod Windows
https://www.itsvse.com/thread-4630-1-1.html
Konsumpcja Kafki

1. Konsumenci tego samego group_id, tylko jeden konsument może konsumować wiadomości (Tryb kolejki kolejki

2. Konsumenci różnych group_id otrzymują te same wiadomości

Zalety Kafki

Rozproszone i wysoce skalowalne. Klastry Kafka mogą być transparentnie skalowane, aby dodawać nowe serwery do klastra.

Wysoka wydajność. Wydajność Kafki znacznie przewyższa tradycyjne implementacje MQ, takie jak ActiveMQ i RabbitMQ, zwłaszcza Kafka, która również obsługuje operacje wsadowe. Poniższy obraz pokazuje wyniki testu obciążenia konsumenckiego LinkedIn:

Odporność na błędy. Dane z każdej partycji w Kafka są replikowane na kilka serwerów. Gdy broker zawodzi, usługa ZooKeeper powiadomi producenta i konsumenta, którzy przechodzą do innego brokera.

Wady Kafki:

Powtarzaj wiadomości. Kafka gwarantuje, że każda wiadomość zostanie dostarczona przynajmniej raz, a choć szanse są niewielkie, istnieje szansa, że wiadomość zostanie dostarczona wielokrotnie.
Wiadomości są nie w porządku. Chociaż wiadomości wewnątrz partycji gwarantują uporządkowanie, jeśli temat ma wiele partycji, dostarczanie wiadomości między nimi nie jest gwarantowane uporządkowanie.
Złożoność. Kafka wymaga wsparcia klastrów Zookeeper, a tematy zazwyczaj wymagają ręcznej pracy, by stworzyć, wdrożyć i utrzymać je droższe niż ogólne kolejki wiadomości

Operacje Kafka w kolejce komunikatów .NET/C#

Po pierwsze, użyj .NET Core 3.1 do stworzenia dwóch nowych projektów konsolowych, mianowicie Kafka-Consumer i Kafka-Producer

Użyj nugeta, aby odwołać się do pakietu Confluent.Kafka w ten sposób, wykonując następujące polecenie:

Adres GitHub:Logowanie do linku jest widoczne.

Najpierw uruchamiamy program Producent, a jeśli najpierw uruchomimy konsumenta, pojawi się następujący błąd:
Wystąpił błąd: Broker: Nieznany temat lub partycja

Ten artykuł zajmie się miejscamiEnableAutoOffsetStore to nieprawdaczyli ręcznie ustawiając pamięć przesunięcia (podobnie jak ręczny komunikat potwierdzający)

Konsumenci nie ustawiają OffsetStore po spożyciu

Spróbuj użyć producenta do wygenerowania dwóch wiadomości, włącz konsumpcję konsumpcyjną, MaxPollIntervalMs = 10000 // 10 sekund bez ręcznego ustawiania, pozwól innym klientom na konsumpcję, oczywiście nie zostanie to zużyte przez innych klientów w ciągu 10 sekund

MaxPollIntervalMs wyjaśnia
Dla zaawansowanych konsumentów jest to maksymalny dozwolony czas na zużywanie wiadomości między połączeniami (na przykład rd_kafka_consumer_poll()). Jeśli ten przedział zostanie przekroczony, konsument uznaje się za niezgodnego i grupa jest ponownie zbalansowana, tak aby partycja została przypisana innemu członkowi grupy konsumenckiej. Ostrzeżenie: Commity offsetowe mogą być obecnie niemożliwe. Uwaga: Zaleca się ustawienie "enable.auto.offset.store=false" dla aplikacji przetwarzających przez dłuższy czas, a następnie jawnie przechowywanie offsetu (używając offsets_store()) po przetworzeniu wiadomości*, aby upewnić się, że offset nie zostanie automatycznie zatwierdzony przed zakończeniem przetwarzania. Sprawdzaj raz na sekundę w odstępach dwóch. Więcej informacji można znaleźć w KIP-62.

Przedstawienia przedstawiają się następująco:



OffsetStore jest ustalany po zakończeniu wydatków przez konsumenta

kod

Po zakończeniu konfiguracji odczekaj 10 sekund i nadal będzie działaćOtrzymałem ostatnią wiadomość(Gdy konsument łączy się z brokerem,Rozpocznij zużycie od pozycji offsetowejJeśli c.Commit(cr) jest ustawione; Ostatnia wiadomość nie będzie odbierana wielokrotnie.

Zobacz kod źródłowy



zatwierdzaj offset + 1 commit, a następnie wywołuj Librdkafka.topic_partition_list_destroy(cOffsets);

Logowanie do linku jest widoczne.
Logowanie do linku jest widoczne.

Ustaw inny GroupId

Spróbuj ustawić inny GroupId za pomocą parametru wiersza poleceń, a następnie wyślij wiadomość przez producenta, jak pokazano na poniższym obrazku:



Zarówno clinet1, jak i client2Otrzymuj wiadomości historyczne, a po wysłaniu wiadomości przez producenta obie prawie będąOtrzymuj wiadomości jednocześnie

Nowi konsumenci otrzymują tylko nowe wiadomości

Jak sprawić, by nowy klient otrzymywał tylko nowe wiadomości i ignorował dane historyczne?

Ustawienia są następujące:

Jak pokazano poniżej:



Kod producenta

Następujący sposób:

Kodeks konsumencki

Następujący sposób:

Pobranie kodu źródłowego

Turyści, jeśli chcecie zobaczyć ukrytą zawartość tego wpisu, proszęOdpowiedź






Poprzedni:.NET/C# Wyjątek przy użyciu skrzynki pocztowej Tencent Enterprise: Operacja wygasła.
Następny:NuGet oczyszcza pamięć podręczną
 Ziemianin| Opublikowano 15.04.2021 09:31:05 |
Gdy klient .NET Kafka zostanie rozłączony, nie wyrzuca wyjątku i łączy się ponownie po normalnym ustawieniu sieci
%4|1618450028.267| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Rozłączone (po 59926 ms w stanie UP)
%3|1618450028.267| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Rozłączony (po 59926ms w stanie UP)
%3|1618450028.267| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect do ipv4#192.168.1.175:9092 nieudane: Nieznany błąd (po 0ms w stanie CONNECT)
%3|1618450028.268| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 niepowiódło się: Nieznany błąd (po 0ms w stanie CONNECT)
%3|1618450028.357| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 niepowiódło: Nieznany błąd (po 10 ms w stanie CONNECT, 1 identyczny błąd zablokowany)
%3|1618450028.357| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 nieudane niepowodzenie: Nieznany błąd (po 10 ms w stanie CONNECT, 1 identyczny błąd zablokowany)
%3|1618450062.882| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 niepowiódło się: Nieznany błąd (po 0ms w stanie CONNECT, 8 identycznych błędów zablokowanych)
%3|1618450062.882| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 niepowiódło się: Nieznany błąd (po 0ms w stanie CONNECT, 8 identycznych błędów zablokowanych)
%3|1618450098.255| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 nieudane zadanie: Nieznany błąd (po 11 ms w stanie CONNECT, 4 identyczne błędy zablokowane)
%3|1618450098.255| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 nieudana awaria: Nieznany błąd (po 11 ms w stanie CONNECT, 4 identyczne błędy zablokowane)
%3|1618450138.243| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 niepowiódło się: Nieznany błąd (po 0ms w stanie CONNECT, 4 identyczne błędy zablokowane)
%3|1618450138.244| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 nieudane zadanie: Nieznany błąd (po 0ms w stanie CONNECT, 4 identyczne błędy zablokowane)
%3|1618450168.254| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 niepowodzenie: Nieznany błąd (po 10 ms w stanie CONNECT, 3 identyczne błędy zablokowane)
%3|1618450168.254| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 niepowodzenie: Nieznany błąd (po 10 ms w stanie CONNECT, 3 identyczne błędy zablokowane)

 Ziemianin| Opublikowano 13.04.2021 16:26:58 |
Zasada konsumpcji wiadomości:

W rzeczywistym procesie produkcyjnym każdy temat będzie miał wiele partycji, a zaletą wielu partycji jest to, że z jednej strony możliwość shardingowania danych na brokerze skutecznie zmniejsza pojemność wiadomości i poprawia wydajność IO. Z drugiej strony, aby zwiększyć moc konsumpcyjną strony konsumenta, ten sam temat będzie zazwyczaj konsumowany przez wielu konsumentów, czyli mechanizm równoważenia obciążenia po stronie konsumenckiej, co zrozumiemy dalej: jak konsumenci konsumują wiadomości w przypadku wielu partycji i wielu konsumentów? Kafka istnieje w koncepcji grup konsumenckich, czyli group.id tego samego rodzaju konsumentów, którzy należą do grupy konsumenckiej, a wszyscy konsumenci w grupie koordynują się, aby konsumować wszystkie partycje tematu subskrypcji. Oczywiście każda partycja może być konsumowana tylko przez konsumentów z tej samej grupy, więc jak konsumenci z tej samej grupy konsumenckiej przydzielają dane, na których partycja powinna być konsumowana? Dla prostego przykładu, jeśli są partycje tracące, czyli gdy liczba partitonów jest taka sama jak liczba konsumerów, każdy konsumujący odpowiada jednej partycji; jeśli liczba konsumujących jest większa niż partycji, to dodatkowa liczba konsumujących nie zadziała, natomiast będą konsumpcji konsumujący wiele partycji.

Strategia przydziału zagospodarowania przestrzennego:

W Kafce istnieją dwie strategie alokacji podziałów: jedna to Range (domyślna), a druga RoundRobin (polling). Jest to ustalane przez parametr konfiguracyjny użytkownika partition.assignment.strategy.


Zobacz wszystkie tematy


Zobacz szczegóły dotyczące tematu




 Ziemianin| Opublikowano 08.05.2021 17:17:33 |
Kafka usuwa grupy konsumenckie



Usunięcie żądanych grup konsumenckich ("itsvse") zakończyło się sukcesem.


Można zgłaszać następujące błędy:

Error: Deletion of some consumer groups failed:
* Grupa 'itsvse' nie mogła zostać usunięta z powodu: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: Grupa nie jest pusta.
rozwiązanie

Konsumuj wszystkie wiadomości lub ustaw offset

Kafka ręcznie ustawia offset offset
https://www.itsvse.com/thread-9641-1-1.html
Potem usuń to jeszcze raz!

 Ziemianin| Opublikowano 13.04.2021 15:40:48 |
Polecenie Power Shell



Każdy klient konsumencki utrzymuje 2 połączenia z usługą Kafka
 Ziemianin| Opublikowano 07.05.2021 12:37:06 |
Kafka, aby zobaczyć liczbę stosów tematów w określonej grupie

Opublikowano 16.06.2021 12:41:09 |
Proszę zapytać, dlaczego kod nie może być przeglądany~
 Ziemianin| Opublikowano 25.06.2021 10:50:06 |
Kafka otrzymuje povelę rozmiaru tematu:



 Ziemianin| Opublikowano 18.07.2021 10:15:01 |
Wiersz poleceń Kafka do tworzenia tematów:

Opublikowano 03.09.2021 11:52:41 |
Kafka wciąż ma wiele pułapek, nauczył się
Zrzeczenie się:
Całe oprogramowanie, materiały programistyczne lub artykuły publikowane przez Code Farmer Network służą wyłącznie celom edukacyjnym i badawczym; Powyższe treści nie mogą być wykorzystywane do celów komercyjnych ani nielegalnych, w przeciwnym razie użytkownicy ponoszą wszelkie konsekwencje. Informacje na tej stronie pochodzą z Internetu, a spory dotyczące praw autorskich nie mają z nią nic wspólnego. Musisz całkowicie usunąć powyższą zawartość z komputera w ciągu 24 godzin od pobrania. Jeśli spodoba Ci się program, wspieraj oryginalne oprogramowanie, kup rejestrację i korzystaj z lepszych, autentycznych usług. W przypadku naruszenia praw prosimy o kontakt mailowy.

Mail To:help@itsvse.com