[Airflow] Jinja Template with Airflow

What Is a Jinja Template?

• Jinja is a template engine widely used in Python.
  ◦ It was developed with inspiration from the Django template engine.
  ◦ Jinja separates presentation logic from application logic so that HTML can be generated dynamically.
  ◦ It is the default template engine in Flask.
    ▪ Variables are wrapped in double curly braces, {{ }}:
      <h1>Hello, {{ name }}!</h1>
    ▪ Control statements are wrapped in curly braces with percent signs, {% %}:
      <ul>
        {% for item in items %}
          <li>{{ item }}</li>
        {% endfor %}
      </ul>
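As a minimal sketch of the two syntaxes above, the snippet below renders them with the jinja2 package directly (assumed to be installed). The names `greeting` and `listing` and the sample values are illustrative only.

```python
from jinja2 import Template

# {{ ... }} substitutes a variable's value into the output.
greeting = Template("<h1>Hello, {{ name }}!</h1>").render(name="Airflow")

# {% ... %} runs control flow; here a loop emits one <li> per item.
listing = Template(
    "<ul>{% for item in items %}<li>{{ item }}</li>{% endfor %}</ul>"
).render(items=["extract", "load"])

print(greeting)
print(listing)
```

The template text stays fixed while the values passed to render() change, which is the same separation Airflow relies on for templated task arguments.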

Note: Jinja Template + Airflow

• In Airflow, Jinja templates let you define operator arguments, such as a Bash command, parameters, or an SQL query, as templated strings. Only the arguments an operator lists in its template_fields are rendered.
  ◦ This makes workflows reusable and customizable.
• Example 1) Use runtime values such as execution_date directly in code: {{ ds }}
  ◦ For all available system variables, see the templates reference in the Airflow documentation.
  # Define a templated task with BashOperator
  task1 = BashOperator(
      task_id='task1',
      bash_command='echo "{{ ds }}"',
      dag=dag
  )
• Example 2) Values passed in through params are just as easy to use.
  # Define another templated task with a dynamic parameter
  task2 = BashOperator(
      task_id='task2',
      bash_command='echo "Hello, {{ params.name }}!"',
      params={'name': 'John'},  # user-defined parameter
      dag=dag
  )
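Conceptually, before a task runs, Airflow renders each templated argument with Jinja against a context that contains values such as ds and params. The sketch below imitates that step with plain jinja2 (assumed to be installed). The `context` dictionary is hand-built for illustration and is not Airflow's actual context object.

```python
from jinja2 import Template

# Hand-built stand-in for the runtime context (assumption: Airflow supplies
# many more keys and derives them from the DAG run being executed).
context = {
    "ds": "2023-05-30",
    "params": {"name": "John"},
}

commands = [
    'echo "{{ ds }}"',
    'echo "Hello, {{ params.name }}!"',
]

# Render each templated string against the context.
rendered = [Template(command).render(**context) for command in commands]

for line in rendered:
    print(line)
```

Because rendering happens at run time, the same task definition produces a different command for each DAG run.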

Note: BashOperator Reference

• bash_command (str) – The command, set of commands, or reference to a bash script (must be '.sh') to be executed. (templated)
• env (dict[str, str] | None) – If env is not None, it must be a dict that defines the environment variables for the new process; these are used instead of inheriting the current process environment, which is the default behavior. (templated)
• append_env (bool) – If False (default), uses only the environment variables passed in env and does not inherit the current process environment. If True, inherits the environment variables of the current process; variables passed by the user then either update the inherited variables or are appended as new ones.
• output_encoding (str) – Output encoding of the bash command.
• skip_on_exit_code (int | Container[int] | None) – If the task exits with this exit code, leave the task in the skipped state (default: 99). If set to None, any non-zero exit code is treated as a failure.
• cwd (str | None) – Working directory in which to execute the command. If None (default), the command is run in a temporary directory.
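The interaction between env, append_env, and skip_on_exit_code can be sketched in plain Python. This is only a model of the semantics described above, not BashOperator's implementation; the helpers `effective_env` and `classify_exit` and the sample variables are hypothetical.

```python
def effective_env(env, append_env, parent_env):
    """Return the environment the child process would see (sketch only)."""
    if env is None:
        return dict(parent_env)        # default: inherit everything
    if append_env:
        return {**parent_env, **env}   # inherit, then apply user overrides
    return dict(env)                   # use only what the user passed


def classify_exit(exit_code, skip_on_exit_code=99):
    """Map a process exit code to a task outcome (sketch only)."""
    if exit_code == 0:
        return "success"
    if skip_on_exit_code is None:
        return "failed"                # every non-zero code is a failure
    if isinstance(skip_on_exit_code, int):
        skip_codes = {skip_on_exit_code}
    else:
        skip_codes = set(skip_on_exit_code)
    return "skipped" if exit_code in skip_codes else "failed"


parent = {"HOME": "/home/airflow", "STAGE": "dev"}
user = {"STAGE": "prod"}

print(effective_env(user, False, parent))  # only the user-supplied variables
print(effective_env(user, True, parent))   # inherited variables plus overrides
print(classify_exit(99), classify_exit(1))
```

With append_env left at False, the child loses variables such as HOME, which is a common reason a command behaves differently under Airflow than in an interactive shell.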

Note: A Look at Some of the Jinja Variables Available in Airflow

• {{ ds }}
  ◦ The logical (execution) date of the current DAG run, as a string in "YYYY-MM-DD" format.
• {{ ds_nodash }}
  ◦ The logical (execution) date of the current DAG run, as a string in "YYYYMMDD" format.
• {{ ts }}
  ◦ The logical date of the current DAG run as an ISO 8601 timestamp, for example "2018-01-01T00:00:00+00:00". It is not the time at which the task started.
• {{ dag }}
  ◦ The DAG object of the current DAG run.
• {{ task }}
  ◦ The task object that is currently running.
• {{ dag_run }}
  ◦ The DagRun object of the current DAG run.
• {{ var.value }}
  ◦ {{ var.value.get('my.var', 'fallback') }}
  ◦ Reads the value of an Airflow Variable.
  ◦ var gives access to the Variables defined in Airflow, and var.value returns a Variable's value as plain text.
  ◦ For example, {{ var.value.get('my.var', 'fallback') }} returns the value of the Variable named my.var, or 'fallback' if that Variable does not exist.
• {{ var.json }}
  ◦ {{ var.json.my_dict_var.key1 }}
  ◦ Reads an Airflow Variable whose value is stored as JSON.
  ◦ var.json deserializes the JSON value, so its nested keys can be accessed directly.
  ◦ For example, {{ var.json.my_dict_var.key1 }} returns the value of key1 in the Variable named my_dict_var.
• {{ conn }}
  ◦ {{ conn.my_conn_id.login }}, {{ conn.my_conn_id.password }}
  ◦ Accesses the attributes of an Airflow Connection.
  ◦ conn gives access to the Connections defined in Airflow, looked up by connection ID. The expressions above read the login and password attributes of the Connection whose ID is my_conn_id.
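To make the formats in this list concrete, the sketch below uses only the standard library to produce strings in the shape of ds, ds_nodash, and ts, and mimics the var.value fallback and var.json key lookup. The sample date and the `variables` dictionary are stand-ins chosen for illustration; Airflow computes the real values itself.

```python
import json
from datetime import datetime, timezone

# Arbitrary logical date for illustration (midnight UTC).
logical_date = datetime(2023, 5, 30, tzinfo=timezone.utc)

ds = logical_date.strftime("%Y-%m-%d")       # shape of {{ ds }}
ds_nodash = logical_date.strftime("%Y%m%d")  # shape of {{ ds_nodash }}
ts = logical_date.isoformat()                # shape of {{ ts }}

# Hypothetical stand-in for the Airflow Variable store (name -> raw string).
variables = {"my_dict_var": '{"key1": "value1"}'}

# Mirrors {{ var.value.get('my.var', 'fallback') }}: missing name, so fallback.
fallback = variables.get("my.var", "fallback")

# Mirrors {{ var.json.my_dict_var.key1 }}: parse the JSON, then read a key.
key1 = json.loads(variables["my_dict_var"])["key1"]

print(ds, ds_nodash, ts)
print(fallback, key1)
```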

Note: Example Code Using Jinja Variables in Airflow

• Learn_Jinja
  ◦ Consists of three BashOperator tasks.
    ▪ Test run: "airflow dags test Learn_Jinja 2023-05-30"
• Code

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

# Define the DAG
dag = DAG(
    'Learn_Jinja',
    schedule='0 0 * * *',  # run daily
    start_date=datetime(2023, 6, 1),
    catchup=False
)

# Define a templated task with BashOperator
task1 = BashOperator(
    task_id='task1',
    bash_command='echo "{{ ds }}"',
    dag=dag
)

# Define another templated task with a dynamic parameter
task2 = BashOperator(
    task_id='task2',
    bash_command='echo "Hello, {{ params.name }}!"',
    params={'name': 'John'},  # user-defined parameter
    dag=dag
)

# Print the DAG object, the task object, and an Airflow Variable
task3 = BashOperator(
    task_id='task3',
    bash_command="""echo "{{ dag }}, {{ task }}, {{ var.value.get('csv_url') }}" """,
    dag=dag
)

task1 >> task2 >> task3