인프라/Docker&Kubernetes 2020. 6. 4. 17:10

이번 포스팅은 간단하게 싱글 노드 카프카를 도커로 띄우는 방법이다.

 

git clone https://github.com/wurstmeister/kafka-docker
cd kafka-docker

 

설정 파일은 docker-compoese로 되어있으며, 아래와 같다.

 

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    build: .
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

 

docker compose 명령으로 실제 컨테이너를 띄운다.

 

docker-compose -f docker-compose-single-broker.yml up -d

 

posted by 여성게
:
Middleware/Kafka&RabbitMQ 2019. 7. 6. 15:20

 

2019/07/06 - [Kafka&RabbitMQ] - Apache Kafka - Kafka(카프카)란 ? 분산 메시징 플랫폼 - 1

 

Apache Kafka - Kafka(카프카)란 ? 분산 메시징 플랫폼 - 1

이전 포스팅들에서 이미 카프카란 무엇이고, 카프카 프로듀서부터 컨슈머, 스트리밍까지 다루어보았다. 하지만 이번 포스팅을 시작으로 조금더 내용을 다듬고 정리된 상태의 풀세트의 카프카 포스팅을 시작할 것..

coding-start.tistory.com

이전 포스팅에서 간단히 카프카란 무엇이며 카프카의 요소들에 대해 다루어보았다. 이번 포스팅에는 이전에 소개했던 요소중 카프카 프로듀서에 대해 다루어볼 것이다.

 

카프카 프로듀서란 카프카 클러스터에 대해 데이터를 pub, 기록하는 애플리케이션이다.

 

카프카 프로듀서의 내부 구조

우리는 카프카 프로듀서를 사용하기 위해 추상화된 API를 사용한다. 하지만 우리가 사용하는 API는 내부적으로 많은 절차에 따라 행동을 한다. 이러한 프로듀서가 책임지는 역할들은 아래와 같다.

 

  • 카프카 브로커 URL 부트스트랩 : 카프카 프로듀서는 카프카 클러스터에 대한 메타데이터를 가져오기 위해 최소 하나 이상의 브로커에 연결한다. 프로듀서가 연결하길 원하는 첫 번째 브로커가 다운될 경우를 대비하여 보통 한개 이상의 브로커 URL 리스트를 설정한다.
  • 데이터 직렬화 : 카프카는 TCP 기반의 데이터 송수신을 위해 이진 프로토콜을 사용한다. 이는 카프카에 데이터를 기록할 때, 프로듀서는 미리 설정한 직렬화 클래스를 이용해 직렬화 한 이후에 전송한다. 즉, 카프카 프로듀서는 네트워크 상의 브로커들에게 데이터를 보낵기 전에 모든 메시지 데이터 객체를 바이트 배열로 직렬화한다.
  • 토픽 파티션의 결정 :  어떤 파티션으로 데이터가 전송돼야 하는지 결정하는 일은 카프카 프로듀서의 책임이다. 만약 프로듀서가 데이터를 보내기 위해 API에 파티션을 명시하지 않은 경우 파티셔너에 의해 키의 해시값을 이용해 파티션을 결정하고 데이터를 보낸다. 하지만 키값이 존재하지 않으면 라운드 로빈 방식으로 데이터를 전송한다.
  • 처리 실패/재시도 : 처리 실패 응답이나 재시도 횟수는 프로듀서 애플리케이션에 의해 제어된다.
  • 배치처리 : 호율적인 메시지 전송을 위해서 배치는 매우 유용하다. 미리 설정한 바이트 임계치를 넘지 않으면 버퍼에 데이터를 유지하고 임계치를 넘으면 데이터를 토픽으로 전송한다.

 

카프카 프로듀서의 내부 흐름을 설명하면, 프로듀서는 키/값 쌍으로 이루어진 데이터 객체를 전달 받는다. 그리고 해당 객체를 미리 정의된 시리얼라이저 혹은 사용자 정의 시리얼라이저를 이용해 직렬화 한다. 데이터 직렬화 후 데이터가 전송될 파티션을 결정한다. 파티션 정보가 API 호출에 포함되어 있다면 해당 파티션에 직접 데이터를 보내지만 파티션정보가 포함되지 않았다면 데이터 객체의 키값의 해시를 이용해 파티셔너가 데이터를 보낼 파티션을 결정해준다. 만약 키값이 null이라면 라운드 로빈 방식으로 파티션을 결정한다. 파티션 결정이 끝나면 리더 브로커를 이용하여 데이터 전송 요청을 보낸다.

 

카프카 프로듀서 API

카프카 프로듀서 API를 사용하기 위한 필수파라미터들이다.

 

  • bootstrap.servers : 카프카 브로커 주소의 목록을 설정한다. 주소는 hostname:port 형식으로 지정되며 하나 이상의 브로커 리스트를 지정한다.
  • key.serializer : 메시지는 키 값의 쌍으로 이뤄진 형태로 카프카 브로커에게 전송된다. 브로커는 이 키값이 바이트 배열로 돼있다고 가정한다. 그래서 프로듀서에게 어떠한 직렬화 클래스가 키값을 바이트 배열로 변환할때 사용됬는 지 알려주어야한다. 카프카는 내장된 직렬화 클래스를 제공한다.(ByteArraySerializer, StringSerializer, IntegerSerializer / org.apache.kafka.common.serialization)
  • value.serializer : key.serializer 속성과 유사하지만 이 속성은 프로듀서가 값을 직렬화 하기 위해 사용하는 클래스를 알려준다.

 

1
2
3
4
5
6
7
8
9
10
11
public KafkaProducer<StringString> getProducer(){
        
        Properties producerProps = new Properties();
        
        producerProps.put("bootstrap.servers""localhost:9092,localhost:9093");
        producerProps.put("key.serializer""org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer""org.apache.kafka.common.serialization.StringSerializer");
                
        return new KafkaProducer<>(producerProps);
        
}
cs

 

자바 클래스로 위의 필수 설정들을 사용하여 카프카 프로듀서를 생성하는 코드이다. Properties 객체를 이용해 모든 설정 값을 설정해주고 카프카 프로듀서 객체 생성자의 매개변수로 해당 설정을 넣어준다.

 

 

카프카 프로듀서 객체와 ProducerRecord 객체

프로듀서는 메시지 전송을 위해 ProducerRecord 객체를 이용한다. 해당 ProducerRecord는 토픽이름, 파티션 번호, 타임스탬프, 키, 값 등을 포함하는 객체이다. 파티션 번호, 타임스탬프, 키 등은 선택 파라미터 이지만, 데이터를 보낼 토픽과 데이터 값을 반드시 포함해야한다. 그리고 보낼 파티션 선정에는 3가지 방법이 존재한다.

 

  1. 파티션 번호가 지정되면, 지정된 파티션에 데이터를 전송한다.
  2. 파티션이 지정되지 않고 키값이 존재하는 경우 키의 해시 값을 이용하여 파티션을 정한다.
  3. 키와 파티션 모두 지정되지 않은 경우 파티션은 라운드 로빈 방식으로 할당된다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
public void sendMessageSync(String topicName, String data) throws InterruptedException, ExecutionException {
        
        try(KafkaProducer<StringString> producer = getProducer();){
        
            ProducerRecord<StringString> producerRecord = new ProducerRecord<StringString>(topicName, data);
            
            Future<RecordMetadata> recordMetadata = producer.send(producerRecord);
            
            RecordMetadata result = recordMetadata.get();
            
        }
 
}
cs

 

자바 코드를 이용해 데이터 객체를 생성한 후에 동기식으로 토픽에 데이터를 보내는 코드이다.

 

1
2
3
4
ProducerRecord(String topicName, Integer partition, K key, V value)
ProducerRecord(String topicName, Integer partition, Long timestamp, K key, V value)
ProducerRecord(String topicName, K key, V value)
ProducerRecord(String topicName, V value)
cs

 

ProducerRecord의 생성자 리스트의 일부를 보여준다.

 

일단 send()를 사용해 데이터를 전송하면 브로커는 파티션 로그에 메시지를 기록하고, 메시지에 대한 서버 응답의 메타데이터를 포함한 RecordMetadata를 반환하는데, 이 객체는 오프셋,체크섬,타임스탬프,토픽,serializedKeySize 등을 포함한다.

 

앞에서 일반적인 메시지 게시 유형에 대해 설명했지만, 메시지 전송은 동기 또는 비동기로 수행될 수 있다.

 

  • 동기 메시징 : 프로듀서는 메시지를 보내고 브로커의 회신을 기다린다. 카프카 브로커는 오류 또는 RecordMetadata를 보낸다. 일반적으로 카프카는 어떤 연결 오류가 발생하면 재전송을 시도한다. 그러나 직렬화, 메시지 등에 관련된 오류는 애플리케이션 단에서 처리돼야 하며, 이경우 카프카는 재전송을 시도하지 않고 예외를 즉시 발생시킨다.
  • 비동기 메시징 : 일부 즉각적으로 응답 처리를 하지 않아도 되는 혹은 원하지 않는 경우 또는 한두 개의 메시지를 잃어버려도 문제되지 않거나 나중에 처리하기를 원할 수 있다. 카프카는 send() 성공 여부와 관계없이 메시지 응답 처리를 콜백 인터페이스 제공한다.

 

1
send(ProducerRecord<K,V> record, Callback callback)
cs

 

위의 콜백 인터페이스는 onCompletion 메서드를 포함하는데, 오버라이드하여 구현하면 된다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
producer.send(producerRecord, new Callback() {
                
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    
                    if(exception != null) {
                        log.info("예외 발생");
                        /*
                         * 예외처리
                         */
                    }else {
                        log.info("정상처리");
                        /*
                         * 정상처리 로직
                         */
                    }
                    
                }
});
cs

 

onCompletion 메소드 내부에 성공했거나 오류가 발생한 메시지를 처리할 수 있다.

 

 

사용자 정의 파티셔닝

카프카는 내부적으로 내장된 기본 파티셔너와 시리얼라이저를 사용한다. 하지만 가끔은 메시지가 해당 브로커에서 동일한 키는 동일한 파티션으로 가도록 사용자 정의 파티션 로직을 원할 수 있는데, 카프카는 이렇게 사용자 정의 파티셔너 구현을 할 수 있도록 인터페이스를 제공한다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class CustomPartition implements Partitioner{
 
        @Override
        public void configure(Map<String, ?> configs) {
            // TODO Auto-generated method stub
            
        }
 
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
                Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            
            int numPartitions = partitions.size();
            //TODO : 파티션 선택 로직
            return 0;
        }
 
        @Override
        public void close() {
            // TODO Auto-generated method stub
            
        }
        
}
cs

 

해당 커스텀 파티셔너를 사용하려면 Properties에 "partitioner.class"/"package.className" 키/값 형태소 설정해주면 된다.

 

1
2
3
4
5
6
7
8
9
10
11
12
public KafkaProducer<StringString> getProducer(){
        
        Properties producerProps = new Properties();
        
        producerProps.put("bootstrap.servers""localhost:9092,localhost:9093");
        producerProps.put("key.serializer""org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer""org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("partitioner.class","com.kafka.exam.CustomPartition");
        
        return new KafkaProducer<>(producerProps);
        
}
cs

 

추가 프로듀서 설정

 

  • buffer.memory : 카프카 서버로 전송을 기다리는 메시지를 위해 프로듀서가 사용할 버퍼 메모리의 크기다. 간단히 전송되지 않은 메시지를 보관하는 자바 프로듀서가 사용할 전체 메모리다. 이 설정의 한계에 도달하면, 프로듀서는 예외를 발생시키기 전에 max.block.ms 시간 동안 메시지를 대기시킨다. 만약 배치 사이즈가 더 크다면 프로듀서 버퍼에 더 많은 메모리를 할당해야한다. 예를 들어 설명하자면 카프카 서버에 메시지를 전송하는 속도보다 프로듀서가 메시지를 전송하는 요청의 수가 더 빠를 경우(1개씩 메시지를 카프카 서버에 전송하는데 하나를 보내면 2개의 요청이 계속 쌓인다면) 아직 보내지 않은 메시지는 해당 버퍼에 쌓이게 된다. 이 내용에 더해서 큐에 있는 레코드가 계속 남아있는 것을 피하기 위해 request.timeout.ms를 사용해 지정된 시간동안 메시지가 보내지지 않고 큐에 쌓여있다면 메시지는 큐에서 제거되고 예외를 발생시킨다.
  • acks : 이 설정은 각 파티션의 리더가 팔로워(복제본)에게 ACK을 받는 전략을 설정한다. ack=0이라면 리더에 데이터가 쓰이자 마자, 팔로워(복제본)가 데이터 로그를 쓰는 것을 신경쓰지 않고 커밋을 완료한다. ack=1 하나의 팔로워에게 ack를 받으면 커밋을 완료한다. ack=all 모든 팔로워에게 복제완료 ack를 받으면 커밋을 완료한다. 각 설정은 장단점이 있으므로 잘 조율해서 사용해야한다. 만약 ack=0이면 빠른 처리 성능을 보장하지만 매우 높은 가능성의 메시지 손실이 있을 수 있고, ack=1도 역시 처리성능은 빠르지만 여전히 메시지 손실가능성이 존재한다. 마지막으로 ack=all은 메시지 손실 가능성이 매우 낮지만 처리 성능이 느려진다.
  • batch.size : 이 설정은 설정된 크기만큼 여러 데이터를 모아 배치로 데이터를 전송한다. 하지만 프로듀서는 단위 배치가 꽉 찰때까지 기다릴 필요는 없다. 배치는 특정 주기로 전성되며 배치에 포함된 메시지 수는 상관하지 않는다.
  • linger.ms : 브로커로 현재의 배치를 전송하기 전에 프로듀서가 추가 메시지를 기다리는 시간을 나타낸다.
  • compression.type : 프로듀서는 브로커에게 압축되지 않은 상태로 메시지를 전송하는 것이 기본이지만, 해당 옵션을 사용해 데이터를 압축해 보내 처리 성능을 높힐 수 있다. 배치처리가 많을 수록 유용한 옵션이다.
  • retrites : 메시지 전송이 실패하면 프로듀서가 예외를 발생시키기 전에 메시지의 전송을 다시 시도하는 수.

기타 많은 설정들이 존재하지만 여기서 모두 다루지는 않는다. 공식 홈페이지를 이용해자.

 

자바 카프카 프로듀서 예제

 

코드 작성 전에 Maven에 카프카 dependency 설정을 해준다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public KafkaProducer<StringString> getProducer(){
        
        Properties producerProps = new Properties();
        
        producerProps.put("bootstrap.servers""localhost:9092,localhost:9093");
        producerProps.put("acks""all");
        producerProps.put("retries"1);
        producerProps.put("batch.size"20000);
        producerProps.put("linger.ms"1);
        producerProps.put("buffer.memory"24568545);
        
        return new KafkaProducer<>(producerProps);
        
}
/*
     * 동기식 
     * 
     * ProducerRecord 생성자 매개변수(*표시는 필수 매개변수)
     * *1)토픽이름,2)파티션 번호,3)타임스탬프,4)키,*5)값
     * 
     * 파티션선정
     * 1)파티션 번호가 ProducerRecord의 매개변수로 명시되어 있다면 그 파티션으로 보낸다.
     * 2)파티션이 지정되지 않고 ProducerRecord 매개변수에 키가 지정된 경우 파티션은 키의 해시값을 사용해 정한다.
     * 3)ProducerRecord 매개변수에 키와 파티션 번호 모두 지정되지 않은 경우에는 라운드 로빈 방식으로 정한다.
     */
    public void sendMessageSync(String topicName, String data) throws InterruptedException, ExecutionException {
        
        try(KafkaProducer<StringString> producer = getProducer();){
        
            ProducerRecord<StringString> producerRecord = new ProducerRecord<StringString>(topicName, data);
            
            Future<RecordMetadata> recordMetadata = producer.send(producerRecord);
            
            RecordMetadata result = recordMetadata.get();
            
        }
    }
    
    /*
     * 비동기식
     */
    public void sendMessageAsync(String topicName, String data) {
        
        try(KafkaProducer<StringString> producer = getProducer();){
        
            ProducerRecord<StringString> producerRecord = new ProducerRecord<StringString>(topicName, data);
            
            producer.send(producerRecord, new Callback() {
                
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    
                    if(exception != null) {
                        log.info("예외 발생");
                        /*
                         * 예외처리
                         */
                    }else {
                        log.info("정상처리");
                        /*
                         * 정상처리 로직
                         */
                    }
                    
                }
            });
            
        }
    }
