ELK - Filebeat 란?



https://coding-start.tistory.com/187

조금 더 다듬어서 Filebeat 정리하였습니다.


    <실시간 로그 수집을 위한 프로세스 구성>


만약 많은 애플리케이션이 분산되어 있고, 각 애플리케이션이 로그 파일들을 생성한다고 생각해보자. 만약 해당 로그 파일을 하나의 서버에 일일이 ssh 터미널을 이용하여 로그 파일을 수집하는 것이 합리적인 행동일까? 만약 엄청난 규모의 서비스이고 분산되어 있는 서비스의 애플리케이션이 수백개라고 생각하면 ssh를 이용하는 방법은 생각하기도 싫은 방법일 것이다. 이런 상황에서 Filebeat는 로그와 혹은 파일을 경량화된 방식으로 전달하고 중앙 집중화하여 작업을 보다 간편하게 만들어 주는 역할을 한다. 


다시한번 Elastic 공식 홈페이지에서 소개하는 Filebeat를 설명하자면, Filebeat는 로그 데이터를 전달하고 중앙화하기 위한 경량의 Producer이다. 서버에 에이전트로 설치되는 Filebeat는 지정한 로그 파일 또는 위치를 모니터링하고 로그 이벤트를 수집한 다음 인덱싱을 위해 Elasticsearch 또는 Logstash로 전달한다.




Filebeat의 작동 방식은 어떻게 될까?

Filebeat를 시작하면 설정에서 지정한 로그데이터를 바라보는 하나이상의 inputs을 가진다. 지정한 로그 파일에서 이벤트(데이터발생)가 발생할 때마다 Filebeat는 데이터 수확기(harvester)를 시작한다. 하나의 로그 파일을 바라보는 각 havester는 새 로그 데이터를 읽고 libbeat에 보낸다. 그리고 libbeat는 이벤트를 집계하고 집계된 데이터를 Filebeat 설정에 구성된 출력으로 데이터를 보낸다.






Filebeat 시작하기

▶︎▶︎▶︎Filebeat Download


파일비트를 다운로드 받았다면 압축을 풀고, filebeat.yml 파일을 열어 설정파일을 살펴본다.


1
2
3
4
5
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/*.log
cs


위의 설정은 수집할 로그파일의 경로를 설정한다. 즉, /var/log/*.log의 파일을 수집대상으로 정해놓는 것이다.


1
2
3
4
5
6
7
cloud.id: "staging:dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRjZWM2ZjI2MWE3NGJmMjRjZTMzYmI4ODExYjg0Mjk0ZiRjNmMyY2E2ZDA0MjI0OWFmMGNjN2Q3YTllOTYyNTc0Mw=="
 
output.elasticsearch:
  hosts: ["myEShost:9200"]
 
setup.kibana:
  host: "mykibanahost:5601"
cs


출력을 Elasticsearch로 지정하여 수집한 로그데이터를 엘라스틱서치에 저장할 수 있다. 물론 중간에 Logstash 등을 거쳐 데이터를 추가적인 처리를 한 후에 엘라스틱서치에 데이터를 저장할 수도 있다.


1
2
3
#----------------------------- Logstash output --------------------------------
output.logstash:
  hosts: ["127.0.0.1:5044"]
cs


자세한 설정은 Elastic 공식 페이지를 이용하시길 바랍니다.




Filebeat는 파일의 상태를 어떻게 유지하나?

편집하다

Filebeat는 각 파일의 상태를 유지하며 레지스트리 파일의 상태를 디스크로 자주 플러시한다. 상태는 수확기(havester)가 읽었던 마지막 오프셋을 기억하고 모든 로그 라인이 전송되는지 확인하는 데 사용된다. Elasticsearch 또는 Logstash와 같은 출력에 도달 할 수 없는 경우 Filebeat은 마지막으로 보낸 행을 추적하고 출력이 다시 사용 가능 해지면 파일을 계속 읽는다. Filebeat가 실행되는 동안 상태 정보도 각 입력에 대해 메모리에 보관된다. Filebeat가 다시 시작되면 레지스트리 파일의 데이터가 상태를 다시 작성하는 데 사용되며 Filebeat은 마지막으로 알려진 위치에서 각 수확기를 계속 사용한다 또한 Filebeat는 적어도 한번 이상 구성된 데이터를 지정한 출력으로 전달함을 보장한다.




Filebeat와 Kafka를 이용한 간단한 로그수집


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#=========================== Filebeat prospectors =============================
#filebeat로 어떤 파일을 보낼지를 선택하는 부분이다. /Users/yun-yeoseong/kafka_directory/kafka/logs/server.log 파일을 보내기 위한 설정
kafka.home: /Users/yun-yeoseong/kafka_directory/kafka
filebeat.prospectors:
  - input_type: log
    paths:
      -${kafka.home}/logs/server.log*
    multiline.pattern: '^\['
    multiline.negate: true
    multiline.match: after
    fields.pipeline: kafka-logs #파이브라인 아이디
 
#============================= Filebeat modules ===============================
 
filebeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml
 
  # Set to true to enable config reloading
  reload.enabled: false
 
  # Period on which files under path should be checked for changes
  #reload.period: 10s
 
#==================== Elasticsearch template setting ==========================
 
setup.template.settings:
  index.number_of_shards: 3
  #index.codec: best_compression
  #_source.enabled: false
 
#==================== Kafka output ==========================
#kafka의 프로듀서의 역할을 설정하는 부분이다.
output.kafka: 
  hosts: ["localhost:9092","localhost:9093","localhost:9094","localhost:9095"]
  topic: 'kafka-log'
  partition.round_robin: 
    reachable_only: false
 
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
cs


위는 카프카를 output으로 하는 설정이다. 간단하게 설정에 대해 설명하면 filebeat.prospectors는 Filebeat의 input을 정의하는 것이다. '-' 구분으로 여러개의 input을 정의할 수 있다. input_type: log 설정은 로그 파일의 모든 행을 읽는 설정이다. paths는 읽어올 파일의 경로이다. '-'구분으로 여러개의 경로를 지정할 수 있다. multiline 설정은 로그 데이터처럼 여러 개행된 데이터를 처리할때 사용하는 어떠한 규칙이라고 보면된다. 자세한 사항은 Elastic 공식 페이지를 확인하면 될듯하다. output.kafka 설정은 출력을 정의한다. hosts는 Kafka 클러스터 노드들을 배열로 나열한 것이고, 여기서 파일비트는 카프카 입장에서는 하나의 프로듀서이므로 pub할 topic을 설정한다. 나머지 설정들은 kafka의 producer 설정이므로 생략한다. 그리고 파일비트에서 카프카는 위처럼 호스트를 나열하면 내부적으로 로드밸런싱을 해주므로 하나의 브로커에만 데이터 요청이 가질 않는다.

이번 예제에서는 파일비트가 데이터를 정말로 수집하는 지를 보기 위해서 kafka가 기본으로 제공하는 console consumer를 이용할 것이다.
우선은 위의 설정에 작성된 토픽을 생성해준다.

kafka는 3대를 클러스터링한 환경이다. 만약 kafka 클러스터링 방법을 모른다면 밑의 링크에서 참조하고 와도 될듯싶다.
▶︎▶︎▶︎kafka cluster


> ./kafka-topic.sh --zookeeper localhost:2181,localhost:2182,localhost:2181/kafka-broker(자신이 설정한 디렉토리입력) --topic Kafka-log --partitions 3 --replication-factor 2 --create.  -> 토픽생성

>./filebeat -e -c filebeat.yml -d "publish"  -> 위에서 작성한 설정파일에 기준한 파일비트 실행

>./kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic kafka-log --from-beginning ->컨슈머실행

카프카의 클러스터중 하나를 다운시켰다 실행시키는 등의 로그가 출력되는 작업을 수행한 후에 컨슈머에 들어오는 데이터를 확인해보자.


왼쪽이 컨슈머, 오른쪽은 파일비트의 로그이다. 파일비트에서 수집한 카프카의 로그가 카프카로 보내지고 컨슈머가 해당 브로커에서 데이터를 가져오는 것을 볼 수 있다.


posted by 여성게
:
Middleware/Kafka&RabbitMQ 2019. 3. 16. 21:10

Kafka - Kafka Producer(카프카 프로듀서) In Java&CLI




카프카 프로듀서란 메시지를 생산(produce)해서 카프카의 토픽으로 메시지를 보내는 역할을 하는 애플리케이션, 서버 등을 모두 프로듀서라고 부른다.

프로듀서의 주요 기능은 각각의 메시지를 토픽 파티션에 매핑하고 파티션의 리더에 요청을 보내는 것이다. 키 값을 정해 해당 키를 가진 모든 메시지를 동일한

파티션으로 전송할 수 있다. 만약 키 값을 입력하지 않으면, 파티션은 라운드 로빈(round-robin) 방식으로 파티션에 균등하게 분배된다.


이후의 모든 예제는 이전 포스팅에서 구성한 카프카 클러스터링 환경에서 진행하였습니다. 동일한 환경 구성을 

구축하고 예제를 진행하시려면 이전 포스팅을 참조하시길 부탁드립니다.


▶︎▶︎▶︎카프카 클러스터링




콘솔 프로듀서로 메시지를 보내 보기

우선 메시지를 보내기 위해서는 토픽이 카프카 내에 존재해야 한다. 만약 존재하지 않는 토픽으로 메시지를 보낼 경우에 자동으로 토픽이 
생성되는 옵션을 이용하려면 "auto.create.topics.enable = true" 옵션을 환경설정 파일에 넣어주면 된다. 
이번 예제는 직접 수동으로 토픽을 생성할 예정이다.

토픽생성은 카프카에 기본으로 제공하는 스크립트를 이용하여 생성한다.


위의 명령으로 test-topic이라는 이름으로 파티션이 1개로 구성되어 있고, 복제본이 자신을 포함한 3개인 토픽이 생성된다.

만약 복제본들이 어느 클러스터 인스턴스에 할당되어 있으며, 리더는 누구인지 그리고 ISR 그룹 구성등을 보고 싶다면 카프카에서 기본으로 제공하는
스크립트로 조회가능하다.


결과를 보면 Leader는 2번 클러스터 인스턴스이며, 파티션은 0번 한개 그리고 복제본이 1,2,3번의 클러스터 인스턴스에 생성되어 있고,

ISR그룹이 1,2,3 클러스터 인스턴스로 이루어져 있음을 알 수 있다. 여기서 ISR 그룹이란 복제본들이 들어있는 클러스터 인스턴스 그룹으로 볼 수 있다.

그리고 리더 선출의 후보의 전제조건이 클러스터가 ISR 그룹내에 존재하는 클러스터이여야만 리더 선출의 후보가 될 수 있다. ISR은 카프카가 도입한

하나의 개념이라고 볼 수 있다. 하나를 예제로 설명하면 현재 2번 클러스터가 리더인데, 만약 해당 리더가 다운되면 ISR그룹에서 2번 클러스터는 빠지게 되고

ISR그룹내 1,3 클러스터 중에서 카프카 컨트롤러가 리더를 선출하게 된다.




카프카 콘솔을 이용한 간단한 메시징 테스트이다.

사실 이런 예제는 클러스터 구축단계에서 다 해보았던 것이지만 이번 포스팅에서는 프로듀서의 몇몇 중요한 옵션들을 자바를 기반으로

설명할 것이다. 물론 spring cloud stream을 이용해 메시징 미들웨어에 의존적이지 않은 코드로도 작성가능 하지만 이번엔 조금더

로우레벨로 직접 자바코드를 이용해 카프카 시스템을 이용해볼 것이다.


자바를 이용한 프로듀서


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/**
 * Kafka Producer Exam
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaBookProducer1 {
    
    public static Properties init() {
        Properties props = new Properties();
        props.put("bootstrap.servers""localhost:9092,localhost:9093,localhost:9094");
        props.put("key.serializer""org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer""org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
    
    /**
     * 메시지 전송의 성공/실패여부를 신경쓰지 않는다.
     * 카프카가 항상 살아있는 상태이고 프로듀서가 자동으로 재전송하기 때문에 대부분의 경우 성공적으로 전송되지만,
     * 일부 유실이 있을 수 있다.
     * 
     * send() method를 이용해 ProducerRecord를 보낸다. 메시지는 버퍼에 저장되고 별도의 스레드를 통해 브로커로 전송한다.(로그만 봐도 main이 아닌 별도의 스레드가 일을 처리)
     * send()는 자바 퓨처(Future) 객체로 RecordMetadata를 리턴받지만 리턴값을 무시하기 때문에 메시지가 성공적으로 전송되었는지 알수 없다.
     * 실 서비스 환경에서는 거의 사용하지 않는다.
     * 
     * 메시지를 보낸 후의 에러는 무시하지만, 전송전의 에러는 잡아서 처리할 수 있다.
     */
    public static void sendNoConfirmResult() {
        long start = System.currentTimeMillis();
        try(Producer<StringString> producer = new KafkaProducer<>(init())){
            producer.send(new ProducerRecord<StringString>("yeoseong""Apache Kafka is a distributed streaming platform-sendNoConfirmResult()"));
        }catch (Exception e) {}
        long end = System.currentTimeMillis();
        log.info("sendNoConfirmResult - during time : "+ (end-start));
    }
    
    /**
     * 리턴 값으로 Future를 반환하는데, get()을 호출하여 결과를 받아 메시지가 성공적으로 전송됬는지 체크한다. 
     * 메인 스레드가 block되는 상황이 있지만, 신뢰성있는 메시지 전송을 보장한다.
     */
    public static void sendSync() {
        long start = System.currentTimeMillis();
        try(Producer<StringString> producer = new KafkaProducer<>(init())){
            RecordMetadata meta = producer.send(new ProducerRecord<StringString>("yeoseong""Apache Kafka is a distributed streaming platform-sendSync()")).get();
            log.info("Partition: {}, Offset: {}",meta.partition(),meta.offset());
        }catch (Exception e) {}
        long end = System.currentTimeMillis();
        log.info("sendSync - during time : "+ (end-start));
    }
    
    /**
     * 콜백방식으로 비동기 전송을 한다. 메시지 전송이 완료 혹은 실패되면,
     * 브로커쪽에서 콜백으로 onCompletion을 호출한다. 만약 실패하면 Exception 객체가 담긴다.
     * org.apache.kafka.clients.producer.Callback
     * 
     * 로그를 보면 main이 아닌 별도의 카프카 스레드에서 콜백을 호출한다.
     */
    public static void sendAsync() {
        long start = System.currentTimeMillis();
        try(Producer<StringString> producer = new KafkaProducer<>(init())){
            producer.send(new ProducerRecord<StringString>("yeoseong""Apache Kafka is a distributed streaming platform-sendAsync()"),new KafkaCallback());
        }catch (Exception e) {}
        long end = System.currentTimeMillis();
        log.info("sendAsync() - during time : "+ (end-start));
    }
    
    /**
     * 프로듀서에서 키값까지 같이 메시지를 보내면 키에 해당하는 파티션에만 메시지가 들어간다.
     * 하지만 키를 포함시키지 않는다면 라운드 로빈 방식으로 파티션마다 균등분배된다.
     * 
     * evenkey&oddkey는 각각 동일한 파티션으로만 데이터를 전송하게 된다.
     */
    public static void sendWithKey() {
        long start = System.currentTimeMillis();
        String topicName = "yoon";
        String oddKey="1";
        String evenKey="2";
        
        Properties props = new Properties();
        props.put("bootstrap.servers""localhost:9092,localhost:9093,localhost:9094");
        props.put("key.serializer""org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer""org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks""1");
        props.put("compression.type""gzip");
        
        try(Producer<StringString> producer = new KafkaProducer<>(props)){
            
            IntStream.rangeClosed(110).forEach(i->{
                if(i%2==1) {
                    producer.send(new ProducerRecord<StringString>(topicName, oddKey ,i+" - Apache Kafka is a distributed streaming platform-sendWithKey() oddKey "+oddKey));
                }else {
                    producer.send(new ProducerRecord<StringString>(topicName, evenKey ,i+" - Apache Kafka is a distributed streaming platform-sendWithKey() evenKey "+evenKey));
                }
            });
            
            
        }catch (Exception e) {}
        long end = System.currentTimeMillis();
        log.info("sendWithKey() - during time : "+ (end-start));
    }
    
    
    public static void main(String[] args) {
        
//        sendNoConfirmResult();
//        sendSync();
//        sendAsync();
        sendWithKey();
    }
 
}
 
@Slf4j
class KafkaCallback implements Callback{
 
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // TODO Auto-generated method stub
        if(!ObjectUtils.isEmpty(metadata)) {
            log.info("Partition: {}, Offset: {}",metadata.partition(),metadata.offset());
        }else {
            log.error("KafkaCallback - Exception");
        }
    }
    
}
 
cs

코드는 몇개의 예제를 메소드 형태로 구분하여 만들었다.

첫번째 메소드는 프로듀서에서 서버로 메시지를 보내고 난 후에 성공적으로 도착했는지까지 확인하지 않는다. 카프카가 항상 살아있는 상태이고
프로듀서가 자동으로 재전송하기 때문에 대부분의 경우 성공적으로 전송되지만, 일부 메시지는 손실이 있을 수도 있다. send()는 Future 객체로 RecordMetadata를 리턴받지만 그 리턴값을 객체로 받지 않고 있기 때문에 성공적으로 메시지가 전달되었는지 알 수 없다. 대부분 테스트 용도로만 사용하고 실서비스에서는
이용하지 않는 방법이다.

두번째 메소드는 동기 전송방법이다. 프로듀서는 메시지를 보내고 Future의 get()를 이용하여 Future를 기다린 후에 send()가 성공했는지 실패했는지 확인한다.
Future의 get()은 해당 작업 스레드를 블럭한 상태에서 Future의 리턴을 기다리기 때문에 동기 전송방법인 것이다.

세번째 메소드는 비동기 전송방법이다. 프로듀서는 send() 메소드를 콜백과 같이 호출하고 카프카 브로커에서 응답을 받으면 콜백한다. 
비동기적으로 전송한다면 응답을 기다리지 않기 때문에 더욱 빠른 전송이 가능하다. 콜백을 사용하기 위해 org.apache.kafka.clients.producer.Callback을
구현하는 클래스가 필요하다.(사실 람다식을 이용해서 매개변수로 넘겨주면 클래스는 따로 구현하지 않아도된다.) 카프카가 오류를 리턴하면 onCompletion()는 예외를 갖게 된다.

마지막 메소드는 키값을 추가하여 메시지를 보낼 경우이다. 키값을 포함하였을 경우에 컨슈머에는 어떻게 메시지가 전달 될지 콘솔 컨슈머를
이용하여 결과를 확인해볼 것이다.

우선 파티션 2개짜리인 토픽을 생성한다.


마지막 메소드를 main 메소드에서 실행한 후에 각각의 파티션으로 컨슈머를 실행했을 경우 결과가 어떻게 나오지는 확인한다.



2번 키를 포함시킨 메시지는 모두 0번 파티션으로 갔고, 1번 키를 포함시킨 메시지는 모두 1번 파티션으로 메시지가 전송되었다.

물론 특정파티션이 아닌 토픽으로 컨슈머를 실행시켰다면 모든 메시지 10개가 결과에 포함되었을 것이다. 이렇게 키값을 이용하여

특정 파티션으로만 메시지를 보낼 수 있는 기능을 제공하므로 다양하게 활용 가능하다.




프로듀서 주요옵션

Producer 주요 옵션

1)bootstrap.servers localhost:9092,localhost:9093,localhost:9094

:카프카 클러스터에 클라이언트가 처음 연결을 위한 호스트와 포트정보로 구성된 리스트 정보를 나타낸다. 전체 카프카 리스트가 아닌 호스트 하나만 입력해 사용할 있지만

카프카 인스턴스가 죽으면 클라이언트는 더이상 접속 없습니다. , 전체 클러스터 인스턴스 목록을 옵션으로 넣는 것이 좋습니다.


2)acks 0,1,all , —request-required-acks 0,1,all

-acks = 0 

0으로 설정하면 제작자는 서버로부터의 승인을 전혀 기다리지 않습니다. 레코드가 소켓 버퍼에 즉시 추가되고 전송 것으로 간주됩니다. 경우 서버가 레코드를 수신했음을 보증 없으며 재시도 구성이 적용되지 않습니다 (클라이언트는 일반적으로 어떤 실패도 없으므로). 레코드에 대해 다시 주어진 오프셋은 항상 -1 설정됩니다.


-acks = 1 

만약 1 설정하는 경우 리더는 데이터를 기록하지만, 모든 팔로워는 확인하지 않습니다. 경우 일부 데이터 손실이 있을수 있습니다.


-acks = all 

acks=-1 동일하며, 리더는 ISR 팔로워로부터 데이터에 대한 ack 기다립니다. 하나의 팔로워가 있는 데이터는 손실되지 않으며, 데이터 무손실에 대해 가장 강력하게 보장합니다.


3)buffer.memory

프로듀서가 카프카 서버로 데이터를 보내기 위해 잠시 대기(배치 전송이나 딜레이 ) 있는 전체 메모리 바이트입니다.


4)compression.type

프로듀서가 데이터를 압축해서 보낼 있는데, 어떤 타입으로 압축할지를 정할 있다. 옵션으로 none,gzip,snappy,lz4 같은 다양한 포맷이 있다.


5)retries

일시적인 오류로 인해 전송에 실패한 데이터를 다시 보내는 횟수


6)batch.size

프로듀서는 같은 파티션으로 보내는 여러 데이터를 함께 배치로 보내려고 시도한다. 이러한 동작은 클라이언트와 서버 양쪽에 성능적인 측면에서 도움이 된다. 설정으로 배치 크기 바이트 단위를 조정할 있다. 정의된 크기보다 데이터는 배치를 시도하지 않는다. 배치를 보내기전 클라이언트 장애가 발생하면 배치 내에 있던 메시지는 전달되지 않는다. 만약 고가용성이 필요한 메시지의 경우라면 배치 사이즈를 주지 않는 것도 하나의 방법이다.


