Kafka — це високопродуктивна розподілена система обміну повідомленнями, розроблена LinkedIn, яка широко використовується в таких сценаріях, як збір журналів, обробка потокових даних, розповсюдження повідомлень онлайн та офлайн тощо. Хоча Kafaka не розроблений як традиційний MQ, у більшості випадків Kafaka може замінити традиційні системи обміну повідомленнями, такі як ActiveMQ.
Кафка організовує потік повідомлень за темами, а сервер, який зберігає повідомлення, називається брокером, і споживачі можуть підписатися на одну або кілька тем. Щоб збалансувати навантаження, повідомлення теми можна розділити на кілька розділів, і чим більше розділів, тим вищий паралелізм і пропускна здатність Кафки.
Кластери Kafka потребують підтримки Zookeeper для реалізації кластерів, і Zookeeper вже включений до останнього дистрибутиву Kafka, який можна розгорнути для одночасного запуску сервера Zookeeper і сервера Kafka або використання інших існуючих кластерів Zookeeper.
На відміну від традиційного MQ, споживачі повинні самостійно зберігати зсув, а при отриманні повідомлень від Kafka отримувати повідомлення лише після поточного зсуву. Scala/java-клієнт Kafka вже реалізує цю частину логіки, зберігаючи зсув у zookeeper. Кожен споживач може обрати посвідчення особи, і споживачі з таким самим посвідченням отримають те саме повідомлення лише один раз.Якщо споживачі теми всі використовують один і той самий id, це традиційна черга. Якщо кожен споживач використовує свій ID, це традиційний pub-саб.
Огляд:
Споживання Кафки
1. Споживачі однієї й тієї ж group_id, лише один споживач може споживати повідомлення (Режим черги в черзі)
2. Споживачі різних group_id отримують однакові новини
Переваги Kafka
Розподілений і дуже масштабований. Кластери Kafka можна прозоро масштабувати, щоб додати нові сервери до кластера.
Висока продуктивність. Продуктивність Kafka значно перевищує традиційні реалізації MQ, такі як ActiveMQ і RabbitMQ, особливо Kafka, яка також підтримує пакетні операції. Наступне зображення показує результати стрес-тесту продуктивності споживачів LinkedIn:
Стійкість до відмов. Дані з кожного розділу Kafka реплікуються на кілька серверів. Коли брокер зазнає невдачі, сервіс ZooKeeper повідомляє виробника та споживача, які переходять до іншого брокера.
Недоліки Kafka:
Повторюйте повідомлення. Кафка гарантує, що кожне повідомлення буде доставлено принаймні один раз, і хоча шанси невеликі, існує ймовірність, що повідомлення буде доставлено кілька разів. Новини не за порядком. Хоча повідомлення всередині розділу гарантовано впорядковані, якщо тема має кілька розділів, доставка повідомлень між розділами не гарантовано впорядкована. Складність. Kafka потребує підтримки кластерів доглядачів зоопарку, а теми зазвичай потребують ручної праці для створення, розгортання та підтримки дорожче, ніж загальні черги повідомлень
.NET/C# черга повідомлень Операції Kafka
По-перше, використовуйте .NET Core 3.1 для створення двох нових консольних проєктів — Kafka-Consumer і Kafka-Producer
Використовуйте nuget для посилання на пакет Confluent.Kafka ось так, з такою командою:
Адреса GitHub:Вхід за гіперпосиланням видно.
Спочатку запускаємо програму Producer, і якщо спочатку запускаємо споживача, отримаємо таку помилку:
Виникла помилка: Брокер: невідома тема або розділ Ця стаття охоплює налаштуванняEnableAutoOffsetStore є хибним, тобто ручне налаштування зміщеного сховища (подібно до ручного підтвердження)
Споживачі не встановлюють OffsetStore після споживання
Спробуйте використати виробник для створення двох повідомлень, увімкніть споживання споживачів, MaxPollIntervalMs = 10000 // 10 секунд без ручного налаштування, дозвольте іншим клієнтам споживати, звісно, це не буде використано іншими клієнтами протягом 10 секунд
Пояснення MaxPollIntervalMs
Для досвідчених споживачів максимальний час, дозволений для споживання повідомлень між дзвінками (наприклад, rd_kafka_consumer_poll()). Якщо цей інтервал перевищується, споживач вважається невдалим, і група перебалансовується так, що розділ перепризначається іншому учаснику споживчої групи. Увага: Зміщені комміти наразі можуть бути неможливі. Примітка: рекомендується встановити "enable.auto.offset.store=false" для додатків, які обробляються тривалий час, а потім явно зберігати зсув (використовуючи offsets_store()) після обробки повідомлення*, щоб переконатися, що зсув не буде автоматично зафіксований до завершення обробки. Перевіряйте раз на секунду з інтервалом у два. Детальніше див. KIP-62. Зображення такі:
OffsetStore встановлюється після завершення витрат споживачем
код
Після завершення налаштування зачекайте 10 секунд — і все одно будеОтримав останнє повідомлення(Коли споживач підключається до брокера,Починайте споживання з позиції зміщення.Якщо c.Commit(cr) встановлено; Останнє повідомлення не буде надходить неодноразово.
Переглянути вихідний код
зафіксувати коміт offset + 1 і зрештою викликати Librdkafka.topic_partition_list_destroy(cOffsets);
Вхід за гіперпосиланням видно.
Вхід за гіперпосиланням видно.
Встановіть інший GroupId
Спробуйте встановити інший GroupId через параметр командного рядка, а потім надіслати повідомлення через виробника, як показано на наступному зображенні:
І clinet1, і client2Отримуйте історичні повідомлення, і після того, як продюсер надішле повідомлення, обидва майже будутьОтримувати повідомлення одночасно。
Нові споживачі отримують лише нові повідомлення
Як змусити нового клієнта отримувати лише нові повідомлення і ігнорувати історичні дані?
Налаштування такі:
Як показано нижче:
Код продюсера
Наступним чином:
Споживчий код
Наступним чином:
Завантаження вихідного коду
Туристи, якщо ви хочете побачити прихований контент цього допису, будь ласка Відповідь
|