Эта статья является зеркальной статьёй машинного перевода, пожалуйста, нажмите здесь, чтобы перейти к оригиналу.

Вид: 129246|Ответ: 17

[Источник] Операции Kafka для очередей сообщений .NET/C# [с исходным кодом]

[Скопировать ссылку]
Опубликовано 13.04.2021 11:45:31 | | | |
Kafka — это высокопроизводительная распределённая система обмена сообщениями, разработанная LinkedIn, широко используемая в таких ситуациях, как сбор журналов, обработка потоковых данных, распространение сообщений в интернете и офлайн-формате и многое другое. Хотя Kafaka не разработана как традиционная MQ, в большинстве случаев она может заменить традиционные системы обмена сообщениями, такие как ActiveMQ.

Kafka организует поток сообщений по темам, а сервер, на котором они хранятся, называется брокером, и потребители могут подписаться на одну или несколько тем. Для балансировки нагрузки сообщения темы можно разделить на несколько разделов, и чем больше разделов, тем выше параллелизм и пропускная способность Кафки.

Кластеры Kafka требуют поддержки Zookeeper для реализации кластеров, и Zookeeper уже включен в последний дистрибутив Kafka, который можно развернуть для запуска сервера Zookeeper и сервера Kafka одновременно, либо использовать другие существующие кластеры Zookeeper.

В отличие от традиционного MQ, потребителям нужно самостоятельно держать офсет, а при получении сообщений от Kafka — брать сообщения только после текущего смещения. Scala/java-клиент Kafka уже реализует эту часть логики, сохраняя смещение в zookeeper. Каждый потребитель может выбрать удостоверение личности, и пользователи с таким же удостоверением получат одно и то же сообщение только один раз.Если потребители темы используют один и тот же идентификатор, это традиционная очередь. Если каждый потребитель использует свой идентификатор, это традиционный паб-саб.

Обзор:

Добавить ActiveMQ в системные сервисы под Windows
https://www.itsvse.com/thread-6210-1-1.html

Количество ожидающих сообщений, сообщений в очереди, сообщений...
https://www.itsvse.com/thread-4954-1-1.html

Краткое изложение информации о ActiveMQ и RabbitMQ
https://www.itsvse.com/thread-4659-1-1.html

В сервис добавлен CentOS ActiveMQ
https://www.itsvse.com/thread-4617-1-1.html

Tutorial Activemq по 64-битной установке Centos 6.2
https://www.itsvse.com/thread-4616-1-1.html

ActiveMQ5.15.3 не запускается, и сообщается о ошибке UnsupportedClassVersionError
https://www.itsvse.com/thread-4615-1-1.html

Настройки разрешений на тему Activemq
https://www.itsvse.com/thread-4495-1-1.html

Пользователь itsvse не имеет права читать с: ActiveMQ.Advisory.TempQueue,Activ...
https://www.itsvse.com/thread-4476-1-1.html

Клиент C# ActiveMQ подписывается на исходный код
https://www.itsvse.com/thread-4470-1-1.html

.NET/C# Activemq для установки аккаунта подключения и пароля
https://www.itsvse.com/thread-4282-1-1.html

Задайте имя пользователя и пароль для темы ACTIVEMQ и очереди
https://www.itsvse.com/thread-4281-1-1.html

activemq изменяет пароль управления сайтом
https://www.itsvse.com/thread-4280-1-1.html

activemq Persistent Store заполнен
https://www.itsvse.com/thread-4125-1-1.html

Пример операции .NET/C# ActiveMQ [Исходный код]
https://www.itsvse.com/thread-3907-1-1.html

Конфигурация разрешения пользователя Activemq
https://www.itsvse.com/thread-3906-1-1.html

Разница между очередью activemq и Topic в том, что
https://www.itsvse.com/thread-3863-1-1.html

. Платформа .Net
https://www.itsvse.com/thread-3452-1-1.html

Настройки постоянной подписки ActiveMQ
https://www.itsvse.com/thread-3451-1-1.html

Ограничение потребительской параллельной обработки RabbitMQ BasicQos
https://www.itsvse.com/thread-4667-1-1.html

rabbitMQ Queue Queue Message Persistence [с исходным кодом]
https://www.itsvse.com/thread-4657-1-1.html

【Practice】rabbitMQ console для добавления информации о аккаунте
https://www.itsvse.com/thread-4655-1-1.html

Глубокий анализ механизма отклика на сообщения RabbitMQ
https://www.itsvse.com/thread-4639-1-1.html

.net/c# Отключение соединения RabbitMQ — отключение и повторное подключение
https://www.itsvse.com/thread-4636-1-1.html

Введение в три режима обмена (фанаут, прямой и тематический) RabbitMQ
https://www.itsvse.com/thread-4635-1-1.html

【Practice】RabbitMQ устанавливает плагин управления вебом
https://www.itsvse.com/thread-4631-1-1.html

【Practical Combat】RabbitMQ installation tutorial under Windows
https://www.itsvse.com/thread-4630-1-1.html
Потребление кафки

1. Потребители одного и того же group_id, только один пользователь может потреблять сообщения (Режим очереди

2. Потребители разных group_id получают одни и те же новости

Преимущества Kafka

Распределённый и высокомасштабируемый. Кластеры Kafka можно прозрачно масштабировать для добавления новых серверов в кластер.

Высокая производительность. Производительность Kafka значительно превосходит традиционные реализации MQ, такие как ActiveMQ и RabbitMQ, особенно Kafka, которая также поддерживает пакетные операции. Следующее изображение показывает результаты стресс-теста производительности потребителей LinkedIn:

Отказостойкость. Данные с каждого раздела в Kafka реплицируются на несколько серверов. Когда брокер терпит неудачу, сервис ZooKeeper уведомляет производителя и потребителя, после чего они переходят к другому брокеру.

Недостатки Kafka:

Повторяйте сообщения. Кафка гарантирует, что каждое сообщение будет доставлено хотя бы один раз, и хотя шансы невелики, есть шанс, что сообщение будет доставлено несколько раз.
Новости не по порядку. Хотя сообщения внутри раздела гарантированно упорядочены, если тема состоит из нескольких разделов, доставка сообщений между разделами не гарантирует упорядочения.
Сложность. Kafka требует поддержки кластеров смотрителей зоопарка, а темы обычно требуют ручного труда для создания, развертывания и поддержания дороже, чем обычные очереди сообщений

.NET/C# очередь сообщений Операции Кафки

Во-первых, используйте .NET Core 3.1 для создания двух новых консольных проектов — Kafka-Consumer и Kafka-Producer

Используйте nuget для ссылки на пакет Confluent.Kafka вот так, с помощью следующей команды:

Адрес на GitHub:Вход по гиперссылке виден.

Сначала запускаем программу Producer, и если первым запускаем Consumer, получим следующую ошибку:
Произошла ошибка: Брокер: неизвестная тема или раздел

Эта статья будет охватывать настройкиEnableAutoOffsetStore ложен, то есть вручную устанавливать смещённое хранилище (аналогично ручному подтверждению)

Потребители не устанавливают OffsetStore после потребления

Попробуйте использовать производитель для создания двух сообщений, включите потребительское потребление, MaxPollIntervalMs = 10000 // 10 секунд без ручной настройки, позвольте другим клиентам потреблять, конечно, они не будут потреблены другими клиентами в течение 10 секунд

MaxPollIntervalMs объясняет
Для продвинутых пользователей максимальное время, позволяемое на поглощение сообщений между звонками (например, rd_kafka_consumer_poll()). Если этот интервал превышается, потребитель считается неудачным, и группа перебалансируется так, чтобы раздел переназначается другому участнику потребительской группы. Предупреждение: офсетные коммиты могут быть невозможны в данный момент. Примечание: рекомендуется установить «enable.auto.offset.store=false» для приложений, которые долго обрабатывают, а затем явно сохранять смещение (используя offsets_store()) после обработки сообщения*, чтобы убедиться, что смещение не будет автоматически закреплено до завершения обработки. Проверяйте раз в секунду с интервалом в два. Для получения дополнительной информации см. KIP-62.

Визуализации следующие:



OffsetStore устанавливается после окончания расходов потребителя

код

После завершения настройки подождите 10 секунд, и всё равно будет работатьПолучил последнее сообщение(Когда потребитель соединяется с брокером,Начинайте расход с смещённой позицииЕсли c.Commit(cr) установлен; Последнее сообщение не будет получать повторно.

Просмотр исходного кода



фиксировать фиксацию смещения + 1 и в конечном итоге вызывать Librdkafka.topic_partition_list_destroy(cOffsets);

Вход по гиперссылке виден.
Вход по гиперссылке виден.

Установите другой GroupId

Попробуйте установить другой GroupId через параметр командной строки, а затем отправьте сообщение через продюсера, как показано на следующем изображении:



И clinet1, и client2Получение исторических сообщений, и после того, как продюсер отправит сообщение, оба почти будутПолучать сообщения одновременно

Новые потребители получают только новые сообщения

Как заставить нового клиента получать только новые сообщения и игнорировать исторические данные?

Настройки следующие:

Как показано ниже:



Код продюсера

Следующим образом:

Потребительский код

Следующим образом:

Скачать исходный код

Туристы, если вы хотите увидеть скрытое содержание этого поста, пожалуйстаОтвет






Предыдущий:.NET/C# Исключение, использующее почтовый ящик Tencent Enterprise: Операция завершилась.
Следующий:NuGet очищает кэш
 Хозяин| Опубликовано 15.04.2021 9:31:05 |
Когда клиент .NET Kafka отключён, он не выдает исключения и подключается снова после того, как сеть в норме
%4|1618450028.267| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Отключено (после 59926мс в штате UP)
%3|1618450028.267| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Отключено (после 59926 мс в штате UP)
%3|1618450028.267| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Подключение к ipv4#192.168.1.175:9092 не удалось: Неизвестная ошибка (после 0 мс в состоянии CONNECT)
%3|1618450028.268| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Подключение к ipv4#192.168.1.175:9092 не удалось: Неизвестная ошибка (после 0 мс в состоянии CONNECT)
%3|1618450028.357| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Подключение к ipv4#192.168.1.175:9092 не удалось: Неизвестная ошибка (после 10 мс в состоянии CONNECT, 1 идентичная ошибка подавлена)
%3|1618450028.357| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Подключение к ipv4#192.168.1.175:9092 не удалось: Неизвестная ошибка (после 10 мс в состоянии CONNECT, 1 идентичная ошибка подавлена)
%3|1618450062.882| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Подключиться к ipv4#192.168.1.175:9092 не удалось: Неизвестная ошибка (после 0 мс в состоянии CONNECT, 8 идентичных ошибок подавлены)
%3|1618450062.882| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Подключение к ipv4#192.168.1.175:9092 не удалось: Неизвестная ошибка (после 0 мс в состоянии CONNECT, 8 идентичных ошибок подавлены)
%3|1618450098.255| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Подключение к ipv4#192.168.1.175:9092 не удалось: Неизвестная ошибка (после 11 мс в состоянии CONNECT, 4 идентичные ошибки подавлены)
%3|1618450098.255| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Подключиться к ipv4#192.168.1.175:9092 не удалось: Неизвестная ошибка (после 11 мс в состоянии CONNECT, 4 одинаковые ошибки подавлены)
%3|1618450138.243| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Подключение к ipv4#192.168.1.175:9092 не удалось: Неизвестная ошибка (после 0 мс в состоянии CONNECT, 4 одинаковые ошибки подавлены)
%3|1618450138.244| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Подключиться к ipv4#192.168.1.175:9092 не удалось: Неизвестная ошибка (после 0 мс в состоянии CONNECT, 4 одинаковые ошибки подавлены)
%3|1618450168.254| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Подключение к ipv4#192.168.1.175:9092 не удалось: Неизвестная ошибка (после 10 мс в состоянии CONNECT, 3 идентичные ошибки подавлены)
%3|1618450168.254| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Подключиться к ipv4#192.168.1.175:9092 не удалось: Ошибка неизвестна (после 10 мс в состоянии CONNECT, 3 идентичные ошибки подавлены)

 Хозяин| Опубликовано 13.04.2021 16:26:58 |
Принцип потребления сообщений:

В процессе производства каждая тема будет содержать несколько разделов, и преимущество нескольких разделов в том, что, с одной стороны, возможность шардинга данных на брокере эффективно снижает ёмкость сообщений и повышает производительность ввода-вывода. С другой стороны, чтобы повысить потребляющую способность потребительской стороны, одна и та же тема обычно будет восприниматься через нескольких потребителей, то есть механизм балансировки нагрузки со стороны потребителя, что и будет дальше: как потребители воспринимают сообщения в случае нескольких разделов и нескольких потребителей? Кафка существует в концепции потребительских групп, то есть group.id одного и того же типа потребителей, которые относятся к потребительской группе, и все потребители в группе координируют потребление всех разделов темы подписки. Конечно, каждый раздел может использоваться только потребителями одной и той же группы потребителей, так как же потребители из одной потребительской группы распределяют данные, в которых должен быть использован раздел? Для простого примера: если есть разбиения, проигрывают, то есть когда количество партитонов совпадает с количеством потребителей, каждый потребляющий соответствует разбиению; если число потребляющих больше разбиения, то дополнительное количество потребителей не работает, наоборот, будут потребляющие несколько разбиний.

Стратегия распределения зонирования:

В Кафке существуют две стратегии распределения по делениям: одна — Диапазон (по умолчанию), а другая — RoundRobin (опросы). Это задаётся параметром конфигурации потребителя partition.assignment.strategy.


Смотреть все темы


Подробнее по теме




 Хозяин| Опубликовано 08.05.2021 17:17:33 |
Kafka удаляет потребительские группы



Удаление запрошенных потребительских групп ('itsvse') прошло успешно.


Могут быть указаны следующие ошибки:

Error: Deletion of some consumer groups failed:
* Группа 'itsvse' не могла быть удалена из-за: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: Группа не является пустой.
решение

Потребляйте все сообщения или устанавливайте смещение

Kafka вручную устанавливает смещение
https://www.itsvse.com/thread-9641-1-1.html
А потом удалите это снова!

 Хозяин| Опубликовано 13.04.2021 15:40:48 |
Команда силовой оболочки



Каждый потребительский клиент поддерживает 2 подключения к сервису Kafka
 Хозяин| Опубликовано 07.05.2021 12:37:06 |
kafka, чтобы просмотреть количество стопок тем в определённой группе

Опубликовано 16.06.2021 12:41:09 |
Пожалуйста, спросите, почему код нельзя посмотреть~
 Хозяин| Опубликовано 25.06.2021 10:50:06 |
Кафка получает команду размера темы:



 Хозяин| Опубликовано 18.07.2021 10:15:01 |
Командная строка Kafka для создания тем:

Опубликовано 03.09.2021 11:52:41 |
В Кафке всё ещё много ловушек, как это было сделано
Отказ:
Всё программное обеспечение, программные материалы или статьи, публикуемые Code Farmer Network, предназначены исключительно для учебных и исследовательских целей; Вышеуказанный контент не должен использоваться в коммерческих или незаконных целях, иначе пользователи несут все последствия. Информация на этом сайте взята из Интернета, и споры по авторским правам не имеют отношения к этому сайту. Вы должны полностью удалить вышеуказанный контент с компьютера в течение 24 часов после загрузки. Если вам нравится программа, пожалуйста, поддержите подлинное программное обеспечение, купите регистрацию и получите лучшие подлинные услуги. Если есть нарушение, пожалуйста, свяжитесь с нами по электронной почте.

Mail To:help@itsvse.com