Middleware/Kafka&RabbitMQ 2020. 7. 26. 17:32

 

 

yoonyeoseong/rabbitmq-sample

Contribute to yoonyeoseong/rabbitmq-sample development by creating an account on GitHub.

github.com

 

오늘 포스팅할 내용은 래빗엠큐이다. 그 동안에는 카프카를 사용할 일이 많아 카프카에 대한 포스팅이 주였는데, 이번에 래빗엠큐를 사용할 일이 생겨 간단히 래빗엠큐에 대해 간단히 다루어 볼것이다.(예제 코드는 위 깃헙에 올려놓았습니다.)

 

비동기 작업에 있어 큐를 사용하려면 중간에 메시지 브로커라는 개념이 존재하는데, 이러한 메시지 브로커에는 RabbitMQ, Kafka 등이 대표적으로 있다. 해당 포스트에서는 표준 MQ프로토콜인 AMQP를 구현한 RabbitMQ(래빗엠큐)에 대해 다루어볼 것이다.

 

간단하게 메시지큐는 아래 그림과 같은 워크 플로우로 이루어져있다. 

 

 

대부분의 메시지큐는 프로듀서가 있고, 해당 프로듀서가 브로커로 메시지를 발행하면, 적절한 컨슈머가 해당 메시지를 구독(읽다)하는 구조이다. 그렇다면 래빗엠큐는 상세하게 어떠한 구조로 되어있을까?

 

래빗엠큐는 단순히 프로듀서가 브로커로 메시지를 전달하여 컨슈머가 큐를 읽어가는 구조라는 면에서는 동일하지만, 프로듀싱하는 과정에서 조금 더 복잡한 개념이 들어간다. 바로 exchange와 route, queue라는 개념이다.

 

간단하게 워크 플로우를 설명하자면 아래와 같다.

 

  1. Producer는 Message를 Exchange에게 보내게 됩니다.
    1. Exchange를 생성할때 Exchange의 Type을 정해야 합니다.
  2. Exchange는 Routing Key를 사용하여 적절한 Queue로 Routing을 진행합니다.
    1. Routing은 Exchange Type에 따라 전략이 바뀌게 됩니다.
  3. Exchange - Queue와 Binding이 완료된 모습을 볼 수 있습니다.
    1. Message 속성에 따라 적절한 Queue로 Routing이 됩니다.
  4. Message는 Consumer가 소비할때까지 Queue에 대기하게 됩니다.
  5. Consumer는 Message를 소비하게 됩니다.

 

위에서 1번에 Exchange라는 개념이 등장하는데, Exchange는 정해진 규칙으로 메시지를 라우팅하는 기능을 가지고 있다. 여기서 정해진 규칙은 크게 4가지가 존재하는데, 규칙들은 아래와 같다.

 

exchange routeing 전략

  • Direct Exchange
    • Message의 Routing Key와 정확히 일치하는 Binding된 Queue로 Routing
  • Fanout Exchange
    • Binding된 모든 Queue에 Message를 Routing
  • Topic Exchange
    • 특정 Routing Pattern이 일치하는 Queue로 Routing
  • Headers Exchange
    • key-value로 정의된 Header 속성을 통한 Routing

 

 

 

익스체인지는 위와 같은 전략으로 메시지를 라우팅하게 된다. 그리고 자주 사용되는 Exchange의 옵션으로는 아래와 같이 있다.

 

exchange options
  • Durability
    • 브로커가 재시작 될 때 남아 있는지 여부
    • durable -> 재시작해도 유지가능
    • transient -> 재시작하면 사라집니다.
  • Auto-delete
    • 마지막 Queue 연결이 해제되면 삭제

마지막으로 래빗엠큐에서 사용되는 용어가 있다.

 

  • Vhost(virutal host)
    • Virtual Host를 통해서 하나의 RabbitMQ 인스턴스 안에 사용하고 있는 Application을 분리할 수 있습니다.
  • Connection
    • 물리적인 TCP Connection, HTTPS -> TLS(SSL) Connection을 사용
  • Channel
    • 하나의 물리적인 Connection 내에 생성되는 가상의 Connection
    • Consumer의 process나 thread는 각자 Channel을 통해 Queue에 연결 될 수 있습니다.

 

여기까지 간단하게 래빗엠큐의 동작과 용어에 대해서 다루어봤으니 실제 코드 레벨로 실습을 진행해본다.

 

docker rabbitmq 설치
sudo docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 \
	--restart=unless-stopped -e RABBITMQ_DEFAULT_USER=username \
    -e RABBITMQ_DEFAULT_PASS=password rabbitmq:management

 

위처럼 도커로 래빗엠큐를 설치해주고, http://localhost:15672(username/password)로 접속해보자.

 

springboot rabbitmq 예제

build.gradle

plugins {
    id 'org.springframework.boot' version '2.3.2.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'java'
}

group = 'com.levi'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '14'

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-amqp'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    annotationProcessor "org.springframework.boot:spring-boot-configuration-processor"
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
    testImplementation 'org.springframework.amqp:spring-rabbit-test'
}

test {
    useJUnitPlatform()
}

 

application.yml

rabbitmq:
  test:
    username: username
    password: password
    host: localhost
    port: 5672
    virtualHost: levi.vhost
    routeKey: test.route
    exchangeName: test.exchange
    queueName: testQueue
    deadLetterExchange: dead.letter.exchange
    deadLetterRouteKey: dead.letter.route

 

application.yml에서 프로퍼티를 읽어오는 클래스를 아래 하나 만들었다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Configuration
@ConfigurationProperties(prefix = "rabbitmq")
@Getter
@Setter
public class RabbitMqProperty {
    private RabbitMqDetailProperty test;
 
    @Getter
    @Setter
    public static class RabbitMqDetailProperty {
        private String username;
        private String password;
        private String host;
        private int port;
        private String virtualHost;
        private String routeKey;
        private String exchangeName;
        private String queueName;
        private String deadLetterExchange;
        private String deadLetterRouteKey;
    }
}
cs

 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@SpringBootApplication(exclude = {
        RabbitAutoConfiguration.class
})
@EnableRabbit
@RestController
@RequiredArgsConstructor
public class RabbitmqApplication {
    private final RabbitTemplate testTemplate;
 
    public static void main(String[] args) {
        SpringApplication.run(RabbitmqApplication.class, args);
    }
 
    @GetMapping("/")
    public String inQueue() {
        testTemplate.convertAndSend(SampleMessage.of("message!!"));
        return "success";
    }
 
    @Data
    @AllArgsConstructor(staticName = "of")
    @NoArgsConstructor
    public static class SampleMessage {
        private String message;
    }
}
cs

 

래빗엠큐 auto configuration을 꺼주었고, 메시지를 인큐하기 위한 컨트롤러를 하나 만들어 주었다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Slf4j
@Configuration
@RequiredArgsConstructor
public class RabbitMqConfig {
    private final RabbitMqProperty rabbitMqProperty;
 
    @Bean
    public ConnectionFactory testConnectionFactory() {
        final RabbitMqProperty.RabbitMqDetailProperty test = rabbitMqProperty.getTest();
        final CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost(test.getHost());
        factory.setPort(test.getPort());
        factory.setUsername(test.getUsername());
        factory.setPassword(test.getPassword());
        factory.setVirtualHost(test.getVirtualHost());
 
        return factory;
    }
 
    @Bean
    public SimpleRabbitListenerContainerFactory testContainer(final ConnectionFactory testConnectionFactory) {
        final SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(testConnectionFactory);
        factory.setConnectionFactory(testConnectionFactory);
        factory.setErrorHandler(t -> log.error("ErrorHandler = {}", t.getMessage()));
        factory.setDefaultRequeueRejected(false); //true일 경우 리스너에서 예외가 발생시 다시 큐에 메시지가 쌓이게 된다.(예외상황 해결안될 시 무한루프)
        factory.setMessageConverter(jacksonConverter());
        //예외 발생시 recover할 수 있는 옵션, 위 에러 핸들러와 잘 구분해서 사용해야할듯
//위 핸들러와 적용 순서등도 테스트가 필요(혹은 둘다 동시에 적용되나?)
//        factory.setAdviceChain(
//                RetryInterceptorBuilder
//                .stateless()
//                .maxAttempts(3) //최대 재시도 횟수
//                .recoverer() //recover handler
//                .backOffOptions(2000, 4, 10000) //2초 간격으로, 4번, 최대 10초 내에 재시도
//                .build()
//        );
        return factory;
    }
 
