Middleware/Kafka&RabbitMQ 2019. 7. 6. 15:20

 

2019/07/06 - [Kafka&RabbitMQ] - Apache Kafka - Kafka(카프카)란 ? 분산 메시징 플랫폼 - 1

 

Apache Kafka - Kafka(카프카)란 ? 분산 메시징 플랫폼 - 1

이전 포스팅들에서 이미 카프카란 무엇이고, 카프카 프로듀서부터 컨슈머, 스트리밍까지 다루어보았다. 하지만 이번 포스팅을 시작으로 조금더 내용을 다듬고 정리된 상태의 풀세트의 카프카 포스팅을 시작할 것..

coding-start.tistory.com

이전 포스팅에서 간단히 카프카란 무엇이며 카프카의 요소들에 대해 다루어보았다. 이번 포스팅에는 이전에 소개했던 요소중 카프카 프로듀서에 대해 다루어볼 것이다.

 

카프카 프로듀서란 카프카 클러스터에 대해 데이터를 pub, 기록하는 애플리케이션이다.

 

카프카 프로듀서의 내부 구조

우리는 카프카 프로듀서를 사용하기 위해 추상화된 API를 사용한다. 하지만 우리가 사용하는 API는 내부적으로 많은 절차에 따라 행동을 한다. 이러한 프로듀서가 책임지는 역할들은 아래와 같다.

 

  • 카프카 브로커 URL 부트스트랩 : 카프카 프로듀서는 카프카 클러스터에 대한 메타데이터를 가져오기 위해 최소 하나 이상의 브로커에 연결한다. 프로듀서가 연결하길 원하는 첫 번째 브로커가 다운될 경우를 대비하여 보통 한개 이상의 브로커 URL 리스트를 설정한다.
  • 데이터 직렬화 : 카프카는 TCP 기반의 데이터 송수신을 위해 이진 프로토콜을 사용한다. 이는 카프카에 데이터를 기록할 때, 프로듀서는 미리 설정한 직렬화 클래스를 이용해 직렬화 한 이후에 전송한다. 즉, 카프카 프로듀서는 네트워크 상의 브로커들에게 데이터를 보낵기 전에 모든 메시지 데이터 객체를 바이트 배열로 직렬화한다.
  • 토픽 파티션의 결정 :  어떤 파티션으로 데이터가 전송돼야 하는지 결정하는 일은 카프카 프로듀서의 책임이다. 만약 프로듀서가 데이터를 보내기 위해 API에 파티션을 명시하지 않은 경우 파티셔너에 의해 키의 해시값을 이용해 파티션을 결정하고 데이터를 보낸다. 하지만 키값이 존재하지 않으면 라운드 로빈 방식으로 데이터를 전송한다.
  • 처리 실패/재시도 : 처리 실패 응답이나 재시도 횟수는 프로듀서 애플리케이션에 의해 제어된다.
  • 배치처리 : 호율적인 메시지 전송을 위해서 배치는 매우 유용하다. 미리 설정한 바이트 임계치를 넘지 않으면 버퍼에 데이터를 유지하고 임계치를 넘으면 데이터를 토픽으로 전송한다.

 

카프카 프로듀서의 내부 흐름을 설명하면, 프로듀서는 키/값 쌍으로 이루어진 데이터 객체를 전달 받는다. 그리고 해당 객체를 미리 정의된 시리얼라이저 혹은 사용자 정의 시리얼라이저를 이용해 직렬화 한다. 데이터 직렬화 후 데이터가 전송될 파티션을 결정한다. 파티션 정보가 API 호출에 포함되어 있다면 해당 파티션에 직접 데이터를 보내지만 파티션정보가 포함되지 않았다면 데이터 객체의 키값의 해시를 이용해 파티셔너가 데이터를 보낼 파티션을 결정해준다. 만약 키값이 null이라면 라운드 로빈 방식으로 파티션을 결정한다. 파티션 결정이 끝나면 리더 브로커를 이용하여 데이터 전송 요청을 보낸다.

 

카프카 프로듀서 API

카프카 프로듀서 API를 사용하기 위한 필수파라미터들이다.

 

  • bootstrap.servers : 카프카 브로커 주소의 목록을 설정한다. 주소는 hostname:port 형식으로 지정되며 하나 이상의 브로커 리스트를 지정한다.
  • key.serializer : 메시지는 키 값의 쌍으로 이뤄진 형태로 카프카 브로커에게 전송된다. 브로커는 이 키값이 바이트 배열로 돼있다고 가정한다. 그래서 프로듀서에게 어떠한 직렬화 클래스가 키값을 바이트 배열로 변환할때 사용됬는 지 알려주어야한다. 카프카는 내장된 직렬화 클래스를 제공한다.(ByteArraySerializer, StringSerializer, IntegerSerializer / org.apache.kafka.common.serialization)
  • value.serializer : key.serializer 속성과 유사하지만 이 속성은 프로듀서가 값을 직렬화 하기 위해 사용하는 클래스를 알려준다.

 

