biomero package
biomero.slurm_client module
- class biomero.slurm_client.SlurmClient(host='slurm', user=None, port=None, config=None, gateway=None, forward_agent=None, connect_timeout=None, connect_kwargs=None, inline_ssh_env=True, slurm_data_path: str = 'my-scratch/data', slurm_images_path: str = 'my-scratch/singularity_images/workflows', slurm_converters_path: str = 'my-scratch/singularity_images/converters', slurm_model_paths: Optional[dict] = None, slurm_model_repos: Optional[dict] = None, slurm_model_images: Optional[dict] = None, converter_images: Optional[dict] = None, slurm_model_jobs: Optional[dict] = None, slurm_model_jobs_params: Optional[dict] = None, slurm_script_path: str = 'slurm-scripts', slurm_script_repo: Optional[str] = None, init_slurm: bool = False, track_workflows: bool = True, enable_job_accounting: bool = True, enable_job_progress: bool = True, enable_workflow_analytics: bool = True, sqlalchemy_url: Optional[str] = None, config_only: bool = False)[source]
Bases:
fabric.connection.Connection
A client for connecting to and interacting with a Slurm cluster over SSH.
This class extends the Connection class, adding methods and attributes specific to working with Slurm.
SlurmClient accepts the same arguments as Connection; only the additional attributes are described below.
The easiest way to set this client up is by using a slurm-config.ini and the from_config() method.
- slurm_images_path
The path to the directory containing the Singularity images for Slurm jobs.
- Type
str
- slurm_converters_path
The path to the directory containing the Singularity images for file converters.
- Type
str
- slurm_model_paths
A dictionary containing the paths to the Singularity images for specific Slurm job models.
- Type
dict
- slurm_model_repos
A dictionary containing the Git repositories of the Singularity images for specific Slurm job models.
- Type
dict
- slurm_model_images
A dictionary containing the Docker Hub image references for the Singularity images of specific Slurm job models. It is filled automatically from the data in the Git repository if init_slurm is set.
- Type
dict
- slurm_script_path
The path to the directory on Slurm containing the Slurm job submission scripts.
- Type
str
- slurm_script_repo
The Git HTTPS URL of the repository containing the Slurm job submission scripts. Optional.
- Type
str
Example
# Create a SlurmClient object as a context manager
with SlurmClient.from_config() as client:
    # Run a command on the remote host
    result = client.run('sbatch myjob.sh')
    # Check whether the command succeeded
    if result.ok:
        print('Job submitted successfully!')
    # Print the output of the command
    print(result.stdout)
Example 2:
# Create a SlurmClient and set up Slurm (download containers etc.)
with SlurmClient.from_config(init_slurm=True) as client:
    client.run_workflow(…)
- check_job_status(slurm_job_ids: List[int], env: Optional[Dict[str, str]] = None) Tuple[Dict[int, str], fabric.runners.Result] [source]
Check the status of Slurm jobs with the given job IDs.
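Note: This does not return job array tasks individually. For a job array, it reports the last value returned for its sub-job IDs (generally the one still PENDING).
- Parameters
slurm_job_ids (List[int]) – The Slurm job IDs to check.
env (Dict[str, str], optional) – Optional environment variables to set when running the command. Defaults to None.
- Returns
A tuple containing the status per input ID and the result of the command execution.
- Return type
Tuple[Dict[int, str], Result]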
SSHException – If the command execution fails or no response is received after multiple retries.
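The job-array note above can be illustrated with a small parser that collapses array sub-jobs onto their base ID, so the last line seen for that ID wins. This is a hypothetical sketch, not biomero's implementation: it assumes the status command prints one "JOBID STATE" pair per line, with array sub-jobs written like "12345_1".

```python
from typing import Dict


def parse_job_states(output: str) -> Dict[int, str]:
    """Map each base Slurm job ID to a state (hypothetical helper)."""
    states: Dict[int, str] = {}
    for line in output.splitlines():
        parts = line.split()
        if len(parts) < 2:
            continue  # skip blank or malformed lines
        # "12345_1" (array sub-job) collapses onto the base ID "12345"
        base_id = parts[0].split("_", 1)[0]
        if not base_id.isdigit():
            continue  # skip header lines such as "JOBID STATE"
        # Later lines overwrite earlier ones, so the last sub-job state wins
        states[int(base_id)] = parts[1]
    return states
```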
- cleanup_tmp_files(slurm_job_id: str, filename: Optional[str] = None, data_location: Optional[str] = None, logfile: Optional[str] = None) fabric.runners.Result [source]
Cleanup zip and unzipped files/folders associated with a Slurm job.
- Parameters
slurm_job_id (str) – The job ID of the Slurm script.
filename (str) – The zip filename on Slurm.
data_location (str, optional) – The location of data files on Slurm. If not provided, it will be extracted from the log file.
logfile (str, optional) – The log file of the Slurm job. If not provided, a default log file will be used.
- Returns
The result of the cleanup operation.
- Return type
Result
Note
The cleanup process involves removing the specified zip file, the log file, and associated data files and folders.
Example
# Clean up temporary files for a Slurm job
client.cleanup_tmp_files("12345", "output.zip")
- convert_cytype_to_omtype(cytype: str, _default, *args, **kwargs) Any [source]
Convert a Cytomine type to an OMERO type and instantiate it with args/kwargs.
Note that Cytomine has a Python client with some conversion methods to Python types, but nothing that makes it worth depending on that library yet. It might be useful in the future (e.g. https://github.com/Cytomine-ULiege/Cytomine-python-client/blob/master/cytomine/cytomine_job.py).
- Parameters
cytype (str) – The Cytomine type to convert.
_default – The default value. Required to distinguish between float and int.
*args – Additional positional arguments.
**kwargs – Additional keyword arguments.
- Returns
The converted OMERO type class instance, or None if errors occurred.
- Return type
Any
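To show why _default is needed, here is a sketch that swaps OMERO types for plain Python types: a Cytomine "Number" becomes a float or an int depending on the type of the default value. The mapping to Python builtins (instead of OMERO types) and the function name are assumptions for illustration only.

```python
from typing import Any, Optional


def convert_cytype_to_pytype(cytype: str, _default: Any, *args: Any) -> Optional[Any]:
    """Instantiate a plain Python value for a Cytomine type (sketch)."""
    if cytype == "Number":
        # The default value decides between float and int
        target = float if isinstance(_default, float) else int
    elif cytype == "String":
        target = str
    elif cytype == "Boolean":
        target = bool
    else:
        return None  # unsupported Cytomine type
    try:
        return target(*args)
    except (TypeError, ValueError):
        return None  # mirror "None if errors occurred"
```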
- convert_url(input_url: str) str [source]
Convert the input GitHub URL to an output URL that retrieves the ‘descriptor.json’ file in raw format.
- Parameters
input_url (str) – The input GitHub URL.
- Returns
The output URL to the ‘descriptor.json’ file.
- Return type
str
- Raises
ValueError – If the input URL is not a valid GitHub URL.
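A minimal sketch of this kind of conversion, assuming input URLs of the form https://github.com/<owner>/<repo> or https://github.com/<owner>/<repo>/tree/<branch>, and the "master" fallback described for extract_parts_from_url. The raw.githubusercontent.com layout (owner/repo/ref/path) is GitHub's; the helper itself is hypothetical and does not handle branch names containing slashes.

```python
from urllib.parse import urlparse


def to_raw_descriptor_url(input_url: str) -> str:
    """Build the raw URL of descriptor.json for a GitHub repo URL (sketch)."""
    parsed = urlparse(input_url)
    if parsed.netloc != "github.com":
        raise ValueError(f"Not a GitHub URL: {input_url}")
    parts = [p for p in parsed.path.split("/") if p]
    if len(parts) < 2:
        raise ValueError(f"Missing owner or repository: {input_url}")
    owner, repo = parts[0], parts[1]
    # ".../tree/<branch>" selects a branch or tag; otherwise assume "master"
    branch = parts[3] if len(parts) >= 4 and parts[2] == "tree" else "master"
    return f"https://raw.githubusercontent.com/{owner}/{repo}/{branch}/descriptor.json"
```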
- copy_zip_locally(local_tmp_storage: str, filename: str) fabric.transfer.Result [source]
Copy a zip file from Slurm to the local server.
Note about (Transfer)Result:
Unlike similar classes such as invoke.runners.Result or fabric.runners.Result (which have a concept of “warn and return anyways on failure”) this class has no useful truthiness behavior. If a file transfer fails, some exception will be raised, either an OSError or an error from within Paramiko.
- Parameters
local_tmp_storage (str) – Path to store the zip file locally.
filename (str) – Zip filename on Slurm.
- Returns
The result of the scp attempt.
- Return type
TransferResult
- extract_data_location_from_log(slurm_job_id: Optional[str] = None, logfile: Optional[str] = None) str [source]
Read SLURM job logfile to find location of the data.
Either slurm_job_id or logfile must be provided.
- extract_job_id(result: fabric.runners.Result) int [source]
Extract the Slurm job ID from the result of a command.
- Parameters
result (Result) – The result of a command execution.
- Returns
The Slurm job ID extracted from the result, or -1 if not found.
- Return type
int
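sbatch reports a successful submission as "Submitted batch job <id>", so the extraction can be sketched as a regular-expression search with a -1 fallback. This hypothetical helper takes the stdout string directly rather than a Result object, and is not necessarily how biomero parses it.

```python
import re


def extract_job_id_from_stdout(stdout: str) -> int:
    """Return the job ID from sbatch output, or -1 if none is found (sketch)."""
    match = re.search(r"Submitted batch job (\d+)", stdout)
    return int(match.group(1)) if match else -1
```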
- extract_parts_from_url(input_url: str) Tuple[List[str], str] [source]
Extract the repository and branch information from the input URL.
- Parameters
input_url (str) – The input GitHub URL.
- Returns
The list of URL parts and the branch/version. If no branch is found, the branch is "master".
- Return type
Tuple[List[str], str]
- Raises
ValueError – If the input URL is not a valid GitHub URL.
- classmethod from_config(configfile: str = '', init_slurm: bool = False, config_only: bool = False) biomero.slurm_client.SlurmClient [source]
Creates a new SlurmClient object using the parameters read from a configuration file (.ini).
- Default paths to look for config files are:
/etc/slurm-config.ini
~/slurm-config.ini
Note that this is only for the SLURM-specific values that we added. Most configuration values are set via configuration mechanisms from Fabric library, like SSH settings being loaded from SSH config, /etc/fabric.yml or environment variables. See Fabric’s documentation for more info on configuration if needed.
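- Parameters
configfile (str, optional) – The path to a configuration file. Defaults to ''.
init_slurm (bool, optional) – Whether to set up Slurm (download containers etc.). Defaults to False.
config_only (bool, optional) – Defaults to False.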
- Returns
A new SlurmClient object.
- Return type
SlurmClient
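The lookup of the default .ini locations can be sketched with the standard library's configparser. This is not biomero's code: the helper name is hypothetical, and reading an explicit configfile last (so its values override the defaults) is an assumption.

```python
import configparser
import os
from typing import List, Optional

# Default locations as listed above
DEFAULT_CONFIG_PATHS = ["/etc/slurm-config.ini", "~/slurm-config.ini"]


def load_slurm_config(configfile: str = "",
                      paths: Optional[List[str]] = None) -> configparser.ConfigParser:
    """Read the SLURM-specific .ini values (sketch)."""
    candidates = list(DEFAULT_CONFIG_PATHS if paths is None else paths)
    if configfile:
        # Assumption: an explicit file is read last, so its values win
        candidates.append(configfile)
    config = configparser.ConfigParser()
    # read() skips files that do not exist; for files that do exist,
    # values read later override values read earlier
    config.read([os.path.expanduser(p) for p in candidates])
    return config
```

Values could then be read with, for example, config.get("SLURM", "slurm_data_path", fallback="my-scratch/data"), where the section name "SLURM" is itself an assumption.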
- generate_slurm_job_for_workflow(workflow: str, substitutes: Dict[str, str], template: str = 'job_template.sh') str [source]
Generate a Slurm job script for a specific workflow.
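- Parameters
workflow (str) – The name of the workflow.
substitutes (Dict[str, str]) – The values to substitute into the job template.
template (str, optional) – The job script template. Defaults to 'job_template.sh'.
- Returns
The generated Slurm job script as a string.
- Return type
str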
- get_active_job_progress(slurm_job_id: str, pattern: str = '\\d+%', env: Optional[Dict[str, str]] = None) Any [source]
Get the progress of an active Slurm job from its logfiles.
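- Parameters
slurm_job_id (str) – The ID of the Slurm job.
pattern (str, optional) – The regular expression used to find progress entries in the logfile. Defaults to '\d+%'.
env (Dict[str, str], optional) – Optional environment variables to set when running the command. Defaults to None.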
- Returns
The progress of the Slurm job according to the pattern, or None.
- Return type
Any
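Matching progress with the default pattern can be sketched as a regular-expression search over the log text. Returning the last match as the most recent progress is an assumption; the helper operates on a local string, whereas the real method reads the job's logfile on Slurm.

```python
import re
from typing import Optional


def last_progress(log_text: str, pattern: str = r"\d+%") -> Optional[str]:
    """Return the last progress match in the log text, or None (sketch)."""
    matches = re.findall(pattern, log_text)
    return matches[-1] if matches else None
```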
- get_all_image_versions_and_data_files() Tuple[Dict[str, List[str]], List[str]] [source]
Retrieve all available image versions and data files from the Slurm cluster.
- get_cellpose_command(image_version: str, input_data: str, cp_model: str, nuc_channel: int, prob_threshold: float, cell_diameter: float, email: Optional[str] = None, time: Optional[str] = None, use_gpu: bool = True, model: str = 'cellpose') Tuple[str, Dict] [source]
Return the command and environment dictionary to run a CellPose job on the Slurm workload manager. A specific example of using the generic ‘get_workflow_command’.
- Parameters
image_version (str) – The version of the Singularity image to use.
input_data (str) – The name of the input data folder on the shared file system.
cp_model (str) – The name of the CellPose model to use.
nuc_channel (int) – The index of the nuclear channel.
prob_threshold (float) – The probability threshold for nuclei detection.
cell_diameter (float) – The expected cell diameter in pixels.
email (Optional[str]) – The email address to send notifications to. Defaults to None.
time (Optional[str]) – The maximum time for the job to run. Defaults to None.
use_gpu (bool) – Whether to use GPU for the CellPose job. Defaults to True.
model (str) – The name of the folder of the Docker image to use. Defaults to “cellpose”.
- Returns
A tuple containing the Slurm sbatch command and the environment dictionary.
- Return type
Tuple[str, Dict]
- get_conversion_command(data_path: str, config_file: str, source_format: str = 'zarr', target_format: str = 'tiff') Tuple[str, Dict, str, str] [source]
Generate Slurm conversion command and environment variables for data conversion.
- Parameters
- Returns
A tuple containing the Slurm conversion command and the environment variables, followed by the converter image name and version.
- Return type
Tuple[str, Dict, str, str]
Warning
The default implementation only supports conversion from ‘zarr’ to ‘tiff’. If using other source or target formats, users must implement and configure additional converters themselves.
- get_image_versions_and_data_files(model: str) Tuple[List[str], List[str]] [source]
Retrieve the available image versions and input data files for a given model.
- Parameters
model (str) – The name of the model to query for.
- Returns
A tuple containing two lists:
- The first list includes the available image versions, sorted in descending order.
- The second list includes the available data files.
- Return type
Tuple[List[str], List[str]]
- Raises
ValueError – If the provided model is not found in the SlurmClient’s known model paths.
- get_job_status_command(slurm_job_ids: List[int]) str [source]
Return the Slurm command to get the status of jobs with the given job IDs.
- get_jobs_info_command(start_time: str = '2023-01-01', end_time: str = 'now', columns: str = 'JobId', states: str = 'r,cd,f,to,rs,dl,nf') str [source]
Return the Slurm command to retrieve information about old jobs.
The command is formatted with the specified start and end times; the start time is expected in ISO format "YYYY-MM-DD". The command uses the "sacct" tool to query the Slurm accounting database for jobs in the given states within that time window. It outputs only the requested columns (by default the job IDs, -o JobId), without a header line (-n), and lists only job allocations rather than individual job steps (-X).
- Parameters
start_time (str) – The start time from which to retrieve job information. Defaults to “2023-01-01”.
end_time (str) – The end time until which to retrieve job information. Defaults to “now”.
columns (str) – The columns to retrieve from the job information. Defaults to “JobId”. It is comma separated, e.g. “JobId,State”.
states (str) – The job states to include in the query. Defaults to “r,cd,f,to,rs,dl,nf”.
- Returns
A string representing the Slurm command to retrieve information about old jobs.
- Return type
str
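A command of this shape can be sketched with sacct's documented options (--starttime, --endtime, --state, -o/--format, -n/--noheader, -X/--allocations). The exact string biomero builds may differ in option order or spelling; this helper is a hypothetical illustration.

```python
def build_sacct_command(start_time: str = "2023-01-01",
                        end_time: str = "now",
                        columns: str = "JobId",
                        states: str = "r,cd,f,to,rs,dl,nf") -> str:
    """Build an sacct query for jobs in a time window (sketch)."""
    # -n: no header line; -X: job allocations only, no job steps
    return (f"sacct --starttime {start_time} --endtime {end_time} "
            f"--state {states} -o {columns} -n -X")
```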
- get_logfile_from_slurm(slurm_job_id: str, local_tmp_storage: str = '/tmp/', logfile: Optional[str] = None) Tuple[str, str, fabric.transfer.Result] [source]
Copy the logfile of the given SLURM job to the local server.
Note about (Transfer)Result:
Unlike similar classes such as invoke.runners.Result or fabric.runners.Result (which have a concept of “warn and return anyways on failure”) this class has no useful truthiness behavior. If a file transfer fails, some exception will be raised, either an OSError or an error from within Paramiko.
- Parameters
- Returns
directory, full path of the logfile, and TransferResult
- Return type
Tuple
- get_recent_log_command(log_file: str, n: int = 10) str [source]
Get the command to retrieve the recent log entries from a specified log file.
- get_unzip_command(zipfile: str, filter_filetypes: str = '*.zarr *.tiff *.tif') str [source]
Generate a command string for unzipping a data archive and creating required directories for Slurm jobs.
- Parameters
zipfile (str) – The name of the zip archive file to extract. Without extension.
filter_filetypes (str, optional) – A space-separated string containing the file extensions to extract from the zip file. Defaults to "*.zarr *.tiff *.tif". Setting this argument to None omits the file filter and extracts all files.
- Returns
The command to extract the specified filetypes from the zip file.
- Return type
str
- get_update_slurm_scripts_command() str [source]
Generate the command to update the Git repository containing the Slurm scripts, if necessary.
- Returns
A string containing the Git command to update the Slurm scripts.
- Return type
str
- get_workflow_command(workflow: str, workflow_version: str, input_data: str, email: Optional[str] = None, time: Optional[str] = None, **kwargs) Tuple[str, Dict] [source]
Generate the Slurm workflow command and environment variables.
- Parameters
workflow (str) – The name of the workflow.
workflow_version (str) – The version of the workflow.
input_data (str) – The name of the input data folder containing the input image files.
email (str, optional) – The email address for job notifications. Defaults to None, in which case the value in the job script is used.
time (str, optional) – The time limit for the job in the format HH:MM:SS. Defaults to None, in which case the value in the job script is used.
**kwargs – Additional keyword arguments for the workflow.
- Returns
A tuple containing the Slurm workflow command and the environment variables.
- Return type
Tuple[str, Dict]
- get_workflow_parameters(workflow: str) Dict[str, Dict[str, Any]] [source]
Retrieve the parameters of a workflow.
- Parameters
workflow (str) – The workflow for which to retrieve the parameters.
- Returns
A dictionary containing the workflow parameters.
- Return type
Dict[str, Dict[str, Any]]
- Raises
ValueError – If an error occurs while retrieving the workflow parameters.
- get_zip_command(data_location: str, filename: str) str [source]
Generate a command string for zipping the data on Slurm.
- init_workflows(force_update: bool = False)[source]
Retrieves the required information for the configured workflows from GitHub and fills slurm_model_images with Docker Hub links.
- Parameters
force_update (bool) – If True, overwrites entries already present in slurm_model_images. Defaults to False.
- initialize_analytics_system(reset_tables=False)[source]
Initialize the analytics system based on the analytics configuration passed to the constructor.
- Parameters
reset_tables (bool) – If True, drops and recreates all views.
- list_active_jobs(env: Optional[Dict[str, str]] = None) List[str] [source]
Get a list of active jobs from SLURM.
- list_all_jobs(env: Optional[Dict[str, str]] = None) List[str] [source]
Get a list of all jobs from SLURM.
- list_available_converter_versions() Dict [source]
Return a dict with the available version(s) of each converter on Slurm. Note: this assumes versioned converters and does not work properly with unversioned .sif files.
- list_completed_jobs(env: Optional[Dict[str, str]] = None) List[str] [source]
Get a list of completed jobs from SLURM.
- parse_docker_image_version(image: str) Tuple[str, str] [source]
Parses the Docker image string to extract the image name and version tag.
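A sketch of splitting an image reference into name and tag. Only a colon after the last "/" is treated as the tag separator, which keeps registry ports such as "localhost:5000/img" intact. Falling back to "latest" when no tag is present is an assumption, and digest references (image@sha256:...) are not handled.

```python
from typing import Tuple


def split_image_tag(image: str, default_tag: str = "latest") -> Tuple[str, str]:
    """Split "name:tag" into (name, tag) (sketch)."""
    name_part, sep, tag = image.rpartition(":")
    # A "/" after the last colon means the colon belonged to a registry port
    if sep and "/" not in tag:
        return name_part, tag
    return image, default_tag
```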
- pull_descriptor_from_github(workflow: str) Dict [source]
Pull the workflow descriptor from GitHub.
- Parameters
workflow (str) – The workflow for which to pull the descriptor.
- Returns
The JSON descriptor.
- Return type
Dict
- Raises
ValueError – If an error occurs while pulling the descriptor file.
- run_commands(cmdlist: List[str], env: Optional[Dict[str, str]] = None, sep: str = ' && ', **kwargs) fabric.runners.Result [source]
Run a list of shell commands consecutively on the Slurm cluster, ensuring the success of each before proceeding to the next.
The environment variables can be set using the env argument. These commands retain the same session (environment variables etc.), unlike running them separately.
- Parameters
cmdlist (List[str]) – A list of shell commands to run on Slurm.
env (Dict[str, str], optional) – Optional environment variables to set when running the command. Defaults to None.
sep (str, optional) – The separator used to concatenate the commands. Defaults to ‘ && ‘.
**kwargs – Additional keyword arguments.
- Returns
The result of the last command in the list.
- Return type
Result
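The benefit of joining commands with " && " can be demonstrated locally: all commands run in one shell, so a variable exported by an earlier command is visible to later ones, and a failure stops the chain. This sketch uses subprocess on the local machine instead of an SSH connection, and the helper name is hypothetical.

```python
import os
import subprocess
from typing import Dict, List, Optional


def run_commands_locally(cmdlist: List[str],
                         env: Optional[Dict[str, str]] = None,
                         sep: str = " && ") -> subprocess.CompletedProcess:
    """Run commands in a single local shell, stopping at the first failure."""
    merged_env = {**os.environ, **(env or {})}
    # One shell session: state such as exported variables carries over
    return subprocess.run(sep.join(cmdlist), shell=True, env=merged_env,
                          capture_output=True, text=True)
```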
- run_commands_split_out(cmdlist: List[str], env: Optional[Dict[str, str]] = None) List[str] [source]
Run a list of shell commands consecutively and split the output of each command.
Each command in the list is executed with a separator in between that is unique and can be used to split the output of each command later. The separator used is specified by the _OUT_SEP attribute of the SlurmClient instance.
- Parameters
- Returns
A list of strings, where each string corresponds to the output of a single command in cmdlist split by the separator _OUT_SEP.
- Return type
List[str]
- Raises
SSHException – If any of the commands fail to execute successfully.
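The separator technique can be sketched as two steps: echo a marker between consecutive commands, then split the combined output on that marker. The marker value "__OUT_SEP__" is hypothetical; the actual value of _OUT_SEP is not documented here.

```python
from typing import List

OUT_SEP = "__OUT_SEP__"  # hypothetical stand-in for SlurmClient._OUT_SEP


def join_with_marker(cmdlist: List[str], marker: str = OUT_SEP) -> str:
    """Chain commands, echoing the marker between consecutive ones."""
    return f" && echo {marker} && ".join(cmdlist)


def split_output(stdout: str, marker: str = OUT_SEP) -> List[str]:
    """Split combined output into one stripped chunk per command."""
    return [chunk.strip() for chunk in stdout.split(marker)]
```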
- run_conversion_workflow_job(folder_name: str, source_format: str = 'zarr', target_format: str = 'tiff', wf_id: Optional[uuid.UUID] = None) biomero.slurm_client.SlurmJob [source]
Run the data conversion workflow on Slurm using the given data folder.
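- Parameters
folder_name (str) – The name of the data folder on Slurm to convert.
source_format (str, optional) – The format of the input data. Defaults to 'zarr'.
target_format (str, optional) – The format to convert the data to. Defaults to 'tiff'.
wf_id (UUID, optional) – Workflow ID for tracking purposes. Defaults to None.
- Returns
The conversion job.
- Return type
SlurmJob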
Warning
The default implementation only supports conversion from ‘zarr’ to ‘tiff’. If using other source or target formats, users must implement and configure additional converters themselves.
- run_workflow(workflow_name: str, workflow_version: str, input_data: str, email: Optional[str] = None, time: Optional[str] = None, wf_id: Optional[uuid.UUID] = None, **kwargs) Tuple[fabric.runners.Result, int, uuid.UUID, uuid.UUID] [source]
Run a specified workflow on Slurm using the given parameters.
- Parameters
workflow_name (str) – Name of the workflow to execute.
workflow_version (str) – Version of the workflow (image version on Slurm).
input_data (str) – Name of the input data folder containing input image files.
email (str, optional) – Email address for Slurm job notifications.
time (str, optional) – Time limit for the Slurm job in the format HH:MM:SS.
wf_id (UUID, optional) – Workflow ID for tracking purposes. If not provided, a new one is created.
**kwargs – Additional keyword arguments for the workflow.
- Returns
A tuple containing the result of starting the workflow job, the Slurm job ID, the workflow ID, and the task ID. If the Slurm job ID could not be extracted, it returns -1 for the job ID.
- Return type
Tuple[Result, int, UUID, UUID]
Note
The Slurm job ID is extracted from the result of the run_commands method. If track_workflows is enabled, workflow and task tracking is performed.
- run_workflow_job(workflow_name: str, workflow_version: str, input_data: str, email: Optional[str] = None, time: Optional[str] = None, wf_id: Optional[uuid.UUID] = None, **kwargs) biomero.slurm_client.SlurmJob [source]
Run a specified workflow on Slurm using the given parameters and return a SlurmJob instance.
- Parameters
workflow_name (str) – Name of the workflow to execute.
workflow_version (str) – Version of the workflow (image version on Slurm).
input_data (str) – Name of the input data folder containing input image files.
email (str, optional) – Email address for Slurm job notifications.
time (str, optional) – Time limit for the Slurm job in the format HH:MM:SS.
wf_id (UUID, optional) – Workflow ID for tracking purposes. If not provided, a new one is created.
**kwargs – Additional keyword arguments for the workflow.
- Returns
A SlurmJob instance representing the started workflow job.
- Return type
SlurmJob
- setup_container_images()[source]
Sets up container images for Slurm operations.
This function creates specific directories for container images and pulls necessary images from Docker repositories. It generates and executes a script to pull images and copies it to the remote location.
- Raises
SSHException – If there is an issue executing commands or copying files.
- setup_converters()[source]
Sets up converters for Slurm operations.
This function creates necessary directories for converters and copies converter scripts and definitions to the appropriate locations. It also builds Singularity containers from the provided definitions.
- Raises
SSHException – If there is an issue executing commands or copying files.
- setup_directories()[source]
Creates necessary directories for Slurm operations.
This function creates directories for data storage, scripts, and workflows as specified in the SlurmClient object.
- Raises
SSHException – If there is an issue executing directory creation commands.
- setup_job_scripts()[source]
Sets up job scripts for Slurm operations.
This function either clones a Git repository containing job scripts into the specified script path or generates scripts locally if no repository is provided.
- Raises
SSHException – If there is an issue executing Git commands or generating scripts.
- setup_slurm()[source]
Validates or creates the required setup on the Slurm cluster.
- Raises
SSHException – If it cannot connect to Slurm or runs into an error.
- str_to_class(module_name: str, class_name: str, *args, **kwargs)[source]
Return a class instance from a string reference.
- Parameters
- Returns
- An instance of the specified class, or None if the class or
module does not exist.
- Return type
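Resolving a class from strings is typically done with importlib.import_module plus getattr. This is a sketch of that technique, not necessarily biomero's exact code, and the helper name is hypothetical.

```python
import importlib
from typing import Any, Optional


def str_to_instance(module_name: str, class_name: str,
                    *args: Any, **kwargs: Any) -> Optional[Any]:
    """Instantiate module_name.class_name, or return None if it is missing."""
    try:
        module = importlib.import_module(module_name)
        cls = getattr(module, class_name)
    except (ImportError, AttributeError):
        return None  # module or class does not exist
    return cls(*args, **kwargs)
```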
- transfer_data(local_path: str) fabric.runners.Result [source]
Transfers a file or directory from the local machine to the remote Slurm cluster.
- Parameters
local_path (str) – The local path to the file or directory to transfer.
- Returns
The result of the file transfer operation.
- Return type
Result
- unpack_data(zipfile: str, env: Optional[Dict[str, str]] = None) fabric.runners.Result [source]
Unpacks a zipped file on the remote Slurm cluster.
- update_slurm_scripts(generate_jobs: bool = False, env: Optional[Dict[str, str]] = None) fabric.runners.Result [source]
Updates the local copy of the Slurm job submission scripts.
This function pulls the latest version of the scripts from the Git repository and copies them to the slurm_script_path directory. Alternatively, it can generate scripts from a template. This is the default behavior if no Git repo is provided or can be forced via the generate_jobs parameter.
- Parameters
- Returns
The result of the command.
- Return type
Result
- validate(validate_slurm_setup: bool = False)[source]
Validate the connection to the Slurm cluster by running a simple command.
- workflow_params_to_envvars(**kwargs) Dict [source]
Convert workflow parameters to environment variables.
- Parameters
**kwargs – Additional keyword arguments for the workflow.
- Returns
A dictionary containing the environment variables.
- Return type
Dict
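One plausible shape for this conversion is to upper-case each parameter name and stringify its value, since environment variables are strings. The naming convention is an assumption, not confirmed by this reference, and the helper is hypothetical.

```python
from typing import Any, Dict


def params_to_envvars(**kwargs: Any) -> Dict[str, str]:
    """Map workflow keyword arguments to environment variables (sketch)."""
    # Assumed convention: NAME=value, with all values as strings
    return {key.upper(): str(value) for key, value in kwargs.items()}
```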
- workflow_params_to_subs(params) Dict[str, str] [source]
Convert workflow parameters to substitution dictionary for job script.
- class biomero.slurm_client.SlurmJob(submit_result: fabric.runners.Result, job_id: int, wf_id: uuid.UUID, task_id: uuid.UUID, slurm_polling_interval: int = 10)[source]
Bases:
object
Represents a job submitted to a Slurm cluster.
This class encapsulates information and methods related to managing a job submitted to a Slurm cluster. It provides functionality to monitor the job’s state, wait for completion, and perform cleanup operations.
- submit_result
The result of submitting the job.
- Type
Result
- wf_id
The workflow ID associated with the job.
- Type
UUID
- task_id
The task ID within the workflow.
- Type
UUID
Example
# Submit some job with the SlurmClient
submit_result, job_id, wf_id, task_id = slurmClient.run_workflow(
    workflow_name, workflow_version, input_data, email, time, wf_id, **kwargs)
# Create a SlurmJob instance
slurmJob = SlurmJob(submit_result, job_id, wf_id, task_id)
if not slurmJob.ok:
    logger.warning(f"Error with job: {slurmJob.get_error()}")
else:
    try:
        slurmJob.wait_for_completion(slurmClient, conn)
        if not slurmJob.completed():
            raise Exception(f"Job is not completed: {slurmJob}")
        else:
            slurmJob.cleanup(slurmClient)
    except Exception as e:
        logger.error(f" ERROR WITH JOB: {e}")
        raise
- SLURM_POLLING_INTERVAL = 10
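SLURM_POLLING_INTERVAL suggests that waiting for completion works by checking the job state at a fixed interval until it reaches a terminal state. A minimal polling sketch, assuming the interval is in seconds and that get_state is a hypothetical callable returning the current Slurm state string; the terminal states are standard Slurm job state names.

```python
import time
from typing import Callable

# Standard Slurm job states after which a job will not run further
TERMINAL_STATES = {"COMPLETED", "FAILED", "CANCELLED", "TIMEOUT",
                   "NODE_FAIL", "OUT_OF_MEMORY", "BOOT_FAIL", "DEADLINE"}


def poll_until_done(get_state: Callable[[], str], interval: float = 10) -> str:
    """Call get_state every `interval` seconds until a terminal state."""
    while True:
        state = get_state()
        # States may carry detail, e.g. "CANCELLED by 1000"; check the first word
        if (state.split() or [""])[0] in TERMINAL_STATES:
            return state
        time.sleep(interval)
```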
- cleanup(slurmClient) fabric.runners.Result [source]
Cleanup remaining log files.
- Parameters
slurmClient – The Slurm client.
- Returns
The result of the cleanup operation.
- Return type
Result
- completed()[source]
Check if the Slurm job has completed successfully.
- Returns
True if the job has completed; False otherwise.
- Return type
bool