Search

[러닝스파크 6장-2] Spark Dataset 개념과 이해

DataSet이 아직도 이해가 안가는 그대, 조금은 아는 척 할 수 있도록.

DataSet 개념

1.
우선, DataSet은 구조적 api 의 기본 데이터 타입입니다. DataFrame 또한 row 타입의 DataSet 입니다. 그리고, 스파크가 지원하는 다양한 언어에서 사용할 수 있습니다.
2.
DataSet 의 경우는 jvm 을 이용하는 언어인 스칼라와 자바에서만 사용할 수 있습니다. DataSet을 이용해 DataSet의 각 로우를 구성하는 객체를 정의합니다.
a.
스칼라에서는 스키마가 정의된 케이스 클래스 객체를 이용해 DataSet을 정의합니다.
b.
자바에서는 자바빈 객체를 이용해 DataSet을 정의합니다.
3.
도메인별 특정 객체를 효과적으로 지원하기 위해 인코더(encoder)라 불리는 특수 개념이 필요합니다.
a.
인코더는 도메인별 특정 객체 T를 스파크 내부 데이터타입으로 매핑하는 시스템을 의미합니다.
b.
예를 들어, Person 클래스는 name(string)과 age(int) 필드를 가집니다. 인코더는 런타임 환경에서 Person 객체를 바이너리구조로 생성하도록 스파크에게 지시합니다. DataFrame이나 구조적 API를 이용하면 Row 타입을 직렬화된 바이너리구조로 변환합니다. 만약 도메인에 특화된 객체를 이용하려면, 스칼라의 케이스 클래스(case class)자바 빈 형태로 사용자 정의 데이터타입을 정의해야 합니다.
4.
DataSet API를 이용한다면, 스파크는 데이터셋에 접근할 때마다 기존의 Row 포맷이 아닌 사용자 정의 데이터타입으로 반환합니다.
a.
이 변환 작업은 느리긴하지만, 사용자에게 더 많은 유연성을 제공합니다.
b.
하지만 사용자 정의데이터 타입을 이용하면, 성능은 나빠지게 됩니다. 이게 파이썬 udf 보다 자릿수가 달라질 정도로 훨씬 느립니다. 그 이유는 프로그래밍 언어를 전환하는 것이 사용자 정의 데이터 타입을 사용하는 것 보다 훨씬 느리기 때문입니다.

DataSet 을 이용하는 시기

DataSet을 사용하는 경우에 대해 정리해봅시다. "DataSet을 이용하면, 성능이 떨어지는데, 사용할 이유가 있을까?" 라는 질문에 대해 답변해보겠습니다.
사용하는 경우와 장점
DataFrame API의 기능만으로 수행할 연산을 표현할 수 없는 경우
복잡한 비즈니스 sql 이나, DataFrame대신, 단일 함수로 인코딩해야하는 경우
성능 저하를 감수하더라도 타입 안정성(type safe)을 가진 데이터 타입을 사용하고 싶은 경우
또한, DataSet API는 타입 안정성이 있습니다.
예를 들어 두 개의 문자열을 사용해 뺄셈 연산을 하는 것 처럼 데이터 타입이 유효하지 않은 작업은 런타임이 아닌 컴파일 타임에 오류가 발생합니다.
위와 같이 정확도와 방어적 코드를 가장 중요시해야하는 경우라면, 성능을 희생하더라도, DataSet을 이용하는 것이 좋은 선택일 수 있습니다.
DataSet API를 이용하면, 잘못된 데이터로부터 애플리케이션을 보호할수는 없지만, 보다 우아하게(?) 데이터를 제어하고 구조화할 수 있습니다.
단일 노드의 워크로드와 스파크 워크로드에서 전체 로우에 대한 다양한 트랜스포메이션을 재사용하려면, DataSet을 사용하는 것이 적합합니다.
스칼라를 사용해본 경험이 있다면, 스파크 api는 스칼라 sequence 타입의 api 가 일부 반영돼있지만, 분산 방식으로 동작하는 것을 알 수 있습니다. (일단 저는 경험이 없습니다 훗)
결국 DataSet을 사용하는 장점 중 하나는 로컬과 분산 환경에 재사용할 수 있는 것입니다.
케이스 클래스로 구현된 데이터 타입을 이용해 모든 데이터와 트랜스포메이션을 정의하면, 재사용할 수 있습니다.
또한, 올바른 클래스와 데이터 타입이 지정된 dataframe을 로컬 디스크에 저장하면,다음 처리 과정에 이용할수 있어, 더 쉽게 데이터를 다룰 수 있습니다.
더 적합한 워크 로드(work load)를 만들기 위해 DataFrame과 DataSet을 동시에 이용해야할 수 있습니다. ⇒ 이 경우, 성능과 타입 안정성을 위해 반드시 희생할 수 밖에 없습니다. 이러한 방식은 대량의 DataFrame 기반의 etl 트랜스포메이션의 마지막 단계에서 이용할 수 있습니다.
예를 들어, 드라이버로 데이터를 수집해, 단일 노드의 라이브러리로 수집된 데이터를 처리해야하는 경우 입니다.
반대로 트랜스포메이션의 첫번째 단계에서 이용할 수 있습니다. 예를 들어 스파크 sql 에서 필터링 전에 로우 단위로 데이터를 파싱하는 경우 입니다.

