Airflow DAG 작성(PythonOperator, params, context, Xcom, Airflow.cfg, DAG Parameters)
Hello World 예제 프로그램 살펴보기
Operators - PythonOperator
•
python으로 하는 일반적인 기능은 구현이 가능한 자유도가 높은 operator
load_nps = PythonOperator(
dag=dag,
task_id='task_id',
python_callable=python_func,
params={
'table': 'delighted_nps',
'schema': 'raw_data'
},
)
Python
복사
from airflow.exceptions import AirflowException
def python_func(**cxt):
table = cxt["params"]["table"]
schema = cxt["params"]["schema"]
ex_date = cxt["execution_date"]
# do what you need to do
...
Python
복사
•
python operator에 정의된 params는 context 변수의 params key로 저장이 된다
Python Operator를 이용한 소스 코드 확인하기
2개의 태스크로 구성된 데이터 파이프라인 (DAG)
•
DAG 선언
◦
Airflow의 DAG는 task들의 집합
◦
하나의 Task는 하나의 Operator로 이루어짐
◦
tag를 통해 business owner를 적어놓는 것을 권장
▪
tag는 여러 개 사용 가능
dag = DAG(
dag_id = 'my_first_dag',
start_date = datetime(2021,8,26),
catchup=False, # 미래에 활성화 시켜도 catchup 시키지 않음
tags=['example'],
schedule_interval = '0 2 * * *' # 2시 0분에 한번 도는 daily DAG
)
Python
복사
◦
Task
▪
print_hello: PythonOperator로 구성되어 있으며 먼저 실행
▪
print_goodbye: PythonOperator로 구성되어 있으며 두번째로 실행
•
sudo su airflow로 사용자 전환을 한 후
•
•
성공적으로 만들면 위와 같이 명령어를 쳤을 때 저렇게 뜬다.
◦
airflow는 dags 경로를 5분마다 scan해서 확인한다
•
아래는 코드
## 전체 소스 코드
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
dag = DAG(
dag_id = 'my_first_dag',
start_date = datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule_interval = '0 2 * * *')
def print_hello():
print("hello!")
return "hello!"
def print_goodbye():
print("goodbye!")
return "goodbye!"
print_hello = PythonOperator(
task_id = 'print_hello',
#python_callable param points to the function you want to run
python_callable = print_hello,
#dag param points to the DAG that this task is a part of
dag = dag)
print_goodbye = PythonOperator(
task_id = 'print_goodbye',
python_callable = print_goodbye,
dag = dag)
#Assign the order of the tasks in our DAG
print_hello >> print_goodbye
Python
복사
•
Tree 뷰에서는 그 동안 실행되었던 record들이 나온다
•
Graph 탭에 들어가게 되면 Task 들 간의 순서가 graph 형식으로 표현이 된다
Airflow Decorator를 이용한 소스 코드 확인하기
•
Airflow Decorator를 사용하게 되면 훨씬 프로그램이 더 직관적이게 된다.
•
소스코드
from airflow.decorators import task
@task
def print_hello():
print("hello!")
return "hello!"
@task
def print_goodbye():
print("goodbye!")
return "goodbye!"
with DAG(
dag_id = 'HelloWorld_v2',
start_date = datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule = '0 2 * * *'
) as dag:
# Assign the tasks to the DAG in order
print_hello() >> print_goodbye()
# 이 함수 이름이 태스크 ID가 됨
Python
복사
함수들 앞에 데코레이터를 통해서 annotation을 한다
함수를 정의하면 그 자체가 task가 된다
DAG 내 task 함수 실행 순서를 지정해주면 된다. 또한 task id를 별도로 설정해주지 않으면, task 함수명이 task id가 된다.
•
참고로
with DAG(
dag_id = 'HelloWorld_v2',
start_date = datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule = '0 2 * * *'
) as dag:
# Assign the tasks to the DAG in order
print_hello() >> print_goodbye()
# 이 함수 이름이 태스크 ID가 됨
Python
복사
위 방식처럼 하는 것이 context manage 방법이라고 한다.
operator와 task는 dag에 할당돼야 실행이 가능한데
1. dag=DAG()는 dag id를 태스크마다 명시해줘서 할당해줘야 해요
2. 근데 with 절을 사용하면 해당 오퍼레이터에 자동으로 dag를 할당해줘서 간결하게 작성 가능해요
3. decorator는 여러번 호출시에 재사용 가능해서 용이해요
Plain Text
복사
1,2,3 순서대로 airflow가 버전업 되면서 쓰기 편하게 새로 생기는 기능들이라고 생각하면 편하다.
중요한 DAG 파라미터
•
Important DAG parameters (not task parameters)
with DAG(
dag_id = 'HelloWorld_v2',
start_date = datetime(2022,5,5),
catchup=False,
tags=['example'],
schedule = '0 2 * * *'
) as dag:
Python
복사
◦
위의 DAG 파라미터 말고도 아래와 같은 파라미터 설정 가능
◦
max_active_runs
▪
한번에 동시에 실행이 될 수 있는 DAG 인스턴스들의 수
▪
예를 들어, Backfill을 해야하는 경우
•
Daily Incremental update를 하는 DAG이고 과거의 데이터 품질 이슈가 생겨서 지난 1년 간의 데이터를 다시 읽어와야 할 경우, 365번 과거 일년의 날짜들에 대해서 이 DAG가 실행이 되야 한다
•
만약 이 경우, DAG가 1번에 1개씩만 실행이 가능하면, 동기적으로 시간이 너무 오래걸릴 것이다.
•
이럴 경우, max_active_runs 를 30으로 설정해서 12번만 실행해서 시간을 단축할 수 있다.
◦
max_active_tasks
▪
이 DAG에 속한 task들 중 동시에 병렬로 실행할 수 있는 태스크의 수
▪
airflow node에 할당된 cpu 수에 의해 결정이 된다.
•
지금 airflow worker node에 할당된 cpu의 총 합이 upper bound가 된다
•
worker node = 1, cpu = 4 ⇒ 한번에 실행가능한 태스크 = 4
•
즉, cpu 사양에 의해 upper bound가 결정이 됨
▪
airflow에서 동일한 시간에 2개의 DAG가 실행된다면 CPU에서 병렬로 처리가 됨
▪
만약 DAG의 task가 모두 동기적으로 실행될 경우, 필요없는 파라미터
◦
catchup
▪
start date가 현재보다 과거일 경우, 과거의 run들을 backfill할지 여부
▪
default는 True, Full refresh의 경우에는 의미가 크게 없다.
DAG parameters vs. Task parameters의 차이점 이해가 중요
◦
위의 파라미터들은 모두 DAG 파라미터로 DAG 객체를 만들 때 지정해주어야함
◦
default_args로 지정해주면 에러는 안 나지만 적용이 안됨
▪
default_args로 지정되는 파라미터들은 태스크 레벨로 적용되는 파라미터들
Colab Python 코드를 Airflow로 포팅하기
•
기존 Colab python 코드 (개선된 ETL 코드)
1.
헤더가 레코드로 추가되는 문제 해결하기
2.
Idempotent하게 잡을 만들기 (멱등성)
•
auto_commit = True, SQL : BEGIN; END;
a. 여러 번 실행해도 동일한 결과가 나오게 만들기
b. 매번 새로 모든 데이터를 읽어오는 잡이라고 가정하고 구현할 것
import logging
import psycopg2
def get_Redshift_connection():
host = "learnde......ap-northeast-2.redshift.amazonaws.com"
user = "...." # 본인 ID 사용
password = "..." # 본인 Password 사용
port = 5439
dbname = "dev"
conn = psycopg2.connect(f"dbname={dbname} user={user} host={host} password={password} port={port}")
conn.set_session(autocommit=True)
return conn.cursor()
def extract(url):
f = requests.get(url)
return (f.text)
def transform(text):
lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
records = []
for l in lines:
(name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
records.append([name, gender])
return records
def load(records):
"""
records = [
[ "Keeyong", "M" ],
[ "Claire", "F" ],
...
]
"""
schema = "keeyong"
# BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
cur.execute(f"DELETE FROM {schema}.name_gender;")
# DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
cur.execute(sql)
cur.execute("COMMIT;") # cur.execute("END;")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK;")
Python
복사
Airflow 포팅방법 1
•
NameGenderCSVtoRedshift.py
◦
ETL 을 각각의 태스크로 나눠서 구성할 것인가
◦
혹은 ETL을 합쳐서 하나의 태스크로 포팅할 것인가
⇒ 아래 코드는 각각 ETL 3개의 함수를 하나의 태스크에서 실행
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import requests
import logging
import psycopg2
def get_Redshift_connection():
host = "learnde.......ap-northeast-2.redshift.amazonaws.com"
user = "...." # 본인 ID 사용
password = "..." # 본인 Password 사용
port = 5439
dbname = "dev"
conn = psycopg2.connect(f"dbname={dbname} user={user} host={host} password={password} port={port}")
conn.set_session(autocommit=True)
return conn.cursor()
def extract(url):
logging.info("Extract started")
f = requests.get(url)
logging.info("Extract done")
return (f.text)
def transform(text):
logging.info("transform started")
# ignore the first line - header
lines = text.split("\n")[1:]
logging.info("transform done")
return lines
def load(lines):
logging.info("load started")
cur = get_Redshift_connection()
sql = "BEGIN;DELETE FROM keeyong.name_gender;"
for l in lines:
if l != '':
(name, gender) = l.split(",")
sql += f"INSERT INTO keeyong.name_gender VALUES ('{name}', '{gender}');"
sql += "END;"
cur.execute(sql)
logging.info(sql)
logging.info("load done")
def etl():
link = "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"
data = extract(link)
lines = transform(data)
load(lines)
dag_second_assignment = DAG(
dag_id = 'name_gender',
catchup = False, # 모르겠으면 False가 안전
start_date = datetime(2022,10,6), # 날짜가 미래인 경우 실행이 안됨
schedule_interval = '0 2 * * *' # 적당히 조절
)
task = PythonOperator(
task_id = 'perform_etl',
python_callable = etl,
dag = dag_second_assignment)
etl
Python
복사
params, context 변수
Airflow 포팅방법 2
•
◦
PythonOperator 사용 시 params 변수 사용하여 etl 함수에 param 전달
◦
Airflow 시스템 변수를 로그로 찍어보기 (execution_date를 찍어본다)
◦
context 변수 설명
def etl(**context):
link = context["params"]["url"]
# task 자체에 대한 정보 (일부는 DAG의 정보가 되기도 함)를 읽고 싶다면 context['task_instance'] 혹은 context['ti']를 통해 가능
# https://airflow.readthedocs.io/en/latest/_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance
task_instance = context['task_instance']
execution_date = context['execution_date']
.
.
.
task = PythonOperator(
task_id = 'perform_etl',
python_callable = etl,
params = {
'url': "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"
},
dag = dag_second_assignment)
Python
복사
▪
context는 딕셔너리
•
Airflow가 관리하고 있는 다양한 변수들이 존재
1.
task_instance : 지금 실행된 태스크의 유니크한 id
2.
execution_date
a.
Airflow가 제공해주는 시스템 변수로 읽어와야할 데이터의 날짜와 시간이 들어옴
b.
보통 daily job이면 start_date는 첫 execution_date와 같음
3.
params
a.
pythonOperator로부터 넘겨받은 params 인자에는 다양한 키가 존재
•
NameGenderCSVtoRedshift_v2.py 코드
from airflow import DAG
from airflow.operators.python import PythonOperator
# from airflow.operators import PythonOperator
from airflow.models import Variable
from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2
def get_Redshift_connection():
host = "learnde......ap-northeast-2.redshift.amazonaws.com"
redshift_user = "..." # 본인 ID 사용
redshift_pass = "..." # 본인 Password 사용
port = 5439
dbname = "dev"
conn = psycopg2.connect(f"dbname={dbname} user={redshift_user} host={host} password={redshift_pass} port={port}")
conn.set_session(autocommit=True)
return conn.cursor()
def extract(url):
logging.info("Extract started")
f = requests.get(url)
logging.info("Extract done")
return (f.text)
def transform(text):
logging.info("transform started")
# ignore the first line - header
lines = text.split("\n")[1:]
logging.info("transform done")
return lines
# 적재 -> BEGIN;END를 이용 -> 장합성 보장
def load(lines):
logging.info("load started")
cur = get_Redshift_connection()
sql = "BEGIN;DELETE FROM keeyong.name_gender;"
for l in lines:
if l != '':
(name, gender) = l.split(",")
sql += f"INSERT INTO keeyong.name_gender VALUES ('{name}', '{gender}');"
sql += "END;"
logging.info(sql)
"""
Do we want to enclose try/catch here
"""
cur.execute(sql)
logging.info("load done")
def etl(**context):
link = context["params"]["url"]
# task 자체에 대한 정보 (일부는 DAG의 정보가 되기도 함)를 읽고 싶다면 context['task_instance'] 혹은 context['ti']를 통해 가능
# https://airflow.readthedocs.io/en/latest/_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance
task_instance = context['task_instance']
execution_date = context['execution_date']
logging.info(execution_date)
data = extract(link)
lines = transform(data)
load(lines)
dag_second_assignment = DAG(
dag_id = 'second_assignment_v2',
start_date = datetime(2022,10,6), # 날짜가 미래인 경우 실행이 안됨
schedule_interval = '0 2 * * *', # 적당히 조절
catchup = False,
max_active_runs = 1,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
# param 변수를 통해 etl 함수에 param(link) 전달
task = PythonOperator(
task_id = 'perform_etl',
python_callable = etl,
params = {
'url': "https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv"
},
dag = dag_second_assignment)
Python
복사
◦
List connection에서 기록한 키-값을 params를 통해 변수 넘기기
▪
execution_date 얻어내기
▪
“delete from” vs. “truncate”
•
DELETE FROM raw_data.name_gender; -- WHERE 사용 가능
•
TRUNCATE raw_data.name_gender;
Connections and Variables
Airflow에 필요한 정보 및 변수들을 Encrypt해서 사용 가능해집니다!
connection과 variable에서 정의한 key value는 global합니다
•
위치
•
Connections
◦
이것은 호스트 이름, 포트 번호 및 액세스 자격 증명과 같은 연결 관련 정보를 저장하는 데 사용됩니다.
◦
Postgres 연결 또는 Redshift 연결 정보, AWS connection 정보들을 여기에 저장할 수 있습니다.
◦
Conn Id가 connection의 이름이 됩니다.
•
Variables
◦
Airflow에서 사용하는 key-value 스토리지에 접근 가능하게 해주는 모듈입니다.
▪
키-값 형태로 저장합니다
▪
get, set method를 사용할 수 있습니다
from airflow.models import Variable
Variable.get("key") # 키를 주고 읽어오기
Variable.set("key", "val") # 키와 value를 주고 variable을 세팅해줍니다.
Python
복사
◦
API 키 또는 Configuration 과 같은 구성 정보를 저장하는 데 사용됩니다
◦
값이 암호화되기를 원하는 경우 이름에 "access" 또는 "secret"을 사용 가능합니다.
Xcom이란?
태스크(Operator)들 간에 데이터를 주고 받기 위한 방식
•
Xcom 필요 예시
◦
태스크 하나로 구성이 된 경우 → 필요 없음
data = extract(link)
lines = transform(data)
load(lines)
Python
복사
◦
3개의 태스크로 나눈 경우
▪
데이터를 어떻게 다음 Operator에게 넘길까? ⇒ Xcom
•
보통 한 Operator의 리턴값을 다른 Operator에서 읽어가는 형태가 됨
◦
이 값들(Operator 및 task의 리턴 값)은 Airflow 메타 데이터 DB에 저장
▪
형식
•
task instance의 id와 return value 혹은 xcom_push value를 묶어서 메타 데이터베이스에 저장
◦
앞선 task의 id를 넘겨 받을 때, 넘겨받은 id가 리턴했던 값을 메타 데이터 DB로부터 가져온다
•
이 값들(Operator 및 task의 리턴 값)은 Airflow 메타 데이터 DB에 저장이 되기에 큰 데이터를 주고받는데는 사용 불가
◦
예를 들면, 판다스 데이터 프레임과 같이 큰 데이터는 오버 로드를 발생시킬 수 있다.
◦
보통 큰 데이터는 S3 등에 로드하고 그 위치(URI)를 넘기는 것이 일반적
•
Xcom을 사용하기 위해서는 각 task에서 push, pull 하는 방식으로 기본적으로 사용되지만, PythtonOperator의 경우 return이 자동적으로 Xcom 변수로 지정되게 된다.
Airflow 포팅방법 3
•
NameGenderCSVtoRedshift.py 개선하기 2번째 방법
◦
3개의 python Operator로 나눠질 경우, 각각의 Operator 간의 통신을 위해서 Xcom을 사용한다
◦
첫번째 python Operator의 출력값이 두번째 python Operator의 입력값이 될 수 있도록 설정
◦
Xcom 객체를 사용해서 세 개의 task로 나누기
▪
xcom_pull
•
extract → transform → load
•
context["task_instance"]에 저장된 정보를 xcom_pull로 가져온다
▪
xcom_push
▪
xcom으로 만든 key value는 해당 DAG에서만 사용이 가능함
◦
Redshift의 스키마와 테이블 이름을 params로 넘기기
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2
def get_Redshift_connection():
host = "learnde.....ap-northeast-2.redshift.amazonaws.com"
redshift_user = "..." # 본인 ID 사용
redshift_pass = "..." # 본인 Password 사용
port = 5439
dbname = "dev"
conn = psycopg2.connect(f"dbname={dbname} user={redshift_user} host={host} password={redshift_pass} port={port}")
conn.set_session(autocommit=True)
return conn.cursor()
def extract(**context):
link = context["params"]["url"]
task_instance = context['task_instance']
execution_date = context['execution_date']
logging.info(execution_date)
f = requests.get(link)
return (f.text)
def transform(**context):
logging.info("Transform started")
# task_instance의 xcom_pull을 통해 extract id의 return_value 를 읽어와서 text 변수로 할당
text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
records = []
for l in lines:
(name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
records.append([name, gender])
logging.info("Transform ended")
return records
def load(**context):
logging.info("load started")
schema = context["params"]["schema"]
table = context["params"]["table"]
lines = context["task_instance"].xcom_pull(key="return_value", task_ids="transform")
"""
records = [
[ "Keeyong", "M" ],
[ "Claire", "F" ],
...
]
"""
# BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
cur.execute(f"DELETE FROM {schema}.name_gender;")
# DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
cur.execute(sql)
cur.execute("COMMIT;") # cur.execute("END;")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK;")
logging.info("load done")
dag = DAG(
dag_id = 'name_gender_v3',
start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 2 * * *', # 적당히 조절
catchup = False,
max_active_runs = 1,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
extract = PythonOperator(
task_id = 'extract',
python_callable = extract,
params = {
'url': Variable.get("csv_url")
},
dag = dag)
transform = PythonOperator(
task_id = 'transform',
python_callable = transform,
params = {
},
dag = dag)
load = PythonOperator(
task_id = 'load',
python_callable = load,
params = {
'schema': 'keeyong',
'table': 'name_gender'
},
dag = dag)
extract >> transform >> load
Python
복사
Airflow 포팅 4
•
Redshift connection을 이용해서 connection 보호
◦
PostgresHook의 autocommit 파라미터
▪
Default 값은 False로 주어짐
▪
이 경우 BEGIN은 아무런 영향이 없음 (no-operation)
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from datetime import timedelta
# from plugins import slack
import requests
import logging
import psycopg2
def get_Redshift_connection(autocommit=True):
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
conn = hook.get_conn()
conn.autocommit = autocommit
return conn.cursor()
def extract(**context):
link = context["params"]["url"]
task_instance = context['task_instance']
execution_date = context['execution_date']
logging.info(execution_date)
f = requests.get(link)
return (f.text)
def transform(**context):
logging.info("Transform started")
text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
records = []
for l in lines:
(name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
records.append([name, gender])
logging.info("Transform ended")
return records
def load(**context):
logging.info("load started")
schema = context["params"]["schema"]
table = context["params"]["table"]
lines = context["task_instance"].xcom_pull(key="return_value", task_ids="transform")
"""
records = [
[ "Keeyong", "M" ],
[ "Claire", "F" ],
...
]
"""
# BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
cur.execute(f"DELETE FROM {schema}.name_gender;")
# DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
cur.execute(sql)
cur.execute("COMMIT;") # cur.execute("END;")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK;")
logging.info("load done")
dag = DAG(
dag_id = 'name_gender_v4',
start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 2 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
# 'on_failure_callback': slack.on_failure_callback,
}
)
extract = PythonOperator(
task_id = 'extract',
python_callable = extract,
params = {
'url': Variable.get("csv_url")
},
dag = dag)
transform = PythonOperator(
task_id = 'transform',
python_callable = transform,
params = {
},
dag = dag)
load = PythonOperator(
task_id = 'load',
python_callable = load,
params = {
'schema': 'hajuny129', ## 자신의 스키마로 변경
'table': 'name_gender'
},
dag = dag)
extract >> transform >> load
Python
복사
Airflow 포팅 5
•
from airflow.decorators import task
◦
task decorator를 사용
◦
이 경우 xcom을 사용할 필요가 없음
◦
기본적으로 PythonOperator 대신에 airflow.decorators.task를 사용
◦
코드
from airflow import DAG
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.decorators import task # decorator 임포트
from datetime import datetime
from datetime import timedelta
import requests
import logging
def get_Redshift_connection(autocommit=True):
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
conn = hook.get_conn()
conn.autocommit = autocommit
return conn.cursor()
@task
def extract(url):
logging.info(datetime.utcnow())
f = requests.get(url)
return f.text
@task
def transform(text):
lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리
records = []
for l in lines:
(name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
records.append([name, gender])
logging.info("Transform ended")
return records
@task
def load(schema, table, records):
logging.info("load started")
cur = get_Redshift_connection()
"""
records = [
[ "Keeyong", "M" ],
[ "Claire", "F" ],
...
]
"""
# BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
try:
cur.execute("BEGIN;")
cur.execute(f"DELETE FROM {schema}.name_gender;")
# DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
cur.execute(sql)
cur.execute("COMMIT;") # cur.execute("END;")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK;")
logging.info("load done")
with DAG(
dag_id='namegender_v5',
start_date=datetime(2022, 10, 6), # 날짜가 미래인 경우 실행이 안됨
schedule='0 2 * * *', # 적당히 조절
max_active_runs=1,
catchup=False,
default_args={
'retries': 1,
'retry_delay': timedelta(minutes=3),
# 'on_failure_callback': slack.on_failure_callback,
}
) as dag:
url = Variable.get("csv_url")
schema = 'hajuny129' ## 자신의 스키마로 변경
table = 'name_gender'
lines = transform(extract(url))
load(schema, table, lines)
Python
복사
테스크 데코레이터를 사용하게 되면 python operator 처럼 **context를 인자로 받아오는 것이 아니라 그냥 함수 인자들처럼 받아와서 사용할 수 있다.
아래와 같이 데코레이터 포맷을 사용해서 함수들 각각이 별개의 태스크로 사용 가능해진다.
with DAG(
dag_id='namegender_v5',
start_date=datetime(2022, 10, 6), # 날짜가 미래인 경우 실행이 안됨
schedule='0 2 * * *', # 적당히 조절
max_active_runs=1,
catchup=False,
default_args={
'retries': 1,
'retry_delay': timedelta(minutes=3),
# 'on_failure_callback': slack.on_failure_callback,
}
) as dag:
url = Variable.get("csv_url")
schema = 'hajuny129' ## 자신의 스키마로 변경
table = 'name_gender'
lines = transform(extract(url))
load(schema, table, lines)
Python
복사
•
docker ps를 통해 airflow scheduler의 id를 알아내서 접속해보자
◦
docker exec -it 2d1d8fe0fd67 /bin/bash
•
컨테이너를 통해 명령어를 실행
$ airflow dags list
## dags들의 list 출력
# dag 내 task가 어떤 것이 존재하는지
$ airflow tasks list namegender_v5
>>
extract
load
transform
$ airflow variables list
>>
key
=======
csv_url
$ airflow variables get csv_url
>> 키에 맞는 값을 가져온다
Python
복사
PostgresHook의 autocommit 파라미터
•
Default 값은 False로 주어짐
◦
명시적으로 END 혹은 COMMIT을 해줘야 물리적 테이블에 반영이 된다
•
이 경우(False), BEGIN은 아무런 영향이 없음 (no-operation)
DAG에서 task를 어느 정도로 분리하는 것이 좋을까?
•
task를 많이 만들면 전체 DAG이 실행되는데 오래 걸리고 스케줄러에 부하가 감
•
task를 너무 적게 만들면 모듈화가 안되고 실패시 재실행을 시간이 오래 걸림
•
오래 걸리는 DAG이라는 실패시 재실행이 쉽게 다수의 task로 나누는 것이 좋음
Airflow의 Variable 관리 vs. 코드 관리
•
가끔 SQL 문들도 variable로 뽑아서 사용할 때가 존재
•
장점: 코드 푸시의 필요성이 없음
•
단점: 관리나 테스트가 안되어서 사고로 이어질 가능성이 있음
◦
너무 중요한 SQL의 경우에는 코드 푸시를 하길 권장
DAG 디버깅하기 혹은 에러 메세지 확인하기
•
Airflow WebUI DAG UI에서 문제가 있는 DAG를 클릭하고 거기서 빨간색으로 표시된 task 클릭 후 View Log로 확인
•
Airflow WebUI DAG UI에서 문제가 있는 DAG를 클릭하고 거기서 빨간 색으로 표시된 task 클릭 후 View Log로 확인
•
보통 variables 나 connection 에러가 많다