Query#
Query objects are generated exclusively by HSFS APIs called on FeatureGroup objects; users never construct a Query with the class constructor. For this reason, the full documentation of the class is not provided here.
Query #
featuregroups property #
featuregroups: list[
fg_mod.FeatureGroup
| fg_mod.ExternalFeatureGroup
| fg_mod.SpineGroup
]
List of feature groups used in the query.
left_feature_group_end_time property writable #
End time of time travel for the left feature group.
left_feature_group_start_time property writable #
Start time of time travel for the left feature group.
append_feature #
as_of #
as_of(
wallclock_time: str
| int
| datetime
| date
| None = None,
exclude_until: str
| int
| datetime
| date
| None = None,
) -> Query
Perform time travel on the given Query.
PySpark/Spark only
Apache Hudi supports time travel and incremental queries exclusively via a Spark context.
This method returns a new Query object at the specified point in time. Optionally, commits before a specified point in time can be excluded from the query. The Query can then either be read into a Dataframe or used further to perform joins or construct a training dataset.
Reading features at a specific point in time
fs = connection.get_feature_store()
query = fs.get_feature_group("example_feature_group", 1).select_all()
query.as_of("2020-10-20 07:34:11").read().show()
Reading commits incrementally between specified points in time
fs = connection.get_feature_store()
query = fs.get_feature_group("example_feature_group", 1).select_all()
query.as_of("2020-10-20 07:34:11", exclude_until="2020-10-19 07:34:11").read().show()
The first parameter is inclusive while the second is exclusive. That means, in order to query a single commit, you need to query that commit's time and exclude everything just before it.
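The inclusive/exclusive interval semantics can be sketched in plain Python; the commit timestamps below are illustrative, not tied to any feature group:

```python
from datetime import datetime

def commits_in_interval(commits, wallclock_time, exclude_until=None):
    """Toy model of the as_of interval: wallclock_time is inclusive,
    exclude_until is exclusive."""
    return [c for c in commits
            if (exclude_until is None or c > exclude_until)
            and c <= wallclock_time]

commit_times = [
    datetime(2020, 10, 20, 7, 31, 37),
    datetime(2020, 10, 20, 7, 31, 38),
    datetime(2020, 10, 20, 7, 34, 11),
]

# single commit: query its exact time and exclude everything just before it
single = commits_in_interval(
    commit_times,
    datetime(2020, 10, 20, 7, 31, 38),
    exclude_until=datetime(2020, 10, 20, 7, 31, 37),
)
print(single)  # the 07:31:38 commit only
```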
Reading only the changes from a single commit
fs = connection.get_feature_store()
query = fs.get_feature_group("example_feature_group", 1).select_all()
query.as_of("2020-10-20 07:31:38", exclude_until="2020-10-20 07:31:37").read().show()
When no wallclock_time is given, the latest state of features is returned. Optionally, commits before a specified point in time can still be excluded.
Reading the latest state of features, excluding commits before a specified point in time
fs = connection.get_feature_store()
query = fs.get_feature_group("example_feature_group", 1).select_all()
query.as_of(None, exclude_until="2020-10-20 07:31:38").read().show()
Note that the interval will be applied to all joins in the query. If you want to query different intervals for different feature groups in the query, you have to apply them in a nested fashion:
query1.as_of(..., ...)
.join(query2.as_of(..., ...))
If instead you apply another as_of selection after the join, all joined feature groups will be queried with this interval:
query1.as_of(..., ...) # as_of is not applied
.join(query2.as_of(..., ...)) # as_of is not applied
.as_of(..., ...)
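The scoping rule above can be modeled with a toy representation in which a query is just a mapping from feature-group name to its time-travel interval; this is an illustration of the rule, not the hsfs implementation:

```python
# Toy model: a "query" is {feature_group_name: interval}.

def as_of(query, interval):
    # applies the interval to every feature group currently in the query
    return {fg: interval for fg in query}

def join(left, right):
    merged = dict(right)
    merged.update(left)
    return merged

q1 = {"fg1": None}
q2 = {"fg2": None}

# nested: each side keeps its own interval
nested = join(as_of(q1, "interval_a"), as_of(q2, "interval_b"))

# trailing: the final as_of overwrites the intervals of both sides
trailing = as_of(join(as_of(q1, "interval_a"), as_of(q2, "interval_b")),
                 "interval_c")
```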
Warning
This function only works for queries on feature groups with time_travel_format='HUDI'.
Warning
Excluding commits via exclude_until is only possible within the range of the Hudi active timeline. By default, Hudi keeps the last 20 to 30 commits in the active timeline. If you need to keep a longer active timeline, you can overwrite the options: hoodie.keep.min.commits and hoodie.keep.max.commits when calling the insert() method.
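If a longer active timeline is needed, the retention options can be passed as write options; the numeric values below are illustrative, and the `insert` call is sketched on the assumption that the feature group accepts a `write_options` dictionary:

```python
# Hudi active-timeline retention options; Hudi expects string values,
# and these particular numbers are only an example.
write_options = {
    "hoodie.keep.min.commits": "100",
    "hoodie.keep.max.commits": "110",
}

# Sketch of passing them on write (requires a live feature store,
# so left commented out here):
# fg.insert(df, write_options=write_options)
```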
| PARAMETER | DESCRIPTION |
|---|---|
| `wallclock_time` | Read data as of this point in time. Strings should use a supported datetime format. TYPE: `str \| int \| datetime \| date \| None`. Defaults to `None`. |
| `exclude_until` | Exclude commits until this point in time. Strings should use a supported datetime format. TYPE: `str \| int \| datetime \| date \| None`. Defaults to `None`. |

| RETURNS | DESCRIPTION |
|---|---|
| `Query` | The Query object at the specified point in time. |
check_and_warn_ambiguous_features #
check_and_warn_ambiguous_features() -> None
Fetch the ambiguous features in the query and display a warning listing them.
filter #
filter(f: Filter | Logic) -> Query
Apply filter to the feature group.
Selects all features and returns the resulting Query with the applied filter.
Example
from hsfs.feature import Feature
query.filter(Feature("weekly_sales") > 1000)
query.filter(Feature("name").like("max%"))
If you are planning to join the filtered feature group later on with another feature group, make sure to select the filtered feature explicitly from the respective feature group:
query.filter(fg.feature1 == 1).show(10)
Composite filters require parentheses and the symbolic logical operators (e.g. `&`, `|`, ...):
query.filter((fg.feature1 == 1) | (fg.feature2 >= 2))
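The parentheses are required because Python's bitwise `&`/`|` operators bind more tightly than comparisons. A minimal stand-in class makes the overloading pattern visible; this toy `ToyFeature` is not the hsfs `Feature`:

```python
class Cond:
    """A toy condition that records its own textual form."""
    def __init__(self, text):
        self.text = text
    def __or__(self, other):
        return Cond(f"({self.text}) OR ({other.text})")
    def __and__(self, other):
        return Cond(f"({self.text}) AND ({other.text})")

class ToyFeature:
    """Minimal stand-in for a feature with comparison overloads."""
    def __init__(self, name):
        self.name = name
    def __eq__(self, value):
        return Cond(f"{self.name} == {value!r}")
    def __ge__(self, value):
        return Cond(f"{self.name} >= {value!r}")

f1, f2 = ToyFeature("feature1"), ToyFeature("feature2")
combined = (f1 == 1) | (f2 >= 2)
print(combined.text)  # (feature1 == 1) OR (feature2 >= 2)
```

Without the parentheses, `f1 == 1 | f2 >= 2` would try to evaluate `1 | f2` first and fail.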
Filters are fully compatible with joins
fg1 = fs.get_feature_group("...")
fg2 = fs.get_feature_group("...")
fg3 = fs.get_feature_group("...")
query = (
    fg1.select_all()
    .join(fg2.select_all(), on=["date", "location_id"])
    .join(fg3.select_all(), left_on=["location_id"], right_on=["id"], join_type="left")
    .filter((fg1.location_id == 10) | (fg1.location_id == 20))
)
Filters can be applied at any point of the query
fg1 = fs.get_feature_group("...")
fg2 = fs.get_feature_group("...")
fg3 = fs.get_feature_group("...")
query = (
    fg1.select_all()
    .join(fg2.select_all().filter(fg2.avg_temp >= 22), on=["date", "location_id"])
    .join(fg3.select_all(), left_on=["location_id"], right_on=["id"], join_type="left")
    .filter(fg1.location_id == 10)
)
| PARAMETER | DESCRIPTION |
|---|---|
| `f` | Filter object. TYPE: `Filter \| Logic` |

| RETURNS | DESCRIPTION |
|---|---|
| `Query` | The Query with the applied filter. |
get_ambiguous_features #
Check for ambiguous features in the query. Returns a dictionary with the names of the ambiguous features as keys and the set of feature groups they appear in as values.
| RETURNS | DESCRIPTION |
|---|---|
| `dict[str, set[str]]` | Mapping from ambiguous feature name to the feature groups it appears in. |
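The mapping returned by `get_ambiguous_features` can be modeled as follows; the feature and feature-group names are invented for illustration:

```python
from collections import defaultdict

def find_ambiguous(features_by_fg):
    """Model: map each feature name that appears in more than one
    feature group to the set of feature groups it occurs in."""
    owners = defaultdict(set)
    for fg_name, features in features_by_fg.items():
        for feature in features:
            owners[feature].add(fg_name)
    return {f: fgs for f, fgs in owners.items() if len(fgs) > 1}

ambiguous = find_ambiguous({
    "fg1": ["id", "amount"],
    "fg2": ["id", "temperature"],
})
print(ambiguous)  # {'id': {'fg1', 'fg2'}}
```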
get_feature #
is_cache_feature_group_only #
is_cache_feature_group_only() -> bool
Check whether the query contains only cached feature groups.
join #
join(
sub_query: Query,
on: list[str] | None = None,
left_on: list[str] | None = None,
right_on: list[str] | None = None,
join_type: str | None = "left",
prefix: str | None = None,
) -> Query
Join Query with another Query.
If no join keys are specified, Hopsworks will use the maximal matching subset of the primary keys of the feature groups you are joining.
Join two feature groups
fg1 = fs.get_feature_group("...")
fg2 = fs.get_feature_group("...")
query = fg1.select_all().join(fg2.select_all())
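The default key inference described above amounts to intersecting the primary keys of the two feature groups; a rough model, with made-up primary-key lists:

```python
def infer_join_keys(left_pk, right_pk):
    """Model of default join-key inference: the matching subset of the
    two feature groups' primary keys (order taken from the left side)."""
    right = set(right_pk)
    return [k for k in left_pk if k in right]

# hypothetical primary keys of the two feature groups
keys = infer_join_keys(["customer_id", "date", "region"],
                       ["customer_id", "date"])
print(keys)  # ['customer_id', 'date']
```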
More complex join
fg1 = fs.get_feature_group("...")
fg2 = fs.get_feature_group("...")
fg3 = fs.get_feature_group("...")
query = (
    fg1.select_all()
    .join(fg2.select_all(), on=["date", "location_id"])
    .join(fg3.select_all(), left_on=["location_id"], right_on=["id"], join_type="left")
)
| PARAMETER | DESCRIPTION |
|---|---|
| `sub_query` | Right-hand side query to join. TYPE: `Query` |
| `on` | List of feature names to join on if they are available in both feature groups. Defaults to `None`. TYPE: `list[str] \| None` |
| `left_on` | List of feature names to join on from the left feature group of the join. Defaults to `None`. TYPE: `list[str] \| None` |
| `right_on` | List of feature names to join on from the right feature group of the join. Defaults to `None`. TYPE: `list[str] \| None` |
| `join_type` | Type of join to perform. Defaults to `"left"`. TYPE: `str \| None` |
| `prefix` | User-provided prefix to avoid feature name clashes. If no prefix is provided and there is a feature name clash, prefixes are generated and applied automatically; the generated prefix is the feature group alias in the query (e.g. `fg1`, `fg2`). The prefix is applied to the right feature group of the query. Defaults to `None`. TYPE: `str \| None` |

| RETURNS | DESCRIPTION |
|---|---|
| `Query` | The joined Query. |
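The prefixing behaviour can be sketched as follows: right-hand features that clash with left-hand ones get a prefix, falling back to the right feature group's alias when none is given. This is a simplified model for illustration, not the hsfs resolver:

```python
def apply_prefix(left_features, right_features, prefix=None, right_alias="fg2"):
    """Model: prefix right-hand features that clash with left-hand ones.
    If no prefix is given, fall back to the right feature group's alias."""
    effective = prefix if prefix is not None else right_alias + "_"
    left_set = set(left_features)
    resolved = [effective + name if name in left_set else name
                for name in right_features]
    return left_features + resolved

columns = apply_prefix(["id", "amount"], ["id", "temperature"],
                       prefix="right_")
print(columns)  # ['id', 'amount', 'right_id', 'temperature']
```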
pull_changes #
pull_changes(
wallclock_start_time: str | int | date | datetime,
wallclock_end_time: str | int | date | datetime,
) -> Query
Same as the `as_of` method; kept for backward compatibility.
Deprecated
The `pull_changes` method is deprecated. Use `as_of(end_wallclock_time, exclude_until=start_wallclock_time)` instead.
read #
read(
online: bool = False,
dataframe_type: str = "default",
read_options: dict[str, Any] | None = None,
) -> (
pd.DataFrame
| np.ndarray
| list[list[Any]]
| TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
)
Read the specified query into a DataFrame.
It is possible to specify the storage (online/offline) to read from and the type of the output DataFrame (Spark, Pandas, Numpy, Python Lists).
External Feature Group engine support
Spark only
Reading a Query containing an External Feature Group directly into a Pandas DataFrame using Python/Pandas as the engine is not supported. However, you can use the Query API to create Feature Views and training data containing External Feature Groups.
| PARAMETER | DESCRIPTION |
|---|---|
| `online` | Read from online storage. Defaults to `False`. TYPE: `bool` |
| `dataframe_type` | DataFrame type to return. Defaults to `"default"`. TYPE: `str` |
| `read_options` | Dictionary of read options for Spark in the Spark engine; the Python engine supports additional engine-specific keys. Defaults to `None`. TYPE: `dict[str, Any] \| None` |

| RETURNS | DESCRIPTION |
|---|---|
| `pd.DataFrame \| np.ndarray \| list[list[Any]] \| TypeVar('pyspark.sql.DataFrame') \| TypeVar('pyspark.RDD')` | The resulting DataFrame. |
show #
Show the first N rows of the Query.
Show the first 10 rows
fg1 = fs.get_feature_group("...")
fg2 = fs.get_feature_group("...")
query = fg1.select_all().join(fg2.select_all())
query.show(10)
| PARAMETER | DESCRIPTION |
|---|---|
| `n` | Number of rows to show. TYPE: `int` |
| `online` | Show from online storage. Defaults to `False`. TYPE: `bool` |