@task.branch를 사용해서 분기처리
▪ 실행할 Python 함수에 @task.branch로 랩핑
▪ Python 함수의 return 값은 다음 처리할 Task의 id를 입력
▪ 후행으로 하나의 Task만 수행할 경우는 하나의 Task id만 return하고, 여러개의 Task를 수행하고 싶은 경우는 Task id들을 리스트 형태로 묶어서 return
▪ flow는 다음에 수행되어야하는 Task의 후보들을 리스트로 묶어서 표현
dags_python_with_branch_decorator.py
▪ 'python_branch_task'에서 다음에 수행할 Task를 랜덤하게 정해서 return, 'A'가 선택될 경우 'task_a'를 수행하고 'B' 또는 'C'가 선택된 경우 'task_b'와 'task_c'를 수행
▪ 'task_a', 'task_b', 'task_c'에서 선택된 값을 출력
from airflow import DAG
from datetime import datetime
from airflow.operators.python import PythonOperator
from airflow.decorators import task
with DAG(
dag_id='dags_python_with_branch_decorator',
start_date=datetime(2023,12,18),
schedule=None,
catchup=False
) as dag:
@task.branch(task_id='python_branch_task')
def select_random():
import random
item_lst = ['A', 'B', 'C']
selected_item = random.choice(item_lst)
if selected_item == 'A':
return 'task_a'
elif selected_item in ['B', 'C']:
return ['task_b', 'task_c']
def common_func(**kwargs):
print(kwargs['selected'])
task_a = PythonOperator(
task_id='task_a',
python_callable=common_func,
op_kwargs={'selected':'A'}
)
task_b = PythonOperator(
task_id='task_b',
python_callable=common_func,
op_kwargs={'selected':'B'}
)
task_c = PythonOperator(
task_id='task_c',
python_callable=common_func,
op_kwargs={'selected':'C'}
)
select_random() >> [task_a, task_b, task_c]
Dag 실행하기
▪ dag을 airflow에 upload하고, webserver 접속해서 dag unpause
▪ 스케줄이 따로 지정되어 있지 않기 때문에 [▶] 클릭해서 수동으로 실행
▪ dag의 graph를 확인해보면 task_b와 task_c가 실행되고, task_a가 skip처리됨
▪ task_b의 실행결과 log
▪ task_c의 실행결과 log
참고:
Airflow 마스터 클래스 강의 - 인프런
데이터 파이프라인을 효율적으로 만들고 관리하기 위한 Orchestration 도구인 Airflow에 대해 배우는 강의입니다. 초보자도 차근차근 배울 수 있는 Airflow 마스터 클래스, 환영합니다!, 데이터 파이프
www.inflearn.com
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow] Trigger Rule 설정 (0) | 2024.03.12 |
---|---|
[Airflow] BaseBranch Operator로 분기처리 (0) | 2024.02.21 |
[Airflow] BranchPython Operator로 분기처리 (0) | 2024.02.21 |
[Airflow] 전역 공유변수 Variable (0) | 2024.02.21 |
[Airflow] Python/email Operator간 Xcom 사용 (0) | 2024.02.11 |