Search

Transaction

๋ฉฑ๋“ฑ์„ฑ (Impodent)

๋ฉฑ๋“ฑ์„ฑ(Impodent)์ด๋ž€ ๋ฌด์—‡์ธ๊ฐ€?
โ€ข
๋ฐ์ดํ„ฐ ํŒŒ์ดํ”„๋ผ์ธ์ด ์—ฐ์† ์‹คํ–‰๋˜์—ˆ์„ ๋•Œ ์†Œ์Šค์— ์žˆ๋Š” ๋ฐ์ดํ„ฐ๊ฐ€ ๊ทธ๋Œ€๋กœ DW๋กœ ์ €์žฅ๋˜์–ด์•ผํ•จ์„ ์ด์•ผ๊ธฐ
a. No duplicates, no missing data
โ—ฆ
๋งŒ์•ฝ Incremental Update ์‹œ, ์ค‘๋ณต์„ ์ œ๊ฑฐํ•˜์ง€ ์•Š๊ณ  ๊ณ„์† ์ ์žฌํ•˜๋Š” ์†Œ์Šค ์ฝ”๋“œ๋ผ๋ฉด, ๋ฉฑ๋“ฑ์„ฑ์ด ์ง€์ผœ์ง€์ง€ ์•Š์€ ๊ฒƒ.
โ€ข
Full refresh๋ฅผ ํ•˜๋Š” ๋ฐ์ดํ„ฐ ํŒŒ์ดํ”„๋ผ์ธ์ด๋ผ๋ฉด?
1. ๋จผ์ € DW์˜ ๊ด€๋ จ ํ…Œ์ด๋ธ”์—์„œ ๋ชจ๋“  ๋ ˆ์ฝ”๋“œ๋ฅผ ์‚ญ์ œ 2. ๋ฐ์ดํ„ฐ ์†Œ์Šค์—์„œ ์ฝ์–ด์˜จ ๋ฐ์ดํ„ฐ๋ฅผ DW ํ…Œ์ด๋ธ”๋กœ ์ ์žฌ
โ—ฆ
๋งŒ์ผ 1๋ฒˆ์ด ์„ฑ๊ณตํ•˜๊ณ  2๋ฒˆ์ด ์‹คํŒจํ•œ๋‹ค๋ฉด? ๋ฉฑ๋“ฑ์„ฑ ๊นจ์ง โ‡’ ์›๋ณธ ๋ฐ์ดํ„ฐ์™€ DW ํ…Œ์ด๋ธ”์ด ์ผ์น˜ X, ์ •ํ•ฉ์„ฑ์ด ๊นจ์ง
โ—ฆ
๋งŒ์ผ 1๋ฒˆ์ด ์‹คํ–‰๋œ ๋‹ค์Œ์— ๋ˆ„๊ตฐ๊ฐ€ ์ด ํ…Œ์ด๋ธ”์„ ์‚ฌ์šฉํ•œ๋‹ค๋ฉด?

๊ฐ„๋‹จํ•œ ETL

Extract, Transform, Load
โ€ข
Extract:
โ—ฆ
๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ์ดํ„ฐ ์†Œ์Šค์—์„œ ์ฝ์–ด๋‚ด๋Š” ๊ณผ์ •
โ€ข
Transform:
โ—ฆ
ํ•„์š”ํ•˜๋‹ค๋ฉด ๊ทธ ์›๋ณธ ๋ฐ์ดํ„ฐ์˜ ํฌ๋งท์„ ์›ํ•˜๋Š” ํ˜•ํƒœ๋กœ ๋ณ€๊ฒฝ์‹œํ‚ค๋Š” ๊ณผ์ •. ๊ตณ์ด ๋ณ€ํ™˜ํ•  ํ•„์š”๋Š” ์—†๋‹ค
โ€ข
Load:
โ—ฆ
์ตœ์ข…์ ์œผ๋กœ Data Warehouse์— ํ…Œ์ด๋ธ”๋กœ ์ง‘์–ด๋„ฃ๋Š” ๊ณผ์ •
โ€ข
ํ•˜๋‚˜์˜ DAG๋Š” ์—ฌ๋Ÿฌ ๊ฐœ์˜ task๋ผ๋Š” ๋‹จ์œ„๋กœ ๋‚˜๋ˆ„์–ด์ ธ ์žˆ์Œ

์‹ค์Šต ETL ๊ฐœ์š”

โ€ข
์›น(S3) ์ƒ์— ์กด์žฌํ•˜๋Š” ์ด๋ฆ„, ์„ฑ๋ณ„ ๋‚ด์šฉ์˜ CSV ํŒŒ์ผ์„ ๋‹ค์šด๋ฐ›์•„์„œ Redshift์— ์žˆ๋Š” ํ…Œ์ด๋ธ”๋กœ ๋ณต์‚ฌ
1.
๊ฐ์ž์—๊ฒŒ ํ• ๋‹น๋œ schema๋ฐ‘์— ์•„๋ž˜ ํ…Œ์ด๋ธ”์„ ์ƒ์„ฑ
CREATE TABLE (๋ณธ์ธ์˜์Šคํ‚ค๋งˆ).name_gender ( name varchar(32) primary key, gender varchar(8) );
SQL
๋ณต์‚ฌ
โ€ข
ํ•˜์ง€๋งŒ ๋ฐ์ดํ„ฐ์›จ์–ดํ•˜์šฐ์Šค๋Š” PK๋ฅผ ๋ณด์žฅํ•ด์ฃผ์ง€ ์•Š๋Š”๋‹ค. ์•”๋ฌต์  ๋ฃฐ๋กœ ๋ณด์žฅ์„ ํ•ด์ฃผ์–ด์•ผํ•œ๋‹ค.
2.
๋ฐ์ดํ„ฐ ์†Œ์Šค
a.
S3 ๋‚ด๋ถ€ csv ํŒŒ์ผ์—๋Š” 2๊ฐœ์˜ ํ•„๋“œ๊ฐ€ ์กด์žฌ (name, gender)
3.
ํŒŒ์ด์ฌ์œผ๋กœ ์ž‘์„ฑ: ์„ธ ๊ฐœ์˜ ํ•จ์ˆ˜๋กœ ๊ตฌ์„ฑ
a.
extract, transform, load
โ—ฆ
์œ„์˜ ๊ณผ์ •์€ ์žฌ๊ฐœํ•˜๋ฉด ์ค‘๋ณต์ด ์ƒ๊ธฐ๊ฒŒ ๋œ๋‹ค
โ–ช
๋ฉฑ๋“ฑ์„ฑ์„ ๋ณด์žฅํ•ด์ฃผ์–ด์•ผ ํ•œ๋‹ค
โ—ฆ
๋˜ํ•œ ํ•œ ์ค„์”ฉ ์—…๋ฐ์ดํŠธ ํ•˜๋Š” ๊ฒŒ ์•„๋‹ˆ๋ผ ํŒŒ์ผ ๋ฐฐ์น˜์„ฑ์œผ๋กœ bulk update๋ฅผ ํ•ด์ฃผ์–ด์•ผ ํ•œ๋‹ค
โ—ฆ
ํ•œ ์ค„์”ฉ ์ธ์„œํŠธํ•˜๋ฉด์„œ pk ํ™•์ธ์„ ํ•ด์ฃผ๊ฒŒ ๋˜๋ฉด ํšจ์œจ์ด ์•ˆ๋‚˜์˜จ๋‹ค

ETL ์ถ”๊ฐ€ ๋ฌธ์ œ

1.
ํ—ค๋”๊ฐ€ ๋ ˆ์ฝ”๋“œ๋กœ ์ถ”๊ฐ€๋˜๋Š” ๋ฌธ์ œ ํ•ด๊ฒฐํ•˜๊ธฐ
2.
Idempotentํ•˜๊ฒŒ ์žก์„ ๋งŒ๋“ค๊ธฐ (full refresh ์žก์ด๋ผ๊ณ  ๊ฐ€์ •) a. ์—ฌ๋Ÿฌ ๋ฒˆ ์‹คํ–‰ํ•ด๋„ ๋™์ผํ•œ ๊ฒฐ๊ณผ๊ฐ€ ๋‚˜์˜ค๊ฒŒ ๋งŒ๋“ค๊ธฐ
3.
Transaction์„ ์‚ฌ์šฉํ•ด๋ณด๊ธฐ
โ€ข
BEGIN; DELETE FROM ..; INSERT INTOโ€ฆ ;END;
์ •๋‹ต ์ฝ”๋“œ

์‹ค์Šต Python ETL ๊ฐœ์„ ํ•˜๊ธฐ

โ€ข
๋ฐ์ดํ„ฐ ์›จ์–ดํ•˜์šฐ์Šค์—์„œ ํ…Œ์ด๋ธ” ์—…๋ฐ์ดํŠธ ๋ฐฉ๋ฒ•์€ ํฌ๊ฒŒ ๋‘ ๊ฐ€์ง€
โ—ฆ
Full Refresh
โ–ช
๋‹จ์ˆœํ•ด์„œ ์ข‹์ง€๋งŒ ๋ฐ์ดํ„ฐ๊ฐ€ ์ปค์ง€๋ฉด ์‚ฌ์šฉ๋ถˆ๊ฐ€๋Šฅ
โ–ช
์œ„ ๊ณผ์ œ๋Š” Full Refreshํ•˜๋Š” ์˜ˆ๋ผ๊ณ  ๋ณด๋ฉด ๋จ
โ—ฆ
Incremental Update
โ–ช
๋ฐ์ดํ„ฐ๊ฐ€ ํด ๊ฒฝ์šฐ ํšจ๊ณผ์ ์ด์ง€๋งŒ ๋ณต์žก๋„ ์ฆ๊ฐ€
โ–ช
๋ณดํ†ต ํƒ€์ž„์Šคํƒฌํ”„ ํ˜น์€ ์ผ๋ จ ๋ฒˆํ˜ธ ๋“ฑ์˜ ํ•„๋“œ ํ•„์š”
โ–ช
execution_date ํ™œ์šฉ
โ€ข
DELETE FROM vs. TRUNCATE
โ—ฆ
๋‘˜ ๋‹ค ํ…Œ์ด๋ธ”์—์„œ ๋ ˆ์ฝ”๋“œ๋ฅผ ์‚ญ์ œ์‹œ์ผœ์ฃผ๋Š” ๊ฒƒ
โ—ฆ
DELETE FROM ๋ช…๋ น
โ–ช
ํ…Œ์ด๋ธ”์—์„œ ํŠน์ • ๋ ˆ์ฝ”๋“œ๋ฅผ ์‚ญ์ œํ•˜๋Š” ๋ฐ ์‚ฌ์šฉ๋ฉ๋‹ˆ๋‹ค.
โ–ช
Transaction์„ ์กด์ค‘ํ•œ๋‹ค
โ—ฆ
TRUNCATE ๋ช…๋ น
โ–ช
ํ…Œ์ด๋ธ”์—์„œ ์ „์ฒด ๋ฐ์ดํ„ฐ๋ฅผ ์‚ญ์ œํ•˜๋Š” ๋ฐ ์‚ฌ์šฉ๋ฉ๋‹ˆ๋‹ค.
โ–ช
ํฐ ํ…Œ์ด๋ธ”์„ ์‚ญ์ œํ•  ๋•Œ DELETE FROM๋ณด๋‹ค ๋น ๋ฅด๋‹ค.
โ–ช
Transaction์„ ๋ฌด์‹œํ•˜๊ณ  ์‚ญ์ œ๋ฅผ ํ•ด๋ฒ„๋ฆฐ๋‹ค
โ€ข
ํ•จ๋ถ€๋กœ ์“ฐ๋ฉด ๋ฌธ์ œ๊ฐ€ ์ƒ๊ธธ ์—ฌ์ง€๊ฐ€ ์žˆ๋‹ค.

ํŠธ๋žœ์žญ์…˜์ด๋ž€?

โ€ข
ํŠธ๋žœ์žญ์…˜์ด๋ž€?
โ—ฆ
์ค‘๊ฐ„์— ์‹คํŒจํ•˜๋ฉด ๋ถˆ์™„์ „ ์ƒํ™ฉ์— ๋†“์ด๋Š” ์ž‘์—…๋“ค์ด ์žˆ๋‹ค๋ฉด?
โ—ฆ
์•„๋ž˜ ์˜ˆ์—์„œ ์ธ์ถœ์€ ์„ฑ๊ณตํ–ˆ์ง€๋งŒ ์†ก๊ธˆ์—์„œ ๋ฌธ์ œ๊ฐ€ ์ƒ๊ธด๋‹ค๋ฉด?
โ€ข
ํŠธ๋žœ์žญ์…˜ ์ •์˜
Atomicํ•˜๊ฒŒ ์‹คํ–‰๋˜์–ด์•ผ ํ•˜๋Š” SQL๋“ค์„ ๋ฌถ์–ด์„œ ํ•˜๋‚˜์˜ ์ž‘์—…์ฒ˜๋Ÿผ ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐฉ๋ฒ•
โ—ฆ
SQL, ๊ด€๊ณ„ํ˜• DB์—์„œ ์‚ฌ์šฉ๋˜๋Š” ๊ฐœ๋…
โ—ฆ
BEGIN๊ณผ END ํ˜น์€ BEGIN๊ณผ COMMIT ์‚ฌ์ด์— ํ•ด๋‹น SQL๋“ค์„ ๋ฐฐ์น˜ ๋ฐ ใ…Œใ…ˆ์‚ฌ์šฉ
โ—ฆ
COMMIT์„ ํ†ตํ•ด Pysical ํ…Œ์ด๋ธ”์— ๋ฐ˜์˜์„ ์‹œํ‚จ๋‹ค.
โ—ฆ
Transaction ์‚ฌ์ด์— ์ •ํ•ฉ์„ฑ์ด ๊นจ์งˆ ๊ฒฝ์šฐ, ROLLBACK ์‹คํ–‰ โ‡’ Transaction ์ด์ „ ์ƒํƒœ๋กœ ๋Œ์•„๊ฐ„๋‹ค
โ€ข
Redshift(postgre DB) ์—ฐ๊ฒฐ ํ•จ์ˆ˜
import psycopg2 # postgresql db๋ฅผ ์—ฐ๊ฒฐ - redshift # Redshift connection ํ•จ์ˆ˜ # ๋ณธ์ธ ID/PW ์‚ฌ์šฉ! def get_Redshift_connection(boolean=True): host = "" redshift_user = "" redshift_pass = "" port = 5439 dbname = "" conn = psycopg2.connect("dbname={dbname} user={user} host={host} password={password} port={port}".format( dbname=dbname, user=redshift_user, password=redshift_pass, host=host, port=port )) conn.set_session(autocommit=boolean) return conn.cursor()
Python
๋ณต์‚ฌ
โ€ข
๋‘ ๊ฐ€์ง€ ์ข…๋ฅ˜์˜ ํŠธ๋žœ์žญ์…˜์ด ์กด์žฌ
โ—ฆ
conn.set_session(autocommit=boolean)
โ—ฆ
๋ ˆ์ฝ”๋“œ ๋ณ€๊ฒฝ์„ ๋ฐ”๋กœ ๋ฐ˜์˜ํ•˜๋Š”์ง€ ์—ฌ๋ถ€. autocommit์ด๋ผ๋Š” ํŒŒ๋ผ๋ฏธํ„ฐ๋กœ ์กฐ์ ˆ๊ฐ€๋Šฅ
โ—ฆ
autocommit ์€ ์“ฐ๊ธฐ ๋ฐ ์‚ญ์ œ ์ž‘์—…์ผ ๋•Œ ์ค‘์š”ํ•จ
โ€ข
autocommit=True
โ—ฆ
๊ธฐ๋ณธ์ ์œผ๋กœ ๋ณ„๋„์˜ ํŠธ๋žœ์žญ์…˜ ๊ตฌ๊ฐ„์ด ์—†๋‹ค๋ฉด ๋ชจ๋“  SQL statement๊ฐ€ ๋ฐ”๋กœ ์ปค๋ฐ‹๋จ
โ—ฆ
์ด๋ฅผ ๋ฐ”๊พธ๊ณ  ์‹ถ๋‹ค๋ฉด BEGIN;END; ํ˜น์€ BEGIN;COMMIT์„ ์‚ฌ์šฉ (ํ˜น์€ ROLLBACK)
โ—ฆ
COMMIT ์ „๊นŒ์ง€๋Š” ์Šคํ…Œ์ด์ง• ํ…Œ์ด๋ธ”์—๋งŒ ๋ณด์ด๊ฒŒ ๋ฉ๋‹ˆ๋‹ค
โ€ข
autocommit=False
โ—ฆ
๊ธฐ๋ณธ์ ์œผ๋กœ ๋ชจ๋“  SQL statement๊ฐ€ ์ปค๋ฐ‹๋˜์ง€ ์•Š์Œ
โ—ฆ
Commit ์ „๊นŒ์ง€๋Š” ์ž‘์„ฑ ์ค‘์ธ ์‚ฌ์šฉ์ž์—๊ฒŒ๋งŒ ๋ณด์ธ๋‹ค. (๋ฌผ๋ฆฌ์  DB์— ๋ฐ˜์˜์ด ๋˜์ง€ ์•Š๋Š”๋‹ค)
โ—ฆ
์ปค๋„ฅ์…˜ ๊ฐ์ฒด์˜ .commit()๊ณผ .rollback()ํ•จ์ˆ˜๋กœ ์ปค๋ฐ‹ํ• ์ง€ ๋ง์ง€ ๊ฒฐ์ •
โ€ข
2๊ฐœ ์ค‘์—์„œ ๋ฌด์—‡์„ ์‚ฌ์šฉํ•  ์ง€๋Š” ๊ฐœ์ธ ์ทจํ–ฅ
โ—ฆ
Python์˜ ๊ฒฝ์šฐ try/catch์™€ ๊ฐ™์ด ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ด ์ผ๋ฐ˜์ 
โ–ช
try/catch๋กœ ์—๋Ÿฌ๊ฐ€ ๋‚˜๋ฉด rollback์„ ๋ช…์‹œ์ ์œผ๋กœ ์‹คํ–‰. ์—๋Ÿฌ๊ฐ€ ์•ˆ ๋‚˜๋ฉด commit์„ ์‹คํ–‰
conn = get_Redshift_connection(True) # True ์ผ ๋•Œ๋Š” ๋ฐ”๋กœ physical table์— ์ž‘์„ฑ ๋ฐ ๋ฐ˜์˜์„ ํ•ด๋ฒ„๋ฆฐ๋‹ค. cur = conn.cursor() cur.execute("DELETE FROM hajuny129.name_gender;") try: cur.execute("BEGIN;") cur.execute("DELETE FROM hajuny129.name_gender;") cur.execute("INSERT INTO hajuny129.name_gender VALUES ('Claire', 'Female');") cur.execute("END;") except (Exception, psycopg2.DatabaseError) as error: print(error) cur.execute("ROLLBACK;") # finally : # conn.close()
Python
๋ณต์‚ฌ
โ—ฆ
try/except ์‚ฌ์šฉ์‹œ ์œ ์˜ํ•  ์ 
โ–ช
์•„๋ž˜์™€ ๊ฐ™์€ ์ฝ”๋“œ๊ฐ€ ์žˆ์—ˆ๋‹ค๋ฉด exception์ด ๋ฐœ์ƒํ•œ ๊ฒฝ์šฐ ์–ด๋–ป๊ฒŒ ๋ ๊นŒ?
try: cur.execute(create_sql) cur.execute("COMMIT;") except Exception as e: cur.execute("ROLLBACK;") raise # ์—๋Ÿฌ ์ „ํŒŒ => transaction fail
Python
๋ณต์‚ฌ
โ–ช
๋งŒ์•ฝ raise ๋ฌธ์ด ์—†๋‹ค๋ฉด ์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒํ–ˆ๋‹ค๋Š” ์‚ฌ์‹ค์ด ์™ธ๋ถ€๋กœ ์ „ํŒŒ๊ฐ€ ๋˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค. ๋งˆ์น˜ ์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒํ–ˆ๋Š”๋ฐ, ์•„๋ฌด ์ผ๋„ ์—†๋˜ ๊ฒƒ์ฒ˜๋Ÿผ ์ธ์ง€๋  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
โ–ช
except์—์„œ raise๋ฅผ ํ˜ธ์ถœํ•˜๋ฉด ๋ฐœ์ƒํ•œ ์›๋ž˜ exception์ด ์œ„๋กœ ์ „ํŒŒ๋ฉ๋‹ˆ๋‹ค.
โ€ข
์—๋Ÿฌ๊ฐ€ ์ „ํŒŒ๋œ ํ›„, ํŠธ๋žœ์žญ์…˜์ด Fail๋  ๊ฒƒ์ž…๋‹ˆ๋‹ค.
โ€ข
ETL์„ ๊ด€๋ฆฌํ•˜๋Š” ์ž…์žฅ์—์„œ ์–ด๋–ค ์—๋Ÿฌ๊ฐ€ ๊ฐ์ถฐ์ง€๋Š” ๊ฒƒ๋ณด๋‹ค๋Š” ๋ช…ํ™•ํ•˜๊ฒŒ ๋“œ๋Ÿฌ๋‚˜๋Š” ๊ฒƒ์ด ๋” ์ข‹์Œ
โ€ข
์œ„์˜ ๊ฒฝ์šฐ cur.execute ๋’ค์— raise๋ฅผ ํ˜ธ์ถœํ•˜๋Š” ๊ฒƒ์ด ์ข‹์Œ