많은 수의 insert 요청으로 인한 GCS 비용 문제
GCS Consumer는 Kafka의 topic에서 메시지를 읽어오고, JSON 타입의 호가창 데이터를 GCS bucket에 저장하는 역할을 합니다.
이러한 적재 과정에서 실시간성을 보장하기 위해, Kafka Consumer의 파라미터 설정을 fetch_max_wait_ms=0으로 하여 topic에 메시지가 있다면 바로 읽어오고 처리하는 방식으로 구현했었습니다.
이는 GCS bucket에 최신의 데이터를 유지할 수 있다는 장점이 있지만, 초 당 많은 insert 요청이 발생하여 GCS 비용이 증가하는 문제가 발생했습니다.
실제로 데이터 파이프라인을 운영해본 결과, GCS 비용으로만 하루에 ₩150,000 ~ ₩200,000의 비용이 발생했습니다.
또한 bucket의 write request은 200/s ~ 600/s 정도 발생했습니다.
GCS 비용 정책과 발생 비용
GCS는 작업을 수행할 때 마다 비용이 부과됩니다. 작업의 클래스는 A, B, 무료로 분류되며, 스토리지 클래스 별로 작업 1,000개 단위 당 다음의 비용이 부과됩니다.
GCS Consumer가 JSON 타입의 호가창 데이터를 GCS에 적재하는 작업은 A 클래스 작업에 해당하며, 즉 1,000건의 insert 시 $0.005의 비용이 발생합니다.
이를 기반으로 초당 400건의 write 요청이 발생한다고 가정하고, 하루에 예상되는 write 요청 비용을 계산하면 172.8$으로 이는 너무 많은 비용으로 보여집니다.
- 하루 wirte 비용 = 400(초 당 write 요청 수) x 60(초) x 60(분) x 24(시) / 1000(비용 측정 단위) x 0.005(A 클래스 작업 비용) = 172.8$
Consumer Batch 처리를 통해 요청 수 줄이기
write 요청 비용을 줄이기 위해, GCS Consumer에서 Kafka의 topic에 메시지가 있을 때 마다 바로 읽어와 처리하던 기존의 방식을 일정한 주기 마다 여러 개의 메시지를 한꺼번에 읽어 처리하는 Batch 방식으로 변경하였습니다.
구체적인 시나리오는 다음과 같습니다.
10초 주기로 Kafka의 topic으로부터 155개의 Ticker에 대한 호가창 데이터를 읽어오고, GCS bucket에 JSONL 파일로 적재합니다. 그러면 10초에 155번의 insert 작업을 수행하며, 초 당 15.5번의 write 요청이 발생하게 됩니다.
Batch 방식으로 설계할 경우 하루에 발생하는 write 요청 비용은 아래와 같으며, 비용을 172.8$에서 6.696$로 절감할 수 있게 됩니다.
- 하루 wirte 비용 = 15.5(초 당 write 요청 수) x 60(초) x 60(분) x 24(시) / 1000(비용 측정 단위) x 0.005(A 클래스 작업 비용) = 6.696$
Kafka Consumer 파라미터 설정
Batch 처리를 위해 Kafka Consumer의 파라미터 설정을 다음과 같이 변경했습니다.
- fetch_max_wait_ms: 10000으로 설정하여, 최대 10초까지 데이터가 쌓이는 것을 대기하고, 이를 초과할 경우 데이터를 가져오도록 설정했습니다.
- fetch_min_bytes: 750,000으로 설정하여, 최소 750,000 Bytes의 데이터가 쌓이면 가져오도록 설정했습니다.
- 750,000으로 설정한 기준: 보통 메시지 하나의 크기가 450~500 Bytes이고, 파티션 하나당 1초에 30~100건의 메시지가 발생하는 것을 확인했는데, 이를 통해 partition 하나당 10초 동안 발생하는 데이터의 크기를 계산할 경우 500 Bytes x 100 (건) x 10 (초) = 500,000 Bytes 정도 인 것을 확인할 수 있습니다. 여기에 더 많은 메시지가 발생하는 경우도 고려해 1.5배 만큼인 750,000로 설정했습니다.
- max_partition_fetch_bytes: 해당 파라미터는 Consumer가 각 partition에서 가져올 수 있는 최대 bytes의 크기를 설정하는 파라미터 입니다. fetch_min_bytes와 마찬가지로 750,000로 설정했습니다.
consumer = KafkaConsumer(
bootstrap_servers=bootstrap_servers,
group_id=group_id,
auto_offset_reset='earliest',
enable_auto_commit=False,
value_deserializer=lambda x: x.decode('utf-8'),
fetch_max_wait_ms=10000,
fetch_min_bytes=750000,
max_partition_fetch_bytes=750000
)
GCS Consumer 리팩토링
기존에 호가창 데이터 1건 마다 JSON 파일 생성하여 GCS Bucket에 적재하던 방식을 변경했습니다.
buffer를 활용하여 partition 별로 데이터를 모으고, 같은 patition의 데이터를 하나의 JSONL 파일로 GCS Bucket에 적재하는 방식으로 변경했습니다.
다음과 같이 리팩토링할 경우 Ticker 하나당 10초에 1번의 write 요청을 하게 됩니다.
class gcs_consumer:
...
def upload_buffer_to_gcs(self, buffer: dict[str, any]):
for partition_key, data_list in buffer.items():
# JSONL 포맷으로 변환
jsonl_data = "\n".join(json.dumps(d) for d in data_list)
# GCS 업로드
last_data = data_list[-1]
blob_name = f"{partition_key}/{last_data['ticker']}-{str(last_data['timestamp'])}.jsonl"
blob = self.bucket.blob(blob_name)
blob.upload_from_string(
data=jsonl_data,
content_type="application/json"
)
def main(self):
while True:
try:
# 메시지 처리
msg = self.kafka_consumer.poll(timeout_ms=20000)
buffer = defaultdict(list)
if msg:
for topic_partition, messages in msg.items():
for message in messages:
up_data = json.loads(message.value)
up_data = self.transform_data(up_data)
# 파티션 키 생성
partition_date = up_data["timestamp_date"].strftime("%Y/%m/%d/%H/%M")
partition_key = f"{up_data['ticker']}/{partition_date}"
# timestamp_date 포멧 변경
up_data["timestamp_date"] = str(up_data["timestamp_date"])
# 버퍼에 데이터 추가
buffer[partition_key].append(up_data)
self.upload_buffer_to_gcs(buffer)
self.kafka_consumer.commit()
except Exception as e:
logging.error(f"gcs consumer error: {e}")
time.sleep(5)
결과 및 후기
Batch 방식으로 변경한 후 bucket의 write 요청 수는 14/s~18/s으로 나타났습니다.
기존에 200/s~600/s와 비교 했을 때, 약 1/33~1/14 가량 요청 수를 줄일 수 있었습니다.
GCS 비용 또한 ₩150,000~₩200,000에서 ₩6,000~₩10,000 정도로 크게 줄었습니다.
이를 통해 GCS Bucket에 적재되는 데이터의 실시간성을 어느 정도 포기하고, GCS 요청 비용을 크게 줄일 수 있었습니다.
또한 클라우드 플랫폼 사용 시 비용 정책을 잘 확인하는 것이 필요함을 느꼈습니다.
'프로젝트 > CryptoStream' 카테고리의 다른 글
[CryptoStream] Kafka 도입기 (0) | 2025.01.28 |
---|---|
[CryptoStream] 암호화폐 데이터 파이프라인 구축하기 (0) | 2024.12.24 |