[CryptoStream] GCS 요청수 문제 해결하고 비용 줄이기
·
프로젝트/CryptoStream
많은 수의 insert 요청으로 인한 GCS 비용 문제GCS Consumer는 Kafka의 topic에서 메시지를 읽어오고, JSON 타입의 호가창 데이터를 GCS bucket에 저장하는 역할을 합니다.이러한 적재 과정에서 실시간성을 보장하기 위해, Kafka Consumer의 파라미터 설정을 fetch_max_wait_ms=0으로 하여 topic에 메시지가 있다면 바로 읽어오고 처리하는 방식으로 구현했었습니다.이는 GCS bucket에 최신의 데이터를 유지할 수 있다는 장점이 있지만, 초 당 많은 insert 요청이 발생하여 GCS 비용이 증가하는 문제가 발생했습니다. 실제로 데이터 파이프라인을 운영해본 결과, GCS 비용으로만 하루에 ₩150,000 ~ ₩200,000의 비용이 발생했습니다. 또한 b..
[CryptoStream] Kafka 도입기
·
프로젝트/CryptoStream
도입 배경현재 데이터 파이프라인에서 메시지 브로커로 Redis Stream을 사용하고 있으며, 이를 운영하며 다음과 같은 문제점이 있음을 확인했습니다. 늘어나는 메모리 부담Redis Stream는 기본적으로 데이터를 캐시 메모리에 저장합니다.데이터 파이프라인에서 3일치의 데이터를 저장하기 위해, ticker 하나당 1,500,000건을 Redis Stream에 저장하고 있으며, 6개 ticker의 데이터를 수집 했을 때 지속적으로 메모리 사용량이 증가하여 최대 4.5GiB를 사용함을 확인했습니다.이를 기준으로 ticker 하나당 3일치 데이터를 저장하기 위해서는 약 0.75GiB의 메모리가 필요함을 추정할 수 있습니다.현재 원화 마켓에서 거래 중인 코인은 155개입니다. 이를 모두 Redis Stream..
[CryptoStream] 암호화폐 데이터 파이프라인 구축하기
·
프로젝트/CryptoStream
개요본 프로젝트는 암호화폐 거래소인 Upbit에서 제공하는 실시간 호가창 데이터를 추출하고 저장하기 위한 데이터 파이프라인 구축을 목표로 합니다.Upbit의 현재 거래 가능한 암호화폐의 종류는 수십개 이상이며, 호가창 데이터는 실시간으로 변동되는 특징을 가집니다. 이러한 데이터의 특성을 고려하여 모든 암호화폐의 호가창 데이터를 실시간으로 처리하고, 부하 분산이 가능한 스트림형 데이터 파이프라인을 설계하고 구현하고자 합니다. 프로젝트 목표실시간 데이터 적재호가창 데이터를 스트림형 데이터 파이프라인을 통해 실시간으로 수집 및 저장합니다.부하 분산모든 암호화폐에 대한 데이터를 안정적으로 처리하기 위해 부하 분산 메커니즘을 설계 및 검증합니다.확장 가능성 확보초기에는 Docker 환경에서 데이터 파이프라인을 구..
[서울시 대기현황 데이터 적재 프로젝트] Streamlit으로 Dashboard 만들기 (4)
·
프로젝트/서울시 대기현황 데이터 적재 프로젝트
1. 개요서울시 대기현황 ETL 현재 프로젝트에서는 서울시 열린데이터 광장 API를 활용하여 1시간 간격으로 실시간 서울시의 대기현황 데이터를 수집하고, 이를 PostgreSQL에 저장하는 데이터 파이프라인을 구축하여 운영중에 있습니다. 데이터는 효과적으로 관리하기 위해 날짜별로 파티셔닝하여 저장하며, 파티셔닝된 테이블 정보는 다음과 같습니다.# RealtimeCityAir_2024-09-26 테이블 예시|MSRDT |MSRRGN_NM|MSRSTE_NM|PM10|PM25|O3 |NO2 |CO |SO2 |IDEX_NM|IDEX_MVL|ARPLT_MAIN||---------------|---------|---------|----|----|-----|-----|---|-----|----..
[서울시 대기현황 데이터 적재 프로젝트] Terraform으로 배포 환경 구성하기 (3)
·
프로젝트/서울시 대기현황 데이터 적재 프로젝트
1. 도입 배경현재 프로젝트 상태현재 데이터 파이프라인 인프라는 GCE 인스턴스 생성, Docker 설치, Airflow 설치 및 설정 등의 작업을 수작업으로 진행하고 있습니다. 특히 포트 허용, 방화벽 설정, 네트워크 구성과 같은 작업은 매번 GCP 콘솔에 접속해 수동으로 관리하고 있습니다.이러한 수작업 방식은 비효율적일 뿐만 아니라, 인프라 설정을 일관되게 추적하기 어렵고 실수로 인한 오류 가능성도 큽니다.이와 같은 배경에서 반복적인 작업을 자동화하고, 인프라 변경 사항을 효율적으로 추적하기 위해 Terraform을 도입하기로 결정했습니다.  개선 방향Terraform을 도입하면서 다음과 같은 기능을 목표로 하고 있습니다.GCE 생성 자동화: 인프라가 필요할 때마다 Terraform을 통해 코드로 G..
[서울시 대기현황 데이터 적재 프로젝트] Dag 생성하고 실행하기 (2)
·
프로젝트/서울시 대기현황 데이터 적재 프로젝트
1. Dag: dags_seoul_api_RealtimeCityAir(1). "RealtimeCityAir_status_sensor": 서울시 공공데이터 포털에 현재 시간의 데이터가 정상적으로 업데이트되었는지 확인함, 정상적으로 업로드되지 않았다면 10분마다 업데이트 상태를 확인하고 다음 task로 넘어가지 않도록 대기함  (2). "RealtimeCityAir_status_to_csv": sensor에서 정상적으로 데이터가 업데이트 되었음을 확인하면, 데이터를 받고 csv 파일로 저장(3). "insrt_postgres": csv 파일로 저장된 데이터를 postgres에 bulk load2. dags와 plugins의 파일▪ dags: Airflow의 DAG 파일이 저장되는 디렉터리로, 데이터 파이프라..
[서울시 대기현황 데이터 적재 프로젝트] Airflow 환경 구성하기 (1)
·
프로젝트/서울시 대기현황 데이터 적재 프로젝트
1. 배포 환경▪ GCP의 ComputeEngine을 서버로 사용▪ Docker Compose로 Airflow Webserver, Scheduler, Worker, Redis, PostgresSQL 컨테이너 실행  ▪ 서울시 권역별 실시간 대기환경 현황 데이터는 PostgreSQL custom에 적재2. 환경 구성▪ 아래 순서로 작업을 진행하여 환경을 구성 (1). GCP VM 인스턴스 생성하기 -> https://doodo0126.tistory.com/24(2). Ubuntu에 Docker 설치하기 -> https://doodo0126.tistory.com/26(3). Docker로 Airflow 설치하기 -> https://doodo0126.tistory.com/28(4). 개발환경구성 (local ..