|
| 1 | +from __future__ import annotations |
| 2 | + |
| 3 | +import datetime |
| 4 | +import logging |
| 5 | +import subprocess |
| 6 | +import sys |
| 7 | +import threading |
| 8 | +from pathlib import PurePath |
| 9 | +from types import TracebackType |
| 10 | +from typing import IO, Any, Callable, Generator, List, Optional, TextIO, Type, Union |
| 11 | + |
| 12 | +import shiny._utils |
| 13 | + |
| 14 | +__all__ = ( |
| 15 | + "ShinyAppProc", |
| 16 | + "run_shiny_app", |
| 17 | + # For internal use only |
| 18 | + # "shiny_app_gen", |
| 19 | +) |
| 20 | + |
| 21 | + |
| 22 | +class OutputStream: |
| 23 | + """ |
| 24 | + Designed to wrap an IO[str] and accumulate the output using a bg thread |
| 25 | +
|
| 26 | + Also allows for blocking waits for particular lines. |
| 27 | + """ |
| 28 | + |
| 29 | + def __init__(self, io: IO[str], desc: Optional[str] = None): |
| 30 | + self._io = io |
| 31 | + self._closed = False |
| 32 | + self._lines: List[str] = [] |
| 33 | + self._cond = threading.Condition() |
| 34 | + self._thread = threading.Thread( |
| 35 | + group=None, target=self._run, daemon=True, name=desc |
| 36 | + ) |
| 37 | + |
| 38 | + self._thread.start() |
| 39 | + |
| 40 | + def _run(self): |
| 41 | + """ |
| 42 | + Add lines into self._lines in a tight loop. |
| 43 | + """ |
| 44 | + |
| 45 | + try: |
| 46 | + while not self._io.closed: |
| 47 | + try: |
| 48 | + line = self._io.readline() |
| 49 | + except ValueError: |
| 50 | + # This is raised when the stream is closed |
| 51 | + break |
| 52 | + if line != "": |
| 53 | + with self._cond: |
| 54 | + self._lines.append(line) |
| 55 | + self._cond.notify_all() |
| 56 | + finally: |
| 57 | + # If we got here, we're finished reading self._io and need to signal any |
| 58 | + # waiters that we're done and they'll never hear from us again. |
| 59 | + with self._cond: |
| 60 | + self._closed = True |
| 61 | + self._cond.notify_all() |
| 62 | + |
| 63 | + def wait_for(self, predicate: Callable[[str], bool], timeout_secs: float) -> bool: |
| 64 | + """ |
| 65 | + Wait until the predicate returns True for a line in the output. |
| 66 | +
|
| 67 | + Parameters |
| 68 | + ---------- |
| 69 | + predicate |
| 70 | + A function that takes a line of output and returns True if the line |
| 71 | + satisfies the condition. |
| 72 | + timeoutSecs |
| 73 | + How long to wait for the predicate to return True before raising a |
| 74 | + TimeoutError. |
| 75 | + """ |
| 76 | + timeout_at = datetime.datetime.now() + datetime.timedelta(seconds=timeout_secs) |
| 77 | + pos = 0 |
| 78 | + with self._cond: |
| 79 | + while True: |
| 80 | + while pos < len(self._lines): |
| 81 | + if predicate(self._lines[pos]): |
| 82 | + return True |
| 83 | + pos += 1 |
| 84 | + if self._closed: |
| 85 | + return False |
| 86 | + else: |
| 87 | + remaining = (timeout_at - datetime.datetime.now()).total_seconds() |
| 88 | + if remaining < 0 or not self._cond.wait(timeout=remaining): |
| 89 | + # Timed out |
| 90 | + raise TimeoutError( |
| 91 | + "Timeout while waiting for Shiny app to become ready" |
| 92 | + ) |
| 93 | + |
| 94 | + def __str__(self): |
| 95 | + with self._cond: |
| 96 | + return "".join(self._lines) |
| 97 | + |
| 98 | + |
| 99 | +def dummyio() -> TextIO: |
| 100 | + io = TextIO() |
| 101 | + io.close() |
| 102 | + return io |
| 103 | + |
| 104 | + |
| 105 | +class ShinyAppProc: |
| 106 | + """ |
| 107 | + Class that represents a running Shiny app process. |
| 108 | +
|
| 109 | + This class is a context manager that can be used to run a Shiny app in a subprocess. It provides a way to interact |
| 110 | + with the app and terminate it when it is no longer needed. |
| 111 | + """ |
| 112 | + |
| 113 | + file: PurePath |
| 114 | + """The path to the Shiny app file.""" |
| 115 | + proc: subprocess.Popen[str] |
| 116 | + """The subprocess object that represents the running Shiny app.""" |
| 117 | + port: int |
| 118 | + """The port that the Shiny app is running on.""" |
| 119 | + url: str |
| 120 | + """The URL that the Shiny app is running on.""" |
| 121 | + stdout: OutputStream |
| 122 | + """The standard output stream of the Shiny app subprocess.""" |
| 123 | + stderr: OutputStream |
| 124 | + """The standard error stream of the Shiny app subprocess.""" |
| 125 | + |
| 126 | + def __init__( |
| 127 | + self, |
| 128 | + proc: subprocess.Popen[str], |
| 129 | + port: int, |
| 130 | + *, |
| 131 | + app_file: PurePath | str, |
| 132 | + ): |
| 133 | + self.proc = proc |
| 134 | + self.port = port |
| 135 | + self.url = f"http://127.0.0.1:{port}/" |
| 136 | + self.stdout = OutputStream(proc.stdout or dummyio()) |
| 137 | + self.stderr = OutputStream(proc.stderr or dummyio()) |
| 138 | + threading.Thread(group=None, target=self._run, daemon=True).start() |
| 139 | + |
| 140 | + self.file = PurePath(app_file) |
| 141 | + |
| 142 | + def _run(self) -> None: |
| 143 | + self.proc.wait() |
| 144 | + if self.proc.stdout is not None: |
| 145 | + self.proc.stdout.close() |
| 146 | + if self.proc.stderr is not None: |
| 147 | + self.proc.stderr.close() |
| 148 | + |
| 149 | + def close(self) -> None: |
| 150 | + """ |
| 151 | + Closes the connection and terminates the process. |
| 152 | +
|
| 153 | + This method is responsible for closing the connection and terminating the process associated with it. |
| 154 | + """ |
| 155 | + # from time import sleep |
| 156 | + # sleep(0.5) |
| 157 | + self.proc.terminate() |
| 158 | + |
| 159 | + def __enter__(self) -> ShinyAppProc: |
| 160 | + return self |
| 161 | + |
| 162 | + def __exit__( |
| 163 | + self, |
| 164 | + exc_type: Optional[Type[BaseException]], |
| 165 | + exc_value: Optional[BaseException], |
| 166 | + traceback: Optional[TracebackType], |
| 167 | + ): |
| 168 | + self.close() |
| 169 | + |
| 170 | + def wait_until_ready(self, timeout_secs: float) -> None: |
| 171 | + """ |
| 172 | + Waits until the shiny app is ready to serve requests. |
| 173 | +
|
| 174 | + Parameters |
| 175 | + ---------- |
| 176 | + timeout_secs |
| 177 | + The maximum number of seconds to wait for the app to become ready. |
| 178 | +
|
| 179 | + Raises |
| 180 | + ------ |
| 181 | + ConnectionError |
| 182 | + If there is an error while starting the shiny app. |
| 183 | + TimeoutError |
| 184 | + If the shiny app does not become ready within the specified timeout. |
| 185 | + """ |
| 186 | + error_lines: List[str] = [] |
| 187 | + |
| 188 | + def stderr_uvicorn(line: str) -> bool: |
| 189 | + error_lines.append(line) |
| 190 | + if "error while attempting to bind on address" in line: |
| 191 | + raise ConnectionError(f"Error while starting shiny app: `{line}`") |
| 192 | + return "Uvicorn running on" in line |
| 193 | + |
| 194 | + if self.stderr.wait_for(stderr_uvicorn, timeout_secs=timeout_secs): |
| 195 | + return |
| 196 | + else: |
| 197 | + raise TimeoutError( |
| 198 | + "Shiny app exited without ever becoming ready. Waiting for 'Uvicorn running on' in stderr. Last 20 lines of stderr:\n" |
| 199 | + + "\n".join(error_lines[-20:]) |
| 200 | + ) |
| 201 | + |
| 202 | + |
| 203 | +def run_shiny_app( |
| 204 | + app_file: Union[str, PurePath], |
| 205 | + *, |
| 206 | + start_attempts: int = 3, |
| 207 | + port: int = 0, |
| 208 | + cwd: Optional[str] = None, |
| 209 | + wait_for_start: bool = True, |
| 210 | + timeout_secs: float = 30, |
| 211 | + bufsize: int = 64 * 1024, |
| 212 | +) -> ShinyAppProc: |
| 213 | + """ |
| 214 | + Run a Shiny app in a subprocess. |
| 215 | +
|
| 216 | + Parameters |
| 217 | + ---------- |
| 218 | + app_file |
| 219 | + The path to the Shiny app file. |
| 220 | + port |
| 221 | + The port to run the app on. If 0, a random port will be chosen. |
| 222 | + cwd |
| 223 | + The working directory to run the app in. |
| 224 | + wait_for_start |
| 225 | + If True, wait for the app to become ready before returning. |
| 226 | + timeout_secs |
| 227 | + The maximum number of seconds to wait for the app to become ready. |
| 228 | + bufsize |
| 229 | + The buffer size to use for stdout and stderr. |
| 230 | + """ |
| 231 | + shiny_port = port if port != 0 else shiny._utils.random_port() |
| 232 | + |
| 233 | + child = subprocess.Popen( |
| 234 | + [ |
| 235 | + sys.executable, |
| 236 | + "-m", |
| 237 | + "shiny", |
| 238 | + "run", |
| 239 | + "--port", |
| 240 | + str(shiny_port), |
| 241 | + str(app_file), |
| 242 | + ], |
| 243 | + bufsize=bufsize, |
| 244 | + executable=sys.executable, |
| 245 | + stdout=subprocess.PIPE, |
| 246 | + stderr=subprocess.PIPE, |
| 247 | + cwd=cwd, |
| 248 | + encoding="utf-8", |
| 249 | + ) |
| 250 | + |
| 251 | + # TODO: Detect early exit |
| 252 | + |
| 253 | + sa = ShinyAppProc(child, shiny_port, app_file=app_file) |
| 254 | + |
| 255 | + if wait_for_start: |
| 256 | + try: |
| 257 | + sa.wait_until_ready(timeout_secs) |
| 258 | + except ConnectionError as e: |
| 259 | + logging.error(f"Failed to bind to port: {e}") |
| 260 | + |
| 261 | + # Make sure the current process is closed |
| 262 | + sa.close() |
| 263 | + |
| 264 | + start_attempts -= 1 |
| 265 | + if start_attempts < 1: |
| 266 | + # Ran out of attempts! |
| 267 | + raise e |
| 268 | + |
| 269 | + # Try again with a new port! |
| 270 | + return run_shiny_app( |
| 271 | + app_file, |
| 272 | + start_attempts=start_attempts, |
| 273 | + port=port, |
| 274 | + cwd=cwd, |
| 275 | + wait_for_start=wait_for_start, |
| 276 | + timeout_secs=timeout_secs, |
| 277 | + bufsize=bufsize, |
| 278 | + ) |
| 279 | + |
| 280 | + return sa |
| 281 | + |
| 282 | + |
| 283 | +# Internal method to help make fixtures a little easier to write |
| 284 | +# Attempt up to 3 times to start the app, with a random port each time |
| 285 | +def shiny_app_gen( |
| 286 | + app_file: PurePath | str, |
| 287 | + *, |
| 288 | + start_attempts: int = 3, |
| 289 | + port: int = 0, |
| 290 | + cwd: Optional[str] = None, |
| 291 | + # wait_for_start: bool = False, |
| 292 | + timeout_secs: float = 30, |
| 293 | + bufsize: int = 64 * 1024, |
| 294 | +) -> Generator[ShinyAppProc, Any, None]: |
| 295 | + """ |
| 296 | + Run a Shiny app in a subprocess. |
| 297 | +
|
| 298 | + This app will be automatically shut down when the Generator is exhausted. A |
| 299 | + generator is returned so we can utilize the context manager methods of the |
| 300 | + `ShinyAppProc` class (`__enter__` and `__exit__`). This allows for the app to be |
| 301 | + automatically shut down when the context manager exists. (This exit method is not |
| 302 | + possible when returning a ShinyAppProc directly.) |
| 303 | +
|
| 304 | + Parameters |
| 305 | + ---------- |
| 306 | + app |
| 307 | + The path to the Shiny app file. |
| 308 | + start_attempts |
| 309 | + Number of attempts to try and start the Shiny app. If the random port is already |
| 310 | + in use, a new random port will be chosen and another attempt will be made. If |
| 311 | + all attempts have been made, an error will be raised. |
| 312 | + port |
| 313 | + The port to run the app on. If 0, a random port will be chosen. |
| 314 | + cwd |
| 315 | + The working directory to run the app in. |
| 316 | + timeout_secs |
| 317 | + The maximum number of seconds to wait for the app to become ready. |
| 318 | + bufsize |
| 319 | + The buffer size to use for stdout and stderr. |
| 320 | +
|
| 321 | + Yields |
| 322 | + ------ |
| 323 | + : |
| 324 | + A single Shiny app process |
| 325 | + """ |
| 326 | + # wait_for_start |
| 327 | + # If True, wait for the app to become ready before returning. |
| 328 | + |
| 329 | + sa = run_shiny_app( |
| 330 | + app_file, |
| 331 | + wait_for_start=True, |
| 332 | + start_attempts=start_attempts, |
| 333 | + port=port, |
| 334 | + cwd=cwd, |
| 335 | + bufsize=bufsize, |
| 336 | + timeout_secs=timeout_secs, |
| 337 | + ) |
| 338 | + had_connection_error: bool = False |
| 339 | + try: |
| 340 | + with sa: |
| 341 | + yield sa |
| 342 | + except ConnectionError as e: |
| 343 | + had_connection_error = True |
| 344 | + raise e |
| 345 | + finally: |
| 346 | + if not had_connection_error: |
| 347 | + logging.warning("Application output:\n" + str(sa.stderr)) |
0 commit comments