BaseBranchOperator를 사용해서 분기처리
▪ class 선언하고 BaseBranchOperator 상속
▪ choose_branch 함수 재정의, 함수의 return 값은 다음 처리할 Task의 id를 입력
▪ 후행으로 하나의 Task만 수행할 경우는 하나의 Task id만 return하고, 여러개의 Task를 수행하고 싶은 경우는 Task id들을 리스트 형태로 묶어서 return
▪ 객체를 선언해주고, 다음에 수행되어야 하는 Task의 후보들을 리스트로 묶어서 flow 표현
dags_base_branch_operator.py
▪ 'custom_branch_operator'에서 다음에 수행할 Task를 랜덤하게 정해서 return, 'A'가 선택될 경우 'task_a'를 수행하고 'B' 또는 'C'가 선택된 경우 'task_b'와 'task_c'를 수행
▪ 'task_a', 'task_b', 'task_c'에서 선택된 값을 출력
from airflow import DAG
import pendulum
from airflow.operators.branch import BaseBranchOperator
from airflow.operators.python import PythonOperator
with DAG(
dag_id='dags_base_branch_operator',
start_date=pendulum.datetime(2023,12,19, tz='Asia/Seoul'),
schedule=None,
catchup=False
) as dag:
class CustomBranchOperator(BaseBranchOperator):
def choose_branch(self, context):
import random
print(context)
item_lst = ['A','B', 'C']
selected_item = random.choice(item_lst)
if selected_item == 'A':
return 'task_a'
elif selected_item in ['B','C']:
return ['task_b', 'task_c']
custom_branch_operator = CustomBranchOperator(task_id='python_branch_task')
def common_func(**kwargs):
print(kwargs['selected'])
task_a = PythonOperator(
task_id='task_a',
python_callable=common_func,
op_kwargs={'selected':'A'}
)
task_b = PythonOperator(
task_id='task_b',
python_callable=common_func,
op_kwargs={'selected':'B'}
)
task_c = PythonOperator(
task_id='task_c',
python_callable=common_func,
op_kwargs={'selected':'C'}
)
custom_branch_operator >> [task_a, task_b, task_c]
Dag 실행하기
▪ dag을 airflow에 upload하고, webserver 접속해서 dag unpause
▪ 스케줄이 따로 지정되어 있지 않기 때문에 [▶] 클릭해서 수동으로 실행
▪ dag의 graph를 확인해보면 task_b와 task_c가 실행되고, task_a가 skip처리됨
▪ task_b의 실행결과 log
▪ task_c의 실행결과 log
참고:
Airflow 마스터 클래스 강의 - 인프런
데이터 파이프라인을 효율적으로 만들고 관리하기 위한 Orchestration 도구인 Airflow에 대해 배우는 강의입니다. 초보자도 차근차근 배울 수 있는 Airflow 마스터 클래스, 환영합니다!, 데이터 파이프
www.inflearn.com
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow] log file 관리하기 (0) | 2024.03.21 |
---|---|
[Airflow] Trigger Rule 설정 (0) | 2024.03.12 |
[Airflow] @task.branch로 분기처리 (0) | 2024.02.21 |
[Airflow] BranchPython Operator로 분기처리 (0) | 2024.02.21 |
[Airflow] 전역 공유변수 Variable (0) | 2024.02.21 |