Middleware/Kafka&RabbitMQ

Springboot - Rabbitmq를 이용한 비동기 메시징 서비스(리액티브 마이크로서비스)

여성게 2019. 2. 18. 00:39

Spring - Rabbitmq를 이용한 비동기 메시징 서비스

     -리액티브 마이크로서비스



RabbitMQ에 대해

Mac OS 환경에서 작성되었습니다.


오늘은 간단히 Spring boot + Rabbitmq를 이용한 비동기 메시징 서비스를 구현해볼 것이다. 

일단 이 포스팅을 진행하는 이유는 요즘 시대에는 일체형 애플리케이션이 작은 서비스 단위인 마이크로서비스 단위로 나누어 

서비스 단위로 배포하는 아키텍쳐가 대세인듯하다. 

이 말은 즉슨, 아주 큰 애플리케이션이 작은 서비스 단위(마이크로서비스)로 나뉘어 각각 단독적으로 독립적으로 실행가능한 상태로 배포가 된다. 

이런 경우 마이크로서비스끼리의 통신은 RESTful한 통신도 있지만 메시지 큐와 같은 서비스를 이용하여 비동기적으로 통신하기도 한다.

그리고 이 구조를 발행구독구조라고 한다.


위의 사진과 같이 두 마이크로서비스 애플리케이션이 외부의 큐로 연결되는 구조인 것이다.

간단히 설명하면 Sender라는 하나의 애플리케이션에서 큐로 발신하면 

Receiver라는 애플리케이션이 해당 큐에 대한 이벤트를 수신하여

로직을 처리하게 되는 것이다.





외부의 큐도 어찌됬는 하나의 데몬으로 떠있는 애플리케이션 같은 것이다.

Rabbitmq를 다운로드한다.


▶︎▶︎▶︎래빗엠큐다운로드



Standalone MacOS binary를 클릭하여 받는다.




Rabbitmq 관리자 페이지 플러그인을 활성화하는 명령이다.



관리자 페이지에 들어갈 계정을 만들어준다.


만약 계정을 만드는 데 밑의 에러가 발생한다면 


Error:

{:undef, [{:crypto, :hash, [:sha256, <<94, 223, 167, 31, 97, 108, 105, 118, 101>>], []}, {:rabbit_password, :hash, 2, [file: 'src/rabbit_password.erl', line: 34]}, {:rabbit_auth_backend_internal, :add_user_sans_validation, 3, [file: 'src/rabbit_auth_backend_internal.erl', line: 252]}, {:rpc, :"-handle_call_call/6-fun-0-", 5, [file: 'rpc.erl', line: 197]}]}


>brew install openssl 로 인스톨하면 된다.



Rabbitmq를 실행시켜 준다. 그리고 localhost:15672로 접속한 후에 방금 전에 만들었던 계정으로

로그인한다.




위의 사진에 Set permisstion 버튼을 클릭하여 해당 계정으로 큐에서 읽거나 쓰는 권한을 부여한다.


여기까지 간단한 Rabbitmq의 설정이 완료되었다. 이것보다 더 많은 설정 요소들이 존재하겠지만,

예제로 간단하게 이정도로만 설정한다.


참고: 관리자페이지 포트 : 15672

애플리케이션(amqp) :5672

클러스터링 : 25672





소스 예제 회원가입 이후에 외부 큐에 가입 메시지를 날리면(Sender) 수신애플리케이션에서 가입 회원의 이메일을 커맨드에 뿌려준다.(Receiver)



<Sender>


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
package org.rvslab.chapter3;
 
import java.util.Optional;
 
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
 
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.repository.query.Param;
import org.springframework.data.rest.core.annotation.RepositoryRestResource;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
 
import reactor.core.publisher.Mono;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
 
 
@SpringBootApplication
@EnableSwagger2
public class Application {
 
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
    
    
    @Bean
    CommandLineRunner init(CustomerRespository customerRepository) {
            return (evt) ->  {
                              customerRepository.save(new Customer("Adam","adam@boot.com"));
                              customerRepository.save(new Customer("John","john@boot.com"));
                              customerRepository.save(new Customer("Smith","smith@boot.com"));
                              customerRepository.save(new Customer("Edgar","edgar@boot.com"));
                              customerRepository.save(new Customer("Martin","martin@boot.com"));
                              customerRepository.save(new Customer("Tom","tom@boot.com"));
                              customerRepository.save(new Customer("Sean","sean@boot.com"));
            };
    }
    
 
}
 
 
@RestController
class CustomerController{
        
        CustomerRegistrar customerRegistrar;
        
        @Autowired
        CustomerController(CustomerRegistrar customerRegistrar){
                this.customerRegistrar = customerRegistrar;
        }
        
        @RequestMapping( path="/register", method = RequestMethod.POST)
        Mono<Customer> register(@RequestBody Customer customer){                
                return customerRegistrar.register(customer);     
        }
}
 
@Component
@Lazy
class CustomerRegistrar {
        
        CustomerRespository customerRespository;
        Sender sender;
        
        @Autowired
        CustomerRegistrar(CustomerRespository customerRespository, Sender sender){
                this.customerRespository = customerRespository;
                this.sender = sender;
        }
        
        
        public Mono<Customer> registerMono(Mono<Customer> monoCustomer){
                monoCustomer.doOnNext(customer -> {
                        if(customerRespository.findByName(customer.getName()).isPresent())
                                System.out.println("Duplicate Customer");
                        else {
                                customerRespository.save(customer);
                                //sender.send(customer.getEmail());             
                        }
                }).subscribe();
                return monoCustomer;
        }
 
        // ideally repository will return a Mono object
        public Mono<Customer> register(Customer customer){
                        if(customerRespository.findByName(customer.getName()).isPresent())
                                System.out.println("Duplicate Customer. No Action required");
                        else {
                                customerRespository.save(customer);
                                //외부 큐에게 메시지전송
                                sender.send(customer.getEmail());       
                                System.out.println("Rabbitmq send :::: "+customer.toString());
                        }
                return Mono.just(customer);
        }
}
 
 
//외부 큐와 연결하기 위한 Sender를 빈으로 등록한다.
//또 빈으로 Queue객체를 등록해준다. 해당 문자열로 Receiver쪽에서도 동일하게 받아야한다.
//RabbitMessagingTemplate으로 외부 큐에게 메시지를 전송한다.
@Component
@Lazy
class Sender {
        
        RabbitMessagingTemplate template;
        
        @Autowired
        Sender(RabbitMessagingTemplate template){
                this.template = template;
        }
 
        @Bean
        Queue queue() {
                return new Queue("CustomerQ"false);
        }
        
        public void send(String message){
                template.convertAndSend("CustomerQ", message);
                System.out.println("Ready to send message but suppressed "+ message);
                 
        }
}
 
//repository does not support Reactive. Ideally this should use reactive repository
@RepositoryRestResource
@Lazy
interface CustomerRespository extends JpaRepository <Customer,Long>{
        Optional<Customer> findByName(@Param("name"String name);
}
 
 
//Entity class
@Entity
class Customer{
        @Id
        @GeneratedValue(strategy = GenerationType.AUTO)
        private Long id;
        private String name;
        private String email;
        
        public Customer (){}
 
        
        public Customer(String name, String email) {
                super();
                this.name = name;
                this.email = email;
        }
 
 
        public Long getId() {
                return id;
        }
 
        public void setId(Long id) {
                this.id = id;
        }
 
        public String getName() {
                return name;
        }
 
        public void setName(String name) {
                this.name = name;
        }
 
        public String getEmail() {
                return email;
        }
 
        public void setEmail(String email) {
                this.email = email;
        }
 
        @Override
        public String toString() {
                return "Customer [id=" + id + ", name=" + name + ", email=" + email + "]";
        }
        
         
        
}
cs


  1. management.security.enabled=false
  2.  
  3. spring.rabbitmq.host=localhost
  4. spring.rabbitmq.port=5672
  5. spring.rabbitmq.username=yeoseong
  6. spring.rabbitmq.password=yeoseong


<Receiver>


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
package org.rvslab.chapter3;
 
import java.util.Optional;
 
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
 
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.repository.query.Param;
import org.springframework.data.rest.core.annotation.RepositoryRestResource;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
 
import reactor.core.publisher.Mono;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
 
 
@SpringBootApplication
@EnableSwagger2
public class Application {
 
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
    
    
    @Bean
    CommandLineRunner init(CustomerRespository customerRepository) {
            return (evt) ->  {
                              customerRepository.save(new Customer("Adam","adam@boot.com"));
                              customerRepository.save(new Customer("John","john@boot.com"));
                              customerRepository.save(new Customer("Smith","smith@boot.com"));
                              customerRepository.save(new Customer("Edgar","edgar@boot.com"));
                              customerRepository.save(new Customer("Martin","martin@boot.com"));
                              customerRepository.save(new Customer("Tom","tom@boot.com"));
                              customerRepository.save(new Customer("Sean","sean@boot.com"));
            };
    }
    
 
}
 
 
@RestController
class CustomerController{
        
        CustomerRegistrar customerRegistrar;
        
        @Autowired
        CustomerController(CustomerRegistrar customerRegistrar){
                this.customerRegistrar = customerRegistrar;
        }
        
        @RequestMapping( path="/register", method = RequestMethod.POST)
        Mono<Customer> register(@RequestBody Customer customer){                
                return customerRegistrar.register(customer);     
        }
}
 
@Component
@Lazy
class CustomerRegistrar {
        
        CustomerRespository customerRespository;
        Sender sender;
        
        @Autowired
        CustomerRegistrar(CustomerRespository customerRespository, Sender sender){
                this.customerRespository = customerRespository;
                this.sender = sender;
        }
        
        
        public Mono<Customer> registerMono(Mono<Customer> monoCustomer){
                monoCustomer.doOnNext(customer -> {
                        if(customerRespository.findByName(customer.getName()).isPresent())
                                System.out.println("Duplicate Customer");
                        else {
                                customerRespository.save(customer);
                                //sender.send(customer.getEmail());             
                        }
                }).subscribe();
                return monoCustomer;
        }
 
        // ideally repository will return a Mono object
        public Mono<Customer> register(Customer customer){
                        if(customerRespository.findByName(customer.getName()).isPresent())
                                System.out.println("Duplicate Customer. No Action required");
                        else {
                                customerRespository.save(customer);
                                //외부 큐에게 메시지전송
                                sender.send(customer.getEmail());       
                                System.out.println("Rabbitmq send :::: "+customer.toString());
                        }
                return Mono.just(customer);
        }
}
 
 
//외부 큐와 연결하기 위한 Sender를 빈으로 등록한다.
//또 빈으로 Queue객체를 등록해준다. 해당 문자열로 Receiver쪽에서도 동일하게 받아야한다.
//RabbitMessagingTemplate으로 외부 큐에게 메시지를 전송한다.
@Component
@Lazy
class Sender {
        
        RabbitMessagingTemplate template;
        
        @Autowired
        Sender(RabbitMessagingTemplate template){
                this.template = template;
        }
 
        @Bean
        Queue queue() {
                return new Queue("CustomerQ"false);
        }
        
        public void send(String message){
                template.convertAndSend("CustomerQ", message);
                System.out.println("Ready to send message but suppressed "+ message);
                 
        }
}
 
//repository does not support Reactive. Ideally this should use reactive repository
@RepositoryRestResource
@Lazy
interface CustomerRespository extends JpaRepository <Customer,Long>{
        Optional<Customer> findByName(@Param("name"String name);
}
 
 
//Entity class
@Entity
class Customer{
        @Id
        @GeneratedValue(strategy = GenerationType.AUTO)
        private Long id;
        private String name;
        private String email;
        
        public Customer (){}
 
        
        public Customer(String name, String email) {
                super();
                this.name = name;
                this.email = email;
        }
 
 
        public Long getId() {
                return id;
        }
 
        public void setId(Long id) {
                this.id = id;
        }
 
        public String getName() {
                return name;
        }
 
        public void setName(String name) {
                this.name = name;
        }
 
        public String getEmail() {
                return email;
        }
 
        public void setEmail(String email) {
                this.email = email;
        }
 
        @Override
        public String toString() {
                return "Customer [id=" + id + ", name=" + name + ", email=" + email + "]";
        }
        
         
        
}
cs


  1. server.port=8090
  2.  
  3. spring.rabbitmq.host=localhost
  4. spring.rabbitmq.port=5672
  5. spring.rabbitmq.username=yeoseong
  6. spring.rabbitmq.password=yeoseong
  7.  
  8. spring.mail.host=localhost
  9. spring.mail.port=2525


부트 프로젝트생성할 때 의존성 설정 부분에서 I/O>AMQP를 선택해준다.