실습

실습 준비를 위한 3가지 방법
## 1번째 방법 - spark shell ./bin/spark-shell ------- ## 2번째 방법 - 로컬 주피터 노트북 pip install spylon-kernel # or conda install -c conda-forge spylon-kernel python -m spylon_kernel install # spylon-kernel은 scala 언어를 지원합니다. ------- ## 3. docker jupyter notebook 사용 docker run -p 8888:8888 -p 4040:4040 -e GRANT_SUDO=yes --user root -e JUPYTER_ENABLE_LAB=yes -v "$(pwd)":/home/jovyan --name jupyter --restart always jupyter/all-spark-notebook sudo apt-get update sudo apt-get install default-jdk sudo apt-get install scala pip install spylon-kernel sudo python -m spylon_kernel install jupyter kernelspec list --> spylon_kernel이 추가되어있을 것
Shell
복사

자바

Encoders

WebSocket용 Java API는 인코더 및 디코더를 사용하여 WebSocket 메시지와 사용자 정의 Java 유형 간의 변환을 지원합니다. 인코더는 Java 개체를 가져와 WebSocket 메시지로 전송할 수 있는 표현을 생성합니다. 예를 들어 인코더는 일반적으로 JSON, XML 또는 이진 표현을 생성합니다. 디코더는 역기능을 수행합니다. WebSocket 메시지를 읽고 Java 객체를 생성합니다.
데이터 타입 클래스를 정의하고 DataFrame에 지정해 인코딩
import org.apache.spark.sql.Encoders; public class Flight implements Serializable{ String DEST_COUNTRY_NAME; String ORIGIN_COUNTRY_NAME; Long DEST_COUNTRY_NAME; }
Scala
복사
Dataset flights = spark.read.parquet('../2010_summary.parquet/').as(Encoders.bean(Flight.class))
Scala
복사

Java Bean

자바빈(JavaBean)은 인수 없는 생성자, 게터/세터 쌍, 기타 메서드로 구성된 클래스를 의미합니다.
자바빈 클래스란, 데이터를 캡슐화하고, 이 데이터에 접근할 수 있는 게터(getter)와 세터(setter) 메서드를 제공하는 자바 클래스를 의미
게터와 세터는 반드시 정해진 패턴을 따라야 합니다.
자바빈즈의 사양은 썬 마이크로시스템즈 에서 다음과 같이 정의되었습니다. "빌더 형식의 개발도구에서 가시적으로 조작이 가능하고 또한 재사용이 가능한 소프트웨어 컴포넌트입니다."
public void setProperty(Type newValue) // 세터 메서드 // 이 메서드는 Type 타입의 값을 클래스의 속성으로 설정하는 데 사용됩니다. // Type 클래스의 newValue 인자를 받아들여 클래스의 프라이빗(private) 변수에 값을 설정 // newValue 매개변수로 전달된 값은 클래스의 속성에 저장됩니다. public Type getProperty() // 게터 메서드 // 이 메서드는 클래스의 속성 값을 반환하는 데 사용됩니다. // Type 클래스의 프라이빗(private) 변수의 값을 반환 // 반환되는 값은 Type 타입입니다.
Scala
복사

