FeatureGroup#
To create a Feature group, use FeatureStore.create_feature_group or FeatureStore.get_or_create_feature_group. You can retrieve an existing feature group with FeatureStore.get_feature_group.
FeatureGroup #
Bases: FeatureGroupBase
data_source property writable #
data_source: ds.DataSource | None
The data source which was used to create the feature group, if any.
expectation_suite property writable #
expectation_suite: (
hsfs.expectation_suite.ExpectationSuite | None
)
Expectation Suite configuration object defining the settings for data validation of the feature group.
feature_store property writable #
feature_store: feature_store_mod.FeatureStore
Feature store to which the feature group belongs.
feature_store_id property #
feature_store_id: int | None
ID of the feature store to which the feature group belongs.
feature_store_name property #
feature_store_name: str | None
Name of the feature store in which the feature group is located.
hudi_precombine_key property writable #
hudi_precombine_key: str | None
Feature name that is the hudi precombine key.
materialization_job property #
materialization_job: Job | None
Get the Job object reference for the materialization job for this Feature Group.
notification_topic_name property writable #
notification_topic_name: str | None
The topic used for feature group notifications.
offline_backfill_every_hr property writable #
On Feature Group creation, used to set scheduled run of the materialisation job.
online_enabled property writable #
online_enabled: bool
Setting if the feature group is available in online storage.
parents property writable #
Parent feature groups as origin of the data in the current feature group.
This is part of explicit provenance.
partition_key property writable #
List of features building the partition key.
statistics property #
statistics: Statistics
Get the latest computed statistics for the whole feature group.
storage_connector property #
storage_connector: sc.StorageConnector
The storage connector which was used to create the feature group, if any.
time_travel_format property writable #
time_travel_format: str | None
Setting of the feature group time travel format.
topic_name property writable #
topic_name: str | None
The topic used for feature group data ingestion.
transformation_functions property writable #
transformation_functions: list[TransformationFunction]
Get transformation functions.
ttl property writable #
ttl: int | None
Get the time-to-live duration in seconds for features in this group.
The TTL determines how long features should be retained before being automatically removed. The value is always returned in seconds, regardless of how it was originally specified.
| RETURNS | DESCRIPTION |
|---|---|
int | None | The TTL value in seconds, or |
ttl_enabled property writable #
ttl_enabled: bool
Get whether TTL (time-to-live) is enabled for this feature group.
| RETURNS | DESCRIPTION |
|---|---|
bool |
|
add_tag #
Attach a tag to a feature group.
A tag consists of a name-value pair. Tag names are unique identifiers across the whole cluster. The value of a tag can be any valid json -- primitives, arrays or json objects.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.add_tag(name="example_tag", value="42")
| PARAMETER | DESCRIPTION |
|---|---|
name | Name of the tag to be added. TYPE: |
value | Value of the tag to be added. TYPE: |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
append_features #
append_features(
features: feature.Feature | list[feature.Feature],
) -> (
FeatureGroupBase
| FeatureGroup
| ExternalFeatureGroup
| SpineGroup
)
Append features to the schema of the feature group.
Example
# connect to the Feature Store
fs = ...
# define features to be inserted in the feature group
features = [
Feature(name="id",type="int",online_type="int"),
Feature(name="name",type="string",online_type="varchar(20)")
]
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.append_features(features)
Safe append
This method appends the features to the feature group description safely. In case of failure your local metadata object will contain the correct schema.
It is only possible to append features to a feature group. Removing features is considered a breaking change. Note that feature views built on top of this feature group will not read appended feature data. Create a new feature view based on an updated query via fg.select to include the new features.
| PARAMETER | DESCRIPTION |
|---|---|
features | A feature object or list thereof to append to the schema of the feature group. |
| RETURNS | DESCRIPTION |
|---|---|
FeatureGroupBase | FeatureGroup | ExternalFeatureGroup | SpineGroup | The updated feature group object. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
as_of #
as_of(
wallclock_time: str
| int
| datetime
| date
| None = None,
exclude_until: str
| int
| datetime
| date
| None = None,
) -> query.Query
Get Query object to retrieve all features of the group at a point in the past.
Pyspark/Spark Only
Apache HUDI exclusively supports Time Travel and Incremental Query via Spark Context
This method selects all features in the feature group and returns a Query object at the specified point in time. Optionally, commits before a specified point in time can be excluded from the query. The Query can then either be read into a Dataframe or used further to perform joins or construct a training dataset.
Reading features at a specific point in time
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
# get data at a specific point in time and show it
fg.as_of("2020-10-20 07:34:11").read().show()
Reading commits incrementally between specified points in time
fg.as_of("2020-10-20 07:34:11", exclude_until="2020-10-19 07:34:11").read().show()
The first parameter is inclusive while the latter is exclusive. That means, in order to query a single commit, you need to query that commit time and exclude everything just before the commit.
Reading only the changes from a single commit
fg.as_of("2020-10-20 07:31:38", exclude_until="2020-10-20 07:31:37").read().show()
When no wallclock_time is given, the latest state of features is returned. Optionally, commits before a specified point in time can still be excluded.
Reading the latest state of features, excluding commits before a specified point in time
fg.as_of(None, exclude_until="2020-10-20 07:31:38").read().show()
Note that the interval will be applied to all joins in the query. If you want to query different intervals for different feature groups in the query, you have to apply them in a nested fashion:
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg1 = fs.get_or_create_feature_group(...)
fg2 = fs.get_or_create_feature_group(...)
fg1.select_all().as_of("2020-10-20", exclude_until="2020-10-19")
.join(fg2.select_all().as_of("2020-10-20", exclude_until="2020-10-19"))
If instead you apply another as_of selection after the join, all joined feature groups will be queried with this interval:
Example
fg1.select_all().as_of("2020-10-20", exclude_until="2020-10-19") # as_of is not applied
.join(fg2.select_all().as_of("2020-10-20", exclude_until="2020-10-15")) # as_of is not applied
.as_of("2020-10-20", exclude_until="2020-10-19")
Warning
This function only works for feature groups with time_travel_format='HUDI'.
Warning
Excluding commits via exclude_until is only possible within the range of the Hudi active timeline. By default, Hudi keeps the last 20 to 30 commits in the active timeline. If you need to keep a longer active timeline, you can overwrite the options: hoodie.keep.min.commits and hoodie.keep.max.commits when calling the insert() method.
| PARAMETER | DESCRIPTION |
|---|---|
wallclock_time | Read data as of this point in time. Strings should be formatted in one of the following formats |
exclude_until | Exclude commits until this point in time. Strings should be formatted in one of the following formats |
| RETURNS | DESCRIPTION |
|---|---|
query.Query | The query object with the applied time travel condition. |
commit_delete_record #
commit_delete_record(
delete_df: TypeVar("pyspark.sql.DataFrame"),
write_options: dict[Any, Any] | None = None,
) -> None
Drops records present in the provided DataFrame and commits it as update to this Feature group.
This method can only be used on feature groups stored as HUDI or DELTA.
| PARAMETER | DESCRIPTION |
|---|---|
delete_df | dataFrame containing records to be deleted. TYPE: |
write_options | User provided write options. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
commit_details #
commit_details(
wallclock_time: str
| int
| datetime
| date
| None = None,
limit: int | None = None,
) -> dict[str, dict[str, str]]
Retrieves commit timeline for this feature group.
This method can only be used on time travel enabled feature groups
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
commit_details = fg.commit_details()
| PARAMETER | DESCRIPTION |
|---|---|
wallclock_time | Commit details as of specific point in time. Strings should be formatted in one of the following formats |
limit | Number of commits to retrieve. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
dict[str, dict[str, str]] | Dictionary object of commit metadata timeline, where Key is commit id and value is |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
hopsworks.client.exceptions.FeatureStoreException | If the feature group does not have |
compute_statistics #
Recompute the statistics for the feature group and save them to the feature store.
Statistics are only computed for data in the offline storage of the feature group.
| PARAMETER | DESCRIPTION |
|---|---|
wallclock_time | If specified will recompute statistics on feature group as of specific point in time. If not specified then will compute statistics as of most recent time of this feature group. Strings should be formatted in one of the following formats |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
create_alert #
create_alert(
receiver: str,
status: Literal[
"feature_validation_success",
"feature_validation_warning",
"feature_validation_failure",
"feature_monitor_shift_undetected",
"feature_monitor_shift_detected",
],
severity: Literal["info", "warning", "critical"],
) -> FeatureGroupAlert
Create an alert for this feature group.
| PARAMETER | DESCRIPTION |
|---|---|
receiver | The receiver of the alert. TYPE: |
status | The status that will trigger the alert. TYPE: |
severity | The severity of the alert. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
FeatureGroupAlert | The created FeatureGroupAlert object. |
| RAISES | DESCRIPTION |
|---|---|
ValueError | If the status is not valid. |
ValueError | If the severity is not valid. |
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
Example
fg.create_alert(
receiver="email",
status="feature_validation_failure",
severity="critical",
)
create_feature_monitoring #
create_feature_monitoring(
name: str,
feature_name: str,
description: str | None = None,
start_date_time: int
| str
| datetime
| date
| pd.Timestamp
| None = None,
end_date_time: int
| str
| datetime
| date
| pd.Timestamp
| None = None,
cron_expression: str | None = "0 0 12 ? * * *",
) -> fmc.FeatureMonitoringConfig
Enable feature monitoring to compare statistics on snapshots of feature data over time.
Experimental
Public API is subject to change, this feature is not suitable for production use-cases.
Example
# fetch feature group
fg = fs.get_feature_group(name="my_feature_group", version=1)
# enable feature monitoring
my_config = fg.create_feature_monitoring(
name="my_monitoring_config",
feature_name="my_feature",
description="my monitoring config description",
cron_expression="0 0 12 ? * * *",
).with_detection_window(
# Data inserted in the last day
time_offset="1d",
window_length="1d",
).with_reference_window(
# Data inserted last week on the same day
time_offset="1w1d",
window_length="1d",
).compare_on(
metric="mean",
threshold=0.5,
).save()
| PARAMETER | DESCRIPTION |
|---|---|
name | Name of the feature monitoring configuration. The name must be unique for all configurations attached to the feature group. TYPE: |
feature_name | Name of the feature to monitor. TYPE: |
description | Description of the feature monitoring configuration. TYPE: |
start_date_time | Start date and time from which to start computing statistics. TYPE: |
end_date_time | End date and time at which to stop computing statistics. TYPE: |
cron_expression | Cron expression to use to schedule the job. The cron expression must be in UTC and follow the Quartz specification. The default value means "every day at 12pm UTC". TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
fmc.FeatureMonitoringConfig | Configuration with minimal information about the feature monitoring. |
fmc.FeatureMonitoringConfig | Additional information are required before feature monitoring is enabled. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.FeatureStoreException | If feature group is not registered with Hopsworks. |
create_statistics_monitoring #
create_statistics_monitoring(
name: str,
feature_name: str | None = None,
description: str | None = None,
start_date_time: int
| str
| datetime
| date
| pd.Timestamp
| None = None,
end_date_time: int
| str
| datetime
| date
| pd.Timestamp
| None = None,
cron_expression: str | None = "0 0 12 ? * * *",
) -> fmc.FeatureMonitoringConfig
Run a job to compute statistics on snapshot of feature data on a schedule.
Experimental
Public API is subject to change, this feature is not suitable for production use-cases.
Example
# fetch feature group
fg = fs.get_feature_group(name="my_feature_group", version=1)
# enable statistics monitoring
my_config = fg.create_statistics_monitoring(
name="my_config",
start_date_time="2021-01-01 00:00:00",
description="my description",
cron_expression="0 0 12 ? * * *",
).with_detection_window(
# Statistics computed on 10% of the last week of data
time_offset="1w",
row_percentage=0.1,
).save()
| PARAMETER | DESCRIPTION |
|---|---|
name | Name of the feature monitoring configuration. The name must be unique for all configurations attached to the feature group. TYPE: |
feature_name | Name of the feature to monitor. If not specified, statistics will be computed for all features. TYPE: |
description | Description of the feature monitoring configuration. TYPE: |
start_date_time | Start date and time from which to start computing statistics. TYPE: |
end_date_time | End date and time at which to stop computing statistics. TYPE: |
cron_expression | Cron expression to use to schedule the job. The cron expression must be in UTC and follow the Quartz specification. The default value means "every day at 12pm UTC". TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
fmc.FeatureMonitoringConfig | Configuration with minimal information about the feature monitoring. |
fmc.FeatureMonitoringConfig | Additional information are required before feature monitoring is enabled. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.FeatureStoreException | If feature group is not registered with Hopsworks. |
delete #
delete() -> None
Drop the entire feature group along with its feature data.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(
name='bitcoin_price',
version=1
)
# delete the feature group
fg.delete()
Potentially dangerous operation
This operation drops all metadata associated with this version of the feature group and all the feature data in offline and online storage associated with it.
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
delete_expectation_suite #
delete_expectation_suite() -> None
Delete the expectation suite attached to the Feature Group.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.delete_expectation_suite()
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
delete_tag #
delete_tag(name: str) -> None
Delete a tag attached to a feature group.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.delete_tag("example_tag")
| PARAMETER | DESCRIPTION |
|---|---|
name | Name of the tag to be removed. TYPE: |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
delta_vacuum #
delta_vacuum(retention_hours: int = None) -> None
Vacuum files that are no longer referenced by a Delta table and are older than the retention threshold.
This method can only be used on feature groups stored as DELTA.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
commit_details = fg.delta_vacuum(retention_hours = 168)
| PARAMETER | DESCRIPTION |
|---|---|
retention_hours | User provided retention period. The default retention threshold for the files is 7 days. TYPE: |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
disable_ttl #
disable_ttl() -> FeatureGroup
Disable the time-to-live (TTL) configuration of the feature group.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
# Disable TTL
fg.disable_ttl()
Safe update
This method updates the TTL configuration safely. In case of failure your local metadata object will keep the old configuration.
| RETURNS | DESCRIPTION |
|---|---|
FeatureGroup | The updated feature group object. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
enable_ttl #
enable_ttl(
ttl: float | timedelta | None = None,
) -> (
FeatureGroupBase
| FeatureGroup
| ExternalFeatureGroup
| SpineGroup
)
Enable or update the time-to-live (TTL) configuration of the feature group.
If ttl is not set, the feature group will be enabled with the last TTL value being set.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
# Enable TTL with a TTL of 7 days
fg.enable_ttl(timedelta(days=7))
# Disable TTL
fg.disable_ttl()
# Enable TTL again with a TTL of 7 days
fg.enable_ttl()
Safe update
This method updates the TTL configuration safely. In case of failure your local metadata object will keep the old configuration.
| PARAMETER | DESCRIPTION |
|---|---|
ttl | Optional new TTL value. Can be specified as:
|
| RETURNS | DESCRIPTION |
|---|---|
FeatureGroupBase | FeatureGroup | ExternalFeatureGroup | SpineGroup | The updated feature group object. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
filter #
Apply filter to the feature group.
Selects all features and returns the resulting Query with the applied filter.
Example
from hsfs.feature import Feature
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.filter(Feature("weekly_sales") > 1000)
If you are planning to join the filtered feature group later on with another feature group, make sure to select the filtered feature explicitly from the respective feature group:
Example
fg.filter(fg.feature1 == 1).show(10)
Composite filters require parenthesis and symbols for logical operands (e.g. &, |, ...):
Example
fg.filter((fg.feature1 == 1) | (fg.feature2 >= 2))
| PARAMETER | DESCRIPTION |
|---|---|
f | Filter object. |
| RETURNS | DESCRIPTION |
|---|---|
query.Query | The query object with the applied filter. |
finalize_multi_part_insert #
finalize_multi_part_insert() -> None
Finalizes and exits the multi part insert context opened by multi_part_insert in a blocking fashion once all rows have been transmitted.
Multi part insert with manual context management
Instead of letting Python handle the entering and exiting of the multi part insert context, you can start and finalize the context manually.
feature_group = fs.get_or_create_feature_group("fg_name", version=1)
while loop:
small_batch_df = ...
feature_group.multi_part_insert(small_batch_df)
# IMPORTANT: finalize the multi part insert to make sure all rows
# have been transmitted
feature_group.finalize_multi_part_insert()
Note that the first call to multi_part_insert initiates the context and be sure to finalize it. The finalize_multi_part_insert is a blocking call that returns once all rows have been transmitted.
find_neighbors #
find_neighbors(
embedding: list[int | float],
col: str | None = None,
k: int | None = 10,
filter: Filter | Logic | None = None,
options: dict | None = None,
) -> list[tuple[float, list[Any]]]
Finds the nearest neighbors for a given embedding in the vector database.
If filter is specified, or if embedding feature is stored in default project index, the number of results returned may be less than k. Try using a large value of k and extract the top k items from the results if needed.
| PARAMETER | DESCRIPTION |
|---|---|
embedding | The target embedding for which neighbors are to be found. |
col | The column name used to compute similarity score. Required only if there are multiple embeddings. TYPE: |
k | The number of nearest neighbors to retrieve. TYPE: |
filter | A filter expression to restrict the search space. TYPE: |
options | The options used for the request to the vector database. The keys are attribute values of the TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
list[tuple[float, list[Any]]] | A list of tuples representing the nearest neighbors. |
list[tuple[float, list[Any]]] | Each tuple contains: |
Example
embedding_index = EmbeddingIndex()
embedding_index.add_embedding(name="user_vector", dimension=3)
fg = fs.create_feature_group(
name='air_quality',
embedding_index = embedding_index,
version=1,
primary_key=['id1'],
online_enabled=True,
)
fg.insert(data)
fg.find_neighbors(
[0.1, 0.2, 0.3],
k=5,
)
# apply filter
fg.find_neighbors(
[0.1, 0.2, 0.3],
k=5,
filter=(fg.id1 > 10) & (fg.id1 < 30)
)
get_alert #
get_alert(alert_id: int) -> FeatureGroupAlert
Get an alert for this feature group by ID.
| PARAMETER | DESCRIPTION |
|---|---|
alert_id | The ID of the alert to get. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
FeatureGroupAlert | The FeatureGroupAlert object. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
Example
# Get a specific alert
alert = fg.get_alert(alert_id=1)
get_alerts #
get_alerts() -> list[FeatureGroupAlert]
Get all alerts for this feature group.
| RETURNS | DESCRIPTION |
|---|---|
list[FeatureGroupAlert] | The list of FeatureGroupAlerts. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
Example
# Get all alerts
alerts = fg.get_alerts()
get_all_statistics #
get_all_statistics(
computation_time: str
| float
| datetime
| date
| None = None,
feature_names: list[str] | None = None,
) -> list[Statistics] | None
Returns all the statistics metadata computed before a specific time for the current feature group.
If computation_time is None, all the statistics metadata are returned.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg_statistics = fg.get_statistics(computation_time=None)
| PARAMETER | DESCRIPTION |
|---|---|
computation_time | Date and time when statistics were computed. Strings should be formatted in one of the following formats |
feature_names | List of feature names of which statistics are retrieved. |
| RETURNS | DESCRIPTION |
|---|---|
list[Statistics] | None | Statistics object or |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
hopsworks.client.exceptions.FeatureStoreException | If statistics are not supported for this feature group type. |
get_all_validation_reports #
get_all_validation_reports(
ge_type: bool = HAS_GREAT_EXPECTATIONS,
) -> list[
ValidationReport
| great_expectations.core.ExpectationSuiteValidationResult
]
Return the latest validation report attached to the feature group if it exists.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
val_reports = fg.get_all_validation_reports()
| PARAMETER | DESCRIPTION |
|---|---|
ge_type | If TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
list[ValidationReport | great_expectations.core.ExpectationSuiteValidationResult] | All validation reports attached to the feature group. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
hopsworks.client.exceptions.FeatureStoreException | If feature group is not registered with Hopsworks. |
get_complex_features #
Returns the names of all features with a complex data type in this feature group.
Example
complex_dtype_features = fg.get_complex_features()
get_expectation_suite #
get_expectation_suite(
ge_type: bool = HAS_GREAT_EXPECTATIONS,
) -> (
hsfs.expectation_suite.ExpectationSuite
| great_expectations.core.ExpectationSuite
| None
)
Return the expectation suite attached to the feature group if it exists.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
exp_suite = fg.get_expectation_suite()
| PARAMETER | DESCRIPTION |
|---|---|
ge_type | If TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
hsfs.expectation_suite.ExpectationSuite | great_expectations.core.ExpectationSuite | None | The expectation suite attached to the feature group or |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
get_feature #
Retrieve a Feature object from the schema of the feature group.
There are several ways to access features of a feature group:
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
# get Feature instanse
fg.feature1
fg["feature1"]
fg.get_feature("feature1")
Note
Attribute access to features works only for non-reserved names. For example, features named id or name will not be accessible via fg.name, instead this will return the name of the feature group itself. Fall back on using the get_feature method.
| PARAMETER | DESCRIPTION |
|---|---|
name | The name of the feature to retrieve TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
feature.Feature | None | The feature object or |
get_feature_monitoring_configs #
get_feature_monitoring_configs(
name: str | None = None,
feature_name: str | None = None,
config_id: int | None = None,
) -> (
fmc.FeatureMonitoringConfig
| list[fmc.FeatureMonitoringConfig]
| None
)
Fetch all feature monitoring configs attached to the feature group, or fetch by name or feature name only.
If no arguments are provided the method will return all feature monitoring configs attached to the feature group, meaning all feature monitoring configs that are attach to a feature in the feature group. If you wish to fetch a single config, provide the its name. If you wish to fetch all configs attached to a particular feature, provide the feature name.
Example
# fetch your feature group
fg = fs.get_feature_group(name="my_feature_group", version=1)
# fetch all feature monitoring configs attached to the feature group
fm_configs = fg.get_feature_monitoring_configs()
# fetch a single feature monitoring config by name
fm_config = fg.get_feature_monitoring_configs(name="my_config")
# fetch all feature monitoring configs attached to a particular feature
fm_configs = fg.get_feature_monitoring_configs(feature_name="my_feature")
# fetch a single feature monitoring config with a given id
fm_config = fg.get_feature_monitoring_configs(config_id=1)
| PARAMETER | DESCRIPTION |
|---|---|
name | If provided fetch only the feature monitoring config with the given name. TYPE: |
feature_name | If provided, fetch only configs attached to a particular feature. TYPE: |
config_id | If provided, fetch only the feature monitoring config with the given id. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
fmc.FeatureMonitoringConfig | list[fmc.FeatureMonitoringConfig] | None | A list of feature monitoring configs. |
fmc.FeatureMonitoringConfig | list[fmc.FeatureMonitoringConfig] | None | If |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
hopsworks.client.exceptions.FeatureStoreException | If feature group is not registered with Hopsworks. |
ValueError | If both name and feature_name are provided. |
TypeError | If name or feature_name are not string or None. |
get_feature_monitoring_history #
get_feature_monitoring_history(
config_name: str | None = None,
config_id: int | None = None,
start_time: int | str | datetime | date | None = None,
end_time: int | str | datetime | date | None = None,
with_statistics: bool = True,
) -> list[fmr.FeatureMonitoringResult]
Fetch feature monitoring history for a given feature monitoring config.
Example
# fetch your feature group
fg = fs.get_feature_group(name="my_feature_group", version=1)
# fetch feature monitoring history for a given feature monitoring config
fm_history = fg.get_feature_monitoring_history(
config_name="my_config",
start_time="2020-01-01",
)
# fetch feature monitoring history for a given feature monitoring config id
fm_history = fg.get_feature_monitoring_history(
config_id=1,
start_time=datetime.now() - timedelta(weeks=2),
end_time=datetime.now() - timedelta(weeks=1),
with_statistics=False,
)
| PARAMETER | DESCRIPTION |
|---|---|
config_name | The name of the feature monitoring config to fetch history for. TYPE: |
config_id | The id of the feature monitoring config to fetch history for. TYPE: |
start_time | The start date of the feature monitoring history to fetch. |
end_time | The end date of the feature monitoring history to fetch. |
with_statistics | Whether to include statistics in the feature monitoring history. If TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
list[fmr.FeatureMonitoringResult] | A list of feature monitoring results containing the monitoring metadata as well as the computed statistics for the detection and reference window if requested. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
hopsworks.client.exceptions.FeatureStoreException | If feature group is not registered with Hopsworks. |
ValueError | If both config_name and config_id are provided. |
TypeError | If config_name or config_id are not respectively string, int or None. |
get_fg_name #
get_fg_name() -> str
Returns the full feature group name, that is, its base name combined with its version.
get_generated_feature_groups #
get_generated_feature_groups() -> (
explicit_provenance.Links | None
)
Get the generated feature groups using this feature group, based on explicit provenance.
These feature groups can be accessible or inaccessible. Explicit provenance does not track deleted generated feature group links, so deleted will always be empty. For inaccessible feature groups, only a minimal information is returned.
| RETURNS | DESCRIPTION |
|---|---|
explicit_provenance.Links | None | Object containing the section of provenance graph requested or |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
get_generated_feature_views #
get_generated_feature_views() -> (
explicit_provenance.Links | None
)
Get the generated feature view using this feature group, based on explicit provenance.
These feature views can be accessible or inaccessible. Explicit provenance does not track deleted generated feature view links, so deleted will always be empty. For inaccessible feature views, only a minimal information is returned.
| RETURNS | DESCRIPTION |
|---|---|
explicit_provenance.Links | None | Object containing the section of provenance graph requested or |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
get_latest_online_ingestion #
get_latest_online_ingestion() -> (
online_ingestion.OnlineIngestion
)
Retrieve the latest online ingestion operation for this feature group.
This method fetches metadata about the most recent online ingestion job, including its status and progress, if available.
| RETURNS | DESCRIPTION |
|---|---|
online_ingestion.OnlineIngestion | The latest OnlineIngestion object for this feature group. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
Example
fg = fs.get_feature_group("my_fg", 1)
latest_ingestion = fg.get_latest_online_ingestion()
get_latest_validation_report #
get_latest_validation_report(
ge_type: bool = HAS_GREAT_EXPECTATIONS,
) -> (
ValidationReport
| great_expectations.core.ExpectationSuiteValidationResult
| None
)
Return the latest validation report attached to the Feature Group if it exists.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
latest_val_report = fg.get_latest_validation_report()
| PARAMETER | DESCRIPTION |
|---|---|
ge_type | If TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
ValidationReport | great_expectations.core.ExpectationSuiteValidationResult | None | The latest validation report attached to the Feature Group or |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
get_online_ingestion #
get_online_ingestion(
id,
) -> online_ingestion.OnlineIngestion
Retrieve a specific online ingestion operation by its ID for this feature group.
This method fetches metadata about a particular online ingestion job, including its status and progress, if available.
| PARAMETER | DESCRIPTION |
|---|---|
id | The unique identifier of the online ingestion operation.
|
| RETURNS | DESCRIPTION |
|---|---|
online_ingestion.OnlineIngestion | The OnlineIngestion object with the specified ID. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
Example
fg = fs.get_feature_group("my_fg", 1)
ingestion = fg.get_online_ingestion(123)
get_parent_feature_groups #
get_parent_feature_groups() -> (
explicit_provenance.Links | None
)
Get the parents of this feature group, based on explicit provenance.
Parents are feature groups or external feature groups. These feature groups can be accessible, deleted or inaccessible. For deleted and inaccessible feature groups, only minimal information is returned.
| RETURNS | DESCRIPTION |
|---|---|
explicit_provenance.Links | None | Object containing the section of provenance graph requested or |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
get_statistics #
get_statistics(
computation_time: str
| float
| datetime
| date
| None = None,
feature_names: list[str] | None = None,
) -> Statistics | None
Returns the statistics computed at a specific time for the current feature group.
If computation_time is None, the most recent statistics are returned.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg_statistics = fg.get_statistics(computation_time=None)
| PARAMETER | DESCRIPTION |
|---|---|
computation_time | Date and time when statistics were computed. Strings should be formatted in one of the following formats: |
feature_names | List of feature names of which statistics are retrieved. |
| RETURNS | DESCRIPTION |
|---|---|
Statistics | None |
|
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
hopsworks.client.exceptions.FeatureStoreException | If statistics are not supported for this feature group type. |
get_statistics_by_commit_window #
get_statistics_by_commit_window(
from_commit_time: str
| int
| datetime
| date
| None = None,
to_commit_time: str
| int
| datetime
| date
| None = None,
feature_names: list[str] | None = None,
) -> Statistics | list[Statistics] | None
Returns the statistics computed on a specific commit window for this feature group.
If time travel is not enabled, it raises an exception.
If from_commit_time is None, the commit window starts from the first commit. If to_commit_time is None, the commit window ends at the last commit.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg_statistics = fg.get_statistics_by_commit_window(from_commit_time=None, to_commit_time=None)
| PARAMETER | DESCRIPTION |
|---|---|
to_commit_time | Date and time of the last commit of the window. Defaults to |
from_commit_time | Date and time of the first commit of the window. Defaults to |
feature_names | List of feature names of which statistics are retrieved. |
| RETURNS | DESCRIPTION |
|---|---|
Statistics | list[Statistics] | None | Statistics object or |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
get_storage_connector #
get_storage_connector() -> sc.StorageConnector | None
Get the storage connector using this feature group, based on explicit provenance.
Only the accessible storage connector is returned. For more items use the base method, see get_storage_connector_provenance.
| RETURNS | DESCRIPTION |
|---|---|
sc.StorageConnector | None | Storage connector or |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
get_storage_connector_provenance #
get_storage_connector_provenance() -> (
explicit_provenance.Links | None
)
Get the parents of this feature group, based on explicit provenance.
Parents are storage connectors. These storage connector can be accessible, deleted or inaccessible. For deleted and inaccessible storage connector, only minimal information is returned.
| RETURNS | DESCRIPTION |
|---|---|
explicit_provenance.Links | None | The storage connector used to generate this feature group or |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
get_tag #
get_tag(name: str) -> tag.Tag | None
Get the tags of a feature group.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg_tag_value = fg.get_tag("example_tag")
| PARAMETER | DESCRIPTION |
|---|---|
name | Name of the tag to get. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
tag.Tag | None | Tag value or |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
get_tags #
get_validation_history #
get_validation_history(
expectation_id: int,
start_validation_time: str
| int
| datetime
| date
| None = None,
end_validation_time: str
| int
| datetime
| date
| None = None,
filter_by: list[
Literal[
"INGESTED",
"REJECTED",
"FG_DATA",
"EXPERIMENT",
"UNKNOWN",
]
] = None,
ge_type: bool = HAS_GREAT_EXPECTATIONS,
) -> (
list[ValidationResult]
| list[
great_expectations.core.ExpectationValidationResult
]
)
Fetch validation history of an Expectation specified by its id.
Example
validation_history = fg.get_validation_history(
expectation_id=1,
filter_by=["REJECTED", "UNKNOWN"],
start_validation_time="2022-01-01 00:00:00",
end_validation_time=datetime.datetime.now(),
ge_type=False,
)
| PARAMETER | DESCRIPTION |
|---|---|
expectation_id | ID of the Expectation for which to fetch the validation history. TYPE: |
filter_by | List of ingestion_result category to keep. TYPE: |
start_validation_time | Fetch only validation result posterior to the provided time, inclusive. Supported format include timestamps(int), datetime, date or string formatted to be datutils parsable. See examples above. |
end_validation_time | Fetch only validation result prior to the provided time, inclusive. Supported format include timestamps(int), datetime, date or string formatted to be datutils parsable. See examples above. |
ge_type | If TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
list[ValidationResult] | list[great_expectations.core.ExpectationValidationResult] | A list of validation result connected to the expectation_id |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
insert #
insert(
features: pd.DataFrame
| pl.DataFrame
| TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| np.ndarray
| list[list],
overwrite: bool = False,
operation: Literal["insert", "upsert"] = "upsert",
storage: str | None = None,
write_options: dict[str, Any] | None = None,
validation_options: dict[str, Any] | None = None,
wait: bool = False,
transformation_context: dict[str, Any] = None,
transform: bool = True,
) -> tuple[Job | None, ValidationReport | None]
Persist the metadata and materialize the feature group to the feature store or insert data from a dataframe into the existing feature group.
Incrementally insert data to a feature group or overwrite all data contained in the feature group. By default, the data is inserted into the offline storage as well as the online storage if the feature group is online_enabled=True.
The features dataframe can be a Spark DataFrame or RDD, a Pandas DataFrame, a Polars DataFrame or a two-dimensional Numpy array or a two-dimensional Python nested list. If statistics are enabled, statistics are recomputed for the entire feature group. If feature group's time travel format is HUDI then operation argument can be either insert or upsert.
If feature group doesn't exist the insert method will create the necessary metadata the first time it is invoked and writes the specified features dataframe as feature group to the online/offline feature store.
Changed in 3.3.0
insert and save methods are now async by default in non-spark clients. To achieve the old behaviour, set wait argument to True.
Upsert new feature data with time travel format HUDI
# connect to the Feature Store
fs = ...
fg = fs.get_or_create_feature_group(
name='bitcoin_price',
description='Bitcoin price aggregated for days',
version=1,
primary_key=['unix'],
online_enabled=True,
event_time='unix'
)
fg.insert(df_bitcoin_processed)
Async insert
# connect to the Feature Store
fs = ...
fg1 = fs.get_or_create_feature_group(
name='feature_group_name1',
description='Description of the first FG',
version=1,
primary_key=['unix'],
online_enabled=True,
event_time='unix'
)
# async insertion in order not to wait till finish of the job
fg.insert(df_for_fg1, write_options={"wait_for_job" : False})
fg2 = fs.get_or_create_feature_group(
name='feature_group_name2',
description='Description of the second FG',
version=1,
primary_key=['unix'],
online_enabled=True,
event_time='unix'
)
fg.insert(df_for_fg2)
| PARAMETER | DESCRIPTION |
|---|---|
features | Features to be saved. TYPE: |
overwrite | Drop all data in the feature group before inserting new data. This does not affect metadata. TYPE: |
operation | Apache Hudi operation type TYPE: |
storage | Overwrite default behaviour, write to offline storage only with TYPE: |
write_options | Additional write options as key-value pairs. When using the
|
validation_options | Additional validation options as key-value pairs.
|
wait | Wait for job and online ingestion to finish before returning. Shortcut for write_options TYPE: |
transformation_context | A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. |
transform | When set to TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
Job | The job information if python engine is used. TYPE: |
ValidationReport | The validation report if validation is enabled. TYPE: |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | e.g., fail to create feature group, dataframe schema does not match existing feature group schema, etc. |
hsfs.client.exceptions.DataValidationException | If data validation fails and the expectation suite |
insert_stream #
insert_stream(
features: TypeVar("pyspark.sql.DataFrame"),
query_name: str | None = None,
output_mode: Literal[
"append", "complete", "update"
] = "append",
await_termination: bool = False,
timeout: int | None = None,
checkpoint_dir: str | None = None,
write_options: dict[str, Any] | None = None,
transformation_context: dict[str, Any] = None,
transform: bool = True,
) -> TypeVar("StreamingQuery")
Ingest a Spark Structured Streaming Dataframe to the online feature store.
This method creates a long running Spark Streaming Query, you can control the termination of the query through the arguments.
It is possible to stop the returned query with the .stop() and check its status with .isActive.
To get a list of all active queries, use:
sqm = spark.streams
# get the list of active streaming queries
[q.name for q in sqm.active]
Engine Support
Spark only
Stream ingestion using Pandas/Python as engine is currently not supported. Python/Pandas has no notion of streaming.
Data Validation Support
insert_stream does not perform any data validation using Great Expectations even when a expectation suite is attached.
| PARAMETER | DESCRIPTION |
|---|---|
features | Features in Streaming Dataframe to be saved. TYPE: |
query_name | It is possible to optionally specify a name for the query to make it easier to recognise in the Spark UI. TYPE: |
output_mode | Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
TYPE: |
await_termination | Waits for the termination of this query, either by query.stop() or by an exception. If the query has terminated with an exception, then the exception will be thrown. If timeout is set, it returns whether the query has terminated or not within the timeout seconds. TYPE: |
timeout | Only relevant in combination with TYPE: |
checkpoint_dir | Checkpoint directory location. This will be used to as a reference to from where to resume the streaming job. If TYPE: |
write_options | Additional write options for Spark as key-value pairs. |
transformation_context | A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. |
transform | When set to TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
TypeVar('StreamingQuery') | Spark Structured Streaming Query object. |
multi_part_insert #
multi_part_insert(
features: pd.DataFrame
| pl.DataFrame
| TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| np.ndarray
| list[list]
| None = None,
overwrite: bool = False,
operation: Literal["insert", "upsert"] = "upsert",
storage: str | None = None,
write_options: dict[str, Any] | None = None,
validation_options: dict[str, Any] | None = None,
transformation_context: dict[str, Any] = None,
transform: bool = True,
) -> (
tuple[Job | None, ValidationReport | None]
| feature_group_writer.FeatureGroupWriter
)
Get FeatureGroupWriter for optimized multi part inserts or call this method to start manual multi part optimized inserts.
In use cases where very small batches (1 to 1000) rows per Dataframe need to be written to the feature store repeatedly, it might be inefficient to use the standard feature_group.insert() method as it performs some background actions to update the metadata of the feature group object first.
For these cases, the feature group provides the multi_part_insert API, which is optimized for writing many small Dataframes after another.
There are two ways to use this API:
Python Context Manager
Using the Python with syntax you can acquire a FeatureGroupWriter object that implements the same multi_part_insert API.
feature_group = fs.get_or_create_feature_group("fg_name", version=1)
with feature_group.multi_part_insert() as writer:
# run inserts in a loop:
while loop:
small_batch_df = ...
writer.insert(small_batch_df)
The writer batches the small Dataframes and transmits them to Hopsworks efficiently. When exiting the context, the feature group writer is sure to exit only once all the rows have been transmitted.
Multi part insert with manual context management
Instead of letting Python handle the entering and exiting of the multi part insert context, you can start and finalize the context manually.
feature_group = fs.get_or_create_feature_group("fg_name", version=1)
while loop:
small_batch_df = ...
feature_group.multi_part_insert(small_batch_df)
# IMPORTANT: finalize the multi part insert to make sure all rows
# have been transmitted
feature_group.finalize_multi_part_insert()
Note that the first call to multi_part_insert initiates the context and be sure to finalize it. The finalize_multi_part_insert is a blocking call that returns once all rows have been transmitted.
Once you are done with the multi part insert, it is good practice to start the materialization job in order to write the data to the offline storage:
feature_group.materialization_job.run(await_termination=True)
| PARAMETER | DESCRIPTION |
|---|---|
features | Features to be saved. TYPE: |
overwrite | Drop all data in the feature group before inserting new data. This does not affect metadata. TYPE: |
operation | Apache Hudi operation type TYPE: |
storage | Overwrite default behaviour, write to offline storage only with TYPE: |
write_options | Additional write options as key-value pairs. When using the
|
validation_options | Additional validation options as key-value pairs.
|
transformation_context | A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. These variables must be explicitly defined as parameters in the transformation function to be accessible during execution. |
transform | When set to TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
tuple[Job | None, ValidationReport | None] | feature_group_writer.FeatureGroupWriter | One of: |
tuple[Job | None, ValidationReport | None] | feature_group_writer.FeatureGroupWriter |
|
tuple[Job | None, ValidationReport | None] | feature_group_writer.FeatureGroupWriter |
|
read #
read(
wallclock_time: str
| int
| datetime
| date
| None = None,
online: bool = False,
dataframe_type: Literal[
"default",
"spark",
"pandas",
"polars",
"numpy",
"python",
] = "default",
read_options: dict | None = None,
) -> (
pd.DataFrame
| np.ndarray
| list[list[Any]]
| TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| pl.DataFrame
)
Read the feature group into a dataframe.
Reads the feature group by default from the offline storage as Spark DataFrame on Hopsworks and Databricks, and as Pandas dataframe on AWS Sagemaker and pure Python environments.
Set online to True to read from the online storage, or change dataframe_type to read as a different format.
Reading feature group as of latest state
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.read()
Reading feature group as of specific point in time:
fg = fs.get_or_create_feature_group(...)
fg.read("2020-10-20 07:34:11")
| PARAMETER | DESCRIPTION |
|---|---|
wallclock_time | If specified, retrieves feature group as of specific point in time. If not specified, returns as of most recent time. Strings should be formatted in one of the following formats |
online | If TYPE: |
dataframe_type | The type of the returned dataframe. By default, maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine. TYPE: |
read_options | Additional options as key/value pairs to pass to the execution engine. For spark engine: Dictionary of read options for Spark. For python engine: - key TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
pd.DataFrame | np.ndarray | list[list[Any]] | TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pl.DataFrame | One of the following: |
pd.DataFrame | np.ndarray | list[list[Any]] | TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pl.DataFrame |
|
pd.DataFrame | np.ndarray | list[list[Any]] | TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pl.DataFrame |
|
pd.DataFrame | np.ndarray | list[list[Any]] | TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pl.DataFrame |
|
pd.DataFrame | np.ndarray | list[list[Any]] | TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pl.DataFrame |
|
pd.DataFrame | np.ndarray | list[list[Any]] | TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pl.DataFrame |
|
pd.DataFrame | np.ndarray | list[list[Any]] | TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pl.DataFrame |
|
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | No data is available for feature group with this commit date, if time travel enabled. |
read_changes #
read_changes(
start_wallclock_time: str | int | datetime | date,
end_wallclock_time: str | int | datetime | date,
read_options: dict | None = None,
) -> (
pd.DataFrame
| np.ndarray
| list[list[Any]]
| TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| pl.DataFrame
)
Reads updates of this feature that occurred between specified points in time.
Deprecated
read_changes method is deprecated. Use as_of(end_wallclock_time, exclude_until=start_wallclock_time).read(read_options=read_options) instead.
Pyspark/Spark Only
Apache HUDI exclusively supports Time Travel and Incremental Query via Spark Context.
Warning
This function only works for feature groups with time_travel_format='HUDI'.
| PARAMETER | DESCRIPTION |
|---|---|
start_wallclock_time | Start time of the time travel query. Strings should be formatted in one of the following formats |
end_wallclock_time | End time of the time travel query. Strings should be formatted in one of the following formats |
read_options | Additional options as key/value pairs to pass to the execution engine. For spark engine, it is a dictionary of read options for Spark. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
pd.DataFrame | np.ndarray | list[list[Any]] | TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pl.DataFrame | The spark dataframe containing the incremental changes of feature data. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | No data is available for feature group with this commit date. |
hopsworks.client.exceptions.FeatureStoreException | If the feature group does not have |
save #
save(
features: pd.DataFrame
| pl.DataFrame
| TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| np.ndarray
| list[feature.Feature] = None,
write_options: dict[str, Any] | None = None,
validation_options: dict[str, Any] | None = None,
wait: bool = False,
) -> tuple[
Job | None,
great_expectations.core.ExpectationSuiteValidationResult
| None,
]
Persist the metadata and materialize the feature group to the feature store.
Changed in 3.3.0
insert and save methods are now async by default in non-spark clients. To achieve the old behaviour, set wait argument to True.
Calling save creates the metadata for the feature group in the feature store. If a Pandas DataFrame, Polars DatFrame, RDD or Ndarray is provided, the data is written to the online/offline feature store as specified. By default, this writes the feature group to the offline storage, and if online_enabled for the feature group, also to the online feature store. The features dataframe can be a Spark DataFrame or RDD, a Pandas DataFrame, or a two-dimensional Numpy array or a two-dimensional Python nested list.
| PARAMETER | DESCRIPTION |
|---|---|
features | Features to be saved. This argument is optional if the feature list is provided in the TYPE: |
write_options | Additional write options as key-value pairs. When using the
|
validation_options | Additional validation options as key-value pairs.
|
wait | Wait for job and online ingestion to finish before returning. Shortcut for write_options TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
tuple[Job | None, great_expectations.core.ExpectationSuiteValidationResult | None] | When using the |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
save_expectation_suite #
save_expectation_suite(
expectation_suite: hsfs.expectation_suite.ExpectationSuite
| great_expectations.core.ExpectationSuite,
run_validation: bool = True,
validation_ingestion_policy: Literal[
"always", "strict"
] = "always",
overwrite: bool = False,
) -> (
hsfs.expectation_suite.ExpectationSuite
| great_expectations.core.ExpectationSuite
)
Attach an expectation suite to a feature group and saves it for future use.
If an expectation suite is already attached, it is replaced. Note that the provided expectation suite is modified inplace to include expectationId fields.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.save_expectation_suite(expectation_suite, run_validation=True)
| PARAMETER | DESCRIPTION |
|---|---|
expectation_suite | The expectation suite to attach to the Feature Group. TYPE: |
overwrite | If an Expectation Suite is already attached, overwrite it. The new suite will have its own validation history, but former reports are preserved. TYPE: |
run_validation | Set whether the expectation_suite will run on ingestion. TYPE: |
validation_ingestion_policy | Set the policy for ingestion to the Feature Group.
TYPE: |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
save_validation_report #
save_validation_report(
validation_report: dict[str, Any]
| ValidationReport
| great_expectations.core.expectation_validation_result.ExpectationSuiteValidationResult,
ingestion_result: Literal[
"UNKNOWN",
"INGESTED",
"REJECTED",
"EXPERIMENT",
"FG_DATA",
] = "UNKNOWN",
ge_type: bool = HAS_GREAT_EXPECTATIONS,
) -> (
ValidationReport
| great_expectations.core.ExpectationSuiteValidationResult
)
Save validation report to hopsworks platform along previous reports of the same Feature Group.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(..., expectation_suite=expectation_suite)
validation_report = great_expectations.from_pandas(
my_experimental_features_df,
fg.get_expectation_suite()).validate()
fg.save_validation_report(validation_report, ingestion_result="EXPERIMENT")
| PARAMETER | DESCRIPTION |
|---|---|
validation_report | The validation report to attach to the Feature Group. TYPE: |
ingestion_result | Specify the fate of the associated data, defaults to TYPE: |
ge_type | If TYPE: |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
hopsworks.client.exceptions.FeatureStoreException | If feature group is not registered with Hopsworks. |
select #
Select a subset of features of the feature group and return a query object.
The query can be used to construct joins of feature groups or create a feature view with a subset of features of the feature group.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
from hsfs.feature import Feature
fg = fs.create_feature_group(
"fg",
features=[
Feature("id", type="string"),
Feature("ts", type="bigint"),
Feature("f1", type="date"),
Feature("f2", type="double")
],
primary_key=["id"],
event_time="ts")
# construct query
query = fg.select(["id", "f1"])
query.features
# [Feature('id', ...), Feature('f1', ...)]
| PARAMETER | DESCRIPTION |
|---|---|
features | A list of |
| RETURNS | DESCRIPTION |
|---|---|
query.Query | A query object with the selected features of the feature group. |
select_all #
select_all(
include_primary_key: bool = True,
include_foreign_key: bool = True,
include_partition_key: bool = True,
include_event_time: bool = True,
) -> query.Query
Select all features along with primary key and event time from the feature group and return a query object.
The query can be used to construct joins of feature groups or create a feature view.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instances
fg1 = fs.get_or_create_feature_group(...)
fg2 = fs.get_or_create_feature_group(...)
# construct the query
query = fg1.select_all().join(fg2.select_all())
# show first 5 rows
query.show(5)
# select all features exclude primary key and event time
from hsfs.feature import Feature
fg = fs.create_feature_group(
"fg",
features=[
Feature("id", type="string"),
Feature("ts", type="bigint"),
Feature("f1", type="date"),
Feature("f2", type="double")
],
primary_key=["id"],
event_time="ts")
query = fg.select_all()
query.features
# [Feature('id', ...), Feature('ts', ...), Feature('f1', ...), Feature('f2', ...)]
query = fg.select_all(include_primary_key=False, include_event_time=False)
query.features
# [Feature('f1', ...), Feature('f2', ...)]
| PARAMETER | DESCRIPTION |
|---|---|
include_primary_key | If TYPE: |
include_foreign_key | If TYPE: |
include_partition_key | If TYPE: |
include_event_time | If TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
query.Query | A query object with all features of the feature group. |
select_except #
Select all features including primary key and event time feature of the feature group except provided features and return a query object.
The query can be used to construct joins of feature groups or create a feature view with a subset of features of the feature group.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
from hsfs.feature import Feature
fg = fs.create_feature_group(
"fg",
features=[
Feature("id", type="string"),
Feature("ts", type="bigint"),
Feature("f1", type="date"),
Feature("f2", type="double")
],
primary_key=["id"],
event_time="ts")
# construct query
query = fg.select_except(["ts", "f1"])
query.features
# [Feature('id', ...), Feature('f1', ...)]
| PARAMETER | DESCRIPTION |
|---|---|
features | A list of |
| RETURNS | DESCRIPTION |
|---|---|
query.Query | A query object with the selected features of the feature group. |
select_features #
select_features() -> query.Query
Select all the features in the feature group and return a query object.
Queries define the schema of Feature View objects which can be used to create Training Datasets, read from the Online Feature Store, and more. They can also be composed to create more complex queries using the join method.
Info
This method does not select the primary key and event time of the feature group. Use select_all to include them. Note that primary keys do not need to be included in the query to allow joining on them.
Example
# connect to the Feature Store
fs = hopsworks.login().get_feature_store()
# Some dataframe to create the feature group with
# both an event time and a primary key column
my_df.head()
+------------+------------+------------+------------+
| id | feature_1 | ... | ts |
+------------+------------+------------+------------+
| 8 | 8 | | 15 |
| 3 | 3 | ... | 6 |
| 1 | 1 | | 18 |
+------------+------------+------------+------------+
# Create the Feature Group instances
fg1 = fs.create_feature_group(
name = "fg1",
version=1,
primary_key=["id"],
event_time="ts",
)
# Insert data to the feature group.
fg1.insert(my_df)
# select all features from `fg1` excluding primary key and event time
query = fg1.select_features()
# show first 3 rows
query.show(3)
# Output, no id or ts columns
+------------+------------+------------+
| feature_1 | feature_2 | feature_3 |
+------------+------------+------------+
| 8 | 7 | 15 |
| 3 | 1 | 6 |
| 1 | 2 | 18 |
+------------+------------+------------+
Example
# connect to the Feature Store
fs = hopsworks.login().get_feature_store()
# Get the Feature Group from the previous example
fg1 = fs.get_feature_group("fg1", 1)
# Some dataframe to create another feature group
# with a primary key column
+------------+------------+------------+
| id_2 | feature_6 | feature_7 |
+------------+------------+------------+
| 8 | 11 | |
| 3 | 4 | ... |
| 1 | 9 | |
+------------+------------+------------+
# join the two feature groups on their indexes, `id` and `id_2`
# but does not include them in the query
query = fg1.select_features().join(fg2.select_features(), left_on="id", right_on="id_2")
# show first 5 rows
query.show(3)
# Output
+------------+------------+------------+------------+------------+
| feature_1 | feature_2 | feature_3 | feature_6 | feature_7 |
+------------+------------+------------+------------+------------+
| 8 | 7 | 15 | 11 | 15 |
| 3 | 1 | 6 | 4 | 3 |
| 1 | 2 | 18 | 9 | 20 |
+------------+------------+------------+------------+------------+
| RETURNS | DESCRIPTION |
|---|---|
query.Query | A query object with all features of the feature group. |
show #
Show the first n rows of the feature group.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
# make a query and show top 5 rows
fg.select(['date','weekly_sales','is_holiday']).show(5)
| PARAMETER | DESCRIPTION |
|---|---|
n | Number of rows to show. TYPE: |
online | If TYPE: |
to_dict #
Get structured info about specific Feature Group in python dictionary format.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.to_dict()
update_deprecated #
update_deprecated(
deprecate: bool = True,
) -> (
FeatureGroupBase
| FeatureGroup
| ExternalFeatureGroup
| SpineGroup
)
Deprecate the feature group.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.update_deprecated(deprecate=True)
Safe update
This method updates the feature group safely. In case of failure your local metadata object will be kept unchanged.
| PARAMETER | DESCRIPTION |
|---|---|
deprecate | Whether the feature group should be deprecated. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
FeatureGroupBase | FeatureGroup | ExternalFeatureGroup | SpineGroup | The updated feature group object. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
update_description #
update_description(
description: str,
) -> (
FeatureGroupBase
| FeatureGroup
| ExternalFeatureGroup
| SpineGroup
)
Update the description of the feature group.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.update_description(description="Much better description.")
Safe update
This method updates the feature group description safely. In case of failure your local metadata object will keep the old description.
| PARAMETER | DESCRIPTION |
|---|---|
description | New description string. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
FeatureGroupBase | FeatureGroup | ExternalFeatureGroup | SpineGroup | The updated feature group object. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
update_feature_description #
update_feature_description(
feature_name: str, description: str
) -> (
FeatureGroupBase
| FeatureGroup
| ExternalFeatureGroup
| SpineGroup
)
Update the description of a single feature in this feature group.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.update_feature_description(
feature_name="min_temp",
description="Much better feature description.",
)
Safe update
This method updates the feature description safely. In case of failure your local metadata object will keep the old description.
| PARAMETER | DESCRIPTION |
|---|---|
feature_name | Name of the feature to be updated. TYPE: |
description | New description string. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
FeatureGroupBase | FeatureGroup | ExternalFeatureGroup | SpineGroup | The updated feature group object. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
update_features #
update_features(
features: feature.Feature | list[feature.Feature],
) -> (
FeatureGroupBase
| FeatureGroup
| ExternalFeatureGroup
| SpineGroup
)
Update metadata of features in this feature group.
Currently it's only supported to update the description of a feature.
Unsafe update
Note that if you use an existing Feature object of the schema in the feature group metadata object, this might leave your metadata object in a corrupted state if the update fails.
| PARAMETER | DESCRIPTION |
|---|---|
features | A feature object or list thereof to be updated. |
| RETURNS | DESCRIPTION |
|---|---|
FeatureGroupBase | FeatureGroup | ExternalFeatureGroup | SpineGroup | The updated feature group object. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
update_notification_topic_name #
update_notification_topic_name(
notification_topic_name: str,
) -> (
FeatureGroupBase
| ExternalFeatureGroup
| SpineGroup
| FeatureGroup
)
Update the notification topic name of the feature group.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.update_notification_topic_name(notification_topic_name="notification_topic_name")
Safe update
This method updates the feature group notification topic name safely. In case of failure your local metadata object will keep the old notification topic name.
| PARAMETER | DESCRIPTION |
|---|---|
notification_topic_name | Name of the topic used for sending notifications when entries are inserted or updated on the online feature store. If set to None no notifications are sent. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
FeatureGroupBase | ExternalFeatureGroup | SpineGroup | FeatureGroup | The updated feature group object. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
update_statistics_config #
update_statistics_config() -> (
FeatureGroup
| ExternalFeatureGroup
| SpineGroup
| FeatureGroupBase
)
Update the statistics configuration of the feature group.
Change the statistics_config object and persist the changes by calling this method.
Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.update_statistics_config()
| RETURNS | DESCRIPTION |
|---|---|
FeatureGroup | ExternalFeatureGroup | SpineGroup | FeatureGroupBase | The updated metadata object of the feature group. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
hopsworks.client.exceptions.FeatureStoreException | If statistics are not supported for this feature group type. |
validate #
validate(
dataframe: pd.DataFrame
| TypeVar("pyspark.sql.DataFrame")
| None = None,
expectation_suite: hsfs.expectation_suite.ExpectationSuite
| None = None,
save_report: bool = False,
validation_options: dict[str, Any] | None = None,
ingestion_result: Literal[
"UNKNOWN",
"INGESTED",
"REJECTED",
"EXPERIMENT",
"FG_DATA",
] = "UNKNOWN",
ge_type: bool = True,
) -> (
great_expectations.core.ExpectationSuiteValidationResult
| ValidationReport
| None
)
Run validation based on the attached expectations.
Runs the expectation suite attached to the feature group against the provided dataframe. Raise an error if the great_expectations package is not installed.
Example
# connect to the Feature Store
fs = ...
# get feature group instance
fg = fs.get_or_create_feature_group(...)
ge_report = fg.validate(df, save_report=False)
| PARAMETER | DESCRIPTION |
|---|---|
dataframe | The dataframe to run the data validation expectations against. TYPE: |
expectation_suite | Optionally provide an Expectation Suite to override the one that is possibly attached to the feature group. This is useful for testing new Expectation suites. When an extra suite is provided, the results will never be persisted. TYPE: |
validation_options | Additional validation options as key-value pairs.
|
ingestion_result | Specify the fate of the associated data. Use TYPE: |
save_report | Whether to save the report to the backend. This is only possible if the Expectation suite is initialised and attached to the Feature Group. TYPE: |
ge_type | Whether to return a Great Expectations object or Hopsworks own abstraction. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
great_expectations.core.ExpectationSuiteValidationResult | ValidationReport | None | A Validation Report produced by Great Expectations. |