KafkaはLinkedInが開発した高性能で分散型メッセージングシステムで、ログ収集、ストリーミングデータ処理、オンラインおよびオフラインのメッセージ配信などのシナリオで広く使われています。 従来のMQとして設計されているわけではありませんが、Kafakaはほとんどの場合、ActiveMQのような従来のメッセージングシステムに代わることができます。
Kafkaはトピックごとにメッセージの流れを整理し、メッセージを保持するサーバーはブローカーと呼ばれ、消費者は1つ以上のトピックに購読できます。 負荷を均衡させるために、トピックのメッセージを複数のパーティションに分割でき、パーティションが多いほどKafkaの並列性とスループットが向上します。
Kafkaクラスターはクラスタを実装するためにzookeeperのサポートを必要とし、zookeeperは最新のkafkaディストリビューションにすでに含まれており、ZookeeperサーバーとKafkaサーバーを同時に起動したり、既存のZookeeperクラスターを利用するために展開できます。
従来のMQとは異なり、消費者は自分でオフセットを保持し、Kafkaからのメッセージを受け取る際は現在のオフセット以降のみメッセージを引き出します。 KafkaのScala/Javaクライアントはすでにこのロジック部分をZookeeperにオフセット保存して実装しています。 各消費者はIDを選択でき、同じIDの消費者は同じメッセージを一度だけ受け取ります。トピックの利用者が全員同じIDを使う場合、それは伝統的なキューです。 各消費者が異なるIDを使う場合、それは伝統的なパブサブです.
復習:
カフカの消費
1. 同じgroup_idの消費者は1人だけがメッセージを消費できます(キューキューモード)
2. 異なるgroup_idの消費者が同じニュースを受け取る
カフカの利点
分散型で非常にスケーラブルです。 Kafkaクラスターは透過的にスケーリングして新しいサーバーをクラスタに追加できます。
高性能です。 Kafkaのパフォーマンスは、ActiveMQやRabbitMQなどの従来のMQ実装、特にバッチ操作をサポートするKafkaを大きく上回っています。 以下の画像はLinkedInの消費者パフォーマンスストレステストの結果を示しています:
フォールトトレランス。 Kafkaの各パーティションからのデータが複数のサーバーに複製されます。 ブローカーが失敗した場合、ZooKeeperサービスは生産者と消費者に通知し、消費者は別のブローカーに切り替えます。
カフカの欠点:
メッセージを繰り返して。 カフカは各メッセージが少なくとも一度は届くことを保証しており、その確率は低いものの、同じメッセージが複数回届く可能性はあります。 ニュースは順序が分からない。 パーティション内のメッセージは秩序が保証されますが、トピックに複数のパーティションがある場合、パーティション間のメッセージ配信が秩序であることは保証されません。 複雑さ。 Kafkaはzookeeperクラスタのサポートを必要とし、通常は一般的なメッセージキューよりも高コストな手作業で作成、展開、保守が必要です
.NET/C# メッセージキュー Kafka 操作
まず、.NET Core 3.1を使って、Kafka-ConsumerとKafka-Producerという2つの新しいコンソールプロジェクトを作成します
Nugetを使って、以下のコマンドでConfluent.Kafkaパッケージを参照します。
GitHubアドレス:ハイパーリンクのログインが見えます。
まずProducerプログラムを起動し、消費者を先に起動すると以下のエラーが出ます。
エラーが発生しました:ブローカー:不明なトピックまたはパーティション この記事では、さまざまな設定を扱いますEnableAutoOffsetStore は誤りですつまり、手動でオフセットストレージを設定する(手動確認メッセージに似ています)
消費者は消費後にオフセットストアを設定しません
プロデューサーを使って2つのメッセージを生成し、消費者消費をオンにし、MaxPollIntervalMs = 10000 // 10秒を手動設定なしで設定し、他のクライアントに消費を許可します。もちろん、10秒以内に他のクライアントに消費されることはありません
MaxPollIntervalMs が説明しています
高度な消費者向けには、通話間でメッセージを消費する最大時間(例:rd_kafka_consumer_poll()))が設定されています。 この区間を超えると、コンシューマーは失敗したとみなされ、グループは再バランスされ、パーティションは別のコンシューマーグループメンバーに再割り当てされます。 警告:現時点でオフセットコミットはできない場合があります。 注意:長時間処理しているアプリケーションには「enable.auto.offset.store=false」を設定し、メッセージ処理後にオフセットを明示的に保存する(offsets_store())を用いて*、処理完了前にオフセットが自動的にコミットされないようにすることが推奨されます。 1秒ごとに2回ずつ確認してください。 詳細はKIP-62をご覧ください。 レンダリングは以下の通りです:
オフセットストアは消費者の支出を終えた後に設定されます
コード
セットアップが完了した後、10秒待ってもまだ動作します最後のメッセージを受け取りました(消費者がブローカーに接続するとき、オフセット位置から消費を開始しますc.Commit(cr) が設定されている場合; 最後のメッセージは繰り返し受信されません。
ソースコードを見る
オフセット+1コミットをコミットし、最終的にLibrdkafka.topic_partition_list_destroy(cOffsets)を呼び出します。
ハイパーリンクのログインが見えます。
ハイパーリンクのログインが見えます。
別のGroupIDを設定しましょう
コマンドラインパラメータで別のGroupIDを設定し、次の画像のようにプロデューサー経由でメッセージを送信してみてください。
clinet1とclient2の両方です過去のメッセージを受け取るそして、プロデューサーがメッセージを送った後、両者はほぼ同時にメッセージを受け取る。
新しい消費者は新しいメッセージしか受け取りません
新しいクライアントに新しいメッセージだけを受け取り、過去のデータを無視するにはどうすればいいですか?
舞台設定は以下の通りです:
以下に示すように:
プロデューサーコード
次のように:
消費者コード
次のように:
ソースコードダウンロード
観光客の皆さん、この投稿の隠された内容を見たい方は、どうぞ 答える
|