Kafka는 링크드인에서 개발한 고성능 분산 메시징 시스템으로, 로그 수집, 스트리밍 데이터 처리, 온라인 및 오프라인 메시지 배포 등 다양한 시나리오에서 널리 사용됩니다. 전통적인 MC로 설계된 것은 아니지만, Kafaka는 대부분의 경우 ActiveMQ와 같은 전통적인 메시징 시스템을 대체할 수 있습니다.
Kafka는 메시지 흐름을 주제별로 조직하며, 메시지를 보관하는 서버를 브로커라고 하며, 소비자는 하나 이상의 주제에 구독할 수 있습니다. 부하를 균형 있게 하기 위해 주제의 메시지를 여러 파티션으로 나눌 수 있으며, 파티션이 많을수록 Kafka의 병렬성과 처리량이 높아집니다.
Kafka 클러스터는 클러스터를 구현하기 위해 Zookeeper 지원이 필요하며, zookeeper는 최신 kafka 배포판에 이미 포함되어 있어 zookeeper 서버와 Kafka 서버를 동시에 시작하거나 기존 Zookeeper 클러스터를 사용할 수 있습니다.
전통적인 MC와 달리, 소비자는 스스로 오프셋을 유지해야 하며, Kafka에서 메시지를 받을 때는 현재 오프셋 이후에만 메시지를 가져옵니다. Kafka의 Scala/Java 클라이언트는 이미 이 로직 부분을 Zookeeper에 오프셋을 저장해 구현하고 있습니다. 각 소비자는 ID를 선택할 수 있으며, 같은 ID를 가진 소비자는 단 한 번만 같은 메시지를 받게 됩니다.만약 한 주제의 소비자들이 모두 같은 ID를 사용한다면, 그것은 전통적인 큐입니다. 각 소비자가 다른 ID를 사용한다면, 그것은 전통적인 pub-sub입니다.
복습:
카프카 소비
1. 같은 group_id 소비자 중 한 명만 메시지를 소비할 수 있습니다 (큐 큐 모드)
2. 서로 다른 group_id 소비자들이 동일한 뉴스를 받는다
카프카의 장점
분산되어 있고 매우 확장 가능합니다. Kafka 클러스터는 투명하게 확장되어 새로운 서버를 클러스터에 추가할 수 있습니다.
고성능 Kafka의 성능은 ActiveMQ와 RabbitMQ와 같은 전통적인 MQ 구현, 특히 배치 연산을 지원하는 Kafka보다 훨씬 뛰어납니다. 다음 이미지는 LinkedIn의 소비자 성과 스트레스 테스트 결과를 보여줍니다:
결함 허용성. Kafka의 각 파티션에서 나온 데이터는 여러 서버에 복제됩니다. 중개인이 실패하면 ZooKeeper 서비스는 생산자와 소비자에게 통보하며, 소비자는 다른 중개인으로 전환합니다.
카프카의 단점:
메시지를 반복하세요. 카프카는 각 메시지가 적어도 한 번은 전달될 것만 보장하며, 확률은 낮지만 메시지가 여러 번 전달될 가능성도 있습니다. 뉴스가 순서가 뒤죽박죽이야. 파티션 내 메시지는 질서 정연하게 유지되지만, 주제에 여러 파티션이 있으면 파티션 간 메시지 전달이 질서 정연하다는 보장은 없습니다. 복잡성. Kafka는 Zookeeper 클러스터 지원이 필요하며, 주제들은 일반 메시지 큐보다 더 비용이 많이 드는 수작업 작업이 필요합니다
.NET/C# 메시지 큐 Kafka 작업
먼저, .NET Core 3.1을 사용해 Kafka-Consumer와 Kafka-Producer라는 두 개의 새로운 콘솔 프로젝트를 만듭니다
Nuget을 사용하여 Confluent.Kafka 패키지를 다음과 같이 참조하며, 다음 명령어를 사용합니다:
GitHub 주소:하이퍼링크 로그인이 보입니다.
먼저 프로듀서 프로그램을 시작하고, 컨퍼머를 먼저 시작하면 다음과 같은 오류가 발생합니다:
오류 발생: 브로커: 알 수 없는 주제 또는 파티션 이 글은 여러 설정을 다룰 것입니다EnableAutoOffsetStore 는 false입니다즉, 오프셋 저장 공간을 수동으로 설정하는 것(수동 확인 메시지와 유사)
소비자는 소비 후 오프셋스토어를 설정하지 않습니다
프로듀서를 사용해 두 개의 메시지를 생성하고, 소비자 소비를 켜고, MaxPollIntervalMs = 10000 // 수동 설정 없이 10초를 설정하며, 다른 클라이언트가 소비하도록 허용하세요. 물론 10초 이내에 다른 클라이언트가 소비하지 않습니다
MaxPollIntervalMs 설명
고급 소비자의 경우, 통화 간 메시지를 소비할 수 있는 최대 허용 시간(예: rd_kafka_consumer_poll())). 이 구간을 초과하면 소비자가 실패한 것으로 간주되어 그룹이 재균형되어 파티션이 다른 소비자 그룹 멤버에게 재할당됩니다. 경고: 현재로서는 오프셋 커밋이 불가능할 수 있습니다. 참고: 오랜 시간 처리되는 애플리케이션은 "enable.auto.offset.store=false"를 설정하고, 메시지 처리 후 offsets_store()를 사용해* 오프셋을 명시적으로 저장하여 처리가 완료되기 전에 자동으로 커밋되지 않도록 하는 것이 권장됩니다. 초당 두 번씩 한 번 확인하세요. 자세한 내용은 KIP-62를 참조하세요. 렌더링은 다음과 같습니다:
오프셋스토어는 소비자가 지출을 마친 후에 설정됩니다
코드
설정이 끝난 후에도 10초 정도 기다려도 여전히 작동합니다마지막 메시지 받았습니다(소비자가 중개인과 연결될 때,오프셋 위치에서 소비를 시작하세요c.Commit(cr)이 설정되어 있다면; 마지막 메시지는 반복적으로 수신되지 않습니다.
소스 코드 보기
오프셋 + 1 커밋을 커밋하고, 결국 Librdkafka.topic_partition_list_destroy(cOffsets)를 호출합니다;
하이퍼링크 로그인이 보입니다.
하이퍼링크 로그인이 보입니다.
다른 GroupID 설정
명령줄 매개변수를 통해 다른 GroupID를 설정한 후, 다음 이미지와 같이 프로듀서를 통해 메시지를 보내보세요:
clinet1과 client2 모두과거 메시지 수신하기, 그리고 제작자가 메시지를 보내면 두 가지 모두 거의동시에 메시지를 수신하기。
신규 소비자는 새로운 메시지만 받습니다
새 클라이언트가 새 메시지만 받고 과거 데이터는 무시하게 하려면 어떻게 해야 하나요?
배경은 다음과 같습니다:
아래에 나와 있습니다:
프로듀서 코드
다음과 같습니다:
소비자 코드
다음과 같습니다:
소스 코드 다운로드
관광객 여러분, 이 게시물의 숨겨진 내용을 보고 싶으시다면 부탁드립니다 회답
|