Search

Airflow Latest only operator 작성

Airflow Latest only operator 작성

Airflow의 DAG는 backfill과 같이 현재 시점이 아닌 다른 시점에 실행될 수 있음
이 때 특정 날짜가 실행되지 않기를 원하면 LatestOnlyOperator를 사용
최근 실행한 DAG가 아닌 경우, 모든 downstream Task를 skip
import datetime as dt from airflow import DAG from airflow.operators.dummy import DummyOperator from airflow.operators.latest_only import LatestOnlyOperator from airflow.utils.trigger_rule import TriggerRule with DAG( dag_id='latest_only_with_trigger', schedule_interval=dt.timedelta(hours=4), start_date=dt.datetime(2021, 1, 1), catchup=False, tags=['example3'], ) as dag: latest_only = LatestOnlyOperator(task_id='latest_only') task1 = DummyOperator(task_id='task1') task2 = DummyOperator(task_id='task2') task3 = DummyOperator(task_id='task3') task4 = DummyOperator(task_id='task4', trigger_rule=TriggerRule.ALL_DONE) latest_only >> task1 >> [task3, task4] task2 >> [task3, task4]
Shell
복사
기본적인 코드는 위와 같습니다
task1은 latest_only 와 바로 의존관계가 있는 downstream이므로 최근 날짜의 실행이 아니면 skip
task2는 latest_only와 바로 의존관계가 있지 않으므로 과거 날짜도 실행
task3은 trigger rule이 default(ALL_SUCCESS)이므로 task1이 최근 날짜의 실행이 아니면 skip하여 task3도 skip하게 됨
task4는 trigger rule이 ALL_DONE으로 설정되었으므로 실행
LatestOnlyOperator는 작업 흐름에서 특정 시점에만 최신 실행을 허용합니다.
만약 이미 실행 중인 작업이 있다면, 이 오퍼레이터 뒤에 오는 작업들은 스킵됩니다.
start_sensor >> etl_pipeline_task >> latest_only >> mongo_upload_task >> end_sensor
Shell
복사
mongo_upload_task >> end_sensor 가 스킵될 것입니다.
이와 같이 latest_only operator를 사용하면 최신 dag만 실행하게 할 수 있습니다.
1시간 마다 실행되는 dag이고 이전 작업이 1시간 이상 걸려서 다음 스케줄까지 실행된다면 다음 스케줄 작업은 skip할 수 있습니다.
LatestOnlyOperatorstart_sensor보다 앞에 배치하면, DAG의 동작이 예상과 다를 수 있습니다. LatestOnlyOperator는 자신 다음에 오는 모든 태스크에 대해 "최신 실행만 실행하라"는 지시를 합니다. 그런데 이를 start_sensor 앞에 배치하면, DAG이 시작할 때마다 LatestOnlyOperator가 먼저 실행되어 현재 실행이 최신인지를 검사하게 됩니다.
만약 현재 실행이 최신이 아닐 경우 (즉, 동일한 DAG에 대한 더 최신의 실행이 존재할 경우), LatestOnlyOperator 뒤에 오는 모든 태스크는 스킵되고, 그 중 하나인 start_sensor도 실행되지 않습니다. 결과적으로 DAG의 실행이 시작되자마자 종료되어, etl_pipeline_task 및 이후에 정의된 모든 태스크들이 실행되지 않습니다.
이러한 설정은 일반적으로 원치 않는 결과를 초래할 수 있으며, 특히 start_sensor와 같은 시작 지점 태스크의 경우에는 더욱 그렇습니다. LatestOnlyOperator는 일반적으로 DAG의 중간 또는 마지막 부분에 배치하여, 특정 태스크 또는 태스크 그룹이 최신 DAG 실행에만 의존하도록 설정하는 것이 좋습니다.
그렇다면 질문
etl_pipeline_task가 실행 중인데 다른 최신의 etl_pipeline_task가 실행되면 어떻게 될까요?
만약 etl_pipeline_task가 실행 중이고, 동시에 또 다른 최신의 etl_pipeline_task가 실행되려고 할 때 LatestOnlyOperator의 동작은 다음과 같다고 합니다
1.
LatestOnlyOperator의 위치와 작동 방식: LatestOnlyOperatorprs_etl_pipeline_task 이후에 배치되어 있기 때문에, prs_etl_pipeline_task의 실행 여부와 관계없이 해당 태스크가 실행되고 완료됩니다. 그 후 LatestOnlyOperator가 실행됩니다.
2.
최신 DAG 실행 확인: LatestOnlyOperator는 실행 시점에서 가장 최신의 DAG 실행인지를 확인합니다. 만약 현재 실행 중인 prs_etl_pipeline_task가 최신이 아니라면, 즉 더 최신의 prs_etl_pipeline_task가 실행 중이거나 대기 중이라면, LatestOnlyOperator 뒤에 오는 trigger_mongo_upload_des_task는 스킵됩니다.
3.
최신 실행에 대한 처리: 만약 현재 실행 중인 prs_etl_pipeline_task가 가장 최신의 실행이라면, LatestOnlyOperator는 아무런 영향을 주지 않고 trigger_mongo_upload_des_task는 정상적으로 실행됩니다.

참고

과거에 강의를 들으면서 적었던 내용이 있어 다시 복기해볼 겸 가져와 보았다

LatestOnlyOperator

LatestOnlyOperator를 사용하면 가장 최근에 스케줄된 dag만 실행됩니다.
Time-sensitive한 태스크들이 과거 데이터의 backfill시 실행되는 것을 막기 위함
현재 시간이 지금 태스크가 처리하는 execution_date보다 미래이고 다음 execution_date보다는 과거인 경우에만 뒤로 실행을 이어가고 아니면 여기서 중단됨
현재 시간이 execution_time과 다음 스케줄된 execution_time 사이가 아닌 경우, LatestOnlyOperator는 모든 하위 태스크를 스킵합니다.
task의 status는 success, failed, skipped 이렇게 3 종류
t1 >> t3 >> [t2, t4]
예시 코드
from airflow.operators.latest_only import LatestOnlyOperator from airflow.operators.empty import EmptyOperator with DAG( dag_id='latest_only_example', schedule=timedelta(hours=48), # 매 48시간마다 실행되는 DAG로 설정 start_date=datetime(2023, 6, 14), catchup=True) as dag: t1 = EmptyOperator(task_id='task1') t2 = LatestOnlyOperator(task_id = 'latest_only') t3 = EmptyOperator(task_id='task3') t4 = EmptyOperator(task_id='task4') t1 >> t2 >> [t3, t4]
Python
복사
LatestOnlyOperator t2가 backfill을 할 때, Time-sensitive한 태스크인 t3, t4가 실행되는 것을 막아줄 수 있다.
airflow dags test Learn_LatestOnlyOperator 2023-06-15
현재 시간 : 2023-06-22
execution_date : 2023-06-15, 다음 스케쥴링 시간 : 2023-06-17
현재 시간이 execution_time과 다음 스케줄된 execution_time 사이가 아닌 경우이기 때문에 다음 2개의 태스크를 막는다
2023-06-18일 오후 늦게 실행 과거 날짜가 catchup=True 때문에 실행되었지만 스킵됨