Search

[러닝 스파크 3장] 아파치 스파크의 정형화 API

RDD

간단한 RDD 프로그래밍 API 모델
RDD는 스파크에서 가장 기본적인 추상적 부분.
RDD의 세 가지의 핵심 특성
의존성 dependency
어떤 입력을 필요로 하고 현재의 RDD가 어떻게 만들어지는지 스파크에게 가르쳐 줌
결과를 새로 만들어야 하는 경우에 스파크는 이 의존성 정보를 참고하고 연산을 다시 반복해서 RDD를 다시 만들 수 있다
파티션(지역성 정보 포함)
스파크에게 작업을 나눠서 이그제큐터들에 분산해 파티션별로 병렬 연산할 수 있는 능력을 부여
연산 함수: Partition ⇒ Iterator[T]
RDD에 저장되는 데이터를 Iterator[T] 형태로 만들어 주는 연산 함수 compute function를 가짐
RDD의 문제
사용자가 연산 함수 안에서 무얼 하는지 스파크가 알 수 없었다.
다른 문제는 Iterator[T] 데이터 타입이 파이썬 RDD에서 불투명. 스파크에서는 단지 파이썬 기본 객체로만 인식이 가능
스파크가 함수에서의 연산이나 표현식을 검사하지 못하다 보니 최적화할 방법이 없었다.
스파크는 위에서 T로 표시한 타입에 대한 정보가 전혀 없었다.
Spark는 어떤 데이터 압축 테크닉도 적용하지 못하고, 그 정체를 알 수 없는 객체를 바이트 뭉치로 직렬화해 쓰는 것만 가능했다
스파크가 연산 순서를 재정렬해 효과적인 질의 계획으로 바꾸는 능력을 방해
스파크의 구조 확립
구조를 갖추면 스파크 컴포넌트를 통틀어 더 나은 성능과 공간 효율성 등 많은 이득을 얻을 수 있다.
각 이름별로 모든 나이들을 모아서 그룹화하고, 나이의 평균을 구함 저수준의 RDD API 패턴을 사용
#파이썬 예제#(name, age) 형태의 튜플로 된 RDD를 생성한다. dataRDD = sc.parallelize([("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke”, 25)]) #집계와 평균을 위한 람다 표현식과 함께 map과 reduceByKey 트랜스포메이션을 사용한다. agesRDD = (dataRDD .map(lambda x: (x[0], (x[1], 1))) .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) .map(lambda x: (x[0], x[1][0]/x[1][1])))
Python
복사
스파크에게 쿼리 를 계산하는 과정을 직접적으로 지시하고 있다.
의도가 전달되지 않기 때문에 스파크에게는 매우 분명하지 않아 보인다.
동일한 질의를 고수준 DSL 연산자들과 데이터 프레임 API를 써서, 즉 스파크에게 무엇을 할 지를 알려준다면?
#파이썬 예제 from pyspark.sql import SparkSession from pyspark.sql.functions import avg # SparkSession으로부터 데이터 프레임을 만든다 spark = (SparkSession .builder .appName("AuthorsAges") .getOrCreate()) # 데이터 프레임 생성 data_df = spark.createDataFrame([("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)], ["name", "age"]) #동일한 이름으로 그룹화하여 나이별로 계산해 평균을 구한다. avg_df = data_df.groupBy("name").agg(avg("age")) # 최종 실행 결과를 보여준다. avg_df.show()
Python
복사
스파크는 이런 쿼리를 파악해서 사용자의 의도를 이해할 수 있기 때문에 효과적인 실행을 위해 연산 들을 최적화하거나 적절하게 재배열할 수 있다.
읽기가 간단하다는 것 말고도 스파크의 상위 수준 API는 컴포넌트들과 언어를 통틀어 일관성을 갖고 있다.
질의를 데이터 프레임에 대해 사용하면 언제나 정형화인 데이터 형태로 데이터 프레임에서 변환하고 연산

데이터 프레임 API

판다스 데이터 프레임에 영향을 받은 스파크 데이터 프레임
이름 있는 칼럼과 스키마를 가진 분산 인메모리 테이블처럼 동작
각 칼럼은 다음과 같은 특정한 데이터 타입을 가질 수 있다
정수 integer, 문자열 string, 배열 array, 맵 map, 실수 real, 날짜date, 타임스탬프 timestamp
데이터 프레임은 불변성을 지니며 스파크는 그에 대한 모든 변경 내역(계보, Lineage)를 보관
이전 버전의 내용들을 보존한 채로도 칼럼의 이름이나 타입을 추가하거나 바꿀 수 있다.
데이터 프레임에서 이름이 붙은 칼럼과 연관 데이터 타입은 스키마에 선언 가능
스파크의 기본 데이터 타입
지원하는 프로그래밍 언어와 맞게 스파크는 기본적인 내부 데이터 타입을 지원
이 타입들은 스파크 애플리케이션에서 선언할 수도 있고, 스키마에서 정의할 수도 있다.
스파크의 정형화 타입과 복합 타입
대상 데이터는 복합적이고 자체적 구조를 따로 갖고 있거나 내부적으로 반복될 것
맵map, 배열array, 구조체struct, 날짜date, 타임스탬프 timestamp, 필드field 등 다양한 타입

스키마와 데이터 프레임 만들기

스파크에서 스키마 schema는 데이터 프레임을 위해 칼럼 이름과 연관된 데이터 타입을 정의한 것
스키마는 외부 데이터 소스에서 구조화된 데이터를 읽어 들일 때 쓰이게 된다
스파크가 데이터 타입을 추측해야 하는 책임을 덜어줌
스파크가 스키마를 확정하기 위해 파일의 많은 부분을 읽어 들이려고 별도의 잡을 만드는 것을 방지. 데이터 파일이 큰 경우, 이는 비용과 시간이 많이 드는 작업
데이터가 스키마와 맞지 않는 경우. 조기에 문제를 발견 가능
데이터 소스에서 큰 파일을 읽어야 한다면 가능한 한 반드시 스키마를 미리 지정해 두기를 권장
스키마를 정의하는 두 가지 방법
프로그래밍 스타일로 정의
스파크 데이터 프레임 API 사용
from pyspark.sql.types import * schema = StructType([ StructField("author", StringType(), False), StructField("title", StringType(), False), StructField("pages", IntegerType(), False) ])
Python
복사
DDL(data definition language)을 사용
schema = "author STRING, title STRING, pages INT"
Python
복사
from pyspark.sql.types import * from pyspark.sql import SparkSession from pyspark.sql.functions import * # 1과 2 중에 선택 # 1. DDL을 써서 스키마를 정의한다. schema = "'Id' INT, 'First' STRING, 'Last' STRING, 'Uri' STRING, 'Published' STRING, 'Hits' INT, 'Campaigns' ARRAY<STRING>" # 2. define schema for our data schema = StructType([ StructField("Id", IntegerType(), False), StructField("First", StringType(), False), StructField("Last", StringType(), False), StructField("Url", StringType(), False), StructField("Published", StringType(), False), StructField("Hits", IntegerType(), False), StructField("Campaigns", ArrayType(StringType()), False)]) #create our data data = [[1, "Jules", "Damji", "https://tinyurl.1", "1/4/2016", 4535, ["twitter", "LinkedIn"]], [2, "Brooke","Wenig","https://tinyurl.2", "5/5/2018", 8908, ["twitter", "LinkedIn"]], [3, "Denny", "Lee", "https://tinyurl.3","6/7/2019",7659, ["web", "twitter", "FB", "LinkedIn"]], [4, "Tathagata", "Das","https://tinyurl.4", "5/12/2018", 10568, ["twitter", "FB"]], [5, "Matei","Zaharia", "https://tinyurl.5", "5/14/2014", 40578, ["web", "twitter", "FB", "LinkedIn"]], [6, "Reynold", "Xin", "https://tinyurl.6", "3/2/2015", 25568, ["twitter", "LinkedIn"]] ] # main program if __name__ == "__main__": #create a SparkSession spark = (SparkSession .builder .appName("Example-3_6") .getOrCreate()) # 위에 정의했던 스키마로 데이터 프레임 생성 blogs_df = spark.createDataFrame(data, schema) # 데이터 프레임 내용을 보여준다. 위에서 만든 데이터를 보여주게 된다 blogs_df.show() print() # 데이터 프레임 처리에 사용된 스키마를 출력한다. print(blogs_df.printSchema()) # Show columns and expressions blogs_df.select(expr("Hits") * 2).show(2) blogs_df.select(col("Hits") * 2).show(2) blogs_df.select(expr("Hits * 2")).show(2) # show heavy hitters blogs_df.withColumn("Big Hitters", (expr("Hits > 10000"))).show() print(blogs_df.schema)
Python
복사
blogs_df.show(), blogs_df.printSchema()
print(blogs_df.schema)

