FlinkCluster API#
You can obtain a Flink Cluster API handle using Project.get_flink_cluster_api. Once you have it, you can use the following methods to obtain FlinkCluster objects:
setup_cluster #
setup_cluster(
name: str, config=None
) -> flink_cluster.FlinkCluster
Create a new flink job representing a flink cluster, or update an existing one.
import hopsworks
project = hopsworks.login()
flink_cluster_api = project.get_flink_cluster_api()
flink_config = flink_cluster_api.get_configuration()
flink_config['appName'] = "myFlinkCluster"
flink_cluster = flink_cluster_api.setup_cluster(name="myFlinkCluster", config=flink_config)
| PARAMETER | DESCRIPTION |
|---|---|
name | Name of the cluster. TYPE: `str` |
config | Configuration of the cluster. DEFAULT: `None` |
| RETURNS | DESCRIPTION |
|---|---|
flink_cluster.FlinkCluster | The FlinkCluster object representing the cluster. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
get_cluster #
get_cluster(name: str) -> flink_cluster.FlinkCluster | None
Get the job corresponding to the flink cluster.
import hopsworks
project = hopsworks.login()
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
| PARAMETER | DESCRIPTION |
|---|---|
name | Name of the cluster. TYPE: `str` |
| RETURNS | DESCRIPTION |
|---|---|
`flink_cluster.FlinkCluster \| None` | The FlinkCluster object representing the cluster, or `None` if no such cluster exists. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
FlinkCluster #
get_jars #
get_jars() -> list[dict]
Get already uploaded jars from the flink cluster.
# log in to hopsworks
import hopsworks
project = hopsworks.login()
# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
# get jar files from this cluster
flink_cluster.get_jars()
| RETURNS | DESCRIPTION |
|---|---|
list[dict] | The array of dictionaries with jar metadata. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
get_job #
get_job(job_id) -> dict
Get specific job from the flink cluster.
# log in to hopsworks
import hopsworks
project = hopsworks.login()
# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
# get a specific job from this cluster
job_id = '113a2af5b724a9b92085dc2d9245e1d6'
flink_cluster.get_job(job_id)
| PARAMETER | DESCRIPTION |
|---|---|
job_id | ID of the job within this cluster. |
| RETURNS | DESCRIPTION |
|---|---|
dict | A dictionary with flink job id and status of the job. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
get_jobs #
get_jobs() -> list[dict]
Get jobs from the flink cluster.
# log in to hopsworks
import hopsworks
project = hopsworks.login()
# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
# get jobs from this flink cluster
flink_cluster.get_jobs()
| RETURNS | DESCRIPTION |
|---|---|
list[dict] | The array of dictionaries with flink job id and status of the job. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
job_state #
job_state(
job_id,
) -> Literal[
"INITIALIZING",
"CREATED",
"RUNNING",
"FAILING",
"FAILED",
"CANCELLING",
"CANCELED",
"FINISHED",
"RESTARTING",
"SUSPENDED",
"RECONCILING",
]
Get the state of a job submitted to the flink cluster.
# log in to hopsworks
import hopsworks
project = hopsworks.login()
# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
# get the state of a job in this flink cluster
job_id = '113a2af5b724a9b92085dc2d9245e1d6'
flink_cluster.job_state(job_id)
| PARAMETER | DESCRIPTION |
|---|---|
job_id | ID of the job within this flink cluster. |
| RETURNS | DESCRIPTION |
|---|---|
Literal['INITIALIZING', 'CREATED', 'RUNNING', 'FAILING', 'FAILED', 'CANCELLING', 'CANCELED', 'FINISHED', 'RESTARTING', 'SUSPENDED', 'RECONCILING'] | Status of the job. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
start #
start(await_time=1800)
Start the flink cluster and wait until it reaches RUNNING state.
import hopsworks
project = hopsworks.login()
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
flink_cluster.start()
| PARAMETER | DESCRIPTION |
|---|---|
await_time | Maximum time, in seconds, to wait for the cluster to reach RUNNING state. Defaults to 1800 seconds to account for auto-scaling mechanisms. DEFAULT: `1800` |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
stop #
stop()
Stop this cluster.
# log in to hopsworks
import hopsworks
project = hopsworks.login()
# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
flink_cluster.stop()
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
stop_job #
stop_job(job_id)
Stop specific job in the flink cluster.
# log in to hopsworks
import hopsworks
project = hopsworks.login()
# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
# stop the job
job_id = '113a2af5b724a9b92085dc2d9245e1d6'
flink_cluster.stop_job(job_id)
| PARAMETER | DESCRIPTION |
|---|---|
job_id | ID of the job within this flink cluster. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
submit_job #
submit_job(jar_id, main_class, job_arguments=None) -> str
Submit job using the specific jar file uploaded to the flink cluster.
# log in to hopsworks
import hopsworks
project = hopsworks.login()
# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
# upload jar file to this cluster
main_class = "com.example.Main"
job_arguments = "-arg1 arg1 -arg2 arg2"
jar_file_path = "./flink-example.jar"
flink_cluster.upload_jar(jar_file_path)
# get jar file metadata (and select the 1st one for demo purposes)
jar_metadata = flink_cluster.get_jars()[0]
jar_id = jar_metadata["id"]
flink_cluster.submit_job(jar_id, main_class, job_arguments=job_arguments)
| PARAMETER | DESCRIPTION |
|---|---|
jar_id | ID of the jar file. |
main_class | Fully qualified name of the main class in the jar file (e.g. `com.example.Main`). |
job_arguments | Job arguments, if any. DEFAULT: `None` |
| RETURNS | DESCRIPTION |
|---|---|
str | Job ID. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
upload_jar #
upload_jar(jar_file)
Upload jar file to the flink cluster.
# log in to hopsworks
import hopsworks
project = hopsworks.login()
# fetch flink cluster handle
flink_cluster_api = project.get_flink_cluster_api()
flink_cluster = flink_cluster_api.get_cluster(name="myFlinkCluster")
# upload jar file to this cluster
jar_file_path = "./flink-example.jar"
flink_cluster.upload_jar(jar_file_path)
| PARAMETER | DESCRIPTION |
|---|---|
jar_file | Path to the jar file. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |