프로덕션 데이터베이스 (MySQL) ⇒ 데이터 웨어하우스(Redshift)
구현 ETL 예제
•
OLTP 복사 ⇒ OLAP
◦
MySQL의 nps라는 테이블을 Redshift의 raw_data 밑에 nps라는 테이블로 복사하기
◦
위의 내용을 수행하는 DAG를 2가지 버젼으로 만들어서 수행
1.
Full Refresh 하는 버젼
2.
Incremental Update를 하는 버젼
a.
Execution date를 사용해서 airflow에서 원하는 형태의 Incremental update를 수행
b.
Backfill까지 하는 과정
◦
첫번째 방법 : 파일을 row 하나씩 insert하게 되면 시간이 너무 오래 걸려 비효율적이다. 레코드가 별로 없을 때 선호되는 방법
◦
두번째 방법 : MySQL을 레코드 읽어서 파일로 다운로드 받아서 S3에 적재한 후 Redshift에 Bulk insert 한다.
•
Connection
◦
Airflow MySQL 커넥션
◦
S3
▪
Airflow 서버가 S3에 접근할 수 있는 보안 세팅
▪
Redshift가 S3에 접근할 수 있는지 보안 세팅
Airflow Connection 세팅 (MySQL, S3)
AWS 관련 권한 설정
•
Airflow DAG에서 S3 접근 (쓰기 권한)
◦
IAM User를 만들고 S3 버킷에 대한 읽기/쓰기 권한 설정하고 access key와 secret key를 사용
•
Redshift가 S3 접근 (읽기 권한)
◦
Redshift에 S3를 접근할 수 있는 역할 (Role)을 만들고 이를 Redshift에 지정
MySQL Connection 설정이 필요하다 (production DB)
•
Host:
•
Schema: prod
•
Login:
•
Password:
•
Port: 3306
•
MySQL Connections 설정 시 유의사항
◦
EC2 인스턴스
▪
connection type에 mySQL을 추가하기 위해서는 ubuntu 유저로 설치를 하나 더 해주어야 한다
1. MySQL connection을 추가할 때 MySQL이 드롭다운에서 안 보이면 아래 명령을 ubuntu에서 실행
2. webserver와 scheduler를 재시작해주시기 바랍니다.
sudo pip3 install --ignore-installed "apache-airflow-providers-postgres"
sudo pip3 install --ignore-installed "apache-airflow-providers-amazon"
## 위에 까지가 이전 설치
sudo apt-get install -y libmysqlclient-dev
sudo pip3 install --ignore-installed "apache-airflow-providers-mysql"
sudo systemctl restart airflow-webserver
sudo systemctl restart airflow-scheduler
Python
복사
◦
Docker container
▪
아래 명령을 Airflow Scheduler Docker Container에 root 유저로 로그인해서 실행
•
Worker, Webserver container에도 동일 방식 적용
docker exec --user root -it 0017662673c3 /bin/bash
(airflow) sudo apt-get update
sudo apt-get install -y default-libmysqlclient-dev
sudo apt-get install -y gcc
sudo pip3 install --ignore-installed "apache-airflow-providers-mysql"
Python
복사
▪
위의 명령들은 “ModuleNotFoundError: No module named 'MySQLdb'”에러를 해결하기 위함
AWS S3 Connection 설정
•
S3 버켓 이름 : grepp-data-engineering
◦
이 S3 연결을 위해 별도 사용자를 만들고 그 사용자의 키들을 권한 인증을 위해 사용할 예정
•
Access Key ID와 Secret Access Key를 사용하는 걸로 바뀜
◦
루트 사용자의 키들을 사용하면 해킹시 AWS 자원들을 마음대로 사용 가능 ⇒ 여러번 사고가 남
•
우리가 사용해볼 Best Practice는
◦
IAM(Identity and Access Management)을 사용해 별도의 사용자를 만들고
◦
그 사용자에게 해당 S3 bucket을 읽고 쓸 수 있는 권한을 제공하고
◦
그 사용자의 Access Key ID와 Secret Access Key를 사용
▪
이 키도 주기적으로 변경해서 해킹이 될 경우의 피해를 최소화
•
AWS IAM 사용자 설정
◦
IAM User 이름을 airflow-s3-access로 지정
◦
Policy를 직접 추가
▪
“Attach policies directly” 선택
▪
두 가지 방법이 존재
1.
Create policy를 선택하고 대상이 되는 S3 bucket에 대한 권한만 지정
2.
S3에 대한 모든 권한 지정(AmazonS3FullAccess)
당연히 더 좋은 방법은 1번이며 이를 사용해볼 예정
◦
Create policy를 선택하고 대상이 되는 S3 bucket에 대한 권한만 지정하는 방법
▪
Custom Policy의 내용으로 아래를 설정
•
정책 이름 :
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:ListAllMyBuckets"
],
"Resource": "arn:aws:s3:::*"
},
{
"Effect": "Allow",
"Action": "s3:*",
"Resource": [
"arn:aws:s3:::grepp-data-engineering",
"arn:aws:s3:::grepp-data-engineering/*"
]
}
]
}
Python
복사
◦
IAM > Users > airflow-s3-access 선택
▪
security credential ⇒ access key 발급
◦
나중에 redshift 클러스터를 직접 구성해서 airflow를 만들 때 위와 같은 세팅을 거치면 된다.
•
Airflow - S3 Connection 설정
◦
connection_id : aws_conn_id
◦
Conn Type: s3나 Amazon Web Service나 Generic을 선택
◦
Extra: { "region_name": "ap-northeast-2" }
◦
S3 operators 에 명확하게 사용되어야 한다
◦
로그인과 패스워드 설정이 없어도 접근이 가능하다…
▪
IAM과 s3 버켓 접근 권한 : 그 이유는 airflow 서버를 띄울 때, advanced setting 설정하게 되는데 설정된 ec2 서버들은 s3 접근 권한 정책을 부여가 되어 있기 때문
•
총 2개의 새로운 connection 생성
MySQL의 테이블 리뷰 (OLTP, Production Database)
•
이미 테이블은 이미 MySQL쪽에 만들어져있고 레코드들이 존재하며 이를 Redshift로 복사하는 것이 이번 실습 내용입니다.
•
MySQL의 테이블 리뷰 (OLTP, Production Database)
◦
MySQL - OLTP.
◦
처리할 수 있는 데이터가 크진 않지만 처리 속도가 빠르다. 서버 한대짜리이며 일반적으로 용량이 작습니다. 서버 한대에서 설치 가능한 디스크와 메모리 한계가 존재
CREATE TABLE prod.nps (
id INT NOT NULL AUTO_INCREMENT primary key,
created_at timestamp,
score smallint
);
SQL
복사
•
Redshift(OLAP, Data Warehouse)에 해당 테이블 생성
◦
이 테이블들은 Redshift쪽에 본인 스키마 밑에 별도로 만들고 뒤에서 실습할 DAG를 통해 MySQL쪽 테이블로부터 Redshift 테이블로 복사하는 것이 우리가 해볼 실습
CREATE TABLE (생성 본인의스키마).nps (
id INT NOT NULL primary key,
created_at timestamp,
score smallint
);
SQL
복사
DAG 코드 설명
Airflow의 SqlToS3Operator는 데이터베이스에서 데이터를 쿼리하여 그 결과를 Amazon S3에 파일 형식으로 저장하는 작업을 자동화하는데 사용됩니다.
이 오퍼레이터는 일반적으로 ETL(Extract, Transform, Load) 파이프라인의 일부로 활용되며, 데이터를 추출하고 S3 버킷에 저장하는 과정을 단순화합니다.
작동 방식은 다음과 같습니다:
1.
초기 설정: SqlToS3Operator는 SQL 쿼리, 대상 S3 버킷 및 파일 경로, 데이터베이스 연결 정보 및 S3 연결 정보 등을 매개변수로 받습니다. 또한, 저장할 파일 형식(csv, json, parquet 등)과 Pandas DataFrame의 변환 옵션도 설정할 수 있습니다.
a.
저장할 파일 형식에 알맞게
2.
데이터 추출: 지정된 SQL 쿼리가 sql alchemy를 통해 실행되어 데이터베이스로부터 데이터가 추출됩니다. 이 데이터는 Pandas DataFrame 형식으로 변환되며, 필요한 경우 추가적인 데이터 변환 작업이 이루어질 수 있습니다.
3.
데이터 저장: 추출된 데이터는 설정된 파일 형식으로 S3 버킷에 저장됩니다. 여기서 데이터는 지정된 S3 경로에 파일로 생성됩니다. 이 과정에서 파일이 이미 존재하는 경우에는 replace 매개변수에 따라 기존 파일을 대체하거나 예외를 발생시킬 수 있습니다.
4.
로깅 및 예외 처리: 오퍼레이터는 실행 중 발생하는 이벤트를 로깅하며, 예외 발생 시 이를 적절히 처리합니다.
SqlToS3Operator는 Airflow의 강력한 스케줄링 및 모니터링 기능과 함께 사용되어, 데이터 파이프라인의 안정성과 효율성을 높이는 데 중요한 역할을 합니다. 이 오퍼레이터를 사용함으로써, 복잡한 데이터 이동 과정을 단순화하고 자동화할 수 있습니다.
MySQL_to_Redshift DAG의 Task 구성
•
아래 2개는 airflow의
•
SqlToS3Operator
◦
MySQL SQL select 결과 → S3 버켓에 파일로 업로드
◦
(s3://grepp-data-engineering/{본인ID}-nps)
◦
s3://s3_bucket/s3_key
•
S3ToRedshiftOperator
◦
S3 → Redshift 테이블
▪
S3에 업로드된 파일을 Redshift 특정 스키마에 Bulk 업로드
◦
(s3://grepp-data-engineering/{본인ID}-nps) → Redshift (본인스키마.nps)
◦
COPY command is used
MySQL_to_Redshift.py 코드
•
DAG : MySQL_to_Redshift.py 코드
◦
MySQL 있는 테이블 nps를 Redshift내의 각자 스키마 밑의 nps 테이블로 복사
◦
S3를 경유해서 COPY 명령으로 복사
◦
아래 DAG는 한 번 돌면 에러 발생
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable
from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2
import json
dag = DAG(
dag_id = 'MySQL_to_Redshift',
start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 9 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
schema = "keeyong"
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table
mysql_to_s3_nps = SqlToS3Operator(
task_id = 'mysql_to_s3_nps',
query = "SELECT * FROM prod.nps", # sql에 실행할 쿼리
s3_bucket = s3_bucket,
s3_key = s3_key,
sql_conn_id = "mysql_conn_id",
aws_conn_id = "aws_conn_id",
verify = False,
replace = True, # 기존 파일 대체
pd_kwargs={"index": False, "header": False},
dag = dag
)
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'],
method = 'REPLACE',
redshift_conn_id = "redshift_dev_db",
aws_conn_id = "aws_conn_id",
dag = dag
)
mysql_to_s3_nps >> s3_to_redshift_nps
Python
복사
1.
SqlToS3Operator
•
•
•
•
•
•
parameters (None | Mapping | Iterable) - (옵션) SQL 쿼리를 렌더링하는 데 사용할 매개변수입니다.
•
•
verify : S3 연결에 대한 SSL 인증서 검증 여부를 결정합니다. 기본적으로 SSL 인증서는 검증됩니다.
•
file_format (typing_extensions.Literal[csv, json, parquet])
◦
대상 파일 형식입니다. 문자열 'csv', 'json' 또는 'parquet'만 허용됩니다.
•
◦
DataFrame .to_parquet(), .to_json() 또는 .to_csv()에서 포함할 인수입니다.
◦
index, header 포함 여부 결정 가능
•
2.
S3ToRedshiftOperator
•
•
•
•
•
•
◦
특정 S3 연결에 대한 참조. AWS 연결이 extras에 'aws_iam_role'을 포함하고 있다면 연산자는 토큰을 사용하는 AWS STS 자격 증명을 사용합니다.
•
•
•
◦
SQL 기반 연결 대신 Redshift Data API를 사용하는 경우 Hook의 execute_query 메서드에 대한 인수 딕셔너리.
◦
이러한 kwargs 중 어느 하나도 포함할 수 없습니다: {'sql', 'parameters'}
•
airflow scheduler 도커 컨테이너 접속
•
airflow dags test MySQL_to_Redshift
MySQL_to_Redshift_v2.py (MySQL 테이블의 Incremental Update 방식)
•
아래는 2.5.1버젼 기준 코드
•
DB에 어떤 시점에 바뀌거나 생성되었다는 정보 칼럼이 없다면 Full refresh 밖에 방법이 없다.
•
MySQL/PostgreSQL 테이블이라면 다음을 만족해야함
◦
created (timestamp): Optional
◦
modified (timestamp)
◦
deleted (boolean): 물리적으로 삭제가 가능하다면 레코드를 삭제하지 않고 deleted를 True로 설정
•
Daily Update이고 테이블의 이름이 A이고 MySQL에서 읽어오는 방법
1.
ROW_NUMBER로 직접 구현하는 경우
•
먼저 Redshift의 A 테이블의 내용을 임시 테이블 temp_A로 복사
•
MySQL의 A 테이블의 레코드 중 modified의 날짜가 지난 일(execution_date)에 해당하는 모든 레코드를 읽어다가 temp_A로 복사
◦
아래는 MySQL에 보내는 쿼리. 결과를 파일로 저장한 후 S3로 업로드하고 COPY 수행
▪
SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
•
temp_A의 레코드들을 primary key를 기준으로 파티션한 다음에 modified 값을 기준으로 DESC 정렬해서, 일련번호가 1인 것들만 다시 A로 복사
2.
S3ToRedshiftOperator로 구현하는 경우 ⇒ 아래 코드와 같다.
•
query 파라미터로 아래를 지정
◦
SELECT * FROM A WHERE DATE(modified) = DATE(execution_date)
•
method 파라미터로 “UPSERT”를 지정
◦
upsert : insert + update
▪
pk를 확인해서 만약 db에 존재할 시 skip, 없을 시 insert하는 방식
•
upsert_keys 파라미터로 Primary key를 지정
◦
upsert_keys → 리스트
◦
앞서 nps 테이블이라면 “id” 필드를 사용
•
코드
◦
MySQL 있는 테이블 nps를 Redshift내의 각자 스키마 밑의 nps 테이블로 복사
◦
이 작업이 성공하려면 Redshift가 S3 버킷에 대한 액세스 권한을 갖고 있어야함
◦
권한의 생성은 Redshift에게 위 S3 버켓에 대한 액세스 권한 지정
◦
2개의 Operator를 사용해서 구현
▪
SqlToS3Operator: execution_date에 해당하는 레코드만 읽어오게 바뀜
▪
S3ToRedshiftOperator
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from airflow.models import Variable
from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2
import json
dag = DAG(
dag_id = 'MySQL_to_Redshift_v2',
start_date = datetime(2023,1,1), # 날짜가 미래인 경우 실행이 안됨
schedule = '0 9 * * *', # 적당히 조절
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
schema = "hajuny129"
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table # s3_key = schema + "/" + table
# 지정된 날짜(execution_date)에 바뀐 레코드들만 읽어다가 s3에 업로드
sql = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')"
print(sql)
mysql_to_s3_nps = SqlToS3Operator(
task_id = 'mysql_to_s3_nps',
query = sql,
s3_bucket = s3_bucket,
s3_key = s3_key,
sql_conn_id = "mysql_conn_id",
aws_conn_id = "aws_conn_id",
verify = False,
replace = True,
pd_kwargs={"index": False, "header": False},
dag = dag
)
# upsert = id를 기존으로 기존의 레코드 변경 시 대체 및 추가
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'],
redshift_conn_id = "redshift_dev_db",
aws_conn_id = "aws_conn_id",
method = "UPSERT",
upsert_keys = ["id"],
dag = dag
)
mysql_to_s3_nps >> s3_to_redshift_nps
Python
복사
◦
{{ execution_date }} ⇒ airflow의 시스템 변수를 읽어오는 방식
◦
airflow scheduler 도커 컨테이너 접속
◦
airflow dags test MySQL_to_Redshift_v2 2023-01-27
▪
execution_date 시스템 변수로 2023-01-27 로 설정
•
요약
◦
AWS S3 Connections 설정 (IAM User 설정)
◦
Redshift S3 Connections 설정 (IAM Role 설정)
◦
MySQL 관련 모듈 설치 (Docker)
◦
MySQL_to_Redshift DAG 실행
◦
MySQL_to_Redshift_v2 DAG 실행
MySQL_to_Redshift_v3.py 살펴보기
•
incremental update를 하는 버젼
•
2.3 이하의 airflow를 사용한다면 자체 제작된 plugin 사용
•
MySQL 있는 테이블 nps를 Redshift내의 각자 스키마 밑의 nps 테이블로 복사
•
Variable로 iam_role_for_copy_access_token 추가
◦
arn:aws:iam::080705373126:role/redshift.read.s3
▪
이 권한의 생성은 Redshift에게 위 S3 버켓에 대한 액세스 권한 지정 참고
◦
이는 COPY 명령을 실행할 수 있는 권한이 있음을 보여주기 위해 사용됨
COPY keeyong.nps
FROM 's3://grepp-data-engineering/keeyong-nps'
with credentials 'aws_iam_role=***'
csv;
Python
복사
•
3개의 Operator를 사용해서 구현
◦
S3DeleteObjectsOperator
◦
MySQLToS3Operator: execution_date에 해당하는 레코드만 읽어오게 바뀜
◦
자체 구현한 S3ToRedshiftOperator (plugins 폴더)
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'],
redshift_conn_id = "redshift_dev_db",
primary_key = "id",
order_key = "created_at",
dag = dag
)
Python
복사
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.mysql_to_s3 import MySQLToS3Operator
from plugins.s3_to_redshift_operator import S3ToRedshiftOperator
from airflow.providers.amazon.aws.operators.s3_delete_objects import S3DeleteObjectsOperator
from airflow.models import Variable
from datetime import datetime
from datetime import timedelta
import requests
import logging
import psycopg2
import json
dag = DAG(
dag_id = 'MySQL_to_Redshift_v3',
start_date = datetime(2022,8,24), # 날짜가 미래인 경우 실행이 안됨
schedule_interval = '0 9 * * *', # 적당히 조절
max_active_runs = 1,
catchup = True,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)
schema = "keeyong"
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table # s3_key = schema + "/" + table
s3_folder_cleanup = S3DeleteObjectsOperator(
task_id = 's3_folder_cleanup',
bucket = s3_bucket,
keys = s3_key,
aws_conn_id = "aws_conn_id",
dag = dag
)
## {{ sys var }} => airflow system variable
mysql_to_s3_nps = MySQLToS3Operator(
task_id = 'mysql_to_s3_nps',
query = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')",
s3_bucket = s3_bucket,
s3_key = s3_key,
mysql_conn_id = "mysql_conn_id",
aws_conn_id = "aws_conn_id",
verify = False,
dag = dag
)
s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'],
redshift_conn_id = "redshift_dev_db",
primary_key = "id",
order_key = "created_at",
dag = dag
)
s3_folder_cleanup >> mysql_to_s3_nps >> s3_to_redshift_nps
Python
복사