1
2
3
4
5
6
7
8
9
10
11
public KafkaProducer<StringString> getProducer(){
        
        Properties producerProps = new Properties();
        
        producerProps.put("bootstrap.servers""localhost:9092,localhost:9093");
        producerProps.put("key.serializer""org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer""org.apache.kafka.common.serialization.StringSerializer");
                
        return new KafkaProducer<>(producerProps);
        
}
cs

 

자바 클래스로 위의 필수 설정들을 사용하여 카프카 프로듀서를 생성하는 코드이다. Properties 객체를 이용해 모든 설정 값을 설정해주고 카프카 프로듀서 객체 생성자의 매개변수로 해당 설정을 넣어준다.

 

 

카프카 프로듀서 객체와 ProducerRecord 객체

프로듀서는 메시지 전송을 위해 ProducerRecord 객체를 이용한다. 해당 ProducerRecord는 토픽이름, 파티션 번호, 타임스탬프, 키, 값 등을 포함하는 객체이다. 파티션 번호, 타임스탬프, 키 등은 선택 파라미터 이지만, 데이터를 보낼 토픽과 데이터 값을 반드시 포함해야한다. 그리고 보낼 파티션 선정에는 3가지 방법이 존재한다.

 

  1. 파티션 번호가 지정되면, 지정된 파티션에 데이터를 전송한다.
  2. 파티션이 지정되지 않고 키값이 존재하는 경우 키의 해시 값을 이용하여 파티션을 정한다.
  3. 키와 파티션 모두 지정되지 않은 경우 파티션은 라운드 로빈 방식으로 할당된다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
public void sendMessageSync(String topicName, String data) throws InterruptedException, ExecutionException {
        
        try(KafkaProducer<StringString> producer = getProducer();){
        
            ProducerRecord<StringString> producerRecord = new ProducerRecord<StringString>(topicName, data);
            
            Future<RecordMetadata> recordMetadata = producer.send(producerRecord);
            
            RecordMetadata result = recordMetadata.get();
            
        }
 
}
cs

 

자바 코드를 이용해 데이터 객체를 생성한 후에 동기식으로 토픽에 데이터를 보내는 코드이다.

 

1
2
3
4
ProducerRecord(String topicName, Integer partition, K key, V value)
ProducerRecord(String topicName, Integer partition, Long timestamp, K key, V value)
ProducerRecord(String topicName, K key, V value)
ProducerRecord(String topicName, V value)
cs

 

ProducerRecord의 생성자 리스트의 일부를 보여준다.

 

일단 send()를 사용해 데이터를 전송하면 브로커는 파티션 로그에 메시지를 기록하고, 메시지에 대한 서버 응답의 메타데이터를 포함한 RecordMetadata를 반환하는데, 이 객체는 오프셋,체크섬,타임스탬프,토픽,serializedKeySize 등을 포함한다.

 

앞에서 일반적인 메시지 게시 유형에 대해 설명했지만, 메시지 전송은 동기 또는 비동기로 수행될 수 있다.

 

  • 동기 메시징 : 프로듀서는 메시지를 보내고 브로커의 회신을 기다린다. 카프카 브로커는 오류 또는 RecordMetadata를 보낸다. 일반적으로 카프카는 어떤 연결 오류가 발생하면 재전송을 시도한다. 그러나 직렬화, 메시지 등에 관련된 오류는 애플리케이션 단에서 처리돼야 하며, 이경우 카프카는 재전송을 시도하지 않고 예외를 즉시 발생시킨다.
  • 비동기 메시징 : 일부 즉각적으로 응답 처리를 하지 않아도 되는 혹은 원하지 않는 경우 또는 한두 개의 메시지를 잃어버려도 문제되지 않거나 나중에 처리하기를 원할 수 있다. 카프카는 send() 성공 여부와 관계없이 메시지 응답 처리를 콜백 인터페이스 제공한다.

 

1
send(ProducerRecord<K,V> record, Callback callback)
cs

 

위의 콜백 인터페이스는 onCompletion 메서드를 포함하는데, 오버라이드하여 구현하면 된다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
producer.send(producerRecord, new Callback() {
                
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    
                    if(exception != null) {
                        log.info("예외 발생");
                        /*
                         * 예외처리
                         */
                    }else {
                        log.info("정상처리");
                        /*
                         * 정상처리 로직
                         */
                    }
                    
                }
});
cs

 

onCompletion 메소드 내부에 성공했거나 오류가 발생한 메시지를 처리할 수 있다.

 

 

사용자 정의 파티셔닝

카프카는 내부적으로 내장된 기본 파티셔너와 시리얼라이저를 사용한다. 하지만 가끔은 메시지가 해당 브로커에서 동일한 키는 동일한 파티션으로 가도록 사용자 정의 파티션 로직을 원할 수 있는데, 카프카는 이렇게 사용자 정의 파티셔너 구현을 할 수 있도록 인터페이스를 제공한다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class CustomPartition implements Partitioner{
 
        @Override
        public void configure(Map<String, ?> configs) {
            // TODO Auto-generated method stub
            
        }
 
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
                Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            
            int numPartitions = partitions.size();
            //TODO : 파티션 선택 로직
            return 0;
        }
 
        @Override
        public void close() {
            // TODO Auto-generated method stub
            
        }
        
}
cs

 

해당 커스텀 파티셔너를 사용하려면 Properties에 "partitioner.class"/"package.className" 키/값 형태소 설정해주면 된다.

 

1
2
3
4
5
6
7
8
9
10
11
12
public KafkaProducer<StringString> getProducer(){
        
        Properties producerProps = new Properties();
        
        producerProps.put("bootstrap.servers""localhost:9092,localhost:9093");
        producerProps.put("key.serializer""org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer""org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("partitioner.class","com.kafka.exam.CustomPartition");
        
        return new KafkaProducer<>(producerProps);
        
}
cs

 

추가 프로듀서 설정

 

  • buffer.memory : 카프카 서버로 전송을 기다리는 메시지를 위해 프로듀서가 사용할 버퍼 메모리의 크기다. 간단히 전송되지 않은 메시지를 보관하는 자바 프로듀서가 사용할 전체 메모리다. 이 설정의 한계에 도달하면, 프로듀서는 예외를 발생시키기 전에 max.block.ms 시간 동안 메시지를 대기시킨다. 만약 배치 사이즈가 더 크다면 프로듀서 버퍼에 더 많은 메모리를 할당해야한다. 예를 들어 설명하자면 카프카 서버에 메시지를 전송하는 속도보다 프로듀서가 메시지를 전송하는 요청의 수가 더 빠를 경우(1개씩 메시지를 카프카 서버에 전송하는데 하나를 보내면 2개의 요청이 계속 쌓인다면) 아직 보내지 않은 메시지는 해당 버퍼에 쌓이게 된다. 이 내용에 더해서 큐에 있는 레코드가 계속 남아있는 것을 피하기 위해 request.timeout.ms를 사용해 지정된 시간동안 메시지가 보내지지 않고 큐에 쌓여있다면 메시지는 큐에서 제거되고 예외를 발생시킨다.
  • acks : 이 설정은 각 파티션의 리더가 팔로워(복제본)에게 ACK을 받는 전략을 설정한다. ack=0이라면 리더에 데이터가 쓰이자 마자, 팔로워(복제본)가 데이터 로그를 쓰는 것을 신경쓰지 않고 커밋을 완료한다. ack=1 하나의 팔로워에게 ack를 받으면 커밋을 완료한다. ack=all 모든 팔로워에게 복제완료 ack를 받으면 커밋을 완료한다. 각 설정은 장단점이 있으므로 잘 조율해서 사용해야한다. 만약 ack=0이면 빠른 처리 성능을 보장하지만 매우 높은 가능성의 메시지 손실이 있을 수 있고, ack=1도 역시 처리성능은 빠르지만 여전히 메시지 손실가능성이 존재한다. 마지막으로 ack=all은 메시지 손실 가능성이 매우 낮지만 처리 성능이 느려진다.
  • batch.size : 이 설정은 설정된 크기만큼 여러 데이터를 모아 배치로 데이터를 전송한다. 하지만 프로듀서는 단위 배치가 꽉 찰때까지 기다릴 필요는 없다. 배치는 특정 주기로 전성되며 배치에 포함된 메시지 수는 상관하지 않는다.
  • linger.ms : 브로커로 현재의 배치를 전송하기 전에 프로듀서가 추가 메시지를 기다리는 시간을 나타낸다.
  • compression.type : 프로듀서는 브로커에게 압축되지 않은 상태로 메시지를 전송하는 것이 기본이지만, 해당 옵션을 사용해 데이터를 압축해 보내 처리 성능을 높힐 수 있다. 배치처리가 많을 수록 유용한 옵션이다.
  • retrites : 메시지 전송이 실패하면 프로듀서가 예외를 발생시키기 전에 메시지의 전송을 다시 시도하는 수.

기타 많은 설정들이 존재하지만 여기서 모두 다루지는 않는다. 공식 홈페이지를 이용해자.

 

자바 카프카 프로듀서 예제

 

코드 작성 전에 Maven에 카프카 dependency 설정을 해준다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public KafkaProducer<StringString> getProducer(){
        
        Properties producerProps = new Properties();
        
        producerProps.put("bootstrap.servers""localhost:9092,localhost:9093");
        producerProps.put("acks""all");
        producerProps.put("retries"1);
        producerProps.put("batch.size"20000);
        producerProps.put("linger.ms"1);
        producerProps.put("buffer.memory"24568545);
        
        return new KafkaProducer<>(producerProps);
        
}
/*
     * 동기식 
     * 
     * ProducerRecord 생성자 매개변수(*표시는 필수 매개변수)
     * *1)토픽이름,2)파티션 번호,3)타임스탬프,4)키,*5)값
     * 
     * 파티션선정
     * 1)파티션 번호가 ProducerRecord의 매개변수로 명시되어 있다면 그 파티션으로 보낸다.
     * 2)파티션이 지정되지 않고 ProducerRecord 매개변수에 키가 지정된 경우 파티션은 키의 해시값을 사용해 정한다.
     * 3)ProducerRecord 매개변수에 키와 파티션 번호 모두 지정되지 않은 경우에는 라운드 로빈 방식으로 정한다.
     */
    public void sendMessageSync(String topicName, String data) throws InterruptedException, ExecutionException {
        
        try(KafkaProducer<StringString> producer = getProducer();){
        
            ProducerRecord<StringString> producerRecord = new ProducerRecord<StringString>(topicName, data);
            
            Future<RecordMetadata> recordMetadata = producer.send(producerRecord);
            
            RecordMetadata result = recordMetadata.get();
            
        }
    }
    
    /*
     * 비동기식
     */
    public void sendMessageAsync(String topicName, String data) {
        
        try(KafkaProducer<StringString> producer = getProducer();){
        
            ProducerRecord<StringString> producerRecord = new ProducerRecord<StringString>(topicName, data);
            
            producer.send(producerRecord, new Callback() {
                
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    
                    if(exception != null) {
                        log.info("예외 발생");
                        /*
                         * 예외처리
                         */
                    }else {
                        log.info("정상처리");
                        /*
                         * 정상처리 로직
                         */
                    }
                    
                }
            });
            
        }
    }
http://colorscripter.com/info#e" target="_blank" style="color:#e5e5e5text-decoration:none">Colored by Color Scripter
http://colorscripter.com/info#e" target="_blank" style="text-decoration:none;color:white">cs

 

 

카프카 프로듀서를 사용할 때 유의해야 하는 사항

  • 데이터 유효성 검증 : 보낼 데이터의 유효성 검증은 간과하기 쉽다. 꼭 유효성 검증을 포함하는 로직을 작성하자.
  • 예외처리 : 연결 실패,네트워크 시간 초과 등의 예외로는 프로듀서가 내부적으로 재전송을 한다. 하지만 모든 예외에 대해서 재전송을 하는 것은 아니다. 즉, 예외 발생에 대해 처리의 책임은 온전히 프로듀서 애플리케이션에 있는 것이다.
  • 부트스트랩 URL 수 : 클러스터 노드수가 적다면 모든 URL를 목록으로 작성하자. 하지만 많약 노드가 아주 많다면 대표하는 브로커의 수만 목록으로 설정해주자.
  • 잘못된 파티셔닝 방식의 사용방지 : 파티션은 카프카에서 병렬 처리의 단위다. 메시지가 모든 토픽 파티션으로 균일하게 분배되도록 올바른 파티셔닝 전략을 선택하자. 만약 잘못된 파티셔닝 방식을 도입하게 되면 특정 파티션에만 요청이 집중되는 최악의 상황이 발생할 수 있으니 메시지가 모든 가용한 파티션에 분배 되도록 올바른 파티셔닝 방식을 정의하자.
  • 기존 토픽에 새로운 파티션 추가 방지 : 메시지를 분산시키기 위해서 키 값을 사용하여 파티션을 선택하는 토픽에는 파티션을 추가하는 것을 피해야한다. 새로운 파티션 추가는 각 키에 대해 산출된 해시 코드를 변경시키며, 이는 파티션의 수도 산출에 필요한 입력 중 하나로 사용하기 때문이다.

 

posted by 여성게
:
Middleware/Kafka&RabbitMQ 2019. 3. 16. 21:10

Kafka - Kafka Producer(카프카 프로듀서) In Java&CLI




카프카 프로듀서란 메시지를 생산(produce)해서 카프카의 토픽으로 메시지를 보내는 역할을 하는 애플리케이션, 서버 등을 모두 프로듀서라고 부른다.

