Middleware/Kafka&RabbitMQ 2019. 3. 30. 00:45

Kafka - Kafka Stream API(카프카 스트림즈) - 2



이전 카프카 스트림즈 포스팅에서는 간단하게 카프카 스트림즈 API를 다루어보았습니다. 이번 2번째 카프카 스트림즈 포스팅은 조금더 깊게 카프카 스트림즈에 대해 알아보려고 합니다.


Kafka Streams는 Kafka 프로듀서 및 컨슈머를 이용하여 들어오는 메시지를 즉각적으로 가공하여 또 다른 토픽으로 메시지를 내보낼 수 있습니다. 이러한 카프카 스트림즈를 사용하는 기업들을 소개하자면 New York Times는 카프카와 카프카 스트림즈를 이용하여 독자들을 위한 실시간 컨첸츠를 저장하고 배포합니다.그리고 라인 같은 경우는 서비스끼리 통신하기 위한 중앙 데이터 허브로 카프카를 사용합니다. 그리고 카프카 스트림즈를 이용하여 토픽을 데이터를 안정적으로 가공하고 필터링하여 컨슈머가 효율적으로 메시지를 컨슘할 수 있게 합니다. 이러한 많은 기업들이 안정적으로 실시간 데이터를 처리하기 위해 카프카와 카프카 스트림즈를 이용합니다.



카프카 메시징 계층은 데이터를 저장하고 전송하기 위해 데이터를 파티션 단위로 분할한다. 카프카 스트림즈 역시 파티션된 토픽을 기반으로 병렬 처리 모델의 논리 단위로 작업을 진행한다. 카프카 스트림즈는 키/값 형태의 스트림 데이터이므로 스트림 데이터의 키값으로 토픽내 특정 파티션으로 라우팅된다. 일반적으로 파티션 개수만큼 컨슈머 그룹안의 컨슈머들이 생성되듯이 카프카 스트림즈도 역시 파티션 개수만큼 태스크가 생성되어 병렬로 데이터 스트림을 처리할 수 있다. 또한 작업스레드 수를 조정할 수 있어 더욱 효율적인 병렬처리가 가능하다.


Required configuration parameters


  • application.id - 스트림 처리 응용 프로그램의 식별자 아이디이다. Kafka 클러스터 내에서 유일한 값이어야 한다.
  • bootstrap.servers - Kafka 인스턴스 호스트:포트 정보이다.


Optional configuration parameters(중요도 보통이상의 설정값)


  • cache.max.bytes.buffering - 모든 스레드에서 레코드 캐시에 사용할 최대 메모리 바이트 수입니다.(default 10485760 bytes)
  • client.id - 요청시 서버에 전달할 ID 문자열이다. 카프카 스트림즈 내부적으로 프로듀서,컨슈머에게 전달된다(default empty string)
  • default.deserialization.exception.handler - DeserializationExceptionHandler인터페이스를 구현하는 예외 처리 클래스이다.

(default  LogAndContinueExceptionHandler)

  • default.production.exception.handler - ProductionExceptionHandler인터페이스를 구현하는 예외 처리 클래스이다.

  (default DefaultProductionExceptionHandler)

  • key.serde - record key에 대한 직렬화/역직렬화 클래스이다. Serde 인터페이스를 구현한다.(default Serdes.ByteArray().getClass().getName() )
  • num.standby.replicas - 각 태스크의 대기 복제본 수이다.(default 0)
  • num.stream.threads - 스트림을 처리할 스레드 수이다.(default 1)
  • replication.factor - 복제본 수이다.(default 1)
  • retries - 처리 실패시 재시도 횟수(default 0)
  • retry.backoff.ms - 재시도 요청 간격 시간(default 100밀리세컨드)
  • state.dir - 상태 저장소 디렉토리 위치(default /tmp/kafka-streams)
  • value.serde - record value에 대한 직렬화/역직렬화 클래스이다. key.serde와 동일.


default.deserialization.exception.handler


기본 deserialization 예외 처리기를 사용하면 deserialize에 실패한 레코드의 예외를 관리할 수 있다. 예외 처리기는 throw된 예외에 따라 FAIL 또는 CONTINUE를 반환해야한다. FAIL은 스트림이 종료될 것이고, CONTINUE는 예외를 무시하고 계속해서 처리를 진행할 것이다. 내부적으로 제공하는 예외 핸들어가 있다.


  • LogAndContinueExceptionHandler - 예외가 발생하여도 계속해서 프로세스를 진행한다.
  • LogAndFailExceptionHandler - 예외가 발생하면 프로세스를 중지한다.

기본적으로 제공하는 핸들러 이외에도 사용자가 직접 정의하여 예외 핸들러를 구현할 수 있다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class SendToDeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {
    KafkaProducer<byte[], byte[]> dlqProducer;
    String dlqTopic;
 
    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
 
        log.warn("Exception caught during Deserialization, sending to the dead queue topic; " +
            "taskId: {}, topic: {}, partition: {}, offset: {}",
            context.taskId(), record.topic(), record.partition(), record.offset(),
            exception);
 
        dlqProducer.send(new ProducerRecord<>(dlqTopic, record.timestamp(), record.key(), record.value(), record.headers())).get();
 
        return DeserializationHandlerResponse.CONTINUE;
    }
 
    @Override
    public void configure(final Map<String, ?> configs) {
        dlqProducer = .. // get a producer from the configs map
        dlqTopic = .. // get the topic name from the configs map
    }
}
cs


