Search

Backfill과 Airflow

Incremental Update의 실패

관리하는 데이터 파이프라인의 수가 늘어나면 이 중의 몇은 항상 실패하게 되며 이를 어떻게 관리하느냐가 데이터 엔지니어의 삶에 큰 영향을 준다
1.
Incremental Update가 실패하면?
2.
하루에 한번 동작하고 Incremental하게 업데이트하는 파이프라인이라면?
a.
만약 아래와 같이 실행될 경우, 이틀간의 정보가 빠져있게 된다
3.
실패한 부분을 재실행하는 것이 얼마나 중요한가?
⇒ 관리하는 데이터 파이프라인의 수가 늘어나면 이 중의 몇은 항상 실패하게 되며 이를 어떻게 관리하느냐가 데이터 엔지니어의 삶에 큰 영향을 준다
이제부터 할 이야기는 Incremental Update시에만 의미가 있습니다.
다시 한번 가능하면 Full Refresh를 사용하는 것이 좋음
문제가 생겨도 다시 실행하면 됨
Incremental Update는 효율성이 더 좋을 수 있지만 운영/유지보수의 난이도가 올라갑니다.
실수등으로 데이터가 빠지는 일이 생길 수 있음
과거 데이터를 다시 다 읽어와야하는 경우 다시 모두 재실행을 해주어야함

Backfill

Backfill의 용이성 여부 → 데이터 엔지니어 삶에 직접적인 영향!
Backfill의 정의
실패한 데이터 파이프라인을 재실행 혹은 읽어온 데이터들의 문제로 다시 다 읽어와야하는 경우를 의미
Backfill 해결은 Incremental Update에서 복잡해짐
Full Refresh에서는 간단하다. 그냥 다시 실행하면 된다,
즉, 실패한 데이터 파이프라인의 재실행이 얼마나 용이한 구조인가?
이게 잘 디자인된 것이 바로 Airflow

보통 Daily DAG를 작성한다고 하면 어떻게 할까?

지금 시간을 기준으로 어제 날짜를 계산하고 그 날짜에 해당하는 데이터를 읽어옴
from datetime import datetime, timedelta # 지금 시간 기준으로 어제 날짜를 계산 y = datetime.now() - timedelta(1) yesterday = datetime.strftime(y, '%Y-%m-%d') # yesterday에 해당하는 데이터를 소스에서 읽어옴 # 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면 sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"
Python
복사
그런데 지난 1년치 데이터를 Backfill 해야한다면?
from datetime import datetime, timedelta y = datetime.now() - timedelta(1) yesterday = datetime.strftime(y, '%Y-%m-%d') yesterday = '2023-01-01' # yesterday에 해당하는 데이터를 소스에서 읽어옴 # 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면 sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"
Python
복사
기존 ETL 코드를 조금 수정(반복문을 추가)해서 지난 1년치 데이터에 대해 돌린다
실수하기 쉽고 수정하는데 시간이 걸림
읽어와야 할 데이터를 현재 시간 기준으로 계산하는 순간, 빠지는 데이터가 생겨 나중에 Backfill 작업을 수행하려고 하면 오류가 발생한다.

어떻게 ETL을 구현해놓으면 이런 일이 편해질까?

시스템적으로 이걸 쉽게 해주는 방법을 구현한다
Airflow는 Incremental update 혹은 Full Refresh를 구분하지 못한다.
모든 DAG가 incremental 하게 작동된다고 가정함
날짜 별로 backfill 결과를 기록하고 성공 여부 기록: 나중에 결과를 쉽게 확인
이 날짜를 시스템에서 ETL의 인자로 제공
데이터 엔지니어는 읽어와야하는 데이터의 날짜를 계산하지 않고 시스템(Airflow)이 지정해준 날짜를 사용
execution_date
Airflow의 접근방식
ETL 별로 실행날짜와 결과를 메타데이터 데이터베이스에 기록
모든 DAG 실행에는 “execution_date”이 지정되어 있음
execution_date으로 채워야하는 날짜와 시간이 넘어옴
이를 바탕으로 데이터를 갱신하도록 코드를 작성해야함
이점: backfill이 쉬워짐

Execution Date?

