Search

[Airflow] Custom 컴포넌트 빌드

커스텀 컴포넌트 빌드

AirflowPlugin (먼저 맛보기)

from airflow.plugins_manager import AirflowPlugin ... class CustomSqlToS3Operator(BaseOperator): ... class CustomSqlToS3Plugin(AirflowPlugin): name = "custom_sql" hooks = [] operators = [CustomSqlToS3Operator]
Python
복사
AirflowPlugin 클래스를 상속하는 View, Operator,hook... 클래스를 생성한다
plugin 이름 등 속성을 설정한다
디렉토리는 아래와 같이 설정한다
dags | plugin ├── __init__.py ├── custom_sql │ ├── __init__.py │ ├── __pycache__ │ └── custom_sql_to_s3.py
Python
복사
위와 같이 적용을 한 후, Custom operator를 plugin에 적용하려면 airflow를 재작동하면 된다
어렵게 말하면, 생성 후 Lazy Loaded 이기때문에 airflow 인스턴스를 재시작해야한다.

본격 시작

DAG를 구성 시 계속해서 반복적으로 수행해야 하는 태스크가 있을 때, 단순 반복적으로 같은 코드를 사용해야 하는 경우가 있을 수 있다. 이런 경우 Airflow에서 커스텀 오퍼레이션을 직접 구현하여 코드의 재사용성을 높일 수 있게 된다.
본 장에서는 사용자가 자신만의 오퍼레이터를 빌드하고 DAG에서 이를 사용하는 방법을 설명한다. 그리고 이 커스텀 컴포넌트를 파이썬 패키지로 패키징하여 여러 환경에서 설치하거나 재사용할 수 있도록 하는 실습도 진행해본다.

8.0 예제 설명 및 실습 준비 사항

과거의 영화관람 기록에 따라 영화를 추천해주는 추천 시스템

교재에서는 MovieLens Dataset으로 미리 만들어놓은 API를 이용하여 실습 진행

영화 평점 API : startDate(시작 날짜)와 endDate(끝 날짜)를 입력받아 해당되는 기간의 Movielens Dataset을 반환해주는 API

docker-compose up 후, 브라우저에서 localhost:5001로 들어갔을 때 아래의 화면이 뜨면 잘 작동 하는 것.
localhost:5001/ratings?start_date=2019-01-01&end_date=2019-01-02 를 이용하면 2019년 1월 1일부터 2019년 1월 2일의 데이터를 반환받을 수 있음
→ 이를 이용하여 데이터 파이프라인을 구축할 때 전체 데이터 세트를 다 로드할 필요없이 (일별) 증분 방식으로 데이터를 로드해 올 수 있음.
위의 결과를 잘 보면 limitoffset이 함께 반환되는 것을 볼 수 있는데, 여기서 limit 은 한 번에 가져올 수 있는 데이터의 행의 수이고, offset은 어디서 부터 가져올 것인지를 지정하는 parameter임.
위의 start_date와 end_date 처럼 parameter에 넣어서 함께 요청하면 됨.

8.1 PythonOperator로 작업하기(Chap08/dags/01_python.py)

API에서 데이터를 일별로 가져오고, 처리하는 과정을 기존에 사용하던 PythonOperator로는 어떻게 구현할 수 있는지를 살펴보는 구간

8.1.1 API 액세스를 위해 request Session을 생성하는 함수 작성

# 환경 변수 설정 MOVIELENS_HOST = os.environ.get("MOVIELENS_HOST", "movielens") MOVIELENS_SCHEMA = os.environ.get("MOVIELENS_SCHEMA", "http") MOVIELENS_PORT = os.environ.get("MOVIELENS_PORT", "5001") MOVIELENS_USER = "airflow" MOVIELENS_PASSWORD = "airflow"
Python
복사
def _get_session(): """Builds a requests Session for the Movielens API.""" # Setup our requests session. session = requests.Session() session.auth = (MOVIELENS_USER, MOVIELENS_PASSWORD) # Define API base url from connection details. schema = MOVIELENS_SCHEMA host = MOVIELENS_HOST port = MOVIELENS_PORT base_url = f"{schema}://{host}:{port}" return session, base_url
Python
복사

8.1.2 페이지를 처리하는 함수 작성

위의 API 설명에서 봤듯이 사용하는 API는 한 번에 가져올 수 있는 데이터의 개수가 제한되어 있음. 따라서 해당 기간의 데이터가 끝날 때까지 while문을 이용하여 데이터를 끝까지 가져와주어야 함.
while total is None or offset < total: 조건을 이용하여 데이터를 반복적으로 가져오는 함수.
def _get_with_pagination(session, url, params, batch_size=100): """ Fetches records using a get request with given url/params, taking pagination into account. """ offset = 0 total = None while total is None or offset < total: response = session.get( url, params={**params, **{"offset": offset, "limit": batch_size}} ) response.raise_for_status() response_json = response.json() yield from response_json["result"] offset += batch_size total = response_json["total"]
Python
복사

8.1.3 특정 기간의 데이터를 반환하는 함수 작성

위에서 작성한 _get_session()_get_with_pagination() 을 이용하여 특정 기간의 평점 데이터를 생성하는 generator()를 생성하는 함수
def _get_ratings(start_date, end_date, batch_size=100): session, base_url = _get_session() yield from _get_with_pagination( session=session, url=base_url + "/ratings", params={"start_date": start_date, "end_date": end_date}, batch_size=batch_size, )
Python
복사

8.1.4 _get_ratings 함수를 통해 가져온 데이터를 output_path에 저장하는 함수 작성

def _fetch_ratings(templates_dict, batch_size=1000, **_): logger = logging.getLogger(__name__) start_date = templates_dict["start_date"] end_date = templates_dict["end_date"] output_path = templates_dict["output_path"] logger.info(f"Fetching ratings for {start_date} to {end_date}") ratings = list( _get_ratings( start_date=start_date, end_date=end_date, batch_size=batch_size ) ) logger.info(f"Fetched {len(ratings)} ratings") logger.info(f"Writing ratings to {output_path}") # Make sure output directory exists. output_dir = os.path.dirname(output_path) os.makedirs(output_dir, exist_ok=True) with open(output_path, "w") as file_: json.dump(ratings, fp=file_) # 데이터 저장
Python
복사

8.1.5 Ranking을 계산하는 함수 작성

저장한 평점 데이터를 이용하여 영화의 순위를 계산한 뒤 csv로 저장
# Chap08/dags/custom/ranking.py def rank_movies_by_rating(ratings, min_ratings=2): ranking = ( ratings.groupby("movieId") .agg( avg_rating=pd.NamedAgg(column="rating", aggfunc="mean"), num_ratings=pd.NamedAgg(column="userId", aggfunc="nunique"), ) .loc[lambda df: df["num_ratings"] > min_ratings] .sort_values(["avg_rating", "num_ratings"], ascending=False) ) return ranking
Python
복사
def _rank_movies(templates_dict, min_ratings=2, **_): input_path = templates_dict["input_path"] output_path = templates_dict["output_path"] ratings = pd.read_json(input_path) ranking = rank_movies_by_rating(ratings, min_ratings=min_ratings) # Make sure output directory exists. output_dir = os.path.dirname(output_path) os.makedirs(output_dir, exist_ok=True) ranking.to_csv(output_path, index=True)
Python
복사

8.1.6 Dag 작성 (dag_id = ‘Chap08_01_python’)

with DAG( dag_id="Chap08_01_python", description="Fetches ratings from the Movielens API using the Python Operator.", start_date=dt.datetime(2019, 1, 1), end_date=dt.datetime(2019, 1, 10), schedule_interval="@daily", ) as dag: /* _fetch_ratings 코드 */ fetch_ratings = PythonOperator( task_id="fetch_ratings", python_callable=_fetch_ratings, templates_dict={ "start_date": "{{ds}}", "end_date": "{{next_ds}}", "output_path": "/opt/airflow/data/python/ratings/{{ds}}.json", }, ) /* _rank_movies 코드 */ rank_movies = PythonOperator( task_id="rank_movies", python_callable=_rank_movies, templates_dict={ "input_path": "/opt/airflow/data/python/ratings/{{ds}}.json", "output_path": "/opt/airflow/data/python/ratings/{{ds}}.csv", }, ) fetch_ratings >> rank_movies # 의존성 정의
Python
복사

