This article is a mirror article of machine translation, please click here to jump to the original article.

View: 129246|Reply: 17

[Source] Kafka operations for .NET/C# message queues [with source code]

[Copy link]
Posted on 4/13/2021 11:45:31 AM | | | |
Kafka is a high-performance, distributed messaging system developed by LinkedIn that is widely used in scenarios such as log collection, streaming data processing, online and offline message distribution, and more. Although not designed as a traditional MQ, Kafaka can replace traditional messaging systems such as ActiveMQ in most cases.

Kafka organizes the flow of messages by topics, and the server that holds the messages is called a broker, and consumers can subscribe to one or more topics. In order to balance the load, the messages of a topic can be divided into multiple partitions, and the more partitions, the higher the parallelism and throughput of Kafka.

Kafka clusters require zookeeper support to implement clusters, and zookeeper is already included in the latest kafka distribution, which can be deployed to start a zookeeper server and a Kafka server at the same time, or use other existing zookeeper clusters.

Unlike traditional MQ, consumers need to keep an offset by themselves, and when getting messages from kafka, only pull messages after the current offset. Kafka's scala/java client already implements this part of the logic by saving the offset to the zookeeper. Each consumer can choose an ID, and consumers with the same ID will only receive the same message once.If consumers of a topic all use the same id, it is a traditional Queue. If each consumer uses a different ID, it is a traditional pub-sub.

Review:

Add ActiveMQ to system services under Windows
https://www.itsvse.com/thread-6210-1-1.html

NumberOfPendingMessages, MessagesEnqueued, Messag...
https://www.itsvse.com/thread-4954-1-1.html

Summary of information about ActiveMQ and RabbitMQ
https://www.itsvse.com/thread-4659-1-1.html

CentOS ActiveMQ is added to the service
https://www.itsvse.com/thread-4617-1-1.html

Centos 6.2 64-bit installation activemq tutorial
https://www.itsvse.com/thread-4616-1-1.html

ActiveMQ5.15.3 fails to start, and UnsupportedClassVersionError is reported
https://www.itsvse.com/thread-4615-1-1.html

Activemq topic permission settings
https://www.itsvse.com/thread-4495-1-1.html

User itsvse is not authorized to read from: ActiveMQ.Advisory.TempQueue,Activ...
https://www.itsvse.com/thread-4476-1-1.html

C# ActiveMQ client subscribes to source code
https://www.itsvse.com/thread-4470-1-1.html

.net/c# activemq to set the connection account and password
https://www.itsvse.com/thread-4282-1-1.html

Set the user name and password for the ACTIVEMQ theme and queue
https://www.itsvse.com/thread-4281-1-1.html

activemq modifies the website management password
https://www.itsvse.com/thread-4280-1-1.html

activemq Persistent store is Full
https://www.itsvse.com/thread-4125-1-1.html

.NET/C# ActiveMQ operation example [Source code]
https://www.itsvse.com/thread-3907-1-1.html

Activemq user permission configuration
https://www.itsvse.com/thread-3906-1-1.html

The difference between activemq Queue and Topic is that
https://www.itsvse.com/thread-3863-1-1.html

. .Net platform
https://www.itsvse.com/thread-3452-1-1.html

ActiveMQ persistent subscription settings
https://www.itsvse.com/thread-3451-1-1.html

RabbitMQ BasicQos consumer parallel processing limit
https://www.itsvse.com/thread-4667-1-1.html

rabbitMQ Queue Queue Message Persistence [with source code]
https://www.itsvse.com/thread-4657-1-1.html

【Practice】rabbitMQ console to add account information
https://www.itsvse.com/thread-4655-1-1.html

An in-depth analysis of the mechanism of RabbitMQ message response
https://www.itsvse.com/thread-4639-1-1.html

.net/c# RabbitMQ connection disconnection - disconnection and reconnection
https://www.itsvse.com/thread-4636-1-1.html

Introduction to the three exchange modes (fanout, direct, and topic) of RabbitMQ
https://www.itsvse.com/thread-4635-1-1.html

