Search

Airflow API 톺아보기

개요

Airflow가 제공해주는 API에 대해서 알아보고 이를 이용해 모니터링 방법에 대해 알아보자
Airflow의 건강 여부 체크 (health check)을 어떻게 할지 학습
Airflow API로 외부에서 Airflow를 조작해보는 방법에 대해 학습

Airflow API 활성화

airflow.cfg의 api 섹션에서 auth_backend의 값을 변경
[api] auth_backend = airflow.api.auth.backend.basic_auth
Python
복사
docker-compose.yaml에는 이미 설정이 되어 있음 (environments)
x-airflow-common 부분의 env 를 수정해주면 된다
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
AIRFLOW__API__AUTH_BACKENDS와 같이 Airflow 뒤에 __ 가 붙으면, airflow.cfg를 오버라이딩하는 것
AIRFLOW__SECTION__KEY
Python
복사
section - api, key - auth_backends airflow.cfg에 써져 있는 설정들을 위의 변수를 통해 오버라이딩이 가능해진다.
아래 명령으로 확인해보기 $ docker exec -it learn-airflow-airflow-scheduler-1 airflow config get-value api auth_backend airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session
docker exec -it airflow-setup-airflow-scheduler-1 airflow config get-value api auth_backend >>airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session
Python
복사
가능하면, Airflow의 Web UI 자체는 VPN(Virtual Private Network) 뒤에 놓아 접근 권한에 허들을 놓는 편이 좋다
Airflow Web UI에서 새로운 사용자 추가 (API 사용자)
Admin 권한이 있는 유저일 경우 아래의 Security가 보일 것
Security ⇒ List Users ⇒ +
이후 화면에서 새 사용자 정보 추가 (monitor:MonitorUser1)

Health API 호출 (모니터링)

/health API 호출
curl -X GET --user "monitor:MonitorUser1" http://localhost:8080/health
Python
복사
webserver 자체의 작동을 확인
응답 코드 4xx, 5xx ⇒ 웹서버 자체의 에러
정상 경우 응답(2xx):
{ "metadatabase": { "status": "healthy" }, "scheduler": { "status": "healthy", "latest_scheduler_heartbeat": "2022-03-12T06:02:38.067178+00:00" } }
Python
복사
메타 DB와 스케쥴러의 status를 알려준다.
timezone은 UTC 기준

API 사용 예시

API 레퍼런스 살펴보기

Get Current Configuration
지금 설정되어 있는 config를 응답해준다.
http://localhost:8080/api/v1/config
json 형태로 응답을 해준다

특정 DAG를 API로 Trigger하기

curl -X POST --user "airflow:airflow" -H 'Content-Type: application/json' -d '{"execution_date":"2023-05-24T00:00:00Z"}' "http://localhost:8080/api/v1/dags/HelloWorld/dagRuns" curl -X 'POST' \ 'http://{ip}:8080/api/v1/dags/{dag_id}/dagRuns' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ --user "id:$AIRFLOW_PASSWORD" \ -d '{ "logical_date": "2023-11-16T09:52:55.522Z" }'
Python
복사
이 명령을 실행하면 해당 API 엔드포인트로 POST 요청이 전송되고, 지정된 DAG의 실행이 시작됩니다. 실행 날짜에 대한 세부 정보는 JSON 본문의 "execution_date" 필드에 의해 제공됩니다.
1.
curl -X POST: CURL을 사용하여 POST 요청을 보냄을 나타냅니다.
2.
-user "airflow:airflow": Basic 인증을 사용하여 사용자 이름과 비밀번호를 지정합니다. 여기서는 사용자 이름과 비밀번호를 "airflow"로 설정합니다.
3.
H 'Content-Type: application/json': 요청 헤더에 포함될 내용으로, 이 경우 JSON 형식의 데이터를 보낸다는 것을 나타냅니다.
4.
d '{"execution_date":"2023-05-24T00:00:00Z"}': 요청 본문에 포함될 데이터입니다. JSON 형식으로 작성되며, "execution_date" 필드에는 실행할 DAG의 실행 날짜 및 시간을 지정합니다. 여기서는 2023년 5월 24일 00:00:00(UTC)를 실행 날짜로 설정합니다.
5.
"http://localhost:8080/api/v1/dags/HelloWorld/dagRuns": 요청을 보낼 Airflow REST API의 엔드포인트입니다. 여기서는 "HelloWorld" DAG의 실행을 시작하기 위해 "/api/v1/dags/HelloWorld/dagRuns" 엔드포인트를 사용합니다. 로컬 호스트의 8080 포트에서 실행 중인 Airflow 웹 서버에 요청을 보냅니다.
{ "detail": "DAGRun with DAG ID: 'HelloWorld' and DAGRun logical date: '2023-05-24 00:00:00+00:00' already exists", "status": 409, "title": "Conflict", "type": "https://airflow.apache.org/docs/apache-airflow/2.5.1/stable-rest-api-ref.html#section/Errors/AlreadyExists" }
Python
복사
실습 후 응답값 확인
curl -X POST --user "airflow:airflow" -H 'Content-Type: application/json' -d '{"execution_date":"2023-05-24T00:00:00Z"}' "http://localhost:8080/api/v1/dags/HelloWorld/dagRuns"
Python
복사
{ "conf": {}, "dag_id": "HelloWorld", "dag_run_id": "manual__2023-05-24T00:00:00+00:00", "data_interval_end": "2023-05-23T02:00:00+00:00", "data_interval_start": "2023-05-22T02:00:00+00:00", "end_date": null, "execution_date": "2023-05-24T00:00:00+00:00", "external_trigger": true, "last_scheduling_decision": null, "logical_date": "2023-05-24T00:00:00+00:00", "note": null, "run_type": "manual", "start_date": null, "state": "queued" }
Python
복사
hello world dag가 큐에 들어간 것을 확인 가능하다.

모든 DAG 리스트하기

curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/dags
Python
복사
아래는 DAG 리스트 중 일부에 대한 응답 형식이다.
{ "dag_id": "SQL_to_Sheet", "default_view": "grid", "description": null, "file_token": "...", "fileloc": "/opt/airflow/dags/SQL_to_Sheet.py", "has_import_errors": false, "has_task_concurrency_limits": false, "is_active": true, # 중요 "is_paused": true, "is_subdag": false, "last_expired": null, "last_parsed_time": "2023-06-18T05:21:34.266335+00:00", "last_pickled": null, "max_active_runs": 16, "max_active_tasks": 16, "next_dagrun": "2022-06-18T00:00:00+00:00", "next_dagrun_create_after": "2022-06-18T00:00:00+00:00", "next_dagrun_data_interval_end": "2022-06-18T00:00:00", "next_dagrun_data_interval_start": "2022-06-18T00:00:00", "owners": [ "airflow" ], "pickle_id": null, "root_dag_id": null, "schedule_interval": { "__type": "CronExpression", "value": "@once" }, "scheduler_lock": null, "tags": [ { "name": "example" }], "timetable_description": "Once, as soon as possible" }
Python
복사
보통 주기적으로 스케쥴링 되는 DAG가 아니라 외부조건 즉, 다른 DAG 혹은 API를 통해 트리거링되는 DAG는 @once 로 설정하는 경우가 일반적이다. 그리고 신기하게도 is_active, is_paused 둘 다 True로 설정이 되어있다. 코드가 dags 폴더 안에 존재하면서 트리거링 가능하게끔 하려면 is_active = True, 하지만 아직 dag가 활성화가 되어 있지 않기 때문에 paused가 True.
스케쥴링과 더불어서 is_active와 is_paused 파라미터의 의미를 아는 것도 중요하다
is_active
"is_active"은 Apache Airflow의 DAG(Directed Acyclic Graph) 설정 중 하나로, DAG가 현재 활성화되어 있는지를 나타냅니다.
"is_active"가 "true"로 설정
해당 DAG는 활성화 상태로 Airflow 스케줄러에 의해 자동으로 실행됩니다. DAG를 실행할 예정 시간이 지나면 자동으로 실행됩니다.
반대로, "is_active"가 "false"로 설정
해당 DAG는 비활성화되어 Airflow 스케줄러가 무시하고 실행하지 않습니다. DAG를 일시적으로 중지하거나 임시로 비활성화하려면 이 속성을 사용할 수 있습니다.
따라서 "is_active"는 DAG의 실행 가능 여부를 결정하는 중요한 설정 값입니다.
is_paused
"is_paused"이 "true"로 설정
해당 DAG는 일시 중지 상태입니다. 일시 중지된 DAG는 예약된 실행 시간에도 실행되지 않습니다. DAG 일시 중지는 실행을 잠시 중단하고자 할 때 사용됩니다.
"is_paused"이 "false"로 설정
DAG는 일시 중지되지 않았으며 정상적으로 실행될 수 있습니다. 스케줄에 따라 예약된 시간에 실행되거나, 트리거 또는 외부 이벤트에 따라 실행될 수 있습니다.
"is_paused" 속성은 DAG의 실행 상태를 관리합니다. DAG의 실행을 일시적으로 중단하거나 재개할 수 있습니다. 이를 통해 DAG의 실행을 조정하고, 원하는 시기에 실행을 제어할 수 있습니다.
REST API가 아닌 파이썬 스크립트로 만들기 (get_dags.py)
import requests from requests.auth import HTTPBasicAuth url = "http://localhost:8080/api/v1/dags" dags = requests.get(url, auth=HTTPBasicAuth("airflow", "airflow")) print(dags.text)
Python
복사
지금 활성화되어 있는 (is_paused = False)인 DAG를 프린트하는 파이썬 스크립트 작성
get_active_dag.py
import requests from requests.auth import HTTPBasicAuth url = "http://localhost:8080/api/v1/dags" dags = requests.get(url, auth=HTTPBasicAuth("airflow", "airflow")) for d in dags.json()["dags"]: if not d["is_paused"]: print(d["dag_id"])
Python
복사

모든 Variable, Connections 리스트하기

curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/variables
Python
복사
# variables 예제 { "total_entries": 7, "variables": [ { "description": null, "key": "api_token", "value": "12345667" }, { "description": null, "key": "csv_url", "value": "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv" },
Python
복사
curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/connections
Python
복사
위의 connections, variables를 리스팅할 때, Docker env 환경 변수로 정의가 된 것들도 보여줄까?
docker로 환경 변수 처리된 것들은 API를 통해선 확인이 되지 않는다
왜냐하면 web ui, 즉 메타 데이터베이스에 저장된 정보들만 리턴을 해주게 끔 세팅이 되어 있기 때문이다.
컨테이너 내부에 들어가서 env 명령어를 통해 리스트를 확인해야 한다.
$ docker exec -it airflow-setup-airflow-scheduler-1 airflow variables get DATA_DIR => /opt/airflow/data이 리턴됨
Python
복사

모든 Config 리스트하기

curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/config
Python
복사
{ "detail": "Your Airflow administrator chose not to expose the configuration, most likely for security reasons.", "status": 403, "title": "Forbidden", "type": "https://airflow.apache.org/docs/apache-airflow/2.5.1/stable-rest-api-ref.html#section/Errors/PermissionDe nied" }
Python
복사
이는 admin 계정으로 접근했는데도 위 엔드포인트는 기본적으로 막혀 있음. 어떻게 풀어줄 수 있을까?
airflow.cfg에는 이 것을 풀 수 있는 키가 존재
아래 키는 API 엔드포인트에 대한 인증과 액세스 제어를 관리한다.
Airflow 설정 파일(airflow.cfg)
webserver 섹션의 expose_config이며 이를 True로 설정해야함 (문자열로 설정)
docker 환경에서 풀기 위해서는 Docker compose에서 이 키를 오버라이딩하여 접근 제한 해제를 해야한다.
x-airflow-common: &airflow-common image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.5.1} # build: . environment: &airflow-common-env AIRFLOW_VAR_DATA_DIR: /opt/airflow/data . . . AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session' AIRFLOW__WEBSERVER__EXPOSE_CONFIG: 'true'
Python
복사
이후 docker compose down 한 후, docker compose up을 다시 하자.
airflow.cfg가 갱신되었기 때문.

Variables/Connections Import/Export

스케쥴러 컨테이너 내부 접속
airflow variables export variables.json airflow variables import variables.json airflow connections export connections.json airflow connections import connections.json
Python
복사
컨테이너 외부
docker exec -it airflow-setup-airflow-scheduler-1 airflow variables export variables.json
Python
복사