Web/Spring 2023. 11. 5. 15:16

https://jessyt.tistory.com/m/151?category=966697

 

Spring-Kafka Lifecycle

이번 글에서는 Spring-Kafka의 Lifecycle에 대해서 작성해보겠습니다. 목차 Lifecycle Lifecycle Management 주의사항 1. Lifecycle @KafkaListener는 Application Context 안에 Bean이 아닙니다. @KafkaListener는 KafkaListenerEndpointR

jessyt.tistory.com


해당 포스팅 말고 다음 포스팅에도 읽어볼만한 주제가
많음

https://shining-life.tistory.com/m/3

 

4.1.3. Receiving Messages - (1) MessageListenerContainer

카프카 메시지를 수신하는 방법 두 가지 1. MessageListenerContainer Configuring 2. @KafkaListener 어노테이션을 사용하여, 메시지 리스너를 구현 Message Listeners message listener를 위해 제공되는 8가지 인터페이스

shining-life.tistory.com

nack() 설명

https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.3/reference/html/spring-cloud-stream-binder-kafka.html#kafka-consumer-properties

 

Spring Cloud Stream Kafka Binder Reference Guide

This guide describes the Apache Kafka implementation of the Spring Cloud Stream Binder. It contains information about its design, usage, and configuration options, as well as information on how the Stream Cloud Stream concepts map onto Apache Kafka specifi

docs.spring.io

spring cloud stream kafka docs

spring kafka streams functional 방식으로 컨슈머를 구성하면 왠지..batch mode가 false 이고 레코드 단위로 컨슘 and 커밋하는듯함. 그래서 일전에 반드시 실패하는 로직 경우 커밋을 하지 못하고 무한 컨슘(이건 더 알아봐야 할듯함)

일단 위 설명에는 아래와 같이 써있음

If the ackMode is not set and batch mode is not enabled, RECORD ackMode will be used.



spring kafka docs

https://docs.spring.io/spring-kafka/docs/

 

Index of /spring-kafka/docs

 

docs.spring.io

 

posted by 여성게
:
Web/Spring 2023. 11. 5. 14:35

https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-offsets

 

Spring for Apache Kafka

When using Spring for Apache Kafka in a Spring Boot application, the Apache Kafka dependency versions are determined by Spring Boot’s dependency management. If you wish to use a different version of kafka-clients or kafka-streams, and use the embedded ka

docs.spring.io


https://hanseom.tistory.com/m/174

 

08. 스프링 카프카 컨슈머(Spring Kafka Consumer)

스프링 카프카 컨슈머는 기존 컨슈머를 2개의 타입으로 나누고 커밋을 7가지로 나누어 세분화 했습니다. 1. 타입 레코드 리스너(MessageListener): 단 1개의 레코드를 처리합니다. (스프링 카프카 컨슈

hanseom.tistory.com


BATCH 타입일 경우 poll로 가져온 모든 레코드 처리가 완료된 후에 한번에 커밋한다.

MANUAL 타입일 경우 커밋을 하면 다음 poll때 커밋한다. 리스너에서 레코드 단위로 처리하게 되어 매번 acknowledge()를 호출하면 BATCH 타입과 동일하게 동작한다.(AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용해야 한다.)

MANUAL_IMMEDIATE 타입은 acknowledge() 호출시 즉시 커밋한다.(AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용해야 한다.)

 

https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/message-listeners.html

 

Message Listeners :: Spring Kafka

When you use a message listener container, you must provide a listener to receive data. There are currently eight supported interfaces for message listeners. The following listing shows these interfaces:

docs.spring.io

스프링 카프카 메시지 리스너 레퍼런스

posted by 여성게
:
Web/Spring 2023. 11. 2. 22:30

https://m.youtube.com/watch?v=XBXmHCy1EBA&pp=ygUT64yA7Jqp65-JIO2KuOuemO2UvQ%3D%3D


https://m.youtube.com/watch?v=qzHjK1-07fI&pp=ygUT64yA7Jqp65-JIO2KuOuemO2UvQ%3D%3D

posted by 여성게
:
Web/Spring 2021. 11. 17. 18:21

웹플럭스에서 블록킹 연산을 발생시키는 채널이 있다면, 이벤트 채널을 관리하는 이벤트 루프 자체에 블럭킹이 발생하기 때문에 전체적으로 요청 처리를 하나도 못하는 문제가 발생할 수 있다. 그렇기 때문에 블럭킹을 발생시키는 연산이 있을 경우 스케쥴을 분리시켜주는 것이 좋고, 실제로 리액터에서도 이러한 것을 고려해 스케쥴러 생성 팩토리 메서드를 제공한다.

위 두개의 팩토리 메서드는 non-blocking 연산을 위한 스케쥴러 팩토리 메서드이다. 오래 걸리는 연산 등을 이벤트 루프 쓰레드에서 분리하고 싶을 때 사용하며, 블럭킹 연산이 포함되지 않은 연산에서만 사용해야한다. 만약 블록킹 연산에 대해 스케쥴을 분리하고 싶다면 boundedElastic()을 이용하면 된다.

 

사용법은 아래와 같다.

 

 

그런데, 블록킹 연산에 대해 스케쥴 분리하면 된다는 것은 알겠는데 이걸 어떻게 일일이 찾아서 분리해줄까? 이것은 메서드 시그니쳐로 약속하자.

 

 

위 두개의 메서드는 동기 메서드이며, 첫번째 메서드는 동기코드이며 블럭킹을 유발시키는 코드이다.(InterruptedException 발생) 세번째 메서드는 블록킹 콜이 없는 비동기 메서드이다. 첫번째 두번째 메서드는 사용하는 쪽에서 스케쥴 분리를 신경쓰면 되고 세번째 메서드는 해당 메서드 안에서 스케쥴 분리 처리를 해주어야한다. 근데 진짜 위 메서드 시그니처를 지키면 블록킹 콜이 발생하지 않을까?.. 이것을 찾기위한 도구로 BlockHound가 있다.

 

 

BlockHound는 운영환경에 같이 띄우면 안된다.(실제 바이트 코드등의 영향을 줄 수 있어서) 그렇기에 테스트 코드와 결합하여 사전에 블록킹 연산을 찾아내는 용도로만 사용하자 !(비효율적인 연산 등을 잡는 역할로는 사용 불가)

 

출처: IfKakao

posted by 여성게
:
Web/Spring 2020. 6. 25. 16:03

 

오늘 다루어볼 내용은 spring data mongo + querydsl 연동 및 간단한 예제를 다루어볼 것이다. 예제 환경은 아래와 같다.

 

- gradle : 6.4.1
- spring boot : 2.3.1.RELEASE

 

모든 코드는 아래 깃헙을 참고하자.

 

yoonyeoseong/spring-mongo-querydsl

Contribute to yoonyeoseong/spring-mongo-querydsl development by creating an account on GitHub.

github.com

 

<gradle 설정>

아래는 spring data mongo와 querydsl 연동을 위한 gradle 설정이다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
buildscript {
    ext {
        queryDslVersion = '4.3.0'
    }
    repositories {
        maven {
            url "https://plugins.gradle.org/m2/"
        }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:2.3.1.RELEASE")
    }
}
 
plugins {
    id 'org.springframework.boot' version '2.3.1.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id "com.ewerk.gradle.plugins.querydsl" version '1.0.10'
    id 'java'
    id 'idea'
}
 
group = 'com.levi'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '14'
 
repositories {
    mavenCentral()
}
 
dependencies {
    compile 'org.springframework.boot:spring-boot-starter-data-mongodb-reactive'
    compile 'org.springframework.boot:spring-boot-starter-webflux'
    compile('org.springframework.boot:spring-boot-configuration-processor')
 
    compile "com.querydsl:querydsl-mongodb:${queryDslVersion}"
 
    compile 'org.projectlombok:lombok'
 
    annotationProcessor(
            'org.springframework.boot:spring-boot-configuration-processor',
            "com.querydsl:querydsl-apt:${queryDslVersion}",
            'org.projectlombok:lombok'
    )
 
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
    testImplementation 'io.projectreactor:reactor-test'
}
 
test {
    useJUnitPlatform()
}
 
def querydslSrcDir = 'src/main/querydsl'
 
querydsl {
    springDataMongo = true
    querydslSourcesDir = querydslSrcDir
}
 
sourceSets {
    main {
        java {
            srcDirs = ['src/main/java', querydslSrcDir]
        }
    }
}
 
compileQuerydsl{
    options.annotationProcessorPath = configurations.querydsl
}
 
configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
    querydsl.extendsFrom compileClasspath
}
 
project.afterEvaluate {
    project.tasks.compileQuerydsl.options.compilerArgs = [
            "-proc:only",
            "-processor", project.querydsl.processors() + ',lombok.launch.AnnotationProcessorHider$AnnotationProcessor'
    ]
}
 