일반적으로 일자별 배치는 하루가 마감된 새벽에 수행된다. 예를 들어, 2020–04–20 하루 24시간의 데이터를 집계하기 위한 Job은 2020–04–21 새벽에 수행된다.
즉, 2020–04–20 00:00 ~ 2020–04–21 00:00의 데이터를 집계한다는 것의 의미는 ‘2020–04–21’의 데이터가 아닌 ‘2020-04–20’ 의 데이터를 집계하는 것이므로, 날짜 파라미터를 left-bound하여 사용한다고 한다.
Airflow의 Execution Date은 현재 수행하는 일자로부터 left-bound된 값이 제공된다.
Task Instance Detail에서 실제 수행된 시간과 생성된 execution date을 볼 수 있다.
공식 문서 참고
그렇다면, start_date란?
이 시간에 처음 실행되는 것이 아니라 start_date + 실행주기가 처음 실행시간이 됨
2022-02-23 01:00:00이 start_date인 DAG가 있다면
Daily job이면 2022-02-24 01:00:00에 처음 실행됨
이때 execution_date으로는 2022-02-23 01:00:00이 들어옴
Hourly job이면 2022-02-23 02:00:00에 처음 실행됨
이때 execution_date으로는 2022-02-23 01:00:00이 들어옴
execution_date을 보고 업데이트할 데이터의 날짜를 정하게 코딩하면
Backfill이 코드 변경없이 가능
단 이는 incremental update를 하는 DAG에서만 의미가 있음

Airflow의 Backfill 방식 설명

Daily Incremental Update를 구현해야 한다면?
예를 들어 2020년 11월 7일날부터 매일 매일 하루치 데이터를 읽어온다고 가정해보자
이 경우 언제부터 해당 ETL이 동작해야하나?
2020년 11월 8일
그래야 11월 7일의 데이터를 읽어올 수 있기 때문
다르게 이야기하면 2020년 11월 8일날 동작하지만 읽어와야 하는 데이터의 날짜는?
2020년 11월 7일 ⇒ 이 것이 start_date
Airflow의 start_date은 시작 날짜라기는 보다는 읽어와야하는 데이터의 날짜
이 때, execution_date이란 시스템 변수로 읽어와야하는 데이터의 날짜를 지정
다시 말하지만, Airflow는 full refresh 하는지 incremental update를 하는지 모른다
incremental update를 한다고 가정한다
daily DAG는 start_date에 지정된 일자의 다음 날에 처음 실행된다
hourly DAG는 start_date에 지정된 시간의 다음 시간에 처음 실행된다

start_date과 execution_date 이해하기

2020-08-10 02:00:00로 start date로 설정된 daily job(DAG)이 있다
catch up이 True로 설정되어 있다고 가정 (디폴트가 True)
지금 시간이 2020-08-13 20:00:00이고 처음으로 이 job이 활성화되었다
질문: 이 경우 이 job은 몇번 실행될까? (execution_date) ⇒ 3번 실행
좌 : start date 및 실행되는 날짜/시간, 우 : execution date
2020-08-10 02:00:00 → 실행 X ⇒ start date 하루 뒤에 실행
2020-08-11 02:00:00 → 실행 ⇒ execution date : 2020-08-10 02:00:00
2020-08-12 02:00:00 → 실행 ⇒ execution date : 2020-08-11 02:00:00
2020-08-13 02:00:00 → 실행 ⇒ execution date : 2020-08-12 02:00:00
catch up이 False로 되어 있었다면 실행이 안되고 있을 것
Full refresh를 하다가 도저히 안되겠을 때 incremental update로 간다
밀린 작업들이기 때문에, 위의 작업들이 큐에 들어가서 위와 같이 execution date를 찍어내면서 실행이 된다

비용이 많이 나오게 될 경우

잘못된 만남 ㅜㅜ
최적화되지 않은 Airflow configuration (start_date, catchup, …)
엄청 큰 쿼리
BigQuery/Snowflake
Redshift는 문제가 없음. 뭐를 하건 월별 정액 지출.
2천불짜리 쿼리가 Airflow에 의해 8번 스케줄된다면?
예를 들어 daily job이 2020년 8월 6일을 start_date로 설정되었고 오늘 날짜가 8월 14일이라고 가정하자. 이 때 이 잡이 Enable되는 순간 (아래 그림 참고)
이 잡은 catchup 파라미터의 값에 따라 8번 자동 실행되게 된다. ⇒ 8번의 비용이 발생하게 됨 ⇒ 만약 1번의 비용이 크다면 어마어마한 손실을 초래
Incremental update가 필요한 경우 ⇒ catch up = True
Full refresh 혹은 과거에 실행된 것이 아무 의미 없는 경우 ⇒ catch up = False

Backfill과 관련된 Airflow 변수들

이름
설명
start_date
DAG가 처음 실행되는 날짜가 아니라 DAG가 처음 읽어와야하는 데이터의 날짜/시간. 실제 첫 실행날짜는 start_date + DAG의 실행주기
execution_date
DAG가 읽어와야하는 데이터의 날짜와 시간
catchup
DAG가 처음 활성화된 시점이 start_date보다 미래라면 그 사이에 실행이 안된 것들을 어떻게 할 것인지 결정해주는 파라미터. True가 디폴트값이고 이 경우 실행 안 된 것들을 모두 따라잡으려고 함. False가 되면 실행 안된 것들을 무시함
end_date
이 값은 보통 필요하지 않으며 Backfill을 날짜 범위에 대해 하는 경우에만 필요 airflow dags backfill -s …. -e …. -s : start date -e : end date

퀴즈 리뷰

Airflow에서 하나의 DAG는 다수의 Task로 구성된다.
Task = Airflow Operator
Airflow는 모든 DAG가 Incremental Update를 한다고 가정함
start_date: DAG의 처음 실행 시간이 아님
Incremenal update를 구현한다는 관점에서 처음 읽어와야하는 데이터의 날짜가 됨
Daily job이라면 start_date + 하루가 실제 DAG의 처음 실행 시간이 됨
매일 동작하는 DAG의 Start date이 2021-02-05라면 이 DAG의 첫 실행 날짜는?
2021-02-06
위 DAG의 경우 이때 execution_date으로 들어오는 날짜는?
2021-02-05
Hourly job이라면 start_date + 한시간이 실제 DAG의 처음 실행 시간이 됨
execution_date:
Airflow가 제공해주는 시스템 변수로 읽어와야할 데이터의 날짜와 시간이 들어옴
보통 daily job이면 start_date는 첫 execution_date와 같음
Schedule interval이 "30 * * * *"으로 설정된 DAG에 대한 올바른 설명은?
매시 30분마다 한번씩 실행된다
Schedule interval이 "0 * * * *"으로 설정된 DAG의 start date이 "2021-02-04 00:00:00"으로 잡혀있다면 이 DAG의 첫 번째 실행 날짜와 시간은 언제인가?
1시간에 한번 실행하는 hourly DAG
2021-02-04 01:00:00
Airflow의 DAG가 처음 ON이 되었을 때 start_date과 현재 날짜 사이에 실행이 안된 run들이 있을 경우 이를 실행한다. 이는 (??) 파라미터에 의해 결정된다. 이 파라미터를 False로 세팅하면 과거 실행이 안된 run을 무시한다
catchup 파라미터
아래와 같이 ON 이 되어있지 않으면 실행이 되지 않음
실행이 되었을 때 실행이 되지 않은 run들을 catchup 파라미터를 통해 실행할지 말지 결정 가능
다음 중 Redshift에서 큰 데이터를 테이블로 복사하는 방식을 제대로 설명한 것은?
복사할 레코드들을 파일로 저장해서 S3로 올린 후에 거기서 Redshift로 벌크 복사한다

Backfill을 커맨드라인에서 실행하는 방법

Daily Incremental DAG에서 2018년 7월달 데이터를 다 다시 읽어와야 한다면
Airflow가 추천하는 방식으로 Incremental Update를 구현했다면 Backfill이 쉬워짐
하지만 이를 어떻게 실행하나
하루에 31번 실행?
airflow dags test MySQL_to_Redshift_v2 2023-07-01
airflow dags test MySQL_to_Redshift_v2 2023-07-31
한번에 여러 날짜를 동시에 실행 가능한가?
구현 방법에 따라 한번에 하나씩 실행하는 것이 안전할 수 있음
이를 제어해주는 DAG 파라미터가 max_active_runs = 1
한번에 한 날짜가 실행되고 동기적으로 실행되게 함
커맨드라인
dag_id = backfill을 수행할 dag의 id
airflow dags backfill dag_id -s 2018-07-01 -e 2018-08-01
이 명령어를 수행하려면 아래의 가정이 필요합니다:
catchUpTrue로 설정되어 있음
execution_date을 사용해서 Incremental update가 구현되어 있음
start_date부터 시작하지만 end_date은 포함하지 않음
실행순서는 날짜/시간순은 아니고 랜덤. 만일 날짜순으로 하고 싶다면
DAG default_args의 depends_on_pastTrue로 설정
default_args = { 'depends_on_past': True,
SQL
복사

How to Make Your DAG Backfill ready

먼저 모든 DAG가 backfill을 필요로 하지는 않음
Full Refresh를 한다면 backfill은 의미가 없음
여기서 backfill은 일별 혹은 시간별로 업데이트하는 경우를 의미함
마지막 업데이트 시간 기준 backfill을 하는 경우라면 (Data Warehouse 테이블에 기록된 시간 기준) 이런 경우에도 execution_date을 이용한 backfill은 필요하지 않음
데이터의 크기가 굉장히 커지면 backfill 기능을 구현해 두는 것이 필수
airflow가 큰 도움이 됨
하지만 데이터 소스의 도움 없이는 불가능
created at 혹은 modified at 과 같은 칼럼이 존재해야 한다
어떻게 backfill로 구현할 것인가
제일 중요한 것은 데이터 소스가 backfill 방식을 지원해야함
“execution_date”을 사용해서 업데이트할 데이터 결정
“catchup” 필드를 True로 설정
start_date/end_date을 backfill하려는 날짜로 설정
다음으로 중요한 것은 DAG 구현이 execution_date을 고려해야 하는 것이고 idempotent 해야함