http://colorscripter.com/info#e" target="_blank" style="color:#e5e5e5text-decoration:none">Colored by Color Scripter
http://colorscripter.com/info#e" target="_blank" style="text-decoration:none;color:white">cs

 

 

카프카 프로듀서를 사용할 때 유의해야 하는 사항

  • 데이터 유효성 검증 : 보낼 데이터의 유효성 검증은 간과하기 쉽다. 꼭 유효성 검증을 포함하는 로직을 작성하자.
  • 예외처리 : 연결 실패,네트워크 시간 초과 등의 예외로는 프로듀서가 내부적으로 재전송을 한다. 하지만 모든 예외에 대해서 재전송을 하는 것은 아니다. 즉, 예외 발생에 대해 처리의 책임은 온전히 프로듀서 애플리케이션에 있는 것이다.
  • 부트스트랩 URL 수 : 클러스터 노드수가 적다면 모든 URL를 목록으로 작성하자. 하지만 많약 노드가 아주 많다면 대표하는 브로커의 수만 목록으로 설정해주자.
  • 잘못된 파티셔닝 방식의 사용방지 : 파티션은 카프카에서 병렬 처리의 단위다. 메시지가 모든 토픽 파티션으로 균일하게 분배되도록 올바른 파티셔닝 전략을 선택하자. 만약 잘못된 파티셔닝 방식을 도입하게 되면 특정 파티션에만 요청이 집중되는 최악의 상황이 발생할 수 있으니 메시지가 모든 가용한 파티션에 분배 되도록 올바른 파티셔닝 방식을 정의하자.
  • 기존 토픽에 새로운 파티션 추가 방지 : 메시지를 분산시키기 위해서 키 값을 사용하여 파티션을 선택하는 토픽에는 파티션을 추가하는 것을 피해야한다. 새로운 파티션 추가는 각 키에 대해 산출된 해시 코드를 변경시키며, 이는 파티션의 수도 산출에 필요한 입력 중 하나로 사용하기 때문이다.

 

posted by 여성게
:
Middleware/Kafka&RabbitMQ 2019. 7. 6. 13:28

 

 

이전 포스팅들에서 이미 카프카란 무엇이고, 카프카 프로듀서부터 컨슈머, 스트리밍까지 다루어보았다. 하지만 이번 포스팅을 시작으로 조금더 내용을 다듬고 정리된 상태의 풀세트의 카프카 포스팅을 시작할 것이다.

 

카프카 시스템의 목표

  • 메시지 프로듀서와 컨슈머 사이의 느슨한 연결
  • 다양한 형태의 데이터 사용 시나리오와 장애 처리 지원을 위한 메시지 데이터 유지
  • 빠른 처리 시간을 지원하는 구성 요소로 시스템의 전반적인 처리량을 최대화
  • 이진 데이터 형식을 사용해서 다양한 데이터 형식과 유형을 관리
  • 기존의 클러스터 구성에 영향을 주지 않고 일정한 서버의 확장성을 지원

 

카프카의 구조

카프카 토픽에서 모든 메시지는 바이트의 배열로 표현되며, 카프카 프로듀서는 카프카 토픽에 메시지를 저장하는 애플리케이션이다. 이렇게 프로듀서가 보낸 데이터를 저장하고 있는 모든 토픽은 하나 이상의 파티션으로 나뉘어져있다. 각 토픽에 대해 여러개로 나뉘어져 있는 파티션은 메시지를 도착한 순서에 맞게 저장한다.(내부적으로는 timestamp를 가지고 있다.) 카프카에서는 프로듀서와 컨슈머가 수행하는 두 가지 중요한 동작이 있다. 프로듀서는 로그 선행 기입 파일 마지막에 메시지를 추가한다. 컨슈머는 주어진 토픽 파티션에 속한 로그 파일에서 메시지를 가져온다. 물리적으로 각 토픽은 자신에게 할당된 하나 이상의 파티션을 다른 브로커(카프카 데몬 서버)들에게 균등하게 분배된다.

 

이상적으로 카프카 파이프라인은 브로커별로 파티션과 각 시스템의 모든 토픽에 대해 균등하게 분배되어야 한다. 컨슈머는 토픽에 대한 구독 또는 이런 토픽에서 메시지를 수신하는 애플리케이션이다.

 

이러한 카프카 시스템은 고가용성을 위한 클러스터를 지원한다. 전형적인 카프카 클러스터는 다중 브로커로 구성된다. 클러스터에 대한 메시지 읽기와 쓰기 작업의 부하 분산을 돕는다. 각 브로커는 자신의 상태를 저장하지 않지만 Zookeeper를 사용해 상태 정보를 유지한다. 각각의 토픽 파티션에는 리더로 활동하는 브로커가 하나씩 있고, 0개 이상의 팔로워를 갖는다. 여느 리더/팔로워 관계와 비슷하게 리더는 해당하는 파티션의 읽기나 쓰기 요청을 관리한다. 여기서 Zookeeper는 카프카 클러스터에서 중요한 요소로, 카프카 브로커와 컨슈머를 관리하고 조정한다. 그리고 카프카 클러스터 안에서 새로운 브로커의 추가나 기존 브로커의 장애를 감시한다. 예전 카프카 버전에서는 파티션 오프셋 관리도 Zookeeper에서 관리하였지만 최신 버전의 카프카는 오프셋 관리를 위한 토픽이 생성되어 오프셋관련된 데이터를 하나의 토픽으로 관리하게 된다.

 

 

메시지 토픽

메시징 시스템에서 메시지는 어디간에 저장이 되어 있어야한다. 카프카는 토픽이라는 곳에 메시지를 바이트 배열 형태로 저장한다. 각 토픽은 비즈니스 관점에서 하나의 카테고리가 될 수 있다. 다음은 메시지 토픽에 대한 용어 설명이다.

 

  • 보관 기간 : 토픽 안의 메시지는 처리 시간과 상관없이 정해진 기간 동안에만 메시지를 저장하고 있는다. 기본 값은 7일이며 사용자가 값 변경이 가능하다.
  • 공간 유지 정책 : 메시지의 크기가 설정된 임계값에 도달하면 메시지를 지우도록 설정가능하다. 카프카 시스템 구축시 충분한 용량 계획을 수립해야 원치않은 삭제가 발생하지 않는다.
  • 오프셋 : 카프카에 할당된 각 메시지는 오프셋이라는 값이 사용된다. 토픽은 많은 파티션으로 구성돼 있으며, 각 파티션은 도착한 순서에 따라 메시지를 저장하고, 컨슈머는 이러한 오프셋으로 메시지를 인식하고 특정 오프셋 이전의 메시지는 컨슈머가 수신했던 메시지로 인식한다.
  • 파티션 : 카프카 메시지 토픽은 1개 이상의 파티션으로 구성되어 분산 처리된다. 해당 파티션의 숫자는 토픽 생성시 설정가능하다. 만약 순서가 아주 중요한 데이터의 경우에는 파티션 수를 1개로 지정하는 것도 고려할 수 있다.(아무리 파티션이 시계열로 데이터가 저장되지만 여러 파티션에 대한 메시지 수신은 어떠한 순서로 구독될지 모르기 때문이다.)
  • 리더 : 파티션은 지정된 복제 팩터에 따라 카프카 클러스터 전역에 걸쳐 복제된다. 각 파티션은 리더 브로커와 팔로워 브로커를 가지며 파티션에 대한 모든 읽기와 쓰기 요청은 리더를 통해서만 진행된다.

 

메시지 파티션

각 메시지는 파티션에 추가되며 각 단위 메시지는 오프셋으로 불리는 숫자에 맞게 할당된다. 카프카는 유사한 키를 갖고 있는 메시지가 동일한 파티션으로 전송되도록 하며, 메시지 키의 해시 값을 산출하고 해당 파티션에 메시지를 추가한다. 여기서 중요하게 짚고 넘어가야 할 것이 있다. 각 단위 메시지에 대한 시간적인 순서는 토픽에 대해서는 보장되지 않지만, 파티션 안에서는 항상 보장된다. 즉, 나중에 도착한 메시지가 항상 파티션의 끝 부분에 추가됨을 의미한다. 예를 들어 설명하자면,

 

A 토픽은 a,b,c라는 파티션으로 구성된다는 가정이다. 만약 라운드 로빈 방식으로 메시지가 각 파티션에 분배가 되는 옵션을 적용했다고 생각해보자. 1,2,3,4,5,6이라는 메시지가 pub되었고 각 메시지는 a-1,4 / b-2,5 / c-3,6 와 같이 파티션에 분배된다. 1,2,3,4,5,6이라는 순서로 메시지가 들어왔고 해당 메시지는 시간적인 순서와 라운드로빈 방식으로 a,b,c라는 파티션에 배분되었다. 또한 각 파티션 안을 살펴보면 확실히 시간 순서로 배분되었다. 하지만 컨슈머 입장에서는 a,b,c파티션에서 데이터를 수신했을 경우 1,4,2,5,3,6이라는 메시지 순서로 데이터를 수신하게 될것이다.(a,b,c순서로 수신, 하지만 파티션 수신순서는 바뀔수 있다.) 무엇을 의미할까? 위에서 말한것과 같이 파티션 내에서는 시간적인 순서를 보장하지만 전체적인 토픽관점에서는 순서가 고려되지 않는 것이다. 즉, 순서가 중요한 데이터는 동일한 키를 사용해 동일 파티션에만 데이터를 pub하거나 혹은 토픽에 파티션을 하나만 생성해 하나의 파티션에 시간적인 순서로 데이터를 저장되게 해야한다.

 

많은 수의 파티션을 구성하는 경우의 장단점

  • 높은 처리량 보장 : 파티션을 여러개 구성한다면 병렬 처리되어 높은 처리량을 보장할 것이다. 왜냐하면 여러 파티션에 대해 쓰기 동작이 여러 스레드를 이용해 동시에 수행되기 때문이다. 또한 하나의 컨슈머 그룹 내에서 하나의 파티션에 대해 하나의 컨슈머가 할당되기 때문에 여러 파티션의 메시지를 여러 컨슈머가 동시에 수신하여 병렬처리가 가능하다. 여기서 중요한 것은 동일한 컨슈머 그룹 안의 하나의 파티션에 대해 여러 컨슈머가 읽을 수 없다는 것이다.
  • 프로듀서 메모리 증가 : 만약 파티션 수가 많아 진다면 일시적으로 프로듀서의 버퍼의 메모리가 과도해질 수 있어, 토픽에 메시지를 보내는데 문제가 생길 수 있으므로 파티션 수는 신중히 고려해야한다.
  • 고가용성 문제 : 여러 파티션을 생성하므로서 카프카 클러스터의 고가용성을 지원한다. 즉, 리더인 파티션이 중지되면 팔로워중에 하나가 리더로 선출될 것이다. 하지만 너무 과중한 파티션 수는 리더 선출에 지연이 생길 가능성이 있기 때문에 신중히 고려해야한다.

복제와 복제로그

복제는 카프카 시스템에서 신뢰성있는 시스템을 구현하기 위해 가장 중요한 부분이다. 각 토픽 파티션에 대한 메시지 로그의 복제본은 카프카 클러스터 내의 여러 서버에 걸쳐서 관리되고, 각 토픽마다 복제 팩터를 다르게 지정가능하다.

 

일반적으로 팔로워는 리더의 로그 복사본을 보관하는데, 이는 리더가 모든 팔로워로부터 ACK를 받기 전까지는 메시지를 커밋하지 않는다는 것을 의미한다.(ack=all 설정시)

 

메시지 프로듀서 

일반적으로 프로듀서는 파티션으로 데이터를 쓰기 않고, 메시지에 대한 쓰기 요청을 생성해서 리더 브로커에게 전송한다. 그 이후 파티셔너가 메시지의 해시 값을 계산하여 프로듀서로 하여금 어느 파티션에 메시지를 pub할지 알 수 있도록 한다.

 

일반적으로 해시 값은 메시지 키를 갖고 계산하며, 메시지 키는 카프카의 토픽으로 메시지를 기록할 때 제공된다. null 키를 갖는 메시지는 분산 메시징을 지원하는 파티션에 대해 라운드 로빈 방식으로 분배된다. 카프카에서의 각 파티션은 한 개의 리더를 가지며, 각 읽기와 쓰기 요청은 리더를 통해 진행된다.

 

