
Jobs API#

You can obtain a JobApi handle via Project.get_job_api.

JobApi #

create_job #

create_job(name: str, config: dict) -> job.Job

Create a new job or update an existing one.

import hopsworks

project = hopsworks.login()

job_api = project.get_job_api()

spark_config = job_api.get_configuration("PYSPARK")

spark_config['appPath'] = "/Resources/my_app.py"

job = job_api.create_job("my_spark_job", spark_config)
PARAMETER DESCRIPTION
name

Name of the job.

TYPE: str

config

Configuration of the job.

TYPE: dict

RETURNS DESCRIPTION
job.Job

The created job.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

exists #

exists(name: str) -> bool

Check if a job exists.

PARAMETER DESCRIPTION
name

Name of the job.

TYPE: str

RETURNS DESCRIPTION
bool

True if the job exists, otherwise False.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.
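Because create_job also updates an existing job, code that must leave an existing job's configuration untouched can check exists first. The following is a minimal sketch. get_or_create_job is a hypothetical helper that relies only on the JobApi methods documented here (exists, get_job, create_job), so any object providing them works.

```python
from typing import Any


def get_or_create_job(job_api: Any, name: str, config: dict) -> Any:
    """Return the job called `name`, creating it from `config` only if it is missing.

    Hypothetical helper built on JobApi.exists, JobApi.get_job and
    JobApi.create_job as documented in this reference.
    """
    if job_api.exists(name):
        # Reuse the existing job and leave its configuration untouched
        return job_api.get_job(name)
    # No job with this name yet, so create it from the supplied configuration
    return job_api.create_job(name, config)
```

For example, get_or_create_job(project.get_job_api(), "my_spark_job", spark_config) returns the already registered job instead of overwriting its configuration.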

get_configuration #

get_configuration(
    type: Literal[
        "SPARK", "PYSPARK", "PYTHON", "DOCKER", "FLINK"
    ],
) -> dict

Get the default configuration for the given job type.

PARAMETER DESCRIPTION
type

The job type to retrieve the configuration of.

TYPE: Literal['SPARK', 'PYSPARK', 'PYTHON', 'DOCKER', 'FLINK']

RETURNS DESCRIPTION
dict

The default job configuration for the specific job type.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.
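A common pattern is to fetch the default configuration, change only the entry point, and pass the result to create_job. The sketch below validates the type locally against the values listed in the signature above. build_job_config is a hypothetical helper, and it assumes that the "appPath" key shown in the PYSPARK example applies to the requested job type; other keys in the returned dict vary by type.

```python
from typing import Any

# Job types accepted by JobApi.get_configuration, as listed in this reference
JOB_TYPES = ("SPARK", "PYSPARK", "PYTHON", "DOCKER", "FLINK")


def build_job_config(job_api: Any, job_type: str, app_path: str) -> dict:
    """Fetch the default configuration for `job_type` and point it at `app_path`.

    Hypothetical helper; assumes the job type uses the "appPath" key.
    """
    job_type = job_type.upper()
    if job_type not in JOB_TYPES:
        raise ValueError(f"unsupported job type {job_type!r}; expected one of {JOB_TYPES}")
    # Start from the backend defaults so unspecified settings keep their default values
    config = job_api.get_configuration(job_type)
    config["appPath"] = app_path
    return config
```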

get_job #

get_job(name: str) -> job.Job | None

Get a job.

PARAMETER DESCRIPTION
name

Name of the job.

TYPE: str

RETURNS DESCRIPTION
job.Job | None

The Job object or None if it does not exist.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

get_jobs #

get_jobs() -> list[job.Job]

Get all jobs.

RETURNS DESCRIPTION
list[job.Job]

List of all jobs.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.
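get_jobs returns every job in the project, so filtering usually happens client-side. This minimal sketch uses only get_jobs and the Job.name and Job.job_type properties documented below; job_names_by_type is a hypothetical helper, and it assumes job_type holds the same upper-case strings (for example "PYTHON") that get_configuration accepts.

```python
from typing import Any


def job_names_by_type(job_api: Any, job_type: str) -> list[str]:
    """Return the sorted names of all jobs whose job_type equals `job_type`.

    Hypothetical helper; the format of Job.job_type is an assumption.
    """
    return sorted(job.name for job in job_api.get_jobs() if job.job_type == job_type)
```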

Job #

config property #

config

Configuration for the job.

creation_time property #

creation_time

Date of creation for the job.

creator property #

creator

Creator of the job.

executions property #

executions

List of executions for the job.

href property #

href

The URL of the job in the Hopsworks UI. Prefer get_url instead.

id property #

id

ID of the job.

job_schedule property #

job_schedule

Schedule of the job.

job_type property #

job_type

Type of the job.

name property #

name

Name of the job.
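The properties above can be read directly, for example to log a compact summary of a job. This is a minimal sketch; summarize_job is a hypothetical helper, and it assumes job_schedule is None when the job has not been scheduled.

```python
from typing import Any


def summarize_job(job: Any) -> dict:
    """Collect a few documented Job properties into a plain dict, e.g. for logging."""
    return {
        "id": job.id,
        "name": job.name,
        "type": job.job_type,
        "created": job.creation_time,
        # Assumption: job_schedule is None when the job has no schedule
        "scheduled": job.job_schedule is not None,
    }
```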

create_alert #

create_alert(
    receiver: str,
    status: Literal[
        "long_running", "failed", "finished", "killed"
    ],
    severity: Literal["critical", "warning", "info"],
) -> alert.JobAlert

Create an alert for the job.

# Create a critical alert that fires when the job fails
job.create_alert(
    receiver="email",
    status="failed",
    severity="critical"
)
PARAMETER DESCRIPTION
receiver

The receiver of the alert.

TYPE: str

status

The status of the alert.

TYPE: Literal['long_running', 'failed', 'finished', 'killed']

severity

The severity of the alert.

TYPE: Literal['critical', 'warning', 'info']

RETURNS DESCRIPTION
alert.JobAlert

The created JobAlert object.

RAISES DESCRIPTION
ValueError

If status or severity is not valid.

hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.
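To cover several outcomes with one receiver, create_alert can be called once per status. In the sketch below, create_alerts is a hypothetical helper; it checks every pair against the literals in the signature above before creating anything, so a typo does not leave the job with only some of its alerts. create_alert performs its own validation as well.

```python
from typing import Any

# Allowed values, copied from the create_alert signature above
ALERT_STATUSES = ("long_running", "failed", "finished", "killed")
ALERT_SEVERITIES = ("critical", "warning", "info")


def create_alerts(job: Any, receiver: str, severity_by_status: dict[str, str]) -> list:
    """Create one alert per (status, severity) pair for `job`. Hypothetical helper."""
    # Validate everything up front so no alert is created if any pair is invalid
    for status, severity in severity_by_status.items():
        if status not in ALERT_STATUSES:
            raise ValueError(f"invalid status {status!r}")
        if severity not in ALERT_SEVERITIES:
            raise ValueError(f"invalid severity {severity!r}")
    return [
        job.create_alert(receiver=receiver, status=status, severity=severity)
        for status, severity in severity_by_status.items()
    ]
```

For example, create_alerts(job, "email", {"failed": "critical", "long_running": "warning"}) creates two alerts for the same receiver.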

delete #

delete()

Delete the job.

Potentially dangerous operation

This operation deletes the job and all of its executions.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.
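Since JobApi.get_job returns None for a missing job, deletion by name can be made safe to repeat, for example in cleanup scripts. delete_job_if_exists is a hypothetical helper sketched on top of the documented get_job and delete methods.

```python
from typing import Any


def delete_job_if_exists(job_api: Any, name: str) -> bool:
    """Delete the job called `name`, including all its executions, if it exists.

    Hypothetical helper; returns True if a job was deleted, False otherwise.
    """
    job = job_api.get_job(name)
    if job is None:
        # Nothing to delete
        return False
    job.delete()
    return True
```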

get_alert #

get_alert(alert_id: int) -> alert.JobAlert

Get an alert for the job by ID.

PARAMETER DESCRIPTION
alert_id

ID of the alert.

TYPE: int

RETURNS DESCRIPTION
alert.JobAlert

The JobAlert object.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

get_alerts #

get_alerts() -> list[alert.JobAlert]

Get all alerts for the job.

RETURNS DESCRIPTION
list[alert.JobAlert]

List of JobAlert objects.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.

get_executions #

get_executions()

Retrieve all executions of the job, ordered by submission time.

RETURNS DESCRIPTION
list[Execution]

List of Execution objects.

RAISES DESCRIPTION
hopsworks.client.exceptions.RestAPIError

If the backend encounters an error when handling the request.
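The execution list can be inspected to gauge how reliable a job has been. This minimal sketch counts failed runs. failure_count is a hypothetical helper; the Execution.success attribute is taken from the run() example in this reference, and it is an assumption that executions which have not finished report something other than False.

```python
from typing import Any


def failure_count(job: Any) -> int:
    """Count the executions of `job` whose `success` attribute is exactly False.

    Hypothetical helper; values other than False (e.g. for an execution that
    is still running) are assumed not to indicate a failure.
    """
    return sum(1 for execution in job.get_executions() if execution.success is False)
```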

get_final_state #

get_final_state() -> Literal[
    "UNDEFINED",
    "FINISHED",
    "FAILED",
    "KILLED",
    "FRAMEWORK_FAILURE",
    "APP_MASTER_START_FAILED",
    "INITIALIZATION_FAILED",
]

Get the final state of the job.

RETURNS DESCRIPTION
Literal['UNDEFINED', 'FINISHED', 'FAILED', 'KILLED', 'FRAMEWORK_FAILURE', 'APP_MASTER_START_FAILED', 'INITIALIZATION_FAILED']

The final state of the job.

UNDEFINED indicates that the job is still running.
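Because UNDEFINED means the job is still running, a job started with run(await_termination=False) can be awaited by polling get_final_state. The sketch below is one way to do that. wait_for_final_state is a hypothetical helper, and its timeout counts only the time spent sleeping between polls, not the time spent in the backend calls.

```python
import time
from typing import Any, Callable


def wait_for_final_state(
    job: Any,
    poll_interval: float = 10.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll job.get_final_state() until it is no longer "UNDEFINED".

    Hypothetical helper. Raises TimeoutError once the accumulated sleep time
    reaches `timeout` seconds without a final state.
    """
    waited = 0.0
    while True:
        state = job.get_final_state()
        if state != "UNDEFINED":
            # FINISHED, FAILED, KILLED, ... : the job has stopped
            return state
        if waited >= timeout:
            raise TimeoutError(f"job still running after {waited:.0f}s")
        sleep(poll_interval)
        waited += poll_interval
```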

get_state #

get_state() -> Literal[
    "UNDEFINED",
    "INITIALIZING",
    "INITIALIZATION_FAILED",
    "FINISHED",
    "RUNNING",
    "ACCEPTED",
    "FAILED",
    "KILLED",
    "NEW",
    "NEW_SAVING",
    "SUBMITTED",
    "AGGREGATING_LOGS",
    "FRAMEWORK_FAILURE",
    "STARTING_APP_MASTER",
    "APP_MASTER_START_FAILED",
    "GENERATING_SECURITY_MATERIAL",
    "CONVERTING_NOTEBOOK",
]

Get the state of the job.

RETURNS DESCRIPTION
Literal['UNDEFINED', 'INITIALIZING', 'INITIALIZATION_FAILED', 'FINISHED', 'RUNNING', 'ACCEPTED', 'FAILED', 'KILLED', 'NEW', 'NEW_SAVING', 'SUBMITTED', 'AGGREGATING_LOGS', 'FRAMEWORK_FAILURE', 'STARTING_APP_MASTER', 'APP_MASTER_START_FAILED', 'GENERATING_SECURITY_MATERIAL', 'CONVERTING_NOTEBOOK']

The current state of the job.

If no executions are found for the job, a warning is raised and it returns UNDEFINED.
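get_state exposes many intermediate states, so callers often only need to know whether the job is currently doing work. This sketch assumes that the non-UNDEFINED values of get_final_state are the terminal states, and treats UNDEFINED (no executions) as idle; both choices are assumptions, and is_job_active is a hypothetical helper.

```python
from typing import Any

# Assumption: the non-UNDEFINED values returned by get_final_state() are terminal
TERMINAL_STATES = frozenset({
    "FINISHED",
    "FAILED",
    "KILLED",
    "FRAMEWORK_FAILURE",
    "APP_MASTER_START_FAILED",
    "INITIALIZATION_FAILED",
})


def is_job_active(job: Any) -> bool:
    """Return True while the job's latest state is neither terminal nor UNDEFINED."""
    state = job.get_state()
    # "UNDEFINED" is returned when the job has no executions, so treat it as idle
    return state != "UNDEFINED" and state not in TERMINAL_STATES
```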

get_url #

get_url()

Get the URL of the job in the Hopsworks UI.

pause_schedule #

pause_schedule()

Pause the schedule of the job.

resume_schedule #

resume_schedule()

Resume the schedule of the job.
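When a scheduled job must not fire during maintenance, pause_schedule and resume_schedule can be paired in a context manager so the schedule is restored even if the maintenance code fails. schedule_paused is a hypothetical helper built only on these two documented methods.

```python
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def schedule_paused(job: Any) -> Iterator[Any]:
    """Pause `job`'s schedule for the duration of a `with` block, then resume it.

    Hypothetical helper; the schedule is resumed even if the block raises.
    """
    job.pause_schedule()
    try:
        yield job
    finally:
        job.resume_schedule()
```

For example, wrapping a configuration change in `with schedule_paused(job):` keeps scheduled runs from starting until the block exits.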

run #

run(args: str = None, await_termination: bool = True)

Run the job.

Run the job, optionally passing runtime arguments. By default, the call waits for the execution to complete.

Example
# connect to the Feature Store
fs = ...

# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)

# insert into the feature group without starting the offline materialization
job, _ = fg.insert(df, write_options={"start_offline_materialization": False})

# run the job and wait for it to finish
execution = job.run()

# True if the job executed successfully
print(execution.success)

# download the logs
out_log_path, err_log_path = execution.download_logs()
PARAMETER DESCRIPTION
args

Optional runtime arguments for the job.

TYPE: str DEFAULT: None

await_termination

Whether the client should wait for the job to complete.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
Execution

The execution object for the submitted run.
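Runtime arguments are passed as a single string. The sketch below renders a dict of options as GNU-style flags. build_run_args is a hypothetical helper, and it assumes the job's program parses "--key value" pairs (for example with argparse) and that Hopsworks splits the string with shell-like quoting; neither is confirmed by this reference.

```python
import shlex


def build_run_args(options: dict[str, object]) -> str:
    """Render `options` as a "--key value ..." string for Job.run(args=...).

    Hypothetical helper; shlex.join quotes values containing spaces or
    other shell-special characters.
    """
    parts: list[str] = []
    for key, value in options.items():
        parts.extend([f"--{key}", str(value)])
    return shlex.join(parts)
```

For example, job.run(args=build_run_args({"date": "2024-01-01", "mode": "full"})) passes the string "--date 2024-01-01 --mode full".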

save #

save() -> Job

Save the job.

Call this method after changing a property, such as the job configuration, to persist the change.

job.config['appPath'] = "/Resources/my_app.py"
job.save()
RETURNS DESCRIPTION
Job

The updated job object.
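Since changes to job.config only take effect once save() is called, the two steps can be bundled. update_job_config is a hypothetical helper; it assumes job.config is a dict, and the keys passed in must match the configuration keys of the job's type (for example "appPath").

```python
from typing import Any


def update_job_config(job: Any, **changes: Any) -> Any:
    """Apply `changes` to job.config and persist them with job.save().

    Hypothetical helper; returns whatever job.save() returns (the updated job).
    """
    job.config.update(changes)
    return job.save()
```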

schedule #

schedule(
    cron_expression: str,
    start_time: datetime = None,
    end_time: datetime = None,
) -> JobSchedule

Schedule the execution of the job.

If a schedule for this job already exists, the method updates it.

from datetime import datetime, timezone

# Schedule the job to run every five minutes, starting now
job.schedule(
    cron_expression="0 */5 * ? * * *",
    start_time=datetime.now(tz=timezone.utc)
)

# Retrieve the next execution time
print(job.job_schedule.next_execution_date_time)
PARAMETER DESCRIPTION
cron_expression

The Quartz cron expression.

TYPE: str

start_time

The schedule start time in UTC. If None, the current time is used. The start_time can be a value in the past.

TYPE: datetime DEFAULT: None

end_time

The schedule end time in UTC. If None, the schedule will continue running indefinitely. The end_time can be a value in the past.

TYPE: datetime DEFAULT: None

RETURNS DESCRIPTION
JobSchedule

The schedule of the job.
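Quartz cron expressions differ from classic five-field Unix cron: they have 6 or 7 fields (seconds, minutes, hours, day-of-month, month, day-of-week, optional year), and exactly one of day-of-month or day-of-week must be "?". The sketch below performs that shallow check before scheduling a job for a bounded number of days. Both helper names are hypothetical, and check_quartz_cron does not validate the contents of individual fields.

```python
from datetime import datetime, timedelta, timezone
from typing import Any


def check_quartz_cron(expression: str) -> None:
    """Raise ValueError if `expression` is obviously not a Quartz cron expression."""
    fields = expression.split()
    if len(fields) not in (6, 7):
        raise ValueError(f"expected 6 or 7 fields, got {len(fields)}")
    # Quartz requires '?' in exactly one of day-of-month (index 3) and day-of-week (index 5)
    if (fields[3] == "?") == (fields[5] == "?"):
        raise ValueError("exactly one of day-of-month and day-of-week must be '?'")


def schedule_for_days(job: Any, cron_expression: str, days: int) -> Any:
    """Schedule `job` from now (UTC) until `days` days from now. Hypothetical helper."""
    check_quartz_cron(cron_expression)
    start = datetime.now(tz=timezone.utc)
    return job.schedule(
        cron_expression=cron_expression,
        start_time=start,
        end_time=start + timedelta(days=days),
    )
```

For example, schedule_for_days(job, "0 0 12 * * ?", 7) runs the job daily at noon for one week, while a five-field Unix expression such as "*/5 * * * *" is rejected before reaching the backend.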

unschedule #

unschedule()

Remove the schedule of the job.

JobConfiguration #

Configuration of a Hopsworks job.

Each job has a config attribute, which can be used in combination with job.save() to update the job's configuration.