Ten artykuł najpierw prowadzi do tego, jakie problemy middleware wiadomości zwykle musi rozwiązać, jakie trudności napotkają przy ich rozwiązywaniu, czy Apache RocketMQ może być rozwiązany jako wysokowydajny, wysokoprzepustowy rozproszony middleware wiadomości open source przez Alibabę oraz jak te problemy są definiowane w specyfikacji. Następnie ten artykuł przedstawi projekt architektury RocketMQ, aby czytelnikom szybko zrozumieć RocketMQ. 1. Jakie problemy musi rozwiązać middleware wiadomości? Publikowanie/Subskrybowanie to najbardziej podstawowa funkcja middleware wiadomości i jest również powiązana z tradycyjną komunikacją RPC. Nie będę tu wchodził w szczegóły. Priorytet opisany w specyfikacji Priority Message odnosi się do kolejki wiadomości, każda wiadomość ma inny priorytet, zazwyczaj opisywany liczbami całkowitym; wiadomość o wysokim priorytecie jest dostarczana jako pierwsza, jeśli wiadomość znajduje się całkowicie w kolejce pamięci, można ją posortować według priorytetu przed dostawą, tak aby wysoki priorytet był dostarczany jako pierwszy. Ponieważ wszystkie wiadomości w RocketMQ są trwałe, jeśli zostaną posortowane według priorytetów, narzut jest bardzo duży, więc RocketMQ nie obsługuje konkretnie priorytetu wiadomości, ale może implementować podobne funkcje w ramach obejścia, czyli skonfigurować kolejkę z wysokim priorytetem i kolejkę z normalnym priorytetem oraz wysyłać różne priorytety do różnych kolejek. W przypadku kwestii priorytetowych można je podzielić na 2 kategorie:
- Dopóki priorytet jest osiągnięty, nie jest on priorytetem w ścisłym sensie i zwykle dzieli się go na wysoki, średni, niski lub kilka kolejnych poziomów. Każdy priorytet może być reprezentowany przez inny temat, a podczas wysyłania wiadomości można określić różne tematy, które odzwierciedlają priorytet, co może rozwiązać większość problemów priorytetowych, ale obniżyć dokładność priorytetów biznesowych.
- Ścisły priorytet, priorytet wyrażany jest jako liczba całkowita, np. 0 ~ 65535, tego typu problem priorytetu zazwyczaj nie nadaje się do rozwiązywania w różnych tematach. Jeśli chcesz, żeby MQ rozwiązało ten problem, będzie to miało ogromny wpływ na wydajność MQ. Oto kwestia, by upewnić się, że firma naprawdę potrzebuje ścisłego ustalania priorytetów, a jeśli priorytety zostaną sprowadzone do nielicznych grup, jaki wpływ będzie to miało na firmę?
Kolejność wiadomości odnosi się do rodzaju wiadomości, którą można konsumować w kolejności wysłania. Na przykład zamówienie generuje 3 komunikaty: tworzenie zamówienia, płatność zlecenia oraz zakończenie zamówienia. Podczas konsumpcji warto konsumować w tej kolejności. Jednocześnie zamówienia mogą być konsumowane równolegle. RocketMQ może ściśle zapewnić, że wiadomości są uporządkowane. Filtrowanie wiadomości Message FilterBroker W Brokerze filtrowanie według wymagań konsumenta ma tę zaletę, że ogranicza transmisję niepotrzebnych wiadomości do konsumenta. Minusem jest to, że zwiększa obciążenie dla brokera i jest stosunkowo skomplikowany do wdrożenia. 1. Taobao Notify obsługuje różnorodne metody filtrowania, w tym bezpośrednie filtrowanie według typu wiadomości oraz elastyczne filtrowanie wyrazów składniowych, które mogą sprostać niemal najbardziej wymagającym potrzebom filtrowania. 2. Taobao RocketMQ obsługuje filtrowanie według prostego tagu wiadomości, a także nagłówka i treści wiadomości. 3. Elastyczne filtrowanie wyrażeń składniowych jest również wspierane w specyfikacji CORBA Notification. Filtrowanie wiadomości po stronie konsumenta To filtrowanie może być w pełni dostosowane przez aplikację, ale minusem jest to, że do konsumenta trafia wiele bezużytecznych wiadomości. Istnieje kilka powszechnych metod trwałości stosowanych przez utrzymywanie wiadomości:
- Zachowaj w bazie danych, takiej jak Mysql.
- Zachowaj na pamięci KV, takiej jak levelDB, Berkeley DB i inne systemy przechowywania KV.
- Trwałość w formie rekordów plików, takich jak Kafka, RocketMQ
- Stwórz trwały obraz danych pamięci, np. beanstalked, VisiNotify
- (1), (2) i (3) wszystkie trzy metody persystencji mają możliwość rozszerzania bufora kolejki pamięci, a (4) są po prostu obrazem pamięci, który może przywrócić dane z poprzedniej pamięci po rozłączeniu się brokera i ponownym uruchomieniu.
Specyfikacje JMS i CORBA Notification nie określają wprost, jak trwać, ale wydajność części persistence bezpośrednio determinuje wydajność całego middleware wiadomości. RocketMQ w pełni wykorzystuje pamięć systemu plików Linuksa, aby poprawić wydajność. Istnieje kilka sytuacji, w których Niezawodność Wiadomości wpływa na wiarygodność wiadomości:
- Broker zamyka transakcje normalnie
- Krach brokerów
- Awaria systemu operacyjnego
- Maszyna traci zasilanie, ale zasilanie można natychmiast przywrócić.
- Urządzenie nie chce się włączyć (może być uszkodzone przez kluczowe urządzenia, takie jak CPU, płyta główna, pamięć itp.)
- Uszkodzenie urządzenia dysku.
(1), (2), (3) i (4) to sytuacje, w których zasoby sprzętowe można odzyskać natychmiast, a RocketMQ może zapewnić, że wiadomości nie zostaną utracone lub niewielka ilość danych (w zależności od tego, czy metoda flashowania jest synchroniczna, czy asynchroniczna). (5) (6) Jest to pojedynczy punkt awarii i nie można go odzyskać; gdy już nastąpi, wszystkie komunikaty na tym punkcie zostają utracone. W obu przypadkach RocketMQ zapewnia, że 99% wiadomości nie zostaje utraconych przez asynchroniczną replikację, ale wciąż bardzo niewiele wiadomości może zostać utraconych. Synchroniczna technologia podwójnego zapisu może całkowicie uniknąć pojedynczych punktów, co nieuchronnie wpływa na wydajność, co czyni ją odpowiednią w sytuacjach o bardzo wysokich wymaganiach dotyczących niezawodności wiadomości, takich jak aplikacje związane z Money. RocketMQ obsługuje synchroniczne podwójne zapisy począwszy od wersji 3.0. Komunikaty o niskim opóźnieniu mogą dotrzeć do konsumenta natychmiast po dotarciu do brokera, bez konieczności gromadzenia wiadomości. RocketMQ stosuje metodę długiego pobierania ankiet, aby zapewnić, że komunikat jest bardzo w czasie rzeczywistym, a komunikat w czasie rzeczywistym nie jest niższy niż w przypadku push. Przynajmniej raz oznacza, że każda wiadomość musi zostać dostarczona raz. RocketMQ Consumer najpierw pobiera wiadomość do lokalnej strefy, a następnie zwraca ją do serwera po zakończeniu konsumpcji. Dokładnie tylko raz- Etap wysyłania wiadomości nie pozwala na wysyłanie duplikatów.
- Na etapie Skonsumuj wiadomości nie wolno korzystać z duplikatów wiadomości.
Dopiero gdy powyższe dwa warunki zostaną spełnione, wiadomość może być uznana za "Dokładnie tylko raz", a aby osiągnąć powyższe dwa punkty, nieuchronnie powstaną ogromne narzuty w środowisku systemu rozproszonego. Dlatego, aby dążyć do wysokiej wydajności, RocketMQ nie gwarantuje tej funkcji i wymaga deduplikacji w biznesie, co oznacza, że wiadomości konsumenckie muszą być idempotentne. Chociaż RocketMQ nie może ściśle zagwarantować nieduplikacji, w normalnych warunkach rzadko dochodzi do powtarzających się transmisji i konsumpcji, jedynie nieprawidłowości sieci, uruchamiania i wyłączania konsumenta oraz innych nieprawidłowych sytuacji, takich jak duplikacja wiadomości. Podstawowym powodem tego problemu jest niepewność w połączeniach sieciowych, czyli trzeci stan ani sukcesu, ani porażki, co powoduje problem powtarzania wiadomości. Co powinienem zrobić, jeśli Broker's Buffer jest pełny? Bufor brokera zwykle odnosi się do rozmiaru bufora pamięci w kolejce brokera, który zwykle jest ograniczony pod względem rozmiaru, co jeśli bufor jest pełny? Oto jak jest to obsługiwane w specyfikacji powiadomień CORBA:
- RejectNewEvents odrzuca nową wiadomość i zwraca kod błędu RejectNewEvents do Producenta.
- Usuń istniejące wiadomości zgodnie z konkretną polityką
- AnyOrder – Każde zdarzenie może zostać odrzucone po przepełnieniu. To jest domyślne ustawienie dla tej właściwości.
- FifoOrder – Pierwsze zdarzenie, które otrzymamy, będzie pierwszym odrzuconym.
- LifoOrder – Ostatnie otrzymane zdarzenie będzie pierwszym odrzuconym.
- PriorityOrder – Zdarzenia powinny być odrzucane w kolejności priorytetów, tak aby zdarzenia o niższym priorytecie były odrzucane przed wydarzeniami o wyższym priorytecie.
- DeadlineOrder – Zdarzenia powinny być odrzucane w kolejności najkrótszego terminu wygaśnięcia.
RocketMQ nie posiada koncepcji bufora pamięci, a kolejki RocketMQ to dyski trwałe, a dane są regularnie czyszczone. Aby rozwiązać ten problem, RocketMQ ma bardzo istotną różnicę od innych MQ – bufor pamięci RocketMQ jest zgrupowany do nieskończonej kolejki, bez względu na ilość danych, można go zainstalować, ta nieskończoność jest zakładana, broker regularnie usuwa wygasłe dane, na przykład broker zapisuje tylko 3 dni wiadomości, a mimo że długość tego bufora jest nieskończona, to dane sprzed 3 dni są usuwane z końca kolejki. Konsumpcja retrospektywna odnosi się do przekazu, że konsument skutecznie skonsumował i który musi zostać ponownie konsumowany ze względu na zapotrzebowanie biznesowe. Na przykład, z powodu awarii systemu konsumenckiego, dane sprzed 1 godziny muszą zostać ponownie wykorzystane po odzyskaniu, a następnie broker powinien zapewnić mechanizm cofania postępu konsumpcji zgodnie z wymiarem czasowym. RocketMQ obsługuje retrospektywne zużycie oparte na czasie, z wymiarem czasu dokładnym do milisekund, który można cofać do przodu lub do tyłu. Główną funkcją oprogramowania middleware komunikatów do stosowania wiadomości jest asynchroniczne rozdzielanie, a kolejną ważną funkcją jest blokowanie szczytu zalewu danych front-endu i zapewnienie stabilności systemu zaplecza, co wymaga, aby middleware wiadomości posiadało określoną zdolność do stosowania wiadomości, a stos wiadomości integruje następujące dwie sytuacje:
- Wiadomości są gromadzone w pamięci, a gdy przekroczą ten bufor, mogą być odrzucane zgodnie z określoną polityką utraty, opisaną w specyfikacji CORBA Notification. Jest odpowiedni dla usług, które tolerują odrzucanie wiadomości; w tym przypadku zdolność akumulacji wiadomości zależy głównie od wielkości bufora pamięci, a spadek wydajności nie będzie zbyt duży po ułożeniu wiadomości, ponieważ ilość danych w pamięci ma ograniczony wpływ na możliwość dostępu do świata zewnętrznego.
- Wiadomości są gromadzone w trwałych systemach pamięci masowej, takich jak baza danych, pamięć KV, forma rekordów plików. Gdy wiadomości nie mogą zostać trafione do pamięci podręcznej, nieuniknione jest uzyskanie dostępu do dysku, co generuje dużą ilość wejścia do odczytu, a przepustowość odczytu wejścia bezpośrednio decyduje o możliwości dostępu do wiadomości po ich zgromadzeniu.
Istnieją cztery główne punkty oceny zdolności gromadzenia wiadomości:
- Ile wiadomości można nagromadzić, ile bajtów? To znaczy ogromna pojemność przekazu.
- Po nagromadzeniu wiadomości, czy przepustowość wiadomości jest dotknięta przez stosowanie?
- Czy normalne zużycie konsumentów zostanie dotknięte po nagromadzeniu się komunikatów?
- Po nagromadzeniu się wiadomości, jaka jest przepustowość przy uzyskiwaniu się wiadomości nagromadzonych na dysku?
Transakcje rozproszone Kilka znanych specyfikacji transakcji rozproszonych, takich jak XA, JTA itp. Wśród nich specyfikacja XA jest szeroko wspierana przez głównych dostawców baz danych, takich jak Oracle, MySQL i inni. Wśród nich lider implementacji TM XA, taki jak Oracle Tuxedo, jest szeroko wykorzystywany w finansach, telekomunikacji i innych dziedzinach. Rozproszone transakcje wiążą się z dwuetapowymi problemami zatwierdzania, a pod względem przechowywania danych musi być wspierane przechowywanie KV, ponieważ drugi etap cofania zatwierdzeń musi zmodyfikować stan wiadomości, co musi wymagać znalezienia wiadomości zgodnie z kluczem. RocketMQ omija problem znalezienia wiadomości według klucza w drugim stopniu, używając pierwszego stopnia do wysłania przygotowanej wiadomości, uzyskania offsetu wiadomości, a drugiego etapu do uzyskania dostępu do wiadomości przez offset i modyfikację stanu, czyli offset to adres danych. Metoda implementacji transakcji w RocketMQ nie odbywa się przez pamięć KV, lecz przez metodę offset, która ma istotną wadę, mianowicie zmiana danych przez offset powoduje zbyt wiele brudnych stron w systemie, co wymaga szczególnej uwagi. Wiadomości zaplanowane Wiadomości zaplanowane oznaczają, że konsumenci nie mogą być konsumowani bezpośrednio po wysłaniu do brokera i mogą być konsumowane tylko w określonym momencie lub po oczekiwaniu na określony czas. Jeśli chcesz wspierać arbitralną dokładność czasu, na poziomie brokera musisz przeprowadzić sortowanie wiadomości, a jeśli w grę wchodzi trwałość, to sortowanie wiadomości nieuchronnie wiąże się z ogromnymi narzutami wydajnościowymi. RocketMQ obsługuje wiadomości czasowe, ale nie obsługuje dowolnej dokładności czasu i obsługuje określone poziomy, takie jak pomiary 5s, 10s, 1m itd. Próba ponowna po tym, jak użytkownik nie uda się jej skonsumować, zapewnij mechanizm ponownego spróbowania, aby wiadomość była ponownie konsumowana. Niepowodzenia komunikatów konsumenckich można zazwyczaj rozpatrywać w następujących sytuacjach:
- Z powodu samej wiadomości, takiej jak awaria deserializacji, dane wiadomości nie mogą być przetwarzane (np. doładowanie rachunku telefonicznego, numer telefonu komórkowego aktualnej wiadomości jest wylogowany, nie może zostać doładowany) itd. Ten błąd zwykle wymaga pominięcia tej wiadomości i zużywania innych wiadomości, a ta nieudana wiadomość jest w 99% nieudana nawet jeśli próba zostanie powtórzona od razu, dlatego najlepiej jest zapewnić mechanizm powtórki z czasem, czyli próbę powtórną po 10 sekundach.
- Ponieważ zależne usługi aplikacyjne są niedostępne, takie jak połączenie z bazą danych, zewnętrzna sieć systemowa jest niedostępna itd. W przypadku tego błędu, nawet jeśli aktualna nieudana wiadomość zostanie pominięta, inne wiadomości również zostaną zużyte. W takim przypadku zaleca się zastosowanie trybu snu 30 i konsumpcję kolejnej wiadomości, co może zmniejszyć presję na brokera, by próbował ponownie wykonywać wiadomość.
Przegląd RocketMQSprawdźmy, czy RocketMQ rozwiązuje problemy z wymienionym wcześniej middleware wiadomości.
Czym jest RocketMQ?
Powyższa figurka to typowy model middleware wiadomości wysyłających i odbierających wiadomości; RocketMQ również został zaprojektowany w ten sposób, krótko mówiąc, RocketMQ posiada następujące cechy:
- Jest to middleware oparte na modelu kolejki o wysokiej wydajności, wysokiej niezawodności, wysokiej jakości pracy w czasie rzeczywistym i rozproszonych.
- Producer, Consumer i Queue mogą być dystrybuowane.
- Producent wysyła wiadomości do niektórych kolejek z kolei, kolekcja kolejek nazywa się Temat, Konsumencka Jeśli konsumpcja rozgłoszeniowa, jedna instancja konsumencka konsumuje wszystkie kolejki odpowiadające temu tematowi, a jeśli zużycie klastrów, wiele instancji konsumenckich konsumuje kolekcję kolejek odpowiadającą temu tematowi.
- Można zagwarantować ścisłą kolejność wiadomości
- Zapewnia bogate tryby pobierania wiadomości
- Efektywne poziome możliwości skalowania subskrybentów
- Mechanizm subskrypcji wiadomości w czasie rzeczywistym
- Setki milionów wiadomości do gromadzenia
- Mniejsze zależności
Fizyczna struktura rozmieszczenia RocketMQ
Jak pokazano na powyższym rysunku, struktura rozrzutu RocketMQ ma następujące cechy:
- Serwer nazw to praktycznie bezstanowy węzeł, który może być wdrażany w klastrach bez żadnej synchronizacji informacji między węzłami.
- Wdrożenie Brokera jest stosunkowo złożone, Broker jest podzielony na Master i Slave, Master może odpowiadać wielu Slave, ale Slave może odpowiadać tylko jednemu Masterowi, a korespondencja między Master a Slave definiowana jest przez wskazanie tej samej BrokerName, innego BrokerId, BrokerId to 0 dla Master, a non-0 oznacza Slave. Magisterzy mogą być również wysyłani w kilku miejscach. Każdy broker nawiązuje długie połączenie ze wszystkimi węzłami w klastrze serwerów nazw i rejestruje informacje o tematach do wszystkich serwerów nazw w regularnych odstępach czasu.
- Producent nawiązuje długie połączenie z jednym z węzłów w klastrze serwera nazw (losowo wybranym), okresowo pobiera informacje o trasowaniu tematu z serwera nazw, nawiązuje długie połączenie z masterem świadczącym usługę tematyczną i wysyła heartbeat do mastera w regularnych odstępach czasu. Producer jest całkowicie bezstanowy i może być wdrażany w klastrach.
- Konsument nawiązuje długie połączenie z jednym z węzłów w klastrze Name Server (losowo wybranym), regularnie pobiera informacje o trasowaniu tematów z Name Servera oraz nawiązuje długie połączenie z Masterem i Slave, którzy świadczą usługę tematyczną, oraz wysyła heartbeat do Master i Slave w regularnych odstępach czasu. Konsumenci mogą subskrybować wiadomości zarówno od Mastera, jak i Slave, a zasady subskrypcji są określane przez konfigurację Brokera.
Logiczna struktura wdrażania RocketMQ
Jak pokazano na powyższym rysunku, logiczna struktura wdrożenia RocketMQ ma dwie cechy: Producenta i Konsumenca.
Do reprezentowania aplikacji komunikacyjnej Grupa Producenców zawiera wiele instancji Producenów, które mogą być wieloma maszynami, wieloma procesami jednej maszyny lub wieloma obiektami Producenców w procesie. Grupa Producenców może wysyłać wiele wiadomości tematycznych, a Grupa Producenców działa następująco:
- Zidentyfikuj rodzaj Producenta
- Możesz zapytać, czy w tej aplikacji komunikacyjnej jest wiele instancji Producenów za pomocą narzędzia O&M
- Podczas wysyłania wiadomości o transakcji rozproszonej, jeśli producent niespodziewanie przestanie działać, broker aktywnie wywołuje dowolną maszynę z grupy producentów, aby potwierdzić status transakcji.
Używana do reprezentowania aplikacji konsumenckiej do komunikacji, grupa konsumencka zawiera wiele instancji konsumentów, które mogą być wieloma maszynami, wieloma procesami lub wieloma obiektami konsumenckimi danego procesu. Wielu konsumentów w grupie konsumenckiej konsumuje wiadomości w równomiernie rozłożony sposób, a jeśli ustawimy nadawanie, każda instancja w tej grupie konsumenckiej konsumuje pełną ilość danych.
Struktura przechowywania danych RocketMQ
Jak pokazano na powyższym rysunku, RocketMQ stosuje metodę przechowywania danych, która oddziela dane od indeksów. Skutecznie ograniczaj utratę zasobów plikowych, IO i pamięci. Nawet przy ogromnych danych takich jak Alibaba, scenariusze o wysokiej równocześności mogą skutecznie zmniejszyć opóźnienia end-to-end i zapewnić silne poziome możliwości skalowania.
|