Kubeflow - Kserve Python SDK Serving
For Kubeflow installation, see the link below.
https://jeawoo0594.tistory.com/54
Create a notebook through the Kubeflow Notebooks component.
The workspace volume creates a PV and PVC through the StorageClass configured with hostPath.
Click the CONNECT button to launch a JupyterLab session.
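From inside the notebook you can check that the workspace PVC was actually created and bound. A minimal sketch using the kubernetes Python client, assuming it is installed in the notebook image; "kubeflow-user-example-com" is this post's example namespace, so use your own:
from kubernetes import client, config

# Running inside the notebook pod, so use the in-cluster service account.
config.load_incluster_config()

v1 = client.CoreV1Api()
# The workspace volume's PVC should be listed with phase Bound.
for pvc in v1.list_namespaced_persistent_volume_claim('kubeflow-user-example-com').items:
    print(pvc.metadata.name, pvc.status.phase)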
Write the MNIST PyTorch training code
from __future__ import print_function
import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output


def train(args, model, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args.log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))
            if args.dry_run:
                break


def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))
parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                    help='input batch size for training (default: 64)')
parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                    help='input batch size for testing (default: 1000)')
parser.add_argument('--epochs', type=int, default=1, metavar='N',
                    help='number of epochs to train (default: 1)')
parser.add_argument('--lr', type=float, default=1.0, metavar='LR',
                    help='learning rate (default: 1.0)')
parser.add_argument('--gamma', type=float, default=0.7, metavar='M',
                    help='Learning rate step gamma (default: 0.7)')
parser.add_argument('--no-cuda', action='store_true', default=False,
                    help='disables CUDA training')
parser.add_argument('--no-mps', action='store_true', default=False,
                    help='disables macOS GPU training')
parser.add_argument('--dry-run', action='store_true', default=False,
                    help='quickly check a single pass')
parser.add_argument('--seed', type=int, default=1, metavar='S',
                    help='random seed (default: 1)')
parser.add_argument('--log-interval', type=int, default=10, metavar='N',
                    help='how many batches to wait before logging training status')
parser.add_argument('--save-model', action='store_true', default=True,
                    help='For Saving the current Model')
# args=[] so argparse works inside the notebook kernel (no real argv)
args = parser.parse_args(args=[])

use_cuda = not args.no_cuda and torch.cuda.is_available()
# use MPS only when it is available and not explicitly disabled
use_mps = not args.no_mps and hasattr(torch.backends, "mps") and torch.backends.mps.is_available()
torch.manual_seed(args.seed)
if use_cuda:
    device = torch.device("cuda")
elif use_mps:
    device = torch.device("mps")
else:
    device = torch.device("cpu")

train_kwargs = {'batch_size': args.batch_size}
test_kwargs = {'batch_size': args.test_batch_size}
if use_cuda:
    cuda_kwargs = {'num_workers': 1,
                   'pin_memory': True,
                   'shuffle': True}
    train_kwargs.update(cuda_kwargs)
    test_kwargs.update(cuda_kwargs)

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])
dataset1 = datasets.MNIST('./data', train=True, download=True,
                          transform=transform)
dataset2 = datasets.MNIST('./data', train=False,
                          transform=transform)
train_loader = torch.utils.data.DataLoader(dataset1, **train_kwargs)
test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)

model = Net().to(device)
optimizer = optim.Adadelta(model.parameters(), lr=args.lr)
scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)

for epoch in range(1, args.epochs + 1):
    train(args, model, device, train_loader, optimizer, epoch)
    test(model, device, test_loader)
    scheduler.step()

if args.save_model:
    torch.save(model.state_dict(), "mnist_cnn.pt")
Run the training code above to produce the model weight file mnist_cnn.pt.
!pip install torch-model-archiver==0.5.1
Since KServe natively supports TorchServe, the serving library for PyTorch models, you need to package the model artifacts into a single TorchServe Model Archive (MAR) file using the torch-model-archiver package.
In addition, to satisfy the directory layout rules for turning the MAR file into an InferenceService, create the directories as follows.
!mkdir -p {my-model-management-dir}/config {my-model-management-dir}/model-store
Write config.properties, the TorchServe model archive configuration, and place it in the {my-model-management-dir}/config directory; the MAR file is generated into the {my-model-management-dir}/model-store directory as shown below.
Files such as mnist.py, mnist_cnn.pt, and the handler can be obtained by cloning https://github.com/pytorch/serve.git.
!torch-model-archiver --model-name=mnist \
--version=1.0 \
--model-file=serve/examples/image_classifier/mnist/mnist.py \
--serialized-file=serve/examples/image_classifier/mnist/mnist_cnn.pt \
--handler=serve/examples/image_classifier/mnist/mnist_handler.py \
--export-path={my-model-management-dir}/model-store/
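If the archiving step succeeds, the MAR file (named after --model-name, so mnist.mar here) should appear in the model-store directory. A quick sanity check from the notebook:
!ls {my-model-management-dir}/model-store
# mnist.mar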
%%writefile {my-model-management-dir}/config/config.properties
inference_address=http://0.0.0.0:8085
management_address=http://0.0.0.0:8085
metrics_address=http://0.0.0.0:8082
grpc_inference_port=7070
grpc_management_port=7071
enable_envvars_config=true
install_py_dep_per_model=true
enable_metrics_api=true
metrics_format=prometheus
NUM_WORKERS=1
number_of_netty_threads=4
job_queue_size=10
model_store=/home/model-server/shared/model-store
model_snapshot={"name":"startup.cfg","modelCount":1,"models":{"mnist":{"1.0":{"defaultVersion":true,"marName":"mnist.mar","minWorkers":1,"maxWorkers":5,"batchSize":5,"maxBatchDelay":200,"responseTimeout":60}}}}
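After writing the config and creating the archive, the management directory should end up with this layout:
{my-model-management-dir}/
├── config/
│   └── config.properties
└── model-store/
    └── mnist.mar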
Install the KServe Python API
!pip install kserve==0.8 && pip uninstall pyOpenSSL -y
Import the KServe libraries
from kubernetes import client
from kserve import utils
from kserve import KServeClient
from kserve import constants
from kserve import V1beta1PredictorSpec
from kserve import V1beta1InferenceServiceSpec
from kserve import V1beta1InferenceService
from kserve import V1beta1TorchServeSpec
Declare the default model spec
default_model_spec = V1beta1InferenceServiceSpec(
    predictor=V1beta1PredictorSpec(
        pytorch=V1beta1TorchServeSpec(
            storage_uri='pvc://{my-notebookserver-pvc}/{my-model-management-dir}')))
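The spec can optionally carry resource requests and replica bounds as well. A sketch, assuming the kserve 0.8 SDK field names (resources on V1beta1TorchServeSpec, min_replicas on V1beta1PredictorSpec); the values are illustrative:
from kubernetes import client

# A sketch: the same predictor with explicit resources and a replica floor.
custom_model_spec = V1beta1InferenceServiceSpec(
    predictor=V1beta1PredictorSpec(
        min_replicas=1,
        pytorch=V1beta1TorchServeSpec(
            storage_uri='pvc://{my-notebookserver-pvc}/{my-model-management-dir}',
            resources=client.V1ResourceRequirements(
                requests={'cpu': '500m', 'memory': '1Gi'},
                limits={'cpu': '1', 'memory': '2Gi'}))))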
Create the InferenceService
namespace = utils.get_default_target_namespace()
print('namespace : {}'.format(namespace))
isvc = V1beta1InferenceService(
    api_version=constants.KSERVE_V1BETA1,
    kind=constants.KSERVE_KIND,
    metadata=client.V1ObjectMeta(name='mnist-test', namespace=namespace),
    spec=default_model_spec)
kserve = KServeClient()
kserve.create(isvc)
############# Success if the output looks like the following #################
{'apiVersion': 'serving.kserve.io/v1beta1',
'kind': 'InferenceService',
'metadata': {'creationTimestamp': '2023-01-26T10:44:15Z',
'generation': 1,
'labels': {'serviceEnvelope': 'kserve'},
'managedFields': [{'apiVersion': 'serving.kserve.io/v1beta1',
'fieldsType': 'FieldsV1',
'fieldsV1': {'f:spec': {'.': {},
'f:predictor': {'.': {},
'f:pytorch': {'.': {}, 'f:name': {}, 'f:storageUri': {}}}}},
'manager': 'OpenAPI-Generator',
'operation': 'Update',
'time': '2023-01-26T10:44:13Z'}],
'name': 'mnist-test',
'namespace': 'kubeflow-user-example-com',
'resourceVersion': '43776',
'uid': '760a15e5-f44f-4e96-b52b-c2f49ff3ba76'},
'spec': {'predictor': {'model': {'modelFormat': {'name': 'pytorch'},
'name': '',
'resources': {},
'runtime': 'kserve-torchserve',
'storageUri': 'pvc://sbd-volume/mnist-serving'}}}}
A short while after calling the InferenceService create Python API, you can see the predictor pod being created in the cluster. The model is also registered in Kubeflow Models as shown below.
Confirm that the kserve-container and queue-proxy containers are running normally; since I did not set istio sidecar injection to false, the istio-proxy container is present as well, so all three containers should be Running.
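Rather than polling the cluster by hand, the KServeClient can watch the InferenceService until it reports Ready. A one-line sketch, assuming the kserve 0.8 get() signature with watch support:
# Blocks and prints status rows until the Ready condition is True (or timeout).
kserve.get('mnist-test', namespace=namespace, watch=True, timeout_seconds=180)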
Request to the model REST API
From outside the k8s cluster, the model can be called through the istio ingress gateway.
curl --location --request POST 'http://{ClusterIP}:{Istio-ingress-port}/v1/models/mnist-test:predict' \
--header 'Host: mnist-test.kubeflow-user-example-com.example.com' \
--header 'Cookie: authservice_session=MTY3NDc...' \
--header 'Content-Type: application/json' \
--data-raw '{
    "instances": [
        {
            "data": "iVBORw0KGgoAAAANSUhEUgAAABwAAAAcCAAAAABXZoBIAAAAw0lEQVR4nGNgGFggVVj4/y8Q2GOR83n+58/fP0DwcSqmpNN7oOTJw6f+/H2pjUU2JCSEk0EWqN0cl828e/FIxvz9/9cCh1zS5z9/G9mwyzl/+PNnKQ45nyNAr9ThMHQ/UG4tDofuB4bQIhz6fIBenMWJQ+7Vn7+zeLCbKXv6z59NOPQVgsIcW4QA9YFi6wNQLrKwsBebW/68DJ388Nun5XFocrqvIFH59+XhBAxThTfeB0r+vP/QHbuDCgr2JmOXoSsAAKK7bU3vISS4AAAAAElFTkSuQmCC",
            "target": 0
        }
    ]
}'
My model name is mnist-test. The Host header is the URL returned by kubectl get ksvc -n kubeflow-user-example-com with the http:// prefix removed, and the Cookie value is the authservice_session value used for Kubeflow authorization.
Calling it as above returns a normal response like the following.
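The same request can also be sent from Python. A minimal sketch with the requests library; the cluster IP, ingress port, session cookie, and base64 payload are placeholders to fill in with your own values:
import requests

# Placeholders: use your cluster IP, istio ingress port, and a valid session cookie.
url = 'http://{ClusterIP}:{Istio-ingress-port}/v1/models/mnist-test:predict'
headers = {
    'Host': 'mnist-test.kubeflow-user-example-com.example.com',
    'Cookie': 'authservice_session=MTY3NDc...',
    'Content-Type': 'application/json',
}
# "data" is the base64-encoded 28x28 MNIST PNG from the curl example above.
payload = {'instances': [{'data': '<base64-encoded image>', 'target': 0}]}

resp = requests.post(url, headers=headers, json=payload)
print(resp.json())  # e.g. {'predictions': [2]}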
{
"predictions": [
2
]
}
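When you are done testing, the same client can clean up the InferenceService; a minimal sketch:
# Delete the InferenceService created above.
kserve.delete('mnist-test', namespace=namespace)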