Search

Airflow 운영 상의 어려움 + Executor 개념 + Airflow 아키텍처 리뷰

Airflow 운영 상의 어려움

관리해야하는 DAG의 수가 100개를 넘어간다면?
데이터 품질이나 데이터 리니지 이슈 이외에도 다양한 이슈들이 발생
어떤 이슈들이 있을까?
라이브러리 충돌
Worker의 부족
Worker 서버들의 관리와 활용도 이슈
1.
라이브러리 충돌
라이브러리/모듈의 충돌 이슈가 발생하기 시작함
DAG에 따라 실행에 필요한 라이브러리/모듈이 달라지기 시작
예) Python 버전
이로 인해 DAG 혹은 Task별로 별도의 독립공간을 만들어주는 것이 필요
Docker to the rescue
Dag 혹은 Task 코드를 Docker Image로 만들고 이를 독립된 공간(Docker Container)안에서 실행
2.
Worker의 부족
Scale Up
Scale Out
클라우드 서비스 사용
K8s와 같은 컨테이너 기술 사용
필요한대로 서버 요청
서버가 탄력적으로 돌아가야 하는 모든 환경에서 실행 가능
3.
낮은 Server Utilization 이슈
a.
Airflow 전용 하드웨어를 지정했는데 서버들이 항상 바쁘지 않다면?
b.
서비스별로 전용 서버를 할당하는 것은 여러가지로 이슈를 만들어냄
i.
서비스별로 Capacity 관리를 해야함
각 서버가 필요할 만큼 on demand 형식으로 리소스를 가져다 씀
ii.
각 서비스에 속한 서버들은 보면 utilization이 낮은 이슈 발생
c.
이 역시 K8s와 같은 컨테이너 기술의 도입으로 해결 가능

해결책

1.
태스크나 DAG 코드를 Docker Image로 만들어서 Docker Container 형태로 실행
라이브러리/모듈 충돌을 방지
개발 환경과 프로덕션 환경을 동일하게 유지
2.
Airflow Worker를 K8s에서 필요한 대로 동적으로 할당하여 사용
전용 서버를 Airflow에 할당하지 않고 Container Orchestration 서비스를 통해 할당해서 사용하고 리턴
Airflow에서 이를 해결하는 방법은 3가지
Airflow Operator로 KubernetesPodOperator를 사용
Airflow Operator로 DockerOperator를 사용
Airflow Executor로 아래를 사용
KubernetesExecutor
CeleryKubernetesExecutor
LocalKubernetesExecutor

잠깐! Airflow Executor는 무엇?

Executor는 Task들을 관리하고 실행하는 역할을 수행
즉, Executor는 Scheduler 프로세스안에 존재하는 요소이나, Task가 동작하는 환경과 메커니즘을 Executor 라고 할 수 있음
병렬 혹은 일렬 실행이나 어느 worker에서 실행할지 등등
다양한 수의 Executor 타입이 존재
확장 불가
Sequential Executor
디폴트로 설치되며 Sqlite와 같은 싱글스레드 DB에서만 사용가능
한번에 하나의 Task만 실행이 가능
Local Executor
task들을 Airflow 마스터 노드안에서 실행
한번에 여러 개의 태스크는 실행 가능
하지만 아키텍처 수평 구조 확장 불가 (스케일 아웃 불가)
확장 가능
Celery Executor
다수의 Worker 노드가 있는 경우 사용되며 Celery라는 비동기 큐를 사용해 task들을 worker 노드로 분산하여 실행
수평 구조 확장 가능
Kubernetes Executor
K8s 클러스터를 사용하여 task들을 독립된 환경에서 사용
쿠버네티스의 pod를 이용해 task 실행
각 태스크 실행 시 pod가 배포되어 실행된다
Local Kubernetes ExecutorCelery Kubernetes Executor도 존재

Airflow 아키텍처: Docker와 K8s를 사용하는 방법

Airflow Operator로 KubernetesPodOperator를 사용
이 방식은 특정 태스크를 Docker Image로 만들어 K8s에서 실행
Airflow Operator로 DockerOperator를 사용
이 방식은 특정 태스크를 Docker Image로 만들어 Docker Container 위에서 실행
Airflow Executor로 다음 중의 하나를 사용
KubernetesExecutor
모든 DAG 코드가 Docker Image로 빌드되어 K8s에서 실행됨
CeleryKubernetesExecutor
CeleryExecutor와 KubernetesExecutor를 동시에 사용하는 방법을 제공해주는 하이브리드 Executor
이는 Airflow 로드가 전체적으로 큰데 소수의 task만 Isolation을 필요로 하는 경우
LocalKubernetesExecutor
LocalExecutor와 KubernetesExecutor를 동시에 사용하는 방법을 제공해주는 Executor

Celery Executor

대표적인 Executor, 파이썬의 Celery 라는 비동기 큐 라이브러리를 이용
Celery 처리 방식
아래에서 백엔드는 같은 메세지 브로커 혹은 다른 메세지 브로커가 될 수 있다.
Celery는 Redis / RabbitMQ 등의 큐를 사용
파이썬의 celery 라이브러리를 이용해 작업을 요청하는 파이썬 클라이언트를 통해 작업 요청해놓고, 다른일을 할 수 있게 된다
1.
태스크를 메세지 브로커에게 전달
2.
Celery 워커(Airflow 워커)가 태스크를 실행
3.
백엔드 DB에 태스크 처리 결과 저장
a.
위에서는 백엔드 DB와 메타 DB가 같은 역활을 한다.
4.
스케쥴러가 백엔드 DB로부터 태스크 처리 결과 확인
5.
메타 데이터베이스에 태스크 상태를 업데이트
Celery Executor에서 스케쥴러와 워커는 같은 서버에 있을 필요가 없다
스케쥴러 1개에 워커 서버를 다수 띄워서 작동이 가능하다.
부하에 대해서 수평적으로 대응이 가능함

Celery Executor 구성방법

질문 1) Scheduler나 Worker를 더 띄울 수 있을까? → 파이썬 Celery는 task를 요청하는 client / task를 처리하는 Celery worker가 여러 개인 환경에서도 동작하므로 scheduler, airflow worker를 더 띄울 수 있다
질문 2) Airflow 모듈을 꼭 컨테이너로 구성해야 할까? → No!
프로세스로써 Airflow 모듈을 시작하는 것도 가능합니다. #> airflow celery worker (celery worker 프로세스 기동) #> airflow scheduler (scheduler 프로세스 기동)
질문 3) Scheduler, Worker를 서로 다른 물리적 서버에 올려도 될까? → Yes! (동일한 airflow.cfg(환경변수 설정파일)만 가지고 있다면)

Celery Executor 구성 방법 예시

Celery Executor 적용시 Airflow 구성방법은 자유자재이다. (각 서비스를 프로세스로 구성 or 컨테이너로 구성 or 쿠버네티스 Pod로 구성) Kubernetes Executor만 쿠버네티스에서 운영할 수 있는 것은 아니다
조건 1) 각 서비스는 동일한 airflow.cfg 파일(환경변수 설정파일)을 가지고 있어야 한다. 조건 2) 동일한 DAG 소스를 바라봐야 한다.
→ 조건 2를 위해 NAS(파일 전용 스토리지 디바이스)를 구성
1번째 구성 방법
다수 노드에 걸쳐 구성하거나 프로세스로써 기동도 가능
L4 로드밸러서를 두면 로드 밸런싱이 가능해진다.
L4를 통해 웹서버 라우팅을 분기처리 조정해준다
워커는 노드 개수만큼 띄워도 문제가 없다. 오히려 분산됨
Redis 혹은 메타 데이터 베이스도 2개 이상 띄워도 된다
하지만 이는 서버 이중화 전략에 해당된다 (Active와 Stand by)
2번째 구성 방법
컨테이너로써 기동 선택 가능
컨테이너로써 기동한다면 컨테이너 오케스트레이션 플랫폼인 쿠버네티스 플랫폼 상에 Pod로써 기동할 수 있지 않을까?
위와 같이 쿠버네티스 환경에 Pod 형태로 띄우고 Celery Executor 형태로도 사용한다고 합니다

Cloud SaaS 서비스

3대 Cloud Provider인 AWS, GCP, MS Azure에 Airflow 서비스를 완전 관리형으로 제공하고 있음
Cloud SaaS 서비스로써 Airflow 사용 가능하지만 각 Provider, 버전 별 제약사항이 존재하므로 잘 알아봐야 한다.
가격, 로그, 디버깅 면에서 불편하고 적합하지 않을 수 있음.
GCP 관련 많은 서비스의 operator, hook, sensor 등 제공되고 있어서 GCP와 궁합이 좋다

Kubernetes Executor

Kubernetes를 이용한 Task 실행
쿠버네티스는 머리에 해당하는 컨트롤 Plane, 손에 해당하는 워커노드로 구성
배포할 컨테이너들을 그룹화하여 pod 라는 이름으로 배포함
Airflow의 Scheduler는 쿠버네티스의 API 서버를 이용, Pod 배포 요청
Airflow의 Task는 Pod로써 배포 Task를 마치면 Pod는 제거됨