인프라/Docker&Kubernetes 2020. 8. 24. 22:35

이번 포스팅에서는 쿠버네티스 로깅 파이프라인 구성에 대해 다루어볼 것이다. 저번 포스팅에서는 Fluentd + ES + Kibana 조합으로 클러스터 로깅 시스템을 구성했었는데, 이번 시간에는 Fluentd + kafka + ELK 조합으로 구성해본다.

<fluentd + ES + kibana logging>

 

 

Kubernetes - Kubernetes 로깅 운영(logging), Fluentd

오늘 다루어볼 내용은 쿠버네티스 환경에서의 로깅운영 방법이다. 지금까지는 쿠버네티스에 어떻게 팟을 띄우는지에 대해 집중했다면 오늘 포스팅 내용은 운영단계의 내용이 될 것 같다. 사실

coding-start.tistory.com

중간에 카프카를 두는 이유는 여러가지가 있을 수 있을 것 같다. 첫번째 버퍼역할을 하기때문에 어느정도 파이프라인의 속도 조절이 가능하다. 두번째 로그를 카프카 큐에 담아두고, 여러 컨슈머 그룹이 각기의 목적으로 로그데이터를 사용가능하다. 바로 실습에 들어가보자.

 

구성

 

 

구성은 위 그림과 같다. fluentd는 컨테이너 로그를 tail하고 있고, tail한 데이터를 카프카로 프로듀싱한다. 그리고 아웃풋으로 로그스태시로 보내고 로그 스태시는 엘라스틱서치에 색인을하게 된다.

 

실습이전에 본 실습에서 진행하는 예제중 카프카 구성과 엘라스틱서치의 구성은 별도로 옵션 튜닝 및 물리머신에 구성하는 것이 좋다. 필자는 구성의 편의를 위해 아무런 옵션을 튜닝하지 않은채 같은 쿠버네티스 클러스터에 카프카와 엘라스틱서치를 구성하였다.

 

kafka install & deploy on kubernetes unsing helm
 

TheOpenCloudEngine/uEngine-cloud-k8s

Contribute to TheOpenCloudEngine/uEngine-cloud-k8s development by creating an account on GitHub.

github.com

<헬름 설치>

> curl https://raw.githubusercontent.com/kubernetes/helm/master/scripts/get | bash
> kubectl --namespace kube-system create sa tiller
> kubectl create clusterrolebinding tiller --clusterrole cluster-admin --serviceaccount=kube-system:tiller
> helm init --service-account tiller
> helm repo update

위 명령어로 헬름을 다운로드 받는다.

 

<카프카 헬름 차트 설치 및 배포>

> kubectl create ns kafka
> helm repo add incubator http://storage.googleapis.com/kubernetes-charts-incubator
> helm install --name my-kafka --namespace kafka incubator/kafka

 

kafka라는 별도의 네임스페이스를 생성하여 그 안에 카프카를 배포하였다.

 

<헬름차트 삭제>

차트 삭제가 필요하면 아래 명령어를 이용하자.

# --purge 옵션으로 관련된 모든 정보를 지운다. 
helm delete my-kafka --purge

 

<fluentd가 데이터를 보낼 토픽생성>

> kubectl -n kafka exec my-kafka-0 -- /usr/bin/kafka-topics \
--zookeeper my-kafka-zookeeper:2181 --topic fluentd-container-logging \
--create --partitions 3 --replication-factor 3

Created topic "fluentd-container-logging".

 

"fluentd-container-logging"이라는 이름으로 토픽을 생성하였다.

 

<생성된 topic 확인>

> kubectl -n kafka exec my-kafka-0 -- /usr/bin/kafka-topics --zookeeper my-kafka-zookeeper:2181 --list

fluentd-container-logging

 

토픽리스트를 조회해서 우리가 생성한 토픽이 있는지 조회해본다.

 

<fluentd가 보낸 데이터가 큐로 잘들어오는지 확인하기 위해 컨슘머 실행>

> kubectl -n kafka exec -ti my-kafka-0 -- /usr/bin/kafka-console-consumer \
--bootstrap-server my-kafka:9092 --topic fluentd-container-logging --from-beginning

 

이제 실제로 카프카와 주키퍼가 쿠버네티스에 잘 떠있는지 확인해보자 !

 

> kubectl get pod,svc -n kafka
  NAME                       READY   STATUS    RESTARTS   AGE
  pod/my-kafka-0             1/1     Running   2          4m14s
  pod/my-kafka-1             1/1     Running   0          116s
  pod/my-kafka-2             1/1     Running   0          78s
  pod/my-kafka-zookeeper-0   1/1     Running   0          4m14s
  pod/my-kafka-zookeeper-1   1/1     Running   0          3m32s
  pod/my-kafka-zookeeper-2   1/1     Running   0          3m
  NAME                                  TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                      AGE
  service/my-kafka                      ClusterIP   10.108.104.66   <none>        9092/TCP                     4m14s
  service/my-kafka-headless             ClusterIP   None            <none>        9092/TCP                     4m14s
  service/my-kafka-zookeeper            ClusterIP   10.97.205.63    <none>        2181/TCP                     4m14s
  service/my-kafka-zookeeper-headless   ClusterIP   None            <none>        2181/TCP,3888/TCP,2888/TCP   4m14s

 

위와 같이 팟과 서비스 목록이 보인다면 다음으로 넘어간다.

 

ELK Stack 구성

<elasticsearch 실행>

아래 deployment와 service 설정파일을 이용하여 쿠버네티스 위에 엘라스틱서치를 구성한다.

 

apiVersion: v1
kind: Service
metadata:
  name: elasticsearch
  namespace: elk-stack
spec:
  selector:
    app: elasticsearch
  ports:
    - port: 9200
      protocol: TCP
      targetPort: 9200
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: elasticsearch
  namespace: elk-stack
  labels:
    app: elasticsearch
spec:
  replicas: 1
  selector:
    matchLabels:
      app: elasticsearch
  template:
    metadata:
      labels:
        app: elasticsearch
    spec:
      containers:
      - name: elasticsearch
        image: elastic/elasticsearch:6.8.6
        ports:
        - containerPort: 9200
          name: http
        - containerPort: 9300
          name: tcp

 

위 설정 파일은 볼륨을 구성하지 않아서 일회성(테스트)로만 가능하다. 실제로 운영환경에서는 물리머신에 클러스터를 구성하던가, 혹은 쿠버네티스 볼륨을 붙여서 구성하자.

 

> kubectl apply -f ./kube-logging/fluentd-elasticsearch/elasticsearch.yaml
> kubectl get pod,svc -n elk-stack
  NAME                                 READY   STATUS    RESTARTS   AGE
  pod/elasticsearch-654c5b6b77-l8k2z   1/1     Running   0          50s
  NAME                    TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)    AGE
  service/elasticsearch   ClusterIP   10.101.27.73   <none>        9200/TCP   50s

 

<kibana 실행>

키바나는 아래 설정파일을 예제로 구성하였다.

 

apiVersion: v1
kind: Service
metadata:
  name: kibana
  namespace: elk-stack
spec:
  selector:
    app: kibana
  ports:
  - protocol: TCP
    port: 5601
    targetPort: 5601
  type: NodePort
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kibana
  namespace: elk-stack
  labels:
    app: kibana
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kibana
  template:
    metadata:
      labels:
        app: kibana
    spec:
      containers:
      - name: kibana
        image: elastic/kibana:6.8.6
        ports:
        - containerPort: 5601
          name: http

 

위 설정중 조금 살펴봐야할 것은 서비스 타입을 NodePort로 준 점이다. 실제로 외부로 포트를 개방해 localhost로 접근 가능하다. 실제 운영환경에서는 ingress까지 구성하여 배포하자.

 

> kubectl apply -f ./kube-logging/fluentd-elasticsearch/kibana.yaml
> kubectl get pod,svc -n elk-stack | grep kibana
  NAME                                 READY   STATUS    RESTARTS   AGE
  pod/kibana-6d474df8c6-fsfc7          1/1     Running   0          24s
  NAME                                 READY   STATUS    RESTARTS   AGE
  service/kibana          NodePort    10.97.240.55   <none>        5601:30578/TCP   24s

 

http://localhost:30578로 접근해 키바나가 잘 떠있는지와 엘라스틱서치와 잘 연동되었는지 확인하자.

 

<logstash 실행>

로그스태시는 아래 예시 설정 파일로 구성하였다.

 

apiVersion: v1
kind: ConfigMap
metadata:
  name: logstash-configmap
  namespace: elk-stack
data:
  logstash.yml: |
    http.host: "127.0.0.1"
    path.config: /usr/share/logstash/pipeline
    pipeline.workers: 2
  logstash.conf: |
    # all input will come from filebeat, no local logs
    input {
      kafka {
        bootstrap_servers => "my-kafka.kafka.svc.cluster.local:9092"
        topics => "fluentd-container-logging"
        group_id => "fluentd-consumer-group"
        enable_auto_commit => "true"
        auto_offset_reset => "latest"
        consumer_threads => 4
        codec => "json"
      }
    }

    output {
        elasticsearch {
          hosts => ["http://elasticsearch.elk-stack.svc.cluster.local:9200"]
          manage_template => false
          index => "kubernetes-container-log-%{+YYYY-MM-dd}"
        }
    }
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: logstash-deployment
  namespace: elk-stack
spec:
  replicas: 1
  selector:
    matchLabels:
      app: logstash
  template:
    metadata:
      labels:
        app: logstash
    spec:
      containers:
        - name: logstash
          image: docker.elastic.co/logstash/logstash:5.6.0
          ports:
            - containerPort: 5044
          volumeMounts:
            - name: config-volume
              mountPath: /usr/share/logstash/config
            - name: logstash-pipeline-volume
              mountPath: /usr/share/logstash/pipeline
      volumes:
        - name: config-volume
          configMap:
            name: logstash-configmap
            items:
              - key: logstash.yml
                path: logstash.yml
        - name: logstash-pipeline-volume
          configMap:
            name: logstash-configmap
            items:
              - key: logstash.conf
                path: logstash.conf
---
apiVersion: v1
kind: Service
metadata:
  name: logstash-service
  namespace: elk-stack
spec:
  selector:
    app: logstash
  ports:
    - protocol: TCP
      port: 5044
      targetPort: 5044
  type: ClusterIP

 

설정에서 잘 살펴볼 것은 input과 output의 호스트 설정이다. 우리는 모든 모듈을 같은 클러스터에 설치할 것이기 때문에 쿠버네티스 내부 DNS를 사용하였다.(실습에 편의를 위한 것이기도 하지만, 실제 운영환경에서도 내부 시스템은 종종 클러스터 내부 DNS를 사용하기도 한다. 그러면 실제로 통신하기 위해 클러스터 밖으로 나갔다 오지 않는다.)

 

또 한가지 설정은 Deployment에 볼륨을 마운트 하는 부분이다. 실제 쿠버네티스에서 ConfigMap은 볼륨으로 잡히기 때문에 그 ConfigMap을 logstash pod 내부로 마운트하여 실행시점에 해당 설정파일을 물고 올라가도록 하였다.

 

> kubectl apply -f ./kube-logging/fluentd-elasticsearch/logstash.yaml
> kubectl get pod,svc -n elk-stack | grep logstash
  NAME                                       READY   STATUS    RESTARTS   AGE  
  pod/logstash-deployment-556cfb66b5-6xrs6   1/1     Running   0          34s
  service/logstash-service   ClusterIP   10.96.13.170   <none>        5044/TCP         33s

 

<fluentd 실행>

이제는 실제 컨테이너 로그를 tail하여 수집하는 fluentd를 실행시켜보자.

 

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd
  namespace: kube-system
  labels:
    app: fluentd-logging
    version: v1
    kubernetes.io/cluster-service: "true"
spec:
  selector:
    matchLabels:
      app: fluentd-logging
  template:
    metadata:
      labels:
        app: fluentd-logging
        version: v1
        kubernetes.io/cluster-service: "true"
    spec:
      tolerations:
        - key: node-role.kubernetes.io/master
          effect: NoSchedule
      containers:
        - name: fluentd
          image: 1223yys/fluentd-kafka:latest
          imagePullPolicy: Always
          env:
            - name: FLUENT_KAFKA_BROKERS
              value: "my-kafka.kafka.svc.cluster.local:9092"
            - name: FLUENT_KAFKA_DEFAULT_TOPIC
              value: "fluentd-container-logging"
            - name: FLUENT_KAFKA_OUTPUT_DATA_TYPE
              value: "json"
            - name: FLUENT_KAFKA_COMPRESSION_CODEC
              value: "snappy"
            - name: FLUENT_KAFKA_MAX_SEND_LIMIT_BYTES
              value: "4096"
          resources:
            limits:
              memory: 200Mi
            requests:
              cpu: 100m
              memory: 200Mi
          volumeMounts:
            - name: varlog
              mountPath: /var/log
            - name: varlibdockercontainers
              mountPath: /var/lib/docker/containers
              readOnly: true
      terminationGracePeriodSeconds: 30
      volumes:
        - name: varlog
          hostPath:
            path: /var/log
        - name: varlibdockercontainers
          hostPath:
            path: /var/lib/docker/containers

 

fluentd 설정파일은 몇가지 짚고 넘어갈 것들이 있다. 첫번째는 컨테이너를 tail하기 위해 마운트한 설정이다. /var/log, /var/lib/docker/container를 마운트하였다. 실제 호스트머신에 해당 디렉토리에 들어가면 파일이 보이지 않을 것이다. 만약 파일을 보고 싶다면 아래 설정을 통해 도커 컨테이너를 실행시키고 볼 수 있다.

 

> docker run -it --rm -v /var/lib/docker/containers:/json-log alpine ash

 

위 도커이미지를 실행한후 /json-log 디렉토리에 들어가면 호스트머신에 쌓인 컨테이너 로그들을 볼 수 있다.

 

두번째, tail한 로그를 내보내기 위한 env 설정이다. 아웃풋은 카프카로 두었고, 역시 도메인은 내부 클러스터 DNS로 잡아주었다. 그리고, 우리가 미리 생성한 토픽에 데이터를 보내고 있고 타입은 json으로 보내고 있다.(사실상 튜닝할 설정은 많지만 실습의 편의를 위해 대부분 기본 설정으로 잡았다.)

 

그리고 필자가 fluentd 이미지를 새로 빌드한 이유는 카프카로 보내는 로그 포맷을 수정하기 위하여 fluentd 설정파일들을 조금 수정하였기 때문이다. 혹시나 fluentd 설정 파일들이 궁금하다면 포스팅 마지막 Github을 참조하자.(https://github.com/yoonyeoseong/kubernetes-sample/tree/master/kube-logging/fluentd-kafka)

 

> kubectl apply -f ./kube-logging/fluentd-kafka/fluentd-kafka-daemonset.yaml
> kubectl get pod,daemonset -n kube-system | grep fluentd
  NAME                                         READY   STATUS    RESTARTS   AGE
  pod/fluentd-bqmnl                            1/1     Running   0          34s
  daemonset.extensions/fluentd      1         1         1       1            1           <none>                        34s

 

이제 로그 출력을 위해 샘플 앱을 실행시켜보자. 로그 출력을 위한 앱은 꼭 아래 필자가 빌드한 웹 어플리케이션을 실행시킬 필요는 없다. 만약 아래 애플리케이션을 실행시키려면 ingress 설정 혹은 service node port를 설정하자.

 

> kubectl apply -f ./kube-resource/deployment-sample.yaml
> kubectl get pod
  NAME                                 READY   STATUS    RESTARTS   AGE
  sample-deployment-5fbf569554-4pzrf   0/1     Running   0          17s

 

이제 요청을 보내보자.

 

> kubectl get svc -n ingress-nginx
  NAME                                 TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)                      AGE
  ingress-nginx-controller             NodePort    10.97.27.106   <none>        80:30431/TCP,443:31327/TCP   21d
  ingress-nginx-controller-admission   ClusterIP   10.96.76.113   <none>        443/TCP                      21d
> curl localhost:30431/api

 

이제 키바나에 접속해보면 앱에서 출력하고 있는 로그 데이터를 볼 수 있다. 모든 예제 설정 및 코드는 아래 깃헙을 참고하자 !

 

 

yoonyeoseong/kubernetes-sample

Kubernetes(쿠버네티스) sample. Contribute to yoonyeoseong/kubernetes-sample development by creating an account on GitHub.

github.com

 

posted by 여성게
:

 

오늘 포스팅할 내용은 ELK Stack의 요소중 하나인 Logstash(로그스태시)입니다. 로그스태시 설명에 앞서 로그란 시스템이나 애플리케이션 상태 및 행위와 관련된 풍부한 정보를 포함하고 있습니다. 이러한 정보를 각각 시스템마다 파일로 기록하고 있는 경우가 대다수 일겁니다. 그렇다면 과연 이러한 정보를 파일로 관리하는 것이 효율적인 것인가를 생각해볼 필요가 있습니다. 한곳에 모든 로그데이터를 시스템별로 구분하여 저장하고 하나의 뷰에서 모든 시스템의 로그데이터를 볼 수 있다면 굉장히 관리가 편해질 것입니다. 이러한 모든 로그정보를 수집하여 하나의 저장소(DB, Elasticsearch 등)에 출력해주는 시스템이 로그스태시라는 시스템입니다. 앞선 포스팅에서 다루어보았던 Filebeat와 연동을 한다면 파일에 축적되고 있는 로그데이터를 하나의 저장소로 보낼 수도 있고, 카프카의 토픽에 누적되어 있는 메시지들을 가져와 하나의 저장소에 보낼 수도 있습니다.

 

2019/06/17 - [Elasticsearch&Solr] - ELK Stack - Filebeat(파일비트)란? 간단한 사용법

 

ELK Stack - Filebeat(파일비트)란? 간단한 사용법

오늘 포스팅할 내용은 ELK Stack에서 중요한 보조 수단 중 하나인 Filebeat(파일비트)에 대해 다루어볼 것이다. 우선 Filebeat를 이용하는 사례를 간단하게 하나 들어보자면, 운영중인 애플리케이션에서 File을..

coding-start.tistory.com

다시 한번 로그스태시는 아래 그림과 같이 오픈소스 데이터 수집 엔진으로, 실시간 파이프라인 기능을 갖춘 데다 널리 사용되고 있는 시스템입니다. 로그스태시를 활용하면 다양한 입력차원에서 데이터를 수집 및 분석, 가공 및 통합해 다양한 목적지에 저장하는 파이프라인을 쉽게 구축할 수 있습니다.

 

 

게다가 로그스태시는 사용하기 쉽고, 연동하기 간단한 입력 필터와 출력 플러그인과 같이 다양한 플러그인을 제공합니다. 이처럼 로그스태시를 사용하면 대용량 데이터와 각종 데이터 형식을 통합하고 정규화하는 프로세스 구축이 가능합니다.

 

 

로그스태시 아키텍쳐

 

 

로그스태시는 위의 그림과 같이 여러 원천데이터를 가져와 필터를 통해 가공하여 하나 이상의 시스템으로 내보낼 수 있습니다. 입력은 여러 원천데이터가 될 수 있고, 필터 또한 하나 이상을 정의하여 가공할 수 있습니다. 즉 입력,필터,출력 세 가지 단계로 구성이 됩니다. 이중 입력과 출력은 필수요소, 필터는 선택요소입니다. 또한 로그스태시는 기본적으로 파이프라인 단계마다 이벤트를 버퍼에 담기 위해 인메모리 바운드 큐를 사용하는데, 로그스태시가 비정상 종료된다면 인메모리에 저장된 이벤트는 손실됩니다. 하지만 손실 방지를 위하여 영구큐를 사용해 실행하면 이벤트를 디스크에 작성하기 때문에 비정상 종료후 재시작되어도 데이터의 손실이 없습니다.

 

 

영구 큐는 LOGSTASH_HOME/config 디렉토리의 logstash.yml 설정 파일에서 queue.type: persisted 속성을 설정하면 활성화 가능하다. 또한 로그스태시 힙메모리를 늘리려면 LOGSTASH_HOME/config 디렉토리 밑에 jvm.options 파일에서 조정가능합니다.

 

입력 플러그인

일반적으로 많이 사용되는 입력 플러그인에 대해 설명한다.

 

1)File

File 플러그인은 파일에서 이벤트를 한 줄씩 가져오는데 사용한다. 리눅스와 유닉스 명령어 tail -f와 유사한 방식으로 동작한다. 각 파일의 모든 변경 사항을 추적하고 마지막으로 읽은 위치를 기반으로 해당 시점 이후의 데이터만 전송한다. 각 파일에서 현재 위치는 sincedb라는 별도로 분리한 파일에 기록하여 로그스태시를 재기동하여도 읽지 못한 데이터를 빠지지 않고 읽어 올 수 있다.

 

1
2
3
4
5
6
7
8
9
10
11
#sample.conf
 
#input plugin file
input
{
 file {
  path => ["/Users/yun-yeoseong/*","...","..."]
  start_position => "beginning"
#sincedb_path => "NULL" ~> 로그스태시를 재시작할때마다 파일의 처음부터 읽는다.
  exclude => ["*.csv"]
  discover_interval => "10s"
  type => "applogs" ~>새로운 필드를 선언한다. 추후 필터에서 유용하게 쓰이는 필드이다.
 }
}
cs

 

위의 input 설정을 설명하면 path에 설정된 경로에서 파일을 읽어오며 파일에서 로그 추출 시작점을 파일의 처음으로 잡았고 경로의 파일중 csv 확장자는 제외하고 10초마다 파일을 읽어들이며 type이라는 필드에 applogs라는 데이터를 추가한다. type 필드를 이용하여 시스템 이름등을 넣어 로그스태시가 수집한 로그데이터 구분이 가능하다. 나머지 기타 설정들을 공식 홈페이지를 확인하자.

 

2)Beat

Beat 입력 플러그인을 사용하면 로그스태시에서 엘라스틱비트 프레임워크의 이벤트를 수신할 수 있다. 엘라스틱비트란 로그스태시를 보조하는 역할이며, 다양한 시스템에서 데이터를 수집하여 전달해주는 역할이다. 로그스태시와 공통점이라면 다양한 시스템의 데이터를 수집할 수 있다는 점이며, 차이점은 수집한 데이터를 가공하지 못한다는 점이다. 즉, 보통 엘라스틱비트에서 데이터를 수집하여 로그스태시에 보내면 로그스태시는 데이터를 가공하여 출력 포인트로 가공된 데이터를 내보낸다. 

 

1
2
3
4
5
6
7
8
9
10
11
12
#input plugin beat
input
{
 beats {
  host => "192.168.10.229"
  port => 2134 =>"실행중인 비트 포트만 작성해주면 된다.
 }
 beats { =>beats를 여러개 입력하여 다중 비트 입력을 받을 수 있다.
  host => "ip"
  port => "port"
 }
}
cs

 

위의 input 설정을 설명하자면 192.168.10.229:2134로 떠있는 비트에서 이벤트를 수신한다는 설정이다. 여러개의 beat 설정을 넣어서 여러 비트프레임워크에서 이벤트를 수신할 수 있다. 만약 로그스태시와 비트가 같은 서버에 있다면 host는 생략하고 port만 명시해도 된다.

 

설명한 input 플러그인 이외에도 JDBC,Kafka 등 많은 입력 플러그인이 있다. 그 중 카프카에서 데이터를 가져오는 예제는 이 포스팅 마지막에 할 예정이라 설명을 생략하였다.

 

TIP 로그스태시 실행 시 -r 옵션을 지정하면 설정을 변경하고 저장할 때마다 자동으로 바뀐 환경설정을 다시 로드한다. 즉, 매번 설정이 변경될 때마다 로그스태시를 재시작하지 않아도 된다.

 

 

출력 플러그인

가장 일반적으로 많이 사용되는 출력 플러그인 설명이다.

 

1)Elasticsearch

로그스태시에서 일래스틱서치로 이벤트 혹은 로그 데이터를 전송하는 데 사용하며, 권장하는 방법이라고 한다. 엘라스틱서치에 데이터가 있으면 키바나로 손쉽게 시각화가 가능하기에 사용하면 여러모로 유용하다.

 

1
2
3
4
5
6
7
8
9
10
#output to elasticsearch
output {
 elasticsearch {
 index => "elasticserach-%{+YYYY.MM.dd}" => default logstash-%{+YYYY.MM.dd}
 document_type => "_doc"
 hosts => "192.162.43.30" / ["ip1:9200","ip2:9200"] => default localhost:9200
 user => "username"
 password => "password"
 }
}
cs

 

직관적인 설정방법이라 크게 설명할 것은 없다. 몇개 짚고 넘어가자면 여러 엘라스틱서치 노드가 존재한다면 위와 같이 배열형태로 설정할 수 있고, 엘라스틱서치 사용자 자격 증명이 필요하다면 user,password를 지정할 수 있다. 데이터를 보낼 인덱스명은 기본값을 가지고 있지만 사용자 정의가 가능하다. 호스트 포트를 생략하면 기본적으로 엘라스틱서치의 기본포트를 이용한다.

 

2)CSV

CSV 플러그인을 사용하면 출력을 CSV 형식으로 저장할 수 있다. 플러그인 사용 시 필요한 설정은 출력 파일 위치를 지정하는 path 매개변수와 CSV 파일에 기록할 이벤트 필드명을 지정하는 fields 매개변수다. 이벤트에 필드가 없는 경우, 빈 문자열이 기록된다. 

 

1
2
3
4
5
6
7
#output to elasticsearch
output {
 csv {
  fields => ["message","@timestamp","host"]
  path => "/home/..."
 }
}
cs

 

설명한 출력 플러그인 이외에도 많은 출력 플러그인이 존재한다. 나머지는 공식 홈페이지를 참고하자.

 

코덱 플러그인

가장 일반적으로 많이 사용되는 코덱 플러그인이다.

 

1)JSON

해당 코덱은 데이터가 json으로 구성된 경우, 입력 플러그인에서 데이터를 디코딩하고 출력 플러그인에서 데이터를 인코딩하도록 사용하는데 유용하다. 만약 \n 문자가 들어간 JSON 데이터가 있는 경우 json_lines 코덱을 사용한다. 

 

1
2
3
4
5
6
input codec exam
input {
 stdin {
  codec => "json" => \n 구분자가 있는 pretty json 일경우 json_lines codec을 이용한다.
 }
}
 
#input exam
{"question":"안녕","answer":"네, 안녕하세요"}
 
#output result
{

      "answer" => "네, 안녕하세요",

      "question" => "안녕",

      "host" => "yun-yeoseong-ui-MacBook-Pro.local",

      "@version" => "1",

    "@timestamp" => 2019-06-26T13:26:08.704Z

}

 

#input exam2

{"question":"안녕","answer":"네, \n 안녕하세요"}

#output result

{

      "message" => "{\"question\":\"안녕\",\"answer\"n 안세요\"}",

      "host" => "yun-yeoseong-ui-MacBook-Pro.local",

      "@version" => "1",

      "tags" => [

        [0] "_jsonparsefailure"

    ],

    "@timestamp" => 2019-06-26T13:28:58.986Z

}

 

=============================================================================

input codec exam

input {

 stdin {

  codec => "json_lines" => \n 구분자가 있는 pretty json 일경우 json_lines codec을 이용한다.

 }

}

 

#input exam

{"question":"안녕","answer":"네, \n 안녕하세요"}

#output result

{

      "host" => "yun-yeoseong-ui-MacBook-Pro.local",

      "question" => "안녕",

      "answer" => "네, \n 안녕하세요",

      "@version" => "1",

    "@timestamp" => 2019-06-26T13:30:12.650Z

}

cs

 

코덱플러그인은 위와 같이 입력,출력 플러그인에 설정이 들어간다.

 

2)Multiline

여러 행에 걸친 데이터를 단일 이벤트로 병합하는 데 유용한 플러그인이다.

 

1
2
3
4
5
6
7
8
9
10
11
#input codex exam2
input {
 file {
  path => "/var/log/access.log"
  codex => multiline {
   pattern => "^\s" 공백으로 시작하는 데이터를 이전 행과 결합
   negate => false
   what => "previous"
  }
 }
}
cs

 

공백으로 시작하는 모든 행을 이전 행과 결합하는 코덱설정이다.

 

이외에도 더 많은 코덱 플러그인이 존재한다. 자세한 사항을 공식 홈페이지를 참고바란다.

 

 

Kafka(카프카) + ELK Stack을 이용한 로그 분석 구현

사실 데이터의 흐름은 정의하기 나름이지만 필자 나름대로의 플로우로 로그분석을 위한 로그데이터 파이프라인을 구축해보았고, 실제 솔루션 내에도 적용한 플로우이다.

 

전체적인 플로는 아래와 같다.

 

App -> Kafka -> Logstash -> Elasticsearch

 

위와 같이 애플리케이션에서 발생하는 로그들을 DB에 저장하는 것이 아니라, 카프카를 이용해 특정 토픽에 메시지를 보낸 후에 해당 토픽을 폴링하고 있는 로그스태시가 데이터를 수집하여 엘라스틱서치에 색인한다.

 

1)App(Spring boot + Spring Cloud Stream)

애플리케이션은 스프링부트 기반의 웹프로젝트이며, 스프링 클라우드 스트림 라이브러리를 이용하여 카프카와 연동하였다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
############################<Binder Config>###############################
#broker list(cluster)
spring.cloud.stream.kafka.binder.brokers=localhost:9092,localhost:9093,localhost:9094,localhost:9095
#acks mode
spring.cloud.stream.kafka.binder.producer-properties.acks=all
#required acks num
spring.cloud.stream.kafka.binder.required-acks=3
#min partition num
#spring.cloud.stream.kafka.binder.min-partition-count=4
#auto create topic enable
spring.cloud.stream.kafka.binder.auto-create-topics=true
#auto add partitions
#spring.cloud.stream.kafka.binder.auto-add-partitions=true
#replication factor
spring.cloud.stream.kafka.binder.replication-factor=2
############################</Binder Config>###############################
############################<Producer Config>##############################
 
#Log Producer
spring.cloud.stream.bindings.logstash_producer.destination=KIVESCRIPT_CHAT_LOG
spring.cloud.stream.bindings.logstash_producer.content-type=application/json
spring.cloud.stream.bindings.logstash_producer.producer.partition-count=4
 
############################</Producer Config>#############################
cs

 

애플리케이션에서 카프카와 연동하기 위한 application.properties 설정들이다. 만약 위의 설정들을 모른다면 이전에 포스팅했던 글을 참고하길 바란다.

 

 

Kafka - Spring cloud stream kafka(스프링 클라우드 스트림 카프카)

Kafka - Spring cloud stream kafka(스프링 클라우드 스트림 카프카) 이전 포스팅까지는 카프카의 아키텍쳐, 클러스터 구성방법, 자바로 이용하는 프로듀서,컨슈머 등의 글을 작성하였다. 이번 포스팅은 이전까지..

coding-start.tistory.com

밑의 링크는 카프카 클러스터링에 관련된 포스팅이다.

 

 

Kafka - Kafka(카프카) cluster(클러스터) 구성 및 간단한 CLI사용법

Kafka - Kafka(카프카) cluster(클러스터) 구성 및 간단한 CLI사용법 ▶︎▶︎▶︎카프카란? 이전 포스팅에서는 메시징 시스템은 무엇이고, 카프카는 무엇이며 그리고 카프카의 특징과 다른 메시지 서버와의 차이..

coding-start.tistory.com

 

다음은 카프카 바인더와 연결시켜줄 채널을 명시해주는 자바 컨피그 클래스이다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * Custom Message Processor
 * @author yun-yeoseong
 *
 */
public interface MessageProcessor {
    
    public static final String CHAT_LOG = "logstash_producer";
    
    @Output(KiveMessageProcessor.CHAT_LOG)
    MessageChannel chatLog();
    
}
cs

 

애플리케이션에서 카프카 토픽으로 메시지를 내보내는 코드이다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * 
 * @author yun-yeoseong
 *
 */
@Slf4j
@Component
public class MessageSender {
    
    @Autowired
    private MessageProcessor messageProcessor;
    
    public void sendMessage(String log) {
                
        MessageChannel outputChannel = messageProcessor.chatLog();
        
        outputChannel.send(MessageBuilder
                           .withPayload(request)
                           .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                           .build());
    }
    
}
cs

 

다음은 로그스태시 설명이다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
input {
        kafka {
                bootstrap_servers => "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095"
                topics => "LOGSTASH_PRODUCER"
                group_id => "APP_LOG_GROUP"
                enable_auto_commit => "true"
                auto_offset_reset => "latest"
                consumer_threads => 4
                codec => "json"
        }
}
 
output {
        elasticsearch {
                index => "chatlog-%{+YYYY.MM.dd}"
                document_type => "_doc"
                hosts => ["127.0.0.1:9200","127.0.0.1:9400"]
                template_name => "log-template"
        }
}
cs

 

위 설정은 Logstash 입력과 출력을 정의한 conf 파일이다. 별다른 설정은 없고 입력으로 카프카 클러스터의 특정 토픽을 폴링하고 있으며, 출력으로 엘라스틱서치를 바라보고 있다. 설정에 대한 자세한 사항은 공식홈페이지를 확인하길 바란다. 하나 짚고 넘어갈 것이 있다면 필자는 로그를 색인하기 위한 인덱스를 동적으로 생성하지 않고, 미리 Index Template를 선언해 놓았다. 만약 동적으로 인덱스를 생성한다면 비효율적인 필드 데이터 타입으로 생성될 것이다.(keyword성 데이터도 text타입으로 생성)

 

아래 링크는 엘라스틱서치 Rest 자바 클라이언트를 이용하여 인덱스 템플릿을 생성하는 예제이다.

 

Elasticsearch - Rest High Level Client를 이용한 Index Template 생성

오늘 간단히 다루어볼 내용은 엘라스틱서치의 REST 자바 클라이언트인 Rest High Level Client를 이용하여 Index Template을 생성해보는 예제이다. 바로 예제로 들어간다. 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 1..

coding-start.tistory.com

생략된 나머지 설정(카프카,엘라스틱서치 등)들은 이전 포스팅 글들을 참조하자. 혹시나 모든 설정파일 및 소스가 필요하다면 댓글을 달아주시면 될 듯하다.

 

 

기타 명령어

 

LOGSTASH_HOME/bin logstash-plugin list -> 현재 설치된 플러그인 목록

LOGSTASH_HOME/bin logstash-plugin list --group filter -> 필터 플러그인 목록 출력(input,output,codec 등을 그룹명으로 줄 수 있음)

LOGSTASH_HOME/bin logstash-plugin list 'kafka' -> kafka라는 단어가 포함된 플러그인이 있다면 출력

LOGSTASH_HOME/bin logstash-plugin install logstash-output-email -> logstash-output-email 플러그인 설치

LOGSTASH_HOME/bin logstash-plugin update logstash-output-email -> logstash-output-email 플러그인 최신버전으로 업데이트

 

 

posted by 여성게
:
일상&기타/IT News 2019. 2. 11. 22:56

ElasticSearch(엘라스틱), 22일 서울서 기술 세미나 개최



현재 챗봇을 개발하고 있기 때문에 챗봇의 중요한 역할을 하는 자연어처리, 그리고 검색엔진에 관심이 참 많이 간다. 현재는 Solr(솔라)를 사용중인데 요즘 굉장히 핫한 ElasticSearch도 슬슬 공부를 해볼 예정이다. 그런데 22일 엘리스틱서치 기술 세미나 개최가 있다고 해서 기사를 참조해보았다. 바빠서 갈 수 있을지는 모르겠지만, 혹시 저처럼 검색엔진에 관심이 많으신 분이 이 기사를 보시고 한번 참가 해보셨음 좋겠다는 마음에 기사를 올려봅니다.


▶︎▶︎▶︎네이버뉴스




오픈소스 기반의 실시간 로그분석 및 검색 기술업체인 엘라스틱이 오는 22일 서울에서 글로벌 기술 세미나를 개최한다.

엘라스틱서치코리아(대표 한성엽)는 서울 삼성동 인터컨티넨탈서울코엑스에서 창업자이자 본사 CEO인 샤이 배넌이 참석하는 ‘서울 엘라스틱{온} 투어(Elastic{ONTour Seoul) 2019’를 개최한다고 11일 밝혔다.

샤이 배넌 창업자 외에도 시각화 툴인 ‘키바나(Kibana)’의 최초 개발자인 라시드 칸, 루신 노리 한글분석기 개발자 짐 페렌지 등이 참석한다. 

한성엽 엘라스틱서치코리아 대표는 “2년전 한국에 엘라스틱 지사가 설립된 이후 주요 대기업을 비롯해 다양한 중견·중소기업, 스타트업 등이 자사의 실시간 검색 및 로그분석 솔루션을 도입했다”며 “이번 행사를 통해 전세계 최신 기술 동향 및 엘라스틱의 신기술, 다양한 국내 구축사례 및 파트너 솔루션을 소개할 예정”이라고 말했다.





그나저나...한글 형태소분석기 개발자가 외국인이라니...요즘 가장 핫하고 기능이 좋다는 노리 형태소분석기가 외국인이 만들었다는거에 한번 더 충격...입니다.


posted by 여성게
: