[Airflow] 외부 python 함수 실행하기

2024. 1. 21. 18:43·Data Engineering/Airflow
목차
  1. Python 모듈 경로의 이해 
  2. sys.path에 경로를 추가하는 방법
  3. Airflow에서의 경로
  4. 1. dags_python_import_func.py
  5. 2. common_func.py
  6. 3. File 디렉토리 
  7. 4. ".env" 파일 만들기
  8. 5. Dag 실행하기


Python 모듈 경로의 이해 

▪ python은 sys.path 변수에서 모듈의 위치를 검색

▪ 실행하는 파이썬 파일과 동일 경로에 있는 파일은 실행 가능함

▪ 어떤 함수를 만들고 dag에 그 함수를 import하기 위해서는 만들었던 python 파일의 경로를 "sys.path"에 추가해야함

sys.path에 경로를 추가하는 방법

▪ 명시적으로 추가 ex) sys.path.append('/home/new')

▪ 또는 os 환경변수 pythonpath에 값을 추가

Airflow에서의 경로

▪ Airflow는 자동적으로 dags 폴더와 plugins 폴더를 sys.path에 추가함

▪ airflow 컨테이너로 이동해서 "airflow info" 명령어를 입력하면 python_path를 확인할 수 있는데, dags 폴더와 plugins 폴더가 기본적으로 잡혀있는 것을 확인할 수 있음

▪ 따라서 python 파일을 plugins 폴더에 위치시켜 놓으면, import해서 파일에 접근할 수 있음

sudo docker ps # 컨테이너 정보 확인
sudo docker exec -it {WORKER CONTAINER ID} bash # worker 컨테이너로 접속
airflow info
# 확인했으면 Ctrl + P + Q으로 나가기


1. dags_python_import_func.py

▪ 매일 6시 30분에 작업하도록 스케줄링

▪ "from common.common_func import get_sftp" 부분에서 외부 python 함수를 import

from airflow import DAG
import pendulum
import datetime
from airflow.operators.python import PythonOperator
from common.common_func import get_sftp
with DAG(
dag_id="dags_python_import_func",
schedule="30 6 * * *",
start_date=pendulum.datetime(2023, 12, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
task_get_sftp = PythonOperator(
task_id='task_get_sftp',
python_callable=get_sftp
)

2. common_func.py

▪ 실행할 python 외부 함수

def get_sftp():
print('sftp 작업을 시작합니다')

3. File 디렉토리 

▪ "dags_python_import_func.py"는 "airflow/dags"에 위치

▪ "common_func.py"는 "airflow/pugins/common"에 위치

4. ".env" 파일 만들기

▪ plugins 폴더가 pythonpath로 잡혀있지 않기 때문에, ".env" 파일 만들어서 path 추가하기

WORKSPACE_FOLDER="airflow 최상위 디렉토리"
PYTHONPATH=${WORKSPACE_FOLDER}/plugins

 

▪ ".env" 파일은 Git에 올릴 필요가 없기 때문에, ".gitignore" 파일에 추가해 줌

5. Dag 실행하기

▪ 변경사항과 dag 업로드하고 airflow 디렉토리에 git pull

▪ 그러면 다음과 같이 common 폴더에 "common_func.py" 파일이 생겼음

 

▪ webserver 접속해보면 dag이 정상적으로 올라가있음

 

▪ dag unpause 후 log 확인해보면 외부함수를 호출 후 실행한 결과가 나옴

 

 

 

참고:

 

Airflow 마스터 클래스 강의 - 인프런

데이터 파이프라인을 효율적으로 만들고 관리하기 위한 Orchestration 도구인 Airflow에 대해 배우는 강의입니다. 초보자도 차근차근 배울 수 있는 Airflow 마스터 클래스, 환영합니다!, 데이터 파이프

www.inflearn.com

 

 

'Data Engineering > Airflow' 카테고리의 다른 글

[Airflow] Python 함수 파라미터  (0) 2024.01.23
[Airflow] Task decorator 사용하기  (1) 2024.01.21
[Airflow] Python Operator 기본  (0) 2024.01.21
[Airflow] Email Operator 사용하기  (1) 2024.01.21
[Airflow] Bash Operator & 외부 Shell 파일 실행하기  (0) 2024.01.20
  1. Python 모듈 경로의 이해 
  2. sys.path에 경로를 추가하는 방법
  3. Airflow에서의 경로
  4. 1. dags_python_import_func.py
  5. 2. common_func.py
  6. 3. File 디렉토리 
  7. 4. ".env" 파일 만들기
  8. 5. Dag 실행하기
'Data Engineering/Airflow' 카테고리의 다른 글
  • [Airflow] Python 함수 파라미터
  • [Airflow] Task decorator 사용하기
  • [Airflow] Python Operator 기본
  • [Airflow] Email Operator 사용하기
Doodo
Doodo
  • Doodo
    Doodo
    Doodo
  • 전체
    오늘
    어제
    • 분류 전체보기 (192)
      • CS (17)
        • Network (11)
        • Database (6)
      • Language (19)
        • Python (11)
        • SQL (6)
        • R (2)
      • Linux (17)
      • DevOps (35)
        • Git (7)
        • Docker (8)
        • Kubernetes (9)
        • GCP (4)
        • AWS (7)
      • Data Engineering (50)
        • 책 리뷰 (14)
        • Airflow (35)
        • Redis (1)
      • DBMS (21)
        • CUBRID (21)
      • ML & DL (2)
      • 코딩테스트 (24)
      • 프로젝트 (7)
        • 서울시 대기현황 데이터 적재 프로젝트 (4)
        • CryptoStream (3)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

  • 인기 글

  • 태그

  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.0
Doodo
[Airflow] 외부 python 함수 실행하기
상단으로

티스토리툴바

단축키

내 블로그

내 블로그 - 관리자 홈 전환
Q
Q
새 글 쓰기
W
W

블로그 게시글

글 수정 (권한 있는 경우)
E
E
댓글 영역으로 이동
C
C

모든 영역

이 페이지의 URL 복사
S
S
맨 위로 이동
T
T
티스토리 홈 이동
H
H
단축키 안내
Shift + /
⇧ + /

* 단축키는 한글/영문 대소문자로 이용 가능하며, 티스토리 기본 도메인에서만 동작합니다.