9. Create window aggregate features
Create window aggregate features¶
The next feature type we will consider is the window aggregate feature. These are features generated by aggregating data within a specific time frame.
In [1]:
Copied!
import featurebyte as fb
# Set your profile to the tutorial environment
fb.use_profile("tutorial")
# Activate the tutorial catalog; the returned catalog object is used below
# to fetch the event view and the observation table.
catalog_name = "Grocery Dataset Tutorial"
catalog = fb.Catalog.activate(catalog_name)
import featurebyte as fb
# Set your profile to the tutorial environment
fb.use_profile("tutorial")
# Activate the tutorial catalog; the returned catalog object is used below
# to fetch the event view and the observation table.
catalog_name = "Grocery Dataset Tutorial"
catalog = fb.Catalog.activate(catalog_name)
16:42:36 | INFO | Using configuration file at: /Users/viktor/.featurebyte/config.yaml 16:42:36 | INFO | Active profile: tutorial (https://tutorials.featurebyte.com/api/v1) 16:42:36 | INFO | SDK version: 0.6.0.dev121 16:42:36 | INFO | No catalog activated. 16:42:36 | INFO | 11 feature lists, 66 features deployed 16:42:36 | INFO | Using profile: tutorial 16:42:36 | INFO | Using configuration file at: /Users/viktor/.featurebyte/config.yaml 16:42:36 | INFO | Active profile: tutorial (https://tutorials.featurebyte.com/api/v1) 16:42:37 | INFO | SDK version: 0.6.0.dev121 16:42:37 | INFO | No catalog activated. 16:42:38 | INFO | 11 feature lists, 66 features deployed 16:42:38 | INFO | Catalog activated: Grocery Dataset Tutorial
In [2]:
Copied!
# Get a view of the GROCERYINVOICE event table; the aggregations below are defined on this view.
groceryinvoice_view = catalog.get_view("GROCERYINVOICE")
# Get a view of the GROCERYINVOICE event table; the aggregations below are defined on this view.
groceryinvoice_view = catalog.get_view("GROCERYINVOICE")
Do window aggregation from GROCERYINVOICE¶
In [3]:
Copied!
# Group the GROCERYINVOICE view by the customer key column (GroceryCustomerGuid),
# so every aggregation built from this object is computed per customer.
groceryinvoice_view_by_customer = groceryinvoice_view.groupby(
    by_keys=["GroceryCustomerGuid"]
)
# Group the GROCERYINVOICE view by the customer key column (GroceryCustomerGuid),
# so every aggregation built from this object is computed per customer.
groceryinvoice_view_by_customer = groceryinvoice_view.groupby(
    by_keys=["GroceryCustomerGuid"]
)
In [4]:
Copied!
# Latest invoice Amount for the customer.
# windows=[None] requests a single feature with no window length;
# the feature is then picked out of the returned group by name.
customer_latest_invoice_amount = groceryinvoice_view_by_customer.aggregate_over(
    value_column="Amount",
    method="latest",
    windows=[None],
    feature_names=["CUSTOMER_Latest_invoice_Amount"],
)["CUSTOMER_Latest_invoice_Amount"]
# Latest invoice Amount for the customer.
# windows=[None] requests a single feature with no window length;
# the feature is then picked out of the returned group by name.
customer_latest_invoice_amount = groceryinvoice_view_by_customer.aggregate_over(
    value_column="Amount",
    method="latest",
    windows=[None],
    feature_names=["CUSTOMER_Latest_invoice_Amount"],
)["CUSTOMER_Latest_invoice_Amount"]
In [5]:
Copied!
# Window lengths shared by every windowed aggregation below;
# each entry also becomes the suffix of the matching feature name.
windows = ["14d", "28d"]
# Window lengths shared by every windowed aggregation below;
# each entry also becomes the suffix of the matching feature name.
windows = ["14d", "28d"]
In [6]:
Copied!
# Count of invoices for the customer: one feature per entry in `windows`,
# named CUSTOMER_Count_of_invoice_<window>. No value column is passed for a count.
customer_count_of_invoice_14d_28d = groceryinvoice_view_by_customer.aggregate_over(
    method="count",
    windows=windows,
    feature_names=[f"CUSTOMER_Count_of_invoice_{window}" for window in windows],
)
# Count of invoices for the customer: one feature per entry in `windows`,
# named CUSTOMER_Count_of_invoice_<window>. No value column is passed for a count.
customer_count_of_invoice_14d_28d = groceryinvoice_view_by_customer.aggregate_over(
    method="count",
    windows=windows,
    feature_names=[f"CUSTOMER_Count_of_invoice_{window}" for window in windows],
)
In [7]:
Copied!
# Average invoice Amount for the customer: one feature per entry in `windows`,
# named CUSTOMER_Avg_of_invoice_Amount_<window>.
customer_avg_of_invoice_amount_14d_28d = groceryinvoice_view_by_customer.aggregate_over(
    value_column="Amount",
    method="avg",
    windows=windows,
    feature_names=[f"CUSTOMER_Avg_of_invoice_Amount_{window}" for window in windows],
)
# Average invoice Amount for the customer: one feature per entry in `windows`,
# named CUSTOMER_Avg_of_invoice_Amount_<window>.
customer_avg_of_invoice_amount_14d_28d = groceryinvoice_view_by_customer.aggregate_over(
    value_column="Amount",
    method="avg",
    windows=windows,
    feature_names=[f"CUSTOMER_Avg_of_invoice_Amount_{window}" for window in windows],
)
In [8]:
Copied!
# Standard deviation of invoice Amount for the customer: one feature per entry
# in `windows`, named CUSTOMER_Std_of_invoice_Amount_<window>.
customer_std_of_invoice_amount_14d_28d = groceryinvoice_view_by_customer.aggregate_over(
    value_column="Amount",
    method="std",
    windows=windows,
    feature_names=[f"CUSTOMER_Std_of_invoice_Amount_{window}" for window in windows],
)
# Standard deviation of invoice Amount for the customer: one feature per entry
# in `windows`, named CUSTOMER_Std_of_invoice_Amount_<window>.
customer_std_of_invoice_amount_14d_28d = groceryinvoice_view_by_customer.aggregate_over(
    value_column="Amount",
    method="std",
    windows=windows,
    feature_names=[f"CUSTOMER_Std_of_invoice_Amount_{window}" for window in windows],
)
Preview a feature group¶
In [9]:
Copied!
# Bundle the latest-amount feature and the three windowed aggregations
# (count, avg, std) into one FeatureGroup so they can be previewed and saved together.
feature_group = fb.FeatureGroup([
customer_latest_invoice_amount,
customer_count_of_invoice_14d_28d,
customer_avg_of_invoice_amount_14d_28d,
customer_std_of_invoice_amount_14d_28d,
])
# Bundle the latest-amount feature and the three windowed aggregations
# (count, avg, std) into one FeatureGroup so they can be previewed and saved together.
feature_group = fb.FeatureGroup([
customer_latest_invoice_amount,
customer_count_of_invoice_14d_28d,
customer_avg_of_invoice_amount_14d_28d,
customer_std_of_invoice_amount_14d_28d,
])
In [10]:
Copied!
# Check the primary entity of the feature group.
# All features here are grouped by GroceryCustomerGuid; the output shows the 'customer' entity.
feature_group.primary_entity
# Check the primary entity of the feature group.
# All features here are grouped by GroceryCustomerGuid; the output shows the 'customer' entity.
feature_group.primary_entity
Out[10]:
[<featurebyte.api.entity.Entity at 0x13b95d600> { 'name': 'customer', 'created_at': '2023-11-27T15:39:09.477000', 'updated_at': '2023-11-27T15:39:19.968000', 'description': None, 'serving_names': [ 'GROCERYCUSTOMERGUID' ], 'catalog_name': 'Grocery Dataset Tutorial' }]
In [11]:
Copied!
# Get the observation table 'Preview Table with 10 Customers' from the catalog
# and download it locally with to_pandas(); it is passed to preview() below.
preview_table = catalog.get_observation_table("Preview Table with 10 Customers").to_pandas()
# Get the observation table 'Preview Table with 10 Customers' from the catalog
# and download it locally with to_pandas(); it is passed to preview() below.
preview_table = catalog.get_observation_table("Preview Table with 10 Customers").to_pandas()
Downloading table |████████████████████████████████████████| 10/10 [100%] in 0.1
In [12]:
Copied!
# Preview the feature group on the rows of preview_table;
# the output has one column per feature next to POINT_IN_TIME and GROCERYCUSTOMERGUID.
feature_group.preview(preview_table)
# Preview the feature group on the rows of preview_table;
# the output has one column per feature next to POINT_IN_TIME and GROCERYCUSTOMERGUID.
feature_group.preview(preview_table)
Out[12]:
POINT_IN_TIME | GROCERYCUSTOMERGUID | CUSTOMER_Latest_invoice_Amount | CUSTOMER_Count_of_invoice_14d | CUSTOMER_Count_of_invoice_28d | CUSTOMER_Avg_of_invoice_Amount_14d | CUSTOMER_Avg_of_invoice_Amount_28d | CUSTOMER_Std_of_invoice_Amount_14d | CUSTOMER_Std_of_invoice_Amount_28d | |
---|---|---|---|---|---|---|---|---|---|
0 | 2022-11-28 11:36:31 | d4559f7d-eb28-42c6-b47d-847de24952c2 | 6.72 | 0 | 1 | NaN | 6.720000 | NaN | 0.000000 |
1 | 2022-10-09 15:47:55 | 3f8c7c4c-f2c2-408e-a08e-622de3d3a0b9 | 12.28 | 0 | 0 | NaN | NaN | NaN | NaN |
2 | 2022-09-14 15:42:42 | 35390325-8443-43c1-a934-18db923d9a47 | 10.02 | 0 | 4 | NaN | 26.415000 | NaN | 19.563101 |
3 | 2022-12-26 18:39:46 | 4eb4ee84-ee13-4eec-9c26-61b6eb4ba35b | 53.09 | 5 | 10 | 15.626000 | 26.450000 | 19.000809 | 26.837185 |
4 | 2022-12-06 08:47:43 | e42fa5f3-7737-4c6a-9ef4-856f113e60bd | 21.74 | 3 | 6 | 15.560000 | 10.555000 | 6.681337 | 7.307384 |
5 | 2022-11-09 12:14:40 | 8440debb-6abc-4adc-8c6c-749928141fd0 | 15.30 | 1 | 1 | 15.300000 | 15.300000 | 0.000000 | 0.000000 |
6 | 2022-10-12 17:32:15 | 8a54e527-e9a4-47a9-a28f-8b3c6ecc02db | 14.55 | 2 | 4 | 14.560000 | 15.050000 | 0.010000 | 10.109369 |
7 | 2023-01-01 11:51:28 | cea213d4-36e4-48c3-ae8d-c7a25911e11c | 0.89 | 12 | 29 | 3.209167 | 5.220345 | 4.154236 | 8.163223 |
8 | 2023-02-05 15:48:23 | 3b4f2821-b761-40e9-a32a-5f09685cc597 | 11.43 | 4 | 4 | 11.317500 | 11.317500 | 4.526297 | 4.526297 |
9 | 2023-03-10 16:15:46 | 91a64566-e212-4e36-8f23-c1f1f324a301 | 2.00 | 6 | 8 | 4.568333 | 10.431250 | 2.832763 | 11.335775 |
Save features into catalog¶
With a feature group, all of its features can be saved into the catalog in one call.
In [13]:
Copied!
# Save every feature of the group into the catalog with a single call.
feature_group.save()
# Save every feature of the group into the catalog with a single call.
feature_group.save()
Done! |████████████████████████████████████████| 100% in 13.0s (0.08%/s) Loading Feature(s) |████████████████████████████████████████| 7/7 [100%] in 0.6s
Add description and see feature definition files¶
In [14]:
Copied!
# Add a description to the latest-amount feature
customer_latest_invoice_amount.update_description("Latest invoice Amount for the customer")
# Display the SDK-generated feature definition file for this feature
customer_latest_invoice_amount.definition
# Add a description to the latest-amount feature
customer_latest_invoice_amount.update_description("Latest invoice Amount for the customer")
# Display the SDK-generated feature definition file for this feature
customer_latest_invoice_amount.definition
Out[14]:
# Generated by SDK version: 0.6.0.dev121
from bson import ObjectId
from featurebyte import ColumnCleaningOperation
from featurebyte import DisguisedValueImputation
from featurebyte import EventTable
from featurebyte import FeatureJobSetting
from featurebyte import ValueBeyondEndpointImputation
# event_table name: "GROCERYINVOICE"
event_table = EventTable.get_by_id(ObjectId("6564b7ebbeba6c193e0fe3bc"))
event_view = event_table.get_view(
view_mode="manual",
drop_column_names=["record_available_at"],
column_cleaning_operations=[
ColumnCleaningOperation(
column_name="Amount",
cleaning_operations=[
DisguisedValueImputation(
imputed_value=None, disguised_values=[-99.0, -98.0]
),
ValueBeyondEndpointImputation(
type="less_than", end_point=0.0, imputed_value=0.0
),
ValueBeyondEndpointImputation(
type="greater_than", end_point=2000.0, imputed_value=2000.0
),
],
)
],
)
grouped = event_view.groupby(
by_keys=["GroceryCustomerGuid"], category=None
).aggregate_over(
value_column="Amount",
method="latest",
windows=[None],
feature_names=["CUSTOMER_Latest_invoice_Amount"],
feature_job_setting=FeatureJobSetting(
blind_spot="120s", frequency="3600s", time_modulo_frequency="120s"
),
skip_fill_na=True,
)
feat = grouped["CUSTOMER_Latest_invoice_Amount"]
output = feat
output.save(_id=ObjectId("6564b8f0a69f1a9a43bdfd7d"))
In [15]:
Copied!
# Add descriptions to the two windowed count features.
# Both were built with method="count", so each description names a count of
# invoices together with the window of the feature it is attached to.
customer_count_of_invoice_14d = customer_count_of_invoice_14d_28d["CUSTOMER_Count_of_invoice_14d"]
customer_count_of_invoice_14d.update_description(
    "Count of invoice for the customer over a 14d period."
)
customer_count_of_invoice_28d = customer_count_of_invoice_14d_28d["CUSTOMER_Count_of_invoice_28d"]
customer_count_of_invoice_28d.update_description(
    "Count of invoice for the customer over a 28d period."
)
# Display the SDK-generated feature definition file for the 28d feature
customer_count_of_invoice_28d.definition
# Add descriptions to the two windowed count features.
# Both were built with method="count", so each description names a count of
# invoices together with the window of the feature it is attached to.
customer_count_of_invoice_14d = customer_count_of_invoice_14d_28d["CUSTOMER_Count_of_invoice_14d"]
customer_count_of_invoice_14d.update_description(
    "Count of invoice for the customer over a 14d period."
)
customer_count_of_invoice_28d = customer_count_of_invoice_14d_28d["CUSTOMER_Count_of_invoice_28d"]
customer_count_of_invoice_28d.update_description(
    "Count of invoice for the customer over a 28d period."
)
# Display the SDK-generated feature definition file for the 28d feature
customer_count_of_invoice_28d.definition
Out[15]:
# Generated by SDK version: 0.6.0.dev121
from bson import ObjectId
from featurebyte import EventTable
from featurebyte import FeatureJobSetting
# event_table name: "GROCERYINVOICE"
event_table = EventTable.get_by_id(ObjectId("6564b7ebbeba6c193e0fe3bc"))
event_view = event_table.get_view(
view_mode="manual",
drop_column_names=["record_available_at"],
column_cleaning_operations=[],
)
grouped = event_view.groupby(
by_keys=["GroceryCustomerGuid"], category=None
).aggregate_over(
value_column=None,
method="count",
windows=["28d"],
feature_names=["CUSTOMER_Count_of_invoice_28d"],
feature_job_setting=FeatureJobSetting(
blind_spot="120s", frequency="3600s", time_modulo_frequency="120s"
),
skip_fill_na=True,
)
feat = grouped["CUSTOMER_Count_of_invoice_28d"]
feat_1 = feat.copy()
feat_1[feat.isnull()] = 0
feat_1.name = "CUSTOMER_Count_of_invoice_28d"
output = feat_1
output.save(_id=ObjectId("6564b8f0a69f1a9a43bdfd80"))
In [16]:
Copied!
# Add descriptions: pick each windowed Avg feature out of the group by name
# and attach a description stating its aggregation and window.
customer_avg_of_invoice_amount_14d = customer_avg_of_invoice_amount_14d_28d["CUSTOMER_Avg_of_invoice_Amount_14d"]
customer_avg_of_invoice_amount_14d.update_description(
"Avg of invoice Amount for the customer over a 14d period."
)
customer_avg_of_invoice_amount_28d = customer_avg_of_invoice_amount_14d_28d["CUSTOMER_Avg_of_invoice_Amount_28d"]
customer_avg_of_invoice_amount_28d.update_description(
"Avg of invoice Amount for the customer over a 28d period."
)
# Display the SDK-generated feature definition file for the 28d feature
customer_avg_of_invoice_amount_28d.definition
# Add descriptions: pick each windowed Avg feature out of the group by name
# and attach a description stating its aggregation and window.
customer_avg_of_invoice_amount_14d = customer_avg_of_invoice_amount_14d_28d["CUSTOMER_Avg_of_invoice_Amount_14d"]
customer_avg_of_invoice_amount_14d.update_description(
"Avg of invoice Amount for the customer over a 14d period."
)
customer_avg_of_invoice_amount_28d = customer_avg_of_invoice_amount_14d_28d["CUSTOMER_Avg_of_invoice_Amount_28d"]
customer_avg_of_invoice_amount_28d.update_description(
"Avg of invoice Amount for the customer over a 28d period."
)
# Display the SDK-generated feature definition file for the 28d feature
customer_avg_of_invoice_amount_28d.definition
Out[16]:
# Generated by SDK version: 0.6.0.dev121
from bson import ObjectId
from featurebyte import ColumnCleaningOperation
from featurebyte import DisguisedValueImputation
from featurebyte import EventTable
from featurebyte import FeatureJobSetting
from featurebyte import ValueBeyondEndpointImputation
# event_table name: "GROCERYINVOICE"
event_table = EventTable.get_by_id(ObjectId("6564b7ebbeba6c193e0fe3bc"))
event_view = event_table.get_view(
view_mode="manual",
drop_column_names=["record_available_at"],
column_cleaning_operations=[
ColumnCleaningOperation(
column_name="Amount",
cleaning_operations=[
DisguisedValueImputation(
imputed_value=None, disguised_values=[-99.0, -98.0]
),
ValueBeyondEndpointImputation(
type="less_than", end_point=0.0, imputed_value=0.0
),
ValueBeyondEndpointImputation(
type="greater_than", end_point=2000.0, imputed_value=2000.0
),
],
)
],
)
grouped = event_view.groupby(
by_keys=["GroceryCustomerGuid"], category=None
).aggregate_over(
value_column="Amount",
method="avg",
windows=["28d"],
feature_names=["CUSTOMER_Avg_of_invoice_Amount_28d"],
feature_job_setting=FeatureJobSetting(
blind_spot="120s", frequency="3600s", time_modulo_frequency="120s"
),
skip_fill_na=True,
)
feat = grouped["CUSTOMER_Avg_of_invoice_Amount_28d"]
output = feat
output.save(_id=ObjectId("6564b8f0a69f1a9a43bdfd83"))
In [17]:
Copied!
# Add description
# Pick each windowed Std feature out of the group by name
# and attach a description stating its aggregation and window.
customer_std_of_invoice_amount_14d = customer_std_of_invoice_amount_14d_28d["CUSTOMER_Std_of_invoice_Amount_14d"]
customer_std_of_invoice_amount_14d.update_description(
"Std of invoice Amount for the customer over a 14d period."
)
customer_std_of_invoice_amount_28d = customer_std_of_invoice_amount_14d_28d["CUSTOMER_Std_of_invoice_Amount_28d"]
customer_std_of_invoice_amount_28d.update_description(
"Std of invoice Amount for the customer over a 28d period."
)
# Display the SDK-generated feature definition file for the 28d feature
customer_std_of_invoice_amount_28d.definition
# Add description
# Pick each windowed Std feature out of the group by name
# and attach a description stating its aggregation and window.
customer_std_of_invoice_amount_14d = customer_std_of_invoice_amount_14d_28d["CUSTOMER_Std_of_invoice_Amount_14d"]
customer_std_of_invoice_amount_14d.update_description(
"Std of invoice Amount for the customer over a 14d period."
)
customer_std_of_invoice_amount_28d = customer_std_of_invoice_amount_14d_28d["CUSTOMER_Std_of_invoice_Amount_28d"]
customer_std_of_invoice_amount_28d.update_description(
"Std of invoice Amount for the customer over a 28d period."
)
# Display the SDK-generated feature definition file for the 28d feature
customer_std_of_invoice_amount_28d.definition
Out[17]:
# Generated by SDK version: 0.6.0.dev121
from bson import ObjectId
from featurebyte import ColumnCleaningOperation
from featurebyte import DisguisedValueImputation
from featurebyte import EventTable
from featurebyte import FeatureJobSetting
from featurebyte import ValueBeyondEndpointImputation
# event_table name: "GROCERYINVOICE"
event_table = EventTable.get_by_id(ObjectId("6564b7ebbeba6c193e0fe3bc"))
event_view = event_table.get_view(
view_mode="manual",
drop_column_names=["record_available_at"],
column_cleaning_operations=[
ColumnCleaningOperation(
column_name="Amount",
cleaning_operations=[
DisguisedValueImputation(
imputed_value=None, disguised_values=[-99.0, -98.0]
),
ValueBeyondEndpointImputation(
type="less_than", end_point=0.0, imputed_value=0.0
),
ValueBeyondEndpointImputation(
type="greater_than", end_point=2000.0, imputed_value=2000.0
),
],
)
],
)
grouped = event_view.groupby(
by_keys=["GroceryCustomerGuid"], category=None
).aggregate_over(
value_column="Amount",
method="std",
windows=["28d"],
feature_names=["CUSTOMER_Std_of_invoice_Amount_28d"],
feature_job_setting=FeatureJobSetting(
blind_spot="120s", frequency="3600s", time_modulo_frequency="120s"
),
skip_fill_na=True,
)
feat = grouped["CUSTOMER_Std_of_invoice_Amount_28d"]
output = feat
output.save(_id=ObjectId("6564b8f0a69f1a9a43bdfd85"))
In [ ]:
Copied!