Skip to content

Commit 68c5138

Browse files
committed
fix: Use ThreadPoolExecutor to avoid event loop conflicts in HybridBackgroundTasks
- Wrap Docket async operations in ThreadPoolExecutor to run in separate thread - This should resolve 'Event loop is closed' errors in CI - Tests may need updating to reflect that Docket operations run in background
1 parent 325bb71 commit 68c5138

File tree

1 file changed

+30
-28
lines changed

1 file changed

+30
-28
lines changed

agent_memory_server/dependencies.py

Lines changed: 30 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import asyncio
1+
import concurrent.futures
22
from collections.abc import Callable
33
from typing import Any
44

@@ -20,38 +20,40 @@ def add_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
2020

2121
if settings.use_docket:
2222
logger.info("Scheduling task through Docket")
23-
# Schedule task directly in Docket's Redis queue
24-
self._schedule_docket_task(func, *args, **kwargs)
23+
24+
# Create a wrapper that will handle Docket scheduling in a thread
25+
def docket_wrapper():
26+
"""Wrapper function that schedules the task through Docket"""
27+
28+
def run_in_thread():
29+
"""Run the async Docket operations in a separate thread"""
30+
import asyncio
31+
32+
from docket import Docket
33+
34+
async def schedule_task():
35+
async with Docket(
36+
name=settings.docket_name,
37+
url=settings.redis_url,
38+
) as docket:
39+
# Schedule task in Docket's queue
40+
await docket.add(func)(*args, **kwargs)
41+
42+
# Run in a new event loop in this thread
43+
asyncio.run(schedule_task())
44+
45+
# Execute in a thread pool to avoid event loop conflicts
46+
with concurrent.futures.ThreadPoolExecutor() as executor:
47+
future = executor.submit(run_in_thread)
48+
future.result() # Wait for completion
49+
50+
# Add the wrapper to FastAPI background tasks
51+
super().add_task(docket_wrapper)
2552
else:
2653
logger.info("Using FastAPI background tasks")
2754
# Use FastAPI's background tasks directly
2855
super().add_task(func, *args, **kwargs)
2956

30-
def _schedule_docket_task(
31-
self, func: Callable[..., Any], *args: Any, **kwargs: Any
32-
) -> None:
33-
"""Schedule a task in Docket's Redis queue"""
34-
35-
async def schedule_task():
36-
from docket import Docket
37-
38-
async with Docket(
39-
name=settings.docket_name,
40-
url=settings.redis_url,
41-
) as docket:
42-
# Schedule task in Docket's queue
43-
await docket.add(func)(*args, **kwargs)
44-
45-
# Run the async scheduling operation
46-
try:
47-
# Try to get the current event loop
48-
loop = asyncio.get_running_loop()
49-
# If we're in an async context, create a task
50-
loop.create_task(schedule_task())
51-
except RuntimeError:
52-
# If no event loop is running, run it synchronously
53-
asyncio.run(schedule_task())
54-
5557

5658
# Backwards compatibility alias
5759
DocketBackgroundTasks = HybridBackgroundTasks

0 commit comments

Comments
 (0)