프로듀서는 설정에 따라 메시지의 ACK를 기다리며, 일반적으로 모든 팔로워 파티션에 대해 복제가 완료되면 커밋을 완료한다. 또한 커밋이 완료되지 않으면 읽기 작업은 허용되지 않는다. 이는 메시지의 손실을 방지한다. 그렇지만 ACK 설정을 1로 설정할 수 있는데, 이경우 리더 파티션이 하나의 팔로워에게만 복제 완료 응답을 받으면 커밋을 완료한다. 이 경우 처리 성능은 좋아지지만 메시지의 손실은 어느정도 감수 해야한다. 즉, 메시지의 손실을 허용하고 빠른 처리 시간을 원하는 경우에 사용할 수 있는 설정이다.

 

메시지 컨슈머

카프카 토픽을 구독하는 역할을 하는 애플리케이션이다. 각 컨슈머는 컨슈머 그룹에 속해 있으며, 일부 컨슈머 그룹은 여러개의 컨슈머를 포함한다. 위에서도 간단히 설명하였지만 동일 그룹의 컨슈머들은 동시에 같은 토픽의 서로다른 파티션에서 메시지를 읽어온다. 하지만 서로 다른 그룹의 컨슈머들은 같은 토픽에서 데이터를 읽어와 서로 영향을 미치지 않으면서 메시지 사용이 가능하다.

 

주키퍼의 역할

  • 컨트롤러 선정 : 컨트롤러는 파티션 관리를 책임지는 브로커 중에 하나이며, 파티션 관리는 리더 선정, 토픽 생성, 파티션 생성, 복제본 관리 등을 포함한다. 하나의 노드 또는 서버가 꺼지면 카프카 컨트롤러는 팔로워 중에서 파티션 리더를 선정한다. 카프카는 컨트롤러를 선정하기 위해 주키퍼의 메타데이터를 정보를 이용한다.
  • 브로커 메타데이터 : 주키퍼는 카프카 클러스터의 일부인 각 브로커에 대해 상태 정보를 기록한다.
  • 토픽 메타데이터 : 주키퍼는 또한 파티션 수, 특정한 설정 파라미터 등의 토픽 메타 데이터를 기록한다.

이외에 주키퍼는 더 많은 역할을 담당하고 있다.

 

여기까지 간단히 카프카에 대한 소개였다. 이후 포스팅들에서는 프로듀서,컨슈머,스트리밍,클러스터 구축 등의 내용을 몇 차례에 거쳐 다루어 볼 것이다.

posted by 여성게
:

 

오늘 포스팅할 내용은 ELK Stack의 요소중 하나인 Logstash(로그스태시)입니다. 로그스태시 설명에 앞서 로그란 시스템이나 애플리케이션 상태 및 행위와 관련된 풍부한 정보를 포함하고 있습니다. 이러한 정보를 각각 시스템마다 파일로 기록하고 있는 경우가 대다수 일겁니다. 그렇다면 과연 이러한 정보를 파일로 관리하는 것이 효율적인 것인가를 생각해볼 필요가 있습니다. 한곳에 모든 로그데이터를 시스템별로 구분하여 저장하고 하나의 뷰에서 모든 시스템의 로그데이터를 볼 수 있다면 굉장히 관리가 편해질 것입니다. 이러한 모든 로그정보를 수집하여 하나의 저장소(DB, Elasticsearch 등)에 출력해주는 시스템이 로그스태시라는 시스템입니다. 앞선 포스팅에서 다루어보았던 Filebeat와 연동을 한다면 파일에 축적되고 있는 로그데이터를 하나의 저장소로 보낼 수도 있고, 카프카의 토픽에 누적되어 있는 메시지들을 가져와 하나의 저장소에 보낼 수도 있습니다.

 

2019/06/17 - [Elasticsearch&Solr] - ELK Stack - Filebeat(파일비트)란? 간단한 사용법

 

ELK Stack - Filebeat(파일비트)란? 간단한 사용법

오늘 포스팅할 내용은 ELK Stack에서 중요한 보조 수단 중 하나인 Filebeat(파일비트)에 대해 다루어볼 것이다. 우선 Filebeat를 이용하는 사례를 간단하게 하나 들어보자면, 운영중인 애플리케이션에서 File을..

coding-start.tistory.com

다시 한번 로그스태시는 아래 그림과 같이 오픈소스 데이터 수집 엔진으로, 실시간 파이프라인 기능을 갖춘 데다 널리 사용되고 있는 시스템입니다. 로그스태시를 활용하면 다양한 입력차원에서 데이터를 수집 및 분석, 가공 및 통합해 다양한 목적지에 저장하는 파이프라인을 쉽게 구축할 수 있습니다.

 

 

게다가 로그스태시는 사용하기 쉽고, 연동하기 간단한 입력 필터와 출력 플러그인과 같이 다양한 플러그인을 제공합니다. 이처럼 로그스태시를 사용하면 대용량 데이터와 각종 데이터 형식을 통합하고 정규화하는 프로세스 구축이 가능합니다.

 

 

로그스태시 아키텍쳐

 

 

로그스태시는 위의 그림과 같이 여러 원천데이터를 가져와 필터를 통해 가공하여 하나 이상의 시스템으로 내보낼 수 있습니다. 입력은 여러 원천데이터가 될 수 있고, 필터 또한 하나 이상을 정의하여 가공할 수 있습니다. 즉 입력,필터,출력 세 가지 단계로 구성이 됩니다. 이중 입력과 출력은 필수요소, 필터는 선택요소입니다. 또한 로그스태시는 기본적으로 파이프라인 단계마다 이벤트를 버퍼에 담기 위해 인메모리 바운드 큐를 사용하는데, 로그스태시가 비정상 종료된다면 인메모리에 저장된 이벤트는 손실됩니다. 하지만 손실 방지를 위하여 영구큐를 사용해 실행하면 이벤트를 디스크에 작성하기 때문에 비정상 종료후 재시작되어도 데이터의 손실이 없습니다.

 

 

영구 큐는 LOGSTASH_HOME/config 디렉토리의 logstash.yml 설정 파일에서 queue.type: persisted 속성을 설정하면 활성화 가능하다. 또한 로그스태시 힙메모리를 늘리려면 LOGSTASH_HOME/config 디렉토리 밑에 jvm.options 파일에서 조정가능합니다.

 

입력 플러그인

일반적으로 많이 사용되는 입력 플러그인에 대해 설명한다.

 

1)File

File 플러그인은 파일에서 이벤트를 한 줄씩 가져오는데 사용한다. 리눅스와 유닉스 명령어 tail -f와 유사한 방식으로 동작한다. 각 파일의 모든 변경 사항을 추적하고 마지막으로 읽은 위치를 기반으로 해당 시점 이후의 데이터만 전송한다. 각 파일에서 현재 위치는 sincedb라는 별도로 분리한 파일에 기록하여 로그스태시를 재기동하여도 읽지 못한 데이터를 빠지지 않고 읽어 올 수 있다.

 

1
2
3
4
5
6
7
8
9
10
11
#sample.conf
 
#input plugin file
input
{
 file {
  path => ["/Users/yun-yeoseong/*","...","..."]
  start_position => "beginning"
#sincedb_path => "NULL" ~> 로그스태시를 재시작할때마다 파일의 처음부터 읽는다.
  exclude => ["*.csv"]
  discover_interval => "10s"
  type => "applogs" ~>새로운 필드를 선언한다. 추후 필터에서 유용하게 쓰이는 필드이다.
 }
}
cs

 

위의 input 설정을 설명하면 path에 설정된 경로에서 파일을 읽어오며 파일에서 로그 추출 시작점을 파일의 처음으로 잡았고 경로의 파일중 csv 확장자는 제외하고 10초마다 파일을 읽어들이며 type이라는 필드에 applogs라는 데이터를 추가한다. type 필드를 이용하여 시스템 이름등을 넣어 로그스태시가 수집한 로그데이터 구분이 가능하다. 나머지 기타 설정들을 공식 홈페이지를 확인하자.

 

2)Beat

Beat 입력 플러그인을 사용하면 로그스태시에서 엘라스틱비트 프레임워크의 이벤트를 수신할 수 있다. 엘라스틱비트란 로그스태시를 보조하는 역할이며, 다양한 시스템에서 데이터를 수집하여 전달해주는 역할이다. 로그스태시와 공통점이라면 다양한 시스템의 데이터를 수집할 수 있다는 점이며, 차이점은 수집한 데이터를 가공하지 못한다는 점이다. 즉, 보통 엘라스틱비트에서 데이터를 수집하여 로그스태시에 보내면 로그스태시는 데이터를 가공하여 출력 포인트로 가공된 데이터를 내보낸다. 

 

1
2
3
4
5
6
7
8
9
10
11
12
#input plugin beat
input
{
 beats {
  host => "192.168.10.229"
  port => 2134 =>"실행중인 비트 포트만 작성해주면 된다.
 }
 beats { =>beats를 여러개 입력하여 다중 비트 입력을 받을 수 있다.
  host => "ip"
  port => "port"
 }
}
cs

 

위의 input 설정을 설명하자면 192.168.10.229:2134로 떠있는 비트에서 이벤트를 수신한다는 설정이다. 여러개의 beat 설정을 넣어서 여러 비트프레임워크에서 이벤트를 수신할 수 있다. 만약 로그스태시와 비트가 같은 서버에 있다면 host는 생략하고 port만 명시해도 된다.

 

설명한 input 플러그인 이외에도 JDBC,Kafka 등 많은 입력 플러그인이 있다. 그 중 카프카에서 데이터를 가져오는 예제는 이 포스팅 마지막에 할 예정이라 설명을 생략하였다.

 

TIP 로그스태시 실행 시 -r 옵션을 지정하면 설정을 변경하고 저장할 때마다 자동으로 바뀐 환경설정을 다시 로드한다. 즉, 매번 설정이 변경될 때마다 로그스태시를 재시작하지 않아도 된다.

 

 

출력 플러그인

가장 일반적으로 많이 사용되는 출력 플러그인 설명이다.

 

1)Elasticsearch

로그스태시에서 일래스틱서치로 이벤트 혹은 로그 데이터를 전송하는 데 사용하며, 권장하는 방법이라고 한다. 엘라스틱서치에 데이터가 있으면 키바나로 손쉽게 시각화가 가능하기에 사용하면 여러모로 유용하다.

 

1
2
3
4
5
6
7
8
9
10
#output to elasticsearch
output {
 elasticsearch {
 index => "elasticserach-%{+YYYY.MM.dd}" => default logstash-%{+YYYY.MM.dd}
 document_type => "_doc"
 hosts => "192.162.43.30" / ["ip1:9200","ip2:9200"] => default localhost:9200
 user => "username"
 password => "password"
 }
}
cs

 

직관적인 설정방법이라 크게 설명할 것은 없다. 몇개 짚고 넘어가자면 여러 엘라스틱서치 노드가 존재한다면 위와 같이 배열형태로 설정할 수 있고, 엘라스틱서치 사용자 자격 증명이 필요하다면 user,password를 지정할 수 있다. 데이터를 보낼 인덱스명은 기본값을 가지고 있지만 사용자 정의가 가능하다. 호스트 포트를 생략하면 기본적으로 엘라스틱서치의 기본포트를 이용한다.

 

2)CSV

CSV 플러그인을 사용하면 출력을 CSV 형식으로 저장할 수 있다. 플러그인 사용 시 필요한 설정은 출력 파일 위치를 지정하는 path 매개변수와 CSV 파일에 기록할 이벤트 필드명을 지정하는 fields 매개변수다. 이벤트에 필드가 없는 경우, 빈 문자열이 기록된다. 

 

1
2
3
4
5
6
7
#output to elasticsearch
output {
 csv {
  fields => ["message","@timestamp","host"]
  path => "/home/..."
 }
}
cs

 

설명한 출력 플러그인 이외에도 많은 출력 플러그인이 존재한다. 나머지는 공식 홈페이지를 참고하자.

 

코덱 플러그인

가장 일반적으로 많이 사용되는 코덱 플러그인이다.

 

1)JSON

해당 코덱은 데이터가 json으로 구성된 경우, 입력 플러그인에서 데이터를 디코딩하고 출력 플러그인에서 데이터를 인코딩하도록 사용하는데 유용하다. 만약 \n 문자가 들어간 JSON 데이터가 있는 경우 json_lines 코덱을 사용한다. 

 

1
2
3
4
5
6
input codec exam
input {
 stdin {
  codec => "json" => \n 구분자가 있는 pretty json 일경우 json_lines codec을 이용한다.
 }
}
 
#input exam
{"question":"안녕","answer":"네, 안녕하세요"}
 
#output result
{

      "answer" => "네, 안녕하세요",

      "question" => "안녕",

      "host" => "yun-yeoseong-ui-MacBook-Pro.local",

      "@version" => "1",

    "@timestamp" => 2019-06-26T13:26:08.704Z

}

 

#input exam2

{"question":"안녕","answer":"네, \n 안녕하세요"}

#output result

{

      "message" => "{\"question\":\"안녕\",\"answer\"n 안세요\"}",

      "host" => "yun-yeoseong-ui-MacBook-Pro.local",

      "@version" => "1",

      "tags" => [

        [0] "_jsonparsefailure"

    ],

    "@timestamp" => 2019-06-26T13:28:58.986Z

}

 

=============================================================================

input codec exam

input {

 stdin {

  codec => "json_lines" => \n 구분자가 있는 pretty json 일경우 json_lines codec을 이용한다.

 }

}

 

#input exam

{"question":"안녕","answer":"네, \n 안녕하세요"}

#output result

{

      "host" => "yun-yeoseong-ui-MacBook-Pro.local",

      "question" => "안녕",

      "answer" => "네, \n 안녕하세요",

      "@version" => "1",

    "@timestamp" => 2019-06-26T13:30:12.650Z

}

cs

 

코덱플러그인은 위와 같이 입력,출력 플러그인에 설정이 들어간다.

 

2)Multiline

여러 행에 걸친 데이터를 단일 이벤트로 병합하는 데 유용한 플러그인이다.

 