7)linger.ms

배치형태의 메시지를 보내기 전에 추가적인 메시지들을 위해 기다리는 시간을 조정한다. 카프카 프로듀서는 지정된 배치 사이즈에 도달하면 옵션과 관계없이 즉시 메시지를 전송하고, 배치 사이즈에 도달하지 못한 상황에서 linger.ms 제한 시간에 도달했을때 메시지들을 전송한다. 0 기본값(지연없음)이며, 0보다 값을 설정하면 지연 시간은 조금 발생하지만 처리량이 좋아진다.


8)max.request.size

프로듀서가 보낼 있는 최대 메시지 바이트 사이즈. 기본값은 1MB이다.


나머지 Kerberos, ssl과 타임관련등의 설정들은 https://kafka.apache.org/documentation/#producerconfigs을 참고



server.propertis ->acks=all 따른 server.properties 설정정보 변경

만약 밑의 설정이 없다면 설정파일의 맨밑에 작성

min.insync.replicas=n

설정은 acks=all 했을 경우 리더를 포함한 인스턴스 n개의 승인(데이터복제,저장) 받으면 바로 프로듀서에게 결과를 전달한다.

최적은 acks=all, min.insync.replicas=2, replication factor=3 ->카프카문서에 명시되어있음.



메시지 전송 방법

프로듀서의 옵션 중 acks 옵션을 어떻게 설정하는지에 따라 카프카로 메시지를 전송할 때 메시지 손실 여부와 메시지 전송 속도 및 처리량이 달라진다.

1) 메시지 손실 가능성이 높지만 빠른 전송이 필요한 경우(acks=0 으로 설정)
메시지를 전송할 때 프로듀서는 카프카 서버에서 응답을 기다리지 않고, 메시지를 보낼 준비가 되는 즉시 다음 요청을 보낸다. 하지만 이런 방법을
이용한다면 자신이 보낸 메시지에 대해 결과를 기다리지 않기때문에 메시지 유실이 있을 수도 있다.


2) 메시지 손실 가능성이 적고 적당한 속도의 전송이 필요한 경우(acks=1 으로 설정)

이 옵션은 앞선 옵션과 달리 프로듀서가 카프카로 메시지를 보낸 후 보낸 메시지에 대해 메시지를 받는 토픽의 리더가 잘 받았는지 확인(acks)한다.

응답 대기 시간 없이 계속 메시지만 보내던 방법과 달리 확인을 하는 시간이 추가되어 메시지를 보내는 속도는 약간 떨어지게 되지만

메시지의 유실 가능성이 조금 줄어든다. 하지만 메시지를 받는 리더가 장애가 발생하면 메시지 유실 가능성이 있다. 하지만 보통 프로듀서 애플리케이션으로

많이 사용하는 로그스태시(logstash), 파일비트(Filebeat) 등에서는 프로듀서의 acks 옵션을 기본 1로 하고 있다. 특별한 경우가 아니라면

속도와 안전성을 확보할 수 있는 acks=1을 사용하는 것을 추천한다. 물론 메시지 유실에 큰 영향이 없는 서비스라는 조건하이다.


3) 메시지 전송 속도는 느리지만 메시지 손실이 없어야 하는 경우(acks=all 으로 설정)

acks=all의 동작 방법은 프로듀서가 메시지를 전송하고 난후 리더가 메시지를 받았는지 확인하고 추가로 팔로워까지 메시지를 잘받았는지(복제) 확인 하는 것이다.

속도적인 측면으로 볼때, acks 옵션 중에서 가장 느리지만 메시지 손실을 허용하지 않은 경우 사용하는 방법이다. acks=all을 완벽하게 사용하고자 한다면

프로듀서의 설정 뿐 아니라 브로커의 설정도 같이 조정해야한다. 브로커의 설정에 따라 응답 확인을 기다리는 수가 달라지게 된다.


-프로듀서의 acks=all과 브로커의 min.insync.replicas=1

이것은 acks=all 설정을 하였고 replication-factor가 예를 들어 3개라고 해도, 응답을 하나의 클러스터에서만 받게 된다.

즉, acks=1과 동일한 설정이 된다.


-프로듀서의 acks=all과 브로커의 min.insync.replicas=2

프로듀서가 메시지를 보내면 2개의 클러스터에 응답을 기다리게 된다.


-프로듀서의 acks=all과 브로커의 min.insync.replicas=3

만약 리플리케이션 팩터가 3이라면 위의 설정은 모든 클러스터 인스턴스의 응답 결과를 기다린다. 여기서 하나의 클러스터가 장애가

발생한다면 프로듀서는 메시지를 보낼 수 없게 될 것이다.


위에서 설명하였지만 카프카가 공식적으로 권장하는 옵션은

acks=all, min.insync.replicas=2, replication factor=3이다.


posted by 여성게
:
Middleware/Kafka&RabbitMQ 2019. 3. 13. 10:38

Kafka - Kafka(카프카) cluster(클러스터) 구성 및 간단한 CLI사용법





▶︎▶︎▶︎카프카란?


이전 포스팅에서는 메시징 시스템은 무엇이고, 카프카는 무엇이며 그리고 카프카의 특징과 다른 메시지 서버와의 차이점에 대한

포스티이었습니다. 이번 포스팅은 간단하게 카프카3대를 클러스터링 구성을 하여 서버를 띄우고 CLI를 이용하여 간단히

카프카를 사용해보려고 합니다. 카프카는 중앙에서 많은 서비스 시스템의 데이터를 받아서 다른 시스템으로 받아주는 역할을 하는

메시지 시스템으로 MSA에서는 없어선 안되는 존재가 되었습니다. 그렇다면 이렇게 중요한 카프카를 한대만 띄워서 프로덕트 환경에서

운영한다는 것은 과연 안전한 생각일까요? 아닙니다. 여러대를 클러스터링 구성하여 고가용성을 높혀야 운영환경에서도 안전하고 신뢰성있는

메시지 시스템 구성이 될것입니다. 






위의 그림은 카프카를 여러대 클러스터링을 했을 경우의 구성도 입니다. 일단 중요한 것은 Zookeeper라는 존재입니다. 주키퍼는

이전 포스팅에서 Zookeeper란 무엇인가? 그리고 Solr 클러스터링 구성을 했을 때 다뤘던 내용이므로 이번 포스팅에서는 생략합니다.




▶︎▶︎▶︎Zookeeper란?

▶︎▶︎▶︎Solr Cluster 구성


간단하게 이야기하면 주키퍼란 분산환경 애플리케이션을 중앙에서 관리해주는 디렉토리 구조의 분산 코디네이터라고 보시면 됩니다. 지금부터는

실습에 초점을 마추겠습니다.



우선은 주키퍼를 설치해줍니다. 이번 실습의 버전은 3.4.12 버전기준입니다.

설정파일을 만지기 전에 주키퍼를 위한 디렉토리 구성을 진행하겠습니다. 저는 카프카 실습을 위해 별도의 하나의 디렉토리를 구성하였고,

그안에 주키퍼와 카프카에 대한 파일들을 모두 넣어놓았습니다. 우선 다음 예제는 주키퍼와 카프카를 별도의 디렉토리에 설치했다는 가정하에 진행합니다.

주키퍼는 링크를 참조하셔서 총 3대로 띄워주시면 됩니다.(Master-slave 관계의 클러스터)


카프카도 동일한 디렉토리에 설치해줍니다. 편의상 카프카의 디렉토리 루트를 $KAFKA로 지칭합니다.


우선 진행하기 앞서 클러스터 구성에서 인스턴스 각각이 자신의 메타 데이터와 카프카가 데이터를 쓸 디렉토리를 만들어줍니다.

kdata1,kdata2,kdata3 디렉토리를 만들어줍니다. 그리고 $KAFKA/config 밑에 있는 server.properties를 2장 복사해줍니다.

그리고 해당 파일 안에 설정들을 클러스터 구성에 가장 기본이 되는 설정으로 바꿔줍니다.


broker.id = 1,broker.id = 2,broker.id = 3 으로 총 3개의 serverN.properties에 작성해줍니다. 이것은

주키퍼 myid와 비슷한 역할을 하는 클러스터 인스턴스 구분용 ID값입니다.

그리고 지금 실습은 로컬에 3대의 카프카서버를 띄워서 클러스터링 구성을 할것임으로 각각 서버의 포트를 달리 지정해줍니다.


listeners=PLAINTEXT://:9092,listeners=PLAINTEXT://:9093,listeners=PLAINTEXT://:9094 으로 각각 파일에 포트를 구분해줍니다.


log.dirs=$KAFAKA/kdata1,log.dirs=$KAFAKA/kdata2,log.dirs=$KAFAKA/kdata3 으로 각각 카프카 서버가 자신의

메타 데이터를 쓸 디렉토리를 구분지어줍니다. 필자는 처음에 하나의 디렉토리만 생성해서 공유했더니 충돌 문제가 있어

각각 별도의 디렉토리 구성을 가져갔습니다.



마지막으로 주키퍼와의 연동 설정입니다.

zookeeper.connect=localhost:2181,localhost:2182,localhost:2183/kafka-broker 으로 각 설정 파일에 동일하게 작성해줍니다.

여기에서 조금 특이한것이 ~/kafka-broker 입니다. 이것은 무엇이냐면 만약 각각 다른 용도의 카프카 클러스터를 2세트 운영한다고 가정하고,

주키퍼는 같은 주키퍼 클러스터를 이용한다고 합니다. 만약 ~/kafka-broker를 입력하지 않으면 카프카 클러스터 2세트가 주키퍼의 루트 디렉토리에 동일한

구조의 데이터를 쓰게 되고 그러면 카프카 클러스터 2세트가 데이터 충돌이 나게됩니다. 그렇기 때문에 ~/kafka-broker라고 입력하여 주키퍼의 루트디렉토리가

아닌 /kafka-broker/.. 라는 디렉토리를 하나 새로 생성하여 아예 카프카 클러스터 1대의 전용 디렉토리를 구성해주는 겁니다. 여기까지 진짜 기본적인

설정으로 클러스터 설정을 마쳤습니다. 사실은 이정도 설정으로는 운영환경에서는 운영하기 힘들것입니다. 나머지 설정관련해서는 카프카 공식 홈페이지에

영어로 친절하게? 작성되어 있으니 참고 부탁드립니다.






이제 $KAFKA/bin/kafka-server-start.sh -daemon ../config/server.properties 명령으로 3개의 카프카를 실행시켜줍니다. 물론 선행조건은

주키퍼가 실행되었다는 조건입니다. 만약 지금까지 잘 따라오셨다면 문제없이 실행됩니다. 마지막으로 간단한 CLI를 이용한 producer,consumer 예제입니다.



위의 명령으로 큐의 역할로 사용될 토픽을 생성해줍니다. 만약 토픽을 잘못 생성하신 분들을 위해서 토픽을 삭제하는 명령입니다.



Noting 메시지와 함께 토픽이 삭제되었습니다. 



프로듀서를 실행시켜줍니다. 설정 인자들은 설명없어도 직관적으로 보입니다.



메시지를 보냅니다.



또 한번 보냅니다.



컨슈머 쪽을 확인해보면 메시지가 잘와있는 것을 확인할 수 있습니다. 여기까지 카프카 클러스터 구성과 간단한 CLI를 이용하여 카프카를 사용해보았습니다.

posted by 여성게
:
Middleware/Kafka&RabbitMQ 2019. 3. 12. 22:30

Kafka - Kafka(카프카)의 동작 방식과 원리


 

 

Kafka는 기본적으로 메시징 서버로 동작합니다. 여기서 메시징 시스템에 대해 간단히 살펴보자면 

메시지라고 불리는 데이터 단위를 보내는 측(publisher,producer)에서 카프카에 토픽이라는 각각의 메시지 저장소에 데이터를 저장하면, 

가져가는 측(subscriber, consumer)이 원하는 토픽에서 데이터를 가져가게 되어 있습니다. 즉, 메시지 시스템은 중앙에 메시징 시스템 서버를

두고 이렇게 메시지를 보내고(publish) 받는(subscriber) 형태의 통신 형태인 pub/sub 모델의 통신구조입니다.

 

여기서 미담이지만, 카프카의 창시자인 제이 크렙스는 대학 시절 문학 수업을 들으며 소설가 프란츠 카프카에 심취했습니다. 자신의 팀이 새로 개발할 시스템이

데이터 저장과 기록, 즉 쓰기에 최적화된 시스템이었기에 이런 시스템은 글을 쓰는 작가의 이름을 사용하는 것이 좋겠다 생각하여 

카프카라는 이름으로 애플리케이션이 탄생하게 되었습니다.

 

 

 

 

pub/sub은 비동기 메시징 전송 방식으로서, 발신자의 메시지에는 수신자가 정해져 있지 않은 상태로 발행합니다. 구독을 신청한 수신자만이 정해진

메시지를 받을 수 있습니다. 또한 수신자는 발신자 정보가 없어도 원하는 메시지만을 수신가능합니다.

 

 

펍/섭 구조가 아닌 일반적인 통신은

 

A ---------------> B

C ---------------> D

 

형태로 통신을 하게 됩니다. 이럴 경우 빠른 전송 속도와 전송 결과를 신속하게 알 수 있는 장점이 있지만, 장애가 발생한 경우

적절한 처리를 수신자 측에서 해주지 않는다면 문제가 될 수 있습니다. 그리고 통신에 참여하는 개체수가 많아 질 수록 

일일이 다 연결하고 데이터를 전송해야 하기 때문에 확장성이 떨어집니다.

 

하지만 펍/섭구조는 

 

         ==========

A ---------------->=메시지서버= ---------->B

C ---------------->========== ---------->D

 

프로듀서가 메시지를 컨슈머에게 직접 전달하는 방식이 아니라 중간의 메시징 시스템에 전달합니다. 이때 메시지 데이터와 수신처 ID를

포함시킵니다. 메시징 시스템의 교환기가 메시지의 수신처ID 값을 확인한 다음 컨슈머들의 큐에 전달합니다. 컨슈머는 자신들의

큐를 모니터링하고 있다가, 큐에 메시지가 전달되면 이 값을 가져갑니다. 이렇게 구성할 경우 장점은 혹시나 통신 개체가 하나 빠지거나 수신 불가능 상태가 되었을 

때에도, 메시징 시스템만 살아 있다면 프로듀서에서 전달된 메시지는 유실되지 않고, 장애에서 복구한 통신 개체가 살아나면 다시 메시지를

안전하게 전달할 수 있습니다. 그리고 메시징 시스템을 중심으로 연결되기 때문에 확장성이 용이합니다. 물론 직접 통신 하지 않기 때문에

메시지가 정확하게 전달되었는지 확인하는 등의 코드가 들어가 복잡해지고, 메시징 서버를 한번 거쳐서 가기에 통신속도가 느린 단점도 있습니다.

하지만 메시징 시스템이 주는 장점때문에 이러한 단점은 상쇄될만큼 크지 않다고 생각합니다.(비동기가 주는 장점 등)

 

(밑의 설명을 두고 넣은 그림은 아님)

 

 

기존 메시징 시스템을 사용하는 펍/섭 모델은 대규모 데이터(데이터 단건당 수MB 이상)를 메시징 시스템을 통해 전달하기 보다는 간단한 이벤트(수KB정도)를 서로 전송하는데 주로 사용되었습니다. 왜냐하면 메시징 시스템 내부의 교환기의 부하, 각 컨슈머들의 큐관리, 큐에 전달되고 가져가는 메시지의 정합성, 전달 결과를

정확하게 관리하기 위한 내부 프로세스가 아주 다양하고 복잡했기 때문입니다.

즉, 기존 메시징 시스템은 속도와 용량보단 메시지의 보관,교환,전달 과정에서의 신뢰성을 보장하는 것에 중점을 두었습니다.

 

카프카는 메시징 시스템이 지닌 성능의 단점을 극복하기 위해, 메시지 교환 전달의 신뢰성 관리를 프로듀서와 컨슈머 쪽으로 넘기고, 부하가 많이 걸리는 교환기 기능

역시 컨슈머가 만들 수 있게 함으로써 메시징 시스템 내에서의 적업량을 줄이고 이렇게 절약한 작업량을 메시지 전달 성능에 집중시켜 

고성능 메시징 시스템을 만들어 냈습니다.

 

 

 

 

 

 

카프카의 특징

프로듀서와 컨슈머의 분리 - 카프카는 메시징 전송 방식 중 메시지를 보내는 역할과 받는 역할이 완벽하게 분리된 펍/섭 방식을 적용했습니다. 각 서비스서버들은 다른 시스템의 상태 유무와 관계없이 카프카로 메시지를 보내는 역할만 하면 되고, 마찬가지로 메시지를 받는 시스템도서비스 서버들의 상태유무와 관계없이 카프카에 저장되어 있는 메시지만 가져오면 됩니다.
멀티 프로듀서, 멀티 컨슈머 - 카프카는 하나의 토픽에 여러 프로듀서 또는 컨슈머들이 접근 가능한 구조로 되어있습니다. 하나의 프로듀서가 하나의토픽에만 메시지를 보내는 것이 아니라, 하나 또는 하나 이상의 토픽으로 메시지를 보낼 수 있습니다. 컨슈머는 역시 하나의 토픽에서만메시지를 가져오는 것이 아니라, 하나 또는 하나 이상의 토픽으로부터 메시지를 가져올 수 있습니다. 이렇게 멀티 프로듀서와 멀티 컨슈머를구성할 수 있기 때문에 카프카는 중앙 집중형 구조로 구성할 수 있게 됩니다.
디스크에 메시지 저장 - 카프카가 기존의 메시징 시스템과 가장 다른 특징 중 하나는 메모리에 데이터를 쓰는 것이 아니라, 바로 디스크에 메시지를 저장하고유지하는 것입니다. 일반적인 메시징 시스템들은 컨슈머가 메시지를 읽어가면 큐에서 바로 삭제하는데, 카프카는 컨슈머가 데이터를 가져가도 일정 기간동안데이터를 유지합니다. 트래픽이 급증해도 컨슈머는 디스크에 안전하게 저장된 데이터를 손실 없이 가져갈 수 있습니다. 또한 앞서 멀티 컨슈머가 가능한 것도이렇게 디스크에 데이터를 쓰기 때문입니다. 디스크에 데이터를 쓰기 때문에 처리가 느릴 것이라는 생각을 할 수 있기도한대, 기존의 디스크 사용법과는다르게 디스크를 나눠서 쓰는 것이 아니라, 특정 영역의 디스크에 순차적으로 쓰기 때문에 읽어가는 디스크의 영역의 범위를 확 줄일 수 있어 읽어가는 속도는조금은 빨라집니다. 그리고 카프카는 무조건 데이터를 디스크에 쓴다기 보다는 VM메모리의 일부를 페이지 캐싱으로 사용하기 때문에 속도가 빠릅니다.
확장성 - 카프카는 확장이 매우 용이하도록 설계되어 있습니다. 하나의 카프카 클러스터는 3대부터 수십대의 브로커로도 메시지 서버의 중지없이 확장 가능합니다.
높은성능 - 카프카는 내부적으로 고성능을 유지하기 위해 분산 처리, 배치 처리 등 다양한 기법을 사용하고 있습니다.(병렬 처리를 위한 토픽 파티셔닝 기능)
이러한 카프카의 사용으로 연동되는 애플리케이션들의 느슨한 결합도를 유지할 수 있게 됩니다. 이번 포스팅은 간단한 카프카의 소개였고,다음 포스팅부터는 진짜 카프카를 이용한 예제 포스팅이 될 것 같습니다.

 

posted by 여성게
:

Java - lambda(람다) 간단한 사용법 !


람다란 무엇일까? 람다(lambda)란 간단하게 표현하면 메서드에 전달 가능한 함수형 인터페이스의 구현체이다.

그럼 여기서 함수형 인터페이스란 무엇인가? 함수형 인터페이스는 하나의 메소드만 선언되어 있는 인터페이스를 함수형 인터페이스라고 부른다.

이것의 예제 소스코드를 보면,

1
2
3
4
@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}
cs

이것은 java.util.concurrent의 Callable 인터페이스이다. 이러한 인터페이스를 함수형 인터페이스라고 부른다.

(@FunctionalInterface 어노테이션을 붙이면 이 인터페이스가 함수형 인터페이스 조건에 어긋나면 예외가 발생)


여기서 이런 생각이 들 수 있다. 하나의 함수만 선언된 인터페이스가 무슨 소용인가..? 함수형 인터페이스는 무조건 우리가 매개변수로 전달한 람다식만 사용가능한 인터페이스인가? 아니다! 자바 1.8의 혁신적인 변화중 람다,스트림등등 이외에도 인터페이스 디폴트 메소드가 존재한다.


1
2
3
4
5
6
7
8
9
10
11
12
@FunctionalInterface
public interface Function<T, R> {
 
    R apply(T t);
 
    default <V> Function<V, R> compose(Function<super V, ? extends T> before) {
        Objects.requireNonNull(before);
        return (V v) -> apply(before.apply(v));
    }
 
    ....
}
cs


위는 자바의 Function 인터페이스 코드의 일부를 발췌한 것이다. 위의 코드를 보면 이상한 생각이 들수도 있다.

분명 함수형 인터페이스는 하나의 메소드 선언만 갖는다고 했는데, 위의 인터페이스는 default라는 선언이 된 메소드를 갖고 있다.


사실 위에서 정의한 바를 정확히 얘기하면 함수형 인터페이스란 구현 해야될 메소드가 하나인 인터페이스를 말하는 것이다. 위의 default메소드의 기능이란 무엇인가 하면, 이전 인터페이스는 메소드 선언만 가능했다. 그런데 지금은 모든 인터페이스 구현체가 동일한 기능을 쓴다면 계속 반복해서 오버라이드하는 것이 아닌, 인터페이스에 구현 메소드를 아예 선언해 버리는 기능인 것이다. 이것이 default 메소드의 기능이다. 


그렇다면 위의 Function 인터페이스는 사용할때 구현해야하는 메소드를 하나만 갖는 함수형 인터페이스인 것이다.



Lambda(람다)에는 아주 다양한 함수형 인터페이스가 있다. 여기서는 몇가지의 함수형 인터페이스를 이용하여

사용법을 다루어볼 것이다.




BiConsumer<T,U>

Represents an operation that accepts two input arguments and returns no result.

BiFunction<T,U,R>

Represents a function that accepts two arguments and produces a result.

BinaryOperator<T>

Represents an operation upon two operands of the same type, producing a result of the same type as the operands.

BiPredicate<T,U>

Represents a predicate (boolean-valued function) of two arguments.

BooleanSupplier

Represents a supplier of boolean-valued results.

Consumer<T>

Represents an operation that accepts a single input argument and returns no result.

DoubleBinaryOperator

Represents an operation upon two double-valued operands and producing a double-valued result.

DoubleConsumer

Represents an operation that accepts a single double-valued argument and returns no result.

DoubleFunction<R>

Represents a function that accepts a double-valued argument and produces a result.

DoublePredicate

Represents a predicate (boolean-valued function) of one double-valued argument.

DoubleSupplier

Represents a supplier of double-valued results.

DoubleToIntFunction

Represents a function that accepts a double-valued argument and produces an int-valued result.

DoubleToLongFunction

Represents a function that accepts a double-valued argument and produces a long-valued result.

DoubleUnaryOperator

Represents an operation on a single double-valued operand that produces a double-valued result.

Function<T,R>

Represents a function that accepts one argument and produces a result.

IntBinaryOperator

Represents an operation upon two int-valued operands and producing an int-valued result.

IntConsumer

Represents an operation that accepts a single int-valued argument and returns no result.

IntFunction<R>

Represents a function that accepts an int-valued argument and produces a result.

IntPredicate

Represents a predicate (boolean-valued function) of one int-valued argument.

IntSupplier

Represents a supplier of int-valued results.

IntToDoubleFunction

Represents a function that accepts an int-valued argument and produces a double-valued result.

IntToLongFunction

Represents a function that accepts an int-valued argument and produces a long-valued result.

IntUnaryOperator

Represents an operation on a single int-valued operand that produces an int-valued result.

LongBinaryOperator

Represents an operation upon two long-valued operands and producing a long-valued result.

LongConsumer

Represents an operation that accepts a single long-valued argument and returns no result.

LongFunction<R>

Represents a function that accepts a long-valued argument and produces a result.

LongPredicate

Represents a predicate (boolean-valued function) of one long-valued argument.

LongSupplier

Represents a supplier of long-valued results.

LongToDoubleFunction

Represents a function that accepts a long-valued argument and produces a double-valued result.

LongToIntFunction

Represents a function that accepts a long-valued argument and produces an int-valued result.

LongUnaryOperator

Represents an operation on a single long-valued operand that produces a long-valued result.

ObjDoubleConsumer<T>

Represents an operation that accepts an object-valued and a double-valued argument, and returns no result.

ObjIntConsumer<T>

Represents an operation that accepts an object-valued and a int-valued argument, and returns no result.

ObjLongConsumer<T>

Represents an operation that accepts an object-valued and a long-valued argument, and returns no result.

Predicate<T>

Represents a predicate (boolean-valued function) of one argument.

Supplier<T>

Represents a supplier of results.

ToDoubleBiFunction<T,U>

Represents a function that accepts two arguments and produces a double-valued result.

ToDoubleFunction<T>

Represents a function that produces a double-valued result.

ToIntBiFunction<T,U>

Represents a function that accepts two arguments and produces an int-valued result.

ToIntFunction<T>

Represents a function that produces an int-valued result.

ToLongBiFunction<T,U>

Represents a function that accepts two arguments and produces a long-valued result.

ToLongFunction<T>

Represents a function that produces a long-valued result.

UnaryOperator<T>

Represents an operation on a single operand that produces a result of the same type as its operand.


위의 표는 자바 api가 제공하는 많은 함수형 인터페이스이다. 사실 위의 표 이외에도 Callable,Runnable 등의 많은 함수형 인터페이스가 존재한다.


이중 몇가지만 예제로 구현해 보았다.



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
public class LambdaTest {
 
    public static void main(String[] args) {
        /*
         * Predicate
         * 문자열을 받아서 해당 문자열이 빈 문자열인지를 반환
         */
        String str = "asd";
        System.out.println(lambdaIsEqual(p->p.isEmpty(), str));
        
        /*
         * Cunsumer
         * 인수를 각각 +1 해줌.
         */
        List<Integer> list = Arrays.asList(1,2,3,4,5);
        lambdaProcessParam( (Integer c)->{
                                        list.set(c-1,c+1);
                                        } , list);
        System.out.println(Arrays.toString(list.toArray()));
        
        /*
         * Function
         * 문자열 리스트를 받아서 각각의 문자열의 길이를 담은 리스트로 반환
         */
        List<String> list2 = Arrays.asList("ab","cds","ewwqd","a");
        List<Integer> result = lambdaConvertParam(list2, f->f.length());
        System.out.println(Arrays.toString(result.toArray()));
        
        /*
         * Supplier
         * 랜덤한 숫자를 반환한다.
         */
        System.out.println(lambdaSupplier(()->{
            return (int)(Math.random()*10)+1;
        }));
        
        /*
         * UnaryOperator
         * 숫자 리스트를 받아 각 숫자에 +2한 값을 담은 리스트로 반환
         */
        List<Integer> ele = Arrays.asList(1,2,3,4,5);
        List<Integer> result2 = lambdaUnaryOper(ele, uo->uo+2);
        System.out.println(Arrays.toString(result2.toArray()));
        
        /*
         * BinaryOperator
         * 두개의 문자열을 받아서 공백을 기준으로 두개의 문자열을 합하여 반환
         */
        String str1 = "yeoseong";
        String str2 = "yoon";
        System.out.println(lambdaBinaryOper(str1, str2, (bo1,bo2)->bo1+" "+bo2));
        
        /*
         * 하나의 정수와 문자열을 입력받아 정수와 문자열의 길이가 같은지 검사
         */
        System.out.println(lambdaBiPred(4"yoon", (bp1,bp2)->bp1 == bp2.length()));
        
        /*
         * 문자열 리스트에 인덱스한 숫자 번호를 붙여준다.
         */
        List<String> list3 = Arrays.asList("a","b","c","d");
        List<String> result4 = new ArrayList<>();
        lambdaBiConsumer(result4,list3, ()->(int)(Math.random()*10)+1, (bc1,bc2)->{
            bc1 = bc1.concat("-"+bc2+"");
            result4.add(bc1);
        });
        System.out.println(Arrays.toString(result4.toArray()));
        
        /*
         * lambda를 포함하면서 길이가 5이상인 문자열인가?
         */
        System.out.println(lambdaPredAnd("It's lambda", (String p1)->p1.contains("lambda"), (String p2)->p2.length()>5));
        
        /*
         * 숫자를 입력받아서 +1한 후에 *2를 수행한 숫자를 반환
         */
        System.out.println(lambdaFuncAndThen(1, f1->f1+1, f2->f2*2));
    }
    
    /**
     * boolean Predicate<T>
     * 하나의 인수를 받아서 적절한 로직을 처리한 후에 boolean을 반환한다.
     */
    public static <T> boolean lambdaIsEqual(Predicate<T> predicate,T t) {
        return predicate.test(t);
    }
    
    /**
     * void Consumer<T>
     * 하나의 인수를 받아서 인수를 소모한후 void를 반환.
     */
    public static <T> void lambdaProcessParam(Consumer<T> consumer,List<T> list) {
        list.forEach(e->consumer.accept(e));
    }
    
    /**
     * R Function<T>
     * <T,R>의 인수를 받아서 T타입을 R타입으로 변환후 반환한다.
     */
    public static <T,R> List<R> lambdaConvertParam(List<T> list, Function<T, R> f) {
        List<R> result = new ArrayList<>();
        list.forEach(c->{
            result.add(f.apply(c));
        });
        return result;
    }
    
    /**
     * T Supplier
     * 매개변수는 없고 T타입을 반환한다.
     * @return 
     */
    public static <T> T lambdaSupplier(Supplier<T> s) {
        return s.get();
    }
    /**
     * T타입의 매개변수를 받아 같은 타입의 T타입의 값을 반환
     */
    public static <T> List<T> lambdaUnaryOper(List<T> list,UnaryOperator<T> uo){
        List<T> result = new ArrayList<>();
        list.forEach(c->{
            result.add(uo.apply(c));
        });
        return result;
    }
    
    /**
     * T타입의 매개변수를 2개 받아서, T타입의 값으로 반환
     */
    public static <T> T lambdaBinaryOper(T a,T b,BinaryOperator<T> bo) {
        return bo.apply(a, b);
    }
    
    /**
     * T,R타입의 매개변수를 받아서 boolean 값을 반환
     */
    public static <T,R> boolean lambdaBiPred(T t,R r,BiPredicate<T, R> bp) {
        return bp.test(t, r);
    }
    
    /**
     * T,R타입의 매개변수를 받아서 적절한 처리를 한다. void 반환
     */
    public static <T,R> void lambdaBiConsumer(List<T> result,List<T> t , Supplier<R> r , BiConsumer<T, R> bc) {
        t.forEach(c->{
            bc.accept(c,r.get());
        });
    }
    
    /**
     * Predicate and/or/negate
     * 두개 이상의 Predicate를 and로 묶을 수 있다.
     */
    public static <T> boolean lambdaPredAnd(T t,Predicate<T> p1,Predicate<T> p2) {
        return p1.and(p2).test(t);
    }
    
    /**
     * Function andThen,compose
     * andThen a.andThen(b) a를 먼저 수행한 결과를 b의 함수의 입력으로 가져간다.
     * compose a.compose(b) b를 먼저 수행한 결과를 a의 함수의 입력으로 가져간다.
     */
    public static <T> T lambdaFuncAndThen(T t, Function<T, T> f1, Function<T, T> f2) {
        return f1.andThen(f2).apply(t);
    }
}
 
cs




이제는 람다에서 알아야할 몇가지 정보가 있다. 

1) 예외, 람다, 함수형 인터페이스의 관계 - 함수형 인터페이스는 확인된 예외를 던지는 동작을 허용하지 않는다. 즉, 예외를 던지는 람다 표현식을 만드려면 확인된 예외를 선언하는 함수형 인터페이스를 직접 정의하거나 람다를 try/catch 블록으로 감싸야한다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
@FunctionalInterface
public interface Lamda{
    T process(T t) throws Exception;
}
 
 
Function<BufferedReader, String> f = 
    (BufferedReader b)-> {
        try{
            return b.readLine();
        }catch(IOException e){
            throw new RuntimeException(e);
        }
    };
cs


2)형식 추론 - 위의 예제를 보면 매개변수로 전달하는 람다의 인자에는 인자의 형식을 캐스팅하지 않는다. 인자의 형식은 선언된 메소드에서 내부적으로 추론해낸다.


3)람다의 조합 - Predicate(and,or,negate),Function(andThen,compose).... 람다식끼리 조합해서 더 제한적인 조건 혹은, 더 많은 처리를 파이프라인처럼

묶을 수 있다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
    /**
     * Predicate and/or/negate
     * 두개 이상의 Predicate를 and로 묶을 수 있다.
     */
    public static <T> boolean lambdaPredAnd(T t,Predicate<T> p1,Predicate<T> p2) {
        return p1.and(p2).test(t);
    }
    
    /**
     * Function andThen,compose
     * andThen a.andThen(b) a를 먼저 수행한 결과를 b의 함수의 입력으로 가져간다.
     * compose a.compose(b) b를 먼저 수행한 결과를 a의 함수의 입력으로 가져간다.
     */
    public static <T> T lambdaFuncAndThen(T t, Function<T, T> f1, Function<T, T> f2) {
        return f1.andThen(f2).apply(t);
    }
cs


위의 예제에도 나왔지만 다시 한번 설명하기 위해 발췌했다. 이렇게 람다식끼리 조건부로 연결가능하다. 


이상 자바8의 람다에 대한 설명이었다. 사실 람다에 대해 설명하지 못한 것이 훨씬 많다. 내가 설명한 것은 람다의 일부일 것이다. 하지만 이번 포스팅에서는

람다란 무엇이고 어떤 식으로 접근하여 어떻게 사용할 것인가를 설명하기 위한 포스팅이었기에 이 글을 읽고 간단한 사용법을 익혀

나중에 응용해 나갔으면 하는 생각에 더 복잡하고 많은 설명을 하지 않았다.

posted by 여성게
:
Middleware/Redis 2019. 3. 1. 14:02

Springboot,redis - Redis repository 간단한사용법!




우선 처음에 조금 헤메긴 했지만, Redis Repository를 사용하려면 기존에 어떠한 Datasource라도 존재를 해야하는 것 같다. 그래서 임의로 인메모리디비인 H2를 dependency하였고, 모든 Datasource는 기본설정으로 두었다. 이렇게 인메모리 디비 데이터소스를 이용함에도 불구하고, 실제로는 Redis에 데이터가 삽입되는 것을 볼수 있다. 이유는 잘모르지만....아시는 분 있으시면 알려주세요...




1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.brownfield.pss</groupId>
    <artifactId>redis-cluster</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>redis-cluster</name>
    <description>Demo project for Spring Boot</description>
 
    <properties>
        <java.version>1.8</java.version>
    </properties>
 
    <dependencies>
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.4</version>
            <scope>provided</scope>
        </dependency>
        <!-- H2 -->
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.json/json -->
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20160810</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
 
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
 
</project>
 
cs


pom.xml이다. 일단 Datasource가 필요하다라는 예외메시지에 H2 인메모리 디비를 dependency했다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
 * Redis Configuration
 * @author yun-yeoseong
 *
 */
@Configuration
@EnableRedisRepositories
public class RedisConfig {
    
    
    /**
     * Redis Cluster 구성 설정
     */
    @Autowired
    private RedisClusterConfigurationProperties clusterProperties;
    
    /**
     * JedisPool관련 설정
     * @return
     */
    @Bean
    public JedisPoolConfig jedisPoolConfig() {
        return new JedisPoolConfig();
    }
    
    
    /**
     * Redis Cluster 구성 설정
     */
    @Bean
    public RedisConnectionFactory jedisConnectionFactory(JedisPoolConfig jedisPoolConfig) {
        return new JedisConnectionFactory(new RedisClusterConfiguration(clusterProperties.getNodes()),jedisPoolConfig);
    }
    
    /**
     * RedisTemplate관련 설정
     * 
     * -Thread-safety Bean
     * @param jedisConnectionConfig - RedisTemplate에 설정할 JedisConnectionConfig
     * @return
     */
    @Bean(name="redisTemplate")
    public RedisTemplate redisTemplateConfig(JedisConnectionFactory jedisConnectionConfig) {
        
        RedisTemplate redisTemplate = new RedisTemplate<>();
 
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        redisTemplate.setConnectionFactory(jedisConnectionConfig);
        
        return redisTemplate;
        
    }
//    @Bean
//    RedisTemplate<?, ?> redisTemplate(RedisConnectionFactory connectionFactory) {
//    
//      RedisTemplate<byte[], byte[]> template = new RedisTemplate<>();
//      template.setConnectionFactory(connectionFactory);
//      return template;
//    }
    
    
}
 
cs


Redis Config이다. @EnableRedisRepositories 어노테이션을 이용하여 레디스 레포지토리를 이용한다고 명시하였다. 나머지 설정은

이전 포스팅에서 구성했던 Redis Cluster 환경과 동일하게 진행하였다.



1
2
3
4
5
6
7
8
package com.spring.redis;
 
import org.springframework.data.repository.CrudRepository;
 
public interface RedisRepository extends CrudRepository<RedisEntity, Long> {
    public RedisEntity findByFirstname(String firstname);
}
 
cs

RedisRepositry 인터페이스이다. 사용법은 JPA 레포지토리랑 동일한것 같다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.spring.redis;
 
import java.io.Serializable;
 
import org.springframework.data.annotation.Id;
import org.springframework.data.redis.core.RedisHash;
import org.springframework.data.redis.core.index.Indexed;
 
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
 
@RedisHash("person")
@Getter
@Setter
@ToString
public class RedisEntity implements Serializable{
    
    private static final long serialVersionUID = 1370692830319429806L;
 
    @Id
    private Long id;
    
//    @Indexed
    private String firstname;
    
//    @Indexed
    private String lastname;
 
    private int age;
 
}
 
cs


Redis 엔티티 설정이다. @RedisHash("person")으로 해당 엔티티가 레디스엔티티임을 명시하였다. 여러글을 읽다가 딱 설명하기 좋은 글이있었다.




RedisEntity라는 엔티티 데이터들을 이후에 무수히 많이 저장이 될것이다. 그래서 이 엔티티들만을 보관하는 하나의 해쉬키 값이 @RedisHash("person")이

되는 것이다. 그리고 이 해쉬 공간에서 각 엔티티들이 person:hash_id 라는 아이디 값을 가지게 된다.(실제로 @Id에 매핑되는 것은 Hash_id) 

이것을 간단히 cli로 보여드리면



사실 key list를 불러오기 위해 " keys * "라는 명령어를 사용할 수 있지만, 이 명령어를 실행시키면 그동안 Redis의 모든 행동은 all stop 됨에 주의하자.


데이터 구조가 이해가 되는가?

이것을 자바의 해쉬 타입으로 지정한다면

HashMap<String,HashMap<String,Person>>의 구조가 되는 것이다. 해쉬의해쉬타입이 된다는 것이 그림에서도 표현이 되있다.



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.spring.redis;
 
import java.util.Arrays;
import java.util.List;
 
import javax.annotation.Resource;
 
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.ListOperations;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.SetOperations;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.test.context.junit4.SpringRunner;
 
 
@RunWith(SpringRunner.class)
@SpringBootTest
public class RedisTest {
    
    @Autowired
    private RedisTemplate redisTemplate;
    
    @Autowired
    private RedisRepository repository;
    
//    @Test
//    public void testDataHandling() {
//        
//        redisTemplate.getConnectionFactory().getConnection().info().toString();
//        
//        String key = "yeoseong";
//        String value = "yoon";
//        redisTemplate.opsForValue().set(key, value);
//        String returnValue = (String) redisTemplate.opsForValue().get(key);
//        
//        System.out.println(value);
//    }
    
    @Test
    public void redisRepository() {
        RedisEntity entity = new RedisEntity();
        entity.setFirstname("yeoseong");
        entity.setLastname("yoon");
        entity.setAge(28);
        repository.save(entity);
        RedisEntity findEntity = repository.findByFirstname(entity.getFirstname());
        System.out.println(findEntity.toString());
    }
}
 
cs


posted by 여성게
:
Middleware/Redis 2019. 3. 1. 12:55

Springboot,Redis - Springboot Redis Nodes Cluster !(레디스 클러스터)



이전 포스팅에서는 Redis Server들의 고가용성을 위해 Redis Sentinel을 구성하여 Master-Slave 관계의 구성을 해보았습니다. 


▶︎▶︎▶︎Redis Sentinel 구성


Sentinel을 구성하여 Redis Server들의 고가용성을 키워주는 방법 이외에도 사실 Redis는 Cluster라는 좋은 기능을 지원해줍니다.

그럼 Sentinel은 무엇이고 Redis Cluster는 다른 것인가? 대답은 엄연히 다른 기능입니다. 

간단히 비교하면 Sentinel는 Master-Slave관계를

구성합니다.(Redis Server 끼리 구성을 갖춤). 하지만 Redis Server 이외에 Sentinel 인스턴스를 띄워주어야합니다. 그런 Sentinel 인스턴스들은

Redis Server들을 모니터링하고 고가용성을 위한 적당한 처리를 해줍니다. 그리고 Redis Server끼리의 데이터 동기화도 마춰줍니다. 이말은,

모든 Redis Server는 모두 같은 데이터들을 가지고 있는 것이죠.

하지만 Cluster를 이용하면 각 Redis Server들은 자신만의 HashSlot을 할당 받게 됩니다. 그리고 Cluster도 Master-Slave 관계를

구성하게 됩니다. 이말은 무엇이냐? 대략 16000개의 데이터 바구니를 나누어가지는 Redis Server들은 Master가 됩니다. Sentinel과는

다르게 하나의 마스터만 갖는 것이 아닙니다. 그리고 각 마스터에 대한 Slave 서버를 가지게 되는 것입니다. 더 자세한 사항은 아래 링크를 참조해주세요.


▶︎▶︎▶︎Cluster&Sentinel 그리고 Redis





이제는 Redis Cluster 구성을 해보겠습니다. 오늘 구성해볼 아키텍쳐입니다.

혹시나 Redis를 설치와 간단한 사용법에 대해 모르신다면 아래링크를 참조해주세요.


▶︎▶︎▶︎Redis 설치와 사용법



3개의 Master와 3개의 Slave 입니다.(편의상 Redis 폴더의 루트 == $REDIS)


$REDIS 위치에 cluster라는 폴더를 하나 구성해줍니다. 


그리고 해당 $REDIS/redis.conf를 cluster 폴더에 6개를 복사해줍니다.(redis-cluster1~6.conf)



이제 각 redis-cluster 설정파일을 수정할 것입니다. 이번에 할 설정은 간단한 설정입니다. 프러덕환경에서는

더 세부적인 설정이 필요할 수 있습니다.


이번예제는 동일한 서버에 6개의 port를 나누어 진행합니다. 만약 서로 다른 서버에 구성을 하시기 위해서는

적절히 인스턴스들을 나누어주시고 각 서버에 대해 포트 개방이 필요합니다.



redis-cluster1.conf - port:6379


설정은 직관적으로 어떠한 설정에 대한 것인지 알수 있습니다. 해당 인스턴스의 포트는 6379를 사용하고

클러스터를 사용하겠다. 그리고 해당 인스턴스가 클러스터에 대한 정보를 남기기위해 nodes.conf를 사용한다.

또한 타임아웃은 5초로 하고 모든 데이터는 영속하기 위해 항상 write마다 기록한다 라는 설정입니다.(데이터 유실방지)


나머지 인스턴스들의 설정도 port와 cluster-config-file의 설정만 구분하고 동일하게 작성합니다.


ex) port 6380, cluster-config-file nodes2.conf


설정 파일작성이 끝나셨으면 6개의 터미널을 띄워줍니다.


>cd src

>./redis-server ../cluster/redis-clusterN.conf 


총 6개의 레디스 인스턴스를 실행시킵니다.


그리고 하나 추가적으로 작업을 해주어야할 것이 있습니다. 실행되고 있는 인스턴스에 대해

명시적으로 클러스터 구성을 생성해주는 작업입니다. 이 과정은 버젼에 따라 총 2가지의 방법이 있습니다.


1
2
3
redis-cli --cluster create 127.0.0.1:6379 127.0.0.1:6380 \
127.0.0.1:6381 127.0.0.1:6382 127.0.0.1:6383 127.0.0.1:6384 \
--cluster-replicas 1
cs


$REDIS/src 의 redis-cli를 이용한 방법입니다. 클러스터 구성에 참여하는 인스턴스 정보를 모두 입력하고 마지막에 replicas 속성을

명시해줍니다. 마지막 속성은 마스터에 대한 슬레이브를 몇개를 둘것인가 라는 설정입니다.


1
2
./redis-trib.rb create --replicas 1 127.0.0.1:6379 127.0.0.1:6380 \
127.0.0.1:6381 127.0.0.1:6382 127.0.0.1:6383 127.0.0.1:6384
cs


동일한 속성에 대한 redis-trib.rb를 이용한 클러스터 구성방법입니다.


저는 첫번째 방법을 이용하였습니다. 명령어를 탁 치는 순간 3개는 마스터 3개는 슬레이브 노드를 임의로 

선택해 이렇게 클러스터를 구성하겠습니까? 라는 질문에 yes||no로 답변해주어야합니다. yes를 입력합니다.


이제는 클러스터 구성이 잘 되었는지 확인해볼까요?



잘 구성이 되었습니다 ! 여기서 한가지 집고 넘어가야 할 것이 있습니다. Redis Cluster 사용을 위해서는 그에 맞는 클라이언트가 필요합니다. 저는

그 클라이언트를 Springboot를 이용하여 구성해보았습니다. springboot의 Spring Redis 프로젝트를 생성해줍니다!



1
2
3
4
#Redis Cluster Config(마스터노드의 리스트)
spring.redis.cluster.nodes=127.0.0.1:6379,127.0.0.1:6380,127.0.0.1:6381
클러스터 노드간의 리다이렉션 숫자를 제한.
spring.redis.cluster.max-redirects=
cs


application.propeties 파일입니다. 클러스터에 참여하는 노드들을 나열해줍니다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * Redis Cluster Config
 * @author yun-yeoseong
 *
 */
@Component
@ConfigurationProperties(prefix = "spring.redis.cluster")
public class RedisClusterConfigurationProperties {
    
    /**
     * spring.redis.cluster.nodes[0]=127.0.0.1:6379
     * spring.redis.cluster.nodes[1]=127.0.0.1:6380
     * spring.redis.cluster.nodes[2]=127.0.0.1:6381
     */
    List<String> nodes;
 
    public List<String> getNodes() {
        return nodes;
    }
 
    public void setNodes(List<String> nodes) {
        this.nodes = nodes;
    }
    
    
}
cs


properties에 나열한 노드들의 정보를 얻기위한 빈을 하나 띄워줍니다. 물론 @Value로 직접 주입시켜주어도 상관없습니다. 해당 방법은 Spring Redis Document에 나온데로 진행하고 있는 중입니다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
 * Redis Configuration
 * @author yun-yeoseong
 *
 */
@Configuration
public class RedisConfig {
    
    
    /**
     * Redis Cluster 구성 설정
     */
    @Autowired
    private RedisClusterConfigurationProperties clusterProperties;
    
    /**
     * JedisPool관련 설정
     * @return
     */
    @Bean
    public JedisPoolConfig jedisPoolConfig() {
        return new JedisPoolConfig();
    }
    
    
    /**
     * Redis Cluster 구성 설정
     */
    @Bean
    public RedisConnectionFactory jedisConnectionFactory(JedisPoolConfig jedisPoolConfig) {
        return new JedisConnectionFactory(new RedisClusterConfiguration(clusterProperties.getNodes()),jedisPoolConfig);
    }
    
    /**
     * RedisTemplate관련 설정
     * 
     * -Thread-safety Bean
     * @param jedisConnectionConfig - RedisTemplate에 설정할 JedisConnectionConfig
     * @return
     */
    @Bean(name="redisTemplate")
    public RedisTemplate redisTemplateConfig(JedisConnectionFactory jedisConnectionConfig) {
        
        RedisTemplate redisTemplate = new RedisTemplate<>();
 
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        redisTemplate.setConnectionFactory(jedisConnectionConfig);
        
        return redisTemplate;
        
    }
    
    /**
     * 문자열 중심 편의 RedisTemplate
     * 
     * @param jedisConnectionConfig
     * @return
     */
    @Bean(name="stringRedisTemplate")
    public StringRedisTemplate stringRedisTemplate(JedisConnectionFactory jedisConnectionConfig) {
        StringRedisTemplate stringRedisTemplate = new StringRedisTemplate(jedisConnectionConfig);
        
        return stringRedisTemplate;
    }
    
}
 
cs


Redis Config를 위한 자바클래스입니다. 이제 정말로 잘되는지 확인해볼까요?


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@RunWith(SpringRunner.class)
@SpringBootTest
public class RedisTest {
    
    @Autowired
    private RedisTemplate redisTemplate;
    
    @Test
    public void testDataHandling() {
        
        redisTemplate.getConnectionFactory().getConnection().info().toString();
        
        String key = "yeoseong";
        String value = "yoon";
        redisTemplate.opsForValue().set(key, value);
        String returnValue = (String) redisTemplate.opsForValue().get(key);
        
        System.out.println(value);
    }
    
}
 
cs


결과값으로 "yoon"이라는 값을 얻어옵니다. 그러면 진짜로 클러스터된 노드들에서 얻어왔는지 확인해봐야겠습니다.



6379 포트의 인스턴스로 해당 값을 얻어오려고 하니 실제로는 6381에 해당 데이터가 있어 리다이렉트 됬다라는 로그와 함께 

결과 데이터를 얻어왔습니다. 여기까지 Redis Cluster 구성이었습니다. 부족한 부분이 많아 틀린 부분이 있다면 댓글 부탁드립니다!!


posted by 여성게
:
Middleware/Redis 2019. 2. 28. 14:11

Redis - Cluster & Sentinel 차이점 및 Redis에 대해


 

요즘 챗봇 개발 중에 Redis와 같은 In-memory Cache를 이용할 필요가 생겨서 Redis를 공부 하고있다.

많은 블로그를 보는 도중에 Redis에 대한 기술적인 좋은 글과 설명이 있어서 포스팅한다.


▶︎▶︎▶︎레디스 클러스터, 센티넬 구성 및 레디스 동작 방식




RDBMS만큼의 정합성과 영속성을 보장할 필요가 없는 데이터들을 빠르게 처리하거나 일정 기간동안만 보관하고 있기 위한 용도로 레디스(Redis), memcached 등의 in-memory 기반 저장소가 많이 사용된다. 그중에서도 Redis는 빠른 성능을 유지하면서도 일정 수준 이상의 persistence를 제공하고, 다양한 native 데이터 구조들을 제공하여 효율적이고 편리한 사용이 가능하게 해주기 때문에 다양한 use-case들이 존재한다.

이 글은 실제 명령어를 날려서 레디스를 직접 사용해보는 것을 배우기 보다는 어떤 in-memory 저장소를 선택할지 고민하는 분들을 위해서 주요 운영 방식, 레디스의 내부 동작 방식 및 특징, 주요 클라이언트들에 대한 정보를 제공하는 쪽에 초점이 맞춰져 있다.

운영 모드 (Operation Modes)

레디스는 단일 인스턴스만으로도 충분히 운영이 가능하지만, 물리 머신이 가진 메모리의 한계를 초과하는 데이터를 저장하고 싶거나, failover에 대한 처리를 통해 HA를 보장하려면 센티넬이나 클러스터 등의 운영 모드를 선택해서 사용해야 한다. 각 모드들이 어떤 특성을 갖는지 좀더 자세히 알아보도록 하자.

  • 단일 인스턴스(Single instance)
    • HA(High availibilty) 지원 안됨.
  • 센티넬(Sentinel)
    • HA 지원
    • master/slave replication
    • sentinel process
      • redis와 별도의 process
      • 여러개의 독립적인 sentinel process들이 서로 협동하여 운영된다 (SPOF 아님)
      • 안정적 운영을 위해서는 최소 3개 이상의 sentinel instance 필요 (fail over를 위해 과반수 이상 vote 필요)
        • redis process가 실행되는 각 서버마다 각각 sentinel process를 띄워놓는 방법
        • redis process가 실행되는 서버와 별개로 redis에 액세스를 하는 application server들에 sentinel process를 띄워놓는것도 가능
        • 등등 다양한 구성이 가능
      • 지속적으로 master/slave 가 제대로 동작을 하고있는지 모니터링
      • master에 문제가 감지되면 자동으로 failover 수행
    • 클라이언트는 어떻게 redis server에 연결해서 데이터를 조회하나?
      • 먼저 sentinel에 연결해서 현재 master를 조회해야 한다.
  • 클러스터(Cluster)
    • HA, sharding 지원
      • Sentinel과 동시에 사용하는 것이 아님! 완전히 별도의 솔루션.
      • dataset을 자동으로 여러 노드들에 나눠서 저장해준다.
      • Redis Cluster 기능을 지원하는 client를 써야만 데이터 액세스 시에 올바른 노드로 redirect가 가능하다.
    • Cluster node들의 동작 방식
      • serve clients 6379 (data port)
      • cluster bus 16379 (data port + 10000)
        • 자체적인 바이너리 프로토콜을 통해 node-to-node 통신을 한다.
        • failure detection, configuration update, failover authorization 등을 수행
      • 각 노드들은 클러스터에 속한 다른 노드들에 대한 정보를 모두 갖고있다.
    • Sharding 방식
      • 최대 1000개의 노드로 샤딩해서 사용. 그 이상은 추천하지 않음
      • consistent hashing을 사용하지 않는대신 hashslot이라는 개념을 도입
      • hashslot
        • 결정방법 CRC16(key) mod 16384를
          • CRC16을 이용하면 16384개의 슬롯에 균일하게 잘 분배됨
        • 노드별로 자유롭게 hash slot을 할당 가능
        • 예)
          • Node A contains hash slots from 0 to 5500.
          • Node B contains hash slots from 5501 to 11000.
          • Node C contains hash slots from 11001 to 16383.
      • 운영 중단 없이 hash slots을 다른 노드로 이동시키는 것이 가능
        • add/remove nodes
        • 노드별 hashslot 할당량 조정
      • multiple key operations 을 수행하려면 모든 키값이 같은 hashslot에 들어와야 한다.
        • 이를 보장하기위해 hashtag 라는 개념 도입
          • {} 안에있는 값으로만 hash 계산
          • {foo}_my_key
          • {foo}_your_key
    • Replication & failover
      • failover를 위해 클러스터의 각 노드를 N대로 구성가능 
      • master(1대) / slave(N-1대)
      • async replication (master → slave replication 과정에서 ack을 받지 않음)
        • 데이터 손실 가능성 존재
        • master가 client요청을 받아서 ack을 완료한 후, 해당 요청에 대한 replication이 slave로 전파되기 전에 master가 죽는 경우 존재
    • 클라이언트는 클러스터에 어떻게 연결해서 데이터를 조회하나?
      • redis client는 클러스터 내의 어떤 노드에 쿼리를 날려도 된다(슬레이브에도 가능).
        • ex) GET my_key
      • 쿼리를 받은 노드가 해당 쿼리를 분석
        • 해당 키를 자기 자신이 갖고있다면 바로 찾아서 값을 리턴
        • 그렇지 않은경우 해당 키를 저장하고 있는 노드의 정보를 리턴 (클라이언트는 이 정보를 토대로 쿼리를 다시 보내야함)
        • ex) MOVED 3999 127.0.0.1:6381

메모리 동작 방식

  • key가 만료되거나 삭제되어 redis가 메모리를 해제하더라도, OS에서 해당 분량만큼 바로 메모리가 확보되진 않음
    • 꼭 redis에만 해당되는 이야기는 아님
    • 5GB중 3GB의 데이터를 메모리에서 해제 -> OS 메모리 사용량은 여전히 5GB
    • 하지만 다시 데이터를 add하면 logically freed된 영역에 잡히므로 실제 메모리 5GB를 넘지는 않는다.
  • 따라서 peak memory usage 기준으로 잡아야 한다.
  • 대부분 5GB 정도 사용하고 가끔 10GB 가 필요하더라도, 10GB 메모리를 이상의 머신이 필요.
  • maxmemory 설정을 해두는게 좋음 (하지 않으면 무한히 메모리 사용하다가 머신 전체가 죽을 가능성)
    • maxmemory 설정시의 eviction policy
      • no-eviction (추가 memory를 사용하는 write command에 대해 에러 리턴)
      • allkeys-lru (전체 아이템 중에서 LRU)
      • volatile-lru (expire 되는 아이템 중에서 LRU)
      • volatile-ttl (expire 되는 아이템 중 TTL이 얼마 안남은 녀석 순으로)
    • RDB persistence를 사용한다면 maxmemory를 실제 가용 메모리의 45%정도로 설정하는것을 추천. 스냅샷을 찍을때 현재 사용중인 메모리의 양만큼 필요하다. (5%는 오버헤드에 대비한 마진)
    • 사용하고 있지 않다면 가용 메모리의 95%정도로
  • 동일 key-value 데이터를 저장한다고 가정했을 때, Cluster Mode를 사용할 경우 Single instance 보다 1.5~2배 정도 메모리를 더 사용하는것에 주의해야 한다.
    • Redis cluster의 경우 내부적으로 cluster안에 저장된 key를 hashslot으로 맵핑하기 위한 테이블을 가지고 있기 때문에 추가적인 memory overhead가 발생한다.
    • 이때문에 key의 숫자가 많아질수록 이러한 현상이 더 두드러진다
    • 4.x 버전에 이와 관련한 메모리 최적화 기능이 들어가서 3.x 버전보다는 더 적게 메모리를 사용하지만, 여전히 Single instance보다는 많은 메모리를 필요로 한다.

데이터 영속성 (Data Persistence)

memcached의 경우 데이터가 메모리에만 저장되기 때문에 프로세스가 재기동되면 메모리상의 데이터는 모두 유실된다. 하지만 redis의 경우 기본적으로 disk persistence가 설정되어있기 때문에, 프로세스를 재시작 하더라도 셧다운 되기 전의 마지막 상태와 거의 동일한 (약간의 손실은 있을 수 있다) 상태로 돌려 놓을 수 있다.

  • RDB persistence
    • 일정 인터벌로 point-in-time snapshots을 생성
    • compact한 단일 파일로 저장됨 백업하기 좋음
    • 부모 프로세스는 자식 프로세스를 fork. fork된 프로세스에서 모든 persist I/O처리
    • restart 시간이 빠르다
      • H/W 사양에 따라 다르겠지만 보통 메모리 사용량 1GB당 10~20 초정도 로드타임
    • 멈췄을때 데이터 유실 가능성이 더 높다.
    • 몇 분 정도의 데이터 유실 가능성을 감내 할 수 있다면 RDB를 쓰는것을 추천
  • AOF (append only file)
    • 모든 write operation 을 log로 남기고 서버 재시작 시점에 replay
    • 데이터 양이 많아지면 해당 시점의 데이터셋을 만들어낼 수 있도록하는 minimum log들만 남기는 compaction을 진행
    • 읽기 쉬운 포멧이지만 RDB보다 용량이 크다
    • write 리퀘스트가 많을때 RDB보다 반응성이 느리다
  • 참고: http://oldblog.antirez.com/post/redis-persistence-demystified.html

라인 메신저의 메시징 시스템에서는 RDB또는 AOF 사용으로 인해

트랜잭션 모델(Transaction model)

  • Redis는 single threaded model이고 요청이 들어온 순서대로 처리한다
  • MULTI → commands → EXEC/DISCARD
  • MULTI 이후의 명령들은 queue에 넣어뒀다가 EXEC가 불린순간 순차적으로 진행
    • EXEC를 통해서 실행중일때는 다른 connection에서 중간에 끼어들지 못한다
  • command 실행중에 에러가 발생해도 롤백하지 않고 계속 진행한다.
    • command에 잘못된 명령어나, 잘못된 타입의 인자를 넣었을때 에러 발생 -> 거의 개발자의 실수
    • 롤백기능을 없앤 덕분에 훨씬 빠른 성능 제공 가능
  • optimistic locking (using CAS operation) 사용
    • 값의 변경을 모니터링하다가, 값이 변경되었다면 현재 트랜잭션을 취소하고 다시 처음부터 실행
    • WATCH
      • 특정 키값이 변경되지 않은 경우에만 EXEC를 수행, 변경된경우 transaction자체를 수행하지 않음
      • EXEC가 불리는 시점에 모든 key에 대한 WATCH가 자동으로 UNWATCH
      • client의 연결이 끝나는 시점에도 모든 key에 대해 UNWATCH
    • 예시)
      • WATCH mykey
      • val = GET mykey
      • val = val + 1
      • MULTI
      • SET mykey $val
      • EXEC

주요 특수 기능

  • 다양한 데이터 구조 지원
    • 단순히 key – value 문자열만 저장하는 것이 아니라 고수준의 데이터 구조를 사용 가능하다
    • ex) Hash, Set, List, SortedSet, etc.
    • Hash
      • HSET(key, fields, value), HGET(key, field)
      • web application에서 특정 유저 userId를 key로 두고 해당 유저의 세부 정보들(name, email 등)을 field로 둔다
      • 이렇게하면 특정 유저와 관련된 정보들을 한번에 삭제하는 등의 namespace처럼 사용하는것도 가능하다.
      • hash key당 1000개 정도의 field까지는 레디스가 zipmap을 이용해 압축해서 저장한다
  • Expiration 지정
    • key별로 TTL(Time-To-Live)을 정해두면 레디스가 알아서 해당 시점이 지날때 key 삭제
    • 설정된 max memory에 도달하면 expire되지 않은 key들도 eviction policy에 따라 삭제될 수 있다.
  • Pipelining
    • 여러 커맨드들을 한번에 모아서 보낸후, 실행 결과도 한번에 모아서 받는다.
    • 레이턴시가 큰 네트워크 환경이라면 명령어 하나당 한번의 request/response를 할 때보다 스루풋을 올릴 수 있음.
  • Pub/Sub (출판/구독 모델)
    • 하나의 클라이언트가 같은 채널에 연결된 다른 클라이언트들에게 메시지들을 보내는 것이 가능
    • 이를 이용하여 속도가 빠른 메시지 브로드캐스터 혹은 메시지 큐 처럼 사용하는것이 가능하다.
  • Lua scripting
    • 여러 명령어들이 사용되는 복잡한 작업을 할 때 (특히 트랜잭션이 사용되는 경우) 이를 하나의 lua script로 만들어서 사용할 수있다.
    • 스크립트는 atomic하게 실행되기 때문에 optimistic locking transactions 를 사용할 필요가 없다.

지표 모니터링 (Monitoring Metrics)

  • used_memory
  • total_commands_processed
    • 이 지표와 요청/응답 latency 로깅해두면 명령어 처리량 대비 latency를 알 수 있다.
    • redis-cli -h {host} -p {port} —latency
  • slow command
    • slowlog get 명령으로 확인
  • client connections
    • info clients 명령으로 확인
    • 대략 5000개 이상의 커넥션들이 존재한다면 응답 시간이 느려지고 있을 가능성이 있으니 주의깊게 모니터링
    • maxclients 값을 대략 예상되는 connection 숫자의 1.1배~1.5배정도로 설정해 두면 안전함
  • mem_fragmentation_ratio
    • 정의 = 실제 물리 메모리 사용량 / 레디스의 메모리 사용량
    • 1 이면 이상적인 상태, 커질수록 파편화 심화
    • 1.5가 넘는다면 서버 재시작 추천
    • 1보다 작은 경우
      • 레디스의 메모리 사용량이 실제 물리 메모리 사용량보다 큰 경우는 OS에 의해 메모리의 일부가 swap되어 디스크로 이동되었을때 나타남
      • 디스크 사용으로 인해 성능 저하 발생
      • 메모리 사용량을 줄이거나 램 용량 증설 필요
  • 참고: https://www.datadoghq.com/pdf/Understanding-the-Top-5-Redis-Performance-Metrics.pdf

클라이언트 핸들링(Client Handling)

  • maxclients 설정 가능
    • 설정된 maxclient 도달시에 
  • timeout default = forever
  • Redis 3.2 이상부터 TCP keep-alive 디폴트로 켜져있음(약 300초)

Redis Client library for Java

언어별로 레디스 클라이언트 라이브러리들이 다양하게 존재하고 있다. 그중에서 자바언어로 만들어진 가장 많이 사용되는 3가지를 뽑아서 비교해 보았다.
async style API를 사용하는 경우 처리량(throughput)을 높일 수 있지만 오히려 개별 요청에 대한 응답 지연(latency)은 sync API 보다 더 느려질 수 있으니 상황에 맞게 선택해서 사용하는 것이 좋다.



posted by 여성게
: