From 20d67a992f2ba779ee9a297d373612591a9df82e Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Mon, 21 Aug 2023 14:53:23 -0400 Subject: [PATCH 1/5] Make Session `on_flush()` and `on_flushed()` accept async functions Execute the functions asynchronously, one after another --- shiny/session/_session.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/shiny/session/_session.py b/shiny/session/_session.py index d14cec0dd..abe47619d 100644 --- a/shiny/session/_session.py +++ b/shiny/session/_session.py @@ -43,6 +43,7 @@ from .._fileupload import FileInfo, FileUploadManager from .._namespaces import Id, ResolvedId, Root from .._typing_extensions import TypedDict +from .._utils import wrap_async from ..http_staticfiles import FileResponse from ..input_handler import input_handlers from ..reactive import Effect, Effect_, Value, flush, isolate @@ -203,8 +204,8 @@ def __init__( self._register_session_end_callbacks() - self._flush_callbacks = _utils.Callbacks() - self._flushed_callbacks = _utils.Callbacks() + self._flush_callbacks = _utils.AsyncCallbacks() + self._flushed_callbacks = _utils.AsyncCallbacks() def _register_session_end_callbacks(self) -> None: # This is to be called from the initialization. It registers functions @@ -632,7 +633,11 @@ def _send_error_response(self, message_str: str) -> None: # Flush # ========================================================================== @add_example() - def on_flush(self, fn: Callable[[], None], once: bool = True) -> Callable[[], None]: + def on_flush( + self, + fn: Callable[[], None] | Callable[[], Awaitable[None]], + once: bool = True, + ) -> Callable[[], None]: """ Register a function to call before the next reactive flush. @@ -648,11 +653,13 @@ def on_flush(self, fn: Callable[[], None], once: bool = True) -> Callable[[], No : A function that can be used to cancel the registration. """ - return self._flush_callbacks.register(fn, once) + return self._flush_callbacks.register(wrap_async(fn), once) @add_example() def on_flushed( - self, fn: Callable[[], None], once: bool = True + self, + fn: Callable[[], None] | Callable[[], Awaitable[None]], + once: bool = True, ) -> Callable[[], None]: """ Register a function to call after the next reactive flush. @@ -669,14 +676,14 @@ def on_flushed( : A function that can be used to cancel the registration. """ - return self._flushed_callbacks.register(fn, once) + return self._flushed_callbacks.register(wrap_async(fn), once) def _request_flush(self) -> None: self.app._request_flush(self) async def _flush(self) -> None: with session_context(self): - self._flush_callbacks.invoke() + await self._flush_callbacks.invoke() try: omq = self._outbound_message_queues @@ -701,7 +708,7 @@ async def _flush(self) -> None: self._outbound_message_queues = empty_outbound_message_queues() finally: with session_context(self): - self._flushed_callbacks.invoke() + await self._flushed_callbacks.invoke() # ========================================================================== # On session ended From 751a6a935a44d28067bcf4f797a6abe996d58c7d Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Mon, 21 Aug 2023 15:00:11 -0400 Subject: [PATCH 2/5] Async-ify Session `on_ended()` tasks --- shiny/session/_session.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/shiny/session/_session.py b/shiny/session/_session.py index abe47619d..f3b0af941 100644 --- a/shiny/session/_session.py +++ b/shiny/session/_session.py @@ -197,7 +197,7 @@ def __init__( str, Callable[..., Awaitable[object]] ] = self._create_message_handlers() self._file_upload_manager: FileUploadManager = FileUploadManager() - self._on_ended_callbacks = _utils.Callbacks() + self._on_ended_callbacks = _utils.AsyncCallbacks() self._has_run_session_end_tasks: bool = False self._downloads: dict[str, DownloadInfo] = {} self._dynamic_routes: dict[str, DynamicRouteHandler] = {} @@ -214,13 +214,13 @@ def _register_session_end_callbacks(self) -> None: # Clear file upload directories, if present self.on_ended(self._file_upload_manager.rm_upload_dir) - def _run_session_end_tasks(self) -> None: + async def _run_session_end_tasks(self) -> None: if self._has_run_session_end_tasks: return self._has_run_session_end_tasks = True try: - self._on_ended_callbacks.invoke() + await self._on_ended_callbacks.invoke() finally: self.app._remove_session(self) @@ -229,7 +229,7 @@ async def close(self, code: int = 1001) -> None: Close the session. """ await self._conn.close(code, None) - self._run_session_end_tasks() + await self._run_session_end_tasks() async def _run(self) -> None: conn_state: ConnectionState = ConnectionState.Start @@ -319,7 +319,7 @@ def verify_state(expected_state: ConnectionState) -> None: finally: await self.close() finally: - self._run_session_end_tasks() + await self._run_session_end_tasks() def _manage_inputs(self, data: dict[str, object]) -> None: for key, val in data.items(): @@ -714,7 +714,10 @@ async def _flush(self) -> None: # On session ended # ========================================================================== @add_example() - def on_ended(self, fn: Callable[[], None]) -> Callable[[], None]: + def on_ended( + self, + fn: Callable[[], None] | Callable[[], Awaitable[None]], + ) -> Callable[[], None]: """ Registers a function to be called after the client has disconnected. @@ -728,7 +731,7 @@ def on_ended(self, fn: Callable[[], None]) -> Callable[[], None]: : A function that can be used to cancel the registration. """ - return self._on_ended_callbacks.register(fn) + return self._on_ended_callbacks.register(wrap_async(fn)) # ========================================================================== # Misc From 06e707052b787b262704e9f6691d5d9c5d27ebc0 Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Tue, 22 Aug 2023 16:08:01 -0400 Subject: [PATCH 3/5] Add e2e test of async server flush and flushed events --- tests/e2e/session/flush/app.py | 163 +++++++++++++++++++++++ tests/e2e/session/flush/test_on_flush.py | 22 +++ 2 files changed, 185 insertions(+) create mode 100644 tests/e2e/session/flush/app.py create mode 100644 tests/e2e/session/flush/test_on_flush.py diff --git a/tests/e2e/session/flush/app.py b/tests/e2e/session/flush/app.py new file mode 100644 index 000000000..d428c473d --- /dev/null +++ b/tests/e2e/session/flush/app.py @@ -0,0 +1,163 @@ +from __future__ import annotations + +import asyncio + +from shiny import App, Inputs, Outputs, Session, reactive, render, ui + +app_ui = ui.page_fluid( + ui.markdown( + """ + # `session.on_flush` and `session.on_flushed` Reprex + + Verify that `on_flush` and `on_flushed` are called in the correct order, and that they can be cancelled, handle, synchronous functions, and handle asynchronous functions. + + It is not safe to put reactivity inside `session.on_flush()` or `session.on_flushed()` callbacks as the reactive graph is not locked. This app breaks that rule to test the behavior as there is only one user. (If there were multiple users, the reactive graph would not be locked could update while waiting for an async callback.) + + The reprex below will click the button twice (click count is `K`), once after 250ms and another after another 250ms. The expected output is: + * `a-K-EVENT` + * `bx-K-first-EVENT` + * `by-K-first-EVENT` + * `bx-K-second-EVENT` + * `by-K-second-EVENT` + * `c-K-EVENT` + + + Even though the `flush` and `flushed` events where mixed when being added, all `flush` events should occur before all `flushed` events. + + Without something to continuously trigger the reactive graph, the `K` value will be `1` less than the click count. To combat this, a reactive event will trigger every 250ms to invoke session `flush` / `flushed` callback. + + ## Automated Reprex: + """ + ), + ui.input_action_button("btn", "Click me!"), + ui.tags.br(), + ui.tags.span("Counter: "), + ui.output_text_verbatim("btn_txt", placeholder=True), + ui.tags.span("All events: "), + ui.output_text_verbatim("all_txt", placeholder=True), + ui.tags.span("Flush events: "), + ui.output_text_verbatim("flush_txt", placeholder=True), + ui.tags.span("Flushed: "), + ui.output_text_verbatim("flushed_txt", placeholder=True), + ui.tags.script( + """ + $(document).on('shiny:connected', function(event) { + const n = 250 + document.querySelector("#btn").click(); + setTimeout(function() { + document.querySelector("#btn").click(); + }, n) + setTimeout(function() { + document.querySelector("#btn").click(); + }, 2 * n) + }); + """ + ), +) + + +def server(input: Inputs, output: Outputs, session: Session): + all_vals: reactive.Value[tuple[str, ...]] = reactive.Value(()) + flush_vals: reactive.Value[tuple[str, ...]] = reactive.Value(()) + flushed_vals: reactive.Value[tuple[str, ...]] = reactive.Value(()) + + def call_a( + vals: reactive.Value[tuple[str, ...]], + suffix: str, + ): + def _(): + with reactive.isolate(): + all_vals.set(all_vals.get() + (f"a-{suffix}",)) + vals.set(vals.get() + (f"a-{suffix}",)) + + return _ + + def call_b( + vals: reactive.Value[tuple[str, ...]], + suffix: str, + ): + async def _(): + with reactive.isolate(): + all_vals.set(all_vals.get() + (f"bx-{suffix}",)) + vals.set(vals.get() + (f"bx-{suffix}",)) + await asyncio.sleep(0) + with reactive.isolate(): + all_vals.set(all_vals.get() + (f"by-{suffix}",)) + vals.set(vals.get() + (f"by-{suffix}",)) + + return _ + + def call_c( + vals: reactive.Value[tuple[str, ...]], + suffix: str, + ): + def _(): + with reactive.isolate(): + all_vals.set(all_vals.get() + (f"c-{suffix}",)) + vals.set(vals.get() + (f"c-{suffix}",)) + + return _ + + # Continuously trigger the reactive graph to ensure that the flush / flushed + # callbacks are called. If this Effect is not called, then the click counter will + # always be one higher than the flush/flushed values displayed. + @reactive.Effect + def _(): + reactive.invalidate_later(0.25) + + @reactive.Effect + @reactive.event(input.btn) + def _(): + btn_count = input.btn() + + def reset(): + all_vals.set(()) + flush_vals.set(()) + flushed_vals.set(()) + + session.on_flush(reset, once=True) + + session.on_flushed(call_a(flushed_vals, f"{btn_count}-flushed"), once=True) + session.on_flushed( + call_b(flushed_vals, f"{btn_count}-first-flushed"), once=True + ) + session.on_flush(call_a(flush_vals, f"{btn_count}-flush"), once=True) + cancel_b_flush = session.on_flush( + call_b(flush_vals, f"{btn_count}-cancel-flush"), once=True + ) + session.on_flush(call_b(flush_vals, f"{btn_count}-first-flush"), once=True) + session.on_flushed( + call_b(flushed_vals, f"{btn_count}-second-flushed"), once=True + ) + session.on_flush(call_b(flush_vals, f"{btn_count}-second-flush"), once=True) + session.on_flushed(call_c(flushed_vals, f"{btn_count}-flushed"), once=True) + cancel_c_flushed = session.on_flushed( + call_c(flushed_vals, f"{btn_count}-cancel-flushed"), once=True + ) + session.on_flush(call_c(flush_vals, f"{btn_count}-flush"), once=True) + + cancel_b_flush() + cancel_c_flushed() + + @output + @render.text + def btn_txt(): + return str(input.btn()) + + @output + @render.text + def all_txt(): + return str(all_vals.get()) + + @output + @render.text + def flush_txt(): + return str(flush_vals.get()) + + @output + @render.text + def flushed_txt(): + return str(flushed_vals.get()) + + +app = App(app_ui, server) diff --git a/tests/e2e/session/flush/test_on_flush.py b/tests/e2e/session/flush/test_on_flush.py new file mode 100644 index 000000000..dc161c509 --- /dev/null +++ b/tests/e2e/session/flush/test_on_flush.py @@ -0,0 +1,22 @@ +from conftest import ShinyAppProc +from controls import OutputTextVerbatim +from playwright.sync_api import Page + + +def test_output_image_kitchen(page: Page, local_app: ShinyAppProc) -> None: + page.goto(local_app.url) + + OutputTextVerbatim(page, "all_txt").expect_value( + "('a-3-flush', 'bx-3-first-flush', 'by-3-first-flush', 'bx-3-second-flush', " + "'by-3-second-flush', 'c-3-flush', 'a-3-flushed', 'bx-3-first-flushed', " + "'by-3-first-flushed', 'bx-3-second-flushed', 'by-3-second-flushed', " + "'c-3-flushed')" + ) + OutputTextVerbatim(page, "flush_txt").expect_value( + "('a-3-flush', 'bx-3-first-flush', 'by-3-first-flush', 'bx-3-second-flush', " + "'by-3-second-flush', 'c-3-flush')" + ) + OutputTextVerbatim(page, "flushed_txt").expect_value( + "('a-3-flushed', 'bx-3-first-flushed', 'by-3-first-flushed', " + "'bx-3-second-flushed', 'by-3-second-flushed', 'c-3-flushed')" + ) From f7d02219772bb48795810c92ffa99cb2720eb1c0 Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Tue, 22 Aug 2023 16:30:01 -0400 Subject: [PATCH 4/5] Test `on_ended()` callbacks --- tests/e2e/session/flush/app.py | 26 ++++++++++++++++++++++++ tests/e2e/session/flush/test_on_flush.py | 20 ++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/tests/e2e/session/flush/app.py b/tests/e2e/session/flush/app.py index d428c473d..8cc62011a 100644 --- a/tests/e2e/session/flush/app.py +++ b/tests/e2e/session/flush/app.py @@ -57,6 +57,30 @@ def server(input: Inputs, output: Outputs, session: Session): + def on_ended_sync(txt: str): + def _(): + print(txt) + + return _ + + def on_ended_async(txt: str): + async def _(): + await asyncio.sleep(0) + print(txt) + + return _ + + cancel_on_ended_sync = session.on_ended( + on_ended_sync("session ended - sync - cancel") + ) + cancel_on_ended_async = session.on_ended( + on_ended_async("session ended - async - cancel") + ) + session.on_ended(on_ended_sync("session ended - sync - test1")) + session.on_ended(on_ended_async("session ended - async - test2")) + session.on_ended(on_ended_async("session ended - async - test3")) + session.on_ended(on_ended_sync("session ended - sync - test4")) + all_vals: reactive.Value[tuple[str, ...]] = reactive.Value(()) flush_vals: reactive.Value[tuple[str, ...]] = reactive.Value(()) flushed_vals: reactive.Value[tuple[str, ...]] = reactive.Value(()) @@ -138,6 +162,8 @@ def reset(): cancel_b_flush() cancel_c_flushed() + cancel_on_ended_sync() + cancel_on_ended_async() @output @render.text diff --git a/tests/e2e/session/flush/test_on_flush.py b/tests/e2e/session/flush/test_on_flush.py index dc161c509..b59feced9 100644 --- a/tests/e2e/session/flush/test_on_flush.py +++ b/tests/e2e/session/flush/test_on_flush.py @@ -20,3 +20,23 @@ def test_output_image_kitchen(page: Page, local_app: ShinyAppProc) -> None: "('a-3-flushed', 'bx-3-first-flushed', 'by-3-first-flushed', " "'bx-3-second-flushed', 'by-3-second-flushed', 'c-3-flushed')" ) + + # Verify `on_ended` callbacks are called in the correct order (and cancelled) + local_app.close() + + # Wait up to 3 seconds for the app to close and print the logs. (Should be ~ instant) + local_app.stdout.wait_for(lambda x: "test4" in x, 3) + stdout = str(local_app.stdout) + out_indexes = [ + stdout.index("session ended - sync - test1"), + stdout.index("session ended - async - test2"), + stdout.index("session ended - async - test3"), + stdout.index("session ended - sync - test4"), + ] + for i in range(len(out_indexes)): + index = out_indexes[i] + assert index >= 0 + # Make sure they are ordered correctly + if i > 0: + prev_index = out_indexes[i - 1] + assert index > prev_index From c1c291318821da2ab9105149848f19355cc656f5 Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Tue, 22 Aug 2023 16:31:38 -0400 Subject: [PATCH 5/5] Update CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index bf2493da9..96323b3ee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### New features * Added `shiny.render.renderer_components` decorator to help create new output renderers. (#621) +* `Session` objects can now accept an asynchronous (or synchronous) function for `.on_flush(fn=)`, `.on_flushed(fn=)`, and `.on_ended(fn=)` (#686). ### Bug fixes