Production Pipelines

Contents

Production Pipelines#

Production pipelines is a CPG specific module for improving the interactions between Hail Batch, and metamist for processing genomics data

Provides a Workflow class and a @stage decorator that allow to define workflows in a declarative fashion.

A Stage object is responsible for creating Hail Batch jobs and declaring outputs (files or metamist analysis objects) that are expected to be produced. Each stage acts on a Target, which can be of the following a SequencingGroup, a Dataset (a container for sequencing groups), or a Cohort (all input datasets together). A Workflow object plugs stages together by resolving dependencies between different levels accordingly.

Examples of workflows can be found in the production-workflows repository.

class cpg_workflows.workflow.Action(value)[source]#

Indicates what a stage should do with a specific target.

QUEUE = 1#
REUSE = 3#
SKIP = 2#
class cpg_workflows.workflow.CohortStage(name: str, required_stages: list[Callable[[...], Stage]] | Callable[[...], Stage] | None = None, analysis_type: str | None = None, analysis_keys: list[str] | None = None, update_analysis_meta: Callable[[str], dict] | None = None, tolerate_missing_output: bool = False, skipped: bool = False, assume_outputs_exist: bool = False, forced: bool = False)[source]#

Cohort-level stage (all datasets of a workflow run).

deprecated_queue_for_cohort(cohort: Cohort) dict[str, StageOutput | None][source]#

Plug the stage into the workflow. unused, ready for deletion

abstractmethod expected_outputs(cohort: Cohort) CloudPath | Path | dict[str, CloudPath | Path] | dict[str, str] | dict[str, CloudPath | Path | str] | str | None[source]#

Override to declare expected output paths.

queue_for_multicohort(multicohort: MultiCohort) dict[str, StageOutput | None][source]#

Plug the stage into the workflow.

abstractmethod queue_jobs(cohort: Cohort, inputs: StageInput) StageOutput | None[source]#

Override to add Hail Batch jobs.

class cpg_workflows.workflow.DatasetStage(name: str, required_stages: list[Callable[[...], Stage]] | Callable[[...], Stage] | None = None, analysis_type: str | None = None, analysis_keys: list[str] | None = None, update_analysis_meta: Callable[[str], dict] | None = None, tolerate_missing_output: bool = False, skipped: bool = False, assume_outputs_exist: bool = False, forced: bool = False)[source]#

Dataset-level stage

deprecated_queue_for_cohort(cohort: Cohort) dict[str, StageOutput | None][source]#

Plug the stage into the workflow. unused, ready for deletion

abstractmethod expected_outputs(dataset: Dataset) CloudPath | Path | dict[str, CloudPath | Path] | dict[str, str] | dict[str, CloudPath | Path | str] | str | None[source]#

Override to declare expected output paths.

queue_for_multicohort(multicohort: MultiCohort) dict[str, StageOutput | None][source]#

Plug the stage into the workflow.

abstractmethod queue_jobs(dataset: Dataset, inputs: StageInput) StageOutput | None[source]#

Override to add Hail Batch jobs.

class cpg_workflows.workflow.MultiCohortStage(name: str, required_stages: list[Callable[[...], Stage]] | Callable[[...], Stage] | None = None, analysis_type: str | None = None, analysis_keys: list[str] | None = None, update_analysis_meta: Callable[[str], dict] | None = None, tolerate_missing_output: bool = False, skipped: bool = False, assume_outputs_exist: bool = False, forced: bool = False)[source]#

MultiCohort-level stage (all datasets of a workflow run).

abstractmethod expected_outputs(multicohort: MultiCohort) CloudPath | Path | dict[str, CloudPath | Path] | dict[str, str] | dict[str, CloudPath | Path | str] | str | None[source]#

Override to declare expected output paths.

queue_for_multicohort(multicohort: MultiCohort) dict[str, StageOutput | None][source]#

Plug the stage into the workflow.

abstractmethod queue_jobs(multicohort: MultiCohort, inputs: StageInput) StageOutput | None[source]#

Override to add Hail Batch jobs.

class cpg_workflows.workflow.SequencingGroupStage(name: str, required_stages: list[Callable[[...], Stage]] | Callable[[...], Stage] | None = None, analysis_type: str | None = None, analysis_keys: list[str] | None = None, update_analysis_meta: Callable[[str], dict] | None = None, tolerate_missing_output: bool = False, skipped: bool = False, assume_outputs_exist: bool = False, forced: bool = False)[source]#

