이 글은 기계 번역의 미러 문서이며, 원본 기사로 바로 이동하려면 여기를 클릭해 주세요.

보기: 129246|회답: 17

[출처] .NET/C# 메시지 큐를 위한 Kafka 연산 [소스 코드 포함]

[링크 복사]
게시됨 2021. 4. 13. 오전 11:45:31 | | | |
Kafka는 링크드인에서 개발한 고성능 분산 메시징 시스템으로, 로그 수집, 스트리밍 데이터 처리, 온라인 및 오프라인 메시지 배포 등 다양한 시나리오에서 널리 사용됩니다. 전통적인 MC로 설계된 것은 아니지만, Kafaka는 대부분의 경우 ActiveMQ와 같은 전통적인 메시징 시스템을 대체할 수 있습니다.

Kafka는 메시지 흐름을 주제별로 조직하며, 메시지를 보관하는 서버를 브로커라고 하며, 소비자는 하나 이상의 주제에 구독할 수 있습니다. 부하를 균형 있게 하기 위해 주제의 메시지를 여러 파티션으로 나눌 수 있으며, 파티션이 많을수록 Kafka의 병렬성과 처리량이 높아집니다.

Kafka 클러스터는 클러스터를 구현하기 위해 Zookeeper 지원이 필요하며, zookeeper는 최신 kafka 배포판에 이미 포함되어 있어 zookeeper 서버와 Kafka 서버를 동시에 시작하거나 기존 Zookeeper 클러스터를 사용할 수 있습니다.

전통적인 MC와 달리, 소비자는 스스로 오프셋을 유지해야 하며, Kafka에서 메시지를 받을 때는 현재 오프셋 이후에만 메시지를 가져옵니다. Kafka의 Scala/Java 클라이언트는 이미 이 로직 부분을 Zookeeper에 오프셋을 저장해 구현하고 있습니다. 각 소비자는 ID를 선택할 수 있으며, 같은 ID를 가진 소비자는 단 한 번만 같은 메시지를 받게 됩니다.만약 한 주제의 소비자들이 모두 같은 ID를 사용한다면, 그것은 전통적인 큐입니다. 각 소비자가 다른 ID를 사용한다면, 그것은 전통적인 pub-sub입니다.

복습:

Windows에서 시스템 서비스에 ActiveMQ를 추가하세요
https://www.itsvse.com/thread-6210-1-1.html

대기 중인 메시지, 인큐된 메시지, 메시지...
https://www.itsvse.com/thread-4954-1-1.html

ActiveMQ와 RabbitMQ에 대한 정보 요약
https://www.itsvse.com/thread-4659-1-1.html

CentOS ActiveMQ가 서비스에 추가되었습니다
https://www.itsvse.com/thread-4617-1-1.html

Centos 6.2 64비트 설치 activemq 튜토리얼
https://www.itsvse.com/thread-4616-1-1.html

ActiveMQ5.15.3이 시작되지 않고, UnsupportedClassVersionError가 보고됩니다
https://www.itsvse.com/thread-4615-1-1.html

Activemq 주제 권한 설정
https://www.itsvse.com/thread-4495-1-1.html

사용자(itsvse)는 ActiveMQ.Advisory.TempQueue, Activ...에서 읽을 권한이 없습니다.
https://www.itsvse.com/thread-4476-1-1.html

C# ActiveMQ 클라이언트는 소스 코드를 구독합니다
https://www.itsvse.com/thread-4470-1-1.html

.net/c# activemq: 연결 계정과 비밀번호를 설정하기
https://www.itsvse.com/thread-4282-1-1.html

ACTIVEMQ 테마와 큐의 사용자 이름과 비밀번호를 설정하세요
https://www.itsvse.com/thread-4281-1-1.html

ActiveMQ는 웹사이트 관리 비밀번호를 수정합니다
https://www.itsvse.com/thread-4280-1-1.html

activemq 지속 저장소가 가득 찼습니다
https://www.itsvse.com/thread-4125-1-1.html

.NET/C# ActiveMQ 연산 예시 [소스 코드]
https://www.itsvse.com/thread-3907-1-1.html

Activemq 사용자 권한 구성
https://www.itsvse.com/thread-3906-1-1.html

activemq 큐와 Topic의 차이점은 다음과 같습니다
https://www.itsvse.com/thread-3863-1-1.html

. .Net 플랫폼
https://www.itsvse.com/thread-3452-1-1.html

ActiveMQ 영구 구독 설정
https://www.itsvse.com/thread-3451-1-1.html

RabbitMQ BasicQos 소비자 병렬 처리 한계
https://www.itsvse.com/thread-4667-1-1.html

rabbitMQ 큐 큐 메시지 영속성 [소스 코드 포함]
https://www.itsvse.com/thread-4657-1-1.html

【연습】rabbitMQ 콘솔을 통해 계정 정보를 추가하세요
https://www.itsvse.com/thread-4655-1-1.html

RabbitMQ 메시지 응답 메커니즘에 대한 심층 분석
https://www.itsvse.com/thread-4639-1-1.html

.net/c# RabbitMQ 연결 끊김 - 연결 해제 및 재연결
https://www.itsvse.com/thread-4636-1-1.html

RabbitMQ의 세 가지 교환 모드(팬아웃, 다이렉트, 토픽) 소개
https://www.itsvse.com/thread-4635-1-1.html

【Practice】RabbitMQ가 웹 관리 플러그인을 설치합니다
https://www.itsvse.com/thread-4631-1-1.html

【실전 전투】Windows에서 RabbitMQ 설치 튜토리얼
https://www.itsvse.com/thread-4630-1-1.html
카프카 소비

1. 같은 group_id 소비자 중 한 명만 메시지를 소비할 수 있습니다 (큐 큐 모드

2. 서로 다른 group_id 소비자들이 동일한 뉴스를 받는다

카프카의 장점

분산되어 있고 매우 확장 가능합니다. Kafka 클러스터는 투명하게 확장되어 새로운 서버를 클러스터에 추가할 수 있습니다.

고성능 Kafka의 성능은 ActiveMQ와 RabbitMQ와 같은 전통적인 MQ 구현, 특히 배치 연산을 지원하는 Kafka보다 훨씬 뛰어납니다. 다음 이미지는 LinkedIn의 소비자 성과 스트레스 테스트 결과를 보여줍니다:

결함 허용성. Kafka의 각 파티션에서 나온 데이터는 여러 서버에 복제됩니다. 중개인이 실패하면 ZooKeeper 서비스는 생산자와 소비자에게 통보하며, 소비자는 다른 중개인으로 전환합니다.

카프카의 단점:

메시지를 반복하세요. 카프카는 각 메시지가 적어도 한 번은 전달될 것만 보장하며, 확률은 낮지만 메시지가 여러 번 전달될 가능성도 있습니다.
뉴스가 순서가 뒤죽박죽이야. 파티션 내 메시지는 질서 정연하게 유지되지만, 주제에 여러 파티션이 있으면 파티션 간 메시지 전달이 질서 정연하다는 보장은 없습니다.
복잡성. Kafka는 Zookeeper 클러스터 지원이 필요하며, 주제들은 일반 메시지 큐보다 더 비용이 많이 드는 수작업 작업이 필요합니다

.NET/C# 메시지 큐 Kafka 작업

먼저, .NET Core 3.1을 사용해 Kafka-Consumer와 Kafka-Producer라는 두 개의 새로운 콘솔 프로젝트를 만듭니다

Nuget을 사용하여 Confluent.Kafka 패키지를 다음과 같이 참조하며, 다음 명령어를 사용합니다:

GitHub 주소:하이퍼링크 로그인이 보입니다.

먼저 프로듀서 프로그램을 시작하고, 컨퍼머를 먼저 시작하면 다음과 같은 오류가 발생합니다:
오류 발생: 브로커: 알 수 없는 주제 또는 파티션

이 글은 여러 설정을 다룰 것입니다EnableAutoOffsetStore 는 false입니다즉, 오프셋 저장 공간을 수동으로 설정하는 것(수동 확인 메시지와 유사)

소비자는 소비 후 오프셋스토어를 설정하지 않습니다

프로듀서를 사용해 두 개의 메시지를 생성하고, 소비자 소비를 켜고, MaxPollIntervalMs = 10000 // 수동 설정 없이 10초를 설정하며, 다른 클라이언트가 소비하도록 허용하세요. 물론 10초 이내에 다른 클라이언트가 소비하지 않습니다

MaxPollIntervalMs 설명
고급 소비자의 경우, 통화 간 메시지를 소비할 수 있는 최대 허용 시간(예: rd_kafka_consumer_poll())). 이 구간을 초과하면 소비자가 실패한 것으로 간주되어 그룹이 재균형되어 파티션이 다른 소비자 그룹 멤버에게 재할당됩니다. 경고: 현재로서는 오프셋 커밋이 불가능할 수 있습니다. 참고: 오랜 시간 처리되는 애플리케이션은 "enable.auto.offset.store=false"를 설정하고, 메시지 처리 후 offsets_store()를 사용해* 오프셋을 명시적으로 저장하여 처리가 완료되기 전에 자동으로 커밋되지 않도록 하는 것이 권장됩니다. 초당 두 번씩 한 번 확인하세요. 자세한 내용은 KIP-62를 참조하세요.

렌더링은 다음과 같습니다:



오프셋스토어는 소비자가 지출을 마친 후에 설정됩니다

코드

설정이 끝난 후에도 10초 정도 기다려도 여전히 작동합니다마지막 메시지 받았습니다(소비자가 중개인과 연결될 때,오프셋 위치에서 소비를 시작하세요c.Commit(cr)이 설정되어 있다면; 마지막 메시지는 반복적으로 수신되지 않습니다.

소스 코드 보기



오프셋 + 1 커밋을 커밋하고, 결국 Librdkafka.topic_partition_list_destroy(cOffsets)를 호출합니다;

하이퍼링크 로그인이 보입니다.
하이퍼링크 로그인이 보입니다.

다른 GroupID 설정

명령줄 매개변수를 통해 다른 GroupID를 설정한 후, 다음 이미지와 같이 프로듀서를 통해 메시지를 보내보세요:



clinet1과 client2 모두과거 메시지 수신하기, 그리고 제작자가 메시지를 보내면 두 가지 모두 거의동시에 메시지를 수신하기

신규 소비자는 새로운 메시지만 받습니다

새 클라이언트가 새 메시지만 받고 과거 데이터는 무시하게 하려면 어떻게 해야 하나요?

배경은 다음과 같습니다:

아래에 나와 있습니다:



프로듀서 코드

다음과 같습니다:

소비자 코드

다음과 같습니다:

소스 코드 다운로드

관광객 여러분, 이 게시물의 숨겨진 내용을 보고 싶으시다면 부탁드립니다회답






이전의:텐센트 엔터프라이즈 메일박스를 사용한 .NET/C# 예외: 작업이 타임아웃되었습니다.
다음:NuGet이 캐시를 정리합니다
 집주인| 게시됨 2021. 4. 15. 오전 9:31:05 |
.NET Kafka 클라이언트가 연결이 끊겨도 예외를 발생시키지 않고 네트워크가 정상 상태가 된 후 다시 연결됩니다
%4|1618450028.267| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: 연결 끊김 (UP 주 기준 59926ms 이후)
%3|1618450028.267| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: 연결 끊김 (UP 주 기준 59926ms 이후)
%3|1618450028.267| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: IPv4에 연결하라#192.168.1.175:9092에 연결 실패함: 상태 연결 후 0ms 이상 확인 불가 오류
%3|1618450028.268| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: IPv4에 연결하려 하지 않음 #192.168.1.175:9092 실패: 알 수 없는 오류 (상태 연결 0ms 후)
%3|1618450028.357| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: ipv4#192.168.1.175:9092 연결 실패: 알 수 없는 오류 (CONNECT에서 10ms 후 동일한 오류 1건 억제됨)
%3|1618450028.357| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: IPv4에 연결함 실패 #192.168.1.175:9092 실패: 알 수 없는 오류 (상태 CONNECT에서 10ms 후 동일한 오류 1건 억제됨)
%3|1618450062.882| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: ipv4#192.168.1.175:9092 연결 실패: 알 수 없는 오류 (CONNECT에서 0ms 후, 동일 오류 8건 억제됨)
%3|1618450062.882| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: IPv4에 연결함 실패 #192.168.1.175:9092 실패: 알 수 없는 오류 (CONNECT에서 0ms 후, 동일 오류 8건 억제됨)
%3|1618450098.255| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: ipv4#192.168.1.175:9092에 연결 실패: 알 수 없는 오류 (상태 CONNECT에서 11ms 후, 동일한 오류 4건 억제됨)
%3|1618450098.255| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: IPv4에 연결함 실패 #192.168.1.175:9092 실패: 알 수 없는 오류 (CONNECT에서 11ms 후 4개의 동일한 오류 억제)
%3|1618450138.243| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: IPv4에 연결함#192.168.1.175:9092 실패: 알 수 없는 오류 (상태 CONNECT에서 0ms 후, 동일 오류 4건 억제됨)
%3|1618450138.244| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: IPv4에 연결함 실패 #192.168.1.175:9092 실패: 알 수 없는 오류 (CONNECT에서 0ms 후 4개의 동일한 오류 억제)
%3|1618450168.254| FAIL|rdkafka#consumer-1| [thrd:192.168.1.175:9092/bootstrap]: 192.168.1.175:9092/1: ipv4#192.168.1.175:9092에 연결 실패: 알 수 없는 오류 (CONNECT에서 10ms 후 3개의 동일한 오류 억제)
%3|1618450168.254| ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 192.168.1.175:9092/1: IPv4에 연결함 실패 #192.168.1.175:9092 실패: 알 수 없는 오류 (CONNECT에서 10ms 후 3개의 동일한 오류 억제)

 집주인| 게시됨 2021. 4. 13. 오후 4:26:58 |
메시지 소비 원리:

실제 생산 과정에서 각 주제는 여러 파티션을 가지며, 여러 파티션의 장점은 한편으로는 브로커에서 데이터를 샤딩할 수 있어 메시지 용량을 효과적으로 줄이고 IO 성능을 향상시킨다는 점입니다. 반면, 소비자 측의 소비 능력을 향상시키기 위해 동일한 주제를 여러 소비자, 즉 소비자 측의 부하 분산 메커니즘을 통해 소비하게 됩니다. 이 메커니즘은 다음에 이해할 것입니다. 다중 파티션과 다중 소비자의 경우 소비자는 어떻게 메시지를 소비하는가? 카프카는 소비자 그룹의 개념에 존재하는데, 이는 소비자 그룹에 속하는 동일한 종류의 소비자 group.id 있으며, 그룹 내 모든 소비자가 구독 주제의 모든 분할을 소비하도록 조정합니다. 물론, 각 파티션은 같은 소비자 그룹 내 소비자만 소비할 수 있으므로, 같은 소비자 그룹의 소비자들은 어떤 파티션에 소비해야 할지 어떻게 데이터를 할당할까요? 간단한 예로, 분할이 손실되고 있다면, 즉 파트리톤의 수가 소비자 수와 같을 때 각 소비자는 분할에 해당하며, 소비자 수가 파티션보다 많으면 추가 소비자 수는 작동하지 않으며, 반대로 여러 파티션을 소비하는 소비자가 존재합니다.

구역 할당 전략:

카프카에는 두 가지 파티션 할당 전략이 있는데, 하나는 범위(기본값)이고 다른 하나는 라운드로빈(폴링)입니다. 이는 소비자의 구성 partition.assignment.strategy 매개변수에 의해 설정됩니다.


모든 주제 보기


주제에 대한 세부 정보 보기




 집주인| 게시됨 2021. 5. 8. 오후 5:17:33 |
Kafka는 소비자 그룹을 삭제합니다



요청된 소비자 그룹('itsvse') 삭제는 성공했습니다.


다음과 같은 오류가 보고될 수 있습니다:

Error: Deletion of some consumer groups failed:
* 그룹 'itsvse'는 다음과 같은 이유로 삭제할 수 없었다: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: 그룹이 비어 있지 않다.
용액

모든 메시지를 소비하거나 오프셋을 설정하세요

카프카는 수동으로 오프셋 오프셋을 설정합니다
https://www.itsvse.com/thread-9641-1-1.html
그럼 다시 삭제하세요!

 집주인| 게시됨 2021. 4. 13. 오후 3:40:48 |
파워 셸 명령어



각 소비자 클라이언트는 Kafka 서비스에 2개의 연결을 유지합니다
 집주인| 게시됨 2021. 5. 7. 오후 12:37:06 |
Kafka, 지정된 그룹 내 주제 스택 수를 보기 위해

게시됨 2021. 6. 16. 오후 12:41:09 |
코드를 왜 볼 수 없는지 물어보세요~
 집주인| 게시됨 2021. 6. 25. 오전 10:50:06 |
카프카는 주제 크기 명령을 받습니다:



 집주인| 게시됨 2021. 7. 18. 오전 10:15:01 |
주제를 만들기 위한 Kafka 명령줄:

게시됨 2021. 9. 3. 오전 11:52:41 |
카프카에는 여전히 많은 함정이 존재한다는 것을 배웠다
면책 조항:
Code Farmer Network에서 발행하는 모든 소프트웨어, 프로그래밍 자료 또는 기사는 학습 및 연구 목적으로만 사용됩니다; 위 내용은 상업적 또는 불법적인 목적으로 사용되지 않으며, 그렇지 않으면 모든 책임이 사용자에게 부담됩니다. 이 사이트의 정보는 인터넷에서 가져온 것이며, 저작권 분쟁은 이 사이트와는 관련이 없습니다. 위 내용은 다운로드 후 24시간 이내에 컴퓨터에서 완전히 삭제해야 합니다. 프로그램이 마음에 드신다면, 진짜 소프트웨어를 지원하고, 등록을 구매하며, 더 나은 진짜 서비스를 받아주세요. 침해가 있을 경우 이메일로 연락해 주시기 바랍니다.

Mail To:help@itsvse.com