Middleware/Kafka&RabbitMQ 2019. 3. 28. 16:41

Kafka - Spring cloud stream kafka(스프링 클라우드 스트림 카프카)



이전 포스팅까지는 카프카의 아키텍쳐, 클러스터 구성방법, 자바로 이용하는 프로듀서,컨슈머 등의 글을 작성하였다. 이번 포스팅은 이전까지 작성된 지식을 바탕으로 메시징 시스템을 추상화한 구현체인 Spring Cloud Stream을 이용하여 카프카를 사용하는 글을 작성하려고 한다. 혹시라도 카프카에 대해 아직 잘모르는 사람들이 이 글을 본다면 이전 포스팅을 한번 참고하고 와도 좋을 것같다.(이번에 작성하는 포스팅은 Spring Cloud stream 2.0 레퍼런스 기준으로 작성하였다.)

그리고 이번 포스팅에서 진행하는 모든 예제는 카프카를 미들웨어로 사용하는 예제이고, 카프카는 클러스터를 구성하였다.


▶︎▶︎▶︎Spring Cloud Stream v2.0 Reference

▶︎▶︎▶︎2019/03/12 - [Kafka&RabbitMQ] - Kafka - Kafka(카프카)의 동작 방식과 원리

▶︎▶︎▶︎2019/03/13 - [Kafka&RabbitMQ] - Kafka - Kafka(카프카) cluster(클러스터) 구성 및 간단한 CLI사용법

▶︎▶︎▶︎2019/03/16 - [Kafka&RabbitMQ] - Kafka - Kafka Producer(카프카 프로듀서) In Java&CLI

▶︎▶︎▶︎2019/03/24 - [Kafka&RabbitMQ] - Kafka - Kafka Consumer(카프카 컨슈머) Java&CLI

▶︎▶︎▶︎2019/03/26 - [Kafka&RabbitMQ] - Kafka - Kafka Streams API(카프카 스트림즈)


스프링 클라우드 스트림을 이용하면 RabbitMQ,Kafka와 같은 미들웨어 메시지 시스템과는 종속적이지 않게 추상화된 방식으로 메시지 시스템을 이용할 수 있다.(support RabbitMQ,Kafka) 아래 그림은 스프링 클라우드 스트림의 애플리케이션 모델이다.

스프링 클라우드 스트림 애플리케이션과 메시지 미들웨어 시스템은 직접 붙지는 않는다. 중간에 스프링 클라우드 스트림이 제공하는 바인더 구현체를 중간에 두고 통신을 하기 때문에 애플리케이션에서는 미들웨어 독립적으로 추상화된 방식으로 개발 진행이 가능하다. 그리고 애플리케이션과 바인더는 위의 그림과 같이 inputs, outputs 채널과 통신을 하게 된다.

  • Binder : 외부 메시징 시스템과의 통합을 담당하는 구성 요소입니다.
  • Binding(input/output) : 외부 메시징 시스템과 응용 프로그램 간의 브리지 (대상 바인더에서 생성 한 메시지 생성자  소비자 ).
  • Middleware : RabbitMQ, Kafka와 같은 메시지 시스템.

바인더 같은 경우는 스프링이 설정에서 읽어 미들웨어에 해당하는 바인더를 구현체로 제공해준다. 물론 RabbitMQ, Kafka를 동시에 사용 가능하다. 그리고 바인딩 같은 경우는 기본으로 Processor(input,output),Source(output),Sink(input)라는 바인딩을 인터페이스로 제공한다. 이러한 스프링 클라우드 스트림을 이용하여 작성한 완벽히 동작하는 코드의 예제이다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@SpringBootApplication
@EnableBinding(Processor.class)
public class MyApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }
 
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public String handle(String value) {
        System.out.println("Received: " + value);
        return value.toUpperCase();
    }
}
cs


이 코드는 Processor라는 바인딩 채널을 사용하고, handle이라는 메소드에서 Processor의 input 채널에서 메시지를 받아서 값을 한번 출력하고 그대로 output 채널로 메시지를 보내는 코드이다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface Sink {
 
  String INPUT = "input";
 
  @Input(Sink.INPUT)
  SubscribableChannel input();
 
}
public interface Source {
 
  String OUTPUT = "output";
 
  @Output(Source.OUTPUT)
  MessageChannel output();
 
}
 
public interface Processor extends Source, Sink {}
cs


스프링 클라우드 스트림에서 기본적으로 제공하는 바인딩 채널이다. 만약 이 채널들 이외에 채널을 정의하고 싶다면 위와 같이 인터페이스로 만들어주고, @EnableBinding에 매개변수로 넣어주면된다.(매개변수는 여러개의 인터페이스를 받을 수 있다.)


1
2
3
4
5
6
7
8
9
10
11
public interface Barista {
 
    @Input
    SubscribableChannel orders();
 
    @Output
    MessageChannel hotDrinks();
 
    @Output
    MessageChannel coldDrinks();
}
cs

또한 채널 바인딩 어노테이션(@Input,@Output) 매개변수로 채널이름을 넣어줄 수 있다.

1
@EnableBinding(value = { Orders.class, Payment.class })
cs


스프링 클라우드 스트림에서 바인딩 가능한 채널타입은 MessageChannel(inbound)와 SubscribableChannel(outbound) 두개이다.

1
2
3
4
5
6
public interface PolledBarista {
 
    @Input
    PollableMessageSource orders();
    . . .
}
cs


지금까지는 이벤트 기반 메시지이지만 위처럼 Pollable한 채널을 바인딩 할 수 있다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Autowire
private Source source
 
public void sayHello(String name) {
    source.output().send(MessageBuilder.withPayload(name).build());
}
 
@Autowire
private MessageChannel output;
 
public void sayHello(String name) {
    output.send(MessageBuilder.withPayload(name).build());
}
 
@Autowire
@Qualifier("myChannel")
private MessageChannel output;
cs


정의한 채널에 대한 @Autowired하여 직접 사용도 가능하다. 그리고 @Qualifier 어노테이션을 이용해 다중 채널이 정의되어 있을 경우 특정한 채널을 주입받을 수 있다.


@StreamListener 사용

스프링 클라우드 스트림은 다른 org.springframework.messaging의 어노테이션도 같이 사용가능하다.(@Payload,@Headers,@Header)


1
2
3
4
5
6
7
8
9
10
11
@EnableBinding(Sink.class)
public class VoteHandler {
 
  @Autowired
  VotingService votingService;
 
  @StreamListener(Sink.INPUT)
  public void handle(Vote vote) {
    votingService.record(vote);
  }
}
cs


1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
@Slf4j
public class KafkaListener {
    
    @StreamListener(ExamProcessor.INPUT)
    @SendTo(ExamProcessor.OUTPUT2)
    public Exam listenMessage(@Payload Exam payload,@Header("contentType"String header) {
        log.info("input message = {} = {}",payload.toString(),header);
        return payload;
    }
}
 
=>결과 : input message = Exam(id=id_2, describe=test message != application/json
 
cs


또한 @SendTo 어노테이션을 추가로 붙여서 메시지를 수신하고 어떠한 처리를 한 후에 메시지를 출력채널로 내보낼 수 있다.


1
2
3
4
5
6
7
8
9
10
11
12
@EnableBinding(Processor.class)
public class TransformProcessor {
 
  @Autowired
  VotingService votingService;
 
  @StreamListener(Processor.INPUT)
  @SendTo(Processor.OUTPUT)
  public VoteResult handle(Vote vote) {
    return votingService.record(vote);
  }
}
cs


바로 위의 코드는 즉, 인바운드 채널에서 메시지를 받아서 그대로 해당 데이터를 리턴하여 아웃바운드 채널에 내보내는 예제이다. 실제로는 메소드 내에 비지니스 로직이 들어가 적절히 데이터 조작이 있을 수 있다.


@StreamListener for Content-based routing

조건별로 @StreamListener 주석처리가 된 인입채널 메소드로 메시지를 유입시킬 수 있다. 하지만 이 기능에는 밑에와 같은 조건을 충족해야한다.


  • 반환값이 있으면 안된다.
  • 개별 메시지 처리 메소드여야한다.

조건은 어노테이션 내의 condition 인수에 SpEL 표현식에 의해 지정된다. 그리고 조건과 일치하는 모든 핸들러는 동일한 스레드에서 호출되며 핸들러에 대한 호출이 발생하는 순서는 가정할 수 없다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class TestPojoWithAnnotatedArguments {
 
    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bogey'")
    public void receiveBogey(@Payload BogeyPojo bogeyPojo) {
       // handle the message
    }
 
    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bacall'")
    public void receiveBacall(@Payload BacallPojo bacallPojo) {
       // handle the message
    }
}
cs


위에서 얘기한 것과 같이 조건별로 메시지를 분기하는 경우 해당 채널로 @StreamListener된 메소드들은 모두 반환값이 void여야한다.(Sink.INPUT) 만약 위에서 위는 void, 아래에는 반환값이 있는 메소드로 선언한다면 예외가 발생한다.



Error Handler

Spring Cloud Stream은 오류 처리를 유연하게 처리하는 메커니즘을 제공한다. 오류 처리에는 크게 두가지가 있다.

  • application : 사용자 정의 오류처리이며, 애플리케이션 내에서 오류 처리를 진행한다.
  • system : 오류 처리가 기본 메시징 미들웨어의 기능에 따라 달라진다.


Application Error Handler


애플리케이션 레벨 오류 처리에는 두 가지 유형이 있다. 오류는 각 바인딩 subscription에서 처리되거나 전역 오류 핸들러가 모든 바인딩 subscription의 오류를 처리한다. 각 input 바인딩에 대해, Spring Cloud Stream은 <destinationName>.<groupName>.errors 설정으로 전용 오류 채널을 생성한다.


1
spring.cloud.stream.bindings.input.group=myGroup
cs


컨슈머 그룹을 설정한다.(만약 그룹명을 지정하지 않았을 경우 익명으로 그룹이 생성된다.)


1
2
3
4
5
6
7
8
9
@StreamListener(Sink.INPUT) // destination name 'input.myGroup'
public void handle(Person value) {
    throw new RuntimeException("BOOM!");
}
//만약 Sink.INPUT의 input 채널이름이 input이고, 해당 채널의 consumer group이 myGroup이라면
//해당 채널 단독의 에러 처리기의 inputChannel 매개변수의 값은 아래와 같다.("input.myGroup.errors")
@ServiceActivator(inputChannel = "input.myGroup.errors"//channel name 'input.myGroup.errors'
public void error(Message<?> message) {
    System.out.println("Handling ERROR: " + message);
}
cs


위와 같은 코드를 작성하면 Sink.INPUT.myGroup 채널의 전용 에러채널이 생기는 것이다. 하지만 이렇게 채널별이 아닌 전역 에러 처리기를 작성하려면 아래와 같은 코드를 작성하면 된다.


1
2
3
4
@StreamListener("errorChannel")
public void error2(Message<?> message) {
    log.error("Global Error Handling !");
}
cs


System Error Handling


System 레벨의 오류 처리는 오류가 메시징 시스템에 다시 전달되는 것을 의미하며, 모든 메시징 시스템이 동일하지는 않다. 즉, 바인더마다 기능이 다를 수 있다. 내부 오류 처리기가 구성되어 있지 않으면 오류가 바인더에 전파되고 바인더가 해당 오류를 메시징 시스템에 전파한다. 메시징 시스템의 기능에 따라 시스템은 메시지를 삭제하고 메시지를 다시 처리하거나 실패한 메시지를 DLQ로 보낼 수 있다. 해당 내용은 뒤에서 더 자세히 다룬다.


바인딩 정보 시각화


1
2
3
4
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
cs


의존성을 추가해준다.


1
management.endpoints.web.exposure.include=bindings
cs


application.propertis에 해당 설정을 추가해준 후에, host:port/actuator/bindings 를 호출하면 바인딩된 정보를 JSON형태로 받아볼 수 있다.



Binder Configuration Properties


Spring Auto configuration을 사용하지 않고 사용자 정의 바인더를 등록할때 다음 등록 정보들을 사용할 수 있다. 이 속성들은 모두 org.springframework.cloud.stream.config.BinderProperties 패키지에 정의되어있다. 그리고 해당 설정들은 모두 spring.cloud.stream.binders. 접두어가 붙는다. 밑에서 설명할 설정들은 모두 접두어가 생략된 형태이므로 실제로 작성할때는 접두어를 붙여준다.


  • type

바인더의 타입을 지정한다.(rabbit,kafka)

  • inheritEnvironment

애플리케이션 자체 환경을 상속하는지 여부

기본값 : true

  • environment

바인더 환경을 사용자가 직접 정의하는데 사용할 수 있는 설정이다.

기본값 : empty

  • defaultCandidate

바인더 구성이 기본 바인더로 간주되는지 또는 명시적으로 참조할 때만 사용할 수 있는지 여부 이 설정을 사용하면 기본 처리를 방해하지 않고 바인더 구성이 가능하다.

기본값 : true



Common Binding Properties


binding 설정입니다. 해당 설정은 spring.cloud.stream.bindings.<channelName> 접두어가 붙는다. 밑에서 설명할 설정은 모두 접두어가 생략된 설정이므로, 직접 애플리케이션을 설정할때에는 꼭 접두어를 붙여줘야한다.


  • destination

해당 채널을 메시지 시스템 토픽과 연결해주는 설정이다. 채널이 consumer로 바인딩되어 있다면, 여러 대상에 바인딩 될 수 있으며 대상 이름은 쉼표로 구분된 문자열이다.

  • group

컨슈머 그룹명설정이다. 해당 설정은 인바운드 바인딩에만 적용되는 설정이다.

기본값 : null

  • contentType

메시지의 컨텐츠 타입이다.

기본값 : null

  • binder

사용될 바인더를 설정한다.

기본값 : null



Consumer Properties

  • concurrency

인바운드 소비자의 동시성

기본값 : 1.

  • partitioned

컨슈머가 파티션된 프로듀서로부터 데이터를 수신하는지 여부입니다.

기본값 : false.

  • headerMode

none으로 설정하면 입력시 헤더 구문 분석을 사용하지 않습니다. 기본적으로 메시지 헤더를 지원하지 않으며 헤더 포함이 필요한 메시징 미들웨어에만 유효합니다. 이 옵션은 원시 헤더가 지원되지 않을 때 비 Spring Cloud Stream 애플리케이션에서 데이터를 사용할 때 유용합니다. 로 설정 headers하면 미들웨어의 기본 헤더 메커니즘을 사용합니다. 로 설정 embeddedHeaders하면 메시지 페이로드에 헤더가 포함됩니다.

기본값 : 바인더 구현에 따라 다릅니다.

  • maxAttempts

처리가 실패하면 다시 메시지를 처리하는 시도 횟수 (첫 번째 포함). 1로 설정하면 다시 메시지 처리를 시도하지 않는다.

기본값 : 3.

  • backOffInitialInterval

다시 시도 할 때 백 오프 초기 간격입니다.

기본값 : 1000.

  • backOffMaxInterval

최대 백 오프 간격.

기본값 : 10000.

  • backOffMultiplier

백 오프 승수입니다.

기본값 : 2.0.

  • instanceIndex

0보다 큰 값으로 설정하면이 소비자의 인스턴스 색인을 사용자 정의 할 수 있습니다 (다른 경우spring.cloud.stream.instanceIndex). 음수 값으로 설정하면 기본값은로 설정됩니다.(카프카의 경우 autoRebalence 설정이 false 일 경우)

기본값 : -1.

  • instanceCount

0보다 큰 값으로 설정하면이 소비자의 인스턴스 수를 사용자 정의 할 수 있습니다 (다른 경우 spring.cloud.stream.instanceCount). 음수 값으로 설정하면 기본값은로 설정됩니다.(카프카의 경우 autoRebalence 설정이 false 일 경우)

기본값 : -1.


Producer Properties

  • partitionKeyExpression

아웃 바운드 데이터를 분할하는 방법을 결정하는 SpEL 식입니다. set 또는 ifpartitionKeyExtractorClass가 설정되면이 채널의 아웃 바운드 데이터가 분할됩니다.partitionCount효과가 있으려면 1보다 큰 값으로 설정해야합니다. 

기본값 : null.

  • partitionKeyExtractorClass

PartitionKeyExtractorStrategy구현입니다. set 또는 if partitionKeyExpression가 설정되면이 채널의 아웃 바운드 데이터가 분할됩니다. partitionCount효과가 있으려면 1보다 큰 값으로 설정해야합니다. 

기본값 : null.

  • partitionSelectorClass

PartitionSelectorStrategy구현입니다.

기본값 : null.

  • partitionSelectorExpression

파티션 선택을 사용자 정의하기위한 SpEL 표현식. 

기본값 : null.

  • partitionCount

파티셔닝이 사용 가능한 경우 데이터의 대상 파티션 수입니다. 제작자가 분할 된 경우 1보다 큰 값으로 설정해야합니다. 카프카에서는 힌트로 해석됩니다. 이것보다 크고 대상 항목의 파티션 수가 대신 사용됩니다.

기본값 : 1.


Content-Type


Spring Cloud Stream은 contentType에 대해 세 가지 메커니즘을 제공한다.


  • Header 

contentType 자체를 헤더로 제공한다.

  • binding

spring.cloud.stream.bindings.<inputChannel>.content-type 설정으로 타입을 설정한다.

  • default

contentType이 명시적으로 설정되지 않은 경우 기본 application/json 타입으로 적용한다.


위의 순서대로 우선순위 적용이 된다.(Header>bindings>default) 만약 메소드 반환 타입이 Message 타입이면 해당 타입으로 메시지를 수신하고, 만약 일반 POJO로 반환타입이 정의되면 컨슈머에서 해당 타입으로 메시지를 수신한다.





Apache Kafka Binder


1
2
3
4
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
cs


의존성을 추가해준다.


Kafka Binder Properties

  • spring.cloud.stream.kafka.binder.brokers

Kafka 바인더가 연결될 브로커 목록이다.

기본값 : localhost

  • spring.cloud.stream.kafka.binder.defaultBrokerPort

brokers 설정에 포트 정보가 존재하지 않는다면 해당 호스트 리스트들이 사용할 포트를 지정해준다.

기본값 : 9092

  • spring.cloud.stream.kafka.binder.configuration

바인더로 작성된 모든 클라이언트에 전달 될 클라이언트 속성(프로듀서,컨슈머)의 키/값 맵이다. 이러한 속성은 프로듀서와 컨슈머 모두가 사용한다.

기본값 : empty map

  • spring.cloud.stream.kafka.binder.headers

바인더에 의해 전송되는 사용자 지정 헤더 목록이다.

기본값 : null

  • spring.cloud.stream.kafka.binder.healthTimeout

파티션 정보를 얻는 데 걸리는 시간(초). 

기본값 : 10

  • spring.cloud.stream.kafka.binder.requiredAcks

브로커에서 필요한 ack수이다.(해당 설명은 이전 포스팅에 설명되어있다.)

기본값 : 1

  • spring.cloud.stream.kafka.binder.minPartitionCount

autoCreateTopics,autoAddPartitions true를 설정했을 경우에만 적용되는 설정이다. 바인더가 데이터를 생성하거나 소비하는 주제에 대해 바인더가 구성하는 최소 파티션 수이다.

기본값 : 1

  • spring.cloud.stream.kafka.binder.replicationFactor

autoCreateTopics true를 설정했을 경우에 자동 생성된 토픽 복제요소 수이다.

기본값 : 1

  • spring.cloud.stream.kafka.binder.autoCreateTopics

true로 설정하면 토픽이 존재하지 않을 경우 자동으로 토픽을 만들어준다. 만약 false로 설정되어있다면 미리 토픽이 생성되어 있어야한다.

기본값 : true

  • spring.cloud.stream.kafka.binder.autoAddPartitions

true로 설정하면 바인더가 필요할 경우 파티션을 추가한다. 예를 들어 사용자의 메시지 유입이 증가하여 컨슈머를 추가하여 파티션수가 하나더 늘었다고 가정하자. 그러면 기존의 토픽의 파티션 수는 증설한 파티션의 총수보다 작을 것이고, 이 설정이 true라면 바인더가 자동으로 파티션수를 증가시켜준다.

기본값 : false



Kafka Consumer Properties


spring.cloud.stream.kafka.bindings.<inputChannel>.consumer 접두어가 붙는다.

  • autoRebalanceEnabled

파티션 밸런싱을 자동으로 처리해준다.

기본값 : true

  • autoCommitOffset

메시지가 처리되었을 경우, 오프셋을 자동으로 커밋할지를 설정한다.

기본값 : true

  • startOffset

새 그룹의 시작 오프셋이다. earliest, latest

기본값 : null(==eariest)

  • resetOffsets
consumer의 오프셋을 startOffset에서 제공한 값으로 재설정할지 여부
기본값 : false


Kafka Producer Properties

  • bufferSize

kafka 프로듀서가 전송하기 전에 일괄 처리하려는 데이터 크기이다.

기본값 : 16384

  • batchTimeout

프로듀서가 메시지를 보내기 전에 동일한 배치에 더 많은 메시지가 누적될 수 있도록 대기하는 시간. 예를 들어 버퍼사이즈가 꽉 차지 않았을 경우 얼마나 기다렸다고 메시지 처리할 것인가를 정하는 시간이다.

기본값 : 0


Partitioning with the Kafka Binder


순서가 중요한 메시지가 있을 경우가 있다. 이럴 경우에는 어떠한 키값을 같이 포함시켜 메시지를 발신함으로써 하나의 파티션에만 데이터를 보내 컨슈머쪽에서 데이터의 순서를 정확히 유지하여 데이터를 받아올 수 있다.


1
2
3
#partition-key-expression
spring.cloud.stream.bindings.exam-output.producer.partition-key-expression=headers['partitionKey']
spring.cloud.stream.bindings.exam-output.producer.partition-count=4
cs


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
    @Autowired
    private ExamProcessor processor;
    
    public void sendMessage(Exam exam,String partitionId) {
        log.info("Sending Message = {}",exam.toString());
        
        MessageChannel outputChannel = processor.outboundChannel();
        
        outputChannel.send(MessageBuilder
                                    .withPayload(exam)
                                    .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                                    .setHeader("partitionKey", partitionId)
                                    .build());
    }
 
    @StreamListener(ExamProcessor.INPUT)
    public void listenMessage(@Payload Exam payload,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int header) {
        log.info("input message = {} partition_id= {}",payload.toString(),header);
    }
cs


application.properties에 파티션키로 사용할 헤더의 키값을 설정하고, 프로듀서 쪽에서 키값을 포함시켜 데이터를 보낸 후에 컨슈머쪽에서 어떠한 파티션에서 데이터를 받았는지 확인해본다. 키값을 동일하게 유지한채 메시지를 보내면 하나의 파티션에서만 메시지를 받아온다. 그러나 파티션 키값을 적용하지 않고 메시지를 보내면 컨슈머가 받아오는 파티션 아이디는 계속해서 변경이 된다.


이번 포스팅은 내용이 조금 길어져서 여기까지만 작성하고 다음 포스팅에서 Spring Cloud Stream 기반에서 사용하는 kafka stream API에 대해 포스팅할 것이다. 이번 포스팅에서는 많은 설명들이 레퍼런스에 비해 빠져있다. 아는 내용은 작성하고 모르는 내용은 포함시킬경우 틀릴 가능성이 있기 때문에 레퍼런스와 비교해 내용에 차이가 있다. 혹시나 더 많은 설명이 필요하다면 직접 레퍼런스를 보면 될것 같다.

posted by 여성게
: