-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Add support for durable execution with DBOS #2638
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@qianl15 Thanks Qian, nothing more to add :) We're going to launch v1 out of beta tomorrow, and this feels a bit too big to add during the beta stage, so I'll keep the PR open for a few more days to merge it for v1.0.1/v1.1.
@DouweM Sounds good! Thanks for the update. All comments have been addressed on my end. Looking forward to the v1 launch 🚀 |
yield event | ||
|
||
async for event in stream: | ||
await handler(ctx, streamed_response(event)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The goal of this change to TemporalAgent
was to make sure that the event_stream_handler
is only every called from inside an activity, where non-determinism/IO is available. I believe that to get the same behavior here, we'd have to create a new step, call that from here, and then inside the step call the self.event_stream_handler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for raising this! I'm open to discussion here.
The difference is intentional: in DBOSAgent, tools and event stream handlers are not automatically wrapped. Instead, users choose whether to decorate them with @DBOS.step
or invoke DBOS workflows/queues inside them. This provides flexibility:
- Use
@DBOS.step
if the function requires non-determinism or I/O. - Skip the decorator if durability isn't needed, so you avoid the extra DB checkpoint write.
- If the function needs to enqueue tasks, it should run inside the agent's main workflow (not in a step).
This lets you control the right level of durability for your application.
Note: If you call a @DBOS.step
inside another step or outside a workflow, it just runs as a normal function (no extra wrapping).
Here's the updated test case showing the intended usage:
# Wrap event_stream_handler as a DBOS step since it uses logfire (non-deterministic)
@DBOS.step()
async def event_stream_handler(
ctx: RunContext[Deps],
stream: AsyncIterable[AgentStreamEvent],
):
logfire.info(f'{ctx.run_step=}')
async for event in stream:
logfire.info('event', event=event)
And the docs now read:
Custom tool functions and event stream handlers are **not automatically wrapped** by DBOS.
If they involve non-deterministic behavior or perform I/O, explicitly decorate them with `@DBOS.step`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@qianl15 Ah right, I sort of knew that :) So wrapping the event_stream_handler
in a @DBOS.step
works as expected, even though it's being passed an iterable that items are yielded into asynchronously? That's great.
Then all that's left before I merge are the 2 docs comments below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah! DBOS steps don't require their input parameters to be serializable (that constraint only applies to DBOS workflows for recovery). So you can safely pass in an async iterable to a @DBOS.step
, and it'll work as expected.
# Temporal | ||
temporal = ["temporalio==1.17.0"] | ||
# DBOS | ||
dbos = ["dbos>=1.13.0"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As it says above, "if you add optional groups, please update docs/install.md" :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated docs/install.md!
Summary
Closes #2629
This PR integrates DBOS with
Agent.run
and related methods to provide out-of-the-box durable execution and checkpointing.Changes
Workflows:
Agent.run
andAgent.run_sync
are now decorated as DBOS workflows.Steps:
get_tools
,call_tool
) are decorated as DBOS steps.Tooling:
TODO