위의 예제코드는 예외가 발생하면 DLQ 토픽에 메시지를 보낸 이후 계속해서 프로세스를 진행하는 예외 핸들러이다. 위에서 이야기하였듯이 반드시 FAIL,CONTINUE 둘중하나를 반환해야 한다.


  • default.production.exception.handler 


프로덕션 예외 처리기를 사용하여 너무 큰 메시지 데이터를 생성하는 등 브로커와 상호 작용하려고 할때 트리거되는 예외를 컨트롤 할 수 있습니다. 기본적으로 카프카는 DefaultProductionExceptionHandler를 제공한다. 이 예외 처리기도 역시 FAIL,CONTINUE등 하나를 반환하여 프로세스 진행여부를 결정해주어야 한다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;
 
public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
    public void configure(Map<String, Object> config) {}
 
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        } else {
            return ProductionExceptionHandlerResponse.FAIL;
        }
    }
}
 
Properties settings = new Properties();
 
// other various kafka streams settings, e.g. bootstrap servers, application id, etc
 
settings.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
             IgnoreRecordTooLargeHandler.class);
cs



Stream DSL


  • branch - 제공되는 predicate에 따라서 적절히 여러개의 KStream으로 분할한다.(KStream[] 반환)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
 * branch
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam {
 
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        source.foreach((key,value)->log.info("key,value ={}",key+" "+value));
        KStream<StringString>[] streamArr = source.branch(
            (key,value) -> value.startsWith("A"),
            (key,value) -> value.startsWith("B"),
            (key,value) -> true
        );
        
        streamArr[0].to("stream-exam-output1");
        streamArr[1].to("stream-exam-output2");
        streamArr[2].to("stream-exam-output3");
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


위의 코드는 stream-exam-input이라는 토픽에 "A"로 시작하는 데이터가 들어오면 stream-exam-output1, "B"로 시작하는 데이터가 들어오면 stream-exam-output2, 그 밖의 데이터는 stream-exam-output3으로 보내는 간단한 branch 예제이다. branch에는 적절하게 Predicate를 리스트 형태로 작성한다. 데이터의 값에 따라 다른 처리를 해야할때에는 branch를 사용하면 각각 다른 처리를 하는 토픽으로 메시지를 내보낼 수 있을 것 같다.


  • filter - 적절한 Predicate을 인자로 받아 메시지를 필터링한다.(KStream 반환)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
 * filter
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam2 {
 
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        source.filter( (key,value)->value.length()>3 ).to("stream-exam-output");;
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


위의 예제는 토픽으로 메시지를 입력받아 value 값의 길이가 3보다 크다면 stream-exam-output 토픽으로 메시지를 내보내는 예제이다.


  • filterNot - filter와 반대되는 개념이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
 * filterNot
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam3 {
 
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        source.filterNot( (key,value)->value.length()>3 ).to("stream-exam-output");;
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs

fliter와는 반대되는 개념이다. 크게 설명할 부분을 없을 것같다.

  • flatMap - 하나의 레코드를 이용해 0개 혹은 하나 이상의 레코드를 생성한다. flatMap은 key,value가 바뀔 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
 * flatMap
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam4 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        source.flatMap(
            new KeyValueMapper() {
                @Override
                public Object apply(Object key, Object value) {
                    List<KeyValue<Object, Object>> list = new LinkedList<>();
                    String keyStr = (String)key;
                    String valueStr = (String)value;
                    list.add(KeyValue.pair(valueStr.toUpperCase(),value));
                    list.add(KeyValue.pair(valueStr.toLowerCase(),value));
                    return list;
                }
            }
        ).to("stream-exam-output");;
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs

위의 예제는 메시지를 받아서 해당 메시지의 값을 대문자,소문자를 키로하여 2개의 레코드를 만드는 예제이다.

  • flatMapValues - 원본 레코드의 키를 유지하면서 하나의 레코드를 가져와서 0개 혹은 하나 이상의 레코드를 생성한다. value의 값과 값의 타입이 바뀔 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
 * flatMapValue
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam5 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        source.flatMapValues(value->Arrays.asList(value.split(" "))).to("stream-exam-output");
//        source.flatMapValues(
//                new ValueMapper() {
//                    @Override
//                    public Object apply(Object value) {
//                        String valueStr = (String)value;
//                        return Arrays.asList(valueStr.split(" "));
//                    }
//                }
//        );
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs

위의 예제는 메시지의 값에 공백을 포함하고 있으면 해당 메시지를 공백기준으로 split하여 list로 반환한다. 그럼 해당 List는 flat되어 여러개의 String Stream으로 변환되어 stream-exam-output 토픽으로 메시지가 전달된다.

  • GroupByKey - 기존 키로 레코드를 그룹핑한다. 스트림이나 테이블을 집계하고 후속작업을 위해 키로 그룹화하여 파티션하는 것을 보장한다. 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
 * GroupByKey
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam6 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        KGroupedStream<StringString> groupedStream = source.flatMap(
            new KeyValueMapper() {
                @Override
                public Object apply(Object key, Object value) {
                    List<KeyValue<Object, Object>> list = new LinkedList<>();
                    String keyStr = (String)key;
                    String valueStr = (String)value;
                    list.add(KeyValue.pair(value,valueStr.toUpperCase()));
                    list.add(KeyValue.pair(value,valueStr.toLowerCase()));
                    return list;
                }
            }
        ).groupByKey();
        
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


스트림의 기존 키값을 이용하여 그룹핑한다.


GroupBy - 새로운 키로 그룹핑한다. 테이블을 그룹핑할 때는 새로운 값과 값 유형을 지정할 수도 있다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
 * GroupBy
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam7 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        KGroupedStream<StringString> groupedStream = source.groupBy(
                (key,value)->value, Serialized.with(Serdes.String(),Serdes.String())
        );
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


위의 예제와 같이 기존 스트림에서 받은 키값을 그대로 사용하는 것이 아니라 변경하여 사용할 수 있다.(예제는 value를 키로 사용한다)


  • map - 하나의 레코드를 가져와서 다른 하나의 레코드를 생성한다. 타입을 포함하여 키와 값을 수정할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
 * map
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam8 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        source.map(
            (key,value)->KeyValue.pair(value, key)
        );
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


메시지를 받아서 키와 값을 바꿔주는 map 예제이다.


  • mapValues - 하나의 레코드를 받아서 값을 바꿔준다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
 * mapValues
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam9 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        source.mapValues(
            (value)->value+"_map"
        ).to("stream-exam-output");
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


메시지를 받아서 값에 "_map"이라는 문자열을 추가하여 수정하였다.


  • merge - 두 스트림의 레코드를 하나의 스트림으로 merge한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
 * merge
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam10 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        source.foreach((key,value)->log.info("key,value ={}",key+" "+value));
        KStream<StringString>[] streamArr = source.branch(
            (key,value) -> value.startsWith("A"),
            (key,value) -> value.startsWith("B"),
            (key,value) -> true
        );
        
        KStream<StringString> merge1 = streamArr[0].merge(streamArr[1]);
        merge1.merge(streamArr[2]).to("stream-exam-output");
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs



  • selectKey - 각 레코드에 새키를 할당한다. 마치 map을 이용하여 key값을 바꾸는 것과 동일하다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
 * selectKey
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam11 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
 
        source.selectKey(
  //리턴되는 값이 키가된다.
            (key,value)->value.charAt(0)+""
        ).to("stream-exam-output");
            
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


  • toStream - KTable을 스트림으로 가져온다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Slf4j
public class WordCounter {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
      
        final StreamsBuilder builder = new StreamsBuilder();
        
        NoriAnalyzer analyzer = new NoriAnalyzer();
      
        KStream<StringString> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(analyzer.analyzeForString(value).split(" ")))
                //return하는 value가 키가 된다. 리턴값으로 그룹핑된 카프카스트림 객체를 리턴한다. KGroupedStream
                .groupBy(new KeyValueMapper<StringStringString>() {
                  @Override
                  public String apply(String key, String value) {
                    log.info("key = {},value = {}",key,value);
                    return value;
                  }
                })
                //count()를 호출하여 해당 키값으로 몇개의 요소가 있는지 체크한다. 그리고 해당 데이터를 스토어에(KeyValueStore<Bytes,byte[]> counts-store) 담고
                //변경이 있는 KTable에 대해서만 결과값을 리턴한다.
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
                .toStream() //KTable -> Stream
                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
      
        final Topology topology = builder.build();
        System.out.println(topology.describe());
      
        final KafkaStreams streams = new KafkaStreams(topology, props);
      
        try {
          streams.start();
        } catch (Throwable e) {
          System.exit(1);
        }
    }
}
 
cs


지금까지는 메시지의 상태가 없는 StreamDSL이었다. 다음으로 알아볼 DSL은 이전 메시지의 상태를 참조하는 상태기반 스트림이다. 아마 실시간으로 데이터 스트림을 처리할때 상태가 필요없는 처리도 있겠지만, 오늘하루 사용자가 많이 문의한 단어등 단어의 빈도수를 계산해야하는 스트림이 있다고 해보자. 그렇다면 지금 이순간 전까지 해당 단어는 몇번이 누적되었는지를 알아야 이시간 이후로 들어오는 같은 단어도 이전 빈도수에 누적하여 통계를 낼 것이다. 이렇게 상태에 기반하여 처리해야하는 스트림을 처리하는 것이 상태기반 스트림이다.



다음 포스팅에서 다루어볼 내용은 Stateful transformations이다. 레퍼런스의 양이 굉장히 많은 부분으로 조금 정리한 후 포스팅하려한다.


posted by 여성게
:
Middleware/Kafka&RabbitMQ 2019. 3. 30. 00:45

kafka is a distributed streaming platform

이번 포스팅은 Spring Cloud Stream 환경에서의 kafka Streams API입니다. 물론 이전 포스팅들에서 자바코드로 카프카 스트림즈를 다루어봤지만 이번에는 스프링 클라우드 스트림즈 환경에서 진행합니다. 카프카 스트림즈에 대한 설명은 이전 포스팅에서 진행하였기에 개념적인 설명은 하지 않고 코드레벨로 다루어보겠습니다. 혹시나 카프카 스트림즈를 모르시는 분이 있으시다면 아래 링크를 참조하시길 바랍니다.

지금부터 진행될 예제 및 설명은 모두 Spring Cloud Stream Reference를 참조하였습니다. 번역 중 오역이 있을 수 있습니다.

▶︎▶︎▶︎2019/03/26 - [Kafka&RabbitMQ] - Kafka - Kafka Streams API(카프카 스트림즈)

 

Kafka - Kafka Streams API(카프카 스트림즈)

Kafka - Kafka Streams API(카프카 스트림즈) 카프카는 대규모 메시지를 저장하고 빠르게 처리하기 위해 만들어진 제품이다. 초기 사용 목적과는 다른 뛰어난 성능에 일련의 연속된 메시지인 스트림을 처리하는..

coding-start.tistory.com

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
<?xml version="1.0" encoding="UTF-8"?>
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>kafka-streams-exam</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-streams-exam</name>
    <description>Demo project for Spring Boot</description>
 
    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR1</spring-cloud.version>
    </properties>
 
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
 
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
 
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
 
</project>
 
http://colorscripter.com/info#e" target="_blank" style="color:#e5e5e5; text-decoration:none">Colored by Color Scripter
http://colorscripter.com/info#e" target="_blank" style="text-decoration:none; color:white">cs

의존성을 추가해줍니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@SpringBootApplication
@EnableBinding(KStreamProcessor.class)
public class WordCountProcessorApplication {
 
    @StreamListener("input")
    @SendTo("output")
    public KStream<?, WordCount> process(KStream<?, String> input) {
        return input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("WordCounts-multi"))
                .toStream()
                .map((key, value) -> new KeyValue<>(nullnew WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
    }
 
    public static void main(String[] args) {
        SpringApplication.run(WordCountProcessorApplication.class, args);
    }
}
cs

해당 코드를 레퍼런스에서 소개하는 카프카 스트림즈의 예제코드이다 토픽에서 메시지를 받아서 적절하게 데이터를 가공하여 특정 토픽으로 데이터를 내보내고 있다 내부 로직은 마치 Java1.8의 Stream와 비슷한 코드같이 보인다 메소드를 보면 매개변수와 반환타입이 모두 KStream객체다 나가고 들어노는 객체의 타입은 하나로 고정되어있으므로 비지니스로직에 조금이라도 더 집중할 수 있을 것 같다.

 

Configuration Option

Kafka Streams Properties는 모두 spring.cloud.stream.kafka.streams.binder 접두어가 붙는다. 

 

  • brokers - broker URL
  • zkNodes - zookeeper URL
  • serdeError - 역 직렬화 오류 처리 유형. logAndContinue,logAndFail,sendToDlq 
  • applicationId - 애플리케이션에서 사용하는 카프카 스트림 아이디값. 모든 카프카 스트림의 아이디값은 유일해야한다.

Producer Properties

  • keySerde - key serde, 키의 직렬화 옵션(default value = none)
  • valueSerde -  value serde, 값의 직렬화 옵션 (default value = none)
  • userNativeEncoding - native 인코딩 활성화 플래그(default value = false)

Consumer Properties

  • keySerde - key serde, 키의 역직렬화 옵션(default value = none)
  • valueSerde - value serde, 값의 역직렬화 옵션(default value = none)
  • materializedAs - KTable 타입을 사용할 때 상태저장소를 구체화한다.(default value = none)
  • useNativeDecoding - native 디코드 활성화 플래그(default value = false)
  • dlqName - DLQ 토픽 이름(default value = none)

기타 다른 설정들은 레퍼런스 참고하시길 바랍니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
 * in,out channel defined
 * 즉, 카프카 토픽에 메시지를 쓰는 발신 채널과
 * 카프카 토픽에서 메시지를 읽어오는 수신 채널을 정의해주는 것이다.
 * 꼭 @Input & @Output 어노테이션을 하나씩 넣을 필요는 없다.
 * 필요한 채널수만큼 정의가능하다.
 * 
 * 런타임동안에는 스프링이 구현체를 제공해준다.
 * @author yun-yeoseong
 *
 */
public interface ExamProcessor {
    String INPUT = "exam-input";
    String OUTPUT = "exam-output";
    String OUTPUT2 = "exam-output2";
    
    /**
     * @Input 어노테이션 설명
     * 메시지 시스템에서 메시지를 읽어오는 입력채널 
     * 매겨변수로 채널이름을 넣을 수 있다. 넣지 않으면 기본으로
     * 메소드 이름으로 들어간다.
     * @return
     */
    @Input(INPUT)
    SubscribableChannel inboundChannel();
    
    @Input("streams-input")
    KStream<?, String> inboundChannel2();
    
    @Input("streams-input2")
    KStream<?, String> inboundChannel3();
    /**
     * @Output 어노테이션 설명
     * 메시지 시스템으로 메시지를 보내는 출력채널
     * 매겨변수로 채널이름을 넣을 수 있다. 넣지 않으면 기본으로
     * 메소드 이름으로 들어간다.
     * @return
     */
    @Output(OUTPUT)
    MessageChannel outboundChannel();
    
    @Output(OUTPUT2)
    MessageChannel outboundChannel2();
    
    @Output("streams-output")
    KStream<?, String> outboundChannel3();
    
}
 
cs

카프카 스트림즈를 스프링 클라우드 스트림에서 사용하려면 input&output을 KStream을 반환타입으로 채널을 정의한다. 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Slf4j
@EnableBinding(ExamProcessor.class)
@SpringBootApplication
public class KafkaStreamsExamApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamsExamApplication.class, args);
    }
    
    @StreamListener(target= ExamProcessor.INPUT,condition="headers['producerName']=='yeoseong'")
    public void receive(@Payload Exam exam) {
        log.info("Only Header value yeoseong = {}",exam.toString());
    }
    
    @StreamListener("streams-input")
    @SendTo("streams-output")
    public KStream<?, String> streams(KStream<?, String> input){
        return input.flatMapValues(new ValueMapper() {
            @Override
            public Object apply(Object value) {
                // TODO Auto-generated method stub
                String valueStr = (String)value;
                System.out.println(valueStr);
                return Arrays.asList(valueStr.split(" "));
            }
        });
    }
    
    @StreamListener("streams-input2")
    public void streams2(KStream<?, String> input) {
        System.out.println("streams2");
        input.foreach( (key,value) -> System.out.println(value));
    }
}
 
