Middleware/Kafka&RabbitMQ 2022. 7. 27. 16:19

spring.cloud.stream.bindings.<channelName>.consumer.concurrency 옵션과 관련해 설명한다.

 

아래 상황이 있다고 가정하자.

 

  • 토픽 이름 : A-topic
  • 파티션 개수 : 4
  • 앱 인스턴스 개수 : 1(concurrency == 2)

 

보통은 위와 같이 설정을 하게 되면, 당연히 하나의 앱에서 2개는 동시 처리 하겠구나 생각을 하기 마련이지만 실제로는 그렇게 동작하지 않을 수 있다. 아래와 같이 컨슈머 그룹이 구성 되어 있다고 생각해보자

 

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group a-topic-group --describe

GROUP         TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                             CLIENT-ID
a-topic-group a-topic     2          18846           18846           0               consumer-a-topic-3-cdb73066-d0de-4079-8fb0-a5663ba9c76d consumer-a-topic-3
a-topic-group a-topic     3          18887           18887           0               consumer-a-topic-3-cdb73066-d0de-4079-8fb0-a5663ba9c76d consumer-a-topic-3
a-topic-group a-topic     0          18838           18838           0               consumer-a-topic-2-2f6ac6eb-35f1-464a-b89e-0ea0579c28b2 consumer-a-topic-2
a-topic-group a-topic     1          19020           19020           0               consumer-a-topic-2-2f6ac6eb-35f1-464a-b89e-0ea0579c28b2 consumer-a-topic-2

 

위 그룹을 보면 2개의 컨슈머(CONSUMER-ID를 기준으로 보자)가 각각 2개의 파티션을 나눠가지고 있는데, 여기서 카프카의 특징을 보아야 한다. "하나의 컨슈머는 요청 하나씩만 처리가능하다" 이 특징에서 우리가 기대한대로 동작이 안하는것인데, 이유는 메시지가 2개가 발행되었고 이 2개의 메시지가 위 파티션중에 0,1번에 들어갔다고 보자. 그러면 우리는 2개가 동시에 처리 될것이라 기대하지만 실제로 컨슈머가 한놈이기때문에 먼저 들어온 것을 처리하고 다음것을 처리한다.(컨슈머 하나는 반드시 한번에 하나의 요청만 처리) 그렇기에 2개가 동시에 실행되지 않는 것이고 만약에 운좋게 서로 다른 컨슈머가 붙은 파티션으로 메시지가 들어가면 2개가 동시에 처리 될것이다.

 

그렇다면 우리가 동시에 처리하고 싶다면? "concurrency == 파티션 개수"로 설정해주면 된다. 그러면 아래와 같이 하나의 앱에 4개의 컨슈머가 각각 다른 파티션에 할당될 것이고 실제로 4개의 메시지가 동시에 처리 될 것이다.

 

 

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group a-topic-group --describe

GROUP         TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                             CLIENT-ID
a-topic-group a-topic     2          18846           18846           0               consumer-a-topic-3-cdb73066-d0de-4079-8fb0-a5663ba9c76d consumer-a-topic-3
a-topic-group a-topic     3          18887           18887           0               consumer-a-topic-3-dfdfad01-d0de-4079-8fb0-a5663ba9c76d consumer-a-topic-3
a-topic-group a-topic     0          18838           18838           0               consumer-a-topic-2-dcvzc023-35f1-464a-b89e-0ea0579c28b2 consumer-a-topic-2
a-topic-group a-topic     1          19020           19020           0               consumer-a-topic-2-2f6ac6eb-35f1-464a-b89e-0ea0579c28b2 consumer-a-topic-2
posted by 여성게
:
Middleware/Kafka&RabbitMQ 2019. 7. 6. 13:28

 

 

이전 포스팅들에서 이미 카프카란 무엇이고, 카프카 프로듀서부터 컨슈머, 스트리밍까지 다루어보았다. 하지만 이번 포스팅을 시작으로 조금더 내용을 다듬고 정리된 상태의 풀세트의 카프카 포스팅을 시작할 것이다.

 

카프카 시스템의 목표

  • 메시지 프로듀서와 컨슈머 사이의 느슨한 연결
  • 다양한 형태의 데이터 사용 시나리오와 장애 처리 지원을 위한 메시지 데이터 유지
  • 빠른 처리 시간을 지원하는 구성 요소로 시스템의 전반적인 처리량을 최대화
  • 이진 데이터 형식을 사용해서 다양한 데이터 형식과 유형을 관리
  • 기존의 클러스터 구성에 영향을 주지 않고 일정한 서버의 확장성을 지원

 

카프카의 구조

카프카 토픽에서 모든 메시지는 바이트의 배열로 표현되며, 카프카 프로듀서는 카프카 토픽에 메시지를 저장하는 애플리케이션이다. 이렇게 프로듀서가 보낸 데이터를 저장하고 있는 모든 토픽은 하나 이상의 파티션으로 나뉘어져있다. 각 토픽에 대해 여러개로 나뉘어져 있는 파티션은 메시지를 도착한 순서에 맞게 저장한다.(내부적으로는 timestamp를 가지고 있다.) 카프카에서는 프로듀서와 컨슈머가 수행하는 두 가지 중요한 동작이 있다. 프로듀서는 로그 선행 기입 파일 마지막에 메시지를 추가한다. 컨슈머는 주어진 토픽 파티션에 속한 로그 파일에서 메시지를 가져온다. 물리적으로 각 토픽은 자신에게 할당된 하나 이상의 파티션을 다른 브로커(카프카 데몬 서버)들에게 균등하게 분배된다.

 

