ํ์คํฌ ๊ทธ๋ฃน
ํ์คํฌ ๊ทธ๋ฃนํ์ ํ์์ฑ
โข
ํ์คํฌ ์๊ฐ ๋ง์ DAG์ ๊ฒฝ์ฐ, ํ์คํฌ๋ค์ ์ฑ๊ฒฉ์ ๋ฐ๋ผ ๊ด๋ฆฌํ๊ณ ์ถ์ ์๊ตฌ๊ฐ ์กด์ฌํฉ๋๋ค.
โฆ
SubDAG๊ฐ ์ฌ์ฉ๋๋ ์ค, Airflow 2.0์์๋ Task Grouping์ด ๋ ๋ง์ด ์ฌ์ฉ๋๊ณ ์์ต๋๋ค.
โช
SubDAG๋ ๋น์ทํ ์ผ์ ํ๋ ํ์คํฌ๋ค์ SubDAG๋ผ๋ Child DAG๋ก ๋ง๋ค์ด์ ๊ด๋ฆฌํฉ๋๋ค.
โช
ํ์คํฌ ๊ทธ๋ฃน์ผ๋ก ๋ฌถ์ด๋ ๊ฒ๋ค์ ๋น์ทํ ์ฑ๊ฒฉ์ ์ผ์ ์ํํ๋ DAG์ผ ๊ฐ๋ฅ์ฑ์ด ๋๋ค
โข
๋ค์์ ํ์ผ์ ์ฒ๋ฆฌํ๋ DAG์ ๊ฒฝ์ฐ,
โฆ
ํ์ผ ๋ค์ด๋ก๋ ํ์คํฌ๋ค๊ณผ ํ์ผ ์ฒดํฌ ํ์คํฌ์ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ํ์คํฌ๋ค๋ก ๊ตฌ์ฑ๋ฉ๋๋ค.
โข
์์ค ์ฝ๋
โฆ
Learn Task Groups
from airflow.utils.task_group import TaskGroup
start = EmptyOperator(task_id="start")
with TaskGroup("Download", tooltip="Tasks for downloading data") as section_1:
task_1 = EmptyOperator(task_id="task_1")
task_2 = BashOperator(task_id="task_2", bash_command='echo 1')
task_3 = EmptyOperator(task_id="task_3")
task_1 >> [task_2, task_3]
start >> section_1
Python
๋ณต์ฌ
โช
TaskGroup ์์ TaskGroup nesting ๊ฐ๋ฅ
โช
TaskGroup๋ ํ์คํฌ์ฒ๋ผ ์คํ ์์ ์ ์ ๊ฐ๋ฅ
โช
tooltip์ผ๋ก TaskGroup ๋ค์ด๋ฐ์ด ๊ฐ๋ฅํด์ง๋ค.
โช
/Learn_TaskGroups.py
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
import pendulum
with DAG(dag_id="Learn_Task_Group", start_date=pendulum.today('UTC').add(days=-2), tags=["example"]) as dag:
start = EmptyOperator(task_id="start")
# Task Group #1
with TaskGroup("Download", tooltip="Tasks for downloading data") as section_1:
task_1 = EmptyOperator(task_id="task_1")
task_2 = BashOperator(task_id="task_2", bash_command='echo 1')
task_3 = EmptyOperator(task_id="task_3")
task_1 >> [task_2, task_3]
# Task Group #2
with TaskGroup("Process", tooltip="Tasks for processing data") as section_2:
task_1 = EmptyOperator(task_id="task_1")
with TaskGroup("inner_section_2", tooltip="Tasks for inner_section2") as inner_section_2:
task_2 = BashOperator(task_id="task_2", bash_command='echo 1')
task_3 = EmptyOperator(task_id="task_3")
task_4 = EmptyOperator(task_id="task_4")
[task_2, task_3] >> task_4
end = EmptyOperator(task_id='end')
start >> section_1 >> section_2 >> end
Python
๋ณต์ฌ
Dynamic Dag๋ ๋ฌด์์ธ๊ฐ?
Dynamic Dags๋ฅผ ์ฌ์ฉํด์ ์ฝ๋ ์ฌ์ฌ์ฉ์ ์ต๋ํํ ์ ์๋ค!
Jinja template์ YAML์ ์ฌ์ฉํด์ผ ํจ
โข
Dynamic Dag๋ ๋ฌด์์ธ๊ฐ?
โฆ
ํ
ํ๋ฆฟ๊ณผ YAML์ ๊ธฐ๋ฐ์ผ๋ก DAG๋ฅผ ๋์ ์ผ๋ก ๋ง๋ค์ด๋ณด์
โช
Jinja๋ฅผ ๊ธฐ๋ฐ์ผ๋ก DAG ์์ฒด์ ํ
ํ๋ฆฟ์ ๋์์ธํ๊ณ YAML์ ํตํด ์์ ๋ง๋ ํ
ํ๋ฆฟ์ ํ๋ผ๋ฏธํฐ๋ฅผ ์ ๊ณต
โฆ
์ด๋ฅผ ํตํด ๋น์ทํ DAG๋ฅผ ๊ณ์ํด์ ๋งค๋ด์ผํ๊ฒ ๊ฐ๋ฐํ๋ ๊ฒ์ ๋ฐฉ์ง
โฆ
DAG๋ฅผ ๊ณ์ํด์ ๋ง๋๋ ๊ฒ๊ณผ ํ DAG์์์ ํ์คํฌ๋ฅผ ๋๋ฆฌ๋ ๊ฒ ์ฌ์ด์ ๋ฐธ๋ฐ์ค ํ์
โช
์ค๋๊ฐ ๋ค๋ฅด๊ฑฐ๋ ํ์คํฌ์ ์๊ฐ ๋๋ฌด ์ปค์ง๋ ๊ฒฝ์ฐ DAG๋ฅผ ๋ณต์ ํด๋๊ฐ๋ ๊ฒ์ด ๋ ์ข์
โข
Dynamic Dag์ ๊ธฐ๋ณธ์ ์ธ ์์ด๋์ด
โข
๋ฐ๋์ด์ผ ํ๋ ํ๋ผ๋ฏธํฐ๋ง jinja ํ
ํ๋ฆฟ(templated_dag.jinja2)์ ํตํด ๋ฐ๊ฟ
โข
generator.py๋ฅผ ํตํด์ ํ
ํ๋ฆฟ์ ํตํด Dag ํ์ผ์ ์์ฑํด์ค๋ค.
โข
๊ฐ๋จํ ์์
โฆ
ํ
ํ๋ฆฟ์ ํตํ ์ต์ข
DAG ํ์ผ ์์ฑ
โช
$ python3 dags/dynamic_dags/generator.py
โฆ
์ด๋ dags ํด๋์ yml ํ์ผ์ ์ ๋งํผ์ DAG ์ฝ๋๋ฅผ ์์ฑํด์ค
โช
generator ์คํ์ ์ธ์ ํ ์ง ๊ฒฐ์ ์ด ํ์
โช
generator๋ฅผ ํตํด ์๋ get_price_XXXX.py DAG๋ฅผ ์์ฑ
โช
generator.py
from jinja2 import Environment, FileSystemLoader
import yaml
import os
file_dir = os.path.dirname(os.path.abspath(__file__))
env = Environment(loader=FileSystemLoader(file_dir))
template = env.get_template('templated_dag.jinja2')
for f in os.listdir(file_dir):
if f.endswith(".yml"):
with open(f"{file_dir}/{f}", "r") as cf:
config = yaml.safe_load(cf)
with open(f"dags/get_price_{config['dag_id']}.py", "w") as f:
f.write(template.render(config))
Python
๋ณต์ฌ
โช
templated_dag.jinja2
from airflow import DAG
from airflow.decorators import task
from datetime import datetime
with DAG(dag_id="get_price_{{ dag_id }}",
start_date=datetime(2023, 6, 15),
schedule='{{ schedule }}',
catchup={{ catchup or True }}) as dag:
@task
def extract(symbol):
return symbol
@task
def process(symbol):
return symbol
@task
def store(symbol):
return symbol
store(process(extract("{{ symbol }}")))
Python
๋ณต์ฌ
โช
config_appl.yml
dag_id: 'APPL'
schedule: '@daily'
catchup: False
symbol: 'APPL'
Python
๋ณต์ฌ
โช
config_goog.yml
dag_id: 'GOOG'
schedule: '@weekly'
symbol: 'GOOG'
Python
๋ณต์ฌ
catch up์ด ์์ผ๋ฉด True๋ก jinja์์ ์ธํ
ํด์ค๋ค.