-
Notifications
You must be signed in to change notification settings - Fork 114
Description
While trying to port this app: https://github.com/posit-dev/py-shiny/blob/9ad2ab12ae0054ac745efed78da5fceb1e09fc43/shiny/api-examples/poll/app.py
I tried defining a @reactive.poll
in a separate file (data.py), and then using it from the Express app.
Full source code
app.py
from shiny import reactive
from shiny.express import input, render, ui
from data import stock_quotes, SYMBOLS
ui.markdown(
"""
# `shiny.reactive.poll` demo
This example app shows how to stream results from a database (in this
case, an in-memory sqlite3) with the help of `shiny.reactive.poll`.
"""
)
ui.input_selectize("symbols", "Filter by symbol", [""] + SYMBOLS, multiple=True)
def filtered_quotes():
df = stock_quotes()
if input.symbols():
df = df[df["symbol"].isin(input.symbols())]
return df
@render.express
def table():
ui.HTML(
filtered_quotes().to_html(
index=False, classes="table font-monospace w-auto"
)
)
data.py
import asyncio
import random
import sqlite3
from datetime import datetime
from typing import Any, Awaitable
import pandas as pd
from shiny import reactive
SYMBOLS = ["AAA", "BBB", "CCC", "DDD", "EEE", "FFF"]
def timestamp() -> str:
return datetime.now().strftime("%x %X")
def rand_price() -> float:
return round(random.random() * 250, 2)
# === Initialize the database =========================================
def init_db(con: sqlite3.Connection) -> None:
cur = con.cursor()
try:
cur.executescript(
"""
CREATE TABLE stock_quotes (timestamp text, symbol text, price real);
CREATE INDEX idx_timestamp ON stock_quotes (timestamp);
"""
)
cur.executemany(
"INSERT INTO stock_quotes (timestamp, symbol, price) VALUES (?, ?, ?)",
[(timestamp(), symbol, rand_price()) for symbol in SYMBOLS],
)
con.commit()
finally:
cur.close()
conn = sqlite3.connect(":memory:")
init_db(conn)
# === Randomly update the database with an asyncio.task ==============
def update_db(con: sqlite3.Connection) -> None:
"""Update a single stock price entry at random"""
cur = con.cursor()
try:
sym = SYMBOLS[random.randint(0, len(SYMBOLS) - 1)]
print(f"Updating {sym}")
cur.execute(
"UPDATE stock_quotes SET timestamp = ?, price = ? WHERE symbol = ?",
(timestamp(), rand_price(), sym),
)
con.commit()
finally:
cur.close()
async def update_db_task(con: sqlite3.Connection) -> Awaitable[None]:
"""Task that alternates between sleeping and updating prices"""
while True:
await asyncio.sleep(random.random() * 1.5)
update_db(con)
asyncio.create_task(update_db_task(conn))
# === Create the reactive.poll object ===============================
def tbl_last_modified() -> Any:
print("polling")
df = pd.read_sql_query("SELECT MAX(timestamp) AS timestamp FROM stock_quotes", conn)
return df["timestamp"].to_list()
@reactive.poll(tbl_last_modified, 0.5)
def stock_quotes() -> pd.DataFrame:
return pd.read_sql_query("SELECT timestamp, symbol, price FROM stock_quotes", conn)
This loaded, but did not update. The polling function passed to @reactive.poll
, tbl_last_modified
, did not execute every 0.5 seconds as expected.
The reason for this is because the data.py
module is only loaded once (exactly the reason why we created it), but at the time that it's loaded, there's a session active--it happens to be a MockSession. When that MockSession closes, any reactive effects that belong to it shut down--in this case, our @reactive.poll
.
This is a tough nut because this automatic closing of effects is necessary for ones that really do belong to the session, but data.py
is intended to be global.
I worked around this by passing an explicit session=None
argument to reactive.poll()
.
A fix could be... I don't know, to have a special name for such modules? global.py
? And we load them first, outside of any session?