인프라/Docker&Kubernetes 2020. 8. 24. 22:35

이번 포스팅에서는 쿠버네티스 로깅 파이프라인 구성에 대해 다루어볼 것이다. 저번 포스팅에서는 Fluentd + ES + Kibana 조합으로 클러스터 로깅 시스템을 구성했었는데, 이번 시간에는 Fluentd + kafka + ELK 조합으로 구성해본다.

<fluentd + ES + kibana logging>

 

 

Kubernetes - Kubernetes 로깅 운영(logging), Fluentd

오늘 다루어볼 내용은 쿠버네티스 환경에서의 로깅운영 방법이다. 지금까지는 쿠버네티스에 어떻게 팟을 띄우는지에 대해 집중했다면 오늘 포스팅 내용은 운영단계의 내용이 될 것 같다. 사실

coding-start.tistory.com

중간에 카프카를 두는 이유는 여러가지가 있을 수 있을 것 같다. 첫번째 버퍼역할을 하기때문에 어느정도 파이프라인의 속도 조절이 가능하다. 두번째 로그를 카프카 큐에 담아두고, 여러 컨슈머 그룹이 각기의 목적으로 로그데이터를 사용가능하다. 바로 실습에 들어가보자.

 

구성

 

 

구성은 위 그림과 같다. fluentd는 컨테이너 로그를 tail하고 있고, tail한 데이터를 카프카로 프로듀싱한다. 그리고 아웃풋으로 로그스태시로 보내고 로그 스태시는 엘라스틱서치에 색인을하게 된다.

 

실습이전에 본 실습에서 진행하는 예제중 카프카 구성과 엘라스틱서치의 구성은 별도로 옵션 튜닝 및 물리머신에 구성하는 것이 좋다. 필자는 구성의 편의를 위해 아무런 옵션을 튜닝하지 않은채 같은 쿠버네티스 클러스터에 카프카와 엘라스틱서치를 구성하였다.

 

kafka install & deploy on kubernetes unsing helm
 

TheOpenCloudEngine/uEngine-cloud-k8s

Contribute to TheOpenCloudEngine/uEngine-cloud-k8s development by creating an account on GitHub.

github.com

<헬름 설치>

> curl https://raw.githubusercontent.com/kubernetes/helm/master/scripts/get | bash
> kubectl --namespace kube-system create sa tiller
> kubectl create clusterrolebinding tiller --clusterrole cluster-admin --serviceaccount=kube-system:tiller
> helm init --service-account tiller
> helm repo update

위 명령어로 헬름을 다운로드 받는다.

 

<카프카 헬름 차트 설치 및 배포>

> kubectl create ns kafka
> helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
> helm install --name my-kafka --namespace kafka incubator/kafka

 

kafka라는 별도의 네임스페이스를 생성하여 그 안에 카프카를 배포하였다.

 

<헬름차트 삭제>

차트 삭제가 필요하면 아래 명령어를 이용하자.

# --purge 옵션으로 관련된 모든 정보를 지운다. 
helm delete my-kafka --purge

 

<fluentd가 데이터를 보낼 토픽생성>

> kubectl -n kafka exec my-kafka-0 -- /usr/bin/kafka-topics \
--zookeeper my-kafka-zookeeper:2181 --topic fluentd-container-logging \
--create --partitions 3 --replication-factor 3

Created topic "fluentd-container-logging".

 

"fluentd-container-logging"이라는 이름으로 토픽을 생성하였다.

 

<생성된 topic 확인>

> kubectl -n kafka exec my-kafka-0 -- /usr/bin/kafka-topics --zookeeper my-kafka-zookeeper:2181 --list

fluentd-container-logging

 

토픽리스트를 조회해서 우리가 생성한 토픽이 있는지 조회해본다.

 

<fluentd가 보낸 데이터가 큐로 잘들어오는지 확인하기 위해 컨슘머 실행>

> kubectl -n kafka exec -ti my-kafka-0 -- /usr/bin/kafka-console-consumer \
--bootstrap-server my-kafka:9092 --topic fluentd-container-logging --from-beginning

 

이제 실제로 카프카와 주키퍼가 쿠버네티스에 잘 떠있는지 확인해보자 !

 

> kubectl get pod,svc -n kafka
  NAME                       READY   STATUS    RESTARTS   AGE
  pod/my-kafka-0             1/1     Running   2          4m14s
  pod/my-kafka-1             1/1     Running   0          116s
  pod/my-kafka-2             1/1     Running   0          78s
  pod/my-kafka-zookeeper-0   1/1     Running   0          4m14s
  pod/my-kafka-zookeeper-1   1/1     Running   0          3m32s
  pod/my-kafka-zookeeper-2   1/1     Running   0          3m
  NAME                                  TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                      AGE
  service/my-kafka                      ClusterIP   10.108.104.66   <none>        9092/TCP                     4m14s
  service/my-kafka-headless             ClusterIP   None            <none>        9092/TCP                     4m14s
  service/my-kafka-zookeeper            ClusterIP   10.97.205.63    <none>        2181/TCP                     4m14s
  service/my-kafka-zookeeper-headless   ClusterIP   None            <none>        2181/TCP,3888/TCP,2888/TCP   4m14s

 

위와 같이 팟과 서비스 목록이 보인다면 다음으로 넘어간다.

 

ELK Stack 구성

<elasticsearch 실행>

아래 deployment와 service 설정파일을 이용하여 쿠버네티스 위에 엘라스틱서치를 구성한다.

 

apiVersion: v1
kind: Service
metadata:
  name: elasticsearch
  namespace: elk-stack
spec:
  selector:
    app: elasticsearch
  ports:
    - port: 9200
      protocol: TCP
      targetPort: 9200
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: elasticsearch
  namespace: elk-stack
  labels:
    app: elasticsearch
spec:
  replicas: 1
  selector:
    matchLabels:
      app: elasticsearch
  template:
    metadata:
      labels:
        app: elasticsearch
    spec:
      containers:
      - name: elasticsearch
        image: elastic/elasticsearch:6.8.6
        ports:
        - containerPort: 9200
          name: http
        - containerPort: 9300
          name: tcp

 

위 설정 파일은 볼륨을 구성하지 않아서 일회성(테스트)로만 가능하다. 실제로 운영환경에서는 물리머신에 클러스터를 구성하던가, 혹은 쿠버네티스 볼륨을 붙여서 구성하자.

 

> kubectl apply -f ./kube-logging/fluentd-elasticsearch/elasticsearch.yaml
> kubectl get pod,svc -n elk-stack
  NAME                                 READY   STATUS    RESTARTS   AGE
  pod/elasticsearch-654c5b6b77-l8k2z   1/1     Running   0          50s
  NAME                    TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)    AGE
  service/elasticsearch   ClusterIP   10.101.27.73   <none>        9200/TCP   50s

 

<kibana 실행>

키바나는 아래 설정파일을 예제로 구성하였다.

 

apiVersion: v1
kind: Service
metadata:
  name: kibana
  namespace: elk-stack
spec:
  selector:
    app: kibana
  ports:
  - protocol: TCP
    port: 5601
    targetPort: 5601
  type: NodePort
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kibana
  namespace: elk-stack
  labels:
    app: kibana
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kibana
  template:
    metadata:
      labels:
        app: kibana
    spec:
      containers:
      - name: kibana
        image: elastic/kibana:6.8.6
        ports:
        - containerPort: 5601
          name: http

 

위 설정중 조금 살펴봐야할 것은 서비스 타입을 NodePort로 준 점이다. 실제로 외부로 포트를 개방해 localhost로 접근 가능하다. 실제 운영환경에서는 ingress까지 구성하여 배포하자.

 

> kubectl apply -f ./kube-logging/fluentd-elasticsearch/kibana.yaml
> kubectl get pod,svc -n elk-stack | grep kibana
  NAME                                 READY   STATUS    RESTARTS   AGE
  pod/kibana-6d474df8c6-fsfc7          1/1     Running   0          24s
  NAME                                 READY   STATUS    RESTARTS   AGE
  service/kibana          NodePort    10.97.240.55   <none>        5601:30578/TCP   24s

 

http://localhost:30578로 접근해 키바나가 잘 떠있는지와 엘라스틱서치와 잘 연동되었는지 확인하자.

 

<logstash 실행>

로그스태시는 아래 예시 설정 파일로 구성하였다.

 

apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-configmap
  namespace: elk-stack
data:
  logstash.yml: |
    http.host: "127.0.0.1"
    path.config: /usr/share/logstash/pipeline
    pipeline.workers: 2
  logstash.conf: |
    # all input will come from filebeat, no local logs
    input {
      kafka {
        bootstrap_servers => "my-kafka.kafka.svc.cluster.local:9092"
        topics => "fluentd-container-logging"
        group_id => "fluentd-consumer-group"
        enable_auto_commit => "true"
        auto_offset_reset => "latest"
        consumer_threads => 4
        codec => "json"
      }
    }

    output {
        elasticsearch {
          hosts => ["http://elasticsearch.elk-stack.svc.cluster.local:9200"]
          manage_template => false
          index => "kubernetes-container-log-%{+YYYY-MM-dd}"
        }
    }
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: logstash-deployment
  namespace: elk-stack
spec:
  replicas: 1
  selector:
    matchLabels:
      app: logstash
  template:
    metadata:
      labels:
        app: logstash
    spec:
      containers:
        - name: logstash
          image: docker.elastic.co/logstash/logstash:5.6.0
          ports:
            - containerPort: 5044
          volumeMounts:
            - name: config-volume
              mountPath: /usr/share/logstash/config
            - name: logstash-pipeline-volume
              mountPath: /usr/share/logstash/pipeline
      volumes:
        - name: config-volume
          configMap:
            name: logstash-configmap
            items:
              - key: logstash.yml
                path: logstash.yml
        - name: logstash-pipeline-volume
          configMap:
            name: logstash-configmap
            items:
              - key: logstash.conf
                path: logstash.conf
---
apiVersion: v1
kind: Service
metadata:
  name: logstash-service
  namespace: elk-stack
spec:
  selector:
    app: logstash
  ports:
    - protocol: TCP
      port: 5044
      targetPort: 5044
  type: ClusterIP

 

설정에서 잘 살펴볼 것은 input과 output의 호스트 설정이다. 우리는 모든 모듈을 같은 클러스터에 설치할 것이기 때문에 쿠버네티스 내부 DNS를 사용하였다.(실습에 편의를 위한 것이기도 하지만, 실제 운영환경에서도 내부 시스템은 종종 클러스터 내부 DNS를 사용하기도 한다. 그러면 실제로 통신하기 위해 클러스터 밖으로 나갔다 오지 않는다.)

 

또 한가지 설정은 Deployment에 볼륨을 마운트 하는 부분이다. 실제 쿠버네티스에서 ConfigMap은 볼륨으로 잡히기 때문에 그 ConfigMap을 logstash pod 내부로 마운트하여 실행시점에 해당 설정파일을 물고 올라가도록 하였다.

 

> kubectl apply -f ./kube-logging/fluentd-elasticsearch/logstash.yaml
> kubectl get pod,svc -n elk-stack | grep logstash
  NAME                                       READY   STATUS    RESTARTS   AGE  
  pod/logstash-deployment-556cfb66b5-6xrs6   1/1     Running   0          34s
  service/logstash-service   ClusterIP   10.96.13.170   <none>        5044/TCP         33s

 

<fluentd 실행>

이제는 실제 컨테이너 로그를 tail하여 수집하는 fluentd를 실행시켜보자.

 

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd
  namespace: kube-system
  labels:
    app: fluentd-logging
    version: v1
    kubernetes.io/cluster-service: "true"
spec:
  selector:
    matchLabels:
      app: fluentd-logging
  template:
    metadata:
      labels:
        app: fluentd-logging
        version: v1
        kubernetes.io/cluster-service: "true"
    spec:
      tolerations:
        - key: node-role.kubernetes.io/master
          effect: NoSchedule
      containers:
        - name: fluentd
          image: 1223yys/fluentd-kafka:latest
          imagePullPolicy: Always
          env:
            - name: FLUENT_KAFKA_BROKERS
              value: "my-kafka.kafka.svc.cluster.local:9092"
            - name: FLUENT_KAFKA_DEFAULT_TOPIC
              value: "fluentd-container-logging"
            - name: FLUENT_KAFKA_OUTPUT_DATA_TYPE
              value: "json"
            - name: FLUENT_KAFKA_COMPRESSION_CODEC
              value: "snappy"
            - name: FLUENT_KAFKA_MAX_SEND_LIMIT_BYTES
              value: "4096"
          resources:
            limits:
              memory: 200Mi
            requests:
              cpu: 100m
              memory: 200Mi
          volumeMounts:
            - name: varlog
              mountPath: /var/log
            - name: varlibdockercontainers
              mountPath: /var/lib/docker/containers
              readOnly: true
      terminationGracePeriodSeconds: 30
      volumes:
        - name: varlog
          hostPath:
            path: /var/log
        - name: varlibdockercontainers
          hostPath:
            path: /var/lib/docker/containers

 

fluentd 설정파일은 몇가지 짚고 넘어갈 것들이 있다. 첫번째는 컨테이너를 tail하기 위해 마운트한 설정이다. /var/log, /var/lib/docker/container를 마운트하였다. 실제 호스트머신에 해당 디렉토리에 들어가면 파일이 보이지 않을 것이다. 만약 파일을 보고 싶다면 아래 설정을 통해 도커 컨테이너를 실행시키고 볼 수 있다.

 

> docker run -it --rm -v /var/lib/docker/containers:/json-log alpine ash

 

위 도커이미지를 실행한후 /json-log 디렉토리에 들어가면 호스트머신에 쌓인 컨테이너 로그들을 볼 수 있다.

 

두번째, tail한 로그를 내보내기 위한 env 설정이다. 아웃풋은 카프카로 두었고, 역시 도메인은 내부 클러스터 DNS로 잡아주었다. 그리고, 우리가 미리 생성한 토픽에 데이터를 보내고 있고 타입은 json으로 보내고 있다.(사실상 튜닝할 설정은 많지만 실습의 편의를 위해 대부분 기본 설정으로 잡았다.)

 

그리고 필자가 fluentd 이미지를 새로 빌드한 이유는 카프카로 보내는 로그 포맷을 수정하기 위하여 fluentd 설정파일들을 조금 수정하였기 때문이다. 혹시나 fluentd 설정 파일들이 궁금하다면 포스팅 마지막 Github을 참조하자.(https://github.com/yoonyeoseong/kubernetes-sample/tree/master/kube-logging/fluentd-kafka)

 

> kubectl apply -f ./kube-logging/fluentd-kafka/fluentd-kafka-daemonset.yaml
> kubectl get pod,daemonset -n kube-system | grep fluentd
  NAME                                         READY   STATUS    RESTARTS   AGE
  pod/fluentd-bqmnl                            1/1     Running   0          34s
  daemonset.extensions/fluentd      1         1         1       1            1           <none>                        34s

 

이제 로그 출력을 위해 샘플 앱을 실행시켜보자. 로그 출력을 위한 앱은 꼭 아래 필자가 빌드한 웹 어플리케이션을 실행시킬 필요는 없다. 만약 아래 애플리케이션을 실행시키려면 ingress 설정 혹은 service node port를 설정하자.

 

> kubectl apply -f ./kube-resource/deployment-sample.yaml
> kubectl get pod
  NAME                                 READY   STATUS    RESTARTS   AGE
  sample-deployment-5fbf569554-4pzrf   0/1     Running   0          17s

 

이제 요청을 보내보자.

 

> kubectl get svc -n ingress-nginx
  NAME                                 TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)                      AGE
  ingress-nginx-controller             NodePort    10.97.27.106   <none>        80:30431/TCP,443:31327/TCP   21d
  ingress-nginx-controller-admission   ClusterIP   10.96.76.113   <none>        443/TCP                      21d
> curl localhost:30431/api

 

이제 키바나에 접속해보면 앱에서 출력하고 있는 로그 데이터를 볼 수 있다. 모든 예제 설정 및 코드는 아래 깃헙을 참고하자 !

 

 

yoonyeoseong/kubernetes-sample

Kubernetes(쿠버네티스) sample. Contribute to yoonyeoseong/kubernetes-sample development by creating an account on GitHub.

github.com

 

posted by 여성게
:

 

오늘 간단히 다루어볼 내용은 엘라스틱서치의 REST 자바 클라이언트인 Rest High Level Client를 이용하여 Index Template을 생성해보는 예제이다. 바로 예제로 들어간다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void indexTemplate() throws IOException {
        
        String typeName = "_doc";
        
        if(!existTemplate()) {
            
            try(RestHighLevelClient client = createConnection();){
                PutIndexTemplateRequest templateRequest = new PutIndexTemplateRequest("log-template");
                
                templateRequest.patterns(Arrays.asList("logstash-*"));
                
                XContentBuilder mapping = XContentFactory.jsonBuilder()
                                                         .startObject()
                                                             .startObject(typeName)
                                                                 .startObject("properties")
                                                                     .startObject("date")
                                                                           .field("type","date")
                                                                      .endObject()
                                                                      .startObject("fieldName")
                                                                          .field("type","keyword")
                                                                      .endObject()
                                                                 .endObject()
                                                             .endObject()
                                                         .endObject();
                
                templateRequest.mapping("_doc", mapping);
                
                AcknowledgedResponse templateResponse = client.indices().putTemplate(templateRequest, RequestOptions.DEFAULT);
                
                if(!templateResponse.isAcknowledged()) throw new ElasticsearchException("Create Index Template Failed !");
                
            }
        }
        
    }
cs

 

해당 인덱스 템플릿으로 생성될 수 있는 인덱스 패턴은 배열로 여러개 지정가능하다. 현재 설정은 단순 mapping만 설정하였지만, settings 정보까지 인덱스 템플릿 설정으로 넣어줄 수 있다. 만약 로그스태시나 비트 프레임워크를 엘라스틱과 연동하여 일자별 로그를 수집하는 기능을 구현한다면 미리 인덱스 템플릿으로 생성될 인덱스의 정의를 잡아주는 것이 좋을 것이다.

 

이제 해당 인덱스 템플릿을 구성한 이후에 logstash-*로 시작하는 인덱스가 생성될때 위와 같은 mapping 설정대로 필드가 생성될 것이다.

posted by 여성게
:

 

엘라스틱서치는 대량의 데이터를 처리하기 위해 기본적으로 데이터를 분산해서 처리한다. 검색요청이 발생하면 엘라스틱서치는 모든 샤드에게 브로드캐스트 방식으로 동시에 요청을 보내고 각각 샤드들이 데이터를 검색한후 결과를 반환하면 엘라스틱서치는 모든 결과를 취합하여 사용자에게 검색 결과를 전달한다. 이러한 동작 방식 때문에 제공되는 부가적인 환경설정값이 있다.

 

동적 분배 방식의 샤드 선택

엘라스틱서치는 부하 분산과 장애처리를 위하여 원본 샤드 + 복제 리플리카 샤드를 운영한다. 물론 원본 샤드와 복제 리플리카 샤드는 각각 다른 노드에 위치하게 된다. 그렇다면 위에서 엘라스틱서치는 검색요청시 모든 샤드에 브로드캐스트 방식으로 검색요청을 보낸다 했는데, 원본 샤드와 복제 리플리카 샤드 두개 모두에게 검색 요청이 갈까? 만약 그렇다면 중복된 답변을 사용자에게 내보낼 것이다. 하지만 엘라스틱서치는 내부적으로 기본 라운드로빈 방식으로 원본,복제 샤드를 번갈아가며 요청을 보낸다. 하지만 이 라운드로빈 방식 이외에도 동적 분배 방식의 알고리즘도 제공한다. 동적 분배 방식은 검색 요청의 응답시간, 검색 요청을 수행하는 스레드 풀의 크기등을 고려해 최적의 샤드를 동적으로 결정하는 방식이다.

 

1
2
3
4
5
6
7
설정) PUT http://localhost:9200/_cluster/settings
 
{
    "transient":{
        "cluster.routing.use_adaptive_replica_selection":true
    }
}
cs

 

글로벌 타임아웃 설정

이전 포스팅에서 쿼리 요청시 응답까지의 타임아웃 설정을 할 수 있다는 것을 다루어봤다. 하지만 모든 쿼리마다 타임아웃 시간을 보내기는 번거로울 수가 있다. 그럴때 사용하는 것이 글로벌 타임아웃 설정이다.

 

 

Elasticsearch - 2.검색 API(Elasticsearch Query DSL)

엘라스틱서치는 인덱스에 저장된 문서를 검색할 수 있도록 다양한 검색기능을 제공한다. 문서는 색인시 설정한 Analyzer에 의해 분석과정을 거쳐 토큰으로 분리되는데, 이러한 Analyzer는 색인 시점 말고도 검색..

