Send custom events from tool function, for AG-UI and event_stream_handler #2382

@kadosh1000

Access to Event Bus for Runtime Events Without Tool Activation

Summary

I need to send events to the AG-UI frontend during agent execution (progress updates, intermediate results, and so on) without having to create tools solely for event emission. The event bus functionality appears to be private in the current implementation.

Use Case

I want to provide real-time feedback to users during long-running agent operations:

  • Progress indicators during multi-step processes
  • Intermediate results before final completion
  • Status updates during external API calls
  • Debug information during development

What I've Tried

import time

from pydantic_ai import Agent, RunContext

agent = Agent('openai:gpt-4')

@agent.tool
def long_running_task(ctx: RunContext, task_params: str) -> str:
    # I want to emit progress events here,
    # but can't access the event bus directly.
    for i in range(10):
        # Would like to emit: {"type": "progress", "value": i / 10}
        # Currently there is no way to access ag_ui's event system.
        time.sleep(1)  # Simulate work

    return "Task completed"

app = agent.to_ag_ui()

Looking at the source code, I can see there's event bus functionality, but it seems to be private/internal.

Expected Behavior

Access to emit custom events during agent execution:

import time

from ag_ui.core import CustomEvent, EventType

@agent.tool
def long_running_task(ctx: RunContext, task_params: str) -> str:
    # Proposed API: ctx.get_event_bus() does not exist today.
    event_bus = ctx.get_event_bus()  # or similar API

    for i in range(10):
        event_bus.emit(CustomEvent(
            type=EventType.CUSTOM,
            name='count',
            value=i,
        ))
        time.sleep(1)

    return "Task completed"
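
To make the shape of the request concrete, here is a minimal, library-independent sketch of what such a bus could look like. `EventBus`, `ProgressEvent`, `emit`, and `drain` are hypothetical names I made up for illustration, not pydantic-ai or AG-UI APIs. As a stopgap, an object like this could presumably be carried in `ctx.deps`, but something would still have to forward the drained events to the frontend, which is the part I can't reach today.

```python
import queue
import threading
import time
from dataclasses import dataclass
from typing import Any


@dataclass
class ProgressEvent:
    """Hypothetical event payload; not an AG-UI or pydantic-ai type."""
    name: str
    value: Any


class EventBus:
    """Hypothetical thread-safe bus: tools emit, a consumer drains."""

    def __init__(self) -> None:
        self._queue: "queue.Queue[ProgressEvent]" = queue.Queue()

    def emit(self, event: ProgressEvent) -> None:
        # Called from inside the tool while it is still running.
        self._queue.put(event)

    def drain(self) -> list:
        # Return every event emitted so far without blocking.
        events = []
        while True:
            try:
                events.append(self._queue.get_nowait())
            except queue.Empty:
                return events


def long_running_task(bus: EventBus, steps: int = 3) -> str:
    # Stand-in for a tool body; emits one event per step.
    for i in range(steps):
        bus.emit(ProgressEvent(name="count", value=i))
        time.sleep(0.01)  # Simulate work
    return "Task completed"


bus = EventBus()
worker = threading.Thread(target=long_running_task, args=(bus,))
worker.start()
worker.join()
events = bus.drain()
```

In a real integration the consumer would drain (or await) the bus while the tool is still running rather than after `join()`, and translate each event into whatever the transport expects.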

Or maybe support "Generator tools"?

import time
from collections.abc import Generator

# Proposed API: agent.generator_tool does not exist today.
@agent.generator_tool
def long_running_task(ctx: RunContext, task_params: str) -> Generator[CustomEvent, None, str]:
    for i in range(10):
        yield CustomEvent(
            type=EventType.CUSTOM,
            name='count',
            value=i,
        )
        time.sleep(1)

    return "Task completed"
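
For the generator variant, the framework would need to forward each yielded event and then treat the generator's `return` value as the tool result. A rough sketch of that driving loop in plain Python is below; `run_generator_tool`, `on_event`, and `ProgressEvent` are hypothetical names for illustration only, and nothing here reflects how pydantic-ai runs tools internally.

```python
from collections.abc import Callable, Generator
from dataclasses import dataclass
from typing import Any


@dataclass
class ProgressEvent:
    """Hypothetical stand-in for a custom event type."""
    name: str
    value: Any


def long_running_task(steps: int) -> Generator[ProgressEvent, None, str]:
    # Yields one event per step, then returns the tool result.
    for i in range(steps):
        yield ProgressEvent(name="count", value=i)
    return "Task completed"


def run_generator_tool(
    gen: Generator[ProgressEvent, None, str],
    on_event: Callable[[ProgressEvent], None],
) -> str:
    """Forward each yielded event, then return the generator's return value."""
    while True:
        try:
            event = next(gen)
        except StopIteration as stop:
            # A generator's `return x` surfaces as StopIteration.value.
            return stop.value
        on_event(event)


received: list = []
result = run_generator_tool(long_running_task(3), received.append)
```

Here `received.append` plays the role of whatever would push events to the frontend, and `result` is what would be handed back to the model as the tool's output.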

Actual Behavior

No apparent way to access the event bus from within agent tools or system prompts, making real-time user feedback impossible without creating dummy tools just for event emission.

Questions

  1. Is there a current way to access the event bus from within agent execution?
  2. If not, could this functionality be exposed through the RunContext or similar mechanism?
  3. Would you be open to making the event bus API public for custom event emission?
  4. What would be the preferred API design for this feature?

Additional Context

This would greatly improve user experience for long-running agents by providing immediate feedback rather than waiting for completion. Many production use cases require progress indication for user engagement and debugging purposes.

Environment

  • pydantic-ai version: 0.4.9
  • Python version: 3.14
