Search

Airflow DagFile Processing error 해결 및 파싱 안정화 (DagBag import timeout, EFS를 얹은)

Airflow DagFile Processing error 해결 및 파싱 안정화 (DagBag import timeout, EFS를 얹은)

문제의 코드

저는 dynamic dag + dynamic task를 사용해 동적으로 dag를 대량으로 parsing하려고 했습니다.
아래는 예시 코드입니다.
from airflow import DAG from airflow.models import Variable from airflow.operators.empty_operator import EmptyOperator from airflow.providers.amazon.aws.operators.eks import EksPodOperator from datetime import datetime aws_account = Variable.get("AWS_ACCOUNT_ID") aws_region = Variable.get("AWS_REGION") def create_eks_task(etl_type: str, is_incremental: str, dag: DAG) -> EksPodOperator: task = f"eks_batch_{etl_type.lower()}" return EksPodOperator( task_id=task, pod_name=f"eks-batch-{etl_type.lower()}-123456", cluster_name="cluster", namespace="airflow", aws_conn_id="eks-pod-conn-id", container_resources={ "request_memory": "1Gi", "request_cpu": "500m", "limit_memory": "2Gi", "limit_cpu": "1000m", }, on_finish_action="delete_pod", get_logs=True, log_events_on_failure=True, logging_interval=10, dag=dag, image=f"{aws_account}.dkr.ecr.{aws_region}.amazonaws.com/batch:20241203", env_vars={ "ETL_TYPE": etl_type, "ETL_DATE": "{{ logical_date.strftime('%Y-%m-%d-%H-%M-%S') }}", "IS_INCREMENTAL": is_incremental, }, ) def generate_dag(etl_type: str) -> DAG: with DAG( dag_id=f"simplified_{etl_type.lower()}", default_args={ "owner": "airflow", "depends_on_past": False, "retries": 1, }, description=f"Simplified ETL job for {etl_type}", schedule_interval="0 0 * * *", start_date=datetime(2024, 11, 27), catchup=False, tags=["etl", "eks-pod-operator", etl_type.lower()], ) as dag: start = EmptyOperator(task_id="start") etl_task = create_eks_task(etl_type, "false", dag) end = EmptyOperator(task_id="end") start >> etl_task >> end return dag # Create DAGs dags = { "simplified_1": generate_dag("batch1"), "simplified_2": generate_dag("batch2"), ...... } globals().update(dags)
Python
복사
하지만 에어플로우 실행 후 아래와 같이 import 에러가 발생했습니다
import error Broken DAG: [/opt/airflow/git-sync/repo/airflow/dags/xx.py] Traceback (most recent call last): File "/usr/local/lib/python3.10/selectors.py", line 416, in select fd_event_list = self._selector.poll(timeout) File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/timeout.py", line 69, in handle_timeout raise AirflowTaskTimeout(self.error_message) airflow.exceptions.AirflowTaskTimeout: DagBag import timeout for /opt/airflow/git-sync/repo/airflow/dags/xx.py after 30.0s. Please take a look at these docs to improve your DAG import time: *
문제는 Airflow DAG 내에 운영 데이터베이스와 연결하는 코드에 있었습니다.
DAG 파일의 top level에 있는 코드는 파싱할 때마다 실행됩니다. 이는 실제 DAG가 실행되지 않더라도 발생합니다.
Variable.get()은 Airflow의 메타데이터 데이터베이스와 연결을 시도하는 작업인데요
스케쥴러의 매 파싱 시마다 DB 연결 시도하고
많은 DAG 파일이 있는 경우 DB 연결 부하가 증가하게 되며
메타 DB가 응답하지 않는 경우 전체 DAG 파싱이 실패합니다

1차 해결 방안 (코드적 조작)

운영 DB와 연결하는 코드를 제거하거나 조건부로 처리합니다.
Connection 정보를 하드코딩하지 않고 Airflow Connections를 사용합니다.
데이터베이스 연결 로직을 top-level에서 제거하고 실제 task 실행 시점으로 이동합니다.
Airflow best practice에 따라 Variable 호출 부분을 함수 내부로 이동합니다.
# 이전 코드 (문제가 되는 부분) from airflow import DAG # Top-level에서 DB 연결 시도 db = connect_to_prod_db()# 문제 발생!# 수정된 코드 -------변경--------- from airflow import DAG from airflow.hooks.base import BaseHook def get_data(**context): # Task 실행 시점에 연결 conn = BaseHook.get_connection('my_db_conn') # DB 작업 수행 aws_account = Variable.get("AWS_ACCOUNT_ID") aws_region = Variable.get("AWS_REGION")
Python
복사
하지만 위와 같이 해도 dynamic dag가 파싱하는 dag가 늘어나면 import error가 다시 발생하게 됩니다..

2차 해결 방안 (Config 조작)

Scheduler pod의 Scheduler container에서 dag_processor_manager.log를 확인한 결과, 실패한 DAG의 실행 시간과 전체 DAG 파싱 시간이 확인되었습니다.
dag file processing 시간확인 방법
/airflow/logs/dag_processor_manager 경로에서 dag_processor_manager.log 확인하면 됩니다
예시
scheduler에 dag_processor라는것이 포함되어 있고, 이것이 dag를 parsing하는 역할을 해주게 됩니다.
DagFileProcessing의 두가지 주요 구성요소를 알면 더욱 좋습니다.
DagFileProcessorManager: 처리해야할 파일을 결정하는 무한루프를 실행하는 프로세스
새 파일을 확인: Dag가 마지막으로 수정된 후 경과시간> dag_dir_list_interval 이면 file paths list 업데이트
최근에 처리된 파일제외: min_file_process_interval> 파일 최근수정시간 인 파일을 제외
파일 경로 대기열에 추가: 발견한 파일을 경로 큐에 추가
파일 처리: 파일마다 최대 parsing_process까지 새 DagFileProcessProcess시작
결과수집: 완료된 Dag processor들의 결과수집
통계기록: 통계 출력하고 dag_processing.total_parse_time을 보냄
DagFileProcessorProcess 개별파일을 하나이상의 Dag객체로 변환하기위해 시작되는 별도 프로세스
파일 처리: dag_file_processor_timeout 안에 전체프로세스를 완료해야함
Dag 파일을 python모듈로 로드: dagbag_import_timeout안에 완료되어야함
모듈 처리: python모듈내에서 Dag객체를 찾음
DagBag 리턴: Dag 객체 목록을 DagFileProcessorManager에게 리턴
즉, DagFileProcessManger가 새로 dag로 읽어들일 파일목록을 갱신하고 개별 파일을 DagFileProcessorProcess가 Dag객체로 변환해주게 되는데…
이 과정에서 dagbag_import_timeout 에러가 난 부분을 보면 결국 dag파일을 python모듈로 로드하는 시간이 너무 길어 import error를 반환하게 된 것입니다.
이와 관련된 Airflow 설정은 다음과 같습니다
parsing_processes: DAG 파싱을 위한 프로세스 수, 기본값은 2입니다.
dag_file_processor_timeout: DAG 파싱 가능 여부를 체크할 수 있는 최대 대기 시간, 기본값은 50초입니다.
dagbag_import_timeout: DAG 객체가 Airflow에서 사용 가능한 상태가 되는 데 걸리는 최대 시간, 기본값은 30초입니다.
이 중 dag_file_processor_timeoutdagbag_import_timeout이 현상과 관련이 있었고,
AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT를 60으로 늘려서 문제를 해결했습니다.
해결책 설명
dagbag_import_timeout 설정은 DAG 객체가 Airflow에서 사용 가능한 상태가 되는 데 걸리는 최대 시간을 제어합니다.
기본값 30초로는 DAG 파싱 시간이 길어져 import error가 발생했습니다.
이 값을 60초로 늘리면, DAG 객체가 Airflow에서 사용 가능한 상태가 되는 데 걸리는 최대 시간이 증가하여, 파싱 실패를 방지할 수 있었습니다.

3차 코드적 추가 조작

다이나믹 task를 실행하게 되면서 각 태스크마다 variable을 접근하게 되는데 이러한 패턴은 아래와 같은 문제를 일으키게 됩니다
def get_eks_config(**context): # 각 태스크 생성 시점에 Variable에 접근 aws_account = Variable.get("AWS_ACCOUNT_ID") aws_region = Variable.get("AWS_REGION") aws_access_key = Variable.get("AWS_ACCESS_KEY") aws_secret_key = Variable.get("AWS_SECRET_KEY") # ...
SQL
복사
DAG 파싱 시점에 모든 태스크에 대해 Variable 접근이 발생
connection error가 있는 경우 각 태스크마다 timeout(xx초)을 기다림
여러 태스크가 있는 경우 파싱 시간이 크게 증가
개선 코드
def create_eks_task(dag: DAG) -> EksPodOperator: def get_eks_config(**context): # templated string으로 변경하여 런타임에 값을 가져오도록 함 return { "image": "{{ var.value.AWS_ACCOUNT_ID }}.dkr.ecr.{{ var.value.AWS_REGION }}.amazonaws.com/de-batch.pos-staging.togethers:{{ var.value.TOGETHERS_ETL_ECR_IMAGE }}", "region": "{{ var.value.AWS_REGION }}", "env_vars": { "AWS_ACCESS_KEY": "{{ var.value.AWS_ACCESS_KEY }}", "AWS_SECRET_KEY": "{{ var.value.AWS_SECRET_KEY }}", }, }
SQL
복사
위와 같이 템플릿 문자열을 이용해 런타임에 값을 가져오게 합니다
DAG 파싱 시점에는 Variable에 접근하지 않음
실제 태스크 실행 시점에 Variable 값을 가져옴
DAG 파싱 성능이 크게 향상됨

4차 해결 방안 (EFS 사용)

위와 같은 조치를 하고서도 시간이 지나니 다시 parsing error가 발생했습니다….
그래서 로컬에서의 Dag Parsing을 체크해보니, 위의 300초 씩이나 걸리는 dag parsing과 거리가 먼 성능이 측정되었습니다.
필자는 dag git sync를 위해 storage class를 efs를 사용 중이었는데요.
위 로컬 시스템이 로컬 파일 시스템임을 감안해도 성능 차이가 너무 많이 발생했습니다.
문득 Dag Parsing의 병목 지점이 이 파일 시스템일 수 있겠다는 생각이 들었습니다.
EFS가 무엇인지 알아보겠습니다
EFS의 특징과 한계
1.
네트워크 기반 파일시스템
모든 파일 접근에 네트워크 지연 발생
I/O 작업이 로컬 파일시스템보다 훨씬 느림
2.
일관성을 위한 오버헤드
여러 노드에서 동시 접근 시 일관성 보장 필요
이로 인한 추가 지연 발생
현재 상황:
Airflow Scheduler → EFS → Git-Sync → DAG 파일
Plain Text
복사
모든 DAG 파일 접근이 EFS를 통해 이루어짐
파싱 과정에서 반복적인 파일 읽기 발생
EFS의 네트워크 지연이 누적됨
그렇다면 EFS의 성능을 올리기 위해 어떤 것을 적용할 수 있을까요?
바로 EFS Provisioned Throughput 입니다.
EFS Provisioned Throughput은 Amazon EFS에서 제공하는 처리량 모드 중 하나입니다.
1.
EFS의 처리량 모드
Bursting Throughput (기본 모드)
파일 시스템 크기에 따라 처리량이 자동으로 확장/축소
기본 처리량 + 버스트 크레딧으로 추가 처리량 제공
Provisioned Throughput (프로비저닝된 처리량)
파일 시스템 크기와 상관없이 일관된 처리량 보장
미리 지정한 MiB/s 만큼의 처리량을 항상 사용 가능
추가 비용 발생
2.
사용 시나리오:
# Bursting이 적합한 경우 - 간헐적인 높은 처리량이 필요할 때 - 워크로드가 예측 가능할 때 # Provisioned가 적합한 경우 - 지속적으로 높은 처리량이 필요할 때 - DAG 파싱처럼 일관된 성능이 중요할 때 - 파일 시스템 크기가 작아도 높은 처리량이 필요할 때
Plain Text
복사
비용 고려사항:
Provisioned: 지정한 처리량에 대해 추가 비용 발생
Bursting: 파일 시스템 크기에 따른 기본 비용만 발생
AWS EFS의 처리량 모드를 Provisioned로 수정하고 eks에 배포된 airflow의 dag parsing을 확인해보니 무려 300초 → 5초로 안정화되었습니다

내 해결 방안들 정리

1.
메타 데이터 베이스 조작 로직을 함수 내부로 이동
2.
variable과 같은 import 병목 지점을 런타임에 가져올 수 있도록 한다
3.
dag processor 관련 설정을 늘렸더니 해결
a.
AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT
b.
AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT
4.
scheduler.min_file_process_interval = 300
a.
파싱 인터벌도 알맞게 수정
5.
dag를 저장하는 파일시스템 점검 (EFS)
나름의 교훈 도출 사실 airflow의 Best practice를 적용하지 않더라도 4차 수정만 했더라도 문제는 해결되었을 것입니다. 언제나 문제의 Bottleneck이 무엇인지 고민하는 태도가 정말 중요한 것 같습니다. (명탐정 코난 같기도 하네요)

참고 문서들

위 Airflow Best Practice를 보다가 인상 깊었던 부분을 추가적으로 정리해보았습니다.