이번에 다루어볼 내용은 엘라스틱서치 Aggregation API이다. 해당 기능은 SQL과 비교하면 Group by의 기능과 아주 유사하다. 즉, 문서 데이터를 그룹화해서 각종 통계 지표 만들어 낼 수 있다.

 

엘라스틱서치의 집계(Aggregation)

통계 분석을 위한 프로그램은 아주 많다. 하지만 실시간에 가깝게 어떠한 대용량의 데이터를 처리하여 분석 결과를 내놓은 프로그램은 많지 않다. 즉, RDBMS이나 하둡등의 대용량 데이터를 적재하고 배치등을 돌려 분석을 내는 것이 대부분이다. 하지만 엘라스틱서치는 많은 양의 데이터를 조각내어(샤딩)내어 관리하며 그 덕분에 다른 분석 프로그램보다 거의 실시간에 가까운 통계 결과를 만들어낼 수 있다.

 

하지만 집계기능은 일반 검색 기능보다 훨씬 더 많은 리소스를 소모한다. 성능 문제를 어느정도 효율적으로 해결하기 위해서는 캐시를 적절히 이용해야 하는 것이다. 물론 우리가 직접적으로 캐시를 조작하는 API를 사용하거나 하지는 않지만 어느정도 설정으로 조정가능하다. 그렇다면 엘라스틱서치에서 사용하는 캐시의 종류가 뭐가 있는지 간단히 알아보자.

 

캐시 종류 설명에 앞서 우선 엘라스틱서치의 캐시를 이용하면 질의의 결과를 캐시에 두고 다음에 동일한 요청이 오면 다시 한번 요청을 처리하는 것이 아닌 캐시에 있는 결과값을 그대로 돌려준다. 보통 캐시의 크기는 일반적으로 힙 메모리의 1%로 정도를 할당하며, 캐시에 없는 질의의 경우 성능 향상에 별다른 도움이 되지 못한다. 만약 엘라스틱서치가 사용하는 캐시 크기를 키우고 싶다면 아래와 같은 설정이 가능하다.

 

~/elasticsearch.yml

indices.requests.cache.size: n%

 

여기서 퍼센트(%)는 엘라스틱서치가 사용하는 힙메모리 중 몇 퍼센트를 나타내는 것이다. 다음은 엘라스틱서치가 사용하는 캐시 종류이다.

 

캐시명 설명
Node query Cache

노드의 모든 샤드가 공유하는 LRU(Least-Recently-Used)캐시다. 캐시 용량이 가득차면 사용량이 가장 적은 데이터를 삭제하고 새로운 결과값을 캐싱한다.

쿼리 캐싱 사용여부는 elasticsearch.yml 파일에 아래 옵션을 추가한다. 기본값은 true이다.

index.queries.cache.enabled: true

Shard request Cache 샤드는 데이터를 분산 저장하기 위한 단위로서, 사실 그 자체가 온전한 기능을 가지는 인덱스라고 볼 수 있다. 그래서 우리는 조회 기능을 특정 샤드에 요청해서 그 샤드에 있는 결과값만 얻어올 수 있는 이유가 그렇다. Shard request Cache는 바로 이 샤드에서 수행된 쿼리의 결과를 캐싱한다. 샤드의 내용이 변경되면 캐시도 삭제하기 때문에 문서 수정이 빈번한 인덱스에서는 오히려 성능 저하가 있을 수 있다.
Field data Cache 엘라스틱서치가 필드에서 집계 연산을 수행할 때는 모든 필드 값을 메모리에 로드한다. 이러한 이유로 엘라스틱서치에서 계산되는 집계 쿼리는 성능적인 측면에서 비용이 상당하다. Field data Cache는 집계 계산동안 필드의 값을 메모리에 보관한다.

 

Aggregation API

 

집계 쿼리 구조

 

GET>http://localhost:9200/indexName/_search?size=0

1
2
3
4
5
6
7
8
9
10
"aggregations":{
    "<aggregation_name>":{
        "<aggregation_type>":{
            "<aggregation_body>"
        }
        [,"meta":{[<meta_data_body>]}]?
        [,"aggregations":{[<sub_aggregation>]+}]?
    }
    ,[,"<aggregation_name_2>:{...}"]*
}
cs

 

집계쿼리는 위와 같은 구조를 갖는다. 각각의 키값에 대한 설명은 직접 예제 쿼리를 통해 다루어볼 것이다. 엘라스틱서치의 집계 기능이 강력한 이유중 하나는 위의 쿼리에서 보듯 여러 집계를 중첩하여 더 고도화된 데이터를 반환할 수 있다는 점이다. 물론 중첩이 될수록 성능은 떨어지지만 더 다양한 데이터를 집계할 수 있다. 또한 URL 요청을 보면 맨뒤에 size=0이 보일 것이다. 해당 쿼리스트링을 보내지 않으면 집계 스코프 대상(query)의 결과도 노출되니 집계 결과만 보고 싶다면 size=0으로 지정해주어야 한다.

 

집계 Scope

집계 API의 전체 요청 JSON구조를 보면 아래와 같다.

 

GET> http://localhost:9200/apache-web-log/_search?size=0

1
2
3
4
5
6
7
8
9
10
11
12
13
{
    "query":{
        "match_all":{}
    },
    "aggs":{
        "region_count":{
            "terms":{
                "field":"geoip.region_name.keyword",
                "size":20
            }
        }
    }
}
cs

 

위에 query라는 필드가 하나더 존재하는데, 해당 쿼리를 통해 나온 결과값을 이용하여 집계를 내겠다라는 집계 대상이 되는 Scope를 query를 이용하여 지정한다. 참고로 aggregations는 aggs로 줄여서 필드명을 작성할 수 있다. 그렇다면 아래와 같은 쿼리는 어떠한 결과를 낼까?

 

GET> http://localhost:9200/apache-web-log/_search?size=0

1
2
3
4
5
6
7
8
9
10
11
{
    "size":0,
    "aggs":{
        "region_count":{
            "terms":{
                "field":"geoip.region_name.keyword",
                "size":3
            }
        }
    }
}
cs

 

쿼리가 생략되면 내부적으로 match_all 쿼리를 수행한다. 또한 이러한 경우도 있다. 한번의 집계 쿼리를 통해 사용자가 지정한 질의에 해당하는 문서들 집계를 수행하고 전체 문서에 대해서도 집계를 수행해야 하는 경우는 아래와 같이 글로벌 버킷을 사용하면 된다.

 

GET> http://localhost:9200/apache-web-log/_search?size=0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
{
    "query":{
        "match":{
            "geoip.region_name":"California"
        }
    },
    "aggs":{
        "region_count":{
            "terms":{
                "field":"geoip.region_name.keyword",
                "size":3
            }
        },
        "global_aggs":{
            "global":{},
            "aggs":{
                "all_doc_aggs":{
                    "terms":{
                        "field":"geoip.region_name.keyword",
                        "size":3
                    }
                }
            }
        }
    }
}
cs

 

우선 region_name이 California인 질의의 결과를 이용하여 region_count라는 집계를 수행하고 이것 이외로 global_aggs 글로벌 버킷의 all_doc_aggs 집계를 전체 문서를 대상으로 한번더 수행한다. 결과는 아래와 같다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
{
    "took": 10,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 756,
        "max_score": 0,
        "hits": []
    },
    "aggregations": {
        "global_aggs": {
            "doc_count": 10001,
            "all_doc_aggs": {
                "doc_count_error_upper_bound": 77,
                "sum_other_doc_count": 5729,
                "buckets": [
                    {
                        "key": "California",
                        "doc_count": 756
                    },
                    {
                        "key": "Texas",
                        "doc_count": 588
                    },
                    {
                        "key": "Virginia",
                        "doc_count": 424
                    }
                ]
            }
        },
        "region_count": {
            "doc_count_error_upper_bound": 0,
            "sum_other_doc_count": 0,
            "buckets": [
                {
                    "key": "California",
                    "doc_count": 756
                }
            ]
        }
    }
}
cs

 

그렇다면 집계의 종류에는 무엇이 있을까?

 

집계 종류 설명
버킷 집계 쿼리 결과로 도출된 문서 집합에 대해 특정 기준으로 나눈 다음 나눠진 문서들에 대한 산술 연산을 수행한다. 이때 나눠진 문서들의 모음들이 각 버컷에 해당된다.
메트릭 집계 쿼리 결과로 도출된 문서 집합에서 필드의 값을 더하거나 평균을 내는 등의 산술 연산을 수행한다.
파이프라인 집계 다른 집계 또는 관련 메트릭 연산의 결과를 집계한다.
행렬 집계 버킷 대상이 되는 문서의 여러 필드에서 추출한 값으로 행렬 연산을 수행한다. 이를 토대로 다양한 통계정보를 제공한다.

 

이제 위의 집계 종류에 대하여 하나하나 간단히 다루어보자.

 

메트릭 집계

메트릭 집계(Metrics Aggregations)를 사용하면 특정 필드에 대해 합이나 평균을 계산하거나 다른 집계와 중첩해서 결과에 대해 특정 필드의 _score 값에 따라 정렬을 수행하거나 지리 정보를 통해 범위 계산을 하는 등의 다양한 집계를 수행할 수 있다. 이름에서도 알 수 있듯이 정수 또는 실수와 같이 숫자 연산을 할 수 있는 값들에 대한 집계를 수행한다.

 

메트릭 집계는 또한 단일 숫자 메트릭 집계와 다중 숫자 메트릭 집계로 나뉘는데, 단일 숫자 메트릭 집계는 집계를 수행한 결과값이 하나라는 의미로 sum과 avg 등이 속한다. 다중 숫자 메트릭 집계는 집계를 수행한 결과값이 여러개가 될 수 있고, stats나 geo_bounds가 이에 속한다.

 

-합산집계(sum)

합산집계는 단일 숫자 메트릭 집계에 해당한다.

 

apache 로그에 유입되는 데이터의 바이트 총합을 구하는 집계 쿼리이다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
{
    "aggs":{
        "total_bytes":{
            "sum":{
                "field":"bytes"
            }
        }
    }
}
 
->result
{
    "took": 12,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 10001,
        "max_score": 0,
        "hits": []
    },
    "aggregations": {
        "total_bytes": {
            "value": 2747282505
        }
    }
}
cs

 

만약 전체 데이터가 아닌 쿼리를 날려 매치되는 문서를 집계하기 위해서 특정 지역에서 유입된 apache 로그를 검색해 그 결과로 bytes 수를 총합하는 쿼리는 아래와 같다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
{
    "query":{
        "constant_score":{
            "filter":{
                "match":{
                    "geoip.city_name":"Paris"
                }
            }
        }
    },
    "aggs":{
        "total_bytes":{
            "sum":{
                "field":"bytes"
            }
        }
    }
}
 
->result
{
    "took": 3,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 21,
        "max_score": 0,
        "hits": []
    },
    "aggregations": {
        "total_bytes": {
            "value": 428964
        }
    }
}
cs

 

논외의 이야기이지만, 스코어 점수가 필요없는 어떠한 검색에 constant_score 쿼리를 사용하면 성능상 이슈가 있다. 자주 사용되는 필터 쿼리는 엘라스틱 서치에서 캐시하므로 성능에 이점이 있을 수 있다. 만약 위의 쿼리에서 바이트를 KB나 MB,GB 단위로 보고 싶다면 어떻게 하면 좋을까? 사실 집계 쿼리에 데이터 크기 단위를 조정하는 기능은 없다. 하지만 script를 이용하면 집계되는 데이터를 원하는 단위로 변환이 가능하다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
{
    "query":{
        "constant_score":{
            "filter":{
                "match":{
                    "geoip.city_name":"Paris"
                }
            }
        }
    },
    "aggs":{
        "total_bytes":{
            "sum":{
                "script":{
                    "lang":"painless",
                    "source":"doc.bytes.value"
                }
            }
        }
    }
}
cs

 

해당 쿼리는 위의 쿼리와 동일한 결과를 내놓는다. 

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
{
    "query":{
        "constant_score":{
            "filter":{
                "match":{
                    "geoip.city_name":"Paris"
                }
            }
        }
    },
    "aggs":{
        "total_bytes":{
            "sum":{
                "script":{
                    "lang":"painless",
                    "source":"doc.bytes.value / params.divice_value",
                    "params":{
                        "divice_value":1000
                    }
                }
            }
        }
    }
}
 
->result
{
    "took": 30,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 21,
        "max_score": 0,
        "hits": []
    },
    "aggregations": {
        "total_bytes": {
            "value": 422
        }
    }
}
cs

 

이렇게 스크립트를 이용하면 결과값을 일부 후처리할 수 있다. 하지만 결과가 조금이상하다. 428964/1000 인데 422가 됬다. 분명 428이 되야하는데 말이다. 그 이유는 모든 합산 값에 대한 나누기가 아니라 각 문서의 개별적인 값을 1000으로 나눈 후에 더했기 때문이다. 즉, 1000보다 작은수는 모두 0이 되어 합산이 되었다. 이 문제를 해결하기 위해서는 정수가 아닌 실수로 값을 계산해야한다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
{
    "query":{
        "constant_score":{
            "filter":{
                "match":{
                    "geoip.city_name":"Paris"
                }
            }
        }
    },
    "aggs":{
        "total_bytes":{
            "sum":{
                "script":{
                    "lang":"painless",
                    "source":"doc.bytes.value / (double)params.divice_value",
                    "params":{
                        "divice_value":1000
                    }
                }
            }
        }
    }
}
 
->result
{
    "took": 18,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 21,
        "max_score": 0,
        "hits": []
    },
    "aggregations": {
        "total_bytes": {
            "value": 428.96399999999994
        }
    }
}
cs

 

-평균 집계(avg)

평균 집계는 단일 숫자 메트릭 집계에 해당한다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
{
    "query":{
        "constant_score":{
            "filter":{
                "match":{
                    "geoip.city_name":"Paris"
                }
            }
        }
    },
    "aggs":{
        "avg_bytes":{
            "avg":{
                "field":"bytes"
            }
        }
    }
}
 
->result
{
    "took": 2,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 21,
        "max_score": 0,
        "hits": []
    },
    "aggregations": {
        "total_bytes": {
            "value": 20426.85714285714
        }
    }
}
cs

 

-최소값 집계(min)

최소값 집계는 단일 숫자 메트릭 집계에 해당한다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
{
    "query":{
        "constant_score":{
            "filter":{
                "match":{
                    "geoip.city_name":"Paris"
                }
            }
        }
    },
    "aggs":{
        "min_bytes":{
            "min":{
                "field":"bytes"
            }
        }
    }
}
 
->result
{
    "took": 3,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 21,
        "max_score": 0,
        "hits": []
    },
    "aggregations": {
        "min_bytes": {
            "value": 1015
        }
    }
}
cs

 

최대값 집계는 aggregation_type을 max로 바꾸어주면 된다.

 

-개수집계(count)

개수집계는 단일 숫자 메트릭 집계에 해당한다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
{
    "query":{
        "constant_score":{
            "filter":{
                "match":{
                    "geoip.city_name":"Paris"
                }
            }
        }
    },
    "aggs":{
        "count_bytes":{
            "value_count":{
                "field":"bytes"
            }
        }
    }
}
 
->result
{
    "took": 3,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 21,
        "max_score": 0,
        "hits": []
    },
    "aggregations": {
        "count_bytes": {
            "value": 21
        }
    }
}
cs

 

-통계집계(Stats)

통계집계는 결과값이 여러 개인 다중 숫자 메트릭 집계에 해당한다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
{
    "query":{
        "constant_score":{
            "filter":{
                "match":{
                    "geoip.city_name":"Paris"
                }
            }
        }
    },
    "aggs":{
        "stats_bytes":{
            "stats":{
                "field":"bytes"
            }
        }
    }
}
 
->result
{
    "took": 3,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 21,
        "max_score": 0,
        "hits": []
    },
    "aggregations": {
        "stats_bytes": {
            "count": 21,
            "min": 1015,
            "max": 53270,
            "avg": 20426.85714285714,
            "sum": 428964
        }
    }
}
cs

 

count,min,max,avg,sum 등 한번에 모든 집계 결과를 받을 수 있다.

 

-확장 통계 집계(extended Stats)

확장 통계 집계는 결과값이 여러 개인 다중 숫자 메트릭 집계에 해당한다. 앞의 통계 집계를 확장해서 표준편차 같은 통계값이 추가된다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
{
    "query":{
        "constant_score":{
            "filter":{
                "match":{
                    "geoip.city_name":"Paris"
                }
            }
        }
    },
    "aggs":{
        "count_bytes":{
            "extended_stats":{
                "field":"bytes"
            }
        }
    }
}
 
->result
{
    "took": 4,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 21,
        "max_score": 0,
        "hits": []
    },
    "aggregations": {
        "count_bytes": {
            "count": 21,
            "min": 1015,
            "max": 53270,
            "avg": 20426.85714285714,
            "sum": 428964,
            "sum_of_squares": 18371748404,
            "variance": 457588669.3605442,
            "std_deviation": 21391.32229107271,
            "std_deviation_bounds": {
                "upper": 63209.501725002556,
                "lower": -22355.787439288277
            }
        }
    }
}
cs

 

-카디널리티 집계(Cardinality)

카디널리티 집계는 단일 숫자 메트릭 집계에 해당한다. 개수 집합과 유사하게 횟수를 계산하는데, 중복된 값은 제외한 고유한 값에 대한 집계를 수행한다. 하지만 모든 문서에 대해 중복된 값을 집계하는 것은 성능에 큰 영향을 줄 수 있기에 근사치를 통해 집계한다. 근사치를 구하기 위해 HyperLogLog++ 알고리즘 기반으로 동작한다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
{
    "query":{
        "bool":{
            "must":[
                {
                    "match":{
                        "geoip.country_name":"United"    
                    }
                },
                {
                    "match":{
                        "geoip.country_name":"States"
                    }
                }
            ]
        }
    },
    "aggs":{
        "us_city_names":{
            "cardinality":{
                "field":"geoip.city_name.keyword"
            }
        }
    }
}
 
->result
{
    "took": 25,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 3974,
        "max_score": 0,
        "hits": []
    },
    "aggregations": {
        "us_city_names": {
            "value": 206
        }
    }
}
cs

 

-백분위 수 집계(Percentiles)

역시나 근사치이고 TDigest 알고리즘을 이용한다. 카디날리티 집계와 마찬가지로 문서들의 집합 크기각 작을 수록 정확도는 높아지고, 문서의 집합이 클수록 오차범위가 늘어난다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
{
    "query":{
        "constant_score":{
            "filter":{
                "match":{
                    "geoip.city_name":"Paris"
                }
            }
        }
    },
    "aggs":{
        "bytes_percentiles":{
            "percentiles":{
                "field":"bytes"
            }    
        }
    }
}
 
->result
{
    "took": 14,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 21,
        "max_score": 0,
        "hits": []
    },
    "aggregations": {
        "bytes_percentiles": {
            "values": {
                "1.0": 1015,
                "5.0": 1015,
                "25.0": 3638,
                "50.0": 6146,
                "75.0": 50662.75,
                "95.0": 53270,
                "99.0": 53270
            }
        }
    }
}
cs

 

아래와 같이 백분율을 명시할 수도 있다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
{
    "query":{
        "constant_score":{
            "filter":{
                "match":{
                    "geoip.city_name":"Paris"
                }
            }
        }
    },
    "aggs":{
        "bytes_percentiles":{
            "percentiles":{
                "field":"bytes",
                "percents":[0,10,20,30,40,50,60,70,80,90,100]
            }    
        }
    }
}
 
->result
{
    "took": 2,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 21,
        "max_score": 0,
        "hits": []
    },
    "aggregations": {
        "bytes_percentiles": {
            "values": {
                "0.0": 1015,
                "10.0": 1015,
                "20.0": 3638,
                "30.0": 4629.2,
                "40.0": 4877,
                "50.0": 6146,
                "60.0": 17147,
                "70.0": 37258.399999999994,
                "80.0": 52315,
                "90.0": 52697,
                "100.0": 53270
            }
        }
    }
}
cs

 

-백분위 수 랭크 집계

위의 랭크 집계와는 달리 특정 값을 주고 어느 백분위 범위에 속하는지를 결과값으로 돌려준다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
{
    "query":{
        "constant_score":{
            "filter":{
                "match":{
                    "geoip.city_name":"Paris"
                }
            }
        }
    },
    "aggs":{
        "bytes_percentiles_rank":{
            "percentile_ranks":{
                "field":"bytes",
                "values":[4000,6900]
            }    
        }
    }
}
 
->result
{
    "took": 5,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 21,
        "max_score": 0,
        "hits": []
    },
    "aggregations": {
        "bytes_percentiles_rank": {
            "values": {
                "4000.0": 26.592105768861217,
                "6900.0": 53.03370689244701
            }
        }
    }
}
cs

 

-지형 경계 집계

지형 좌표를 포함하고 있는 필드에 대해 해당 지역 경계 상자를 계산하는 메트릭 집계다. 해당 집계를 사용하기 위해서는 계산하려는 필드의 타입이 geo_point여야 한다.

 

 

필드 매핑타입이다.

 

해당 필드에 들어간 값의 예제이다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
{
    "aggs":{
        "viewport":{
            "geo_bounds":{
                "field":"geoip.location",
                "wrap_longitude":true
            }    
        }
    }
}
 
->result
{
    "took": 14,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 10001,
        "max_score": 0,
        "hits": []
    },
    "aggregations": {
        "viewport": {
            "bounds": {
                "top_left": {
                    "lat": 69.34059997089207,
                    "lon": -159.76670005358756
                },
                "bottom_right": {
                    "lat": -45.88390002027154,
                    "lon": 176.91669998690486
                }
            }
        }
    }
}
cs

 

추후 키바나에서 이러한 지형집계로 시각화를 할 수 있다.

 

 

-지형 중심 집계

지형 경계 집계 범위에서 정가운데의 위치를 반환한다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
{
    "aggs":{
        "centroid":{
            "geo_centroid":{
                "field":"geoip.location"
            }    
        }
    }
}
 
->result
{
    "took": 10,
    "timed_out": false,
    "_shards": {
        "total": 5,
        "successful": 5,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": 10001,
        "max_score": 0,
        "hits": []
    },
    "aggregations": {
        "centroid": {
            "location": {
                "lat": 38.715619301146354,
                "lon": -22.189867686554656
            },
            "count": 9993
        }
    }
}
cs

 

여기까지 메트릭 집계에 대해 간단히 다루어봤다. 글이 길어져 다음 포스팅에 이어서 집계 API를 다루어보도록 한다.

 

2019/09/20 - [Search-Engine/Elasticsearch&Solr] - Elasticsearch - Aggregation API(엘라스틱서치 집계,버킷(Bucket Aggregations) 집계) -2

 

Elasticsearch - Aggregation API(엘라스틱서치 집계,버킷(Bucket Aggregations) 집계) -2

이번 포스팅은 엘라스틱서치 Aggregation(집계) API 두번째 글이다. 이번 글에서는 집계중 버킷집계(Bucket)에 대해 알아볼 것이다. 우선 버킷 집계는 메트릭 집계와는 다르게 메트릭을 계산하지 않고 버킷을 생..

coding-start.tistory.com

2019/09/20 - [Search-Engine/Elasticsearch&Solr] - Elasticsearch - Aggregation API(엘라스틱서치 집계,파이프라인(Pipeline Aggregations) 집계) -3

 

Elasticsearch - Aggregation API(엘라스틱서치 집계,파이프라인(Pipeline Aggregations) 집계) -3

파이프라인 집계(Pipeline Aggregations)는 다른 집계와 달리 쿼리 조건에 부합하는 문서에 대해 집계를 수행하는 것이 아니라, 다른 집계로 생성된 버킷을 참조해서 집계를 수행한다. 집계 또는 중첩된 집계를..

coding-start.tistory.com

 

posted by 여성게
:

 

오늘 포스팅할 내용은 ELK Stack의 요소중 하나인 Logstash(로그스태시)입니다. 로그스태시 설명에 앞서 로그란 시스템이나 애플리케이션 상태 및 행위와 관련된 풍부한 정보를 포함하고 있습니다. 이러한 정보를 각각 시스템마다 파일로 기록하고 있는 경우가 대다수 일겁니다. 그렇다면 과연 이러한 정보를 파일로 관리하는 것이 효율적인 것인가를 생각해볼 필요가 있습니다. 한곳에 모든 로그데이터를 시스템별로 구분하여 저장하고 하나의 뷰에서 모든 시스템의 로그데이터를 볼 수 있다면 굉장히 관리가 편해질 것입니다. 이러한 모든 로그정보를 수집하여 하나의 저장소(DB, Elasticsearch 등)에 출력해주는 시스템이 로그스태시라는 시스템입니다. 앞선 포스팅에서 다루어보았던 Filebeat와 연동을 한다면 파일에 축적되고 있는 로그데이터를 하나의 저장소로 보낼 수도 있고, 카프카의 토픽에 누적되어 있는 메시지들을 가져와 하나의 저장소에 보낼 수도 있습니다.

 

2019/06/17 - [Elasticsearch&Solr] - ELK Stack - Filebeat(파일비트)란? 간단한 사용법

 

ELK Stack - Filebeat(파일비트)란? 간단한 사용법

오늘 포스팅할 내용은 ELK Stack에서 중요한 보조 수단 중 하나인 Filebeat(파일비트)에 대해 다루어볼 것이다. 우선 Filebeat를 이용하는 사례를 간단하게 하나 들어보자면, 운영중인 애플리케이션에서 File을..

coding-start.tistory.com

다시 한번 로그스태시는 아래 그림과 같이 오픈소스 데이터 수집 엔진으로, 실시간 파이프라인 기능을 갖춘 데다 널리 사용되고 있는 시스템입니다. 로그스태시를 활용하면 다양한 입력차원에서 데이터를 수집 및 분석, 가공 및 통합해 다양한 목적지에 저장하는 파이프라인을 쉽게 구축할 수 있습니다.

 

 

게다가 로그스태시는 사용하기 쉽고, 연동하기 간단한 입력 필터와 출력 플러그인과 같이 다양한 플러그인을 제공합니다. 이처럼 로그스태시를 사용하면 대용량 데이터와 각종 데이터 형식을 통합하고 정규화하는 프로세스 구축이 가능합니다.

 

 

로그스태시 아키텍쳐

 

 

로그스태시는 위의 그림과 같이 여러 원천데이터를 가져와 필터를 통해 가공하여 하나 이상의 시스템으로 내보낼 수 있습니다. 입력은 여러 원천데이터가 될 수 있고, 필터 또한 하나 이상을 정의하여 가공할 수 있습니다. 즉 입력,필터,출력 세 가지 단계로 구성이 됩니다. 이중 입력과 출력은 필수요소, 필터는 선택요소입니다. 또한 로그스태시는 기본적으로 파이프라인 단계마다 이벤트를 버퍼에 담기 위해 인메모리 바운드 큐를 사용하는데, 로그스태시가 비정상 종료된다면 인메모리에 저장된 이벤트는 손실됩니다. 하지만 손실 방지를 위하여 영구큐를 사용해 실행하면 이벤트를 디스크에 작성하기 때문에 비정상 종료후 재시작되어도 데이터의 손실이 없습니다.

 

 

영구 큐는 LOGSTASH_HOME/config 디렉토리의 logstash.yml 설정 파일에서 queue.type: persisted 속성을 설정하면 활성화 가능하다. 또한 로그스태시 힙메모리를 늘리려면 LOGSTASH_HOME/config 디렉토리 밑에 jvm.options 파일에서 조정가능합니다.

 

입력 플러그인

일반적으로 많이 사용되는 입력 플러그인에 대해 설명한다.

 

1)File

File 플러그인은 파일에서 이벤트를 한 줄씩 가져오는데 사용한다. 리눅스와 유닉스 명령어 tail -f와 유사한 방식으로 동작한다. 각 파일의 모든 변경 사항을 추적하고 마지막으로 읽은 위치를 기반으로 해당 시점 이후의 데이터만 전송한다. 각 파일에서 현재 위치는 sincedb라는 별도로 분리한 파일에 기록하여 로그스태시를 재기동하여도 읽지 못한 데이터를 빠지지 않고 읽어 올 수 있다.

 

1
2
3
4
5
6
7
8
9
10
11
#sample.conf
 
#input plugin file
input
{
 file {
  path => ["/Users/yun-yeoseong/*","...","..."]
  start_position => "beginning"
#sincedb_path => "NULL" ~> 로그스태시를 재시작할때마다 파일의 처음부터 읽는다.
  exclude => ["*.csv"]
  discover_interval => "10s"
  type => "applogs" ~>새로운 필드를 선언한다. 추후 필터에서 유용하게 쓰이는 필드이다.
 }
}
cs

 

위의 input 설정을 설명하면 path에 설정된 경로에서 파일을 읽어오며 파일에서 로그 추출 시작점을 파일의 처음으로 잡았고 경로의 파일중 csv 확장자는 제외하고 10초마다 파일을 읽어들이며 type이라는 필드에 applogs라는 데이터를 추가한다. type 필드를 이용하여 시스템 이름등을 넣어 로그스태시가 수집한 로그데이터 구분이 가능하다. 나머지 기타 설정들을 공식 홈페이지를 확인하자.

 

2)Beat

Beat 입력 플러그인을 사용하면 로그스태시에서 엘라스틱비트 프레임워크의 이벤트를 수신할 수 있다. 엘라스틱비트란 로그스태시를 보조하는 역할이며, 다양한 시스템에서 데이터를 수집하여 전달해주는 역할이다. 로그스태시와 공통점이라면 다양한 시스템의 데이터를 수집할 수 있다는 점이며, 차이점은 수집한 데이터를 가공하지 못한다는 점이다. 즉, 보통 엘라스틱비트에서 데이터를 수집하여 로그스태시에 보내면 로그스태시는 데이터를 가공하여 출력 포인트로 가공된 데이터를 내보낸다. 

 

1
2
3
4
5
6
7
8
9
10
11
12
#input plugin beat
input
{
 beats {
  host => "192.168.10.229"
  port => 2134 =>"실행중인 비트 포트만 작성해주면 된다.
 }
 beats { =>beats를 여러개 입력하여 다중 비트 입력을 받을 수 있다.
  host => "ip"
  port => "port"
 }
}
cs

 

위의 input 설정을 설명하자면 192.168.10.229:2134로 떠있는 비트에서 이벤트를 수신한다는 설정이다. 여러개의 beat 설정을 넣어서 여러 비트프레임워크에서 이벤트를 수신할 수 있다. 만약 로그스태시와 비트가 같은 서버에 있다면 host는 생략하고 port만 명시해도 된다.

 

설명한 input 플러그인 이외에도 JDBC,Kafka 등 많은 입력 플러그인이 있다. 그 중 카프카에서 데이터를 가져오는 예제는 이 포스팅 마지막에 할 예정이라 설명을 생략하였다.

 

TIP 로그스태시 실행 시 -r 옵션을 지정하면 설정을 변경하고 저장할 때마다 자동으로 바뀐 환경설정을 다시 로드한다. 즉, 매번 설정이 변경될 때마다 로그스태시를 재시작하지 않아도 된다.

 

 

출력 플러그인

가장 일반적으로 많이 사용되는 출력 플러그인 설명이다.

 

1)Elasticsearch

로그스태시에서 일래스틱서치로 이벤트 혹은 로그 데이터를 전송하는 데 사용하며, 권장하는 방법이라고 한다. 엘라스틱서치에 데이터가 있으면 키바나로 손쉽게 시각화가 가능하기에 사용하면 여러모로 유용하다.

 

1
2
3
4
5
6
7
8
9
10
#output to elasticsearch
output {
 elasticsearch {
 index => "elasticserach-%{+YYYY.MM.dd}" => default logstash-%{+YYYY.MM.dd}
 document_type => "_doc"
 hosts => "192.162.43.30" / ["ip1:9200","ip2:9200"] => default localhost:9200
 user => "username"
 password => "password"
 }
}
cs

 

직관적인 설정방법이라 크게 설명할 것은 없다. 몇개 짚고 넘어가자면 여러 엘라스틱서치 노드가 존재한다면 위와 같이 배열형태로 설정할 수 있고, 엘라스틱서치 사용자 자격 증명이 필요하다면 user,password를 지정할 수 있다. 데이터를 보낼 인덱스명은 기본값을 가지고 있지만 사용자 정의가 가능하다. 호스트 포트를 생략하면 기본적으로 엘라스틱서치의 기본포트를 이용한다.

 

2)CSV

CSV 플러그인을 사용하면 출력을 CSV 형식으로 저장할 수 있다. 플러그인 사용 시 필요한 설정은 출력 파일 위치를 지정하는 path 매개변수와 CSV 파일에 기록할 이벤트 필드명을 지정하는 fields 매개변수다. 이벤트에 필드가 없는 경우, 빈 문자열이 기록된다. 

 

1
2
3
4
5
6
7
#output to elasticsearch
output {
 csv {
  fields => ["message","@timestamp","host"]
  path => "/home/..."
 }
}
cs

 

설명한 출력 플러그인 이외에도 많은 출력 플러그인이 존재한다. 나머지는 공식 홈페이지를 참고하자.

 

코덱 플러그인

가장 일반적으로 많이 사용되는 코덱 플러그인이다.

 

1)JSON

해당 코덱은 데이터가 json으로 구성된 경우, 입력 플러그인에서 데이터를 디코딩하고 출력 플러그인에서 데이터를 인코딩하도록 사용하는데 유용하다. 만약 \n 문자가 들어간 JSON 데이터가 있는 경우 json_lines 코덱을 사용한다. 

 

1
2
3
4
5
6
input codec exam
input {
 stdin {
  codec => "json" => \n 구분자가 있는 pretty json 일경우 json_lines codec을 이용한다.
 }
}
 
#input exam
{"question":"안녕","answer":"네, 안녕하세요"}
 
#output result
{

      "answer" => "네, 안녕하세요",

      "question" => "안녕",

      "host" => "yun-yeoseong-ui-MacBook-Pro.local",

      "@version" => "1",

    "@timestamp" => 2019-06-26T13:26:08.704Z

}

 

#input exam2

{"question":"안녕","answer":"네, \n 안녕하세요"}

#output result

{

      "message" => "{\"question\":\"안녕\",\"answer\"n 안세요\"}",

      "host" => "yun-yeoseong-ui-MacBook-Pro.local",

      "@version" => "1",

      "tags" => [

        [0] "_jsonparsefailure"

    ],

    "@timestamp" => 2019-06-26T13:28:58.986Z

}

 

=============================================================================

input codec exam

input {

 stdin {

  codec => "json_lines" => \n 구분자가 있는 pretty json 일경우 json_lines codec을 이용한다.

 }

}

 

#input exam

{"question":"안녕","answer":"네, \n 안녕하세요"}

#output result

{

      "host" => "yun-yeoseong-ui-MacBook-Pro.local",

      "question" => "안녕",

      "answer" => "네, \n 안녕하세요",

      "@version" => "1",

    "@timestamp" => 2019-06-26T13:30:12.650Z

}

cs

 

코덱플러그인은 위와 같이 입력,출력 플러그인에 설정이 들어간다.

 

2)Multiline

여러 행에 걸친 데이터를 단일 이벤트로 병합하는 데 유용한 플러그인이다.

 

1
2
3
4
5
6
7
8
9
10
11
#input codex exam2
input {
 file {
  path => "/var/log/access.log"
  codex => multiline {
   pattern => "^\s" 공백으로 시작하는 데이터를 이전 행과 결합
   negate => false
   what => "previous"
  }
 }
}
cs

 

공백으로 시작하는 모든 행을 이전 행과 결합하는 코덱설정이다.

 

이외에도 더 많은 코덱 플러그인이 존재한다. 자세한 사항을 공식 홈페이지를 참고바란다.

 

 

Kafka(카프카) + ELK Stack을 이용한 로그 분석 구현

사실 데이터의 흐름은 정의하기 나름이지만 필자 나름대로의 플로우로 로그분석을 위한 로그데이터 파이프라인을 구축해보았고, 실제 솔루션 내에도 적용한 플로우이다.

 

전체적인 플로는 아래와 같다.

 

App -> Kafka -> Logstash -> Elasticsearch

 

위와 같이 애플리케이션에서 발생하는 로그들을 DB에 저장하는 것이 아니라, 카프카를 이용해 특정 토픽에 메시지를 보낸 후에 해당 토픽을 폴링하고 있는 로그스태시가 데이터를 수집하여 엘라스틱서치에 색인한다.

 

1)App(Spring boot + Spring Cloud Stream)

애플리케이션은 스프링부트 기반의 웹프로젝트이며, 스프링 클라우드 스트림 라이브러리를 이용하여 카프카와 연동하였다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
############################<Binder Config>###############################
#broker list(cluster)
spring.cloud.stream.kafka.binder.brokers=localhost:9092,localhost:9093,localhost:9094,localhost:9095
#acks mode
spring.cloud.stream.kafka.binder.producer-properties.acks=all
#required acks num
spring.cloud.stream.kafka.binder.required-acks=3
#min partition num
#spring.cloud.stream.kafka.binder.min-partition-count=4
#auto create topic enable
spring.cloud.stream.kafka.binder.auto-create-topics=true
#auto add partitions
#spring.cloud.stream.kafka.binder.auto-add-partitions=true
#replication factor
spring.cloud.stream.kafka.binder.replication-factor=2
############################</Binder Config>###############################
############################<Producer Config>##############################
 
#Log Producer
spring.cloud.stream.bindings.logstash_producer.destination=KIVESCRIPT_CHAT_LOG
spring.cloud.stream.bindings.logstash_producer.content-type=application/json
spring.cloud.stream.bindings.logstash_producer.producer.partition-count=4
 
############################</Producer Config>#############################
cs

 

애플리케이션에서 카프카와 연동하기 위한 application.properties 설정들이다. 만약 위의 설정들을 모른다면 이전에 포스팅했던 글을 참고하길 바란다.

 

 

Kafka - Spring cloud stream kafka(스프링 클라우드 스트림 카프카)

Kafka - Spring cloud stream kafka(스프링 클라우드 스트림 카프카) 이전 포스팅까지는 카프카의 아키텍쳐, 클러스터 구성방법, 자바로 이용하는 프로듀서,컨슈머 등의 글을 작성하였다. 이번 포스팅은 이전까지..

coding-start.tistory.com

밑의 링크는 카프카 클러스터링에 관련된 포스팅이다.

 

 

Kafka - Kafka(카프카) cluster(클러스터) 구성 및 간단한 CLI사용법

Kafka - Kafka(카프카) cluster(클러스터) 구성 및 간단한 CLI사용법 ▶︎▶︎▶︎카프카란? 이전 포스팅에서는 메시징 시스템은 무엇이고, 카프카는 무엇이며 그리고 카프카의 특징과 다른 메시지 서버와의 차이..

coding-start.tistory.com

 

다음은 카프카 바인더와 연결시켜줄 채널을 명시해주는 자바 컨피그 클래스이다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * Custom Message Processor
 * @author yun-yeoseong
 *
 */
public interface MessageProcessor {
    
    public static final String CHAT_LOG = "logstash_producer";
    
    @Output(KiveMessageProcessor.CHAT_LOG)
    MessageChannel chatLog();
    
}
cs

 

애플리케이션에서 카프카 토픽으로 메시지를 내보내는 코드이다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * 
 * @author yun-yeoseong
 *
 */
@Slf4j
@Component
public class MessageSender {
    
    @Autowired
    private MessageProcessor messageProcessor;
    
    public void sendMessage(String log) {
                
        MessageChannel outputChannel = messageProcessor.chatLog();
        
        outputChannel.send(MessageBuilder
                           .withPayload(request)
                           .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                           .build());
    }
    
}
cs

 

다음은 로그스태시 설명이다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
input {
        kafka {
                bootstrap_servers => "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095"
                topics => "LOGSTASH_PRODUCER"
                group_id => "APP_LOG_GROUP"
                enable_auto_commit => "true"
                auto_offset_reset => "latest"
                consumer_threads => 4
                codec => "json"
        }
}
 
output {
        elasticsearch {
                index => "chatlog-%{+YYYY.MM.dd}"
                document_type => "_doc"
                hosts => ["127.0.0.1:9200","127.0.0.1:9400"]
                template_name => "log-template"
        }
}
cs

 

위 설정은 Logstash 입력과 출력을 정의한 conf 파일이다. 별다른 설정은 없고 입력으로 카프카 클러스터의 특정 토픽을 폴링하고 있으며, 출력으로 엘라스틱서치를 바라보고 있다. 설정에 대한 자세한 사항은 공식홈페이지를 확인하길 바란다. 하나 짚고 넘어갈 것이 있다면 필자는 로그를 색인하기 위한 인덱스를 동적으로 생성하지 않고, 미리 Index Template를 선언해 놓았다. 만약 동적으로 인덱스를 생성한다면 비효율적인 필드 데이터 타입으로 생성될 것이다.(keyword성 데이터도 text타입으로 생성)

 

아래 링크는 엘라스틱서치 Rest 자바 클라이언트를 이용하여 인덱스 템플릿을 생성하는 예제이다.

 

Elasticsearch - Rest High Level Client를 이용한 Index Template 생성

오늘 간단히 다루어볼 내용은 엘라스틱서치의 REST 자바 클라이언트인 Rest High Level Client를 이용하여 Index Template을 생성해보는 예제이다. 바로 예제로 들어간다. 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 1..

coding-start.tistory.com

생략된 나머지 설정(카프카,엘라스틱서치 등)들은 이전 포스팅 글들을 참조하자. 혹시나 모든 설정파일 및 소스가 필요하다면 댓글을 달아주시면 될 듯하다.

 

 

기타 명령어

 

LOGSTASH_HOME/bin logstash-plugin list -> 현재 설치된 플러그인 목록

LOGSTASH_HOME/bin logstash-plugin list --group filter -> 필터 플러그인 목록 출력(input,output,codec 등을 그룹명으로 줄 수 있음)

LOGSTASH_HOME/bin logstash-plugin list 'kafka' -> kafka라는 단어가 포함된 플러그인이 있다면 출력

LOGSTASH_HOME/bin logstash-plugin install logstash-output-email -> logstash-output-email 플러그인 설치

LOGSTASH_HOME/bin logstash-plugin update logstash-output-email -> logstash-output-email 플러그인 최신버전으로 업데이트

 

 

posted by 여성게
:

 

오늘 포스팅할 내용은 ELK Stack에서 중요한 보조 수단 중 하나인 Filebeat(파일비트)에 대해 다루어볼 것이다. 우선 Filebeat를 이용하는 사례를 간단하게 하나 들어보자면, 운영중인 애플리케이션에서 File을 통한 로그데이터를 계속 해서 쌓고 있다면 이러한 로그데이터를 단순 파일로 가지고 있는 것이 유용할까? 물론 모니터링하는 시스템이 존재 할 수 있다. 하지만 이러한 모니터링 시스템이 아닌 로그데이터를 계속해서 축적하여 통계를 내고 싶고, 데이터의 증가,하강 추이를 시각화하여 보고 싶을 수도 있다. 이렇게 특정 로그파일을 주기적으로 스캔하여 쌓이고 있는 데이터를 긁어오는 역할을 하는 것이 파일비트이다. 물론 록그스태시만 이용하여 파일에 쌓이는 행데이터를 가져올 수 있다. 하지만 이러한 로그스태시를 엔드서버에 설치하지 못하는 상황이 있을 수 있기에 가벼운 파일비트를 이용하곤 한다.

 

 

파일비트 아키텍쳐

파일비트는 Prospectors, Harvesters, Spooler라는 주요 구성 요소를 가지고 있다. Prospector는 로그를 읽을 파일 목록을 구분하는 역할을 담당한다. 여러 파일 경로를 설정하면 로그를 읽을 파일을 식별하고 각 파일에서 로그를 읽기 시작한다. 이때 파일 컨텐츠, 즉 이벤트 데이터(로그)를 읽는 역할은 Harvester가 담당한다. 파일을 행 단위로 읽고 출력으로 보낸다. 하나의 Harvester가 개별로 파일을 담당하며 파일을 열고 닫는다. 읽어올 파일 수가 여러개가 되면 그에 따라 Harvester도 여러개가 되는 것이다. 이벤트가 발생하여 읽어온 로그데이터는 Spooler에게 보낸다. 그리고 Spooler는 이벤트를 집계하고 설정할 출력으로 전달한다.

 

위의 그림처럼 파일비트는 다수의 Prospector, Harvester로 이루어진다. 파일비트가 지원하는 Input 타입은 log와 stdin이 있다. log는 Prospectors에 정의된 파일을 읽어 데이터를 수집하며, stdin은 표준 입력에서 데이터를 읽는다. 그럼 여기서 의문이 드는 것이 있다. 물리적인 파일을 읽는데 어디까지 읽었고, 출력은 어디까지 보냈고 이런 정보를 어디서 유지하고 있는 것일까? 이러한 정보는 Harvester가 offset으로 디스크에 주기적으로 기록하며 이는 레지스트리 파일에서 관리한다. 엘라스틱서치,카프카,레디스 같은 출력 부분 미들웨어 시스템에 문제가 발생하면 파일비트는 마지막으로 보낸 행을 기록하고, 문제가 해결될때까지 계속 데이터를 수집하고 있는다. 이러한 관리 덕분에 파일비트를 내렸다 다시 올려도 데이터의 위치를 기억한 상태에서 기동되게 된다. 또한 Harvester 같은 경우 출력에게 데이터를 보낸 후에 출력 부분에서 데이터를 잘 받았다는 응답을 기다리는데 해당 응답을 받지 않은 경우 다시 데이터를 보내게됨으로 반드시 한번은 데이터 손실없이 보내게 된다.

 

파일비트 사용법(filebeat-6.6.2 버전기준으로 작성됨)

여기서 파일비트 설치법은 다루지 않는다. 예제에서 다루어볼 것은 Input(file-log type) output(es,logstash) 이다. 간단한 예제이므로 주석에 설명을 달아놓았고 별도의 설명은 하지 않는다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
filebeat.yml
#Filebeat prospectors
filebeat.prospectors:
- input_type: log
  paths:
    - /Users/yun-yeoseong/rnb-chatbot-rivescript/logs/*.log
  #파일비트로 읽어오지 않을 패턴을 지정
  #exclude_lines: ["^INFO"]
  #파일비트로 읽어올 패턴을 지정
  #include_liens: ["^ERR","^WARN"]
 
  #파일비트가 파일을 output으로 내보낼때 밑의 tags도 추가해서 보낸다.(field -> tags)
  #보통 수집기 별로 혹은 애플리케이션 별로 tags 필드의 값을 다르게 주어
  #키바나와 로그스태시에서 이벤트를 필터링하는데 사용한다.
  tags: ["app_logs"]
 
  #output으로 보낼때 해당 필드를 추가해서 보낸다.(field -> fields.env)
  fields:
    env: dev
 
  #여러라인의 로그를 어떻게 처리할지 패턴을 정의한다
  #RE2 구문을 기반으로 정규식을 짠다(https://godoc.org/regexp/syntax)
  #공백으로 시작하는 연속행을 발견한다.
  multiline.pattern: '^[[:space:]]'
  #정규식패턴 무효화 여부
  multiline.negate: false
  #공백으로 시작하는 구문을 공백으로 시작하지 않는 구문 어디로 합칠것인가(after,before)
  multiline.match: after
 
  #파일을 몇 초마다 스캔할 것인가(default 10s)
  scan_frequency: 5s
  #엘라스틱서치가 아웃풋이라면 document type을 무엇으로 할지
  #document_type: doc
 
#Outputs
#Elasticsearch output
output.elasticsearch:
  #enabled로 출력을 활성/비활성 할 수 있다.
  enabled: true
  #ES클러스터 노드 리스트를 지정할 수 있다. 각 노드의 데이터 분배는
  #라운드로빈 방식을 이용한다.
  hosts: ["localhost:9200","localhost:9300"]
  #인증이 필요할 경우 username/password 지정이 가능
  username: "elasticsearch"
  password: "password"
  #인제스트노드 파이프라인에 전달해 색인 이전에 데이터 전처리가 가능
  #pipeline: "pipeline-name"
 
#logstash output, 로그스태시를 지정하려면 로그스태시에서
#비트 이벤트를 수신할 수 있는 비트 입력 플러그인을 설정해야한다.
#output.logstash:
 # enabled: true
  #출력 노드를 여러개 지정할 수 있다. 기본적으로 임의의 노드를 선택해 데이터를 보내고,
  #데이터를 보낸 호스트에서 응답을 하지 않으면 다른 노드들중 하나로 다시 데이터를 보낸다.
  #hosts: ["localhost:5044","localhost:5045"]
  #로그스태시 호스트들에서 이벤트데이터를 균등히 보내고 싶다면 로드벨런싱 설정을 넣어준다.
  #loadbalance: true
#전역옵션
#파일 정보를 유지하는 데 사용하는 레지스트리 파일 위치를 지정한다.
#마지막으로 읽은 오프셋과 읽은 행이 설정 시 지정한 출력 지점의 응답 여부 등
#filebeat.registry_file: /Users/yun-yeoseong/elasticsearch-file/filebeat/registry
#파일비트 종료 시 데이터 송신이 끝마칠 때까지 대기하는 시간
#파일비트는 기본적으로 종료시 이벤트 송신 처리를 기다리지 않고 종료하기 때문에
#이벤트 송신이 잘되었는지 수신을 종료전에 기다리도록 시간설정을 할 수 있다.
filebeat.shutdown_timeout: 10s
 
#일반옵션
#네트워크 데이터를 발송하는 수집기의 이름 기본적으로 필드에는 호스트 이름을 지정한다.
name: "app_log_harvester"
#동시에 실행할 수 있는 최대 CPU 개수를 설정. 기본값은 시스템에서 사용 가능한 논리적인 CPU 개수
#max_procs: 2
 
cs

 

./filebeat --c ./filebeat_exam.yml -> 해당 명령어로 실행시켜준다.

 

데이터는 spring boot 프로젝트를 실행시켰을 때, 올라가는 로그를 수집한 결과이다. kibana Discovery tab을 이용하여 데이터가 어떻게 들어왔는지 결과를 확인할 것이다.

 

Result

 

 

일부 예제에 들어간 설정과 다른 데이터가 들어간 것도 있다. 데이터는 고려하지 말고 어떤 필드들이 생겼고 어떤 데이터가 설정과 비교하여 들어왔는지 확인하자. 일반적으로 파일비트는 인덱스를 버전정보+날짜정보를 이용하여 일자별로 인덱스 생성을 진행한다.(매일 생긴다는 것은? 그만큼 인덱스가 많아진다는 것이기 때문에 백업 스케쥴을 잘 잡아줘야한다.)그리고 @timestamp를 찍어 데이터 색인 시간을 볼 수 있고, 모든 데이터는 message 필드에 저장된다. 또한 설정 부분에 tags, fields.env 설정이 들어간 것이 있을 것이다. 만약 성격이 다른 애플리케이션이 있고 각각 따로 통계를 내고 싶다면 해당 필드들을 이용하여 필터를 걸수도 있고 또한 Logstash를 이용하여 데이터 처리를 할때 해당 필드들을 이용하여 필터를 걸어 사용할 수도 있을 것이다.(Input의 파일 경로(- input_type)를 여러개 지정할 수 있으므로 각 패스마다 해당 필드의 값을 다르게 준다)

 

여기까지 간단히 파일비트가 뭔지 알아봤고 사용법도 알아보았다. 사실 운영환경에서는 이렇게 간단히 사용할 수 없는 환경이 대부분일 것이다. 레퍼런스를 활용하여 더 깊게 공부할 필요성이 있다.

posted by 여성게
: