Конфигурации брокера
| Свойства | По умолчанию | Описание | | broker.id | | Каждый брокер может быть идентифицирован уникальным неотрицательным целочисленным идентификатором; Этот идентификатор может использоваться как «имя» брокера, а его существование позволяет брокеру переходить на другой хост/порт, не путав потребителей. Вы можете выбрать любое число в качестве идентификатора, если оно уникальное. | | log.dirs | /tmp/kafka-logs | Путь, где Кафка хранит данные. Этот путь не уникален, он может быть множественным, и пути нужно разделять только запятыми; Всякий раз, когда создаётся новая секция, она выбирается для этого по пути, содержащему наименьшее количество разбитий. | | Порт | 6667 | Сервер принимает порт, к которому подключается клиент | | zookeeper.connect | недействительный | Формат строки соединения ZooKeeper: hostname:port, где hostname и port — это хост и порт узла в кластере ZooKeeper соответственно. Чтобы подключиться к другим узлам ZooKeeper при выходе из строя хоста, можно создать несколько хостов следующим образом:
hostname1:port1, hostname2:port2, hostname3:port3.
ZooKeeper позволяет добавить путь «chroot» для хранения всех данных Kafka в кластере под определённым путём. Когда несколько кластеров Kafka или других приложений используют один и тот же кластер ZooKeeper, вы можете использовать этот метод для настройки пути хранения данных. Это можно реализовать, отформатировав строку соединения следующим образом: hostname1:port1,hostname2:port2,hostname3:port3/chroot/path Эта настройка хранит все данные кластера Kafka в пути /chroot/path. Обратите внимание, что перед запуском брокера вы должны создать этот путь, а пользователи должны использовать тот же формат соединения. | | message.max.bytes | 1000000 | Максимальный размер сообщений, которые сервер может получить. Важно, чтобы настройки потребителя и производителя относительно этого свойства были синхронизированы, иначе сообщение, опубликованное производителем, слишком большое для потребителя. | | num.network.threads | 3 | Количество сетевых потоков, используемых сервером для обработки сетевых запросов; Обычно менять это свойство не обязательно. | | num.io.threads | 8 | Количество потоков ввода-вывода, используемых сервером для обработки запросов; Количество нитей должно быть как минимум равным количеству жёстких дисков. | | background.threads | 4 | Количество потоков, используемых для фоновой обработки, таких как удаление файлов; Вам не нужно менять этот объект. | | queued.max.requests | 500 | Максимальное количество запросов, которые можно поставить в очередь для обработки потока ввода-вывода до того, как сетевой поток перестанет читать новые запросы. | | host.name | недействительный | имя брокера; Если имя хоста уже установлено, брокер будет привязан только к этому адресу; Если настройки нет, он привязывается ко всем интерфейсам и публикует одну копию на ZK | | advertised.host.name | недействительный | Если он настроен, он отправляется производителям, потребителям и другим брокерам как хост-имя брокера | | рекламированный порт | недействительный | Этот порт предоставляется производителям, потребителям и другим брокерам и используется для установления связи; Его нужно настроить только если разные порты и порт, который должен привязать сервер. | | socket.send.buffer.bytes | 100 * 1024 | SO_SNDBUFF Размер кэша, который сервер использует для соединения сокетов | | socket.receive.buffer.bytes | 100 * 1024 | SO_RCVBUFF размер кэша, который используется сервером для подключения к сокетам | | socket.request.max.bytes | 100 * 1024 * 1024 | Максимальный размер запроса, разрешённый сервером; Это позволит избежать переполнения серверов, которые должны быть меньше размера Java-кучи | | num.partitions | 1 | Если при создании темы не указано количество разделов, это число будет по умолчанию числом разделов под этой темой. | | log.segment.bytes | 1014*1024*1024 | Логи раздела тема хранятся во многих файлах в определённой директории, которые делят логи раздела на сегменты; Этот атрибут — максимальный размер каждого файла; Когда размеры достигают этого значения, создаётся новый файл. Этот параметр можно переопределить основой каждой темы. Вид Вход по гиперссылке виден. | | log.roll.hours | 24 * 7 | Даже если файл не достигает log.segment.bytes, новый файл создаётся каждый раз, когда время создания файла достигает этого свойства. Эта настройка также может быть переопределена настройками на уровне темы; ВидВход по гиперссылке виден. | | log.cleanup.policy | Удалить | | | log.retention.minutes и log.retention.hours | 7 дней | Время, когда каждый файл журнала сохранялся до его удаления. Стандартное время хранения данных одинаково для всех тем. log.retention.minutes и log.retention.bytes используются для настройки удаления файлов журнала, независимо от того, какое свойство было переполнено. Эту настройку свойства можно переопределить, когда тема фактически установлена. ВидВход по гиперссылке виден. | | log.retention.bytes | -1 | Общее количество данных, сохранённых каждым разделом по каждой теме; Обратите внимание, что это верхний предел для каждой порции, поэтому это число, умноженное на количество разделов, — это общее количество данных, хранящихся в каждой теме. Также обратите внимание: если установлены и log.retention.hours, и log.retention.bytes, превышение любого из ограничений приведёт к удалению файла сегмента. Обратите внимание, что эта настройка может быть заменена каждой темой. ВидВход по гиперссылке виден. | | log.retention.check.interval.ms | 5 минут | Проверьте интервал между сегментированными файлами в логе, чтобы определить, соответствуют ли атрибуты файлов требованиям удаления. | | log.cleaner.enable | false | Когда это свойство установлено в false, оно будет удалено после хранения журнала на максимальное время или размер. Если установлено true, это произойдёт, когда атрибут сохранения достигнет верхнего пределаВход по гиперссылке виден.。 | | log.cleaner.threads | 1 | Количество потоков, выполняющих сжатие логарифма | | log.cleaner.io.max.bytes.per.sec | Никакой | Максимальное количество ввода-выводов, которое может иметь уборщик логарифма при выполнении уплотнения логарифма. Эта настройка ограничивает уборщика в предотвращении вмешательства в активные сервисы запросов. | | log.cleaner.io.buffer.size | 500*1024*1024 | Log Cleaner индексирует логи в процессе очистки и уменьшает размер используемого кэша. Лучше всего настраивать её большой, чтобы обеспечить достаточно памяти. | | log.cleaner.io.buffer.load.factor | 512*1024 | Размер блока ввода-вывода, необходимого для очистки бревна. Вам не нужно менять эту настановку. | | log.cleaner.io.buffer.load.factor | 0.9 | коэффициент загрузки хеш-таблицы, используемой при очистке логов; Вам не нужно менять этот вариант. | | log.cleaner.backoff.ms | 15000 | Проводится временной интервал, в течение которого уборка бревна | | log.cleaner.min.cleanable.ratio | 0.5 | Эта конфигурация определяет, как часто компактор бревен пытается очистить логи (предполагается)Вход по гиперссылке виден.открыта). По умолчанию избегайте очистки более 50% бревен. Это соотношение связано с максимальным пространством, занимаемым резервным журналом (50% логов сжимаются при 50%). Более высокая ставка означает меньше отходов и больше пространства можно эффективнее очистить. Эта настройка может быть отменена в каждой теме. ВидВход по гиперссылке виден.。 | | log.cleaner.delete.retention.ms | 1 день | время хранения; Максимальное время хранения сжатых журналов; Это также максимальное время для клиента для потребления сообщений, и разница между log.retention.minutes в том, что один управляет несжатыми данными, а другой — сжатыми данными, которые будут перезаписаны на указанное время создания темы. | | log.index.size.max.bytes | 10*1024*1024 | Максимальный размер на сегмент древево. Обратите внимание, что если размер логарифма достигает этого значения, необходимо сгенерировать новый сегмент логарифма, даже если его размер не превышает лимит log.segment.bytes. | | log.index.interval.bytes | 4096 | При выполнении извлечения нужно отсканировать ближайшее смещение с определённым пространством, чем выше настройка, тем лучше, обычно используйте значение по умолчанию | | log.flush.interval.messages | Long.MaxValue | лог-файл «синхронизируется» с диском перед накоплением сообщений. Поскольку операции ввода-вывода диска — это медленная операция, но также необходимый способ «надёжности данных», проверьте, требуется ли интервал времени для отверждения на жёстком диске. Если это значение слишком велико, это приведёт к слишком долгому времени «синхронизации» (блокировка ввода-вывода), если это значение слишком мало — к длительному времени «fsync» (блокировка ввода), если это слишком мало — к большому числу времени «синхронизации», что означает, что общий запрос клиента имеет определённую задержку, а сбой физического сервера приведёт к потере сообщений без fsync. | | log.flush.scheduler.interval.ms | Long.MaxValue | Проверьте, нужны ли fsync-интервалы | | log.flush.interval.ms | Long.MaxValue | Это число используется для управления временным интервалом «fsync»: если количество сообщений никогда не достигает количества сообщений, затвердённых на диске, но интервал от последнего синхронизации диска достигает порога, синхронизация диска также запускается. | | log.delete.delay.ms | 60000 | Время хранения после очистки файла в индексе обычно не требует изменений | | auto.create.topics.enable | true | Разрешать ли автоматическое создание тем. Если это верно, то автоматически создаст тему, которой не существует, когда не существует функции produce или fetch. В противном случае нужно использовать командную строку для создания темы | | controller.socket.timeout.ms | 30000 | Время тайм-аута сокета, когда контроллер управления разделами выполняет резервное копирование. | | controller.message.queue.size | Int.MaxValue | Controller-to-broker-channles | | default.replication.factor | 1 | По умолчанию количество резервных копий относится только к автоматически созданным темам | | replica.lag.time.max.ms | 10000 | Если последователь не отправит запрос на получение в течение этого времени, лидер удаляет его из ISR и считает его подвешенным | | replica.lag.max.сообщения | 4000 | Если у реплики больше неподкрепленного числа, лидер убирает последователя и считает его подвешенным | | replica.socket.timeout.ms | 30*1000 | Время тайм-аута лидера для запросов сети сокетов при резервном копировании данных | | replica.socket.receive.buffer.bytes | 64*1024 | Сокетный буфер приёма при отправке сетевого запроса лидеру во время резервного копирования | | replica.fetch.max.bytes | 1024*1024 | Максимальное значение каждой сборки на момент резервного копирования | | replica.fetch.min.bytes | 500 | Максимальное время ожидания, пока данные достигнут лидера, когда лидер делает резервный запрос | | replica.fetch.min.bytes | 1 | Минимальный размер ответа после каждого выбора при резервном копировании | | num.replica.fetchers | 1 | Количество потоков, которые делают резервную копию данных от лидера | | replica.high.watermark.checkpoint.interval.ms | 5000 | Каждая реплика проверяет, как часто высший уровень воды отверждается | | fetch.purgatory.purge.interval.requests | 1000 | Запрос на получение интервала очистки | | producer.purgatory.purge.interval.requests | 1000 | продюсер запрашивает интервал очистки | | zookeeper.session.timeout.ms | 6000 | Тайм-аут сессии смотрителя зоопарка. | | zookeeper.connection.timeout.ms | 6000 | Максимальное время, которое клиент ждёт для установления связи с смотрителем зоопарка | | zookeeper.sync.time.ms | 2000 | ZK Follower долгое время отстаёт от ZK Leader | | controlled.shutdown.enable | true | Возможно ли контролировать закрытие брокера. Если возможно, брокер сможет перевести всех лидеров к другим брокерам до закрытия сделки. Это снижает недоступность во время процесса отключения. | | controlled.shutdown.max. повторяет | 3 | Количество команд, которые могут успешно выполнить отключение до выполнения неполного отключения. | | controlled.shutdown.retry.backoff.ms | 5000 | Время отдыха между закрытиями | | auto.leader.rebalance.enable | true | Если это так, контроллер автоматически сбалансирует руководство брокеров по разделам | | leader.imbalance.per.broker.percent | 10 | Максимальное соотношение дисбаланса, разрешённое каждым брокером | | leader.imbalance.check.interval.seconds | 300 | Проверьте частоту дисбаланса лидера | | offset.metadata.max.bytes | 4096 | Позволяет клиентам сохранять максимальное количество своих смещений | | max.connections.per.ip | Int.MaxValue | Максимальное количество соединений на одного брокера может быть установлено к каждому IP-адресу | | max.connections.per.ip.overrides | | Максимальное покрытие по умолчанию соединения на IP или имя хоста | | connections.max.idle.ms | 600000 | Лимит тайм-аута для пустых соединений | | log.roll.jitter. {MS,HOURS} | 0 | Максимальное количество дрожения, абстрагированных из logRollTimeMillis | | num.recovery.threads.per.data.dir | 1 | Количество потоков, которые каждый каталог данных использует для ведения восстановления | | нечистый.лидер.выбор.включить | true | Указывает, возможно ли использовать настройки нереплик в ISR в качестве лидера | | delete.topic.enable | false | Возможность удалять темы | | offsets.topic.num.partitions | 50 | Количество разделов для темы смещенного коммита. Поскольку изменение этого решения после развертывания в настоящее время не поддерживается, мы рекомендуем использовать более высокие настройки для продакшена (например, 100-200). | | offsets.topic.retention.minutes | 1440 | Смещения, существующие дольше этого срока, будут отмечены как ожидающие удаления | | offsets.retention.check.interval.ms | 600000 | Менеджер смещений проверяет частоту устаревших смещений | | offsets.topic.replication.factor | 3 | Количество резервных копий темы смещено. Рекомендуется устанавливать более высокие цифры для обеспечения большей доступности | | offset.topic.segment.bytes | 104857600 | Смещает тему. | | offsets.load.buffer.size | 5242880 | Эта настройка связана с размером партии и используется при чтении из сегмента смещения. | | offsets.commit.required.acks | -1 | Количество подтверждений должно быть установлено до того, как офсет-коммит станет приемлемым, и обычно его не нужно менять |
| Свойства | По умолчанию | Свойство сервера по умолчанию | Описание | | cleanup.policy | Удалить | log.cleanup.policy | Либо «удалить», либо «компактить»; Эта строка указывает, как использовать старую часть логарифма; Стандартный метод («delete») сбрасывает старую деталь по мере достижения лимита времени переработки или размера. «компактный» будет сжимать логи | | delete.retention.ms | 86400000 (24 часа) | log.cleaner.delete.retention.ms | Отличие log.retention.minutes в том, что один управляет несжатыми данными, а другой — сжатыми данными. Эта конфигурация может быть переопределена параметрами закрепления при создании темы | | Смывать.сообщения | Никакой | log.flush.interval.messages | Эта конфигурация задаёт временной интервал для принудительного создания fsync-логов. Например, если эта опция установлена в 1, то fsync требуется после каждого сообщения, а если установлен на 5 — fsync требуется для каждых 5 сообщений. В целом рекомендуется не устанавливать это значение. Настройка этого параметра требует необходимого компромисса между «надёжностью данных» и «производительностью». Если это значение слишком велико, это приведёт к длительному времени на «fsync» каждый раз (блокировка ввода-вывода), а если это значение слишком мало, это приведёт к большому числу «fsync», что также означает определённую задержку в общем запросе клиента. Физический сбой сервера приведёт к потере сообщений без fsync. | | flush.ms | Никакой | log.flush.interval.ms | Эта конфигурация используется для закрепления интервала времени между принудительными fsync-логами на диске; Например, если установлен на 1000, то fsync требуется каждые 1000 мс. Этот вариант обычно не рекомендуется | | index.interval.bytes | 4096 | log.index.interval.bytes | Настройка по умолчанию гарантирует, что мы добавляем индекс в сообщение каждые 4096 байт, и больше индексов приближает чтение, но размер индекса увеличивается. Эта опция, как правило, не требуется | | max.message.bytes | 1000000 | max.message.bytes | Максимальный размер сообщения с добавлением кафки. Обратите внимание, что если вы увеличиваете этот размер, вам также необходимо увеличить размер сборки пользователя, чтобы пользователь мог получать сообщения до этих максимальных размеров. | | минимальное.чистое.грязное.соотношение | 0.5 | минимальное.чистое.грязное.соотношение | Эта конфигурация определяет, как часто компрессор бревна пытается прочистить полена. По умолчанию журналы с степенью сжатия более 50% будут избегаться. Такое соотношение позволяет избежать наибольших потерь пространства | | min.insync.replicas | 1 | min.insync.replicas | Когда производитель установлен на request.required.acks в -1, min.insync.replicas указывает минимальное количество реплик (каждая запись repica должна быть успешной), и если это число не достигнуто, производитель создаёт исключение. | | retention.bytes | Никакой | log.retention.bytes | Если вы используете политику удержания «удалить», эта конфигурация относится к максимальному размеру, которого лог может достичь до его удаления. По умолчанию нет ограничения по размеру, только ограничение по времени | | retention.ms | 7 дней | log.retention.minutes | Если вы используете политику сохранения «удалить», эта конфигурация относится к времени, когда журнал был сохранён до журнала удаления. | | segment.bytes | 1GB | log.segment.bytes | В Kafka логарифмические логарифмы хранятся в чанках, и эта конфигурация относится к размеру логарифмических журналов, разделённых на части | | segment.index.bytes | 10 МБ | log.index.size.max.bytes | Эта конфигурация примерно соответствует размеру индексного файла, отображаемого между смещениями и расположениями файлов; Эта конфигурация, как правило, не требует изменений | | segment.ms | 7 дней | log.roll.hours | Даже если файл log chunk не достигает нужного размера, необходимого удаления или сжатия, когда время журнала достигнет этого верхнего предела, будет вынужден создавать новый файл log chunk | | segment.jitter.ms | 0 | log.roll.jitter. {MS,HOURS} | Максимальный джиттер, который нужно вычесть из logRollTimeMillis. |
Потребительские конфигурации
| Свойства | По умолчанию | Описание | | group.id | | Строка, которая уникально определяет группу, в которой находится потребительский процесс, и если установлен тот же идентификатор группы, это означает, что эти процессы принадлежат одной и той же потребительской группе | | zookeeper.connect | | Укажите строку соединения Zookeeper, формат — hostname:port, здесь хост и порт — это и хост, и порт сервера Zookeeper, чтобы избежать потери связи после выхода из строя компьютера Zookeeper, можно указать несколько hostname:port, используя запятые как разделение: Имя хоста1:порт1,имя хоста2:порт2,имя хоста3:порт3 Вы можете добавить путь chroot ZooKeeper в строку соединения ZooKeeper, которая используется для хранения собственных данных, следующим образом: hostname1:port1,hostname2:port2,hostname3:port3/chroot/path | | consumer.id | недействительный | Настройка не требуется, обычно она генерируется автоматически | | socket.timeout.ms | 30*100 | Ограничения тайм-аута для сетевых запросов. Истинный лимит тайм-аута — max.fetch.wait+socket.timeout.ms | | socket.receive.buffer.bytes | 64*1024 | Socket используется для приема размера кэша сетевых запросов | | fetch.message.max.bytes | 1024*1024 | Максимальное количество байт на сообщение выборки на запрос на выбор. Эти байты будут контролироваться в памяти, используемой для каждого раздела, поэтому эта настройка будет контролировать объём памяти, используемой потребителем. Размер запроса на выборку должен быть как минимум равен максимальному размеру сообщения, разрешённому сервером, иначе размер сообщения, которое может отправить производитель, превышает размер, который может потреблять потребитель. | | num.consumer.fetchers | 1 | Количество потоков fetcher, используемых для извлечения данных | | auto.commit.enable | true | Если верно, смещение сообщения, полученного потребителем, автоматически синхронизируется с смотрителем зоопарка. Этот смещение фикса будет использоваться новым потребителем, когда процесс останавливается | | auto.commit.interval.ms | 60*1000 | Частота, с которой потребитель отправляет офсет смотрителю зоопарка, составляет секунды | | queued.max.message.chunks | 2 | Максимальное количество сообщений, используемых для кэширования с целью потребления. Каждый блок должен быть совпадать с fetch.message.max.bytes | | rebalance.max. повторяет | 4 | Когда в потребительскую группу добавляется новый потребитель, коллекция потребителей пытается сбалансировать количество разделов, выделенных каждому потребителю. Если потребительская коллекция меняется, эта ребалансировка проваливается и возвращается во время выполнения распределения | | fetch.min.bytes | 1 | Минимальное количество байт, которые сервер должен возвращать с каждым запросом на получение. Если возвращается недостаточно данных, запрос ждёт возвращения достаточного количества данных. | | fetch.wait.max.ms | 100 | Если данных недостаточно для выполнения fetch.min.bytes, эта конфигурация отражает максимальное время, которое сервер блокирует перед ответом на запрос на получение. | | rebalance.backoff.ms | 2000 | Время отступить перед повторной попыткой ребланса | | refresh.leader.backoff.ms | 200 | Есть время отступления, чтобы подождать, прежде чем пытаться определить, потерял ли лидер раздела своё руководство | | auto.offset.reset | Самый крупный | Если в Zookeeper нет инициализированного смещения, если смещение является ответом на следующее значение: Самый маленький: Автоматический сброс смещения на наименьший сдвиг наибольшее: Автоматический сброс смещения на смещение наибольшего Всё остальное: Это исключение для потребителя | | consumer.timeout.ms | -1 | Если сообщение недоступно, даже после определённого времени ожидания, вводится исключение тайм-аута | | exclude.internal.topics | true | Стоит ли раскрывать сообщения из внутренних тем потребителям | | paritition.assignment.strategy | Ареал | Выберите политику назначения разделов потребительскому потоку, опционально диапазон, круговой | | client.id | Значение идентификатора группы | — это строка, специфичная для пользователя, которая помогает отслеживать вызовы в каждом запросе. Логически должно подтвердить приложение, которое сгенерировало запрос | | zookeeper.session.timeout.ms | 6000 | Ограничения тайм-аута для сессий смотрителя зоопарка. Если потребитель не отправляет сообщение о сердцебиении смотрителю зоопарка в это время, оно считается заблокированным, и возникает ребланс | | zookeeper.connection.timeout.ms | 6000 | Максимальное время ожидания для клиента установления связи с Zookeeper | | zookeeper.sync.time.ms | 2000 | Последователи ZK могут отставать от лидера ZK в течение максимального времени | | offsets.storage | Смотритель зоопарка | Места, используемые для хранения офсетов: Zookeeper или Kafka | | offset.channel.backoff.ms | 1000 | Время отключения при повторном подключении к каналу смещения или повторной попытке поиска/фиксации неудачного смещения | | offsets.channel.socket.timeout.ms | 10000 | Лимит тайм-аута сокета для ответа на запрос на выборку/фиксацию при чтении смещения. Этот лимит тайм-аута используется в запросе consumerMetadata для запроса управления смещениями | | offsets.commit.max. повторяет | 5 | Количество повторных попыток офсетного коммита. Эта повторная попытка применяется только к офсетным коммитам между выключением. он | | dual.commit.enabled | true | Если использовать «kafka» как offsets.storage, можно дважды коммитировать offset в zookeeper (и один раз в kafka). Это обязательно при переходе с офсетного хранилища на базе Zookeeper на офсетное хранилище Kafka. Для любой группы потребителей рекомендуется отключить эту опцию после завершения перехода | | partition.assignment.strategy | Ареал | Выберите между политиками «диапазон» и «круговой робин» в качестве политики для назначения разделов потребительским потокам данных; Циркулярный аллокатор разделов выделяет все доступные разделы, а также все доступные потребительские потоки. Он назначит цикл раздела потребительскому потоку. Если все потребительские экземпляры подписаны на определенный, разбиения делятся на детерминированные распределения. Стратегия распределения по круговой системе возможна только при выполнении следующих условий: (1) Каждая тема имеет одинаковое количество потоков данных на уровень потребителей. (2) Коллекция подписанных тем определяется для каждого потребительского экземпляра в потребительской группе. |
Конфигурации продюсера
| Свойства | По умолчанию | Описание | | metadata.broker.list | | Подавать самостоятельный процесс. Producer используется только для получения метаданных (темы, разделы, реплики). Соединение сокета для отправки фактических данных будет установлено на основе возвращённых метаданных. Формат таков: host1:port1,host2:port2 Этот список может представлять собой подсписок брокеров или VIP, указывающий на брокеров | | request.required.acks | 0 | Эта конфигурация является значением подтверждения, которое указывает, когда запрос на продукт считается завершённым. В частности, сколько других брокеров должны были отправить данные в свои логи и подтвердить эту информацию своему руководителю. Типичные значения включают: 0: Означает, что производитель никогда не ждёт подтверждения от брокера (то же самое, что и 0.7). Этот вариант обеспечивает наименьшую задержку, но при этом наибольший риск (потому что данные теряются при выходе из строя сервера). 1: Указывает, что реплика лидера получила подтверждение данных. Эта опция имеет низкую задержку и гарантирует, что сервер подтверждает получение сообщения. -1: Производитель получает подтверждение, что все синхронизированные реплики получили данные. Задержка самая большая, однако этот метод не полностью исключает риск потери сообщений, так как количество синхронизированных реплик может быть 1. Если вы хотите гарантировать, что некоторые реплики получают данные, стоит установить опцию min.insync.replicas в настройках темы. Ознакомьтесь с проектной документацией для более глубокого обсуждения. | | request.timeout.ms | 10000 | Брокер старается реализовать требование request.required.acks, иначе ошибка будет отправлена клиенту | | producer.type | синхронизация | Эта опция фиксирует, отправляется ли сообщение асинхронно в фоновый поток. Правильные значения: (1) асинхронная: асинхронная отправка (2) синхронизация: синхронизированная отправка Установив производитель на асинхронный режим, мы можем обрабатывать запросы пакетами (что хорошо для большей пропускной способности), но это также создаёт вероятность того, что клиентская машина потеряет неотправленные данные | | serializer.class | kafka.serializer.DefaultEncoder | Категория сериализации сообщения. Стандартный кодировщик вводит один байт[] и возвращает тот же байт[] | | key.serializer.class | | Класс сериализации по ключевым словам. Если это не указано, по умолчанию нужно совпадать с сообщением | | partitioner.class | kafka.producer.DefaultPartitioner | Класс разделителя для разделения сообщений между подтемами. Стандартный разделитель основан на хеш-таблице ключа | | compression.codec | никакой | Этот параметр может задать кодек для сжатия данных, который можно выбрать как «none», «gzip», «snappy». | | compressed.topics | недействительный | Этот параметр можно использовать для установки сжатия определённых тем. Если сжатый кодек является кодеком, отличным от NoCompressCodec, эти кодеки применяются к указанным данным тем. Если список сжатых тем пуст, применяйте конкретный сжатый кодек ко всем темам. Если сжатый кодек — NoCompressionCodec, то сжатие доступно не для всех тем. | | message.send.max. повторяет | 3 | Этот параметр заставит производителя автоматически повторять неудачные запросы отправки. Этот параметр фиксирует количество повторных попыток. Примечание: установка значения, не отличного от 0, приведёт к повторению определённых ошибок в сети: вызовет отправку и потеря подтверждения | | retry.backoff.ms | 100 | Перед каждой повторной попыткой продюсер обновляет метаданные соответствующей темы, чтобы узнать, назначен ли новый лидер. Поскольку выбор лидера занимает немного времени, эта опция указывает, сколько времени нужно подождать производителю перед обновлением метаданных. | | topic.metadata.refresh.interval.ms | 600*1000 | Производитель обычно обновляет метаданные темы в некоторых случаях отказа (отсутствует раздел, лидер недоступен и т.д.). Он будет проходить через регулярный цикл. Если вы установите отрицательное значение, метаданные обновятся только в случае сбоя. Если установлен 0, метаданные обновляются после каждого отправленного сообщения (эта опция не рекомендуется, система потребляет слишком много). Важно: обновления происходят только после отправки сообщения, поэтому если продюсер никогда не отправляет сообщение, метаданные не обновляются. | | queue.buffering.max.ms | 5000 | Максимальный временной интервал, на котором пользователь кэширует данные при применении асинхронного режима. Например, если сообщение установлено на 100, сообщения в пределах 100 мс будут обрабатываться пакетами. Это повысит пропускную способность, но увеличит задержку из-за кэширования. | | queue.buffering.max.messages | 10000 | При использовании асинхронного режима максимальное количество неотправленных сообщений, которые можно кэшировать в очередь до появления производителя, должно быть заблокировано, иначе данные должны быть потеряны | | batch.num.messages | 200 | При использовании асинхронного режима можно пакетно обрабатывать максимальное количество сообщений. Или количество сообщений уже дошло до этого онлайн, или queue.buffer.max.ms пришло, и продюсер их обрабатывает | | send.buffer.bytes | 100*1024 | Размер кэша записи сокета | | client.id | “” | Этот идентификатор клиента — это строка, специфичная для пользователя, которая входит в каждый запрос для отслеживания вызова, и он логически должен подтвердить, что запрос сделал приложение. |
Конфигурации продюсера
| Имя | Тип | По умолчанию | Значение | Описание | | boostrap.servers | Список | | высокий | Группа хост/порт для установления соединения с кластером Kafka. Данные будут равномерно загружаться на всех серверах, независимо от того, какой сервер предназначен для загрузки. Этот список затрагивает только инициализованные хосты (которые используются для обнаружения всех серверов). Этот формат списка:
host1:port1,host2:port2,... Поскольку эти серверы используются только для инициализации соединений с целью выявления всех членов кластера (которые могут изменяться динамически), этот список не обязан содержать все серверы (возможно, потребуется несколько серверов, хотя в данном случае один сервер может быть не работать). Если в этом списке нет сервера, отправка данных не завершится, пока список не станет доступен. | | ACKS | Строка | 1 | высокий | Производительу нужен сигнал от сервера для подтверждения получения данных после получения данных, и эта конфигурация указывает, сколько таких сигналов подтверждения нужно прокудеру. Эта конфигурация фактически отражает доступность резервных копий данных. Следующие настройки являются распространёнными вариантами: (1) acks=0: Установка в 0 означает, что производителю не нужно ждать подтверждения полученной информации. Реплика сразу же будет добавлена в буфер сокета и считается отправленной. В этом случае нет гарантии, что сервер успешно получил данные, и повторная попытка конфигурации не сработает (потому что клиент не знает, сработала ли она), и смещение обратной связи всегда будет установлено на -1. (2) acks=1: Это означает, что нужно хотя бы дождаться, пока лидер успешно запишет данные в локальный лог, но не все последователи. В этом случае, если последователь не сделает резервную копию данных и лидер снова положит трубку, сообщение будет потеряно. (3) acks=all: Это означает, что лидеру нужно дождаться успешного записи всех резервных копий, и эта стратегия гарантирует, что данные не будут потеряны, пока одна из них сохраняется. Это самая сильная гарантия. (4) Возможны и другие настройки, например acks=2, которые требуют определённого количества acks, но эта стратегия обычно используется редко. | | buffer.memory | Долго | 33554432 | высокий | Производитель может использоваться для кэширования размера памяти данных. Если данные генерируются быстрее, чем отправляются брокеру, производитель блокирует или выбросит исключение, обозначенное как «block.on.buffer.full».
Эта настройка будет связана с общим объёмом памяти, которую может использовать производитель, но это не жёсткое ограничение, так как не вся память, используемая производителем, используется для кэширования. Часть дополнительной памяти используется для сжатия (если введётся сжатие), а часть — для запросов на обслуживание. | | compression.type | Строка | никакой | высокий | Producer — это тип сжатия, используемый для сжатия данных. По умолчанию он не сжат. Правильные значения опции: none, gzip, snappy. Сжатие лучше всего применяется для пакетной обработки: чем больше сообщений обрабатывается пакетами, тем лучше производительность сжатия. | | Повторные попытки | int | 0 | высокий | Установка значения выше 0 заставляет клиент повторно отправлять любые данные после отказа. Обратите внимание, что эти повторные попытки ничем не отличаются от тех, когда клиент получает ошибку отправки. Позволяет повторять попытки изменить порядок данных, если обе записи сообщений отправляются в один раздел, первое сообщение не работает, второе появляется раньше первого. | | batch.size | int | 16384 | Терпимая | Продюсер попытается объединить записи сообщений в пакеты, чтобы уменьшить количество запросов. Это повысит производительность между клиентом и сервером. Эта конфигурация управляет по умолчанию количество байт пакетных сообщений. Не предпринимаются попытки обработать байты сообщения, превышающие это количество байт. Запросы, отправленные брокерам, содержат несколько пакетов, которые содержат по одному запросу для каждого раздела. Меньшие значения пакетирования используются реже и могут снизить пропускную способность (0 использует только пакетную обработку). Большие пакетные значения расходуют больше памяти, поэтому нужно выделять память для конкретных пакетных значений. | | client.id | Строка | | Терпимая | Эта строка отправляется на сервер при выполнении запроса. Цель — отследить источник запросов, чтобы некоторые приложения вне списка разрешений IP/порта могли отправлять информацию. Это приложение может настраивать любую строку, потому что у него нет функциональной функции, кроме записи и отслеживания | | linger.ms | Долго | 0 | Терпимая | Группа продюсеров агрегирует все сообщения, поступающие между запросом и отправкой, записывая отдельную партию запросов. Обычно это происходит только тогда, когда запись генерируется быстрее скорости отправки. Однако при определённых условиях клиент захочет сократить количество запросов или даже до умеренной нагрузки. Эта настройка осуществляется с добавлением небольшой задержки — то есть вместо немедленной отправки записи продюсер ждёт заданного времени задержки, чтобы отправить другие записи сообщений, которые можно пакетировать. Это можно рассматривать как схожий алгоритм с TCP Nagle. Эта настройка устанавливает более высокую границу задержки для пакетирования: как только мы получаем размер партии batch.size, он отправляется сразу независимо от этой настройки, но если мы получаем сообщение с гораздо меньшим количеством байтов по сравнению с этим, нам нужно «задержать» определённое время, чтобы получить больше сообщений. Эта настройка по умолчанию нулевая, то есть задержки нет. Установка linger.ms=5, например, уменьшит количество запросов, но одновременно увеличивает задержку на 5 мс. | | max.request.size | int | 1028576 | Терпимая | Максимальное количество запрошенных байт. Это также эффективное покрытие для максимального зафиксированного размера. Примечание: сервер имеет собственное переопределяние размеров записей сообщений, отличающееся от этой настройки. Эта настройка ограничивает количество запросов, которые производители могут отправлять массово одновременно, чтобы предотвратить большое количество запросов. | | receive.buffer.bytes | int | 32768 | Терпимая | Размер TCP-receivecache, который используется при чтении данных | | send.buffer.bytes | int | 131072 | Терпимая | Размер кэша отправки TCP, который используется при передаче данных | | timeout.ms | int | 30000 | Терпимая | Эта опция контролирует максимальное время, в течение которого сервер ожидает подтверждения от подписчиков. Если количество подтверждённых запросов не выполнено в течение этого времени, возвращается ошибка. Этот лимит тайм-аута измеряется на серверной стороне и не имеет сетевой задержки, включая запросы | | block.on.buffer.full | Булев | true | низкий | Когда наш кэш памяти заканчивается, мы должны перестать получать новые записи сообщений или выдавать ошибки. По умолчанию это установлено как true, однако некоторые блокировки могут быть не оправданы, поэтому лучше сразу выбросить ошибку. Это происходит при установке false: продюсер выдает ошибку исключения: BufferExhaustedException, если запись отправлена и кэш заполнен | | metadata.fetch.timeout.ms | Долго | 60000 | низкий | Он относится к данным о первом времени некоторых элементов, которые мы получили. Элементы включают: тему, хост, разделы. Эта конфигурация относится к времени, необходимому для успешного завершения элемента согласно извлечению, иначе клиенту будет отправлено исключение. | | metadata.max.age.ms | Долго | 300000 | низкий | Время в микросекундах — это интервал, на котором мы вынужденно обновляем метаданные. Даже если мы не увидим изменений в руководстве после раздела. | | metric.reporters | Список | [] | низкий | Список классов, используемых для измерения метрик. Внедрение интерфейса MetricReporter позволит добавлять классы, которые меняются по мере генерации новых метрик. JmxReporter всегда будет включать способ регистрации статистики JMX | | metrics.num.samples | int | 2 | низкий | Количество выборок, используемых для поддержания метрик | | metrics.sample.window.ms | Долго | 30000 | низкий | Система метрик поддерживает настраиваемое количество выборок в корректируемом размере окна. Такая конфигурация, например, настраивает размер окна. Мы можем хранить два образца в течение 30 секунд. Когда окно разворачивается, мы стираем и переписываем самое старое окно | | recoonect.backoff.ms | Долго | 10 | низкий | Когда соединение не выходит из строя, время ожидания при повторном подключении. Это позволяет избежать повторных переподключений клиентов | | retry.backoff.ms | Долго | 100 | низкий | Время ожидания перед попыткой повторить неудачный запрос на продукты. Избегайте застревания в цикле отправки и ошибки.
|
|