프로듀서의 주요 기능은 각각의 메시지를 토픽 파티션에 매핑하고 파티션의 리더에 요청을 보내는 것이다. 키 값을 정해 해당 키를 가진 모든 메시지를 동일한

파티션으로 전송할 수 있다. 만약 키 값을 입력하지 않으면, 파티션은 라운드 로빈(round-robin) 방식으로 파티션에 균등하게 분배된다.


이후의 모든 예제는 이전 포스팅에서 구성한 카프카 클러스터링 환경에서 진행하였습니다. 동일한 환경 구성을 

구축하고 예제를 진행하시려면 이전 포스팅을 참조하시길 부탁드립니다.


▶︎▶︎▶︎카프카 클러스터링




콘솔 프로듀서로 메시지를 보내 보기

우선 메시지를 보내기 위해서는 토픽이 카프카 내에 존재해야 한다. 만약 존재하지 않는 토픽으로 메시지를 보낼 경우에 자동으로 토픽이 
생성되는 옵션을 이용하려면 "auto.create.topics.enable = true" 옵션을 환경설정 파일에 넣어주면 된다. 
이번 예제는 직접 수동으로 토픽을 생성할 예정이다.

토픽생성은 카프카에 기본으로 제공하는 스크립트를 이용하여 생성한다.


위의 명령으로 test-topic이라는 이름으로 파티션이 1개로 구성되어 있고, 복제본이 자신을 포함한 3개인 토픽이 생성된다.

만약 복제본들이 어느 클러스터 인스턴스에 할당되어 있으며, 리더는 누구인지 그리고 ISR 그룹 구성등을 보고 싶다면 카프카에서 기본으로 제공하는
스크립트로 조회가능하다.


결과를 보면 Leader는 2번 클러스터 인스턴스이며, 파티션은 0번 한개 그리고 복제본이 1,2,3번의 클러스터 인스턴스에 생성되어 있고,

ISR그룹이 1,2,3 클러스터 인스턴스로 이루어져 있음을 알 수 있다. 여기서 ISR 그룹이란 복제본들이 들어있는 클러스터 인스턴스 그룹으로 볼 수 있다.

그리고 리더 선출의 후보의 전제조건이 클러스터가 ISR 그룹내에 존재하는 클러스터이여야만 리더 선출의 후보가 될 수 있다. ISR은 카프카가 도입한

하나의 개념이라고 볼 수 있다. 하나를 예제로 설명하면 현재 2번 클러스터가 리더인데, 만약 해당 리더가 다운되면 ISR그룹에서 2번 클러스터는 빠지게 되고

ISR그룹내 1,3 클러스터 중에서 카프카 컨트롤러가 리더를 선출하게 된다.




카프카 콘솔을 이용한 간단한 메시징 테스트이다.

사실 이런 예제는 클러스터 구축단계에서 다 해보았던 것이지만 이번 포스팅에서는 프로듀서의 몇몇 중요한 옵션들을 자바를 기반으로

설명할 것이다. 물론 spring cloud stream을 이용해 메시징 미들웨어에 의존적이지 않은 코드로도 작성가능 하지만 이번엔 조금더

로우레벨로 직접 자바코드를 이용해 카프카 시스템을 이용해볼 것이다.


