개요
Airflow가 제공해주는 API에 대해서 알아보고 이를 이용해 모니터링 방법에 대해 알아보자
•
Airflow의 건강 여부 체크 (health check)을 어떻게 할지 학습
•
Airflow API로 외부에서 Airflow를 조작해보는 방법에 대해 학습
Airflow API 활성화
•
airflow.cfg의 api 섹션에서 auth_backend의 값을 변경
[api]
auth_backend = airflow.api.auth.backend.basic_auth
Python
복사
•
docker-compose.yaml에는 이미 설정이 되어 있음 (environments)
◦
x-airflow-common 부분의 env 를 수정해주면 된다
◦
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
◦
AIRFLOW__API__AUTH_BACKENDS와 같이 Airflow 뒤에 __ 가 붙으면, airflow.cfg를 오버라이딩하는 것
AIRFLOW__SECTION__KEY
Python
복사
section - api, key - auth_backends
airflow.cfg에 써져 있는 설정들을 위의 변수를 통해 오버라이딩이 가능해진다.
•
아래 명령으로 확인해보기
$ docker exec -it learn-airflow-airflow-scheduler-1 airflow config get-value api auth_backend
airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session
docker exec -it airflow-setup-airflow-scheduler-1 airflow config get-value api auth_backend
>>airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session
Python
복사
•
가능하면, Airflow의 Web UI 자체는 VPN(Virtual Private Network) 뒤에 놓아 접근 권한에 허들을 놓는 편이 좋다
•
Airflow Web UI에서 새로운 사용자 추가 (API 사용자)
◦
Admin 권한이 있는 유저일 경우 아래의 Security가 보일 것
◦
Security ⇒ List Users ⇒ +
◦
이후 화면에서 새 사용자 정보 추가 (monitor:MonitorUser1)
Health API 호출 (모니터링)
•
/health API 호출
curl -X GET --user "monitor:MonitorUser1" http://localhost:8080/health
Python
복사
◦
webserver 자체의 작동을 확인
◦
응답 코드 4xx, 5xx ⇒ 웹서버 자체의 에러
•
정상 경우 응답(2xx):
{
"metadatabase": {
"status": "healthy"
},
"scheduler": {
"status": "healthy",
"latest_scheduler_heartbeat": "2022-03-12T06:02:38.067178+00:00"
}
}
Python
복사
◦
메타 DB와 스케쥴러의 status를 알려준다.
◦
timezone은 UTC 기준
API 사용 예시
API 레퍼런스 살펴보기
•
Get Current Configuration
◦
지금 설정되어 있는 config를 응답해준다.
◦
http://localhost:8080/api/v1/config
◦
json 형태로 응답을 해준다
특정 DAG를 API로 Trigger하기
curl -X POST --user "airflow:airflow" -H 'Content-Type: application/json' -d
'{"execution_date":"2023-05-24T00:00:00Z"}'
"http://localhost:8080/api/v1/dags/HelloWorld/dagRuns"
curl -X 'POST' \
'http://{ip}:8080/api/v1/dags/{dag_id}/dagRuns' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
--user "id:$AIRFLOW_PASSWORD" \
-d '{
"logical_date": "2023-11-16T09:52:55.522Z"
}'
Python
복사
이 명령을 실행하면 해당 API 엔드포인트로 POST 요청이 전송되고, 지정된 DAG의 실행이 시작됩니다. 실행 날짜에 대한 세부 정보는 JSON 본문의 "execution_date" 필드에 의해 제공됩니다.
1.
curl -X POST: CURL을 사용하여 POST 요청을 보냄을 나타냅니다.
2.
-user "airflow:airflow": Basic 인증을 사용하여 사용자 이름과 비밀번호를 지정합니다. 여기서는 사용자 이름과 비밀번호를 "airflow"로 설정합니다.
3.
H 'Content-Type: application/json': 요청 헤더에 포함될 내용으로, 이 경우 JSON 형식의 데이터를 보낸다는 것을 나타냅니다.
4.
d '{"execution_date":"2023-05-24T00:00:00Z"}': 요청 본문에 포함될 데이터입니다. JSON 형식으로 작성되며, "execution_date" 필드에는 실행할 DAG의 실행 날짜 및 시간을 지정합니다. 여기서는 2023년 5월 24일 00:00:00(UTC)를 실행 날짜로 설정합니다.
5.
"http://localhost:8080/api/v1/dags/HelloWorld/dagRuns": 요청을 보낼 Airflow REST API의 엔드포인트입니다. 여기서는 "HelloWorld" DAG의 실행을 시작하기 위해 "/api/v1/dags/HelloWorld/dagRuns" 엔드포인트를 사용합니다. 로컬 호스트의 8080 포트에서 실행 중인 Airflow 웹 서버에 요청을 보냅니다.
{
"detail": "DAGRun with DAG ID: 'HelloWorld' and DAGRun logical date: '2023-05-24 00:00:00+00:00'
already exists",
"status": 409,
"title": "Conflict",
"type":
"https://airflow.apache.org/docs/apache-airflow/2.5.1/stable-rest-api-ref.html#section/Errors/AlreadyExists"
}
Python
복사
•
실습 후 응답값 확인
curl -X POST --user "airflow:airflow" -H 'Content-Type: application/json' -d '{"execution_date":"2023-05-24T00:00:00Z"}' "http://localhost:8080/api/v1/dags/HelloWorld/dagRuns"
Python
복사
{
"conf": {},
"dag_id": "HelloWorld",
"dag_run_id": "manual__2023-05-24T00:00:00+00:00",
"data_interval_end": "2023-05-23T02:00:00+00:00",
"data_interval_start": "2023-05-22T02:00:00+00:00",
"end_date": null,
"execution_date": "2023-05-24T00:00:00+00:00",
"external_trigger": true,
"last_scheduling_decision": null,
"logical_date": "2023-05-24T00:00:00+00:00",
"note": null,
"run_type": "manual",
"start_date": null,
"state": "queued"
}
Python
복사
◦
hello world dag가 큐에 들어간 것을 확인 가능하다.
모든 DAG 리스트하기
curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/dags
Python
복사
아래는 DAG 리스트 중 일부에 대한 응답 형식이다.
{
"dag_id": "SQL_to_Sheet",
"default_view": "grid",
"description": null,
"file_token": "...",
"fileloc": "/opt/airflow/dags/SQL_to_Sheet.py",
"has_import_errors": false,
"has_task_concurrency_limits": false,
"is_active": true, # 중요
"is_paused": true,
"is_subdag": false,
"last_expired": null,
"last_parsed_time": "2023-06-18T05:21:34.266335+00:00",
"last_pickled": null,
"max_active_runs": 16,
"max_active_tasks": 16,
"next_dagrun": "2022-06-18T00:00:00+00:00",
"next_dagrun_create_after": "2022-06-18T00:00:00+00:00",
"next_dagrun_data_interval_end": "2022-06-18T00:00:00",
"next_dagrun_data_interval_start": "2022-06-18T00:00:00",
"owners": [ "airflow" ],
"pickle_id": null,
"root_dag_id": null,
"schedule_interval": {
"__type": "CronExpression",
"value": "@once"
},
"scheduler_lock": null,
"tags": [ { "name": "example" }],
"timetable_description": "Once, as soon as possible"
}
Python
복사
보통 주기적으로 스케쥴링 되는 DAG가 아니라 외부조건 즉, 다른 DAG 혹은 API를 통해 트리거링되는 DAG는 @once 로 설정하는 경우가 일반적이다.
그리고 신기하게도 is_active, is_paused 둘 다 True로 설정이 되어있다. 코드가 dags 폴더 안에 존재하면서 트리거링 가능하게끔 하려면 is_active = True, 하지만 아직 dag가 활성화가 되어 있지 않기 때문에 paused가 True.
스케쥴링과 더불어서 is_active와 is_paused 파라미터의 의미를 아는 것도 중요하다
•
is_active
"is_active"은 Apache Airflow의 DAG(Directed Acyclic Graph) 설정 중 하나로, DAG가 현재 활성화되어 있는지를 나타냅니다.
"is_active"가 "true"로 설정
◦
해당 DAG는 활성화 상태로 Airflow 스케줄러에 의해 자동으로 실행됩니다. DAG를 실행할 예정 시간이 지나면 자동으로 실행됩니다.
반대로, "is_active"가 "false"로 설정
◦
해당 DAG는 비활성화되어 Airflow 스케줄러가 무시하고 실행하지 않습니다. DAG를 일시적으로 중지하거나 임시로 비활성화하려면 이 속성을 사용할 수 있습니다.
따라서 "is_active"는 DAG의 실행 가능 여부를 결정하는 중요한 설정 값입니다.
•
is_paused
"is_paused"이 "true"로 설정
◦
해당 DAG는 일시 중지 상태입니다. 일시 중지된 DAG는 예약된 실행 시간에도 실행되지 않습니다. DAG 일시 중지는 실행을 잠시 중단하고자 할 때 사용됩니다.
"is_paused"이 "false"로 설정
◦
DAG는 일시 중지되지 않았으며 정상적으로 실행될 수 있습니다. 스케줄에 따라 예약된 시간에 실행되거나, 트리거 또는 외부 이벤트에 따라 실행될 수 있습니다.
"is_paused" 속성은 DAG의 실행 상태를 관리합니다. DAG의 실행을 일시적으로 중단하거나 재개할 수 있습니다. 이를 통해 DAG의 실행을 조정하고, 원하는 시기에 실행을 제어할 수 있습니다.
•
REST API가 아닌 파이썬 스크립트로 만들기 (get_dags.py)
import requests
from requests.auth import HTTPBasicAuth
url = "http://localhost:8080/api/v1/dags"
dags = requests.get(url, auth=HTTPBasicAuth("airflow", "airflow"))
print(dags.text)
Python
복사
•
지금 활성화되어 있는 (is_paused = False)인 DAG를 프린트하는 파이썬 스크립트 작성
◦
get_active_dag.py
import requests
from requests.auth import HTTPBasicAuth
url = "http://localhost:8080/api/v1/dags"
dags = requests.get(url, auth=HTTPBasicAuth("airflow", "airflow"))
for d in dags.json()["dags"]:
if not d["is_paused"]:
print(d["dag_id"])
Python
복사
모든 Variable, Connections 리스트하기
curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/variables
Python
복사
# variables 예제
{
"total_entries": 7,
"variables": [
{
"description": null,
"key": "api_token",
"value": "12345667"
},
{
"description": null,
"key": "csv_url",
"value": "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"
},
Python
복사
curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/connections
Python
복사
위의 connections, variables를 리스팅할 때, Docker env 환경 변수로 정의가 된 것들도 보여줄까?
•
docker로 환경 변수 처리된 것들은 API를 통해선 확인이 되지 않는다
◦
왜냐하면 web ui, 즉 메타 데이터베이스에 저장된 정보들만 리턴을 해주게 끔 세팅이 되어 있기 때문이다.
•
컨테이너 내부에 들어가서 env 명령어를 통해 리스트를 확인해야 한다.
$ docker exec -it airflow-setup-airflow-scheduler-1 airflow variables get DATA_DIR
=> /opt/airflow/data이 리턴됨
Python
복사
모든 Config 리스트하기
curl -X GET --user "airflow:airflow" http://localhost:8080/api/v1/config
Python
복사
{
"detail": "Your Airflow administrator chose not to expose the configuration, most likely for security
reasons.",
"status": 403,
"title": "Forbidden",
"type":
"https://airflow.apache.org/docs/apache-airflow/2.5.1/stable-rest-api-ref.html#section/Errors/PermissionDe
nied"
}
Python
복사
•
이는 admin 계정으로 접근했는데도 위 엔드포인트는 기본적으로 막혀 있음. 어떻게 풀어줄 수 있을까?
◦
airflow.cfg에는 이 것을 풀 수 있는 키가 존재
▪
아래 키는 API 엔드포인트에 대한 인증과 액세스 제어를 관리한다.
▪
Airflow 설정 파일(airflow.cfg)
•
webserver 섹션의 expose_config이며 이를 True로 설정해야함 (문자열로 설정)
◦
docker 환경에서 풀기 위해서는 Docker compose에서 이 키를 오버라이딩하여 접근 제한 해제를 해야한다.
x-airflow-common:
&airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.5.1}
# build: .
environment:
&airflow-common-env
AIRFLOW_VAR_DATA_DIR: /opt/airflow/data
.
.
.
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
AIRFLOW__WEBSERVER__EXPOSE_CONFIG: 'true'
Python
복사
◦
이후 docker compose down 한 후, docker compose up을 다시 하자.
▪
airflow.cfg가 갱신되었기 때문.
Variables/Connections Import/Export
•
스케쥴러 컨테이너 내부 접속
airflow variables export variables.json
airflow variables import variables.json
airflow connections export connections.json
airflow connections import connections.json
Python
복사
•
컨테이너 외부
docker exec -it airflow-setup-airflow-scheduler-1 airflow variables export variables.json
Python
복사