IT이론 2019. 4. 3. 21:57

 

 

 

최근 소프트웨어를 서비스 형태로 제공하는게 일반화 되면서, 웹앱 혹은 SaaS(Software As A Service)라고 부르게 되었다. Twelve-Factor app은 아래 특징을 가진 SaaS 앱을 만들기 위한 방법론이다.

  • 설정 자동화를 위한 절차를 체계화하여 새로운 개발자가 프로젝트에 참여하는데 드는 시간과 비용을 최소화한다.
  • OS에 따라 달라지는 부분을 명확히하고, 실행 환경 사이의 이식성을 극대화한다.(OS에 종속되지 않는 애플리케이션)
  • 클라우드 플랫폼에 적합하고, 서버와 시스템의 관리가 필요없게 된다.
  • 개발 환경과 운영 환경의 차이를 최소화하고 민첩성을 극대화하기 위해 지속적인 배포가 가능하다.
  • 툴, 아키텍쳐, 개발방식을 크게 바꾸지 않고 확장(scale up)할 수 있다.

Twelve-Factor 방법론은 어떤 프로그래밍 언어로 작성된 앱에도 적용할 수 있고, 백엔드 서비스(DB,큐,Mem chache 등)와 다양한 조합으로 사용할 수 있다.

 

 

1. 코드베이스(Code Base)

버전관리되는 하나의 코드베이스와 다양한 배포

 

Twelve-Factor 앱은 항상 깃,서브버전 같은 버전 제어 시스템을 사용하여 변화를 추적하며, 버전 추적 데이터베이스의 사본을 코드 저장소, 줄여서 저장소라고 부른다. 코드베이스는 단일 저장소(서브버전 같은 중앙 집중식 버전 관리 시스템)일 수도 있고, 루트 커밋을 공유하는 여러 저장소(깃 같은 분산 버전 관리 시스템)일수도 있다.

코드베이스와 앱 사이에는 항상 1대1 관계가 성립된다.

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

  • 코드베이스가 여러개 있는 경우, 앱이 아니라 분산 시스템으로 봐야한다. 분산 시스템의 개별 구성요소가 앱이 되며, 개별 앱이 Twelve-Factor를 따른다.
  • 여러개 앱이 동일한 코드를 공유한다면 Twelve-Factor를 위반하는 것이다. 이를 해결하려면 공유하는 코드를 라이브러리화 시키고, 해당 라이브러리를 종속성 매니저로 관리해야한다.

앱의 코드베이스는 한개여야 하지만, 앱 배포는 여러개가 될 수 있다. 배포는 실행중인 앱의 인스턴스를 가리킨다. 보통 운영 사이트와 여러 스테이징 사이트가 여기에 해당된다. 모든 개발자는 자신의 로컬 개발 환경에 실행되는 앱을 가지고 있는데, 이것 역시 하나의 배포로 볼 수 있다.(옆 그림참조)

배포마다 다른 버전이 활성화 될 수 있지만, 코드베이스 자체는 모든 배포에 대해 동일하다. 예를 들어, 개발자는 아직 스테이징 환경에 배포하지 않은 커밋이 있을 수 있으며, 스테이징 환경에는 아직 운영 환경에 배포되지 않은 커밋이 있을 수 있다. 하지만 이 모든 것들이 같은 코드베이스를 공유하고, 같은 앱의 다른 배포라고 할 수 있다.

 

2. 종속성

명시적으로 선언되고 분리된 종속성

 

대부분의 프로그래밍 언어는 라이브러리 배포를 위한 패키징 시스템을 제공하고 있다. 라이브러리는 패키징 시스템을 통해 시스템 전체나 애플리케이션을 포함한 디렉토리에 설치될 수 있다.

Twelve-Factor App은 전체 시스템에 특정 패키지가 암묵적으로 존재하는 것에 절대 의존하지 않는다. 종속성 선언 mainifest를 이용하여 모든 종속성을 완전하고 엄격하게 선언한다. 더 나아가, 종속성 분리툴을 사용하여 실행되는 동안 둘러싼 시스템으로 암묵적인 종속성 "유출
"이 발생하지 않는 것을 보장한다. 이런 완전하고 명시적인 종속성의 명시는 개발과 서비스 모두에게 동일하게 적용된다.

 

3. 설정

환경에 저장된 설정

 

애플리케이션의 설정은 배포(스테이징,프로덕션,개발 등)마다 달리질 수 있는 모든 것들이다. 설정에는 다음이 포함된다.

  • 데이터베이스,메모리캐시 등 백엔드 서비스들의 리소스 핸들
  • 외부 서비스 인증정보
  • 배포된 호스트의 정규화된 호스트이름처럼 각 배포마다 달리지는 값

애플리케이션은 종종 설정을 상수로 코드에 저장한다. 이것은 Twelve-Factor를 위반하며, Twelve-Factor는 설정을 코드에서 엄격하게 분리하는 것을 요구한다. 설정은 배치마다 크게 다르지만, 코드는 그렇지 않기 때문이다.

 

애플리케이션의 모든 설정이 정상적으로 코드 바깥으로 분리되어 있는지 확인할 수 있는 간단한 테스트는 어떠한 인정정보도 유출시키지 않고 코드베이스가 지금 당장 오픈소스가 될 수 있는지 확인하는 것이다. 이 "설정"의 정의는 애플리케이션 내부 설정을 포함하지 않는다는 점에 유의해야한다. Spring의 "어떻게 코드 모듈이 연결되는 가"와 같은 설정들은 배치 사이에 변하지 않기 때문에 코드의 내부에 있는 것이 가장 좋다.

 

설정에 대한 또 다른 접근방식은 Rails의 config/database.yaml처럼 버전관리 시스템에 등록되지 않은 설정 파일을 이용하는 것이다. 이 방법은 코드 저장소에 등록된 상수를 사용하는 것에 비하면 매우 큰 발전이지만, 설정 파일이 여러 위치에 여러 포맷으로 흝어지고 모든 설정을 한 곳에서 확인하고 관리하기 어렵게 만드는 경향이 있다.

 

Twelve-Factor App은 설정을 환경변수에 저장한다. 환경 변수는 코드 변경 없이 배포 때마다 쉽게 변경할 수 있다. 설정 파일과 달리, 잘못해서 코드 저장소에 올라갈 가능성도 낮다. 또한, 커스텀 설정 파일이나 Java System Property와 같은 다른 설정 매커니즘과 달리 언어나 OS에 의존하지 않은 표준입니다.

 

설정 관리의 다른 측면은 그룹핑입니다. 종종 애플리케이션은 설정을 명명된 그룹(“environments”라고도 함)으로 구성하기도 합니다. 해당 그룹은 Rails의 ‘development’, ‘test’, ‘production’ environments처럼, 배포의 이름을 따서 명명됩니다. 이 방법은 깔끔하게 확장하기 어렵습니다. 응용 프로그램의 배포가 증가함에 따라, ‘staging’이라던가 ‘qa’같은 새로운 그룹의 이름이 필요하게 됩니다. 프로젝트가 성장함에 따라, 개발자은 자기 자신의 그룹를 추가하게 됩니다. 결과적으로 설정이 각 그룹의 조합으로 폭발하게 되고, 애플리케이션의 배포를 불안정하게 만듭니다.

Twelve-Factor App에서 환경 변수는 매우 정교한 관리이며, 각각의 환경변수는 서로 직교합니다. 환경 변수는 “environments”로 절대 그룹으로 묶이지 않지만, 대신 각 배포마다 독립적으로 관리됩니다. 이 모델은 애플리케이션의 수명주기를 거치는 동안 더 많은 배포로 원활하게 확장해 나갈 수 있습니다.

 

4. 백앤드 서비스

백엔드 서비스를 연결된 리소스로 취급

 

백엔드 서비스는 애플리케이션 정상 동작 중 네트워크를 통해 이용하는 모든 서비스입니다. 예를 들어, 데이터 저장소(예: MySQL, CouchDB), 메시지 큐잉 시스템(예: RabbitMQ, Beanstalkd), 메일을 보내기 위한 SMTP 서비스 (예: Postfix), 캐시 시스템(예: Memcached) 등이 있습니다.

데이터베이스와 같은 백엔드 서비스들은 통상적으로 배포된 애플리케이션과 같은 시스템 관리자에 의해서 관리되고 있었습니다. 애플리케이션은 이런 로컬에서 관리하는 서비스 대신, 서드파티에 의해서 제공되고 관리되는 서비스를 이용할 수 있습니다. 예를 들어, SMTP 서비스 (예: Postmark), 지표 수집 서비스 (예: New Relic, Loggly), 스토리지 서비스 (예: Amazon S3), API로 접근 가능한 소비자 서비스 (예: Twitter, Google Maps, Last.fm)등이 있습니다.

Twelve-Factor App의 코드는 로컬 서비스와 서드파티 서비스를 구별하지 않습니다. 애플리케이션에게는 양 쪽 모두 연결된 리소스이며, 설정에 있는 URL 혹은 다른 로케이터와 인증 정보를 사용해서 접근 됩니다. Twelve-Factor App의 배포는 애플리케이션 코드를 수정하지 않고 로컬에서 관리되는 MySQL DB를 서드파티에서 관리되는 DB(예: Amazon RDS)로 전환할 수 있어야 합니다. 마찬가지로, 로컬 SMTP 서버는 서드파티 SMTP 서비스(예: Postmark)로 코드 수정 없이 전환이 가능해야 합니다. 두 경우 모두 설정에 있는 리소스 핸들만 변경하면 됩니다.

각각의 다른 백엔드 서비스는 리소스입니다. 예를 들어, 하나의 MySQL DB는 하나의 리소스입니다. 애플리케이션 레이어에서 샤딩을 하는 두 개의 MySQL 데이터베이스는 두 개의 서로 다른 리소스라고 볼 수 있습니다. Twelve-Factor App은 이러한 데이터베이스들을 첨부된(Attached) 리소스로 다룹니다. 이는 서로 느슨하게 결합된다는 점을 암시합니다.

리소스는 자유롭게 배포에 연결되거나 분리될 수 있습니다. 예를 들어, 애플리케이션의 데이터베이스가 하드웨어 이슈로 작용이 이상한 경우, 애플리케이션의 관리자는 최신 백업에서 새로운 데이터베이스 서버를 시작시킬 것입니다. 그리고 코드를 전혀 수정하지 않고 현재 운영에 사용하고 있는 데이터베이스를 분리하고 새로운 데이터베이스를 연결할 수 있습니다.

 

5. 빌드, 릴리즈, 실행

철저하게 분리된 빌드와 실행 단계

 

코드베이스는 3 단계를 거쳐 (개발용이 아닌) 배포로 변환됩니다.

  • 빌드 단계는 코드 저장소를 빌드라는 실행 가능한 번들로 변환시키는 단계입니다. 빌드 단계에서는 커밋된 코드 중 배포 프로세스에서 지정된 버전을 사용하며, 종속성을 가져와 바이너리와 에셋들을 컴파일합니다.
  • 릴리즈 단계에서는 빌드 단계에서 만들어진 빌드와 배포의 현재 설정을 결합 합니다. 완성된 릴리즈는 빌드와 설정을 모두 포함하며 실행 환경에서 바로 실행될 수 있도록 준비됩니다.
  • 실행 단계(런타임이라고도 하는)에서는 선택된 릴리즈에 대한 애플리케이션 프로세스의 집합을 시작하여, 애플리케이션을 실행 환경에서 돌아가도록 합니다.

Twelve-Factor App은 빌드, 릴리즈, 실행 단계를 엄격하게 서로 분리합니다. 예를 들어, 실행 단계에서 코드를 변경할 수는 없습니다. 변경을 실행 단계보다 앞에 있는 빌드 단계로 전달할 수 있는 방법이 없기 때문입니다.

배포 도구는 일반적으로 릴리즈 관리 도구를 제공합니다. 특히 주목할만한 점은 이전 릴리즈로 되돌릴 수 있는 롤백 기능입니다. 예를 들어, Capistrano는 배포 툴은 릴리즈를 releases라는 하위 디렉토리에 저장시키고, 현재 릴리즈는 현재 릴리즈 디렉토리로 심볼릭 링크로 연결합니다. 이 툴의 rollback 명령어는 이전 버전으로 쉽고 빠르게 이전 릴리즈로 롤백할 수 있도록 해줍니다. 모든 릴리즈는 항상 유니크한 릴리즈 아이디를 지녀야 합니다. 예를 들어, 릴리즈의 타임 스템프(예: 2011-04-06-20:32:17)나 증가하는 번호(예: v100, v101)가 있습니다. 릴리즈는 추가만 될 수 있으며, 한번 만들어진 릴리즈는 변경될 수 없습니다. 모든 변경은 새로운 릴리즈를 만들어야 합니다.

빌드는 새로운 코드가 배포 될 때마다 개발자에 의해 시작됩니다. 반면, 실행 단계는 서버가 재부팅되거나 충돌이 발생한 프로세스가 프로세스 매니저에 의해 재시작 되었을 때 자동으로 실행될 수 있습니다. 따라서 대응할 수 있는 개발자가 없는 한밤중에 문제가 발생하는 것을 방지하기 위해, 실행 단계는 최대한 변화가 적어야합니다. 빌드 단계는 좀 더 복잡해져도 괜찮습니다. 항상 배포를 진행하고 있는 개발자의 눈 앞에서 에러가 발생하기 때문입니다.

 

6.프로세스

애플리케이션을 하나 혹은 여러개의 무상태 프로세스로 실행

 

실행 환경에서 앱은 하나 이상의 프로세스로 실행됩니다.

가장 간단한 케이스는 코드가 stand-alone 스크립트인 경우입니다. 이 경우, 실행 환경은 개발자의 언어 런타임이 설치된 로컬 노트북이며, 프로세스는 커맨드 라인 명령어에 의해서 실행됩니다.(예: python my_script.py) 복잡한 케이스로는 많은 프로세스 타입별로 여러개의 프로세스가 사용되는 복잡한 애플리케이션이 있습니다.

Twelve-Factor 프로세스는 무상태(stateless)이며, 아무 것도 공유하지 않습니다. 유지될 필요가 있는 모든 데이터는 데이터베이스 같은 안정된 백엔드 서비스에 저장되어야 합니다.

짧은 단일 트랙잭션 내에서 캐시로 프로세스의 메모리 공간이나 파일시스템을 사용해도 됩니다. 예를 들자면 큰 파일을 받고, 해당 파일을 처리하고, 그 결과를 데이터베이스에 저장하는 경우가 있습니다. Twelve-Factor 앱에서 절대로 메모리나 디스크에 캐시된 내용이 미래의 요청이나 작업에서도 유효할 것이라고 가정해서는 안됩니다. 각 프로세스 타입의 프로세스가 여러개 돌아가고 있는 경우, 미래의 요청은 다른 프로세스에 의해서 처리될 가능성이 높습니다. 하나의 프로세스만 돌고 있는 경우에도 여러 요인(코드 배포, 설정 변경, 프로세스를 다른 물리적 장소에 재배치 등)에 의해서 발생하는 재실행은 보통 모든 로컬의 상태(메모리와 파일 시스템 등)를 없애버립니다.

에셋 패키징 도구 (예: Jammit, django-assetpackager)는 컴파일된 에셋을 저장할 캐시로 파일 시스템을 사용합니다. Twelve-Factor App은 이러한 컴파일을 런타임에 진행하기보다는, Rails asset pipeline처럼 빌드 단계에서 수행하는 것을 권장합니다.

웹 시스템 중에서는 “Sticky Session”에 의존하는 것도 있습니다. 이는 유저의 세션 데이터를 앱의 프로세스 메모리에 캐싱하고, 같은 유저의 이후 요청도 같은 프로세스로 전달될 것을 가정하는 것입니다. Sticky Session은 Twelve-Factor에 위반되며, 절대로 사용하거나 의존해서는 안됩니다. 세션 상태 데이터는 Memcached Redis처럼 유효기간을 제공하는 데이터 저장소에 저장하는 것이 적합합니다.

 

7.포트바인딩

포트 바인딩을 사용해서 서비스를 공개함

 

웹앱은 웹서버 컨테이너 내부에서 실행되기도 합니다. 예를 들어, PHP 앱은 Apache HTTPD의 모듈로 실행될 수도 있고, Java 앱은 Tomcat 내부에서 실행될 수도 있습니다.

Twelve-Factor 앱은 완전히 독립적이며 웹서버가 웹 서비스를 만들기 위해 처리하는 실행환경에 대한 런타임 인젝션에 의존하지 않습니다. Twelve-Factor 웹 앱은 포트를 바인딩하여 HTTP 서비스로 공개되며 그 포트로 들어오는 요청을 기다립니다.

로컬 개발 환경에서는 http://localhost:5000과 같은 주소를 통해 개발자가 애플리케이션 서비스에 접근할 수 있습니다. 배포에서는 라우팅 레이어가 외부에 공개된 호스트명으로 들어온 요청을 포트에 바인딩된 웹 프로세스에 전달 합니다.

이는 일반적으로 종속성 선언에 웹서버 라이브러리를 추가함으로써 구현됩니다. 예를 들어, 파이썬의 Tornado나 루비의 Thin이나 자바와 JVM 기반 언어들을 위한 Jetty가 있습니다. 이것들은 전적으로 유저 스페이스 즉, 애플리케이션의 코드 내에서 처리됩니다. 실행 환경과의 규약은 요청을 처리하기 위해 포트를 바인딩하는 것입니다.

포트 바인딩에 의해 공개되는 서비스는 HTTP 뿐만이 아닙니다. 거의 모든 종류의 서버 소프트웨어는 포트를 바인딩하고 요청이 들어오길 기다리는 프로세스를 통해 실행될 수 있습니다. 예를 들면, ejabberd (XMPP을 따름)나 Redis (Redis protocol을 따름) 등이 있습니다.

포트 바인딩을 사용한다는 것은 하나의 앱이 다른 앱을 위한 백엔드 서비스가 될 수 있다는 것을 의미한다는 점에 주목합시다. 백엔드 앱의 URL을 사용할 앱의 설정의 리소스 핸들로 추가하는 방식으로 앱이 다른 앱을 백엔드 서비스로 사용할 수 있습니다.

 

8. 동시성

프로세스 모델을 통한 확장

 