1
2
3
4
5
6
7
8
9
10
11
#input codex exam2
input {
 file {
  path => "/var/log/access.log"
  codex => multiline {
   pattern => "^\s" 공백으로 시작하는 데이터를 이전 행과 결합
   negate => false
   what => "previous"
  }
 }
}
cs

 

공백으로 시작하는 모든 행을 이전 행과 결합하는 코덱설정이다.

 

이외에도 더 많은 코덱 플러그인이 존재한다. 자세한 사항을 공식 홈페이지를 참고바란다.

 

 

Kafka(카프카) + ELK Stack을 이용한 로그 분석 구현

사실 데이터의 흐름은 정의하기 나름이지만 필자 나름대로의 플로우로 로그분석을 위한 로그데이터 파이프라인을 구축해보았고, 실제 솔루션 내에도 적용한 플로우이다.

 

전체적인 플로는 아래와 같다.

 

App -> Kafka -> Logstash -> Elasticsearch

 

위와 같이 애플리케이션에서 발생하는 로그들을 DB에 저장하는 것이 아니라, 카프카를 이용해 특정 토픽에 메시지를 보낸 후에 해당 토픽을 폴링하고 있는 로그스태시가 데이터를 수집하여 엘라스틱서치에 색인한다.

 

1)App(Spring boot + Spring Cloud Stream)

애플리케이션은 스프링부트 기반의 웹프로젝트이며, 스프링 클라우드 스트림 라이브러리를 이용하여 카프카와 연동하였다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
############################<Binder Config>###############################
#broker list(cluster)
spring.cloud.stream.kafka.binder.brokers=localhost:9092,localhost:9093,localhost:9094,localhost:9095
#acks mode
spring.cloud.stream.kafka.binder.producer-properties.acks=all
#required acks num
spring.cloud.stream.kafka.binder.required-acks=3
#min partition num
#spring.cloud.stream.kafka.binder.min-partition-count=4
#auto create topic enable
spring.cloud.stream.kafka.binder.auto-create-topics=true
#auto add partitions
#spring.cloud.stream.kafka.binder.auto-add-partitions=true
#replication factor
spring.cloud.stream.kafka.binder.replication-factor=2
############################</Binder Config>###############################
############################<Producer Config>##############################
 
#Log Producer
spring.cloud.stream.bindings.logstash_producer.destination=KIVESCRIPT_CHAT_LOG
spring.cloud.stream.bindings.logstash_producer.content-type=application/json
spring.cloud.stream.bindings.logstash_producer.producer.partition-count=4
 
############################</Producer Config>#############################
cs

 

애플리케이션에서 카프카와 연동하기 위한 application.properties 설정들이다. 만약 위의 설정들을 모른다면 이전에 포스팅했던 글을 참고하길 바란다.

 

 

Kafka - Spring cloud stream kafka(스프링 클라우드 스트림 카프카)

Kafka - Spring cloud stream kafka(스프링 클라우드 스트림 카프카) 이전 포스팅까지는 카프카의 아키텍쳐, 클러스터 구성방법, 자바로 이용하는 프로듀서,컨슈머 등의 글을 작성하였다. 이번 포스팅은 이전까지..

coding-start.tistory.com

밑의 링크는 카프카 클러스터링에 관련된 포스팅이다.

 

 

Kafka - Kafka(카프카) cluster(클러스터) 구성 및 간단한 CLI사용법

Kafka - Kafka(카프카) cluster(클러스터) 구성 및 간단한 CLI사용법 ▶︎▶︎▶︎카프카란? 이전 포스팅에서는 메시징 시스템은 무엇이고, 카프카는 무엇이며 그리고 카프카의 특징과 다른 메시지 서버와의 차이..

coding-start.tistory.com

 

다음은 카프카 바인더와 연결시켜줄 채널을 명시해주는 자바 컨피그 클래스이다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * Custom Message Processor
 * @author yun-yeoseong
 *
 */
public interface MessageProcessor {
    
    public static final String CHAT_LOG = "logstash_producer";
    
    @Output(KiveMessageProcessor.CHAT_LOG)
    MessageChannel chatLog();
    
}
cs

 

애플리케이션에서 카프카 토픽으로 메시지를 내보내는 코드이다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * 
 * @author yun-yeoseong
 *
 */
@Slf4j
@Component
public class MessageSender {
    
    @Autowired
    private MessageProcessor messageProcessor;
    
    public void sendMessage(String log) {
                
        MessageChannel outputChannel = messageProcessor.chatLog();
        
        outputChannel.send(MessageBuilder
                           .withPayload(request)
                           .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                           .build());
    }
    
}
cs

 

다음은 로그스태시 설명이다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
input {
        kafka {
                bootstrap_servers => "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095"
                topics => "LOGSTASH_PRODUCER"
                group_id => "APP_LOG_GROUP"
                enable_auto_commit => "true"
                auto_offset_reset => "latest"
                consumer_threads => 4
                codec => "json"
        }
}
 
output {
        elasticsearch {
                index => "chatlog-%{+YYYY.MM.dd}"
                document_type => "_doc"
                hosts => ["127.0.0.1:9200","127.0.0.1:9400"]
                template_name => "log-template"
        }
}
cs

 

위 설정은 Logstash 입력과 출력을 정의한 conf 파일이다. 별다른 설정은 없고 입력으로 카프카 클러스터의 특정 토픽을 폴링하고 있으며, 출력으로 엘라스틱서치를 바라보고 있다. 설정에 대한 자세한 사항은 공식홈페이지를 확인하길 바란다. 하나 짚고 넘어갈 것이 있다면 필자는 로그를 색인하기 위한 인덱스를 동적으로 생성하지 않고, 미리 Index Template를 선언해 놓았다. 만약 동적으로 인덱스를 생성한다면 비효율적인 필드 데이터 타입으로 생성될 것이다.(keyword성 데이터도 text타입으로 생성)

 

아래 링크는 엘라스틱서치 Rest 자바 클라이언트를 이용하여 인덱스 템플릿을 생성하는 예제이다.

 

Elasticsearch - Rest High Level Client를 이용한 Index Template 생성

오늘 간단히 다루어볼 내용은 엘라스틱서치의 REST 자바 클라이언트인 Rest High Level Client를 이용하여 Index Template을 생성해보는 예제이다. 바로 예제로 들어간다. 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 1..

coding-start.tistory.com

생략된 나머지 설정(카프카,엘라스틱서치 등)들은 이전 포스팅 글들을 참조하자. 혹시나 모든 설정파일 및 소스가 필요하다면 댓글을 달아주시면 될 듯하다.

 

 

기타 명령어

 

LOGSTASH_HOME/bin logstash-plugin list -> 현재 설치된 플러그인 목록

LOGSTASH_HOME/bin logstash-plugin list --group filter -> 필터 플러그인 목록 출력(input,output,codec 등을 그룹명으로 줄 수 있음)

LOGSTASH_HOME/bin logstash-plugin list 'kafka' -> kafka라는 단어가 포함된 플러그인이 있다면 출력

LOGSTASH_HOME/bin logstash-plugin install logstash-output-email -> logstash-output-email 플러그인 설치

LOGSTASH_HOME/bin logstash-plugin update logstash-output-email -> logstash-output-email 플러그인 최신버전으로 업데이트

 

 

posted by 여성게
:
Middleware/Kafka&RabbitMQ 2019. 3. 30. 00:45

Kafka - Kafka Stream API(카프카 스트림즈) - 2



이전 카프카 스트림즈 포스팅에서는 간단하게 카프카 스트림즈 API를 다루어보았습니다. 이번 2번째 카프카 스트림즈 포스팅은 조금더 깊게 카프카 스트림즈에 대해 알아보려고 합니다.


Kafka Streams는 Kafka 프로듀서 및 컨슈머를 이용하여 들어오는 메시지를 즉각적으로 가공하여 또 다른 토픽으로 메시지를 내보낼 수 있습니다. 이러한 카프카 스트림즈를 사용하는 기업들을 소개하자면 New York Times는 카프카와 카프카 스트림즈를 이용하여 독자들을 위한 실시간 컨첸츠를 저장하고 배포합니다.그리고 라인 같은 경우는 서비스끼리 통신하기 위한 중앙 데이터 허브로 카프카를 사용합니다. 그리고 카프카 스트림즈를 이용하여 토픽을 데이터를 안정적으로 가공하고 필터링하여 컨슈머가 효율적으로 메시지를 컨슘할 수 있게 합니다. 이러한 많은 기업들이 안정적으로 실시간 데이터를 처리하기 위해 카프카와 카프카 스트림즈를 이용합니다.



카프카 메시징 계층은 데이터를 저장하고 전송하기 위해 데이터를 파티션 단위로 분할한다. 카프카 스트림즈 역시 파티션된 토픽을 기반으로 병렬 처리 모델의 논리 단위로 작업을 진행한다. 카프카 스트림즈는 키/값 형태의 스트림 데이터이므로 스트림 데이터의 키값으로 토픽내 특정 파티션으로 라우팅된다. 일반적으로 파티션 개수만큼 컨슈머 그룹안의 컨슈머들이 생성되듯이 카프카 스트림즈도 역시 파티션 개수만큼 태스크가 생성되어 병렬로 데이터 스트림을 처리할 수 있다. 또한 작업스레드 수를 조정할 수 있어 더욱 효율적인 병렬처리가 가능하다.


Required configuration parameters


  • application.id - 스트림 처리 응용 프로그램의 식별자 아이디이다. Kafka 클러스터 내에서 유일한 값이어야 한다.
  • bootstrap.servers - Kafka 인스턴스 호스트:포트 정보이다.


Optional configuration parameters(중요도 보통이상의 설정값)


  • cache.max.bytes.buffering - 모든 스레드에서 레코드 캐시에 사용할 최대 메모리 바이트 수입니다.(default 10485760 bytes)
  • client.id - 요청시 서버에 전달할 ID 문자열이다. 카프카 스트림즈 내부적으로 프로듀서,컨슈머에게 전달된다(default empty string)
  • default.deserialization.exception.handler - DeserializationExceptionHandler인터페이스를 구현하는 예외 처리 클래스이다.

(default  LogAndContinueExceptionHandler)

  • default.production.exception.handler - ProductionExceptionHandler인터페이스를 구현하는 예외 처리 클래스이다.

  (default DefaultProductionExceptionHandler)

  • key.serde - record key에 대한 직렬화/역직렬화 클래스이다. Serde 인터페이스를 구현한다.(default Serdes.ByteArray().getClass().getName() )
  • num.standby.replicas - 각 태스크의 대기 복제본 수이다.(default 0)
  • num.stream.threads - 스트림을 처리할 스레드 수이다.(default 1)
  • replication.factor - 복제본 수이다.(default 1)
  • retries - 처리 실패시 재시도 횟수(default 0)
  • retry.backoff.ms - 재시도 요청 간격 시간(default 100밀리세컨드)
  • state.dir - 상태 저장소 디렉토리 위치(default /tmp/kafka-streams)
  • value.serde - record value에 대한 직렬화/역직렬화 클래스이다. key.serde와 동일.


default.deserialization.exception.handler


기본 deserialization 예외 처리기를 사용하면 deserialize에 실패한 레코드의 예외를 관리할 수 있다. 예외 처리기는 throw된 예외에 따라 FAIL 또는 CONTINUE를 반환해야한다. FAIL은 스트림이 종료될 것이고, CONTINUE는 예외를 무시하고 계속해서 처리를 진행할 것이다. 내부적으로 제공하는 예외 핸들어가 있다.


  • LogAndContinueExceptionHandler - 예외가 발생하여도 계속해서 프로세스를 진행한다.
  • LogAndFailExceptionHandler - 예외가 발생하면 프로세스를 중지한다.

기본적으로 제공하는 핸들러 이외에도 사용자가 직접 정의하여 예외 핸들러를 구현할 수 있다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class SendToDeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {
    KafkaProducer<byte[], byte[]> dlqProducer;
    String dlqTopic;
 
    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
 
        log.warn("Exception caught during Deserialization, sending to the dead queue topic; " +
            "taskId: {}, topic: {}, partition: {}, offset: {}",
            context.taskId(), record.topic(), record.partition(), record.offset(),
            exception);
 
        dlqProducer.send(new ProducerRecord<>(dlqTopic, record.timestamp(), record.key(), record.value(), record.headers())).get();
 
        return DeserializationHandlerResponse.CONTINUE;
    }
 
    @Override
    public void configure(final Map<String, ?> configs) {
        dlqProducer = .. // get a producer from the configs map
        dlqTopic = .. // get the topic name from the configs map
    }
}
cs


위의 예제코드는 예외가 발생하면 DLQ 토픽에 메시지를 보낸 이후 계속해서 프로세스를 진행하는 예외 핸들러이다. 위에서 이야기하였듯이 반드시 FAIL,CONTINUE 둘중하나를 반환해야 한다.


  • default.production.exception.handler 


프로덕션 예외 처리기를 사용하여 너무 큰 메시지 데이터를 생성하는 등 브로커와 상호 작용하려고 할때 트리거되는 예외를 컨트롤 할 수 있습니다. 기본적으로 카프카는 DefaultProductionExceptionHandler를 제공한다. 이 예외 처리기도 역시 FAIL,CONTINUE등 하나를 반환하여 프로세스 진행여부를 결정해주어야 한다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;
 
public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
    public void configure(Map<String, Object> config) {}
 
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        } else {
            return ProductionExceptionHandlerResponse.FAIL;
        }
    }
}
 
Properties settings = new Properties();
 
// other various kafka streams settings, e.g. bootstrap servers, application id, etc
 
settings.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
             IgnoreRecordTooLargeHandler.class);
cs



