Broker-Konfigurationen
| Eigentum | Vorgabe | Beschreibung | | broker.id | | Jeder Broker kann mit einer eindeutigen nicht-negativen Ganzzahl-ID identifiziert werden; Diese ID kann als "Name" des Brokers verwendet werden, und ihre Existenz ermöglicht es dem Broker, zu einem anderen Host/Port zu migrieren, ohne die Konsumenten zu verwirren. Du kannst jede beliebige Nummer als ID wählen, solange die ID eindeutig ist. | | log.dirs | /tmp/kafka-logs | Der Pfad, auf dem Kafka Daten speichert. Dieser Pfad ist nicht eindeutig, er kann mehrere sein, und die Pfade müssen nur durch Kommas getrennt sein; Wann immer eine neue Partition erstellt wird, wird sie so gewählt, dass sie dies unter dem Pfad mit der wenigsten Anzahl von Partitionen tut. | | Hafen | 6667 | Der Server akzeptiert den Port, mit dem der Client verbunden ist | | zookeeper.connect | null | Das Format der ZooKeeper-Verbindungsstring lautet: hostname:port, wobei Hostname und Port jeweils Host und Port eines Knotens im ZooKeeper-Cluster sind. Um sich mit anderen ZooKeeper-Knoten zu verbinden, wenn ein Host ausfällt, können Sie wie folgt mehrere Hosts erstellen:
hostname1:port1, hostname2:port2, hostname3:port3.
ZooKeeper erlaubt es, einen "chroot"-Pfad hinzuzufügen, um alle Kafka-Daten im Cluster unter einem bestimmten Pfad zu speichern. Wenn mehrere Kafka-Cluster oder andere Anwendungen denselben ZooKeeper-Cluster verwenden, können Sie mit dieser Methode den Datenspeicherpfad festlegen. Dies kann durch Formatierung der Verbindungszeichenkette wie folgt implementiert werden: hostname1:port1,hostname2:port2,hostname3:port3/chroot/path Dieses Setup speichert alle Kafka-Clusterdaten unter dem /chroot/path-Pfad. Beachten Sie, dass Sie vor Beginn des Brokers diesen Pfad erstellen müssen und die Kunden dasselbe Verbindungsformat verwenden müssen. | | message.max.bytes | 1000000 | Die maximale Anzahl der Nachrichten, die ein Server empfangen kann. Es ist wichtig, dass die Einstellungen von Konsument und Produzent bezüglich dieser Eigenschaft synchronisiert sind, sonst ist die vom Produzent veröffentlichte Botschaft für den Konsumenten zu groß. | | num.network.threads | 3 | Die Anzahl der Netzwerkthreads, die der Server zur Verarbeitung von Netzwerkanfragen verwendet; Sie müssen diese Immobilie in der Regel nicht ändern. | | num.io.threads | 8 | Die Anzahl der I/O-Threads, die der Server zur Bearbeitung von Anfragen verwendet; Die Anzahl der Threads sollte mindestens der Anzahl der Festplatten entsprechen. | | Hintergrund.Threads | 4 | Die Anzahl der Threads, die für die Hintergrundverarbeitung verwendet werden, wie zum Beispiel Dateilöschung; Sie müssen diese Immobilie nicht ändern. | | queued.max.Anfragen | 500 | Die maximale Anzahl an Anfragen, die für einen I/O-Thread in die Warteschlange gelegt werden kann, bevor ein Netzwerkthread aufhört, neue Anfragen zu lesen. | | host.name | null | Der Hostname des Brokers; Wenn der Hostname bereits gesetzt ist, ist der Broker nur an diese Adresse gebunden; Wenn keine Einstellung vorhanden ist, bindet es alle Schnittstellen und veröffentlicht eine Kopie im ZK | | advertised.host.name | null | Wenn gesetzt, wird es als Hostname des Brokers an Produzenten, Verbraucher und andere Vermittler gesendet | | advertised.port | null | Dieser Port wird Produzenten, Verbrauchern und anderen Vermittlern zugewiesen und dient dazu, eine Verbindung herzustellen; Es muss nur gesetzt werden, wenn der eigentliche Port und der Port, den der Server binden muss, unterschiedlich sind. | | socket.send.buffer.bytes | 100 * 1024 | SO_SNDBUFF Cache-Größe, die der Server verwendet, um Socket-Verbindungen herzustellen | | socket.receive.buffer.bytes | 100 * 1024 | SO_RCVBUFF Cache-Größe, die vom Server genutzt wird, um sich mit Sockets zu verbinden | | socket.request.max.bytes | 100 * 1024 * 1024 | Die maximal vom Server erlaubte Anforderungsgröße; Dadurch werden Serverüberläufe vermieden, die kleiner sein sollten als die Größe des Java-Heaps | | num.partitions | 1 | Wenn die Anzahl der Partitionen beim Erstellen eines Themas nicht angegeben ist, ist diese Zahl die Standardanzahl der Partitionen unter dem Thema. | | log.segment.bytes | 1014*1024*1024 | Die Logs der Topic-Partition werden in vielen Dateien in einem bestimmten Verzeichnis gespeichert, die die Logs der Partition in Segmente unterteilen; Dieses Attribut ist die maximale Größe jeder Datei; Wenn die Dimensionen diesen Wert erreichen, wird eine neue Datei erstellt. Diese Einstellung kann durch die Grundlage jedes Themas überschrieben werden. Ansehen Der Hyperlink-Login ist sichtbar. | | log.roll.hours | 24 * 7 | Selbst wenn die Datei log.segment.bytes nicht erreicht, wird eine neue Datei erstellt, sobald die Dateierstellungszeit diese Eigenschaft erreicht. Diese Einstellung kann auch durch Themenebene überschrieben werden; AnsehenDer Hyperlink-Login ist sichtbar. | | log.cleanup.policy | Löschen | | | log.retention.minutes und log.retention.hours | 7 Tage | Die Zeit, in der jede Logdatei gespeichert wurde, bevor sie gelöscht wurde. Die Standard-Datensparzeit ist für alle Themen gleich. log.retention.minutes und log.retention.bytes werden beide verwendet, um die Löschung von Logdateien festzulegen, unabhängig davon, welche Eigenschaft überlaufen wurde. Diese Eigenschaftseinstellung kann überschrieben werden, wenn das Thema im Grunde festgelegt ist. AnsehenDer Hyperlink-Login ist sichtbar. | | log.retention.bytes | -1 | Die Gesamtmenge der von jeder Partition unter jedem Thema gespeicherten Daten; Beachten Sie, dass dies die obere Grenze pro Partition ist, sodass diese Zahl multipliziert mit der Anzahl der Partitionen die Gesamtmenge der pro Thema gespeicherten Daten darstellt. Beachten Sie auch: Wenn sowohl log.retention.hours als auch log.retention.bytes gesetzt sind, führt das Überschreiten eines der beiden Limits dazu, dass eine Segmentdatei gelöscht wird. Beachten Sie, dass diese Einstellung von jedem Thema überschrieben werden kann. AnsehenDer Hyperlink-Login ist sichtbar. | | log.retention.check.interval.ms | 5 Minuten | Überprüfen Sie das Intervall zwischen den segmentierten Log-Dateien, um festzustellen, ob die Dateiattribute die Löschanforderungen erfüllen. | | log.cleaner.enable | false | Wenn diese Eigenschaft auf false gesetzt wird, wird sie gelöscht, sobald das Log für eine maximale Zeit oder Größe gespeichert ist. Wenn auf true gesetzt, passiert es, wenn das Speicherattribut das obere Limit erreichtDer Hyperlink-Login ist sichtbar.。 | | log.cleaner.threads | 1 | Die Anzahl der Threads, die Log-Kompression durchführen | | log.cleaner.io.max.bytes.per.second | Nichts | Die maximale Anzahl an I/O-Anlagen, die ein Logreiniger bei einer Logverdichtung haben kann. Diese Einstellung schränkt den Cleaner ein, um eine Störung aktiver Anfrageservices zu vermeiden. | | log.cleaner.io.buffer.size | 500*1024*1024 | Log Cleaner indexiert Logs während des Bereinigungsprozesses und verringert die Größe des verwendeten Caches. Am besten ist es, sie groß einzustellen, um ausreichend Speicher zu bieten. | | log.cleaner.io.buffer.load.factor | 512*1024 | Die Größe des I/O-Stücks, das für die Holzreinigung benötigt wird. Du musst diese Einstellung nicht ändern. | | log.cleaner.io.buffer.load.factor | 0.9 | Lastfaktor der Hashtabelle, die bei der Log-Reinigung verwendet wird; Du musst diese Option nicht ändern. | | log.cleaner.backoff.ms | 15000 | Das Zeitintervall, in dem das Protokoll bereinigt wird, wird ausgeführt | | Log.cleaner.min.cleanable.ratio | 0.5 | Diese Konfiguration steuert, wie oft der Logkompaktor versucht, die Logs zu bereinigen (angenommenDer Hyperlink-Login ist sichtbar.ist offen). Vermeiden Sie es standardmäßig, mehr als 50 % der Stämme zu reinigen. Dieses Verhältnis ist an den maximal benötigten Speicherplatz des Backup-Logbuchs gebunden (50 % der Logs werden bei 50 % komprimiert). Eine höhere Menge bedeutet weniger Abfall und mehr Fläche kann effizienter geräumt werden. Diese Einstellung kann in jeder Themeneinstellung überschrieben werden. AnsehenDer Hyperlink-Login ist sichtbar.。 | | log.cleaner.delete.retention.ms | 1 Tag | Lagerzeit; Die maximale Zeit zur Lagerung komprimierter Protokolle; Es ist außerdem die maximale Zeit, in der der Client Nachrichten konsumiert, und der Unterschied zwischen log.retention.minutes besteht darin, dass der eine die unkomprimierten Daten kontrolliert und der andere die komprimierten Daten, die bis zur angegebenen Zeit beim Erstellen des Themas überschrieben werden. | | log.index.size.max.bytes | 10*1024*1024 | Die maximale Größe pro Stammsegment. Beachten Sie, dass, wenn die Log-Größe diesen Wert erreicht, ein neues Log-Segment generiert werden muss, selbst wenn die Größe das log.segment.bytes-Limit nicht überschreitet. | | log.index.interval.bytes | 4096 | Beim Fetch musst du den nächstgelegenen Offset mit einer bestimmten Menge Platz scannen, je größer die Einstellung, desto besser, normalerweise den Standardwert verwenden | | log.flush.interval.messages | Long.MaxValue | Die Datei "synchronisiert" auf die Festplatte, bevor Nachrichten angesammelt werden. Da die Operationen der Festplatten-IO langsam, aber auch ein notwendiges Mittel der "Datenzuverlässigkeit" sind, prüfen Sie, ob das Zeitintervall zur Aushärtung auf die Festplatte erforderlich ist. Ist dieser Wert zu groß, führt das zu einer zu langen "Synchronisations"-Zeit (IO-Blocking), wenn dieser Wert zu klein ist, führt dies zu einer langen Zeit von "Fsync" (IO-Blocking), wenn dieser Wert zu klein ist, führt dies zu einer großen Anzahl von "Synchronisations"-Zeiten, was bedeutet, dass die gesamte Client-Anfrage eine gewisse Verzögerung hat, und der Ausfall des physischen Servers führt zum Verlust von Nachrichten ohne Fsync. | | log.flush.scheduler.interval.ms | Long.MaxValue | Prüfen Sie, ob FSync-Intervalle erforderlich sind | | log.flush.interval.ms | Long.MaxValue | Diese Zahl wird verwendet, um das Zeitintervall von "fsync" zu steuern; wenn die Anzahl der Nachrichten nie die an die Festplatte festgebundene Nachrichten erreicht, aber das Zeitintervall ab der letzten Festplattensynchronisation die Schwelle erreicht, wird auch die Festplattensynchronisation ausgelöst. | | log.delete.delay.ms | 60000 | Die Aufbewahrungszeit nach dem Löschen der Datei im Index muss in der Regel nicht geändert werden | | auto.create.topics.enable | true | Ob die automatische Erstellung von Themen erlaubt werden soll. Wenn zutreffend, wird automatisch ein Thema erstellt, das nicht existiert, wenn produce oder fetch nicht existiert. Andernfalls musst du die Kommandozeile verwenden, um ein Thema zu erstellen | | controller.socket.timeout.ms | 30000 | Die Zeitabbruchzeit des Sockets, wenn der Partitionsverwaltungscontroller ein Backup durchführt. | | controller.message.queue.size | Int.MaxValue | Controller-zu-Broker-Channles | | default.replication.factor | 1 | Die Standardanzahl der Backup-Kopien bezieht sich nur auf automatisch erstellte Themen | | replica.lag.time.max.ms | 10000 | Wenn ein Follower innerhalb dieser Zeit keine Fetch-Anfrage sendet, entfernt der Leader den Follower wieder aus dem ISR und betrachtet den Follower als gehängt | | replica.lag.max.Nachrichten | 4000 | Wenn eine Replik mehr als diese Anzahl von nicht gesicherten Exemplaren hat, entfernt der Führer den Follower und betrachtet den Follower als aufgehängt | | replica.socket.timeout.ms | 30*1000 | Leiter-Timeout-Zeit für Socket-Netzwerkanfragen beim Sichern von Daten | | replica.socket.receive.buffer.bytes | 64*1024 | Socket-Empfangspuffer beim Senden einer Netzwerkanfrage an den Leiter während der Sicherung | | replica.fetch.max.bytes | 1024*1024 | Der maximale Wert jedes Abrufs zum Zeitpunkt der Sicherung | | replica.fetch.min.bytes | 500 | Die maximale Wartezeit, bis Daten den Führer erreichen, wenn dieser eine Backup-Anfrage stellt | | replica.fetch.min.bytes | 1 | Die kleinste Größe der Antwort nach jedem Abruf beim Rückwärtsfahren | | num.replica.fetchers | 1 | Die Anzahl der Threads, die Daten vom Leader sichern | | replica.high.watermark.checkpoint.interval.ms | 5000 | Jede Replik prüft, wie oft der höchste Wasserspiegel ausgehärtet wird | | fetch.purgatory.purge.interval.requests | 1000 | Fetch-Anfrage für das Purge-Intervall | | producer.purgatory.purge.interval.requests | 1000 | Produzent fordert ein Purge-Intervall an | | zookeeper.session.timeout.ms | 6000 | Zookeeper-Sitzungspause. | | zookeeper.connection.timeout.ms | 6000 | Die maximale Wartezeit, die der Kunde auf die Aufnahme einer Verbindung mit dem Zookeeper warten muss | | zookeeper.sync.time.ms | 2000 | ZK-Follower hinkt ZK Leader lange Zeit hinterher. | | controlled.shutdown.enable | true | Ob es möglich ist, die Schließung des Brokers zu kontrollieren. Wenn möglich, kann der Broker alle Führungskräfte vor dem Abschluss zu anderen Brokern verlagern. Dies reduziert die Verfügbarkeit während des Abschaltungsprozesses. | | controlled.shutdown.max.wiederholte Versuche | 3 | Die Anzahl der Befehle, die eine Abschaltung erfolgreich ausführen können, bevor eine unvollständige Abschaltung durchgeführt wird. | | controlled.shutdown.retry.backoff.ms | 5000 | Rückzugszeit zwischen den Abschaltungen | | auto.leader.rebalance.enable | true | Wenn dies zutrifft, wird der Controller automatisch die Führung der Makler über die Partitionen ausgleichen. | | leader.imbalance.per.broker.percentage | 10 | Das maximal erlaubte Ungleichgewichtsverhältnis, das jeder Broker erlaubt | | leader.imbalance.check.interval.seconds | 300 | Überprüfen Sie die Häufigkeit des Führungsungleichgewichts | | offset.metadata.max.bytes | 4096 | Ermöglicht es den Kunden, die maximale Anzahl ihrer Offsets zu speichern | | max.connections.per.ip | Int.MaxValue | Die maximale Anzahl von Verbindungen pro Broker kann zu jeder IP-Adresse hergestellt werden | | max.connections.per.ip.overrides | | Die maximale Abdeckung der Standardverbindung pro IP oder Hostname | | connections.max.idle.ms | 600000 | Timeout-Limit für leere Verbindungen | | Log.roll.jitter. {ms,hours} | 0 | Die maximale Anzahl der von logRollTimeMillis abstrahierten Nervositäten | | num.recovery.threads.per.data.dir | 1 | Die Anzahl der Threads, die jedes Datenverzeichnis zur Protokollierung der Wiederherstellung verwendet | | unrein.anführer.wahl.aktivieren | true | Gibt an, ob es möglich ist, die Nicht-Repliken-Einstellung in ISR als Führer zu verwenden | | delete.topic.enable | false | Kann Themen löschen | | offsets.topic.num.partitions | 50 | Die Anzahl der Partitionen für das Offset-Commit-Thema. Da eine Änderung nach der Bereitstellung derzeit nicht unterstützt wird, empfehlen wir, für die Produktion eine höhere Einstellung zu verwenden (z. B. 100-200). | | offsets.topic.retention.minutes | 1440 | Verschiebungen, die länger als dieses Zeitlimit bestehen, werden als ausstehende Löschung markiert | | offsets.retention.check.interval.ms | 600000 | Der Offset-Manager prüft die Häufigkeit veralteter Offsets | | offsets.topic.replication.factor | 3 | Die Anzahl der Backup-Kopien des Themas ist Offset. Es wird empfohlen, höhere Zahlen einzustellen, um eine höhere Verfügbarkeit zu gewährleisten | | offset.topic.segment.bytes | 104857600 | Offset-Thema. | | offsets.load.buffer.size | 5242880 | Diese Einstellung hängt von der Batchgröße ab und wird beim Lesen aus dem Offset-Segment verwendet. | | offsets.commit.required.acks | -1 | Die Anzahl der Bestätigungen muss festgelegt werden, bevor das Offset-Commit akzeptabel ist, und muss in der Regel nicht geändert werden |
| Eigentum | Vorgabe | Server-Standardeigenschaft | Beschreibung | | Cleanup.Policy | Löschen | log.cleanup.policy | Entweder "löschen" oder "kompakt"; Diese Zeichenkette zeigt an, wie der alte Stammteil genutzt werden soll; Die Standardmethode ("Löschen") verwirft das alte Teil, wenn deren Wiedergebrauchszeit- oder Größenlimit erreicht ist. "kompakt" komprimiert die Logs | | delete.retention.ms | 864000000 (24 Stunden) | log.cleaner.delete.retention.ms | Der Unterschied zwischen log.retention.minutes besteht darin, dass das eine unkomprimierte Daten steuert und das andere die komprimierten Daten. Diese Konfiguration kann durch die Pinning-Parameter überschrieben werden, wenn das Thema erstellt wird | | flush.messages | Nichts | log.flush.interval.messages | Diese Konfiguration legt ein Zeitintervall fest, um Fsync-Logs zu erzwingen. Wenn diese Option beispielsweise auf 1 gesetzt ist, ist Fsync nach jeder Nachricht erforderlich, und wenn auf 5 steht, ist Fsync für jeweils 5 Nachrichten erforderlich. Im Allgemeinen wird empfohlen, diesen Wert nicht festzulegen. Die Festlegung dieses Parameters erfordert den notwendigen Kompromiss zwischen "Datenzuverlässigkeit" und "Leistung". Ist dieser Wert zu groß, führt dies zu einer langen Zeit zum "Fsync" jedes Mal (IO-Blocking), und wenn dieser Wert zu klein ist, führt dies zu einer großen Anzahl von "Fsync", was auch eine gewisse Verzögerung bei der gesamten Client-Anfrage bedeutet. Ein physischer Serverausfall führt dazu, dass Nachrichten ohne Fsync verloren gehen. | | flush.ms | Nichts | log.flush.interval.ms | Diese Konfiguration dient dazu, das Zeitintervall zwischen dem Erzwingen von fsync-Logs auf die Festplatte festzulegen; Wenn zum Beispiel auf 1000 eingestellt ist, ist Fsync alle 1000 ms erforderlich. Diese Option wird im Allgemeinen nicht empfohlen | | index.interval.bytes | 4096 | log.index.interval.bytes | Die Standardeinstellung stellt sicher, dass wir der Nachricht alle 4096 Byte einen Index hinzufügen, und weitere Indizes bringen die Lesenachricht näher, aber die Indexgröße wird erhöht. Diese Option ist in der Regel nicht erforderlich | | max.message.bytes | 1000000 | max.message.bytes | Die maximale Größe der Kafka-Anfügungsnachricht. Beachte, dass du, wenn du diese Größe vergrößerst, auch die Abrufgröße deines Konsumenten erhöhen musst, damit der Konsument Nachrichten auf diese maximalen Größen abrufen kann. | | Min.Reinigungsbar.Schmutzig.Verhältnis | 0.5 | Min.Reinigungsbar.Schmutzig.Verhältnis | Diese Konfiguration steuert, wie oft der Log-Kompressor versucht, die Logs zu löschen. Standardmäßig werden Logs mit einer Kompressionsrate von mehr als 50 % vermieden. Dieses Verhältnis vermeidet die größte Platzverschwendung | | min.insync.replicas | 1 | min.insync.replicas | Wenn der Producer auf request.required.acks auf -1 gesetzt ist, gibt min.insync.replicas die Mindestanzahl der Repliken an (jeder Repica-Schreib muss erfolgreich sein), und wenn diese Zahl nicht erreicht wird, erzeugt der Producer eine Ausnahme. | | retention.bytes | Nichts | log.retention.bytes | Wenn Sie die "Lösch"-Aufbewahrungsrichtlinie verwenden, bezieht sich diese Konfiguration auf die maximale Größe, die das Log vor seiner Löschung erreichen kann. Standardmäßig gibt es keine Größenbegrenzung, sondern nur eine Zeitbegrenzung | | retention.ms | 7 Tage | Log.Retention.Minutes | Wenn Sie die "Lösch"-Aufbewahrungsrichtlinie verwenden, bezieht sich diese Konfiguration auf die Zeit, in der das Log vor dem Löschprotokoll gespeichert wurde. | | segment.bytes | 1GB | log.segment.bytes | In Kafka werden Log-Logs in Chunks gespeichert, und diese Konfiguration bezieht sich auf die Größe der Log-Logs, die in Chunks unterteilt sind | | segment.index.bytes | 10MB | log.index.size.max.bytes | Diese Konfiguration entspricht etwa der Größe der Indexdatei, die zwischen Offsets und Dateistandorten abgebildet ist; Diese Konfiguration muss in der Regel nicht geändert werden | | segment.ms | 7 Tage | log.roll.hours | Selbst wenn die Log-Chunk-Datei nicht die zu löschen oder komprimierte Größe erreicht, wird nach Erreichen dieser oberen Grenze eine neue Log-Chunk-Datei erzwungen, sobald die Log-Chunk-Datei diese obere Grenze erreicht | | segment.jitter.ms | 0 | Log.roll.jitter. {ms,hours} | Der maximale Jitter, der von logRollTimeMillis abgezogen werden kann. |
Consumer-Konfigurationen
| Eigentum | Vorgabe | Beschreibung | | group.id | | Eine Zeichenkette, die eindeutig die Gruppe identifiziert, in der sich der Consumer-Prozess befindet, und wenn dieselbe Gruppen-ID gesetzt ist, bedeutet das, dass diese Prozesse zur gleichen Consumer-Gruppe gehören | | zookeeper.connect | | Spezifizieren Sie den String der Zookeeper-Verbindung, das Format ist hostname:port, hier sind Host und Port sowohl Host als auch Port des Zookeeper-Servers. Um einen Kontaktverlust nach einem Ausfall eines Zookeeper-Rechners zu vermeiden, können Sie mehrere hostname:ports angeben, wobei Kommas als Trennung dienen: Hostname1:Port1,Hostname2:Port2,Hostname3:Port3 Du kannst den Chroot-Pfad des ZooKeeper in die ZooKeeper-Verbindungsstring einfügen, die zur Speicherung eigener Daten verwendet wird, auf folgende Weise: hostname1:port1,hostname2:port2,hostname3:port3/chroot/path | | consumer.id | null | Es ist keine Einrichtung erforderlich, und sie wird in der Regel automatisch generiert | | socket.timeout.ms | 30*100 | Zeitlimits für Netzwerkanfragen. Das wahre Timeout-Limit ist max.fetch.wait+socket.timeout.ms | | socket.receive.buffer.bytes | 64*1024 | Socket wird verwendet, um die Cache-Größe von Netzwerkanfragen zu empfangen | | fetch.message.max.bytes | 1024*1024 | Die maximale Anzahl von Bytes pro Fetch-Nachricht pro Fetch-Anfrage. Diese Bytes werden im für jede Partition verwendeten Speicher überwacht, sodass diese Einstellung die vom Verbraucher verwendete Speichermenge steuert. Die Größe der Abrufanforderung muss mindestens der maximal vom Server erlaubten Nachrichtengröße entsprechen, andernfalls ist die Größe der vom Produzenten gesendeten Nachricht größer als die, die der Konsument konsumieren kann. | | num.consumer.fetchers | 1 | Die Anzahl der für Fetch-Daten verwendeten Fetcher-Threads | | auto.commit.enable | true | Falls zutreffend, wird der Offset der vom Konsumenten abgerufenen Nachricht automatisch mit dem Zookeeper synchronisiert. Dieser Commit-Offset wird vom neuen Konsumenten verwendet, wenn der Prozess abhängt | | auto.commit.interval.ms | 60*1000 | Die Häufigkeit, mit der der Verbraucher den Offset an den Tierpfleger abgibt, beträgt Sekunden | | queued.max.message.chunks | 2 | Die maximale Anzahl der Nachrichten, die zum Zwischenspeichern für den Konsum verwendet werden. Jeder Chunk muss mit fetch.message.max.bytes identisch sein | | rebalance.max. Versuche | 4 | Wenn ein neuer Konsument einer Konsumentengruppe hinzugefügt wird, versucht die Sammlung der Konsumenten, die Anzahl der jedem Verbraucher zugewiesenen Partitionen neu auszubalancieren. Wenn sich die Sammlung der Konsumenten ändert, scheitert dieses Rebalancing und wird bei der Ausführung der Allokation wieder aktiviert | | fetch.min.bytes | 1 | Die Mindestanzahl an Bytes, die der Server bei jeder Abrufanfrage zurückgeben sollte. Wenn nicht genügend Daten zurückgegeben werden, wartet die Anfrage, bis genügend Daten zurückgegeben sind. | | fetch.wait.max.ms | 100 | Wenn nicht genügend Daten vorhanden sind, um fetch.min.bytes zu erfüllen, bezieht sich diese Konfiguration auf die maximale Zeit, die der Server blockiert, bevor er auf eine Fetch-Anfrage reagiert. | | rebalance.backoff.ms | 2000 | Zeit zum Zurückziehen, bevor man Reblance erneut versucht | | refresh.leader.backoff.ms | 200 | Es gibt eine Zeit zum Abwarten, bevor man feststellen kann, ob der Führer einer Teilung seine Führung verloren hat | | auto.offset.reset | Der größte | Wenn in Zookeeper kein initialisierter Offset vorhanden ist, ist Offset eine Antwort auf folgenden Wert: Kleinste: Auto-Reset Offset auf kleinste Offset Größter: Auto-Reset Offset auf den Offset von größte Alles andere: Es gibt eine Ausnahme für den Verbraucher | | consumer.timeout.ms | -1 | Wenn keine Nachricht verfügbar ist, selbst nach einer bestimmten Wartezeit, wird eine Timeout-Ausnahme ausgelöst | | exkludieren.internal.topics | true | Ob Botschaften aus internen Themen den Verbrauchern zugänglich gemacht werden sollen | | parition.assignment.strategy | Verbreitung | Wählen Sie die Richtlinie zur Zuweisung von Partitionen zum Consumer-Flow, optional Range, Roundrobin | | client.id | Gruppen-ID-Wert | ist ein benutzerspezifischer String, der hilft, Aufrufe in jeder Anfrage zu verfolgen. Es sollte logisch bestätigen, welche Anwendung die Anfrage erstellt hat | | zookeeper.session.timeout.ms | 6000 | Zeitlimits für Tierpflegesitzungen. Wenn der Verbraucher während dieser Zeit keine Herzschlag-Nachricht an den Tierpfleger sendet, gilt diese als aufgehängt und es wird eine Reblance erzeugt | | zookeeper.connection.timeout.ms | 6000 | Die maximale Wartezeit für einen Kunden, um eine Zookeeper-Verbindung herzustellen | | zookeeper.sync.time.ms | 2000 | ZK-Follower können dem ZK-Leader für eine maximale Zeit hinterherhinken | | offsets.storage | Tierpfleger | Orte, an denen Offsets gelagert werden: Zookeeper oder Kafka | | offset.channel.backoff.ms | 1000 | Die Backoff-Zeit für die Wiederverbindung mit dem Offset-Kanal oder beim erneuten Versuch der Fetch/Commit-Anfrage des fehlgeschlagenen Offsets | | offsets.channel.socket.timeout.ms | 10000 | Die Socket-Timeout-Grenze für die Antwort auf die Fetch/Commit-Anfrage beim Leseverset. Dieses Timeout-Limit wird von der consumerMetadata-Anfrage verwendet, um Offset-Management anzufordern | | offsets.commit.max. Wiederholungen | 5 | Die Anzahl der Male, in denen das Offset-Commit erneut versucht wurde. Dieser Versuch wird nur auf Offset-Comits zwischen Abschaltvorgängen angewendet. Er | | dual.commit.enabled | true | Wenn du "Kafka" als Offsets.Storage verwendest, kannst du Offset zweimal auf Zookeeper übertragen (und einmal auf Kafka). Dies ist ein Muss bei der Migration von Zookeeper-basierter Offset-Speicherung auf Kafka-basierte Offset-Speicherung. Für jede beliebige Verbrauchergruppe ist es eine sichere Empfehlung, diese Option nach Abschluss der Migration auszuschalten. | | partition.assignment.strategy | Verbreitung | Wählen Sie zwischen den "Range"- und "Roundrobin"-Richtlinien als Richtlinie zur Zuweisung von Partitionen an Verbraucherdatenflüsse; Der zirkuläre Partitionsallokator weist alle verfügbaren Partitionen sowie alle verfügbaren Consumer-Threads zu. Er weist die Partitionsschleife dem Consumer-Thread zu. Wenn alle Consumer-Instanzen einem bestimmten Parameter zugeordnet sind, werden die Partitionen in deterministische Verteilungen unterteilt. Die Round-Robin-Allokationsstrategie ist nur möglich, wenn die folgenden Bedingungen erfüllt sind: (1) Jedes Thema hat die gleiche Anzahl von Datenflüssen pro Konsumentenstärke. (2) Die Sammlung der abonnierten Themen wird für jede Consumer-Instanz in der Consumer-Gruppe bestimmt. |
Producer-Konfigurationen
| Eigentum | Vorgabe | Beschreibung | | metadata.broker.list | | Bediene Bootstrapping. Producer wird nur verwendet, um Metadaten (Topics, Partitions, Replicas) zu erhalten. Die Socket-Verbindung zum Senden der tatsächlichen Daten wird auf Basis der zurückgegebenen Metadatendaten hergestellt. Das Format ist: host1:port1,host2:port2 Diese Liste kann eine Unterliste von Maklern oder ein VIP sein, der auf Makler verweist | | Anfrage.erforderlich.acks | 0 | Diese Konfiguration ist ein Bestätigungswert, der anzeigt, wann eine Produktionsanfrage als abgeschlossen gilt. Insbesondere, wie viele andere Makler müssen Daten in ihre Protokolle eingereicht und diese Informationen ihrem Leiter bestätigt haben. Typische Werte sind: 0: Zeigt an, dass der Produzent niemals auf eine Bestätigung vom Broker wartet (gleiches Verhalten wie bei 0,7). Diese Option bietet die geringste Latenz, aber gleichzeitig das größte Risiko (weil Daten verloren gehen, wenn der Server ausfällt). 1: Zeigt an, dass die Leader-Replik die Datenbestätigung erhalten hat. Diese Option hat eine geringe Latenz und stellt sicher, dass der Server bestätigt, dass es empfangen wurde. -1: Der Produzent erhält die Bestätigung, dass alle synchronisierten Repliken Daten erhalten haben. Die Latenz ist am größten, jedoch beseitigt diese Methode das Risiko verlorener Nachrichten nicht vollständig, da die Anzahl der synchronisierten Replikate 1 betragen kann. Wenn Sie sicherstellen möchten, dass einige Repliken Daten empfangen, sollten Sie in den Einstellungen auf Themenebene die Option min.insync.replicas setzen. Lesen Sie die Designdokumentation für eine ausführlichere Diskussion. | | request.timeout.ms | 10000 | Der Broker versucht sein Bestes, die request.required.acks-Anforderung umzusetzen, andernfalls wird ein Fehler an den Client gesendet | | producer.type | Synchronisation | Diese Option bestimmt, ob die Nachricht asynchron in einem Hintergrundthread gesendet wird. Korrekte Werte: (1) asynchron: asynchrones Senden (2) Synchronisation: Synchronisiertes Senden Indem wir den Producer auf asynchron setzen, können wir Anfragen in Batches verarbeiten (was für einen höheren Durchsatz gut ist), aber das entsteht auch die Möglichkeit, dass die Client-Maschine nicht gesendete Daten verliert | | serializer.class | kafka.serializer.DefaultEncoder | Die Serialisierungskategorie der Nachricht. Der Standard-Encoder gibt ein Byte[] ein und gibt dasselbe Byte zurück[] | | key.serializer.class | | Serialisierungsklasse für Schlüsselwörter. Wenn dies nicht angegeben ist, ist standardmäßig das Gleich mit der Nachricht | | partitioner.class | kafka.producer.DefaultPartitioner | Partitioner-Klasse zur Aufteilung von Nachrichten zwischen Unterthemen. Der Standard-Partitioner basiert auf der Hashtabelle des Schlüssels | | compression.codec | nichts | Dieser Parameter kann den Codec zur Komprimierung von Daten festlegen, der als "kein", "gzip", "snappy" ausgewählt werden kann. | | compressd.topics | null | Dieser Parameter kann verwendet werden, um festzulegen, ob bestimmte Themen komprimiert sind. Wenn der komprimierte Codec ein anderer Codec als NoCompressCodec ist, werden diese Codecs auf die angegebenen Themendaten angewendet. Wenn die Liste der komprimierten Themen leer ist, wenden Sie den spezifischen komprimierten Codec auf alle Themen an. Wenn der komprimierte Codec NoCompressionCodec ist, ist die Kompression nicht für alle Themen verfügbar. | | message.send.max. Wiederholungen | 3 | Dieser Parameter führt dazu, dass der Produzent automatisch fehlgeschlagene Send-Anfragen erneut versucht. Dieser Parameter bestimmt die Anzahl der Wiederholungen. Hinweis: Das Setzen eines Nicht-0-Werts führt dazu, dass bestimmte Netzwerkfehler wiederholt werden: Verursachen ein Senden und einen Verlust der Bestätigung | | retry.backoff.ms | 100 | Vor jedem erneuten Versuch aktualisiert der Produzent die Metadaten des relevanten Themas, um zu sehen, ob der neue Leader zugewiesen wurde. Da die Leader-Auswahl etwas Zeit in Anspruch nimmt, legt diese Option fest, wie lange der Produzent warten muss, bevor die Metadaten aktualisiert werden. | | topic.metadata.refresh.interval.ms | 600*1000 | Der Produzent aktualisiert in der Regel die Metadaten des Themas in einigen Ausfallszenarien (Partition fehlt, Leader nicht verfügbar usw.). Er durchläuft einen normalen Zyklus. Wenn du sie auf einen negativen Wert setzt, werden die Metadaten nur aktualisiert, wenn sie fehlschlägt. Ist auf 0 gesetzt, werden die Metadaten nach jeder gesendeten Nachricht aktualisiert (diese Option wird nicht empfohlen, das System verbraucht zu viel). Wichtig: Aktualisierungen finden erst nach dem Versand der Nachricht statt, sodass die Metadaten nie aktualisiert werden, wenn der Produzent die Nachricht nie sendet. | | queue.buffering.max.ms | 5000 | Das maximale Zeitintervall, in dem der Benutzer Daten zwischenspeichert, wenn der asynchrone Modus angewendet wird. Wenn die Nachricht beispielsweise auf 100 eingestellt ist, werden Nachrichten innerhalb von 100 ms in Chargen verarbeitet. Das verbessert den Durchsatz, erhöht aber die Latenz durch Caching. | | queue.buffering.max.Nachrichten | 10000 | Im asynchronen Modus muss die maximale Anzahl der nicht gesendeten Nachrichten, die vor dem Produzenten in die Warteschlange zwischengespeichert werden können, blockiert oder Daten verloren gehen | | batch.num.messages | 200 | Im asynchronen Modus kann man die maximale Anzahl an Nachrichten im Batch verarbeiten. Oder die Anzahl der Nachrichten hat dies online erreicht oder queue.buffer.max.ms ist angekommen, und der Produzent wird es bearbeiten | | send.buffer.bytes | 100*1024 | Socket-Schreib-Cache-Größe | | client.id | “” | Diese Client-ID ist eine benutzerspezifische Zeichenkette, die in jede Anfrage enthalten ist, um den Anruf zu verfolgen, und er sollte logisch bestätigen können, dass die Anwendung die Anfrage gestellt hat. |
Producer-Konfigurationen
| Name | Typ | Vorgabe | Bedeutung | Beschreibung | | boostrap.servers | Liste | | Hoch | Host/Port-Gruppe, um eine Verbindung zum Kafka-Cluster herzustellen. Die Daten werden gleichmäßig auf allen Servern geladen, unabhängig davon, welcher Server für das Bootstrapping vorgesehen ist. Diese Liste betrifft nur die initialisierten Hosts (die zur Entdeckung aller Server verwendet werden). Dieses Listenformat:
host1:port1,host2:port2,... Da diese Server nur zur Initialisierung von Verbindungen verwendet werden, um alle Mitgliedschaften des Clusters zu entdecken (die sich dynamisch ändern können), muss diese Liste nicht alle Server enthalten (man möchte vielleicht mehr als einen Server, obwohl in diesem Fall ein Server ausfallen kann). Wenn kein Server in dieser Liste erscheint, schlägt das Senden der Daten fehl, bis die Liste verfügbar ist. | | AKKS | Schnur | 1 | Hoch | Der Produzent benötigt ein Signal vom Server, um den Empfang nach Erhalt der Daten zu bestätigen, und diese Konfiguration bezieht sich darauf, wie viele solcher Bestätigungssignale der Procuder benötigt. Diese Konfiguration stellt tatsächlich die Verfügbarkeit von Daten-Backups dar. Die folgenden Einstellungen sind gängige Optionen: (1) acks=0: Auf 0 gesetzt bedeutet, dass der Produzent nicht auf eine Bestätigung der empfangenen Informationen warten muss. Die Replik wird sofort dem Socket-Puffer hinzugefügt und als gesendet betrachtet. In diesem Fall gibt es keine Garantie, dass der Server die Daten erfolgreich empfangen hat, und der Wiederholungsversuch der Konfiguration funktioniert nicht (weil der Client nicht weiß, ob sie fehlgeschlagen ist), und der Offset des Feedbacks wird immer auf -1 gesetzt. (2) acks=1: Das bedeutet, dass zumindest gewartet werden muss, bis der Leader die Daten erfolgreich ins lokale Logbuch schreibt, aber nicht, bis alle Follower erfolgreich geschrieben haben. In diesem Fall geht die Nachricht verloren, wenn der Follower die Daten nicht erfolgreich sichert und der Leader erneut auflegt. (3) acks=all: Das bedeutet, dass der Leader warten muss, bis alle Backups erfolgreich Logs geschrieben haben, und diese Strategie stellt sicher, dass Daten nicht verloren gehen, solange ein Backup überlebt. Das ist die stärkste Garantie. (4) Andere Einstellungen, wie Acks=2, sind ebenfalls möglich, was eine bestimmte Anzahl von Acks erfordert, aber diese Strategie wird im Allgemeinen selten verwendet. | | buffer.memory | lang | 33554432 | Hoch | Der Producer kann verwendet werden, um die Speichergröße der Daten zu cachen. Wenn die Daten schneller generiert werden, als sie an den Broker gesendet werden, blockiert oder wirft der Produzent eine Ausnahme, die durch "block.on.buffer.full" angezeigt wird.
Diese Einstellung bezieht sich auf den Gesamtspeicher, den der Produzent nutzen kann, stellt jedoch keine harte Begrenzung dar, da nicht der gesamte vom Produzent genutzte Speicher für das Caching verwendet wird. Ein zusätzlicher Speicher wird für die Kompression verwendet (falls Kompression eingeführt wird), ein anderer für Wartungsanfragen. | | compression.type | Schnur | nichts | Hoch | Producer ist die Art der Kompression, die zur Kompression von Daten verwendet wird. Der Standard ist unkomprimiert. Die korrekten Optionswerte sind keine, gzip, snappy. Kompression eignet sich am besten für die Batch-Verarbeitung; je mehr Nachrichten in Chargen verarbeitet werden, desto besser ist die Kompressionsleistung. | | Wiederversuche | Int | 0 | Hoch | Wenn ein Wert größer als 0 gesetzt wird, muss der Client alle Daten erneut senden, sobald diese Daten ausfallen. Beachten Sie, dass diese Wiederholungen sich nicht von denen unterscheiden, wenn der Client einen Sendefehler erhält. Erlaubt erneute Versuche, die Reihenfolge der Daten potenziell zu ändern; wenn beide Nachrichteneinträge an dieselbe Partition gesendet werden, schlägt die erste Nachricht fehl und die zweite Nachricht erscheint früher als die erste. | | batch.size | Int | 16384 | Mittel | Der Produzent versucht, die Nachrichtendatensätze zu batchen, um die Anzahl der Anfragen zu reduzieren. Das verbessert die Leistung zwischen Client und Server. Diese Konfiguration steuert die Standardanzahl der Bytes von Batch-Verarbeitungsnachrichten. Es werden keine Versuche unternommen, Nachrichtenbytes größer als diese Byteanzahl zu verarbeiten. Anfragen, die an Broker gesendet werden, enthalten mehrere Chargen, die jeweils eine Anfrage für jede Partition enthalten. Kleinere Batching-Werte werden weniger genutzt und können den Durchsatz verringern (0 nutzt nur Batch-Verarbeitung). Größere Chargenwerte verschwenden mehr Speicherplatz, daher müssen Sie Speicher für bestimmte Batch-Werte reservieren. | | client.id | Schnur | | Mittel | Dieser String wird an den Server gesendet, wenn eine Anfrage gestellt wird. Ziel ist es, die Quelle der Anfragen zurückverfolgen zu können, um einigen Anwendungen außerhalb der IP-/Port-Zulassungsliste Informationen zu ermöglichen. Diese App kann jede Zeichenkette setzen, da sie keine funktionale Funktion außer Aufnahme und Tracking hat | | linger.ms | lang | 0 | Mittel | Die Producer-Gruppe aggregiert alle Nachrichten, die zwischen Anforderung und Senden eintreffen, und zeichnet eine separate Charge von Anfragen auf. Typischerweise geschieht dies nur, wenn der Datensatz schneller als die Übertragungsrate erzeugt wird. Unter bestimmten Bedingungen möchte der Client jedoch die Anzahl der Anfragen reduzieren oder sogar auf eine moderate Last reduzieren. Diese Einrichtung erfolgt durch das Hinzufügen einer kleinen Verzögerung – d. h. anstatt einen Datensatz sofort zu senden, wartet der Produzent auf eine bestimmte Verzögerungszeit, damit andere Nachrichteneinträge gesendet werden können, die dann gestapelt werden können. Dies kann als ein ähnlicher Algorithmus wie TCP Nagle betrachtet werden. Diese Einstellung setzt eine höhere Latenzgrenze für das Batching: Sobald wir die batch.size einer Partition haben, wird sie sofort gesendet, unabhängig von dieser Einstellung, aber wenn wir eine Nachricht mit einer viel kleineren Bytezahl erhalten als diese Einstellung, müssen wir eine bestimmte Zeit "verweilen", um mehr Nachrichten zu erhalten. Diese Einstellung steht standardmäßig auf 0, also keine Verzögerung. Das Setzen von linger.ms=5 reduziert zum Beispiel die Anzahl der Anfragen, erhöht aber gleichzeitig die Verzögerung um 5 ms. | | max.request.size | Int | 1028576 | Mittel | Die maximale Anzahl der angeforderten Bytes. Dies ist auch eine effektive Abdeckung für die maximal aufgezeichnete Größe. Hinweis: Der Server hat eine eigene Überschreibung der Nachrichtendatengrößen, die sich von dieser Einstellung unterscheiden. Diese Einstellung begrenzt die Anzahl der Anfragen, die Produzenten gleichzeitig in großen Mengen senden können, um eine große Anzahl von Anfragen zu verhindern. | | receive.buffer.bytes | Int | 32768 | Mittel | TCP-Empfangscache-Größe, die beim Lesen von Daten verwendet wird | | send.buffer.bytes | Int | 131072 | Mittel | TCP-Send-Cache-Größe, die beim Datensenden verwendet wird | | timeout.ms | Int | 30000 | Mittel | Diese Konfigurationsoption steuert die maximale Zeit, in der der Server auf eine Bestätigung von Followern wartet. Wenn die Anzahl der bestätigten Anfragen innerhalb dieser Zeit nicht erfüllt ist, wird ein Fehler zurückgegeben. Dieses Timeout-Limit wird auf der Serverseite gemessen und hat keine Netzwerklatenz einschließlich Anfragen | | block.on.buffer.full | Boolean | true | Niedrig | Wenn unser Speichercache leer ist, müssen wir keine neuen Nachrichteneinträge mehr empfangen oder Fehler auslösen. Standardmäßig ist dies auf true gesetzt, allerdings ist ein gewisses Blocken vielleicht nicht zu erwarten, daher ist es besser, sofort einen Fehler zu geben. Dies ist der Fall, wenn auf false gesetzt wird: Der Producer wirft einen Ausnahmefehler: BufferExhaustedException, wenn der Datensatz gesendet wurde und der Cache voll ist | | metadata.fetch.timeout.ms | lang | 60000 | Niedrig | Es bezieht sich auf die Erstdaten einiger Elemente, die wir erhalten haben. Elemente umfassen: Thema, Host, Partitionen. Diese Konfiguration bezieht sich auf die Zeit, die das Element gemäß dem Abruf erfolgreich abschließen muss, andernfalls wird eine Ausnahme an den Client gesendet. | | metadata.max.age.ms | lang | 300000 | Niedrig | Die Zeit in Mikrosekunden ist das Intervall, in dem wir die Metadaten aktualisieren müssen. Auch wenn wir keine Führungswechsel bei der Teilung sehen. | | metric.reporter | Liste | [] | Niedrig | Eine Liste von Klassen, die zur Messung von Metriken verwendet werden. Die Implementierung der MetricReporter-Schnittstelle ermöglicht das Hinzufügen von Klassen, die sich ändern, wenn neue Metriken generiert werden. JmxReporter bietet immer eine Möglichkeit zur Registrierung von JMX-Statistiken | | metrics.num.samples | Int | 2 | Niedrig | Die Anzahl der Stichproben, die zur Pflege von Metriken verwendet werden | | metrics.sample.window.ms | lang | 30000 | Niedrig | Das Metriksystem speichert eine konfigurierbare Anzahl von Stichproben in einer korrigierbaren Fenstergröße. Diese Konfiguration konfiguriert zum Beispiel die Fenstergröße. Wir können zwei Proben für die Dauer von 30 Sekunden aufbewahren. Wenn ein Fenster ausgerollt wird, löschen und schreiben wir das älteste Fenster neu | | recoonect.backoff.ms | lang | 10 | Niedrig | Wenn die Verbindung ausfällt, ist die Wartezeit, wenn wir wieder verbunden sind. Dies vermeidet wiederholte Client-Wiederverbindungen | | retry.backoff.ms | lang | 100 | Niedrig | Die Wartezeit, bevor man versucht, eine fehlgeschlagene Obst- und Gemüseanfrage erneut zu versuchen. Vermeide es, in einer Send-Fail-Deadloop festzustecken.
|
|