이상적으로 카프카 파이프라인은 브로커별로 파티션과 각 시스템의 모든 토픽에 대해 균등하게 분배되어야 한다. 컨슈머는 토픽에 대한 구독 또는 이런 토픽에서 메시지를 수신하는 애플리케이션이다.

 

이러한 카프카 시스템은 고가용성을 위한 클러스터를 지원한다. 전형적인 카프카 클러스터는 다중 브로커로 구성된다. 클러스터에 대한 메시지 읽기와 쓰기 작업의 부하 분산을 돕는다. 각 브로커는 자신의 상태를 저장하지 않지만 Zookeeper를 사용해 상태 정보를 유지한다. 각각의 토픽 파티션에는 리더로 활동하는 브로커가 하나씩 있고, 0개 이상의 팔로워를 갖는다. 여느 리더/팔로워 관계와 비슷하게 리더는 해당하는 파티션의 읽기나 쓰기 요청을 관리한다. 여기서 Zookeeper는 카프카 클러스터에서 중요한 요소로, 카프카 브로커와 컨슈머를 관리하고 조정한다. 그리고 카프카 클러스터 안에서 새로운 브로커의 추가나 기존 브로커의 장애를 감시한다. 예전 카프카 버전에서는 파티션 오프셋 관리도 Zookeeper에서 관리하였지만 최신 버전의 카프카는 오프셋 관리를 위한 토픽이 생성되어 오프셋관련된 데이터를 하나의 토픽으로 관리하게 된다.

 

 

메시지 토픽

메시징 시스템에서 메시지는 어디간에 저장이 되어 있어야한다. 카프카는 토픽이라는 곳에 메시지를 바이트 배열 형태로 저장한다. 각 토픽은 비즈니스 관점에서 하나의 카테고리가 될 수 있다. 다음은 메시지 토픽에 대한 용어 설명이다.

 

  • 보관 기간 : 토픽 안의 메시지는 처리 시간과 상관없이 정해진 기간 동안에만 메시지를 저장하고 있는다. 기본 값은 7일이며 사용자가 값 변경이 가능하다.
  • 공간 유지 정책 : 메시지의 크기가 설정된 임계값에 도달하면 메시지를 지우도록 설정가능하다. 카프카 시스템 구축시 충분한 용량 계획을 수립해야 원치않은 삭제가 발생하지 않는다.
  • 오프셋 : 카프카에 할당된 각 메시지는 오프셋이라는 값이 사용된다. 토픽은 많은 파티션으로 구성돼 있으며, 각 파티션은 도착한 순서에 따라 메시지를 저장하고, 컨슈머는 이러한 오프셋으로 메시지를 인식하고 특정 오프셋 이전의 메시지는 컨슈머가 수신했던 메시지로 인식한다.
  • 파티션 : 카프카 메시지 토픽은 1개 이상의 파티션으로 구성되어 분산 처리된다. 해당 파티션의 숫자는 토픽 생성시 설정가능하다. 만약 순서가 아주 중요한 데이터의 경우에는 파티션 수를 1개로 지정하는 것도 고려할 수 있다.(아무리 파티션이 시계열로 데이터가 저장되지만 여러 파티션에 대한 메시지 수신은 어떠한 순서로 구독될지 모르기 때문이다.)
  • 리더 : 파티션은 지정된 복제 팩터에 따라 카프카 클러스터 전역에 걸쳐 복제된다. 각 파티션은 리더 브로커와 팔로워 브로커를 가지며 파티션에 대한 모든 읽기와 쓰기 요청은 리더를 통해서만 진행된다.

 

메시지 파티션

각 메시지는 파티션에 추가되며 각 단위 메시지는 오프셋으로 불리는 숫자에 맞게 할당된다. 카프카는 유사한 키를 갖고 있는 메시지가 동일한 파티션으로 전송되도록 하며, 메시지 키의 해시 값을 산출하고 해당 파티션에 메시지를 추가한다. 여기서 중요하게 짚고 넘어가야 할 것이 있다. 각 단위 메시지에 대한 시간적인 순서는 토픽에 대해서는 보장되지 않지만, 파티션 안에서는 항상 보장된다. 즉, 나중에 도착한 메시지가 항상 파티션의 끝 부분에 추가됨을 의미한다. 예를 들어 설명하자면,

 

A 토픽은 a,b,c라는 파티션으로 구성된다는 가정이다. 만약 라운드 로빈 방식으로 메시지가 각 파티션에 분배가 되는 옵션을 적용했다고 생각해보자. 1,2,3,4,5,6이라는 메시지가 pub되었고 각 메시지는 a-1,4 / b-2,5 / c-3,6 와 같이 파티션에 분배된다. 1,2,3,4,5,6이라는 순서로 메시지가 들어왔고 해당 메시지는 시간적인 순서와 라운드로빈 방식으로 a,b,c라는 파티션에 배분되었다. 또한 각 파티션 안을 살펴보면 확실히 시간 순서로 배분되었다. 하지만 컨슈머 입장에서는 a,b,c파티션에서 데이터를 수신했을 경우 1,4,2,5,3,6이라는 메시지 순서로 데이터를 수신하게 될것이다.(a,b,c순서로 수신, 하지만 파티션 수신순서는 바뀔수 있다.) 무엇을 의미할까? 위에서 말한것과 같이 파티션 내에서는 시간적인 순서를 보장하지만 전체적인 토픽관점에서는 순서가 고려되지 않는 것이다. 즉, 순서가 중요한 데이터는 동일한 키를 사용해 동일 파티션에만 데이터를 pub하거나 혹은 토픽에 파티션을 하나만 생성해 하나의 파티션에 시간적인 순서로 데이터를 저장되게 해야한다.

 

많은 수의 파티션을 구성하는 경우의 장단점

  • 높은 처리량 보장 : 파티션을 여러개 구성한다면 병렬 처리되어 높은 처리량을 보장할 것이다. 왜냐하면 여러 파티션에 대해 쓰기 동작이 여러 스레드를 이용해 동시에 수행되기 때문이다. 또한 하나의 컨슈머 그룹 내에서 하나의 파티션에 대해 하나의 컨슈머가 할당되기 때문에 여러 파티션의 메시지를 여러 컨슈머가 동시에 수신하여 병렬처리가 가능하다. 여기서 중요한 것은 동일한 컨슈머 그룹 안의 하나의 파티션에 대해 여러 컨슈머가 읽을 수 없다는 것이다.
  • 프로듀서 메모리 증가 : 만약 파티션 수가 많아 진다면 일시적으로 프로듀서의 버퍼의 메모리가 과도해질 수 있어, 토픽에 메시지를 보내는데 문제가 생길 수 있으므로 파티션 수는 신중히 고려해야한다.
  • 고가용성 문제 : 여러 파티션을 생성하므로서 카프카 클러스터의 고가용성을 지원한다. 즉, 리더인 파티션이 중지되면 팔로워중에 하나가 리더로 선출될 것이다. 하지만 너무 과중한 파티션 수는 리더 선출에 지연이 생길 가능성이 있기 때문에 신중히 고려해야한다.

복제와 복제로그

복제는 카프카 시스템에서 신뢰성있는 시스템을 구현하기 위해 가장 중요한 부분이다. 각 토픽 파티션에 대한 메시지 로그의 복제본은 카프카 클러스터 내의 여러 서버에 걸쳐서 관리되고, 각 토픽마다 복제 팩터를 다르게 지정가능하다.

 

일반적으로 팔로워는 리더의 로그 복사본을 보관하는데, 이는 리더가 모든 팔로워로부터 ACK를 받기 전까지는 메시지를 커밋하지 않는다는 것을 의미한다.(ack=all 설정시)

 

메시지 프로듀서 

일반적으로 프로듀서는 파티션으로 데이터를 쓰기 않고, 메시지에 대한 쓰기 요청을 생성해서 리더 브로커에게 전송한다. 그 이후 파티셔너가 메시지의 해시 값을 계산하여 프로듀서로 하여금 어느 파티션에 메시지를 pub할지 알 수 있도록 한다.

 

일반적으로 해시 값은 메시지 키를 갖고 계산하며, 메시지 키는 카프카의 토픽으로 메시지를 기록할 때 제공된다. null 키를 갖는 메시지는 분산 메시징을 지원하는 파티션에 대해 라운드 로빈 방식으로 분배된다. 카프카에서의 각 파티션은 한 개의 리더를 가지며, 각 읽기와 쓰기 요청은 리더를 통해 진행된다.

 

프로듀서는 설정에 따라 메시지의 ACK를 기다리며, 일반적으로 모든 팔로워 파티션에 대해 복제가 완료되면 커밋을 완료한다. 또한 커밋이 완료되지 않으면 읽기 작업은 허용되지 않는다. 이는 메시지의 손실을 방지한다. 그렇지만 ACK 설정을 1로 설정할 수 있는데, 이경우 리더 파티션이 하나의 팔로워에게만 복제 완료 응답을 받으면 커밋을 완료한다. 이 경우 처리 성능은 좋아지지만 메시지의 손실은 어느정도 감수 해야한다. 즉, 메시지의 손실을 허용하고 빠른 처리 시간을 원하는 경우에 사용할 수 있는 설정이다.

 

메시지 컨슈머

카프카 토픽을 구독하는 역할을 하는 애플리케이션이다. 각 컨슈머는 컨슈머 그룹에 속해 있으며, 일부 컨슈머 그룹은 여러개의 컨슈머를 포함한다. 위에서도 간단히 설명하였지만 동일 그룹의 컨슈머들은 동시에 같은 토픽의 서로다른 파티션에서 메시지를 읽어온다. 하지만 서로 다른 그룹의 컨슈머들은 같은 토픽에서 데이터를 읽어와 서로 영향을 미치지 않으면서 메시지 사용이 가능하다.

 

주키퍼의 역할

  • 컨트롤러 선정 : 컨트롤러는 파티션 관리를 책임지는 브로커 중에 하나이며, 파티션 관리는 리더 선정, 토픽 생성, 파티션 생성, 복제본 관리 등을 포함한다. 하나의 노드 또는 서버가 꺼지면 카프카 컨트롤러는 팔로워 중에서 파티션 리더를 선정한다. 카프카는 컨트롤러를 선정하기 위해 주키퍼의 메타데이터를 정보를 이용한다.
  • 브로커 메타데이터 : 주키퍼는 카프카 클러스터의 일부인 각 브로커에 대해 상태 정보를 기록한다.
  • 토픽 메타데이터 : 주키퍼는 또한 파티션 수, 특정한 설정 파라미터 등의 토픽 메타 데이터를 기록한다.

이외에 주키퍼는 더 많은 역할을 담당하고 있다.

 

여기까지 간단히 카프카에 대한 소개였다. 이후 포스팅들에서는 프로듀서,컨슈머,스트리밍,클러스터 구축 등의 내용을 몇 차례에 거쳐 다루어 볼 것이다.

posted by 여성게
:
Middleware/Kafka&RabbitMQ 2019. 3. 24. 13:54

Kafka - Kafka Consumer(카프카 컨슈머) Java&CLI



이전 포스팅에서 kafka producer를 java 소스기반으로 예제를 짜보았습니다. 이번 포스팅은 kafka consumer를 java 소스로 다루어보려고 합니다.

Kafka Producer(카프카 프로듀서)가 메시지를 생산해서 카프카의 토픽으로 메시지를 보내면 그 토픽의 메시지를 가져와서 소비(consume)하는 역할을 하는 애플리케이션, 서버 등을 지칭하여 컨슈머라고 한다. 컨슈머의 주요 기능은 특정 파티션을 관리하고 있는 파티션 리더에게 메시지를 가져오기 요청을 하는 것이다. 각 요청은 컨슈머가 메시지 오프셋을 명시하고 그 위치로부터 메시지를 수신한다. 그래서 컨슈머는 가져올 메시지의 위치를 조정할 수 있고, 필요하다면 이미 가져온 데이터도 다시 가져올 수 있다. 이렇게 이미 가져온 메시지를 다시 가져올 수 있는 기능은 여타 다른 메시지 시스템(래빗엠큐,RabbitMQ)들은 제공하지 않는 기능이다.(내부 구동방식이 다른 이유도 있음) 최근의 메시지큐 솔루션 사용자들에게 이러한 기능은 필수가 되고 있다. 

카프카에서 컨슈머라고 불리는 컨슈머는 두 가지 종류가 있는데, 올드 컨슈머(Old Consumer)와 뉴 컨슈머(New Consumer)이다. 두 컨슈머의 큰 차이점은 오프셋에 대한 주키퍼 사용 유무이다. 구 버전의 카프카에서는 컨슈머의 오프셋을 주키퍼의 znode에 저장하는 방식을 지원하다가 카프카 0.9버전부터 컨슈머의 오프셋 저장을 주키퍼가 아닌 카프카의 도픽에 저장하는 방식으로 변경되었다. 아마 성능상의 이슈때문에 오프셋 저장 정책을 바꾼것 같다. 두가지 방식을 특정 버전이전까지는 지원하겠지만 아마 추후에는 후자(뉴 컨슈머)의 방식으로 변경되지 않을까싶다. 코드 레벨로 카프카 컨슈머를 다루기전 카프카 컨슈머의 주요 옵션을 본다.




컨슈머 주요 옵션(Consumer option)

-bootstrap.servers : 카프카 클러스터에 처음 연결을 하기 위한 호스트와 포트 정보로 구성된 리스트 정보이다.


-fetch.min.bytes : 한번에 가져올 수 있는 최소 데이터 사이즈이다. 만약 지정한 사이즈보다 작은 경우, 요청에 대해 응답하지 않고 데이터가 누적될 때까지 기다린다.


-group.id : 컨슈머가 속한 컨슈머 그룹을 식별하는 식별자이다.


-enable.auto.commit : 백그라운드에서 주기적으로 오프셋을 자동 커밋한다.


-auto.offset.reset : 카프카에서 초기 오프셋이 없거나 현재 오프셋이 더 이상 존재하지 않은 경우에 다음 옵션으로 리셋한다.

1)earliest : 가장 초기의 오프셋값으로 설정

2)latest : 가장 마지막의 오프셋값으로 설정

3)none : 이전 오프셋값을 찾지 못하면 에러를 발생


-fetch.max.bytes : 한번에 가져올 수 있는 최대 데이터 사이즈


-request.timeout.ms : 요청에 대해 응답을 기다리는 최대 시간


-session.timeout.ms : 컨슈머와 브로커사이의 세션 타임 아웃시간. 브로커가 컨슈머가 살아있는 것으로 판단하는 시간(기본값 10초) 만약 컨슈머가 그룹 코디네이터에게 하트비트를 보내지 않고 session.timeout.ms이 지나면 해당 컨슈머는 종료되거나 장애가 발생한 것으로 판단하고 컨슈머 그룹은 리밸런스(rebalance)를 시도한다. session.timeout.ms는 하트비트 없이 얼마나 오랫동안 컨슈머가 있을 수 있는지를 제어하는 시간이며, 이 속성은 heartbeat.interval.ms와 밀접한 관련이 있다. session.timeout.ms를 기본값보다 낮게 설정하면 실패를 빨리 감지 할 수 있지만, GC나 poll 루프를 완료하는 시간이 길어지게 되면 원하지 않게 리밸런스가 일어나기도 한다. 반대로는 리밸런스가 일어날 가능성은 줄지만 실제 오류를 감지하는 데 시간이 오래 걸릴 수 있다.


-hearbeat.interval.ms : 그룹 코디네이터에게 얼마나 자주 KafkaConsumer poll() 메소드로 하트비트를 보낼 것인지 조정한다. session.timeout.ms와 밀접한 관계가 있으며 session.timeout.ms보다 낮아야한다. 일반적으로 1/3 값정도로 설정한다.(기본값 3초)


-max.poll.records : 단일 호출 poll()에 대한 최대 레코드 수를 조정한다. 이 옵션을 통해 애플리케이션이 폴링 루프에서 데이터를 얼마나 가져올지 양을 조정할 수 있다.


-max.poll.interval.ms : 컨슈머가 살아있는지를 체크하기 위해 하트비트를 주기적으로 보내는데, 컨슈머가 계속해서 하트비트만 보내고 실제로 메시지를 가져가지 않는 경우가 있을 수도 있다. 이러한 경우 컨슈머가 무한정 해당 파티션을 점유할 수 없도록 주기적으로 poll을 호출하지 않으면 장애라고 판단하고 컨슈머 그룹에서 제외한 후 다른 컨슈머가 해당 파티션에서 메시지를 가져갈 수 있게한다.


-auto.commit.interval.ms : 주기적으로 오프셋을 커밋하는 시간


-fetch.max.wait.ms : fetch.min.bytes에 의해 설정된 데이터보다 적은 경우 요청에 응답을 기다리는 최대 시간


기타 많은 설정들이 있다. 혹시나 인증과 ssl 등의 다양한 설정들을 알고 싶다면 공식 홈페이지를 참고하길 바란다.(https://kafka.apache.org/documentation/#consumerconfigs)




콘솔 컨슈머로 메시지 가져오기


카프카는 기본적으로 콘솔로 메시지를 가져올 수 있는 명령어를 제공한다. 우선 진행하기 앞서 예제로 진행하려는 환경은 이전 포스팅에서 구성해보았던 카프카 클러스터 환경에서 진행한다. 만약 카프카 클러스터 환경을 구성해본적이 없다면 밑의 링크를 참고하기 바란다.


▶︎▶︎▶︎2019/03/13 - [Kafka&RabbitMQ] - Kafka - Kafka(카프카) cluster(클러스터) 구성 및 간단한 CLI사용법


클러스터 환경 구축 후에 예제로 토픽하나를 생성했다는 가정하게 진행한다.



Kafka console producer로 접속하여 메시지를 보내보면 위의 컨슈머로 메시지가 전달 될 것이다. 이러한 컨슈머를 실행할 때는 항상 컨슈머 그룹이라는 것이 필요하다. 토픽의 메시지를 가져오기 위한 kafka-console-consumer.sh 명령어를 실행하면서 추가 옵션으로 컨슈머 그룹 이름을 지정해야 하는데, 만약 추가 옵션을 주지 않고 실행한 경우에는 자동으로 console-consumer-xxxxx(숫자)로 컨슈머 그룹이 생성된다.



필자가 예제로 생성한 그룹아이디도 보인다. 이렇게 그룹아이디 리스트를 볼 수 있는 명령어도 제공한다. 혹시나 모르시는 분들을 위해 토픽 리스트를 볼 수 있는 명령어도 있다.



보면 필자가 사용하고 있는 카프카 버전은 뉴 컨슈머를 이용하는 것을 알 수 있다. 바로 메시지 오프셋을 저장하기 위한 __consumer_offsets가 토픽에 존재하기 때문이다.



또한 컨슈머를 실행시킬 때에 위에서 설명한 것과 같이 그룹아이디 값 옵션을 추가하여 실행시킬 수도 있다.



자바코드를 이용한 컨슈머


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import java.util.Arrays;
import java.util.Properties;
import java.time.Duration;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
 
import lombok.extern.slf4j.Slf4j;
 
/**
 * Kafka Consumer Exam
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaBookConsumer1 {
    
    public static Properties init() {
        Properties props = new Properties();
        props.put("bootstrap.servers""localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put("group.id""exam-consumer-group");
        props.put("enable.auto.commit""true");
        props.put("auto.offset.reset""latest");
        props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
    
    /**
     * 컨슈머예제
     */
    public static void consume() {
        
        try (KafkaConsumer<StringString> consumer = new KafkaConsumer<>(init());){
            //토픽리스트를 매개변수로 준다. 구독신청을 한다.
            consumer.subscribe(Arrays.asList("test-topic"));
            while (true) {
              //컨슈머는 토픽에 계속 폴링하고 있어야한다. 아니면 브로커는 컨슈머가 죽었다고 판단하고, 해당 파티션을 다른 컨슈머에게 넘긴다.
              ConsumerRecords<StringString> records = consumer.poll(Duration.ofMillis(100));
              for (ConsumerRecord<StringString> record : records)
                log.info("Topic: {}, Partition: {}, Offset: {}, Key: {}, Value: {}\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
        } finally {}
    }
        
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        consume();
    }
 
}
 
cs


소스에 대해 간단히 설명하면 우선 컨슈머 옵션들을 Properties 객체로 정의한다. 오프셋 리셋 옵션을 lastest로 주어 토픽의 가장 마지막부터 메시지를 가져온다. 메시지의 키와 값의 역직렬화 옵션을 문자열로 해준다. 이렇게 컨슈머 옵션을 설정하고 해당 옵션값을 이용하여 KafkaConsumer 객체를 생성해준다. 해당 객체는 사용이 후에 꼭 close 해줘야 하기 때문에 try문 안에 객체 생성코드를 넣어놔 자동으로 자원반납을 할 수 있게 해주었다. 그리고 해당 컨슈머 객체로 토픽에 대해 구독신청을 해준다. 구독 대상이 되는 토픽은 리스트로 여러개 지정가능하다. 이것 또한 카프카의 장점중 하나이다.(하나의 컨슈머가 여러 메시지큐를 구독하는 것)

그리고 해당 컨슈머는 계속해서 폴링한다. 무한 루프로 계속해서 폴링하지 않으면 컨슈머가 종료된 것으로 간주되어 컨슈머에 할당된 파티션은 다른 컨슈머에게 전달되고 다른 컨슈머에 의해 메시지가 컨슘된다. poll() 메소드의 매개변수는 타임아웃 주기이다. 이 타임아웃주기는 데이터가 컨슈머 버퍼에 없다면 poll()은 얼마 동안 블록할지를 조정하는 것이다. 또한 poll()은 레코드 전체를 리턴하고, 레코드에는 토픽,파티션,파티션의 오프셋,키,값을 포함하고 있다. 한 번에 하나의 메시지만 가져오는 것이 아니라 여러개의 메시지를 가져오기 때문에 for-each로 데이터들을 처리하고 있다.




메시지의 순서


파티션이 3개인 토픽을 새로 만들어서 해당 토픽으로 메시지를 a,b,c,d,e 순서대로 보내보았다. 그리고 컨슈머 실행후 결과를 보니 a,d,b,e,c 순서대로 메시지가 들어왔다. 컨슈머에 문제가 있는 것인가? 아니면 진짜 내부적으로 문제가 있어서 순서가 보장되지 않은 것인가 궁금할 수 있다. 하지만 지극히 정상인 결과값이다.


먼저 해당 토픽에 메시지가 어떻게 저장되어 있는지 부터 확인해봐야 한다. 해당 토픽은 파티션이 3개로 구성되어 있기 때문에 각 파티션별로 메시지가 어떻게 저장되어 있는지 확인해봐야한다.



0번 파티션에는 b,e 1번 파티션에는 a,d 2번 파티션에는 c 데이터가 저장되어 있는 것을 볼수 있다. 즉, 컨슈머는 프로듀서가 어떤 순서대로 메시지를 보내는지 알수 없다. 단지 파티션의 오프셋 기준으로만 메시지를 가져올 뿐이다. 이말은 무엇이냐면, 카프카 컨슈머에서의 메시지 순서는 동일한 파티션내에서만 유지되고 파티션끼리의 메시지 순서는 보장하지 않는 것이다. 


카프카를 사용하면서 메시지의 순서를 보장해야 하는 경우에는 토픽의 파티션 수를 1로 설정한다. 하지만 단점도 존재한다. 메시지의 순서는 보장되지만 파티션 수가 하나이기 때문에 분산해서 처리할 수 없고 하나의 컨슈머에서만 처리할 수 있기 때문에 처리량이 높지 않다. 즉 처리량이 높은 카프카를 사용하지만 메시지의 순서를 보장해야 한다면 파티션 수를 하나로 만든 토픽을 사용해야 하며, 어느 정도 처리량이 떨어지는 부분은 감수해야한다.





컨슈머 그룹

카프카의 큰 장점 중 하나는, 하나의 토픽에 여러 컨슈머 그룹이 동시에 접속해 메시지를 가져 올 수 있다는 것이다. 이것은 기존의 다른 메시징큐 솔루션에서 컨슈머가 메시지를 가져가면 큐에서 삭제되어 다른 컨슈머가 가져갈 수 없다는 것과는 다른 방식인데 이 방식이 좋은 이유는 최근에 하나의 데이터를 다양한 용도로 사용하는 요구가 많아졌기 때문이다. 카프카가 이러한 기능제공이 가능한 이유는 파일시스템방식을 채택했기 이고, 컨슈머 그룹마다 각자의 오프셋을 별도로 관리하기 때문에 하나의 토픽에 두개의 컨슈머 그룹뿐만 아니라 더 많은 컨슈머 그룹이 연결되어도 다른 컨슈머 그룹에게 영향이 없이 메시지를 가져갈 수 있다. 이렇게 여러개의 컨슈머 그룹이 동시에 하나의 토픽에서 메시지를 가져갈 때는 컨슈머 그룹아이디를 서로 유일하게 설정해주어야한다.
 이전 이야기는 똑같은 데이터에 대해 요구가 다른 처리를 하기 위한 이야기 였다면, 이제 이야기 하려고하는 것은 하나의 토픽메시지를 여러개의 컨슈머가 나누어 처리하는 이야기이다.

만약 토픽에 a,b,c,d,e 라는 메시지가 들어왔고, 컨슈머는 한번에 하나의 메시지밖에 처리를 하지 못한다고 가정해보자. 그렇다면 컨슈머는 총 5번의 읽는 행위를 해야한다. 점차 메시지가 들어오는 양이 많아지면 해당 컨슈머의 처리 지연에 대한 영향은 점점 커질 것이다. 이러한 점을 해야결하기 위하여 하나의 컨슈머 그룹에 하나의 컨슈머가 아니라 여러 컨슈머를 그룹에 포함시켜 a,b,c,d,e 라는 메시지를 적절히 나누어 처리하게 하는 방법이다.

기본적으로 컨슈머 그룹 안에서 컨슈머들은 메시지를 가져오고 있는 토픽의 파티션에 대해 소유권을 공유한다. 컨슈머 그룹 내 컨슈머의 수가 보족해 프로듀서가 전송하는 메시지를 처리하지 못하는 경우에는 컨슈머를 추가해야하며, 추가 컨슈머를 동일한 컨슈머 그룹 내에 추가시키면 하나의 컨슈머가 가져오고 있던 메시지를 적절하게 나누어 가져오기 시작한다.만약 위의 그림처럼 만약 Consumer Group A가 C1만 있었고, C2라는 컨슈머를 동일한 그룹내에 추가했다면 P2,P3의 소유권이 C1에서 C2로 이동한다. 이것이 초반에 이야기 했던 리밸런스라고한다. 이렇게 리밸런스라는 기능을 통해 컨슈머를 쉽고 안전하게 추가할 수 있고 제거할 수도 있어 높은 가용성과 확장성을 확보할 수 있다. 하지만 이러한 리밸런스도 단점은 있다. 리밸런스를 하는 동안 일시적으로 컨슈머는 메시지를 가져올 수 없다. 그래서 리밸런스가 발생하면 컨슈머 그룹 전체가 일시적으로 사용할 수 없다는 단점이 있다.


위의 그림에서 Consumer Group B에 4개의 컨슈머가 있음에도 불구하고 처리가 계속 지연된다면 어떻게 될까? 이전처럼 해당 컨슈머 그룹에 컨슈머만 추가하면 될까? 아니다. 이뉴는 토픽의 파티션에는 하나의 컨슈머만 연결 할 수 있기 때문이다. 이말은 즉슨, 토픽의 파티션 수 만큼 최대 컨슈머 수가 연결될수 있는 것이다. 토픽의 파티션 수와 동일하게 컨슈머 수를 늘렸는데도 프로듀서가 보내는 메시지의 속도를 따라가지 못한다면 컨슈머만 추가하는 것이 아니라, 토픽의 파티션 수까지 늘려주고 컨슈머 수도 늘려줘야한다. 


이번에는 잘 동작하던 컨슈머 그룹 내에서 컨슈머 하나가 다운되는 경우를 생각해보자. 컨슈머가 컨슈머 그룹 안에서 멤버로 유지하고 할당된 파티션의 소유권을 유지하는 방법은 하트비트를 보내는 것이다. 반대로 생각해보면, 컨슈머가 일정한 주기로 하트비트를 보내다는 사실은 해당 파티션의 메시지를 잘 처리하고 있다는 것이다. 하트비트는 컨슈머가 poll 할때와 가져간 메시지의 오프셋을 커밋할 때 보내게 된다. 만약 컨슈머가 오랫동안 하트비트를 보내지 않으면 세션은 타임아웃되고, 해당 컨슈머가 다운되었다고 판단하여 리밸런스가 일어난다. 그 이후 다른 컨슈머가 다운된 컨슈머의 파티션의 할당 파티션을 맡게 되는 것이다.




커밋과 오프셋

컨슈머가 poll을 호출할 때마다 컨슈머 그룹은 카프카에 저장되어 있는 아직 읽지 않은 메시지를 가져온다. 이렇게 동작할 수 있는 것은 컨슈머 그룹이 메시지를 어디까지 가져갔는지 알 수 있기 때문이다. 컨슈머 그룹의 컨슈머들은 각각의 파티션에 자신이 가져간 메시지의 위치 정보(오프셋)을 기록한다. 각 파티션에 대해 현재 위치를 업데이트하는 동작을 커밋한다고 한다. 카프카는 각 컨슈머 그룹의 파티션별로 오프셋을 저장하기 위하여 __consumer_offsets 토픽을 만들고 오프셋을 저장한다. 리밸런스가 일어난 후 각각의 컨슈머는 이전에 처리했던 토픽의 다른 파티션을 할당 받게 되고 컨슈머는 새로운 파티션에 대해 가장 최근에 커밋된 오프셋을 일고 그 이후부터 메시지들을 가져오기 시작한다.


자동커밋

오프셋을 직접 관리해도 되지만, 각 파티션에 대한 오프셋 정보관리, 파티션 변경에 대한 관리 등이 매우 번거로울 수 있다. 그래서 카프카에서는 자동커밋 기능을 제공해준다. 자동 커밋을 사용하고 싶을 때는 컨슈머 옵션 중 enable.auto.commit=true로 설정하면 5초마다 컨슈머는 poll을 호출할 때 가장 마지막 오프셋을 커밋한다. 5초 주기는 기본 값이며, auto.commit.interval.ms 옵션을 통해 조정이 가능하다. 컨슈머는 poll을 요청할 때마다 커밋할 시간이 아닌지 체크하게 되고, poll 요청으로 가져온 마지막 오프셋을 커밋한다. 하지만 이 옵션을 사용할 경우 커밋 직전 리밸런스등의 작업이 일어나면 동일한 메시지에 대한 중복처리가 일어날 수 있다. 물론 자동커밋 주기를 작게 잡아 최대한 중복을 줄일 수 있지만 중복등을 완전하게 피할 수 는없다.


수동커밋

경우에 따라 자동 커밋이 아닌 수동 커밋을 사용해야하는 경우도 있다. 이러한 경우는 메시지 처리가 완료될 때까지 메시지를 가져온 것으로 간주되어서는 안되는 경우에 사용한다. 자동 커밋을 사용하는 경우라면 자동 커밋의 주기로 인해 일부 메시지들을 데이터베이스에는 자장하지 못한 상태로 컨슈머 장애가 발생한다면 해당 메시지들은 손실될 수도 있다. 이러한 경우를 방지하기 위해 컨슈머가 메시지를 가져오자마자 커밋을 하는 것이 아니라, 데이터베이스에 메시지를 저장한 후 커밋을 해서 위의 문제를 조금이나마 해결할 수 있다.(밑의 소스에서 그룹 아이디와 토픽등은 자신에 환경에 맞게 설정해주시길 바랍니다.)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import java.util.Arrays;
import java.util.Properties;
import java.time.Duration;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
 
import lombok.extern.slf4j.Slf4j;
 
/**
 * Kafka Consumer Exam
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaBookConsumer1 {
    
    public static Properties init() {
        Properties props = new Properties();
        props.put("bootstrap.servers""localhost:9092,localhost:9093,localhost:9094");
        props.put("group.id""yoon-consumer");
        props.put("enable.auto.commit""true");
        props.put("auto.offset.reset""latest");
        props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
    
    /**
     * 수동커밋
     */
    public static void commitConsume() {
        try (KafkaConsumer<StringString> consumer = new KafkaConsumer<>(init());){
            //토픽리스트를 매개변수로 준다. 구독신청을 한다.
            consumer.subscribe(Arrays.asList("yoon"));
            while (true) {
              //컨슈머는 토픽에 계속 폴링하고 있어야한다. 아니면 브로커는 컨슈머가 죽었다고 판단하고, 해당 파티션을 다른 컨슈머에게 넘긴다.
              ConsumerRecords<StringString> records = consumer.poll(Duration.ofMillis(100));
              for (ConsumerRecord<StringString> record : records)
                log.info("Topic: {}, Partition: {}, Offset: {}, Key: {}, Value: {}\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
              
              /*
               * DB로직(CRUD) 이후 커밋.
               */
              
              consumer.commitSync(); ->수동으로 커밋하여 메시지를 가져온 것으로 간주하는 시점을 자유롭게 조정할 수있다.
            }
        } finally {}
    }
        
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        commitConsume();
    }
 
}
 
cs



특정 파티션 할당

지금까지 컨슈머는 토픽을 subscribe하고, 카프카가 컨슈머 그룹의 컨슈머들에게 직접 파티션을 공정분배했다. 하지만 특별한 경우 특정 파티션에 대해 세밀하게 제어하길 원할 수도 있다. 이럴때에는 특정 컨슈머에게 특정 파티션을 직접 할당할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.kafka.exam;
 
import java.util.Arrays;
import java.util.Properties;
import java.time.Duration;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
 
import lombok.extern.slf4j.Slf4j;
 
/**
 * Kafka Consumer Exam
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaBookConsumer1 {
    
    /**
     * 특정 파티션 할당
     */
    public static void specificPart() {
        Properties props = new Properties();
        props.put("bootstrap.servers""localhost:9092,localhost:9093,localhost:9094");
        //특정 파티션을 할당하기 위해서는 기존과는 다른 그룹아이디 값을 주어야한다. 왜냐하면 컨슈머 그룹내에 같이 있게되면 서로의
        //오프셋을 공유하기 때문에 종료된 컨슈머의 파티션을 다른 컨슈머가 할당받아 메시지를 이어서 가져가게 되고, 오프셋을 커밋하게 된다.
        props.put("group.id""specificPart");
        props.put("enable.auto.commit""false");
        props.put("auto.offset.reset""latest");
        props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        
        String topicName = "yoon";
        
        try (KafkaConsumer<StringString> consumer = new KafkaConsumer<>(init());){
            //토픽리스트를 매개변수로 준다. 구독신청을 한다.
            TopicPartition partition0 = new TopicPartition(topicName, 0);
            TopicPartition partition1 = new TopicPartition(topicName, 1);
            consumer.assign(Arrays.asList(partition0,partition1));
            
            while (true) {
              //컨슈머는 토픽에 계속 폴링하고 있어야한다. 아니면 브로커는 컨슈머가 죽었다고 판단하고, 해당 파티션을 다른 컨슈머에게 넘긴다.
              ConsumerRecords<StringString> records = consumer.poll(Duration.ofMillis(100));
              for (ConsumerRecord<StringString> record : records)
                log.info("Topic: {}, Partition: {}, Offset: {}, Key: {}, Value: {}\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
              
              /*
               * DB로직(CRUD) 이후 커밋.
               */
              
              consumer.commitSync();
            }
        } finally {}
    }
    
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        specificPart();
    }
 
}
 
cs

특정 오프셋부터 메시지 가져오기


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package com.kafka.exam;
 
import java.util.Arrays;
import java.util.Properties;
import java.time.Duration;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
 
import lombok.extern.slf4j.Slf4j;
 
/**
 * Kafka Consumer Exam
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaBookConsumer1 {
    
    /**
     * 특정파티션에서 특정 오프셋의 메시지 가져오기
     */
    public static void specificOffset() {
        Properties props = new Properties();
        props.put("bootstrap.servers""localhost:9092,localhost:9093,localhost:9094");
        //특정 파티션을 할당하기 위해서는 기존과는 다른 그룹아이디 값을 주어야한다. 왜냐하면 컨슈머 그룹내에 같이 있게되면 서로의
        //오프셋을 공유하기 때문에 종료된 컨슈머의 파티션을 다른 컨슈머가 할당받아 메시지를 이어서 가져가게 되고, 오프셋을 커밋하게 된다.
        props.put("group.id""specificPartAndOffset");
        props.put("enable.auto.commit""false");
        props.put("auto.offset.reset""latest");
        props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        
        String topicName = "yoon";
        
        try (KafkaConsumer<StringString> consumer = new KafkaConsumer<>(init());){
            //토픽리스트를 매개변수로 준다. 구독신청을 한다.
            TopicPartition partition0 = new TopicPartition(topicName, 0);
            TopicPartition partition1 = new TopicPartition(topicName, 1);
            consumer.assign(Arrays.asList(partition0,partition1));
            //0,1번 파티션의 2번 오프셋의 메시지를 가져와라
            consumer.seek(partition0, 2);
            consumer.seek(partition1, 2);
            while (true) {
              //컨슈머는 토픽에 계속 폴링하고 있어야한다. 아니면 브로커는 컨슈머가 죽었다고 판단하고, 해당 파티션을 다른 컨슈머에게 넘긴다.
              ConsumerRecords<StringString> records = consumer.poll(Duration.ofMillis(100));
              for (ConsumerRecord<StringString> record : records)
                log.info("Topic: {}, Partition: {}, Offset: {}, Key: {}, Value: {}\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
              
              /*
               * DB로직(CRUD) 이후 커밋.
               */
              
              consumer.commitSync();
            }
        } finally {}
    }
    
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        specificOffset();
    }
 
}
 
cs


여기까지 자바소스로 다루어본 카프카 컨슈머 예제들이었습니다. 부족한 점이 많습니다. 혹시나 잘못된 점이 있다면 지적해주시면 감사하겠습니다!

posted by 여성게
: