この記事は機械翻訳のミラー記事です。元の記事にジャンプするにはこちらをクリックしてください。

眺める: 129246|答える: 17

[出典] .NET/C#メッセージキュー用のKafka操作[ソースコード付き]

[リンクをコピー]
掲載地 2021/04/13 11:45:31 | | | |
KafkaはLinkedInが開発した高性能で分散型メッセージングシステムで、ログ収集、ストリーミングデータ処理、オンラインおよびオフラインのメッセージ配信などのシナリオで広く使われています。 従来のMQとして設計されているわけではありませんが、Kafakaはほとんどの場合、ActiveMQのような従来のメッセージングシステムに代わることができます。

Kafkaはトピックごとにメッセージの流れを整理し、メッセージを保持するサーバーはブローカーと呼ばれ、消費者は1つ以上のトピックに購読できます。 負荷を均衡させるために、トピックのメッセージを複数のパーティションに分割でき、パーティションが多いほどKafkaの並列性とスループットが向上します。

Kafkaクラスターはクラスタを実装するためにzookeeperのサポートを必要とし、zookeeperは最新のkafkaディストリビューションにすでに含まれており、ZookeeperサーバーとKafkaサーバーを同時に起動したり、既存のZookeeperクラスターを利用するために展開できます。

従来のMQとは異なり、消費者は自分でオフセットを保持し、Kafkaからのメッセージを受け取る際は現在のオフセット以降のみメッセージを引き出します。 KafkaのScala/Javaクライアントはすでにこのロジック部分をZookeeperにオフセット保存して実装しています。 各消費者はIDを選択でき、同じIDの消費者は同じメッセージを一度だけ受け取ります。トピックの利用者が全員同じIDを使う場合、それは伝統的なキューです。 各消費者が異なるIDを使う場合、それは伝統的なパブサブです.

復習:

Windows上でシステムサービスにActiveMQを追加する
https://www.itsvse.com/thread-6210-1-1.html

保留中のメッセージ数、待ち受けているメッセージ、メッセージタグ...
https://www.itsvse.com/thread-4954-1-1.html

ActiveMQおよびRabbitMQに関する情報概要
https://www.itsvse.com/thread-4659-1-1.html

CentOS ActiveMQがサービスに追加されました
https://www.itsvse.com/thread-4617-1-1.html

Centos 6.2 64ビットインストール activemqチュートリアル
https://www.itsvse.com/thread-4616-1-1.html

ActiveMQ5.15.3が起動に失敗し、UnsupportedClassVersionErrorが報告されます
https://www.itsvse.com/thread-4615-1-1.html

ActiveMQトピック権限設定
https://www.itsvse.com/thread-4495-1-1.html

ユーザーのitsvseは以下の場所から読み取る権限がありません:ActiveMQ.Advisory.TempQueue, Activ...
https://www.itsvse.com/thread-4476-1-1.html

C# ActiveMQクライアントはソースコードをサブスクライブします
https://www.itsvse.com/thread-4470-1-1.html

.net/c# activemq:接続アカウントとパスワードを設定するために
https://www.itsvse.com/thread-4282-1-1.html

ACTIVEMQテーマとキューのユーザー名とパスワードを設定します
https://www.itsvse.com/thread-4281-1-1.html

ActiveMQはウェブサイト管理パスワードを変更します
https://www.itsvse.com/thread-4280-1-1.html

activemqの永続ストアは満員です
https://www.itsvse.com/thread-4125-1-1.html

.NET/C# ActiveMQ操作例【ソースコード】
https://www.itsvse.com/thread-3907-1-1.html

Activemqユーザー権限設定
https://www.itsvse.com/thread-3906-1-1.html

activemqキューとトピックの違いは次の通りです。
https://www.itsvse.com/thread-3863-1-1.html

. .Netプラットフォーム
https://www.itsvse.com/thread-3452-1-1.html

ActiveMQの永続的サブスクリプション設定
https://www.itsvse.com/thread-3451-1-1.html

RabbitMQ BasicQos 消費者並列処理の制限
https://www.itsvse.com/thread-4667-1-1.html

rabbitMQ キューキューメッセージの永続性[ソースコード付き]
https://www.itsvse.com/thread-4657-1-1.html

【プラクティス】rabbitMQコンソールでアカウント情報を追加する
https://www.itsvse.com/thread-4655-1-1.html

RabbitMQメッセージ応答のメカニズムの詳細な分析
https://www.itsvse.com/thread-4639-1-1.html

.net/c# RabbitMQ接続切断 - 切断と再接続
https://www.itsvse.com/thread-4636-1-1.html

RabbitMQの3つの交換モード(ファンアウト、ダイレクト、トピック)の紹介
https://www.itsvse.com/thread-4635-1-1.html

【Practice】RabbitMQがウェブ管理プラグインをインストールする
https://www.itsvse.com/thread-4631-1-1.html

【実戦】RabbitMQのWindowsインストールチュートリアル
https://www.itsvse.com/thread-4630-1-1.html
カフカの消費

