Registering UDF in Spark (Databricks)
Introduction¶
In this tutorial we will create and register a Spark UDF that is available in Spark SQL and calls a deployed transformer model.
We will set up the function for MLflow, TorchServe, and GCP Cloud Functions deployments.
Steps¶
Data warehouses based on Spark and Databricks support Hive User-Defined Functions (UDFs) that are implemented in Java. Therefore, we need to develop a Java-based UDF and integrate it into the Spark cluster.
Spark processes data one row at a time when using a UDF, which makes it a good fit for inference services that support server-side batch processing, such as TorchServe. Other backends, such as GCP Cloud Functions, can also be integrated with this setup.
The steps we will go through are:
- Implementation of the Hive UDF function in Java
- Building the jar file and including it into the Spark cluster
- Enabling and using the UDF function in SQL
Implementation of the Hive UDF function¶
A Hive UDF has to implement a specific Hive interface, be packaged as a jar file, and be included in the Spark cluster classpath.
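As a rough illustration of the shape such a class can take, here is a minimal sketch. It is not the code from the example project: the class name SketchEmbeddingUdf and the injected embedder function are hypothetical, and the sketch assumes the simple Hive UDF style in which a class extends org.apache.hadoop.hive.ql.exec.UDF and exposes an evaluate method. The extends clause is left as a comment so that the sketch compiles without Hive on the classpath.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch, not the code from the example project.
// A simple Hive UDF would extend org.apache.hadoop.hive.ql.exec.UDF;
// the clause is commented out so this compiles without Hive dependencies.
class SketchEmbeddingUdf /* extends org.apache.hadoop.hive.ql.exec.UDF */ {
    // Stand-in for the HTTP call to the deployed model.
    private final Function<String, List<Double>> embedder;

    // Hive instantiates UDF classes reflectively, so a no-argument constructor is needed.
    public SketchEmbeddingUdf() {
        this(sentence -> {
            throw new UnsupportedOperationException("model call is not wired in this sketch");
        });
    }

    // Extra constructor (illustration only) that lets a caller plug in the model call.
    SketchEmbeddingUdf(Function<String, List<Double>> embedder) {
        this.embedder = embedder;
    }

    // Called once per input row; a SQL NULL arrives as a Java null.
    public List<Double> evaluate(String sentence) {
        if (sentence == null) {
            return null;
        }
        return embedder.apply(sentence);
    }
}
```

In a real jar the embedder would perform the HTTP request to the configured endpoint and parse the returned vector.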
Implementing and packaging Java code can be a bit involved and calls for build and packaging tools such as Gradle, which handles dependency management, code compilation, and jar assembly.
The example project that creates such a jar file is available here. It includes dependencies for JSON parsing and JWT authentication (needed for calling GCP functions), as well as Hive-related dependencies.
It also includes an EmbeddingUDF.java file, which contains the implementation of the UDF itself.
The function supports three modes; the API_TYPE environment variable selects between them.
- API_TYPE=gcp calls a GCP external function.
- API_TYPE=torchserve calls a custom TorchServe deployment.
- API_TYPE=mlflow calls a Databricks-based MLflow deployment.
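The mode switch could be sketched as follows. This is an illustration only: the ApiType enum and the fromEnv helper are hypothetical names that do not come from EmbeddingUDF.java, and accepting the value case-insensitively is an assumption. Only the three API_TYPE values are taken from the list above.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch: names are illustrative, not taken from EmbeddingUDF.java.
enum ApiType {
    GCP, TORCHSERVE, MLFLOW;

    // Resolve the mode from an environment map such as System.getenv().
    // Case-insensitive matching is an assumption made for this sketch.
    static ApiType fromEnv(Map<String, String> env) {
        String raw = env.get("API_TYPE");
        if (raw == null || raw.isBlank()) {
            throw new IllegalStateException("API_TYPE is not set");
        }
        try {
            return ApiType.valueOf(raw.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalStateException("Unsupported API_TYPE: " + raw, e);
        }
    }
}
```

A caller would typically resolve the mode once, for example with ApiType.fromEnv(System.getenv()), and then branch on the result when building requests.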
For API_TYPE=gcp the following extra environment variables are required:
- ENDPOINT_URL - URL to the API gateway + function subpath, e.g. {api_gateway_url}/sbert_embedding.
- SA_KEYFILE_PATH - Path to the private keyfile from the GCP Service Account.
- SA_EMAIL - Client email.
- SA_AUDIENCE - Same as {managed_service_value} in the Snowflake registration tutorial.
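To show how these values might fit together, the sketch below signs a JWT with the service account's private key using only the JDK. It rests on several assumptions: the claim set (iss and sub set to SA_EMAIL, aud set to SA_AUDIENCE, plus iat and exp) follows the usual layout of a self-signed service-account token, the class and method names are hypothetical, and the example project relies on its own JWT dependency rather than code like this. Loading the key from SA_KEYFILE_PATH is left out.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.PrivateKey;
import java.security.Signature;
import java.util.Base64;
import java.util.Locale;

// Hypothetical sketch of a self-signed RS256 JWT; the claim layout is an assumption.
final class ServiceAccountJwt {
    private static final Base64.Encoder B64 = Base64.getUrlEncoder().withoutPadding();

    private ServiceAccountJwt() {}

    // saEmail and audience are assumed to contain no characters that need JSON escaping.
    static String sign(String saEmail, String audience, PrivateKey key,
                       long issuedAtSec, long ttlSec) throws GeneralSecurityException {
        String header = "{\"alg\":\"RS256\",\"typ\":\"JWT\"}";
        String claims = String.format(Locale.ROOT,
                "{\"iss\":\"%s\",\"sub\":\"%s\",\"aud\":\"%s\",\"iat\":%d,\"exp\":%d}",
                saEmail, saEmail, audience, issuedAtSec, issuedAtSec + ttlSec);
        String signingInput = encode(header) + "." + encode(claims);

        // RS256 = RSASSA-PKCS1-v1_5 with SHA-256 over "<header>.<claims>".
        Signature signer = Signature.getInstance("SHA256withRSA");
        signer.initSign(key);
        signer.update(signingInput.getBytes(StandardCharsets.US_ASCII));
        return signingInput + "." + B64.encodeToString(signer.sign());
    }

    private static String encode(String json) {
        return B64.encodeToString(json.getBytes(StandardCharsets.UTF_8));
    }
}
```

The resulting token would normally be sent to ENDPOINT_URL in an Authorization: Bearer header.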
For API_TYPE=torchserve the following extra environment variables are required:
- ENDPOINT_URL - URL to the TorchServe service with a transformer model deployed. For example, if we run both Spark and TorchServe locally, this will be something like http://localhost:8080/predictions/transformer.
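A request to such an endpoint might be built as in the sketch below, which uses the JDK's java.net.http API (Java 11 or later). The class name is hypothetical, and sending the sentence as a plain-text body is an assumption: the payload a deployment actually expects depends on the TorchServe handler packaged with the model.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: builds (but does not send) the request for one input row.
final class TorchServeRequests {
    private TorchServeRequests() {}

    // endpointUrl corresponds to ENDPOINT_URL; the plain-text body is an assumption.
    static HttpRequest forSentence(String endpointUrl, String sentence) {
        return HttpRequest.newBuilder(URI.create(endpointUrl))
                .header("Content-Type", "text/plain; charset=utf-8")
                .POST(HttpRequest.BodyPublishers.ofString(sentence, StandardCharsets.UTF_8))
                .build();
    }
}
```

Sending it would look like HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()), after which the response body has to be parsed into a vector.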
For API_TYPE=mlflow the following extra environment variables are required:
- ENDPOINT_URL - HTTP URL of the model served by Databricks MLflow.
- DB_TOKEN - Databricks token.
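Since each mode needs a different set of variables, it can help to check them before the first call is made. The helper below is a hypothetical sketch and is not part of the example project; only the variable names and the three API_TYPE values come from the lists above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: reports which required variables are absent for the chosen mode.
final class UdfEnvCheck {
    private static final Map<String, List<String>> REQUIRED = Map.of(
            "gcp", List.of("ENDPOINT_URL", "SA_KEYFILE_PATH", "SA_EMAIL", "SA_AUDIENCE"),
            "torchserve", List.of("ENDPOINT_URL"),
            "mlflow", List.of("ENDPOINT_URL", "DB_TOKEN"));

    private UdfEnvCheck() {}

    // Returns the names of required variables that are missing or empty.
    static List<String> missing(Map<String, String> env) {
        String apiType = env.getOrDefault("API_TYPE", "");
        List<String> required = REQUIRED.get(apiType);
        if (required == null) {
            throw new IllegalArgumentException("Unsupported API_TYPE: '" + apiType + "'");
        }
        List<String> missing = new ArrayList<>();
        for (String name : required) {
            String value = env.get(name);
            if (value == null || value.isEmpty()) {
                missing.add(name);
            }
        }
        return missing;
    }
}
```

Calling UdfEnvCheck.missing(System.getenv()) at startup and failing fast on a non-empty result gives a clearer error than a failed HTTP call later on.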
Building jar file and including it into the classpath¶
To build the jar, run the following command:
This will create the jar file hive-embedding-udf/lib/build/libs/featurebyte-hive-embedding-udf-all.jar. We will include this jar in the Spark classpath. This can be done either via the --jars parameter when starting Spark or with the ADD JAR command.
Enabling UDF in Spark SQL¶
To enable the UDF in Spark SQL, we need to connect to the Spark Thrift server with a SQL client (for example DBeaver) and create a function that refers to a class in the jar file we created. For example, assuming the jar file has been built and included in the Spark classpath, the SQL might look like this:
add jar '/path/to/featurebyte-hive-embedding-udf-all.jar';
create or replace function F_SBERT_EMBEDDING AS 'com.featurebyte.hive.embedding.udf.EmbeddingUDF'
using jar '/path/to/featurebyte-hive-embedding-udf-all.jar';
Note that the UDF must be created within the schema specifically dedicated to FeatureByte. This schema is also used for storing other functions and metadata.
To confirm that the function is operating correctly, execute the following SQL command:
select
sentence,
F_SBERT_EMBEDDING(sentence) as vector
from (
select 'This is first example' as sentence
union
select 'This is second example' as sentence
);
Result: