Middleware/Kafka&RabbitMQ 2019. 3. 24. 13:54

Kafka - Kafka Consumer(카프카 컨슈머) Java&CLI



이전 포스팅에서 kafka producer를 java 소스기반으로 예제를 짜보았습니다. 이번 포스팅은 kafka consumer를 java 소스로 다루어보려고 합니다.

Kafka Producer(카프카 프로듀서)가 메시지를 생산해서 카프카의 토픽으로 메시지를 보내면 그 토픽의 메시지를 가져와서 소비(consume)하는 역할을 하는 애플리케이션, 서버 등을 지칭하여 컨슈머라고 한다. 컨슈머의 주요 기능은 특정 파티션을 관리하고 있는 파티션 리더에게 메시지를 가져오기 요청을 하는 것이다. 각 요청은 컨슈머가 메시지 오프셋을 명시하고 그 위치로부터 메시지를 수신한다. 그래서 컨슈머는 가져올 메시지의 위치를 조정할 수 있고, 필요하다면 이미 가져온 데이터도 다시 가져올 수 있다. 이렇게 이미 가져온 메시지를 다시 가져올 수 있는 기능은 여타 다른 메시지 시스템(래빗엠큐,RabbitMQ)들은 제공하지 않는 기능이다.(내부 구동방식이 다른 이유도 있음) 최근의 메시지큐 솔루션 사용자들에게 이러한 기능은 필수가 되고 있다. 

카프카에서 컨슈머라고 불리는 컨슈머는 두 가지 종류가 있는데, 올드 컨슈머(Old Consumer)와 뉴 컨슈머(New Consumer)이다. 두 컨슈머의 큰 차이점은 오프셋에 대한 주키퍼 사용 유무이다. 구 버전의 카프카에서는 컨슈머의 오프셋을 주키퍼의 znode에 저장하는 방식을 지원하다가 카프카 0.9버전부터 컨슈머의 오프셋 저장을 주키퍼가 아닌 카프카의 도픽에 저장하는 방식으로 변경되었다. 아마 성능상의 이슈때문에 오프셋 저장 정책을 바꾼것 같다. 두가지 방식을 특정 버전이전까지는 지원하겠지만 아마 추후에는 후자(뉴 컨슈머)의 방식으로 변경되지 않을까싶다. 코드 레벨로 카프카 컨슈머를 다루기전 카프카 컨슈머의 주요 옵션을 본다.




컨슈머 주요 옵션(Consumer option)

-bootstrap.servers : 카프카 클러스터에 처음 연결을 하기 위한 호스트와 포트 정보로 구성된 리스트 정보이다.


-fetch.min.bytes : 한번에 가져올 수 있는 최소 데이터 사이즈이다. 만약 지정한 사이즈보다 작은 경우, 요청에 대해 응답하지 않고 데이터가 누적될 때까지 기다린다.


-group.id : 컨슈머가 속한 컨슈머 그룹을 식별하는 식별자이다.


-enable.auto.commit : 백그라운드에서 주기적으로 오프셋을 자동 커밋한다.


-auto.offset.reset : 카프카에서 초기 오프셋이 없거나 현재 오프셋이 더 이상 존재하지 않은 경우에 다음 옵션으로 리셋한다.

1)earliest : 가장 초기의 오프셋값으로 설정

2)latest : 가장 마지막의 오프셋값으로 설정

3)none : 이전 오프셋값을 찾지 못하면 에러를 발생


-fetch.max.bytes : 한번에 가져올 수 있는 최대 데이터 사이즈


-request.timeout.ms : 요청에 대해 응답을 기다리는 최대 시간


-session.timeout.ms : 컨슈머와 브로커사이의 세션 타임 아웃시간. 브로커가 컨슈머가 살아있는 것으로 판단하는 시간(기본값 10초) 만약 컨슈머가 그룹 코디네이터에게 하트비트를 보내지 않고 session.timeout.ms이 지나면 해당 컨슈머는 종료되거나 장애가 발생한 것으로 판단하고 컨슈머 그룹은 리밸런스(rebalance)를 시도한다. session.timeout.ms는 하트비트 없이 얼마나 오랫동안 컨슈머가 있을 수 있는지를 제어하는 시간이며, 이 속성은 heartbeat.interval.ms와 밀접한 관련이 있다. session.timeout.ms를 기본값보다 낮게 설정하면 실패를 빨리 감지 할 수 있지만, GC나 poll 루프를 완료하는 시간이 길어지게 되면 원하지 않게 리밸런스가 일어나기도 한다. 반대로는 리밸런스가 일어날 가능성은 줄지만 실제 오류를 감지하는 데 시간이 오래 걸릴 수 있다.


-hearbeat.interval.ms : 그룹 코디네이터에게 얼마나 자주 KafkaConsumer poll() 메소드로 하트비트를 보낼 것인지 조정한다. session.timeout.ms와 밀접한 관계가 있으며 session.timeout.ms보다 낮아야한다. 일반적으로 1/3 값정도로 설정한다.(기본값 3초)


-max.poll.records : 단일 호출 poll()에 대한 최대 레코드 수를 조정한다. 이 옵션을 통해 애플리케이션이 폴링 루프에서 데이터를 얼마나 가져올지 양을 조정할 수 있다.


-max.poll.interval.ms : 컨슈머가 살아있는지를 체크하기 위해 하트비트를 주기적으로 보내는데, 컨슈머가 계속해서 하트비트만 보내고 실제로 메시지를 가져가지 않는 경우가 있을 수도 있다. 이러한 경우 컨슈머가 무한정 해당 파티션을 점유할 수 없도록 주기적으로 poll을 호출하지 않으면 장애라고 판단하고 컨슈머 그룹에서 제외한 후 다른 컨슈머가 해당 파티션에서 메시지를 가져갈 수 있게한다.


-auto.commit.interval.ms : 주기적으로 오프셋을 커밋하는 시간


-fetch.max.wait.ms : fetch.min.bytes에 의해 설정된 데이터보다 적은 경우 요청에 응답을 기다리는 최대 시간


