Kafka to wysokowydajny, rozproszony system komunikacji opracowany przez LinkedIn, szeroko wykorzystywany w takich sytuacjach jak zbieranie logów, przetwarzanie danych strumieniowych, dystrybucja wiadomości online i offline oraz wiele innych. Chociaż nie jest zaprojektowany jako tradycyjny MQ, Kafaka może w większości przypadków zastąpić tradycyjne systemy komunikacji, takie jak ActiveMQ.
Kafka organizuje przepływ wiadomości według tematów, a serwer, który je przechowuje, nazywany jest brokerem, a konsumenci mogą subskrybować jeden lub więcej tematów. Aby zrównoważyć obciążenie, przekazy tematu można podzielić na wiele partycji, a im więcej partycji, tym większa równoległość i przepustowość Kafki.
Klastry Kafka wymagają wsparcia Zookeeper do implementacji klastrów, a Zookeeper jest już włączony do najnowszej dystrybucji Kafka, którą można wdrożyć do uruchomienia serwera Zookeeper i serwera Kafka jednocześnie, lub korzystać z innych istniejących klastrów Zookeeper.
W przeciwieństwie do tradycyjnego MQ, konsumenci muszą samodzielnie przechowywać offset, a gdy otrzymują wiadomości od Kafki, pobierają wiadomości dopiero po aktualnym offsetie. Klient Scala/Java Kafki już implementuje tę część logiki, zapisując offset do Zookeepera. Każdy konsument może wybrać identyfikator, a użytkownicy tego samego identyfikatora otrzymają tę samą wiadomość tylko raz.Jeśli wszyscy konsumenci tematu używają tego samego identyfikatora, jest to tradycyjna kolejka. Jeśli każdy konsument używa innego ID, jest to tradycyjny pub-sub.
Recenzja:
Konsumpcja Kafki
1. Konsumenci tego samego group_id, tylko jeden konsument może konsumować wiadomości (Tryb kolejki kolejki)
2. Konsumenci różnych group_id otrzymują te same wiadomości
Zalety Kafki
Rozproszone i wysoce skalowalne. Klastry Kafka mogą być transparentnie skalowane, aby dodawać nowe serwery do klastra.
Wysoka wydajność. Wydajność Kafki znacznie przewyższa tradycyjne implementacje MQ, takie jak ActiveMQ i RabbitMQ, zwłaszcza Kafka, która również obsługuje operacje wsadowe. Poniższy obraz pokazuje wyniki testu obciążenia konsumenckiego LinkedIn:
Odporność na błędy. Dane z każdej partycji w Kafka są replikowane na kilka serwerów. Gdy broker zawodzi, usługa ZooKeeper powiadomi producenta i konsumenta, którzy przechodzą do innego brokera.
Wady Kafki:
Powtarzaj wiadomości. Kafka gwarantuje, że każda wiadomość zostanie dostarczona przynajmniej raz, a choć szanse są niewielkie, istnieje szansa, że wiadomość zostanie dostarczona wielokrotnie. Wiadomości są nie w porządku. Chociaż wiadomości wewnątrz partycji gwarantują uporządkowanie, jeśli temat ma wiele partycji, dostarczanie wiadomości między nimi nie jest gwarantowane uporządkowanie. Złożoność. Kafka wymaga wsparcia klastrów Zookeeper, a tematy zazwyczaj wymagają ręcznej pracy, by stworzyć, wdrożyć i utrzymać je droższe niż ogólne kolejki wiadomości
Operacje Kafka w kolejce komunikatów .NET/C#
Po pierwsze, użyj .NET Core 3.1 do stworzenia dwóch nowych projektów konsolowych, mianowicie Kafka-Consumer i Kafka-Producer
Użyj nugeta, aby odwołać się do pakietu Confluent.Kafka w ten sposób, wykonując następujące polecenie:
Adres GitHub:Logowanie do linku jest widoczne.
Najpierw uruchamiamy program Producent, a jeśli najpierw uruchomimy konsumenta, pojawi się następujący błąd:
Wystąpił błąd: Broker: Nieznany temat lub partycja Ten artykuł zajmie się miejscamiEnableAutoOffsetStore to nieprawdaczyli ręcznie ustawiając pamięć przesunięcia (podobnie jak ręczny komunikat potwierdzający)
Konsumenci nie ustawiają OffsetStore po spożyciu
Spróbuj użyć producenta do wygenerowania dwóch wiadomości, włącz konsumpcję konsumpcyjną, MaxPollIntervalMs = 10000 // 10 sekund bez ręcznego ustawiania, pozwól innym klientom na konsumpcję, oczywiście nie zostanie to zużyte przez innych klientów w ciągu 10 sekund
MaxPollIntervalMs wyjaśnia
Dla zaawansowanych konsumentów jest to maksymalny dozwolony czas na zużywanie wiadomości między połączeniami (na przykład rd_kafka_consumer_poll()). Jeśli ten przedział zostanie przekroczony, konsument uznaje się za niezgodnego i grupa jest ponownie zbalansowana, tak aby partycja została przypisana innemu członkowi grupy konsumenckiej. Ostrzeżenie: Commity offsetowe mogą być obecnie niemożliwe. Uwaga: Zaleca się ustawienie "enable.auto.offset.store=false" dla aplikacji przetwarzających przez dłuższy czas, a następnie jawnie przechowywanie offsetu (używając offsets_store()) po przetworzeniu wiadomości*, aby upewnić się, że offset nie zostanie automatycznie zatwierdzony przed zakończeniem przetwarzania. Sprawdzaj raz na sekundę w odstępach dwóch. Więcej informacji można znaleźć w KIP-62. Przedstawienia przedstawiają się następująco:
OffsetStore jest ustalany po zakończeniu wydatków przez konsumenta
kod
Po zakończeniu konfiguracji odczekaj 10 sekund i nadal będzie działaćOtrzymałem ostatnią wiadomość(Gdy konsument łączy się z brokerem,Rozpocznij zużycie od pozycji offsetowejJeśli c.Commit(cr) jest ustawione; Ostatnia wiadomość nie będzie odbierana wielokrotnie.
Zobacz kod źródłowy
zatwierdzaj offset + 1 commit, a następnie wywołuj Librdkafka.topic_partition_list_destroy(cOffsets);
Logowanie do linku jest widoczne.
Logowanie do linku jest widoczne.
Ustaw inny GroupId
Spróbuj ustawić inny GroupId za pomocą parametru wiersza poleceń, a następnie wyślij wiadomość przez producenta, jak pokazano na poniższym obrazku:
Zarówno clinet1, jak i client2Otrzymuj wiadomości historyczne, a po wysłaniu wiadomości przez producenta obie prawie będąOtrzymuj wiadomości jednocześnie。
Nowi konsumenci otrzymują tylko nowe wiadomości
Jak sprawić, by nowy klient otrzymywał tylko nowe wiadomości i ignorował dane historyczne?
Ustawienia są następujące:
Jak pokazano poniżej:
Kod producenta
Następujący sposób:
Kodeks konsumencki
Następujący sposób:
Pobranie kodu źródłowego
Turyści, jeśli chcecie zobaczyć ukrytą zawartość tego wpisu, proszę Odpowiedź
|