# Source code for cpg_utils.cromwell_model
# flake8: noqa
from enum import Enum
from textwrap import indent
import datetime
from tabulate import tabulate
from cpg_utils.constants import AnsiColors
[docs]
class ExecutionStatus(Enum):
    """Execution states used by the Cromwell workflow / call models.

    Each member's value is the lower-cased status string; the models in this
    module lower-case the incoming status before looking it up here.
    """

    preparing = 'preparing'
    starting = 'starting'
    in_progress = 'inprogress'
    running = 'running'
    done = 'done'
    succeeded = 'succeeded'
    failed = 'failed'
    retryablefailure = 'retryablefailure'
    queuedincromwell = 'queuedincromwell'

    def __str__(self):
        """Return the raw status string rather than ``ExecutionStatus.<name>``."""
        return self.value

    @property
    def _symbols(self):
        """Mapping of status -> short text marker used when rendering."""
        return {
            ExecutionStatus.starting: '...',
            ExecutionStatus.preparing: '...',
            ExecutionStatus.queuedincromwell: '...',
            ExecutionStatus.in_progress: '~',
            ExecutionStatus.running: '~',
            ExecutionStatus.done: '#',
            ExecutionStatus.succeeded: '#',
            ExecutionStatus.failed: '!',
            ExecutionStatus.retryablefailure: '~!',
        }

    @property
    def _colors(self):
        """Mapping of status -> ANSI colour; statuses not listed have no colour."""
        return {
            ExecutionStatus.done: AnsiColors.BRIGHTGREEN,
            ExecutionStatus.succeeded: AnsiColors.BRIGHTGREEN,
            ExecutionStatus.failed: AnsiColors.BRIGHTRED,
            ExecutionStatus.retryablefailure: AnsiColors.BRIGHTBLUE,
        }

    def symbol(self):
        """Return the text marker for this status.

        The display helpers in this module call ``status.symbol()``; without
        this accessor those calls fail with ``AttributeError``. Falls back to
        '?', the same placeholder the display code uses for a missing status.
        """
        return self._symbols.get(self, '?')

    def color(self):
        """Return the ANSI colour for this status, or '' when it has none.

        The empty-string default mirrors what the display code uses when no
        status is available, so uncoloured statuses render as plain text.
        """
        return self._colors.get(self, '')

    def is_finished(self):
        """Return True when this status is one of the terminal states below."""
        _finished_states = {
            ExecutionStatus.done,
            ExecutionStatus.succeeded,
            ExecutionStatus.failed,
            ExecutionStatus.retryablefailure,
        }
        return self in _finished_states
[docs]
class WorkflowMetadataModel:
    """Python model for a Cromwell workflow metadata payload.

    Known top-level keys are stored as same-named attributes; any other
    keyword argument is attached to the instance unchanged so unexpected
    keys do not break construction.
    """

    def __init__(
        self,
        workflowName=None,
        workflowProcessingEvents=None,
        metadataSource=None,
        actualWorkflowLanguageVersion=None,
        submittedFiles=None,
        calls: dict[str, list['CallMetadata']] | None = None,
        outputs=None,
        workflowRoot=None,
        actualWorkflowLanguage=None,
        id=None,
        inputs=None,
        labels=None,
        submission=None,
        status=None,
        end=None,
        start=None,
        **kwargs,
    ):
        self.workflowName = workflowName
        self.workflowProcessingEvents = workflowProcessingEvents
        self.metadataSource = metadataSource
        self.actualWorkflowLanguageVersion = actualWorkflowLanguageVersion
        self.submittedFiles = submittedFiles
        self.calls = calls
        self.outputs = outputs
        self.workflowRoot = workflowRoot
        self.actualWorkflowLanguage = actualWorkflowLanguage
        self.id = id
        self.inputs = inputs
        self.labels = labels
        self.submission = submission
        # A missing/empty status is treated as "preparing".
        self.status = (
            ExecutionStatus(status.lower()) if status else ExecutionStatus.preparing
        )
        self.end = end
        self.start = start
        # safety: keep any keys this model does not know about
        for k, v in kwargs.items():
            setattr(self, k, v)

    @staticmethod
    def parse(d):
        """Build a model from a metadata dict.

        Each entry of ``d['calls']`` is converted to a list of
        ``CallMetadata``, keyed by the last dot-separated component of the
        call name. A missing or empty ``calls`` entry yields an empty mapping.
        """
        new_d = {**d}
        calls_d = new_d.pop('calls', None) or {}
        calls = {}
        for name, sublist in calls_d.items():
            name = name.split(".")[-1]
            calls[name] = sorted(
                [CallMetadata.parse({'name': name, **call}) for call in sublist],
                # Tuple key: shards order numerically (2 before 10), same as
                # CallMetadata.parse; '' stands in for a missing start so
                # None is never compared with a string.
                key=lambda c: (c.shardIndex or 0, c.start or ''),
            )
        return WorkflowMetadataModel(calls=calls, **new_d)

    def display(self, expand_completed: bool = False, monochrome: bool = False):
        """Render the workflow header table followed by its calls as text.

        ``expand_completed`` and ``monochrome`` are forwarded to the per-call
        rendering.
        """
        duration_seconds = get_seconds_duration_between_cromwell_dates(
            self.start, self.end
        )
        walltime_seconds = get_seconds_duration_between_cromwell_dates(
            self.submission, self.end
        )
        headers = [
            ('Workflow ID', self.id),
            ('Name', self.workflowName),
            ('Status', self.status),
            ('Submitted', self.submission),
            ('Start', self.start),
            ('End', self.end),
            ('Duration', get_readable_duration(duration_seconds)),
            ('Walltime', get_readable_duration(walltime_seconds)),
        ]

        def _first_start(item):
            # Order call groups by the start of their first call; groups with
            # no calls or no start time use the '0' placeholder.
            group = item[1]
            return (group[0].start if group else None) or '0'

        calls_display: list[str] = []
        for name, calls in sorted((self.calls or {}).items(), key=_first_start):
            calls_display.append(
                indent(
                    prepare_inner_calls_string(
                        name,
                        calls,
                        expand_completed=expand_completed,
                        monochrome=monochrome,
                    ),
                    ' ',
                )
            )
        header = tabulate(headers)
        calls_str = '\n'.join(calls_display)
        return f"""
{header}
Jobs:
{calls_str}
"""
[docs]
class CallMetadata:
"""Python model for cromwell CallMetadata"""
def __init__(
self,
name,
executionStatus,
stdout=None,
backendStatus=None,
compressedDockerSize=None,
commandLine=None,
shardIndex=None,
outputs=None,
runtimeAttributes=None,
callCaching=None,
inputs=None,
returnCode=None,
jobId=None,
backend=None,
end=None,
dockerImageUsed=None,
stderr=None,
callRoot=None,
attempt=None,
executionEvents=None,
start=None,
preemptible=None,
jes=None,
failures=None,
calls: dict[str, list['CallMetadata']] | None = None,
**kwargs,
):
self.name = name
self.executionStatus = (
ExecutionStatus(executionStatus.lower()) if executionStatus else None
)
self.stdout = stdout
self.backendStatus = backendStatus
self.compressedDockerSize = compressedDockerSize
self.commandLine = commandLine
self.shardIndex = int(shardIndex) if shardIndex else None
self.outputs = outputs
self.runtimeAttributes = runtimeAttributes
self.callCaching = callCaching
self.inputs = inputs
self.returnCode = returnCode
self.jobId = jobId
self.backend = backend
self.end = end
self.dockerImageUsed = dockerImageUsed
self.stderr = stderr
self.callRoot = callRoot
self.attempt = attempt
self.executionEvents = executionEvents
self.start = start
self.preemptible = preemptible
self.calls = calls
self.jes = jes
self.failures = failures
# safety
for k, v in kwargs.items():
self.__setattr__(k, v)
[docs]
@staticmethod
def parse(d):
new_d = {**d}
calls = None
if 'subWorkflowMetadata' in new_d:
calls_d = new_d.pop('subWorkflowMetadata').get('calls')
calls = {}
for name, sublist in calls_d.items():
name = name.split(".")[-1]
calls[name] = sorted(
[CallMetadata.parse({'name': name, **call}) for call in sublist],
key=lambda c: (c.shardIndex or 0, c.start),
)
return CallMetadata(calls=calls, **new_d)
[docs]
def display(self, expand_completed=False, monochrome=False):
duration_str = get_readable_duration(
get_seconds_duration_between_cromwell_dates(self.start, self.end)
)
extras = []
is_done = self.executionStatus.is_finished() if self.executionStatus else False
has_succeded = self.executionStatus == ExecutionStatus.succeeded
if (not has_succeded or expand_completed) and self.calls:
for name, calls in sorted(
self.calls.items(), key=lambda a: a[1][0].start or '0'
):
extras.append(
indent(
prepare_inner_calls_string(
name,
calls,
expand_completed=expand_completed,
monochrome=monochrome,
),
' ',
)
)
if not is_done:
if self.jobId:
extras.append(f'JobID: {self.jobId}')
else:
if self.callCaching and self.callCaching.get('hit'):
extras.append(f'Call caching: true')
if not self.calls and self.executionStatus == ExecutionStatus.failed:
extras.append(f'stdout: {self.stdout}')
extras.append(f'stderr: {self.stderr}')
extras.append(f'rc: {self.returnCode}')
if self.failures:
caused_by = unwrap_caused_by(self.failures)
if caused_by:
extras.append(f'error: {caused_by}')
name = self.name
if self.shardIndex is not None and self.shardIndex >= 0:
name += f' (shard-{self.shardIndex})'
if self.attempt is not None and self.attempt > 1:
name += f' (attempt {self.attempt})'
symbol = self.executionStatus.symbol() if self.executionStatus else '?'
color, rcol = '', ''
if not monochrome:
color = self.executionStatus.color() if self.executionStatus else ''
rcol = AnsiColors.RESET
extras_str = "".join("\n" + indent(e, ' ') for e in extras)
return f'{color}[{symbol}] {name} ({duration_str}){extras_str}{rcol}'
[docs]
def unwrap_caused_by(failures: list):
    """Flatten a nested list of failure dicts into one readable string.

    Each failure contributes its 'message'; when it carries a 'causedBy'
    list, that list is flattened recursively and appended after
    ', caused by: '. Failures that produce no text are skipped, and the
    rest are joined with ' & '.
    """

    def _describe(entry):
        # Message for one failure, including its recursively flattened causes.
        text = entry.get('message', '')
        nested = entry.get('causedBy')
        if nested:
            text += f', caused by: {unwrap_caused_by(nested)}'
        return text

    described = (_describe(entry) for entry in failures)
    return " & ".join(text for text in described if text)
[docs]
def prepare_inner_calls_string(
    name, calls: list['CallMetadata'], expand_completed=False, monochrome=False
):
    """Render a named group of calls as one (possibly multi-line) string.

    A group holding exactly one call is rendered as that call's own
    display. Otherwise a summary line is built from the collapsed status of
    the group, the job count (unless ``expand_completed``) and the span
    between the earliest start and latest end. The individual calls are
    listed beneath it when ``expand_completed`` is set or the collapsed
    status is neither done nor succeeded.
    """
    n_calls = len(calls)
    if n_calls == 1:
        only_call = calls[0]
        return only_call.display(
            expand_completed=expand_completed,
            monochrome=monochrome,
        )

    overall = collapse_status_of_calls(calls)
    marker = overall.symbol() if overall else '?'

    if monochrome:
        prefix, suffix = '', ''
    else:
        prefix = overall.color() if overall else ''
        suffix = AnsiColors.RESET

    if n_calls > 1 and not expand_completed:
        name += f' ({n_calls} jobs)'

    if n_calls > 0:
        # Span from the earliest known start to the latest known end.
        known_starts = [c.start for c in calls if c.start is not None]
        known_ends = [c.end for c in calls if c.end is not None]
        earliest = min(known_starts) if known_starts else None
        latest = max(known_ends) if known_ends else None
        elapsed = get_seconds_duration_between_cromwell_dates(earliest, latest)
        name += f' ({get_readable_duration(elapsed)})'

    nested = ''
    if expand_completed or overall not in [
        ExecutionStatus.done,
        ExecutionStatus.succeeded,
    ]:
        rendered = [
            c.display(expand_completed=expand_completed, monochrome=monochrome)
            for c in calls
        ]
        nested = "\n" + indent('\n'.join(rendered), ' ')

    return f'{prefix}[{marker}] {name}{nested}{suffix}'
[docs]
def collapse_status_of_calls(calls: list['CallMetadata']):
    """Reduce the execution statuses of several calls to one summary status.

    Precedence: any preparing / in-progress / running call makes the group
    in-progress; otherwise any failed call makes it failed; otherwise a
    single shared status is returned unchanged, and anything else (a mix of
    statuses, or no calls at all) is reported as in-progress.
    """
    statuses = {c.executionStatus for c in calls}

    active = (
        ExecutionStatus.preparing,
        ExecutionStatus.in_progress,
        ExecutionStatus.running,
    )
    if not statuses.isdisjoint(active):
        return ExecutionStatus.in_progress

    if ExecutionStatus.failed in statuses:
        return ExecutionStatus.failed

    if len(statuses) == 1:
        (only_status,) = statuses
        return only_status

    # hmm, don't know yet
    return ExecutionStatus.in_progress
[docs]
def get_seconds_duration_between_cromwell_dates(start, end):
    """Return the whole seconds elapsed between two timestamp strings.

    Both arguments are strings in ``%Y-%m-%dT%H:%M:%S.%f%z`` form (for
    example ``2021-01-01T00:00:00.000Z``) or a falsy value.

    * ``start`` falsy -> returns None.
    * ``end`` falsy   -> the current time is used as the end.

    The parsed values stay timezone-aware, so timestamps with different UTC
    offsets are compared correctly. Dropping the offset before subtracting
    would silently skew the result for any non-UTC timestamp.

    Raises ValueError (from ``strptime``) when a non-empty argument does not
    match the expected format.
    """
    cromwell_format = "%Y-%m-%dT%H:%M:%S.%f%z"
    s, e = None, None
    if start:
        s = datetime.datetime.strptime(start, cromwell_format)
    if end:
        e = datetime.datetime.strptime(end, cromwell_format)
    if not s:
        return None
    if e is None:
        # Still running: measure against an aware "now" in UTC.
        e = datetime.datetime.now(datetime.timezone.utc)
    return int((e - s).total_seconds())
[docs]
def get_readable_duration(seconds: int | None):
"""
>>> get_readable_duration(86401)
'1d:0h:0m:1s'
>>> get_readable_duration(100)
'1m:40s'
>>> get_readable_duration(3)
'3s'
"""
if seconds is None:
return "?"
if seconds < 0:
return "In the future..."
if seconds < 1:
return '<1s'
intervals = [
(365 * 86400, "y"),
(7 * 86400, 'w'),
(86400, 'd'),
(3600, 'h'),
(60, 'm'),
(1, 's'),
]
periods = []
has_seen_value = False
for interval, suffix in intervals:
if interval > seconds and not has_seen_value:
continue
has_seen_value = True
intervals, seconds = divmod(seconds, interval) # type: ignore
periods.append(f'{intervals}{suffix}')
# weird if we get to here, but sure
return ":".join(periods)