Deploying Spark MLlib Models with KServe
This guide demonstrates how to train a Spark MLlib model, export it to PMML format, and deploy it with KServe's InferenceService. Spark MLlib is a scalable machine learning library that provides various algorithms and utilities.
Prerequisites
Before you begin, make sure you have:
- A Kubernetes cluster with KServe installed.
- kubectl CLI configured to communicate with your cluster.
- Basic knowledge of Kubernetes concepts and Spark MLlib.
- Python environment with the following packages:
  - pyspark 3.0.x or later
  - pyspark2pmml
- JPMML-SparkML jar file
- Access to cloud storage (like Google Cloud Storage) to store your PMML model
Training a Spark MLlib Model and Exporting to PMML
Setting Up Your Environment
- Install the required Python packages:
pip install pyspark~=3.0.0
pip install pyspark2pmml
- Download the JPMML-SparkML jar:
wget https://github.com/jpmml/jpmml-sparkml/releases/download/1.6.3/jpmml-sparkml-executable-1.6.3.jar
Training and Exporting the Model
Launch PySpark with the JPMML-SparkML jar:
pyspark --jars ./jpmml-sparkml-executable-1.6.3.jar
Train a model using the Iris dataset and export it to PMML format:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import RFormula
from pyspark2pmml import PMMLBuilder
spark = SparkSession.builder.appName("SparkMLlib-KServe-Example").getOrCreate()
# Load the Iris dataset
df = spark.read.csv("Iris.csv", header=True, inferSchema=True)
# Define the pipeline
formula = RFormula(formula="Species ~ .")
classifier = DecisionTreeClassifier()
pipeline = Pipeline(stages=[formula, classifier])
pipelineModel = pipeline.fit(df)
# Export to PMML
pmmlBuilder = PMMLBuilder(spark, df, pipelineModel)
pmmlBuilder.buildFile("DecisionTreeIris.pmml")
Uploading the Model to Cloud Storage
Upload the generated PMML file to your cloud storage:
gsutil cp ./DecisionTreeIris.pmml gs://YOUR_BUCKET_NAME/sparkpmml/model.pmml
Testing the Model Locally
For local testing, you can use the KServe PMML server. Please refer to the PMML server documentation for detailed instructions on testing locally.
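As a quick sanity check before uploading the model, you can also load the exported PMML file with pypmml, the Py4J-based scoring library that the pmmlserver runtime builds on (install it with pip install pypmml; a local Java runtime is required). The snippet below is only a minimal sketch: the feature names are assumptions and must match the column headers in your Iris.csv.
from pypmml import Model

# Load the PMML file produced by PMMLBuilder (pypmml starts a JVM via Py4J)
model = Model.load("DecisionTreeIris.pmml")

# NOTE: these feature names are assumptions -- replace them with the actual
# column headers from your Iris.csv
result = model.predict({
    "Sepal_Length": 5.1,
    "Sepal_Width": 3.5,
    "Petal_Length": 1.4,
    "Petal_Width": 0.2,
})
print(result)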
The pmmlserver runtime used for Spark MLlib model deployment is based on Py4J and doesn't support multi-process mode, so you can't set spec.predictor.containerConcurrency. If you want to scale the PMMLServer to improve prediction performance, set the InferenceService's resources.limits.cpu to 1 and scale out the number of replicas instead, as sketched below.
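For example, a scaled-out PMMLServer deployment might look like the following sketch. The replica counts and memory limit are placeholder assumptions to adjust for your workload; only the cpu limit of 1 reflects the guidance above.
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
  name: "spark-pmml"
spec:
  predictor:
    minReplicas: 2          # placeholder: scale out replicas instead of raising concurrency
    maxReplicas: 4          # placeholder
    model:
      modelFormat:
        name: pmml
      storageUri: "gs://kfserving-examples/models/sparkpmml"
      resources:
        limits:
          cpu: "1"          # keep a single CPU per replica for the Py4J-based server
          memory: 2Gi       # placeholder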
Deploying the Model with V1 Protocol
Creating the InferenceService
To deploy your Spark MLlib PMML model, create an InferenceService resource with the PMML model format:
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "spark-pmml"
spec:
predictor:
model:
modelFormat:
name: pmml
storageUri: "gs://kfserving-examples/models/sparkpmml"
Apply the YAML manifest:
kubectl apply -f spark-pmml.yaml
Expected output:
inferenceservice.serving.kserve.io/spark-pmml created
Wait for the InferenceService to be ready:
kubectl wait --for=condition=Ready inferenceservice spark-pmml
Running a Prediction
First, determine the ingress IP and ports, then set the INGRESS_HOST and INGRESS_PORT environment variables.
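One common way to do this, assuming KServe is installed with an Istio ingress gateway exposed through a LoadBalancer service, is shown below; adjust the namespace and service name to match your installation.
INGRESS_HOST=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.spec.ports[?(@.name=="http2")].port}')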
Create a file named iris-input.json with the following sample input:
{
"instances": [
[5.1, 3.5, 1.4, 0.2]
]
}
Send the inference request:
MODEL_NAME=spark-pmml
INPUT_PATH=@./iris-input.json
SERVICE_HOSTNAME=$(kubectl get inferenceservice spark-pmml -o jsonpath='{.status.url}' | cut -d "/" -f 3)
curl -v -H "Host: ${SERVICE_HOSTNAME}" -H "Content-Type: application/json" http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/$MODEL_NAME:predict -d $INPUT_PATH
Expected output:
* Connected to spark-pmml.default.35.237.217.209.xip.io (35.237.217.209) port 80 (#0)
> POST /v1/models/spark-pmml:predict HTTP/1.1
> Host: spark-pmml.default.35.237.217.209.xip.io
> User-Agent: curl/7.73.0
> Accept: */*
> Content-Length: 45
> Content-Type: application/x-www-form-urlencoded
>
* upload completely sent off: 45 out of 45 bytes
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< content-length: 39
< content-type: application/json; charset=UTF-8
< date: Sun, 07 Mar 2021 19:32:50 GMT
< server: istio-envoy
< x-envoy-upstream-service-time: 14
<
* Connection #0 to host spark-pmml.default.35.237.217.209.xip.io left intact
{"predictions": [[1.0, 0.0, 1.0, 0.0]]}
Deploying the Model with Open Inference Protocol (V2)
Creating the InferenceService
To deploy your Spark MLlib model with the Open Inference Protocol, create an InferenceService resource with protocolVersion: v2:
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "spark-iris"
spec:
predictor:
model:
modelFormat:
name: pmml
protocolVersion: v2
runtime: kserve-pmmlserver
storageUri: "gs://kfserving-examples/models/sparkpmml"
Apply the YAML manifest:
kubectl apply -f spark-iris.yaml
Testing the Deployed Model
First, determine the ingress IP and ports as shown in the V1 protocol section above, then set the INGRESS_HOST and INGRESS_PORT environment variables.
Create a file named iris-input-v2.json with the following sample input:
{
"inputs": [
{
"name": "input-0",
"shape": [2, 4],
"datatype": "FP32",
"data": [
[6.8, 2.8, 4.8, 1.4],
[6.0, 3.4, 4.5, 1.6]
]
}
]
}
Send the inference request:
SERVICE_HOSTNAME=$(kubectl get inferenceservice spark-iris -o jsonpath='{.status.url}' | cut -d "/" -f 3)
curl -v \
-H "Host: ${SERVICE_HOSTNAME}" \
-H "Content-Type: application/json" \
-d @./iris-input-v2.json \
http://${INGRESS_HOST}:${INGRESS_PORT}/v2/models/spark-iris/infer
Expected output:
{
"model_name": "spark-iris",
"model_version": null,
"id": "a187a478-c614-46ce-a7de-2f07871f43f3",
"parameters": null,
"outputs": [
{
"name": "Species",
"shape": [2],
"datatype": "BYTES",
"parameters": null,
"data": ["versicolor", "versicolor"]
},
{
"name": "Probability_setosa",
"shape": [2],
"datatype": "FP64",
"parameters": null,
"data": [0, 0]
},
{
"name": "Probability_versicolor",
"shape": [2],
"datatype": "FP64",
"parameters": null,
"data": [0.9074074074074074, 0.9074074074074074]
},
{
"name": "Probability_virginica",
"shape": [2],
"datatype": "FP64",
"parameters": null,
"data": [0.09259259259259259, 0.09259259259259259]
},
{
"name": "Node_Id",
"shape": [2],
"datatype": "BYTES",
"parameters": null,
"data": ["6", "6"]
}
]
}
Deploying the Model with gRPC Endpoint
For applications requiring gRPC communication, you can expose a gRPC endpoint by modifying the InferenceService definition.
KServe currently supports exposing either the HTTP or the gRPC port, not both simultaneously. By default, the HTTP port is exposed.
Serverless:
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "spark-iris-grpc"
spec:
predictor:
model:
modelFormat:
name: pmml
protocolVersion: v2
runtime: kserve-pmmlserver
storageUri: "gs://kfserving-examples/models/sparkpmml"
ports:
- name: h2c # knative expects grpc port name to be 'h2c'
protocol: TCP
containerPort: 8081
Raw Deployment:
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "spark-iris-grpc"
spec:
predictor:
model:
modelFormat:
name: pmml
protocolVersion: v2
runtime: kserve-pmmlserver
storageUri: "gs://kfserving-examples/models/sparkpmml"
ports:
- name: grpc-port # Istio requires the port name to be in the format <protocol>[-<suffix>]
protocol: TCP
containerPort: 8081
Apply the YAML to create the gRPC InferenceService:
kubectl apply -f spark-iris-grpc.yaml
Testing the gRPC Endpoint with grpcurl
First, determine the ingress IP and ports, then set the INGRESS_HOST and INGRESS_PORT environment variables.
After the gRPC InferenceService becomes ready, use grpcurl to send gRPC requests:
# Download the proto file
curl -O https://raw.githubusercontent.com/kserve/open-inference-protocol/main/specification/protocol/open_inference_grpc.proto
INPUT_PATH=iris-input-grpc.json
PROTO_FILE=open_inference_grpc.proto
SERVICE_HOSTNAME=$(kubectl get inferenceservice spark-iris-grpc -o jsonpath='{.status.url}' | cut -d "/" -f 3)
First, check if the server is ready:
grpcurl \
-plaintext \
-proto ${PROTO_FILE} \
-authority ${SERVICE_HOSTNAME} \
${INGRESS_HOST}:${INGRESS_PORT} \
inference.GRPCInferenceService.ServerReady
Expected output:
{
"ready": true
}
To test the model with inference requests, create an input file named iris-input-grpc.json:
{
"model_name": "spark-iris-grpc",
"inputs": [
{
"name": "input-0",
"shape": [2, 4],
"datatype": "FP32",
"contents": {
"fp32_contents": [6.8, 2.8, 4.8, 1.4, 6.0, 3.4, 4.5, 1.6]
}
}
]
}
Send the gRPC inference request:
grpcurl \
-vv \
-plaintext \
-proto ${PROTO_FILE} \
-authority ${SERVICE_HOSTNAME} \
-d @ \
${INGRESS_HOST}:${INGRESS_PORT} \
inference.GRPCInferenceService.ModelInfer \
<<< $(cat "$INPUT_PATH")
Response contents:
{
"model_name": "spark-iris",
"model_version": null,
"id": "a187a478-c614-46ce-a7de-2f07871f43f3",
"parameters": null,
"outputs": [
{
"name": "Species",
"shape": [
2
],
"datatype": "BYTES",
"parameters": null,
"data": [
"versicolor",
"versicolor"
]
},
{
"name": "Probability_setosa",
"shape": [
2
],
"datatype": "FP64",
"parameters": null,
"data": [
0,
0
]
},
{
"name": "Probability_versicolor",
"shape": [
2
],
"datatype": "FP64",
"parameters": null,
"data": [
0.9074074074074074,
0.9074074074074074
]
},
{
"name": "Probability_virginica",
"shape": [
2
],
"datatype": "FP64",
"parameters": null,
"data": [
0.09259259259259259,
0.09259259259259259
]
},
{
"name": "Node_Id",
"shape": [
2
],
"datatype": "BYTES",
"parameters": null,
"data": [
"6",
"6"
]
}
]
}