Jobs API
You can obtain a JobApi handle via Project.get_job_api.
JobApi
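create_job
Create a new job or update an existing one.
```python
import hopsworks

# Log in and get a handle to the project's job API
project = hopsworks.login()
job_api = project.get_job_api()

# Start from the default PySpark configuration and point it at the application script
spark_config = job_api.get_configuration("PYSPARK")
spark_config['appPath'] = "/Resources/my_app.py"

# Create the job, or update it if a job with this name already exists
job = job_api.create_job("my_spark_job", spark_config)
```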
| PARAMETER | DESCRIPTION |
|---|---|
name | Name of the job. TYPE: str |
config | Configuration of the job, for example a dict returned by get_configuration. TYPE: dict |
| RETURNS | DESCRIPTION |
|---|---|
job.Job | The created or updated job. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
exists
get_configuration
Get the default configuration for a specific job type.
| PARAMETER | DESCRIPTION |
|---|---|
type | The job type to retrieve the default configuration for, for example PYSPARK. TYPE: str |
| RETURNS | DESCRIPTION |
|---|---|
dict | The default job configuration for the specific job type. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
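Because get_configuration returns a plain dict, a single default configuration can be reused for several jobs as long as each job gets its own copy. The sketch below relies only on the get_configuration and create_job calls documented above; the helper create_jobs_from_template, its parameters, and the script paths in the usage note are hypothetical.

```python
import copy
from typing import Any, Protocol


class JobApiLike(Protocol):
    """Structural type for the two JobApi methods this sketch uses."""

    def get_configuration(self, type: str) -> dict: ...

    def create_job(self, name: str, config: dict) -> Any: ...


def create_jobs_from_template(
    job_api: JobApiLike, job_type: str, app_paths: dict[str, str]
) -> dict[str, Any]:
    """Hypothetical helper: create (or update) one job per entry in app_paths.

    app_paths maps a job name to the value stored under the 'appPath' key.
    """
    # Fetch the default configuration once for this job type.
    template = job_api.get_configuration(job_type)
    jobs = {}
    for name, app_path in app_paths.items():
        # Deep-copy so edits for one job never leak into the template or other jobs.
        config = copy.deepcopy(template)
        config["appPath"] = app_path
        jobs[name] = job_api.create_job(name, config)
    return jobs
```

For example, create_jobs_from_template(project.get_job_api(), "PYSPARK", {"ingest": "/Resources/ingest.py", "train": "/Resources/train.py"}) would create or update two PySpark jobs. A deep copy is used rather than dict.copy() because the default configuration may contain nested values that a shallow copy would still share.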
get_job
Job
create_alert
create_alert(
receiver: str,
status: Literal[
"long_running", "failed", "finished", "killed"
],
severity: Literal["critical", "warning", "info"],
) -> alert.JobAlert
Create an alert for the job.
```python
# Create a critical alert that fires when the job fails
job.create_alert(
    receiver="email",
    status="failed",
    severity="critical"
)
```
| PARAMETER | DESCRIPTION |
|---|---|
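receiver | The receiver of the alert. TYPE: str |
status | The job status that triggers the alert. TYPE: Literal['long_running', 'failed', 'finished', 'killed'] |
severity | The severity of the alert. TYPE: Literal['critical', 'warning', 'info'] |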
| RETURNS | DESCRIPTION |
|---|---|
alert.JobAlert | The created JobAlert object. |
| RAISES | DESCRIPTION |
|---|---|
ValueError | If |
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
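The signature restricts status and severity to fixed sets of strings. One way to catch a typo before any request reaches the backend is a small client-side check built from those literal values; check_alert_args is a hypothetical helper, not part of the Hopsworks API, and it only mirrors the values listed in the signature.

```python
from typing import Literal, get_args

# Allowed values copied from the create_alert signature.
AlertStatus = Literal["long_running", "failed", "finished", "killed"]
AlertSeverity = Literal["critical", "warning", "info"]


def check_alert_args(status: str, severity: str) -> None:
    """Hypothetical pre-check: raise ValueError locally for values outside the signature."""
    allowed_status = get_args(AlertStatus)
    if status not in allowed_status:
        raise ValueError(f"invalid status {status!r}; expected one of {allowed_status}")
    allowed_severity = get_args(AlertSeverity)
    if severity not in allowed_severity:
        raise ValueError(
            f"invalid severity {severity!r}; expected one of {allowed_severity}"
        )
```

Calling check_alert_args(status, severity) right before job.create_alert(...) means an invalid value such as "failing" raises ValueError without a round trip to the server.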
delete
delete()
Delete the job.
Potentially dangerous operation
This operation deletes the job and all of its executions.
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
get_alert
get_alert(alert_id: int) -> alert.JobAlert
Get an alert for the job by ID.
| PARAMETER | DESCRIPTION |
|---|---|
alert_id | ID of the alert. TYPE: int |
| RETURNS | DESCRIPTION |
|---|---|
alert.JobAlert | The JobAlert object. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
get_alerts
get_alerts() -> list[alert.JobAlert]
Get all alerts for the job.
| RETURNS | DESCRIPTION |
|---|---|
list[alert.JobAlert] | List of JobAlert objects. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
get_executions
get_executions()
Retrieve all executions of the job, ordered by submission time.
| RETURNS | DESCRIPTION |
|---|---|
list[Execution] | The executions of the job, ordered by submission time. |
| RAISES | DESCRIPTION |
|---|---|
hopsworks.client.exceptions.RestAPIError | If the backend encounters an error when handling the request. |
get_final_state
get_final_state() -> Literal[
"UNDEFINED",
"FINISHED",
"FAILED",
"KILLED",
"FRAMEWORK_FAILURE",
"APP_MASTER_START_FAILED",
"INITIALIZATION_FAILED",
]
Get the final state of the job.
| RETURNS | DESCRIPTION |
|---|---|
Literal['UNDEFINED', 'FINISHED', 'FAILED', 'KILLED', 'FRAMEWORK_FAILURE', 'APP_MASTER_START_FAILED', 'INITIALIZATION_FAILED'] | The final state of the job. |
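Callers often only need to know whether a finished job succeeded. The sketch below groups the final states from the signature into three outcomes; the grouping is an assumption (in particular that FINISHED is the only successful final state and that UNDEFINED means no outcome is known yet), and classify_final_state is a hypothetical helper.

```python
from typing import Literal

FinalState = Literal[
    "UNDEFINED",
    "FINISHED",
    "FAILED",
    "KILLED",
    "FRAMEWORK_FAILURE",
    "APP_MASTER_START_FAILED",
    "INITIALIZATION_FAILED",
]

# Assumption: every final state other than FINISHED and UNDEFINED means the run did not succeed.
FAILURE_STATES = frozenset(
    {"FAILED", "KILLED", "FRAMEWORK_FAILURE", "APP_MASTER_START_FAILED", "INITIALIZATION_FAILED"}
)


def classify_final_state(state: FinalState) -> str:
    """Map a final state to "success", "failure", or "unknown"."""
    if state == "FINISHED":
        return "success"
    if state in FAILURE_STATES:
        return "failure"
    # UNDEFINED (or any value not listed above) carries no outcome.
    return "unknown"
```

In use this would read classify_final_state(job.get_final_state()).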
get_state
get_state() -> Literal[
"UNDEFINED",
"INITIALIZING",
"INITIALIZATION_FAILED",
"FINISHED",
"RUNNING",
"ACCEPTED",
"FAILED",
"KILLED",
"NEW",
"NEW_SAVING",
"SUBMITTED",
"AGGREGATING_LOGS",
"FRAMEWORK_FAILURE",
"STARTING_APP_MASTER",
"APP_MASTER_START_FAILED",
"GENERATING_SECURITY_MATERIAL",
"CONVERTING_NOTEBOOK",
]
Get the state of the job.
| RETURNS | DESCRIPTION |
|---|---|
Literal['UNDEFINED', 'INITIALIZING', 'INITIALIZATION_FAILED', 'FINISHED', 'RUNNING', 'ACCEPTED', 'FAILED', 'KILLED', 'NEW', 'NEW_SAVING', 'SUBMITTED', 'AGGREGATING_LOGS', 'FRAMEWORK_FAILURE', 'STARTING_APP_MASTER', 'APP_MASTER_START_FAILED', 'GENERATING_SECURITY_MATERIAL', 'CONVERTING_NOTEBOOK'] | The current state of the job. If no executions are found for the job, a warning is raised and it returns … |
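When a job is started with run(await_termination=False), its progress can be followed by polling get_state. The sketch below assumes that the states which also appear in the return type of get_final_state, apart from UNDEFINED, are terminal; wait_until_terminal and its parameters are hypothetical. UNDEFINED is deliberately not treated as terminal, so a job that never reports a terminal state ends in a TimeoutError rather than an endless loop.

```python
import time
from typing import Callable, Protocol


class StatefulJob(Protocol):
    """Anything with a get_state() method, such as a Hopsworks Job."""

    def get_state(self) -> str: ...


# Assumption: once a job reaches one of these states it will not change again.
TERMINAL_STATES = frozenset(
    {
        "FINISHED",
        "FAILED",
        "KILLED",
        "FRAMEWORK_FAILURE",
        "APP_MASTER_START_FAILED",
        "INITIALIZATION_FAILED",
    }
)


def wait_until_terminal(
    job: StatefulJob,
    poll_seconds: float = 10.0,
    timeout_seconds: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll job.get_state() until it returns a terminal state, then return that state.

    Raises TimeoutError if no terminal state is seen within timeout_seconds.
    The sleep function is injectable so the loop can be tested without waiting.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        state = job.get_state()
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"job still in state {state!r} after {timeout_seconds} seconds"
            )
        sleep(poll_seconds)
```

Injecting the sleep function and using time.monotonic for the deadline keeps the loop testable and unaffected by wall-clock adjustments.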
run
Run the job, optionally passing runtime arguments. By default, the call waits for the job to complete.
Example
```python
# connect to the Feature Store
fs = ...
# get the Feature Group instance
fg = fs.get_or_create_feature_group(...)
# insert df (your DataFrame) into the feature group without starting offline materialization
job, _ = fg.insert(df, write_options={"start_offline_materialization": False})
# run the job returned by insert and wait for it to finish
execution = job.run()
# True if the job executed successfully
print(execution.success)
# download the output and error logs of the execution
out_log_path, err_log_path = execution.download_logs()
```
| PARAMETER | DESCRIPTION |
|---|---|
args | Optional runtime arguments for the job. TYPE: str |
await_termination | Whether the client should wait for the job to complete. TYPE: bool, defaults to True |
| RETURNS | DESCRIPTION |
|---|---|
Execution | The execution started by this call. |
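Because args is passed as a single value rather than a list, structured options have to be flattened before calling run. The sketch assumes that the job receives args as one whitespace-separated string; since this page does not say how quoted values are handled, it rejects values containing whitespace instead of quoting them. run_with_args and its parameters are hypothetical.

```python
from typing import Any, Optional, Protocol


class RunnableJob(Protocol):
    """Anything with a run(args, await_termination) method, such as a Hopsworks Job."""

    def run(self, args: Optional[str] = None, await_termination: bool = True) -> Any: ...


def run_with_args(job: RunnableJob, params: dict[str, str], wait: bool = True) -> Any:
    """Flatten {"--flag": "value"} pairs into one args string and run the job.

    Assumption: the job receives args as a single whitespace-separated string,
    so empty values or values containing whitespace are rejected rather than quoted.
    """
    tokens = []
    for flag, value in params.items():
        for token in (flag, value):
            if not token or any(ch.isspace() for ch in token):
                raise ValueError(f"empty or whitespace-containing token: {token!r}")
            tokens.append(token)
    # Pass None instead of an empty string when there are no arguments.
    args = " ".join(tokens) if tokens else None
    return job.run(args=args, await_termination=wait)
```

For example, run_with_args(job, {"--date": "2024-01-01"}, wait=False) would submit the job with args "--date 2024-01-01" and return without waiting for completion.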
save
save() -> Job
Save the job.
Call this method after changing a property, such as the job configuration, to persist the change.
```python
# Change the application path in the job configuration, then persist it
job.config['appPath'] = "Resources/my_app.py"
job.save()
```
| RETURNS | DESCRIPTION |
|---|---|
Job | The updated job object. |
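The change-then-save workflow can be wrapped so that a mistyped key fails loudly instead of being saved silently. The sketch assumes that every valid setting is already present as a key in job.config, an assumption that may not hold for every job type; update_job_config is a hypothetical helper.

```python
from typing import Any, Protocol


class SavableJob(Protocol):
    """Anything with a config dict and a save() method, such as a Hopsworks Job."""

    config: dict

    def save(self) -> Any: ...


def update_job_config(job: SavableJob, **changes: Any) -> Any:
    """Apply changes to job.config and persist them with job.save().

    Assumption: every valid setting already exists as a key in job.config,
    so unknown keys are treated as typos and rejected before saving.
    """
    unknown = sorted(key for key in changes if key not in job.config)
    if unknown:
        raise KeyError(f"unknown configuration keys: {unknown}")
    job.config.update(changes)
    # save() returns the updated job object, per the table above.
    return job.save()
```

With this helper, update_job_config(job, appPath="Resources/my_app.py") performs the same update as the example above, while update_job_config(job, apppath=...) would raise KeyError before anything is saved.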
schedule
schedule(
cron_expression: str,
start_time: datetime = None,
end_time: datetime = None,
) -> JobSchedule
Schedule the execution of the job.
If a schedule for this job already exists, the method updates it.
```python
import datetime

# Run the job every 5 minutes, starting now (UTC)
job.schedule(
    cron_expression="0 */5 * ? * * *",
    start_time=datetime.datetime.now(tz=datetime.timezone.utc)
)
# Retrieve the next execution time
print(job.job_schedule.next_execution_date_time)
```
| PARAMETER | DESCRIPTION |
|---|---|
cron_expression | The Quartz cron expression. TYPE: str |
start_time | The schedule start time in UTC. If … TYPE: datetime, defaults to None |
end_time | The schedule end time in UTC. If … TYPE: datetime, defaults to None |
| RETURNS | DESCRIPTION |
|---|---|
JobSchedule | The schedule of the job. |
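The cron expression in the schedule example uses Quartz's seven fields (seconds, minutes, hours, day-of-month, month, day-of-week, year), with ? in the day-of-month field because Quartz does not support specifying both day fields at once. A small builder for the "every n minutes" pattern, which reproduces the example's expression for n = 5, might look like this; every_n_minutes is a hypothetical helper.

```python
def every_n_minutes(n: int) -> str:
    """Build a Quartz cron expression that fires at second 0 of every n-th minute.

    Field order: seconds minutes hours day-of-month month day-of-week year.
    Day-of-month is '?' because Quartz requires '?' in one of the two day fields.
    """
    if not 1 <= n <= 59:
        raise ValueError("n must be between 1 and 59")
    return f"0 */{n} * ? * * *"
```

For example, job.schedule(cron_expression=every_n_minutes(15)) would request a run every 15 minutes, relying on the default start_time and end_time of None.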
JobConfiguration
Configuration of a Hopsworks job.
Each job has a config attribute, which can be used in combination with job.save() to update the job's configuration.