Search

[러닝 스파크 2장] 아파치 스파크 시작

스파크 설치 구성요소 톺아보기

tar -xf ~/Downloads/spark-3.1.1-bin-hadoop2.7.tgz 를 통해 해당 디렉토리를 톺아보자
이 중에서 살펴볼 것들을 알아보자
bin 이름이 가리키듯 이 디렉터리는 스파크 셸들을 포함해서 (spark-sql, pyspark. spark-shell, sparkR) 스파크와 상호 작용할 수 있는 대부분의 스크립트.
여기와 셸과 실행 파일은 나중에 spark-submit을 써서 단독 스파크 애플리케이션을 제출하거나 쿠버네티스로 스파크를 실행할 때 도커 이미지를 만들고 푸시하는 스크립트 작성을 위해 사용하게 될 것이다.
sbin 이 디렉터리의 대부분의 스크립트는 다양한 배포 모드에서 클러스터의 스파크 컴포넌트들을 시작하고 중지하기 위한 관리 목적
Kubernetes
스파크 2.4릴리스부터 이 디렉토리에 쿠버네티스 클러스터에서 쓰는 스파크를 위한 도커 이미지 제작을 위한 Dockerfile들이 존재
data
Mlib, 정형화 프로그래밍, GraphX 등에서 입력으로 사용되는 *.txt 파일이 존재
다양할 언어의 셸들이 있지만 pySpark만 소개

Pyspark Shell

대화형 shell로써 동작
해당 셸은 클러스터에 연결하고 분산 데이터를 스파크 워커 노드의 메모리에 로드할 수 있도록 확장
기가바이트급 데이터를 다루든 작은 데이터세트를 다루든 스파크 셸은 스파크를 빨리 배우는 데에 적절
공식 홈페이지 압축 해제 설치
파이스파크를 시작하려면 cd로 bin 디렉터리로 가서 pyspark를 실행해서 셸을 띄운다.
PyPI
pip install pyspark
PyPI를 써서 파이스파크를 설치했다면 아무 데서나 그냥 pyspark를 실행하면 된다.

로컬에서 pySpark 실행

스파크 연산들은 작업으로 표현
작업들은 태스크라고 불리는 저수준 RDD 바이트 코드로 변환되며 실행을 위해 스파크의 이그제큐터들에 분산됨
Word Count 예제
데이터 프레임을 써서 텍스트 파일을 읽고, 읽은 문자열을 보여주고 파일의 줄 수를 세는 짧은 예제를 보자.
RDD를 쓰지 않고 상위수준 정형화 API을 써서 텍스트 파일을 스파크 데이터 프레임으로 읽어드림
show(10, false) : 데이터 프레임에서 문자열을 자르지 않고 첫 번째 10줄만 보여줌
>> strings = spark.read.text(" .. /README.md") >> strings.show(10, truncate=False) >> strings.count() # 124
Python
복사
셸 Exit : Ctrl + D

Spark Application 개념 이해

애플리케이션 : API를 써서 스파크 위에서 돌아가는 사용자 프로그램. 드라이버 프로그램과 클러스터의 실행기
SparkSession : 스파크 코어 기능들과 상호 작용할 수 있는 진입점(게이트웨이)을 제공하며 그 API로 프로그래밍을 할 수 있게 해주는 객체
잡(job) : 스파크 액션 action(예: save(), collect()) 에 대한 응답으로 생성되는 여러 태스크 task로 이루어진 병렬 연산
스테이지(stage): 각 잡은 스테이지라 불리는 서로 의존성을 가지는 다수의 태스크 모음
태스크(task): 스파크 executor로 보내지는 작업 실행의 가장 기본적인 단위

스파크 애플리케이션과 SparkSession

모든 스파크 애플리케이션의 핵심 == 스파크 드라이버 프로그램
이 드라이버가 SparkSession 객체를 만든다.
스파크 셸을 써서 작업을 할 때
드라이버는 셸에 포함되어 있는 형태
SparkSession 객체가 미리 만들어진다.
SparkSession 객체 ⇒ spark라는 이름의 변수로 접근 가능하다
스파크 셸을 랩톱에서 로컬 실행할 경우 ⇒ 모든 연산 또한 단일 JVM에서 로컬 실행
클러스터에서의 스파크 셸을 실행
아래 그림은 클러스터에서 어떻게 스파크가 실행되는지 보여준다
요점 : SparkSession 객체를 만들었으면 그를 통해 스파크 연산을 수행하는 API를 써서 프로그래밍이 가능

스파크 잡

스파크 shell로 상호 작용 작업 동안, 드라이버는 스파크 앱을 하나 이상의 스파크 잡으로 변환
각 잡 → DAG로 변환
본질적으로 이것이 스파크의 실행 계획
DAG 그래프에서 각각의 노드는 하나 이상의 스파크 스테이지에 해당.

스테이지

어떤 작업이 연속적으로 또는 병렬적으로 수행되는지에 맞춰 스테이지에 해당하는 DAG로 구성
모든 스파크 연산이 하나의 스테이지 안에서 실행될 수는 없으므로 여러 스테이지로 나뉘어야 한다.
종종 스파크 이그제큐터끼리의 데이터 전송이 이루어지는 연산 범위 경계 위에서 스테이지가 결정되기도 한다.
ex) Shuffle : 노드끼리의 데이터 교환이 스테이지의 경계 → 비용이 많이 들어가

스파크 태스크

스테이지최소 실행 단위
스파크 executor들 위에서 연합 실행되는 스파크 태스크들로 이루어진다.
각 태스크 - 개별 CPU 코어에서 할당 + 데이터의 개별 파티션을 갖고 작업
16코어 이그제큐터 - 16개 이상의 파티션을 갖는 16개 이상의 태스크를 할당받아 작업 → 철저한 병렬 처리 수행

트랜스포메이션, 액션, 지연 평가

분산 데이터의 스파크 연산은 트랜스포메이션 transformation액션 action으로 구분
트랜스포메이션
python.deepcopy()
이미 불변성의 특징을 가진 원본 데이터를 수정하지 않고 하나의 스파크 데이터 프레임새로운 데이터 프레임으로 그 이름처럼 변형 transform한다.
즉, select()filter() 같은 연산은 원본 데이터 프레임을 수정하지 않으며, 대신 새로운 데이터 프레임으로 연산 결과를 만들어 되돌려 준다.
모든 트랜스포메이션은 뒤늦게 평가(지연 평가)된다.
다시 말해 그 결과는 즉시 계산되는 게 아니라 계보 lineage라 불리는 형태로 기록.
기록된 리니지는 실행 계획에서 후반쯤에 스파크가 확실한 트랜스포메이션들끼리 재배열하거나 합치거나 해서 더 효율적으로 실행할 수 있도록 최적화하도록 함
지연 평가는 액션이 실행되는 시점이나 데이터에 실제 접근하는 시점(디스크에서 읽거나 쓰는 시점)까지 실제 실행을 미루는 스파크의 전략
액션
하나의 액션모든 기록된 트랜스포메이션의 지연 연산을 발동시킨다.
모든 트랜스포메이션 T액션 A를 호출할 때까지 기록된다.
트랜스포메이션 T새로운 데이터 프레임을 생성한다.
지연평가
스파크가 사용자의 연계된 트랜스포메이션들을 살펴봄으로써 쿼리 최적화를 가능하게 함
리니지데이터 불변성은 장애에 대한 데이터 내구성을 제공
스파크는 리니지의 트랜스포메이션을 기록해 놓고 데이터 프레임들은 트랜스포메이션을 거치는 동안 변하지 않음
⇒ 단순히 기록된 리니지를 재실행하는 것만으로도 원래 상태를 다시 만들어 낼 수 있으며 이 덕분에 장애 상황에도 유연성을 확보 가능 (일종의 트랜스포메이션들의 샤딩 같은 느낌)
스파크 연산 중 트랜스포메이션액션
트랜스포메이션
액션
OrderBy()
show()
groupBy()
take()
filter()
count()
select()
collect()
join()
save()
액션과 트랜스포메이션들은 스파크 쿼리 계획이 만들어지는 데에 도움을 준다
하나의 쿼리 계획 안의 어떤 것도 액션이 호출되기 전에는 실행되지 않는다.
strings = spark.read.text(" .. /README.md") filtered = strings.filter(strings.value.contains("Spark")) filtered.count() # 20
Python
복사
예제 설명
두 개의 트랜스포매이션 read()와 filter()
하나의 액션 count()
액션은 쿼리 실행 계획의 일부로서 기록된 모든 트랜스포메이션들의 실행을 시작하게 한다 예제에서는 액션인 filtered.count()가 입력되기 전까지는 아무것도 셸에서 실제로 실행되지 않음

좁은/넓은 트랜스포메이션

트랜스포메이션은 스파크가 지연 평가하는 연산 종류이다.
지연 연산 개념의 큰 이득은 스파크가 연산 쿼리를 분석하고 어디를 최적화할지 알 수 있다는 점이다.
이 최적화는 조인이나 파이프라이닝이 될 수도 있음
연산들을 한 스테이지로 합치거나 반대로 어떤 연산이 셔플이나 클러스터 데이터 교환이 필요한지 파악해 나누거나 하는 식으로 이루어질 수 있다.
트랜스포메이션은 좁은 narrow 의존성넓은 wide 의존성으로 분류할 수 있다.
넓은 트랜스포메이션 → 셔플 요구, 좁은 트랜스포메이션 → 셔플 요구 X
좁은 트랜스포메이션
하나의 입력 파티션을 연산하여 하나의 결과 파티션을 내놓는 트랜스포메이션 = 좁은 트랜스포메이션
앞의 예제에서 filter()contains()
하나의 파티션을 처리하여 데이터 교환 없이 결과 파티션을 생성 = 좁은 트랜스포메이션
좁은 종속성은 파티션간 데이터의 이동을 요구 X → 드라이버와 통신이 필요 없음 → 드라이버가 보낸 명령을 파티션에서 바로 실행
좁은 트랜스포메이션들의 각 모음이 하나의 스테이지 안에서 연산 가능
넓은 트랜스포메이션
groupBy()orderBy()
스파크가 넓은 트랜스포메이션을 수행하게 하는데, 이는 다른 파티션으로부터 데이터를 읽어 들여서 합치고 디스크에 쓰는 등의 일을 하기 때문이다. → 셔플 필요
넓은 트랜스포메이션들을 연속으로 놓고 실행하는 것은(만약 메모리 오류를 일으킬 확률이 높다면) 높은 부하의 재연산을 불러올 수도 있다.

스파크 UI

스파크는 그래픽 유저 인터페이스 (graphical user interface, GUI)를 써서 스파크 애플리케이션을 살펴볼 수 있게 해주며 다양한 레벨(잡, 스테이지, 태스크 레벨)에서 확인 가능하다.
웹 UI를 띄우게 되며 기본적으로 4040 포트를 사용(http://localhost:4040)
RDD 크기와 메모리 사용의 요약
환경 정보
실행 중인 이그제큐터 정보
모든 스파크 SQL 쿼리
스테이지 안에서 각각의 연산은 파란 박스로 표시된다. 스테이지 0은 하나의 태스크로 구성된다.
만약 태스크가 여러 개라면, 모두 병렬로 실행될 것이다.
UI가 스파크 내부의 작업에 대한 디버깅과 검사 도구로서 현미경 역할을 한다는 것

첫 번째 단독 애플리케이션

쿠키 몬스터를 위한 M&M 세기

파일이 너무 큰 경우 데이터를 작은 조각들로 나눠 클러스터에 분산
우리가 작성한 스파크 프로그램이 각 파티션의 단어를 세는 태스크를 분산 처리한 후 최종적으로 단어 개수 결과를 집계해 되돌려 주게 될 것
더욱 큰 데이터에 스파크의 분산 기능과 데이터 프레임 API를 쓸 것
Q. 10만 개 이상의 데이터를 갖고 있는 파일을 읽어 들여서(각 라인은 주, M&M 색깔 개수를 갖고 있다)
색깔과 주별로 집계하는 스파크 프로그램을 작성
⇒ 각 주별로 학생들이 어떤 색깔의 M&M을 좋아하는지 알려줄 것
mnmcount.py
# 필요한 라이브러리들을 불러온다. # 파이썬을 쓰므로 SparkSession과 관련 함수들을 PySpark 모듈에서 불러온다. import sys from pyspark.sql import SparkSession if __name__ == "__main__": if len(sys.argv) != 2: print("Usage: mnmcount <file>", file=sys.stderr) sys.exit(-1) # SparkSession API 를 써서 SparkSession 객체를 만든다. # 존재하지 않으면 객체를 생성한다. # JVM마다 SparkSession 객체는 하나만 존재할 수 있다. spark = (SparkSession .builder .appName("PythonMnMCount") .getOrCreate()) # get the M&M data set file name # 명령행 인자에서 M&M data가 들어 있는 파일 이름을 얻는다 mnm_file = sys.argv[1] # read the file into a Spark DataFrame # 스키마 추론과 쉼표로 구분된 구분된 칼럼 이름이 제공되는 헤더가 있음 # 지정해 주고 CSV 포맷으로 파일을 읽어 들여 데이터 프레임에 저장한다 mnm_df = (spark.read.format("csv") .option("header", "true") .option("inferSchema", "true") .load(mnm_file)) mnm_df.show(n=5, truncate=False) # 데이터 프레임 고수준 API를 사용하고 RDD는 전혀 쓰지 않는다는 점에 주목 # 일부 스파크함수들은 동일한 객체를 되돌려 주므로 함수 호출을 체이닝할 수 있다. # 1. 데이터 프레임에서 "State", "Color", "Count" 필드를 읽는다 # 2. 각 주별로, 색깔별로 그룹화하기 원하므로 groupBy()를 사용한다 # 3. 그룹화된 주/색깔별로 모든 색깔별 집계를 한다 # 4. 역순으로 orderBy() 한다. count_mnm_df = (mnm_df.select("State", "Color", "Count") .groupBy("State", "Color") .sum("Count") .orderBy("sum(Count)", ascending=False)) # 모든 주와 색깔 별로 결과를 보여준다 # show()는 액션이므로 위의 쿼리 내용들이 시작되게 된다는 점에 주목 count_mnm_df.show(n=60, truncate=False) print("Total Rows = %d" % (count_mnm_df.count())) # find the aggregate count for California by filtering # 필터링을 통해 캘리포니아의 결과만을 찾아낸다 # 1. 데이터 프레임에서 모든 줄을 읽는다 # 2. CA주에 대한 것만 걸러낸다 # 3. 위에서 했던 것처럼 주와 색깔별로 groupBy # 4. 각 색깔 별로 카운트를 합친다 # 5. order by로 sum(count) 기준 역순 정렬 ca_count_mnm_df = (mnm_df.select("*") .where(mnm_df.State == 'CA') .groupBy("State", "Color") .sum("Count") .orderBy("sum(Count)", ascending=False)) # 캘리포니아의 집계 결과 보여줌 # show를 통해 전체 연산 실행을 발동 ca_count_mnm_df.show(n=10, truncate=False) # SparkSession을 멈춘다 spark.stop()
Python
복사
선호하는 편집기를 써서 mmcount.py라는 이름의 파일을 만들어 위 파이썬 코드를 입력하고 이 책의 깃허브 저장소에서 mnm_dataset .csv를 다운로드해 bin 디렉터리의 submit-spark 스크립트로 스파크 잡을 제출한다.
SPARK_HOME 환경변수는 로컬 머신의 스파크 설치 디렉터리의 루트 레벨 경로로 지정
콘솔에서 정신 없는 INFO 메시지 출력을 없애고 싶다면 log4j.properties.template 파일을 conf/log4j.properties로 카피해 놓고 log4j.rootCateory=WARN을 지정해준다.
pyspark가 깔려있다는 전제
. ├── data │ └── mnm_dataset.csv └── mnmcount.py
Python
복사
$SPARK_HOME/bin/spark-submit mnrncount.py data/mnm_dataset.csv
python mnmcount.py data/mnm_dataset.csv
일단 각 주별 색깔별 합계를 볼 수 있으며 이어서 캘리포니아(CA)에 대한 결과만이 출력
참고로 RDD 연산을 위와 같이 안 사용하고 다른 자료구조 혹은 데이터 프레임 API를 사용하는 것이 로컬에서 오류가 덜 난다. RDD API는 py4j 오류가 많이 발생하는 편이다. 만약 궁금하면 보여드리겠습니다.

참고

파이썬은 스칼라와 다르게 인터프리터 언어 때문에 컴파일 과정이 필요 없다(파이썬 언어도 바이트 코드 컴파일(.pyc)가 가능하긴 하나 이 과정에서는 생략)
스칼라는 build.sbt라는 C의 Makefile 처럼 스칼라 컴파일러가 사용자의 스칼라 관련 작업, jar 파일과 같은 파일 생성, 패키징, 의존성 관리, 디렉터리 지정 등을 어떻게 처리할지 기술해놓은 명세 파일.
스칼라로 spark 프로그램을 빌드하기 위해서는 JAVA_HOME과 SPARK_HOME 환경 변수도 설정한 후, 아까와 비슷한 스칼라 명령어로 스파크 명령어로 스파크 앱을 빌드할 수 있다. (책을 참고하길 바란다)