Incremental Update의 실패
관리하는 데이터 파이프라인의 수가 늘어나면 이 중의 몇은 항상 실패하게 되며 이를 어떻게 관리하느냐가 데이터 엔지니어의 삶에 큰 영향을 준다
1.
Incremental Update가 실패하면?
2.
하루에 한번 동작하고 Incremental하게 업데이트하는 파이프라인이라면?
a.
만약 아래와 같이 실행될 경우, 이틀간의 정보가 빠져있게 된다
3.
실패한 부분을 재실행하는 것이 얼마나 중요한가?
⇒ 관리하는 데이터 파이프라인의 수가 늘어나면 이 중의 몇은 항상 실패하게 되며 이를 어떻게 관리하느냐가 데이터 엔지니어의 삶에 큰 영향을 준다
•
이제부터 할 이야기는 Incremental Update시에만 의미가 있습니다.
◦
다시 한번 가능하면 Full Refresh를 사용하는 것이 좋음
▪
문제가 생겨도 다시 실행하면 됨
◦
Incremental Update는 효율성이 더 좋을 수 있지만 운영/유지보수의 난이도가 올라갑니다.
▪
실수등으로 데이터가 빠지는 일이 생길 수 있음
▪
과거 데이터를 다시 다 읽어와야하는 경우 다시 모두 재실행을 해주어야함
Backfill
Backfill의 용이성 여부 → 데이터 엔지니어 삶에 직접적인 영향!
•
Backfill의 정의
실패한 데이터 파이프라인을 재실행 혹은 읽어온 데이터들의 문제로 다시 다 읽어와야하는 경우를 의미
•
Backfill 해결은 Incremental Update에서 복잡해짐
◦
Full Refresh에서는 간단하다. 그냥 다시 실행하면 된다,
•
즉, 실패한 데이터 파이프라인의 재실행이 얼마나 용이한 구조인가?
◦
이게 잘 디자인된 것이 바로 Airflow
보통 Daily DAG를 작성한다고 하면 어떻게 할까?
•
지금 시간을 기준으로 어제 날짜를 계산하고 그 날짜에 해당하는 데이터를 읽어옴
from datetime import datetime, timedelta
# 지금 시간 기준으로 어제 날짜를 계산
y = datetime.now() - timedelta(1)
yesterday = datetime.strftime(y, '%Y-%m-%d')
# yesterday에 해당하는 데이터를 소스에서 읽어옴
# 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"
Python
복사
•
그런데 지난 1년치 데이터를 Backfill 해야한다면?
from datetime import datetime, timedelta
y = datetime.now() - timedelta(1)
yesterday = datetime.strftime(y, '%Y-%m-%d')
yesterday = '2023-01-01'
# yesterday에 해당하는 데이터를 소스에서 읽어옴
# 예를 들어 프로덕션 DB의 특정 테이블에서 읽어온다면
sql = f"SELECT * FROM table WHERE DATE(ts) = '{yesterday}'"
Python
복사
◦
기존 ETL 코드를 조금 수정(반복문을 추가)해서 지난 1년치 데이터에 대해 돌린다
◦
실수하기 쉽고 수정하는데 시간이 걸림
◦
읽어와야 할 데이터를 현재 시간 기준으로 계산하는 순간, 빠지는 데이터가 생겨 나중에 Backfill 작업을 수행하려고 하면 오류가 발생한다.
어떻게 ETL을 구현해놓으면 이런 일이 편해질까?
•
시스템적으로 이걸 쉽게 해주는 방법을 구현한다
◦
Airflow는 Incremental update 혹은 Full Refresh를 구분하지 못한다.
▪
모든 DAG가 incremental 하게 작동된다고 가정함
◦
날짜 별로 backfill 결과를 기록하고 성공 여부 기록: 나중에 결과를 쉽게 확인
◦
이 날짜를 시스템에서 ETL의 인자로 제공
◦
데이터 엔지니어는 읽어와야하는 데이터의 날짜를 계산하지 않고 시스템(Airflow)이 지정해준 날짜를 사용
▪
execution_date
•
Airflow의 접근방식
◦
ETL 별로 실행날짜와 결과를 메타데이터 데이터베이스에 기록
◦
모든 DAG 실행에는 “execution_date”이 지정되어 있음
▪
execution_date으로 채워야하는 날짜와 시간이 넘어옴
◦
이를 바탕으로 데이터를 갱신하도록 코드를 작성해야함
◦
이점: backfill이 쉬워짐
Execution Date?
일반적으로 일자별 배치는 하루가 마감된 새벽에 수행된다.
예를 들어, 2020–04–20 하루 24시간의 데이터를 집계하기 위한 Job은 2020–04–21 새벽에 수행된다.
즉, 2020–04–20 00:00 ~ 2020–04–21 00:00의 데이터를 집계한다는 것의 의미는 ‘2020–04–21’의 데이터가 아닌 ‘2020-04–20’ 의 데이터를 집계하는 것이므로, 날짜 파라미터를 left-bound하여 사용한다고 한다.
Airflow의 Execution Date은 현재 수행하는 일자로부터 left-bound된 값이 제공된다.
Task Instance Detail에서 실제 수행된 시간과 생성된 execution date을 볼 수 있다.
공식 문서 참고
그렇다면, start_date란?
•
이 시간에 처음 실행되는 것이 아니라 start_date + 실행주기가 처음 실행시간이 됨
◦
2022-02-23 01:00:00이 start_date인 DAG가 있다면
▪
Daily job이면 2022-02-24 01:00:00에 처음 실행됨
•
이때 execution_date으로는 2022-02-23 01:00:00이 들어옴
▪
Hourly job이면 2022-02-23 02:00:00에 처음 실행됨
•
이때 execution_date으로는 2022-02-23 01:00:00이 들어옴
•
execution_date을 보고 업데이트할 데이터의 날짜를 정하게 코딩하면
◦
Backfill이 코드 변경없이 가능
◦
단 이는 incremental update를 하는 DAG에서만 의미가 있음
Airflow의 Backfill 방식 설명
Daily Incremental Update를 구현해야 한다면?
•
예를 들어 2020년 11월 7일날부터 매일 매일 하루치 데이터를 읽어온다고 가정해보자
•
이 경우 언제부터 해당 ETL이 동작해야하나?
◦
2020년 11월 8일
◦
그래야 11월 7일의 데이터를 읽어올 수 있기 때문
•
다르게 이야기하면 2020년 11월 8일날 동작하지만 읽어와야 하는 데이터의 날짜는?
◦
2020년 11월 7일 ⇒ 이 것이 start_date
•
Airflow의 start_date은 시작 날짜라기는 보다는 읽어와야하는 데이터의 날짜임
이 때, execution_date이란 시스템 변수로 읽어와야하는 데이터의 날짜를 지정
•
다시 말하지만, Airflow는 full refresh 하는지 incremental update를 하는지 모른다
◦
incremental update를 한다고 가정한다
•
daily DAG는 start_date에 지정된 일자의 다음 날에 처음 실행된다
•
hourly DAG는 start_date에 지정된 시간의 다음 시간에 처음 실행된다
start_date과 execution_date 이해하기
•
2020-08-10 02:00:00로 start date로 설정된 daily job(DAG)이 있다
◦
catch up이 True로 설정되어 있다고 가정 (디폴트가 True)
•
지금 시간이 2020-08-13 20:00:00이고 처음으로 이 job이 활성화되었다
•
질문: 이 경우 이 job은 몇번 실행될까? (execution_date) ⇒ 3번 실행
◦
좌 : start date 및 실행되는 날짜/시간, 우 : execution date
◦
2020-08-10 02:00:00 → 실행 X ⇒ start date 하루 뒤에 실행
◦
2020-08-11 02:00:00 → 실행 ⇒ execution date : 2020-08-10 02:00:00
◦
2020-08-12 02:00:00 → 실행 ⇒ execution date : 2020-08-11 02:00:00
◦
2020-08-13 02:00:00 → 실행 ⇒ execution date : 2020-08-12 02:00:00
•
catch up이 False로 되어 있었다면 실행이 안되고 있을 것
•
Full refresh를 하다가 도저히 안되겠을 때 incremental update로 간다
•
밀린 작업들이기 때문에, 위의 작업들이 큐에 들어가서 위와 같이 execution date를 찍어내면서 실행이 된다
비용이 많이 나오게 될 경우
•
잘못된 만남 ㅜㅜ
◦
최적화되지 않은 Airflow configuration (start_date, catchup, …)
▪
엄청 큰 쿼리
▪
BigQuery/Snowflake
•
Redshift는 문제가 없음. 뭐를 하건 월별 정액 지출.
•
2천불짜리 쿼리가 Airflow에 의해 8번 스케줄된다면?
◦
예를 들어 daily job이 2020년 8월 6일을 start_date로 설정되었고 오늘 날짜가 8월 14일이라고 가정하자. 이 때 이 잡이 Enable되는 순간 (아래 그림 참고)
◦
이 잡은 catchup 파라미터의 값에 따라 8번 자동 실행되게 된다.
⇒ 8번의 비용이 발생하게 됨 ⇒ 만약 1번의 비용이 크다면 어마어마한 손실을 초래
▪
Incremental update가 필요한 경우 ⇒ catch up = True
▪
Full refresh 혹은 과거에 실행된 것이 아무 의미 없는 경우 ⇒ catch up = False
Backfill과 관련된 Airflow 변수들
이름 | 설명 |
start_date | DAG가 처음 실행되는 날짜가 아니라 DAG가 처음 읽어와야하는 데이터의 날짜/시간. 실제 첫 실행날짜는 start_date + DAG의 실행주기 |
execution_date | DAG가 읽어와야하는 데이터의 날짜와 시간 |
catchup | DAG가 처음 활성화된 시점이 start_date보다 미래라면
그 사이에 실행이 안된 것들을 어떻게 할 것인지 결정해주는 파라미터.
True가 디폴트값이고 이 경우 실행 안 된 것들을 모두 따라잡으려고 함.
False가 되면 실행 안된 것들을 무시함 |
end_date | 이 값은 보통 필요하지 않으며 Backfill을 날짜 범위에 대해 하는 경우에만 필요
airflow dags backfill -s …. -e ….
-s : start date
-e : end date |
퀴즈 리뷰
•
Airflow에서 하나의 DAG는 다수의 Task로 구성된다.
◦
Task = Airflow Operator
•
Airflow는 모든 DAG가 Incremental Update를 한다고 가정함
•
start_date: DAG의 처음 실행 시간이 아님
◦
Incremenal update를 구현한다는 관점에서 처음 읽어와야하는 데이터의 날짜가 됨
◦
Daily job이라면 start_date + 하루가 실제 DAG의 처음 실행 시간이 됨
▪
매일 동작하는 DAG의 Start date이 2021-02-05라면 이 DAG의 첫 실행 날짜는?
•
2021-02-06
▪
위 DAG의 경우 이때 execution_date으로 들어오는 날짜는?
•
2021-02-05
◦
Hourly job이라면 start_date + 한시간이 실제 DAG의 처음 실행 시간이 됨
•
execution_date:
◦
Airflow가 제공해주는 시스템 변수로 읽어와야할 데이터의 날짜와 시간이 들어옴
◦
보통 daily job이면 start_date는 첫 execution_date와 같음
•
Schedule interval이 "30 * * * *"으로 설정된 DAG에 대한 올바른 설명은?
◦
매시 30분마다 한번씩 실행된다
•
Schedule interval이 "0 * * * *"으로 설정된 DAG의 start date이 "2021-02-04 00:00:00"으로 잡혀있다면 이 DAG의 첫 번째 실행 날짜와 시간은 언제인가?
◦
1시간에 한번 실행하는 hourly DAG
◦
2021-02-04 01:00:00
•
Airflow의 DAG가 처음 ON이 되었을 때 start_date과 현재 날짜 사이에 실행이 안된 run들이 있을 경우 이를 실행한다. 이는 (??) 파라미터에 의해 결정된다. 이 파라미터를 False로 세팅하면 과거 실행이 안된 run을 무시한다
◦
catchup 파라미터
◦
아래와 같이 ON 이 되어있지 않으면 실행이 되지 않음
▪
실행이 되었을 때 실행이 되지 않은 run들을 catchup 파라미터를 통해 실행할지 말지 결정 가능
•
다음 중 Redshift에서 큰 데이터를 테이블로 복사하는 방식을 제대로 설명한 것은?
◦
복사할 레코드들을 파일로 저장해서 S3로 올린 후에 거기서 Redshift로 벌크 복사한다
Backfill을 커맨드라인에서 실행하는 방법
•
Daily Incremental DAG에서 2018년 7월달 데이터를 다 다시 읽어와야 한다면
◦
Airflow가 추천하는 방식으로 Incremental Update를 구현했다면 Backfill이 쉬워짐
•
하지만 이를 어떻게 실행하나
◦
하루에 31번 실행?
▪
airflow dags test MySQL_to_Redshift_v2 2023-07-01
▪
…
▪
airflow dags test MySQL_to_Redshift_v2 2023-07-31
◦
한번에 여러 날짜를 동시에 실행 가능한가?
▪
구현 방법에 따라 한번에 하나씩 실행하는 것이 안전할 수 있음
▪
이를 제어해주는 DAG 파라미터가 max_active_runs = 1
▪
한번에 한 날짜가 실행되고 동기적으로 실행되게 함
•
커맨드라인
◦
dag_id = backfill을 수행할 dag의 id
airflow dags backfill dag_id -s 2018-07-01 -e 2018-08-01
▪
이 명령어를 수행하려면 아래의 가정이 필요합니다:
•
catchUp이 True로 설정되어 있음
•
execution_date을 사용해서 Incremental update가 구현되어 있음
•
start_date부터 시작하지만 end_date은 포함하지 않음
•
실행순서는 날짜/시간순은 아니고 랜덤. 만일 날짜순으로 하고 싶다면
◦
DAG default_args의 depends_on_past를 True로 설정
default_args = {
'depends_on_past': True,
SQL
복사
How to Make Your DAG Backfill ready
•
먼저 모든 DAG가 backfill을 필요로 하지는 않음
◦
Full Refresh를 한다면 backfill은 의미가 없음
•
여기서 backfill은 일별 혹은 시간별로 업데이트하는 경우를 의미함
◦
마지막 업데이트 시간 기준 backfill을 하는 경우라면 (Data Warehouse 테이블에 기록된 시간
기준) 이런 경우에도 execution_date을 이용한 backfill은 필요하지 않음
•
데이터의 크기가 굉장히 커지면 backfill 기능을 구현해 두는 것이 필수
◦
airflow가 큰 도움이 됨
◦
하지만 데이터 소스의 도움 없이는 불가능
▪
created at 혹은 modified at 과 같은 칼럼이 존재해야 한다
•
어떻게 backfill로 구현할 것인가
◦
제일 중요한 것은 데이터 소스가 backfill 방식을 지원해야함
◦
“execution_date”을 사용해서 업데이트할 데이터 결정
◦
“catchup” 필드를 True로 설정
◦
start_date/end_date을 backfill하려는 날짜로 설정
◦
다음으로 중요한 것은 DAG 구현이 execution_date을 고려해야 하는 것이고 idempotent 해야함