Search

[러닝스파크 4장] 스파크 SQL과 데이터 프레임

스파크 SQL과 데이터 프레임

스파크 애플리케이션에서의 스파크 SQL
다양한 정형 데이터를 읽거나 쓸 수 있다(예: JSON. 하이브 테이블, Parquet. Avro. ORC. CSV)
Tableau, power Bl, Talend와 같은 외부 business intelligence, Bi의 데이터 소스나 MySQL 및 PostgreSQL과 같은 RDBMS의 데이터를 JDBC/ODBC 커넥터를 사용하여 쿼리 가능
스파크 애플리케이션에서 데이터베이스 안에 테이블 또는 뷰로 저장되어 있는 정형 데이터와 소통할 수 있도록 프로그래밍 인터페이스를 제공

스파크 애플리케이션에서 스파크 SQL 사용

SparkSession
정형화 API로 스파크를 프로그래밍하기 위한 통합된 진입점 unified entry point을 제공
SparkSession을 사용하면 쉽게 클래스를 가져오고 코드에서 인스턴스를 생성할 수 있다.
SQL 쿼리를 실행 ⇒ SparkSession 인스턴스에서 spark.sql(HSELECT * FROM myTableName")과 같은 sql() 함수를 사용
실습
데이터 설명
날짜, 지연, 거리, 출발지, 목적지 등 미국 항공편에 대한 데이터가 포함된 항공사 정시 운항 능력 및 비행 지연 원인 데이터세트
스키마를 사용하여 제공된 데이터를 데이터 프레임으로 읽고, 데이터 프레임을 임시 뷰(임시 뷰에 대해서는 곧 설명하겠다)로 등록하여 SQL로 쿼리
import findspark findspark.init() from pyspark.sql import SparkSession spark = (SparkSession .builder .appName("SparkSQLExampleApp") .getOrCreate()) # 데이터세트 경로 csv_file = "./departuredelays.csv" # 읽고 임시뷰를 생성 # 스키마 추론(더 큰 파일의 경우 스키마를 지정해주도록 하자) df = (spark.read.format("csv") .option("inferSchema", "true") .option("header", "true") .load(csv_file)) df.createOrReplaceTempView("us_delay_flights_tbl")
Python
복사
임시뷰를 사용할 수 있고, 스파크 SQL을 사용하여 SQL 쿼리를 실행 가능
스파크 SQL이 ANSL2003과 호환되는 SQL 인터페이스를 제공하고, SQL과 데이터 프레임 간에 상호 운용성을 보여 준다는 점에 주목
먼저 비행거리가 1,000마일 이상인 모든 항공편을 찾아보자.
spark.sql(""" SELECT distance, origin, destination FROM us_delay_flights_tbl WHERE distance > 1000 ORDER BY distance DESC """)\ .show(10)
Python
복사
샌프란시스코SFO와 시카고ORD 간 2시간 이상 지연이 있었던 모든 항공편
spark.sql(""" SELECT date, delay, origin, destination FROM us_delay_flights_tbl WHERE delay > 120 AND ORIGIN = 'SFO' AND DESTINATION = 'ORD' ORDER by delay DESC """).show(10)
Python
복사
SQL의 CASE 절을 사용하는 좀 더 복잡한 쿼리를 시도
출발지와 목적지에 관계없이 모든 미국 항공편에 매우 긴 지연(> 6시간), 긴 지연(2~6시간) 등의 지연에 대한 표시를 레이블로 지정. 사람이 읽을 수 있도록 Flight_Delays라는 새 칼럼으로 해당 레이블을 추가
spark.sql("""SELECT delay, origin, destination, CASE WHEN delay > 360 THEN 'Very Long Delays' WHEN delay >= 120 AND delay <= 360 THEN 'Long Delays' WHEN delay >= 60 AND delay < 120 THEN 'Short Delays' WHEN delay > 0 AND delay < 60 THEN 'Tolerable Delays' WHEN delay = 0 THEN 'No Delays' ELSE 'Early' END AS Flight_Delays FROM us_delay_flights_tbl ORDER BY origin, delay DESC """).show(10)
Python
복사
위의 3가지 쿼리는 모두 동등한 데이터 프레임 API 쿼리로 표현될 수 있다.
# Spark SQL 예제 1번과 유사한 데이터 프레임 예제 from pyspark.sql.functions import col, desc (df.select("distance", "origin", "destination") .where(col("distance") > 1000) .orderBy(desc("distance"))).show(10) # 또는 (df.select("distance", "origin", "destination") .where("distance > 1000") .orderBy("distance", ascending=False).show(10))
Python
복사
데이터 프레임 API를 사용하여 다른 두 SQL 쿼리 또한 변환
정형 데이터를 쿼리할 수 있도록 스파크는 메모리와 디스크상에서 뷰와 테이블의 생성 및 관리를 해야 하는 복잡한 작업들을 관리

SQL 테이블과 뷰

스파크는 각 테이블과 해당 데이터에 관련된 정보인 스키마, 설명, 테이블 명, 데이터베이스명, 칼럼명, 파티션, 실제 데이터의 물리적 위치 등의 메타데이터를 보유.
이 모든 정보는 중앙 메타스토어에 저장
스파크는 스파크 테이블만을 위한 별도 메타스토어를 생성하지 않고 기본적으로는 /user/hive/warehouse에 있는 아파치 하이브 메타스토어를 사용하여 테이블에 대한 모든 메타데이터를 유지
스파크 구성 변수 spark.sql.warehouse.dir을 로컬 또는 외부 분산 저장소로 설정하여 다른 위치로 기본 경로를 변경 가능
스파크는 관리형(managed)과 비관리형(unmanaged)이라는 두 가지 유형의 테이블을 생성 가능
관리형 테이블
메타데이터와 파일 저장소의 데이터를 모두 관리
파일 저장소
로컬 파일 시스템 또는 HDFS거나 Amazon S3 및 Azure Blob과 같은 객체 저장소
비관리형 테이블
오직 메타데이터만 관리하고 카산드라와 같은 외부 데이터 소스에서 데이터를 직접 관리
DROP TABLE <테이블명>과 같은 SQL 명령
관리형 테이블의 경우 ⇒ 메타데이터와 실제 데이터를 모두 삭제
비관리형 테이블의 경우 ⇒ 동일한 명령이 실제 데이터는 그대로 두고 메타데이터만 삭제

SQL 데이터베이스와 테이블 생성하기

스파크는 default 데이터베이스 안에 테이블을 생성
사용자의 데이터베이스를 새로 생성하고 싶다면, 스파크 애플리케이션이나 노트북에서 SQL 명령어를 실행할 수 있다.
예제
시작하기에 앞서 learn_spark_db라는 데이터베이스를 생성하고 스파크에게 해당 데이터베이스를 사용하겠다고 알려준다
# 스칼라 및 파이썬 예제 spark.sql("CREATE DATABASE learn_spark_db") spark.sql("USE learn_spark_db")
Python
복사
애플리케이션에서 실행되는 어떠한 명령어든 learn_spark_db 데이터베이스 안에서 생성되고 상주
관리형 테이블 생성
learn_spark_db 데이터베이스 안에 us_delay_f lights라는 관리형 테이블을 생성
Spark SQL
# 관리형 테이블 생성하기 spark.sql("CREATE TABLE managed_us_delay_flights_tbl (date STRING, delay INT,distance INT, origin STRING, destination STRING)")
Python
복사
Dataframe API
# 미국 항공편 지연 csv 파일 경로 csv_file = "./departuredelays.csv" # 앞의 예제에서 정의된 스키마 schema="date STRING, delay INT, distance INT, origin STRING, destination STRING" flights_df = spark.read.csv(csv_file, schema=schema) flights_df.write.saveAsTable("managed_us_delay_flights_tbl")
Python
복사
비관리형 테이블 생성
스파크 애플리케이션에서 접근 가능한 파일 저장소에 있는 파케이, CSV 및 JSON 파일 포맷의 데이터 소스로부터 비관리형 테이블을 생성 가능
## SQL API spark.sql(""" CREATE TABLE us_delay_flights_tbl ( date STRING, delay INT, distance INT, origin STRING, destination STRING ) USING csv OPTIONS (PATH './departuredelays.csv') """)
Python
복사
## dataframe api (flights_df .write .option("path", "/tmp/data/us_flights_delay") .saveAsTable("us_delay_flights_tbl"))
Python
복사

뷰 생성하기

기존 테이블을 토대로 뷰를 만들 수 있다.
뷰는 전역(해당 클러스터의 모든 SparkSession에서 볼 수 있음) 또는 세션 범위(단일 SparkSession에서만 볼 수 있음)일 수 있으며 일시적으로 스파크 애플리케이션이 종료되면 사라짐
질문
하나의 클러스터 안에 sparksession은 여러 개가 가능한가?
클러스터와 애플리케이션은 유의어 혹은 동의어인가?
뷰는 테이블과 달리 실제로 데이터를 소유하지 않기 때문에 스파크 애플리케이션이 종료되면 테이블은 유지되지만 뷰는 사라짐
예시
미국 비행 지연 데이터에서 뉴욕 JFK 및 샌프란시스코SFO가 출발지인 공항이 있는 하위 데이터세트에 대해서만 작업하려는 경우
다음 쿼리는 해당 테이블의 일부로 전역 임시 뷰 및 일반 임시 뷰를 생성
df_sfo = spark.sql("SELECT date, delay, origin, destination FROM us_delay_flights_tbl WHERE origin = 'SFO'") df_jfk = spark.sql("SELECT date, delay, origin, destination FROM us_delay_flights_tbl WHERE origin = 'JFK'") # Create a temporary and global temporary view df_sfo.createOrReplaceGlobalTempView("us_origin_airport_SFO_global_tmp_view") df_jfk.createOrReplaceTempView("us_origin_airport_JFK_tmp_view")
Python
복사
global_temp라는 전역 임시 데이터베이스에 전역 임시 뷰를 생성 ⇒ 해당 뷰에 액세스할 때는 global_temp.<view_name>접두사를 사용
SELECT * FROM global_temp.us_origin_airport_SFO_global_tmp_view
Python
복사
일반 임시 뷰는 global_temp 접두사 없이 접근 가능
spark.read.table("us_origin_airport_JFK_tmp_view") spark.sql("SELECT * FROM us_origin_airport_JFK_tmp_view")
Python
복사
테이블처럼 뷰도 드롭 가능
spark.catalog.dropGlobalTempView("us_origin_airport_SFO_global_tmp_view") spark.catalog.dropTempView("us_origin_airport_JFK_tmp_view")
Python
복사
임시 뷰 vs 전역 임시 뷰
임시 뷰
스파크 애플리케이션 내의 단일 SparkSession에 연결된다.
전역 임시 뷰
스파크 애플리케이션 내의 여러 SparkSession에서 볼 수 있다.
사용자는 단일 스파크 애플리케이션 내에서 여러 SparkSession을 만들 수 있다.
동일한 하이브 메타스토어 구성을 공유하지 않는 두 개의 서로 다른 SparkSession에서 같은 데이터에 액세스하고 결합하고자 할 때 이 점은 유용하게 사용

메타 데이터 보기

스파크는 각 관리형 및 비관리형 테이블에 대한 메타데이터를 관리
메타 데이터 저장을 위한 스파크 SQL의 상위 추상화 모듈인 카탈로그에 저장
카탈로그 : 데이터베이스, 테이블 및 뷰와 관련된 메타데이터를 검사
spark.catalog.listDatabases() spark.catalog.listTables() spark.catalog.listColumns("us_delay_flights_tbl")
Python
복사

SQL 테이블 캐싱하기

데이터 프레임처럼 SQL 테이블과 뷰 또한 캐시 및 언캐싱을 할 수 있다
테이블을 LAZY로 지정 가능
테이블을 바로 캐싱하지 않고 처음 사용되는 시점에서 캐싱
-- SQL 예제 CACHE [LAZY] TABLE <table-name> UNCACHE TABLE <table-name>
Python
복사

테이블을 데이터 프레임으로 읽기

일반적인 데이터 수집과 ETL 프로세스의 일부로 데이터 파이프라인을 구축.
애플리케이션의 다운스트림에서 사용할 수 있도록 스파크 SQL 데이터베이스 및 테이블을 정리된 데이터로 로드.
사용할 준비가 된 기존 데이터베이스 learn_spark_db와 테이블 us_delay_flights_tbl.
외부 JSON 파일에서 읽는 대신 SQL을 사용하여 테이블을 쿼리하고 반환된 결과 ⇒데이터 프레임에 저장 가능
us_flights_df = spark.sql("SELECT * FROM us_delay_flights_tbl") us_flights_df2 = spark.table("us_delay_flights_tbl")
Python
복사
기존 스파크 SQL 테이블에서 읽은 가공된 데이터 프레임을 가지게 됨
⇒ 스파크의 내장 데이터 소스를 사용하면 다양한 파일 형식과 상호 작용할 수 있는 유연함 덕택에 다른 형식으로도 데이터를 읽을 수 있다.

데이터 프레임 및 SQL 테이블을 위한 데이터 소스

스파크 SQL은 다양한 데이터 소스에 대한 인터페이스를 제공
데이터 소스 API를 사용하여 이러한 데이터 소스로부터 데이터를 읽고 쓸 수 있도록 일반적인 함수들을 제공
데이터 소스와 관련된 특정 옵션과 함께 기본 제공 데이터 소스, 사용 가능한 파일 형식, 데이터 로드 및 쓰기
DataFrameReaderDataFrameWriter
상위 수준 데이터 소스 API
서로 다른 데이터 소스 간에 의사소통하는 방법을 제공
DataFrameReader
데이터 소스에서 데이터 프레임으로 데이터를 읽기 위한 핵심 구조
DataFrameReader.format(args).option("key", "value").schema(args).load()
Python
복사
오직 SparkSession 인스턴스를 통해서만 이 DataFrameReader에 액세스 가능
DataFrame Reader의 인스턴스를 개별적으로 만들 수 없음
# 인스턴스 핸들을 얻기 SparkSession.read # or SparkSession.readstream
Python
복사
read는 정적 데이터 소스에서 DataFrame으로 읽기 위해 DataFrameReader에 대한 핸들을 반환
readstream은 스트리밍 소스에서 읽을 인스턴스를 반환
DataFrameReader의 공용 함수에 대한 인수는 각각 다른 값을 사용
일반적으로 정적 파케이 데이터 소스에서 읽을 때는 스키마가 필요하지 않다.
파케이 메타데이터는 보통 스키마를 포함하므로 스파크에서 스키마를 파악할 수 있다. 그러나 스트리밍 데이터 소스의 경우에는 스키마를 제공해야 한다
파케이는 효율적이고 칼럼 기반 스토리지를 사용하며 빠른 압축 알고리즘을 사용
DataFrameWriter
지정된 내장 데이터 소스에 데이터를 저장하거나 쓰는 작업을 수행
DataFrameReader와 달리 SparkSession이 아닌 저장하려는 데이터 프레임에서 인스턴스에 액세스가 가능
DataFrameWriter.format(args) .option(args) .bucketBy(args) .partitionBy(args) .save(path) DataFrameWriter.format(args).option(args).sortBy(args).saveAsTable(table) # 인스턴스 핸들을 가져오기 DataFrame.write ## or DataFrame.writeStream
Python
복사
각 함수에 대한 인수도 서로 다른 값을 사용
// 스칼라 예제 // JSON 사용 val location = ... df.write.format("json").mode("overwrite").save(location)
Python
복사

파케이

스파크의 기본 데이터 소스.
많은 빅데이터 처리 프레임 워크 및 플랫폼에서 지원되고 널리 사용
다양한 I/O 최적화를 제공하는 오픈소스 칼럼 기반 파일 형식이다.
예) I/O 최적화 - 저장 공간을 절약하고 데이터 칼럼에 대한 빠른 액세스를 허용하는 압축
데이터를 변환하고 정리한 후 파케이 형식으로 데이터 프레임을 저장하여 다운스트림에서 사용하는 것이 좋다
# 파케이 불러오기 file = """./summary-data/parquet/2010-summary.parquet/""" df = spark.read.format("parquet").load(file)
Python
복사
파케이가 메타데이터의 일부로 저장하기 때문에, 스트리밍 데이터 소스에서 읽는 경우가 아니면 스키마를 제공할 필요가 없다.
데이터 프레임을 파케이 파일로 쓰기
데이터 프레임을 작성하려면 DataFrameWriter에 대한 함수와 인수를 사용하여 파케이 파일을 저장할 위치를 제공
파케이가 기본 파일 형식임을 기억한다.  fomat() 함수가 없어도 데이터 프레임은 파케이 파일로 저장한다
# 데이터 프레임을 파케이 파일로 쓰기 (df.write.format("parquet") .mode("overwrite") .option("compression", "snappy") .save("/tmp/data/parquet/df_parquet"))
Python
복사
스파크 SQL 테이블에 데이터 프레임 쓰기
SQL 테이블에 데이터 프레임을 쓰는 것은 파일에 쓰는 것
save() 대신 saveAsTable()을 사용하면 us_delay_flights_tbl이라는 관리형 테이블이 생성됨
# 파이썬 예제 (df.write .mode("overwrite") .saveAsTable("us_delay_flights_tbl"))
Python
복사

JSON

JSON 파일을 데이터 프레임으로 읽기
단일 라인 모드 : 각 라인은 단일 JSON 개체를 나타냄
다중 라인 모드 : 전체 라인 객체는 단일 JSON 개체를 구성.
이 모드에서 읽으려면 option() 함수에서 multiLine을 true로 설정.
#파이썬 예제 file = "./summary-data/json/*" df = spark.read.format("json").load(file) # format() 함수에서 "json"을 지정
Python
복사
스파크 SQL 테이블로 JSON 파일 읽기
파케이에서 했던 것처럼 JSON 파일에서 SQL 테이블을 만들 수 있다.
테이블이 생성되면 SQL을 사용하여 데이터를 데이터 프레임으로 읽을 수 있다.
spark.sql("SELECT * FROM us_delay_flights_tbl").show()
Python
복사
데이터 프레임을 JSON 파일로 쓰기
데이터 프레임을 JSON 파일로 저장하는 것은 간단
압축된 JSON 파일들이 포함된 디렉터리가 지정된 경로에 생성
# 파이썬 예제 (df.write.format("json") .mode("overwrite") .option("compression", "snappy") .save("tmp/data/json/df_json") )
Python
복사
JSON 데이터 소스 옵션(DataFrameReader 및 DataFrameWriter)

CSV

csv 파일을 데이터 프레임으로 읽기
DataFrameReader의 함수 및 인수를 사용
#파이썬 예제 file = "./summary-data/csv/*" schema = "DEST_COUNTRY_NAME STRING, ORIGIN_COUNTRY_NAME STRING, count INT" df = (spark.read.format("csv") .option("header", "true") .schema(schema).option("mode", "FAILFAST") # 에러 발생 시 종료 .option("nullValue", "") # 모든 null 데이터를 따옴표로 교체 .load(file))
Python
복사
CSV 파일을 스파크 SQL 테이블로 읽기
CSV 데이터 소스에서 SQL 테이블을 생성하는 것은 파케이 또는 JSON을 사용하는 것과 다르지 않다.
-- SQL 예제 CREATE OR REPLACE TEMPORARY VIEW us_delay_flights_tbl USING csv OPTIONS ( path "./summary-data/csv/*") header "true", inferSchema "true", mode "FAILFAST" )
SQL
복사
# SQL을 사용하여 데이터 프레임으로 데이터를 읽을 수 있다. spark.sql("SELECT * FROM us_delay_flights_tbl").show(10)
Python
복사
데이터 프레임을 csv 파일로 쓰기
df.write.format("csv").mode("overwrite").save("/tmp/data/csv/df_csv")
Python
복사
DataFrameReader 및 DataFrameWriter의 추가 CSV 옵션

에이브로

아파치 카프카에서 메시지를 직렬화 및 역직렬화할 때 사용됨
JSON에 대한 직접 매핑, 속도와 효율성, 많은 프로그래밍 언어에 사용할 수 있는 바인딩을 포함한 많은 이점을 제공
에이브로 파일을 데이터 프레임으로 읽기(DataFrameReader)
# 파이썬 예제 df = (spark.read.format("avro") .load("./summary-data/avro/*")) df.show(truncate=False)
Python
복사
에이브로 파일을 스파크 SQL 테이블로 읽기
- SQL 예제 CREATE OR REPLACE TEMPORARY VIEW episode_tbl USING avro OPTIONS ( path "./summary-data/avro/*") ) spark.sql("SELECT * FROM episode_tbl").show(truncate=False)
Python
복사
데이터 프레임을 에이브로 파일로 쓰기
# 파이썬 예제 (df.write .format("avro") .mode("overwrite") .save("./tmp/data/avro/df_avro"))
Python
복사
에이브로 데이터 소스 옵션

ORC

최적화된 칼럼 기반 파일 형식
2가지 Spark 설정으로 어떤 ORC 구현체를 사용할지 지정 가능
spark.sql.orc.impl을 native로 설정
spark.sql.orc.enableVectorizedReader를 true로 설정
⇒ 스파크는 벡터화된 ORC 리더를 사용
벡터화 된 리더는 한 번에 한 행을 읽는 것이 아닌 행 블록(블록당 1024개)을 읽어 작업을 간소화하고 검색, 필터, 집계 및 조인과 같은 집중적인 작업에 대한 CPU 사용량을 줄임
USING HIVE OPTIONS (fileFormat ’ORC') - 하이브 ORC SerDe(직렬화 및 역직렬화)테이블
스파크 구성 파라미터인 spark.sql.hive.convertMetastoreOrc가 true 로 설정되었을 때 벡터화된 리더가 사용
ORC 벡터화 리더를 사용하여 ORC 파일을 데이터 프레임으로 읽기
# 파이썬 예제 file = "./summary-data/orc/*" df = spark.read.format("orc") .option ("path", file) .load() df.show(10. False)
Python
복사
스파크 SQL 테이블로 ORC 파일 읽기
- SQL 예제 CREATE OR REPLACE TEMPORARY VIEW episode_tbl USING orc OPTIONS ( path "./summary-data/avro/*") ) spark.sql("SELECT * FROM episode_tbl").show(truncate=False)
Python
복사
데이터 프레임을 ORC 파일로 쓰기
# 파이썬 예제 (df.write .format("orc") .mode("overwrite") .save("./tmp/data/avro/df_avro"))
Python
복사

이미지

텐서플로와 파이토치와 같은 딥러닝 및 머신러닝 프레임워크를 지원하기 위해 새로운 데이터 소스
컴퓨터 비전 기반 머신러닝 애플리케이션의 경우 이미지 데이터 세트를 로드하고 처리하는 것이 중요
이미지 파일을 데이터 프레임으로 읽기
# 파이썬 예제 file = "./cctvVideos/train_images/" image_df = spark.read.format("image").load(file) image_df.printSchema()
Python
복사
images_df.select("image, height","image.width", "image.nChannels","image.mode", "label") .show(5, truncate=False)
Python
복사

이진 파일

DataFrameReader는 각 이진 파일을 파일의 원본 내용와 메타데이터를 포함하는 단일 데이터 프레임 행(레코드)으로 변환
경로: StringType 수정시간: TimestampType 길이: LongType 내용: BinaryType
데이터 프레임으로 이진 파일 읽기
pathGlobFilter를 사용하여 파티션 검색 동작을 유지하면서 지정된 전역 패턴과 일치하는 경로로 파일을 로드
파티션된 디렉터리에서 모든 JPG 파일을 읽는다.
#파이썬 예제 path = "./cctvVideos/train_images/" binary_files_df = (spark.read.format("binaryFile") .option("pathGlobFilter", "*.jpg") .load(path)) binary_files_df.show(5)
JSON
복사
디렉터리에서 파티션 데이터 검색을 무시
path = "./cctvVideos/train_images/" binary_files_df = (spark.read.format("binaryFile") .option("pathGlobFilter", "*.jpg") .option("recursiveFileLookup", "true") # 옵션 추가 .load(path)) binary_files_df.show(5)
JSON
복사
⇒ label 칼럼이 존재하지 않는다.
현재는 이진 파일 데이터 소스가 데이터 프레임에서 다시 원래 파일 형식으로 쓰는 것을 지원하지 않 는다.