Search

[러닝스파크 10장] MLlib을 사용한 머신러닝

[러닝스파크 10장] MLlib을 사용한 머신러닝

~ 338(362)
스파크에는 spark.mllib 및 spark.ml의 두 가지 머신러닝 패키지 존재
spark.mllib
RDD API를 기반으로 하는 기존의 머신러닝 API
spark.ml
데이터 프레임을 기반으로 하는 최신 API
spark.ml은 모델이 보유한 데이터 포인트 수에 따라 선형으로 확장되는 O(n)O(n) 확장에 중점을 두어 방대한 양의 데이터로 확장

머신러닝 파이프라인 설계

MLlib에서 파이프라인 API는 머신러닝 워크플로를 구성하기 위해 데이터 프레임 위에 구축된 고급 API를 제공
파이프라인 API
일련의 변환기transformer와 추정기estimator로 구성
MLlib을 사용하여 종단 간 파이프라인을 구축하는 것이 목표
변환기 (transformer)
데이터 프레임을 입력으로 받아들이고, 하나 이상의 열이 추가된 새 데이터 프레임을 반환
변환기는 데이터에서 매개변수를 학습하지 않고, 단순히 규칙 기반 변환을 적용하여 모델 훈련을 위한 데 이터를 준비하거나 훈련된 Mllib 모델을 사용하여 예측을 생성.
.transform() 메서드가 있다.
추정기 (estimator)
.fit() 메서드를 통해 데이터 프레임에서 매개변수를 학습하고 변환기인 Model을 반환
파이프라인
일련의 변환기와 추정기를 단일 모델로 구성
파이프라인 자체가 추정기인 반면, pipeline.fit()의 출력은 변환기인 PipelineModel을 반환

데이터 수집 및 탐색

filePath = """../databricks-datasets/learning-spark-v2/sf-airbnb/sf-airbnb-clean.parquet/""" airbnbDF = spark.read.parquet(filePath) airbnbDF.select("neighbourhood_cleansed", "room_type", "bedrooms", "bathrooms", "number_of_reviews", "price").show(5)
Python
복사

학습 및 테스트 데이터세트 생성

피처 엔지니어링 및 모델링을 시작하기 전에 데이터 세트를 학습train 및 테스트test의 두 그룹으로 나눈다. (8:2)
# 파이썬 예제 trainDF, testDF = airbnbDF.randomSplit([.8, .2], seed=42) print(f"""There are {trainDF.count()} rows in the training set, and {testDF.count()} in the test set""") # There are 5780 rows in the training set, and 1366 in the test set
Python
복사
스파크 클러스터의 이그제큐터 수를 변경하면 어떻게 될까?
→ 카탈리스트 옵티마이저는 클러스터 리소스 및 데이터세트 크기에 따라 데이터를 분할하는 최적의 방법을 결정
일관된 결과를 얻을 수 있도록 클러스터 구성과 시드를 수정할 수 있지만 데이터를 한 번 분할한 다음 이러한 재현성 문제가 발생하지 않도록 자체 훈련/테스트 폴더에 기록하는 것이 좋다.

변환기를 사용하여 기능 준비

선형 회귀(스파크의 다른 많은 알고리즘과 마찬가지로)에서는 모든 입력 기능이 데이터 프레임의 단일 벡터 내에 포함되어야 함 ⇒ 데이터를 변환transform
데이터 프레임을 입력으로 받아들이고, 하나 이상의 열이 추가된 새 데이터 프레임을 반환
모든 기능을 단일 벡터에 넣는 작업을 위해 VectorAssembler 변환기를 사용
VectorAssembler는 입력 열 목록을 가져와서 기능이라고 부를 추가 열이 있는 새 데이터 프레임을 만든다. ⇒ 입력 열의 값을 단일 벡터로 결합
침실이라는 단일 특성만을 사용하여 모델을 구축
# 파이썬 예제 from pyspark.ml.feature import VectorAssembler vecAssembler = VectorAssembler(inputCols=["bedrooms"], outputCol="features") vecTrainDF = vecAssembler.transform(trainDF) vecTrainDF.select("bedrooms", "features", "price").show(10)
Python
복사

추정기를 사용하여 모델 구축

선형 회귀에 대한 입력 열(features)이 vectorAssembler의 출력
LinearRegression은 데이터 프레임을 사용하고 모델을 반환한다.
추정기는 데이터에서 매개변수를 배우고, estimator_name.fit() 메서드를 갖고 열성적이라고 평가되는 반면, 변환기는 느리게 평가된다.
추정기의 다른 예로는 Imputer, DecisionTreeClassifierRandom ForestRegressor가 있다.
# 파이썬 예제 from pyspark.ml.regression import LinearRegression Ir = LinearRegression(featuresCol="features", labelCol="price") IrModel = Ir.fit(vecTrainDF)
Python
복사
lnfit()은 변환기인 LinearRegressionModel(lrModel)을 반환
= 추정기의 fit() 메서드의 출력은 변환기
#파이썬 예제 m = round(IrModel.coefficients[0], 2) b = round(IrModel.intercept, 2) print(f"""The formula for the linear regression line is price = {m}*bedrooms + {b}""") # The formula for the linear regression line is price = 123.68*bedrooms + 47.51
Python
복사

파이프라인 생성

모델을 테스트 세트에 적용하려면 훈련 세트와 동일한 방식으로 해당 데이터를 준비
데이터가 통과할 단계를 순서대로 지정하기만 하면 스파크가 알아서 처리를 한다.
Pipelines는 추정기
PipelineModels(피팅된 파이프라인)는 변환기
from pyspark.ml import Pipeline pipeline = Pipeline(stages=[vecAssembler, Lr]) pipelineModel = pipeline.fit(trainDF)
Python
복사
파이프라인 API를 사용하는 또 다른 이점
⇒ 어느 단계가 추정기/변환기인지 결정하므로 각 단계에 대해 name.fit()name.transform()을 지정하는 것에 대해 우려할 필요가 없다는 것
predDF = pipelineModel.transform(testDF) predDF.select("bedrooms", "features", "price", "prediction").show(10)
Python
복사

원-핫 인코딩

범주형 값을 숫잣값으로 변환
모든 숫자 및 범주 기능을 통합하는 약간 더 복잡한 파이프라인을 구축하는 방법
벡터로 표시되는 입력으로 숫자값을 기대
pandas.get_dummies()와 동일한 작업을 수행한다
300마리의 동물이 있는 동물원이 있다면 OHE가 메모리/컴퓨팅 리소스 소비를 크게 증가시킬까?
⇒ 스파크를 사용하면 그렇지 않다!
⇒ 내부적으로 SparseVector를 사용하므로 0값을 저장하는 공간을 낭비하지 않는다. (scipy와 유사)
DenseVector(0, 0, 0, 7, 0, 2, 0, 0, 0, 0) SparseVector(10, [3, 5], [7, 2])
Python
복사
SparseVector를 생성하려면 벡터의 크기, 0이 아닌 요소의 인덱스 및 해당 인덱스의 해당 값을 추적
스파크로 데이터를 원-핫 인코딩하는 몇 가지 방법
Stringindexer 및 OneHotEncoder를 사용하는 것
Stringindexer 추정기를 적용하여 범주형 값을 범주 지수로 변환
카테고리 인덱스를 만든 후에는 이를 OneHotEncoder에 대한 입력으로 전달할 수 있다
범주형 피쳐를 원-핫 인코딩하는 방법을 보여준다.
from pyspark.ml.feature import OneHotEncoder, StringIndexer categoricalCols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"] indexOutputCols = [x + "Index" for x in categoricalCols] oheOutputCols = [x + "OHE" for x in categoricalCols] stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=indexOutputCols, handleInvalid = "skip") oheEncoder = OneHotEncoder(inputCols=indexOutputCols, outputCols=oheOutputCols) numericCols = [field for (field, dataType) in trainDF.dtypes if ((dataType == "double") & (field != "price"))] assemblerInputs = oheOutputCols + numericCols vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
Python
복사
Stringindexer는 테스트 데이터 세트에는 나타나지만 훈련 데이터 세트에는 나타나지 않는 새로운 범주를 어떻게 처리하는가?
⇒ handlelnvalid 매개변수 활용
옵션 : 건너뛰기(잘못된 데이터가 있는 행 필터링), 오류(오류 발생) 또는 유지(인덱스 numLabels의 특수 추가 버킷에 잘못된 데이터를 넣음)
어려움 : Stringindexer에 범주형 기능으로 처리해야 하는 피처를 명시적으로 알려야 한다는 것
Vectorindexer를 사용하여 모든 범주형 변수를 자동으로 감지할 수 있지만, 모든 단일 열을 반복하고 maxcategories 고유값보다 적은지 감지해야 하므로 계산 비용이 많이 든다. maxCategories는 사용자가 지정하는 매개변수이며 이 값을 결정하는 것도 어려울 수 있다.
RFormula를 사용하여 해당 레이블과 포함할 피처를 제공
from pyspark.ml.feature import RFormula rFormula = RFormula(formula="price ~ . ", featuresCol="features", labelCol="price", handleInvalid="skip")
Python
복사
공식 = ”y ~ .”는 사용 가능한 모든 피처를 사용한다는 것을 의미
RFormula 장점: Stringindexer와 OneHotEncoder를 자동으로 결합
RFormula는 자동으로 모든 문자열 열을 Stringindex 및 원-핫 인코딩해서 숫자 열을 이중 유형으로 변환하고, 내부에서 VectorAssembler를 사용하여 이 모든 것을 단일 벡터로 결합
단점 : 모든 알고리즘에 대해 원-핫 인코딩이 필요하지 않거나 권장되지 않는다는 것
모든 피처 준비 및 모델 구축을 파이프라인에 넣고 데이터 세트에 적용
# 파이썬 예제 lr = LinearRegression(labelCol="price", featuresCol="features") pipeline = Pipeline(stages = [stringIndexer, oheEncoder, vecAssembler, lr]) # 또는 RFormula 사용 # pipeline = Pipeline(stages = [rFormula, lr]) pipelineModel = pipeline.fit(trainDF) predDF = pipelineModel.transform(testDF) predDF.select("features", "price", "prediction").show(5)
Python
복사

모델 평가

spark.ml에는 분류, 회귀, 클러스터링 및 순위 평가기 존재
RMSE를 사용하여 회귀 모델 평가
from pyspark.ml.evaluation import RegressionEvaluator regressionEvaluator = RegressionEvaluator( predictionCol="prediction", labelCol="price", metricName="rmse") rmse = regressionEvaluator.evaluate(predDF) print(f"RMSE is {rmse:.1f}") # RMSE is 220.6
Python
복사
rmse 적합도 알아보는 법
회귀 작업에 대한 일반적인 기준선 모델은 훈련 세트 yˉ\bar{y} 에서 레이블의 평균값을 계산 한 다음, 테스트 데이터 세트의 모든 레코드에 대해 yˉ\bar{y} 를 예측하고 결과 RMSE를 계산하는 것
해당 작업을 거친 작업이 기준선, 대조 모델이 기준을 초과하지 않으면 모델 구축 프로세스에 문제가 발생 한 것일 수 있다.
R2R^2 평가기준
독립변수가 종속변수를 얼마나 잘 설명하는 지를 나타냅니다. R-squared는 0과 1 사이 값을 가집니다.
즉 1에 가까울수록 독립변수가 종속변수를 잘 설명할 수 있다는 뜻
모델이 모든 데이터 포인트를 완벽하게 예측한다면 SSres=0SS_{res} = 0, R2=1R^2 = 1이 된다
SSres{SS}_{res}= SStot{SS}_{tot}이면 분수는 1/1이므로 R2R^2은 0이다.
모델은 항상 평균값 yˉ\bar{y} 를 예측하는 것과 동일한 성능을 보인다.
R2R^2 값이 음수이면 모델링 프로세스를 재평가해야 한다
R2R^2 을 사용할 때 좋은 점은 비교할 기준 모델을 정의할 필요가 없다
setter 속성을 사용하여 메트릭 이름을 설정
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF) print(f"R2 is {r2}") # R2 is 0.159854
Python
복사

모델 저장 및 로드

API는 model.write().save(path)
overwrite() 명령을 제공하여 해당 경로에 포함된 데이터를 덮어쓸 수 있다.
pipelinePath = ".tmp/lr-pipeline-model" pipelineModel.write().overwrite().save(pipelinePath)
Python
복사
저장된 모델을 로드할 때 로드할 모델 유형을 다시 지정
from pyspark.ml import PipelineModel savedPipelineModel = PipelineModel.load(pipelinePath)
Python
복사
스파크에는 ‘웜 스타트 warm start'라는 개념이 없기 때문에 이 모델의 가중치를 새 모델 교육을 위한 초기화 매개변수로 사용할 수 없다 ⇒ 데이터 세트가 약간 변경되면 전체 선형 회귀 모델을 처음부터 다시 훈련해야 한다

하이퍼 파라미터 튜닝

하이퍼파라미터는 훈련 전에 모델에 대해 정의하는 속성이며 훈련 과정에서 학습되지 않는다
트리 기반 모델을 통해 하이퍼 파라미터 튜닝을 학습한다
의사결정나무의 경우, 입력 피처를 표준화하거나 확장하는 것에 대해 걱정할 필요가 없다. 이는 분할에 영향을 미치지 않기 때문이다. 그러나 범주형 피처를 준비하는 방법에 대해서는 주의.
spark.ml에서는 범주형 열을 String Indexer에 전달하기만 하면 나머지는 의사결정나무에서 처리 가능.
from pyspark.ml.regression import DecisionTreeRegressor dt = DecisionTreeRegressor(labelCol="price")
Python
복사
# 숫자 열만 필터링한다(가격, 레이블 제외). numericCols = [field for (field, dataType) in trainDF.dtypes if ((dataType == "double") & (field != "price"))] # 위에서 정의한 Stringindexer의 출력과 숫자 열 결합 assemblerlnputs = indexOutputCols + numericCols vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features") # 단계를 파이프라인으로 결합 stages = [stringIndexer, vecAssembler, dt] pipeline = Pipeline(stages=stages) pipelineModel = pipeline.fit(trainDF) # 이 라인에서 에러가 발생해야 함
Python
복사
maxBins 매개변수에 문제가 있음
maxBins는 연속 특성이 이산화되거나 분할되는 bin의 수를 결정
모든 데이터와 모델이 단일 머신에 상주하기 때문에 사이킷런에는 maxBins 매개변수가 없다.
그러나 스파크에서 워커는 데이터의 모든 열을 갖고 있지만 행의 하위 집합만 있다. ⇒ 분할할 피처와 값에 대해 통신할 때, 훈련 시간에 설정된 공통 이산화에서 얻은 동일한 분할값에 대해 모두 다루고 있는지 확인해야 한다.
dt.setMaxBins(40) pipelineModel = pipeline.fit(trainDF) dtModel = pipelineModel.stages[-1] print(dtModel.toDebugString) ## feature importance 뽑기 import pandas as pd featureImp = pd.DataFrame( list(zip(vecAssembler.getInputCols(), dtModel.featureimportances)), columns=["feature", "importance"]) featureImp.sort_values(by="importance", ascending=False)
Python
복사

랜덤 포레스트 (앙상블)

from pyspark.ml.regression import RandomForestRegressor rf = RandomForestRegressor(labelCol="price", maxBins=40, seed=42)
Python
복사

하이퍼 파라미터 튜닝

최적의 하이퍼파라미터 구성을 식별하는 데 도움이 되는 하이퍼옵트
1. 평가할 추정기를 정의 2. ParamGridBuilder를 사용하여 변경하려는 하이퍼파라미터와 해당 값을 지정 3. 평가기(evaluator)를 정의하여 다양한 모델을 비교하는 데 사용할 메트릭을 지정. 4. CrossValidator를 사용하여 다양한 모델 각각을 평가하는 교차 검증을 수행.
pipeline = Pipeline(stages = [stringindexer, vecAssembler, rf]) from pyspark.ml.tuning import ParamGridBuilder paramGrid = (ParamGridBuilder() .addGrid(rf.maxDepth, [2, 4, 6]) .addGrid(rf.numTrees, [10, 100]) .build())
Python
복사
하이퍼파라미터 그리드를 설정했으므로 각 모델을 평가하여 어떤 모델이 가장 성능이 좋은지 결정하는 방법을 정의
RegressionEvaluator를 사용하고 RMSE를 관심 메트릭
evaluator = RegressionEvaluator( labelCol="price", predictionCol="prediction", metricName="rmse")
Python
복사
estimator, evaluator, estimatorParamMaps를 받아들이는 CrossValidator를 사용하여 k-fold 교차 검증을 수행
from pyspark.ml.tuning import CrossValidator cv = CrossValidator( estimator=pipeline, evaluator=evaluator, estimatorParamMaps=paramGrid, numFolds=3, seed=42) cvModel = cv.fit(trainDF) # 출력은 작업에 소요된 시간을 알려준다
Python
복사
얼마나 많은 모델을 훈련시켰는가? ⇒ 18(6개의 하이퍼파라미터 구성 x 3-폴드 교차 검증)
스파크는 최적의 하이퍼파라미터 구성을 식별하면 전체 교육 데이 터 세트에 대해 모델을 재교육하므로 결국 19개의 모델을 교육
훈련된 중간 모델을 유지하려면 CrossValidator에서 collectSubModels=True를 설정
교차 검증기의 결과를 검사 - avgMetrics
list(zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics))
Python
복사

K-폴드 교차검증

1.
평가할 추정기를 정의한다.
pipeline = Pipeline(stages = [stringIndexer, vecAssembler, rf])
Python
복사
2.
ParamGridBuilder를 사용하여 변경하려는 하이퍼파라미터와 해당 값을 지정한다.
from pyspark.ml.tuning import ParamGridBuilder paramGrid = (ParamGridBuilder() .addGrid(rf.maxDepth, [2,4,6]) .addGrid(rf.numTrees, [10, 100]) .build())
Python
복사
3.
평가기(evaluator)를 정의하여 다양한 모델을 비교하는 데 사용할 metric을 지정한다.
evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
Python
복사
4.
CrossValidator를 사용하여 다양한 모델 각각을 평가하는 교차 검증을 수행한다.
from pyspark.ml.tuning import CrossValidator cv = CrossValidator(estimator=pipeline, evaluator=evaluator, estimatorParamMaps=paramGrid, numFolds=3, seed=42) cvModel = cv.fit(trainDF)
Python
복사