Search

[Airflow] Dag Dependencies (TriggerDagRunOperator, Sensor, ExternalTaskSensor)

Dag를 실행하는 방법

주기적 실행: schedule로 지정
다른 Dag에 의해 트리거
Explicit Trigger: Dag A가 분명하게 Dag B를 트리거 (TriggerDagRunOperator)
Reactive Trigger: Dag B가 Dag A가 끝나기를 대기 (ExternalTaskSensor)
알아두면 좋은 상황에 따라 다른 태스크 실행 방식들
조건에 따라 다른 태스크로 분기 (BranchPythonOperator)
과거 데이터 Backfill시에는 불필요한 태스크 처리 (LatestOnlyOperator)
앞단 태스크들의 실행상황
어떤 경우에는 앞단이 실패해도 동작해야하는 경우가 있을 수 있음
2가지 방법이 존재
Explicit trigger
TriggerDagRunOperator
DAG A가 명시적으로 DAG B를 트리거한다.
의존관계가 명확해진다
Reactive trigger
ExternalTaskSensor
DAG B가 DAG A의 태스크가 끝나기를 대기
이 경우 DAG A는 이 의존관계에 대한 사실을 모름 (의존 관계 불명확)
이에 대한 이슈가 생길 수 있음

TriggerDagRunOperator

DAG A의 태스크를 TriggerDagRunOperator로 구현
from airflow.operators.trigger_dagrun import TriggerDagRunOperator trigger_B = TriggerDagRunOperator( task_id="trigger_B", trigger_dag_id="트리거하려는DAG이름" )
Python
복사
trigger_dag_id가 중요하다
이 operator가 시작하게 해야하는 DAG의 id

TriggerDagRunOperator

