Search

Yahoo Finance API DAG(update symbol) 작성

Yahoo Finance API DAG 작성

구현 DAG의 세부 사항 - Full Refresh로 구현

1.
Yahoo Finance API를 호출하여 애플 주식 정보 수집 (지난 30일)
2.
Redshift 상의 테이블로 1에서 받은 레코드들을 적재
매일 지난 30일 간의 정보들만 Full refresh 형식으로 구현하게 됩니다.
첫번째 태스크가 E ~ T를 담당
두번째 태스크가 Load를 담당
먼저 라이브러리 설치
pip3 install yfinance
Python
복사
Extract/Transform: Yahoo Finance API 호출
Yahoo Finance API를 호출하여 애플 주식 정보 수집하고 파싱
기본으로 지난 한달의 주식 가격을 리턴해줌
import yfinance as yf @task def get_historical_prices(symbol): # extract ticket = yf.Ticker(symbol) data = ticket.history() #pandas 데이터 프레임 형태로 리턴 records = [] # transform for index, row in data.iterrows(): date = index.strftime('%Y-%m-%d %H:%M:%S') records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]]) return records
Python
복사
Load: Redshift의 테이블을 업데이트
Full Refresh로 구현
매번 테이블을 새로 만드는 형태로 구성
트랜잭션 형태로 구성 (NameGender DAG와 동일)
실행 데모
앞서 코드 실행
from airflow import DAG from airflow.decorators import task from airflow.providers.postgres.hooks.postgres import PostgresHook from datetime import datetime from pandas import Timestamp import yfinance as yf import pandas as pd import logging def get_Redshift_connection(autocommit=True): hook = PostgresHook(postgres_conn_id='redshift_dev_db') conn = hook.get_conn() conn.autocommit = autocommit return conn.cursor() @task def get_historical_prices(symbol): ticket = yf.Ticker(symbol) data = ticket.history() records = [] for index, row in data.iterrows(): date = index.strftime('%Y-%m-%d %H:%M:%S') records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]]) return records @task def load(schema, table, records): logging.info("load started") cur = get_Redshift_connection() try: cur.execute("BEGIN;") cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};") cur.execute(f""" CREATE TABLE {schema}.{table} ( date date, "open" float, high float, low float, close float, volume bigint );""") # DELETE FROM을 먼저 수행 -> FULL REFRESH을 하는 형태 for r in records: sql = f"INSERT INTO {schema}.{table} VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});" print(sql) cur.execute(sql) cur.execute("COMMIT;") # cur.execute("END;") except Exception as error: print(error) cur.execute("ROLLBACK;") raise # error를 raise logging.info("load done") with DAG( dag_id = 'UpdateSymbol', start_date = datetime(2023,5,30), catchup=False, tags=['API'], schedule = '0 10 * * *' ) as dag: results = get_historical_prices("AAPL") load("hajuny129", "stock_info", results)
Python
복사
Docker 스케쥴러와 워커 컨테이너에 로그인해서 yfinance 모듈 설치가 필요
docker exec -it {컨테이너 id} /bin/bash
pip3 install yfinance
airflow tasks list UpdateSymbol
airflow dags test UpdateSymbol 2023-05-30
docker container에 루트 유저로 로그인하는 방법
docker exec --user root -it 2d1d8fe0fd67 /bin/bash
apt 패키지를 설치 가능해진다.

Yahoo Finance API를 호출해서 애플 주식을 읽어오는 Incremental Update 기반의 DAG를 만들자

구현 DAG의 세부 사항 - Incremental Update로 구현 기존 테이블 내용 임시 테이블로 복사 - 가져온 데이터 적재 - 임시 테이블 중복 제거 - 다시 기존 테이블 Full refresh
1.
Yahoo Finance API를 호출하여 애플 주식 정보 수집 (지난 30일)
2.
Yahoo Finance API를 호출하여 애플 주식 정보 수집하고 파싱
3.
기본으로 지난 한달의 주식 가격을 리턴해줌
4.
Redshift 상의 테이블로 1에서 받은 레코드들을 적재하고 중복 제거
매일 하루 치의 데이터씩 늘어남
Load: Redshift의 테이블을 업데이트
Incremental Update로 구현
임시 테이블 생성하면서 현재 테이블의 레코드를 복사
(CREATE TEMP TABLE … AS SELECT)
임시 테이블 만드는 부분에서 에러가 발생해도 정합성에 큰 문제는 없다
임시 테이블로 Yahoo Finance API로 읽어온 레코드를 적재
그럼 중복이 발생하는 부분이 생길 것
원본 테이블을 삭제하고 새로 생성
원본 테이블에 임시 테이블의 내용을 복사
이 때 SELECT DISTINCT *를 사용하여 중복 제거
트랜잭션 형태로 구성 (NameGender DAG와 동일)
3.
구조
임시 테이블을 만들어서 적재
임시 테이블의 중복을 제거해서 적재
4.
눈 여겨 볼 코드 (Load : Redshift의 테이블을 업데이트)
def _create_table(cur, schema, table, drop_first): if drop_first: cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};") cur.execute(f""" CREATE TABLE IF NOT EXISTS {schema}.{table} ( date date, "open" float, high float, low float, close float, volume bigint );""") def load(schema, table, records): cur = get_Redshift_connection() try: cur.execute("BEGIN;") # 원본 테이블 생성 - 테이블이 처음 한번 만들어질 때 필요 _create_table(cur, schema, table, False) # 임시 테이블로 원본 테이블을 복사(테이블이 만들어지면서 채워짐) cur.execute(f"CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};") for r in records: sql = f"INSERT INTO t VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});" cur.execute(sql) # 원본 테이블 생성 _create_table(cur, schema, table, True) # 임시 테이블 내용을 중복을 제거해서 원본 테이블로 복사 # 테이블이 이미 만들어진 상태에서 적재 cur.execute(f"INSERT INTO {schema}.{table} SELECT DISTINCT * FROM t;") cur.execute("COMMIT;") except (Exception, psycopg2.DatabaseError) as error: print(error) cur.execute("ROLLBACK;")
Python
복사
전체 코드
from airflow import DAG from airflow.decorators import task from airflow.providers.postgres.hooks.postgres import PostgresHook from datetime import datetime from pandas import Timestamp import yfinance as yf import pandas as pd import logging def get_Redshift_connection(autocommit=True): hook = PostgresHook(postgres_conn_id='redshift_dev_db') conn = hook.get_conn() conn.autocommit = autocommit return conn.cursor() @task def get_historical_prices(symbol): ticket = yf.Ticker(symbol) data = ticket.history() records = [] for index, row in data.iterrows(): date = index.strftime('%Y-%m-%d %H:%M:%S') records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]]) return records def _create_table(cur, schema, table, drop_first): if drop_first: cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};") cur.execute(f""" CREATE TABLE IF NOT EXISTS {schema}.{table} ( date date, "open" float, high float, low float, close float, volume bigint );""") @task def load(schema, table, records): logging.info("load started") cur = get_Redshift_connection() try: cur.execute("BEGIN;") # 원본 테이블이 없으면 생성 - 테이블이 처음 한번 만들어질 때 필요한 코드 _create_table(cur, schema, table, False) # 임시 테이블로 원본 테이블을 복사 cur.execute(f"CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};") for r in records: sql = f"INSERT INTO t VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});" print(sql) cur.execute(sql) # 원본 테이블 생성 _create_table(cur, schema, table, True) # 임시 테이블 내용을 원본 테이블로 복사 cur.execute(f"INSERT INTO {schema}.{table} SELECT DISTINCT * FROM t;") cur.execute("COMMIT;") # cur.execute("END;") except Exception as error: print(error) cur.execute("ROLLBACK;") raise logging.info("load done") with DAG( dag_id = 'UpdateSymbol_v2', start_date = datetime(2023,5,30), catchup=False, tags=['API'], schedule = '0 10 * * *' ) as dag: results = get_historical_prices("AAPL") load("hajuny129", "stock_info_v2", results) # 이 이름으로 이 스키마 밑에 테이블들이 만들어질 것
Python
복사
5.
적재 후 확인 쿼리
SELECT * FROM {id}.stock_info_v2 ORDER BY 1;
Python
복사

과제 - ROW_NUMBER 방식을 사용해서 Primary key가 동일한 레코드들을 처리

UpdateSymbol_v2의 Incremental Update 방식 수정해보기
DISTINCT 방식의 중복처리는 데이터 소스에 따라 이상 동작할 수 있음
위 사진과 같이 날짜별 다른 값이 존재할 경우, 날짜 별로 추가적인 레코드가 생겨버림
앞서 배운 ROW_NUMBER 방식을 사용해서 Primary key가 동일한 레코드들을 처리하기
from airflow import DAG from airflow.decorators import task from airflow.providers.postgres.hooks.postgres import PostgresHook from datetime import datetime from pandas import Timestamp import yfinance as yf import pandas as pd import logging def get_Redshift_connection(autocommit=True): hook = PostgresHook(postgres_conn_id='redshift_dev_db') conn = hook.get_conn() conn.autocommit = autocommit return conn.cursor() @task def get_historical_prices(symbol): ticket = yf.Ticker(symbol) data = ticket.history() records = [] for index, row in data.iterrows(): date = index.strftime('%Y-%m-%d %H:%M:%S') records.append([date, row["Open"], row["High"], row["Low"], row["Close"], row["Volume"]]) return records def _create_table(cur, schema, table, drop_first): if drop_first: cur.execute(f"DROP TABLE IF EXISTS {schema}.{table};") cur.execute(f""" CREATE TABLE IF NOT EXISTS {schema}.{table} ( date date, "open" float, high float, low float, close float, volume bigint, created_date timestamp DEFAULT GETDATE() );""") # default 값으로 현재 시간 => GETDATE() @task def load(schema, table, records): logging.info("load started") cur = get_Redshift_connection() try: cur.execute("BEGIN;") # 원본 테이블이 없으면 생성 - 테이블이 처음 한번 만들어질 때 필요한 코드 _create_table(cur, schema, table, False) # 임시 테이블로 원본 테이블을 복사 cur.execute(f"CREATE TEMP TABLE t AS SELECT * FROM {schema}.{table};") for r in records: sql = f"INSERT INTO t VALUES ('{r[0]}', {r[1]}, {r[2]}, {r[3]}, {r[4]}, {r[5]});" print(sql) cur.execute(sql) # 임시 테이블 내용을 원본 테이블로 복사 cur.execute(f"DELETE FROM {schema}.{table};") cur.execute(f"""INSERT INTO {schema}.{table} SELECT date, "open", high, low, close, volume FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY created_date DESC) seq FROM t ) WHERE seq = 1;""") cur.execute("COMMIT;") # cur.execute("END;") except Exception as error: print(error) cur.execute("ROLLBACK;") raise logging.info("load done") with DAG( dag_id = 'UpdateSymbol_v3', start_date = datetime(2023,5,30), catchup=False, tags=['API'], schedule = '0 10 * * *' ) as dag: results = get_historical_prices("AAPL") load("hajuny129", "stock_info_v3", results)
Python
복사