8.1.7 Dag 실행

localhost:8080 → Chap08_01_python dag 실행
worker node 접속 후 파일 생성 확인
# 생성된 파일 확인 docker exec - it xxx-airflow-study-airflow-worker-1 /bin/bash ls /opt/airflow/data/python/ratings
Python
복사

8.2 커스텀 훅 빌드하기

위의 코드에서 대부분의 함수의 경우 MovieLens Dataset의 API를 연결하고 데이터를 가져오는 용도로 쓰임. 위의 예시에서는 하나의 dag 파일에 함수 형태로 API 연결 및 호출 작업을 작성하였는데, 이런 경우 다른 DAG에서 같은 방식으로 API를 사용하고자 할 때 코드를 반복적으로 작성해야 한다는 문제가 있음.
Airflow의 Custom Hook을 이용하면 반복되는 작업을 캡슐화 할 수 있음.

8.2.0 Admin → Connection에서 movielens API 등록

비밀번호도 작성해주어야 합니다 : airflow

8.2.1 커스텀 훅 설계하기(Chap08/dags/custom/hooks.py)

Airflow에서 모든 훅은 추상 클래스(abstract class)인 BaseHook 클래스의 서브클래스(subclass)로 생성됨.
class MovielensHook(BaseHook): ...
Python
복사
훅을 구현하기 위해 훅 연결과 훅에 필요한 다른 추가적인 인수를 지정하는 __init__ 메서드를 정의해주어야 함. __init__ 메서드에는 훅 연결에 필요한 정보와 추가적인 인수를 정의할 수 있음.
def __init__(self, conn_id, retry=3): super().__init__() # BaseHook 클래스 생성자 호출 self._conn_id = conn_id # conn_id를 통해 훅에게 어떤 connection을 사용하는지 전달. self._retry = retry self._session = None self._base_url = None
Python
복사
다음으로는 외부 시스템과의 연결 설정을 책임지는 get_conn 메서드를 정의해주어야 함.
get_conn 메서드는 API 연결 정보를 가져오는 곳으로 8.1에서 정의한 get_session 함수와 동일한 역할을 함.
BaseHook에서 기본적으로 제공하는 self.get_connenction 메서드를 통해 Airflow 자격 인증 저장소에서 입력받은 conn_id에 해당하는 connection 정보를 얻어옴. → 정상적인 작동을 위해 사전에 8.2.0 작업이 필요.
아래의 코드는 if self._session is None: 조건을 이용하여 get_conn 메서드가 반복적으로 호출되더라도 처음 한 번만 세션 설정을 진행하도록 작성됨. (동일한 작업을 방지)
def get_conn(self): """ Returns the connection used by the hook for querying data. Should in principle not be used directly. """ # self._session에 세션을 캐싱시켜 불필요한 함수 호출을 방지 if self._session is None: # Fetch config for the given connection (host, login, etc). config = self.get_connection(self._conn_id) # UI에서 필수적으로 입력되어야 하는 값이 입력되지 않았을 때 error 발생 if not config.host: raise ValueError(f"No host specified in connection {self._conn_id}") schema = config.schema or self.DEFAULT_SCHEMA port = config.port or self.DEFAULT_PORT self._base_url = f"{schema}://{config.host}:{port}" # Build our session instance, which we will use for any # requests to the API. self._session = requests.Session() if config.login: self._session.auth = (config.login, config.password) return self._session, self._base_url
Python
복사
영화의 평점을 가져오기 위해 필요한 get_ratings_get_with_pagination 메서드 작성
기존과 동일한 코드이므로 설명 생략

8.2.2 MovielensHook으로 DAG 빌드하기(Chap08/dags/02_hooks.py)

현재 파일 구조
. ├── 01_python.py ├── 02_hook.py ----> Chap08_02_hook DAG가 정의된 파일 ├── 03_operator.py ├── 04_sensor.py └── custom ├── hooks.py ----> Movielens Hook이 작성된 파일 ├── operators.py ├── ranking.py └── sensors.py
Python
복사
DAG 에서 ./custom/hooks.py에 정의된 Hook을 사용하기 위해서는 다음과 같이 정의한 Hook class를 import 해오면 됨.
from custom.hooks import MovielensHook
Python
복사
DAG 정의는 기존의 방식과 동일
불러들인 MovielensHook class로 instance 만들어서 사용하면 됨.
→ 이를 통해 다른 DAG에서도 사용가능해짐.
with DAG( dag_id="Chap08_02_hook", description="Fetches ratings from the Movielens API using a custom hook.", start_date=dt.datetime(2019, 1, 1), end_date=dt.datetime(2019, 1, 10), schedule_interval="@daily", ) as dag: def _fetch_ratings(conn_id, templates_dict, batch_size=1000, **_): logger = logging.getLogger(__name__) start_date = templates_dict["start_date"] end_date = templates_dict["end_date"] output_path = templates_dict["output_path"] logger.info(f"Fetching ratings for {start_date} to {end_date}") hook = MovielensHook(conn_id=conn_id) ratings = list( hook.get_ratings( start_date=start_date, end_date=end_date, batch_size=batch_size ) ) logger.info(f"Fetched {len(ratings)} ratings") logger.info(f"Writing ratings to {output_path}") # Make sure output directory exists. output_dir = os.path.dirname(output_path) os.makedirs(output_dir, exist_ok=True) with open(output_path, "w") as file_: json.dump(ratings, fp=file_) PythonOperator( task_id="fetch_ratings", python_callable=_fetch_ratings, op_kwargs={"conn_id": "movielens"}, templates_dict={ "start_date": "{{ds}}", "end_date": "{{next_ds}}", "output_path": "/opt/airflow/data/custom_hook/{{ds}}.json", }, )
Python
복사

8.2.3 Dag 실행

localhost:8080 → Chap08_02_custom dag 실행
worker node 접속 후 파일 생성 확인
# 생성된 파일 확인 docker exec - it boaz-airflow-study-airflow-worker-1 /bin/bash ls /opt/airflow/data/custom_hook
Python
복사

8.3 커스텀 오퍼레이터 빌드하기

API 호출 부분은 훅으로 잘 묶었는데, 시작/종료 부분과 날짜 정의, 파일 저장 등의 코드도 Airflow의 Custom Operator를 이용하여 재사용할 수 있음.

8.3.1 커스텀 오퍼레이터 정의하기(Chap08/dags/custom/operator.py)

Airflow에서 모든 오퍼레이터는 BaseOperator 클래스의 서브클래스(subclass)로 생성됨.
class MovielensFetchRatingsOperator(BaseOperator):
Python
복사
__init__ 메서드 정의
BaseOperator 클래스는 일반적인 동작을 정의하는 generic 인수들을 가지고 있는데, 이는 **kwargs 를 통해 전달할 수 있음.
제네릭 인수에는 task_id, retries, retry_delay와 같은 태스크 관련 인수들이 포함됨.
이 인수들은 DAG를 정의할 때 ‘default_args’ 파라미터를 통해 수정할 수 있음.
커스텀 오퍼레이터의 기본 인수들이 정상적으로 적용되었는지 확인하기 위해 apply_defaults 데코레이터를 사용할 수 있으며, 커스텀 오퍼레이터를 정의할 때 apply_defaults 를 필수적으로 사용해야 함.
@apply_defaults def __init__( self, conn_id, output_path, start_date="{{ds}}", end_date="{{next_ds}}", batch_size=1000, **kwargs, ): super(MovielensFetchRatingsOperator, self).__init__(**kwargs) self._conn_id = conn_id self._output_path = output_path self._start_date = start_date self._end_date = end_date self._batch_size = batch_size
Python
복사
execute 메서드 정의
오퍼레이터의 실제 실행 내역을 정의하는 메서드
평점 데이터를 실제로 가져와서 출력 파일로 저장하는 기능을 구현
context는 Airflow의 모든 context 변수를 포함하고 있음. (dict 객체)
template_fields 변수를 정의하여 템플릿화할 변수를 Airflow에게 알려줄 수 있음.
Airflow는 execute 메서드를 호출하기 전에 이 값들을 템플릿화함.
template_fields = ("_start_date", "_end_date", "_output_path") def execute(self, context): hook = MovielensHook(self._conn_id) try: self.log.info( f"Fetching ratings for {self._start_date} to {self._end_date}" ) ratings = list( hook.get_ratings( start_date=self._start_date, end_date=self._end_date, batch_size=self._batch_size, ) ) self.log.info(f"Fetched {len(ratings)} ratings") finally: # Make sure we always close our hook's session. hook.close() self.log.info(f"Writing ratings to {self._output_path}") # Make sure output directory exists. output_dir = os.path.dirname(self._output_path) os.makedirs(output_dir, exist_ok=True) # Write output as JSON. with open(self._output_path, "w") as file_: json.dump(ratings, fp=file_)
Python
복사

8.3.2 커스텀 오퍼레이터 빌드하기(Chap08/dags/03_operator.py)

현재 파일 구조
. ├── 01_python.py ├── 02_hook.py ├── 03_operator.py ----> Chap08_03_operator DAG가 정의된 파일 ├── 04_sensor.py └── custom ├── hooks.py ├── operators.py ----> Movielens custom operator가 작성된 파일 ├── ranking.py └── sensors.py
Python
복사
DAG 에서 ./custom/operator.py에 정의된 Custom Operator를 사용하기 위해서는 다음과 같이 정의한 Operator class를 import 해오면 됨.
from custom.operators import MovielensFetchRatingsOperator
Python
복사
DAG 정의
with DAG( dag_id="Chap08_03_operator", description="Fetches ratings from the Movielens API using a custom operator.", start_date=dt.datetime(2019, 1, 1), end_date=dt.datetime(2019, 1, 10), schedule_interval="@daily", ) as dag: MovielensFetchRatingsOperator( task_id="fetch_ratings", conn_id="movielens", start_date="{{ds}}", --> template으로 정의된 변수1 end_date="{{next_ds}}", --> template으로 정의된 변수2 output_path="/opt/airflow/data/custom_operator/{{ds}}.json", --> template으로 정의된 변수3 )
Python
복사

8.3.3 Dag 실행

localhost:8080 → Chap08_03_operator dag 실행
worker node 접속 후 파일 생성 확인
# 생성된 파일 확인 docker exec - it boaz-airflow-study-airflow-worker-1 /bin/bash ls /opt/airflow/data/custom_operator
Python
복사

8.4 커스텀 센서 빌드하기

센서 : DAG 안에서 다운스트림 태스크를 실행하기 전에 특정 조건이 충족될 때까지 대기하는 task를 정의

8.4.1 커스텀 센서 정의하기(Chap08/dags/custom/sensor.py)

Airflow에서 모든 센서는 BaseSensorOperator 클래스의 서브클래스(subclass)로 생성됨.
이 외에는 커스텀 오퍼레이터를 정의하는 것과 매우 유사함.
커스텀 센서에서는 execute 메서드 대신 poke 메서드를 구현함.
__init__ 메서드 구현
커스텀 오퍼레이터와 동일함 → 설명 생략
@apply_defaults def __init__(self, conn_id, start_date="{{ds}}", end_date="{{next_ds}}", **kwargs): super().__init__(**kwargs) self._conn_id = conn_id self._start_date = start_date self._end_date = end_date
Python
복사
poke 메서드 구현
커스텀 오퍼레이터에서 execute 를 구현해다면, 커스텀 센서에서는 poke 를 구현
return 값이 boolean으로 주어지며, 만약 False가 리턴되면, 센서가 상태를 다시 체크할 때까지 대기(wait) 상태로 들어갔다가, 상태값이 True가 되거나 타임아웃(timeout)이 될 때까지 반복함.
센서가 실행을 끝내고 True를 반환하면 다음 Task를 실행.
다음의 코드는 특정 기간 동안 평점 데이터가 존재하는지를 검사하는 작업을 구현한 것.
get_ratings을 호출 후 next를 호출하여 한 줄이라도 데이터가 있는지를 검사.
def poke(self, context): hook = MovielensHook(self._conn_id) try: next( hook.get_ratings( start_date=self._start_date, end_date=self._end_date, batch_size=1 ) ) # If no StopIteration is raised, the request returned at least one record. # This means that there are records for the given period, which we indicate # to Airflow by returning True. self.log.info( f"Found ratings for {self._start_date} to {self._end_date}, continuing!" ) return True except StopIteration: self.log.info( f"Didn't find any ratings for {self._start_date} " f"to {self._end_date}, waiting..." ) # If StopIteration is raised, we know that the request did not find # any records. This means that there a no ratings for the time period, # so we should return False. return False finally: # Make sure we always close our hook's session. hook.close()
Python
복사

8.4.2 커스텀 센서 빌드하기(Chap08/dags/04_sensor.py)

방식은 위와 동일함.
from custom.operators import MovielensFetchRatingsOperator from custom.sensors import MovielensRatingsSensor with DAG( dag_id="Chap08_04_sensor", description="Fetches ratings from the Movielens API, with a custom sensor.", start_date=dt.datetime(2019, 1, 1), end_date=dt.datetime(2019, 1, 10), schedule_interval="@daily", ) as dag: wait_for_ratings = MovielensRatingsSensor( task_id="wait_for_ratings", conn_id="movielens", start_date="{{ds}}", end_date="{{next_ds}}", ) fetch_ratings = MovielensFetchRatingsOperator( task_id="fetch_ratings", conn_id="movielens", start_date="{{ds}}", end_date="{{next_ds}}", output_path="/opt/airflow/data/custom_sensor/{{ds}}.json", ) wait_for_ratings >> fetch_ratings
Python
복사

8.4.3 Dag 실행

localhost:8080 → Chap08_04_sensor dag 실행
worker node 접속 후 파일 생성 확인
# 생성된 파일 확인 docker exec - it boaz-airflow-study-airflow-worker-1 /bin/bash ls /opt/airflow/data/custom_sensor
Python
복사

8.5 컴포넌트 패키징하기

커스텀 컴포넌트를 파이썬 패키지로 패키징해보기
앞 절에서 구현한 훅, 오퍼레이터, 센서 클래스를 포함하는 airflow_movielens 라는 패키지를 생성

8.5.1 파이썬 패키지 부트스트랩 작업하기

파일 세팅
# 경로 통일을 위한 이동 cd ./dags/Chap08/ # 패키지 폴더 생성 mkdir -p airflow-movielens cd airflow-movielens # __init__.py 파일 생성 mkdir -p src/airflow_movielens touch src/airflow_movielens/__init__.py # 파일 복사 cp ../dags/custom/*s.py ./src/airflow_movielens/
Python
복사
# 파일 구조 확인 tree . <출력> . └── src └── airflow_movielens ├── __init__.py ├── hooks.py ├── operators.py └── sensors.py
Python
복사
setup.py 파일 생성
vi setup.py
Python
복사
#!/usr/bin/env python import setuptools requirements = ["apache-airflow", "requests"] setuptools.setup( name="airflow_movielens", version="0.1.0", description="Hooks, sensors and operators for the Movielens API.", author="BOAZ", author_email="", install_requires=requirements, packages=setuptools.find_packages("src"), package_dir={"":"src"}, url="", license="MIT license" )
Python
복사

8.5.2 패키지 설치하기

cd .. python -m pip install ./airflow-movielens
Python
복사

8.5.3 패키지 실행하기

python >>> import airflow_movielens >>> airflow_movielens <module 'airflow_movielens' from '/Users/xxx/miniconda/lib/python3.10/site-packages/airflow_movielens/__init__.py'>
Python
복사

8.5.4 패키지 삭제하기

python -m pip uninstall airflow-movielens
Python
복사

참고