Sdk dlt example
In [1]:
Copied!
import time
import os
import featurebyte as fb
import time
import os
import featurebyte as fb
10:51:42 | INFO | Using configuration file at: /Users/viktor/.featurebyte/config.yaml 10:51:42 | INFO | Active profile: local (http://127.0.0.1:8088) 10:51:42 | INFO | SDK version: 0.6.0 10:51:42 | INFO | No catalog activated. 10:51:42 | INFO | 0 feature list, 0 feature deployed
Connect to databricks data warehouse¶
In [2]:
Copied!
ds = fb.FeatureStore.get_or_create(
name="databricks",
source_type=fb.SourceType.DATABRICKS,
details=fb.DatabricksDetails(
host="<host>",
http_path="<http_path>",
featurebyte_catalog="hive_metastore",
featurebyte_schema="grocery",
storage_type=fb.StorageType.S3,
storage_url=f"<storage_url>",
storage_spark_url=f"<storage_spark_url>",
),
database_credential=fb.AccessTokenCredential(
access_token="<databricks_access_token>"
),
storage_credential=fb.S3StorageCredential(
s3_access_key_id="<s3_access_key_id>",
s3_secret_access_key="<s3_secret_access_key>",
)
).get_data_source()
ds = fb.FeatureStore.get_or_create(
name="databricks",
source_type=fb.SourceType.DATABRICKS,
details=fb.DatabricksDetails(
host="",
http_path="",
featurebyte_catalog="hive_metastore",
featurebyte_schema="grocery",
storage_type=fb.StorageType.S3,
storage_url=f"",
storage_spark_url=f"",
),
database_credential=fb.AccessTokenCredential(
access_token=""
),
storage_credential=fb.S3StorageCredential(
s3_access_key_id="",
s3_secret_access_key="",
)
).get_data_source()
Create catalog¶
In [3]:
Copied!
catalog_name = "DLT Example Catalog"
fb.Catalog.create(catalog_name, feature_store_name="databricks")
catalog = fb.Catalog.activate(catalog_name)
catalog_name = "DLT Example Catalog"
fb.Catalog.create(catalog_name, feature_store_name="databricks")
catalog = fb.Catalog.activate(catalog_name)
10:51:44 | INFO | Catalog activated: DLT Example Catalog
Create tables¶
In [4]:
Copied!
%%capture
customer_table = ds.get_source_table(
database_name="hive_metastore", schema_name="grocery", table_name="grocerycustomer"
).create_scd_table(
name="grocerycustomer",
surrogate_key_column='RowID',
natural_key_column="GroceryCustomerGuid",
effective_timestamp_column="ValidFrom",
current_flag_column="CurrentRecord",
record_creation_timestamp_column="record_available_at",
)
invoice_table = ds.get_source_table(
database_name="hive_metastore", schema_name="grocery", table_name="groceryinvoice"
).create_event_table(
name="groceryinvoice",
event_id_column="GroceryInvoiceGuid",
event_timestamp_column="Timestamp",
event_timestamp_timezone_offset_column="tz_offset",
record_creation_timestamp_column="record_available_at",
)
invoice_table.initialize_default_feature_job_setting()
items_table = ds.get_source_table(
database_name="hive_metastore", schema_name="grocery", table_name="invoiceitems"
).create_item_table(
name="invoiceitems",
event_id_column="GroceryInvoiceGuid",
item_id_column="GroceryInvoiceItemGuid",
event_table_name="groceryinvoice",
record_creation_timestamp_column="record_available_at",
)
product_table = ds.get_source_table(
database_name="hive_metastore", schema_name="grocery", table_name="groceryproduct_embedding"
).create_dimension_table(name="groceryproduct", dimension_id_column="GroceryProductGuid")
%%capture
customer_table = ds.get_source_table(
database_name="hive_metastore", schema_name="grocery", table_name="grocerycustomer"
).create_scd_table(
name="grocerycustomer",
surrogate_key_column='RowID',
natural_key_column="GroceryCustomerGuid",
effective_timestamp_column="ValidFrom",
current_flag_column="CurrentRecord",
record_creation_timestamp_column="record_available_at",
)
invoice_table = ds.get_source_table(
database_name="hive_metastore", schema_name="grocery", table_name="groceryinvoice"
).create_event_table(
name="groceryinvoice",
event_id_column="GroceryInvoiceGuid",
event_timestamp_column="Timestamp",
event_timestamp_timezone_offset_column="tz_offset",
record_creation_timestamp_column="record_available_at",
)
invoice_table.initialize_default_feature_job_setting()
items_table = ds.get_source_table(
database_name="hive_metastore", schema_name="grocery", table_name="invoiceitems"
).create_item_table(
name="invoiceitems",
event_id_column="GroceryInvoiceGuid",
item_id_column="GroceryInvoiceItemGuid",
event_table_name="groceryinvoice",
record_creation_timestamp_column="record_available_at",
)
product_table = ds.get_source_table(
database_name="hive_metastore", schema_name="grocery", table_name="groceryproduct_embedding"
).create_dimension_table(name="groceryproduct", dimension_id_column="GroceryProductGuid")
Create and tag entities¶
In [5]:
Copied!
# entities
catalog.create_entity(name="grocerycustomer", serving_names=["GROCERYCUSTOMERGUID"])
catalog.create_entity(name="groceryinvoice", serving_names=["GROCERYINVOICEGUID"])
catalog.create_entity(name="groceryitem", serving_names=["GROCERYINVOICEITEMGUID"])
catalog.create_entity(name="groceryproduct", serving_names=["GROCERYPRODUCTGUID"])
catalog.create_entity(name="productgroup", serving_names=["PRODUCTGROUP"])
catalog.create_entity(name="frenchstate", serving_names=["FRENCHSTATE"])
# tag the entities for the grocery customer table
customer_table.GroceryCustomerGuid.as_entity("grocerycustomer")
customer_table.State.as_entity("frenchstate")
# tag the entities for the grocery invoice table
invoice_table.GroceryInvoiceGuid.as_entity("groceryinvoice")
invoice_table.GroceryCustomerGuid.as_entity("grocerycustomer")
# tag the entities for the grocery items table
items_table.GroceryInvoiceItemGuid.as_entity("groceryitem")
items_table.GroceryInvoiceGuid.as_entity("groceryinvoice")
items_table.GroceryProductGuid.as_entity("groceryproduct")
# tag the entities for the grocery product table
product_table.GroceryProductGuid.as_entity("groceryproduct")
product_table.ProductGroup.as_entity("productgroup")
# entities
catalog.create_entity(name="grocerycustomer", serving_names=["GROCERYCUSTOMERGUID"])
catalog.create_entity(name="groceryinvoice", serving_names=["GROCERYINVOICEGUID"])
catalog.create_entity(name="groceryitem", serving_names=["GROCERYINVOICEITEMGUID"])
catalog.create_entity(name="groceryproduct", serving_names=["GROCERYPRODUCTGUID"])
catalog.create_entity(name="productgroup", serving_names=["PRODUCTGROUP"])
catalog.create_entity(name="frenchstate", serving_names=["FRENCHSTATE"])
# tag the entities for the grocery customer table
customer_table.GroceryCustomerGuid.as_entity("grocerycustomer")
customer_table.State.as_entity("frenchstate")
# tag the entities for the grocery invoice table
invoice_table.GroceryInvoiceGuid.as_entity("groceryinvoice")
invoice_table.GroceryCustomerGuid.as_entity("grocerycustomer")
# tag the entities for the grocery items table
items_table.GroceryInvoiceItemGuid.as_entity("groceryitem")
items_table.GroceryInvoiceGuid.as_entity("groceryinvoice")
items_table.GroceryProductGuid.as_entity("groceryproduct")
# tag the entities for the grocery product table
product_table.GroceryProductGuid.as_entity("groceryproduct")
product_table.ProductGroup.as_entity("productgroup")
Get and join views¶
In [6]:
Copied!
invoice_view = invoice_table.get_view()
items_view = items_table.get_view()
product_view = product_table.get_view()
invoice_view = invoice_table.get_view()
items_view = items_table.get_view()
product_view = product_table.get_view()
In [7]:
Copied!
items_view = items_view.join(product_view)
items_view = items_view.join(invoice_view, rsuffix="_invoice")
items_view = items_view.join(product_view)
items_view = items_view.join(invoice_view, rsuffix="_invoice")
Create invoice product groups embedding¶
In [8]:
Copied!
invoice_product_group_embedding = items_view.groupby("GroceryInvoiceGuid").aggregate(
"ProductGroupEmbedding",
method=fb.AggFunc.AVG,
feature_name="INVOICE_Avg_of_ProductGroup_Embedding",
)
invoice_product_group_embedding = items_view.groupby("GroceryInvoiceGuid").aggregate(
"ProductGroupEmbedding",
method=fb.AggFunc.AVG,
feature_name="INVOICE_Avg_of_ProductGroup_Embedding",
)
Create customer level product group embeddings¶
In [9]:
Copied!
customer_avg_product_groups = items_view.groupby("GroceryCustomerGuid").aggregate_over(
"ProductGroupEmbedding",
method=fb.AggFunc.AVG,
feature_names=["CUSTOMER_Avg_of_ProductGroup_Embedding_14d", "CUSTOMER_Avg_of_ProductGroup_Embedding_183d"],
windows=["14d", "183d"]
)
customer_avg_product_groups_cosine = customer_avg_product_groups["CUSTOMER_Avg_of_ProductGroup_Embedding_14d"].vec.cosine_similarity(
customer_avg_product_groups["CUSTOMER_Avg_of_ProductGroup_Embedding_183d"]
)
customer_avg_product_groups_cosine.name = "CUSTOMER_Consistency_of_Avg_of_ProductGroup_Embedding_14d_183d"
customer_avg_product_groups = items_view.groupby("GroceryCustomerGuid").aggregate_over(
"ProductGroupEmbedding",
method=fb.AggFunc.AVG,
feature_names=["CUSTOMER_Avg_of_ProductGroup_Embedding_14d", "CUSTOMER_Avg_of_ProductGroup_Embedding_183d"],
windows=["14d", "183d"]
)
customer_avg_product_groups_cosine = customer_avg_product_groups["CUSTOMER_Avg_of_ProductGroup_Embedding_14d"].vec.cosine_similarity(
customer_avg_product_groups["CUSTOMER_Avg_of_ProductGroup_Embedding_183d"]
)
customer_avg_product_groups_cosine.name = "CUSTOMER_Consistency_of_Avg_of_ProductGroup_Embedding_14d_183d"
Preview features¶
In [10]:
Copied!
features = fb.FeatureGroup([
customer_avg_product_groups_cosine,
customer_avg_product_groups,
invoice_product_group_embedding,
])
features = fb.FeatureGroup([
customer_avg_product_groups_cosine,
customer_avg_product_groups,
invoice_product_group_embedding,
])
In [11]:
Copied!
observation_table = items_view.create_observation_table(
name="Preview table",
sample_rows=10,
columns=["Timestamp", "GroceryCustomerGuid", "GroceryInvoiceGuid"],
columns_rename_mapping={
"Timestamp": "POINT_IN_TIME",
"GroceryCustomerGuid": "GROCERYCUSTOMERGUID",
"GroceryInvoiceGuid": "GROCERYINVOICEGUID",
},
)
observation_table = items_view.create_observation_table(
name="Preview table",
sample_rows=10,
columns=["Timestamp", "GroceryCustomerGuid", "GroceryInvoiceGuid"],
columns_rename_mapping={
"Timestamp": "POINT_IN_TIME",
"GroceryCustomerGuid": "GROCERYCUSTOMERGUID",
"GroceryInvoiceGuid": "GROCERYINVOICEGUID",
},
)
In [12]:
Copied!
features.preview(observation_table.to_pandas())
features.preview(observation_table.to_pandas())
Out[12]:
POINT_IN_TIME | GROCERYCUSTOMERGUID | GROCERYINVOICEGUID | CUSTOMER_Consistency_of_Avg_of_ProductGroup_Embedding_14d_183d | CUSTOMER_Avg_of_ProductGroup_Embedding_14d | CUSTOMER_Avg_of_ProductGroup_Embedding_183d | INVOICE_Avg_of_ProductGroup_Embedding | |
---|---|---|---|---|---|---|---|
0 | 2023-01-25 16:17:02+00:00 | 967e4beb-c889-4ff9-9140-66655248bbde | 452619d8-da2c-4b87-8869-fabd8617f0b5 | 0.947687 | [-0.07795123751428601, 0.026106864757143, -0.0... | [-0.05989266951781601, 0.028268971296264003, -... | [-0.024409980539999997, 0.0467800437, 0.022734... |
1 | 2023-01-25 17:58:34+00:00 | ed957eca-941a-47df-93ee-30ab1ae45ee5 | 3b7f52a4-eab9-409b-b329-11547eff92a3 | NaN | NaN | [-0.050461753724194, 0.03191136753629, -0.0203... | [-0.056538398775, 0.038149968064286, -0.025998... |
2 | 2023-06-22 16:39:56+00:00 | 7de2b5ba-8078-4c60-aa69-ef7b68c42349 | ebe828ea-44af-4865-b2ed-eea02857b889 | 0.943922 | [-0.054724733771875006, 0.04170257735937501, 0... | [-0.054099960254567, 0.036855927644231005, -0.... | [-0.0376417868, 0.0208652486275, -0.0210912462... |
3 | 2022-08-01 09:44:05+00:00 | 52476713-ad96-42e3-983d-31b1d3745a3f | 47ec017f-c65b-4bc9-917d-5a7b211c3621 | 0.866544 | [-0.05683987525, 0.045103899862500005, -0.0329... | [-0.053429381090463, 0.018712235034605, -0.015... | [-0.0524657229375, 0.02363832113125, -0.018923... |
4 | 2022-08-16 17:13:05+00:00 | e5d71d02-7fec-4eaf-8327-84b3583b7cec | bccd2292-1920-407f-a307-79d6237eab7c | 0.956710 | [-0.058180038666667, 0.035606893166667, -0.016... | [-0.044520215697211006, 0.031701440613546, -0.... | [-0.020503219, 0.03539911665625, -0.0233798176... |
5 | 2022-08-18 15:43:51+00:00 | 8ad0c1f9-e6cb-43d2-a459-3297f1b29f55 | ac5c7e47-8f65-4538-9146-f9edf130ea35 | NaN | NaN | [-0.039852984969097005, 0.030179503280556002, ... | [-0.0536430575, 0.037170080630769, -0.01672707... |
6 | 2023-03-14 19:55:25+00:00 | 00b5a352-7300-4fad-adea-907f8d68f393 | 79e801ba-0bec-48fa-b505-6da5bc159434 | 0.756868 | [-0.008574453749999999, 0.06710812775, -0.0338... | [-0.050725246419149, 0.036322272146809005, -0.... | [-0.054892299269231004, 0.047410778044231, -0.... |
7 | 2023-04-05 18:02:15+00:00 | 1b82b9eb-cc54-4cc4-a7e3-9a7417faa8a5 | 17745065-0092-4a56-852c-39e8dde94479 | 0.920511 | [-0.05063521202381, 0.033994672869048004, -0.0... | [-0.054906137815789006, 0.037337559488596, -0.... | [-0.048078875181818005, 0.015896207727273, -0.... |
8 | 2023-04-08 17:43:14+00:00 | 34be2f38-fe5b-4c18-863d-178b7ad6ff4e | 08b15987-d39c-4719-adbb-72e29059c81f | 0.955495 | [-0.059002682755263006, 0.033196180236842, -0.... | [-0.057579961551508005, 0.033872317339907004, ... | [-0.075094953, 0.036157493033333, -0.055885230... |
9 | 2023-04-16 14:53:28+00:00 | 2b068f1d-d99b-4c2f-a737-46f619a76cc8 | 512a824c-641a-4e9e-a259-6fbe700e805f | 0.861831 | [-0.058782766133333005, 0.0196883025, -0.00864... | [-0.051915258536648, 0.02764649977358, -0.0113... | [-0.06907756024999999, 0.01242989875, -0.01187... |
In [ ]:
Copied!