KafkaSchema API
You can obtain a KafkaApi handle via Project.get_kafka_api. Once you have it, you can manage Kafka schemas using the KafkaApi methods:
create_schema
create_schema(subject: str, schema: dict) -> kafka_schema.KafkaSchema
Create a new Kafka schema.
import hopsworks

# Log in and get a handle to the project's Kafka API.
project = hopsworks.login()
kafka_api = project.get_kafka_api()

# Avro record schema with two fields.
avro_schema = {
    "type": "record",
    "name": "tutorial",
    "fields": [
        {
            "name": "id",
            "type": "int"
        },
        {
            "name": "data",
            "type": "string"
        }
    ]
}

# create_schema returns a KafkaSchema object.
kafka_schema = kafka_api.create_schema("my_schema", avro_schema)
| PARAMETER | DESCRIPTION |
|---|---|
| subject | Subject name of the schema. TYPE: str |
| schema | Avro schema definition. TYPE: dict |
| RETURNS | DESCRIPTION |
|---|---|
| kafka_schema.KafkaSchema | The KafkaSchema object. |
| RAISES | DESCRIPTION |
|---|---|
| hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
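Since a malformed schema would only surface as a RestAPIError after the request is sent, it can be useful to run a quick local check on the dict before calling create_schema. The following is a minimal sketch, not part of the Hopsworks API: `check_record_schema` is a hypothetical helper that only verifies the basic shape of an Avro record (a "record" type, a name, and fields that each carry a name and a type). It does not perform full Avro validation.

```python
def check_record_schema(schema: dict) -> list[str]:
    """Return a list of problems found in an Avro record schema dict.

    Hypothetical helper for illustration; an empty list means no
    problems were found by these basic checks.
    """
    problems = []
    if schema.get("type") != "record":
        problems.append('top-level "type" must be "record"')
    name = schema.get("name")
    if not isinstance(name, str) or not name:
        problems.append('"name" must be a non-empty string')
    fields = schema.get("fields")
    if not isinstance(fields, list):
        problems.append('"fields" must be a list')
        return problems
    for i, field in enumerate(fields):
        # Every field of an Avro record needs a name and a type.
        if not isinstance(field, dict) or "name" not in field or "type" not in field:
            problems.append(f'field {i} needs both "name" and "type"')
    return problems


avro_schema = {
    "type": "record",
    "name": "tutorial",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "data", "type": "string"},
    ],
}

problems = check_record_schema(avro_schema)
print(problems)  # an empty list: no problems found
```

If the list is empty, the dict can then be passed to `kafka_api.create_schema("my_schema", avro_schema)` as in the example above.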
get_schema
get_schema(subject: str, version: int) -> kafka_schema.KafkaSchema | None
Get a schema given its subject name and version.
| PARAMETER | DESCRIPTION |
|---|---|
| subject | Subject name. TYPE: str |
| version | Version number. TYPE: int |
| RETURNS | DESCRIPTION |
|---|---|
| kafka_schema.KafkaSchema \| None | KafkaSchema object or None. |
| RAISES | DESCRIPTION |
|---|---|
| hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
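Because get_schema can return None, a common pattern is to look a schema up first and create it only when nothing comes back. The sketch below assumes that a missing subject or version yields None rather than a RestAPIError, which this page does not state explicitly. `get_or_create_schema` is a hypothetical helper, not a KafkaApi method; it only calls get_schema and create_schema as documented here.

```python
def get_or_create_schema(kafka_api, subject: str, version: int, schema: dict):
    """Return the schema for (subject, version), creating it if absent.

    Hypothetical helper for illustration. `kafka_api` is the handle
    returned by project.get_kafka_api().
    """
    existing = kafka_api.get_schema(subject, version)
    if existing is not None:
        return existing
    # Assumption: None means the schema was not found, so register it.
    return kafka_api.create_schema(subject, schema)


# Usage with a live project (requires a Hopsworks login):
#   kafka_api = project.get_kafka_api()
#   schema = get_or_create_schema(kafka_api, "my_schema", 1, avro_schema)
```

Note that the version assigned to a newly created schema is decided by the backend, so it may differ from the `version` argument used for the lookup.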
get_schemas
get_schemas(subject: str) -> list[kafka_schema.KafkaSchema]
Get all schema versions for the subject.
| PARAMETER | DESCRIPTION |
|---|---|
| subject | Subject name. TYPE: str |
| RETURNS | DESCRIPTION |
|---|---|
| list[kafka_schema.KafkaSchema] | List of KafkaSchema objects. |
| RAISES | DESCRIPTION |
|---|---|
| hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
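A typical use of get_schemas is to pick the most recent version of a subject. The sketch below assumes that each KafkaSchema object exposes a numeric `version` attribute, which this page does not document. `latest_schema` is a hypothetical helper, and the demo uses `SimpleNamespace` stand-ins instead of real KafkaSchema objects so that it runs without a Hopsworks connection.

```python
from types import SimpleNamespace


def latest_schema(schemas):
    """Return the schema with the highest version, or None for an empty list.

    Hypothetical helper; assumes each item has a numeric `version` attribute.
    """
    return max(schemas, key=lambda s: s.version, default=None)


# Stand-ins for the objects returned by kafka_api.get_schemas("my_schema").
demo = [SimpleNamespace(subject="my_schema", version=v) for v in (1, 3, 2)]

print(latest_schema(demo).version)  # 3
print(latest_schema([]))            # None
```

With a live project, the list would come from `kafka_api.get_schemas("my_schema")` instead of `demo`.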