Sequencing Group level stage.

deprecated_queue_for_cohort(cohort: Cohort) dict[str, StageOutput | None][source]#

Plug the stage into the workflow. unused, ready for deletion

abstractmethod expected_outputs(sequencing_group: SequencingGroup) CloudPath | Path | dict[str, CloudPath | Path] | dict[str, str] | dict[str, CloudPath | Path | str] | str | None[source]#

Override to declare expected output paths.

queue_for_multicohort(multicohort: MultiCohort) dict[str, StageOutput | None][source]#

Plug the stage into the workflow.

abstractmethod queue_jobs(sequencing_group: SequencingGroup, inputs: StageInput) StageOutput | None[source]#

Override to add Hail Batch jobs.

class cpg_workflows.workflow.Stage(name: str, required_stages: list[Callable[[...], Stage]] | Callable[[...], Stage] | None = None, analysis_type: str | None = None, analysis_keys: list[str] | None = None, update_analysis_meta: Callable[[str], dict] | None = None, tolerate_missing_output: bool = False, skipped: bool = False, assume_outputs_exist: bool = False, forced: bool = False)[source]#

Abstract class for a workflow stage. Parametrised by specific Target subclass, i.e. SequencingGroupStage(Stage[SequencingGroup]) should only be able to work on SequencingGroup(Target).

property analysis_prefix: CloudPath | Path#
deprecated_queue_for_cohort(cohort: Cohort) dict[str, StageOutput | None][source]#

Queues jobs for each corresponding target, defined by Stage subclass.

Returns a dictionary of StageOutput objects indexed by target unique_id. unused, ready for deletion

abstractmethod expected_outputs(target: TargetT) CloudPath | Path | dict[str, CloudPath | Path] | dict[str, str] | dict[str, CloudPath | Path | str] | str | None[source]#

Get path(s) to files that the stage is expected to generate for a target. Used within in queue_jobs() to pass paths to outputs to job commands, as well as by the workflow to check if the stage’s expected outputs already exist and can be reused.

Can be a str, a Path object, or a dictionary of str/Path objects.

get_job_attrs(target: TargetT | None = None) dict[str, str][source]#

Create Hail Batch Job attributes dictionary

get_stage_cohort_prefix(cohort: Cohort, category: str | None = None) CloudPath | Path[source]#

Takes a cohort as an argument, calls through to the Workflow cohort_prefix method Result in the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID / STAGE_NAME e.g. “gs://cpg-project-main/seqr_loader/COH123/MyStage”

Parameters:
  • cohort (Cohort) – we pull the analysis dataset and name from this Cohort

  • category (str | none) – main, tmp, test, analysis, web

Returns:

Path

make_outputs(target: Target, data: CloudPath | Path | dict[str, CloudPath | Path] | dict[str, str] | dict[str, CloudPath | Path | str] | str | None = None, jobs: Sequence[Job | None] | Job | None = None, meta: dict | None = None, reusable: bool = False, skipped: bool = False, error_msg: str | None = None) StageOutput[source]#

Create StageOutput for this stage.

property name: str#

Stage name (unique and descriptive stage)

output_by_target: dict[str, StageOutput | None]#
property prefix: CloudPath | Path#
abstractmethod queue_for_multicohort(multicohort: MultiCohort) dict[str, StageOutput | None][source]#

Queues jobs for each corresponding target, defined by Stage subclass.

Returns a dictionary of StageOutput objects indexed by target unique_id.

abstractmethod queue_jobs(target: TargetT, inputs: StageInput) StageOutput | None[source]#

Adds Hail Batch jobs that process target. Assumes that all the household work is done: checking missing inputs from required stages, checking for possible reuse of existing outputs.

required_stages: list[Stage]#
required_stages_classes: list[Callable[[...], Stage]]#
property tmp_prefix#
property web_prefix: CloudPath | Path#
class cpg_workflows.workflow.StageInput(stage: Stage)[source]#

Represents an input for a stage run. It wraps the outputs of all required upstream stages for corresponding targets (e.g. all GVCFs from a GenotypeSample stage for a JointCalling stage, along with Hail Batch jobs).

An object of this class is passed to the public queue_jobs method of a Stage, and can be used to query dependency files and jobs.

add_other_stage_output(output: StageOutput)[source]#

Add output from another stage run.

as_dict(target: Target, stage: Callable[[...], Stage]) dict[str, CloudPath | Path][source]#

Get a dict of paths for a specific target and stage

as_dict_by_target(stage: Callable[[...], Stage]) dict[str, dict[str, CloudPath | Path]][source]#

Get as a dict of files/resources for a specific stage, indexed by target

as_path(target: Target, stage: Callable[[...], Stage], key: str | None = None) CloudPath | Path[source]#

Represent as a path to a file, otherwise fail. stage can be callable, or a subclass of Stage

as_path_by_target(stage: Callable[[...], Stage], key: str | None = None) dict[str, CloudPath | Path][source]#

Get a single file path result, indexed by target for a specific stage

as_path_dict_by_target(stage: Callable[[...], Stage]) dict[str, dict[str, CloudPath | Path]][source]#

Get a dict of paths for a specific stage, and indexed by target

as_str(target: Target, stage: Callable[[...], Stage], key: str | None = None) str[source]#

Represent as a simple string, otherwise fail. stage can be callable, or a subclass of Stage

get_jobs(target: Target) list[Job][source]#

Get list of jobs that the next stage would depend on.

exception cpg_workflows.workflow.StageInputNotFoundError[source]#

Thrown when a stage requests input from another stage that doesn’t exist.

class cpg_workflows.workflow.StageOutput(target: Target, data: CloudPath | Path | dict[str, CloudPath | Path] | dict[str, str] | dict[str, CloudPath | Path | str] | str | None = None, jobs: Sequence[Job | None] | Job | None = None, meta: dict | None = None, reusable: bool = False, skipped: bool = False, error_msg: str | None = None, stage: Stage | None = None)[source]#

Represents a result of a specific stage, which was run on a specific target. Can be a file path, or a Hail Batch Resource. Optionally wrapped in a dict.

as_dict() dict[str, CloudPath | Path][source]#

Cast the result to a dictionary, or throw an error if the cast failed.

as_path(key=None) CloudPath | Path[source]#

Cast the result to a path object. Throw an exception when can’t cast. key is used to extract the value when the result is a dictionary.

as_str(key=None) str[source]#

Cast the result to a simple string. Throw an exception when can’t cast. key is used to extract the value when the result is a dictionary.

jobs: list[Job]#
meta: dict#
class cpg_workflows.workflow.Workflow(stages: list[Callable[[...], Stage]] | None = None, dry_run: bool | None = None)[source]#

Encapsulates a Hail Batch object, stages, and a cohort of datasets of sequencing groups. Responsible for orchestrating stages.

property analysis_prefix: CloudPath | Path#
cohort_prefix(cohort: Cohort, category: str | None = None) CloudPath | Path[source]#

Takes a cohort and category as an argument, calls through to the Workflow cohort_prefix method Result in the form PROJECT_BUCKET / WORKFLOW_NAME / COHORT_ID e.g. “gs://cpg-project-main/seqr_loader/COH123”, or “gs://cpg-project-main-analysis/seqr_loader/COH123”

Parameters:
  • cohort (Cohort) – we pull the analysis dataset and id from this Cohort

  • category (str | None) – sub-bucket for this project

Returns:

Path

property output_version: str#
property prefix: CloudPath | Path#
queued_stages: list[Stage]#
run(stages: list[Callable[[...], Stage]] | None = None, wait: bool | None = False)[source]#

Resolve stages, add and submit Hail Batch jobs. When run_all_implicit_stages is set, all required stages that were not defined explicitly would still be executed.

run_timestamp: str#
set_stages(requested_stages: list[Callable[[...], Stage]])[source]#

Iterate over stages and call their queue_for_cohort(cohort) methods; through that, creates all Hail Batch jobs through Stage.queue_jobs().

property tmp_prefix: CloudPath | Path#
property web_prefix: CloudPath | Path#
exception cpg_workflows.workflow.WorkflowError[source]#

Error raised by workflow and stage implementation.

cpg_workflows.workflow.get_workflow(dry_run: bool = False) Workflow[source]#
cpg_workflows.workflow.path_walk(expected, collected: set | None = None) set[CloudPath | Path][source]#

recursive walk of expected_out if the object is iterable, walk it this gets around the issue with nested lists and dicts mainly around the use of Array outputs from Cromwell

Parameters:
  • expected (Any) – any type of object containing Paths

  • collected (set) – all collected paths so far

Returns:

a set of all collected Path nodes

Examples:

>>> path_walk({'a': {'b': {'c': Path('d')}}})
{Path('d')}
>>> path_walk({'a': {'b': {'c': [Path('d'), Path('e')]}}})
{Path('d'), Path('e')}
>>> path_walk({'a': Path('b'),'c': {'d': 'e'}, {'f': Path('g')}})
{Path('b'), Path('g')}
cpg_workflows.workflow.run_workflow(stages: list[Callable[[...], Stage]] | None = None, wait: bool | None = False, dry_run: bool = False) Workflow[source]#
cpg_workflows.workflow.skip(_fun: Callable[[...], Stage] | None = None, *, reason: str | None = None, assume_outputs_exist: bool = False) Callable[[...], Stage] | Callable[[...], Callable[[...], Stage]][source]#

Decorator on top of @stage that sets the self.skipped field to True. By default, expected outputs of a skipped stage will be checked, unless assume_outputs_exist is True.

@skip @stage class MyStage1(SequencingGroupStage):

@skip @stage(assume_outputs_exist=True) class MyStage2(SequencingGroupStage):

cpg_workflows.workflow.stage(cls: Type[Stage] | None = None, *, analysis_type: str | None = None, analysis_keys: list[str | CloudPath | Path] | None = None, update_analysis_meta: Callable[[str], dict] | None = None, tolerate_missing_output: bool = False, required_stages: list[Callable[[...], Stage]] | Callable[[...], Stage] | None = None, skipped: bool = False, assume_outputs_exist: bool = False, forced: bool = False) Callable[[...], Stage] | Callable[[...], Callable[[...], Stage]][source]#

Implements a standard class decorator pattern with optional arguments. The goal is to allow declaring workflow stages without requiring to implement a constructor method. E.g.

@stage(required_stages=[Align]) class GenotypeSample(SequencingGroupStage):

def expected_outputs(self, sequencing_group: SequencingGroup):

def queue_jobs(self, sequencing_group: SequencingGroup, inputs: StageInput) -> StageOutput:

@analysis_type: if defined, will be used to create/update Analysis entries

using the status reporter.

@analysis_keys: is defined, will be used to extract the value for Analysis.output

if the Stage.expected_outputs() returns a dict.

@update_analysis_meta: if defined, this function is called on the Analysis.output

field, and returns a dictionary to be merged into the Analysis.meta

@tolerate_missing_output: if True, when registering the output of this stage,

allow for the output file to be missing (only relevant for metamist entry)

@required_stages: list of other stage classes that are required prerequisites

for this stage. Outputs of those stages will be passed to Stage.queue_jobs(… , inputs) as inputs, and all required dependencies between Hail Batch jobs will be set automatically as well.

@skipped: always skip this stage. @assume_outputs_exist: assume expected outputs of this stage always exist. @forced: always force run this stage, regardless of the outputs’ existence.

Targets for workflow stages: SequencingGroup, Dataset, Cohort.

class cpg_workflows.targets.Cohort(id: str | None = None, name: str | None = None, multicohort: MultiCohort | None = None)[source]#

Represents a “cohort” target - all sequencing groups from a single CustomCohort (potentially spanning multiple datasets) in the workflow. Analysis dataset name is required and will be used as the default name for the cohort.

add_dataset(dataset: Dataset) Dataset[source]#

Add existing dataset into the cohort.

create_dataset(name: str) Dataset[source]#

Create a dataset and add it to the cohort.

get_dataset_by_name(name: str, only_active: bool = True) Dataset | None[source]#

Get dataset by name. Include only “active” datasets (unless only_active is False)

get_datasets(only_active: bool = True) list[Dataset][source]#

Gets list of all datasets. Include only “active” datasets (unless only_active is False)

get_job_attrs() dict[source]#

Attributes for Hail Batch job.

get_job_prefix() str[source]#

Prefix job names.

get_sequencing_groups(only_active: bool = True) list[SequencingGroup][source]#

Gets a flat list of all sequencing groups from all datasets. Include only “active” sequencing groups (unless only_active is False)

property target_id: str#

Unique target ID

to_tsv() str[source]#

Export to a parsable TSV file

write_ped_file(out_path: CloudPath | Path | None = None, use_participant_id: bool = False) CloudPath | Path[source]#

Create a PED file for all samples in the whole cohort PED is written with no header line to be strict specification compliant

class cpg_workflows.targets.Dataset(name: str, cohort: Cohort | None = None)[source]#

Represents a CPG dataset.

Each dataset at the CPG corresponds to * a GCP project: populationgenomics/team-docs * a Pulumi stack: populationgenomics/analysis-runner * a metamist project

add_sequencing_group(id: str, *, sequencing_type: str, sequencing_technology: str, sequencing_platform: str, external_id: str | None = None, participant_id: str | None = None, meta: dict | None = None, sex: Sex | None = None, pedigree: PedigreeInfo | None = None, alignment_input: AlignmentInput | None = None) SequencingGroup[source]#

Create a new sequencing group and add it to the dataset.

add_sequencing_group_object(s: SequencingGroup)[source]#

Add a sequencing group object to the dataset. :param s: SequencingGroup object

analysis_prefix(**kwargs) CloudPath | Path[source]#

Storage path for analysis files.

static create(name: str) Dataset[source]#

Create a dataset.

get_job_attrs() dict[source]#

Attributes for Hail Batch job.

get_job_prefix() str[source]#

Prefix job names.

get_sequencing_groups(only_active: bool = True) list[SequencingGroup][source]#

Get dataset’s sequencing groups. Include only “active” sequencing groups, unless only_active=False

prefix(**kwargs) CloudPath | Path[source]#

The primary storage path.

property target_id: str#

Unique target ID

tmp_prefix(**kwargs) CloudPath | Path[source]#

Storage path for temporary files.

web_prefix(**kwargs) CloudPath | Path[source]#

Path for files served by an HTTP server Matches corresponding URLs returns by self.web_url() URLs.

web_url() str | None[source]#

URLs matching self.storage_web_path() files serverd by an HTTP server.

write_ped_file(out_path: CloudPath | Path | None = None, use_participant_id: bool = False) CloudPath | Path[source]#

Create a PED file for all sequencing groups PED is written with no header line to be strict specification compliant

class cpg_workflows.targets.MultiCohort[source]#

Represents a “multi-cohort” target - multiple cohorts in the workflow.

add_dataset(d: Dataset) Dataset[source]#

Add a Dataset to the MultiCohort :param d: Dataset object

create_cohort(id: str, name: str) Cohort[source]#

Create a cohort and add it to the multi-cohort.

get_cohort_by_id(id: str, only_active: bool = True) Cohort | None[source]#

Get cohort by id. Include only “active” cohorts (unless only_active is False)

get_cohorts(only_active: bool = True) list[Cohort][source]#

Gets list of all cohorts. Include only “active” cohorts (unless only_active is False)

get_dataset_by_name(name: str, only_active: bool = True) Dataset | None[source]#

Get dataset by name. Include only “active” datasets (unless only_active is False)

get_datasets(only_active: bool = True) list[Dataset][source]#

Gets list of all datasets. Include only “active” datasets (unless only_active is False)

get_job_attrs() dict[source]#

Attributes for Hail Batch job.

get_sequencing_groups(only_active: bool = True) list[SequencingGroup][source]#

Gets a flat list of all sequencing groups from all datasets. uses a dictionary to avoid duplicates (we could have the same sequencing group in multiple cohorts) Include only “active” sequencing groups (unless only_active is False)

property target_id: str#

Unique target ID

write_ped_file(out_path: CloudPath | Path | None = None, use_participant_id: bool = False) CloudPath | Path[source]#

Create a PED file for all samples in the whole MultiCohort Duplication of the Cohort method PED is written with no header line to be strict specification compliant

class cpg_workflows.targets.PedigreeInfo(sequencing_group: SequencingGroup, sex: Sex = Sex.UNKNOWN, fam_id: str | None = None, phenotype: str | int = 0, dad: SequencingGroup | None = None, mom: SequencingGroup | None = None)[source]#

Pedigree relationships with other sequencing groups in the cohort, and other PED data

dad: SequencingGroup | None = None#
fam_id: str | None = None#
get_ped_dict(use_participant_id: bool = False) dict[str, str][source]#

