(Dbt) Data build tool - model 톺아보기
dbt란 무엇인가?
dbt는 sql을 효과적으로 생성해주고 밑바닥에 있는 DW에 실행을 해줌으로써 db 모델 관리를 잘해주고 테스트 붙이고 스냅샷 잡고 document도 쉽게 만들 수 있는 ELT 툴
•
•
ELT용 오픈소스: In-warehouse data transformation
•
dbt Labs라는 회사가 상용화 ($4.2B valuation)
•
Analytics Engineer라는 말을 만들어냄
◦
다양한 데이터 웨어하우스를 지원
•
Redshift, Snowflake, Bigquery, Spark
◦
클라우드 버전도 존재
•
dbt Cloud
•
dbt가 서포트해주는 데이터 시스템
◦
BigQuery
◦
Redshift
◦
Snowflake
◦
Spark
•
dbt 구성 컴포넌트
◦
데이터 모델 (models)
▪
테이블들을 몇 개의 티어로 관리
•
테이블의 품질, 가공 정도를 티어로 관리
◦
가공 정도
▪
Raw →Staging → Core
•
일종의 CTAS (SELECT 문들), Lineage 트래킹
▪
Table, View, CTE 등등
◦
데이터 품질 검증 (tests)
▪
스냅샷 (snapshots)
dbt 사용 시나리오
dbt가 어떻게 사용될 수 있는지 가상 환경을 생각해보자
•
다음과 같은 요구조건을 달성해야한다면?
◦
데이터 변경 사항을 이해하기 쉽고 필요하다면 롤백 가능
◦
데이터간 리니지 확인 가능
◦
데이터 품질 테스트 및 에러 보고
◦
Fact 테이블의 증분 로드 (Incremental Update)
◦
Dimension 테이블 변경 추적 (히스토리 테이블)
◦
용이한 문서 작성
•
보통 사용하는 테크 스택
예시가 redshift인 것 뿐 DW는 다 가능
◦
Redshift/Spark/Snowflake/BigQuery
◦
dbt
◦
Airflow
dbt Models: Input
dbt Model을 사용해 입력 데이터들을 transform해보자
•
Model이란?
◦
ELT 테이블을 만듬에 있어 기본이 되는 빌딩블록
▪
테이블이나 뷰나 CTE의 형태로 존재
◦
입력,중간,최종 테이블을 정의하는 곳
▪
티어 (raw, staging, core, …)
▪
raw : ETL을 통해 만들어짐
▪
staging : 클린업을 거침
▪
core : Transform을 거쳐 입력으로 쓰임
▪
raw ⇒ staging (src) ⇒ core
•
잠깐: View란 무엇인가?
◦
SELECT 결과를 기반으로 만들어진 가상 테이블 (물리적 디스크에 저장되는 것이 아님)
▪
기존 테이블의 일부 혹은 여러 테이블들을 조인한 결과를 제공함
▪
CREATE VIEW 이름 AS SELECT …
◦
View의 장점
▪
데이터의 추상화: 사용자는 View를 통해 필요 데이터에 직접 접근. 원본 데이터를 알 필요가 없음
•
누군가 view를 통해 칼럼을 만들면 그 것을 사용하면 된다.
▪
데이터 보안: View를 통해 사용자에게 필요한 데이터만 제공. 원본 데이터 접근 불필요
▪
복잡한 쿼리의 간소화: SQL(View)를 사용하면 복잡한 쿼리를 단순화.
◦
View의 단점
▪
매번 쿼리가 실행되므로 시간이 걸릴 수 있음
▪
원본 데이터의 변경을 모르면 실행이 실패함
CTE (Common Table Expression)
WITH temp1 AS (
SELECT k1, k2
FROM t1
JOIN t2 ON t1.id = t2.foreign_id
), temp2 AS (
…
)
SELECT *
FROM temp1 t1
JOIN temp2 t2 ON …
WITH src_user_event AS (
SELECT * FROM raw_data.user_event
)
SELECT
user_id,
datestamp,
item_id,
clicked,
purchased,
paidamount
FROM
src_user_event
SQL
복사
Model 구성 요소
•
Input
◦
입력(raw)과 중간(staging, src) 데이터 정의
◦
raw는 CTE로 정의
◦
staging은 View로 정의
•
Output
◦
최종(core) 데이터 정의
◦
core는 Table로 정의
•
이 모두는 models 폴더 밑에 sql 파일로 존재
◦
기본적으로는 SELECT + Jinja 템플릿과 매크로
◦
jinja 템플릿을 통해 다양한 테이블 접근 가능
◦
다른 테이블들을 사용 가능 (reference)
▪
이를 통해 리니지(계보) 파악
데이터 빌딩 프로세스
위 staging field를 정의할 때 dbt model을 사용
•
models/src/src_user_event.sql
WITH src_user_event AS (
SELECT * FROM raw_data.user_event
)
SELECT
user_id,
datestamp,
item_id,
clicked,
purchased,
paidamount
FROM
src_user_event
SQL
복사
mkdir src
cd src
vi src_user_event.sql
vi src_user_metadata.sql
vi src_user_variant.sql
Shell
복사
•
먼저 dbt_project.yml
◦
아래 example을 주석처리해준다.
◦
models/example 도 삭제할 것
models:
learn_dbt:
# Config indicated by + and applies to all files under models/example/
# example:
# +materialized: view
SQL
복사
•
models/src/src_user_variant.sql
WITH src_user_variant AS (
SELECT * FROM raw_data.user_variant
)
SELECT
user_id,
variant_id
FROM
src_user_variant
SQL
복사
•
models/src/src_user_metadata.sql
WITH src_user_metadata AS (
SELECT * FROM raw_data.user_metadata
)
SELECT
user_id,
age,
gender,
updated_at
FROM
src_user_metadata
SQL
복사
•
root로 이동 후 Model 빌딩: dbt run
keeyong learn_dbt % dbt run
09:56:39 Running with dbt=1.4.3
09:56:39 Unable to do partial parsing because profile has changed
09:56:39 [WARNING]: Configuration paths exist in your dbt_project.yml file which do not apply to any resources.
There are 1 unused configuration paths:
- models.learn_dbt.example
09:56:39 Found 3 models, 0 tests, 0 snapshots, 0 analyses, 327 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics
09:56:39
09:56:45 Concurrency: 1 threads (target='dev')
09:56:45
09:56:45 1 of 3 START sql view model keeyong.src_user_event ............................. [RUN]
09:56:47 1 of 3 OK created sql view model keeyong.src_user_event ........................ [CREATE VIEW in 2.65s]
09:56:47 2 of 3 START sql view model keeyong.src_user_metadata .......................... [RUN]
09:56:50 2 of 3 OK created sql view model keeyong.src_user_metadata ..................... [CREATE VIEW in 2.69s]
09:56:50 3 of 3 START sql view model keeyong.src_user_variant ........................... [RUN]
09:56:53 3 of 3 OK created sql view model keeyong.src_user_variant ...................... [CREATE VIEW in 2.57s]
09:56:54
09:56:54 Finished running 3 view models in 0 hours 0 minutes and 14.78 seconds (14.78s).
09:56:54
09:56:54 Completed successfully
09:56:54
09:56:54 Done. PASS=3 WARN=0 ERROR=0 SKIP=0 TOTAL=3
Shell
복사
Model 빌딩 확인
•
해당 스키마 밑에 테이블 생성 여부 확인
•
dbt run은 프로젝트 구성 다양한 SQL 실행
◦
이 SQL들은 DAG로 구성됨
◦
모델을 빌딩함
•
dbt run은 보통 다른 더 큰 명령의 일부로 실행
◦
dbt test
◦
dbt docs generate
•
View로 만들어짐
dbt models: Output
Materialization이란?
•
ELT와 같은 개념
•
입력 데이터(테이블)들을 연결해서 새로운 데이터(테이블) 생성하는 것
◦
보통 여기서 추가 transformation이나 데이터 클린업 수행
•
4가지의 내장 materialization이 제공됨
•
파일이나 프로젝트 레벨에서 가능
•
역시 dbt run을 기타 파라미터를 가지고 실행
•
4가지의 Materialization 종류
◦
View
▪
데이터를 자주 사용하지 않는 경우
◦
Table
▪
데이터를 반복해서 자주 사용하는 경우
◦
Incremental (Table Appends)
▪
Fact 테이블
▪
과거 레코드를 수정할 필요가 없는 경우
◦
Ephemeral (CTE)
▪
한 SELECT에서 자주 사용되는 데이터를 모듈화하는데 사용
데이터 빌딩 프로세스 1
•
dbt models output에서의 핵심은 core 테이블을 만드는 것
◦
클린업이 된 형태
•
잠깐 Jinja 템플릿이란?
◦
파이썬이 제공해주는 템플릿 엔진으로 Flask에서 많이 사용
▪
Airflow에서도 사용함
◦
입력 파라미터 기준으로 HTML 페이지(마크업)를 동적으로 생성
◦
조건문, 루프, 필터등을 제공
models 밑에 core 테이블들을 위한 폴더 생성
•
models 폴더로 이동하여 dim 폴더와 fact 폴더 생성
◦
dim 밑에 각각 dim_user_variant.sql과 dim_user_metadata.sql 생성
◦
fact 밑에 fact_user_event.sql 생성
•
이 모두를 physical table로 생성
•
models/dim/dim_user_variant.sql
◦
Jinja 템플릿과 ref 태그를 사용해서 dbt 내 다른 테이블들을 액세스
WITH src_user_variant AS (
SELECT * FROM {{ ref('src_user_variant') }}
)
SELECT
user_id,
variant_id
FROM
src_user_variant
Python
복사
{{ ref('src_user_variant') }}
▪
models 밑에 테이블 이름이 src_user_variant인 것들을 사용해 정의된 모든 것들을 읽어 오겠다.
▪
기본적으로 staging 테이블은 클린 업
▪
Core 테이블은 클린업 이후 transform
•
models/dim/dim_user_metadata.sql
◦
설정에 따라 view/table/CTE 등으로 만들어져서 사용됨
▪
materialized라는 키워드로 설정
WITH src_user_metadata AS (
SELECT * FROM {{ ref('src_user_metadata') }}
)
SELECT
user_id,
age,
gender,
updated_at
FROM
src_user_metadata
Python
복사
•
models/fact/fact_user_event.sql
◦
Incremental Table로 빌드 (materialized = 'incremental')
▪
incremental하게 복사를 하기 때문에 내용이 조금 복잡
▪
아래 그림 중 append strategy만 구현을 해볼 예정
{{
config(
materialized = 'incremental',
on_schema_change='fail'
)
}}
WITH src_user_event AS (
SELECT * FROM {{ ref("src_user_event") }}
)
SELECT
user_id,
datestamp,
item_id,
clicked,
purchased,
paidamount
FROM
src_user_event
Python
복사
•
다음으로 model의 materialized format 결정
◦
기본적으로 learn_dbt 폴더 밑에 만들어지는 테이블들을 view로 만듦
◦
dim이라는 폴더 밑에 생기는 최종 Core 테이블들은 view가 아닌 table로 빌드
◦
dbt_project.yml을 편집 (indentation 주의)
models:
learn_dbt:
+materialized: view
dim:
+materialized: table
Python
복사
dbt compile vs. dbt run
•
dbt compile은 SQL 코드까지만 생성하고 실행하지는 않음
dbt compile
16:06:26 Running with dbt=1.5.2
16:06:26 Registered adapter: redshift=1.5.6
16:06:26 Unable to do partial parsing because a project config has changed
16:06:27 Found 6 models, 0 tests, 0 snapshots, 0 analyses, 346 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics, 0 groups
16:06:27
16:06:28 Concurrency: 1 threads (target='dev')
16:06:28
Python
복사
•
dbt run은 생성된 코드를 실제 실행함
•
Model 빌딩: dbt run (dbt compile도 있음)
Model 빌딩 확인
•
해당 스키마 밑에 테이블 생성 여부 확인
•
Core 테이블들은 Table
•
Staging 테이블들은 View
models/fact/fact_user_event.sql
•
WHERE 조건 붙이기
{{
config(
materialized = 'incremental',
on_schema_change='fail'
)
}}
WITH src_user_event AS (
SELECT * FROM {{ ref("src_user_event") }}
)
SELECT
user_id,
datestamp,
item_id,
clicked,
purchased,
paidamount
FROM
src_user_event
WHERE datestamp is not NULL
{% if is_incremental() %}
AND datestamp > (SELECT max(datestamp) from {{ this }})
{% endif %}
SQL
복사
•
이후 다시 dbt run을 실행
16:10:01 Running with dbt=1.5.2
16:10:01 Registered adapter: redshift=1.5.6
16:10:01 Found 6 models, 0 tests, 0 snapshots, 0 analyses, 346 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics, 0 groups
16:10:01
16:10:03 Concurrency: 1 threads (target='dev')
16:10:03
16:10:03 1 of 6 START sql view model hajuny129.src_user_event ........................... [RUN]
16:10:03 1 of 6 OK created sql view model hajuny129.src_user_event ...................... [SUCCESS in 0.45s]
16:10:03 2 of 6 START sql view model hajuny129.src_user_metadata ........................ [RUN]
16:10:04 2 of 6 OK created sql view model hajuny129.src_user_metadata ................... [SUCCESS in 0.38s]
16:10:04 3 of 6 START sql view model hajuny129.src_user_variant ......................... [RUN]
16:10:04 3 of 6 OK created sql view model hajuny129.src_user_variant .................... [SUCCESS in 0.38s]
16:10:04 4 of 6 START sql incremental model hajuny129.fact_user_event ................... [RUN]
16:10:05 4 of 6 OK created sql incremental model hajuny129.fact_user_event .............. [SUCCESS in 1.12s]
16:10:05 5 of 6 START sql table model hajuny129.dim_user_metadata ....................... [RUN]
16:10:06 5 of 6 OK created sql table model hajuny129.dim_user_metadata .................. [SUCCESS in 0.62s]
16:10:06 6 of 6 START sql table model hajuny129.dim_user_variant ........................ [RUN]
16:10:06 6 of 6 OK created sql table model hajuny129.dim_user_variant ................... [SUCCESS in 0.61s]
16:10:06
16:10:06 Finished running 3 view models, 1 incremental model, 2 table models in 0 hours 0 minutes and 5.21 seconds (5.21s).
16:10:06
16:10:06 Completed successfully
16:10:06
16:10:06 Done. PASS=6 WARN=0 ERROR=0 SKIP=0 TOTAL=6
SQL
복사
•
Copy가 제대로 되어 있는지 확인하기 위해 카운트를 해본다
◦
SELECT COUNT(1) FROM hajuny129.dim_user_metadata;
◦
SELECT COUNT(1) FROM hajuny129.dim_user_variant;
◦
SELECT COUNT(1) FROM hajuny129.fact_user_event;
•
raw_data.user_event에 새 레코드 추가후 dbt run 수행
◦
적당한 Redshift 클라이언트 툴에서 아래 수행
INSERT INTO raw_data.user_event VALUES (100, '2023-06-10', 100, 1, 0, 0);
◦
다음으로 dbt run을 수행
◦
compiled SQL을 확인해서 정말 Incremental하게 업데이트되었는지 확인
◦
최종적으로 Redshift 클라이언트 툴에서 다시 확인
SELECT * FROM keeyong.fact_user_event WHERE datestamp = '2023-06-10';
•
Model 빌딩: Compile 결과 확인
◦
learn_dbt/target/compiled/learn_dbt/models/fact 폴더로 이동하면 컴파일 된 sql확인 가능
▪
models에는 실행된 서브 폴더들이 존재
▪
fact_user_event.sql의 내용은 아래와 같음
WITH src_user_event AS (
SELECT * FROM "dev"."hajuny129"."src_user_event"
)
SELECT
user_id,
datestamp,
item_id,
clicked,
purchased,
paidamount
FROM
src_user_event
WHERE datestamp is not NULL
AND datestamp > (SELECT max(datestamp) from "dev"."hajuny129"."fact_user_event")
Python
복사
src 테이블들을 CTE로 변환해보기
•
src 테이블들을 굳이 빌드할 필요가 있나?
◦
지금까지는 staging 테이블을 view로 만들었는데 이를 CTE로 변환할 예정
•
dbt_project.yml 편집
models:
learn_dbt:
# Config indicated by + and applies to all files under models
+materialized: view
dim:
+materialized: table
src:
+materialized: ephemeral
Python
복사
•
src 테이블들 (View) 삭제
DROP VIEW hajuny129.src_user_event;
DROP VIEW hajuny129.src_user_metadata;
DROP VIEW hajuny129.src_user_variant;
Python
복사
•
dbt run 실행
◦
이제 SRC 테이블들은 VIEW가 아닌 CTE 형태로 임베드되어서 빌드됨
•
Model 빌딩: dbt run (dbt compile도 있음)
데이터 빌딩 프로세스 2
•
models/dim/dim_user.sql
◦
dim_user_variant와 dim_user_metadata를 조인
WITH um AS (
SELECT * FROM {{ ref("dim_user_metadata") }}
), uv AS (
SELECT * FROM {{ ref("dim_user_variant") }}
)
SELECT
uv.user_id,
uv.variant_id,
um.age,
um.gender
FROM uv
LEFT JOIN um ON uv.user_id = um.user_id
SQL
복사
•
models/analytics/analytics_variant_user_daily.sql
◦
dim_user와 fact_user_event를 조인 - analytics 폴더를 models 밑에 WITH u AS 생성
WITH u AS (
SELECT * FROM {{ ref("dim_user") }}
), ue AS (
SELECT * FROM {{ ref("fact_user_event") }}
)
SELECT
variant_id,
ue.user_id,
datestamp,
age,
gender,
COUNT(DISTINCT item_id) num_of_items, -- 총 impression
COUNT(DISTINCT CASE WHEN clicked THEN item_id END) num_of_clicks, -- 총 click
SUM(purchased) num_of_purchases, -- 총 purchase
SUM(paidamount) revenue -- 총 revenue
FROM ue LEFT JOIN u ON ue.user_id = u.user_id
GROUP by 1, 2, 3, 4, 5
SQL
복사
•
Model 빌딩: dbt run
◦
최종 테이블까지 생성
◦
최종 analytics 테이블의 타입을 View에서 Table로 바꾸기
▪
dbt_project.yml 편집
models:
learn_dbt:
+materialized: view
dim:
+materialized: table
analytics:
+materialized: table
src:
+materialized: ephemeral
SQL
복사