Airflow 설명
•
에어플로우는 (스케줄러를 넘어) 파이썬에서 데이터 파이프라인을 위한 플랫폼입니다
◦
Top level Apache 프로젝트
◦
Airbnb에서 시작한 아파치 오픈소스 프로젝트
◦
가장 많이 사용되는 데이터 파이프라인 관리/작성 프레임 워크
•
데이터 파이프라인 프레임워크
◦
Python 3에서 데이터 파이프라인을 프로그래밍 방식으로 설계, 예약 및 모니터링 지원
•
데이터 파이프라인 스케줄링 지원
◦
정해진 시간에 ETL 실행 혹은 한 ETL의 실행이 끝나면 다음 ETL 실행
◦
웹 UI도 제공합니다
•
데이터 파이프라인(ETL)을 쉽게 만들 수 있도록 해줌
◦
다양한 데이터 소스와 데이터 웨어하우스를 쉽게 통합해주는 모듈 제공
◦
데이터 파이프라인 관리 관련 다양한 기능을 제공 ⇒ 특히 Backfill
•
Airflow의 데이터 파이프라인을 "방향 비순환 그래프"(DAG)라고 합니다
◦
하나의 DAG는 하나 이상의 태스크(일명 task, 연산자)으로 구성됩니다.
•
Airflow는 하나 이상의 서버로 구성된 클러스터입니다
◦
Worker 및 Scheduler로 구성됨
◦
스케줄러는 당신의 Task를 여러 개의 worker들에게 분배할 것입니다
◦
DAG 및 스케줄링 정보는 DB에 저장됩니다(SQLite가 기본적으로 사용됨)
▪
실제 프로덕션 용도로는 MySQL 또는 Postgres가 선호됨
•
다양한 타사 서비스와의 매우 포괄적인 통합을 이루고 있습니다.
•
2020년 12월 에어플로 2.0 출시
◦
에어플로 2.0을 사용합니다
◦
Airflow 1.x에서 2.0으로 업그레이드 지침
•
Airflow 버전 선택 방법: 큰 회사에서 사용하는 버전이 무엇인지 확인. (무조건 최신 사용 X)
•
참고로 DAG의 tag 붙이는 기준은 아래와 같은 예시를 들 수 있습니다
◦
DAG의 중요도, 소유자, 팀 명칭, DAG의 특징
Airflow 구성
Apache Airflow 컴포넌트
Apache Airflow는 총 5개의 컴포넌트로 구성됩니다.
1.
Web Server
•
마스터 기능
•
웹 서버는 python Flask로 구현이 되어 있습니다.
•
웹 UI는 스케쥴러와 DAG의 실행 상황을 시각화해줍니다.
2.
Scheduler
•
마스터 기능
•
주기적으로 DAG들을 파싱해서 Executor를 통해서 워커에게 배정하는 역활을 수행합니다. 이를 통해 Airflow 내 존재하는 DAG 파이프라인들을 인지합니다. 이후, 파싱된 DAG들의 정보를 Meta DB에 기록합니다.
•
스케쥴러는 데이터 파이프라인을 정해진 시간에 실행해주거나, 하나의 DAG나 task가 끝난 후 다른 작업을 트리거링합니다.
•
위의 작업들을 전담하는 scheduler라는 모듈이 존재
3.
Worker
•
실제로 DAG를 실행하는 역활을 수행합니다.
•
Task에 해당하는 코드를 실행해주는 역활을 하는 모듈
•
Airflow를 Scaling해준다는 의미는 워커의 수를 늘리는 것을 의미합니다.
4.
Meta Database
•
Sqlite가 기본으로 설치됨 (싱글쓰레드, 파일기반)
•
보통 Production 레벨에서는 Sqlite이 아닌 MySQL 혹은 Postgres를 설치해서 사용
•
스케쥴링, 워커에 대한 정보 및 실행 결과들을 기록해주는 기록용 메타 데이터 베이스
•
즉, 스케쥴러와 각 DAG들의 실행 결과가 메타 DB에 기록이 됩니다.
5.
Queue
•
태스크가 여러 개의 워커에 분산 처리가 될려면 ‘큐’ 라는 자료구조가 필요합니다.
•
하나의 서버로 돌아갈 경우, 워커는 한 대의 서버 용량으로 제약이 걸립니다. 이 경우, 데이터 파이프라인의 수가 늘어나면 더 많은 DAG가 자주 실행되어야 하므로, 서버(노드)를 추가하는 선택을 하게 됩니다. 추가된 서버는 워커 용으로 사용됩니다. 데이터 파이프라인의 특정 태스크가 어느 워커에서 실행될지 미리 결정할 수 없으므로, 보통 큐를 만들어서 큐에 태스크를 넣은 후, 놀고 있는 워커가 있을 경우 큐에 있는 태스크를 읽어서 해당 워커에 할당해 주는 형태로 구성됩니다.
•
큐는 다수의 서버(멀티 노드) 구성인 경우에만 사용됨
•
Executor를 어떤 것을 쓰느냐에 따라 큐가 달라집니다. (CeleryExecuter, KubernetesExecutor)
•
서버가 한 대여도 Executor에 따라서 큐를 사용하기도 한다.
Airflow 스케일링 방법
빅데이터 프로세싱 작업이란
외부에서는 볼 때는 유기적인 하나의 컴퓨터처럼 보이지만, 내부에서는 다수의 서버가 다양한 프로토콜과 복잡한 제어방식을 통해서 같이 협업을 하는 구조
빅데이터 프로세싱에 한계가 올 때마다 스케일 업, 스케일 아웃을 동시 혹은 별도로 선택하게 됩니다.
•
총 2가지 접근법이 존재합니다.
•
스케일 업 (더 좋은 사양의 서버 사용)
◦
동일한 노드 개수로 서버 사양을 올리기
◦
하나의 노드가 더 많은 태스크를 수행할 수 있게 하지만, 언젠가는 한계에 도달.
•
스케일 아웃 (서버 추가)
◦
worker node(서버)의 개수를 증가
◦
마스터 서버 노드 = 스케쥴러와 웹 서버만 소유
◦
나머지 서버들은 워커 노드들에 할당
◦
최대한 지양하는 것이 좋다 (마지막 수단)
▪
보통 서버의 용량이 부족해지기 시작하면 스케일업으로 수행을 하다가, 스케일 아웃을 선택
▪
서버의 사양을 높이는 것이 노드의 개수를 늘리는 것보다 비용이 저렴한 경우가 많다.
▪
직접 스케일 아웃을 하는 것이 아닌 Airflow를 제공해주는 클라우드 서비스를 사용하는 것을 권장 (내가 관리하는 오버 헤드를 클라우드에 넘기는 것)
•
다수의 노드를 워커로 사용해야한다면 GCP 클라우드 컴포저, AWS MWAA를 사용하는 것이 편의에 좋다
•
Airflow가 서버 한 대로 구성되었을 때 구조 예시
◦
서버 한 대
◦
worker node 1개
◦
서버 한 대의 경우, 한 대가 가지고 있는 용량, CPU의 제한이 존재하기 때문에 동시에 실행할 수 있는 태스크의 제한이 생기게 됩니다. 또한 데이터 파이프라인의 수가 증가함에 따라 서버 한 개의 처리량에 대한 부족함이 생길 것입니다.
•
Airflow 구조: 다수 서버
◦
마스터 노드에는 웹서버와 스케쥴러가 존재
◦
다수의 워커 노드들이 존재
◦
워커 노드들과 마스터 노드 내부 Executor은 큐를 통해 통신
◦
스케쥴러가 Executor를 통해서 큐를 거쳐 워커와 통신하는 구조를 가진다.
▪
Executor가 무엇인지에 따라서 큐의 사용 여부가 나뉜다
◦
처리 성능의 Bottleneck은 Worker에 존재하기 때문에, 마스터 노드 외의 서버들을 워커 전용으로 할당
잠깐! Airflow Executor는 무엇?
•
Executor는 Task들을 관리하고 실행하는 역할을 수행
◦
병렬 혹은 일렬 실행이나 어느 worker에서 실행할지 등등
•
다양한 수의 Executor 타입이 존재
◦
Sequential Executor: 디폴트로 설치되며 Sqlite와 같은 싱글스레드 DB에서만 사용가능
◦
Local Executor: task들을 Airflow 마스터 노드안에서 실행
◦
Celery Executor: 다수의 Worker 노드가 있는 경우 사용되며 Celery 큐를 사용해 task들을 worker 노드로 분산하여 실행
▪
워커를 스케일아웃할 수 있는 방법 중 하나입니다. Celery 백엔드로 메시지 브로커(broker)가 필요하며, 메시지 브로커로는 RabbitMQ나 Redis를 사용할 수 있습니다. 스케줄러는 실행해야 할 태스크를 메시지 브로커에 전달하고, 각 워커 장비의 Celery 워커가 태스크를 실행합니다.
Celery의 Queue에 대한 개념
1.
Queue:
•
Celery에서, 태스크는 특정 '큐'에 전송됩니다.
•
큐는 워커가 태스크를 가져와 실행할 때 참조하는 것입니다.
•
기본적으로 Celery는 'default'라는 이름의 큐를 사용합니다.
•
다양한 워커 구성을 통해 특정 워커가 특정 큐의 태스크만 소비하도록 설정할 수 있습니다.
2.
Airflow에서의 활용:
•
Airflow에서는 각 태스크 또는 DAG 단위로 큐를 지정할 수 있습니다.
•
이를 통해 특정 워커 머신이 특정 태스크나 DAG만 실행하도록 설정할 수 있습니다. 이는 자원 할당, 특정 워커 머신에서만 실행되어야 하는 태스크 등의 요구사항을 처리하는 데 유용합니다.
3.
Queue 설정:
•
Airflow의 DAG 또는 태스크 정의에서 queue 매개변수를 사용하여 큐를 지정할 수 있습니다.
task = BashOperator(
task_id='my_task',
bash_command='echo Hello, World!',
queue='my_queue',
dag=dag
)
Python
복사
•
위 예제에서, my_task는 my_queue라는 큐에 할당됩니다.
•
특정 워커를 해당 큐에서만 태스크를 실행하도록 설정하려면, 워커 시작 시 해당 큐를 지정해줍니다. 예: celery worker -Q my_queue
이렇게 Celery의 큐 기능을 활용하면, Airflow에서 다양한 워크로드와 자원 요구 사항에 따라 태스크의 실행을 세밀하게 관리할 수 있습니다.
◦
Kubernetes Executor : K8s 클러스터를 사용하여 task들을 독립된 환경에서 사용
◦
Local Kubernetes Executor와 Celery Kubernetes Executor도 존재
•
Airflow 개발의 장단점
◦
장점
▪
데이터 파이프라인을 세밀하게 제어 가능
▪
다양한 데이터 소스와 데이터 웨어하우스를 지원
▪
백필(Backfill)이 쉬움
◦
단점
▪
배우기가 쉽지 않음 ⇒ 러닝 커브가 존재
▪
상대적으로 개발환경을 구성하기가 쉽지 않음
▪
클러스터 노드(다수의 서버)의 경우, 복잡해서 직접 운영이 쉽지 않음.
이 경우, 클라우드 버전 사용이 선호됨
•
GCP provides “Cloud Composer”
•
AWS provides “Managed Workflows for Apache Airflow”
Airflow 코드의 기본 구조
•
DAG 대표하는 객체를 먼저 만듬
◦
DAG 이름, 실행주기, 실행날짜, 오너 등등
•
다음으로 DAG를 구성하는 태스크들을 만듬
◦
태스크별로 적합한 오퍼레이터를 선택
◦
태스크 ID를 부여하고 해야할 작업의 세부사항 지정
•
최종적으로 태스크들간의 실행 순서를 결정
DAG란?
DAG란 무엇인가?
•
Directed Acyclic Graph의 줄임말
•
Airflow에서 ETL을 부르는 명칭
•
하나의 DAG는 다수의 태스크로 구성됨
◦
예를 3개의 태스크로 구성된다면 Extract, Transform, Load로 구성
•
태스크란?
◦
태스크는 파이썬 스크립트 혹은 코드로 이루어져 있습니다.
◦
Airflow의 오퍼레이터(Operator)로 만들어짐
▪
오퍼레이터는 OOP의 클래스와 유사
◦
Airflow에서 이미 다양한 종류의 오퍼레이터를 제공함
◦
경우에 맞게 사용 오퍼레이터를 결정하거나 필요하다면 직접 개발
▪
e.g., Redshift writing, Postgres query, S3 Read/Write, Hive query, Spark job, shell script
DAG의 구성 예제
•
1번 예제
◦
3개의 Task로 구성된 DAG.
◦
먼저 t1이 실행되고 t2, t3의 순으로 일렬로 실행 (동기적 처리)
•
2번 예제
◦
3개의 Task로 구성된 DAG.
◦
먼저 t1이 실행되고 여기서 t2와 t3로 분기
◦
분기된 태스크(t2, t3)는 동시에 병렬로 수행
모든 Task에 필요한 기본 정보
•
여기에 지정되는 인자들은 모든 태스크들에 공통으로 적용되는 설정이 됨
•
뒤에서 DAG 객체를 만들 때 지정함
•
예제
from datetime import datetime, timedelta
default_args = {
'owner': 'keeyong',
'start_date': datetime(2020, 8, 7, hour=0, minute=00),
'end_date': datetime(2020, 8, 31, hour=23, minute=00),
'email': ['keeyonghan@hotmail.com'],
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
Python
복사
•
default arg에는 적용될 수 있는 추가적인 인자가 있다
◦
이것은 태스크 수준에 적용됩니다.
▪
on_failure_callback : 실패 콜백을 어디로 할지 ex. 슬랙, 디스코드
▪
on_success_callback : 성공 콜백을 어디로 할지
모든 DAG에 필요한 기본 정보
from airflow import DAG
dag = DAG(
"dag_v1", # DAG name
start_date=datetime(2020,8,7,hour=0,minute=00),
schedule="0 * * * *",
tags=["example"],
catchup=False,
# common settings
default_args=default_args
)
##
Python
복사
•
start_date와 end_date는 이 DAG가 시작하고 멈추는 시기를 지정합니다:
◦
일회성 back filling을 수행하는 데 사용할 수 있습니다
◦
catch up의 의미를 이해하는 것이 중요합니다
▪
catch up을 False로 하게 되면 start date부터 지금 날짜까지의 gap 간 실행을 안할 수 있다.
▪
만약 해당 DAG가 Full refresh를 목표로 할 경우, 캐치업을 False로 하길 권장
▪
incremental update를 할 때, 세팅을 고려하는 경우가 존재
•
schedule_interval은 다음과 같은 cron 표현식 또는 프리셋으로 정의할 수 있습니다
◦
1, 2, 5번째가 많이 쓰인다
◦
“0 * * * *”의 의미는?
▪
분만 보장이 됨 (매시 0분에 스케쥴링) ⇒ 매 시간 스케쥴링
◦
“0 12 * * *”의 의미는?
▪
매일 한 번 (12시 0분) 에 스케쥴링
Operators Creation Example
•
1번 예제 (Bash Operator)
◦
bash operator : bash command를 bash 쉘에서 실행시켜줌
◦
구성
▪
3개의 태스크로 구성
•
t1은 현재 시간 출력
•
t2는 5초간 대기 후 종료
•
t3는 서버의 /tmp 디렉토리의 내용 출력
•
t1이 끝나고 t2와 t3를 병렬로 실행
▪
코드
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 5, 27, hour=0, minute=00),
'email': ['hajuny129@gmail.com'],
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
test_dag = DAG(
"dag_v1", # DAG name
schedule="0 9 * * *",
tags=['test'],
catchUp=False,
default_args=default_args
)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=test_dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5', // 실행 끝나고 5초간 sleep
retries=3,
dag=test_dag)
t3 = BashOperator(
task_id='ls',
bash_command='ls /tmp',
dag=test_dag)
t1 >> [ t2, t3 ] ## 동시 병렬 실행
Python
복사
•
위에 처럼 작성하면 t1이 끝나고 분기가 생긴다
•
t2.set_upstream(t1) 과 t1 >> t2 는 동일 표현이지만 후자를 자주 쓴다
▪
Trigger by UI
•
2번 Bash Operator 예제
start = DummyOperator(dag=dag, task_id="start", *args, **kwargs)
t1 = BashOperator(
task_id='ls1',
bash_command='ls /tmp/downloaded',
retries=3,
dag=dag)
t2 = BashOperator(
task_id='ls2',
bash_command='ls /tmp/downloaded',
dag=dag)
end = DummyOperator(dag=dag, task_id='end', *args, **kwargs)
SQL
복사
위와 같이 실행 시키기 위해서는 아래의 명령을 수행한다
위와 같이 start와 end를 두는 경우는 사이에 병렬 스케쥴링의 시간을 알아볼 때 사용하는 구조이다.
// 1번
start >> t1 >> end
start >> t2 >> end
//2번
start >> [t1, t2] >> end
SQL
복사
How to Trigger a DAG - 터미널에서 실행
•
먼저 Airflow 서버에 로그인하고 다음 명령 실행
•
airflow dags list
•
airflow tasks list DAG이름
•
airflow tasks test DAG이름 Task이름 날짜
◦
test vs. run 비교할 수 있어야 한다
▪
둘은 같은 태스크를 실행하는 명령어 역활을 수행한다
▪
run : 실행한 결과가 메타 DB에 기록됨
▪
test : 실행한 결과가 메타 DB에 기록 X
◦
날짜는 YYYY-MM-DD
▪
start_date보다 과거인 경우는 실행이 되지만 오늘 날짜보다 미래인 경우, 실행 안됨
▪
이게 바로 execution_date의 값이 됨 (execution_date은 나중에 설명)
•
예시 명령어
>> airflow dags list
>> airflow tasks list dag_v1
>> airflow tasks test dag_v1 ls 2020-08-09
>> airflow dags test dag_v1 2019-12-08
>> airflow dags backfill dag_v1 -s 2019-01-01 -e 2019-12-31
Python
복사
◦
docker의 경우, scheduler 컨테이너 안에 들어가서 위 명령어를 수행
▪
docker exec -it 컨테이너이름 /bin/bash
•
UI에서 살펴볼 부분
◦
Grid
▪
특정 task가 어떻게 실행되었는지 알아보려면 grid에서 task 클릭
▪
다시 재실행시키고 싶을 경우, 아래처럼 clear를 클릭해주면 된다
▪
이후, print_date(선택한 task)부터 이후 태스크들까지 다시 실행되게 된다
▪
grid - 특정 task - Log 를 통해 태스크 실행 로그를 확인 가능하다.
Reference
•
실리콘에서 날아온 데이터엔지니어링
•
데이터 엔지니어링 데브코스 1기