Middleware/Kafka&RabbitMQ

Kafka - Spring kafka Producer, Consumer 예제 코드 및 오프셋 커밋, 에러 핸들링 설명

여성게 2023. 11. 18. 01:29

아래는 ContainerFactory 및 conusmer, producer 설정들이다.

package com.spring.kafkasample.config

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.*
import org.springframework.kafka.listener.ContainerProperties


@EnableKafka
@Configuration
class KafkaConfig(private val kafkaErrorHandler: KafkaErrorHandler) {
    // RecordListener
    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = consumerFactory()
        factory.setConcurrency(1)
        // MANUAL 설정을 해줘야, 리스너에서 Acknowledge 오브젝트를 받을 수 있음.
        factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
        // 컨슈머 로직 처리시 예외 발생시 어떻게 처리할지를 정의
        // default로는 DefaultErrorHandler를 사용하는데 기본 동작구조는 최대 10회 retry 후
        // (backoff ms는 100ms)
        // 실패한 메시지를 커밋해버린다.(물론 커밋을 안하게 할 수 있다. 요건 뒤에서 설명)
        factory.setCommonErrorHandler(kafkaErrorHandler)
        return factory
    }

    // BatchListener
    @Bean
    fun batchKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = consumerFactory()
        factory.setConcurrency(1)
        // 배치 리스너를 사용한다는 설정
        factory.isBatchListener = true
        factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
        factory.setCommonErrorHandler(kafkaErrorHandler)
        return factory
    }

    @Bean
    fun consumerFactory(): ConsumerFactory<String, String> {
        return DefaultKafkaConsumerFactory(consumerConfigs())
    }

    @Bean
    fun consumerConfigs(): Map<String, Any> {
        val props = mutableMapOf<String, Any>()
        props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "0.0.0.0:29092"
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = false
        props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
        // 해당 설정은 한번에 poll()로 몇 개의 레코드를 가져오는지 설정.
        // AckMode.MANUAL일 때 커밋 단위가 되는 개수.
        // poll()로 2개의 레코드를 가져와서 해당 2개의 레코드를 다 처리완료되면
        // 2개가 한번에 커밋됨.
        props[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 2
        return props
    }

    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, String> {
        return KafkaTemplate(producerFactory())
    }

    @Bean
    fun producerFactory():ProducerFactory<String, String> {
        val config = mutableMapOf<String, Any>()
        config[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "0.0.0.0:29092"
        config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java

        return DefaultKafkaProducerFactory(config)
    }
}

 

아래는 에러 핸들러 코드이다.

 

package com.spring.kafkasample.config

import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.kafka.listener.CommonErrorHandler
import org.springframework.kafka.listener.MessageListenerContainer
import org.springframework.stereotype.Component
import java.lang.Exception

@Component
class KafkaErrorHandler: CommonErrorHandler {
    override fun handleRecord(
        thrownException: Exception,
        record: ConsumerRecord<*, *>,
        consumer: Consumer<*, *>,
        container: MessageListenerContainer
    ) {
        print("error: ${thrownException.message}, record: ${record.value()}")
    }

    // 해당 값이 false면 에러 핸들링 이후에도 오프셋 커밋을 하지 않음.
    override fun isAckAfterHandle(): Boolean {
        return true
    }
}

 

(max poll count 개수 만큼 폴링하고 각 메시지를 컨슘해서 커밋하는데, 커밋은 메시지 단위가 아니고 max poll count 만큼 다 처리된 후 한번에 커밋된다. AckMode.MANUAL 일때!)

 

위에서 에러 핸들링이 끝나면, 커밋이 된다고 하였는데 이 설정이 바로 isAckAfterHandler() 설정이다. 요게 false이면 에러 처리해도 오프셋 커밋이 되지 않음.(이건 당연히 true가 맞는게, 항상 예외가 발생하는 로직일 경우 무한으로 컨슘이 발생하기 때문이다.)

 

그렇다면 만약 한번의 poll()에서 2개를 가져왔고, 첫번째 메시지 처리에서 예외가 발생해서 에러 핸들링이 발생하면 어떻게 될까? 커밋? 아니다. 일단 넘어가고 2번째 메시지까지 처리가 완료되어야(성공이던 예외처리던) 첫번째 예외처리된 메시지도 커밋된다.

 

아래는 리스너 설정이다.

 

package com.spring.kafkasample.config

import com.spring.kafkasample.ProduceController
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.support.Acknowledgment
import org.springframework.stereotype.Component

@Component
class KafkaListener {
    @KafkaListener(
        clientIdPrefix = "record-listener",
        topics = ["test-topic"],
        groupId = "test-topic-group",
        containerFactory = "kafkaListenerContainerFactory"
    )
    fun recordListener(message: ProduceController.Message, acknowledgment: Acknowledgment) {
        print("message: $message")
        acknowledgment.acknowledge()
    }

    @KafkaListener(
        clientIdPrefix = "batch-listener",
        topics = ["test-topic"],
        groupId = "test-topic-group",
        containerFactory = "batchKafkaListenerContainerFactory"
    )
    fun batchListener(messages: List<ProduceController.Message>, acknowledgment: Acknowledgment) {
        print("message: $messages")
        acknowledgment.acknowledge()
    }
}

 

예제에 배치 리스너가 있는데, 배치 리스터는 한번에 poll()으로 가져온 메시지 모두를 인자로 받는 것이다. 위 설정대로라면 배치 리스너는 2개의 메시지를 받을 것이다.(AckMode.MANUAL은 배치 리스너 일때 AckMode.BATCH와 동일하게 동작한다.)

 

또 하나 살펴보아야할 것이 있다. 스프링 카프카에서는  브로커에서 메시지를 가져올 때, max poll record 만큼 가져오지 않고 fetch max byte size만큼 가져오는데, 만약 fetch로 4개를 가져왔다 가정해보자.

 

 

a,b,c,d -> 1번째 poll()에 a,b를 가져옴 -> a, b를 처리하다 커밋을 하지 못함

 

위와 같은 상황이 발생하였다면 다음 동작은 어떻게 될까? a,b를 다시 컨슘한다? 아니다. 스프링 카프카는 내부적으로 fetch한 데이터의 오프셋을 따로 관리하고 있기때문에 a,b 커밋을 하지 못하더라도 c,d를 가져온다. (물론 보통은 예외가 발생해서 커밋을 못하는 상황일텐데 그럴경우 에러 핸들러에서 처리 후에 isAckAfterHandler() 설정에 의해 커밋이 될것이다.)