2020. 7. 26. 17:32ㆍMiddleware/Kafka&RabbitMQ
오늘 포스팅할 내용은 래빗엠큐이다. 그 동안에는 카프카를 사용할 일이 많아 카프카에 대한 포스팅이 주였는데, 이번에 래빗엠큐를 사용할 일이 생겨 간단히 래빗엠큐에 대해 간단히 다루어 볼것이다.(예제 코드는 위 깃헙에 올려놓았습니다.)
비동기 작업에 있어 큐를 사용하려면 중간에 메시지 브로커라는 개념이 존재하는데, 이러한 메시지 브로커에는 RabbitMQ, Kafka 등이 대표적으로 있다. 해당 포스트에서는 표준 MQ프로토콜인 AMQP를 구현한 RabbitMQ(래빗엠큐)에 대해 다루어볼 것이다.
간단하게 메시지큐는 아래 그림과 같은 워크 플로우로 이루어져있다.
대부분의 메시지큐는 프로듀서가 있고, 해당 프로듀서가 브로커로 메시지를 발행하면, 적절한 컨슈머가 해당 메시지를 구독(읽다)하는 구조이다. 그렇다면 래빗엠큐는 상세하게 어떠한 구조로 되어있을까?
래빗엠큐는 단순히 프로듀서가 브로커로 메시지를 전달하여 컨슈머가 큐를 읽어가는 구조라는 면에서는 동일하지만, 프로듀싱하는 과정에서 조금 더 복잡한 개념이 들어간다. 바로 exchange와 route, queue라는 개념이다.
간단하게 워크 플로우를 설명하자면 아래와 같다.
- Producer는 Message를 Exchange에게 보내게 됩니다.
- Exchange를 생성할때 Exchange의 Type을 정해야 합니다.
- Exchange는 Routing Key를 사용하여 적절한 Queue로 Routing을 진행합니다.
- Routing은 Exchange Type에 따라 전략이 바뀌게 됩니다.
- Exchange - Queue와 Binding이 완료된 모습을 볼 수 있습니다.
- Message 속성에 따라 적절한 Queue로 Routing이 됩니다.
- Message는 Consumer가 소비할때까지 Queue에 대기하게 됩니다.
- Consumer는 Message를 소비하게 됩니다.
위에서 1번에 Exchange라는 개념이 등장하는데, Exchange는 정해진 규칙으로 메시지를 라우팅하는 기능을 가지고 있다. 여기서 정해진 규칙은 크게 4가지가 존재하는데, 규칙들은 아래와 같다.
exchange routeing 전략
- Direct Exchange
- Message의 Routing Key와 정확히 일치하는 Binding된 Queue로 Routing
- Fanout Exchange
- Binding된 모든 Queue에 Message를 Routing
- Topic Exchange
- 특정 Routing Pattern이 일치하는 Queue로 Routing
- Headers Exchange
- key-value로 정의된 Header 속성을 통한 Routing
익스체인지는 위와 같은 전략으로 메시지를 라우팅하게 된다. 그리고 자주 사용되는 Exchange의 옵션으로는 아래와 같이 있다.
exchange options
- Durability
- 브로커가 재시작 될 때 남아 있는지 여부
- durable -> 재시작해도 유지가능
- transient -> 재시작하면 사라집니다.
- Auto-delete
- 마지막 Queue 연결이 해제되면 삭제
마지막으로 래빗엠큐에서 사용되는 용어가 있다.
- Vhost(virutal host)
- Virtual Host를 통해서 하나의 RabbitMQ 인스턴스 안에 사용하고 있는 Application을 분리할 수 있습니다.
- Connection
- 물리적인 TCP Connection, HTTPS -> TLS(SSL) Connection을 사용
- Channel
- 하나의 물리적인 Connection 내에 생성되는 가상의 Connection
- Consumer의 process나 thread는 각자 Channel을 통해 Queue에 연결 될 수 있습니다.
여기까지 간단하게 래빗엠큐의 동작과 용어에 대해서 다루어봤으니 실제 코드 레벨로 실습을 진행해본다.
docker rabbitmq 설치
sudo docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 \
--restart=unless-stopped -e RABBITMQ_DEFAULT_USER=username \
-e RABBITMQ_DEFAULT_PASS=password rabbitmq:management
위처럼 도커로 래빗엠큐를 설치해주고, http://localhost:15672(username/password)로 접속해보자.
springboot rabbitmq 예제
build.gradle
plugins {
id 'org.springframework.boot' version '2.3.2.RELEASE'
id 'io.spring.dependency-management' version '1.0.9.RELEASE'
id 'java'
}
group = 'com.levi'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '14'
configurations {
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-amqp'
implementation 'org.springframework.boot:spring-boot-starter-web'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
annotationProcessor "org.springframework.boot:spring-boot-configuration-processor"
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
}
testImplementation 'org.springframework.amqp:spring-rabbit-test'
}
test {
useJUnitPlatform()
}
application.yml
rabbitmq:
test:
username: username
password: password
host: localhost
port: 5672
virtualHost: levi.vhost
routeKey: test.route
exchangeName: test.exchange
queueName: testQueue
deadLetterExchange: dead.letter.exchange
deadLetterRouteKey: dead.letter.route
application.yml에서 프로퍼티를 읽어오는 클래스를 아래 하나 만들었다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
@Configuration
@ConfigurationProperties(prefix = "rabbitmq")
@Getter
@Setter
public class RabbitMqProperty {
private RabbitMqDetailProperty test;
@Getter
@Setter
public static class RabbitMqDetailProperty {
private String username;
private String password;
private String host;
private int port;
private String virtualHost;
private String routeKey;
private String exchangeName;
private String queueName;
private String deadLetterExchange;
private String deadLetterRouteKey;
}
}
|
cs |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
@SpringBootApplication(exclude = {
RabbitAutoConfiguration.class
})
@EnableRabbit
@RestController
@RequiredArgsConstructor
public class RabbitmqApplication {
private final RabbitTemplate testTemplate;
public static void main(String[] args) {
SpringApplication.run(RabbitmqApplication.class, args);
}
@GetMapping("/")
public String inQueue() {
testTemplate.convertAndSend(SampleMessage.of("message!!"));
return "success";
}
@Data
@AllArgsConstructor(staticName = "of")
@NoArgsConstructor
public static class SampleMessage {
private String message;
}
}
|
cs |
래빗엠큐 auto configuration을 꺼주었고, 메시지를 인큐하기 위한 컨트롤러를 하나 만들어 주었다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
@Slf4j
@Configuration
@RequiredArgsConstructor
public class RabbitMqConfig {
private final RabbitMqProperty rabbitMqProperty;
@Bean
public ConnectionFactory testConnectionFactory() {
final RabbitMqProperty.RabbitMqDetailProperty test = rabbitMqProperty.getTest();
final CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost(test.getHost());
factory.setPort(test.getPort());
factory.setUsername(test.getUsername());
factory.setPassword(test.getPassword());
factory.setVirtualHost(test.getVirtualHost());
return factory;
}
@Bean
public SimpleRabbitListenerContainerFactory testContainer(final ConnectionFactory testConnectionFactory) {
final SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(testConnectionFactory);
factory.setConnectionFactory(testConnectionFactory);
factory.setErrorHandler(t -> log.error("ErrorHandler = {}", t.getMessage()));
factory.setDefaultRequeueRejected(false); //true일 경우 리스너에서 예외가 발생시 다시 큐에 메시지가 쌓이게 된다.(예외상황 해결안될 시 무한루프)
factory.setMessageConverter(jacksonConverter());
//예외 발생시 recover할 수 있는 옵션, 위 에러 핸들러와 잘 구분해서 사용해야할듯
//위 핸들러와 적용 순서등도 테스트가 필요(혹은 둘다 동시에 적용되나?) // factory.setAdviceChain(
// RetryInterceptorBuilder
// .stateless()
// .maxAttempts(3) //최대 재시도 횟수
// .recoverer() //recover handler
// .backOffOptions(2000, 4, 10000) //2초 간격으로, 4번, 최대 10초 내에 재시도
// .build()
// );
return factory;
}
@Bean
public RabbitTemplate testTemplate(final ConnectionFactory testConnectionFactory, final MessageConverter jacksonConverter) {
final RabbitMqProperty.RabbitMqDetailProperty property = rabbitMqProperty.getTest();
final RabbitTemplate template = new RabbitTemplate();
template.setConnectionFactory(testConnectionFactory);
template.setMessageConverter(jacksonConverter);
template.setExchange(property.getExchangeName());
template.setRoutingKey(property.getRouteKey());
return template;
}
@Bean
public MessageConverter jacksonConverter() {
return new Jackson2JsonMessageConverter();
}
//app에서 큐만들거나 바인딩 하기 위해서는 RabbitAdmin 필요 @Bean
public RabbitAdmin testAdmin(final ConnectionFactory testConnectionFactory) {
return new RabbitAdmin(testConnectionFactory);
}
}
|
cs |
브로커 연결 및 메시지를 프로듀싱하기 위한 RabbitTemplate 설정이다. 이중 RabbitAdmin 빈을 만들고 있는 수동이 아니라, 앱에서 큐를 만들고 바인딩 하기 위해서는 RabbitAdmin이 빈으로 떠있어야한다. 기타 설정은 주석을 참고하자 !
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
@Configuration
@RequiredArgsConstructor
public class RabbitMqBindingConfig {
private final RabbitMqProperty rabbitMqProperty;
@Bean
public TopicExchange testTopicExchange() {
return ExchangeBuilder
.topicExchange(rabbitMqProperty.getTest().getExchangeName())
.durable(true)
.build();
}
@Bean
public TopicExchange dlExchange() {
return ExchangeBuilder
.topicExchange(rabbitMqProperty.getTest().getDeadLetterExchange())
.durable(true)
.build();
}
@Bean
public Queue testQueue() {
final RabbitMqProperty.RabbitMqDetailProperty testDetail = rabbitMqProperty.getTest();
return QueueBuilder
.durable(testDetail.getQueueName())
.deadLetterExchange(testDetail.getDeadLetterExchange())
.deadLetterRoutingKey(testDetail.getDeadLetterRouteKey())
.build();
}
@Bean
public Queue dlq() {
return QueueBuilder
.durable("DEAD_LETTER_QUEUE")
.build();
}
@Bean
public Binding testBinding(final Queue testQueue, final TopicExchange testTopicExchange) {
return BindingBuilder
.bind(testQueue)
.to(testTopicExchange)
.with(rabbitMqProperty.getTest().getRouteKey());
}
@Bean
public Binding dlBinding(final Queue dlq, final TopicExchange dlExchange) {
return BindingBuilder
.bind(dlq)
.to(dlExchange)
.with(rabbitMqProperty.getTest().getDeadLetterRouteKey());
}
}
|
cs |
프로듀싱과 컨슈밍을 위해 브로커에 exchange와 queue를 binding하는 설정이다. 하나 추가적으로는 컨슈밍에서 예외가 발생하였을 때, Recover를 위한 Dead Letter Queue를 선언하였다.
이제 앱을 실행시키고, localhost:8080으로 요청을 보내보자 ! 요청을 보내면 메시지를 프로듀싱 할것이고, 해당 메시지를 컨슈밍해서 로그를 찍게 될 것이다. 여기까지 간단하게 래빗엠큐에 대해 다루어봤다.
https://jonnung.dev/rabbitmq/2019/02/06/about-amqp-implementtation-of-rabbitmq/
https://nesoy.github.io/articles/2019-02/RabbitMQ
'Middleware > Kafka&RabbitMQ' 카테고리의 다른 글
[Kafka] Parallel Consumers, 메시지별 병렬 처리 (0) | 2023.10.30 |
---|---|
Spring cloud stream kafka - concurrency (1) | 2022.07.27 |
Apache Kafka - Kafka Producer(카프카 프로듀서) - 2 (1) | 2019.07.06 |
Apache Kafka - Kafka(카프카)란 ? 분산 메시징 플랫폼 - 1 (0) | 2019.07.06 |
Kafka - Kafka Stream API(카프카 스트림즈) - 2 (0) | 2019.03.30 |