Kafka is a high-performance distributed messaging system developed by LinkedIn that is widely used in scenarios such as log collection, stream processing, and online and offline message distribution. Although not designed as a traditional MQ, Kafka can replace traditional messaging systems such as ActiveMQ in most cases.
Kafka organizes message streams by topic. The servers that store the messages are called brokers, and consumers can subscribe to one or more topics. To balance load, the messages of a topic can be divided into multiple partitions; the more partitions, the higher Kafka's parallelism and throughput.
A Kafka cluster requires ZooKeeper. The latest Kafka distribution already bundles ZooKeeper, so a deployment can start a ZooKeeper server and a Kafka server together, or use an existing ZooKeeper cluster.
Unlike a traditional MQ, each consumer keeps its own offset and, when fetching from Kafka, only pulls messages after the current offset. Kafka's Scala/Java client already implements this logic by saving the offset to ZooKeeper. Each consumer can choose a group ID, and consumers with the same ID will receive each message only once. If all consumers of a topic use the same ID, the behavior is a traditional queue; if each consumer uses a different ID, it is traditional pub-sub.
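As a sketch of the two modes, assuming the Confluent.Kafka .NET client and a broker at localhost:9092 (both assumptions, not stated in the original):

```csharp
using System;
using Confluent.Kafka;

// Queue mode: every consumer uses the same GroupId,
// so each message is delivered to only one of them.
var queueConfig = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // assumed broker address
    GroupId = "shared-group"             // shared by all queue-style consumers
};

// Pub-sub mode: every consumer uses its own GroupId,
// so each of them receives every message.
var pubSubConfig = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = Guid.NewGuid().ToString()  // unique id per consumer
};
```

The group ID is the only switch between the two delivery models; the topic and the producer side stay the same.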
Review:
Kafka consumption
1. Among consumers with the same group_id, only one consumer receives each message (queue mode)
2. Consumers with different group_ids each receive the same message (pub-sub mode)
Advantages of Kafka
Distributed and highly scalable. Kafka clusters can be transparently scaled by adding new servers to the cluster.
High performance. Kafka's performance greatly exceeds traditional MQ implementations such as ActiveMQ and RabbitMQ, and Kafka also supports batch operations. The following image shows the results of LinkedIn's consumer performance stress test:
Fault tolerance. The data of each partition in Kafka is replicated to several servers. When a broker fails, the ZooKeeper service notifies the producers and consumers, which switch to another broker.
Disadvantages of Kafka:
Duplicate messages. Kafka only guarantees that each message is delivered at least once; although the odds are slim, a message may be delivered multiple times.
Out-of-order messages. Although messages within a partition are guaranteed to be ordered, if a topic has multiple partitions, ordering between partitions is not guaranteed.
Complexity. Kafka requires a ZooKeeper cluster, topics usually have to be created manually, and deployment and maintenance cost more than a typical message queue.
.NET/C# message queue Kafka operations
First, use .NET Core 3.1 to create two new console projects, namely Kafka-Consumer and Kafka-Producer
Reference the Confluent.Kafka package via NuGet with the following command:
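Assuming the standard .NET CLI and the two project names created above, the package can be added like this:

```shell
dotnet add Kafka-Producer package Confluent.Kafka
dotnet add Kafka-Consumer package Confluent.Kafka
```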
We start the producer program first; if we start the consumer first, we get the following error:
Error occured: Broker: Unknown topic or partition

The consumer in this article sets EnableAutoOffsetStore to false, i.e. the offset is stored manually (similar to manual message acknowledgement).
The consumer does not store the offset after consuming
Use the producer to produce two messages, then start the consumer. With MaxPollIntervalMs = 10000 (10 seconds) and without manually storing the offset, other clients are allowed to take over consumption; of course, within those 10 seconds the messages will not be consumed by other clients.
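A consumer configuration matching the settings described above might look like this (the broker address, group id, and session timeout are assumptions; SessionTimeoutMs is lowered because it must not exceed MaxPollIntervalMs):

```csharp
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // assumed broker address
    GroupId = "demo-group",              // assumed group id
    EnableAutoOffsetStore = false,       // store offsets manually after processing
    MaxPollIntervalMs = 10000,           // 10 s: exceed it and the group rebalances
    SessionTimeoutMs = 6000              // must be <= MaxPollIntervalMs
};
```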
MaxPollIntervalMs explained
For high-level consumers, this is the maximum allowed time between calls that consume messages (for example, rd_kafka_consumer_poll()). If this interval is exceeded, the consumer is considered failed and the group is rebalanced so that its partitions are reassigned to another group member. Warning: offset commits may not be possible at this point. Note: it is recommended that applications with long processing times set enable.auto.offset.store=false and explicitly store offsets (using offsets_store()) after each message is processed, to ensure the offset is not auto-committed before processing is complete. The interval is checked twice per second. See KIP-62 for more information. The result is shown below:
The consumer stores the offset after consuming
Code:
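A minimal sketch of a consume loop that stores the offset only after the message has been processed (the broker address and topic name are assumptions):

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // assumed broker address
    GroupId = "demo-group",              // assumed group id
    EnableAutoOffsetStore = false        // we store the offset ourselves
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("demo-topic"); // assumed topic name

using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

try
{
    while (!cts.IsCancellationRequested)
    {
        var cr = consumer.Consume(cts.Token);
        Console.WriteLine($"Received: {cr.Message.Value} at {cr.TopicPartitionOffset}");
        consumer.StoreOffset(cr); // only now is the offset marked as processed
    }
}
catch (OperationCanceledException) { }
finally
{
    consumer.Close(); // leave the group cleanly
}
```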
After this setup is complete, even after waiting 10 seconds the consumer still receives the last message again (when the consumer connects to the broker, it starts consuming from the stored offset position). If c.Commit(cr); is called, the last message will not be received repeatedly.
View source code
Commit commits offset + 1, and eventually calls Librdkafka.topic_partition_list_destroy(cOffsets);
Set a different GroupId
Try setting a different GroupId via the command line parameter, and then send a message through the producer, as shown in the following image:
Both client1 and client2 receive the historical messages, and after the producer sends a message, both receive it at almost the same time.
New consumers only receive new messages
How do you make a new client receive only new messages and ignore historical data?
The settings are as follows:
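One way to achieve this, assuming the Confluent.Kafka client, is to give the new client its own GroupId and set AutoOffsetReset to Latest, so that a group with no committed offset starts at the end of the log instead of the beginning:

```csharp
using System;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",     // assumed broker address
    GroupId = Guid.NewGuid().ToString(),     // fresh group: no committed offset yet
    AutoOffsetReset = AutoOffsetReset.Latest // no stored offset -> start at the newest message
};
```

AutoOffsetReset only takes effect when the group has no committed offset; an existing group keeps resuming from its stored position.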
Producer code
As follows:
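A minimal producer sketch (the broker address and topic name are assumptions):

```csharp
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

class Program
{
    static async Task Main()
    {
        var config = new ProducerConfig { BootstrapServers = "localhost:9092" }; // assumed

        using var producer = new ProducerBuilder<Null, string>(config).Build();

        Console.WriteLine("Type a message and press Enter (empty line to quit):");
        string line;
        while (!string.IsNullOrEmpty(line = Console.ReadLine()))
        {
            // ProduceAsync waits for the broker's acknowledgement.
            var dr = await producer.ProduceAsync(
                "demo-topic", new Message<Null, string> { Value = line }); // assumed topic
            Console.WriteLine($"Delivered to {dr.TopicPartitionOffset}");
        }
    }
}
```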
Consumer code
As follows:
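A consumer sketch that takes the GroupId from a command-line argument (as in the different-GroupId experiment above) and commits explicitly after processing; the broker address and topic name are assumptions:

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

class Program
{
    static void Main(string[] args)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",                // assumed broker address
            GroupId = args.Length > 0 ? args[0] : "demo-group", // group id from the CLI
            AutoOffsetReset = AutoOffsetReset.Earliest,         // new group reads history
            EnableAutoOffsetStore = false                       // manual offset handling
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe("demo-topic"); // assumed topic name

        using var cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

        try
        {
            while (!cts.IsCancellationRequested)
            {
                var cr = consumer.Consume(cts.Token);
                Console.WriteLine($"[{config.GroupId}] {cr.Message.Value}");
                consumer.Commit(cr); // commits cr.Offset + 1
            }
        }
        catch (OperationCanceledException) { }
        finally
        {
            consumer.Close(); // leave the group cleanly
        }
    }
}
```

Run two instances with different arguments (e.g. client1 and client2) to reproduce the pub-sub behavior described above.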