Kafka е високопроизводителна, разпределена система за съобщения, разработена от LinkedIn, която се използва широко в сценарии като събиране на логове, обработка на поточни данни, онлайн и офлайн разпространение на съобщения и други. Въпреки че не е проектиран като традиционен MQ, Kafaka може да замени традиционните системи за съобщения като ActiveMQ в повечето случаи.
Kafka организира потока от съобщения по теми, а сървърът, който ги съхранява, се нарича брокер, и потребителите могат да се абонират за една или повече теми. За да се балансира натоварването, посланията на дадена тема могат да бъдат разделени на множество дялове, а колкото повече партиции, толкова по-високи са паралелизмът и пропускателната способност на Кафка.
Kafka клъстерите изискват поддръжка от Zookeeper за реализиране на клъстери, а Zookeeper вече е включен в най-новата дистрибуция на Kafka, която може да се внедрява едновременно за стартиране на сървър Zookeeper и Kafka сървър, или да използва други съществуващи клъстери на Zookeeper.
За разлика от традиционния MQ, потребителите трябва сами да поддържат офсет и при получаване на съобщения от Kafka да изтеглят съобщения само след текущия офсет. Scala/java клиентът на Kafka вече реализира тази част от логиката, като запазва offset в zookeeper. Всеки потребител може да избере лична карта, а потребителите със същия ID ще получат едно и също съобщение само веднъж.Ако потребителите на дадена тема използват един и същ идентификатор, това е традиционна опашка. Ако всеки потребител използва различен идентификатор, това е традиционен pub-sub.
Преглед:
Консумация на кафка
1. Потребителите на една и съща group_id, само един потребител може да консумира съобщения (Режим на опашка)
2. Потребителите на различни group_id получават едни и същи новини
Предимства на Kafka
Разпределени и силно мащабируеми. Kafka клъстерите могат да се мащабират прозрачно, за да добавят нови сървъри към клъстера.
Висока производителност. Производителността на Kafka значително надминава тази на традиционните MQ реализации като ActiveMQ и RabbitMQ, особено Kafka, която също поддържа пакетни операции. Следното изображение показва резултатите от стрес теста за потребителска производителност на LinkedIn:
Толерантност към грешки. Данните от всеки дял в Kafka се репликират към няколко сървъра. Когато брокерът се провали, услугата ZooKeeper ще уведоми производителя и потребителя, които преминават към друг брокер.
Недостатъци на Kafka:
Повтаряйте съобщенията. Кафка гарантира, че всяко съобщение ще бъде доставено поне веднъж, и макар шансовете да са малки, има шанс съобщението да бъде доставено многократно. Новините са извън ред. Въпреки че съобщенията вътре в дял гарантирано са подредени, ако една тема има няколко дяла, доставката на съобщения между тях не е гарантирано подредена. Сложност. Kafka се нуждае от подкрепата на клъстерите на зоопазителите, а темите обикновено изискват ръчен труд за създаване, внедряване и поддръжка, по-скъпи от общите опашки за съобщения
.NET/C# опашка за съобщения Кафка операции
Първо, използвайте .NET Core 3.1, за да създадете два нови конзолни проекта – Kafka-Consumer и Kafka-Producer
Използвайте nuget, за да реферирате пакета Confluent.Kafka по този начин, с следната команда:
GitHub адрес:Входът към хиперлинк е видим.
Първо стартираме програмата Producer, а ако стартираме първо потребителя, получаваме следната грешка:
Възникна грешка: Брокер: Неизвестна тема или дял Тази статия ще погълне настройкитеEnableAutoOffsetStore е невярен, тоест ръчно задаване на офсет паметта (подобно на съобщение за ръчно потвърждение)
Потребителите не задават OffsetStore след консумация
Опитайте се да използвате производителя, за да създадете две съобщения, да включите потребителско потребление, MaxPollIntervalMs = 10000 // 10 секунди без ръчна настройка, позволете на други клиенти да консумират, разбира се, няма да бъде консумирано от други клиенти в рамките на 10 секунди
MaxPollIntervalMs обяснява
За напредналите потребители максималното време за консумиране на съобщения между разговорите (например rd_kafka_consumer_poll()). Ако този интервал бъде превишен, потребителят се счита за провалил се и групата се ребалансира, така че дялът да бъде преназначен на друг потребителски член на групата. Внимание: Офсет комити може да не са възможни в момента. Забележка: Препоръчва се да се задава "enable.auto.offset.store=false" за приложения, които обработват дълго време, и след това изрично да се съхранява отклонението (с offsets_store()) след обработката на съобщението*, за да се гарантира, че офсетът няма да бъде автоматично потвърден преди да приключи обработката. Проверявайте веднъж в секунда на интервали от две. За повече информация вижте KIP-62. Визуализациите са както следва:
OffsetStore се задава след като потребителят приключи с разходите си
код
След като настройката приключи, изчакай 10 секунди и пак ще свърши работаПолучих последното съобщение(Когато потребителят се свърже с брокера,Започнете консумацията от офсетната позицияАко c.Commit(cr) е зададен; Последното съобщение няма да се получава многократно.
Вижте изходния код
commit commit offset + 1 и накрая извикай Librdkafka.topic_partition_list_destroy(cOffsets);
Входът към хиперлинк е видим.
Входът към хиперлинк е видим.
Задайте различен GroupId
Опитайте да зададете друг GroupId чрез параметъра на командния ред и след това изпратете съобщение през производителя, както е показано на следното изображение:
И clinet1, и client2Получаване на исторически съобщения, и след като продуцентът изпрати съобщение, и двете почти ще бъдатПолучаване на съобщения едновременно。
Новите потребители получават само нови съобщения
Как да накарате нов клиент да получава само нови съобщения и да игнорира исторически данни?
Настройките са следните:
Както е показано по-долу:
Код на продуцента
Както следва:
Потребителски код
Както следва:
Изтегляне на изходния код
Туристи, ако искате да видите скритото съдържание на този пост, моля Отговор
|