Stream DSL


  • branch - 제공되는 predicate에 따라서 적절히 여러개의 KStream으로 분할한다.(KStream[] 반환)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
 * branch
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam {
 
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        source.foreach((key,value)->log.info("key,value ={}",key+" "+value));
        KStream<StringString>[] streamArr = source.branch(
            (key,value) -> value.startsWith("A"),
            (key,value) -> value.startsWith("B"),
            (key,value) -> true
        );
        
        streamArr[0].to("stream-exam-output1");
        streamArr[1].to("stream-exam-output2");
        streamArr[2].to("stream-exam-output3");
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


위의 코드는 stream-exam-input이라는 토픽에 "A"로 시작하는 데이터가 들어오면 stream-exam-output1, "B"로 시작하는 데이터가 들어오면 stream-exam-output2, 그 밖의 데이터는 stream-exam-output3으로 보내는 간단한 branch 예제이다. branch에는 적절하게 Predicate를 리스트 형태로 작성한다. 데이터의 값에 따라 다른 처리를 해야할때에는 branch를 사용하면 각각 다른 처리를 하는 토픽으로 메시지를 내보낼 수 있을 것 같다.


  • filter - 적절한 Predicate을 인자로 받아 메시지를 필터링한다.(KStream 반환)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
 * filter
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam2 {
 
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        source.filter( (key,value)->value.length()>3 ).to("stream-exam-output");;
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


위의 예제는 토픽으로 메시지를 입력받아 value 값의 길이가 3보다 크다면 stream-exam-output 토픽으로 메시지를 내보내는 예제이다.


  • filterNot - filter와 반대되는 개념이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
 * filterNot
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam3 {
 
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        source.filterNot( (key,value)->value.length()>3 ).to("stream-exam-output");;
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs

fliter와는 반대되는 개념이다. 크게 설명할 부분을 없을 것같다.

  • flatMap - 하나의 레코드를 이용해 0개 혹은 하나 이상의 레코드를 생성한다. flatMap은 key,value가 바뀔 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
 * flatMap
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam4 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        source.flatMap(
            new KeyValueMapper() {
                @Override
                public Object apply(Object key, Object value) {
                    List<KeyValue<Object, Object>> list = new LinkedList<>();
                    String keyStr = (String)key;
                    String valueStr = (String)value;
                    list.add(KeyValue.pair(valueStr.toUpperCase(),value));
                    list.add(KeyValue.pair(valueStr.toLowerCase(),value));
                    return list;
                }
            }
        ).to("stream-exam-output");;
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs

위의 예제는 메시지를 받아서 해당 메시지의 값을 대문자,소문자를 키로하여 2개의 레코드를 만드는 예제이다.

  • flatMapValues - 원본 레코드의 키를 유지하면서 하나의 레코드를 가져와서 0개 혹은 하나 이상의 레코드를 생성한다. value의 값과 값의 타입이 바뀔 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
 * flatMapValue
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam5 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        source.flatMapValues(value->Arrays.asList(value.split(" "))).to("stream-exam-output");
//        source.flatMapValues(
//                new ValueMapper() {
//                    @Override
//                    public Object apply(Object value) {
//                        String valueStr = (String)value;
//                        return Arrays.asList(valueStr.split(" "));
//                    }
//                }
//        );
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs

위의 예제는 메시지의 값에 공백을 포함하고 있으면 해당 메시지를 공백기준으로 split하여 list로 반환한다. 그럼 해당 List는 flat되어 여러개의 String Stream으로 변환되어 stream-exam-output 토픽으로 메시지가 전달된다.

  • GroupByKey - 기존 키로 레코드를 그룹핑한다. 스트림이나 테이블을 집계하고 후속작업을 위해 키로 그룹화하여 파티션하는 것을 보장한다. 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
 * GroupByKey
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam6 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        KGroupedStream<StringString> groupedStream = source.flatMap(
            new KeyValueMapper() {
                @Override
                public Object apply(Object key, Object value) {
                    List<KeyValue<Object, Object>> list = new LinkedList<>();
                    String keyStr = (String)key;
                    String valueStr = (String)value;
                    list.add(KeyValue.pair(value,valueStr.toUpperCase()));
                    list.add(KeyValue.pair(value,valueStr.toLowerCase()));
                    return list;
                }
            }
        ).groupByKey();
        
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


스트림의 기존 키값을 이용하여 그룹핑한다.


GroupBy - 새로운 키로 그룹핑한다. 테이블을 그룹핑할 때는 새로운 값과 값 유형을 지정할 수도 있다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
 * GroupBy
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam7 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        KGroupedStream<StringString> groupedStream = source.groupBy(
                (key,value)->value, Serialized.with(Serdes.String(),Serdes.String())
        );
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


위의 예제와 같이 기존 스트림에서 받은 키값을 그대로 사용하는 것이 아니라 변경하여 사용할 수 있다.(예제는 value를 키로 사용한다)


  • map - 하나의 레코드를 가져와서 다른 하나의 레코드를 생성한다. 타입을 포함하여 키와 값을 수정할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
 * map
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam8 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        source.map(
            (key,value)->KeyValue.pair(value, key)
        );
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


메시지를 받아서 키와 값을 바꿔주는 map 예제이다.


  • mapValues - 하나의 레코드를 받아서 값을 바꿔준다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
 * mapValues
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam9 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        source.mapValues(
            (value)->value+"_map"
        ).to("stream-exam-output");
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


메시지를 받아서 값에 "_map"이라는 문자열을 추가하여 수정하였다.


  • merge - 두 스트림의 레코드를 하나의 스트림으로 merge한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
 * merge
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam10 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        source.foreach((key,value)->log.info("key,value ={}",key+" "+value));
        KStream<StringString>[] streamArr = source.branch(
            (key,value) -> value.startsWith("A"),
            (key,value) -> value.startsWith("B"),
            (key,value) -> true
        );
        
        KStream<StringString> merge1 = streamArr[0].merge(streamArr[1]);
        merge1.merge(streamArr[2]).to("stream-exam-output");
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs



  • selectKey - 각 레코드에 새키를 할당한다. 마치 map을 이용하여 key값을 바꾸는 것과 동일하다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
 * selectKey
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam11 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
 
        source.selectKey(
  //리턴되는 값이 키가된다.
            (key,value)->value.charAt(0)+""
        ).to("stream-exam-output");
            
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


  • toStream - KTable을 스트림으로 가져온다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Slf4j
public class WordCounter {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
      
        final StreamsBuilder builder = new StreamsBuilder();
        
        NoriAnalyzer analyzer = new NoriAnalyzer();
      
        KStream<StringString> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(analyzer.analyzeForString(value).split(" ")))
                //return하는 value가 키가 된다. 리턴값으로 그룹핑된 카프카스트림 객체를 리턴한다. KGroupedStream
                .groupBy(new KeyValueMapper<StringStringString>() {
                  @Override
                  public String apply(String key, String value) {
                    log.info("key = {},value = {}",key,value);
                    return value;
                  }
                })
                //count()를 호출하여 해당 키값으로 몇개의 요소가 있는지 체크한다. 그리고 해당 데이터를 스토어에(KeyValueStore<Bytes,byte[]> counts-store) 담고
                //변경이 있는 KTable에 대해서만 결과값을 리턴한다.
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
                .toStream() //KTable -> Stream
                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
      
        final Topology topology = builder.build();
        System.out.println(topology.describe());
      
        final KafkaStreams streams = new KafkaStreams(topology, props);
      
        try {
          streams.start();
        } catch (Throwable e) {
          System.exit(1);
        }
    }
}
 
cs


지금까지는 메시지의 상태가 없는 StreamDSL이었다. 다음으로 알아볼 DSL은 이전 메시지의 상태를 참조하는 상태기반 스트림이다. 아마 실시간으로 데이터 스트림을 처리할때 상태가 필요없는 처리도 있겠지만, 오늘하루 사용자가 많이 문의한 단어등 단어의 빈도수를 계산해야하는 스트림이 있다고 해보자. 그렇다면 지금 이순간 전까지 해당 단어는 몇번이 누적되었는지를 알아야 이시간 이후로 들어오는 같은 단어도 이전 빈도수에 누적하여 통계를 낼 것이다. 이렇게 상태에 기반하여 처리해야하는 스트림을 처리하는 것이 상태기반 스트림이다.



다음 포스팅에서 다루어볼 내용은 Stateful transformations이다. 레퍼런스의 양이 굉장히 많은 부분으로 조금 정리한 후 포스팅하려한다.


posted by 여성게
:
Middleware/Kafka&RabbitMQ 2019. 3. 30. 00:45

kafka is a distributed streaming platform

이번 포스팅은 Spring Cloud Stream 환경에서의 kafka Streams API입니다. 물론 이전 포스팅들에서 자바코드로 카프카 스트림즈를 다루어봤지만 이번에는 스프링 클라우드 스트림즈 환경에서 진행합니다. 카프카 스트림즈에 대한 설명은 이전 포스팅에서 진행하였기에 개념적인 설명은 하지 않고 코드레벨로 다루어보겠습니다. 혹시나 카프카 스트림즈를 모르시는 분이 있으시다면 아래 링크를 참조하시길 바랍니다.

지금부터 진행될 예제 및 설명은 모두 Spring Cloud Stream Reference를 참조하였습니다. 번역 중 오역이 있을 수 있습니다.

▶︎▶︎▶︎2019/03/26 - [Kafka&RabbitMQ] - Kafka - Kafka Streams API(카프카 스트림즈)

 

Kafka - Kafka Streams API(카프카 스트림즈)

Kafka - Kafka Streams API(카프카 스트림즈) 카프카는 대규모 메시지를 저장하고 빠르게 처리하기 위해 만들어진 제품이다. 초기 사용 목적과는 다른 뛰어난 성능에 일련의 연속된 메시지인 스트림을 처리하는..

coding-start.tistory.com

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
<?xml version="1.0" encoding="UTF-8"?>
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>kafka-streams-exam</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-streams-exam</name>
    <description>Demo project for Spring Boot</description>
 
    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR1</spring-cloud.version>
    </properties>
 
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
 
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
 
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
 
</project>
 
http://colorscripter.com/info#e" target="_blank" style="color:#e5e5e5; text-decoration:none">Colored by Color Scripter
http://colorscripter.com/info#e" target="_blank" style="text-decoration:none; color:white">cs

의존성을 추가해줍니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@SpringBootApplication
@EnableBinding(KStreamProcessor.class)
public class WordCountProcessorApplication {
 
    @StreamListener("input")
    @SendTo("output")
    public KStream<?, WordCount> process(KStream<?, String> input) {
        return input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("WordCounts-multi"))
                .toStream()
                .map((key, value) -> new KeyValue<>(nullnew WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
    }
 
    public static void main(String[] args) {
        SpringApplication.run(WordCountProcessorApplication.class, args);
    }
}
cs

해당 코드를 레퍼런스에서 소개하는 카프카 스트림즈의 예제코드이다 토픽에서 메시지를 받아서 적절하게 데이터를 가공하여 특정 토픽으로 데이터를 내보내고 있다 내부 로직은 마치 Java1.8의 Stream와 비슷한 코드같이 보인다 메소드를 보면 매개변수와 반환타입이 모두 KStream객체다 나가고 들어노는 객체의 타입은 하나로 고정되어있으므로 비지니스로직에 조금이라도 더 집중할 수 있을 것 같다.

 

Configuration Option

Kafka Streams Properties는 모두 spring.cloud.stream.kafka.streams.binder 접두어가 붙는다. 

 

  • brokers - broker URL
  • zkNodes - zookeeper URL
  • serdeError - 역 직렬화 오류 처리 유형. logAndContinue,logAndFail,sendToDlq 
  • applicationId - 애플리케이션에서 사용하는 카프카 스트림 아이디값. 모든 카프카 스트림의 아이디값은 유일해야한다.

Producer Properties

  • keySerde - key serde, 키의 직렬화 옵션(default value = none)
  • valueSerde -  value serde, 값의 직렬화 옵션 (default value = none)
  • userNativeEncoding - native 인코딩 활성화 플래그(default value = false)

Consumer Properties

  • keySerde - key serde, 키의 역직렬화 옵션(default value = none)
  • valueSerde - value serde, 값의 역직렬화 옵션(default value = none)
  • materializedAs - KTable 타입을 사용할 때 상태저장소를 구체화한다.(default value = none)
  • useNativeDecoding - native 디코드 활성화 플래그(default value = false)
  • dlqName - DLQ 토픽 이름(default value = none)

기타 다른 설정들은 레퍼런스 참고하시길 바랍니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
 * in,out channel defined
 * 즉, 카프카 토픽에 메시지를 쓰는 발신 채널과
 * 카프카 토픽에서 메시지를 읽어오는 수신 채널을 정의해주는 것이다.
 * 꼭 @Input & @Output 어노테이션을 하나씩 넣을 필요는 없다.
 * 필요한 채널수만큼 정의가능하다.
 * 
 * 런타임동안에는 스프링이 구현체를 제공해준다.
 * @author yun-yeoseong
 *
 */
public interface ExamProcessor {
    String INPUT = "exam-input";
    String OUTPUT = "exam-output";
    String OUTPUT2 = "exam-output2";
    
    /**
     * @Input 어노테이션 설명
     * 메시지 시스템에서 메시지를 읽어오는 입력채널 
     * 매겨변수로 채널이름을 넣을 수 있다. 넣지 않으면 기본으로
     * 메소드 이름으로 들어간다.
     * @return
     */
    @Input(INPUT)
    SubscribableChannel inboundChannel();
    
    @Input("streams-input")
    KStream<?, String> inboundChannel2();
    
    @Input("streams-input2")
    KStream<?, String> inboundChannel3();
    /**
     * @Output 어노테이션 설명
     * 메시지 시스템으로 메시지를 보내는 출력채널
     * 매겨변수로 채널이름을 넣을 수 있다. 넣지 않으면 기본으로
     * 메소드 이름으로 들어간다.
     * @return
     */
    @Output(OUTPUT)
    MessageChannel outboundChannel();
    
    @Output(OUTPUT2)
    MessageChannel outboundChannel2();
    
    @Output("streams-output")
    KStream<?, String> outboundChannel3();
    
}
 
cs

카프카 스트림즈를 스프링 클라우드 스트림에서 사용하려면 input&output을 KStream을 반환타입으로 채널을 정의한다. 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Slf4j
@EnableBinding(ExamProcessor.class)
@SpringBootApplication
public class KafkaStreamsExamApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamsExamApplication.class, args);
    }
    
    @StreamListener(target= ExamProcessor.INPUT,condition="headers['producerName']=='yeoseong'")
    public void receive(@Payload Exam exam) {
        log.info("Only Header value yeoseong = {}",exam.toString());
    }
    
    @StreamListener("streams-input")
    @SendTo("streams-output")
    public KStream<?, String> streams(KStream<?, String> input){
        return input.flatMapValues(new ValueMapper() {
            @Override
            public Object apply(Object value) {
                // TODO Auto-generated method stub
                String valueStr = (String)value;
                System.out.println(valueStr);
                return Arrays.asList(valueStr.split(" "));
            }
        });
    }
    
    @StreamListener("streams-input2")
    public void streams2(KStream<?, String> input) {
        System.out.println("streams2");
        input.foreach( (key,value) -> System.out.println(value));
    }
}
 
