IT/AWS

[AWS] Kinesis Data Streams - Internals

물통꿀꿀이 2019. 2. 16. 21:55

이번 포스팅에서는 Kinesis 데이터 스트림의 내부를 살펴보려고 한다.

(참조에서 언급된 자료는 발표 자료를 바탕으로 한다.


그림 1. Kinesis Capacity


Kinesis 스트림에 레코드가 들어왔을 때, 그림 1에서 보는 것처럼 기본적으로 레코드에 접근할 수 있는 시간은 24시간이다.

그렇지만 최대 168시간 (7일) 까지 늘릴 수 있다. (시간이 늘어나면 비용 또한 늘어난다는 것을 고려해야 한다.)


Kinesis는 자체적으로(?) KPL(Kinesis Provider Library), KCL(Kinesis Consumer Library)를 제공하고 있다. 해당 라이브러리는 High Level API로 공급자와 소비자 측에서 Kinesis를 손쉽게 사용 할 수 있게 한다.

그런데 https://github.com/awslabs/amazon-kinesis-producer KPL repository에 들어가면 알 수 있듯이 제공하는 언어가 Java와 C++로 제한되어 있다. 반면에 KCL은 KPL에 비해 조금 나은 편이다. 버전에 따라 다르긴 하지만 https://github.com/awslabs/amazon-kinesis-client-python 처럼 Python 또한 제공하고 있다. (한가지 주의 할 점은 내부 Daemon이 Java로 구성되어 있기 때문에 다른 언어를 사용한다 할지라도 Java가 필수이다.)

때문에 High Level이 아닌 Low Level에 Kinesis 데이터 스트림을 알아보도록 하겠다.

  

그림 2. Shard Iterators


그림 2를 확인해보면 Shard Iterator를 볼 수 있다. Shard Iterator는 Shard 안에 있는 레코드를 읽는 시작 위치를 나타낸다. Iterator 타입에 따라 시작 위치가 조금 달라지는데 타입은 아래와 같다.

- AT_SEQUENCE_NUMBER : 해당 시퀀스 번호부터 레코드를 읽는다.

- AFTER_SEQUNCE_NUMBER : 해당 시퀀스 번호의 바로 이전 위치부터 읽는다.

- AT_TIMESTAMP: 해당 시간부터 읽는다.

- TRIM_HORIZON : 가장 오래된 데이터*부터 읽는다. 

* 오래된 데이터는 보존기간 안에 있어서 접근 할 수 있는 데이터를 의미한다.

- LASTEST : 가장 최근 것부터 읽는다.


그리고 Kinesis 데이터 스트림으로부터 데이터를 받기 위해서는 아래와 같은 (Python boto3 라이브러리) 호출을 통해 Shard Iterator로부터 레코드를 받아 올 수 있다.

response = client.get_records(
    ShardIterator='string',
    Limit=123
)

실제로 get_records를 실행 해보면 항상 스트림으로부터 레코드를 얻을 수 있는 것은 아니다.

그림 3. Get Records


그림 3에서 확인 할 수 있듯이 레코드가 있다면 Records 부분이 채워져서 최대 10MB 크기의 레코드가 반환되었겠지만, 해당 반환되는 값이 없다면 Shard Iterator에서 참조하는 시퀀스 번호로는 사용할 수 있는 데이터 레코드가 없다는 것을 의미한다.

이런 상황에서는 Shard Iterator가 스트림과 동기화 할 수 있도록 적어도 1초 정도 대기해야 한다. 그 이후 레코드를 가져오면 된다.

참고로, 스트림에 레코드가 추가된 이후에 Kinesis에서 소비자가 해당 스트림의 Shard에서 가져 갈 수 있도록 하기 위해 전처리를 수행하는데 이 시간에서 약 2~3초 정도 걸린다. 때문에 공급자가 데이터를 넣고 소비자가 받아가기 위해서는 최소 Kinesis의 전처리 시간 정도의 지연이 발생 할 수 밖에 없다.


그림 4. Splitting/Merging Shards


스트림을 생성할 때 지정하는 Shard의 수는 언제든 바꿀 수 있다. 즉, 리샤딩을 할 수 있다는 것인데 샤드 분할과 샤드 병합 2가지의 리샤딩이 존재한다.

샤드 분할은 샤드 1개를 2개로 분리하여 데이터 스트림의 처리량을 늘리는 것이고 그와 반대로 샤드 병합은 샤드 2개를 1개로 합쳐 데이터 스트림의 처리량을 줄이는 것이다. (당연한 것이겠지만 샤드를 조정하는데는 잠깐이라도 시간이 걸린다.)

그래서 그림 1에서 확인 할 수 있듯이 리샤딩을 하면 샤드에 상태가 달라진다. 먼저 상태에 대해 알아보면 아래와 같다.

- OPEN : 리샤딩 작업 전 상위 샤드

- CLOSED : 리샤딩 작업 후 상위 샤드

- EXPIRED : 스트림의 보존 기간 만료 후 상위 샤드


그림 4는 샤딩을 분할하는데 기존 샤드에 있는 레코드는 리샤딩이 발생한 이후 새로 만들어진 샤드로 복사된다. 그런데 복사되는 레코드가 순서대로 저장된다는 보장이 없다. 따라서 순서대로 레코드를 읽기 위해서는 기존 샤드에 접근하여 데이터를 읽어야 한다.


리샤딩 이후 기존 샤드는 CLOSE 상태가 된다. CLOSE 상태로 바뀌면 해당 샤드로는 더 이상 레코드를 저장 할 수 없다.

그리고 위에서도 언급했듯이 스트림은 보존 기간 정책 (기본 24시간)이 존재하므로 기존 샤드 또한 보존 기간이 넘으면 샤드가 EXPIRED로 변경되면서 더 이상 기존 레코드에 접근이 불가능하다.


참조

https://www.slideshare.net/frodriguezolivera/aws-kinesis-streams

https://docs.aws.amazon.com/ko_kr/streams/latest/dev/developing-consumers-with-sdk.html