"""Provides access to config variables."""
import copy
import os
from shlex import quote
from typing import Any
import toml
from frozendict import frozendict
from cpg_utils import Path, to_path
from cpg_utils.cloud import find_image
AR_GUID_NAME = 'ar-guid'
# We use these globals for lazy initialization, but pylint doesn't like that.
# pylint: disable=global-statement, invalid-name
CONFIG_TYPE = frozendict[str, Any]
_config_paths = _val.split(',') if (_val := os.getenv('CPG_CONFIG_PATH')) else []
_config: CONFIG_TYPE | None = None # Cached config, initialized lazily.
# region GET_SET_CONFIG
def _validate_configs(config_paths: list[str]) -> None:
if [p for p in config_paths if not p.endswith('.toml')]:
raise ValueError(
f'All config files must have ".toml" extensions, got: {config_paths}',
)
paths = [to_path(p) for p in config_paths]
if bad_paths := [p for p in paths if not p.exists()]:
raise ValueError(f'Some config files do not exist: {bad_paths}')
# Reading each file to validate syntax:
exception_by_path: dict[Path, toml.decoder.TomlDecodeError] = {}
for p in paths:
with p.open() as f:
try:
toml.loads(f.read())
except toml.decoder.TomlDecodeError as e:
exception_by_path[p] = e
if exception_by_path:
msg = 'Failed parsing some config files:'
for path, exception in exception_by_path.items():
msg += f'\n\t{path}: {exception}'
raise ValueError(msg)
[docs]
def get_config_paths() -> list[str]:
"""
Returns the config paths that are used by subsequent calls to get_config.
If this isn't called, the value of the CPG_CONFIG_PATH environment variable is used
instead.
Returns
-------
list[str]
"""
global _config_paths
if not _config_paths:
env_val = os.getenv('CPG_CONFIG_PATH')
_config_paths = env_val.split(',') if env_val else []
if not _config_paths:
raise ConfigError(
'Either set the CPG_CONFIG_PATH environment variable or call set_config_paths',
)
return _config_paths
[docs]
def set_config_paths(config_paths: list[str]) -> None:
"""
Sets the config paths that are used by subsequent calls to get_config.
If this isn't called, the value of the CPG_CONFIG_PATH environment variable is used
instead.
Parameters
----------
config_paths: list[str]
A list of cloudpathlib-compatible paths to TOML files containing configurations.
"""
global _config_paths, _config
if _config_paths != config_paths:
_validate_configs(config_paths)
_config_paths = config_paths
os.environ['CPG_CONFIG_PATH'] = ','.join(_config_paths)
_config = None # Make sure the config gets reloaded.
[docs]
def prepend_config_paths(config_paths: list[str]) -> None:
"""
Prepend to the list of config paths. Equivalent to `dict.set_defaults`: any
values in current CPG_CONFIG_PATH will have the precedence over the provided
`config_paths` when merging the configs.
"""
_new_config_paths = copy.copy(config_paths)
if _config_paths:
_new_config_paths.extend(_config_paths)
set_config_paths(_new_config_paths)
[docs]
def append_config_paths(config_paths: list[str]) -> None:
"""
Append to the list of config paths. Any values in new configs will have the
precedence over the existing CPG_CONFIG_PATH when merging the configs.
"""
_new_config_paths = copy.copy(config_paths)
if _config_paths:
_new_config_paths = _config_paths + _new_config_paths
set_config_paths(_new_config_paths)
[docs]
def get_config(print_config: bool = False) -> CONFIG_TYPE:
"""
Returns the configuration dictionary.
Consider using `config_retrieve(keys)` instead.
Call `set_config_paths` beforehand to override the default path.
See `read_configs` for the path value semantics.
Notes
-----
Caches the result based on the config paths alone.
Returns
-------
dict
"""
global _config
if _config is None: # Lazily initialize the config.
_config = read_configs(get_config_paths())
# Print the config content, which is helpful for debugging.
if print_config:
print(
f'Configuration at {",".join(_config_paths)}:\n{toml.dumps(dict(_config))}',
)
if not _config:
raise ConfigError('No config found')
return _config
[docs]
def read_configs(config_paths: list[str]) -> CONFIG_TYPE:
"""
Creates a merged configuration from the given config paths.
This does NOT affect any state, re get_config.
For a list of configurations (e.g. ['base.toml', 'override.toml']), the
configurations get applied from left to right. I.e. the first config gets updated by
values of the second config, etc.
Returns
-------
dict
"""
if not config_paths:
raise ValueError('No config paths provided')
config: dict = {}
for path in config_paths:
with to_path(path).open() as f:
config_str = f.read()
update_dict(config, toml.loads(config_str))
return frozendict(config)
[docs]
def update_dict(d1: dict, d2: dict) -> dict:
"""
Updates the d1 dict with the values from the d2 dict recursively in-place.
Returns the pointer to d1 (the same as )
>>> update_dict({'a': 1, 'b': {'c': 1}}, {'b': {'c': 2, 'd': 2}})
{'a': 1, 'b': {'c': 2, 'd': 2}}
"""
for k, v2 in d2.items():
v1 = d1.get(k)
if isinstance(v1, dict) and isinstance(v2, dict):
update_dict(v1, v2)
else:
d1[k] = v2
return d1
# endregion GET_SET_CONFIG
[docs]
class ConfigError(Exception):
"""
Error retrieving keys from config.
"""
[docs]
class UnsuppliedDefault:
pass
[docs]
def config_retrieve(
key: list[str] | str,
default: Any | None = UnsuppliedDefault,
config: CONFIG_TYPE | dict[str, Any] | None = None,
) -> Any:
"""
Retrieve key from config, assuming nested key specified as a list of strings.
>> config_retrieve(['workflow', 'access_level'], config={'workflow': {'access_level': 'test'}})
'test'
>> config_retrieve(['workflow', 'access_level'], config={}, default='default')
'default'
>> config_retrieve('workflow', config={})
ConfigError("Key 'workflow' not found in {}")
>> config_retrieve(['key1', 'key2', 'key3'], config={'key1': {'key2': {}}})
ConfigError('Key "key3" not found in {} (path: key1 -> key2)')
Allow None as default value
>> config_retrieve(['key1', 'key2', 'key3'], config={}, default=None) is None
True
"""
if default is UnsuppliedDefault:
d = config if config is not None else get_config()
else:
try:
d = config if config is not None else get_config()
except ConfigError:
return default
if isinstance(key, str):
key = [key]
if not key:
raise ValueError('Key cannot be empty')
for idx, k in enumerate(key):
if k not in d:
if default is UnsuppliedDefault:
message = f'Key "{k}" not found in {d}'
if idx > 0:
key_bits = ' -> '.join(key[: idx + 1])
message += f' (path: {key_bits})'
raise ConfigError(message)
return default
d = d[k]
return d
[docs]
def get_driver_image() -> str:
"""
Get the driver image from the config.
"""
return config_retrieve(['workflow', 'driver_image'])
[docs]
def get_access_level() -> str:
"""
Get access level from the config.
"""
return config_retrieve(['workflow', 'access_level'])
[docs]
def get_gcp_project() -> str:
return config_retrieve(['workflow', 'dataset_gcp_project'])
[docs]
def get_cpg_namespace(access_level: str | None = None) -> str:
"""
Get storage namespace from the access level.
"""
access_level = access_level or get_access_level()
return 'test' if access_level == 'test' else 'main'
[docs]
def try_get_ar_guid():
"""Attempts to get the AR GUID from the environment.
This is a fallback for when the AR GUID is not available in the config.
"""
return config_retrieve(['workflow', AR_GUID_NAME], default=None)
# region PATHS
[docs]
def dataset_path(
suffix: str,
category: str | None = None,
dataset: str | None = None,
test: bool = False,
) -> str:
"""
Returns a full path for the current dataset, given a category and a path suffix.
This is useful for specifying input files, as in contrast to the `output_path`
function, `dataset_path` does _not_ take the `workflow/output_prefix` config
variable into account.
Assumes the config structure like below, which is auto-generated by
the analysis-runner:
```toml
[workflow]
access_level = "standard"
[storage.default]
default = "gs://thousand-genomes-main"
web = "gs://cpg-thousand-genomes-main-web"
analysis = "gs://cpg-thousand-genomes-main-analysis"
tmp = "gs://cpg-thousand-genomes-main-tmp"
web_url = "https://main-web.populationgenomics.org.au/thousand-genomes"
[storage.thousand-genomes]
default = "gs://cpg-thousand-genomes-main"
web = "gs://cpg-thousand-genomes-main-web"
analysis = "gs://cpg-thousand-genomes-main-analysis"
tmp = "gs://cpg-thousand-genomes-main-tmp"
web_url = "https://main-web.populationgenomics.org.au/thousand-genomes"
```
Examples
--------
Assuming that the analysis-runner has been invoked with
`--dataset fewgenomes --access-level test`:
> from cpg_utils.hail_batch import dataset_path
> dataset_path('1kg_densified/combined.mt')
'gs://cpg-fewgenomes-test/1kg_densified/combined.mt'
> dataset_path('1kg_densified/report.html', 'web')
'gs://cpg-fewgenomes-test-web/1kg_densified/report.html'
> dataset_path('1kg_densified/report.html', 'web', test=True)
'gs://cpg-fewgenomes-test-web/1kg_densified/report.html'
> dataset_path('1kg_densified/report.html', 'web_url')
'https://main-web.populationgenomics.org.au/fewgenomes/1kg_densified/report.html'
Notes
-----
* If you specify test=True, the `workflow/access_level` config variable is required
Parameters
----------
suffix : str
A path suffix to append to the bucket.
category : str, optional
A category like "tmp", "web", etc., defaults to "default" if omited.
dataset : str, optional
Dataset name, takes precedence over the `workflow/dataset` config variable
test : bool
Return "test" namespace version of the path
Returns
-------
str
"""
config = get_config()
if 'storage' not in config:
raise ConfigError('Storage section not found in config')
if dataset and dataset not in config['storage']:
raise ConfigError(
f'Storage section for dataset "{dataset}" not found in config. '
f'Please check that you have permissions to the dataset. '
f'Expected section: [storage.{dataset}]',
)
dataset = dataset or 'default'
# manual redirect to test paths
if test:
if 'workflow' not in config:
raise ConfigError('Workflow section not found in config')
if 'access_level' not in config['workflow']:
raise ConfigError('Access level not found in workflow section of config')
if config['workflow']['access_level'] != 'test':
section = get_config()['storage'][dataset]['test']
else:
section = get_config()['storage'][dataset]
else:
section = get_config()['storage'][dataset]
category = category or 'default'
prefix = section.get(category)
if not prefix:
raise ConfigError(
f'Category "{category}" not found in storage section '
f'for dataset "{dataset}": {section}',
)
return os.path.join(prefix, suffix)
[docs]
def cpg_test_dataset_path(
suffix: str,
category: str | None = None,
dataset: str | None = None,
) -> str:
"""
CPG-specific method to get corresponding test paths when running
from the main namespace.
"""
return dataset_path(suffix, category, dataset, test=True)
[docs]
def web_url(suffix: str = '', dataset: str | None = None, test: bool = False) -> str:
"""
Web URL to match the dataset_path of category 'web_url'.
"""
return dataset_path(suffix=suffix, dataset=dataset, category='web_url', test=test)
[docs]
def output_path(
suffix: str,
category: str | None = None,
dataset: str | None = None,
test: bool = False,
) -> str:
"""
Returns a full path for the given category and path suffix.
In contrast to the `dataset_path` function, `output_path` takes the
`workflow/output_prefix` config variable into account.
Examples
--------
If using the analysis-runner, the `workflow/output_prefix` would be set to the
value provided using the --output argument, e.g.:
```
analysis-runner --dataset fewgenomes --access-level test --output 1kg_pca/v42` ...
```
will use '1kg_pca/v42' as the base path to build upon in this method:
> from cpg_utils.hail_batch import output_path
> output_path('loadings.ht')
'gs://cpg-fewgenomes-test/1kg_pca/v42/loadings.ht'
> output_path('report.html', 'web')
'gs://cpg-fewgenomes-test-web/1kg_pca/v42/report.html'
Notes
-----
Requires the `workflow/output_prefix` config variable to be set, in addition to the
requirements for `dataset_path`.
Parameters
----------
suffix : str
A path suffix to append to the bucket + output directory.
category : str, optional
A category like "tmp", "web", etc., defaults to "default" if ommited.
dataset : str, optional
Dataset name, takes precedence over the `workflow/dataset` config variable
test : bool, optional
Boolean - if True, generate a test bucket path. Default to False.
Returns
-------
str
"""
output_prefix = config_retrieve(['workflow', 'output_prefix'])
return dataset_path(
os.path.join(output_prefix, suffix),
category=category,
dataset=dataset,
test=test,
)
[docs]
def image_path(
key: str,
version: str | list[str] | None = None,
repository: str | None = None,
) -> str:
"""
Returns a path to a container image for the given key (i.e., image name)
and version.
Examples
--------
>> image_path('bcftools', '1.16-1')
'australia-southeast1-docker.pkg.dev/cpg-common/images/bcftools:1.16-1'
Parameters
----------
key : str
Specifies the image name.
When `version` is not specified:
Describes the key within the `images` config section. Can list sections
separated with '/'.
version : str or list[str], optional
Specifies the desired image version, e.g., '1.18-1', either directly as
a version number string or indirectly via a config key list which will
be used to retrieve a version number string via `config_retrieve`.
repository : str, optional
The suffix (e.g., 'dev' for images-dev) of an artifact registry repository
to be used instead of the default production images repository.
Using `image_path(key)` without giving `version` is deprecated. In future,
specifying it will be required.
Returns
-------
str
"""
if version is None:
return config_retrieve(['images', *key.strip('/').split('/')])
if isinstance(version, list):
version = config_retrieve(version)
assert isinstance(version, str)
return find_image(repository, key, version).tag_uri
[docs]
def reference_path(key: str) -> str:
"""
Returns a path to a reference resource using key in config's "references" section.
Examples
--------
>> reference_path('vep_mount')
'gs://cpg-common-main/references/vep/105.0/mount'
>> reference_path('broad/genome_calling_interval_lists')
'gs://cpg-common-main/references/hg38/v0/wgs_calling_regions.hg38.interval_list'
Assuming config structure as follows:
```toml
[references]
vep_mount = 'gs://cpg-common-main/references/vep/105.0/mount'
[references.broad]
genome_calling_interval_lists = 'gs://cpg-common-main/references/hg38/v0/wgs_calling_regions.hg38.interval_list'
```
Parameters
----------
key : str
Describes the key within the `references` config section. Can list sections
separated with '/'.
Returns
-------
str
"""
return config_retrieve(['references', *key.strip('/').split('/')])
[docs]
def genome_build() -> str:
"""
Return the default genome build name
"""
return config_retrieve(['references', 'genome_build'], default='GRCh38')
[docs]
def get_gcloud_set_project(gcp_project: str | None = None) -> str:
"""
Get the gcloud command to set the project.
"""
gcp_project = gcp_project or get_gcp_project()
command = ['gcloud', 'config', 'set', 'project', gcp_project]
return ' '.join([quote(c) for c in command])