Production Pipelines
Production Pipelines is a CPG-specific module that improves the interaction between Hail Batch and metamist when processing genomics data.
It provides a Workflow class and a @stage decorator that allow workflows to be defined declaratively.
A Stage object is responsible for creating Hail Batch jobs and declaring the outputs (files or metamist analysis objects) that it is expected to produce. Each stage acts on a Target, which can be one of the following: a SequencingGroup, a Dataset (a container for sequencing groups), a Cohort (all sequencing groups from a single custom cohort, potentially spanning multiple datasets), or a MultiCohort (multiple cohorts). A Workflow object plugs stages together, resolving the dependencies between stages that act on different target levels.
Examples of workflows can be found in the production-workflows repository.
- class cpg_workflows.workflow.Action(value)
Indicates what a stage should do with a specific target.
- QUEUE = 1
- REUSE = 3
- SKIP = 2
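The Action enum only names the three possible outcomes. As a hedged illustration, a function like the hypothetical choose_action below could map a stage's flags (the skipped and forced options of @stage) and whether its expected outputs already exist onto an Action. The precedence (forced before skipped) is an assumption, and the real workflow additionally checks the outputs of skipped stages unless assume_outputs_exist is set, which this sketch leaves out.

```python
from enum import Enum


class Action(Enum):
    # Values mirror the Action enum documented in this module.
    QUEUE = 1
    SKIP = 2
    REUSE = 3


def choose_action(*, skipped: bool, forced: bool, outputs_exist: bool) -> Action:
    """Hypothetical decision rule; the real workflow logic may differ."""
    if forced:
        return Action.QUEUE  # always rerun, regardless of existing outputs
    if skipped:
        return Action.SKIP  # leave the stage out of this run
    if outputs_exist:
        return Action.REUSE  # expected outputs are already present
    return Action.QUEUE  # nothing to reuse, so queue new jobs
```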
- class cpg_workflows.workflow.CohortStage(name: str, required_stages: list[Callable[[...], Stage]] | Callable[[...], Stage] | None = None, analysis_type: str | None = None, analysis_keys: list[str] | None = None, update_analysis_meta: Callable[[str], dict] | None = None, tolerate_missing_output: bool = False, skipped: bool = False, assume_outputs_exist: bool = False, forced: bool = False)
Cohort-level stage (all sequencing groups of a single cohort).
- deprecated_queue_for_cohort(cohort: Cohort) -> dict[str, StageOutput | None]
Plugs the stage into the workflow. Unused; scheduled for deletion.
- abstractmethod expected_outputs(cohort: Cohort) -> CloudPath | Path | dict[str, CloudPath | Path] | dict[str, str] | dict[str, CloudPath | Path | str] | str | None
Override to declare expected output paths.
- queue_for_multicohort(multicohort: MultiCohort) -> dict[str, StageOutput | None]
Plugs the stage into the workflow.
- abstractmethod queue_jobs(cohort: Cohort, inputs: StageInput) -> StageOutput | None
Override to add Hail Batch jobs.
- class cpg_workflows.workflow.DatasetStage(name: str, required_stages: list[Callable[[...], Stage]] | Callable[[...], Stage] | None = None, analysis_type: str | None = None, analysis_keys: list[str] | None = None, update_analysis_meta: Callable[[str], dict] | None = None, tolerate_missing_output: bool = False, skipped: bool = False, assume_outputs_exist: bool = False, forced: bool = False)
Dataset-level stage.
- deprecated_queue_for_cohort(cohort: Cohort) -> dict[str, StageOutput | None]
Plugs the stage into the workflow. Unused; scheduled for deletion.
- abstractmethod expected_outputs(dataset: Dataset) -> CloudPath | Path | dict[str, CloudPath | Path] | dict[str, str] | dict[str, CloudPath | Path | str] | str | None
Override to declare expected output paths.
- queue_for_multicohort(multicohort: MultiCohort) -> dict[str, StageOutput | None]
Plugs the stage into the workflow.
- abstractmethod queue_jobs(dataset: Dataset, inputs: StageInput) -> StageOutput | None
Override to add Hail Batch jobs.
- class cpg_workflows.workflow.MultiCohortStage(name: str, required_stages: list[Callable[[...], Stage]] | Callable[[...], Stage] | None = None, analysis_type: str | None = None, analysis_keys: list[str] | None = None, update_analysis_meta: Callable[[str], dict] | None = None, tolerate_missing_output: bool = False, skipped: bool = False, assume_outputs_exist: bool = False, forced: bool = False)
MultiCohort-level stage (all datasets of a workflow run).
- abstractmethod expected_outputs(multicohort: MultiCohort) -> CloudPath | Path | dict[str, CloudPath | Path] | dict[str, str] | dict[str, CloudPath | Path | str] | str | None
Override to declare expected output paths.
- queue_for_multicohort(multicohort: MultiCohort) -> dict[str, StageOutput | None]
Plug the stage into the workflow.
- abstractmethod queue_jobs(multicohort: MultiCohort, inputs: StageInput) -> StageOutput | None
Override to add Hail Batch jobs.
- class cpg_workflows.workflow.SequencingGroupStage(name: str, required_stages: list[Callable[[...], Stage]] | Callable[[...], Stage] | None = None, analysis_type: str | None = None, analysis_keys: list[str] | None = None, update_analysis_meta: Callable[[str], dict] | None = None, tolerate_missing_output: bool = False, skipped: bool = False, assume_outputs_exist: bool = False, forced: bool = False)
Sequencing-group-level stage.
- deprecated_queue_for_cohort(cohort: Cohort) -> dict[str, StageOutput | None]
Plugs the stage into the workflow. Unused; scheduled for deletion.
- abstractmethod expected_outputs(sequencing_group: SequencingGroup) -> CloudPath | Path | dict[str, CloudPath | Path] | dict[str, str] | dict[str, CloudPath | Path | str] | str | None
Override to declare expected output paths.
- queue_for_multicohort(multicohort: MultiCohort) -> dict[str, StageOutput | None]
Plugs the stage into the workflow.
- abstractmethod queue_jobs(sequencing_group: SequencingGroup, inputs: StageInput) -> StageOutput | None
Override to add Hail Batch jobs.
- class cpg_workflows.workflow.Stage(name: str, required_stages: list[Callable[[...], Stage]] | Callable[[...], Stage] | None = None, analysis_type: str | None = None, analysis_keys: list[str] | None = None, update_analysis_meta: Callable[[str], dict] | None = None, tolerate_missing_output: bool = False, skipped: bool = False, assume_outputs_exist: bool = False, forced: bool = False)
Abstract class for a workflow stage. Parametrised by a specific Target subclass, e.g. SequencingGroupStage(Stage[SequencingGroup]) should only be able to work on SequencingGroup(Target).
- property analysis_prefix: CloudPath | Path
- deprecated_queue_for_cohort(cohort: Cohort) -> dict[str, StageOutput | None]
Queues jobs for each corresponding target, as defined by the Stage subclass.
Returns a dictionary of StageOutput objects indexed by target unique_id. Unused; scheduled for deletion.
- abstractmethod expected_outputs(target: TargetT) -> CloudPath | Path | dict[str, CloudPath | Path] | dict[str, str] | dict[str, CloudPath | Path | str] | str | None
Gets the path(s) to files that the stage is expected to generate for a target. Used within queue_jobs() to pass output paths to job commands, and by the workflow to check whether the stage’s expected outputs already exist and can be reused.
Can be a str, a Path object, or a dictionary of str/Path objects.
- get_job_attrs(target: TargetT | None = None) -> dict[str, str]
Creates a Hail Batch job attributes dictionary.
- get_stage_cohort_prefix(cohort: Cohort, category: str | None = None) -> CloudPath | Path
Takes a cohort as an argument and calls through to the Workflow.cohort_prefix method. The result is in the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID / STAGE_NAME, e.g. “gs://cpg-project-main/seqr_loader/COH123/MyStage”.
- Parameters:
cohort (Cohort) – the Cohort from which the analysis dataset and name are taken
category (str | None) – main, tmp, test, analysis, web
- Returns:
Path
- make_outputs(target: Target, data: CloudPath | Path | dict[str, CloudPath | Path] | dict[str, str] | dict[str, CloudPath | Path | str] | str | None = None, jobs: Sequence[Job | None] | Job | None = None, meta: dict | None = None, reusable: bool = False, skipped: bool = False, error_msg: str | None = None) -> StageOutput
Creates a StageOutput for this stage.
- property name: str
Stage name (unique and descriptive).
- output_by_target: dict[str, StageOutput | None]
- property prefix: CloudPath | Path
- abstractmethod queue_for_multicohort(multicohort: MultiCohort) -> dict[str, StageOutput | None]
Queues jobs for each corresponding target, as defined by the Stage subclass.
Returns a dictionary of StageOutput objects indexed by target unique_id.
- abstractmethod queue_jobs(target: TargetT, inputs: StageInput) -> StageOutput | None
Adds Hail Batch jobs that process the target. Assumes that all the housekeeping work is already done: checking for missing inputs from required stages, and checking for possible reuse of existing outputs.
- property tmp_prefix
- property web_prefix: CloudPath | Path
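The Stage docstring says a stage is parametrised by a Target subclass. The sketch below shows that pattern with typing.Generic and a TypeVar bound to Target, using simplified stand-in classes rather than the real ones; the real queue_jobs also takes a StageInput and returns a StageOutput, which this sketch omits. A type checker would reject passing a non-SequencingGroup target to SequencingGroupStage.queue_jobs, although nothing is enforced at runtime.

```python
from typing import Generic, TypeVar


class Target:
    """Simplified stand-in for cpg_workflows.targets.Target."""

    @property
    def target_id(self) -> str:
        # Concrete class that raises, rather than an abstract base class.
        raise NotImplementedError


class SequencingGroup(Target):
    """Simplified stand-in: only carries an ID."""

    def __init__(self, sg_id: str) -> None:
        self.sg_id = sg_id

    @property
    def target_id(self) -> str:
        return self.sg_id


TargetT = TypeVar("TargetT", bound=Target)


class Stage(Generic[TargetT]):
    """Simplified stand-in: the type parameter fixes which Target a stage accepts."""

    def queue_jobs(self, target: TargetT) -> str:
        return f"{type(self).__name__} queued for {target.target_id}"


class SequencingGroupStage(Stage[SequencingGroup]):
    pass


print(SequencingGroupStage().queue_jobs(SequencingGroup("CPGAAA")))
# SequencingGroupStage queued for CPGAAA
```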
- class cpg_workflows.workflow.StageInput(stage: Stage)
Represents an input for a stage run. It wraps the outputs of all required upstream stages for the corresponding targets (e.g. all GVCFs from a GenotypeSample stage for a JointCalling stage), along with their Hail Batch jobs.
An object of this class is passed to the public queue_jobs method of a Stage, and can be used to query dependency files and jobs.
- add_other_stage_output(output: StageOutput)
Adds output from another stage run.
- as_dict(target: Target, stage: Callable[[...], Stage]) -> dict[str, CloudPath | Path]
Gets a dict of paths for a specific target and stage.
- as_dict_by_target(stage: Callable[[...], Stage]) -> dict[str, dict[str, CloudPath | Path]]
Gets a dict of files/resources for a specific stage, indexed by target.
- as_path(target: Target, stage: Callable[[...], Stage], key: str | None = None) -> CloudPath | Path
Represents the result as a path to a file, otherwise fails. stage can be a callable or a subclass of Stage.
- as_path_by_target(stage: Callable[[...], Stage], key: str | None = None) -> dict[str, CloudPath | Path]
Gets a single file path result for a specific stage, indexed by target.
- as_path_dict_by_target(stage: Callable[[...], Stage]) -> dict[str, dict[str, CloudPath | Path]]
Gets a dict of paths for a specific stage, indexed by target.
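To make the lookup pattern of StageInput concrete, here is a hypothetical, simplified model: outputs are stored by stage name, then by target ID, then by output key. The real methods take Target objects and stage classes or callables rather than strings, and ToyStageInput and add are invented names; the exception is a local stand-in for StageInputNotFoundError.

```python
from pathlib import Path


class StageInputNotFoundError(Exception):
    """Local stand-in for the exception of the same name in cpg_workflows.workflow."""


class ToyStageInput:
    """Hypothetical model: outputs keyed by stage name, then target ID, then key."""

    def __init__(self) -> None:
        self._outputs: dict[str, dict[str, dict[str, Path]]] = {}

    def add(self, stage: str, target_id: str, data: dict[str, Path]) -> None:
        self._outputs.setdefault(stage, {})[target_id] = data

    def as_path(self, target_id: str, stage: str, key: str) -> Path:
        try:
            return self._outputs[stage][target_id][key]
        except KeyError as exc:
            raise StageInputNotFoundError(f"{stage}/{target_id}/{key}") from exc

    def as_path_by_target(self, stage: str, key: str) -> dict[str, Path]:
        if stage not in self._outputs:
            raise StageInputNotFoundError(stage)
        return {tid: data[key] for tid, data in self._outputs[stage].items()}


# A JointCalling-like stage gathering one GVCF per sequencing group (IDs are made up).
inputs = ToyStageInput()
inputs.add("GenotypeSample", "CPGAAA", {"gvcf": Path("CPGAAA.g.vcf.gz")})
inputs.add("GenotypeSample", "CPGBBB", {"gvcf": Path("CPGBBB.g.vcf.gz")})
gvcf_by_sg = inputs.as_path_by_target("GenotypeSample", "gvcf")
```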
- exception cpg_workflows.workflow.StageInputNotFoundError
Thrown when a stage requests input from another stage that doesn’t exist.
- class cpg_workflows.workflow.StageOutput(target: Target, data: CloudPath | Path | dict[str, CloudPath | Path] | dict[str, str] | dict[str, CloudPath | Path | str] | str | None = None, jobs: Sequence[Job | None] | Job | None = None, meta: dict | None = None, reusable: bool = False, skipped: bool = False, error_msg: str | None = None, stage: Stage | None = None)
Represents the result of a specific stage run on a specific target. Can be a file path or a Hail Batch Resource, optionally wrapped in a dict.
- as_dict() -> dict[str, CloudPath | Path]
Casts the result to a dictionary, or throws an error if the cast fails.
- as_path(key=None) -> CloudPath | Path
Casts the result to a path object, throwing an exception if the cast fails. key is used to extract the value when the result is a dictionary.
- as_str(key=None) -> str
Casts the result to a simple string, throwing an exception if the cast fails. key is used to extract the value when the result is a dictionary.
- jobs: list[Job]
- meta: dict
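The casting helpers of StageOutput can be pictured with the hypothetical ToyStageOutput below. It uses pathlib.Path in place of the CloudPath type in the signatures, and the exact error types are assumptions. Note that as_str returns the raw string rather than str(Path(...)), because pathlib would normalise "gs://" to "gs:/".

```python
from pathlib import Path
from typing import Optional, Union

Data = Union[str, Path, dict[str, Union[str, Path]]]


class ToyStageOutput:
    """Hypothetical, simplified model of StageOutput's casting helpers."""

    def __init__(self, data: Data) -> None:
        self.data = data

    def _select(self, key: Optional[str]) -> Union[str, Path]:
        # For dict results, key picks the entry; single results are returned as-is.
        if isinstance(self.data, dict):
            if key is None:
                raise ValueError("result is a dict: pass key=...")
            return self.data[key]
        return self.data

    def as_path(self, key: Optional[str] = None) -> Path:
        return Path(self._select(key))

    def as_str(self, key: Optional[str] = None) -> str:
        # str() of the raw value, not of Path(...), so "gs://" URLs stay intact.
        return str(self._select(key))

    def as_dict(self) -> dict[str, Path]:
        if not isinstance(self.data, dict):
            raise ValueError(f"cannot cast {self.data!r} to a dict")
        return {k: Path(v) for k, v in self.data.items()}
```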
- class cpg_workflows.workflow.Workflow(stages: list[Callable[[...], Stage]] | None = None, dry_run: bool | None = None)
Encapsulates a Hail Batch object, stages, and a cohort of datasets of sequencing groups. Responsible for orchestrating stages.
- property analysis_prefix: CloudPath | Path
- cohort_prefix(cohort: Cohort, category: str | None = None) -> CloudPath | Path
Takes a cohort and a category as arguments and returns a path in the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID, e.g. “gs://cpg-project-main/seqr_loader/COH123” or “gs://cpg-project-main-analysis/seqr_loader/COH123”.
- Parameters:
cohort (Cohort) – we pull the analysis dataset and id from this Cohort
category (str | None) – sub-bucket for this project
- Returns:
Path
- property output_version: str
- property prefix: CloudPath | Path
- run(stages: list[Callable[[...], Stage]] | None = None, wait: bool | None = False)
Resolves stages, then adds and submits Hail Batch jobs. When run_all_implicit_stages is set, required stages that were not requested explicitly are still executed.
- run_timestamp: str
- set_stages(requested_stages: list[Callable[[...], Stage]])
Iterates over stages and calls their queue_for_cohort(cohort) methods, which in turn create all Hail Batch jobs through Stage.queue_jobs().
- property tmp_prefix: CloudPath | Path
- property web_prefix: CloudPath | Path
- exception cpg_workflows.workflow.WorkflowError
Error raised by the workflow and stage implementations.
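The path layouts described for Workflow.cohort_prefix and Stage.get_stage_cohort_prefix can be sketched as plain string joins. The helper names below are hypothetical, and choosing the bucket (including the category handling) is assumed to have happened already. Strings are joined directly because pathlib.PurePosixPath would collapse "gs://" into "gs:/".

```python
def join_gs(*parts: str) -> str:
    """Join bucket URL and path segments without collapsing the '//' in 'gs://'."""
    return "/".join(p.strip("/") for p in parts)


def cohort_prefix_sketch(bucket: str, workflow_name: str, cohort_id: str) -> str:
    # PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID
    return join_gs(bucket, workflow_name, cohort_id)


def stage_cohort_prefix_sketch(bucket: str, workflow_name: str, cohort_id: str, stage_name: str) -> str:
    # PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID / STAGE_NAME
    return join_gs(cohort_prefix_sketch(bucket, workflow_name, cohort_id), stage_name)


print(stage_cohort_prefix_sketch("gs://cpg-project-main", "seqr_loader", "COH123", "MyStage"))
# gs://cpg-project-main/seqr_loader/COH123/MyStage
```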
- cpg_workflows.workflow.path_walk(expected, collected: set | None = None) -> set[CloudPath | Path]
Recursively walks expected_out: if the object is iterable, it is walked as well. This gets around the issue of nested lists and dicts, which arises mainly from the use of Array outputs from Cromwell.
- Parameters:
expected (Any) – any type of object containing Paths
collected (set) – all collected paths so far
- Returns:
a set of all collected Path nodes
Examples:
>>> path_walk({'a': {'b': {'c': Path('d')}}})
{Path('d')}
>>> path_walk({'a': {'b': {'c': [Path('d'), Path('e')]}}})
{Path('d'), Path('e')}
>>> path_walk({'a': Path('b'), 'c': [{'d': 'e'}, {'f': Path('g')}]})
{Path('b'), Path('g')}
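A minimal re-implementation of the recursive walk is sketched below to show the technique. It is a hypothetical path_walk_sketch, not the library's code: it descends into dicts, lists, tuples and sets, collects pathlib.Path leaves, and ignores everything else, including plain strings.

```python
from pathlib import Path
from typing import Any, Optional


def path_walk_sketch(expected: Any, collected: Optional[set[Path]] = None) -> set[Path]:
    """Collect every Path nested anywhere inside dicts, lists, tuples or sets."""
    if collected is None:
        collected = set()
    if isinstance(expected, Path):
        collected.add(expected)
    elif isinstance(expected, dict):
        for value in expected.values():
            path_walk_sketch(value, collected)
    elif isinstance(expected, (list, tuple, set)):
        for value in expected:
            path_walk_sketch(value, collected)
    # Anything else (str, None, numbers) is ignored: strings are iterable,
    # but walking them character by character would never yield a Path.
    return collected
```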
- cpg_workflows.workflow.run_workflow(stages: list[Callable[[...], Stage]] | None = None, wait: bool | None = False, dry_run: bool = False) -> Workflow
- cpg_workflows.workflow.skip(_fun: Callable[[...], Stage] | None = None, *, reason: str | None = None, assume_outputs_exist: bool = False) -> Callable[[...], Stage] | Callable[[...], Callable[[...], Stage]]
Decorator on top of @stage that sets the self.skipped field to True. By default, the expected outputs of a skipped stage will be checked, unless assume_outputs_exist is True.
@skip
@stage
class MyStage1(SequencingGroupStage):
    ...

@skip
@stage(assume_outputs_exist=True)
class MyStage2(SequencingGroupStage):
    ...
- cpg_workflows.workflow.stage(cls: Type[Stage] | None = None, *, analysis_type: str | None = None, analysis_keys: list[str | CloudPath | Path] | None = None, update_analysis_meta: Callable[[str], dict] | None = None, tolerate_missing_output: bool = False, required_stages: list[Callable[[...], Stage]] | Callable[[...], Stage] | None = None, skipped: bool = False, assume_outputs_exist: bool = False, forced: bool = False) -> Callable[[...], Stage] | Callable[[...], Callable[[...], Stage]]
Implements a standard class decorator pattern with optional arguments. The goal is to allow declaring workflow stages without having to implement a constructor method, e.g.:
@stage(required_stages=[Align])
class GenotypeSample(SequencingGroupStage):
    def expected_outputs(self, sequencing_group: SequencingGroup):
        ...

    def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> StageOutput:
        ...
- @analysis_type: if defined, will be used to create/update Analysis entries using the status reporter.
- @analysis_keys: if defined, will be used to extract the value for Analysis.output if Stage.expected_outputs() returns a dict.
- @update_analysis_meta: if defined, this function is called on the Analysis.output field, and returns a dictionary to be merged into Analysis.meta.
- @tolerate_missing_output: if True, allows the output file to be missing when registering the output of this stage (only relevant for the metamist entry).
- @required_stages: list of other stage classes that are required prerequisites for this stage. Outputs of those stages will be passed to Stage.queue_jobs(… , inputs) as inputs, and all required dependencies between Hail Batch jobs will be set automatically as well.
- @skipped: always skip this stage.
- @assume_outputs_exist: assume the expected outputs of this stage always exist.
- @forced: always force-run this stage, regardless of whether its outputs exist.
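The class decorator with optional arguments that @stage implements can be sketched in plain Python: the same function works bare (it receives the class directly) and with keyword arguments (it receives None and returns the real decorator). This hypothetical stage_sketch only attaches attributes to the class, whereas the real stage decorator returns a callable that builds a Stage, per its return annotation.

```python
from typing import Optional


def stage_sketch(
    cls: Optional[type] = None,
    *,
    required_stages: Optional[list[type]] = None,
    forced: bool = False,
):
    """Hypothetical decorator usable both as @stage_sketch and @stage_sketch(...)."""

    def decorate(c: type) -> type:
        setattr(c, "required_stages", list(required_stages or []))
        setattr(c, "forced", forced)
        return c

    if cls is None:
        # Called with arguments, e.g. @stage_sketch(required_stages=[...]).
        return decorate
    # Called bare, e.g. @stage_sketch: the class is passed in directly.
    return decorate(cls)


@stage_sketch
class Align: ...


@stage_sketch(required_stages=[Align], forced=True)
class GenotypeSample: ...
```

The keyword-only marker (*) is what makes the bare form safe: options can never be mistaken for the class positional argument.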
Targets for workflow stages: SequencingGroup, Dataset, Cohort, MultiCohort.
- class cpg_workflows.targets.Cohort(id: str | None = None, name: str | None = None, multicohort: MultiCohort | None = None)
Represents a “cohort” target: all sequencing groups from a single CustomCohort (potentially spanning multiple datasets) in the workflow. The analysis dataset name is required and will be used as the default name for the cohort.
- get_dataset_by_name(name: str, only_active: bool = True) -> Dataset | None
Gets a dataset by name. Includes only “active” datasets (unless only_active is False).
- get_datasets(only_active: bool = True) -> list[Dataset]
Gets a list of all datasets. Includes only “active” datasets (unless only_active is False).
- get_sequencing_groups(only_active: bool = True) -> list[SequencingGroup]
Gets a flat list of all sequencing groups from all datasets. Includes only “active” sequencing groups (unless only_active is False).
- property target_id: str
Unique target ID
- class cpg_workflows.targets.Dataset(name: str, cohort: Cohort | None = None)
Represents a CPG dataset.
Each dataset at the CPG corresponds to:
* a GCP project: populationgenomics/team-docs
* a Pulumi stack: populationgenomics/analysis-runner
* a metamist project
- add_sequencing_group(id: str, *, sequencing_type: str, sequencing_technology: str, sequencing_platform: str, external_id: str | None = None, participant_id: str | None = None, meta: dict | None = None, sex: Sex | None = None, pedigree: PedigreeInfo | None = None, alignment_input: AlignmentInput | None = None) -> SequencingGroup
Creates a new sequencing group and adds it to the dataset.
- add_sequencing_group_object(s: SequencingGroup)
Adds a sequencing group object to the dataset.
- Parameters:
s (SequencingGroup) – the sequencing group object to add
- get_sequencing_groups(only_active: bool = True) -> list[SequencingGroup]
Gets the dataset’s sequencing groups. Includes only “active” sequencing groups, unless only_active=False.
- property target_id: str
Unique target ID
- web_prefix(**kwargs) -> CloudPath | Path
Path for files served by an HTTP server. Matches the corresponding URLs returned by self.web_url().
- class cpg_workflows.targets.MultiCohort
Represents a “multi-cohort” target: multiple cohorts in the workflow.
- get_cohort_by_id(id: str, only_active: bool = True) -> Cohort | None
Gets a cohort by ID. Includes only “active” cohorts (unless only_active is False).
- get_cohorts(only_active: bool = True) -> list[Cohort]
Gets a list of all cohorts. Includes only “active” cohorts (unless only_active is False).
- get_dataset_by_name(name: str, only_active: bool = True) -> Dataset | None
Gets a dataset by name. Includes only “active” datasets (unless only_active is False).
- get_datasets(only_active: bool = True) -> list[Dataset]
Gets a list of all datasets. Includes only “active” datasets (unless only_active is False).
- get_sequencing_groups(only_active: bool = True) -> list[SequencingGroup]
Gets a flat list of all sequencing groups from all datasets. Uses a dictionary to avoid duplicates, since the same sequencing group can appear in multiple cohorts. Includes only “active” sequencing groups (unless only_active is False).
- property target_id: str
Unique target ID
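The dictionary-based de-duplication described for MultiCohort.get_sequencing_groups can be sketched as follows. ToySG and flatten_unique are hypothetical stand-ins: sequencing groups are keyed by ID, the first occurrence wins, and the result keeps insertion order because Python dicts preserve it.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToySG:
    """Hypothetical stand-in for a SequencingGroup."""
    id: str
    active: bool = True


def flatten_unique(cohorts: list[list[ToySG]], only_active: bool = True) -> list[ToySG]:
    """Flatten SGs from several cohorts, keeping the first occurrence of each ID."""
    by_id: dict[str, ToySG] = {}
    for cohort in cohorts:
        for sg in cohort:
            if only_active and not sg.active:
                continue
            by_id.setdefault(sg.id, sg)  # later duplicates are ignored
    return list(by_id.values())
```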
- class cpg_workflows.targets.PedigreeInfo(sequencing_group: SequencingGroup, sex: Sex = Sex.UNKNOWN, fam_id: str | None = None, phenotype: str | int = 0, dad: SequencingGroup | None = None, mom: SequencingGroup | None = None)
Pedigree relationships with other sequencing groups in the cohort, and other PED data.
- dad: SequencingGroup | None = None
- fam_id: str | None = None
- get_ped_dict(use_participant_id: bool = False) -> dict[str, str]
Returns a dictionary of pedigree fields for this sequencing group, corresponding to a PED file entry.
- mom: SequencingGroup | None = None
- phenotype: str | int = 0
- sequencing_group: SequencingGroup
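A PED file entry has six columns: family ID, individual ID, paternal ID, maternal ID, sex, and phenotype, with "0" marking a missing parent. The sketch below builds such a row from a hypothetical ToyPedigree that uses plain string IDs instead of SequencingGroup objects; the keys returned by the real get_ped_dict are not reproduced here, and falling back to the individual ID when fam_id is unset is an assumption.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Union


class Sex(Enum):
    # Same codes as the Sex enum documented in this module (PED convention).
    UNKNOWN = 0
    MALE = 1
    FEMALE = 2


@dataclass
class ToyPedigree:
    """Hypothetical stand-in for PedigreeInfo using plain string IDs."""
    individual_id: str
    fam_id: Optional[str] = None
    dad_id: Optional[str] = None
    mom_id: Optional[str] = None
    sex: Sex = Sex.UNKNOWN
    phenotype: Union[str, int] = 0

    def ped_row(self) -> list[str]:
        # PED columns: family, individual, father, mother, sex, phenotype.
        return [
            self.fam_id or self.individual_id,  # assumed fallback when fam_id is unset
            self.individual_id,
            self.dad_id or "0",  # "0" marks a missing parent
            self.mom_id or "0",
            str(self.sex.value),
            str(self.phenotype),
        ]
```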
- class cpg_workflows.targets.SequencingGroup(id: str, dataset: Dataset, *, sequencing_type: str, sequencing_technology: str, sequencing_platform: str, external_id: str | None = None, participant_id: str | None = None, meta: dict | None = None, sex: Sex | None = None, pedigree: PedigreeInfo | None = None, alignment_input: AlignmentInput | None = None, assays: tuple[Assay, ...] | None = None, forced: bool = False)
Represents a sequencing group.
- alignment_input: AlignmentInput | None
- assays: tuple[Assay, ...] | None
- cram: CramPath | None
- cram_stage_name: str | None
- property external_id: str
Gets the external sample ID, or substitutes the internal ID if there is none.
- get_ped_dict(use_participant_id: bool = False) -> dict[str, str]
Returns a dictionary of pedigree fields for this sequencing group, corresponding to a PED file entry.
- get_sequencing_groups(only_active: bool = True) -> list[SequencingGroup]
Implements the abstract Target.get_sequencing_groups method.
- gvcf: GvcfPath | None
- gvcf_stage_name: str | None
- property make_sv_evidence_path: CloudPath | Path
Path to the evidence root for GATK-SV evidence files.
- meta: dict
- property participant_id: str
Gets the ID of the participant corresponding to this sequencing group, or substitutes the external ID if there is none.
- pedigree: PedigreeInfo
- property rich_id: str
ID for reporting purposes, composed of the internal ID as well as the external or participant ID.
- property target_id: str
Unique target ID
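The ID fallbacks described for external_id and participant_id chain together: a missing external ID falls back to the internal ID, and a missing participant ID falls back to the external ID. The hypothetical ToySequencingGroup below shows that chain; its rich_id format ("internal|participant") is invented for illustration and is not the real layout.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToySequencingGroup:
    """Hypothetical stand-in showing the documented ID fallbacks."""
    id: str
    _external_id: Optional[str] = None
    _participant_id: Optional[str] = None

    @property
    def external_id(self) -> str:
        # Falls back to the internal ID when no external ID is known.
        return self._external_id or self.id

    @property
    def participant_id(self) -> str:
        # Falls back to the external ID, which may itself fall back to the internal ID.
        return self._participant_id or self.external_id

    @property
    def rich_id(self) -> str:
        # Hypothetical format; the real rich_id layout may differ.
        return f"{self.id}|{self.participant_id}"
```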
- class cpg_workflows.targets.Sex(value)
Sex, as encoded in the PED format.
- FEMALE = 2
- MALE = 1
- UNKNOWN = 0
- class cpg_workflows.targets.Target
Defines a target that a stage can act upon.
- active: bool
- alignment_inputs_hash: str | None
- forced: bool
- get_alignment_inputs_hash() -> str
If the hash has already been set, returns it; otherwise sets it first, then returns it. This is safe given the current usage:
- we first set up the Targets in this workflow (populating SGs, Datasets, Cohorts); at this point the targets are malleable (e.g. adding another Cohort may add SGs to Datasets)
- we then set up the Stages, where alignment input hashes are generated; at this point the alignment inputs are fixed, and all calls to get_alignment_inputs_hash() must return the same value
- get_sequencing_group_ids(only_active: bool = True) -> list[str]
Gets a flat list of all sequencing group IDs corresponding to this target.
- get_sequencing_groups(only_active: bool = True) -> list[SequencingGroup]
Gets a flat list of all sequencing groups corresponding to this target.
- rich_id_map() -> dict[str, str]
Maps internal IDs to participant or external IDs, if the latter are provided.
- set_alignment_inputs_hash()
Unique hash string of the sample alignment inputs. Useful for deciding whether the analysis on the target needs to be rerun.
- property target_id: str
The ID should be unique across targets of all levels.
We raise NotImplementedError instead of making Target an abstract class, because mypy does not accept binding a TypeVar to an abstract class, see: https://stackoverflow.com/questions/48349054/how-do-you-annotate-the-type-of-an-abstract-class-with-mypy
Specifically, `TypeVar('TargetT', bound=Target)` will raise: `Only concrete class can be given where "Type[Target]" is expected`
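The "set once, then reuse" behaviour of get_alignment_inputs_hash, combined with a truncated hash of a list of strings, can be sketched as below. Both names are hypothetical: hash_strings_sketch only mirrors the parameters of hash_from_list_of_strings (hash_length, suffix), and the use of SHA-256 over a JSON-encoded sorted list is an assumption, not the library's algorithm.

```python
import hashlib
import json
from typing import Optional


def hash_strings_sketch(string_list: list[str], hash_length: int = 10, suffix: Optional[str] = None) -> str:
    """Hypothetical: order-independent, truncated SHA-256 of a list of strings."""
    # json.dumps of the sorted list avoids collisions such as ["a b"] vs ["a", "b"].
    payload = json.dumps(sorted(string_list)).encode()
    short = hashlib.sha256(payload).hexdigest()[:hash_length]
    return f"{short}_{suffix}" if suffix else short


class ToyTarget:
    """Hypothetical stand-in for Target's lazily cached alignment inputs hash."""

    def __init__(self, alignment_inputs: list[str]) -> None:
        self.alignment_inputs = alignment_inputs  # still malleable during target set-up
        self.alignment_inputs_hash: Optional[str] = None

    def set_alignment_inputs_hash(self) -> None:
        self.alignment_inputs_hash = hash_strings_sketch(self.alignment_inputs)

    def get_alignment_inputs_hash(self) -> str:
        # Compute on first use only; later calls return the cached value.
        if self.alignment_inputs_hash is None:
            self.set_alignment_inputs_hash()
        return str(self.alignment_inputs_hash)
```

Because the value is cached on first use, inputs added afterwards do not change it, which is why the alignment inputs must be fixed before stages are set up.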
- cpg_workflows.targets.hash_from_list_of_strings(string_list: list[str], hash_length: int = 10, suffix: str | None = None) -> str
Creates a hash from a list of strings.
- Parameters:
string_list (list[str]) – the strings to hash
hash_length (int) – how many characters to use from the hash
suffix (str) – optional; clarifies the type of value that was hashed
Returns: