MLOPS/SERVING

Serving an iris classification model with KServe from a Kubeflow Pipeline (KFP)

개발허재 2023. 3. 9. 14:26

Kubeflow Pipeline python api code

import kfp
from kfp import dsl
from functools import partial
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import accuracy_score
import numpy as np
from kfp.components import InputPath, OutputPath, create_component_from_func


@partial(
    create_component_from_func,
    packages_to_install=["scikit-learn","kubernetes","xgboost==1.5.0","minio"])
def process(model_path: OutputPath(str)) -> None:
    """Train an XGBoost classifier on the iris dataset and publish it to MinIO.

    Steps: load iris, 80/20 train/test split, fit XGBClassifier, print test
    accuracy, dump the model as ``model.joblib`` into ``model_path``, then
    mirror that directory into the ``mlpipeline`` bucket under
    ``models/iris/2/`` (the version-2 path that the KServe ``storage_uri``
    ``s3://mlpipeline/models/iris/2`` expects).

    Args:
        model_path: KFP-provided output directory; created if missing.
    """
    from sklearn.datasets import load_iris
    from sklearn.model_selection import train_test_split
    from xgboost import XGBClassifier
    from sklearn.metrics import accuracy_score
    from minio import Minio
    import os
    import glob
    import joblib

    # Train and evaluate a simple model; accuracy is printed to the pod log.
    iris = load_iris()
    X_train, X_test, y_train, y_test = train_test_split(iris.data, iris.target, test_size=0.2)
    model = XGBClassifier()
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print('test acc: ', accuracy)

    os.makedirs(model_path, exist_ok=True)

    joblib.dump(model, os.path.join(model_path, "model.joblib"))

    # NOTE(security): credentials are hard-coded for the in-cluster demo
    # MinIO; move them to a Kubernetes Secret / env vars before real use.
    minio_client = Minio(
            "minio-service.kubeflow:9000",
            access_key="minio",
            secret_key="minio123",
            secure=False
        )
    minio_bucket = "mlpipeline"

    def upload_local_directory_to_minio(local_path, bucket_name, minio_path):
        """Recursively mirror a local directory into a bucket prefix."""
        # Explicit raise instead of `assert`: asserts are stripped under -O.
        if not os.path.isdir(local_path):
            raise ValueError(f"not a directory: {local_path}")

        for local_file in glob.glob(local_path + '/**'):
            local_file = local_file.replace(os.sep, "/")  # Replace \ with / on Windows
            if not os.path.isfile(local_file):
                # Subdirectory: recurse, extending the remote prefix.
                upload_local_directory_to_minio(
                    local_file, bucket_name, minio_path + "/" + os.path.basename(local_file))
            else:
                # Strip the local root (plus its separator) to get the
                # bucket-relative path of this file.
                remote_path = os.path.join(
                    minio_path, local_file[1 + len(local_path):])
                remote_path = remote_path.replace(
                    os.sep, "/")  # Replace \ with / on Windows
                minio_client.fput_object(bucket_name, remote_path, local_file)

    # "2" is the model *version* directory; it must match the version in
    # deploy_model's storage_uri (s3://mlpipeline/models/iris/2).
    upload_local_directory_to_minio(model_path, minio_bucket, "models/iris/2/")


@partial(
    create_component_from_func,
    packages_to_install=["kubernetes","kserve==0.8.0","protobuf==3.20.0"])
def deploy_model() -> None:
    """Create a KServe InferenceService named ``iris-kfp`` that serves the
    XGBoost model stored at ``s3://mlpipeline/models/iris/2``."""
    from kubernetes import client
    from kserve import (
    constants,
    KServeClient,
    V1beta1InferenceService,
    V1beta1InferenceServiceSpec,
    V1beta1PredictorSpec,
    V1beta1ModelSpec,
    V1beta1ModelFormat,
    utils,
    V1beta1XGBoostSpec
    )
    from kubernetes.client import V1ResourceRequirements
    import os

    service_name = "iris-kfp"
    namespace = utils.get_default_target_namespace()

    # XGBoost predictor pulling the model from MinIO through the
    # S3-compatible credentials attached to the 'sa-minio-kserve'
    # service account.
    resources = V1ResourceRequirements(
        requests={"cpu": "100m", "memory": "256Mi"},
        limits={"cpu": "100m", "memory": "256Mi"},
    )
    predictor_spec = V1beta1PredictorSpec(
        min_replicas=1,
        xgboost=V1beta1XGBoostSpec(
            storage_uri="s3://mlpipeline/models/iris/2",
            resources=resources,
        ),
        service_account_name='sa-minio-kserve',
    )

    # NOTE(review): the annotation disables Istio sidecar injection —
    # presumably so the storage initializer can reach MinIO directly;
    # confirm against your mesh configuration.
    service_metadata = client.V1ObjectMeta(
        name=service_name,
        namespace=namespace,
        annotations={'sidecar.istio.io/inject': 'false'},
    )

    inference_service = V1beta1InferenceService(
        api_version=constants.KSERVE_V1BETA1,
        kind=constants.KSERVE_KIND,
        metadata=service_metadata,
        spec=V1beta1InferenceServiceSpec(predictor=predictor_spec),
    )

    KServeClient().create(inferenceservice=inference_service)

# Define the pipeline
@kfp.dsl.pipeline(name='Iris Pipeline')
def iris_pipeline():
    """Train and upload the iris model, then deploy it with KServe."""
    process_task = process()
    deploy_task = deploy_model()
    # deploy_model consumes no output of process, so KFP sees no implicit
    # dependency and would schedule both steps in parallel — serving could
    # then start before the model exists in MinIO. Force the ordering.
    deploy_task.after(process_task)


# Compile the pipeline
kfp.compiler.Compiler().compile(iris_pipeline, 'iris_pipeline_and_serving.yaml')

kubeflow pipeline은 데이터 로드, 학습, 테스트, 모델 저장(minio) 순서로 process 라는 component와 kserve로 minio에서 저장된 모델을 서빙하는 deploy_model component로 구성했다.

 

KFP 결과

컴파일된 iris_pipeline_and_serving.yaml 파일을 kubeflow pipelines 에 업로드 후 create run 을 통해 pipeline을 실행시켰다.

위와 같이, 오류없이 정상적으로 Success 한 것을 볼 수 있고, 로그파일은 minio의 mlpipeline 버킷 하위에 저장되는 것을 확인할 수 있다.

또한, 아래와 같이 모델 서빙 성공 여부도 확인 할 수 있다.

 

 

kubeflow Minio service type을 NodePort로 변경한 뒤 Minio UI에 접속해보면, 아래와 같이 mlpipeline 버킷의 models/iris/2 디렉토리에 모델이 저장된 것을 확인할 수 있다.