Search

[Airflow] Primary Key Uniqueness란?, Upsert

Primary Key Uniqueness란?

테이블에서 하나의 레코드를 유일하게 지칭할 수 있는 필드(들)
이를 CREATE TABLE 사용시 지정
관계형 데이터베이스 시스템이 Primary key의 값이 중복 존재하는 것을 막아줌
예 1) Users 테이블에서 email 필드
예 2) Products 테이블에서 product_id 필드
CREATE TABLE products ( product_id INT PRIMARY KEY, name VARCHAR(50), price decimal(7, 2) );
Python
복사
하나의 필드가 일반적이지만 다수의 필드를 사용할 수도 있음
CREATE TABLE orders ( order_id INT, product_id INT, PRIMARY KEY (order_id, product_id), FOREIGN KEY (product_id) REFERENCES products (product_id) ); -- product 테이블을 참조
SQL
복사
Foreign key를 두게 되면 SQL 옵티마이저가 쿼리 최적화를 더 잘할 수 있게 된다.

빅데이터 기반 데이터 웨어하우스들은 Primary Key를 지켜주지 않음

정책적으로 Primary key를 기준으로 유일성 보장을 해주지 않음
이를 보장하는 것은 데이터 인력의 책임
각각 ETL, ELT를 수행할 때 보장을 해줘야한다.
빅데이터 기반 데이터 웨어하우스Primary key 유일성을 보장해주지 않는 이유는?
모든 레코드들의 Pk를 B+Tree 형태로 메모리에 올려놓은 후 룩업을 해야한다
레코드가 추가될 때마다 PK가 존재하고 있는가를 체크를 매번 해야한다.
체크를 하기 위한 효율적인 자료구조를 만들어놔야할 확률이 높다
OLTP는 이런 pk uniqueness를 꼭 필요로 하지만, 일반적으로 데이터의 크기가 작기 때문에, pk uniqueness를 매번 확인하는 작업이 부담이 되지 않는다.
이로 인해 보장하는데 메모리와 시간이 더 들기 때문에 대용량 데이터의 적재가 걸림돌이 된다
CREATE TABLE keeyong.test ( date date primary key, value bigint ); INSERT INTO keeyong.test VALUES ('2023-05-10', 100); INSERT INTO keeyong.test VALUES ('2023-05-10', 150); -- 두번째 작업이 성공함! 일반 RDB는 성공 못함 -- pK를 지정한다고 유일성 보장 X
SQL
복사

Primary Key 유지 방법 - SQL 기반 Upsert

Upsert란?
Primary Key를 기준으로 존재하는 레코드라면 새 정보로 수정
존재하지 않는 레코드라면 새 레코드로 적재
보통 데이터 웨어하우스마다 UPSERT를 효율적으로 해주는 문법을 지원해줌
MySQL to Redshift DAG를 구현할 때 살펴볼 예정
아래에서는 Upsert를 row number를 통해 구현
데이터 웨어하우스들은 Primary Key를 지켜주지 않음
timestamp를 받아온 후 중복이 생길 경우, row_number를 사용해서 가장 최근의 레코드를 사용
앞서 살펴본 keeyong.weather_forecast 테이블을 대상으로 살펴보자
CREATE TABLE keeyong.weather_forecast ( date date primary key, temp float, min_temp float, max_temp float, created_date timestamp default GETDATE() );
SQL
복사
날씨 정보이기 때문에 최근 정보가 더 신뢰할 수 있음.
date는 중복을 허용할 것
그래서 어느 정보가 더 최근 정보인지를 created_date 필드에 기록하고 이를 활용
⇒ 즉 date이 같은 레코드들이 있다면 created_date을 기준으로 더 최근 정보를 선택 ⇒ Incremental update
이를 하는데 적합한 SQL 문법이 ROW_NUMBER
ROW_NUMBER를 이용해서 primary key로 partition을 잡고 적당한 다른 필드(보통 타임스탬프 필드)로 ordering을 수행해 primary key별로 하나의 레코드를 잡아냄
1.
date별로 created_date의 역순으로 일련번호를 매기고 싶다면?
2.
새로운 컬럼 추가!! ⇒ seq 칼럼
date별로 레코드를 모으고 그 안에서 created_date의 역순으로 소팅한 후, 1번부터 일련 번호 (seq) 부여
3.
ROW_NUMBER를 쓰면 2를 구현 가능 ROW_NUMBER() OVER (partition by date order by created_date DESC) seq
4.
where을 통해 seq가 1인 row들만 뽑아내면 중복이 제거됨
UPSERT 일반화 정리
임시 테이블(스테이징 테이블)을 만들고 거기로 현재 모든 레코드를 복사
임시 테이블에 새로 데이터소스에서 읽어들인 레코드들을 복사
이 때 중복 존재 가능
만약 DISTINCT SELECT를 하게 되면 한 날짜에 2개의 정보를 가진 행들이 생길 수도 있다.
중복을 걸러주는 SQL 작성:
최신 레코드를 우선 순위로 선택
ROW_NUMBER를 이용해서 primary key로 partition을 잡고 적당한 다른 필드(보통 타임스탬프 필드)로 ordering(역순 DESC)을 수행해 primary key별로하나의 레코드를 잡아냄
위의 SQL을 바탕으로 최종 원본 테이블로 복사
이때 원본 테이블에서 레코드들을 삭제
임시 temp 테이블을 원본 테이블로 복사
코드를 통한 정리
1.
CREATE TEMP TABLE t AS SELECT * FROM keeyong.weather_forecast;
원래 테이블의 내용을 임시 테이블 t로 복사
2.
DAG는 임시 테이블(스테이징 테이블)에 레코드를 추가
이 때 중복 데이터가 들어갈 수 있음
3.
DELETE FROM keeyong.weather_forecast;
4.
중복을 없앤 형태로 새로운 테이블 생성
INSERT INTO keeyong.weather_forecast SELECT date, temp, min_temp, max_temp, created_date FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq FROM keeyong.temp_weather_forecast ) WHERE seq = 1;
SQL
복사
여기서 (Auto_commit = True) 라는 전제 하에서 transaction(BEGIN~END)으로 처리되어야 하는 최소 범위의 SQL들은?
⇒ 3~4번, 3 ~ 4 사이에 문제가 생기면 정합성이 깨진다.
만약, auto_commit = False는 이 모든 작업이 Transaction
weather_forecast로 Incremental Update 다시 설명
화살표로 가르킨 레코드들이 우선시 되어야 함.
이를 위해 created_date을 만들었고 이를 기준으로 ROW_NUMBER로 일련번호를 만듬
위의 SQL 기반 Upsert 코드를 airflow DAG로 구현한 코드
from airflow import DAG from airflow.decorators import task from airflow.models import Variable from airflow.providers.postgres.hooks.postgres import PostgresHook from datetime import datetime from datetime import timedelta import requests import logging import json def get_Redshift_connection(): # autocommit is False by default hook = PostgresHook(postgres_conn_id='redshift_dev_db') return hook.get_conn().cursor() @task def etl(schema, table, lat, lon, api_key): # https://openweathermap.org/api/one-call-api url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts" response = requests.get(url) data = json.loads(response.text) """ {'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3} """ ret = [] for d in data["daily"]: day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d') ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"])) cur = get_Redshift_connection() # 원본 테이블이 없다면 생성 create_table_sql = f"""CREATE TABLE IF NOT EXISTS {schema}.{table} ( date date, temp float, min_temp float, max_temp float, created_date timestamp default GETDATE() );""" logging.info(create_table_sql) # 임시 테이블 생성 create_t_sql = f"""CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};""" logging.info(create_t_sql) try: cur.execute(create_table_sql) cur.execute(create_t_sql) cur.execute("COMMIT;") except Exception as e: cur.execute("ROLLBACK;") raise # 임시 테이블 데이터 입력 - 8일간의 데이터를 임시 테이블에 적재 insert_sql = f"INSERT INTO t VALUES " + ",".join(ret) logging.info(insert_sql) try: cur.execute(insert_sql) cur.execute("COMMIT;") except Exception as e: cur.execute("ROLLBACK;") raise # 기존 테이블 대체 - row number seq 기준 1인 것들만 alter_sql = f"""DELETE FROM {schema}.{table}; INSERT INTO {schema}.{table} SELECT date, temp, min_temp, max_temp FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq FROM t ) WHERE seq = 1;""" logging.info(alter_sql) try: cur.execute(alter_sql) cur.execute("COMMIT;") except Exception as e: cur.execute("ROLLBACK;") raise with DAG( dag_id = 'Weather_to_Redshift_v2', start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨 schedule = '0 4 * * *', # 적당히 조절 max_active_runs = 1, catchup = False, default_args = { 'retries': 1, 'retry_delay': timedelta(minutes=3), } ) as dag: etl("hajuny129", "weather_forecast_v2", 37.5665, 126.9780, Variable.get("open_weather_api_key"))
Python
복사