Web/Spring 2020. 5. 15. 14:12

 

MongoDbConfig를 작성할때, 몽고디비 서버 호스트관련하여 ClusterSettings.Builder를 작성해줘야하는데, mongo host에 모든 클러스터 서버 호스트를 명시하지 않고, 하나의 DNS(여러 서버를 하나로 묶은) 혹은 여러 서버 리스트 중 하나의 primary 호스트(ex. primary host를 명시하면 밑에 예외는 발생하지 않지만, 읽기 부하분산이 안된다.)만 명시한경우에는 반드시 multiple mode를 명시해주어야 한다. 내부적으로 host의 갯수를 보고 single mode인지 multiple mode인지 판단하기 때문이다. 해당 코드는 아래와 같다.

 

private ClusterSettings(final Builder builder) {
        // TODO: Unit test this
        if (builder.srvHost != null) {
            if (builder.srvHost.contains(":")) {
                throw new IllegalArgumentException("The srvHost can not contain a host name that specifies a port");
            }

            if (builder.hosts.get(0).getHost().split("\\.").length < 3) {
                throw new MongoClientException(format("An SRV host name '%s' was provided that does not contain at least three parts. "
                        + "It must contain a hostname, domain name and a top level domain.", builder.hosts.get(0).getHost()));
            }
        }

        if (builder.hosts.size() > 1 && builder.requiredClusterType == ClusterType.STANDALONE) {
            throw new IllegalArgumentException("Multiple hosts cannot be specified when using ClusterType.STANDALONE.");
        }

        if (builder.mode != null && builder.mode == ClusterConnectionMode.SINGLE && builder.hosts.size() > 1) {
            throw new IllegalArgumentException("Can not directly connect to more than one server");
        }

        if (builder.requiredReplicaSetName != null) {
            if (builder.requiredClusterType == ClusterType.UNKNOWN) {
                builder.requiredClusterType = ClusterType.REPLICA_SET;
            } else if (builder.requiredClusterType != ClusterType.REPLICA_SET) {
                throw new IllegalArgumentException("When specifying a replica set name, only ClusterType.UNKNOWN and "
                                                   + "ClusterType.REPLICA_SET are valid.");
            }
        }

        description = builder.description;
        srvHost = builder.srvHost;
        hosts = builder.hosts;
        mode = builder.mode != null ? builder.mode : hosts.size() == 1 ? ClusterConnectionMode.SINGLE : ClusterConnectionMode.MULTIPLE;
        requiredReplicaSetName = builder.requiredReplicaSetName;
        requiredClusterType = builder.requiredClusterType;
        localThresholdMS = builder.localThresholdMS;
        serverSelector = builder.packServerSelector();
        serverSelectionTimeoutMS = builder.serverSelectionTimeoutMS;
        maxWaitQueueSize = builder.maxWaitQueueSize;
        clusterListeners = unmodifiableList(builder.clusterListeners);
    }

 

ClusterSettings.Builder.build 메서드의 일부인데, mode를 set하는 부분에 mode를 명시적으로 넣지 않았다면 작성된 호스트 갯수를 보고 클러스터 모드를 결정한다. 만약 MonoDb 서버 여러개를 하나의 도메인으로 묶어 놓았다면, 보통 DNS하나만 설정에 넣기 마련인데, 이러면 write 요청이 secondary에 들어가게 되면 아래와 같은 에러가 발생하게 된다.(먄약 실수로 secondary host를 넣었다면 쓰기요청에 당연히 아래 예외가 계속 발생한다.)

 

MongoNotPrimaryException: Command failed with error 10107 (NotMaster): 'not master' on server bot-meta01-mongo1.dakao.io:27017. 

 

왠지, SINGLE모드일때는 secondary로 write요청이 들어왔을때 primary로 위임이 안되는듯하다.(이건 조사해봐야할듯함, 왠지 싱글모드이면 당연히 프라이머리라고 판단해서 그럴듯?..) 그렇기 때문에 클러스터는 걸려있고, 서버 리스트를 여러 서버를 묶은 DNS 하나만 작성한다면 반드시 ClusterSetting에 "MULTIPLE"을 명시해서 넣야야한다 !

posted by 여성게
:
Database/MongoDB 2019. 9. 21. 11:51

 

몽고디비 서버에서도 다른 대용량 분산 NoSQL DBMS처럼 맵리듀스 기능을 제공한다. 맵리듀스와 유사한 집계(Aggregations)기능도 제공하지만 더욱 복잡한 패턴의 분석은 맵리듀스 기능이 필요 할 수도 있다. 몽고디비의 맵리듀스는 자바스크립트 언어의 문법을 사용하여 구현한다. 즉, 내부적으로는 자바스크립트 엔진을 이용한다는 뜻이다. 몽고디비가 현재까지 SpiderMonkey, V8 등의 자바스크립트 엔진을 사용했지만 현재는 SpiderMonkey 엔진을 기본으로 사용한다. 두 엔진의 차이가 여기서 이야기할 내용은 아니지만 간단히 아래와 같은 차이를 갖는다.

 

V8 엔진은 멀티 프로세스 방식이며, SpiderMonkey는 단일 프로세스 내에서 멀티 스레드로 작동한다.

 

어찌됬든 몽고디비에서 성능이 더 나은 엔진을 선택했을 것이다. 내부적으로 도큐먼트를 자바스크립트 변수로 매핑하고 자바스크립트로 여러가지 맵리듀스 과정을 거쳐 최종적으로 다시 몽고디비 문서로 생성되는 것이다.

 

아래 이미지는 공식 레퍼런스에 있는 맵리듀스 과정을 도식화한 것이다. 

 

 

결론적으로는 map 과정에서 emit이라는 함수를 호출하는데, 키로 사용될 필드, 값으로 사용될 필드를 인자로 넣어준다. 그러면 내부적으로 같은 키값에 대한 여러개의 값을 배열로 가지고 있게 되고 이후 reduce 과정에서 해당 키와 값배열을 받아 reduce 연산(이밎에서는 Array.sum(values)라는 합산 연산)을 실행한 후 결과 문서를 출력 혹은 별도 컬렉션의 저장등의 작업을 수행한다. 여기서 키와 값은 스칼라 값일 수도 있고, 문서 혹은 서브도큐먼트 일수도 있다.

 

그러면 mapReduce에서 사용하는 각종 옵션에 대해 알아보자.

 

> db.collection.mapReduce(

                                              <map>

                                              ,<reduce>

                                              {

                                                out: <collection>

                                                ,query:<document>

                                                ,sort:<document>

                                                ,limit:<number>

                                                ,finalize:<function>

                                                ,scope:<document>

                                                ,jsMode:<boolean>

                                                ,verbose:<boolean>

                                              }

                                            )

 

옵션 설명
out 맵리듀스 명령의 결과를 저장하거나 출력할 위치를 명시하는데, out 인자에는 문자열 또는 도큐먼트를 설정할 수 있다.
query 맵리듀스 명령을 실행할 대상 문서를 검색하는 조건을 명시한다. query 옵션에는 일반적인 find 명령의 검색 조건과 동일한 오퍼레이터를 활용할 수 있으며, 쿼리의 검색 성능이나 인덱스 활용도 find 명령과 동일한 방식으로 작동한다. 풀스캔보다는 인덱스를 활용하여 검색 성능을 향상시키는 것이 좋다.
sort 맵리듀스 명령을 실행할 대상 문서를 먼저 정렬해서 맵리듀스 엔진으로 전달하고자 할 때에는 sort 옵션을 사용한다. 맵리듀스 엔진은 key 필드의 값으로 정렬을 수행하는데, sort 옵션을 사용하여 인덱스를 활용한 정렬을 수행할 수 있다면 맵리듀스 엔진의 정렬에 드는 부하를 줄일 수 있다.
limit 맵리듀스 엔진으로 전달할 문서의 개수를 설정한다.
finalize 대게 맵리듀스는 map과 reduce 함수로 이루어지는데, reduce함수의 결과를 다시 한번 가공해 최종결과를 만들고 싶다면 finalize 함수를 이용한다.
scope map과 reduce 함수 그리고 finalize 함수에서 접근할 수 있는 글로벌 변수를 정의한다. 보통은 map&reduce에는 인자로 전달된 값만 이용할 수 있으므로 모든 문서가 공유할 변수값등이 필요하면 scope를 이용하면 된다.
jsMode 몽고디비 맵리듀스 엔진은 map이나 reduce 함수로 전달되는 문서를 계속해서 몽고디비서버와 자바스크립트 엔진 사이에서 변환 작업을 한다. 이는 맵리듀스 처리과정에 상당한 성능 저하를 일으키는데, jsMode를 true로 설정하면 중간 과정의 데이터를 메모리에 모두 보관한다. 더 빠른 처리가 가능하지만 그만큼 메모리를 많이 먹는 작업이다. 처리하는 문서가 50만건을 초과하면 jsMode를 true로 설정할 수 없다. jsMode는 기본값이 false이다.
verbose 맵리듀스의 처리 결과에 단계별 처리 과정 및 소요 시간에 대한 정보를 포함할 것인지 결정한다. 기본값은 true이다.

 

밑은 간단하게 out에 적용할 수 있는 옵션에 대한 설명이다.

 

out 옵션 설명

> db.collection.mapReduce(mapFunction, reduceFunction,

  {out: {inline:1}}

);

맵리듀스 결과를 화면 혹은 클라이언트 명령의 결과로 전달한다.

> db.collection.mapReduce(mapFunction, reduceFunction, 

  {out: "collection"}

);

맵리듀스의 결과를 collection이라는 이름의 컬렉션으로 저장한다.

> db.collection.mapReduce(mapFunction, reduceFunction, 

  {out: {replace:"collection"}}

);

collection이라는 컬렉션이 이미 존재한다면 기존 내용을 모두 삭제하고, 맵리듀스 명령의 결과를 해당 컬렉션에 저장한다.

> db.collection.mapReduce(mapFunction, reduceFunction, 

  {out: {merge:"collection"}}

);

collection이라는 컬렉션이 이미 존재하고, 프라이머리 키가 동일한 문서가 존재한다면 맵리듀스의 결과로 그 문서를 덮어쓰기 한다.

> db.collection.mapReduce(mapFunction, reduceFunction, 

  {out: {reduce:"collection"}}

);

collection이라는 컬렉션이 이미 존재하고, 프라이머리 키가 동일한 문서가 존재한다면 맵리듀스의 결과와 기존 문서를 이용해서 다시 맵리듀스를 실행해서 결과를 저장한다. 즉, reduce 타입의 out은 incremental 맵리듀스를 실행할 때 사용한다.

> db.collection.mapReduce(mapFunction, reduceFunction, 

  {out: {replace:"collection", db: database}}

);

맵리듀스의 결과를 새로운 database라는 데이터베이스의 collection 컬렉션에 저장한다. 

> db.collection.mapReduce(mapFunction, reduceFunction, 

  {out: {replace:"collection", sharded:true}}

);

맵리듀스의 결과를 collection 컬렉션에 저장하고, 이때 collection 컬렉션을 _id 필드를 기준으로 샤딩한 다음 결과를 저장한다. 4.2 버전 기준으로 해당 옵션은 deprecated 됬다. 현재는 샤딩된 컬렉션으로 출력하기 위해서는 미리 먼저 샤딩된 컬렉션을 생성해야한다.

> db.collection.mapReduce(mapFunction, reduceFunction, 

  {out: {merge:"collection", nonAtomic:true}}

);

맵리듀스의 결과를 collection 컬렉션에 저장할 때 원자 단위로 처리한다.

 

참고로 nonAtomic 옵션은 merge나 reduce 방식의 out에만 적용할 수 있는 옵션이며, 기본값은 false이다. 만약 nonAtomic 옵션을 false로 지정하면 맵리듀스 처리결과를 컬렉션에 저장할 때, 데이터베이스 레벨의 잠금을 걸게된다. nonAtomic이 true라면 물론 데이터베이스 잠금이 걸리지만, 적절히 잠금을 해제(Yield)했다가 다시 잠금을 걸면서 처리한다. 해당 옵션은 상황에 따라 적용하면 될듯하다. 만약 잠금이 걸려있다면 다른 커넥션에서 조회 변경 등을 할 수 없다.

 

Map-Reduce And Sharded Collection

 

-Sharded Collection as Input

맵리듀스 작업의 입력으로 샤딩된 컬렉션을 이용할 때, 몽고디비서버는 각 샤드에 맵리듀스 작업을 자동으로 병렬 전송한다. 입력으로 샤딩된 컬렉션을 이용하기 위해 별도의 옵션은 필요하지 않고 몽고디비서버는 모든 작업이 끝날 때까지 기다린다.

 

-Sharded Collection as output

sharded 옵션이 true라면, 몽고디비서버는 _id 필드 값 샤딩키로 사용하여 맵리듀스 결과 출력 컬렉션을 샤딩한다.

 

샤딩된 컬렉션으로 출력을 하려면 아래와 같은 조건을 지켜줘야한다.

 

  1. 맵리듀스 결과로 출력할 컬렉션이 존재하지 않는다면, 먼저 샤딩된 컬렉션을 생성해야한다. 4.2버전 기준의 몽고디비는 맵리듀스 옵션으로 새로운 샤딩된 컬렉션을 생성하는 것이 deprecated됬고, sharded 옵션 사용도 deprecated됬다. 즉, 샤딩된 컬렉션을 출력 컬렉션으로 사용하고 싶다면 맵리듀스 작업 이전에 먼저 샤딩된 컬렉션을 생성해야한다. 물론 현재 샤딩된 컬레션이 존재하지 않으면, _id 필드값을 기준으로 샤딩된 컬렉션을 생성하긴 하지만 공식적으로 추천하지 않는 방법이다.
  2. 4.2버전 몽고디비서버부터 이미 존재하는 샤딩된 컬렉션을 replacement하는 방식은 deprecated됬다.
  3. 버전 4.0부터 출력 컬렉션이 이미 있지만 샤딩되지 않은 경우 map-reduce가 실패한다.
  4. 새롭게 생성된 샤딩된 컬렉션 또는 비어있는 샤딩된 컬렉션의 경우 MongoDB는 맵리듀스 작업의 첫 단계 결과를 사용하여 샤드에 분산 된 초기 청크를 만듭니다.
  5. 몽고디비서버는 출력 후처리를 모든 샤드에 병렬로 수행한다.

 

Map-Recuce Concurrency

맵리듀스 작업에는 많은 작업들이 있는데, 각 작업마다 동시성을 위한 락이 걸린다. 각 단계에 대한 락은 아래와 같다.

 

  • The read phase takes a read lock. It yields every 100 documents.
  • The insert into the temporary collection takes a write lock for a single write.
  • If the output collection does not exist, the creation of the output collection takes a write lock.
  • If the output collection exists, then the output actions (i.e. merge, replace, reduce) take a write lock. This write lock is global, and blocks all operations on the mongod instance.
  • 읽기 단계는 읽기 잠금을 수행한다. 100개 문서마다 잠금을 해제한다.
  • 임시 콜렉션에 삽입하면 단일 쓰기에 대한 쓰기 잠금이 사용된다.
  • 출력 콜렉션이 존재하지 않으면 출력 콜렉션 작성에 쓰기 잠금이 사용됩니다. 출
  • 력 콜렉션이 존재하면 출력 조치 (즉, 병합, 대체, 리듀스)가 쓰기 잠금을 수행합니다. 이 쓰기 잠금은 전역 적이며 mongod 인스턴스의 모든 작업을 차단합니다.

위 설명은 간단히 전체 문서를 한번에 읽고 작업하는 것이 아니라 일정 크기의 데이터를 잘라서 맵리듀스 작업을 한다라고 생각하면 된다. 작업을 잘라서 수행하기 때문에 일부 문서들을 map하여 reduce한 다음에 결과를 임시 저장소에 보관했다가 나중에 다시 reduce가 수행될때 동일한 key를 가진 중간 결과가 있으면 같이 모아 reduce를 수행한다.

 

최종 출력에서 merge나 reduce 작업은 문서의 양의 따라 상당히 많은 시간이 걸릴 수 있다. 즉, 오랜 시간동안 락이 걸리면 안되기 때문에 nonAtomic 옵션을 true로 주어서 잠금을 해제하고 잠금을 다시 걸고 하는 방식으로 작업을 해줘야한다. 즉, merge&reduce 출력 옵션은 nonAtomic 옵션을 true로 하길 권장한다.

 

Map-Reduce 예제

 

1
2
3
4
5
6
7
8
9
db.orders.insert({
...      _id: ObjectId("50a8240b927d5d8b5891743c"),
...      cust_id: "abc123",
...      ord_date: new Date("Oct 04, 2012"),
...      status: 'A',
...      price: 25,
...      items: [ { sku: "mmm", qty: 5, price: 2.5 },
...               { sku: "nnn", qty: 5, price: 2.5 } ]
... });
cs

 

위 문서를 삽입한다. 위 문서를 삽입한 후에 소비자별 총액을 구하는 맵리듀스 연산을 해보도록 할 것이다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
> var mapFunction1 = function() {
...                        emit(this.cust_id, this.price);
...                    };
> var reduceFunction1 = function(keyCustId, valuesPrices) {
...                           return Array.sum(valuesPrices);
...                       };
> db.orders.mapReduce(
...                      mapFunction1,
...                      reduceFunction1,
...                      { out: "map_reduce_example" }
...                    )
{
    "result" : "map_reduce_example",
    "timeMillis" : 76,
    "counts" : {
        "input" : 1,
        "emit" : 1,
        "reduce" : 0,
        "output" : 1
    },
    "ok" : 1
}
 
->result
 
> db.map_reduce_example.find()
{ "_id" : "abc123", "value" : 25 }
cs

 

위는 소비자별 금액 총액을 구하는 맵리듀스 연산이고 결과를 map_reduce_example이라는 새로운 컬렉션에 저장하는 예제이다. 우선 각 단계별로 사용법을 살펴보자.

 

-Map

mapFunction1이라는 변수에 함수를 정의한다. emit이라는 함수는 특정 키값별로 값을 그룹핑하는 함수이다. 여기서 this 연산자는 map 작업에 사용될 문서를 참조하는 키워드이다. 즉, cust_id 별로 price를 배열로 그룹핑하고 있다.

 

-Reduce

reduceFunction1이라는 변수에 함수를 정의한다. 이 함수는 그룹핑된 문서들의 값을 조작하는 함수이다. 즉, 결과값을 가공하는 단계라고 보면 된다. 인자의 keyCustId와 valuesPrices는 Map 단계에서 그룹핑된 키와 값의 배열을 인자로 받고 있다.

 

-output

db.orders.mapReduce 명령으로 우리가 정의한 Map함수와 Reduce함수를 전달하여 작업을 수행한 후 map_reduce_example이라는 컬렉션을 새로 생성하여 결과값을 저장한다.

 

문서하나로는 부족하니 3개의 문서를 더 삽입하여 다시 실행해보자.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
> db.orders.insertMany([{
...      cust_id: "abc123",
...      ord_date: new Date("Oct 04, 2012"),
...      status: 'A',
...      price: 40,
...      items: [ { sku: "mmm", qty: 5, price: 2.5 },
...               { sku: "nnn", qty: 5, price: 2.5 } ]
... },
... {
...      cust_id: "abc123",
...      ord_date: new Date("Oct 04, 2012"),
...      status: 'A',
...      price: 25,
...      items: [ { sku: "mmm", qty: 5, price: 2.5 },
...               { sku: "nnn", qty: 5, price: 2.5 } ]
... },
... {
...      cust_id: "abc123",
...      ord_date: new Date("Oct 04, 2012"),
...      status: 'A',
...      price: 10,
...      items: [ { sku: "mmm", qty: 5, price: 2.5 },
...               { sku: "nnn", qty: 5, price: 2.5 } ]
... }]);
 
> var mapFunction1 = function() {emit(this.cust_id, this.price)};
> var reduceFunction1 = function(keyCustId, valuesPrices) {return Array.sum(valuesPrices);};
> db.orders.mapReduce(mapFunction1,reduceFunction1,{ out: "map_reduce_example"});
{
    "result" : "map_reduce_example",
    "timeMillis" : 47,
    "counts" : {
        "input" : 4,
        "emit" : 4,
        "reduce" : 1,
        "output" : 1
    },
    "ok" : 1
}
> db.map_reduce_example.find()
"_id" : "abc123""value" : 100 }
cs

 

4개의 문서의 price를 모두 더하고 있다. 다음은 replace 출력이다. 기존 맵리듀스 결과를 저장하는 컬렉션에 더미 데이터를 하나 삽입하고 맵리듀스를 다시 실행시켰다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
> db.map_reduce_example.insert({description:"dummy data"});
WriteResult({ "nInserted" : 1 })
> db.map_reduce_example.find()
"_id" : "abc123""value" : 100 }
"_id" : ObjectId("5d859b0b9ca9d5ca23a3447e"), "description" : "dummy data" }
> var mapFunction1 = function() {                        emit(this.cust_id, this.price);                    };
> var reduceFunction1 = function(keyCustId, valuesPrices) {                           return Array.sum(valuesPrices);                       };
> db.orders.mapReduce(mapFunction1,reduceFunction1,{out:{replace:"map_reduce_example"}});
{
    "result" : "map_reduce_example",
    "timeMillis" : 44,
    "counts" : {
        "input" : 4,
        "emit" : 4,
        "reduce" : 1,
        "output" : 1
    },
    "ok" : 1
}
> db.map_reduce_example.find()
"_id" : "abc123""value" : 100 }
cs

 

기존 컬렉션의 데이터를 모두 지우고 새로운 맵리듀스 결과를 삽입하였다. 다음은 merge 출력이다. orders 컬렉션에 하나의 문서를 추가했고 map_reduce_example에 역시 더미 데이터를 하나 삽입하였다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
> db.map_reduce_example.insert({description:"dummy data"});
WriteResult({ "nInserted" : 1 })
> db.orders.insertOne({cust_id:"abc123",price:20});
{
    "acknowledged" : true,
    "insertedId" : ObjectId("5d859bdd9ca9d5ca23a34480")
}
> var mapFunction1 = function() {                        emit(this.cust_id, this.price);                    };
> var reduceFunction1 = function(keyCustId, valuesPrices) {                           return Array.sum(valuesPrices);                       };
> db.orders.mapReduce(mapFunction1,reduceFunction1,{out:{merge:"map_reduce_example"}});
{
    "result" : "map_reduce_example",
    "timeMillis" : 47,
    "counts" : {
        "input" : 5,
        "emit" : 5,
        "reduce" : 1,
        "output" : 2
    },
    "ok" : 1
}
> db.map_reduce_example.find()
"_id" : "abc123""value" : 120 }
"_id" : ObjectId("5d859bbf9ca9d5ca23a3447f"), "description" : "dummy data" }
cs

 

기존의 더미 데이터는 그대로 남아있고 새로운 결과를 기존 문서에 병합하였다. 마지막은 reduce 출력이다. orders 컬렉션에 새로운 문서하나를 삽입하였다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
> db.orders.insertOne({cust_id:"abc123",price:30});
{
    "acknowledged" : true,
    "insertedId" : ObjectId("5d859c8c9ca9d5ca23a34481")
}
> var mapFunction1 = function() {                        emit(this.cust_id, this.price);                    };
> var reduceFunction1 = function(keyCustId, valuesPrices) {                           return Array.sum(valuesPrices);                       };
> db.orders.mapReduce(mapFunction1,reduceFunction1,{out:{reduce:"map_reduce_example"}});
{
    "result" : "map_reduce_example",
    "timeMillis" : 39,
    "counts" : {
        "input" : 6,
        "emit" : 6,
        "reduce" : 1,
        "output" : 2
    },
    "ok" : 1
}
> db.map_reduce_example.find()
"_id" : "abc123""value" : 270 }
"_id" : ObjectId("5d859bbf9ca9d5ca23a3447f"), "description" : "dummy data" }
cs

 

내가 생각했던 결과와는 조금 다르다. 내가 생각했던 결과를 실제 결과를 보고 다시 생각하니 당연히 내가 생각한 결과가 나올 수가없다. 내가 생각한 결과는 기존에 120이 합산되어있고 price가 30인 문서를 새로 삽입했으니 150이 되겠지 했는데, 결과는 당연히 아니다 맵리듀스결과는 150으로 나오고 기존 120에 150이 합해져 270이라는 결과가 나오는 것이다. 당연하다..맵리듀스 결과 컬렉션에 문서단위로 들어간게 아니니..당연히 120에 150이 더해진다. 보통 incremental 맵리듀스는 날짜등을 기준으로 증분시키면서 사용하면 될듯하다. 역시 바보였다 나는..

 

다음은 finalize를 통해 맵리듀스 결과를 다시 한번 가공하는 예제이다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
> var mapFunction2 = function() {
...                        for (var idx = 0; idx < this.items.length; idx++) {
...                            var key = this.items[idx].sku;
...                            var value = {
...                                          count: 1,
...                                          qty: this.items[idx].qty
...                                        };
...                            emit(key, value);
...                        }
...                     };
> var reduceFunction2 = function(keySKU, countObjVals) {
...                      reducedVal = { count: 0, qty: 0 };
... 
...                      for (var idx = 0; idx < countObjVals.length; idx++) {
...                          reducedVal.count += countObjVals[idx].count;
...                          reducedVal.qty += countObjVals[idx].qty;
...                      }
... 
...                      return reducedVal;
...                   };
> var finalizeFunction2 = function (key, reducedVal) {
... 
...                        reducedVal.avg = reducedVal.qty/reducedVal.count;
... 
...                        return reducedVal;
... 
...                     };
> db.orders.mapReduce( mapFunction2,
...                      reduceFunction2,
...                      {
...                        out: { merge: "map_reduce_example" },
...                        query: { ord_date:
...                                   { $gt: new Date('01/01/2012') }
...                               },
...                        finalize: finalizeFunction2
...                      }
...                    )
{
    "result" : "map_reduce_example",
    "timeMillis" : 47,
    "counts" : {
        "input" : 4,
        "emit" : 8,
        "reduce" : 2,
        "output" : 4
    },
    "ok" : 1
}
> db.map_reduce_example.find()
"_id" : "abc123""value" : 270 }
"_id" : ObjectId("5d859bbf9ca9d5ca23a3447f"), "description" : "dummy data" }
"_id" : "mmm""value" : { "count" : 4"qty" : 20"avg" : 5 } }
"_id" : "nnn""value" : { "count" : 4"qty" : 20"avg" : 5 } }
cs

 

-input

입력값을 전체 문서 대상이 아닌 ord_date 값이 2012년1월1일 이후인 문서를 입력값으로 받고 있다.

 

-Map

item 배열을 하나씩 순회하면서 값을 채워주고 있다. 여기서 value는 하나의 문서로 들어가고 있다. 처음에 설명했듯이 key나 value에는 스칼라 값뿐만 아니라 문서도 들어갈 수 있다.

 

-Reduce

하나의 key 값에 대해 값을 누적시키고 있다. 각 배열의 요소에는 count가 1과 각자의 qty를 가지고 있고, 해당 값들을 누적시키기 위해서 reducedVal 변수를 초기화했다.

 

-finalize

맵리듀스의 결과를 키/값으로 받고 해당 값들을 가공하고 있다. 여기서는 평균 수량을 구하기 위한 연산을 수행하고 있고 인자로 들어온 reducedVal에 새로운 필드를 생성하고 평균값을 넣었다.

 

-output

map_reduce_example 컬렉션으로 결과를 merge한다.

 

Incremental MapReduce

증분 맵리듀스에 대한 간단한 예제이다. 각 map/reduce/finalize에 대한 설명은 하지 않는다.

 

1
2
3
4
5
6
7
8
9
db.sessions.save( { userid: "a", ts: ISODate('2011-11-03 14:17:00'), length95 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-03 14:23:00'), length110 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-03 15:02:00'), length120 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-03 16:45:00'), length45 } );
 
db.sessions.save( { userid: "a", ts: ISODate('2011-11-04 11:05:00'), length105 } );
db.sessions.save( { userid: "b", ts: ISODate('2011-11-04 13:14:00'), length120 } );
db.sessions.save( { userid: "c", ts: ISODate('2011-11-04 17:00:00'), length130 } );
db.sessions.save( { userid: "d", ts: ISODate('2011-11-04 15:37:00'), length65 } );
cs

 

위 문서들을 삽입해준다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
> db.sessions.insertMany([db.sessions.save( { userid: "a", ts: ISODate('2011-11-03 14:17:00'), length95 } ),
... db.sessions.save( { userid: "b", ts: ISODate('2011-11-03 14:23:00'), length110 } ),
... db.sessions.save( { userid: "c", ts: ISODate('2011-11-03 15:02:00'), length120 } ),
... db.sessions.save( { userid: "d", ts: ISODate('2011-11-03 16:45:00'), length45 } ),
... db.sessions.save( { userid: "a", ts: ISODate('2011-11-04 11:05:00'), length105 } ),
... db.sessions.save( { userid: "b", ts: ISODate('2011-11-04 13:14:00'), length120 } ),
... db.sessions.save( { userid: "c", ts: ISODate('2011-11-04 17:00:00'), length130 } ),
... db.sessions.save( { userid: "d", ts: ISODate('2011-11-04 15:37:00'), length65 } )]);
{
    "acknowledged" : true,
    "insertedIds" : [
        ObjectId("5d85a1f29ca9d5ca23a3448a"),
        ObjectId("5d85a1f29ca9d5ca23a3448b"),
        ObjectId("5d85a1f29ca9d5ca23a3448c"),
        ObjectId("5d85a1f29ca9d5ca23a3448d"),
        ObjectId("5d85a1f29ca9d5ca23a3448e"),
        ObjectId("5d85a1f29ca9d5ca23a3448f"),
        ObjectId("5d85a1f29ca9d5ca23a34490"),
        ObjectId("5d85a1f29ca9d5ca23a34491")
    ]
}
> var mapFunction = function() {
...                       var key = this.userid;
...                       var value = {
...                                     userid: this.userid,
...                                     total_time: this.length,
...                                     count: 1,
...                                     avg_time: 0
...                                    };
... 
...                       emit( key, value );
...                   };
> var reduceFunction = function(key, values) {
... 
...                         var reducedObject = {
...                                               userid: key,
...                                               total_time: 0,
...                                               count:0,
...                                               avg_time:0
...                                             };
... 
...                         values.forEach( function(value) {
...                                               reducedObject.total_time += value.total_time;
...                                               reducedObject.count += value.count;
...                                         }
...                                       );
...                         return reducedObject;
...                      };
> var finalizeFunction = function (key, reducedValue) {
... 
...                           if (reducedValue.count > 0)
...                               reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
... 
...                           return reducedValue;
...                        };
> db.sessions.mapReduce( mapFunction,
...                        reduceFunction,
...                        {
...                          out: "session_stat",
...                          finalize: finalizeFunction
...                        }
...                      )
{
    "result" : "session_stat",
    "timeMillis" : 44,
    "counts" : {
        "input" : 16,
        "emit" : 16,
        "reduce" : 5,
        "output" : 5
    },
    "ok" : 1
}
> db.session_stat.find()
"_id" : null"value" : { "userid" : null"total_time" : NaN, "count" : 8"avg_time" : NaN } }
"_id" : "a""value" : { "userid" : "a""total_time" : 200"count" : 2"avg_time" : 100 } }
"_id" : "b""value" : { "userid" : "b""total_time" : 230"count" : 2"avg_time" : 115 } }
"_id" : "c""value" : { "userid" : "c""total_time" : 250"count" : 2"avg_time" : 125 } }
"_id" : "d""value" : { "userid" : "d""total_time" : 110"count" : 2"avg_time" : 55 } }
cs

 

첫 맵리듀스의 수행 결과이다. 유저 아이디당 총 세션시간과 접속 횟수, 평균 세션 시간을 넣고 있다. 추가적으로 문서를 추가한다.

 

1
2
3
4
db.sessions.save( { userid: "a", ts: ISODate('2011-11-05 14:17:00'), length100 } ),
db.sessions.save( { userid: "b", ts: ISODate('2011-11-05 14:23:00'), length115 } ),
db.sessions.save( { userid: "c", ts: ISODate('2011-11-05 15:02:00'), length125 } ),
db.sessions.save( { userid: "d", ts: ISODate('2011-11-05 16:45:00'), length55 } )
cs

 

맵리듀스를 2011년11월5일 이후에 새로 들어온 문서를 기준으로 결과를 내고, 이전의 맵리듀스 결과에 증분(누적)시킨다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
> db.sessions.insertMany([db.sessions.save( { userid: "a", ts: ISODate('2011-11-05 14:17:00'), length100 } ),
... db.sessions.save( { userid: "b", ts: ISODate('2011-11-05 14:23:00'), length115 } ),
... db.sessions.save( { userid: "c", ts: ISODate('2011-11-05 15:02:00'), length125 } ),
... db.sessions.save( { userid: "d", ts: ISODate('2011-11-05 16:45:00'), length55 } )]);
{
    "acknowledged" : true,
    "insertedIds" : [
        ObjectId("5d85a29d9ca9d5ca23a34496"),
        ObjectId("5d85a29d9ca9d5ca23a34497"),
        ObjectId("5d85a29d9ca9d5ca23a34498"),
        ObjectId("5d85a29d9ca9d5ca23a34499")
    ]
}
> var mapFunction = function() {
...                       var key = this.userid;
...                       var value = {
...                                     userid: this.userid,
...                                     total_time: this.length,
...                                     count: 1,
...                                     avg_time: 0
...                                    };
... 
...                       emit( key, value );
...                   };
> var reduceFunction = function(key, values) {
... 
...                         var reducedObject = {
...                                               userid: key,
...                                               total_time: 0,
...                                               count:0,
...                                               avg_time:0
...                                             };
... 
...                         values.forEach( function(value) {
...                                               reducedObject.total_time += value.total_time;
...                                               reducedObject.count += value.count;
...                                         }
...                                       );
...                         return reducedObject;
...                      };
> var finalizeFunction = function (key, reducedValue) {
... 
...                           if (reducedValue.count > 0)
...                               reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
... 
...                           return reducedValue;
...                        };
> db.sessions.mapReduce( mapFunction,
...                        reduceFunction,
...                        {
...                          query: { ts: { $gt: ISODate('2011-11-05 00:00:00') } },
...                          out: { reduce: "session_stat" },
...                          finalize: finalizeFunction
...                        }
...                      );
{
    "result" : "session_stat",
    "timeMillis" : 43,
    "counts" : {
        "input" : 4,
        "emit" : 4,
        "reduce" : 0,
        "output" : 5
    },
    "ok" : 1
}
> db.session_stat.find()
"_id" : null"value" : { "userid" : null"total_time" : NaN, "count" : 8"avg_time" : NaN } }
"_id" : "a""value" : { "userid" : "a""total_time" : 300"count" : 3"avg_time" : 100 } }
"_id" : "b""value" : { "userid" : "b""total_time" : 345"count" : 3"avg_time" : 115 } }
"_id" : "c""value" : { "userid" : "c""total_time" : 375"count" : 3"avg_time" : 125 } }
"_id" : "d""value" : { "userid" : "d""total_time" : 165"count" : 3"avg_time" : 55 } }
cs

 

결과가 누적된 것을 볼수 있다. 이런식으로 날짜를 기준으로 증분 맵리듀스를 사용하면 가장 활용도가 좋을 듯 싶다. 즉, 시점을 기준으로 하자는 것이다.

 

마지막으로 맵리듀스를 사용하면서 꼭 알아야할 내용이다.

 

  • Map 함수에서 호출하는 emit() 함수의 두번째 인자와 Reduce 함수의 리턴 값은 같은 포맷이어야 한다.
  • Reduce 함수의 연산 작업은 멱등(Idempotent)이어야 한다. 즉, Reduce 함수 작업이 멱등하지 않다면 정확한 결과가 나오지 않을 수 있다.

 

