diff --git a/src/dvsim/flow/base.py b/src/dvsim/flow/base.py index 7dded27f..619526d5 100644 --- a/src/dvsim/flow/base.py +++ b/src/dvsim/flow/base.py @@ -7,8 +7,10 @@ import os import pprint import sys -from collections.abc import Mapping +from abc import ABC, abstractmethod +from collections.abc import Mapping, Sequence from pathlib import Path +from typing import ClassVar import hjson @@ -27,7 +29,7 @@ # Interface class for extensions. -class FlowCfg: +class FlowCfg(ABC): """Base class for the different flows supported by dvsim.py. The constructor expects some parsed hjson data. Create these objects with @@ -41,9 +43,10 @@ class FlowCfg: # Can be overridden in subclasses to configure which wildcards to ignore # when expanding hjson. - ignored_wildcards = [] + ignored_wildcards: ClassVar = [] def __str__(self) -> str: + """Get string representation of the flow config.""" return pprint.pformat(self.__dict__) def __init__(self, flow_cfg_file, hjson_data, args, mk_config) -> None: @@ -87,7 +90,7 @@ def __init__(self, flow_cfg_file, hjson_data, args, mk_config) -> None: # For a primary cfg, it is the aggregated list of all deploy objects # under self.cfgs. For a non-primary cfg, it is the list of items # slated for dispatch. - self.deploy = [] + self.deploy: Sequence[Deploy] = [] # Timestamp self.timestamp_long = args.timestamp_long @@ -98,7 +101,7 @@ def __init__(self, flow_cfg_file, hjson_data, args, mk_config) -> None: self.rel_path = "" self.results_title = "" self.revision = "" - self.css_file = os.path.join(Path(os.path.realpath(__file__)).parent, "style.css") + self.css_file = Path(__file__).resolve().parent / "style.css" # `self.results_*` below will be updated after `self.rel_path` and # `self.scratch_base_root` variables are updated. 
self.results_dir = "" @@ -151,7 +154,7 @@ def __init__(self, flow_cfg_file, hjson_data, args, mk_config) -> None: # Run any final checks self._post_init() - def _merge_hjson(self, hjson_data) -> None: + def _merge_hjson(self, hjson_data: Mapping) -> None: """Take hjson data and merge it into self.__dict__. Subclasses that need to do something just before the merge should @@ -162,7 +165,7 @@ def _merge_hjson(self, hjson_data) -> None: set_target_attribute(self.flow_cfg_file, self.__dict__, key, value) def _expand(self) -> None: - """Called to expand wildcards after merging hjson. + """Expand wildcards after merging hjson. Subclasses can override this to do something just before expansion. @@ -237,8 +240,9 @@ def _load_child_cfg(self, entry, mk_config) -> None: ) sys.exit(1) - def _conv_inline_cfg_to_hjson(self, idict): + def _conv_inline_cfg_to_hjson(self, idict: Mapping) -> str | None: """Dump a temp hjson file in the scratch space from input dict. + This method is to be called only by a primary cfg. """ if not self.is_primary_cfg: @@ -259,8 +263,10 @@ def _conv_inline_cfg_to_hjson(self, idict): # Create the file and dump the dict as hjson log.verbose('Dumping inline cfg "%s" in hjson to:\n%s', name, temp_cfg_file) + try: Path(temp_cfg_file).write_text(hjson.dumps(idict, for_json=True)) + except Exception as e: log.exception( 'Failed to hjson-dump temp cfg file"%s" for "%s"(will be skipped!) 
due to:\n%s', @@ -332,6 +338,7 @@ def _do_override(self, ov_name: str, ov_value: object) -> None: log.error('Override key "%s" not found in the cfg!', ov_name) sys.exit(1) + @abstractmethod def _purge(self) -> None: """Purge the existing scratch areas in preparation for the new run.""" @@ -340,6 +347,7 @@ def purge(self) -> None: for item in self.cfgs: item._purge() + @abstractmethod def _print_list(self) -> None: """Print the list of available items that can be kicked off.""" @@ -370,12 +378,13 @@ def prune_selected_cfgs(self) -> None: # Filter configurations self.cfgs = [c for c in self.cfgs if c.name in self.select_cfgs] + @abstractmethod def _create_deploy_objects(self) -> None: """Create deploy objects from items that were passed on for being run. + The deploy objects for build and run are created from the objects that were created from the create_objects() method. """ - return def create_deploy_objects(self) -> None: """Public facing API for _create_deploy_objects().""" @@ -389,7 +398,7 @@ def create_deploy_objects(self) -> None: for item in self.cfgs: item._create_deploy_objects() - def deploy_objects(self): + def deploy_objects(self) -> Mapping[Deploy, str]: """Public facing API for deploying all available objects. Runs each job and returns a map from item to status. @@ -402,13 +411,18 @@ def deploy_objects(self): log.error("Nothing to run!") sys.exit(1) - return Scheduler(deploy, get_launcher_cls(), self.interactive).run() + return Scheduler( + items=deploy, + launcher_cls=get_launcher_cls(), + interactive=self.interactive, + ).run() - def _gen_results(self, results: Mapping[Deploy, str]) -> None: - """Generate results. + @abstractmethod + def _gen_results(self, results: Mapping[Deploy, str]) -> str: + """Generate flow results. - The function is called after the flow has completed. It collates the - status of all run targets and generates a dict. It parses the log + The function is called after the flow has completed. 
It collates + the status of all run targets and generates a dict. It parses the log to identify the errors, warnings and failures as applicable. It also prints the full list of failures for debug / triage to the final report, which is in markdown format. @@ -416,7 +430,7 @@ def _gen_results(self, results: Mapping[Deploy, str]) -> None: results should be a dictionary mapping deployed item to result. """ - def gen_results(self, results) -> None: + def gen_results(self, results: Mapping[Deploy, str]) -> None: """Public facing API for _gen_results(). results should be a dictionary mapping deployed item to result. @@ -437,6 +451,7 @@ def gen_results(self, results) -> None: self.gen_results_summary() self.write_results(self.results_html_name, self.results_summary_md) + @abstractmethod def gen_results_summary(self) -> None: """Public facing API to generate summary results for each IP/cfg file.""" @@ -468,4 +483,5 @@ def _get_results_page_link(self, relative_to: str, link_text: str = "") -> str: return f"[{link_text}]({relative_link})" def has_errors(self) -> bool: + """Return error state.""" return self.errors_seen diff --git a/src/dvsim/flow/sim.py b/src/dvsim/flow/sim.py index 2b39c263..dce93af7 100644 --- a/src/dvsim/flow/sim.py +++ b/src/dvsim/flow/sim.py @@ -4,13 +4,13 @@ """Class describing simulation configuration object.""" -import collections import fnmatch import json import os import re import sys -from collections import OrderedDict +from collections import OrderedDict, defaultdict +from collections.abc import Mapping from datetime import datetime, timezone from pathlib import Path from typing import ClassVar @@ -18,7 +18,7 @@ from tabulate import tabulate from dvsim.flow.base import FlowCfg -from dvsim.job.deploy import CompileSim, CovAnalyze, CovMerge, CovReport, CovUnr, RunTest +from dvsim.job.deploy import CompileSim, CovAnalyze, CovMerge, CovReport, CovUnr, Deploy, RunTest from dvsim.logging import log from dvsim.modes import BuildMode, Mode, RunMode, 
find_mode from dvsim.regression import Regression @@ -327,7 +327,7 @@ def _print_list(self) -> None: log.info(mode_name) def _create_build_and_run_list(self) -> None: - """Generates a list of deployable objects from the provided items. + """Generate a list of deployable objects from the provided items. Tests to be run are provided with --items switch. These can be glob- style patterns. This method finds regressions and tests that match @@ -562,20 +562,13 @@ def cov_unr(self) -> None: for item in self.cfgs: item._cov_unr() - def _gen_json_results(self, run_results): - """Returns the run results as json-formatted dictionary.""" - - def _empty_str_as_none(s: str) -> str | None: - """Map an empty string to None and retain the value of a non-empty - string. - - This is intended to clearly distinguish an empty string, which may - or may not be an valid value, from an invalid value. - """ - return s if s != "" else None + def _gen_json_results(self, run_results: Mapping[Deploy, str]) -> str: + """Return the run results as a json-formatted string.""" def _pct_str_to_float(s: str) -> float | None: - """Map a percentage value stored in a string with ` %` suffix to a + """Extract percent or None. + + Map a percentage value stored in a string with ` %` suffix to a float or to None if the conversion to Float fails. """ try: @@ -608,7 +601,7 @@ def _test_result_to_dict(tr) -> dict: # Describe name of hardware block targeted by this run and optionally # the variant of the hardware block. results["block_name"] = self.name.lower() - results["block_variant"] = _empty_str_as_none(self.variant.lower()) + results["block_variant"] = self.variant.lower() or None # The timestamp for this run has been taken with `utcnow()` and is # stored in a custom format. Store it in standard ISO format with @@ -620,7 +613,7 @@ def _test_result_to_dict(tr) -> dict: # Extract Git properties.
m = re.search(r"https://github.com/.+?/tree/([0-9a-fA-F]+)", self.revision) results["git_revision"] = m.group(1) if m else None - results["git_branch_name"] = _empty_str_as_none(self.branch) + results["git_branch_name"] = self.branch or None # Describe type of report and tool used. results["report_type"] = "simulation" @@ -704,7 +697,7 @@ def _test_result_to_dict(tr) -> dict: if sim_results.buckets: by_tests = sorted(sim_results.buckets.items(), key=lambda i: len(i[1]), reverse=True) for bucket, tests in by_tests: - unique_tests = collections.defaultdict(list) + unique_tests = defaultdict(list) for test, line, context in tests: if not isinstance(test, RunTest): continue @@ -743,8 +736,10 @@ def _test_result_to_dict(tr) -> dict: # Return the `results` dictionary as json string. return json.dumps(self.results_dict) - def _gen_results(self, run_results): - """The function is called after the regression has completed. It collates the + def _gen_results(self, results: Mapping[Deploy, str]) -> str: + """Generate simulation results. + + The function is called after the regression has completed. It collates the status of all run targets and generates a dict. It parses the testplan and maps the generated result to the testplan entries to generate a final table (list). It also prints the full list of failures for debug / triage. If cov @@ -752,7 +747,7 @@ def _gen_results(self, run_results): result is in markdown format. """ - def indent_by(level): + def indent_by(level: int) -> str: return " " * (4 * level) def create_failure_message(test, line, context): @@ -769,7 +764,7 @@ def create_failure_message(test, line, context): return message def create_bucket_report(buckets): - """Creates a report based on the given buckets. + """Create a report based on the given buckets. The buckets are sorted by descending number of failures. 
Within buckets this also group tests by unqualified name, and just a few @@ -787,7 +782,7 @@ def create_bucket_report(buckets): fail_msgs = ["\n## Failure Buckets", ""] for bucket, tests in by_tests: fail_msgs.append(f"* `{bucket}` has {len(tests)} failures:") - unique_tests = collections.defaultdict(list) + unique_tests = defaultdict(list) for test, line, context in tests: unique_tests[test.name].append((test, line, context)) for name, test_reseeds in list(unique_tests.items())[:_MAX_UNIQUE_TESTS]: @@ -812,7 +807,7 @@ create_bucket_report(buckets): return fail_msgs deployed_items = self.deploy - results = SimResults(deployed_items, run_results) + run_results, results = results, SimResults(deployed_items, results) # Generate results table for runs. results_str = "## " + self.results_title + "\n" @@ -881,7 +876,7 @@ def create_bucket_report(buckets): # Append coverage results if coverage was enabled. if self.cov_report_deploy is not None: - report_status = run_results[self.cov_report_deploy] + report_status = run_results[self.cov_report_deploy] if report_status == "P": results_str += "\n## Coverage Results\n" # Link the dashboard page using "cov_report_page" value. diff --git a/src/dvsim/job/deploy.py b/src/dvsim/job/deploy.py index 99fd6a79..cebd66be 100644 --- a/src/dvsim/job/deploy.py +++ b/src/dvsim/job/deploy.py @@ -26,6 +26,7 @@ if TYPE_CHECKING: from dvsim.flow.sim import SimCfg + from dvsim.launcher.base import Launcher class Deploy: @@ -92,7 +93,7 @@ def __init__(self, sim_cfg: "SimCfg") -> None: self.cmd = self._construct_cmd() # Launcher instance created later using create_launcher() method. - self.launcher = None + self.launcher: Launcher | None = None # Job's wall clock time (a.k.a CPU time, or runtime).
self.job_runtime = JobTime() @@ -484,7 +485,7 @@ class RunTest(Deploy): fixed_seed = None cmds_list_vars = ["pre_run_cmds", "post_run_cmds"] - def __init__(self, index, test, build_job, sim_cfg) -> None: + def __init__(self, index, test, build_job, sim_cfg: "SimCfg") -> None: self.test_obj = test self.index = index self.build_seed = sim_cfg.build_seed diff --git a/src/dvsim/launcher/factory.py b/src/dvsim/launcher/factory.py index a2120631..4183fad2 100644 --- a/src/dvsim/launcher/factory.py +++ b/src/dvsim/launcher/factory.py @@ -20,11 +20,11 @@ EDACLOUD_LAUNCHER_EXISTS = False # The chosen launcher class. -_LAUNCHER_CLS = None +_LAUNCHER_CLS: type[Launcher] | None = None -def set_launcher_type(is_local=False) -> None: - """Sets the launcher type that will be used to launch the jobs. +def set_launcher_type(is_local: bool = False) -> None: + """Set the launcher type that will be used to launch the jobs. The env variable `DVSIM_LAUNCHER` is used to identify what launcher system to use. This variable is specific to the user's work site. It is meant to @@ -66,7 +66,7 @@ def set_launcher_type(is_local=False) -> None: _LAUNCHER_CLS = LocalLauncher -def get_launcher_cls(): +def get_launcher_cls() -> type[Launcher]: """Returns the chosen launcher class.""" assert _LAUNCHER_CLS is not None return _LAUNCHER_CLS diff --git a/src/dvsim/scheduler.py b/src/dvsim/scheduler.py index 846ccd5e..2c829fa0 100644 --- a/src/dvsim/scheduler.py +++ b/src/dvsim/scheduler.py @@ -2,25 +2,43 @@ # Licensed under the Apache License, Version 2.0, see LICENSE for details. 
# SPDX-License-Identifier: Apache-2.0 +"""Job scheduler.""" + import contextlib import threading +from collections.abc import ( + Mapping, + MutableMapping, + MutableSequence, + MutableSet, + Sequence, +) from signal import SIGINT, SIGTERM, signal +from types import FrameType +from typing import TYPE_CHECKING, Any -from dvsim.launcher.base import LauncherBusyError, LauncherError +from dvsim.job.deploy import Deploy +from dvsim.launcher.base import Launcher, LauncherBusyError, LauncherError from dvsim.logging import log from dvsim.utils.status_printer import get_status_printer from dvsim.utils.timer import Timer +if TYPE_CHECKING: + from dvsim.flow.base import FlowCfg + -# Sum of lengths of all lists in the given dict. -def sum_dict_lists(d): - """Given a dict whose key values are lists, return sum of lengths of +def total_sub_items( + d: Mapping[str, Sequence[Deploy]] | Mapping["FlowCfg", Sequence[Deploy]], +) -> int: + """Return the total number of sub items in a mapping. + + Given a dict whose key values are lists, return sum of lengths of these lists. """ - return sum(len(d[k]) for k in d) + return sum(len(v) for v in d.values()) -def get_next_item(arr, index): +def get_next_item(arr: Sequence, index: int) -> tuple[Any, int]: """Perpetually get an item from a list. Returns the next item on the list by advancing the index by 1. If the index @@ -42,7 +60,7 @@ def get_next_item(arr, index): item = arr[index] except IndexError: msg = "List is empty!" 
- raise IndexError(msg) + raise IndexError(msg) from None return item, index @@ -50,8 +68,15 @@ def get_next_item(arr, index): class Scheduler: """An object that runs one or more Deploy items.""" - def __init__(self, items, launcher_cls, interactive) -> None: - self.items = items + def __init__( + self, + items: Sequence[Deploy], + launcher_cls: type[Launcher], + *, + interactive: bool, + ) -> None: + """Initialise a job scheduler.""" + self.items: Sequence[Deploy] = items # 'scheduled[target][cfg]' is a list of Deploy objects for the chosen # target and cfg. As items in _scheduled are ready to be run (once @@ -59,7 +84,7 @@ def __init__(self, items, launcher_cls, interactive) -> None: # they wait until slots are available for them to be dispatched. # When all items (in all cfgs) of a target are done, it is removed from # this dictionary. - self._scheduled = {} + self._scheduled: MutableMapping[str, MutableMapping[FlowCfg, MutableSequence[Deploy]]] = {} self.add_to_scheduled(items) # Print status periodically using an external status printer. @@ -79,12 +104,12 @@ def __init__(self, items, launcher_cls, interactive) -> None: # This is done to allow us to poll a smaller subset of jobs rather than # the entire regression. We keep rotating through our list of running # items, picking up where we left off on the last poll. 
- self._targets = list(self._scheduled.keys()) - self._queued = {} - self._running = {} - self._passed = {} - self._failed = {} - self._killed = {} + self._targets: Sequence[str] = list(self._scheduled.keys()) + self._queued: MutableMapping[str, MutableSequence[Deploy]] = {} + self._running: MutableMapping[str, MutableSequence[Deploy]] = {} + self._passed: MutableMapping[str, MutableSet[Deploy]] = {} + self._failed: MutableMapping[str, MutableSet[Deploy]] = {} + self._killed: MutableMapping[str, MutableSet[Deploy]] = {} self._total = {} self.last_target_polled_idx = -1 self.last_item_polled_idx = {} @@ -94,7 +119,7 @@ def __init__(self, items, launcher_cls, interactive) -> None: self._passed[target] = set() self._failed[target] = set() self._killed[target] = set() - self._total[target] = sum_dict_lists(self._scheduled[target]) + self._total[target] = total_sub_items(self._scheduled[target]) self.last_item_polled_idx[target] = -1 # Stuff for printing the status. @@ -111,7 +136,7 @@ def __init__(self, items, launcher_cls, interactive) -> None: # current status. This status is 'Q', 'D', 'P', 'F' or 'K', # corresponding to membership in the dicts above. This is not # per-target. - self.item_to_status = {} + self.item_to_status: MutableMapping[Deploy, str] = {} # Create the launcher instance for all items. for item in self.items: @@ -119,9 +144,9 @@ def __init__(self, items, launcher_cls, interactive) -> None: # The chosen launcher class. This allows us to access launcher # variant-specific settings such as max parallel jobs & poll rate. - self.launcher_cls = launcher_cls + self.launcher_cls: type[Launcher] = launcher_cls - def run(self): + def run(self) -> Mapping[Deploy, str]: """Run all scheduled jobs and return the results. 
Returns the results (status) of all items dispatched for all @@ -133,7 +158,7 @@ def run(self): stop_now = threading.Event() old_handler = None - def on_signal(signal_received, frame) -> None: + def on_signal(signal_received: int, _: FrameType | None) -> None: log.info( "Received signal %s. Exiting gracefully.", signal_received, @@ -146,7 +171,9 @@ def on_signal(signal_received, frame) -> None: ) # Restore old handler to catch a second SIGINT - assert old_handler is not None + if old_handler is None: + raise RuntimeError("Old SIGINT handler not found") + signal(signal_received, old_handler) stop_now.set() @@ -186,10 +213,12 @@ def on_signal(signal_received, frame) -> None: # We got to the end without anything exploding. Return the results. return self.item_to_status - def add_to_scheduled(self, items) -> None: - """Add items to the list of _scheduled. + def add_to_scheduled(self, items: Sequence[Deploy]) -> None: + """Add items to the schedule. + + Args: + items: Deploy objects to add to the schedule. - 'items' is a list of Deploy objects. """ for item in items: target_dict = self._scheduled.setdefault(item.target, {}) @@ -197,45 +226,20 @@ def add_to_scheduled(self, items) -> None: if item not in cfg_list: cfg_list.append(item) - def _remove_from_scheduled(self, item) -> None: - """Removes the item from _scheduled[target][cfg] list. - - When all items in _scheduled[target][cfg] are finally removed, the cfg - key is deleted. - """ + def _unschedule_item(self, item: Deploy) -> None: + """Remove deploy item from the schedule.""" target_dict = self._scheduled[item.target] cfg_list = target_dict.get(item.sim_cfg) if cfg_list is not None: with contextlib.suppress(ValueError): cfg_list.remove(item) + + # When all items in _scheduled[target][cfg] are finally removed, + # the cfg key is deleted. if not cfg_list: del target_dict[item.sim_cfg] - def _get_next_target(self, curr_target): - """Returns the target that succeeds the current one. 
- - curr_target is the target of the job that just completed (example - - build). If it is None, then the first target in _scheduled is returned. - """ - if curr_target is None: - return next(iter(self._scheduled)) - - assert curr_target in self._scheduled - target_iterator = iter(self._scheduled) - target = next(target_iterator) - - found = False - while not found: - if target == curr_target: - found = True - try: - target = next(target_iterator) - except StopIteration: - return None - - return target - - def _enqueue_successors(self, item=None) -> None: + def _enqueue_successors(self, item: Deploy | None = None) -> None: """Move an item's successors from _scheduled to _queued. 'item' is the recently run job that has completed. If None, then we @@ -248,19 +252,20 @@ def _enqueue_successors(self, item=None) -> None: assert next_item not in self._queued[next_item.target] self.item_to_status[next_item] = "Q" self._queued[next_item.target].append(next_item) - self._remove_from_scheduled(next_item) + self._unschedule_item(next_item) + + def _cancel_successors(self, item: Deploy) -> None: + """Cancel an item's successors. - def _cancel_successors(self, item) -> None: - """Cancel an item's successors recursively by moving them from - _scheduled or _queued to _killed. + Recursively move them from _scheduled or _queued to _killed. """ - items = self._get_successors(item) + items = list(self._get_successors(item)) while items: next_item = items.pop() self._cancel_item(next_item, cancel_successors=False) items.extend(self._get_successors(next_item)) - def _get_successors(self, item=None): + def _get_successors(self, item: Deploy | None = None) -> Sequence[Deploy]: """Find immediate successors of an item. 'item' is a job that has completed. We choose the target that follows @@ -273,14 +278,34 @@ def _get_successors(self, item=None): none. 
""" if item is None: - target = self._get_next_target(None) + target = next(iter(self._scheduled)) + + if target is None: + return [] + cfgs = set(self._scheduled[target]) + else: - target = self._get_next_target(item.target) - cfgs = {item.sim_cfg} + if item.target not in self._scheduled: + msg = f"Scheduler does not contain target {item.target}" + raise KeyError(msg) + + target_iterator = iter(self._scheduled) + target = next(target_iterator) + + found = False + while not found: + if target == item.target: + found = True + try: + target = next(target_iterator) + except StopIteration: + return [] - if target is None: - return [] + if target is None: + return [] + + cfgs = {item.sim_cfg} # Find item's successors that can be enqueued. We assume here that # only the immediately succeeding target can be enqueued at this @@ -300,8 +325,8 @@ def _get_successors(self, item=None): return successors - def _ok_to_enqueue(self, item) -> bool: - """Returns true if ALL dependencies of item are complete.""" + def _ok_to_enqueue(self, item: Deploy) -> bool: + """Return true if ALL dependencies of item are complete.""" for dep in item.dependencies: # Ignore dependencies that were not scheduled to run. if dep not in self.items: @@ -317,8 +342,8 @@ def _ok_to_enqueue(self, item) -> bool: return True - def _ok_to_run(self, item): - """Returns true if the required dependencies have passed. + def _ok_to_run(self, item: Deploy) -> bool: + """Return true if the required dependencies have passed. 
The item's needs_all_dependencies_passing setting is used to figure out whether we can run this item or not, based on its dependent jobs' @@ -332,24 +357,26 @@ def _ok_to_run(self, item): continue dep_status = self.item_to_status[dep] - assert dep_status in ["P", "F", "K"] + if dep_status not in ["P", "F", "K"]: + raise ValueError("Status must be one of P, F, or K") if item.needs_all_dependencies_passing: if dep_status in ["F", "K"]: return False + elif dep_status in ["P"]: return True return item.needs_all_dependencies_passing - def _poll(self, hms): + def _poll(self, hms: str) -> bool: """Check for running items that have finished. Returns True if something changed. """ max_poll = min( self.launcher_cls.max_poll, - sum_dict_lists(self._running), + total_sub_items(self._running), ) # If there are no jobs running, we are likely done (possibly because @@ -374,15 +401,20 @@ def _poll(self, hms): status = item.launcher.poll() level = log.VERBOSE - assert status in ["D", "P", "F", "E", "K"] + if status not in ["D", "P", "F", "E", "K"]: + msg = f"Status must be one of D, P, F, E or K but found {status}" + raise ValueError(msg) + if status == "D": continue if status == "P": self._passed[target].add(item) + elif status == "F": self._failed[target].add(item) level = log.ERROR + else: # Killed or Error dispatching self._killed[target].add(item) @@ -391,6 +423,7 @@ def _poll(self, hms): self._running[target].pop(self.last_item_polled_idx[target]) self.last_item_polled_idx[target] -= 1 self.item_to_status[item] = status + log.log( level, "[%s]: [%s]: [status] [%s: %s]", @@ -412,9 +445,9 @@ def _poll(self, hms): return changed - def _dispatch(self, hms) -> None: + def _dispatch(self, hms: str) -> None: """Dispatch some queued items if possible.""" - slots = self.launcher_cls.max_parallel - sum_dict_lists(self._running) + slots = self.launcher_cls.max_parallel - total_sub_items(self._running) if slots <= 0: return @@ -478,12 +511,12 @@ def _dispatch(self, hms) -> None: try: 
item.launcher.launch() - except LauncherError as err: - log.exception(err.msg) + except LauncherError: + log.exception("Error launching %s", item) self._kill_item(item) - except LauncherBusyError as err: - log.exception("Launcher busy: %s", err) + except LauncherBusyError: + log.exception("Launcher busy") self._queued[target].append(item) @@ -512,7 +545,7 @@ def _kill(self) -> None: for item in list(self._running[target]): self._kill_item(item) - def _check_if_done(self, hms): + def _check_if_done(self, hms: str) -> bool: """Check if we are done executing all jobs. Also, prints the status of currently running jobs. @@ -554,7 +587,7 @@ def _check_if_done(self, hms): ) return done - def _cancel_item(self, item, cancel_successors=True) -> None: + def _cancel_item(self, item: Deploy, *, cancel_successors: bool = True) -> None: """Cancel an item and optionally all of its successors. Supplied item may be in _scheduled list or the _queued list. From @@ -565,12 +598,12 @@ def _cancel_item(self, item, cancel_successors=True) -> None: if item in self._queued[item.target]: self._queued[item.target].remove(item) else: - self._remove_from_scheduled(item) + self._unschedule_item(item) if cancel_successors: self._cancel_successors(item) - def _kill_item(self, item) -> None: + def _kill_item(self, item: Deploy) -> None: """Kill a running item and cancel all of its successors.""" item.launcher.kill() self.item_to_status[item] = "K"