designing-data-intensive-applications

11장 스트림 처리

일괄 처리는 파일 집합을 읽어 새로운 파일 집합을 출력(생성)하는 기술이다. 이 때 출력은 파생 데이터이다. 이 아이디어는 검색 색인이나 추천 시스템, 분석 등에 사용된다.

데이터의 생산은 절대 끝나지 않기 때문에 일괄 처리를 위해서는 일정 기간으로 데이터 청크를 나눌 필요가 있다.

하지만 일괄 처리는 일정 시간(청크를 하루로 나눴다면 하루) 뒤에 입력의 변화가 반영되기 때문에 이 지체를 줄이기 위해서는 더 자주 처리를 해야 한다.(청크를 작게)

그래서 고정된 시간 조각이라는 개념을 완전히 버리고 이벤트가 발생할 때마다 처리하는데, 이 방법이 스트림 처리의 기본 개념이다.

일반적으로 스트림**시간 흐름에 따라 점진적으로 생산된 데이터를 말한다.

이벤트 스트림 전송

스트림 처리 문맥에서는 입력을 이벤트로 변환하는 작업을 한다. 이 이벤트는 문자열이나 JSON 등으로 부호화되며 Consumer, Subscriber 등이 처리할 수 있다.

이 때 폴링을 통해 Consumer/Subscriber가 이벤트의 발생을 확인할 수 있지만, 그에 대한 리소스는 클 수 있기 때문에 이벤트가 발생했을 때 알려주는 것이 더 효율적이다.

메시징 시스템

새로운 이벤트를 소비자에게 알릴 때 쓰이는 일반적인 방법으로 메시징 시스템이 있다.

메시징 시스템 구축의 가장 간단한 방법은 생산자와 소비자 사이에 유닉스 파이프나 TCP 연결처럼 직접 통신 채널을 사용하는 방법이다.

발행/구독(publish/subscribe) 모델을 사용하는 여러 시스템은 다양한 접근법을 사용하며 정답은 없다.

  1. 소비자의 메시지 처리 속도보다 생산자의 메시지를 전송하는 속도가 빠르다면 어떻게 될까?
    1. 3가지 선택지가 있다.
      1. 메시지를 버린다.
      2. 큐에 메시지를 버퍼링한다.
        1. 큐에 메시지가 버퍼링될 때 큐 크기가 부족해진다면 어떻게 해야 할까?
          1. 시스템은 중단되어야 하는가?
          2. 메시지를 디스크에 쓸것인가?
          3. 메시지를 디스크에 쓴다면 이건 메시징 시스템의 성능에 어떤 영향을 주는지?
      3. 배압(백프레셔, 흐름 제어)를 적용한다.
  2. 노드가 죽거나 일시적으로 오프라인이 된다면 어떻게 될까? 손실되는 메시지는?

메시지 유실 허용 여부는 애플리케이션 요구사항에 따라 다르다. 조금 누락돼도 큰 문제가 없을 수 있기 때문이다.

생산자에서 소비자로 메시지를 직접 전달하기

많은 메시징 시스템은 중간 노드를 거치지 않고 생산자와 소비자가 다이렉트로 네트워크 통신을 한다.(예를 들어 HTTP나 RPC, 속도가 중요하다면 UDP)

하지만 소비자가 죽어버릴 경우 메시지 유실의 가능성이 존재한다. (재시도 요청을 하더라도, 생산자도 죽을 수 있다)

메시지 브로커(메시지 큐)

직접 메시징 시스템의 대안으로 메시지 브로커를 이용한다. 메시지 스트림을 처리하는 최적화된 데이터베이스의 일종이다.

생산자는 브로커로 메시지를 전송하고, 소비자는 브로커에서 메시지를 읽어 전송 받는다. 지속성 문제가 브로커로 옮겨갔기 때문에 클라이언트의 상태 변경(접속, 해제, 장애)에 쉽게 대처가 가능하다.

브로커는 장애가 났을 때 메시지를 잃어버리지 않기 위해 디스크에 메시지를 기록하며 소비자는 비동기로 메시지를 소비한다.

메시지 브로커와 데이터베이스 비교

XA or JTA를 이용한 2단계 커밋을 수행하기도 하는데, 크게는 데이터 보관에 대한 관점에 차이가 있다.

복수 소비자

복수 소비자(소비자가 여러명)가 같은 토픽에서 메시지를 읽을 때 사용하는 2가지 주요 패턴이 있다.

2가지 패턴은 혼용될 수 있다. 예를 들어 각 그룹에 메시지를 전달하고, 그룹 내에서는 하나의 노드만 메시지를 받도록 하는 식.