    @Bean
    public RabbitTemplate testTemplate(final ConnectionFactory testConnectionFactory, final MessageConverter jacksonConverter) {
        final RabbitMqProperty.RabbitMqDetailProperty property = rabbitMqProperty.getTest();
        final RabbitTemplate template = new RabbitTemplate();
        template.setConnectionFactory(testConnectionFactory);
        template.setMessageConverter(jacksonConverter);
        template.setExchange(property.getExchangeName());
        template.setRoutingKey(property.getRouteKey());
        return template;
    }
 
    @Bean
    public MessageConverter jacksonConverter() {
        return new Jackson2JsonMessageConverter();
    }

//app에서 큐만들거나 바인딩 하기 위해서는 RabbitAdmin 필요
    @Bean
    public RabbitAdmin testAdmin(final ConnectionFactory testConnectionFactory) {
        return new RabbitAdmin(testConnectionFactory);
    }
}
cs

 

브로커 연결 및 메시지를 프로듀싱하기 위한 RabbitTemplate 설정이다. 이중 RabbitAdmin 빈을 만들고 있는 수동이 아니라, 앱에서 큐를 만들고 바인딩 하기 위해서는 RabbitAdmin이 빈으로 떠있어야한다. 기타 설정은 주석을 참고하자 !

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@Configuration
@RequiredArgsConstructor
public class RabbitMqBindingConfig {
    private final RabbitMqProperty rabbitMqProperty;
 
    @Bean
    public TopicExchange testTopicExchange() {
        return ExchangeBuilder
                .topicExchange(rabbitMqProperty.getTest().getExchangeName())
                .durable(true)
                .build();
    }
 
    @Bean
    public TopicExchange dlExchange() {
        return ExchangeBuilder
                .topicExchange(rabbitMqProperty.getTest().getDeadLetterExchange())
                .durable(true)
                .build();
    }
 
    @Bean
    public Queue testQueue() {
        final RabbitMqProperty.RabbitMqDetailProperty testDetail = rabbitMqProperty.getTest();
        return QueueBuilder
                .durable(testDetail.getQueueName())
                .deadLetterExchange(testDetail.getDeadLetterExchange())
                .deadLetterRoutingKey(testDetail.getDeadLetterRouteKey())
                .build();
    }
 
    @Bean
    public Queue dlq() {
        return QueueBuilder
                .durable("DEAD_LETTER_QUEUE")
                .build();
    }
 
    @Bean
    public Binding testBinding(final Queue testQueue, final TopicExchange testTopicExchange) {
        return BindingBuilder
                .bind(testQueue)
                .to(testTopicExchange)
                .with(rabbitMqProperty.getTest().getRouteKey());
    }
 
    @Bean
    public Binding dlBinding(final Queue dlq, final TopicExchange dlExchange) {
        return BindingBuilder
                .bind(dlq)
                .to(dlExchange)
                .with(rabbitMqProperty.getTest().getDeadLetterRouteKey());
    }
}
cs

 

 

프로듀싱과 컨슈밍을 위해 브로커에 exchange와 queue를 binding하는 설정이다. 하나 추가적으로는 컨슈밍에서 예외가 발생하였을 때, Recover를 위한 Dead Letter Queue를 선언하였다.

 

이제 앱을 실행시키고, localhost:8080으로 요청을 보내보자 ! 요청을 보내면 메시지를 프로듀싱 할것이고, 해당 메시지를 컨슈밍해서 로그를 찍게 될 것이다. 여기까지 간단하게 래빗엠큐에 대해 다루어봤다.

 

 

 

https://jonnung.dev/rabbitmq/2019/02/06/about-amqp-implementtation-of-rabbitmq/

 

조은우 개발 블로그

 

jonnung.dev

https://velog.io/@hellozin/Spring-Boot%EC%99%80-RabbitMQ-%EC%B4%88%EA%B0%84%EB%8B%A8-%EC%84%A4%EB%AA%85%EC%84%9C

 

Spring Boot와 RabbitMQ 초간단 설명서

이번 포스트에서는 Spring boot 프로젝트에서 RabbitMQ를 사용하는 간단한 방법을 알아보겠습니다. Consumer 코드와 Producer 코드는 GitHub에 있습니다. 먼저 RabbitMQ 서버를 실행해야 하는데 Docker를 사용하�

velog.io

https://nesoy.github.io/articles/2019-02/RabbitMQ

 

RabbitMQ에 대해

 

nesoy.github.io

 

posted by 여성게
:
Middleware/Kafka&RabbitMQ 2019. 2. 18. 00:39

Spring - Rabbitmq를 이용한 비동기 메시징 서비스

     -리액티브 마이크로서비스



RabbitMQ에 대해

Mac OS 환경에서 작성되었습니다.


오늘은 간단히 Spring boot + Rabbitmq를 이용한 비동기 메시징 서비스를 구현해볼 것이다. 

일단 이 포스팅을 진행하는 이유는 요즘 시대에는 일체형 애플리케이션이 작은 서비스 단위인 마이크로서비스 단위로 나누어 

서비스 단위로 배포하는 아키텍쳐가 대세인듯하다. 

이 말은 즉슨, 아주 큰 애플리케이션이 작은 서비스 단위(마이크로서비스)로 나뉘어 각각 단독적으로 독립적으로 실행가능한 상태로 배포가 된다. 

이런 경우 마이크로서비스끼리의 통신은 RESTful한 통신도 있지만 메시지 큐와 같은 서비스를 이용하여 비동기적으로 통신하기도 한다.

그리고 이 구조를 발행구독구조라고 한다.


위의 사진과 같이 두 마이크로서비스 애플리케이션이 외부의 큐로 연결되는 구조인 것이다.

간단히 설명하면 Sender라는 하나의 애플리케이션에서 큐로 발신하면 

Receiver라는 애플리케이션이 해당 큐에 대한 이벤트를 수신하여

로직을 처리하게 되는 것이다.





외부의 큐도 어찌됬는 하나의 데몬으로 떠있는 애플리케이션 같은 것이다.

Rabbitmq를 다운로드한다.


▶︎▶︎▶︎래빗엠큐다운로드



Standalone MacOS binary를 클릭하여 받는다.




Rabbitmq 관리자 페이지 플러그인을 활성화하는 명령이다.



관리자 페이지에 들어갈 계정을 만들어준다.


만약 계정을 만드는 데 밑의 에러가 발생한다면 


Error:

{:undef, [{:crypto, :hash, [:sha256, <<94, 223, 167, 31, 97, 108, 105, 118, 101>>], []}, {:rabbit_password, :hash, 2, [file: 'src/rabbit_password.erl', line: 34]}, {:rabbit_auth_backend_internal, :add_user_sans_validation, 3, [file: 'src/rabbit_auth_backend_internal.erl', line: 252]}, {:rpc, :"-handle_call_call/6-fun-0-", 5, [file: 'rpc.erl', line: 197]}]}


>brew install openssl 로 인스톨하면 된다.



Rabbitmq를 실행시켜 준다. 그리고 localhost:15672로 접속한 후에 방금 전에 만들었던 계정으로

로그인한다.




위의 사진에 Set permisstion 버튼을 클릭하여 해당 계정으로 큐에서 읽거나 쓰는 권한을 부여한다.


여기까지 간단한 Rabbitmq의 설정이 완료되었다. 이것보다 더 많은 설정 요소들이 존재하겠지만,

예제로 간단하게 이정도로만 설정한다.


참고: 관리자페이지 포트 : 15672

애플리케이션(amqp) :5672

클러스터링 : 25672





소스 예제 회원가입 이후에 외부 큐에 가입 메시지를 날리면(Sender) 수신애플리케이션에서 가입 회원의 이메일을 커맨드에 뿌려준다.(Receiver)



<Sender>


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
package org.rvslab.chapter3;
 
import java.util.Optional;
 
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
 
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.repository.query.Param;
import org.springframework.data.rest.core.annotation.RepositoryRestResource;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
 
import reactor.core.publisher.Mono;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
 
 
@SpringBootApplication
@EnableSwagger2
public class Application {
 
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
    
    
    @Bean
    CommandLineRunner init(CustomerRespository customerRepository) {
            return (evt) ->  {
                              customerRepository.save(new Customer("Adam","adam@boot.com"));
                              customerRepository.save(new Customer("John","john@boot.com"));
                              customerRepository.save(new Customer("Smith","smith@boot.com"));
                              customerRepository.save(new Customer("Edgar","edgar@boot.com"));
                              customerRepository.save(new Customer("Martin","martin@boot.com"));
                              customerRepository.save(new Customer("Tom","tom@boot.com"));
                              customerRepository.save(new Customer("Sean","sean@boot.com"));
            };
    }
    
 
}
 
 
@RestController
class CustomerController{
        
        CustomerRegistrar customerRegistrar;
        
        @Autowired
        CustomerController(CustomerRegistrar customerRegistrar){
                this.customerRegistrar = customerRegistrar;
        }
        
        @RequestMapping( path="/register", method = RequestMethod.POST)
        Mono<Customer> register(@RequestBody Customer customer){                
                return customerRegistrar.register(customer);     
        }
}
 
@Component
@Lazy
class CustomerRegistrar {
        
        CustomerRespository customerRespository;
        Sender sender;
        
        @Autowired
        CustomerRegistrar(CustomerRespository customerRespository, Sender sender){
                this.customerRespository = customerRespository;
                this.sender = sender;
        }
        
        
        public Mono<Customer> registerMono(Mono<Customer> monoCustomer){
                monoCustomer.doOnNext(customer -> {
                        if(customerRespository.findByName(customer.getName()).isPresent())
                                System.out.println("Duplicate Customer");
                        else {
                                customerRespository.save(customer);
                                //sender.send(customer.getEmail());             
                        }
                }).subscribe();
                return monoCustomer;
        }
 
        // ideally repository will return a Mono object
        public Mono<Customer> register(Customer customer){
                        if(customerRespository.findByName(customer.getName()).isPresent())
                                System.out.println("Duplicate Customer. No Action required");
                        else {
                                customerRespository.save(customer);
                                //외부 큐에게 메시지전송
                                sender.send(customer.getEmail());       
                                System.out.println("Rabbitmq send :::: "+customer.toString());
                        }
                return Mono.just(customer);
        }
}
 
 
//외부 큐와 연결하기 위한 Sender를 빈으로 등록한다.
//또 빈으로 Queue객체를 등록해준다. 해당 문자열로 Receiver쪽에서도 동일하게 받아야한다.
//RabbitMessagingTemplate으로 외부 큐에게 메시지를 전송한다.
@Component
@Lazy
class Sender {
        
        RabbitMessagingTemplate template;
        
        @Autowired
        Sender(RabbitMessagingTemplate template){
                this.template = template;
        }
 
        @Bean
        Queue queue() {
                return new Queue("CustomerQ"false);
        }
        
        public void send(String message){
                template.convertAndSend("CustomerQ", message);
                System.out.println("Ready to send message but suppressed "+ message);
                 
        }
}
 
//repository does not support Reactive. Ideally this should use reactive repository
@RepositoryRestResource
@Lazy
interface CustomerRespository extends JpaRepository <Customer,Long>{
        Optional<Customer> findByName(@Param("name"String name);
}
 
 
//Entity class
@Entity
class Customer{
        @Id
        @GeneratedValue(strategy = GenerationType.AUTO)
        private Long id;
        private String name;
        private String email;
        
        public Customer (){}
 
        
        public Customer(String name, String email) {
                super();
                this.name = name;
                this.email = email;
        }
 
 
        public Long getId() {
                return id;
        }
 
        public void setId(Long id) {
                this.id = id;
        }
 
        public String getName() {
                return name;
        }
 
        public void setName(String name) {
                this.name = name;
        }
 
        public String getEmail() {
                return email;
        }
 
        public void setEmail(String email) {
                this.email = email;
        }
 
        @Override
        public String toString() {
                return "Customer [id=" + id + ", name=" + name + ", email=" + email + "]";
        }
        
         
        
}
cs


  1. management.security.enabled=false
  2.  
  3. spring.rabbitmq.host=localhost
  4. spring.rabbitmq.port=5672
  5. spring.rabbitmq.username=yeoseong
  6. spring.rabbitmq.password=yeoseong


<Receiver>


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
package org.rvslab.chapter3;
 
import java.util.Optional;
 
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
 
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.repository.query.Param;
import org.springframework.data.rest.core.annotation.RepositoryRestResource;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
 
import reactor.core.publisher.Mono;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
 
 
@SpringBootApplication
@EnableSwagger2
public class Application {
 
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
    
    
    @Bean
    CommandLineRunner init(CustomerRespository customerRepository) {
            return (evt) ->  {
                              customerRepository.save(new Customer("Adam","adam@boot.com"));
                              customerRepository.save(new Customer("John","john@boot.com"));
                              customerRepository.save(new Customer("Smith","smith@boot.com"));
                              customerRepository.save(new Customer("Edgar","edgar@boot.com"));
                              customerRepository.save(new Customer("Martin","martin@boot.com"));
                              customerRepository.save(new Customer("Tom","tom@boot.com"));
                              customerRepository.save(new Customer("Sean","sean@boot.com"));
            };
    }
    
 
}
 
 
@RestController
class CustomerController{
        
        CustomerRegistrar customerRegistrar;
        
        @Autowired
        CustomerController(CustomerRegistrar customerRegistrar){
                this.customerRegistrar = customerRegistrar;
        }
        
        @RequestMapping( path="/register", method = RequestMethod.POST)
        Mono<Customer> register(@RequestBody Customer customer){                
                return customerRegistrar.register(customer);     
        }
}
 
@Component
@Lazy
class CustomerRegistrar {
        
        CustomerRespository customerRespository;
        Sender sender;
        
        @Autowired
        CustomerRegistrar(CustomerRespository customerRespository, Sender sender){
                this.customerRespository = customerRespository;
                this.sender = sender;
        }
        
        
        public Mono<Customer> registerMono(Mono<Customer> monoCustomer){
                monoCustomer.doOnNext(customer -> {
                        if(customerRespository.findByName(customer.getName()).isPresent())
                                System.out.println("Duplicate Customer");
                        else {
                                customerRespository.save(customer);
                                //sender.send(customer.getEmail());             
                        }
                }).subscribe();
                return monoCustomer;
        }
 
        // ideally repository will return a Mono object
        public Mono<Customer> register(Customer customer){
                        if(customerRespository.findByName(customer.getName()).isPresent())
                                System.out.println("Duplicate Customer. No Action required");
                        else {
                                customerRespository.save(customer);
                                //외부 큐에게 메시지전송
                                sender.send(customer.getEmail());       
                                System.out.println("Rabbitmq send :::: "+customer.toString());
                        }
                return Mono.just(customer);
        }
}
 
 
//외부 큐와 연결하기 위한 Sender를 빈으로 등록한다.
//또 빈으로 Queue객체를 등록해준다. 해당 문자열로 Receiver쪽에서도 동일하게 받아야한다.
//RabbitMessagingTemplate으로 외부 큐에게 메시지를 전송한다.
@Component
@Lazy
class Sender {
        
        RabbitMessagingTemplate template;
        
        @Autowired
        Sender(RabbitMessagingTemplate template){
                this.template = template;
        }
 
        @Bean
        Queue queue() {
                return new Queue("CustomerQ"false);
        }
        
        public void send(String message){
                template.convertAndSend("CustomerQ", message);
                System.out.println("Ready to send message but suppressed "+ message);
                 
        }
}
 
//repository does not support Reactive. Ideally this should use reactive repository
@RepositoryRestResource
@Lazy
interface CustomerRespository extends JpaRepository <Customer,Long>{
        Optional<Customer> findByName(@Param("name"String name);
}
 
 
//Entity class
@Entity
class Customer{
        @Id
        @GeneratedValue(strategy = GenerationType.AUTO)
        private Long id;
        private String name;
        private String email;
        
        public Customer (){}
 
        
        public Customer(String name, String email) {
                super();
                this.name = name;
                this.email = email;
        }
 
 
        public Long getId() {
                return id;
        }
 
        public void setId(Long id) {
                this.id = id;
        }
 
        public String getName() {
                return name;
        }
 
        public void setName(String name) {
                this.name = name;
        }
 
        public String getEmail() {
                return email;
        }
 
        public void setEmail(String email) {
                this.email = email;
        }
 
        @Override
        public String toString() {
                return "Customer [id=" + id + ", name=" + name + ", email=" + email + "]";
        }
        
         
        
}
cs


  1. server.port=8090
  2.  
  3. spring.rabbitmq.host=localhost
  4. spring.rabbitmq.port=5672
  5. spring.rabbitmq.username=yeoseong
  6. spring.rabbitmq.password=yeoseong
  7.  
  8. spring.mail.host=localhost
  9. spring.mail.port=2525


부트 프로젝트생성할 때 의존성 설정 부분에서 I/O>AMQP를 선택해준다.



posted by 여성게
: