MLOPS/SERVING
kserve using kfp of iris classification
개발허재
2023. 3. 9. 14:26
Kubeflow Pipeline python api code
import kfp
from kfp import dsl
from functools import partial
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score
import numpy as np
from kfp.components import InputPath, OutputPath, create_component_from_func
@partial(
    create_component_from_func,
    packages_to_install=["scikit-learn", "kubernetes", "xgboost==1.5.0", "minio"],
)
def process(model_path: OutputPath(str)) -> None:
    """Train an XGBoost classifier on the iris dataset and upload it to MinIO.

    The function loads iris, holds out 20% of the samples for testing, fits
    an ``XGBClassifier``, prints the test accuracy, dumps the model with
    joblib into ``model_path`` and finally uploads every file in that
    directory to the ``mlpipeline`` bucket under ``models/iris/2``.

    Args:
        model_path: Output location supplied by Kubeflow Pipelines. It is
            created as a directory and receives ``model.joblib``.

    Raises:
        NotADirectoryError: If the upload helper is asked to upload a path
            that is not a directory.
    """
    # All imports are local: the function is turned into a standalone
    # component by create_component_from_func, so it must be self-contained.
    import glob
    import os

    import joblib
    from minio import Minio
    from sklearn.datasets import load_iris
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import train_test_split
    from xgboost import XGBClassifier

    # --- Train and evaluate -------------------------------------------------
    iris = load_iris()
    X_train, X_test, y_train, y_test = train_test_split(
        iris.data, iris.target, test_size=0.2
    )
    model = XGBClassifier()
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print('test acc: ', accuracy)

    # --- Persist the model locally -------------------------------------------
    # NOTE(review): the deploy step serves this prefix with KServe's XGBoost
    # predictor; that runtime may expect a native booster file (e.g.
    # "model.bst") rather than a joblib pickle -- confirm against the
    # serving runtime version installed in the cluster.
    os.makedirs(model_path, exist_ok=True)
    joblib.dump(model, os.path.join(model_path, "model.joblib"))

    # --- Upload to MinIO -----------------------------------------------------
    # NOTE(review): endpoint and credentials are hard-coded; consider
    # injecting them via a Kubernetes secret / environment variables.
    minio_client = Minio(
        "minio-service.kubeflow:9000",
        access_key="minio",
        secret_key="minio123",
        secure=False,
    )
    minio_bucket = "mlpipeline"

    def upload_local_directory_to_minio(local_path, bucket_name, minio_path):
        """Recursively upload the contents of ``local_path`` below ``minio_path``."""
        if not os.path.isdir(local_path):
            # Explicit exception instead of ``assert`` so the check survives -O.
            raise NotADirectoryError(local_path)

        # Drop trailing slashes so joined object keys never contain "//".
        prefix = minio_path.rstrip("/")
        for local_entry in glob.glob(os.path.join(local_path, "*")):
            entry_name = os.path.basename(local_entry)
            # Object keys always use "/" regardless of the local OS separator.
            remote_path = f"{prefix}/{entry_name}" if prefix else entry_name
            if os.path.isfile(local_entry):
                minio_client.fput_object(bucket_name, remote_path, local_entry)
            else:
                upload_local_directory_to_minio(
                    local_entry, bucket_name, remote_path
                )

    # "2" is the model version directory; the deploy step's storage URI
    # (s3://mlpipeline/models/iris/2) must point at the same prefix.
    upload_local_directory_to_minio(model_path, minio_bucket, "models/iris/2/")
@partial(
    create_component_from_func,
    packages_to_install=["kubernetes", "kserve==0.8.0", "protobuf==3.20.0"],
)
def deploy_model() -> None:
    """Create the ``iris-kfp`` KServe InferenceService for the uploaded model.

    The service uses an XGBoost predictor that reads the model from
    ``s3://mlpipeline/models/iris/2`` with the ``sa-minio-kserve`` service
    account, and is created in the namespace returned by
    ``utils.get_default_target_namespace()``.
    """
    import os
    from kubernetes import client
    from kubernetes.client import V1ResourceRequirements
    from kserve import (
        constants,
        KServeClient,
        V1beta1InferenceService,
        V1beta1InferenceServiceSpec,
        V1beta1PredictorSpec,
        V1beta1ModelSpec,
        V1beta1ModelFormat,
        utils,
        V1beta1XGBoostSpec,
    )

    service_name = "iris-kfp"
    target_namespace = utils.get_default_target_namespace()

    # Requests and limits are identical for the predictor container.
    compute_resources = V1ResourceRequirements(
        requests={"cpu": "100m", "memory": "256Mi"},
        limits={"cpu": "100m", "memory": "256Mi"},
    )
    xgboost_spec = V1beta1XGBoostSpec(
        storage_uri="s3://mlpipeline/models/iris/2",
        resources=compute_resources,
    )
    predictor = V1beta1PredictorSpec(
        min_replicas=1,
        xgboost=xgboost_spec,
        service_account_name='sa-minio-kserve',
    )

    # The annotation sets Istio sidecar injection to 'false' for this service.
    metadata = client.V1ObjectMeta(
        name=service_name,
        namespace=target_namespace,
        annotations={'sidecar.istio.io/inject': 'false'},
    )
    inference_service = V1beta1InferenceService(
        api_version=constants.KSERVE_V1BETA1,
        kind=constants.KSERVE_KIND,
        metadata=metadata,
        spec=V1beta1InferenceServiceSpec(predictor=predictor),
    )

    kserve_client = KServeClient()
    kserve_client.create(inferenceservice=inference_service)
# Define the pipeline
@kfp.dsl.pipeline(name='Iris Pipeline')
def iris_pipeline():
    """Run training/upload (``process``) and then serving (``deploy_model``)."""
    process_result = process()
    deploy_result = deploy_model()
    # deploy_model consumes no output of process, so there is no implicit
    # dependency between the two tasks. Order them explicitly so the
    # InferenceService is only created after the model has been uploaded.
    deploy_result.after(process_result)
# Compile iris_pipeline into a YAML package that can be uploaded to Kubeflow Pipelines
kfp.compiler.Compiler().compile(
    pipeline_func=iris_pipeline,
    package_path='iris_pipeline_and_serving.yaml',
)
Kubeflow pipeline은 데이터 로드, 학습, 테스트, 모델 저장(minio)을 순서대로 수행하는 process component와, minio에 저장된 모델을 kserve로 서빙하는 deploy_model component로 구성했다.
KFP 결과
컴파일된 iris_pipeline_and_serving.yaml 파일을 kubeflow pipelines 에 업로드한 후 create run 을 통해 pipeline을 실행시켰다.
위와 같이, 오류없이 정상적으로 Success 한 것을 볼 수 있고, 로그파일은 minio의 mlpipeline 버킷 하위에 저장되는 것을 확인할 수 있다.
또한, 아래와 같이 모델 서빙 성공 여부도 확인 할 수 있다.
kubeflow Minio service type을 NodePort로 변경한 뒤 Minio UI에 접속해보면, 아래와 같이 mlpipeline 버킷의 models/iris/2 디렉토리에 모델이 저장된 것을 확인할 수 있다.