KafkaTopic API#

Obtain a KafkaApi handle via Project.get_kafka_api, then use the KafkaApi methods below to manage Kafka topics:

get_default_config #

get_default_config(
    internal_kafka: bool | None = None,
) -> dict

Get the configuration to set up a Producer or Consumer for a Kafka broker using confluent-kafka.

import hopsworks
from confluent_kafka import Producer

project = hopsworks.login()

kafka_api = project.get_kafka_api()

# Default client configuration for the project's Kafka broker
kafka_conf = kafka_api.get_default_config()

producer = Producer(kafka_conf)
RETURNS DESCRIPTION
dict

The Kafka configuration.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.
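The example above builds a Producer. For a Consumer, confluent-kafka also requires a consumer group. The following is a minimal sketch, assuming the dict returned by get_default_config does not already set one. `build_consumer_config` is a hypothetical helper, not part of the Hopsworks API; `group.id` and `auto.offset.reset` are standard confluent-kafka (librdkafka) settings.

```python
def build_consumer_config(base_conf: dict, group_id: str,
                          offset_reset: str = "earliest") -> dict:
    """Return a copy of base_conf extended with consumer-only settings.

    Hypothetical helper: base_conf is assumed to be the dict returned by
    kafka_api.get_default_config().
    """
    conf = dict(base_conf)  # copy, so the caller's dict is left untouched
    conf["group.id"] = group_id  # required by confluent_kafka.Consumer
    conf["auto.offset.reset"] = offset_reset  # where to start without a committed offset
    return conf


# Usage (needs a Hopsworks project and the confluent-kafka package):
#   from confluent_kafka import Consumer
#   conf = build_consumer_config(kafka_api.get_default_config(), "my_group")
#   consumer = Consumer(conf)
#   consumer.subscribe(["my_topic"])
```

Copying the dict keeps the default configuration reusable for a Producer in the same process.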

create_topic #

create_topic(
    name: str,
    schema: str,
    schema_version: int,
    replicas: int = 1,
    partitions: int = 1,
) -> kafka_topic.KafkaTopic

Create a new Kafka topic.

import hopsworks

project = hopsworks.login()

kafka_api = project.get_kafka_api()

kafka_topic = kafka_api.create_topic("my_topic", "my_schema", 1)
PARAMETER DESCRIPTION
name

Name of the topic.

TYPE: str

schema

Subject name of the schema.

TYPE: str

schema_version

Version of the schema.

TYPE: int

replicas

Replication factor for the topic.

TYPE: int DEFAULT: 1

partitions

Number of partitions for the topic.

TYPE: int DEFAULT: 1

RETURNS DESCRIPTION
kafka_topic.KafkaTopic

The KafkaTopic object.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

get_topic #

get_topic(name: str) -> kafka_topic.KafkaTopic | None

Get a Kafka topic by name.

PARAMETER DESCRIPTION
name

Name of the topic.

TYPE: str

RETURNS DESCRIPTION
kafka_topic.KafkaTopic | None

The KafkaTopic object or None if not found.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.
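Because get_topic returns None when the topic is not found, it can be combined with create_topic into a get-or-create step. The following is a sketch under that assumption. `get_or_create_topic` is a hypothetical helper, and `kafka_api` is any object exposing the two methods documented above.

```python
def get_or_create_topic(kafka_api, name: str, schema: str, schema_version: int,
                        replicas: int = 1, partitions: int = 1):
    """Return the topic called `name`, creating it first if it does not exist.

    Hypothetical helper built on KafkaApi.get_topic and KafkaApi.create_topic.
    """
    topic = kafka_api.get_topic(name)
    if topic is None:
        # Not found: create it with the requested schema subject and layout.
        topic = kafka_api.create_topic(
            name, schema, schema_version,
            replicas=replicas, partitions=partitions,
        )
    return topic


# Usage (needs a Hopsworks project):
#   kafka_api = project.get_kafka_api()
#   topic = get_or_create_topic(kafka_api, "my_topic", "my_schema", 1)
```

Note that the existing topic is returned as-is; the sketch does not check that its schema, replicas, or partitions match the arguments.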

get_topics #

get_topics() -> list[kafka_topic.KafkaTopic]

Get all Kafka topics.

RETURNS DESCRIPTION
list[kafka_topic.KafkaTopic]

List of KafkaTopic objects.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.
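The returned list can be filtered on the client side. As a small sketch, the hypothetical helper below selects topic names by prefix; it relies only on the `name` property of KafkaTopic.

```python
def topic_names_with_prefix(topics, prefix: str) -> list:
    """Return the sorted names of all topics whose name starts with `prefix`.

    Hypothetical helper: `topics` is assumed to be the list returned by
    kafka_api.get_topics().
    """
    return sorted(t.name for t in topics if t.name.startswith(prefix))


# Usage (needs a Hopsworks project):
#   names = topic_names_with_prefix(kafka_api.get_topics(), "my_")
```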

KafkaTopic #

Configuration for a Kafka topic.

name property writable #

name

Name of the Kafka topic.

num_partitions property #

num_partitions

Number of partitions of the Kafka topic.

num_replicas property writable #

num_replicas

Number of replicas of the Kafka topic.

partitions property #

partitions

Number of partitions of the Kafka topic.

replicas property #

replicas

Number of replicas of the Kafka topic.

schema property #

schema

Schema for the topic.
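As an illustration of how these properties can be read together, the sketch below collects them into a plain dict, for example for logging. `summarize_topic` is a hypothetical helper, not part of the library.

```python
def summarize_topic(topic) -> dict:
    """Collect the basic layout of a KafkaTopic into a plain dict.

    Hypothetical helper: reads only the name, num_partitions and
    num_replicas properties.
    """
    return {
        "name": topic.name,
        "partitions": topic.num_partitions,
        "replicas": topic.num_replicas,
    }


# Usage (needs a Hopsworks project):
#   topic = kafka_api.get_topic("my_topic")
#   if topic is not None:
#       print(summarize_topic(topic))
```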

delete #

delete()

Delete the topic.

Potentially dangerous operation

This operation deletes the topic.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.
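Since deletion is potentially dangerous, one option is to wrap it in an explicit confirmation. The following is a minimal sketch; `delete_topic_by_name` and its `confirm` flag are hypothetical and exist only in this example.

```python
def delete_topic_by_name(kafka_api, name: str, confirm: bool = False) -> bool:
    """Delete the topic called `name` only when `confirm` is True.

    Hypothetical helper. Returns True if a topic was deleted and False if
    no topic with that name was found.
    """
    topic = kafka_api.get_topic(name)
    if topic is None:
        return False  # nothing to delete
    if not confirm:
        raise ValueError(f"Refusing to delete topic {name!r} without confirm=True")
    topic.delete()
    return True


# Usage (needs a Hopsworks project):
#   delete_topic_by_name(kafka_api, "my_topic", confirm=True)
```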

describe #

describe()

Print a JSON description of the Kafka topic.