커스텀 컴포넌트 빌드
AirflowPlugin (먼저 맛보기)
from airflow.plugins_manager import AirflowPlugin
...
class CustomSqlToS3Operator(BaseOperator):
...
class CustomSqlToS3Plugin(AirflowPlugin):
name = "custom_sql"
hooks = []
operators = [CustomSqlToS3Operator]
Python
복사
AirflowPlugin 클래스를 상속하는 View, Operator,hook... 클래스를 생성한다
plugin 이름 등 속성을 설정한다
디렉토리는 아래와 같이 설정한다
dags
|
plugin
├── __init__.py
├── custom_sql
│ ├── __init__.py
│ ├── __pycache__
│ └── custom_sql_to_s3.py
Python
복사
위와 같이 적용을 한 후, Custom operator를 plugin에 적용하려면 airflow를 재작동하면 된다
어렵게 말하면, 생성 후 Lazy Loaded 이기때문에 airflow 인스턴스를 재시작해야한다.
본격 시작
DAG를 구성 시 계속해서 반복적으로 수행해야 하는 태스크가 있을 때, 단순 반복적으로 같은 코드를 사용해야 하는 경우가 있을 수 있다. 이런 경우 Airflow에서 커스텀 오퍼레이션을 직접 구현하여 코드의 재사용성을 높일 수 있게 된다.
본 장에서는 사용자가 자신만의 오퍼레이터를 빌드하고 DAG에서 이를 사용하는 방법을 설명한다. 그리고 이 커스텀 컴포넌트를 파이썬 패키지로 패키징하여 여러 환경에서 설치하거나 재사용할 수 있도록 하는 실습도 진행해본다.
8.0 예제 설명 및 실습 준비 사항
과거의 영화관람 기록에 따라 영화를 추천해주는 추천 시스템
교재에서는 MovieLens Dataset으로 미리 만들어놓은 API를 이용하여 실습 진행
영화 평점 API : startDate(시작 날짜)와 endDate(끝 날짜)를 입력받아 해당되는 기간의 Movielens Dataset을 반환해주는 API
•
docker-compose up 후, 브라우저에서 localhost:5001로 들어갔을 때 아래의 화면이 뜨면 잘 작동 하는 것.
•
localhost:5001/ratings?start_date=2019-01-01&end_date=2019-01-02 를 이용하면 2019년 1월 1일부터 2019년 1월 2일의 데이터를 반환받을 수 있음
→ 이를 이용하여 데이터 파이프라인을 구축할 때 전체 데이터 세트를 다 로드할 필요없이 (일별) 증분 방식으로 데이터를 로드해 올 수 있음.
•
위의 결과를 잘 보면 limit과 offset이 함께 반환되는 것을 볼 수 있는데, 여기서 limit 은 한 번에 가져올 수 있는 데이터의 행의 수이고, offset은 어디서 부터 가져올 것인지를 지정하는 parameter임.
◦
위의 start_date와 end_date 처럼 parameter에 넣어서 함께 요청하면 됨.
8.1 PythonOperator로 작업하기(Chap08/dags/01_python.py)
•
API에서 데이터를 일별로 가져오고, 처리하는 과정을 기존에 사용하던 PythonOperator로는 어떻게 구현할 수 있는지를 살펴보는 구간
8.1.1 API 액세스를 위해 request Session을 생성하는 함수 작성
# 환경 변수 설정
MOVIELENS_HOST = os.environ.get("MOVIELENS_HOST", "movielens")
MOVIELENS_SCHEMA = os.environ.get("MOVIELENS_SCHEMA", "http")
MOVIELENS_PORT = os.environ.get("MOVIELENS_PORT", "5001")
MOVIELENS_USER = "airflow"
MOVIELENS_PASSWORD = "airflow"
Python
복사
def _get_session():
"""Builds a requests Session for the Movielens API."""
# Setup our requests session.
session = requests.Session()
session.auth = (MOVIELENS_USER, MOVIELENS_PASSWORD)
# Define API base url from connection details.
schema = MOVIELENS_SCHEMA
host = MOVIELENS_HOST
port = MOVIELENS_PORT
base_url = f"{schema}://{host}:{port}"
return session, base_url
Python
복사
8.1.2 페이지를 처리하는 함수 작성
•
위의 API 설명에서 봤듯이 사용하는 API는 한 번에 가져올 수 있는 데이터의 개수가 제한되어 있음. 따라서 해당 기간의 데이터가 끝날 때까지 while문을 이용하여 데이터를 끝까지 가져와주어야 함.
•
while total is None or offset < total: 조건을 이용하여 데이터를 반복적으로 가져오는 함수.
def _get_with_pagination(session, url, params, batch_size=100):
"""
Fetches records using a get request with given url/params,
taking pagination into account.
"""
offset = 0
total = None
while total is None or offset < total:
response = session.get(
url, params={**params, **{"offset": offset, "limit": batch_size}}
)
response.raise_for_status()
response_json = response.json()
yield from response_json["result"]
offset += batch_size
total = response_json["total"]
Python
복사
8.1.3 특정 기간의 데이터를 반환하는 함수 작성
•
위에서 작성한 _get_session() 과 _get_with_pagination() 을 이용하여 특정 기간의 평점 데이터를 생성하는 generator()를 생성하는 함수
def _get_ratings(start_date, end_date, batch_size=100):
session, base_url = _get_session()
yield from _get_with_pagination(
session=session,
url=base_url + "/ratings",
params={"start_date": start_date, "end_date": end_date},
batch_size=batch_size,
)
Python
복사
8.1.4 _get_ratings 함수를 통해 가져온 데이터를 output_path에 저장하는 함수 작성
def _fetch_ratings(templates_dict, batch_size=1000, **_):
logger = logging.getLogger(__name__)
start_date = templates_dict["start_date"]
end_date = templates_dict["end_date"]
output_path = templates_dict["output_path"]
logger.info(f"Fetching ratings for {start_date} to {end_date}")
ratings = list(
_get_ratings(
start_date=start_date, end_date=end_date, batch_size=batch_size
)
)
logger.info(f"Fetched {len(ratings)} ratings")
logger.info(f"Writing ratings to {output_path}")
# Make sure output directory exists.
output_dir = os.path.dirname(output_path)
os.makedirs(output_dir, exist_ok=True)
with open(output_path, "w") as file_:
json.dump(ratings, fp=file_) # 데이터 저장
Python
복사
8.1.5 Ranking을 계산하는 함수 작성
•
저장한 평점 데이터를 이용하여 영화의 순위를 계산한 뒤 csv로 저장
# Chap08/dags/custom/ranking.py
def rank_movies_by_rating(ratings, min_ratings=2):
ranking = (
ratings.groupby("movieId")
.agg(
avg_rating=pd.NamedAgg(column="rating", aggfunc="mean"),
num_ratings=pd.NamedAgg(column="userId", aggfunc="nunique"),
)
.loc[lambda df: df["num_ratings"] > min_ratings]
.sort_values(["avg_rating", "num_ratings"], ascending=False)
)
return ranking
Python
복사
def _rank_movies(templates_dict, min_ratings=2, **_):
input_path = templates_dict["input_path"]
output_path = templates_dict["output_path"]
ratings = pd.read_json(input_path)
ranking = rank_movies_by_rating(ratings, min_ratings=min_ratings)
# Make sure output directory exists.
output_dir = os.path.dirname(output_path)
os.makedirs(output_dir, exist_ok=True)
ranking.to_csv(output_path, index=True)
Python
복사
8.1.6 Dag 작성 (dag_id = ‘Chap08_01_python’)
with DAG(
dag_id="Chap08_01_python",
description="Fetches ratings from the Movielens API using the Python Operator.",
start_date=dt.datetime(2019, 1, 1),
end_date=dt.datetime(2019, 1, 10),
schedule_interval="@daily",
) as dag:
/* _fetch_ratings 코드 */
fetch_ratings = PythonOperator(
task_id="fetch_ratings",
python_callable=_fetch_ratings,
templates_dict={
"start_date": "{{ds}}",
"end_date": "{{next_ds}}",
"output_path": "/opt/airflow/data/python/ratings/{{ds}}.json",
},
)
/* _rank_movies 코드 */
rank_movies = PythonOperator(
task_id="rank_movies",
python_callable=_rank_movies,
templates_dict={
"input_path": "/opt/airflow/data/python/ratings/{{ds}}.json",
"output_path": "/opt/airflow/data/python/ratings/{{ds}}.csv",
},
)
fetch_ratings >> rank_movies # 의존성 정의
Python
복사
8.1.7 Dag 실행
•
localhost:8080 → Chap08_01_python dag 실행
•
worker node 접속 후 파일 생성 확인
# 생성된 파일 확인
docker exec - it xxx-airflow-study-airflow-worker-1 /bin/bash
ls /opt/airflow/data/python/ratings
Python
복사
8.2 커스텀 훅 빌드하기
•
위의 코드에서 대부분의 함수의 경우 MovieLens Dataset의 API를 연결하고 데이터를 가져오는 용도로 쓰임. 위의 예시에서는 하나의 dag 파일에 함수 형태로 API 연결 및 호출 작업을 작성하였는데, 이런 경우 다른 DAG에서 같은 방식으로 API를 사용하고자 할 때 코드를 반복적으로 작성해야 한다는 문제가 있음.
•
Airflow의 Custom Hook을 이용하면 반복되는 작업을 캡슐화 할 수 있음.
8.2.0 Admin → Connection에서 movielens API 등록
비밀번호도 작성해주어야 합니다 : airflow
8.2.1 커스텀 훅 설계하기(Chap08/dags/custom/hooks.py)
•
Airflow에서 모든 훅은 추상 클래스(abstract class)인 BaseHook 클래스의 서브클래스(subclass)로 생성됨.
class MovielensHook(BaseHook):
...
Python
복사
•
훅을 구현하기 위해 훅 연결과 훅에 필요한 다른 추가적인 인수를 지정하는 __init__ 메서드를 정의해주어야 함. __init__ 메서드에는 훅 연결에 필요한 정보와 추가적인 인수를 정의할 수 있음.
def __init__(self, conn_id, retry=3):
super().__init__() # BaseHook 클래스 생성자 호출
self._conn_id = conn_id # conn_id를 통해 훅에게 어떤 connection을 사용하는지 전달.
self._retry = retry
self._session = None
self._base_url = None
Python
복사
•
다음으로는 외부 시스템과의 연결 설정을 책임지는 get_conn 메서드를 정의해주어야 함.
◦
◦
BaseHook에서 기본적으로 제공하는 self.get_connenction 메서드를 통해 Airflow 자격 인증 저장소에서 입력받은 conn_id에 해당하는 connection 정보를 얻어옴. → 정상적인 작동을 위해 사전에 8.2.0 작업이 필요.
◦
아래의 코드는 if self._session is None: 조건을 이용하여 get_conn 메서드가 반복적으로 호출되더라도 처음 한 번만 세션 설정을 진행하도록 작성됨. (동일한 작업을 방지)
def get_conn(self):
"""
Returns the connection used by the hook for querying data.
Should in principle not be used directly.
"""
# self._session에 세션을 캐싱시켜 불필요한 함수 호출을 방지
if self._session is None:
# Fetch config for the given connection (host, login, etc).
config = self.get_connection(self._conn_id)
# UI에서 필수적으로 입력되어야 하는 값이 입력되지 않았을 때 error 발생
if not config.host:
raise ValueError(f"No host specified in connection {self._conn_id}")
schema = config.schema or self.DEFAULT_SCHEMA
port = config.port or self.DEFAULT_PORT
self._base_url = f"{schema}://{config.host}:{port}"
# Build our session instance, which we will use for any
# requests to the API.
self._session = requests.Session()
if config.login:
self._session.auth = (config.login, config.password)
return self._session, self._base_url
Python
복사
•
영화의 평점을 가져오기 위해 필요한 get_ratings 와 _get_with_pagination 메서드 작성
◦
기존과 동일한 코드이므로 설명 생략
8.2.2 MovielensHook으로 DAG 빌드하기(Chap08/dags/02_hooks.py)
•
현재 파일 구조
.
├── 01_python.py
├── 02_hook.py ----> Chap08_02_hook DAG가 정의된 파일
├── 03_operator.py
├── 04_sensor.py
└── custom
├── hooks.py ----> Movielens Hook이 작성된 파일
├── operators.py
├── ranking.py
└── sensors.py
Python
복사
•
DAG 에서 ./custom/hooks.py에 정의된 Hook을 사용하기 위해서는 다음과 같이 정의한 Hook class를 import 해오면 됨.
from custom.hooks import MovielensHook
Python
복사
•
DAG 정의는 기존의 방식과 동일
◦
불러들인 MovielensHook class로 instance 만들어서 사용하면 됨.
→ 이를 통해 다른 DAG에서도 사용가능해짐.
with DAG(
dag_id="Chap08_02_hook",
description="Fetches ratings from the Movielens API using a custom hook.",
start_date=dt.datetime(2019, 1, 1),
end_date=dt.datetime(2019, 1, 10),
schedule_interval="@daily",
) as dag:
def _fetch_ratings(conn_id, templates_dict, batch_size=1000, **_):
logger = logging.getLogger(__name__)
start_date = templates_dict["start_date"]
end_date = templates_dict["end_date"]
output_path = templates_dict["output_path"]
logger.info(f"Fetching ratings for {start_date} to {end_date}")
hook = MovielensHook(conn_id=conn_id)
ratings = list(
hook.get_ratings(
start_date=start_date, end_date=end_date, batch_size=batch_size
)
)
logger.info(f"Fetched {len(ratings)} ratings")
logger.info(f"Writing ratings to {output_path}")
# Make sure output directory exists.
output_dir = os.path.dirname(output_path)
os.makedirs(output_dir, exist_ok=True)
with open(output_path, "w") as file_:
json.dump(ratings, fp=file_)
PythonOperator(
task_id="fetch_ratings",
python_callable=_fetch_ratings,
op_kwargs={"conn_id": "movielens"},
templates_dict={
"start_date": "{{ds}}",
"end_date": "{{next_ds}}",
"output_path": "/opt/airflow/data/custom_hook/{{ds}}.json",
},
)
Python
복사
8.2.3 Dag 실행
•
localhost:8080 → Chap08_02_custom dag 실행
•
worker node 접속 후 파일 생성 확인
# 생성된 파일 확인
docker exec - it boaz-airflow-study-airflow-worker-1 /bin/bash
ls /opt/airflow/data/custom_hook
Python
복사
8.3 커스텀 오퍼레이터 빌드하기
•
API 호출 부분은 훅으로 잘 묶었는데, 시작/종료 부분과 날짜 정의, 파일 저장 등의 코드도 Airflow의 Custom Operator를 이용하여 재사용할 수 있음.
8.3.1 커스텀 오퍼레이터 정의하기(Chap08/dags/custom/operator.py)
•
Airflow에서 모든 오퍼레이터는 BaseOperator 클래스의 서브클래스(subclass)로 생성됨.
class MovielensFetchRatingsOperator(BaseOperator):
Python
복사
•
__init__ 메서드 정의
◦
BaseOperator 클래스는 일반적인 동작을 정의하는 generic 인수들을 가지고 있는데, 이는 **kwargs 를 통해 전달할 수 있음.
▪
제네릭 인수에는 task_id, retries, retry_delay와 같은 태스크 관련 인수들이 포함됨.
▪
이 인수들은 DAG를 정의할 때 ‘default_args’ 파라미터를 통해 수정할 수 있음.
▪
커스텀 오퍼레이터의 기본 인수들이 정상적으로 적용되었는지 확인하기 위해 apply_defaults 데코레이터를 사용할 수 있으며, 커스텀 오퍼레이터를 정의할 때 apply_defaults 를 필수적으로 사용해야 함.
@apply_defaults
def __init__(
self,
conn_id,
output_path,
start_date="{{ds}}",
end_date="{{next_ds}}",
batch_size=1000,
**kwargs,
):
super(MovielensFetchRatingsOperator, self).__init__(**kwargs)
self._conn_id = conn_id
self._output_path = output_path
self._start_date = start_date
self._end_date = end_date
self._batch_size = batch_size
Python
복사
•
execute 메서드 정의
◦
오퍼레이터의 실제 실행 내역을 정의하는 메서드
◦
평점 데이터를 실제로 가져와서 출력 파일로 저장하는 기능을 구현
◦
context는 Airflow의 모든 context 변수를 포함하고 있음. (dict 객체)
◦
template_fields 변수를 정의하여 템플릿화할 변수를 Airflow에게 알려줄 수 있음.
▪
Airflow는 execute 메서드를 호출하기 전에 이 값들을 템플릿화함.
template_fields = ("_start_date", "_end_date", "_output_path")
def execute(self, context):
hook = MovielensHook(self._conn_id)
try:
self.log.info(
f"Fetching ratings for {self._start_date} to {self._end_date}"
)
ratings = list(
hook.get_ratings(
start_date=self._start_date,
end_date=self._end_date,
batch_size=self._batch_size,
)
)
self.log.info(f"Fetched {len(ratings)} ratings")
finally:
# Make sure we always close our hook's session.
hook.close()
self.log.info(f"Writing ratings to {self._output_path}")
# Make sure output directory exists.
output_dir = os.path.dirname(self._output_path)
os.makedirs(output_dir, exist_ok=True)
# Write output as JSON.
with open(self._output_path, "w") as file_:
json.dump(ratings, fp=file_)
Python
복사
8.3.2 커스텀 오퍼레이터 빌드하기(Chap08/dags/03_operator.py)
•
현재 파일 구조
.
├── 01_python.py
├── 02_hook.py
├── 03_operator.py ----> Chap08_03_operator DAG가 정의된 파일
├── 04_sensor.py
└── custom
├── hooks.py
├── operators.py ----> Movielens custom operator가 작성된 파일
├── ranking.py
└── sensors.py
Python
복사
•
DAG 에서 ./custom/operator.py에 정의된 Custom Operator를 사용하기 위해서는 다음과 같이 정의한 Operator class를 import 해오면 됨.
from custom.operators import MovielensFetchRatingsOperator
Python
복사
•
DAG 정의
with DAG(
dag_id="Chap08_03_operator",
description="Fetches ratings from the Movielens API using a custom operator.",
start_date=dt.datetime(2019, 1, 1),
end_date=dt.datetime(2019, 1, 10),
schedule_interval="@daily",
) as dag:
MovielensFetchRatingsOperator(
task_id="fetch_ratings",
conn_id="movielens",
start_date="{{ds}}", --> template으로 정의된 변수1
end_date="{{next_ds}}", --> template으로 정의된 변수2
output_path="/opt/airflow/data/custom_operator/{{ds}}.json", --> template으로 정의된 변수3
)
Python
복사
8.3.3 Dag 실행
•
localhost:8080 → Chap08_03_operator dag 실행
•
worker node 접속 후 파일 생성 확인
# 생성된 파일 확인
docker exec - it boaz-airflow-study-airflow-worker-1 /bin/bash
ls /opt/airflow/data/custom_operator
Python
복사
8.4 커스텀 센서 빌드하기
•
센서 : DAG 안에서 다운스트림 태스크를 실행하기 전에 특정 조건이 충족될 때까지 대기하는 task를 정의
8.4.1 커스텀 센서 정의하기(Chap08/dags/custom/sensor.py)
•
Airflow에서 모든 센서는 BaseSensorOperator 클래스의 서브클래스(subclass)로 생성됨.
◦
이 외에는 커스텀 오퍼레이터를 정의하는 것과 매우 유사함.
◦
커스텀 센서에서는 execute 메서드 대신 poke 메서드를 구현함.
•
__init__ 메서드 구현
◦
커스텀 오퍼레이터와 동일함 → 설명 생략
@apply_defaults
def __init__(self, conn_id, start_date="{{ds}}", end_date="{{next_ds}}", **kwargs):
super().__init__(**kwargs)
self._conn_id = conn_id
self._start_date = start_date
self._end_date = end_date
Python
복사
•
poke 메서드 구현
◦
커스텀 오퍼레이터에서 execute 를 구현해다면, 커스텀 센서에서는 poke 를 구현
◦
return 값이 boolean으로 주어지며, 만약 False가 리턴되면, 센서가 상태를 다시 체크할 때까지 대기(wait) 상태로 들어갔다가, 상태값이 True가 되거나 타임아웃(timeout)이 될 때까지 반복함.
◦
센서가 실행을 끝내고 True를 반환하면 다음 Task를 실행.
◦
다음의 코드는 특정 기간 동안 평점 데이터가 존재하는지를 검사하는 작업을 구현한 것.
▪
get_ratings을 호출 후 next를 호출하여 한 줄이라도 데이터가 있는지를 검사.
def poke(self, context):
hook = MovielensHook(self._conn_id)
try:
next(
hook.get_ratings(
start_date=self._start_date, end_date=self._end_date, batch_size=1
)
)
# If no StopIteration is raised, the request returned at least one record.
# This means that there are records for the given period, which we indicate
# to Airflow by returning True.
self.log.info(
f"Found ratings for {self._start_date} to {self._end_date}, continuing!"
)
return True
except StopIteration:
self.log.info(
f"Didn't find any ratings for {self._start_date} "
f"to {self._end_date}, waiting..."
)
# If StopIteration is raised, we know that the request did not find
# any records. This means that there a no ratings for the time period,
# so we should return False.
return False
finally:
# Make sure we always close our hook's session.
hook.close()
Python
복사
8.4.2 커스텀 센서 빌드하기(Chap08/dags/04_sensor.py)
•
방식은 위와 동일함.
from custom.operators import MovielensFetchRatingsOperator
from custom.sensors import MovielensRatingsSensor
with DAG(
dag_id="Chap08_04_sensor",
description="Fetches ratings from the Movielens API, with a custom sensor.",
start_date=dt.datetime(2019, 1, 1),
end_date=dt.datetime(2019, 1, 10),
schedule_interval="@daily",
) as dag:
wait_for_ratings = MovielensRatingsSensor(
task_id="wait_for_ratings",
conn_id="movielens",
start_date="{{ds}}",
end_date="{{next_ds}}",
)
fetch_ratings = MovielensFetchRatingsOperator(
task_id="fetch_ratings",
conn_id="movielens",
start_date="{{ds}}",
end_date="{{next_ds}}",
output_path="/opt/airflow/data/custom_sensor/{{ds}}.json",
)
wait_for_ratings >> fetch_ratings
Python
복사
8.4.3 Dag 실행
•
localhost:8080 → Chap08_04_sensor dag 실행
•
worker node 접속 후 파일 생성 확인
# 생성된 파일 확인
docker exec - it boaz-airflow-study-airflow-worker-1 /bin/bash
ls /opt/airflow/data/custom_sensor
Python
복사
8.5 컴포넌트 패키징하기
•
커스텀 컴포넌트를 파이썬 패키지로 패키징해보기
•
앞 절에서 구현한 훅, 오퍼레이터, 센서 클래스를 포함하는 airflow_movielens 라는 패키지를 생성
8.5.1 파이썬 패키지 부트스트랩 작업하기
•
파일 세팅
# 경로 통일을 위한 이동
cd ./dags/Chap08/
# 패키지 폴더 생성
mkdir -p airflow-movielens
cd airflow-movielens
# __init__.py 파일 생성
mkdir -p src/airflow_movielens
touch src/airflow_movielens/__init__.py
# 파일 복사
cp ../dags/custom/*s.py ./src/airflow_movielens/
Python
복사
# 파일 구조 확인
tree .
<출력>
.
└── src
└── airflow_movielens
├── __init__.py
├── hooks.py
├── operators.py
└── sensors.py
Python
복사
•
setup.py 파일 생성
vi setup.py
Python
복사
#!/usr/bin/env python
import setuptools
requirements = ["apache-airflow", "requests"]
setuptools.setup(
name="airflow_movielens",
version="0.1.0",
description="Hooks, sensors and operators for the Movielens API.",
author="BOAZ",
author_email="",
install_requires=requirements,
packages=setuptools.find_packages("src"),
package_dir={"":"src"},
url="",
license="MIT license"
)
Python
복사
8.5.2 패키지 설치하기
cd ..
python -m pip install ./airflow-movielens
Python
복사
8.5.3 패키지 실행하기
python
>>> import airflow_movielens
>>> airflow_movielens
<module 'airflow_movielens' from '/Users/xxx/miniconda/lib/python3.10/site-packages/airflow_movielens/__init__.py'>
Python
복사
8.5.4 패키지 삭제하기
python -m pip uninstall airflow-movielens
Python
복사