1. 同じgroup_idの消費者は1人だけがメッセージを消費できます(キューキューモード

2. 異なるgroup_idの消費者が同じニュースを受け取る

カフカの利点

分散型で非常にスケーラブルです。 Kafkaクラスターは透過的にスケーリングして新しいサーバーをクラスタに追加できます。

高性能です。 Kafkaのパフォーマンスは、ActiveMQやRabbitMQなどの従来のMQ実装、特にバッチ操作をサポートするKafkaを大きく上回っています。 以下の画像はLinkedInの消費者パフォーマンスストレステストの結果を示しています:

フォールトトレランス。 Kafkaの各パーティションからのデータが複数のサーバーに複製されます。 ブローカーが失敗した場合、ZooKeeperサービスは生産者と消費者に通知し、消費者は別のブローカーに切り替えます。

カフカの欠点:

メッセージを繰り返して。 カフカは各メッセージが少なくとも一度は届くことを保証しており、その確率は低いものの、同じメッセージが複数回届く可能性はあります。
ニュースは順序が分からない。 パーティション内のメッセージは秩序が保証されますが、トピックに複数のパーティションがある場合、パーティション間のメッセージ配信が秩序であることは保証されません。
複雑さ。 Kafkaはzookeeperクラスタのサポートを必要とし、通常は一般的なメッセージキューよりも高コストな手作業で作成、展開、保守が必要です

.NET/C# メッセージキュー Kafka 操作

まず、.NET Core 3.1を使って、Kafka-ConsumerとKafka-Producerという2つの新しいコンソールプロジェクトを作成します

Nugetを使って、以下のコマンドでConfluent.Kafkaパッケージを参照します。

GitHubアドレス:ハイパーリンクのログインが見えます。

まずProducerプログラムを起動し、消費者を先に起動すると以下のエラーが出ます。
エラーが発生しました:ブローカー:不明なトピックまたはパーティション

この記事では、さまざまな設定を扱いますEnableAutoOffsetStore は誤りですつまり、手動でオフセットストレージを設定する(手動確認メッセージに似ています)

消費者は消費後にオフセットストアを設定しません

プロデューサーを使って2つのメッセージを生成し、消費者消費をオンにし、MaxPollIntervalMs = 10000 // 10秒を手動設定なしで設定し、他のクライアントに消費を許可します。もちろん、10秒以内に他のクライアントに消費されることはありません

MaxPollIntervalMs が説明しています
高度な消費者向けには、通話間でメッセージを消費する最大時間(例:rd_kafka_consumer_poll()))が設定されています。 この区間を超えると、コンシューマーは失敗したとみなされ、グループは再バランスされ、パーティションは別のコンシューマーグループメンバーに再割り当てされます。 警告:現時点でオフセットコミットはできない場合があります。 注意:長時間処理しているアプリケーションには「enable.auto.offset.store=false」を設定し、メッセージ処理後にオフセットを明示的に保存する(offsets_store())を用いて*、処理完了前にオフセットが自動的にコミットされないようにすることが推奨されます。 1秒ごとに2回ずつ確認してください。 詳細はKIP-62をご覧ください。

レンダリングは以下の通りです:



オフセットストアは消費者の支出を終えた後に設定されます

コード

セットアップが完了した後、10秒待ってもまだ動作します最後のメッセージを受け取りました(消費者がブローカーに接続するとき、オフセット位置から消費を開始しますc.Commit(cr) が設定されている場合; 最後のメッセージは繰り返し受信されません。

ソースコードを見る



オフセット+1コミットをコミットし、最終的にLibrdkafka.topic_partition_list_destroy(cOffsets)を呼び出します。

ハイパーリンクのログインが見えます。
ハイパーリンクのログインが見えます。

別のGroupIDを設定しましょう

コマンドラインパラメータで別のGroupIDを設定し、次の画像のようにプロデューサー経由でメッセージを送信してみてください。



clinet1とclient2の両方です過去のメッセージを受け取るそして、プロデューサーがメッセージを送った後、両者はほぼ同時にメッセージを受け取る

新しい消費者は新しいメッセージしか受け取りません

新しいクライアントに新しいメッセージだけを受け取り、過去のデータを無視するにはどうすればいいですか?

舞台設定は以下の通りです:

以下に示すように:



プロデューサーコード

次のように:

消費者コード

次のように:

ソースコードダウンロード

観光客の皆さん、この投稿の隠された内容を見たい方は、どうぞ答える






先の:Tencent Enterprise Mailboxを使用した.NET/C#例外:操作がタイムアウトしました。
次に:NuGetがキャッシュをクリアします
 地主| 掲載地 2021/04/15 9:31:05 |
.NET Kafkaクライアントが切断されると例外は出さず、ネットワークが正常になった後に再接続されます
%4|1618450028.267|FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1:切断(UP州で59926ms以降)
%3|1618450028.267|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: 切断(州UPで59926ms以降)
%3|1618450028.267|FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 失敗:不明エラー(状態CONNECTに0ms後)
%3|1618450028.268|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: 接続 to ipv4#192.168.1.175:9092 失敗: 不明エラー(状態 CONNECTに0ms後)
%3|1618450028.357|FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: IPv4#192.168.1.175:9092 接続失敗:不明エラー(CONNECTの状態で10ms経過後、同一のエラー1件抑制)
%3|1618450028.357|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 失敗:不明エラー(状態CONNECTで10ms経過後、同一のエラー1件抑制)
%3|1618450062.882|FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: ipv4#192.168.1.175:9092 に接続が失敗しました:不明エラー(CONNECTの状態が0ms経過後、同一のエラー8件抑制)
%3|1618450062.882|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 失敗:不明エラー(状態CONNECTで0ms経過後、同一エラー8件抑制)
%3|1618450098.255|FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: IPv4#192.168.1.175:9092に接続失敗:不明エラー(CONNECTの状態で11ms後、同一のエラー4件抑制)
%3|1618450098.255|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: IPv4への接続失敗#192.168.1.175:9092失敗:不明エラー(CONNECT状態11ms後、同一のエラー4件抑制)
%3|1618450138.243|FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: IPv4#192.168.1.175:9092 に接続が失敗:不明エラー(CONNECTの状態に0ms後、同一のエラー4件抑制)
%3|1618450138.244|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 失敗:不明エラー(状態CONNECTで0ms経過後、同一エラー4件抑制)
%3|1618450168.254|FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: IPv4に接続#192.168.1.175:9092に失敗:不明エラー(状態CONNECTで10ms後、同一のエラー3件抑制)
%3|1618450168.254|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 失敗:不明エラー(状態CONNECTで10ms経過後、同一エラー3件抑制)

 地主| 掲載地 2021/04/13 16:26:58 |
メッセージ消費の原理:

実際の本番プロセスでは、各トピックに複数のパーティションがあり、複数パーティションの利点は、一方でブローカー上でデータをシャーディングできる能力によってメッセージの容量を効果的に減らし、IO性能を向上させることです。 一方で、消費者側の消費力を向上させるためには、同じトピックを複数の消費者、すなわち消費者側の負荷分散メカニズムを通じて消費します。これは次に理解するテーマです。複数のパーティションや複数の消費者の場合、消費者はどのようにメッセージを消費するのか? カフカは消費者グループの概念に存在します。すなわち、消費者グループに属する同じ種類の消費者 group.id、そのグループ内のすべての消費者がサブスクリプショントピックのすべての分割を消費するように協調します。 もちろん、各パーティションは同じ消費者グループの消費者のみが消費できるので、同じ消費者グループの消費者はどのパーティションで消費すべきかをどのように割り当てるのでしょうか? 簡単な例として、パーティションが失われている場合、すなわちパーティトンの数がコンシューマーの数と同じ場合、各コンシューマーはパーティションに対応します。コンシューマーの数がパーティションより多ければ、その余った数のコンシューマーは機能しません。逆に、複数のパーティションを消費するコンシューマーが存在します。

ゾーニング割り当て戦略:

カフカには2つの分割割り当て戦略があり、1つはレンジ(デフォルト)で、もう1つはラウンドロビン(ポーリング)です。 これは消費者の構成 partition.assignment.strategy パラメータによって設定されます。


すべてのトピックを見る


トピックの詳細を見る




 地主| 掲載地 2021/05/08 17:17:33 |
Kafkaは消費者グループを削除します



リクエストされた消費者グループ(「itsvse」)の削除は成功しました。


以下の誤りが報告されることがあります:

Error: Deletion of some consumer groups failed:
* グループ「itsvse」は以下の理由で削除できませんでした:java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: グループは空ではありません。
解決

すべてのメッセージを消費するか、オフセットを設定するか

カフカは手動でオフセットオフセットを設定します
https://www.itsvse.com/thread-9641-1-1.html
それから、また削除してください!

 地主| 掲載地 2021/04/13 15:40:48 |
パワーシェルコマンド



各消費者クライアントはKafkaサービスに対して2つの接続を維持しています
 地主| 掲載地 2021/05/07 12:37:06 |
Kafkaは、指定されたグループ内のトピックのスタック数を表示するために

掲載地 2021/06/16 12:41:09 |
なぜコードが見られないのか、ぜひお尋ねください~
 地主| 掲載地 2021/06/25 10:50:06 |
カフカはトピックサイズコマンドを受け取ります:



 地主| 掲載地 2021/07/18 10:15:01 |
トピックを作成するためのKafkaコマンドライン:

掲載地 2021/09/03 11:52:41 |
カフカにはまだ多くの落とし穴があることが学ばれています
免責事項:
Code Farmer Networkが発行するすべてのソフトウェア、プログラミング資料、記事は学習および研究目的のみを目的としています。 上記の内容は商業的または違法な目的で使用されてはならず、そうでなければ利用者はすべての結果を負うことになります。 このサイトの情報はインターネットからのものであり、著作権紛争はこのサイトとは関係ありません。 ダウンロード後24時間以内に上記の内容を完全にパソコンから削除してください。 もしこのプログラムを気に入ったら、正規のソフトウェアを支持し、登録を購入し、より良い本物のサービスを受けてください。 もし侵害があれば、メールでご連絡ください。

Mail To:help@itsvse.com