Configurații Broker
| Proprietate | Implicit | Descriere | | broker.id | | Fiecare broker poate fi identificat cu un ID unic de întreg nenegativ; Acest id poate fi folosit ca "nume" al brokerului, iar existența sa permite brokerului să migreze către un alt host/port fără a crea confuzie pe consumatori. Poți alege orice număr dorești ca ID, atâta timp cât ID-ul este unic. | | log.dirs | /tmp/kafka-logs | Calea prin care Kafka stochează date. Această cale nu este unică, poate fi multiplă, iar căile trebuie separate doar prin virgule; Ori de câte ori se creează o nouă partiție, aceasta este aleasă să o facă sub calea care conține cel mai mic număr de partiții. | | Port | 6667 | Serverul acceptă portul la care clientul se conectează | | zookeeper.connect | zero | Formatul șirului de conexiune ZooKeeper este: hostname:port, unde hostname și port sunt gazda și portul unui nod din clusterul ZooKeeper, respectiv. Pentru a te conecta la alte noduri ZooKeeper când un host cade, poți crea mai multe gazde după cum urmează:
hostname1:port1, hostname2:port2, hostname3:port3.
ZooKeeper îți permite să adaugi o cale "chroot" pentru a stoca toate datele kafka din cluster sub o cale specifică. Când mai multe clustere Kafka sau alte aplicații folosesc același cluster ZooKeeper, poți folosi această metodă pentru a seta calea de stocare a datelor. Acest lucru poate fi implementat prin formatarea șirului de conexiune astfel: Nume gazdă1:Port1,Numegazd2:Port2,Numegazd3:Port3/chroot/Path Această configurație stochează toate datele clusterului kafka sub calea /chroot/path. Rețineți că, înainte de a începe brokerul, trebuie să creați această cale, iar consumatorii trebuie să folosească același format de conexiune. | | message.max.bytes | 1000000 | Dimensiunea maximă a mesajelor pe care un server le poate primi. Este important ca setările consumatorului și producătorului privind această proprietate să fie sincronizate, altfel mesajul publicat de producător este prea mare pentru consumator. | | num.network.threads | 3 | Numărul de fire de rețea folosite de server pentru procesarea cererilor de rețea; În general, nu trebuie să schimbi această proprietate. | | num.io.threads | 8 | Numărul de fire de I/O folosite de server pentru procesarea cererilor; Numărul de fire ar trebui să fie cel puțin egal cu numărul de hard disk-uri. | | background.threads | 4 | Numărul de fire folosite pentru procesarea în fundal, cum ar fi ștergerea fișierelor; Nu trebuie să schimbi această proprietate. | | queued.max.cereri | 500 | Numărul maxim de cereri care pot fi puse la coadă pentru ca un fir de I/O să le proceseze înainte ca un fir de rețea să înceteze să mai citească cererile noi. | | host.name | zero | numele de gazdă al brokerului; Dacă numele gazdă este deja setat, brokerul va fi legat doar de această adresă; Dacă nu există nicio setare, se va lega de toate interfețele și va publica o copie pe ZK | | advertised.host.name | zero | Dacă este setat, este trimis producătorilor, consumatorilor și altor brokeri ca nume gazdă al brokerului | | publicit.port | zero | Acest port este oferit producătorilor, consumatorilor și altor brokeri și este folosit pentru a stabili o conexiune; Trebuie setat doar dacă portul propriu-zis și portul pe care serverul trebuie să-l asocieze sunt diferite. | | socket.trimite.buffer.bytes | 100 * 1024 | SO_SNDBUFF Dimensiunea cache-ului, pe care serverul o folosește pentru a face conexiuni socket | | socket.receive.buffer.bytes | 100 * 1024 | SO_RCVBUFF dimensiunea cache-ului, care este folosită de server pentru a se conecta la socket-uri | | socket.request.max.bytes | 100 * 1024 * 1024 | Dimensiunea maximă a cererii permisă de server; Acest lucru va evita depășirile serverelor, care ar trebui să fie mai mici decât dimensiunea heap-ului Java | | num.partitions | 1 | Dacă numărul de partiții nu este indicat la crearea unui subiect, acest număr va fi numărul implicit de partiții sub subiect. | | log.segment.bytes | 1014*1024*1024 | Jurnalele partiției subiecte sunt stocate în multe fișiere dintr-un anumit director, care împart logurile partiției în segmente; Acest atribut reprezintă dimensiunea maximă a fiecărui fișier; Când dimensiunile ating această valoare, se creează un fișier nou. Această setare poate fi depășită de baza fiecărui subiect. Vizualizare Autentificarea cu hyperlink este vizibilă. | | log.roll.hours | 24 * 7 | Chiar dacă fișierul nu ajunge la log.segment.bytes, un fișier nou este creat ori de câte ori momentul creării fișierului ajunge la această proprietate. Această setare poate fi de asemenea suprascrisă de setările la nivel de tematică; VedereAutentificarea cu hyperlink este vizibilă. | | log.cleanup.policy | delete | | | log.retention.minutes și log.retention.hours | 7 zile | Momentul în care fiecare fișier de jurnal a fost salvat înainte de a fi șters. Timpul implicit de economisire a datelor este același pentru toate subiectele. log.retention.minutes și log.retention.bytes sunt ambele folosite pentru a seta ștergerea fișierelor de jurnal, indiferent de care proprietate a fost depășită. Această setare de proprietate poate fi suprascrisă atunci când subiectul este practic stabilit. VedereAutentificarea cu hyperlink este vizibilă. | | log.retention.bytes | -1 | Cantitatea totală de date salvate de fiecare partiție sub fiecare subiect; Rețineți că aceasta este limita superioară pe fiecare partiție, deci acest număr înmulțit cu numărul de partiții este cantitatea totală de date stocate pe subiect. De asemenea, notă: dacă atât log.retention.hours, cât și log.retention.bytes sunt setate, depășirea oricăreia dintre limite va determina ștergerea unui fișier segment. Rețineți că această setare poate fi suprascrisă de fiecare subiect. VedereAutentificarea cu hyperlink este vizibilă. | | log.retention.check.interval.ms | 5 minute | Verificați intervalul dintre fișierele segmentate în jurnal pentru a determina dacă atributele fișierului îndeplinesc cerințele de ștergere. | | log.cleaner.enable | false | Când această proprietate este setată pe false, va fi ștearsă odată ce log-ul este stocat pentru un timp sau o dimensiune maximă. Dacă este setat pe true, se va întâmpla când atribuitul de salvare atinge limita superioarăAutentificarea cu hyperlink este vizibilă.。 | | log.cleaner.threads | 1 | Numărul de fire de execuție care efectuează compresia logarithmică | | log.cleaner.io.max.bytes.pe.second. | Niciunul | Numărul maxim de I/O-uri pe care un curățător de bușteni îl poate avea atunci când efectuează o compactare a buștenilor. Această setare limitează curățătorul pentru a evita interferența cu serviciile active de solicitare. | | log.cleaner.io.buffer.size | 500*1024*1024 | Log Cleaner indexează jurnalele în timpul procesului de curățare și reduce dimensiunea cache-ului folosit. Este recomandat să o setezi mare pentru a oferi suficientă memorie. | | log.cleaner.io.buffer.load.factor | 512*1024 | Dimensiunea bucății de I/O necesară pentru curățarea buștenilor. Nu trebuie să schimbi această setare. | | log.cleaner.io.buffer.load.factor | 0.9 | factorul de încărcare al tabelului hash folosit în curățarea logurilor; Nu trebuie să schimbi această opțiune. | | log.cleaner.backoff.ms | 15000 | Intervalul de timp la care logănicul este curățat se realizează | | log.cleaner.min.cleanable.ratio | 0.5 | Această configurație controlează cât de des încearcă compactorul de bușteni să curețe buștenii (presupunândAutentificarea cu hyperlink este vizibilă.este deschis). În mod implicit, evită curățarea a mai mult de 50% din bușteni. Acest raport este legat de spațiul maxim consumat de jurnalul de backup (50% din loguri sunt comprimate la 50%). O rată mai mare înseamnă mai puțină deșeuri și mai mult spațiu care poate fi curățat mai eficient. Această setare poate fi suprascrisă în fiecare setare tematică. VedereAutentificarea cu hyperlink este vizibilă.。 | | log.cleaner.delete.retention.ms | 1 zi | timp de depozitare; Timpul maxim pentru păstrarea buștenilor comprimați; Este, de asemenea, timpul maxim pentru ca clientul să consume mesaje, iar diferența dintre log.retention.minutes este că unul controlează datele necomprimate, iar celălalt controlează datele comprimate, care vor fi suprascrise cu timpul specificat la crearea subiectului. | | log.index.size.max.bytes | 10*1024*1024 | Dimensiunea maximă pe segment de log. Rețineți că dacă dimensiunea logaritimită atinge această valoare, trebuie generat un nou segment logaritmic chiar dacă dimensiunea nu depășește limita log.segment.bytes. | | log.index.interval.bytes | 4096 | Când faci un fetch, trebuie să scanezi cel mai apropiat offset cu un anumit spațiu, cu cât setarea este mai mare, cu atât mai bine, în general folosește valoarea implicită | | log.flush.interval.messages | Long.MaxValue | fișierul de jurnal "sincronizează" pe disc înainte de a acumula mesajele. Deoarece operațiunile IO ale discului sunt lente, dar și un mijloc necesar de "fiabilitate a datelor", verificați dacă este necesar intervalul de timp pentru a se întări pe hard disk. Dacă această valoare este prea mare, va duce la un timp prea lung de "sincronizare" (blocare IO), dacă această valoare este prea mică, va duce la un timp lung de "fsync" (blocare IO), iar dacă această valoare este prea mică, va duce la un număr mare de timpuri de "sincronizare", ceea ce înseamnă că cererea generală a clientului are o anumită întârziere, iar eșecul serverului fizic va duce la pierderea mesajelor fără fsync. | | log.flush.scheduler.interval.ms | Long.MaxValue | Verifică dacă sunt necesare intervale fsync | | log.flush.interval.ms | Long.MaxValue | Acest număr este folosit pentru a controla intervalul de timp al "fsync", dacă numărul de mesaje nu ajunge niciodată la numărul de mesaje consolidate pe disc, dar intervalul de timp de la ultima sincronizare a discului atinge pragul, sincronizarea discului va fi de asemenea declanșată. | | log.delete.delay.ms | 60000 | Timpul de retenție după ce fișierul este eliminat în index, în general, nu trebuie modificat | | auto.create.topics.enable | true | Dacă să permită crearea automată a subiectelor. Dacă este adevărat, va crea automat un subiect care nu există atunci când produce sau fetch nu există. Altfel, trebuie să folosești linia de comandă pentru a crea un subiect | | controller.socket.timeout.ms | 30000 | Timpul de timeout al socket-ului când controlerul de gestionare a partițiilor efectuează un backup. | | controller.message.queue.size | Int.MaxValue | controller-la-broker-channles | | implicit.replication.factor | 1 | Numărul implicit de copii de rezervă se referă doar la subiectele create automat | | replica.lag.time.max.ms | 10000 | Dacă un urmăritor nu trimite o cerere de preluare în acest timp, liderul îl va retrage din nou din ISR și va considera că acesta este spânzurat | | replica.lag.max.mesaje | 4000 | Dacă o replică are mai mult de acest număr de nesusținute, liderul va elimina urmăritorul și va considera că urmează să fie agățat | | replica.socket.timeout.ms | 30*1000 | Timpul de timeout al liderului pentru cererile de rețea socket la backup-ul datelor | | replica.socket.receive.buffer.bytes | 64*1024 | Bufferul de recepție socket atunci când se trimite o cerere de rețea către lider în timpul backup-ului | | replica.fetch.max.bytes | 1024*1024 | Valoarea maximă a fiecărei preluări la momentul backup-ului | | replica.fetch.min.bytes | 500 | Timpul maxim de așteptare pentru ca datele să ajungă la lider atunci când liderul face o cerere de backup | | replica.fetch.min.bytes | 1 | Cea mai mică dimensiune a răspunsului după fiecare preluare la backup | | num.replica.fetchers | 1 | Numărul de fire care susțin datele de la lider | | replica.high.watermark.checkpoint.interval.ms | 5000 | Fiecare replică verifică cât de des este întărit cel mai înalt nivel de apă | | fetch.purgatory.purge.interval.requests | 1000 | cerere de preluare intervalul de purjare | | producer.purgatory.purge.interval.requests | 1000 | producătorul solicită un interval de purjare | | zookeeper.session.timeout.ms | 6000 | Timeout pentru sesiunea de îngrijire a grădinii zoologice. | | zookeeper.connection.timeout.ms | 6000 | Timpul maxim pe care clientul îl așteaptă pentru a stabili o legătură cu îngrijitorul grădinii zoologice | | zookeeper.sync.time.ms | 2000 | Urmăritorul ZK rămâne mult timp în urma liderului ZK | | controlled.shutdown.enable | true | Dacă este posibil să controlezi închiderea brokerului. Dacă este posibil, brokerul va putea muta toți liderii către alți brokeri înainte de închidere. Acest lucru reduce indisponibilitatea în timpul procesului de oprire. | | controlled.shutdown.max.reîncercări | 3 | Numărul de comenzi care pot executa cu succes o oprire înainte de a efectua o oprire incompletă. | | controlled.shutdown.retry.backoff.ms | 5000 | Timp de retragere între opriri | | auto.leader.rebalance.enable | true | Dacă acest lucru este adevărat, controlerul va echilibra automat conducerea brokerilor peste partiții | | lider.dezechilibru.per.broker.procentaj | 10 | Raportul maxim de dezechilibru permis de fiecare broker | | lider.dezechilibru.verifică.interval.secunde | 300 | Verifică frecvența dezechilibrului liderului | | offset.metadata.max.bytes | 4096 | Permite clienților să salveze numărul maxim de offset-uri | | max.connections.per.ip | Int.MaxValue | Numărul maxim de conexiuni per broker poate fi făcut către fiecare adresă IP | | max.connections.per.ip.overrides | | Acoperirea maximă a conexiunii implicite pe fiecare IP sau nume de gazdă | | connections.max.idle.ms | 600000 | Limită de timeout pentru conexiunile goale | | log.roll.jitter. {ms,hours} | 0 | Numărul maxim de jitter-uri abstractizate din logRollTimeMillis | | num.recovery.threads.per.data.dir | 1 | Numărul de fire de discuție pe care fiecare director de date îl folosește pentru a înregistra recuperarea | | necurat.lider.alegeri.permite | true | Indică dacă este posibil să se folosească setarea non-replica în ISR ca lider | | delete.topic.enable | false | Posibilitatea de a șterge subiectele | | offsets.topic.num.partitions | 50 | Numărul de partiții pentru subiectul de commit offset. Deoarece schimbarea acestui lucru după implementare nu este momentan suportată, recomandăm folosirea unei setări mai mari pentru producție (de exemplu, 100-200). | | offsets.topic.retention.minutes | 1440 | Offset-urile care au existat de mai mult timp vor fi marcate ca fiind în așteptare de ștergere | | offsets.retention.check.interval.ms | 600000 | Managerul de offset verifică frecvența deplasărilor învechite | | offsets.topic.replication.factor | 3 | Numărul de copii de rezervă ale offset-ului subiectului. Se recomandă setarea unor numere mai mari pentru a garanta o disponibilitate mai mare | | offset.topic.segment.bytes | 104857600 | Schimbă subiectul. | | offsets.load.buffer.size | 5242880 | Această setare este legată de dimensiunea lotului și este folosită atunci când citești din segmentul de offset. | | offsets.commit.required.acks | -1 | Numărul de confirmări trebuie setat înainte ca commit-ul offset să fie acceptabil și, în general, nu trebuie modificat |
| Proprietate | Implicit | Proprietatea implicită a serverului | Descriere | | cleanup.policy | delete | log.cleanup.policy | Fie "șterge", fie "compactează"; Acest șir indică cum să folosești vechea porțiune de log; Metoda implicită ("ștergere") va elimina piesa veche atunci când se atinge limita de timp sau dimensiune a reciclării. "compact" va comprima buștenii | | delete.retention.ms | 86400000 (24 de ore) | log.cleaner.delete.retention.ms | Diferența dintre log.retention.minutes este că unul controlează datele necomprimate, iar celălalt controlează datele comprimate. Această configurație poate fi suprascrisă de parametrii de fixare atunci când subiectul este creat | | flush.messages | Niciunul | log.flush.interval.messages | Această configurație specifică un interval de timp pentru a forța jurnalele fsync. De exemplu, dacă această opțiune este setată la 1, atunci fsync este necesar după fiecare mesaj, iar dacă este setat la 5, fsync este necesar pentru fiecare 5 mesaje. În general, se recomandă să nu setați această valoare. Setarea acestui parametru necesită compromisul necesar între "fiabilitatea datelor" și "performanță". Dacă această valoare este prea mare, va cauza un timp lung de "fsync" de fiecare dată (blocarea IO), iar dacă această valoare este prea mică, va duce la un număr mare de "fsync", ceea ce înseamnă și că există o anumită întârziere în cererea generală a clientului. O defecțiune fizică a serverului va duce la pierderea mesajelor fără fsync. | | flush.ms | Niciunul | log.flush.interval.ms | Această configurație este folosită pentru a fixa intervalul de timp dintre forțarea jurnalelor fsync pe disc; De exemplu, dacă este setat la 1000, atunci fsync este necesar la fiecare 1000ms. Această opțiune nu este, în general, recomandată | | index.interval.bytes | 4096 | log.index.interval.bytes | Setarea implicită asigură că adăugăm un index la mesaj la fiecare 4096 octeți, iar mai mulți indici fac mesajul citit mai apropiat, dar dimensiunea indexului va fi mărită. Această opțiune, în general, nu este obligatorie | | max.message.bytes | 1000000 | max.message.bytes | Dimensiunea maximă a mesajului de adăugare kafka. Reține că dacă mărești această dimensiune, trebuie să mărești și dimensiunea de preluare a consumatorului tău astfel încât acesta să poată prelua mesajele la aceste dimensiuni maxime. | | raport min.curățabil.murdar. | 0.5 | raport min.curățabil.murdar. | Această configurație controlează cât de des încearcă compresorul de loguri să elimine logurile. În mod implicit, logurile cu o rată de compresie de peste 50% vor fi evitate. Acest raport evită cea mai mare risipă de spațiu | | min.insync.replicas | 1 | min.insync.replicas | Când producătorul este setat la request.required.acks la -1, min.insync.replicas specifică numărul minim de replici (fiecare scriere repica trebuie să fie reușită), iar dacă acest număr nu este atins, producătorul va produce o excepție. | | retention.bytes | Niciunul | log.retention.bytes | Dacă folosești politica de retenție "șterge", această configurație se referă la dimensiunea maximă pe care jurnalul o poate atinge înainte de a fi șters. În mod implicit, nu există o limită de mărime, ci doar o limită de timp | | retention.ms | 7 zile | log.retention.minutes | Dacă folosești politica de retenție "șterge", această configurație se referă la momentul în care jurnalul a fost salvat înainte de jurnalul de ștergere. | | segment.bytes | 1GB | log.segment.bytes | În Kafka, jurnalii sunt stocați în bucăți, iar această configurație se referă la dimensiunea buștenilor împărțiți în bucăți | | segment.index.bytes | 10MB | log.index.size.max.bytes | Această configurație este aproximativ dimensiunea fișierului index mapat între deplasări și locațiile fișierului; Această configurație, în general, nu trebuie modificată | | segment.ms | 7 zile | log.roll.hours | Chiar dacă fișierul log chunk nu atinge dimensiunea necesară pentru ștergere sau comprimare, odată ce timpul loglog atinge această limită superioară, un nou fișier log hunk va fi forțat | | segment.jitter.ms | 0 | log.roll.jitter. {ms,hours} | Jitter-ul maxim de scăzut din logRollTimeMillis. |
Configurații pentru consumatori
| Proprietate | Implicit | Descriere | | group.id | | Un șir care identifică în mod unic grupul în care se află procesul de consum și, dacă același ID de grup este setat, înseamnă că aceste procese aparțin aceluiași grup de consumatori | | zookeeper.connect | | Specifică șirul conexiunii Zookeeper, formatul este hostname:port, aici gazda și portul sunt atât gazda, cât și portul serverului Zookeeper, pentru a evita pierderea contactului după ce o mașină Zookeeper se oprește, poți specifica mai multe hostname:ports, folosind virgule ca separare: Nume gazdă1:port1,Numegazdă2:Port2,Numegazd3:Port3 Poți adăuga traseul chroot al ZooKeeper la șirul de conexiune ZooKeeper, care este folosit pentru a-și stoca propriile date, într-un fel: Nume gazdă1:Port1,Numegazd2:Port2,Numegazd3:Port3/chroot/Path | | consumer.id | zero | Nu este necesară nicio configurare și, în general, este generată automat | | socket.timeout.ms | 30*100 | Limitele de timeout pentru cererile de rețea. Limita reală de timeout este max.fetch.wait+socket.timeout.ms | | socket.receive.buffer.bytes | 64*1024 | Socket este folosit pentru a primi dimensiunea cache-ului cererilor de rețea | | fetch.message.max.bytes | 1024*1024 | Numărul maxim de octeți per mesaj de preluare pentru fiecare cerere de preluare. Acești octeți vor fi supravegheați în memoria folosită pentru fiecare partiție, astfel încât această setare va controla cantitatea de memorie folosită de consumator. Dimensiunea cererii de preluare trebuie să fie cel puțin egală cu dimensiunea maximă a mesajului permisă de server, altfel dimensiunea mesajului pe care producătorul îl poate trimite este mai mare decât dimensiunea pe care o poate consuma consumatorul. | | num.consumer.fetchers | 1 | Numărul de fire de preluare folosite pentru datele de preluare | | auto.commit.enable | true | Dacă este adevărat, offset-ul mesajului preluat de consumator va fi sincronizat automat cu îngrijitorul grădinii zoologice. Acest offset de commit va fi folosit de noul consumator când procesul se blochează | | auto.commit.interval.ms | 60*1000 | Frecvența la care consumatorul trimite un offset către îngrijitorul grădinii zoologice este în câteva secunde | | queued.max.message.chunks | 2 | Numărul maxim de mesaje folosite pentru stocare în cache pentru consum. Fiecare chunk trebuie să fie la fel ca fetch.message.max.bytes | | rebalance.max.reîncercări | 4 | Când un nou consumator este adăugat într-un grup de consumatori, grupul de consumatori încearcă să reechilibreze numărul de partiții alocate fiecărui consumator. Dacă colectarea consumatorilor se schimbă, această reechilibrare eșuează și reintrifică atunci când alocarea este executată | | fetch.min.bytes | 1 | Numărul minim de octeți pe care serverul ar trebui să-l returneze la fiecare cerere de preluare. Dacă nu se returnează suficiente date, cererea așteaptă până când se returnează suficiente date. | | fetch.wait.max.ms | 100 | Dacă nu există suficiente date pentru a satisface fetch.min.bytes, această configurație se referă la timpul maxim pe care serverul îl va bloca înainte de a răspunde la o cerere de preluare. | | rebalance.backoff.ms | 2000 | Timp de retragere înainte de a reîncerca reblance | | refresh.leader.backoff.ms | 200 | Există un timp de așteptare înainte de a încerca să se determine dacă liderul unei partiții și-a pierdut conducerea | | auto.offset.reset | cea mai mare | Dacă nu există un offset inițializat în Zookeeper, dacă offset este un răspuns la următoarea valoare: cel mai mic: Offset de resetare automată la cel mai mic offset cel mai mare: Offset de resetare automată la offset-ul celui mai mare Orice altceva: Aruncă o excepție pentru consumator | | consumer.timeout.ms | -1 | Dacă nu este disponibil niciun mesaj, chiar și după așteptarea unui anumit interval de timp, se aruncă o excepție de timeout | | exclude.internal.topics | true | Dacă să expui mesajele din subiecte interne către consumatori | | paritition.assignment.strategy | Răspândire | Selectează politica pentru atribuirea partițiilor fluxului consumator, opțional interval, roundrobin | | client.id | Valoarea ID-ului grupului | este un șir specific utilizatorului care ajută la urmărirea apelurilor din fiecare cerere. Ar trebui să confirme logic aplicația care a generat cererea | | zookeeper.session.timeout.ms | 6000 | Limitele de timeout pentru ședințele cu îngrijitorul grădinii zoologice. Dacă consumatorul nu trimite un mesaj de bătăi de inimă îngrijitorului în această perioadă, acesta este considerat blocat și va fi generat un raport | | zookeeper.connection.timeout.ms | 6000 | Timpul maxim de așteptare pentru ca un client să stabilească o conexiune cu îngrijitorul grădinii zoologice | | zookeeper.sync.time.ms | 2000 | Urmăritorii ZK pot rămâne în urmă față de liderul ZK pentru un timp maxim | | offsets.storage | îngrijitor de grădină zoologică | Locuri folosite pentru depozitarea compensațiilor: zookeeper sau kafka | | offset.channel.backoff.ms | 1000 | Timpul de backoff pentru reconectarea la canalul de offsets sau pentru reîncercarea cererii de fetch/commit a offset-ului eșuat | | offsets.channel.socket.timeout.ms | 10000 | Limita de timeout a socket-ului pentru răspunsul la cererea de fetch/commit atunci când citește offset-ul. Această limită de timeout este folosită de cererea consumerMetadata pentru a solicita gestionarea offset-urilor | | offsets.commit.max.reîncercări | 5 | Numărul de ori când comm-ul offset a fost reîncercat. Această reîncercare se aplică doar pentru a compensa commit-urile între opriri. El | | dual.commit.enabled | true | Dacă folosești "kafka" ca offsets.storage, poți face commit offset la zookeeper de două ori (și o dată la kafka). Acest lucru este esențial atunci când se migrează de la stocarea offset bazată pe îngrijitori zoologici la cea bazată pe kafka. Pentru orice grup de consumatori, este o recomandare sigură să dezactivezi această opțiune după finalizarea migrării | | partition.assignment.strategy | Răspândire | Alege între politicile "range" și "roundrobin" ca politici pentru atribuirea partițiilor fluxurilor de date ale consumatorilor; Alocatorul circular de partiții alocă toate partițiile disponibile, precum și toate firele de utilizare disponibile. Va atribui bucla de partiție firului de consum. Dacă toate instanțele de consum sunt abonate la un determinat, partițiile sunt împărțite în distribuții deterministe. Strategia de alocare round-robin este posibilă doar dacă sunt îndeplinite următoarele condiții: (1) Fiecare subiect are același număr de fluxuri de date pe forță de consumator. (2) Colecția subiectelor abonate este determinată pentru fiecare instanță de consumator din grupul de consumatori. |
Configurații ale producătorilor
| Proprietate | Implicit | Descriere | | metadata.broker.list | | Servește prin auto-sarcină. Producer este folosit doar pentru a obține metadate (subiecte, partiții, replici). Conexiunea socket pentru a trimite datele reale va fi stabilită pe baza metadatelor returnate. Formatul este: host1:port1,gazdă2:port2 Această listă poate fi o sublistă a brokerilor sau un VIP care indică brokeri | | request.required.acks | 0 | Această configurație este o valoare de confirmare care indică când o cerere de producție este considerată completă. În special, câți alți brokeri trebuie să fi trimis date în jurnalele lor și să fi confirmat aceste informații liderului lor. Valorile tipice includ: 0: Indică faptul că producătorul nu așteaptă niciodată confirmarea de la broker (același comportament ca 0.7). Această opțiune oferă cea mai mică latență, dar în același timp cel mai mare risc (deoarece datele se pierd când serverul cade). 1: Indică faptul că replica liderului a primit confirmarea datelor. Această opțiune are o latență scăzută și asigură că serverul confirmă că a fost primit. -1: Producătorul primește confirmarea că toate replicile sincronizate au primit date. Latența este cea mai mare, însă această metodă nu elimină complet riscul pierderii mesajelor, deoarece numărul replicilor sincronizate poate fi 1. Dacă doriți să vă asigurați că unele replici primesc date, atunci ar trebui să setați opțiunea min.insync.replicas în setările la nivel de subiect. Citește documentația de design pentru o discuție mai detaliată. | | request.timeout.ms | 10000 | Brokerul încearcă din răsputeri să implementeze cerința request.required.acks, altfel o eroare va fi trimisă clientului | | producer.type | sincronizare | Această opțiune fixează dacă mesajul este trimis asincron într-un fir de conversație de fundal. Valori corecte: (1) asincron: Trimitere asincron (2) sincronizare: Trimitere sincronizată Prin setarea producătorului pe asincron, putem procesa cererile în loturi (ceea ce este bun pentru un debit mai mare), dar acest lucru creează și posibilitatea ca mașina client să piardă date netrimise | | serializer.class | kafka.serializer.DefaultEncoder | Categoria de serializare a mesajului. Codificatorul implicit introduce un octet[] și returnează același octet[] | | key.serializer.class | | Clasa de serializare pentru cuvintele cheie. Dacă acest lucru nu este dat, implicit este să se potrivească mesajul | | partitioner.class | kafka.producer.DefaultPartitioner | clasa partitioner pentru a împărți mesajele între subsubiecte. Partiționerul implicit se bazează pe tabelul hash al cheii | | compression.codec | niciunul | Acest parametru poate seta codecul pentru comprimarea datelor, care poate fi selectat ca "none", "gzip", "snappy". | | subiecte.comprimate | zero | Acest parametru poate fi folosit pentru a determina dacă anumite subiecte sunt comprimate. Dacă codec-ul comprimat este un codec diferit de NoCompressCodec, aceste codecuri sunt aplicate datelor specificate subiectelor. Dacă lista de subiecte comprimate este goală, aplică codec-ul comprimat specific tuturor subiectelor. Dacă codecul comprimat este NoCompressionCodec, compresia nu este disponibilă pentru toate subiectele. | | message.send.max.reîncercări | 3 | Acest parametru va determina producătorul să reîncerce automat cererile de trimitere eșuate. Acest parametru fixează numărul de încercări. Notă: Setarea unei valori non-0 va cauza repetarea anumitor erori de rețea: va cauza o trimitere și va cauza pierderea confirmării | | retry.backoff.ms | 100 | Înainte de fiecare reîncercare, producătorul actualizează metadatele subiectului relevant pentru a vedea dacă noul lider este atribuit. Deoarece selecția liderului durează puțin timp, această opțiune specifică cât timp trebuie să aștepte producătorul înainte de a actualiza metadatele. | | topic.metadata.refresh.interval.ms | 600*1000 | Producătorul actualizează în general metadatele subiectului în unele scenarii de eșec (lipsă de partiție, lider indisponibil etc.). Va urma un ciclu regulat. Dacă îl setezi la o valoare negativă, metadatele vor fi actualizate doar dacă eșuează. Dacă este setat la 0, metadatele sunt actualizate după fiecare mesaj trimis (această opțiune nu este recomandată, sistemul consumă prea mult). Important: Actualizările apar doar după ce mesajul este trimis, astfel încât dacă producătorul nu trimite niciodată mesajul, metadatele nu sunt niciodată actualizate. | | queue.buffering.max.ms | 5000 | Intervalul maxim de timp la care utilizatorul stochează datele în cache atunci când este aplicat modul asincron. De exemplu, dacă mesajul este setat la 100, mesajele aflate la 100ms vor fi procesate în loturi. Acest lucru va îmbunătăți debitul, dar va crește latența datorită cache-ului. | | queue.buffering.max.mesaje | 10000 | Când se folosește modul asincron, numărul maxim de mesaje netrimise care pot fi stocate în cache înainte de producător trebuie blocat sau datele trebuie pierdute | | batch.num.messages | 200 | Când folosești modul asincron, poți procesa în lot numărul maxim de mesaje. Sau numărul de mesaje a ajuns online sau queue.buffer.max.ms a sosit, iar producătorul le va procesa | | send.buffer.bytes | 100*1024 | Dimensiunea memoriei de scriere a socket-ului | | client.id | “” | Acest id de client este un șir specific utilizatorului care este inclus în fiecare cerere pentru a urmări apelul, iar el ar trebui logic să poată confirma că aplicația a făcut cererea. |
Configurații ale producătorilor
| Nume | Tip | Implicit | Importanță | Descriere | | boostrap.servers | Listă | | Sus | Grup gazdă/port pentru a stabili o conexiune cu clusterul kafka. Datele vor fi încărcate uniform pe toate serverele, indiferent de serverul desemnat pentru bootstrapping. Această listă afectează doar gazdele inițializate (care sunt folosite pentru a descoperi toate serverele). Acest format de listă:
host1:port1,host2:port2,... Deoarece aceste servere sunt folosite doar pentru a iniția conexiuni pentru a descoperi toate apartenențele clusterului (care se pot schimba dinamic), această listă nu trebuie să conțină toate serverele (este posibil să doriți mai mult de un server, deși în acest caz unul poate fi căzut). Dacă nu apare niciun server în această listă, trimiterea datelor va eșua până când lista va fi disponibilă. | | Acks | Șirul | 1 | Sus | Producătorul are nevoie de un semnal de la server pentru a confirma recepția după primirea datelor, iar această configurație se referă la câte astfel de semnale de confirmare are nevoie procuderul. Această configurație reprezintă de fapt disponibilitatea backup-urilor de date. Următoarele setări sunt opțiuni comune: (1) acks=0: Setat la 0 înseamnă că producătorul nu trebuie să aștepte nicio confirmare a informațiilor primite. Replica va fi adăugată imediat în bufferul soclului și considerată trimisă. Nu există nicio garanție că serverul a primit cu succes datele în acest caz, iar reîncercarea configurației nu va funcționa (deoarece clientul nu știe dacă a eșuat), iar offset-ul feedback-ului va fi întotdeauna setat la -1. (2) acks=1: Asta înseamnă că cel puțin să aștepte ca liderul să scrie cu succes datele în jurnalul local, dar nu ca toți urmăritorii să scrie cu succes. În acest caz, dacă urmăritorul nu face backup cu succes la date și liderul închide din nou, mesajul se va pierde. (3) acks=toți: Aceasta înseamnă că liderul trebuie să aștepte ca toate backup-urile să scrie cu succes jurnalele, iar această strategie va asigura că datele nu vor fi pierdute atâta timp cât supraviețuiește un backup. Aceasta este cea mai puternică garanție. (4) Sunt posibile și alte setări, cum ar fi acks=2, care vor necesita un anumit număr de acks, dar această strategie este în general rar folosită. | | buffer.memory | lung | 33554432 | Sus | Producătorul poate fi folosit pentru a stoca în cache dimensiunea memoriei datelor. Dacă datele sunt generate mai rapid decât sunt trimise brokerului, producătorul va bloca sau arunca o excepție, indicată prin "block.on.buffer.full".
Această setare va fi legată de memoria totală pe care producătorul o poate folosi, dar nu este o limită strictă, deoarece nu toată memoria folosită de producător este folosită pentru caching. O parte din memorie suplimentară este folosită pentru compresie (dacă se introduce compresia), iar o parte este folosită pentru cereri de mentenanță. | | compression.type | Șirul | niciunul | Sus | Producător este tipul de compresie folosit pentru a comprima datele. Implicit este necomprimat. Valorile corecte ale opțiunilor sunt none, gzip, snappy. Compresia este cea mai bună utilizare pentru procesarea în loturi, cu cât sunt procesate mai multe mesaje în loturi, cu atât performanța compresiei este mai bună. | | Reîncercări | int | 0 | Sus | Setarea unei valori mai mari decât 0 va determina clientul să retrimită orice dată odată ce acestea eșuează. Rețineți că aceste încercări nu diferă de cele în care clientul primește o eroare de trimitere. Permite reîncercărilor pentru a schimba eventual ordinea datelor; dacă ambele înregistrări de mesaj sunt trimise către aceeași partiție, primul mesaj eșuează, iar al doilea apare mai devreme decât primul. | | batch.size | int | 16384 | Medie | Producătorul va încerca să grupeze înregistrările mesajelor pentru a reduce numărul de cereri. Acest lucru va îmbunătăți performanța dintre client și server. Această configurație controlează numărul implicit de octeți ai mesajelor de procesare în lot. Nu se fac încercări de a procesa octeții de mesaj mai mari decât acest număr de octeți. Cererile trimise brokerilor vor conține mai multe loturi, care vor conține o cerere pentru fiecare partiție. Valorile de batching mai mici sunt mai puțin folosite și pot reduce debitul (0 va folosi doar procesarea batch). Valorile de loturi mai mari consumă mai mult spațiu de memorie, așa că trebuie să aloci memorie pentru anumite valori de lot. | | client.id | Șirul | | Medie | Acest șir este trimis serverului atunci când se face o cerere. Scopul este de a putea urmări sursa cererilor pentru a permite unor aplicații din afara listei de permisiuni IP/Port să trimită informații. Această aplicație poate seta orice șir, deoarece nu are alt scop funcțional decât înregistrarea și urmărirea | | linger.ms | lung | 0 | Medie | Grupul de producători va agrega orice mesaje care sosesc între cerere și trimitere, înregistrând un lot separat de cereri. De obicei, acest lucru se întâmplă doar când înregistrarea este generată mai rapid decât rata de trimitere. Totuși, în anumite condiții, clientul va dori să reducă numărul de cereri sau chiar să ajungă la o încărcătură moderată. Această configurare se face prin adăugarea unei mici întârzieri – adică, în loc să trimită imediat un disc, producătorul va aștepta un anumit timp de întârziere pentru a permite trimiterea altor înregistrări de mesaje, care pot fi grupate. Acesta poate fi considerat un algoritm similar cu TCP Nagle. Această setare stabilește o limită de latență mai mare pentru batching: odată ce obținem dimensiunea batch.-ului unei partiții, aceasta o va trimite imediat indiferent de această setare, dar dacă primim un mesaj cu un număr mult mai mic de octeți decât această setare, trebuie să "așteptăm" un anumit timp pentru a primi mai multe mesaje. Această setare este implicită la 0, adică fără întârziere. Setarea linger.ms=5, de exemplu, va reduce numărul de cereri, dar în același timp va crește întârzierea cu 5ms. | | max.request.size | int | 1028576 | Medie | Numărul maxim de octeți solicitați. Aceasta este, de asemenea, o acoperire eficientă pentru dimensiunea maximă înregistrată. Notă: Serverul are propria suprascriere a dimensiunilor înregistrărilor mesajelor, care sunt diferite de această setare. Această setare limitează numărul de cereri pe care producătorii le pot trimite în masă simultan pentru a preveni un număr mare de cereri. | | prime.buffer.bytes | int | 32768 | Medie | Dimensiunea cache-ului de recepție TCP, care este folosită la citirea datelor | | send.buffer.bytes | int | 131072 | Medie | Dimensiunea cache-ului de trimitere TCP, care este folosită la trimiterea datelor | | timeout.ms | int | 30000 | Medie | Această opțiune de configurare controlează timpul maxim în care serverul așteaptă confirmarea de la urmăritori. Dacă numărul de cereri confirmate nu este îndeplinit în acest timp, se returnează o eroare. Această limită de timeout este măsurată pe partea de server și nu are latență de rețea, inclusiv cererile | | blochează.on.buffer.full | Boolean | true | joasă | Când memoria noastră cache se epuizează, trebuie să încetăm să mai primim noi înregistrări de mesaje sau să aruncăm erori. În mod implicit, acest lucru este setat pe adevărat, însă unele blocări s-ar putea să nu merite așteptate, așa că este mai bine să arunci o eroare imediat. Acesta este cazul când este setat pe false: producătorul aruncă o eroare de excepție: BufferExhaustedException dacă înregistrarea a fost trimisă și cache-ul este plin | | metadata.fetch.timeout.ms | lung | 60000 | joasă | Se referă la datele pentru prima dată ale unor elemente pe care le-am obținut. Elementele includ: subiect, gazdă, partiții. Această configurație se referă la timpul necesar pentru ca elementul să se finalizeze cu succes conform fetch-ului, altfel o excepție va fi trimisă clientului. | | metadata.max.age.ms | lung | 300000 | joasă | Timpul în microsecunde este intervalul la care forțăm actualizarea metadatelor. Chiar dacă nu vedem nicio schimbare în conducerea partiției. | | metric.reporteri | Listă | [] | joasă | O listă de clase folosite pentru măsurarea metricilor. Implementarea interfeței MetricReporter va permite adăugarea unor clase care se schimbă pe măsură ce sunt generate noi metrici. JmxReporter va include întotdeauna o modalitate de a înregistra statisticile JMX | | metrics.num.samples | int | 2 | joasă | Numărul de eșantioane folosite pentru menținerea metricilor | | metrics.sample.window.ms | lung | 30000 | joasă | Sistemul Metrics menține un număr configurabil de mostre într-o fereastră corectabilă. Această configurație configurează, de exemplu, dimensiunea ferestrei. Putem menține două probe pe o perioadă de 30 de secunde. Când o fereastră este desfăcută, ștergem și rescriem cea mai veche fereastră | | recoonect.backoff.ms | lung | 10 | joasă | Când conexiunea eșuează, timpul de așteptare când ne reconectăm. Astfel se evită reconectările repetate ale clienților | | retry.backoff.ms | lung | 100 | joasă | Timpul de așteptare înainte de a încerca să reîncerci o cerere de produse eșuate. Evită să rămâi blocat într-un cerc mort de trimite-eșec.
|
|