도입 배경
현재 데이터 파이프라인에서 메시지 브로커로 Redis Stream을 사용하고 있으며, 이를 운영하며 다음과 같은 문제점이 있음을 확인했습니다.
늘어나는 메모리 부담
Redis Stream는 기본적으로 데이터를 캐시 메모리에 저장합니다.
데이터 파이프라인에서 3일치의 데이터를 저장하기 위해, ticker 하나당 1,500,000건을 Redis Stream에 저장하고 있으며, 6개 ticker의 데이터를 수집 했을 때 지속적으로 메모리 사용량이 증가하여 최대 4.5GiB를 사용함을 확인했습니다.
이를 기준으로 ticker 하나당 3일치 데이터를 저장하기 위해서는 약 0.75GiB의 메모리가 필요함을 추정할 수 있습니다.
현재 원화 마켓에서 거래 중인 코인은 155개입니다. 이를 모두 Redis Stream에 저장한다고 가정할 경우, 필요한 총 메모리 용량은 155 × 0.75GiB = 116.25GiB으로 지나치게 높은 메모리 요구 사항으로 보여집니다.
단일 노드 구성의 불안정성과 클러스터 구성의 복잡성
현재 Redis는 단일 노드 구성으로 하나의 노드에 문제가 생길 경우, 전체 데이터 파이프라인에 장애가 발생하게 됩니다. 이러한 문제를 해결하기 위해 Redis 클러스터 구성을 고려할 수 있지만, 이는 다음과 같은 복잡성이 추가됩니다.
- 데이터 분산 관리: 클라이언트가 특정 노드에 데이터를 읽기 위해 접근할 때, 데이터가 존재하지 않는 경우 해당 노드는 MOVED 오류를 반환합니다. 이때, 클라이언트는 이 오류를 받아 올바른 노드로 요청을 재전송해야 합니다. 즉, Redis 클러스터는 이러한 리다이렉션을 자동으로 처리하지 않으며, Producer와 Consumer 측에서 이를 관리해야 합니다.
- 복제를 위한 추가적인 노드 구성: Redis 클러스터에서 마스터 노드는 장애 발생 시 슬레이브 노드로 Failover되는 방식으로, 이를 위해 복제를 위한 노드가 추가적으로 필요합니다. 예를 들어 분산 처리를 위해 마스터 노드를 3개 구성한다고 가정하면, 복제를 위한 슬레이브 노드도 3개도 추가로 필요하게 됩니다.
Kafka 선택 이유
이러한 점에서 Redis Stream은 데이터 파이프라인의 메시지 브로커로 사용하는 것은 한계가 있다고 판단하여, Kafka을 메시지 브로커로 도입함으로써 해당 문제를 해결하고자 했으며, 메시지 브로커로 Kafka를 선택하게 된 이유는 다음과 같습니다.
- 디스크 기반 데이터 저장: Kafka는 데이터를 디스크에 저장하며, 페이지 캐시를 통해 Producer와 Consumer의 읽기/쓰기 속도를 향상시킵니다. 이러한 방식은 메모리 사용량을 감소시키고, 효율적인 메모리 사용을 기대할 수 있습니다.
- 자동으로 리다이렉션 수행: Kafka의 Broker는 클라이언트의 요청을 해당 토픽의 파티션이 있는 Broker로 자동으로 리다이렉트 합니다. 따라서 Producer와 Consumer 측에서 이를 관리할 필요가 없습니다.
- 복제를 위한 추가적인 노드가 필요 없음: Kafka는 메시지를 파티션 단위로 저장하며, 하나의 Broker는 분산 처리를 위한 파티션과 파티션을 복제하는 레플리카를 포함할 수 있습니다. 이러한 구조는 추가적인 복제 노드가 필요한 Redis 클러스터와 다르게, 추가적인 노드가 필요하지 않습니다.
Kafka 도입 시 고려사항
Kafka 도입 시 고려한 사항은 다음과 같습니다.
메시지 보존 기간 설정
- Kafka 기본 메시지 보존 기간: Kafka는 메시지를 세그먼트 단위로 디스크에 저장하며, 오래된 세그먼트는 시간 또는 크기 기반 정책에 따라 삭제됩니다. 세그먼트의 보존 기간은 log.retention.ms 파라미터로 설정할 수 있으며, 기본값은 7일입니다.
- 기존의 메시지 보존 기간: 기존의 Redis Stream 기반 데이터 파이프라인에서는 메모리 한계로 인해 데이터를 최대 3일까지만 보존해 왔으나, 보존 기간이 짧아 이를 늘릴 필요가 있었습니다.
- 어떤 기준으로 보존 기간을 설정할 것인가? 암호화폐 시장은 다른 금융 시장의 영향을 받는 특성이 있습니다. 이에 따라 주식 시장이 운영되는 주중의 데이터를 보존하는 것이 필요하다고 생각했으며, 글로벌 금융 시장 간의 시차를 고려해 추가로 2일을 더해 총 7일 동안 데이터를 보존하도록 설정했습니다.
기본 replica 수 설정
기본 replica의 수를 설정하는 파라미터인 default.replication.factor를 2로 설정하여, 하나의 Broker 노드에 장애가 발생할 경우에도 데이터 파이프라인에 장애가 생기지 않도록 했습니다.
replica 수를 늘려서 더 안정적인 내결함성을 보장할 수도 있지만, replica 수 만큼 필요한 디스크 용량도 배로 늘어나기 때문에 2로 설정하여 진행하였습니다.
Producer와 Consumer 설정
데이터 파이프라인의 실시간 처리를 보장하기 위해 Producer와 Consumer의 파라미터를 다음과 같이 설정했습니다.
- enable.auto.commit=False: Consumer가 처리한 메시지의 offset을 자동으로 commit 할지 여부를 설정하는 파라미터입니다. 기본값인 True는 지정된 시간 간격으로 자동 commit을 수행하며, 이로 인해 실시간성이 저하될 수 있습니다. 이를 방지하고 메시지를 처리한 즉시 commit 할 수 있도록 enable.auto.commit을 False로 설정했습니다.
- fetch.max.wait.ms=0: Consumer가 메시지를 요청했을 때, 브로커가 fetch.min.bytes에 지정된 크기 만큼 메시지를 채워 보내기 위해, 대기하는 시간을 설정하는 파라미터입니다. 해당 값을 0으로 설정하여 대기 시간 없이 메시지를 Consumer에게 바로 전송하도록 설정했습니다.
- linger.ms=0: Producer가 여러 메시지를 배치로 묶어 전송하기 위해, 메시지를 축적할 대기할 시간을 설정하는 파라미터 입니다. 해당 값을 0으로 설정하여 Producer가 메시지를 즉시 전송하도록 설정했습니다.
Infra 구성
terraform을 사용하여 GCP에서 Kafka와 데이터 파이프라인을 위한 인프라를 프로비저닝 했습니다. 생성한 리소스들은 다음과 같으며, 데이터 파이프라인을 구성하는 컴포넌트들은 Docker 환경에서 실행됩니다.
- VM: kafka 노드 3개와 데이터 파이프라인 노드 1개를 생성했습니다.
- Kafka 노드: vCPU 2개, Memory 4GB
- 데이터 파이프라인 노드: vCPU 4개, Memory 16GB
- VPC: Kafka와 데이터 파이프라인 간 내부 통신을 위해 cryptostream-subnet 서브넷을 생성하고, 모든 VM을 해당 서브넷에 배치했습니다.
- Persistant Disk: Kafka와 PostgreSQL의 데이터를 저장하기 위해 Persistant Disk를 생성했습니다.
- Kafka 노드의 Disk 크기: Kafka 노드의 mount되는 Disk의 크기는 ticker 하나당 하루에 0.25 GiB 데이터가 생성된다고 가정하고, 계산한 결과 아래와 같이 180.3GiB 가량 필요함을 확인했고, 여유롭게 250GiB로 설정했습니다.
- 약 180.3GiB = 0.25GiB (ticker 하나 당 하루에 생성되는 데이터량) x 7 (7일 데이터 보존) x 2 (replica의 수) x 155 (원화 마켓에 상장된 ticker의 개수) / 3 (Kafka 노드의 수)
- PostgreSQL Disk 크기: PostgreSQL에 mount되는 Disk의 크기는 100G로 설정했으며, 이후 데이터 마트를 위한 배치형 데이터 파이프라인을 구축하며 조정할 계획입니다.
- Kafka 노드의 Disk 크기: Kafka 노드의 mount되는 Disk의 크기는 ticker 하나당 하루에 0.25 GiB 데이터가 생성된다고 가정하고, 계산한 결과 아래와 같이 180.3GiB 가량 필요함을 확인했고, 여유롭게 250GiB로 설정했습니다.
- GCS bucket: 호가창 데이터를 저장하기 위한 GCS bucket을 생성했습니다.
데이터 파이프라인
주요 컴포넌트
Upbit Producer
Producer는 WebSocket을 통해 특정 Ticker의 호가창 데이터를 실시간으로 수신하고, 메시지를 Kafka의 topic에 전송하는 역할을 담당합니다.
Producer는 Kafka Python SDK를 사용해서 구현했습니다.
- 생성한 Producer의 수: 하나의 Producer가 155개 Ticker의 데이터를 모두 구독할 경우, 컨테이너 장애 발생 시 전체 데이터 수신이 중단될 위험이 있습니다. 이를 방지하기 위해 Producer를 여러개 생성하여, 장애 상황에서 일부 데이터는 계속 수신할 수 있도록 했습니다. Ticker 별로 개별 Producer를 생성하는 것이 가장 이상적이지만, 컨테이너 관리와 효율적인 리소스 사용을 고려해 총 16개의 Producer를 생성했습니다.
- Producer가 구독하는 Ticker: 각각의 Producer는 저마다 다른 10개의 Ticker를 할당 받고, 해당 Ticker들의 호가창 데이터를 구독합니다.
- 순서 보장: Producer는 지정된 topic의 partition에 메시지를 전송하도록 하여, Ticker 별로 메시지 순서를 보장했습니다.
Kafka
Kafka는 데이터 파이프라인에서 메시지 브로커 역할을 수행하며, Producer가 보낸 메시지를 topic에 저장하고, Consumer가 이를 읽어갈 수 있도록 관리하는 역할을 합니다.
- Kafka 클러스터 구성: Kafka 클러스터는 분산 처리와 장애 복구를 위해 3개의 노드로 구성했습니다.
- KRaft 사용: Zookeeper를 사용할 경우 Kafka 클러스터의 메타 데이터 관리를 위해 추가적인 애플리케이션이 필요하다는 점에서, 관리할 애플리케이션을 줄이기 위해 KRaft를 사용했습니다. 또한 KRaft를 사용할 경우 Kafka의 아키텍처가 단순화되며, 이를 통해 추후 확장성을 향상시킬 수 있기 때문에 선택했습니다.
- 모니터링 방법: Kafka UI에서 제공하는 Broker 상태 확인, Consumer Group 별로 Lag 확인, 토픽에 쌓이는 메시지 실시간 조회 등의 기능을 활용하여 Kafka를 관리하고 있습니다.
- topic의 partition 개수: topic의 patition의 개수는 생성한 Producer의 개수에 맞춰 16개로 설정했습니다.
GCS Consumer
Kafka의 topic에서 메시지를 읽어와 처리하며, JSON 데이터를 GCS bucket에 저장하는 컴포넌트입니다. 마찬가지로 Kafka Python SDK를 사용해서 구현했으며, 지정된 topic의 patition으로부터 메시지를 읽어옵니다.
- 생성한 consumer의 수: topic의 patition의 수에 맞춰 16개의 Consumer를 생성했습니다.
- 멀티 쓰레드 방식: 단일 쓰레드로 GCS에 적재할 경우 topic의 메시지 생성량을 따라가지 못해 consumer lag가 발생했습니다. 이를 해결하기 위해 5개의 쓰레드를 사용하여 GCS에 JSON 데이터를 적재하도록 했습니다.
데이터 처리 흐름
Extract
- 데이터 수집: Upbit Producer에서 호가창 데이터를 구독하여 실시간으로 수집합니다.
- Kafka에 전송: 수집한 JSON 데이터를 Kafka의 topic에 전송합니다.
Load
- 메시지 읽기: GCS Consumer가 Kafka의 topic에서 메시지를 읽어옵니다.
- 데이터 적재: JSON 타입의 호가창 데이터를 Ticker와 시간 별로 파티셔닝하여 GCS bucket에 적재합니다.
- 예시: ticker=BTC/year=2025/month=02/day=02/hour=16/minute=16
도입 후기
지금까지 데이터 파이프라인의 메시지 브로커를 Redis Stream을 사용하면서 겪었던 한계와 문제점을 해결하기 위해, Kafka를 도입하는 과정을 담아보았습니다.
이 과정에서 Kafka 클러스터 구축을 위해 VM instance를 추가했고, Producer와 Consumer를 Kafka SDK로 리펙토링하는 작업을 수행했습니다.
추가적인 리소스와 작업이 발생하기는 했지만 이로 인해 다음과 같은 사항이 개선되었습니다.
- 메모리 사용량 절감: Redis Stream에서는 메시지 저장을 위해 많은 메모리를 사용하였으나, Kafka을 도입하면서 메모리 사용량을 줄일 수 있었습니다.
- 메시지 보존 기간 연장: Kafka의 디스크 기반 저장 방식의 이점을 활용하여, 메시지의 보존 기간을 3일에서 7일로 늘릴 수 있었습니다.
- 고가용성 확보: Kafka 클러스터를 다중 노드로 구성함으로써, 하나의 노드에 문제가 발생하더라도 정상적으로 운영이 가능해졌습니다.
- 효율적인 모니터링: Kafka UI를 통해 메시지 브로커의 상태를 실시간으로 모니터링할 수 있게 되었습니다.
- 모든 Ticker의 데이터 적재: 기존에는 메모리 문제와 분산 처리의 한계 때문에 일부 Ticker의 데이터만 적재하였는데, 해당 문제들을 해결한 뒤 암호화폐 거래소에 상장된 모든 Ticker의 호가창 데이터를 안정적으로 적재할 수 있게 되었습니다.
GitHub - dojun43/CryptoStream-docker: Crypto 데이터 파이프라인 구축 프로젝트
Crypto 데이터 파이프라인 구축 프로젝트. Contribute to dojun43/CryptoStream-docker development by creating an account on GitHub.
github.com
참고:
redis 클러스터: https://jaimemin.tistory.com/2329
kafka replication: https://damdam-kim.tistory.com/17
Kafka Disk I/O가 빠른 이유: https://medium.com/sjk5766/kafka-disk-i-o가-빠른-이유-899c4da5084
zookeeper와 kraft 비교: https://devocean.sk.com/blog/techBoardDetail.do?ID=165711&boardType=techBlog
zookeeper와 kraft 비교: https://adjh54.tistory.com/639?utm_source=chatgpt.com
FMS 데이터파이프라인 구축기: https://tech.socarcorp.kr/data/2023/01/17/build-fms-data-pipeline-1.html#11-fms-서비스-소개
'프로젝트 > CryptoStream' 카테고리의 다른 글
[CryptoStream] GCS 요청수 문제 해결하고 비용 줄이기 (0) | 2025.02.28 |
---|---|
[CryptoStream] 암호화폐 데이터 파이프라인 구축하기 (0) | 2024.12.24 |