Acest articol conduce mai întâi ce probleme trebuie de rezolvat middleware-ul pentru mesaje de obicei, ce dificultăți vor întâmpina în rezolvarea acestor probleme, dacă Apache RocketMQ poate fi rezolvat ca un middleware de mesaje distribuite open source de înaltă performanță și debit ridicat de către Alibaba și cum sunt definite aceste probleme în specificație. Acest articol va introduce apoi designul arhitecturii RocketMQ, pentru a oferi cititorilor o înțelegere rapidă a RocketMQ. 1. Ce probleme trebuie rezolvate middleware-ul pentru mesaje? Publicarea/Abonarea este cea mai de bază funcție a middleware-ului de mesaje și este, de asemenea, relativă cu comunicarea RPC tradițională. Nu voi intra în detalii aici. Prioritatea descrisă în specificația Priorității Mesajelor se referă la o coadă de mesaje, fiecare mesaj are o prioritate diferită, de obicei descrisă prin numere întregi, mesajul cu prioritate mare este livrat primul, iar dacă mesajul este complet într-o coadă de memorie, atunci poate fi sortat conform priorității înainte de livrare, astfel încât prioritatea mare să fie livrată prima. Deoarece toate mesajele din RocketMQ sunt persistente, dacă sunt sortate după prioritate, overhead-ul va fi foarte mare, astfel că RocketMQ nu suportă în mod specific prioritatea mesajelor, dar poate implementa funcții similare într-o soluție alternativă, adică să configureze o coadă cu prioritate mare și una cu prioritate normală, și să trimită priorități diferite către cozi diferite. Pentru problemele prioritare, ele pot fi rezumate în 2 categorii:
- Atâta timp cât prioritatea este atinsă, nu este o prioritate în sens strict, iar prioritatea este de obicei împărțită în niveluri înalte, medii, joase sau mai multe niveluri. Fiecare prioritate poate fi reprezentată printr-un subiect diferit, iar atunci când se trimite un mesaj, se specifică subiecte diferite pentru a reprezenta prioritatea, ceea ce poate rezolva majoritatea problemelor de prioritate, dar poate compromite acuratețea priorităților de business.
- Prioritatea strictă, prioritatea este exprimată ca un întreg, cum ar fi 0 ~ 65535, acest tip de problemă de prioritate nu este, în general, potrivită pentru a fi rezolvată cu subiecte diferite. Dacă vrei ca MQ să rezolve această problemă, va avea un impact foarte mare asupra performanței MQ. Iată un punct pentru a ne asigura că afacerea are cu adevărat nevoie de această prioritizare strictă, iar dacă prioritățile sunt comprimate în puține, cât impact va avea asupra afacerii?
Ordinea mesajelor se referă la un tip de mesaj care poate fi consumat în ordinea în care este trimis. De exemplu, o comandă generează 3 mesaje, și anume crearea comenzii, plata comenzii și finalizarea comenzii. Când consumi, este semnificativ să consumi în această ordine. Dar, în același timp, comenzile pot fi consumate în paralel. RocketMQ poate asigura strict ca mesajele să fie ordonate. Filtru de mesaje Broker Filtrarea mesajelor În Broker, filtrarea conform cerințelor consumatorului are avantajul de a reduce transmiterea mesajelor inutile către consumator. Dezavantajul este că aceasta crește povara asupra brokerului și este relativ complexă de implementat. 1. Taobao Notify suportă o varietate de metode de filtrare, inclusiv filtrarea directă după tipul mesajului și filtrarea flexibilă a expresiilor sintaxice, care poate satisface aproape cele mai solicitante nevoi de filtrare. 2. Taobao RocketMQ suportă filtrarea după Message Tag simplu, precum și Message Header și body. 3. Filtrarea flexibilă a expresiilor sintaxice este de asemenea suportată în specificația CORBA Notification. Filtrarea mesajelor pe partea de consumator Această filtrare poate fi complet personalizată de aplicație, dar dezavantajul este că multe mesaje inutile sunt trimise către consumator. Există câteva metode comune de persistență folosite de persistența mesajelor:
- Persistă într-o bază de date, cum ar fi Mysql.
- Persistă la stocarea KV, cum ar fi levelDB, Berkeley DB și alte sisteme de stocare KV.
- Persistența sub forma înregistrărilor de fișiere, cum ar fi Kafka, RocketMQ
- Creează o imagine persistentă a datelor de memorie, cum ar fi Beanstalkd, VisiNotify
- (1), (2) și (3) toate cele trei metode de persistență au capacitatea de a extinde buffer-ul cozii de memorie, iar (4) sunt doar o imagine de memorie, care poate restaura datele din memoria anterioară după ce brokerul închide și repornește.
Specificațiile JMS și CORBA Notification nu specifică explicit cum să persiste, dar performanța părții de persistență determină direct performanța întregului middleware al mesajelor. RocketMQ folosește pe deplin cache-ul memoriei sistemului de fișiere Linux pentru a îmbunătăți performanța. Există mai multe situații în care fiabilitatea mesajelor afectează fiabilitatea mesajelor:
- Brokerul închide în mod normal
- Prăbușirea brokerilor
- Blocarea sistemului de operare
- Mașina pierde alimentarea, dar sursa de alimentare poate fi refăcută imediat.
- Calculatorul nu pornește (poate fi deteriorat la dispozitive cheie precum procesorul, placa de bază, memoria etc.)
- Deteriorarea dispozitivului de disc.
(1), (2), (3) și (4) sunt toate situații în care resursele hardware pot fi recuperate imediat, iar RocketMQ se poate asigura că mesajele nu sunt pierdute sau că o cantitate mică de date se pierde (în funcție de faptul dacă metoda de flashing este sincronă sau asincronă). (5) (6) Este un singur punct de defecțiune și nu poate fi recuperat, odată ce apare, toate mesajele de pe acest punct unic se pierd. În ambele cazuri, RocketMQ asigură că 99% dintre mesaje nu sunt pierdute prin replicare asincronă, dar există totuși foarte puține mesaje care pot fi pierdute. Tehnologia de scriere dublă sincronă poate evita complet punctele individuale, ceea ce va afecta inevitabil performanța, făcând-o potrivită pentru situații cu cerințe extrem de ridicate de fiabilitate a mesajelor, cum ar fi aplicațiile legate de bani. RocketMQ suportă scriere duală sincronă începând cu versiunea 3.0. Mesajele cu latență scăzută pot ajunge la consumator imediat după ce mesajul ajunge la broker, fără acumularea de mesaje. RocketMQ folosește o metodă lungă de pulling pentru a asigura că mesajul este foarte în timp real, iar mesajul în timp real nu este mai mic decât cel al push-ului. Cel puțin o dată înseamnă că fiecare mesaj trebuie livrat o singură dată. RocketMQ Consumer trage mai întâi mesajul în zona locală, apoi returnează ack către server după ce consumul este finalizat. Exact o singură dată- Etapa de trimitere a mesajelor nu permite trimiterea mesajelor duplicate.
- În etapa Consume Message, mesajele duplicate nu pot fi consumate.
Doar când cele două condiții de mai sus sunt îndeplinite poate fi considerat mesajul "Exact o singură dată", iar pentru a atinge cele două puncte de mai sus, inevitabil se va genera un overhead uriaș în mediul sistemului distribuit. Prin urmare, pentru a urmări performanțe ridicate, RocketMQ nu garantează această funcție și necesită deduplicarea în afacere, ceea ce înseamnă că mesajele consumatorilor trebuie să fie idempotente. Deși RocketMQ nu poate garanta strict neduplicarea, în condiții normale, există rareori trimiteri și consumuri repetate, ci doar anomalii de rețea, starturi și opriri ale consumatorilor și alte situații anormale, cum ar fi duplicarea mesajelor. Motivul esențial al acestei probleme este că există incertitudine în apelurile de rețea, adică a treia stare fără succes sau eșec, astfel apare problema repetării mesajelor. Ce ar trebui să fac dacă Broker's Buffer este plin? Buffer-ul brokerului se referă de obicei la dimensiunea buffer-ului de memorie a unei cozi din broker, care este de obicei limitată ca dimensiune; ce se întâmplă dacă buffer-ul este plin? Iată cum este gestionată în specificația de notificare CORBA:
- RejectNewEvents respinge noul mesaj și returnează codul de eroare RejectNewEvents către Producător.
- Eliminați mesajele existente conform unei politici specifice
- AnyOrder - Orice eveniment poate fi aruncat la overflow. Aceasta este setarea implicită pentru această proprietate.
- FifoOrder - Primul eveniment primit va fi primul aruncat.
- LifoOrder - Ultimul eveniment primit va fi primul aruncat.
- Ordinea Priorității - Evenimentele trebuie aruncate în ordinea priorității, astfel încât evenimentele cu prioritate mai mică să fie eliminate înaintea celor cu prioritate mai mare.
- DeadlineOrder - Evenimentele trebuie eliminate mai întâi în ordinea termenului limită de expirare cea mai scurtă.
RocketMQ nu are conceptul de memorie buffer, iar cozile RocketMQ sunt discuri persistente, iar datele sunt șterse regulat. Pentru a rezolva această problemă, RocketMQ are o diferență foarte semnificativă față de alte MQ-uri, buffer-ul de memorie al RocketMQ este abstractizat într-o coadă de lungime infinită, indiferent câte date intră, poate fi instalat, această infinitate este bazată pe premisă, brokerul va șterge regulat datele expirate, de exemplu, brokerul salvează doar 3 zile de mesaje, apoi, deși lungimea acestui buffer este infinită, datele de acum 3 zile vor fi șterse de la sfârșitul cozii. Consumul retrospectiv se referă la mesajul că consumatorul a consumat cu succes, iar mesajul trebuie reconsumat din cauza cererii afacerii. De exemplu, din cauza eșecului sistemului de consum, datele de acum o oră trebuie reconsumate după recuperare, apoi brokerul ar trebui să ofere un mecanism pentru a reveni la progresul consumului în funcție de dimensiunea timpului. RocketMQ suportă consumul retrospectiv bazat pe timp, cu o dimensiune temporală precisă la milisecunde, care poate fi urmărită înapoi sau înapoi. Funcția principală a middleware-ului de stivire a mesajelor este decuplarea asincronă, iar o altă funcție importantă este blocarea vârfului de inundație de date din front-end și asigurarea stabilității sistemului back-end, care necesită ca middleware-ul de mesaje să aibă o anumită capacitate de stivuire, iar heap-ul de mesaje integrează următoarele două situații:
- Mesajele sunt acumulate în buffere de memorie, iar odată ce depășesc bufferul de memorie, mesajele pot fi eliminate conform unei anumite politici de tăiere, așa cum este descris în specificația de notificare CORBA. Este potrivit pentru servicii care pot tolera eliminarea mesajelor; în acest caz, capacitatea de acumulare a mesajelor constă în principal în dimensiunea bufferului de memorie, iar degradarea performanței nu va fi prea mare după ce mesajul este suprapus, deoarece cantitatea de date din memorie are un impact limitat asupra capacității de acces oferite lumii exterioare.
- Mesajele sunt adunate în sisteme de stocare persistentă precum DB, KV storage, format de înregistrare de fișiere. Când mesajele nu pot fi accesate în cache-ul de memorie, este inevitabil accesul la disc, ceea ce va genera o cantitate mare de IO de citire, iar debitul de IO de citire determină direct capacitatea de acces a mesajelor după ce acestea sunt acumulate.
Există patru puncte principale pentru a evalua capacitatea de acumulare a mesajelor:
- Câte mesaje pot fi adunate, câți octeți? Adică, capacitatea de heap a mesajului.
- După ce un mesaj este acumulat, este debitul mesajului afectat de stivuire?
- Va fi afectat consumul normal al consumatorilor după acumularea mesajelor?
- După ce mesajele sunt acumulate, care este debitul atunci când accesezi mesajele adunate pe disc?
Tranzacții distribuite Mai multe specificații cunoscute ale tranzacțiilor distribuite, cum ar fi XA, JTA etc. Dintre acestea, specificația XA este larg susținută de principalii furnizori de baze de date, precum Oracle, Mysql etc. Printre acestea, liderul implementării TM al XA, precum Oracle Tuxedo, este folosit pe scară largă în finanțe, telecomunicații și alte domenii. Tranzacțiile distribuite implică probleme de commit în două etape, iar în ceea ce privește stocarea datelor, stocarea KV trebuie suportată, deoarece a doua etapă a rollback-ului de commit trebuie să modifice starea mesajului, ceea ce trebuie să implice acțiunea de a găsi mesajul conform cheii. RocketMQ ocolește problema găsirii mesajului conform cheii în a doua etapă, folosind prima treaptă pentru a trimite mesajul pregătit, obținând offset-ul mesajului, iar a doua treaptă pentru a accesa mesajul prin offset și a modifica starea, offset-ul fiind adresa datelor. Metoda de implementare a tranzacțiilor din RocketMQ nu se face prin stocare KV, ci prin metoda offset, care are un defect semnificativ, și anume că schimbarea datelor prin offset va cauza prea multe pagini murdare în sistem, ceea ce necesită o atenție specială. Mesaje programate Mesajele programate înseamnă că mesajele nu pot fi consumate de consumatori imediat după ce sunt trimise brokerului și pot fi consumate doar la un anumit moment sau după așteptarea unui anumit interval de timp. Dacă vrei să susții o acuratețe arbitrară a timpului, la nivel de broker trebuie să faci sortarea mesajelor, iar dacă este implicată persistență, sortarea mesajelor va implica inevitabil un overhead mare de performanță. RocketMQ suportă mesaje de cronometrare, dar nu suportă o acuratețe arbitrară a timpului și suportă niveluri specifice, cum ar fi cronometrarea la 5, 10, 1m etc. Reîncercare a mesajului După ce consumatorul nu consumă mesajul, oferă un mecanism de reîncercare pentru ca mesajul să fie consumat din nou. Eșecurile mesajelor de consum ale consumatorilor pot fi de obicei luate în considerare în următoarele situații:
- Din cauza mesajului în sine, cum ar fi eșecul de deserializare, datele mesajului nu pot fi procesate (cum ar fi încărcarea facturii de telefon, numărul de telefon mobil al mesajului curent este deconectat, nu poate fi reîncărcat) etc. Această eroare necesită de obicei sărirea peste acest mesaj și consumarea altor mesaje, iar acest mesaj eșuat este 99% nereușit chiar dacă consumul este încercat din nou imediat, așa că este mai bine să se ofere un mecanism de reîncercare temporizată, adică reîncercare după 10 secunde.
- Pentru că serviciile aplicațiilor dependente din aval nu sunt disponibile, cum ar fi conexiunea bazei de date indisponibilă, rețeaua externă a sistemului este inaccesibilă etc. Când se întâmpină această eroare, chiar dacă mesajul eșuat curent este sărit, alte mesaje vor fi consumate și ele. În acest caz, se recomandă aplicarea sleep 30s și consumarea următorului mesaj, ceea ce poate reduce presiunea asupra brokerului de a încerca din nou mesajul.
Prezentare generală RocketMQ Să aflăm dacă RocketMQ rezolvă problemele cu care se confruntă middleware-ul de mesaje menționat mai sus.
Ce este RocketMQ?
Figura de mai sus este un model tipic de middleware de mesaje de trimitere și recepție de mesaje; RocketMQ este de asemenea proiectat astfel, pe scurt, RocketMQ are următoarele caracteristici:
- Este un middleware de tip coadă, cu performanță ridicată, fiabilitate ridicată, caracteristici de timp real ridicat și distribuite.
- Producător, Consumator și Coadă pot fi toate distribuite.
- Producătorul trimite mesaje către unele cozi pe rând, colectarea cozii se numește Topic, Consumer If broadcast consumption, o instanță consumer consumă toate cozile corespunzătoare acestui subiect, iar dacă consumă clusterul, mai multe instanțe consumer consumă colecția de cozi corespunzătoare acestui subiect în mod egal.
- Ordinea strictă a mesajelor poate fi garantată
- Oferă moduri bogate de tragere a mesajelor
- Capabilități eficiente de scalare orizontală a abonaților.
- Mecanism de abonare la mesaje în timp real
- Sute de milioane de mesaje de capacitate de acumulare
- Dependență mai redusă
Structura fizică de implementare a RocketMQ
Așa cum se arată în figura de mai sus, structura de implementare a RocketMQ are următoarele caracteristici:
- Serverul de nume este un nod practic fără stare care poate fi implementat în clustere fără nicio sincronizare a informațiilor între noduri.
- Implementarea Brokerului este relativ complexă, Brokerul este împărțit în Master și Slave, un Master poate corespunde mai multor Slaves, dar un Slave poate corespunde doar unui singur Master, corespondența dintre Master și Slave este definită prin specificarea aceluiași BrokerName, BrokerId-ul diferit, BrokerId este 0 pentru Master, iar non-0 înseamnă Slave. Masterele pot fi de asemenea utilizate în mai multe formate. Fiecare broker stabilește o conexiune lungă cu toate nodurile din clusterul de Servere de Nume și înregistrează informațiile de subiect către toate Serverele de Nume la intervale regulate.
- Producătorul stabilește o conexiune lungă cu unul dintre nodurile din clusterul de servere de nume (selectată aleatoriu), recuperează periodic informații despre rutarea subiectelor de la serverul de nume, stabilește o conexiune lungă cu masterul care furnizează serviciul de subiect și trimite bătăi de inimă către master la intervale regulate. Producer este complet fără stare și poate fi implementat în clustere.
- Consumatorul stabilește o conexiune lungă cu unul dintre nodurile din clusterul de Server de Nume (selectată aleatoriu), recuperează regulat informații de rutare a subiectelor de la Serverul de Nume și stabilește o conexiune lungă cu Maestrul și Slavul care oferă serviciul de subiect, trimițând bătăi de inimă către Maestru și Slav la intervale regulate. Consumatorii se pot abona la mesaje atât de la Master, cât și de la Slave, iar regulile de abonament sunt determinate de configurația Brokerului.
Structura logică de implementare RocketMQ
Așa cum se arată în figura de mai sus, structura logică de implementare a RocketMQ are două caracteristici: Producător și Consumator.
Folosit pentru a reprezenta o aplicație de mesagerie, un Grup de Producători conține mai multe instanțe de Producător, care pot fi mai multe mașini, mai multe procese ale unei mașini sau mai multe obiecte Producător ale unui proces. Un Grup de Producători poate trimite mai multe mesaje Tematice, iar Grupul de Producători funcționează astfel:
- Identifică un tip de producător
- Poți interoga dacă există mai multe instanțe Producer în această aplicație de mesagerie prin instrumentul O&M
- Când se trimite un mesaj de tranzacție distribuită, dacă producătorul cade neașteptat, brokerul va reveni activ la orice mașină din grupul producătorilor pentru a confirma starea tranzacției.
Folosit pentru a reprezenta o aplicație de mesagerie pentru consumatori, un grup de consumatori conține mai multe instanțe de consumatori, care pot fi mai multe mașini, mai multe procese sau mai multe obiecte de consum ale unui proces. Mai mulți consumatori dintr-un grup de consumatori consumă mesajele într-un mod distribuit uniform, iar dacă este setat pe difuzare, fiecare instanță din acest grup consumă întreaga cantitate de date.
Structura de stocare a datelor RocketMQ
Așa cum se arată în figura de mai sus, RocketMQ adoptă o metodă de stocare care separă datele de indexuri. Reducerea eficientă a pierderii resurselor de fișiere, resurselor IO și memoriei. Chiar și cu date masive precum Alibaba, scenariile cu concurență mare pot reduce eficient latența end-to-end și pot avea capacități puternice de scalare orizontală.
|