Search

[러닝스파크 11장] 아파치 스파크로 머신러닝 파이프라인 관리, 배포 및 확장

[러닝스파크 11장] 아파치 스파크로 머신러닝 파이프라인 관리, 배포 및 확장

모델 관리

재현성 : 모델을 생성한 코드, 훈련에 사용된 환경, 훈련된 데이터 및 모델 자체를 재현할 수 있어야 함.
라이브러리 버전 관리 : 모델 작성 시 라이브러리의 어떤 버전을 사용하였는지 명시. requirement.txt
데이터 변경 : 훈련 당시의 데이터에 데이터가 삭제되거나 추가되면서 재현이 어려워질 수 있음.
실행 순서 : 코드의 실행은 위에서 부터 아래로 진행되어야 한다. - 코드 공유 시 주의하기!
병렬 작업 : 실행 순서가 보장되지 않는 경우. 결과가 비결정적이게 될 수 있다.

MLflow

MLflow : 개발자가 실험을 재현 및 공유하고, 모델을 관리하는 등의 작업을 수행하는데 도움이 되는 오픈소스 플랫폼. 파이썬, R, 자바/scala 인터페이스와 REST API 제공.
MLflow의 네 가지 구성 요소
트래킹: 코드, 데이터, 구성 및 결과 등 실험을 기록하고 쿼리한다.
프로젝트 : 프로젝트 및 종속성을 패키징하는 표준화된 형식. 모든 플랫폼에서 실행을 재현할 수 있는 형식으로 코드를 패키징한다.
모델 : 모델을 패키징하는 표준화된 형식. 모델을 빌드하는 데 사용된 알고리즘이나 라이브러리에 관계없이 모델을 로드하고 적용하기 위한 일관된 API를 제공.
레지스트리: 모델 계보, 모델 버전, 단계 전환 및 주석을 트래킹하는 저장소. 중앙 저장소에서 모델을 저장, 주석 달기, 검색 및 관리를 수행한다.

MLflow 시작하기

mlflow 라이브러리 설치
# terminal pip install mlflow
Bash
복사
랜덤 포레스트 모델 트래킹 해보기
모델 생성
filePath = "sf-airbnb-clean.parquet" airbnbDF = spark.read.parquet(filePath) (trainDF, testDF) = airbnbDF.randomSplit([.8, .2], seed = 42) categoricalCols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"] indexOutputCols = [x + "Index" for x in categoricalCols] stringIndexer = StringIndexer(inputCols = categoricalCols, outputCols = indexOutputCols, handleInvalid = "skip") numericCols = [field for (field, dataType) in trainDF.dtypes if ((dataType == "double") & (field != "price"))] assemblerInputs = indexOutputCols + numericCols vecAssembler = VectorAssembler(inputCols = assemblerInputs, outputCol = "features") rf = RandomForestRegressor(labelCol = "price", maxBins = 40, maxDepth = 5, numTrees = 100, seed = 42) pipeline = Pipeline(stages = [stringIndexer, vecAssembler, rf])
Python
복사
mlflow에 트래킹 파일 저장
with mlflow.start_run(run_name = "random-forest") as run: # 로그 매개변수 mlflow.log_param("num_trees", rf.getNumTrees()) mlflow.log_param("max_depth", rf.getMaxDepth()) # 로그 모델 pipelineModel = pipeline.fit(trainDF) mlflow.spark.log_model(pipelineModel, "model") # 로그 매트릭 predDF = pipelineModel.transform(testDF) regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="price") rmse = regressionEvaluator.setMetricName("rmse").evaluate(predDF) r2 = regressionEvaluator.setMetricName("r2").evaluate(predDF) mlflow.log_metrics({"rmse": rmse, "r2": r2}) # 로그 아티팩트(artifact) : 기능 중요도 점수 rfModel = pipelineModel.stages[-1] pandasDF = (pd.DataFrame(list(zip(vecAssembler.getInputCols(), rfModel.featureImportances)), columns=["feature", "importance"]) .sort_values(by = "importance", ascending = False)) # 먼저 로컬에 파일을 저장하고 MLflow에 파일 경로를 알려준다. pandasDF.to_csv("feature-importance.csv", index = False) mlflow.log_artifact("feature-importance.csv")
Python
복사
mlflow ui로 저장파일 확인
# terminal cd (mlruns가 있는 폴더) mlflow ui
Bash
복사
MLflowClient로 접근
from mlflow.tracking import MlflowClient client = MlflowClient() runs = client.search_runs(run.info.experiment_id, order_by= ["attributes.start_time desc"], max_results=3) # max_results : 몇 개까지 찾을 건지 run_id = runs[2].info.run_id run_name = runs[2].info.run_name print(run_id) print(run_name) print(runs[2].data.metrics)
Python
복사
# 결과 443c46fd533e4bfdb33dd3efe0372c2b random-forest {'r2': 0.22794251914574226, 'rmse': 211.5096898777315}
Python
복사

MLlib을 사용한 모델 배포 옵션

비즈니스 환경에 따라 대기 시간, 처리량, 비용 등의 요구사항이 다르기 때문에 배치, 스트리밍, 실시간 또는 모바일/임베디드와 같은 배포 모드를 선택하여 배포해야 한다.
모델링 프로세스를 시작하기 전 모델 배포 요구사항을 정의하고, 어떤 도구를 사용할 것인지를 결정해야 한다. 결국 MLlib과 스파크도 하나의 도구에 불과하다.(실시간 배포를 위해서는 다른 도구를 사용해야 한다.)
아래는 대표적인 3가지 경우의 처리량과 지연시간 그리고 예시 상황을 표로 정리한 것이다.
종류
처리량
지연시간
응용 상황
특징
배치
높음
높음
고객 이탈 예측
- 정기적인 일정에 따라 예측을 생성 후 영구 저장소에 출력할 결과를 저장 - 모델 훈련 중에만 컴퓨팅 비용을 지불하면 되므로 일반적으로 가장 저렵하고 쉬운 배포 옵션 - 예측 작업마다 발생하는 오버 헤드가 적어지기 때문에 데이터 포인트 당 효율적이다. 특히 스파크의 경우 드라이버와 실행기 사이에 앞뒤로 통신하는 오버헤드가 크기 때문에 영향도가 크다.
스트리밍
중간
중간
동적 가격 책정
- 처리량과 대기 시간의 적적절한 균형을 제공 - 정형화 데이터를 사용하는 경우 거의 모든 코드가 배치 사용 사례와 비슷하기 때문에 두 옵션을 쉽게 오갈 수 있다. - 지속적으로 가동하고 실행하는 데 사용하는 VM 또는 컴퓨팅 리소스에 대한 비용을 지불해야 한다. - 스트림이 내결함성을 유지하고 수신 데이터 스파이크 시에 버퍼링을 제공할 수 있도록 적절한 구성이 되어있는지 확인해야 한다.
실시간
낮음
낮음
온라인 광고 입찰
- 실시간으로 모델 예측을 생성한다. - 스파크가 대기 시간 요구사항을 충족할 수 없으므로 이를 사용하려면 스파크 외부에 모델을 가져가야 한다.

배치 배포

모델을 훈련시키고 예측을 생성하고 결과를 다운스트림에서 소비할 수 있도록 테이블, 데이터베이스, 데이터 레이크 등에 저장한다.
model.transform()을 이용하여 데이터에 대한 예측을 생성할 수 있다.
# mlflow를 사용하여 저장된 모델 로드 pipelineModel = mlflow.spark.load_model(f"runs:/{run_id}/model") # 예측 생성 inputDF = spark.read.parquet("../data/sf-airbnb-clean.parquet") predDF = pipelineModel.transform(inputDF)
Python
복사
배치 배포 시 결정해야 하는 것
얼마나 자주 예측을 생성할 것인지
얼마나 자주 모델을 재교육할 것인지
모델을 어떻게 버전화할 것인지

스트리밍 배포

유입되는 데이터에 대해 지속적으로 추론을 수행할 수 있다.
컴퓨팅 시간에 대해 지속적으로 비용을 지불해야 하기 때문에 배치 솔루션보다 비용이 많이 들지만 예측을 더 자주 생성하여 더 빠른 조치를 취할 수 있다는 추가 이점을 얻을 수 있다.
스파크를 이용하면 배치 예측을 스트리밍 예측으로 매우 쉽게 변환할 수 있으며 거의 모든 코드가 동일하다. 유일한 차이점은 spark.read() 대신 spark.readStream()을 사용하고 데이터 소스를 변경해야 한다는 것이다.
아래 코드는 동일한 parquet 파일을 100개의 작은 파케이 파일로 분할하여 스트리밍으로 작업을 수행하는 코드이다.
# 모델 로드 pipelineModel = mlflow.spark.load_model(f"runs:/{run_id}/model") # 스트리밍 데이터 셋업 repartitionedPath = "../data/sf-airbnb-clean-100p.parquet" schema = spark.read.parquet(repartitionedPath).schema streamingData = (spark .readStream .schema(schema) .option("maxFilesPerTrigger", 1) .parquet(repartitionedPath)) # 예측 생성 streamPred = pipelineModel.transform(streamingData)
Python
복사
스트리밍 데이터로 작업할 때는 스키마를 먼저 정의해야 하기 때문에 parquet 파일로 작업을 수행하고 있지만 스키마를 지정해주었다.

준-실시간 배포

사용 사례에 따라 수백 밀리초에서 몇 초 정도의 예측이 필요한 경우, MLlib을 사용하여 예측을 생성하는 예측 서버를 구축할 수 있다. 이것은 매우 적은 양의 데이터를 처리하기 때문에 스파크의 이상적인 사용 사례는 아니지만 스트리밍 또는 배치 솔루션보다 대기 시간이 짧다.

실시간 배포

사기 탐지, 광고 추천 등 실시간 추론이 필요한 응용 애플리케이션 분야에서 사용.
적은 수의 레코드로 예측을 수행하면 실시간 추론에 필요한 짧은 대기 시간을 달성할 수 있지만, 로드 밸런싱과 지리적 위치를 고려해야 한다. 짧은 지연 시간을 제공하는 관리형 서비스 중에서는 Amazon SageMaker, Azure ML과 같은 솔류션이 인기있다.
위의 서비스에 배포를 위해서는 모델을 스파크 외부로 내보내야 하는데 아래의 방법이 있다. 자세한 내용은 필요할 때 더 찾아보자..
방법1 : 파이썬, C 등으로 모델을 다시 구현.
방법2 : 라이브러리 이용
MLeap(더 이상 지원X), ONNX
방법3 : 써드파티 라이브러리 사용
XGBoost, H20.ai
import XGBoost as xgb bst = xgb.Booster({'ntread':4}) bst.load_model("XGBoost_native_model")
Python
복사

비 MLlib 모델에 스파크 활용

MLlib에 사용하려는 알고리즘에 대한 기본 제공 자원이 없거나, 대기 시간이 매우 짧은 추론 요구사항을 충족하지 않을 수 있다. 그런 경우 스파크는 활용할 수 있지만, MLlib은 사용할 수 없다. 그런 경우 사용자 정의 함수를 spark에 적용하여 해결할 수 있다. 그 중 아래에서는 판다스 UDF에 대해 설명하고자 한다.

판다스 UDF

교재의 자료가 너무 불친절해서 다음의 자료를 참고하여 내용을 정리하였습니다.
PySpark도 많은 데이터 변환을 제공하지만, pandas에서 제공하는 다양한 변환 기능을 따라가지는 못 한다. Spark 3.x 부터 pandas user-defined function (이후 pandas UDF)를 이용하여 PySpark와 pandas를 결합하는 것이 가능해졌다.
샘플 데이터 생성
# sample data 생성 g = np.tile(['group a', 'group b'], 10) x = np.linspace(0, 10., 20) np.random.seed(3) y_lin = 2*x + np.random.rand(len(x))/10. y_qua = 3*x**2 + np.random.rand(len(x)) df = pd.DataFrame({'group': g, 'x':x, 'y_lin': y_lin, 'y_qua':y_qua}) schema = "group STRING, x DOUBLE, ylin DOUBLE, y_qua DOUBLE" df = spark.createDataFrame(df, schema=schema)
Python
복사
pyspark.sql.functions.pandas_udf() 사용하기
예시1) 데이터 표준화 : Series to Series
# 표준화 작업 : series to series pandas UDF @F.pandas_udf(T.DoubleType()) def standardise(col1: pd.Series) -> pd.Series: return (col1 - col1.mean()) / col1.std() res = df.select(standardise(F.col('y_lin')).alias('result')) # alias : column 이름 변경 res.show(3)
Python
복사
# 위와 동일한 기능을 하는 코드 def standardise_func(col1: pd.Series) -> pd.Series: return (col1 - col1.mean()) / col1.std() standardise = F.pandas_udf(standardise_func, returnType=T.DoubleType()) res = df.select(standardise(F.col('y_lin')).alias('result')) res.show(3)
Python
복사
예시2) 전체 열에 * 2 하기 : Iterator of Series to Iterator of Series
# 전체 열에 *2 하기: Iterator of Series to Iterator of Series @F.pandas_udf(T.DoubleType()) def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]: for x in iterator: yield x * 2 df.select(plus_one(F.col('x'))).show(3)
Python
복사
예시3) 두 열의 RMSE 구하기 : Iterator of Multiple Series to Iterator of Series
from typing import Tuple @F.pandas_udf(T.DoubleType()) def calculate_rmse(iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]: for a, b in iterator: yield ((a * 2) - b) ** 2 df.select(calculate_rmse("x", "y_lin")).show(3)
Python
복사
Pandas Function APIs
applyInPandas()
group 별로 함수를 적용할 수 있다.
(응용) group별로 다른 모델을 이용하여 예측하고자 할 때 사용할 수 있음.
# applyInPandas() : group 별 평균 구하기 def subtract_mean(pdf): # pdf is a pandas.DataFrame x = pdf.x return pdf.assign(x = x.mean()) result = df.groupby("group").applyInPandas(subtract_mean, schema = "group STRING, x DOUBLE, y_lin DOUBLE, y_qua DOUBLE").toPandas()
Python
복사
mapInPandas()
위에서 iterator → iterator와 비슷한 기능을 하는 함수. 하지만 입력과 동일한 개수의 자료를 반환할 필요는 없다.
# mapInPandas() : group a 데이터만 반환하기 def filter_func(iterator): for pdf in iterator: yield pdf[pdf.group == "group a"] result = df.mapInPandas(filter_func, schema=df.schema).toPandas()
Python
복사

분산 하이퍼파라미터 조정을 위한 스파크

분산 추론을 수행할 의도가 없거나 MLlib의 분산 훈련 기능이 필요하지 않더라도 분산 하이퍼파라미터 조정을 위해 스파크를 활용할 수 있다. 아래에서는 Joblib과 Hyperopt 오픈소스에 대해 설명한다.
Joblib
파이썬에서 정량 파이프라이닝을 제공하는 도구 세트
잡립은 데이터 복사본을 모든 작업자에게 자동으로 브로드캐스트하기 때문에 동시에 (하이퍼파라미터만 다른) 동일한 모델을 동일한 데이터에 대해 훈련시킬 수 있다.
scikit-learn 0.21 이상, pyspark 2.4.4 이상
joblib을 이용하여 하이퍼파라미터 튜닝 병렬 실행하기 with GridSearch
register_spark() # 스파크 백엔드 등록 # 파일 불러오기 및 데이터 분할 filePath = "../data/sf-airbnb-clean.parquet" df = spark.read.parquet(filePath).toPandas() # pandas 객체로 가져오기 df = df.drop(columns = ['host_is_superhost', 'cancellation_policy', 'instant_bookable', 'neighbourhood_cleansed', 'property_type', 'room_type', 'bed_type']) # 수치형 변수만 남김 (편의를 위해...) X_train, X_test, y_train, y_test = train_test_split(df.drop(["price"], axis=1), df[["price"]].values.ravel(), random_state=42) # GridSearch 객체 생성 param_grid = {"max_depth" : [2,5,7,10], "n_estimators": [20, 50, 70, 100]} gscv = GridSearchCV(rf, param_grid, cv = 3) # joblib (실행시간 : 16.25) with parallel_backend("spark", n_jobs = 3): gscv.fit(X_train, y_train) print(gscv.cv_results_) # 기본 실행 (실행시간 : 25.99) gscv.fit(X_train, y_train)
Python
복사
Hyperopt
Bayesian Optimization에 기반한 하이퍼파라미터 튜닝 라이브러리
아래의 코드에서 fmin()은 training_function에 사용될 새로운 하이퍼파라미터 구성을 생성(베이지안 최적화 기반)하고 이를 Spark에 전달한다. SparkTrials는 이러한 훈련 작업의 배치를 각 스파크 실행기에서 단일 스파크 작업으로 병렬 실행한다. 단일 모델 훈련이 끝나면 이를 드라이버에 반환하고, 하이퍼옵트는 다음 하이퍼파라미터 조합을 구성한다.
import hyperopt best_hyperparameters = hyperopt.fmin( fn = training_function, # 모델 훈련 함수 space = search_space, # 하이퍼파라미터 탐색 영역 지정 algo = hyperopt.tpe.suggest, # 최적화 알고리즘 선택 max_evals = 64, # 최대 반복 횟수 trials = hyperopt.SparkTrials(parallelism = 4) )
Python
복사