Search

[Airflow] Open Weathermap DAG 구현

Open Weathermap API 소개

위도/경도를 기반으로 그 지역의 기후 정보를 알려주는 서비스
무료 계정으로 api key를 받아서 이를 호출시에 사용
만드려는 DAG: 서울 8일 낮/최소/최대 온도 읽기
먼저 Open Weathermap에 각자 등록하고 자신의 API Key를 다운로드 받을 것
API Key를 open_weather_api_key라는 Variable로 저장
먼저 서울의 위도와 경도를 찾을 것
위도와 경도 중심으로 돌아가는 API
앞서 API KEY와 서울의 위도/경도를 사용해서 위의 API를 requests 모듈을 사용해서 호출
응답 결과에서 온도 정보(평균/최소/최대)만 앞으로 7일을 대상으로 출력해볼 것
날짜, 낮 온도(day), 최소 온도(min), 최대 온도(max)
엔드포인트
https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&exclude={part}&appid={API key}&units=metric
Plain Text
복사

DAG 구현 절차

1.
Open Weathermap 무료 API를 사용해서 서울의 다음 7일간의 낮/최소/최대 온도를 읽어다가 각자 스키마 밑의 weather_forecast라는 테이블로 저장
무료 계정으로 api key를 받아서 open_weather_api_key라는 Variable로 저장
https://openweathermap.org/api/one-call-api를 호출해서 테이블을 채움
weather_forecast라는 테이블이 대상이 됨
여기서 유의할 점은 created_date은 레코드 생성시간으로 자동 채워지는 필드라는 점
CREATE TABLE keeyong.weather_forecast ( date date primary key, temp float, -- 낮온도 min_temp float, max_temp float, created_date timestamp default GETDATE() );
Python
복사
윈도우 함수를 통해 중복 제거
2.
One-Call API는 결과를 JSON 형태로 리턴해줌
이를 읽어들이려면 requests.get 결과의 text를 JSON으로 변환해 주어야함
아니면 requests.get 결과 오브젝트가 제공해주는 .json()이란 함수 사용
f = requests.get(link)
f_js = f.json()
3.
결과 JSON에서 daily라는 필드에 앞으로 7일간 날씨 정보가 들어감 있음
daily 필드는 리스트(8개의 레코드)이며 각 레코드가 하나의 날짜에 해당
날짜 정보는 “dt”라는 필드에 들어 있음. 이는 epoch이라고 해서 1970년 1월 1일 이후 밀리 세컨드로 시간을 표시. 이는 아래와 같은 코드로 읽을 수 있는 날짜로 변경 가능하다.
datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d') # 2021-10-09
epoch에서 사람이 이해할 수 있는 월 일로 변경
4. Open Weather API 호출 응답 보기
json 중 daily라는 키 값으로 존재하는 리스트에 앞으로 7일간의 온도 정보가 들어옴
dt 필드가 날짜를 나타낸다.
이는 epoch이라고 해서 1970년 1월 1일 이후 밀리 세컨드로 시간을 표시. 이는 아래와 같은 코드로 읽을 수 있는 날짜로 변경 가능
datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d') # 2021-10-09
temp 필드가 온도 정보를 나타냄
day
min
max
night
eve
morn
5.
Airflow Connections를 통해 만들어진 Redshift connection
autocommit이 False인 점을 유의
마지막 예제(NameGenderCSVtoRedshift.py)를 이용, python operator을 이용해서 api 링크를 통해 데이터를 받아서 결과에서 daily field 찾아서 루프를 돌면서 각 아이템에서 dt, 와 day, min, max 뽑아낸 후 아래 2가지 방식으로 Full refresh 구현
6.
두 가지 방식의 Full Refresh 구현 방식
Full RefreshINSERT INTO를 사용
Full RefreshCOPY를 사용 → S3에 적재한 후 COPY 벌크 업데이트

Weather_Forecast DAG 구현

앞서 설명한 Open Weather DAG를 실제로 구현하기
FULL REFRESH 형태로 구현해보기
매번 테이블을 지우고 다시 빌드
API Key는 어디에 저장해야할까?
DW상의 테이블은 아래처럼 정의
CREATE TABLE 각자스키마.weather_forecast ( date date primary key, temp float, min_temp float, max_temp float, updated_date timestamp default GETDATE() );
SQL
복사
Full Refresh 매번 테이블을 지우고 다시 빌드
Context 변수
""" CREATE TABLE hajuny129.weather_forecast ( date date, temp float, min_temp float, max_temp float, updated_date timestamp default GETDATE() ); """
Python
복사
from airflow import DAG from airflow.operators.python import PythonOperator # from airflow.operators import PythonOperator from airflow.models import Variable from airflow.hooks.postgres_hook import PostgresHook from datetime import datetime from datetime import timedelta import requests import logging import psycopg2 import json def get_Redshift_connection(): # autocommit is False by default hook = PostgresHook(postgres_conn_id='redshift_dev_db') return hook.get_conn().cursor() # 현업에서는 아래와 같이 etl을 하나의 함수로 정의하면 안됨 (e, t, l 따로 하나씩 정의해서 운영해야 한다) def etl(**context): api_key = Variable.get("open_weather_api_key") # 서울의 위도/경도 lat = 37.5665 lon = 126.9780 # https://openweathermap.org/api/one-call-api url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts" response = requests.get(url) data = json.loads(response.text) """ {'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3} """ ret = [] for d in data["daily"]: day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d') ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"])) cur = get_Redshift_connection() insert_sql = """DELETE FROM hajuny129.weather_forecast;INSERT INTO hajuny129.weather_forecast VALUES """ + ",".join(ret) logging.info(insert_sql) # 이렇게 받아온 코드는 auto commit이 False임 => commit이 필요함 try: cur.execute(insert_sql) cur.execute("Commit;") except Exception as e: cur.execute("Rollback;") raise dag = DAG( dag_id = 'Weather_to_Redshift', start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨 schedule_interval = '0 2 * * *', # 적당히 조절 max_active_runs = 1, catchup = False, default_args = { 'retries': 1, 'retry_delay': timedelta(minutes=3), } ) etl = PythonOperator( task_id = 'etl', python_callable = etl, dag = dag )
Python
복사
Task Decorator
from airflow import DAG from airflow.models import Variable from airflow.providers.postgres.hooks.postgres import PostgresHook from airflow.decorators import task from datetime import datetime from datetime import timedelta import requests import logging import json def get_Redshift_connection(): # autocommit is False by default hook = PostgresHook(postgres_conn_id='redshift_dev_db') return hook.get_conn().cursor() @task def etl(schema, table): api_key = Variable.get("open_weather_api_key") # 서울의 위도/경도 lat = 37.5665 lon = 126.9780 # https://openweathermap.org/api/one-call-api url = f"https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&appid={api_key}&units=metric&exclude=current,minutely,hourly,alerts" response = requests.get(url) data = json.loads(response.text) # response.json도 가능 """ {'dt': 1622948400, 'sunrise': 1622923873, 'sunset': 1622976631, 'moonrise': 1622915520, 'moonset': 1622962620, 'moon_phase': 0.87, 'temp': {'day': 26.59, 'min': 15.67, 'max': 28.11, 'night': 22.68, 'eve': 26.29, 'morn': 15.67}, 'feels_like': {'day': 26.59, 'night': 22.2, 'eve': 26.29, 'morn': 15.36}, 'pressure': 1003, 'humidity': 30, 'dew_point': 7.56, 'wind_speed': 4.05, 'wind_deg': 250, 'wind_gust': 9.2, 'weather': [{'id': 802, 'main': 'Clouds', 'description': 'scattered clouds', 'icon': '03d'}], 'clouds': 44, 'pop': 0, 'uvi': 3} """ ret = [] # 향후 8일 간의 날씨 레코드 정보 for d in data["daily"]: day = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d') ret.append("('{}',{},{},{})".format(day, d["temp"]["day"], d["temp"]["min"], d["temp"]["max"])) cur = get_Redshift_connection() # 테이블 존재 시 drop -> create table drop_recreate_sql = f"""DROP TABLE IF EXISTS {schema}.{table}; CREATE TABLE {schema}.{table} ( date date, temp float, min_temp float, max_temp float, created_date timestamp default GETDATE() ); """ insert_sql = f"""INSERT INTO {schema}.{table} VALUES """ + ",".join(ret) logging.info(drop_recreate_sql) logging.info(insert_sql) # SQL 문을 만들어놓고 try 문 안에서 해결 try: cur.execute(drop_recreate_sql) cur.execute(insert_sql) cur.execute("Commit;") except Exception as e: cur.execute("Rollback;") raise with DAG( dag_id = 'Weather_to_Redshift', start_date = datetime(2023,5,30), # 날짜가 미래인 경우 실행이 안됨 schedule = '0 2 * * *', # 매일 오전 2시에 시작하는 daily DAG max_active_runs = 1, catchup = False, default_args = { 'retries': 1, 'retry_delay': timedelta(minutes=3), } ) as dag: etl("keeyong", "weather_forecast")
Python
복사
Incremental Update
a.
임시 테이블 생성하고 임시 테이블로 원본 테이블 내용 복사 ⇒ 임시 테이블이 핵심
CTAS의 문제점: default와 같은 attribute가 보존되지 않음
CREATE TABLE (LIKE)을 사용
CREATE TABLE hajuny129.temp_nps (LIKE hajuny129.nps INCLUDING DEFAULTS);
INCLUDING DEFAULTS 주의 ⇒ 테이블의 updated_date 필드의 default GETDATE()를 가져온다
""" CREATE TABLE user.weather_forecast ( date date, temp float, min_temp float, max_temp float, created_date timestamp default GETDATE() -- 이를 상속 ); """
Python
복사
try ~ except
raise를 하지 않으면 그냥 롤백하고 airflow에서는 오류처리가 안됩니다
b. 임시 테이블로 새 레코드 추가
이 때 중복 발생
c. 임시 테이블을 원본 테이블로 교체하면서 중복 제거
코드

What is raise? (예외 일으키기)

프로그래머가 인위적으로 에러를 발생 시킬 수 있습니다. 바로, raise 문을 통해서 가능
만약 여러분들이 파싱을 해주는 함수를 만들었는데, 이상한 값이 들어 왔다고 알려 주기 위해 에러를 발생 시킬 수도 있습니다.
그렇다면 상위에 존재하는 try ... except 문에서 에러를 처리 가능
def str_to_fraction(string): arr = string.split('/') if len(arr) != 2: raise ValueError('분수를 입력 해 주세요') else: try: numerator = int(arr[0]) denominator = int(arr[1]) return { 'numerator': numerator, 'denominator': denominator, 'value': numerator / denominator, 'string': string } except ValueError: raise ValueError('정수를 입력 하세요') try: str_to_fraction('1//2') except ValueError as v: print(v) try: str_to_fraction('1.1/2') except ValueError as v: print(v) try: str_to_fraction('1/2') except ValueError as v: print(v)
Python
복사