확인 응답 재전송

소비자는 언제든 장애가 날 수 있기 때문에, 소비자가 메시지를 정상적으로 처리했는지 확인을 위해 확인 응답을 이용한다.

소비자(클라이언트)는 메시지 처리가 끝났을 때 브로커가 메시지를 큐에서 제거할 수 있도록 브로커에게 명시적으로 알려야 한다.

브로커가 확인 응답을 받기 전에 클라이언트의 연결이 끊기거나, 타임아웃이 발생하면 브로커는 메시지 처리가 되지 않았다 가정하고 다른 소비자에게 메시지를 재전송한다.

로드밸런서와 결합할 때 재전송 행위는 메시지 순서에도 영향을 미친다. m1~m5까지 순서대로 실행되다 중간에 m3를 처리하던 소비자에 장애가 발생하고, m5까지 실행을 마친 후 재처리가 그 후에 이루어질 수 있기 때문이다.

이 문제를 해결하기 위해서는 메시지가 순서에 영향을 받지 않는 비인과적 메시지이거나 로드밸런서를 사용하지 않으면 된다.

파티셔닝된 로그

메시지 브로커는 메시지를 영구 보관하지 않기 때문에 시스템에 새로운 소비자가 추가되면 이전 메시지는 알 수 없고, 이후 메시지부터 받기 시작한다.

그렇다면 데이터베이스의 지속성 있는 데이터 보관과 메시징 시스템의 지연 시간이 짧은 알림 기능을 조합할 수 없을까?

이 아이디어가 바로 로그 기반 메시지 브로커의 기본 아이디어이다.

로그를 사용한 메시지 저장소

브로커의 구현도 데이터베이스 스토리지 엔진과 같은 구조를 사용한다. 생산자가 보낸 메시지는 로그 끝에 추가하고 소비자는 로그를 순차적으로 읽어 메시지를 받는다.(tail -f 처럼)

이 때 디스크 하나를 쓸 때보다 로그를 파티셔닝하여 다른 장비에 저장하여 처리량을 높일 수 있다. 그러면 각 파티션은 다른 파티션과 독립적으로 읽기 쓰기가 가능한 분리된 로그가 완성된다.

각 파티션 내 브로커는 모든 메시지에 오프셋이라고 부르는 단조 증가하는 순번을 부여한다. 이로써 동일 파티션 내 메시지 순서는 보장되지만, 전체 파티션 간 메시지 순서는 보장하지 않는다.

아파치 카프카, 아마존 키네시스 스트림, 트위터 분산 로그가 이런 방식의 로그 기반 메시지 브로커로 동작한다.

로그 방식과 전통적인 메시징 방식 비교

로그 기반 접근법은 팬 아웃 메시징 방식을 제공한다. 소비자가 서로 영향 없이 독립적으로 로그를 읽을 수 있고, 메시지를 읽어도 로그는 사용되지 않는다.

각 클라이언트는 할당된 파티션 메시지를 모두 소비하며 일반적으로 단일 스레드로 파티션에서 순차적으로 메시지를 읽는다. 이런 거친 방식의 로드 밸런싱은 몇 가지 불리한 면이 존재한다.

즉 메시지 처리 비용이 비싸고 메시지 단위로 병렬화 처리하고 싶지만, 메시지 순서는 그렇게 중요하지 않다면 JMS/AMQP 방식의 메시지 브로커가 적합하다.

반면, 처리량이 많고 메시지를 처리하는 속도가 빠르지만 메시지 순서가 중요하다면 로그 기반 접근법이 효과적이다.

소비자 오프셋

파티션 하나를 순서대로 처리하면 소비자의 오프셋을 통해 처리된 메시지를 알 수 있기 때문에(오프셋보다 크면 처리되지 않은 메시지) 별도의 추적 오버헤드가 감소하고 일괄처리와 파이프라이닝 기회를 제공해 로그 기반 시스템의 처리량을 늘려준다.

사실, 메시지 오프셋은 단일 리더 복제에서 널리 쓰는 로그 순차 번호와 상당히 유사하며, 정확히 같은 원리가 사용된다. (메시지 브로커는 리더처럼, 소비자는 팔로워처럼)

만약 팔로워가 리더와 연결이 끊어졌다 다시 접속할 때 순차 번호를 통해 기록을 누락하지 않고 복제를 재개할 수 있다.

소비자 노드가 메시지를 이미 처리햇지만, 장애가 발생할 때 오프셋에 기록하지 못한 메시지가 있다면 재시작될 때 두 번 처리가 되는 문제가 있을 수 있다. (후에 나옴)

