Deploy Spark MLlib model with PMML InferenceService¶
Setup¶
- Install
pyspark
3.0.x andpyspark2pmml
pip install pyspark~=3.0.0 pip install pyspark2pmml
- Get JPMML-SparkML jar
Train a Spark MLlib model and export to PMML file¶
Launch pyspark with --jars
to specify the location of the JPMML-SparkML
uber-JAR
pyspark --jars ./jpmml-sparkml-executable-1.6.3.jar
Fitting a Spark ML pipeline:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import RFormula
df = spark.read.csv("Iris.csv", header = True, inferSchema = True)
formula = RFormula(formula = "Species ~ .")
classifier = DecisionTreeClassifier()
pipeline = Pipeline(stages = [formula, classifier])
pipelineModel = pipeline.fit(df)
from pyspark2pmml import PMMLBuilder
pmmlBuilder = PMMLBuilder(sc, df, pipelineModel)
pmmlBuilder.buildFile("DecisionTreeIris.pmml")
Upload the DecisionTreeIris.pmml
to a GCS bucket.
gsutil cp ./DecisionTreeIris.pmml gs://$BUCKET_NAME/sparkpmml/model.pmml
Test the Model locally¶
For testing the model locally, please refer the pmml server documentation.
Deploy Spark MLlib model with V1 protocol¶
Create the InferenceService with PMMLServer¶
Create the InferenceService
with pmml
predictor and specify the storageUri
with bucket location you uploaded to
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "spark-pmml"
spec:
predictor:
model:
modelFormat:
name: pmml
storageUri: gs://kfserving-examples/models/sparkpmml
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "spark-pmml"
spec:
predictor:
pmml:
storageUri: gs://kfserving-examples/models/sparkpmml
Warning
The pmmlserver
is based on Py4J and that doesn't support multi-process mode, so we can't set spec.predictor.containerConcurrency
.
If you want to scale the PMMLServer to improve prediction performance, you should set the InferenceService's resources.limits.cpu
to 1 and scale
the replica size.
Apply the InferenceService
custom resource
kubectl apply -f spark_pmml.yaml
Expected Output
$ inferenceservice.serving.kserve.io/spark-pmml created
Wait the InferenceService
to be ready
kubectl wait --for=condition=Ready inferenceservice spark-pmml
$ inferenceservice.serving.kserve.io/spark-pmml condition met
Run a prediction¶
The first step is to determine the ingress IP and ports and set INGRESS_HOST
and INGRESS_PORT
.
You can see an example payload below. Create a file named iris-input.json
with the sample input.
{
"instances": [
[5.1, 3.5, 1.4, 0.2]
]
}
MODEL_NAME=spark-pmml
INPUT_PATH=@./iris-input.json
SERVICE_HOSTNAME=$(kubectl get inferenceservice spark-pmml -o jsonpath='{.status.url}' | cut -d "/" -f 3)
curl -v -H "Host: ${SERVICE_HOSTNAME}" http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/$MODEL_NAME:predict -d $INPUT_PATH
Expected Output
* Connected to spark-pmml.default.35.237.217.209.xip.io (35.237.217.209) port 80 (#0)
> POST /v1/models/spark-pmml:predict HTTP/1.1
> Host: spark-pmml.default.35.237.217.209.xip.io
> User-Agent: curl/7.73.0
> Accept: */*
> Content-Length: 45
> Content-Type: application/x-www-form-urlencoded
>
* upload completely sent off: 45 out of 45 bytes
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< content-length: 39
< content-type: application/json; charset=UTF-8
< date: Sun, 07 Mar 2021 19:32:50 GMT
< server: istio-envoy
< x-envoy-upstream-service-time: 14
<
* Connection #0 to host spark-pmml.default.35.237.217.209.xip.io left intact
{"predictions": [[1.0, 0.0, 1.0, 0.0]]}
Deploy the model with Open Inference Protocol¶
Deploy the Model with REST endpoint through InferenceService¶
Lastly, you will use KServe to deploy the trained model onto Kubernetes.
For this, you will just need to use version v1beta1
of the
InferenceService
CRD and set the protocolVersion
field to v2
.
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "spark-v2-iris"
spec:
predictor:
model:
modelFormat:
name: pmml
protocolVersion: v2
runtime: kserve-pmmlserver
storageUri: "gs://kfserving-examples/models/sparkpmml"
Warning
The pmmlserver
is based on Py4J and that doesn't support multi-process mode, so we can't set spec.predictor.containerConcurrency
.
If you want to scale the PMMLServer to improve prediction performance, you should set the InferenceService's resources.limits.cpu
to 1 and scale
the replica size.
kubectl apply -f spark-v2-iris.yaml
Test the Deployed Model¶
You can now test your deployed model by sending a sample request.
Note that this request needs to follow the Open Inference Protocol.
You can see an example payload below. Create a file named iris-input-v2.json
with the sample input.
{
"inputs": [
{
"name": "input-0",
"shape": [2, 4],
"datatype": "FP32",
"data": [
[6.8, 2.8, 4.8, 1.4],
[6.0, 3.4, 4.5, 1.6]
]
}
]
}
INGRESS_HOST
and INGRESS_PORT
.
Now, you can use curl
to send the inference request as:
SERVICE_HOSTNAME=$(kubectl get inferenceservice spark-v2-iris -o jsonpath='{.status.url}' | cut -d "/" -f 3)
curl -v \
-H "Host: ${SERVICE_HOSTNAME}" \
-H "Content-Type: application/json" \
-d @./iris-input-v2.json \
http://${INGRESS_HOST}:${INGRESS_PORT}/v2/models/spark-v2-iris/infer
Expected Output
{
"model_name": "spark-v2-iris",
"model_version": null,
"id": "a187a478-c614-46ce-a7de-2f07871f43f3",
"parameters": null,
"outputs": [
{
"name": "Species",
"shape": [
2
],
"datatype": "BYTES",
"parameters": null,
"data": [
"versicolor",
"versicolor"
]
},
{
"name": "Probability_setosa",
"shape": [
2
],
"datatype": "FP64",
"parameters": null,
"data": [
0,
0
]
},
{
"name": "Probability_versicolor",
"shape": [
2
],
"datatype": "FP64",
"parameters": null,
"data": [
0.9074074074074074,
0.9074074074074074
]
},
{
"name": "Probability_virginica",
"shape": [
2
],
"datatype": "FP64",
"parameters": null,
"data": [
0.09259259259259259,
0.09259259259259259
]
},
{
"name": "Node_Id",
"shape": [
2
],
"datatype": "BYTES",
"parameters": null,
"data": [
"6",
"6"
]
}
]
}
Deploy the Model with GRPC endpoint through InferenceService¶
Create the inference service resource and expose the gRPC port using the below yaml.
Note
Currently, KServe only supports exposing either HTTP or gRPC port. By default, HTTP port is exposed.
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "spark-v2-iris-grpc"
spec:
predictor:
model:
modelFormat:
name: pmml
protocolVersion: v2
runtime: kserve-pmmlserver
storageUri: "gs://kfserving-examples/models/sparkpmml"
ports:
- name: h2c # knative expects grpc port name to be 'h2c'
protocol: TCP
containerPort: 8081
apiVersion: "serving.kserve.io/v1beta1"
kind: "InferenceService"
metadata:
name: "spark-v2-iris-grpc"
spec:
predictor:
model:
modelFormat:
name: pmml
protocolVersion: v2
runtime: kserve-pmmlserver
storageUri: "gs://kfserving-examples/models/sparkpmml"
ports:
- name: grpc-port # Istio requires the port name to be in the format <protocol>[-<suffix>]
protocol: TCP
containerPort: 8081
Warning
The pmmlserver
is based on Py4J and that doesn't support multi-process mode, so we can't set spec.predictor.containerConcurrency
.
If you want to scale the PMMLServer to improve prediction performance, you should set the InferenceService's resources.limits.cpu
to 1 and scale
the replica size.
Apply the InferenceService yaml to get the gRPC endpoint
kubectl apply -f spark-v2-grpc.yaml
Test the deployed model with grpcurl¶
After the gRPC InferenceService
becomes ready, grpcurl, can be used to send gRPC requests to the InferenceService
.
# download the proto file
curl -O https://raw.githubusercontent.com/kserve/open-inference-protocol/main/specification/protocol/open_inference_grpc.proto
INPUT_PATH=iris-input-v2-grpc.json
PROTO_FILE=open_inference_grpc.proto
SERVICE_HOSTNAME=$(kubectl get inferenceservice spark-v2-iris-grpc -o jsonpath='{.status.url}' | cut -d "/" -f 3)
INGRESS_HOST
and INGRESS_PORT
. Now, you can use curl
to send the inference requests.
The gRPC APIs follows the KServe prediction V2 protocol / Open Inference Protocol.
For example, ServerReady
API can be used to check if the server is ready:
grpcurl \
-plaintext \
-proto ${PROTO_FILE} \
-authority ${SERVICE_HOSTNAME} \
${INGRESS_HOST}:${INGRESS_PORT} \
inference.GRPCInferenceService.ServerReady
Expected Output
{
"ready": true
}
You can test the deployed model by sending a sample request with the below payload.
Notice that the input format differs from the in the previous REST endpoint
example.
Prepare the inference input inside the file named iris-input-v2-grpc.json
.
{
"model_name": "spark-v2-iris-grpc",
"inputs": [
{
"name": "input-0",
"shape": [2, 4],
"datatype": "FP32",
"contents": {
"fp32_contents": [6.8, 2.8, 4.8, 1.4, 6.0, 3.4, 4.5, 1.6]
}
}
]
}
ModelInfer
API takes input following the ModelInferRequest
schema defined in the grpc_predict_v2.proto
file.
grpcurl \
-vv \
-plaintext \
-proto ${PROTO_FILE} \
-authority ${SERVICE_HOSTNAME} \
-d @ \
${INGRESS_HOST}:${INGRESS_PORT} \
inference.GRPCInferenceService.ModelInfer \
<<< $(cat "$INPUT_PATH")
Expected Output
Resolved method descriptor:
// The ModelInfer API performs inference using the specified model. Errors are
// indicated by the google.rpc.Status returned for the request. The OK code
// indicates success and other codes indicate failure.
rpc ModelInfer ( .inference.ModelInferRequest ) returns ( .inference.ModelInferResponse );
Request metadata to send:
(empty)
Response headers received:
content-type: application/grpc
date: Mon, 09 Oct 2023 11:07:26 GMT
grpc-accept-encoding: identity, deflate, gzip
server: istio-envoy
x-envoy-upstream-service-time: 16
Estimated response size: 83 bytes
Response contents:
{
"model_name": "spark-v2-iris",
"model_version": null,
"id": "a187a478-c614-46ce-a7de-2f07871f43f3",
"parameters": null,
"outputs": [
{
"name": "Species",
"shape": [
2
],
"datatype": "BYTES",
"parameters": null,
"data": [
"versicolor",
"versicolor"
]
},
{
"name": "Probability_setosa",
"shape": [
2
],
"datatype": "FP64",
"parameters": null,
"data": [
0,
0
]
},
{
"name": "Probability_versicolor",
"shape": [
2
],
"datatype": "FP64",
"parameters": null,
"data": [
0.9074074074074074,
0.9074074074074074
]
},
{
"name": "Probability_virginica",
"shape": [
2
],
"datatype": "FP64",
"parameters": null,
"data": [
0.09259259259259259,
0.09259259259259259
]
},
{
"name": "Node_Id",
"shape": [
2
],
"datatype": "BYTES",
"parameters": null,
"data": [
"6",
"6"
]
}
]
}
Response trailers received:
(empty)
Sent 1 request and received 1 response