Configuraciones de Broker
| Propiedad | Predeterminado | Descripción | | broker.id | | Cada broker puede identificarse con un ID entero único no negativo; Este id puede usarse como el "nombre" del broker, y su existencia permite que el broker migre a otro host/puerto sin confundir a los consumidores. Puedes elegir cualquier número que quieras como identificación, siempre que el ID sea único. | | log.dirs | /tmp/kafka-logs | El camino donde Kafka almacena datos. Este camino no es único, puede ser múltiple, y los caminos solo necesitan separarse por comas; Cada vez que se crea una nueva partición, se elige para hacerlo bajo el camino que contiene el menor número de particiones. | | Puerto | 6667 | El servidor acepta el puerto al que el cliente se conecta | | zookeeper.connect | nulo | El formato de la cadena de conexión de ZooKeeper es: hostname:port, donde hostname y port son el host y port de un nodo en el clúster ZooKeeper, respectivamente. Para conectarse a otros nodos de ZooKeeper cuando un host cae, puedes crear varios hosts de la siguiente manera:
hostname1:port1, hostname2:port2, hostname3:port3.
ZooKeeper permite añadir una ruta "chroot" para almacenar todos los datos kafka del clúster bajo una ruta específica. Cuando varios clústeres Kafka u otras aplicaciones usan el mismo clúster ZooKeeper, puedes usar este método para establecer la ruta de almacenamiento de datos. Esto se puede implementar formateando la cadena de conexión de la siguiente manera: nombre host1:puerto1,nombre host2:puerto2,nombre host3:puerto3/chroot/path Esta configuración almacena todos los datos del clúster kafka bajo la ruta /chroot/path. Ten en cuenta que antes de iniciar el broker, debes crear este camino, y los consumidores deben usar el mismo formato de conexión. | | message.max.bytes | 1000000 | El tamaño máximo de mensajes que puede recibir un servidor. Es importante que las configuraciones del consumidor y el productor respecto a esta propiedad estén sincronizadas, de lo contrario el mensaje publicado por el productor sería demasiado grande para el consumidor. | | num.network.threads | 3 | El número de hilos de red que utiliza el servidor para procesar solicitudes de red; Por lo general, no necesitas cambiar esta propiedad. | | num.io.threads | 8 | El número de hilos de E/S utilizados por el servidor para procesar solicitudes; El número de hilos debería ser al menos igual al número de discos duros. | | antecedentes.hilos | 4 | El número de hilos usados para el procesamiento en segundo plano, como la eliminación de archivos; No necesitas cambiar esta propiedad. | | queued.max.peticiones | 500 | El número máximo de solicitudes que se pueden poner en cola para que un hilo de E/S las procese antes de que un hilo de red deje de leer nuevas solicitudes. | | host.name | nulo | nombre de anfitrión del corredor; Si el nombre de host ya está establecido, el broker solo estará vinculado a esta dirección; Si no hay ninguna configuración, se vinculará a todas las interfaces y publicará una copia en el ZK | | advertised.host.name | nulo | Si está establecido, se envía a productores, consumidores y otros corredores como nombre host del corredor | | publicidad.puerto | nulo | Este puerto se concede a productores, consumidores y otros intermediarios, y se utiliza para establecer una conexión; Solo tiene que configurarse si el puerto real y el puerto que el servidor debe vincular son diferentes. | | socket.send.buffer.bytes | 100 * 1024 | SO_SNDBUFF Tamaño de caché, que utiliza el servidor para hacer conexiones de socket | | socket.recepción.buffer.bytes | 100 * 1024 | SO_RCVBUFF tamaño de caché, que utiliza el servidor para conectarse a los sockets | | socket.request.max.bytes | 100 * 1024 * 1024 | El tamaño máximo de solicitud permitido por el servidor; Esto evitará desbordamientos de servidores, que deberían ser menores que el tamaño del montón Java | | num.particiones | 1 | Si no se indica el número de particiones al crear un tema, este número será el número por defecto de particiones bajo el tema. | | log.segment.bytes | 1014*1024*1024 | Los registros de la partición tema se almacenan en muchos archivos de un determinado directorio, que dividen los registros de la partición en segmentos; Este atributo es el tamaño máximo de cada archivo; Cuando las dimensiones alcanzan este valor, se crea un nuevo archivo. Esta configuración puede ser anulada por la base de cada tema. Vista El inicio de sesión del hipervínculo es visible. | | log.roll.hours | 24 * 7 | Incluso si el archivo no llega a log.segment.bytes, se crea un nuevo archivo cada vez que el momento de creación del archivo alcanza esta propiedad. Esta configuración también puede ser anulada por la configuración a nivel temático; VistaEl inicio de sesión del hipervínculo es visible. | | log.cleanup.policy | eliminar | | | log.retention.minutes y log.retention.hours | 7 días | El tiempo que se guardó cada archivo de registro antes de ser eliminado. El tiempo predeterminado de ahorro de datos es el mismo para todos los temas. log.retention.minutes y log.retention.bytes se utilizan ambos para configurar la eliminación de archivos de registro, independientemente de qué propiedad haya sido desbordada. Esta configuración de propiedad puede ser anulada cuando el tema está prácticamente establecido. VistaEl inicio de sesión del hipervínculo es visible. | | log.retention.bytes | -1 | La cantidad total de datos almacenados por cada partición bajo cada tema; Ten en cuenta que este es el límite superior por partición, así que este número multiplicado por el número de particiones es la cantidad total de datos almacenados por tema. También ten en cuenta: Si se activan tanto log.retention.hours como log.retention.bytes, superar cualquiera de los límites hará que se elimine un archivo de segmento. Ten en cuenta que esta configuración puede ser anulada por cada tema. VistaEl inicio de sesión del hipervínculo es visible. | | log.retention.check.interval.ms | 5 minutos | Comprueba el intervalo entre archivos segmentados en registro para determinar si los atributos cumplen los requisitos de eliminación. | | log.cleaner.enable | false | Cuando esta propiedad se pone falsa, se eliminará una vez que el registro se almacene durante un tiempo o tamaño máximo. Si se pone en verdadero, ocurrirá cuando el atributo de guardado alcance el límite superiorEl inicio de sesión del hipervínculo es visible.。 | | log.cleaner.threads | 1 | El número de hilos que realizan compresión logarítmica | | log.cleaner.io.max.bytes.per.segundo | Ninguno | El número máximo de E/S que puede tener un limpiador de troncos al realizar una compactación de registros. Esta configuración limita al limpiador para evitar interferir con los servicios de solicitud activos. | | log.cleaner.io.buffer.size | 500*1024*1024 | Log Cleaner indexa los registros durante el proceso de limpieza y reduce el tamaño de la caché utilizada. Lo mejor es ajustarlo grande para proporcionar suficiente memoria. | | log.cleaner.io.buffer.load.factor | 512*1024 | El tamaño del trozo de E/S necesario para limpiar troncos. No necesitas cambiar esta configuración. | | log.cleaner.io.buffer.load.factor | 0.9 | factor de carga de la tabla hash utilizada en la limpieza de logs; No necesitas cambiar esta opción. | | log.cleaner.backoff.ms | 15000 | Se realiza el intervalo de tiempo en el que se limpia el registro | | log.cleaner.min.cleanable.ratio | 0.5 | Esta configuración controla con qué frecuencia el compactador de troncos intenta limpiar los troncos (se suponeEl inicio de sesión del hipervínculo es visible.está abierto). Por defecto, evita limpiar más del 50% de los troncos. Esta proporción está ligada al espacio máximo ocupado por el registro de copia de seguridad (el 50% de los registros se comprime al 50%). Una tasa más alta significa menos desperdicio y más espacio que se puede despejar de forma más eficiente. Esta configuración puede ser anulada en cada temática. VistaEl inicio de sesión del hipervínculo es visible.。 | | log.cleaner.delete.retention.ms | 1 día | tiempo de almacenamiento; El tiempo máximo para mantener los registros comprimidos; También es el tiempo máximo para que el cliente consuma mensajes, y la diferencia entre log.retention.minutes es que uno controla los datos sin comprimir y el otro los datos comprimidos, que serán sobrescritos por el tiempo especificado cuando se crea el tema. | | log.index.size.max.bytes | 10*1024*1024 | El tamaño máximo por segmento log. Ten en cuenta que si el tamaño logarítmico alcanza este valor, debe generarse un nuevo segmento logarítmico incluso si el tamaño no supera el límite de log.segment.bytes. | | log.index.interval.bytes | 4096 | Al realizar un fetch, necesitas escanear el desplazamiento más cercano con cierto espacio; cuanto mayor sea el ajuste, mejor, generalmente usa el valor por defecto | | log.flush.intervalo.mensajes | Long.MaxValue | el archivo de registro "sincroniza" con el disco antes de acumular mensajes. Dado que las operaciones de E/S en disco son una operación lenta, pero también un medio necesario de "fiabilidad de datos", comprueba si se requiere el intervalo de tiempo para curar en el disco duro. Si este valor es demasiado grande, provocará un tiempo de "sincronización" demasiado largo (bloqueo de E/S); si este valor es demasiado pequeño, provocará un largo tiempo de "fsync" (bloqueo de E/S); si este valor es demasiado pequeño, provocará un gran número de tiempos de "sincronización", lo que significa que la solicitud global del cliente tiene cierto retardo, y el fallo del servidor físico provocará la pérdida de mensajes sin fsync. | | log.flush.scheduler.interval.ms | Long.MaxValue | Comprueba si se requieren intervalos fsync | | log.flush.interval.ms | Long.MaxValue | Este número se utiliza para controlar el intervalo de tiempo de "fsync"; si el número de mensajes nunca alcanza el número de mensajes fijados en el disco, pero el intervalo de tiempo desde la última sincronización del disco alcanza el umbral, también se activará la sincronización del disco. | | log.delete.delay.ms | 60000 | El tiempo de retención tras la eliminación del archivo en el índice generalmente no necesita modificarse | | auto.create.topics.enable | true | Si permitir la creación automática de temas. Si es cierto, creará automáticamente un tema que no existe cuando produce o fetch no existe. De lo contrario, tienes que usar la línea de comandos para crear un tema | | controller.socket.timeout.ms | 30000 | El tiempo de espera del socket cuando el controlador de gestión de particiones realiza una copia de seguridad. | | controlador.mensaje.queue.tamaño | Int.MaxValue | controlador-a-intermediario-channles | | factor.replicación.por defecto | 1 | El número predeterminado de copias de seguridad se refiere únicamente a temas creados automáticamente | | replica.lag.time.max.ms | 10000 | Si un seguidor no envía una solicitud de recado dentro de este tiempo, el líder volverá a retirar al seguidor del ISR y considerará que está colgado | | replica.lag.max.mensajes | 4000 | Si una réplica tiene más de este número de sin respaldo, el líder retirará al seguidor y considerará que está colgado | | replica.socket.timeout.ms | 30*1000 | Tiempo de espera del líder para las solicitudes de red socket al hacer copias de seguridad de datos | | replica.socket.receive.buffer.bytes | 64*1024 | Búfer de recepción de socket al enviar una solicitud de red al líder durante la copia de seguridad | | replica.fetch.max.bytes | 1024*1024 | El valor máximo de cada obtención en el momento de la copia de seguridad | | replica.fetch.min.bytes | 500 | El tiempo máximo de espera para que los datos lleguen al líder cuando este realiza una solicitud de respaldo | | replica.fetch.min.bytes | 1 | El tamaño mínimo de la respuesta después de cada recolección al retroceder | | num.réplica.buscadores | 1 | El número de hilos que respaldan los datos del líder | | replica.high.watermark.checkpoint.interval.ms | 5000 | Cada réplica comprueba con qué frecuencia se cura el nivel de agua más alto | | fetch.purgatory.purge.interval.requests | 1000 | Solicitud de obtención del intervalo de purga | | producer.purgatory.purge.interval.requests | 1000 | El productor solicita un intervalo de purga | | zookeeper.session.timeout.ms | 6000 | Tiempo muerto para la sesión de cuidador del zoológico. | | zookeeper.connection.timeout.ms | 6000 | El tiempo máximo que el cliente espera para establecer una conexión con el cuidador | | zookeeper.sync.time.ms | 2000 | El seguidor de ZK va por detrás de ZK Leader durante mucho tiempo | | controlled.shutdown.enable | true | Si es posible controlar el cierre del corredor. Si es posible, el corredor podrá trasladar a todos los líderes a otros corredores antes del cierre. Esto reduce la falta de disponibilidad durante el proceso de apagado. | | controlled.shutdown.max.reintentos | 3 | El número de comandos que pueden ejecutar con éxito un apagado antes de realizar un apagado incompleto. | | controlled.shutdown.retry.backoff.ms | 5000 | Tiempo de descanso entre cierres | | auto.leader.rebalance.enable | true | Si esto es cierto, el controlador equilibrará automáticamente el liderazgo de los brokers sobre las particiones | | líder.desequilibrio.per.broker.porcentaje | 10 | La razón máxima de desequilibrio permitida por cada corredor | | líder.desequilibrio.cheque.intervalo.segundos | 300 | Comprueba la frecuencia del desequilibrio del líder | | offset.metadata.max.bytes | 4096 | Permite a los clientes guardar el máximo número de sus desplazamientos | | max.connections.per.ip | Int.MaxValue | Se puede hacer el número máximo de conexiones por intermediario a cada dirección IP | | max.connections.per.ip.overrides | | La cobertura máxima de la conexión predeterminada por IP o nombre de host | | connections.max.idle.ms | 600000 | Límite de tiempo de espera para conexiones vacías | | log.roll.jitter. {ms,hours} | 0 | El número máximo de jitters abstraídos de logRollTimeMillis | | num.recovery.threads.per.data.dir | 1 | El número de hilos que utiliza cada directorio de datos para registrar la recuperación | | unclean.leader.elección.habilitar | true | Indica si es posible usar la configuración de no réplicas en ISR como líder | | delete.topic.enable | false | Poder eliminar temas | | desplazamientos.topic.num.particiones | 50 | El número de particiones para el tema de commit de desplazamiento. Como cambiar esto tras el despliegue actualmente no está soportado, recomendamos usar una configuración más alta para producción (por ejemplo, 100-200). | | offsets.topic.retention.minutes | 1440 | Los desplazamientos que hayan existido más tiempo que este límite de tiempo se marcarán como eliminación pendiente | | offsets.retention.check.interval.ms | 600000 | El gestor de desplazamientos comprueba la frecuencia de los desplazamientos obsoletos | | desplazamientos.tópico.replicación.factor | 3 | El número de copias de respaldo del offset del tema. Se recomienda establecer números más altos para garantizar una mayor disponibilidad | | offset.topic.segment.bytes | 104857600 | Desajusta el tema. | | desplazamientos.carga.buffer.tamaño | 5242880 | Esta configuración está relacionada con el tamaño del lote y se utiliza al leer desde el segmento de desplazamientos. | | offsets.commit.required.acks | -1 | El número de confirmaciones debe establecerse antes de que el commit de offset sea aceptable y, en general, no necesita cambiarse |
| Propiedad | Predeterminado | Propiedad predeterminada del servidor | Descripción | | cleanup.policy | eliminar | log.cleanup.policy | O bien "eliminar" o "compactar"; Esta cadena indica cómo utilizar la parte antigua del log; El método por defecto ("eliminar") descartará la pieza antigua cuando se alcance el límite de tiempo o tamaño de reciclaje. "compactar" comprimirá los troncos | | delete.retention.ms | 86400000 (24 horas) | log.cleaner.delete.retention.ms | La diferencia entre log.retention.minutes es que uno controla los datos sin comprimir y el otro los datos comprimidos. Esta configuración puede ser anulada por los parámetros de fijación cuando se crea el tema | | flush.messages | Ninguno | log.flush.intervalo.mensajes | Esta configuración especifica un intervalo de tiempo para forzar los registros fsync. Por ejemplo, si esta opción está en 1, se requiere fsync después de cada mensaje, y si está en 5, se requiere fsync por cada 5 mensajes. En general, se recomienda no establecer este valor. La configuración de este parámetro requiere el compromiso necesario entre "fiabilidad de datos" y "rendimiento". Si este valor es demasiado grande, provocará un largo tiempo de "fsync" cada vez (bloqueo de entrada/salida), y si este valor es demasiado pequeño, provocará un gran número de "fsync", lo que también significa que hay cierto retraso en la solicitud general del cliente. Un fallo físico del servidor hará que los mensajes se pierdan sin fsync. | | flush.ms | Ninguno | log.flush.interval.ms | Esta configuración se utiliza para fijar el intervalo de tiempo entre forzar los registros fsync al disco; Por ejemplo, si está en 1000, entonces se requiere fsync cada 1000ms. Esta opción generalmente no se recomienda | | index.interval.bytes | 4096 | log.index.interval.bytes | La configuración predeterminada garantiza que añadamos un índice al mensaje cada 4096 bytes, y más índices acercan el mensaje de lectura, pero el tamaño del índice aumentará. Esta opción generalmente no es obligatoria | | max.message.bytes | 1000000 | max.message.bytes | El tamaño máximo del mensaje de añadir kafka. Ten en cuenta que si aumentas este tamaño, también debes aumentar el tamaño de recogida de tu consumidor para que el consumidor pueda obtener mensajes a esos tamaños máximos. | | min.cleanable.dirty.ratio | 0.5 | min.cleanable.dirty.ratio | Esta configuración controla con qué frecuencia el compresor de logs intenta purgar los logs. Por defecto, se evitarán los registros con una tasa de compresión superior al 50%. Esta proporción evita el mayor desperdicio de espacio | | min.insync.replicas | 1 | min.insync.replicas | Cuando el productor está configurado en request.required.acks a -1, min.insync.replicas especifica el número mínimo de réplicas (cada escritura de repica debe ser exitosa), y si no se alcanza este número, el productor producirá una excepción. | | retention.bytes | Ninguno | log.retention.bytes | Si se utiliza la política de retención de "eliminar", esta configuración se refiere al tamaño máximo que el registro puede alcanzar antes de ser eliminado. Por defecto, no hay límite de tamaño, solo un límite de tiempo | | retention.ms | 7 días | log.retention.minutes | Si usas la política de retención de "eliminar", esta configuración se refiere al momento en que el registro se guardó antes del registro de borrado. | | segment.bytes | 1GB | log.segment.bytes | En Kafka, los logarítmics se almacenan en bloques, y esta configuración se refiere al tamaño de los troncos de logarítmics divididos en fragmentos | | segment.index.bytes | 10MB | log.index.size.max.bytes | Esta configuración es aproximadamente el tamaño del archivo índice que se asigna entre desplazamientos y ubicaciones de archivos; Esta configuración generalmente no necesita modificarse | | segment.ms | 7 días | log.roll.hours | Incluso si el archivo de bloque de registro no alcanza el tamaño necesario para eliminar o comprimirse, una vez que el tiempo de registro alcance este límite superior, se forzará un nuevo archivo de bloque de registro | | segment.jitter.ms | 0 | log.roll.jitter. {ms,hours} | El jitter máximo a restar de logRollTimeMillis. |
Configuraciones de consumo
| Propiedad | Predeterminado | Descripción | | group.id | | Una cadena que identifica de forma única el grupo en el que se encuentra el proceso consumidor, y si se establece el mismo ID de grupo, significa que estos procesos pertenecen al mismo grupo de consumidores | | zookeeper.connect | | Especifica la cadena de la conexión Zookeeper, el formato es hostname:port; aquí el host y el port son tanto el host como el puerto del servidor Zookeeper, para evitar perder el contacto después de que una máquina Zookeeper se cae, puedes especificar varios hostname:ports, usando comas como separación: nombre host1:puerto1,nombre host2:puerto2,nombre host3:puerto3 Puedes añadir la ruta chroot de ZooKeeper a la cadena de conexión de ZooKeeper, que se usa para almacenar sus propios datos, de una manera: nombre host1:puerto1,nombre host2:puerto2,nombre host3:puerto3/chroot/path | | consumer.id | nulo | No se requiere ninguna configuración y generalmente se genera automáticamente | | socket.timeout.ms | 30*100 | Límites de tiempo de espera para solicitudes de red. El límite real de tiempo de espera es max.fetch.wait+socket.timeout.ms | | socket.recepción.buffer.bytes | 64*1024 | Socket se utiliza para recibir el tamaño de caché de las solicitudes de red | | fetch.message.max.bytes | 1024*1024 | El número máximo de bytes por mensaje de obtención por solicitud de obtención. Estos bytes estarán supervisados en la memoria utilizada para cada partición, por lo que esta configuración controlará la cantidad de memoria utilizada por el consumidor. El tamaño de la petición de obtención debe ser al menos igual al tamaño máximo permitido por el servidor, de lo contrario el tamaño del mensaje que el productor puede enviar es mayor que el tamaño que el consumidor puede consumir. | | num.consumidor.buscadores | 1 | El número de hilos de recolección utilizados para datos de obtención | | auto.commit.enable | true | Si es cierto, el desplazamiento del mensaje que busca el consumidor se sincronizará automáticamente con el cuidador. Este offset de compromiso será utilizado por el nuevo consumidor cuando el proceso se cuelgue | | auto.commit.interval.ms | 60*1000 | La frecuencia con la que el consumidor presenta el desfasamiento al cuidador es en segundos | | queued.max.mensaje.chunks | 2 | El número máximo de mensajes usados para almacenar en caché para su consumo. Cada fragmento debe ser igual que fetch.message.max.bytes | | rebalance.max.reintentos | 4 | Cuando se añade un nuevo consumidor a un grupo de consumidores, el conjunto intenta reequilibrar el número de particiones asignadas a cada consumidor. Si cambia la colección de los consumidores, este reequilibrio falla y vuelve a entrar cuando se ejecuta la asignación | | fetch.min.bytes | 1 | El número mínimo de bytes que el servidor debe devolver con cada solicitud de obtención. Si no se devuelven suficientes datos, la solicitud espera hasta que se devuelvan suficientes datos. | | fetch.wait.max.ms | 100 | Si no hay suficientes datos para satisfacer fetch.min.bytes, esta configuración se refiere al tiempo máximo que el servidor bloqueará antes de responder a una solicitud de obtención. | | rebalance.backoff.ms | 2000 | Tiempo de retroceso antes de intentar de nuevo el reblance | | refresh.leader.backoff.ms | 200 | Hay un tiempo de espera antes de intentar determinar si el líder de una partición ha perdido su liderazgo | | auto.offset.reset | Mayor | Si no hay un desplazamiento inicializado en Zookeeper, si el desplazamiento es una respuesta al siguiente valor: el más pequeño: Desplazamiento de restablecimiento automático al desplazamiento menor el mayorest: Desplazamiento de reinicio automático al desplazamiento del mayor Cualquier otra cosa: Hace una excepción al consumidor | | consumer.timeout.ms | -1 | Si no hay mensaje disponible, incluso después de esperar un tiempo determinado, se lanza una excepción de tiempo de espera | | exclude.tópicos.internos | true | Si exponer mensajes de temas internos a los consumidores | | paritition.assignment.strategy | Distribución | Selecciona la política para asignar particiones al flujo de consumidor, opcionalmente rango, roundrobin | | client.id | Valor del ID de grupo | es una cadena específica para el usuario que ayuda a rastrear las llamadas en cada solicitud. Debería confirmar lógicamente la aplicación que generó la solicitud | | zookeeper.session.timeout.ms | 6000 | Límites de tiempo para sesiones con cuidadores. Si el consumidor no envía un mensaje de latido al cuidador durante ese tiempo, se considera que está colgado y se generará un reflujo | | zookeeper.connection.timeout.ms | 6000 | El tiempo máximo de espera para que un cliente establezca una conexión con Zookeeper | | zookeeper.sync.time.ms | 2000 | Los seguidores de ZK pueden quedarse atrás del líder ZK durante un tiempo máximo | | offsets.storage | cuidador de zoológico | Lugares donde se solían almacenar compensaciones: cuidador de zoológico o kafka | | offset.channel.backoff.ms | 1000 | El tiempo de retroceso de reconectar al canal de desplazamientos o de intentar de nuevo la petición de obtención/compromiso del offset fallido | | offsets.channel.socket.timeout.ms | 10000 | El límite de tiempo de espera del socket para la respuesta a la respuesta a la petición de obtención/commit al leer el desplazamiento. Este límite de tiempo de espera es utilizado por la solicitud consumerMetadata para solicitar la gestión de offset | | offsets.commit.max.reintentos | 5 | El número de veces que se volvió a intentar el commit de desplazamiento. Este reintento solo se aplica para deshacer los commits entre apagados. Él | | dual.commit.enabled | true | Si usas "kafka" como offsets.storage, puedes comprometer el offset a zookeeper dos veces (y una vez a kafka). Esto es imprescindible al migrar del almacenamiento offset basado en cuidador zoológico al almacenamiento offset basado en kafka. Para cualquier grupo de consumidores, es recomendable desactivar esta opción una vez completada la migración | | partition.assignment.strategy | Distribución | Elige entre las políticas de "rango" y "roundrobin" como política para asignar particiones a los flujos de datos del consumidor; El asignador de particiones circulares asigna todas las particiones disponibles así como todos los hilos de consumo disponibles. Asignará el bucle de partición al hilo de consumo. Si todas las instancias de consumo están suscritas a un determinado, las particiones se dividen en distribuciones deterministas. La estrategia de asignación round-robin solo es posible si se cumplen las siguientes condiciones: (1) Cada tema tiene el mismo número de flujos de datos por fuerza de usuario. (2) La colección de temas suscritos se determina para cada instancia de consumidor dentro del grupo de consumidores. |
Configuraciones de productores
| Propiedad | Predeterminado | Descripción | | metadata.broker.list | | Sáquense por iniciativa propia. Producer solo se usa para obtener metadatos (temas, particiones, réplicas). La conexión al socket para enviar los datos reales se establecerá en función de los metadatos devueltos. El formato es: host1:puerto1,anfitrión2:puerto2 Esta lista puede ser una sublista de corredores o un VIP que apunta a corredores | | request.required.acks | 0 | Esta configuración es un valor de acuse de recibo que indica cuándo una solicitud de producción se considera completa. En particular, ¿cuántos otros corredores debieron haber enviado datos a sus registros y confirmado esta información a su líder? Los valores típicos incluyen: 0: Indica que el productor nunca espera la confirmación del bróker (mismo comportamiento que en 0,7). Esta opción ofrece la menor latencia pero al mismo tiempo el mayor riesgo (porque se pierden datos cuando el servidor cae). 1: Indica que la réplica líder ha recibido la confirmación de los datos. Esta opción tiene una baja latencia y asegura que el servidor confirme que ha sido recibida. -1: El productor recibe la confirmación de que todas las réplicas sincronizadas han recibido datos. La latencia es la mayor, sin embargo, este método no elimina completamente el riesgo de perder mensajes, ya que el número de réplicas sincronizadas puede ser 1. Si quieres asegurarte de que algunas réplicas reciban datos, deberías configurar la opción min.insync.replicas en la configuración a nivel de tema. Lee la documentación de diseño para una discusión más profunda. | | request.timeout.ms | 10000 | El bróker hace todo lo posible por implementar el requisito request.required.acks, de lo contrario se enviará un error al cliente | | producer.type | Sincronizar | Esta opción fija si el mensaje se envía de forma asincrónica en un hilo en segundo plano. Valores correctos: (1) asíncrono: Envío asíncrono (2) sincronización: Envío sincronizado Al configurar el productor en asincrónico, podemos procesar solicitudes en lotes (lo cual es bueno para un mayor rendimiento), pero esto también crea la posibilidad de que la máquina cliente pierda datos no enviados | | serializer.class | kafka.serializer.DefaultEncoder | La categoría de serialización del mensaje. El codificador por defecto introduce un byte[] y devuelve el mismo byte[] | | key.serializer.class | | Clase de serialización para palabras clave. Si esto no se indica, el valor por defecto es coincidir con el mensaje | | partitioner.class | kafka.producer.DefaultPartitioner | Partitioner para dividir mensajes entre subtemas. El particionador por defecto se basa en la tabla hash de la clave | | compression.codec | ninguno | Este parámetro puede configurar el códec para comprimir datos, que puede seleccionarse como "ninguno", "gzip", "snappy". | | tópicos comprimidos | nulo | Este parámetro puede usarse para determinar si ciertos temas están comprimidos. Si el códec comprimido es otro códec distinto de NoCompressCodec, estos códecs se aplican a los datos de los temas especificados. Si la lista de temas comprimidos está vacía, aplica el códec comprimido específico a todos los temas. Si el códec comprimido es NoCompressionCodec, la compresión no está disponible para todos los temas. | | message.send.max.reintentos | 3 | Este parámetro hará que el productor vuelva a intentar automáticamente las solicitudes de envío fallidas. Este parámetro fija el número de intentos. Nota: Establecer un valor distinto de 0 provocará que se repitan ciertos errores de red: provocará un envío y una pérdida de acuse de recibo | | retry.backoff.ms | 100 | Antes de cada intento, el productor actualiza los metadatos del tema relevante para ver si se asigna el nuevo líder. Como la selección del líder lleva un poco de tiempo, esta opción especifica cuánto tiempo debe esperar el productor antes de actualizar los metadatos. | | topic.metadata.refresh.interval.ms | 600*1000 | El productor generalmente actualiza los metadatos del tema en algunos escenarios de fallo (partición faltante, líder no disponible, etc.). Seguirá un ciclo regular. Si lo pones en un valor negativo, los metadatos solo se actualizarán si falla. Si se configura en 0, los metadatos se actualizan después de enviar cada mensaje (esta opción no se recomienda, el sistema consume demasiado). Importante: Las actualizaciones solo ocurren después de que el mensaje se envía, así que si el productor nunca envía el mensaje, los metadatos nunca se actualizan. | | queue.buffering.max.ms | 5000 | El intervalo máximo de tiempo en el que el usuario almacena en caché los datos cuando se aplica el modo asincrónico. Por ejemplo, si el mensaje está configurado en 100, los mensajes dentro de los 100 ms se procesarán en lotes. Esto mejorará el rendimiento, pero aumentará la latencia debido al almacenamiento en caché. | | queue.buffering.max.mensajes | 10000 | Al usar el modo asincrónico, el número máximo de mensajes no enviados que se pueden almacenar en caché en la cola antes del productor debe ser bloqueado o se deben perder datos | | batch.num.messages | 200 | Al usar el modo asincrónico, puedes procesar en lote el máximo número de mensajes. O el número de mensajes ha llegado a esto online o ha llegado queue.buffer.max.ms, y el productor lo procesará | | enviar.buffer.bytes | 100*1024 | Tamaño de la caché de escritura del socket | | client.id | “” | Este id de cliente es una cadena específica de usuario que se incluye en cada petición para rastrear la llamada, y lógicamente debería poder confirmar que la aplicación hizo la solicitud. |
Configuraciones de productores
| Nombre | Tipo | Predeterminado | Importancia | Descripción | | boostrap.servers | Lista | | Alto | Grupo host/puerto para establecer una conexión con el clúster kafka. Los datos se cargarán de forma uniforme en todos los servidores, independientemente del servidor designado para el bootstrapping. Esta lista solo afecta a los hosts inicializados (que se utilizan para descubrir todos los servidores). Este formato de lista:
host1:port1,host2:port2,... Dado que estos servidores solo se utilizan para inicializar conexiones y descubrir todas las membresías del clúster (que pueden cambiar dinámicamente), esta lista no necesita contener todos los servidores (puede que quieras más de un servidor, aunque en este caso, uno puede estar caído). Si no aparece ningún servidor en esta lista, el envío de datos fallará hasta que la lista esté disponible. | | acks | Cuerda | 1 | Alto | El productor necesita una señal del servidor para confirmar la recepción tras recibir los datos, y esta configuración se refiere a cuántas señales de acuse de recibo necesita el productor. Esta configuración representa en realidad la disponibilidad de copias de seguridad de datos. Las siguientes configuraciones son opciones habituales: (1) acks=0: Poner en 0 significa que el productor no necesita esperar ninguna confirmación de la información recibida. La réplica se añadirá inmediatamente al búfer de socket y se considerará enviada. No hay garantía de que el servidor haya recibido correctamente los datos en este caso, y el reintento de la configuración no funcionará (porque el cliente no sabe si falló) y el desplazamiento de la retroalimentación siempre se establecerá en -1. (2) acks=1: Esto significa que al menos hay que esperar a que el líder escriba con éxito los datos en el registro local, pero no a que todos los seguidores escriban con éxito. En este caso, si el seguidor no hace una copia de seguridad correcta de los datos y el líder cuelga de nuevo, el mensaje se perderá. (3) acks=todos: Esto significa que el líder debe esperar a que todas las copias de seguridad escriban con éxito los registros, y esta estrategia asegurará que no se pierdan datos mientras sobreviva una copia de seguridad. Esta es la garantía más fuerte. (4) También son posibles otros ajustes, como acks=2, que requerirán un número determinado de acks, pero esta estrategia generalmente se usa raramente. | | buffer.memory | largo | 33554432 | Alto | El productor puede usarse para almacenar en caché el tamaño de memoria de los datos. Si los datos se generan más rápido de lo que se envían al broker, el productor bloqueará o lanzará una excepción, indicada por "block.on.buffer.full".
Esta configuración estará relacionada con la memoria total que puede usar el productor, pero no es un límite estricto, ya que no toda la memoria utilizada por el productor se utiliza para almacenamiento en caché. Se utiliza parte de la memoria adicional para la compresión (si se introduce compresión), y parte para solicitudes de mantenimiento. | | compression.type | Cuerda | ninguno | Alto | productor es el tipo de compresión utilizado para comprimir datos. El valor predeterminado es sin comprimir. Los valores correctos de las opciones son ninguno, gzip, rápido. La compresión es mejor utilizada para el procesamiento por lotes; cuantos más mensajes se procesen en lotes, mejor será el rendimiento de la compresión. | | Reintentos | int | 0 | Alto | Establecer un valor mayor que 0 hará que el cliente reenvíe cualquier dato una vez que esos datos fallen. Ten en cuenta que estos intentos no son diferentes de los que el cliente recibe un error de envío. Permite que los reintentos cambien potencialmente el orden de los datos; si ambos registros de mensajes se envían a la misma partición, el primer mensaje falla y el segundo aparece antes que el primero. | | tamaño lote | int | 16384 | Medio | El productor intentará agrupar los registros de mensajes para reducir el número de solicitudes. Esto mejorará el rendimiento entre cliente y servidor. Esta configuración controla el número predeterminado de bytes de los mensajes de procesamiento por lotes. No se intenta procesar bytes de mensaje superiores a este recuento de bytes. Las solicitudes enviadas a los brokers contendrán varios lotes, que contendrán una petición para cada partición. Los valores de procesamiento por lotes más pequeños se usan menos y pueden reducir el rendimiento (0 solo usará procesamiento por lotes). Los valores de lote más grandes desperdician más espacio de memoria, por lo que necesitas asignar memoria para valores específicos de lote. | | client.id | Cuerda | | Medio | Esta cadena se envía al servidor cuando se realiza una solicitud. El objetivo es poder rastrear el origen de las solicitudes para permitir que algunas aplicaciones fuera de la lista de permisos IP/puerto envíen información. Esta app puede configurar cualquier cadena porque no tiene otra función funcional que grabar y registrar | | linger.ms | largo | 0 | Medio | El grupo de productores agregará cualquier mensaje que llegue entre la solicitud y el envío, registrando un lote separado de solicitudes. Normalmente, esto solo ocurre cuando el registro se genera más rápido que la tasa de envío. Sin embargo, bajo ciertas condiciones, el cliente querrá reducir el número de solicitudes, o incluso a una carga moderada. Esta configuración se realiza añadiendo un pequeño retardo; es decir, en lugar de enviar un registro inmediatamente, el productor espera un tiempo de retardo dado para permitir que se envíen otros registros de mensajes, que pueden ser agrupados. Esto puede considerarse un algoritmo similar al TCP Nagle. Esta configuración establece un límite de latencia más alto para el batching: una vez que obtenemos el tamaño de batch de una partición, la enviará inmediatamente independientemente de esta configuración, pero si recibimos un mensaje con un número de bytes mucho menor que este, necesitamos "demorarnos" un tiempo específico para recibir más mensajes. Esta configuración por defecto es 0, es decir, sin retraso. Configurar linger.ms=5, por ejemplo, reducirá el número de peticiones, pero al mismo tiempo aumentará el retardo en 5 ms. | | max.request.size | int | 1028576 | Medio | El número máximo de bytes solicitados. Esta también es una cobertura eficaz para el tamaño máximo registrado. Nota: El servidor tiene su propia anulación de tamaños de registros de mensajes, que son diferentes a esta configuración. Esta configuración limita el número de solicitudes que los productores pueden enviar en masa a la vez para evitar un gran número de solicitudes. | | recibido.buffer.bytes | int | 32768 | Medio | Tamaño de recepción de caché TCP, que se utiliza al leer datos | | enviar.buffer.bytes | int | 131072 | Medio | Tamaño de caché de envío TCP, que se utiliza al enviar datos | | timeout.ms | int | 30000 | Medio | Esta opción de configuración controla el tiempo máximo que el servidor espera la confirmación de los seguidores. Si el número de solicitudes confirmadas no se cumple dentro de este plazo, se devuelve un error. Este límite de tiempo de espera se mide en el lado del servidor y no tiene latencia de red, incluyendo las solicitudes | | bloquear.on.buffer.full | Booleano | true | Bajo | Cuando se agota nuestra memoria caché, debemos dejar de recibir nuevos registros de mensajes o lanzar errores. Por defecto, esto está configurado como verdadero, aunque algunos bloqueos pueden no ser lo que merezca la pena, así que es mejor lanzar un error de inmediato. Esto ocurre cuando se configura como false: el productor lanza un error de excepción: BufferExhaustedException si el registro ha sido enviado y la caché está llena | | metadata.fetch.timeout.ms | largo | 60000 | Bajo | Se refiere a los datos de la primera vez de algunos elementos que hemos obtenido. Los elementos incluyen: tema, anfitrión, particiones. Esta configuración se refiere al tiempo necesario para que el elemento se complete correctamente según la obtención, de lo contrario se enviará una excepción al cliente. | | metadata.max.age.ms | largo | 300000 | Bajo | El tiempo en microsegundos es el intervalo en el que forzamos la actualización de los metadatos. Aunque no veamos cambios en el liderazgo de la partición. | | metric.reporters | Lista | [] | Bajo | Una lista de clases utilizadas para medir métricas. Implementar la interfaz MetricReporter permitirá añadir clases que cambian a medida que se generan nuevas métricas. JmxReporter siempre incluirá una forma de registrar estadísticas de JMX | | metrics.num.samples | int | 2 | Bajo | El número de muestras utilizadas para mantener métricas | | metrics.sample.window.ms | largo | 30000 | Bajo | El sistema de Métricas mantiene un número configurable de muestras en un tamaño de ventana corregible. Esta configuración configura, por ejemplo, el tamaño de la ventana. Podemos mantener dos muestras durante un periodo de 30 segundos. Cuando se despliega una ventana, borramos y reescribimos la ventana más antigua | | recoonect.backoff.ms | largo | 10 | Bajo | Cuando la conexión falla, el tiempo de espera para volver a conectar. Esto evita reconexiones repetidas de clientes | | retry.backoff.ms | largo | 100 | Bajo | El tiempo de espera antes de intentar de nuevo una solicitud de frutas y verduras fallida. Evita quedarte atascado en un bucle muerto de envío y fallo.
|
|