|
|
Опубликовано 13.04.2021 11:45:31
|
|
|
|

Kafka — это высокопроизводительная распределённая система обмена сообщениями, разработанная LinkedIn, широко используемая в таких ситуациях, как сбор журналов, обработка потоковых данных, распространение сообщений в интернете и офлайн-формате и многое другое. Хотя Kafaka не разработана как традиционная MQ, в большинстве случаев она может заменить традиционные системы обмена сообщениями, такие как ActiveMQ.
Kafka организует поток сообщений по темам, а сервер, на котором они хранятся, называется брокером, и потребители могут подписаться на одну или несколько тем. Для балансировки нагрузки сообщения темы можно разделить на несколько разделов, и чем больше разделов, тем выше параллелизм и пропускная способность Кафки.
Кластеры Kafka требуют поддержки Zookeeper для реализации кластеров, и Zookeeper уже включен в последний дистрибутив Kafka, который можно развернуть для запуска сервера Zookeeper и сервера Kafka одновременно, либо использовать другие существующие кластеры Zookeeper.
В отличие от традиционного MQ, потребителям нужно самостоятельно держать офсет, а при получении сообщений от Kafka — брать сообщения только после текущего смещения. Scala/java-клиент Kafka уже реализует эту часть логики, сохраняя смещение в zookeeper. Каждый потребитель может выбрать удостоверение личности, и пользователи с таким же удостоверением получат одно и то же сообщение только один раз.Если потребители темы используют один и тот же идентификатор, это традиционная очередь. Если каждый потребитель использует свой идентификатор, это традиционный паб-саб.
Обзор:
Потребление кафки
1. Потребители одного и того же group_id, только один пользователь может потреблять сообщения (Режим очереди)
2. Потребители разных group_id получают одни и те же новости
Преимущества Kafka
Распределённый и высокомасштабируемый. Кластеры Kafka можно прозрачно масштабировать для добавления новых серверов в кластер.
Высокая производительность. Производительность Kafka значительно превосходит традиционные реализации MQ, такие как ActiveMQ и RabbitMQ, особенно Kafka, которая также поддерживает пакетные операции. Следующее изображение показывает результаты стресс-теста производительности потребителей LinkedIn:
Отказостойкость. Данные с каждого раздела в Kafka реплицируются на несколько серверов. Когда брокер терпит неудачу, сервис ZooKeeper уведомляет производителя и потребителя, после чего они переходят к другому брокеру.
Недостатки Kafka:
Повторяйте сообщения. Кафка гарантирует, что каждое сообщение будет доставлено хотя бы один раз, и хотя шансы невелики, есть шанс, что сообщение будет доставлено несколько раз. Новости не по порядку. Хотя сообщения внутри раздела гарантированно упорядочены, если тема состоит из нескольких разделов, доставка сообщений между разделами не гарантирует упорядочения. Сложность. Kafka требует поддержки кластеров смотрителей зоопарка, а темы обычно требуют ручного труда для создания, развертывания и поддержания дороже, чем обычные очереди сообщений
.NET/C# очередь сообщений Операции Кафки
Во-первых, используйте .NET Core 3.1 для создания двух новых консольных проектов — Kafka-Consumer и Kafka-Producer
Используйте nuget для ссылки на пакет Confluent.Kafka вот так, с помощью следующей команды:
Адрес на GitHub:Вход по гиперссылке виден.
Сначала запускаем программу Producer, и если первым запускаем Consumer, получим следующую ошибку:
Произошла ошибка: Брокер: неизвестная тема или раздел Эта статья будет охватывать настройкиEnableAutoOffsetStore ложен, то есть вручную устанавливать смещённое хранилище (аналогично ручному подтверждению)
Потребители не устанавливают OffsetStore после потребления
Попробуйте использовать производитель для создания двух сообщений, включите потребительское потребление, MaxPollIntervalMs = 10000 // 10 секунд без ручной настройки, позвольте другим клиентам потреблять, конечно, они не будут потреблены другими клиентами в течение 10 секунд
MaxPollIntervalMs объясняет
Для продвинутых пользователей максимальное время, позволяемое на поглощение сообщений между звонками (например, rd_kafka_consumer_poll()). Если этот интервал превышается, потребитель считается неудачным, и группа перебалансируется так, чтобы раздел переназначается другому участнику потребительской группы. Предупреждение: офсетные коммиты могут быть невозможны в данный момент. Примечание: рекомендуется установить «enable.auto.offset.store=false» для приложений, которые долго обрабатывают, а затем явно сохранять смещение (используя offsets_store()) после обработки сообщения*, чтобы убедиться, что смещение не будет автоматически закреплено до завершения обработки. Проверяйте раз в секунду с интервалом в два. Для получения дополнительной информации см. KIP-62. Визуализации следующие:
OffsetStore устанавливается после окончания расходов потребителя
код
После завершения настройки подождите 10 секунд, и всё равно будет работатьПолучил последнее сообщение(Когда потребитель соединяется с брокером,Начинайте расход с смещённой позицииЕсли c.Commit(cr) установлен; Последнее сообщение не будет получать повторно.
Просмотр исходного кода
фиксировать фиксацию смещения + 1 и в конечном итоге вызывать Librdkafka.topic_partition_list_destroy(cOffsets);
Вход по гиперссылке виден.
Вход по гиперссылке виден.
Установите другой GroupId
Попробуйте установить другой GroupId через параметр командной строки, а затем отправьте сообщение через продюсера, как показано на следующем изображении:
И clinet1, и client2Получение исторических сообщений, и после того, как продюсер отправит сообщение, оба почти будутПолучать сообщения одновременно。
Новые потребители получают только новые сообщения
Как заставить нового клиента получать только новые сообщения и игнорировать исторические данные?
Настройки следующие:
Как показано ниже:
Код продюсера
Следующим образом:
Потребительский код
Следующим образом:
Скачать исходный код
Туристы, если вы хотите увидеть скрытое содержание этого поста, пожалуйста Ответ
|
Предыдущий:.NET/C# Исключение, использующее почтовый ящик Tencent Enterprise: Операция завершилась.Следующий:NuGet очищает кэш
|