Source code for cpg_utils.cloud

"""Convenience functions related to cloud infrastructure."""

import json
import os
import re
import subprocess
import traceback
import urllib.parse
from collections import defaultdict
from typing import Any, NamedTuple

# pylint: disable=no-name-in-module
import google.api_core.exceptions
import google.auth.transport
import google.oauth2
from deprecated import deprecated
from google.auth import (
    credentials as google_auth_credentials,
)
from google.auth import (
    environment_vars,
    exceptions,
    jwt,
)
from google.auth._default import (
    _AUTHORIZED_USER_TYPE,
    _EXTERNAL_ACCOUNT_TYPE,
    _SERVICE_ACCOUNT_TYPE,
)
from google.auth.transport import requests
from google.cloud import artifactregistry, secretmanager
from google.oauth2 import credentials as oauth2_credentials
from google.oauth2 import service_account

_CLOUD_SDK_MISSING_CREDENTIALS = """\
Your default credentials were not found. To set up Application Default Credentials, \
see https://cloud.google.com/docs/authentication/external/set-up-adc for more information.\
"""

IMPLEMENTED_CREDENTIALS_TYPES = (
    _AUTHORIZED_USER_TYPE,
    _SERVICE_ACCOUNT_TYPE,
    _EXTERNAL_ACCOUNT_TYPE,
)


def email_from_id_token(id_token_jwt: str) -> str:
    """
    Extract the caller's email address from a Google ID token (JWT).

    The token is decoded with ``verify=False``, so its signature is NOT
    checked here: callers must have verified the token beforehand. See
    https://developers.google.com/identity/sign-in/web/backend-auth?authuser=0#verify-the-integrity-of-the-id-token

    :param id_token_jwt: The encoded ID token.
    :return: The value of the token's ``email`` claim.
    :raises KeyError: if the decoded claims contain no ``email`` entry.
    """
    claims = jwt.decode(id_token_jwt, verify=False)
    return claims['email']
def read_secret(
    project_id: str,
    secret_name: str,
    fail_gracefully: bool = True,
) -> str | None:
    """
    Read the ``latest`` version of a GCP Secret Manager secret.

    :param project_id: GCP project that owns the secret.
    :param secret_name: Name of the secret within that project.
    :param fail_gracefully: When True (the default), a retrieval failure prints
        the traceback and yields None; when False the exception is re-raised.
    :return: The secret payload decoded as UTF-8, or None on a tolerated failure.
    """
    client = secretmanager.SecretManagerServiceClient()
    version_path = client.secret_version_path(project_id, secret_name, 'latest')

    try:
        # noinspection PyTypeChecker
        secret_version = client.access_secret_version(request={'name': version_path})
        return secret_version.payload.data.decode('UTF-8')
    except (google.api_core.exceptions.ClientError, AttributeError):
        # ClientError: e.g. there is no secret version yet.
        # AttributeError: sometimes the google API fails when no version is
        # present, with:
        #   File "{site-packages}/google/api_core/exceptions.py",
        #   line 532, in from_grpc_error
        #     if isinstance(rpc_exc, grpc.Call) or _is_informative_grpc_error(rpc_exc):
        #   AttributeError: 'NoneType' object has no attribute 'Call'
        if not fail_gracefully:
            raise
        traceback.print_exc()
        return None
def write_secret(project_id: str, secret_name: str, secret_value: str) -> None:
    """
    Add a new version to a GCP Secret Manager secret and disable older ones.

    After the new version is added, every other version that is currently
    ENABLED is disabled, leaving the new version as the only enabled one.

    :param project_id: GCP project that owns the secret.
    :param secret_name: Name of the secret within that project.
    :param secret_value: New secret value; stored UTF-8 encoded.
    """
    client = secretmanager.SecretManagerServiceClient()
    parent = client.secret_path(project_id, secret_name)

    payload = {'data': secret_value.encode('UTF-8')}
    new_version = client.add_secret_version(
        request={'parent': parent, 'payload': payload},
    )

    # Disable all previous versions.
    for version in client.list_secret_versions(request={'parent': parent}):
        # Leave destroyed / already disabled versions untouched, and never
        # disable the version that was just added.
        if (
            version.state != secretmanager.SecretVersion.State.ENABLED
            or version.name == new_version.name
        ):
            continue
        client.disable_secret_version(request={'name': version.name})
class DockerImage(NamedTuple):
    """Details of one tagged Docker image, as recorded by the image-tag cache."""

    # Resource name of the image as reported by the registry listing.
    name: str
    # Image URI as reported by the registry listing.
    uri: str
    # URI addressing the image by tag: the part of `uri` before '@', plus ':<tag>'.
    tag_uri: str
    # Image size as reported by the registry listing.
    size: str
    # Build time as reported by the registry listing.
    build_time: str
# Cache of registry listings: 'repository' -> 'imagename' -> 'tag' -> image.
_repo_image_tags: dict[str, defaultdict[str, dict[str, DockerImage]]] = {}


def _ensure_image_tags_loaded(project: str, location: str, repository: str) -> None:
    """
    Fill the ``_repo_image_tags`` cache for ``repository`` if it is not cached yet.

    The cache is keyed by repository name only, so a repository that is already
    cached is never listed again, whatever ``project`` / ``location`` are given.

    :param project: GCP project hosting the Artifact Registry repository.
    :param location: Location of the repository.
    :param repository: Repository name; also the cache key.
    """
    if repository in _repo_image_tags:
        return

    tags_by_image: defaultdict[str, dict[str, DockerImage]] = defaultdict(dict)

    listing_request = artifactregistry.ListDockerImagesRequest(
        parent=f'projects/{project}/locations/{location}/repositories/{repository}',
        page_size=500,  # Increase efficiency by making fewer requests
    )
    registry_client = artifactregistry.ArtifactRegistryClient()

    for image in registry_client.list_docker_images(listing_request):
        # The resource name ends in '/dockerImages/<url-quoted name>@<checksum>'.
        _, _, quoted_name_and_checksum = image.name.rpartition('/dockerImages/')
        image_name, _, _ = urllib.parse.unquote(quoted_name_and_checksum).rpartition('@')
        uri_without_checksum, _, _ = image.uri.rpartition('@')

        for tag in image.tags:
            tags_by_image[image_name][tag] = DockerImage(
                name=image.name,
                uri=image.uri,
                tag_uri=f'{uri_without_checksum}:{tag}',
                size=image.image_size_bytes,
                build_time=image.build_time,
            )

    # Freeze the mapping: from now on unknown image names raise KeyError
    # instead of silently creating empty entries.
    tags_by_image.default_factory = None
    _repo_image_tags[repository] = tags_by_image
def find_image(repository: str | None, name: str, version: str) -> DockerImage:
    """
    Look up the details of a tagged image in a cpg-common image repository.

    :param repository: Repository suffix; ``None`` selects the ``images``
        repository, any other value selects ``images-<repository>``.
    :param name: Image name.
    :param version: Image tag.
    :return: The matching image details.
    :raises ValueError: if the image or the tag does not exist.
    """
    repo_name = 'images' if repository is None else f'images-{repository}'
    _ensure_image_tags_loaded('cpg-common', 'australia-southeast1', repo_name)

    try:
        return _repo_image_tags[repo_name][name][version]
    except KeyError as missing_key:
        raise ValueError(
            f'Image {name}:{version} not found in {repo_name} repository ({missing_key} not found)',
        ) from None
def get_google_identity_token(
    target_audience: str | None,
    request: google.auth.transport.Request | None = None,
) -> str:
    """
    Return a Google identity token for the given audience.

    :param target_audience: The intended audience of the token.
    :param request: Transport used to fetch the token; a new
        ``requests.Request()`` is created when omitted.
    :return: The identity token.
    :raises ValueError: if refreshing the credentials yields no token.
    """
    transport = requests.Request() if request is None else request

    # Unfortunately this requires different handling for at least
    # three different cases and the standard libraries don't provide
    # a single helper function that captures all of them:
    # https://github.com/googleapis/google-auth-library-python/issues/590
    id_token_credentials = _get_default_id_token_credentials(target_audience, transport)
    id_token_credentials.refresh(transport)

    identity_token = id_token_credentials.token
    if identity_token:
        return identity_token
    raise ValueError('Could not generate google identity token')
class IDTokenCredentialsAdapter(google_auth_credentials.Credentials):
    """Expose the ID token of ``openid``-scoped credentials as the ``token``."""

    def __init__(self, credentials: oauth2_credentials.Credentials):
        super().__init__()
        self.credentials = credentials
        self.token = credentials.id_token

    @property
    def expired(self):
        """Whether the wrapped credentials report themselves as expired."""
        return self.credentials.expired

    def refresh(self, request: google.auth.transport.Request):
        """Refresh the wrapped credentials, then adopt their new ID token."""
        self.credentials.refresh(request)
        self.token = self.credentials.id_token


class ExternalCredentialsAdapter(google_auth_credentials.Credentials):
    """
    Identity-token credentials obtained by shelling out to ``gcloud``.

    Wrapper around ExternalCredentials because I (mfranklin) cannot work out
    how to make the python version work, and have defaulted to using the
    gcloud command line.
    """

    def __init__(
        self,
        audience: str | None,
        impersonate_id: str | None = None,
    ):
        super().__init__()
        self.token: str | None = None
        self.audience = audience

        # An explicit argument wins; otherwise fall back to the environment.
        resolved_id = impersonate_id or os.environ.get('GOOGLE_IMPERSONATE_IDENTITY')
        if not resolved_id:
            raise exceptions.DefaultCredentialsError(
                'GOOGLE_IMPERSONATE_IDENTITY environment variable is not set. '
                'Cannot impersonate service account.',
            )
        self.impersonate_id = resolved_id

    def refresh(self, *args: Any, **kwargs: Any):  # noqa: ARG002
        """Call gcloud to get a new token; all arguments are ignored."""
        gcloud_command = [
            'gcloud',
            'auth',
            'print-identity-token',
            f'--impersonate-service-account={self.impersonate_id}',
            '--include-email',
        ]
        if self.audience:
            gcloud_command.append(f'--audiences={self.audience}')

        raw_output = subprocess.check_output(gcloud_command)  # noqa: S603
        self.token = raw_output.decode('utf-8').strip()


def _load_credentials_from_file(
    filename: str,
    target_audience: str | None,
) -> google_auth_credentials.Credentials | None:
    """
    Build ID-token credentials from a credentials file.

    Supported file types are stored authorized user credentials, service
    account keys and external account configurations.

    :param filename: The full path to the credentials file.
    :param target_audience: The intended audience for the credentials.
    :return: Loaded credentials
    :rtype: google.auth.credentials.Credentials
    :raise google.auth.exceptions.DefaultCredentialsError: if the file is in the
        wrong format or is missing.
    """
    if not os.path.exists(filename):
        raise exceptions.DefaultCredentialsError(f'File {filename} was not found.')

    with open(filename, encoding='utf-8') as credentials_file:
        try:
            info = json.load(credentials_file)
        except json.JSONDecodeError as exc:
            raise exceptions.DefaultCredentialsError(
                f'File {filename} is not a valid json file.',
            ) from exc

    # The 'type' key tells us which kind of credentials the file holds.
    credential_type = info.get('type')

    if credential_type == _AUTHORIZED_USER_TYPE:
        user_credentials = oauth2_credentials.Credentials.from_authorized_user_info(
            info,
            scopes=['openid', 'https://www.googleapis.com/auth/userinfo.email'],
        )
        return IDTokenCredentialsAdapter(credentials=user_credentials)

    if credential_type == _SERVICE_ACCOUNT_TYPE:
        try:
            return service_account.IDTokenCredentials.from_service_account_info(
                info,
                target_audience=target_audience,
            )
        except ValueError as exc:
            raise exceptions.DefaultCredentialsError(
                f'Failed to load service account credentials from {filename}',
            ) from exc

    if credential_type == _EXTERNAL_ACCOUNT_TYPE:
        return ExternalCredentialsAdapter(audience=target_audience)

    raise exceptions.DefaultCredentialsError(
        f'The file {filename} does not have a valid type of google-cloud credentials. '
        f'Type is {credential_type}, but cpg-utils only implements '
        f'{IMPLEMENTED_CREDENTIALS_TYPES}.',
    )


def _get_explicit_environ_credentials(
    target_audience: str | None,
) -> google_auth_credentials.Credentials | None:
    """
    Load credentials from the file named by the GOOGLE_APPLICATION_CREDENTIALS
    environment variable, or return None when that variable is not set.
    """
    credentials_path = os.environ.get(environment_vars.CREDENTIALS)
    if credentials_path is None:
        return None

    return _load_credentials_from_file(
        credentials_path,
        target_audience=target_audience,
    )


def _get_gcloud_sdk_credentials(
    target_audience: str | None,
) -> google_auth_credentials.Credentials | None:
    """
    Load the Cloud SDK's application default credentials, or return None when
    the application default credentials file does not exist.
    """
    from google.auth import _cloud_sdk  # pylint: disable=import-outside-toplevel

    adc_path = _cloud_sdk.get_application_default_credentials_path()
    if os.path.isfile(adc_path):
        return _load_credentials_from_file(adc_path, target_audience)
    return None


def _get_gce_credentials(
    target_audience: str | None,
    request: google.auth.transport.Request | None = None,
) -> google_auth_credentials.Credentials | None:
    """
    Get ID-token credentials from the GCE Metadata Service.

    Returns None when the compute_engine library cannot be imported or the
    metadata server does not answer the ping.
    """
    # Ping requires a transport, but we want application default credentials
    # to require no arguments. So, we'll use the _http_client transport which
    # uses http.client. This is only acceptable because the metadata server
    # doesn't do SSL and never requires proxies.

    # While this library is normally bundled with compute_engine, there are
    # some cases where it's not available, so we tolerate ImportError.
    # pylint: disable=import-outside-toplevel
    try:
        from google.auth import compute_engine
        from google.auth.compute_engine import _metadata
    except ImportError:
        return None

    from google.auth.transport import _http_client

    if request is None:
        request = _http_client.Request()

    if not _metadata.ping(request=request):
        return None

    return compute_engine.IDTokenCredentials(
        request,
        target_audience,
        use_metadata_identity_endpoint=True,
    )


def _get_default_id_token_credentials(
    target_audience: str | None,
    request: google.auth.transport.Request | None = None,
) -> google_auth_credentials.Credentials:
    """
    Get the default ID Token credentials for the current environment.

    Sources are tried in this order, and the first one that yields credentials
    wins: the GOOGLE_APPLICATION_CREDENTIALS file, the Cloud SDK application
    default credentials file, then the GCE metadata service.

    :param target_audience: The intended audience for these credentials.
    :param request: An object used to make HTTP requests. This is used to detect
        whether the application is running on Compute Engine. If not specified,
        then it will use the standard library http client to make requests.
    :return: the current environment's credentials.
    :rtype: google.auth.credentials.Credentials
    :raises ~google.auth.exceptions.DefaultCredentialsError:
        If no credentials were found, or if the credentials found were invalid.
    """
    found = _get_explicit_environ_credentials(target_audience)
    if found is None:
        found = _get_gcloud_sdk_credentials(target_audience)
    if found is None:
        found = _get_gce_credentials(target_audience, request)
    if found is None:
        raise exceptions.DefaultCredentialsError(_CLOUD_SDK_MISSING_CREDENTIALS)
    return found
def get_path_components_from_gcp_path(path: str) -> dict[str, str]:
    """
    Split a CPG ``gs://`` path into its named components.

    The returned dictionary has the keys ``bucket`` (full bucket name),
    ``dataset``, ``bucket_type`` (e.g. 'test', 'main-upload', 'release'),
    ``suffix`` (the subdirectory including its trailing '/') and ``file``.
    ``suffix`` is ``None`` when the file sits directly in the bucket root.

    The bucket type must start with one of the known type words ('archive',
    'hail', 'main', 'test', 'release'), optionally followed by a qualifier
    such as '-upload' or '-analysis'.

    :param path: A path of the form ``gs://cpg-<dataset>-<bucket_type>/...``.
    :return: The matched components, keyed by name.
    :raises ValueError: if ``path`` is not a valid CPG bucket path.
    """
    bucket_types = ['archive', 'hail', 'main', 'test', 'release']

    # The type words must form an alternation group. Putting them inside
    # square brackets would create a character class matching any run of their
    # letters, so 'cpg-x-main-analysis' would parse as dataset 'x-main' and
    # bucket type 'analysis'.
    bucket_type_alternatives = '|'.join(re.escape(word) for word in bucket_types)

    # compile pattern matching all CPG bucket formats
    gspath_pattern = re.compile(
        r'gs://(?P<bucket>cpg-(?P<dataset>[\w-]+)-(?P<bucket_type>(?:'
        + bucket_type_alternatives
        + r')[-\w]*))/(?P<suffix>.+/)?(?P<file>.*)$',
    )

    # if a match succeeds, return the key: value dictionary
    if path_match := gspath_pattern.match(path):
        return path_match.groupdict()

    # raise an error if the input String was not a valid CPG bucket path
    raise ValueError('The input String did not match a valid GCP path')
def get_project_id_from_service_account_email(service_account_email: str) -> str:
    """
    Derive the GCP project id from a service account email address.

    The project id is taken to be the first dot-separated label of whatever
    follows the last '@' (or of the whole string when there is no '@').

    >>> get_project_id_from_service_account_email('cromwell-test@tob-wgs.iam.gserviceaccount.com')
    'tob-wgs'
    """
    # quick and dirty
    _, _, domain = service_account_email.rpartition('@')
    project_id, _, _ = domain.partition('.')
    return project_id
@deprecated(reason='Use cpg_utils.membership.is_member_in_cached_group instead')
def is_member_in_cached_group(*args: Any, **kwargs: Any):
    """
    Deprecated alias that forwards to
    ``cpg_utils.membership.is_member_in_cached_group``.

    All positional and keyword arguments are passed through unchanged and the
    result of the forwarded call is returned.
    """
    # Imported at call time rather than at module import time.
    from cpg_utils import membership

    return membership.is_member_in_cached_group(*args, **kwargs)