Hail Batch#

Convenience functions related to Hail.

class cpg_utils.hail_batch.Batch(name: str, backend: LocalBackend | ServiceBackend, *, pool_label: str | None = None, attributes: dict[str, str] | None = None, **kwargs: Any)[source]#

Thin subclass of the Hail Batch class. The aim is to be able to register created jobs, in order to print statistics before submitting the Batch.

run(**kwargs: Any)[source]#

Execute a batch. Overridden to print pre-submission statistics. Pylint disables: - R1710: Either all return statements in a function should return an expression,

or none of them should. - if no jobs are present, no batch is returned. Hail should have this behaviour…

  • W0221: Arguments number differs from overridden method - this wrapper makes use of **kwargs, which is being passed to the super().run() method

class cpg_utils.hail_batch.DefaultOverrideServiceBackend(*, billing_project: str, sync_fs: FS, async_fs: RouterAsyncFS, batch_client: BatchClient, batch: Batch, disable_progress_bar: bool, remote_tmpdir: str, driver_cores: str | int | None, driver_memory: str | None, worker_cores: str | int | None, worker_memory: str | None, regions: List[str], async_exit_stack: AsyncExitStack)[source]#
property jar_spec: dict#
cpg_utils.hail_batch.authenticate_cloud_credentials_in_job(job: BashJob, print_all_statements: bool = True)[source]#

Takes a hail batch job, activates the appropriate service account

Once multiple environments are supported this method will decide on which authentication method is appropriate

Parameters:
  • job

    • A hail BashJob

  • print_all_statements

    • logging toggle

Return type:

None

cpg_utils.hail_batch.command(cmd: str | list[str], monitor_space: bool = False, setup_gcp: bool = False, define_retry_function: bool = False, rm_leading_space: bool = True, python_script_path: CloudPath | Path | None = None) str[source]#

Wraps a command for Batch.

@param cmd: command to wrap (can be a list of commands) @param monitor_space: add a background process that checks the instance disk

space every 5 minutes and prints it to the screen

@param setup_gcp: authenticate on GCP @param define_retry_function: when set, adds bash functions retry that attempts

to redo a command after every 30 seconds (useful to pull inputs and get around GoogleEgressBandwidth Quota or other google quotas)

@param rm_leading_space: remove all leading spaces and tabs from the command lines @param python_script_path: if provided, copy this python script into the command

cpg_utils.hail_batch.copy_common_env(job: Job) None[source]#

Copies common environment variables that we use to run Hail jobs.

These variables are typically set up in the analysis-runner driver, but need to be passed through for “batch-in-batch” use cases.

The environment variable values are extracted from the current process and copied to the environment dictionary of the given Hail Batch job.

cpg_utils.hail_batch.cpg_namespace(*args, **kwargs)[source]#
cpg_utils.hail_batch.cpg_test_dataset_path(*args, **kwargs)[source]#
cpg_utils.hail_batch.fasta_res_group(b: Batch, indices: list[str] | None = None)[source]#

Hail Batch resource group for fasta reference files. @param b: Hail Batch object. @param indices: list of extensions to add to the base fasta file path.

cpg_utils.hail_batch.get_batch(name: str | None = None, *, default_python_image: str | None = None, attributes: dict[str, str] | None = None, **kwargs: Any) Batch[source]#

Wrapper around Hail’s Batch class, which allows to register created jobs This has been migrated (currently duplicated) out of cpg_workflows

Parameters:
  • name (str, optional, name for the batch)

  • default_python_image (str, optional, default python image to use)

Returns:

  • If there are scheduled jobs, return the batch

  • If there are no jobs to create, return None

cpg_utils.hail_batch.image_path(*args, **kwargs)[source]#
cpg_utils.hail_batch.init_batch(**kwargs: Any)[source]#

Initializes the Hail Query Service from within Hail Batch. Requires the hail/billing_project and hail/bucket config variables to be set.

Parameters:

kwargs (keyword arguments) – Forwarded directly to hl.init_batch.

cpg_utils.hail_batch.make_job_name(name: str, sequencing_group: str | None = None, participant_id: str | None = None, dataset: str | None = None, part: str | None = None) str[source]#

Extend the descriptive job name to reflect job attributes.

cpg_utils.hail_batch.output_path(*args, **kwargs)[source]#
cpg_utils.hail_batch.prepare_git_job(job: BashJob, repo_name: str, commit: str, organisation: str = 'populationgenomics', is_test: bool = True, print_all_statements: bool = True, get_deploy_token: bool = True)[source]#
Takes a hail batch job, and:
  • Clones the repository
    • if access_level != “test”: check the desired commit is on ‘main’

  • Check out the specific commit

Parameters:
  • BashJob (job - A hail)

  • organisation (organisation - The GitHub individual or)

  • out (commit - The commit hash to check)

  • out

  • specific (is_test - CPG)

  • toggle (print_all_statements - logging)

Return type:

No return required

cpg_utils.hail_batch.query_command(module: Any, func_name: str, *func_args: Any, setup_gcp: bool = False, setup_hail: bool = True, packages: list[str] | None = None, init_batch_args: dict[str, str | int] | None = None) str[source]#

Construct a command to run a python function inside a Hail Batch job. If hail_billing_project is provided, Hail Query will be also initialised.

Run a Python Hail Query function inside a Hail Batch job. Constructs a command string to use with job.command(). If hail_billing_project is provided, Hail Query will be initialised.

init_batch_args can be used to pass additional arguments to init_batch. this is a dict of args, which will be placed into the batch initiation command e.g. {‘worker_memory’: ‘highmem’} -> ‘init_batch(worker_memory=”highmem”)’

cpg_utils.hail_batch.reference_path(*args, **kwargs)[source]#
cpg_utils.hail_batch.remote_tmpdir(hail_bucket: str | None = None) str[source]#

Returns the remote_tmpdir to use for Hail initialization.

If hail_bucket is not specified explicitly, requires the hail/bucket config variable to be set.

cpg_utils.hail_batch.reset_batch()[source]#

Reset the global batch reference, useful for tests

cpg_utils.hail_batch.run_batch_job_and_print_url(batch: Batch, wait: bool, environment: str) str | None[source]#

Call batch.run(), return the URL, and wait for job to finish if wait=True

cpg_utils.hail_batch.start_query_context(query_backend: Literal['spark', 'batch', 'local', 'spark_local'] | None = None, log_path: str | None = None, dataset: str | None = None, billing_project: str | None = None)[source]#

Start Hail Query context, depending on the backend class specified in the hail/query_backend TOML config value.

cpg_utils.hail_batch.web_url(*args, **kwargs)[source]#