
[Airflow] Task-Group & Dynamic Dags

ํƒœ์Šคํฌ ๊ทธ๋ฃน

ํƒœ์Šคํฌ ๊ทธ๋ฃนํ•‘์˜ ํ•„์š”์„ฑ

• When a DAG has many tasks, it helps to manage them in groups according to what they do.
  ◦ SubDAGs used to serve this purpose, but since Airflow 2.0 Task Groups are the more common choice.
    ▪ A SubDAG collects tasks that do similar work into a child DAG (the SubDAG) and manages them there.
    ▪ Tasks placed in the same Task Group are likely to do similar kinds of work.
• Example: a DAG that processes many files
  ◦ It consists of file download tasks, a file check task, and data processing tasks.
• Source code
  ◦ Learn Task Groups
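# Excerpt: these lines belong inside a `with DAG(...)` block (see /Learn_TaskGroups.py).
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

start = EmptyOperator(task_id="start")

# Collect the download-related tasks into one group.
with TaskGroup("Download", tooltip="Tasks for downloading data") as section_1:
    task_1 = EmptyOperator(task_id="task_1")
    task_2 = BashOperator(task_id="task_2", bash_command='echo 1')
    task_3 = EmptyOperator(task_id="task_3")

    task_1 >> [task_2, task_3]

# A TaskGroup can appear in a dependency chain just like a task.
start >> section_1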
    ▪ A TaskGroup can be nested inside another TaskGroup.
    ▪ Like tasks, TaskGroups can be given an execution order.
    ▪ tooltip sets the description shown when hovering over the TaskGroup in the UI; the group's name is its first argument (the group id).
    ▪ /Learn_TaskGroups.py
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
import pendulum

with DAG(dag_id="Learn_Task_Group",
         start_date=pendulum.today('UTC').add(days=-2),
         tags=["example"]) as dag:
    start = EmptyOperator(task_id="start")

    # Task Group #1
    with TaskGroup("Download", tooltip="Tasks for downloading data") as section_1:
        task_1 = EmptyOperator(task_id="task_1")
        task_2 = BashOperator(task_id="task_2", bash_command='echo 1')
        task_3 = EmptyOperator(task_id="task_3")

        task_1 >> [task_2, task_3]

    # Task Group #2
    with TaskGroup("Process", tooltip="Tasks for processing data") as section_2:
        task_1 = EmptyOperator(task_id="task_1")

        with TaskGroup("inner_section_2", tooltip="Tasks for inner_section2") as inner_section_2:
            task_2 = BashOperator(task_id="task_2", bash_command='echo 1')
            task_3 = EmptyOperator(task_id="task_3")
            task_4 = EmptyOperator(task_id="task_4")

            [task_2, task_3] >> task_4

    end = EmptyOperator(task_id='end')

    start >> section_1 >> section_2 >> end
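Both groups in Learn_TaskGroups.py define a task_1 without clashing. With Airflow's default setting (prefix_group_id=True), a task's final id is its task_id prefixed by the ids of the enclosing groups, joined with dots. The helper below is only a plain-Python model of that naming rule, not Airflow code, and `qualified_task_id` is a hypothetical name introduced for illustration.

```python
def qualified_task_id(task_id, group_path=()):
    """Join the enclosing group ids and the task id with dots.

    Illustrative model of Airflow's default naming (prefix_group_id=True).
    This helper is hypothetical and is not part of Airflow.
    """
    return ".".join([*group_path, task_id])


# Ids that the tasks in Learn_TaskGroups.py would be expected to receive.
ids = [
    qualified_task_id("start"),
    qualified_task_id("task_1", ("Download",)),
    qualified_task_id("task_1", ("Process",)),
    qualified_task_id("task_2", ("Process", "inner_section_2")),
]
print(ids)
```

Because the group path is part of the id, the same short task_id can be reused in different groups, which is what the example DAG relies on.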

What Is a Dynamic DAG?

Dynamic DAGs let you maximize code reuse. The approach described here relies on a Jinja template and YAML config files.
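The core idea, one template rendered once per config, can be sketched with the standard library alone. In this sketch `string.Template` stands in for Jinja and plain dicts stand in for the YAML files. Both substitutions are assumptions made to keep the example dependency-free, and the template text and the `render_dags` helper are hypothetical.

```python
from string import Template

# Stand-in for the Jinja template: only the $-placeholders vary per DAG.
DAG_TEMPLATE = Template(
    'with DAG(dag_id="get_price_$dag_id", schedule="$schedule") as dag:\n'
    '    store(process(extract("$symbol")))\n'
)

# Stand-ins for the YAML config files (one dict per DAG to generate).
CONFIGS = [
    {"dag_id": "APPL", "schedule": "@daily", "symbol": "APPL"},
    {"dag_id": "GOOG", "schedule": "@weekly", "symbol": "GOOG"},
]


def render_dags(template, configs):
    """Return {file name: rendered source}, one entry per config."""
    return {
        f"get_price_{cfg['dag_id']}.py": template.substitute(cfg)
        for cfg in configs
    }


generated = render_dags(DAG_TEMPLATE, CONFIGS)
for name, source in generated.items():
    print(f"# {name}\n{source}")
```

Adding a new DAG then means adding one more config entry rather than copying and editing a DAG file by hand.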
• What is a Dynamic DAG?
  ◦ DAGs are created dynamically from a template plus YAML.
    ▪ Design a template of the DAG itself with Jinja, then supply that template's parameters through YAML.
  ◦ This avoids developing near-identical DAGs by hand over and over.
  ◦ A balance is needed between creating ever more DAGs and adding ever more tasks to a single DAG.
    ▪ When the owners differ or the number of tasks grows too large, replicating the DAG is the better choice.
• The basic idea of a Dynamic DAG
• Only the parameters that need to change are substituted through the Jinja template (templated_dag.jinja2).
• generator.py renders the template to produce the DAG files.
• A simple example
  ◦ Generating the final DAG files from the template
    ▪ $ python3 dags/dynamic_dags/generator.py
  ◦ This writes one DAG file into the dags folder for each yml file.
    ▪ You need to decide when the generator should be run.
    ▪ The generator produces the get_price_XXXX.py DAG files.
    ▪ generator.py
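from jinja2 import Environment, FileSystemLoader
import yaml
import os

# Directory that holds this script, the template, and the *.yml configs.
file_dir = os.path.dirname(os.path.abspath(__file__))
env = Environment(loader=FileSystemLoader(file_dir))
template = env.get_template('templated_dag.jinja2')

# Render one DAG file per YAML config found next to this script.
for filename in os.listdir(file_dir):
    if filename.endswith(".yml"):
        with open(os.path.join(file_dir, filename), "r") as config_file:
            config = yaml.safe_load(config_file)
        # Relative path: run from the project root so the output lands in dags/.
        with open(f"dags/get_price_{config['dag_id']}.py", "w") as dag_file:
            dag_file.write(template.render(config))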
    ▪ templated_dag.jinja2
from airflow import DAG
from airflow.decorators import task
from datetime import datetime

# catchup falls back to True only when the YAML omits it. The default filter
# keeps an explicit False, whereas "catchup or True" would always render True.
with DAG(dag_id="get_price_{{ dag_id }}",
         start_date=datetime(2023, 6, 15),
         schedule='{{ schedule }}',
         catchup={{ catchup | default(True) }}) as dag:

    @task
    def extract(symbol):
        return symbol

    @task
    def process(symbol):
        return symbol

    @task
    def store(symbol):
        return symbol

    store(process(extract("{{ symbol }}")))
    ▪ config_appl.yml
dag_id: 'APPL'
schedule: '@daily'
catchup: False
symbol: 'APPL'
    ▪ config_goog.yml
dag_id: 'GOOG'
schedule: '@weekly'
symbol: 'GOOG'
If catchup is missing from the YAML, the Jinja template sets it to True.
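One caveat with this kind of fallback: a default written with `or` cannot tell a missing key from an explicit False, so `catchup: False` would be turned back into True. Jinja's `or` follows the same truthiness rule as Python's, so the difference can be checked in plain Python. The sketch below is illustrative only, and `catchup_with_or` and `catchup_with_default` are hypothetical helper names.

```python
def catchup_with_or(config):
    """Fallback written with `or`: any falsy value becomes True."""
    return config.get("catchup") or True


def catchup_with_default(config):
    """Fallback applied only when the key is absent."""
    return config.get("catchup", True)


appl = {"dag_id": "APPL", "catchup": False}  # catchup set explicitly
goog = {"dag_id": "GOOG"}                    # catchup omitted

results = {
    "or": (catchup_with_or(appl), catchup_with_or(goog)),
    "default": (catchup_with_default(appl), catchup_with_default(goog)),
}
print(results)
```

In a Jinja template the second behaviour corresponds to the `default` filter, for example `{{ catchup | default(True) }}`.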