
Query#

Query objects are generated exclusively by HSFS APIs called on FeatureGroup objects; users never construct a Query object directly via its constructor. For this reason, the full class documentation is not provided here.
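For reference, a minimal sketch of how a Query is typically obtained (the feature group name, version, and feature name are placeholders):

fs = connection.get_feature_store()
fg = fs.get_feature_group("example_feature_group", 1)

query = fg.select_all()            # Query over all features of the feature group
query = fg.select(["feature_a"])   # Query over a subset of features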

Query #

featuregroups property #

featuregroups: list[
    fg_mod.FeatureGroup
    | fg_mod.ExternalFeatureGroup
    | fg_mod.SpineGroup
]

List of feature groups used in the query.

features property #

features: list[Feature]

List of all features in the query.

filters property #

filters: Logic | None

All filters used in the query.

joins property #

joins: list[join.Join]

List of joins in the query.

left_feature_group_end_time property writable #

left_feature_group_end_time: (
    str | int | date | datetime | None
)

End time of time travel for the left feature group.

left_feature_group_start_time property writable #

left_feature_group_start_time: (
    str | int | date | datetime | None
)

Start time of time travel for the left feature group.

append_feature #

append_feature(feature: str | Feature) -> Query

Append a feature to the query.

PARAMETER DESCRIPTION
feature

Name of the feature to append to the query.

TYPE: str | Feature
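A minimal usage sketch (assuming a feature group fg with a feature named "weekly_sales"):

query = fg.select(["store_id"])
query = query.append_feature("weekly_sales")  # append by name; a Feature object works as well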

as_of #

as_of(
    wallclock_time: str
    | int
    | datetime
    | date
    | None = None,
    exclude_until: str
    | int
    | datetime
    | date
    | None = None,
) -> Query

Perform time travel on the given Query.

Pyspark/Spark Only

Apache Hudi supports Time Travel and Incremental Query exclusively via a Spark context.

This method returns a new Query object at the specified point in time. Optionally, commits before a specified point in time can be excluded from the query. The Query can then either be read into a Dataframe or used further to perform joins or construct a training dataset.

Reading features at a specific point in time
fs = connection.get_feature_store()
query = fs.get_feature_group("example_feature_group", 1).select_all()
query.as_of("2020-10-20 07:34:11").read().show()
Reading commits incrementally between specified points in time
fs = connection.get_feature_store()
query = fs.get_feature_group("example_feature_group", 1).select_all()
query.as_of("2020-10-20 07:34:11", exclude_until="2020-10-19 07:34:11").read().show()

The first parameter is inclusive and the second is exclusive. That means, to query a single commit, you need to query that commit's time and exclude everything just before it.

Reading only the changes from a single commit
fs = connection.get_feature_store()
query = fs.get_feature_group("example_feature_group", 1).select_all()
query.as_of("2020-10-20 07:31:38", exclude_until="2020-10-20 07:31:37").read().show()

When no wallclock_time is given, the latest state of features is returned. Optionally, commits before a specified point in time can still be excluded.

Reading the latest state of features, excluding commits before a specified point in time
fs = connection.get_feature_store()
query = fs.get_feature_group("example_feature_group", 1).select_all()
query.as_of(None, exclude_until="2020-10-20 07:31:38").read().show()

Note that the interval will be applied to all joins in the query. If you want to query different intervals for different feature groups in the query, you have to apply them in a nested fashion:

(query1.as_of(..., ...)
    .join(query2.as_of(..., ...)))

If instead you apply another as_of selection after the join, all joined feature groups will be queried with this interval:

(query1.as_of(..., ...)  # this as_of is not applied
    .join(query2.as_of(..., ...))  # this as_of is not applied
    .as_of(..., ...))

Warning

This function only works for queries on feature groups with time_travel_format='HUDI'.

Warning

Excluding commits via exclude_until is only possible within the range of the Hudi active timeline. By default, Hudi keeps the last 20 to 30 commits in the active timeline. If you need to keep a longer active timeline, you can overwrite the options: hoodie.keep.min.commits and hoodie.keep.max.commits when calling the insert() method.

PARAMETER DESCRIPTION
wallclock_time

Read data as of this point in time. Strings should be formatted in one of the following formats %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, or %Y-%m-%d %H:%M:%S.

TYPE: str | int | datetime | date | None DEFAULT: None

exclude_until

Exclude commits until this point in time. Strings should be formatted in one of the following formats %Y-%m-%d, %Y-%m-%d %H, %Y-%m-%d %H:%M, or %Y-%m-%d %H:%M:%S.

TYPE: str | int | datetime | date | None DEFAULT: None

RETURNS DESCRIPTION
Query

Query. The query object with the applied time travel condition.

check_and_warn_ambiguous_features #

check_and_warn_ambiguous_features() -> None

Check the query for ambiguous features and display a warning if any are found.
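A minimal sketch (assuming two joined feature groups fg1 and fg2 that share a feature name and no prefix was set):

query = fg1.select_all().join(fg2.select_all())
query.check_and_warn_ambiguous_features()  # logs a warning if any feature names clash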

filter #

filter(f: Filter | Logic) -> Query

Apply filter to the feature group.

Selects all features and returns the resulting Query with the applied filter.

Example
from hsfs.feature import Feature

query.filter(Feature("weekly_sales") > 1000)
query.filter(Feature("name").like("max%"))

If you are planning to join the filtered feature group later on with another feature group, make sure to select the filtered feature explicitly from the respective feature group:

query.filter(fg.feature1 == 1).show(10)

Composite filters require parentheses and the symbolic logical operators (e.g. &, |):

query.filter((fg.feature1 == 1) | (fg.feature2 >= 2))

Filters are fully compatible with joins
fg1 = fs.get_feature_group("...")
fg2 = fs.get_feature_group("...")
fg3 = fs.get_feature_group("...")

query = (fg1.select_all()
    .join(fg2.select_all(), on=["date", "location_id"])
    .join(fg3.select_all(), left_on=["location_id"], right_on=["id"], join_type="left")
    .filter((fg1.location_id == 10) | (fg1.location_id == 20)))
Filters can be applied at any point of the query
fg1 = fs.get_feature_group("...")
fg2 = fs.get_feature_group("...")
fg3 = fs.get_feature_group("...")

query = (fg1.select_all()
    .join(fg2.select_all().filter(fg2.avg_temp >= 22), on=["date", "location_id"])
    .join(fg3.select_all(), left_on=["location_id"], right_on=["id"], join_type="left")
    .filter(fg1.location_id == 10))
PARAMETER DESCRIPTION
f

Filter object.

TYPE: Filter | Logic

RETURNS DESCRIPTION
Query

Query. The query object with the applied filter.

get_ambiguous_features #

get_ambiguous_features() -> dict[str, set[str]]

Check the query for ambiguous features. Returns a dictionary with the name of each ambiguous feature as key and the set of feature groups it appears in as value.

RETURNS DESCRIPTION
dict[str, set[str]]

Dictionary with the ambiguous feature name as key and the corresponding set of feature group names (including version) as value.
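A minimal sketch of inspecting the result (the output shape is illustrative; the exact name formatting may differ):

ambiguous = query.get_ambiguous_features()
for feature_name, feature_groups in ambiguous.items():
    print(f"{feature_name} appears in: {feature_groups}")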

get_feature #

get_feature(feature_name: str) -> Feature

Get a feature by name.

PARAMETER DESCRIPTION
feature_name

Name of the feature to get.

TYPE: str

RETURNS DESCRIPTION
Feature

Feature. Feature object.
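A minimal sketch (assuming the query contains a feature named "weekly_sales"):

feature = query.get_feature("weekly_sales")
query = query.filter(feature > 1000)  # the returned Feature object can be used in filters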

is_cache_feature_group_only #

is_cache_feature_group_only() -> bool

Check whether the query contains only cached feature groups.

is_time_travel #

is_time_travel() -> bool

Check whether the query contains a time travel condition.
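A minimal usage sketch for the two checks above (the timestamp is illustrative):

query = fg.select_all().as_of("2020-10-20 07:34:11")
query.is_cache_feature_group_only()  # True if all feature groups in the query are cached
query.is_time_travel()               # True, since a time travel point was set via as_of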

join #

join(
    sub_query: Query,
    on: list[str] | None = None,
    left_on: list[str] | None = None,
    right_on: list[str] | None = None,
    join_type: str | None = "left",
    prefix: str | None = None,
) -> Query

Join Query with another Query.

If no join keys are specified, Hopsworks will use the maximal matching subset of the primary keys of the feature groups you are joining.

Join two feature groups
fg1 = fs.get_feature_group("...")
fg2 = fs.get_feature_group("...")

query = fg1.select_all().join(fg2.select_all())
More complex join
fg1 = fs.get_feature_group("...")
fg2 = fs.get_feature_group("...")
fg3 = fs.get_feature_group("...")

query = (fg1.select_all()
        .join(fg2.select_all(), on=["date", "location_id"])
        .join(fg3.select_all(), left_on=["location_id"], right_on=["id"], join_type="left"))
PARAMETER DESCRIPTION
sub_query

Right-hand side query to join.

TYPE: Query

on

List of feature names to join on if they are available in both feature groups. Defaults to None.

TYPE: list[str] | None DEFAULT: None

left_on

List of feature names to join on from the left feature group of the join. Defaults to None.

TYPE: list[str] | None DEFAULT: None

right_on

List of feature names to join on from the right feature group of the join. Defaults to None.

TYPE: list[str] | None DEFAULT: None

join_type

Type of join to perform, can be "inner", "outer", "left" or "right". Defaults to "left".

TYPE: str | None DEFAULT: 'left'

prefix

User-provided prefix to avoid feature name clashes. If no prefix is provided and there is a name clash, prefixes are generated automatically from the feature group aliases in the query (e.g. fg1, fg2) and applied. The prefix is applied to the right feature group of the join, as shown in the sketch below. Defaults to None.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
Query

Query: A new Query object representing the join.
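A minimal sketch of using prefix to avoid a feature name clash (the feature group names and the clashing feature are placeholders):

fg1 = fs.get_feature_group("transactions", 1)
fg2 = fs.get_feature_group("profiles", 1)

# both feature groups contain a feature named "created_at";
# prefix the right-hand side so the joined features do not clash
query = fg1.select_all().join(fg2.select_all(), prefix="profile_")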

pull_changes #

pull_changes(
    wallclock_start_time: str | int | date | datetime,
    wallclock_end_time: str | int | date | datetime,
) -> Query

Same as as_of method, kept for backward compatibility.

Deprecated

The pull_changes method is deprecated. Use `as_of(end_wallclock_time, exclude_until=start_wallclock_time)` instead.
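For reference, the deprecated call and its replacement (timestamps are illustrative):

# deprecated
query.pull_changes("2020-10-19 07:34:11", "2020-10-20 07:34:11").read()

# equivalent
query.as_of("2020-10-20 07:34:11", exclude_until="2020-10-19 07:34:11").read()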

read #

read(
    online: bool = False,
    dataframe_type: str = "default",
    read_options: dict[str, Any] | None = None,
) -> (
    pd.DataFrame
    | np.ndarray
    | list[list[Any]]
    | TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
)

Read the specified query into a DataFrame.

It is possible to specify the storage (online/offline) to read from and the type of the output DataFrame (Spark, Pandas, Numpy, Python Lists).

External Feature Group Engine Support

Spark only

Reading a Query containing an External Feature Group directly into a Pandas DataFrame using Python/Pandas as the engine is not supported. However, you can use the Query API to create Feature Views/Training Data containing External Feature Groups.

PARAMETER DESCRIPTION
online

Read from online storage. Defaults to False.

TYPE: bool DEFAULT: False

dataframe_type

DataFrame type to return. Defaults to "default".

TYPE: str DEFAULT: 'default'

read_options

Dictionary of read options when using the Spark engine. For the Python engine, the key "arrow_flight_config" can be used to pass a dictionary of Arrow Flight configurations, for example {"arrow_flight_config": {"timeout": 900}}. Defaults to None.

TYPE: dict[str, Any] | None DEFAULT: None

RETURNS DESCRIPTION
pd.DataFrame | np.ndarray | list[list[Any]] | TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD')

DataFrame: DataFrame depending on the chosen type.
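A minimal usage sketch (the Arrow Flight option applies to the Python engine only):

# offline read into the engine's default DataFrame type
df = query.read()

# offline read with an increased Arrow Flight timeout (Python engine)
df = query.read(read_options={"arrow_flight_config": {"timeout": 900}})

# read from the online store instead
online_df = query.read(online=True)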

show #

show(n: int, online: bool = False) -> list[list[Any]]

Show the first N rows of the Query.

Show the first 10 rows
fg1 = fs.get_feature_group("...")
fg2 = fs.get_feature_group("...")

query = fg1.select_all().join(fg2.select_all())

query.show(10)
PARAMETER DESCRIPTION
n

Number of rows to show.

TYPE: int

online

Show from online storage. Defaults to False.

TYPE: bool DEFAULT: False

to_string #

to_string(
    online: bool = False, arrow_flight: bool = False
) -> str

Convert the Query to its string representation.

Example
fg1 = fs.get_feature_group("...")
fg2 = fs.get_feature_group("...")

query = fg1.select_all().join(fg2.select_all())

query.to_string()