[Airflow] docker-compose.yaml 해석
·
Data Engineering/Airflow
도커 컴포즈란?여러 개의 컨테이너를 하나의 스크립트에 정의하고, 한번에 모든 컨테이너를 관리할 수 있도록 해주는 도구 docker-compose.yaml 파일에 시스템을 구동하기 위해 필요한 컨테이너들의 설정을 입력 docker compose up 명령을 입력하면, docker-compose.yaml 파일에 기재된 모든 컨테이너들이 실행됨 docker-compose.yaml 해석yaml 파일이란?  json, xml과 같이 key, value로 구성되고 계층적 구조를 가짐, 들여쓰기 문법을 사용 airflow의 docker-compose.yaml은 다음과 같은 주 항목을 가짐version: '3.8' x-airflow-common공통으로 지정할 항목으로 여러 컨테이너에서 동일하게 설정할 내용이 기..
[Airflow] log file 관리하기
·
Data Engineering/Airflow
문제 발생▪ "could not close temporary statistics file "pg_stat_tmp/global.tmp": No space left on device"라는 메시지와 함께 Airflow 서비스가 올라가지 않는 문제가 발생했습니다. ▪ 아마 Airflow를 운영하면서 데이터가 쌓여 충분한 용량이 없는 것으로 보여지기에, 어떤 항목이 용량을 많이 차지하고 있는지 확인했습니다. logs 폴더 확인 ▪ logs 폴더로 이동해서 용량을 확인해봤더니 scheduler가 2.3G로 상당 부분 차지하고 있음을 확인할 수 있었습니다.▪ 용량 확보를 위해 불필요한 log는 지워주도록 합니다.log 파일 지우기 # -type f : 일반 파일만 검색# -mtime +10: 만든지 10일 이상 지난..
[Airflow] Trigger Rule 설정
·
Data Engineering/Airflow
Trigger Rule ▪ 상위 Task의 상태에 따라 다음 Task의 수행 여부를 결정하고 싶을때 사용 ▪ Ex) Task 1,2,3 중 실패한 Task가 하나 이상 있을 경우 Task4를 수행 Trigger Rule 종류 all_success(기본값) 상위 Task가 모두 성공하면 실행 all_failed 상위 Task가 모두 실패하면 실행 all_done 상위 Task가 모두 수행되면 실행(실패해도 수행된 것에 포함) all_skipped 상위 Task가 모두 Skipped 상태면 실행 one_failed 상위 Task 중 하나 이상 실패하면 실행(모든 상위 Task 완료를 기다리지 않음) one_success 상위 Task 중 하나 이상 성공하면 실행(모든 상위 Task 완료를 기다리지 않음) o..
[Airflow] BaseBranch Operator로 분기처리
·
Data Engineering/Airflow
BaseBranchOperator를 사용해서 분기처리 ▪ class 선언하고 BaseBranchOperator 상속 ▪ choose_branch 함수 재정의, 함수의 return 값은 다음 처리할 Task의 id를 입력 ▪ 후행으로 하나의 Task만 수행할 경우는 하나의 Task id만 return하고, 여러개의 Task를 수행하고 싶은 경우는 Task id들을 리스트 형태로 묶어서 return ▪ 객체를 선언해주고, 다음에 수행되어야 하는 Task의 후보들을 리스트로 묶어서 flow 표현 dags_base_branch_operator.py ▪ 'custom_branch_operator'에서 다음에 수행할 Task를 랜덤하게 정해서 return, 'A'가 선택될 경우 'task_a'를 수행하고 'B' 또..
[Airflow] @task.branch로 분기처리
·
Data Engineering/Airflow
@task.branch를 사용해서 분기처리 ▪ 실행할 Python 함수에 @task.branch로 랩핑 ▪ Python 함수의 return 값은 다음 처리할 Task의 id를 입력 ▪ 후행으로 하나의 Task만 수행할 경우는 하나의 Task id만 return하고, 여러개의 Task를 수행하고 싶은 경우는 Task id들을 리스트 형태로 묶어서 return ▪ flow는 다음에 수행되어야하는 Task의 후보들을 리스트로 묶어서 표현 dags_python_with_branch_decorator.py ▪ 'python_branch_task'에서 다음에 수행할 Task를 랜덤하게 정해서 return, 'A'가 선택될 경우 'task_a'를 수행하고 'B' 또는 'C'가 선택된 경우 'task_b'와 'task..
[Airflow] BranchPython Operator로 분기처리
·
Data Engineering/Airflow
Task 분기처리 ▪ Task1의 결과에 따라 여러 Task 2들 중 하나만 수행하도록 구현해야 할때 필요 ▪ BranchPythonOperator, task.branch 데커레이터, BaseBranchOperator 상속 등을 사용하여 분기처리 가능 BranchPythonOperator를 사용해서 분기처리 ▪ BranchPyhonOperator의 python_callable에 실행할 Python 함수를 입력 ▪ Python 함수의 return 값은 다음 처리할 Task의 id를 입력 ▪ 후행으로 하나의 Task만 수행할 경우는 하나의 Task id만 return하고, 여러개의 Task를 수행하고 싶은 경우는 Task id들을 리스트 형태로 묶어서 return ▪ flow는 다음에 수행되어야하는 Task..
[Airflow] 전역 공유변수 Variable
·
Data Engineering/Airflow
전역변수 Variable ▪ XCOM은 특정 DAG이나 schedule에 수행되는 Task 간에만 데이터를 공유할 수 있었음 ▪ 모든 DAG이 공유할 수 있는 전역변수가 Variable ▪ Variable에 등록한 key, value 값은 메타 DB에 저장 ▪ 협업 환경에서 표준화된 dag을 만들기 위해 사용되며, 상수로 지정해서 사용할 변수를 셋팅 Variable 등록하기 ▪ webserver에 접속하고, [Admin] -> [Variables] 클릭 ▪ [+] 클릭해서 새로운 Variable 생성 ▪ 다음과 같이 Key, value 값 입력하고 [save] 클릭 전역변수 사용하기 ▪ Variable 라이브러리의 get 함수를 이용해서 값 꺼내온 뒤 사용 ▪ 스케줄러의 주기적 DAG 파싱시 Variab..
[Airflow] Python/email Operator간 Xcom 사용
·
Data Engineering/Airflow
Email Operator에서 사용 가능한 Template ▪ 'to', 'subject', 'html_content', 'files' 파라미터에 Template 적용 가능 Python Operator에서 Email Operator로 Xcom 전달하기 ▪ PythonOperator에서 return한 값을 ti객체에 xcom_pull 함수를 사용하여 값 받아오기 ▪ 이때 template 문법을 사용해야함 dags_python_email_xcom.py ▪ 'something_task'에서 'Success' 와 'Fail' 중 하나의 값을 랜덤으로 return해서 Xcom 전달 ▪ 'send_email'에서 return한 값을 가져오고, 해당 시간을 기준으로 이메일 제목과 내용을 작성해서 전송 from air..