개요
본 프로젝트는 암호화폐 거래소인 Upbit에서 제공하는 실시간 호가창 데이터를 추출하고 저장하기 위한 데이터 파이프라인 구축을 목표로 합니다.
Upbit의 현재 거래 가능한 암호화폐의 종류는 수십개 이상이며, 호가창 데이터는 실시간으로 변동되는 특징을 가집니다. 이러한 데이터의 특성을 고려하여 모든 암호화폐의 호가창 데이터를 실시간으로 처리하고, 부하 분산이 가능한 스트림형 데이터 파이프라인을 설계하고 구현하고자 합니다.
프로젝트 목표
- 실시간 데이터 적재
- 호가창 데이터를 스트림형 데이터 파이프라인을 통해 실시간으로 수집 및 저장합니다.
- 부하 분산
- 모든 암호화폐에 대한 데이터를 안정적으로 처리하기 위해 부하 분산 메커니즘을 설계 및 검증합니다.
- 확장 가능성 확보
- 초기에는 Docker 환경에서 데이터 파이프라인을 구축하고, 여러 개의 data producer와 data loader를 컨테이너 형태로 생성하여 안정적인 부하 분산 가능성을 테스트합니다.
- 이후 안정적인 데이터 처리를 지원하기 위해 Kubernetes 환경에서 데이터 파이프라인을 확장 및 운영할 계획입니다.
Data source
암호화폐 데이터의 특징
- 실시간 변동성
- 데이터는 실시간으로 빠르게 변화하며, 처리 지연이 최소화되어야 합니다.
- 데이터 생성량의 변동성
- 데이터 생성량은 예측이 어렵습니다.
- 거래량이 적은 경우, 암호화폐 별 초당 3~5건의 데이터가 생성됩니다.
- 거래량이 많은 경우, 초당 생성되는 데이터의 양은 시장 참여자 수에 따라 달라질 수 있으며, 정확한 추정은 추가 테스트가 필요합니다.
- 데이터 생성량은 예측이 어렵습니다.
- 암호화폐 단위 데이터 구조
- 암호화폐은 개별적인 가격 및 호가 데이터를 가지고 있으며, 모든 암호화폐의 데이터를 실시간으로 관리해야 합니다.
Ticker
각각의 암호화폐은 Ticker라는 고유한 식별자를 가지며, Producer가 WebSocket을 통해 데이터를 수신할 때, 구독 요청에 사용됩니다.
- Ticker: 암호화폐의 고유한 식별자
- 예시: BTC-KRW, DOGE-KRW, XRP-KRW 등
호가창
호가창은 암호화폐 거래 시 매수와 매도의 주문 상황을 실시간으로 보여주는 정보입니다. 이를 통해 시장의 공급과 수요, 거래량, 체결된 가격 등을 파악할 수 있습니다.
호가창 데이터 구성 요소
호가창 데이터에서 추출해야 할 주요 정보는 다음과 같습니다.
- 매도 호가 (ask price): 매도자가 제시한 판매 가격
- 매도 수량 (ask volume): 특정 매도 호가에 해당하는 암호화폐 총 수량
- 매수 호가 (bid price): 매수자가 제시한 구매 가격
- 매수 수량 (bid volume): 특정 매수 호가에 해당하는 암호화폐 총 수량
데이터의 특성
- 다수의 매수/매도 호가
- 시장 참여자가 많아질수록 각 참여자가 제시한 가격이 달라지며, 이에 따라 매수/매도 호가는 여러 개가 됩니다.
- 예시: 호가창 스냅샷 저장 시, 매도 호가/수량과 매수 호가/수량이 각각 최대 16개씩 기록됩니다.
- askprice1: 95,704,000, askprice2: 95,705,000, ..., askprice16: 95,749,000
- 주문 요청에 따른 실시간 데이터 갱신
- 호가창 데이터는 시장 참여자의 주문 요청에 따라 실시간으로 변경됩니다.
수집할 데이터 및 수집 계획
- 암호화폐 중 일부만 데이터 수집
- 초기 단계에서는 전체 암호화폐 중 일부 주요 암호화폐에 대해서만 데이터를 수집합니다.
- 수집할 암호화폐는 한달 간 거래량을 기준으로 많음, 중간, 적음으로 분류하여 각 그룹별로 2개의 암호화폐를 선정해 데이터를 수집합니다. 이를 통해 다양한 거래량에 따른 시스템 성능을 검토하고 최적화합니다.
- 5단계 호가창 데이터 추출
- 초기에는 매수/매도 호가 상위 5단계 데이터만 수집 및 저장하여 적재 부담을 줄입니다.
- 데이터 적재량 분석
- 하루 동안 생성되는 데이터 양을 분석하여 디스크 사용량을 평가합니다.
- 점진적 확장
- 초기 시스템을 검증하며 저장하는 호가창 데이터를 단계적으로 확장하고, 수집 대상 암호화폐의 범위를 점차 넓힙니다.
데이터 파이프라인 아키텍처
주요 컴포넌트
Producer
Producer는 WebSocket을 통해 실시간으로 호가창 데이터를 수신받는 역할을 담당합니다. 데이터를 효율적으로 처리하기 위해 비동기 방식을 활용하며, 병렬적으로 데이터를 처리합니다.
주요 특징
- 특정 Ticker의 데이터만 처리
- 각 Producer는 특정 Ticker들을 구독하며, 지정된 Ticker들의 호가창 데이터만 수신받습니다.
- 구독할 Ticker는 설정 파일(conf/producer.conf)에서 지정 가능합니다.
- 부하 분산
- Producer를 여러 개 생성하여 Ticker 별로 데이터를 나누어 처리함으로써, 수신 받는 데이터의 부하를 분산합니다.
- 확장성
- Producer의 개수를 늘림으로써 구독하는 Ticker의 범위를 확장할 수 있습니다. 이를 통해 다양한 암호화폐의 데이터를 효율적으로 적재할 수 있습니다.
Redis Stream
Redis Stream은 데이터 파이프라인에서 메시지 브로커 역할을 수행하며, Producer가 보낸 메시지를 Stream에 저장하고, Dataloader가 이를 읽어갈 수 있도록 관리하는 역할을 합니다.
또한, Producer와 Dataloader 사이에서 메시지의 재전송, 분산 처리를 통해 메세지를 효율적으로 전달하고 관리합니다.
Redis Stream 선택 배경
- 메시지 재전송 기능의 필요성
- Redis List를 메시지 브로커로 테스트하는 과정에서 메시지가 소실될 가능성과 데이터 관리의 한계를 경험했습니다.
- 이러한 문제점을 해결하기 위해 Consumer의 메시지 처리 상태를 추적하여, 처리 완료 여부를 기반으로 메시지를 재전송하는 기능이 필요했습니다.
- 메시지 분산 처리
- Producer에서 하나의 Ticker는 초당 약 3~5개의 메시지를 생성하며, 수십 개의 Ticker 데이터를 안정적으로 처리하기 위해 메시지 분산 처리 기술이 필요할 것이라고 생각했습니다.
- 이를 위해 Redis Stream과 Kafka를 후보로 검토하였으며, 초기 환경에서는 Redis Stream이 별도의 추가 인프라 없이 기존 Redis 환경에서 확장 가능하다는 점에서 적합하다고 판단했습니다.
- 따라서 초기 단계에서는 Redis Stream을 기반으로 메시지 분산 처리를 구현하고, 향후 데이터 처리량 증가 또는 복잡한 스트리밍 요구사항이 발생하면 Kafka와 같은 고성능 분산 스트리밍 플랫폼으로 전환할 계획입니다.
고려사항
- Stream 데이터 길이 제한
- Redis Stream의 무한 데이터 적재 특성으로 인해 메모리 문제를 방지하기 위해 Stream 길이를 제한했습니다.
- 하루에 Ticker 하나당 평균 500,000건의 메시지가 생성된다는 점을 기준으로, Ticker 당 3일치 데이터(1,500,000건)만 보관하도록 설정했습니다.
dataloader
Redis Stream에서 데이터를 읽어와 처리하며, 수집한 JSON 형태의 호가창 데이터를 변환하여 데이터베이스에 저장하는 컴포넌트입니다.
주요 특징
- 메시지를 정상적으로 데이터베이스에 저장하면 Redis Stream에 ACK 명령을 전송하여 해당 메시지가 처리되었음을 알립니다.
- 데이터베이스 commit 주기는 설정 파일(conf/dataloader.conf)에서 조정 가능하며, 기본값은 10건 단위로 commit 하도록 설정되어 있습니다.
데이터 처리 흐름
1. Extract
- 데이터 수집: Upbit Producer에서 WebSocket 방식으로 호가창 데이터를 구독하여 실시간으로 수집합니다.
- Redis Stream에 추가: 수집된 데이터를 Redis Stream에 추가합니다.
2. 메시지 브로커
- Stream 생성: Producer의 개수만큼 Redis Stream을 생성하고,각 Producer로부터 수신한 데이터를 일시적으로 저장합니다.
- 데이터 분산: Stream을 통해 데이터를 Consumer Group으로 안정적으로 전달합니다.
3. Transform & Load
- 데이터 읽기: Dataloader가 Redis Stream에서 메시지를 읽어옵니다.
- 데이터 변환: JSON 형식의 호가창 데이터를 PostgreSQL 테이블 구조에 맞게 변환합니다.
- 데이터 저장: 변환된 데이터를 PostgreSQL에 저장하며, 데이터는 날짜별로 파티셔닝하여 관리됩니다.
Docker 환경에서 배포하기
데이터 파이프라인 배포는 docker compose를 활용하여 진행했습니다.
사전에 terraform을 활용하여 포트 허용, 방화벽 설정, 네트워크 구성을 한 GCE 인스턴스를 하나 생성하고, docker를 설치해주었습니다.
docker-compose.yaml
postgres
수집한 데이터를 저장하기 위해 postgres 컨테이너를 생성합니다.
- DB 생성 시 필요한 환경설정들은 .env 파일에서 읽어옵니다.
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
POSTGRES_DB: cryptostream
TZ: Asia/Seoul
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "${POSTGRES_USER}", "-d", "cryptostream"]
interval: 10s
retries: 5
start_period: 5s
restart: always
ports:
- 5432:5432
networks:
network_custom:
ipv4_address: 172.28.0.2
Redis
Redis Stream를 사용하기 위해 redis 컨테이너를 생성합니다.
- 마찬가지로 필요한 환경설정들은 .env 파일에서 읽어옵니다.
services:
...
redis:
image: redis:7.2-bookworm
command: [ "redis-server", "--requirepass", "${REDIS_PASSWORD}"]
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 30s
retries: 50
start_period: 30s
restart: always
ports:
- 6379:6379
networks:
network_custom:
ipv4_address: 172.28.0.3
x-crptostream-common
producer와 dataloader에 공통적으로 적용될 부분입니다.
- image: 베이스 이미지로 python:3.12를 사용했습니다.
- env_file: producer와 dataloader에서 postgres와 redis에 접근하기 위해 필요한 접속 정보를 읽어 옵니다.
- volumes: 실행하기 위해 필요한 파일의 경로들을 mount 합니다.
- depends_on: postgres와 redis 컨테이너가 정상적으로 실행되면 컨테이너를 실행합니다.
x-crptostream-common:
&crptostream-common
image: python:3.12
env_file:
- .env
volumes:
- ${CRPTOSTREAM_PROJ_DIR:-.}/conf:/CryptoStream/conf
- ${CRPTOSTREAM_PROJ_DIR:-.}/src:/CryptoStream/src
- ${CRPTOSTREAM_PROJ_DIR:-.}/logs:/CryptoStream/logs
working_dir: /CryptoStream
depends_on:
&crptostream-common-depends-on
redis:
condition: service_healthy
postgres:
condition: service_healthy
services:
...
upbit_producer
3개의 producer를 생성합니다.
- <<: *crptostream-common: crptostream-common에 내용을 참조합니다.
- upbit_producer.py <producer_name>: command의 마지막 부분에 해당 producer의 고유 이름을 지정해줍니다.
- 지정한 이름을 기준으로 producer.conf 파일에서 환경설정 값을 읽어들입니다.
services:
...
upbit_producer1:
<<: *crptostream-common
command: >
bash -c "
python -m venv /venv &&
/venv/bin/pip install -r /CryptoStream/src/requirements.txt &&
/venv/bin/python /CryptoStream/src/upbit_producer.py upbit_producer1"
networks:
network_custom:
ipv4_address: 172.28.0.4
upbit_producer2:
<<: *crptostream-common
command: >
bash -c "
python -m venv /venv &&
/venv/bin/pip install -r /CryptoStream/src/requirements.txt &&
/venv/bin/python /CryptoStream/src/upbit_producer.py upbit_producer2"
networks:
network_custom:
ipv4_address: 172.28.0.5
upbit_producer3:
<<: *crptostream-common
command: >
bash -c "
python -m venv /venv &&
/venv/bin/pip install -r /CryptoStream/src/requirements.txt &&
/venv/bin/python /CryptoStream/src/upbit_producer.py upbit_producer3"
networks:
network_custom:
ipv4_address: 172.28.0.6
upbit_dataloader
3개의 dataloader를 생성합니다.
- <<: *crptostream-common: crptostream-common에 내용을 참조합니다.
- upbit_dataloader.py <producer_name>: command의 마지막 부분에 해당 dataloader의 고유 이름을 지정해줍니다.
- 지정한 이름을 기준으로 dataloader.conf 파일에서 환경설정 값을 읽어들입니다.
services:
...
upbit_dataloader1:
<<: *crptostream-common
command: >
bash -c "
python -m venv /venv &&
/venv/bin/pip install -r /CryptoStream/src/requirements.txt &&
/venv/bin/python /CryptoStream/src/upbit_dataloader.py upbit_dataloader1"
networks:
network_custom:
ipv4_address: 172.28.0.7
upbit_dataloader2:
<<: *crptostream-common
command: >
bash -c "
python -m venv /venv &&
/venv/bin/pip install -r /CryptoStream/src/requirements.txt &&
/venv/bin/python /CryptoStream/src/upbit_dataloader.py upbit_dataloader2"
networks:
network_custom:
ipv4_address: 172.28.0.8
upbit_dataloader3:
<<: *crptostream-common
command: >
bash -c "
python -m venv /venv &&
/venv/bin/pip install -r /CryptoStream/src/requirements.txt &&
/venv/bin/python /CryptoStream/src/upbit_dataloader.py upbit_dataloader3"
networks:
network_custom:
ipv4_address: 172.28.0.9
배포하기
1. .env 설정
~/CryptoStream-docker/.env 파일에 postgres와 redis 접속 정보를 입력합니다.
POSTGRES_HOST=postgres
POSTGRES_PORT=5432
POSTGRES_USER=<DB user>
POSTGRES_PASSWORD=<DB password>
REDIS_HOST=redis
REDIS_PORT=6379
REDIS_PASSWORD=<redis password>
2. producer.conf 설정
~/CryptoStream-docker/conf/producer.conf에서 stream_name, maxlen, tickers를 설정합니다.
- stream_name: 해당 producer가 메시지를 추가할 redis stream의 이름
- maxlen: redis stream의 길이
- tickers: 해당 producer가 구독할 ticker들
[upbit_producer1]
stream_name=upbit_stream1
maxlen=3000000
tickers=BTC,ETH
[upbit_producer2]
stream_name=upbit_stream2
maxlen=3000000
tickers=SOL,ETC
[upbit_producer3]
stream_name=upbit_stream3
maxlen=3000000
tickers=XRP,BCH
3. dataloader.conf 설정
~/CryptoStream-docker/conf/dataloader.conf에서 stream_name, group_name, consumer_name, commit_count를 설정합니다.
- stream_name: 해당 dataloader가 읽어올 redis steam의 이름
- group_name: 해당 dataloader가 속할 consumer group 이름
- consumer_name: 해당 dataloader의 consumer 이름
- commit_count: postgres에 commit 할 주기
[upbit_dataloader1]
stream_name=upbit_stream1
group_name=dataloader1
consumer_name=consumer1
commit_count=10
[upbit_dataloader2]
stream_name=upbit_stream2
group_name=dataloader2
consumer_name=consumer1
commit_count=10
[upbit_dataloader3]
stream_name=upbit_stream3
group_name=dataloader3
consumer_name=consumer1
commit_count=10
4. docker compose up
설정이 완료되면, 백그라운드로 컨테이너들을 실행합니다.
$ docker compose up -d
그러면, 다음과 같이 정상적으로 배포되었음을 확인할 수 있습니다.
수집된 데이터는 다음과 같이 Postgres에 날짜 별로 파티셔닝되어 저장됩니다.
앞으로의 계획
- 수집 암호화폐 범위 확장: 현재 일부 암호화폐 데이터만 수집 중이며, 서비스 운영과 안정적인 분산처리가 가능해지면, 점진적으로 수집 암호화폐의 종류를 확장할 계획입니다.
- 호가 데이터 단계 확장: 현재 상위 5단계 호가창 데이터를 수집 중입니다. 수집되는 데이터 양을 분석한 후, 추가적인 호가 데이터 수집을 진행할 예정입니다.
- 쿠버네티스 환경에서의 배포: Docker 환경에서 안정적으로 동작하는 데이터 파이프라인을 Kubernetes 환경으로 확장하여, 안정적인 데이터 처리와 운영을 지원할 수 있는 인프라를 구축할 계획입니다.
- Kafka 기반의 데이터 분산 처리: 현재 Redis Stream을 사용하여 데이터 분산 처리를 구현하고 있지만, 향후 데이터 처리량이 증가하거나 더 복잡한 스트리밍 요구 사항이 발생할 경우, Kafka를 도입할 예정입니다.
GitHub - dojun43/CryptoStream-docker: Crypto 데이터 파이프라인 구축 프로젝트
Crypto 데이터 파이프라인 구축 프로젝트. Contribute to dojun43/CryptoStream-docker development by creating an account on GitHub.
github.com
'프로젝트 > CryptoStream' 카테고리의 다른 글
[CryptoStream] GCS 요청수 문제 해결하고 비용 줄이기 (0) | 2025.02.28 |
---|---|
[CryptoStream] Kafka 도입기 (0) | 2025.01.28 |