Configurations de courtiers
| Propriété | Faire défaut | Description | | broker.id | | Chaque courtier peut être identifié par un identifiant entier unique non négatif ; Cet identifiant peut être utilisé comme « nom » du courtier, et son existence permet au courtier de migrer vers un autre hôte/port sans embrouiller les consommateurs. Vous pouvez choisir n’importe quel numéro comme pièce d’identité, tant que l’ID est unique. | | log.dirs | /tmp/kafka-logs | Le chemin où Kafka stocke les données. Ce chemin n’est pas unique, il peut être multiple, et les chemins n’ont besoin d’être séparés que par des virgules ; Chaque fois qu’une nouvelle partition est créée, elle est choisie de le faire sous le chemin contenant le moins de partitions. | | Port | 6667 | Le serveur accepte le port auquel le client est connecté | | zookeeper.connect | zéro | Le format de la chaîne de connexion ZooKeeper est : hostname :port, où hostname et port sont respectivement l’hôte et le port d’un nœud dans le cluster ZooKeeper. Pour se connecter à d’autres nœuds ZooKeeper lorsqu’un hôte tombe en panne, vous pouvez créer plusieurs hôtes comme suit :
hostname1:port1, hostname2:port2, hostname3:port3.
ZooKeeper permet d’ajouter un chemin « chroot » pour stocker toutes les données kafka du cluster sous un chemin spécifique. Lorsque plusieurs clusters Kafka ou autres applications utilisent le même cluster ZooKeeper, vous pouvez utiliser cette méthode pour définir le chemin de stockage des données. Cela peut être mis en œuvre en formatant la chaîne de connexion comme ceci : NomHôte1 :Port1,NomHôte2 :Port2,NomHôte3 :Port3/chroot/path Cette configuration stocke toutes les données du cluster kafka sous le chemin /chroot/path. Notez qu’avant de lancer le courtier, vous devez créer ce chemin, et les consommateurs doivent utiliser le même format de connexion. | | message.max.octets | 1000000 | La taille maximale des messages qu’un serveur peut recevoir. Il est important que les paramètres du consommateur et du producteur concernant cette propriété soient synchronisés, sinon le message publié par le producteur est trop important pour le consommateur. | | num.réseau.threads | 3 | Le nombre de threads réseau utilisés par le serveur pour traiter les requêtes réseau ; En général, vous n’avez pas besoin de changer cette propriété. | | num.io.threads | 8 | Le nombre de threads d’E/S utilisés par le serveur pour traiter les requêtes ; Le nombre de threads doit être au moins égal au nombre de disques durs. | | thread.background.threads | 4 | Le nombre de threads utilisés pour le traitement en arrière-plan, comme la suppression de fichiers ; Vous n’avez pas besoin de changer cette propriété. | | queued.max.demandes | 500 | Le nombre maximal de requêtes pouvant être mises en file d’attente pour qu’un thread d’E/S ne soit traité avant qu’un thread réseau cesse de lire les nouvelles requêtes. | | host.name | zéro | nom d’hôte du courtier ; Si le nom d’hôte est déjà défini, le courtier ne sera lié qu’à cette adresse ; S’il n’y a pas de réglage, il se liera à toutes les interfaces et publiera une copie sur le ZK | | advertised.host.name | zéro | S’il est fixé, il est envoyé aux producteurs, consommateurs et autres courtiers comme nom d’hôte du courtier | | publicited.port | zéro | Ce port est attribué aux producteurs, consommateurs et autres courtiers, et sert à établir une connexion ; Il n’est nécessaire de le définir que si le port réel et celui que le serveur doit lier sont différents. | | socket.envoi.buffer.octets | 100 * 1024 | SO_SNDBUFF Taille du cache, que le serveur utilise pour établir des connexions socket | | socket.receive.buffer.bytes | 100 * 1024 | SO_RCVBUFF taille du cache, utilisée par le serveur pour se connecter aux sockets | | socket.request.max.octets | 100 * 1024 * 1024 | La taille maximale de requête autorisée par le serveur ; Cela évitera les débordements de serveurs, qui devraient être plus petits que la taille du tas Java | | num.partitions | 1 | Si le nombre de partitions n’est pas donné lors de la création d’un sujet, ce nombre sera le nombre par défaut de partitions sous le sujet. | | log.segment.bytes | 1014*1024*1024 | Les journaux de la partition du sujet sont stockés dans de nombreux fichiers d’un certain répertoire, qui divisent les journaux de la partition en segments ; Cet attribut correspond à la taille maximale de chaque fichier ; Lorsque les dimensions atteignent cette valeur, un nouveau fichier est créé. Ce cadre peut être contourné par la base de chaque sujet. Vue La connexion hyperlientérée est visible. | | log.roll.hours | 24 * 7 | Même si le fichier n’atteint pas log.segment.bytes, un nouveau fichier est créé chaque fois que le moment de création atteint cette propriété. Ce paramètre peut également être remplacé par des réglages au niveau thématique ; VueLa connexion hyperlientérée est visible. | | log.cleanup.policy | supprimer | | | log.retention.minutes et log.retention.hours | 7 jours | Le moment où chaque fichier journal a été sauvegardé avant d’être supprimé. Le temps de sauvegarde par défaut des données est le même pour tous les sujets. log.retention.minutes et log.retention.bytes sont tous deux utilisés pour configurer la suppression des fichiers journals, quelle que soit la propriété dépassée. Ce réglage de propriété peut être remplacé lorsque le sujet est essentiellement défini. VueLa connexion hyperlientérée est visible. | | log.retention.bytes | -1 | La quantité totale de données enregistrées par chaque partition sous chaque sujet ; Notez que c’est la limite supérieure par partition, donc ce nombre multiplié par le nombre de partitions correspond à la quantité totale de données stockées par sujet. Notez aussi : si log.retention.hours et log.retention.bytes sont définis, dépasser l’une ou l’autre des limites entraînera la suppression d’un fichier segment. Notez que ce paramètre peut être contourné par chaque sujet. VueLa connexion hyperlientérée est visible. | | log.retention.check.interval.ms | 5 minutes | Vérifiez l’intervalle entre les fichiers segmentés dans les journaux pour déterminer si les attributs du fichier répondent aux exigences de suppression. | | log.cleaner.enable | false | Lorsque cette propriété est réglée sur fausse, elle sera supprimée une fois que le journal est stocké pendant une durée ou une taille maximale. Si elle est réglée sur true, cela se produira lorsque l’attribut de sauvegarde atteint la limite supérieureLa connexion hyperlientérée est visible.。 | | log.cleaner.threads | 1 | Le nombre de threads effectuant la compression logarithmique | | log.cleaner.io.max.octets.per.seconde | Aucun | Le nombre maximal d’E/S qu’un nettoyeur de logs peut avoir lors d’une compactation de logs. Ce réglage limite le nettoyeur afin d’éviter d’interférer avec les services de requête actifs. | | log.cleaner.io.buffer.size | 500*1024*1024 | Log Cleaner indexe les journaux pendant le processus de nettoyage et réduit la taille du cache utilisé. Il est préférable de la régler en grande taille pour offrir une mémoire suffisante. | | log.cleaner.io.buffer.load.factor | 512*1024 | La taille du bloc d’E/S nécessaire pour le nettoyage des billes. Vous n’avez pas besoin de changer ce réglage. | | log.cleaner.io.buffer.load.factor | 0.9 | le facteur de charge de la table de hachage utilisée pour le nettoyage des journaux ; Vous n’avez pas besoin de changer cette option. | | log.cleaner.backoff.ms | 15000 | L’intervalle de temps auquel le journal est nettoyé est effectué | | log.cleaner.min.cleanable.ratio | 0.5 | Cette configuration contrôle la fréquence à laquelle le compacteur de grumes tente de nettoyer les billots (supposéLa connexion hyperlientérée est visible.est ouvert). Par défaut, évitez de nettoyer plus de 50 % des bilmes. Ce rapport est lié à l’espace maximal occupé par le journal de sauvegarde (50 % des journaux sont compressés à 50 %). Un taux plus élevé signifie moins de gaspillage et plus d’espace qui peut être dégagé plus efficacement. Ce paramètre peut être remplacé dans chaque cadre. VueLa connexion hyperlientérée est visible.。 | | log.cleaner.delete.retention.ms | 1 jour | le temps de stockage ; Le temps maximal pour conserver les journaux compressés ; C’est aussi le temps maximal pour le client de consommer les messages, et la différence entre log.retention.minutes est que l’un contrôle les données non compressées et l’autre les données compressées, qui seront écrasées par le temps spécifié lors de la création du sujet. | | log.index.size.max.octets | 10*1024*1024 | La taille maximale par segment de log. Notez que si la taille logarithmique atteint cette valeur, un nouveau segment logarthmique doit être généré même si la taille ne dépasse pas la limite log.segment.octets. | | log.index.interval.bytes | 4096 | Lors d’une récupération, il faut scanner le décalage le plus proche avec un certain espace ; plus le réglage est grand, mieux c’est, utilisez généralement la valeur par défaut | | log.flush.interval.messages | Long.MaxValue | le fichier journal « se synchronise » sur le disque avant d’accumuler les messages. Comme les opérations d’E/S sur disque sont lentes, mais aussi un moyen nécessaire de « fiabilité des données », vérifiez si l’intervalle de temps nécessaire pour la guérison sur le disque dur est nécessaire. Si cette valeur est trop grande, elle entraînera un temps de « synchronisation » trop long (blocage d’E/S), si cette valeur est trop petite, cela entraînera une longue période de « fsync » (blocage d’E/S), si cette valeur est trop petite, cela entraînera un grand nombre de temps de « synchronisation », ce qui signifie que la requête client globale subit un certain délai, et la défaillance du serveur physique entraînera la perte de messages sans fsync. | | log.flush.scheduler.interval.ms | Long.MaxValue | Vérifiez si des intervalles fsync sont nécessaires | | log.flush.interval.ms | Long.MaxValue | Ce nombre sert à contrôler l’intervalle de temps du « fsync », si le nombre de messages n’atteint jamais le nombre de messages solidifiés sur le disque, mais que l’intervalle de temps à partir de la dernière synchronisation du disque atteint le seuil, la synchronisation du disque sera également déclenchée. | | log.delete.delay.ms | 60000 | Le temps de conservation après que le fichier a été effacé dans l’index n’a généralement pas besoin d’être modifié | | auto.create.topics.enable | true | S’il faut autoriser la création automatique de sujets. Si c’est vrai, cela créera automatiquement un sujet qui n’existe pas lorsque produire ou récupérer n’existe pas. Sinon, il faut utiliser la ligne de commande pour créer un sujet | | controller.socket.timeout.ms | 30000 | Le temps d’expiration du socket lorsque le contrôleur de gestion de partitions effectue une sauvegarde. | | contrôleur.message.queue.taille | Int.MaxValue | contrôleur vers-changement-de-courtier | | par défaut.réplication.factor | 1 | Le nombre par défaut de copies de sauvegarde ne concerne que les sujets créés automatiquement | | replica.lag.time.max.ms | 10000 | Si un suiveur n’envoie pas de demande de récupération dans ce délai, le leader retirera à nouveau le suiveur de l’ISR et considérera que le suiveur est suspendu | | replica.lag.max.messages | 4000 | Si une réplique a plus de ce nombre de non appuyés, le leader retirera le suiveur et considérera le suiveur comme suspendu | | replica.socket.timeout.ms | 30*1000 | Temps d’expiration du leader pour les requêtes réseau socket lors de la sauvegarde des données | | réplique.socket.receive.buffer.bytes | 64*1024 | Tampon de réception de socket lors de l’envoi d’une requête réseau au leader lors de la sauvegarde | | replica.fetch.max.octets | 1024*1024 | La valeur maximale de chaque récupération au moment de la sauvegarde | | réplique.fetch.min.bytes | 500 | Le temps d’attente maximal pour que les données atteignent le leader lorsque ce dernier effectue une demande de sauvegarde | | réplique.fetch.min.bytes | 1 | La taille la plus petite de la réponse après chaque récupération lors de la sauvegarde | | num.répliques.fetchers | 1 | Le nombre de threads qui sauvegardent les données du leader | | replica.high.watermark.checkpoint.interval.ms | 5000 | Chaque réplique vérifie à quelle fréquence le niveau d’eau le plus élevé est durci | | fetch.purgatory.purge.interval.requests | 1000 | demande de récupération de l’intervalle de purge | | producer.purgatory.purge.interval.requests | 1000 | le producteur demande un intervalle de purge | | zookeeper.session.timeout.ms | 6000 | Temps mort pour la session de soigneur de zoo. | | zookeeper.connection.timeout.ms | 6000 | Le temps maximum que le client attend avant d’établir un lien avec le soigneur du zoo | | zookeeper.sync.time.ms | 2000 | Le suiveur ZK est très longtemps derrière le leader ZK | | contrôlled.shutdown.enable | true | Est-il possible de contrôler la fermeture du courtier. Si possible, le courtier pourra transférer tous les dirigeants vers d’autres courtiers avant la clôture. Cela réduit l’indisponibilité pendant le processus d’arrêt. | | controlled.shutdown.max.réessais | 3 | Le nombre de commandes pouvant exécuter un arrêt avec succès avant d’effectuer un arrêt incomplet. | | controlled.shutdown.retry.backoff.ms | 5000 | Temps de recul entre les arrêts | | auto.leader.rebalance.enable | true | Si cela est vrai, le contrôleur équilibrera automatiquement le leadership des courtiers sur les partitions | | leader.déséquilibre.per.court.pourcentage | 10 | Le ratio d’instabilité maximale autorisé par chaque courtier | | leader.déséquilibre.vérifier.intervalle.secondes | 300 | Vérifiez la fréquence du déséquilibre du leader | | offset.metadata.max.octets | 4096 | Permet aux clients d’enregistrer le maximum de leurs décalages | | max.connections.per.ip | Int.MaxValue | Le nombre maximal de connexions par courtier peut être établi vers chaque adresse IP | | max.connections.per.ip.overrides | | La couverture maximale de la connexion par défaut par IP ou nom d’hôte | | connections.max.idle.ms | 600000 | Limite d’expiration pour les connexions vides | | log.roll.jitter. {ms,hours} | 0 | Le nombre maximal de gigues abstraites de logRollTimeMillis | | num.recovery.threads.per.data.dir | 1 | Le nombre de threads que chaque répertoire de données utilise pour enregistrer la récupération | | Imprévisible.Leader.Élection.Permettre | true | Indique s’il est possible d’utiliser le paramètre non-répliques dans ISR comme leader | | delete.topic.enable | false | Possibilité de supprimer des sujets | | décalages.topic.num.partitions | 50 | Le nombre de partitions pour le sujet de commit décalé. Puisque changer cela après déploiement n’est actuellement pas pris en charge, nous recommandons d’utiliser un réglage plus élevé pour la production (par exemple, 100-200). | | offsets.topic.retention.minutes | 1440 | Les décalages existant depuis plus longtemps seront marqués comme en attente de suppression | | offsets.retention.check.interval.ms | 600000 | Le gestionnaire de décalage vérifie la fréquence des décalages obsols | | décalages.topic.replication.factor | 3 | Le nombre de copies de secours du décalage du sujet. Il est recommandé de fixer des chiffres plus élevés pour garantir une meilleure disponibilité | | offset.topic.segment.bytes | 104857600 | Décale de sujet. | | offsets.load.buffer.size | 5242880 | Ce réglage est lié à la taille du lot et est utilisé lors de la lecture du segment de décalage. | | offsets.commit.required.acks | -1 | Le nombre de confirmations doit être défini avant que le commit de décalage soit acceptable, et en général il n’est pas nécessaire d’être modifié |
| Propriété | Faire défaut | Propriété par défaut du serveur | Description | | cleanup.policy | supprimer | log.cleanup.policy | Soit « supprimer » soit « compacter » ; Cette chaîne indique comment utiliser l’ancienne partie du journal ; La méthode par défaut (« supprimer ») éliminera l’ancienne pièce lorsque leur limite de temps ou de taille de recyclage est atteinte. « compact » comprime les grumes | | delete.retention.ms | 86400000 (24 heures) | log.cleaner.delete.retention.ms | La différence entre log.retention.minutes est que l’un contrôle les données non compressées et l’autre les données compressées. Cette configuration peut être annulée par les paramètres d’épinglage lors de la création du sujet | | flush.messages | Aucun | log.flush.interval.messages | Cette configuration spécifie un intervalle de temps pour forcer les journaux fsync. Par exemple, si cette option est réglée à 1, alors le fsync est requis après chaque message, et si le message est réglé à 5, le fsync est requis pour chaque 5 messages. En général, il est recommandé de ne pas fixer cette valeur. Le réglage de ce paramètre nécessite le compromis nécessaire entre « fiabilité des données » et « performance ». Si cette valeur est trop grande, cela entraînera un long délai de « fsync » à chaque fois (blocage d’E/S), et si cette valeur est trop petite, cela entraînera un grand nombre de « fsync », ce qui signifie aussi un certain délai dans la requête globale du client. Une défaillance physique du serveur entraînera la perte des messages sans fsync. | | flush.ms | Aucun | log.flush.interval.ms | Cette configuration sert à fixer l’intervalle de temps entre le forcing des logs fsync sur le disque ; Par exemple, si elle est réglée à 1000, alors le fsync est nécessaire tous les 1000 ms. Cette option n’est généralement pas recommandée | | index.interval.bytes | 4096 | log.index.interval.bytes | Le réglage par défaut garantit que nous ajoutons un index au message tous les 4096 octets, et plus d’index rendent le message lu plus proche, mais la taille de l’index sera augmentée. Cette option n’est généralement pas obligatoire | | max.message.bytes | 1000000 | max.message.bytes | La taille maximale du message kafka append. Notez que si vous augmentez cette taille, vous devez aussi augmenter la taille de récupération de votre consommateur afin que celui-ci puisse récupérer les messages à ces tailles maximales. | | Ratio min.nettoyable.sale. | 0.5 | Ratio min.nettoyable.sale. | Cette configuration contrôle la fréquence à laquelle le compresseur de logs tente de purger les logs. Par défaut, les journaux avec un taux de compression supérieur à 50 % seront évités. Ce ratio évite le plus grand gaspillage d’espace | | min.insync.replicas | 1 | min.insync.replicas | Lorsque le producteur est réglé sur request.required.acks à -1, min.insync.replicas spécifie le nombre minimum de réplicas (chaque écriture de repica doit être réussie), et si ce nombre n’est pas atteint, le producteur produira une exception. | | retention.bytes | Aucun | log.retention.bytes | Si vous utilisez la politique de « suppression » de rétention, cette configuration fait référence à la taille maximale que le journal peut atteindre avant d’être supprimé. Par défaut, il n’y a pas de limite de taille mais seulement une limite de temps | | retention.ms | 7 jours | log.retention.minutes | Si vous utilisez la politique de « suppression » de rétention, cette configuration fait référence au moment où le journal a été sauvegardé avant le journal de suppression. | | segment.octets | 1 Go | log.segment.bytes | Dans Kafka, les logaritmes sont stockés en blocs, et cette configuration fait référence à la taille des logaritmtes divisés en blocs | | segment.index.bytes | 10 Mo | log.index.size.max.octets | Cette configuration correspond à la taille du fichier index mappé entre les décalages et les emplacements des fichiers ; Cette configuration n’a généralement pas besoin d’être modifiée | | segment.ms | 7 jours | log.roll.hours | Même si le fichier de bloc de journal n’atteint pas la taille nécessaire pour être supprimé ou compressé, une fois que le temps de journal atteint cette limite supérieure, un nouveau fichier de bloc de journal sera forcé | | segment.jitter.ms | 0 | log.roll.jitter. {ms,hours} | Le jitter maximal à soustraire de logRollTimeMillis. |
Configurations grand public
| Propriété | Faire défaut | Description | | group.id | | Une chaîne qui identifie de manière unique le groupe dans lequel se trouve le processus consommateur, et si le même identifiant de groupe est défini, cela signifie que ces processus appartiennent au même groupe consommateur | | zookeeper.connect | | Spécifiez la chaîne de la connexion Zookeeper, le format est hostname :port, ici l’hôte et le port sont à la fois l’hôte et le port du serveur Zookeeper, afin d’éviter de perdre le contact après la panne d’une machine Zookeeper, vous pouvez spécifier plusieurs hostname :ports, en utilisant des virgules comme séparation : Nomdehôte1 :port1,Nomdhôte2 :Port2,NomHôte3 :Port3 :Port3 Vous pouvez ajouter le chemin chroot de ZooKeeper à la chaîne de connexion ZooKeeper, qui sert à stocker ses propres données, d’une manière : NomHôte1 :Port1,NomHôte2 :Port2,NomHôte3 :Port3/chroot/path | | consumer.id | zéro | Aucune configuration n’est requise et est généralement générée automatiquement | | socket.timeout.ms | 30*100 | Limites de délai pour les requêtes réseau. La vraie limite de temps d’expiration est max.fetch.wait+socket.timeout.ms | | socket.receive.buffer.bytes | 64*1024 | Socket est utilisé pour recevoir la taille du cache des requêtes réseau | | fetch.message.max.octets | 1024*1024 | Le nombre maximal d’octets par message de récupération et requête de récupération. Ces octets seront supervisés dans la mémoire utilisée pour chaque partition, ce qui permettra de contrôler la quantité de mémoire utilisée par le consommateur. La taille de la requête de récupération doit être au moins égale à la taille maximale autorisée par le serveur, sinon la taille du message que le producteur peut envoyer est supérieure à celle que le consommateur peut consommer. | | num.consumer.fetchers | 1 | Le nombre de threads de récupération utilisés pour les données de récupération | | auto.commit.enable | true | Si c’est vrai, le décalage du message récupéré par le consommateur sera automatiquement synchronisé avec le gardien. Ce décalage de commit sera utilisé par le nouveau consommateur lorsque le processus se bloque | | auto.commit.interval.ms | 60*1000 | La fréquence à laquelle le consommateur soumet un décalage au gardien est en quelques secondes | | queued.max.message.chunks | 2 | Le nombre maximal de messages utilisés pour la consommation en cache. Chaque morceau doit être identique à fetch.message.max.octets | | rebalance.max.réessais | 4 | Lorsqu’un nouveau consommateur est ajouté à un groupe de consommateurs, le groupe tente de rééquilibrer le nombre de partitions allouées à chaque consommateur. Si la collecte des consommateurs change, ce rééquilibrage échoue et se réentrifie lorsque l’allocation est en cours d’exécution | | fetch.min.bytes | 1 | Le nombre minimum d’octets que le serveur doit retourner à chaque requête de récupération. Si pas assez de données sont retournées, la requête attend qu’assez de données soient retournées. | | fetch.wait.max.ms | 100 | S’il n’y a pas assez de données pour satisfaire fetch.min.bytes, cette configuration fait référence au temps maximum que le serveur bloquera avant de répondre à une requête de récupération. | | rebalance.backoff.ms | 2000 | Temps de retrait avant de recommencer à réorganiser | | refresh.leader.backoff.ms | 200 | Il reste un délai d’attente avant de tenter de déterminer si le dirigeant d’une partition a perdu sa direction | | auto.offset.reset | le plus grand | S’il n’y a pas de décalage initialisé dans Zookeeper, si le décalage est une réponse à la valeur suivante : le plus petit : réinitialisation automatique du décalage vers le plus petit décalage le plus grand : Décalage automatique au décalage du plus grand Tout le reste : fait une exception au consommateur | | consumer.timeout.ms | -1 | Si aucun message n’est disponible, même après avoir attendu un certain temps, une exception de délai d’attente est lancée | | exclure.sujets_internes. | true | Savoir si il faut exposer les messages issus de sujets internes aux consommateurs | | paritition.assignment.strategy | Portée | Sélectionnez la politique d’attribution des partitions au flux consommateur, optionnellement la plage, roundrobin | | client.id | Valeur de l’identifiant de groupe | est une chaîne spécifique à l’utilisateur qui aide à suivre les appels dans chaque requête. Il devrait logiquement confirmer l’application qui a généré la requête | | zookeeper.session.timeout.ms | 6000 | Limites de temps d’arrêt pour les séances avec les gardiens de zoo. Si le consommateur n’envoie pas de message de battement de cœur au gardien pendant ce temps, il est considéré comme raccroché et un signalement sera généré | | zookeeper.connection.timeout.ms | 6000 | Le temps d’attente maximal pour qu’un client établisse une connexion avec un gardien de zoo | | zookeeper.sync.time.ms | 2000 | Les suiveurs ZK peuvent rester en retard par rapport au leader ZK pendant un temps maximum | | offsets.storage | Gardien de zoo | Lieux utilisés pour stocker les compensations : zookeeper ou kafka | | offset.channel.backoff.ms | 1000 | Le temps de remise en retrait pour se reconnecter au canal des décalages ou réessayer la requête de récupération/commit de l’offset échoué | | offsets.channel.socket.timeout.ms | 10000 | La limite de temps d’attente du socket pour la réponse à la requête de récupération/commit lors de la lecture du décalage. Cette limite de délai est utilisée par la requête consumerMetadata pour demander la gestion des offset | | offsets.commit.max.réessais | 5 | Le nombre de fois où le commit offset a été retenté. Cette réévaluation est appliquée uniquement pour compenser les commits entre deux arrêts. Lui | | dual.commit.enabled | true | Si vous utilisez « kafka » comme offsets.storage, vous pouvez valider le décalage sur zookeeper deux fois (et une fois sur kafka). C’est indispensable lors de la migration du stockage offset basé sur le soigneur de zoo vers le stockage offset basé sur le kafka. Pour tout groupe de consommateurs, il est conseillé sans risque de désactiver cette option une fois la migration terminée | | partition.assignation.stratégie | Portée | Choisir entre les politiques « plage » et « roundrobin » comme politique pour assigner les partitions aux flux de données grand public ; L’alloqueur de partitions circulaires alloue toutes les partitions disponibles ainsi que tous les threads consommateurs disponibles. Il assignera la boucle de partition au thread consommateur. Si toutes les instances consommateurs sont souscrites à un déterminé, les partitions sont divisées en distributions déterministes. La stratégie d’allocation round-robin n’est possible que si les conditions suivantes sont remplies : (1) Chaque sujet dispose du même nombre de flux de données par force du consommateur. (2) La collection des sujets abonnés est déterminée pour chaque instance de consommateur du groupe de consommateurs. |
Configurations du producteur
| Propriété | Faire défaut | Description | | metadata.broker.list | | Servir en auto-attaque. Producer est uniquement utilisé pour obtenir des métadonnées (sujets, partitions, répliques). La connexion socket pour envoyer les données réelles sera établie en fonction des métadonnées retournées. Le format est le suivant : hôte1 :port1,hôte2 :port2 Cette liste peut être une sous-liste de courtiers ou un VIP désignant des courtiers | | request.required.acks | 0 | Cette configuration est une valeur d’accusé de réception qui indique quand une demande de production est considérée comme terminée. En particulier, combien d’autres courtiers ont dû soumettre des données à leurs journaux et confirmer ces informations à leur responsable. Les valeurs typiques incluent : 0 : Indique que le producteur n’attend jamais la confirmation du courtier (même comportement que le 0,7). Cette option offre la latence la plus faible mais en même temps le plus grand risque (car les données sont perdues lorsque le serveur tombe en panne). 1 : Indique que la réplique du leader a reçu la confirmation des données. Cette option a une faible latence et garantit que le serveur confirme qu’elle est reçue. -1 : Le producteur reçoit la confirmation que toutes les répliques synchronisées ont reçu des données. La latence est la plus importante, cependant, cette méthode n’élimine pas complètement le risque de perte de messages, car le nombre de répliques synchronisées peut être 1. Si vous souhaitez vous assurer que certaines répliques reçoivent des données, vous devez alors définir l’option min.insync.replicas dans les paramètres au niveau du sujet. Lisez la documentation de conception pour une discussion plus approfondie. | | request.timeout.ms | 10000 | Le courtier fait de son mieux pour mettre en œuvre l’exigence request.required.acks, sinon une erreur sera envoyée au client | | producteur.type | synchronisation | Cette option identifie si le message est envoyé de façon asynchrone dans un thread en arrière-plan. Valeurs correctes : (1) asynchrone : envoi asynchrone (2) synchronisation : envoi synchronisé En mettant le producteur en asynchrone, nous pouvons traiter les requêtes par lots (ce qui est bénéfique pour un débit plus élevé), mais cela crée aussi la possibilité que la machine cliente perde des données non envoyées | | serializer.class | kafka.serializer.DefaultEncoder | La catégorie de sérialisation du message. L’encodeur par défaut entre un octet[] et retourne le même octet[] | | key.serializer.class | | Classe de sérialisation pour mots-clés. Si cela n’est pas donné, le défaut est de correspondre au message | | partitioner.class | kafka.producer.DefaultPartitioner | Partitioner pour diviser les messages entre sous-thèmes. Le partitionneur par défaut est basé sur la table de hachage de la clé | | compression.codec | aucun | Ce paramètre peut définir le codec pour compresser les données, qui peut être sélectionné comme « aucun(ne), « gzip », « snappy ». | | sujets.compressés | zéro | Ce paramètre peut être utilisé pour déterminer si certains sujets sont compressés. Si le codec compressé est un codec autre que NoCompressCodec, ces codecs sont appliqués aux données des sujets spécifiés. Si la liste des sujets compressés est vide, appliquez le codec compressé spécifique à tous les sujets. Si le codec compressé est NoCompressionCodec, la compression n’est pas disponible pour tous les sujets. | | message.send.max.retentatives | 3 | Ce paramètre fera que le producteur réessaiera automatiquement les requêtes d’envoi échouées. Ce paramètre fixe le nombre de tentatives. Note : Définir une valeur non nulle provoquera la répétition de certaines erreurs réseau : provoquer un envoi et une perte d’accusé de réception | | retry.backoff.ms | 100 | Avant chaque réessayage, le producteur met à jour les métadonnées du sujet concerné pour voir si le nouveau leader est assigné. Comme la sélection du leader prend un peu de temps, cette option précise combien de temps le producteur doit attendre avant de mettre à jour les métadonnées. | | topic.metadata.refresh.interval.ms | 600*1000 | Le producteur met généralement à jour les métadonnées du sujet dans certains cas de défaillance (partition manquante, leader indisponible, etc.). Il suivra un cycle régulier. Si vous le mettez à une valeur négative, les métadonnées ne seront mises à jour que si cela échoue. Si elle est réglée à 0, les métadonnées sont mises à jour après chaque envoi de message (cette option n’est pas recommandée, le système en consomme trop). Important : Les mises à jour n’ont lieu qu’après l’envoi du message, donc si le producteur n’envoie jamais le message, les métadonnées ne sont jamais mises à jour. | | queue.buffering.max.ms | 5000 | L’intervalle de temps maximal auquel l’utilisateur met en cache les données lorsque le mode asynchrone est appliqué. Par exemple, si le message est réglé à 100, les messages dans un délai de 100 ms seront traités par lots. Cela améliorera le débit, mais augmentera la latence grâce au cache. | | queue.buffering.max.messages | 10000 | En mode asynchrone, le nombre maximal de messages non envoyés pouvant être mis en cache dans la file avant le producteur doit être bloqué ou les données doivent être perdues | | batch.num.messages | 200 | En utilisant le mode asynchrone, vous pouvez traiter en lots le nombre maximal de messages. Ou le nombre de messages est arrivé en ligne ou queue.buffer.max.ms est arrivé, et le producteur le traitera | | envoyer.buffer.bytes | 100*1024 | Taille du cache d’écriture du socket | | client.id | “” | Cet identifiant client est une chaîne spécifique à l’utilisateur qui est incluse dans chaque requête pour suivre l’appel, et il devrait logiquement pouvoir confirmer que l’application a effectué la requête. |
Configurations du producteur
| Nom | Type | Faire défaut | Importance | Description | | boostrap.servers | Liste | | haut | Groupe hôte/port pour établir une connexion avec le cluster kafka. Les données seront chargées uniformément sur tous les serveurs, quel que soit le serveur désigné pour le bootstrapping. Cette liste n’affecte que les hôtes initialisés (qui sert à découvrir tous les serveurs). Voici le format de la liste :
host1:port1,host2:port2,... Puisque ces serveurs ne servent qu’à initialiser des connexions afin de découvrir toutes les appartenances au cluster (qui peuvent changer dynamiquement), cette liste n’a pas besoin de contenir tous les serveurs (vous pouvez vouloir plus d’un serveur, bien que dans ce cas, un serveur puisse être hors service). Si aucun serveur n’apparaît dans cette liste, l’envoi de données échouera tant que la liste n’est pas disponible. | | acks | Corde | 1 | haut | Le producteur a besoin d’un signal du serveur pour accuser réception après avoir reçu les données, et cette configuration fait référence au nombre de ces signaux d’accusé de réception dont le producteur a besoin. Cette configuration représente en réalité la disponibilité des sauvegardes de données. Les réglages suivants sont des options courantes : (1) acks=0 : Fixé à 0 signifie que le producteur n’a pas besoin d’attendre une confirmation de l’information reçue. La réplique sera immédiatement ajoutée au tampon de socket et considérée comme envoyée. Il n’y a aucune garantie que le serveur ait reçu les données avec succès dans ce cas, et la retentative de la configuration ne fonctionnera pas (car le client ne sait pas si elle a échoué) et le décalage de la rétroaction sera toujours fixé à -1. (2) acks=1 : Cela signifie qu’au moins il faut attendre que le leader écrive avec succès les données dans le journal local, mais pas que tous les suiveurs écrivent avec succès. Dans ce cas, si le suiveur ne sauvegarde pas avec succès les données et que le leader raccroche à nouveau, le message sera perdu. (3) acks=tous : Cela signifie que le leader doit attendre que toutes les sauvegardes écrivent avec succès les journaux, et cette stratégie garantit que les données ne seront pas perdues tant qu’une sauvegarde subsiste. C’est la garantie la plus forte. (4) D’autres réglages, comme ack=2, sont également possibles, nécessitant un nombre donné d’acks, mais cette stratégie est généralement rarement utilisée. | | buffer.memory | long | 33554432 | haut | Le producteur peut être utilisé pour mettre en cache la taille mémoire des données. Si les données sont générées plus rapidement qu’elles ne sont envoyées au courtier, le producteur bloquera ou lancera une exception, indiquée par « block.on.buffer.full ».
Ce paramètre sera lié à la mémoire totale que le producteur peut utiliser, mais ce n’est pas une limite stricte, car toute la mémoire utilisée par le producteur n’est pas utilisée pour la mise en cache. Une partie de la mémoire supplémentaire est utilisée pour la compression (si une compression est introduite), et une autre pour les demandes de maintenance. | | compression.type | Corde | aucun | haut | Producer est le type de compression utilisé pour compresser les données. Le défaut est non compressé. Les bonnes valeurs d’option sont aucune, gzip, snappy. La compression est mieux utilisée pour le traitement par lots : plus les messages sont traités par lots, meilleures sont les performances de compression. | | Essais | int | 0 | haut | Définir une valeur supérieure à 0 poussera le client à renvoyer toute donnée une fois que ces données échouent. Notez que ces tentatives ne diffèrent pas de celles lorsque le client reçoit une erreur d’envoi. Permet aux tentatives de changer potentiellement l’ordre des données : si les deux enregistrements de message sont envoyés à la même partition, que le premier message échoue, que le second apparaît avant le premier. | | taille de lot | int | 16384 | Douleur moyenne | Le producteur tentera de regrouper les enregistrements de messages pour réduire le nombre de requêtes. Cela améliorera les performances entre le client et le serveur. Cette configuration contrôle le nombre par défaut d’octets de messages de traitement par lots. Aucune tentative n’est faite pour traiter des octets de messages supérieurs à ce nombre d’octets. Les requêtes envoyées aux courtiers contiendront plusieurs lots, qui contiendront une requête pour chaque partition. Les valeurs de batch plus petites sont moins utilisées et peuvent réduire le débit (0 n’utilisera que le traitement par lots). Les valeurs de lot plus grandes perdent plus d’espace mémoire, donc il faut allouer de la mémoire pour des valeurs de lot spécifiques. | | client.id | Corde | | Douleur moyenne | Cette chaîne est envoyée au serveur lorsqu’une requête est effectuée. L’objectif est de pouvoir retracer la source des requêtes afin de permettre à certaines applications en dehors de la liste IP ou des ports d’envoyer des informations. Cette application peut définir n’importe quelle chaîne car elle n’a aucune fonction autre que l’enregistrement et le suivi | | linger.ms | long | 0 | Douleur moyenne | Le groupe de producteurs agrégera tous les messages arrivant entre la requête et l’envoi, enregistrant un lot distinct de requêtes. En général, cela ne se produit que lorsque l’enregistrement est généré plus rapidement que le débit d’envoi. Cependant, dans certaines conditions, le client voudra réduire le nombre de requêtes, voire une charge modérée. Cette configuration se fait en ajoutant un petit délai – c’est-à-dire qu’au lieu d’envoyer un enregistrement immédiatement, le producteur attendra un délai donné pour permettre l’envoi d’autres enregistrements de message, qui peuvent être regroupés. On peut considérer cela comme un algorithme similaire à TCP Nagle. Ce réglage fixe une limite de latence plus élevée pour le batch : une fois que nous obtenons la taille batch d’une partition, elle l’enverra immédiatement quel que soit ce paramètre, mais si nous recevons un message avec un nombre d’octets beaucoup plus petit que ce paramètre, nous devons « traîner » un temps précis pour recevoir plus de messages. Ce réglage est par défaut à 0, c’est-à-dire pas de délai. Définir linger.ms=5, par exemple, réduira le nombre de requêtes, mais augmentera en même temps le délai de 5 ms. | | max.request.taille | int | 1028576 | Douleur moyenne | Le nombre maximal d’octets demandés. C’est également une couverture efficace pour la taille maximale enregistrée. Note : Le serveur dispose de sa propre dérogation sur la taille des enregistrements de messages, qui diffère de ce paramètre. Ce réglage limite le nombre de demandes que les producteurs peuvent envoyer en masse à la fois afin d’éviter un grand nombre de demandes. | | receive.buffer.bytes | int | 32768 | Douleur moyenne | Taille TCP receivecache, utilisée lors de la lecture de données | | envoyer.buffer.bytes | int | 131072 | Douleur moyenne | Taille du cache d’envoi TCP, utilisée lors de l’envoi de données | | timeout.ms | int | 30000 | Douleur moyenne | Cette option de configuration contrôle le temps maximum d’attente du serveur pour la confirmation des abonnés. Si le nombre de requêtes confirmées n’est pas satisfait dans ce délai, une erreur est renvoyée. Cette limite de délai est mesurée côté serveur et n’a aucune latence réseau, y compris les requêtes | | bloque.on.buffer.full | Booléen | true | Low | Lorsque notre cache mémoire s’épuise, nous devons cesser de recevoir de nouveaux enregistrements de messages ou lancer des erreurs. Par défaut, cela est réglé sur vrai, cependant certains blocages ne valent pas forcément la peine d’être attendus, il vaut donc mieux lancer une erreur immédiatement. C’est le cas lorsqu’il est réglé sur false : le producteur lance une erreur d’exception : BufferExhaustedException si l’enregistrement a été envoyé et que le cache est plein | | metadata.fetch.timeout.ms | long | 60000 | Low | Il fait référence aux données de la première fois de certains éléments que nous avons obtenues. Les éléments incluent : sujet, hôte, partitions. Cette configuration fait référence au temps nécessaire pour que l’élément soit complété avec succès selon la récupération, sinon une exception sera envoyée au client. | | metadata.max.age.ms | long | 300000 | Low | Le temps en microsecondes est l’intervalle auquel nous forçons la mise à jour des métadonnées. Même si nous ne voyons aucun changement de direction de la partition. | | metric.reporters | Liste | [] | Low | Une liste de classes utilisées pour mesurer les métriques. L’implémentation de l’interface MetricReporter permettra d’ajouter des classes qui changent au fur et à mesure de la génération de nouvelles métriques. JmxReporter inclura toujours un moyen d’enregistrer les statistiques JMX | | metrics.num.samples | int | 2 | Low | Le nombre d’échantillons utilisés pour maintenir les métriques | | metrics.sample.window.ms | long | 30000 | Low | Le système Metrics maintient un nombre configurable d’échantillons dans une taille de fenêtre corrigable. Cette configuration configure, par exemple, la taille de la fenêtre. Nous pouvons conserver deux échantillons pendant une période de 30 secondes. Quand une fenêtre est déployée, nous effaceons et réécrivons la fenêtre la plus ancienne | | recoonect.backoff.ms | long | 10 | Low | Quand la connexion tombe, le temps d’attente pour la reconnexion. Cela évite les reconnexions répétées des clients | | retry.backoff.ms | long | 100 | Low | Le temps d’attente avant de tenter de refaire une demande de fruits et légumes échouée. Évitez de rester coincé dans une boucle d’envoi et d’échec.
|
|