'RabbitMQ'에 해당되는 글 2건
- 2020.07.26 :: RabbitMQ - 레빗엠큐 개념 및 동작방식, 실습
- 2019.02.18 :: Springboot - Rabbitmq를 이용한 비동기 메시징 서비스(리액티브 마이크로서비스) 2
오늘 포스팅할 내용은 래빗엠큐이다. 그 동안에는 카프카를 사용할 일이 많아 카프카에 대한 포스팅이 주였는데, 이번에 래빗엠큐를 사용할 일이 생겨 간단히 래빗엠큐에 대해 간단히 다루어 볼것이다.(예제 코드는 위 깃헙에 올려놓았습니다.)
비동기 작업에 있어 큐를 사용하려면 중간에 메시지 브로커라는 개념이 존재하는데, 이러한 메시지 브로커에는 RabbitMQ, Kafka 등이 대표적으로 있다. 해당 포스트에서는 표준 MQ프로토콜인 AMQP를 구현한 RabbitMQ(래빗엠큐)에 대해 다루어볼 것이다.
간단하게 메시지큐는 아래 그림과 같은 워크 플로우로 이루어져있다.
대부분의 메시지큐는 프로듀서가 있고, 해당 프로듀서가 브로커로 메시지를 발행하면, 적절한 컨슈머가 해당 메시지를 구독(읽다)하는 구조이다. 그렇다면 래빗엠큐는 상세하게 어떠한 구조로 되어있을까?
래빗엠큐는 단순히 프로듀서가 브로커로 메시지를 전달하여 컨슈머가 큐를 읽어가는 구조라는 면에서는 동일하지만, 프로듀싱하는 과정에서 조금 더 복잡한 개념이 들어간다. 바로 exchange와 route, queue라는 개념이다.
간단하게 워크 플로우를 설명하자면 아래와 같다.
- Producer는 Message를 Exchange에게 보내게 됩니다.
- Exchange를 생성할때 Exchange의 Type을 정해야 합니다.
- Exchange는 Routing Key를 사용하여 적절한 Queue로 Routing을 진행합니다.
- Routing은 Exchange Type에 따라 전략이 바뀌게 됩니다.
- Exchange - Queue와 Binding이 완료된 모습을 볼 수 있습니다.
- Message 속성에 따라 적절한 Queue로 Routing이 됩니다.
- Message는 Consumer가 소비할때까지 Queue에 대기하게 됩니다.
- Consumer는 Message를 소비하게 됩니다.
위에서 1번에 Exchange라는 개념이 등장하는데, Exchange는 정해진 규칙으로 메시지를 라우팅하는 기능을 가지고 있다. 여기서 정해진 규칙은 크게 4가지가 존재하는데, 규칙들은 아래와 같다.
exchange routeing 전략
- Direct Exchange
- Message의 Routing Key와 정확히 일치하는 Binding된 Queue로 Routing
- Fanout Exchange
- Binding된 모든 Queue에 Message를 Routing
- Topic Exchange
- 특정 Routing Pattern이 일치하는 Queue로 Routing
- Headers Exchange
- key-value로 정의된 Header 속성을 통한 Routing
익스체인지는 위와 같은 전략으로 메시지를 라우팅하게 된다. 그리고 자주 사용되는 Exchange의 옵션으로는 아래와 같이 있다.
exchange options
- Durability
- 브로커가 재시작 될 때 남아 있는지 여부
- durable -> 재시작해도 유지가능
- transient -> 재시작하면 사라집니다.
- Auto-delete
- 마지막 Queue 연결이 해제되면 삭제
마지막으로 래빗엠큐에서 사용되는 용어가 있다.
- Vhost(virutal host)
- Virtual Host를 통해서 하나의 RabbitMQ 인스턴스 안에 사용하고 있는 Application을 분리할 수 있습니다.
- Connection
- 물리적인 TCP Connection, HTTPS -> TLS(SSL) Connection을 사용
- Channel
- 하나의 물리적인 Connection 내에 생성되는 가상의 Connection
- Consumer의 process나 thread는 각자 Channel을 통해 Queue에 연결 될 수 있습니다.
여기까지 간단하게 래빗엠큐의 동작과 용어에 대해서 다루어봤으니 실제 코드 레벨로 실습을 진행해본다.
docker rabbitmq 설치
sudo docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 \
--restart=unless-stopped -e RABBITMQ_DEFAULT_USER=username \
-e RABBITMQ_DEFAULT_PASS=password rabbitmq:management
위처럼 도커로 래빗엠큐를 설치해주고, http://localhost:15672(username/password)로 접속해보자.
springboot rabbitmq 예제
build.gradle
plugins {
id 'org.springframework.boot' version '2.3.2.RELEASE'
id 'io.spring.dependency-management' version '1.0.9.RELEASE'
id 'java'
}
group = 'com.levi'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '14'
configurations {
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-amqp'
implementation 'org.springframework.boot:spring-boot-starter-web'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
annotationProcessor "org.springframework.boot:spring-boot-configuration-processor"
testImplementation('org.springframework.boot:spring-boot-starter-test') {
exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
}
testImplementation 'org.springframework.amqp:spring-rabbit-test'
}
test {
useJUnitPlatform()
}
application.yml
rabbitmq:
test:
username: username
password: password
host: localhost
port: 5672
virtualHost: levi.vhost
routeKey: test.route
exchangeName: test.exchange
queueName: testQueue
deadLetterExchange: dead.letter.exchange
deadLetterRouteKey: dead.letter.route
application.yml에서 프로퍼티를 읽어오는 클래스를 아래 하나 만들었다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
@Configuration
@ConfigurationProperties(prefix = "rabbitmq")
@Getter
@Setter
public class RabbitMqProperty {
private RabbitMqDetailProperty test;
@Getter
@Setter
public static class RabbitMqDetailProperty {
private String username;
private String password;
private String host;
private int port;
private String virtualHost;
private String routeKey;
private String exchangeName;
private String queueName;
private String deadLetterExchange;
private String deadLetterRouteKey;
}
}
|
cs |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
@SpringBootApplication(exclude = {
RabbitAutoConfiguration.class
})
@EnableRabbit
@RestController
@RequiredArgsConstructor
public class RabbitmqApplication {
private final RabbitTemplate testTemplate;
public static void main(String[] args) {
SpringApplication.run(RabbitmqApplication.class, args);
}
@GetMapping("/")
public String inQueue() {
testTemplate.convertAndSend(SampleMessage.of("message!!"));
return "success";
}
@Data
@AllArgsConstructor(staticName = "of")
@NoArgsConstructor
public static class SampleMessage {
private String message;
}
}
|
cs |
래빗엠큐 auto configuration을 꺼주었고, 메시지를 인큐하기 위한 컨트롤러를 하나 만들어 주었다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
@Slf4j
@Configuration
@RequiredArgsConstructor
public class RabbitMqConfig {
private final RabbitMqProperty rabbitMqProperty;
@Bean
public ConnectionFactory testConnectionFactory() {
final RabbitMqProperty.RabbitMqDetailProperty test = rabbitMqProperty.getTest();
final CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost(test.getHost());
factory.setPort(test.getPort());
factory.setUsername(test.getUsername());
factory.setPassword(test.getPassword());
factory.setVirtualHost(test.getVirtualHost());
return factory;
}
@Bean
public SimpleRabbitListenerContainerFactory testContainer(final ConnectionFactory testConnectionFactory) {
final SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(testConnectionFactory);
factory.setConnectionFactory(testConnectionFactory);
factory.setErrorHandler(t -> log.error("ErrorHandler = {}", t.getMessage()));
factory.setDefaultRequeueRejected(false); //true일 경우 리스너에서 예외가 발생시 다시 큐에 메시지가 쌓이게 된다.(예외상황 해결안될 시 무한루프)
factory.setMessageConverter(jacksonConverter());
//예외 발생시 recover할 수 있는 옵션, 위 에러 핸들러와 잘 구분해서 사용해야할듯
//위 핸들러와 적용 순서등도 테스트가 필요(혹은 둘다 동시에 적용되나?) // factory.setAdviceChain(
// RetryInterceptorBuilder
// .stateless()
// .maxAttempts(3) //최대 재시도 횟수
// .recoverer() //recover handler
// .backOffOptions(2000, 4, 10000) //2초 간격으로, 4번, 최대 10초 내에 재시도
// .build()
// );
return factory;
}
@Bean
public RabbitTemplate testTemplate(final ConnectionFactory testConnectionFactory, final MessageConverter jacksonConverter) {
final RabbitMqProperty.RabbitMqDetailProperty property = rabbitMqProperty.getTest();
final RabbitTemplate template = new RabbitTemplate();
template.setConnectionFactory(testConnectionFactory);
template.setMessageConverter(jacksonConverter);
template.setExchange(property.getExchangeName());
template.setRoutingKey(property.getRouteKey());
return template;
}
@Bean
public MessageConverter jacksonConverter() {
return new Jackson2JsonMessageConverter();
}
//app에서 큐만들거나 바인딩 하기 위해서는 RabbitAdmin 필요 @Bean
public RabbitAdmin testAdmin(final ConnectionFactory testConnectionFactory) {
return new RabbitAdmin(testConnectionFactory);
}
}
|
cs |
브로커 연결 및 메시지를 프로듀싱하기 위한 RabbitTemplate 설정이다. 이중 RabbitAdmin 빈을 만들고 있는 수동이 아니라, 앱에서 큐를 만들고 바인딩 하기 위해서는 RabbitAdmin이 빈으로 떠있어야한다. 기타 설정은 주석을 참고하자 !
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
@Configuration
@RequiredArgsConstructor
public class RabbitMqBindingConfig {
private final RabbitMqProperty rabbitMqProperty;
@Bean
public TopicExchange testTopicExchange() {
return ExchangeBuilder
.topicExchange(rabbitMqProperty.getTest().getExchangeName())
.durable(true)
.build();
}
@Bean
public TopicExchange dlExchange() {
return ExchangeBuilder
.topicExchange(rabbitMqProperty.getTest().getDeadLetterExchange())
.durable(true)
.build();
}
@Bean
public Queue testQueue() {
final RabbitMqProperty.RabbitMqDetailProperty testDetail = rabbitMqProperty.getTest();
return QueueBuilder
.durable(testDetail.getQueueName())
.deadLetterExchange(testDetail.getDeadLetterExchange())
.deadLetterRoutingKey(testDetail.getDeadLetterRouteKey())
.build();
}
@Bean
public Queue dlq() {
return QueueBuilder
.durable("DEAD_LETTER_QUEUE")
.build();
}
@Bean
public Binding testBinding(final Queue testQueue, final TopicExchange testTopicExchange) {
return BindingBuilder
.bind(testQueue)
.to(testTopicExchange)
.with(rabbitMqProperty.getTest().getRouteKey());
}
@Bean
public Binding dlBinding(final Queue dlq, final TopicExchange dlExchange) {
return BindingBuilder
.bind(dlq)
.to(dlExchange)
.with(rabbitMqProperty.getTest().getDeadLetterRouteKey());
}
}
|
cs |
프로듀싱과 컨슈밍을 위해 브로커에 exchange와 queue를 binding하는 설정이다. 하나 추가적으로는 컨슈밍에서 예외가 발생하였을 때, Recover를 위한 Dead Letter Queue를 선언하였다.
이제 앱을 실행시키고, localhost:8080으로 요청을 보내보자 ! 요청을 보내면 메시지를 프로듀싱 할것이고, 해당 메시지를 컨슈밍해서 로그를 찍게 될 것이다. 여기까지 간단하게 래빗엠큐에 대해 다루어봤다.
https://jonnung.dev/rabbitmq/2019/02/06/about-amqp-implementtation-of-rabbitmq/
https://nesoy.github.io/articles/2019-02/RabbitMQ
'Middleware > Kafka&RabbitMQ' 카테고리의 다른 글
[Kafka] Parallel Consumers, 메시지별 병렬 처리 (0) | 2023.10.30 |
---|---|
Spring cloud stream kafka - concurrency (1) | 2022.07.27 |
Apache Kafka - Kafka Producer(카프카 프로듀서) - 2 (1) | 2019.07.06 |
Apache Kafka - Kafka(카프카)란 ? 분산 메시징 플랫폼 - 1 (0) | 2019.07.06 |
Kafka - Kafka Stream API(카프카 스트림즈) - 2 (0) | 2019.03.30 |
Spring - Rabbitmq를 이용한 비동기 메시징 서비스
-리액티브 마이크로서비스
Mac OS 환경에서 작성되었습니다.
오늘은 간단히 Spring boot + Rabbitmq를 이용한 비동기 메시징 서비스를 구현해볼 것이다.
일단 이 포스팅을 진행하는 이유는 요즘 시대에는 일체형 애플리케이션이 작은 서비스 단위인 마이크로서비스 단위로 나누어
서비스 단위로 배포하는 아키텍쳐가 대세인듯하다.
이 말은 즉슨, 아주 큰 애플리케이션이 작은 서비스 단위(마이크로서비스)로 나뉘어 각각 단독적으로 독립적으로 실행가능한 상태로 배포가 된다.
이런 경우 마이크로서비스끼리의 통신은 RESTful한 통신도 있지만 메시지 큐와 같은 서비스를 이용하여 비동기적으로 통신하기도 한다.
그리고 이 구조를 발행구독구조라고 한다.
위의 사진과 같이 두 마이크로서비스 애플리케이션이 외부의 큐로 연결되는 구조인 것이다.
간단히 설명하면 Sender라는 하나의 애플리케이션에서 큐로 발신하면
Receiver라는 애플리케이션이 해당 큐에 대한 이벤트를 수신하여
로직을 처리하게 되는 것이다.
Rabbitmq를 다운로드한다.
▶︎▶︎▶︎래빗엠큐다운로드
Standalone MacOS binary를 클릭하여 받는다.
Rabbitmq 관리자 페이지 플러그인을 활성화하는 명령이다.
관리자 페이지에 들어갈 계정을 만들어준다.
만약 계정을 만드는 데 밑의 에러가 발생한다면
Error:
{:undef, [{:crypto, :hash, [:sha256, <<94, 223, 167, 31, 97, 108, 105, 118, 101>>], []}, {:rabbit_password, :hash, 2, [file: 'src/rabbit_password.erl', line: 34]}, {:rabbit_auth_backend_internal, :add_user_sans_validation, 3, [file: 'src/rabbit_auth_backend_internal.erl', line: 252]}, {:rpc, :"-handle_call_call/6-fun-0-", 5, [file: 'rpc.erl', line: 197]}]}
>brew install openssl 로 인스톨하면 된다.
Rabbitmq를 실행시켜 준다. 그리고 localhost:15672로 접속한 후에 방금 전에 만들었던 계정으로
로그인한다.
위의 사진에 Set permisstion 버튼을 클릭하여 해당 계정으로 큐에서 읽거나 쓰는 권한을 부여한다.
여기까지 간단한 Rabbitmq의 설정이 완료되었다. 이것보다 더 많은 설정 요소들이 존재하겠지만,
예제로 간단하게 이정도로만 설정한다.
참고: 관리자페이지 포트 : 15672
애플리케이션(amqp) :5672
클러스터링 : 25672
소스 예제 회원가입 이후에 외부 큐에 가입 메시지를 날리면(Sender) 수신애플리케이션에서 가입 회원의 이메일을 커맨드에 뿌려준다.(Receiver)
<Sender>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 | package org.rvslab.chapter3; import java.util.Optional; import javax.persistence.Entity; import javax.persistence.GeneratedValue; import javax.persistence.GenerationType; import javax.persistence.Id; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Lazy; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.repository.query.Param; import org.springframework.data.rest.core.annotation.RepositoryRestResource; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Mono; import springfox.documentation.swagger2.annotations.EnableSwagger2; @SpringBootApplication @EnableSwagger2 public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } @Bean CommandLineRunner init(CustomerRespository customerRepository) { return (evt) -> { customerRepository.save(new Customer("Adam","adam@boot.com")); customerRepository.save(new Customer("John","john@boot.com")); customerRepository.save(new Customer("Smith","smith@boot.com")); customerRepository.save(new Customer("Edgar","edgar@boot.com")); customerRepository.save(new Customer("Martin","martin@boot.com")); customerRepository.save(new Customer("Tom","tom@boot.com")); customerRepository.save(new Customer("Sean","sean@boot.com")); }; } } @RestController class CustomerController{ CustomerRegistrar customerRegistrar; @Autowired CustomerController(CustomerRegistrar customerRegistrar){ this.customerRegistrar = customerRegistrar; } @RequestMapping( path="/register", method = RequestMethod.POST) Mono<Customer> register(@RequestBody Customer customer){ return customerRegistrar.register(customer); } } @Component @Lazy class CustomerRegistrar { CustomerRespository customerRespository; Sender sender; @Autowired CustomerRegistrar(CustomerRespository customerRespository, Sender sender){ this.customerRespository = customerRespository; this.sender = sender; } public Mono<Customer> registerMono(Mono<Customer> monoCustomer){ monoCustomer.doOnNext(customer -> { if(customerRespository.findByName(customer.getName()).isPresent()) System.out.println("Duplicate Customer"); else { customerRespository.save(customer); //sender.send(customer.getEmail()); } }).subscribe(); return monoCustomer; } // ideally repository will return a Mono object public Mono<Customer> register(Customer customer){ if(customerRespository.findByName(customer.getName()).isPresent()) System.out.println("Duplicate Customer. No Action required"); else { customerRespository.save(customer); //외부 큐에게 메시지전송 sender.send(customer.getEmail()); System.out.println("Rabbitmq send :::: "+customer.toString()); } return Mono.just(customer); } } //외부 큐와 연결하기 위한 Sender를 빈으로 등록한다. //또 빈으로 Queue객체를 등록해준다. 해당 문자열로 Receiver쪽에서도 동일하게 받아야한다. //RabbitMessagingTemplate으로 외부 큐에게 메시지를 전송한다. @Component @Lazy class Sender { RabbitMessagingTemplate template; @Autowired Sender(RabbitMessagingTemplate template){ this.template = template; } @Bean Queue queue() { return new Queue("CustomerQ", false); } public void send(String message){ template.convertAndSend("CustomerQ", message); System.out.println("Ready to send message but suppressed "+ message); } } //repository does not support Reactive. Ideally this should use reactive repository @RepositoryRestResource @Lazy interface CustomerRespository extends JpaRepository <Customer,Long>{ Optional<Customer> findByName(@Param("name") String name); } //Entity class @Entity class Customer{ @Id @GeneratedValue(strategy = GenerationType.AUTO) private Long id; private String name; private String email; public Customer (){} public Customer(String name, String email) { super(); this.name = name; this.email = email; } public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getEmail() { return email; } public void setEmail(String email) { this.email = email; } @Override public String toString() { return "Customer [id=" + id + ", name=" + name + ", email=" + email + "]"; } } | cs |
- management.security.enabled=false
- spring.rabbitmq.host=localhost
- spring.rabbitmq.port=5672
- spring.rabbitmq.username=yeoseong
- spring.rabbitmq.password=yeoseong
<Receiver>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 | package org.rvslab.chapter3; import java.util.Optional; import javax.persistence.Entity; import javax.persistence.GeneratedValue; import javax.persistence.GenerationType; import javax.persistence.Id; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Lazy; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.repository.query.Param; import org.springframework.data.rest.core.annotation.RepositoryRestResource; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Mono; import springfox.documentation.swagger2.annotations.EnableSwagger2; @SpringBootApplication @EnableSwagger2 public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } @Bean CommandLineRunner init(CustomerRespository customerRepository) { return (evt) -> { customerRepository.save(new Customer("Adam","adam@boot.com")); customerRepository.save(new Customer("John","john@boot.com")); customerRepository.save(new Customer("Smith","smith@boot.com")); customerRepository.save(new Customer("Edgar","edgar@boot.com")); customerRepository.save(new Customer("Martin","martin@boot.com")); customerRepository.save(new Customer("Tom","tom@boot.com")); customerRepository.save(new Customer("Sean","sean@boot.com")); }; } } @RestController class CustomerController{ CustomerRegistrar customerRegistrar; @Autowired CustomerController(CustomerRegistrar customerRegistrar){ this.customerRegistrar = customerRegistrar; } @RequestMapping( path="/register", method = RequestMethod.POST) Mono<Customer> register(@RequestBody Customer customer){ return customerRegistrar.register(customer); } } @Component @Lazy class CustomerRegistrar { CustomerRespository customerRespository; Sender sender; @Autowired CustomerRegistrar(CustomerRespository customerRespository, Sender sender){ this.customerRespository = customerRespository; this.sender = sender; } public Mono<Customer> registerMono(Mono<Customer> monoCustomer){ monoCustomer.doOnNext(customer -> { if(customerRespository.findByName(customer.getName()).isPresent()) System.out.println("Duplicate Customer"); else { customerRespository.save(customer); //sender.send(customer.getEmail()); } }).subscribe(); return monoCustomer; } // ideally repository will return a Mono object public Mono<Customer> register(Customer customer){ if(customerRespository.findByName(customer.getName()).isPresent()) System.out.println("Duplicate Customer. No Action required"); else { customerRespository.save(customer); //외부 큐에게 메시지전송 sender.send(customer.getEmail()); System.out.println("Rabbitmq send :::: "+customer.toString()); } return Mono.just(customer); } } //외부 큐와 연결하기 위한 Sender를 빈으로 등록한다. //또 빈으로 Queue객체를 등록해준다. 해당 문자열로 Receiver쪽에서도 동일하게 받아야한다. //RabbitMessagingTemplate으로 외부 큐에게 메시지를 전송한다. @Component @Lazy class Sender { RabbitMessagingTemplate template; @Autowired Sender(RabbitMessagingTemplate template){ this.template = template; } @Bean Queue queue() { return new Queue("CustomerQ", false); } public void send(String message){ template.convertAndSend("CustomerQ", message); System.out.println("Ready to send message but suppressed "+ message); } } //repository does not support Reactive. Ideally this should use reactive repository @RepositoryRestResource @Lazy interface CustomerRespository extends JpaRepository <Customer,Long>{ Optional<Customer> findByName(@Param("name") String name); } //Entity class @Entity class Customer{ @Id @GeneratedValue(strategy = GenerationType.AUTO) private Long id; private String name; private String email; public Customer (){} public Customer(String name, String email) { super(); this.name = name; this.email = email; } public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getEmail() { return email; } public void setEmail(String email) { this.email = email; } @Override public String toString() { return "Customer [id=" + id + ", name=" + name + ", email=" + email + "]"; } } | cs |
- server.port=8090
- spring.rabbitmq.host=localhost
- spring.rabbitmq.port=5672
- spring.rabbitmq.username=yeoseong
- spring.rabbitmq.password=yeoseong
부트 프로젝트생성할 때 의존성 설정 부분에서 I/O>AMQP를 선택해준다.
'Middleware > Kafka&RabbitMQ' 카테고리의 다른 글
Kafka - Kafka Streams API(카프카 스트림즈) - 1 (2) | 2019.03.26 |
---|---|
Kafka - Kafka Consumer(카프카 컨슈머) Java&CLI (2) | 2019.03.24 |
Kafka - Kafka Producer(카프카 프로듀서) In Java&CLI (0) | 2019.03.16 |
Kafka - Kafka(카프카) cluster(클러스터) 구성 및 간단한 CLI사용법 (0) | 2019.03.13 |
Kafka - Kafka(카프카)의 동작 방식과 원리 (0) | 2019.03.12 |