[러닝스파크 11장] 아파치 스파크로 머신러닝 파이프라인 관리, 배포 및 확장
모델 관리
•
재현성 : 모델을 생성한 코드, 훈련에 사용된 환경, 훈련된 데이터 및 모델 자체를 재현할 수 있어야 함.
•
라이브러리 버전 관리 : 모델 작성 시 라이브러리의 어떤 버전을 사용하였는지 명시. requirement.txt
•
데이터 변경 : 훈련 당시의 데이터에 데이터가 삭제되거나 추가되면서 재현이 어려워질 수 있음.
•
실행 순서 : 코드의 실행은 위에서 부터 아래로 진행되어야 한다. - 코드 공유 시 주의하기!
•
병렬 작업 : 실행 순서가 보장되지 않는 경우. 결과가 비결정적이게 될 수 있다.
MLflow
•
MLflow : 개발자가 실험을 재현 및 공유하고, 모델을 관리하는 등의 작업을 수행하는데 도움이 되는 오픈소스 플랫폼. 파이썬, R, 자바/scala 인터페이스와 REST API 제공.
•
MLflow의 네 가지 구성 요소
◦
트래킹: 코드, 데이터, 구성 및 결과 등 실험을 기록하고 쿼리한다.
◦
프로젝트 : 프로젝트 및 종속성을 패키징하는 표준화된 형식. 모든 플랫폼에서 실행을 재현할 수 있는 형식으로 코드를 패키징한다.
◦
모델 : 모델을 패키징하는 표준화된 형식. 모델을 빌드하는 데 사용된 알고리즘이나 라이브러리에 관계없이 모델을 로드하고 적용하기 위한 일관된 API를 제공.
◦
레지스트리: 모델 계보, 모델 버전, 단계 전환 및 주석을 트래킹하는 저장소. 중앙 저장소에서 모델을 저장, 주석 달기, 검색 및 관리를 수행한다.
MLflow 시작하기
•
mlflow 라이브러리 설치
# terminal
pip install mlflow
Bash
복사
•
랜덤 포레스트 모델 트래킹 해보기
◦
모델 생성
filePath = "sf-airbnb-clean.parquet"
airbnbDF = spark.read.parquet(filePath)
(trainDF, testDF) = airbnbDF.randomSplit([.8, .2], seed = 42)
categoricalCols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"]
indexOutputCols = [x + "Index" for x in categoricalCols]
stringIndexer = StringIndexer(inputCols = categoricalCols,
outputCols = indexOutputCols,
handleInvalid = "skip")
numericCols = [field for (field, dataType) in trainDF.dtypes if ((dataType == "double") & (field != "price"))]
assemblerInputs = indexOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols = assemblerInputs,
outputCol = "features")
rf = RandomForestRegressor(labelCol = "price", maxBins = 40, maxDepth = 5, numTrees = 100, seed = 42)
pipeline = Pipeline(stages = [stringIndexer, vecAssembler, rf])
Python
복사
◦
mlflow에 트래킹 파일 저장
with mlflow.start_run(run_name = "random-forest") as run:
# 로그 매개변수
mlflow.log_param("num_trees", rf.getNumTrees())
mlflow.log_param("max_depth", rf.getMaxDepth())
# 로그 모델
pipelineModel = pipeline.fit(trainDF)
mlflow.spark.log_model(pipelineModel, "model")
# 로그 매트릭
predDF = pipelineModel.transform(testDF)
regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price")
rmse = regressionEvaluator.setMetricName("rmse").evaluate(predDF)
r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF)
mlflow.log_metrics({"rmse": rmse, "r2": r2})
# 로그 아티팩트(artifact) : 기능 중요도 점수
rfModel = pipelineModel.stages[-1]
pandasDF = (pd.DataFrame(list(zip(vecAssembler.getInputCols(),
rfModel.featureImportances)),
columns=["feature", "importance"])
.sort_values(by = "importance", ascending = False))
# 먼저 로컬에 파일을 저장하고 MLflow에 파일 경로를 알려준다.
pandasDF.to_csv("feature-importance.csv", index = False)
mlflow.log_artifact("feature-importance.csv")
Python
복사
◦
mlflow ui로 저장파일 확인
# terminal
cd (mlruns가 있는 폴더)
mlflow ui
Bash
복사
◦
MLflowClient로 접근
from mlflow.tracking import MlflowClient
client = MlflowClient()
runs = client.search_runs(run.info.experiment_id,
order_by= ["attributes.start_time desc"],
max_results=3) # max_results : 몇 개까지 찾을 건지
run_id = runs[2].info.run_id
run_name = runs[2].info.run_name
print(run_id)
print(run_name)
print(runs[2].data.metrics)
Python
복사
# 결과
443c46fd533e4bfdb33dd3efe0372c2b
random-forest
{'r2': 0.22794251914574226, 'rmse': 211.5096898777315}
Python
복사
MLlib을 사용한 모델 배포 옵션
•
비즈니스 환경에 따라 대기 시간, 처리량, 비용 등의 요구사항이 다르기 때문에 배치, 스트리밍, 실시간 또는 모바일/임베디드와 같은 배포 모드를 선택하여 배포해야 한다.
•
모델링 프로세스를 시작하기 전 모델 배포 요구사항을 정의하고, 어떤 도구를 사용할 것인지를 결정해야 한다. 결국 MLlib과 스파크도 하나의 도구에 불과하다.(실시간 배포를 위해서는 다른 도구를 사용해야 한다.)
•
아래는 대표적인 3가지 경우의 처리량과 지연시간 그리고 예시 상황을 표로 정리한 것이다.
종류 | 처리량 | 지연시간 | 응용 상황 | 특징 |
배치 | 높음 | 높음 | 고객 이탈 예측 | - 정기적인 일정에 따라 예측을 생성 후 영구 저장소에 출력할 결과를 저장
- 모델 훈련 중에만 컴퓨팅 비용을 지불하면 되므로 일반적으로 가장 저렵하고 쉬운 배포 옵션
- 예측 작업마다 발생하는 오버 헤드가 적어지기 때문에 데이터 포인트 당 효율적이다. 특히 스파크의 경우 드라이버와 실행기 사이에 앞뒤로 통신하는 오버헤드가 크기 때문에 영향도가 크다. |
스트리밍 | 중간 | 중간 | 동적 가격 책정 | - 처리량과 대기 시간의 적적절한 균형을 제공
- 정형화 데이터를 사용하는 경우 거의 모든 코드가 배치 사용 사례와 비슷하기 때문에 두 옵션을 쉽게 오갈 수 있다.
- 지속적으로 가동하고 실행하는 데 사용하는 VM 또는 컴퓨팅 리소스에 대한 비용을 지불해야 한다.
- 스트림이 내결함성을 유지하고 수신 데이터 스파이크 시에 버퍼링을 제공할 수 있도록 적절한 구성이 되어있는지 확인해야 한다. |
실시간 | 낮음 | 낮음 | 온라인 광고 입찰 | - 실시간으로 모델 예측을 생성한다.
- 스파크가 대기 시간 요구사항을 충족할 수 없으므로 이를 사용하려면 스파크 외부에 모델을 가져가야 한다. |
배치 배포
•
모델을 훈련시키고 예측을 생성하고 결과를 다운스트림에서 소비할 수 있도록 테이블, 데이터베이스, 데이터 레이크 등에 저장한다.
•
model.transform()을 이용하여 데이터에 대한 예측을 생성할 수 있다.
# mlflow를 사용하여 저장된 모델 로드
pipelineModel = mlflow.spark.load_model(f"runs:/{run_id}/model")
# 예측 생성
inputDF = spark.read.parquet("../data/sf-airbnb-clean.parquet")
predDF = pipelineModel.transform(inputDF)
Python
복사
•
배치 배포 시 결정해야 하는 것
◦
얼마나 자주 예측을 생성할 것인지
◦
얼마나 자주 모델을 재교육할 것인지
◦
모델을 어떻게 버전화할 것인지
스트리밍 배포
•
유입되는 데이터에 대해 지속적으로 추론을 수행할 수 있다.
•
컴퓨팅 시간에 대해 지속적으로 비용을 지불해야 하기 때문에 배치 솔루션보다 비용이 많이 들지만 예측을 더 자주 생성하여 더 빠른 조치를 취할 수 있다는 추가 이점을 얻을 수 있다.
•
스파크를 이용하면 배치 예측을 스트리밍 예측으로 매우 쉽게 변환할 수 있으며 거의 모든 코드가 동일하다. 유일한 차이점은 spark.read() 대신 spark.readStream()을 사용하고 데이터 소스를 변경해야 한다는 것이다.
•
아래 코드는 동일한 parquet 파일을 100개의 작은 파케이 파일로 분할하여 스트리밍으로 작업을 수행하는 코드이다.
# 모델 로드
pipelineModel = mlflow.spark.load_model(f"runs:/{run_id}/model")
# 스트리밍 데이터 셋업
repartitionedPath = "../data/sf-airbnb-clean-100p.parquet"
schema = spark.read.parquet(repartitionedPath).schema
streamingData = (spark
.readStream
.schema(schema)
.option("maxFilesPerTrigger", 1)
.parquet(repartitionedPath))
# 예측 생성
streamPred = pipelineModel.transform(streamingData)
Python
복사
스트리밍 데이터로 작업할 때는 스키마를 먼저 정의해야 하기 때문에 parquet 파일로 작업을 수행하고 있지만 스키마를 지정해주었다.
준-실시간 배포
•
사용 사례에 따라 수백 밀리초에서 몇 초 정도의 예측이 필요한 경우, MLlib을 사용하여 예측을 생성하는 예측 서버를 구축할 수 있다. 이것은 매우 적은 양의 데이터를 처리하기 때문에 스파크의 이상적인 사용 사례는 아니지만 스트리밍 또는 배치 솔루션보다 대기 시간이 짧다.
실시간 배포
•
사기 탐지, 광고 추천 등 실시간 추론이 필요한 응용 애플리케이션 분야에서 사용.
•
적은 수의 레코드로 예측을 수행하면 실시간 추론에 필요한 짧은 대기 시간을 달성할 수 있지만, 로드 밸런싱과 지리적 위치를 고려해야 한다. 짧은 지연 시간을 제공하는 관리형 서비스 중에서는 Amazon SageMaker, Azure ML과 같은 솔류션이 인기있다.
•
위의 서비스에 배포를 위해서는 모델을 스파크 외부로 내보내야 하는데 아래의 방법이 있다. 자세한 내용은 필요할 때 더 찾아보자..
◦
방법1 : 파이썬, C 등으로 모델을 다시 구현.
◦
방법2 : 라이브러리 이용
▪
MLeap(더 이상 지원X), ONNX
◦
방법3 : 써드파티 라이브러리 사용
▪
XGBoost, H20.ai
import XGBoost as xgb
bst = xgb.Booster({'ntread':4})
bst.load_model("XGBoost_native_model")
Python
복사
비 MLlib 모델에 스파크 활용
•
MLlib에 사용하려는 알고리즘에 대한 기본 제공 자원이 없거나, 대기 시간이 매우 짧은 추론 요구사항을 충족하지 않을 수 있다. 그런 경우 스파크는 활용할 수 있지만, MLlib은 사용할 수 없다. 그런 경우 사용자 정의 함수를 spark에 적용하여 해결할 수 있다. 그 중 아래에서는 판다스 UDF에 대해 설명하고자 한다.
판다스 UDF
•
교재의 자료가 너무 불친절해서 다음의 자료를 참고하여 내용을 정리하였습니다.
•
PySpark도 많은 데이터 변환을 제공하지만, pandas에서 제공하는 다양한 변환 기능을 따라가지는 못 한다. Spark 3.x 부터 pandas user-defined function (이후 pandas UDF)를 이용하여 PySpark와 pandas를 결합하는 것이 가능해졌다.
샘플 데이터 생성
# sample data 생성
g = np.tile(['group a', 'group b'], 10)
x = np.linspace(0, 10., 20)
np.random.seed(3)
y_lin = 2*x + np.random.rand(len(x))/10.
y_qua = 3*x**2 + np.random.rand(len(x))
df = pd.DataFrame({'group': g, 'x':x, 'y_lin': y_lin, 'y_qua':y_qua})
schema = "group STRING, x DOUBLE, ylin DOUBLE, y_qua DOUBLE"
df = spark.createDataFrame(df, schema=schema)
Python
복사
pyspark.sql.functions.pandas_udf() 사용하기
•
예시1) 데이터 표준화 : Series to Series
# 표준화 작업 : series to series pandas UDF
@F.pandas_udf(T.DoubleType())
def standardise(col1: pd.Series) -> pd.Series:
return (col1 - col1.mean()) / col1.std()
res = df.select(standardise(F.col('y_lin')).alias('result')) # alias : column 이름 변경
res.show(3)
Python
복사
# 위와 동일한 기능을 하는 코드
def standardise_func(col1: pd.Series) -> pd.Series:
return (col1 - col1.mean()) / col1.std()
standardise = F.pandas_udf(standardise_func, returnType=T.DoubleType())
res = df.select(standardise(F.col('y_lin')).alias('result'))
res.show(3)
Python
복사
•
예시2) 전체 열에 * 2 하기 : Iterator of Series to Iterator of Series
# 전체 열에 *2 하기: Iterator of Series to Iterator of Series
@F.pandas_udf(T.DoubleType())
def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
for x in iterator:
yield x * 2
df.select(plus_one(F.col('x'))).show(3)
Python
복사
•
예시3) 두 열의 RMSE 구하기 : Iterator of Multiple Series to Iterator of Series
from typing import Tuple
@F.pandas_udf(T.DoubleType())
def calculate_rmse(iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for a, b in iterator:
yield ((a * 2) - b) ** 2
df.select(calculate_rmse("x", "y_lin")).show(3)
Python
복사
Pandas Function APIs
•
applyInPandas()
◦
group 별로 함수를 적용할 수 있다.
◦
(응용) group별로 다른 모델을 이용하여 예측하고자 할 때 사용할 수 있음.
# applyInPandas() : group 별 평균 구하기
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
x = pdf.x
return pdf.assign(x = x.mean())
result = df.groupby("group").applyInPandas(subtract_mean, schema = "group STRING, x DOUBLE, y_lin DOUBLE, y_qua DOUBLE").toPandas()
Python
복사
•
mapInPandas()
◦
위에서 iterator → iterator와 비슷한 기능을 하는 함수. 하지만 입력과 동일한 개수의 자료를 반환할 필요는 없다.
# mapInPandas() : group a 데이터만 반환하기
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.group == "group a"]
result = df.mapInPandas(filter_func, schema=df.schema).toPandas()
Python
복사
분산 하이퍼파라미터 조정을 위한 스파크
•
분산 추론을 수행할 의도가 없거나 MLlib의 분산 훈련 기능이 필요하지 않더라도 분산 하이퍼파라미터 조정을 위해 스파크를 활용할 수 있다. 아래에서는 Joblib과 Hyperopt 오픈소스에 대해 설명한다.
Joblib
•
파이썬에서 정량 파이프라이닝을 제공하는 도구 세트
•
잡립은 데이터 복사본을 모든 작업자에게 자동으로 브로드캐스트하기 때문에 동시에 (하이퍼파라미터만 다른) 동일한 모델을 동일한 데이터에 대해 훈련시킬 수 있다.
•
scikit-learn 0.21 이상, pyspark 2.4.4 이상
•
joblib을 이용하여 하이퍼파라미터 튜닝 병렬 실행하기 with GridSearch
register_spark() # 스파크 백엔드 등록
# 파일 불러오기 및 데이터 분할
filePath = "../data/sf-airbnb-clean.parquet"
df = spark.read.parquet(filePath).toPandas() # pandas 객체로 가져오기
df = df.drop(columns = ['host_is_superhost', 'cancellation_policy', 'instant_bookable', 'neighbourhood_cleansed', 'property_type', 'room_type', 'bed_type']) # 수치형 변수만 남김 (편의를 위해...)
X_train, X_test, y_train, y_test = train_test_split(df.drop(["price"], axis=1), df[["price"]].values.ravel(), random_state=42)
# GridSearch 객체 생성
param_grid = {"max_depth" : [2,5,7,10], "n_estimators": [20, 50, 70, 100]}
gscv = GridSearchCV(rf, param_grid, cv = 3)
# joblib (실행시간 : 16.25)
with parallel_backend("spark", n_jobs = 3):
gscv.fit(X_train, y_train)
print(gscv.cv_results_)
# 기본 실행 (실행시간 : 25.99)
gscv.fit(X_train, y_train)
Python
복사
Hyperopt
•
Bayesian Optimization에 기반한 하이퍼파라미터 튜닝 라이브러리
•
단일 시스템에서 사용법 : https://teddylee777.github.io/machine-learning/hyper-opt/
•
아래의 코드에서 fmin()은 training_function에 사용될 새로운 하이퍼파라미터 구성을 생성(베이지안 최적화 기반)하고 이를 Spark에 전달한다. SparkTrials는 이러한 훈련 작업의 배치를 각 스파크 실행기에서 단일 스파크 작업으로 병렬 실행한다. 단일 모델 훈련이 끝나면 이를 드라이버에 반환하고, 하이퍼옵트는 다음 하이퍼파라미터 조합을 구성한다.
import hyperopt
best_hyperparameters = hyperopt.fmin(
fn = training_function, # 모델 훈련 함수
space = search_space, # 하이퍼파라미터 탐색 영역 지정
algo = hyperopt.tpe.suggest, # 최적화 알고리즘 선택
max_evals = 64, # 최대 반복 횟수
trials = hyperopt.SparkTrials(parallelism = 4)
)
Python
복사