Search

데이터 파이프라인 입문

용어 설명: ETL

ETL: Extract, Transform and Load
Extract : 소스를 사용해서 데이터 덤프로부터 데이터를 받아오는 작업들
Transform : 데이터의 형태 및 포맷을 바꾸는 것, 경우에 따라서 이 과정 생략 가능
Load : 데이터 웨어하우스에 테이블 형태로 적재 혹은 레이크에 적재
Data Pipeline, ETL, Data Workflow, DAG
ETL (Extract, Transform, and Load)
Called DAG (Directed Acyclic Graph) in Airflow (단방향 무사이클 그래프)
ETL vs ELT
ETL: 데이터를 데이터 웨어하우스 외부에서 내부로 가져오는 프로세스
ELT: 데이터 웨어하우스 내부 데이터를 조작해서 (보통은 좀더 추상화되고 요약된) 새로운 데이터를 만드는 프로세스
이 경우 데이터 레이크(DW보다 Scalable)를 쓰기도 함
이런 프로세스 전용 기술들이 있으며 DBT가 가장 유명: Analytics Engineering
데이터 웨어하우스의 구성 예시
Airflow를 사용해서 다수의 데이터 소스로부터 데이터 웨어하우스에 적재
그 후 Airflow를 통해 요약 테이블 구성도 가능 ⇒ ELT가 된다
Spark/Athena 사용 시나리오
비구조화된 데이터 처리하기
spark의 API인 dataframe은 다수의 서버에 분산된 데이터를 처리할 수 있게 됨
배치, 스트리밍 데이터 처리도 가능함
대용량 데이터 병렬 처리 (feature 계산)
머신 러닝 모델의 입력으로 들어가는 feature를 배치로 미리 계산하는 경우
spark는 streaming 처리 가능 (redshift는 배치 처리만 가능)
spark는 큰 데이터를 processing하기 위한 프레임 워크에 가깝다

용어 설명: Data Lake vs. Data Warehouse

Data Lake
정형 데이터 + 다양한 형식의 비정형 데이터 저장
과거 데이터 스토리지에 가깝습니다(보존 정책 없음)
ex) S3
크기 면에서 데이터 웨어하우스보다 큰 규모
ETL
ETL + ELT
S3에 비구조화 데이터를 적재 후, Spark, Athena, presto를 이용하여 의미가 있는 것들만 DW 혹은 마트를 구성
다양한 종류의 ETL을 스케쥴링해주는 것이 Airflow
Data Warehouse
보통 DL보다 가격이 비싼 클러스터 사용
일부 보존 정책을 통해 정교하고 구조화된 데이터에 더욱 집중
관계형 DB라 비구조화된 데이터를 다루기엔 어려움
일반적으로 BI 툴(Looker, Tableau, Superset 등)은 데이터 웨어하우스와 연결됩니다
데이터 마트로 DW를 사용

Data Pipeline의 정의

데이터를 소스로부터 목적지로 복사하는 작업
이 작업은 보통 코딩 (파이썬 혹은 스칼라) 혹은 SQL을 통해 이뤄짐
대부분의 경우 목적지는 데이터 웨어하우스가 됨
데이터 소스의 예:
Click stream, call data, ads performance data, transactions, sensor data, metadata, …
More concrete examples: production databases, log files, API, stream data (Kafka topic)
데이터 목적지의 예:
데이터 웨어하우스, 캐시 시스템 (Redis, Memcache), 프로덕션 데이터베이스, NoSQL, S3, ...

다양한 종류의 데이터 파이프라인들

1.
Raw Data ETL Jobs
a.
외부와 내부 데이터 소스에서 데이터를 읽어다가 (많은 경우 API를 통하게 됨)
b.
적당한 데이터 포맷 변환 후 (데이터의 크기가 커지면 Spark등이 필요해짐)
c.
데이터 웨어하우스 로드
이 작업은 보통 데이터 엔지니어가 함
2.
Summary/Report Jobs (ELT)
a.
DW(혹은 DL)로부터 데이터를 읽어 다시 DW에 쓰는 ETL
b.
Raw Data를 읽어서 일종의 리포트 형태나 써머리 형태의 테이블을 다시 만드는 용도
c.
특수한 형태로는 AB 테스트 결과를 분석하는 데이터 파이프라인도 존재
요약 테이블의 경우, SQL (CTAS를 통해)만으로 만들고 이는 데이터 분석가가 하는 것이 맞음. 데이터 엔지니어 관점에서는 어떻게 데이터 분석가들이 편하게 할 수 있는 환경을 만들어 주느냐가 관건이다.
⇒ Analytics Engineer (DBT)
Production Data Jobs
1.
DW로부터 데이터를 읽어 외부에 있는 시스템
즉, 다른 Storage(많은 경우 프로덕션 환경)로 쓰는(작성) ETL
a. 써머리 정보가 프로덕션 환경에서 성능 이유로 필요한 경우 b. 혹은 머신러닝 모델에서 필요한 피쳐들을 미리 계산해두는 경우
2.
이 경우 흔한 타켓 스토리지 a. Cassandra/HBase/DynamoDB와 같은 NoSQL b. MySQL과 같은 관계형 데이터베이스 (OLTP) c. Redis/Memcache와 같은 캐시 d. ElasticSearch와 같은 검색엔진
3.
예시
DW에서 계산될 쿼리 ->실시간 강의별 수강생수, 리뷰수, 평점 계산
SELECT c.courseid, COUNT(DISTINCT cr.studentid) "수강생수", COUNT(DISTINCT cr.reviewid) "리뷰수", AVG(cr.rating) "평점" FROM course c LEFT JOIN course_review cr ON c.courseid = cr.courseid GROUP BY 1;
SQL
복사
데이터 웨어하우스에서 읽어서 프로덕션 DB에 쏘아주는 방법으로 해결 가능
⇒ 너무 인기가 많은 영상은 view count가 real time으로 올라가지 않는다
데이터 웨어하우스에서 한시간에 한번씩 모든 코스에 대해서 계산한 다음에 mysql로 쏘아준다
프론트나 백엔드에서는 해당 mysql을 읽어다가 유저에게 보여준다

데이터 파이프라인을 만들 때, 고려해야할 점

이상과 현실간의 괴리

이상 혹은 환상
내가 만든 데이터 파이프라인은 문제 없이 동작할 것이다
내가 만든 데이터 파이프라인을 관리하는 것은 어렵지 않을 것이다
현실 혹은 실상
데이터 파이프라인은 많은 이유로 실패함
버그 :)
데이터 소스상의 이슈: 데이터 소스가 없거나 데이터 형식이 변경되는 경우 어떻게 될까?
DAG 수의 증가에 따른 데이터 파이프라인들간의 의존도에 이해도 부족
데이터 파이프라인의 수가 늘어나면 유지보수 비용이 기하급수적으로 늘어남
데이터 소스간의 의존도가 생기면서 이는 더 복잡해짐. 만일 마케팅 채널 정보가 업데이트가 안된다면 마케팅 관련 다른 모든 정보들이 갱신되지 않음
파이프라인 간 의존 관계가 명확하게 보여야 한다
안보이는 의존도 생김. DAG 간 의존도도 발생
모든 데이터 파이프라인은 비지니스 오너가 있어야 한다. 관련 파이프라인 ETL 문제가 생길 시 해당 비지니스 오너에게 브로드캐스팅 수행.
더 많은 테이블을 관리해야 함 (source of truth, search cost, …)

Best Practices

1.
첫번째 Best Practice
가능하면 데이터가 작을 경우 매번 통채로 복사해서 테이블을 만들기 (Full Refresh)
Full Refresh: 매번 쓸 수 있는 데이터를 가져다가 통째로 복사해서 업데이트
하지만 데이터가 커지면 통째로 업데이트하는 것이 해법이 아닐 수 있음.
Full refresh를 하는데 12시간 정도의 큰 시간이 필요할 경우
Incremental update만이 가능하다면, 대상 데이터소스가 갖춰야할 몇 가지 조건이 있음
Incremental update : 매시간 매일 바뀐 것만 가져다 DW에 업데이트 하는 것
데이터 소스가 프로덕션 데이터베이스 테이블이라면 아래 필드들이 필요
created (데이터 업데이트 관점에서 필요하지는 않음)
modified
deleted
모든 경우에 Incremental update가 가능한 것이 아니다
Incremental Update를 하려면 데이터 소스가 API일 경우, 특정 날짜를 기준으로 새로 생성되거나 업데이트된 레코드들을 읽어올 수 있어야함
2.
두번째 Best Practice
멱등성(Idempotency)을 보장하는 것이 중요
멱등성은 무엇인가?
동일한 입력 데이터로 데이터 파이프라인을 다수 실행해도 최종 테이블의 내용이 달라지지 말아야한다는 점
예를 들면 중복 데이터가 생기지 말아야함
중요한 포인트는 critical point들이 모두 one atomic action으로 실행이 되어야 한다는 점
SQL의 transaction이 꼭 필요한 기술
3.
3번째 Best Practice
멱등성을 보장한다는 것 = 데이터 파이프라인이 깔끔하게 쉬어야한다는 것
실패한 데이터 파이프라인 작업의 재실행이 쉬어야함
재실행이 쉽지 않은 케이스 : Incremental update
업데이트를 실패한 그 당시로 돌아가서 재실행을 해야한다
Incremental update는 그 실패 케이스가 full refresh보다 복잡함
어떤 날짜가 실패할 경우, 특정 날짜가 빌 수 있음
즉, 과거 데이터를 다시 채우는 과정(Backfill)이 쉬어야 함
Apache Airflow는 이 부분(특히 backfill)에 강점을 갖고 있음
DAG의 catchup 파라미터가 True가 되어야하고 start_date과 end_date이 적절하게 설정되어야함
대상 테이블이 incremental update가 되는 경우에만 의미가 있음
execution_date 파라미터를 사용해서 업데이트되는 날짜 혹은 시간을 알아내게 코드를 작성해야함
현재 시간을 기준으로 업데이트 대상을 선택하는 것은 안티 패턴
4.
4번째 Best Practice
데이터 파이프라인의 입력과 출력을 명확히 하고 문서화
데이터 디스커버리 문제!
비지니스 오너를 명시할 것 : 누가 이 데이터를 요청했는지를 기록으로 남길 것
나중에 데이터 카탈로그로 들어가서 데이터 디스커버리에 사용이 가능해진다
데이터 리니지가 중요해짐 ⇒ 이것을 이해못하면 온갖 종류의 사고가 발생
데이터 리니지 : 현재 쓰이는 데이터가 어떻게 생성됐고, 어떤 과정을 거쳤으며, 어디에 쓰이고 있는지 등의 계보를 관리해 현황을 파악
5.
5번째 Best Practice
주기적으로 쓸모없는 데이터들을 삭제 ⇒ 데이터 디스커버리를 효율적으로 해주기 위해
사용되지 않는 테이블과 데이터 파이프라인을 적극적으로 제거합니다.
DW에 필요한 데이터만 유지하고 이전 데이터를 DL(또는 저장소)로 이동합니다.
위와 같은 작업은 분기에 한번정도 고려 가능
6.
6번째 Best Practice
데이터 파이프라인 사고시 마다 사고 리포트(post-mortem) 쓰기
목적은 동일한 혹은 아주 비슷한 사고가 또 발생하는 것을 막기 위함
사고 원인을 이해하고 이를 방지하기 위한 액션 아이템들의 실행이 중요해짐
기술의 부채의 정도를 이야기해주는 바로미터
7.
7번째 Best Practice
중요 데이터 파이프라인의 입력과 출력을 체크하기
아주 간단하게 입력 레코드의 수와 출력 레코드의 수가 몇개인지 체크하는 것부터 시작
써머리 테이블을 만들어내고 Primary key가 존재한다면 Primary key uniqueness가 보장되는지 체크하는 것이 필요함
중복 레코드 체크
데이터 대상의 유닛 테스트를 적용

요약 : 데이터 파이프라인 작성시 기억할 점

데이터 파이프라인에 관한 정보를 수집하는 것이 중요
비지니스 오너와 데이터 리니지에 주의할 것
결국 데이터 카탈로그가 필요
데이터 품질 체크
입력 데이터와 출력 데이터
코드 실패를 어설프게 복구하려는 것보다는 깔끔하게 실패하는 것이 좋음
가능하면 Full Refresh
Incremental Update를 쓸 수 밖에 없다면 Backfill 방식을 먼저 생각해둘 것 ⇒ Airflow가 필요한 이유
주기적인 청소 (데이터, 테이블, Dag)

Reference

실리콘에서 날아온 데이터엔지니어링
데이터 엔지니어링 데브코스 1기