Web/Spring
Spring kafka + micrometer tracing
여성게
2024. 5. 14. 20:03
build.gradle
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
plugins {
id("org.springframework.boot") version "3.2.5"
id("io.spring.dependency-management") version "1.1.4"
kotlin("jvm") version "1.9.23"
kotlin("plugin.spring") version "1.9.23"
}
group = "com.spring"
version = "0.0.1-SNAPSHOT"
val jvmVersion = JavaVersion.VERSION_17
java {
sourceCompatibility = jvmVersion
}
repositories {
mavenCentral()
}
dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.springframework.kafka:spring-kafka")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.3")
implementation("org.springframework.boot:spring-boot-starter-actuator")
// tracing
implementation("io.micrometer:micrometer-tracing-bridge-brave")
implementation("io.zipkin.reporter2:zipkin-reporter-brave")
implementation("io.micrometer:micrometer-registry-prometheus")
// test tracing
testImplementation("io.micrometer:micrometer-tracing-test")
testImplementation("io.micrometer:micrometer-tracing-integration-test")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.kafka:spring-kafka-test")
testImplementation("org.mockito.kotlin:mockito-kotlin:5.0.0")
testApi("org.mockito:mockito-inline:3.6.28")
}
tasks.withType<KotlinCompile> {
kotlinOptions {
freeCompilerArgs = listOf("-Xjsr305=strict")
jvmTarget = jvmVersion.toString()
}
}
tasks.withType<Test> {
useJUnitPlatform()
}
application.yml
spring:
application:
name: kafka-sample
kafka:
brokers: localhost:9092
topic: test-topic
group-id: test-group-id
max-poll-records: 3
enable-auto-commit: false
auto-offset-reset: latest
management:
zipkin:
tracing:
endpoint: http://localhost:9411/api/v2/spans # default
tracing:
sampling:
probability: 1.0
propagation:
type: b3_multi
enabled: true
KafkaConfig.kt
package com.spring.kafkasample.config
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.*
import org.springframework.kafka.listener.ContainerProperties
import org.springframework.kafka.support.micrometer.KafkaListenerObservation
import org.springframework.kafka.support.micrometer.KafkaTemplateObservation
@EnableKafka
@Configuration
class KafkaConfig(
@Value("\${kafka.brokers}")
private val brokers: String,
@Value("\${kafka.enable-auto-commit}")
private val enableAutoCommit: Boolean,
@Value("\${kafka.max-poll-records}")
private val maxPollRecords: Int,
@Value("\${kafka.auto-offset-reset}")
private val autoOffsetReset: String,
private val kafkaErrorHandler: KafkaErrorHandler
) {
@Bean
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.consumerFactory = consumerFactory()
factory.setConcurrency(1)
# observation config
factory.containerProperties.observationConvention = KafkaListenerObservation.DefaultKafkaListenerObservationConvention.INSTANCE
factory.containerProperties.isObservationEnabled = true
factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
factory.setCommonErrorHandler(kafkaErrorHandler)
return factory
}
@Bean
fun consumerFactory(): ConsumerFactory<String, String> {
return DefaultKafkaConsumerFactory(consumerConfigs())
}
@Bean
fun consumerConfigs(): Map<String, Any> {
val props = mutableMapOf<String, Any>()
props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = brokers
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = enableAutoCommit
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = autoOffsetReset
props[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = maxPollRecords
return props
}
@Bean
fun kafkaTemplate(): KafkaTemplate<String, String> {
val kafkaTemplate = KafkaTemplate(producerFactory())
# observation config
kafkaTemplate.setObservationEnabled(true)
kafkaTemplate.setObservationConvention(KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention.INSTANCE)
return kafkaTemplate
}
@Bean
fun producerFactory():ProducerFactory<String, String> {
val config = mutableMapOf<String, Any>()
config[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = brokers
config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
return DefaultKafkaProducerFactory(config)
}
}
나머지 consumer, producer 코드는 아래 링크를 참고.
https://coding-start.tistory.com/432