
Python 모듈 경로의 이해
▪ python은 sys.path 변수에서 모듈의 위치를 검색
▪ 실행하는 파이썬 파일과 동일 경로에 있는 파일은 실행 가능함
▪ 어떤 함수를 만들고 dag에 그 함수를 import하기 위해서는 만들었던 python 파일의 경로를 "sys.path"에 추가해야함

sys.path에 경로를 추가하는 방법
▪ 명시적으로 추가 ex) sys.path.append('/home/new')
▪ 또는 os 환경변수 pythonpath에 값을 추가
Airflow에서의 경로
▪ Airflow는 자동적으로 dags 폴더와 plugins 폴더를 sys.path에 추가함
▪ airflow 컨테이너로 이동해서 "airflow info" 명령어를 입력하면 python_path를 확인할 수 있는데, dags 폴더와 plugins 폴더가 기본적으로 잡혀있는 것을 확인할 수 있음
▪ 따라서 python 파일을 plugins 폴더에 위치시켜 놓으면, import해서 파일에 접근할 수 있음
sudo docker ps # 컨테이너 정보 확인 sudo docker exec -it {WORKER CONTAINER ID} bash # worker 컨테이너로 접속 airflow info # 확인했으면 Ctrl + P + Q으로 나가기

1. dags_python_import_func.py
▪ 매일 6시 30분에 작업하도록 스케줄링
▪ "from common.common_func import get_sftp" 부분에서 외부 python 함수를 import
from airflow import DAG import pendulum import datetime from airflow.operators.python import PythonOperator from common.common_func import get_sftp with DAG( dag_id="dags_python_import_func", schedule="30 6 * * *", start_date=pendulum.datetime(2023, 12, 1, tz="Asia/Seoul"), catchup=False ) as dag: task_get_sftp = PythonOperator( task_id='task_get_sftp', python_callable=get_sftp )
2. common_func.py
▪ 실행할 python 외부 함수
def get_sftp(): print('sftp 작업을 시작합니다')
3. File 디렉토리
▪ "dags_python_import_func.py"는 "airflow/dags"에 위치
▪ "common_func.py"는 "airflow/pugins/common"에 위치

4. ".env" 파일 만들기
▪ plugins 폴더가 pythonpath로 잡혀있지 않기 때문에, ".env" 파일 만들어서 path 추가하기
WORKSPACE_FOLDER="airflow 최상위 디렉토리" PYTHONPATH=${WORKSPACE_FOLDER}/plugins

▪ ".env" 파일은 Git에 올릴 필요가 없기 때문에, ".gitignore" 파일에 추가해 줌

5. Dag 실행하기
▪ 변경사항과 dag 업로드하고 airflow 디렉토리에 git pull
▪ 그러면 다음과 같이 common 폴더에 "common_func.py" 파일이 생겼음

▪ webserver 접속해보면 dag이 정상적으로 올라가있음

▪ dag unpause 후 log 확인해보면 외부함수를 호출 후 실행한 결과가 나옴

참고:
Airflow 마스터 클래스 강의 - 인프런
데이터 파이프라인을 효율적으로 만들고 관리하기 위한 Orchestration 도구인 Airflow에 대해 배우는 강의입니다. 초보자도 차근차근 배울 수 있는 Airflow 마스터 클래스, 환영합니다!, 데이터 파이프
www.inflearn.com
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow] Python 함수 파라미터 (0) | 2024.01.23 |
---|---|
[Airflow] Task decorator 사용하기 (1) | 2024.01.21 |
[Airflow] Python Operator 기본 (0) | 2024.01.21 |
[Airflow] Email Operator 사용하기 (1) | 2024.01.21 |
[Airflow] Bash Operator & 외부 Shell 파일 실행하기 (0) | 2024.01.20 |