Revisión:
Utiliza el siguiente comando para comprobar el estado de ejecución de Kafka. Como sigue:
kafka.service Cargado: cargado (/usr/lib/systemd/system/kafka.service; Habilitado; Preajuste de vendedor: deshabilitado) Activo: fallido (Resultado: código de salida) desde mié 2021-09-22 14:43:11 CST; Hace 1h 43 Proceso: 7363 ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties (code=exited, status=1/FAILURE) PID principal: 7363 (código=cerrado, estado=1/FALLO)
22 sep 14:43:11 devops02 kafka-server-start.sh[7363]: [2021-09-22 14:43:11,295] WARN [ReplicaManager broker=1] Dejando de servir réplicas en dir /tmp/kafka-logs ( kafka.server.ReplicaManager) 22 sep 14:43:11 devops02 kafka-server-start.sh[7363]: [2021-09-22 14:43:11,298] ADVERTENCIA [GroupCoordinator 1]: No se escribió metadatos vacíos para el grupo KqBatchAna: Esto no es correcto coordinador. (kafka.co... upCoordinator) 22 sep 14:43:11 devops02 kafka-server-start.sh[7363]: [2021-09-22 14:43:11,303] INFO [ReplicaFetcherManager en el broker 1] Se eliminó el buscador para particiones HashSet(__consumer_offsets-22, __ consumer_offsets-30, ...-8, __consumer 22 sep 14:43:11 devops02 kafka-server-start.sh[7363]: [2021-09-22 14:43:11,304] INFO [ReplicaAlterLogDirsManager en el broker 1] Se eliminó el buscador para particiones HashSet(__consumer_ desplazamientos-22, __consumer_offsets... fsets-8, __con 22 sep 14:43:11 devops02 kafka-server-start.sh[7363]: [2021-09-22 14:43:11,378] WARN [ReplicaManager broker=1] Broker 1 detuvo el buscador de particiones __consumer_offsets-22,__ consumer_offsets-30.__consumer_... fsets-21,__con 22 sep 14:43:11 devops02 kafka-server-start.sh[7363]: [2021-09-22 14:43:11,379] ADVERTENCIA Dejando de servir registros en dir/tmp/kafka-logs (kafka.log.LogManager) 22 sep 14:43:11 devops02 kafka-server-start.sh[7363]: [2021-09-22 14:43:11,386] ERROR Apagado broker porque todos los directores de registro en /tmp/kafka-logs han fallado (kafka.log.LogManager) 22 sep 14:43:11 devops02 systemd[1]: kafka.service: proceso principal cerrado, code=exited, status=1/FAILURE 22 sep 14:43:11 devops02 systemd[1]: La unidad kafka.service entró en estado fallido. 22 sep 14:43:11 devops02 systemd[1]: kafka.service fallido.
Hint: Some lines were ellipsized, use -l to show in full.
Ve al directorio de registro de Kafka /usr/local/kafka/logs para ver los archivos de registro server.log, de la siguiente manera:
[2021-09-22 14:43:11,286] ERROR Error al rodar el segmento de registro para __consumer_offsets-8 en dir/tmp/kafka-logs (kafka.server.LogDirFailureChannel)
java.io.FileNotFoundException: /tmp/kafka-logs/__consumer_offsets-8/00000000000000000000.index (No such file or directory) en java.io.RandomAccessFile.open0(Método nativo) en java.io.RandomAccessFile.open(RandomAccessFile.java:316) en java.io.RandomAccessFile. <init>(RandomAccessFile.java:243) en kafka.log.AbstractIndex.$anonfun$resize$1(AbstractIndex.scala:182) en kafka.log.AbstractIndex.resize(AbstractIndex.scala:175) en kafka.log.AbstractIndex.$anonfun$trimToValidSize$1(AbstractIndex.scala:241) en kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:241) en kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:507) en kafka.log.Log.$anonfun$roll$8(Log.scala:2037) en kafka.log.Log.$anonfun$roll$8$adapted(Log.scala:2037) En Scala. Option.foreach(Option.scala:437) en kafka.log.Log.$anonfun$roll$2(Log.scala:2037) en kafka.log.Log.roll(Log.scala:2453) en kafka.log.Log.maybeRoll(Log.scala:1988) en kafka.log.Log.append(Log.scala:1263) en kafka.log.Log.appendAsLeader(Log.scala:1112) en kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1069) en kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1057) at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:958) en scala.collection.Iterator$$anon$9.next(Iterator.scala:575) en scala.collection.mutable.Growable.addAll(Growable.scala:62) en scala.collection.mutable.Growable.addAll$(Growable.scala:57) en scala.collection.immutable.MapBuilderImpl.addAll(Map.scala:692) en scala.collection.immutable.Map$.from(Map.scala:643) en scala.collection.immutable.Map$.from(Map.scala:173) en scala.collection.MapOps.map(Map.scala:266) en scala.collection.MapOps.map$(Map.scala:266) en scala.collection.AbstractMap.map(Map.scala:372) at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:946) en kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:616) en kafka.coordinator.group.GroupMetadataManager.storeGroup(GroupMetadataManager.scala:325) en kafka.coordinator.group.GroupCoordinator.$anonfun$onCompleteJoin$1(GroupCoordinator.scala:1206) en kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:227) en kafka.coordinator.group.GroupCoordinator.onCompleteJoin(GroupCoordinator.scala:1178) en kafka.coordinator.group.DelayedJoin.onComplete(DelayedJoin.scala:43) en kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:72) en kafka.coordinator.group.DelayedJoin.$anonfun$tryComplete$1(DelayedJoin.scala:38) en kafka.coordinator.group.GroupCoordinator.$anonfun$tryCompleteJoin$1(GroupCoordinator.scala:1172) at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.scala:17) en kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:227) en kafka.coordinator.group.GroupCoordinator.tryCompleteJoin(GroupCoordinator.scala:1171) en kafka.coordinator.group.DelayedJoin.tryComplete(DelayedJoin.scala:38) en kafka.server.DelayedOperation.safeTryCompleteOrElse(DelayedOperation.scala:110) en kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:234) en kafka.coordinator.group.GroupCoordinator.prepareRebalance(GroupCoordinator.scala:1144) en kafka.coordinator.group.GroupCoordinator.$anonfun$maybePrepareRebalance$1(GroupCoordinator.scala:1118) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) en kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:227) en kafka.coordinator.group.GroupCoordinator.maybePrepareRebalance(GroupCoordinator.scala:1117) en kafka.coordinator.group.GroupCoordinator.removeMemberAndUpdateGroup(GroupCoordinator.scala:1156) en kafka.coordinator.group.GroupCoordinator.$anonfun$handleLeaveGroup$3(GroupCoordinator.scala:498) en scala.collection.immutable.List.map(List.scala:246) en kafka.coordinator.group.GroupCoordinator.$anonfun$handleLeaveGroup$2(GroupCoordinator.scala:470) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) en kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:227) en kafka.coordinator.group.GroupCoordinator.handleLeaveGroup(GroupCoordinator.scala:467) at kafka.server.KafkaApis.handleLeaveGroupRequest(KafkaApis.scala:1659) at kafka.server.KafkaApis.handle(KafkaApis.scala:180) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74) at java.lang.Thread.run(Thread.java:748) Causas del error:Linux limpia regularmente archivos en el directorio /tmp, el directorio de archivos kafka se almacena por defecto/tmp/kafka-logsdirectorios, lo que provocaba que se limpiara regularmente, lo que resultaba en un funcionamiento anormal del programa.
Bajo CentOS 7, hay 3 servicios del sistema relacionados con la limpieza:
systemd-tmpfiles-setup.service :Crear archivos y directorios volátiles systemd-tmpfiles-setup-dev.service:Crear nodos estáticos de dispositivos en /dev systemd-tmpfiles-clean.service :Limpieza de directorios temporales También hay 3 perfiles relacionados, como sigue:
/etc/tmpfiles.d/*.conf
/run/tmpfiles.d/*.conf
/usr/lib/tmpfiles.d/*.conf Utiliza el siguiente comando para ver los registros relacionados con tmpfiles:
Directorio TMP en/usr/lib/tmpfiles.d/tmp.confLa configuración del archivo es la que se muestra en la siguiente figura:
# Este archivo forma parte de systemd.
# # SystemD es software libre; puedes redistribuirlo y/o modificarlo # bajo los términos de la Licencia Pública General Menor GNU publicada por # la Fundación de Software Libre; ya sea la versión 2.1 de la Licencia, o # (a tu elección) Cualquier versión posterior.
# Consulta tmpfiles.d(5) para más detalles
# Borrar los directorios tmp por separado, para facilitar su anulación v /tmp 1777 raíz raíz 10d v /var/tmp 1777 raíz raíz 30d
# Excluir puntos de montaje de espacio de nombres creados con PrivateTmp=sí x /tmp/systemd-private-%b-* X /tmp/systemd-private-%b-*/tmp x /var/tmp/systemd-private-%b-* X /var/tmp/systemd-private-%b-*/tmp
Solución 1
Modifica el archivo de configuración de Kafka /config/server.properties para modificar la configuración de los log.dirs, por ejemplo:
Solución 2
Añade un directorio de exclusión y edita el archivo: /usr/lib/tmpfiles.d/tmp.conf
(Fin)
|