여기까지 간단하게 몽고디비의 맵리듀스를 다루어보았다. 사실 영어로된 레퍼런스가 해석이 애매모호한 부분도 많고 아직 다루어보지 않은 맵리듀스 관련 내용도 있다. 해당 내용들을 보완해 추후에 다루어볼 것이다. 이번 포스팅은 맵리듀스가 뭐고 간단히 어떻게 사용해볼 수 있냐를 다루어봤다.

posted by 여성게
:
Database/MongoDB 2019. 9. 19. 13:41

 

MongoDB에서는 여러 명령을 하나의 트랜잭션으로 묶어서 사용할 수 없다. 그 이유는 몽고디비는 단일 문서 단위의 트랜잭션만 지원되기 때문이다. 이때문에 변경 직전이나 직후의 문서 데이터를 확인하기란 쉽지 않다. 사실 일반적으로 응용 프로그램에서 변경 직후의 데이터는 자신이 직접 변경한 데이터이므로 크게 필요없을 수 있지만, 변경 직전의 데이터를 확인하는 기능은 필요할 수 있다. 이러한 기능을 제공하기 위해서 몽고디비는 FindAndModify라는 명령을 제공한다. 해당 명령은 검색 조건에 일치하는 문서를 검색하고, 그 문서를 변경하거나 삭제하는 후속 오퍼레이션을 설정할 수 있다.

 

> db.collection.findAndModify({

        query:<document>,

        sort:<document>,

        remove:<boolean>,

        update:<document>,

        new:<boolean>,

        fields:<document>,

        upsert:<boolean>,

        bypassDocumentValidation:<boolean>,

        writeConcern:<document>,

        collation:<document>

});

 

위는 findAndModify 사용법이다. 주의해야할 점은 FindAndModify 명령의 조건에 일치하는 문서는 여러개 일 수 있다. 하지만 해당 명령은 반드시 하나의 문서만 변경 혹은 삭제하고, 변경된 도큐먼트를 반환한다. 만약 해당 명령의 조건에 일치하는 도큐먼트 중에서 특정 도큐먼트를 변경하거나 삭제하고 싶다면 sort옵션을 사용하면 된다.

 

옵션 설명
query 변경하고자 하는 도큐먼트를 검색할 조건을 명시한다. 주어진 조건에 일치하는 도큐먼트가 여러 개라 하더라도 그 중에서 첫 번째로 검색된 문서에 대해서만 변경 또는 삭제 작업을 수행한다.
sort 검색된 문서가 여러 개일 때, 실제 몽고디비서버가 어떤 문서를 변경했는지 명확히 판단이 힘들다. 그래서 검색 조건에 일치하는 문서가 여러 개일 것이라고 예상될 때, sort 옵션을 이용해서 변경 또는 삭제할 문서를 조정할 수 있다.
remove 검색된 문서 결과를 삭제한다. 기본 값은 false이며, 이 값을 true로 설정하면 몽고디비 서버는 검색 결과를 삭제한다. 만약 upsert 옵션을 설정하면 해당 옵션은 설정이 되지 않거나 false로 설정해야 한다. 하나의 findAndModify 명령으로 update,delete를 동시에 할 수 없다.
update 검색된 문서를 어떻게 변경할 지 설정한다. update 쿼리의 두번째 인자처럼 사용하면 된다.
new findAndModify 명령은 검색된 문서를 변경하거나 삭제하고, 변경하거나 삭제된 문서를 반환한다. 이때 삭제 또는 변경되기 직전의 문서를 반환할지 아니면 변경 또는 삭제된 이후의 문서를 반환할 것인지 new 옵션으로 결정한다. 기본값은 false이며, true로 설정하면 변경 직후의 문서가 반환된다.
fields 결과로 반환되는 문서의 Projection을 설정한다. 즉, 결과로 반환될 문서의 노출 필드를 지정하는 것이다.
upsert 검색결과가 있으면 update, 없다면 insert를 수행한다.
bypassDocumentValidation 몽고디비는 삽입,수정 시점에 필드에 대한 유효성 체크 지정이 가능한데, 해당 옵션을 통해 문서 유효성 검사를 건너 뛰게 설정한다.
writeConcern 변경 또는 삭제 작업의 writeConcern을 조정한다.
maxTimeMS findAndModify 명령이 실행될 최대 시간을 밀리초 단위로 설정한다.
collation 문서 검색시 사용할 콜레이션을 지정한다.

 

upsert와 new 옵션 값에 대한 결과값이다.

 

  new = false new = true
upsert = false 변경되기 전 문서 반환, 검색 결과가 없으면 Null 검색된 결과가 있으면 변경된 직후의 문서 반환, 검색 결과가 없으면 Null
upsert = true 변경되기 전 문서 반환, 검색 결과가 없으면 Null 검색된 결과가 있으면 변경된 직후의 문서 반환, 검색된 결과가 없으면 Insert된 문서 반환

 

upsert 옵션이 true인 findAndModify 명령과 update 명령과 아주 유사해보인다. 이 둘의 차이점은 아래와 같다.

 

  • 디폴트 옵션에서는 두 명령보두 하나의 문서만 변경할 수 있다. 하지만 update 명령은 multi 옵션을 true로 설정해서 여러 문서를 한번에 변경할 수 있다.
  • update 명령은 실제 어떤 문서를 변경할지 알 수없다.(매칭된 문서가 여러개일때) 하지만 findAndModify는 sort 옵션을 통해 특정 문서를 정렬해 첫 번째 문서만 변경할 수 있다.
  • update명령은 처리 결과를 반환하지만, findAndModify는 변경전 혹은 직후의 문서를 결과로 반환한다.

 

> db.users.insertMany([{name:"abc",age:22},{name:"yeoseong",age:28},{name:"sora",age:30},{name:"mija",age:50}])

{

"acknowledged" : true,

"insertedIds" : [

ObjectId("5d83050d04a68b589a9c1aa5"),

ObjectId("5d83050d04a68b589a9c1aa6"),

ObjectId("5d83050d04a68b589a9c1aa7"),

ObjectId("5d83050d04a68b589a9c1aa8")

]

}

> db.collection.findAndModify({

... query:{name:"abc"},

... sort:{name:1},

... remove:false,

... update:{$set:{name:"cba"}},

... new:true,

... fields:{_id:0,name:1,age:1},

... upsert:false,

... bypassDocumentValidation:false})

 

위와 같이 문서를 삽입하고 findAndModify 명령을 수행했다. 결과 값으로는 아래와 같다.

 

 

new를 true로 적용했고, 프로젝션은 이름과 나이로 적용했으므로 변경된 직후의 문서가 결과로 반환된다. 반대로 적용해보자.

 

> db.collection.findAndModify({

... query:{name:"cba"},

... sort:{name:1},

... remove:false,

... update:{$set:{name:"abc"}},

... new:false,

... fields:{_id:0,name:1,age:1},

... upsert:false,

... bypassDocumentValidation:false})

 

 

 

결과로 변경되기 전의 문서가 반환되었다. 간단한 예제이지만, 다양하게 옵션을 적용해보며 사용해보면 추후에 유용하게 사용될 명령일 듯하다. 그리고 잘만 사용한다면 insert,update,delete를 해당 명령으로만 사용가능하지 않을까? 물론 문서하나씩만 수행되는 명령이지만..또한 성능이 어쩔지는 모르지만..

posted by 여성게
:
Database/MongoDB 2019. 9. 19. 12:06

 

해당 예외는 정렬시 사용되는 메모리 크기에 관한 예외이다. find로 데이터를 조회한 후에 sort()를 통해서 정렬을 할때, 만약 인덱스를 이용해서 정렬을 수행할 수 있을 때는 메모리 크기와 크게 관계가 없지만, sort() 옵션이 인덱스를 사용할 수 없을 때는 MongoDB 서버가 쿼리를 실행하는 도중에 퀵소트를 실행해서 find 명령의 결과 도큐먼트를 정렬한 다음 클라이언트에게 응답한다. 이때 정렬을 위한 추가적인 큰 메모리 공간이 필요하다. 몽고디비 서버는 기본값으로 정렬을 수행할때 사용할 수 있는 메모리값이 대략 32MB이다. 즉, 아주 큰 결과 도큐먼트들을 정렬할 때는 해당 값(32MB)을 초과하여 위와 같은 예외를 발생시킬 수 있다.

 

이렇게 메모리 공간이 부족해서 정렬을 수행하지 못하는 경우에는 3가지 정도의 해결방법이 있다. 정확히는 해결방법이라기 보다는 우회방법이라고도 볼 수 있을 것 같다.

 

  1. 정렬 작업이 인덱스 필드를 활용할 수 있게 쿼리를 변경

  2. 정렬을 위한 메모리 공간을 더 크게 설정

  3. find() 대신 aggregate()를 사용하고 allowDiskUse 옵션을 true로 설정

> use admin

> db.runCommand({setParameter:1,internalQueryExecMaxBlockingSortBytes:1024*1024*1024})

 

> db.collection.aggregate([$sort:{field:1}],{allowDiskUse:true})

 

추가적으로 정렬을 위해 할당된 메모리는 쿼리가 완전히 완료되기 전까지는 운영체제에 자원을 반납하지 않는다. 즉, 클라이언트의 쿼리의 결과를 모두 가져가지 전까지는 정렬을 위해서 할당된 메모리 공간이 반납되지 못한다는 것이다. 만약 클라이언트가 나머지 결과가 더 필요하지 않아 커서에 도큐먼트가 남아 있는 상태로 방치한다면 커서 타임아웃(기본값 10분)이 나기전까지는 메모리에 그대로 남아있게 된다. 

 

그래서 쿼리의 실행 결과로 전달받은 커서는 반드시 모든 도큐먼트를 클라이언트로 가져오거나, 그렇지 않고 더 이상 필요하지 않아서 도큐먼트 패치를 중간에 멈추는 경우에는 커서를 반드시 닫아주는 게 좋다.

posted by 여성게
:
Database/MongoDB 2019. 9. 19. 10:38

 

MongoDB의 Insert 문은 2개의 인자가 들어간다. 첫번째 인자는 삽입할 문서, 두번째 인자는 선택적인 옵션이다. 여기서 오늘 알아볼 것은 두번째 인자중 ordered에 대해서 알아볼 것이다. 

 

우선 몽고디비에서는 디폴트로 ordered가 true인 상태에서 삽입을 수행한다. ordered가 true라는 것은 무슨 뜻일까? 만약 삽입하는 문서가 단일 문서가 아니고, insertMany를 사용하여 여러 문서를 한번에 삽입하는 상황을 생각해보자. 만약 ordered가 true라면 싱글 스레드로 삽입하려는 여러개의 문서(배열)를 명시된 순서대로 하나씩 삽입할 것이다. 순서가 중요한 상황이라면 유용할 것이다. 하지만 ordered가 false라면 멀티스레드로 여러 문서를 병렬로 삽입한다. 

 

그렇다면 당연히 ordered를 false로 하는 것이 좋지 않냐라는 생각이 들 수 있다. 답은 상황에 따라 다르다는 것이다. 아래와 같은 상황을 생각해보자.

 

> db.insertcollection.createIndex({name:1},{unique:true})

> db.insertcollection.insertMany([{name:"a"},{name:"a"},{name:"c"},{name:"b"}],{ordered:true})

 

 

유니크한 인덱스 필드로 "name"필드를 지정하였고 벌크로 4개의 문서를 삽입하는 상황이다.(ordered:true는 생략가능) 이 삽입 쿼리는 어떤 결과를 반환할까? 아마 유니크한 인덱스인데 중복된 값이 들어왔다는 예외를 반환할 것이다. ordered가 true라면 몽고디비는 내부적으로 예외가 발생한 시점에서 삽입연산을 마치게 된다. 그 말은 위의 쿼리에서 name:"a"라는 문서하나만 삽입되고 나머지는 삽입연산 수행이 중지 될 것이다. 그렇다면 아래와 같은 쿼리는 어떻게 될까?

 

> db.insertcollection.insertMany([{name:"a"},{name:"a"},{name:"c"},{name:"b"}],{ordered:false})

 

 

분명 중복 예외가 발생했음에도 불구하고 예외가 발생한 문서를 제외하고 모두 삽입이 되었다. 이것이 바로 ordered 옵션의 차이점이다. ordered가 true라면 단일스레드가 순차적으로 문서를 삽입하다 예외가 발생하면 그 상태에서 작업을 멈추지만, ordered가 false라면 예외가 발생하여도 그 문서만 무시하고 나머지 작업을 계속 수행하게 된다.

 

무엇이 좋고 나쁨을 사실 이야기하기 힘들다. 상황에 맞게 사용하는 것이 좋을 듯하다.

posted by 여성게
:
Database/MongoDB 2019. 9. 16. 23:26

 

MongoDB는 문자열 내용의 텍스트 검색을 수행하는 쿼리를 지원한다. 텍스트 검색을 수행하기 위해 몽고디비는 텍스트 인덱스와 $text 연산자를 사용한다.(View는 텍스트 검색을 지원하지 않는다.)

 

예제 진행을 위해 아래 문서들을 삽입한다.

 

1
2
3
4
5
6
7
8
9
db.stores.insert(
   [
     { _id: 1, name: "Java Hut", description: "Coffee and cakes" },
     { _id: 2, name: "Burger Buns", description: "Gourmet hamburgers" },
     { _id: 3, name: "Coffee Shop", description: "Just coffee" },
     { _id: 4, name: "Clothes Clothes Clothes", description: "Discount clothing" },
     { _id: 5, name: "Java Shopping", description: "Indonesian goods" }
   ]
)
cs

 

Text Index

몽고디비는 문자열 컨텐츠에 대한 텍스트 검색 쿼리를 지원하기 위해 텍스트 인덱스을 제공한다. text 인덱스은 값이 문자열 또는 문자열 요소 배열인 필드를 포함할 수 있다.

 

텍스트 검색 쿼리를 수행하려면 컬렉션에 text 인덱스가 있어야한다. 컬렉션은 하나의 텍스트 검색 인덱스만 가질 수 있지만 해당 인덱스는 여러 필드를 포함할 수 있다.

 

> db.stores.createIndex({name:"text",description:"text"})

 

위 명령은 name 필드와 description 필드를 text 타입으로 인덱스를 생성한다. 위는 두 개의 텍스트 인덱스 필드를 지정하였다.

 

$text Operation

$text 쿼리 연산자를 이용하여 텍스트 인덱스가 있는 컬렉션에서 텍스트 검색을 할 수 있다.

 

$text는 공백과 대부분의 구두점을 구분 기호로 사용하여 검색 문자열을 토큰화하고 모든 토큰에 대해 OR 논리조건으로 쿼리를 수행한다.

 

> db.stores.find( { $text: { $search: "java coffee shop" } } )

 

위 텍스트 검색의 결과이다.

 

 

검색 조건의 "java coffee shop"이 java / coffee / shop 으로 토크나이징되고 각각의 토큰이 하나라도 포함되어 있다면 해당 문서를 결과값에 포함시킨다.

 

Exact Phrase

문자열 검색에 사용된 조건에 해당하는 문자열을 정확하게 일치하는 문서를 검색할 수도 있다.

 

> db.stores.find( { $text: { $search: "\"coffee shop\"" } } )

 

큰 따옴표로 검색 조건의 문자열을 묶으면 대소구분없이 "coffee shop"과 정확히 일치하는 문서만 결과값에 포함시킨다.

 

Term Exclustion

"-"연산자를 사용하여 검색에 제외할 텀을 지정할 수 있다. 밑의 명령은 java 또는 shop을 포함하고 coffee는 포함하지 않는 텍스트를 검색한다.

 

> db.stores.find( { $text: { $search: "java shop -coffee" } } )

 

Sort

몽고디비는 기본적으로 정렬되지 않은 검색 결과를 반환한다. 그러나 텍스트 검색 쿼리는 문서가 쿼리와 얼마나 잘 일치 하는지를 지정하는 각 문서에 대한 스코어를 계산하기 때문에 스코어링 기준으로 정렬이 가능하다.

 

> db.stores.find(

...    { $text: { $search: "java coffee shop" } },

...    { score: { $meta: "textScore" } }

... ).sort( { score: { $meta: "textScore" } } )

 

위 쿼리의 결과값이다. 결과값에 스코어값이 포함된 것을 볼 수 있다.

 

 

간단하게 몽고디비의 텍스트 검색에 대해 다루어봤습니다. 추후에 집계 부분에서도 또 다루어보겠지만, 텍스트 검색에 대해 다루어보지 않은 부분이 몇개 있으므로 나중에 다시 다루어 보겠습니다.

posted by 여성게
:
Database/MongoDB 2019. 9. 16. 22:57

 

이번 포스팅 내용은 자주 사용되는 SQL문과 MongoDB와의 쿼리를 비교하는 포스팅입니다. 

 

SQL Schema Statements MongoDB Schema Statements

CREATE TABLE people(

id MEDIUMINT NOT NULL AUTO_INCREMENT,

user_id varchar(30),

age number,

status char(1),

PRIMARY KEY(id)

)

db.people.insertOne({

user_id:"abc123",

age:55,

status:"A"

})

암시적으로 삽입 작업에서 _id를 생략했다면 내부적으로 _id에 값을 추가한다. 컬렉션 또한 삽입시점에 생성된다.

 

그러나 컬렉션을 명시적으로 생성가능하다.

 

db.createCollection("people")

 

ALTER TABLE people ADD join_date DATETIME

컬렉션은 문서 구조를 설명하거나 강제하지 않는다. 

그러나, 문서 레벨에서 updateMany() 오퍼레이션은 $set 오퍼레이션을 사용하여 존재하는 문서에 새로운 필드를 추가할 수 있다.

 

db.people.updateMany({},{$set:{join_date:new Date()}})

ALTER TABLE people DROP COLUMN join_date

컬렉션은 문서 구조를 설명하거나 강제하지 않는다.

 

그러나, 문서 레벨에서 updateMany() 오퍼레이션은 $unset 오퍼레이션을 사용하여 이미 존재하는 문서의 필드를 제거할 수 있다.

 

db.people.updateMany({},{$unset:{"join_date":""}})

CREATE INDEX idx_user_id_asc ON people(user_id) db.people.createIndex({user_id:1})
CREATE INDEX idx_user_id_asc_age_desc ON people(user_id,age DESC) db.people.createIndex({user_id:1,age:-1})
DROP TABLE people db.people.drop()

 

위는 테이블 수준에서 자주 사용되는 SQL과 비교한 몽고디비 명령이다.

 

Insert

 

SQL Insert Statements MongoDB insertOne() Statements
INSERT INTO people(user_id,age,status) VALUES ("bcd001",45,"A") db.people.insertOne({user_id:"bcd001",age:45,status:"A"})

 

Select

암묵적으로 몽고디비는 _id 필드를 결과에 포함시킨다. 만약 _id 필드를 결과에서 제외시키고 싶다면 명시적으로 옵션을 지정해야한다.

SQL Select Statements MongoDB find() Statements
SELECT * FROM PEOPLE db.people.find()
SELECT id,user_id,status FROM people db.people.find({},{user_id:1,status:1})
SELECT * FROM people WHERE status = "A" db.people.find({status:"A"})
SELECT user_id,status FROM people WHERE status = "A" db.people.find({status:"A"},{user_id:1,status:1,_id:0})
SELECT * FROM people WHERE status != "A" db.people.find({status:{$ne:"A"}})
SELECT * FROM people WHERE status = "A" AND age = 50 db.people.find({status:"A",age:50})
SELECT * FROM people WHERE status = "A" OR age = 50 db.people.find({$or:[{status:"A"},{age:50}]})
SELECT * FROM people WHERE age > 25 AND age <=50 db.people.find({age:{$gt:25,$lte:50}})
SELECT * FROM people WHERE user_id LIKE "%bc%"

db.people.find({user_id:/bc/})

or

db.people.find({user_id:{$regex:/bc/}})

SELECT * FROM people WHERE user_id LIKE "bc%"

db.people.find({user_id:/^bc/})

or

db.people.find({user_id:{$regex:/^bc/}})

SELECT * FROM people WHERE status = "A" ORDER BY user_id ACS db.people.find({status:"A"}).sort({user_id:1})
SELECT * FROM people WHERE status="A" ORDER BY user_id DESC db.people.find({status:"A"}).sort({user_id:-1})
SELECT COUNT(*) FROM people

db.people.count()

or

db.people.find().count()

SELECT COUNT(user_id) FROM people

db.people.count({user_id:{$exist:true}})

or

db.people.find({user_id:{$exist:true}}).count()

SELECT COUNT(*) FROM people WHERE age > 30

db.people.count({age:{$gt:30}})

or

db.people.find({age:{$gt:30}}).count()

SELECT DISTINCT(status) FROM people

db.people.aggregate([{$group:{_id:"$status"}}])

or

db.people.distinct("status")

SELECT * FROM people LIMIT 1

db.people.findOne()

or

db.people.find().limit(1)

SELECT * FROM people LIMIT 5 SKIP 10 db.people.find().limit(5).skip(10)

 

Update Records

 

SQL Update Statements MongoDB updateMany() Statements
UPDATE people SET status = "C" WHERE age > 25 db.people.updateMany({age:{$gt:25}},{$set:{status:"C"}})
UPDATE people SET age = age +3 WHERE status = "A" db.people.updateMany({status:"A"},{$inc:{age:3}})

 

Delete Records

 

SQL Delete Statements MongoDB deleteMany() Statements
DELETE FROM people WHERE status = "D" db.people.deleteMany({status:"D"})
DELETE FROM people db.people.deleteMany({})

 

여기까지 자주 사용되는 SQL에 대한 MongoDB 쿼리 비교였습니다. 이보다 더 많은 기능들이 있지만 추후 천천히 다루어보겠습니다.

posted by 여성게
:
Database/MongoDB 2019. 9. 16. 21:16

 

이번 포스팅은 몽고디비 CRUD 3번째 글입니다. 이번 내용은 문서 update부터 다루어볼 예정입니다. 혹시나 이전 포스팅을 못 보신분들은 간단히 아래 링크에서 참고 부탁드립니다.

 

 

DB - MongoDB CRUD 사용방법 및 기타 사용방법 - 1

이번 포스팅은 간단하게 MongoDB 사용법에 대해 다루어봅니다. 모든 쿼리는 특정 클라이언트 드라이버를 이용하는 것이 아니라, Shell을 이용하여 직접 쿼리를 작성해보는 내용입니다. 실습 이전에 혹시나 몽고디..

coding-start.tistory.com

 

DB - MongoDB CRUD 사용방법 및 기타 사용방법 - 2

몽고디비 CRUD 사용방법을 다루는 포스팅 2번째 글입니다. 만약 첫번째 글을 못보신 분은 아래 링크를 참조하시길 바랍니다. DB - MongoDB CRUD 사용방법 및 기타 사용방법 - 1 이번 포스팅은 간단하게 MongoDB..

coding-start.tistory.com

Update Documents

예제에 앞서 아래 문서들을 삽입해줍니다. 아래 문서들을 이용해서 예제를 진행할 것입니다.

 

1
2
3
4
5
6
7
8
9
10
11
12
db.inventory.insertMany( [
   { item: "canvas", qty: 100, size: { h: 28, w: 35.5, uom: "cm" }, status: "A" },
   { item: "journal", qty: 25, size: { h: 14, w: 21, uom: "cm" }, status: "A" },
   { item: "mat", qty: 85, size: { h: 27.9, w: 35.5, uom: "cm" }, status: "A" },
   { item: "mousepad", qty: 25, size: { h: 19, w: 22.85, uom: "cm" }, status: "P" },
   { item: "notebook", qty: 50, size: { h: 8.5, w: 11, uom: "in" }, status: "P" },
   { item: "paper", qty: 100, size: { h: 8.5, w: 11, uom: "in" }, status: "D" },
   { item: "planner", qty: 75, size: { h: 22.85, w: 30, uom: "cm" }, status: "D" },
   { item: "postcard", qty: 45, size: { h: 10, w: 15.25, uom: "cm" }, status: "A" },
   { item: "sketchbook", qty: 80, size: { h: 14, w: 21, uom: "cm" }, status: "A" },
   { item: "sketch pad", qty: 95, size: { h: 22.85, w: 30.5, uom: "cm" }, status: "A" }
] );
cs

 

-Update Documents in a Collection

몽고디비에선 문서 업데이트를 위해 $set와 같은 업데이트 연산자를 제공하여 필드값을 수정한다. $set과 같은 몇몇 수정 연산자는 필드가 존재하지 않을 경우 필드를 생성해준다.

 

-Update a Single Document

db.collection.updateOne() 명령어를 이용해서 단일 문서 수정을 수행한다.

 

> db.inventory.updateOne({item:"paper"},{$set:{"size.uom":"cm",status:"P"},$currentDate:{lastModified:true}})

 

위 쿼리는 item이 paper인 첫번째 문서의 임베디드 문서 필드인 size 필드의 uom값을 cm으로 status필드의 값을 P로 바꾸어준다. 그리고 $currentDate 연산자를 사용하여 lastModified 필드의 값을 현재 날짜로 업데이트한다. 만약 lastModified 필드가 없으면 $currentDate가 필드를 만들어 값을 넣어준다.

 

-Update Multiple Documents

db.collection.updateMany() 명령어를 이용해서 다수의 문서를 한번에 수정한다.

 

> db.inventory.updateMany({qty:{$lt:50}},{$set:{"size.uom":"in",status:"P"},$currentDate:{lastModified:true}})

 

위 쿼리는 qty 필드의 값이 50보다 작은 모든 문서에 대해 $set 연산자에 대한 필드를 수정하고, lastModified 필드에 변경 날짜와 시간을 넣어준다.

 

-Replace a Document

_id 필드를 제외한 문서의 전체 내용을 바꾸려면 완전히 새로운 문서를 두 번째 인수로 db.collection.replaceOne()에 전달한다.

 

문서를 교체 할 때 교체 문서는 필드 / 값 쌍으로 만 구성되어야한다. 즉, 업데이트 연산자 표현식을 포함하면 안된다.

 

대체 문서는 원본 문서와 다른 필드를 가질 수 있다. 대체 문서에서 _id 필드는 변경할 수 없으므로 _id 필드를 생략 할 수 있다. 그러나 _id 필드를 포함 시키면 현재 값과 동일한 값을 가져야한다.

 

> db.inventory.replaceOne({item:"paper"},{ item:"paper", instock:[{warehouse:"A",qty:60},{warehouse: "B", qty: 40}]})

 

위 쿼리는 item이 paper인 첫번째 문서를 두번째 인자인 문서로 교체해준다. _id 필드를 생략하였으므로 기존과 동일한 _id필드를 가지며, 혹시나 _id 필드를 넣더라도 대체하길 원하는 문서의 _id값과 동일하게 넣어줘야한다. 

 

여기까지 간단히 수정 쿼리을 다루어봤는데, 조금더 자세히 수정쿼리에 대해 다루어보자.

 

Method 설명
db.collection.updateOne()

여러 문서가 지정된 필터와 일치하더라도 지정된 필터와 일치하는 최대 하나의 문서를 업데이트한다.

버전 3.2의 새로운 기능.

db.collection.updateMany() 지정된 필터와 일치하는 모든 문서를 업데이트한다.
db.collection.replaceOne()

여러 문서가 지정된 필터와 일치하더라도 지정된 필터와 일치하는 최대 하나의 문서를 대체한다.

 

버전 3.2의 새로운 기능

db.collection.update()

지정된 필터와 일치하는 단일 문서를 업데이트하거나 지정된 필터와 일치하는 모든 문서를 업데이트한다. 

기본적으로 해당 Method는 단일 문서를 업데이트한다. 여러 문서를 업데이트하려면 multi 옵션을 사용한다.

 

-추가적인 업데이트 Method

Method
db.collection.findOneAndReplace()
db.collection.findOneAndUpdate()
db.collection.findAndModify()
db.collection.save()
db.collection.bulkWrite()

 

Delete Documents

문서 삭제에 대한 예제를 다루어봅니다.

 

-Delete All Documents

특정 컬렉션의 모든 문서를 삭제하기 위해서는 필터 값에 아무런 값을 넣지 않고 db.collection.deleteMany() 메서드를 작성하면 된다.

 

> db.inventory.deleteMany({})

 

해당 컬렉션에 모든 문서를 삭제하고 삭제대상이 된 문서의 갯수를 결과로 반환한다.

 

-Delete All Documents that Match a Condition

삭제할 문서를 식별하는 기준 또는 필터를 지정할 수 있다. 필터는 조회 쿼리와 동일한 구문을 사용한다.

 

> db.inventory.deleteMany({status:"A"})

 

위 쿼리는 status 값이 A인 모든 문서를 삭제한다.

 

-Delete Only One Document that Matches a Condition

지정된 필터와 일치하는 문서를 하나만 삭제할때 db.collection.deleteOne()을 사용한다.

 

> db.inventory.deleteOne({status:"D"})

 

Delete Behavior

-Indexes 

컬렉션에서 모든 문서를 삭제하더라도 인덱스는 삭제하지 않는다. (이것은 인덱스의 내부 구조에 대한 내용까지 들어갈 듯한데, 보통 B+Tree에서 인덱스 삭제 연산이 있더라도 삭제 플래그값만 남기고 값은 바로 삭제하지 않았던 걸로 기억. 정확히 아시는 분은 댓글로 좀 부탁드립니다...)

 

삭제 연산에 사용되는 Method들은 아래와 같다.

 

Method 설명
db.collection.deleteOne()

여러 문서가 지정된 필터와 일치하더라도 지정된 필터와 일치하는 문서를 하나만 삭제한다.

버전 3.2의 새로운 기능.

db.collection.deleteMany()

지정된 필터와 일치하는 모든 문서를 삭제한다.

버전 3.2의 새로운 기능.

db.collection.remove() 지정된 필터와 일치하는 단일 문서 또는 모든 문서를 삭제한다. 

 

-추가적인 문서삭제 Method

Method 설명
db.collection.findOneAndDelete() 해당 메서드는 정렬 옵션을 제공한다. 이 옵션을 사용하면 지정된 순서대로 정렬된 첫번째 문서를 삭제한다.
db.collection.findAndModify() 해당 메서드는 정렬 옵션을 제공한다. 이 옵션을 사용하면 지정된 순서대로 정렬된 첫번째 문서를 삭제할 수 있다.
db.collection.bulkWrite()  

 

Bulk Write Operations

몽고디비는 클라이언트에게 벌크작업을 수행할 수 있는 기능을 제공한다. 벌크작업은 단일 컬렉션에 대해서만 수행가능하다. 몽고디비는 애플리케이션이 벌크작업에 필요한 수용 가능한 Acknowledgement 수준을 결정할 수 있도록한다.

 

db.collection.bulkWrite() 메소드는 대량 삽입, 업데이트 및 제거 작업을 수행하는 기능을 제공한다. MongoDB는 db.collection.insertMany()를 통한 대량 삽입도 지원한다.

 

-Ordered vs Unordered Operations

벌크작업은 작업에 순서가 있거나 순서가 맞지 않을 수 있다.

 

순서가 지정된 작업 목록을 사용하여 MongoDB는 작업을 순차적으로 실행한다. 쓰기 작업 중 하나를 처리하는 동안 오류가 발생하면 MongoDB는 목록에 남아있는 쓰기 작업을 처리하지 않고 반환한다.

 

정렬되지 않은 작업 목록을 사용하면 MongoDB가 작업을 병렬로 실행할 수 있지만이 동작은 보장되지 않는다. 쓰기 작업 중 하나를 처리하는 동안 오류가 발생하면 MongoDB는 목록에 남아있는 쓰기 작업을 계속 처리한다.

 

분할 된 컬렉션에서 정렬 된 작업 목록을 실행하면 정렬 된 목록을 사용하여 정렬되지 않은 목록을 실행하는 것보다 일반적으로 속도가 느려진다. 각 작업은 이전 작업이 완료 될 때까지 기다려야한다.

 

기본적으로 bulkWrite ()는 순서가 지정된 작업을 수행한다. 순서가없는 쓰기 작업을 지정하려면 옵션 문서에서 ordered : false를 설정하면 된다.

 

즉, 벌크 연산은 아래와 같은 오퍼레이션과 동일한 역할을 수행할 수 있다.

 

insertOne
insertMany
updateOne
updateMany
replaceOne
deleteOne
deleteMany

 

1
2
3
{ "_id" : 1, "char" : "Brisbane", "class" : "monk", "lvl" : 4 },
{ "_id" : 2, "char" : "Eldon", "class" : "alchemist", "lvl" : 3 },
{ "_id" : 3, "char" : "Meldane", "class" : "ranger", "lvl" : 3 }
cs

 

characters 컬렉션에는 위와 같은 문서가 있다고 가정한다. 이 컬렉션에 아래와 같이 벌크 연산을 수행할 수 있다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
try {
   db.characters.bulkWrite(
      [
         { insertOne :
            {
               "document" :
               {
                  "_id" : 4, "char" : "Dithras", "class" : "barbarian", "lvl" : 4
               }
            }
         },
         { insertOne :
            {
               "document" :
               {
                  "_id" : 5, "char" : "Taeln", "class" : "fighter", "lvl" : 3
               }
            }
         },
         { updateOne :
            {
               "filter" : { "char" : "Eldon" },
               "update" : { $set : { "status" : "Critical Injury" } }
            }
         },
         { deleteOne :
            { "filter" : { "char" : "Brisbane"} }
         },
         { replaceOne :
            {
               "filter" : { "char" : "Meldane" },
               "replacement" : { "char" : "Tanys", "class" : "oracle", "lvl" : 4 }
            }
         }
      ]
   );
}
catch (e) {
   print(e);
}
cs

 

벌크 연산의 결과는 아래와 같다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
   "acknowledged" : true,
   "deletedCount" : 1,
   "insertedCount" : 2,
   "matchedCount" : 2,
   "upsertedCount" : 0,
   "insertedIds" : {
      "0" : 4,
      "1" : 5
   },
   "upsertedIds" : {
 
   }
}
cs

 

샤드 컬렉션에 벌크 연산 전략

초기 데이터 삽입 또는 일상적인 데이터 가져오기를 포함한 벌크삽입 연산 작업은 샤드 클러스터 성능에 영향을 줄 수 있다. 벌크삽입의 경우 다음 전략들을 고려할 수 있다.

 

-컬렉션 사전 분할

샤딩된 컬렉션이 비어 있다면, 컬렉션은 하나의 초기화된 청크만 갖고 있는다. 그런 다음 몽고디비는 데이터를 수신한 이후에 분할을 생성하고 분할 청크를 사용 가능한 샤드에 분배한다. 즉, 리소스가 어느정도 큰 작업이 될것이다. 이 성능 비용을 줄이기위해 벌크작업 이전에 컬렉션을 사전 분할 할 수 있다.

 

Split Chunks in a Sharded Cluster — MongoDB Manual

Split Chunks in a Sharded Cluster Normally, MongoDB splits a chunk after an insert if the chunk exceeds the maximum chunk size. However, you may want to split chunks manually if: you have a large amount of data in your cluster and very few chunks, as is th

docs.mongodb.com

-순서없는 벌크 연산

샤드 클러스터에 대한 쓰기 성능을 향상시키려면 벌크연산의 선택적인 매개변수인 orderded를 false로 설정하면 된다. 이 설정은 순서없이 작업을 수행하는데, 병렬로 수행하기 때문에 여러 샤드에 동시에 작업수행이 가능하다. 하지만 이 작업을 하기 전에는 컬렉션이 미리 분할되어 있어야한다.

 

여기까지 수정,삭제,벌크 연산에 대해 다루어봤습니다. 이후 포스팅에서 더 다양한 연산들을 다루어보겠습니다.

posted by 여성게
: