Search

Airflow - Gsheet 사용법 정리

구글 시트를 테이블로 복사하는 예제

구글 시트를 테이블로 복사하는 예제

구현 절차는 다음과 같다

1.
시트 API 활성화하고 구글 서비스 어카운트 생성하고 그 내용을 JSON 파일로 다운로드
2.
어카운트에서 생성해준 이메일을 조작하고 싶은 시트에 공유
3.
Airflow DAG쪽에서 해당 JSON 파일로 인증하고 앞서 시트를 조작

구글 서비스 어카운트 생성

구글 클라우드 로그인
구글 스프레드시트 API 활성화 필요 + 구글 드라이브 API도 활성화하면 좋다 (사본을 드라이브에 생성했을 경우)
이 JSON 파일의 내용을 google_sheet_access_token이란 이름의 Variable로 등록
이 JSON 파일을 보면 이메일 주소가 하나 존재
이를 읽고 싶은 구글스프레드시트 파일에 공유. 이 이메일은 iam.gserviceaccount.com로 끝남

구글 서비스 어카운트 생성 절차

시트 API 활성화가 된 후에 왼쪽에서
1. Credentials 선택하고 위에서
2. +CREATE CREDENTIALS 선택하고 최종적으로
3. “Service account” 선택
4.
API Keys 화면에서 방금 생성한 서비스 어카운트 선택
5.
탭 중에 KEYS 선택
6.
Private key 생성 화면에서 JSON을 선택하고 해당 파일을 다운로드 (자동으로 됨)
7.
이 파일의 내용을 뒤에서 Airflow Variable로 등록해주어야함 (google_sheet_access_token)
먼저 스프레드 시트를 보자
이 시트에 앞서 구글 서비스 어카운트에서 찾은 이메일을 공유하자
iam.gserviceaccount.com가 포함된 이메일!
편집자 권한을 부여

dags/Gsheet_to_Redshift.py

구글 스프레드시트에서 읽기를 쉽게 해주는 모듈입니다. 아직은 쓰는 기능은 없습니다만 쉽게 추가 가능합니다.
메인 함수는 get_google_sheet_to_csv입니다.
이는 google sheet API를 통해 구글 스프레드시트를 읽고 쓰는 것이 가능하게 해줍니다.
읽으려는 시트(탭)가 있는 스프레드시트 파일이 구글 서비스 어카운트 이메일과 공유가 되어있어야 합니다.
️ Airflow 상에서는 서비스어카운트 JSON 파일의 내용이 google_sheet_access_token이라는 이름의 Variable로 저장되어 있어야 합니다.
이 이메일은 iam.gserviceaccount.com로 끝납니다.
️위 Variable의 내용이 매번 파일로 쓰여지고 그 파일이 구글에 권한 체크를 하는데 사용되는데 이 파일은 local_data_dir Variable로 지정된 로컬 파일 시스템에 저장된다.
이 Variable은 보통 /var/lib/airflow/data/로 설정되며 이를 먼저 생성두어야 한다 (airflow 사용자)
JSON 기반 서비스 어카운트를 만들려면 이 링크를 참고하세요: https://denisluiz.medium.com/python-with-google-sheets-service-account-step-by-step-8f74c26ed28e
아래 2개의 모듈 설치가 별도로 필요합니다. - pip3 install oauth2client - pip3 install gspread
get_google_sheet_to_csv 함수:
첫 번째 인자로 스프레드시트 링크를 제공. 이 시트를 service account 이메일과 공유해야합니다.
두 번째 인자로 데이터를 읽어올 tab의 이름을 지정합니다.
세 번째 인자로 지정된 test.csv로 저장합니다.
gsheet.get_google_sheet_to_csv( 'https://docs.google.com/spreadsheets/d/1hW-_16OqgctX-xxx/', 'Test', 'test.csv', )
Python
복사
여기 예제에서는 아래와 같이 테이블을 만들어두고 이를 구글스프레드시트로부터 채운다
CREATE TABLE keeyong.spreadsheet_copy_testing ( col1 int, col2 int, col3 int, col4 int );
Python
복사
from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator from airflow.models import Variable from datetime import datetime from datetime import timedelta from plugins import gsheet from plugins import s3 import requests import logging import psycopg2 import json def download_tab_in_gsheet(**context): url = context["params"]["url"] tab = context["params"]["tab"] table = context["params"]["table"] data_dir = Variable.get("DATA_DIR") gsheet.get_google_sheet_to_csv( url, tab, data_dir+'{}.csv'.format(table) ) def copy_to_s3(**context): table = context["params"]["table"] s3_key = context["params"]["s3_key"] s3_conn_id = "aws_conn_id" s3_bucket = "grepp-data-engineering" data_dir = Variable.get("DATA_DIR") local_files_to_upload = [ data_dir+'{}.csv'.format(table) ] replace = True s3.upload_to_s3(s3_conn_id, s3_bucket, s3_key, local_files_to_upload, replace) dag = DAG( dag_id = 'Gsheet_to_Redshift', start_date = datetime(2021,11,27), # 날짜가 미래인 경우 실행이 안됨 schedule = '0 9 * * *', # 적당히 조절 max_active_runs = 1, max_active_tasks = 2, catchup = False, default_args = { 'retries': 1, 'retry_delay': timedelta(minutes=3), } ) sheets = [ { "url": "https://docs.google.com/spreadsheets/d/1hW-_16OqgctX-xxx/", "tab": "SheetToRedshift", "schema": "keeyong", "table": "spreadsheet_copy_testing" } ] for sheet in sheets: download_tab_in_gsheet = PythonOperator( task_id = 'download_{}_in_gsheet'.format(sheet["table"]), python_callable = download_tab_in_gsheet, params = sheet, dag = dag) s3_key = sheet["schema"] + "_" + sheet["table"] copy_to_s3 = PythonOperator( task_id = 'copy_{}_to_s3'.format(sheet["table"]), python_callable = copy_to_s3, params = { "table": sheet["table"], "s3_key": s3_key }, dag = dag) run_copy_sql = S3ToRedshiftOperator( task_id = 'run_copy_sql_{}'.format(sheet["table"]), s3_bucket = "grepp-data-engineering", s3_key = s3_key, schema = sheet["schema"], table = sheet["table"], copy_options=['csv', 'IGNOREHEADER 1'], method = 'REPLACE', redshift_conn_id = "redshift_dev_db", aws_conn_id = 'aws_conn_id', dag = dag ) download_tab_in_gsheet >> copy_to_s3 >> run_copy_sql
Python
복사
docker의 environment에 저장한 Airflow Variable 변수는 웹 UI에서는 보이지 않지만, 커맨드라인에서는 보인다. 컨테이너들끼리 해당 변수가 공유가 되고 있다.
다수의 태스크, 다수의 워커가 돌아갈 때, 만약 하나의 워커 내 로컬 파일 시스템에 저장을 해버리면 다른 워커는 접근을 못하게 된다. 위에처럼 볼륨을 통해 컨테이너끼리 통신을 하게 되면 워커 간 파일시스템 저장 장소 공유가 가능해진다.

dags/plugin/gsheet.py

get_gsheet_client 함수 : Data dir로 airflow variable에 있는 인증 정보를 읽어다가 google-sheet.json 파일로 저장해 인증에 사용
# -*- coding: utf-8 -*- from airflow.hooks.postgres_hook import PostgresHook from airflow.models import Variable from oauth2client.service_account import ServiceAccountCredentials import base64 import gspread import json import logging import os import pandas as pd import pytz def write_variable_to_local_file(variable_name, local_file_path): content = Variable.get(variable_name) f = open(local_file_path, "w") f.write(content) f.close() # Data dir로 airflow variable에 있는 인증 정보를 읽어다가 google-sheet.json 파일로 저장해 인증에 사용 def get_gsheet_client(): data_dir = Variable.get("DATA_DIR") scope = ['https://spreadsheets.google.com/feeds', 'https://www.googleapis.com/auth/drive'] gs_json_file_path = data_dir + 'google-sheet.json' write_variable_to_local_file('google_sheet_access_token', gs_json_file_path) credentials = ServiceAccountCredentials.from_json_keyfile_name(gs_json_file_path, scope) gc = gspread.authorize(credentials) return gc def p2f(x): return float(x.strip('%'))/100 def get_google_sheet_to_csv( sheet_uri, tab, filename, header_line=1, remove_dollar_comma=0, rate_to_float=0): """ Download data in a tab (indicated by "tab") in a spreadsheet ("sheet_uri") as a csv ("filename") - if tab is None, then the records in the first tab of the sheet will be downloaded - if tab has only one row in the header, then just use the default value which is 1 - setting remove_dollar_comma to 1 will remove any dollar signs or commas from the values in the CSV file - dollar sign might need to be won sign instead here - setting rate_to_float to 1 will convert any percentage numeric values to fractional values (50% -> 0.5) """ data, header = get_google_sheet_to_lists( sheet_uri, tab, header_line, remove_dollar_comma=remove_dollar_comma) if rate_to_float: for row in data: for i in range(len(row)): if str(row[i]).endswith("%"): row[i] = p2f(row[i]) data = pd.DataFrame(data, columns=header).to_csv( filename, index=False, header=True, encoding='utf-8' ) def get_google_sheet_to_lists(sheet_uri, tab=None, header_line=1, remove_dollar_comma=0): gc = get_gsheet_client() # no tab is given, then take the first sheet # here tab is the title of a sheet of interest if tab is None: wks = gc.open_by_url(sheet_uri).sheet1 else: wks = gc.open_by_url(sheet_uri).worksheet(tab) # list of lists, first value of each list is column header print(wks.get_all_values()) print(int(header_line)-1) data = wks.get_all_values()[header_line-1:] # header = wks.get_all_values()[0] header = data[0] if remove_dollar_comma: data = [replace_dollar_comma(l) for l in data if l != header] else: data = [l for l in data if l != header] return data, header def add_df_to_sheet_in_bulk(sh, sheet, df, header=None, clear=False): records = [] headers = list(df.columns) records.append(headers) for _, row in df.iterrows(): record = [] for column in headers: if str(df.dtypes[column]) in ('object', 'datetime64[ns]'): record.append(str(row[column])) else: record.append(row[column]) records.append(record) if clear: sh.worksheet(sheet).clear() sh.values_update( '{sheet}!A1'.format(sheet=sheet), params={'valueInputOption': 'RAW'}, body={'values': records} ) def update_sheet(filename, sheetname, sql, conn_id): client = get_gsheet_client() hook = PostgresHook(postgres_conn_id=conn_id) sh = client.open(filename) df = hook.get_pandas_df(sql) print(sh.worksheets()) sh.worksheet(sheetname).clear() add_df_to_sheet_in_bulk(sh, sheetname, df.fillna('')) def replace_dollar_comma(lll): return [ ll.replace(',', '').replace('$', '') for ll in lll ]
Python
복사

실행

airflow$ airflow dags test Gsheet_to_Redshift 2023-05-23
⇒ 해결

SQL 결과를 구글 시트로 복사하는 예제

개요

코드

SQL_to_Sheet.py
from airflow import DAG from airflow.operators.python import PythonOperator from plugins import gsheet from datetime import datetime def update_gsheet(**context): sql = context["params"]["sql"] sheetfilename = context["params"]["sheetfilename"] sheetgid = context["params"]["sheetgid"] gsheet.update_sheet(sheetfilename, sheetgid, sql, "redshift_dev_db") with DAG( dag_id = 'SQL_to_Sheet', start_date = datetime(2022,6,18), catchup=False, tags=['example'], schedule = '@once' ) as dag: sheet_update = PythonOperator( dag=dag, task_id='update_sql_to_sheet1', python_callable=update_gsheet, params = { "sql": "SELECT date, round AS nps FROM analytics.nps_summary ORDER BY date", "sheetfilename": "spreadsheet-copy-testing", "sheetgid": "RedshiftToSheet" } )
Python
복사
plugins/gsheet.py의 update_sheet
def update_sheet(filename, sheetname, sql, conn_id): client = get_gsheet_client() hook = PostgresHook(postgres_conn_id=conn_id) sh = client.open(filename) df = hook.get_pandas_df(sql) print(sh.worksheets()) sh.worksheet(sheetname).clear() add_df_to_sheet_in_bulk(sh, sheetname, df.fillna(''))
Python
복사
db 테이블을 읽어다가 df로 바꾼 후, 구글 스프레드 시트에 벌크 업데이트

실행

앞서 데모에서 사용했던 동일한 시트에 새로운 탭을 하나 만듬
이미 필요한 이메일 주소가 해당 시트에 편집자로 공유가 되어 있기에 별도 작업이 필요 없음
거기에 “SELECT * FROM analytics.nps_summary”의 내용을 복사
이 과정을 PythonOperator로 구현
해당 기능은 gsheet 모듈내에 있는 update_sheet라는 함수로 구현했음
airflow$ airflow dags test SQL_to_Sheet 2023-05-23

SQL_to_Sheet 에러 공유

구글 시트 api 발급 받아서 다음과 같이 google sheet access token (구글 서비스 어카운트 json 그대로)를 Variable로 등록한 후에 다음과 같이 SQL_to_Sheet 실행시 에러가 발생합니다. 구글 스프레드 시트 편집자도 허용해놓았습니다.
{ "type": "service_account", "project_id": "linen-office-357903", "private_key_id": "xxxxx", "private_key": "-----BEGIN PRIVATE KEY-----\xxxxx\nGf46fBszYFqzA3+0jJGhKWktmFru4ZAbrVyBIS8d08PPjw6KredfyqczSk5URtDW\nXzG6T2UA4dwZRX6RX3PSBE60i8oC8Ns1vdDEoG3QpITff52kUf0YkSDvBLe/HLRt\nz3HQOlS9L3u6yj8IkTKRiQDIHHNPg8RlUKJ8kq4+ZlCCNOaGAd28r4pAUKGGUT/6\nok4dehSREfdfVSldimm+aTI4bmpZhE9yzmZnWuG977bmBipsm0Ovh3KmkQhjSITh\nd8mShMTjStfSaYa1QF2Y+zgDx/HAQqZU8H1wQYuMERRD0hW+id1ImvuZQf9JmGMV\nmHxsdvrjAgMBAAECgf8Bk34A6kVX/RNEYjxFeolddu3K5zrtnKdWD+70sl6P2j1r\nRgr+zDiCTolNJfac8ITmcBX76iqYYd2k3PnLPLkWSGImcqK1pOlxdJW5QdpkKoKY\nLE7aTjETf/6yyyHmRU0LMTwFp6zf5vqbEb6r8bVGMLOdg7OQBWH5kJVwuBQIGyWY\n/rv3wB7qjlN4+Qcy63GMe8jrDIqq8PTANbzTMPVrRsxVM9t7TD5hcTMueHNI3c/R\nbDDZ+yh3E4eJOn4sxOQq2+RgtrlAQtZJ/3grIERSwqy19HZRTgekl4TjTook9TYk\nCO5sPllFh7P595B/HlnjjC39a5H3eOUCf+x08WECgYEA+iMAZj5ufHvliNHBfEzX\nz9lkUZLEnB42vDpXBjdBIV/8SyT3LK1lbqEknkVp0Dm9ZA/MbwePlCmHz30d44Xx\negSvvjL3QFoKlTYLy0i8OG4jZSbDGyOZHuoHMGFCe4pR1ZHHdqb/LEUjSPCbTakz\niioQzw24+JH3dt9YZ9OWWVECgYEA98YdQfOwMcD1KbXAHKeULFTkx8cSMOq6sw0z\ndb3zyKaN6DpAxsuCX7PrZ1jWv7vrTwvNZt79CQoCViewoos5OdE09nzwmj7KAZGR\nF+5SeqQnw+SL208LeekOV4oQl0/tWzbqwHsyQ1mLSlKwgN+g65R/psj/y0q2O8Z9\nga1gQ/MCgYAZMrF1m1ByBeEnmUnOhccvzwyGEyzvNTP6Xj4qCCLtg8fcogqsW4Ne\nU6lmsENqkIZ9sAG+JZrXKB/gTRBXvUSIMDSsCmJR/0AVq+4gOmGLpby3EeKRt1JT\nhOvYedPCsjpwM1FOqKAIC6b0UBi1CBiDEJ3E3fMGR14QgYtcWjpJIQKBgBxioe9w\nT3sjs/J3U+70c9EwWJVahQb9nS3uETsn68nM6uzz33/myKTWI93PG6sTiD2iCIIE\ngdCLSQE9wsqcUI4DHaIvZhEFqHbNimXHc8OBEt0qTatuPs5UmMQGmSf8jCCrBfHP\nK07z7zUwi+/3rqPOe2FMHaHYiVAHyVEV37VpAoGAKDbg3eN4Gytbll5pNviHmqod\njSlmMVDQJHXSmdY7lI7k9nzSZnDzxd9bkeyrk2XA+zIRj91hZRU8w1jf4tktN63f\n9SjEFnn/zxDVbvFDdUDiJ4UP+KjnMca6X0xWwkKST92NGP0nmr/4XnQf8wzJwGWP\nNUGeV1gQ3g66Y+iH33I=\n-----END PRIVATE KEY-----\n", "client_email": "gsheet@linen-office-357903.iam.gserviceaccount.com", "client_id": "xxxxxx", "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/gsheet%40linen-office-357903.iam.gserviceaccount.com", "universe_domain": "googleapis.com" }
Python
복사
해결 방안
스프레드 시트를 나의 google drive에 사본 저장 후 해당 스프레드 시트를 공유했는데
드라이브에 대한 권한이 없어서 오류가 발생한 것
2.
프로젝트와 관련된 Google 계정으로 로그인되어 있는지 확인합니다.
3.
프로젝트를 생성하거나 기존 프로젝트를 선택하는 프롬프트가 나타나면, ID가 763307781051인 프로젝트를 선택합니다.
4.
"활성화" 버튼을 클릭하여 프로젝트에 Google Drive API를 활성화합니다.
5.
변경 사항이 Google 시스템에 전파되기까지 몇 분 동안 기다립니다.
6.
API가 활성화된 후에 코드 또는 작업을 다시 시도합니다.