Search

[러닝 스파크 9장] 아파치 스파크를 통한 안정적인 데이터 레이크 구축 + 델타 레이크

[러닝 스파크 9장] 아파치 스파크를 통한 안정적인 데이터 레이크 구축 + 델타 레이크

시간이 지나면서 다양한 종류의 저장 솔루션들이 데이터베이스와 데이터 레이크에서 발전했습니다. 이 장에서는 이러한 솔루션들과 아파치 스파크를 사용하는 방법에 대해 살펴보고, 데이터 레이크하우스라는 차세대 스토리지 솔루션에 대해서도 알아보겠습니다. 데이터베이스의 트랜잭션 보장과 데이터 레이크의 확장성과 유연성을 모두 제공할 수 있는 이 솔루션은 중요한 역할을 합니다.

최적의 스토리지 솔루션의 중요성

1.
확장성 및 성능
스토리지 솔루션은 데이터 볼륨을 확장하고, 워크로드에 맞게 읽기/쓰기를 수행할 수 있어야 합니다.
2.
트랜잭션 지원
복잡한 워크로드에서는 ACID 트랜잭션 지원이 필수적입니다. 이는 동시에 데이터를 읽고 쓰기 때문에 최종 결과의 품질을 보장합니다.
3.
다양한 데이터 형식 지원
스토리지 솔루션은 비정형 데이터(예: 원시 로그와 같은 텍스트 파일), 반정형 데이터(예: JSON 데이터) 및 정형 데이터(예: 테이블 형식 데이터)를 저장할 수 있어야 합니다.
4.
다양한 워크로드 지원
스토리지 솔루션은 다음과 같은 다양한 비즈니스 워크로드를 지원해야 합니다:
전통적인 SQL BI 분석
ETL 작업으로 비정형 원시 데이터 처리
실시간 모니터링 및 장애 알람으로 스트리밍 워크로드 처리
추천 및 이탈 예측으로 ML 및 AI 워크로드 처리
5.
개방성
광범위한 워크로드를 지원하는 경우, 데이터를 오픈 데이터 형식으로 저장해야 하는 경우가 많습니다. 이 경우, 표준 API를 사용하면 데이터에 다양한 도구와 엔진에서 액세스할 수 있으므로, 비즈니스는 각 유형의 워크로드에 대해 최적의 도구를 사용하여 최상의 비즈니스 결정을 내릴 수 있습니다.

데이터 베이스

데이터베이스 아키텍처와 해당 워크로드, 데이터베이스의 분석 워크로드에 아파치 스파크를 사용하는 방법
데이터베이스는 구조화된 데이터를 SQL 쿼리를 사용하여 읽을 수 있는 테이블로 저장하도록 설계되어 있다.
데이터는 데이터베이스 관리 시스템이 데이터 저장 및 처리를 모두 최적화할 수 있도록 하는 엄격한 스키마를 준수
즉, 디스크 상의 파일에 있는 데이터 및 인덱스의 내부 레이아웃을 고도로 최적화된 쿼리 처리 엔진과 긴밀하게 연결하여, 모든 읽기/쓰기 작업에 대하여 강력한 트랜잭션 ACID를 보장하고 저장된 데이터를 매우 빠르게 계산.
데이터베이스의 SQL 워크로드
1.
온라인 트랜잭션 처리(online transaction processing, OLTP) 워크로드
은행 계좌 거래와 마찬가지로 OLTP 워크로드는 일반적으로 한 번에 몇 개의 레코드를 읽거나 업데이트 하는 간단한 쿼리로 높은 동시성, 짧은 지연 시간이 특징
2.
온라인 분석 처리(online analytical processing, OLAP)
정기 리포트와 같은 OLAP 워크로드는 일반적으로 많은 레코드에 대한 높은 처리량 스캔이 필요 한 복잡한 쿼리(집계 및 조인 포함)
아파치 스파크는 주로 OLTP 워크로드가 아닌 OLAP 워크로드용으로 설계된 쿼리 엔진

아파치 스파크를 사용하여 데이터베이스 읽기 및 쓰기

