Search

(Dbt) Data build tool - model 톺아보기

(Dbt) Data build tool - model 톺아보기

dbt란 무엇인가?

dbt는 sql을 효과적으로 생성해주고 밑바닥에 있는 DW에 실행을 해줌으로써 db 모델 관리를 잘해주고 테스트 붙이고 스냅샷 잡고 document도 쉽게 만들 수 있는 ELT 툴
Data Build Tool (https://www.getdbt.com/) 을 줄인 말 = dbt
ELT용 오픈소스: In-warehouse data transformation
dbt Labs라는 회사가 상용화 ($4.2B valuation)
Analytics Engineer라는 말을 만들어냄
다양한 데이터 웨어하우스를 지원
Redshift, Snowflake, Bigquery, Spark
클라우드 버전도 존재
dbt Cloud
dbt가 서포트해주는 데이터 시스템
BigQuery
Redshift
Snowflake
Spark
dbt 구성 컴포넌트
데이터 모델 (models)
테이블들을 몇 개의 티어로 관리
테이블의 품질, 가공 정도를 티어로 관리
가공 정도
Raw →Staging → Core
일종의 CTAS (SELECT 문들), Lineage 트래킹
Table, View, CTE 등등
데이터 품질 검증 (tests)
스냅샷 (snapshots)

dbt 사용 시나리오

dbt가 어떻게 사용될 수 있는지 가상 환경을 생각해보자
다음과 같은 요구조건을 달성해야한다면?
데이터 변경 사항을 이해하기 쉽고 필요하다면 롤백 가능
데이터간 리니지 확인 가능
데이터 품질 테스트 및 에러 보고
Fact 테이블의 증분 로드 (Incremental Update)
Dimension 테이블 변경 추적 (히스토리 테이블)
용이한 문서 작성
보통 사용하는 테크 스택
예시가 redshift인 것 뿐 DW는 다 가능
Redshift/Spark/Snowflake/BigQuery
dbt
Airflow

dbt Models: Input

dbt Model을 사용해 입력 데이터들을 transform해보자
Model이란?
ELT 테이블을 만듬에 있어 기본이 되는 빌딩블록
테이블이나 뷰나 CTE의 형태로 존재
입력,중간,최종 테이블을 정의하는 곳
티어 (raw, staging, core, …)
raw : ETL을 통해 만들어짐
staging : 클린업을 거침
core : Transform을 거쳐 입력으로 쓰임
raw ⇒ staging (src) ⇒ core
잠깐: View란 무엇인가?
SELECT 결과를 기반으로 만들어진 가상 테이블 (물리적 디스크에 저장되는 것이 아님)
기존 테이블의 일부 혹은 여러 테이블들을 조인한 결과를 제공함
CREATE VIEW 이름 AS SELECT …
View의 장점
데이터의 추상화: 사용자는 View를 통해 필요 데이터에 직접 접근. 원본 데이터를 알 필요가 없음
누군가 view를 통해 칼럼을 만들면 그 것을 사용하면 된다.
데이터 보안: View를 통해 사용자에게 필요한 데이터만 제공. 원본 데이터 접근 불필요
복잡한 쿼리의 간소화: SQL(View)를 사용하면 복잡한 쿼리를 단순화.
View의 단점
매번 쿼리가 실행되므로 시간이 걸릴 수 있음
원본 데이터의 변경을 모르면 실행이 실패함

CTE (Common Table Expression)

WITH temp1 AS ( SELECT k1, k2 FROM t1 JOIN t2 ON t1.id = t2.foreign_id ), temp2 AS () SELECT * FROM temp1 t1 JOIN temp2 t2 ONWITH src_user_event AS ( SELECT * FROM raw_data.user_event ) SELECT user_id, datestamp, item_id, clicked, purchased, paidamount FROM src_user_event
SQL
복사

Model 구성 요소

Input
입력(raw)과 중간(staging, src) 데이터 정의
raw는 CTE로 정의
staging은 View로 정의
Output
최종(core) 데이터 정의
core는 Table로 정의
이 모두는 models 폴더 밑에 sql 파일로 존재
기본적으로는 SELECT + Jinja 템플릿과 매크로
jinja 템플릿을 통해 다양한 테이블 접근 가능
다른 테이블들을 사용 가능 (reference)
이를 통해 리니지(계보) 파악

데이터 빌딩 프로세스

위 staging field를 정의할 때 dbt model을 사용
models/src/src_user_event.sql
WITH src_user_event AS ( SELECT * FROM raw_data.user_event ) SELECT user_id, datestamp, item_id, clicked, purchased, paidamount FROM src_user_event
SQL
복사
mkdir src cd src vi src_user_event.sql vi src_user_metadata.sql vi src_user_variant.sql
Shell
복사
먼저 dbt_project.yml
아래 example을 주석처리해준다.
models/example 도 삭제할 것
models: learn_dbt: # Config indicated by + and applies to all files under models/example/ # example: # +materialized: view
SQL
복사
models/src/src_user_variant.sql
WITH src_user_variant AS ( SELECT * FROM raw_data.user_variant ) SELECT user_id, variant_id FROM src_user_variant
SQL
복사
models/src/src_user_metadata.sql
WITH src_user_metadata AS ( SELECT * FROM raw_data.user_metadata ) SELECT user_id, age, gender, updated_at FROM src_user_metadata
SQL
복사
root로 이동 후 Model 빌딩: dbt run
keeyong learn_dbt % dbt run 09:56:39 Running with dbt=1.4.3 09:56:39 Unable to do partial parsing because profile has changed 09:56:39 [WARNING]: Configuration paths exist in your dbt_project.yml file which do not apply to any resources. There are 1 unused configuration paths: - models.learn_dbt.example 09:56:39 Found 3 models, 0 tests, 0 snapshots, 0 analyses, 327 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics 09:56:39 09:56:45 Concurrency: 1 threads (target='dev') 09:56:45 09:56:45 1 of 3 START sql view model keeyong.src_user_event ............................. [RUN] 09:56:47 1 of 3 OK created sql view model keeyong.src_user_event ........................ [CREATE VIEW in 2.65s] 09:56:47 2 of 3 START sql view model keeyong.src_user_metadata .......................... [RUN] 09:56:50 2 of 3 OK created sql view model keeyong.src_user_metadata ..................... [CREATE VIEW in 2.69s] 09:56:50 3 of 3 START sql view model keeyong.src_user_variant ........................... [RUN] 09:56:53 3 of 3 OK created sql view model keeyong.src_user_variant ...................... [CREATE VIEW in 2.57s] 09:56:54 09:56:54 Finished running 3 view models in 0 hours 0 minutes and 14.78 seconds (14.78s). 09:56:54 09:56:54 Completed successfully 09:56:54 09:56:54 Done. PASS=3 WARN=0 ERROR=0 SKIP=0 TOTAL=3
Shell
복사

Model 빌딩 확인

해당 스키마 밑에 테이블 생성 여부 확인
dbt run은 프로젝트 구성 다양한 SQL 실행
이 SQL들은 DAG로 구성됨
모델을 빌딩함
dbt run은 보통 다른 더 큰 명령의 일부로 실행
dbt test
dbt docs generate
View로 만들어짐

dbt models: Output

Materialization이란?

ELT와 같은 개념
입력 데이터(테이블)들을 연결해서 새로운 데이터(테이블) 생성하는 것
보통 여기서 추가 transformation이나 데이터 클린업 수행
4가지의 내장 materialization이 제공됨
파일이나 프로젝트 레벨에서 가능
역시 dbt run을 기타 파라미터를 가지고 실행
4가지의 Materialization 종류
View
데이터를 자주 사용하지 않는 경우
Table
데이터를 반복해서 자주 사용하는 경우
Incremental (Table Appends)
Fact 테이블
과거 레코드를 수정할 필요가 없는 경우
Ephemeral (CTE)
한 SELECT에서 자주 사용되는 데이터를 모듈화하는데 사용

데이터 빌딩 프로세스 1

dbt models output에서의 핵심은 core 테이블을 만드는 것
클린업이 된 형태
잠깐 Jinja 템플릿이란?
파이썬이 제공해주는 템플릿 엔진으로 Flask에서 많이 사용
Airflow에서도 사용함
입력 파라미터 기준으로 HTML 페이지(마크업)를 동적으로 생성
조건문, 루프, 필터등을 제공

models 밑에 core 테이블들을 위한 폴더 생성

models 폴더로 이동하여 dim 폴더fact 폴더 생성
dim 밑에 각각 dim_user_variant.sql과 dim_user_metadata.sql 생성
fact 밑에 fact_user_event.sql 생성
이 모두를 physical table로 생성
models/dim/dim_user_variant.sql
Jinja 템플릿과 ref 태그를 사용해서 dbt 내 다른 테이블들을 액세스
WITH src_user_variant AS ( SELECT * FROM {{ ref('src_user_variant') }} ) SELECT user_id, variant_id FROM src_user_variant
Python
복사
{{ ref('src_user_variant') }}
models 밑에 테이블 이름이 src_user_variant인 것들을 사용해 정의된 모든 것들을 읽어 오겠다.
기본적으로 staging 테이블은 클린 업
Core 테이블은 클린업 이후 transform
models/dim/dim_user_metadata.sql
설정에 따라 view/table/CTE 등으로 만들어져서 사용됨
materialized라는 키워드로 설정
WITH src_user_metadata AS ( SELECT * FROM {{ ref('src_user_metadata') }} ) SELECT user_id, age, gender, updated_at FROM src_user_metadata
Python
복사
models/fact/fact_user_event.sql
Incremental Table로 빌드 (materialized = 'incremental')
incremental하게 복사를 하기 때문에 내용이 조금 복잡
아래 그림 중 append strategy만 구현을 해볼 예정
{{ config( materialized = 'incremental', on_schema_change='fail' ) }} WITH src_user_event AS ( SELECT * FROM {{ ref("src_user_event") }} ) SELECT user_id, datestamp, item_id, clicked, purchased, paidamount FROM src_user_event
Python
복사
다음으로 model의 materialized format 결정
기본적으로 learn_dbt 폴더 밑에 만들어지는 테이블들을 view로 만듦
dim이라는 폴더 밑에 생기는 최종 Core 테이블들은 view가 아닌 table로 빌드
dbt_project.yml을 편집 (indentation 주의)
models: learn_dbt: +materialized: view dim: +materialized: table
Python
복사
dbt compile vs. dbt run
dbt compile은 SQL 코드까지만 생성하고 실행하지는 않음
dbt compile 16:06:26 Running with dbt=1.5.2 16:06:26 Registered adapter: redshift=1.5.6 16:06:26 Unable to do partial parsing because a project config has changed 16:06:27 Found 6 models, 0 tests, 0 snapshots, 0 analyses, 346 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics, 0 groups 16:06:27 16:06:28 Concurrency: 1 threads (target='dev') 16:06:28
Python
복사
dbt run은 생성된 코드를 실제 실행함
Model 빌딩: dbt run (dbt compile도 있음)
Model 빌딩 확인
해당 스키마 밑에 테이블 생성 여부 확인
Core 테이블들은 Table
Staging 테이블들은 View
models/fact/fact_user_event.sql
WHERE 조건 붙이기
{{ config( materialized = 'incremental', on_schema_change='fail' ) }} WITH src_user_event AS ( SELECT * FROM {{ ref("src_user_event") }} ) SELECT user_id, datestamp, item_id, clicked, purchased, paidamount FROM src_user_event WHERE datestamp is not NULL {% if is_incremental() %} AND datestamp > (SELECT max(datestamp) from {{ this }}) {% endif %}
SQL
복사
이후 다시 dbt run을 실행
16:10:01 Running with dbt=1.5.2 16:10:01 Registered adapter: redshift=1.5.6 16:10:01 Found 6 models, 0 tests, 0 snapshots, 0 analyses, 346 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics, 0 groups 16:10:01 16:10:03 Concurrency: 1 threads (target='dev') 16:10:03 16:10:03 1 of 6 START sql view model hajuny129.src_user_event ........................... [RUN] 16:10:03 1 of 6 OK created sql view model hajuny129.src_user_event ...................... [SUCCESS in 0.45s] 16:10:03 2 of 6 START sql view model hajuny129.src_user_metadata ........................ [RUN] 16:10:04 2 of 6 OK created sql view model hajuny129.src_user_metadata ................... [SUCCESS in 0.38s] 16:10:04 3 of 6 START sql view model hajuny129.src_user_variant ......................... [RUN] 16:10:04 3 of 6 OK created sql view model hajuny129.src_user_variant .................... [SUCCESS in 0.38s] 16:10:04 4 of 6 START sql incremental model hajuny129.fact_user_event ................... [RUN] 16:10:05 4 of 6 OK created sql incremental model hajuny129.fact_user_event .............. [SUCCESS in 1.12s] 16:10:05 5 of 6 START sql table model hajuny129.dim_user_metadata ....................... [RUN] 16:10:06 5 of 6 OK created sql table model hajuny129.dim_user_metadata .................. [SUCCESS in 0.62s] 16:10:06 6 of 6 START sql table model hajuny129.dim_user_variant ........................ [RUN] 16:10:06 6 of 6 OK created sql table model hajuny129.dim_user_variant ................... [SUCCESS in 0.61s] 16:10:06 16:10:06 Finished running 3 view models, 1 incremental model, 2 table models in 0 hours 0 minutes and 5.21 seconds (5.21s). 16:10:06 16:10:06 Completed successfully 16:10:06 16:10:06 Done. PASS=6 WARN=0 ERROR=0 SKIP=0 TOTAL=6
SQL
복사
Copy가 제대로 되어 있는지 확인하기 위해 카운트를 해본다
SELECT COUNT(1) FROM hajuny129.dim_user_metadata;
SELECT COUNT(1) FROM hajuny129.dim_user_variant;
SELECT COUNT(1) FROM hajuny129.fact_user_event;
raw_data.user_event에 새 레코드 추가후 dbt run 수행
적당한 Redshift 클라이언트 툴에서 아래 수행 INSERT INTO raw_data.user_event VALUES (100, '2023-06-10', 100, 1, 0, 0);
다음으로 dbt run을 수행
compiled SQL을 확인해서 정말 Incremental하게 업데이트되었는지 확인
최종적으로 Redshift 클라이언트 툴에서 다시 확인 SELECT * FROM keeyong.fact_user_event WHERE datestamp = '2023-06-10';
Model 빌딩: Compile 결과 확인
learn_dbt/target/compiled/learn_dbt/models/fact 폴더로 이동하면 컴파일 된 sql확인 가능
models에는 실행된 서브 폴더들이 존재
fact_user_event.sql의 내용은 아래와 같음
WITH src_user_event AS ( SELECT * FROM "dev"."hajuny129"."src_user_event" ) SELECT user_id, datestamp, item_id, clicked, purchased, paidamount FROM src_user_event WHERE datestamp is not NULL AND datestamp > (SELECT max(datestamp) from "dev"."hajuny129"."fact_user_event")
Python
복사

src 테이블들을 CTE로 변환해보기

src 테이블들을 굳이 빌드할 필요가 있나?
지금까지는 staging 테이블을 view로 만들었는데 이를 CTE로 변환할 예정
dbt_project.yml 편집
models: learn_dbt: # Config indicated by + and applies to all files under models +materialized: view dim: +materialized: table src: +materialized: ephemeral
Python
복사
src 테이블들 (View) 삭제
DROP VIEW hajuny129.src_user_event; DROP VIEW hajuny129.src_user_metadata; DROP VIEW hajuny129.src_user_variant;
Python
복사
dbt run 실행
이제 SRC 테이블들은 VIEW가 아닌 CTE 형태로 임베드되어서 빌드됨
Model 빌딩: dbt run (dbt compile도 있음)

데이터 빌딩 프로세스 2

models/dim/dim_user.sql
dim_user_variant와 dim_user_metadata를 조인
WITH um AS ( SELECT * FROM {{ ref("dim_user_metadata") }} ), uv AS ( SELECT * FROM {{ ref("dim_user_variant") }} ) SELECT uv.user_id, uv.variant_id, um.age, um.gender FROM uv LEFT JOIN um ON uv.user_id = um.user_id
SQL
복사
models/analytics/analytics_variant_user_daily.sql
dim_user와 fact_user_event를 조인 - analytics 폴더를 models 밑에 WITH u AS 생성
WITH u AS ( SELECT * FROM {{ ref("dim_user") }} ), ue AS ( SELECT * FROM {{ ref("fact_user_event") }} ) SELECT variant_id, ue.user_id, datestamp, age, gender, COUNT(DISTINCT item_id) num_of_items, -- 총 impression COUNT(DISTINCT CASE WHEN clicked THEN item_id END) num_of_clicks, -- 총 click SUM(purchased) num_of_purchases, -- 총 purchase SUM(paidamount) revenue -- 총 revenue FROM ue LEFT JOIN u ON ue.user_id = u.user_id GROUP by 1, 2, 3, 4, 5
SQL
복사
Model 빌딩: dbt run
최종 테이블까지 생성
최종 analytics 테이블의 타입을 View에서 Table로 바꾸기
dbt_project.yml 편집
models: learn_dbt: +materialized: view dim: +materialized: table analytics: +materialized: table src: +materialized: ephemeral
SQL
복사