자바를 이용한 프로듀서


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/**
 * Kafka Producer Exam
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaBookProducer1 {
    
    public static Properties init() {
        Properties props = new Properties();
        props.put("bootstrap.servers""localhost:9092,localhost:9093,localhost:9094");
        props.put("key.serializer""org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer""org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
    
    /**
     * 메시지 전송의 성공/실패여부를 신경쓰지 않는다.
     * 카프카가 항상 살아있는 상태이고 프로듀서가 자동으로 재전송하기 때문에 대부분의 경우 성공적으로 전송되지만,
     * 일부 유실이 있을 수 있다.
     * 
     * send() method를 이용해 ProducerRecord를 보낸다. 메시지는 버퍼에 저장되고 별도의 스레드를 통해 브로커로 전송한다.(로그만 봐도 main이 아닌 별도의 스레드가 일을 처리)
     * send()는 자바 퓨처(Future) 객체로 RecordMetadata를 리턴받지만 리턴값을 무시하기 때문에 메시지가 성공적으로 전송되었는지 알수 없다.
     * 실 서비스 환경에서는 거의 사용하지 않는다.
     * 
     * 메시지를 보낸 후의 에러는 무시하지만, 전송전의 에러는 잡아서 처리할 수 있다.
     */
    public static void sendNoConfirmResult() {
        long start = System.currentTimeMillis();
        try(Producer<StringString> producer = new KafkaProducer<>(init())){
            producer.send(new ProducerRecord<StringString>("yeoseong""Apache Kafka is a distributed streaming platform-sendNoConfirmResult()"));
        }catch (Exception e) {}
        long end = System.currentTimeMillis();
        log.info("sendNoConfirmResult - during time : "+ (end-start));
    }
    
    /**
     * 리턴 값으로 Future를 반환하는데, get()을 호출하여 결과를 받아 메시지가 성공적으로 전송됬는지 체크한다. 
     * 메인 스레드가 block되는 상황이 있지만, 신뢰성있는 메시지 전송을 보장한다.
     */
    public static void sendSync() {
        long start = System.currentTimeMillis();
        try(Producer<StringString> producer = new KafkaProducer<>(init())){
            RecordMetadata meta = producer.send(new ProducerRecord<StringString>("yeoseong""Apache Kafka is a distributed streaming platform-sendSync()")).get();
            log.info("Partition: {}, Offset: {}",meta.partition(),meta.offset());
        }catch (Exception e) {}
        long end = System.currentTimeMillis();
        log.info("sendSync - during time : "+ (end-start));
    }
    
    /**
     * 콜백방식으로 비동기 전송을 한다. 메시지 전송이 완료 혹은 실패되면,
     * 브로커쪽에서 콜백으로 onCompletion을 호출한다. 만약 실패하면 Exception 객체가 담긴다.
     * org.apache.kafka.clients.producer.Callback
     * 
     * 로그를 보면 main이 아닌 별도의 카프카 스레드에서 콜백을 호출한다.
     */
    public static void sendAsync() {
        long start = System.currentTimeMillis();
        try(Producer<StringString> producer = new KafkaProducer<>(init())){
            producer.send(new ProducerRecord<StringString>("yeoseong""Apache Kafka is a distributed streaming platform-sendAsync()"),new KafkaCallback());
        }catch (Exception e) {}
        long end = System.currentTimeMillis();
        log.info("sendAsync() - during time : "+ (end-start));
    }
    
    /**
     * 프로듀서에서 키값까지 같이 메시지를 보내면 키에 해당하는 파티션에만 메시지가 들어간다.
     * 하지만 키를 포함시키지 않는다면 라운드 로빈 방식으로 파티션마다 균등분배된다.
     * 
     * evenkey&oddkey는 각각 동일한 파티션으로만 데이터를 전송하게 된다.
     */
    public static void sendWithKey() {
        long start = System.currentTimeMillis();
        String topicName = "yoon";
        String oddKey="1";
        String evenKey="2";
        
        Properties props = new Properties();
        props.put("bootstrap.servers""localhost:9092,localhost:9093,localhost:9094");
        props.put("key.serializer""org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer""org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks""1");
        props.put("compression.type""gzip");
        
        try(Producer<StringString> producer = new KafkaProducer<>(props)){
            
            IntStream.rangeClosed(110).forEach(i->{
                if(i%2==1) {
                    producer.send(new ProducerRecord<StringString>(topicName, oddKey ,i+" - Apache Kafka is a distributed streaming platform-sendWithKey() oddKey "+oddKey));
                }else {
                    producer.send(new ProducerRecord<StringString>(topicName, evenKey ,i+" - Apache Kafka is a distributed streaming platform-sendWithKey() evenKey "+evenKey));
                }
            });
            
            
        }catch (Exception e) {}
        long end = System.currentTimeMillis();
        log.info("sendWithKey() - during time : "+ (end-start));
    }
    
    
    public static void main(String[] args) {
        
//        sendNoConfirmResult();
//        sendSync();
//        sendAsync();
        sendWithKey();
    }
 
}
 
@Slf4j
class KafkaCallback implements Callback{
 
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // TODO Auto-generated method stub
        if(!ObjectUtils.isEmpty(metadata)) {
            log.info("Partition: {}, Offset: {}",metadata.partition(),metadata.offset());
        }else {
            log.error("KafkaCallback - Exception");
        }
    }
    
}
 
cs

코드는 몇개의 예제를 메소드 형태로 구분하여 만들었다.

첫번째 메소드는 프로듀서에서 서버로 메시지를 보내고 난 후에 성공적으로 도착했는지까지 확인하지 않는다. 카프카가 항상 살아있는 상태이고
프로듀서가 자동으로 재전송하기 때문에 대부분의 경우 성공적으로 전송되지만, 일부 메시지는 손실이 있을 수도 있다. send()는 Future 객체로 RecordMetadata를 리턴받지만 그 리턴값을 객체로 받지 않고 있기 때문에 성공적으로 메시지가 전달되었는지 알 수 없다. 대부분 테스트 용도로만 사용하고 실서비스에서는
이용하지 않는 방법이다.