모든 컴퓨터 프로그램은 실행되면 하나 이상의 프로세스로 표현됩니다. 웹 애플리케이션은 다양한 프로세스 실행 형태를 취해왔습니다. 예를 들어, PHP 프로세스는 Apache의 자식 프로세스로 실행되며, request의 양에 따라 필요한 만큼 시작됩니다. 자바 프로세스들은 반대 방향에서의 접근법을 취합니다. JVM은, 시작될 때 큰 시스템 리소스(CPU와 메모리) 블록을 예약하는 하나의 거대한 부모 프로세스를 제공하고, 내부 쓰레드를 통해 동시성(concurrency)을 관리합니다. 두 경우 모두 실행되는 프로세스는 애플리케이션 개발자에게 최소한으로 노출됩니다.

Twelve-Factor App에서 프로세스들은 일급 시민입니다.Twelve-Factor App에서의 프로세스는 서비스 데몬들을 실행하기 위한 유닉스 프로세스 모델에서 큰 힌트를 얻었습니다. 이 모델을 사용하면 개발자는 애플리케이션의 작업을 적절한 프로세스 타입에 할당함으로서 다양한 작업 부하를 처리할 수 있도록 설계할 수 있습니다. 예를 들어, HTTP 요청은 웹 프로세스가 처리하며, 시간이 오래 걸리는 백그라운드 작업은 worker 프로세스가 처리하도록 할 수 있습니다.

이는 런타임 VM 내부의 쓰레드나 EventMachine, Twisted, Node.js에서 구성된 것 처럼 async/evented 모델처럼 개별 프로세스가 내부적으로 동시에 처리하는 것을 금지하는 것은 아닙니다. 하지만 개별 VM이 너무 커질 수 있습니다.(수직 확장) 따라서 애플리케이션은 여러개의 물리적인 머신에서 돌아가는 여러개의 프로세스로 넓게 퍼질 수 있어야만 합니다.

 

프로세스 모델이 진정으로 빛나는 것은 수평적으로 확장하는 경우입니다. 아무것도 공유하지 않고, 수평으로 분할할 수 있는 Twelve-Factor App 프로세스의 성질은 동시성을 높이는 것은 간단하고 안정적인 작업이라는 것을 의미 합니다. 프로세스의 타입과 각 타입별 프로세스의 갯수의 배치를 프로세스 포메이션이라고 합니다.

Twelve-Factor App 프로세스는 절대 데몬화해서는 안되며 PID 파일을 작성해서는 안됩니다. 대신, OS의 프로세스 관리자(예: systemd)나 클라우드 플랫폼의 분산 프로세스 매니저, 혹은 Foreman 같은 툴에 의존하여 아웃풋 스트림을 관리하고, 충돌이 발생한 프로세스에 대응하고, 재시작과 종료를 처리해야 합니다.

 

9. 폐기 가능

빠른 시작과 그레이스풀 셧다운을 통한 안정성 극대화

 

Twelve-Factor App의 프로세스 간단하게 폐기 가능합니다. 즉, 프로세스는 바로 시작하거나 종료될 수 있습니다. 이러한 속성은 신축성 있는 확장과 코드 설정의 변화를 빠르게 배포하는 것을 쉽게 하며, production 배포를 안정성 있게 해줍니다.

프로세스는 시작 시간을 최소화하도록 노력해야합니다. 이상적으로, 프로세스는 실행 커맨드가 실행된 뒤 몇 초만에 요청이나 작업을 받을 수 있도록 준비 됩니다. 짧은 실행 시간은 릴리즈 작업과 확장(scale up)이 더 민첩하게 이루어질 수 있게 합니다. 또한 프로세스 매니저가 필요에 따라 쉽게 프로세스를 새로운 머신으로 프로세스를 옮길 수 있기 때문에 안정성도 높아집니다.

프로세스는 프로세스 매니저로부터 SIGTERM 신호를 받았을 때 그레이스풀 셧다운(graceful shutdown)을 합니다. 웹프로세스의 그레이스풀 셧다운 과정에서는 서비스 포트의 수신을 중지하고(그럼으로써 새로운 요청을 거절함), 현재 처리 중인 요청이 끝나길 기다린 뒤에 프로세스가 종료 되게 됩니다. 이 모델은 암묵적으로 HTTP 요청이 짧다는 가정(기껏해야 몇 초)을 깔고 있습니다. long polling의 경우에는 클라이언트가 연결이 끊긴 시점에 바로 다시 연결을 시도해야 합니다.

worker 프로세스의 경우, 그레이스풀 셧다운은 현재 처리중인 작업을 작업 큐로 되돌리는 방법으로 구현됩니다. 예를 들어, RabbitMQ에서는 worker는 NACK을 메시지큐로 보낼 수 있습니다. Beanstalkd에서는 woker와의 연결이 끊기면 때 자동으로 작업을 큐로 되돌립니다. Delayed Job와 같은 Lock-based 시스템들은 작업 레코드에 걸어놨던 lock을 확실하게 풀어놓을 필요가 있습니다. 이 모델은 암묵적으로 모든 작업은 재입력 가능(reentrant)하다고 가정합니다. 이는 보통, 결과를 트랜잭션으로 감싸거나 요청을 멱등(idempotent)하게 함으로써 구현될 수 있습니다.

프로세스는 하드웨어 에러에 의한 갑작스러운 죽음에도 견고해야합니다. 이러한 사태는 SIGTERM에 의한 그레이스풀 셧다운에 비하면 드문 일이지만, 그럼에도 발생할 수 있습니다. 이런 일에 대한 대책으로 Beanstalkd와 같은 견고한 큐잉 백엔드를 사용하는 것을 권장합니다. 이러한 백엔드는 클라이언트가 접속이 끊기거나, 타임 아웃이 발생했을 때, 작업을 큐로 되돌립니다. Twelve-Factor App은 예기치 못한, 우아하지 않은 종료도 처리할 수 있도록 설계됩니다. Crash-only design에서는 논리적인 결론으로 이러한 컨셉을 가져왔습니다.

 

10. dev/prod 일치

개발,스테이징,프로덕트 환경을 최대한 비슷하게 유지

 

역사적으로, 개발 환경(애플리케이션의 개발자가 직접 수정하는 로컬의 배포)과 production 환경(최종 사용자가 접근하게 되는 실행 중인 배포) 사이에는 큰 차이가 있었습니다. 이러한 차이는 3가지 영역에 걸처 나타납니다.

  • 시간의 차이: 개발자가 작업한 코드는 production에 반영되기까지 며칠, 몇주, 때로는 몇개월이 걸릴 수 있습니다.
  • 담당자의 차이: 개발자가 작성한 코드를 시스템 엔지니어가 배포합니다.
  • 툴의 차이: production 배포는 아파치, MySQL, 리눅스를 사용하는데, 개발자는 Nginx, SQLite, OS X를 사용할 수 있습니다.

Twelve Factor App은 개발 환경과 production 환경의 차이를 작게 유지하여 지속적인 배포가 가능하도록 디자인 되었습니다. 위에서 언급한 3가지 차이에 대한 대응책은 아래와 같습니다.

  • 시간의 차이을 최소화: 개발자가 작성한 코드는 몇 시간, 심지어 몇 분 후에 배포됩니다.
  • 담당자의 차이를 최소화: 코드를 작성한 개발자들이 배포와 production에서의 모니터링에 깊게 관여합니다.
  • 툴의 차이를 최소화: 개발과 production 환경을 최대한 비슷하게 유지합니다.

위의 내용을 표로 요약하면 아래와 같습니다.

전통적인 애플리케이션Twelve-Factor App배포 간의 간격코드 작성자와 코드 배포자개발 환경과 production 환경

몇 주 몇 시간
다른 사람 같은 사람
불일치함 최대한 유사함

데이터베이스, 큐잉 시스템, 캐시와 같은 백엔드 서비스는 dev/prod 일치가 중요한 영역 중 하나 입니다. 많은 언어들은 다른 종류의 서비스에 대한 어댑터를 포함하고 간단하게 백엔드 서비스에 접근할 수 있는 라이브러리들을 제공합니다. 아래의 표에 몇가지 예가 나와있습니다.

종류언어라이브러리어댑터

데이터 베이스 Ruby/Rails ActiveRecord MySQL, PostgreSQL, SQLite
큐(Queue) Python/Django Celery RabbitMQ, Beanstalkd, Redis
캐쉬 Ruby/Rails ActiveSupport::Cache 메모리, 파일시스템, Memcached

production 환경에서는 더 본격적이고 강력한 백엔드 서비스가 사용됨에도 불구하고, 개발자는 자신의 로컬 개발 환경에서는 가벼운 백엔드 서비스를 사용하는 것에 큰 매력을 느낄 수도 있습니다. 예를 들어, 로컬에서는 SQLite를 사용하고 production에서는 PostgreSQL을 사용한다던가, 개발 중에는 로컬 프로세스의 메모리를 캐싱용으로 사용하고 production에서는 Memcached를 사용하는 경우가 있습니다.

Twelve-Factor 개발자는 개발 환경과 production 환경에서 다른 백엔드 서비스를 쓰고 싶은 충동에 저항합니다. 이론적으로는 어댑터가 백엔드 서비스 간의 차이를 추상화해준다고 해도, 백엔드 서비스 간의 약간의 불일치가 개발 환경과 스테이징 환경에서는 동작하고 테스트에 통과된 코드가 production 환경에서 오류를 일으킬 수 있기 때문입니다. 이런 종류의 오류는 지속적인 배포를 방해합니다. 애플리케이션의 생명 주기 전체를 보았을 때, 이러한 방해와 지속적인 배포의 둔화가 발생시키는 손해는 엄청나게 큽니다.

가벼운 로컬 서비스는 예전처럼 필수적인 것은 아닙니다. Memcache, PostgreSQL, RabbitMQ와 같은 현대적인 백엔드 서비스들은 Homebrew apt-get와 같은 현대적인 패키징 시스템 덕분에 설치하고 실행하는데 아무런 어려움도 없습니다. 혹은 Chef and Puppet와 같은 선언적 provisioning 툴과 Vagrant등의 가벼운 가상 환경을 결합하여 로컬 환경을 production 환경과 매우 유사하게 구성할 수 있습니다. dev/prod 일치와 지속적인 배포의 이점에 비하면 이러한 시스템을 설치하고 사용하는 비용은 낮습니다.

여러 백엔드 서비스에 접근할 수 있는 어댑터는 여전히 유용합니다. 새로운 백엔드 서비스를 사용하도록 포팅하는 작업의 고통을 낮춰주기 때문입니다. 하지만, 모든 애플리케이션의 배포들(개발자 환경, 스테이징, production)은 같은 종류, 같은 버전의 백엔드 서비스를 이용해야합니다.

 

11. 로그

로그를 이벤트 스트림으로 취급

 

로그는 실행 중인 app의 동작을 확인할 수 있는 수단입니다. 서버 기반 환경에서 로그는 보통 디스크에 파일(로그 파일)로 저장됩니다. 하지만, 이것은 출력 포맷 중 하나에 불과합니다.

로그는 모든 실행중인 프로세스와 백그라운드 서비스의 아웃풋 스트림으로부터 수집된 이벤트가 시간 순서로 정렬된 스트림입니다. 가공되지 않는 로그는 보통, 하나의 이벤트가 하나의 라인으로 기록된 텍스트 포맷입니다.(예외(exception)에 의한 backtrace는 여러 라인에 걸쳐 있을 수도 있습니다.) 로그는 고정된 시작과 끝이 있는 것이 아니라, app이 실행되는 동안 계속 흐르는 흐름입니다.

Twelve-Factor App은 아웃풋 스트림의 전달이나 저장에 절대 관여하지 않습니다. app은 로그 파일을 작성하거나, 관리하려고 해서는 안됩니다. 대신, 각 프로세스는 이벤트 스트림을 버퍼링 없이 stdout에 출력합니다. 로컬 개발환경에서 작업 중인 개발자는 app의 동작을 관찰하기 원하면 각자의 터미널에 출력되는 이 스트림을 볼 수 있습니다.

스테이징이나 production 배포에서는 각 프로세스의 스트림은 실행 환경에 의해서 수집된 후, 앱의 다른 모든 스트림과 병합되어 열람하거나 보관하기 위한 하나 이상의 최종 목적지로 전달됩니다. 이러한 목적지들은 앱이 열람하거나 설정할 수 없지만, 대신 실행 환경에 의해서 완벽하게 관리됩니다. 이를 위해 오픈 소스 로그 라우터를 사용할 수 있습니다.(예: (Logplex, Fluentd))

앱의 이벤트 스트림은 파일로 보내지거나 터미널에서 실시간으로 보여질 수 있습니다. 가장 중요한 점은 스트림은 Splunk같은 로그 분석 시스템과 Hadoop/Hive같은 범용 데이터 보관소에 보내질 수 있다는 점입니다. 이러한 시스템은 장기간에 걸쳐 앱의 동작을 조사할 수 있는 강력함과 유연성을 가지게 됩니다.

  • 과거의 특정 이벤트를 찾기
  • 트렌드에 대한 거대한 규모의 그래프 (예: 분당 요청 수)
  • 유저가 정의한 휴리스틱에 따른 알림 (예: 분당 오류 수가 임계 값을 넘는 경우 알림을 발생시킴)

12. Admin 프로세스

admin/maintenance 작업을 일회성 프로세스로 실행

 

프로세스 포메이션은 애플리케이션의 일반적인 기능들(예: Web request의 처리)을 처리하기 위한 프로세스들의 집합 입니다. 이와는 별도로, 개발자들은 종종 일회성 관리나 유지 보수 작업이 필요합니다. 그 예는 아래와 같습니다.

  • 데이터베이스 마이그레이션을 실행합니다. (예: Django에서 manage.py migrate, Rail에서 rake db:migrate)
  • 임의의 코드를 실행하거나 라이브 데이터베이스에서 앱의 모델을 조사하기 위해 콘솔(REPL Shell로도 알려져 있는)을 실행합니다. 대부분의 언어에서는 인터프리터를 아무런 인자 없이 실행하거나(예: python, perl) 별도의 명령어로 실행(예: ruby의 irb, rails의 rails console)할 수 있는 REPL를 제공합니다.
  • 애플리케이션 저장소에 커밋된 일회성 스크립트의 실행 (예: php scripts/fix_bad_records.php)

일회성 admin 프로세스는 애플리케이션의 일반적인 오래 실행되는 프로세스들과 동일한 환경에서 실행되어야 합니다. 일회성 admin 프로세스들은 릴리즈를 기반으로 실행되며, 해당 릴리즈를 기반으로 돌아가는 모든 프로세스처럼 같은 코드베이스 설정를 사용해야 합니다. admin 코드는 동기화 문제를 피하기 위해 애플리케이션 코드와 함께 배포되어야 합니다.

모든 프로세스 타입들에는 동일한 종속성 분리 기술이 사용되어야 합니다. 예를 들어, 루비 웹 프로세스가 bundle exec thin start 명령어를 사용한다면, 데이터베이스 마이그레이션은 bundle exec rake db:migrate를 사용해야합니다. 마찬가지로, virtualenv를 사용하는 파이썬 프로그램은 tornado 웹 서버와 모든 manage.py admin 프로세스가 같은 virtualenv에서의 bin/python을 사용해야 합니다.

Twelve-Factor는 별도의 설치나 구성없이 REPL shell을 제공하는 언어를 강하게 선호합니다. 이러한 점은 일회성 스크립트를 실행하기 쉽게 만들어주기 때문입니다. 로컬 배포에서, 개발자는 앱을 체크아웃한 디렉토리에서 일회성 admin 프로세스를 shell 명령어로 바로 실행시킵니다. production 배포에서, 개발자는 ssh나 배포의 실행 환경에서 제공하는 다른 원격 명령어 실행 메커니즘을 사용하여 admin 프로세스를 실행할 수 있습니다.

posted by 여성게
:
Web/Spring 2019. 4. 2. 21:57

Spring boot - Redis를 이용한 HttpSession


오늘의 포스팅은 Spring boot 환경에서 Redis를 이용한 HttpSession 사용법입니다. 무슨 말이냐? 일반 Springframework와는 다르게 Spring boot 환경에서는 그냥 HttpSession을 사용하는 것이 아니고, Redis와 같은 in-memory DB 혹은 RDB(JDBC),MongoDB와 같은 외부 저장소를 이용하여 HttpSession을 이용합니다. 어떻게 보면 단점이라고 볼 수 있지만, 다른 한편으로는 장점?도 존재합니다. 일반 war 형태의 배포인 Dynamic Web은 같은 애플리케이션을 여러개 띄울 경우 세션 공유를 위하여 WAS단에서 Session Clustering 설정이 들어갑니다. 물론 WAS 설정에 익숙한 분들이라면 별 문제 없이 설정가능하지만, WAS설정 등에 미숙하다면 확실함 없이 구글링을 통하여 막 찾아서 설정을 할 것입니다. 물론 나쁘다는 것은 아닙니다. 벤치마킹 또한 하나의 전략이니까요. 하지만 Spring boot의 경우 Session Cluster를 위하여 별도의 설정은 필요하지 않습니다. 이중화를 위한 같은 애플리케이션 여러개가 HttpSession을 위한 같은 저장소만 바라보면 됩니다. 어떻게 보면 설정이 하나 추가된 것이긴 하지만 익숙한 application.properties등에 설정을 하니, 자동완성도 되고... 실수할 일도 줄고, 디버깅을 통해 테스트도 가능합니다. 크게 중요한 이야기는 아니므로 바로 예제를 들어가겠습니다.



테스트 환경

  • Spring boot 2.1.3.RELEASE(App1,App2)
  • Redis 5.0.3 Cluster(Master-6379,6380,6381 Slave-6382,6383,6384)

만약 Redis Cluster환경을 구성한 이유는, 프로덕 환경에서는 Redis 한대로는 위험부담이 있기때문에 고가용성을 위하여 클러스터 환경으로 테스트를 진행하였습니다. 한대가 죽어도 서비스되게 하기 위해서이죠. 만약 한대로 하시고 싶다면 한대로 진행하셔도 됩니다. 하지만 클러스터환경을 구성하고 싶지만 환경구성에 대해 잘 모르시는 분은 아래 링크를 참조하여 구성하시길 바랍니다.


▶︎▶︎▶︎2019/03/01 - [Redis] - Springboot,Redis - Springboot Redis Nodes Cluster !(레디스 클러스터)

▶︎▶︎▶︎2019/02/28 - [Redis] - Redis - Cluster & Sentinel 차이점 및 Redis에 대해


애플리케이션은 총 2대를 준비하였고, 한대를 클라이언트 진입점인 API G/W, 한대를 서비스 애플리케이션이라고 가정하고 테스트를 진행하였습니다. 즉, 두 애플리케이션이 하나의 세션을 공유할 수 있을까라는 궁금즘을 해결하기 위한 구성이라고 보시면 됩니다. 사실 같은 애플리케이션을 이중화 구성을 한 것이 아니고 별도의 2개의 애플리케이션끼리 세션을 공유해도 되는지는 아직 의문입니다. 하지만 다른 애플리케이션끼리도 HttpSession을 공유할 수 있다면 많은 이점이 있을 것같아서 진행한 테스트입니다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.session</groupId>
            <artifactId>spring-session-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
cs


두개의 애플리케이션에 동일하게 의존 라이브러리를 추가해줍니다. 저는 부트 프로젝트 생성시 Web을 체크하였고, 나머지 위에 4개는 수동으로 추가해주었습니다.


1
2
3
spring.session.store-type=redis
spring.redis.cluster.nodes=127.0.0.1:6379,127.0.0.1:6380,127.0.0.1:6381
 
cs


application.properties입니다. 이 또한 두개의 애플리케이션에 동일하게 넣어줍니다. 간단히 설정에 대해 설명하면 spring.session.store-type=redis는 HttpSession 데이터를 위한 저장소를 Redis를 이용하겠다는 설정입니다. Redis 말고도 MongoDB,JDBC등이 있습니다. 두번째 spring.redis.cluster.nodes=~설정은 저장소로 사용할 Redis의 클러스터 노드 리스트(마스터)를 넣어줍니다. 


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * Redis Cluster Config
 * @author yun-yeoseong
 *
 */
@Component
@ConfigurationProperties(prefix = "spring.redis.cluster")
public class RedisClusterConfigurationProperties {
    
    /**
     * spring.redis.cluster.nodes[0]=127.0.0.1:6379
     * spring.redis.cluster.nodes[1]=127.0.0.1:6380
     * spring.redis.cluster.nodes[2]=127.0.0.1:6381
     */
    private List<String> nodes;
 
    public List<String> getNodes() {
        return nodes;
    }
 
    public void setNodes(List<String> nodes) {
        this.nodes = nodes;
    }
    
    
}
cs


Redis 설정을 위하여 클러스터 노드리스트 값을 application.proerties에서 읽어올 빈입니다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Configuration
public class RedisConfig {
    /**
     * Redis Cluster 구성 설정
     */
    @Autowired
    private RedisClusterConfigurationProperties clusterProperties;
 
    /**
     * JedisPool관련 설정
     * @return
     */
    @Bean
    public JedisPoolConfig jedisPoolConfig() {
        return new JedisPoolConfig();
    }
    
    /**
     * Redis Cluster 구성 설정 - Cluster 구성
     */
    @Bean
    public RedisConnectionFactory jedisConnectionFactory(JedisPoolConfig jedisPoolConfig) {
        return new JedisConnectionFactory(new RedisClusterConfiguration(clusterProperties.getNodes()),jedisPoolConfig);
    }
        
}
cs


JedisPoolConfig 및 RedisConnectionFacotry 빈입니다. 아주 작동만 할 수 있는 기본입니다. 추후에는 적절히 설정값을 넣어서 성능 튜닝이 필요합니다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Slf4j
@EnableRedisHttpSession
@RestController
@SpringBootApplication
public class SessionWebTest1Application {
 
    public static void main(String[] args) {
        SpringApplication.run(SessionWebTest1Application.class, args);
    }
    
    @GetMapping("/request")
    public String getCookie(HttpSession session) {
        String sessionKey = session.getId();
        session.setAttribute("ID""yeoseong_yoon");
        log.info("set userId = {}","yeoseong_yoon");
        
        RestTemplate restTemplate = new RestTemplate();
        HttpHeaders header = new HttpHeaders();
        header.add("Cookie""SESSION="+redisSessionId);
        HttpEntity<String> requestEntity = new HttpEntity<>(null, header);
        
        ResponseEntity<String> cookieValue = restTemplate.exchange("http://localhost:8090/request",HttpMethod.GET ,requestEntity ,String.class);
        return "server1_sessionKey : "+session.getId()+"<br>server2_sessionKey : "+cookieValue.getBody();
    }
    
}
 
cs


App1의 클래스입니다. 우선 로그를 찍기위해 lombok 어노테이션을 사용하였고, Redis를 이용한 HttpSession 사용을 위해 @EnableRedisHttpSession 어노테이션을 선언하였습니다. 여기서 조금 특이한 점은 RestTemplate 요청에 SESSION이라는 쿠키값을 하나 포함시켜 보내는 것입니다. 잘 생각해보면 일반 웹프로젝트에서는 세션객체의 식별을 위해 JSESSIONID라는 쿠키값을 이용합니다. 이것과 동일한 용도로 Redis HttpSession은 SESSION이라는 쿠키값을 이용하여 자신의 HttpSession 객체를 식별합니다. 즉, App2에서도 동일한 HttpSession객체 사용을 위하여 SESSION 쿠키값을 보내는 것입니다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Slf4j
@EnableRedisHttpSession
@RestController
@SpringBootApplication
public class SessionWebTest2Application {
 
    public static void main(String[] args) {
        SpringApplication.run(SessionWebTest2Application.class, args);
    }
    
    @GetMapping("/request")
    public String getCookie(HttpSession session) {
        log.info("get userId = {}",session.getAttribute("ID"));
        System.out.println(session.getAttribute("ID"));
        System.out.println(session.getId());
        return session.getId();
    }
    
}
cs

App2번의 클래스입니다. App1에서 보낸 요청을 받기위한 컨트롤러가 존재합니다. 결과값으로는 HttpSession의 Id값을 리턴합니다. 그리고 App1의 컨트롤러에서는 App2번이 보낸 세션 아이디와 자신의 세션아이디를 리턴합니다. 


브라우저에서 요청한 최종 결과입니다. 두 애플리케이션의 HttpSession ID 값이 동일합니다.


각각 애플리케이션의 로그입니다. App1번에서 yeoseong_yoon이라는 데이터를 세션에 추가하였고, 해당 데이터를 App2번에서 잘 가져오는 것을 볼 수 있습니다.



마지막으로 Redis 클라이언트 명령어를 이용해 진짜 Redis에 세션관련 데이터가 들어가있는지 확인해보니 잘 들어가있습니다. (Redis serialization 설정을 적절히 맞추지 않아 yeoseong_yoon이라는 데이터 앞에 알 수 없게 인코딩된 데이터가 있내요..) 


여기까지 Spring boot환경에서 Redis를 이용한 HttpSession 사용방법이었습니다. 혹시 틀린점이 있다면 코멘트 부탁드립니다.

posted by 여성게
:
Middleware/Kafka&RabbitMQ 2019. 3. 30. 00:45

Kafka - Kafka Stream API(카프카 스트림즈) - 2



이전 카프카 스트림즈 포스팅에서는 간단하게 카프카 스트림즈 API를 다루어보았습니다. 이번 2번째 카프카 스트림즈 포스팅은 조금더 깊게 카프카 스트림즈에 대해 알아보려고 합니다.


Kafka Streams는 Kafka 프로듀서 및 컨슈머를 이용하여 들어오는 메시지를 즉각적으로 가공하여 또 다른 토픽으로 메시지를 내보낼 수 있습니다. 이러한 카프카 스트림즈를 사용하는 기업들을 소개하자면 New York Times는 카프카와 카프카 스트림즈를 이용하여 독자들을 위한 실시간 컨첸츠를 저장하고 배포합니다.그리고 라인 같은 경우는 서비스끼리 통신하기 위한 중앙 데이터 허브로 카프카를 사용합니다. 그리고 카프카 스트림즈를 이용하여 토픽을 데이터를 안정적으로 가공하고 필터링하여 컨슈머가 효율적으로 메시지를 컨슘할 수 있게 합니다. 이러한 많은 기업들이 안정적으로 실시간 데이터를 처리하기 위해 카프카와 카프카 스트림즈를 이용합니다.



카프카 메시징 계층은 데이터를 저장하고 전송하기 위해 데이터를 파티션 단위로 분할한다. 카프카 스트림즈 역시 파티션된 토픽을 기반으로 병렬 처리 모델의 논리 단위로 작업을 진행한다. 카프카 스트림즈는 키/값 형태의 스트림 데이터이므로 스트림 데이터의 키값으로 토픽내 특정 파티션으로 라우팅된다. 일반적으로 파티션 개수만큼 컨슈머 그룹안의 컨슈머들이 생성되듯이 카프카 스트림즈도 역시 파티션 개수만큼 태스크가 생성되어 병렬로 데이터 스트림을 처리할 수 있다. 또한 작업스레드 수를 조정할 수 있어 더욱 효율적인 병렬처리가 가능하다.


Required configuration parameters


  • application.id - 스트림 처리 응용 프로그램의 식별자 아이디이다. Kafka 클러스터 내에서 유일한 값이어야 한다.
  • bootstrap.servers - Kafka 인스턴스 호스트:포트 정보이다.


Optional configuration parameters(중요도 보통이상의 설정값)


  • cache.max.bytes.buffering - 모든 스레드에서 레코드 캐시에 사용할 최대 메모리 바이트 수입니다.(default 10485760 bytes)
  • client.id - 요청시 서버에 전달할 ID 문자열이다. 카프카 스트림즈 내부적으로 프로듀서,컨슈머에게 전달된다(default empty string)
  • default.deserialization.exception.handler - DeserializationExceptionHandler인터페이스를 구현하는 예외 처리 클래스이다.

(default  LogAndContinueExceptionHandler)

  • default.production.exception.handler - ProductionExceptionHandler인터페이스를 구현하는 예외 처리 클래스이다.

  (default DefaultProductionExceptionHandler)

  • key.serde - record key에 대한 직렬화/역직렬화 클래스이다. Serde 인터페이스를 구현한다.(default Serdes.ByteArray().getClass().getName() )
  • num.standby.replicas - 각 태스크의 대기 복제본 수이다.(default 0)
  • num.stream.threads - 스트림을 처리할 스레드 수이다.(default 1)
  • replication.factor - 복제본 수이다.(default 1)
  • retries - 처리 실패시 재시도 횟수(default 0)
  • retry.backoff.ms - 재시도 요청 간격 시간(default 100밀리세컨드)
  • state.dir - 상태 저장소 디렉토리 위치(default /tmp/kafka-streams)
  • value.serde - record value에 대한 직렬화/역직렬화 클래스이다. key.serde와 동일.


default.deserialization.exception.handler


기본 deserialization 예외 처리기를 사용하면 deserialize에 실패한 레코드의 예외를 관리할 수 있다. 예외 처리기는 throw된 예외에 따라 FAIL 또는 CONTINUE를 반환해야한다. FAIL은 스트림이 종료될 것이고, CONTINUE는 예외를 무시하고 계속해서 처리를 진행할 것이다. 내부적으로 제공하는 예외 핸들어가 있다.


  • LogAndContinueExceptionHandler - 예외가 발생하여도 계속해서 프로세스를 진행한다.
  • LogAndFailExceptionHandler - 예외가 발생하면 프로세스를 중지한다.

기본적으로 제공하는 핸들러 이외에도 사용자가 직접 정의하여 예외 핸들러를 구현할 수 있다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class SendToDeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {
    KafkaProducer<byte[], byte[]> dlqProducer;
    String dlqTopic;
 
    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
 
        log.warn("Exception caught during Deserialization, sending to the dead queue topic; " +
            "taskId: {}, topic: {}, partition: {}, offset: {}",
            context.taskId(), record.topic(), record.partition(), record.offset(),
            exception);
 
        dlqProducer.send(new ProducerRecord<>(dlqTopic, record.timestamp(), record.key(), record.value(), record.headers())).get();
 
        return DeserializationHandlerResponse.CONTINUE;
    }
 
    @Override
    public void configure(final Map<String, ?> configs) {
        dlqProducer = .. // get a producer from the configs map
        dlqTopic = .. // get the topic name from the configs map
    }
}
cs


위의 예제코드는 예외가 발생하면 DLQ 토픽에 메시지를 보낸 이후 계속해서 프로세스를 진행하는 예외 핸들러이다. 위에서 이야기하였듯이 반드시 FAIL,CONTINUE 둘중하나를 반환해야 한다.


  • default.production.exception.handler 


프로덕션 예외 처리기를 사용하여 너무 큰 메시지 데이터를 생성하는 등 브로커와 상호 작용하려고 할때 트리거되는 예외를 컨트롤 할 수 있습니다. 기본적으로 카프카는 DefaultProductionExceptionHandler를 제공한다. 이 예외 처리기도 역시 FAIL,CONTINUE등 하나를 반환하여 프로세스 진행여부를 결정해주어야 한다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;
 
public class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler {
    public void configure(Map<String, Object> config) {}
 
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        if (exception instanceof RecordTooLargeException) {
            return ProductionExceptionHandlerResponse.CONTINUE;
        } else {
            return ProductionExceptionHandlerResponse.FAIL;
        }
    }
}
 
Properties settings = new Properties();
 
// other various kafka streams settings, e.g. bootstrap servers, application id, etc
 
settings.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
             IgnoreRecordTooLargeHandler.class);
cs



Stream DSL


  • branch - 제공되는 predicate에 따라서 적절히 여러개의 KStream으로 분할한다.(KStream[] 반환)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
 * branch
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam {
 
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        source.foreach((key,value)->log.info("key,value ={}",key+" "+value));
        KStream<StringString>[] streamArr = source.branch(
            (key,value) -> value.startsWith("A"),
            (key,value) -> value.startsWith("B"),
            (key,value) -> true
        );
        
        streamArr[0].to("stream-exam-output1");
        streamArr[1].to("stream-exam-output2");
        streamArr[2].to("stream-exam-output3");
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


위의 코드는 stream-exam-input이라는 토픽에 "A"로 시작하는 데이터가 들어오면 stream-exam-output1, "B"로 시작하는 데이터가 들어오면 stream-exam-output2, 그 밖의 데이터는 stream-exam-output3으로 보내는 간단한 branch 예제이다. branch에는 적절하게 Predicate를 리스트 형태로 작성한다. 데이터의 값에 따라 다른 처리를 해야할때에는 branch를 사용하면 각각 다른 처리를 하는 토픽으로 메시지를 내보낼 수 있을 것 같다.


  • filter - 적절한 Predicate을 인자로 받아 메시지를 필터링한다.(KStream 반환)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
 * filter
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam2 {
 
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        source.filter( (key,value)->value.length()>3 ).to("stream-exam-output");;
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


위의 예제는 토픽으로 메시지를 입력받아 value 값의 길이가 3보다 크다면 stream-exam-output 토픽으로 메시지를 내보내는 예제이다.


  • filterNot - filter와 반대되는 개념이다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
 * filterNot
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam3 {
 
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        source.filterNot( (key,value)->value.length()>3 ).to("stream-exam-output");;
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs

fliter와는 반대되는 개념이다. 크게 설명할 부분을 없을 것같다.

  • flatMap - 하나의 레코드를 이용해 0개 혹은 하나 이상의 레코드를 생성한다. flatMap은 key,value가 바뀔 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
 * flatMap
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam4 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        source.flatMap(
            new KeyValueMapper() {
                @Override
                public Object apply(Object key, Object value) {
                    List<KeyValue<Object, Object>> list = new LinkedList<>();
                    String keyStr = (String)key;
                    String valueStr = (String)value;
                    list.add(KeyValue.pair(valueStr.toUpperCase(),value));
                    list.add(KeyValue.pair(valueStr.toLowerCase(),value));
                    return list;
                }
            }
        ).to("stream-exam-output");;
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs

위의 예제는 메시지를 받아서 해당 메시지의 값을 대문자,소문자를 키로하여 2개의 레코드를 만드는 예제이다.

  • flatMapValues - 원본 레코드의 키를 유지하면서 하나의 레코드를 가져와서 0개 혹은 하나 이상의 레코드를 생성한다. value의 값과 값의 타입이 바뀔 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
 * flatMapValue
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam5 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        source.flatMapValues(value->Arrays.asList(value.split(" "))).to("stream-exam-output");
//        source.flatMapValues(
//                new ValueMapper() {
//                    @Override
//                    public Object apply(Object value) {
//                        String valueStr = (String)value;
//                        return Arrays.asList(valueStr.split(" "));
//                    }
//                }
//        );
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs

위의 예제는 메시지의 값에 공백을 포함하고 있으면 해당 메시지를 공백기준으로 split하여 list로 반환한다. 그럼 해당 List는 flat되어 여러개의 String Stream으로 변환되어 stream-exam-output 토픽으로 메시지가 전달된다.

  • GroupByKey - 기존 키로 레코드를 그룹핑한다. 스트림이나 테이블을 집계하고 후속작업을 위해 키로 그룹화하여 파티션하는 것을 보장한다. 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
 * GroupByKey
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam6 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        KGroupedStream<StringString> groupedStream = source.flatMap(
            new KeyValueMapper() {
                @Override
                public Object apply(Object key, Object value) {
                    List<KeyValue<Object, Object>> list = new LinkedList<>();
                    String keyStr = (String)key;
                    String valueStr = (String)value;
                    list.add(KeyValue.pair(value,valueStr.toUpperCase()));
                    list.add(KeyValue.pair(value,valueStr.toLowerCase()));
                    return list;
                }
            }
        ).groupByKey();
        
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


스트림의 기존 키값을 이용하여 그룹핑한다.


GroupBy - 새로운 키로 그룹핑한다. 테이블을 그룹핑할 때는 새로운 값과 값 유형을 지정할 수도 있다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
 * GroupBy
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam7 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        KGroupedStream<StringString> groupedStream = source.groupBy(
                (key,value)->value, Serialized.with(Serdes.String(),Serdes.String())
        );
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


위의 예제와 같이 기존 스트림에서 받은 키값을 그대로 사용하는 것이 아니라 변경하여 사용할 수 있다.(예제는 value를 키로 사용한다)


  • map - 하나의 레코드를 가져와서 다른 하나의 레코드를 생성한다. 타입을 포함하여 키와 값을 수정할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
 * map
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam8 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        source.map(
            (key,value)->KeyValue.pair(value, key)
        );
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


메시지를 받아서 키와 값을 바꿔주는 map 예제이다.


  • mapValues - 하나의 레코드를 받아서 값을 바꿔준다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
 * mapValues
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam9 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        
        source.mapValues(
            (value)->value+"_map"
        ).to("stream-exam-output");
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


메시지를 받아서 값에 "_map"이라는 문자열을 추가하여 수정하였다.


  • merge - 두 스트림의 레코드를 하나의 스트림으로 merge한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
 * merge
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam10 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
        source.foreach((key,value)->log.info("key,value ={}",key+" "+value));
        KStream<StringString>[] streamArr = source.branch(
            (key,value) -> value.startsWith("A"),
            (key,value) -> value.startsWith("B"),
            (key,value) -> true
        );
        
        KStream<StringString> merge1 = streamArr[0].merge(streamArr[1]);
        merge1.merge(streamArr[2]).to("stream-exam-output");
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs



  • selectKey - 각 레코드에 새키를 할당한다. 마치 map을 이용하여 key값을 바꾸는 것과 동일하다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
 * selectKey
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaStreamExam11 {
 
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-exam");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("stream-exam-input");
 
        source.selectKey(
  //리턴되는 값이 키가된다.
            (key,value)->value.charAt(0)+""
        ).to("stream-exam-output");
            
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        streams.start();
    }
 
}
 
cs


  • toStream - KTable을 스트림으로 가져온다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Slf4j
public class WordCounter {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
      
        final StreamsBuilder builder = new StreamsBuilder();
        
        NoriAnalyzer analyzer = new NoriAnalyzer();
      
        KStream<StringString> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(analyzer.analyzeForString(value).split(" ")))
                //return하는 value가 키가 된다. 리턴값으로 그룹핑된 카프카스트림 객체를 리턴한다. KGroupedStream
                .groupBy(new KeyValueMapper<StringStringString>() {
                  @Override
                  public String apply(String key, String value) {
                    log.info("key = {},value = {}",key,value);
                    return value;
                  }
                })
                //count()를 호출하여 해당 키값으로 몇개의 요소가 있는지 체크한다. 그리고 해당 데이터를 스토어에(KeyValueStore<Bytes,byte[]> counts-store) 담고
                //변경이 있는 KTable에 대해서만 결과값을 리턴한다.
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
                .toStream() //KTable -> Stream
                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
      
        final Topology topology = builder.build();
        System.out.println(topology.describe());
      
        final KafkaStreams streams = new KafkaStreams(topology, props);
      
        try {
          streams.start();
        } catch (Throwable e) {
          System.exit(1);
        }
    }
}
 
