FlinkCluster API

You can obtain a FlinkCluster API handle using Project.get_flink_cluster_api. Once you have it, you can use the following methods to obtain FlinkCluster objects:

setup_cluster(
    name: str, config=None
) -> flink_cluster.FlinkCluster

Create a new Flink job representing a Flink cluster, or update an existing one.

import hopsworks

project = hopsworks.login()

flink_cluster_api = project.get_flink_cluster_api()

flink_config = flink_cluster_api.get_configuration()

flink_config['appName'] = "myFlinkCluster"

flink_cluster = flink_cluster_api.setup_cluster(name="myFlinkCluster", config=flink_config)
PARAMETER DESCRIPTION
name

Name of the cluster.

TYPE: str

config

Configuration of the cluster.

DEFAULT: None

RETURNS DESCRIPTION
flink_cluster.FlinkCluster

The FlinkCluster object representing the cluster.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.
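The example above edits the dictionary returned by get_configuration() in place before passing it to setup_cluster. A minimal sketch of the same step as a reusable helper follows. `build_cluster_config` and its `overrides` argument are hypothetical; only the `appName` key comes from this page, and any other key you pass is copied in unvalidated.

```python
import copy
from typing import Any, Mapping, Optional


def build_cluster_config(
    flink_cluster_api: Any,
    name: str,
    overrides: Optional[Mapping[str, Any]] = None,
) -> dict:
    """Return a copy of the default configuration with appName set to `name`.

    Hypothetical helper: extra keys in `overrides` are copied verbatim and
    are not checked against the settings Hopsworks actually accepts.
    """
    # Deep-copy so the object returned by get_configuration() is never mutated.
    config = copy.deepcopy(flink_cluster_api.get_configuration())
    config["appName"] = name
    for key, value in (overrides or {}).items():
        config[key] = value
    return config
```

Usage, following the example above: `flink_cluster_api.setup_cluster(name="myFlinkCluster", config=build_cluster_config(flink_cluster_api, "myFlinkCluster"))`.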

get_cluster(name: str) -> flink_cluster.FlinkCluster | None

Get the job corresponding to a Flink cluster.

import hopsworks

project = hopsworks.login()

flink_cluster_api = project.get_flink_cluster_api()

flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
PARAMETER DESCRIPTION
name

Name of the cluster.

TYPE: str

RETURNS DESCRIPTION
flink_cluster.FlinkCluster | None

The FlinkCluster object representing the cluster or None if it does not exist.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.
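Because get_cluster returns None when no cluster with that name exists, it can be combined with setup_cluster into an idempotent "get or create" step. This is a minimal sketch; `get_or_setup_cluster` and its `make_config` callback are hypothetical names, and the callback is only invoked when a new cluster actually has to be created.

```python
from typing import Any, Callable, Optional


def get_or_setup_cluster(
    flink_cluster_api: Any,
    name: str,
    make_config: Optional[Callable[[], Any]] = None,
) -> Any:
    """Return the existing cluster called `name`, creating it only if missing."""
    cluster = flink_cluster_api.get_cluster(name=name)
    if cluster is not None:
        return cluster
    # get_cluster returned None, so the cluster does not exist yet.
    config = make_config() if make_config is not None else None
    return flink_cluster_api.setup_cluster(name=name, config=config)
```

Passing a callback rather than a ready-made config avoids fetching and building a configuration when the cluster already exists.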

FlinkCluster objects expose the following properties and methods.

config

Configuration for the cluster.

creation_time

Date of creation for the cluster.

creator

Creator of the cluster.

id

ID of the cluster.

name

Name of the cluster.

state

State of the cluster.
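The properties above can be gathered into a plain dictionary, for example for logging. This is a minimal sketch: `describe_cluster` is a hypothetical helper, the value types of these properties are not documented here so they are passed through unchanged, and `config` is left out because it can be large.

```python
from typing import Any

# Property names as listed above (config deliberately omitted).
SUMMARY_PROPERTIES = ("id", "name", "state", "creator", "creation_time")


def describe_cluster(flink_cluster: Any) -> dict:
    """Collect selected FlinkCluster properties into a plain dict."""
    return {prop: getattr(flink_cluster, prop) for prop in SUMMARY_PROPERTIES}
```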

get_jars() -> list[dict]

Get the jar files already uploaded to the Flink cluster.

# log in to hopsworks
import hopsworks
project = hopsworks.login()

# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# get jar files from this cluster
flink_cluster.get_jars()
RETURNS DESCRIPTION
list[dict]

A list of dictionaries containing jar metadata.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.
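When the same jar has been uploaded more than once, picking an arbitrary entry from get_jars() may select a stale upload. A minimal sketch of selecting the newest upload by file name follows. Only the `"id"` key is confirmed by this page; the `"name"` and `"uploaded"` keys are an assumption modeled on the Apache Flink REST `/jars` response, so check the dictionaries your Hopsworks version returns. `latest_jar_id` is a hypothetical helper.

```python
from typing import Optional


def latest_jar_id(jars: list[dict], file_name: str) -> Optional[str]:
    """Return the id of the most recently uploaded jar named `file_name`.

    Assumes each dict has "id", "name" and "uploaded" keys (as in the
    Apache Flink REST /jars response); only "id" is confirmed here.
    """
    matches = [jar for jar in jars if jar.get("name") == file_name]
    if not matches:
        return None
    # "uploaded" is assumed to be an upload timestamp; the newest entry wins.
    newest = max(matches, key=lambda jar: jar.get("uploaded", 0))
    return newest["id"]
```

Usage: `jar_id = latest_jar_id(flink_cluster.get_jars(), "flink-example.jar")`.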

get_job(job_id) -> dict

Get a specific job from the Flink cluster.

# log in to hopsworks
import hopsworks
project = hopsworks.login()

# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# get a specific job from this cluster
job_id = '113a2af5b724a9b92085dc2d9245e1d6'
flink_cluster.get_job(job_id)
PARAMETER DESCRIPTION
job_id

ID of the job within this cluster.

RETURNS DESCRIPTION
dict

A dictionary with the Flink job ID and the job's status.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

get_jobs() -> list[dict]

Get the jobs of the Flink cluster.

# log in to hopsworks
import hopsworks
project = hopsworks.login()

# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# get jobs from this flink cluster
flink_cluster.get_jobs()
RETURNS DESCRIPTION
list[dict]

A list of dictionaries, each with the Flink job ID and the job's status.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.
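The list returned by get_jobs() can be summarized by status, for example to check how many jobs are still running. This is a minimal sketch; it assumes each dictionary stores the status under the key `"status"`, as in the Apache Flink REST `/jobs` overview, which this page does not confirm. `count_jobs_by_status` is a hypothetical helper.

```python
from collections import Counter


def count_jobs_by_status(jobs: list[dict]) -> Counter:
    """Tally the jobs returned by get_jobs() per status.

    Assumes the status is stored under the key "status"; adjust the key
    if your Hopsworks version returns a different shape.
    """
    return Counter(job["status"] for job in jobs)
```

Usage: `count_jobs_by_status(flink_cluster.get_jobs())["RUNNING"]`. A `Counter` returns 0 for statuses that do not occur, so no key check is needed.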

get_url()

Get the URL of the Flink cluster in Hopsworks.

job_state(
    job_id,
) -> Literal[
    "INITIALIZING",
    "CREATED",
    "RUNNING",
    "FAILING",
    "FAILED",
    "CANCELLING",
    "CANCELED",
    "FINISHED",
    "RESTARTING",
    "SUSPENDED",
    "RECONCILING",
]

Get the state of a job submitted to the Flink cluster.

# log in to hopsworks
import hopsworks
project = hopsworks.login()

# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# get the state of a job in this flink cluster
job_id = '113a2af5b724a9b92085dc2d9245e1d6'
flink_cluster.job_state(job_id)
PARAMETER DESCRIPTION
job_id

ID of the job within this Flink cluster.

RETURNS DESCRIPTION
Literal['INITIALIZING', 'CREATED', 'RUNNING', 'FAILING', 'FAILED', 'CANCELLING', 'CANCELED', 'FINISHED', 'RESTARTING', 'SUSPENDED', 'RECONCILING']

Status of the job.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.
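job_state returns a single snapshot, so waiting for a job to finish means polling it. A minimal sketch follows. `wait_for_job` is a hypothetical helper; the default targets are FAILED, CANCELED and FINISHED, which are the globally terminal states of Flink's `JobStatus` (a job in one of them will not change state again). The timeout and poll interval defaults are arbitrary.

```python
import time
from typing import Any, Iterable

# Globally terminal states in Flink's JobStatus.
TERMINAL_STATES = frozenset({"FAILED", "CANCELED", "FINISHED"})


def wait_for_job(
    flink_cluster: Any,
    job_id: str,
    targets: Iterable[str] = TERMINAL_STATES,
    timeout: float = 600.0,
    poll_interval: float = 5.0,
) -> str:
    """Poll job_state() until it returns one of `targets` or `timeout` elapses."""
    wanted = frozenset(targets)
    deadline = time.monotonic() + timeout
    while True:
        state = flink_cluster.job_state(job_id)
        if state in wanted:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"job {job_id} still in state {state} after {timeout} seconds"
            )
        time.sleep(poll_interval)
```

Usage: `final_state = wait_for_job(flink_cluster, job_id)`, then check whether `final_state == "FINISHED"`. Pass `targets={"RUNNING"}` to wait for a freshly submitted job to start instead.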

start(await_time=1800)

Start the Flink cluster and wait until it reaches the RUNNING state.

import hopsworks

project = hopsworks.login()

flink_cluster_api = project.get_flink_cluster_api()

flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

flink_cluster.start()
PARAMETER DESCRIPTION
await_time

Time to wait, in seconds, for the cluster to reach the RUNNING state. Defaults to 1800 seconds to allow time for autoscaling.

DEFAULT: 1800

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.
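A script that may run against an already started cluster can skip the call to start(). This is a minimal sketch; `ensure_running` is a hypothetical helper, and it assumes the `state` property reads `"RUNNING"` for a started cluster and reflects the backend at the time it is read, neither of which this page states explicitly.

```python
from typing import Any


def ensure_running(flink_cluster: Any, await_time: int = 1800) -> Any:
    """Call start() only if the cluster does not already report RUNNING."""
    # Assumption: `state` is "RUNNING" for a started cluster and is current.
    if flink_cluster.state != "RUNNING":
        flink_cluster.start(await_time=await_time)
    return flink_cluster
```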

stop()

Stop this cluster.

# log in to hopsworks
import hopsworks
project = hopsworks.login()

# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()

flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

flink_cluster.stop()
RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.
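Pairing start() with stop() in a context manager ensures a cluster started by a script is stopped again even when the work in between raises an exception. This is a minimal sketch using the standard library's `contextlib.contextmanager`; `running_cluster` is a hypothetical helper. If start() itself raises, stop() is not called.

```python
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def running_cluster(flink_cluster: Any, await_time: int = 1800) -> Iterator[Any]:
    """Start the cluster on entry and always stop it on exit."""
    flink_cluster.start(await_time=await_time)
    try:
        yield flink_cluster
    finally:
        # Runs whether the body finished normally or raised an exception.
        flink_cluster.stop()
```

Usage: `with running_cluster(flink_cluster) as cluster: cluster.get_jobs()`.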

stop_job(job_id)

Stop a specific job in the Flink cluster.

# log in to hopsworks
import hopsworks
project = hopsworks.login()

# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# stop the job
job_id = '113a2af5b724a9b92085dc2d9245e1d6'
flink_cluster.stop_job(job_id)
PARAMETER DESCRIPTION
job_id

ID of the job within this Flink cluster.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.
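get_jobs() and stop_job() can be combined to stop every job that is still active, for example before calling stop() on the cluster. This is a minimal sketch; `stop_active_jobs` is a hypothetical helper, and it assumes each job dictionary uses the keys `"id"` and `"status"` (as in the Apache Flink REST `/jobs` overview), which this page does not confirm. Jobs in Flink's globally terminal states are skipped.

```python
from typing import Any

# Globally terminal states in Flink's JobStatus.
TERMINAL_STATES = frozenset({"FAILED", "CANCELED", "FINISHED"})


def stop_active_jobs(flink_cluster: Any) -> list[str]:
    """Call stop_job() for every job not yet in a terminal state.

    Assumes get_jobs() returns dicts with "id" and "status" keys.
    Returns the IDs of the jobs that were asked to stop.
    """
    stopped = []
    for job in flink_cluster.get_jobs():
        if job["status"] not in TERMINAL_STATES:
            flink_cluster.stop_job(job["id"])
            stopped.append(job["id"])
    return stopped
```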

submit_job(jar_id, main_class, job_arguments=None) -> str

Submit a job using a specific jar file already uploaded to the Flink cluster.

# log in to hopsworks
import hopsworks
project = hopsworks.login()

# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# upload jar file to this cluster
main_class = "com.example.Main"
job_arguments = "-arg1 arg1 -arg2 arg2"
jar_file_path = "./flink-example.jar"
flink_cluster.upload_jar(jar_file_path)

# get jar file metadata (the first entry is selected for demonstration purposes)
jar_metadata = flink_cluster.get_jars()[0]
jar_id = jar_metadata["id"]
flink_cluster.submit_job(jar_id, main_class, job_arguments=job_arguments)
PARAMETER DESCRIPTION
jar_id

ID of the jar file.

main_class

Fully qualified name of the main class in the jar file (for example, com.example.Main).

job_arguments

Job arguments, if any.

DEFAULT: None

RETURNS DESCRIPTION
str

Job ID.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.
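job_arguments is a single string such as "-arg1 arg1 -arg2 arg2". A minimal sketch of building it from a dictionary follows. `format_job_arguments` is a hypothetical helper; it rejects empty values and values containing whitespace because this page does not say how the backend splits the string into individual arguments.

```python
from typing import Mapping


def format_job_arguments(args: Mapping[str, object]) -> str:
    """Build a job_arguments string such as "-arg1 arg1 -arg2 arg2".

    Empty values and values containing whitespace are rejected, since the
    way the backend tokenizes the string is not documented here.
    """
    parts = []
    for key, value in args.items():
        text = str(value)
        if not text or any(ch.isspace() for ch in text):
            raise ValueError(f"unsupported value for -{key}: {text!r}")
        parts.extend([f"-{key}", text])
    return " ".join(parts)
```

Usage: `flink_cluster.submit_job(jar_id, main_class, job_arguments=format_job_arguments({"arg1": "arg1", "arg2": "arg2"}))`.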

upload_jar(jar_file)

Upload a jar file to the Flink cluster.

# log in to hopsworks
import hopsworks
project = hopsworks.login()

# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")

# upload jar file to this cluster
jar_file_path = "./flink-example.jar"
flink_cluster.upload_jar(jar_file_path)
PARAMETER DESCRIPTION
jar_file

Path to the jar file.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.
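A wrong path or a file that is not a jar is cheaper to catch locally than as a backend error. This is a minimal sketch; `check_jar_file` is a hypothetical helper that relies on the fact that a jar is a zip archive, so it uses the standard library's `zipfile.is_zipfile` to reject other files.

```python
import zipfile
from pathlib import Path


def check_jar_file(jar_file: str) -> Path:
    """Fail fast locally before calling upload_jar() with a bad path."""
    path = Path(jar_file)
    if not path.is_file():
        raise FileNotFoundError(f"no such file: {path}")
    # A jar is a zip archive, so a non-zip file cannot be a valid jar.
    if path.suffix != ".jar" or not zipfile.is_zipfile(path):
        raise ValueError(f"not a jar archive: {path}")
    return path
```

Usage: `flink_cluster.upload_jar(str(check_jar_file("./flink-example.jar")))`. The path is converted back to a string because this page only shows upload_jar being called with a string.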