Cromwell
The CPG uses Cromwell to run workflows. This module provides a Python interface to Cromwell: helper code for submitting workflows from within Hail Batch, watching their progress, and bringing their outputs back into Hail Batch.
- class cpg_utils.cromwell.CromwellOutputType(name: str, copy_file_into_batch: bool, array_length: int | None, resource_group: dict[str, Any] | None = None)[source]#
Declares the output type for the Cromwell -> Hail Batch glue.
- static array(name: str, length: int) CromwellOutputType[source]#
Array of simple files
- static array_path(name: str, length: int) CromwellOutputType[source]#
Return a list of file paths of the outputs (one path per file)
- static array_resource_group(name: str, length: int, resource_group: dict[str, Any]) CromwellOutputType[source]#
Select an array of resource groups. In this case, the outputs you select within the resource group are zipped. resource_group has the format:
    {<read-group-name>: <corresponding-output-in-cromwell>}
Eg:
    outputs_to_collect={
        "<this-key-only-exists-in-output-dict>": CromwellOutputType.array_resource_group({
            'bam': 'hello.output_bams',
            'bai': 'hello.output_bam_indexes'
        }, length=2)
    }

    # You get
    # {"<this-key-only-exists-in-output-dict>": [__resource_group1, __resource_group2]}
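The "zipping" described above can be pictured with plain dictionaries. This is only an illustrative sketch, not the library's implementation: `zip_resource_group` and the `gs://bucket/...` paths are hypothetical, and real resource groups are Hail Batch objects rather than dicts.

```python
from typing import Any


def zip_resource_group(
    resource_group: dict[str, str],
    cromwell_outputs: dict[str, list[Any]],
    length: int,
) -> list[dict[str, Any]]:
    """Pair up the i-th element of every selected Cromwell output array."""
    groups = []
    for i in range(length):
        # One dict per array index, keyed by the resource-group key ('bam', 'bai', ...).
        groups.append(
            {
                key: cromwell_outputs[output_name][i]
                for key, output_name in resource_group.items()
            }
        )
    return groups


# Hypothetical workflow outputs, using the output names from the example above.
outputs = {
    'hello.output_bams': ['gs://bucket/a.bam', 'gs://bucket/b.bam'],
    'hello.output_bam_indexes': ['gs://bucket/a.bam.bai', 'gs://bucket/b.bam.bai'],
}
zipped = zip_resource_group(
    {'bam': 'hello.output_bams', 'bai': 'hello.output_bam_indexes'},
    outputs,
    length=2,
)
```

Each element of `zipped` plays the role of one `__resource_group` in the result shown above: the first BAM is grouped with the first index, the second with the second.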
- static single(name: str) CromwellOutputType[source]#
Single file
- static single_path(name: str) CromwellOutputType[source]#
Return the file path of the output in a file
- static single_resource_group(name: str, resource_group: dict[str, Any]) CromwellOutputType[source]#
Specify a resource group you want to return, where resource_group has the format:
    {<read-group-name>: <corresponding-output-in-cromwell>}
Eg:
    outputs_to_collect={
        "<this-key-only-exists-in-output-dict>": CromwellOutputType.single_resource_group({
            # The hello workflow has two outputs: output_bam, output_bam_index
            'bam': 'hello.output_bam',
            'bai': 'hello.output_bam_index'
        })
    }
- cpg_utils.cromwell.get_cromwell_oauth_token()[source]#
Get the OAuth token for Cromwell, specific to the audience.
- cpg_utils.cromwell.run_cromwell_workflow(job: BashJob, dataset: str, access_level: str, workflow: str, cwd: str | None, libs: list[str], output_prefix: str, labels: dict[str, str] | None = None, input_dict: dict[str, Any] | None = None, input_paths: list[str] | None = None, project: str | None = None, copy_outputs_to_gcp: bool = True, ar_guid_override: str | None = None, backend: CromwellBackend = CromwellBackend.batch)[source]#
Run a Cromwell workflow and return a Batch.ResourceFile that contains the workflow ID.
- cpg_utils.cromwell.run_cromwell_workflow_from_repo_and_get_outputs(b: Batch, job_prefix: str, dataset: str, workflow: str, outputs_to_collect: dict[str, CromwellOutputType], libs: list[str], output_prefix: str, labels: dict[str, str] | None = None, input_dict: dict[str, Any] | None = None, input_paths: list[str] | None = None, repo: str | None = None, commit: str | None = None, cwd: str | None = None, driver_image: str | None = None, project: str | None = None, copy_outputs_to_gcp: bool = True, min_watch_poll_interval: int = 5, max_watch_poll_interval: int = 60, time_limit_seconds: int | None = None, backend: CromwellBackend = CromwellBackend.batch) tuple[Job, dict[str, Resource | list[Resource]]][source]#
This function needs to know the structure of the outputs you want to collect. It currently only supports:
a single value, or
a list of values
Eg:
    outputs_to_collect={
        'hello.out': CromwellOutputType.single('hello.out'), # single output
        'hello.outs': CromwellOutputType.array('hello.outs', length=5), # array output of length=5
    }
If the value starts with "gs://", we'll copy it as a resource file; otherwise we write the value into a file, which becomes a Batch resource.
If copy_outputs_to_gcp is True, the outputs will be copied to GCS. Workflows may then choose to copy these outputs to a final destination.
Returns a submit Job object, and a dict of output Resource objects.
Optionally override the min/max poll interval for the watch job. This alters how often the watch job pings Cromwell for status updates.
time_limit_seconds is optional, and will cause the workflow to be aborted if the time limit is exceeded.
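The "gs://" rule described above amounts to a two-way branch per output value. A minimal sketch of that decision, assuming the copy is done with `gcloud storage cp` (the helper name `localise_output_command` is hypothetical, and the commands the library actually generates may differ):

```python
import shlex


def localise_output_command(value: str, destination: str) -> str:
    """Build a shell command that places one Cromwell output value at `destination`."""
    if value.startswith('gs://'):
        # The value is a GCS path, so copy the object itself.
        return f'gcloud storage cp {shlex.quote(value)} {shlex.quote(destination)}'
    # Any other value (a string, a number, ...) is written verbatim into the file.
    return f'printf %s {shlex.quote(value)} > {shlex.quote(destination)}'


copy_command = localise_output_command('gs://bucket/out.txt', 'out.txt')
write_command = localise_output_command('hello world', 'out.txt')
```

Either way the result is a file on disk, which is what lets Hail Batch treat both kinds of output as a resource.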
- cpg_utils.cromwell.watch_workflow(workflow_id_file: str, max_sequential_exception_count: int, min_poll_interval: int, max_poll_interval: int, exponential_decrease_seconds: int, output_json_path: str, time_limit_seconds: int | None = None)[source]#
INNER Python function to watch workflow status, and write output paths to output_json_path on success.
Dependencies are re-imported inside the function so that it is self-contained and can be run in a Hail Batch bash job.
- Parameters:
workflow_id_file (str) – file containing the Cromwell WF ID only
max_sequential_exception_count (int) – Fail after X consecutive errors
min_poll_interval (int) – minimum polling wait
max_poll_interval (int) – maximum polling wait
exponential_decrease_seconds (int) – expo curve for interval generation
output_json_path (str) – where to write output results file
time_limit_seconds (int) – kill if not completed before X seconds pass
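To make the three interval parameters concrete, here is one way a poll interval could grow from the minimum to the maximum over exponential_decrease_seconds. The curve is an assumption chosen for illustration (geometric interpolation); the documentation does not state the formula the library uses, and `poll_interval_sketch` is a hypothetical name.

```python
def poll_interval_sketch(
    elapsed_seconds: float,
    min_poll_interval: int,
    max_poll_interval: int,
    exponential_decrease_seconds: int,
) -> int:
    """Return a wait time that rises from the minimum to the maximum as time passes.

    Assumes min_poll_interval > 0 and exponential_decrease_seconds > 0.
    """
    if elapsed_seconds >= exponential_decrease_seconds:
        # Past the ramp-up window: always wait the maximum.
        return max_poll_interval
    fraction = elapsed_seconds / exponential_decrease_seconds
    # Geometric interpolation between the two bounds.
    interval = min_poll_interval * (max_poll_interval / min_poll_interval) ** fraction
    # Clamp so rounding or odd inputs never escape the [min, max] range.
    return int(min(max_poll_interval, max(min_poll_interval, interval)))


# Sample the curve at the start, middle and end of a 1200 second window.
schedule = [poll_interval_sketch(t, 5, 60, 1200) for t in (0, 600, 1200)]
```

The intent is that a freshly submitted workflow is checked often, while a long-running one is polled less frequently.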
- cpg_utils.cromwell.watch_workflow_and_get_output(b: Batch, job_prefix: str, workflow_id_file: Resource, outputs_to_collect: dict[str, CromwellOutputType], driver_image: str | None = None, min_poll_interval: int = 5, max_poll_interval: int = 60, exponential_decrease_seconds: int = 1200, max_sequential_exception_count: int = 25, time_limit_seconds: int | None = None)[source]#
This is a little bit tricky, but the process is:
Wait for a cromwell workflow to finish,
If it succeeds, get the outputs (as a json)
(Hard) Get the value of the output back into Hail Batch as a resource file.
The last step is hard because both the outputs to collect and the number of outputs to collect must be known up-front.
So unfortunately, this function needs to know the structure of the outputs you want to collect. It currently only supports:
a single value, or
a list of values
If the value starts with "gs://", we'll copy it as a resource file; otherwise we write the value into a file, which becomes a Batch resource.
- Parameters:
driver_image – If specified, must contain python3 (w/ requests), gcloud, jq
b – Batch object
job_prefix – Prefix for the job name
workflow_id_file – File containing the workflow ID
outputs_to_collect – dict of output name -> CromwellOutputType
min_poll_interval – Min time to wait between polls
max_poll_interval – Maximum time to wait between polls
exponential_decrease_seconds – Exponential decrease in wait time
max_sequential_exception_count – Maximum number of exceptions before giving up
time_limit_seconds – a maximum runtime before abort is triggered
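The interplay of max_sequential_exception_count and time_limit_seconds can be sketched as a small polling loop. This is a simplified illustration under stated assumptions, not the library's code: `watch_sketch` and `get_status` are hypothetical, the status source is injected so that no Cromwell server is needed, and the sketch raises TimeoutError where the real watcher is described as aborting the workflow.

```python
import time
from typing import Callable, Optional

# Workflow states after which there is nothing left to wait for.
TERMINAL_STATUSES = ('succeeded', 'failed', 'aborted')


def watch_sketch(
    get_status: Callable[[], str],
    max_sequential_exception_count: int,
    poll_interval_seconds: float,
    time_limit_seconds: Optional[float] = None,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll until a terminal status, tolerating a bounded run of consecutive errors."""
    start = clock()
    consecutive_errors = 0
    while True:
        if time_limit_seconds is not None and clock() - start > time_limit_seconds:
            raise TimeoutError('workflow exceeded its time limit')
        try:
            status = get_status()
        except Exception:
            consecutive_errors += 1
            if consecutive_errors >= max_sequential_exception_count:
                raise  # too many failures in a row: give up
        else:
            consecutive_errors = 0  # any successful poll resets the counter
            if status.lower() in TERMINAL_STATUSES:
                return status.lower()
        sleep(poll_interval_seconds)


# Usage with a canned sequence of statuses instead of a live server.
statuses = iter(['Submitted', 'Running', 'Succeeded'])
final_status = watch_sketch(lambda: next(statuses), 25, 0, sleep=lambda _: None)
```

Counting only consecutive errors means that an occasional failed request does not stop the watch, while a sustained outage does.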
- class cpg_utils.cromwell_model.CallMetadata(name, executionStatus, stdout=None, backendStatus=None, compressedDockerSize=None, commandLine=None, shardIndex=None, outputs=None, runtimeAttributes=None, callCaching=None, inputs=None, returnCode=None, jobId=None, backend=None, end=None, dockerImageUsed=None, stderr=None, callRoot=None, attempt=None, executionEvents=None, start=None, preemptible=None, jes=None, failures=None, calls: dict[str, list[CallMetadata]] | None = None, **kwargs)[source]#
Python model for cromwell CallMetadata
- class cpg_utils.cromwell_model.ExecutionStatus(value)[source]#
- done = 'done'#
- failed = 'failed'#
- in_progress = 'inprogress'#
- preparing = 'preparing'#
- queuedincromwell = 'queuedincromwell'#
- retryablefailure = 'retryablefailure'#
- running = 'running'#
- starting = 'starting'#
- succeeded = 'succeeded'#
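All of the values above are lowercase, while Cromwell itself reports mixed-case statuses such as "RetryableFailure". A minimal sketch of mapping a raw status string onto such an enum, using a local copy of the members listed above; `StatusSketch` and `parse_status` are hypothetical, and whether the library normalises case in this way is an assumption.

```python
from enum import Enum


class StatusSketch(Enum):
    """Local mirror of the values listed above, so the example is self-contained."""

    done = 'done'
    failed = 'failed'
    in_progress = 'inprogress'
    preparing = 'preparing'
    queuedincromwell = 'queuedincromwell'
    retryablefailure = 'retryablefailure'
    running = 'running'
    starting = 'starting'
    succeeded = 'succeeded'


def parse_status(raw: str) -> StatusSketch:
    """Look a status up by value after lowercasing; unknown values raise ValueError."""
    return StatusSketch(raw.strip().lower())


parsed = parse_status('RetryableFailure')
```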
- class cpg_utils.cromwell_model.WorkflowMetadataModel(workflowName=None, workflowProcessingEvents=None, metadataSource=None, actualWorkflowLanguageVersion=None, submittedFiles=None, calls: dict[str, list[CallMetadata]] | None = None, outputs=None, workflowRoot=None, actualWorkflowLanguage=None, id=None, inputs=None, labels=None, submission=None, status=None, end=None, start=None, **kwargs)[source]#
- cpg_utils.cromwell_model.collapse_status_of_calls(calls: list[CallMetadata])[source]#
- cpg_utils.cromwell_model.get_readable_duration(seconds: int | None)[source]#
>>> get_readable_duration(86401)
'1d:0h:0m:1s'
>>> get_readable_duration(100)
'1m:40s'
>>> get_readable_duration(3)
'3s'
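The three examples above are consistent with a rule of dropping leading zero units and then printing every remaining unit down to seconds. A sketch that reproduces them follows; `readable_duration_sketch` is a hypothetical reimplementation, not the library function, and it does not cover the `None` input that the real signature accepts.

```python
def readable_duration_sketch(seconds: int) -> str:
    """Format a duration as d:h:m:s, omitting leading units that are zero."""
    days, remainder = divmod(seconds, 86400)
    hours, remainder = divmod(remainder, 3600)
    minutes, secs = divmod(remainder, 60)
    parts = [(days, 'd'), (hours, 'h'), (minutes, 'm'), (secs, 's')]
    # Index of the first non-zero unit; fall back to the seconds field.
    first = next(
        (i for i, (value, _) in enumerate(parts[:-1]) if value),
        len(parts) - 1,
    )
    # Once a unit is shown, every smaller unit is shown too, even if zero.
    return ':'.join(f'{value}{unit}' for value, unit in parts[first:])


one_day_one_second = readable_duration_sketch(86401)
```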
- cpg_utils.cromwell_model.prepare_inner_calls_string(name, calls: list[CallMetadata], expand_completed=False, monochrome=False)[source]#