Cromwell

The CPG uses Cromwell to run workflows. This module provides a Python interface to Cromwell, allowing you to submit workflows, monitor their progress, and retrieve their outputs from Hail Batch.

The cromwell module contains helper code for submitting and watching Cromwell workflows from within Hail Batch.

class cpg_utils.cromwell.CromwellBackend(value)
batch = 'batch'
pipelines_api = 'papi'
class cpg_utils.cromwell.CromwellOutputType(name: str, copy_file_into_batch: bool, array_length: int | None, resource_group: dict[str, Any] | None = None)

Declares the type of a Cromwell output, for the glue that brings Cromwell outputs back into Hail Batch.

static array(name: str, length: int) CromwellOutputType

An array of simple files.

static array_path(name: str, length: int) CromwellOutputType

Return a list of file paths of the outputs (one path per file).

static array_resource_group(name: str, length: int, resource_group: dict[str, Any]) CromwellOutputType

Select an array of resource groups. In this case, the outputs you select within the resource group are zipped, so the i-th resource group contains the i-th file of each selected output. resource_group has the format:

{<resource-group-key>: <corresponding-output-in-cromwell>}

For example:

    outputs_to_collect = {
        '<this-key-only-exists-in-output-dict>': CromwellOutputType.array_resource_group({
            'bam': 'hello.output_bams',
            'bai': 'hello.output_bam_indexes',
        }, length=2)
    }

    # You get:
    # {'<this-key-only-exists-in-output-dict>': [__resource_group1, __resource_group2]}
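The zipping can be pictured on plain dicts: the i-th group takes the i-th file from every Cromwell output named in resource_group. This is a hypothetical sketch, not cpg_utils code: zip_resource_groups is an illustrative name, cromwell_outputs stands in for the workflow's output JSON, and raising on a length mismatch is an assumption about the desired strictness. In cpg_utils each group then becomes a Hail Batch resource group in the returned dict.

```python
def zip_resource_groups(
    cromwell_outputs: dict[str, list[str]],
    resource_group: dict[str, str],
    length: int,
) -> list[dict[str, str]]:
    """Hypothetical: turn parallel Cromwell array outputs into per-index groups."""
    # Assumption: every selected output must have exactly `length` files
    for output_name in resource_group.values():
        files = cromwell_outputs[output_name]
        if len(files) != length:
            raise ValueError(f'{output_name} has {len(files)} files, expected {length}')
    # The i-th group takes the i-th file from every selected output
    return [
        {key: cromwell_outputs[output_name][index] for key, output_name in resource_group.items()}
        for index in range(length)
    ]
```

With the bam/bai mapping from the example, two BAMs and two indexes yield two groups, each pairing one BAM with its index.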

static single(name: str) CromwellOutputType

A single file.

static single_path(name: str) CromwellOutputType

Return the file path of the output, written into a file.

static single_resource_group(name: str, resource_group: dict[str, Any]) CromwellOutputType

Specify a resource group you want to return, where resource_group has the format:

{<resource-group-key>: <corresponding-output-in-cromwell>}

For example:

    outputs_to_collect = {
        '<this-key-only-exists-in-output-dict>': CromwellOutputType.single_resource_group({
            # The hello workflow has two outputs: output_bam, output_bam_index
            'bam': 'hello.output_bam',
            'bai': 'hello.output_bam_index',
        })
    }
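For a single resource group, each key picks out exactly one Cromwell output. A hypothetical sketch of that selection on plain dicts (select_resource_group and cromwell_outputs are illustrative names, not cpg_utils API; failing fast on a missing output is an assumption):

```python
def select_resource_group(
    cromwell_outputs: dict[str, str],
    resource_group: dict[str, str],
) -> dict[str, str]:
    """Hypothetical: map each resource-group key to its Cromwell output file."""
    # Assumption: report every missing Cromwell output name at once
    missing = [name for name in resource_group.values() if name not in cromwell_outputs]
    if missing:
        raise KeyError(f'Cromwell outputs not found: {missing}')
    return {key: cromwell_outputs[output_name] for key, output_name in resource_group.items()}
```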

cpg_utils.cromwell.get_cromwell_oauth_token()

Get an OAuth token for Cromwell, specific to its audience.

cpg_utils.cromwell.run_cromwell_workflow(job: BashJob, dataset: str, access_level: str, workflow: str, cwd: str | None, libs: list[str], output_prefix: str, labels: dict[str, str] | None = None, input_dict: dict[str, Any] | None = None, input_paths: list[str] | None = None, project: str | None = None, copy_outputs_to_gcp: bool = True, ar_guid_override: str | None = None, backend: CromwellBackend = CromwellBackend.batch)

Run a Cromwell workflow and return a Hail Batch ResourceFile that contains the workflow ID.

cpg_utils.cromwell.run_cromwell_workflow_from_repo_and_get_outputs(b: Batch, job_prefix: str, dataset: str, workflow: str, outputs_to_collect: dict[str, CromwellOutputType], libs: list[str], output_prefix: str, labels: dict[str, str] | None = None, input_dict: dict[str, Any] | None = None, input_paths: list[str] | None = None, repo: str | None = None, commit: str | None = None, cwd: str | None = None, driver_image: str | None = None, project: str | None = None, copy_outputs_to_gcp: bool = True, min_watch_poll_interval: int = 5, max_watch_poll_interval: int = 60, time_limit_seconds: int | None = None, backend: CromwellBackend = CromwellBackend.batch) tuple[Job, dict[str, Resource | list[Resource]]]

This function needs to know the structure of the outputs you want to collect. It currently only supports:

  • a single value, or

  • a list of values

For example:

    outputs_to_collect = {
        'hello.out': CromwellOutputType.single('hello.out'),  # single output
        'hello.outs': CromwellOutputType.array('hello.outs', length=5),  # array output of length 5
    }

If an output value starts with "gs://", it is copied into the batch as a resource file; otherwise the value is written into a file, which becomes a batch resource.

If copy_outputs_to_gcp is True, the outputs are copied to GCS. Workflows may then copy these outputs to a final destination.

Returns the submit Job object and a dict of output Resource objects.

Optionally, override min_watch_poll_interval and max_watch_poll_interval to change how often the watch job polls Cromwell for status updates.

time_limit_seconds is optional; if set, the workflow is aborted when the time limit is exceeded.
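The "gs://" rule decides how each collected value becomes a Batch resource. A minimal, hypothetical sketch of that decision (classify_output and its ('copy', ...) / ('write', ...) tuples are illustrative, not part of cpg_utils; converting non-string values with str() is an assumption), applied recursively so that array outputs yield one entry per element:

```python
from typing import Any, Union

# Hypothetical plan: (action, value), where action is 'copy' or 'write'
Plan = tuple[str, str]


def classify_output(value: Any) -> Union[Plan, list]:
    """Hypothetical: decide whether a Cromwell output is copied or written."""
    if isinstance(value, list):
        # Array outputs: classify each element independently
        return [classify_output(item) for item in value]
    text = str(value)
    if text.startswith('gs://'):
        # A GCS path: copy the file itself into the batch as a resource file
        return ('copy', text)
    # Anything else (strings, numbers, ...): write the value into a file
    return ('write', text)
```

For example, classify_output(['gs://bucket/a.vcf', 3]) returns [('copy', 'gs://bucket/a.vcf'), ('write', '3')].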

cpg_utils.cromwell.watch_workflow(workflow_id_file: str, max_sequential_exception_count: int, min_poll_interval: int, max_poll_interval: int, exponential_decrease_seconds: int, output_json_path: str, time_limit_seconds: int | None = None)

Inner Python function that watches the workflow status and, on success, writes the output paths to output_json_path.

Dependencies are re-imported inside the function so that it is self-contained and can run inside a Hail Batch bash job.

Parameters:
  • workflow_id_file (str) – file containing only the Cromwell workflow ID

  • max_sequential_exception_count (int) – fail after this many consecutive errors

  • min_poll_interval (int) – minimum wait between polls

  • max_poll_interval (int) – maximum wait between polls

  • exponential_decrease_seconds (int) – controls the exponential curve used to generate polling intervals

  • output_json_path (str) – where to write the output results file

  • time_limit_seconds (int | None) – abort if the workflow has not completed within this many seconds
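The error-tolerance and time-limit parameters can be pictured as a polling loop. This is a minimal, hypothetical sketch, not the cpg_utils implementation: get_status is an assumed callable returning the Cromwell workflow status string, sleep and clock are injectable so the loop can be tested without waiting, the poll interval is fixed for simplicity, and Succeeded, Failed and Aborted are treated as the terminal Cromwell workflow statuses.

```python
import time
from typing import Callable, Optional

# Terminal Cromwell workflow statuses
TERMINAL_STATUSES = {'Succeeded', 'Failed', 'Aborted'}


def watch_status(
    get_status: Callable[[], str],
    max_sequential_exception_count: int,
    poll_interval: float,
    time_limit_seconds: Optional[int] = None,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Hypothetical sketch: poll until a terminal status, tolerating transient errors."""
    start = clock()
    sequential_exceptions = 0
    while True:
        if time_limit_seconds is not None and clock() - start > time_limit_seconds:
            # cpg_utils aborts the workflow in this case; this sketch only raises
            raise TimeoutError(f'Workflow did not finish within {time_limit_seconds}s')
        try:
            status = get_status()
        except Exception as exc:  # e.g. a transient HTTP error talking to Cromwell
            sequential_exceptions += 1
            if sequential_exceptions >= max_sequential_exception_count:
                raise RuntimeError(
                    f'Giving up after {sequential_exceptions} consecutive errors'
                ) from exc
            sleep(poll_interval)
            continue
        sequential_exceptions = 0  # a successful poll resets the error streak
        if status in TERMINAL_STATUSES:
            return status
        sleep(poll_interval)
```

Only consecutive errors count toward the limit, so an occasional failed poll between successful ones never stops the watcher.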

cpg_utils.cromwell.watch_workflow_and_get_output(b: Batch, job_prefix: str, workflow_id_file: Resource, outputs_to_collect: dict[str, CromwellOutputType], driver_image: str | None = None, min_poll_interval: int = 5, max_poll_interval: int = 60, exponential_decrease_seconds: int = 1200, max_sequential_exception_count: int = 25, time_limit_seconds: int | None = None)

This is a little tricky, but the process is:

  • Wait for a Cromwell workflow to finish,

  • If it succeeds, get the outputs (as JSON),

  • (Hard) Get the value of each output back into Hail Batch as a resource file.

Getting the output values back into Hail Batch is hard because both the outputs to collect and the number of outputs to collect must be known up front, when the batch is defined and before the workflow has run.

So unfortunately, this function needs to know the structure of the outputs you want to collect. It currently only supports:

  • a single value, or

  • a list of values

If an output value starts with "gs://", it is copied into the batch as a resource file; otherwise the value is written into a file, which becomes a batch resource.

Parameters:
  • b – Batch object

  • job_prefix – prefix for the job name

  • workflow_id_file – file containing the workflow ID

  • outputs_to_collect – dict of output name -> CromwellOutputType

  • driver_image – if specified, the image must contain python3 (with requests), gcloud and jq

  • min_poll_interval – minimum time to wait between polls

  • max_poll_interval – maximum time to wait between polls

  • exponential_decrease_seconds – controls the exponential curve used to generate polling intervals

  • max_sequential_exception_count – maximum number of consecutive exceptions before giving up

  • time_limit_seconds – maximum runtime before an abort is triggered
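The exact curve behind exponential_decrease_seconds is not spelled out here. As a hypothetical sketch, assuming the interval grows geometrically from min_poll_interval to max_poll_interval over the first exponential_decrease_seconds of watching (frequent polls early, backing off later), and treating all values as seconds, the interval could be computed like this. poll_interval is an illustrative name; only the defaults are taken from the signature above.

```python
def poll_interval(
    elapsed_seconds: float,
    min_poll_interval: int = 5,
    max_poll_interval: int = 60,
    exponential_decrease_seconds: int = 1200,
) -> int:
    """Hypothetical: interpolate geometrically from the min to the max poll interval."""
    if elapsed_seconds >= exponential_decrease_seconds:
        return max_poll_interval
    fraction = max(elapsed_seconds, 0.0) / exponential_decrease_seconds
    # min * (max / min) ** fraction equals min at fraction=0 and max at fraction=1
    interval = min_poll_interval * (max_poll_interval / min_poll_interval) ** fraction
    # Clamp to the configured bounds after rounding
    return max(min_poll_interval, min(max_poll_interval, round(interval)))
```

Under these assumptions and the default values, the interval starts at 5 seconds, is about 17 seconds after 10 minutes, and stays at 60 seconds from 20 minutes onwards.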

class cpg_utils.cromwell_model.CallMetadata(name, executionStatus, stdout=None, backendStatus=None, compressedDockerSize=None, commandLine=None, shardIndex=None, outputs=None, runtimeAttributes=None, callCaching=None, inputs=None, returnCode=None, jobId=None, backend=None, end=None, dockerImageUsed=None, stderr=None, callRoot=None, attempt=None, executionEvents=None, start=None, preemptible=None, jes=None, failures=None, calls: dict[str, list[CallMetadata]] | None = None, **kwargs)

Python model for Cromwell CallMetadata.

display(expand_completed=False, monochrome=False)
static parse(d)
class cpg_utils.cromwell_model.ExecutionStatus(value)
color()
done = 'done'
failed = 'failed'
in_progress = 'inprogress'
is_finished()
preparing = 'preparing'
queuedincromwell = 'queuedincromwell'
retryablefailure = 'retryablefailure'
running = 'running'
starting = 'starting'
succeeded = 'succeeded'
symbol()
class cpg_utils.cromwell_model.WorkflowMetadataModel(workflowName=None, workflowProcessingEvents=None, metadataSource=None, actualWorkflowLanguageVersion=None, submittedFiles=None, calls: dict[str, list[CallMetadata]] | None = None, outputs=None, workflowRoot=None, actualWorkflowLanguage=None, id=None, inputs=None, labels=None, submission=None, status=None, end=None, start=None, **kwargs)
display(expand_completed: bool = False, monochrome: bool = False)
static parse(d)
cpg_utils.cromwell_model.collapse_status_of_calls(calls: list[CallMetadata])
cpg_utils.cromwell_model.get_readable_duration(seconds: int | None)
>>> get_readable_duration(86401)
'1d:0h:0m:1s'
>>> get_readable_duration(100)
'1m:40s'
>>> get_readable_duration(3)
'3s'
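The doctests pin down the format: units from days down to seconds, leading zero units dropped, inner zero units kept. A minimal re-implementation sketch that reproduces them; readable_duration is a hypothetical name, and returning None for a None input is an assumption based on the int | None signature.

```python
from typing import Optional


def readable_duration(seconds: Optional[int]) -> Optional[str]:
    """Hypothetical sketch reproducing the get_readable_duration doctests."""
    if seconds is None:
        return None
    days, remainder = divmod(int(seconds), 86400)
    hours, remainder = divmod(remainder, 3600)
    minutes, secs = divmod(remainder, 60)
    parts = [(days, 'd'), (hours, 'h'), (minutes, 'm'), (secs, 's')]
    # Drop leading zero units, but always keep at least the seconds
    while len(parts) > 1 and parts[0][0] == 0:
        parts.pop(0)
    return ':'.join(f'{value}{unit}' for value, unit in parts)
```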
cpg_utils.cromwell_model.get_seconds_duration_between_cromwell_dates(start, end)
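Cromwell typically reports timestamps as ISO 8601 strings with either a UTC offset or a trailing Z. A minimal sketch of computing the whole number of seconds between two of them; seconds_between_cromwell_dates is a hypothetical name, and measuring a missing end time up to "now" is an assumption for calls that are still running.

```python
from datetime import datetime, timezone
from typing import Optional


def _parse_cromwell_date(value: str) -> datetime:
    # datetime.fromisoformat only accepts a trailing 'Z' from Python 3.11 onwards
    return datetime.fromisoformat(value.replace('Z', '+00:00'))


def seconds_between_cromwell_dates(start: str, end: Optional[str]) -> int:
    """Hypothetical sketch: whole seconds between two Cromwell timestamps."""
    start_dt = _parse_cromwell_date(start)
    # Assumption: an unfinished call (no end time) is measured up to now
    end_dt = _parse_cromwell_date(end) if end else datetime.now(timezone.utc)
    return int((end_dt - start_dt).total_seconds())
```

The result can be fed straight into get_readable_duration to display how long a call or workflow has taken.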
cpg_utils.cromwell_model.prepare_inner_calls_string(name, calls: list[CallMetadata], expand_completed=False, monochrome=False)
cpg_utils.cromwell_model.unwrap_caused_by(failures: list)
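Cromwell reports failures as nested objects, each with a message and a causedBy list of underlying failures. A hypothetical sketch of flattening that tree into a list of messages, depth first; unwrap_failure_messages is an illustrative name, and the real unwrap_caused_by may shape or format its result differently.

```python
from typing import Any


def unwrap_failure_messages(failures: list[dict[str, Any]]) -> list[str]:
    """Hypothetical: collect every message in a Cromwell failures tree, depth first."""
    messages: list[str] = []
    for failure in failures:
        message = failure.get('message')
        if message:
            messages.append(message)
        # Recurse into the underlying causes, if any
        messages.extend(unwrap_failure_messages(failure.get('causedBy') or []))
    return messages
```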