Search

[Airflow] 세계 나라 정보 API 사용 DAG

과제 해결한 내용을 정리

세계 나라 정보 API 사용 DAG 작성
https://restcountries.com/에 가면 세부 사항을 찾을 수 있음
별도의 API Key가 필요없음
https://restcountries.com/v3/all를 호출하여 나라별로 다양한 정보를 얻을 수 있음
{"name": {"common": "South Korea", "official": "Republic of Korea","area": 100210.0, "population": 51780579,}
Python
복사
Full Refresh로 구현해서 매번 국가 정보를 읽어오게 할 것!
API 결과에서 아래 3개의 정보를 추출하여 Redshift에 각자 스키마 밑에 테이블 생성
country → [“name”][“official”]
population → [“population”]
area → [“area”]
단 이 DAG는 UTC로 매주 토요일 오전 6시 30분에 실행되게 만들어볼 것!
schedule = “20 10 * * *”
숙제는 개인 github에 repo를 만든 후 제출할 것!
Country DAG 코드 리뷰
앞서 finance DAG와 load 부분은 거의 흡사하다
import requests @task def extract_transform(): response = requests.get('https://restcountries.com/v3/all') countries = response.json() # dictionary list records = [] for country in countries: name = country['name']['common'] population = country['population'] area = country['area'] records.append([name, population, area]) return records with DAG( dag_id = 'CountryInfo', start_date = datetime(2023,5,30), catchup=False, tags=['API'], schedule = '30 6 * * 6' # 마지막 자릿수: 0 - Sunday, …, 6 - Saturday ) as dag: results = extract_transform() load("keeyong", "country_info", results)
Python
복사
-- 테이블은 아래와 같이 생성하면 된다. CREATE TABLE {schema}.{table} ( name varchar(256) primary key, --DW가 PK를 보장하진 않고 옵티마이저에 힌트를 주는 정도 population int, area float );
SQL
복사
Docker 환경에서 실행 데모 예시
코드 예시
위에는 없지만 롤백 이후 raise 코드를 넣어주는 것이 좋다 → 디버깅에 유리