Search

[러닝 스파크 7장] 스파크 애플리케이션의 최적화 및 튜닝

7장. 스파크 애플리케이션의 최적화 및 튜닝

효율적으로 스파크를 최적화 및 튜닝하기

아파치 스파크 설정 확인 및 세팅

1.
설정 파일 확인하기
spark 설치 폴더에서 conf 폴더의 하위 .template 파일을 수정 후 .template 부분을 지우고 저장하면 스파크에 새로운 설정값을 사용한다고 알려주게 된다.
2.
스파크 애플리케이션 안에 지정을 해주거나,
def main(args: Array[String]) = { val spark = SparkSession.builder .config("spark.sql.shuffle.partitions", 5) .config("spark.executor.memory", "2g") .master("local[*]") .appName("SparkConfig") .getOrCreate() }
Scala
복사
애플리케이션을 spark-submit으로 제출할 때 —conf 옵션을 써서 직접 설정할 수 있다.
spark-submit --conf spark.sql.shuffle.partitions=5 --conf "spark.executor.memory=2g" --class [main이 들어가 있는 파일 명]
Bash
복사
3.
스파크 셸 이용
다음의 코드를 이용하여 스파크가 로컬 모드로 실행된 호스트의 스파크 설정을 확인할 수 있다.
val mconf = spark.conf.getAll for (k <- mconf.keySet) { println(s"${k} -> ${mconf(k)}\n") }
Scala
복사
현재의 설정을 변경하기 위해서는 변경 가능한 값인지 확인해야 하고, 이는 spark.conf.isModifiable(”<설정필드 이름>”)을 호출하여 확인할 수 있다.
true면 변경 가능하고, false는 변경 불가능하다.
# 설정 변경 가능 여부 확인하기 spark.conf.isModifiable("spark.sql.shuffle.partitions")
Bash
복사
모든 수정 가능한 설정값은 API를 통해 새로 설정할 수 있다.
spark.conf.get("spark.sql.shuffle.partitions") # 200 spark.conf.set("spark.sql.shuffle.partitions", 5) # 값 변경
Bash
복사
위에서 언급한 세 가지의 방식에는 우선 순위가 존재한다. spark-default.conf에 정의된 값이나 플래그 > spark-submit의 명령 행 설정 > 스파크 애플리케이션에서 SparkSession을 통해 설정된 값 순서대로 읽히게 된다. 이 과정에서 중복된 설정은 초기화되며 뒤의 값으로 대체된다.

대규모 워크로드를 위한 스파크 규모 확장

자원 사용의 최적화를 어떻게 조정하는지, 태스크를 어떻게 병렬적으로 실행시키는지, 다수의 태스크에 의한 병목 현상을 어떻게 회피하는지 등을 설명
정적/동적 자원 할당
정적 자원 할당
태스크가 오랫동안 기다리는 상황이 되더라도 추가적인 자원을 할당할 수 없게 된다.
spark.executor.memory 2g
Bash
복사
동적 자원 할당
스파크 드라이버는 거대한 워크로드가 밀려오거나 빠져나갈 때마다 그 요청에 맞춰 컴퓨팅 자원을 더 할당하거나 줄이도록 요청할 수 있다.
스파크가 자원 사용을 더 효율적으로 하게 해 주며 사용하지 않는 executer를 해제하고 필요할 때 새롭게 띄우도록 한다.
사용가능한 예시로는 스트리밍, 온디맨드 데이터 분석 등의 상황이 있다.
spark.dynamicAllocation.enabled true spark.dynamicAllocation.minExecutors 2 spark.dynamicAllocation.schedulerBacklogTimeout 1m spark.dynamicAllocation.maxExecutors 20 spark.dynamicAllocation.executorIdleTimeout 2min
Bash
복사
설정 해설
스파크 이그제큐터의 메모리와 셔플 서비스 설정
각 이그제큐터에서 사용 가능한 메모리의 양은 spark.executor.memory에 의해 제어되고, 이 메모리는 실행 메모리, 저장 메모리, 예비 메모리의 세 부분으로 나뉘어진다.
예비 메모리에는 OOM(Out Of Memory Error) 방지를 위하여 300MB가 할당됨.
일반적으로 나머지 공간은 실행 메모리 60%, 저장 메모리 40%에 할당된다.
실행 메모리는 스파크의 셔플, 조인, 정렬, 집계 등에 사용되고, 저장 메모리는 사용자 데이터 구조를 캐싱하고 데이터 프레임에서 온 파티션들을 저장하는 데에 주로 쓰인다.
책에 ‘맵과 셔플 작업 중의 I/O를 조절할 수 있는 스파크 설정’과 관련된 표가 있으므로 필요할 때 참고해보도록 하자.
스파크 병렬성 최대화
파티션(partition)
디스크에 연속된 위치에 데이터를 조각이나 블록들의 모음으로 나눠서 저장하는 방법.
이 데이터 모음들은 병렬적으로 또 독립적으로 읽어서 처리가 가능하며 필요하면 하나의 프로세스 안에서 멀티 스레딩으로 처리도 가능하다.
스파크는 각 코어에 태스크를 할당하고, 태스크에 스레드를 스케줄링하고, 각 태스크는 개별 파티션을 처리한다.
자원 사용을 최적화하고 병렬성을 최적화하기 위해서는 이그제큐터에 할당된 코어의 개수만큼 파티션들이 최소한으로 할당되는 것이다. (즉, 파티션의 개수가 코어의 개수를 넘겨서 더 많은 코어가 일을 할 수 있도록 하는 것이다.
파티션의 수는 클러스터의 코어 개수보다 서너배 더 많은 파티션을 사용하는 것이 좋다.
너무 적으면 클러스터를 충분히 활용할 수 없고, 메모리 문제가 발생할 수 있다.
어느정도는 좀 많아도 무방하나, 너무 많이 늘리면 태스크 관리 작업에 병목 현상이 발생한다.
파티션의 생성
스파크의 태스크들은 데이터를 디스크에서 읽어 메모리로 올리면서 파티션 단위로 처리한다.
디스크의 데이터는 저장장치에 따라 조각이나 연속된 파일 블록으로 존재하며, 이런 블록들의 연속된 모음이 하나의 파티션을 구성하게 된다.
스파크에서 한 파티션의 크기는 spark.sql.files.maxPartitionBytes 에 따라 결정되고, default는 128MB이다.
이 크기를 줄이게 되면 작은 파티션이 많아지면서 디스크 I/O양이 급증하여 성능 저하를 일으키면서 분산 파일 시스템이 느려지는 ‘작은 파일 문제(small file problem)’이 발생하게 된다.
셔플 파티션(shuffle partitions)
groupBy()나 join()과 같이 넓은 트랜스포메이션으로 알려진 작업 (여러 개의 파티션을 참고해야 하는 연산으로 이해) 중에 생성되는 파티션
네트워크과 디스크 I/O 모두를 사용한다. 이런 작업 중에는 spark.local.directory에 지정된 이그제큐터의 로컬 디렉터리에 중간 결과를 작성하며, SSD 디스크를 장착해 두면 이 부분의 성능을 상당히 올릴 수 있다.
기본적으로 셔플 파티션의 개수는 spark.sql.shuffle.partitions에 50으로 지정되어 있다.
이 기본값은 작은 규모나 스트리밍 워크로드에는 너무 높을 수 있기 때문에 하고자하는 일에 따라 조정해주는 것이 좋다.

데이터 캐싱과 영속화

DataFrame.cache()

cache() 는 허용하는 메모리 수준만큼 스파크 이그제큐터들의 메모리에 읽은 파티션을 최대한 저장한다. 하지만 일부만을 캐싱할 수 있는 데이터프레임과는 달리 파티션은 개별 파티션의 전체를 캐싱해야 한다.
예제 - 교재의 예시를 pyspark에서 실행
처음 count()에서 실제로 캐시를 수행하게 되고, 두 번째 count()에서는 캐시된 df를 사용하게 되어 실행속도가 매우 빨라졌다. (약 14배)

DataFrame.persist()

persist(StorageLevel.LEVEL)은 StorageLevel에 따라 데이터가 어떤 방식으로 캐시될 것인지 제어할 수 있다. - 저장 위치과 직렬화 여부를 결정
StorageLevel
설명
MEMORY_ONLY
데이터가 곧바로 객체 형태로 메모리에 저장된다.
MEMORY_ONLY_SER
데이터가 직렬화되어 용량이 최소화된 바이트 배열 형태로 메모리에 저장된다. 사용 시 역직렬화를 위한 비용이 소모된다.
MEMORY_AND_DISK
데이터가 곧바로 객체 형태로 메모리에 저장되지만 부족한 경우 직렬화되어 디스크에 저장된다.
DISK_ONLY
데이터가 직렬화되어 디스크에 저장된다.
OFF_HEAP
데이터가 오프힙(off-heap) 메모리에 저장된다. 오프힙 메모리는 스파크에서 저장 및 쿼리 실행에 사용된다.
MEMORY_AND_DISK_SER
MEMORY_AND_DISK와 비슷하지만 메모리에 저장되는 데이터가 직렬화된다. (디스크에 저장되는 데이터는 항상 직렬화된다,)
각 StorageLevel은 OFF_HEAP을 제외하고 ‘레벨이름_2’ 형태의 옵션이 존재하는데 이는 서로 다른 스파크 이그제큐터에 복제해서 두 벌이 저장되는 것을 의미한다. 이 옵션을 사용하면 캐싱에 자원을 더 소모하지만 데이터를 두 군데에 저장하기 때문에 장애 상황에 대처할 수 있다.
예제
DISK_ONLY option으로 persist()를 한 결과 메모리가 아닌 디스크에 df가 저장된 것을 볼 수 있다.
persist(MEMORY_ONLY)는 cache()와 같은 역할을 한다.
캐시된 데이터를 비우고 싶을 때는 df.unpersist()를 호출하면 된다.

캐시나 영속화를 사용해야 할 때

큰 데이터세트에 쿼리나 트랜스포메이션으로 반복적으로 접근해야 하는 시나리오일 때
반복적인 머신러닝 학습을 위해 계속 접근해야 하는 데이터 프레임들
ETL이나 데이터 파이프라인 구축 시 빈도 높은 트랜스포메이션 연산으로 자주 접근해야 하는 데이터 프레임들

캐시나 영속화를 쓰지 말아야 할 때

메모리 캐시는 사용하는 StorageLevel에 따라 직렬화나 역직렬화에서 비용을 발생시킬 수 있으므로 주의깊게 사용해야 한다. 다음의 경우에는 캐시나 영속화가 부적절한 경우이다.
데이터 프레임이 메모리에 들어가기에 너무 클 때
크기에 상관없이 자주 쓰지 않는 데이터 프레임에 대해 비용이 크지 않은 트랜스포메이션을 수행할 때

스파크 조인의 종류

조인(Join)
테이블이나 데이터 프레임 형태로 되어 있는 두 종류의 데이터세트를 공통적으로 일치하는 키를 기준으로 병합하는 연산
관계형 데이터베이스와 유사하게 스파크 데이터 프레임, 데이터세트 API, 스파크 SQL은 inner / outer / left / right join 등의 여러 종류의 조인 트렌스포메이션을 제공한다.
조인 연산들은 스파크 이그제큐터들 사이에 방대한 데이터 이동을 일으킨다.
CORE SPARK JOIN
RDD 형태의 조인.
DataFrame과 DataSet을 통한 Spark coding이 훨씬 보편적이고, 추천되는 방식이기도 한 지금에 와서는, 직접 RDD를 활용해 low level의 API를 사용할 이유는 특별히 목적이 있는 경우를 제외하고는 많지 않을 것이다.
하지만, 스파크의 모든 워크로드는 저수준 기능을 사용하는 기초적인 형태로 컴파일되므로 이 구조를 이해해두는 것이 효율적 활용에 도움이 될 수 있다.
일반적으로 JOIN은 동일한 키의 데이터가 동일한 파티션 내에 있어야 하므로 비용이 비싼 작업이다.RDD의 Known Partitionor 를 갖고 있지 않다면 이를 공유하도록 셔플이 필요하고, 이를 통해 동일한 키의 데이터는 동일한 파티션에 위치하게 된다.
조인의 비용은 키의 개수와 올바른 파티션으로 위치하기 위해 움직이는 규모에 비례해서 커진다.
스파크의 조인 전략
스파크는 이그제큐터 간에 데이터를 교환, 이동, 정렬, 그룹화, 병합하는 방식에 따라 다섯 종류의 조인 전략을 가진다.
브로드캐스트 해시 조인(Broadcast Hash Join, BHJ)
셔플 해시 조인(Shuffle Hash Join, SHJ)
셔플 소트 머지 조인(Shuffle Sort Merge Join, SMJ)
브로드캐스트 네스티드 루프 조인(Broadcast Nested Loop Join, BNLJ)
셔플 복제 네스티드 루프 조인(Crtesian Product Join)
본 교재에서는 BHJ와 SMJ만을 다룬다.

브로드캐스트 해시 조인(BHJ) (= 맵사이드 조인, map-side-only join)

이상적으로 데이터 이동이 거의 필요 없도록 한 쪽은 작고(드라이버와 이그제큐터 메모리에 들어갈 사이즈) 다른 쪽은 큰 두 종류의 데이터를 사용하여 특정 조건이나 칼럼 기준으로 조인한다.
더 작은 쪽의 데이터가 드라이버에 의해 모든 스파크 이그제큐터에 복사되어서 뿌려지고, 이어서 각 이그제큐터에 나뉘어 있는 큰 데이터와 조인된다.
이 전략은 큰 데이터 교환이 이루어지지 않게 한다.
일반적으로 작은 쪽의 데이터가 10MB 이하일 때 브로드캐스트 조인을 사용한다. 이 설정은 spark.sql.autoBroadcastJoinThreshold 에 의해 지정되며, 각 이그제큐터와 드라이버의 설정에 따라 높이거나 낮출 수 있다.
일반적으로 작은 쪽의 데이터가 이 설정값보다 낮은 경우 스파크는 자동으로 BHJ을 시도한다.

브로드캐스트 해시 조인을 사용하면 좋은 경우

두 데이터 프레임에 공통적인 키들이 존재하고, 한 쪽이 가지고 있는 정보가 적으며 양쪽의 뷰를 병합하는 경우
예제) 전 세계의 축구 선수들에 대한 정보가 있는 큰 데이터세트인 playersDF와 선수들이 활동하는 축구 팀들의 정보가 있는 더 작은 데이터세트인 clubDF를 조인하는 상황 (아래 scala 예제)
val joinedDF = playerDF.join(broadcast(clubDF), "key1 == key2")
Scala
복사
양쪽 데이터세트의 각 키가 스파크에서 동일한 파티션 안에 해시될 때
한 데이터가 다른 쪽보다 많이 작은 경우
정렬되지 않은 키들을 기준으로 두 데이터를 결합하면서 equi-join을 수행할 때
더 작은 쪽의 데이터가 모든 스파크 이그제큐터에 브로드캐스트될 때 발생하는 과도한 네트워크 대역폭이나 OOM 오류에 대해 걱정할 필요가 없는 경우
Equi-Join vs Non-Equi-Join
Equi-Join: 두 테이블에서 공통적으로 존재하는 컬럼의 값이 일치되는 행을 연결해서 결과를 생성하는 것 (= 조건을 검사하는 일반적인 조인)
Non-Equi-Join: = 연산자 이외의 비교 연산자를 이용하여 조인을 수행하는 것

셔플 소트 머지 조인 (SMJ)

정렬 가능하고 겹치지 않으면서 공통 파티션에 저장 가능한 공통 키를 기반으로 큰 두 종류의 데이터세트를 합칠 수 있는 효과적인 방법
각 데이터세트의 동일 키를 가진 데이터세트의 모든 레코드가 동일 이그제큐터의 동일 파티션에 존재해야 한다. 즉, 데이터가 이그제큐터 사이에 교환이 되어야 하거나 공통 위치에 존재해야 함을 의미한다.
1. 정렬 단계2. 머지 단계 로 나뉘어진다.
정렬 단계 : 각 데이터를 조인 키에 따라 정렬
머지 단계 : 각 데이터세트에서 키 순서대로 데이터를 순회하며 키가 일치하는 로우끼리 병합
기본적으로 SHJ는 spark.sql.join.preferSortMergeJoin 설정에 의해 활성화된다.
또한, spark.sql.autoBroadcastJoinThreshold 가 -1로 설정된 경우 스파크는 기본적으로 SMJ를 시도한다.
SMJ 예제 - 교재의 scala 예제를 pyspark로 변경
필요한 패키지 불러오기 및 환경 설정, dictionary 만들기
import randon # 아래의 설정으로 자동으로 SMJ가 시행됨. spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1") # state, item column 생성을 위한 dict states_dict = {0:"AZ", 1:"CO", 2:"CA", 3:"TX", 4:"NY", 5:"MI" } items_dict = {0:"SKU-0", 1:"SKU-1", 2:"SKU-2", 3:"SKU-3", 4:"SKU-4", 5:"SKU-5"}
Python
복사
usersDF 생성
usersDF = spark.range(1 * 10000000).rdd.map(lambda x: (x[0], "user_"+str(x[0]), "user_"+str(x[0])+"@databricks.com", states_dict[random.choice(range(6))]) ).toDF(["uid", "login", "email", "user_state"])
Python
복사
ordersDF 생성
ordersDF = spark.range(1 * 10000000).rdd.map(lambda x: (x[0], x[0], random.choice(range(10001)), 10 * x[0] * 0.2, states_dict[random.choice(range(6))], items_dict[random.choice(range(6))]) ).toDF(["transaction_id", "quantity", "users_id", "amount", "state", "items"
Python
복사
join
usersOrdersDF = ordersDF.join(usersDF, ordersDF.users_id == usersDF.uid)
Python
복사
실행 계획 확인
dataframe에 explain() 메서드를 입력하면 어떤 조인이 사용되는지 물리적 계획을 확인할 수 있다.
아래의 예제에서는 기대한 것 처럼 SortMergeJoin이 사용된 것을 확인할 수 있다.
sparkUI를 이용하여 DAG를 살펴보면 총 3단계의 스테이지로 전체 작업이 이루어지고 (앞에 두 개는 데이터 생성, 뒤에 한 개가 Join 작업) 마지막 스테이지에서 Exchange와 Sort 작업이 병합 직접에 실행되는 것을 알 수 있다.
Exchange는 이그제큐터 간에 네트워크상으로 파티션이 셔플되어야 하는 비싼 작업이다.
usersOrdersDF.explain()
Python
복사
# 출력 결과 == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- SortMergeJoin [users_id#893L], [uid#864L], Inner :- Sort [users_id#893L ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(users_id#893L, 5), ENSURE_REQUIREMENTS, [plan_id=902] : +- Filter isnotnull(users_id#893L) : +- Scan ExistingRDD[transaction_id#891L,quantity#892L,users_id#893L,amount#894,state#895,items#896] +- Sort [uid#864L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(uid#864L, 5), ENSURE_REQUIREMENTS, [plan_id=903] +- Filter isnotnull(uid#864L) +- Scan ExistingRDD[uid#864L,login#865,email#866,user_state#867]
Python
복사

SMJ(Shuffle Merge Join) 최적화 - 버켓팅(Bucketing)

SMJ를 빈번하게 사용해야 하는 상황이라면 공통의 정렬된 키나 칼럼을 위한 파티션된 버킷을 만들어 Exchange 단계를 없앨 수 있다.
이 방식으로 데이터를 재구성하면 Exchange 단계를 생략하고 WholeStageCodegen으로 넘어가므로 성능을 올릴 수 있다.
예제 코드
usersDF와 ordersDF를 각각 uid와 users_id로 정렬 후 버킷을 만들어 스파크 테이블에 파케이 포맷으로 저장 후에 다시 불러들인 후 조인을 시키는 예제
테이블을 정렬된 상태로 저장했기 때문에 SortMergeJoin 동안 정렬할 필요가 없고, 작업 DAG를 살펴보면 Exchange를 건너뛰고 바로 WholeStageCodegen 단계로 넘어갔음을 볼 수 있다.
explain()을 통해 물리 계획(Physical Plan)을 살펴보았을 때도 Exchange 연산이 실행되지 않았음을 알 수 있다.
# usersDF bucketing usersDF.orderBy(col("uid").asc()) \ .write.format("parquet") \ .bucketBy(8, "uid") \ .mode("overwrite") \ .saveAsTable("UsersTbl") # ordersDF bucketing ordersDF.orderBy(col("users_id").asc()) \ .write.format("parquet") \ .bucketBy(8, "users_id") \ .mode("overwrite") \ .saveAsTable("OrdersTbl") # table caching spark.sql("CACHE TABLE UsersTbl") spark.sql("CACHE TABLE OrdersTbl") # reread usersBucketDF = spark.table("UsersTbl") ordersBucketDF = spark.table("OrdersTbl") # join joinUsersOrdersBucketDF = ordersBucketDF.join(usersBucketDF, ordersBucketDF.users_id == usersBucketDF.uid)
Python
복사
# joinUsersOrdersBucketDF.explain() 실행 결과 == Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(3) SortMergeJoin [users_id#268], [uid#129], Inner :- *(1) Sort [users_id#268 ASC NULLS FIRST], false, 0 : +- *(1) Filter isnotnull(users_id#268) : +- Scan In-memory table OrdersTbl [transaction_id#266L, quantity#267L, users_id#268, amount#269, state#270, items#271], [isnotnull(users_id#268)] : +- InMemoryRelation [transaction_id#266L, quantity#267L, users_id#268, amount#269, state#270, items#271], StorageLevel(disk, memory, deserialized, 1 replicas) : +- *(1) ColumnarToRow : +- FileScan parquet default.orderstbl[transaction_id#266L,quantity#267L,users_id#268,amount#269,state#270,items#271] Batched: true, Bucketed: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yschoi/Library/CloudStorage/Dropbox/yunseo/development/BOA..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<transaction_id:bigint,quantity:bigint,users_id:string,amount:double,state:string,items:str..., SelectedBucketsCount: 8 out of 8 +- *(2) Sort [uid#129 ASC NULLS FIRST], false, 0 +- *(2) Filter isnotnull(uid#129) +- Scan In-memory table UsersTbl [uid#129, login#130, email#131, user_state#132], [isnotnull(uid#129)] +- InMemoryRelation [uid#129, login#130, email#131, user_state#132], StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) ColumnarToRow +- FileScan parquet default.userstbl[uid#129,login#130,email#131,user_state#132] Batched: true, Bucketed: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yschoi/Library/CloudStorage/Dropbox/yunseo/development/BOA..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<uid:string,login:string,email:string,user_state:string>, SelectedBucketsCount: 8 out of 8
Python
복사

셔플 소트 머지 조인을 사용하면 좋은 경우

두 개의 큰 데이터세트의 각 키가 정렬 및 해시되어 스파크에서 동일 파티션에 위치할 수 있을 때
Equi-Join만을 수행하여 정렬된 동일 키 기반으로 두 데이터세트를 조합하기를 원할 때
네트워크 간에 규모가 큰 셔플을 일으키는 Exchange 와 Sort 연산을 피하고 싶을 때

스파크 UI 들여다보기

메모리 사용량, 잡, 스테이지, 태스크 등에 대한 자세한 정보뿐 아니라 이벤트 타임라인, 로그, 스파크 애플리케이션 안의 통계 정보를 스파크 드라이버와 각 개별 이그제큐터 레벨에서 제공
spark-submit은 스파크 UI를 띄우게 되며 로컬 호스트나 스파크 드라이버를 통해 기본 포트 4040으로 연결할 수 있다.

Jobs와 Stage 탭

스파크는 애플리케이션은 잡, 스테이지, 테스크 단위로 나누어 처리하는데, jobs와 stage 탭을 통해 완료 상태, I/O 관련 수치, 메모리 소비량, 실행 시간 등을 살펴볼 수 있다.
Jobs
이그제큐터들이 어느 시점에 클러스터에 추가되거나 삭제되었는지 확인 할 수 있다.
실행 정도과 실행 시간(Duration)을 확인 할 수 있다.
실행 시간이 비정상적으로 크다면 딜레이를 일으키는 태스크가 해당 잡의 스테이지에 포함되어 있을 가능성이 있으므로 이를 살펴볼 필요가 있다.
Stage
모든 잡의 모든 스테이지의 현재 상태에 대한 요약을 제공한다.
디테일 페이지로 들어가면 DAG와 태스크의 메트릭도 확인 가능하다.
각 태스트의 평균 수행시간, GC(garbage collection)에 걸린 시간, 셔플 읽기로 읽어 들인 바이트 크기나 레코드 개수 등의 지표를 확인 할 수 있다.
만약 셔플 데이터가 원격 이그제큐터에서 읽고 있는 중이라면 높은 셔플 읽기 대기 시간 수치가 I/O 이슈일 수 있고,
높은 GC 시간은 힙 메모리에 너무 많은 객체가 있다는 의미일 수 있다. (이그제큐터에 메모리 부족 문제 발생 가능)
최대 테스크 수행시간이 평균보다 너무 높다면 파티션들끼리 균등하지 못한 데이터 분포에 의해 데이터 불균형(data skew)이 일어났을 수 있다.

Executors 탭

애플리케이션에 생성된 이그제큐터들에 대한 정보를 제공
상세 자원 사용량(디스크, 메모리, CPU 코어), GC 수행시간, 셔플 동안 읽고 쓴 데이터양 등을 확인할 수 있다.

Storage 탭

storage 탭을 통해 요약된 통계량 외에도 개별 이그제큐터에서 메모리를 얼마나 쓰는지, 무슨 목적으로 쓰는지 볼 수 있다. 이는 데이터 프레임이나 관리 테이블에서 cache()나 persist()를 썼을 때 사용량을 확인하는 데 도움이 된다.
즉, 메모리 사용량의 세부 사항을 알려준다.
위의 버켓팅 예제에서도 storage 탭을 통해 이를 확인했었다.

SQL 탭

스파크 SQL 쿼리의 효과를 확인할 수 있다.
쿼리의 Description을 클릭하면 아래의 2번째 그림처럼 모든 물리적 오퍼레이터들과 함께 상세 실행 계획을 보여준다.
실행 계획의 각 물리 오퍼레이터(Scan In-memory table, Filter 등) 밑에는 SQL 통계 수치들이 이쓴ㄴ데, 이 수치들을 통해 물리 오퍼레이터의 세부적인 내용과 어떤 일을 했는지 알고 싶을 때 유용하게 사용할 수 있다.(레코드를 몇 개나 읽어 들였는지, 셔플로 몇 바이트를 썼는지 등)

Environment 탭

스파크 애플리케이션이 돌아가는 환경 정보를 알 수 있다.
어떤 환경 변수가 지정되어 있는지, 어떤 jar 파일들이 포함되어 있는지, 어떤 스파크 특성이나 시스템 특성이 지정되어 있는지, 어떤 런타임 환경이 사용되는지 등을 알 수 있다.

Reference

러닝스파크 2nd Edition, 제이펍, 2022