ํ ์คํธ๋?
ย ๊ฐ๋ฐํ ์ฝ๋๋ฅผ ๋ฐฐํฌํ ๋ ๊ฐ์ฅ ์ค์ํ ๊ฒ์?
โ ์ค์ ์๋น์ค์์ ์ ๋๋ก ๋์ํ๋ ์ง โํ
์คํธโํ๋ ๊ฒ
โข
99.99999% ์ ๊ธฐ์
์์๋ ๋ฐฐํฌ ์ ํ
์คํธ๋ฅผ ์ํํ๋ฉฐ, ๋๋ถ๋ถ ํ
์คํ
์ CI ํ์ดํ๋ผ์ธ์์ ์๋ํ๋์ด ์๋ค.
Github Actions๋ฅผ ์ด์ฉํ ํ
์คํธ UI
โข
ํตํฉ ํ
์คํธ์ ๊ฒฝ์ฐ Github Actions๋ฅผ ํตํด ๊ตฌํํ๊ธฐ๋ ํ์ง๋ง, ํ
์คํธ ๋จ๊ณ์ ๋ฐ๋ผ ์ฝ๋ ๋ ๋ฒจ์์ ๋ง์ ๋จ์ ํ
์คํธ(unit test)๋ฅผ ๊ตฌํํ๊ณ , ์ง์ ์ํํ๋ฉฐ ์คํจ์ ๋๋ฒ๊น
์ ํ๋ค.
Golang์์ ํ
์คํธ
โข
golang์์๋ ํ
์คํธ ํ๋ ์์ํฌ ๋ด์ฅ
โฆ
go test ํ๋ฉด ํ์ฌ ํด๋์ *_test.go๋ฅผ ์ธ์ํ๊ณ ์ผ๊ด ์คํ
ย ๋จ์ ํ ์คํธ? ํตํฉ ํ ์คํธ?
๋จ์ ํ
์คํธ: ๊ฐ๋ณ Task ๋จ์(๋จ์ผ ๊ธฐ๋ฅ)๋ก ํ
์คํธ๋ฅผ ๋ถํ ํ์ฌ ์ํํ๋ ๊ฒ
โข
๊ฐ ๋จ์์ ์ฌ๋ฐ๋ฅธ ๋์์ ๊ฒ์ฆํ ์ ์์ง๋ง, ์ฌ๋ฌ ๋จ์๋ก ๊ตฌ์ฑ๋ ๋ณต์กํ ์์คํ
์ ๋์์ ๋ชจ๋ ๊ฒ์ฆํ์ง๋ ์์ผ๋ฏ๋ก ์ต์ข
์ ์ผ๋ก๋ ํตํฉ ํ
์คํธ๋ฅผ ์์ฑํด์ผํจ
ย ์ Airflow์์ ํ ์คํธ๊ฐ ์ค์ํ ๊น?
โข
๋ง์ ์ธ๋ถ ์์คํ
๊ณผ ํต์ ์ด ๋ฐ์ํ๋ ํน์ฑ์ ๊ฐ์ง
โข
๋ก์ง์ ์ค์ ๋ก ์ํํ๋ Task๋ฅผ ์์ํ๊ณ ์ข
๋ฃ์ํค๋ ์ญํ (์ค์ผ์คํธ๋ ์ด์
์์คํ
)
9.1.1 ๋ชจ๋ DAG์ ๋ํ ๋ฌด๊ฒฐ์ฑ ํ ์คํธ
๋ฌด๊ฒฐ์ฑ ํ ์คํธ
๋ชจ๋ DAG์ ๋ฌด๊ฒฐ์ฑ์ ๋ํด DAG๊ฐ ์ ์์ ์ผ๋ก ๊ตฌํ๋์๋์ง ํ์ธ
โข
DAG์ ์ฌ์ดํด์ด ํฌํจ๋์ด ์์ง๋ ์์์ง
โข
DAG์ Task ID๊ฐ ๊ณ ์ ํ์ง
โข
๋ฑ๋ฑ๋ฑ
ํ ์คํธ ๊ตฌ์ฑ
ํ๋ก์ ํธ์ ์ต์๋จ ๋๋ ํ ๋ฆฌ์ ๋ณ๋์ tests/ ๋๋ ํ ๋ฆฌ๋ฅผ ์์ฑํ์ฌ ๊ฒ์ฌ ๋์ ์ฝ๋๋ฅผ ๊ทธ๋๋ก ๋ณต์ฌํ์ฌ ๊ตฌ์ฑ
ํ์๋ ์๋์ง๋งโฆ
โข
pytest๋ ์ฃผ์ด์ง ๋๋ ํ ๋ฆฌ์์ test_ || _test๋ฅผ ๊ฒ์
โข
ํ
์คํธ๋ ์ ๋๋ก ์์กด์ ์ด์ด์ ์ ๋๋ค.
ํ ์คํธ ์์
# DAG_PATH: ํ
์คํธ๋ฅผ ํ DAG์ ์์น, DAG_FILES: DAG_PATH ์์ ์๋ ํ์ผ๋ค์ list
DAG_PATH = os.path.join(os.path.dirname(__file__), "..", "..", "dags/**.py")
DAG_FILES = glob.glob(DAG_PATH)
@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
"""Import DAG files and check for DAG."""
module_name, _ = os.path.splitext(dag_file) # ๋ชจ๋์ ์ ๋๊ฒฝ๋ก ํฌํจ ์ด๋ฆ, ํ์ฅ์
module_path = os.path.join(DAG_PATH, dag_file) # ํ
์คํธํ DAG์ ์์น
mod_spec = importlib.util.spec_from_file_location(module_name, module_path) # ๋ชจ๋์ ๊ฐ์ ธ์ด
module = importlib.util.module_from_spec(mod_spec)
mod_spec.loader.exec_module(module)
# ํ์ผ ์ค DAG ์ถ์ถ
dag_objects = [var for var in vars(
module).values() if isinstance(var, DAG)]
# ๊ฒ์ฌ 1: ๋ชจ๋ ํ์ด์ฌ ํ์ผ์์ DAG ๊ฐ์ฒด๊ฐ ์กด์ฌํ๋์ง ์ ํจ์ฑ ๊ฒ์ฌ
assert dag_objects
# ๊ฒ์ฌ 2: DAG ๊ฐ์ฒด์ ์ํ ์ฃผ๊ธฐ ์กด์ฌ ์ฌ๋ถ ํ์ธ
for dag in dag_objects:
# Test cycles
dag.test_cycle()
Python
๋ณต์ฌ
โข
๊ฒ์ฌ 1: ๋ชจ๋ ํ์ด์ฌ ํ์ผ์์ DAG ๊ฐ์ฒด๊ฐ ์กด์ฌํ๋์ง ์ ํจ์ฑ ๊ฒ์ฌ
ย assert? raise?
โฆ
assert [์กฐ๊ฑด], [์๋ฌ ๋ฉ์์ง]: ์กฐ๊ฑด์ด ์ฐธ์ด๋ฉด ๊ทธ๋๋ก ์งํ, ๊ฑฐ์ง์ด๋ฉด ์๋ฌ ๋ฉ์์ง ์ถ๋ ฅ
โฆ
[์กฐ๊ฑด]์ ๋ณด์ฅํจ!
โข
๊ฒ์ฌ 2: DAG ๊ฐ์ฒด์ ์ํ ์ฃผ๊ธฐ ์กด์ฌ ์ฌ๋ถ ํ์ธ
โข
๊ทธ ์ธโฆ
โฆ
DAG์ ๊ธฐ๋ฅ์ด๋ ๊ตฌํ๊ณผ ๊ด๋ จ๋ ๊ฒ์ฆ์ ํ ์๋ ์๊ณ , ๋จ์ ํ์ด์ฌ์ ์ด์ฉํ ํ
์คํธ๋ฅผ ํ ์๋ ์์!
โฆ
ex) ๋ญํ๋ ์ ์ผ๊น์?
assert dag.dag_id.startswith((โimportโ, โexportโ))
Python
๋ณต์ฌ
์ค์ต
1.
์ ์ ๋ฐ ์คํ
docker exec -it boaz-airflow-study-airflow-worker-1 /bin/bash
Shell
๋ณต์ฌ
cd /opt/airflow/dags/Chap09/tests/dags
pytest test_dag_integrity.py
Shell
๋ณต์ฌ
ย ๋๋ฌด ๊ธธ์ด์!!!
โข
์๋ ๊ฒฐ๋ก ์ ๋ง์ง๋ง์ ์์ด..
================================================ short test summary info ================================================
FAILED test_dag_integrity.py::test_dag_integrity[/opt/airflow/dags/Chap09/tests/dags/../../dags/bash_operator_no_command.py] - airflow.exceptions.AirflowException: missing keyword argument 'bash_command'
FAILED test_dag_integrity.py::test_dag_integrity[/opt/airflow/dags/Chap09/tests/dags/../../dags/dag_cycle.py] - AttributeError: 'DAG' object has no attribute 'test_cycle'
FAILED test_dag_integrity.py::test_dag_integrity[/opt/airflow/dags/Chap09/tests/dags/../../dags/duplicate_task_ids.py] - airflow.exceptions.DuplicateTaskIdFound: Task id 'task' has already been added to the DAG
FAILED test_dag_integrity.py::test_dag_integrity[/opt/airflow/dags/Chap09/tests/dags/../../dags/testme.py] - airflow.exceptions.DuplicateTaskIdFound: Task id 'test2' has already been added to the DAG
============================================ 4 failed, 11 warnings in 0.55s =============================================
Shell
๋ณต์ฌ
2.
๋ก๊ทธ๋ฅผ ๋ณด๊ณ ย ๊ฐ์ ์๋ฌ ์ฐพ๊ธฐ
1) bash_operator_no_command.py
import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash import BashOperator
dag = DAG(
dag_id="chapter8_bash_operator_no_command",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval=None,
)
BashOperator(task_id="this_should_fail", dag=dag)
Python
๋ณต์ฌ
Test failure log
2) dag_cycle.py
import airflow.utils.dates
from airflow import DAG
from airflow.operators.dummy import DummyOperator
dag = DAG(
dag_id="chapter8_dag_cycle",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval=None,
)
t1 = DummyOperator(task_id="t1", dag=dag)
t2 = DummyOperator(task_id="t2", dag=dag)
t3 = DummyOperator(task_id="t3", dag=dag)
t1 >> t2 >> t3 >> t1
Python
๋ณต์ฌ
Test failure log
3) duplicate_task_ids.py
import airflow.utils.dates
from airflow import DAG
from airflow.operators.dummy import DummyOperator
dag = DAG(
dag_id="chapter8_duplicate_task_ids",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval=None,
)
t1 = DummyOperator(task_id="task", dag=dag)
for i in range(5):
DummyOperator(task_id="task", dag=dag) >> t1
Python
๋ณต์ฌ
Test failure log
4) testme.py
import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
dag = DAG(
dag_id="testme", start_date=airflow.utils.dates.days_ago(3), schedule_interval=None
)
t1 = DummyOperator(task_id="test", dag=dag)
for tasknr in range(5):
BashOperator(task_id="test2", bash_command=f"echo '{tasknr}'", dag=dag) >> t1
Python
๋ณต์ฌ
Test failure log
9.1.2 CI/CD ํ์ดํ๋ผ์ธ ์ค์ ํ๊ธฐ
CI/CD ํ์ดํ๋ผ์ธ?
์ฝ๋ ์ ์ฅ์๋ฅผ ํตํด ์ฝ๋๊ฐ ๋ณ๊ฒฝ๋ ๋ ์ฌ์ ์ ์๋ ์คํฌ๋ฆฝํธ๋ฅผ ์คํํ๋ ์์คํ
โข
CI(Continuous integration): ๋ณ๊ฒฝ๋ ์ฝ๋๊ฐ ์ฝ๋ฉ ํ์ค๊ณผ ํ
์คํธ ์กฐ๊ฑด์ ์ค์ํ๋ ์ง ํ์ธ/๊ฒ์ฆ
โฆ
Flake8, Pylint, Black ๋ฑ์ ์ฝ๋ ๊ฒ์ฌ๊ธฐ๋ฅผ ํตํด ์ฝ๋๋ฅผ ๋ ํฌ์งํ ๋ฆฌ์ ํธ์ํ ๋ ์ฝ๋์ ํ์ง ํ์ธ ๋ฐ ํ
์คํธ ์คํ
โข
CD(Continuous deployment): ์ฌ๋์ ๊ฐ์ญ ์์ด ์์ ํ ์๋ํ๋ ์ฝ๋๋ฅผ ํ๋ก๋์
์์คํ
์ ์๋์ผ๋ก ๋ฐฐํฌํ๋ ๊ฒ
โฆ
์๋์ผ๋ก ๊ฒ์ฆํ๊ณ ๋ฐฐํฌํ์ง ์๊ณ ๋ ์์ฐ์ฑ ๊ทน๋ํ
โข
์์
.github/workflows/control-image-build.yml
name: Docker Image CI(Control server)
on:
push:
paths:
- "resources/images/control-ubuntu/**"
branches:
- "main"
jobs:
build-and-push-image:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
- name: Login to DockerHub
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Build and push
uses: docker/build-push-action@v2
with:
context: ./resources/images/control-ubuntu/
file: ./resources/images/control-ubuntu/Dockerfile
platforms: linux/amd64
push: true
tags: ryann3/control-ubuntu:4.0
YAML
๋ณต์ฌ
main ๋ธ๋์น์ ๋จธ์ง๋ ์ปค๋ฐ ๋จ์๋ก workflow๊ฐ ์คํ๋๋ค
โฆ
์ ์ํ build-and-push-image job์ step ๋ณ๋ก ์คํ๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
์คํจํ ๊ฒฝ์ฐ
Pytest with airflow
โข
์ถํ ํ
์คํธ ์์ !
name: Python tatic checks and tests for dags
on:
push:
paths:
- "dags/Chap09/**"
branches:
- "main"
jobs:
test-and-check:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup Python
uses: actions/setup-python@v2
with:
python-version: "3.6.9"
- name: Install Flake8
run: pip install flake8
- name: Run Flake8
run: flake8
- name: Install Pylint
run: pip install pylint
- name: Run Pylint
run: find dags/Chap09/tests -name "*.py" | xargs pylint --output-format=colorized
- name: Install Black
run: pip install black
- name: Run Black
run: find dags/Chap09/tests -name "*.py" | xargs black --check
- name: Install dependencies
run: pip install apache-airflow pytest
- name: Test DAG integrity
run: pytest dags/Chap09/tests
YAML
๋ณต์ฌ
9.1.3 ๋จ์ ํ ์คํธ ์์ฑํ๊ธฐ
์ง๋ ์๊ฐ ์ค์ต์ ๋ ์ฌ๋ ค๋ณด์โฆ
...
def get_ratings(self, start_date=None, end_date=None, batch_size=100):
"""
Fetches ratings between the given start/end date.
Parameters
----------
start_date : str
Start date to start fetching ratings from (inclusive). Expected
format is YYYY-MM-DD (equal to Airflow's ds formats).
end_date : str
End date to fetching ratings up to (exclusive). Expected
format is YYYY-MM-DD (equal to Airflow's ds formats).
batch_size : int
Size of the batches (pages) to fetch from the API. Larger values
mean less requests, but more data transferred per request.
"""
yield from self._get_with_pagination(
endpoint="/ratings",
params={"start_date": start_date, "end_date": end_date},
batch_size=batch_size,
)
...
Python
๋ณต์ฌ
โข
get_ratings์์ batch size๊ฐ ์์๋ผ๋ฉด?!?!
1.
์ํค์ด ์กฐ์ก๋ค~
2.
์ฌ์ฉ์์ ์
๋ ฅ ๊ฐ์ ์ ๋ฌํ๊ธฐ ์ ์ ํด๋น ๊ฐ์ด ์ ํจํ์ง๋ฅผ ๋จผ์ ํ๋จํด๋ณผ๊น?
๋๊ฐ ํ์ฌ์์ ๋ ์์จ๋ฐ์๊น์?
์์: ์ฃผ์ด์ง ๋ ๋ ์ง ์ฌ์ด์ ์์ N๊ฐ์ ์ธ๊ธฐ ์ํ๋ฅผ ๋ฐํ
import datetime as dt
import logging
import json
import os
from airflow import DAG
from airflow.operators.python import PythonOperator
from collections import defaultdict, Counter
from custom.hooks import MovielensHook
class MovielensPopulatriyOperator:
def __init__(self, conn_id, start_date, end_date, min_ratings=4, top_n=5, **kwargs):
super().__init__(**kwargs)
self._conn_id = conn_id
self._start_date = start_date
self._end_date = end_date
self._min_ratings = min_ratings
self._top_n = top_n
def execute(self, context):
with MovielensHook(self._conn_id) as hook:
ratings = hook.get_ratings(
start_date=self._start_date, end_date=self._end_date,)
rating_sums = defaultdict(Counter)
for rating in ratings:
rating_sums[rating["movieId"]].update(
count=1, rating=rating["rating"])
averages = {
movie_id: (rating_counter["rating"]/rating_counter["count"],
rating_counter["count"]
)
for movie_id, rating_counter in rating_sums.items()
if rating_counter["count"] >= self._min_ratings
}
return sorted(
averages.items(),
key=lambda x: x[1],
reverse=True
)[:self._top_n]
Python
๋ณต์ฌ
ย MovielensPopulatriyOperator์ ์ ํ์ฑ์ ์ด๋ป๊ฒ ํ ์คํธํด์ผํ ๊น?
โ ์ฃผ์ด์ง ๊ฐ์ผ๋ก ์คํผ๋ ์ดํฐ๋ฅผ ์คํํ๊ณ ๊ฒฐ๊ณผ๊ฐ ์์๋๋ก์ธ์ง ํ์ธ
โข
์ด๋ฅผ ์ํ์ฌ ๋ช ๊ฐ์ง pytest ์ปดํฌ๋ํธ๋ค์ด ํ์ํจ
9.1.4 Pytest ํ๋ก์ ํธ ๊ตฌ์ฑํ๊ธฐ
โข
BashOperator๋ฅผ ํ
์คํธํ๋ ์์
def test_example():
task = BashOperator(
task_id="test",
bash_command="echo 'hello!'",
xcom_push=True,
)
result = task.execute(context={})
assert result == "hello!"
Python
๋ณต์ฌ
โข
MovielensPopularityOperator์ ์ ์ฉ
def test_movielenspopularityoperator(mocker):
task = MovielensPopularityOperator(
task_id="test_id",
# conn_id="testconn", # ํ์ ์ธ์์ด๋ฏ๋ก ์์ ๋ฉด test์ ์๋ฌ ๋ฐ์
start_date="2015-01-01",
end_date="2015-01-03",
top_n=5,
)
result = task.execute(context=None)
assert len(result) == 5
Python
๋ณต์ฌ
ย ํ ์คํธ ๊ฒฐ๊ณผ๊ฐ์ ๋ค๋ฅธ ์ชฝ์ ์ ๋ฌํ ์ ์์๊น?
โข
ํ
์คํธ๋ ๊ฒฉ๋ฆฌ๋ ํ๊ฒฝ์์ ์งํ๋๊ธฐ ๋๋ฌธ์ ์ด๋ค ํ
์คํธ์ ๊ฒฐ๊ณผ๋ ๋ค๋ฅธ ํ
์คํธ์ ์ํฅ์ ์ค ์ ์๋ค.
โข
๋ฐ๋ผ์ ํ
์คํธ ์ฌ์ด์ ๋ฐ์ํ๋ ์ ๋ณด๋ฅผ ๋ชฉ์
(Mockup)์ ์ด์ฉํ์ฌ ์ ๋ฌํ๋ค.
๋ชฉ์ ์ด๋
ํน์ ์์
์ด๋ ๊ฐ์ฒด๋ฅผ ๋ชจ์กฐ๋ก ๋ง๋๋ ๊ฒ
์ค์ ์ด์ ํ๊ฒฝ์์๋ ์์๋์ง๋ง ํ
์คํธ ์ค์๋ ๋ฐ์ํ์ง ์๋ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ๋ํ ํธ์ถ์ ์ค์ ๋ก ๋ฐ์์ํค์ง ์๊ณ ํน์ ๊ฐ์ ๋ฐํํ๋๋ก ํ์ด์ฌ ์ฝ๋๋ก ์ง์ํ์ฌ ์์์ ๊ฐ์ ์ ๋ฌํจ
โข
์ธ๋ถ ์์คํ
์ด ์ค์ ๋ก ์ฐ๊ฒฐํ์ง ์๊ณ ๋ ํ
์คํธ๋ฅผ ๊ฐ๋ฐํ๊ณ ์คํํ ์ ์์
def test_movielenspopularityoperator(mocker):
mock_get = mocker.patch.object(
MovielensHook, # BaseHook์ผ๋ก ํ๋ฉด ์ ๋จ(์ ์ ์์น x ํธ์ถ ์์น o)
"get_connection",
return_value=Connection(
conn_id="test", login="airflow", password="airflow"),
)
task = MovielensPopularityOperator(
task_id="test_id",
conn_id="test",
start_date="2015-01-01",
end_date="2015-01-03",
top_n=5,
)
result = task.execute(context=None)
assert len(result) == 5
assert mock_get.call_count == 1 # ํ ๋ฒ๋ง ํธ์ถ๋ ๊ฒ์ธ์ง ํ์ธ
mock_get.assert_called_with("test") # ์์๋๋ conn_id๋ก ๊ฒ์ฆ
Python
๋ณต์ฌ
cd /opt/airflow/dags/Chap09/tests/dags/chapter9/custom
pytest test_operators.py
Shell
๋ณต์ฌ
Test log
9.1.5 ๋์คํฌ ํ์ผ๋ก ํ ์คํธํ๊ธฐ
JSON ๋ฆฌ์คํธ๋ฅผ ๊ฐ์ง ํ์ผ์ ์ฝ๊ณ ์ด๋ฅผ CSV ํ์์ผ๋ก ์ฐ๋ ์คํผ๋ ์ดํฐ!
Operator
class JsonToCsvOperator(BaseOperator):
def __init__(self,input_path, output_path, **kwargs):
super().__init__(**kwargs)
self._input_path = input_path
self._output_path = output_path
def execute(self, context):
with open(self._input_path, "r") as json_file:
data=json.load(json_file)
columns={key for row in data for key in row.keys()}
with open(self._output_path, "w") as csv_file:
writer=csv.DictWriter(csv_file, fieldnames=columns)
writer.writeheader()
writer.writerows(data)
Python
๋ณต์ฌ
Test
โข
ํฝ์ค์ฒ(fixture): ์ ์ธํย ํจ์๋ฅผย ํ
์คํธย ํจ์์์ย ์ธ์๋กย ๋ฐ์ย ์ฌ์ฉํ ย ์ย ์๊ฒย ํด์ฃผ๋ย ๊ธฐ๋ฅ(ex. tmp_path ์ธ์)
1) ํ
์คํธ ํจ์ A๋ฅผ ์์ฑํ๊ณ
2) A์ fixture ๋ฅผ ์ ์ธํ๋ฉด,
3) ๋ค๋ฅธ ํ
์คํธ ํจ์์์ A์ ๊ฒฐ๊ณผ๋ฅผ ๋ฐ์์ ์ฌ์ฉํ ์ ์๋ค.
def test_json_to_csv_operator(tmp_path: Path):
print(tmp_path.as_posix())
input_path = tmp_path / "input.json"
output_path = tmp_path / "output.csv"
# Write input data to tmp path
input_data = [
{"name": "bob", "age": "41", "sex": "M"},
{"name": "alice", "age": "24", "sex": "F"},
{"name": "carol", "age": "60", "sex": "F"},
]
with open(input_path, "w") as f:
f.write(json.dumps(input_data))
# Run task
operator = JsonToCsvOperator(
task_id="test", input_path=input_path, output_path=output_path
)
operator.execute(context={})
# Read result
with open(output_path, "r") as f:
reader = csv.DictReader(f)
result = [dict(row) for row in reader]
# Assert
assert result == input_data
Python
๋ณต์ฌ
cd /opt/airflow/dags/Chap09/tests/airflowbook/operators
pytest test_json_to_csv_operator.py
Shell
๋ณต์ฌ
Test log
ํ์ผ ์ด๋ฆ์ด static์ด๋ผ๋.. ๋๋ฌด dumb ํด์!
โ ๋ค์ ์น์
์ ๋๊ธฐํ์ธ์
9.2 ํ ์คํธ์์ DAG ๋ฐ ํ ์คํฌ ์ฝํ ์คํธ๋ก ์์ ํ๊ธฐ
โข
์คํผ๋ ์ดํฐ๋ ๋ณดํต ์คํ์ ์ํด ๋ ๋ง์ ์ฝํ
์คํธ(๋ณ์ ํ
ํ๋ฆฟ) ๋๋ ์์
์ธ์คํด์ค ์ฝํ
์คํธ๊ฐ ํ์ํ๋ค
Task๊ฐ ์คํ๋ ๋ ์ํ๋๋ ๋จ๊ณ
โข
operator.execute(context={}) ํํ๋ณด๋ค ๋ ์ค์ ์์คํ
์ ๊ฐ๊น๊ฒ Task๋ฅผ ์คํํด์ผํ๋ค
โข
์ ๊ทธ๋ฆผ์์ output_path ์ธ์๋ฅผ /output/{{ ds }}.json ํํ๋ก ์ ๊ณตํ ์ ์์ง๋ง context={}์ผ๋ก ํ
์คํธํ๋ฉด ds ๋ณ์๋ฅผ ์ฌ์ฉํ ์ ์๋ค.
โ ๋ฐ๋ผ์ airflow ์์ฒด์์ Task๋ฅผ ์์ํ ๋ ์ฌ์ฉํ๋ ์ค์ ๋ฉ์๋์ธ operator.run()์ ํธ์ถํจ
โข
๊ธฐ๋ณธ DAG
dag = DAG(
"test_dag",
default_args={"owner": "airflow", "start_date": datetime.datetime(2019, 1, 1)},
schedule_interval="@daily",
)
Python
๋ณต์ฌ
โข
ํ
ํ๋ฆฟ ๋ณ์๋ฅผ ํ ๋นํ๊ธฐ ์ํด DAG๋ก ํ
์คํธ
def test_movielens_operator(tmp_path: Path, mocker: MockFixture):
mocker.patch.object(
MovielensHook,
"get_connection",
return_value=Connection(conn_id="test", login="airflow", password="airflow"),
)
dag = DAG(
"test_dag",
default_args={"owner": "airflow", "start_date": datetime.datetime(2019, 1, 1)},
schedule_interval="@daily",
)
task = MovielensDownloadOperator(
task_id="test",
conn_id="testconn",
start_date="{{ prev_ds }}",
end_date="{{ ds }}",
output_path=str(tmp_path / "{{ ds }}.json"),
dag=dag,
)
dag.clear()
task.run(
start_date=dag.default_args["start_date"],
end_date=dag.default_args["start_date"],
ignore_ti_state=True,
)
Python
๋ณต์ฌ
์ฌ์ฌ์ฉ
DAG๋ฅผ ์ฌ์ฉํ๋ ํ
์คํธ๊ฐ ์ฌ๋ฌ๋ฒ ์๋ ๊ฒฝ์ฐ Pytest์ ํจ๊ป ์ฌ์ฌ์ฉํ ์ ์๋ค
โข
conftest.py ์ฌ์ฉํ์ฌ (ํ์)๋๋ ํ ๋ฆฌ์ ์ฌ๋ฌ ํ์ผ ์ฌ์ฌ์ฉ
@pytest.fixture
def test_dag():
return DAG(
"test_dag",
default_args={"owner": "airflow",
"start_date": datetime.datetime(2015, 1, 1)},
schedule_interval="@daily",
)
Python
๋ณต์ฌ
โข
ํ
์คํธ์ fixture ํฌํจํ์ฌ ํ์ ๊ฐ์ฒด ์์ฑ
โฆ
task.run(): DAG์ schedule_interval์ ๋ฐ๋ผ ์ง์ ๋ ๋ ๋ ์ง ์ฌ์ด์ ์คํํ Task์ ์ธ์คํด์ค ์ค์ผ์ค ์ฃผ๊ธฐ ๊ณ์ฐ
def test_movielens_operator(tmp_path: Path, mocker: MockFixture):
mocker.patch.object(
MovielensHook,
"get_connection",
return_value=Connection(
conn_id="test", login="airflow", password="airflow"),
)
dag = DAG(
"test_dag",
default_args={"owner": "airflow",
"start_date": datetime.datetime(2019, 1, 1)},
schedule_interval="@daily",
)
task = MovielensDownloadOperator(
task_id="test",
conn_id="testconn",
start_date="{{ prev_ds }}",
end_date="{{ ds }}",
output_path=str(tmp_path / "{{ ds }}.json"),
dag=dag,
)
dag.clear()
task.run(
start_date=dag.default_args["start_date"],
end_date=dag.default_args["start_date"],
ignore_ti_state=True,
)
Python
๋ณต์ฌ
9.2.1 ์ธ๋ถ ์์คํ ์์
MovieLens ํ์ ์ ์ฝ๊ณ ๊ฒฐ๊ณผ๋ฅผ Postgres ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ ์ฅํ๋ ์คํผ๋ ์ดํฐ๋ฅผ ๋ง๋ค์ด๋ณด์.
class MovielensToPostgresOperator(BaseOperator):
template_fields=("_start_date", "_end_date", "_insert_query")
def __init__(self,movielens_conn_id, start_date, end_date, insert_query, postgres_conn_id, **kwargs):
super().__init__(**kwargs)
self._movielens_conn_id = movielens_conn_id
self._start_date = start_date
self._end_date = end_date
self._insert_query = insert_query
self._postgres_conn_id = postgres_conn_id
def execute(self, context):
with MovielensHook(self._movielens_conn_id) as movielens_hook:
ratings=list(movielens_hook.get_ratings(start_date=self._start_date, end_date=self._end_date))
postgres_hook=PostgresHook(postgres_conn_id=self._postgres_conn_id)
insert_queries=[
self._insert_query.format(",".join([str(_[1]) for _ in sorted(rating.items())])
) for rating in ratings
]
postgres_hook.run(insert_queries)
Python
๋ณต์ฌ
_insert_query.format(",".join([str(_[1]) for _ in sorted(rating.items())])
ํ ์คํธ ๋จ๊ณ์์ Postgres ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ ๊ทผํ ์ ์๋ค๋ฉด?
โข
pytest์์ ํ
์คํธ๋ฅผ ํ ์ ์๋๋ก ๋์ปค ์ปจํ
์ด๋์ ์ ์ด๊ฐ ๊ฐ๋ฅํ ์ด๋ฐ์ ๋ฐ ํ์ด์ฌ ํจํค์ง๋ค์ด ์๋ค.
โฆ
pytest-docker-tools
โฆ
ํ
์คํธ ๋์ปค ์ปจํ
์ด๋๋ฅผ ์์ฑํ ์ ์๋ ์ฌ๋ฌ ํจ์ ์ ๊ณต
โ ๋์ปค ์ปจํ
์ด๋๋ก ํ
์คํธํ๋ฉด ๋ชฉ์
ํ๊ฒฝ์ ๊ตฌ์ฑํ ํ์ ์์ด ์ค์ ํ
๋ฉ์๋๋ฅผ ์ฌ์ฉํ ์ ์๋ค!
ํ ์คํธ
postgres_image = fetch(repository="postgres:11.1-alpine")
postgres = container(
image="{postgres_image.id}",
environment={"POSTGRES_USER": "testuser", "POSTGRES_PASSWORD": "testpass"},
ports={"5432/tcp": None}, # ์ปจํ
์ด๋์ 5432 ํฌํธ๋ฅผ ํธ์คํธ์ ์์์ ์ด๋ฆฐ ํฌํธ(None)์ผ๋ก ๋งคํ
volumes={
os.path.join(os.path.dirname(__file__), "postgres-init.sql"): {
"bind": "/docker-entrypoint-initdb.d/postgres-init.sql"
}
},
)
def test_movielens_to_postgres_operator(mocker, test_dag, postgres):
mocker.patch.object(
MovielensHook,
"get_connection",
return_value=Connection(
conn_id="test", login="airflow", password="airflow"),
)
mocker.patch.object(
PostgresHook,
"get_connection",
return_value=Connection(
conn_id="postgres",
conn_type="postgres",
host="localhost",
login="testuser",
password="testpass",
port=postgres.ports["5432/tcp"][0], # ํธ์คํธ์ ํ ๋น๋ ํฌํธ
),
)
task = MovielensToPostgresOperator(
task_id="test",
movielens_conn_id="movielens_id",
start_date="{{ prev_ds }}",
end_date="{{ ds }}",
postgres_conn_id="postgres_id",
insert_query=(
"INSERT INTO movielens (movieId,rating,ratingTimestamp,userId,scrapeTime) "
"VALUES ({0}, '{{ macros.datetime.now() }}')"
),
dag=test_dag,
)
pg_hook = PostgresHook()
row_count = pg_hook.get_first("SELECT COUNT(*) FROM movielens")[0]
assert row_count == 0
pytest.helpers.run_airflow_task(task, test_dag)
row_count = pg_hook.get_first("SELECT COUNT(*) FROM movielens")[0]
assert row_count > 0
Python
๋ณต์ฌ
โข
pytest_docker_tools์ container ํจ์๋ fixture์ด๊ธฐ ๋๋ฌธ์ ํ
์คํธ๋ฅผ ์ํด ์ธ์๋ก ์ ๊ณตํด์ผ ํธ์ถํ ์ ์๋ค.
โข
/docker-entrypoint-initdb.d ๋๋ ํฐ๋ฆฌ์ ๋ฐฐ์นํ์ฌ ์์ ์คํฌ๋ฆฝํธ๋ก ์ปจํ
์ด๋๋ฅผ ์ด๊ธฐํํ ์ ์์(์ปจํ
์ด๋ ๋ถํ
์ ์คํ)
cd /opt/airflow/dags/Chap09/tests/airflowbook/operators
pytest test_movielens_operator_postgres.py
Shell
๋ณต์ฌ
Test log
9.3 ๊ฐ๋ฐ์ ์ํด ํ ์คํธ ์ฌ์ฉํ๊ธฐ
โข
๋ค๋ค ์ฉ ์ฐ์ด์ ๋๋ฒ๊น
ํด๋ณด์
จ์ฃ ?
โข
PyCharm, VSCode ๋ฑ ์ฌ๋งํ IDE๋ ๋ค ๊ฐ๋ฅํฉ๋๋ค
โข
์ผ์ข
์ breakpoint๋ฅผ ๋ง๋๋ ๊ฑด๋ฐ, ํด๋น ์ฝ๋ ๋ผ์ธ๊น์ง ๋๋ฌํ๋ฉด
1.
๋ณ์์ ํ์ฌ ์ํ(๊ฐ)
2.
ํด๋น ์์ ์์ ์ฝ๋ ์คํ ๊ฐ๋ฅ(๋ค์ ์ค, โฆ)
โข
IDE์์ ์ฌ์ฉํ ์ ์๋ ๊ฒฝ์ฐ pdb(Python Debugger)๋ผ๋ ๋ด์ฅ ๋๋ฒ๊ฑฐ๊ฐ ์์ด์ ๋๋ฒ๊ทธํ๋ ค๋ ์์น์ ๋ค์ ์ฝ๋ ๋ผ์ธ์ ์ถ๊ฐํ๋ฉด ๋จ
import pdb; pdb.set_trace()
Python
๋ณต์ฌ
โข
airflow์ ์ ์ฉํ๊ธฐ ์ํด์๋ airflow tasks test๋ฅผ ์ด์ฉํ์ฌ ๋ฉํ์คํ ์ด์ ์คํ ๊ธฐ๋ก์ ๋ฑ๋กํ์ง ์๊ณ Task๋ฅผ ์คํํ์ฌ ํ
์คํธ!
airflow tasks test [dagid] [taskid] [execution date]
Python
๋ณต์ฌ
9.3.1 DAG ์๋ฃ ํ ์คํธํ๊ธฐ
โข
Task ์ธ์คํด์ค ์ฝํ
์คํธ๋ฅผ ํฌํจํ ํ
์คํธ, ๋ก์ปฌ ํ์ผ ์์คํ
์ ์ด์ํ ์คํผ๋ ์ดํฐ, ๋์ปค๋ฅผ ์ด์ฉํ ์ธ๋ถ ์์คํ
์ ์ฌ์ฉํ๋ ์คํผ๋ ์ดํฐ ํ
์คํธ ๋ฑ๋ฑโฆ
โข
์ด๋ ๋ชจ๋ ๋จ์ผ ์คํผ๋ ์ดํฐ ํ
์คํธ์ ์ค์ ์ ๋์ง๋งโฆ
ย ์ํฌํ๋ก ๊ฐ๋ฐ์์ ๊ฐ์ฅ ์ค์ํ ๊ฒ์ ๋ชจ๋ ๊ตฌ์ฑ ์๊ฑด์ด ์ ๋ง๋์ง ํ์ธํ๋ ๊ฒ ์๋๊ฐ์?
โข
๊ทธ๋ฌ๋ ์ด๊ฒ์ ๋๋ตํ๊ธฐ ์ด๋ ต์ต๋๋คโฆ
โข
๊ฐ์ธ ์ ๋ณด ๋ณดํธ ๊ท์ ์ด๋ ๋ฐ์ดํฐ ํฌ๊ธฐ๋ก ์ธํด ๊ฐ๋ฐ ํ๊ฒฝ์์ ์๋ฒฝํ ํ๋ก๋์
ํ๊ฒฝ์ ์๋ฎฌ๋ ์ด์
ํ๊ธฐ๋ ์ด๋ ค์(๋ฐ์ดํฐ๊ฐ ํํ๋ฐ์ดํธ ๋จ์๋ผ๋ฉด..?)
โข
๋ฐ๋ผ์ sw๋ฅผ ๊ฐ๋ฐํ๊ณ ๊ฒ์ฆํ๋ ๋ฐ ์ฌ์ฉํ ์ ์๋ ๊ฐ๋ฅํ ํ ๊ตฌํํ ์ ์๋ ํ๋ก๋์
ํ๊ฒฝ์ ๋ง๋ค์ด์ ํ
์คํธํจ
9.4 Whirl์ ์ด์ฉํ ํ๋ก๋์ ํ๊ฒฝ ์๋ฎฌ๋ ์ด์
โข
๋์ปค์์ ์ด์ ํ๊ฒฝ์ ๋ชจ๋ ๊ตฌ์ฑ ์์๋ฅผ ์๋ฎฌ๋ ์ด์
โข
๋์ปค ์ปดํฌ์ฆ๋ก ๊ตฌ์ฑ ์์ ๊ด๋ฆฌ
โข
๊ทธ๋ฌ๋ ๋ชจ๋ ๊ฒ์ด ๋์ปค ์ด๋ฏธ์ง๋ก ์ ๊ณต๋์ง๋ ์๋๋ค๋ ๋จ์ ์ด ์์
9.5 DTAP ํ๊ฒฝ ์์ฑ
DTAP๋?
Development, Test, Acceptance, Production์ ๋ถ๋ฆฌํ๋ ์์คํ
๊ฒฉ๋ฆฌ๋ DTAP ํ๊ฒฝ์ ์๋ก ์ค์ , ๊ตฌํํ์ฌ ํ
์คํธ (์ค์ ๋ก ๊ฐ๋ฅํ ๋ฐ์ง๋โฆ)
Github Action ๋ง๋ณด๊ธฐ
โข
์ฑ
์๋ ์ด๋ก ๋ง ์์ง๋งโฆ
์ฑ ์ ์๋ ๊ฑฐ
โข
์ด๊ฑด ํ์ค์ ์ผ๋ก ์ง๊ธ ์ฐ๋ฆฌ๊ฐ ํ๊ธฐ ์ด๋ ต์ต๋๋คโฆ
name: Python static checks and tests for dags
on:
push:
paths:
- "dags/Chap09/**"
branches:
- "main"
jobs:
test-and-check:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup Python
uses: actions/setup-python@v2
with:
python-version: "3.6.9"
- name: Install Flake8
run: pip install flake8
- name: Run Flake8
run: flake8
- name: Install Pylint
run: pip install pylint
- name: Run Pylint
run: find dags/Chap09/tests -name "*.py" | xargs pylint --output-format=colorized
- name: Install Black
run: pip install black
- name: Run Black
run: find dags/Chap09/tests -name "*.py" | xargs black --check
- name: Install dependencies
run: pip install apache-airflow pytest
- name: Test DAG integrity
run: pytest dags/Chap09/tests
YAML
๋ณต์ฌ
Black
โข
์ฝ๋ ์คํ์ผ ํต์ผํ๊ธฐ
โข
๋ค๋ค ์ฌ์ฉํ๋ ์ฝ๋ ํฌ๋งทํฐ๋ ๋ค๋ฅด๊ณ (์์ ์ ์ฐ์ค ์๋ ์๊ณ ) ํด์ ํจ์ค
Pylint
โข
๋ฆฐํธ(lint) ๋๋ย ๋ฆฐํฐ(linter)๋ผ๊ณ ๋ถ๋ฆ
โข
์๋ฌ, ์คํ์ผ ๋ฑ ๊ฒ์ฌ
Flake8
โข
pep8 ๊ธฐ๋ฐ ์ฝ๋ฉ ์ปจ๋ฒค์
์ ์ค์
๋น๊ต
์ฌ์ฉ?
์ ํฌ ํ์ฌ๋ Python ๊ธฐ๋ฐ ๋ชจ๋
ธ๋ ํฌ์ด๋ค๋ณด๋ ์ธ๊ฐ์ง๋ฅผ ์ ๋ถ ์ฌ์ฉํ๊ณ ์์ด๋๋ค
โข
๋ชจ๋
ธ๋ ํฌ?
โฆ
์ฌ๋ฌ ๊ฐ์ ํ๋ก์ ํธ๋ฅผ ํ ์ ์ฅ์์ ์ ์ฅํ๋ sw ๊ฐ๋ฐ ์ ๋ต
โช
์ค๋ณต๋๋ ์ฝ๋
โช
์๋ก ๋ค๋ฅธ ํจํค์ง ์์กด์ฑ
โช
์๋ก ์์กดํ๋ ํ๋ก์ ํธ๋ค๋ผ๋ฆฌ์ ๋ฆฌํฉํฐ๋ง ๋น์ฉ
โช
์ฝ๋๊ฐ ์ ์ฅ์๋ง๋ค ์ํฉ์ด ๋ค๋ฅด๊ธฐ์ ๊ฐ๋ฐ์๋ค ์ฌ์ด์ ํ์
๋ ์ด๋ ค์์ง๋ ๋ฌธ์
ํ ์คํธ
name: Python mini test for dags
on:
push:
paths:
- "dags/Chap09/**"
branches:
- "main"
jobs:
test-and-check:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup Python
uses: actions/setup-python@v2
with:
python-version: "3.6.9"
- name: Install dependencies
run: pip install apache-airflow pytest
- name: Test DAG integrity
run: cd dags/Chap09/tests/dags && pytest test_dag_integrity.py
YAML
๋ณต์ฌ
โข
์ด๋ฏธ workflow์ ์ถ๊ฐํด๋จ์ด๋ค
โข
dags/Chap09 ๋ฐ์ ๋ณ๊ฒฝ์ฌํญ์ด ์๊ธธ ๋๋ง๋ค ์ํฌํ๋ก์ฐ๊ฐ ํธ๋ฆฌ๊ฑฐ ๋ฉ๋๋ค
CI ๋จ๊ณ
github action ์ ์ฉ pytest๋ฅผ ์ด์ฉํ airflow integrity test
ubuntu - python ๊ธฐ๋ณธ ํ๊ฒฝ์ ๊ธฐ๋ฐ์ผ๋ก git action yaml์ ์์ฑ.
์ด ๋, airflow๋ฅผ ์ค์นํ ํ, airflow metadatabase๋ฅผ init ํด์ค๋ค
๊ทธ๋ฆฌ๊ณ ํ์ํ ์ปค๋ฅ์
, variable๋ค์ importํ ์ ์๊ฒ ์คํฌ๋ฆฝํธ๋ฅผ ์์ฑํด์ค ํ,
ํด๋น ์คํฌ๋ฆฝํธ๋ฅผ ๊น ์ก์
์์ ์ฌ์ฉํ ์ ์๋๋ก ํ๋ค
ํด๋น ๋ถ๋ถ์ ์์ธํ ์ค๋ช
์ ์๋ต.
Test ๋จ๊ณ
์๋๋ pytest๋ฅผ ์ด์ฉํ integrity test ๋จ๊ณ
import pytest
from airflow.models import DagBag
@pytest.fixture(scope="session")
def dag_bag():
dagbag = DagBag(dag_folder='dags', include_examples=False)
return dagbag
def test_dags_load_with_no_errors(dag_bag):
print(dag_bag.dags)
assert dag_bag.dags is not None
assert len(dag_bag.import_errors) == 0
def test_dag_structure(dag_bag):
for dag_id, dag in dag_bag.dags.items():
assert dag.tasks, f"DAG {dag_id} has no tasks"
def test_schedule_interval(dag_bag):
for dag_id, dag in dag_bag.dags.items():
assert dag.schedule_interval, f"DAG {dag_id} has no schedule_interval"
def test_default_args(dag_bag):
for dag_id, dag in dag_bag.dags.items():
assert dag.default_args, f"DAG {dag_id} has no default_args"
# Further checks can be added here, e.g., checking for specific default arguments
def test_task_dependencies(dag_bag):
for dag_id, dag in dag_bag.dags.items():
for task in dag.tasks:
assert task.downstream_list or task.upstream_list, f"Task {task.task_id} in DAG {dag_id} has no dependencies"
JavaScript
๋ณต์ฌ
dag์ ์ ํฉ์ฑ์ ์ฒดํฌํด์ฃผ๋ ๋จ๊ณ์ด๋ค.