cs

위에서 보이듯이 sink processor라면 그 밑으로 더 이상 processor가 붙지 않으므로 void 반환타입으로 데이터를 입력받아 적절한 처리를 하면된다. 하지만 밑으로 스트림이 더 붙는 스트림들은 반환타입으로 KStream을 반환해야한다. 지금까지 간단하게 Kafka Streams API를 Spring Cloud Stream에서 사용해보았다. kafka streams DSL의 자세한 문법은 곧 포스팅할 카프카 스트림즈 API에 대한 글을 참고하시거나 카프카 레퍼런스를 참고하시면 될듯합니다. 

 

posted by 여성게
:
Middleware/Kafka&RabbitMQ 2019. 3. 28. 16:41

Kafka - Spring cloud stream kafka(스프링 클라우드 스트림 카프카)



이전 포스팅까지는 카프카의 아키텍쳐, 클러스터 구성방법, 자바로 이용하는 프로듀서,컨슈머 등의 글을 작성하였다. 이번 포스팅은 이전까지 작성된 지식을 바탕으로 메시징 시스템을 추상화한 구현체인 Spring Cloud Stream을 이용하여 카프카를 사용하는 글을 작성하려고 한다. 혹시라도 카프카에 대해 아직 잘모르는 사람들이 이 글을 본다면 이전 포스팅을 한번 참고하고 와도 좋을 것같다.(이번에 작성하는 포스팅은 Spring Cloud stream 2.0 레퍼런스 기준으로 작성하였다.)

그리고 이번 포스팅에서 진행하는 모든 예제는 카프카를 미들웨어로 사용하는 예제이고, 카프카는 클러스터를 구성하였다.


▶︎▶︎▶︎Spring Cloud Stream v2.0 Reference

▶︎▶︎▶︎2019/03/12 - [Kafka&RabbitMQ] - Kafka - Kafka(카프카)의 동작 방식과 원리

▶︎▶︎▶︎2019/03/13 - [Kafka&RabbitMQ] - Kafka - Kafka(카프카) cluster(클러스터) 구성 및 간단한 CLI사용법

▶︎▶︎▶︎2019/03/16 - [Kafka&RabbitMQ] - Kafka - Kafka Producer(카프카 프로듀서) In Java&CLI

▶︎▶︎▶︎2019/03/24 - [Kafka&RabbitMQ] - Kafka - Kafka Consumer(카프카 컨슈머) Java&CLI

▶︎▶︎▶︎2019/03/26 - [Kafka&RabbitMQ] - Kafka - Kafka Streams API(카프카 스트림즈)


스프링 클라우드 스트림을 이용하면 RabbitMQ,Kafka와 같은 미들웨어 메시지 시스템과는 종속적이지 않게 추상화된 방식으로 메시지 시스템을 이용할 수 있다.(support RabbitMQ,Kafka) 아래 그림은 스프링 클라우드 스트림의 애플리케이션 모델이다.

스프링 클라우드 스트림 애플리케이션과 메시지 미들웨어 시스템은 직접 붙지는 않는다. 중간에 스프링 클라우드 스트림이 제공하는 바인더 구현체를 중간에 두고 통신을 하기 때문에 애플리케이션에서는 미들웨어 독립적으로 추상화된 방식으로 개발 진행이 가능하다. 그리고 애플리케이션과 바인더는 위의 그림과 같이 inputs, outputs 채널과 통신을 하게 된다.

  • Binder : 외부 메시징 시스템과의 통합을 담당하는 구성 요소입니다.
  • Binding(input/output) : 외부 메시징 시스템과 응용 프로그램 간의 브리지 (대상 바인더에서 생성 한 메시지 생성자  소비자 ).
  • Middleware : RabbitMQ, Kafka와 같은 메시지 시스템.

바인더 같은 경우는 스프링이 설정에서 읽어 미들웨어에 해당하는 바인더를 구현체로 제공해준다. 물론 RabbitMQ, Kafka를 동시에 사용 가능하다. 그리고 바인딩 같은 경우는 기본으로 Processor(input,output),Source(output),Sink(input)라는 바인딩을 인터페이스로 제공한다. 이러한 스프링 클라우드 스트림을 이용하여 작성한 완벽히 동작하는 코드의 예제이다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@SpringBootApplication
@EnableBinding(Processor.class)
public class MyApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }
 
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public String handle(String value) {
        System.out.println("Received: " + value);
        return value.toUpperCase();
    }
}
cs


이 코드는 Processor라는 바인딩 채널을 사용하고, handle이라는 메소드에서 Processor의 input 채널에서 메시지를 받아서 값을 한번 출력하고 그대로 output 채널로 메시지를 보내는 코드이다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface Sink {
 
  String INPUT = "input";
 
  @Input(Sink.INPUT)
  SubscribableChannel input();
 
}
public interface Source {
 
  String OUTPUT = "output";
 
  @Output(Source.OUTPUT)
  MessageChannel output();
 
}
 
public interface Processor extends Source, Sink {}
cs


스프링 클라우드 스트림에서 기본적으로 제공하는 바인딩 채널이다. 만약 이 채널들 이외에 채널을 정의하고 싶다면 위와 같이 인터페이스로 만들어주고, @EnableBinding에 매개변수로 넣어주면된다.(매개변수는 여러개의 인터페이스를 받을 수 있다.)


1
2
3
4
5
6
7
8
9
10
11
public interface Barista {
 
    @Input
    SubscribableChannel orders();
 
    @Output
    MessageChannel hotDrinks();
 
    @Output
    MessageChannel coldDrinks();
}
cs

또한 채널 바인딩 어노테이션(@Input,@Output) 매개변수로 채널이름을 넣어줄 수 있다.

1
@EnableBinding(value = { Orders.class, Payment.class })
cs


스프링 클라우드 스트림에서 바인딩 가능한 채널타입은 MessageChannel(inbound)와 SubscribableChannel(outbound) 두개이다.

1
2
3
4
5
6
public interface PolledBarista {
 
    @Input
    PollableMessageSource orders();
    . . .
}
cs


지금까지는 이벤트 기반 메시지이지만 위처럼 Pollable한 채널을 바인딩 할 수 있다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Autowire
private Source source
 
public void sayHello(String name) {
    source.output().send(MessageBuilder.withPayload(name).build());
}
 
@Autowire
private MessageChannel output;
 
public void sayHello(String name) {
    output.send(MessageBuilder.withPayload(name).build());
}
 
@Autowire
@Qualifier("myChannel")
private MessageChannel output;
cs


정의한 채널에 대한 @Autowired하여 직접 사용도 가능하다. 그리고 @Qualifier 어노테이션을 이용해 다중 채널이 정의되어 있을 경우 특정한 채널을 주입받을 수 있다.


@StreamListener 사용

스프링 클라우드 스트림은 다른 org.springframework.messaging의 어노테이션도 같이 사용가능하다.(@Payload,@Headers,@Header)


1
2
3
4
5
6
7
8
9
10
11
@EnableBinding(Sink.class)
public class VoteHandler {
 
  @Autowired
  VotingService votingService;
 
  @StreamListener(Sink.INPUT)
  public void handle(Vote vote) {
    votingService.record(vote);
  }
}
cs


1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
@Slf4j
public class KafkaListener {
    
    @StreamListener(ExamProcessor.INPUT)
    @SendTo(ExamProcessor.OUTPUT2)
    public Exam listenMessage(@Payload Exam payload,@Header("contentType"String header) {
        log.info("input message = {} = {}",payload.toString(),header);
        return payload;
    }
}
 
=>결과 : input message = Exam(id=id_2, describe=test message != application/json
 
cs


또한 @SendTo 어노테이션을 추가로 붙여서 메시지를 수신하고 어떠한 처리를 한 후에 메시지를 출력채널로 내보낼 수 있다.


1
2
3
4
5
6
7
8
9
10
11
12
@EnableBinding(Processor.class)
public class TransformProcessor {
 
  @Autowired
  VotingService votingService;
 
  @StreamListener(Processor.INPUT)
  @SendTo(Processor.OUTPUT)
  public VoteResult handle(Vote vote) {
    return votingService.record(vote);
  }
}
cs


바로 위의 코드는 즉, 인바운드 채널에서 메시지를 받아서 그대로 해당 데이터를 리턴하여 아웃바운드 채널에 내보내는 예제이다. 실제로는 메소드 내에 비지니스 로직이 들어가 적절히 데이터 조작이 있을 수 있다.


@StreamListener for Content-based routing

조건별로 @StreamListener 주석처리가 된 인입채널 메소드로 메시지를 유입시킬 수 있다. 하지만 이 기능에는 밑에와 같은 조건을 충족해야한다.


  • 반환값이 있으면 안된다.
  • 개별 메시지 처리 메소드여야한다.

조건은 어노테이션 내의 condition 인수에 SpEL 표현식에 의해 지정된다. 그리고 조건과 일치하는 모든 핸들러는 동일한 스레드에서 호출되며 핸들러에 대한 호출이 발생하는 순서는 가정할 수 없다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class TestPojoWithAnnotatedArguments {
 
    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bogey'")
    public void receiveBogey(@Payload BogeyPojo bogeyPojo) {
       // handle the message
    }
 
    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bacall'")
    public void receiveBacall(@Payload BacallPojo bacallPojo) {
       // handle the message
    }
}
cs


위에서 얘기한 것과 같이 조건별로 메시지를 분기하는 경우 해당 채널로 @StreamListener된 메소드들은 모두 반환값이 void여야한다.(Sink.INPUT) 만약 위에서 위는 void, 아래에는 반환값이 있는 메소드로 선언한다면 예외가 발생한다.



Error Handler

Spring Cloud Stream은 오류 처리를 유연하게 처리하는 메커니즘을 제공한다. 오류 처리에는 크게 두가지가 있다.

  • application : 사용자 정의 오류처리이며, 애플리케이션 내에서 오류 처리를 진행한다.
  • system : 오류 처리가 기본 메시징 미들웨어의 기능에 따라 달라진다.


Application Error Handler


애플리케이션 레벨 오류 처리에는 두 가지 유형이 있다. 오류는 각 바인딩 subscription에서 처리되거나 전역 오류 핸들러가 모든 바인딩 subscription의 오류를 처리한다. 각 input 바인딩에 대해, Spring Cloud Stream은 <destinationName>.<groupName>.errors 설정으로 전용 오류 채널을 생성한다.


1
spring.cloud.stream.bindings.input.group=myGroup
cs


컨슈머 그룹을 설정한다.(만약 그룹명을 지정하지 않았을 경우 익명으로 그룹이 생성된다.)


1
2
3
4
5
6
7
8
9
@StreamListener(Sink.INPUT) // destination name 'input.myGroup'
public void handle(Person value) {
    throw new RuntimeException("BOOM!");
}
//만약 Sink.INPUT의 input 채널이름이 input이고, 해당 채널의 consumer group이 myGroup이라면
//해당 채널 단독의 에러 처리기의 inputChannel 매개변수의 값은 아래와 같다.("input.myGroup.errors")
@ServiceActivator(inputChannel = "input.myGroup.errors"//channel name 'input.myGroup.errors'
public void error(Message<?> message) {
    System.out.println("Handling ERROR: " + message);
}
cs


위와 같은 코드를 작성하면 Sink.INPUT.myGroup 채널의 전용 에러채널이 생기는 것이다. 하지만 이렇게 채널별이 아닌 전역 에러 처리기를 작성하려면 아래와 같은 코드를 작성하면 된다.


1
2
3
4
@StreamListener("errorChannel")
public void error2(Message<?> message) {
    log.error("Global Error Handling !");
}
cs


System Error Handling


System 레벨의 오류 처리는 오류가 메시징 시스템에 다시 전달되는 것을 의미하며, 모든 메시징 시스템이 동일하지는 않다. 즉, 바인더마다 기능이 다를 수 있다. 내부 오류 처리기가 구성되어 있지 않으면 오류가 바인더에 전파되고 바인더가 해당 오류를 메시징 시스템에 전파한다. 메시징 시스템의 기능에 따라 시스템은 메시지를 삭제하고 메시지를 다시 처리하거나 실패한 메시지를 DLQ로 보낼 수 있다. 해당 내용은 뒤에서 더 자세히 다룬다.


바인딩 정보 시각화


1
2
3
4
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
cs


의존성을 추가해준다.


1
management.endpoints.web.exposure.include=bindings
cs


application.propertis에 해당 설정을 추가해준 후에, host:port/actuator/bindings 를 호출하면 바인딩된 정보를 JSON형태로 받아볼 수 있다.



Binder Configuration Properties


Spring Auto configuration을 사용하지 않고 사용자 정의 바인더를 등록할때 다음 등록 정보들을 사용할 수 있다. 이 속성들은 모두 org.springframework.cloud.stream.config.BinderProperties 패키지에 정의되어있다. 그리고 해당 설정들은 모두 spring.cloud.stream.binders. 접두어가 붙는다. 밑에서 설명할 설정들은 모두 접두어가 생략된 형태이므로 실제로 작성할때는 접두어를 붙여준다.


  • type

바인더의 타입을 지정한다.(rabbit,kafka)

  • inheritEnvironment

애플리케이션 자체 환경을 상속하는지 여부

기본값 : true

  • environment

바인더 환경을 사용자가 직접 정의하는데 사용할 수 있는 설정이다.

기본값 : empty

  • defaultCandidate

바인더 구성이 기본 바인더로 간주되는지 또는 명시적으로 참조할 때만 사용할 수 있는지 여부 이 설정을 사용하면 기본 처리를 방해하지 않고 바인더 구성이 가능하다.

기본값 : true



Common Binding Properties


binding 설정입니다. 해당 설정은 spring.cloud.stream.bindings.<channelName> 접두어가 붙는다. 밑에서 설명할 설정은 모두 접두어가 생략된 설정이므로, 직접 애플리케이션을 설정할때에는 꼭 접두어를 붙여줘야한다.


  • destination

해당 채널을 메시지 시스템 토픽과 연결해주는 설정이다. 채널이 consumer로 바인딩되어 있다면, 여러 대상에 바인딩 될 수 있으며 대상 이름은 쉼표로 구분된 문자열이다.

  • group

컨슈머 그룹명설정이다. 해당 설정은 인바운드 바인딩에만 적용되는 설정이다.

기본값 : null

  • contentType

메시지의 컨텐츠 타입이다.

기본값 : null

  • binder

사용될 바인더를 설정한다.

기본값 : null



Consumer Properties

  • concurrency

인바운드 소비자의 동시성

기본값 : 1.

  • partitioned

컨슈머가 파티션된 프로듀서로부터 데이터를 수신하는지 여부입니다.

기본값 : false.

  • headerMode

none으로 설정하면 입력시 헤더 구문 분석을 사용하지 않습니다. 기본적으로 메시지 헤더를 지원하지 않으며 헤더 포함이 필요한 메시징 미들웨어에만 유효합니다. 이 옵션은 원시 헤더가 지원되지 않을 때 비 Spring Cloud Stream 애플리케이션에서 데이터를 사용할 때 유용합니다. 로 설정 headers하면 미들웨어의 기본 헤더 메커니즘을 사용합니다. 로 설정 embeddedHeaders하면 메시지 페이로드에 헤더가 포함됩니다.

기본값 : 바인더 구현에 따라 다릅니다.

  • maxAttempts

처리가 실패하면 다시 메시지를 처리하는 시도 횟수 (첫 번째 포함). 1로 설정하면 다시 메시지 처리를 시도하지 않는다.

기본값 : 3.

  • backOffInitialInterval

다시 시도 할 때 백 오프 초기 간격입니다.

기본값 : 1000.

  • backOffMaxInterval

최대 백 오프 간격.

기본값 : 10000.

  • backOffMultiplier

백 오프 승수입니다.

기본값 : 2.0.

  • instanceIndex

0보다 큰 값으로 설정하면이 소비자의 인스턴스 색인을 사용자 정의 할 수 있습니다 (다른 경우spring.cloud.stream.instanceIndex). 음수 값으로 설정하면 기본값은로 설정됩니다.(카프카의 경우 autoRebalence 설정이 false 일 경우)

기본값 : -1.

  • instanceCount

0보다 큰 값으로 설정하면이 소비자의 인스턴스 수를 사용자 정의 할 수 있습니다 (다른 경우 spring.cloud.stream.instanceCount). 음수 값으로 설정하면 기본값은로 설정됩니다.(카프카의 경우 autoRebalence 설정이 false 일 경우)

기본값 : -1.


Producer Properties

  • partitionKeyExpression

아웃 바운드 데이터를 분할하는 방법을 결정하는 SpEL 식입니다. set 또는 ifpartitionKeyExtractorClass가 설정되면이 채널의 아웃 바운드 데이터가 분할됩니다.partitionCount효과가 있으려면 1보다 큰 값으로 설정해야합니다. 

기본값 : null.

  • partitionKeyExtractorClass

PartitionKeyExtractorStrategy구현입니다. set 또는 if partitionKeyExpression가 설정되면이 채널의 아웃 바운드 데이터가 분할됩니다. partitionCount효과가 있으려면 1보다 큰 값으로 설정해야합니다. 

기본값 : null.

  • partitionSelectorClass

PartitionSelectorStrategy구현입니다.

기본값 : null.

  • partitionSelectorExpression

파티션 선택을 사용자 정의하기위한 SpEL 표현식. 

기본값 : null.

  • partitionCount

파티셔닝이 사용 가능한 경우 데이터의 대상 파티션 수입니다. 제작자가 분할 된 경우 1보다 큰 값으로 설정해야합니다. 카프카에서는 힌트로 해석됩니다. 이것보다 크고 대상 항목의 파티션 수가 대신 사용됩니다.

기본값 : 1.


Content-Type


Spring Cloud Stream은 contentType에 대해 세 가지 메커니즘을 제공한다.


  • Header 

contentType 자체를 헤더로 제공한다.

  • binding

spring.cloud.stream.bindings.<inputChannel>.content-type 설정으로 타입을 설정한다.

  • default

contentType이 명시적으로 설정되지 않은 경우 기본 application/json 타입으로 적용한다.


위의 순서대로 우선순위 적용이 된다.(Header>bindings>default) 만약 메소드 반환 타입이 Message 타입이면 해당 타입으로 메시지를 수신하고, 만약 일반 POJO로 반환타입이 정의되면 컨슈머에서 해당 타입으로 메시지를 수신한다.





Apache Kafka Binder


1
2
3
4
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
cs


의존성을 추가해준다.


Kafka Binder Properties

  • spring.cloud.stream.kafka.binder.brokers

Kafka 바인더가 연결될 브로커 목록이다.

기본값 : localhost

  • spring.cloud.stream.kafka.binder.defaultBrokerPort

brokers 설정에 포트 정보가 존재하지 않는다면 해당 호스트 리스트들이 사용할 포트를 지정해준다.

기본값 : 9092

  • spring.cloud.stream.kafka.binder.configuration

바인더로 작성된 모든 클라이언트에 전달 될 클라이언트 속성(프로듀서,컨슈머)의 키/값 맵이다. 이러한 속성은 프로듀서와 컨슈머 모두가 사용한다.

기본값 : empty map

  • spring.cloud.stream.kafka.binder.headers

바인더에 의해 전송되는 사용자 지정 헤더 목록이다.

기본값 : null

  • spring.cloud.stream.kafka.binder.healthTimeout

파티션 정보를 얻는 데 걸리는 시간(초). 

기본값 : 10

  • spring.cloud.stream.kafka.binder.requiredAcks

브로커에서 필요한 ack수이다.(해당 설명은 이전 포스팅에 설명되어있다.)

기본값 : 1

  • spring.cloud.stream.kafka.binder.minPartitionCount

autoCreateTopics,autoAddPartitions true를 설정했을 경우에만 적용되는 설정이다. 바인더가 데이터를 생성하거나 소비하는 주제에 대해 바인더가 구성하는 최소 파티션 수이다.

기본값 : 1

  • spring.cloud.stream.kafka.binder.replicationFactor

autoCreateTopics true를 설정했을 경우에 자동 생성된 토픽 복제요소 수이다.

기본값 : 1

  • spring.cloud.stream.kafka.binder.autoCreateTopics

true로 설정하면 토픽이 존재하지 않을 경우 자동으로 토픽을 만들어준다. 만약 false로 설정되어있다면 미리 토픽이 생성되어 있어야한다.

기본값 : true

  • spring.cloud.stream.kafka.binder.autoAddPartitions

true로 설정하면 바인더가 필요할 경우 파티션을 추가한다. 예를 들어 사용자의 메시지 유입이 증가하여 컨슈머를 추가하여 파티션수가 하나더 늘었다고 가정하자. 그러면 기존의 토픽의 파티션 수는 증설한 파티션의 총수보다 작을 것이고, 이 설정이 true라면 바인더가 자동으로 파티션수를 증가시켜준다.

기본값 : false



Kafka Consumer Properties


spring.cloud.stream.kafka.bindings.<inputChannel>.consumer 접두어가 붙는다.

  • autoRebalanceEnabled

파티션 밸런싱을 자동으로 처리해준다.

기본값 : true

  • autoCommitOffset

메시지가 처리되었을 경우, 오프셋을 자동으로 커밋할지를 설정한다.

기본값 : true

  • startOffset

새 그룹의 시작 오프셋이다. earliest, latest

기본값 : null(==eariest)

  • resetOffsets
consumer의 오프셋을 startOffset에서 제공한 값으로 재설정할지 여부
기본값 : false


Kafka Producer Properties

  • bufferSize

kafka 프로듀서가 전송하기 전에 일괄 처리하려는 데이터 크기이다.

기본값 : 16384

  • batchTimeout

프로듀서가 메시지를 보내기 전에 동일한 배치에 더 많은 메시지가 누적될 수 있도록 대기하는 시간. 예를 들어 버퍼사이즈가 꽉 차지 않았을 경우 얼마나 기다렸다고 메시지 처리할 것인가를 정하는 시간이다.

기본값 : 0


Partitioning with the Kafka Binder


순서가 중요한 메시지가 있을 경우가 있다. 이럴 경우에는 어떠한 키값을 같이 포함시켜 메시지를 발신함으로써 하나의 파티션에만 데이터를 보내 컨슈머쪽에서 데이터의 순서를 정확히 유지하여 데이터를 받아올 수 있다.


1
2
3
#partition-key-expression
spring.cloud.stream.bindings.exam-output.producer.partition-key-expression=headers['partitionKey']
spring.cloud.stream.bindings.exam-output.producer.partition-count=4
cs


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
    @Autowired
    private ExamProcessor processor;
    
    public void sendMessage(Exam exam,String partitionId) {
        log.info("Sending Message = {}",exam.toString());
        
        MessageChannel outputChannel = processor.outboundChannel();
        
        outputChannel.send(MessageBuilder
                                    .withPayload(exam)
                                    .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                                    .setHeader("partitionKey", partitionId)
                                    .build());
    }
 
    @StreamListener(ExamProcessor.INPUT)
    public void listenMessage(@Payload Exam payload,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int header) {
        log.info("input message = {} partition_id= {}",payload.toString(),header);
    }
cs


application.properties에 파티션키로 사용할 헤더의 키값을 설정하고, 프로듀서 쪽에서 키값을 포함시켜 데이터를 보낸 후에 컨슈머쪽에서 어떠한 파티션에서 데이터를 받았는지 확인해본다. 키값을 동일하게 유지한채 메시지를 보내면 하나의 파티션에서만 메시지를 받아온다. 그러나 파티션 키값을 적용하지 않고 메시지를 보내면 컨슈머가 받아오는 파티션 아이디는 계속해서 변경이 된다.


이번 포스팅은 내용이 조금 길어져서 여기까지만 작성하고 다음 포스팅에서 Spring Cloud Stream 기반에서 사용하는 kafka stream API에 대해 포스팅할 것이다. 이번 포스팅에서는 많은 설명들이 레퍼런스에 비해 빠져있다. 아는 내용은 작성하고 모르는 내용은 포함시킬경우 틀릴 가능성이 있기 때문에 레퍼런스와 비교해 내용에 차이가 있다. 혹시나 더 많은 설명이 필요하다면 직접 레퍼런스를 보면 될것 같다.

posted by 여성게
:
Middleware/Kafka&RabbitMQ 2019. 3. 26. 00:07

Kafka - Kafka Streams API(카프카 스트림즈)



카프카는 대규모 메시지를 저장하고 빠르게 처리하기 위해 만들어진 제품이다. 초기 사용 목적과는 다른 뛰어난 성능에 일련의 연속된 메시지인 스트림을 처리하는 데도 사용이 되기 시작했다. 이러한 스트림을 카프카는 Kafka Streams API를 통해 제공한다.


설명하기 앞서 우선 스트림 프로세싱과 배치 프로세싱의 차이점이란 무엇일까? 스트림 프로세싱(Stream Processing)은 데이터들이 지속적으로 유입되고 나가는 과정에서 이 데이터에 대한 일련의 처리 혹은 분석을 수행하는 것을 의미한다. 즉, 스트림 프로세싱은 실시간 분석(Real Time Analysis)이라고 불리기도 한다. 스트림 프로세싱과는 대비되는 개념으로 배치(Batch)처리 또는 정적 데이터(Data-at-rest)처리를 들 수 있다. 배치 및 정적 데이터 처리란 위의 스트림 프로세싱과는 다르게 데이터를 한번에 특정 시간에 처리한다라는 특징이 있다. 주로 사용자의 요청이 몰리지 않는 새벽 시간대에 많이 수행하기도 한다.(물론 무조건 그렇다고는 할 수 없다) 그렇지만 사실 뭐가 좋고 나쁨은 이야기 할 수 없다. 스트림과 배치의 서로의 장단점이 있기 때문이다. 하지만 요즘은 역시나 실시간 데이터 처리가 각광받고 있는 사실은 숨길 수 없다.


<특정 이벤트 발생시 바로바로 애플리케이션에서 데이터처리를 하고 있다.>


그렇다면 스트림 프로세싱의 장점은 무엇이 있을까? 우선은 애플리케이션이 이벤트에 즉각적으로 반응을 하기 때문에 이벤트 발생과 분석,조치에 있어 거의 지연시간이 발생하지 않는다. 또한 항상 최신의 데이터를 반영한다라는 특징이 있다. 그리고 데이터를 어디에 담아두고 처리하지 않기 때문에 대규모 공유 데이터베이스에 대한 요구를 줄일 수 있고, 인프라에 독립적인 수행이 가능하다.



상태 기반과 무상태 스트림 처리

스트림 프로세싱에는 상태 기반과 무상태 스트림 처리가 있다. 차이점이란 실시간 데이터 처리를 위하여 이전에 분석된 데이터의 결과가 필요한지이다. 이렇게 상태를 유지할 스트림 프로세싱은 이벤트를 처리하고 그 결과를 저장할 상태 저장소가 필요하다. 이와는 반대로 무상태 스트림 처리는 이전 스트림의 처리 결과와 관계없이 현재 데이터로만 처리를 한다.




카프카 스트림즈(Kafka Streams API)



카프카 스트림즈는 카프카에 저장된 데이터를 처리하고 분석하기 위해 개발된 클라이언트 라이브러리이다. 카프카 스트림즈는 이벤트 시간과 처리 시간을 분리해서 다루고 다양한 시간 간격 옵션을 지원하기에 실시간 분석을 간단하지만 효율적으로 진행할 수 있다. 우선 카프카 스트림즈에 대해 설명하기 전에 용어 정리를 할 필요가 있을 것같다.


용어 

설명 

스트림(Stream) 

스트림은 카프카 스트림즈 API를 사용해 생성된 토폴로지로, 끊임없이 전달되는 데이터 세트를 의미한다. 스트림에 기록되는 단위는 key-value 형태이다. 

스트림 처리 애플리케이션(Stream Processing Application) 

카프카 스트림 클라이언트를 사용하는 애플리케이션으로서, 하나 이상의 프로세서 토폴로지에서 처리되는 로직을 의미한다. 프로세서 토폴로지는 스트림 프로세서가 서로 연결된 그래프를 의미한다. 

스트림 프로세서(Stream Processor) 

프로세서 토폴로지를 이루는 하나의 노드를 말하여 여기서 노드들은 프로세서 형상에 의해 연결된 하나의 입력 스트림으로부터 데이터를 받아서 처리한 다음 다시 연결된 프로세서에 보내는 역할을 한다. 

 소스 프로세서(Source Processor)

위쪽으로 연결된 프로세서가 없는 프로세서를 말한다. 이 프로세서는 하나 이상의 카프카 토픽에서 데이터 레코드를 읽어서 아래쪽 프로세서에 전달한다. 

싱크 프로세서(Sink Processor) 

토폴로지 아래쪽에 프로세서 연결이 없는 프로세서를 뜻한다. 상위 프로세서로부터 받은 데이터 레코드를 카프카 특정 토픽에 저장한다. 



카프카 스트림즈 아키텍쳐

카프카 스트림즈에 들어오는 데이터는 카프카 토픽의 메시지이다. 각 스트림 파티션은 카프카의 토픽 파티션에 저장된 정렬된 메시지이고, 해당 메시지는 key-value형태이다. 또한 해당 메시지의 키를 통해 다음 스트림(카프카 토픽)으로 전달된다. 위의 그림에서 보듯 카프카 스트림즈는 입력 스트림의 파티션 개수만큼 태스크를 생성한다. 각 태스크에는 입력 스트림(카프카 토픽) 파티션들이 할당되고, 이것은 한번 정해지면 입력 토픽의 파티션에 변화가 생기지 않는 한 변하지 않는다. 마지 컨슈머 그룹 안의 컨슈머들이 각각 토픽 파티션을 점유하는 것과 비슷한 개념이다.



자바를 이용한 간단한 파이프 예제 프로그램

지금부터 진행할 예제는 이전 포스팅에서 구성했던 카프카 클러스터 환경에서 진행한다. 만약 클러스터 구성이 되어 있지않다면 밑의 링크를 참조해 구성하면 될것 같다.

스프링 부트 애플리케이션 하나를 생성한다. 그리고 필요한 라이브러리 의존성을 추가한다. 로그를 사용하기 위하여 롬복 라이브러리를 추가하였다. 혹시 이클립스 환경에서 롬복을 사용하는 방법을 알고 싶다면 밑의 링크를 참조하자.

▶︎▶︎▶︎2019/02/02 - [Java&Servlet] - Mac OS - Eclipse & Lombok(롬복 사용방법)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.4</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.0.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.0.1</version>
        </dependency>
cs


이번에 만들 프로그램은 간단히 한쪽 토픽에 입력된 값을 다른 쪽 토픽으로 옮기는 역할을 수행한다. 


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
 
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
 
import lombok.extern.slf4j.Slf4j;
 
@Slf4j
public class Pipe {
    
    public static void main(String[] args) {
        
        /*
         * 카프카 스트림 파이프 프로세스에 필요한 설정값
         * StreamsConfig의 자세한 설정값은
         * https://kafka.apache.org/10/documentation/#streamsconfigs 참고
         */
        Properties props = new Properties();
        //카프카 스트림즈 애플리케이션을 유일할게 구분할 아이디
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        //스트림즈 애플리케이션이 접근할 카프카 브로커정보
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        //데이터를 어떠한 형식으로 Read/Write할지를 설정(키/값의 데이터 타입을 지정) - 문자열
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        //데이터의 흐름으로 구성된 토폴로지를 정의할 빌더
        final StreamsBuilder builder = new StreamsBuilder();
        
        //streams-*input에서 streams-*output으로 데이터 흐름을 정의한다.
        /*
         * KStream<String, String> source = builder.stream("streams-plaintext-input");
           source.to("streams-pipe-output");
         */
        builder.stream("streams-plaintext-input").to("streams-pipe-output");
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        //만들어진 토폴로지 확인
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        try {
          streams.start();
          System.out.println("topology started");
      
        } catch (Throwable e) {
          System.exit(1);
        }
        System.exit(0);
    }
}
 
cs


우선 Properties 객체를 이용하여 카프카 스트림즈가 사용할 설정값을 입력한다. 우선 StreamsConfig.APPLICATION_ID_CONFIG는 카프카 클러스터 내에서 스트림즈 애플리케이션을 구분하기 위한 유일한 아이디 값을 설정하기 위한 값이다. 그리고 그 밑에 카프카 클러스터 구성이 된 인스턴스를 리스트로 작성한다. 또한 위에서 설명했듯이 카프카 스트림즈는 키-값 형태로 데이터의 흐름이 이루어지기에 해당 키-값을 어떠한 타입으로 직렬화 할 것인지 선택한다. 현재는 String 타입으로 지정하였다. 그 다음은 스트림 토폴로지를 구성해준다. 토폴로지는 StreamsBuilder 객체를 통하여 구성한다. 위의 소스는 "streams-plaintext-input"이라는 토픽에서 데이터를 읽어 "streams-pipe-output"이라는 토픽으로 데이터를 보내는 토폴로지를 구성하였다. 최종적으로 StreamsBuilder.build()를 호출하여 토폴로지 객체를 만든다. 위의 Topology.describe()를 호출하면 해당 토폴로지가 어떠한 구성으로 이루어졌는지 로그로 상세하게 볼 수 있다.


22:07:24.234 [main] INFO com.kafka.exam.streams.Pipe - Topology info = Topologies:

   Sub-topology: 0

    Source: KSTREAM-SOURCE-0000000000 (topics: [streams-plaintext-input])

      --> KSTREAM-SINK-0000000001

    Sink: KSTREAM-SINK-0000000001 (topic: streams-pipe-output)

      <-- KSTREAM-SOURCE-0000000000


그 다음 KafkaStreams 객체를 생성한다. 매개변수로 토폴로지와 설정 객체를 전달한다.  마지막으로 KafkaStreams.start();를 호출하여 카프카 스트림을 시작한다. 해당 예제에서는 명시하지 않았지만 스트림즈를 종료하려면 close() 해주는 코드도 삽입해야한다.



카프카가 제공하는 콘솔 프로듀서,컨슈머를 이용하여 위의 스트림즈 코드를 수행해보았다. input 토픽에서 스트림즈 애플리케이션이 메시지를 꺼내서 output 토픽으로 잘 전달한것을 볼 수 있다.(>"hi hello") 컨슈머 실행시 세팅한 설정은 직관적으로 해석 가능하므로 따로 설명하지는 않는다.


방금은 단순히 메시지를 받아 다른 쪽 토픽으로 전달만 하는 예제이지만 다음 해볼 예제는 중간에 데이터를 처리하여 output 토픽으로 보내는 행 분리 예제 프로그램이다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
 
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
 
import lombok.extern.slf4j.Slf4j;
 
@Slf4j
public class LineSplit {
    public static void main(String[] args) {
        /*
         * 카프카 스트림 파이프 프로세스에 필요한 설정값
         * StreamsConfig의 자세한 설정값은
         * https://kafka.apache.org/10/documentation/#streamsconfigs 참고
         */
        Properties props = new Properties();
        //카프카 스트림즈 애플리케이션을 유일할게 구분할 아이디
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
        //스트림즈 애플리케이션이 접근할 카프카 브로커정보
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        //데이터를 어떠한 형식으로 Read/Write할지를 설정(키/값의 데이터 타입을 지정) - 문자열
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        //데이터의 흐름으로 구성된 토폴로지를 정의할 빌더
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
            .to("streams-linesplit-output");
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        //만들어진 토폴로지 확인
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
 
        try {
          streams.start();
          System.out.println("topology started");
        } catch (Throwable e) {
          System.exit(1);
        }
        System.exit(0);
    }
}
 
cs


설정 값은 동일하다 하지만 토폴로지에 로직이 추가된 것을 볼 수 있다. 우선 "streams-plaintext-input" 토픽으로 들어온 메시지를 KStream객체로 만들어준다. 그리고 flatMapValues()를 이용하여 데이터를 공백 단위로 쪼개는 것을 볼 수 있다. 여기서 flatMapValues()를 쓴 이유는 여기서 전달되는 value는 하나의 스트링 객체인데 이것을 List<String>으로 변환을 하였다. 이것을 flatMapValues()를 이용해 리스트의 각 요소를 스트링으로 flat하는 메소드이다. 즉, 메시지를 받아서 flatMapValues()를 통해 새로운 데이터 값이 만들어 졌다. 그러면 .to()에 전달되는 데이터 스트림은 공백단위로 짤린 단어 스트링이 전달될 것이다. 이전 예제와는 다르게 단순 데이터 전달만이 아니라 전달 이전에 적절한 처리를 하나 추가 해준 것이다. 만약 키와 값 모두를 새롭게 만들어서 사용하고 싶다면 flatMap() 메소드를 이용하면 된다. 그 밖에 메소드들을 알고 싶다면 공식 홈페이지를 참고하길 바란다.


22:49:08.941 [main] INFO com.kafka.exam.streams.LineSplit - Topology info = Topologies:

   Sub-topology: 0

    Source: KSTREAM-SOURCE-0000000000 (topics: [streams-plaintext-input])

      --> KSTREAM-FLATMAPVALUES-0000000001

    Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])

