Storage Connector#
You can obtain storage connectors using FeatureStore.get_storage_connector and FeatureStore.get_online_storage_connector.
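For example, a minimal sketch of fetching connectors from a project; "s3_conn" is a placeholder name for a connector already created in the Feature Store UI:
import hopsworks

project = hopsworks.login()
fs = project.get_feature_store()

# a named storage connector configured in the Feature Store
sc = fs.get_storage_connector("s3_conn")

# the built-in online storage (JDBC) connector of the project
online_sc = fs.get_online_storage_connector()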
StorageConnector #
Bases: ABC
id property #
id: int | None
Id of the storage connector uniquely identifying it in the Feature store.
type property #
type: str | None
Type of the connector as a string, e.g. "HOPSFS", "S3", "ADLS", "REDSHIFT", "JDBC" or "SNOWFLAKE".
connector_options #
Return prepared options to be passed to an external connector library.
Not implemented for this connector type.
get_data #
get_data(data_source: ds.DataSource) -> dsd.DataSourceData
Retrieve the data from the data source.
Example
# connect to the Feature Store
fs = ...
sc = fs.get_storage_connector("conn_name")
tables = sc.get_tables("database_name")
data = sc.get_data(tables[0])
| PARAMETER | DESCRIPTION |
|---|---|
data_source | The data source to retrieve data from. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
DataSourceData | An object containing the data retrieved from the data source. |
get_databases #
get_feature_groups #
get_feature_groups() -> list[FeatureGroup]
Get the feature groups using this storage connector, based on explicit provenance.
Only the accessible feature groups are returned. For more detail, including inaccessible feature groups, use get_feature_groups_provenance.
| RETURNS | DESCRIPTION |
|---|---|
list[FeatureGroup] | List of feature groups. |
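A brief usage sketch; "conn_name" follows the placeholder used in the examples above, and the connector is assumed to have downstream feature groups:
sc = fs.get_storage_connector("conn_name")
for fg in sc.get_feature_groups():
    print(fg.name, fg.version)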
get_feature_groups_provenance #
get_feature_groups_provenance() -> Links | None
Get the generated feature groups using this storage connector, based on explicit provenance.
These feature groups can be accessible or inaccessible.
Explicit provenance does not track deleted generated feature group links, so deleted will always be empty. For inaccessible feature groups, only minimal information is returned.
| RETURNS | DESCRIPTION |
|---|---|
Links | None | The feature groups generated using this storage connector, or None if there are none. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | In case the backend encounters an issue. |
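A sketch of inspecting the returned links, assuming the Links object exposes accessible and inaccessible collections of feature groups:
sc = fs.get_storage_connector("conn_name")
links = sc.get_feature_groups_provenance()
if links is not None:
    print("accessible:", [fg.name for fg in links.accessible])
    print("inaccessible:", len(links.inaccessible))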
get_metadata #
get_metadata(data_source: ds.DataSource) -> dict
Retrieve metadata information about the data source.
Example
# connect to the Feature Store
fs = ...
sc = fs.get_storage_connector("conn_name")
tables = sc.get_tables("database_name")
metadata = sc.get_metadata(tables[0])
| PARAMETER | DESCRIPTION |
|---|---|
data_source | The data source to retrieve metadata from. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
dict | A dictionary containing metadata about the data source. |
get_tables #
Retrieve the list of tables from the specified database.
Example
# connect to the Feature Store
fs = ...
sc = fs.get_storage_connector("conn_name")
tables = sc.get_tables("database_name")
| PARAMETER | DESCRIPTION |
|---|---|
database | The name of the database to list tables from. If not provided, the default database is used. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
list[ds.DataSource] | A list of DataSource objects representing the tables. |
prepare_spark #
Prepare Spark to use this Storage Connector.
| PARAMETER | DESCRIPTION |
|---|---|
path | Path to prepare for reading from cloud storage. Defaults to None. |
read #
read(
query: str | None = None,
data_format: str | None = None,
options: dict[str, Any] | None = None,
path: str | None = None,
dataframe_type: Literal[
"default",
"spark",
"pandas",
"polars",
"numpy",
"python",
] = "default",
) -> (
TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| pd.DataFrame
| np.ndarray
| pl.DataFrame
)
Reads a query or a path into a dataframe using the storage connector.
Note that paths are only supported for object stores like S3, HopsFS and ADLS, while queries are meant for JDBC-based sources such as Redshift and Snowflake.
| PARAMETER | DESCRIPTION |
|---|---|
query | By default, the storage connector will read the table configured together with the connector, if any. It's possible to overwrite this by passing a SQL query here. TYPE: |
data_format | When reading from object stores such as S3, HopsFS and ADLS, specify the file format to be read, e.g., TYPE: |
options | Any additional key/value options to be passed to the connector. |
path | Path to be read from within the bucket of the storage connector. Not relevant for JDBC or database based connectors such as Snowflake, JDBC or Redshift. TYPE: |
dataframe_type | The type of the returned dataframe. Defaults to "default", which maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame | The read dataframe. |
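Two illustrative calls, one per connector family; the query, format and path are placeholders:
# query-based connectors (e.g. JDBC, Redshift, Snowflake)
df = sc.read(query="SELECT * FROM my_table")

# object store connectors (e.g. S3, HopsFS, ADLS)
df = sc.read(data_format="parquet", path="datasets/transactions")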
HopsFSConnector #
Bases: StorageConnector
JdbcConnector #
Bases: StorageConnector
arguments property #
Additional JDBC arguments.
When running hsfs with PySpark/Spark in Hopsworks, the driver is automatically provided on the classpath, but you need to set the driver argument to com.mysql.cj.jdbc.Driver when creating the Storage Connector.
read #
read(
query: str,
data_format: str | None = None,
options: dict[str, Any] | None = None,
path: str | None = None,
dataframe_type: str = "default",
) -> (
TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| pd.DataFrame
| np.ndarray
| pl.DataFrame
)
Reads a query into a dataframe using the storage connector.
| PARAMETER | DESCRIPTION |
|---|---|
query | A SQL query to be read. TYPE: |
data_format | Not relevant for JDBC based connectors. TYPE: |
options | Any additional key/value options to be passed to the JDBC connector. |
path | Not relevant for JDBC based connectors. TYPE: |
dataframe_type | str, optional. The type of the returned dataframe. Possible values are TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame | The read dataframe. |
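A usage sketch; "mysql_conn" and the SQL query are placeholders:
jdbc_sc = fs.get_storage_connector("mysql_conn")
df = jdbc_sc.read(query="SELECT id, amount FROM transactions")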
S3Connector #
Bases: StorageConnector
arguments property #
Additional Spark options for the S3 connector, passed as a dictionary.
These are set using the Spark Options field in the UI when creating the connector. Example: {"fs.s3a.endpoint": "s3.eu-west-1.amazonaws.com", "fs.s3a.path.style.access": "true"}.
path property #
path: str | None
If the connector refers to a path (e.g. S3) - return the path of the connector.
server_encryption_algorithm property #
server_encryption_algorithm: str | None
Encryption algorithm if server-side S3 bucket encryption is enabled.
server_encryption_key property #
server_encryption_key: str | None
Encryption key if server-side S3 bucket encryption is enabled.
connector_options #
Return options to be passed to an external S3 connector library.
prepare_spark #
Prepare Spark to use this Storage Connector.
conn.prepare_spark()
spark.read.format("json").load("s3a://[bucket]/path")
# or
spark.read.format("json").load(conn.prepare_spark("s3a://[bucket]/path"))
| PARAMETER | DESCRIPTION |
|---|---|
path | Path to prepare for reading from cloud storage. TYPE: |
read #
read(
query: str | None = None,
data_format: str | None = None,
options: dict[str, Any] | None = None,
path: str = "",
dataframe_type: Literal[
"default",
"spark",
"pandas",
"polars",
"numpy",
"python",
] = "default",
) -> (
TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| pd.DataFrame
| np.ndarray
| pl.DataFrame
)
Reads a query or a path into a dataframe using the storage connector.
Note that paths are only supported for object stores like S3, HopsFS and ADLS, while queries are meant for JDBC-based sources such as Redshift and Snowflake.
| PARAMETER | DESCRIPTION |
|---|---|
query | Not relevant for S3 connectors. TYPE: |
data_format | The file format of the files to be read, e.g. TYPE: |
options | Any additional key/value options to be passed to the S3 connector. |
path | Path within the bucket to be read. TYPE: |
dataframe_type | The type of the returned dataframe. Defaults to "default", which maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine. TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame | The read dataframe. |
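A usage sketch; the connector name, file format and path within the bucket are placeholders:
s3_sc = fs.get_storage_connector("s3_conn")
df = s3_sc.read(data_format="csv", path="raw/transactions/2024/")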
RedshiftConnector #
Bases: StorageConnector
cluster_identifier property #
cluster_identifier: str | None
Cluster identifier of the Redshift cluster.
database_user_name property #
database_user_name: str | None
Database username for redshift cluster.
connector_options #
Return options to be passed to an external Redshift connector library.
read #
read(
query: str | None = None,
data_format: str | None = None,
options: dict[str, Any] | None = None,
path: str | None = None,
dataframe_type: str = "default",
) -> (
TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| pd.DataFrame
| np.ndarray
| pl.DataFrame
)
Reads a table or query into a dataframe using the storage connector.
| PARAMETER | DESCRIPTION |
|---|---|
query | By default, the storage connector will read the table configured together with the connector, if any. It's possible to overwrite this by passing a SQL query here. Defaults to TYPE: |
data_format | Not relevant for JDBC based connectors such as Redshift. TYPE: |
options | Any additional key/value options to be passed to the JDBC connector. |
path | Not relevant for JDBC based connectors such as Redshift. TYPE: |
dataframe_type | str, optional. The type of the returned dataframe. Possible values are TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame | The read dataframe. |
refetch #
refetch() -> None
Refetch storage connector in order to retrieve updated temporary credentials.
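A usage sketch; the connector name and query are placeholders:
redshift_sc = fs.get_storage_connector("redshift_conn")
df = redshift_sc.read(query="SELECT * FROM public.transactions")

# if the temporary credentials have expired, refresh them and read again
redshift_sc.refetch()
df = redshift_sc.read(query="SELECT * FROM public.transactions")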
AdlsConnector #
Bases: StorageConnector
path property #
path: str | None
If the connector refers to a path (e.g. ADLS) - return the path of the connector.
service_credential property #
service_credential: str | None
Service credential of the ADLS storage connector.
prepare_spark #
Prepare Spark to use this Storage Connector.
conn.prepare_spark()
spark.read.format("json").load("abfss://[container-name]@[account_name].dfs.core.windows.net/[path]")
# or
spark.read.format("json").load(conn.prepare_spark("abfss://[container-name]@[account_name].dfs.core.windows.net/[path]"))
| PARAMETER | DESCRIPTION |
|---|---|
path | Path to prepare for reading from cloud storage. Defaults to TYPE: |
read #
read(
query: str | None = None,
data_format: str | None = None,
options: dict[str, Any] | None = None,
path: str = "",
dataframe_type: str = "default",
) -> (
TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| pd.DataFrame
| np.ndarray
| pl.DataFrame
)
Reads a path into a dataframe using the storage connector.
| PARAMETER | DESCRIPTION |
|---|---|
query | Not relevant for ADLS connectors. TYPE: |
data_format | The file format of the files to be read, e.g. TYPE: |
options | Any additional key/value options to be passed to the ADLS connector. |
path | Path within the bucket to be read. For example, path= TYPE: |
dataframe_type | str, optional. The type of the returned dataframe. Possible values are TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame | The read dataframe. |
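A usage sketch; the connector name and path within the container are placeholders:
adls_sc = fs.get_storage_connector("adls_conn")
df = adls_sc.read(data_format="parquet", path="landing/events/")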
SnowflakeConnector #
Bases: StorageConnector
options property #
Additional options for the Snowflake storage connector.
private_key property #
private_key: str | None
Path to the private key file for key pair authentication.
connector_options #
Prepare a Python dictionary with the arguments needed to connect to a Snowflake database.
This is useful when working with the snowflake.connector Python library:
import snowflake.connector
sc = fs.get_storage_connector("snowflake_conn")
ctx = snowflake.connector.connect(**sc.connector_options())
read #
read(
query: str | None = None,
data_format: str | None = None,
options: dict[str, Any] | None = None,
path: str | None = None,
dataframe_type: str = "default",
) -> (
TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| pd.DataFrame
| np.ndarray
| pl.DataFrame
)
Reads a table or query into a dataframe using the storage connector.
| PARAMETER | DESCRIPTION |
|---|---|
query | By default, the storage connector will read the table configured together with the connector, if any. It's possible to overwrite this by passing a SQL query here. Defaults to TYPE: |
data_format | Not relevant for Snowflake connectors. TYPE: |
options | Any additional key/value options to be passed to the engine. |
path | Not relevant for Snowflake connectors. TYPE: |
dataframe_type | str, optional. The type of the returned dataframe. Possible values are TYPE: |
| RETURNS | DESCRIPTION |
|---|---|
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame | The read dataframe. |
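A usage sketch; the query is a placeholder, and omitting it reads the table configured on the connector, if any:
sf_sc = fs.get_storage_connector("snowflake_conn")
df = sf_sc.read(query="SELECT * FROM MY_DB.MY_SCHEMA.MY_TABLE")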
snowflake_connector_options #
Alias for connector_options.
GcsConnector #
Bases: StorageConnector
This storage connector provides integration with Google Cloud Storage (GCS).
Once you create a connector in the Feature Store, you can read data from a GCS bucket into a Spark dataframe by calling the read API.
Authentication to GCP is handled by uploading the JSON keyfile of a service account to the Hopsworks project. For more information on service accounts and creating a keyfile in GCP, read the Google Cloud documentation.
The connector also supports Google's optional Customer-Supplied Encryption Key method. The encryption details are stored as secrets in the Feature Store to keep them secure. Read more about encryption in the Google documentation.
The storage connector uses the Google gcs-connector-hadoop behind the scenes. For more information, check out Google Cloud Storage Connector for Spark and Hadoop.
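A usage sketch; the connector name, file format and bucket path are placeholders:
gcs_sc = fs.get_storage_connector("gcs_conn")
df = gcs_sc.read(data_format="parquet", path="gs://my-bucket/datasets/")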
prepare_spark #
Prepare Spark to use this Storage Connector.
conn.prepare_spark()
spark.read.format("json").load("gs://bucket/path")
# or
spark.read.format("json").load(conn.prepare_spark("gs://bucket/path"))
| PARAMETER | DESCRIPTION |
|---|---|
path | Path to prepare for reading from Google cloud storage. Defaults to TYPE: |
read #
read(
query: str | None = None,
data_format: str | None = None,
options: dict[str, Any] | None = None,
path: str = "",
dataframe_type: str = "default",
) -> (
TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| pd.DataFrame
| np.ndarray
| pl.DataFrame
)
Reads a GCS path into a dataframe using the storage connector.
To read directly from the default bucket, you can omit the path argument:
conn.read(data_format='spark_formats')
conn.read(data_format='spark_formats', path='Path/object')
conn.read(data_format='spark_formats', path='gs://BUCKET/DATA')
| PARAMETER | DESCRIPTION |
|---|---|
data_format | Spark data format. Defaults to None. |
options | Spark options. Defaults to None. |
path | GCS path. Defaults to None. |
dataframe_type | The type of the returned dataframe. Possible values are "default", "spark", "pandas", "polars", "numpy" or "python". Defaults to "default", which maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine. |
| RAISES | DESCRIPTION |
|---|---|
ValueError | Malformed arguments. |
| RETURNS | DESCRIPTION |
|---|---|
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame | The read dataframe. |
BigQueryConnector #
Bases: StorageConnector
The BigQuery storage connector provides integration to Google Cloud BigQuery.
You can use it to run BigQuery queries in your GCP project and load the results into a Spark dataframe by calling the read API.
Authentication to GCP is handled by uploading the JSON keyfile of a service account to the Hopsworks project. For more information on service accounts and creating a keyfile in GCP, read the Google Cloud documentation.
The storage connector uses the Google spark-bigquery-connector behind the scenes. To read more about the spark connector, like the spark options or usage, check Apache Spark SQL connector for Google BigQuery.
materialization_dataset property #
materialization_dataset: str | None
BigQuery materialization dataset (The dataset where the materialized view is going to be created, used in case of query).
parent_project property #
parent_project: str | None
BigQuery parent project (Google Cloud Project ID of the table to bill for the export).
query_project property #
query_project: str | None
BigQuery project (The Google Cloud Project ID of the table).
connector_options #
Return options to be passed to an external BigQuery connector library.
read #
read(
query: str | None = None,
data_format: str | None = None,
options: dict[str, Any] | None = None,
path: str | None = None,
dataframe_type: str = "default",
) -> (
TypeVar("pyspark.sql.DataFrame")
| TypeVar("pyspark.RDD")
| pd.DataFrame
| np.ndarray
| pl.DataFrame
)
Reads results from BigQuery into a Spark dataframe using the storage connector.
Reading from BigQuery is done by specifying either a BigQuery table or a BigQuery query. For example, to read from a BigQuery table, set the BigQuery project, dataset and table on the storage connector and read directly from the corresponding path:
conn.read()
To run a query instead, set the Materialization Dataset on the storage connector and pass your SQL to the query argument:
conn.read(query='SQL')
The query argument takes priority at runtime if the table options were also set on the storage connector, so the same connector can serve both a query and a table, assuming all fields were set. You can also pass a BigQuery table path in the path argument to read at runtime, if the table options were not set when creating the connector:
conn.read(path='project.dataset.table')
| PARAMETER | DESCRIPTION |
|---|---|
query | BigQuery query. Defaults to TYPE: |
data_format | Spark data format. Defaults to TYPE: |
options | Spark options. Defaults to |
path | BigQuery table path. Defaults to TYPE: |
dataframe_type | str, optional. The type of the returned dataframe. Possible values are TYPE: |
| RAISES | DESCRIPTION |
|---|---|
ValueError | Malformed arguments. |
| RETURNS | DESCRIPTION |
|---|---|
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame | The read dataframe. |
KafkaConnector #
Bases: StorageConnector
ssl_endpoint_identification_algorithm property #
ssl_endpoint_identification_algorithm: str | None
Endpoint identification algorithm used to validate the broker hostname against its certificate in SSL connections.
confluent_options #
Return prepared options to be passed to confluent_kafka, based on the provided apache spark configuration.
Right now only producer values with Importance >= medium are implemented.
See https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html.
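A usage sketch with the confluent_kafka client; the connector and topic names are placeholders, and confluent_kafka is assumed to be installed:
from confluent_kafka import Producer

kafka_sc = fs.get_storage_connector("kafka_conn")
producer = Producer(kafka_sc.confluent_options())
producer.produce("my_topic", value=b"hello")
producer.flush()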
create_pem_files #
Create PEM (Privacy Enhanced Mail) files for Kafka SSL authentication.
This method writes the necessary PEM files for SSL authentication with Kafka, using the provided keystore and truststore locations and passwords. The generated file paths are stored as the following instance variables:
- self.ca_chain_path: Path to the generated CA chain PEM file.
- self.client_cert_path: Path to the generated client certificate PEM file.
- self.client_key_path: Path to the generated client key PEM file.
These files are used for configuring secure Kafka connections (e.g., with Spark or confluent_kafka). The method is idempotent and will only create the files once per connector instance.
kafka_options #
Return prepared options to be passed to Kafka, based on the additional arguments.
read #
read(
query: str | None = None,
data_format: str | None = None,
options: dict[str, Any] | None = None,
path: str | None = None,
dataframe_type: str = "default",
) -> None
NOT SUPPORTED.
read_stream #
read_stream(
topic: str,
topic_pattern: bool = False,
message_format: str = "avro",
schema: str | None = None,
options: dict[str, Any] | None = None,
include_metadata: bool = False,
) -> TypeVar("pyspark.sql.DataFrame") | TypeVar(
"pyspark.sql.streaming.StreamingQuery"
)
Reads a Kafka stream from a topic or multiple topics into a Dataframe.
Engine Support
Spark only
Reading from data streams using Pandas/Python as engine is currently not supported. Python/Pandas has no notion of streaming.
| PARAMETER | DESCRIPTION |
|---|---|
topic | Name or pattern of the topic(s) to subscribe to. TYPE: |
topic_pattern | Flag to indicate if TYPE: |
message_format | The format of the messages to use for decoding. Can be TYPE: |
schema | Optional schema, to use for decoding, can be an Avro schema string for TYPE: |
options | Additional options as key/value string pairs to be passed to Spark. Defaults to |
include_metadata | Indicate whether to return additional metadata fields from messages in the stream. Otherwise, only the decoded value fields are returned. Defaults to TYPE: |
| RAISES | DESCRIPTION |
|---|---|
ValueError | Malformed arguments. |
| RETURNS | DESCRIPTION |
|---|---|
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.sql.streaming.StreamingQuery') | The streaming dataframe or streaming query. |
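A usage sketch (Spark engine only); the connector name, topic name and Avro schema are placeholders:
kafka_sc = fs.get_storage_connector("kafka_conn")
avro_schema_str = '{"type": "record", "name": "Transaction", "fields": [{"name": "id", "type": "long"}]}'
df = kafka_sc.read_stream(
    topic="transactions",
    message_format="avro",
    schema=avro_schema_str,
)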
spark_options #
Return prepared options to be passed to Spark, based on the additional arguments.
This is done by adding the 'kafka.' prefix to the keys of kafka_options.
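A usage sketch passing the prepared options to Spark Structured Streaming; it assumes a SparkSession named spark is available, and the connector and topic names are placeholders:
kafka_sc = fs.get_storage_connector("kafka_conn")
stream_df = (
    spark.readStream.format("kafka")
    .options(**kafka_sc.spark_options())
    .option("subscribe", "transactions")
    .load()
)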