Artikel ini adalah artikel cermin dari terjemahan mesin, silakan klik di sini untuk melompat ke artikel aslinya.

Melihat: 129246|Jawab: 17

[Sumber] Operasi Kafka untuk antrean pesan .NET/C# [dengan kode sumber]

[Salin tautan]
Diposting pada 13/04/2021 11.45.31 | | | |
Kafka adalah sistem perpesanan terdistribusi berkinerja tinggi yang dikembangkan oleh LinkedIn yang banyak digunakan dalam skenario seperti pengumpulan log, pemrosesan data streaming, distribusi pesan online dan offline, dan banyak lagi. Meskipun tidak dirancang sebagai MQ tradisional, Kafaka dapat menggantikan sistem perpesanan tradisional seperti ActiveMQ dalam banyak kasus.

Kafka mengatur alur pesan berdasarkan topik, dan server yang menyimpan pesan disebut broker, dan konsumen dapat berlangganan satu atau lebih topik. Untuk menyeimbangkan beban, pesan dari suatu topik dapat dibagi menjadi beberapa partisi, dan semakin banyak partisi, semakin tinggi paralelisme dan throughput Kafka.

Klaster Kafka memerlukan dukungan zookeeper untuk mengimplementasikan kluster, dan zookeeper sudah termasuk dalam distribusi Kafka terbaru, yang dapat digunakan untuk memulai server zookeeper dan server Kafka secara bersamaan, atau menggunakan kluster zookeeper lain yang ada.

Tidak seperti MQ tradisional, konsumen perlu menyimpan offset sendiri, dan ketika mendapatkan pesan dari kafka, hanya menarik pesan setelah offset saat ini. Klien scala/java Kafka sudah mengimplementasikan bagian logika ini dengan menyimpan offset ke penjaga kebun binatang. Setiap konsumen dapat memilih ID, dan konsumen dengan ID yang sama hanya akan menerima pesan yang sama satu kali.Jika konsumen dari suatu topik semua menggunakan id yang sama, itu adalah Antrean tradisional. Jika setiap konsumen menggunakan ID yang berbeda, itu adalah pub-sub tradisional.

Resensi:

Tambahkan ActiveMQ ke layanan sistem di bawah Windows
https://www.itsvse.com/thread-6210-1-1.html

NumberOfPendingMessages, MessagesEnqueued, Messag...
https://www.itsvse.com/thread-4954-1-1.html

Ringkasan informasi tentang ActiveMQ dan RabbitMQ
https://www.itsvse.com/thread-4659-1-1.html

CentOS ActiveMQ ditambahkan ke layanan
https://www.itsvse.com/thread-4617-1-1.html

Centos 6.2 64-bit instalasi activemq tutorial
https://www.itsvse.com/thread-4616-1-1.html

ActiveMQ5.15.3 gagal dimulai, dan UnsupportedClassVersionError dilaporkan
https://www.itsvse.com/thread-4615-1-1.html

Pengaturan izin topik Activemq
https://www.itsvse.com/thread-4495-1-1.html

Pengguna tidak berwenang untuk membaca dari: ActiveMQ.Advisory.TempQueue,Activ...
https://www.itsvse.com/thread-4476-1-1.html

Klien C# ActiveMQ berlangganan kode sumber
https://www.itsvse.com/thread-4470-1-1.html

.net/c# activemq untuk mengatur akun koneksi dan kata sandi
https://www.itsvse.com/thread-4282-1-1.html

Atur nama pengguna dan kata sandi untuk tema dan antrean ACTIVEMQ
https://www.itsvse.com/thread-4281-1-1.html

ActiveMQ memodifikasi kata sandi manajemen situs web
https://www.itsvse.com/thread-4280-1-1.html

activemq Penyimpanan persisten penuh
https://www.itsvse.com/thread-4125-1-1.html

Contoh operasi .NET/C# ActiveMQ [Kode sumber]
https://www.itsvse.com/thread-3907-1-1.html

Konfigurasi izin pengguna Activemq
https://www.itsvse.com/thread-3906-1-1.html

Perbedaan antara Queue dan Topic activemq adalah bahwa
https://www.itsvse.com/thread-3863-1-1.html

. Platform .Net
https://www.itsvse.com/thread-3452-1-1.html

Pengaturan langganan persisten ActiveMQ
https://www.itsvse.com/thread-3451-1-1.html

Batas pemrosesan paralel konsumen RabbitMQ BasicQos
https://www.itsvse.com/thread-4667-1-1.html

rabbitMQ Queue Queue Message Persistence [dengan kode sumber]
https://www.itsvse.com/thread-4657-1-1.html

【Latihan】Konsol rabbitMQ untuk menambahkan informasi akun
https://www.itsvse.com/thread-4655-1-1.html

Analisis mendalam tentang mekanisme respons pesan RabbitMQ
https://www.itsvse.com/thread-4639-1-1.html

.net/c# Pemutusan koneksi RabbitMQ - pemutusan dan koneksi ulang
https://www.itsvse.com/thread-4636-1-1.html

Pengantar tiga mode pertukaran (fanout, langsung, dan topik) RabbitMQ
https://www.itsvse.com/thread-4635-1-1.html

【Latihan】RabbitMQ menginstal plugin manajemen web
https://www.itsvse.com/thread-4631-1-1.html

【Pertempuran Praktis】 Tutorial instalasi RabbitMQ di bawah Windows
https://www.itsvse.com/thread-4630-1-1.html
Konsumsi kafka

1. Konsumen dari group_id yang sama, hanya satu konsumen yang dapat mengonsumsi pesan (Mode Antrean

2. Konsumen dari berbagai group_id menerima berita yang sama

Keuntungan dari Kafka

Terdistribusi dan sangat terukur. Kluster Kafka dapat diskalakan secara transparan untuk menambahkan server baru ke kluster.

Kinerja tinggi. Kinerja Kafka jauh melebihi implementasi MQ tradisional seperti ActiveMQ dan RabbitMQ, terutama Kafka, yang juga mendukung operasi batch. Gambar berikut menunjukkan hasil uji stres kinerja konsumen LinkedIn:

Toleransi kesalahan. Data dari setiap partisi di Kafka direplikasi ke beberapa server. Ketika broker gagal, layanan ZooKeeper akan memberi tahu produsen dan konsumen, yang beralih ke broker lain.

Kekurangan Kafka:

Ulangi pesan. Kafka hanya menjamin bahwa setiap pesan akan dikirimkan setidaknya sekali, dan meskipun kemungkinannya tipis, ada kemungkinan pesan akan terkirim beberapa kali.
Beritanya rusak. Meskipun pesan di dalam partisi dijamin teratur, jika topik memiliki beberapa partisi, pengiriman pesan antar partisi tidak dijamin teratur.
Kompleksitas. Kafka membutuhkan dukungan kluster penjaga kebun binatang, dan topik biasanya memerlukan tenaga kerja manual untuk membuat, menyebarkan, dan memelihara lebih mahal daripada antrean pesan umum

Operasi Kafka antrean pesan .NET/C#

Pertama, gunakan .NET Core 3.1 untuk membuat dua proyek konsol baru, yaitu Kafka-Consumer dan Kafka-Producer

Gunakan nuget untuk mereferensikan paket Confluent.Kafka seperti ini, dengan perintah berikut:

Alamat GitHub:Login hyperlink terlihat.

Kita memulai program Produser terlebih dahulu, dan jika kita memulai konsumen terlebih dahulu, kita akan mendapatkan kesalahan berikut:
Terjadi kesalahan: Broker: Topik atau partisi tidak dikenal

Artikel ini akan menggunakan pengaturanEnableAutoOffsetStore adalah false, yaitu, mengatur penyimpanan offset secara manual (mirip dengan pesan konfirmasi manual)

Konsumen tidak mengatur OffsetStore setelah konsumsi

Coba gunakan produsen untuk menghasilkan dua pesan, nyalakan konsumsi konsumen, MaxPollIntervalMs = 10000 // 10 detik tanpa pengaturan manual, izinkan klien lain untuk mengkonsumsi, tentu saja tidak akan dikonsumsi oleh klien lain dalam waktu 10 detik

MaxPollIntervalMs menjelaskan
Untuk konsumen tingkat lanjut, waktu maksimum yang diizinkan untuk mengonsumsi pesan di antara panggilan (misalnya, rd_kafka_consumer_poll()). Jika interval ini terlampaui, konsumen dianggap gagal dan grup diseimbangkan ulang sehingga partisi ditetapkan ulang ke anggota grup konsumen lain. Peringatan: Penerapan offset mungkin tidak dimungkinkan saat ini. Catatan: Disarankan untuk mengatur "enable.auto.offset.store=false" untuk aplikasi yang diproses untuk waktu yang lama, lalu secara eksplisit menyimpan offset (menggunakan offsets_store()) setelah pesan diproses* untuk memastikan bahwa offset tidak secara otomatis diterapkan sebelum pemrosesan selesai. Periksa sekali per detik dengan interval dua kali. Untuk informasi lebih lanjut, lihat KIP-62.

Renderingnya adalah sebagai berikut:



OffsetStore diatur setelah konsumen menyelesaikan pengeluaran

kode

Setelah pengaturan selesai, tunggu 10 detik dan itu akan tetap dilakukanMenerima pesan terakhir(Ketika konsumen terhubung ke broker,Mulai konsumsi dari posisi offsetJika c.Commit(cr) diatur; Pesan terakhir tidak akan diterima berulang kali.

Melihat kode sumber



melakukan offset + 1 commit, dan akhirnya memanggil Librdkafka.topic_partition_list_destroy(cOffsets);

Login hyperlink terlihat.
Login hyperlink terlihat.

Mengatur GroupId yang berbeda

Coba atur GroupId yang berbeda melalui parameter baris perintah, lalu kirim pesan melalui produsen, seperti yang ditunjukkan pada gambar berikut:



Baik clinet1 maupun client2Menerima pesan historis, dan setelah produser mengirimkan pesan, keduanya hampir akanTerima pesan secara bersamaan

Konsumen baru hanya menerima pesan baru

Bagaimana Anda membuat klien baru hanya menerima pesan baru dan mengabaikan data historis?

Pengaturannya adalah sebagai berikut:

Seperti yang ditunjukkan di bawah ini:



Kode produser

Sebagai berikut:

Kode konsumen

Sebagai berikut:

Unduh kode sumber

Wisatawan, jika Anda ingin melihat konten tersembunyi dari posting ini, silakanJawab






Mantan:Pengecualian .NET/C# menggunakan Tencent Enterprise Mailbox: Operasi telah habis waktunya.
Depan:NuGet menghapus cache
 Tuan tanah| Diposting pada 15/04/2021 09.31.05 |
Ketika klien .NET Kafka terputus, ia tidak melemparkan pengecualian dan terhubung kembali setelah jaringan normal
%4|1618450028.267| GAGALT|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Terputus (setelah 59926ms dalam keadaan UP)
%3|1618450028.267| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Terputus (setelah 59926ms di status UP)
%3|1618450028.267| GAGALT|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Hubungkan ke ipv4#192.168.1.175:9092 gagal: Kesalahan tidak diketahui (setelah 0ms dalam status CONNECT)
%3|1618450028.268| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Sambungkan ke ipv4#192.168.1.175:9092 gagal: Kesalahan tidak diketahui (setelah 0ms dalam status CONNECT)
%3|1618450028.357| GAGALT|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Sambungkan ke ipv4#192.168.1.175:9092 gagal: Kesalahan tidak diketahui (setelah 10ms dalam status CONNECT, 1 kesalahan identik ditekan)
%3|1618450028.357| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Sambungkan ke ipv4#192.168.1.175:9092 gagal: Kesalahan tidak diketahui (setelah 10 ms dalam status CONNECT, 1 kesalahan identik ditekan)
%3|1618450062.882| GAGALT|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Hubungkan ke ipv4#192.168.1.175:9092 gagal: Kesalahan tidak diketahui (setelah 0ms dalam status CONNECT, 8 kesalahan identik ditekan)
%3|1618450062.882| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Sambungkan ke ipv4#192.168.1.175:9092 gagal: Kesalahan tidak diketahui (setelah 0ms dalam status CONNECT, 8 kesalahan identik ditekan)
%3|1618450098.255| GAGALT|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Hubungkan ke ipv4#192.168.1.175:9092 gagal: Kesalahan tidak diketahui (setelah 11 ms dalam status CONNECT, 4 kesalahan identik ditekan)
%3|1618450098.255| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Sambungkan ke ipv4#192.168.1.175:9092 gagal: Kesalahan tidak diketahui (setelah 11 ms dalam status CONNECT, 4 kesalahan identik ditekan)
%3|1618450138.243| GAGALT|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Hubungkan ke ipv4#192.168.1.175:9092 gagal: Kesalahan tidak diketahui (setelah 0ms dalam status CONNECT, 4 kesalahan identik ditekan)
%3|1618450138.244| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Sambungkan ke ipv4#192.168.1.175:9092 gagal: Kesalahan tidak diketahui (setelah 0ms dalam status CONNECT, 4 kesalahan identik ditekan)
%3|1618450168.254| GAGALT|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Hubungkan ke ipv4#192.168.1.175:9092 gagal: Kesalahan tidak diketahui (setelah 10 ms dalam status CONNECT, 3 kesalahan identik ditekan)
%3|1618450168.254| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Sambungkan ke ipv4#192.168.1.175:9092 gagal: Kesalahan tidak diketahui (setelah 10 ms dalam status CONNECT, 3 kesalahan identik ditekan)

 Tuan tanah| Diposting pada 13/04/2021 16.26.58 |
Prinsip konsumsi pesan:

Dalam proses produksi yang sebenarnya, setiap topik akan memiliki beberapa partisi, dan keuntungan dari beberapa partisi adalah bahwa di satu sisi, kemampuan untuk mem-shard data pada broker secara efektif mengurangi kapasitas pesan dan meningkatkan kinerja IO. Di sisi lain, untuk meningkatkan daya konsumsi sisi konsumen, topik yang sama umumnya akan dikonsumsi melalui banyak konsumen, yaitu mekanisme penyeimbangan beban sisi konsumen, yang akan kita pahami selanjutnya, bagaimana konsumen mengonsumsi pesan dalam kasus beberapa partisi dan banyak konsumen? Kafka ada dalam konsep kelompok konsumen, yaitu group.id jenis konsumen yang sama, yang termasuk dalam kelompok konsumen, dan semua konsumen dalam grup berkoordinasi untuk mengkonsumsi semua partisi topik berlangganan. Tentu saja, setiap partisi hanya dapat dikonsumsi oleh konsumen dalam grup konsumen yang sama, jadi bagaimana konsumen dalam grup konsumen yang sama mengalokasikan data di mana partisi harus dikonsumsi? Sebagai contoh sederhana, jika ada partisi yang kalah, yaitu ketika jumlah partiton sama dengan jumlah kosumer, setiap kosumer sesuai dengan partisi, jika jumlah kosumer lebih dari partisi, maka jumlah tambahan kosumer tidak akan berfungsi, sebaliknya, akan ada kosumer yang mengkonsumsi banyak partisi.

Strategi Penetapan Zonasi:

Di kafka, ada dua strategi alokasi partisi, satu adalah Range (default) dan yang lainnya adalah RoundRobin (polling). Ini diatur oleh parameter konfigurasi partition.assignment.strategy kosumer.


Lihat semua topik


Melihat detail untuk topik




 Tuan tanah| Diposting pada 08/05/2021 17.17.33 |
Kafka menghapus grup konsumen



Penghapusan grup konsumen yang diminta ('itsvse') berhasil.


Kesalahan berikut dapat dilaporkan:

Error: Deletion of some consumer groups failed:
* Grup 'itsvse' tidak dapat dihapus karena: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: Grup tidak kosong.
larutan

Menggunakan semua pesan, atau mengatur offset

Kafka secara manual mengatur offset offset
https://www.itsvse.com/thread-9641-1-1.html
Kemudian, hapus lagi!

 Tuan tanah| Diposting pada 13/04/2021 15.40.48 |
perintah shell daya



Setiap klien konsumen mempertahankan 2 koneksi ke layanan Kafka
 Tuan tanah| Diposting pada 07/05/2021 12.37.06 |
kafka, untuk melihat jumlah tumpukan topik di bawah grup tertentu

Diposting pada 16/06/2021 12.41.09 |
Silakan tanyakan mengapa kode tidak dapat dilihat ~
 Tuan tanah| Diposting pada 25/06/2021 10.50.06 |
Kafka mendapatkan perintah ukuran topik:



 Tuan tanah| Diposting pada 18/07/2021 10.15.01 |
Baris perintah Kafka untuk membuat topik:

Diposting pada 03/09/2021 11.52.41 |
Masih banyak jebakan dalam kafka, dipelajari
Sanggahan:
Semua perangkat lunak, materi pemrograman, atau artikel yang diterbitkan oleh Code Farmer Network hanya untuk tujuan pembelajaran dan penelitian; Konten di atas tidak boleh digunakan untuk tujuan komersial atau ilegal, jika tidak, pengguna akan menanggung semua konsekuensi. Informasi di situs ini berasal dari Internet, dan sengketa hak cipta tidak ada hubungannya dengan situs ini. Anda harus sepenuhnya menghapus konten di atas dari komputer Anda dalam waktu 24 jam setelah pengunduhan. Jika Anda menyukai program ini, harap dukung perangkat lunak asli, pembelian pendaftaran, dan dapatkan layanan asli yang lebih baik. Jika ada pelanggaran, silakan hubungi kami melalui email.

Mail To:help@itsvse.com