cs

위에서 보이듯이 sink processor라면 그 밑으로 더 이상 processor가 붙지 않으므로 void 반환타입으로 데이터를 입력받아 적절한 처리를 하면된다. 하지만 밑으로 스트림이 더 붙는 스트림들은 반환타입으로 KStream을 반환해야한다. 지금까지 간단하게 Kafka Streams API를 Spring Cloud Stream에서 사용해보았다. kafka streams DSL의 자세한 문법은 곧 포스팅할 카프카 스트림즈 API에 대한 글을 참고하시거나 카프카 레퍼런스를 참고하시면 될듯합니다. 

 

posted by 여성게
:
Middleware/Kafka&RabbitMQ 2019. 3. 26. 00:07

Kafka - Kafka Streams API(카프카 스트림즈)



카프카는 대규모 메시지를 저장하고 빠르게 처리하기 위해 만들어진 제품이다. 초기 사용 목적과는 다른 뛰어난 성능에 일련의 연속된 메시지인 스트림을 처리하는 데도 사용이 되기 시작했다. 이러한 스트림을 카프카는 Kafka Streams API를 통해 제공한다.


설명하기 앞서 우선 스트림 프로세싱과 배치 프로세싱의 차이점이란 무엇일까? 스트림 프로세싱(Stream Processing)은 데이터들이 지속적으로 유입되고 나가는 과정에서 이 데이터에 대한 일련의 처리 혹은 분석을 수행하는 것을 의미한다. 즉, 스트림 프로세싱은 실시간 분석(Real Time Analysis)이라고 불리기도 한다. 스트림 프로세싱과는 대비되는 개념으로 배치(Batch)처리 또는 정적 데이터(Data-at-rest)처리를 들 수 있다. 배치 및 정적 데이터 처리란 위의 스트림 프로세싱과는 다르게 데이터를 한번에 특정 시간에 처리한다라는 특징이 있다. 주로 사용자의 요청이 몰리지 않는 새벽 시간대에 많이 수행하기도 한다.(물론 무조건 그렇다고는 할 수 없다) 그렇지만 사실 뭐가 좋고 나쁨은 이야기 할 수 없다. 스트림과 배치의 서로의 장단점이 있기 때문이다. 하지만 요즘은 역시나 실시간 데이터 처리가 각광받고 있는 사실은 숨길 수 없다.


<특정 이벤트 발생시 바로바로 애플리케이션에서 데이터처리를 하고 있다.>


그렇다면 스트림 프로세싱의 장점은 무엇이 있을까? 우선은 애플리케이션이 이벤트에 즉각적으로 반응을 하기 때문에 이벤트 발생과 분석,조치에 있어 거의 지연시간이 발생하지 않는다. 또한 항상 최신의 데이터를 반영한다라는 특징이 있다. 그리고 데이터를 어디에 담아두고 처리하지 않기 때문에 대규모 공유 데이터베이스에 대한 요구를 줄일 수 있고, 인프라에 독립적인 수행이 가능하다.



상태 기반과 무상태 스트림 처리

스트림 프로세싱에는 상태 기반과 무상태 스트림 처리가 있다. 차이점이란 실시간 데이터 처리를 위하여 이전에 분석된 데이터의 결과가 필요한지이다. 이렇게 상태를 유지할 스트림 프로세싱은 이벤트를 처리하고 그 결과를 저장할 상태 저장소가 필요하다. 이와는 반대로 무상태 스트림 처리는 이전 스트림의 처리 결과와 관계없이 현재 데이터로만 처리를 한다.




카프카 스트림즈(Kafka Streams API)



카프카 스트림즈는 카프카에 저장된 데이터를 처리하고 분석하기 위해 개발된 클라이언트 라이브러리이다. 카프카 스트림즈는 이벤트 시간과 처리 시간을 분리해서 다루고 다양한 시간 간격 옵션을 지원하기에 실시간 분석을 간단하지만 효율적으로 진행할 수 있다. 우선 카프카 스트림즈에 대해 설명하기 전에 용어 정리를 할 필요가 있을 것같다.


용어 

설명 

스트림(Stream) 

스트림은 카프카 스트림즈 API를 사용해 생성된 토폴로지로, 끊임없이 전달되는 데이터 세트를 의미한다. 스트림에 기록되는 단위는 key-value 형태이다. 

스트림 처리 애플리케이션(Stream Processing Application) 

카프카 스트림 클라이언트를 사용하는 애플리케이션으로서, 하나 이상의 프로세서 토폴로지에서 처리되는 로직을 의미한다. 프로세서 토폴로지는 스트림 프로세서가 서로 연결된 그래프를 의미한다. 

스트림 프로세서(Stream Processor) 

프로세서 토폴로지를 이루는 하나의 노드를 말하여 여기서 노드들은 프로세서 형상에 의해 연결된 하나의 입력 스트림으로부터 데이터를 받아서 처리한 다음 다시 연결된 프로세서에 보내는 역할을 한다. 

 소스 프로세서(Source Processor)

위쪽으로 연결된 프로세서가 없는 프로세서를 말한다. 이 프로세서는 하나 이상의 카프카 토픽에서 데이터 레코드를 읽어서 아래쪽 프로세서에 전달한다. 

싱크 프로세서(Sink Processor) 

토폴로지 아래쪽에 프로세서 연결이 없는 프로세서를 뜻한다. 상위 프로세서로부터 받은 데이터 레코드를 카프카 특정 토픽에 저장한다. 



카프카 스트림즈 아키텍쳐

카프카 스트림즈에 들어오는 데이터는 카프카 토픽의 메시지이다. 각 스트림 파티션은 카프카의 토픽 파티션에 저장된 정렬된 메시지이고, 해당 메시지는 key-value형태이다. 또한 해당 메시지의 키를 통해 다음 스트림(카프카 토픽)으로 전달된다. 위의 그림에서 보듯 카프카 스트림즈는 입력 스트림의 파티션 개수만큼 태스크를 생성한다. 각 태스크에는 입력 스트림(카프카 토픽) 파티션들이 할당되고, 이것은 한번 정해지면 입력 토픽의 파티션에 변화가 생기지 않는 한 변하지 않는다. 마지 컨슈머 그룹 안의 컨슈머들이 각각 토픽 파티션을 점유하는 것과 비슷한 개념이다.



자바를 이용한 간단한 파이프 예제 프로그램

지금부터 진행할 예제는 이전 포스팅에서 구성했던 카프카 클러스터 환경에서 진행한다. 만약 클러스터 구성이 되어 있지않다면 밑의 링크를 참조해 구성하면 될것 같다.

스프링 부트 애플리케이션 하나를 생성한다. 그리고 필요한 라이브러리 의존성을 추가한다. 로그를 사용하기 위하여 롬복 라이브러리를 추가하였다. 혹시 이클립스 환경에서 롬복을 사용하는 방법을 알고 싶다면 밑의 링크를 참조하자.

▶︎▶︎▶︎2019/02/02 - [Java&Servlet] - Mac OS - Eclipse & Lombok(롬복 사용방법)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.4</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.0.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.0.1</version>
        </dependency>
cs


이번에 만들 프로그램은 간단히 한쪽 토픽에 입력된 값을 다른 쪽 토픽으로 옮기는 역할을 수행한다. 


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
 
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
 
import lombok.extern.slf4j.Slf4j;
 
@Slf4j
public class Pipe {
    
    public static void main(String[] args) {
        
        /*
         * 카프카 스트림 파이프 프로세스에 필요한 설정값
         * StreamsConfig의 자세한 설정값은
         * https://kafka.apache.org/10/documentation/#streamsconfigs 참고
         */
        Properties props = new Properties();
        //카프카 스트림즈 애플리케이션을 유일할게 구분할 아이디
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        //스트림즈 애플리케이션이 접근할 카프카 브로커정보
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        //데이터를 어떠한 형식으로 Read/Write할지를 설정(키/값의 데이터 타입을 지정) - 문자열
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        //데이터의 흐름으로 구성된 토폴로지를 정의할 빌더
        final StreamsBuilder builder = new StreamsBuilder();
        
        //streams-*input에서 streams-*output으로 데이터 흐름을 정의한다.
        /*
         * KStream<String, String> source = builder.stream("streams-plaintext-input");
           source.to("streams-pipe-output");
         */
        builder.stream("streams-plaintext-input").to("streams-pipe-output");
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        //만들어진 토폴로지 확인
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        try {
          streams.start();
          System.out.println("topology started");
      
        } catch (Throwable e) {
          System.exit(1);
        }
        System.exit(0);
    }
}
 
cs


우선 Properties 객체를 이용하여 카프카 스트림즈가 사용할 설정값을 입력한다. 우선 StreamsConfig.APPLICATION_ID_CONFIG는 카프카 클러스터 내에서 스트림즈 애플리케이션을 구분하기 위한 유일한 아이디 값을 설정하기 위한 값이다. 그리고 그 밑에 카프카 클러스터 구성이 된 인스턴스를 리스트로 작성한다. 또한 위에서 설명했듯이 카프카 스트림즈는 키-값 형태로 데이터의 흐름이 이루어지기에 해당 키-값을 어떠한 타입으로 직렬화 할 것인지 선택한다. 현재는 String 타입으로 지정하였다. 그 다음은 스트림 토폴로지를 구성해준다. 토폴로지는 StreamsBuilder 객체를 통하여 구성한다. 위의 소스는 "streams-plaintext-input"이라는 토픽에서 데이터를 읽어 "streams-pipe-output"이라는 토픽으로 데이터를 보내는 토폴로지를 구성하였다. 최종적으로 StreamsBuilder.build()를 호출하여 토폴로지 객체를 만든다. 위의 Topology.describe()를 호출하면 해당 토폴로지가 어떠한 구성으로 이루어졌는지 로그로 상세하게 볼 수 있다.


22:07:24.234 [main] INFO com.kafka.exam.streams.Pipe - Topology info = Topologies:

   Sub-topology: 0

    Source: KSTREAM-SOURCE-0000000000 (topics: [streams-plaintext-input])

      --> KSTREAM-SINK-0000000001

    Sink: KSTREAM-SINK-0000000001 (topic: streams-pipe-output)

      <-- KSTREAM-SOURCE-0000000000


그 다음 KafkaStreams 객체를 생성한다. 매개변수로 토폴로지와 설정 객체를 전달한다.  마지막으로 KafkaStreams.start();를 호출하여 카프카 스트림을 시작한다. 해당 예제에서는 명시하지 않았지만 스트림즈를 종료하려면 close() 해주는 코드도 삽입해야한다.



카프카가 제공하는 콘솔 프로듀서,컨슈머를 이용하여 위의 스트림즈 코드를 수행해보았다. input 토픽에서 스트림즈 애플리케이션이 메시지를 꺼내서 output 토픽으로 잘 전달한것을 볼 수 있다.(>"hi hello") 컨슈머 실행시 세팅한 설정은 직관적으로 해석 가능하므로 따로 설명하지는 않는다.


방금은 단순히 메시지를 받아 다른 쪽 토픽으로 전달만 하는 예제이지만 다음 해볼 예제는 중간에 데이터를 처리하여 output 토픽으로 보내는 행 분리 예제 프로그램이다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
 
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
 
import lombok.extern.slf4j.Slf4j;
 
@Slf4j
public class LineSplit {
    public static void main(String[] args) {
        /*
         * 카프카 스트림 파이프 프로세스에 필요한 설정값
         * StreamsConfig의 자세한 설정값은
         * https://kafka.apache.org/10/documentation/#streamsconfigs 참고
         */
        Properties props = new Properties();
        //카프카 스트림즈 애플리케이션을 유일할게 구분할 아이디
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
        //스트림즈 애플리케이션이 접근할 카프카 브로커정보
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        //데이터를 어떠한 형식으로 Read/Write할지를 설정(키/값의 데이터 타입을 지정) - 문자열
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        //데이터의 흐름으로 구성된 토폴로지를 정의할 빌더
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
            .to("streams-linesplit-output");
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        //만들어진 토폴로지 확인
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
 
        try {
          streams.start();
          System.out.println("topology started");
        } catch (Throwable e) {
          System.exit(1);
        }
        System.exit(0);
    }
}
 
cs


설정 값은 동일하다 하지만 토폴로지에 로직이 추가된 것을 볼 수 있다. 우선 "streams-plaintext-input" 토픽으로 들어온 메시지를 KStream객체로 만들어준다. 그리고 flatMapValues()를 이용하여 데이터를 공백 단위로 쪼개는 것을 볼 수 있다. 여기서 flatMapValues()를 쓴 이유는 여기서 전달되는 value는 하나의 스트링 객체인데 이것을 List<String>으로 변환을 하였다. 이것을 flatMapValues()를 이용해 리스트의 각 요소를 스트링으로 flat하는 메소드이다. 즉, 메시지를 받아서 flatMapValues()를 통해 새로운 데이터 값이 만들어 졌다. 그러면 .to()에 전달되는 데이터 스트림은 공백단위로 짤린 단어 스트링이 전달될 것이다. 이전 예제와는 다르게 단순 데이터 전달만이 아니라 전달 이전에 적절한 처리를 하나 추가 해준 것이다. 만약 키와 값 모두를 새롭게 만들어서 사용하고 싶다면 flatMap() 메소드를 이용하면 된다. 그 밖에 메소드들을 알고 싶다면 공식 홈페이지를 참고하길 바란다.


22:49:08.941 [main] INFO com.kafka.exam.streams.LineSplit - Topology info = Topologies:

   Sub-topology: 0

    Source: KSTREAM-SOURCE-0000000000 (topics: [streams-plaintext-input])

      --> KSTREAM-FLATMAPVALUES-0000000001

    Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])

      --> KSTREAM-SINK-0000000002

      <-- KSTREAM-SOURCE-0000000000

    Sink: KSTREAM-SINK-0000000002 (topic: streams-linesplit-output)

      <-- KSTREAM-FLATMAPVALUES-0000000001


토폴로지 정보를 보아도 중간에 Processor라는 이름으로 하나의 처리 프로세스가 추가된 것을 볼 수 있다.



hi hello 라는 메시지를 보냈는데 hi / hello 로 짤려서 출력된 것을 볼 수 있다. 


필자는 챗봇을 개발하고 있는데 카프카 스트림즈를 이용하여 간단히 사용자 질문 로그들을 분석하여 형태소 분리된 형태로 데이터를 저장할 수도 있을 것 같아서 간단히 예제로 짜보았다. NoriAnalyzer는 필자가 Nori 형태소 분석기를 이용한 간단히 형태소분석 유틸 클래스를 만든것이다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
 
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
 
import lombok.extern.slf4j.Slf4j;
import rnb.analyzer.nori.analyzer.NoriAnalyzer;
 
@Slf4j
public class LineSplit {
    public static void main(String[] args) {
        /*
         * 카프카 스트림 파이프 프로세스에 필요한 설정값
         * StreamsConfig의 자세한 설정값은
         * https://kafka.apache.org/10/documentation/#streamsconfigs 참고
         */
        Properties props = new Properties();
        //카프카 스트림즈 애플리케이션을 유일할게 구분할 아이디
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
        //스트림즈 애플리케이션이 접근할 카프카 브로커정보
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        //데이터를 어떠한 형식으로 Read/Write할지를 설정(키/값의 데이터 타입을 지정) - 문자열
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        //데이터의 흐름으로 구성된 토폴로지를 정의할 빌더
        final StreamsBuilder builder = new StreamsBuilder();
        //Nori 형태소 분석기를 이용한 유틸클래스
        NoriAnalyzer analyzer = new NoriAnalyzer();
        
        KStream<StringString> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(analyzer.analyzeForString(value).split(" ")))
            .to("streams-linesplit-output");
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        //만들어진 토폴로지 확인
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
 
        try {
          streams.start();
          System.out.println("topology started");
        } catch (Throwable e) {
          System.exit(1);
        }
    }
}
 
cs



이렇게 추후에는 형태소분리된 모든 단어가 아니라 특정 명사만 추출하여 사용자들의 질문중 가장 많이 포함된 명사들을 뽑아내 데이터를 분석할 수도 있을 것같다.


지금까지의 예제는 바로 무상태 스트림 프로세싱이다. 다음 예제는 이전 데이터 처리에 대한 상태를 참조하여 데이터를 처리하는 단어 빈도수 세기 프로그램 예제이다. 


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
 
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.state.KeyValueStore;
 
import lombok.extern.slf4j.Slf4j;
import rnb.analyzer.nori.analyzer.NoriAnalyzer;
 
@Slf4j
public class WordCounter {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
      
        final StreamsBuilder builder = new StreamsBuilder();
        
        NoriAnalyzer analyzer = new NoriAnalyzer();
      
        KStream<StringString> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(analyzer.analyzeForString(value).split(" ")))
                //return하는 value가 키가 된다.
                .groupBy(new KeyValueMapper<StringStringString>() {
                  @Override
                  public String apply(String key, String value) {
                    log.info("key = {},value = {}",key,value);
                    return value;
                  }
                })
                //count()를 호출하여 해당 키값으로 몇개의 요소가 있는지 체크한다. 그리고 해당 데이터를 스토어에(KeyValueStore<Bytes,byte[]> counts-store) 담고

  //변경이 있는 KTable에 대해서만 결과값을 리턴한다.

                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
                .toStream()
                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
      
        final Topology topology = builder.build();
        System.out.println(topology.describe());
      
        final KafkaStreams streams = new KafkaStreams(topology, props);
      
        try {
          streams.start();
        } catch (Throwable e) {
          System.exit(1);
        }
    }
}
 
cs


설정값은 동일하다. 조금 달라진 것은 데이터처리부이다. 우선은 입력받은 데이터를 형태소분리하여 토큰 단위로 스트림을 만든다. 그리고 groupBy를 이용하여 토큰값을 키로 하여 스트림을 그룹핑한다. 그 다음 count()를 통해 같은 키값으로 몇개의 요소가 있는 지를 KeyValueStore에 담고, 만약 키값스토어의 값이 변경이 있을 때만 다운스트림으로 내려준다.(key == null은 무시한다) 처음에는 해당 단어:빈도수가 출력이 되지만 이후에는 이 단어가 또 들어오지 않는다면 출력되지 않고 해당 단어가 들어오면 키값 저장소에 빈도수가 변경이 되므로 다운스트림으로 내려준다. 이말은 무엇이냐면 즉, 키값스토어가 이전 데이터 처리의 상태를 담고 있는 저장소가 되어 이후 데이터에서 참조되어 사용되는 것이다.(사실 더 자세한 것은 API문서를 봐야할 듯하다.)




일부 출력이 안된 것들은 토픽의 버퍼가 원하는 용량에 도달하지 못해 아직 출력하지 못한 데이터들이다. 이렇게 상태를 유지하여 스트림 프로세스를 구성할 수도 있다. 


지금까지 아주 간단하게 카프카 스트림즈 API에 대해 다루었다. 사실 실무에서 이용할 수 있을 만큼의 예제는 아니지만 카프카가 이렇게 실시간 데이터 스트림에도 이용될 수 있다는 것을 알았다면 추후에 API문서를 참조하여 카프카를 응용할 수 있을 것같다.

posted by 여성게
: