Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 32 additions & 16 deletions src/dvsim/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import os
import pprint
import sys
from collections.abc import Mapping
from abc import ABC, abstractmethod
from collections.abc import Mapping, Sequence
from pathlib import Path
from typing import ClassVar

import hjson

Expand All @@ -27,7 +29,7 @@


# Interface class for extensions.
class FlowCfg:
class FlowCfg(ABC):
"""Base class for the different flows supported by dvsim.py.

The constructor expects some parsed hjson data. Create these objects with
Expand All @@ -41,9 +43,10 @@ class FlowCfg:

# Can be overridden in subclasses to configure which wildcards to ignore
# when expanding hjson.
ignored_wildcards = []
ignored_wildcards: ClassVar = []

def __str__(self) -> str:
"""Get string representation of the flow config."""
return pprint.pformat(self.__dict__)

def __init__(self, flow_cfg_file, hjson_data, args, mk_config) -> None:
Expand Down Expand Up @@ -87,7 +90,7 @@ def __init__(self, flow_cfg_file, hjson_data, args, mk_config) -> None:
# For a primary cfg, it is the aggregated list of all deploy objects
# under self.cfgs. For a non-primary cfg, it is the list of items
# slated for dispatch.
self.deploy = []
self.deploy: Sequence[Deploy] = []

# Timestamp
self.timestamp_long = args.timestamp_long
Expand All @@ -98,7 +101,7 @@ def __init__(self, flow_cfg_file, hjson_data, args, mk_config) -> None:
self.rel_path = ""
self.results_title = ""
self.revision = ""
self.css_file = os.path.join(Path(os.path.realpath(__file__)).parent, "style.css")
self.css_file = Path(__file__).resolve().parent / "style.css"
# `self.results_*` below will be updated after `self.rel_path` and
# `self.scratch_base_root` variables are updated.
self.results_dir = ""
Expand Down Expand Up @@ -151,7 +154,7 @@ def __init__(self, flow_cfg_file, hjson_data, args, mk_config) -> None:
# Run any final checks
self._post_init()

def _merge_hjson(self, hjson_data) -> None:
def _merge_hjson(self, hjson_data: Mapping) -> None:
"""Take hjson data and merge it into self.__dict__.

Subclasses that need to do something just before the merge should
Expand All @@ -162,7 +165,7 @@ def _merge_hjson(self, hjson_data) -> None:
set_target_attribute(self.flow_cfg_file, self.__dict__, key, value)

def _expand(self) -> None:
"""Called to expand wildcards after merging hjson.
"""Expand wildcards after merging hjson.

Subclasses can override this to do something just before expansion.

Expand Down Expand Up @@ -237,8 +240,9 @@ def _load_child_cfg(self, entry, mk_config) -> None:
)
sys.exit(1)

def _conv_inline_cfg_to_hjson(self, idict):
def _conv_inline_cfg_to_hjson(self, idict: Mapping) -> str | None:
"""Dump a temp hjson file in the scratch space from input dict.

This method is to be called only by a primary cfg.
"""
if not self.is_primary_cfg:
Expand All @@ -259,8 +263,10 @@ def _conv_inline_cfg_to_hjson(self, idict):

# Create the file and dump the dict as hjson
log.verbose('Dumping inline cfg "%s" in hjson to:\n%s', name, temp_cfg_file)

try:
Path(temp_cfg_file).write_text(hjson.dumps(idict, for_json=True))

except Exception as e:
log.exception(
'Failed to hjson-dump temp cfg file"%s" for "%s"(will be skipped!) due to:\n%s',
Expand Down Expand Up @@ -332,6 +338,7 @@ def _do_override(self, ov_name: str, ov_value: object) -> None:
log.error('Override key "%s" not found in the cfg!', ov_name)
sys.exit(1)

@abstractmethod
def _purge(self) -> None:
"""Purge the existing scratch areas in preparation for the new run."""

Expand All @@ -340,6 +347,7 @@ def purge(self) -> None:
for item in self.cfgs:
item._purge()

@abstractmethod
def _print_list(self) -> None:
"""Print the list of available items that can be kicked off."""

Expand Down Expand Up @@ -370,12 +378,13 @@ def prune_selected_cfgs(self) -> None:
# Filter configurations
self.cfgs = [c for c in self.cfgs if c.name in self.select_cfgs]

@abstractmethod
def _create_deploy_objects(self) -> None:
"""Create deploy objects from items that were passed on for being run.

The deploy objects for build and run are created from the objects that
were created from the create_objects() method.
"""
return

def create_deploy_objects(self) -> None:
"""Public facing API for _create_deploy_objects()."""
Expand All @@ -389,7 +398,7 @@ def create_deploy_objects(self) -> None:
for item in self.cfgs:
item._create_deploy_objects()

def deploy_objects(self):
def deploy_objects(self) -> Mapping[Deploy, str]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely not for this PR (but I was reading this file...) I wonder whether this function name needs to be changed to something like get_deploy_objects or maybe whether it should be turned into a property?

At the moment, it's a bit sad that "deploy" is a verb as well as an adjective.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes makes sense, I'd like to eventually refactor the whole thing with tests.

"""Public facing API for deploying all available objects.

Runs each job and returns a map from item to status.
Expand All @@ -402,21 +411,26 @@ def deploy_objects(self):
log.error("Nothing to run!")
sys.exit(1)

return Scheduler(deploy, get_launcher_cls(), self.interactive).run()
return Scheduler(
items=deploy,
launcher_cls=get_launcher_cls(),
interactive=self.interactive,
).run()

def _gen_results(self, results: Mapping[Deploy, str]) -> None:
"""Generate results.
@abstractmethod
def _gen_results(self, results: Mapping[Deploy, str]) -> str:
"""Generate flow results.

The function is called after the flow has completed. It collates the
status of all run targets and generates a dict. It parses the log
The function is called after the flow has completed. It collates
the status of all run targets and generates a dict. It parses the log
to identify the errors, warnings and failures as applicable. It also
prints the full list of failures for debug / triage to the final
report, which is in markdown format.

results should be a dictionary mapping deployed item to result.
"""

def gen_results(self, results) -> None:
def gen_results(self, results: Mapping[Deploy, str]) -> None:
"""Public facing API for _gen_results().

results should be a dictionary mapping deployed item to result.
Expand All @@ -437,6 +451,7 @@ def gen_results(self, results) -> None:
self.gen_results_summary()
self.write_results(self.results_html_name, self.results_summary_md)

@abstractmethod
def gen_results_summary(self) -> None:
"""Public facing API to generate summary results for each IP/cfg file."""

Expand Down Expand Up @@ -468,4 +483,5 @@ def _get_results_page_link(self, relative_to: str, link_text: str = "") -> str:
return f"[{link_text}]({relative_link})"

def has_errors(self) -> bool:
"""Return error state."""
return self.errors_seen
47 changes: 21 additions & 26 deletions src/dvsim/flow/sim.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@

"""Class describing simulation configuration object."""

import collections
import fnmatch
import json
import os
import re
import sys
from collections import OrderedDict
from collections import OrderedDict, defaultdict
from collections.abc import Mapping
from datetime import datetime, timezone
from pathlib import Path
from typing import ClassVar

from tabulate import tabulate

from dvsim.flow.base import FlowCfg
from dvsim.job.deploy import CompileSim, CovAnalyze, CovMerge, CovReport, CovUnr, RunTest
from dvsim.job.deploy import CompileSim, CovAnalyze, CovMerge, CovReport, CovUnr, Deploy, RunTest
from dvsim.logging import log
from dvsim.modes import BuildMode, Mode, RunMode, find_mode
from dvsim.regression import Regression
Expand Down Expand Up @@ -327,7 +327,7 @@ def _print_list(self) -> None:
log.info(mode_name)

def _create_build_and_run_list(self) -> None:
"""Generates a list of deployable objects from the provided items.
"""Generate a list of deployable objects from the provided items.

Tests to be run are provided with --items switch. These can be glob-
style patterns. This method finds regressions and tests that match
Expand Down Expand Up @@ -562,20 +562,13 @@ def cov_unr(self) -> None:
for item in self.cfgs:
item._cov_unr()

def _gen_json_results(self, run_results):
"""Returns the run results as json-formatted dictionary."""

def _empty_str_as_none(s: str) -> str | None:
"""Map an empty string to None and retain the value of a non-empty
string.

This is intended to clearly distinguish an empty string, which may
or may not be an valid value, from an invalid value.
"""
return s if s != "" else None
def _gen_json_results(self, run_results: Mapping[Deploy, str]) -> str:
"""Return the run results as json-formatted dictionary."""

def _pct_str_to_float(s: str) -> float | None:
"""Map a percentage value stored in a string with ` %` suffix to a
"""Extract percent or None.

Map a percentage value stored in a string with ` %` suffix to a
float or to None if the conversion to Float fails.
"""
try:
Expand Down Expand Up @@ -608,7 +601,7 @@ def _test_result_to_dict(tr) -> dict:
# Describe name of hardware block targeted by this run and optionally
# the variant of the hardware block.
results["block_name"] = self.name.lower()
results["block_variant"] = _empty_str_as_none(self.variant.lower())
results["block_variant"] = self.variant.lower() or None

# The timestamp for this run has been taken with `utcnow()` and is
# stored in a custom format. Store it in standard ISO format with
Expand All @@ -620,7 +613,7 @@ def _test_result_to_dict(tr) -> dict:
# Extract Git properties.
m = re.search(r"https://github.com/.+?/tree/([0-9a-fA-F]+)", self.revision)
results["git_revision"] = m.group(1) if m else None
results["git_branch_name"] = _empty_str_as_none(self.branch)
results["git_branch_name"] = self.branch or None

# Describe type of report and tool used.
results["report_type"] = "simulation"
Expand Down Expand Up @@ -704,7 +697,7 @@ def _test_result_to_dict(tr) -> dict:
if sim_results.buckets:
by_tests = sorted(sim_results.buckets.items(), key=lambda i: len(i[1]), reverse=True)
for bucket, tests in by_tests:
unique_tests = collections.defaultdict(list)
unique_tests = defaultdict(list)
for test, line, context in tests:
if not isinstance(test, RunTest):
continue
Expand Down Expand Up @@ -743,16 +736,18 @@ def _test_result_to_dict(tr) -> dict:
# Return the `results` dictionary as json string.
return json.dumps(self.results_dict)

def _gen_results(self, run_results):
"""The function is called after the regression has completed. It collates the
def _gen_results(self, results: Mapping[Deploy, str]) -> str:
"""Generate simulation results.

The function is called after the regression has completed. It collates the
status of all run targets and generates a dict. It parses the testplan and
maps the generated result to the testplan entries to generate a final table
(list). It also prints the full list of failures for debug / triage. If cov
is enabled, then the summary coverage report is also generated. The final
result is in markdown format.
"""

def indent_by(level):
def indent_by(level: int) -> str:
return " " * (4 * level)

def create_failure_message(test, line, context):
Expand All @@ -769,7 +764,7 @@ def create_failure_message(test, line, context):
return message

def create_bucket_report(buckets):
"""Creates a report based on the given buckets.
"""Create a report based on the given buckets.

The buckets are sorted by descending number of failures. Within
buckets this also group tests by unqualified name, and just a few
Expand All @@ -787,7 +782,7 @@ def create_bucket_report(buckets):
fail_msgs = ["\n## Failure Buckets", ""]
for bucket, tests in by_tests:
fail_msgs.append(f"* `{bucket}` has {len(tests)} failures:")
unique_tests = collections.defaultdict(list)
unique_tests = defaultdict(list)
for test, line, context in tests:
unique_tests[test.name].append((test, line, context))
for name, test_reseeds in list(unique_tests.items())[:_MAX_UNIQUE_TESTS]:
Expand All @@ -812,7 +807,7 @@ def create_bucket_report(buckets):
return fail_msgs

deployed_items = self.deploy
results = SimResults(deployed_items, run_results)
results = SimResults(deployed_items, results)

# Generate results table for runs.
results_str = "## " + self.results_title + "\n"
Expand Down Expand Up @@ -881,7 +876,7 @@ def create_bucket_report(buckets):

# Append coverage results if coverage was enabled.
if self.cov_report_deploy is not None:
report_status = run_results[self.cov_report_deploy]
report_status = results[self.cov_report_deploy]
if report_status == "P":
results_str += "\n## Coverage Results\n"
# Link the dashboard page using "cov_report_page" value.
Expand Down
5 changes: 3 additions & 2 deletions src/dvsim/job/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

if TYPE_CHECKING:
from dvsim.flow.sim import SimCfg
from dvsim.launcher.base import Launcher


class Deploy:
Expand Down Expand Up @@ -92,7 +93,7 @@ def __init__(self, sim_cfg: "SimCfg") -> None:
self.cmd = self._construct_cmd()

# Launcher instance created later using create_launcher() method.
self.launcher = None
self.launcher: Launcher | None = None

# Job's wall clock time (a.k.a CPU time, or runtime).
self.job_runtime = JobTime()
Expand Down Expand Up @@ -484,7 +485,7 @@ class RunTest(Deploy):
fixed_seed = None
cmds_list_vars = ["pre_run_cmds", "post_run_cmds"]

def __init__(self, index, test, build_job, sim_cfg) -> None:
def __init__(self, index, test, build_job, sim_cfg: "SimCfg") -> None:
self.test_obj = test
self.index = index
self.build_seed = sim_cfg.build_seed
Expand Down
8 changes: 4 additions & 4 deletions src/dvsim/launcher/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
EDACLOUD_LAUNCHER_EXISTS = False

# The chosen launcher class.
_LAUNCHER_CLS = None
_LAUNCHER_CLS: type[Launcher] | None = None


def set_launcher_type(is_local=False) -> None:
"""Sets the launcher type that will be used to launch the jobs.
def set_launcher_type(is_local: bool = False) -> None:
"""Set the launcher type that will be used to launch the jobs.

The env variable `DVSIM_LAUNCHER` is used to identify what launcher system
to use. This variable is specific to the user's work site. It is meant to
Expand Down Expand Up @@ -66,7 +66,7 @@ def set_launcher_type(is_local=False) -> None:
_LAUNCHER_CLS = LocalLauncher


def get_launcher_cls():
def get_launcher_cls() -> type[Launcher]:
"""Returns the chosen launcher class."""
assert _LAUNCHER_CLS is not None
return _LAUNCHER_CLS
Expand Down
Loading
Loading