Search

Airflow 아키텍처 및 Executor들

Airflow 아키텍처

:Airflow의 전체 아키텍처는 아래와 같으며, 핵심 요소는 다음 세 가지입니다:
웹서버
스케줄러
데이터베이스 (메타데이터 데이터베이스)
웹서버
웹 UI를 통해 파이프라인의 현재 상태를 시각적으로 확인하고, 모니터링하며 실행하거나 중지합니다.
스케줄러
워크플로우(DAG)의 실행 스케줄을 관리하고, 조건이 충족되면 해당 워크플로우를 실행합니다. DAG 파일을 읽고, 비트와 조각을 추출하며, 메타 스토어에 저장합니다.
Executor
태스크의 실행 방식을 정의하며, 스케줄러와 워커 사이에서 중재자 역할을 합니다. 다음과 같은 종류의 Executor가 있습니다:
SequentialExecutor
LocalExecutor
CeleryExecutor
KubernetesExecutor
이는 AIRFLOW__CORE__EXECUTOR로 설정할 수 있습니다. docker-compose.yaml에서 이를 확인할 수 있습니다.
⇒ 현재 docker-compose.yaml에서는 airflow의 executor가 CeleryExecutor로 설정되어 있음을 확인할 수 있습니다.
각 Executor의 구성을 비교해보면 다음과 같습니다.
익스큐터
분산 환경 지원
설치 난이도
적합한 환경
SequentialExecutor
지원하지 않음
매우 쉬움
시연 / 테스트
LocalExecutor
지원하지 않음
쉬움
단일 호스트 환경 추천
CeleryExecutor
지원함
보통
멀티 호스트 환경 고려시
KubernetesExecutor
지원하지 않음
어려움
쿠버네티스 기반 컨테이너 환경 구성 고려시

Executor 종류

SequentialExecutor

: 단일 프로세스에서 태스크를 순차적으로 실행하며, 한 번에 하나의 태스크만 실행 가능
설정이 매우 간단하며 외부 의존성이 필요 없음
클러스터 구성 X 및 스케일 인아웃 불가능
로컬 개발 환경이나 작은 워크플로우에서 테스트 용도로 적합
복잡한 병렬 처리나 대규모 워크로드에서는 부적합

LocalExecutor

: 로컬 머신에서 여러 태스크를 병렬로 실행하며, 멀티 프로세싱을 이용하여 여러 태스크를 동시에 처리 가능
중소 규모의 워크플로우를 처리할 때 사용됨
단일 머신에서 복잡한 워크플로우를 실행할 경우에 적합
외부 의존성 없이 로컬에서 병렬 처리를 가능하게 함

CeleryExecutor

: Celery 분산 작업 큐를 활용하여 여러 서버(워커)에 태스크를 분산하여 실행하며, 대규모의 워크플로우를 여러 머신에서 분산 처리할 수 있다.
Local Executor와 가장 큰 차이점으로 모든 구성 요소가 서로 다른 호스트에서 실행되므로 부하가 낮다.
대규모 워크플로우와 고병렬 워크로드를 처리하는 환경에 적합
RabbitMQ, Redis, AWS SQS 등의 메시지 브로커와 함께 동작함
워커를 수평으로 확장하여 처리 능력을 증가시킬 수 있음

KubernetesExecutor

: 각 태스크를 Kubernetes 파드(Pod)로 실행함으로써, 동적으로 워크로드를 조정하고 리소스를 효율적으로 사용
Kubernetes 클러스터를 사용하는 환경에 적합
동적 리소스 할당 및 관리가 필요한 환경에서 사용된다.
각 태스크가 실행될 때마다 독립적인 파드를 생성하며, 태스크가 완료되면 파드가 종료됨
워크로드에 따라 동적으로 리소스를 확장하거나 축소할 수 있음
모든 Executor 중에서 가장 스케일이 쉽다
cf) Airflow in K8s VS Argo Workflow:
Task 실행
Airflow: 각 Airflow Task를, 즉 Python 기반 Task를 개별 파드에서 실행
Argo: .yaml 파일로 DAG를 정의한 후 (steps 배열로 DAG 순서 정의) 각 Task를 파드 형태로 만들어 실행
데이터 소스
Airflow: 강력한 플러그인을 통한 데이터 소스와의 통합 유리
Argo: 상대적으로 데이터 소스 관련 플러그인은 적으나, k8s 기반 워크플로우 관리 및 연동에 강점
설치
Airflow: 설치 및 설정 매우 복잡, BUT 설정한 후 안정적으로 운영
Argo: 애초에 Kubernetes 기반으로 설계되었음. Kubectl 명령어를 통해 argo namespace를 지정한 후에 클러스터를 생성하면 끝

Metadata Database (Metastore)

: Airflow의 동작과 관련된 중요한 정보와 상태를 저장하는 DB.
DAG 실행의 상태, 태스크의 상태, 스케줄 정보 등 Airflow가 원활하게 동작하기 위한 핵심 메타데이터가 저장된다.
SQLAlchemy를 사용하여 직접 DB에 편리하게 작성할 수 있음.
SQLite의 경우 SequentialExecutor 환경에서만 사용 가능하며, 별도의 환경 구성 없이 airflow db init을 실행하면 SQLite 데이터 베이스가 생성됨
(여기선 왜 postrgresql이 생성되는 거지…?)
⇒ AIRFLOW__CORE__SQL__ALCHEMY__CONN을 PostgreSQL URI로 지정했기 때문!
만일 수많은 예제 DAG를 없애버리고 싶으면 다음과 같이 바꾼다.
그러면 짜라란~

Scheduler

: 워크플로우(DAG)의 실행 스케줄을 관리하고, 조건이 충족되면 해당 워크플로우를 실행
DAG 파일을 구문 분석하고 추출된 정보를 DB에 저장
실행 준비가 된 태스크를 지정하고 대기 상태로 전환
태스크 가져오기 및 실행
위의 Scheduler Job에서 스케줄러가 실행된다.
DAG의 오퍼레이터는 밑의 LocalTaskJob에서 실행

Executor 설정

SequentialExecutor

: 단일 프로세스에서 실행하며, 한 번에 하나의 태스크만 실행이 가능하다.
airflow db init을 실행하면 SQLite 데이터베이스가 초기화 됨
airflow schedulerairflow webserver을 시작하면 Airflow가 동작

LocalExecutor

: 위의 Sequential Executor은 Task의 병렬 실행이 불가능하다는 치명적인 단점이 있어, 상용 환경에는 어려움이 존재한다. LocalExecutor은 이와 달리 Subprocess들을 여러개 두어 병렬 실행을 가능하게 한다.
def __init__(self, parallelism: int = PARALLELISM): super().__init__(parallelism=parallelism) ## BaseExecutor 상속 self.manager: Optional[SyncManager] = None self.result_queue: Optional['Queue[TaskInstanceStateType]'] = None self.workers: List[QueuedLocalWorker] = [] self.workers_used: int = 0 self.workers_active: int = 0 self.impl: Optional[ Union['LocalExecutor.UnlimitedParallelism', 'LocalExecutor.LimitedParallelism'] ] = None
Python
복사
BaseExecutor을 상속받고 있으므로, super().__init__을 통해 부모 객체를 상속받고 있다.
그럼 병렬 처리를 구현하는 곳은 어디일까?
self.impl: Optional[ Union['LocalExecutor.UnlimitedParallelism', 'LocalExecutor.LimitedParallelism'] ] = None
Python
복사
⇒ 처음 LocalExecutor 객체가 생성될 때, 병렬처리를 구현하는 구현체는 위 코드에 적혀있다. 즉, UnlimitedParallelism, 또는 LimitedParallelism 둘 중에 하나가 선택되어 구현된다.
def start(self) -> None: """Start the executor.""" old_proctitle = getproctitle() setproctitle("airflow executor -- LocalExecutor") self.manager = Manager() setproctitle(old_proctitle) self.result_queue = self.manager.Queue() self.workers = [] self.workers_used = 0 self.workers_active = 0 self.impl = ( LocalExecutor.UnlimitedParallelism(self) if self.parallelism == 0 # 분기점 else LocalExecutor.LimitedParallelism(self) ) self.impl.start()
Python
복사
⇒ start() 메소드에서 위의 self.impl 속성이 결정된다. 이 self.parallelism 값은 airflow.cfg에서 설정할 수 있다.

UnlimitedParallelism

class UnlimitedParallelism: """ Implement LocalExecutor with unlimited parallelism, starting one process per command executed. :param executor: the executor instance to implement. """ def __init__(self, executor: LocalExecutor): self.executor: LocalExecutor = executor def start(self) -> None: """Start the executor.""" self.executor.workers_used = 0 self.executor.workers_active = 0
Python
복사
: 프로세스가 들어오는 대로 명령이 실행될 때마다 하나씩 프로세스를 실행히킨다.
def execute_async( self, key: TaskInstanceKey, command: CommandType, queue: Optional[str] = None, executor_config: Optional[Any] = None, ) -> None: """ Executes task asynchronously. :param key: the key to identify the task instance :param command: the command to execute :param queue: Name of the queue :param executor_config: configuration for the executor """ if not self.executor.result_queue: raise AirflowException(NOT_STARTED_MESSAGE) local_worker = LocalWorker(self.executor.result_queue, key=key, command=command) self.executor.workers_used += 1 self.executor.workers_active += 1 local_worker.start()
Python
복사
: Task를 실행시키는 subprocess가 실행되며, 병렬 처리가 개시되는 실질적인 코드다.
result_queue는 LocalExecutor가 생성될 때 자동으로 만들어지는 것이므로, 없을 시 Exception을 발생시킨다.
LocalWorker(...)생성자를 통해 queue, task, 명령어를 전달하고 LocalWorker 객체를 생성한다. (파라미터로 queue, task, command를 입력받으므로, 해당 테스크를 실행하는 subprocess가 만들어진 셈이다.)
local_worker.start()을 통해 Local Worker 프로세스를 실행한다.
def sync(self) -> None: """Sync will get called periodically by the heartbeat method.""" if not self.executor.result_queue: raise AirflowException("Executor should be started first") while not self.executor.result_queue.empty(): results = self.executor.result_queue.get() self.executor.change_state(*results) self.executor.workers_active -= 1
Python
복사
: heartbeat를 통해 주기적으로 호출되며, result_queue가 비어질 때 까지 self.executor.change_state(*results)를 통해 Task의 상태를 변경한다.
(BaseExecutor의 change_state())
def change_state(self, key: TaskInstanceKey, state: TaskInstanceState, info=None) -> None: """ Change state of the task. :param info: Executor information for the task instance :param key: Unique key for the task instance :param state: State to set for the task. """ self.log.debug("Changing state: %s", key) try: self.running.remove(key) except KeyError: self.log.debug("Could not find key: %s", key) self.event_buffer[key] = state, info
Python
복사

LocalWorkerBase

class LocalWorkerBase(Process, LoggingMixin): """ LocalWorkerBase implementation to run airflow commands. Executes the given command and puts the result into a result queue when done, terminating execution. :param result_queue: the queue to store result state """ def __init__(self, result_queue: 'Queue[TaskInstanceStateType]'): super().__init__(target=self.do_work) # 프로세스 실행 self.daemon: bool = True self.result_queue: 'Queue[TaskInstanceStateType]' = result_queue def execute_work(self, key: TaskInstanceKey, command: CommandType) -> None: """ Executes command received and stores result state in queue. :param key: the key to identify the task instance :param command: the command to execute """ if key is None: return self.log.info("%s running %s", self.__class__.__name__, command) if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER: state = self._execute_work_in_subprocess(command) else: state = self._execute_work_in_fork(command) self.result_queue.put((key, state)) def _execute_work_in_subprocess(self, command: CommandType) -> str: try: subprocess.check_call(command, close_fds=True) return State.SUCCESS except subprocess.CalledProcessError as e: self.log.error("Failed to execute task %s.", str(e)) return State.FAILED ... @abstractmethod def do_work(self): """Called in the subprocess and should then execute tasks""" raise NotImplementedError()
Python
복사
LocalWorker의 부모 클래스 LocalWorkerBase의 코드 중, 눈여겨볼만한 부분이다.
생성자에서 target을 do_work()로 지정하고 있으며, 프로세스 실행과 연관이 있다.
EXECUTE_TASKS_NEW_PYTHON_INTERPRETER이 값에 따라 자식 프로세스를 만드는 모듈이 subprocess / os로 갈린다.
os로 자식 프로세스로 만드는 이유는 공식 문서에 다음과 같이 나와있다.
Spawning a whole new python process and then re-loading all of Airflow is expensive. All though this time fades to insignificance for long running tasks, this delay gives a "bad" experience for new users when they are just trying out Airflow for the first time. For the LocalExecutor this cuts the "queued time" down from 1.5s to 0.1s on average.
Markdown
복사