기타 많은 설정들이 있다. 혹시나 인증과 ssl 등의 다양한 설정들을 알고 싶다면 공식 홈페이지를 참고하길 바란다.(https://kafka.apache.org/documentation/#consumerconfigs)




콘솔 컨슈머로 메시지 가져오기


카프카는 기본적으로 콘솔로 메시지를 가져올 수 있는 명령어를 제공한다. 우선 진행하기 앞서 예제로 진행하려는 환경은 이전 포스팅에서 구성해보았던 카프카 클러스터 환경에서 진행한다. 만약 카프카 클러스터 환경을 구성해본적이 없다면 밑의 링크를 참고하기 바란다.


▶︎▶︎▶︎2019/03/13 - [Kafka&RabbitMQ] - Kafka - Kafka(카프카) cluster(클러스터) 구성 및 간단한 CLI사용법


클러스터 환경 구축 후에 예제로 토픽하나를 생성했다는 가정하게 진행한다.



Kafka console producer로 접속하여 메시지를 보내보면 위의 컨슈머로 메시지가 전달 될 것이다. 이러한 컨슈머를 실행할 때는 항상 컨슈머 그룹이라는 것이 필요하다. 토픽의 메시지를 가져오기 위한 kafka-console-consumer.sh 명령어를 실행하면서 추가 옵션으로 컨슈머 그룹 이름을 지정해야 하는데, 만약 추가 옵션을 주지 않고 실행한 경우에는 자동으로 console-consumer-xxxxx(숫자)로 컨슈머 그룹이 생성된다.



필자가 예제로 생성한 그룹아이디도 보인다. 이렇게 그룹아이디 리스트를 볼 수 있는 명령어도 제공한다. 혹시나 모르시는 분들을 위해 토픽 리스트를 볼 수 있는 명령어도 있다.



보면 필자가 사용하고 있는 카프카 버전은 뉴 컨슈머를 이용하는 것을 알 수 있다. 바로 메시지 오프셋을 저장하기 위한 __consumer_offsets가 토픽에 존재하기 때문이다.



또한 컨슈머를 실행시킬 때에 위에서 설명한 것과 같이 그룹아이디 값 옵션을 추가하여 실행시킬 수도 있다.



자바코드를 이용한 컨슈머


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import java.util.Arrays;
import java.util.Properties;
import java.time.Duration;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
 
import lombok.extern.slf4j.Slf4j;
 
/**
 * Kafka Consumer Exam
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaBookConsumer1 {
    
    public static Properties init() {
        Properties props = new Properties();
        props.put("bootstrap.servers""localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put("group.id""exam-consumer-group");
        props.put("enable.auto.commit""true");
        props.put("auto.offset.reset""latest");
        props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
    
    /**
     * 컨슈머예제
     */
    public static void consume() {
        
        try (KafkaConsumer<StringString> consumer = new KafkaConsumer<>(init());){
            //토픽리스트를 매개변수로 준다. 구독신청을 한다.
            consumer.subscribe(Arrays.asList("test-topic"));
            while (true) {
              //컨슈머는 토픽에 계속 폴링하고 있어야한다. 아니면 브로커는 컨슈머가 죽었다고 판단하고, 해당 파티션을 다른 컨슈머에게 넘긴다.
              ConsumerRecords<StringString> records = consumer.poll(Duration.ofMillis(100));
              for (ConsumerRecord<StringString> record : records)
                log.info("Topic: {}, Partition: {}, Offset: {}, Key: {}, Value: {}\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
        } finally {}
    }
        
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        consume();
    }
 
}
 
cs


소스에 대해 간단히 설명하면 우선 컨슈머 옵션들을 Properties 객체로 정의한다. 오프셋 리셋 옵션을 lastest로 주어 토픽의 가장 마지막부터 메시지를 가져온다. 메시지의 키와 값의 역직렬화 옵션을 문자열로 해준다. 이렇게 컨슈머 옵션을 설정하고 해당 옵션값을 이용하여 KafkaConsumer 객체를 생성해준다. 해당 객체는 사용이 후에 꼭 close 해줘야 하기 때문에 try문 안에 객체 생성코드를 넣어놔 자동으로 자원반납을 할 수 있게 해주었다. 그리고 해당 컨슈머 객체로 토픽에 대해 구독신청을 해준다. 구독 대상이 되는 토픽은 리스트로 여러개 지정가능하다. 이것 또한 카프카의 장점중 하나이다.(하나의 컨슈머가 여러 메시지큐를 구독하는 것)

그리고 해당 컨슈머는 계속해서 폴링한다. 무한 루프로 계속해서 폴링하지 않으면 컨슈머가 종료된 것으로 간주되어 컨슈머에 할당된 파티션은 다른 컨슈머에게 전달되고 다른 컨슈머에 의해 메시지가 컨슘된다. poll() 메소드의 매개변수는 타임아웃 주기이다. 이 타임아웃주기는 데이터가 컨슈머 버퍼에 없다면 poll()은 얼마 동안 블록할지를 조정하는 것이다. 또한 poll()은 레코드 전체를 리턴하고, 레코드에는 토픽,파티션,파티션의 오프셋,키,값을 포함하고 있다. 한 번에 하나의 메시지만 가져오는 것이 아니라 여러개의 메시지를 가져오기 때문에 for-each로 데이터들을 처리하고 있다.




메시지의 순서


파티션이 3개인 토픽을 새로 만들어서 해당 토픽으로 메시지를 a,b,c,d,e 순서대로 보내보았다. 그리고 컨슈머 실행후 결과를 보니 a,d,b,e,c 순서대로 메시지가 들어왔다. 컨슈머에 문제가 있는 것인가? 아니면 진짜 내부적으로 문제가 있어서 순서가 보장되지 않은 것인가 궁금할 수 있다. 하지만 지극히 정상인 결과값이다.


먼저 해당 토픽에 메시지가 어떻게 저장되어 있는지 부터 확인해봐야 한다. 해당 토픽은 파티션이 3개로 구성되어 있기 때문에 각 파티션별로 메시지가 어떻게 저장되어 있는지 확인해봐야한다.



0번 파티션에는 b,e 1번 파티션에는 a,d 2번 파티션에는 c 데이터가 저장되어 있는 것을 볼수 있다. 즉, 컨슈머는 프로듀서가 어떤 순서대로 메시지를 보내는지 알수 없다. 단지 파티션의 오프셋 기준으로만 메시지를 가져올 뿐이다. 이말은 무엇이냐면, 카프카 컨슈머에서의 메시지 순서는 동일한 파티션내에서만 유지되고 파티션끼리의 메시지 순서는 보장하지 않는 것이다. 


카프카를 사용하면서 메시지의 순서를 보장해야 하는 경우에는 토픽의 파티션 수를 1로 설정한다. 하지만 단점도 존재한다. 메시지의 순서는 보장되지만 파티션 수가 하나이기 때문에 분산해서 처리할 수 없고 하나의 컨슈머에서만 처리할 수 있기 때문에 처리량이 높지 않다. 즉 처리량이 높은 카프카를 사용하지만 메시지의 순서를 보장해야 한다면 파티션 수를 하나로 만든 토픽을 사용해야 하며, 어느 정도 처리량이 떨어지는 부분은 감수해야한다.





컨슈머 그룹

카프카의 큰 장점 중 하나는, 하나의 토픽에 여러 컨슈머 그룹이 동시에 접속해 메시지를 가져 올 수 있다는 것이다. 이것은 기존의 다른 메시징큐 솔루션에서 컨슈머가 메시지를 가져가면 큐에서 삭제되어 다른 컨슈머가 가져갈 수 없다는 것과는 다른 방식인데 이 방식이 좋은 이유는 최근에 하나의 데이터를 다양한 용도로 사용하는 요구가 많아졌기 때문이다. 카프카가 이러한 기능제공이 가능한 이유는 파일시스템방식을 채택했기 이고, 컨슈머 그룹마다 각자의 오프셋을 별도로 관리하기 때문에 하나의 토픽에 두개의 컨슈머 그룹뿐만 아니라 더 많은 컨슈머 그룹이 연결되어도 다른 컨슈머 그룹에게 영향이 없이 메시지를 가져갈 수 있다. 이렇게 여러개의 컨슈머 그룹이 동시에 하나의 토픽에서 메시지를 가져갈 때는 컨슈머 그룹아이디를 서로 유일하게 설정해주어야한다.
 이전 이야기는 똑같은 데이터에 대해 요구가 다른 처리를 하기 위한 이야기 였다면, 이제 이야기 하려고하는 것은 하나의 토픽메시지를 여러개의 컨슈머가 나누어 처리하는 이야기이다.

만약 토픽에 a,b,c,d,e 라는 메시지가 들어왔고, 컨슈머는 한번에 하나의 메시지밖에 처리를 하지 못한다고 가정해보자. 그렇다면 컨슈머는 총 5번의 읽는 행위를 해야한다. 점차 메시지가 들어오는 양이 많아지면 해당 컨슈머의 처리 지연에 대한 영향은 점점 커질 것이다. 이러한 점을 해야결하기 위하여 하나의 컨슈머 그룹에 하나의 컨슈머가 아니라 여러 컨슈머를 그룹에 포함시켜 a,b,c,d,e 라는 메시지를 적절히 나누어 처리하게 하는 방법이다.

기본적으로 컨슈머 그룹 안에서 컨슈머들은 메시지를 가져오고 있는 토픽의 파티션에 대해 소유권을 공유한다. 컨슈머 그룹 내 컨슈머의 수가 보족해 프로듀서가 전송하는 메시지를 처리하지 못하는 경우에는 컨슈머를 추가해야하며, 추가 컨슈머를 동일한 컨슈머 그룹 내에 추가시키면 하나의 컨슈머가 가져오고 있던 메시지를 적절하게 나누어 가져오기 시작한다.만약 위의 그림처럼 만약 Consumer Group A가 C1만 있었고, C2라는 컨슈머를 동일한 그룹내에 추가했다면 P2,P3의 소유권이 C1에서 C2로 이동한다. 이것이 초반에 이야기 했던 리밸런스라고한다. 이렇게 리밸런스라는 기능을 통해 컨슈머를 쉽고 안전하게 추가할 수 있고 제거할 수도 있어 높은 가용성과 확장성을 확보할 수 있다. 하지만 이러한 리밸런스도 단점은 있다. 리밸런스를 하는 동안 일시적으로 컨슈머는 메시지를 가져올 수 없다. 그래서 리밸런스가 발생하면 컨슈머 그룹 전체가 일시적으로 사용할 수 없다는 단점이 있다.


위의 그림에서 Consumer Group B에 4개의 컨슈머가 있음에도 불구하고 처리가 계속 지연된다면 어떻게 될까? 이전처럼 해당 컨슈머 그룹에 컨슈머만 추가하면 될까? 아니다. 이뉴는 토픽의 파티션에는 하나의 컨슈머만 연결 할 수 있기 때문이다. 이말은 즉슨, 토픽의 파티션 수 만큼 최대 컨슈머 수가 연결될수 있는 것이다. 토픽의 파티션 수와 동일하게 컨슈머 수를 늘렸는데도 프로듀서가 보내는 메시지의 속도를 따라가지 못한다면 컨슈머만 추가하는 것이 아니라, 토픽의 파티션 수까지 늘려주고 컨슈머 수도 늘려줘야한다. 


이번에는 잘 동작하던 컨슈머 그룹 내에서 컨슈머 하나가 다운되는 경우를 생각해보자. 컨슈머가 컨슈머 그룹 안에서 멤버로 유지하고 할당된 파티션의 소유권을 유지하는 방법은 하트비트를 보내는 것이다. 반대로 생각해보면, 컨슈머가 일정한 주기로 하트비트를 보내다는 사실은 해당 파티션의 메시지를 잘 처리하고 있다는 것이다. 하트비트는 컨슈머가 poll 할때와 가져간 메시지의 오프셋을 커밋할 때 보내게 된다. 만약 컨슈머가 오랫동안 하트비트를 보내지 않으면 세션은 타임아웃되고, 해당 컨슈머가 다운되었다고 판단하여 리밸런스가 일어난다. 그 이후 다른 컨슈머가 다운된 컨슈머의 파티션의 할당 파티션을 맡게 되는 것이다.




커밋과 오프셋

컨슈머가 poll을 호출할 때마다 컨슈머 그룹은 카프카에 저장되어 있는 아직 읽지 않은 메시지를 가져온다. 이렇게 동작할 수 있는 것은 컨슈머 그룹이 메시지를 어디까지 가져갔는지 알 수 있기 때문이다. 컨슈머 그룹의 컨슈머들은 각각의 파티션에 자신이 가져간 메시지의 위치 정보(오프셋)을 기록한다. 각 파티션에 대해 현재 위치를 업데이트하는 동작을 커밋한다고 한다. 카프카는 각 컨슈머 그룹의 파티션별로 오프셋을 저장하기 위하여 __consumer_offsets 토픽을 만들고 오프셋을 저장한다. 리밸런스가 일어난 후 각각의 컨슈머는 이전에 처리했던 토픽의 다른 파티션을 할당 받게 되고 컨슈머는 새로운 파티션에 대해 가장 최근에 커밋된 오프셋을 일고 그 이후부터 메시지들을 가져오기 시작한다.


자동커밋

오프셋을 직접 관리해도 되지만, 각 파티션에 대한 오프셋 정보관리, 파티션 변경에 대한 관리 등이 매우 번거로울 수 있다. 그래서 카프카에서는 자동커밋 기능을 제공해준다. 자동 커밋을 사용하고 싶을 때는 컨슈머 옵션 중 enable.auto.commit=true로 설정하면 5초마다 컨슈머는 poll을 호출할 때 가장 마지막 오프셋을 커밋한다. 5초 주기는 기본 값이며, auto.commit.interval.ms 옵션을 통해 조정이 가능하다. 컨슈머는 poll을 요청할 때마다 커밋할 시간이 아닌지 체크하게 되고, poll 요청으로 가져온 마지막 오프셋을 커밋한다. 하지만 이 옵션을 사용할 경우 커밋 직전 리밸런스등의 작업이 일어나면 동일한 메시지에 대한 중복처리가 일어날 수 있다. 물론 자동커밋 주기를 작게 잡아 최대한 중복을 줄일 수 있지만 중복등을 완전하게 피할 수 는없다.


수동커밋

경우에 따라 자동 커밋이 아닌 수동 커밋을 사용해야하는 경우도 있다. 이러한 경우는 메시지 처리가 완료될 때까지 메시지를 가져온 것으로 간주되어서는 안되는 경우에 사용한다. 자동 커밋을 사용하는 경우라면 자동 커밋의 주기로 인해 일부 메시지들을 데이터베이스에는 자장하지 못한 상태로 컨슈머 장애가 발생한다면 해당 메시지들은 손실될 수도 있다. 이러한 경우를 방지하기 위해 컨슈머가 메시지를 가져오자마자 커밋을 하는 것이 아니라, 데이터베이스에 메시지를 저장한 후 커밋을 해서 위의 문제를 조금이나마 해결할 수 있다.(밑의 소스에서 그룹 아이디와 토픽등은 자신에 환경에 맞게 설정해주시길 바랍니다.)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import java.util.Arrays;
import java.util.Properties;
import java.time.Duration;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
 
import lombok.extern.slf4j.Slf4j;
 
/**
 * Kafka Consumer Exam
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaBookConsumer1 {
    
    public static Properties init() {
        Properties props = new Properties();
        props.put("bootstrap.servers""localhost:9092,localhost:9093,localhost:9094");
        props.put("group.id""yoon-consumer");
        props.put("enable.auto.commit""true");
        props.put("auto.offset.reset""latest");
        props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
    
    /**
     * 수동커밋
     */
    public static void commitConsume() {
        try (KafkaConsumer<StringString> consumer = new KafkaConsumer<>(init());){
            //토픽리스트를 매개변수로 준다. 구독신청을 한다.
            consumer.subscribe(Arrays.asList("yoon"));
            while (true) {
              //컨슈머는 토픽에 계속 폴링하고 있어야한다. 아니면 브로커는 컨슈머가 죽었다고 판단하고, 해당 파티션을 다른 컨슈머에게 넘긴다.
              ConsumerRecords<StringString> records = consumer.poll(Duration.ofMillis(100));
              for (ConsumerRecord<StringString> record : records)
                log.info("Topic: {}, Partition: {}, Offset: {}, Key: {}, Value: {}\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
              
              /*
               * DB로직(CRUD) 이후 커밋.
               */
              
              consumer.commitSync(); ->수동으로 커밋하여 메시지를 가져온 것으로 간주하는 시점을 자유롭게 조정할 수있다.
            }
        } finally {}
    }
        
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        commitConsume();
    }
 
}
 
cs



특정 파티션 할당

지금까지 컨슈머는 토픽을 subscribe하고, 카프카가 컨슈머 그룹의 컨슈머들에게 직접 파티션을 공정분배했다. 하지만 특별한 경우 특정 파티션에 대해 세밀하게 제어하길 원할 수도 있다. 이럴때에는 특정 컨슈머에게 특정 파티션을 직접 할당할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.kafka.exam;
 
import java.util.Arrays;
import java.util.Properties;
import java.time.Duration;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
 
import lombok.extern.slf4j.Slf4j;
 
/**
 * Kafka Consumer Exam
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaBookConsumer1 {
    
    /**
     * 특정 파티션 할당
     */
    public static void specificPart() {
        Properties props = new Properties();
        props.put("bootstrap.servers""localhost:9092,localhost:9093,localhost:9094");
        //특정 파티션을 할당하기 위해서는 기존과는 다른 그룹아이디 값을 주어야한다. 왜냐하면 컨슈머 그룹내에 같이 있게되면 서로의
        //오프셋을 공유하기 때문에 종료된 컨슈머의 파티션을 다른 컨슈머가 할당받아 메시지를 이어서 가져가게 되고, 오프셋을 커밋하게 된다.
        props.put("group.id""specificPart");
        props.put("enable.auto.commit""false");
        props.put("auto.offset.reset""latest");
        props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        
        String topicName = "yoon";
        
        try (KafkaConsumer<StringString> consumer = new KafkaConsumer<>(init());){
            //토픽리스트를 매개변수로 준다. 구독신청을 한다.
            TopicPartition partition0 = new TopicPartition(topicName, 0);
            TopicPartition partition1 = new TopicPartition(topicName, 1);
            consumer.assign(Arrays.asList(partition0,partition1));
            
            while (true) {
              //컨슈머는 토픽에 계속 폴링하고 있어야한다. 아니면 브로커는 컨슈머가 죽었다고 판단하고, 해당 파티션을 다른 컨슈머에게 넘긴다.
              ConsumerRecords<StringString> records = consumer.poll(Duration.ofMillis(100));
              for (ConsumerRecord<StringString> record : records)
                log.info("Topic: {}, Partition: {}, Offset: {}, Key: {}, Value: {}\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
              
              /*
               * DB로직(CRUD) 이후 커밋.
               */
              
              consumer.commitSync();
            }
        } finally {}
    }
    
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        specificPart();
    }
 
}
 