Returns a dictionary of pedigree fields for this sequencing group, corresponding a PED file entry.

mom: SequencingGroup | None = None#
phenotype: str | int = 0#
sequencing_group: SequencingGroup#
sex: Sex = 0#
class cpg_workflows.targets.SequencingGroup(id: str, dataset: Dataset, *, sequencing_type: str, sequencing_technology: str, sequencing_platform: str, external_id: str | None = None, participant_id: str | None = None, meta: dict | None = None, sex: Sex | None = None, pedigree: PedigreeInfo | None = None, alignment_input: AlignmentInput | None = None, assays: tuple[Assay, ...] | None = None, forced: bool = False)[source]#

Represents a sequencing group.

alignment_input: AlignmentInput | None#
assays: tuple[Assay, ...] | None#
cram: CramPath | None#
cram_stage_name: str | None#
property external_id: str#

Get external sample ID, or substitute it with the internal ID.

get_job_attrs() dict[source]#

Attributes for Hail Batch job.

get_job_prefix() str[source]#

Prefix job names.

get_ped_dict(use_participant_id: bool = False) dict[str, str][source]#

Returns a dictionary of pedigree fields for this sequencing group, corresponding a PED file entry.

get_sequencing_groups(only_active: bool = True) list[SequencingGroup][source]#

Implementing the abstract method.

gvcf: GvcfPath | None#
gvcf_stage_name: str | None#
make_cram_path() CramPath[source]#

Path to a CRAM file. Not checking its existence here.

make_gvcf_path() GvcfPath[source]#

Path to a GVCF file. Not checking its existence here.

property make_sv_evidence_path: CloudPath | Path#

Path to the evidence root for GATK-SV evidence files.

meta: dict#
property participant_id: str#

Get ID of participant corresponding to this sequencing group, or substitute it with external ID.

pedigree: PedigreeInfo#
property rich_id: str#

composed of internal as well as external or participant IDs.

Type:

ID for reporting purposes

property target_id: str#

Unique target ID

class cpg_workflows.targets.Sex(value)[source]#

Sex as in PED format

FEMALE = 2#
MALE = 1#
UNKNOWN = 0#
static parse(sex: str | int | None) Sex[source]#

Parse a string into a Sex object.

class cpg_workflows.targets.Target[source]#

Defines a target that a stage can act upon.

active: bool#
alignment_inputs_hash: str | None#
forced: bool#
get_alignment_inputs_hash() str[source]#

If this hash has been set, return it, otherwise set it, then return it This should be safe as it matches the current usage: - we set up the Targets in this workflow (populating SGs, Datasets, Cohorts)

  • at this point the targets are malleable (e.g. addition of an additional Cohort may add SGs to Datasets)

  • we then set up the Stages, where alignment input hashes are generated
    • at this point, the alignment inputs are fixed

    • all calls to get_alignment_inputs_hash() need to return the same value

get_job_attrs() dict[source]#

Attributes for Hail Batch job.

get_job_prefix() str[source]#

Prefix job names.

get_sequencing_group_ids(only_active: bool = True) list[str][source]#

Get flat list of all sequencing group IDs corresponding to this target.

get_sequencing_groups(only_active: bool = True) list[SequencingGroup][source]#

Get flat list of all sequencing groups corresponding to this target.

rich_id_map() dict[str, str][source]#

Map of internal IDs to participant or external IDs, if the latter is provided.

set_alignment_inputs_hash()[source]#

Unique hash string of sample alignment inputs. Useful to decide whether the analysis on the target needs to be rerun.

property target_id: str#

ID should be unique across target of all levels.

We are raising NotImplementedError instead of making it an abstract class, because mypy is not happy about binding TypeVar to abstract classes, see: https://stackoverflow.com/questions/48349054/how-do-you-annotate-the-type-of -an-abstract-class-with-mypy

Specifically, ` TypeVar('TargetT', bound=Target) ` Will raise: ` Only concrete class can be given where "Type[Target]" is expected `

cpg_workflows.targets.hash_from_list_of_strings(string_list: list[str], hash_length: int = 10, suffix: str | None = None) str[source]#

Create a hash from a list of strings :param string_list (): :param hash_length: how many characters to use from the hash :type hash_length: int :param suffix: optional, clarify the type of value which was hashed :type suffix: str

Returns: