Ця стаття є дзеркальною статтею машинного перекладу, будь ласка, натисніть тут, щоб перейти до оригінальної статті.

Вид: 129246|Відповідь: 17

[Джерело] Операції Kafka для черг повідомлень .NET/C# [з вихідним кодом]

[Копіювати посилання]
Опубліковано 13.04.2021 11:45:31 | | | |
Kafka — це високопродуктивна розподілена система обміну повідомленнями, розроблена LinkedIn, яка широко використовується в таких сценаріях, як збір журналів, обробка потокових даних, розповсюдження повідомлень онлайн та офлайн тощо. Хоча Kafaka не розроблений як традиційний MQ, у більшості випадків Kafaka може замінити традиційні системи обміну повідомленнями, такі як ActiveMQ.

Кафка організовує потік повідомлень за темами, а сервер, який зберігає повідомлення, називається брокером, і споживачі можуть підписатися на одну або кілька тем. Щоб збалансувати навантаження, повідомлення теми можна розділити на кілька розділів, і чим більше розділів, тим вищий паралелізм і пропускна здатність Кафки.

Кластери Kafka потребують підтримки Zookeeper для реалізації кластерів, і Zookeeper вже включений до останнього дистрибутиву Kafka, який можна розгорнути для одночасного запуску сервера Zookeeper і сервера Kafka або використання інших існуючих кластерів Zookeeper.

На відміну від традиційного MQ, споживачі повинні самостійно зберігати зсув, а при отриманні повідомлень від Kafka отримувати повідомлення лише після поточного зсуву. Scala/java-клієнт Kafka вже реалізує цю частину логіки, зберігаючи зсув у zookeeper. Кожен споживач може обрати посвідчення особи, і споживачі з таким самим посвідченням отримають те саме повідомлення лише один раз.Якщо споживачі теми всі використовують один і той самий id, це традиційна черга. Якщо кожен споживач використовує свій ID, це традиційний pub-саб.

Огляд:

Додати ActiveMQ до системних сервісів під Windows
https://www.itsvse.com/thread-6210-1-1.html

КількістьОчікуваних Повідомлень, ПовідомленьВ Черзі, Повідомлення...
https://www.itsvse.com/thread-4954-1-1.html

Короткий зміст інформації про ActiveMQ та RabbitMQ
https://www.itsvse.com/thread-4659-1-1.html

До сервісу додається CentOS ActiveMQ
https://www.itsvse.com/thread-4617-1-1.html

Навчальний посібник Centos 6.2 з 64-бітної інсталяції activemq
https://www.itsvse.com/thread-4616-1-1.html

ActiveMQ5.15.3 не запускається, і повідомляється про UnsupportedClassVersionError
https://www.itsvse.com/thread-4615-1-1.html

Налаштування дозволу теми Activemq
https://www.itsvse.com/thread-4495-1-1.html

Користувач itsvse не має права читати з: ActiveMQ.Advisory.TempQueue,Activ...
https://www.itsvse.com/thread-4476-1-1.html

Клієнт C# ActiveMQ підписується на вихідний код
https://www.itsvse.com/thread-4470-1-1.html

.net/c# activemq встановлює обліковий запис підключення та пароль
https://www.itsvse.com/thread-4282-1-1.html

Встановіть ім'я користувача та пароль для теми ACTIVEMQ і в черзі
https://www.itsvse.com/thread-4281-1-1.html

activemq змінює пароль для керування сайтом
https://www.itsvse.com/thread-4280-1-1.html

Постійний магазин activemq заповнений
https://www.itsvse.com/thread-4125-1-1.html

Приклад операції .NET/C# ActiveMQ [Вихідний код]
https://www.itsvse.com/thread-3907-1-1.html

Конфігурація дозволу користувача Activemq
https://www.itsvse.com/thread-3906-1-1.html

Різниця між чергою activemq і Topic у тому, що
https://www.itsvse.com/thread-3863-1-1.html

. Платформа .Net
https://www.itsvse.com/thread-3452-1-1.html

Налаштування постійної підписки ActiveMQ
https://www.itsvse.com/thread-3451-1-1.html

Обмеження споживчої паралельної обробки RabbitMQ BasicQos
https://www.itsvse.com/thread-4667-1-1.html

rabbitMQ Queue Queue Persistence Message [з вихідним кодом]
https://www.itsvse.com/thread-4657-1-1.html

【Practice】rabbitMQ console для додавання account information
https://www.itsvse.com/thread-4655-1-1.html

Глибокий аналіз механізму відповіді на повідомлення RabbitMQ
https://www.itsvse.com/thread-4639-1-1.html

.net/c# Відключення з'єднання RabbitMQ — відключення та повторне підключення
https://www.itsvse.com/thread-4636-1-1.html

Вступ до трьох режимів обміну (фанаут, прямий і тематичний) RabbitMQ
https://www.itsvse.com/thread-4635-1-1.html

【Practice】RabbitMQ встановлює плагін управління вебом
https://www.itsvse.com/thread-4631-1-1.html

【Practical Combat】RabbitMQ installation tutorial under Windows
https://www.itsvse.com/thread-4630-1-1.html
Споживання Кафки

1. Споживачі однієї й тієї ж group_id, лише один споживач може споживати повідомлення (Режим черги в черзі

2. Споживачі різних group_id отримують однакові новини

Переваги Kafka

Розподілений і дуже масштабований. Кластери Kafka можна прозоро масштабувати, щоб додати нові сервери до кластера.

Висока продуктивність. Продуктивність Kafka значно перевищує традиційні реалізації MQ, такі як ActiveMQ і RabbitMQ, особливо Kafka, яка також підтримує пакетні операції. Наступне зображення показує результати стрес-тесту продуктивності споживачів LinkedIn:

Стійкість до відмов. Дані з кожного розділу Kafka реплікуються на кілька серверів. Коли брокер зазнає невдачі, сервіс ZooKeeper повідомляє виробника та споживача, які переходять до іншого брокера.

Недоліки Kafka:

Повторюйте повідомлення. Кафка гарантує, що кожне повідомлення буде доставлено принаймні один раз, і хоча шанси невеликі, існує ймовірність, що повідомлення буде доставлено кілька разів.
Новини не за порядком. Хоча повідомлення всередині розділу гарантовано впорядковані, якщо тема має кілька розділів, доставка повідомлень між розділами не гарантовано впорядкована.
Складність. Kafka потребує підтримки кластерів доглядачів зоопарку, а теми зазвичай потребують ручної праці для створення, розгортання та підтримки дорожче, ніж загальні черги повідомлень

.NET/C# черга повідомлень Операції Kafka

По-перше, використовуйте .NET Core 3.1 для створення двох нових консольних проєктів — Kafka-Consumer і Kafka-Producer

Використовуйте nuget для посилання на пакет Confluent.Kafka ось так, з такою командою:

Адреса GitHub:Вхід за гіперпосиланням видно.

Спочатку запускаємо програму Producer, і якщо спочатку запускаємо споживача, отримаємо таку помилку:
Виникла помилка: Брокер: невідома тема або розділ

Ця стаття охоплює налаштуванняEnableAutoOffsetStore є хибним, тобто ручне налаштування зміщеного сховища (подібно до ручного підтвердження)

Споживачі не встановлюють OffsetStore після споживання

Спробуйте використати виробник для створення двох повідомлень, увімкніть споживання споживачів, MaxPollIntervalMs = 10000 // 10 секунд без ручного налаштування, дозвольте іншим клієнтам споживати, звісно, це не буде використано іншими клієнтами протягом 10 секунд

Пояснення MaxPollIntervalMs
Для досвідчених споживачів максимальний час, дозволений для споживання повідомлень між дзвінками (наприклад, rd_kafka_consumer_poll()). Якщо цей інтервал перевищується, споживач вважається невдалим, і група перебалансовується так, що розділ перепризначається іншому учаснику споживчої групи. Увага: Зміщені комміти наразі можуть бути неможливі. Примітка: рекомендується встановити "enable.auto.offset.store=false" для додатків, які обробляються тривалий час, а потім явно зберігати зсув (використовуючи offsets_store()) після обробки повідомлення*, щоб переконатися, що зсув не буде автоматично зафіксований до завершення обробки. Перевіряйте раз на секунду з інтервалом у два. Детальніше див. KIP-62.

Зображення такі:



OffsetStore встановлюється після завершення витрат споживачем

код

Після завершення налаштування зачекайте 10 секунд — і все одно будеОтримав останнє повідомлення(Коли споживач підключається до брокера,Починайте споживання з позиції зміщення.Якщо c.Commit(cr) встановлено; Останнє повідомлення не буде надходить неодноразово.

Переглянути вихідний код



зафіксувати коміт offset + 1 і зрештою викликати Librdkafka.topic_partition_list_destroy(cOffsets);

Вхід за гіперпосиланням видно.
Вхід за гіперпосиланням видно.

Встановіть інший GroupId

Спробуйте встановити інший GroupId через параметр командного рядка, а потім надіслати повідомлення через виробника, як показано на наступному зображенні:



І clinet1, і client2Отримуйте історичні повідомлення, і після того, як продюсер надішле повідомлення, обидва майже будутьОтримувати повідомлення одночасно

Нові споживачі отримують лише нові повідомлення

Як змусити нового клієнта отримувати лише нові повідомлення і ігнорувати історичні дані?

Налаштування такі:

Як показано нижче:



Код продюсера

Наступним чином:

Споживчий код

Наступним чином:

Завантаження вихідного коду

Туристи, якщо ви хочете побачити прихований контент цього допису, будь ласкаВідповідь






Попередній:Виняток .NET/C# при використанні Tencent Enterprise Mailbox: Операція закінчилася.
Наступний:NuGet очищає кеш
 Орендодавець| Опубліковано 15.04.2021 09:31:05 |
Коли клієнт .NET Kafka відключається, він не створює виключення і підключається знову після нормального стану мережі
%4|1618450028.267| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Відключено (після 59926мс у штаті UP)
%3|1618450028.267| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Відключено (після 59926мс у штаті UP)
%3|1618450028.267| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Підключитися до ipv4#192.168.1.175:9092 не вдалося: Невідома помилка (після 0 мс у стані CONNECT)
%3|1618450028.268| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Підключення до ipv4#192.168.1.175:9092 не вдалося: Невідома помилка (після 0 мс у стані CONNECT)
%3|1618450028.357| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Підключитися до ipv4#192.168.1.175:9092 не вдалося: Невідома помилка (після 10 мс у стані CONNECT, 1 ідентична помилка пригнічена)
%3|1618450028.357| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Підключитися до ipv4#192.168.1.175:9092 не вдалося: Невідома помилка (після 10 мс у стані CONNECT, 1 ідентична помилка приглушена)
%3|1618450062.882| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Підключитися до ipv4#192.168.1.175:9092 не вдалося: Невідома помилка (після 0 мс у стані CONNECT, 8 ідентичних помилок пригнічено)
%3|1618450062.882| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Підключитися до ipv4#192.168.1.175:9092 не вдалося: Невідома помилка (після 0 мс у стані CONNECT, 8 ідентичних помилок пригнічено)
%3|1618450098.255| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Підключитися до ipv4#192.168.1.175:9092 не вдалося: Невідома помилка (після 11 мс у стані CONNECT, 4 однакові помилки пригнічено)
%3|1618450098.255| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Підключитися до ipv4#192.168.1.175:9092 не вдалося: Невідома помилка (після 11 мс у стані CONNECT, 4 ідентичні помилки пригнічено)
%3|1618450138.243| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Підключення до ipv4#192.168.1.175:9092 не вдалося: Невідома помилка (після 0 мс у стані CONNECT, 4 ідентичні помилки приглушено)
%3|1618450138.244| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Підключитися до ipv4#192.168.1.175:9092 не вдалося: Невідома помилка (після 0 мс у стані CONNECT, 4 ідентичні помилки приглушено)
%3|1618450168.254| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Підключитися до ipv4#192.168.1.175:9092 не вдалося: Невідома помилка (після 10 мс у стані CONNECT, 3 ідентичні помилки пригнічено)
%3|1618450168.254| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Підключитися до ipv4#192.168.1.175:9092 не вдалося: Невідома помилка (після 10 мс у стані CONNECT, 3 ідентичні помилки приглушено)

 Орендодавець| Опубліковано 13.04.2021 16:26:58 |
Принцип споживання повідомлень:

У реальному виробничому процесі кожна тема матиме кілька розділів, і перевага кількох розділів полягає в тому, що, з одного боку, можливість шардингу даних на брокері ефективно зменшує пропускну здатність повідомлень і покращує продуктивність IO. З іншого боку, щоб підвищити споживчу потужність, одна й та сама тема зазвичай споживається через кількох споживачів, тобто механізм балансування навантаження на стороні споживача, що ми й зрозуміємо далі: як споживачі споживають повідомлення у випадку кількох розділів і кількох споживачів? Кафка існує у концепції споживчих груп, тобто group.id однакових споживачів, які належать до споживчої групи, і всі споживачі в групі координуються для споживання всіх розділів теми підписки. Звісно, кожен розділ може бути використаний лише споживачами однієї й тієї ж групи споживачів, тож як споживачі однієї споживчої групи розподіляють дані, в яких слід споживати розділ? Для простого прикладу: якщо є розбиття, які програють, тобто коли кількість споживачів дорівнює кількості споживачів, кожен споживач відповідає розбиттю; якщо кількість споживачів більша за розбиття, то додаткова кількість споживачів не працюватиме, навпаки, будуть споживачі, які споживають кілька розбиттів.

Стратегія зонування:

У Kafka існують дві стратегії розподілу розділів: одна — Range (за замовчуванням), а інша — RoundRobin (опитування). Це встановлюється параметром конфігурації споживача partition.assignment.strategy.


Переглянути всі теми


Переглянути деталі теми




 Орендодавець| Опубліковано 08.05.2021 17:17:33 |
Kafka видаляє споживчі групи



Видалення запитаних споживчих груп ('itsvse') було успішним.


Можна повідомити про такі помилки:

Error: Deletion of some consumer groups failed:
* Група 'itsvse' не могла бути видалена через: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: Група не є порожньою.
рішення

Споживайте всі повідомлення або встановіть зміщення

Кафка вручну встановлює зміщення зсуву
https://www.itsvse.com/thread-9641-1-1.html
Потім видаліть його знову!

 Орендодавець| Опубліковано 13.04.2021 15:40:48 |
Команда power shell



Кожен клієнт-споживач підтримує 2 підключення до сервісу Kafka
 Орендодавець| Опубліковано 07.05.2021 12:37:06 |
kafka, щоб переглянути кількість стеків тем у певній групі

Опубліковано 16.06.2021 12:41:09 |
Будь ласка, запитайте, чому код не можна переглянути~
 Орендодавець| Опубліковано 25.06.2021 10:50:06 |
Кафка отримує команду розміру теми:



 Орендодавець| Опубліковано 18.07.2021 10:15:01 |
Командний рядок Kafka для створення тем:

Опубліковано 03.09.2021 11:52:41 |
У Кафці ще багато пасток, як це зрозумілося.
Застереження:
Усе програмне забезпечення, програмні матеріали або статті, опубліковані Code Farmer Network, призначені лише для навчання та досліджень; Вищезазначений контент не повинен використовуватися в комерційних чи незаконних цілях, інакше користувачі несуть усі наслідки. Інформація на цьому сайті надходить з Інтернету, і спори щодо авторських прав не мають до цього сайту. Ви повинні повністю видалити вищезазначений контент зі свого комп'ютера протягом 24 годин після завантаження. Якщо вам подобається програма, будь ласка, підтримуйте справжнє програмне забезпечення, купуйте реєстрацію та отримайте кращі справжні послуги. Якщо є будь-яке порушення, будь ласка, зв'яжіться з нами електронною поштою.

Mail To:help@itsvse.com