스칼라

스칼라: 케이스 클래스

스칼라에서 Dataset을 생성하려면 스칼라 case class 구문을 사용해 데이터 타입을 정의해야함
케이스 클래스는 다음과 같은 특징을 가진 정규 클래스임
불변성
객체들이 언제 어디서 변경되었는지 추적할 필요 없음
패턴 매칭으로 분해 가능
로직 분기(로직의 트리 분기)를 단순화해 버그를 줄이고 가독성을 좋게 만듦
참조값 대신 클래스 구조를 기반으로 비교
값으로 비교하면 인스턴스를 마치 원시 데이터 타입의 값처럼 비교함
따라서, 클래스 인스턴스가 값으로 비교되는지, 참조로 비교되는지 더는 불확실하지 않게됨
사용하기 쉽고 다루기 편함
예제
// 레코드를 표현할 case class 정의(Flight 데이터 타입의 Dataset) case class Flight( DEST_COUNTRY_NAME:String, ORIGIN_COUNTRY_NAME: String, count: BigInt )
Scala
복사
// 데이터를 읽어야 DataFrame이 반환 val flightsDF = spark.read.parquet("./2010-summary.parquet") // as 메서드로 Flight 데이터 타입으로 변환 val flights = flightsDF.as[Flight]
Scala
복사

액션

Dataset과 DataFrame에 collect, take, count 같은 액션을 적용할 수 있음
flights.show(2)
Scala
복사
케이스 클래스에 실제로 접근할 때 어떠한 데이터 타입도 필요하지 않음
flights.first.DEST_COUNTRY_NAME // United States
Scala
복사
case class 속성명을 지정하면 속성에 맞는 값과 데이터 타입 모두를 반환함

트랜스포메이션

Dataset 트랜스포메이션은 DataFrame과 동일
Dataset을 사용하면 원형의 JVM 데이터 타입을 다루므로 DataFrame만 사용해서 트랜스포메이션을 수행하는 것보다 좀 더 복잡하고 강력한 데이터 타입으로 수행 가능

1. 필터링

필터링은 단순한 트랜스포메이션
ex) 출발지가 도착지와 동일한지 반환하는 함수 생성
// Flight 클래스를 파라미터로 사용해 불리언값을 반환하는 함수 def originIsDestination(flight_row:Flight):Boolean = { return flight_row.ORIGIN_COUNTRY_NAME == flight_row.DEST_COUNTRY_NAME } // 위에서 적용한 함수 테슽 flights.filter(flight_row => originIsDestination(flight_row)).first() flights.collect().filter(flight_row => originIsDestination(flight_row)).first()
Scala
복사
위 함수는 사용자 정의 함수가 아닌 일반 함수임
따라서 모든 로우를 평가하므로 매우 많은 자원을 사용
단순 필터라면 SQL표현식을 사용하는 것이 좋음

2. 매핑

특정 값을 다른 값으로 매핑
ex) 로우의 특정 컬럼값을 추출하는 것
// 목적지 컬럼을 추출하여 매핑 val destinations = flights.map(f => f.DEST_COUNTRY_NAME) // 결과값을 모아 문자열 타입의 배열로 반환 val localDestinations = destinations.take(5)
Scala
복사
스파크는 결과로 반환할 JVM 데이터 타입을 알고 있기에 컴파일 타임에 데이터 타입의 유효성을 검사할 수 있음

3. 조인

joinWith 메서드 제공
각 컬럼은 단일 Dataset이므로 Dataset 객체를 컬럼처럼 다룰 수 있음
따라서 조인 수행 시 더 많은 정보를 유지할 수 있고, 고급 맵이나 필터처럼 정교하게 데이터를 다룰 수 있다.
joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)] articles .joinWith(views, articles("id") === views("articleId"), "left") .map { case (a, null) => AuthorViews(a.author, 0) case (a,v) => AuthorViews(a.author, v.viewCount) }
Scala
복사

4. 그룹화와 집계

기본 표준을 따르므로 groupBy, rollup, cube 메서드를 사용할 수 있음
하지만 Dataset 대신 DataFrame을 반환하므로 데이터 타입 정보를 잃음
근데 유지할 방법이 있긴 함 → groupByKey
groupByKey 메서드: Dataset의 특정 키를 기준으로 그룹화하고 형식화된 Dataset 반환
groupByKey 메서드의 파라미터로 컬럼명 대신 함수를 사용하여 유연성을 얻을 수 있음
하지만 스파크는 함수와 JVM 데이터 타입을 최적화할 수 없으므로 트레이드오프가 발생
flights.groupBy('DEST_COUNTRY_NAME').count()
Scala
복사
flights.groupByKey(x => x.DEST_COUNTRY_NAME).count()
Scala
복사

샘플 데이터 세트 생성

샘플 데이터세트를 생성하는 간단하고 동적인 방법 중 하나는 SparkSession 인스턴스를 사용하는 것입니다.
다음 스칼라 객체를 동적으로 생성하여 사용자의 고유 ID, 임의 생성된 사용자 이름 문자열(unname), 그리고 서버 또는 서비스 사용 시간(분)을 포함하는 usage 필드를 가집니다:
import scala.util.Random._ // 데이터세트를 위한 케이스 클래스 case class Usage(uid: Int, uname: String, usage: Int) val r = new scala.util.Random(42) // 스칼라 Usage 클래스의 1000개 인스턴스 생성 // 데이터를 즉시 생성 val data = for (i <- 0 to 1000) yield { Usage(i, "user-" + r.alphanumeric.take(5).mkString(","), r.nextInt(1000)) } // Usage 형태의 데이터세트 생성 val dsUsage = spark.createDataset(data) dsUsage.show(10)
Scala
복사
참고 : 자바에서는 명시적 인코더를 사용해야 한다 ⇒ 스칼라에서 스파크는 암시적으로 처리
무작위 시드 알고리즘이 다를 수 있기 때문에 스칼라와 자바 사이에 생성된 데이터세트가 다를 것이다. 따라서 스칼라와 자바의 쿼리 결과는 서로 다르다.

샘플 데이터 변환

데이터세트는 도메인별 객체의 강력하게 정형화된 컬렉션
객체는 함수적인 또는 관계적인 연산을 사용하여 병렬로 변환 가능
변환의 예 : map(), reduce(),filter(), select(), aggregate()
⇒ 위의 병렬 변환을 람다, 클로저 또는 함수를 인수로 사용하고 결과를 반환할 수 있다
스칼라는 함수형 프로그래밍 언어이고, 최근에는 람다, 함수형 인수, 클로저가 자바에도 추가되었다.
스파크에서 고차 함수를 몇 가지 사용해보고 앞에서 생성한 샘플 데이터와 함께 함수형 프로그래밍 구조를 사용해보자

고차 함수 및 함수형 프로그래밍

filter 메서드를 사용하여 사용량이 900분을 초과하는 dsUsage 데이터세트의 모든 사용자를 반환하는 예제
import org.apache.spark.sql.functions._ // filter() 함수에 대한 인수로 함수 표현식을 사용하는 것 dsUsage .filter(d => d.usage > 900) .orderBy(desc("usage")) .show(5, false) // => filter 방법에 대한 인수로 람다식 {d.usage > 900}을 사용 // 함수를 정의하고 해당 함수를 filter 함수의 인수로 제공 def filterWithUsage(u: Usage) = u.usage > 900 dsUsage .filter(filterWithUsage(_)) .orderBy(desc("usage")) .show(5) // => def filterWithUsage(u: Usage) = u.dump > 900을 사용하여 스칼라 함수를 정의
Scala
복사
두 경우 모두 filter 함수는 분산된 데이터 세트에서 Usage 객체의 각 행에 람다 식을 적용하거나 함수를 실행하여 값이 참인 행에 대해 새로운 Usage 데이터 세트를 반환합니다.
모든 람다 또는 함수 인수가 불린값으로 평가될 필요는 없으며, 계산된 값도 반환될 수 있습니다.
예로 들어, 아래 예제의 목표는 고차 함수 map()을 사용하여 특정 임계치를 초과하여 사용한 사용자에 대한 사용 비용을 분당 특별 가격으로 제공하는 것입니다.
// if-then-else 람다식을 사용하여 값을 계산 dsUsage .map { u => if (u.usage > 750) u.usage * 0.1 else u.usage * 0.50 } .show(5, false) // 사용량을 계산하는 함수를 정의 def computeCostUsage(usage: Int): Double = { if (usage > 750) usage * 0.15 else usage * 0.50 } // map()에 인자로서 함수를 사용 dsUsage .map(u => computeCostUsage(u.usage)) .show(5, false)
Scala
복사
계산값이 어떤 사용자와 연관되어 있는지 알 수 없습니다. 이 정보를 어떻게 얻을 수 있을까요?
1.
UsageCost라는 스칼라 케이스 클래스 또는 자바빈 클래스를 생성하기 위해 추가 필드를 사용합니다.
2.
비용을 계산하는 함수를 정의하여 map() 함수에 사용합니다.
// cost 필드를 포함한 새로운 케이스 클래스 생성 case class UsageCost(uid: Int, uname: String, usage: Int, cost: Double) // Usage를 인수로 사용하여 사용 비용을 계산 // 새로운 UsageCost 객체 반환 def computeUserCostUsage(u: Usage): UsageCost = { val v = if (u.usage > 750) u.usage * 0.15 else u.usage * 0.50 UsageCost(u.uid, u.uname, u.usage, v) } // 기존 데이터세트에 map()을 사용 dsUsage .map(u => computeUserCostUsage(u)) .show(5)
Scala
복사
축하드립니다. 만약 이 코드가 다 돌아갔다면 map() 변환의 함수에 의해 계산된 새로운 열, 비용을 가진 변환된 데이터 세트와 다른 모든 열을 가지고 있게 됩니다

데이터 프레임을 데이터세트로 변환

데이터 프레임을 데이터 세트로 변환하여 쿼리 및 구조의 강력한 유형을 확인할 수 있습니다.
기존 데이터 프레임 df를 SomeCaseClass 유형의 데이터 집합으로 변환하려면 df.as[SomeCaseClass] 표기법을 사용하면 됩니다.
val bloggersDS = spark .read .format("json") .option("path", "./blogs.json") .load() .as[Blogger]
Scala
복사
spark.read.format("json")은 DataFrame<Row>을 반환합니다. ⇒ 스칼라에서 Dataset[Row]의 형식 별칭입니다.
.as[Blogger]를 사용하면 스파크가 인코더를 사용하여, 스파크의 내부 메모리 표현에서 JVM Blogger 객체로 직렬화/역직렬화하도록 지시할 수 있습니다.

데이터세트 및 데이터 프레임을 위한 메모리 관리

스파크가 통합된 고차원의 API의 일부인 데이터세트 구성을 수용하기 위해 어떻게 메모리를 관리하는지. 데이터세트 사용과 관련된 일부 비용을 고려하였고, 이러한 비용을 줄이는 방법을 알아보자.
스파크는 집중적인 인메모리 분산 빅데이터 엔진입니다. 메모리를 효율적으로 사용하는 것은 실행 속도에 큰 영향을 미칩니다. 스파크의 메모리 사용은 엄청나게 진화해왔습니다.
1.
Spark 1.0에서는 메모리 스토리지, 직렬화 및 역직렬화에 RDD 기반 자바 객체를 사용했습니다.
2.
스파크 1.X에 도입된 프로젝트 텅스텐의 두드러진 특징 중 하나는, 오프셋과 포인터를 사용하여 오프 힙 메모리에 데이터세트와 데이터 프레임을 배치하는 새로운 내부 행 기반 형식입니다. 스파크는 인코더라고 불리는 효율적인 메커니즘을 사용하여 JVM과 내부 텅스텐 포맷 사이를 직렬화하고 역직렬화합니다. 오프 힙에 메모리를 직접 할당한다는 것은 스파크가 GC에 의해 받는 영향을 줄일 수 있다는 것을 의미합니다.
3.
스파크 2.x는 전체 단계 코드 생성 및 벡터화된 칼럼 기반 메모리 레이아웃을 특징으로 하는 2세대 텅스텐 엔진을 도입했습니다. 최신 컴파일러의 아이디어와 기술을 기반으로 제작된 이 새로운 버전은 빠른 병렬 데이터 액세스를 위해 최신 CPU 및 캐시 아키텍처의 '단일 명령, 다중 데이터(SIMD)' 접근 방식을 활용하고 있습니다.

데이터 집합 인코더

인코더오프 힙 메모리의 데이터를 스파크의 내부 텅스텐 형식에서 JVM 자바 오브젝트로 변환합니다. 이는 데이터세트 객체를 직렬화하고 역직렬화함으로써 가능합니다. 스파크의 내부 형식에서 원시 데이터 유형을 포함한 JVM 객체로 변환됩니다. 예를 들어, Encoder[T]는 스파크의 내부 텅스텐 형식에서 Dataset[T]로 변환됩니다.
스파크는 내장된 지원 기능으로 원시 유형(예: String, Integer, Long), 스칼라 케이스 클래스 및 자바 빈에 대한 인코더를 자동으로 생성할 수 있습니다. 스파크 인코더는 자바와 크리오 직렬화, 역직렬화에 비해 빠릅니다.
인코더를 명시적으로 생성할 수도 있습니다.
Encoder<UsageCost> usageCostEncoder = Encoders.bean(UsageCost.class);
Scala
복사
스칼라의 경우에는 스파크가 이러한 효율적인 변환을 위해 바이트 코드를 자동으로 생성한다.

스파크의 내부 형식과 자바 객체 형식 비교

자바 객체에는 헤더 정보, 해시 코드, 유니코드 정보 등 큰 오버헤드가 존재합니다.
'abcd'와 같은 간단한 자바 문자열도 예상하는 4바이트 대신 48바이트의 스토리지를 사용합니다.
⇒ MyClass(Int, String, String) 객체를 생성할 때 발생하는 오버헤드
반면, 스파크는 데이터세트 또는 데이터 프레임을 위한 JVM 기반 객체를 생성하는 대신 오프 힙 자바 메모리를 할당하여 데이터를 레이아웃하고, 인코더를 사용하여 데이터를 메모리 내 표현에서 JVM 객체로 변환합니다.
JVM 객체 MyClass(Int, String, String)가 내부적으로 저장되는 방식

직렬화 및 역직렬화

데이터가 인접한 방식으로 저장되고 포인터 산술과 오프셋을 통해 액세스할 수 있을 때, 인코더는 데이터를 빠르게 직렬화하거나 역직렬화할 수 있다 → 인코더가 스파크의 내부 텅스텐 이진 형식에서 JVM 객체로 직렬화하고 역직렬화 하는 방법
JVM 객체 - 직렬화, 역직렬화에서의 비효율적인 느림
JVM 객체 MyClass를 스파크 클러스터의 노드 간에 공유해야 하는 경우, 송신자는 이를 바이트 배열로 직렬화하고 수신자는 이를 다시 MyClass 유형의 JVM 객체로 직렬화한다.
JVM에는 자체 자바 직렬화기와 역직렬화기가 내장되어 있지만, 힙 메모리에서 JVM에 의해 생성된 자바 객체가 비대하기 때문에 비효율적이고 느리다.
데이터세트 인코더에서는 다음과 같은 사항으로 개선됩니다
스파크의 내부 tungsten 이진 형식은 자바 힙 메모리에 객체를 저장하며, 크기가 작아 공간을 적게 차지합니다.
인코더는 메모리 주소와 오프셋이 있는 간단한 포인터 계산을 사용하여 메모리를 가로질러 빠르게 직렬화할 수 있습니다.
수신 측에서 인코더는 스파크의 내부 표현을 사용하여 이진 표현을 빠르게 역직렬화할 수 있기 때문에 JVM의 가비지 컬렉션의 일시 중지로 인한 방해를 받지 않습니다.

데이터세트 사용 비용

데이터세트의 장점만 존재하는 것이 아니다.
람다 또는 함수형 인수를 사용하는 filter(), map() 또는 flatMap()과 같은 고차 함수에 전달될 때, 스파크 내부의 텅스텐 형식에서 JVM 객체로 역직렬화하는 비용이 발생
스파크의 인코더가 도입되기 전에 사용된 다른 직렬화 방법들과 비교하여 그 비용은 경미하고 감내할 만합니다. 그러나 대규모 데이터 세트와 많은 쿼리에 걸쳐 이러한 비용이 발생하며 성능에 영향을 미칠 수 있습니다.

비용 절감 전략

위와 같은 과도한 직렬화 및 역직렬화를 완화하기 위한 전략들은 아래와 같습니다.
1.
쿼리에서 도메인 특화 언어(DSL) 표현을 사용하고 람다를 고차 함수에 대한 인수로 과도하게 사용하여 익명성을 높이는 것을 피하는 것입니다.
람다는 런타임까지 명확하지 않으므로, 카탈리스트 옵티마이저에서 익명으로 처리됩니다. 이는 사용자가 수행하는 작업을 효율적으로 식별할 수 없어 (스파크에게 무엇을 하라고 지시하지 않음) 쿼리 최적화가 불가능합니다.
2.
직렬화 및 역직렬화가 최소화되도록 쿼리를 함께 연결하는 것
a.
예제
스칼라 케이스 클래스로 정의되는 Person 데이터세트
case class Person(id: Integer, firstName: String, middleName: String, lastName: String, gender: String, birthDate: String, ssn: String, salary: String)
Scala
복사
함수형 프로그래밍을 사용하여 이 데이터세트에 쿼리의 세트를 실행한다면?
직렬화 및 역직렬화의 비용을 반복적으로 발생시키는 방식으로 쿼리를 비효율적으로 구성하는 경우
람다에서 DSL로 이동할 때마다(filter($"salary” > 8000)) Person JVM 객체를 직렬화하고 역직렬화하는 비용이 발생된다.
DSL만 사용하고 람다를 사용하지 않은 경우
전체 합성 및 체인 쿼리에 대해서는 직렬화/역직렬화가 필요하지 않다

Reference

spylon-kernel
vericast
DSL
러닝스파크 6장
Garbage Collection(가비지 컬렉션)이란? 
프로그램을 개발 하다 보면 유효하지 않은 메모리인 가바지(Garbage)가 발생하게 된다. C언어를 이용하면 free()라는 함수를 통해 직접 메모리를 해제해주어야 한다. 하지만 Java나 Kotlin을 이용해 개발을 하다 보면 개발자가 메모리를 직접 해제해주는 일이 없다. 그 이유는 JVM의 가비지 컬렉터가 불필요한 메모리를 알아서 정리해주기 때문이다