cs


지금까지는 메시지의 상태가 없는 StreamDSL이었다. 다음으로 알아볼 DSL은 이전 메시지의 상태를 참조하는 상태기반 스트림이다. 아마 실시간으로 데이터 스트림을 처리할때 상태가 필요없는 처리도 있겠지만, 오늘하루 사용자가 많이 문의한 단어등 단어의 빈도수를 계산해야하는 스트림이 있다고 해보자. 그렇다면 지금 이순간 전까지 해당 단어는 몇번이 누적되었는지를 알아야 이시간 이후로 들어오는 같은 단어도 이전 빈도수에 누적하여 통계를 낼 것이다. 이렇게 상태에 기반하여 처리해야하는 스트림을 처리하는 것이 상태기반 스트림이다.



다음 포스팅에서 다루어볼 내용은 Stateful transformations이다. 레퍼런스의 양이 굉장히 많은 부분으로 조금 정리한 후 포스팅하려한다.


posted by 여성게
:
Middleware/Kafka&RabbitMQ 2019. 3. 30. 00:45

kafka is a distributed streaming platform

이번 포스팅은 Spring Cloud Stream 환경에서의 kafka Streams API입니다. 물론 이전 포스팅들에서 자바코드로 카프카 스트림즈를 다루어봤지만 이번에는 스프링 클라우드 스트림즈 환경에서 진행합니다. 카프카 스트림즈에 대한 설명은 이전 포스팅에서 진행하였기에 개념적인 설명은 하지 않고 코드레벨로 다루어보겠습니다. 혹시나 카프카 스트림즈를 모르시는 분이 있으시다면 아래 링크를 참조하시길 바랍니다.

지금부터 진행될 예제 및 설명은 모두 Spring Cloud Stream Reference를 참조하였습니다. 번역 중 오역이 있을 수 있습니다.

▶︎▶︎▶︎2019/03/26 - [Kafka&RabbitMQ] - Kafka - Kafka Streams API(카프카 스트림즈)

 

Kafka - Kafka Streams API(카프카 스트림즈)

Kafka - Kafka Streams API(카프카 스트림즈) 카프카는 대규모 메시지를 저장하고 빠르게 처리하기 위해 만들어진 제품이다. 초기 사용 목적과는 다른 뛰어난 성능에 일련의 연속된 메시지인 스트림을 처리하는..

coding-start.tistory.com

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
<?xml version="1.0" encoding="UTF-8"?>
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>kafka-streams-exam</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-streams-exam</name>
    <description>Demo project for Spring Boot</description>
 
    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR1</spring-cloud.version>
    </properties>
 
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
 
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
 
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
 
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
 
</project>
 
http://colorscripter.com/info#e" target="_blank" style="color:#e5e5e5; text-decoration:none">Colored by Color Scripter
http://colorscripter.com/info#e" target="_blank" style="text-decoration:none; color:white">cs

의존성을 추가해줍니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@SpringBootApplication
@EnableBinding(KStreamProcessor.class)
public class WordCountProcessorApplication {
 
    @StreamListener("input")
    @SendTo("output")
    public KStream<?, WordCount> process(KStream<?, String> input) {
        return input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("WordCounts-multi"))
                .toStream()
                .map((key, value) -> new KeyValue<>(nullnew WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
    }
 
    public static void main(String[] args) {
        SpringApplication.run(WordCountProcessorApplication.class, args);
    }
}
cs

해당 코드를 레퍼런스에서 소개하는 카프카 스트림즈의 예제코드이다 토픽에서 메시지를 받아서 적절하게 데이터를 가공하여 특정 토픽으로 데이터를 내보내고 있다 내부 로직은 마치 Java1.8의 Stream와 비슷한 코드같이 보인다 메소드를 보면 매개변수와 반환타입이 모두 KStream객체다 나가고 들어노는 객체의 타입은 하나로 고정되어있으므로 비지니스로직에 조금이라도 더 집중할 수 있을 것 같다.

 

Configuration Option

Kafka Streams Properties는 모두 spring.cloud.stream.kafka.streams.binder 접두어가 붙는다. 

 

  • brokers - broker URL
  • zkNodes - zookeeper URL
  • serdeError - 역 직렬화 오류 처리 유형. logAndContinue,logAndFail,sendToDlq 
  • applicationId - 애플리케이션에서 사용하는 카프카 스트림 아이디값. 모든 카프카 스트림의 아이디값은 유일해야한다.

Producer Properties

  • keySerde - key serde, 키의 직렬화 옵션(default value = none)
  • valueSerde -  value serde, 값의 직렬화 옵션 (default value = none)
  • userNativeEncoding - native 인코딩 활성화 플래그(default value = false)

Consumer Properties

  • keySerde - key serde, 키의 역직렬화 옵션(default value = none)
  • valueSerde - value serde, 값의 역직렬화 옵션(default value = none)
  • materializedAs - KTable 타입을 사용할 때 상태저장소를 구체화한다.(default value = none)
  • useNativeDecoding - native 디코드 활성화 플래그(default value = false)
  • dlqName - DLQ 토픽 이름(default value = none)

기타 다른 설정들은 레퍼런스 참고하시길 바랍니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
 * in,out channel defined
 * 즉, 카프카 토픽에 메시지를 쓰는 발신 채널과
 * 카프카 토픽에서 메시지를 읽어오는 수신 채널을 정의해주는 것이다.
 * 꼭 @Input & @Output 어노테이션을 하나씩 넣을 필요는 없다.
 * 필요한 채널수만큼 정의가능하다.
 * 
 * 런타임동안에는 스프링이 구현체를 제공해준다.
 * @author yun-yeoseong
 *
 */
public interface ExamProcessor {
    String INPUT = "exam-input";
    String OUTPUT = "exam-output";
    String OUTPUT2 = "exam-output2";
    
    /**
     * @Input 어노테이션 설명
     * 메시지 시스템에서 메시지를 읽어오는 입력채널 
     * 매겨변수로 채널이름을 넣을 수 있다. 넣지 않으면 기본으로
     * 메소드 이름으로 들어간다.
     * @return
     */
    @Input(INPUT)
    SubscribableChannel inboundChannel();
    
    @Input("streams-input")
    KStream<?, String> inboundChannel2();
    
    @Input("streams-input2")
    KStream<?, String> inboundChannel3();
    /**
     * @Output 어노테이션 설명
     * 메시지 시스템으로 메시지를 보내는 출력채널
     * 매겨변수로 채널이름을 넣을 수 있다. 넣지 않으면 기본으로
     * 메소드 이름으로 들어간다.
     * @return
     */
    @Output(OUTPUT)
    MessageChannel outboundChannel();
    
    @Output(OUTPUT2)
    MessageChannel outboundChannel2();
    
    @Output("streams-output")
    KStream<?, String> outboundChannel3();
    
}
 
cs

카프카 스트림즈를 스프링 클라우드 스트림에서 사용하려면 input&output을 KStream을 반환타입으로 채널을 정의한다. 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Slf4j
@EnableBinding(ExamProcessor.class)
@SpringBootApplication
public class KafkaStreamsExamApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamsExamApplication.class, args);
    }
    
    @StreamListener(target= ExamProcessor.INPUT,condition="headers['producerName']=='yeoseong'")
    public void receive(@Payload Exam exam) {
        log.info("Only Header value yeoseong = {}",exam.toString());
    }
    
    @StreamListener("streams-input")
    @SendTo("streams-output")
    public KStream<?, String> streams(KStream<?, String> input){
        return input.flatMapValues(new ValueMapper() {
            @Override
            public Object apply(Object value) {
                // TODO Auto-generated method stub
                String valueStr = (String)value;
                System.out.println(valueStr);
                return Arrays.asList(valueStr.split(" "));
            }
        });
    }
    
    @StreamListener("streams-input2")
    public void streams2(KStream<?, String> input) {
        System.out.println("streams2");
        input.foreach( (key,value) -> System.out.println(value));
    }
}
 
cs

위에서 보이듯이 sink processor라면 그 밑으로 더 이상 processor가 붙지 않으므로 void 반환타입으로 데이터를 입력받아 적절한 처리를 하면된다. 하지만 밑으로 스트림이 더 붙는 스트림들은 반환타입으로 KStream을 반환해야한다. 지금까지 간단하게 Kafka Streams API를 Spring Cloud Stream에서 사용해보았다. kafka streams DSL의 자세한 문법은 곧 포스팅할 카프카 스트림즈 API에 대한 글을 참고하시거나 카프카 레퍼런스를 참고하시면 될듯합니다. 

 

posted by 여성게
:
Middleware/Kafka&RabbitMQ 2019. 3. 28. 16:41

Kafka - Spring cloud stream kafka(스프링 클라우드 스트림 카프카)



이전 포스팅까지는 카프카의 아키텍쳐, 클러스터 구성방법, 자바로 이용하는 프로듀서,컨슈머 등의 글을 작성하였다. 이번 포스팅은 이전까지 작성된 지식을 바탕으로 메시징 시스템을 추상화한 구현체인 Spring Cloud Stream을 이용하여 카프카를 사용하는 글을 작성하려고 한다. 혹시라도 카프카에 대해 아직 잘모르는 사람들이 이 글을 본다면 이전 포스팅을 한번 참고하고 와도 좋을 것같다.(이번에 작성하는 포스팅은 Spring Cloud stream 2.0 레퍼런스 기준으로 작성하였다.)

그리고 이번 포스팅에서 진행하는 모든 예제는 카프카를 미들웨어로 사용하는 예제이고, 카프카는 클러스터를 구성하였다.


▶︎▶︎▶︎Spring Cloud Stream v2.0 Reference

▶︎▶︎▶︎2019/03/12 - [Kafka&RabbitMQ] - Kafka - Kafka(카프카)의 동작 방식과 원리

▶︎▶︎▶︎2019/03/13 - [Kafka&RabbitMQ] - Kafka - Kafka(카프카) cluster(클러스터) 구성 및 간단한 CLI사용법

▶︎▶︎▶︎2019/03/16 - [Kafka&RabbitMQ] - Kafka - Kafka Producer(카프카 프로듀서) In Java&CLI

▶︎▶︎▶︎2019/03/24 - [Kafka&RabbitMQ] - Kafka - Kafka Consumer(카프카 컨슈머) Java&CLI

▶︎▶︎▶︎2019/03/26 - [Kafka&RabbitMQ] - Kafka - Kafka Streams API(카프카 스트림즈)


스프링 클라우드 스트림을 이용하면 RabbitMQ,Kafka와 같은 미들웨어 메시지 시스템과는 종속적이지 않게 추상화된 방식으로 메시지 시스템을 이용할 수 있다.(support RabbitMQ,Kafka) 아래 그림은 스프링 클라우드 스트림의 애플리케이션 모델이다.

스프링 클라우드 스트림 애플리케이션과 메시지 미들웨어 시스템은 직접 붙지는 않는다. 중간에 스프링 클라우드 스트림이 제공하는 바인더 구현체를 중간에 두고 통신을 하기 때문에 애플리케이션에서는 미들웨어 독립적으로 추상화된 방식으로 개발 진행이 가능하다. 그리고 애플리케이션과 바인더는 위의 그림과 같이 inputs, outputs 채널과 통신을 하게 된다.

  • Binder : 외부 메시징 시스템과의 통합을 담당하는 구성 요소입니다.
  • Binding(input/output) : 외부 메시징 시스템과 응용 프로그램 간의 브리지 (대상 바인더에서 생성 한 메시지 생성자  소비자 ).
  • Middleware : RabbitMQ, Kafka와 같은 메시지 시스템.

바인더 같은 경우는 스프링이 설정에서 읽어 미들웨어에 해당하는 바인더를 구현체로 제공해준다. 물론 RabbitMQ, Kafka를 동시에 사용 가능하다. 그리고 바인딩 같은 경우는 기본으로 Processor(input,output),Source(output),Sink(input)라는 바인딩을 인터페이스로 제공한다. 이러한 스프링 클라우드 스트림을 이용하여 작성한 완벽히 동작하는 코드의 예제이다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@SpringBootApplication
@EnableBinding(Processor.class)
public class MyApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }
 
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public String handle(String value) {
        System.out.println("Received: " + value);
        return value.toUpperCase();
    }
}
cs


이 코드는 Processor라는 바인딩 채널을 사용하고, handle이라는 메소드에서 Processor의 input 채널에서 메시지를 받아서 값을 한번 출력하고 그대로 output 채널로 메시지를 보내는 코드이다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface Sink {
 
  String INPUT = "input";
 
  @Input(Sink.INPUT)
  SubscribableChannel input();
 
}
public interface Source {
 
  String OUTPUT = "output";
 
  @Output(Source.OUTPUT)
  MessageChannel output();
 
}
 
public interface Processor extends Source, Sink {}
cs


스프링 클라우드 스트림에서 기본적으로 제공하는 바인딩 채널이다. 만약 이 채널들 이외에 채널을 정의하고 싶다면 위와 같이 인터페이스로 만들어주고, @EnableBinding에 매개변수로 넣어주면된다.(매개변수는 여러개의 인터페이스를 받을 수 있다.)


1
2
3
4
5
6
7
8
9
10
11
public interface Barista {
 
    @Input
    SubscribableChannel orders();
 
    @Output
    MessageChannel hotDrinks();
 
    @Output
    MessageChannel coldDrinks();
}
cs

또한 채널 바인딩 어노테이션(@Input,@Output) 매개변수로 채널이름을 넣어줄 수 있다.

1
@EnableBinding(value = { Orders.class, Payment.class })
cs


스프링 클라우드 스트림에서 바인딩 가능한 채널타입은 MessageChannel(inbound)와 SubscribableChannel(outbound) 두개이다.

1
2
3
4
5
6
public interface PolledBarista {
 
    @Input
    PollableMessageSource orders();
    . . .
}
cs


지금까지는 이벤트 기반 메시지이지만 위처럼 Pollable한 채널을 바인딩 할 수 있다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Autowire
private Source source
 
public void sayHello(String name) {
    source.output().send(MessageBuilder.withPayload(name).build());
}
 
@Autowire
private MessageChannel output;
 
public void sayHello(String name) {
    output.send(MessageBuilder.withPayload(name).build());
}
 
@Autowire
@Qualifier("myChannel")
private MessageChannel output;
cs


정의한 채널에 대한 @Autowired하여 직접 사용도 가능하다. 그리고 @Qualifier 어노테이션을 이용해 다중 채널이 정의되어 있을 경우 특정한 채널을 주입받을 수 있다.


@StreamListener 사용

스프링 클라우드 스트림은 다른 org.springframework.messaging의 어노테이션도 같이 사용가능하다.(@Payload,@Headers,@Header)


1
2
3
4
5
6
7
8
9
10
11
@EnableBinding(Sink.class)
public class VoteHandler {
 
  @Autowired
  VotingService votingService;
 
  @StreamListener(Sink.INPUT)
  public void handle(Vote vote) {
    votingService.record(vote);
  }
}
cs


1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
@Slf4j
public class KafkaListener {
    
    @StreamListener(ExamProcessor.INPUT)
    @SendTo(ExamProcessor.OUTPUT2)
    public Exam listenMessage(@Payload Exam payload,@Header("contentType"String header) {
        log.info("input message = {} = {}",payload.toString(),header);
        return payload;
    }
}
 
=>결과 : input message = Exam(id=id_2, describe=test message != application/json
 
cs


또한 @SendTo 어노테이션을 추가로 붙여서 메시지를 수신하고 어떠한 처리를 한 후에 메시지를 출력채널로 내보낼 수 있다.


1
2
3
4
5
6
7
8
9
10
11
12
@EnableBinding(Processor.class)
public class TransformProcessor {
 
  @Autowired
  VotingService votingService;
 
  @StreamListener(Processor.INPUT)
  @SendTo(Processor.OUTPUT)
  public VoteResult handle(Vote vote) {
    return votingService.record(vote);
  }
}
cs


바로 위의 코드는 즉, 인바운드 채널에서 메시지를 받아서 그대로 해당 데이터를 리턴하여 아웃바운드 채널에 내보내는 예제이다. 실제로는 메소드 내에 비지니스 로직이 들어가 적절히 데이터 조작이 있을 수 있다.


@StreamListener for Content-based routing

조건별로 @StreamListener 주석처리가 된 인입채널 메소드로 메시지를 유입시킬 수 있다. 하지만 이 기능에는 밑에와 같은 조건을 충족해야한다.


  • 반환값이 있으면 안된다.
  • 개별 메시지 처리 메소드여야한다.

조건은 어노테이션 내의 condition 인수에 SpEL 표현식에 의해 지정된다. 그리고 조건과 일치하는 모든 핸들러는 동일한 스레드에서 호출되며 핸들러에 대한 호출이 발생하는 순서는 가정할 수 없다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class TestPojoWithAnnotatedArguments {
 
    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bogey'")
    public void receiveBogey(@Payload BogeyPojo bogeyPojo) {
       // handle the message
    }
 
    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bacall'")
    public void receiveBacall(@Payload BacallPojo bacallPojo) {
       // handle the message
    }
}
cs


위에서 얘기한 것과 같이 조건별로 메시지를 분기하는 경우 해당 채널로 @StreamListener된 메소드들은 모두 반환값이 void여야한다.(Sink.INPUT) 만약 위에서 위는 void, 아래에는 반환값이 있는 메소드로 선언한다면 예외가 발생한다.



Error Handler

Spring Cloud Stream은 오류 처리를 유연하게 처리하는 메커니즘을 제공한다. 오류 처리에는 크게 두가지가 있다.

  • application : 사용자 정의 오류처리이며, 애플리케이션 내에서 오류 처리를 진행한다.
  • system : 오류 처리가 기본 메시징 미들웨어의 기능에 따라 달라진다.


Application Error Handler


애플리케이션 레벨 오류 처리에는 두 가지 유형이 있다. 오류는 각 바인딩 subscription에서 처리되거나 전역 오류 핸들러가 모든 바인딩 subscription의 오류를 처리한다. 각 input 바인딩에 대해, Spring Cloud Stream은 <destinationName>.<groupName>.errors 설정으로 전용 오류 채널을 생성한다.


1
spring.cloud.stream.bindings.input.group=myGroup
cs


컨슈머 그룹을 설정한다.(만약 그룹명을 지정하지 않았을 경우 익명으로 그룹이 생성된다.)


1
2
3
4
5
6
7
8
9
@StreamListener(Sink.INPUT) // destination name 'input.myGroup'
public void handle(Person value) {
    throw new RuntimeException("BOOM!");
}
//만약 Sink.INPUT의 input 채널이름이 input이고, 해당 채널의 consumer group이 myGroup이라면
//해당 채널 단독의 에러 처리기의 inputChannel 매개변수의 값은 아래와 같다.("input.myGroup.errors")
@ServiceActivator(inputChannel = "input.myGroup.errors"//channel name 'input.myGroup.errors'
public void error(Message<?> message) {
    System.out.println("Handling ERROR: " + message);
}
cs


위와 같은 코드를 작성하면 Sink.INPUT.myGroup 채널의 전용 에러채널이 생기는 것이다. 하지만 이렇게 채널별이 아닌 전역 에러 처리기를 작성하려면 아래와 같은 코드를 작성하면 된다.


1
2
3
4
@StreamListener("errorChannel")
public void error2(Message<?> message) {
    log.error("Global Error Handling !");
}
cs


System Error Handling


System 레벨의 오류 처리는 오류가 메시징 시스템에 다시 전달되는 것을 의미하며, 모든 메시징 시스템이 동일하지는 않다. 즉, 바인더마다 기능이 다를 수 있다. 내부 오류 처리기가 구성되어 있지 않으면 오류가 바인더에 전파되고 바인더가 해당 오류를 메시징 시스템에 전파한다. 메시징 시스템의 기능에 따라 시스템은 메시지를 삭제하고 메시지를 다시 처리하거나 실패한 메시지를 DLQ로 보낼 수 있다. 해당 내용은 뒤에서 더 자세히 다룬다.


바인딩 정보 시각화


1
2
3
4
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
cs


의존성을 추가해준다.


1
management.endpoints.web.exposure.include=bindings
cs


application.propertis에 해당 설정을 추가해준 후에, host:port/actuator/bindings 를 호출하면 바인딩된 정보를 JSON형태로 받아볼 수 있다.



Binder Configuration Properties


Spring Auto configuration을 사용하지 않고 사용자 정의 바인더를 등록할때 다음 등록 정보들을 사용할 수 있다. 이 속성들은 모두 org.springframework.cloud.stream.config.BinderProperties 패키지에 정의되어있다. 그리고 해당 설정들은 모두 spring.cloud.stream.binders. 접두어가 붙는다. 밑에서 설명할 설정들은 모두 접두어가 생략된 형태이므로 실제로 작성할때는 접두어를 붙여준다.


  • type

바인더의 타입을 지정한다.(rabbit,kafka)

  • inheritEnvironment

애플리케이션 자체 환경을 상속하는지 여부

기본값 : true

  • environment

바인더 환경을 사용자가 직접 정의하는데 사용할 수 있는 설정이다.

기본값 : empty

  • defaultCandidate

바인더 구성이 기본 바인더로 간주되는지 또는 명시적으로 참조할 때만 사용할 수 있는지 여부 이 설정을 사용하면 기본 처리를 방해하지 않고 바인더 구성이 가능하다.

기본값 : true



Common Binding Properties


binding 설정입니다. 해당 설정은 spring.cloud.stream.bindings.<channelName> 접두어가 붙는다. 밑에서 설명할 설정은 모두 접두어가 생략된 설정이므로, 직접 애플리케이션을 설정할때에는 꼭 접두어를 붙여줘야한다.


  • destination

해당 채널을 메시지 시스템 토픽과 연결해주는 설정이다. 채널이 consumer로 바인딩되어 있다면, 여러 대상에 바인딩 될 수 있으며 대상 이름은 쉼표로 구분된 문자열이다.

  • group

컨슈머 그룹명설정이다. 해당 설정은 인바운드 바인딩에만 적용되는 설정이다.

기본값 : null

  • contentType

메시지의 컨텐츠 타입이다.

기본값 : null

  • binder

사용될 바인더를 설정한다.

기본값 : null



Consumer Properties

  • concurrency

인바운드 소비자의 동시성

기본값 : 1.

  • partitioned

컨슈머가 파티션된 프로듀서로부터 데이터를 수신하는지 여부입니다.

기본값 : false.

  • headerMode

none으로 설정하면 입력시 헤더 구문 분석을 사용하지 않습니다. 기본적으로 메시지 헤더를 지원하지 않으며 헤더 포함이 필요한 메시징 미들웨어에만 유효합니다. 이 옵션은 원시 헤더가 지원되지 않을 때 비 Spring Cloud Stream 애플리케이션에서 데이터를 사용할 때 유용합니다. 로 설정 headers하면 미들웨어의 기본 헤더 메커니즘을 사용합니다. 로 설정 embeddedHeaders하면 메시지 페이로드에 헤더가 포함됩니다.

기본값 : 바인더 구현에 따라 다릅니다.

  • maxAttempts

처리가 실패하면 다시 메시지를 처리하는 시도 횟수 (첫 번째 포함). 1로 설정하면 다시 메시지 처리를 시도하지 않는다.

기본값 : 3.

  • backOffInitialInterval

다시 시도 할 때 백 오프 초기 간격입니다.

기본값 : 1000.

  • backOffMaxInterval

최대 백 오프 간격.

기본값 : 10000.

  • backOffMultiplier

백 오프 승수입니다.

기본값 : 2.0.

  • instanceIndex

0보다 큰 값으로 설정하면이 소비자의 인스턴스 색인을 사용자 정의 할 수 있습니다 (다른 경우spring.cloud.stream.instanceIndex). 음수 값으로 설정하면 기본값은로 설정됩니다.(카프카의 경우 autoRebalence 설정이 false 일 경우)

기본값 : -1.

  • instanceCount

0보다 큰 값으로 설정하면이 소비자의 인스턴스 수를 사용자 정의 할 수 있습니다 (다른 경우 spring.cloud.stream.instanceCount). 음수 값으로 설정하면 기본값은로 설정됩니다.(카프카의 경우 autoRebalence 설정이 false 일 경우)

기본값 : -1.


Producer Properties

  • partitionKeyExpression

아웃 바운드 데이터를 분할하는 방법을 결정하는 SpEL 식입니다. set 또는 ifpartitionKeyExtractorClass가 설정되면이 채널의 아웃 바운드 데이터가 분할됩니다.partitionCount효과가 있으려면 1보다 큰 값으로 설정해야합니다. 

기본값 : null.

  • partitionKeyExtractorClass

PartitionKeyExtractorStrategy구현입니다. set 또는 if partitionKeyExpression가 설정되면이 채널의 아웃 바운드 데이터가 분할됩니다. partitionCount효과가 있으려면 1보다 큰 값으로 설정해야합니다. 

기본값 : null.

  • partitionSelectorClass

PartitionSelectorStrategy구현입니다.

기본값 : null.

  • partitionSelectorExpression

파티션 선택을 사용자 정의하기위한 SpEL 표현식. 

기본값 : null.

  • partitionCount

파티셔닝이 사용 가능한 경우 데이터의 대상 파티션 수입니다. 제작자가 분할 된 경우 1보다 큰 값으로 설정해야합니다. 카프카에서는 힌트로 해석됩니다. 이것보다 크고 대상 항목의 파티션 수가 대신 사용됩니다.

기본값 : 1.


Content-Type


Spring Cloud Stream은 contentType에 대해 세 가지 메커니즘을 제공한다.


  • Header 

contentType 자체를 헤더로 제공한다.

  • binding

spring.cloud.stream.bindings.<inputChannel>.content-type 설정으로 타입을 설정한다.

  • default

contentType이 명시적으로 설정되지 않은 경우 기본 application/json 타입으로 적용한다.


위의 순서대로 우선순위 적용이 된다.(Header>bindings>default) 만약 메소드 반환 타입이 Message 타입이면 해당 타입으로 메시지를 수신하고, 만약 일반 POJO로 반환타입이 정의되면 컨슈머에서 해당 타입으로 메시지를 수신한다.





Apache Kafka Binder


1
2
3
4
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
cs


의존성을 추가해준다.


Kafka Binder Properties

  • spring.cloud.stream.kafka.binder.brokers

Kafka 바인더가 연결될 브로커 목록이다.

기본값 : localhost

  • spring.cloud.stream.kafka.binder.defaultBrokerPort

brokers 설정에 포트 정보가 존재하지 않는다면 해당 호스트 리스트들이 사용할 포트를 지정해준다.

기본값 : 9092

  • spring.cloud.stream.kafka.binder.configuration

바인더로 작성된 모든 클라이언트에 전달 될 클라이언트 속성(프로듀서,컨슈머)의 키/값 맵이다. 이러한 속성은 프로듀서와 컨슈머 모두가 사용한다.

기본값 : empty map

  • spring.cloud.stream.kafka.binder.headers

바인더에 의해 전송되는 사용자 지정 헤더 목록이다.

기본값 : null

  • spring.cloud.stream.kafka.binder.healthTimeout

파티션 정보를 얻는 데 걸리는 시간(초). 

기본값 : 10

  • spring.cloud.stream.kafka.binder.requiredAcks

브로커에서 필요한 ack수이다.(해당 설명은 이전 포스팅에 설명되어있다.)

기본값 : 1

  • spring.cloud.stream.kafka.binder.minPartitionCount

autoCreateTopics,autoAddPartitions true를 설정했을 경우에만 적용되는 설정이다. 바인더가 데이터를 생성하거나 소비하는 주제에 대해 바인더가 구성하는 최소 파티션 수이다.

기본값 : 1

  • spring.cloud.stream.kafka.binder.replicationFactor

autoCreateTopics true를 설정했을 경우에 자동 생성된 토픽 복제요소 수이다.

기본값 : 1

  • spring.cloud.stream.kafka.binder.autoCreateTopics

true로 설정하면 토픽이 존재하지 않을 경우 자동으로 토픽을 만들어준다. 만약 false로 설정되어있다면 미리 토픽이 생성되어 있어야한다.

기본값 : true

  • spring.cloud.stream.kafka.binder.autoAddPartitions

true로 설정하면 바인더가 필요할 경우 파티션을 추가한다. 예를 들어 사용자의 메시지 유입이 증가하여 컨슈머를 추가하여 파티션수가 하나더 늘었다고 가정하자. 그러면 기존의 토픽의 파티션 수는 증설한 파티션의 총수보다 작을 것이고, 이 설정이 true라면 바인더가 자동으로 파티션수를 증가시켜준다.

기본값 : false



Kafka Consumer Properties


spring.cloud.stream.kafka.bindings.<inputChannel>.consumer 접두어가 붙는다.

  • autoRebalanceEnabled

파티션 밸런싱을 자동으로 처리해준다.

기본값 : true

  • autoCommitOffset

메시지가 처리되었을 경우, 오프셋을 자동으로 커밋할지를 설정한다.

기본값 : true

  • startOffset

새 그룹의 시작 오프셋이다. earliest, latest

기본값 : null(==eariest)

  • resetOffsets
consumer의 오프셋을 startOffset에서 제공한 값으로 재설정할지 여부
기본값 : false


Kafka Producer Properties

  • bufferSize

kafka 프로듀서가 전송하기 전에 일괄 처리하려는 데이터 크기이다.

기본값 : 16384

  • batchTimeout

프로듀서가 메시지를 보내기 전에 동일한 배치에 더 많은 메시지가 누적될 수 있도록 대기하는 시간. 예를 들어 버퍼사이즈가 꽉 차지 않았을 경우 얼마나 기다렸다고 메시지 처리할 것인가를 정하는 시간이다.

기본값 : 0


Partitioning with the Kafka Binder


순서가 중요한 메시지가 있을 경우가 있다. 이럴 경우에는 어떠한 키값을 같이 포함시켜 메시지를 발신함으로써 하나의 파티션에만 데이터를 보내 컨슈머쪽에서 데이터의 순서를 정확히 유지하여 데이터를 받아올 수 있다.


1
2
3
#partition-key-expression
spring.cloud.stream.bindings.exam-output.producer.partition-key-expression=headers['partitionKey']
spring.cloud.stream.bindings.exam-output.producer.partition-count=4
cs


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
    @Autowired
    private ExamProcessor processor;
    
    public void sendMessage(Exam exam,String partitionId) {
        log.info("Sending Message = {}",exam.toString());
        
        MessageChannel outputChannel = processor.outboundChannel();
        
        outputChannel.send(MessageBuilder
                                    .withPayload(exam)
                                    .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                                    .setHeader("partitionKey", partitionId)
                                    .build());
    }
 
    @StreamListener(ExamProcessor.INPUT)
    public void listenMessage(@Payload Exam payload,@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int header) {
        log.info("input message = {} partition_id= {}",payload.toString(),header);
    }
cs


application.properties에 파티션키로 사용할 헤더의 키값을 설정하고, 프로듀서 쪽에서 키값을 포함시켜 데이터를 보낸 후에 컨슈머쪽에서 어떠한 파티션에서 데이터를 받았는지 확인해본다. 키값을 동일하게 유지한채 메시지를 보내면 하나의 파티션에서만 메시지를 받아온다. 그러나 파티션 키값을 적용하지 않고 메시지를 보내면 컨슈머가 받아오는 파티션 아이디는 계속해서 변경이 된다.


이번 포스팅은 내용이 조금 길어져서 여기까지만 작성하고 다음 포스팅에서 Spring Cloud Stream 기반에서 사용하는 kafka stream API에 대해 포스팅할 것이다. 이번 포스팅에서는 많은 설명들이 레퍼런스에 비해 빠져있다. 아는 내용은 작성하고 모르는 내용은 포함시킬경우 틀릴 가능성이 있기 때문에 레퍼런스와 비교해 내용에 차이가 있다. 혹시나 더 많은 설명이 필요하다면 직접 레퍼런스를 보면 될것 같다.

posted by 여성게
:
Middleware/Kafka&RabbitMQ 2019. 3. 26. 00:07

Kafka - Kafka Streams API(카프카 스트림즈)



카프카는 대규모 메시지를 저장하고 빠르게 처리하기 위해 만들어진 제품이다. 초기 사용 목적과는 다른 뛰어난 성능에 일련의 연속된 메시지인 스트림을 처리하는 데도 사용이 되기 시작했다. 이러한 스트림을 카프카는 Kafka Streams API를 통해 제공한다.


설명하기 앞서 우선 스트림 프로세싱과 배치 프로세싱의 차이점이란 무엇일까? 스트림 프로세싱(Stream Processing)은 데이터들이 지속적으로 유입되고 나가는 과정에서 이 데이터에 대한 일련의 처리 혹은 분석을 수행하는 것을 의미한다. 즉, 스트림 프로세싱은 실시간 분석(Real Time Analysis)이라고 불리기도 한다. 스트림 프로세싱과는 대비되는 개념으로 배치(Batch)처리 또는 정적 데이터(Data-at-rest)처리를 들 수 있다. 배치 및 정적 데이터 처리란 위의 스트림 프로세싱과는 다르게 데이터를 한번에 특정 시간에 처리한다라는 특징이 있다. 주로 사용자의 요청이 몰리지 않는 새벽 시간대에 많이 수행하기도 한다.(물론 무조건 그렇다고는 할 수 없다) 그렇지만 사실 뭐가 좋고 나쁨은 이야기 할 수 없다. 스트림과 배치의 서로의 장단점이 있기 때문이다. 하지만 요즘은 역시나 실시간 데이터 처리가 각광받고 있는 사실은 숨길 수 없다.


<특정 이벤트 발생시 바로바로 애플리케이션에서 데이터처리를 하고 있다.>


그렇다면 스트림 프로세싱의 장점은 무엇이 있을까? 우선은 애플리케이션이 이벤트에 즉각적으로 반응을 하기 때문에 이벤트 발생과 분석,조치에 있어 거의 지연시간이 발생하지 않는다. 또한 항상 최신의 데이터를 반영한다라는 특징이 있다. 그리고 데이터를 어디에 담아두고 처리하지 않기 때문에 대규모 공유 데이터베이스에 대한 요구를 줄일 수 있고, 인프라에 독립적인 수행이 가능하다.



상태 기반과 무상태 스트림 처리

스트림 프로세싱에는 상태 기반과 무상태 스트림 처리가 있다. 차이점이란 실시간 데이터 처리를 위하여 이전에 분석된 데이터의 결과가 필요한지이다. 이렇게 상태를 유지할 스트림 프로세싱은 이벤트를 처리하고 그 결과를 저장할 상태 저장소가 필요하다. 이와는 반대로 무상태 스트림 처리는 이전 스트림의 처리 결과와 관계없이 현재 데이터로만 처리를 한다.




카프카 스트림즈(Kafka Streams API)



카프카 스트림즈는 카프카에 저장된 데이터를 처리하고 분석하기 위해 개발된 클라이언트 라이브러리이다. 카프카 스트림즈는 이벤트 시간과 처리 시간을 분리해서 다루고 다양한 시간 간격 옵션을 지원하기에 실시간 분석을 간단하지만 효율적으로 진행할 수 있다. 우선 카프카 스트림즈에 대해 설명하기 전에 용어 정리를 할 필요가 있을 것같다.


용어 

설명 

스트림(Stream) 

스트림은 카프카 스트림즈 API를 사용해 생성된 토폴로지로, 끊임없이 전달되는 데이터 세트를 의미한다. 스트림에 기록되는 단위는 key-value 형태이다. 

스트림 처리 애플리케이션(Stream Processing Application) 

카프카 스트림 클라이언트를 사용하는 애플리케이션으로서, 하나 이상의 프로세서 토폴로지에서 처리되는 로직을 의미한다. 프로세서 토폴로지는 스트림 프로세서가 서로 연결된 그래프를 의미한다. 

스트림 프로세서(Stream Processor) 

프로세서 토폴로지를 이루는 하나의 노드를 말하여 여기서 노드들은 프로세서 형상에 의해 연결된 하나의 입력 스트림으로부터 데이터를 받아서 처리한 다음 다시 연결된 프로세서에 보내는 역할을 한다. 

 소스 프로세서(Source Processor)

위쪽으로 연결된 프로세서가 없는 프로세서를 말한다. 이 프로세서는 하나 이상의 카프카 토픽에서 데이터 레코드를 읽어서 아래쪽 프로세서에 전달한다. 

싱크 프로세서(Sink Processor) 

토폴로지 아래쪽에 프로세서 연결이 없는 프로세서를 뜻한다. 상위 프로세서로부터 받은 데이터 레코드를 카프카 특정 토픽에 저장한다. 



카프카 스트림즈 아키텍쳐

카프카 스트림즈에 들어오는 데이터는 카프카 토픽의 메시지이다. 각 스트림 파티션은 카프카의 토픽 파티션에 저장된 정렬된 메시지이고, 해당 메시지는 key-value형태이다. 또한 해당 메시지의 키를 통해 다음 스트림(카프카 토픽)으로 전달된다. 위의 그림에서 보듯 카프카 스트림즈는 입력 스트림의 파티션 개수만큼 태스크를 생성한다. 각 태스크에는 입력 스트림(카프카 토픽) 파티션들이 할당되고, 이것은 한번 정해지면 입력 토픽의 파티션에 변화가 생기지 않는 한 변하지 않는다. 마지 컨슈머 그룹 안의 컨슈머들이 각각 토픽 파티션을 점유하는 것과 비슷한 개념이다.



자바를 이용한 간단한 파이프 예제 프로그램

지금부터 진행할 예제는 이전 포스팅에서 구성했던 카프카 클러스터 환경에서 진행한다. 만약 클러스터 구성이 되어 있지않다면 밑의 링크를 참조해 구성하면 될것 같다.

스프링 부트 애플리케이션 하나를 생성한다. 그리고 필요한 라이브러리 의존성을 추가한다. 로그를 사용하기 위하여 롬복 라이브러리를 추가하였다. 혹시 이클립스 환경에서 롬복을 사용하는 방법을 알고 싶다면 밑의 링크를 참조하자.

▶︎▶︎▶︎2019/02/02 - [Java&Servlet] - Mac OS - Eclipse & Lombok(롬복 사용방법)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.4</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.0.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.0.1</version>
        </dependency>
cs


이번에 만들 프로그램은 간단히 한쪽 토픽에 입력된 값을 다른 쪽 토픽으로 옮기는 역할을 수행한다. 


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
 
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
 
import lombok.extern.slf4j.Slf4j;
 
@Slf4j
public class Pipe {
    
    public static void main(String[] args) {
        
        /*
         * 카프카 스트림 파이프 프로세스에 필요한 설정값
         * StreamsConfig의 자세한 설정값은
         * https://kafka.apache.org/10/documentation/#streamsconfigs 참고
         */
        Properties props = new Properties();
        //카프카 스트림즈 애플리케이션을 유일할게 구분할 아이디
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        //스트림즈 애플리케이션이 접근할 카프카 브로커정보
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        //데이터를 어떠한 형식으로 Read/Write할지를 설정(키/값의 데이터 타입을 지정) - 문자열
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        //데이터의 흐름으로 구성된 토폴로지를 정의할 빌더
        final StreamsBuilder builder = new StreamsBuilder();
        
        //streams-*input에서 streams-*output으로 데이터 흐름을 정의한다.
        /*
         * KStream<String, String> source = builder.stream("streams-plaintext-input");
           source.to("streams-pipe-output");
         */
        builder.stream("streams-plaintext-input").to("streams-pipe-output");
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        //만들어진 토폴로지 확인
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
        
        try {
          streams.start();
          System.out.println("topology started");
      
        } catch (Throwable e) {
          System.exit(1);
        }
        System.exit(0);
    }
}
 
cs


우선 Properties 객체를 이용하여 카프카 스트림즈가 사용할 설정값을 입력한다. 우선 StreamsConfig.APPLICATION_ID_CONFIG는 카프카 클러스터 내에서 스트림즈 애플리케이션을 구분하기 위한 유일한 아이디 값을 설정하기 위한 값이다. 그리고 그 밑에 카프카 클러스터 구성이 된 인스턴스를 리스트로 작성한다. 또한 위에서 설명했듯이 카프카 스트림즈는 키-값 형태로 데이터의 흐름이 이루어지기에 해당 키-값을 어떠한 타입으로 직렬화 할 것인지 선택한다. 현재는 String 타입으로 지정하였다. 그 다음은 스트림 토폴로지를 구성해준다. 토폴로지는 StreamsBuilder 객체를 통하여 구성한다. 위의 소스는 "streams-plaintext-input"이라는 토픽에서 데이터를 읽어 "streams-pipe-output"이라는 토픽으로 데이터를 보내는 토폴로지를 구성하였다. 최종적으로 StreamsBuilder.build()를 호출하여 토폴로지 객체를 만든다. 위의 Topology.describe()를 호출하면 해당 토폴로지가 어떠한 구성으로 이루어졌는지 로그로 상세하게 볼 수 있다.


22:07:24.234 [main] INFO com.kafka.exam.streams.Pipe - Topology info = Topologies:

   Sub-topology: 0

    Source: KSTREAM-SOURCE-0000000000 (topics: [streams-plaintext-input])

      --> KSTREAM-SINK-0000000001

    Sink: KSTREAM-SINK-0000000001 (topic: streams-pipe-output)

      <-- KSTREAM-SOURCE-0000000000


그 다음 KafkaStreams 객체를 생성한다. 매개변수로 토폴로지와 설정 객체를 전달한다.  마지막으로 KafkaStreams.start();를 호출하여 카프카 스트림을 시작한다. 해당 예제에서는 명시하지 않았지만 스트림즈를 종료하려면 close() 해주는 코드도 삽입해야한다.



카프카가 제공하는 콘솔 프로듀서,컨슈머를 이용하여 위의 스트림즈 코드를 수행해보았다. input 토픽에서 스트림즈 애플리케이션이 메시지를 꺼내서 output 토픽으로 잘 전달한것을 볼 수 있다.(>"hi hello") 컨슈머 실행시 세팅한 설정은 직관적으로 해석 가능하므로 따로 설명하지는 않는다.


방금은 단순히 메시지를 받아 다른 쪽 토픽으로 전달만 하는 예제이지만 다음 해볼 예제는 중간에 데이터를 처리하여 output 토픽으로 보내는 행 분리 예제 프로그램이다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
 
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
 
import lombok.extern.slf4j.Slf4j;
 
@Slf4j
public class LineSplit {
    public static void main(String[] args) {
        /*
         * 카프카 스트림 파이프 프로세스에 필요한 설정값
         * StreamsConfig의 자세한 설정값은
         * https://kafka.apache.org/10/documentation/#streamsconfigs 참고
         */
        Properties props = new Properties();
        //카프카 스트림즈 애플리케이션을 유일할게 구분할 아이디
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
        //스트림즈 애플리케이션이 접근할 카프카 브로커정보
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        //데이터를 어떠한 형식으로 Read/Write할지를 설정(키/값의 데이터 타입을 지정) - 문자열
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        //데이터의 흐름으로 구성된 토폴로지를 정의할 빌더
        final StreamsBuilder builder = new StreamsBuilder();
        
        KStream<StringString> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
            .to("streams-linesplit-output");
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        //만들어진 토폴로지 확인
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
 
        try {
          streams.start();
          System.out.println("topology started");
        } catch (Throwable e) {
          System.exit(1);
        }
        System.exit(0);
    }
}
 
cs


설정 값은 동일하다 하지만 토폴로지에 로직이 추가된 것을 볼 수 있다. 우선 "streams-plaintext-input" 토픽으로 들어온 메시지를 KStream객체로 만들어준다. 그리고 flatMapValues()를 이용하여 데이터를 공백 단위로 쪼개는 것을 볼 수 있다. 여기서 flatMapValues()를 쓴 이유는 여기서 전달되는 value는 하나의 스트링 객체인데 이것을 List<String>으로 변환을 하였다. 이것을 flatMapValues()를 이용해 리스트의 각 요소를 스트링으로 flat하는 메소드이다. 즉, 메시지를 받아서 flatMapValues()를 통해 새로운 데이터 값이 만들어 졌다. 그러면 .to()에 전달되는 데이터 스트림은 공백단위로 짤린 단어 스트링이 전달될 것이다. 이전 예제와는 다르게 단순 데이터 전달만이 아니라 전달 이전에 적절한 처리를 하나 추가 해준 것이다. 만약 키와 값 모두를 새롭게 만들어서 사용하고 싶다면 flatMap() 메소드를 이용하면 된다. 그 밖에 메소드들을 알고 싶다면 공식 홈페이지를 참고하길 바란다.


22:49:08.941 [main] INFO com.kafka.exam.streams.LineSplit - Topology info = Topologies:

   Sub-topology: 0

    Source: KSTREAM-SOURCE-0000000000 (topics: [streams-plaintext-input])

      --> KSTREAM-FLATMAPVALUES-0000000001

    Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])

      --> KSTREAM-SINK-0000000002

      <-- KSTREAM-SOURCE-0000000000

    Sink: KSTREAM-SINK-0000000002 (topic: streams-linesplit-output)

      <-- KSTREAM-FLATMAPVALUES-0000000001


토폴로지 정보를 보아도 중간에 Processor라는 이름으로 하나의 처리 프로세스가 추가된 것을 볼 수 있다.



hi hello 라는 메시지를 보냈는데 hi / hello 로 짤려서 출력된 것을 볼 수 있다. 


필자는 챗봇을 개발하고 있는데 카프카 스트림즈를 이용하여 간단히 사용자 질문 로그들을 분석하여 형태소 분리된 형태로 데이터를 저장할 수도 있을 것 같아서 간단히 예제로 짜보았다. NoriAnalyzer는 필자가 Nori 형태소 분석기를 이용한 간단히 형태소분석 유틸 클래스를 만든것이다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
 
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
 
import lombok.extern.slf4j.Slf4j;
import rnb.analyzer.nori.analyzer.NoriAnalyzer;
 
@Slf4j
public class LineSplit {
    public static void main(String[] args) {
        /*
         * 카프카 스트림 파이프 프로세스에 필요한 설정값
         * StreamsConfig의 자세한 설정값은
         * https://kafka.apache.org/10/documentation/#streamsconfigs 참고
         */
        Properties props = new Properties();
        //카프카 스트림즈 애플리케이션을 유일할게 구분할 아이디
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
        //스트림즈 애플리케이션이 접근할 카프카 브로커정보
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        //데이터를 어떠한 형식으로 Read/Write할지를 설정(키/값의 데이터 타입을 지정) - 문자열
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        
        //데이터의 흐름으로 구성된 토폴로지를 정의할 빌더
        final StreamsBuilder builder = new StreamsBuilder();
        //Nori 형태소 분석기를 이용한 유틸클래스
        NoriAnalyzer analyzer = new NoriAnalyzer();
        
        KStream<StringString> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(analyzer.analyzeForString(value).split(" ")))
            .to("streams-linesplit-output");
        
        //최종적인 토폴로지 생성
        final Topology topology = builder.build();
        
        //만들어진 토폴로지 확인
        log.info("Topology info = {}",topology.describe());
        
        final KafkaStreams streams = new KafkaStreams(topology, props);
 
        try {
          streams.start();
          System.out.println("topology started");
        } catch (Throwable e) {
          System.exit(1);
        }
    }
}
 
cs



이렇게 추후에는 형태소분리된 모든 단어가 아니라 특정 명사만 추출하여 사용자들의 질문중 가장 많이 포함된 명사들을 뽑아내 데이터를 분석할 수도 있을 것같다.


지금까지의 예제는 바로 무상태 스트림 프로세싱이다. 다음 예제는 이전 데이터 처리에 대한 상태를 참조하여 데이터를 처리하는 단어 빈도수 세기 프로그램 예제이다. 


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
 
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.state.KeyValueStore;
 
import lombok.extern.slf4j.Slf4j;
import rnb.analyzer.nori.analyzer.NoriAnalyzer;
 
@Slf4j
public class WordCounter {
    
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
      
        final StreamsBuilder builder = new StreamsBuilder();
        
        NoriAnalyzer analyzer = new NoriAnalyzer();
      
        KStream<StringString> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(analyzer.analyzeForString(value).split(" ")))
                //return하는 value가 키가 된다.
                .groupBy(new KeyValueMapper<StringStringString>() {
                  @Override
                  public String apply(String key, String value) {
                    log.info("key = {},value = {}",key,value);
                    return value;
                  }
                })
                //count()를 호출하여 해당 키값으로 몇개의 요소가 있는지 체크한다. 그리고 해당 데이터를 스토어에(KeyValueStore<Bytes,byte[]> counts-store) 담고

  //변경이 있는 KTable에 대해서만 결과값을 리턴한다.

                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
                .toStream()
                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
      
        final Topology topology = builder.build();
        System.out.println(topology.describe());
      
        final KafkaStreams streams = new KafkaStreams(topology, props);
      
        try {
          streams.start();
        } catch (Throwable e) {
          System.exit(1);
        }
    }
}
 
cs


설정값은 동일하다. 조금 달라진 것은 데이터처리부이다. 우선은 입력받은 데이터를 형태소분리하여 토큰 단위로 스트림을 만든다. 그리고 groupBy를 이용하여 토큰값을 키로 하여 스트림을 그룹핑한다. 그 다음 count()를 통해 같은 키값으로 몇개의 요소가 있는 지를 KeyValueStore에 담고, 만약 키값스토어의 값이 변경이 있을 때만 다운스트림으로 내려준다.(key == null은 무시한다) 처음에는 해당 단어:빈도수가 출력이 되지만 이후에는 이 단어가 또 들어오지 않는다면 출력되지 않고 해당 단어가 들어오면 키값 저장소에 빈도수가 변경이 되므로 다운스트림으로 내려준다. 이말은 무엇이냐면 즉, 키값스토어가 이전 데이터 처리의 상태를 담고 있는 저장소가 되어 이후 데이터에서 참조되어 사용되는 것이다.(사실 더 자세한 것은 API문서를 봐야할 듯하다.)




일부 출력이 안된 것들은 토픽의 버퍼가 원하는 용량에 도달하지 못해 아직 출력하지 못한 데이터들이다. 이렇게 상태를 유지하여 스트림 프로세스를 구성할 수도 있다. 


지금까지 아주 간단하게 카프카 스트림즈 API에 대해 다루었다. 사실 실무에서 이용할 수 있을 만큼의 예제는 아니지만 카프카가 이렇게 실시간 데이터 스트림에도 이용될 수 있다는 것을 알았다면 추후에 API문서를 참조하여 카프카를 응용할 수 있을 것같다.

posted by 여성게
:
Middleware/Kafka&RabbitMQ 2019. 3. 24. 13:54

Kafka - Kafka Consumer(카프카 컨슈머) Java&CLI



이전 포스팅에서 kafka producer를 java 소스기반으로 예제를 짜보았습니다. 이번 포스팅은 kafka consumer를 java 소스로 다루어보려고 합니다.

Kafka Producer(카프카 프로듀서)가 메시지를 생산해서 카프카의 토픽으로 메시지를 보내면 그 토픽의 메시지를 가져와서 소비(consume)하는 역할을 하는 애플리케이션, 서버 등을 지칭하여 컨슈머라고 한다. 컨슈머의 주요 기능은 특정 파티션을 관리하고 있는 파티션 리더에게 메시지를 가져오기 요청을 하는 것이다. 각 요청은 컨슈머가 메시지 오프셋을 명시하고 그 위치로부터 메시지를 수신한다. 그래서 컨슈머는 가져올 메시지의 위치를 조정할 수 있고, 필요하다면 이미 가져온 데이터도 다시 가져올 수 있다. 이렇게 이미 가져온 메시지를 다시 가져올 수 있는 기능은 여타 다른 메시지 시스템(래빗엠큐,RabbitMQ)들은 제공하지 않는 기능이다.(내부 구동방식이 다른 이유도 있음) 최근의 메시지큐 솔루션 사용자들에게 이러한 기능은 필수가 되고 있다. 

카프카에서 컨슈머라고 불리는 컨슈머는 두 가지 종류가 있는데, 올드 컨슈머(Old Consumer)와 뉴 컨슈머(New Consumer)이다. 두 컨슈머의 큰 차이점은 오프셋에 대한 주키퍼 사용 유무이다. 구 버전의 카프카에서는 컨슈머의 오프셋을 주키퍼의 znode에 저장하는 방식을 지원하다가 카프카 0.9버전부터 컨슈머의 오프셋 저장을 주키퍼가 아닌 카프카의 도픽에 저장하는 방식으로 변경되었다. 아마 성능상의 이슈때문에 오프셋 저장 정책을 바꾼것 같다. 두가지 방식을 특정 버전이전까지는 지원하겠지만 아마 추후에는 후자(뉴 컨슈머)의 방식으로 변경되지 않을까싶다. 코드 레벨로 카프카 컨슈머를 다루기전 카프카 컨슈머의 주요 옵션을 본다.




컨슈머 주요 옵션(Consumer option)

-bootstrap.servers : 카프카 클러스터에 처음 연결을 하기 위한 호스트와 포트 정보로 구성된 리스트 정보이다.


-fetch.min.bytes : 한번에 가져올 수 있는 최소 데이터 사이즈이다. 만약 지정한 사이즈보다 작은 경우, 요청에 대해 응답하지 않고 데이터가 누적될 때까지 기다린다.


-group.id : 컨슈머가 속한 컨슈머 그룹을 식별하는 식별자이다.


-enable.auto.commit : 백그라운드에서 주기적으로 오프셋을 자동 커밋한다.


-auto.offset.reset : 카프카에서 초기 오프셋이 없거나 현재 오프셋이 더 이상 존재하지 않은 경우에 다음 옵션으로 리셋한다.

1)earliest : 가장 초기의 오프셋값으로 설정

2)latest : 가장 마지막의 오프셋값으로 설정

3)none : 이전 오프셋값을 찾지 못하면 에러를 발생


-fetch.max.bytes : 한번에 가져올 수 있는 최대 데이터 사이즈


-request.timeout.ms : 요청에 대해 응답을 기다리는 최대 시간


-session.timeout.ms : 컨슈머와 브로커사이의 세션 타임 아웃시간. 브로커가 컨슈머가 살아있는 것으로 판단하는 시간(기본값 10초) 만약 컨슈머가 그룹 코디네이터에게 하트비트를 보내지 않고 session.timeout.ms이 지나면 해당 컨슈머는 종료되거나 장애가 발생한 것으로 판단하고 컨슈머 그룹은 리밸런스(rebalance)를 시도한다. session.timeout.ms는 하트비트 없이 얼마나 오랫동안 컨슈머가 있을 수 있는지를 제어하는 시간이며, 이 속성은 heartbeat.interval.ms와 밀접한 관련이 있다. session.timeout.ms를 기본값보다 낮게 설정하면 실패를 빨리 감지 할 수 있지만, GC나 poll 루프를 완료하는 시간이 길어지게 되면 원하지 않게 리밸런스가 일어나기도 한다. 반대로는 리밸런스가 일어날 가능성은 줄지만 실제 오류를 감지하는 데 시간이 오래 걸릴 수 있다.


-hearbeat.interval.ms : 그룹 코디네이터에게 얼마나 자주 KafkaConsumer poll() 메소드로 하트비트를 보낼 것인지 조정한다. session.timeout.ms와 밀접한 관계가 있으며 session.timeout.ms보다 낮아야한다. 일반적으로 1/3 값정도로 설정한다.(기본값 3초)


-max.poll.records : 단일 호출 poll()에 대한 최대 레코드 수를 조정한다. 이 옵션을 통해 애플리케이션이 폴링 루프에서 데이터를 얼마나 가져올지 양을 조정할 수 있다.


-max.poll.interval.ms : 컨슈머가 살아있는지를 체크하기 위해 하트비트를 주기적으로 보내는데, 컨슈머가 계속해서 하트비트만 보내고 실제로 메시지를 가져가지 않는 경우가 있을 수도 있다. 이러한 경우 컨슈머가 무한정 해당 파티션을 점유할 수 없도록 주기적으로 poll을 호출하지 않으면 장애라고 판단하고 컨슈머 그룹에서 제외한 후 다른 컨슈머가 해당 파티션에서 메시지를 가져갈 수 있게한다.


-auto.commit.interval.ms : 주기적으로 오프셋을 커밋하는 시간


-fetch.max.wait.ms : fetch.min.bytes에 의해 설정된 데이터보다 적은 경우 요청에 응답을 기다리는 최대 시간


기타 많은 설정들이 있다. 혹시나 인증과 ssl 등의 다양한 설정들을 알고 싶다면 공식 홈페이지를 참고하길 바란다.(https://kafka.apache.org/documentation/#consumerconfigs)




콘솔 컨슈머로 메시지 가져오기


카프카는 기본적으로 콘솔로 메시지를 가져올 수 있는 명령어를 제공한다. 우선 진행하기 앞서 예제로 진행하려는 환경은 이전 포스팅에서 구성해보았던 카프카 클러스터 환경에서 진행한다. 만약 카프카 클러스터 환경을 구성해본적이 없다면 밑의 링크를 참고하기 바란다.


▶︎▶︎▶︎2019/03/13 - [Kafka&RabbitMQ] - Kafka - Kafka(카프카) cluster(클러스터) 구성 및 간단한 CLI사용법


클러스터 환경 구축 후에 예제로 토픽하나를 생성했다는 가정하게 진행한다.



Kafka console producer로 접속하여 메시지를 보내보면 위의 컨슈머로 메시지가 전달 될 것이다. 이러한 컨슈머를 실행할 때는 항상 컨슈머 그룹이라는 것이 필요하다. 토픽의 메시지를 가져오기 위한 kafka-console-consumer.sh 명령어를 실행하면서 추가 옵션으로 컨슈머 그룹 이름을 지정해야 하는데, 만약 추가 옵션을 주지 않고 실행한 경우에는 자동으로 console-consumer-xxxxx(숫자)로 컨슈머 그룹이 생성된다.



필자가 예제로 생성한 그룹아이디도 보인다. 이렇게 그룹아이디 리스트를 볼 수 있는 명령어도 제공한다. 혹시나 모르시는 분들을 위해 토픽 리스트를 볼 수 있는 명령어도 있다.



보면 필자가 사용하고 있는 카프카 버전은 뉴 컨슈머를 이용하는 것을 알 수 있다. 바로 메시지 오프셋을 저장하기 위한 __consumer_offsets가 토픽에 존재하기 때문이다.



또한 컨슈머를 실행시킬 때에 위에서 설명한 것과 같이 그룹아이디 값 옵션을 추가하여 실행시킬 수도 있다.



자바코드를 이용한 컨슈머


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import java.util.Arrays;
import java.util.Properties;
import java.time.Duration;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
 
import lombok.extern.slf4j.Slf4j;
 
/**
 * Kafka Consumer Exam
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaBookConsumer1 {
    
    public static Properties init() {
        Properties props = new Properties();
        props.put("bootstrap.servers""localhost:9092,localhost:9093,localhost:9094,localhost:9095");
        props.put("group.id""exam-consumer-group");
        props.put("enable.auto.commit""true");
        props.put("auto.offset.reset""latest");
        props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
    
    /**
     * 컨슈머예제
     */
    public static void consume() {
        
        try (KafkaConsumer<StringString> consumer = new KafkaConsumer<>(init());){
            //토픽리스트를 매개변수로 준다. 구독신청을 한다.
            consumer.subscribe(Arrays.asList("test-topic"));
            while (true) {
              //컨슈머는 토픽에 계속 폴링하고 있어야한다. 아니면 브로커는 컨슈머가 죽었다고 판단하고, 해당 파티션을 다른 컨슈머에게 넘긴다.
              ConsumerRecords<StringString> records = consumer.poll(Duration.ofMillis(100));
              for (ConsumerRecord<StringString> record : records)
                log.info("Topic: {}, Partition: {}, Offset: {}, Key: {}, Value: {}\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
            }
        } finally {}
    }
        
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        consume();
    }
 
}
 
cs


소스에 대해 간단히 설명하면 우선 컨슈머 옵션들을 Properties 객체로 정의한다. 오프셋 리셋 옵션을 lastest로 주어 토픽의 가장 마지막부터 메시지를 가져온다. 메시지의 키와 값의 역직렬화 옵션을 문자열로 해준다. 이렇게 컨슈머 옵션을 설정하고 해당 옵션값을 이용하여 KafkaConsumer 객체를 생성해준다. 해당 객체는 사용이 후에 꼭 close 해줘야 하기 때문에 try문 안에 객체 생성코드를 넣어놔 자동으로 자원반납을 할 수 있게 해주었다. 그리고 해당 컨슈머 객체로 토픽에 대해 구독신청을 해준다. 구독 대상이 되는 토픽은 리스트로 여러개 지정가능하다. 이것 또한 카프카의 장점중 하나이다.(하나의 컨슈머가 여러 메시지큐를 구독하는 것)

그리고 해당 컨슈머는 계속해서 폴링한다. 무한 루프로 계속해서 폴링하지 않으면 컨슈머가 종료된 것으로 간주되어 컨슈머에 할당된 파티션은 다른 컨슈머에게 전달되고 다른 컨슈머에 의해 메시지가 컨슘된다. poll() 메소드의 매개변수는 타임아웃 주기이다. 이 타임아웃주기는 데이터가 컨슈머 버퍼에 없다면 poll()은 얼마 동안 블록할지를 조정하는 것이다. 또한 poll()은 레코드 전체를 리턴하고, 레코드에는 토픽,파티션,파티션의 오프셋,키,값을 포함하고 있다. 한 번에 하나의 메시지만 가져오는 것이 아니라 여러개의 메시지를 가져오기 때문에 for-each로 데이터들을 처리하고 있다.




메시지의 순서


파티션이 3개인 토픽을 새로 만들어서 해당 토픽으로 메시지를 a,b,c,d,e 순서대로 보내보았다. 그리고 컨슈머 실행후 결과를 보니 a,d,b,e,c 순서대로 메시지가 들어왔다. 컨슈머에 문제가 있는 것인가? 아니면 진짜 내부적으로 문제가 있어서 순서가 보장되지 않은 것인가 궁금할 수 있다. 하지만 지극히 정상인 결과값이다.


먼저 해당 토픽에 메시지가 어떻게 저장되어 있는지 부터 확인해봐야 한다. 해당 토픽은 파티션이 3개로 구성되어 있기 때문에 각 파티션별로 메시지가 어떻게 저장되어 있는지 확인해봐야한다.



0번 파티션에는 b,e 1번 파티션에는 a,d 2번 파티션에는 c 데이터가 저장되어 있는 것을 볼수 있다. 즉, 컨슈머는 프로듀서가 어떤 순서대로 메시지를 보내는지 알수 없다. 단지 파티션의 오프셋 기준으로만 메시지를 가져올 뿐이다. 이말은 무엇이냐면, 카프카 컨슈머에서의 메시지 순서는 동일한 파티션내에서만 유지되고 파티션끼리의 메시지 순서는 보장하지 않는 것이다. 


카프카를 사용하면서 메시지의 순서를 보장해야 하는 경우에는 토픽의 파티션 수를 1로 설정한다. 하지만 단점도 존재한다. 메시지의 순서는 보장되지만 파티션 수가 하나이기 때문에 분산해서 처리할 수 없고 하나의 컨슈머에서만 처리할 수 있기 때문에 처리량이 높지 않다. 즉 처리량이 높은 카프카를 사용하지만 메시지의 순서를 보장해야 한다면 파티션 수를 하나로 만든 토픽을 사용해야 하며, 어느 정도 처리량이 떨어지는 부분은 감수해야한다.





컨슈머 그룹

카프카의 큰 장점 중 하나는, 하나의 토픽에 여러 컨슈머 그룹이 동시에 접속해 메시지를 가져 올 수 있다는 것이다. 이것은 기존의 다른 메시징큐 솔루션에서 컨슈머가 메시지를 가져가면 큐에서 삭제되어 다른 컨슈머가 가져갈 수 없다는 것과는 다른 방식인데 이 방식이 좋은 이유는 최근에 하나의 데이터를 다양한 용도로 사용하는 요구가 많아졌기 때문이다. 카프카가 이러한 기능제공이 가능한 이유는 파일시스템방식을 채택했기 이고, 컨슈머 그룹마다 각자의 오프셋을 별도로 관리하기 때문에 하나의 토픽에 두개의 컨슈머 그룹뿐만 아니라 더 많은 컨슈머 그룹이 연결되어도 다른 컨슈머 그룹에게 영향이 없이 메시지를 가져갈 수 있다. 이렇게 여러개의 컨슈머 그룹이 동시에 하나의 토픽에서 메시지를 가져갈 때는 컨슈머 그룹아이디를 서로 유일하게 설정해주어야한다.
 이전 이야기는 똑같은 데이터에 대해 요구가 다른 처리를 하기 위한 이야기 였다면, 이제 이야기 하려고하는 것은 하나의 토픽메시지를 여러개의 컨슈머가 나누어 처리하는 이야기이다.

만약 토픽에 a,b,c,d,e 라는 메시지가 들어왔고, 컨슈머는 한번에 하나의 메시지밖에 처리를 하지 못한다고 가정해보자. 그렇다면 컨슈머는 총 5번의 읽는 행위를 해야한다. 점차 메시지가 들어오는 양이 많아지면 해당 컨슈머의 처리 지연에 대한 영향은 점점 커질 것이다. 이러한 점을 해야결하기 위하여 하나의 컨슈머 그룹에 하나의 컨슈머가 아니라 여러 컨슈머를 그룹에 포함시켜 a,b,c,d,e 라는 메시지를 적절히 나누어 처리하게 하는 방법이다.

기본적으로 컨슈머 그룹 안에서 컨슈머들은 메시지를 가져오고 있는 토픽의 파티션에 대해 소유권을 공유한다. 컨슈머 그룹 내 컨슈머의 수가 보족해 프로듀서가 전송하는 메시지를 처리하지 못하는 경우에는 컨슈머를 추가해야하며, 추가 컨슈머를 동일한 컨슈머 그룹 내에 추가시키면 하나의 컨슈머가 가져오고 있던 메시지를 적절하게 나누어 가져오기 시작한다.만약 위의 그림처럼 만약 Consumer Group A가 C1만 있었고, C2라는 컨슈머를 동일한 그룹내에 추가했다면 P2,P3의 소유권이 C1에서 C2로 이동한다. 이것이 초반에 이야기 했던 리밸런스라고한다. 이렇게 리밸런스라는 기능을 통해 컨슈머를 쉽고 안전하게 추가할 수 있고 제거할 수도 있어 높은 가용성과 확장성을 확보할 수 있다. 하지만 이러한 리밸런스도 단점은 있다. 리밸런스를 하는 동안 일시적으로 컨슈머는 메시지를 가져올 수 없다. 그래서 리밸런스가 발생하면 컨슈머 그룹 전체가 일시적으로 사용할 수 없다는 단점이 있다.


위의 그림에서 Consumer Group B에 4개의 컨슈머가 있음에도 불구하고 처리가 계속 지연된다면 어떻게 될까? 이전처럼 해당 컨슈머 그룹에 컨슈머만 추가하면 될까? 아니다. 이뉴는 토픽의 파티션에는 하나의 컨슈머만 연결 할 수 있기 때문이다. 이말은 즉슨, 토픽의 파티션 수 만큼 최대 컨슈머 수가 연결될수 있는 것이다. 토픽의 파티션 수와 동일하게 컨슈머 수를 늘렸는데도 프로듀서가 보내는 메시지의 속도를 따라가지 못한다면 컨슈머만 추가하는 것이 아니라, 토픽의 파티션 수까지 늘려주고 컨슈머 수도 늘려줘야한다. 


이번에는 잘 동작하던 컨슈머 그룹 내에서 컨슈머 하나가 다운되는 경우를 생각해보자. 컨슈머가 컨슈머 그룹 안에서 멤버로 유지하고 할당된 파티션의 소유권을 유지하는 방법은 하트비트를 보내는 것이다. 반대로 생각해보면, 컨슈머가 일정한 주기로 하트비트를 보내다는 사실은 해당 파티션의 메시지를 잘 처리하고 있다는 것이다. 하트비트는 컨슈머가 poll 할때와 가져간 메시지의 오프셋을 커밋할 때 보내게 된다. 만약 컨슈머가 오랫동안 하트비트를 보내지 않으면 세션은 타임아웃되고, 해당 컨슈머가 다운되었다고 판단하여 리밸런스가 일어난다. 그 이후 다른 컨슈머가 다운된 컨슈머의 파티션의 할당 파티션을 맡게 되는 것이다.




커밋과 오프셋

컨슈머가 poll을 호출할 때마다 컨슈머 그룹은 카프카에 저장되어 있는 아직 읽지 않은 메시지를 가져온다. 이렇게 동작할 수 있는 것은 컨슈머 그룹이 메시지를 어디까지 가져갔는지 알 수 있기 때문이다. 컨슈머 그룹의 컨슈머들은 각각의 파티션에 자신이 가져간 메시지의 위치 정보(오프셋)을 기록한다. 각 파티션에 대해 현재 위치를 업데이트하는 동작을 커밋한다고 한다. 카프카는 각 컨슈머 그룹의 파티션별로 오프셋을 저장하기 위하여 __consumer_offsets 토픽을 만들고 오프셋을 저장한다. 리밸런스가 일어난 후 각각의 컨슈머는 이전에 처리했던 토픽의 다른 파티션을 할당 받게 되고 컨슈머는 새로운 파티션에 대해 가장 최근에 커밋된 오프셋을 일고 그 이후부터 메시지들을 가져오기 시작한다.


자동커밋

오프셋을 직접 관리해도 되지만, 각 파티션에 대한 오프셋 정보관리, 파티션 변경에 대한 관리 등이 매우 번거로울 수 있다. 그래서 카프카에서는 자동커밋 기능을 제공해준다. 자동 커밋을 사용하고 싶을 때는 컨슈머 옵션 중 enable.auto.commit=true로 설정하면 5초마다 컨슈머는 poll을 호출할 때 가장 마지막 오프셋을 커밋한다. 5초 주기는 기본 값이며, auto.commit.interval.ms 옵션을 통해 조정이 가능하다. 컨슈머는 poll을 요청할 때마다 커밋할 시간이 아닌지 체크하게 되고, poll 요청으로 가져온 마지막 오프셋을 커밋한다. 하지만 이 옵션을 사용할 경우 커밋 직전 리밸런스등의 작업이 일어나면 동일한 메시지에 대한 중복처리가 일어날 수 있다. 물론 자동커밋 주기를 작게 잡아 최대한 중복을 줄일 수 있지만 중복등을 완전하게 피할 수 는없다.


수동커밋

경우에 따라 자동 커밋이 아닌 수동 커밋을 사용해야하는 경우도 있다. 이러한 경우는 메시지 처리가 완료될 때까지 메시지를 가져온 것으로 간주되어서는 안되는 경우에 사용한다. 자동 커밋을 사용하는 경우라면 자동 커밋의 주기로 인해 일부 메시지들을 데이터베이스에는 자장하지 못한 상태로 컨슈머 장애가 발생한다면 해당 메시지들은 손실될 수도 있다. 이러한 경우를 방지하기 위해 컨슈머가 메시지를 가져오자마자 커밋을 하는 것이 아니라, 데이터베이스에 메시지를 저장한 후 커밋을 해서 위의 문제를 조금이나마 해결할 수 있다.(밑의 소스에서 그룹 아이디와 토픽등은 자신에 환경에 맞게 설정해주시길 바랍니다.)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import java.util.Arrays;
import java.util.Properties;
import java.time.Duration;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
 
import lombok.extern.slf4j.Slf4j;
 
/**
 * Kafka Consumer Exam
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaBookConsumer1 {
    
    public static Properties init() {
        Properties props = new Properties();
        props.put("bootstrap.servers""localhost:9092,localhost:9093,localhost:9094");
        props.put("group.id""yoon-consumer");
        props.put("enable.auto.commit""true");
        props.put("auto.offset.reset""latest");
        props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
    
    /**
     * 수동커밋
     */
    public static void commitConsume() {
        try (KafkaConsumer<StringString> consumer = new KafkaConsumer<>(init());){
            //토픽리스트를 매개변수로 준다. 구독신청을 한다.
            consumer.subscribe(Arrays.asList("yoon"));
            while (true) {
              //컨슈머는 토픽에 계속 폴링하고 있어야한다. 아니면 브로커는 컨슈머가 죽었다고 판단하고, 해당 파티션을 다른 컨슈머에게 넘긴다.
              ConsumerRecords<StringString> records = consumer.poll(Duration.ofMillis(100));
              for (ConsumerRecord<StringString> record : records)
                log.info("Topic: {}, Partition: {}, Offset: {}, Key: {}, Value: {}\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
              
              /*
               * DB로직(CRUD) 이후 커밋.
               */
              
              consumer.commitSync(); ->수동으로 커밋하여 메시지를 가져온 것으로 간주하는 시점을 자유롭게 조정할 수있다.
            }
        } finally {}
    }
        
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        commitConsume();
    }
 
}
 
cs



특정 파티션 할당

지금까지 컨슈머는 토픽을 subscribe하고, 카프카가 컨슈머 그룹의 컨슈머들에게 직접 파티션을 공정분배했다. 하지만 특별한 경우 특정 파티션에 대해 세밀하게 제어하길 원할 수도 있다. 이럴때에는 특정 컨슈머에게 특정 파티션을 직접 할당할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.kafka.exam;
 
import java.util.Arrays;
import java.util.Properties;
import java.time.Duration;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
 
import lombok.extern.slf4j.Slf4j;
 
/**
 * Kafka Consumer Exam
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaBookConsumer1 {
    
    /**
     * 특정 파티션 할당
     */
    public static void specificPart() {
        Properties props = new Properties();
        props.put("bootstrap.servers""localhost:9092,localhost:9093,localhost:9094");
        //특정 파티션을 할당하기 위해서는 기존과는 다른 그룹아이디 값을 주어야한다. 왜냐하면 컨슈머 그룹내에 같이 있게되면 서로의
        //오프셋을 공유하기 때문에 종료된 컨슈머의 파티션을 다른 컨슈머가 할당받아 메시지를 이어서 가져가게 되고, 오프셋을 커밋하게 된다.
        props.put("group.id""specificPart");
        props.put("enable.auto.commit""false");
        props.put("auto.offset.reset""latest");
        props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        
        String topicName = "yoon";
        
        try (KafkaConsumer<StringString> consumer = new KafkaConsumer<>(init());){
            //토픽리스트를 매개변수로 준다. 구독신청을 한다.
            TopicPartition partition0 = new TopicPartition(topicName, 0);
            TopicPartition partition1 = new TopicPartition(topicName, 1);
            consumer.assign(Arrays.asList(partition0,partition1));
            
            while (true) {
              //컨슈머는 토픽에 계속 폴링하고 있어야한다. 아니면 브로커는 컨슈머가 죽었다고 판단하고, 해당 파티션을 다른 컨슈머에게 넘긴다.
              ConsumerRecords<StringString> records = consumer.poll(Duration.ofMillis(100));
              for (ConsumerRecord<StringString> record : records)
                log.info("Topic: {}, Partition: {}, Offset: {}, Key: {}, Value: {}\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
              
              /*
               * DB로직(CRUD) 이후 커밋.
               */
              
              consumer.commitSync();
            }
        } finally {}
    }
    
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        specificPart();
    }
 
}
 
cs

특정 오프셋부터 메시지 가져오기


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package com.kafka.exam;
 
import java.util.Arrays;
import java.util.Properties;
import java.time.Duration;
 
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
 
import lombok.extern.slf4j.Slf4j;
 
/**
 * Kafka Consumer Exam
 * @author yun-yeoseong
 *
 */
@Slf4j
public class KafkaBookConsumer1 {
    
    /**
     * 특정파티션에서 특정 오프셋의 메시지 가져오기
     */
    public static void specificOffset() {
        Properties props = new Properties();
        props.put("bootstrap.servers""localhost:9092,localhost:9093,localhost:9094");
        //특정 파티션을 할당하기 위해서는 기존과는 다른 그룹아이디 값을 주어야한다. 왜냐하면 컨슈머 그룹내에 같이 있게되면 서로의
        //오프셋을 공유하기 때문에 종료된 컨슈머의 파티션을 다른 컨슈머가 할당받아 메시지를 이어서 가져가게 되고, 오프셋을 커밋하게 된다.
        props.put("group.id""specificPartAndOffset");
        props.put("enable.auto.commit""false");
        props.put("auto.offset.reset""latest");
        props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
        
        String topicName = "yoon";
        
        try (KafkaConsumer<StringString> consumer = new KafkaConsumer<>(init());){
            //토픽리스트를 매개변수로 준다. 구독신청을 한다.
            TopicPartition partition0 = new TopicPartition(topicName, 0);
            TopicPartition partition1 = new TopicPartition(topicName, 1);
            consumer.assign(Arrays.asList(partition0,partition1));
            //0,1번 파티션의 2번 오프셋의 메시지를 가져와라
            consumer.seek(partition0, 2);
            consumer.seek(partition1, 2);
            while (true) {
              //컨슈머는 토픽에 계속 폴링하고 있어야한다. 아니면 브로커는 컨슈머가 죽었다고 판단하고, 해당 파티션을 다른 컨슈머에게 넘긴다.
              ConsumerRecords<StringString> records = consumer.poll(Duration.ofMillis(100));
              for (ConsumerRecord<StringString> record : records)
                log.info("Topic: {}, Partition: {}, Offset: {}, Key: {}, Value: {}\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
              
              /*
               * DB로직(CRUD) 이후 커밋.
               */
              
              consumer.commitSync();
            }
        } finally {}
    }
    
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        specificOffset();
    }
 
}
 
cs


여기까지 자바소스로 다루어본 카프카 컨슈머 예제들이었습니다. 부족한 점이 많습니다. 혹시나 잘못된 점이 있다면 지적해주시면 감사하겠습니다!

posted by 여성게
:
Web/Spring 2019. 3. 22. 15:45

Spring - Springboot GenericController(제네릭컨트롤러), 컨트롤러 추상화



Web applcation을 개발하면 공통적으로 개발하는 Flow가 있다.



Controller->Service->Repository는 거의 왠만한 웹어플리케이션의 개발 플로우일 것이다. 하지만 이런 플로우 안에서도 거의 모든 비즈니스마다 공통적인 로직이있다. 바로 CRUD이다. 모든 도메인에는 생성,수정,삭제,조회 로직이 들어간다. 이러한 로직이 모든 컨트롤러,서비스 클래스에 도메인마다 작성이 된다면 이것도 중복코드인 것이다. 오늘 포스팅할 주제는 바로 이러한 로직을 추상화한 Generic Controller이다. 사실 아직 많이 부족한 탓에 더 좋은 방법도 있겠지만 나름 혼자 고심하여 개발한 것이기 때문에 만약 잘못된 부분이 있다면 지적을 해주셨음 좋겠다.





Entity Class


1
2
3
4
5
6
7
8
import org.springframework.context.annotation.Configuration;
import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
 
@EnableJpaAuditing
@Configuration
public class JpaConfig {
 
}
cs


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import java.time.LocalDateTime;
 
import javax.persistence.Column;
import javax.persistence.EntityListeners;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.MappedSuperclass;
 
import org.springframework.data.annotation.CreatedDate;
import org.springframework.data.annotation.LastModifiedDate;
import org.springframework.data.jpa.domain.support.AuditingEntityListener;
 
import lombok.Getter;
import lombok.Setter;
 
/**
 * BaseEntity 추상클래스
 * 해당 추상클래스를 상속할때 @Tablegenerator는 상속하는 클래스에서 정의해야함
 * 또한 id field의 칼럼속성도 필요할때에 재정의해야함
 * @author yun-yeoseong
 *
 */
@MappedSuperclass
@EntityListeners(value = { AuditingEntityListener.class })
@Setter
@Getter
public abstract class BaseEntity<extends BaseEntity<?>> implements Comparable<T>{
    
    @Id
    @GeneratedValue(strategy=GenerationType.TABLE, generator = "RNB_SEQ_GENERATOR")
    private long id;
    
    @Column(name="CREATED_DATE",nullable=false,updatable=false)
    @CreatedDate
    private LocalDateTime createdDate;
    
    @Column(name="UPDATED_DATE",nullable=false)
    @LastModifiedDate
    private LocalDateTime modifiedDate;
 
    @Override
    public int compareTo(T o) {
        Long result = getId()-o.getId();
        return result.intValue();
    }
    
}
 
cs


엔티티마다도 공통적인 필드를 가지고 있으므로 BaseEntity 클래스 하나를 정의하였다. 정렬을 위한 compareTo는 자신에 맞게 재정의하면 될 것 같다. @EntityListener 어노테이션을 추가해 AuditingEntityListener 사용을 명시한다. 해당 어노테이션을 붙인 후에 @CreatedDate는 엔티티 생성시점 처음에 생성되는 날짜필드가 되는 것이고, @LastModifiedDate는 엔티티 수정이 될때 매번 수정되는 날짜 필드이다.(@PrePersist,@PreUpdate 등을 사용해도 될듯) 사실 이 두개의 어노테이션말고 @CreatedBy@LastModifiedBy 어노테이션도 존재한다. 이것을 생성자와 수정자를 넣어주는 어노테이션인데, 이것은 스프링 시큐리티의 Principal을 사용한다. 이것은 당장 필요하지 않으므로 구현하지 않았다.(구글링 참조) 그리고 마지막으로는 @Id 필드이다. 해당 생성 전략을 테이블 타입을 이용하였다. 만약 더욱 다양한 타입의 @Id가 필요하다면 아래 링크를 참조하자.


▶︎▶︎▶︎JPA 기본키 매핑 전략!


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import java.io.Serializable;
 
import javax.persistence.AttributeOverride;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Index;
import javax.persistence.Table;
import javax.persistence.TableGenerator;
 
import com.web.rnbsoft.common.jpa.BaseEntity;
 
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
 
/**
 * Intent Entity
 * @author yun-yeoseong
 *
 */
@Entity
@Table(name="RNB_INTENT"
        ,indexes=@Index(columnList="INTENT_NAME",unique=false))
@AttributeOverride(name = "id",column = @Column(name = "INTENT_ID"))
@TableGenerator(name="RNB_SEQ_GENERATOR",table="TB_SEQUENCE",
                pkColumnName="SEQ_NAME",pkColumnValue="RNB_INTENT_SEQ",allocationSize=1)
@Getter
@Setter
@ToString
public class IntentEntity extends BaseEntity<IntentEntity> implements Serializable{
    
    private static final long serialVersionUID = 1864304860822295551L;
    
    @Column(name="INTENT_NAME",nullable=false)
    private String intentName;
    
}
 
cs


BaseEntity를 상속한 엔티티클래스이다. 몇가지 설명을 하자면 우선 BaseEntity에 있는 id 필드를 오버라이드 했다는 것이고, @TableGenerator를 선언하여 BaseEntity의 @GenerateValue에서 참조하여 기본키를 생성할 수 있게 하였다.



Service


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import com.web.rnbsoft.entity.intent.IntentEntity;
 
/**
 * 
 * @author yun-yeoseong
 *
 */
public interface IntentService {
    public IntentEntity findByName(String name);
}
 
/************************************************************/
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
import com.web.rnbsoft.entity.intent.IntentEntity;
import com.web.rnbsoft.repository.intent.IntentRepository;
 
import lombok.extern.slf4j.Slf4j;
 
/**
 * 
 * @author yun-yeoseong
 *
 */
@Slf4j
@Service
public class IntentServiceImpl implements IntentService {
    
    @Autowired
    private IntentRepository intentRepository;
    
    @Override
    public IntentEntity findByName(String name) {
        log.debug("IntentServiceImple.findByName - {}",name);
        return intentRepository.findByIntentName(name);
    }
    
    
}
 
cs


Repository


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import org.springframework.data.jpa.repository.JpaRepository;
 
import com.web.rnbsoft.entity.intent.IntentEntity;
 
/**
 * IntentRepository
 * @author yun-yeoseong
 *
 */
public interface IntentRepository extends JpaRepository<IntentEntity, Long>, IntentRepositoryCustom{
    public IntentEntity findByIntentName(String intentName);
}
 
 
/************************************************************/
 
import java.util.List;
 
import com.web.rnbsoft.entity.intent.IntentEntity;
 
/**
 * 
 * @author yun-yeoseong
 *
 */
public interface IntentRepositoryCustom {
    public List<IntentEntity> selectByCategoryAndName(String category,String name);
}
 
/************************************************************/
 
import java.util.List;
 
import org.springframework.data.jpa.repository.support.QuerydslRepositorySupport;
 
import com.querydsl.jpa.JPQLQuery;
import com.web.rnbsoft.entity.intent.IntentEntity;
import com.web.rnbsoft.entity.intent.QIntentEntity;
 
/**
 * 
 * @author yun-yeoseong
 *
 */
public class IntentRepositoryImpl extends QuerydslRepositorySupport implements IntentRepositoryCustom {
 
    public IntentRepositoryImpl() {
        super(IntentEntity.class);
    }
 
    @Override
    public List<IntentEntity> selectByCategoryAndName(String category, String name) {
        
        QIntentEntity intent = QIntentEntity.intentEntity;
        
        JPQLQuery<IntentEntity> query = from(intent)
                          .where(intent.intentName.contains(name));
                          
        return query.fetch();
    }
 
}
cs


Repository는 QueryDSL을 이용하여 작성하였다. QueryDSL 설정법 등은 아래 링크를 참조하자

▶︎▶︎▶︎Spring JPA+QueryDSL

▶︎▶︎▶︎QueryDSL Query작성법




Controller


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import java.util.List;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
 
import lombok.extern.slf4j.Slf4j;
 
/**
 * 
 * GenericController - 컨트롤러의 공통 기능 추상화(CRUD)
 * 
 * @author yun-yeoseong
 *
 */
@Slf4j
public abstract class GenericController<T ,ID> {
    
    @Autowired
    private JpaRepository<T, ID> repository;
    
    @GetMapping("/{id}")
    public T select(@PathVariable ID id) {
        log.debug("GenericController.select - {}",id);
        return repository.findById(id).get();
    }
    
    @GetMapping
    public List<T> list(){
        log.debug("GenericController.list");
        return repository.findAll();
    }
    
    @PostMapping
    public T create(@RequestBody T t) {
        log.debug("GenericController.create - {}",t.toString());
        T created = repository.save(t);
        return created;
    }
    
    @PutMapping("/{id}")
    public T update(@RequestBody T t) {
        log.debug("GenericController.update - {}",t.toString());
        T updated = repository.save(t);
        return updated;
    }
    
    @DeleteMapping("/{id}")
    public boolean delete(@PathVariable ID id) {
        log.debug("GenericController.delete - {}",id);
        repository.deleteById(id);
        return true;
    }
}
cs


세부로직은 신경쓰지말자. 이것이 공통적으로 컨트롤러가 사용하는 것을 추상화한 GenericController이다. 사실 Service 단까지도 GenericService를 구현하려고 했으나, 크게 복잡한 로직도 아니고 해서 전부 Repository를 주입받아서 사용하였다. 나중에 이 틀을 가지고 다듬어서 사용하면 될듯 싶다.


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
 
import com.web.rnbsoft.controller.GenericController;
import com.web.rnbsoft.entity.intent.IntentEntity;
import com.web.rnbsoft.service.intent.IntentService;
 
import lombok.extern.slf4j.Slf4j;
 
@Slf4j
@RestController
@RequestMapping("/intent")
public class IntentController extends GenericController<IntentEntity,Long> {
    
    private IntentService service;
    
    public IntentController(IntentService service) {
        this.service = service;
    }
    
    @PostMapping("/search")
    public IntentEntity selectByName(@RequestBody String intentName) {
        log.debug("IntentController.selectByName - {}",intentName);
        return service.findByName(intentName);
    }
}
 
cs


GenericController를 상속받는 Controller이다. CRUD의 공통로직이 모두 제거되었고 물론 이 컨트롤러 말고도 다른 컨트롤러 또한 모두 CRUD 로직은 제거 될것이다. 훨씬 코드가 간결해졌다. 여기서 하나더 기능을 추가하자면 모든 예외처리를 한곳에서 처리해버리는 것이다. 


RestControllerAdvice


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;
import org.springframework.web.context.request.WebRequest;
 
import com.fasterxml.jackson.core.JsonProcessingException;
 
import lombok.extern.slf4j.Slf4j;
 
/**
 * RestControllerAdvice
 * RestController 공통 예외처리
 * @author yun-yeoseong
 *
 */
@Slf4j
@RestControllerAdvice("com.web.rnbsoft")
public class RestCtrlAdvice {
    @ExceptionHandler(value = {Exception.class})
    protected ResponseEntity<String> example(RuntimeException exception,
            Object body,
            WebRequest request) throws JsonProcessingException {
        log.debug("RestCtrlAdvice");
        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("{\"message\":\"example\"}");
    }
}
 
cs

RestController의 모든 공통 예외처리를 해당 클래스에 정의한다. 물론 추후에는 메시지소스를 이용해 응답까지 하나로 통일할 수도 있을 것같다.

여기까지 GenericController를 구현해봤다. 많이 부족한 점이 많은 코드이기에 많은 지적을 해주셨으면 하는 바람이다.

posted by 여성게
: