biomero package

biomero.slurm_client module

class biomero.slurm_client.SlurmClient(host='slurm', user=None, port=None, config=None, gateway=None, forward_agent=None, connect_timeout=None, connect_kwargs=None, inline_ssh_env=True, slurm_data_path: str = 'my-scratch/data', slurm_images_path: str = 'my-scratch/singularity_images/workflows', slurm_converters_path: str = 'my-scratch/singularity_images/converters', slurm_model_paths: Optional[dict] = None, slurm_model_repos: Optional[dict] = None, slurm_model_images: Optional[dict] = None, converter_images: Optional[dict] = None, slurm_model_jobs: Optional[dict] = None, slurm_model_jobs_params: Optional[dict] = None, slurm_script_path: str = 'slurm-scripts', slurm_script_repo: Optional[str] = None, init_slurm: bool = False, track_workflows: bool = True, enable_job_accounting: bool = True, enable_job_progress: bool = True, enable_workflow_analytics: bool = True, sqlalchemy_url: Optional[str] = None, config_only: bool = False)[source]

Bases: fabric.connection.Connection

A client for connecting to and interacting with a Slurm cluster over SSH.

This class extends the Connection class, adding methods and attributes specific to working with Slurm.

SlurmClient accepts the same arguments as Connection; only the added attributes are listed below.

The easiest way to set this client up is by using a slurm-config.ini and the from_config() method.

slurm_data_path

The path to the directory containing the data files for Slurm jobs.

Type

str

slurm_images_path

The path to the directory containing the Singularity images for Slurm jobs.

Type

str

slurm_converters_path

The path to the directory containing the Singularity images for file converters.

Type

str

slurm_model_paths

A dictionary containing the paths to the Singularity images for specific Slurm job models.

Type

dict

slurm_model_repos

A dictionary containing the git repositories of Singularity images for specific Slurm job models.

Type

dict

slurm_model_images

A dictionary containing the Docker Hub references of the container images for specific Slurm job models. It is filled automatically from the data in the git repository if you set init_slurm.

Type

dict

slurm_script_path

The path to the directory containing the Slurm job submission scripts on Slurm.

Type

str

slurm_script_repo

The git https URL for cloning the repo containing the Slurm job submission scripts. Optional.

Type

str

Example

    # Create a SlurmClient object as a context manager
    with SlurmClient.from_config() as client:
        # Run a command on the remote host
        result = client.run('sbatch myjob.sh')
        # Check whether the command succeeded
        if result.ok:
            print('Job submitted successfully!')
        # Print the output of the command
        print(result.stdout)

Example 2:

    # Create a SlurmClient and set up Slurm (download containers etc.)
    with SlurmClient.from_config(init_slurm=True) as client:
        client.run_workflow(…)

bring_listener_uptodate(listener, start=1)[source]
check_job_status(slurm_job_ids: List[int], env: Optional[Dict[str, str]] = None) Tuple[Dict[int, str], fabric.runners.Result][source]

Check the status of Slurm jobs with the given job IDs.

Note: Job arrays are not reported per task. For an array, the last value returned for its sub-IDs is used (generally the one still PENDING).

Parameters
  • slurm_job_ids (List[int]) – The job IDs of the Slurm jobs to check.

  • env (Dict[str, str], optional) – Optional environment variables to set when running the command. Defaults to None.

Returns

A tuple containing the status per input ID and the result of the command execution.

Return type

Tuple[Dict[int, str], Result]

Raises

SSHException – If the command execution fails or no response is received after multiple retries.
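
The note about job arrays can be illustrated with a small parsing sketch. It assumes the status command prints one "JobID State" pair per line and that array tasks show up as `<id>_<task>`; the helper name `parse_job_states` and the sample output are hypothetical and not part of biomero.

```python
from typing import Dict, List


def parse_job_states(status_output: str, job_ids: List[int]) -> Dict[int, str]:
    """Map each requested job ID to the last state reported for it."""
    states: Dict[int, str] = {}
    for line in status_output.splitlines():
        parts = line.split()
        if len(parts) < 2:
            continue  # skip blank or malformed lines
        raw_id, state = parts[0], parts[1]
        # Array tasks look like "1234_1" or "1234_[2-3]"; keep the parent ID.
        base_id = raw_id.split("_", 1)[0]
        if base_id.isdigit() and int(base_id) in job_ids:
            # Later lines overwrite earlier ones, so the last value wins.
            states[int(base_id)] = state
    return states


# Hypothetical output: one finished array task, two pending ones, one plain job.
sample = """\
1234_1 COMPLETED
1234_[2-3] PENDING
5678 RUNNING
"""
print(parse_job_states(sample, [1234, 5678]))
# {1234: 'PENDING', 5678: 'RUNNING'}
```

With this approach an array is only reported as finished once its last listed entry is finished, which matches the behavior described in the note.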

cleanup_tmp_files(slurm_job_id: str, filename: Optional[str] = None, data_location: Optional[str] = None, logfile: Optional[str] = None) fabric.runners.Result[source]

Cleanup zip and unzipped files/folders associated with a Slurm job.

Parameters
  • slurm_job_id (str) – The job ID of the Slurm script.

  • filename (str) – The zip filename on Slurm.

  • data_location (str, optional) – The location of data files on Slurm. If not provided, it will be extracted from the log file.

  • logfile (str, optional) – The log file of the Slurm job. If not provided, a default log file will be used.

Returns

The result of the cleanup operation.

Return type

Result

Note

The cleanup process involves removing the specified zip file, the log file, and associated data files and folders.

Example

    # Clean up temporary files for a Slurm job
    client.cleanup_tmp_files("12345", "output.zip")

convert_cytype_to_omtype(cytype: str, _default, *args, **kwargs) Any[source]

Convert a Cytomine type to an OMERO type and instantiate it with args/kwargs.

Note that Cytomine has a Python client with some conversion methods to Python types, but nothing that currently justifies depending on that library. It might be useful in the future (e.g. https://github.com/Cytomine-ULiege/Cytomine-python-client/blob/master/cytomine/cytomine_job.py).

Parameters
  • cytype (str) – The Cytomine type to convert.

  • _default – The default value. Required to distinguish between float and int.

  • *args – Additional positional arguments.

  • **kwargs – Additional keyword arguments.

Returns

The converted OMERO type class instance, or None if errors occurred.

Return type

Any

convert_url(input_url: str) str[source]

Convert the input GitHub URL to an output URL that retrieves the ‘descriptor.json’ file in raw format.

Parameters

input_url (str) – The input GitHub URL.

Returns

The output URL to the ‘descriptor.json’ file.

Return type

str

Raises

ValueError – If the input URL is not a valid GitHub URL.
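
As a rough illustration of such a conversion, the sketch below rewrites a repository URL into a raw.githubusercontent.com URL. It is not biomero's implementation: the helper name `to_raw_descriptor_url`, the example repository, and the assumption that a branch or tag appears as `/tree/<branch>` without slashes are all hypothetical. The "master" fallback mirrors the default documented for extract_parts_from_url.

```python
from urllib.parse import urlparse


def to_raw_descriptor_url(input_url: str, default_branch: str = "master") -> str:
    """Build the raw URL of descriptor.json for a GitHub repository URL."""
    parsed = urlparse(input_url)
    parts = [p for p in parsed.path.split("/") if p]
    if parsed.netloc != "github.com" or len(parts) < 2:
        raise ValueError(f"Not a valid GitHub URL: {input_url}")
    owner, repo = parts[0], parts[1]
    # ".../tree/<branch>" selects a branch or tag; otherwise use the default.
    if len(parts) >= 4 and parts[2] == "tree":
        branch = parts[3]
    else:
        branch = default_branch
    return f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/descriptor.json"


# Hypothetical repository, used only for illustration.
print(to_raw_descriptor_url("https://github.com/example-org/example-workflow/tree/v1.0.0"))
# https://raw.githubusercontent.com/example-org/example-workflow/v1.0.0/descriptor.json
```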

copy_zip_locally(local_tmp_storage: str, filename: str) fabric.transfer.Result[source]

Copy a zip file from Slurm to the local server.

Note about (Transfer)Result:

Unlike similar classes such as invoke.runners.Result or fabric.runners.Result (which have a concept of “warn and return anyways on failure”) this class has no useful truthiness behavior. If a file transfer fails, some exception will be raised, either an OSError or an error from within Paramiko.

Parameters
  • local_tmp_storage (str) – Path to store the zip file locally.

  • filename (str) – Zip filename on Slurm.

Returns

The result of the scp attempt.

Return type

TransferResult

extract_data_location_from_log(slurm_job_id: Optional[str] = None, logfile: Optional[str] = None) str[source]

Read SLURM job logfile to find location of the data.

One of the parameters is required, either id or file.

Parameters
  • slurm_job_id (str) – Id of the slurm job

  • logfile (str) – Path to the logfile

Returns

Data location according to the log

Return type

str

Raises

SSHException – If there is an issue with the command execution.

extract_job_id(result: fabric.runners.Result) int[source]

Extract the Slurm job ID from the result of a command.

Parameters

result (Result) – The result of a command execution.

Returns

The Slurm job ID extracted from the result, or -1 if not found.

Return type

int
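
A minimal sketch of this kind of extraction, assuming the command was `sbatch`, which reports a successful submission as "Submitted batch job <id>". The helper name `job_id_from_stdout` is hypothetical, and it takes the raw stdout string rather than a Result so that it runs without an SSH connection.

```python
import re


def job_id_from_stdout(stdout: str) -> int:
    """Return the job ID printed by sbatch, or -1 if none is found."""
    match = re.search(r"Submitted batch job (\d+)", stdout)
    return int(match.group(1)) if match else -1


print(job_id_from_stdout("Submitted batch job 12345\n"))  # 12345
print(job_id_from_stdout("sbatch: error: invalid partition\n"))  # -1
```

Returning -1 instead of raising lets the caller decide how to handle a failed submission, in line with the return value documented above.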

extract_parts_from_url(input_url: str) Tuple[List[str], str][source]

Extract the repository and branch information from the input URL.

Parameters

input_url (str) – The input GitHub URL.

Returns

The list of url parts and the branch/version. If no branch is found, it will return “master”

Return type

Tuple[List[str], str]

Raises

ValueError – If the input URL is not a valid GitHub URL.

classmethod from_config(configfile: str = '', init_slurm: bool = False, config_only: bool = False) biomero.slurm_client.SlurmClient[source]

Creates a new SlurmClient object using the parameters read from a configuration file (.ini).

Default paths to look for config files are:
  • /etc/slurm-config.ini

  • ~/slurm-config.ini

Note that this only covers the Slurm-specific values that we added. Most configuration values are set via the configuration mechanisms of the Fabric library, such as SSH settings loaded from the SSH config, /etc/fabric.yml, or environment variables. See Fabric’s documentation for more information on configuration if needed.

Parameters
  • configfile (str) – The path to your configuration file. Optional.

  • init_slurm (bool) – Initiate / validate the Slurm setup. Optional. Might take some time the first time, due to downloading etc.

Returns

A new SlurmClient object.

Return type

SlurmClient
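
The lookup of default and explicit config files can be sketched with the standard library's configparser. This is an assumption-laden illustration, not biomero's code: the `[SLURM]` section name, the helper `read_slurm_section`, and the rule that an explicitly given file overrides the default locations are all hypothetical here.

```python
import configparser
import os
import tempfile
from typing import Dict, Iterable

DEFAULT_PATHS = ("/etc/slurm-config.ini", os.path.expanduser("~/slurm-config.ini"))


def read_slurm_section(configfile: str = "",
                       search_paths: Iterable[str] = DEFAULT_PATHS) -> Dict[str, str]:
    """Read the (assumed) [SLURM] section from the given ini files."""
    parser = configparser.ConfigParser()
    paths = list(search_paths)
    if configfile:
        paths.append(configfile)
    # read() silently skips missing files; later files override earlier ones.
    parser.read(paths)
    if not parser.has_section("SLURM"):
        return {}
    return dict(parser["SLURM"])


# Demonstrate with a temporary file so the example does not touch /etc or ~.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "slurm-config.ini")
    with open(path, "w") as handle:
        handle.write("[SLURM]\nslurm_data_path = my-scratch/data\n")
    settings = read_slurm_section(path, search_paths=[])

print(settings)
# {'slurm_data_path': 'my-scratch/data'}
```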

generate_slurm_job_for_workflow(workflow: str, substitutes: Dict[str, str], template: str = 'job_template.sh') str[source]

Generate a Slurm job script for a specific workflow.

Parameters
  • workflow (str) – The name of the workflow.

  • substitutes (Dict[str, str]) – A dictionary containing key-value pairs for substituting placeholders in the job template.

  • template (str, optional) – The filename of the job template. Defaults to “job_template.sh”.

Returns

The generated Slurm job script as a string.

Return type

str
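
Placeholder substitution of this kind can be sketched with string.Template. The template text, the placeholder names `$jobname` and `$time`, and the helper `render_job_script` are hypothetical; the actual contents of job_template.sh are not shown in this reference.

```python
from string import Template
from typing import Dict

# Hypothetical template; the real job_template.sh may look different.
EXAMPLE_TEMPLATE = """#!/bin/bash
#SBATCH --job-name=$jobname
#SBATCH --time=$time
echo "Running $jobname as $USER"
"""


def render_job_script(template_text: str, substitutes: Dict[str, str]) -> str:
    """Fill in known placeholders and leave unknown ones untouched."""
    # safe_substitute keeps shell variables such as $USER intact
    # instead of raising KeyError for them.
    return Template(template_text).safe_substitute(substitutes)


script = render_job_script(
    EXAMPLE_TEMPLATE, {"jobname": "example-workflow", "time": "00:45:00"})
print(script)
```

Using safe_substitute rather than substitute is a design choice for shell scripts, where `$` also introduces variables that should survive into the generated file.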

get_active_job_progress(slurm_job_id: str, pattern: str = '\\d+%', env: Optional[Dict[str, str]] = None) Any[source]

Get the progress of an active Slurm job from its logfiles.

Parameters
  • slurm_job_id (str) – The ID of the Slurm job.

  • pattern (str) – The pattern to match in the job log to extract the progress (default: r"\d+%").

  • env (Dict[str, str], optional) – Optional environment variables to set when running the command. Defaults to None.

Returns

The progress of the Slurm job according to the pattern, or None.

Return type

Any
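
To show what the default pattern matches, here is a small sketch that applies it to log text. The helper `latest_progress` and the sample log are hypothetical, and picking the last match as the current progress is an assumption about how the result is chosen.

```python
import re
from typing import Optional


def latest_progress(log_text: str, pattern: str = r"\d+%") -> Optional[str]:
    """Return the last substring matching the pattern, or None."""
    matches = re.findall(pattern, log_text)
    return matches[-1] if matches else None


# Hypothetical log lines for illustration.
log = "Processing tiles: 10%\nProcessing tiles: 45%\n"
print(latest_progress(log))  # 45%
print(latest_progress("no progress reported yet"))  # None
```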

get_all_image_versions_and_data_files() Tuple[Dict[str, List[str]], List[str]][source]

Retrieve all available image versions and data files from the Slurm cluster.

Returns

A tuple containing:
  • A dictionary mapping models to available versions.

  • A list of available input data folders.

Return type

Tuple[Dict[str, List[str]], List[str]]

get_cellpose_command(image_version: str, input_data: str, cp_model: str, nuc_channel: int, prob_threshold: float, cell_diameter: float, email: Optional[str] = None, time: Optional[str] = None, use_gpu: bool = True, model: str = 'cellpose') Tuple[str, Dict][source]

Return the command and environment dictionary to run a CellPose job on the Slurm workload manager. A specific example of using the generic ‘get_workflow_command’.

Parameters
  • image_version (str) – The version of the Singularity image to use.

  • input_data (str) – The name of the input data folder on the shared file system.

  • cp_model (str) – The name of the CellPose model to use.

  • nuc_channel (int) – The index of the nuclear channel.

  • prob_threshold (float) – The probability threshold for nuclei detection.

  • cell_diameter (float) – The expected cell diameter in pixels.

  • email (Optional[str]) – The email address to send notifications to. Defaults to None.

  • time (Optional[str]) – The maximum time for the job to run. Defaults to None.

  • use_gpu (bool) – Whether to use GPU for the CellPose job. Defaults to True.

  • model (str) – The name of the folder of the Docker image to use. Defaults to “cellpose”.

Returns

A tuple containing the Slurm sbatch command and the environment dictionary.

Return type

Tuple[str, dict]

get_conversion_command(data_path: str, config_file: str, source_format: str = 'zarr', target_format: str = 'tiff') Tuple[str, Dict, str, str][source]

Generate Slurm conversion command and environment variables for data conversion.

Parameters
  • data_path (str) – Path to the data folder.

  • config_file (str) – Path to the configuration file.

  • source_format (str) – Source data format (default is ‘zarr’).

  • target_format (str) – Target data format (default is ‘tiff’).

Returns

A tuple containing the Slurm conversion command and the environment variables, followed by the converter image name and version.

Return type

Tuple[str, Dict, str, str]

Warning

The default implementation only supports conversion from ‘zarr’ to ‘tiff’. If using other source or target formats, users must implement and configure additional converters themselves.

get_image_versions_and_data_files(model: str) Tuple[List[str], List[str]][source]

Retrieve the available image versions and input data files for a given model.

Parameters

model (str) – The name of the model to query for.

Returns

A tuple containing two lists:
  • The first list includes the available image versions, sorted in descending order.

  • The second list includes the available data files.

Return type

Tuple[List[str], List[str]]

Raises

ValueError – If the provided model is not found in the SlurmClient’s known model paths.

get_job_status_command(slurm_job_ids: List[int]) str[source]

Return the Slurm command to get the status of jobs with the given job IDs.

Parameters

slurm_job_ids (List[int]) – The job IDs of the jobs to check.

Returns

The Slurm command to get the status of the jobs.

Return type

str

get_jobs_info_command(start_time: str = '2023-01-01', end_time: str = 'now', columns: str = 'JobId', states: str = 'r,cd,f,to,rs,dl,nf') str[source]

Return the Slurm command to retrieve information about old jobs.

The command will be formatted with the specified start time, which is expected to be in the ISO format “YYYY-MM-DD”. The command will use the “sacct” tool to query the Slurm accounting database for jobs that started on or after the specified start time, and will output only the job IDs (-o JobId) without header or trailer lines (-n -X).

Parameters
  • start_time (str) – The start time from which to retrieve job information. Defaults to “2023-01-01”.

  • end_time (str) – The end time until which to retrieve job information. Defaults to “now”.

  • columns (str) – The columns to retrieve from the job information. Defaults to “JobId”. It is comma separated, e.g. “JobId,State”.

  • states (str) – The job states to include in the query. Defaults to “r,cd,f,to,rs,dl,nf”.

Returns

A string representing the Slurm command to retrieve information about old jobs.

Return type

str
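
A sketch of how such a command string could be assembled from the parameters. The flags used (`--starttime`, `--endtime`, `--state`, `-o`, `-n`, `-X`) are standard sacct options, but the exact string biomero produces may differ; the helper name `build_sacct_command` is hypothetical.

```python
def build_sacct_command(start_time: str = "2023-01-01",
                        end_time: str = "now",
                        columns: str = "JobId",
                        states: str = "r,cd,f,to,rs,dl,nf") -> str:
    """Assemble a sacct query for jobs in the given time window and states."""
    return (
        f"sacct --starttime {start_time} --endtime {end_time} "
        f"--state {states} -o {columns} -n -X"
    )


print(build_sacct_command())
# sacct --starttime 2023-01-01 --endtime now --state r,cd,f,to,rs,dl,nf -o JobId -n -X
print(build_sacct_command(columns="JobId,State", states="cd"))
# sacct --starttime 2023-01-01 --endtime now --state cd -o JobId,State -n -X
```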

get_listeners(runner)[source]
get_logfile_from_slurm(slurm_job_id: str, local_tmp_storage: str = '/tmp/', logfile: Optional[str] = None) Tuple[str, str, fabric.transfer.Result][source]

Copy the logfile of the given SLURM job to the local server.

Note about (Transfer)Result:

Unlike similar classes such as invoke.runners.Result or fabric.runners.Result (which have a concept of “warn and return anyways on failure”) this class has no useful truthiness behavior. If a file transfer fails, some exception will be raised, either an OSError or an error from within Paramiko.

Parameters
  • slurm_job_id (str) – The ID of the SLURM job.

  • local_tmp_storage (str, optional) – Path to store the logfile locally. Defaults to “/tmp/”.

  • logfile (str, optional) – Path to the logfile on the SLURM server. Defaults to None.

Returns

The local directory, the full path of the logfile, and the TransferResult.

Return type

Tuple

get_or_create_github_session()[source]
get_recent_log_command(log_file: str, n: int = 10) str[source]

Get the command to retrieve the recent log entries from a specified log file.

Parameters
  • log_file (str) – The path to the log file.

  • n (int, optional) – The number of recent log entries to retrieve. Defaults to 10.

Returns

The command to retrieve the recent log entries.

Return type

str

get_unzip_command(zipfile: str, filter_filetypes: str = '*.zarr *.tiff *.tif') str[source]

Generate a command string for unzipping a data archive and creating required directories for Slurm jobs.

Parameters
  • zipfile (str) – The name of the zip archive file to extract. Without extension.

  • filter_filetypes (str, optional) – A space-separated string containing the file patterns to extract from the zip file. Defaults to “*.zarr *.tiff *.tif”. Setting this argument to None will omit the file filter and extract all files.

Returns

The command to extract the specified filetypes from the zip file.

Return type

str

get_update_slurm_scripts_command() str[source]

Generate the command to update the Git repository containing the Slurm scripts, if necessary.

Returns

A string containing the Git command to update the Slurm scripts.

Return type

str

get_workflow_command(workflow: str, workflow_version: str, input_data: str, email: Optional[str] = None, time: Optional[str] = None, **kwargs) Tuple[str, Dict][source]

Generate the Slurm workflow command and environment variables.

Parameters
  • workflow (str) – The name of the workflow.

  • workflow_version (str) – The version of the workflow.

  • input_data (str) – The name of the input data folder containing the input image files.

  • email (str, optional) – The email address for job notifications. Defaults to None, which defaults to what is in the job script.

  • time (str, optional) – The time limit for the job in the format HH:MM:SS. Defaults to None, which defaults to what is in the job script.

  • **kwargs – Additional keyword arguments for the workflow.

Returns

A tuple containing the Slurm workflow command and the environment variables.

Return type

Tuple[str, Dict]

get_workflow_parameters(workflow: str) Dict[str, Dict[str, Any]][source]

Retrieve the parameters of a workflow.

Parameters

workflow (str) – The workflow for which to retrieve the parameters.

Returns

A dictionary containing the workflow parameters.

Return type

Dict[str, Dict[str, Any]]

Raises

ValueError – If an error occurs while retrieving the workflow parameters.

get_zip_command(data_location: str, filename: str) str[source]

Generate a command string for zipping the data on Slurm.

Parameters
  • data_location (str) – The folder to be zipped.

  • filename (str) – The name of the zip archive file to create. Without extension.

Returns

The command to create the zip file.

Return type

str

init_workflows(force_update: bool = False)[source]

Retrieves the required info for the configured workflows from github. It will fill slurm_model_images with dockerhub links.

Parameters

force_update (bool) – Will overwrite already given paths in slurm_model_images

initialize_analytics_system(reset_tables=False)[source]

Initialize the analytics system based on the analytics configuration passed to the constructor.

Parameters

reset_tables (bool) – If True, drops and recreates all views.

list_active_jobs(env: Optional[Dict[str, str]] = None) List[str][source]

Get a list of active jobs from SLURM.

Parameters

env (Dict[str, str], optional) – Optional environment variables to set when running the command. Defaults to None.

Returns

A list of job IDs.

Return type

List[str]

list_all_jobs(env: Optional[Dict[str, str]] = None) List[str][source]

Get a list of all jobs from SLURM.

Parameters

env (Dict[str, str], optional) – Optional environment variables to set when running the command. Defaults to None.

Returns

A list of job IDs.

Return type

List[str]

list_available_converter_versions() Dict[source]

Return a dict with a version of each converter available on your Slurm cluster. This assumes you use versioned converters; it does not work properly with unversioned sif files.

list_completed_jobs(env: Optional[Dict[str, str]] = None) List[str][source]

Get a list of completed jobs from SLURM.

Parameters

env (Dict[str, str], optional) – Optional environment variables to set when running the command. Defaults to None.

Returns

A list of job IDs.

Return type

List[str]

parse_docker_image_version(image: str) Tuple[str, str][source]

Parses the Docker image string to extract the image name and version tag.

Parameters

image (str) – The Docker image string in the format ‘image_name:version’.

Returns

A tuple containing:
  • version (str or None): The version tag if present, otherwise None.

  • image_name (str): The image name if matched, otherwise the original image string.

Return type

tuple

pull_descriptor_from_github(workflow: str) Dict[source]

Pull the workflow descriptor from GitHub.

Parameters

workflow (str) – The workflow for which to pull the descriptor.

Returns

The JSON descriptor.

Return type

Dict

Raises

ValueError – If an error occurs while pulling the descriptor file.

run_commands(cmdlist: List[str], env: Optional[Dict[str, str]] = None, sep: str = ' && ', **kwargs) fabric.runners.Result[source]

Run a list of shell commands consecutively on the Slurm cluster, ensuring the success of each before proceeding to the next.

The environment variables can be set using the env argument. These commands retain the same session (environment variables etc.), unlike running them separately.

Parameters
  • cmdlist (List[str]) – A list of shell commands to run on Slurm.

  • env (Dict[str, str], optional) – Optional environment variables to set when running the command. Defaults to None.

  • sep (str, optional) – The separator used to concatenate the commands. Defaults to ‘ && ‘.

  • **kwargs – Additional keyword arguments.

Returns

The result of the last command in the list.

Return type

Result
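
The effect of the default separator can be seen by joining the commands into one shell line: `&&` runs the next command only if the previous one exited successfully, and because everything runs in one shell, a `cd` or `export` in an earlier command still applies to later ones. The helper `join_commands` below is a hypothetical stand-in that only builds the string and does not run anything.

```python
from typing import List


def join_commands(cmdlist: List[str], sep: str = " && ") -> str:
    """Combine several shell commands into one line."""
    return sep.join(cmdlist)


print(join_commands(["cd my-scratch/data", "ls"]))
# cd my-scratch/data && ls
print(join_commands(["echo one", "echo two"], sep=" ; "))
# echo one ; echo two
```

Passing a separator such as " ; " instead would run every command regardless of whether the earlier ones failed.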

run_commands_split_out(cmdlist: List[str], env: Optional[Dict[str, str]] = None) List[str][source]

Run a list of shell commands consecutively and split the output of each command.

Each command in the list is executed with a separator in between that is unique and can be used to split the output of each command later. The separator used is specified by the _OUT_SEP attribute of the SlurmClient instance.

Parameters
  • cmdlist (List[str]) – A list of shell commands to run.

  • env (Dict[str, str], optional) – Optional environment variables to set when running the command. Defaults to None.

Returns

A list of strings, where each string corresponds to the output of a single command in cmdlist split by the separator _OUT_SEP.

Return type

List[str]

Raises

SSHException – If any of the commands fail to execute successfully.
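
The separator technique can be sketched as follows: echo a unique marker between the commands, then split the combined stdout on that marker. The marker value, the `;`-based joining, and both helper names are assumptions for illustration; the real value of _OUT_SEP and the exact joining logic are not shown in this reference.

```python
from typing import List

OUT_SEP = "--split--"  # hypothetical marker, not necessarily biomero's _OUT_SEP


def build_split_command(cmdlist: List[str], sep: str = OUT_SEP) -> str:
    """Join commands so that the marker is echoed between their outputs."""
    return f" ; echo {sep} ; ".join(cmdlist)


def split_output(stdout: str, sep: str = OUT_SEP) -> List[str]:
    """Split combined stdout into one chunk per command."""
    return [chunk.strip() for chunk in stdout.split(sep)]


print(build_split_command(["hostname", "whoami"]))
# hostname ; echo --split-- ; whoami

# Simulated stdout of the combined command.
print(split_output("node01\n--split--\nalice\n"))
# ['node01', 'alice']
```

The marker has to be a string that cannot plausibly occur in any command's own output, otherwise the chunks no longer line up with cmdlist.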

run_conversion_workflow_job(folder_name: str, source_format: str = 'zarr', target_format: str = 'tiff', wf_id: Optional[uuid.UUID] = None) biomero.slurm_client.SlurmJob[source]

Run the data conversion workflow on Slurm using the given data folder.

Parameters
  • folder_name (str) – The name of the data folder containing source format files.

  • source_format (str) – Source data format for conversion (default is ‘zarr’).

  • target_format (str) – Target data format after conversion (default is ‘tiff’).

Returns

The conversion job.

Return type

SlurmJob

Warning

The default implementation only supports conversion from ‘zarr’ to ‘tiff’. If using other source or target formats, users must implement and configure additional converters themselves.

run_workflow(workflow_name: str, workflow_version: str, input_data: str, email: Optional[str] = None, time: Optional[str] = None, wf_id: Optional[uuid.UUID] = None, **kwargs) Tuple[fabric.runners.Result, int, uuid.UUID, uuid.UUID][source]

Run a specified workflow on Slurm using the given parameters.

Parameters
  • workflow_name (str) – Name of the workflow to execute.

  • workflow_version (str) – Version of the workflow (image version on Slurm).

  • input_data (str) – Name of the input data folder containing input image files.

  • email (str, optional) – Email address for Slurm job notifications.

  • time (str, optional) – Time limit for the Slurm job in the format HH:MM:SS.

  • wf_id (UUID, optional) – Workflow ID for tracking purposes. If not provided, a new one is created.

  • **kwargs – Additional keyword arguments for the workflow.

Returns

A tuple containing the result of starting the workflow job, the Slurm job ID, the workflow ID, and the task ID. If the Slurm job ID could not be extracted, it returns -1 for the job ID.

Return type

Tuple[Result, int, UUID, UUID]

Note

The Slurm job ID is extracted from the result of the run_commands method. If track_workflows is enabled, workflow and task tracking is performed.

run_workflow_job(workflow_name: str, workflow_version: str, input_data: str, email: Optional[str] = None, time: Optional[str] = None, wf_id: Optional[uuid.UUID] = None, **kwargs) biomero.slurm_client.SlurmJob[source]

Run a specified workflow on Slurm using the given parameters and return a SlurmJob instance.

Parameters
  • workflow_name (str) – Name of the workflow to execute.

  • workflow_version (str) – Version of the workflow (image version on Slurm).

  • input_data (str) – Name of the input data folder containing input image files.

  • email (str, optional) – Email address for Slurm job notifications.

  • time (str, optional) – Time limit for the Slurm job in the format HH:MM:SS.

  • wf_id (UUID, optional) – Workflow ID for tracking purposes. If not provided, a new one is created.

  • **kwargs – Additional keyword arguments for the workflow.

Returns

A SlurmJob instance representing the started workflow job.

Return type

SlurmJob

setup_container_images()[source]

Sets up container images for Slurm operations.

This function creates specific directories for container images and pulls necessary images from Docker repositories. It generates and executes a script to pull images and copies it to the remote location.

Raises

SSHException – If there is an issue executing commands or copying files.

setup_converters()[source]

Sets up converters for Slurm operations.

This function creates necessary directories for converters and copies converter scripts and definitions to the appropriate locations. It also builds Singularity containers from the provided definitions.

Raises

SSHException – If there is an issue executing commands or copying files.

setup_directories()[source]

Creates necessary directories for Slurm operations.

This function creates directories for data storage, scripts, and workflows as specified in the SlurmClient object.

Raises

SSHException – If there is an issue executing directory creation commands.

setup_job_scripts()[source]

Sets up job scripts for Slurm operations.

This function either clones a Git repository containing job scripts into the specified script path or generates scripts locally if no repository is provided.

Raises

SSHException – If there is an issue executing Git commands or generating scripts.

setup_listeners(runner, reset_tables)[source]
setup_slurm()[source]

Validates or creates the required setup on the Slurm cluster.

Raises

SSHException – If it cannot connect to Slurm or runs into an error.

str_to_class(module_name: str, class_name: str, *args, **kwargs)[source]

Return a class instance from a string reference.

Parameters
  • module_name (str) – The name of the module.

  • class_name (str) – The name of the class.

  • *args – Additional positional arguments for the class constructor.

  • **kwargs – Additional keyword arguments for the class constructor.

Returns

An instance of the specified class, or None if the class or module does not exist.

Return type

object
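
Instantiating a class from string names is commonly done with importlib and getattr. The sketch below shows that general technique under the documented contract (None when the module or class is missing); the helper name `instance_from_names` is hypothetical and this is not necessarily how biomero implements it.

```python
import importlib
from typing import Any, Optional


def instance_from_names(module_name: str, class_name: str,
                        *args, **kwargs) -> Optional[Any]:
    """Import module_name, look up class_name, and instantiate it."""
    try:
        module = importlib.import_module(module_name)
        cls = getattr(module, class_name)
    except (ImportError, AttributeError):
        # Missing module or missing class: signal this with None.
        return None
    return cls(*args, **kwargs)


print(instance_from_names("fractions", "Fraction", 1, 2))  # 1/2
print(instance_from_names("fractions", "NoSuchClass"))  # None
print(instance_from_names("no_such_module_xyz", "Anything"))  # None
```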

transfer_data(local_path: str) fabric.runners.Result[source]

Transfers a file or directory from the local machine to the remote Slurm cluster.

Parameters

local_path (str) – The local path to the file or directory to transfer.

Returns

The result of the file transfer operation.

Return type

Result

unpack_data(zipfile: str, env: Optional[Dict[str, str]] = None) fabric.runners.Result[source]

Unpacks a zipped file on the remote Slurm cluster.

Parameters
  • zipfile (str) – The name of the zipped file to be unpacked.

  • env (Dict[str, str], optional) – Optional environment variables to set when running the command. Defaults to None.

Returns

The result of the command.

Return type

Result

update_slurm_scripts(generate_jobs: bool = False, env: Optional[Dict[str, str]] = None) fabric.runners.Result[source]

Updates the local copy of the Slurm job submission scripts.

This function pulls the latest version of the scripts from the Git repository and copies them to the slurm_script_path directory. Alternatively, it can generate scripts from a template. Generation is the default behavior if no Git repo is provided, and it can be forced via the generate_jobs parameter.

Parameters
  • generate_jobs (bool) – Whether to generate new slurm job scripts INSTEAD (of pulling from git). Defaults to False, except if no slurm_script_repo is configured.

  • env (Dict[str, str], optional) – Optional environment variables to set when running the command. Defaults to None.

Returns

The result of the command.

Return type

Result

validate(validate_slurm_setup: bool = False)[source]

Validate the connection to the Slurm cluster by running a simple command.

Parameters

validate_slurm_setup (bool) – Whether to also check and fix the Slurm setup (folders, images, etc.)

Returns

True if the validation is successful, False otherwise.

Return type

bool

workflow_params_to_envvars(**kwargs) Dict[source]

Convert workflow parameters to environment variables.

Parameters

**kwargs – Additional keyword arguments for the workflow.

Returns

A dictionary containing the environment variables.

Return type

Dict
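
One plausible shape for such a conversion is sketched below: each parameter name becomes an upper-case variable name and each value becomes a string, since environment variables can only hold strings. The upper-casing convention and the helper name `params_to_envvars` are assumptions for illustration.

```python
from typing import Dict


def params_to_envvars(**kwargs) -> Dict[str, str]:
    """Turn keyword arguments into an environment-variable dictionary."""
    return {key.upper(): str(value) for key, value in kwargs.items()}


print(params_to_envvars(cell_diameter=30.5, use_gpu=True))
# {'CELL_DIAMETER': '30.5', 'USE_GPU': 'True'}
```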

workflow_params_to_subs(params) Dict[str, str][source]

Convert workflow parameters to substitution dictionary for job script.

Parameters

params – A dictionary containing workflow parameters.

Returns

A dictionary with parameter names as keys and corresponding flags with placeholders as values.

Return type

Dict[str, str]

zip_data_on_slurm_server(data_location: str, filename: str, env: Optional[Dict[str, str]] = None) fabric.runners.Result[source]

Zip the output folder of a job on Slurm.

Parameters
  • data_location (str) – Folder on Slurm with the “data/out” subfolder.

  • filename (str) – Name to give to the zip file.

  • env (Dict[str, str], optional) – Optional environment variables to set when running the command. Defaults to None.

Returns

The result of the zip attempt.

Return type

Result

class biomero.slurm_client.SlurmJob(submit_result: fabric.runners.Result, job_id: int, wf_id: uuid.UUID, task_id: uuid.UUID, slurm_polling_interval: int = 10)[source]

Bases: object

Represents a job submitted to a Slurm cluster.

This class encapsulates information and methods related to managing a job submitted to a Slurm cluster. It provides functionality to monitor the job’s state, wait for completion, and perform cleanup operations.

job_id

The ID of the Slurm job.

Type

int

submit_result

The result of submitting the job.

Type

Result

ok

Indicates whether the job submission was successful.

Type

bool

job_state

The current state of the Slurm job.

Type

str

progress

The progress of the Slurm job.

Type

str

error_message

The error message, if any, encountered during job submission.

Type

str

wf_id

The workflow ID associated with the job.

Type

UUID

task_id

The task ID within the workflow.

Type

UUID

slurm_polling_interval

The polling interval (in seconds) for checking the job status.

Type

int

Example

    # Submit some job with the SlurmClient
    submit_result, job_id, wf_id, task_id = slurmClient.run_workflow(
        workflow_name, workflow_version, input_data, email, time, wf_id,
        **kwargs)

    # Create a SlurmJob instance
    slurmJob = SlurmJob(submit_result, job_id, wf_id, task_id)

    if not slurmJob.ok:
        logger.warning(f"Error with job: {slurmJob.get_error()}")
    else:
        try:
            slurmJob.wait_for_completion(slurmClient, conn)
            if not slurmJob.completed():
                raise Exception(f"Job is not completed: {slurmJob}")
            else:
                slurmJob.cleanup(slurmClient)
        except Exception as e:
            logger.error(f" ERROR WITH JOB: {e}")
            raise e

SLURM_POLLING_INTERVAL = 10
cleanup(slurmClient) fabric.runners.Result[source]

Cleanup remaining log files.

Parameters

slurmClient – The Slurm client.

Returns

The result of the cleanup operation.

Return type

Result

completed()[source]

Check if the Slurm job has completed successfully.

Returns

True if the job has completed; False otherwise.

Return type

bool

get_error() str[source]

Get the error message associated with the Slurm job submission.

Returns

The error message, or an empty string if no error occurred.

Return type

str

wait_for_completion(slurmClient, omeroConn) str[source]

Wait for the Slurm job to reach completion, cancellation, failure, or timeout.

Parameters
  • slurmClient – The Slurm client.

  • omeroConn – The OMERO connection.

Returns

The final state of the Slurm job.

Return type

str
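
The waiting behavior amounts to a polling loop that stops at a terminal state. The sketch below shows that pattern with a stand-in status function so it runs without a cluster. The helper `wait_for_terminal_state`, the `max_polls` safeguard, and the prefix matching (Slurm can report states such as "CANCELLED by <uid>") are assumptions, not biomero's implementation.

```python
import time
from typing import Callable

# Slurm job states treated as final in this sketch.
TERMINAL_PREFIXES = ("COMPLETED", "CANCELLED", "FAILED", "TIMEOUT")


def wait_for_terminal_state(get_state: Callable[[], str],
                            interval: float = 10,
                            max_polls: int = 1000) -> str:
    """Poll get_state until it reports a terminal state, then return it."""
    state = get_state()
    polls = 1
    while not state.startswith(TERMINAL_PREFIXES) and polls < max_polls:
        time.sleep(interval)  # wait before asking the scheduler again
        state = get_state()
        polls += 1
    return state


# Stand-in for a real status query: yields a fixed sequence of states.
fake_states = iter(["PENDING", "RUNNING", "COMPLETED"])
final_state = wait_for_terminal_state(lambda: next(fake_states), interval=0)
print(final_state)  # COMPLETED
```

The default interval of 10 seconds mirrors SLURM_POLLING_INTERVAL above; a longer interval reduces load on the scheduler at the cost of noticing completion later.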