원문
trigger_dag_id (str)
트리거할 dag_id (템플릿화 가능).
trigger_run_id (str | None)
트리거된 DAG 실행을 위한 run ID (템플릿화 가능). 제공되지 않으면, run ID는 자동으로 생성됩니다.
conf (dict | None)
DAG 실행을 위한 설정 (템플릿화 가능).
execution_date (str | datetime.datetime | None)
dag의 실행 날짜 (템플릿화 가능).
reset_dag_run (bool)
이미 존재하는 dag run을 초기화할지 여부.
이것은 기존의 dag run을 백필하거나 재실행할 때 유용합니다. dag run을 초기화만 할 뿐 재생성하지 않습니다.
Dag run conf는 변경 불가능하며, 기존 dag run의 재실행 시 초기화되지 않습니다.
reset_dag_run=False이고 dag run이 존재하는 경우, DagRunAlreadyExists가 발생합니다.
reset_dag_run=True이고 dag run이 존재하는 경우, 기존의 dag run은 재실행을 위해 초기화됩니다.
True일 경우 해당 날짜가 이미 실행되었더라는 다시 재실행
wait_for_completion (bool)
dag run 완료를 기다릴지 여부. (기본값: False)
트리거 대상 DAG가 끝날 때까지 기다릴지 여부를 결정. 디폴트값은 False
poke_interval (int)
wait_for_completion=True일 때 dag run 상태를 확인하는 poke interval. (기본값: 60)
TriggerDagRunOperator 예시
airflow.cfg의 dag_run_conf_overrides_params가 True로 설정되어 있어야함
from airflow.operators.trigger_dagrun import TriggerDagRunOperator trigger_B = TriggerDagRunOperator( task_id="trigger_B", trigger_dag_id="트리거 하려는 DAG 이름", conf={ 'path': '/opt/ml/conf' }, execution_date="{{ ds }}", # Jinja 템플릿을 통해 DAG A의 execution_date을 패스 reset_dag_run=True, # True일 경우 해당 날짜가 이미 실행되었더라도 다시 재실행 wait_for_completion=True # DAG B가 끝날 때까지 기다릴지 여부를 결정. 디폴트값은 False )
Python
복사
conf={ 'path': '/opt/ml/conf' }
# DAG B에 넘기고 싶은 정보. DAG B에서는 Jinja 템플릿(dag_run.conf["path"])으로 접근 가능. # DAG B PythonOperator(**context)에서라면 kwargs['dag_run'].conf.get('path')
Python
복사
TriggerDagRunOperator 실습
먼저 실행이 되는 SourceDag - TriggerDag.py
from airflow import DAG from airflow.operators.trigger_dagrun import TriggerDagRunOperator from datetime import datetime dag = DAG( dag_id='SourceDag', start_date=datetime(2023, 6, 19), schedule='@daily' ) trigger_task = TriggerDagRunOperator( task_id='trigger_task', trigger_dag_id='TargetDag', conf={'path': 'value1'}, execution_date='{{ ds }}', # {{ ds }} -> source dag가 실행될 때의 execution date reset_dag_run=True, dag=dag )
Python
복사
execution_date='{{ ds }}'
source dag가 실행될 때의 execution date
airflow tasks test SourceDag trigger_task 2023-06-19
트리거링되는 TargetDag - TargetDag.py
from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime dag = DAG( dag_id='TargetDag', schedule='@once', # 매일 실행 start_date=datetime(2023, 6, 1), ) task1 = BashOperator( task_id='task1', bash_command="""echo '{{ ds }}, {{ dag_run.conf.get("path", "none") }}' """, dag=dag )
Python
복사

Sensor + ExternalTaskSensor ,BranchPythonOperator ,LatestOnlyOperator ,Trigger Rules

Sensor란 무엇인가?

Sensor는 특정 조건이 충족될 때까지 대기하는 Operator
Sensor는 외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용
Airflow는 몇 가지 내장 Sensor를 제공
FileSensor: 지정된 위치에 파일이 생길 때까지 대기
HttpSensor: HTTP 요청을 수행하고 지정된 응답이 대기
SqlSensor: SQL 데이터베이스에서 특정 조건을 충족할 때까지 대기
TimeSensor: 특정 시간에 도달할 때까지 워크플로우를 일시 중지
ExternalTaskSensor: 다른 Airflow DAG의 특정 작업 완료를 대기
️ Sensor의 동작은 기본적으로 주기적으로 poke를 하는 것
Sensor의 mode라는 파라미터를 통해 Sensor가 동작하는 방식(기다리는 방식)이 결정됨
mode의 값은 reschedule 혹은 poke가 됨
poke
'poke'는 Sensor 작업이 특정 조건을 확인하기 위해 주기적으로 폴링(polling)하는 동작을 의미합니다.
Sensor는 특정 작업에서 'poke'라는 동작을 반복적으로 수행하며, 이때 worker 하나가 해당 Sensor에 할당됩니다. Sensor는 작업의 완료를 기다리며 주기적으로 상태를 확인합니다. 만약 해당 작업이 완료되지 않으면 Sensor는 다시 sleep 상태로 전환되고 일정 시간 후에 다시 'poke' 동작을 수행합니다. 또한 Sensor는 주기적으로 조건을 체크하며 조건이 충족될 때까지 대기합니다.
'poke' 간에는 sleep 상태가 되는데, 이는 worker가 다른 작업을 수행하지 못하게 하므로 일종의 자원 낭비가 발생합니다. 즉, worker 하나가 Sensor 하나에 전담되기 때문에 그 worker가 수행할 수 있는 다른 작업들이 제한됩니다.
이러한 이유로 'poke' 방식은 자원 사용에 있어 비효율적일 수 있습니다. 하지만, 이 방식이 Sensor의 기본 mode이기 때문에 디폴트 설정으로 사용되는 경우가 많습니다.
reschedule
Sensor가 작업을 잠시 중지('release')하고 나중에 다시 시작('reschedule')하는 방식을 사용합니다. 이 방식에서는 'poke'를 수행한 후에 worker를 다른 작업에 할당할 수 있으므로, 자원 사용 측면에서 'poke' 방식보다 효율적입니다. 이는 여러 작업을 병렬로 수행할 수 있게 하므로 전반적인 처리 시간을 단축할 수 있습니다.
그러나 이 방식의 단점은 worker를 다시 잡을 수 있는지가 보장되지 않는다는 점입니다. 즉, worker가 다른 작업에 할당된 후에는 원래의 Sensor 작업을 재개할 수 없을 수도 있습니다. 이로 인해 작업의 완료 시간이 예측보다 지연될 수 있습니다.

ExternalTaskSensor

DAG B의 ExternalTaskSensor 태스크가 DAG A의 특정 태스크가 끝났는지 체크함
먼저 동일한 schedule_interval을 사용
이 경우 두 태스크들의 Execution Date이 동일해야함. 아니면 매칭이 안됨!
맞아야하는 조건이 꽤 까다로워서 사용하는데 어려움이 존재
from airflow.sensors.external_task import ExternalTaskSensor waiting_for_end_of_dag_a = ExternalTaskSensor( task_id='waiting_for_end_of_dag_a', external_dag_id='DAG이름', external_task_id='end', timeout=5*60, mode='reschedule' # poke로 하면 worker를 하나 낭비 )
Python
복사
만일 DAG A와 DAG B가 서로 다른 schedule interval을 갖는다면 ?
예를 들어 DAG A가 DAG B보다 5분 먼저 실행된다면?
execution_delta를 사용
execution_date_fn을 사용하면 조금 더 복잡하게 컨트롤 가능
만일 두개의 DAG가 서로 다른 frequency를 갖고 있다면 이 경우 ExternalTaskSensor는 사용불가 ⇒ 사용이 Tricky하다…
from airflow.sensors.external_task import ExternalTaskSensor waiting_for_end_of_dag_a = ExternalTaskSensor( task_id='waiting_for_end_of_dag_a', external_dag_id='DAG이름', external_task_id='end', timeout=5*60, mode='reschedule', execution_delta=timedelta(minutes=5) )
Python
복사

BranchPythonOperator

상황에 따라 뒤에 실행되어야할 태스크를 동적으로 결정해주는 오퍼레이터
미리 정해준 Operator들 중에 선택하는 형태로 돌아감
python_callable로 전달받은 함수가 조건에 따라 반환한 Task_id 또는 Task_id 목록을 실행하도록 한다. 분기 논리가 Python 함수로 간단히 구현할 수 있을 때 사용한다.
TriggerDagOperator 앞에 이 오퍼레이터를 사용하는 경우도 있음
mode가 dev면 아무 것도 안하다가 mode가 dev가 아니면 trigger_b를 수행
개발 중일 때는 이 뒤에 트리거를 실행 안하다가 프로덕션 환경(prod)일 때만 실행하자
from airflow.operators.python import BranchPythonOperator # 상황에 따라 뒤에 실행되어야 하는 태스크를 리턴 def skip_or_cont_trigger(): if Variable.get("mode", "dev") == "dev": return [] else: return ["trigger_b"] # "mode"라는 Variable의 값이 "dev"이면 trigger_b 태스크를 스킵 branching = BranchPythonOperator( task_id='branching', python_callable=skip_or_cont_trigger, )
Python
복사
예제
태스크 설명 : UTC 기준으로 12시전이면 morning_task로 가고 아니면 afternoon_task로 브랜치
current_hour: 0 INFO - Following branch morning_task Skipping tasks ['afternoon_task']
Python
복사
위의 경우, 실행되지 않은 afternoon_task의 상태는 skipped가 됨
from airflow import DAG from airflow.operators.empty import EmptyOperator from airflow.operators.python import BranchPythonOperator from datetime import datetime default_args = { 'start_date': datetime(2023, 1, 1) } dag = DAG( 'Learn_BranchPythonOperator', schedule='@daily', default_args=default_args) def decide_branch(**context): current_hour = datetime.now().hour print(f"current_hour: {current_hour}") if current_hour < 12: return 'morning_task' else: return 'afternoon_task' branching_operator = BranchPythonOperator( task_id='branching_task', python_callable=decide_branch, dag=dag ) morning_task = EmptyOperator( task_id='morning_task', dag=dag ) afternoon_task = EmptyOperator( task_id='afternoon_task', dag=dag ) branching_operator >> morning_task branching_operator >> afternoon_task
Python
복사

LatestOnlyOperator

LatestOnlyOperator를 사용하면 가장 최근에 스케줄된 dag만 실행됩니다.
Time-sensitive한 태스크들이 과거 데이터의 backfill시 실행되는 것을 막기 위함
현재 시간이 지금 태스크가 처리하는 execution_date보다 미래이고 다음 execution_date보다는 과거인 경우에만 뒤로 실행을 이어가고 아니면 여기서 중단됨
현재 시간이 execution_time과 다음 스케줄된 execution_time 사이가 아닌 경우, LatestOnlyOperator는 모든 하위 태스크를 스킵합니다.
task의 status는 success, failed, skipped 이렇게 3 종류
t1 >> t3 >> [t2, t4]
예시 코드
from airflow.operators.latest_only import LatestOnlyOperator from airflow.operators.empty import EmptyOperator with DAG( dag_id='latest_only_example', schedule=timedelta(hours=48), # 매 48시간마다 실행되는 DAG로 설정 start_date=datetime(2023, 6, 14), catchup=True) as dag: t1 = EmptyOperator(task_id='task1') t2 = LatestOnlyOperator(task_id = 'latest_only') t3 = EmptyOperator(task_id='task3') t4 = EmptyOperator(task_id='task4') t1 >> t2 >> [t3, t4]
Python
복사
LatestOnlyOperator t2가 backfill을 할 때, Time-sensitive한 태스크인 t3, t4가 실행되는 것을 막아줄 수 있다.
airflow dags test Learn_LatestOnlyOperator 2023-06-15
현재 시간 : 2023-06-22
execution_date : 2023-06-15, 다음 스케쥴링 시간 : 2023-06-17
현재 시간이 execution_time과 다음 스케줄된 execution_time 사이가 아닌 경우이기 때문에 다음 2개의 태스크를 막는다
2023-06-18일 오후 늦게 실행 과거 날짜가 catchup=True 때문에 실행되었지만 스킵됨

Trigger Rules이란?

Upstream 태스크의 성공실패 상황에 따라 뒷단 태스크의 실행여부를 결정하고 싶다면?
보통 앞단이 하나라도 실패하면 뒷 단의 태스크는 실행불가
모든 Operator에 trigger_rule이란 파라미터로 결정 가능
trigger_rule은 태스크에 주어지는 파라미터로 다음과 같은 값이 가능
all_success (기본값), all_failed, all_done, one_failed, one_success, none_failed, none_failed_min_one_success
Trigger Rule의 가능값 (airflow.utils.trigger_rule.TriggerRule)
ALL_SUCCESS: (기본값) 모든 부모 작업이 성공한 경우에 실행됩니다.
ALL_FAILED: 모든 부모 작업이 실패하거나 상위 실패 상태에 있는 경우에 실행됩니다.
ALL_DONE: 모든 부모 작업이 실행을 완료한 경우에 실행됩니다. (성공 또는 실패 여부와 상관없이)
ONE_FAILED: 적어도 하나의 부모 작업이 실패한 경우에 즉시 실행되며, 모든 부모 작업이 완료되기를 기다리지 않습니다.
ONE_SUCCESS: 적어도 하나의 부모 작업이 성공한 경우에 즉시 실행되며, 모든 부모 작업이 완료되기를 기다리지 않습니다.
NONE_FAILED: 모든 부모 작업이 실패하지 않았거나(또는 상위 실패하지 않았거나) 즉, 모든 부모 작업이 성공하거나 건너뛰었을 경우에 실행됩니다.
NONE_FAILED_MIN_ONE_SUCCESS: 적어도 하나의 부모 작업이 완료되었지만 실패하지 않은 경우에 실행됩니다.
none_failed_or_skipped: 모든 부모 작업이 실패하지 않았고 최소한 하나의 부모 작업이 성공한 경우 실행됩니다.
none_skipped: 어떤 부모 작업도 "skipped" 상태가 아니므로 모든 부모 작업이 "success", "failed", 또는 "upstream_failed" 상태인 경우 실행됩니다.
dummy: 의존성이 시각적 표현을 위한 것일 뿐이며, 원하는 때에 트리거됩니다.
Trigger Rule 사용 예 (Learn_TriggerRule.py)
from airflow.utils.trigger_rule import TriggerRule with DAG("trigger_rules", default_args=default_args, schedule=timedelta(1)) as dag: t1 = BashOperator(task_id="print_date", bash_command="date") t2 = BashOperator(task_id="sleep", bash_command="sleep 5") t3 = BashOperator(task_id="exit", bash_command="exit 1") # 종료 코드 1로 프로세스 종료 -> 의도적 실패 t4 = BashOperator( task_id='final_task', bash_command='echo DONE!', trigger_rule=TriggerRule.ALL_DONE ) [t1, t2, t3] >> t4
Python
복사
final_task (task_id="final_task")은 BashOperator로 구현되어 있으며, "echo DONE!" 명령어를 실행하여 "DONE!" 메시지를 출력합니다. 이 작업은 print_date, sleep, exit 작업이 모두 완료(결정)되는 경우에만 실행됩니다.
DAG가 실행될 때 print_date 작업이 실행되고, 그 다음 sleep 작업이 실행되며 5초 동안 대기한 후, exit 작업이 실패합니다. 마지막으로 final_task 작업이 실행되어 "DONE!" 메시지가 출력됩니다.
실제 실행 후 Web UI에서의 모습

팁: Airflow 메타데이터 DB 내용 살펴보기

airflow:airflow로 Postgres에 로그인 가능
docker exec -it airflow-setup-airflow-webserver-1 sh
그 다음에 아래 명령 수행
psql -h postgres
로그인
psql shell에서 아래 명령 수행
\dt List of relations Schema | Name | Type | Owner --------+--------------------------------+-------+--------- public | ab_permission | table | airflow public | ab_permission_view | table | airflow public | ab_permission_view_role | table | airflow public | ab_register_user | table | airflow public | ab_role | table | airflow public | ab_user | table | airflow public | ab_user_role | table | airflow public | ab_view_menu | table | airflow public | alembic_version | table | airflow public | callback_request | table | airflow public | celery_taskmeta | table | airflow public | celery_tasksetmeta | table | airflow public | connection | table | airflow public | dag | table | airflow public | dag_code | table | airflow public | dag_owner_attributes | table | airflow public | dag_pickle | table | airflow public | dag_run | table | airflow public | dag_run_note | table | airflow public | dag_schedule_dataset_reference | table | airflow public | dag_tag | table | airflow public | dag_warning | table | airflow public | dagrun_dataset_event | table | airflow public | dataset | table | airflow public | dataset_dag_run_queue | table | airflow public | dataset_event | table | airflow public | import_error | table | airflow public | job | table | airflow public | log | table | airflow public | log_template | table | airflow public | rendered_task_instance_fields | table | airflow public | serialized_dag | table | airflow public | session | table | airflow public | sla_miss | table | airflow public | slot_pool | table | airflow public | task_fail | table | airflow public | task_instance | table | airflow public | task_instance_note | table | airflow public | task_map | table | airflow public | task_outlet_dataset_reference | table | airflow public | task_reschedule | table | airflow public | trigger | table | airflow public | variable | table | airflow public | xcom | table | airflow (44 rows)
Python
복사
이 중에 dag_run이 dag가 실행되었던 기록이 남는 테이블이다.
쿼리 실행
select * FROM dag_run LIMIT 10; select count(1) FROM dag_run WHERE dag_id = '기록을조회하고싶은DAG’; DELETE FROM dag_run WHERE dag_id = '기록을삭제하고싶은DAG’;
Python
복사
위의 메타 DB는 네임드 볼륨으로 구성이 되어 있기 때문에 컨테이너를 죽이고 살리고 해도 안의 내용이 유지 및 살아있게 된다.

질문

Dag 몇몇개가 파싱이 안될때 어떻게 하면 될까?

Reference