cs

특정 오프셋부터 메시지 가져오기


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package com.kafka.exam;
 
import java.util.Arrays;
import java.util.Properties;
import java.time.Duration;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
 
import lombok.extern.slf4j.Slf4j;
 
/**
 * Kafka Consumer Exam
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaBookConsumer1 {
    
    /**
     * 특정파티션에서 특정 오프셋의 메시지 가져오기
     */
    public static void specificOffset() {
        Properties props = new Properties();
        props.put("bootstrap.servers""localhost:9092,localhost:9093,localhost:9094");
        //특정 파티션을 할당하기 위해서는 기존과는 다른 그룹아이디 값을 주어야한다. 왜냐하면 컨슈머 그룹내에 같이 있게되면 서로의
        //오프셋을 공유하기 때문에 종료된 컨슈머의 파티션을 다른 컨슈머가 할당받아 메시지를 이어서 가져가게 되고, 오프셋을 커밋하게 된다.
        props.put("group.id""specificPartAndOffset");
        props.put("enable.auto.commit""false");
        props.put("auto.offset.reset""latest");
        props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        
        String topicName = "yoon";
        
        try (KafkaConsumer<StringString> consumer = new KafkaConsumer<>(init());){
            //토픽리스트를 매개변수로 준다. 구독신청을 한다.
            TopicPartition partition0 = new TopicPartition(topicName, 0);
            TopicPartition partition1 = new TopicPartition(topicName, 1);
            consumer.assign(Arrays.asList(partition0,partition1));
            //0,1번 파티션의 2번 오프셋의 메시지를 가져와라
            consumer.seek(partition0, 2);
            consumer.seek(partition1, 2);
            while (true) {
              //컨슈머는 토픽에 계속 폴링하고 있어야한다. 아니면 브로커는 컨슈머가 죽었다고 판단하고, 해당 파티션을 다른 컨슈머에게 넘긴다.
              ConsumerRecords<StringString> records = consumer.poll(Duration.ofMillis(100));
              for (ConsumerRecord<StringString> record : records)
                log.info("Topic: {}, Partition: {}, Offset: {}, Key: {}, Value: {}\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
              
              /*
               * DB로직(CRUD) 이후 커밋.
               */
              
              consumer.commitSync();
            }
        } finally {}
    }
    
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        specificOffset();
    }
 
}
 
cs


여기까지 자바소스로 다루어본 카프카 컨슈머 예제들이었습니다. 부족한 점이 많습니다. 혹시나 잘못된 점이 있다면 지적해주시면 감사하겠습니다!

posted by 여성게
: