Using UDF with FeatureByte SDK
Introduction¶
In this section we use the UDFs defined previously for both Snowflake and Spark through the FeatureByte SDK.
In [1]:
Copied!
import featurebyte as fb

# Select the "tutorial" profile for all subsequent SDK calls.
fb.use_profile("tutorial")
13:12:05 | INFO | Using configuration file at: /Users/viktor/.featurebyte/config.yaml 13:12:05 | INFO | Active profile: tutorial (https://tutorials.featurebyte.com/api/v1) 13:12:05 | WARNING | Remote SDK version (0.5.1.dev70) is different from local (0.5.1.dev63). Update local SDK to avoid unexpected behavior. 13:12:05 | INFO | No catalog activated. 13:12:05 | INFO | 10 feature lists, 59 features deployed 13:12:05 | INFO | Using profile: tutorial 13:12:05 | INFO | Using configuration file at: /Users/viktor/.featurebyte/config.yaml 13:12:05 | INFO | Active profile: tutorial (https://tutorials.featurebyte.com/api/v1) 13:12:05 | WARNING | Remote SDK version (0.5.1.dev70) is different from local (0.5.1.dev63). Update local SDK to avoid unexpected behavior. 13:12:05 | INFO | No catalog activated. 13:12:06 | INFO | 10 feature lists, 59 features deployed
In [2]:
Copied!
# Feature store the catalog is attached to, and the name of the new catalog.
feature_store_name = "playground"
catalog_name = "Grocery Dataset Embedding Tutorial"

# Create the catalog exactly once; `catalog` is reused by the cells below to
# reach the data source and to register entities.
catalog = fb.Catalog.create(catalog_name, feature_store_name=feature_store_name)
Create FeatureByte UserDefinedFunction
instance¶
This object references a SQL function that has already been created in the data warehouse.
In [3]:
Copied!
# Register the warehouse SQL function F_SBERT_EMBEDDING under the SDK name
# "embedding". It takes a single VARCHAR argument `x` and returns an ARRAY.
embedding_udf = fb.UserDefinedFunction.create(
    name="embedding",
    sql_function_name="F_SBERT_EMBEDDING",
    function_parameters=[
        fb.FunctionParameter(name="x", dtype=fb.enum.DBVarType.VARCHAR),
    ],
    output_dtype=fb.enum.DBVarType.ARRAY,
    # NOTE(review): presumably restricts the UDF to the active catalog -- confirm.
    is_global=False,
)
Create tables¶
In [4]:
Copied!
# Location of the grocery demo data inside the feature store.
database_name = 'DEMO_DATASETS'
schema_name = 'GROCERY'

ds = catalog.get_data_source()

# --- Source tables: one handle per warehouse table -------------------------
customer_source_table = ds.get_source_table(
    database_name=database_name,
    schema_name=schema_name,
    table_name="GROCERYCUSTOMER",
)
invoice_source_table = ds.get_source_table(
    database_name=database_name,
    schema_name=schema_name,
    table_name="GROCERYINVOICE",
)
items_source_table = ds.get_source_table(
    database_name=database_name,
    schema_name=schema_name,
    table_name="INVOICEITEMS",
)
product_source_table = ds.get_source_table(
    database_name=database_name,
    schema_name=schema_name,
    table_name="GROCERYPRODUCT",
)

# --- Catalog tables: each source table is registered once ------------------
# Customers are registered as an SCD table.
customer_table = customer_source_table.create_scd_table(
    name="GROCERYCUSTOMER",
    surrogate_key_column='RowID',
    natural_key_column="GroceryCustomerGuid",
    effective_timestamp_column="ValidFrom",
    current_flag_column="CurrentRecord",
    record_creation_timestamp_column="record_available_at",
)

# Invoices are registered as an event table keyed by GroceryInvoiceGuid.
invoice_table = invoice_source_table.create_event_table(
    name="GROCERYINVOICE",
    event_id_column="GroceryInvoiceGuid",
    event_timestamp_column="Timestamp",
    event_timestamp_timezone_offset_column="tz_offset",
    record_creation_timestamp_column="record_available_at",
)

# Invoice items are registered as an item table; `event_table_name` points at
# the GROCERYINVOICE event table created just above, so that call must come first.
items_table = items_source_table.create_item_table(
    name="INVOICEITEMS",
    event_id_column="GroceryInvoiceGuid",
    item_id_column="GroceryInvoiceItemGuid",
    event_table_name="GROCERYINVOICE",
    record_creation_timestamp_column="record_available_at",
)

# Products are registered as a dimension table.
product_table = product_source_table.create_dimension_table(
    name="GROCERYPRODUCT",
    dimension_id_column="GroceryProductGuid",
)
Initialize job settings¶
In [5]:
Copied!
%%capture
invoice_table.initialize_default_feature_job_setting()
%%capture
invoice_table.initialize_default_feature_job_setting()
Create and tag entities¶
In [6]:
Copied!
# Register each entity once, together with the serving name used to refer to
# it in observation tables and feature requests.
catalog.create_entity(name="customer", serving_names=["GROCERYCUSTOMERGUID"])
catalog.create_entity(name="invoice", serving_names=["GROCERYINVOICEGUID"])
catalog.create_entity(name="item", serving_names=["GROCERYINVOICEITEMGUID"])
catalog.create_entity(name="product", serving_names=["GROCERYPRODUCTGUID"])
catalog.create_entity(name="productgroup", serving_names=["PRODUCTGROUP"])
catalog.create_entity(name="frenchstate", serving_names=["FRENCHSTATE"])

# tag the entities for the grocery customer table
customer_table.GroceryCustomerGuid.as_entity("customer")
customer_table.State.as_entity("frenchstate")

# tag the entities for the grocery invoice table
invoice_table.GroceryInvoiceGuid.as_entity("invoice")
invoice_table.GroceryCustomerGuid.as_entity("customer")

# tag the entities for the grocery items table
items_table.GroceryInvoiceItemGuid.as_entity("item")
items_table.GroceryInvoiceGuid.as_entity("invoice")
items_table.GroceryProductGuid.as_entity("product")

# tag the entities for the grocery product table
product_table.GroceryProductGuid.as_entity("product")
product_table.ProductGroup.as_entity("productgroup")
Get views¶
In [7]:
Copied!
# Obtain views of the item and product tables; the cells below add a column
# to `product_view` and join it into `items_view`.
items_view = items_table.get_view()
product_view = product_table.get_view()
Create embedding column¶
We suggest doing this before any joins that increase the size of the dataset.
For example, if we want to encode a column from a dimension table which will then be joined to an item or event table, it is better to compute the embedding on the dimension table itself before the join. This way, the number of calls to the transformer model is minimized.
In [8]:
Copied!
# Apply the embedding UDF to ProductGroup on the product view, i.e. before
# the view is joined to the items view.
product_view["ProductGroupEmbedding"] = embedding_udf(product_view["ProductGroup"])
In [9]:
Copied!
# Join the product columns (including ProductGroupEmbedding) into the items
# view. With rsuffix="" the joined columns keep their original names, so this
# join must run only once.
items_view = items_view.join(product_view, rsuffix="")
Create Embedding Features¶
In [10]:
Copied!
# Create ProductGroup Embedding feature from the joined embedding column.
product_group_embedding = items_view["ProductGroupEmbedding"].as_feature(
    "PRODUCT_ProductGroup_Embedding"
)
In [11]:
Copied!
# Create the customer's average product group embedding over 14 and 183 day
# windows; the result holds one feature per entry in `feature_names`, matching
# the entries in `windows`.
customer_avg_product_groups = items_view.groupby("GroceryCustomerGuid").aggregate_over(
    "ProductGroupEmbedding",
    method=fb.AggFunc.AVG,
    feature_names=[
        "CUSTOMER_Avg_Of_ProductGroup_Embedding_14d",
        "CUSTOMER_Avg_Of_ProductGroup_Embedding_183d",
    ],
    windows=["14d", "183d"],
)
In [12]:
Copied!
# Create cosine similarity between latest 14d average customer's product group
# and average product group in last 183 days
customer_avg_product_groups_cosine = customer_avg_product_groups[
    "CUSTOMER_Avg_Of_ProductGroup_Embedding_14d"
].vec.cosine_similarity(
    customer_avg_product_groups["CUSTOMER_Avg_Of_ProductGroup_Embedding_183d"]
)

# The derived feature is given its name explicitly here.
customer_avg_product_groups_cosine.name = (
    "CUSTOMER_Consistency_of_Avg_of_ProductGroup_Embedding_14d_183d"
)
In [13]:
Copied!
# Bundle the per-item embedding feature, the two windowed averages and their
# cosine similarity into a single group so they can be previewed together.
features = fb.FeatureGroup(
    [
        product_group_embedding,
        customer_avg_product_groups,
        customer_avg_product_groups_cosine,
    ]
)
Display features¶
In [14]:
Copied!
# Build a 10-row observation table from the items view. Only the timestamp
# and item id columns are kept, and they are renamed to POINT_IN_TIME and to
# the "item" entity's serving name, GROCERYINVOICEITEMGUID.
observation_table = items_view.create_observation_table(
    name="Preview Table",
    sample_rows=10,
    columns=["Timestamp", "GroceryInvoiceItemGuid"],
    columns_rename_mapping={
        "Timestamp": "POINT_IN_TIME",
        "GroceryInvoiceItemGuid": "GROCERYINVOICEITEMGUID",
    },
)
In [15]:
Copied!
# Preview the feature values for the sampled observation rows; as the last
# expression of the cell, the resulting table is displayed as the output.
features.preview(observation_table.to_pandas())
Out[15]:
POINT_IN_TIME | GROCERYINVOICEITEMGUID | PRODUCT_ProductGroup_Embedding | CUSTOMER_Avg_Of_ProductGroup_Embedding_14d | CUSTOMER_Avg_Of_ProductGroup_Embedding_183d | CUSTOMER_Consistency_of_Avg_of_ProductGroup_Embedding_14d_183d | |
---|---|---|---|---|---|---|
0 | 2022-02-16 18:21:46 | cee5213a-5e6c-4c4e-b2e0-be68641b5794 | [-0.04785451292991601, 0.089341826736927, -0.0... | [-0.058020170524039, 0.032411077673632, -0.000... | [-0.066213653681816, 0.03206242506035, -0.0057... | 0.983710 |
1 | 2022-03-09 14:53:59 | 087a88c0-a095-47fe-9b10-d9ce96099c53 | [-0.062005367130041004, 0.0139962984249, -0.08... | [-0.037672073437044006, 0.020930944440457, -0.... | [-0.05225789626794201, 0.02539920669211, -0.02... | 0.946509 |
2 | 2022-03-22 10:19:28 | bc8e2366-178b-436b-8561-486d30585a71 | [0.007008050568402001, 0.022246360778809003, -... | [-0.06751180471231501, 0.044095732426892006, -... | [-0.057248351460492, 0.018503276885233, -0.003... | 0.911853 |
3 | 2022-04-10 10:27:03 | 56a8f07c-3211-4beb-8f85-72bb1100b1ab | [0.032752003520727005, 0.035201292484999, 0.02... | [-0.05269033904187401, 0.042718844848092005, -... | [-0.04874591809341, 0.038789121775793, -0.0223... | 0.957113 |
4 | 2022-05-31 10:18:15 | 413f40c0-0ec5-4305-8b23-cd85fb7a0c43 | [-0.087121985852718, 0.050529599189758, -0.008... | [-0.05328328095908701, 0.005815382514681, -0.0... | [-0.055783698040449005, 0.029181797294901002, ... | 0.880618 |
5 | 2022-08-03 16:21:22 | f197b217-75c0-411d-86d7-d77aa0f1e058 | [-0.07479558140039401, 0.019945941865444003, 0... | [-0.064987060259141, 0.022283805090757003, -0.... | [-0.054253227964817005, 0.036997466192159004, ... | 0.929665 |
6 | 2022-12-09 14:19:48 | c1985933-fbad-44fd-a846-399f46db1caa | [-0.052736759185791, 0.025664445012808002, 0.0... | [-0.036871877086482, 0.019740114121565, 0.0057... | [-0.043221510310417, 0.024338980163743, -0.010... | 0.952165 |
7 | 2022-12-23 16:19:01 | 2c1a8c50-ed0b-49d0-a3ec-2d2b5170c889 | [-0.07160992175340701, 0.030648233368993003, -... | [-0.046758838812821, 0.040230349521153, -0.010... | [-0.051375815508676, 0.028397644834619002, -0.... | 0.892956 |
8 | 2023-01-01 20:22:50 | 14939bc6-7729-4817-be1a-b967e291e149 | [-0.09608079493045801, 0.061092965304852004, 0... | [-0.04874969327024101, 0.024902904927287, -0.0... | [-0.048542827472633006, 0.029627525927622002, ... | 0.984254 |
9 | 2023-01-13 17:32:02 | fb43e112-7e88-4556-ace0-b558147b3e9d | [-0.04785451292991601, 0.089341826736927, -0.0... | [-0.046684044180438006, 0.021421298384666002, ... | [-0.05595829285366401, 0.039303245678754005, -... | 0.650355 |
In [ ]:
Copied!