2019. 2. 18. 00:39ㆍMiddleware/Kafka&RabbitMQ
Spring - Rabbitmq를 이용한 비동기 메시징 서비스
-리액티브 마이크로서비스
Mac OS 환경에서 작성되었습니다.
오늘은 간단히 Spring boot + Rabbitmq를 이용한 비동기 메시징 서비스를 구현해볼 것이다.
일단 이 포스팅을 진행하는 이유는 요즘 시대에는 일체형 애플리케이션이 작은 서비스 단위인 마이크로서비스 단위로 나누어
서비스 단위로 배포하는 아키텍쳐가 대세인듯하다.
이 말은 즉슨, 아주 큰 애플리케이션이 작은 서비스 단위(마이크로서비스)로 나뉘어 각각 단독적으로 독립적으로 실행가능한 상태로 배포가 된다.
이런 경우 마이크로서비스끼리의 통신은 RESTful한 통신도 있지만 메시지 큐와 같은 서비스를 이용하여 비동기적으로 통신하기도 한다.
그리고 이 구조를 발행구독구조라고 한다.
위의 사진과 같이 두 마이크로서비스 애플리케이션이 외부의 큐로 연결되는 구조인 것이다.
간단히 설명하면 Sender라는 하나의 애플리케이션에서 큐로 발신하면
Receiver라는 애플리케이션이 해당 큐에 대한 이벤트를 수신하여
로직을 처리하게 되는 것이다.
Rabbitmq를 다운로드한다.
▶︎▶︎▶︎래빗엠큐다운로드
Standalone MacOS binary를 클릭하여 받는다.
Rabbitmq 관리자 페이지 플러그인을 활성화하는 명령이다.
관리자 페이지에 들어갈 계정을 만들어준다.
만약 계정을 만드는 데 밑의 에러가 발생한다면
Error:
{:undef, [{:crypto, :hash, [:sha256, <<94, 223, 167, 31, 97, 108, 105, 118, 101>>], []}, {:rabbit_password, :hash, 2, [file: 'src/rabbit_password.erl', line: 34]}, {:rabbit_auth_backend_internal, :add_user_sans_validation, 3, [file: 'src/rabbit_auth_backend_internal.erl', line: 252]}, {:rpc, :"-handle_call_call/6-fun-0-", 5, [file: 'rpc.erl', line: 197]}]}
>brew install openssl 로 인스톨하면 된다.
Rabbitmq를 실행시켜 준다. 그리고 localhost:15672로 접속한 후에 방금 전에 만들었던 계정으로
로그인한다.
위의 사진에 Set permisstion 버튼을 클릭하여 해당 계정으로 큐에서 읽거나 쓰는 권한을 부여한다.
여기까지 간단한 Rabbitmq의 설정이 완료되었다. 이것보다 더 많은 설정 요소들이 존재하겠지만,
예제로 간단하게 이정도로만 설정한다.
참고: 관리자페이지 포트 : 15672
애플리케이션(amqp) :5672
클러스터링 : 25672
소스 예제 회원가입 이후에 외부 큐에 가입 메시지를 날리면(Sender) 수신애플리케이션에서 가입 회원의 이메일을 커맨드에 뿌려준다.(Receiver)
<Sender>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 | package org.rvslab.chapter3; import java.util.Optional; import javax.persistence.Entity; import javax.persistence.GeneratedValue; import javax.persistence.GenerationType; import javax.persistence.Id; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Lazy; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.repository.query.Param; import org.springframework.data.rest.core.annotation.RepositoryRestResource; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Mono; import springfox.documentation.swagger2.annotations.EnableSwagger2; @SpringBootApplication @EnableSwagger2 public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } @Bean CommandLineRunner init(CustomerRespository customerRepository) { return (evt) -> { customerRepository.save(new Customer("Adam","adam@boot.com")); customerRepository.save(new Customer("John","john@boot.com")); customerRepository.save(new Customer("Smith","smith@boot.com")); customerRepository.save(new Customer("Edgar","edgar@boot.com")); customerRepository.save(new Customer("Martin","martin@boot.com")); customerRepository.save(new Customer("Tom","tom@boot.com")); customerRepository.save(new Customer("Sean","sean@boot.com")); }; } } @RestController class CustomerController{ CustomerRegistrar customerRegistrar; @Autowired CustomerController(CustomerRegistrar customerRegistrar){ this.customerRegistrar = customerRegistrar; } @RequestMapping( path="/register", method = RequestMethod.POST) Mono<Customer> register(@RequestBody Customer customer){ return customerRegistrar.register(customer); } } @Component @Lazy class CustomerRegistrar { CustomerRespository customerRespository; Sender sender; @Autowired CustomerRegistrar(CustomerRespository customerRespository, Sender sender){ this.customerRespository = customerRespository; this.sender = sender; } public Mono<Customer> registerMono(Mono<Customer> monoCustomer){ monoCustomer.doOnNext(customer -> { if(customerRespository.findByName(customer.getName()).isPresent()) System.out.println("Duplicate Customer"); else { customerRespository.save(customer); //sender.send(customer.getEmail()); } }).subscribe(); return monoCustomer; } // ideally repository will return a Mono object public Mono<Customer> register(Customer customer){ if(customerRespository.findByName(customer.getName()).isPresent()) System.out.println("Duplicate Customer. No Action required"); else { customerRespository.save(customer); //외부 큐에게 메시지전송 sender.send(customer.getEmail()); System.out.println("Rabbitmq send :::: "+customer.toString()); } return Mono.just(customer); } } //외부 큐와 연결하기 위한 Sender를 빈으로 등록한다. //또 빈으로 Queue객체를 등록해준다. 해당 문자열로 Receiver쪽에서도 동일하게 받아야한다. //RabbitMessagingTemplate으로 외부 큐에게 메시지를 전송한다. @Component @Lazy class Sender { RabbitMessagingTemplate template; @Autowired Sender(RabbitMessagingTemplate template){ this.template = template; } @Bean Queue queue() { return new Queue("CustomerQ", false); } public void send(String message){ template.convertAndSend("CustomerQ", message); System.out.println("Ready to send message but suppressed "+ message); } } //repository does not support Reactive. Ideally this should use reactive repository @RepositoryRestResource @Lazy interface CustomerRespository extends JpaRepository <Customer,Long>{ Optional<Customer> findByName(@Param("name") String name); } //Entity class @Entity class Customer{ @Id @GeneratedValue(strategy = GenerationType.AUTO) private Long id; private String name; private String email; public Customer (){} public Customer(String name, String email) { super(); this.name = name; this.email = email; } public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getEmail() { return email; } public void setEmail(String email) { this.email = email; } @Override public String toString() { return "Customer [id=" + id + ", name=" + name + ", email=" + email + "]"; } } | cs |
- management.security.enabled=false
- spring.rabbitmq.host=localhost
- spring.rabbitmq.port=5672
- spring.rabbitmq.username=yeoseong
- spring.rabbitmq.password=yeoseong
<Receiver>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 | package org.rvslab.chapter3; import java.util.Optional; import javax.persistence.Entity; import javax.persistence.GeneratedValue; import javax.persistence.GenerationType; import javax.persistence.Id; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Lazy; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.repository.query.Param; import org.springframework.data.rest.core.annotation.RepositoryRestResource; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Mono; import springfox.documentation.swagger2.annotations.EnableSwagger2; @SpringBootApplication @EnableSwagger2 public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } @Bean CommandLineRunner init(CustomerRespository customerRepository) { return (evt) -> { customerRepository.save(new Customer("Adam","adam@boot.com")); customerRepository.save(new Customer("John","john@boot.com")); customerRepository.save(new Customer("Smith","smith@boot.com")); customerRepository.save(new Customer("Edgar","edgar@boot.com")); customerRepository.save(new Customer("Martin","martin@boot.com")); customerRepository.save(new Customer("Tom","tom@boot.com")); customerRepository.save(new Customer("Sean","sean@boot.com")); }; } } @RestController class CustomerController{ CustomerRegistrar customerRegistrar; @Autowired CustomerController(CustomerRegistrar customerRegistrar){ this.customerRegistrar = customerRegistrar; } @RequestMapping( path="/register", method = RequestMethod.POST) Mono<Customer> register(@RequestBody Customer customer){ return customerRegistrar.register(customer); } } @Component @Lazy class CustomerRegistrar { CustomerRespository customerRespository; Sender sender; @Autowired CustomerRegistrar(CustomerRespository customerRespository, Sender sender){ this.customerRespository = customerRespository; this.sender = sender; } public Mono<Customer> registerMono(Mono<Customer> monoCustomer){ monoCustomer.doOnNext(customer -> { if(customerRespository.findByName(customer.getName()).isPresent()) System.out.println("Duplicate Customer"); else { customerRespository.save(customer); //sender.send(customer.getEmail()); } }).subscribe(); return monoCustomer; } // ideally repository will return a Mono object public Mono<Customer> register(Customer customer){ if(customerRespository.findByName(customer.getName()).isPresent()) System.out.println("Duplicate Customer. No Action required"); else { customerRespository.save(customer); //외부 큐에게 메시지전송 sender.send(customer.getEmail()); System.out.println("Rabbitmq send :::: "+customer.toString()); } return Mono.just(customer); } } //외부 큐와 연결하기 위한 Sender를 빈으로 등록한다. //또 빈으로 Queue객체를 등록해준다. 해당 문자열로 Receiver쪽에서도 동일하게 받아야한다. //RabbitMessagingTemplate으로 외부 큐에게 메시지를 전송한다. @Component @Lazy class Sender { RabbitMessagingTemplate template; @Autowired Sender(RabbitMessagingTemplate template){ this.template = template; } @Bean Queue queue() { return new Queue("CustomerQ", false); } public void send(String message){ template.convertAndSend("CustomerQ", message); System.out.println("Ready to send message but suppressed "+ message); } } //repository does not support Reactive. Ideally this should use reactive repository @RepositoryRestResource @Lazy interface CustomerRespository extends JpaRepository <Customer,Long>{ Optional<Customer> findByName(@Param("name") String name); } //Entity class @Entity class Customer{ @Id @GeneratedValue(strategy = GenerationType.AUTO) private Long id; private String name; private String email; public Customer (){} public Customer(String name, String email) { super(); this.name = name; this.email = email; } public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getEmail() { return email; } public void setEmail(String email) { this.email = email; } @Override public String toString() { return "Customer [id=" + id + ", name=" + name + ", email=" + email + "]"; } } | cs |
- server.port=8090
- spring.rabbitmq.host=localhost
- spring.rabbitmq.port=5672
- spring.rabbitmq.username=yeoseong
- spring.rabbitmq.password=yeoseong
부트 프로젝트생성할 때 의존성 설정 부분에서 I/O>AMQP를 선택해준다.
'Middleware > Kafka&RabbitMQ' 카테고리의 다른 글
Kafka - Kafka Streams API(카프카 스트림즈) - 1 (2) | 2019.03.26 |
---|---|
Kafka - Kafka Consumer(카프카 컨슈머) Java&CLI (2) | 2019.03.24 |
Kafka - Kafka Producer(카프카 프로듀서) In Java&CLI (0) | 2019.03.16 |
Kafka - Kafka(카프카) cluster(클러스터) 구성 및 간단한 CLI사용법 (0) | 2019.03.13 |
Kafka - Kafka(카프카)의 동작 방식과 원리 (0) | 2019.03.12 |