ExternalFeatureGroup#

You can create an external feature group by calling FeatureStore.create_external_feature_group and retrieve an existing one by calling FeatureStore.get_external_feature_group.

ExternalFeatureGroup #

Bases: FeatureGroupBase

A feature group that references data stored outside Hopsworks.

creator property #

creator: user.User | None

User who created the feature group.

description property writable #

description: str | None

Description of the feature group, as it appears in the UI.

feature_store_name property #

feature_store_name: str | None

Name of the feature store in which the feature group is located.

id property #

id: int | None

ID of the feature group, set by the backend.

find_neighbors #

find_neighbors(
    embedding: list[int | float],
    col: str | None = None,
    k: int | None = 10,
    filter: Filter | Logic | None = None,
    options: dict | None = None,
) -> list[tuple[float, list[Any]]]

Finds the nearest neighbors for a given embedding in the vector database.

If filter is specified, or if the embedding feature is stored in the default project index, the number of results returned may be less than k. If you need exactly k results, try a larger value of k and take the top k items from what is returned.

PARAMETER DESCRIPTION
embedding

The target embedding for which neighbors are to be found.

TYPE: list[int | float]

col

The column name used to compute similarity score. Required only if there are multiple embeddings.

TYPE: str | None DEFAULT: None

k

The number of nearest neighbors to retrieve.

TYPE: int | None DEFAULT: 10

filter

A filter expression to restrict the search space.

TYPE: Filter | Logic | None DEFAULT: None

options

The options used for the request to the vector database. The keys are attribute values of the hsfs.core.opensearch.OpensearchRequestOption class.

TYPE: dict | None DEFAULT: None

RETURNS DESCRIPTION
list[tuple[float, list[Any]]]

A list of tuples representing the nearest neighbors.

list[tuple[float, list[Any]]]

Each tuple contains the similarity score and a list of feature values.

Example
embedding_index = EmbeddingIndex()
embedding_index.add_embedding(name="user_vector", dimension=3)
fg = fs.create_feature_group(
    name='air_quality',
    embedding_index=embedding_index,
    version=1,
    primary_key=['id1'],
    online_enabled=True,
)
fg.insert(data)
fg.find_neighbors(
    [0.1, 0.2, 0.3],
    k=5,
)

# apply filter
fg.find_neighbors(
    [0.1, 0.2, 0.3],
    k=5,
    filter=(fg.id1 > 10) & (fg.id1 < 30)
)
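
Because a filter (or storage in the default project index) can shrink the result set below k, one workaround, per the note above, is to over-fetch and trim client-side. A minimal sketch, reusing the feature group from the example:

# request more candidates than needed, then keep the best 5;
# each result tuple is (similarity score, feature values)
results = fg.find_neighbors(
    [0.1, 0.2, 0.3],
    k=50,
    filter=(fg.id1 > 10) & (fg.id1 < 30),
)
top_5 = results[:5]  # assumes results come back ranked by similarity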

insert #

insert(
    features: pd.DataFrame
    | TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | np.ndarray
    | list[list],
    write_options: dict[str, Any] | None = None,
    validation_options: dict[str, Any] | None = None,
    wait: bool = False,
) -> tuple[
    None,
    great_expectations.core.ExpectationSuiteValidationResult
    | None,
]

Insert the dataframe feature values ONLY in the online feature store.

External Feature Groups contain metadata about feature data in an external storage system. External storage systems are usually offline, meaning feature values cannot be retrieved in real time. To use the feature values in real-time use cases, you can insert them into the Hopsworks Online Feature Store via this method.

The Online Feature Store keeps a single entry per primary key value, meaning that providing a new value for a given primary key will overwrite the existing value. No record of the previous value is kept.

Example
# connect to the Feature Store
fs = ...

# get the External Feature Group instance
fg = fs.get_feature_group(name="external_sales_records", version=1)

# get the feature values, e.g., reading from CSV files in an S3 bucket
feature_values = ...

# insert the feature values in the online feature store
fg.insert(feature_values)
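
Because the online store keeps a single entry per primary key, repeated inserts behave as upserts. A minimal sketch with hypothetical id and amount columns:

import pandas as pd

# the first insert writes the row for primary key 1
fg.insert(pd.DataFrame({"id": [1], "amount": [10.0]}))

# a later insert with the same primary key overwrites that row;
# no record of the previous value is kept
fg.insert(pd.DataFrame({"id": [1], "amount": [12.5]}))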
Note

Data Validation via Great Expectations is supported if you have attached an expectation suite to your External Feature Group. However, as opposed to regular Feature Groups, this can lead to discrepancies between the data in the external storage system and the online feature store.

PARAMETER DESCRIPTION
features

Features to be saved.

TYPE: pd.DataFrame | TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | np.ndarray | list[list]

write_options

Additional write options as key-value pairs.

When using the Python engine, write_options can contain the following entries:

  • key wait_for_job and value True or False to configure whether the insert call should return only after the Hopsworks Job has finished. By default it waits.
  • key wait_for_online_ingestion and value True or False to configure whether the insert call should return only after the Hopsworks online ingestion has finished. By default it does not wait.
  • key kafka_producer_config and value an object of type properties used to configure the Kafka client. To optimize for throughput over a high-latency connection, consider changing the producer properties.
  • key internal_kafka and value True or False in case you have established connectivity from your Python environment to the internal advertised listeners of the Hopsworks Kafka cluster. Defaults to False, which uses the external listeners when connecting from outside of Hopsworks.

TYPE: dict[str, Any] | None DEFAULT: None

validation_options

Additional validation options as key-value pairs.

  • key run_validation, a boolean value; set to False to skip validation temporarily on ingestion.
  • key save_report, a boolean value; set to False to skip uploading the validation report to Hopsworks.
  • key ge_validate_kwargs, a dictionary containing kwargs for the validate method of Great Expectations.
  • key fetch_expectation_suite, a boolean value, by default True, controlling whether the expectation suite of the feature group should be fetched before every insert.

TYPE: dict[str, Any] | None DEFAULT: None

wait

Wait for the job and online ingestion to finish before returning. Shortcut for write_options {"wait_for_job": True, "wait_for_online_ingestion": True}.

TYPE: bool DEFAULT: False

RETURNS DESCRIPTION
tuple[None, great_expectations.core.ExpectationSuiteValidationResult | None]

A tuple whose first element is always None and whose second element is the validation report if validation is enabled, otherwise None.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

For example, failure to create the feature group, or the dataframe schema does not match the existing feature group schema.

hsfs.client.exceptions.DataValidationException

If data validation fails and the expectation suite validation_ingestion_policy is set to STRICT. Data is NOT ingested.
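
A hedged sketch tying the insert parameters together; the option values shown are illustrative choices, not defaults:

# insert, block until the job and online ingestion finish,
# run validation, and upload the report to Hopsworks
_, validation_report = fg.insert(
    feature_values,
    write_options={"wait_for_job": True, "wait_for_online_ingestion": True},
    validation_options={"run_validation": True, "save_report": True},
)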

read #

read(
    dataframe_type: Literal[
        "default",
        "spark",
        "pandas",
        "polars",
        "numpy",
        "python",
    ] = "default",
    online: bool = False,
    read_options: dict[str, Any] | None = None,
) -> (
    TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | pd.DataFrame
    | pl.DataFrame
    | np.ndarray
)

Get the feature group as a DataFrame.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

df = fg.read()
Engine Support

Spark only

Reading an External Feature Group directly into a Pandas DataFrame using Python/Pandas as the engine is not supported. However, you can use the Query API to create Feature Views/Training Data containing External Feature Groups, as in the sketch below.
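
A minimal sketch of that workaround, assuming a hypothetical external feature group and feature view name:

# build a query from the external feature group
ext_fg = fs.get_external_feature_group("external_sales_records", version=1)
query = ext_fg.select_all()

# the query can back a feature view, from which training data is
# materialized on the Hopsworks cluster using Spark
fv = fs.create_feature_view(
    name="sales_view",
    version=1,
    query=query,
)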

PARAMETER DESCRIPTION
dataframe_type

The type of the returned dataframe. By default, maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine.

TYPE: Literal['default', 'spark', 'pandas', 'polars', 'numpy', 'python'] DEFAULT: 'default'

online

If True, read from the online feature store.

TYPE: bool DEFAULT: False

read_options

Additional options as key/value pairs to pass to the spark engine.

TYPE: dict[str, Any] | None DEFAULT: None

RETURNS DESCRIPTION
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | pl.DataFrame | np.ndarray

One of:

  • pyspark.sql.DataFrame: A Spark DataFrame containing the feature data.
  • pandas.DataFrame: A Pandas DataFrame.
  • polars.DataFrame: A Polars DataFrame.
  • numpy.ndarray: A two-dimensional Numpy array.
  • list: A two-dimensional Python list.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

hopsworks.client.exceptions.FeatureStoreException

If trying to read an external feature group directly with the Python engine.
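
The Spark-only restriction above applies to offline reads; the online copy is a separate path. A short sketch, assuming the feature group is online-enabled and already populated via insert:

# read the current values from the online feature store
df = fg.read(online=True)

# a specific return type can be requested via dataframe_type
df_polars = fg.read(online=True, dataframe_type="polars")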

save #

save() -> None

Persist the metadata for this external feature group.

Without calling this method, your feature group will only exist in your Python Kernel, but not in Hopsworks.

query = "SELECT * FROM sales"

fg = feature_store.create_external_feature_group(name="sales",
    version=1,
    description="Physical shop sales features",
    query=query,
    storage_connector=connector,
    primary_key=['ss_store_sk'],
    event_time='sale_date'
)

fg.save()

show #

show(n: int, online: bool = False) -> list[list[Any]]

Show the first n rows of the feature group.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

# make a query and show top 5 rows
fg.select(['date','weekly_sales','is_holiday']).show(5)
PARAMETER DESCRIPTION
n

Number of rows to show.

TYPE: int

online

If True, read from the online feature store.

TYPE: bool DEFAULT: False