디스크 공간 사용

로그를 추가하다 보면 언젠가 디스크가 가득차게 된다. 디스크 공간을 재사용하기 위해 로그를 여러 조각으로 나누고 오래된 조각은 삭제하거나 보관 저장소로 이동한다.(자세한 방법은 후술)

소비자 처리가 너무 느려 메시지 생산 속도를 따라잡지 못하면 소비자가 너무 뒤쳐져 소비자 오프셋이 이미 삭제된 조각을 가리킬 수도 있다. (메시지 유실 가능성)

결과적으로 로그는 크기가 제한된 버퍼로 구현하고 버퍼가 가득 차면 오래된 순서로 버린다. 이런 버퍼를 원형 버퍼(circular buffer), 링 버퍼(ring buffer)라고 한다. 그러나 버퍼가 디스크 상에 있다면 상당히 커질 수 있다.

배치 시스템은 항상 모든 메시지를 디스크에 기록하기 떄문에 로그 처리량은 일정하지만, 메시징 시스템은 큐가 작을 때는 디스크에 기록하지 않기 때문에 빠르지만, 기록하기 시작하면 매우 느려지며 이 때 처리량은 보유한 메시지 양에 따라 다르다.

소비자가 생산자를 따라갈 수 없을 때

이 때 선택지는 3가지이다.

메시지를 잃기 시작해도 해당 소비자만 영향을 받는단건 운영상 큰 이점이다.

오래된 메시지 재생

데이터베이스와 스트림

메시지 브로커를 앞서 비교했지만 전혀 다른 도구처럼 보여도, 로그 기반 브로커는 데이터베이스에서 아이디어를 얻어서 정착했으며, 그 반대도 가능하다.

이 절에서는 이종 데이터 시스템에서 발생할 수 있는 문제 한가지와 이벤트 스트림 아이디어를 데이터베이스에 적용해 해결법을 찾는다.

시스템 동기화 유지하기

주기적으로 데이터베이스 전체를 덤프하는 작업이 너무 느리면 대안으로 이중 기록을 사용할 수 있다. 데이터가 변할 때마다 양쪽에 기록한다. (데이터베이스에 먼저 쓰고, 검색 색인 갱신)

하지만 이중 기록에는 몇가지 심각한 문제가 있는데, 그 중 하나가 경쟁 조건이다. 경쟁 조건으로 인해 두 데이터 시스템 간 데이터가 영원히 불일치 할 수 있다.

두번째로는 한쪽 쓰기가 성공하고 다른 쪽 쓰기는 실패할 가능성이 존재한다. 이는 내결함성 문제로 이 문제는 해결에 많은 비용이 든다.

이는 단일 리더로 동작하도록 한다면 쓰기 순서가 결정되기 때문에 상황이 훨씬 낫다. 하지만 가능한 이야기일까?

변경 데이터 캡처

수십 년간 데이터베이스는 데이터 변경 로그를 얻는 방법에 대해 기술한 문서를 제공하지 않았으나, 최근 들어 변경 데이터 캡처(change data capture, CDC)에 관심이 높아지고 있다. CDC는 데이터가 기록되자마자 변경 내용을 스트림으로 제공한다면 특히 유용하다.

예를 들어 변경 내용을 캡처에 검색 색인에 꾸준히 반영할 수 있다. (동기화해야 하는 시스템들은 변경 스트림의 소비자로 동작한다.)

변경 데이터 캡처 구현

검색 색인이나 데이터 웨어하우스 등은 레코드 시스템(일반적으로 RDB가 될 듯)에 저장된 데이터의 또 다른 뷰일 뿐이므로 파생 데이터 시스템이라고도 할 수 있다.

변경 데이터 캡처는 파생 데이터 시스템에 모든 변경 사항 반영을 보장하는 메커니즘이다. 변경 사항을 캡처할 데이터베이스를 리더, 나머지를 팔로워로 동작하도록 한다.

로그 기반 메시지 브로커는 원본 데이터베이스에서 변경 이벤트 전송에 적합하다. (로그 기반 메시지 브로커는 순서를 보장하기 때문)

변경 데이터 캡처 구현에 데이터베이스 트리거를 이용하기도 한다. (하지만 유지보수 오버헤드)

링크드인의 데이터버스, 페이스북의 웜홀, 야후의 셰르파에서 대규모 데이터를 다룰 때 이 아이디어를 사용한다.

초기 스냅숏

변경 로그를 모두 가지고 있다면 데이터베이스 전체 상태 재구축에 용이하겟지만, 모두 저장하면 디스크 공간이 많이 필요하며, 재생에도 오랜 시간이 걸린다. 그래서 로그는 적당히 잘라야 한다.

로그 컴팩션

로그 히스토리 양을 제한한다면 새로운 파생 데이터 시스템을 추가할 때마다 스냅숏을 만들어야 한다. 하지만 로그 컴팩션이라는 대안이 있다.

이해하기로는 로그에 툼스톤(변경 마크)를 표기하고, 후에 컴팩션할 때 최신본만 남긴다는 설명인 듯

아파치 카프카는 로그 컴팩션을 제공한다.

변경 스트림용 API 지원

최근 데이터베이스들은 변경 스트림을 기본 인터페이스로서 지원하기 시작했다.

이벤트 소싱

여기서 설명한 아이디어가 DDD에서의 이벤트 소싱과 유사한 면이 있다. (대충 이벤트가 불변이라는 유사점인 듯)

이벤트 로그에서 현재 상태 파생하기

이벤트 로그 자체는 유용하지 않고, 사용하기 위해선 재처리가 필요하다.

이 때 CDC는 로그 컴팩션을 통해 재구성해야 하지만, 이벤트 소싱은 사용자 행동 의도를 표현하기 때문에 상태를 재구축하기 위해선 전체 히스토리가 필요하며 로그 컴팩션이 불가능한 방식이다.

하지만 보통 이벤트 소싱을 사용하는 애플리케이션은 이벤트 로그에서 파생된 현재 상태 스냅숏을 저장하는 메커니즘이 있기 때문에 재처리가 필요 없다. (이벤트 소싱 시스템은 모든 원시 이벤트를 영원히 저장하고 필요할 때마다 모든 이벤트를 재처리할 수 있어야 한다는 의도가 있다.)

명령과 이벤트

이벤트 소싱 철학은 이벤트와 명령을 구분하는데 주의한다.

사용자 요청이 처음 도착하면 명령이다. 애플리케이션은 명령이 실행 가능한 지 확인 후 승인되면 불변 이벤트가 된다. 무결성 조건에 의해 실패할 수도 있다.

특정 고객의 특정 좌석 예약 이벤트가 생성됐다면, 생성 시점에 사실이 된다. 사용자가 나중에 예약을 변경하거나 취소하더라도 이전에 좌석 예약을 했다는 사실은 진실이며 변경이나 취소는 나중에 독립적인 이벤트이다.

이벤트 스트림 소비자는 이벤트 거절을 하지 못한다.

상태와 스트림 그리고 불변성

불변성의 이점은 입력 파일에 손상을 주지 않고, 기존 입력 파일을 통해 얼마든지 실험적인 처리를 수행할 수 있다. 또한 이벤트 소싱과 변경 데이터 캡처를 매우 강력하게 만든다.

하지만 데이터베이스에 상태를 저장할 때, 상태는 수시로 변한다. 그러면 불변성과 어떻게 어울릴 수 있을까?

이해하기로는 데이터베이스는 변경에 대해 변경 로그를 기록하니까 이를 통해 어울러질 수 있다는 듯. (데이터베이스는 일종의 로그의 최신본 캐싱이다)

불변 이벤트의 장점

불변 로그를 쓰면 덮어쓸 수 없기 때문에 문제 상황의 진단과 복구가 훨씬 쉽다.

동일한 이벤트 로그로 여러 가지 뷰 만들기

불변 이벤트 로그로 여러 읽기 전용 뷰를 만들 수 있다.

동시서 제어

이벤트 소싱과 변경 데이터 캡처의 가장 큰 단점은 이벤트 로그의 소비가 비동기로 이루어지기 때문에 사용자가 로그에 이벤트를 기록하고, 로그에서 파생된 뷰를 읽어도 이벤트가 아직 읽기 뷰에 반영되지 않았을 가능성이 존재한다.

해결책 하나는 동기식으로 수행하는 방법이 있다. 이 방법을 위해서는 이벤트 로그와 읽기 뷰를 같은 저장 시스템에 담아야 하며, 만약 다른 시스템이 있다면 분산 트랜잭션이 필요하다. 대안으로 “전체 순서 브로드캐스트를 사용해 선형성 저장소 구현하기” 접근법도 있다. (쓰지 말라는 의미인 듯)

불변성 한계

이벤트 소스 모델 외에도 많은 시스템이 불변성을유지한다. 영구적으로 모든 변화의 불변 히스토리를 유지하는 것이 어느 정도까지 가능할까? 그 답은 데이터 셋이 뒤틀리는 양에 따라 다르다.

데이터 추가 작업이 대부분이라면 불변이 쉽고, 갱신이나 삭제가 자주 발생하는 경우엔 힘들 수 있다. (컴팩션과 가비지 컬렉션 문제 등)

성능적 이유 외에 관리상 이유로 데이터를 삭제할 필요가 있을 수도 있다. (사생활 데이터) 데이토믹은 적출, 포씰은 셔닝이라는 비슷한 개념이 있다는 듯.

아무튼 데이터를 진짜 삭제하는 작업은 놀라울 정도로 어렵다. 복제본이 많이 남아있기 때문이다.

스트림 처리

앞서 스트림이 어디서 오는지(사용자 활동 이벤트, 센서, 데이터베이스 쓰기), 스트림이 어떻게 전송되는지(직접 메시징, 메시지 브로커, 이벤트 로그 방법 등)를 다뤘고, 이제부터 스트림으로 할 수 있는 일에 대해 다룬다.

스트림 처리에는 크게 3가지 선택지가 있다.

  1. 이벤트에서 데이터를 꺼내 데이터베이스나 캐시, 검색 색인 등 저장소 시스템에 기록하고, 다른 클라이언트가 이 데이터를 질의해서 사용한다.
  2. 이벤트를 사용자에 직접 보낸다. 이메일 경고나 푸시 알림 등
  3. 하나 이상의 입력 스트림을 처리해 하나 이상의 출력 스트림을 생산한다.

이 장 나머지는 3번 선택지에 대해 설명하며 스트림을 처리하는 코드 조각을 연산자(operator)나 작업(job)이라 부른다.

일괄 처리 작업과 크게 다른 점은 스트림은 끝나지 않는다는 점이다. 내결함성에 대한 고려도 수정이 필요하다. 일괄 처리와 다르게 스트림은 몇 년 동안 실행 중인 작업에 대해 처음부터 재시작하는 것은 비효율적이기 때문이다.

스트림 처리 사용

스트림 처리는 특정 상황 발생 시 조직에 경고 모니터링 목적으로 오랜 기간 사용돼 왔다.

복잡한 이벤트 처리(Complex event processing, CEP)

1990년대 이벤트 스트림 분석용으로 개발된 방법이다. 특정 이벤트 패턴 검색 시 적합하다.

이벤트에 대해 매치를 감지하면 엔진은 복잡한 이벤트를 방출하며, 이 질의는 일반적인 데이터베이스 시스템의 질의와 다르게 오랜 기간 저장되며 들어오는 이벤트는 지속적으로 질의를 지나 흐르며 이벤트 패턴에 매칭되는 질의를 찾게 된다.

스트림 분석

대량의 이벤트를 집계하고 통계적 지표를 뽑아 냄

→ 고정된 시간 간격 기준으로 계산한다.

집계 시간 간격을 윈도우(window)라고 한다.

구체화 뷰 유지하기

파생 데이터 시스템이 원본 데이터베이스의 최신 내용을 따라잡기 위해 사용

이벤트 소싱

스트림 상에서 검색하기

전문 검색 질의와 같은 복잡한 기준을 기반으로 개별 이벤트를 검색

ex)

엘라스틱서치의 여과 기능

메세지 전달과 RPC

차이점

스트림 조인

스트림 스트림 조인(윈도우 조인)

스트림 테이블 조인(스트림 강화)

테이블 테이블 조인(구체화 뷰 유지)

ex) 트윗

캐시 유지를 구현하기 위해서 팔로우 이벤트 스트림 구현

→ 스트림 처리는 새로운 트윗이 도착하면 어떤 타임라인을 갱신해야 하는지 알기 위해 사용자의 팔로우 집합이 포함된 DB를 유지해야 한다.

해결방안

내결함성

일괄처리

스트림

마이크로 일괄 처리 and 체크포인트

해결책

하지만 출력이 스트림을 떠나면 일괄 처리 출력을 더 이상 지울 수 없고, 실패한 태스크를 재시작하면 2번 발생하게 된다.

원자적 커밋 재검토

장애가 발생했을 때 한번 처리되는 것처럼 하려면 성공했을 때만 모든 출력과 이벤트 처리의 부수효과가 발생하게 해야 한다.

원자적으로 모두 일어나거나 모두 일어나지 않아야한다.

멱등성

멱등성을 의존하는 방법

실패 후에 상태 재구축하기

스트림처리가 실패해도 해당 상태가 복구됨을 보장해야 한다.

트레이드 오프는 무조건 있다.