Kafka adalah sistem perpesanan terdistribusi berkinerja tinggi yang dikembangkan oleh LinkedIn yang banyak digunakan dalam skenario seperti pengumpulan log, pemrosesan data streaming, distribusi pesan online dan offline, dan banyak lagi. Meskipun tidak dirancang sebagai MQ tradisional, Kafaka dapat menggantikan sistem perpesanan tradisional seperti ActiveMQ dalam banyak kasus.
Kafka mengatur alur pesan berdasarkan topik, dan server yang menyimpan pesan disebut broker, dan konsumen dapat berlangganan satu atau lebih topik. Untuk menyeimbangkan beban, pesan dari suatu topik dapat dibagi menjadi beberapa partisi, dan semakin banyak partisi, semakin tinggi paralelisme dan throughput Kafka.
Klaster Kafka memerlukan dukungan zookeeper untuk mengimplementasikan kluster, dan zookeeper sudah termasuk dalam distribusi Kafka terbaru, yang dapat digunakan untuk memulai server zookeeper dan server Kafka secara bersamaan, atau menggunakan kluster zookeeper lain yang ada.
Tidak seperti MQ tradisional, konsumen perlu menyimpan offset sendiri, dan ketika mendapatkan pesan dari kafka, hanya menarik pesan setelah offset saat ini. Klien scala/java Kafka sudah mengimplementasikan bagian logika ini dengan menyimpan offset ke penjaga kebun binatang. Setiap konsumen dapat memilih ID, dan konsumen dengan ID yang sama hanya akan menerima pesan yang sama satu kali.Jika konsumen dari suatu topik semua menggunakan id yang sama, itu adalah Antrean tradisional. Jika setiap konsumen menggunakan ID yang berbeda, itu adalah pub-sub tradisional.
Resensi:
Konsumsi kafka
1. Konsumen dari group_id yang sama, hanya satu konsumen yang dapat mengonsumsi pesan (Mode Antrean)
2. Konsumen dari berbagai group_id menerima berita yang sama
Keuntungan dari Kafka
Terdistribusi dan sangat terukur. Kluster Kafka dapat diskalakan secara transparan untuk menambahkan server baru ke kluster.
Kinerja tinggi. Kinerja Kafka jauh melebihi implementasi MQ tradisional seperti ActiveMQ dan RabbitMQ, terutama Kafka, yang juga mendukung operasi batch. Gambar berikut menunjukkan hasil uji stres kinerja konsumen LinkedIn:
Toleransi kesalahan. Data dari setiap partisi di Kafka direplikasi ke beberapa server. Ketika broker gagal, layanan ZooKeeper akan memberi tahu produsen dan konsumen, yang beralih ke broker lain.
Kekurangan Kafka:
Ulangi pesan. Kafka hanya menjamin bahwa setiap pesan akan dikirimkan setidaknya sekali, dan meskipun kemungkinannya tipis, ada kemungkinan pesan akan terkirim beberapa kali. Beritanya rusak. Meskipun pesan di dalam partisi dijamin teratur, jika topik memiliki beberapa partisi, pengiriman pesan antar partisi tidak dijamin teratur. Kompleksitas. Kafka membutuhkan dukungan kluster penjaga kebun binatang, dan topik biasanya memerlukan tenaga kerja manual untuk membuat, menyebarkan, dan memelihara lebih mahal daripada antrean pesan umum
Operasi Kafka antrean pesan .NET/C#
Pertama, gunakan .NET Core 3.1 untuk membuat dua proyek konsol baru, yaitu Kafka-Consumer dan Kafka-Producer
Gunakan nuget untuk mereferensikan paket Confluent.Kafka seperti ini, dengan perintah berikut:
Alamat GitHub:Login hyperlink terlihat.
Kita memulai program Produser terlebih dahulu, dan jika kita memulai konsumen terlebih dahulu, kita akan mendapatkan kesalahan berikut:
Terjadi kesalahan: Broker: Topik atau partisi tidak dikenal Artikel ini akan menggunakan pengaturanEnableAutoOffsetStore adalah false, yaitu, mengatur penyimpanan offset secara manual (mirip dengan pesan konfirmasi manual)
Konsumen tidak mengatur OffsetStore setelah konsumsi
Coba gunakan produsen untuk menghasilkan dua pesan, nyalakan konsumsi konsumen, MaxPollIntervalMs = 10000 // 10 detik tanpa pengaturan manual, izinkan klien lain untuk mengkonsumsi, tentu saja tidak akan dikonsumsi oleh klien lain dalam waktu 10 detik
MaxPollIntervalMs menjelaskan
Untuk konsumen tingkat lanjut, waktu maksimum yang diizinkan untuk mengonsumsi pesan di antara panggilan (misalnya, rd_kafka_consumer_poll()). Jika interval ini terlampaui, konsumen dianggap gagal dan grup diseimbangkan ulang sehingga partisi ditetapkan ulang ke anggota grup konsumen lain. Peringatan: Penerapan offset mungkin tidak dimungkinkan saat ini. Catatan: Disarankan untuk mengatur "enable.auto.offset.store=false" untuk aplikasi yang diproses untuk waktu yang lama, lalu secara eksplisit menyimpan offset (menggunakan offsets_store()) setelah pesan diproses* untuk memastikan bahwa offset tidak secara otomatis diterapkan sebelum pemrosesan selesai. Periksa sekali per detik dengan interval dua kali. Untuk informasi lebih lanjut, lihat KIP-62. Renderingnya adalah sebagai berikut:
OffsetStore diatur setelah konsumen menyelesaikan pengeluaran
kode
Setelah pengaturan selesai, tunggu 10 detik dan itu akan tetap dilakukanMenerima pesan terakhir(Ketika konsumen terhubung ke broker,Mulai konsumsi dari posisi offsetJika c.Commit(cr) diatur; Pesan terakhir tidak akan diterima berulang kali.
Melihat kode sumber
melakukan offset + 1 commit, dan akhirnya memanggil Librdkafka.topic_partition_list_destroy(cOffsets);
Login hyperlink terlihat.
Login hyperlink terlihat.
Mengatur GroupId yang berbeda
Coba atur GroupId yang berbeda melalui parameter baris perintah, lalu kirim pesan melalui produsen, seperti yang ditunjukkan pada gambar berikut:
Baik clinet1 maupun client2Menerima pesan historis, dan setelah produser mengirimkan pesan, keduanya hampir akanTerima pesan secara bersamaan。
Konsumen baru hanya menerima pesan baru
Bagaimana Anda membuat klien baru hanya menerima pesan baru dan mengabaikan data historis?
Pengaturannya adalah sebagai berikut:
Seperti yang ditunjukkan di bawah ini:
Kode produser
Sebagai berikut:
Kode konsumen
Sebagai berikut:
Unduh kode sumber
Wisatawan, jika Anda ingin melihat konten tersembunyi dari posting ini, silakan Jawab
|