Konfiguracje brokera
| Własność | Domyślny | Opis | | broker.id | | Każdego brokera można zidentyfikować za pomocą unikalnego, nieujemnego identyfikatora liczby całkowitej; Ten identyfikator może być używany jako "nazwa" brokera, a jego istnienie pozwala brokerowi na migrację na inny host/port bez wprowadzania konsumentów w błąd. Możesz wybrać dowolny numer jako identyfikator, pod warunkiem, że jest unikalny. | | log.dirs | /tmp/kafka-logs | Ścieżka, na której Kafka przechowuje dane. Ta ścieżka nie jest unikalna, może być wielka, a ścieżki muszą być oddzielone tylko przecinkami; Za każdym razem, gdy tworzona jest nowa partycja, wybiera się ją na ścieżce zawierającej najmniejszą liczbę partycji. | | port | 6667 | serwer akceptuje port, do którego klient się łączy | | zookeeper.connect | zero | Format ciągu połączeń ZooKeeper to: hostname:port, gdzie hostname i port to odpowiednio host i port węzła w klastrze ZooKeeper. Aby połączyć się z innymi węzłami ZooKeeper, gdy host przestaje działać, możesz utworzyć wiele hostów w następujący sposób:
hostname1:port1, hostname2:port2, hostname3:port3.
ZooKeeper pozwala dodać ścieżkę "chroot" do przechowywania wszystkich danych Kafka w klastrze pod określoną ścieżką. Gdy wiele klastrów Kafki lub innych aplikacji korzysta z tego samego klastra ZooKeeper, możesz użyć tej metody do ustawienia ścieżki przechowywania danych. Można to zaimplementować, formatując ciąg połączeń w następujący sposób: hostname1:port1,hostname2:port2,hostname3:port3/chroot/path To ustawienie przechowuje wszystkie dane klastra kafka pod ścieżką /chroot/path. Pamiętaj, że przed uruchomieniem brokera musisz stworzyć tę ścieżkę, a konsumenci muszą używać tego samego formatu połączenia. | | message.max.bajtów | 1000000 | Maksymalny rozmiar wiadomości, jakie serwer może odebrać. Ważne jest, aby ustawienia konsumenta i producenta dotyczące tej właściwości były zsynchronizowane, w przeciwnym razie komunikat publikowany przez producenta jest zbyt duży dla konsumenta. | | num.network.threads | 3 | Liczba wątków sieciowych używanych przez serwer do przetwarzania żądań sieciowych; Zazwyczaj nie musisz zmieniać tej nieruchomości. | | num.io.threads | 8 | Liczba wątków I/O używanych przez serwer do przetwarzania żądań; Liczba wątków powinna być co najmniej równa liczbie dysku twardych. | | tło.wątki | 4 | Liczba wątków używanych do przetwarzania w tle, takich jak usuwanie plików; Nie musisz zmieniać tej nieruchomości. | | queued.max.requests | 500 | Maksymalna liczba żądań, które mogą zostać zakolejone, aby wątek I/O przetworzył, zanim wątek sieciowy przestanie odczytywać nowe żądania. | | host.name | zero | nazwa hosta brokera; Jeśli nazwa hosta jest już ustawiona, broker będzie powiązany tylko z tym adresem; Jeśli nie ma ustawienia, system wiąże się ze wszystkimi interfejsami i publikuje jedną kopię na ZK | | advertised.host.name | zero | Jeśli jest ustawiony, jest wysyłany do producentów, konsumentów i innych brokerów jako nazwa hosta brokera | | advertised.port | zero | Port ten jest przyznawany producentom, konsumentom i innym pośrednikom i służy do nawiązania połączenia; Trzeba ustawić tylko wtedy, gdy faktyczny port i port, który serwer musi powiązać, są różne. | | socket.send.buffer.bytes | 100 * 1024 | SO_SNDBUFF rozmiar pamięci podręcznej, którą serwer wykorzystuje do łączenia gniazd | | socket.receive.buffer.bytes | 100 * 1024 | SO_RCVBUFF rozmiar pamięci podręcznej, który serwer wykorzystuje do łączenia się z gniazdami | | socket.request.max.bajtów | 100 * 1024 * 1024 | Maksymalny dopuszczalny rozmiar żądania przez serwer; Dzięki temu unikniesz przepełnień serwera, które powinny być mniejsze niż rozmiar sterty Java | | num.partitions | 1 | Jeśli liczba partycji nie jest podana podczas tworzenia tematu, będzie to domyślna liczba partycji pod tematem. | | log.segment.bytes | 1014*1024*1024 | Logi partycji tematu są przechowywane w wielu plikach w określonym katalogu, które dzielą logi partycji na segmenty; Ten atrybut to maksymalny rozmiar każdego pliku; Gdy wymiary osiągną tę wartość, tworzy się nowy plik. To ustawienie można zmienić na podstawie każdego tematu. Zobacz Logowanie do linku jest widoczne. | | log.roll.hours | 24 * 7 | Nawet jeśli plik nie dotrze do log.segment.bytes, nowy plik jest tworzony za każdym razem, gdy czas tworzenia pliku osiągnie tę właściwość. To ustawienie można również nadpisać przez ustawienia na poziomie tematu; WidokLogowanie do linku jest widoczne. | | log.cleanup.policy | usuń | | | log.retention.minutes oraz log.retention.hours | 7 dni | Czas zapisywania każdego pliku loga przed jego usunięciem. Domyślny czas oszczędzania danych jest taki sam dla wszystkich tematów. log.retention.minutes i log.retention.bytes są używane do ustawiania usuwania plików logów, niezależnie od tego, która właściwość została przepełniona. To ustawienie właściwości można nadpisać, gdy temat jest praktycznie ustawiony. WidokLogowanie do linku jest widoczne. | | log.retention.bytes | -1 | Całkowita ilość danych zapisanych przez każdą partycję pod każdym tematem; Należy zauważyć, że jest to górna granica na partycję, więc ta liczba pomnożona przez liczbę partycji to całkowita ilość danych przechowywanych na temat. Warto też zauważyć: jeśli zarówno log.retention.hours, jak i log.retention.bytes są ustawione, przekroczenie któregokolwiek z tych limitów spowoduje usunięcie pliku segmentu. Należy zauważyć, że to ustawienie można nadpisać przez każdy temat. WidokLogowanie do linku jest widoczne. | | log.retention.check.interval.ms | 5 minut | Sprawdź odstępy między plikami podzielonymi na logi, aby ustalić, czy atrybuty plików spełniają wymagania dotyczące usuwania. | | log.cleaner.enable | false | Gdy ta właściwość zostanie ustawiona na false, zostanie usunięta po zapisie logu przez maksymalny czas lub rozmiar. Jeśli ustawiono na true, stanie się to, gdy atrybut zapisu osiągnie górną granicęLogowanie do linku jest widoczne.。 | | log.cleaner.threads | 1 | Liczba wątków wykonujących kompresję logów | | log.cleaner.io.max.bajtów.na sekundę | Żaden | Maksymalna liczba wejść/wyjścia, jaką czyściciel logów może mieć podczas zagęszczania logów. To ustawienie ogranicza sprzątacza, aby nie zakłócał aktywnych usług żądań. | | log.cleaner.io.buffer.size | 500*1024*1024 | Log Cleaner indeksuje logi podczas procesu czyszczenia i zmniejsza rozmiar używanej pamięci podręcznej. Najlepiej ustawić go dużo, aby zapewnić wystarczająco dużo pamięci. | | log.cleaner.io.buffer.load.factor | 512*1024 | Rozmiar kawałka I/O wymagany do czyszczenia logów. Nie musisz zmieniać tego ustawienia. | | log.cleaner.io.buffer.load.factor | 0.9 | współczynnik obciążenia tabeli skrótów używanej w oczyszczaniu logów; Nie musisz zmieniać tej opcji. | | log.cleaner.backoff.ms | 15000 | Przeprowadza się odstęp czasu, w którym kłoda jest sprzątana | | log.cleaner.min.cleanable.ratio | 0.5 | Ta konfiguracja kontroluje, jak często kompaktowak logów próbuje oczyścić logi (zakładaneLogowanie do linku jest widoczne.jest otwarta). Domyślnie unikaj czyszczenia więcej niż 50% kłod. Ten stosunek jest powiązany z maksymalną przestrzenią zajmowaną przez log kopii zapasowej (50% logów jest kompresowanych do 50%). Wyższa stawka oznacza mniej odpadów i więcej miejsca można efektywniej usuwać. To ustawienie można nadpisać w każdym ustawieniu tematu. WidokLogowanie do linku jest widoczne.。 | | log.cleaner.delete.retention.ms | 1day | czas przechowywania; Maksymalny czas przechowywania skompresowanych kłód; Jest to także maksymalny czas, jaki klient może konsumować wiadomości, a różnica między log.retention.minutes polega na tym, że jeden kontroluje nieskompresowane dane, a drugi kontroluje skompresowane dane, które zostaną nadpisane w określonym czasie w momencie utworzenia tematu. | | log.index.size.max.bajtów | 10*1024*1024 | Maksymalny rozmiar na segment kłodu. Należy zauważyć, że jeśli rozmiar logu osiągnie tę wartość, należy wygenerować nowy segment logu, nawet jeśli jego rozmiar nie przekracza limitu log.segment.bytes. | | log.index.interval.bytes | 4096 | Podczas pobierania musisz zeskanować najbliższy offset z określoną przestrzenią, im większe ustawienie, tym lepiej, zazwyczaj używaj wartości domyślnej | | log.flush.interval.messages | Long.MaxValue | plik loga "synchronizuje" się z dysku przed gromadzeniem komunikatów. Ponieważ operacje dyskowego IO są powolne, ale jednocześnie niezbędne dla "niezawodności danych", sprawdź, czy wymagany jest odstęp czasu na wychłodzenie do dysku twardego. Jeśli ta wartość jest zbyt duża, spowoduje zbyt długi czas "synchronizacji" (blokowanie IO), jeśli jest zbyt mała, powoduje długi czas "fsync" (blokowanie IO), jeśli jest zbyt mała, powoduje to dużą liczbę czasów "synchronizacji", co oznacza, że całe żądanie klienta ma pewne opóźnienie, a awaria fizycznego serwera prowadzi do utraty wiadomości bez fsync. | | log.flush.scheduler.interval.ms | Long.MaxValue | Sprawdź, czy wymagane są interwały fsync | | log.flush.interval.ms | Long.MaxValue | Ta liczba służy do kontrolowania interwału czasowego "fsync" – jeśli liczba wiadomości nigdy nie osiągnie liczby wiadomości utrwalonych na dysku, ale odstęp czasu od ostatniej synchronizacji dysku osiągnie próg, synchronizacja dysku również zostanie wywołana. | | log.delete.delay.ms | 60000 | Czas przechowywania pliku po wyczyszczeniu pliku w indeksie zazwyczaj nie wymaga modyfikacji | | auto.create.topics.enable | true | Czy pozwolić na automatyczne tworzenie tematów. Jeśli to prawda, automatycznie utworzy temat, który nie istnieje, gdy nie istnieje "produce" lub fetch. W przeciwnym razie musisz użyć wiersza poleceń, aby utworzyć temat | | controller.socket.timeout.ms | 30000 | Czas przekroczenia czasu gniazda, gdy kontroler zarządzania partycjami wykonuje kopię zapasową. | | kontroler.message.queue.size | Int.MaxValue | controller-to-broker-channles | | default.replication.factor | 1 | Domyślna liczba kopii zapasowych odnosi się wyłącznie do automatycznie tworzonych tematów | | replica.lag.time.max.ms | 10000 | Jeśli podążający nie wyśle żądania pobierania w tym czasie, lider usuwa podążającego z ISR i uznaje go za zawieszonego | | replica.lag.max.messages | 4000 | Jeśli replika ma więcej niż tę liczbę niezabezpieczonych, lider usunie poddajnik i uzna go za powieszony | | replica.socket.timeout.ms | 30*1000 | Czas przekroczenia czasowego lidera dla żądań sieci socket podczas tworzenia kopii zapasowych danych | | replica.socket.receive.buffer.bytes | 64*1024 | Bufor odbiorczy gniazda podczas wysyłania żądania sieciowego do lidera podczas kopii zapasowej | | replica.fetch.max.bajtów | 1024*1024 | Maksymalna wartość każdego pobrania w momencie kopii zapasowej | | replica.fetch.min.bytes | 500 | Maksymalny czas oczekiwania na dotarcie danych do lidera po wykonaniu żądania kopii zapasowej przez lidera | | replica.fetch.min.bytes | 1 | Najmniejszy rozmiar odpowiedzi po każdym pobieraniu podczas kopii zapasowej | | num.replica.fetchers | 1 | Liczba wątków, które tworzą kopie zapasowe danych z lidera | | replica.high.watermark.checkpoint.interval.ms | 5000 | Każda replika sprawdza, jak często najwyższy poziom wody jest utwardzany | | fetch.purgatory.purge.interval.requests | 1000 | pobierz żądanie przedziału czyszczenia | | producer.purgatory.purge.interval.requests | 1000 | producent prosi o interwał czyszczenia | | zookeeper.session.timeout.ms | 6000 | Przerwa na sesję opiekuna zoo. | | zookeeper.connection.timeout.ms | 6000 | Maksymalny czas, jaki klient czeka na nawiązanie połączenia z Zookeeperem | | zookeeper.sync.time.ms | 2000 | Zwolennik ZK przez najdłuższy czas pozostaje w tyle za liderem ZK | | controlled.shutdown.enable | true | Czy możliwe jest kontrolowanie zamknięcia brokera. Jeśli to możliwe, broker będzie mógł przenieść wszystkich liderów do innych brokerów przed zamknięciem transakcji. To zmniejsza niedostępność podczas procesu wyłączania. | | controlled.shutdown.max.retries | 3 | Liczba poleceń, które mogą skutecznie wykonać wyłączenie przed wykonaniem niepełnego wyłączenia. | | controlled.shutdown.retry.backoff.ms | 5000 | Czas odcięcia między wyłączeniami | | auto.leader.rebalance.enable | true | Jeśli to prawda, kontroler automatycznie zrównoważy przywództwo brokerów na partycjach | | lider.nierównowaga.za.broker.procent | 10 | Maksymalny wskaźnik nierównowagi dozwolony przez każdego brokera | | lider.nierównowaga.sprawdź.interwał.sekundy | 300 | Sprawdź częstotliwość nierównowagi lidera | | offset.metadata.max.bajtów | 4096 | Pozwala klientom zapisać maksymalną liczbę swoich offsetów | | max.connections.per.ip | Int.MaxValue | Maksymalna liczba połączeń na brokera może być nawiązana na każdy adres IP | | max.connections.per.ip.overrides | | Maksymalny zasięg domyślnego połączenia dla adresu IP lub nazwy hosta | | connections.max.idle.ms | 600000 | Limit czasowy dla pustych połączeń | | log.roll.jitter. {ms,hours} | 0 | Maksymalna liczba drgań pobranych z logRollTimeMillis | | num.recovery.threads.per.data.dir | 1 | Liczba wątków, z których każdy katalog danych służy do logowania odzyskiwania | | unclean.leader.election.enable | true | Wskazuje, czy możliwe jest użycie ustawienia non-replicas w ISR jako lidera | | delete.topic.enable | false | Możliwość usuwania tematów | | offsets.topic.num.partitions | 50 | Liczba partycji dla tematu commitu offset. Ponieważ zmiana tego po wdrożeniu jest obecnie nieobsługiwana, zalecamy użycie wyższego ustawienia produkcyjnego (np. 100-200). | | offsets.topic.retention.minutes | 1440 | Przesunięcia, które istnieją dłużej niż ten limit, będą oznaczane jako oczekujące na usunięcie | | offsets.retention.check.interval.ms | 600000 | Menedżer offsetów sprawdza częstotliwość przesunięcia nieaktualnych | | offsets.topic.replication.factor | 3 | Liczba kopii zapasowych offsetu tematu. Zaleca się ustawianie wyższych numerów, aby zagwarantować większą dostępność | | offset.topic.segment.bytes | 104857600 | Temat offsetów. | | offsets.load.buffer.size | 5242880 | To ustawienie jest powiązane z rozmiarem partii i jest używane podczas odczytu z segmentu offsetów. | | offsets.commit.required.acks | -1 | Liczba potwierdzeń musi być ustalona, zanim commit offsetu będzie akceptowalny i zazwyczaj nie wymaga zmiany |
| Własność | Domyślny | Domyślna właściwość serwera | Opis | | cleanup.policy | usuń | log.cleanup.policy | Albo "usunąć" lub "zkompaktować"; Ten ciąg wskazuje, jak wykorzystać starą część logarytmu; Domyślna metoda ("usuń") odrzuca starą część, gdy osiągnie się limit czasu recyklingu lub rozmiaru. "kompaktowy" skompresuje logi | | delete.retention.ms | 86400000 (24 godziny) | log.cleaner.delete.retention.ms | Różnica między log.retention.minutes polega na tym, że jeden kontroluje dane nieskompresowane, a drugi kontroluje skompresowane dane. Konfiguracja ta może być nadpisana przez parametry przypinania podczas tworzenia tematu | | flush.messages | Żaden | log.flush.interval.messages | Ta konfiguracja określa przedział czasowy wymuszający logi fsync. Na przykład, jeśli ta opcja jest ustawiona na 1, fsync jest wymagany po każdej wiadomości, a jeśli ustawiony na 5, fsync jest wymagany dla każdego 5 wiadomości. Ogólnie zaleca się, aby nie ustawiać tej wartości. Ustawienie tego parametru wymaga koniecznego kompromisu między "niezawodnością danych" a "wydajnością". Jeśli ta wartość jest zbyt duża, powoduje to długi czas "fsync" za każdym razem (blokowanie IO), a jeśli jest zbyt mała, powoduje to dużą liczbę "fsync", co oznacza również pewne opóźnienie w ogólnym żądaniu klienta. Fizyczna awaria serwera spowoduje utratę wiadomości bez fsync. | | flush.ms | Żaden | log.flush.interval.ms | Ta konfiguracja służy do przypinania czasu między wymuszaniem logów fsync na dysku; Na przykład, jeśli ustawisz na 1000, fsync jest wymagany co 1000ms. Ta opcja zazwyczaj nie jest zalecana | | index.interval.bytes | 4096 | log.index.interval.bytes | Domyślne ustawienie zapewnia, że dodajemy indeks do wiadomości co 4096 bajtów, a więcej indeksów sprawia, że wiadomość odczytu jest bliżej, ale rozmiar indeksu zostanie zwiększony. Ta opcja zazwyczaj nie jest wymagana | | max.message.bytes | 1000000 | max.message.bytes | Maksymalny rozmiar komunikatu Kafka addend. Należy zauważyć, że jeśli zwiększysz ten rozmiar, musisz także zwiększyć rozmiar pobierania odbiorcy, aby konsument mógł pobierać wiadomości do tych maksymalnych rozmiarów. | | min.cleanable.dirty.ratio | 0.5 | min.cleanable.dirty.ratio | Ta konfiguracja kontroluje, jak często sprężarka bali próbuje oczyścić kłody. Domyślnie unika się logów o współczynniku kompresji powyżej 50%. Ten stosunek unika największego marnotrawstwa miejsca | | min.insync.replicas | 1 | min.insync.replicas | Gdy producent jest ustawiony na request.required.acks na -1, min.insync.replicas określa minimalną liczbę replik (każdy zapis repica musi zakończyć się sukcesem), a jeśli ta liczba nie zostanie osiągnięta, producent wygeneruje wyjątek. | | retention.bytes | Żaden | log.retention.bytes | Jeśli korzystasz z polityki retencji "delete", ta konfiguracja odnosi się do maksymalnego rozmiaru, jaki dziennik może osiągnąć przed usunięciem. Domyślnie nie ma limitu rozmiaru, a jedynie limit czasu | | retention.ms | 7 dni | log.retention.minutes | Jeśli korzystasz z polityki przechowywania "usuń", ta konfiguracja odnosi się do czasu, w którym log został zapisany przed zapisem do rejestru usunięcia. | | segment.bytes | 1GB | log.segment.bytes | W Kafce logi są przechowywane w kawałkach, a ta konfiguracja odnosi się do rozmiaru kłod podzielonych na kawałki | | segment.index.bytes | 10MB | log.index.size.max.bajtów | Ta konfiguracja jest mniej więcej wielkości pliku indeksowego przypisanego między offsety a lokalizacje plików; Ta konfiguracja zazwyczaj nie wymaga modyfikacji | | segment.ms | 7 dni | log.roll.hours | Nawet jeśli plik log chunk nie osiągnie rozmiaru wymaganego do usunięcia lub kompresji, gdy log time osiągnie ten górny limit, zostanie wymuszony nowy plik log chunk | | segment.jitter.ms | 0 | log.roll.jitter. {ms,hours} | Maksymalne drganie do odjęcia od logRollTimeMillis. |
Konfiguracje konsumenckie
| Własność | Domyślny | Opis | | group.id | | Ciąg znaków, który jednoznacznie identyfikuje grupę, w której znajduje się proces konsumencki, a jeśli ten sam identyfikator grupy jest ustawiony, oznacza to, że procesy te należą do tej samej grupy konsumenckiej | | zookeeper.connect | | Określ ciąg ciągu połączenia Zookeeper, format to hostname:port, tutaj host i port to zarówno host, jak i port serwera Zookeeper, aby uniknąć utraty kontaktu po awarii maszyny Zookeeper, możesz określić wiele nazw hosta:ports, używając przecinków jako rozdzielenia: nazwa hosta1:port1,nazwa hosta2:port2,nazwa hosta3:port3 Możesz dodać ścieżkę chroot ZooKeepera do ciągu połączenia ZooKeeper, który służy do przechowywania własnych danych, w taki sposób: hostname1:port1,hostname2:port2,hostname3:port3/chroot/path | | consumer.id | zero | Nie jest wymagana żadna konfiguracja i zazwyczaj generowana jest automatycznie | | socket.timeout.ms | 30*100 | Limity czasowe dla żądań sieciowych. Prawdziwy limit timeoutu to max.fetch.wait+socket.timeout.ms | | socket.receive.buffer.bytes | 64*1024 | Socket służy do odbierania rozmiaru pamięci podręcznej żądań sieciowych | | fetch.message.max.bajtów | 1024*1024 | Maksymalna liczba bajtów na wiadomość pobierania na jedno żądanie pobierania. Te bajty będą nadzorowane w pamięci używanej dla każdej partycji, więc to ustawienie będzie kontrolować ilość pamięci używanej przez użytkownika. Rozmiar żądania pobierania musi być co najmniej równy maksymalnemu rozmiarowi wiadomości dozwolonemu przez serwer, w przeciwnym razie rozmiar wiadomości, którą może wysłać producent, jest większy niż rozmiar, który konsument może konsumować. | | num.consumer.fetchers | 1 | Liczba wątków pobierania używanych do pobierania danych | | auto.commit.enable | true | Jeśli to prawda, przesunięcie wiadomości pobranej przez konsumenta zostanie automatycznie zsynchronizowane z opiekunem zookeepera. To przesunięcie commitów będzie używane przez nowego konsumenta, gdy proces się zawiesza | | auto.commit.interval.ms | 60*1000 | Częstotliwość, z jaką konsument zgłasza offset do opiekuna zoo, wynosi kilka sekund | | queued.max.message.chunks | 2 | Maksymalna liczba wiadomości używanych do buforowania do konsumpcji. Każdy chunk musi mieć takie samo jak fetch.message.max.bajty | | rebalance.max.retries | 4 | Gdy do grupy konsumentów dodaje się nowego konsumenta, grupa próbuje zrównoważyć liczbę partycji przypisanych każdemu konsumentowi. Jeśli kolekcja konsumencka się zmieni, to rebalansowanie zawodzi i ponownie się rejestruje podczas wykonywania alokacji | | fetch.min.bytes | 1 | Minimalna liczba bajtów, które serwer powinien zwracać przy każdym żądaniu pobierania. Jeśli nie zostanie zwrócona wystarczająca ilość danych, żądanie czeka na zwrot wystarczającej liczby danych. | | fetch.wait.max.ms | 100 | Jeśli nie ma wystarczającej ilości danych, aby spełnić fetch.min.bytes, ta konfiguracja odnosi się do maksymalnego czasu, jaki serwer zablokuje przed odpowiedzią na żądanie pobierania. | | rebalance.backoff.ms | 2000 | Czas na wycofanie przed ponowną próbą reblance | | refresh.leader.backoff.ms | 200 | Istnieje czas na odchylenie, zanim spróbuje się ustalić, czy przywódca podziału stracił przywództwo | | auto.offset.reset | największy | Jeśli w Zookeeper nie ma zainicjalizowanego offsetu, jeśli offset jest odpowiedzią na następującą wartość: najmniejsze: Automatyczny reset przesunięcia do najmniejszego przesunięcia największy: Automatyczny reset przesunięcie do przesunięcia największego Wszystko inne: To wyjątek dla konsumenta | | consumer.timeout.ms | -1 | Jeśli nie ma dostępnej wiadomości, nawet po określonym czasie oczekiwania, wyrzucany jest wyjątek timeoutu | | exclude.internal.topics | true | Czy udostępnić komunikaty z wewnętrznych tematów konsumentom | | paritition.assignment.strategy | Zasięg | Wybierz politykę przypisywania partycji do przepływu konsumenckiego, opcjonalnie zakres, roundrobin | | client.id | Wartość ID grupy | to specyficzny dla użytkownika ciąg danych, który pomaga śledzić wywołania w każdym żądaniu. Powinien logicznie potwierdzić aplikację, która wygenerowała żądanie | | zookeeper.session.timeout.ms | 6000 | Limity czasowe dla sesji opiekuna zoo. Jeśli konsument nie wyśle wiadomości o biciu serca do opiekuna zoo, uważa się, że została ona rozłączona i zostanie wygenerowana reblance | | zookeeper.connection.timeout.ms | 6000 | Maksymalny czas oczekiwania na nawiązanie połączenia z Zookeeperem | | zookeeper.sync.time.ms | 2000 | Zwolennicy ZK mogą maksymalnie opóźniać lidera ZK | | offsets.storage | Opiekun Zoo | Miejsca przechowywania offsetów: Zookeeper lub Kafka | | offset.channel.backoff.ms | 1000 | Czas cofnięcia przy ponownym połączeniu z kanałem offsetów lub ponownej próbie żądania pobrania/commitu nieudanego offsetu | | offsets.channel.socket.timeout.ms | 10000 | Limit czasowy limitu gniazda dla odpowiedzi na odpowiedź żądania pobrania/zatwierdzenia podczas odczytu offsetu. Ten limit czasu jest wykorzystywany przez żądanie consumerMetadata do żądania zarządzania offsetami | | offsets.commit.max.retries | 5 | Liczba powtórzeń commitu offsetowego. Ta próba jest stosowana tylko do commitów offsetowych między wyłączeniami. On | | dual.commit.enabled | true | Jeśli użyjesz "kafka" jako offsets.storage, możesz commitować offset do zookeepera dwa razy (i raz do kafki). Jest to konieczne podczas migracji z magazynu offsetowego opartego na Zookeeper na magazyn offsetowy oparty na Kafce. Dla każdej grupy konsumentów bezpiecznie zaleca się wyłączenie tej opcji po zakończeniu migracji | | partition.assignment.strategy | Zasięg | Wybierz politykę "zakresu" i "roundrobin" jako politykę przypisywania partycji do przepływów danych konsumenckich; Alokator partycji kołowej alokuje wszystkie dostępne partycje oraz wszystkie dostępne wątki konsumenckie. Przypisze pętlę partycyjną do wątku konsumenckiego. Jeśli wszystkie instancje konsumenckie są subskrybowane do determined, partycje dzielą się na rozkłady deterministyczne. Strategia alokacji round-robin jest możliwa tylko wtedy, gdy spełnione są następujące warunki: (1) Każdy temat ma taką samą liczbę przepływów danych na liczbę konsumentów. (2) Zbiór subskrybowanych tematów jest określany dla każdego przypadku konsumenckiego w grupie konsumenckiej. |
Konfiguracje producentów
| Własność | Domyślny | Opis | | metadata.broker.list | | Obsługa sama na własne podstawy. Producer służy tylko do pobierania metadanych (tematy, partycje, repliki). Połączenie socket do przesyłania rzeczywistych danych zostanie ustanowione na podstawie zwróconych metadanych. Format jest następujący: host1:port1,host2:port2 Ta lista może być podlistą brokerów lub VIP-em wskazującym na brokerów | | request.required.acks | 0 | Ta konfiguracja jest wartością potwierdzającą, która wskazuje, kiedy żądanie produce jest uznawane za zakończone. W szczególności, ilu innych brokerów musiało przesłać dane do swoich logów i potwierdzić je swojemu liderowi. Typowe wartości obejmują: 0: Oznacza, że producent nigdy nie czeka na potwierdzenie od brokera (to samo zachowanie co 0.7). Ta opcja zapewnia najmniejsze opóźnienia, ale jednocześnie największe ryzyko (ponieważ dane są tracone podczas awarii serwera). 1: Wskazuje, że replika lidera otrzymała potwierdzenie danych. Ta opcja ma niskie opóźnienia i zapewnia, że serwer potwierdza odbiór. -1: Producent otrzymuje potwierdzenie, że wszystkie zsynchronizowane repliki otrzymały dane. Opóźnienie jest największe, jednak ta metoda nie eliminuje całkowicie ryzyka utraty wiadomości, ponieważ liczba zsynchronizowanych replik może wynosić 1. Jeśli chcesz mieć pewność, że niektóre repliki otrzymują dane, powinieneś ustawić opcję min.insync.replicas w ustawieniach na poziomie tematu. Przeczytaj dokumentację projektową, aby uzyskać bardziej szczegółową dyskusję. | | request.timeout.ms | 10000 | Broker stara się jak najlepiej wdrożyć wymóg request.required.acks, w przeciwnym razie do klienta zostanie wysłany błąd | | producer.type | synchronizacja | Ta opcja przypina, czy wiadomość jest wysyłana asynchronicznie w wątku tła. Poprawne wartości: (1) asynchroniczny: Wysyłanie asynchroniczne (2) synchronizacja: Synchronizowane wysyłanie Ustawiając producenta na asynchroniczny, możemy przetwarzać żądania w partiach (co jest dobre dla wyższej przepustowości), ale jednocześnie istnieje możliwość, że komputer kliencki straci niewysłane dane | | serializer.class | kafka.serializer.DefaultEncoder | Kategoria serializacji wiadomości. Domyślny enkoder wprowadza jeden bajt[] i zwraca ten sam bajt[] | | key.serializer.class | | Klasa serializacji dla słów kluczowych. Jeśli nie jest to dane, domyślnie dopasowuje się do wiadomości | | partitioner.class | kafka.producer.DefaultPartitioner | klasa partycjonera do dzielenia wiadomości między podtematy. Domyślny partycjoner opiera się na tablicy skrótów klucza | | compression.codec | żaden | Ten parametr może ustawić kodek do kompresji danych, który można wybrać jako "none", "gzip", "snappy". | | compressed.topics | zero | Ten parametr może być używany do określenia, czy niektóre tematy są kompresowane. Jeśli skompresowany kodek jest kodekiem innym niż NoCompressCodec, te kodeki są stosowane do określonych danych tematów. Jeśli lista skompresowanych tematów jest pusta, zastosuj konkretny skompresowany kodek do wszystkich tematów. Jeśli skompresowany kodek to NoCompressionCodec, kompresja nie jest dostępna dla wszystkich tematów. | | message.send.max.retries | 3 | Ten parametr spowoduje, że producent automatycznie powtóri próbę nieudanych żądań wysłania. Ten parametr przypina liczbę powtórzeń. Uwaga: Ustawienie wartości nie0 spowoduje powtarzanie się niektórych błędów sieciowych: powoduje wysłanie i utratę potwierdzenia | | retry.backoff.ms | 100 | Przed każdą powtórną próbą producent aktualizuje metadane odpowiedniego tematu, aby sprawdzić, czy nowy lider został przypisany. Ponieważ wybór lidera zajmuje trochę czasu, ta opcja określa, jak długo producent musi czekać przed aktualizacją metadanych. | | topic.metadata.refresh.interval.ms | 600*1000 | Producent zazwyczaj aktualizuje metadane tematu w niektórych scenariuszach awarii (brak partycji, niedostępny lider itp.). Przechodzi przez regularny cykl. Jeśli ustawisz wartość ujemną, metadane zostaną zaktualizowane tylko wtedy, gdy się nie powiedzie. Jeśli ustawiono na 0, metadane są aktualizowane po wysłaniu każdej wiadomości (ta opcja nie jest zalecana, ponieważ system zużywa zbyt dużo). Ważne: Aktualizacje następują dopiero po wysłaniu wiadomości, więc jeśli producent nigdy nie wyśle wiadomości, metadane nie są aktualizowane. | | queue.buffering.max.ms | 5000 | Maksymalny odstęp czasu, w którym użytkownik zapisuje dane w trybie asynchronicznym. Na przykład, jeśli wiadomość jest ustawiona na 100, wiadomości w czasie 100 ms będą przetwarzane w partiach. To poprawi przepustowość, ale zwiększy opóźnienia dzięki buforowaniu. | | queue.buffering.max.messages | 10000 | W trybie asynchronicznym maksymalna liczba niewysłanych wiadomości, które można zbuforować do kolejki przed producentem, musi zostać zablokowana lub utracone dane | | batch.num.messages | 200 | W trybie asynchronicznym możesz przetwarzać maksymalną liczbę wiadomości wsadowo. Albo liczba wiadomości dotarła do tego online albo queue.buffer.max.ms już dotarł, a producent je przetwarza | | send.buffer.bytes | 100*1024 | Rozmiar pamięci podręcznej do zapisu gniazda | | client.id | “” | Identyfikator klienta to specyficzny dla użytkownika ciąg tekstu, który jest uwzględniany w każdym żądaniu śledzenia połączenia i logicznie powinien on móc potwierdzić, że aplikacja wykonała to żądanie. |
Konfiguracje producentów
| Nazwa | Typ | Domyślny | Znaczenie | Opis | | boostrap.servers | Lista | | wysoki | Grupa host/port do nawiązania połączenia z klastrem Kafka. Dane będą ładowane równomiernie na wszystkich serwerach, niezależnie od tego, który serwer jest przeznaczony do bootstrapowania. Ta lista dotyczy tylko zainicjowanych hostów (które służą do wykrywania wszystkich serwerów). Ten format listy:
host1:port1,host2:port2,... Ponieważ te serwery służą jedynie do inicjalizacji połączeń w celu odkrycia wszystkich członków klastra (które mogą się dynamicznie zmieniać), lista ta nie musi zawierać wszystkich serwerów (możesz chcieć więcej niż jeden serwer, choć w tym przypadku jeden serwer może być niedostępny). Jeśli żaden serwer nie pojawi się na tej liście, wysyłanie danych nie ulegnie, dopóki lista nie będzie dostępna. | | Acks | struna | 1 | wysoki | Producent potrzebuje sygnału od serwera, aby potwierdzić odbiór po otrzymaniu danych, a ta konfiguracja odnosi się do liczby takich sygnałów potwierdzających potrzebnych przez wykonawcę. Ta konfiguracja faktycznie oznacza dostępność kopii zapasowych danych. Następujące ustawienia są powszechnymi opcjami: (1) acks=0: Ustawiony na 0 oznacza, że producent nie musi czekać na potwierdzenie otrzymanych informacji. Replika zostanie natychmiast dodana do bufora gniazda i uznana za wysłaną. W tym przypadku nie ma gwarancji, że serwer pomyślnie odebrał dane, a ponowna próba konfiguracji nie zadziała (ponieważ klient nie wie, czy się nie powiodła), a przesunięcie sprzężenia zwrotnego zawsze zostanie ustawione na -1. (2) acks=1: Oznacza to, że przynajmniej czekamy, aż lider pomyślnie zapisze dane do lokalnego loga, ale nie wszyscy followerzy zapiszą się pomyślnie. W takim przypadku, jeśli towarzysz nie wykona poprawnej kopii zapasowej danych i lider ponownie się rozłączy, wiadomość zostanie utracona. (3) acks=all: Oznacza to, że lider musi czekać, aż wszystkie kopie zapasowe pomyślnie zapiszą logi, a ta strategia zapewni, że dane nie zostaną utracone, o ile jedna kopia zapasowa przetrwa. To najsilniejsza gwarancja. (4) Możliwe są także inne ustawienia, takie jak acks=2, które wymagają określonej liczby acks, ale ta strategia jest zazwyczaj rzadko stosowana. | | buffer.memory | długie | 33554432 | wysoki | Producent może być używany do buforowania rozmiaru pamięci danych. Jeśli dane są generowane szybciej niż są wysyłane do brokera, producent zablokuje lub wyrzuci wyjątek, oznaczany jako "block.on.buffer.full".
To ustawienie będzie powiązane z całkowitą pamięcią, którą producent może wykorzystać, ale nie jest to twardy limit, ponieważ nie cała pamięć używana przez producenta jest wykorzystywana do buforowania. Część dodatkowej pamięci służy do kompresji (jeśli kompresja jest wprowadzona), a część do żądań konserwacyjnych. | | kompresja.typ | struna | żaden | wysoki | Producent to rodzaj kompresji używany do kompresji danych. Domyślnie jest niekompresowany. Poprawne wartości opcji to zero, gzip, snappy. Kompresja najlepiej sprawdza się w przetwarzaniu wsadowym – im więcej wiadomości jest przetwarzanych w partiach, tym lepsza wydajność kompresji. | | Próby ponowne | int | 0 | wysoki | Ustawienie wartości większej niż 0 spowoduje, że klient ponownie wyśle dowolne dane, gdy te dane się nie wypowiedzą. Należy zauważyć, że te próby nie różnią się od tych, gdy klient otrzymuje błąd wysyłania. Pozwala na potencjalną zmianę kolejności danych, jeśli oba rekordy wiadomości zostaną wysłane na tę samą partycję, pierwsza wiadomość się nie powiedzie, a druga pojawi się wcześniej niż pierwsza. | | batch.size | int | 16384 | Średni | Producent będzie próbował wsadowo przetwarzać rekordy wiadomości, aby zmniejszyć liczbę żądań. To poprawi wydajność między klientem a serwerem. Ta konfiguracja kontroluje domyślną liczbę bajtów wiadomości przetwarzanych wsadowo. Nie podejmuje się prób przetwarzania bajtów wiadomości większych niż ta liczba bajtów. Żądania wysyłane do brokerów będą zawierać wiele partii, z których jedno żądanie dla każdej partycji. Mniejsze wartości grupowania są rzadziej używane i mogą zmniejszać przepustowość (0 będzie wymagało tylko przetwarzania wsadowego). Większe wartości wsadowe zużywają więcej miejsca w pamięci, więc musisz alokować pamięć pod konkretne wartości wsadowe. | | client.id | struna | | Średni | Ten ciąg jest wysyłany do serwera po złożeniu żądania. Celem jest możliwość śledzenia źródła żądań, aby umożliwić niektórym aplikacjom spoza listy IP/Portów przesyłanie informacji. Ta aplikacja może ustawić dowolny ciąg tekstów, ponieważ nie ma żadnego funkcji poza nagrywaniem i śledzeniem | | linger.ms | długie | 0 | Średni | Grupa producentów agreguje wszystkie wiadomości, które nadejdą między żądaniem a wysłaniem, rejestrując osobną partię żądań. Zazwyczaj dzieje się to tylko wtedy, gdy rekord jest generowany szybciej niż szybkość wysyłania. Jednak w pewnych warunkach klient będzie chciał zmniejszyć liczbę żądań, a nawet do umiarkowanego obciążenia. To ustawienie odbywa się poprzez dodanie niewielkiego opóźnienia – tzn. zamiast wysyłać rekord od razu, producent poczeka na określony czas opóźnienia, aby umożliwić wysłanie innych rekordów wiadomości, które mogą być grupowane. Można to uznać za podobny algorytm do TCP Nagle. To ustawienie wyznacza wyższą granicę opóźnień dla batch przetwarzania: gdy tylko uzyskamy rozmiar partii batch.size, wyśle ją natychmiast, niezależnie od tego ustawienia, ale jeśli otrzymamy wiadomość o znacznie mniejszej liczbie bajtów niż to ustawienie, musimy "zatrzymać" się w określonym czasie, aby otrzymać więcej wiadomości. To ustawienie domyślnie jest ustawione na 0, czyli brak opóźnienia. Ustawienie linger.ms=5, na przykład, zmniejszy liczbę żądań, ale jednocześnie zwiększy opóźnienie o 5 ms. | | max.request.size | int | 1028576 | Średni | Maksymalna liczba bajtów żądanych. Jest to również skuteczne pokrycie dla maksymalnego rejestrowanego rozmiaru. Uwaga: Serwer ma własne nadpisanie rozmiarów rekordów wiadomości, które różnią się od tego ustawienia. To ustawienie ogranicza liczbę żądań, które producenci mogą wysyłać hurtowo jednocześnie, aby zapobiec ich dużej liczbie. | | receive.buffer.bytes | int | 32768 | Średni | Rozmiar pamięci podręcznej odbiorcy TCP, który jest używany podczas odczytu danych | | send.buffer.bytes | int | 131072 | Średni | Rozmiar pamięci podręcznej TCP wysyłania danych, który jest używany podczas przesyłania danych | | timeout.ms | int | 30000 | Średni | Ta opcja konfiguracyjna kontroluje maksymalny czas, jaki serwer czeka na potwierdzenie od obserwujących. Jeśli liczba potwierdzonych żądań nie zostanie spełniona w tym czasie, zwracany jest błąd. Ten limit czasu jest mierzony po stronie serwera i nie ma opóźnień sieciowych, w tym żądań | | block.on.buffer.full | Boolean | true | niskie | Gdy kończy się nam pamięć podręczna, musimy przestać otrzymywać nowe rekordy wiadomości lub wyrzucać błędy. Domyślnie jest ustawione na true, jednak niektóre blokowanie może nie być warte oczekiwania, więc lepiej od razu zgłosić błąd. Tak jest w przypadku ustawienia false: producent wyrzuca błąd wyjątku: BufferExhaustedException, jeśli rekord został wysłany, a pamięć podręczna jest pełna | | metadata.fetch.timeout.ms | długie | 60000 | niskie | Odnosi się do danych o pierwszym czasie niektórych elementów, które uzyskaliśmy. Elementy obejmują: temat, host, partycje. Ta konfiguracja odnosi się do czasu potrzebnego do pomyślnego ukończenia elementu zgodnie z fetch, w przeciwnym razie klientowi zostanie wysłany wyjątek. | | metadata.max.age.ms | długie | 300000 | niskie | Czas w mikrosekundach to interwał, w którym wymuszamy aktualizację metadanych. Nawet jeśli nie zobaczymy żadnych zmian w przywództwie podziału. | | metric.reporters | Lista | [] | niskie | Lista klas używanych do pomiaru metryk. Implementacja interfejsu MetricReporter pozwoli na dodawanie klas, które zmieniają się wraz z generowaniem nowych metryk. JmxReporter zawsze będzie miał możliwość rejestracji statystyk JMX | | metrics.num.samples | int | 2 | niskie | Liczba próbek używanych do utrzymania metryk | | metrics.sample.window.ms | długie | 30000 | niskie | System Metrics utrzymuje konfigurowalną liczbę próbek w korygowalnym rozmiarze okna. Ta konfiguracja konfiguruje na przykład rozmiar okna. Możemy przechowywać dwie próbki przez okres 30 sekund. Gdy okno jest wysuwane, wymazujemy i przepisujemy najstarsze okno | | recoonect.backoff.ms | długie | 10 | niskie | Gdy połączenie się nie ulegnie, czas oczekiwania po ponownym połączeniu. To zapobiega powtarzającym się ponownym połączeniom klienta | | retry.backoff.ms | długie | 100 | niskie | Czas oczekiwania przed próbą ponownego spróbowania nieudanego zamówienia na warzywa. Unikaj utknięcia w martwą pętlę wysyłania i niepowodzenia.
|
|