두번째 메소드는 동기 전송방법이다. 프로듀서는 메시지를 보내고 Future의 get()를 이용하여 Future를 기다린 후에 send()가 성공했는지 실패했는지 확인한다.
Future의 get()은 해당 작업 스레드를 블럭한 상태에서 Future의 리턴을 기다리기 때문에 동기 전송방법인 것이다.

세번째 메소드는 비동기 전송방법이다. 프로듀서는 send() 메소드를 콜백과 같이 호출하고 카프카 브로커에서 응답을 받으면 콜백한다. 
비동기적으로 전송한다면 응답을 기다리지 않기 때문에 더욱 빠른 전송이 가능하다. 콜백을 사용하기 위해 org.apache.kafka.clients.producer.Callback을
구현하는 클래스가 필요하다.(사실 람다식을 이용해서 매개변수로 넘겨주면 클래스는 따로 구현하지 않아도된다.) 카프카가 오류를 리턴하면 onCompletion()는 예외를 갖게 된다.

마지막 메소드는 키값을 추가하여 메시지를 보낼 경우이다. 키값을 포함하였을 경우에 컨슈머에는 어떻게 메시지가 전달 될지 콘솔 컨슈머를
이용하여 결과를 확인해볼 것이다.

우선 파티션 2개짜리인 토픽을 생성한다.


마지막 메소드를 main 메소드에서 실행한 후에 각각의 파티션으로 컨슈머를 실행했을 경우 결과가 어떻게 나오지는 확인한다.



2번 키를 포함시킨 메시지는 모두 0번 파티션으로 갔고, 1번 키를 포함시킨 메시지는 모두 1번 파티션으로 메시지가 전송되었다.

물론 특정파티션이 아닌 토픽으로 컨슈머를 실행시켰다면 모든 메시지 10개가 결과에 포함되었을 것이다. 이렇게 키값을 이용하여

특정 파티션으로만 메시지를 보낼 수 있는 기능을 제공하므로 다양하게 활용 가능하다.




프로듀서 주요옵션

Producer 주요 옵션

1)bootstrap.servers localhost:9092,localhost:9093,localhost:9094

:카프카 클러스터에 클라이언트가 처음 연결을 위한 호스트와 포트정보로 구성된 리스트 정보를 나타낸다. 전체 카프카 리스트가 아닌 호스트 하나만 입력해 사용할 있지만

카프카 인스턴스가 죽으면 클라이언트는 더이상 접속 없습니다. , 전체 클러스터 인스턴스 목록을 옵션으로 넣는 것이 좋습니다.


2)acks 0,1,all , —request-required-acks 0,1,all

-acks = 0 

0으로 설정하면 제작자는 서버로부터의 승인을 전혀 기다리지 않습니다. 레코드가 소켓 버퍼에 즉시 추가되고 전송 것으로 간주됩니다. 경우 서버가 레코드를 수신했음을 보증 없으며 재시도 구성이 적용되지 않습니다 (클라이언트는 일반적으로 어떤 실패도 없으므로). 레코드에 대해 다시 주어진 오프셋은 항상 -1 설정됩니다.


-acks = 1 

만약 1 설정하는 경우 리더는 데이터를 기록하지만, 모든 팔로워는 확인하지 않습니다. 경우 일부 데이터 손실이 있을수 있습니다.


-acks = all 

acks=-1 동일하며, 리더는 ISR 팔로워로부터 데이터에 대한 ack 기다립니다. 하나의 팔로워가 있는 데이터는 손실되지 않으며, 데이터 무손실에 대해 가장 강력하게 보장합니다.


3)buffer.memory

프로듀서가 카프카 서버로 데이터를 보내기 위해 잠시 대기(배치 전송이나 딜레이 ) 있는 전체 메모리 바이트입니다.


4)compression.type

프로듀서가 데이터를 압축해서 보낼 있는데, 어떤 타입으로 압축할지를 정할 있다. 옵션으로 none,gzip,snappy,lz4 같은 다양한 포맷이 있다.


5)retries

일시적인 오류로 인해 전송에 실패한 데이터를 다시 보내는 횟수


6)batch.size

프로듀서는 같은 파티션으로 보내는 여러 데이터를 함께 배치로 보내려고 시도한다. 이러한 동작은 클라이언트와 서버 양쪽에 성능적인 측면에서 도움이 된다. 설정으로 배치 크기 바이트 단위를 조정할 있다. 정의된 크기보다 데이터는 배치를 시도하지 않는다. 배치를 보내기전 클라이언트 장애가 발생하면 배치 내에 있던 메시지는 전달되지 않는다. 만약 고가용성이 필요한 메시지의 경우라면 배치 사이즈를 주지 않는 것도 하나의 방법이다.


7)linger.ms

배치형태의 메시지를 보내기 전에 추가적인 메시지들을 위해 기다리는 시간을 조정한다. 카프카 프로듀서는 지정된 배치 사이즈에 도달하면 옵션과 관계없이 즉시 메시지를 전송하고, 배치 사이즈에 도달하지 못한 상황에서 linger.ms 제한 시간에 도달했을때 메시지들을 전송한다. 0 기본값(지연없음)이며, 0보다 값을 설정하면 지연 시간은 조금 발생하지만 처리량이 좋아진다.


8)max.request.size

프로듀서가 보낼 있는 최대 메시지 바이트 사이즈. 기본값은 1MB이다.


나머지 Kerberos, ssl과 타임관련등의 설정들은 https://kafka.apache.org/documentation/#producerconfigs을 참고



server.propertis ->acks=all 따른 server.properties 설정정보 변경

만약 밑의 설정이 없다면 설정파일의 맨밑에 작성

min.insync.replicas=n

설정은 acks=all 했을 경우 리더를 포함한 인스턴스 n개의 승인(데이터복제,저장) 받으면 바로 프로듀서에게 결과를 전달한다.

최적은 acks=all, min.insync.replicas=2, replication factor=3 ->카프카문서에 명시되어있음.



메시지 전송 방법

프로듀서의 옵션 중 acks 옵션을 어떻게 설정하는지에 따라 카프카로 메시지를 전송할 때 메시지 손실 여부와 메시지 전송 속도 및 처리량이 달라진다.

1) 메시지 손실 가능성이 높지만 빠른 전송이 필요한 경우(acks=0 으로 설정)
메시지를 전송할 때 프로듀서는 카프카 서버에서 응답을 기다리지 않고, 메시지를 보낼 준비가 되는 즉시 다음 요청을 보낸다. 하지만 이런 방법을
이용한다면 자신이 보낸 메시지에 대해 결과를 기다리지 않기때문에 메시지 유실이 있을 수도 있다.


2) 메시지 손실 가능성이 적고 적당한 속도의 전송이 필요한 경우(acks=1 으로 설정)

이 옵션은 앞선 옵션과 달리 프로듀서가 카프카로 메시지를 보낸 후 보낸 메시지에 대해 메시지를 받는 토픽의 리더가 잘 받았는지 확인(acks)한다.

응답 대기 시간 없이 계속 메시지만 보내던 방법과 달리 확인을 하는 시간이 추가되어 메시지를 보내는 속도는 약간 떨어지게 되지만

메시지의 유실 가능성이 조금 줄어든다. 하지만 메시지를 받는 리더가 장애가 발생하면 메시지 유실 가능성이 있다. 하지만 보통 프로듀서 애플리케이션으로

많이 사용하는 로그스태시(logstash), 파일비트(Filebeat) 등에서는 프로듀서의 acks 옵션을 기본 1로 하고 있다. 특별한 경우가 아니라면

속도와 안전성을 확보할 수 있는 acks=1을 사용하는 것을 추천한다. 물론 메시지 유실에 큰 영향이 없는 서비스라는 조건하이다.


3) 메시지 전송 속도는 느리지만 메시지 손실이 없어야 하는 경우(acks=all 으로 설정)

acks=all의 동작 방법은 프로듀서가 메시지를 전송하고 난후 리더가 메시지를 받았는지 확인하고 추가로 팔로워까지 메시지를 잘받았는지(복제) 확인 하는 것이다.

속도적인 측면으로 볼때, acks 옵션 중에서 가장 느리지만 메시지 손실을 허용하지 않은 경우 사용하는 방법이다. acks=all을 완벽하게 사용하고자 한다면

프로듀서의 설정 뿐 아니라 브로커의 설정도 같이 조정해야한다. 브로커의 설정에 따라 응답 확인을 기다리는 수가 달라지게 된다.


-프로듀서의 acks=all과 브로커의 min.insync.replicas=1

이것은 acks=all 설정을 하였고 replication-factor가 예를 들어 3개라고 해도, 응답을 하나의 클러스터에서만 받게 된다.

즉, acks=1과 동일한 설정이 된다.


-프로듀서의 acks=all과 브로커의 min.insync.replicas=2

프로듀서가 메시지를 보내면 2개의 클러스터에 응답을 기다리게 된다.


-프로듀서의 acks=all과 브로커의 min.insync.replicas=3

만약 리플리케이션 팩터가 3이라면 위의 설정은 모든 클러스터 인스턴스의 응답 결과를 기다린다. 여기서 하나의 클러스터가 장애가

발생한다면 프로듀서는 메시지를 보낼 수 없게 될 것이다.


위에서 설명하였지만 카프카가 공식적으로 권장하는 옵션은

acks=all, min.insync.replicas=2, replication factor=3이다.


posted by 여성게
: