
Storage Connector#

You can obtain storage connectors using FeatureStore.get_storage_connector and FeatureStore.get_online_storage_connector.

StorageConnector #

Bases: ABC

description property #

description: str | None

User provided description of the storage connector.

id property #

id: int | None

Id of the storage connector uniquely identifying it in the Feature store.

name property #

name: str

Name of the storage connector.

type property #

type: str | None

Type of the connector as string, e.g. "HOPSFS", "S3", "ADLS", "REDSHIFT", "JDBC" or "SNOWFLAKE".

connector_options #

connector_options() -> dict[str, Any]

Return prepared options to be passed to an external connector library.

Not implemented for this connector type.

get_data #

get_data(data_source: ds.DataSource) -> dsd.DataSourceData

Retrieve the data from the data source.

Example

# connect to the Feature Store
fs = ...

sc = fs.get_storage_connector("conn_name")

tables = sc.get_tables("database_name")

data = sc.get_data(tables[0])
PARAMETER DESCRIPTION
data_source

The data source to retrieve data from.

TYPE: DataSource

RETURNS DESCRIPTION
DataSourceData

An object containing the data retrieved from the data source.

TYPE: dsd.DataSourceData

get_databases #

get_databases() -> list[str]

Retrieve the list of available databases.

Example

# connect to the Feature Store
fs = ...

sc = fs.get_storage_connector("conn_name")

databases = sc.get_databases()
RETURNS DESCRIPTION
list[str]

A list of database names available in the storage connector.

get_feature_groups #

get_feature_groups() -> list[FeatureGroup]

Get the feature groups using this storage connector, based on explicit provenance.

Only accessible feature groups are returned. To retrieve all linked feature groups, including inaccessible ones, use the base method get_feature_groups_provenance.
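
Example (a minimal sketch; "conn_name" is a placeholder connector name)

# connect to the Feature Store
fs = ...

sc = fs.get_storage_connector("conn_name")

# feature groups created from data read through this connector
feature_groups = sc.get_feature_groups()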

RETURNS DESCRIPTION
list[FeatureGroup]

List of feature groups.

get_feature_groups_provenance #

get_feature_groups_provenance() -> Links | None

Get the generated feature groups using this storage connector, based on explicit provenance.

These feature groups can be accessible or inaccessible.

Explicit provenance does not track deleted generated feature group links, so the deleted links will always be empty. For inaccessible feature groups, only minimal information is returned.

RETURNS DESCRIPTION
Links | None

The feature groups generated using this storage connector or None if none were created.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

In case the backend encounters an issue.

get_metadata #

get_metadata(data_source: ds.DataSource) -> dict

Retrieve metadata information about the data source.

Example

# connect to the Feature Store
fs = ...

sc = fs.get_storage_connector("conn_name")

tables = sc.get_tables("database_name")

metadata = sc.get_metadata(tables[0])
PARAMETER DESCRIPTION
data_source

The data source to retrieve metadata from.

TYPE: DataSource

RETURNS DESCRIPTION
dict

A dictionary containing metadata about the data source.

TYPE: dict

get_tables #

get_tables(database: str = None) -> list[ds.DataSource]

Retrieve the list of tables from the specified database.

Example

# connect to the Feature Store
fs = ...

sc = fs.get_storage_connector("conn_name")

tables = sc.get_tables("database_name")
PARAMETER DESCRIPTION
database

The name of the database to list tables from. If not provided, the default database is used.

TYPE: str DEFAULT: None

RETURNS DESCRIPTION
list[ds.DataSource]

A list of DataSource objects representing the tables.

prepare_spark #

prepare_spark(path: str | None = None) -> str | None

Prepare Spark to use this Storage Connector.

PARAMETER DESCRIPTION
path

Path to prepare for reading from cloud storage. Defaults to None.

TYPE: str | None DEFAULT: None

read #

read(
    query: str | None = None,
    data_format: str | None = None,
    options: dict[str, Any] | None = None,
    path: str | None = None,
    dataframe_type: Literal[
        "default",
        "spark",
        "pandas",
        "polars",
        "numpy",
        "python",
    ] = "default",
) -> (
    TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | pd.DataFrame
    | np.ndarray
    | pl.DataFrame
)

Reads a query or a path into a dataframe using the storage connector.

Note that paths are only supported for object stores like S3, HopsFS and ADLS, while queries are meant for JDBC connectors and databases like Redshift and Snowflake.
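
Example (an illustrative sketch; the connector name, path, and query are placeholders, and whether to pass a path or a query depends on the connector type)

# connect to the Feature Store
fs = ...

sc = fs.get_storage_connector("conn_name")

# object store connectors (S3, HopsFS, ADLS): read files under a path
df = sc.read(data_format="parquet", path="path/to/data")

# database connectors (JDBC, Redshift, Snowflake): read the result of a query
df = sc.read(query="SELECT * FROM some_table")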

PARAMETER DESCRIPTION
query

By default, the storage connector will read the table configured together with the connector, if any. It's possible to overwrite this by passing a SQL query here.

TYPE: str | None DEFAULT: None

data_format

When reading from object stores such as S3, HopsFS and ADLS, specify the file format to be read, e.g., csv, parquet.

TYPE: str | None DEFAULT: None

options

Any additional key/value options to be passed to the connector.

TYPE: dict[str, Any] | None DEFAULT: None

path

Path to be read from within the bucket of the storage connector. Not relevant for database connectors such as JDBC, Snowflake, or Redshift.

TYPE: str | None DEFAULT: None

dataframe_type

The type of the returned dataframe. Defaults to "default", which maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine.

TYPE: Literal['default', 'spark', 'pandas', 'polars', 'numpy', 'python'] DEFAULT: 'default'

RETURNS DESCRIPTION
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame

The read dataframe.

refetch #

refetch() -> None

Refetch storage connector.

HopsFSConnector #

Bases: StorageConnector

spark_options #

spark_options() -> dict[str, Any]

Return prepared options to be passed to Spark, based on the additional arguments.

JdbcConnector #

Bases: StorageConnector

arguments property #

arguments: dict[str, Any] | None

Additional JDBC arguments.

When running hsfs with PySpark/Spark in Hopsworks, the driver is automatically provided in the classpath but you need to set the driver argument to com.mysql.cj.jdbc.Driver when creating the Storage Connector.

connection_string property #

connection_string: str | None

JDBC connection string.

read #

read(
    query: str,
    data_format: str | None = None,
    options: dict[str, Any] | None = None,
    path: str | None = None,
    dataframe_type: str = "default",
) -> (
    TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | pd.DataFrame
    | np.ndarray
    | pl.DataFrame
)

Reads a query into a dataframe using the storage connector.
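
Example (a minimal sketch; the connector name and query are placeholders)

# connect to the Feature Store
fs = ...

sc = fs.get_storage_connector("conn_name")

df = sc.read("SELECT * FROM some_table LIMIT 100")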

PARAMETER DESCRIPTION
query

A SQL query to be read.

TYPE: str

data_format

Not relevant for JDBC based connectors.

TYPE: str | None DEFAULT: None

options

Any additional key/value options to be passed to the JDBC connector.

TYPE: dict[str, Any] | None DEFAULT: None

path

Not relevant for JDBC based connectors.

TYPE: str | None DEFAULT: None

dataframe_type

The type of the returned dataframe. Possible values are "default", "spark", "pandas", "polars", "numpy" or "python". Defaults to "default", which maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine.

TYPE: str DEFAULT: 'default'

RETURNS DESCRIPTION
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame

DataFrame.

spark_options #

spark_options() -> dict[str, Any]

Return prepared options to be passed to Spark, based on the additional arguments.

S3Connector #

Bases: StorageConnector

access_key property #

access_key: str | None

Access key.

arguments property #

arguments: dict[str, Any] | None

Additional spark options for the S3 connector, passed as a dictionary.

These are set using the Spark Options field in the UI when creating the connector. Example: {"fs.s3a.endpoint": "s3.eu-west-1.amazonaws.com", "fs.s3a.path.style.access": "true"}.

bucket property #

bucket: str | None

Return the bucket for S3 connectors.

iam_role property #

iam_role: str | None

IAM role.

path property #

path: str | None

If the connector refers to a path (e.g. S3) - return the path of the connector.

region property #

region: str | None

Return the region for S3 connectors.

secret_key property #

secret_key: str | None

Secret key.

server_encryption_algorithm property #

server_encryption_algorithm: str | None

Encryption algorithm if server-side S3 bucket encryption is enabled.

server_encryption_key property #

server_encryption_key: str | None

Encryption key if server-side S3 bucket encryption is enabled.

session_token property #

session_token: str | None

Session token.

connector_options #

connector_options() -> dict[str, Any]

Return options to be passed to an external S3 connector library.

prepare_spark #

prepare_spark(path: str | None = None) -> str | None

Prepare Spark to use this Storage Connector.

conn.prepare_spark()

spark.read.format("json").load("s3a://[bucket]/path")

# or
spark.read.format("json").load(conn.prepare_spark("s3a://[bucket]/path"))
PARAMETER DESCRIPTION
path

Path to prepare for reading from cloud storage.

TYPE: str | None DEFAULT: None

read #

read(
    query: str | None = None,
    data_format: str | None = None,
    options: dict[str, Any] | None = None,
    path: str = "",
    dataframe_type: Literal[
        "default",
        "spark",
        "pandas",
        "polars",
        "numpy",
        "python",
    ] = "default",
) -> (
    TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | pd.DataFrame
    | np.ndarray
    | pl.DataFrame
)

Reads a query or a path into a dataframe using the storage connector.

Note that paths are only supported for object stores like S3, HopsFS and ADLS, while queries are meant for JDBC connectors and databases like Redshift and Snowflake.
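
Example (a minimal sketch; the connector name, file format and path are placeholders)

# connect to the Feature Store
fs = ...

sc = fs.get_storage_connector("conn_name")

# read Parquet files from a prefix inside the connector's bucket
df = sc.read(data_format="parquet", path="path/to/data")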

PARAMETER DESCRIPTION
query

Not relevant for S3 connectors.

TYPE: str | None DEFAULT: None

data_format

The file format of the files to be read, e.g. csv, parquet.

TYPE: str | None DEFAULT: None

options

Any additional key/value options to be passed to the S3 connector.

TYPE: dict[str, Any] | None DEFAULT: None

path

Path within the bucket to be read.

TYPE: str DEFAULT: ''

dataframe_type

The type of the returned dataframe. Defaults to "default", which maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine.

TYPE: Literal['default', 'spark', 'pandas', 'polars', 'numpy', 'python'] DEFAULT: 'default'

RETURNS DESCRIPTION
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame

DataFrame.

spark_options #

spark_options() -> dict[str, str]

Return prepared options to be passed to Spark, based on the additional arguments.

RedshiftConnector #

Bases: StorageConnector

arguments property #

arguments: str | None

Additional JDBC, REDSHIFT, or Snowflake arguments.

auto_create property #

auto_create: bool | None

Whether to automatically create the database user if it does not exist.

cluster_identifier property #

cluster_identifier: str | None

Cluster identifier for redshift cluster.

database_driver property #

database_driver: str | None

Database driver for redshift cluster.

database_endpoint property #

database_endpoint: str | None

Database endpoint for redshift cluster.

database_group property #

database_group: str | None

Database group for redshift cluster.

database_name property #

database_name: str | None

Database name for redshift cluster.

database_password property #

database_password: str | None

Database password for redshift cluster.

database_port property #

database_port: int | str | None

Database port for redshift cluster.

database_user_name property #

database_user_name: str | None

Database username for redshift cluster.

expiration property #

expiration: int | str | None

Cluster temporary credential expiration time.

iam_role property #

iam_role: Any | None

IAM role.

table_name property #

table_name: str | None

Table name for redshift cluster.

connector_options #

connector_options() -> dict[str, Any]

Return options to be passed to an external Redshift connector library.

read #

read(
    query: str | None = None,
    data_format: str | None = None,
    options: dict[str, Any] | None = None,
    path: str | None = None,
    dataframe_type: str = "default",
) -> (
    TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | pd.DataFrame
    | np.ndarray
    | pl.DataFrame
)

Reads a table or query into a dataframe using the storage connector.
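
Example (a minimal sketch; the connector name and query are placeholders)

# connect to the Feature Store
fs = ...

sc = fs.get_storage_connector("conn_name")

# read the table configured on the connector
df = sc.read()

# or override it with a SQL query
df = sc.read(query="SELECT * FROM some_table LIMIT 100")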

PARAMETER DESCRIPTION
query

By default, the storage connector will read the table configured together with the connector, if any. It's possible to overwrite this by passing a SQL query here. Defaults to None.

TYPE: str | None DEFAULT: None

data_format

Not relevant for JDBC based connectors such as Redshift.

TYPE: str | None DEFAULT: None

options

Any additional key/value options to be passed to the JDBC connector.

TYPE: dict[str, Any] | None DEFAULT: None

path

Not relevant for JDBC based connectors such as Redshift.

TYPE: str | None DEFAULT: None

dataframe_type

The type of the returned dataframe. Possible values are "default", "spark", "pandas", "polars", "numpy" or "python". Defaults to "default", which maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine.

TYPE: str DEFAULT: 'default'

RETURNS DESCRIPTION
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame

DataFrame.

refetch #

refetch() -> None

Refetch storage connector in order to retrieve updated temporary credentials.

spark_options #

spark_options() -> dict[str, Any]

Return prepared options to be passed to Spark, based on the additional arguments.

AdlsConnector #

Bases: StorageConnector

account_name property #

account_name: str | None

Account name of the ADLS storage connector.

application_id property #

application_id: str | None

Application ID of the ADLS storage connector.

container_name property #

container_name: str | None

Container name of the ADLS storage connector.

directory_id property #

directory_id: str | None

Directory ID of the ADLS storage connector.

generation property #

generation: str | None

Generation of the ADLS storage connector.

path property #

path: str | None

If the connector refers to a path (e.g. ADLS) - return the path of the connector.

service_credential property #

service_credential: str | None

Service credential of the ADLS storage connector.

prepare_spark #

prepare_spark(path: str | None = None) -> str | None

Prepare Spark to use this Storage Connector.

conn.prepare_spark()

spark.read.format("json").load("abfss://[container-name]@[account_name].dfs.core.windows.net/[path]")

# or
spark.read.format("json").load(conn.prepare_spark("abfss://[container-name]@[account_name].dfs.core.windows.net/[path]"))
PARAMETER DESCRIPTION
path

Path to prepare for reading from cloud storage. Defaults to None.

TYPE: str | None DEFAULT: None

read #

read(
    query: str | None = None,
    data_format: str | None = None,
    options: dict[str, Any] | None = None,
    path: str = "",
    dataframe_type: str = "default",
) -> (
    TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | pd.DataFrame
    | np.ndarray
    | pl.DataFrame
)

Reads a path into a dataframe using the storage connector.
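
Example (a minimal sketch; the connector name, file format and path are placeholders)

# connect to the Feature Store
fs = ...

sc = fs.get_storage_connector("conn_name")

# read CSV files from a path inside the configured container
df = sc.read(data_format="csv", path="path/inside/container")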

PARAMETER DESCRIPTION
query

Not relevant for ADLS connectors.

TYPE: str | None DEFAULT: None

data_format

The file format of the files to be read, e.g. csv, parquet.

TYPE: str | None DEFAULT: None

options

Any additional key/value options to be passed to the ADLS connector.

TYPE: dict[str, Any] | None DEFAULT: None

path

Path within the container to be read. For example, passing path will read directly from the container specified on the connector by constructing the URI as 'abfss://[container-name]@[account_name].dfs.core.windows.net/[path]'.

TYPE: str DEFAULT: ''

dataframe_type

The type of the returned dataframe. Possible values are "default", "spark", "pandas", "polars", "numpy" or "python". Defaults to "default", which maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine.

TYPE: str DEFAULT: 'default'

RETURNS DESCRIPTION
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame

DataFrame.

spark_options #

spark_options() -> dict[str, Any]

Return prepared options to be passed to Spark, based on the additional arguments.

SnowflakeConnector #

Bases: StorageConnector

account property #

account: str | None

Account of the Snowflake storage connector.

application property #

application: Any

Application of the Snowflake storage connector.

database property #

database: str | None

Database of the Snowflake storage connector.

options property #

options: dict[str, Any] | None

Additional options for the Snowflake storage connector.

passphrase property #

passphrase: str | None

Passphrase for the private key file.

password property #

password: str | None

Password of the Snowflake storage connector.

private_key property #

private_key: str | None

Path to the private key file for key pair authentication.

role property #

role: Any | None

Role of the Snowflake storage connector.

schema property #

schema: str | None

Schema of the Snowflake storage connector.

table property #

table: str | None

Table of the Snowflake storage connector.

token property #

token: str | None

OAuth token of the Snowflake storage connector.

url property #

url: str | None

URL of the Snowflake storage connector.

user property #

user: Any | None

User of the Snowflake storage connector.

warehouse property #

warehouse: str | None

Warehouse of the Snowflake storage connector.

connector_options #

connector_options() -> dict[str, Any] | None

Prepare a Python dictionary with the needed arguments for you to connect to a Snowflake database.

It is useful for the snowflake.connector Python library.

import snowflake.connector

sc = fs.get_storage_connector("snowflake_conn")
ctx = snowflake.connector.connect(**sc.connector_options())

read #

read(
    query: str | None = None,
    data_format: str | None = None,
    options: dict[str, Any] | None = None,
    path: str | None = None,
    dataframe_type: str = "default",
) -> (
    TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | pd.DataFrame
    | np.ndarray
    | pl.DataFrame
)

Reads a table or query into a dataframe using the storage connector.
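
Example (a minimal sketch; the connector name and query are placeholders)

# connect to the Feature Store
fs = ...

sc = fs.get_storage_connector("snowflake_conn")

# read the table configured on the connector
df = sc.read()

# or override it with a SQL query
df = sc.read(query="SELECT * FROM some_table LIMIT 100")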

PARAMETER DESCRIPTION
query

By default, the storage connector will read the table configured together with the connector, if any. It's possible to overwrite this by passing a SQL query here. Defaults to None.

TYPE: str | None DEFAULT: None

data_format

Not relevant for Snowflake connectors.

TYPE: str | None DEFAULT: None

options

Any additional key/value options to be passed to the engine.

TYPE: dict[str, Any] | None DEFAULT: None

path

Not relevant for Snowflake connectors.

TYPE: str | None DEFAULT: None

dataframe_type

The type of the returned dataframe. Possible values are "default", "spark", "pandas", "polars", "numpy" or "python". Defaults to "default", which maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine.

TYPE: str DEFAULT: 'default'

RETURNS DESCRIPTION
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame

DataFrame.

snowflake_connector_options #

snowflake_connector_options() -> dict[str, Any] | None

Alias for connector_options.

spark_options #

spark_options() -> dict[str, Any]

Return prepared options to be passed to Spark, based on the additional arguments.

GcsConnector #

Bases: StorageConnector

This storage connector provides integration to Google Cloud Storage (GCS).

Once you create a connector in the Feature Store, you can read data from a GCS bucket into a Spark dataframe by calling the read API.

Authentication to GCP is handled by uploading the JSON keyfile for the service account to the Hopsworks Project. For more information on service accounts and creating a keyfile in GCP, read the Google Cloud documentation.

The connector also supports the optional Customer-Supplied Encryption Key method from Google. The encryption details are stored as Secrets in the Feature Store to keep them secure. Read more about encryption in the Google documentation.

The storage connector uses the Google gcs-connector-hadoop behind the scenes. For more information, check out Google Cloud Storage Connector for Spark and Hadoop.

algorithm property #

algorithm: str | None

Encryption Algorithm.

bucket property #

bucket: str | None

GCS Bucket.

encryption_key property #

encryption_key: str | None

Encryption Key.

encryption_key_hash property #

encryption_key_hash: str | None

Encryption Key Hash.

key_path property #

key_path: str | None

JSON keyfile for service account.

path property #

path: str | None

The path of the connector, prefixed with the gs:// file system scheme.

prepare_spark #

prepare_spark(path: str | None = None) -> str | None

Prepare Spark to use this Storage Connector.

conn.prepare_spark()
spark.read.format("json").load("gs://bucket/path")
# or
spark.read.format("json").load(conn.prepare_spark("gs://bucket/path"))
PARAMETER DESCRIPTION
path

Path to prepare for reading from Google cloud storage. Defaults to None.

TYPE: str | None DEFAULT: None

read #

read(
    query: str | None = None,
    data_format: str | None = None,
    options: dict[str, Any] | None = None,
    path: str = "",
    dataframe_type: str = "default",
) -> (
    TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | pd.DataFrame
    | np.ndarray
    | pl.DataFrame
)

Reads GCS path into a dataframe using the storage connector.

To read directly from the default bucket, you can omit the path argument:

conn.read(data_format='spark_formats')
Or, to read objects from the default bucket, provide the object path without the gsutil URI scheme. For example, the following will read from the path gs://bucket_on_connector/Path/object:

conn.read(data_format='spark_formats', path='Path/object')

Or read with the full gsutil URI path:

conn.read(data_format='spark_formats', path='gs://BUCKET/DATA')

PARAMETER DESCRIPTION
query

Not relevant for GCS connectors.

TYPE: str | None DEFAULT: None

data_format

Spark data format. Defaults to None.

TYPE: str | None DEFAULT: None

options

Spark options. Defaults to None.

TYPE: dict[str, Any] | None DEFAULT: None

path

GCS path to be read.

TYPE: str DEFAULT: ''

dataframe_type

The type of the returned dataframe. Possible values are "default", "spark", "pandas", "polars", "numpy" or "python". Defaults to "default", which maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine.

TYPE: str DEFAULT: 'default'

RAISES DESCRIPTION
ValueError

Malformed arguments.

RETURNS DESCRIPTION
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame

Dataframe: A Spark dataframe.

spark_options #

spark_options() -> dict[str, Any]

Return prepared options to be passed to Spark, based on the additional arguments.

BigQueryConnector #

Bases: StorageConnector

The BigQuery storage connector provides integration to Google Cloud BigQuery.

You can use it to run BigQuery queries in your GCP project and load the results into a Spark dataframe by calling the read API.

Authentication to GCP is handled by uploading the JSON keyfile for the service account to the Hopsworks Project. For more information on service accounts and creating a keyfile in GCP, read the Google Cloud documentation.

The storage connector uses the Google spark-bigquery-connector behind the scenes. To read more about the spark connector, like the spark options or usage, check Apache Spark SQL connector for Google BigQuery.

arguments property #

arguments: dict[str, Any]

Additional spark options.

dataset property #

dataset: str | None

BigQuery dataset (The dataset containing the table).

key_path property #

key_path: str | None

JSON keyfile for service account.

materialization_dataset property #

materialization_dataset: str | None

BigQuery materialization dataset (The dataset where the materialized view is going to be created, used in case of query).

parent_project property #

parent_project: str | None

BigQuery parent project (Google Cloud Project ID of the table to bill for the export).

query_project property #

query_project: str | None

BigQuery project (The Google Cloud Project ID of the table).

query_table property #

query_table: str | None

BigQuery table name.

connector_options #

connector_options() -> dict[str, Any]

Return options to be passed to an external BigQuery connector library.

read #

read(
    query: str | None = None,
    data_format: str | None = None,
    options: dict[str, Any] | None = None,
    path: str | None = None,
    dataframe_type: str = "default",
) -> (
    TypeVar("pyspark.sql.DataFrame")
    | TypeVar("pyspark.RDD")
    | pd.DataFrame
    | np.ndarray
    | pl.DataFrame
)

Reads results from BigQuery into a spark dataframe using the storage connector.

Reading from BigQuery is done by specifying either a BigQuery table or a BigQuery query. For example, to read from a BigQuery table, set the BigQuery project, dataset and table on the storage connector and read directly from the corresponding path.

conn.read()
Or, to read the results of a BigQuery query, set the Materialization Dataset on the storage connector and pass your SQL to the query argument.
conn.read(query='SQL')
If both are configured, a query passed at runtime takes priority over the table options set on the storage connector, so the same connector can serve both query and table reads. The path argument can also be set at read time to a BigQuery table path if table options were not set when the connector was created.
conn.read(path='project.dataset.table')

PARAMETER DESCRIPTION
query

BigQuery query. Defaults to None.

TYPE: str | None DEFAULT: None

data_format

Spark data format. Defaults to None.

TYPE: str | None DEFAULT: None

options

Spark options. Defaults to None.

TYPE: dict[str, Any] | None DEFAULT: None

path

BigQuery table path. Defaults to None.

TYPE: str | None DEFAULT: None

dataframe_type

The type of the returned dataframe. Possible values are "default", "spark", "pandas", "polars", "numpy" or "python". Defaults to "default", which maps to Spark dataframe for the Spark Engine and Pandas dataframe for the Python engine.

TYPE: str DEFAULT: 'default'

RAISES DESCRIPTION
ValueError

Malformed arguments.

RETURNS DESCRIPTION
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.RDD') | pd.DataFrame | np.ndarray | pl.DataFrame

Dataframe: A Spark dataframe.

spark_options #

spark_options() -> dict[str, Any]

Return spark options to be set for BigQuery spark connector.

KafkaConnector #

Bases: StorageConnector

bootstrap_servers property #

bootstrap_servers: list[str] | None

Kafka bootstrap servers.

options property #

options: dict[str, Any]

Additional Kafka options.

security_protocol property #

security_protocol: str | None

Security protocol for the Kafka connection.

ssl_endpoint_identification_algorithm property #

ssl_endpoint_identification_algorithm: str | None

SSL endpoint identification algorithm.

ssl_keystore_location property #

ssl_keystore_location: str | None

SSL keystore location.

ssl_truststore_location property #

ssl_truststore_location: str | None

SSL truststore location.

confluent_options #

confluent_options() -> dict[str, Any]

Return prepared options to be passed to confluent_kafka, based on the provided apache spark configuration.

Right now only producer values with Importance >= medium are implemented.

See https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html.
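
Example (an illustrative sketch; the connector and topic names are placeholders, and it assumes the confluent_kafka package is installed)

from confluent_kafka import Producer

# connect to the Feature Store
fs = ...

sc = fs.get_storage_connector("kafka_conn")

# build a producer from the connector's prepared configuration
producer = Producer(sc.confluent_options())
producer.produce("some_topic", value=b"hello")
producer.flush()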

create_pem_files #

create_pem_files(kafka_options: dict[str, Any]) -> None

Create PEM (Privacy Enhanced Mail) files for Kafka SSL authentication.

This method writes the necessary PEM files for SSL authentication with Kafka, using the provided keystore and truststore locations and passwords. The generated file paths are stored as the following instance variables:

- self.ca_chain_path: Path to the generated CA chain PEM file.
- self.client_cert_path: Path to the generated client certificate PEM file.
- self.client_key_path: Path to the generated client key PEM file.

These files are used for configuring secure Kafka connections (e.g., with Spark or confluent_kafka). The method is idempotent and will only create the files once per connector instance.
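
Example (an illustrative sketch, assuming the connector has a keystore and truststore configured; the connector name is a placeholder)

# connect to the Feature Store
fs = ...

sc = fs.get_storage_connector("kafka_conn")

# write the PEM files and expose their paths on the connector instance
sc.create_pem_files(sc.kafka_options())
print(sc.ca_chain_path, sc.client_cert_path, sc.client_key_path)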

kafka_options #

kafka_options(distribute=True) -> dict[str, Any]

Return prepared options to be passed to kafka, based on the additional arguments.

See https://kafka.apache.org/documentation/.

read #

read(
    query: str | None = None,
    data_format: str | None = None,
    options: dict[str, Any] | None = None,
    path: str | None = None,
    dataframe_type: str = "default",
) -> None

NOT SUPPORTED.

read_stream #

read_stream(
    topic: str,
    topic_pattern: bool = False,
    message_format: str = "avro",
    schema: str | None = None,
    options: dict[str, Any] | None = None,
    include_metadata: bool = False,
) -> TypeVar("pyspark.sql.DataFrame") | TypeVar(
    "pyspark.sql.streaming.StreamingQuery"
)

Reads a Kafka stream from a topic or multiple topics into a Dataframe.

Engine Support

Spark only

Reading from data streams using Pandas/Python as engine is currently not supported. Python/Pandas has no notion of streaming.
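
Example (a minimal sketch; the connector and topic names are placeholders)

# connect to the Feature Store
fs = ...

sc = fs.get_storage_connector("kafka_conn")

# subscribe to a single topic and decode Avro-encoded messages
df = sc.read_stream(topic="some_topic", message_format="avro")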

PARAMETER DESCRIPTION
topic

Name or pattern of the topic(s) to subscribe to.

TYPE: str

topic_pattern

Flag to indicate if topic string is a pattern. Defaults to False.

TYPE: bool DEFAULT: False

message_format

The format of the messages to use for decoding. Can be "avro" or "json". Defaults to "avro".

TYPE: str DEFAULT: 'avro'

schema

Optional schema, to use for decoding, can be an Avro schema string for "avro" message format, or for JSON encoding a Spark StructType schema, or a DDL formatted string. Defaults to None.

TYPE: str | None DEFAULT: None

options

Additional options as key/value string pairs to be passed to Spark. Defaults to {}.

TYPE: dict[str, Any] | None DEFAULT: None

include_metadata

Indicate whether to return additional metadata fields from messages in the stream. Otherwise, only the decoded value fields are returned. Defaults to False.

TYPE: bool DEFAULT: False

RAISES DESCRIPTION
ValueError

Malformed arguments.

RETURNS DESCRIPTION
TypeVar('pyspark.sql.DataFrame') | TypeVar('pyspark.sql.streaming.StreamingQuery')

StreamingDataframe: A Spark streaming dataframe.

spark_options #

spark_options() -> dict[str, Any]

Return prepared options to be passed to Spark, based on the additional arguments.

This is done by adding the 'kafka.' prefix to kafka_options.

See https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#kafka-specific-configurations.
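
Example (an illustrative sketch; it assumes an active Spark session and uses placeholder connector and topic names)

# connect to the Feature Store
fs = ...

sc = fs.get_storage_connector("kafka_conn")

# pass the prefixed options to Spark's Kafka source
df = (
    spark.readStream.format("kafka")
    .options(**sc.spark_options())
    .option("subscribe", "some_topic")
    .load()
)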