Search

[Airflow] Summary table 만들기 (ELT version 1)

Summary Table 구현 Airflow+Redshift로 간단한 ELT 구현을 알아보자

Summary table: 써머리 테이블 예

MAU 써머리 테이블 생성 DAG

SELECT TO_CHAR(A.ts, 'YYYY-MM') AS month, COUNT(DISTINCT B.userid) AS mau FROM raw_data.session_timestamp A JOIN raw_data.user_session_channel B ON A.sessionid = B.sessionid GROUP BY 1 ;
SQL
복사
Build_Summary.py
from airflow import DAG from airflow.operators.python import PythonOperator from airflow.models import Variable from airflow.hooks.postgres_hook import PostgresHook from datetime import datetime from datetime import timedelta from airflow import AirflowException import requests import logging import psycopg2 from airflow.exceptions import AirflowException def get_Redshift_connection(): hook = PostgresHook(postgres_conn_id = 'redshift_dev_db') return hook.get_conn().cursor() def execSQL(**context): schema = context['params']['schema'] table = context['params']['table'] select_sql = context['params']['sql'] logging.info(schema) logging.info(table) logging.info(select_sql) cur = get_Redshift_connection() sql = f"""DROP TABLE IF EXISTS {schema}.temp_{table};CREATE TABLE {schema}.temp_{table} AS """ sql += select_sql cur.execute(sql) cur.execute(f"""SELECT COUNT(1) FROM {schema}.temp_{table}""") count = cur.fetchone()[0] # 위의 cur.execute 결과 1번째 열 if count == 0: raise ValueError(f"{schema}.{table} didn't have any record") try: sql = f"""DROP TABLE IF EXISTS {schema}.{table};ALTER TABLE {schema}.temp_{table} RENAME to {table};""" sql += "COMMIT;" logging.info(sql) cur.execute(sql) except Exception as e: cur.execute("ROLLBACK") logging.error('Failed to sql. Completed ROLLBACK!') raise AirflowException("") dag = DAG( dag_id = "Build_Summary", start_date = datetime(2021,12,10), schedule = '@once', catchup = False ) execsql = PythonOperator( task_id = 'mau_summary', python_callable = execSQL, params = { 'schema' : 'hajuny129', 'table': 'mau_summary', 'sql' : """SELECT TO_CHAR(A.ts, 'YYYY-MM') AS month, COUNT(DISTINCT B.userid) AS mau FROM raw_data.session_timestamp A JOIN raw_data.user_session_channel B ON A.sessionid = B.sessionid GROUP BY 1 ;""" }, dag = dag )
Python
복사
@once 로 스케쥴링할 경우, API가 존재하면 권한이 있을 경우에 트리거링할 수 있게 된다.
execSQL 함수
params로 각각의 변수를 받아온다.
임시 테이블에 CTAS를 적용
그 결과가 에러가 없는 경우
원본 테이블과 임시 테이블의 결과를 바꿈 (원본 테이블을 먼저 삭제 후 교체)
AirflowException 내에 아무 내용을 쓰지 않을 경우, 그냥 raise문을 사용하는 것이 나은 선택이다.

사용자별 Channel 정보를 요약해주는 정보 추가 DAG

SELECT DISTINCT A.userid, FIRST_VALUE(A.channel) over(partition by A.userid order by B.ts rows between unbounded preceding and unbounded following) AS First_Channel, LAST_VALUE(A.channel) over(partition by A.userid order by B.ts rows between unbounded preceding and unbounded following) AS Last_Channel FROM raw_data.user_session_channel A LEFT JOIN raw_data.session_timestamp B ON A.sessionid = B.sessionid;
SQL
복사
from airflow import DAG from airflow.operators.python import PythonOperator from airflow.models import Variable from airflow.hooks.postgres_hook import PostgresHook from datetime import datetime from datetime import timedelta from airflow import AirflowException import requests import logging import psycopg2 from airflow.exceptions import AirflowException def get_Redshift_connection(): hook = PostgresHook(postgres_conn_id = 'redshift_dev_db') return hook.get_conn().cursor() def execSQL(**context): schema = context['params']['schema'] table = context['params']['table'] select_sql = context['params']['sql'] logging.info(schema) logging.info(table) logging.info(select_sql) cur = get_Redshift_connection() sql = f"""DROP TABLE IF EXISTS {schema}.temp_{table};CREATE TABLE {schema}.temp_{table} AS """ sql += select_sql cur.execute(sql) cur.execute(f"""SELECT COUNT(1) FROM {schema}.temp_{table}""") count = cur.fetchone()[0] if count == 0: raise ValueError(f"{schema}.{table} didn't have any record") try: sql = f"""DROP TABLE IF EXISTS {schema}.{table};ALTER TABLE {schema}.temp_{table} RENAME to {table};""" sql += "COMMIT;" logging.info(sql) cur.execute(sql) except Exception as e: cur.execute("ROLLBACK") logging.error('Failed to sql. Completed ROLLBACK!') raise AirflowException("") dag = DAG( dag_id = "Build_Summary_channel", start_date = datetime(2021,12,10), schedule_interval = '@once', catchup = False ) execsql = PythonOperator( task_id = 'execsql', python_callable = execSQL, params = { 'schema' : 'hajuny129', 'table': 'channel_summary', 'sql' : """SELECT DISTINCT A.userid, FIRST_VALUE(A.channel) over(partition by A.userid order by B.ts rows between unbounded preceding and unbounded following) AS First_Channel, LAST_VALUE(A.channel) over(partition by A.userid order by B.ts rows between unbounded preceding and unbounded following) AS Last_Channel FROM raw_data.user_session_channel A LEFT JOIN raw_data.session_timestamp B ON A.sessionid = B.sessionid;""" }, provide_context = True, dag = dag )
Python
복사

CTAS 부분을 아예 별도의 환경설정 파일로 떼어내면 어떨까?

환경 설정 중심의 접근 방식
config 폴더를 생성
그 안에 써머리 테이블별로 하나의 환경설정 파일 생성
파이썬 dictionary 형태로 유지할 것이라 .py 확장자를 가져야함
이렇게 하면 비개발자들이 사용할 때 어려움을 덜 느끼게 됨
그러면서 더 다양한 테스트를 추가
mau_summary.py
{ 'table': 'mau_summary', 'schema': 'keeyong', 'main_sql': """SELECT …;""", 'input_check': [ ], 'output_check': [ ], }
SQL
복사
질문 input check를 비워두어도 되는 것인가?

일별 NPS 계산해보자

NPS란? Net Promoter Score
10점 만점으로 '주변에 추천하겠는가?'라는 질문을 기반으로 고객 만족도를 계산
10, 9점 추천하겠다는 고객(promoter)의 비율에서 0-6점의 불평고객(detractor)의 비율을 뺀 것이 NPS
7, 8점은 아예 계산에 안 들어감
아래를 변경하여 일별 nps를 계산하는 써머리 테이블 만들어보기
먼저 SQL을 구현
각자스키마.nps 테이블 기준으로 일별 nps 써머리 생성 SQL 구현
아래는 일별 NPS 계산하는 SQL 쿼리
SELECT LEFT(created_at, 10) AS date, ROUND( SUM( CASE WHEN score >= 9 THEN 1 WHEN score <= 6 THEN -1 END )::float*100/COUNT(1), 2 ) nps FROM keeyong.nps GROUP BY 1 ORDER BY 1;
SQL
복사
CTAS 부분을 아예 별도의 파일로 떼어내면 어떨까?
환경 설정 중심의 접근 방식
config 폴더 밑에 nps_summary.py를 만든다 (config/nps_summary.py)
{ 'table': 'nps_summary', 'schema': 'keeyong', 'main_sql': """ SELECT LEFT(created_at, 10) AS date, ROUND(SUM(CASE WHEN score >= 9 THEN 1 WHEN score <= 6 THEN -1 END)::float*100/COUNT(1), 2) FROM keeyong.nps GROUP BY 1 ORDER BY 1;""", 'input_check': [ { 'sql': 'SELECT COUNT(1) FROM keeyong.nps', 'count': 150000 }, ], 'output_check': [ { 'sql': 'SELECT COUNT(1) FROM {schema}.temp_{table}', 'count': 12 } ], }
SQL
복사
실행한 sql 쿼리의 check를 통해 unit test를 할 수 있음
최종적으로 output check의 결과를 통해 에러 발생 가능하다.
앞의 SQL을 바탕으로 각자 스키마내에 nps_summary라는 테이블을 구성
위의 nps summary table을 주기적으로 실행하기 위해서 새로운 operator 클래스과 helper 함수를 구현
dags/plugins/redshift_summary.py 안의 클래스, 함수 정의
RedshiftSummaryOperator 클래스
overwrite : True
기존 테이블을 삭제하고 덮어쓰기, False일 경우는 append 형식으로 사용
build_summary_table 함수
지정된 테이블(tables_load)들에 대해서 RedshiftSummaryOperator 를 하나씩 만들고 있다
dags/plugins/redshift_summary.py 코드
새로운 Operator와 helper 함수 구현
Build_Summary_v2.py | +--plugins | +--redshift_summary.py +--config | +-- nps_summary.py +-- mau_summary.py # 추가 요망 +-- channel_summary.py # 추가 요망
Python
복사
+-- mau_summary.py +-- channel_summary.py
앞서 살펴본 위의 두 개의 테이블을 config 형태로 바꿔보는 것이 숙제!
Build_Summary_v2.py
# Build_Summary_v2.py from airflow import DAG from airflow.macros import * import os from glob import glob import logging import subprocess from plugins import redshift_summary # 자체 제작 플러그인 from plugins import slack # 자체 제작 플러그인 DAG_ID = "Build_Summary_v2" dag = DAG( DAG_ID, schedule_interval="25 13 * * *", max_active_runs=1, concurrency=1, catchup=False, start_date=datetime(2021, 9, 17), default_args= { 'on_failure_callback': slack.on_failure_callback, 'retries': 1, 'retry_delay': timedelta(minutes=1), } ) # this should be listed in dependency order (all in analytics) tables_load = [ 'nps_summary' ] dag_root_path = os.path.dirname(os.path.abspath(__file__)) redshift_summary.build_summary_table(dag_root_path, dag, tables_load, "redshift_dev_db")
Python
복사
dag 파일 내부 tables_load 밑에 mau_summary, channel_summary를 문자열로 추가할 것
이어서, 앞서 살펴본 위의 두 개의 테이블을 config 형태로 바꿔보는 것이 숙제!
dag_root_path는 지금 이 파일이 있는 디렉토리를 넘겨줌
dag_root_path 기준으로 tables_load에 해당되는 config 폴더에 있는 configuration python 파일을 찾아서 redshift_dev_db connection을 통해 커넥션 실행

과제 - channel, mau summary를 추가하기

from airflow import DAG from airflow.macros import * import os from glob import glob import logging import subprocess from plugins import redshift_summary from plugins import slack DAG_ID = "Build_Summary_v2" dag = DAG( DAG_ID, schedule_interval="25 13 * * *", max_active_runs=1, concurrency=1, catchup=False, start_date=datetime(2021, 9, 17), default_args= { 'on_failure_callback': slack.on_failure_callback, 'retries': 1, 'retry_delay': timedelta(minutes=1), } ) # this should be listed in dependency order (all in analytics) tables_load = [ # 'nps_summary', 'mau_summary', 'channel_summary' ] dag_root_path = os.path.dirname(os.path.abspath(__file__)) redshift_summary.build_summary_table(dag_root_path, dag, tables_load, "redshift_dev_db")
Python
복사
channel_summary.py
{ 'table': 'channel_summary', 'schema': 'hajuny129', 'main_sql': """ SELECT DISTINCT A.userid, FIRST_VALUE(A.channel) over(partition by A.userid order by B.ts rows between unbounded preceding and unbounded following) AS First_Channel, LAST_VALUE(A.channel) over(partition by A.userid order by B.ts rows between unbounded preceding and unbounded following) AS Last_Channel FROM raw_data.user_session_channel A LEFT JOIN raw_data.session_timestamp B ON A.sessionid = B.sessionid; """, 'input_check': [ # { # 'sql': 'SELECT COUNT(1) FROM hajuny129.channel_summary', # 'count': 949 # }, ], 'output_check': [ { 'sql': 'SELECT COUNT(1) FROM {schema}.temp_{table}', 'count': 949 } ], }
Python
복사
mau_summary.py
{ 'table': 'mau_summary', 'schema': 'hajuny129', 'main_sql': """ SELECT TO_CHAR(A.ts, 'YYYY-MM') AS month, COUNT(DISTINCT B.userid) AS mau FROM raw_data.session_timestamp A JOIN raw_data.user_session_channel B ON A.sessionid = B.sessionid GROUP BY 1; """, 'input_check': [ # { # 'sql': 'SELECT COUNT(1) FROM hajuny129.mau_summary', # 'count': 7 # }, ], 'output_check': [ { 'sql': 'SELECT COUNT(1) FROM {schema}.temp_{table}', 'count': 7 } ], }
Python
복사
airflow dags test Build_Summary_v2 2023-05-14

과제 - channel, mau summary를 추가하기

from airflow import DAG from airflow.macros import * import os from glob import glob import logging import subprocess from plugins import redshift_summary from plugins import slack DAG_ID = "Build_Summary_v2" dag = DAG( DAG_ID, schedule_interval="25 13 * * *", max_active_runs=1, concurrency=1, catchup=False, start_date=datetime(2021, 9, 17), default_args= { 'on_failure_callback': slack.on_failure_callback, 'retries': 1, 'retry_delay': timedelta(minutes=1), } ) # this should be listed in dependency order (all in analytics) tables_load = [ # 'nps_summary', 'mau_summary', 'channel_summary' ] dag_root_path = os.path.dirname(os.path.abspath(__file__)) redshift_summary.build_summary_table(dag_root_path, dag, tables_load, "redshift_dev_db")
Python
복사
channel_summary.py
{ 'table': 'channel_summary', 'schema': 'hajuny129', 'main_sql': """ SELECT DISTINCT A.userid, FIRST_VALUE(A.channel) over(partition by A.userid order by B.ts rows between unbounded preceding and unbounded following) AS First_Channel, LAST_VALUE(A.channel) over(partition by A.userid order by B.ts rows between unbounded preceding and unbounded following) AS Last_Channel FROM raw_data.user_session_channel A LEFT JOIN raw_data.session_timestamp B ON A.sessionid = B.sessionid; """, 'input_check': [ # { # 'sql': 'SELECT COUNT(1) FROM hajuny129.channel_summary', # 'count': 949 # }, ], 'output_check': [ { 'sql': 'SELECT COUNT(1) FROM {schema}.temp_{table}', 'count': 949 } ], }
Python
복사
mau_summary.py
{ 'table': 'mau_summary', 'schema': 'hajuny129', 'main_sql': """ SELECT TO_CHAR(A.ts, 'YYYY-MM') AS month, COUNT(DISTINCT B.userid) AS mau FROM raw_data.session_timestamp A JOIN raw_data.user_session_channel B ON A.sessionid = B.sessionid GROUP BY 1; """, 'input_check': [ # { # 'sql': 'SELECT COUNT(1) FROM hajuny129.mau_summary', # 'count': 7 # }, ], 'output_check': [ { 'sql': 'SELECT COUNT(1) FROM {schema}.temp_{table}', 'count': 7 } ], }
Python
복사
airflow dags test Build_Summary_v2 2023-05-14

Slack API hook

발급 후 테스트, Variable 등록

에러 만들기

name_gender_v3
schema를 keeyong이라 만들어서 에러 발생하게끔 설정
plugins import, dag 파라미터에 slack 플러그인 호출
from airflow import DAG from airflow.operators.python import PythonOperator from airflow.models import Variable from airflow.providers.postgres.hooks.postgres import PostgresHook from datetime import datetime from datetime import timedelta import requests import logging import psycopg2 from plugins import slack def get_Redshift_connection(autocommit=True): hook = PostgresHook(postgres_conn_id='redshift_dev_db') conn = hook.get_conn() conn.autocommit = autocommit return conn.cursor() def extract(**context): link = context["params"]["url"] task_instance = context['task_instance'] execution_date = context['execution_date'] logging.info(execution_date) f = requests.get(link) return (f.text) def transform(**context): logging.info("Transform started") text = context["task_instance"].xcom_pull(key="return_value", task_ids="extract") lines = text.strip().split("\n")[1:] # 첫 번째 라인을 제외하고 처리 records = [] for l in lines: (name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ] records.append([name, gender]) logging.info("Transform ended") return records def load(**context): logging.info("load started") schema = context["params"]["schema"] table = context["params"]["table"] lines = context["task_instance"].xcom_pull(key="return_value", task_ids="transform") """ records = [ [ "Keeyong", "M" ], [ "Claire", "F" ], ... ] """ # BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음 cur = get_Redshift_connection() try: cur.execute("BEGIN;") cur.execute(f"DELETE FROM {schema}.name_gender;") # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태 for r in records: name = r[0] gender = r[1] print(name, "-", gender) sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')" cur.execute(sql) cur.execute("COMMIT;") # cur.execute("END;") except (Exception, psycopg2.DatabaseError) as error: print(error) cur.execute("ROLLBACK;") raise logging.info("load done") dag = DAG( dag_id = 'name_gender_v3', start_date = datetime(2023,4,6), # 날짜가 미래인 경우 실행이 안됨 schedule = '0 2 * * *', # 적당히 조절 catchup = False, max_active_runs = 1, default_args = { 'retries': 1, 'retry_delay': timedelta(minutes=3), 'on_failure_callback': slack.on_failure_callback, } ) extract = PythonOperator( task_id = 'extract', python_callable = extract, params = { 'url': Variable.get("csv_url") }, dag = dag) transform = PythonOperator( task_id = 'transform', python_callable = transform, params = { }, dag = dag) load = PythonOperator( task_id = 'load', python_callable = load, params = { 'schema': 'keeyong', 'table': 'name_gender' }, dag = dag) extract >> transform >> load
Python
복사
에러가 발생했지만 에러 관련 Hook 연동이 되지 않습니다!
질문 1
위와 같이 Slack Hook이 되지 않는 점
질문 2
name_gender_v4 dag를 실행할 때, airflow dags test name_gender_v4 2023-05-15로 실행을 했을 때 성공했었는데, 그 이후로는 실패한 코드를 넣어놓아도 execution date를 최신 날짜로 넣어도 성공했다는 결과가 나옵니다. 이유를 모르겠습니다.