7장. 스파크 애플리케이션의 최적화 및 튜닝
효율적으로 스파크를 최적화 및 튜닝하기
아파치 스파크 설정 확인 및 세팅
1.
설정 파일 확인하기
•
spark 설치 폴더에서 conf 폴더의 하위 .template 파일을 수정 후 .template 부분을 지우고 저장하면 스파크에 새로운 설정값을 사용한다고 알려주게 된다.
2.
스파크 애플리케이션 안에 지정을 해주거나,
def main(args: Array[String]) = {
val spark = SparkSession.builder
.config("spark.sql.shuffle.partitions", 5)
.config("spark.executor.memory", "2g")
.master("local[*]")
.appName("SparkConfig")
.getOrCreate()
}
Scala
복사
spark-submit --conf spark.sql.shuffle.partitions=5 --conf "spark.executor.memory=2g" --class [main이 들어가 있는 파일 명]
Bash
복사
3.
스파크 셸 이용
•
다음의 코드를 이용하여 스파크가 로컬 모드로 실행된 호스트의 스파크 설정을 확인할 수 있다.
val mconf = spark.conf.getAll
for (k <- mconf.keySet) { println(s"${k} -> ${mconf(k)}\n") }
Scala
복사
•
현재의 설정을 변경하기 위해서는 변경 가능한 값인지 확인해야 하고, 이는 spark.conf.isModifiable(”<설정필드 이름>”)을 호출하여 확인할 수 있다.
◦
true면 변경 가능하고, false는 변경 불가능하다.
# 설정 변경 가능 여부 확인하기
spark.conf.isModifiable("spark.sql.shuffle.partitions")
Bash
복사
•
모든 수정 가능한 설정값은 API를 통해 새로 설정할 수 있다.
spark.conf.get("spark.sql.shuffle.partitions") # 200
spark.conf.set("spark.sql.shuffle.partitions", 5) # 값 변경
Bash
복사
위에서 언급한 세 가지의 방식에는 우선 순위가 존재한다. spark-default.conf에 정의된 값이나 플래그 > spark-submit의 명령 행 설정 > 스파크 애플리케이션에서 SparkSession을 통해 설정된 값 순서대로 읽히게 된다. 이 과정에서 중복된 설정은 초기화되며 뒤의 값으로 대체된다.
대규모 워크로드를 위한 스파크 규모 확장
•
자원 사용의 최적화를 어떻게 조정하는지, 태스크를 어떻게 병렬적으로 실행시키는지, 다수의 태스크에 의한 병목 현상을 어떻게 회피하는지 등을 설명
정적/동적 자원 할당
•
정적 자원 할당
◦
태스크가 오랫동안 기다리는 상황이 되더라도 추가적인 자원을 할당할 수 없게 된다.
spark.executor.memory 2g
Bash
복사
•
동적 자원 할당
◦
스파크 드라이버는 거대한 워크로드가 밀려오거나 빠져나갈 때마다 그 요청에 맞춰 컴퓨팅 자원을 더 할당하거나 줄이도록 요청할 수 있다.
◦
스파크가 자원 사용을 더 효율적으로 하게 해 주며 사용하지 않는 executer를 해제하고 필요할 때 새롭게 띄우도록 한다.
◦
사용가능한 예시로는 스트리밍, 온디맨드 데이터 분석 등의 상황이 있다.
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 2
spark.dynamicAllocation.schedulerBacklogTimeout 1m
spark.dynamicAllocation.maxExecutors 20
spark.dynamicAllocation.executorIdleTimeout 2min
Bash
복사
설정 해설
스파크 이그제큐터의 메모리와 셔플 서비스 설정
•
각 이그제큐터에서 사용 가능한 메모리의 양은 spark.executor.memory에 의해 제어되고, 이 메모리는 실행 메모리, 저장 메모리, 예비 메모리의 세 부분으로 나뉘어진다.
◦
예비 메모리에는 OOM(Out Of Memory Error) 방지를 위하여 300MB가 할당됨.
◦
일반적으로 나머지 공간은 실행 메모리 60%, 저장 메모리 40%에 할당된다.
•
실행 메모리는 스파크의 셔플, 조인, 정렬, 집계 등에 사용되고, 저장 메모리는 사용자 데이터 구조를 캐싱하고 데이터 프레임에서 온 파티션들을 저장하는 데에 주로 쓰인다.
•
책에 ‘맵과 셔플 작업 중의 I/O를 조절할 수 있는 스파크 설정’과 관련된 표가 있으므로 필요할 때 참고해보도록 하자.
스파크 병렬성 최대화
•
파티션(partition)
◦
디스크에 연속된 위치에 데이터를 조각이나 블록들의 모음으로 나눠서 저장하는 방법.
◦
이 데이터 모음들은 병렬적으로 또 독립적으로 읽어서 처리가 가능하며 필요하면 하나의 프로세스 안에서 멀티 스레딩으로 처리도 가능하다.
•
스파크는 각 코어에 태스크를 할당하고, 태스크에 스레드를 스케줄링하고, 각 태스크는 개별 파티션을 처리한다.
•
자원 사용을 최적화하고 병렬성을 최적화하기 위해서는 이그제큐터에 할당된 코어의 개수만큼 파티션들이 최소한으로 할당되는 것이다. (즉, 파티션의 개수가 코어의 개수를 넘겨서 더 많은 코어가 일을 할 수 있도록 하는 것이다.
•
파티션의 수는 클러스터의 코어 개수보다 서너배 더 많은 파티션을 사용하는 것이 좋다.
◦
너무 적으면 클러스터를 충분히 활용할 수 없고, 메모리 문제가 발생할 수 있다.
◦
어느정도는 좀 많아도 무방하나, 너무 많이 늘리면 태스크 관리 작업에 병목 현상이 발생한다.
파티션의 생성
•
스파크의 태스크들은 데이터를 디스크에서 읽어 메모리로 올리면서 파티션 단위로 처리한다.
•
디스크의 데이터는 저장장치에 따라 조각이나 연속된 파일 블록으로 존재하며, 이런 블록들의 연속된 모음이 하나의 파티션을 구성하게 된다.
•
스파크에서 한 파티션의 크기는 spark.sql.files.maxPartitionBytes 에 따라 결정되고, default는 128MB이다.
◦
이 크기를 줄이게 되면 작은 파티션이 많아지면서 디스크 I/O양이 급증하여 성능 저하를 일으키면서 분산 파일 시스템이 느려지는 ‘작은 파일 문제(small file problem)’이 발생하게 된다.
•
셔플 파티션(shuffle partitions)
◦
groupBy()나 join()과 같이 넓은 트랜스포메이션으로 알려진 작업 (여러 개의 파티션을 참고해야 하는 연산으로 이해) 중에 생성되는 파티션
◦
네트워크과 디스크 I/O 모두를 사용한다. 이런 작업 중에는 spark.local.directory에 지정된 이그제큐터의 로컬 디렉터리에 중간 결과를 작성하며, SSD 디스크를 장착해 두면 이 부분의 성능을 상당히 올릴 수 있다.
◦
기본적으로 셔플 파티션의 개수는 spark.sql.shuffle.partitions에 50으로 지정되어 있다.
▪
이 기본값은 작은 규모나 스트리밍 워크로드에는 너무 높을 수 있기 때문에 하고자하는 일에 따라 조정해주는 것이 좋다.
데이터 캐싱과 영속화
DataFrame.cache()
•
cache() 는 허용하는 메모리 수준만큼 스파크 이그제큐터들의 메모리에 읽은 파티션을 최대한 저장한다. 하지만 일부만을 캐싱할 수 있는 데이터프레임과는 달리 파티션은 개별 파티션의 전체를 캐싱해야 한다.
•
예제 - 교재의 예시를 pyspark에서 실행
◦
처음 count()에서 실제로 캐시를 수행하게 되고, 두 번째 count()에서는 캐시된 df를 사용하게 되어 실행속도가 매우 빨라졌다. (약 14배)
- 8개의 partition으로 메모리에 캐시되어 있다.
DataFrame.persist()
•
persist(StorageLevel.LEVEL)은 StorageLevel에 따라 데이터가 어떤 방식으로 캐시될 것인지 제어할 수 있다. - 저장 위치과 직렬화 여부를 결정
StorageLevel | 설명 |
MEMORY_ONLY | 데이터가 곧바로 객체 형태로 메모리에 저장된다. |
MEMORY_ONLY_SER | 데이터가 직렬화되어 용량이 최소화된 바이트 배열 형태로 메모리에 저장된다. 사용 시 역직렬화를 위한 비용이 소모된다. |
MEMORY_AND_DISK | 데이터가 곧바로 객체 형태로 메모리에 저장되지만 부족한 경우 직렬화되어 디스크에 저장된다. |
DISK_ONLY | 데이터가 직렬화되어 디스크에 저장된다. |
OFF_HEAP | 데이터가 오프힙(off-heap) 메모리에 저장된다. 오프힙 메모리는 스파크에서 저장 및 쿼리 실행에 사용된다. |
MEMORY_AND_DISK_SER | MEMORY_AND_DISK와 비슷하지만 메모리에 저장되는 데이터가 직렬화된다. (디스크에 저장되는 데이터는 항상 직렬화된다,) |
각 StorageLevel은 OFF_HEAP을 제외하고 ‘레벨이름_2’ 형태의 옵션이 존재하는데 이는 서로 다른 스파크 이그제큐터에 복제해서 두 벌이 저장되는 것을 의미한다. 이 옵션을 사용하면 캐싱에 자원을 더 소모하지만 데이터를 두 군데에 저장하기 때문에 장애 상황에 대처할 수 있다.
•
예제
◦
DISK_ONLY option으로 persist()를 한 결과 메모리가 아닌 디스크에 df가 저장된 것을 볼 수 있다.
◦
persist(MEMORY_ONLY)는 cache()와 같은 역할을 한다.
◦
캐시된 데이터를 비우고 싶을 때는 df.unpersist()를 호출하면 된다.
캐시나 영속화를 사용해야 할 때
•
큰 데이터세트에 쿼리나 트랜스포메이션으로 반복적으로 접근해야 하는 시나리오일 때
◦
반복적인 머신러닝 학습을 위해 계속 접근해야 하는 데이터 프레임들
◦
ETL이나 데이터 파이프라인 구축 시 빈도 높은 트랜스포메이션 연산으로 자주 접근해야 하는 데이터 프레임들
캐시나 영속화를 쓰지 말아야 할 때
•
메모리 캐시는 사용하는 StorageLevel에 따라 직렬화나 역직렬화에서 비용을 발생시킬 수 있으므로 주의깊게 사용해야 한다. 다음의 경우에는 캐시나 영속화가 부적절한 경우이다.
◦
데이터 프레임이 메모리에 들어가기에 너무 클 때
◦
크기에 상관없이 자주 쓰지 않는 데이터 프레임에 대해 비용이 크지 않은 트랜스포메이션을 수행할 때
스파크 조인의 종류
조인(Join)
•
테이블이나 데이터 프레임 형태로 되어 있는 두 종류의 데이터세트를 공통적으로 일치하는 키를 기준으로 병합하는 연산
•
관계형 데이터베이스와 유사하게 스파크 데이터 프레임, 데이터세트 API, 스파크 SQL은 inner / outer / left / right join 등의 여러 종류의 조인 트렌스포메이션을 제공한다.
•
조인 연산들은 스파크 이그제큐터들 사이에 방대한 데이터 이동을 일으킨다.
•
CORE SPARK JOIN
RDD 형태의 조인.
DataFrame과 DataSet을 통한 Spark coding이 훨씬 보편적이고, 추천되는 방식이기도 한 지금에 와서는, 직접 RDD를 활용해 low level의 API를 사용할 이유는 특별히 목적이 있는 경우를 제외하고는 많지 않을 것이다.
하지만, 스파크의 모든 워크로드는 저수준 기능을 사용하는 기초적인 형태로 컴파일되므로 이 구조를 이해해두는 것이 효율적 활용에 도움이 될 수 있다.
◦
일반적으로 JOIN은 동일한 키의 데이터가 동일한 파티션 내에 있어야 하므로 비용이 비싼 작업이다.RDD의 Known Partitionor 를 갖고 있지 않다면 이를 공유하도록 셔플이 필요하고, 이를 통해 동일한 키의 데이터는 동일한 파티션에 위치하게 된다.
◦
조인의 비용은 키의 개수와 올바른 파티션으로 위치하기 위해 움직이는 규모에 비례해서 커진다.
스파크의 조인 전략
•
스파크는 이그제큐터 간에 데이터를 교환, 이동, 정렬, 그룹화, 병합하는 방식에 따라 다섯 종류의 조인 전략을 가진다.
◦
브로드캐스트 해시 조인(Broadcast Hash Join, BHJ)
◦
셔플 해시 조인(Shuffle Hash Join, SHJ)
◦
셔플 소트 머지 조인(Shuffle Sort Merge Join, SMJ)
◦
브로드캐스트 네스티드 루프 조인(Broadcast Nested Loop Join, BNLJ)
◦
셔플 복제 네스티드 루프 조인(Crtesian Product Join)
•
본 교재에서는 BHJ와 SMJ만을 다룬다.
브로드캐스트 해시 조인(BHJ) (= 맵사이드 조인, map-side-only join)
•
이상적으로 데이터 이동이 거의 필요 없도록 한 쪽은 작고(드라이버와 이그제큐터 메모리에 들어갈 사이즈) 다른 쪽은 큰 두 종류의 데이터를 사용하여 특정 조건이나 칼럼 기준으로 조인한다.
•
더 작은 쪽의 데이터가 드라이버에 의해 모든 스파크 이그제큐터에 복사되어서 뿌려지고, 이어서 각 이그제큐터에 나뉘어 있는 큰 데이터와 조인된다.
•
이 전략은 큰 데이터 교환이 이루어지지 않게 한다.
•
일반적으로 작은 쪽의 데이터가 10MB 이하일 때 브로드캐스트 조인을 사용한다. 이 설정은 spark.sql.autoBroadcastJoinThreshold 에 의해 지정되며, 각 이그제큐터와 드라이버의 설정에 따라 높이거나 낮출 수 있다.
◦
일반적으로 작은 쪽의 데이터가 이 설정값보다 낮은 경우 스파크는 자동으로 BHJ을 시도한다.
브로드캐스트 해시 조인을 사용하면 좋은 경우
•
두 데이터 프레임에 공통적인 키들이 존재하고, 한 쪽이 가지고 있는 정보가 적으며 양쪽의 뷰를 병합하는 경우
◦
예제) 전 세계의 축구 선수들에 대한 정보가 있는 큰 데이터세트인 playersDF와 선수들이 활동하는 축구 팀들의 정보가 있는 더 작은 데이터세트인 clubDF를 조인하는 상황 (아래 scala 예제)
val joinedDF = playerDF.join(broadcast(clubDF), "key1 == key2")
Scala
복사
•
양쪽 데이터세트의 각 키가 스파크에서 동일한 파티션 안에 해시될 때
•
한 데이터가 다른 쪽보다 많이 작은 경우
•
•
더 작은 쪽의 데이터가 모든 스파크 이그제큐터에 브로드캐스트될 때 발생하는 과도한 네트워크 대역폭이나 OOM 오류에 대해 걱정할 필요가 없는 경우
Equi-Join vs Non-Equi-Join
•
Equi-Join: 두 테이블에서 공통적으로 존재하는 컬럼의 값이 일치되는 행을 연결해서 결과를 생성하는 것 (= 조건을 검사하는 일반적인 조인)
•
Non-Equi-Join: = 연산자 이외의 비교 연산자를 이용하여 조인을 수행하는 것
셔플 소트 머지 조인 (SMJ)
•
정렬 가능하고 겹치지 않으면서 공통 파티션에 저장 가능한 공통 키를 기반으로 큰 두 종류의 데이터세트를 합칠 수 있는 효과적인 방법
•
각 데이터세트의 동일 키를 가진 데이터세트의 모든 레코드가 동일 이그제큐터의 동일 파티션에 존재해야 한다. 즉, 데이터가 이그제큐터 사이에 교환이 되어야 하거나 공통 위치에 존재해야 함을 의미한다.
•
1. 정렬 단계 와 2. 머지 단계 로 나뉘어진다.
◦
정렬 단계 : 각 데이터를 조인 키에 따라 정렬
◦
머지 단계 : 각 데이터세트에서 키 순서대로 데이터를 순회하며 키가 일치하는 로우끼리 병합
•
기본적으로 SHJ는 spark.sql.join.preferSortMergeJoin 설정에 의해 활성화된다.
◦
또한, spark.sql.autoBroadcastJoinThreshold 가 -1로 설정된 경우 스파크는 기본적으로 SMJ를 시도한다.
SMJ 예제 - 교재의 scala 예제를 pyspark로 변경
•
필요한 패키지 불러오기 및 환경 설정, dictionary 만들기
import randon
# 아래의 설정으로 자동으로 SMJ가 시행됨.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
# state, item column 생성을 위한 dict
states_dict = {0:"AZ", 1:"CO", 2:"CA", 3:"TX", 4:"NY", 5:"MI" }
items_dict = {0:"SKU-0", 1:"SKU-1", 2:"SKU-2", 3:"SKU-3", 4:"SKU-4", 5:"SKU-5"}
Python
복사
•
usersDF 생성
usersDF = spark.range(1 * 10000000).rdd.map(lambda x: (x[0],
"user_"+str(x[0]),
"user_"+str(x[0])+"@databricks.com",
states_dict[random.choice(range(6))])
).toDF(["uid", "login", "email", "user_state"])
Python
복사
•
ordersDF 생성
ordersDF = spark.range(1 * 10000000).rdd.map(lambda x: (x[0],
x[0],
random.choice(range(10001)),
10 * x[0] * 0.2,
states_dict[random.choice(range(6))],
items_dict[random.choice(range(6))])
).toDF(["transaction_id", "quantity", "users_id", "amount", "state", "items"
Python
복사
•
join
usersOrdersDF = ordersDF.join(usersDF, ordersDF.users_id == usersDF.uid)
Python
복사
•
실행 계획 확인
◦
dataframe에 explain() 메서드를 입력하면 어떤 조인이 사용되는지 물리적 계획을 확인할 수 있다.
◦
아래의 예제에서는 기대한 것 처럼 SortMergeJoin이 사용된 것을 확인할 수 있다.
◦
sparkUI를 이용하여 DAG를 살펴보면 총 3단계의 스테이지로 전체 작업이 이루어지고 (앞에 두 개는 데이터 생성, 뒤에 한 개가 Join 작업) 마지막 스테이지에서 Exchange와 Sort 작업이 병합 직접에 실행되는 것을 알 수 있다.
◦
Exchange는 이그제큐터 간에 네트워크상으로 파티션이 셔플되어야 하는 비싼 작업이다.
usersOrdersDF.explain()
Python
복사
# 출력 결과
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [users_id#893L], [uid#864L], Inner
:- Sort [users_id#893L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(users_id#893L, 5), ENSURE_REQUIREMENTS, [plan_id=902]
: +- Filter isnotnull(users_id#893L)
: +- Scan ExistingRDD[transaction_id#891L,quantity#892L,users_id#893L,amount#894,state#895,items#896]
+- Sort [uid#864L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(uid#864L, 5), ENSURE_REQUIREMENTS, [plan_id=903]
+- Filter isnotnull(uid#864L)
+- Scan ExistingRDD[uid#864L,login#865,email#866,user_state#867]
Python
복사
SMJ(Shuffle Merge Join) 최적화 - 버켓팅(Bucketing)
•
SMJ를 빈번하게 사용해야 하는 상황이라면 공통의 정렬된 키나 칼럼을 위한 파티션된 버킷을 만들어 Exchange 단계를 없앨 수 있다.
•
이 방식으로 데이터를 재구성하면 Exchange 단계를 생략하고 WholeStageCodegen으로 넘어가므로 성능을 올릴 수 있다.
•
예제 코드
◦
usersDF와 ordersDF를 각각 uid와 users_id로 정렬 후 버킷을 만들어 스파크 테이블에 파케이 포맷으로 저장 후에 다시 불러들인 후 조인을 시키는 예제
◦
테이블을 정렬된 상태로 저장했기 때문에 SortMergeJoin 동안 정렬할 필요가 없고, 작업 DAG를 살펴보면 Exchange를 건너뛰고 바로 WholeStageCodegen 단계로 넘어갔음을 볼 수 있다.
◦
explain()을 통해 물리 계획(Physical Plan)을 살펴보았을 때도 Exchange 연산이 실행되지 않았음을 알 수 있다.
# usersDF bucketing
usersDF.orderBy(col("uid").asc()) \
.write.format("parquet") \
.bucketBy(8, "uid") \
.mode("overwrite") \
.saveAsTable("UsersTbl")
# ordersDF bucketing
ordersDF.orderBy(col("users_id").asc()) \
.write.format("parquet") \
.bucketBy(8, "users_id") \
.mode("overwrite") \
.saveAsTable("OrdersTbl")
# table caching
spark.sql("CACHE TABLE UsersTbl")
spark.sql("CACHE TABLE OrdersTbl")
# reread
usersBucketDF = spark.table("UsersTbl")
ordersBucketDF = spark.table("OrdersTbl")
# join
joinUsersOrdersBucketDF = ordersBucketDF.join(usersBucketDF, ordersBucketDF.users_id == usersBucketDF.uid)
Python
복사
# joinUsersOrdersBucketDF.explain() 실행 결과
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
*(3) SortMergeJoin [users_id#268], [uid#129], Inner
:- *(1) Sort [users_id#268 ASC NULLS FIRST], false, 0
: +- *(1) Filter isnotnull(users_id#268)
: +- Scan In-memory table OrdersTbl [transaction_id#266L, quantity#267L, users_id#268, amount#269, state#270, items#271], [isnotnull(users_id#268)]
: +- InMemoryRelation [transaction_id#266L, quantity#267L, users_id#268, amount#269, state#270, items#271], StorageLevel(disk, memory, deserialized, 1 replicas)
: +- *(1) ColumnarToRow
: +- FileScan parquet default.orderstbl[transaction_id#266L,quantity#267L,users_id#268,amount#269,state#270,items#271] Batched: true, Bucketed: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yschoi/Library/CloudStorage/Dropbox/yunseo/development/BOA..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<transaction_id:bigint,quantity:bigint,users_id:string,amount:double,state:string,items:str..., SelectedBucketsCount: 8 out of 8
+- *(2) Sort [uid#129 ASC NULLS FIRST], false, 0
+- *(2) Filter isnotnull(uid#129)
+- Scan In-memory table UsersTbl [uid#129, login#130, email#131, user_state#132], [isnotnull(uid#129)]
+- InMemoryRelation [uid#129, login#130, email#131, user_state#132], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) ColumnarToRow
+- FileScan parquet default.userstbl[uid#129,login#130,email#131,user_state#132] Batched: true, Bucketed: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yschoi/Library/CloudStorage/Dropbox/yunseo/development/BOA..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<uid:string,login:string,email:string,user_state:string>, SelectedBucketsCount: 8 out of 8
Python
복사
셔플 소트 머지 조인을 사용하면 좋은 경우
•
두 개의 큰 데이터세트의 각 키가 정렬 및 해시되어 스파크에서 동일 파티션에 위치할 수 있을 때
•
Equi-Join만을 수행하여 정렬된 동일 키 기반으로 두 데이터세트를 조합하기를 원할 때
•
네트워크 간에 규모가 큰 셔플을 일으키는 Exchange 와 Sort 연산을 피하고 싶을 때
스파크 UI 들여다보기
•
메모리 사용량, 잡, 스테이지, 태스크 등에 대한 자세한 정보뿐 아니라 이벤트 타임라인, 로그, 스파크 애플리케이션 안의 통계 정보를 스파크 드라이버와 각 개별 이그제큐터 레벨에서 제공
•
spark-submit은 스파크 UI를 띄우게 되며 로컬 호스트나 스파크 드라이버를 통해 기본 포트 4040으로 연결할 수 있다.
Jobs와 Stage 탭
•
스파크는 애플리케이션은 잡, 스테이지, 테스크 단위로 나누어 처리하는데, jobs와 stage 탭을 통해 완료 상태, I/O 관련 수치, 메모리 소비량, 실행 시간 등을 살펴볼 수 있다.
Jobs
•
이그제큐터들이 어느 시점에 클러스터에 추가되거나 삭제되었는지 확인 할 수 있다.
•
실행 정도과 실행 시간(Duration)을 확인 할 수 있다.
◦
실행 시간이 비정상적으로 크다면 딜레이를 일으키는 태스크가 해당 잡의 스테이지에 포함되어 있을 가능성이 있으므로 이를 살펴볼 필요가 있다.
Stage
•
모든 잡의 모든 스테이지의 현재 상태에 대한 요약을 제공한다.
•
디테일 페이지로 들어가면 DAG와 태스크의 메트릭도 확인 가능하다.
◦
각 태스트의 평균 수행시간, GC(garbage collection)에 걸린 시간, 셔플 읽기로 읽어 들인 바이트 크기나 레코드 개수 등의 지표를 확인 할 수 있다.
◦
만약 셔플 데이터가 원격 이그제큐터에서 읽고 있는 중이라면 높은 셔플 읽기 대기 시간 수치가 I/O 이슈일 수 있고,
◦
높은 GC 시간은 힙 메모리에 너무 많은 객체가 있다는 의미일 수 있다. (이그제큐터에 메모리 부족 문제 발생 가능)
◦
최대 테스크 수행시간이 평균보다 너무 높다면 파티션들끼리 균등하지 못한 데이터 분포에 의해 데이터 불균형(data skew)이 일어났을 수 있다.
Executors 탭
•
애플리케이션에 생성된 이그제큐터들에 대한 정보를 제공
•
상세 자원 사용량(디스크, 메모리, CPU 코어), GC 수행시간, 셔플 동안 읽고 쓴 데이터양 등을 확인할 수 있다.
Storage 탭
•
storage 탭을 통해 요약된 통계량 외에도 개별 이그제큐터에서 메모리를 얼마나 쓰는지, 무슨 목적으로 쓰는지 볼 수 있다. 이는 데이터 프레임이나 관리 테이블에서 cache()나 persist()를 썼을 때 사용량을 확인하는 데 도움이 된다.
•
즉, 메모리 사용량의 세부 사항을 알려준다.
•
위의 버켓팅 예제에서도 storage 탭을 통해 이를 확인했었다.
SQL 탭
•
스파크 SQL 쿼리의 효과를 확인할 수 있다.
•
쿼리의 Description을 클릭하면 아래의 2번째 그림처럼 모든 물리적 오퍼레이터들과 함께 상세 실행 계획을 보여준다.
•
실행 계획의 각 물리 오퍼레이터(Scan In-memory table, Filter 등) 밑에는 SQL 통계 수치들이 이쓴ㄴ데, 이 수치들을 통해 물리 오퍼레이터의 세부적인 내용과 어떤 일을 했는지 알고 싶을 때 유용하게 사용할 수 있다.(레코드를 몇 개나 읽어 들였는지, 셔플로 몇 바이트를 썼는지 등)
Environment 탭
•
스파크 애플리케이션이 돌아가는 환경 정보를 알 수 있다.
•
어떤 환경 변수가 지정되어 있는지, 어떤 jar 파일들이 포함되어 있는지, 어떤 스파크 특성이나 시스템 특성이 지정되어 있는지, 어떤 런타임 환경이 사용되는지 등을 알 수 있다.
Reference
•
러닝스파크 2nd Edition, 제이펍, 2022