Middleware/Kafka&RabbitMQ 2023. 11. 18. 01:29

아래는 ContainerFactory 및 conusmer, producer 설정들이다.

package com.spring.kafkasample.config

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.*
import org.springframework.kafka.listener.ContainerProperties


@EnableKafka
@Configuration
class KafkaConfig(private val kafkaErrorHandler: KafkaErrorHandler) {
    // RecordListener
    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = consumerFactory()
        factory.setConcurrency(1)
        // MANUAL 설정을 해줘야, 리스너에서 Acknowledge 오브젝트를 받을 수 있음.
        factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
        // 컨슈머 로직 처리시 예외 발생시 어떻게 처리할지를 정의
        // default로는 DefaultErrorHandler를 사용하는데 기본 동작구조는 최대 10회 retry 후
        // (backoff ms는 100ms)
        // 실패한 메시지를 커밋해버린다.(물론 커밋을 안하게 할 수 있다. 요건 뒤에서 설명)
        factory.setCommonErrorHandler(kafkaErrorHandler)
        return factory
    }

    // BatchListener
    @Bean
    fun batchKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = consumerFactory()
        factory.setConcurrency(1)
        // 배치 리스너를 사용한다는 설정
        factory.isBatchListener = true
        factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
        factory.setCommonErrorHandler(kafkaErrorHandler)
        return factory
    }

    @Bean
    fun consumerFactory(): ConsumerFactory<String, String> {
        return DefaultKafkaConsumerFactory(consumerConfigs())
    }

    @Bean
    fun consumerConfigs(): Map<String, Any> {
        val props = mutableMapOf<String, Any>()
        props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "0.0.0.0:29092"
        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = false
        props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
        // 해당 설정은 한번에 poll()로 몇 개의 레코드를 가져오는지 설정.
        // AckMode.MANUAL일 때 커밋 단위가 되는 개수.
        // poll()로 2개의 레코드를 가져와서 해당 2개의 레코드를 다 처리완료되면
        // 2개가 한번에 커밋됨.
        props[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 2
        return props
    }

    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, String> {
        return KafkaTemplate(producerFactory())
    }

    @Bean
    fun producerFactory():ProducerFactory<String, String> {
        val config = mutableMapOf<String, Any>()
        config[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "0.0.0.0:29092"
        config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java

        return DefaultKafkaProducerFactory(config)
    }
}

 

아래는 에러 핸들러 코드이다.

 

package com.spring.kafkasample.config

import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.kafka.listener.CommonErrorHandler
import org.springframework.kafka.listener.MessageListenerContainer
import org.springframework.stereotype.Component
import java.lang.Exception

@Component
class KafkaErrorHandler: CommonErrorHandler {
    override fun handleRecord(
        thrownException: Exception,
        record: ConsumerRecord<*, *>,
        consumer: Consumer<*, *>,
        container: MessageListenerContainer
    ) {
        print("error: ${thrownException.message}, record: ${record.value()}")
    }

    // 해당 값이 false면 에러 핸들링 이후에도 오프셋 커밋을 하지 않음.
    override fun isAckAfterHandle(): Boolean {
        return true
    }
}

 

(max poll count 개수 만큼 폴링하고 각 메시지를 컨슘해서 커밋하는데, 커밋은 메시지 단위가 아니고 max poll count 만큼 다 처리된 후 한번에 커밋된다. AckMode.MANUAL 일때!)

 

위에서 에러 핸들링이 끝나면, 커밋이 된다고 하였는데 이 설정이 바로 isAckAfterHandler() 설정이다. 요게 false이면 에러 처리해도 오프셋 커밋이 되지 않음.(이건 당연히 true가 맞는게, 항상 예외가 발생하는 로직일 경우 무한으로 컨슘이 발생하기 때문이다.)

 

그렇다면 만약 한번의 poll()에서 2개를 가져왔고, 첫번째 메시지 처리에서 예외가 발생해서 에러 핸들링이 발생하면 어떻게 될까? 커밋? 아니다. 일단 넘어가고 2번째 메시지까지 처리가 완료되어야(성공이던 예외처리던) 첫번째 예외처리된 메시지도 커밋된다.

 

아래는 리스너 설정이다.

 

package com.spring.kafkasample.config

import com.spring.kafkasample.ProduceController
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.support.Acknowledgment
import org.springframework.stereotype.Component

@Component
class KafkaListener {
    @KafkaListener(
        clientIdPrefix = "record-listener",
        topics = ["test-topic"],
        groupId = "test-topic-group",
        containerFactory = "kafkaListenerContainerFactory"
    )
    fun recordListener(message: ProduceController.Message, acknowledgment: Acknowledgment) {
        print("message: $message")
        acknowledgment.acknowledge()
    }

    @KafkaListener(
        clientIdPrefix = "batch-listener",
        topics = ["test-topic"],
        groupId = "test-topic-group",
        containerFactory = "batchKafkaListenerContainerFactory"
    )
    fun batchListener(messages: List<ProduceController.Message>, acknowledgment: Acknowledgment) {
        print("message: $messages")
        acknowledgment.acknowledge()
    }
}

 

예제에 배치 리스너가 있는데, 배치 리스터는 한번에 poll()으로 가져온 메시지 모두를 인자로 받는 것이다. 위 설정대로라면 배치 리스너는 2개의 메시지를 받을 것이다.(AckMode.MANUAL은 배치 리스너 일때 AckMode.BATCH와 동일하게 동작한다.)

 

또 하나 살펴보아야할 것이 있다. 스프링 카프카에서는  브로커에서 메시지를 가져올 때, max poll record 만큼 가져오지 않고 fetch max byte size만큼 가져오는데, 만약 fetch로 4개를 가져왔다 가정해보자.

 

 

a,b,c,d -> 1번째 poll()에 a,b를 가져옴 -> a, b를 처리하다 커밋을 하지 못함

 

위와 같은 상황이 발생하였다면 다음 동작은 어떻게 될까? a,b를 다시 컨슘한다? 아니다. 스프링 카프카는 내부적으로 fetch한 데이터의 오프셋을 따로 관리하고 있기때문에 a,b 커밋을 하지 못하더라도 c,d를 가져온다. (물론 보통은 예외가 발생해서 커밋을 못하는 상황일텐데 그럴경우 에러 핸들러에서 처리 후에 isAckAfterHandler() 설정에 의해 커밋이 될것이다.)

posted by 여성게
:
Middleware/Kafka&RabbitMQ 2023. 11. 7. 22:35

https://stackoverflow.com/questions/64681806/not-acknowledging-the-kafka-message-at-all-in-manual-immediate-mode

 

Not acknowledging the kafka message at all in MANUAL_IMMEDIATE mode

I could not find any documentation related to this issue, hence the question. What happens if @KafkaListener method does not call acknowledgement.acknowledge() at all when the ackModeis set to

stackoverflow.com


nack()은 현재 실패한 메시지 전의 오프셋까지 커밋하고 나머지 오프셋메시지를 버리고 다음 폴링때 버린 오프셋부터 다시 폴링한다.

https://pula39.tistory.com/m/19

 

Kafka의 Auto Commit 에서 Auto는 당신이 생각하는 Auto가 아닐 수 있다.

환경설정을 할 때 auto라는 키워드가 나오면 작업자는 날먹을 꿈꾸며 행복해지는 한 편, 이 auto가 어디까지 자동으로 해주고 어디까지는 안해주는지 공포에 떨며 작업을 하게 된다. 나에게 kafka의

pula39.tistory.com

오토 커밋 문제점

posted by 여성게
:
Web/Spring 2023. 11. 5. 15:16

https://jessyt.tistory.com/m/151?category=966697

 

Spring-Kafka Lifecycle

이번 글에서는 Spring-Kafka의 Lifecycle에 대해서 작성해보겠습니다. 목차 Lifecycle Lifecycle Management 주의사항 1. Lifecycle @KafkaListener는 Application Context 안에 Bean이 아닙니다. @KafkaListener는 KafkaListenerEndpointR

jessyt.tistory.com


해당 포스팅 말고 다음 포스팅에도 읽어볼만한 주제가
많음

https://shining-life.tistory.com/m/3

 

4.1.3. Receiving Messages - (1) MessageListenerContainer

카프카 메시지를 수신하는 방법 두 가지 1. MessageListenerContainer Configuring 2. @KafkaListener 어노테이션을 사용하여, 메시지 리스너를 구현 Message Listeners message listener를 위해 제공되는 8가지 인터페이스

shining-life.tistory.com

nack() 설명

https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.3/reference/html/spring-cloud-stream-binder-kafka.html#kafka-consumer-properties

 

Spring Cloud Stream Kafka Binder Reference Guide

This guide describes the Apache Kafka implementation of the Spring Cloud Stream Binder. It contains information about its design, usage, and configuration options, as well as information on how the Stream Cloud Stream concepts map onto Apache Kafka specifi

docs.spring.io

spring cloud stream kafka docs

spring kafka streams functional 방식으로 컨슈머를 구성하면 왠지..batch mode가 false 이고 레코드 단위로 컨슘 and 커밋하는듯함. 그래서 일전에 반드시 실패하는 로직 경우 커밋을 하지 못하고 무한 컨슘(이건 더 알아봐야 할듯함)

일단 위 설명에는 아래와 같이 써있음

If the ackMode is not set and batch mode is not enabled, RECORD ackMode will be used.



spring kafka docs

https://docs.spring.io/spring-kafka/docs/

 

Index of /spring-kafka/docs

 

docs.spring.io

 

posted by 여성게
:
Web/Spring 2023. 11. 5. 14:35

https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-offsets

 

Spring for Apache Kafka

When using Spring for Apache Kafka in a Spring Boot application, the Apache Kafka dependency versions are determined by Spring Boot’s dependency management. If you wish to use a different version of kafka-clients or kafka-streams, and use the embedded ka

docs.spring.io


https://hanseom.tistory.com/m/174

 

08. 스프링 카프카 컨슈머(Spring Kafka Consumer)

스프링 카프카 컨슈머는 기존 컨슈머를 2개의 타입으로 나누고 커밋을 7가지로 나누어 세분화 했습니다. 1. 타입 레코드 리스너(MessageListener): 단 1개의 레코드를 처리합니다. (스프링 카프카 컨슈

hanseom.tistory.com


BATCH 타입일 경우 poll로 가져온 모든 레코드 처리가 완료된 후에 한번에 커밋한다.

MANUAL 타입일 경우 커밋을 하면 다음 poll때 커밋한다. 리스너에서 레코드 단위로 처리하게 되어 매번 acknowledge()를 호출하면 BATCH 타입과 동일하게 동작한다.(AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용해야 한다.)

MANUAL_IMMEDIATE 타입은 acknowledge() 호출시 즉시 커밋한다.(AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용해야 한다.)

 

https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/message-listeners.html

 

Message Listeners :: Spring Kafka

When you use a message listener container, you must provide a listener to receive data. There are currently eight supported interfaces for message listeners. The following listing shows these interfaces:

docs.spring.io

스프링 카프카 메시지 리스너 레퍼런스

posted by 여성게
:
Web/Spring 2023. 11. 2. 22:30

https://m.youtube.com/watch?v=XBXmHCy1EBA&pp=ygUT64yA7Jqp65-JIO2KuOuemO2UvQ%3D%3D


https://m.youtube.com/watch?v=qzHjK1-07fI&pp=ygUT64yA7Jqp65-JIO2KuOuemO2UvQ%3D%3D

posted by 여성게
:
Middleware/Kafka&RabbitMQ 2023. 10. 30. 15:59

https://d2.naver.com/helloworld/7181840

 

posted by 여성게
:
카테고리 없음 2023. 10. 10. 22:46

https://jongmin92.github.io/2019/03/03/Java/java-nio/

posted by 여성게
:
머신러닝 2023. 8. 30. 17:39

https://ratsgo.github.io/nlpbook/docs/language_model/tr_self_attention/

 

Self Attention

pratical tips for Natural Language Processing

ratsgo.github.io

 

posted by 여성게
: