
Airflow with Tests in CI

What is testing?

What matters most when deploying the code you wrote?

→ "Testing" that it actually behaves correctly in the real service
• 99.99999% of companies run tests before deployment, and most of that testing is automated in a CI pipeline.
Test UI using GitHub Actions
• Integration tests are sometimes implemented with GitHub Actions, but depending on the test stage you also write many unit tests at the code level, run them yourself, and debug when they fail.
Golang์—์„œ ํ…Œ์ŠคํŠธ
โ€ข
golang์—์„œ๋Š” ํ…Œ์ŠคํŠธ ํ”„๋ ˆ์ž„์›Œํฌ ๋‚ด์žฅ
โ—ฆ
go test ํ•˜๋ฉด ํ˜„์žฌ ํด๋”์˜ *_test.go๋ฅผ ์ธ์‹ํ•˜๊ณ  ์ผ๊ด„ ์‹คํ–‰
โ†’ Python์€ Pytest๋ฅผ ์‚ฌ์šฉํ•จ
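As a minimal, hypothetical illustration of the pytest side (the file and function names below are made up), pytest collects files named test_*.py and runs every function whose name starts with test_:
# test_math.py: collected automatically because of the test_ prefix
def add(a, b):
    return a + b


def test_add():
    assert add(1, 2) == 3
Python
Running pytest in that directory finds and executes test_add with no extra configuration.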

Unit tests? Integration tests?

Unit test: splitting testing into individual task-level (single-feature) units
• It verifies that each unit behaves correctly, but not the behavior of a complex system built from many units, so in the end you also need integration tests

Why does testing matter in Airflow?

• By its nature, Airflow communicates with many external systems
• Airflow is an orchestration system: it starts and stops the tasks that actually execute the logic

9.1.1 Integrity tests for all DAGs

Integrity test

Checks, for every DAG, that the DAG is implemented correctly:
• the DAG contains no cycles
• the DAG's task IDs are unique
• and so on

Test layout

Create a separate tests/ directory at the top level of the project and mirror the structure of the code under test inside it.
This is not mandatory, but…
• pytest searches the given directory for files matching test_* or *_test
• tests must never depend on one another

Example test

import glob
import importlib.util
import os

import pytest
from airflow import DAG

# DAG_PATH: where the DAGs under test live, DAG_FILES: list of the files inside DAG_PATH
DAG_PATH = os.path.join(os.path.dirname(__file__), "..", "..", "dags/**.py")
DAG_FILES = glob.glob(DAG_PATH)


@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
    """Import DAG files and check for DAG."""
    module_name, _ = os.path.splitext(dag_file)  # module name (including absolute path) and extension
    module_path = os.path.join(DAG_PATH, dag_file)  # location of the DAG to test
    mod_spec = importlib.util.spec_from_file_location(module_name, module_path)  # import the module
    module = importlib.util.module_from_spec(mod_spec)
    mod_spec.loader.exec_module(module)

    # Extract the DAG objects from the file
    dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)]

    # Check 1: every Python file must actually contain a DAG object
    assert dag_objects

    # Check 2: make sure no DAG object contains a cycle
    # (newer Airflow versions removed DAG.test_cycle(); airflow.utils.dag_cycle_tester.check_cycle is the replacement)
    for dag in dag_objects:
        dag.test_cycle()
Python
• Check 1: verify that every Python file contains a DAG object
assert? raise?
◦ assert [condition], [error message]: if the condition is true, execution simply continues; if it is false, an AssertionError with the message is raised
◦ in other words, it guarantees the condition!
• Check 2: verify that no DAG object contains a cycle
• Beyond that…
◦ you can verify DAG features or implementation details, or write plain Python tests
◦ e.g. what does this one do?
assert dag.dag_id.startswith(("import", "export"))
Python
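Tying this back to check 1, the [condition], [error message] form could make an integrity failure easier to read (the message below is a hypothetical addition, not part of the original test):
assert dag_objects, f"No DAG objects found in {dag_file}"
Python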

Hands-on

1. Connect and run
docker exec -it boaz-airflow-study-airflow-worker-1 /bin/bash
Shell
cd /opt/airflow/dags/Chap09/tests/dags
pytest test_dag_integrity.py
Shell
It's way too long!!!
• The conclusion is at the very end, as usual…
================================================ short test summary info ================================================
FAILED test_dag_integrity.py::test_dag_integrity[/opt/airflow/dags/Chap09/tests/dags/../../dags/bash_operator_no_command.py] - airflow.exceptions.AirflowException: missing keyword argument 'bash_command'
FAILED test_dag_integrity.py::test_dag_integrity[/opt/airflow/dags/Chap09/tests/dags/../../dags/dag_cycle.py] - AttributeError: 'DAG' object has no attribute 'test_cycle'
FAILED test_dag_integrity.py::test_dag_integrity[/opt/airflow/dags/Chap09/tests/dags/../../dags/duplicate_task_ids.py] - airflow.exceptions.DuplicateTaskIdFound: Task id 'task' has already been added to the DAG
FAILED test_dag_integrity.py::test_dag_integrity[/opt/airflow/dags/Chap09/tests/dags/../../dags/testme.py] - airflow.exceptions.DuplicateTaskIdFound: Task id 'test2' has already been added to the DAG
============================================ 4 failed, 11 warnings in 0.55s =============================================
Shell
2. Match each failure in the log to its DAG
1) bash_operator_no_command.py
import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash import BashOperator

dag = DAG(
    dag_id="chapter8_bash_operator_no_command",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval=None,
)

BashOperator(task_id="this_should_fail", dag=dag)
Python
Test failure log
2) dag_cycle.py
import airflow.utils.dates
from airflow import DAG
from airflow.operators.dummy import DummyOperator

dag = DAG(
    dag_id="chapter8_dag_cycle",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval=None,
)

t1 = DummyOperator(task_id="t1", dag=dag)
t2 = DummyOperator(task_id="t2", dag=dag)
t3 = DummyOperator(task_id="t3", dag=dag)

t1 >> t2 >> t3 >> t1
Python
Test failure log
3) duplicate_task_ids.py
import airflow.utils.dates
from airflow import DAG
from airflow.operators.dummy import DummyOperator

dag = DAG(
    dag_id="chapter8_duplicate_task_ids",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval=None,
)

t1 = DummyOperator(task_id="task", dag=dag)
for i in range(5):
    DummyOperator(task_id="task", dag=dag) >> t1
Python
Test failure log
4) testme.py
import airflow.utils.dates
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator

dag = DAG(
    dag_id="testme",
    start_date=airflow.utils.dates.days_ago(3),
    schedule_interval=None,
)

t1 = DummyOperator(task_id="test", dag=dag)
for tasknr in range(5):
    BashOperator(task_id="test2", bash_command=f"echo '{tasknr}'", dag=dag) >> t1
Python
Test failure log

9.1.2 Setting up a CI/CD pipeline

CI/CD ํŒŒ์ดํ”„๋ผ์ธ?

์ฝ”๋“œ ์ €์žฅ์†Œ๋ฅผ ํ†ตํ•ด ์ฝ”๋“œ๊ฐ€ ๋ณ€๊ฒฝ๋  ๋•Œ ์‚ฌ์ „ ์ •์˜๋œ ์Šคํฌ๋ฆฝํŠธ๋ฅผ ์‹คํ–‰ํ•˜๋Š” ์‹œ์Šคํ…œ
โ€ข
CI(Continuous integration): ๋ณ€๊ฒฝ๋œ ์ฝ”๋“œ๊ฐ€ ์ฝ”๋”ฉ ํ‘œ์ค€๊ณผ ํ…Œ์ŠคํŠธ ์กฐ๊ฑด์„ ์ค€์ˆ˜ํ•˜๋Š” ์ง€ ํ™•์ธ/๊ฒ€์ฆ
โ—ฆ
Flake8, Pylint, Black ๋“ฑ์˜ ์ฝ”๋“œ ๊ฒ€์‚ฌ๊ธฐ๋ฅผ ํ†ตํ•ด ์ฝ”๋“œ๋ฅผ ๋ ˆํฌ์ง€ํ† ๋ฆฌ์— ํ‘ธ์‹œํ•  ๋•Œ ์ฝ”๋“œ์˜ ํ’ˆ์งˆ ํ™•์ธ ๋ฐ ํ…Œ์ŠคํŠธ ์‹คํ–‰
โ€ข
CD(Continuous deployment): ์‚ฌ๋žŒ์˜ ๊ฐ„์„ญ ์—†์ด ์™„์ „ํžˆ ์ž๋™ํ™”๋œ ์ฝ”๋“œ๋ฅผ ํ”„๋กœ๋•์…˜ ์‹œ์Šคํ…œ์— ์ž๋™์œผ๋กœ ๋ฐฐํฌํ•˜๋Š” ๊ฒƒ
โ—ฆ
์ˆ˜๋™์œผ๋กœ ๊ฒ€์ฆํ•˜๊ณ  ๋ฐฐํฌํ•˜์ง€ ์•Š๊ณ ๋„ ์ƒ์‚ฐ์„ฑ ๊ทน๋Œ€ํ™”
โ€ข
์˜ˆ์‹œ
.github/workflows/control-image-build.yml
name: Docker Image CI(Control server)

on:
  push:
    paths:
      - "resources/images/control-ubuntu/**"
    branches:
      - "main"

jobs:
  build-and-push-image:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v2
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v1
      - name: Login to DockerHub
        uses: docker/login-action@v1
        with:
          username: ${{ secrets.DOCKERHUB_USERNAME }}
          password: ${{ secrets.DOCKERHUB_TOKEN }}
      - name: Build and push
        uses: docker/build-push-action@v2
        with:
          context: ./resources/images/control-ubuntu/
          file: ./resources/images/control-ubuntu/Dockerfile
          platforms: linux/amd64
          push: true
          tags: ryann3/control-ubuntu:4.0
YAML
The workflow runs for each commit merged into the main branch.
◦ You can see each step of the build-and-push-image job defined above being executed.
A failed run

Pytest with airflow

• To be tested later!
name: Python static checks and tests for dags

on:
  push:
    paths:
      - "dags/Chap09/**"
    branches:
      - "main"

jobs:
  test-and-check:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v2
      - name: Setup Python
        uses: actions/setup-python@v2
        with:
          python-version: "3.6.9"
      - name: Install Flake8
        run: pip install flake8
      - name: Run Flake8
        run: flake8
      - name: Install Pylint
        run: pip install pylint
      - name: Run Pylint
        run: find dags/Chap09/tests -name "*.py" | xargs pylint --output-format=colorized
      - name: Install Black
        run: pip install black
      - name: Run Black
        run: find dags/Chap09/tests -name "*.py" | xargs black --check
      - name: Install dependencies
        run: pip install apache-airflow pytest
      - name: Test DAG integrity
        run: pytest dags/Chap09/tests
YAML

9.1.3 Writing unit tests

Let's recall last session's hands-on exercise…
...
def get_ratings(self, start_date=None, end_date=None, batch_size=100):
    """
    Fetches ratings between the given start/end date.

    Parameters
    ----------
    start_date : str
        Start date to start fetching ratings from (inclusive). Expected
        format is YYYY-MM-DD (equal to Airflow's ds formats).
    end_date : str
        End date to fetching ratings up to (exclusive). Expected format
        is YYYY-MM-DD (equal to Airflow's ds formats).
    batch_size : int
        Size of the batches (pages) to fetch from the API. Larger values
        mean less requests, but more data transferred per request.
    """
    yield from self._get_with_pagination(
        endpoint="/ratings",
        params={"start_date": start_date, "end_date": end_date},
        batch_size=batch_size,
    )
...
Python
• What if batch_size in get_ratings is negative?!
1. Oh well, tough luck~
2. Or: validate the user's input before passing it along?
Who gets more love at the office? (A hedged sketch of option 2 follows.)
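A minimal sketch of option 2 as a unit test, assuming (hypothetically, this guard does not exist in the code above) that get_ratings is changed to raise ValueError for non-positive batch sizes:
import pytest

from custom.hooks import MovielensHook


def test_get_ratings_rejects_negative_batch_size():
    # assumes get_ratings() starts with a guard such as:
    #   if batch_size <= 0:
    #       raise ValueError(f"batch_size must be positive, got {batch_size}")
    hook = MovielensHook("test")
    with pytest.raises(ValueError):
        list(hook.get_ratings(start_date="2015-01-01", end_date="2015-01-03", batch_size=-1))
Python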

Example: return the top N most popular movies between two given dates

import datetime as dt
import logging
import json
import os

from airflow import DAG
from airflow.models import BaseOperator
from airflow.operators.python import PythonOperator
from collections import defaultdict, Counter

from custom.hooks import MovielensHook


class MovielensPopularityOperator(BaseOperator):
    def __init__(self, conn_id, start_date, end_date, min_ratings=4, top_n=5, **kwargs):
        super().__init__(**kwargs)
        self._conn_id = conn_id
        self._start_date = start_date
        self._end_date = end_date
        self._min_ratings = min_ratings
        self._top_n = top_n

    def execute(self, context):
        with MovielensHook(self._conn_id) as hook:
            ratings = hook.get_ratings(
                start_date=self._start_date,
                end_date=self._end_date,
            )

            rating_sums = defaultdict(Counter)
            for rating in ratings:
                rating_sums[rating["movieId"]].update(count=1, rating=rating["rating"])

            averages = {
                movie_id: (
                    rating_counter["rating"] / rating_counter["count"],
                    rating_counter["count"],
                )
                for movie_id, rating_counter in rating_sums.items()
                if rating_counter["count"] >= self._min_ratings
            }

            return sorted(averages.items(), key=lambda x: x[1], reverse=True)[: self._top_n]
Python

How do we test that MovielensPopularityOperator behaves correctly?

→ Run the operator with given inputs and check that the result matches what we expect
• A few pytest components are needed for this

9.1.4 Structuring a pytest project

• An example that tests the BashOperator
def test_example():
    task = BashOperator(
        task_id="test",
        bash_command="echo 'hello!'",
        xcom_push=True,
    )
    result = task.execute(context={})
    assert result == "hello!"
Python
• Applied to the MovielensPopularityOperator
def test_movielenspopularityoperator(mocker):
    task = MovielensPopularityOperator(
        task_id="test_id",
        # conn_id="testconn",  # required argument; leaving it out makes the test fail
        start_date="2015-01-01",
        end_date="2015-01-03",
        top_n=5,
    )
    result = task.execute(context=None)
    assert len(result) == 5
Python

Can a test pass its result on to something else?

• Tests run in isolated environments, so the result of one test cannot affect another test.
• Information that would normally flow between them is therefore supplied through mocking.

๋ชฉ์—…์ด๋ž€

ํŠน์ • ์ž‘์—…์ด๋‚˜ ๊ฐ์ฒด๋ฅผ ๋ชจ์กฐ๋กœ ๋งŒ๋“œ๋Š” ๊ฒƒ
์‹ค์ œ ์šด์˜ ํ™˜๊ฒฝ์—์„œ๋Š” ์˜ˆ์ƒ๋˜์ง€๋งŒ ํ…Œ์ŠคํŠธ ์ค‘์—๋Š” ๋ฐœ์ƒํ•˜์ง€ ์•Š๋Š” ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ๋Œ€ํ•œ ํ˜ธ์ถœ์„ ์‹ค์ œ๋กœ ๋ฐœ์ƒ์‹œํ‚ค์ง€ ์•Š๊ณ  ํŠน์ • ๊ฐ’์„ ๋ฐ˜ํ™˜ํ•˜๋„๋ก ํŒŒ์ด์ฌ ์ฝ”๋“œ๋กœ ์ง€์‹œํ•˜์—ฌ ์ž„์˜์˜ ๊ฐ’์„ ์ „๋‹ฌํ•จ
โ€ข
์™ธ๋ถ€ ์‹œ์Šคํ…œ์ด ์‹ค์ œ๋กœ ์—ฐ๊ฒฐํ•˜์ง€ ์•Š๊ณ ๋„ ํ…Œ์ŠคํŠธ๋ฅผ ๊ฐœ๋ฐœํ•˜๊ณ  ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ์Œ
def test_movielenspopularityoperator(mocker):
    mock_get = mocker.patch.object(
        MovielensHook,  # do not patch BaseHook: patch where it is called, not where it is defined
        "get_connection",
        return_value=Connection(conn_id="test", login="airflow", password="airflow"),
    )
    task = MovielensPopularityOperator(
        task_id="test_id",
        conn_id="test",
        start_date="2015-01-01",
        end_date="2015-01-03",
        top_n=5,
    )
    result = task.execute(context=None)

    assert len(result) == 5
    assert mock_get.call_count == 1  # make sure it was called exactly once
    mock_get.assert_called_with("test")  # verify it was called with the expected conn_id
Python
cd /opt/airflow/dags/Chap09/tests/dags/chapter9/custom
pytest test_operators.py
Shell
Test log

9.1.5 Testing with files on disk

An operator that reads a file containing a JSON list and writes it out in CSV format!

Operator

class JsonToCsvOperator(BaseOperator):
    def __init__(self, input_path, output_path, **kwargs):
        super().__init__(**kwargs)
        self._input_path = input_path
        self._output_path = output_path

    def execute(self, context):
        with open(self._input_path, "r") as json_file:
            data = json.load(json_file)

        columns = {key for row in data for key in row.keys()}

        with open(self._output_path, "w") as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=columns)
            writer.writeheader()
            writer.writerows(data)
Python

Test

• Fixture: lets a declared function be passed into a test function as an argument (e.g. the tmp_path argument)
1) Write a function A,
2) declare it as a fixture,
3) then other test functions can receive A's result as an argument and use it (a short sketch follows).
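A minimal sketch of those three steps with made-up names (sample_rows is purely illustrative):
import pytest


@pytest.fixture
def sample_rows():
    # steps 1 and 2: a plain function declared as a fixture
    return [{"name": "bob"}, {"name": "alice"}]


def test_row_count(sample_rows):
    # step 3: the test receives the fixture's return value as an argument
    assert len(sample_rows) == 2
Python
The tmp_path argument below works the same way; it is simply a fixture that pytest ships with.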
def test_json_to_csv_operator(tmp_path: Path):
    print(tmp_path.as_posix())

    input_path = tmp_path / "input.json"
    output_path = tmp_path / "output.csv"

    # Write input data to tmp path
    input_data = [
        {"name": "bob", "age": "41", "sex": "M"},
        {"name": "alice", "age": "24", "sex": "F"},
        {"name": "carol", "age": "60", "sex": "F"},
    ]
    with open(input_path, "w") as f:
        f.write(json.dumps(input_data))

    # Run task
    operator = JsonToCsvOperator(
        task_id="test", input_path=input_path, output_path=output_path
    )
    operator.execute(context={})

    # Read result
    with open(output_path, "r") as f:
        reader = csv.DictReader(f)
        result = [dict(row) for row in reader]

    # Assert
    assert result == input_data
Python
cd /opt/airflow/dags/Chap09/tests/airflowbook/operators
pytest test_json_to_csv_operator.py
Shell
Test log

ํŒŒ์ผ ์ด๋ฆ„์ด static์ด๋ผ๋‹ˆ.. ๋„ˆ๋ฌด dumb ํ•ด์š”!

โ†’ ๋‹ค์Œ ์„น์…˜์„ ๋Œ€๊ธฐํ•˜์„ธ์š”

9.2 ํ…Œ์ŠคํŠธ์—์„œ DAG ๋ฐ ํ…Œ์Šคํฌ ์ฝ˜ํ…์ŠคํŠธ๋กœ ์ž‘์—…ํ•˜๊ธฐ

• To run, operators usually need more context (templated variables) or a task instance context
Steps performed when a task runs
• So we should run the task closer to the way the real system does, instead of calling operator.execute(context={})
• In the figure above, the output_path argument can be given as /output/{{ ds }}.json, but when testing with context={} the ds variable is not available.
⇒ Therefore we call operator.run(), the actual method Airflow itself uses when starting a task
• A basic DAG
dag = DAG(
    "test_dag",
    default_args={"owner": "airflow", "start_date": datetime.datetime(2019, 1, 1)},
    schedule_interval="@daily",
)
Python
• Test with a DAG so that the templated variables get assigned
def test_movielens_operator(tmp_path: Path, mocker: MockFixture):
    mocker.patch.object(
        MovielensHook,
        "get_connection",
        return_value=Connection(conn_id="test", login="airflow", password="airflow"),
    )

    dag = DAG(
        "test_dag",
        default_args={"owner": "airflow", "start_date": datetime.datetime(2019, 1, 1)},
        schedule_interval="@daily",
    )

    task = MovielensDownloadOperator(
        task_id="test",
        conn_id="testconn",
        start_date="{{ prev_ds }}",
        end_date="{{ ds }}",
        output_path=str(tmp_path / "{{ ds }}.json"),
        dag=dag,
    )

    dag.clear()
    task.run(
        start_date=dag.default_args["start_date"],
        end_date=dag.default_args["start_date"],
        ignore_ti_state=True,
    )
Python

Reuse

If several tests need a DAG, it can be reused with pytest.
• Put the fixture in conftest.py so that every file in the directory (and its subdirectories) can reuse it
@pytest.fixture
def test_dag():
    return DAG(
        "test_dag",
        default_args={"owner": "airflow", "start_date": datetime.datetime(2015, 1, 1)},
        schedule_interval="@daily",
    )
Python
• Include the fixture in the test to create the objects it needs
◦ task.run(): computes the schedule intervals of the task instances to run between the two given dates, following the DAG's schedule_interval
def test_movielens_operator(tmp_path: Path, mocker: MockFixture, test_dag: DAG):
    mocker.patch.object(
        MovielensHook,
        "get_connection",
        return_value=Connection(conn_id="test", login="airflow", password="airflow"),
    )

    task = MovielensDownloadOperator(
        task_id="test",
        conn_id="testconn",
        start_date="{{ prev_ds }}",
        end_date="{{ ds }}",
        output_path=str(tmp_path / "{{ ds }}.json"),
        dag=test_dag,  # the DAG now comes from the conftest.py fixture
    )

    test_dag.clear()
    task.run(
        start_date=test_dag.default_args["start_date"],
        end_date=test_dag.default_args["start_date"],
        ignore_ti_state=True,
    )
Python

9.2.1 Working with external systems

MovieLens ํ‰์ ์„ ์ฝ๊ณ  ๊ฒฐ๊ณผ๋ฅผ Postgres ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ์ €์žฅํ•˜๋Š” ์˜คํผ๋ ˆ์ดํ„ฐ๋ฅผ ๋งŒ๋“ค์–ด๋ณด์ž.
class MovielensToPostgresOperator(BaseOperator):
    template_fields = ("_start_date", "_end_date", "_insert_query")

    def __init__(self, movielens_conn_id, start_date, end_date, insert_query, postgres_conn_id, **kwargs):
        super().__init__(**kwargs)
        self._movielens_conn_id = movielens_conn_id
        self._start_date = start_date
        self._end_date = end_date
        self._insert_query = insert_query
        self._postgres_conn_id = postgres_conn_id

    def execute(self, context):
        with MovielensHook(self._movielens_conn_id) as movielens_hook:
            ratings = list(
                movielens_hook.get_ratings(
                    start_date=self._start_date, end_date=self._end_date
                )
            )

        postgres_hook = PostgresHook(postgres_conn_id=self._postgres_conn_id)
        insert_queries = [
            self._insert_query.format(
                ",".join([str(_[1]) for _ in sorted(rating.items())])
            )
            for rating in ratings
        ]
        postgres_hook.run(insert_queries)
Python
The key line is self._insert_query.format(",".join([str(_[1]) for _ in sorted(rating.items())])): it sorts each rating dict by key and joins the values into the comma-separated list that is substituted into the INSERT query.
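As a small worked illustration with made-up values (not real API output), this is what that expression produces for a single rating:
# one rating represented as a dict (values are made up)
rating = {"movieId": 1, "rating": 4.5, "ratingTimestamp": 1100000000, "userId": 17}

# sorted(rating.items()) orders the keys alphabetically:
# movieId, rating, ratingTimestamp, userId
values = ",".join(str(value) for _, value in sorted(rating.items()))
print(values)  # -> 1,4.5,1100000000,17

# this string replaces {0} in the (already Jinja-rendered) insert_query,
# matching the column order movieId,rating,ratingTimestamp,userId
Python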

ํ…Œ์ŠคํŠธ ๋‹จ๊ณ„์—์„œ Postgres ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ์ ‘๊ทผํ•  ์ˆ˜ ์—†๋‹ค๋ฉด?

• There are Python packages that let pytest control Docker containers for use in tests.
◦ pytest-docker-tools
◦ provides several helpers for creating test Docker containers
→ Testing against a Docker container means the real hook methods can be used without building a mock environment!

Test

from pytest_docker_tools import container, fetch

postgres_image = fetch(repository="postgres:11.1-alpine")

postgres = container(
    image="{postgres_image.id}",
    environment={"POSTGRES_USER": "testuser", "POSTGRES_PASSWORD": "testpass"},
    ports={"5432/tcp": None},  # map container port 5432 to a random free port on the host (None)
    volumes={
        os.path.join(os.path.dirname(__file__), "postgres-init.sql"): {
            "bind": "/docker-entrypoint-initdb.d/postgres-init.sql"
        }
    },
)


def test_movielens_to_postgres_operator(mocker, test_dag, postgres):
    mocker.patch.object(
        MovielensHook,
        "get_connection",
        return_value=Connection(conn_id="test", login="airflow", password="airflow"),
    )
    mocker.patch.object(
        PostgresHook,
        "get_connection",
        return_value=Connection(
            conn_id="postgres",
            conn_type="postgres",
            host="localhost",
            login="testuser",
            password="testpass",
            port=postgres.ports["5432/tcp"][0],  # the port assigned on the host side
        ),
    )

    task = MovielensToPostgresOperator(
        task_id="test",
        movielens_conn_id="movielens_id",
        start_date="{{ prev_ds }}",
        end_date="{{ ds }}",
        postgres_conn_id="postgres_id",
        insert_query=(
            "INSERT INTO movielens (movieId,rating,ratingTimestamp,userId,scrapeTime) "
            "VALUES ({0}, '{{ macros.datetime.now() }}')"
        ),
        dag=test_dag,
    )

    pg_hook = PostgresHook()
    row_count = pg_hook.get_first("SELECT COUNT(*) FROM movielens")[0]
    assert row_count == 0

    pytest.helpers.run_airflow_task(task, test_dag)

    row_count = pg_hook.get_first("SELECT COUNT(*) FROM movielens")[0]
    assert row_count > 0
Python
• Because pytest_docker_tools' container function is itself a fixture, it has to be passed to the test as an argument for it to be invoked.
• The container can be initialized with a startup script by placing it in the /docker-entrypoint-initdb.d directory (it runs when the container boots)
cd /opt/airflow/dags/Chap09/tests/airflowbook/operators
pytest test_movielens_operator_postgres.py
Shell
• Our Airflow itself runs inside Docker, so actually running this requires a Docker-in-Docker (DinD) setup
◦ done by mounting the local docker.sock under volumes in docker-compose.yaml so that the Docker socket is shared
Test log

9.3 ๊ฐœ๋ฐœ์„ ์œ„ํ•ด ํ…Œ์ŠคํŠธ ์‚ฌ์šฉํ•˜๊ธฐ

• Everyone has done some print-style debugging, right?
• Any decent IDE (PyCharm, VSCode, and so on) supports proper debugging
• You set a kind of breakpoint; once execution reaches that line of code you can
1. inspect the current state (values) of variables
2. execute code from that point on (next line, …)
• If an IDE is not available, Python has a built-in debugger called pdb; just add the following line where you want to debug (a tiny standalone sketch follows the snippet)
import pdb; pdb.set_trace()
Python
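A tiny standalone sketch of what that breakpoint does (the function and values are made up): execution pauses at set_trace(), and from the pdb prompt you can print variables with p, step with n, and continue with c.
import pdb


def compute_average(ratings):
    total = sum(ratings)
    pdb.set_trace()  # pauses here: try `p total`, `n` to step, `c` to continue
    return total / len(ratings)


if __name__ == "__main__":
    print(compute_average([3, 4, 5]))
Python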
• To use this with Airflow, run the task with airflow tasks test, which executes a task without recording the run in the metastore
airflow tasks test [dagid] [taskid] [execution date]
Shell

9.3.1 Testing the complete DAG

• Tests that include the task instance context, operators that use the local filesystem, operators that use external systems via Docker, and so on…
• All of these focus on testing a single operator, but…

ย ์›Œํฌํ”Œ๋กœ ๊ฐœ๋ฐœ์—์„œ ๊ฐ€์žฅ ์ค‘์š”ํ•œ ๊ฒƒ์€ ๋ชจ๋“  ๊ตฌ์„ฑ ์š”๊ฑด์ด ์ž˜ ๋งž๋Š”์ง€ ํ™•์ธํ•˜๋Š” ๊ฒƒ ์•„๋‹Œ๊ฐ€์š”?

• That question is hard to answer, though…
• Privacy regulations and sheer data size make it hard to simulate a perfect production environment during development (what if the data is measured in petabytes?)
• So we build the most realistic production-like environment we can and use it to develop and verify the software

9.4 Emulating the production environment with Whirl

• Simulates every component of the production environment in Docker
• Components are managed with Docker Compose
• Downside: not everything is available as a Docker image

9.5 Creating DTAP environments

What is DTAP?

A setup that separates Development, Test, Acceptance, and Production.
Set up and build isolated DTAP environments for testing (whether that is actually feasible is another matter…)

A taste of GitHub Actions

• The book only has the theory, but…

What was in the book

• This is realistically hard for us to do right now…
name: Python static checks and tests for dags

on:
  push:
    paths:
      - "dags/Chap09/**"
    branches:
      - "main"

jobs:
  test-and-check:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v2
      - name: Setup Python
        uses: actions/setup-python@v2
        with:
          python-version: "3.6.9"
      - name: Install Flake8
        run: pip install flake8
      - name: Run Flake8
        run: flake8
      - name: Install Pylint
        run: pip install pylint
      - name: Run Pylint
        run: find dags/Chap09/tests -name "*.py" | xargs pylint --output-format=colorized
      - name: Install Black
        run: pip install black
      - name: Run Black
        run: find dags/Chap09/tests -name "*.py" | xargs black --check
      - name: Install dependencies
        run: pip install apache-airflow pytest
      - name: Test DAG integrity
        run: pytest dags/Chap09/tests
YAML

Black

• Enforces a single code style
• Everyone uses a different code formatter (or none at all), so we pass on this one

Pylint

• Known as a lint, or linter
• Checks for errors, style issues, and more
[Python]Pylint๋กœ ํŒŒ์ด์ฌ ์ฝ”๋“œ ํ’ˆ์งˆ ๊ด€๋ฆฌํ•˜๊ธฐ
PylintํŒŒ์ด์ฌ ๊ฐœ๋ฐœ์ž๋“ค์ด ๊ฐ€์žฅ ์ˆ˜์›”ํ•˜๊ฒŒ ์ฝ”๋”ฉ์„ ํ•  ์ˆ˜ ์žˆ๋„๋ก ๋„์™€์ฃผ๋Š” ๋„๊ตฌ ์ค‘ ํ•˜๋‚˜๊ฐ€ ๋ฐ”๋กœ ์ •์  ๋ถ„์„ ๋„๊ตฌ์ด๋‹ค. Pylint๋Š” ํŒŒ์ด์ฌ ์†Œ์Šค ์ฝ”๋“œ์—์„œ ์—๋Ÿฌ, ๋ฒ„๊ทธ, ์Šคํƒ€์ผ ๋ฌธ์ œ ๋“ฑ์„ ๊ฐ์ง€ํ•˜๋Š” ์ธ๊ธฐ ์žˆ๋Š” ์ •์  ๋ถ„์„ ๋„๊ตฌ ์ค‘ ํ•˜๋‚˜์ด๋‹ค. ์ด ๊ธ€์—์„œ๋Š” Pylint๋ฅผ ์„ค์น˜ํ•˜๊ณ  ์‚ฌ์šฉํ•˜๋Š” ๋ฐฉ๋ฒ•, ๋‹ค์–‘ํ•œ ์˜ต์…˜ ๊ทธ๋ฆฌ๊ณ  ์‹ค์ œ ์‚ฌ์šฉ ์˜ˆ์‹œ๋ฅผ ํ†ตํ•ด Pylint๋กœ ํŒŒ์ด์ฌ ์ฝ”๋“œ์˜ ํ’ˆ์งˆ์„ ํšจ๊ณผ์ ์œผ๋กœ ๊ด€๋ฆฌํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ์•Œ์•„๋ณธ๋‹ค.Pylint ์„ค์น˜Pylint๋Š” pip๋ฅผ ์‚ฌ์šฉํ•ด ๊ฐ„ํŽธํ•˜๊ฒŒ ์„ค์น˜ํ•  ์ˆ˜ ์žˆ๋‹ค. ํ„ฐ๋ฏธ๋„์ด๋‚˜ ๋ช…๋ น ํ”„๋กฌํ”„ํŠธ์— ๋‹ค์Œ ๋ช…๋ น์–ด๋ฅผ ์ž…๋ ฅํ•˜์ž.pip install pylint Pylint ์‚ฌ์šฉ๋ฒ•Pylint๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์†Œ์Šค ์ฝ”๋“œ๋ฅผ ๊ฒ€์‚ฌํ•˜๋ ค๋ฉด, ์•„๋ž˜์™€ ๊ฐ™์€ ๋ช…๋ น์–ด๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.pylint myfile.py ๋ช…๋ น์„ ์‹คํ–‰ํ•˜๋ฉด, Pylin..

Flake8

• Checks that code follows the pep8-based coding convention
python flake8 ๊ธฐ๋ณธ ์‚ฌ์šฉ๋ฒ•
flake8์€ pep8 ์ฝ”๋”ฉ์ปจ๋ฒค์…˜์„ ์ค€์ˆ˜ํ•˜๋Š” Lint๋ฅผ ์œ„ํ•œ ํŒจํ‚ค์ง€์ด๋‹ค. ์ •์ ์œผ๋กœ ์ฝ”๋“œ๋ฅผ ๊ฒ€์‚ฌ๋ฅผ ํ•ด์ฃผ๋ฉด ์ˆ˜์ •์€ ํ•ด์ฃผ์ง€ ์•Š๋Š”๋‹ค. ์ˆ˜์ •์„ ํ•ด์ฃผ๋Š” ํŒจํ‚ค์ง€๋Š” black์œผ๋กœ ๋‹ค์Œ๋ฒˆ์— ๋‹ค๋ฃฐ ์˜ˆ์ •์ด๋‹ค. flake8 ์„ค์น˜ pip install flake8 flake ์‚ฌ์šฉ๋ฒ• ์•„๋ž˜์™€ ๊ฐ™์ด ํŒŒ์ผ๋ช…์„ ์ž…๋ ฅํ•˜๊ฑฐ๋‚˜ ํŒŒ์ผ๋ช…์„ ์ž…๋ ฅํ•˜์ง€ ์•Š์œผ๋ฉด ํด๋” ์ „์ฒด๋ฅผ ๊ฒ€์‚ฌํ•œ๋‹ค. flake8 # ํด๋” ์ „์ฒด ๊ฒ€์‚ฌ flake8 file_name.py # ํ•œ๊ฐœ์˜ ํŒŒ์ผ๋งŒ ๊ฒ€์‚ฌ flake8 ์„ค์ •ํŒŒ์ผ .flake8 file์„ ์ƒ์„ฑํ•ด์ฃผ์–ด ์•„๋ž˜์™€ ๊ฐ™์€ ๋‚ด์šฉ์„ ์ž…๋ ฅํ•˜๋ฉด ์ œ์™ธํ•  ํด๋”์™€ ํŒŒ์ผ์€ ์ œ์™ธ๋ฅผ ํ•˜๊ณ  ๋˜ํ•œ ๋ฌด์‹œํ•  ์—๋Ÿฌ๋ฉ”์‹œ์ง€๋„ ์„ค์ • ํ•  ์ˆ˜ ์žˆ๋”ฐ.. [flake8] exclude = .git, .gitignore, *.pot, *.py[co], __pycache_..

Comparison

ํŒŒ์ด์ฌ ์ฝ”๋“œ ์Šคํƒ€์ผ ๋ฆฌํŒฉํ† ๋ง ๋ชจ๋“ˆ ์ •๋ฆฌ(flake8, black, isort, pylint)

Do we use them?

Our company is a Python-based monorepo, so we use all three.
• Monorepo?
◦ a software development strategy that keeps multiple projects in one repository, avoiding problems such as:
▪ duplicated code
▪ conflicting package dependencies
▪ the refactoring cost across projects that depend on each other
▪ collaboration getting harder because every repository is in a different state
๋ชจ๋…ธ๋ ˆํฌ(monorepo) ๋ž€?
๋ชจ๋…ธ๋ ˆํฌ๊ฐ€ ๋ญ˜๊นŒ? ๋‘ ๊ฐœ ์ด์ƒ์˜ ํ”„๋กœ์ ํŠธ ์ฝ”๋“œ๋ฅผ ํ•˜๋‚˜์˜ ๋ฒ„์ „ ๊ด€๋ฆฌ ์ €์žฅ์†Œ(repository)์—์„œ ๊ด€๋ฆฌํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ๋งํ•ฉ๋‹ˆ๋‹ค. ๋ชจ๋…ธ๋ ˆํฌ๊ฐ€ ์™œ ๋‚˜์™”์„๊นŒ? ๋ชจ๋…ธ๋ ˆํฌ(mono repo) ๊ฐœ๋…์ด ์ฒ˜์Œ ๋“ฑ์žฅํ•œ ์ด์œ ๋Š” ์—ฌ๋Ÿฌ ๊ฐ€์ง€๊ฐ€ ์žˆ์ง€๋งŒ, ๋Œ€ํ‘œ์ ์ธ ์ด์œ  ์ค‘ ํ•˜๋‚˜๋Š” ํฐ ๊ทœ๋ชจ์˜ ์†Œํ”„ํŠธ์›จ์–ด ๊ฐœ๋ฐœ ํ”„๋กœ์ ํŠธ์—์„œ ๋ฐœ์ƒํ•˜๋Š” ๋ฌธ์ œ๋ฅผ ํ•ด๊ฒฐํ•˜๊ธฐ ์œ„ํ•ด์„œ์ž…๋‹ˆ๋‹ค. ํฐ ๊ทœ๋ชจ์˜ ํ”„๋กœ์ ํŠธ์—์„œ๋Š” ์—ฌ๋Ÿฌ ๊ฐœ์˜ ์ฝ”๋“œ ์ €์žฅ์†Œ๊ฐ€ ์ƒ์„ฑ๋ฉ๋‹ˆ๋‹ค. ํ‘œ๋ฉด์ ์œผ๋กœ ๋ดค์„ ๋•Œ๋Š” ํฐ ๋ฌธ์ œ๊ฐ€ ์—†์œผ๋‚˜ ๋‹ค์Œ๊ณผ ๊ฐ™์€ ๋ฌธ์ œ์ ๋“ค ๋•Œ๋ฌธ์— ํ”„๋กœ์ ํŠธ ๊ด€๋ฆฌ์— ์–ด๋ ค์›€์ด ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ค‘๋ณต๋˜๋Š” ์ฝ”๋“œ ์„œ๋กœ ๋‹ค๋ฅธ ํŒจํ‚ค์ง€ ์˜์กด์„ฑ ์„œ๋กœ ์˜์กดํ•˜๋Š” ํ”„๋กœ์ ํŠธ๋“ค๋ผ๋ฆฌ์˜ ๋ฆฌํŒฉํ„ฐ๋ง ๋น„์šฉ ์ฝ”๋“œ๊ฐ€ ์ €์žฅ์†Œ๋งˆ๋‹ค ์ƒํ™ฉ์ด ๋‹ค๋ฅด๊ธฐ์— ๊ฐœ๋ฐœ์ž๋“ค ์‚ฌ์ด์˜ ํ˜‘์—…๋„ ์–ด๋ ค์›Œ์ง€๋Š” ๋ฌธ์ œ ์ด๋Ÿฌํ•œ ๋ฌธ์ œ๋ฅผ ํ•ด๊ฒฐํ•˜๊ธฐ ์œ„ํ•ด ๋ชจ๋“  ํ”„๋กœ์ ํŠธ ์ฝ”๋“œ..

Test

name: Python mini test for dags

on:
  push:
    paths:
      - "dags/Chap09/**"
    branches:
      - "main"

jobs:
  test-and-check:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v2
      - name: Setup Python
        uses: actions/setup-python@v2
        with:
          python-version: "3.6.9"
      - name: Install dependencies
        run: pip install apache-airflow pytest
      - name: Test DAG integrity
        run: cd dags/Chap09/tests/dags && pytest test_dag_integrity.py
YAML
• This has already been added to our workflow
• The workflow is triggered whenever something under dags/Chap09 changes

CI stage

An Airflow integrity test using pytest, tailored for GitHub Actions

Write the GitHub Actions YAML on top of the default ubuntu + Python environment.
After installing Airflow, initialize the Airflow metadatabase.
Then write a script that imports the required connections and variables,
and make that script available to the GitHub Action.
Detailed explanation omitted here; a hedged sketch of such a seed script follows.
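For illustration only, such a seed script could look like the hypothetical sketch below, run after airflow db init in the CI job (connection and variable values are made up):
# seed_airflow.py (hypothetical): registers the connections/variables the DAGs expect
from airflow.models import Connection, Variable
from airflow.utils.session import create_session

with create_session() as session:
    session.add(
        Connection(
            conn_id="movielens",
            conn_type="http",
            host="movielens-api",
            login="airflow",
            password="airflow",
        )
    )

Variable.set("environment", "ci")
Python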

Test stage

Below is the integrity test stage using pytest.
import pytest
from airflow.models import DagBag


@pytest.fixture(scope="session")
def dag_bag():
    dagbag = DagBag(dag_folder='dags', include_examples=False)
    return dagbag


def test_dags_load_with_no_errors(dag_bag):
    print(dag_bag.dags)
    assert dag_bag.dags is not None
    assert len(dag_bag.import_errors) == 0


def test_dag_structure(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert dag.tasks, f"DAG {dag_id} has no tasks"


def test_schedule_interval(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert dag.schedule_interval, f"DAG {dag_id} has no schedule_interval"


def test_default_args(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        assert dag.default_args, f"DAG {dag_id} has no default_args"
        # Further checks can be added here, e.g., checking for specific default arguments


def test_task_dependencies(dag_bag):
    for dag_id, dag in dag_bag.dags.items():
        for task in dag.tasks:
            assert task.downstream_list or task.upstream_list, \
                f"Task {task.task_id} in DAG {dag_id} has no dependencies"
Python
dag์˜ ์ •ํ•ฉ์„ฑ์„ ์ฒดํฌํ•ด์ฃผ๋Š” ๋‹จ๊ณ„์ด๋‹ค.

์ฐธ๊ณ  ๋งํฌ

์ฐธ๊ณ  ๋™์˜์ƒ

์ฐธ๊ณ  ๋ธ”๋กœ๊ทธ

[Airflow] Test
์Šคํฌ๋ฆฝํŠธ ์‹คํ–‰ ๋งŒ๋“  ์Šคํฌ๋ฆฝํŠธ์— ๋ฌธ๋ฒ•์  ์˜ค๋ฅ˜๊ฐ€ ์—†๋Š”์ง€ ํ™•์ธ์„ ํ•˜๊ธฐ ์œ„ํ•ด ์Šคํฌ๋ฆฝํŠธ๋ฅผ ์‹คํ–‰ํ•ด ๋ด…๋‹ˆ๋‹ค. ๋งŒ์•ฝ ์˜ˆ์™ธ๊ฐ€ ๋ฐœ์ƒํ•˜์ง€ ์•Š์•˜๋‹ค๋ฉด ์–ด๋Š ์ •๋„ ๋ฌธ๋ฒ•์ ์ธ ์˜ค๋ฅ˜๊ฐ€ ์—†๋‹ค๋ผ๊ณ  ๋ณผ ์ˆ˜ ์žˆ์ง€๋งŒ ์Šคํฌ๋ฆฝํŠธ ํŠน์„ฑ ์ƒ, ํ˜ธ์ถœ๋˜์ง€ ์•Š์€ ํ•จ์ˆ˜ ๋‚ด์˜ ๋ฌธ๋ฒ•์  ์˜ค๋ฅ˜๋Š” ์กด์žฌํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๋ช…๋ น์–ด๋ฅผ ํ†ตํ•œ ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ ์œ ํšจ์„ฑ ๊ฒ€์‚ฌ ์•„๋ž˜ ๋ช…๋ น์–ด๋ฅผ ํ†ตํ•ด DAG ๋ฐ task๊ฐ€ ์˜๋„ํ•œ๋Œ€๋กœ ๋“ฑ๋ก๋˜์—ˆ๋Š”์ง€, ๊ตฌ์กฐ๊ฐ€ ์ž˜ ์žกํ˜”๋Š”์ง€ ํ™•์ธํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. # ๋“ฑ๋ก๋œ dag ๋ชฉ๋ก ์ถœ๋ ฅ $ airflow dags list dag_id | filepath | owner | paused ======================+==========================+===========+======= dag_decorator_test | temp.py | a..

Reference GitHub

Airflow with TDD