Yahoo Finance API DAG 작성
구현 DAG의 세부 사항 - Full Refresh로 구현
1.
Yahoo Finance API를 호출하여 애플 주식 정보 수집 (지난 30일)
2.
Redshift 상의 테이블로 1에서 받은 레코드들을 적재
•
매일 지난 30일 간의 정보들만 Full refresh 형식으로 구현하게 됩니다.
◦
첫번째 태스크가 E ~ T를 담당
◦
두번째 태스크가 Load를 담당
•
먼저 라이브러리 설치
pip3 install yfinance
Python
복사
•
Extract/Transform: Yahoo Finance API 호출
◦
Yahoo Finance API를 호출하여 애플 주식 정보 수집하고 파싱
◦
기본으로 지난 한달의 주식 가격을 리턴해줌
import yfinance as yf
@task
def get_historical_prices(symbol):
# extract
ticket = yf.Ticker(symbol)
data = ticket.history() #pandas 데이터 프레임 형태로 리턴
records = []
# transform
for index, row in data.iterrows():
date = index.strftime('%Y-%m-%d %H:%M:%S')
records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])
return records
Python
복사
•
Load: Redshift의 테이블을 업데이트
◦
Full Refresh로 구현
▪
매번 테이블을 새로 만드는 형태로 구성
◦
트랜잭션 형태로 구성 (NameGender DAG와 동일)
•
실행 데모
◦
앞서 코드 실행
from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from pandas import Timestamp
import yfinance as yf
import pandas as pd
import logging
def get_Redshift_connection(autocommit=True):
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
conn = hook.get_conn()
conn.autocommit = autocommit
return conn.cursor()
@task
def get_historical_prices(symbol):
ticket = yf.Ticker(symbol)
data = ticket.history()
records = []
for index, row in data.iterrows():
date = index.strftime('%Y-%m-%d %H:%M:%S')
records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])
return records
@task
def load(schema, table, records):
logging.info("load started")
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
cur.execute(f"""
CREATE TABLE {schema}.{table} (
date date,
"open" float,
high float,
low float,
close float,
volume bigint
);""")
# DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태
for r in records:
sql = f"INSERT INTO {schema}.{table} VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
print(sql)
cur.execute(sql)
cur.execute("COMMIT;") # cur.execute("END;")
except Exception as error:
print(error)
cur.execute("ROLLBACK;")
raise # error를 raise
logging.info("load done")
with DAG(
dag_id = 'UpdateSymbol',
start_date = datetime(2023,5,30),
catchup=False,
tags=['API'],
schedule = '0 10 * * *'
) as dag:
results = get_historical_prices("AAPL")
load("hajuny129", "stock_info", results)
Python
복사
◦
Docker 스케쥴러와 워커 컨테이너에 로그인해서 yfinance 모듈 설치가 필요
▪
docker exec -it {컨테이너 id} /bin/bash
▪
pip3 install yfinance
▪
airflow tasks list UpdateSymbol
▪
airflow dags test UpdateSymbol 2023-05-30
◦
docker container에 루트 유저로 로그인하는 방법
▪
docker exec --user root -it 2d1d8fe0fd67 /bin/bash
▪
apt 패키지를 설치 가능해진다.
Yahoo Finance API를 호출해서 애플 주식을 읽어오는 Incremental Update 기반의 DAG를 만들자
구현 DAG의 세부 사항 - Incremental Update로 구현
기존 테이블 내용 임시 테이블로 복사 - 가져온 데이터 적재 - 임시 테이블 중복 제거
- 다시 기존 테이블 Full refresh
1.
Yahoo Finance API를 호출하여 애플 주식 정보 수집 (지난 30일)
2.
Yahoo Finance API를 호출하여 애플 주식 정보 수집하고 파싱
3.
기본으로 지난 한달의 주식 가격을 리턴해줌
4.
Redshift 상의 테이블로 1에서 받은 레코드들을 적재하고 중복 제거
•
매일 하루 치의 데이터씩 늘어남
•
Load: Redshift의 테이블을 업데이트
◦
Incremental Update로 구현
▪
임시 테이블 생성하면서 현재 테이블의 레코드를 복사
•
(CREATE TEMP TABLE … AS SELECT)
•
임시 테이블 만드는 부분에서 에러가 발생해도 정합성에 큰 문제는 없다
▪
임시 테이블로 Yahoo Finance API로 읽어온 레코드를 적재
•
그럼 중복이 발생하는 부분이 생길 것
▪
원본 테이블을 삭제하고 새로 생성
▪
원본 테이블에 임시 테이블의 내용을 복사
•
이 때 SELECT DISTINCT *를 사용하여 중복 제거
◦
트랜잭션 형태로 구성 (NameGender DAG와 동일)
3.
구조
•
임시 테이블을 만들어서 적재
•
임시 테이블의 중복을 제거해서 적재
4.
눈 여겨 볼 코드 (Load : Redshift의 테이블을 업데이트)
def _create_table(cur, schema, table, drop_first):
if drop_first:
cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
cur.execute(f"""
CREATE TABLE IF NOT EXISTS {schema}.{table} (
date date,
"open" float,
high float,
low float,
close float,
volume bigint
);""")
def load(schema, table, records):
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
# 원본 테이블 생성 - 테이블이 처음 한번 만들어질 때 필요
_create_table(cur, schema, table, False)
# 임시 테이블로 원본 테이블을 복사(테이블이 만들어지면서 채워짐)
cur.execute(f"CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};")
for r in records:
sql = f"INSERT INTO t VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
cur.execute(sql)
# 원본 테이블 생성
_create_table(cur, schema, table, True)
# 임시 테이블 내용을 중복을 제거해서 원본 테이블로 복사
# 테이블이 이미 만들어진 상태에서 적재
cur.execute(f"INSERT INTO {schema}.{table} SELECT DISTINCT * FROM t;")
cur.execute("COMMIT;")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK;")
Python
복사
•
전체 코드
from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from pandas import Timestamp
import yfinance as yf
import pandas as pd
import logging
def get_Redshift_connection(autocommit=True):
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
conn = hook.get_conn()
conn.autocommit = autocommit
return conn.cursor()
@task
def get_historical_prices(symbol):
ticket = yf.Ticker(symbol)
data = ticket.history()
records = []
for index, row in data.iterrows():
date = index.strftime('%Y-%m-%d %H:%M:%S')
records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])
return records
def _create_table(cur, schema, table, drop_first):
if drop_first:
cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
cur.execute(f"""
CREATE TABLE IF NOT EXISTS {schema}.{table} (
date date,
"open" float,
high float,
low float,
close float,
volume bigint
);""")
@task
def load(schema, table, records):
logging.info("load started")
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
# 원본 테이블이 없으면 생성 - 테이블이 처음 한번 만들어질 때 필요한 코드
_create_table(cur, schema, table, False)
# 임시 테이블로 원본 테이블을 복사
cur.execute(f"CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};")
for r in records:
sql = f"INSERT INTO t VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
print(sql)
cur.execute(sql)
# 원본 테이블 생성
_create_table(cur, schema, table, True)
# 임시 테이블 내용을 원본 테이블로 복사
cur.execute(f"INSERT INTO {schema}.{table} SELECT DISTINCT * FROM t;")
cur.execute("COMMIT;") # cur.execute("END;")
except Exception as error:
print(error)
cur.execute("ROLLBACK;")
raise
logging.info("load done")
with DAG(
dag_id = 'UpdateSymbol_v2',
start_date = datetime(2023,5,30),
catchup=False,
tags=['API'],
schedule = '0 10 * * *'
) as dag:
results = get_historical_prices("AAPL")
load("hajuny129", "stock_info_v2", results)
# 이 이름으로 이 스키마 밑에 테이블들이 만들어질 것
Python
복사
5.
적재 후 확인 쿼리
SELECT * FROM {id}.stock_info_v2 ORDER BY 1;
Python
복사
과제 - ROW_NUMBER 방식을 사용해서 Primary key가 동일한 레코드들을 처리
UpdateSymbol_v2의 Incremental Update 방식 수정해보기
•
DISTINCT 방식의 중복처리는 데이터 소스에 따라 이상 동작할 수 있음
◦
위 사진과 같이 날짜별 다른 값이 존재할 경우, 날짜 별로 추가적인 레코드가 생겨버림
•
앞서 배운 ROW_NUMBER 방식을 사용해서 Primary key가 동일한 레코드들을 처리하기
from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from datetime import datetime
from pandas import Timestamp
import yfinance as yf
import pandas as pd
import logging
def get_Redshift_connection(autocommit=True):
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
conn = hook.get_conn()
conn.autocommit = autocommit
return conn.cursor()
@task
def get_historical_prices(symbol):
ticket = yf.Ticker(symbol)
data = ticket.history()
records = []
for index, row in data.iterrows():
date = index.strftime('%Y-%m-%d %H:%M:%S')
records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]])
return records
def _create_table(cur, schema, table, drop_first):
if drop_first:
cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};")
cur.execute(f"""
CREATE TABLE IF NOT EXISTS {schema}.{table} (
date date,
"open" float,
high float,
low float,
close float,
volume bigint,
created_date timestamp DEFAULT GETDATE()
);""")
# default 값으로 현재 시간 => GETDATE()
@task
def load(schema, table, records):
logging.info("load started")
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
# 원본 테이블이 없으면 생성 - 테이블이 처음 한번 만들어질 때 필요한 코드
_create_table(cur, schema, table, False)
# 임시 테이블로 원본 테이블을 복사
cur.execute(f"CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};")
for r in records:
sql = f"INSERT INTO t VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});"
print(sql)
cur.execute(sql)
# 임시 테이블 내용을 원본 테이블로 복사
cur.execute(f"DELETE FROM {schema}.{table};")
cur.execute(f"""INSERT INTO {schema}.{table}
SELECT date, "open", high, low, close, volume FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq
FROM t
)
WHERE seq = 1;""")
cur.execute("COMMIT;") # cur.execute("END;")
except Exception as error:
print(error)
cur.execute("ROLLBACK;")
raise
logging.info("load done")
with DAG(
dag_id = 'UpdateSymbol_v3',
start_date = datetime(2023,5,30),
catchup=False,
tags=['API'],
schedule = '0 10 * * *'
) as dag:
results = get_historical_prices("AAPL")
load("hajuny129", "stock_info_v3", results)
Python
복사