9. Create window aggregate features
Create window aggregate features¶
The next feature type we will consider is the window aggregate feature. These are features generated by aggregating data within a specific time frame.
In [1]:
Copied!
import featurebyte as fb
# Select the "tutorial" profile, then activate the tutorial catalog.
# Later cells fetch views and observation tables through `catalog`.
fb.use_profile("tutorial")
catalog_name = "Grocery Dataset Tutorial"
catalog = fb.Catalog.activate(catalog_name)
import featurebyte as fb
# Select the "tutorial" profile, then activate the tutorial catalog.
# Later cells fetch views and observation tables through `catalog`.
fb.use_profile("tutorial")
catalog_name = "Grocery Dataset Tutorial"
catalog = fb.Catalog.activate(catalog_name)
22:00:05 | INFO | Using configuration file at: /Users/gxav/.featurebyte/config.yaml 22:00:05 | INFO | Active profile: tutorial (https://tutorials.featurebyte.com/api/v1) 22:00:05 | WARNING | Remote SDK version (0.5.0.dev6) is different from local (0.5.0.dev1). Update local SDK to avoid unexpected behavior. 22:00:05 | INFO | No catalog activated. 22:00:06 | INFO | 6 feature lists, 31 features deployed 22:00:06 | INFO | Using profile: tutorial 22:00:06 | INFO | Using configuration file at: /Users/gxav/.featurebyte/config.yaml 22:00:06 | INFO | Active profile: tutorial (https://tutorials.featurebyte.com/api/v1) 22:00:06 | WARNING | Remote SDK version (0.5.0.dev6) is different from local (0.5.0.dev1). Update local SDK to avoid unexpected behavior. 22:00:06 | INFO | No catalog activated. 22:00:07 | INFO | 6 feature lists, 31 features deployed 22:00:08 | INFO | Catalog activated: Grocery Dataset Tutorial
In [2]:
Copied!
# Get a view of the GROCERYINVOICE event table; the aggregations below are built from this view.
groceryinvoice_view = catalog.get_view("GROCERYINVOICE")
# Get a view of the GROCERYINVOICE event table; the aggregations below are built from this view.
groceryinvoice_view = catalog.get_view("GROCERYINVOICE")
Do window aggregation from GROCERYINVOICE¶
In [3]:
Copied!
# Key the invoice view on the customer column (GroceryCustomerGuid) so that
# every aggregation below is computed per customer.
groceryinvoice_view_by_customer = groceryinvoice_view.groupby(
    by_keys=["GroceryCustomerGuid"]
)
# Key the invoice view on the customer column (GroceryCustomerGuid) so that
# every aggregation below is computed per customer.
groceryinvoice_view_by_customer = groceryinvoice_view.groupby(
    by_keys=["GroceryCustomerGuid"]
)
In [4]:
Copied!
# Latest invoice Amount for the customer.
# windows=[None]: presumably means "no window bound" - confirm against the SDK docs.
customer_latest_invoice_amount = groceryinvoice_view_by_customer.aggregate_over(
    value_column="Amount",
    method="latest",
    windows=[None],
    feature_names=["CUSTOMER_Latest_invoice_Amount"],
)["CUSTOMER_Latest_invoice_Amount"]
# Latest invoice Amount for the customer.
# windows=[None]: presumably means "no window bound" - confirm against the SDK docs.
customer_latest_invoice_amount = groceryinvoice_view_by_customer.aggregate_over(
    value_column="Amount",
    method="latest",
    windows=[None],
    feature_names=["CUSTOMER_Latest_invoice_Amount"],
)["CUSTOMER_Latest_invoice_Amount"]
In [5]:
Copied!
# Set desired windows; the count/avg/std cells below create one feature per entry
# and append the window string to each feature name.
windows = ['14d', '28d']
# Set desired windows; the count/avg/std cells below create one feature per entry
# and append the window string to each feature name.
windows = ['14d', '28d']
In [6]:
Copied!
# Count of invoices for the customer: one feature per entry in `windows`,
# named CUSTOMER_Count_of_invoice_<window>.
customer_count_of_invoice_14d_28d = groceryinvoice_view_by_customer.aggregate_over(
    method="count",
    windows=windows,
    feature_names=[f"CUSTOMER_Count_of_invoice_{window}" for window in windows],
)
# Count of invoices for the customer: one feature per entry in `windows`,
# named CUSTOMER_Count_of_invoice_<window>.
customer_count_of_invoice_14d_28d = groceryinvoice_view_by_customer.aggregate_over(
    method="count",
    windows=windows,
    feature_names=[f"CUSTOMER_Count_of_invoice_{window}" for window in windows],
)
In [7]:
Copied!
# Average invoice Amount for the customer: one feature per entry in `windows`,
# named CUSTOMER_Avg_of_invoice_Amount_<window>.
customer_avg_of_invoice_amount_14d_28d = groceryinvoice_view_by_customer.aggregate_over(
    value_column="Amount",
    method="avg",
    windows=windows,
    feature_names=[f"CUSTOMER_Avg_of_invoice_Amount_{window}" for window in windows],
)
# Average invoice Amount for the customer: one feature per entry in `windows`,
# named CUSTOMER_Avg_of_invoice_Amount_<window>.
customer_avg_of_invoice_amount_14d_28d = groceryinvoice_view_by_customer.aggregate_over(
    value_column="Amount",
    method="avg",
    windows=windows,
    feature_names=[f"CUSTOMER_Avg_of_invoice_Amount_{window}" for window in windows],
)
In [8]:
Copied!
# Standard deviation of invoice Amount for the customer: one feature per entry
# in `windows`, named CUSTOMER_Std_of_invoice_Amount_<window>.
customer_std_of_invoice_amount_14d_28d = groceryinvoice_view_by_customer.aggregate_over(
    value_column="Amount",
    method="std",
    windows=windows,
    feature_names=[f"CUSTOMER_Std_of_invoice_Amount_{window}" for window in windows],
)
# Standard deviation of invoice Amount for the customer: one feature per entry
# in `windows`, named CUSTOMER_Std_of_invoice_Amount_<window>.
customer_std_of_invoice_amount_14d_28d = groceryinvoice_view_by_customer.aggregate_over(
    value_column="Amount",
    method="std",
    windows=windows,
    feature_names=[f"CUSTOMER_Std_of_invoice_Amount_{window}" for window in windows],
)
Preview a feature group¶
In [9]:
Copied!
# Bundle the features created above into one FeatureGroup so they can be
# previewed and saved together in the following cells.
feature_group = fb.FeatureGroup([
customer_latest_invoice_amount,
customer_count_of_invoice_14d_28d,
customer_avg_of_invoice_amount_14d_28d,
customer_std_of_invoice_amount_14d_28d,
])
# Bundle the features created above into one FeatureGroup so they can be
# previewed and saved together in the following cells.
feature_group = fb.FeatureGroup([
customer_latest_invoice_amount,
customer_count_of_invoice_14d_28d,
customer_avg_of_invoice_amount_14d_28d,
customer_std_of_invoice_amount_14d_28d,
])
In [10]:
Copied!
# Check the primary entity of the feature group (displayed as the cell output).
feature_group.primary_entity
# Check the primary entity of the feature group (displayed as the cell output).
feature_group.primary_entity
Out[10]:
[<featurebyte.api.entity.Entity at 0x7f8b88853280> { 'name': 'customer', 'created_at': '2023-09-11T13:56:58.863000', 'updated_at': '2023-09-11T13:57:16.943000', 'description': None, 'serving_names': [ 'GROCERYCUSTOMERGUID' ], 'catalog_name': 'Grocery Dataset Tutorial' }]
In [11]:
Copied!
# Get observation table: 'Preview Table with 10 Customers'
# It is downloaded via to_pandas() and used as the input to feature_group.preview() below.
preview_table = catalog.get_observation_table("Preview Table with 10 Customers").to_pandas()
# Get observation table: 'Preview Table with 10 Customers'
# It is downloaded via to_pandas() and used as the input to feature_group.preview() below.
preview_table = catalog.get_observation_table("Preview Table with 10 Customers").to_pandas()
Downloading table |████████████████████████████████████████| 10/10 [100%] in 0.1
In [12]:
Copied!
# Preview the features of feature_group for the rows of preview_table
feature_group.preview(preview_table)
# Preview the features of feature_group for the rows of preview_table
feature_group.preview(preview_table)
Out[12]:
POINT_IN_TIME | GROCERYCUSTOMERGUID | CUSTOMER_Latest_invoice_Amount | CUSTOMER_Count_of_invoice_14d | CUSTOMER_Count_of_invoice_28d | CUSTOMER_Avg_of_invoice_Amount_14d | CUSTOMER_Avg_of_invoice_Amount_28d | CUSTOMER_Std_of_invoice_Amount_14d | CUSTOMER_Std_of_invoice_Amount_28d | |
---|---|---|---|---|---|---|---|---|---|
0 | 2022-07-21 12:50:21 | dd0c74f0-9ba8-4ca9-bd84-8148095aa38a | 19.35 | 1 | 4 | 19.350000 | 10.692500 | 0.000000 | 5.104177 |
1 | 2022-11-10 15:57:20 | 0401635c-e6ab-4525-bb5d-00aba7f6d0c4 | 62.55 | 2 | 5 | 38.645000 | 28.872000 | 23.905000 | 19.739113 |
2 | 2022-08-14 19:00:14 | 54d86ef6-f9b8-40e2-9162-a60bd1b705db | 24.73 | 3 | 10 | 23.230000 | 16.530000 | 8.492738 | 8.803087 |
3 | 2022-07-12 08:02:04 | 4eb4ee84-ee13-4eec-9c26-61b6eb4ba35b | 46.00 | 4 | 13 | 25.005000 | 19.339231 | 14.659950 | 17.007705 |
4 | 2022-12-13 08:15:49 | 1e866814-e5a6-475d-87e3-b53377cc005b | 5.00 | 4 | 8 | 3.512500 | 6.938750 | 1.064551 | 5.280136 |
5 | 2023-04-26 16:52:34 | 48072b52-39cf-452c-8531-02cc4d0fc32e | 1.54 | 12 | 28 | 3.740000 | 5.490714 | 5.472707 | 7.571106 |
6 | 2023-03-01 11:31:00 | 081f111a-598b-43ae-a28a-3a5dc3d2a091 | 5.08 | 3 | 5 | 3.983333 | 8.358000 | 1.759968 | 9.600432 |
7 | 2023-01-19 16:33:33 | f3415165-754c-40b6-af17-06ef952a3fa1 | 2.28 | 15 | 23 | 10.937333 | 11.207826 | 15.656393 | 15.238932 |
8 | 2023-04-11 19:07:26 | d0ea14bf-038a-4ae5-887e-e2d4d68dd8f6 | 3.80 | 6 | 14 | 6.605000 | 5.896429 | 6.384240 | 4.526644 |
9 | 2023-04-10 08:24:27 | 69d8718e-8c4a-4264-8edf-e0ffc1ef4737 | 40.56 | 8 | 14 | 8.308750 | 11.961429 | 12.339442 | 15.773543 |
Save features into catalog¶
With a feature group, we can save all of its features in one call.
In [13]:
Copied!
# Save the features of the feature group into the catalog
feature_group.save()
# Save the features of the feature group into the catalog
feature_group.save()
Done! |████████████████████████████████████████| 100% in 13.6s (0.07%/s) Loading Feature(s) |████████████████████████████████████████| 7/7 [100%] in 1.0s
Add description and see feature definition files¶
In [14]:
Copied!
# Add a description to the latest-invoice-amount feature
customer_latest_invoice_amount.update_description("Latest invoice Amount for the customer")
# See feature definition file (last expression of the cell, so it is displayed as output)
customer_latest_invoice_amount.definition
# Add a description to the latest-invoice-amount feature
customer_latest_invoice_amount.update_description("Latest invoice Amount for the customer")
# See feature definition file (last expression of the cell, so it is displayed as output)
customer_latest_invoice_amount.definition
Out[14]:
# Generated by SDK version: 0.5.0.dev6
from bson import ObjectId
from featurebyte import ColumnCleaningOperation
from featurebyte import DisguisedValueImputation
from featurebyte import EventTable
from featurebyte import FeatureJobSetting
from featurebyte import ValueBeyondEndpointImputation
# event_table name: "GROCERYINVOICE"
event_table = EventTable.get_by_id(ObjectId("64ff1c910d5bfbfb21bce78a"))
event_view = event_table.get_view(
view_mode="manual",
drop_column_names=["record_available_at"],
column_cleaning_operations=[
ColumnCleaningOperation(
column_name="Amount",
cleaning_operations=[
DisguisedValueImputation(
imputed_value=None, disguised_values=[-99, -98]
),
ValueBeyondEndpointImputation(
type="less_than", end_point=0, imputed_value=0
),
ValueBeyondEndpointImputation(
type="greater_than", end_point=2000, imputed_value=2000
),
],
)
],
)
grouped = event_view.groupby(
by_keys=["GroceryCustomerGuid"], category=None
).aggregate_over(
value_column="Amount",
method="latest",
windows=[None],
feature_names=["CUSTOMER_Latest_invoice_Amount"],
feature_job_setting=FeatureJobSetting(
blind_spot="120s", frequency="3600s", time_modulo_frequency="120s"
),
skip_fill_na=True,
)
feat = grouped["CUSTOMER_Latest_invoice_Amount"]
output = feat
output.save(_id=ObjectId("64ff1d6a98b637caa789748c"))
In [15]:
Copied!
# Add descriptions to the 14d and 28d invoice-count features.
# Both are built with method="count", so both are described as counts
# (the 14d description previously said "Sum of invoice Amount").
customer_count_of_invoice_14d = customer_count_of_invoice_14d_28d["CUSTOMER_Count_of_invoice_14d"]
customer_count_of_invoice_14d.update_description(
    "Count of invoice for the customer over a 14d period."
)
customer_count_of_invoice_28d = customer_count_of_invoice_14d_28d["CUSTOMER_Count_of_invoice_28d"]
customer_count_of_invoice_28d.update_description(
    "Count of invoice for the customer over a 28d period."
)
# See feature definition file (last expression of the cell, so it is displayed as output)
customer_count_of_invoice_28d.definition
# Add descriptions to the 14d and 28d invoice-count features.
# Both are built with method="count", so both are described as counts
# (the 14d description previously said "Sum of invoice Amount").
customer_count_of_invoice_14d = customer_count_of_invoice_14d_28d["CUSTOMER_Count_of_invoice_14d"]
customer_count_of_invoice_14d.update_description(
    "Count of invoice for the customer over a 14d period."
)
customer_count_of_invoice_28d = customer_count_of_invoice_14d_28d["CUSTOMER_Count_of_invoice_28d"]
customer_count_of_invoice_28d.update_description(
    "Count of invoice for the customer over a 28d period."
)
# See feature definition file (last expression of the cell, so it is displayed as output)
customer_count_of_invoice_28d.definition
Out[15]:
# Generated by SDK version: 0.5.0.dev6
from bson import ObjectId
from featurebyte import EventTable
from featurebyte import FeatureJobSetting
# event_table name: "GROCERYINVOICE"
event_table = EventTable.get_by_id(ObjectId("64ff1c910d5bfbfb21bce78a"))
event_view = event_table.get_view(
view_mode="manual",
drop_column_names=["record_available_at"],
column_cleaning_operations=[],
)
grouped = event_view.groupby(
by_keys=["GroceryCustomerGuid"], category=None
).aggregate_over(
value_column=None,
method="count",
windows=["28d"],
feature_names=["CUSTOMER_Count_of_invoice_28d"],
feature_job_setting=FeatureJobSetting(
blind_spot="120s", frequency="3600s", time_modulo_frequency="120s"
),
skip_fill_na=True,
)
feat = grouped["CUSTOMER_Count_of_invoice_28d"]
feat_1 = feat.copy()
feat_1[feat.isnull()] = 0
feat_1.name = "CUSTOMER_Count_of_invoice_28d"
output = feat_1
output.save(_id=ObjectId("64ff1d6a98b637caa789748f"))
In [16]:
Copied!
# Look up the 14d and 28d average-amount features by name
customer_avg_of_invoice_amount_14d = customer_avg_of_invoice_amount_14d_28d[
    "CUSTOMER_Avg_of_invoice_Amount_14d"
]
customer_avg_of_invoice_amount_28d = customer_avg_of_invoice_amount_14d_28d[
    "CUSTOMER_Avg_of_invoice_Amount_28d"
]
# Attach a description to each feature
customer_avg_of_invoice_amount_14d.update_description("Avg of invoice Amount for the customer over a 14d period.")
customer_avg_of_invoice_amount_28d.update_description("Avg of invoice Amount for the customer over a 28d period.")
# Display the definition file of the 28d feature
customer_avg_of_invoice_amount_28d.definition
# Look up the 14d and 28d average-amount features by name
customer_avg_of_invoice_amount_14d = customer_avg_of_invoice_amount_14d_28d[
    "CUSTOMER_Avg_of_invoice_Amount_14d"
]
customer_avg_of_invoice_amount_28d = customer_avg_of_invoice_amount_14d_28d[
    "CUSTOMER_Avg_of_invoice_Amount_28d"
]
# Attach a description to each feature
customer_avg_of_invoice_amount_14d.update_description("Avg of invoice Amount for the customer over a 14d period.")
customer_avg_of_invoice_amount_28d.update_description("Avg of invoice Amount for the customer over a 28d period.")
# Display the definition file of the 28d feature
customer_avg_of_invoice_amount_28d.definition
Out[16]:
# Generated by SDK version: 0.5.0.dev6
from bson import ObjectId
from featurebyte import ColumnCleaningOperation
from featurebyte import DisguisedValueImputation
from featurebyte import EventTable
from featurebyte import FeatureJobSetting
from featurebyte import ValueBeyondEndpointImputation
# event_table name: "GROCERYINVOICE"
event_table = EventTable.get_by_id(ObjectId("64ff1c910d5bfbfb21bce78a"))
event_view = event_table.get_view(
view_mode="manual",
drop_column_names=["record_available_at"],
column_cleaning_operations=[
ColumnCleaningOperation(
column_name="Amount",
cleaning_operations=[
DisguisedValueImputation(
imputed_value=None, disguised_values=[-99, -98]
),
ValueBeyondEndpointImputation(
type="less_than", end_point=0, imputed_value=0
),
ValueBeyondEndpointImputation(
type="greater_than", end_point=2000, imputed_value=2000
),
],
)
],
)
grouped = event_view.groupby(
by_keys=["GroceryCustomerGuid"], category=None
).aggregate_over(
value_column="Amount",
method="avg",
windows=["28d"],
feature_names=["CUSTOMER_Avg_of_invoice_Amount_28d"],
feature_job_setting=FeatureJobSetting(
blind_spot="120s", frequency="3600s", time_modulo_frequency="120s"
),
skip_fill_na=True,
)
feat = grouped["CUSTOMER_Avg_of_invoice_Amount_28d"]
output = feat
output.save(_id=ObjectId("64ff1d6a98b637caa7897492"))
In [17]:
Copied!
# Look up the 14d and 28d standard-deviation features by name
customer_std_of_invoice_amount_14d = customer_std_of_invoice_amount_14d_28d[
    "CUSTOMER_Std_of_invoice_Amount_14d"
]
customer_std_of_invoice_amount_28d = customer_std_of_invoice_amount_14d_28d[
    "CUSTOMER_Std_of_invoice_Amount_28d"
]
# Attach a description to each feature
customer_std_of_invoice_amount_14d.update_description("Std of invoice Amount for the customer over a 14d period.")
customer_std_of_invoice_amount_28d.update_description("Std of invoice Amount for the customer over a 28d period.")
# Display the definition file of the 28d feature
customer_std_of_invoice_amount_28d.definition
# Look up the 14d and 28d standard-deviation features by name
customer_std_of_invoice_amount_14d = customer_std_of_invoice_amount_14d_28d[
    "CUSTOMER_Std_of_invoice_Amount_14d"
]
customer_std_of_invoice_amount_28d = customer_std_of_invoice_amount_14d_28d[
    "CUSTOMER_Std_of_invoice_Amount_28d"
]
# Attach a description to each feature
customer_std_of_invoice_amount_14d.update_description("Std of invoice Amount for the customer over a 14d period.")
customer_std_of_invoice_amount_28d.update_description("Std of invoice Amount for the customer over a 28d period.")
# Display the definition file of the 28d feature
customer_std_of_invoice_amount_28d.definition
Out[17]:
# Generated by SDK version: 0.5.0.dev6
from bson import ObjectId
from featurebyte import ColumnCleaningOperation
from featurebyte import DisguisedValueImputation
from featurebyte import EventTable
from featurebyte import FeatureJobSetting
from featurebyte import ValueBeyondEndpointImputation
# event_table name: "GROCERYINVOICE"
event_table = EventTable.get_by_id(ObjectId("64ff1c910d5bfbfb21bce78a"))
event_view = event_table.get_view(
view_mode="manual",
drop_column_names=["record_available_at"],
column_cleaning_operations=[
ColumnCleaningOperation(
column_name="Amount",
cleaning_operations=[
DisguisedValueImputation(
imputed_value=None, disguised_values=[-99, -98]
),
ValueBeyondEndpointImputation(
type="less_than", end_point=0, imputed_value=0
),
ValueBeyondEndpointImputation(
type="greater_than", end_point=2000, imputed_value=2000
),
],
)
],
)
grouped = event_view.groupby(
by_keys=["GroceryCustomerGuid"], category=None
).aggregate_over(
value_column="Amount",
method="std",
windows=["28d"],
feature_names=["CUSTOMER_Std_of_invoice_Amount_28d"],
feature_job_setting=FeatureJobSetting(
blind_spot="120s", frequency="3600s", time_modulo_frequency="120s"
),
skip_fill_na=True,
)
feat = grouped["CUSTOMER_Std_of_invoice_Amount_28d"]
output = feat
output.save(_id=ObjectId("64ff1d6a98b637caa7897494"))
In [ ]:
Copied!