【Practice】RabbitMQ installs the web management plugin
https://www.itsvse.com/thread-4631-1-1.html

【Practical Combat】RabbitMQ installation tutorial under Windows
https://www.itsvse.com/thread-4630-1-1.html
kafka consumption

1. Consumers of the same group_id, only one consumer can consume messages (queue queue mode

2. Consumers of different group_id receive the same news

Advantages of Kafka

Distributed and highly scalable. Kafka clusters can be transparently scaled to add new servers to the cluster.

High performance. Kafka's performance greatly exceeds that of traditional MQ implementations such as ActiveMQ and RabbitMQ, especially Kafka, which also supports batch operations. The following image shows the results of LinkedIn's consumer performance stress test:

Fault tolerance. Data from each partition in Kafka is replicated to several servers. When a broker fails, the ZooKeeper service will notify the producer and the consumer, who switch to another broker.

Disadvantages of Kafka:

Repeat messages. Kafka only guarantees that each message will be delivered at least once, and while the odds are slim, there is a chance that a message will be delivered multiple times.
The news is out of order. Although messages inside a partition are guaranteed to be orderly, if a topic has multiple partitions, message delivery between partitions is not guaranteed to be orderly.
Complexity. Kafka requires the support of zookeeper clusters, and topics usually require manual labor to create, deploy, and maintain more expensive than general message queues

.NET/C# message queue Kafka operations

First, use .NET Core 3.1 to create two new console projects, namely Kafka-Consumer and Kafka-Producer

Use nuget to reference the Confluent.Kafka package like this, with the following command:

GitHub address:The hyperlink login is visible.

We start the Producer program first, and if we start the consumer first, we will get the following error:
Error occured: Broker: Unknown topic or partition

This article will consume settingsEnableAutoOffsetStore is false, that is, manually setting the offset storage (similar to a manual confirmation message)

Consumers do not set OffsetStore after consumption

Try to use the producer to produce two messages, turn on consumer consumption, MaxPollIntervalMs = 10000 // 10 seconds without manual setting, allow other clients to consume, of course, it will not be consumed by other clients within 10 seconds

MaxPollIntervalMs explains
For advanced consumers, the maximum allowed time to consume messages between calls (for example, rd_kafka_consumer_poll()). If this interval is exceeded, the consumer is considered to have failed and the group is rebalanced so that the partition is reassigned to another consumer group member. Warning: Offset commits may not be possible at this time. Note: It is recommended to set "enable.auto.offset.store=false" for applications that are processing for a long time, and then explicitly store the offset (using offsets_store()) after the message is processed* to ensure that the offset is not automatically committed before processing is complete. Check once per second at intervals of two. For more information, see KIP-62.

The renderings are as follows:



OffsetStore is set after the consumer finishes spending

code

After the setup is complete, wait 10 seconds and it will still doReceived the last message(When the consumer connects to the broker,Start consumption from the offset positionIf c.Commit(cr) is set; The last message will not be received repeatedly.

View source code



commit the offset + 1 commit, and eventually call Librdkafka.topic_partition_list_destroy(cOffsets);

The hyperlink login is visible.
The hyperlink login is visible.

Set a different GroupId

Try setting a different GroupId via the command line parameter, and then send a message through the producer, as shown in the following image:



Both clinet1 and client2Receive historical messages, and after the producer sends out a message, both will almost beReceive messages at the same time

New consumers only receive new messages

How do you make a new client receive only new messages and ignore historical data?

The settings are as follows:

As shown below:



Producer code

As follows:

Consumer code

As follows:

Source code download

Tourists, if you want to see the hidden content of this post, pleaseReply






Previous:.NET/C# Exception using Tencent Enterprise Mailbox: The operation has timed out.
Next:NuGet clears the cache
 Landlord| Posted on 4/15/2021 9:31:05 AM |
When the .NET Kafka client is disconnected, it does not throw an exception and reconnects after the network is normal
%4|1618450028.267| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Disconnected (after 59926ms in state UP)
%3|1618450028.267| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Disconnected (after 59926ms in state UP)
%3|1618450028.267| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 0ms in state CONNECT)
%3|1618450028.268| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 0ms in state CONNECT)
%3|1618450028.357| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 10ms in state CONNECT, 1 identical error(s) suppressed)
%3|1618450028.357| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 10ms in state CONNECT, 1 identical error(s) suppressed)
%3|1618450062.882| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 0ms in state CONNECT, 8 identical error(s) suppressed)
%3|1618450062.882| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 0ms in state CONNECT, 8 identical error(s) suppressed)
%3|1618450098.255| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 11ms in state CONNECT, 4 identical error(s) suppressed)
%3|1618450098.255| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 11ms in state CONNECT, 4 identical error(s) suppressed)
%3|1618450138.243| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 0ms in state CONNECT, 4 identical error(s) suppressed)
%3|1618450138.244| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 0ms in state CONNECT, 4 identical error(s) suppressed)
%3|1618450168.254| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 10ms in state CONNECT, 3 identical error(s) suppressed)
%3|1618450168.254| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: Connect to ipv4#192.168.1.175:9092 failed: Unknown error (after 10ms in state CONNECT, 3 identical error(s) suppressed)

 Landlord| Posted on 4/13/2021 4:26:58 PM |
Principle of message consumption:

In the actual production process, each topic will have multiple partitions, and the advantage of multiple partitions is that on the one hand, the ability to sharding the data on the broker effectively reduces the capacity of messages and improves IO performance. On the other hand, in order to improve the consumption power of the consumer side, the same topic will generally be consumed through multiple consumers, that is, the load balancing mechanism of the consumer side, which is what we will understand next, how do consumers consume messages in the case of multiple partitions and multiple consumers? Kafka exists in the concept of consumer groups, that is, group.id the same kind of consumers, which belong to a consumer group, and all the consumers in the group coordinate to consume all the partitions of the subscription topic. Of course, each partition can only be consumed by consumers in the same consumer group, so how do consumers in the same consumer group allocate the data in which partition should be consumed? For a simple example, if there are partitions losing, that is, when the number of partitons is the same as the number of comsumers, each comsumer corresponds to a partition, if the number of comsumers is more than partitions, then the extra number of comsumers will not work, on the contrary, there will be comsumers consuming multiple partitions.

Zoning Assignment Strategy:

In kafka, there are two partition allocation strategies, one is Range (default) and the other is RoundRobin (polling). This is set by the comsumer's configuration partition.assignment.strategy parameter.


See all topics


View details for a topic




 Landlord| Posted on 5/8/2021 5:17:33 PM |
Kafka deletes consumer groups



Deletion of requested consumer groups ('itsvse') was successful.


The following errors may be reported:

Error: Deletion of some consumer groups failed:
* Group 'itsvse' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.
solution

Consume all messages, or set an offset

Kafka manually sets the offset offset
https://www.itsvse.com/thread-9641-1-1.html
Then, delete it again!

 Landlord| Posted on 4/13/2021 3:40:48 PM |
power shell command



Each consumer client maintains 2 connections to the Kafka service
 Landlord| Posted on 5/7/2021 12:37:06 PM |
kafka, to view the number of stacks of topics under a specified group

Posted on 6/16/2021 12:41:09 PM |
Please ask why the code cannot be viewed~
 Landlord| Posted on 6/25/2021 10:50:06 AM |
Kafka gets the topic size command:



 Landlord| Posted on 7/18/2021 10:15:01 AM |
Kafka command line to create topics:

Posted on 9/3/2021 11:52:41 AM |
There are still many pitfalls in kafka, learned
Disclaimer:
All software, programming materials or articles published by Code Farmer Network are only for learning and research purposes; The above content shall not be used for commercial or illegal purposes, otherwise, users shall bear all consequences. The information on this site comes from the Internet, and copyright disputes have nothing to do with this site. You must completely delete the above content from your computer within 24 hours of downloading. If you like the program, please support genuine software, purchase registration, and get better genuine services. If there is any infringement, please contact us by email.

Mail To:help@itsvse.com