Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/dvsim/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

"""Flow config base class."""

import json
import os
import pprint
import sys
Expand Down Expand Up @@ -411,6 +412,19 @@ def deploy_objects(self) -> Mapping[Deploy, str]:
log.error("Nothing to run!")
sys.exit(1)

if os.environ.get("DVSIM_DEPLOY_DUMP", "true"):
filename = f"deploy_{self.branch}_{self.timestamp}.json"
(Path(self.scratch_root) / filename).write_text(
json.dumps(
# Sort on full name to ensure consistent ordering
sorted(
[d.model_dump() for d in deploy],
key=lambda d: d["full_name"],
),
indent=2,
),
)

return Scheduler(
items=deploy,
launcher_cls=get_launcher_cls(),
Expand Down
21 changes: 21 additions & 0 deletions src/dvsim/job/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,27 @@ def create_launcher(self) -> None:
# Retain the handle to self for lookup & callbacks.
self.launcher = get_launcher(self)

def model_dump(self) -> Mapping:
"""Dump the deployment object to mapping object.

This method matches the interface provided by pydantic models to dump a
subset of the class attributes

Returns:
Representation of a deployment object as a dict.

"""
return {
"full_name": self.full_name,
"type": self.__class__.__name__,
"exports": self.exports,
"interactive": self.sim_cfg.interactive,
"log_path": self.get_log_path(),
"timeout_mins": self.get_timeout_mins(),
"cmd": self.cmd,
"gui": self.gui,
}


class CompileSim(Deploy):
"""Abstraction for building the simulation executable."""
Expand Down
6 changes: 5 additions & 1 deletion src/dvsim/launcher/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,21 @@
import shlex
import subprocess
from pathlib import Path
from typing import TYPE_CHECKING

from dvsim.launcher.base import ErrorMessage, Launcher, LauncherBusyError, LauncherError

if TYPE_CHECKING:
from dvsim.job.deploy import Deploy


class LocalLauncher(Launcher):
"""Implementation of Launcher to launch jobs in the user's local workstation."""

# Poll job's completion status every this many seconds
poll_freq = 0.025

def __init__(self, deploy) -> None:
def __init__(self, deploy: "Deploy") -> None:
"""Initialize common class members."""
super().__init__(deploy)

Expand Down
Loading