      --> KSTREAM-SINK-0000000002

      <-- KSTREAM-SOURCE-0000000000

    Sink: KSTREAM-SINK-0000000002 (topic: streams-linesplit-output)

      <-- KSTREAM-FLATMAPVALUES-0000000001


토폴로지 정보를 보아도 중간에 Processor라는 이름으로 하나의 처리 프로세스가 추가된 것을 볼 수 있다.



hi hello 라는 메시지를 보냈는데 hi / hello 로 짤려서 출력된 것을 볼 수 있다. 


필자는 챗봇을 개발하고 있는데 카프카 스트림즈를 이용하여 간단히 사용자 질문 로그들을 분석하여 형태소 분리된 형태로 데이터를 저장할 수도 있을 것 같아서 간단히 예제로 짜보았다. NoriAnalyzer는 필자가 Nori 형태소 분석기를 이용한 간단히 형태소분석 유틸 클래스를 만든것이다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
 
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
 
import lombok.extern.slf4j.Slf4j;
import rnb.analyzer.nori.analyzer.NoriAnalyzer;
 
@Slf4j
public class LineSplit {
    public static void main(String[] args) {
        /*
         * 카프카 스트림 파이프 프로세스에 필요한 설정값
         * StreamsConfig의 자세한 설정값은
         * https://kafka.apache.org/10/documentation/#streamsconfigs 참고
         */
        Properties props = new Properties();
        //카프카 스트림즈 애플리케이션을 유일할게 구분할 아이디
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
        //스트림즈 애플리케이션이 접근할 카프카 브로커정보
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        //데이터를 어떠한 형식으로 Read/Write할지를 설정(키/값의 데이터 타입을 지정) - 문자열
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        //데이터의 흐름으로 구성된 토폴로지를 정의할 빌더
        final StreamsBuilder builder = new StreamsBuilder();
        //Nori 형태소 분석기를 이용한 유틸클래스
        NoriAnalyzer analyzer = new NoriAnalyzer();
        
        KStream<StringString> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(analyzer.analyzeForString(value).split(" ")))
            .to("streams-linesplit-output");
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        //만들어진 토폴로지 확인
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
 
        try {
          streams.start();
          System.out.println("topology started");
        } catch (Throwable e) {
          System.exit(1);
        }
    }
}
 
cs



이렇게 추후에는 형태소분리된 모든 단어가 아니라 특정 명사만 추출하여 사용자들의 질문중 가장 많이 포함된 명사들을 뽑아내 데이터를 분석할 수도 있을 것같다.


지금까지의 예제는 바로 무상태 스트림 프로세싱이다. 다음 예제는 이전 데이터 처리에 대한 상태를 참조하여 데이터를 처리하는 단어 빈도수 세기 프로그램 예제이다. 


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
 
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.state.KeyValueStore;
 
import lombok.extern.slf4j.Slf4j;
import rnb.analyzer.nori.analyzer.NoriAnalyzer;
 
@Slf4j
public class WordCounter {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
      
        final StreamsBuilder builder = new StreamsBuilder();
        
        NoriAnalyzer analyzer = new NoriAnalyzer();
      
        KStream<StringString> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(analyzer.analyzeForString(value).split(" ")))
                //return하는 value가 키가 된다.
                .groupBy(new KeyValueMapper<StringStringString>() {
                  @Override
                  public String apply(String key, String value) {
                    log.info("key = {},value = {}",key,value);
                    return value;
                  }
                })
                //count()를 호출하여 해당 키값으로 몇개의 요소가 있는지 체크한다. 그리고 해당 데이터를 스토어에(KeyValueStore<Bytes,byte[]> counts-store) 담고

  //변경이 있는 KTable에 대해서만 결과값을 리턴한다.

                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
                .toStream()
                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
      
        final Topology topology = builder.build();
        System.out.println(topology.describe());
      
        final KafkaStreams streams = new KafkaStreams(topology, props);
      
        try {
          streams.start();
        } catch (Throwable e) {
          System.exit(1);
        }
    }
}
 
cs


설정값은 동일하다. 조금 달라진 것은 데이터처리부이다. 우선은 입력받은 데이터를 형태소분리하여 토큰 단위로 스트림을 만든다. 그리고 groupBy를 이용하여 토큰값을 키로 하여 스트림을 그룹핑한다. 그 다음 count()를 통해 같은 키값으로 몇개의 요소가 있는 지를 KeyValueStore에 담고, 만약 키값스토어의 값이 변경이 있을 때만 다운스트림으로 내려준다.(key == null은 무시한다) 처음에는 해당 단어:빈도수가 출력이 되지만 이후에는 이 단어가 또 들어오지 않는다면 출력되지 않고 해당 단어가 들어오면 키값 저장소에 빈도수가 변경이 되므로 다운스트림으로 내려준다. 이말은 무엇이냐면 즉, 키값스토어가 이전 데이터 처리의 상태를 담고 있는 저장소가 되어 이후 데이터에서 참조되어 사용되는 것이다.(사실 더 자세한 것은 API문서를 봐야할 듯하다.)




일부 출력이 안된 것들은 토픽의 버퍼가 원하는 용량에 도달하지 못해 아직 출력하지 못한 데이터들이다. 이렇게 상태를 유지하여 스트림 프로세스를 구성할 수도 있다. 


지금까지 아주 간단하게 카프카 스트림즈 API에 대해 다루었다. 사실 실무에서 이용할 수 있을 만큼의 예제는 아니지만 카프카가 이렇게 실시간 데이터 스트림에도 이용될 수 있다는 것을 알았다면 추후에 API문서를 참조하여 카프카를 응용할 수 있을 것같다.

posted by 여성게
: