Web/Spring

Spring kafka + micrometer tracing

여성게 2024. 5. 14. 20:03

build.gradle.kts

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

// Build plugins: Spring Boot, Spring dependency management, and the Kotlin JVM
// and Kotlin Spring compiler plugins. Both Kotlin plugins are pinned to the
// same version (1.9.23).
plugins {
    id("org.springframework.boot") version "3.2.5"
    id("io.spring.dependency-management") version "1.1.4"
    kotlin("jvm") version "1.9.23"
    kotlin("plugin.spring") version "1.9.23"
}

// Coordinates under which this project is built.
group = "com.spring"
version = "0.0.1-SNAPSHOT"

// Single JVM level shared by the Java source compatibility setting and the
// Kotlin compiler's jvmTarget configured further down in this script.
val jvmVersion = JavaVersion.VERSION_17

java.sourceCompatibility = jvmVersion

// All dependencies are resolved from Maven Central.
repositories.mavenCentral()

// Runtime and test dependencies. Versions are omitted wherever the Spring
// dependency-management plugin supplies them.
dependencies {
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.springframework.kafka:spring-kafka")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")

    implementation("org.springframework.boot:spring-boot-starter-actuator")

    // tracing: Micrometer Tracing bridged to Brave, spans reported to Zipkin
    implementation("io.micrometer:micrometer-tracing-bridge-brave")
    implementation("io.zipkin.reporter2:zipkin-reporter-brave")

    // metrics
    implementation("io.micrometer:micrometer-registry-prometheus")

    // test tracing
    testImplementation("io.micrometer:micrometer-tracing-test")
    testImplementation("io.micrometer:micrometer-tracing-integration-test")

    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("org.springframework.kafka:spring-kafka-test")
    testImplementation("org.mockito.kotlin:mockito-kotlin:5.0.0")
    // mockito-inline is intentionally not declared: mockito-kotlin 5.x builds on
    // Mockito 5, where the inline mock maker is the default and the separate
    // mockito-inline artifact (previously pinned here at 3.6.28) no longer exists.
}

// Kotlin compilation: strict JSR-305 nullability handling, bytecode target
// aligned with jvmVersion. configureEach keeps task configuration lazy, and
// the compiler flag is appended so args set elsewhere are not discarded.
tasks.withType<KotlinCompile>().configureEach {
    kotlinOptions {
        freeCompilerArgs += "-Xjsr305=strict"
        jvmTarget = jvmVersion.toString()
    }
}

// Run tests on the JUnit Platform (JUnit 5).
tasks.withType<Test>().configureEach {
    useJUnitPlatform()
}

 

application.yml

spring:
  application:
    name: kafka-sample

# Custom application properties (not the spring.kafka.* namespace).
# KafkaConfig injects brokers, enable-auto-commit, max-poll-records and
# auto-offset-reset via @Value. topic and group-id are not read by KafkaConfig;
# presumably they are used by the producer/consumer code not shown here.
kafka:
  brokers: localhost:9092
  topic: test-topic
  group-id: test-group-id
  max-poll-records: 3
  # Auto commit is off; KafkaConfig sets the listener container to manual ack mode.
  enable-auto-commit: false
  auto-offset-reset: latest

# Tracing settings: Zipkin span endpoint, sampling and propagation format.
management:
  zipkin:
    tracing:
      endpoint: http://localhost:9411/api/v2/spans # default
  tracing:
    sampling:
      # 1.0 samples every trace; lower this outside local experiments.
      probability: 1.0
    propagation:
      type: b3_multi
    enabled: true

 

KafkaConfig.kt

package com.spring.kafkasample.config

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.*
import org.springframework.kafka.listener.ContainerProperties
import org.springframework.kafka.support.micrometer.KafkaListenerObservation
import org.springframework.kafka.support.micrometer.KafkaTemplateObservation


/**
 * Kafka producer and consumer wiring with Micrometer observation enabled on
 * both the listener container and the [KafkaTemplate].
 *
 * Connection and consumer settings come from the custom `kafka.*` properties.
 * No `group.id` is set in [consumerConfigs]; NOTE(review): listeners presumably
 * supply it themselves (e.g. on `@KafkaListener`) - confirm against the consumer code.
 *
 * @property brokers bootstrap servers shared by the producer and the consumer
 * @property enableAutoCommit value for the consumer's `enable.auto.commit`
 * @property maxPollRecords value for the consumer's `max.poll.records`
 * @property autoOffsetReset value for the consumer's `auto.offset.reset`
 * @property kafkaErrorHandler common error handler installed on the listener container factory
 */
@EnableKafka
@Configuration
class KafkaConfig(
    @Value("\${kafka.brokers}")
    private val brokers: String,
    @Value("\${kafka.enable-auto-commit}")
    private val enableAutoCommit: Boolean,
    @Value("\${kafka.max-poll-records}")
    private val maxPollRecords: Int,
    @Value("\${kafka.auto-offset-reset}")
    private val autoOffsetReset: String,
    private val kafkaErrorHandler: KafkaErrorHandler
) {
    /**
     * Listener container factory: a single consumer thread, manual
     * acknowledgment, the shared error handler, and observation enabled.
     */
    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = consumerFactory()
        factory.setConcurrency(1)
        // observation config
        factory.containerProperties.observationConvention =
            KafkaListenerObservation.DefaultKafkaListenerObservationConvention.INSTANCE
        factory.containerProperties.isObservationEnabled = true
        // Offsets are committed only when the listener acknowledges explicitly.
        factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
        factory.setCommonErrorHandler(kafkaErrorHandler)
        return factory
    }

    /** Consumer factory backed by [consumerConfigs]. */
    @Bean
    fun consumerFactory(): ConsumerFactory<String, String> {
        return DefaultKafkaConsumerFactory(consumerConfigs())
    }

    /** Raw consumer properties: String key/value deserializers plus the injected settings. */
    @Bean
    fun consumerConfigs(): Map<String, Any> = mapOf(
        // Same key as the producer constant ("bootstrap.servers"), but the
        // consumer-side constant is the right one for a consumer property map.
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to brokers,
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to enableAutoCommit,
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to autoOffsetReset,
        ConsumerConfig.MAX_POLL_RECORDS_CONFIG to maxPollRecords,
    )

    /** Template for sending String records, with observation enabled. */
    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, String> {
        val kafkaTemplate = KafkaTemplate(producerFactory())
        // observation config
        kafkaTemplate.setObservationEnabled(true)
        kafkaTemplate.setObservationConvention(
            KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention.INSTANCE
        )
        return kafkaTemplate
    }

    /** Producer factory using String key/value serializers against [brokers]. */
    @Bean
    fun producerFactory(): ProducerFactory<String, String> {
        val config = mapOf<String, Any>(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to brokers,
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
        )

        return DefaultKafkaProducerFactory(config)
    }
}

 

나머지 consumer, producer 코드는 아래 링크를 참고.

 

https://coding-start.tistory.com/432

 

Kafka - Spring kafka Producer, Consumer 예제 코드 및 오프셋 커밋, 에러 핸들링 설명

아래는 ContainerFactory 및 consumer, producer 설정들이다. package com.spring.kafkasample.config import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.serializ

coding-start.tistory.com