Тази статия е огледална статия за машинен превод, моля, кликнете тук, за да преминете към оригиналната статия.

Изглед: 129246|Отговор: 17

[Източник] Кафка операции за .NET/C# опашки за съобщения [с изходен код]

[Копирай линк]
Публикувано в 13.04.2021 г. 11:45:31 ч. | | | |
Kafka е високопроизводителна, разпределена система за съобщения, разработена от LinkedIn, която се използва широко в сценарии като събиране на логове, обработка на поточни данни, онлайн и офлайн разпространение на съобщения и други. Въпреки че не е проектиран като традиционен MQ, Kafaka може да замени традиционните системи за съобщения като ActiveMQ в повечето случаи.

Kafka организира потока от съобщения по теми, а сървърът, който ги съхранява, се нарича брокер, и потребителите могат да се абонират за една или повече теми. За да се балансира натоварването, посланията на дадена тема могат да бъдат разделени на множество дялове, а колкото повече партиции, толкова по-високи са паралелизмът и пропускателната способност на Кафка.

Kafka клъстерите изискват поддръжка от Zookeeper за реализиране на клъстери, а Zookeeper вече е включен в най-новата дистрибуция на Kafka, която може да се внедрява едновременно за стартиране на сървър Zookeeper и Kafka сървър, или да използва други съществуващи клъстери на Zookeeper.

За разлика от традиционния MQ, потребителите трябва сами да поддържат офсет и при получаване на съобщения от Kafka да изтеглят съобщения само след текущия офсет. Scala/java клиентът на Kafka вече реализира тази част от логиката, като запазва offset в zookeeper. Всеки потребител може да избере лична карта, а потребителите със същия ID ще получат едно и също съобщение само веднъж.Ако потребителите на дадена тема използват един и същ идентификатор, това е традиционна опашка. Ако всеки потребител използва различен идентификатор, това е традиционен pub-sub.

Преглед:

Добавете ActiveMQ към системните услуги под Windows
https://www.itsvse.com/thread-6210-1-1.html

БройНа чакащитеСъобщения, СъобщенияВ Опашка, Съобщения...
https://www.itsvse.com/thread-4954-1-1.html

Обобщение на информацията за ActiveMQ и RabbitMQ
https://www.itsvse.com/thread-4659-1-1.html

CentOS ActiveMQ е добавен към услугата
https://www.itsvse.com/thread-4617-1-1.html

Centos 6.2 64-битов инсталационен туториал за activemq
https://www.itsvse.com/thread-4616-1-1.html

ActiveMQ5.15.3 не стартира и се докладва UnsupportedClassVersionError
https://www.itsvse.com/thread-4615-1-1.html

Настройки за разрешение за теми в Activemq
https://www.itsvse.com/thread-4495-1-1.html

Потребителят itsvse няма право да чете от: ActiveMQ.Advisory.TempQueue,Activ...
https://www.itsvse.com/thread-4476-1-1.html

C# ActiveMQ клиентът се абонира за изходния код
https://www.itsvse.com/thread-4470-1-1.html

.NET/C# Activemq за настройка на акаунта за връзка и паролата
https://www.itsvse.com/thread-4282-1-1.html

Задайте потребителското име и паролата за темата ACTIVEMQ и опашката
https://www.itsvse.com/thread-4281-1-1.html

activemq променя паролата за управление на уебсайта
https://www.itsvse.com/thread-4280-1-1.html

activemq Persistent Store е пълен
https://www.itsvse.com/thread-4125-1-1.html

Пример за .NET/C# ActiveMQ операция [Изходен код]
https://www.itsvse.com/thread-3907-1-1.html

Конфигурация на потребителски разрешения в Activemq
https://www.itsvse.com/thread-3906-1-1.html

Разликата между activemq Queue и Topic е, че
https://www.itsvse.com/thread-3863-1-1.html

. .Net платформа
https://www.itsvse.com/thread-3452-1-1.html

Постоянни абонаментни настройки на ActiveMQ
https://www.itsvse.com/thread-3451-1-1.html

Ограничение на потребителската паралелна обработка в RabbitMQ BasicQos
https://www.itsvse.com/thread-4667-1-1.html

rabbitMQ Опашка за опашка Запазване на съобщенията [с изходен код]
https://www.itsvse.com/thread-4657-1-1.html

【Practice】rabbitMQ console за добавяне на информация за акаунта
https://www.itsvse.com/thread-4655-1-1.html

Задълбочен анализ на механизма на отговор на съобщенията в RabbitMQ
https://www.itsvse.com/thread-4639-1-1.html

.net/c# RabbitMQ прекъсване на връзката - прекъсване и повторно свързване
https://www.itsvse.com/thread-4636-1-1.html

Въведение в трите режима на обмен (фанаут, директен и тематичен режим) на RabbitMQ
https://www.itsvse.com/thread-4635-1-1.html

【Practice】RabbitMQ инсталира плъгина за управление на уеб
https://www.itsvse.com/thread-4631-1-1.html

【Practical Combat】RabbitMQ инсталационен урок под Windows
https://www.itsvse.com/thread-4630-1-1.html
Консумация на кафка

1. Потребителите на една и съща group_id, само един потребител може да консумира съобщения (Режим на опашка

2. Потребителите на различни group_id получават едни и същи новини

Предимства на Kafka

Разпределени и силно мащабируеми. Kafka клъстерите могат да се мащабират прозрачно, за да добавят нови сървъри към клъстера.

Висока производителност. Производителността на Kafka значително надминава тази на традиционните MQ реализации като ActiveMQ и RabbitMQ, особено Kafka, която също поддържа пакетни операции. Следното изображение показва резултатите от стрес теста за потребителска производителност на LinkedIn:

Толерантност към грешки. Данните от всеки дял в Kafka се репликират към няколко сървъра. Когато брокерът се провали, услугата ZooKeeper ще уведоми производителя и потребителя, които преминават към друг брокер.

Недостатъци на Kafka:

Повтаряйте съобщенията. Кафка гарантира, че всяко съобщение ще бъде доставено поне веднъж, и макар шансовете да са малки, има шанс съобщението да бъде доставено многократно.
Новините са извън ред. Въпреки че съобщенията вътре в дял гарантирано са подредени, ако една тема има няколко дяла, доставката на съобщения между тях не е гарантирано подредена.
Сложност. Kafka се нуждае от подкрепата на клъстерите на зоопазителите, а темите обикновено изискват ръчен труд за създаване, внедряване и поддръжка, по-скъпи от общите опашки за съобщения

.NET/C# опашка за съобщения Кафка операции

Първо, използвайте .NET Core 3.1, за да създадете два нови конзолни проекта – Kafka-Consumer и Kafka-Producer

Използвайте nuget, за да реферирате пакета Confluent.Kafka по този начин, с следната команда:

GitHub адрес:Входът към хиперлинк е видим.

Първо стартираме програмата Producer, а ако стартираме първо потребителя, получаваме следната грешка:
Възникна грешка: Брокер: Неизвестна тема или дял

Тази статия ще погълне настройкитеEnableAutoOffsetStore е невярен, тоест ръчно задаване на офсет паметта (подобно на съобщение за ръчно потвърждение)

Потребителите не задават OffsetStore след консумация

Опитайте се да използвате производителя, за да създадете две съобщения, да включите потребителско потребление, MaxPollIntervalMs = 10000 // 10 секунди без ръчна настройка, позволете на други клиенти да консумират, разбира се, няма да бъде консумирано от други клиенти в рамките на 10 секунди

MaxPollIntervalMs обяснява
За напредналите потребители максималното време за консумиране на съобщения между разговорите (например rd_kafka_consumer_poll()). Ако този интервал бъде превишен, потребителят се счита за провалил се и групата се ребалансира, така че дялът да бъде преназначен на друг потребителски член на групата. Внимание: Офсет комити може да не са възможни в момента. Забележка: Препоръчва се да се задава "enable.auto.offset.store=false" за приложения, които обработват дълго време, и след това изрично да се съхранява отклонението (с offsets_store()) след обработката на съобщението*, за да се гарантира, че офсетът няма да бъде автоматично потвърден преди да приключи обработката. Проверявайте веднъж в секунда на интервали от две. За повече информация вижте KIP-62.

Визуализациите са както следва:



OffsetStore се задава след като потребителят приключи с разходите си

код

След като настройката приключи, изчакай 10 секунди и пак ще свърши работаПолучих последното съобщение(Когато потребителят се свърже с брокера,Започнете консумацията от офсетната позицияАко c.Commit(cr) е зададен; Последното съобщение няма да се получава многократно.

Вижте изходния код



commit commit offset + 1 и накрая извикай Librdkafka.topic_partition_list_destroy(cOffsets);

Входът към хиперлинк е видим.
Входът към хиперлинк е видим.

Задайте различен GroupId

Опитайте да зададете друг GroupId чрез параметъра на командния ред и след това изпратете съобщение през производителя, както е показано на следното изображение:



И clinet1, и client2Получаване на исторически съобщения, и след като продуцентът изпрати съобщение, и двете почти ще бъдатПолучаване на съобщения едновременно

Новите потребители получават само нови съобщения

Как да накарате нов клиент да получава само нови съобщения и да игнорира исторически данни?

Настройките са следните:

Както е показано по-долу:



Код на продуцента

Както следва:

Потребителски код

Както следва:

Изтегляне на изходния код

Туристи, ако искате да видите скритото съдържание на този пост, моляОтговор






Предишен:.NET/C# Изключение при използване на Tencent Enterprise Mailbox: Операцията е изтекла.
Следващ:NuGet изчиства кеша
 Хазяин| Публикувано в 15.04.2021 г. 9:31:05 ч. |
Когато .NET Kafka клиентът е прекъснат, той не прави изключение и се свързва отново след като мрежата е нормална
%4|1618450028.267| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Прекъснат (след 59926ms в щата UP)
%3|1618450028.267| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Прекъснат (след 59926ms в щат UP)
%3|1618450028.267| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Свързване към ipv4#192.168.1.175:9092 неуспешно: Неизвестна грешка (след 0ms в състояние CONNECT)
%3|1618450028.268| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Свързване към ipv4#192.168.1.175:9092 неуспешно: Неизвестна грешка (след 0ms в състояние CONNECT)
%3|1618450028.357| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Свързване към ipv4#192.168.1.175:9092 неуспешно: Неизвестна грешка (след 10ms в състояние CONNECT, 1 идентична грешка(и) потисната)
%3|1618450028.357| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Свързване към ipv4#192.168.1.175:9092 неуспешно: Неизвестна грешка (след 10ms в състояние CONNECT, 1 идентична грешка(и) потисната)
%3|1618450062.882| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Свързване към ipv4#192.168.1.175:9092 неуспешно: Неизвестна грешка (след 0ms в състояние CONNECT, 8 идентични грешки потиснати)
%3|1618450062.882| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Свързване към ipv4#192.168.1.175:9092 неуспешно: Неизвестна грешка (след 0ms в състояние CONNECT, 8 идентични грешки потиснати)
%3|1618450098.255| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Свързване към ipv4#192.168.1.175:9092 неуспешно: Неизвестна грешка (след 11ms в състояние CONNECT, 4 идентични грешки потиснати)
%3|1618450098.255| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Свързване към ipv4#192.168.1.175:9092 неуспешно: Неизвестна грешка (след 11ms в състояние CONNECT, 4 идентични грешки потиснати)
%3|1618450138.243| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Свързване към ipv4#192.168.1.175:9092 неуспешно: Неизвестна грешка (след 0ms в състояние CONNECT, 4 идентични грешки потиснати)
%3|1618450138.244| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Свързване към ipv4#192.168.1.175:9092 неуспешно: Неизвестна грешка (след 0ms в състояние CONNECT, 4 идентични грешки потиснати)
%3|1618450168.254| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Свързване към ipv4#192.168.1.175:9092 неуспешно: Неизвестна грешка (след 10ms в състояние CONNECT, 3 идентични грешки потиснати)
%3|1618450168.254| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Свързване към ipv4#192.168.1.175:9092 неуспешно: Неизвестна грешка (след 10ms в състояние CONNECT, 3 идентични грешки потиснати)

 Хазяин| Публикувано в 13.04.2021 г. 16:26:58 ч. |
Принцип на консумация на съобщения:

В реалния производствен процес всяка тема ще има множество дяла, а предимството на множество дялове е, че от една страна възможността да се шардират данните върху брокера ефективно намалява капацитета на съобщенията и подобрява производителността на IO. От друга страна, за да се подобри консумационната мощ на потребителската страна, същата тема обикновено се консумира чрез множество потребители, тоест механизмът за балансиране на натоварването на потребителската страна, което ще разберем следващо: как потребителите консумират съобщения в случай на множество дялове и множество потребители? Кафка съществува в концепцията за потребителски групи, тоест group.id един и същ тип потребители, които принадлежат към потребителска група, и всички потребители в групата координират да консумират всички части на абонаментната тема. Разбира се, всеки дял може да бъде консумиран само от потребителите в една и съща потребителска група, така че как потребителите в същата потребителска група разпределят данните, в които трябва да се консумира дялът? За прост пример, ако има разклонения, които губят, тоест когато броят на партитоните е равен на броя на консуматорите, всеки консуматор съответства на разпределение; ако броят на консуматорите е по-голям от разпределенията, тогава допълнителният брой консуматори няма да работи, напротив, ще има потребители, които консумират множество разпределения.

Стратегия за разпределение на зонирането:

В kafka има две стратегии за разпределение на разделяне – едната е Range (по подразбиране), а другата е RoundRobin (анкетиране). Това се задава от конфигурационния параметър partition.assignment.strategy на потребителя.


Вижте всички теми


Вижте подробности за дадена тема




 Хазяин| Публикувано в 8.05.2021 г. 17:17:33 ч. |
Kafka изтрива потребителски групи



Изтриването на поискани потребителски групи ("itsvse") беше успешно.


Могат да бъдат докладвани следните грешки:

Error: Deletion of some consumer groups failed:
* Групата 'itsvse' не можа да бъде изтрита поради: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: Групата не е празна.
решение

Консумирайте всички съобщения или задайте офсет

Kafka ръчно задава офсетния офсет
https://www.itsvse.com/thread-9641-1-1.html
После го изтрий отново!

 Хазяин| Публикувано в 13.04.2021 г. 15:40:48 ч. |
Команда за power shell



Всеки потребителски клиент поддържа 2 връзки към услугата Kafka
 Хазяин| Публикувано в 7.05.2021 г. 12:37:06 ч. |
kafka, за да се види броят стекове от теми под определена група

Публикувано в 16.06.2021 г. 12:41:09 ч. |
Моля, попитайте защо кодът не може да бъде прегледан~
 Хазяин| Публикувано в 25.06.2021 г. 10:50:06 ч. |
Кафка получава командата за размер на темата:



 Хазяин| Публикувано в 18.07.2021 г. 10:15:01 ч. |
Kafka команден ред за създаване на теми:

Публикувано в 3.09.2021 г. 11:52:41 ч. |
Все още има много капани в кафката, научени
Отричане:
Целият софтуер, програмни материали или статии, публикувани от Code Farmer Network, са само за учебни и изследователски цели; Горното съдържание не трябва да се използва за търговски или незаконни цели, в противен случай потребителите ще понесат всички последствия. Информацията на този сайт идва от интернет, а споровете за авторски права нямат нищо общо с този сайт. Трябва напълно да изтриете горното съдържание от компютъра си в рамките на 24 часа след изтеглянето. Ако ви харесва програмата, моля, подкрепете оригинален софтуер, купете регистрация и получете по-добри услуги. Ако има нарушение, моля, свържете се с нас по имейл.

Mail To:help@itsvse.com