[Airflow] Bash Operator에서 Xcom 사용

2024. 2. 4. 16:35·Data Engineering/Airflow


Bash Operator에서 Xcom 사용하기

▪ bash operator에서 Xcom을 사용하기 위해서는 env, bash_command 파라미터에서 template을 이용

▪ bash_command 파라미터에 템플릿 문법을 작성해서,  데이터를 Xcom에 올리기

▪ ti 객체에 xcom_push 함수를 이용해서 key, value 형태로 데이터 올리기

▪ bash_command는 출력하는 문장이 return 값으로 간주되어, Xcom에 "return_value" 라는 key 값으로 value가 저장됨

 

▪ env 파라미터에 템플릿 문법을 작성해서, 데이터를 Xcom에서 꺼내기

▪ ti 객체에서 xcom_pull 함수를 이용해서 데이터 꺼내기 

▪ xcom_pull 함수에서 task_ids만 작성을 하면, task에서 return 한 값을 꺼내옴

▪ "do_xcom_push=False"로 지정할 경우, 마지막 return 값이 Xcom에 안 올라감


dags_bash_with_xcom.py

▪ BashOperator에서 Xcom을 사용하여 데이터를 올리고 꺼내오는 dag

▪ "bash_push"에서 데이터를 올리고, "bash_pull"에서 데이터를 꺼내와서 출력함

▪ "bash_pull"에서 마지막에 출력되는 값을 Xcom에 올리기 않기 위해, "do_xcom_push=False"로 지정

from airflow import DAG
import pendulum
import datetime
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dags_bash_with_xcom",
    schedule="10 0 * * *",
    start_date=pendulum.datetime(2023, 12, 10, tz="Asia/Seoul"),
    catchup=False,
) as dag:
    bash_push = BashOperator(
        task_id='bash_push',
        bash_command="echo START && "
                     "echo XCOM_PUSHED "
                     "{{ ti.xcom_push(key='bash_pushed', value = 'first_bash_message') }} &&"
                     "echo COMPLETE"
    )

    bash_pull = BashOperator(
        task_id='bash_pull',
        env={'PUSHED_VALUE':"{{ ti.xcom_pull(key='bash_pushed') }}",
            'RETURN_VALUE':"{{ ti.xcom_pull(task_ids='bash_push') }}"},
        bash_command="echo $PUSHED_VALUE && echo $RETURN_VALUE ",
        do_xcom_push=False
    ) 

    bash_push >> bash_pull

Dag 실행하기

▪ dag을 airflow에 upload하고, webserver 접속해서 dag unpause 

 

▪ bash_push의 실행 결과 log

 

▪ bash_pull의 실행 결과 log

▪ 첫번째 출력은 xcom_push 함수를 사용해서 올렸던 value 값인 first_bash_message

▪ 두번째 출력은 가장 마지막에 return한 값인 COMPLETE

 

 

 

참고:

 

Airflow 마스터 클래스 강의 - 인프런

데이터 파이프라인을 효율적으로 만들고 관리하기 위한 Orchestration 도구인 Airflow에 대해 배우는 강의입니다. 초보자도 차근차근 배울 수 있는 Airflow 마스터 클래스, 환영합니다!, 데이터 파이프

www.inflearn.com

 

'Data Engineering > Airflow' 카테고리의 다른 글

[Airflow] Python/email Operator간 Xcom 사용  (0) 2024.02.11
[Airflow] Python/Bash Operator간 Xcom 사용  (2) 2024.02.04
[Airflow] Python Operator에서 Xcom 사용  (0) 2024.02.01
[Airflow] Python Operator에서 Macro 변수 사용하기  (0) 2024.01.28
[Airflow] Bash Operator에서 Macro 변수 사용하기  (1) 2024.01.27
'Data Engineering/Airflow' 카테고리의 다른 글
  • [Airflow] Python/email Operator간 Xcom 사용
  • [Airflow] Python/Bash Operator간 Xcom 사용
  • [Airflow] Python Operator에서 Xcom 사용
  • [Airflow] Python Operator에서 Macro 변수 사용하기
Doodo
Doodo
  • Doodo
    Doodo
    Doodo
  • 전체
    오늘
    어제
    • 분류 전체보기 (192)
      • CS (17)
        • Network (11)
        • Database (6)
      • Language (19)
        • Python (11)
        • SQL (6)
        • R (2)
      • Linux (17)
      • DevOps (35)
        • Git (7)
        • Docker (8)
        • Kubernetes (9)
        • GCP (4)
        • AWS (7)
      • Data Engineering (50)
        • 책 리뷰 (14)
        • Airflow (35)
        • Redis (1)
      • DBMS (21)
        • CUBRID (21)
      • ML & DL (2)
      • 코딩테스트 (24)
      • 프로젝트 (7)
        • 서울시 대기현황 데이터 적재 프로젝트 (4)
        • CryptoStream (3)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

  • 인기 글

  • 태그

  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.0
Doodo
[Airflow] Bash Operator에서 Xcom 사용
상단으로

티스토리툴바