coding-start.tistory.com

 

1
2
3
4
5
6
7
설정) PUT http://localhost:9200/_cluster/settings
 
{
    "transient":{
        "search.default_search_timeout":"3s"    
    }
}
cs

 

Count Search API

결과값으로 전문이 아닌 문서 매칭 개수를 알고 싶다면 해당 쿼리를 사용하면 된다.

 

1
2
3
4
5
6
7
8
9
설정) POST http://localhost:9200/movie_search/_count
 
{
    "query":{
        "match":{
            "movieNm":"그대"
        }
    }
}
cs

 

Validate API

작성한 쿼리에 문법적인 오류가 있는지 확인할 수 있는 요청이다.

 

1
2
3
4
5
설정) POST http://localhost:9200/movie_search/_validate/query?rewrite=true
 
{
    "validation 할 쿼리작성"
}
cs

 

Explain API

쿼리 결과로 나온 점수에 대한 상세한 설명이 필요하다면 해당 쿼리를 이용한다.

 

1
2
3
4
5
6
7
8
9
설정) POST http://localhost:9200/movie_search/_doc/문서ID/_explain
 
{
    "query":{
        "term":{
            "typeNm":"장편"
        }
    }
}
cs

 

Profile API

요청한 질의를 실행하는 과정에서 각 샤드별로 얼마나 많은 시간이 소요됐는지의 등의 정보를 각 샤드별로 보고 싶다면 해당 쿼리를 이용한다.

 

1
2
3
4
5
6
7
8
9
10
설정) POST http://localhost:9200/movie_search/_search
 
{
    "profile":true,
    "query":{
        "term":{
            "typeNm":"장편"
        }
    }
}
cs

 

여기까지 일부 환경설정 및 부가적인 쿼리 API에 대해 다루었다. 

posted by 여성게
:

엘라스틱서치는 인덱스에 저장된 문서를 검색할 수 있도록 다양한 검색기능을 제공한다. 문서는 색인시 설정한 Analyzer에 의해 분석과정을 거쳐 토큰으로 분리되는데, 이러한 Analyzer는 색인 시점 말고도 검색 시점에도 이용된다. 특정 문장이 검색어로 요청되면 분석기를 통해 분석된 토큰의 일치 여부를 판단하여 그 결과에 Score을 매긴다. 이러한 엘라스틱서치에서는 다양한 검색 조건을 주기위하여 Query DSL이라는 특수한 쿼리 문법을 제공한다.

 

1. 검색 API

문장은 색인 시점에 텀으로 분리된다. 검색 시에는 이 텀을 일치시켜야 검색이 가능하다. 엘라스틱서치는 루씬기반이기 때문에 색인 시점에 Analyzer를 통해 분석된 텀을 Term, 출현빈도, 문서번화와 같이 역색인 구조로 만들어 내부적으로 저장한다. 검색 시점에는 Keyword 데이터 타입과 같은 분석이 되지 않는 데이터와 Text 데이터 타입과 같은 분석이 가능한 데이터를 구분해서 분석이 가능할 경우 분석기를 이용해 분석을 수행한다. 이를 통해 검색 시점에도 형태소분석이 되든 되지 않든 텀을 얻을 수 있으며, 해당 텀으로 색인 시점에 생긴 역색인 테이블을 뒤져 문서를 찾고 스코어를 계산하여 결과로 제공한다.

 

1.1. 검색 질의 방식

엘라스틱서치에서 제공하는 검색 API는 기본적으로 Query기반으로 동작한다. 검색 질의에는 검색하고자 하는 각종 Query 조건을 명시할 수 있으며, 동일한 조건을 두 가지 방식으로 표현 가능하다.

 

  1. URI 검색(루씬 스타일)
  2. Request Body 검색

첫번째로 우선 URI방식을 살펴본다. 

 

1.1.1. URI 검색 방식

URI를 이용하는 방식은 HTTP GET 요청을 활용한다. 즉, 쿼리파라미터로 Query를 작성하는 것이다. 간단한 쿼리는 쉽지만 쿼리파라미터의 표현한계로 인해 복잡한 질의는 불가능하다.

 

예시) GET http://localhost:9200/인덱스명/_search?q=필드명:검색값

 

1.1.2. Request Body 검색 방식

Request Body 방식은 HTTP 요청 시 Body에 검색할 칼럼과 검색어를 미리 정의된 JSON 구조로 표현하여 질의하는 방식이다. JSON 구조의 표현을 조금더 효율적으로 하기 위해 엘라스틱서치는 Query DSL이라는 미리 정의된 특별한 문법을 지원한다. URI 검색방식보다 더욱 풍부한 조건의 질의가 가능하다.

 

1
2
3
4
5
6
7
8
9
예시) POST http://localhost:9200/인덱스명/_search
     Header Content-Type:application/json
Request Body {
                "query":{
                   "term":{
                      "typeNm":"장편"
                   }
                }
             }
cs

 

그러면 각 검색 표현법의 장단점은 무엇일까?

 

1.2.1. URI 검색 방식

URI 검색은 Request Body 검색에 비해 단순하고 사용하기 편하지만 복잡한 질의문을 입력하기 힘들다는 치명적인 단점이 있다. 또한 URI 검색을 이용할 경우에는 엘라스틱서치에서 제공하는 모든 검색 API 옵션을 사용할 수 없다. 이유는 쿼리파라미터 표현법에는 표현상 한계가 있기 때문이다. 하지만 간단한 쿼리 혹은 테스트에서는 간편함을 제공하기도 함으로 URI 검색 방식의 주요 파라미터를 알아보자.

 

파라미터 기본값 설명
q - 검색을 수행할 쿼리 문자열 조건을 지정.
df - 쿼리에 검색을 수행할 필드가 지정되지 않았을 경우 기본값으로 검색할 필드를 지정한다.
analyzer 검색 대상 필드에 설정된 형태소 분석기(색인 시점 분석기) 쿼리 문자열을 형태소 분석할때 사용할 분석기 지정
analyze_wildcard false 접두어/와일드카드 검색 활성화 여부 지정
default_operator OR 두 개 이상의 검색 조건이 q에 포함된 경우 검색 조건 연산자를 설정한다.
_source true 검색 결과에 문서 본문 포함 여부를 지정
sort - 정렬 기준 필드 지정
from - 검색 시작 위치 지정
size - 반환 결과 개수 지정

 

q 옵션에는 기본적으로 '필드명:검색어' 형태로 입력할 수 있으며, 여러개의 필드를 검색 할때는 공백을 입력한 후에 추가적인 필드명과 검색어를 입력한다. URI 검색 방식의 q 옵션은 Request Body 검색에서 제공하는 Query String Search 옵션과 동일하게 동작한다.

 

예시) http://localhost:9200/인덱스명/_search?q=필드명1:필드값1 AND 필드명2:필드값2&

        analyze_wildcard=true&from=0&size=5&sort=_score:desc,필드명:asc&

       _source_includes=필드명,필드명,필드명,필드명

 

 

1.2.2. Request Body 검색 방식

위의 URI 검색방식 예시를 그대로 Request Body 검색 방식으로 바꾸어보겠다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
POST http://localhost:9200/인덱스명/_search
Header Content-Type : application/json
<Request Body>
{
    "query":{
        "query_string":{
            "default_field":"필드명"
            ,"query":"필드명1:필드값1 AND 필드명2:필드값2"
        }
    }
    ,"from":0
    ,"size":5
    ,"sort":[{
        "_score":{
            "order":"desc"
        }
        ,"필드명":{
            "order":"asc"
        }
    }]
    ,"_source":[
        "필드명"
        ,"필드명"
        ,"필드명"
        ,"필드명"
    ]
}
cs

 

간단한 검색 조건인 경우에는 URI 검색방식이 편해보일 수도 있지만, 구조화된 JSON 구조를 파악하면 더욱 깔끔한 질의작성이 가능하다.

 

 

2. Request Body 검색 방식 - Query DSL 

엘라스틱서치로 검색 질의를 요청할때는 Request Body 방식과 URI 방식 모두 _search API를 이용한다. 하지만 Query DSL 검색을 이용하면 여러개의 질의를 조합하거나 질의 결과에 대해 또 검색을 수행하는 등의 기존 URI 방식 보다 강력한 검색이 가능하다. 엘라스틱서치의 Query DSL은 구조화된 JSON 형태를 사용한다.

 

2.1. Query DSL 구조

 

<요청>

1
2
3
4
5
6
7
8
9
10
{
    "size" : "-->반환받는 결과 개수"
    ,"from" : "-->몇번째 문서부터 가져올지 지정. 기본값은 0이다."
    ,"timeout" : "-->검색 요청시 결과를 받는 데까지 걸리는 시간. timeout을 너무 작게하면
                  전체 샤드에서 timeout을 넘기지 않은 문서만 결과로 출력된다. 기본 값은 무한이다."
    ,"_source" : "-->검색시 필요한 필드만 출력하고 싶을 때 사용"
    ,"query" : "-->검색 조건문이 들어가는 공간"
    ,"aggs" : "-->통계 및 집계 데이터를 사용할때 사용하는 공간"
    ,"sort" : "-->문서 결과의 정렬 조건이 들어가는 공간"
}
cs

 

엘라스틱서치로 쿼리가 요청되면 해당 쿼리를 파싱해서 문법에 맞는 요청인지 검사한 후에 파싱에 성공하면 해당 쿼리를 기반으로 검색을 수행하고, 결과를 JSON으로 돌려준다.

 

<응답>

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
    "took" : "-->쿼리 실행 시간"
    ,"timed_out" : "-->쿼리 시간이 초과할 경우를 나타낸다"
    ,"_shards" : {
        "total" : "-->쿼리를 요청한 전체 샤드 개수"
        ,"successful" : "-->검색 요청에 성공적으로 응답한 샤드 개수"
        ,"failed" : "-->검색 요청에 실패한 샤드 개수"
    }
    ,"hits" : {
        "total" : "-->질의에 매칭된 문서의 전체 개수"
        ,"max_score" : "-->일치하는 문서의 스코어 값중 가장 높은 값"
        ,"hits" : [결과 문서 정보와 스코어 값등을 보여줌]
    }
}
cs

 

만약 JSON문법의 오류가 있다면 오류 결과를 JSON으로 넘겨준다.

 

 

2.2. Query DSL 쿼리와 필터

Query DSL을 이용해 검색 질의를 작성할 때 조금만 조건이 복잡해 지더라도 여러개의 질의를 조합해 사용해야 한다. 이때 작성되는 작은 질의들을 두 가지 형태로 나눌 수 있다. 실제 분석기에 의한 전문 분석이 필요한 경우와 단순히 "YES/NO"로 판단할 수 있는 조건 검색의 경우다. 엘라스틱서치에서는 전자를 쿼리 컨텍스트라고 하고, 후자를 필터 컨텍스트라는 용어로 구분한다.

 

  쿼리 컨텍스트 필터 컨텍스트
용도 전문 검색 시 사용 조건 검색 시 사용
특징

1.분석기에 의해 분석이 수행됨.                    2.연관성 관련 점수 계산                              3.루씬 레벨에서 분석 과정을 거처야 하므로 상대적으로 느림.

1.YES/NO로 단순 판별 가능                           2.연관성 관련 점수 계산 안함                         3.엘라스틱서치 레벨에서 처리가 가능하므로 상대적으로 빠름

사용 예 "삼성전자의 본사는 어디있죠?" 같은 문장 분석

"created_year" 필드값이 2018년도인지 여부 "status" 필드에 "user"라는 코드 포함여부

 

대부분의 경우 쿼리 방식과 필터 방식 중 어떤 방식으로 검색 질의를 표현하더라도 같은 결과를 얻을 수도 있다. 하지만 둘의 용도와 성능차이가 있기 때문에 반드시 용도에 맞는 사용을 하는 것이 좋다.

 

2.2.1. 쿼리 컨텍스트

 

  • 문서가 쿼리와 얼마나 유사한지를 점수로 계산
  • 질의가 요청될 때마다 엘라스틱서치에서 내부의 루씬을 이용해 계산을 수행(결과 캐싱 x)
  • 일반적으로 전문 검색에 이용
  • 캐싱되지 않고 디스크 연산을 수행하기에 상대적으로 느림

 

2.2.2. 필터 컨텍스트

 

  • 쿼리의 조건과 문서가 일치하는지를 구분
  • 별도로 점수 계산은 하지 않고 단순 매칭 여부만 검사
  • 자주 사용되는 필터의 결과는 내부적으로 캐싱
  • 기본적으로 메모리 연산이기에 속도 빠름

 

2.3.1. Query DSL 주요 파라미터

 

Multi Index 검색

기본적으로 모든 검색 요청은 Multi Index, Multi Type 검색이 가능하다. 즉, 여러 인덱스를 검색할때 한번의 요청으로 검색요청을 보낼 수 있다. 검색 요청시 ","로 다수의 인덱스명을 입력한다.

 

예시) POST http://localhost:9200/인덱스명1,인덱스명2/_search ...

 

두개의 인덱스명이 공통적인 필드만 갖고 있다면 두개가 서로다른 스키마 구조이더라도 한번에 검색할 수 있다. 검색 요청시 인덱스명에 "*" 입력이 가능하기 때문에 더욱 손쉬운 멀티 인덱스 검색이 가능하다. 예를 들어 매일매일 날짜별로 인덱스를 생성하여 데이터를 색인하는 로그 수집으로 엘라스틱 서치를 이용한다고 생각해보면, "log-2019-01-01" 형태라고 가정시 "log-2019-*"로 손쉽게 멀티 인덱스 검색이 가능하다. 솔라 검색엔진과 비교하면 아주 강력한 기능이다. 보통 솔라는 엘라스틱서치와 달리 인덱스로 도메인을 구분하지 않고 인스턴스로 도메인을 구분하기 때문에 멀티테넌시를 제공하기 쉽지 않다. 그 말은 즉, 도메인 인스턴스별로 같은 질문을 여러번 요청을 보내야 엘라스틱서치의 멀티 인덱스 검색과 같은 결과를 얻을 수 있는 것이다.(최소한 필자 사용시는 그랬음...솔라도 비슷한 기능이 있을 수도 있음)

 

쿼리 결과 페이징

페이징을 하기 위해서는 두가지 파라미터를 이용한다. from과 size이다. 각 기본값은 0,5로 설정되어 있다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
예시) POST http://localhost:9200/인덱스명/_search - 1Page
{
    "from":0,
    "size":5,
    "query":{
        "term":{
            "fieldName":"value"
        }
    }
}
 
예시) POST http://localhost:9200/인덱스명/_search - 2Page
{
    "from":5,
    "size":5,
    "query":{
        "term":{
            "fieldName":"value"
        }
    }
}
cs

 

엘라스틱서치는 관계형 데이터베이스와는 조금 다르게 페이징된 범위의 문서만 가져오는 것이 아니라, 모든 데이터를 읽고 그 중 필터링하여 몇개의 문서를 가져오는 것이기 때문에 페이지 번호가 높아질수록 쿼리 비용은 비례하여 높아진다.

 

 

쿼리 결과 정렬

엘라스틱서치는 기본적으로 점수를 내림차순으로 결과정렬은 한다. 하지만 점수 이외에 정렬 조건을 주고 싶다면 sort 파라미터를 이용한다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
예시) POST http://localhost:9200/인덱스명/_search
{
    "query":{
        "term":{
            "fieldName":"value"
        }
    }
    ,"sort":{
        "_score":{
            "order":"desc"
        }
        ,"fieldName":{
            "order":"asc"
        }
    }
}
 
cs

 

위 예제는 점수가 동일할 시에 추가 정렬 조건으로 특정 필드 값으로 오름차순 정렬을 한다.

 

_source 필드 필터링

검색의 결과로 _source의 모든 항목을 결과로 반환한다.(_source의 모든 항목은 문서의 모든 필드를 포함한 결과) 하지만 모든 필드를 결과로 내보낼 필요가 없을 때도 있다. 그때 사용하는 것이 _source 파라미터이다. 필요한 필드만 결과값으로 출력한다면 네트워크 사용량을 줄여 응답 속도도 빨라질 수 있다.

 

1
2
3
4
5
6
7
8
9
10
예시) POST http://localhost:9200/인덱스명/_search
{
    "query":{
        "term":{
            "fieldName":"value"
        }
    }
    ,"_source":["fieldName1","fieldName2"]
}
 
cs

 

범위검색

숫자나 날짜 등을 특정한 숫자나 날짜가 아니라 범위를 지정하여 질의 요청을 할 수도 있다.

 

문법 연산자 설명
lt < 피연산자보다 작음
gt > 피연산자보다 큼
lte <= 피연산자보다 작거나 같음
gte >= 피연산자보다 크거나 같음

 

2016년부터 2019년까지의 데이터 조회 예시이다.

 

1
2
3
4
5
6
7
8
9
10
11
12
예시) POST http://localhost:9200/인덱스명/_search
{
    "query":{
        "range":{
            "date":{
                "gte":"2016",
                "lte":"2019"
            }
        }
    }
}
 
cs

 

 

operator 설정

엘라스틱서치는 검색 시 문장이 들어올 경우 기본적으로 OR연산으로 동작한다. 하지만 실무에서 더 정확도 높은 결과를 위하여 AND연산을 사용하여 검색할때도 있다. 그때 사용하는 것이 operator 파라미터이다.(기본값은 OR이다)

 

1
2
3
4
5
6
7
8
9
10
11
12
예시) POST http://localhost:9200/인덱스명/_search
{
    "query":{
        "match":{
            "fieldName":{
                "query":"coding start",
                "operator":"and"
            }
        }
    }
}
 
cs

 

coding start가 분석기에 의해 분리되고, 두개의 텀을 and 연산으로 검색을 하게 된다. 즉, 둘다 포함된 문서가 결과로 나올 것이다. 만약 OR 연산이라면 coding과 start라는 단어 둘 중 최소한 하나만 포함이 되어도 결과로 반환될 것이다.

 

minimum_should_match 설정

바로 위에서 보았던 operator 설정중 OR연산을 사용하면 너무 많은 결과가 나오게 된다. 이 경우 텀의 개수가 몇 개 이상 매칭될 때만 검색 결과로 나오게 할 수 있는데 이것이 바로 minimum_should_match 파라미터를 이용하는 것이다. 즉, OR연산자로 AND연산과 비슷한 효과를 낼 수 있게 된다.

 

1
2
3
4
5
6
7
8
9
10
11
12
예시) POST http://localhost:9200/인덱스명/_search
{
    "query":{
        "match":{
            "fieldName":{
                "query":"this is coding start",
                "minimum_should_match" : 3
            }
        }
    }
}
 
cs

 

위의 쿼리의 뜻은 OR연산을 사용하되 4개의 텀중 최소한 3개가 일치해야 결과로 내보낸다라는 쿼리이다.

 

 

fuzziness 설정

fuzziness 파라미터를 이용하면 완벽히 동일한 텀으로 문서를 찾는 Match Query를 유사한 텀의 형태로 검색을 하는 Fuzzy Query로 변경할 수 있다. 이는 레벤슈타인 편집 거리 알고리즘을 기반으로 문서의 필드 값을 여러번 변경하는 방식으로 동작한다. 유사한 검색 결과를 찾기 위해 허용 범위의 텀으로 변경해 가며 문서를 찾아 결과로 출력한다. 예를 들어, 편집 거리수를 2로 설정하면 오차범위가 두 글자 이하인 검색 결과까지 포함해서 결과를 준다. 하지만 한국어에는 적용하기 쉽지 않다.(만약 키워드성 질의가 많은 검색이라면 어느정도는 한국어도 가능하기는 한것같다.) 오차범위 값으로 0,1,2,AUTO 총 4가지 값을 사용가능하다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
예시) POST http://localhost:9200/인덱스명/_search
{
    "query":{
        "match":{
            "movieNm":{
                "query":"곤지엠",
                "fuzziness":1
            }
        }
    }
}
 
 
cs

 

 

위와 같이 질의를 날리면 "곤지암"이라는 영화 제목을 결과값으로 반환받을 수 있다.

 

 

boost 설정

boost 설정으로 관련성이 높은 필드나 키워드에 가중치를 더 줄 수 있게 해준다.

 

1
2
3
4
5
6
7
8
9
10
11
12
예시) POST http://localhost:9200/인덱스명/_search
{
    "query":{
        "multi_match":{
            "query":"Fly",
            "fields":["field1^3","field2"]
        }
    }
}
 
 
 
cs

 

위의 쿼리는 Fly라는 단어가 field1에 있을 경우 해당 문서의 가중치에 곱하기 3을 하여 더 높은 점수를 얻게 된다.

 

 

2.3.2. Query DSL 주요 쿼리

 

Match All Query

match_all 파라미터를 사용하여 색인의 모든 문서를 검색하는 쿼리이다.

 

1
2
3
4
5
6
7
8
9
10
예시) POST http://localhost:9200/인덱스명/_search
{
    "query":{
        "match_all":{}
    }
}
 
 
 
 
cs

 

Match Query

Match Query는 텍스트, 숫자, 날짜 등이 포함된 문장을 형태소 분석을 통해 텀으로 분리한 후 이 텀들을 이용해 검색 질의를 수행한다. 즉, 검색어가 분리돼야 할 경우에 사용해야한다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
예시) POST http://localhost:9200/인덱스명/_search
{
    "query":{
        "match":{
            "fieldName":"coding start"
        }
    }
}
 
 
 
 
 
cs

 

위의 쿼리는 형태소분석에 의해 coding, start라는 두개의 텀으로 분리되고 별도의 operator가 없기에 OR연산으로 fieldName이라는 필드에 coding,start 텀을 OR연산으로 검색한다.

 

 

Multi Match Query

multi_match 파라미터를 이용하여 하나의 필드가 아니라 여러개의 필드에 대해서 검색을 수행할 수 있다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
예시) POST http://localhost:9200/인덱스명/_search
{
    "query":{
        "multi_match":{
            "query":"가족",
            "fields":["fieldName1","fieldName2"]
        }
    }
}
 
 
 
 
 
 
cs

 

위에서도 다루어봤지만 multi_match를 사용하면서 특정 필드에 가중치를 부여할 수도 있다.

 

 

Term Query

텍스트 형태의 값을 검색하기 위해 엘라스틱서치는 두 가지 매핑 유형을 지원한다.

 

타입 설명
Text 타입 필드에 데이터가 저장되기 전에 데이터가 Analyzer에 의해 분석되어 역색인 구조로 저장됨.
Keyword 타입 데이터가 분석되지 않고 문장이라도 하나의 텀(분석되지 않은)으로 저장됨

 

바로 이전에 다룬 Match Query는 검색어 매칭 전에 검색 텍스트에 대해 형태소 분석이 들어간다. 하지만 Term Query는 별도의 형태소 분석없이 검색 문자 그대로 해당 문자와 동일한 문서가 있는지 찾는다. 즉, Keyword 데이터 타입을 사용하는 필드를 검색하기 위해서는 Term Query를 사용한다. 그리고 정확히 일치하지 않으면 검색 결과로 나오지 않는다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
예시) POST http://localhost:9200/인덱스명/_search
{
    "query":{
        "term":{
            "fieldName":"코미디"
        }
    }
}
 
 
 
 
 
 
 
cs

 

fieldsName이라는 필드에 코미디라는 정확한 단어가 포함된 문서가 있다면 결과로 돌려준다.

 

 

Bool Query

RDB에서는 다양하게 AND,OR,NOT 등의 조건을 여러개 WHERE절에 사용할 수 있다. 이처럼 엘라스틱서치도 여러개의 조건 절을 연결하여 더욱 정확한 검색결과를 얻을 수 있는 쿼리가 있는데, 바로 boot query이다. 

 

엘라스틱서치 SQL 설명
must : [] AND 칼럼=조건 반드시 조건에 만족하는 문서만 검색
must_not : [] AND 칼럼!=조건 조건을 만족하지 않는 문서가 검색
should : [] OR 칼럼=조건 여러 조건 중 하나 이상을 만족하는 문서 검색
filter : [] 칼럼 IN (조건) 조건을 포함하고 있는 문서를 출력. 해당 파라미터를 사용하면 점수별로 정렬하지 않는다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
예시) POST http://localhost:9200/인덱스명/_search
{
    "query":{
        "bool":{
            "must":[
                {
                    "term":{
                        "repGenreNm":"코미디"    
                    }
                },
                {
                    "match":{
                        "repNationNm":"한국"
                    }
                }
            ],
            "must_not":[
                {
                    "match":{
                        "typeNm":"단편"
                    }
                }
            ]
        }
    }
}
 
 
 
 
 
 
 
 
cs

 

해당 쿼리는 장르가 코미디이면서 제작사 나라가 한국이라는 단어를 포함하고 있으며, 종류가 단편이 아닌 영화를 조회하는 쿼리이다.

 

 

Prefix Query

Prefix Query는 해당 접두어가 있는 모든 문서를 검색하는데 사용한다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
예시) POST http://localhost:9200/인덱스명/_search
{
    "query":{
        "prefix":{
            "movieNm":"자전"
        }
    }
}
 
 
 
 
 
 
 
 
 
cs

 

영화제목에 자전이라고 시작하는 영화 제목을 검색 결과로 모두 가져온다.

 

 

Exists Query

필드 값이 null 인  데이터를 제외하고 검색한다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
예시) POST http://localhost:9200/인덱스명/_search
{
    "query":{
        "exists":{
            "field":"movieNm"
        }
    }
}
 
 
 
 
 
 
 
 
 
cs

 

Wildcard Query

검색어가 와일드카드와 일치하는 구문을 찾는다. 이때 입력된 검색어는 형태소 분석이 이뤄지지 않는다.

 

와일드카드 옵션 설명
* 문자의 길이와 상관없이 와일드카드와 일치하는 모든 문서를 찾는다
? 지정된 위치의 한 글자가 다른 경우의 문서를 찾는다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
예시) POST http://localhost:9200/인덱스명/_search
{
    "query":{
        "wildcard":{
            "fieldName":"여?"
        }
    }
}
 
 
 
 
 
 
 
 
 
cs

 

여기까지 간단한 엘라스틱서치 검색 API에 대해 다루어보았다. 다음 포스팅에서는 더 다루지 못한 검색 API 나머지 부분을 다룰 예정이다. 혹시 잘못된 점이나 모르는 점 등이 있다면 댓글을 꼭 부탁드린다..

posted by 여성게
:

ELK - Filebeat 란?



https://coding-start.tistory.com/187

조금 더 다듬어서 Filebeat 정리하였습니다.


    <실시간 로그 수집을 위한 프로세스 구성>


만약 많은 애플리케이션이 분산되어 있고, 각 애플리케이션이 로그 파일들을 생성한다고 생각해보자. 만약 해당 로그 파일을 하나의 서버에 일일이 ssh 터미널을 이용하여 로그 파일을 수집하는 것이 합리적인 행동일까? 만약 엄청난 규모의 서비스이고 분산되어 있는 서비스의 애플리케이션이 수백개라고 생각하면 ssh를 이용하는 방법은 생각하기도 싫은 방법일 것이다. 이런 상황에서 Filebeat는 로그와 혹은 파일을 경량화된 방식으로 전달하고 중앙 집중화하여 작업을 보다 간편하게 만들어 주는 역할을 한다. 


다시한번 Elastic 공식 홈페이지에서 소개하는 Filebeat를 설명하자면, Filebeat는 로그 데이터를 전달하고 중앙화하기 위한 경량의 Producer이다. 서버에 에이전트로 설치되는 Filebeat는 지정한 로그 파일 또는 위치를 모니터링하고 로그 이벤트를 수집한 다음 인덱싱을 위해 Elasticsearch 또는 Logstash로 전달한다.




Filebeat의 작동 방식은 어떻게 될까?

Filebeat를 시작하면 설정에서 지정한 로그데이터를 바라보는 하나이상의 inputs을 가진다. 지정한 로그 파일에서 이벤트(데이터발생)가 발생할 때마다 Filebeat는 데이터 수확기(harvester)를 시작한다. 하나의 로그 파일을 바라보는 각 havester는 새 로그 데이터를 읽고 libbeat에 보낸다. 그리고 libbeat는 이벤트를 집계하고 집계된 데이터를 Filebeat 설정에 구성된 출력으로 데이터를 보낸다.






Filebeat 시작하기

▶︎▶︎▶︎Filebeat Download


파일비트를 다운로드 받았다면 압축을 풀고, filebeat.yml 파일을 열어 설정파일을 살펴본다.


1
2
3
4
5
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/*.log
cs


위의 설정은 수집할 로그파일의 경로를 설정한다. 즉, /var/log/*.log의 파일을 수집대상으로 정해놓는 것이다.


1
2
3
4
5
6
7
cloud.id: "staging:dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRjZWM2ZjI2MWE3NGJmMjRjZTMzYmI4ODExYjg0Mjk0ZiRjNmMyY2E2ZDA0MjI0OWFmMGNjN2Q3YTllOTYyNTc0Mw=="
 
output.elasticsearch:
  hosts: ["myEShost:9200"]
 
setup.kibana:
  host: "mykibanahost:5601"
cs


출력을 Elasticsearch로 지정하여 수집한 로그데이터를 엘라스틱서치에 저장할 수 있다. 물론 중간에 Logstash 등을 거쳐 데이터를 추가적인 처리를 한 후에 엘라스틱서치에 데이터를 저장할 수도 있다.


1
2
3
#----------------------------- Logstash output --------------------------------
output.logstash:
  hosts: ["127.0.0.1:5044"]
cs


자세한 설정은 Elastic 공식 페이지를 이용하시길 바랍니다.




Filebeat는 파일의 상태를 어떻게 유지하나?

편집하다

Filebeat는 각 파일의 상태를 유지하며 레지스트리 파일의 상태를 디스크로 자주 플러시한다. 상태는 수확기(havester)가 읽었던 마지막 오프셋을 기억하고 모든 로그 라인이 전송되는지 확인하는 데 사용된다. Elasticsearch 또는 Logstash와 같은 출력에 도달 할 수 없는 경우 Filebeat은 마지막으로 보낸 행을 추적하고 출력이 다시 사용 가능 해지면 파일을 계속 읽는다. Filebeat가 실행되는 동안 상태 정보도 각 입력에 대해 메모리에 보관된다. Filebeat가 다시 시작되면 레지스트리 파일의 데이터가 상태를 다시 작성하는 데 사용되며 Filebeat은 마지막으로 알려진 위치에서 각 수확기를 계속 사용한다 또한 Filebeat는 적어도 한번 이상 구성된 데이터를 지정한 출력으로 전달함을 보장한다.




Filebeat와 Kafka를 이용한 간단한 로그수집


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#=========================== Filebeat prospectors =============================
#filebeat로 어떤 파일을 보낼지를 선택하는 부분이다. /Users/yun-yeoseong/kafka_directory/kafka/logs/server.log 파일을 보내기 위한 설정
kafka.home: /Users/yun-yeoseong/kafka_directory/kafka
filebeat.prospectors:
  - input_type: log
    paths:
      -${kafka.home}/logs/server.log*
    multiline.pattern: '^\['
    multiline.negate: true
    multiline.match: after
    fields.pipeline: kafka-logs #파이브라인 아이디
 
#============================= Filebeat modules ===============================
 
filebeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml
 
  # Set to true to enable config reloading
  reload.enabled: false
 
  # Period on which files under path should be checked for changes
  #reload.period: 10s
 
#==================== Elasticsearch template setting ==========================
 
setup.template.settings:
  index.number_of_shards: 3
  #index.codec: best_compression
  #_source.enabled: false
 
#==================== Kafka output ==========================
#kafka의 프로듀서의 역할을 설정하는 부분이다.
output.kafka: 
  hosts: ["localhost:9092","localhost:9093","localhost:9094","localhost:9095"]
  topic: 'kafka-log'
  partition.round_robin: 
    reachable_only: false
 
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
cs


위는 카프카를 output으로 하는 설정이다. 간단하게 설정에 대해 설명하면 filebeat.prospectors는 Filebeat의 input을 정의하는 것이다. '-' 구분으로 여러개의 input을 정의할 수 있다. input_type: log 설정은 로그 파일의 모든 행을 읽는 설정이다. paths는 읽어올 파일의 경로이다. '-'구분으로 여러개의 경로를 지정할 수 있다. multiline 설정은 로그 데이터처럼 여러 개행된 데이터를 처리할때 사용하는 어떠한 규칙이라고 보면된다. 자세한 사항은 Elastic 공식 페이지를 확인하면 될듯하다. output.kafka 설정은 출력을 정의한다. hosts는 Kafka 클러스터 노드들을 배열로 나열한 것이고, 여기서 파일비트는 카프카 입장에서는 하나의 프로듀서이므로 pub할 topic을 설정한다. 나머지 설정들은 kafka의 producer 설정이므로 생략한다. 그리고 파일비트에서 카프카는 위처럼 호스트를 나열하면 내부적으로 로드밸런싱을 해주므로 하나의 브로커에만 데이터 요청이 가질 않는다.

이번 예제에서는 파일비트가 데이터를 정말로 수집하는 지를 보기 위해서 kafka가 기본으로 제공하는 console consumer를 이용할 것이다.
우선은 위의 설정에 작성된 토픽을 생성해준다.

kafka는 3대를 클러스터링한 환경이다. 만약 kafka 클러스터링 방법을 모른다면 밑의 링크에서 참조하고 와도 될듯싶다.
▶︎▶︎▶︎kafka cluster


> ./kafka-topic.sh --zookeeper localhost:2181,localhost:2182,localhost:2181/kafka-broker(자신이 설정한 디렉토리입력) --topic Kafka-log --partitions 3 --replication-factor 2 --create.  -> 토픽생성

>./filebeat -e -c filebeat.yml -d "publish"  -> 위에서 작성한 설정파일에 기준한 파일비트 실행

>./kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic kafka-log --from-beginning ->컨슈머실행

카프카의 클러스터중 하나를 다운시켰다 실행시키는 등의 로그가 출력되는 작업을 수행한 후에 컨슈머에 들어오는 데이터를 확인해보자.


왼쪽이 컨슈머, 오른쪽은 파일비트의 로그이다. 파일비트에서 수집한 카프카의 로그가 카프카로 보내지고 컨슈머가 해당 브로커에서 데이터를 가져오는 것을 볼 수 있다.


posted by 여성게
: