FeatureGroup#

To create a Feature group, use FeatureStore.create_feature_group or FeatureStore.get_or_create_feature_group. You can retrieve an existing feature group with FeatureStore.get_feature_group.

FeatureGroup #

Bases: FeatureGroupBase

avro_schema property #

avro_schema: str

Avro schema representation of the feature group.

created property #

created: str | None

Timestamp when the feature group was created.

creator property #

creator: user.User | None

User who created the feature group.

data_source property writable #

data_source: ds.DataSource | None

The data source which was used to create the feature group, if any.

deprecated property writable #

deprecated: bool

Whether the feature group is deprecated.

description property writable #

description: str | None

Description of the feature group contents.

event_time property writable #

event_time: str | None

Event time feature in the feature group.

expectation_suite property writable #

expectation_suite: (
    hsfs.expectation_suite.ExpectationSuite | None
)

Expectation Suite configuration object defining the settings for data validation of the feature group.

feature_store property writable #

feature_store: feature_store_mod.FeatureStore

Feature store to which the feature group belongs.

feature_store_id property #

feature_store_id: int | None

ID of the feature store to which the feature group belongs.

feature_store_name property #

feature_store_name: str | None

Name of the feature store in which the feature group is located.

features property writable #

features: list[feature.Feature]

Feature Group schema (alias).

hudi_precombine_key property writable #

hudi_precombine_key: str | None

Name of the feature used as the Hudi precombine key.

id property #

id: int | None

Feature group id.

materialization_job property #

materialization_job: Job | None

Get the Job object reference for the materialization job for this Feature Group.

name property #

name: str | None

Name of the feature group.

notification_topic_name property writable #

notification_topic_name: str | None

The topic used for feature group notifications.

offline_backfill_every_hr property writable #

offline_backfill_every_hr: int | str | None

On feature group creation, used to set up the scheduled run of the materialization job.

online_enabled property writable #

online_enabled: bool

Whether the feature group is available in online storage.

parents property writable #

parents: list[explicit_provenance.Links]

Parent feature groups as origin of the data in the current feature group.

This is part of explicit provenance.

partition_key property writable #

partition_key: list[str]

List of features building the partition key.

schema property #

schema: list[feature.Feature]

Feature Group schema.

statistics property #

statistics: Statistics

Get the latest computed statistics for the whole feature group.

storage_connector property #

storage_connector: sc.StorageConnector

The storage connector which was used to create the feature group, if any.

stream property writable #

stream: bool

Whether to enable real time stream writing capabilities.

subject property #

subject: dict[str, Any]

Subject of the feature group.

time_travel_format property writable #

time_travel_format: str | None

Time travel format of the feature group.

topic_name property writable #

topic_name: str | None

The topic used for feature group data ingestion.

transformation_functions property writable #

transformation_functions: list[TransformationFunction]

Get transformation functions.

ttl property writable #

ttl: int | None

Get the time-to-live duration in seconds for features in this group.

The TTL determines how long features should be retained before being automatically removed. The value is always returned in seconds, regardless of how it was originally specified.

RETURNS DESCRIPTION
int | None

The TTL value in seconds, or None if no TTL is set.

ttl_enabled property writable #

ttl_enabled: bool

Get whether TTL (time-to-live) is enabled for this feature group.

RETURNS DESCRIPTION
bool

True if TTL is enabled, False otherwise.

version property writable #

version: int | None

Version number of the feature group.

add_tag #

add_tag(name: str, value: Any) -> None

Attach a tag to a feature group.

A tag consists of a name-value pair. Tag names are unique identifiers across the whole cluster. The value of a tag can be any valid json -- primitives, arrays or json objects.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg.add_tag(name="example_tag", value="42")
PARAMETER DESCRIPTION
name

Name of the tag to be added.

TYPE: str

value

Value of the tag to be added.

TYPE: Any

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

append_features #

append_features(
    features: feature.Feature | list[feature.Feature],
) -> (
    FeatureGroupBase
    | FeatureGroup
    | ExternalFeatureGroup
    | SpineGroup
)

Append features to the schema of the feature group.

Example
# connect to the Feature Store
fs = ...

# define features to be inserted in the feature group
features = [
    Feature(name="id",type="int",online_type="int"),
    Feature(name="name",type="string",online_type="varchar(20)")
]

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg.append_features(features)
Safe append

This method appends the features to the feature group schema safely. In case of failure, your local metadata object will contain the correct schema.

It is only possible to append features to a feature group. Removing features is considered a breaking change. Note that feature views built on top of this feature group will not read appended feature data. Create a new feature view based on an updated query via fg.select to include the new features.

PARAMETER DESCRIPTION
features

A feature object or list thereof to append to the schema of the feature group.

TYPE: feature.Feature | list[feature.Feature]

RETURNS DESCRIPTION
FeatureGroupBase | FeatureGroup | ExternalFeatureGroup | SpineGroup

The updated feature group object.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

as_of #

as_of(
    wallclock_time: str
    | int
    | datetime
    | date
    | None = None,
    exclude_until: str
    | int
    | datetime
    | date
    | None = None,
) -> query.Query

Get Query object to retrieve all features of the group at a point in the past.

Pyspark/Spark Only

Apache HUDI exclusively supports Time Travel and Incremental Query via Spark Context

This method selects all features in the feature group and returns a Query object at the specified point in time. Optionally, commits before a specified point in time can be excluded from the query. The Query can then either be read into a Dataframe or used further to perform joins or construct a training dataset.

Reading features at a specific point in time
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

# get data at a specific point in time and show it
fg.as_of("2020-10-20 07:34:11").read().show()
Reading commits incrementally between specified points in time
fg.as_of("2020-10-20 07:34:11", exclude_until="2020-10-19 07:34:11").read().show()

The first parameter is inclusive while the latter is exclusive. That means, in order to query a single commit, you need to query that commit time and exclude everything just before the commit.

Reading only the changes from a single commit
fg.as_of("2020-10-20 07:31:38", exclude_until="2020-10-20 07:31:37").read().show()

When no wallclock_time is given, the latest state of features is returned. Optionally, commits before a specified point in time can still be excluded.

Reading the latest state of features, excluding commits before a specified point in time
fg.as_of(None, exclude_until="2020-10-20 07:31:38").read().show()

Note that the interval will be applied to all joins in the query. If you want to query different intervals for different feature groups in the query, you have to apply them in a nested fashion:

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg1 = fs.get_or_create_feature_group(...)
fg2 = fs.get_or_create_feature_group(...)

fg1.select_all().as_of("2020-10-20", exclude_until="2020-10-19")
    .join(fg2.select_all().as_of("2020-10-20", exclude_until="2020-10-19"))

If instead you apply another as_of selection after the join, all joined feature groups will be queried with this interval:

Example
fg1.select_all().as_of("2020-10-20", exclude_until="2020-10-19")  # as_of is not applied
    .join(fg2.select_all().as_of("2020-10-20", exclude_until="2020-10-15"))  # as_of is not applied
    .as_of("2020-10-20", exclude_until="2020-10-19")
Warning

This function only works for feature groups with time_travel_format='HUDI'.

Warning

Excluding commits via exclude_until is only possible within the range of the Hudi active timeline. By default, Hudi keeps the last 20 to 30 commits in the active timeline. If you need to keep a longer active timeline, you can overwrite the options: hoodie.keep.min.commits and hoodie.keep.max.commits when calling the insert() method.

PARAMETER DESCRIPTION
wallclock_time

Read data as of this point in time. Strings should be formatted in one of the following formats %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, or %Y-%m-%d %H:%M:%S.

TYPE: str | int | datetime | date | None DEFAULT: None

exclude_until

Exclude commits until this point in time. Strings should be formatted in one of the following formats %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, or %Y-%m-%d %H:%M:%S.

TYPE: str | int | datetime | date | None DEFAULT: None

RETURNS DESCRIPTION
query.Query

The query object with the applied time travel condition.

check_deprecated #

check_deprecated() -> None

Print a warning if this feature group is deprecated.
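
A minimal usage sketch, assuming the feature group has already been retrieved from the feature store:

Example
fg = fs.get_feature_group("my_feature_group", version=1)

# prints a warning if this feature group is deprecated
fg.check_deprecated()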

commit_delete_record #

commit_delete_record(
    delete_df: TypeVar("pyspark.sql.DataFrame"),
    write_options: dict[Any, Any] | None = None,
) -> None

Drops records present in the provided DataFrame and commits the deletion as an update to this feature group.

This method can only be used on feature groups stored as HUDI or DELTA.
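
A minimal sketch; df_to_delete is a hypothetical Spark DataFrame whose rows identify records already present in the feature group:

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

# delete the records matching df_to_delete and commit the change
fg.commit_delete_record(df_to_delete)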

PARAMETER DESCRIPTION
delete_df

DataFrame containing records to be deleted.

TYPE: TypeVar('pyspark.sql.DataFrame')

write_options

User provided write options.

TYPE: dict[Any, Any] | None DEFAULT: None

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

commit_details #

commit_details(
    wallclock_time: str
    | int
    | datetime
    | date
    | None = None,
    limit: int | None = None,
) -> dict[str, dict[str, str]]

Retrieves commit timeline for this feature group.

This method can only be used on time-travel-enabled feature groups.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

commit_details = fg.commit_details()
PARAMETER DESCRIPTION
wallclock_time

Commit details as of specific point in time. Strings should be formatted in one of the following formats %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f.

TYPE: str | int | datetime | date | None DEFAULT: None

limit

Number of commits to retrieve.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
dict[str, dict[str, str]]

Dictionary of the commit metadata timeline, where the key is the commit id and the value is a Dict[str, str] with the commit date and the number of rows updated, inserted, and deleted.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

hopsworks.client.exceptions.FeatureStoreException

If the feature group does not have HUDI time travel format.

compute_statistics #

compute_statistics(
    wallclock_time: str
    | int
    | datetime
    | date
    | None = None,
) -> None

Recompute the statistics for the feature group and save them to the feature store.

Statistics are only computed for data in the offline storage of the feature group.
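
A minimal sketch, assuming statistics are enabled for the feature group:

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

# recompute statistics for the current state of the feature group
fg.compute_statistics()

# recompute statistics as of a point in the past
fg.compute_statistics(wallclock_time="2023-01-01 00:00:00")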

PARAMETER DESCRIPTION
wallclock_time

If specified, statistics are recomputed for the feature group as of this point in time. If not specified, statistics are computed for the most recent state of the feature group. Strings should be formatted in one of the following formats %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f.

TYPE: str | int | datetime | date | None DEFAULT: None

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

create_alert #

create_alert(
    receiver: str,
    status: Literal[
        "feature_validation_success",
        "feature_validation_warning",
        "feature_validation_failure",
        "feature_monitor_shift_undetected",
        "feature_monitor_shift_detected",
    ],
    severity: Literal["info", "warning", "critical"],
) -> FeatureGroupAlert

Create an alert for this feature group.

PARAMETER DESCRIPTION
receiver

The receiver of the alert.

TYPE: str

status

The status that will trigger the alert.

TYPE: Literal['feature_validation_success', 'feature_validation_warning', 'feature_validation_failure', 'feature_monitor_shift_undetected', 'feature_monitor_shift_detected']

severity

The severity of the alert.

TYPE: Literal['info', 'warning', 'critical']

RETURNS DESCRIPTION
FeatureGroupAlert

The created FeatureGroupAlert object.

RAISES DESCRIPTION
ValueError

If the status is not valid.

ValueError

If the severity is not valid.

hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

Example
fg.create_alert(
    receiver="email",
    status="feature_validation_failure",
    severity="critical",
)

create_feature_monitoring #

create_feature_monitoring(
    name: str,
    feature_name: str,
    description: str | None = None,
    start_date_time: int
    | str
    | datetime
    | date
    | pd.Timestamp
    | None = None,
    end_date_time: int
    | str
    | datetime
    | date
    | pd.Timestamp
    | None = None,
    cron_expression: str | None = "0 0 12 ? * * *",
) -> fmc.FeatureMonitoringConfig

Enable feature monitoring to compare statistics on snapshots of feature data over time.

Experimental

Public API is subject to change, this feature is not suitable for production use-cases.

Example
# fetch feature group
fg = fs.get_feature_group(name="my_feature_group", version=1)

# enable feature monitoring
my_config = fg.create_feature_monitoring(
    name="my_monitoring_config",
    feature_name="my_feature",
    description="my monitoring config description",
    cron_expression="0 0 12 ? * * *",
).with_detection_window(
    # Data inserted in the last day
    time_offset="1d",
    window_length="1d",
).with_reference_window(
    # Data inserted last week on the same day
    time_offset="1w1d",
    window_length="1d",
).compare_on(
    metric="mean",
    threshold=0.5,
).save()
PARAMETER DESCRIPTION
name

Name of the feature monitoring configuration. The name must be unique for all configurations attached to the feature group.

TYPE: str

feature_name

Name of the feature to monitor.

TYPE: str

description

Description of the feature monitoring configuration.

TYPE: str | None DEFAULT: None

start_date_time

Start date and time from which to start computing statistics.

TYPE: int | str | datetime | date | pd.Timestamp | None DEFAULT: None

end_date_time

End date and time at which to stop computing statistics.

TYPE: int | str | datetime | date | pd.Timestamp | None DEFAULT: None

cron_expression

Cron expression to use to schedule the job. The cron expression must be in UTC and follow the Quartz specification. The default value means "every day at 12pm UTC".

TYPE: str | None DEFAULT: '0 0 12 ? * * *'

RETURNS DESCRIPTION
fmc.FeatureMonitoringConfig

Configuration with minimal information about the feature monitoring.

fmc.FeatureMonitoringConfig

Additional information is required before feature monitoring is enabled.

RAISES DESCRIPTION
hopsworks.client.exceptions.FeatureStoreException

If feature group is not registered with Hopsworks.

create_statistics_monitoring #

create_statistics_monitoring(
    name: str,
    feature_name: str | None = None,
    description: str | None = None,
    start_date_time: int
    | str
    | datetime
    | date
    | pd.Timestamp
    | None = None,
    end_date_time: int
    | str
    | datetime
    | date
    | pd.Timestamp
    | None = None,
    cron_expression: str | None = "0 0 12 ? * * *",
) -> fmc.FeatureMonitoringConfig

Run a job to compute statistics on snapshot of feature data on a schedule.

Experimental

Public API is subject to change, this feature is not suitable for production use-cases.

Example
# fetch feature group
fg = fs.get_feature_group(name="my_feature_group", version=1)

# enable statistics monitoring
my_config = fg.create_statistics_monitoring(
    name="my_config",
    start_date_time="2021-01-01 00:00:00",
    description="my description",
    cron_expression="0 0 12 ? * * *",
).with_detection_window(
    # Statistics computed on 10% of the last week of data
    time_offset="1w",
    row_percentage=0.1,
).save()
PARAMETER DESCRIPTION
name

Name of the feature monitoring configuration. The name must be unique for all configurations attached to the feature group.

TYPE: str

feature_name

Name of the feature to monitor. If not specified, statistics will be computed for all features.

TYPE: str | None DEFAULT: None

description

Description of the feature monitoring configuration.

TYPE: str | None DEFAULT: None

start_date_time

Start date and time from which to start computing statistics.

TYPE: int | str | datetime | date | pd.Timestamp | None DEFAULT: None

end_date_time

End date and time at which to stop computing statistics.

TYPE: int | str | datetime | date | pd.Timestamp | None DEFAULT: None

cron_expression

Cron expression to use to schedule the job. The cron expression must be in UTC and follow the Quartz specification. The default value means "every day at 12pm UTC".

TYPE: str | None DEFAULT: '0 0 12 ? * * *'

RETURNS DESCRIPTION
fmc.FeatureMonitoringConfig

Configuration with minimal information about the feature monitoring.

fmc.FeatureMonitoringConfig

Additional information is required before feature monitoring is enabled.

RAISES DESCRIPTION
hopsworks.client.exceptions.FeatureStoreException

If feature group is not registered with Hopsworks.

delete #

delete() -> None

Drop the entire feature group along with its feature data.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(
    name='bitcoin_price',
    version=1,
)

# delete the feature group
fg.delete()
Potentially dangerous operation

This operation drops all metadata associated with this version of the feature group and all the feature data in offline and online storage associated with it.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

delete_expectation_suite #

delete_expectation_suite() -> None

Delete the expectation suite attached to the Feature Group.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg.delete_expectation_suite()
RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

delete_tag #

delete_tag(name: str) -> None

Delete a tag attached to a feature group.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg.delete_tag("example_tag")
PARAMETER DESCRIPTION
name

Name of the tag to be removed.

TYPE: str

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

delta_vacuum #

delta_vacuum(retention_hours: int | None = None) -> None

Vacuum files that are no longer referenced by a Delta table and are older than the retention threshold.

This method can only be used on feature groups stored as DELTA.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg.delta_vacuum(retention_hours=168)
PARAMETER DESCRIPTION
retention_hours

User-provided retention period in hours. The default retention threshold for the files is 7 days (168 hours).

TYPE: int | None DEFAULT: None

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

disable_ttl #

disable_ttl() -> FeatureGroup

Disable the time-to-live (TTL) configuration of the feature group.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

# Disable TTL
fg.disable_ttl()
Safe update

This method updates the TTL configuration safely. In case of failure your local metadata object will keep the old configuration.

RETURNS DESCRIPTION
FeatureGroup

The updated feature group object.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

enable_ttl #

enable_ttl(
    ttl: float | timedelta | None = None,
) -> (
    FeatureGroupBase
    | FeatureGroup
    | ExternalFeatureGroup
    | SpineGroup
)

Enable or update the time-to-live (TTL) configuration of the feature group.

If ttl is not given, TTL is re-enabled with the most recently set TTL value.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

# Enable TTL with a TTL of 7 days
fg.enable_ttl(timedelta(days=7))

# Disable TTL
fg.disable_ttl()

# Enable TTL again with a TTL of 7 days
fg.enable_ttl()
Safe update

This method updates the TTL configuration safely. In case of failure your local metadata object will keep the old configuration.

PARAMETER DESCRIPTION
ttl

Optional new TTL value. Can be specified as:

  • An integer or float representing seconds
  • A timedelta object
  • None to keep current value

TYPE: float | timedelta | None DEFAULT: None

RETURNS DESCRIPTION
FeatureGroupBase | FeatureGroup | ExternalFeatureGroup | SpineGroup

The updated feature group object.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

filter #

filter(f: filter.Filter | filter.Logic) -> query.Query

Apply filter to the feature group.

Selects all features and returns the resulting Query with the applied filter.

Example
from hsfs.feature import Feature

# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg.filter(Feature("weekly_sales") > 1000)

If you are planning to join the filtered feature group later on with another feature group, make sure to select the filtered feature explicitly from the respective feature group:

Example
fg.filter(fg.feature1 == 1).show(10)

Composite filters require parenthesis and symbols for logical operands (e.g. &, |, ...):

Example
fg.filter((fg.feature1 == 1) | (fg.feature2 >= 2))
PARAMETER DESCRIPTION
f

Filter object.

TYPE: filter.Filter | filter.Logic

RETURNS DESCRIPTION
query.Query

The query object with the applied filter.

finalize_multi_part_insert #

finalize_multi_part_insert() -> None

Finalize and exit the multi part insert context opened by multi_part_insert, blocking until all rows have been transmitted.

Multi part insert with manual context management

Instead of letting Python handle the entering and exiting of the multi part insert context, you can start and finalize the context manually.

feature_group = fs.get_or_create_feature_group("fg_name", version=1)

while loop:
    small_batch_df = ...
    feature_group.multi_part_insert(small_batch_df)

# IMPORTANT: finalize the multi part insert to make sure all rows
# have been transmitted
feature_group.finalize_multi_part_insert()

Note that the first call to multi_part_insert initiates the context, so be sure to finalize it. finalize_multi_part_insert is a blocking call that returns once all rows have been transmitted.

find_neighbors #

find_neighbors(
    embedding: list[int | float],
    col: str | None = None,
    k: int | None = 10,
    filter: Filter | Logic | None = None,
    options: dict | None = None,
) -> list[tuple[float, list[Any]]]

Finds the nearest neighbors for a given embedding in the vector database.

If filter is specified, or if the embedding feature is stored in the default project index, the number of results returned may be less than k. Try using a larger value of k and extract the top k items from the results if needed.

PARAMETER DESCRIPTION
embedding

The target embedding for which neighbors are to be found.

TYPE: list[int | float]

col

The column name used to compute similarity score. Required only if there are multiple embeddings.

TYPE: str | None DEFAULT: None

k

The number of nearest neighbors to retrieve.

TYPE: int | None DEFAULT: 10

filter

A filter expression to restrict the search space.

TYPE: Filter | Logic | None DEFAULT: None

options

The options used for the request to the vector database. The keys are attribute values of the hsfs.core.opensearch.OpensearchRequestOption class.

TYPE: dict | None DEFAULT: None

RETURNS DESCRIPTION
list[tuple[float, list[Any]]]

A list of tuples representing the nearest neighbors.

list[tuple[float, list[Any]]]

Each tuple contains the similarity score and a list of feature values.

Example
embedding_index = EmbeddingIndex()
embedding_index.add_embedding(name="user_vector", dimension=3)
fg = fs.create_feature_group(
    name='air_quality',
    embedding_index=embedding_index,
    version=1,
    primary_key=['id1'],
    online_enabled=True,
)
fg.insert(data)
fg.find_neighbors(
    [0.1, 0.2, 0.3],
    k=5,
)

# apply filter
fg.find_neighbors(
    [0.1, 0.2, 0.3],
    k=5,
    filter=(fg.id1 > 10) & (fg.id1 < 30)
)

get_alert #

get_alert(alert_id: int) -> FeatureGroupAlert

Get an alert for this feature group by ID.

PARAMETER DESCRIPTION
alert_id

The ID of the alert to get.

TYPE: int

RETURNS DESCRIPTION
FeatureGroupAlert

The FeatureGroupAlert object.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

Example
# Get a specific alert
alert = fg.get_alert(alert_id=1)

get_alerts #

get_alerts() -> list[FeatureGroupAlert]

Get all alerts for this feature group.

RETURNS DESCRIPTION
list[FeatureGroupAlert]

The list of FeatureGroupAlerts.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

Example
# Get all alerts
alerts = fg.get_alerts()

get_all_statistics #

get_all_statistics(
    computation_time: str
    | float
    | datetime
    | date
    | None = None,
    feature_names: list[str] | None = None,
) -> list[Statistics] | None

Returns all the statistics metadata computed before a specific time for the current feature group.

If computation_time is None, all the statistics metadata are returned.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg_statistics = fg.get_all_statistics(computation_time=None)
PARAMETER DESCRIPTION
computation_time

Date and time when statistics were computed. Strings should be formatted in one of the following formats %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f.

TYPE: str | float | datetime | date | None DEFAULT: None

feature_names

List of feature names of which statistics are retrieved.

TYPE: list[str] | None DEFAULT: None

RETURNS DESCRIPTION
list[Statistics] | None

List of Statistics objects, or None if none exist.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

hopsworks.client.exceptions.FeatureStoreException

If statistics are not supported for this feature group type.

get_all_validation_reports #

get_all_validation_reports(
    ge_type: bool = HAS_GREAT_EXPECTATIONS,
) -> list[
    ValidationReport
    | great_expectations.core.ExpectationSuiteValidationResult
]

Return all validation reports attached to the feature group, if any exist.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

val_reports = fg.get_all_validation_reports()
PARAMETER DESCRIPTION
ge_type

If True, returns a native Great Expectations type; otherwise a Hopsworks custom type. Conversion can be performed via the to_ge_type() method on the Hopsworks type.

TYPE: bool DEFAULT: HAS_GREAT_EXPECTATIONS

RETURNS DESCRIPTION
list[ValidationReport | great_expectations.core.ExpectationSuiteValidationResult]

All validation reports attached to the feature group.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

hopsworks.client.exceptions.FeatureStoreException

If feature group is not registered with Hopsworks.

get_complex_features #

get_complex_features() -> list[str]

Returns the names of all features with a complex data type in this feature group.

Example
complex_dtype_features = fg.get_complex_features()

get_expectation_suite #

get_expectation_suite(
    ge_type: bool = HAS_GREAT_EXPECTATIONS,
) -> (
    hsfs.expectation_suite.ExpectationSuite
    | great_expectations.core.ExpectationSuite
    | None
)

Return the expectation suite attached to the feature group if it exists.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

exp_suite = fg.get_expectation_suite()
PARAMETER DESCRIPTION
ge_type

If True, returns a native Great Expectations type; otherwise a Hopsworks custom type. Conversion can be performed via the to_ge_type() method on the Hopsworks type.

TYPE: bool DEFAULT: HAS_GREAT_EXPECTATIONS

RETURNS DESCRIPTION
hsfs.expectation_suite.ExpectationSuite | great_expectations.core.ExpectationSuite | None

The expectation suite attached to the feature group or None if it does not exist.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

get_feature #

get_feature(name: str) -> feature.Feature | None

Retrieve a Feature object from the schema of the feature group.

There are several ways to access features of a feature group:

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

# get Feature instance
fg.feature1
fg["feature1"]
fg.get_feature("feature1")
Note

Attribute access to features works only for non-reserved names. For example, a feature named id or name will not be accessible via fg.id or fg.name; these return the id and name of the feature group itself. Fall back on using the get_feature method.

PARAMETER DESCRIPTION
name

The name of the feature to retrieve.

TYPE: str

RETURNS DESCRIPTION
feature.Feature | None

The feature object or None if it does not exist.

get_feature_monitoring_configs #

get_feature_monitoring_configs(
    name: str | None = None,
    feature_name: str | None = None,
    config_id: int | None = None,
) -> (
    fmc.FeatureMonitoringConfig
    | list[fmc.FeatureMonitoringConfig]
    | None
)

Fetch all feature monitoring configs attached to the feature group, or fetch by name or feature name only.

If no arguments are provided, the method will return all feature monitoring configs attached to the feature group, meaning all feature monitoring configs that are attached to a feature in the feature group. If you wish to fetch a single config, provide its name. If you wish to fetch all configs attached to a particular feature, provide the feature name.

Example
# fetch your feature group
fg = fs.get_feature_group(name="my_feature_group", version=1)

# fetch all feature monitoring configs attached to the feature group
fm_configs = fg.get_feature_monitoring_configs()

# fetch a single feature monitoring config by name
fm_config = fg.get_feature_monitoring_configs(name="my_config")

# fetch all feature monitoring configs attached to a particular feature
fm_configs = fg.get_feature_monitoring_configs(feature_name="my_feature")

# fetch a single feature monitoring config with a given id
fm_config = fg.get_feature_monitoring_configs(config_id=1)
PARAMETER DESCRIPTION
name

If provided fetch only the feature monitoring config with the given name.

TYPE: str | None DEFAULT: None

feature_name

If provided, fetch only configs attached to a particular feature.

TYPE: str | None DEFAULT: None

config_id

If provided, fetch only the feature monitoring config with the given id.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
fmc.FeatureMonitoringConfig | list[fmc.FeatureMonitoringConfig] | None

A list of feature monitoring configs.

fmc.FeatureMonitoringConfig | list[fmc.FeatureMonitoringConfig] | None

If name is provided, returns either a single config or None if not found.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

hopsworks.client.exceptions.FeatureStoreException

If feature group is not registered with Hopsworks.

ValueError

If both name and feature_name are provided.

TypeError

If name or feature_name is not a string or None.

get_feature_monitoring_history #

get_feature_monitoring_history(
    config_name: str | None = None,
    config_id: int | None = None,
    start_time: int | str | datetime | date | None = None,
    end_time: int | str | datetime | date | None = None,
    with_statistics: bool = True,
) -> list[fmr.FeatureMonitoringResult]

Fetch feature monitoring history for a given feature monitoring config.

Example
# fetch your feature group
fg = fs.get_feature_group(name="my_feature_group", version=1)

# fetch feature monitoring history for a given feature monitoring config
fm_history = fg.get_feature_monitoring_history(
    config_name="my_config",
    start_time="2020-01-01",
)

# fetch feature monitoring history for a given feature monitoring config id
fm_history = fg.get_feature_monitoring_history(
    config_id=1,
    start_time=datetime.now() - timedelta(weeks=2),
    end_time=datetime.now() - timedelta(weeks=1),
    with_statistics=False,
)
PARAMETER DESCRIPTION
config_name

The name of the feature monitoring config to fetch history for.

TYPE: str | None DEFAULT: None

config_id

The id of the feature monitoring config to fetch history for.

TYPE: int | None DEFAULT: None

start_time

The start date of the feature monitoring history to fetch.

TYPE: int | str | datetime | date | None DEFAULT: None

end_time

The end date of the feature monitoring history to fetch.

TYPE: int | str | datetime | date | None DEFAULT: None

with_statistics

Whether to include statistics in the feature monitoring history. If False, only metadata about the monitoring will be fetched.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
list[fmr.FeatureMonitoringResult]

A list of feature monitoring results containing the monitoring metadata as well as the computed statistics for the detection and reference window if requested.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

hopsworks.client.exceptions.FeatureStoreException

If feature group is not registered with Hopsworks.

ValueError

If both config_name and config_id are provided.

TypeError

If config_name is not a string or None, or config_id is not an int or None.

get_fg_name #

get_fg_name() -> str

Returns the full feature group name, that is, its base name combined with its version.
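
A short sketch; the exact combined format (shown here as name_version) is an assumption:

Example
fg = fs.get_feature_group("my_feature_group", version=1)

fg.get_fg_name()  # e.g. "my_feature_group_1"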

get_generated_feature_groups #

get_generated_feature_groups() -> (
    explicit_provenance.Links | None
)

Get the feature groups generated using this feature group, based on explicit provenance.

These feature groups can be accessible or inaccessible. Explicit provenance does not track deleted generated feature group links, so deleted will always be empty. For inaccessible feature groups, only minimal information is returned.
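
A minimal sketch; the accessible attribute on the returned Links object is an assumption based on the accessible/inaccessible description above:

Example
fg = fs.get_feature_group("my_feature_group", version=1)

links = fg.get_generated_feature_groups()
if links is not None:
    # feature groups derived from this one that you can access
    for child in links.accessible:
        print(child)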

RETURNS DESCRIPTION
explicit_provenance.Links | None

Object containing the section of provenance graph requested or None if it does not exist.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

get_generated_feature_views #

get_generated_feature_views() -> (
    explicit_provenance.Links | None
)

Get the feature views generated using this feature group, based on explicit provenance.

These feature views can be accessible or inaccessible. Explicit provenance does not track deleted generated feature view links, so deleted will always be empty. For inaccessible feature views, only minimal information is returned.

RETURNS DESCRIPTION
explicit_provenance.Links | None

Object containing the section of provenance graph requested or None if it does not exist.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

get_latest_online_ingestion #

get_latest_online_ingestion() -> (
    online_ingestion.OnlineIngestion
)

Retrieve the latest online ingestion operation for this feature group.

This method fetches metadata about the most recent online ingestion job, including its status and progress, if available.

RETURNS DESCRIPTION
online_ingestion.OnlineIngestion

The latest OnlineIngestion object for this feature group.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

Example
fg = fs.get_feature_group("my_fg", 1)
latest_ingestion = fg.get_latest_online_ingestion()

get_latest_validation_report #

get_latest_validation_report(
    ge_type: bool = HAS_GREAT_EXPECTATIONS,
) -> (
    ValidationReport
    | great_expectations.core.ExpectationSuiteValidationResult
    | None
)

Return the latest validation report attached to the Feature Group if it exists.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

latest_val_report = fg.get_latest_validation_report()
PARAMETER DESCRIPTION
ge_type

If True, returns a native Great Expectations type; otherwise a Hopsworks custom type. Conversion can be performed via the to_ge_type() method on the Hopsworks type.

TYPE: bool DEFAULT: HAS_GREAT_EXPECTATIONS

RETURNS DESCRIPTION
ValidationReport | great_expectations.core.ExpectationSuiteValidationResult | None

The latest validation report attached to the Feature Group or None if it does not exist.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

get_online_ingestion #

get_online_ingestion(
    id,
) -> online_ingestion.OnlineIngestion

Retrieve a specific online ingestion operation by its ID for this feature group.

This method fetches metadata about a particular online ingestion job, including its status and progress, if available.

PARAMETER DESCRIPTION
id

The unique identifier of the online ingestion operation.

RETURNS DESCRIPTION
online_ingestion.OnlineIngestion

The OnlineIngestion object with the specified ID.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

Example
fg = fs.get_feature_group("my_fg", 1)
ingestion = fg.get_online_ingestion(123)

get_parent_feature_groups #

get_parent_feature_groups() -> (
    explicit_provenance.Links | None
)

Get the parents of this feature group, based on explicit provenance.

Parents are feature groups or external feature groups. These feature groups can be accessible, deleted or inaccessible. For deleted and inaccessible feature groups, only minimal information is returned.
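
A minimal sketch; as above, the accessible attribute on the returned Links object is an assumption:

Example
fg = fs.get_feature_group("my_feature_group", version=1)

parents = fg.get_parent_feature_groups()
if parents is not None:
    for parent in parents.accessible:
        print(parent.name, parent.version)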

RETURNS DESCRIPTION
explicit_provenance.Links | None

Object containing the section of provenance graph requested or None if it does not exist.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

get_statistics #

get_statistics(
    computation_time: str
    | float
    | datetime
    | date
    | None = None,
    feature_names: list[str] | None = None,
) -> Statistics | None

Returns the statistics computed at a specific time for the current feature group.

If computation_time is None, the most recent statistics are returned.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg_statistics = fg.get_statistics(computation_time=None)
PARAMETER DESCRIPTION
computation_time

Date and time when statistics were computed. Strings should be formatted in one of the following formats: %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f.

TYPE: str | float | datetime | date | None DEFAULT: None

feature_names

List of feature names of which statistics are retrieved.

TYPE: list[str] | None DEFAULT: None

RETURNS DESCRIPTION
Statistics | None

Statistics object or None if it does not exist.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

hopsworks.client.exceptions.FeatureStoreException

If statistics are not supported for this feature group type.

get_statistics_by_commit_window #

get_statistics_by_commit_window(
    from_commit_time: str
    | int
    | datetime
    | date
    | None = None,
    to_commit_time: str
    | int
    | datetime
    | date
    | None = None,
    feature_names: list[str] | None = None,
) -> Statistics | list[Statistics] | None

Returns the statistics computed on a specific commit window for this feature group.

If time travel is not enabled, it raises an exception.

If from_commit_time is None, the commit window starts from the first commit. If to_commit_time is None, the commit window ends at the last commit.

Example
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg_statistics = fg.get_statistics_by_commit_window(from_commit_time=None, to_commit_time=None)
PARAMETER DESCRIPTION
from_commit_time

Date and time of the first commit of the window. Defaults to None. Strings should be formatted in one of the following formats %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f.

TYPE: str | int | datetime | date | None DEFAULT: None

to_commit_time

Date and time of the last commit of the window. Defaults to None. Strings should be formatted in one of the following formats %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f.

TYPE: str | int | datetime | date | None DEFAULT: None

feature_names

List of feature names of which statistics are retrieved.

TYPE: list[str] | None DEFAULT: None

RETURNS DESCRIPTION
Statistics | list[Statistics] | None

Statistics object or None if it does not exist.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

get_storage_connector #

get_storage_connector() -> sc.StorageConnector | None

Get the storage connector using this feature group, based on explicit provenance.

Only the accessible storage connector is returned. For more items, use the base method get_storage_connector_provenance.
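
A minimal sketch, assuming the feature group was created from a storage connector:

Example
fg = fs.get_feature_group("my_feature_group", version=1)

connector = fg.get_storage_connector()
if connector is not None:
    print(connector.name)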

RETURNS DESCRIPTION
sc.StorageConnector | None

Storage connector or None if it does not exist.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

get_storage_connector_provenance #

get_storage_connector_provenance() -> (
    explicit_provenance.Links | None
)

Get the parents of this feature group, based on explicit provenance.

Parents here are storage connectors. These storage connectors can be accessible, deleted, or inaccessible. For deleted and inaccessible storage connectors, only minimal information is returned.

RETURNS DESCRIPTION
explicit_provenance.Links | None

The storage connector used to generate this feature group or None if it does not exist.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

get_tag #

get_tag(name: str) -> tag.Tag | None

Get a single tag attached to a feature group.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg_tag_value = fg.get_tag("example_tag")
PARAMETER DESCRIPTION
name

Name of the tag to get.

TYPE: str

RETURNS DESCRIPTION
tag.Tag | None

Tag value or None if it does not exist.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

get_tags #

get_tags() -> dict[str, tag.Tag]

Retrieves all tags attached to a feature group.
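
A minimal sketch; the value attribute on the returned Tag objects is an assumption:

Example
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

tags = fg.get_tags()
for tag_name, tag in tags.items():
    print(tag_name, tag.value)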

RETURNS DESCRIPTION
dict[str, tag.Tag]

The dictionary of tags.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

get_validation_history #

get_validation_history(
    expectation_id: int,
    start_validation_time: str
    | int
    | datetime
    | date
    | None = None,
    end_validation_time: str
    | int
    | datetime
    | date
    | None = None,
    filter_by: list[
        Literal[
            "INGESTED",
            "REJECTED",
            "FG_DATA",
            "EXPERIMENT",
            "UNKNOWN",
        ]
    ] = None,
    ge_type: bool = HAS_GREAT_EXPECTATIONS,
) -> (
    list[ValidationResult]
    | list[
        great_expectations.core.ExpectationValidationResult
    ]
)

Fetch validation history of an Expectation specified by its id.

Example
validation_history = fg.get_validation_history(
    expectation_id=1,
    filter_by=["REJECTED", "UNKNOWN"],
    start_validation_time="2022-01-01 00:00:00",
    end_validation_time=datetime.datetime.now(),
    ge_type=False,
)
PARAMETER DESCRIPTION
expectation_id

ID of the Expectation for which to fetch the validation history.

TYPE: int

filter_by

List of ingestion_result categories to keep.

TYPE: list[Literal['INGESTED', 'REJECTED', 'FG_DATA', 'EXPERIMENT', 'UNKNOWN']] DEFAULT: None

start_validation_time

Fetch only validation results posterior to the provided time, inclusive. Supported formats include timestamps (int), datetime, date, or strings formatted to be dateutil parsable. See examples above.

TYPE: str | int | datetime | date | None DEFAULT: None

end_validation_time

Fetch only validation results prior to the provided time, inclusive. Supported formats include timestamps (int), datetime, date, or strings formatted to be dateutil parsable. See examples above.

TYPE: str | int | datetime | date | None DEFAULT: None

ge_type

If True, returns a native Great Expectations type; otherwise a Hopsworks custom type. Conversion can be performed via the to_ge_type() method on the Hopsworks type.

TYPE: bool DEFAULT: HAS_GREAT_EXPECTATIONS

RETURNS DESCRIPTION
list[ValidationResult] | list[great_expectations.core.ExpectationValidationResult]

A list of validation results connected to the expectation_id.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

insert #

insert(
    features: pd.DataFrame
    | pl.DataFrame
    | TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | np.ndarray
    | list[list],
    overwrite: bool = False,
    operation: Literal["insert", "upsert"] = "upsert",
    storage: str | None = None,
    write_options: dict[str, Any] | None = None,
    validation_options: dict[str, Any] | None = None,
    wait: bool = False,
    transformation_context: dict[str, Any] = None,
    transform: bool = True,
) -> tuple[Job | None, ValidationReport | None]

Persist the metadata and materialize the feature group to the feature store or insert data from a dataframe into the existing feature group.

Incrementally insert data to a feature group or overwrite all data contained in the feature group. By default, the data is inserted into the offline storage as well as the online storage if the feature group is online_enabled=True.

The features dataframe can be a Spark DataFrame or RDD, a Pandas DataFrame, a Polars DataFrame, a two-dimensional Numpy array, or a two-dimensional Python nested list. If statistics are enabled, statistics are recomputed for the entire feature group. If the feature group's time travel format is HUDI, the operation argument can be either insert or upsert.

If the feature group doesn't exist, the insert method creates the necessary metadata the first time it is invoked and writes the specified features dataframe as the feature group to the online/offline feature store.

Changed in 3.3.0

insert and save methods are now async by default in non-spark clients. To achieve the old behaviour, set wait argument to True.

Upsert new feature data with time travel format HUDI
# connect to the Feature Store
fs = ...

fg = fs.get_or_create_feature_group(
    name='bitcoin_price',
    description='Bitcoin price aggregated for days',
    version=1,
    primary_key=['unix'],
    online_enabled=True,
    event_time='unix'
)

fg.insert(df_bitcoin_processed)
Async insert
# connect to the Feature Store
fs = ...

fg1 = fs.get_or_create_feature_group(
    name='feature_group_name1',
    description='Description of the first FG',
    version=1,
    primary_key=['unix'],
    online_enabled=True,
    event_time='unix'
)
# async insertion in order not to wait till finish of the job
fg1.insert(df_for_fg1, write_options={"wait_for_job": False})

fg2 = fs.get_or_create_feature_group(
    name='feature_group_name2',
    description='Description of the second FG',
    version=1,
    primary_key=['unix'],
    online_enabled=True,
    event_time='unix'
)
fg2.insert(df_for_fg2)
PARAMETER DESCRIPTION
features

Features to be saved.

TYPE: pd.DataFrame | pl.DataFrame | TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | np.ndarray | list[list]

overwrite

Drop all data in the feature group before inserting new data. This does not affect metadata.

TYPE: bool DEFAULT: False

operation

Apache Hudi operation type "insert" or "upsert".

TYPE: Literal['insert', 'upsert'] DEFAULT: 'upsert'

storage

Overwrite default behaviour, write to offline storage only with "offline" or online only with "online". If the streaming APIs are enabled, specifying the storage option is not supported.

TYPE: str | None DEFAULT: None

write_options

Additional write options as key-value pairs.

When using the python engine, write_options can contain the following entries:

  • key spark and value an object of type hsfs.core.job_configuration.JobConfiguration to configure the Hopsworks Job used to write data into the feature group.
  • key wait_for_job and value True or False to configure whether or not the insert call should return only after the Hopsworks Job has finished. By default it waits.
  • key wait_for_online_ingestion and value True or False to configure whether or not the insert call should return only after the Hopsworks online ingestion has finished. By default it does not wait.
  • key start_offline_backfill and value True or False to configure whether or not to start the materialization job to write data to the offline storage. start_offline_backfill is deprecated. Use start_offline_materialization instead.
  • key start_offline_materialization and value True or False to configure whether or not to start the materialization job to write data to the offline storage. By default the materialization job gets started immediately.
  • key kafka_producer_config and value an object of type properties used to configure the Kafka client. To optimize for throughput in a high-latency connection, consider changing the producer properties.
  • key internal_kafka and value True or False in case you established connectivity from your Python environment to the internal advertised listeners of the Hopsworks Kafka Cluster. Defaults to False and will use external listeners when connecting from outside of Hopsworks.
  • key delta.enableChangeDataFeed set to a string value of true or false to enable or disable cdf operations on the feature group delta table. Set to true by default on Feature Group creation.

TYPE: dict[str, Any] | None DEFAULT: None

validation_options

Additional validation options as key-value pairs.

  • key run_validation boolean value, set to False to skip validation temporarily on ingestion.
  • key save_report boolean value, set to False to skip upload of the validation report to Hopsworks.
  • key ge_validate_kwargs a dictionary containing kwargs for the validate method of Great Expectations.
  • key fetch_expectation_suite a boolean value, by default True, to control whether the expectation suite of the feature group should be fetched before every insert.
  • key schema_validation boolean value, set to True to validate the schema.

TYPE: dict[str, Any] | None DEFAULT: None

wait

Wait for the job and online ingestion to finish before returning. Shortcut for setting both wait_for_job and wait_for_online_ingestion in write_options.

TYPE: bool DEFAULT: False

transformation_context

A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. These variables must be explicitly defined as parameters in the transformation function to be accessible during execution.

TYPE: dict[str, Any] DEFAULT: None

transform

When set to False, the dataframe is inserted without applying any on-demand transformations. In this case, all required on-demand features must already exist in the provided dataframe.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
Job

The job information if python engine is used.

TYPE: Job | None

ValidationReport

The validation report if validation is enabled.

TYPE: ValidationReport | None

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request, e.g., the feature group cannot be created or the dataframe schema does not match the existing feature group schema.

hsfs.client.exceptions.DataValidationException

If data validation fails and the expectation suite validation_ingestion_policy is set to STRICT. Data is NOT ingested.

insert_stream #

insert_stream(
    features: TypeVar("pyspark.sql.DataFrame"),
    query_name: str | None = None,
    output_mode: Literal[
        "append", "complete", "update"
    ] = "append",
    await_termination: bool = False,
    timeout: int | None = None,
    checkpoint_dir: str | None = None,
    write_options: dict[str, Any] | None = None,
    transformation_context: dict[str, Any] = None,
    transform: bool = True,
) -> TypeVar("StreamingQuery")

Ingest a Spark Structured Streaming Dataframe to the online feature store.

This method creates a long-running Spark Streaming Query; you can control its termination through the method arguments.

It is possible to stop the returned query with .stop() and check its status with .isActive.

To get a list of all active queries, use:

sqm = spark.streams

# get the list of active streaming queries
[q.name for q in sqm.active]
Engine Support

Spark only

Stream ingestion using Pandas/Python as engine is currently not supported. Python/Pandas has no notion of streaming.

Data Validation Support

insert_stream does not perform any data validation using Great Expectations, even when an expectation suite is attached.
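
A minimal sketch, assuming streaming_df is a hypothetical Spark Structured Streaming DataFrame read from a source such as Kafka:

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

# start the streaming ingestion query
query = fg.insert_stream(streaming_df, query_name="fg_ingestion_query")

# check the status of the long running query and stop it when done
query.isActive
query.stop()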

PARAMETER DESCRIPTION
features

Features in Streaming Dataframe to be saved.

TYPE: TypeVar('pyspark.sql.DataFrame')

query_name

Optionally specify a name for the query to make it easier to recognise in the Spark UI.

TYPE: str | None DEFAULT: None

output_mode

Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.

  • "append": Only the new rows in the streaming DataFrame/Dataset will be written to the sink.
  • "complete": All the rows in the streaming DataFrame/Dataset will be written to the sink every time there is some update.
  • "update": Only the rows that were updated in the streaming DataFrame/Dataset will be written to the sink every time there are some updates. If the query doesn't contain aggregations, it will be equivalent to append mode.

TYPE: Literal['append', 'complete', 'update'] DEFAULT: 'append'

await_termination

Waits for the termination of this query, either by query.stop() or by an exception. If the query has terminated with an exception, then the exception will be thrown. If timeout is set, it returns whether the query has terminated or not within the timeout seconds.

TYPE: bool DEFAULT: False

timeout

Only relevant in combination with await_termination=True.

TYPE: int | None DEFAULT: None

checkpoint_dir

Checkpoint directory location. It is used as a reference from where to resume the streaming job. If None, hsfs constructs the directory name as "insert_stream_" + online_topic_name.

TYPE: str | None DEFAULT: None

write_options

Additional write options for Spark as key-value pairs.

TYPE: dict[str, Any] | None DEFAULT: None

transformation_context

A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. These variables must be explicitly defined as parameters in the transformation function to be accessible during execution.

TYPE: dict[str, Any] DEFAULT: None

transform

When set to False, the dataframe is inserted without applying any on-demand transformations. In this case, all required on-demand features must already exist in the provided dataframe.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
TypeVar('StreamingQuery')

Spark Structured Streaming Query object.

json #

json() -> str

Get specific Feature Group metadata in json format.

Example
fg.json()

multi_part_insert #

multi_part_insert(
    features: pd.DataFrame
    | pl.DataFrame
    | TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | np.ndarray
    | list[list]
    | None = None,
    overwrite: bool = False,
    operation: Literal["insert", "upsert"] = "upsert",
    storage: str | None = None,
    write_options: dict[str, Any] | None = None,
    validation_options: dict[str, Any] | None = None,
    transformation_context: dict[str, Any] = None,
    transform: bool = True,
) -> (
    tuple[Job | None, ValidationReport | None]
    | feature_group_writer.FeatureGroupWriter
)

Get a FeatureGroupWriter for optimized multi part inserts, or call this method directly to start manually managed multi part inserts.

In use cases where very small batches (1 to 1000 rows per Dataframe) need to be written to the feature store repeatedly, the standard feature_group.insert() method can be inefficient, as it performs background actions to update the metadata of the feature group object first.

For these cases, the feature group provides the multi_part_insert API, which is optimized for writing many small Dataframes one after another.

There are two ways to use this API:

Python Context Manager

Using the Python with syntax you can acquire a FeatureGroupWriter object that implements the same multi_part_insert API.

feature_group = fs.get_or_create_feature_group("fg_name", version=1)

with feature_group.multi_part_insert() as writer:
    # run inserts in a loop:
    while loop:
        small_batch_df = ...
        writer.insert(small_batch_df)

The writer batches the small Dataframes and transmits them to Hopsworks efficiently. When exiting the context, the feature group writer exits only once all the rows have been transmitted.

Multi part insert with manual context management

Instead of letting Python handle the entering and exiting of the multi part insert context, you can start and finalize the context manually.

feature_group = fs.get_or_create_feature_group("fg_name", version=1)

while loop:
    small_batch_df = ...
    feature_group.multi_part_insert(small_batch_df)

# IMPORTANT: finalize the multi part insert to make sure all rows
# have been transmitted
feature_group.finalize_multi_part_insert()

Note that the first call to multi_part_insert initiates the context, so be sure to finalize it. finalize_multi_part_insert is a blocking call that returns once all rows have been transmitted.

Once you are done with the multi part insert, it is good practice to start the materialization job in order to write the data to the offline storage:

feature_group.materialization_job.run(await_termination=True)
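A sketch of passing write options with manual multi part inserts; the linger.ms producer property is illustrative, not a recommendation:

while loop:
    small_batch_df = ...
    feature_group.multi_part_insert(
        small_batch_df,
        # batch outgoing messages for up to 100 ms to trade
        # latency for throughput on high-latency connections
        write_options={"kafka_producer_config": {"linger.ms": "100"}},
    )

feature_group.finalize_multi_part_insert()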
PARAMETER DESCRIPTION
features

Features to be saved.

TYPE: pd.DataFrame | pl.DataFrame | TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | np.ndarray | list[list] | None DEFAULT: None

overwrite

Drop all data in the feature group before inserting new data. This does not affect metadata.

TYPE: bool DEFAULT: False

operation

Apache Hudi operation type "insert" or "upsert".

TYPE: Literal['insert', 'upsert'] DEFAULT: 'upsert'

storage

Overwrite default behaviour, write to offline storage only with "offline" or online only with "online".

TYPE: str | None DEFAULT: None

write_options

Additional write options as key-value pairs.

When using the python engine, write_options can contain the following entries:

  • key spark and value an object of type hsfs.core.job_configuration.JobConfiguration to configure the Hopsworks Job used to write data into the feature group.
  • key wait_for_job and value True or False to configure whether the insert call should return only after the Hopsworks Job has finished. By default it waits.
  • key start_offline_backfill and value True or False to configure whether or not to start the materialization job to write data to the offline storage. start_offline_backfill is deprecated. Use start_offline_materialization instead.
  • key start_offline_materialization and value True or False to configure whether or not to start the materialization job to write data to the offline storage. By default the materialization job does not get started automatically for multi part inserts.
  • key kafka_producer_config and value an object of type properties used to configure the Kafka client. To optimize for throughput in high-latency connections, consider changing the producer properties.
  • key internal_kafka and value True or False in case you established connectivity from your Python environment to the internal advertised listeners of the Hopsworks Kafka Cluster. Defaults to False and will use external listeners when connecting from outside of Hopsworks.

TYPE: dict[str, Any] | None DEFAULT: None

validation_options

Additional validation options as key-value pairs.

  • key run_validation boolean value, set to False to skip validation temporarily on ingestion.
  • key save_report boolean value, set to False to skip upload of the validation report to Hopsworks.
  • key ge_validate_kwargs a dictionary containing kwargs for the validate method of Great Expectations.
  • key fetch_expectation_suite a boolean value, by default False for multi part inserts, to control whether the expectation suite of the feature group should be fetched before every insert.

TYPE: dict[str, Any] | None DEFAULT: None

transformation_context

A dictionary mapping variable names to objects that will be provided as contextual information to the transformation function at runtime. These variables must be explicitly defined as parameters in the transformation function to be accessible during execution.

TYPE: dict[str, Any] DEFAULT: None

transform

When set to False, the dataframe is inserted without applying any on-demand transformations. In this case, all required on-demand features must already exist in the provided dataframe.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
tuple[Job | None, ValidationReport | None] | feature_group_writer.FeatureGroupWriter

One of:

  • A tuple with the job information if the python engine is used and the validation report if validation is enabled, or
  • A FeatureGroupWriter when used as a context manager with the Python with statement.

read #

read(
    wallclock_time: str
    | int
    | datetime
    | date
    | None = None,
    online: bool = False,
    dataframe_type: Literal[
        "default",
        "spark",
        "pandas",
        "polars",
        "numpy",
        "python",
    ] = "default",
    read_options: dict | None = None,
) -> (
    pd.DataFrame
    | np.ndarray
    | list[list[Any]]
    | TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | pl.DataFrame
)

Read the feature group into a dataframe.

Reads the feature group by default from the offline storage as Spark DataFrame on Hopsworks and Databricks, and as Pandas dataframe on AWS Sagemaker and pure Python environments.

Set online to True to read from the online storage, or change dataframe_type to read as a different format.

Reading feature group as of latest state
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
fg.read()
Reading feature group as of specific point in time:
fg = fs.get_or_create_feature_group(...)
fg.read("2020-10-20 07:34:11")
PARAMETER DESCRIPTION
wallclock_time

If specified, retrieves feature group as of specific point in time. If not specified, returns as of most recent time. Strings should be formatted in one of the following formats %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f.

TYPE: str | int | datetime | date | None DEFAULT: None

online

If True, read from online feature store.

TYPE: bool DEFAULT: False

dataframe_type

The type of the returned dataframe. By default, maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine.

TYPE: Literal['default', 'spark', 'pandas', 'polars', 'numpy', 'python'] DEFAULT: 'default'

read_options

Additional options as key/value pairs to pass to the execution engine.

For spark engine: Dictionary of read options for Spark.

For python engine:

  • key "arrow_flight_config" to pass a dictionary of arrow flight configurations. For example: {"arrow_flight_config": {"timeout": 900}}.
  • key "pandas_types" and value True to retrieve columns as Pandas nullable types rather than numpy/object(string) types (experimental).

TYPE: dict | None DEFAULT: None

RETURNS DESCRIPTION
pd.DataFrame | np.ndarray | list[list[Any]] | TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pl.DataFrame

One of the following, containing the feature data:

  • pyspark.DataFrame: A Spark DataFrame.
  • pandas.DataFrame: A Pandas DataFrame.
  • polars.DataFrame: A Polars DataFrame.
  • numpy.ndarray: A two-dimensional Numpy array.
  • list: A two-dimensional Python list.
RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If no data is available for the feature group at this commit date (when time travel is enabled).

read_changes #

read_changes(
    start_wallclock_time: str | int | datetime | date,
    end_wallclock_time: str | int | datetime | date,
    read_options: dict | None = None,
) -> (
    pd.DataFrame
    | np.ndarray
    | list[list[Any]]
    | TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | pl.DataFrame
)

Reads updates of this feature group that occurred between the specified points in time.

Deprecated

read_changes method is deprecated. Use as_of(end_wallclock_time, exclude_until=start_wallclock_time).read(read_options=read_options) instead.

Pyspark/Spark Only

Apache HUDI exclusively supports Time Travel and Incremental Query via Spark Context.

Warning

This function only works for feature groups with time_travel_format='HUDI'.
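A sketch of the recommended replacement using time travel (timestamps are illustrative):

# incremental changes between two commits
df = fg.as_of(
    "2020-10-20 07:34:11",
    exclude_until="2020-10-19 07:34:11",
).read()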

PARAMETER DESCRIPTION
start_wallclock_time

Start time of the time travel query. Strings should be formatted in one of the following formats %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f.

TYPE: str | int | datetime | date

end_wallclock_time

End time of the time travel query. Strings should be formatted in one of the following formats %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, %Y-%m-%d %H:%M:%S, or %Y-%m-%d %H:%M:%S.%f.

TYPE: str | int | datetime | date

read_options

Additional options as key/value pairs to pass to the execution engine. For spark engine, it is a dictionary of read options for Spark.

TYPE: dict | None DEFAULT: None

RETURNS DESCRIPTION
pd.DataFrame | np.ndarray | list[list[Any]] | TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pl.DataFrame

The spark dataframe containing the incremental changes of feature data.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If no data is available for the feature group at this commit date.

hopsworks.client.exceptions.FeatureStoreException

If the feature group does not have HUDI time travel format.

save #

save(
    features: pd.DataFrame
    | pl.DataFrame
    | TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | np.ndarray
    | list[feature.Feature] = None,
    write_options: dict[str, Any] | None = None,
    validation_options: dict[str, Any] | None = None,
    wait: bool = False,
) -> tuple[
    Job | None,
    great_expectations.core.ExpectationSuiteValidationResult
    | None,
]

Persist the metadata and materialize the feature group to the feature store.

Changed in 3.3.0

insert and save methods are now async by default in non-spark clients. To achieve the old behaviour, set the wait argument to True.

Calling save creates the metadata for the feature group in the feature store. If a Pandas DataFrame, Polars DataFrame, RDD or Ndarray is provided, the data is written to the online/offline feature store as specified. By default, this writes the feature group to the offline storage and, if online_enabled for the feature group, also to the online feature store. The features dataframe can be a Spark DataFrame or RDD, a Pandas DataFrame, a Polars DataFrame, a two-dimensional Numpy array, or a two-dimensional Python nested list.
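Example

A minimal sketch, assuming fg was created with fs.create_feature_group(...) and df matches the declared schema:

# persist the metadata and ingest the dataframe,
# blocking until the ingestion has finished
job, validation_result = fg.save(df, wait=True)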

PARAMETER DESCRIPTION
features

Features to be saved. This argument is optional if the feature list is provided in the create_feature_group or get_or_create_feature_group method invocation.

TYPE: pd.DataFrame | pl.DataFrame | TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | np.ndarray | list[feature.Feature] DEFAULT: None

write_options

Additional write options as key-value pairs.

When using the python engine, write_options can contain the following entries:

  • key spark and value an object of type hsfs.core.job_configuration.JobConfiguration to configure the Hopsworks Job used to write data into the feature group.
  • key wait_for_job and value True or False to configure whether the save call should return only after the Hopsworks Job has finished. By default it does not wait.
  • key wait_for_online_ingestion and value True or False to configure whether the save call should return only after the Hopsworks online ingestion has finished. By default it does not wait.
  • key start_offline_backfill and value True or False to configure whether or not to start the materialization job to write data to the offline storage. start_offline_backfill is deprecated. Use start_offline_materialization instead.
  • key start_offline_materialization and value True or False to configure whether or not to start the materialization job to write data to the offline storage. By default the materialization job gets started immediately.
  • key kafka_producer_config and value an object of type properties used to configure the Kafka client. To optimize for throughput in high-latency connections, consider changing the producer properties.
  • key internal_kafka and value True or False in case you established connectivity from your Python environment to the internal advertised listeners of the Hopsworks Kafka Cluster. Defaults to False and will use external listeners when connecting from outside of Hopsworks.
  • key delta.enableChangeDataFeed set to a string value of true or false to enable or disable cdf operations on the feature group delta table. Set to true by default on Feature Group creation.

TYPE: dict[str, Any] | None DEFAULT: None

validation_options

Additional validation options as key-value pairs.

  • key run_validation boolean value, set to False to skip validation temporarily on ingestion.
  • key save_report boolean value, set to False to skip upload of the validation report to Hopsworks.
  • key ge_validate_kwargs a dictionary containing kwargs for the validate method of Great Expectations.
  • key schema_validation boolean value, set to True to validate the schema.

TYPE: dict[str, Any] | None DEFAULT: None

wait

Wait for the job and online ingestion to finish before returning. Setting this to True is equivalent to passing write_options {"wait_for_job": True, "wait_for_online_ingestion": True}.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
tuple[Job | None, great_expectations.core.ExpectationSuiteValidationResult | None]

When using the python engine, returns the Hopsworks Job launched to ingest the feature group data, together with the Great Expectations validation result if validation is enabled.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

save_expectation_suite #

save_expectation_suite(
    expectation_suite: hsfs.expectation_suite.ExpectationSuite
    | great_expectations.core.ExpectationSuite,
    run_validation: bool = True,
    validation_ingestion_policy: Literal[
        "always", "strict"
    ] = "always",
    overwrite: bool = False,
) -> (
    hsfs.expectation_suite.ExpectationSuite
    | great_expectations.core.ExpectationSuite
)

Attach an expectation suite to the feature group and save it for future use.

If an expectation suite is already attached, it is replaced. Note that the provided expectation suite is modified inplace to include expectationId fields.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg.save_expectation_suite(expectation_suite, run_validation=True)
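A sketch of building a suite from scratch, assuming the pre-1.0 Great Expectations API (the suite name, column, and expectation are illustrative):

from great_expectations.core import ExpectationSuite
from great_expectations.core.expectation_configuration import ExpectationConfiguration

# build a minimal expectation suite
suite = ExpectationSuite(expectation_suite_name="my_suite")
suite.add_expectation(
    ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "id"},
    )
)

# attach it, overwriting any previously attached suite
fg.save_expectation_suite(suite, overwrite=True)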
PARAMETER DESCRIPTION
expectation_suite

The expectation suite to attach to the Feature Group.

TYPE: hsfs.expectation_suite.ExpectationSuite | great_expectations.core.ExpectationSuite

overwrite

If an Expectation Suite is already attached, overwrite it. The new suite will have its own validation history, but former reports are preserved.

TYPE: bool DEFAULT: False

run_validation

Set whether the expectation_suite will run on ingestion.

TYPE: bool DEFAULT: True

validation_ingestion_policy

Set the policy for ingestion to the Feature Group.

  • "STRICT" only allows DataFrame passing validation to be inserted into Feature Group.
  • "ALWAYS" always insert the DataFrame to the Feature Group, irrespective of overall validation result.

TYPE: Literal['always', 'strict'] DEFAULT: 'always'

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

save_validation_report #

save_validation_report(
    validation_report: dict[str, Any]
    | ValidationReport
    | great_expectations.core.expectation_validation_result.ExpectationSuiteValidationResult,
    ingestion_result: Literal[
        "UNKNOWN",
        "INGESTED",
        "REJECTED",
        "EXPERIMENT",
        "FG_DATA",
    ] = "UNKNOWN",
    ge_type: bool = HAS_GREAT_EXPECTATIONS,
) -> (
    ValidationReport
    | great_expectations.core.ExpectationSuiteValidationResult
)

Save a validation report to the Hopsworks platform alongside previous reports of the same Feature Group.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(..., expectation_suite=expectation_suite)

validation_report = great_expectations.from_pandas(
    my_experimental_features_df,
    expectation_suite=fg.get_expectation_suite()).validate()

fg.save_validation_report(validation_report, ingestion_result="EXPERIMENT")
PARAMETER DESCRIPTION
validation_report

The validation report to attach to the Feature Group.

TYPE: dict[str, Any] | ValidationReport | great_expectations.core.expectation_validation_result.ExpectationSuiteValidationResult

ingestion_result

Specify the fate of the associated data, defaults to "UNKNOWN". Use "INGESTED" or "REJECTED" for validation of DataFrames to be inserted in the Feature Group. Use "EXPERIMENT" for testing and development and "FG_DATA" when validating data already in the Feature Group.

TYPE: Literal['UNKNOWN', 'INGESTED', 'REJECTED', 'EXPERIMENT', 'FG_DATA'] DEFAULT: 'UNKNOWN'

ge_type

If True, returns a native Great Expectations type; otherwise, a Hopsworks custom type. Conversion can be performed via the to_ge_type() method on the Hopsworks type.

TYPE: bool DEFAULT: HAS_GREAT_EXPECTATIONS

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

hopsworks.client.exceptions.FeatureStoreException

If the feature group is not registered with Hopsworks.

select #

select(
    features: list[str | feature.Feature],
) -> query.Query

Select a subset of features of the feature group and return a query object.

The query can be used to construct joins of feature groups or create a feature view with a subset of features of the feature group.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
from hsfs.feature import Feature
fg = fs.create_feature_group(
        "fg",
        features=[
                Feature("id", type="string"),
                Feature("ts", type="bigint"),
                Feature("f1", type="date"),
                Feature("f2", type="double")
                ],
        primary_key=["id"],
        event_time="ts")

# construct query
query = fg.select(["id", "f1"])
query.features
# [Feature('id', ...), Feature('f1', ...)]
PARAMETER DESCRIPTION
features

A list of Feature objects or feature names as strings to be selected.

TYPE: list[str | feature.Feature]

RETURNS DESCRIPTION
query.Query

A query object with the selected features of the feature group.

select_all #

select_all(
    include_primary_key: bool = True,
    include_foreign_key: bool = True,
    include_partition_key: bool = True,
    include_event_time: bool = True,
) -> query.Query

Select all features along with primary key and event time from the feature group and return a query object.

The query can be used to construct joins of feature groups or create a feature view.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instances
fg1 = fs.get_or_create_feature_group(...)
fg2 = fs.get_or_create_feature_group(...)

# construct the query
query = fg1.select_all().join(fg2.select_all())

# show first 5 rows
query.show(5)

# select all features excluding primary key and event time
from hsfs.feature import Feature
fg = fs.create_feature_group(
        "fg",
        features=[
                Feature("id", type="string"),
                Feature("ts", type="bigint"),
                Feature("f1", type="date"),
                Feature("f2", type="double")
                ],
        primary_key=["id"],
        event_time="ts")

query = fg.select_all()
query.features
# [Feature('id', ...), Feature('ts', ...), Feature('f1', ...), Feature('f2', ...)]

query = fg.select_all(include_primary_key=False, include_event_time=False)
query.features
# [Feature('f1', ...), Feature('f2', ...)]
PARAMETER DESCRIPTION
include_primary_key

If True, include the primary key of the feature group in the feature list.

TYPE: bool DEFAULT: True

include_foreign_key

If True, include the foreign key of the feature group in the feature list.

TYPE: bool DEFAULT: True

include_partition_key

If True, include the partition key of the feature group in the feature list.

TYPE: bool DEFAULT: True

include_event_time

If True, include the event time of the feature group in the feature list.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
query.Query

A query object with all features of the feature group.

select_except #

select_except(
    features: list[str | feature.Feature] | None = None,
) -> query.Query

Select all features of the feature group, including the primary key and event time, except the provided features, and return a query object.

The query can be used to construct joins of feature groups or create a feature view with a subset of features of the feature group.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
from hsfs.feature import Feature
fg = fs.create_feature_group(
        "fg",
        features=[
                Feature("id", type="string"),
                Feature("ts", type="bigint"),
                Feature("f1", type="date"),
                Feature("f2", type="double")
                ],
        primary_key=["id"],
        event_time="ts")

# construct query
query = fg.select_except(["ts", "f1"])
query.features
# [Feature('id', ...), Feature('f2', ...)]
PARAMETER DESCRIPTION
features

A list of Feature objects or feature names as strings to be excluded from the selection. None or [] selects all features.

TYPE: list[str | feature.Feature] | None DEFAULT: None

RETURNS DESCRIPTION
query.Query

A query object with the selected features of the feature group.

select_features #

select_features() -> query.Query

Select all the features in the feature group and return a query object.

Queries define the schema of Feature View objects which can be used to create Training Datasets, read from the Online Feature Store, and more. They can also be composed to create more complex queries using the join method.

Info

This method does not select the primary key and event time of the feature group. Use select_all to include them. Note that primary keys do not need to be included in the query to allow joining on them.

Example
# connect to the Feature Store
fs = hopsworks.login().get_feature_store()

# Some dataframe to create the feature group with
# both an event time and a primary key column
my_df.head()
+------------+------------+------------+------------+
|    id      | feature_1  |    ...     |    ts      |
+------------+------------+------------+------------+
|     8      |     8      |            |    15      |
|     3      |     3      |    ...     |    6       |
|     1      |     1      |            |    18      |
+------------+------------+------------+------------+

# Create the Feature Group instances
fg1 = fs.create_feature_group(
        name = "fg1",
        version=1,
        primary_key=["id"],
        event_time="ts",
    )

# Insert data to the feature group.
fg1.insert(my_df)

# select all features from `fg1` excluding primary key and event time
query = fg1.select_features()

# show first 3 rows
query.show(3)

# Output, no id or ts columns
+------------+------------+------------+
| feature_1  | feature_2  | feature_3  |
+------------+------------+------------+
|     8      |     7      |    15      |
|     3      |     1      |     6      |
|     1      |     2      |    18      |
+------------+------------+------------+
Example
# connect to the Feature Store
fs = hopsworks.login().get_feature_store()

# Get the Feature Group from the previous example
fg1 = fs.get_feature_group("fg1", 1)

# Some dataframe to create another feature group
# with a primary key column
+------------+------------+------------+
|    id_2    | feature_6  | feature_7  |
+------------+------------+------------+
|     8      |     11     |            |
|     3      |     4      |    ...     |
|     1      |     9      |            |
+------------+------------+------------+

# get the second Feature Group, created from the dataframe above
fg2 = fs.get_feature_group("fg2", 1)

# join the two feature groups on their indexes, `id` and `id_2`,
# but do not include them in the query
query = fg1.select_features().join(fg2.select_features(), left_on="id", right_on="id_2")

# show first 3 rows
query.show(3)

# Output
+------------+------------+------------+------------+------------+
| feature_1  | feature_2  | feature_3  | feature_6  | feature_7  |
+------------+------------+------------+------------+------------+
|     8      |     7      |    15      |    11      |    15      |
|     3      |     1      |     6      |     4      |     3      |
|     1      |     2      |    18      |     9      |    20      |
+------------+------------+------------+------------+------------+
RETURNS DESCRIPTION
query.Query

A query object with all features of the feature group.

show #

show(n: int, online: bool = False) -> list[list[Any]]

Show the first n rows of the feature group.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

# make a query and show top 5 rows
fg.select(['date','weekly_sales','is_holiday']).show(5)
PARAMETER DESCRIPTION
n

Number of rows to show.

TYPE: int

online

If True read from online feature store.

TYPE: bool DEFAULT: False

to_dict #

to_dict() -> dict[str, Any]

Get structured info about the Feature Group as a Python dictionary.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg.to_dict()

update_deprecated #

update_deprecated(
    deprecate: bool = True,
) -> (
    FeatureGroupBase
    | FeatureGroup
    | ExternalFeatureGroup
    | SpineGroup
)

Deprecate the feature group.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg.update_deprecated(deprecate=True)
Safe update

This method updates the feature group safely. In case of failure your local metadata object will be kept unchanged.

PARAMETER DESCRIPTION
deprecate

Whether the feature group should be deprecated.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
FeatureGroupBase | FeatureGroup | ExternalFeatureGroup | SpineGroup

The updated feature group object.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

update_description #

update_description(
    description: str,
) -> (
    FeatureGroupBase
    | FeatureGroup
    | ExternalFeatureGroup
    | SpineGroup
)

Update the description of the feature group.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg.update_description(description="Much better description.")
Safe update

This method updates the feature group description safely. In case of failure your local metadata object will keep the old description.

PARAMETER DESCRIPTION
description

New description string.

TYPE: str

RETURNS DESCRIPTION
FeatureGroupBase | FeatureGroup | ExternalFeatureGroup | SpineGroup

The updated feature group object.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

update_feature_description #

update_feature_description(
    feature_name: str, description: str
) -> (
    FeatureGroupBase
    | FeatureGroup
    | ExternalFeatureGroup
    | SpineGroup
)

Update the description of a single feature in this feature group.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg.update_feature_description(
    feature_name="min_temp",
    description="Much better feature description.",
)
Safe update

This method updates the feature description safely. In case of failure your local metadata object will keep the old description.

PARAMETER DESCRIPTION
feature_name

Name of the feature to be updated.

TYPE: str

description

New description string.

TYPE: str

RETURNS DESCRIPTION
FeatureGroupBase | FeatureGroup | ExternalFeatureGroup | SpineGroup

The updated feature group object.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

update_features #

update_features(
    features: feature.Feature | list[feature.Feature],
) -> (
    FeatureGroupBase
    | FeatureGroup
    | ExternalFeatureGroup
    | SpineGroup
)

Update metadata of features in this feature group.

Currently, only updating the description of a feature is supported.

Unsafe update

Note that if you pass an existing Feature object taken from the feature group's schema, a failed update might leave your local metadata object in a corrupted state.
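Example

A sketch that constructs a fresh Feature object instead of mutating the schema in place (the feature name and type are illustrative):

from hsfs.feature import Feature

# a new Feature object carrying the updated description
updated = Feature(
    "min_temp",
    type="double",
    description="Minimum temperature recorded in the window.",
)

fg = fg.update_features([updated])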

PARAMETER DESCRIPTION
features

A feature object or list thereof to be updated.

TYPE: feature.Feature | list[feature.Feature]

RETURNS DESCRIPTION
FeatureGroupBase | FeatureGroup | ExternalFeatureGroup | SpineGroup

The updated feature group object.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

update_notification_topic_name #

update_notification_topic_name(
    notification_topic_name: str,
) -> (
    FeatureGroupBase
    | ExternalFeatureGroup
    | SpineGroup
    | FeatureGroup
)

Update the notification topic name of the feature group.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg.update_notification_topic_name(notification_topic_name="notification_topic_name")
Safe update

This method updates the feature group notification topic name safely. In case of failure your local metadata object will keep the old notification topic name.

PARAMETER DESCRIPTION
notification_topic_name

Name of the topic used for sending notifications when entries are inserted or updated on the online feature store. If set to None no notifications are sent.

TYPE: str

RETURNS DESCRIPTION
FeatureGroupBase | ExternalFeatureGroup | SpineGroup | FeatureGroup

The updated feature group object.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

update_statistics_config #

update_statistics_config() -> (
    FeatureGroup
    | ExternalFeatureGroup
    | SpineGroup
    | FeatureGroupBase
)

Update the statistics configuration of the feature group.

Change the statistics_config object and persist the changes by calling this method.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

fg.update_statistics_config()
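A sketch of changing a setting before persisting it (assumes the histograms flag on the statistics_config object):

# enable histograms on the local metadata object
fg.statistics_config.histograms = True

# persist the change
fg.update_statistics_config()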
RETURNS DESCRIPTION
FeatureGroup | ExternalFeatureGroup | SpineGroup | FeatureGroupBase

The updated metadata object of the feature group.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

hopsworks.client.exceptions.FeatureStoreException

If statistics are not supported for this feature group type.

validate #

validate(
    dataframe: pd.DataFrame
    | TypeVar("pyspark.sql.DataFrame")
    | None = None,
    expectation_suite: hsfs.expectation_suite.ExpectationSuite
    | None = None,
    save_report: bool = False,
    validation_options: dict[str, Any] | None = None,
    ingestion_result: Literal[
        "UNKNOWN",
        "INGESTED",
        "REJECTED",
        "EXPERIMENT",
        "FG_DATA",
    ] = "UNKNOWN",
    ge_type: bool = True,
) -> (
    great_expectations.core.ExpectationSuiteValidationResult
    | ValidationReport
    | None
)

Run validation based on the attached expectations.

Runs the expectation suite attached to the feature group against the provided dataframe. Raises an error if the great_expectations package is not installed.

Example
# connect to the Feature Store
fs = ...

# get feature group instance
fg = fs.get_or_create_feature_group(...)

ge_report = fg.validate(df, save_report=False)
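A sketch of validating against an ad-hoc suite without touching the attached one; candidate_suite is a hypothetical suite under development, and results are never persisted when an extra suite is provided:

report = fg.validate(
    df,
    expectation_suite=candidate_suite,
    save_report=False,
)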
PARAMETER DESCRIPTION
dataframe

The dataframe to run the data validation expectations against.

TYPE: pd.DataFrame | TypeVar('pyspark.sql.DataFrame') | None DEFAULT: None

expectation_suite

Optionally provide an Expectation Suite to override the one that is possibly attached to the feature group. This is useful for testing new Expectation suites. When an extra suite is provided, the results will never be persisted.

TYPE: hsfs.expectation_suite.ExpectationSuite | None DEFAULT: None

validation_options

Additional validation options as key-value pairs.

  • Key "run_validation" is a boolean value, set to False to skip validation temporarily on ingestion.
  • Key "ge_validate_kwargs" is a dictionary containing kwargs for the validate method of Great Expectations.

TYPE: dict[str, Any] | None DEFAULT: None

ingestion_result

Specify the fate of the associated data. Use "INGESTED" or "REJECTED" for validation of DataFrames to be inserted in the Feature Group. Use "EXPERIMENT" for testing and development and "FG_DATA" when validating data already in the Feature Group.

TYPE: Literal['UNKNOWN', 'INGESTED', 'REJECTED', 'EXPERIMENT', 'FG_DATA'] DEFAULT: 'UNKNOWN'

save_report

Whether to save the report to the backend. This is only possible if the Expectation suite is initialised and attached to the Feature Group.

TYPE: bool DEFAULT: False

ge_type

Whether to return a Great Expectations object or Hopsworks' own abstraction.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
great_expectations.core.ExpectationSuiteValidationResult | ValidationReport | None

A Validation Report produced by Great Expectations.