Middleware/Kafka&RabbitMQ 2023. 11. 18. 01:29

아래는 ContainerFactory 및 conusmer, producer 설정들이다.

package com.spring.kafkasample.config

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.*
import org.springframework.kafka.listener.ContainerProperties


@EnableKafka
@Configuration
class KafkaConfig(private val kafkaErrorHandler: KafkaErrorHandler) {
    // RecordListener
    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = consumerFactory()
        factory.setConcurrency(1)
        // MANUAL 설정을 해줘야, 리스너에서 Acknowledge 오브젝트를 받을 수 있음.
        factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
        // 컨슈머 로직 처리시 예외 발생시 어떻게 처리할지를 정의
        // default로는 DefaultErrorHandler를 사용하는데 기본 동작구조는 최대 10회 retry 후
        // (backoff ms는 100ms)
        // 실패한 메시지를 커밋해버린다.(물론 커밋을 안하게 할 수 있다. 요건 뒤에서 설명)
        factory.setCommonErrorHandler(kafkaErrorHandler)
        return factory
    }

    // BatchListener
    @Bean
    fun batchKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = consumerFactory()
        factory.setConcurrency(1)
        // 배치 리스너를 사용한다는 설정
        factory.isBatchListener = true
        factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
        factory.setCommonErrorHandler(kafkaErrorHandler)
        return factory
    }

    @Bean
    fun consumerFactory(): ConsumerFactory<String, String> {
        return DefaultKafkaConsumerFactory(consumerConfigs())
    }

    @Bean
    fun consumerConfigs(): Map<String, Any> {
        val props = mutableMapOf<String, Any>()
        props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "0.0.0.0:29092"
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = false
        props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
        // 해당 설정은 한번에 poll()로 몇 개의 레코드를 가져오는지 설정.
        // AckMode.MANUAL일 때 커밋 단위가 되는 개수.
        // poll()로 2개의 레코드를 가져와서 해당 2개의 레코드를 다 처리완료되면
        // 2개가 한번에 커밋됨.
        props[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 2
        return props
    }

    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, String> {
        return KafkaTemplate(producerFactory())
    }

    @Bean
    fun producerFactory():ProducerFactory<String, String> {
        val config = mutableMapOf<String, Any>()
        config[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "0.0.0.0:29092"
        config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java

        return DefaultKafkaProducerFactory(config)
    }
}

 

아래는 에러 핸들러 코드이다.

 

package com.spring.kafkasample.config

import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.kafka.listener.CommonErrorHandler
import org.springframework.kafka.listener.MessageListenerContainer
import org.springframework.stereotype.Component
import java.lang.Exception

@Component
class KafkaErrorHandler: CommonErrorHandler {
    override fun handleRecord(
        thrownException: Exception,
        record: ConsumerRecord<*, *>,
        consumer: Consumer<*, *>,
        container: MessageListenerContainer
    ) {
        print("error: ${thrownException.message}, record: ${record.value()}")
    }

    // 해당 값이 false면 에러 핸들링 이후에도 오프셋 커밋을 하지 않음.
    override fun isAckAfterHandle(): Boolean {
        return true
    }
}

 

(max poll count 개수 만큼 폴링하고 각 메시지를 컨슘해서 커밋하는데, 커밋은 메시지 단위가 아니고 max poll count 만큼 다 처리된 후 한번에 커밋된다. AckMode.MANUAL 일때!)

 

위에서 에러 핸들링이 끝나면, 커밋이 된다고 하였는데 이 설정이 바로 isAckAfterHandler() 설정이다. 요게 false이면 에러 처리해도 오프셋 커밋이 되지 않음.(이건 당연히 true가 맞는게, 항상 예외가 발생하는 로직일 경우 무한으로 컨슘이 발생하기 때문이다.)

 

그렇다면 만약 한번의 poll()에서 2개를 가져왔고, 첫번째 메시지 처리에서 예외가 발생해서 에러 핸들링이 발생하면 어떻게 될까? 커밋? 아니다. 일단 넘어가고 2번째 메시지까지 처리가 완료되어야(성공이던 예외처리던) 첫번째 예외처리된 메시지도 커밋된다.

 

아래는 리스너 설정이다.

 

package com.spring.kafkasample.config

import com.spring.kafkasample.ProduceController
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.support.Acknowledgment
import org.springframework.stereotype.Component

@Component
class KafkaListener {
    @KafkaListener(
        clientIdPrefix = "record-listener",
        topics = ["test-topic"],
        groupId = "test-topic-group",
        containerFactory = "kafkaListenerContainerFactory"
    )
    fun recordListener(message: ProduceController.Message, acknowledgment: Acknowledgment) {
        print("message: $message")
        acknowledgment.acknowledge()
    }

    @KafkaListener(
        clientIdPrefix = "batch-listener",
        topics = ["test-topic"],
        groupId = "test-topic-group",
        containerFactory = "batchKafkaListenerContainerFactory"
    )
    fun batchListener(messages: List<ProduceController.Message>, acknowledgment: Acknowledgment) {
        print("message: $messages")
        acknowledgment.acknowledge()
    }
}

 

예제에 배치 리스너가 있는데, 배치 리스터는 한번에 poll()으로 가져온 메시지 모두를 인자로 받는 것이다. 위 설정대로라면 배치 리스너는 2개의 메시지를 받을 것이다.(AckMode.MANUAL은 배치 리스너 일때 AckMode.BATCH와 동일하게 동작한다.)

 

또 하나 살펴보아야할 것이 있다. 스프링 카프카에서는  브로커에서 메시지를 가져올 때, max poll record 만큼 가져오지 않고 fetch max byte size만큼 가져오는데, 만약 fetch로 4개를 가져왔다 가정해보자.

 

 

a,b,c,d -> 1번째 poll()에 a,b를 가져옴 -> a, b를 처리하다 커밋을 하지 못함

 

위와 같은 상황이 발생하였다면 다음 동작은 어떻게 될까? a,b를 다시 컨슘한다? 아니다. 스프링 카프카는 내부적으로 fetch한 데이터의 오프셋을 따로 관리하고 있기때문에 a,b 커밋을 하지 못하더라도 c,d를 가져온다. (물론 보통은 예외가 발생해서 커밋을 못하는 상황일텐데 그럴경우 에러 핸들러에서 처리 후에 isAckAfterHandler() 설정에 의해 커밋이 될것이다.)

posted by 여성게
:
Middleware/Kafka&RabbitMQ 2023. 11. 7. 22:35

https://stackoverflow.com/questions/64681806/not-acknowledging-the-kafka-message-at-all-in-manual-immediate-mode

 

Not acknowledging the kafka message at all in MANUAL_IMMEDIATE mode

I could not find any documentation related to this issue, hence the question. What happens if @KafkaListener method does not call acknowledgement.acknowledge() at all when the ackModeis set to

stackoverflow.com


nack()은 현재 실패한 메시지 전의 오프셋까지 커밋하고 나머지 오프셋메시지를 버리고 다음 폴링때 버린 오프셋부터 다시 폴링한다.

https://pula39.tistory.com/m/19

 

Kafka의 Auto Commit 에서 Auto는 당신이 생각하는 Auto가 아닐 수 있다.

환경설정을 할 때 auto라는 키워드가 나오면 작업자는 날먹을 꿈꾸며 행복해지는 한 편, 이 auto가 어디까지 자동으로 해주고 어디까지는 안해주는지 공포에 떨며 작업을 하게 된다. 나에게 kafka의

pula39.tistory.com

오토 커밋 문제점

posted by 여성게
:
Middleware/Kafka&RabbitMQ 2023. 10. 30. 15:59

https://d2.naver.com/helloworld/7181840

 

posted by 여성게
:
Middleware/Kafka&RabbitMQ 2022. 7. 27. 16:19

spring.cloud.stream.bindings.<channelName>.consumer.concurrency 옵션과 관련해 설명한다.

 

아래 상황이 있다고 가정하자.

 

  • 토픽 이름 : A-topic
  • 파티션 개수 : 4
  • 앱 인스턴스 개수 : 1(concurrency == 2)

 

보통은 위와 같이 설정을 하게 되면, 당연히 하나의 앱에서 2개는 동시 처리 하겠구나 생각을 하기 마련이지만 실제로는 그렇게 동작하지 않을 수 있다. 아래와 같이 컨슈머 그룹이 구성 되어 있다고 생각해보자

 

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group a-topic-group --describe

GROUP         TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                             CLIENT-ID
a-topic-group a-topic     2          18846           18846           0               consumer-a-topic-3-cdb73066-d0de-4079-8fb0-a5663ba9c76d consumer-a-topic-3
a-topic-group a-topic     3          18887           18887           0               consumer-a-topic-3-cdb73066-d0de-4079-8fb0-a5663ba9c76d consumer-a-topic-3
a-topic-group a-topic     0          18838           18838           0               consumer-a-topic-2-2f6ac6eb-35f1-464a-b89e-0ea0579c28b2 consumer-a-topic-2
a-topic-group a-topic     1          19020           19020           0               consumer-a-topic-2-2f6ac6eb-35f1-464a-b89e-0ea0579c28b2 consumer-a-topic-2

 

위 그룹을 보면 2개의 컨슈머(CONSUMER-ID를 기준으로 보자)가 각각 2개의 파티션을 나눠가지고 있는데, 여기서 카프카의 특징을 보아야 한다. "하나의 컨슈머는 요청 하나씩만 처리가능하다" 이 특징에서 우리가 기대한대로 동작이 안하는것인데, 이유는 메시지가 2개가 발행되었고 이 2개의 메시지가 위 파티션중에 0,1번에 들어갔다고 보자. 그러면 우리는 2개가 동시에 처리 될것이라 기대하지만 실제로 컨슈머가 한놈이기때문에 먼저 들어온 것을 처리하고 다음것을 처리한다.(컨슈머 하나는 반드시 한번에 하나의 요청만 처리) 그렇기에 2개가 동시에 실행되지 않는 것이고 만약에 운좋게 서로 다른 컨슈머가 붙은 파티션으로 메시지가 들어가면 2개가 동시에 처리 될것이다.

 

그렇다면 우리가 동시에 처리하고 싶다면? "concurrency == 파티션 개수"로 설정해주면 된다. 그러면 아래와 같이 하나의 앱에 4개의 컨슈머가 각각 다른 파티션에 할당될 것이고 실제로 4개의 메시지가 동시에 처리 될 것이다.

 

 

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group a-topic-group --describe

GROUP         TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                             CLIENT-ID
a-topic-group a-topic     2          18846           18846           0               consumer-a-topic-3-cdb73066-d0de-4079-8fb0-a5663ba9c76d consumer-a-topic-3
a-topic-group a-topic     3          18887           18887           0               consumer-a-topic-3-dfdfad01-d0de-4079-8fb0-a5663ba9c76d consumer-a-topic-3
a-topic-group a-topic     0          18838           18838           0               consumer-a-topic-2-dcvzc023-35f1-464a-b89e-0ea0579c28b2 consumer-a-topic-2
a-topic-group a-topic     1          19020           19020           0               consumer-a-topic-2-2f6ac6eb-35f1-464a-b89e-0ea0579c28b2 consumer-a-topic-2
posted by 여성게
:
Middleware/Kafka&RabbitMQ 2020. 7. 26. 17:32

 

 

yoonyeoseong/rabbitmq-sample

Contribute to yoonyeoseong/rabbitmq-sample development by creating an account on GitHub.

github.com

 

오늘 포스팅할 내용은 래빗엠큐이다. 그 동안에는 카프카를 사용할 일이 많아 카프카에 대한 포스팅이 주였는데, 이번에 래빗엠큐를 사용할 일이 생겨 간단히 래빗엠큐에 대해 간단히 다루어 볼것이다.(예제 코드는 위 깃헙에 올려놓았습니다.)

 

비동기 작업에 있어 큐를 사용하려면 중간에 메시지 브로커라는 개념이 존재하는데, 이러한 메시지 브로커에는 RabbitMQ, Kafka 등이 대표적으로 있다. 해당 포스트에서는 표준 MQ프로토콜인 AMQP를 구현한 RabbitMQ(래빗엠큐)에 대해 다루어볼 것이다.

 

간단하게 메시지큐는 아래 그림과 같은 워크 플로우로 이루어져있다. 

 

 

대부분의 메시지큐는 프로듀서가 있고, 해당 프로듀서가 브로커로 메시지를 발행하면, 적절한 컨슈머가 해당 메시지를 구독(읽다)하는 구조이다. 그렇다면 래빗엠큐는 상세하게 어떠한 구조로 되어있을까?

 

래빗엠큐는 단순히 프로듀서가 브로커로 메시지를 전달하여 컨슈머가 큐를 읽어가는 구조라는 면에서는 동일하지만, 프로듀싱하는 과정에서 조금 더 복잡한 개념이 들어간다. 바로 exchange와 route, queue라는 개념이다.

 

간단하게 워크 플로우를 설명하자면 아래와 같다.

 

  1. Producer는 Message를 Exchange에게 보내게 됩니다.
    1. Exchange를 생성할때 Exchange의 Type을 정해야 합니다.
  2. Exchange는 Routing Key를 사용하여 적절한 Queue로 Routing을 진행합니다.
    1. Routing은 Exchange Type에 따라 전략이 바뀌게 됩니다.
  3. Exchange - Queue와 Binding이 완료된 모습을 볼 수 있습니다.
    1. Message 속성에 따라 적절한 Queue로 Routing이 됩니다.
  4. Message는 Consumer가 소비할때까지 Queue에 대기하게 됩니다.
  5. Consumer는 Message를 소비하게 됩니다.

 

위에서 1번에 Exchange라는 개념이 등장하는데, Exchange는 정해진 규칙으로 메시지를 라우팅하는 기능을 가지고 있다. 여기서 정해진 규칙은 크게 4가지가 존재하는데, 규칙들은 아래와 같다.

 

exchange routeing 전략

  • Direct Exchange
    • Message의 Routing Key와 정확히 일치하는 Binding된 Queue로 Routing
  • Fanout Exchange
    • Binding된 모든 Queue에 Message를 Routing
  • Topic Exchange
    • 특정 Routing Pattern이 일치하는 Queue로 Routing
  • Headers Exchange
    • key-value로 정의된 Header 속성을 통한 Routing

 

 

 

익스체인지는 위와 같은 전략으로 메시지를 라우팅하게 된다. 그리고 자주 사용되는 Exchange의 옵션으로는 아래와 같이 있다.

 

exchange options
  • Durability
    • 브로커가 재시작 될 때 남아 있는지 여부
    • durable -> 재시작해도 유지가능
    • transient -> 재시작하면 사라집니다.
  • Auto-delete
    • 마지막 Queue 연결이 해제되면 삭제

마지막으로 래빗엠큐에서 사용되는 용어가 있다.

 

  • Vhost(virutal host)
    • Virtual Host를 통해서 하나의 RabbitMQ 인스턴스 안에 사용하고 있는 Application을 분리할 수 있습니다.
  • Connection
    • 물리적인 TCP Connection, HTTPS -> TLS(SSL) Connection을 사용
  • Channel
    • 하나의 물리적인 Connection 내에 생성되는 가상의 Connection
    • Consumer의 process나 thread는 각자 Channel을 통해 Queue에 연결 될 수 있습니다.

 

여기까지 간단하게 래빗엠큐의 동작과 용어에 대해서 다루어봤으니 실제 코드 레벨로 실습을 진행해본다.

 

docker rabbitmq 설치
sudo docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 \
	--restart=unless-stopped -e RABBITMQ_DEFAULT_USER=username \
    -e RABBITMQ_DEFAULT_PASS=password rabbitmq:management

 

위처럼 도커로 래빗엠큐를 설치해주고, http://localhost:15672(username/password)로 접속해보자.

 

springboot rabbitmq 예제

build.gradle

plugins {
    id 'org.springframework.boot' version '2.3.2.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'java'
}

group = 'com.levi'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '14'

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-amqp'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    annotationProcessor "org.springframework.boot:spring-boot-configuration-processor"
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
    testImplementation 'org.springframework.amqp:spring-rabbit-test'
}

test {
    useJUnitPlatform()
}

 

application.yml

rabbitmq:
  test:
    username: username
    password: password
    host: localhost
    port: 5672
    virtualHost: levi.vhost
    routeKey: test.route
    exchangeName: test.exchange
    queueName: testQueue
    deadLetterExchange: dead.letter.exchange
    deadLetterRouteKey: dead.letter.route

 

application.yml에서 프로퍼티를 읽어오는 클래스를 아래 하나 만들었다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Configuration
@ConfigurationProperties(prefix = "rabbitmq")
@Getter
@Setter
public class RabbitMqProperty {
    private RabbitMqDetailProperty test;
 
    @Getter
    @Setter
    public static class RabbitMqDetailProperty {
        private String username;
        private String password;
        private String host;
        private int port;
        private String virtualHost;
        private String routeKey;
        private String exchangeName;
        private String queueName;
        private String deadLetterExchange;
        private String deadLetterRouteKey;
    }
}
cs

 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@SpringBootApplication(exclude = {
        RabbitAutoConfiguration.class
})
@EnableRabbit
@RestController
@RequiredArgsConstructor
public class RabbitmqApplication {
    private final RabbitTemplate testTemplate;
 
    public static void main(String[] args) {
        SpringApplication.run(RabbitmqApplication.class, args);
    }
 
    @GetMapping("/")
    public String inQueue() {
        testTemplate.convertAndSend(SampleMessage.of("message!!"));
        return "success";
    }
 
    @Data
    @AllArgsConstructor(staticName = "of")
    @NoArgsConstructor
    public static class SampleMessage {
        private String message;
    }
}
cs

 

래빗엠큐 auto configuration을 꺼주었고, 메시지를 인큐하기 위한 컨트롤러를 하나 만들어 주었다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Slf4j
@Configuration
@RequiredArgsConstructor
public class RabbitMqConfig {
    private final RabbitMqProperty rabbitMqProperty;
 
    @Bean
    public ConnectionFactory testConnectionFactory() {
        final RabbitMqProperty.RabbitMqDetailProperty test = rabbitMqProperty.getTest();
        final CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost(test.getHost());
        factory.setPort(test.getPort());
        factory.setUsername(test.getUsername());
        factory.setPassword(test.getPassword());
        factory.setVirtualHost(test.getVirtualHost());
 
        return factory;
    }
 
    @Bean
    public SimpleRabbitListenerContainerFactory testContainer(final ConnectionFactory testConnectionFactory) {
        final SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(testConnectionFactory);
        factory.setConnectionFactory(testConnectionFactory);
        factory.setErrorHandler(t -> log.error("ErrorHandler = {}", t.getMessage()));
        factory.setDefaultRequeueRejected(false); //true일 경우 리스너에서 예외가 발생시 다시 큐에 메시지가 쌓이게 된다.(예외상황 해결안될 시 무한루프)
        factory.setMessageConverter(jacksonConverter());
        //예외 발생시 recover할 수 있는 옵션, 위 에러 핸들러와 잘 구분해서 사용해야할듯
//위 핸들러와 적용 순서등도 테스트가 필요(혹은 둘다 동시에 적용되나?)
//        factory.setAdviceChain(
//                RetryInterceptorBuilder
//                .stateless()
//                .maxAttempts(3) //최대 재시도 횟수
//                .recoverer() //recover handler
//                .backOffOptions(2000, 4, 10000) //2초 간격으로, 4번, 최대 10초 내에 재시도
//                .build()
//        );
        return factory;
    }
 
    @Bean
    public RabbitTemplate testTemplate(final ConnectionFactory testConnectionFactory, final MessageConverter jacksonConverter) {
        final RabbitMqProperty.RabbitMqDetailProperty property = rabbitMqProperty.getTest();
        final RabbitTemplate template = new RabbitTemplate();
        template.setConnectionFactory(testConnectionFactory);
        template.setMessageConverter(jacksonConverter);
        template.setExchange(property.getExchangeName());
        template.setRoutingKey(property.getRouteKey());
        return template;
    }
 
    @Bean
    public MessageConverter jacksonConverter() {
        return new Jackson2JsonMessageConverter();
    }

//app에서 큐만들거나 바인딩 하기 위해서는 RabbitAdmin 필요
    @Bean
    public RabbitAdmin testAdmin(final ConnectionFactory testConnectionFactory) {
        return new RabbitAdmin(testConnectionFactory);
    }
}
cs

 

브로커 연결 및 메시지를 프로듀싱하기 위한 RabbitTemplate 설정이다. 이중 RabbitAdmin 빈을 만들고 있는 수동이 아니라, 앱에서 큐를 만들고 바인딩 하기 위해서는 RabbitAdmin이 빈으로 떠있어야한다. 기타 설정은 주석을 참고하자 !

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@Configuration
@RequiredArgsConstructor
public class RabbitMqBindingConfig {
    private final RabbitMqProperty rabbitMqProperty;
 
    @Bean
    public TopicExchange testTopicExchange() {
        return ExchangeBuilder
                .topicExchange(rabbitMqProperty.getTest().getExchangeName())
                .durable(true)
                .build();
    }
 
    @Bean
    public TopicExchange dlExchange() {
        return ExchangeBuilder
                .topicExchange(rabbitMqProperty.getTest().getDeadLetterExchange())
                .durable(true)
                .build();
    }
 
    @Bean
    public Queue testQueue() {
        final RabbitMqProperty.RabbitMqDetailProperty testDetail = rabbitMqProperty.getTest();
        return QueueBuilder
                .durable(testDetail.getQueueName())
                .deadLetterExchange(testDetail.getDeadLetterExchange())
                .deadLetterRoutingKey(testDetail.getDeadLetterRouteKey())
                .build();
    }
 
    @Bean
    public Queue dlq() {
        return QueueBuilder
                .durable("DEAD_LETTER_QUEUE")
                .build();
    }
 
    @Bean
    public Binding testBinding(final Queue testQueue, final TopicExchange testTopicExchange) {
        return BindingBuilder
                .bind(testQueue)
                .to(testTopicExchange)
                .with(rabbitMqProperty.getTest().getRouteKey());
    }
 
    @Bean
    public Binding dlBinding(final Queue dlq, final TopicExchange dlExchange) {
        return BindingBuilder
                .bind(dlq)
                .to(dlExchange)
                .with(rabbitMqProperty.getTest().getDeadLetterRouteKey());
    }
}
cs

 

 

프로듀싱과 컨슈밍을 위해 브로커에 exchange와 queue를 binding하는 설정이다. 하나 추가적으로는 컨슈밍에서 예외가 발생하였을 때, Recover를 위한 Dead Letter Queue를 선언하였다.

 

이제 앱을 실행시키고, localhost:8080으로 요청을 보내보자 ! 요청을 보내면 메시지를 프로듀싱 할것이고, 해당 메시지를 컨슈밍해서 로그를 찍게 될 것이다. 여기까지 간단하게 래빗엠큐에 대해 다루어봤다.

 

 

 

https://jonnung.dev/rabbitmq/2019/02/06/about-amqp-implementtation-of-rabbitmq/

 

조은우 개발 블로그

 

jonnung.dev

https://velog.io/@hellozin/Spring-Boot%EC%99%80-RabbitMQ-%EC%B4%88%EA%B0%84%EB%8B%A8-%EC%84%A4%EB%AA%85%EC%84%9C

 

Spring Boot와 RabbitMQ 초간단 설명서

이번 포스트에서는 Spring boot 프로젝트에서 RabbitMQ를 사용하는 간단한 방법을 알아보겠습니다. Consumer 코드와 Producer 코드는 GitHub에 있습니다. 먼저 RabbitMQ 서버를 실행해야 하는데 Docker를 사용하�

velog.io

https://nesoy.github.io/articles/2019-02/RabbitMQ

 

RabbitMQ에 대해

 

nesoy.github.io

 

posted by 여성게
:
Middleware/Kafka&RabbitMQ 2019. 7. 6. 15:20

 

2019/07/06 - [Kafka&RabbitMQ] - Apache Kafka - Kafka(카프카)란 ? 분산 메시징 플랫폼 - 1

 

Apache Kafka - Kafka(카프카)란 ? 분산 메시징 플랫폼 - 1

이전 포스팅들에서 이미 카프카란 무엇이고, 카프카 프로듀서부터 컨슈머, 스트리밍까지 다루어보았다. 하지만 이번 포스팅을 시작으로 조금더 내용을 다듬고 정리된 상태의 풀세트의 카프카 포스팅을 시작할 것..

coding-start.tistory.com

이전 포스팅에서 간단히 카프카란 무엇이며 카프카의 요소들에 대해 다루어보았다. 이번 포스팅에는 이전에 소개했던 요소중 카프카 프로듀서에 대해 다루어볼 것이다.

 

카프카 프로듀서란 카프카 클러스터에 대해 데이터를 pub, 기록하는 애플리케이션이다.

 

카프카 프로듀서의 내부 구조

우리는 카프카 프로듀서를 사용하기 위해 추상화된 API를 사용한다. 하지만 우리가 사용하는 API는 내부적으로 많은 절차에 따라 행동을 한다. 이러한 프로듀서가 책임지는 역할들은 아래와 같다.

 

  • 카프카 브로커 URL 부트스트랩 : 카프카 프로듀서는 카프카 클러스터에 대한 메타데이터를 가져오기 위해 최소 하나 이상의 브로커에 연결한다. 프로듀서가 연결하길 원하는 첫 번째 브로커가 다운될 경우를 대비하여 보통 한개 이상의 브로커 URL 리스트를 설정한다.
  • 데이터 직렬화 : 카프카는 TCP 기반의 데이터 송수신을 위해 이진 프로토콜을 사용한다. 이는 카프카에 데이터를 기록할 때, 프로듀서는 미리 설정한 직렬화 클래스를 이용해 직렬화 한 이후에 전송한다. 즉, 카프카 프로듀서는 네트워크 상의 브로커들에게 데이터를 보낵기 전에 모든 메시지 데이터 객체를 바이트 배열로 직렬화한다.
  • 토픽 파티션의 결정 :  어떤 파티션으로 데이터가 전송돼야 하는지 결정하는 일은 카프카 프로듀서의 책임이다. 만약 프로듀서가 데이터를 보내기 위해 API에 파티션을 명시하지 않은 경우 파티셔너에 의해 키의 해시값을 이용해 파티션을 결정하고 데이터를 보낸다. 하지만 키값이 존재하지 않으면 라운드 로빈 방식으로 데이터를 전송한다.
  • 처리 실패/재시도 : 처리 실패 응답이나 재시도 횟수는 프로듀서 애플리케이션에 의해 제어된다.
  • 배치처리 : 호율적인 메시지 전송을 위해서 배치는 매우 유용하다. 미리 설정한 바이트 임계치를 넘지 않으면 버퍼에 데이터를 유지하고 임계치를 넘으면 데이터를 토픽으로 전송한다.

 

카프카 프로듀서의 내부 흐름을 설명하면, 프로듀서는 키/값 쌍으로 이루어진 데이터 객체를 전달 받는다. 그리고 해당 객체를 미리 정의된 시리얼라이저 혹은 사용자 정의 시리얼라이저를 이용해 직렬화 한다. 데이터 직렬화 후 데이터가 전송될 파티션을 결정한다. 파티션 정보가 API 호출에 포함되어 있다면 해당 파티션에 직접 데이터를 보내지만 파티션정보가 포함되지 않았다면 데이터 객체의 키값의 해시를 이용해 파티셔너가 데이터를 보낼 파티션을 결정해준다. 만약 키값이 null이라면 라운드 로빈 방식으로 파티션을 결정한다. 파티션 결정이 끝나면 리더 브로커를 이용하여 데이터 전송 요청을 보낸다.

 

카프카 프로듀서 API

카프카 프로듀서 API를 사용하기 위한 필수파라미터들이다.

 

  • bootstrap.servers : 카프카 브로커 주소의 목록을 설정한다. 주소는 hostname:port 형식으로 지정되며 하나 이상의 브로커 리스트를 지정한다.
  • key.serializer : 메시지는 키 값의 쌍으로 이뤄진 형태로 카프카 브로커에게 전송된다. 브로커는 이 키값이 바이트 배열로 돼있다고 가정한다. 그래서 프로듀서에게 어떠한 직렬화 클래스가 키값을 바이트 배열로 변환할때 사용됬는 지 알려주어야한다. 카프카는 내장된 직렬화 클래스를 제공한다.(ByteArraySerializer, StringSerializer, IntegerSerializer / org.apache.kafka.common.serialization)
  • value.serializer : key.serializer 속성과 유사하지만 이 속성은 프로듀서가 값을 직렬화 하기 위해 사용하는 클래스를 알려준다.

 

1
2
3
4
5
6
7
8
9
10
11
public KafkaProducer<StringString> getProducer(){
        
        Properties producerProps = new Properties();
        
        producerProps.put("bootstrap.servers""localhost:9092,localhost:9093");
        producerProps.put("key.serializer""org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer""org.apache.kafka.common.serialization.StringSerializer");
                
        return new KafkaProducer<>(producerProps);
        
}
cs

 

자바 클래스로 위의 필수 설정들을 사용하여 카프카 프로듀서를 생성하는 코드이다. Properties 객체를 이용해 모든 설정 값을 설정해주고 카프카 프로듀서 객체 생성자의 매개변수로 해당 설정을 넣어준다.

 

 

카프카 프로듀서 객체와 ProducerRecord 객체

프로듀서는 메시지 전송을 위해 ProducerRecord 객체를 이용한다. 해당 ProducerRecord는 토픽이름, 파티션 번호, 타임스탬프, 키, 값 등을 포함하는 객체이다. 파티션 번호, 타임스탬프, 키 등은 선택 파라미터 이지만, 데이터를 보낼 토픽과 데이터 값을 반드시 포함해야한다. 그리고 보낼 파티션 선정에는 3가지 방법이 존재한다.

 

  1. 파티션 번호가 지정되면, 지정된 파티션에 데이터를 전송한다.
  2. 파티션이 지정되지 않고 키값이 존재하는 경우 키의 해시 값을 이용하여 파티션을 정한다.
  3. 키와 파티션 모두 지정되지 않은 경우 파티션은 라운드 로빈 방식으로 할당된다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
public void sendMessageSync(String topicName, String data) throws InterruptedException, ExecutionException {
        
        try(KafkaProducer<StringString> producer = getProducer();){
        
            ProducerRecord<StringString> producerRecord = new ProducerRecord<StringString>(topicName, data);
            
            Future<RecordMetadata> recordMetadata = producer.send(producerRecord);
            
            RecordMetadata result = recordMetadata.get();
            
        }
 
}
cs

 

자바 코드를 이용해 데이터 객체를 생성한 후에 동기식으로 토픽에 데이터를 보내는 코드이다.

 

1
2
3
4
ProducerRecord(String topicName, Integer partition, K key, V value)
ProducerRecord(String topicName, Integer partition, Long timestamp, K key, V value)
ProducerRecord(String topicName, K key, V value)
ProducerRecord(String topicName, V value)
cs

 

ProducerRecord의 생성자 리스트의 일부를 보여준다.

 

일단 send()를 사용해 데이터를 전송하면 브로커는 파티션 로그에 메시지를 기록하고, 메시지에 대한 서버 응답의 메타데이터를 포함한 RecordMetadata를 반환하는데, 이 객체는 오프셋,체크섬,타임스탬프,토픽,serializedKeySize 등을 포함한다.

 

앞에서 일반적인 메시지 게시 유형에 대해 설명했지만, 메시지 전송은 동기 또는 비동기로 수행될 수 있다.

 

  • 동기 메시징 : 프로듀서는 메시지를 보내고 브로커의 회신을 기다린다. 카프카 브로커는 오류 또는 RecordMetadata를 보낸다. 일반적으로 카프카는 어떤 연결 오류가 발생하면 재전송을 시도한다. 그러나 직렬화, 메시지 등에 관련된 오류는 애플리케이션 단에서 처리돼야 하며, 이경우 카프카는 재전송을 시도하지 않고 예외를 즉시 발생시킨다.
  • 비동기 메시징 : 일부 즉각적으로 응답 처리를 하지 않아도 되는 혹은 원하지 않는 경우 또는 한두 개의 메시지를 잃어버려도 문제되지 않거나 나중에 처리하기를 원할 수 있다. 카프카는 send() 성공 여부와 관계없이 메시지 응답 처리를 콜백 인터페이스 제공한다.

 

1
send(ProducerRecord<K,V> record, Callback callback)
cs

 

위의 콜백 인터페이스는 onCompletion 메서드를 포함하는데, 오버라이드하여 구현하면 된다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
producer.send(producerRecord, new Callback() {
                
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    
                    if(exception != null) {
                        log.info("예외 발생");
                        /*
                         * 예외처리
                         */
                    }else {
                        log.info("정상처리");
                        /*
                         * 정상처리 로직
                         */
                    }
                    
                }
});
cs

 

onCompletion 메소드 내부에 성공했거나 오류가 발생한 메시지를 처리할 수 있다.

 

 

사용자 정의 파티셔닝

카프카는 내부적으로 내장된 기본 파티셔너와 시리얼라이저를 사용한다. 하지만 가끔은 메시지가 해당 브로커에서 동일한 키는 동일한 파티션으로 가도록 사용자 정의 파티션 로직을 원할 수 있는데, 카프카는 이렇게 사용자 정의 파티셔너 구현을 할 수 있도록 인터페이스를 제공한다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class CustomPartition implements Partitioner{
 
        @Override
        public void configure(Map<String, ?> configs) {
            // TODO Auto-generated method stub
            
        }
 
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
                Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            
            int numPartitions = partitions.size();
            //TODO : 파티션 선택 로직
            return 0;
        }
 
        @Override
        public void close() {
            // TODO Auto-generated method stub
            
        }
        
}
cs

 

해당 커스텀 파티셔너를 사용하려면 Properties에 "partitioner.class"/"package.className" 키/값 형태소 설정해주면 된다.

 

1
2
3
4
5
6
7
8
9
10
11
12
public KafkaProducer<StringString> getProducer(){
        
        Properties producerProps = new Properties();
        
        producerProps.put("bootstrap.servers""localhost:9092,localhost:9093");
        producerProps.put("key.serializer""org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer""org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("partitioner.class","com.kafka.exam.CustomPartition");
        
        return new KafkaProducer<>(producerProps);
        
}
cs

 

추가 프로듀서 설정

 

  • buffer.memory : 카프카 서버로 전송을 기다리는 메시지를 위해 프로듀서가 사용할 버퍼 메모리의 크기다. 간단히 전송되지 않은 메시지를 보관하는 자바 프로듀서가 사용할 전체 메모리다. 이 설정의 한계에 도달하면, 프로듀서는 예외를 발생시키기 전에 max.block.ms 시간 동안 메시지를 대기시킨다. 만약 배치 사이즈가 더 크다면 프로듀서 버퍼에 더 많은 메모리를 할당해야한다. 예를 들어 설명하자면 카프카 서버에 메시지를 전송하는 속도보다 프로듀서가 메시지를 전송하는 요청의 수가 더 빠를 경우(1개씩 메시지를 카프카 서버에 전송하는데 하나를 보내면 2개의 요청이 계속 쌓인다면) 아직 보내지 않은 메시지는 해당 버퍼에 쌓이게 된다. 이 내용에 더해서 큐에 있는 레코드가 계속 남아있는 것을 피하기 위해 request.timeout.ms를 사용해 지정된 시간동안 메시지가 보내지지 않고 큐에 쌓여있다면 메시지는 큐에서 제거되고 예외를 발생시킨다.
  • acks : 이 설정은 각 파티션의 리더가 팔로워(복제본)에게 ACK을 받는 전략을 설정한다. ack=0이라면 리더에 데이터가 쓰이자 마자, 팔로워(복제본)가 데이터 로그를 쓰는 것을 신경쓰지 않고 커밋을 완료한다. ack=1 하나의 팔로워에게 ack를 받으면 커밋을 완료한다. ack=all 모든 팔로워에게 복제완료 ack를 받으면 커밋을 완료한다. 각 설정은 장단점이 있으므로 잘 조율해서 사용해야한다. 만약 ack=0이면 빠른 처리 성능을 보장하지만 매우 높은 가능성의 메시지 손실이 있을 수 있고, ack=1도 역시 처리성능은 빠르지만 여전히 메시지 손실가능성이 존재한다. 마지막으로 ack=all은 메시지 손실 가능성이 매우 낮지만 처리 성능이 느려진다.
  • batch.size : 이 설정은 설정된 크기만큼 여러 데이터를 모아 배치로 데이터를 전송한다. 하지만 프로듀서는 단위 배치가 꽉 찰때까지 기다릴 필요는 없다. 배치는 특정 주기로 전성되며 배치에 포함된 메시지 수는 상관하지 않는다.
  • linger.ms : 브로커로 현재의 배치를 전송하기 전에 프로듀서가 추가 메시지를 기다리는 시간을 나타낸다.
  • compression.type : 프로듀서는 브로커에게 압축되지 않은 상태로 메시지를 전송하는 것이 기본이지만, 해당 옵션을 사용해 데이터를 압축해 보내 처리 성능을 높힐 수 있다. 배치처리가 많을 수록 유용한 옵션이다.
  • retrites : 메시지 전송이 실패하면 프로듀서가 예외를 발생시키기 전에 메시지의 전송을 다시 시도하는 수.

기타 많은 설정들이 존재하지만 여기서 모두 다루지는 않는다. 공식 홈페이지를 이용해자.

 

자바 카프카 프로듀서 예제

 

코드 작성 전에 Maven에 카프카 dependency 설정을 해준다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public KafkaProducer<StringString> getProducer(){
        
        Properties producerProps = new Properties();
        
        producerProps.put("bootstrap.servers""localhost:9092,localhost:9093");
        producerProps.put("acks""all");
        producerProps.put("retries"1);
        producerProps.put("batch.size"20000);
        producerProps.put("linger.ms"1);
        producerProps.put("buffer.memory"24568545);
        
        return new KafkaProducer<>(producerProps);
        
}
/*
     * 동기식 
     * 
     * ProducerRecord 생성자 매개변수(*표시는 필수 매개변수)
     * *1)토픽이름,2)파티션 번호,3)타임스탬프,4)키,*5)값
     * 
     * 파티션선정
     * 1)파티션 번호가 ProducerRecord의 매개변수로 명시되어 있다면 그 파티션으로 보낸다.
     * 2)파티션이 지정되지 않고 ProducerRecord 매개변수에 키가 지정된 경우 파티션은 키의 해시값을 사용해 정한다.
     * 3)ProducerRecord 매개변수에 키와 파티션 번호 모두 지정되지 않은 경우에는 라운드 로빈 방식으로 정한다.
     */
    public void sendMessageSync(String topicName, String data) throws InterruptedException, ExecutionException {
        
        try(KafkaProducer<StringString> producer = getProducer();){
        
            ProducerRecord<StringString> producerRecord = new ProducerRecord<StringString>(topicName, data);
            
            Future<RecordMetadata> recordMetadata = producer.send(producerRecord);
            
            RecordMetadata result = recordMetadata.get();
            
        }
    }
    
    /*
     * 비동기식
     */
    public void sendMessageAsync(String topicName, String data) {
        
        try(KafkaProducer<StringString> producer = getProducer();){
        
            ProducerRecord<StringString> producerRecord = new ProducerRecord<StringString>(topicName, data);
            
            producer.send(producerRecord, new Callback() {
                
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    
                    if(exception != null) {
                        log.info("예외 발생");
                        /*
                         * 예외처리
                         */
                    }else {
                        log.info("정상처리");
                        /*
                         * 정상처리 로직
                         */
                    }
                    
                }
            });
            
        }
    }
http://colorscripter.com/info#e" target="_blank" style="color:#e5e5e5text-decoration:none">Colored by Color Scripter
http://colorscripter.com/info#e" target="_blank" style="text-decoration:none;color:white">cs

 

 

카프카 프로듀서를 사용할 때 유의해야 하는 사항

  • 데이터 유효성 검증 : 보낼 데이터의 유효성 검증은 간과하기 쉽다. 꼭 유효성 검증을 포함하는 로직을 작성하자.
  • 예외처리 : 연결 실패,네트워크 시간 초과 등의 예외로는 프로듀서가 내부적으로 재전송을 한다. 하지만 모든 예외에 대해서 재전송을 하는 것은 아니다. 즉, 예외 발생에 대해 처리의 책임은 온전히 프로듀서 애플리케이션에 있는 것이다.
  • 부트스트랩 URL 수 : 클러스터 노드수가 적다면 모든 URL를 목록으로 작성하자. 하지만 많약 노드가 아주 많다면 대표하는 브로커의 수만 목록으로 설정해주자.
  • 잘못된 파티셔닝 방식의 사용방지 : 파티션은 카프카에서 병렬 처리의 단위다. 메시지가 모든 토픽 파티션으로 균일하게 분배되도록 올바른 파티셔닝 전략을 선택하자. 만약 잘못된 파티셔닝 방식을 도입하게 되면 특정 파티션에만 요청이 집중되는 최악의 상황이 발생할 수 있으니 메시지가 모든 가용한 파티션에 분배 되도록 올바른 파티셔닝 방식을 정의하자.
  • 기존 토픽에 새로운 파티션 추가 방지 : 메시지를 분산시키기 위해서 키 값을 사용하여 파티션을 선택하는 토픽에는 파티션을 추가하는 것을 피해야한다. 새로운 파티션 추가는 각 키에 대해 산출된 해시 코드를 변경시키며, 이는 파티션의 수도 산출에 필요한 입력 중 하나로 사용하기 때문이다.

 

posted by 여성게
:
Middleware/Kafka&RabbitMQ 2019. 7. 6. 13:28

 

 

이전 포스팅들에서 이미 카프카란 무엇이고, 카프카 프로듀서부터 컨슈머, 스트리밍까지 다루어보았다. 하지만 이번 포스팅을 시작으로 조금더 내용을 다듬고 정리된 상태의 풀세트의 카프카 포스팅을 시작할 것이다.

 

카프카 시스템의 목표

  • 메시지 프로듀서와 컨슈머 사이의 느슨한 연결
  • 다양한 형태의 데이터 사용 시나리오와 장애 처리 지원을 위한 메시지 데이터 유지
  • 빠른 처리 시간을 지원하는 구성 요소로 시스템의 전반적인 처리량을 최대화
  • 이진 데이터 형식을 사용해서 다양한 데이터 형식과 유형을 관리
  • 기존의 클러스터 구성에 영향을 주지 않고 일정한 서버의 확장성을 지원

 

카프카의 구조

카프카 토픽에서 모든 메시지는 바이트의 배열로 표현되며, 카프카 프로듀서는 카프카 토픽에 메시지를 저장하는 애플리케이션이다. 이렇게 프로듀서가 보낸 데이터를 저장하고 있는 모든 토픽은 하나 이상의 파티션으로 나뉘어져있다. 각 토픽에 대해 여러개로 나뉘어져 있는 파티션은 메시지를 도착한 순서에 맞게 저장한다.(내부적으로는 timestamp를 가지고 있다.) 카프카에서는 프로듀서와 컨슈머가 수행하는 두 가지 중요한 동작이 있다. 프로듀서는 로그 선행 기입 파일 마지막에 메시지를 추가한다. 컨슈머는 주어진 토픽 파티션에 속한 로그 파일에서 메시지를 가져온다. 물리적으로 각 토픽은 자신에게 할당된 하나 이상의 파티션을 다른 브로커(카프카 데몬 서버)들에게 균등하게 분배된다.

 

이상적으로 카프카 파이프라인은 브로커별로 파티션과 각 시스템의 모든 토픽에 대해 균등하게 분배되어야 한다. 컨슈머는 토픽에 대한 구독 또는 이런 토픽에서 메시지를 수신하는 애플리케이션이다.

 

이러한 카프카 시스템은 고가용성을 위한 클러스터를 지원한다. 전형적인 카프카 클러스터는 다중 브로커로 구성된다. 클러스터에 대한 메시지 읽기와 쓰기 작업의 부하 분산을 돕는다. 각 브로커는 자신의 상태를 저장하지 않지만 Zookeeper를 사용해 상태 정보를 유지한다. 각각의 토픽 파티션에는 리더로 활동하는 브로커가 하나씩 있고, 0개 이상의 팔로워를 갖는다. 여느 리더/팔로워 관계와 비슷하게 리더는 해당하는 파티션의 읽기나 쓰기 요청을 관리한다. 여기서 Zookeeper는 카프카 클러스터에서 중요한 요소로, 카프카 브로커와 컨슈머를 관리하고 조정한다. 그리고 카프카 클러스터 안에서 새로운 브로커의 추가나 기존 브로커의 장애를 감시한다. 예전 카프카 버전에서는 파티션 오프셋 관리도 Zookeeper에서 관리하였지만 최신 버전의 카프카는 오프셋 관리를 위한 토픽이 생성되어 오프셋관련된 데이터를 하나의 토픽으로 관리하게 된다.

 

 

메시지 토픽

메시징 시스템에서 메시지는 어디간에 저장이 되어 있어야한다. 카프카는 토픽이라는 곳에 메시지를 바이트 배열 형태로 저장한다. 각 토픽은 비즈니스 관점에서 하나의 카테고리가 될 수 있다. 다음은 메시지 토픽에 대한 용어 설명이다.

 

  • 보관 기간 : 토픽 안의 메시지는 처리 시간과 상관없이 정해진 기간 동안에만 메시지를 저장하고 있는다. 기본 값은 7일이며 사용자가 값 변경이 가능하다.
  • 공간 유지 정책 : 메시지의 크기가 설정된 임계값에 도달하면 메시지를 지우도록 설정가능하다. 카프카 시스템 구축시 충분한 용량 계획을 수립해야 원치않은 삭제가 발생하지 않는다.
  • 오프셋 : 카프카에 할당된 각 메시지는 오프셋이라는 값이 사용된다. 토픽은 많은 파티션으로 구성돼 있으며, 각 파티션은 도착한 순서에 따라 메시지를 저장하고, 컨슈머는 이러한 오프셋으로 메시지를 인식하고 특정 오프셋 이전의 메시지는 컨슈머가 수신했던 메시지로 인식한다.
  • 파티션 : 카프카 메시지 토픽은 1개 이상의 파티션으로 구성되어 분산 처리된다. 해당 파티션의 숫자는 토픽 생성시 설정가능하다. 만약 순서가 아주 중요한 데이터의 경우에는 파티션 수를 1개로 지정하는 것도 고려할 수 있다.(아무리 파티션이 시계열로 데이터가 저장되지만 여러 파티션에 대한 메시지 수신은 어떠한 순서로 구독될지 모르기 때문이다.)
  • 리더 : 파티션은 지정된 복제 팩터에 따라 카프카 클러스터 전역에 걸쳐 복제된다. 각 파티션은 리더 브로커와 팔로워 브로커를 가지며 파티션에 대한 모든 읽기와 쓰기 요청은 리더를 통해서만 진행된다.

 

메시지 파티션

각 메시지는 파티션에 추가되며 각 단위 메시지는 오프셋으로 불리는 숫자에 맞게 할당된다. 카프카는 유사한 키를 갖고 있는 메시지가 동일한 파티션으로 전송되도록 하며, 메시지 키의 해시 값을 산출하고 해당 파티션에 메시지를 추가한다. 여기서 중요하게 짚고 넘어가야 할 것이 있다. 각 단위 메시지에 대한 시간적인 순서는 토픽에 대해서는 보장되지 않지만, 파티션 안에서는 항상 보장된다. 즉, 나중에 도착한 메시지가 항상 파티션의 끝 부분에 추가됨을 의미한다. 예를 들어 설명하자면,

 

A 토픽은 a,b,c라는 파티션으로 구성된다는 가정이다. 만약 라운드 로빈 방식으로 메시지가 각 파티션에 분배가 되는 옵션을 적용했다고 생각해보자. 1,2,3,4,5,6이라는 메시지가 pub되었고 각 메시지는 a-1,4 / b-2,5 / c-3,6 와 같이 파티션에 분배된다. 1,2,3,4,5,6이라는 순서로 메시지가 들어왔고 해당 메시지는 시간적인 순서와 라운드로빈 방식으로 a,b,c라는 파티션에 배분되었다. 또한 각 파티션 안을 살펴보면 확실히 시간 순서로 배분되었다. 하지만 컨슈머 입장에서는 a,b,c파티션에서 데이터를 수신했을 경우 1,4,2,5,3,6이라는 메시지 순서로 데이터를 수신하게 될것이다.(a,b,c순서로 수신, 하지만 파티션 수신순서는 바뀔수 있다.) 무엇을 의미할까? 위에서 말한것과 같이 파티션 내에서는 시간적인 순서를 보장하지만 전체적인 토픽관점에서는 순서가 고려되지 않는 것이다. 즉, 순서가 중요한 데이터는 동일한 키를 사용해 동일 파티션에만 데이터를 pub하거나 혹은 토픽에 파티션을 하나만 생성해 하나의 파티션에 시간적인 순서로 데이터를 저장되게 해야한다.

 

많은 수의 파티션을 구성하는 경우의 장단점

  • 높은 처리량 보장 : 파티션을 여러개 구성한다면 병렬 처리되어 높은 처리량을 보장할 것이다. 왜냐하면 여러 파티션에 대해 쓰기 동작이 여러 스레드를 이용해 동시에 수행되기 때문이다. 또한 하나의 컨슈머 그룹 내에서 하나의 파티션에 대해 하나의 컨슈머가 할당되기 때문에 여러 파티션의 메시지를 여러 컨슈머가 동시에 수신하여 병렬처리가 가능하다. 여기서 중요한 것은 동일한 컨슈머 그룹 안의 하나의 파티션에 대해 여러 컨슈머가 읽을 수 없다는 것이다.
  • 프로듀서 메모리 증가 : 만약 파티션 수가 많아 진다면 일시적으로 프로듀서의 버퍼의 메모리가 과도해질 수 있어, 토픽에 메시지를 보내는데 문제가 생길 수 있으므로 파티션 수는 신중히 고려해야한다.
  • 고가용성 문제 : 여러 파티션을 생성하므로서 카프카 클러스터의 고가용성을 지원한다. 즉, 리더인 파티션이 중지되면 팔로워중에 하나가 리더로 선출될 것이다. 하지만 너무 과중한 파티션 수는 리더 선출에 지연이 생길 가능성이 있기 때문에 신중히 고려해야한다.

복제와 복제로그

복제는 카프카 시스템에서 신뢰성있는 시스템을 구현하기 위해 가장 중요한 부분이다. 각 토픽 파티션에 대한 메시지 로그의 복제본은 카프카 클러스터 내의 여러 서버에 걸쳐서 관리되고, 각 토픽마다 복제 팩터를 다르게 지정가능하다.

 

일반적으로 팔로워는 리더의 로그 복사본을 보관하는데, 이는 리더가 모든 팔로워로부터 ACK를 받기 전까지는 메시지를 커밋하지 않는다는 것을 의미한다.(ack=all 설정시)

 

메시지 프로듀서 

일반적으로 프로듀서는 파티션으로 데이터를 쓰기 않고, 메시지에 대한 쓰기 요청을 생성해서 리더 브로커에게 전송한다. 그 이후 파티셔너가 메시지의 해시 값을 계산하여 프로듀서로 하여금 어느 파티션에 메시지를 pub할지 알 수 있도록 한다.

 

일반적으로 해시 값은 메시지 키를 갖고 계산하며, 메시지 키는 카프카의 토픽으로 메시지를 기록할 때 제공된다. null 키를 갖는 메시지는 분산 메시징을 지원하는 파티션에 대해 라운드 로빈 방식으로 분배된다. 카프카에서의 각 파티션은 한 개의 리더를 가지며, 각 읽기와 쓰기 요청은 리더를 통해 진행된다.

 

프로듀서는 설정에 따라 메시지의 ACK를 기다리며, 일반적으로 모든 팔로워 파티션에 대해 복제가 완료되면 커밋을 완료한다. 또한 커밋이 완료되지 않으면 읽기 작업은 허용되지 않는다. 이는 메시지의 손실을 방지한다. 그렇지만 ACK 설정을 1로 설정할 수 있는데, 이경우 리더 파티션이 하나의 팔로워에게만 복제 완료 응답을 받으면 커밋을 완료한다. 이 경우 처리 성능은 좋아지지만 메시지의 손실은 어느정도 감수 해야한다. 즉, 메시지의 손실을 허용하고 빠른 처리 시간을 원하는 경우에 사용할 수 있는 설정이다.

 

메시지 컨슈머

카프카 토픽을 구독하는 역할을 하는 애플리케이션이다. 각 컨슈머는 컨슈머 그룹에 속해 있으며, 일부 컨슈머 그룹은 여러개의 컨슈머를 포함한다. 위에서도 간단히 설명하였지만 동일 그룹의 컨슈머들은 동시에 같은 토픽의 서로다른 파티션에서 메시지를 읽어온다. 하지만 서로 다른 그룹의 컨슈머들은 같은 토픽에서 데이터를 읽어와 서로 영향을 미치지 않으면서 메시지 사용이 가능하다.

 

주키퍼의 역할

  • 컨트롤러 선정 : 컨트롤러는 파티션 관리를 책임지는 브로커 중에 하나이며, 파티션 관리는 리더 선정, 토픽 생성, 파티션 생성, 복제본 관리 등을 포함한다. 하나의 노드 또는 서버가 꺼지면 카프카 컨트롤러는 팔로워 중에서 파티션 리더를 선정한다. 카프카는 컨트롤러를 선정하기 위해 주키퍼의 메타데이터를 정보를 이용한다.
  • 브로커 메타데이터 : 주키퍼는 카프카 클러스터의 일부인 각 브로커에 대해 상태 정보를 기록한다.
  • 토픽 메타데이터 : 주키퍼는 또한 파티션 수, 특정한 설정 파라미터 등의 토픽 메타 데이터를 기록한다.

이외에 주키퍼는 더 많은 역할을 담당하고 있다.

 

여기까지 간단히 카프카에 대한 소개였다. 이후 포스팅들에서는 프로듀서,컨슈머,스트리밍,클러스터 구축 등의 내용을 몇 차례에 거쳐 다루어 볼 것이다.

posted by 여성게
:
Middleware/Kafka&RabbitMQ 2019. 3. 30. 00:45

Kafka - Kafka Stream API(카프카 스트림즈) - 2



이전 카프카 스트림즈 포스팅에서는 간단하게 카프카 스트림즈 API를 다루어보았습니다. 이번 2번째 카프카 스트림즈 포스팅은 조금더 깊게 카프카 스트림즈에 대해 알아보려고 합니다.


Kafka Streams는 Kafka 프로듀서 및 컨슈머를 이용하여 들어오는 메시지를 즉각적으로 가공하여 또 다른 토픽으로 메시지를 내보낼 수 있습니다. 이러한 카프카 스트림즈를 사용하는 기업들을 소개하자면 New York Times는 카프카와 카프카 스트림즈를 이용하여 독자들을 위한 실시간 컨첸츠를 저장하고 배포합니다.그리고 라인 같은 경우는 서비스끼리 통신하기 위한 중앙 데이터 허브로 카프카를 사용합니다. 그리고 카프카 스트림즈를 이용하여 토픽을 데이터를 안정적으로 가공하고 필터링하여 컨슈머가 효율적으로 메시지를 컨슘할 수 있게 합니다. 이러한 많은 기업들이 안정적으로 실시간 데이터를 처리하기 위해 카프카와 카프카 스트림즈를 이용합니다.



카프카 메시징 계층은 데이터를 저장하고 전송하기 위해 데이터를 파티션 단위로 분할한다. 카프카 스트림즈 역시 파티션된 토픽을 기반으로 병렬 처리 모델의 논리 단위로 작업을 진행한다. 카프카 스트림즈는 키/값 형태의 스트림 데이터이므로 스트림 데이터의 키값으로 토픽내 특정 파티션으로 라우팅된다. 일반적으로 파티션 개수만큼 컨슈머 그룹안의 컨슈머들이 생성되듯이 카프카 스트림즈도 역시 파티션 개수만큼 태스크가 생성되어 병렬로 데이터 스트림을 처리할 수 있다. 또한 작업스레드 수를 조정할 수 있어 더욱 효율적인 병렬처리가 가능하다.


Required configuration parameters


  • application.id - 스트림 처리 응용 프로그램의 식별자 아이디이다. Kafka 클러스터 내에서 유일한 값이어야 한다.
  • bootstrap.servers - Kafka 인스턴스 호스트:포트 정보이다.


Optional configuration parameters(중요도 보통이상의 설정값)


  • cache.max.bytes.buffering - 모든 스레드에서 레코드 캐시에 사용할 최대 메모리 바이트 수입니다.(default 10485760 bytes)
  • client.id - 요청시 서버에 전달할 ID 문자열이다. 카프카 스트림즈 내부적으로 프로듀서,컨슈머에게 전달된다(default empty string)
  • default.deserialization.exception.handler - DeserializationExceptionHandler인터페이스를 구현하는 예외 처리 클래스이다.

(default  LogAndContinueExceptionHandler)

  • default.production.exception.handler - ProductionExceptionHandler인터페이스를 구현하는 예외 처리 클래스이다.

  (default DefaultProductionExceptionHandler)

  • key.serde - record key에 대한 직렬화/역직렬화 클래스이다. Serde 인터페이스를 구현한다.(default Serdes.ByteArray().getClass().getName() )
  • num.standby.replicas - 각 태스크의 대기 복제본 수이다.(default 0)
  • num.stream.threads - 스트림을 처리할 스레드 수이다.(default 1)
  • replication.factor - 복제본 수이다.(default 1)
  • retries - 처리 실패시 재시도 횟수(default 0)
  • retry.backoff.ms - 재시도 요청 간격 시간(default 100밀리세컨드)
  • state.dir - 상태 저장소 디렉토리 위치(default /tmp/kafka-streams)
  • value.serde - record value에 대한 직렬화/역직렬화 클래스이다. key.serde와 동일.


default.deserialization.exception.handler


기본 deserialization 예외 처리기를 사용하면 deserialize에 실패한 레코드의 예외를 관리할 수 있다. 예외 처리기는 throw된 예외에 따라 FAIL 또는 CONTINUE를 반환해야한다. FAIL은 스트림이 종료될 것이고, CONTINUE는 예외를 무시하고 계속해서 처리를 진행할 것이다. 내부적으로 제공하는 예외 핸들어가 있다.


  • LogAndContinueExceptionHandler - 예외가 발생하여도 계속해서 프로세스를 진행한다.
  • LogAndFailExceptionHandler - 예외가 발생하면 프로세스를 중지한다.

기본적으로 제공하는 핸들러 이외에도 사용자가 직접 정의하여 예외 핸들러를 구현할 수 있다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class SendToDeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {
    KafkaProducer<byte[], byte[]> dlqProducer;
    String dlqTopic;
 
    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
 
        log.warn("Exception caught during Deserialization, sending to the dead queue topic; " +
            "taskId: {}, topic: {}, partition: {}, offset: {}",
            context.taskId(), record.topic(), record.partition(), record.offset(),
            exception);
 
        dlqProducer.send(new ProducerRecord<>(dlqTopic, record.timestamp(), record.key(), record.value(), record.headers())).get();
 
        return DeserializationHandlerResponse.CONTINUE;
    }
 
    @Override
    public void configure(final Map<String, ?> configs) {
        dlqProducer = .. // get a producer from the configs map
        dlqTopic = .. // get the topic name from the configs map
    }
}
cs


위의 예제코드는 예외가 발생하면 DLQ 토픽에 메시지를 보낸 이후 계속해서 프로세스를 진행하는 예외 핸들러이다. 위에서 이야기하였듯이 반드시 FAIL,CONTINUE 둘중하나를 반환해야 한다.


  • default.production.exception.handler 


프로덕션 예외 처리기를 사용하여 너무 큰 메시지 데이터를 생성하는 등 브로커와 상호 작용하려고 할때 트리거되는 예외를 컨트롤 할 수 있습니다. 기본적으로 카프카는 DefaultProductionExceptionHandler를 제공한다. 이 예외 처리기도 역시 FAIL,CONTINUE등 하나를 반환하여 프로세스 진행여부를 결정해주어야 한다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;
 
public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
    public void configure(Map<String, Object> config) {}
 
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        } else {
            return ProductionExceptionHandlerResponse.FAIL;
        }
    }
}
 
Properties settings = new Properties();
 
// other various kafka streams settings, e.g. bootstrap servers, application id, etc
 
settings.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
             IgnoreRecordTooLargeHandler.class);
cs



Stream DSL


  • branch - 제공되는 predicate에 따라서 적절히 여러개의 KStream으로 분할한다.(KStream[] 반환)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
 * branch
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam {
 
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        source.foreach((key,value)->log.info("key,value ={}",key+" "+value));
        KStream<StringString>[] streamArr = source.branch(
            (key,value) -> value.startsWith("A"),
            (key,value) -> value.startsWith("B"),
            (key,value) -> true
        );
        
        streamArr[0].to("stream-exam-output1");
        streamArr[1].to("stream-exam-output2");
        streamArr[2].to("stream-exam-output3");
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


위의 코드는 stream-exam-input이라는 토픽에 "A"로 시작하는 데이터가 들어오면 stream-exam-output1, "B"로 시작하는 데이터가 들어오면 stream-exam-output2, 그 밖의 데이터는 stream-exam-output3으로 보내는 간단한 branch 예제이다. branch에는 적절하게 Predicate를 리스트 형태로 작성한다. 데이터의 값에 따라 다른 처리를 해야할때에는 branch를 사용하면 각각 다른 처리를 하는 토픽으로 메시지를 내보낼 수 있을 것 같다.


  • filter - 적절한 Predicate을 인자로 받아 메시지를 필터링한다.(KStream 반환)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
 * filter
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam2 {
 
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        source.filter( (key,value)->value.length()>3 ).to("stream-exam-output");;
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


위의 예제는 토픽으로 메시지를 입력받아 value 값의 길이가 3보다 크다면 stream-exam-output 토픽으로 메시지를 내보내는 예제이다.


  • filterNot - filter와 반대되는 개념이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
 * filterNot
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam3 {
 
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        source.filterNot( (key,value)->value.length()>3 ).to("stream-exam-output");;
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs

fliter와는 반대되는 개념이다. 크게 설명할 부분을 없을 것같다.

  • flatMap - 하나의 레코드를 이용해 0개 혹은 하나 이상의 레코드를 생성한다. flatMap은 key,value가 바뀔 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
 * flatMap
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam4 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        source.flatMap(
            new KeyValueMapper() {
                @Override
                public Object apply(Object key, Object value) {
                    List<KeyValue<Object, Object>> list = new LinkedList<>();
                    String keyStr = (String)key;
                    String valueStr = (String)value;
                    list.add(KeyValue.pair(valueStr.toUpperCase(),value));
                    list.add(KeyValue.pair(valueStr.toLowerCase(),value));
                    return list;
                }
            }
        ).to("stream-exam-output");;
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs

위의 예제는 메시지를 받아서 해당 메시지의 값을 대문자,소문자를 키로하여 2개의 레코드를 만드는 예제이다.

  • flatMapValues - 원본 레코드의 키를 유지하면서 하나의 레코드를 가져와서 0개 혹은 하나 이상의 레코드를 생성한다. value의 값과 값의 타입이 바뀔 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
 * flatMapValue
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam5 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        source.flatMapValues(value->Arrays.asList(value.split(" "))).to("stream-exam-output");
//        source.flatMapValues(
//                new ValueMapper() {
//                    @Override
//                    public Object apply(Object value) {
//                        String valueStr = (String)value;
//                        return Arrays.asList(valueStr.split(" "));
//                    }
//                }
//        );
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs

위의 예제는 메시지의 값에 공백을 포함하고 있으면 해당 메시지를 공백기준으로 split하여 list로 반환한다. 그럼 해당 List는 flat되어 여러개의 String Stream으로 변환되어 stream-exam-output 토픽으로 메시지가 전달된다.

  • GroupByKey - 기존 키로 레코드를 그룹핑한다. 스트림이나 테이블을 집계하고 후속작업을 위해 키로 그룹화하여 파티션하는 것을 보장한다. 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
 * GroupByKey
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam6 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        KGroupedStream<StringString> groupedStream = source.flatMap(
            new KeyValueMapper() {
                @Override
                public Object apply(Object key, Object value) {
                    List<KeyValue<Object, Object>> list = new LinkedList<>();
                    String keyStr = (String)key;
                    String valueStr = (String)value;
                    list.add(KeyValue.pair(value,valueStr.toUpperCase()));
                    list.add(KeyValue.pair(value,valueStr.toLowerCase()));
                    return list;
                }
            }
        ).groupByKey();
        
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


스트림의 기존 키값을 이용하여 그룹핑한다.


GroupBy - 새로운 키로 그룹핑한다. 테이블을 그룹핑할 때는 새로운 값과 값 유형을 지정할 수도 있다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
 * GroupBy
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam7 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        KGroupedStream<StringString> groupedStream = source.groupBy(
                (key,value)->value, Serialized.with(Serdes.String(),Serdes.String())
        );
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


위의 예제와 같이 기존 스트림에서 받은 키값을 그대로 사용하는 것이 아니라 변경하여 사용할 수 있다.(예제는 value를 키로 사용한다)


  • map - 하나의 레코드를 가져와서 다른 하나의 레코드를 생성한다. 타입을 포함하여 키와 값을 수정할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
 * map
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam8 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        source.map(
            (key,value)->KeyValue.pair(value, key)
        );
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


메시지를 받아서 키와 값을 바꿔주는 map 예제이다.


  • mapValues - 하나의 레코드를 받아서 값을 바꿔준다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
 * mapValues
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam9 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        source.mapValues(
            (value)->value+"_map"
        ).to("stream-exam-output");
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


메시지를 받아서 값에 "_map"이라는 문자열을 추가하여 수정하였다.


  • merge - 두 스트림의 레코드를 하나의 스트림으로 merge한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
 * merge
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam10 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        source.foreach((key,value)->log.info("key,value ={}",key+" "+value));
        KStream<StringString>[] streamArr = source.branch(
            (key,value) -> value.startsWith("A"),
            (key,value) -> value.startsWith("B"),
            (key,value) -> true
        );
        
        KStream<StringString> merge1 = streamArr[0].merge(streamArr[1]);
        merge1.merge(streamArr[2]).to("stream-exam-output");
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs



  • selectKey - 각 레코드에 새키를 할당한다. 마치 map을 이용하여 key값을 바꾸는 것과 동일하다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
 * selectKey
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam11 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
 
        source.selectKey(
  //리턴되는 값이 키가된다.
            (key,value)->value.charAt(0)+""
        ).to("stream-exam-output");
            
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


  • toStream - KTable을 스트림으로 가져온다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Slf4j
public class WordCounter {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
      
        final StreamsBuilder builder = new StreamsBuilder();
        
        NoriAnalyzer analyzer = new NoriAnalyzer();
      
        KStream<StringString> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(analyzer.analyzeForString(value).split(" ")))
                //return하는 value가 키가 된다. 리턴값으로 그룹핑된 카프카스트림 객체를 리턴한다. KGroupedStream
                .groupBy(new KeyValueMapper<StringStringString>() {
                  @Override
                  public String apply(String key, String value) {
                    log.info("key = {},value = {}",key,value);
                    return value;
                  }
                })
                //count()를 호출하여 해당 키값으로 몇개의 요소가 있는지 체크한다. 그리고 해당 데이터를 스토어에(KeyValueStore<Bytes,byte[]> counts-store) 담고
                //변경이 있는 KTable에 대해서만 결과값을 리턴한다.
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
                .toStream() //KTable -> Stream
                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
      
        final Topology topology = builder.build();
        System.out.println(topology.describe());
      
        final KafkaStreams streams = new KafkaStreams(topology, props);
      
        try {
          streams.start();
        } catch (Throwable e) {
          System.exit(1);
        }
    }
}
 
cs


지금까지는 메시지의 상태가 없는 StreamDSL이었다. 다음으로 알아볼 DSL은 이전 메시지의 상태를 참조하는 상태기반 스트림이다. 아마 실시간으로 데이터 스트림을 처리할때 상태가 필요없는 처리도 있겠지만, 오늘하루 사용자가 많이 문의한 단어등 단어의 빈도수를 계산해야하는 스트림이 있다고 해보자. 그렇다면 지금 이순간 전까지 해당 단어는 몇번이 누적되었는지를 알아야 이시간 이후로 들어오는 같은 단어도 이전 빈도수에 누적하여 통계를 낼 것이다. 이렇게 상태에 기반하여 처리해야하는 스트림을 처리하는 것이 상태기반 스트림이다.



다음 포스팅에서 다루어볼 내용은 Stateful transformations이다. 레퍼런스의 양이 굉장히 많은 부분으로 조금 정리한 후 포스팅하려한다.


posted by 여성게
: