Search

MySQL 테이블 복사하기(S3 Connection, MySQL Connection, sql_to_s3)

프로덕션 데이터베이스 (MySQL) ⇒ 데이터 웨어하우스(Redshift)

구현 ETL 예제

OLTP 복사 ⇒ OLAP
MySQL의 nps라는 테이블을 Redshift의 raw_data 밑에 nps라는 테이블로 복사하기
위의 내용을 수행하는 DAG를 2가지 버젼으로 만들어서 수행
1.
Full Refresh 하는 버젼
2.
Incremental Update를 하는 버젼
a.
Execution date를 사용해서 airflow에서 원하는 형태의 Incremental update를 수행
b.
Backfill까지 하는 과정
첫번째 방법 : 파일을 row 하나씩 insert하게 되면 시간이 너무 오래 걸려 비효율적이다. 레코드가 별로 없을 때 선호되는 방법
두번째 방법 : MySQL을 레코드 읽어서 파일로 다운로드 받아서 S3에 적재한 후 Redshift에 Bulk insert 한다.
Connection
Airflow MySQL 커넥션
S3
Airflow 서버가 S3에 접근할 수 있는 보안 세팅
Redshift가 S3에 접근할 수 있는지 보안 세팅

Airflow Connection 세팅 (MySQL, S3)

AWS 관련 권한 설정

Airflow DAG에서 S3 접근 (쓰기 권한)
IAM User를 만들고 S3 버킷에 대한 읽기/쓰기 권한 설정하고 access key와 secret key를 사용
Redshift가 S3 접근 (읽기 권한)
Redshift에 S3를 접근할 수 있는 역할 (Role)을 만들고 이를 Redshift에 지정

MySQL Connection 설정이 필요하다 (production DB)

Host:
Schema: prod
Login:
Password:
Port: 3306
MySQL Connections 설정 시 유의사항
EC2 인스턴스
connection type에 mySQL을 추가하기 위해서는 ubuntu 유저로 설치를 하나 더 해주어야 한다 1. MySQL connection을 추가할 때 MySQL이 드롭다운에서 안 보이면 아래 명령을 ubuntu에서 실행
2. webserver와 scheduler를 재시작해주시기 바랍니다.
sudo pip3 install --ignore-installed "apache-airflow-providers-postgres" sudo pip3 install --ignore-installed "apache-airflow-providers-amazon" ## 위에 까지가 이전 설치 sudo apt-get install -y libmysqlclient-dev sudo pip3 install --ignore-installed "apache-airflow-providers-mysql" sudo systemctl restart airflow-webserver sudo systemctl restart airflow-scheduler
Python
복사
Docker container
아래 명령을 Airflow Scheduler Docker Container에 root 유저로 로그인해서 실행
Worker, Webserver container에도 동일 방식 적용
docker exec --user root -it 0017662673c3 /bin/bash (airflow) sudo apt-get update sudo apt-get install -y default-libmysqlclient-dev sudo apt-get install -y gcc sudo pip3 install --ignore-installed "apache-airflow-providers-mysql"
Python
복사
위의 명령들은 “ModuleNotFoundError: No module named 'MySQLdb'”에러를 해결하기 위함

AWS S3 Connection 설정

S3 버켓 이름 : grepp-data-engineering
이 S3 연결을 위해 별도 사용자를 만들고 그 사용자의 키들을 권한 인증을 위해 사용할 예정
Access Key ID와 Secret Access Key를 사용하는 걸로 바뀜
루트 사용자의 키들을 사용하면 해킹시 AWS 자원들을 마음대로 사용 가능 ⇒ 여러번 사고가 남
우리가 사용해볼 Best Practice는
IAM(Identity and Access Management)을 사용해 별도의 사용자를 만들고
그 사용자에게 해당 S3 bucket을 읽고 쓸 수 있는 권한을 제공하고
그 사용자의 Access Key ID와 Secret Access Key를 사용
이 키도 주기적으로 변경해서 해킹이 될 경우의 피해를 최소화
AWS IAM 사용자 설정
IAM User 이름을 airflow-s3-access로 지정
Policy를 직접 추가
“Attach policies directly” 선택
두 가지 방법이 존재
1.
Create policy를 선택하고 대상이 되는 S3 bucket에 대한 권한만 지정
2.
S3에 대한 모든 권한 지정(AmazonS3FullAccess) 당연히 더 좋은 방법은 1번이며 이를 사용해볼 예정
Create policy를 선택하고 대상이 되는 S3 bucket에 대한 권한만 지정하는 방법
Custom Policy의 내용으로 아래를 설정
정책 이름 :
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:GetBucketLocation", "s3:ListAllMyBuckets" ], "Resource": "arn:aws:s3:::*" }, { "Effect": "Allow", "Action": "s3:*", "Resource": [ "arn:aws:s3:::grepp-data-engineering", "arn:aws:s3:::grepp-data-engineering/*" ] } ] }
Python
복사
IAM > Users > airflow-s3-access 선택
security credential ⇒ access key 발급
나중에 redshift 클러스터를 직접 구성해서 airflow를 만들 때 위와 같은 세팅을 거치면 된다.
Airflow - S3 Connection 설정
connection_id : aws_conn_id
Conn Type: s3나 Amazon Web Service나 Generic을 선택
Extra: { "region_name": "ap-northeast-2" }
S3 operators 에 명확하게 사용되어야 한다
로그인과 패스워드 설정이 없어도 접근이 가능하다…
IAM과 s3 버켓 접근 권한 : 그 이유는 airflow 서버를 띄울 때, advanced setting 설정하게 되는데 설정된 ec2 서버들은 s3 접근 권한 정책을 부여가 되어 있기 때문
총 2개의 새로운 connection 생성

MySQL의 테이블 리뷰 (OLTP, Production Database)

이미 테이블은 이미 MySQL쪽에 만들어져있고 레코드들이 존재하며 이를 Redshift로 복사하는 것이 이번 실습 내용입니다.
MySQL의 테이블 리뷰 (OLTP, Production Database)
MySQL - OLTP.
처리할 수 있는 데이터가 크진 않지만 처리 속도가 빠르다. 서버 한대짜리이며 일반적으로 용량이 작습니다. 서버 한대에서 설치 가능한 디스크와 메모리 한계가 존재
CREATE TABLE prod.nps ( id INT NOT NULL AUTO_INCREMENT primary key, created_at timestamp, score smallint );
SQL
복사
Redshift(OLAP, Data Warehouse)에 해당 테이블 생성
이 테이블들은 Redshift쪽에 본인 스키마 밑에 별도로 만들고 뒤에서 실습할 DAG를 통해 MySQL쪽 테이블로부터 Redshift 테이블로 복사하는 것이 우리가 해볼 실습
CREATE TABLE (생성 본인의스키마).nps ( id INT NOT NULL primary key, created_at timestamp, score smallint );
SQL
복사

DAG 코드 설명

Airflow의 SqlToS3Operator는 데이터베이스에서 데이터를 쿼리하여 그 결과를 Amazon S3에 파일 형식으로 저장하는 작업을 자동화하는데 사용됩니다.
이 오퍼레이터는 일반적으로 ETL(Extract, Transform, Load) 파이프라인의 일부로 활용되며, 데이터를 추출하고 S3 버킷에 저장하는 과정을 단순화합니다.
작동 방식은 다음과 같습니다:
1.
초기 설정: SqlToS3Operator는 SQL 쿼리, 대상 S3 버킷 및 파일 경로, 데이터베이스 연결 정보 및 S3 연결 정보 등을 매개변수로 받습니다. 또한, 저장할 파일 형식(csv, json, parquet 등)과 Pandas DataFrame의 변환 옵션도 설정할 수 있습니다.
a.
저장할 파일 형식에 알맞게
2.
데이터 추출: 지정된 SQL 쿼리가 sql alchemy를 통해 실행되어 데이터베이스로부터 데이터가 추출됩니다. 이 데이터는 Pandas DataFrame 형식으로 변환되며, 필요한 경우 추가적인 데이터 변환 작업이 이루어질 수 있습니다.
3.
데이터 저장: 추출된 데이터는 설정된 파일 형식으로 S3 버킷에 저장됩니다. 여기서 데이터는 지정된 S3 경로에 파일로 생성됩니다. 이 과정에서 파일이 이미 존재하는 경우에는 replace 매개변수에 따라 기존 파일을 대체하거나 예외를 발생시킬 수 있습니다.
4.
로깅 및 예외 처리: 오퍼레이터는 실행 중 발생하는 이벤트를 로깅하며, 예외 발생 시 이를 적절히 처리합니다.
SqlToS3Operator는 Airflow의 강력한 스케줄링 및 모니터링 기능과 함께 사용되어, 데이터 파이프라인의 안정성과 효율성을 높이는 데 중요한 역할을 합니다. 이 오퍼레이터를 사용함으로써, 복잡한 데이터 이동 과정을 단순화하고 자동화할 수 있습니다.

MySQL_to_Redshift DAG의 Task 구성

아래 2개는 airflow의
SqlToS3Operator
MySQL SQL select 결과 → S3 버켓에 파일로 업로드
(s3://grepp-data-engineering/{본인ID}-nps)
s3://s3_bucket/s3_key
S3ToRedshiftOperator
S3 → Redshift 테이블
S3에 업로드된 파일을 Redshift 특정 스키마에 Bulk 업로드
(s3://grepp-data-engineering/{본인ID}-nps) → Redshift (본인스키마.nps)
COPY command is used

MySQL_to_Redshift.py 코드

DAG : MySQL_to_Redshift.py 코드
MySQL 있는 테이블 nps를 Redshift내의 각자 스키마 밑의 nps 테이블로 복사
S3를 경유해서 COPY 명령으로 복사
아래 DAG는 한 번 돌면 에러 발생
from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator from airflow.models import Variable from datetime import datetime from datetime import timedelta import requests import logging import psycopg2 import json dag = DAG( dag_id = 'MySQL_to_Redshift', start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨 schedule = '0 9 * * *', # 적당히 조절 max_active_runs = 1, catchup = False, default_args = { 'retries': 1, 'retry_delay': timedelta(minutes=3), } ) schema = "keeyong" table = "nps" s3_bucket = "grepp-data-engineering" s3_key = schema + "-" + table mysql_to_s3_nps = SqlToS3Operator( task_id = 'mysql_to_s3_nps', query = "SELECT * FROM prod.nps", # sql에 실행할 쿼리 s3_bucket = s3_bucket, s3_key = s3_key, sql_conn_id = "mysql_conn_id", aws_conn_id = "aws_conn_id", verify = False, replace = True, # 기존 파일 대체 pd_kwargs={"index": False, "header": False}, dag = dag ) s3_to_redshift_nps = S3ToRedshiftOperator( task_id = 's3_to_redshift_nps', s3_bucket = s3_bucket, s3_key = s3_key, schema = schema, table = table, copy_options=['csv'], method = 'REPLACE', redshift_conn_id = "redshift_dev_db", aws_conn_id = "aws_conn_id", dag = dag ) mysql_to_s3_nps >> s3_to_redshift_nps
Python
복사
1.
SqlToS3Operator
query (str) - 실행할 SQL 쿼리입니다. 파일을 실행하려면 .sql 확장자로 끝나는 절대 경로를 지정하십시오. (템플릿)
s3_bucket (str) - 데이터가 저장될 버킷입니다
s3_key (str) - 파일의 키입니다. 파일 이름을 포함합니다.
replace (bool) - 이전에 S3에 파일이 있었을 경우 파일을 대체할지 여부입니다.
sql_conn_id (str) - 특정 데이터베이스에 대한 참조입니다.
parameters (None | Mapping | Iterable) - (옵션) SQL 쿼리를 렌더링하는 데 사용할 매개변수입니다.
aws_conn_id (str) - 특정 S3 연결에 대한 참조입니다.
verify : S3 연결에 대한 SSL 인증서 검증 여부를 결정합니다. 기본적으로 SSL 인증서는 검증됩니다.
file_format (typing_extensions.Literal[csv, json, parquet])
대상 파일 형식입니다. 문자열 'csv', 'json' 또는 'parquet'만 허용됩니다.
pd_kwargs (dict | None)
DataFrame .to_parquet(), .to_json() 또는 .to_csv()에서 포함할 인수입니다.
index, header 포함 여부 결정 가능
groupby_kwargs (dict | None) - DataFrame groupby()에서 포함할 인수입니다.
2.
S3ToRedshiftOperator
schema (str) - redshift 데이터베이스의 특정 스키마에 대한 참조
table (str) - redshift 데이터베이스의 특정 테이블에 대한 참조
s3_bucket (str) - 특정 S3 버킷에 대한 참조
s3_key (str) - S3에서 하나 또는 여러 개의 객체를 선택하는 데 사용되는 키 접두사
redshift_conn_id (str) - 특정 redshift 데이터베이스 또는 redshift data-api 연결에 대한 참조
aws_conn_id (str)
특정 S3 연결에 대한 참조. AWS 연결이 extras에 'aws_iam_role'을 포함하고 있다면 연산자는 토큰을 사용하는 AWS STS 자격 증명을 사용합니다.
verify (bool | str | None)
S3 연결의 SSL 인증서를 검증할지 여부. 기본적으로 SSL 인증서가 검증됩니다. 다음 값을 제공할 수 있습니다.
False: SSL 인증서를 검증하지 않습니다.(use_ssl이 False가 아니라면) 여전히 SSL이 사용됩니다. 
path/to/cert/bundle.pem: 사용할 CA cert 번들의 파일 이름입니다. botocore에서 사용하는 CA cert 번들과 다른 CA cert 번들을 사용하려면이 인수를 지정할 수 있습니다.
column_list (list[str] | None) - 로드 할 열 이름 목록
copy_options (list | None) - s3 파일 양식
method (str) - 실행 시 수행할 작업.
APPEND : 추가
UPSERT : Incremental update
REPLACE : full refresh
upsert_keys (list[str] | None) - upsert 작업에서 키로 사용할 필드 목록
redshift_data_api_kwargs (dict)
SQL 기반 연결 대신 Redshift Data API를 사용하는 경우 Hook의 execute_query 메서드에 대한 인수 딕셔너리.
이러한 kwargs 중 어느 하나도 포함할 수 없습니다: {'sql', 'parameters'}
airflow scheduler 도커 컨테이너 접속
airflow dags test MySQL_to_Redshift

MySQL_to_Redshift_v2.py (MySQL 테이블의 Incremental Update 방식)

아래는 2.5.1버젼 기준 코드
DB에 어떤 시점에 바뀌거나 생성되었다는 정보 칼럼이 없다면 Full refresh 밖에 방법이 없다.
MySQL/PostgreSQL 테이블이라면 다음을 만족해야함
created (timestamp): Optional
modified (timestamp)
deleted (boolean): 물리적으로 삭제가 가능하다면 레코드를 삭제하지 않고 deleted를 True로 설정
Daily Update이고 테이블의 이름이 A이고 MySQL에서 읽어오는 방법
1.
ROW_NUMBER로 직접 구현하는 경우
먼저 Redshift의 A 테이블의 내용을 임시 테이블 temp_A로 복사
MySQL의 A 테이블의 레코드 중 modified의 날짜가 지난 일(execution_date)에 해당하는 모든 레코드를 읽어다가 temp_A로 복사
아래는 MySQL에 보내는 쿼리. 결과를 파일로 저장한 후 S3로 업로드하고 COPY 수행
SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
temp_A의 레코드들을 primary key를 기준으로 파티션한 다음에 modified 값을 기준으로 DESC 정렬해서, 일련번호가 1인 것들만 다시 A로 복사
2.
S3ToRedshiftOperator로 구현하는 경우 ⇒ 아래 코드와 같다.
query 파라미터로 아래를 지정
SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
method 파라미터로 “UPSERT”를 지정
upsert : insert + update
pk를 확인해서 만약 db에 존재할 시 skip, 없을 시 insert하는 방식
upsert_keys 파라미터로 Primary key를 지정
upsert_keys → 리스트
앞서 nps 테이블이라면 “id” 필드를 사용
코드
MySQL 있는 테이블 nps를 Redshift내의 각자 스키마 밑의 nps 테이블로 복사
이 작업이 성공하려면 Redshift가 S3 버킷에 대한 액세스 권한을 갖고 있어야함
권한의 생성은 Redshift에게 위 S3 버켓에 대한 액세스 권한 지정
2개의 Operator를 사용해서 구현
SqlToS3Operator: execution_date에 해당하는 레코드만 읽어오게 바뀜
S3ToRedshiftOperator
from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator from airflow.models import Variable from datetime import datetime from datetime import timedelta import requests import logging import psycopg2 import json dag = DAG( dag_id = 'MySQL_to_Redshift_v2', start_date = datetime(2023,1,1), # 날짜가 미래인 경우 실행이 안됨 schedule = '0 9 * * *', # 적당히 조절 max_active_runs = 1, catchup = False, default_args = { 'retries': 1, 'retry_delay': timedelta(minutes=3), } ) schema = "hajuny129" table = "nps" s3_bucket = "grepp-data-engineering" s3_key = schema + "-" + table # s3_key = schema + "/" + table # 지정된 날짜(execution_date)에 바뀐 레코드들만 읽어다가 s3에 업로드 sql = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')" print(sql) mysql_to_s3_nps = SqlToS3Operator( task_id = 'mysql_to_s3_nps', query = sql, s3_bucket = s3_bucket, s3_key = s3_key, sql_conn_id = "mysql_conn_id", aws_conn_id = "aws_conn_id", verify = False, replace = True, pd_kwargs={"index": False, "header": False}, dag = dag ) # upsert = id를 기존으로 기존의 레코드 변경 시 대체 및 추가 s3_to_redshift_nps = S3ToRedshiftOperator( task_id = 's3_to_redshift_nps', s3_bucket = s3_bucket, s3_key = s3_key, schema = schema, table = table, copy_options=['csv'], redshift_conn_id = "redshift_dev_db", aws_conn_id = "aws_conn_id", method = "UPSERT", upsert_keys = ["id"], dag = dag ) mysql_to_s3_nps >> s3_to_redshift_nps
Python
복사
{{ execution_date }} ⇒ airflow의 시스템 변수를 읽어오는 방식
airflow scheduler 도커 컨테이너 접속
airflow dags test MySQL_to_Redshift_v2 2023-01-27
execution_date 시스템 변수로 2023-01-27 로 설정
요약
AWS S3 Connections 설정 (IAM User 설정)
Redshift S3 Connections 설정 (IAM Role 설정)
MySQL 관련 모듈 설치 (Docker)
MySQL_to_Redshift DAG 실행
MySQL_to_Redshift_v2 DAG 실행

MySQL_to_Redshift_v3.py 살펴보기

incremental update를 하는 버젼
2.3 이하의 airflow를 사용한다면 자체 제작된 plugin 사용
MySQL 있는 테이블 nps를 Redshift내의 각자 스키마 밑의 nps 테이블로 복사
Variable로 iam_role_for_copy_access_token 추가
arn:aws:iam::080705373126:role/redshift.read.s3
이 권한의 생성은 Redshift에게 위 S3 버켓에 대한 액세스 권한 지정 참고
이는 COPY 명령을 실행할 수 있는 권한이 있음을 보여주기 위해 사용됨
COPY keeyong.nps FROM 's3://grepp-data-engineering/keeyong-nps' with credentials 'aws_iam_role=***' csv;
Python
복사
3개의 Operator를 사용해서 구현
S3DeleteObjectsOperator
MySQLToS3Operator: execution_date에 해당하는 레코드만 읽어오게 바뀜
자체 구현한 S3ToRedshiftOperator (plugins 폴더)
s3_to_redshift_nps = S3ToRedshiftOperator( task_id = 's3_to_redshift_nps', s3_bucket = s3_bucket, s3_key = s3_key, schema = schema, table = table, copy_options=['csv'], redshift_conn_id = "redshift_dev_db", primary_key = "id", order_key = "created_at", dag = dag )
Python
복사
from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.amazon.aws.transfers.mysql_to_s3 import MySQLToS3Operator from plugins.s3_to_redshift_operator import S3ToRedshiftOperator from airflow.providers.amazon.aws.operators.s3_delete_objects import S3DeleteObjectsOperator from airflow.models import Variable from datetime import datetime from datetime import timedelta import requests import logging import psycopg2 import json dag = DAG( dag_id = 'MySQL_to_Redshift_v3', start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨 schedule_interval = '0 9 * * *', # 적당히 조절 max_active_runs = 1, catchup = True, default_args = { 'retries': 1, 'retry_delay': timedelta(minutes=3), } ) schema = "keeyong" table = "nps" s3_bucket = "grepp-data-engineering" s3_key = schema + "-" + table # s3_key = schema + "/" + table s3_folder_cleanup = S3DeleteObjectsOperator( task_id = 's3_folder_cleanup', bucket = s3_bucket, keys = s3_key, aws_conn_id = "aws_conn_id", dag = dag ) ## {{ sys var }} => airflow system variable mysql_to_s3_nps = MySQLToS3Operator( task_id = 'mysql_to_s3_nps', query = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')", s3_bucket = s3_bucket, s3_key = s3_key, mysql_conn_id = "mysql_conn_id", aws_conn_id = "aws_conn_id", verify = False, dag = dag ) s3_to_redshift_nps = S3ToRedshiftOperator( task_id = 's3_to_redshift_nps', s3_bucket = s3_bucket, s3_key = s3_key, schema = schema, table = table, copy_options=['csv'], redshift_conn_id = "redshift_dev_db", primary_key = "id", order_key = "created_at", dag = dag ) s3_folder_cleanup >> mysql_to_s3_nps >> s3_to_redshift_nps
Python
복사