[러닝스파크 10장] MLlib을 사용한 머신러닝
~ 338(362)
스파크에는 spark.mllib 및 spark.ml의 두 가지 머신러닝 패키지 존재
spark.mllib
•
RDD API를 기반으로 하는 기존의 머신러닝 API
spark.ml
•
데이터 프레임을 기반으로 하는 최신 API
•
spark.ml은 모델이 보유한 데이터 포인트 수에 따라 선형으로 확장되는 확장에 중점을 두어 방대한 양의 데이터로 확장
머신러닝 파이프라인 설계
•
MLlib에서 파이프라인 API는 머신러닝 워크플로를 구성하기 위해 데이터 프레임 위에 구축된 고급 API를 제공
•
파이프라인 API
◦
일련의 변환기transformer와 추정기estimator로 구성
•
MLlib을 사용하여 종단 간 파이프라인을 구축하는 것이 목표
•
변환기 (transformer)
◦
데이터 프레임을 입력으로 받아들이고, 하나 이상의 열이 추가된 새 데이터 프레임을 반환
◦
변환기는 데이터에서 매개변수를 학습하지 않고, 단순히 규칙 기반 변환을 적용하여 모델 훈련을 위한 데 이터를 준비하거나 훈련된 Mllib 모델을 사용하여 예측을 생성.
◦
.transform() 메서드가 있다.
•
추정기 (estimator)
◦
.fit() 메서드를 통해 데이터 프레임에서 매개변수를 학습하고 변환기인 Model을 반환
•
파이프라인
◦
일련의 변환기와 추정기를 단일 모델로 구성
◦
파이프라인 자체가 추정기인 반면, pipeline.fit()의 출력은 변환기인 PipelineModel을 반환
데이터 수집 및 탐색
filePath = """../databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb-clean.parquet/"""
airbnbDF = spark.read.parquet(filePath)
airbnbDF.select("neighbourhood_cleansed", "room_type", "bedrooms", "bathrooms", "number_of_reviews", "price").show(5)
Python
복사
학습 및 테스트 데이터세트 생성
피처 엔지니어링 및 모델링을 시작하기 전에 데이터 세트를 학습train 및 테스트test의 두 그룹으로 나눈다. (8:2)
# 파이썬 예제
trainDF, testDF = airbnbDF.randomSplit([.8, .2], seed=42)
print(f"""There are {trainDF.count()} rows in the training set,
and {testDF.count()} in the test set""")
# There are 5780 rows in the training set, and 1366 in the test set
Python
복사
스파크 클러스터의 이그제큐터 수를 변경하면 어떻게 될까?
→ 카탈리스트 옵티마이저는 클러스터 리소스 및 데이터세트 크기에 따라 데이터를 분할하는 최적의 방법을 결정
일관된 결과를 얻을 수 있도록 클러스터 구성과 시드를 수정할 수 있지만 데이터를 한 번 분할한 다음 이러한 재현성 문제가 발생하지 않도록 자체 훈련/테스트 폴더에 기록하는 것이 좋다.
변환기를 사용하여 기능 준비
선형 회귀(스파크의 다른 많은 알고리즘과 마찬가지로)에서는 모든 입력 기능이 데이터 프레임의 단일 벡터 내에 포함되어야 함 ⇒ 데이터를 변환transform
데이터 프레임을 입력으로 받아들이고, 하나 이상의 열이 추가된 새 데이터 프레임을 반환
모든 기능을 단일 벡터에 넣는 작업을 위해 VectorAssembler 변환기를 사용
VectorAssembler는 입력 열 목록을 가져와서 기능이라고 부를 추가 열이 있는 새 데이터 프레임을 만든다. ⇒ 입력 열의 값을 단일 벡터로 결합
침실이라는 단일 특성만을 사용하여 모델을 구축
# 파이썬 예제
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=["bedrooms"], outputCol="features")
vecTrainDF = vecAssembler.transform(trainDF)
vecTrainDF.select("bedrooms", "features", "price").show(10)
Python
복사
추정기를 사용하여 모델 구축
선형 회귀에 대한 입력 열(features)이 vectorAssembler의 출력
LinearRegression은 데이터 프레임을 사용하고 모델을 반환한다.
추정기는 데이터에서 매개변수를 배우고, estimator_name.fit() 메서드를 갖고 열성적이라고 평가되는 반면, 변환기는 느리게 평가된다.
추정기의 다른 예로는 Imputer, DecisionTreeClassifier 및 Random ForestRegressor가 있다.
# 파이썬 예제
from pyspark.ml.regression import LinearRegression
Ir = LinearRegression(featuresCol="features", labelCol="price")
IrModel = Ir.fit(vecTrainDF)
Python
복사
lnfit()은 변환기인 LinearRegressionModel(lrModel)을 반환
= 추정기의 fit() 메서드의 출력은 변환기
#파이썬 예제
m = round(IrModel.coefficients[0], 2)
b = round(IrModel.intercept, 2)
print(f"""The formula for the linear regression line is price = {m}*bedrooms + {b}""")
# The formula for the linear regression line is price = 123.68*bedrooms + 47.51
Python
복사
파이프라인 생성
모델을 테스트 세트에 적용하려면 훈련 세트와 동일한 방식으로 해당 데이터를 준비
데이터가 통과할 단계를 순서대로 지정하기만 하면 스파크가 알아서 처리를 한다.
•
Pipelines는 추정기
•
PipelineModels(피팅된 파이프라인)는 변환기
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[vecAssembler, Lr])
pipelineModel = pipeline.fit(trainDF)
Python
복사
파이프라인 API를 사용하는 또 다른 이점
⇒ 어느 단계가 추정기/변환기인지 결정하므로 각 단계에 대해 name.fit() 대 name.transform()을 지정하는 것에 대해 우려할 필요가 없다는 것
predDF = pipelineModel.transform(testDF)
predDF.select("bedrooms", "features", "price", "prediction").show(10)
Python
복사
원-핫 인코딩
범주형 값을 숫잣값으로 변환
모든 숫자 및 범주 기능을 통합하는 약간 더 복잡한 파이프라인을 구축하는 방법
벡터로 표시되는 입력으로 숫자값을 기대
pandas.get_dummies()와 동일한 작업을 수행한다
•
300마리의 동물이 있는 동물원이 있다면 OHE가 메모리/컴퓨팅 리소스 소비를 크게 증가시킬까?
⇒ 스파크를 사용하면 그렇지 않다!
⇒ 내부적으로 SparseVector를 사용하므로 0값을 저장하는 공간을 낭비하지 않는다. (scipy와 유사)
DenseVector(0, 0, 0, 7, 0, 2, 0, 0, 0, 0)
SparseVector(10, [3, 5], [7, 2])
Python
복사
SparseVector를 생성하려면 벡터의 크기, 0이 아닌 요소의 인덱스 및 해당 인덱스의 해당 값을 추적
•
스파크로 데이터를 원-핫 인코딩하는 몇 가지 방법
◦
Stringindexer 및 OneHotEncoder를 사용하는 것
▪
Stringindexer 추정기를 적용하여 범주형 값을 범주 지수로 변환
▪
카테고리 인덱스를 만든 후에는 이를 OneHotEncoder에 대한 입력으로 전달할 수 있다
▪
범주형 피쳐를 원-핫 인코딩하는 방법을 보여준다.
from pyspark.ml.feature import OneHotEncoder, StringIndexer
categoricalCols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"]
indexOutputCols = [x + "Index" for x in categoricalCols]
oheOutputCols = [x + "OHE" for x in categoricalCols]
stringIndexer = StringIndexer(inputCols=categoricalCols,
outputCols=indexOutputCols,
handleInvalid = "skip")
oheEncoder = OneHotEncoder(inputCols=indexOutputCols,
outputCols=oheOutputCols)
numericCols = [field for (field, dataType) in trainDF.dtypes if ((dataType == "double") & (field != "price"))]
assemblerInputs = oheOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs,
outputCol="features")
Python
복사
•
Stringindexer는 테스트 데이터 세트에는 나타나지만 훈련 데이터 세트에는 나타나지 않는 새로운 범주를 어떻게 처리하는가?
⇒ handlelnvalid 매개변수 활용
◦
옵션 : 건너뛰기(잘못된 데이터가 있는 행 필터링), 오류(오류 발생) 또는 유지(인덱스 numLabels의 특수 추가 버킷에 잘못된 데이터를 넣음)
•
어려움 : Stringindexer에 범주형 기능으로 처리해야 하는 피처를 명시적으로 알려야 한다는 것
◦
Vectorindexer를 사용하여 모든 범주형 변수를 자동으로 감지할 수 있지만, 모든 단일 열을 반복하고 maxcategories 고유값보다 적은지 감지해야 하므로 계산 비용이 많이 든다.
maxCategories는 사용자가 지정하는 매개변수이며 이 값을 결정하는 것도 어려울 수 있다.
▪
RFormula를 사용하여 해당 레이블과 포함할 피처를 제공
from pyspark.ml.feature import RFormula
rFormula = RFormula(formula="price ~ . ",
featuresCol="features", labelCol="price", handleInvalid="skip")
Python
복사
•
공식 = ”y ~ .”는 사용 가능한 모든 피처를 사용한다는 것을 의미
•
RFormula 장점: Stringindexer와 OneHotEncoder를 자동으로 결합
◦
RFormula는 자동으로 모든 문자열 열을 Stringindex 및 원-핫 인코딩해서 숫자 열을 이중 유형으로 변환하고, 내부에서 VectorAssembler를 사용하여 이 모든 것을 단일 벡터로 결합
•
단점 : 모든 알고리즘에 대해 원-핫 인코딩이 필요하지 않거나 권장되지 않는다는 것
◦
모든 피처 준비 및 모델 구축을 파이프라인에 넣고 데이터 세트에 적용
# 파이썬 예제
lr = LinearRegression(labelCol="price", featuresCol="features")
pipeline = Pipeline(stages = [stringIndexer, oheEncoder, vecAssembler, lr])
# 또는 RFormula 사용
# pipeline = Pipeline(stages = [rFormula, lr])
pipelineModel = pipeline.fit(trainDF)
predDF = pipelineModel.transform(testDF)
predDF.select("features", "price", "prediction").show(5)
Python
복사
모델 평가
spark.ml에는 분류, 회귀, 클러스터링 및 순위 평가기 존재
◦
RMSE를 사용하여 회귀 모델 평가
from pyspark.ml.evaluation import RegressionEvaluator
regressionEvaluator = RegressionEvaluator(
predictionCol="prediction",
labelCol="price",
metricName="rmse")
rmse = regressionEvaluator.evaluate(predDF)
print(f"RMSE is {rmse:.1f}")
# RMSE is 220.6
Python
복사
▪
rmse 적합도 알아보는 법
회귀 작업에 대한 일반적인 기준선 모델은 훈련 세트 에서 레이블의 평균값을 계산 한 다음, 테스트 데이터 세트의 모든 레코드에 대해 를 예측하고 결과 RMSE를 계산하는 것
해당 작업을 거친 작업이 기준선, 대조 모델이 기준을 초과하지 않으면 모델 구축 프로세스에 문제가 발생 한 것일 수 있다.
◦
평가기준
▪
독립변수가 종속변수를 얼마나 잘 설명하는 지를 나타냅니다. R-squared는 0과 1 사이 값을 가집니다.
•
즉 1에 가까울수록 독립변수가 종속변수를 잘 설명할 수 있다는 뜻
▪
모델이 모든 데이터 포인트를 완벽하게 예측한다면 , 이 된다
▪
= 이면 분수는 1/1이므로 은 0이다.
•
모델은 항상 평균값 를 예측하는 것과 동일한 성능을 보인다.
▪
값이 음수이면 모델링 프로세스를 재평가해야 한다
▪
을 사용할 때 좋은 점은 비교할 기준 모델을 정의할 필요가 없다
▪
setter 속성을 사용하여 메트릭 이름을 설정
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
print(f"R2 is {r2}")
# R2 is 0.159854
Python
복사
모델 저장 및 로드
•
API는 model.write().save(path)
•
overwrite() 명령을 제공하여 해당 경로에 포함된 데이터를 덮어쓸 수 있다.
pipelinePath = ".tmp/lr-pipeline-model"
pipelineModel.write().overwrite().save(pipelinePath)
Python
복사
•
저장된 모델을 로드할 때 로드할 모델 유형을 다시 지정
from pyspark.ml import PipelineModel
savedPipelineModel = PipelineModel.load(pipelinePath)
Python
복사
•
스파크에는 ‘웜 스타트 warm start'라는 개념이 없기 때문에 이 모델의 가중치를 새 모델 교육을 위한 초기화 매개변수로 사용할 수 없다 ⇒
데이터 세트가 약간 변경되면 전체 선형 회귀 모델을 처음부터 다시 훈련해야 한다
하이퍼 파라미터 튜닝
•
하이퍼파라미터는 훈련 전에 모델에 대해 정의하는 속성이며 훈련 과정에서 학습되지 않는다
•
트리 기반 모델을 통해 하이퍼 파라미터 튜닝을 학습한다
◦
의사결정나무의 경우, 입력 피처를 표준화하거나 확장하는 것에 대해 걱정할 필요가 없다.
이는 분할에 영향을 미치지 않기 때문이다. 그러나 범주형 피처를 준비하는 방법에 대해서는 주의.
◦
spark.ml에서는 범주형 열을 String Indexer에 전달하기만 하면 나머지는 의사결정나무에서 처리 가능.
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(labelCol="price")
Python
복사
# 숫자 열만 필터링한다(가격, 레이블 제외).
numericCols = [field for (field, dataType) in trainDF.dtypes
if ((dataType == "double") & (field != "price"))]
# 위에서 정의한 Stringindexer의 출력과 숫자 열 결합
assemblerlnputs = indexOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
# 단계를 파이프라인으로 결합
stages = [stringIndexer, vecAssembler, dt]
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(trainDF) # 이 라인에서 에러가 발생해야 함
Python
복사
◦
maxBins 매개변수에 문제가 있음
▪
maxBins는 연속 특성이 이산화되거나 분할되는 bin의 수를 결정
▪
모든 데이터와 모델이 단일 머신에 상주하기 때문에 사이킷런에는 maxBins 매개변수가 없다.
▪
그러나 스파크에서 워커는 데이터의 모든 열을 갖고 있지만 행의 하위 집합만 있다.
⇒ 분할할 피처와 값에 대해 통신할 때, 훈련 시간에 설정된 공통 이산화에서 얻은 동일한 분할값에 대해 모두 다루고 있는지 확인해야 한다.
dt.setMaxBins(40)
pipelineModel = pipeline.fit(trainDF)
dtModel = pipelineModel.stages[-1]
print(dtModel.toDebugString)
## feature importance 뽑기
import pandas as pd
featureImp = pd.DataFrame(
list(zip(vecAssembler.getInputCols(), dtModel.featureimportances)), columns=["feature", "importance"])
featureImp.sort_values(by="importance", ascending=False)
Python
복사
랜덤 포레스트 (앙상블)
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(labelCol="price", maxBins=40, seed=42)
Python
복사
하이퍼 파라미터 튜닝
최적의 하이퍼파라미터 구성을 식별하는 데 도움이 되는 하이퍼옵트
1. 평가할 추정기를 정의
2. ParamGridBuilder를 사용하여 변경하려는 하이퍼파라미터와 해당 값을 지정
3. 평가기(evaluator)를 정의하여 다양한 모델을 비교하는 데 사용할 메트릭을 지정.
4. CrossValidator를 사용하여 다양한 모델 각각을 평가하는 교차 검증을 수행.
pipeline = Pipeline(stages = [stringindexer, vecAssembler, rf])
from pyspark.ml.tuning import ParamGridBuilder
paramGrid = (ParamGridBuilder()
.addGrid(rf.maxDepth, [2, 4, 6])
.addGrid(rf.numTrees, [10, 100])
.build())
Python
복사
하이퍼파라미터 그리드를 설정했으므로 각 모델을 평가하여 어떤 모델이 가장 성능이 좋은지 결정하는 방법을 정의
RegressionEvaluator를 사용하고 RMSE를 관심 메트릭
evaluator = RegressionEvaluator(
labelCol="price",
predictionCol="prediction",
metricName="rmse")
Python
복사
estimator, evaluator, estimatorParamMaps를 받아들이는 CrossValidator를 사용하여 k-fold 교차 검증을 수행
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(
estimator=pipeline,
evaluator=evaluator,
estimatorParamMaps=paramGrid,
numFolds=3, seed=42)
cvModel = cv.fit(trainDF)
# 출력은 작업에 소요된 시간을 알려준다
Python
복사
얼마나 많은 모델을 훈련시켰는가? ⇒ 18(6개의 하이퍼파라미터 구성 x 3-폴드 교차 검증)
스파크는 최적의 하이퍼파라미터 구성을 식별하면 전체 교육 데이 터 세트에 대해 모델을 재교육하므로 결국 19개의 모델을 교육
◦
훈련된 중간 모델을 유지하려면 CrossValidator에서 collectSubModels=True를 설정
◦
교차 검증기의 결과를 검사 - avgMetrics
list(zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics))
Python
복사
K-폴드 교차검증
1.
평가할 추정기를 정의한다.
pipeline = Pipeline(stages = [stringIndexer, vecAssembler, rf])
Python
복사
2.
ParamGridBuilder를 사용하여 변경하려는 하이퍼파라미터와 해당 값을 지정한다.
from pyspark.ml.tuning import ParamGridBuilder
paramGrid = (ParamGridBuilder()
.addGrid(rf.maxDepth, [2,4,6])
.addGrid(rf.numTrees, [10, 100])
.build())
Python
복사
3.
평가기(evaluator)를 정의하여 다양한 모델을 비교하는 데 사용할 metric을 지정한다.
evaluator = RegressionEvaluator(labelCol="price",
predictionCol="prediction",
metricName="rmse")
Python
복사
4.
CrossValidator를 사용하여 다양한 모델 각각을 평가하는 교차 검증을 수행한다.
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=pipeline,
evaluator=evaluator,
estimatorParamMaps=paramGrid,
numFolds=3,
seed=42)
cvModel = cv.fit(trainDF)
Python
복사