Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### New features

* Added `shiny.render.renderer_components` decorator to help create new output renderers. (#621)
* `Session` objects can now accept an asynchronous (or synchronous) function for `.on_flush(fn=)`, `.on_flushed(fn=)`, and `.on_ended(fn=)` (#686).

### Bug fixes

Expand Down
40 changes: 25 additions & 15 deletions shiny/session/_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from .._fileupload import FileInfo, FileUploadManager
from .._namespaces import Id, ResolvedId, Root
from .._typing_extensions import TypedDict
from .._utils import wrap_async
from ..http_staticfiles import FileResponse
from ..input_handler import input_handlers
from ..reactive import Effect, Effect_, Value, flush, isolate
Expand Down Expand Up @@ -196,15 +197,15 @@ def __init__(
str, Callable[..., Awaitable[object]]
] = self._create_message_handlers()
self._file_upload_manager: FileUploadManager = FileUploadManager()
self._on_ended_callbacks = _utils.Callbacks()
self._on_ended_callbacks = _utils.AsyncCallbacks()
self._has_run_session_end_tasks: bool = False
self._downloads: dict[str, DownloadInfo] = {}
self._dynamic_routes: dict[str, DynamicRouteHandler] = {}

self._register_session_end_callbacks()

self._flush_callbacks = _utils.Callbacks()
self._flushed_callbacks = _utils.Callbacks()
self._flush_callbacks = _utils.AsyncCallbacks()
self._flushed_callbacks = _utils.AsyncCallbacks()

def _register_session_end_callbacks(self) -> None:
# This is to be called from the initialization. It registers functions
Expand All @@ -213,13 +214,13 @@ def _register_session_end_callbacks(self) -> None:
# Clear file upload directories, if present
self.on_ended(self._file_upload_manager.rm_upload_dir)

def _run_session_end_tasks(self) -> None:
async def _run_session_end_tasks(self) -> None:
if self._has_run_session_end_tasks:
return
self._has_run_session_end_tasks = True

try:
self._on_ended_callbacks.invoke()
await self._on_ended_callbacks.invoke()
finally:
self.app._remove_session(self)

Expand All @@ -228,7 +229,7 @@ async def close(self, code: int = 1001) -> None:
Close the session.
"""
await self._conn.close(code, None)
self._run_session_end_tasks()
await self._run_session_end_tasks()

async def _run(self) -> None:
conn_state: ConnectionState = ConnectionState.Start
Expand Down Expand Up @@ -318,7 +319,7 @@ def verify_state(expected_state: ConnectionState) -> None:
finally:
await self.close()
finally:
self._run_session_end_tasks()
await self._run_session_end_tasks()

def _manage_inputs(self, data: dict[str, object]) -> None:
for key, val in data.items():
Expand Down Expand Up @@ -632,7 +633,11 @@ def _send_error_response(self, message_str: str) -> None:
# Flush
# ==========================================================================
@add_example()
def on_flush(self, fn: Callable[[], None], once: bool = True) -> Callable[[], None]:
def on_flush(
self,
fn: Callable[[], None] | Callable[[], Awaitable[None]],
once: bool = True,
) -> Callable[[], None]:
"""
Register a function to call before the next reactive flush.

Expand All @@ -648,11 +653,13 @@ def on_flush(self, fn: Callable[[], None], once: bool = True) -> Callable[[], No
:
A function that can be used to cancel the registration.
"""
return self._flush_callbacks.register(fn, once)
return self._flush_callbacks.register(wrap_async(fn), once)

@add_example()
def on_flushed(
self, fn: Callable[[], None], once: bool = True
self,
fn: Callable[[], None] | Callable[[], Awaitable[None]],
once: bool = True,
) -> Callable[[], None]:
"""
Register a function to call after the next reactive flush.
Expand All @@ -669,14 +676,14 @@ def on_flushed(
:
A function that can be used to cancel the registration.
"""
return self._flushed_callbacks.register(fn, once)
return self._flushed_callbacks.register(wrap_async(fn), once)

def _request_flush(self) -> None:
self.app._request_flush(self)

async def _flush(self) -> None:
with session_context(self):
self._flush_callbacks.invoke()
await self._flush_callbacks.invoke()

try:
omq = self._outbound_message_queues
Expand All @@ -701,13 +708,16 @@ async def _flush(self) -> None:
self._outbound_message_queues = empty_outbound_message_queues()
finally:
with session_context(self):
self._flushed_callbacks.invoke()
await self._flushed_callbacks.invoke()

# ==========================================================================
# On session ended
# ==========================================================================
@add_example()
def on_ended(self, fn: Callable[[], None]) -> Callable[[], None]:
def on_ended(
self,
fn: Callable[[], None] | Callable[[], Awaitable[None]],
) -> Callable[[], None]:
"""
Registers a function to be called after the client has disconnected.

Expand All @@ -721,7 +731,7 @@ def on_ended(self, fn: Callable[[], None]) -> Callable[[], None]:
:
A function that can be used to cancel the registration.
"""
return self._on_ended_callbacks.register(fn)
return self._on_ended_callbacks.register(wrap_async(fn))

# ==========================================================================
# Misc
Expand Down
189 changes: 189 additions & 0 deletions tests/e2e/session/flush/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
from __future__ import annotations

import asyncio

from shiny import App, Inputs, Outputs, Session, reactive, render, ui

app_ui = ui.page_fluid(
ui.markdown(
"""
# `session.on_flush` and `session.on_flushed` Reprex

Verify that `on_flush` and `on_flushed` are called in the correct order, and that they can be cancelled, handle, synchronous functions, and handle asynchronous functions.

It is not safe to put reactivity inside `session.on_flush()` or `session.on_flushed()` callbacks as the reactive graph is not locked. This app breaks that rule to test the behavior as there is only one user. (If there were multiple users, the reactive graph would not be locked could update while waiting for an async callback.)

The reprex below will click the button twice (click count is `K`), once after 250ms and another after another 250ms. The expected output is:
* `a-K-EVENT`
* `bx-K-first-EVENT`
* `by-K-first-EVENT`
* `bx-K-second-EVENT`
* `by-K-second-EVENT`
* `c-K-EVENT`


Even though the `flush` and `flushed` events where mixed when being added, all `flush` events should occur before all `flushed` events.

Without something to continuously trigger the reactive graph, the `K` value will be `1` less than the click count. To combat this, a reactive event will trigger every 250ms to invoke session `flush` / `flushed` callback.

## Automated Reprex:
"""
),
ui.input_action_button("btn", "Click me!"),
ui.tags.br(),
ui.tags.span("Counter: "),
ui.output_text_verbatim("btn_txt", placeholder=True),
ui.tags.span("All events: "),
ui.output_text_verbatim("all_txt", placeholder=True),
ui.tags.span("Flush events: "),
ui.output_text_verbatim("flush_txt", placeholder=True),
ui.tags.span("Flushed: "),
ui.output_text_verbatim("flushed_txt", placeholder=True),
ui.tags.script(
"""
$(document).on('shiny:connected', function(event) {
const n = 250
document.querySelector("#btn").click();
setTimeout(function() {
document.querySelector("#btn").click();
}, n)
setTimeout(function() {
document.querySelector("#btn").click();
}, 2 * n)
});
"""
),
)


def server(input: Inputs, output: Outputs, session: Session):
def on_ended_sync(txt: str):
def _():
print(txt)

return _

def on_ended_async(txt: str):
async def _():
await asyncio.sleep(0)
print(txt)

return _

cancel_on_ended_sync = session.on_ended(
on_ended_sync("session ended - sync - cancel")
)
cancel_on_ended_async = session.on_ended(
on_ended_async("session ended - async - cancel")
)
session.on_ended(on_ended_sync("session ended - sync - test1"))
session.on_ended(on_ended_async("session ended - async - test2"))
session.on_ended(on_ended_async("session ended - async - test3"))
session.on_ended(on_ended_sync("session ended - sync - test4"))

all_vals: reactive.Value[tuple[str, ...]] = reactive.Value(())
flush_vals: reactive.Value[tuple[str, ...]] = reactive.Value(())
flushed_vals: reactive.Value[tuple[str, ...]] = reactive.Value(())

def call_a(
vals: reactive.Value[tuple[str, ...]],
suffix: str,
):
def _():
with reactive.isolate():
all_vals.set(all_vals.get() + (f"a-{suffix}",))
vals.set(vals.get() + (f"a-{suffix}",))

return _

def call_b(
vals: reactive.Value[tuple[str, ...]],
suffix: str,
):
async def _():
with reactive.isolate():
all_vals.set(all_vals.get() + (f"bx-{suffix}",))
vals.set(vals.get() + (f"bx-{suffix}",))
await asyncio.sleep(0)
with reactive.isolate():
all_vals.set(all_vals.get() + (f"by-{suffix}",))
vals.set(vals.get() + (f"by-{suffix}",))

return _

def call_c(
vals: reactive.Value[tuple[str, ...]],
suffix: str,
):
def _():
with reactive.isolate():
all_vals.set(all_vals.get() + (f"c-{suffix}",))
vals.set(vals.get() + (f"c-{suffix}",))

return _

# Continuously trigger the reactive graph to ensure that the flush / flushed
# callbacks are called. If this Effect is not called, then the click counter will
# always be one higher than the flush/flushed values displayed.
@reactive.Effect
def _():
reactive.invalidate_later(0.25)

@reactive.Effect
@reactive.event(input.btn)
def _():
btn_count = input.btn()

def reset():
all_vals.set(())
flush_vals.set(())
flushed_vals.set(())

session.on_flush(reset, once=True)

session.on_flushed(call_a(flushed_vals, f"{btn_count}-flushed"), once=True)
session.on_flushed(
call_b(flushed_vals, f"{btn_count}-first-flushed"), once=True
)
session.on_flush(call_a(flush_vals, f"{btn_count}-flush"), once=True)
cancel_b_flush = session.on_flush(
call_b(flush_vals, f"{btn_count}-cancel-flush"), once=True
)
session.on_flush(call_b(flush_vals, f"{btn_count}-first-flush"), once=True)
session.on_flushed(
call_b(flushed_vals, f"{btn_count}-second-flushed"), once=True
)
session.on_flush(call_b(flush_vals, f"{btn_count}-second-flush"), once=True)
session.on_flushed(call_c(flushed_vals, f"{btn_count}-flushed"), once=True)
cancel_c_flushed = session.on_flushed(
call_c(flushed_vals, f"{btn_count}-cancel-flushed"), once=True
)
session.on_flush(call_c(flush_vals, f"{btn_count}-flush"), once=True)

cancel_b_flush()
cancel_c_flushed()
cancel_on_ended_sync()
cancel_on_ended_async()

@output
@render.text
def btn_txt():
return str(input.btn())

@output
@render.text
def all_txt():
return str(all_vals.get())

@output
@render.text
def flush_txt():
return str(flush_vals.get())

@output
@render.text
def flushed_txt():
return str(flushed_vals.get())


app = App(app_ui, server)
42 changes: 42 additions & 0 deletions tests/e2e/session/flush/test_on_flush.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from conftest import ShinyAppProc
from controls import OutputTextVerbatim
from playwright.sync_api import Page


def test_output_image_kitchen(page: Page, local_app: ShinyAppProc) -> None:
page.goto(local_app.url)

OutputTextVerbatim(page, "all_txt").expect_value(
"('a-3-flush', 'bx-3-first-flush', 'by-3-first-flush', 'bx-3-second-flush', "
"'by-3-second-flush', 'c-3-flush', 'a-3-flushed', 'bx-3-first-flushed', "
"'by-3-first-flushed', 'bx-3-second-flushed', 'by-3-second-flushed', "
"'c-3-flushed')"
)
OutputTextVerbatim(page, "flush_txt").expect_value(
"('a-3-flush', 'bx-3-first-flush', 'by-3-first-flush', 'bx-3-second-flush', "
"'by-3-second-flush', 'c-3-flush')"
)
OutputTextVerbatim(page, "flushed_txt").expect_value(
"('a-3-flushed', 'bx-3-first-flushed', 'by-3-first-flushed', "
"'bx-3-second-flushed', 'by-3-second-flushed', 'c-3-flushed')"
)

# Verify `on_ended` callbacks are called in the correct order (and cancelled)
local_app.close()

# Wait up to 3 seconds for the app to close and print the logs. (Should be ~ instant)
local_app.stdout.wait_for(lambda x: "test4" in x, 3)
stdout = str(local_app.stdout)
out_indexes = [
stdout.index("session ended - sync - test1"),
stdout.index("session ended - async - test2"),
stdout.index("session ended - async - test3"),
stdout.index("session ended - sync - test4"),
]
for i in range(len(out_indexes)):
index = out_indexes[i]
assert index >= 0
# Make sure they are ordered correctly
if i > 0:
prev_index = out_indexes[i - 1]
assert index > prev_index