๋ฉฑ๋ฑ์ฑ (Impodent)
๋ฉฑ๋ฑ์ฑ(Impodent)์ด๋ ๋ฌด์์ธ๊ฐ?
โข
๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์ด ์ฐ์ ์คํ๋์์ ๋ ์์ค์ ์๋ ๋ฐ์ดํฐ๊ฐ ๊ทธ๋๋ก DW๋ก
์ ์ฅ๋์ด์ผํจ์ ์ด์ผ๊ธฐ
a. No duplicates, no missing data
โฆ
๋ง์ฝ Incremental Update ์, ์ค๋ณต์ ์ ๊ฑฐํ์ง ์๊ณ ๊ณ์ ์ ์ฌํ๋ ์์ค ์ฝ๋๋ผ๋ฉด, ๋ฉฑ๋ฑ์ฑ์ด ์ง์ผ์ง์ง ์์ ๊ฒ.
โข
Full refresh๋ฅผ ํ๋ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์ด๋ผ๋ฉด?
1. ๋จผ์ DW์ ๊ด๋ จ ํ
์ด๋ธ์์ ๋ชจ๋ ๋ ์ฝ๋๋ฅผ ์ญ์
2. ๋ฐ์ดํฐ ์์ค์์ ์ฝ์ด์จ ๋ฐ์ดํฐ๋ฅผ DW ํ
์ด๋ธ๋ก ์ ์ฌ
โฆ
๋ง์ผ 1๋ฒ์ด ์ฑ๊ณตํ๊ณ 2๋ฒ์ด ์คํจํ๋ค๋ฉด? ๋ฉฑ๋ฑ์ฑ ๊นจ์ง โ ์๋ณธ ๋ฐ์ดํฐ์ DW ํ
์ด๋ธ์ด ์ผ์น X, ์ ํฉ์ฑ์ด ๊นจ์ง
โฆ
๋ง์ผ 1๋ฒ์ด ์คํ๋ ๋ค์์ ๋๊ตฐ๊ฐ ์ด ํ
์ด๋ธ์ ์ฌ์ฉํ๋ค๋ฉด?
๊ฐ๋จํ ETL
Extract, Transform, Load
โข
Extract:
โฆ
๋ฐ์ดํฐ๋ฅผ ๋ฐ์ดํฐ ์์ค์์ ์ฝ์ด๋ด๋ ๊ณผ์
โข
Transform:
โฆ
ํ์ํ๋ค๋ฉด ๊ทธ ์๋ณธ ๋ฐ์ดํฐ์ ํฌ๋งท์ ์ํ๋ ํํ๋ก ๋ณ๊ฒฝ์ํค๋ ๊ณผ์ . ๊ตณ์ด ๋ณํํ ํ์๋ ์๋ค
โข
Load:
โฆ
์ต์ข
์ ์ผ๋ก Data Warehouse์ ํ
์ด๋ธ๋ก ์ง์ด๋ฃ๋ ๊ณผ์
โข
ํ๋์ DAG๋ ์ฌ๋ฌ ๊ฐ์ task๋ผ๋ ๋จ์๋ก ๋๋์ด์ ธ ์์
์ค์ต ETL ๊ฐ์
โข
์น(S3) ์์ ์กด์ฌํ๋ ์ด๋ฆ, ์ฑ๋ณ ๋ด์ฉ์ CSV ํ์ผ์ ๋ค์ด๋ฐ์์ Redshift์ ์๋ ํ
์ด๋ธ๋ก ๋ณต์ฌ
1.
๊ฐ์์๊ฒ ํ ๋น๋ schema๋ฐ์ ์๋ ํ
์ด๋ธ์ ์์ฑ
CREATE TABLE (๋ณธ์ธ์์คํค๋ง).name_gender (
name varchar(32) primary key,
gender varchar(8)
);
SQL
๋ณต์ฌ
โข
ํ์ง๋ง ๋ฐ์ดํฐ์จ์ดํ์ฐ์ค๋ PK๋ฅผ ๋ณด์ฅํด์ฃผ์ง ์๋๋ค. ์๋ฌต์ ๋ฃฐ๋ก ๋ณด์ฅ์ ํด์ฃผ์ด์ผํ๋ค.
2.
๋ฐ์ดํฐ ์์ค
a.
S3 ๋ด๋ถ csv ํ์ผ์๋ 2๊ฐ์ ํ๋๊ฐ ์กด์ฌ (name, gender)
3.
ํ์ด์ฌ์ผ๋ก ์์ฑ: ์ธ ๊ฐ์ ํจ์๋ก ๊ตฌ์ฑ
a.
extract, transform, load
โฆ
์์ ๊ณผ์ ์ ์ฌ๊ฐํ๋ฉด ์ค๋ณต์ด ์๊ธฐ๊ฒ ๋๋ค
โช
๋ฉฑ๋ฑ์ฑ์ ๋ณด์ฅํด์ฃผ์ด์ผ ํ๋ค
โฆ
๋ํ ํ ์ค์ฉ ์
๋ฐ์ดํธ ํ๋ ๊ฒ ์๋๋ผ ํ์ผ ๋ฐฐ์น์ฑ์ผ๋ก bulk update๋ฅผ ํด์ฃผ์ด์ผ ํ๋ค
โฆ
ํ ์ค์ฉ ์ธ์ํธํ๋ฉด์ pk ํ์ธ์ ํด์ฃผ๊ฒ ๋๋ฉด ํจ์จ์ด ์๋์จ๋ค
ETL ์ถ๊ฐ ๋ฌธ์
1.
ํค๋๊ฐ ๋ ์ฝ๋๋ก ์ถ๊ฐ๋๋ ๋ฌธ์ ํด๊ฒฐํ๊ธฐ
2.
Idempotentํ๊ฒ ์ก์ ๋ง๋ค๊ธฐ (full refresh ์ก์ด๋ผ๊ณ ๊ฐ์ )
a. ์ฌ๋ฌ ๋ฒ ์คํํด๋ ๋์ผํ ๊ฒฐ๊ณผ๊ฐ ๋์ค๊ฒ ๋ง๋ค๊ธฐ
3.
Transaction์ ์ฌ์ฉํด๋ณด๊ธฐ
โข
BEGIN; DELETE FROM ..;
INSERT INTOโฆ ;END;
์ ๋ต ์ฝ๋
์ค์ต Python ETL ๊ฐ์ ํ๊ธฐ
โข
๋ฐ์ดํฐ ์จ์ดํ์ฐ์ค์์ ํ
์ด๋ธ ์
๋ฐ์ดํธ ๋ฐฉ๋ฒ์ ํฌ๊ฒ ๋ ๊ฐ์ง
โฆ
Full Refresh
โช
๋จ์ํด์ ์ข์ง๋ง ๋ฐ์ดํฐ๊ฐ ์ปค์ง๋ฉด ์ฌ์ฉ๋ถ๊ฐ๋ฅ
โช
์ ๊ณผ์ ๋ Full Refreshํ๋ ์๋ผ๊ณ ๋ณด๋ฉด ๋จ
โฆ
Incremental Update
โช
๋ฐ์ดํฐ๊ฐ ํด ๊ฒฝ์ฐ ํจ๊ณผ์ ์ด์ง๋ง ๋ณต์ก๋ ์ฆ๊ฐ
โช
๋ณดํต ํ์์คํฌํ ํน์ ์ผ๋ จ ๋ฒํธ ๋ฑ์ ํ๋ ํ์
โช
execution_date ํ์ฉ
โข
DELETE FROM vs. TRUNCATE
โฆ
๋ ๋ค ํ
์ด๋ธ์์ ๋ ์ฝ๋๋ฅผ ์ญ์ ์์ผ์ฃผ๋ ๊ฒ
โฆ
DELETE FROM ๋ช
๋ น
โช
ํ
์ด๋ธ์์ ํน์ ๋ ์ฝ๋๋ฅผ ์ญ์ ํ๋ ๋ฐ ์ฌ์ฉ๋ฉ๋๋ค.
โช
Transaction์ ์กด์คํ๋ค
โฆ
TRUNCATE ๋ช
๋ น
โช
ํ
์ด๋ธ์์ ์ ์ฒด ๋ฐ์ดํฐ๋ฅผ ์ญ์ ํ๋ ๋ฐ ์ฌ์ฉ๋ฉ๋๋ค.
โช
ํฐ ํ
์ด๋ธ์ ์ญ์ ํ ๋ DELETE FROM๋ณด๋ค ๋น ๋ฅด๋ค.
โช
Transaction์ ๋ฌด์ํ๊ณ ์ญ์ ๋ฅผ ํด๋ฒ๋ฆฐ๋ค
โข
ํจ๋ถ๋ก ์ฐ๋ฉด ๋ฌธ์ ๊ฐ ์๊ธธ ์ฌ์ง๊ฐ ์๋ค.
ํธ๋์ญ์ ์ด๋?
โข
ํธ๋์ญ์
์ด๋?
โฆ
์ค๊ฐ์ ์คํจํ๋ฉด ๋ถ์์ ์ํฉ์ ๋์ด๋ ์์
๋ค์ด ์๋ค๋ฉด?
โฆ
์๋ ์์์ ์ธ์ถ์ ์ฑ๊ณตํ์ง๋ง ์ก๊ธ์์ ๋ฌธ์ ๊ฐ ์๊ธด๋ค๋ฉด?
โข
ํธ๋์ญ์
์ ์
Atomicํ๊ฒ ์คํ๋์ด์ผ ํ๋ SQL๋ค์ ๋ฌถ์ด์ ํ๋์ ์์
์ฒ๋ผ ์ฒ๋ฆฌํ๋ ๋ฐฉ๋ฒ
โฆ
SQL, ๊ด๊ณํ DB์์ ์ฌ์ฉ๋๋ ๊ฐ๋
โฆ
BEGIN๊ณผ END ํน์ BEGIN๊ณผ COMMIT ์ฌ์ด์ ํด๋น SQL๋ค์ ๋ฐฐ์น ๋ฐ ใ
ใ
์ฌ์ฉ
โฆ
COMMIT์ ํตํด Pysical ํ
์ด๋ธ์ ๋ฐ์์ ์ํจ๋ค.
โฆ
Transaction ์ฌ์ด์ ์ ํฉ์ฑ์ด ๊นจ์ง ๊ฒฝ์ฐ, ROLLBACK ์คํ โ Transaction ์ด์ ์ํ๋ก ๋์๊ฐ๋ค
โข
Redshift(postgre DB) ์ฐ๊ฒฐ ํจ์
import psycopg2
# postgresql db๋ฅผ ์ฐ๊ฒฐ - redshift
# Redshift connection ํจ์
# ๋ณธ์ธ ID/PW ์ฌ์ฉ!
def get_Redshift_connection(boolean=True):
host = ""
redshift_user = ""
redshift_pass = ""
port = 5439
dbname = ""
conn = psycopg2.connect("dbname={dbname} user={user} host={host} password={password} port={port}".format(
dbname=dbname,
user=redshift_user,
password=redshift_pass,
host=host,
port=port
))
conn.set_session(autocommit=boolean)
return conn.cursor()
Python
๋ณต์ฌ
โข
๋ ๊ฐ์ง ์ข
๋ฅ์ ํธ๋์ญ์
์ด ์กด์ฌ
โฆ
conn.set_session(autocommit=boolean)
โฆ
๋ ์ฝ๋ ๋ณ๊ฒฝ์ ๋ฐ๋ก ๋ฐ์ํ๋์ง ์ฌ๋ถ. autocommit์ด๋ผ๋ ํ๋ผ๋ฏธํฐ๋ก ์กฐ์ ๊ฐ๋ฅ
โฆ
autocommit ์ ์ฐ๊ธฐ ๋ฐ ์ญ์ ์์
์ผ ๋ ์ค์ํจ
โข
autocommit=True
โฆ
๊ธฐ๋ณธ์ ์ผ๋ก ๋ณ๋์ ํธ๋์ญ์
๊ตฌ๊ฐ์ด ์๋ค๋ฉด ๋ชจ๋ SQL statement๊ฐ ๋ฐ๋ก ์ปค๋ฐ๋จ
โฆ
์ด๋ฅผ ๋ฐ๊พธ๊ณ ์ถ๋ค๋ฉด BEGIN;END; ํน์ BEGIN;COMMIT์ ์ฌ์ฉ (ํน์ ROLLBACK)
โฆ
COMMIT ์ ๊น์ง๋ ์คํ
์ด์ง ํ
์ด๋ธ์๋ง ๋ณด์ด๊ฒ ๋ฉ๋๋ค
โข
autocommit=False
โฆ
๊ธฐ๋ณธ์ ์ผ๋ก ๋ชจ๋ SQL statement๊ฐ ์ปค๋ฐ๋์ง ์์
โฆ
Commit ์ ๊น์ง๋ ์์ฑ ์ค์ธ ์ฌ์ฉ์์๊ฒ๋ง ๋ณด์ธ๋ค. (๋ฌผ๋ฆฌ์ DB์ ๋ฐ์์ด ๋์ง ์๋๋ค)
โฆ
์ปค๋ฅ์
๊ฐ์ฒด์ .commit()๊ณผ .rollback()ํจ์๋ก ์ปค๋ฐํ ์ง ๋ง์ง ๊ฒฐ์
โข
2๊ฐ ์ค์์ ๋ฌด์์ ์ฌ์ฉํ ์ง๋ ๊ฐ์ธ ์ทจํฅ
โฆ
Python์ ๊ฒฝ์ฐ try/catch์ ๊ฐ์ด ์ฌ์ฉํ๋ ๊ฒ์ด ์ผ๋ฐ์
โช
try/catch๋ก ์๋ฌ๊ฐ ๋๋ฉด rollback์ ๋ช
์์ ์ผ๋ก ์คํ. ์๋ฌ๊ฐ ์ ๋๋ฉด commit์ ์คํ
conn = get_Redshift_connection(True)
# True ์ผ ๋๋ ๋ฐ๋ก physical table์ ์์ฑ ๋ฐ ๋ฐ์์ ํด๋ฒ๋ฆฐ๋ค.
cur = conn.cursor()
cur.execute("DELETE FROM hajuny129.name_gender;")
try:
cur.execute("BEGIN;")
cur.execute("DELETE FROM hajuny129.name_gender;")
cur.execute("INSERT INTO hajuny129.name_gender VALUES ('Claire', 'Female');")
cur.execute("END;")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK;")
# finally :
# conn.close()
Python
๋ณต์ฌ
โฆ
try/except ์ฌ์ฉ์ ์ ์ํ ์
โช
์๋์ ๊ฐ์ ์ฝ๋๊ฐ ์์๋ค๋ฉด exception์ด ๋ฐ์ํ ๊ฒฝ์ฐ ์ด๋ป๊ฒ ๋ ๊น?
try:
cur.execute(create_sql)
cur.execute("COMMIT;")
except Exception as e:
cur.execute("ROLLBACK;")
raise # ์๋ฌ ์ ํ => transaction fail
Python
๋ณต์ฌ
โช
๋ง์ฝ raise ๋ฌธ์ด ์๋ค๋ฉด ์๋ฌ๊ฐ ๋ฐ์ํ๋ค๋ ์ฌ์ค์ด ์ธ๋ถ๋ก ์ ํ๊ฐ ๋์ง ์์ต๋๋ค. ๋ง์น ์๋ฌ๊ฐ ๋ฐ์ํ๋๋ฐ, ์๋ฌด ์ผ๋ ์๋ ๊ฒ์ฒ๋ผ ์ธ์ง๋ ์ ์์ต๋๋ค.
โช
except์์ raise๋ฅผ ํธ์ถํ๋ฉด ๋ฐ์ํ ์๋ exception์ด ์๋ก ์ ํ๋ฉ๋๋ค.
โข
์๋ฌ๊ฐ ์ ํ๋ ํ, ํธ๋์ญ์
์ด Fail๋ ๊ฒ์
๋๋ค.
โข
ETL์ ๊ด๋ฆฌํ๋ ์
์ฅ์์ ์ด๋ค ์๋ฌ๊ฐ ๊ฐ์ถฐ์ง๋ ๊ฒ๋ณด๋ค๋ ๋ช
ํํ๊ฒ ๋๋ฌ๋๋ ๊ฒ์ด ๋ ์ข์
โข
์์ ๊ฒฝ์ฐ cur.execute ๋ค์ raise๋ฅผ ํธ์ถํ๋ ๊ฒ์ด ์ข์