Search

Apache Airflow 입문 및 구조, 개념

Airflow 설명

에어플로우는 (스케줄러를 넘어) 파이썬에서 데이터 파이프라인을 위한 플랫폼입니다
Top level Apache 프로젝트
Airbnb에서 시작한 아파치 오픈소스 프로젝트
가장 많이 사용되는 데이터 파이프라인 관리/작성 프레임 워크
데이터 파이프라인 프레임워크
Python 3에서 데이터 파이프라인을 프로그래밍 방식으로 설계, 예약 및 모니터링 지원
데이터 파이프라인 스케줄링 지원
정해진 시간에 ETL 실행 혹은 한 ETL의 실행이 끝나면 다음 ETL 실행
웹 UI도 제공합니다
데이터 파이프라인(ETL)을 쉽게 만들 수 있도록 해줌
다양한 데이터 소스와 데이터 웨어하우스를 쉽게 통합해주는 모듈 제공
데이터 파이프라인 관리 관련 다양한 기능을 제공 ⇒ 특히 Backfill
Airflow의 데이터 파이프라인을 "방향 비순환 그래프"(DAG)라고 합니다
하나의 DAG는 하나 이상의 태스크(일명 task, 연산자)으로 구성됩니다.
Airflow는 하나 이상의 서버로 구성된 클러스터입니다
Worker 및 Scheduler로 구성됨
스케줄러는 당신의 Task를 여러 개의 worker들에게 분배할 것입니다
DAG 및 스케줄링 정보는 DB에 저장됩니다(SQLite가 기본적으로 사용됨)
실제 프로덕션 용도로는 MySQL 또는 Postgres가 선호됨
다양한 타사 서비스와의 매우 포괄적인 통합을 이루고 있습니다.
2020년 12월 에어플로 2.0 출시
에어플로 2.0을 사용합니다
Airflow 1.x에서 2.0으로 업그레이드 지침
Airflow 버전 선택 방법: 큰 회사에서 사용하는 버전이 무엇인지 확인. (무조건 최신 사용 X)
참고로 DAG의 tag 붙이는 기준은 아래와 같은 예시를 들 수 있습니다
DAG의 중요도, 소유자, 팀 명칭, DAG의 특징

Airflow 구성

Apache Airflow 컴포넌트

Apache Airflow는 총 5개의 컴포넌트로 구성됩니다.
1.
Web Server
마스터 기능
웹 서버는 python Flask로 구현이 되어 있습니다.
웹 UI는 스케쥴러와 DAG의 실행 상황을 시각화해줍니다.
2.
Scheduler
마스터 기능
주기적으로 DAG들을 파싱해서 Executor를 통해서 워커에게 배정하는 역활을 수행합니다. 이를 통해 Airflow 내 존재하는 DAG 파이프라인들을 인지합니다. 이후, 파싱된 DAG들의 정보를 Meta DB에 기록합니다.
스케쥴러는 데이터 파이프라인을 정해진 시간에 실행해주거나, 하나의 DAG나 task가 끝난 후 다른 작업을 트리거링합니다.
위의 작업들을 전담하는 scheduler라는 모듈이 존재
3.
Worker
실제로 DAG를 실행하는 역활을 수행합니다.
Task에 해당하는 코드를 실행해주는 역활을 하는 모듈
Airflow를 Scaling해준다는 의미는 워커의 수를 늘리는 것을 의미합니다.
4.
Meta Database
Sqlite가 기본으로 설치됨 (싱글쓰레드, 파일기반)
보통 Production 레벨에서는 Sqlite이 아닌 MySQL 혹은 Postgres를 설치해서 사용
스케쥴링, 워커에 대한 정보 및 실행 결과들을 기록해주는 기록용 메타 데이터 베이스
즉, 스케쥴러와 각 DAG들의 실행 결과가 메타 DB에 기록이 됩니다.
5.
Queue
태스크가 여러 개의 워커에 분산 처리가 될려면 ‘큐’ 라는 자료구조가 필요합니다.
하나의 서버로 돌아갈 경우, 워커는 한 대의 서버 용량으로 제약이 걸립니다. 이 경우, 데이터 파이프라인의 수가 늘어나면 더 많은 DAG가 자주 실행되어야 하므로, 서버(노드)를 추가하는 선택을 하게 됩니다. 추가된 서버는 워커 용으로 사용됩니다. 데이터 파이프라인의 특정 태스크가 어느 워커에서 실행될지 미리 결정할 수 없으므로, 보통 큐를 만들어서 큐에 태스크를 넣은 후, 놀고 있는 워커가 있을 경우 큐에 있는 태스크를 읽어서 해당 워커에 할당해 주는 형태로 구성됩니다.
큐는 다수의 서버(멀티 노드) 구성인 경우에만 사용됨
Executor를 어떤 것을 쓰느냐에 따라 큐가 달라집니다. (CeleryExecuter, KubernetesExecutor)
서버가 한 대여도 Executor에 따라서 큐를 사용하기도 한다.

Airflow 스케일링 방법

빅데이터 프로세싱 작업이란 외부에서는 볼 때는 유기적인 하나의 컴퓨터처럼 보이지만, 내부에서는 다수의 서버가 다양한 프로토콜과 복잡한 제어방식을 통해서 같이 협업을 하는 구조 빅데이터 프로세싱에 한계가 올 때마다 스케일 업, 스케일 아웃을 동시 혹은 별도로 선택하게 됩니다.
총 2가지 접근법이 존재합니다.
스케일 업 (더 좋은 사양의 서버 사용)
동일한 노드 개수로 서버 사양을 올리기
하나의 노드가 더 많은 태스크를 수행할 수 있게 하지만, 언젠가는 한계에 도달.
스케일 아웃 (서버 추가)
worker node(서버)의 개수를 증가
마스터 서버 노드 = 스케쥴러와 웹 서버만 소유
나머지 서버들은 워커 노드들에 할당
최대한 지양하는 것이 좋다 (마지막 수단)
보통 서버의 용량이 부족해지기 시작하면 스케일업으로 수행을 하다가, 스케일 아웃을 선택
서버의 사양을 높이는 것이 노드의 개수를 늘리는 것보다 비용이 저렴한 경우가 많다.
직접 스케일 아웃을 하는 것이 아닌 Airflow를 제공해주는 클라우드 서비스를 사용하는 것을 권장 (내가 관리하는 오버 헤드를 클라우드에 넘기는 것)
다수의 노드를 워커로 사용해야한다면 GCP 클라우드 컴포저, AWS MWAA를 사용하는 것이 편의에 좋다
Airflow가 서버 한 대로 구성되었을 때 구조 예시
서버 한 대
worker node 1개
서버 한 대의 경우, 한 대가 가지고 있는 용량, CPU의 제한이 존재하기 때문에 동시에 실행할 수 있는 태스크의 제한이 생기게 됩니다. 또한 데이터 파이프라인의 수가 증가함에 따라 서버 한 개의 처리량에 대한 부족함이 생길 것입니다.
Airflow 구조: 다수 서버
마스터 노드에는 웹서버와 스케쥴러가 존재
다수의 워커 노드들이 존재
워커 노드들과 마스터 노드 내부 Executor은 큐를 통해 통신
스케쥴러가 Executor를 통해서 큐를 거쳐 워커와 통신하는 구조를 가진다.
Executor가 무엇인지에 따라서 큐의 사용 여부가 나뉜다
처리 성능의 Bottleneck은 Worker에 존재하기 때문에, 마스터 노드 외의 서버들을 워커 전용으로 할당
Airflow 구조 다시 보기
여러 종류의 Executor들
Sequential Executor
기본적으로 Sqlite와 연동되는 제약이 많은 Executor
Local Executor
Celery Executor
Kubernetes Executor
CeleryKubernetes Executor

잠깐! Airflow Executor는 무엇?

Executor는 Task들을 관리하고 실행하는 역할을 수행
병렬 혹은 일렬 실행이나 어느 worker에서 실행할지 등등
다양한 수의 Executor 타입이 존재
Sequential Executor: 디폴트로 설치되며 Sqlite와 같은 싱글스레드 DB에서만 사용가능
Local Executor: task들을 Airflow 마스터 노드안에서 실행
Celery Executor: 다수의 Worker 노드가 있는 경우 사용되며 Celery 큐를 사용해 task들을 worker 노드로 분산하여 실행
CeleryExecutorCelery라는 분산 태스크 큐를 사용하여 Airflow의 태스크를 여러 대의 워커 머신에서 병렬로 실행할 수 있게 합니다.
워커를 스케일아웃할 수 있는 방법 중 하나입니다. Celery 백엔드로 메시지 브로커(broker)가 필요하며, 메시지 브로커로는 RabbitMQ나 Redis를 사용할 수 있습니다. 스케줄러는 실행해야 할 태스크를 메시지 브로커에 전달하고, 각 워커 장비의 Celery 워커가 태스크를 실행합니다.
Celery의 Queue에 대한 개념
1.
Queue:
Celery에서, 태스크는 특정 '큐'에 전송됩니다.
큐는 워커가 태스크를 가져와 실행할 때 참조하는 것입니다.
기본적으로 Celery는 'default'라는 이름의 큐를 사용합니다.
다양한 워커 구성을 통해 특정 워커가 특정 큐의 태스크만 소비하도록 설정할 수 있습니다.
2.
Airflow에서의 활용:
Airflow에서는 각 태스크 또는 DAG 단위로 큐를 지정할 수 있습니다.
이를 통해 특정 워커 머신이 특정 태스크나 DAG만 실행하도록 설정할 수 있습니다. 이는 자원 할당, 특정 워커 머신에서만 실행되어야 하는 태스크 등의 요구사항을 처리하는 데 유용합니다.
3.
Queue 설정:
Airflow의 DAG 또는 태스크 정의에서 queue 매개변수를 사용하여 큐를 지정할 수 있습니다.
task = BashOperator( task_id='my_task', bash_command='echo Hello, World!', queue='my_queue', dag=dag )
Python
복사
위 예제에서, my_taskmy_queue라는 큐에 할당됩니다.
특정 워커를 해당 큐에서만 태스크를 실행하도록 설정하려면, 워커 시작 시 해당 큐를 지정해줍니다. 예: celery worker -Q my_queue
이렇게 Celery의 큐 기능을 활용하면, Airflow에서 다양한 워크로드와 자원 요구 사항에 따라 태스크의 실행을 세밀하게 관리할 수 있습니다.
Kubernetes Executor : K8s 클러스터를 사용하여 task들을 독립된 환경에서 사용
Local Kubernetes ExecutorCelery Kubernetes Executor도 존재
Airflow 개발의 장단점
장점
데이터 파이프라인을 세밀하게 제어 가능
다양한 데이터 소스와 데이터 웨어하우스를 지원
백필(Backfill)이 쉬움
단점
배우기가 쉽지 않음 ⇒ 러닝 커브가 존재
상대적으로 개발환경을 구성하기가 쉽지 않음
클러스터 노드(다수의 서버)의 경우, 복잡해서 직접 운영이 쉽지 않음. 이 경우, 클라우드 버전 사용이 선호됨
GCP provides “Cloud Composer”
AWS provides “Managed Workflows for Apache Airflow”

Airflow 코드의 기본 구조

DAG 대표하는 객체를 먼저 만듬
DAG 이름, 실행주기, 실행날짜, 오너 등등
다음으로 DAG를 구성하는 태스크들을 만듬
태스크별로 적합한 오퍼레이터를 선택
태스크 ID를 부여하고 해야할 작업의 세부사항 지정
최종적으로 태스크들간의 실행 순서를 결정

DAG란?

DAG란 무엇인가?
Directed Acyclic Graph의 줄임말
Airflow에서 ETL을 부르는 명칭
하나의 DAG는 다수의 태스크로 구성됨
예를 3개의 태스크로 구성된다면 Extract, Transform, Load로 구성
태스크란?
태스크는 파이썬 스크립트 혹은 코드로 이루어져 있습니다.
Airflow의 오퍼레이터(Operator)로 만들어짐
오퍼레이터는 OOP의 클래스와 유사
Airflow에서 이미 다양한 종류의 오퍼레이터를 제공함
경우에 맞게 사용 오퍼레이터를 결정하거나 필요하다면 직접 개발
e.g., Redshift writing, Postgres query, S3 Read/Write, Hive query, Spark job, shell script
DAG의 구성 예제
1번 예제
3개의 Task로 구성된 DAG.
먼저 t1이 실행되고 t2, t3의 순으로 일렬로 실행 (동기적 처리)
2번 예제
3개의 Task로 구성된 DAG.
먼저 t1이 실행되고 여기서 t2와 t3로 분기
분기된 태스크(t2, t3)는 동시에 병렬로 수행
모든 Task에 필요한 기본 정보
여기에 지정되는 인자들은 모든 태스크들에 공통으로 적용되는 설정이 됨
뒤에서 DAG 객체를 만들 때 지정함
예제
from datetime import datetime, timedelta default_args = { 'owner': 'keeyong', 'start_date': datetime(2020, 8, 7, hour=0, minute=00), 'end_date': datetime(2020, 8, 31, hour=23, minute=00), 'email': ['keeyonghan@hotmail.com'], 'retries': 1, 'retry_delay': timedelta(minutes=3), }
Python
복사
default arg에는 적용될 수 있는 추가적인 인자가 있다
이것은 태스크 수준에 적용됩니다.
on_failure_callback : 실패 콜백을 어디로 할지 ex. 슬랙, 디스코드
on_success_callback : 성공 콜백을 어디로 할지
모든 DAG에 필요한 기본 정보
from airflow import DAG dag = DAG( "dag_v1", # DAG name start_date=datetime(2020,8,7,hour=0,minute=00), schedule="0 * * * *", tags=["example"], catchup=False, # common settings default_args=default_args ) ##
Python
복사
start_date와 end_date는 이 DAG가 시작하고 멈추는 시기를 지정합니다:
일회성 back filling을 수행하는 데 사용할 수 있습니다
catch up의 의미를 이해하는 것이 중요합니다
catch up을 False로 하게 되면 start date부터 지금 날짜까지의 gap 간 실행을 안할 수 있다.
만약 해당 DAG가 Full refresh를 목표로 할 경우, 캐치업을 False로 하길 권장
incremental update를 할 때, 세팅을 고려하는 경우가 존재
schedule_interval은 다음과 같은 cron 표현식 또는 프리셋으로 정의할 수 있습니다
1, 2, 5번째가 많이 쓰인다
“0 * * * *”의 의미는?
분만 보장이 됨 (매시 0분에 스케쥴링) ⇒ 매 시간 스케쥴링
“0 12 * * *”의 의미는?
매일 한 번 (12시 0분) 에 스케쥴링

Operators Creation Example

1번 예제 (Bash Operator)
bash operator : bash command를 bash 쉘에서 실행시켜줌
구성
3개의 태스크로 구성
t1은 현재 시간 출력
t2는 5초간 대기 후 종료
t3는 서버의 /tmp 디렉토리의 내용 출력
t1이 끝나고 t2와 t3를 병렬로 실행
코드
from airflow import DAG from airflow.operators.bash import BashOperator from datetime import datetime, timedelta default_args = { 'owner': 'airflow', 'start_date': datetime(2023, 5, 27, hour=0, minute=00), 'email': ['hajuny129@gmail.com'], 'retries': 1, 'retry_delay': timedelta(minutes=3), } test_dag = DAG( "dag_v1", # DAG name schedule="0 9 * * *", tags=['test'], catchUp=False, default_args=default_args ) t1 = BashOperator( task_id='print_date', bash_command='date', dag=test_dag) t2 = BashOperator( task_id='sleep', bash_command='sleep 5', // 실행 끝나고 5초간 sleep retries=3, dag=test_dag) t3 = BashOperator( task_id='ls', bash_command='ls /tmp', dag=test_dag) t1 >> [ t2, t3 ] ## 동시 병렬 실행
Python
복사
위에 처럼 작성하면 t1이 끝나고 분기가 생긴다
t2.set_upstream(t1) 과 t1 >> t2 는 동일 표현이지만 후자를 자주 쓴다
Trigger by UI
2번 Bash Operator 예제
start = DummyOperator(dag=dag, task_id="start", *args, **kwargs) t1 = BashOperator( task_id='ls1', bash_command='ls /tmp/downloaded', retries=3, dag=dag) t2 = BashOperator( task_id='ls2', bash_command='ls /tmp/downloaded', dag=dag) end = DummyOperator(dag=dag, task_id='end', *args, **kwargs)
SQL
복사
위와 같이 실행 시키기 위해서는 아래의 명령을 수행한다
위와 같이 start와 end를 두는 경우는 사이에 병렬 스케쥴링의 시간을 알아볼 때 사용하는 구조이다.
// 1번 start >> t1 >> end start >> t2 >> end //2번 start >> [t1, t2] >> end
SQL
복사

How to Trigger a DAG - 터미널에서 실행

먼저 Airflow 서버에 로그인하고 다음 명령 실행
airflow dags list
airflow tasks list DAG이름
airflow tasks test DAG이름 Task이름 날짜
test vs. run 비교할 수 있어야 한다
둘은 같은 태스크를 실행하는 명령어 역활을 수행한다
run : 실행한 결과가 메타 DB에 기록됨
test : 실행한 결과가 메타 DB에 기록 X
날짜는 YYYY-MM-DD
start_date보다 과거인 경우는 실행이 되지만 오늘 날짜보다 미래인 경우, 실행 안됨
이게 바로 execution_date의 값이 됨 (execution_date은 나중에 설명)
예시 명령어
>> airflow dags list >> airflow tasks list dag_v1 >> airflow tasks test dag_v1 ls 2020-08-09 >> airflow dags test dag_v1 2019-12-08 >> airflow dags backfill dag_v1 -s 2019-01-01 -e 2019-12-31
Python
복사
docker의 경우, scheduler 컨테이너 안에 들어가서 위 명령어를 수행
docker exec -it 컨테이너이름 /bin/bash
UI에서 살펴볼 부분
Grid
특정 task가 어떻게 실행되었는지 알아보려면 grid에서 task 클릭
다시 재실행시키고 싶을 경우, 아래처럼 clear를 클릭해주면 된다
이후, print_date(선택한 task)부터 이후 태스크들까지 다시 실행되게 된다
grid - 특정 task - Log 를 통해 태스크 실행 로그를 확인 가능하다.

Reference

실리콘에서 날아온 데이터엔지니어링
데이터 엔지니어링 데브코스 1기