커넥터 에코시스템 ⇒ 아파치 스파크는 데이터를 읽고 쓰기 위해 다양한 데이터 베이스에 연결 가능
JDBC 드라이버가 있는 데이터베이스(예: PostgreSQL, MySQL)의 경우 내장 JDBC 데이터 소스를 적절한 JDBC 드라이버 jar 파일을 사용하여 데이터에 액세스할 수 있다.
다른 많은 최신 데이터베이스(예: 애저 코스모스 DB, 스노우플레이크)의 경우 적절한 형식으로 호출할 수 있는 전용 커넥터가 있다.

데이터베이스의 한계

지난 세기부터 데이터베이스와 SQL 쿼리는 BI 워크로드 구축을 위한 훌륭한 솔루션 But, 지난 10년 동안 분석 워크로드에서 두 가지의 새로운 추세가 나타남.
1.
데이터 크기의 증가
빅데이터의 도래와 함께 트렌드와 사용자 행동을 이해하기 위해 모든 것(페이지뷰, 클릭 등)을 측정 하고 수집 ⇒ 기업이나 조직에서 수집하는 데이터의 양이 수십 년 전 기가바이트에서 오늘날 테라바이트 및 페타바이트로 증가했다
2.
분석의 다양성 증가
데이터 수집의 증가와 함께 더 깊은 통찰력이 필요해졌다. ⇒ 머신러닝 및 딥러닝과 같은 복잡한 분석의 폭발적 성장
데이터베이스는 다음과 같은 제약 사항으로 인해 이러한 새로운 추세를 수용하는 데 다소 부적합
1.
데이터베이스를 확장하는 데 비용이 너무 많이 듦
데이터베이스는 단일 시스템에서 데이터를 처리하는 데 매우 효율적
데이터 볼륨의 증가 속도 > 단일 시스템의 성능 기능 증가 속도.
여러 시스템을 사용하여 데이터를 병렬로 처리하는 수평 확장 - 처리 엔진을 위한 유일한 방법
But, 대부분의 데이터베이스, 특히 오픈소스 데이터베이스는 분산 처리를 수행하기 위한 확장이 가능하게 설계되지 않음 ⇒ 이러한 처리 요구사항을 충족하는 소수의 산업용 데이터베이스 솔루션은 특수 하드웨어에서 실행되는 독점 솔루션인 경향이 있어 구입 및 유지 관리 비용이 매우 높음
2.
데이터베이스는 비SQL 기반 분석을 잘 지원하지 않음
데이터베이스는 일반적으로 해당 데이터베이스의 SQL 처리 엔진만 읽을 수 있도록 고도로 최적화된 복잡한(종종 독점된) 형식으로 데이터를 저장
머신러닝 및 딥러닝 시스템과 같은 다른 처리 도구가 데이터에 효율적으로 액세스할 수 없음을 의미 - pandas 등
데이터베이스의 한계로 인해 데이터 레이크라고 하는 완전히 다른 데이터 저장 접근방식이 개발됨

데이터 레이크

데이터 레이크 : 대부분의 데이터베이스와 달리 데이터 레이크는 범용 하드웨어에서 실행되고, 수평으로 쉽게 확장되는 분산 스토리지 솔루션.
데이터 레이크에 대한 간략한 소개
데이터 레이크 아키텍처는 데이터베이스와 달리 분산 컴퓨팅 시스템에서 분산 스토리지 시스템을 분리
각 시스템은 워크로드에 의한 필요에 따라 확장 가능
데이터는 모든 처리 엔진이 표준 API를 사용하여 읽고 쓸 수 있도록 오픈 형식의 파일로 저장
2000년대 후반 아파치 하둡 프로젝트의 HDFS(하둡 파일 시스템)에 의해 대중화된 아이디어
스토리지 시스템
클러스터에서 HDFS를 실행하거나 클라우드 개체 저장소(예: AWS S3, 애저 데이터 레이크 스토리지 또는 구글 클라우드 스토리지)를 사용하도록 선택
파일 형식
다운스트림 워크로드에 따라 데이터는 정형(예: 파케이, ORC), 반정형(예: JSON) 또는 때로 비정형 형식(예: 텍스트, 이미지, 오디오, 비디오)의 파일로 저장
처리 엔진
수행할 분석 작업 부하의 종류에 따라 처리 엔진이 선택
배치 처리 엔진(예: 스파크, 프레스토(AWS athena), 아파치 하이브)
스트림 처리 엔진(예: 스파크, 아파치 플링크)
머신러닝 라이브러리(예: 스파크 MLlib, scikit-learn, R 등)
현재의 워크로드에 가장 적합한 스토리지 시스템, 오픈 데이터 형식 및 처리 엔진을 선택할 수 있 는 기능은 데이터 레이크의 큰 장점

아파치 스파크를 사용하여 데이터 레이크에서 읽고 쓰기

아파치 스파크는 위에서 필요한 모든 주요 기능을 제공하므로 자체 데이터 레이크를 구축할 때 사용
1.
다양한 워크로드 지원
배치 처리, ETL 작업, 스파크 SQL을 사용한 SQL 작업. 정형화 스트리밍을 사용한 스트림 처리, MLlib을 사용한 머신러닝
2.
다양한 파일 형식 지원
비정형, 반정형, 정형 파일 형식에 대한 기본 제공 지원
3.
다양한 파일 시스템 지원
하둡의 파일시스템 API를 지원하는 모든 스토리지 시스템에 대해 데이터 접근 가능
대부분의 클라우드 및 온프레미스 스토리지 시스템이 위 API 지원
이로 인해, 스파크는 대부분의 스토리지 시스템에서 읽고 쓰는 것이 가능
참고로 클라우드 스토리지 시스템(S3)은 표준 파일 시스템에서 기대하는 것과 파일 작업의 특성이 동일하지 않은 경우가 종종 있기 때문에, 이에 따라 스파크를 구성하지 않으면 처리 결과가 일관되지 않을 수 있다.

데이터 레이크의 한계

트랜잭션에 대한 보증이 부족하다.
특히, 데이터 레이크는 다음에 대하여 ACID 보증을 제공하지 못한다.
1.
원자성과 독립성
처리 엔진은 분산 방식으로 많은 파일을 데이터 레이크에 기록
작업이 실패하면 이미 작성된 파일을 롤백하는 메커니즘이 없으므로 잠재적으로 손상된 데이터가 남을 수 있다
2.
일관성
실패한 쓰기에 대한 원자성 부족으로 인해 데이터에 대한 일관성을 잃게 된다
데이터 레이크는 성공적으로 작성된 데이터라도 데이터 품질을 보장하기 어려운 구조
데이터 레이크의 매우 흔한 문제는 실수로 인하여 뜻하지 않게 기존 데이터와 일치하지 않는 형식 및 스키마로 데이터 파일을 작성하는 것
데이터 레이크의 이러한 제약사항을 해결하기 위한 트릭
1.
️ 데이터 레이크에 있는 대규모 데이터 파일 컬렉션은 종종 열 값에 따라 하위 디렉터리로 ‘파티션’ 된다
a.
날짜별로 분할된 대형 파케이 형식의 하이브 테이블 (S3에 날짜별로 데이터 파일이 존재하는 것과 유사)
b.
기존 데이터의 원자적 수정 → 일부 레코드를 업데이트 하거나 삭제하기 위해서 종종 하위 파티션 디렉터리 전체 데이터를 다시 작성
2.
️ 데이터에 대한 동시 접근 또는 이로 인한 데이터 불일치를 회피하기 위해, 데이터 업데이트 작업(예: 일일 ETL 작업) 및 데이터 쿼리 작업(예: 일일 보고 작업)에 대해 일정에 시차를 두고 실행

레이크하우스: 스토리지 솔루션 진화의 다음 단계

레이크하우스

레이크하우스는 데이터 레이크와 데이터웨어하우스의 최고 요소를 결합한 새로운 패러다임으로, OLAP 워크로드를 위한 시스템.
데이터 레이크에 사용되는 확장 가능한 저비용 스토리지에서 직접 데이터베이스와 유사한 데이터 관리 기능을 제공합니다.
따라서 다음과 같은 기능을 제공합니다.
트랜잭션 지원
데이터베이스와 유사하게 레이크하우스는 동시 작업 부하가 있는 경우 ACID 보장을 제공
스키마 적용 및 거버넌스
레이크하우스는 잘못된 스키마가 있는 데이터가 테이블에 삽입되는 것을 방지하고, 필요할 때 테이블 스키마를 명시적으로 발전시켜 지속해서 변화하는 데이터를 수용할 수 있다.
오픈 형식open format의 다양한 데이터 유형 지원
데이터베이스와 달리 데이터 레이크와 유사하지만 레이크하우스는 구조화, 반구조화 또는 비구조화를 포함하여 많은 새로운 데이터 애플리케이션에 필요한 모든 유형의 데이터를 저장, 정제, 분석 및 액세스할 수 있다.
다양한 워크로드 지원
오픈 API를 사용하여 데이터를 읽어내는 다양한 도구들로 구동되는 레이크하우스는 다양한 워크로드가 단일 저장소의 데이터에 대해 작동할 수 있도록 한다
기존 SQL 및 스트리밍 분석에서 머신러닝에 이르기까지 다양하고 복잡한 데이터 솔루션을 보다 쉽게 구축 가능
업서트 및 삭제 지원
변경 데이터 캡처 (change-data-capture, CDC) 및 저속 변경 디멘션 (slowly changing dimension, SCD) 작업 과 같은 복잡한 사용 사례에서는 테이블의 데이터를 지속적으로 업데이트 해야 한다
데이터 거버넌스
데이터 무결성에 대해 추론하고, 정책 준수를 위해 모든 데이터 변경사항을 감사 할 수 있는 도구를 제공
현재 레이크하우스를 구축하는 데 사용할 수 있는 몇 가지 오픈소스 시스템이 있습니다. 아파치 후디(Apache Hudi), 아파치 아이스버그 (Apache Iceberg) 및 델타 레이크(Delta Lake) 등이 그 중 일부. 각 시스템은 아파치 스파크의 데이터 소스 API와 유사한 특성을 가지고 있다.
위의 오픈소스 시스템들은
1.
확장 가능한 파일 시스템에 대용량 데이터를 정형 파일 형식으로 저장합니다.
2.
데이터베이스와 유사하게, 트랜잭션 로그를 유지하여 데이터에 대한 원자적 변경 타임라인을 기록합니다.
3.
로그를 사용하여 테이블 데이터의 버전을 정의하고, 읽기와 쓰기 간의 스냅샷 격리 보장을 제공합니다.
4.
아파치 스파크를 사용하여 테이블에 대한 읽기 및 쓰기를 지원합니다.

델타 레이크

아파치 스파크의 창시자가 구축한 리눅스 파운데이션에서 호스팅하는 오픈 소스 프로젝트
Delta Lake는 데이터 레이크 위에 Lakehouse 아키텍처를 구축할 수 있는 오픈소스 프로젝트
데이터 레이크의 문제점과 데이터 웨어하우스의 문제점을 보완해줄 수 있다.
데이터 레이크는 아주 큰 데이터를 저장할 수 있지만 체계가 정확하게 잡히지 않으면 데이터 늪이 되기가 쉽다.
S3와 같은 클라우드 스토리지는 가장 비용 효율적인 스토리지 시스템이다. 그러나 key-value로 구현이 되어있어서 ACID 트랜잭션과 같은 고성능을 구현하기는 어렵다.
listing object와 같은 메타데이터 동작은 비싸며 일관성 보장은 제한적이다.
델타레이크는 이런 문제점을 보완할 수 있다. ACID 성질을 가질 수 있게 하여 트랜잭션을 구현하며, 테이블에서의 시간 여행을 가능하게 한다. upsert를 구현할 수 있고, 아주 큰 데이터에서 쿼리를 빠르게 실행할 수 있게 해준다.
제공하는 것
ACID 트랜잭션
확장 가능한 메타데이터 처리
Spark 분산 처리를 활용하여 페타바이트 규모 테이블의 메타데이터를 쉽게 처리한다.
스트리밍 및 통합
스키마 적용
시간 여행
Delete, Update와 같은 명령어를 사용하더라도 기존 데이터는 그대로 보존되고 삭제되고 업데이트 된 기록을 남기기 때문에 테이블 시간 여행을 가능하게 한다.
Upsert & Delete
CDC, SCD, Straeming Upsert와 같은 복잡한 기능들을 쉽게 할 수 있다.

아파치 스파크 및 델타 레이크로 레이크하우스 구축

다음 방법 중 하나로 델타 레이크 라이브러리에 연결하도록 아파치 스파크를 구성할 수 있다.
대화형 셸 설정
Delta Lake 버전 선택하기
Delta Lake 버전은 Spark 버전과 맞춰야하므로 현재 사용중인 스파크 버전과 맞는 델타레이크 버전을 확인
아파치 스파크 3.0을 사용
--packages io.delta:delta-core_2.12:0.7.0
pyspark —packages io.delta:delta-core_2.12:0.7.0
스파크 2.4를 실행하는 경우 델타 레이크 0.6.0을 사용

델타 레이크 테이블에 데이터 로드

아파치 스파크 및 모든 정형 데이터 형식(예: 파케이)을 사용하여 데이터 레이크를 구축하는 데 익숙한 경우 기존 워크로드를 마이그레이션하여 델타 레이크 형식을 사용하는 것이 매우 쉽다
format("delta") 사용
# 소스 데이터 경로 설정 sourcePath = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet" # 델타 레이크 경로 설정 deltaPath = "/tmp/loans_delta" # 같은 대출 데이터를 사용하여 델타 테이블 생성 spark .read .format("parquet") .load(sourcePath) .write.format("delta") .save(deltaPath) # loans_delta라는 데이터의 뷰 생성 spark .read .format("delta") .load(deltaPath) .createOrReplaceTempView("loans_delta")
Python
복사
# 대출 로우 카운트 spark.sql("SELECT count(*) FROM loans_delta").show() +-------------+ |count(1)| +------------+14705+...............+ # 대출 테이블의 첫 5행 spark.sql("SELECT * FROM loans_delta LIMIT 5").show()
Python
복사

델타 레이크 테이블에 데이터 스트림 로드

정적 데이터 프레임과 마찬가지로 형식을 "delta"로 설정하여 델타 레이크 테이블에 쓰고 읽도록 기존 정형화 스트리밍 작업을 쉽게 수정 가능
마찬가지로 정형화 스트리밍이 종단 간 일회 처리를 보장
델타 레이크에는 JSON, 파케이 또는 ORC와 같은 기존 형식에 비해 몇 가지 추가 이점이 존재
1.
배치 처리 및 스트리밍 작업 모두에서 동일한 테이블에 쓰기 가능
2.
여러 스트리밍 작업에서 동일한 테이블에 데이터 추가 가능
3.
동시 쓰기에서도 ACID 보장을 제공
newLoanStreamDF = ... # 새 대출 데이터를 사용하는 스트리밍 데이터프레임 checkpointDir = ... # 스트리밍 체크포인트를 위한 디렉터리 streamingQuery = (newLoanStreamDF.writeStream .format("delta") .option("checkpointLocation", checkpointDir) .trigger(processingTime="10 seconds") .start(deltaPath))
Python
복사

데이터 손상을 방지하기 위해 쓰기 시 스키마 적용

끊임없이 변화하는 데이터의 세계에서 델타 레이크는 테이블에 새로운 열을 추가하는 것이 가능
⇒ 새로운 열은 ”mergeSchema” 옵션을 "true"로 설정하였을 때 명시적으로 추가 가능
loanUpdates.write.format("delta").mode("append") .option("mergeSchema", "true") .save(deltaPath)
Python
복사
⇒ closed 열이 테이블 스키마에 추가되고, 새 데이터가 추가된다
⇒ 기존 행을 읽을 때 새 열의 값은 NULL로 간주

기존 데이터 변환

델타 레이크는 복잡한 데이터 파이프라인을 구축할 수 있는 DML 명령 UPDATE, DELETE 및 MERGE를 지원. 이러한 각 데이터 수정 작업은 ACID를 보장한다. 참고로 이 작업을 데이터 프레임 또는 테이블을 사용하여 익숙한 python, scala 언어의 API와 함께 명령을 유연하게 사용 가능
오류 수정을 위한 데이터 업데이트
데이터를 검토할 때 addr_state = 'OR’에 할당된 모든 대출이 addr_state = ’WA'에 할당되어야 한다는 것을 깨달았다고 가정하는 문제
만약 대출 테이블이 파케이 테이블인 경우 이러한 업데이트를 수행하려면 아래를 수행
1. 영향을 받지 않는 모든 행을 새 테이블에 복사한다. 2. 영향을 받는 모든 행을 데이터 프레임에 복사한 다음 데이터 수정을 수행한다. 3. 이전에 언급한 데이터 프레임의 레코드를 새 테이블에 삽입한다. 4. 이전 테이블을 제거하고 새 테이블의 이름을 이전 테이블 이름으로 바꾼다.
Spark3.0에서는 위의 모든 단계를 수동으로 수행하는 대신, SQL UPDATE 명령을 간단히 실행
from delta.tables import * deltaTable = DeltaTable.forPath(spark, deltaPath) deltaTable.update("addr_state = 'OR'", {"addr_state": "'WA'"})
Python
복사
사용자 관련 데이터 삭제
EU의 일반 데이터 보호 규정과 같은 데이터 보호 정책이 시행 됨에 따라 모든 테이블에서 사용자 데이터를 삭제할 수 있는 것이 그 어느 때보다 중요해졌다.
⇒ 완전히 상환된 모든 대출에 대한 데이터를 삭제해야 한다고 가정
from delta.tables import * deltaTable = DeltaTable.forPath(spark, deltaPath) deltaTable.delete("funded_amnt >= paid_amnt")
Python
복사
merge()를 사용하여 테이블에 변경 데이터 업서트
일반적인 업서트 용례 : OLAP 워크로드를 위해 OLTP 테이블에서 행 변경사항을 다른 테이블로 복제해야 하는 변경 데이터 캡처
Spark 3.0부터는 MERGE SQL 명령을 기반으로 하는 DeltaTable.merge() 작업을 사용하여 이러한 변경사항을 테이블에 업서트
from delta.tables import * ''' 새로운 대출 정보에 대한 추가 테이블이 있습니다. 이 중 일부는 신규 대출이며, 다른 일부는 기존 대출에 대한 업데이트입니다. 이 테이블은 loan.delta 테이블과 동일한 스키마를 가집니다. ''' deltaTable = DeltaTable.forPath(spark, deltaPath) deltaTable.alias("t") .merge(loanUpdates.alias("s"), "t.loan_id = s.loan_id") .whenMatchedUpdateAll() .whenNotMatchedInsertAll() .execute()
Python
복사
이러한 캡처된 변경사항의 스트림이 있는 경우, 정형화 스트리밍 쿼리를 사용하여 이러한 변경사항을 지속적으로 적용할 수 있다.
데이터 삽입 전용 병합을 사용하여 삽입하는 동안 데이터 중복 제거
델타 레이크의 병합 작업은 다음과 같은 고급 기능을 포함하여 ANSI 표준에서 지정한 것 이상의 확장된 구문을 지원
삭제 수행
예를 들면, MERGE ... WHEN MATCHED THEN DELETE.
조건 구문
예를 들면, MERGE ... WHEN MATCHED AND <조건> THEN ....
선택적 수행
모든 MATCHED 절과 NOT MATCHED 절은 선택사항입니다.
스타 구문
예를 들어, UPDATE * 및 INSERT *는 소스 데이터 세트의 일치하는 열로 대상 테이블의 모든 열을 업데이트하거나 삽입합니다. 이에 상응하는 델타 레이크 API는 이전 섹션에서 본 updateAll() 및 insertAll()입니다.
INSERT 작업만 사용하여 다음 병합 작업을 실행하여 삽입 하는 동안, loan_id로 중복 제거하는 경우
from delta.tables import * deltaTable = DeltaTable.forPath(spark, deltaPath) deltaTable.alias("t").merge( historicalUpdates.alias("s"), "t.loan_id = s.loan_id" ).whenNotMatchedInsertAll().execute()
Python
복사
삭제가 포함된 CDC 및 SCD 테이블처럼 확장된 병합 구문으로 단순해진 사용 사례 중에서 훨씬 더 복잡한 사례들이 많이 존재
작업 내역으로 데이터 변경 감사
델타 레이크 테이블에 대한 모든 변경사항은 테이블의 트랜잭션 로그에 커밋으로 기록
⇒ 테이블의 작업 기록을 쿼리
from delta.tables import * deltaTable = DeltaTable.forPath(spark, deltaPath) deltaTable.history(3).select("version", "timestamp", "operation", "operationParameters").show(truncate=False)
Python
복사
시간 탐색을 사용하여 테이블의 이전 스냅샷 쿼리
DataFrameReader 옵션 "versionAsOf" 및 "timestampAsOf"를 사용하여 테이블의 이전 버전 스냅샷을 쿼리
spark.read.format("delta") .option("timestampAsOf", "2020-01-01") # 테이블 생성 이후 타임스탬프 .load(deltaPath) spark.read.format("delta").option("versionAsOf", "4").load(deltaPath)
Python
복사
다음과 같은 다양한 상황에서 유용
1. 특정 테이블 버전에서 작업을 다시 실행하여 머신러닝 실험 및 보고서 재현
2. 감사를 위해 서로 다른 버전 간의 데이터 변경 사항 비교
3. 이전 스냅샷을 데이터 프레임으로 읽고 테이블을 덮어씀으로 인해 잘못된 변경사항 롤백

그냥 이 것 만이라도 기억하자

데이터 레이크
데이터 레이크는 데이터베이스의 일부 한계를 완화하기 위해 구축되었으며 아파치 스파크는 이를 구축할 수 있는 최고의 도구
데이터 레이크에는 데이터베이스가 제공하는 일부 주요 기능(예: ACID 보장)이 여전히 부족
레이크 하우스
레이크하우스는 데이터베이스 및 데이터 레이크의 최고의 기능을 제공하고, 다양한 사용 사례 및 워크로드의 모든 요구 사항을 충족하는 것을 목표로 하는 차세대 데이터 솔루션
델타 레이크는 아파치 스파크와 함께 레이크하우스라는 집을 짓기 위해 다음과 같은 기능을 제공하며 뛰어난 벽돌이 되주었습니다 (꽃과 같은…)
데이터베이스와 같은 트랜잭션 보장 및 스키마 관리
데이터 레이크와 같은 확장성 및 개방성
ACID 보장으로 동시 배치 및 스트리밍 워크로드 지원
업데이트, 삭제 및 병합 작업을 사용하여 기존 데이터 변환을 지원하는 ACID 보장
버전 관리, 작업 이력 감사 및 이전 버전 쿼리 지원

Reference

SCD란?
데이터 웨어하우스에서 시간 경과에 따른 현재 및 과거 데이터를 모두 저장하고 관리하는 차원이다.

참고 - S3 델타레이크

S3와 함께 사용하기
S3와 함께 사용하기 위해서는 패키지가 필요
io.delta:delta-core_<scala_version>:<delta_version>,org.apache.hadoop:hadoop-aws:<hadoop_version>
델타 테이블 코드 예시
델타 테이블 인스턴스 함수를 실행시키면 실행되는 즉시 S3에 처리되어 저장이 된다.
from delta.tables import * from pyspark.sql.functions import * # 델타 테이블 불러오기 deltaTable = DeltaTable.forPath(spark, "s3a://{bucket_name}/test/2/delta-table") # 델타 테이블 UPDATE deltaTable.update( condition = expr("id % 2 == 0"), set = { "id": expr("id + 100") }) # 델타 테이블 DELETE deltaTable.delete(condition = expr("id % 2 == 0")) # 델타 테이블 UPSERT newData = spark.range(0, 20) deltaTable.alias("oldData") \\ .merge( newData.alias("newData"), "oldData.id = newData.id") \\ .whenMatchedUpdate(set = { "id": col("newData.id") }) \\ .whenNotMatchedInsert(values = { "id": col("newData.id") }) \\ .execute() deltaTable.toDF().show()
Python
복사
구조적 스트리밍 쓰기과 읽기 예시
streamingDf = spark.readStream.format("rate").load() stream = streamingDf.selectExpr("value as id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")
Python
복사
stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.format("console").start()
Python
복사
S3에 저장되는 방식
Delete시에도 파일이 제거되지 않고 파일이 추가가 되는 특징이 있다.
_delta_log/에는 한 번 커밋될 때마다 파일이 하나씩 늘어난다.