칼럼과 표현식

데이터 프레임의 칼럼 column과 로우row, 그리고 데이터 프레임 API로 동작할 때 이들이 어떤 식으로 적용될까?
사용자는 이름으로 칼럼들을 나열해 볼 수도 있고, 관계형 표현이나 계산식 형태의 표현식으로 그 값들에 연산을 수행할 수 있다
칼럼은 공개(public) 메소드를 가진 객체로 표현된다(칼럼 타입으로 표현된다)
columnName이 스파크가 지원하는 타입일 때(integer, string 등)
expr("columnName * 5")이라든가 (expr("columnName - 5") > col(anothercolumnName)) 같은 방식으로 단순한 표현식 가능
Column은 객체의 이름이며 col()은 Column을 되돌려 주는 내장 함수
expr()도 스파크가 결과를 계산해 표현식으로 해석할 수 있는 인자를 받는다.
데이터 프레임의 Column 객체는 단독으로 존재할 수는 없다. 각 칼럼은 한 레코드의 로우의 일 부분이며 모든 로우가 합쳐져서 하나의 데이터 프레임을 구성

로우

하나의 행은 일반적으로 하나 이상의 칼럼을 갖고 있는 로우 row 객체로 표현
각 칼럼은 동일한 칼럼 타입일 수도 있고(예. 정수나 문자열) 혹은 다른 타입들일 수도 있다(정수. 문자 열, 맵, 배열 등).
ROW는 스파크의 객체이고 순서가 있는 필드 집합 객체 → 스파크의 지원 언어들에서 각 필드를 0부터 시작하는 인덱스로 접근
예제
# 파이썬 예제»> from pyspark.sql import Row blog_row = Row(6, "Reynold", "Xin", "https://tinyurl.6", 255568, "3/2/2015", ["twitter", "Linkedln"]) # 인덱스로 개별 아이템에 접근한다. blog_row[1] # 'Reynold'
Python
복사
Row 객체들은 빠른 탐색을 위해 데이터 프레임으로 만들어 사용하기도 한다.
#파이썬 rows = [Row("Matei Zaharia", "CA"), Row("Reynold Xin", "CA")] authors_df = spark.createDataFrame(rows, ["Authors","State"]) authors_df.show()
Python
복사
대부분의 경우 파일들은 규모가 크기 때문에 스키마를 미리 지정해 사용하는 것이 데이터 프레임 작성에 훨씬 더 빠르고 효율적인 방법

자주 쓰이는 데이터 프레임 작업들

DataFrameReader와 DataFrameWriter 사용하기
스파크에서 데이터를 읽고 쓰는 작업은 쉬운 편
DataFrameReader
우선 구조화된 데이터를 갖고 있는 데이터 소 스에서 데이터 프레임으로 로드
JSON, CSV, Paquet(파케이), 텍스트, 에이브로, ORC 같은 다양한 포맷의 데이터 소스에서 데이터를 읽어 데이터 프레임으로 갖고 오게 해 준다.
DataFrameWriter
동일하게 특정 포맷의 데이터 소스에 데이터 프레임의 데이터를 써서 내보내기
예제
샌프란시스코San Francisco 소방서 호출 데이터를 담고 있는 큰 CSV 파일 하나를 읽기.
28개의 칼럼과 4,380,660개 이상의 레코드가 있기 때문에 스파크가 스키마를 추측하게 두는 것보다 미리 지정해 주는 것이 훨씬 효과적
spark.read.csv() : CSV 파일을 읽어서 row 객체와 스키마에 맞는 타입의 이름 있는 칼럼들로 이루어진 데이터 프레임을 되돌려 준다.
#파이썬에서 스키마를 정의한다. from pyspark.sql.types import * # 프로그래밍적인 방법으로 스키마를 정의한다. fire_schema = StructType([ StructField('CallNumber', IntegerType(), True), StructField('UnitID', StringType(), True), StructField('IncidentNumber', IntegerType(), True), StructField('CallType', StringType(), True), StructField('CallDate', StringType(), True), StructField('WatchDate', StringType(), True), StructField('CallFinalDisposition', StringType(), True), StructField('AvailableDtTm', StringType(), True), StructField('Address', StringType(), True), StructField('City', StringType(), True), StructField('Zipcode', IntegerType(), True), StructField('Battalion', StringType(), True), StructField('StationArea', StringType(), True), StructField('Box', StringType(), True), StructField('OriginalPriority', StringType(), True), StructField('Priority', StringType(), True), StructField('FinalPriority', IntegerType(), True), StructField('ALSUnit', BooleanType(), True), StructField('CallTypeGroup', StringType(), True), StructField('NumAlarms', IntegerType(), True), StructField('UnitType', StringType(), True), StructField('UnitSequencelnCallDispatch', IntegerType(), True), StructField('FirePreventionDistrict', StringType(), True), StructField('SupervisorDistrict', StringType(), True), StructField('Neighborhood', StringType(), True), StructField('Location', StringType(), True), StructField('RowID', StringType(), True), StructField('Delay', FloatType(), True) ]) # DataFrameReader 인터페이스로 CSV 파일을 읽는다 sf_fire_file = "/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv" fire_df = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)
Python
복사
스키마를 미리 지정하고 싶지 않다면, 스파크가 적은 비용으로 샘플링해서 스키마를 추론할 수 있게 할 수는 있다. 예를 들면 다음처럼 samplingRatio 옵션을 적용하는 것이 가능
//scala val sampleDF = spark .read .option("samplingRatio", 0.001) .option("header", true) .csv("""/databricks-datasets/learning-spark-v2/ sf-fire/sf-fire-calls.csv""")
Scala
복사

데이터 프레임을 파케이 파일이나 SQL 테이블로 저장하기

데이터를 탐색하고 변환한 후 파케이 포맷이나 SQL 테이블로 데이터를 저장하는 것
# 파이썬에서 파케이로 저장 parquet_path = ... fire_df.write.format("parquet").save(parquet_path)
Python
복사
하이브 메타스토어에 메타데이터로 등록되는 테이블로 저장할 수 있다
# 파이썬 예제 parquet_table = ... # 테이블 이름 fire_df.write.format("parquet").saveAsTable(parquet_table)
Python
복사

트랜스포메이션과 액션

먼저 하게 될 것은 칼럼들이 어떻게 구성되어 있는지 살펴보는 것
프로젝션 projection
관계형 DB 식으로 말하면 필터를 이용해 특정 관계 상태와 매치되는 행들만 되돌려 주는 방법
프로젝션은 select() 메서드로 수행
필터는 filter()나 where() 메서드로 표현
#파이썬 예제 few_fire_df = (fire_df .select("IncidentNumber", "AvailableDtTm", "CaUType") .where(col("CallType") != "Medical Incident")) few_fire_df.show(5, truncate=False)
Python
복사
화재 신고로 기록된 CallType 종류가 몇 가지인지 알고 싶다면
#파이썬 예제, countDistinctQ를 써서 신고 타입의 개수를 되돌려 준다. from pyspark.sql.functions import * (fire_df .select ("CaUType") .where(col("CallType").isNotNull()) .agg(countDistinct("CallType").alias("DistinctCallTypes")) .show())
Python
복사
null이 아닌 신고 타입의 목록
# 파이썬 예제, 모든 행에서 null이 아닌 개별 CallType을 추출한다. (fire_df .select("CallType") .where(col("CallType").isNotNull()) .distinct() .show(10. False))
Python
복사

칼럼의 이름 변경 및 추가 삭제

스타일이나 컨벤션 준수의 이유로, 혹은 가독성이나 간결성을 위해 특정 칼럼의 이름을 바꿔야 할 때가 있다.
칼럼 이름에 포함되는 공백들은 공백이 허용되지 않는 파케이 파일 포맷으로 써야 한다면 더욱 문제가 될 수 있다.
칼럼 이름 변경 방법들
StructField를 써서 스키마 내에서 원하는 칼럼 이름들을 지정하면 결과 데이터 프레임에서 원하는 대로 칼럼 이름이 출력
withColumnRenamed()
원하는 이름으로 변경
데이터 프레임 변형은 변경 불가 방식으로 동작 ⇒ withColumRenamed로 칼럼 이름을 변경할 때는, 기존 칼럼 이름을 갖고 있는 원본을 유지한 채로 칼럼 이름이 변경된 새로운 데이터 프레임을 받아 오게 된다.
#파이썬 예제 new_fire_df = fire_df.withColumnRenamed("Delay", "ResponseDelayedinMins") (new_fire_df .select("ResponseDelayedinMins") .where(col("ResponseDelayedinMins") > 5) .show(5. False))
Python
복사
# 파이썬 예제 (fire_ts_df .select(year('IncidentDate')) .distinct() .orderBy(year('IncidentDate')) .show())
Python
복사
spark.sql.functions 패키지
사용할 to_timestamp()나 to_date() 같은 to/from - date/timestamp 이름의 함수들이 존재
칼럼 ⇒ 사용할 만한 타입으로의 변환
# 파이썬 예제 fire_ts_df = (new_fire_df .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")) .drop("CallDate") .withColumn("WatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy")) .drop("WatchDate") .withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ssa")) .drop("AvailableDtTm")) # 변환된 칼럼들을 가져온다. (fire_ts_df.select("IncidentDate", "OnWatchDate", "AvailableDtTS") .show(5. False))
Python
복사
1. 기존 칼럼의 데이터 타입을 문자열에서 스파크에서 지원하는 타임스탬프 타입으로 변환 2. “MM/dd/yyyy”나 “MM/dd/yyyy hh:mm:ss a” 같은 적절한 포맷 문자열로 새로운 시간 표시 포맷을 지정해 사용 3. 새로운 데이터 타입으로 변환 후, 예전 칼럼은 drop()으로 삭제하고 새로운 칼럼은 withColumn() 함수의 첫 번째 인자의 이름으로 덧붙인다. 4. 새로이 수정된 데이터 프레임을 fire_ts_df 변수에 담는다
수정된 날짜/시간 칼럼을 가지게 됨
⇒ spark.sql.functions에서 dayofmonth(). dayofyear(), dayofweek() 같은 함수들을 써서 질의 가능
# 파이썬 예제 (fire_ts_df.select(year('IncidentDate')) .distinct() .orderBy(year('IncidentDate')) .show())
Python
복사

집계 연산

데이터를 칼럼의 어떤 값들끼리 그룹화하고 단순하게는 개수를 세 는 것
groupBy(), orderBy(), count()와 같이 데이터 프레임에서 쓰는 일부 트랜스포메이션과 액션은 칼럼 이름으로 집계해서 각각 개수를 세어주는 기능을 제공한다.
#파이썬 예제 (fire_ts_df .select("CallType") .where(col("CallType") .isNotNull()) .groupBy("CallType") .count() .orderBy("count", ascending=False) .show(n=10, truncate=False) )
Python
복사
데이터 프레임 API는 collect() 함수를 제공하지만 극단적으로 큰 데이터 프레임에서는 메모리 부족 예외 (out of memory, OOM)를 발생시킬 수 있기 때문에 자원도 많이 쓰고 위험하다 드라이버에 결과 숫자 하나만 전달하는 count()와는 달리 collect()는 전체 데이터 프레임 혹은 데이터세트의 모든 Row 객체 모음을 되돌려 준다. 몇 개의 Row 결과만 보고 싶다면 최초 n개의 Row 객체만 되돌려 주는 take(n) 함수를 쓰는 것이 훨씬 나을 것

그 외의 데이터 프레임 연산

데이터 프레임 API는 min(), max(), sum(), avg() 등 이름만으로 알 수 있을 만한 통계 함수들을 지원
샌프란시스코 소방서 데이터 연산 예제
# 파이썬 예제 import pyspark.sql.functions as F (fire__ts_df .select(F.sum("NumAlarms"), F.avg("ResponseDelayedinMins"), F.min("ResponseDelayedinMins"), F.max("ResponseDelayedinMins")) .show())
Python
복사
경보 횟수의 합, 응답시간 평균, 모든 신고에 대한 최소/최장 응답시간 등을 계산하면서 pyspark 함수들을 파이썬 식으로 가져다 썼으므로 내장 파이썬 함수들과는 충돌하지 않는다
stat(), describeO, correlation(). covariance(), sampleBy(), approxQuantile(), frequentltems() 등 고수준의 pyspark API 메서드들도 지원
RDD로 위의 질의와 똑같이 하려고 한다면 가독성이나 투명성이 몹시 떨어질 것

데이터세트 API

한 종류의 API만 알면 되게 하기 위해 데이터 프레임과 데이터세트 API를 유사한 인터페이스를 갖도록 정형화 API로 일원화했다
데이터 세트는 정적 타입 API, 동적 타입 API의 두 특성 모두 가진다.
스칼라의 데이터 프레임
공용 객체 모음인 Dataset[ROW]의 다른 이름
Row = 서로 다른 타입의 값을 저장할 수 있는 포괄적 JVM 객체
데이터세트는 스칼라에서 엄격하게 타입이 정해진 JVM 객체의 집합(자바에서는 클래스)
스파크 문서에서의 데이터 세트 정의
함수형/관계형 작업에서 병렬적으로 변형 가능한, 작업 분야에 특화된 객체들의 묶음이며 엄격하게 타입을 가짐. (스칼라의) 각 데이터세트는 동시에 데이터 프레임이라고 불리는 동적 타입 뷰를 가지며 이는 Row 타입의 데이터세트다

정적 타입 객체, 동적 타입 객체, 포괄적인 ROW

스파크가 지원하는 언어들에서 데이터세트는 자바와 스칼라에서 통용.
타입은 변수와 객체에 컴파일 시점에 연결
스칼라에서는 DataFrame은 타입 제한이 없는 DataSet[Row]의 단순한 다른 이름일 뿐.
반대로 파이썬과 R에서는 데이터 프레임만 사용 가능.
파이썬과 R이 컴파일 시 타입의 안전을 보장하는 언어가 아니기 때문
타입은 동적으로 추측 X, 컴파일 때 X, 실행 시에 정해짐
Row
스파크의 포괄적 객체 타입
(배열처럼) 인덱스를 사용하여 접근할 수 있으며 다양한 타입 의 값들을 담을 수 있음
스파크 내부적으로 Row 객체를 사용 가능한 타입으로 변환하여 반환
#파이썬 예제 from pyspark.sql import Row row = Row(350, True, "Learning Spark 2E", None)
Python
복사
Row 안에 있는 Int 타입은 스칼라나 자바와 파이썬에서 각각 적절 하게 IntegerType이나 IntegerType()으로 변환
Row 객체에 공개되어 있는 게터 getter류 함수들에 인덱스를 사용해 개별 필드에 접근
# 파이썬 예제 row[0] Out[13]: 350 row[1] Out[14]: True row[2] Out[15]: 'Learning Spark 2E'
Python
복사
정적 객체들은 JVM에서 실제 자바 클래스나 스칼라 클래스가 된다. 그러므로 데이터세트의 각 아이템들은 곧바로 하나의 JVM 객체가 되어 쓸 수 있다.

데이터 세트의 생성

데이터 소스에서 데이터 프레임을 만들 때처럼, 데이터세트를 만들 때에도 해당 스키마를 알아야 한다. 즉, 데이터 타입들을 모두 알고 있어야 한다. JSON이나 CSV 데이터라면 스키마 추론이 가능하겠지만 대용량 데이터에서는 이런 작업은 비용 지향적.
스칼라의 케이스 클래스
자신만의 특화된 객체를 데이터세트로 초기화해서 만듦
각 JSON 엔트리를 특화 객체인 DeviceloTData로 만들기 위해 스칼라 케이스 클래스를 정의
케이스 클래스를 정의한 이후에는 파일을 읽어서 Dataset[Row]를 Dataset[DeviceIoTData]로 바꾸는 데에 사용 가능

데이터 세트 vs 데이터 프레임

스파크에게 어떻게 하는지가 아니라 무엇을 해야 하는지 말하고 싶으면 데이터 프레임이나 데이터 세트를 사용
풍부한 표현과 높은 수준의 추상화 및 DSL 연산을 원한다면 데이터 프레임이나 데이터세트를 사용
자신의 작업이 높은 수준의 표현력, 필터, 맵. 집계, 평균과 합계 계산, SQL 질의, 칼럼 지향 접근, 반정형화된 데이터에 대한 관계형 연산 등이 필요하다면 데이터 프레임이나 데이터세트를 사용
실행 시에 발생하는 에러를 찾기보다 컴파일 시에 발생하는 에러를 찾고 싶다면 아래의 표를 참고
데이터 세트
스칼라와 자바
데이터세트에서 우리가 수행할 수 있는 작업들은 데이터 프레임에서와 유사한 filter(), map(), groupBy(), select(), take() 등이 존재
데이터세트는 함수들의 형태나 컴파일 타임 안전성을 보장한다는 점에서 RDD와 유사하지만 훨씬 읽기 쉬우며 객체 지향 프로그래밍 인터페이스를 가짐
하부의 스파크 SQL 엔진이 JVM 객체의 생성, 변환, 직렬화, 역직렬 화를 담당
데이터세트 인코더의 도움을 받아 자바의 오프힙 메모리 관리
사용처
컴파일 타임에 엄격한 타입 체크를 원하며 특정한 Dataset[T]를 위해 여러 개의 케이스 클래스를 만드는 것에 부담이 없다면 데이터세트를 사용
인코더Encoder를 써서 프로젝트 텅스텐의 직렬화 능력을 통한 이득을 보고 싶다면 데이터세트를 사용
데이터 프레임
스칼라, 파이썬, R
사용처
자신의 작업이 SQL과 유사한 질의를 쓰는 관계형 변환을 필요로 할 경우 사용
일원화. 코드 최적화. 스파크 컴포넌트들 사이에서의 API 단순화 등을 원한다면 데이터 프레임을 사용
파이썬 사용자라면 데이터 프레임을 쓰되, 제어권을 좀 더 갖고 싶으면 RDD로 바꿔 사용
공간/속도의 효율성을 원하면 사용

언제 RDD를 사용하는가

정형화 API의 본질 = 효과적인 질의를 구축하고 간소한 코드를 생성하는 과정이 스파크 SQL 엔진이 하는 일
RDD 사용을 고려해보아야 할 만한 시나리오
RDD를 사용하도록 작성된 서드파티 패키지를 사용.
데이터 프레임과 데이터세트에서 얻을 수 있는 코드 최적화, 효과적인 공간 사용, 퍼포먼스의 이득을 포기 가능하다면.
스파크가 어떻게 질의를 수행할지 정확하게 지정해 주고 싶을 경우.
데이터세트나 데이터 프레임에서 RDD로 가기 위해서는 단순한 API 함수인 df.rdd 호출 = 변환 비용 발생
데이터 프레임과 데이터세트는 RDD에 기반해서 만들어졌고 전체 단계 코드 생성 중에 최소화된 RDD 코드로 분해

스파크 SQL과 하부의 엔진

스파크 SQL은 개발자들이 스키마를 가진 정형화 데이터에 ANSI SQL 2003 호환 질의를 사용 가능하게 만듦
SQL 같은 질의를 수행하게 해 주는 것 외 에도 스파크 SQL 엔진은 다음과 같은 일을 수행
1.
스파크 컴포넌트들을 통합하고 데이터 프레임/데이터 세트가 자바, 스칼라. 파이썬 R 등으로 정형화 데이터 관련 작업을 단순화할 수 있도록 추상화를 해줌
2.
아파치 하이브 메타스토어와 테이블에 접근.
3.
정형화된 파일 포맷 (JSON, CSV, 텍스트, 에이브로, 파케이, ORC 등)에서 스키마와 정형화 데이터를 읽고 쓰며 데이터를 임시 테이블로 변환.
4.
빠른 데이터 탐색을 할 수 있도록 대화형 스파크 SQL 셸을 제공.
5.
표준 데이터베이스 JDBC/ODBC 커넥터를 통해 외부의 도구들과 연결할 수 있는 중간 역할을 수행 
6.
최종 실행을 위해 최적화된 질의 계획과 JVM을 위한 최적화된 코드를 생성

스파크 SQL 엔진의 핵심

카탈리스트 옵티마이저텅스텐 프로젝트
상위 수준의 데이터 프레임과 데이터세트 API 및 SQL 쿼리 등을 지원
카탈리스트 옵티마이저
연산 쿼리를 받아 실행 계획으로 변환
1. 분석  2. 논리적 최적화  3. 물리 계획 수립  4. 코드생성
사용하는 언어에 상관없이 사용자가 실행한 작업은 동일한 여정을 거쳐 같은 실행 계획 생성과 바이트 코드에 결과적으로 이르게 된다
# 파이썬 예제 count_mnm_df = (mnm_df .select("State", "Color", "Count") .groupBy("State", "Color") .agg(count("Count") .alias("Total")) .orderBy("Total", ascending=False)) # SQL 예제 SELECT State, Color, Count, sum(Count) AS Total FROM MNM_TABLE_NAME GROUP BY State, Color, Count ORDER BY Total DESC
Python
복사
파이썬 코드가 거치게 되는 다른 스테이지들을 보려면 데이터 프레임에서 count_mnm_ df.explain(True) 함수를 실행하면 된다.
스칼라에서는 df.queryExecution.logical이나 df.queryExecution.optimizedPlan을 실행

네 단계의 쿼리 최적화 과정

예시 코드
// 스칼라 예제 // 파케이 테이블에서 읽어 온 users 데이터 프레임 val usersDF = ••. // 파케이 테이블에서 읽어 온 events 데이터 프레임 val eventsDF = ... // 두 데이터 프레임을 조인한다. val joinedDF = users .join(events, users("id") === events("uid")) .filter(events("date") > "2015-01-01")
Python
복사
단계별 분석
1단계 : 분석
스파크 SQL 엔진은 SQL이나 데이터 프레임 쿼리를 위한 추상 문법 트리(abstract syntax tree, AST) 생성 으로 시작. 초기 단계에서는 어떤 칼럼이나 테이블 이름이든 칼럼, 데이터 타입, 함수, 테이블, 데이터베이스 이름 목록을 갖고 있는 스파크 SQL의 프로그래밍 인터페이스인 Catalog 객체로 접근 하여 가져올 수 있음
2단계 : 논리적 최적화
내부적으로 두 가지 단계
표준적인 규칙을 기반으로 하는 최적화 접근 방식을 적용하면서 카탈리스트 옵티마이저는 먼저 여러 계획들을 수립
이후, 비용 기반 옵티마이저(cost based optimizer, CBO)를 써서 각 계획에 비용을 책정.
이 계획들은 연산 트리들로 배열됨
예 ) 이 트리들은 조건절 하부 배치, 칼럼 걸러내기, 불리언 Boolean 연산 단순화 등을 포함.
이렇게 수립된 논리 계획은 물리 계획 수립의 입력 데이터가 됨.
3단계 : 물리 계획 수립
스파크 실행 엔진에서 선택된 논리 계획을 바탕으로 대응되는 물리적 연산자를 사용해 최적화된 물리 계획을 생성
4단계 : 코드 생성
각 머신에서 실행할 효율적인 자바 바이트 코드를 생성하는 것을 포함.
스파크 SQL은 메모리에 올라와 있는 데이터 집합을 다룸
스파크는 실행 속도를 높이기 위한 코드 생성을 위해 최신 컴파일러 기술을 사용
포괄(whole-stage) 코드 생성을 가능하게 하는 프로젝트 텅스텐이 역활
포괄 코드 생성이란?
물리적 쿼리 최적화 단계로 전체 쿼리를 하나의 함수로 합치면서 가상 함수 호출이나 중간 데이터를 위한 CPU 레지스터 사용을 없앰