cs

 

작성중...

 

 

<참고>

 

Spring Data MongoDB - Reference Documentation

As of version 3.6, MongoDB supports the concept of sessions. The use of sessions enables MongoDB’s Causal Consistency model, which guarantees running operations in an order that respects their causal relationships. Those are split into ServerSession inst

docs.spring.io

 

gradle 프로젝트에서 querydsl 설정하기

gradle 4.6 / querydsl 4.2.1 / spring-data-jpa 1.11.13.RELEASE / spring-data-mongodb 1.10.8.RELEASE이 환경을 어떻게 gradle 설정으로 푸는지 정리힙니다.

mingpd.github.io

 

ewerk/gradle-plugins

A collection of Gradle plugins. Contribute to ewerk/gradle-plugins development by creating an account on GitHub.

github.com

posted by 여성게
:
Web/Spring 2020. 6. 25. 15:56

 

Spring Data MongoDB - Reference Documentation

As of version 3.6, MongoDB supports the concept of sessions. The use of sessions enables MongoDB’s Causal Consistency model, which guarantees running operations in an order that respects their causal relationships. Those are split into ServerSession inst

docs.spring.io

 

posted by 여성게
:
Web/Spring 2020. 5. 15. 14:12

 

MongoDbConfig를 작성할때, 몽고디비 서버 호스트관련하여 ClusterSettings.Builder를 작성해줘야하는데, mongo host에 모든 클러스터 서버 호스트를 명시하지 않고, 하나의 DNS(여러 서버를 하나로 묶은) 혹은 여러 서버 리스트 중 하나의 primary 호스트(ex. primary host를 명시하면 밑에 예외는 발생하지 않지만, 읽기 부하분산이 안된다.)만 명시한경우에는 반드시 multiple mode를 명시해주어야 한다. 내부적으로 host의 갯수를 보고 single mode인지 multiple mode인지 판단하기 때문이다. 해당 코드는 아래와 같다.

 

private ClusterSettings(final Builder builder) {
        // TODO: Unit test this
        if (builder.srvHost != null) {
            if (builder.srvHost.contains(":")) {
                throw new IllegalArgumentException("The srvHost can not contain a host name that specifies a port");
            }

            if (builder.hosts.get(0).getHost().split("\\.").length < 3) {
                throw new MongoClientException(format("An SRV host name '%s' was provided that does not contain at least three parts. "
                        + "It must contain a hostname, domain name and a top level domain.", builder.hosts.get(0).getHost()));
            }
        }

        if (builder.hosts.size() > 1 && builder.requiredClusterType == ClusterType.STANDALONE) {
            throw new IllegalArgumentException("Multiple hosts cannot be specified when using ClusterType.STANDALONE.");
        }

        if (builder.mode != null && builder.mode == ClusterConnectionMode.SINGLE && builder.hosts.size() > 1) {
            throw new IllegalArgumentException("Can not directly connect to more than one server");
        }

        if (builder.requiredReplicaSetName != null) {
            if (builder.requiredClusterType == ClusterType.UNKNOWN) {
                builder.requiredClusterType = ClusterType.REPLICA_SET;
            } else if (builder.requiredClusterType != ClusterType.REPLICA_SET) {
                throw new IllegalArgumentException("When specifying a replica set name, only ClusterType.UNKNOWN and "
                                                   + "ClusterType.REPLICA_SET are valid.");
            }
        }

        description = builder.description;
        srvHost = builder.srvHost;
        hosts = builder.hosts;
        mode = builder.mode != null ? builder.mode : hosts.size() == 1 ? ClusterConnectionMode.SINGLE : ClusterConnectionMode.MULTIPLE;
        requiredReplicaSetName = builder.requiredReplicaSetName;
        requiredClusterType = builder.requiredClusterType;
        localThresholdMS = builder.localThresholdMS;
        serverSelector = builder.packServerSelector();
        serverSelectionTimeoutMS = builder.serverSelectionTimeoutMS;
        maxWaitQueueSize = builder.maxWaitQueueSize;
        clusterListeners = unmodifiableList(builder.clusterListeners);
    }

 

ClusterSettings.Builder.build 메서드의 일부인데, mode를 set하는 부분에 mode를 명시적으로 넣지 않았다면 작성된 호스트 갯수를 보고 클러스터 모드를 결정한다. 만약 MonoDb 서버 여러개를 하나의 도메인으로 묶어 놓았다면, 보통 DNS하나만 설정에 넣기 마련인데, 이러면 write 요청이 secondary에 들어가게 되면 아래와 같은 에러가 발생하게 된다.(먄약 실수로 secondary host를 넣었다면 쓰기요청에 당연히 아래 예외가 계속 발생한다.)

 

MongoNotPrimaryException: Command failed with error 10107 (NotMaster): 'not master' on server bot-meta01-mongo1.dakao.io:27017. 

 

왠지, SINGLE모드일때는 secondary로 write요청이 들어왔을때 primary로 위임이 안되는듯하다.(이건 조사해봐야할듯함, 왠지 싱글모드이면 당연히 프라이머리라고 판단해서 그럴듯?..) 그렇기 때문에 클러스터는 걸려있고, 서버 리스트를 여러 서버를 묶은 DNS 하나만 작성한다면 반드시 ClusterSetting에 "MULTIPLE"을 명시해서 넣야야한다 !

posted by 여성게
:
Web/Spring 2020. 4. 29. 20:50

 

오늘 다루어볼 내용은 spring application.yaml(properties)파일들의 로드 규칙 및 순서이다. 기본적인 내용일수는 있겠지만 필자는 이번에 해당 순서의 중요성을 다시 한번 알게되서 한번더 정리해보려고 한다.

 

 

Spring Boot Features

If you need to call remote REST services from your application, you can use the Spring Framework’s RestTemplate class. Since RestTemplate instances often need to be customized before being used, Spring Boot does not provide any single auto-configured RestT

docs.spring.io

 

위 링크를 보면 PropertySource의 적용 순서가 나와있다. 우리가 신경 쓸 것은 application.properties와 application-{profiles}.properties의 순서이다. 위 링크에서는 application-{profiles}.properties가 더 우선 순위를 갖는다고 이야기하고 있다. 그 말은 무엇일까? 아래 간단히 application.properties 파일을 확인해보자.

 

#application.yaml
spring:
  application:
    name: name-1
    
#application-dev.yaml
spring:
  application:
    name: name-2

 

과연 위 yaml파일들을 모두 적용하고 나서 아래 코드를 실행한다면 어떤 값이 출력될까? 실행할때 VM option에

 

"-Dspring.profiles.active=dev"

 

을 넣어서 run 해야한다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Slf4j
@RestController
@SpringBootApplication
public class JenkinsSampleApplication implements CommandLineRunner {
 
    @Value("${spring.application.name}")
    private String appName;
 
    public static void main(String[] args) {
        SpringApplication.run(JenkinsSampleApplication.class, args);
    }
 
    @Override
    public void run(String... args) throws Exception {
        log.info(appName);
    }
 
}
cs

 

결과는 "name-2"가 출력된다. 그 말은 application-{profiles}.yaml이 우선 적용이 된다는 뜻이다. 그렇다면 아래와 같은 config가 있다면 어떨까?

 

#application.yaml
spring:
  application:
    name: name-1
    
#application-dev.yaml
spring:
  profiles:
    include: dev-common
  application:
    name: name-2
    
#application-dev-common.yaml
spring:
  application:
    name: name-3

 

출력결과는 "name-3"이다. 그 말은 application-dev.yaml에서 include한 application-dev-common.yaml이 가장 큰 우선순위를 갖는 것이다. 

 

#application.yaml
spring:
  profiles:
    include: dev-common
  application:
    name: name-1
    
#application-dev.yaml
spring:
  application:
    name: name-2
    
#application-dev-common.yaml
spring:
  application:
    name: name-3

 

위 파일은 application-dev-common.yaml파일을 application.yaml에 include하였다. 이때 결과는 어떻게 될것인가? "name-2"를 출력할 것이다. 그 이유는 application.yaml에서 include했지만 application-dev.yaml이 더 우선순위를 갖기 때문이다. 즉, include된 설정값이 가장 우선순위를 갖기 위해서는 application-{profiles}.yaml에 include해야한다.

(만약 application-{profiles}.yaml이 없었다면 application-dev-common.yaml의 설정인 "name-3" 설정이 적용이 될것이다.)

 

해당 규칙들을 잘 활용해서 관리하기 쉽게 설정값들을 유지하면 좋을 것 같다. 여기까지 간단하게 spring application 설정의 적용 규칙 및 순서에 대해 다루어보았다.

posted by 여성게
: