Springboot - Rabbitmq를 이용한 비동기 메시징 서비스(리액티브 마이크로서비스)
Spring - Rabbitmq를 이용한 비동기 메시징 서비스
-리액티브 마이크로서비스
Mac OS 환경에서 작성되었습니다.
오늘은 간단히 Spring boot + Rabbitmq를 이용한 비동기 메시징 서비스를 구현해볼 것이다.
일단 이 포스팅을 진행하는 이유는 요즘 시대에는 일체형 애플리케이션이 작은 서비스 단위인 마이크로서비스 단위로 나누어
서비스 단위로 배포하는 아키텍쳐가 대세인듯하다.
이 말은 즉슨, 아주 큰 애플리케이션이 작은 서비스 단위(마이크로서비스)로 나뉘어 각각 단독적으로 독립적으로 실행가능한 상태로 배포가 된다.
이런 경우 마이크로서비스끼리의 통신은 RESTful한 통신도 있지만 메시지 큐와 같은 서비스를 이용하여 비동기적으로 통신하기도 한다.
그리고 이 구조를 발행구독구조라고 한다.
위의 사진과 같이 두 마이크로서비스 애플리케이션이 외부의 큐로 연결되는 구조인 것이다.
간단히 설명하면 Sender라는 하나의 애플리케이션에서 큐로 발신하면
Receiver라는 애플리케이션이 해당 큐에 대한 이벤트를 수신하여
로직을 처리하게 되는 것이다.
Rabbitmq를 다운로드한다.
▶︎▶︎▶︎래빗엠큐다운로드
Standalone MacOS binary를 클릭하여 받는다.
Rabbitmq 관리자 페이지 플러그인을 활성화하는 명령이다.
관리자 페이지에 들어갈 계정을 만들어준다.
만약 계정을 만드는 데 밑의 에러가 발생한다면
Error:
{:undef, [{:crypto, :hash, [:sha256, <<94, 223, 167, 31, 97, 108, 105, 118, 101>>], []}, {:rabbit_password, :hash, 2, [file: 'src/rabbit_password.erl', line: 34]}, {:rabbit_auth_backend_internal, :add_user_sans_validation, 3, [file: 'src/rabbit_auth_backend_internal.erl', line: 252]}, {:rpc, :"-handle_call_call/6-fun-0-", 5, [file: 'rpc.erl', line: 197]}]}
>brew install openssl 로 인스톨하면 된다.
Rabbitmq를 실행시켜 준다. 그리고 localhost:15672로 접속한 후에 방금 전에 만들었던 계정으로
로그인한다.
위의 사진에 Set permisstion 버튼을 클릭하여 해당 계정으로 큐에서 읽거나 쓰는 권한을 부여한다.
여기까지 간단한 Rabbitmq의 설정이 완료되었다. 이것보다 더 많은 설정 요소들이 존재하겠지만,
예제로 간단하게 이정도로만 설정한다.
참고: 관리자페이지 포트 : 15672
애플리케이션(amqp) :5672
클러스터링 : 25672
소스 예제 회원가입 이후에 외부 큐에 가입 메시지를 날리면(Sender) 수신애플리케이션에서 가입 회원의 이메일을 커맨드에 뿌려준다.(Receiver)
<Sender>
| package org.rvslab.chapter3; import java.util.Optional; import javax.persistence.Entity; import javax.persistence.GeneratedValue; import javax.persistence.GenerationType; import javax.persistence.Id; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Lazy; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.repository.query.Param; import org.springframework.data.rest.core.annotation.RepositoryRestResource; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Mono; import springfox.documentation.swagger2.annotations.EnableSwagger2; @SpringBootApplication @EnableSwagger2 public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } @Bean CommandLineRunner init(CustomerRespository customerRepository) { return (evt) -> { customerRepository.save(new Customer("Adam","adam@boot.com")); customerRepository.save(new Customer("John","john@boot.com")); customerRepository.save(new Customer("Smith","smith@boot.com")); customerRepository.save(new Customer("Edgar","edgar@boot.com")); customerRepository.save(new Customer("Martin","martin@boot.com")); customerRepository.save(new Customer("Tom","tom@boot.com")); customerRepository.save(new Customer("Sean","sean@boot.com")); }; } } @RestController class CustomerController{ CustomerRegistrar customerRegistrar; @Autowired CustomerController(CustomerRegistrar customerRegistrar){ this.customerRegistrar = customerRegistrar; } @RequestMapping( path="/register", method = RequestMethod.POST) Mono<Customer> register(@RequestBody Customer customer){ return customerRegistrar.register(customer); } } @Component @Lazy class CustomerRegistrar { CustomerRespository customerRespository; Sender sender; @Autowired CustomerRegistrar(CustomerRespository customerRespository, Sender sender){ this.customerRespository = customerRespository; this.sender = sender; } public Mono<Customer> registerMono(Mono<Customer> monoCustomer){ monoCustomer.doOnNext(customer -> { if(customerRespository.findByName(customer.getName()).isPresent()) System.out.println("Duplicate Customer"); else { customerRespository.save(customer); //sender.send(customer.getEmail()); } }).subscribe(); return monoCustomer; } // ideally repository will return a Mono object public Mono<Customer> register(Customer customer){ if(customerRespository.findByName(customer.getName()).isPresent()) System.out.println("Duplicate Customer. No Action required"); else { customerRespository.save(customer); //외부 큐에게 메시지전송 sender.send(customer.getEmail()); System.out.println("Rabbitmq send :::: "+customer.toString()); } return Mono.just(customer); } } //외부 큐와 연결하기 위한 Sender를 빈으로 등록한다. //또 빈으로 Queue객체를 등록해준다. 해당 문자열로 Receiver쪽에서도 동일하게 받아야한다. //RabbitMessagingTemplate으로 외부 큐에게 메시지를 전송한다. @Component @Lazy class Sender { RabbitMessagingTemplate template; @Autowired Sender(RabbitMessagingTemplate template){ this.template = template; } @Bean Queue queue() { return new Queue("CustomerQ", false); } public void send(String message){ template.convertAndSend("CustomerQ", message); System.out.println("Ready to send message but suppressed "+ message); } } //repository does not support Reactive. Ideally this should use reactive repository @RepositoryRestResource @Lazy interface CustomerRespository extends JpaRepository <Customer,Long>{ Optional<Customer> findByName(@Param("name") String name); } //Entity class @Entity class Customer{ @Id @GeneratedValue(strategy = GenerationType.AUTO) private Long id; private String name; private String email; public Customer (){} public Customer(String name, String email) { super(); this.name = name; this.email = email; } public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getEmail() { return email; } public void setEmail(String email) { this.email = email; } @Override public String toString() { return "Customer [id=" + id + ", name=" + name + ", email=" + email + "]"; } } | cs |
- management.security.enabled=false
- spring.rabbitmq.host=localhost
- spring.rabbitmq.port=5672
- spring.rabbitmq.username=yeoseong
- spring.rabbitmq.password=yeoseong
<Receiver>
| package org.rvslab.chapter3; import java.util.Optional; import javax.persistence.Entity; import javax.persistence.GeneratedValue; import javax.persistence.GenerationType; import javax.persistence.Id; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Lazy; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.repository.query.Param; import org.springframework.data.rest.core.annotation.RepositoryRestResource; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Mono; import springfox.documentation.swagger2.annotations.EnableSwagger2; @SpringBootApplication @EnableSwagger2 public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } @Bean CommandLineRunner init(CustomerRespository customerRepository) { return (evt) -> { customerRepository.save(new Customer("Adam","adam@boot.com")); customerRepository.save(new Customer("John","john@boot.com")); customerRepository.save(new Customer("Smith","smith@boot.com")); customerRepository.save(new Customer("Edgar","edgar@boot.com")); customerRepository.save(new Customer("Martin","martin@boot.com")); customerRepository.save(new Customer("Tom","tom@boot.com")); customerRepository.save(new Customer("Sean","sean@boot.com")); }; } } @RestController class CustomerController{ CustomerRegistrar customerRegistrar; @Autowired CustomerController(CustomerRegistrar customerRegistrar){ this.customerRegistrar = customerRegistrar; } @RequestMapping( path="/register", method = RequestMethod.POST) Mono<Customer> register(@RequestBody Customer customer){ return customerRegistrar.register(customer); } } @Component @Lazy class CustomerRegistrar { CustomerRespository customerRespository; Sender sender; @Autowired CustomerRegistrar(CustomerRespository customerRespository, Sender sender){ this.customerRespository = customerRespository; this.sender = sender; } public Mono<Customer> registerMono(Mono<Customer> monoCustomer){ monoCustomer.doOnNext(customer -> { if(customerRespository.findByName(customer.getName()).isPresent()) System.out.println("Duplicate Customer"); else { customerRespository.save(customer); //sender.send(customer.getEmail()); } }).subscribe(); return monoCustomer; } // ideally repository will return a Mono object public Mono<Customer> register(Customer customer){ if(customerRespository.findByName(customer.getName()).isPresent()) System.out.println("Duplicate Customer. No Action required"); else { customerRespository.save(customer); //외부 큐에게 메시지전송 sender.send(customer.getEmail()); System.out.println("Rabbitmq send :::: "+customer.toString()); } return Mono.just(customer); } } //외부 큐와 연결하기 위한 Sender를 빈으로 등록한다. //또 빈으로 Queue객체를 등록해준다. 해당 문자열로 Receiver쪽에서도 동일하게 받아야한다. //RabbitMessagingTemplate으로 외부 큐에게 메시지를 전송한다. @Component @Lazy class Sender { RabbitMessagingTemplate template; @Autowired Sender(RabbitMessagingTemplate template){ this.template = template; } @Bean Queue queue() { return new Queue("CustomerQ", false); } public void send(String message){ template.convertAndSend("CustomerQ", message); System.out.println("Ready to send message but suppressed "+ message); } } //repository does not support Reactive. Ideally this should use reactive repository @RepositoryRestResource @Lazy interface CustomerRespository extends JpaRepository <Customer,Long>{ Optional<Customer> findByName(@Param("name") String name); } //Entity class @Entity class Customer{ @Id @GeneratedValue(strategy = GenerationType.AUTO) private Long id; private String name; private String email; public Customer (){} public Customer(String name, String email) { super(); this.name = name; this.email = email; } public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getEmail() { return email; } public void setEmail(String email) { this.email = email; } @Override public String toString() { return "Customer [id=" + id + ", name=" + name + ", email=" + email + "]"; } } | cs |
- server.port=8090
- spring.rabbitmq.host=localhost
- spring.rabbitmq.port=5672
- spring.rabbitmq.username=yeoseong
- spring.rabbitmq.password=yeoseong
부트 프로젝트생성할 때 의존성 설정 부분에서 I/O>AMQP를 선택해준다.