-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Add support for durable execution with DBOS #2638
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 91 commits
Commits
Show all changes
94 commits
Select commit
Hold shift + click to select a range
7f2edbd
add DBOS dependency
qianl15 1ed6782
add DBOS dependency
qianl15 dc0d6c9
basic test
qianl15 46f6e1e
basic workflow test running
qianl15 c8fdbb5
DBOS model step
qianl15 2067dd2
DBOS agent run name
qianl15 9f77bb1
Cannot define workflows dynamically
qianl15 4c3fb6e
Complex agent working
qianl15 470e6b7
Complex agent working
qianl15 5a4f048
Complex agent without DBOS working
qianl15 1976c12
Merge branch 'pydantic:main' into qian/dbos-agent
qianl15 fcca2dc
Merge branch 'pydantic:main' into qian/dbos-agent
qianl15 4fc68d5
multiple agents working
qianl15 9fc7793
more tests
qianl15 ecb316a
toolsets test
qianl15 f2f1044
more tests, handle stream in direct calls
qianl15 664e94d
more tests, handle stream in direct calls
qianl15 440a158
child workflow also works
qianl15 4024e89
clean up tests
qianl15 785a5fb
iter in workflow
qianl15 57886d3
workflow with model
qianl15 f496b52
dynamic toolset test
qianl15 3cd36e4
dynamic tools test
qianl15 c28d63c
model stream direct test
qianl15 eadcba8
unserializable deps test
qianl15 e1f843c
last test
qianl15 3483c6d
Merge branch 'pydantic:main' into qian/dbos-agent
qianl15 c91f2dc
Fix model
qianl15 de4683a
DBOS MCP server
qianl15 da7912f
Clean up todos
qianl15 0972f4e
test for spans
qianl15 247e7a0
oops revert version
qianl15 75a30f4
Fix tests
qianl15 0c60d39
Better step wrapping
qianl15 f72638d
wrong file
qianl15 a82c61e
fix test-loewst-versions
qianl15 1f529c2
fix coverage?
qianl15 554657e
uv.lock?
qianl15 a3d99dc
fix coverage
qianl15 97b0079
fix coverage
qianl15 fcbf4a8
debug
qianl15 c45ddfe
fix coverage, cleanup
qianl15 8e163cf
Merge branch 'pydantic:main' into qian/dbos-agent
qianl15 03bbee8
ignore resource warning
qianl15 cb48b19
No need for ConfiguredInstance in model and mcp_server
qianl15 6451554
Don't hard code serialization error handling
qianl15 c943e04
Merge branch 'pydantic:main' into qian/dbos-agent
qianl15 8a9bf3b
Remove DBOS dependency
qianl15 42fd4a3
dev dependency
qianl15 752e8dd
optional DBOS
qianl15 80b89a8
relax id requirement, make sure that BaseModel tool args work as expe…
qianl15 e6000ba
fix coverage
qianl15 ecd83a7
Merge branch 'main' into qian/dbos-agent
qianl15 5957602
don't change unrelated file
qianl15 8a5fa3a
Merge branch 'pydantic:main' into qian/dbos-agent
qianl15 81a9d7a
Test dynamic toolsets
qianl15 e551ee1
Merge remote-tracking branch 'pydantic/main' into qian/dbos-agent
qianl15 97b1949
Remove postgres dependency
qianl15 1e3f34b
formatting
qianl15 6252698
nits
qianl15 3288c26
Add DBOS to API references
qianl15 a4364f3
WIP: DBOS docs
qianl15 4bf1ce2
Merge branch 'pydantic:main' into qian/dbos-agent
qianl15 974f595
First draft of docs
qianl15 a936e40
Simplify DBOS docs
qianl15 6b1c20f
Merge branch 'pydantic:main' into qian/dbos-agent
qianl15 4edad0a
Merge branch 'main' into qian/dbos-agent
qianl15 0e71269
fix test
qianl15 35714b9
Update docs/api/durable_exec.md
qianl15 04e3fb5
Update docs/dbos.md
qianl15 5379ef4
Address comments
qianl15 e138158
Merge branch 'pydantic:main' into qian/dbos-agent
qianl15 7a61d2a
refactor docs
qianl15 67d454b
Recommend disabling DBOS OTLP traces when using logfire
qianl15 6a3ef13
DBOS human-in-the-loop working
qianl15 22f59a3
Support model retry
qianl15 95fa7e2
fix test
qianl15 c63e2bf
Merge branch 'main' into qian/dbos-agent
qianl15 ce43826
fix coverage
qianl15 bd4b4c4
Merge branch 'main' into qian/dbos-agent
qianl15 0d2ed25
clarify docs
qianl15 700f5e4
clean up
qianl15 418784d
Merge branch 'main' into qian/dbos-agent
qianl15 685c70b
Merge branch 'main' into qian/dbos-agent
qianl15 1001caa
add list steps test, mamke sure event_stream_handler runs in a step
qianl15 c7fdda9
event stream handler step
qianl15 c8aa11c
fix semantics for event handler as a step
qianl15 255d977
Merge branch 'main' into qian/dbos-agent
qianl15 fc316bc
minor update
qianl15 7e96b10
Merge branch 'main' into qian/dbos-agent
qianl15 b963421
fix broken link
qianl15 747b4cf
clarify docs
qianl15 d82454a
update docs for review comments, fix broken links
qianl15 1106194
Merge branch 'pydantic:main' into qian/dbos-agent
qianl15 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
# `pydantic_ai.durable_exec` | ||
|
||
::: pydantic_ai.durable_exec.temporal | ||
|
||
::: pydantic_ai.durable_exec.dbos | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,160 @@ | ||
# Durable Execution with DBOS | ||
|
||
[DBOS](https://www.dbos.dev/) is a lightweight [durable execution](https://docs.dbos.dev/architecture) library natively integrated with Pydantic AI. | ||
|
||
## Durable Execution | ||
|
||
DBOS workflows make your program **durable** by checkpointing its state in a database. If your program ever fails, when it restarts all your workflows will automatically resume from the last completed step. | ||
|
||
* **Workflows** must be deterministic and generally cannot include I/O. | ||
* **Steps** may perform I/O (network, disk, API calls). If a step fails, it restarts from the beginning. | ||
|
||
Every workflow input and step output is durably stored in the system database. When workflow execution fails, whether from crashes, network issues, or server restarts, DBOS leverages these checkpoints to recover workflows from their last completed step. | ||
|
||
DBOS **queues** provide durable, database-backed alternatives to systems like Celery or BullMQ, supporting features such as concurrency limits, rate limits, timeouts, and prioritization. See the [DBOS docs](https://docs.dbos.dev/architecture) for details. | ||
|
||
The diagram below shows the overall architecture of an agentic application in DBOS. | ||
DBOS runs fully in-process as a library. Functions remain normal Python functions but are checkpointed into a database (Postgres or SQLite). | ||
|
||
```text | ||
Clients | ||
(HTTP, RPC, Kafka, etc.) | ||
| | ||
v | ||
+------------------------------------------------------+ | ||
| Application Servers | | ||
| | | ||
| +----------------------------------------------+ | | ||
| | Pydantic AI + DBOS Libraries | | | ||
| | | | | ||
| | [ Workflows (Agent Run Loop) ] | | | ||
| | [ Steps (Tool, MCP, Model) ] | | | ||
| | [ Queues ] [ Cron Jobs ] [ Messaging ] | | | ||
| +----------------------------------------------+ | | ||
| | | ||
+------------------------------------------------------+ | ||
| | ||
v | ||
+------------------------------------------------------+ | ||
| Database | | ||
| (Stores workflow and step state, schedules tasks) | | ||
+------------------------------------------------------+ | ||
``` | ||
|
||
See the [DBOS documentation](https://docs.dbos.dev/architecture) for more information. | ||
|
||
## Durable Agent | ||
|
||
Any agent can be wrapped in a [`DBOSAgent`][pydantic_ai.durable_exec.dbos.DBOSAgent] to get durable execution. `DBOSAgent` automatically:, | ||
|
||
* Wraps `Agent.run` and `Agent.run_sync` as DBOS workflows. | ||
* Wraps [model requests](../models/overview.md) and [MCP communication](../mcp/client.md) as DBOS steps. | ||
|
||
Custom tool functions and event stream handlers are not wrapped automatically. You can decorate them with `@DBOS.workflow` or `@DBOS.step` as needed. | ||
|
||
The original agent, model, and MCP server can still be used as normal outside the DBOS workflow. | ||
|
||
Here is a simple but complete example of wrapping an agent for durable execution. All it requires is to install Pydantic AI with the DBOS [open-source library](https://github.com/dbos-inc/dbos-transact-py): | ||
|
||
```sh | ||
pip/uv-add pydantic-ai[dbos] | ||
DouweM marked this conversation as resolved.
Show resolved
Hide resolved
|
||
``` | ||
|
||
```python {title="dbos_agent.py" test="skip"} | ||
from dbos import DBOS, DBOSConfig | ||
|
||
from pydantic_ai import Agent | ||
from pydantic_ai.durable_exec.dbos import DBOSAgent | ||
|
||
dbos_config: DBOSConfig = { | ||
'name': 'pydantic_dbos_agent', | ||
'system_database_url': 'sqlite:///dbostest.sqlite', # (3)! | ||
} | ||
DBOS(config=dbos_config) | ||
|
||
agent = Agent( | ||
'gpt-5', | ||
instructions="You're an expert in geography.", | ||
name='geography', # (4)! | ||
) | ||
|
||
dbos_agent = DBOSAgent(agent) # (1)! | ||
|
||
async def main(): | ||
DBOS.launch() | ||
result = await dbos_agent.run('What is the capital of Mexico?') # (2)! | ||
print(result.output) | ||
#> Mexico City (Ciudad de México, CDMX) | ||
``` | ||
|
||
1. Workflows and `DBOSAgent` must be defined before `DBOS.launch()` so that recovery can correctly find all workflows. | ||
2. [`DBOSAgent.run()`][pydantic_ai.durable_exec.dbos.DBOSAgent.run] works like [`Agent.run()`][pydantic_ai.Agent.run], but runs as a DBOS workflow and executes model requests, decorated tool calls, and MCP communication as DBOS steps. | ||
3. This example uses SQLite. Postgres is recommended for production. | ||
4. The agent's `name` is used to uniquely identify its workflows. | ||
|
||
_(This example is complete, it can be run "as is" — you'll need to add `asyncio.run(main())` to run `main`)_ | ||
|
||
Because DBOS workflows need to be defined before calling `DBOS.launch()` and the `DBOSAgent` instance automatically registers `run` and `run_sync` as workflows, it needs to be defined before calling `DBOS.launch()` as well. | ||
|
||
For more information on how to use DBOS in Python applications, see their [Python SDK guide](https://docs.dbos.dev/python/programming-guide). | ||
|
||
## DBOS Integration Considerations | ||
|
||
When using DBOS with Pydantic AI agents, there are a few important considerations to ensure workflows and toolsets behave correctly. | ||
|
||
### Agent and Toolset Requirements | ||
|
||
Each agent instance must have a unique `name` so DBOS can correctly resume workflows after a failure or restart. Tools that perform I/O or external interactions should be annotated as DBOS steps. | ||
|
||
Other than that, any agent and toolset will just work! | ||
|
||
### Agent Run Context and Dependencies | ||
|
||
DBOS checkpoints workflow inputs/outputs and step outputs into a database using `jsonpickle`. This means you need to make sure [dependencies](../dependencies.md) object provided to [`DBOSAgent.run()`][pydantic_ai.durable_exec.dbos.DBOSAgent.run] or [`DBOSAgent.run_sync()`][pydantic_ai.durable_exec.dbos.DBOSAgent.run_sync], and tool outputs can be serialized using jsonpickle. You may also want to keep the inputs and outputs small (under \~2 MB). PostgreSQL and SQLite support up to 1 GB per field, but large objects may impact performance. | ||
|
||
### Streaming | ||
|
||
Because DBOS cannot stream output directly to the workflow or step call site, [`Agent.run_stream()`][pydantic_ai.Agent.run_stream] is not supported when running inside of a DBOS workflow. | ||
|
||
Instead, you can implement streaming by setting an [`event_stream_handler`][pydantic_ai.agent.EventStreamHandler] on the `Agent` or `DBOSAgent` instance and using [`DBOSAgent.run()`][pydantic_ai.durable_exec.dbos.DBOSAgent.run]. | ||
The event stream handler function will receive the agent [run context][pydantic_ai.tools.RunContext] and an async iterable of events from the model's streaming response and the agent's execution of tools. For examples, see the [streaming docs](../agents.md#streaming-all-events). | ||
|
||
|
||
## Step Configuration | ||
|
||
You can customize DBOS step behavior, such as retries, by passing [`StepConfig`][pydantic_ai.durable_exec.dbos.StepConfig] objects to the `DBOSAgent` constructor: | ||
|
||
- `mcp_step_config`: The DBOS step config to use for MCP server communication. No retries if omitted. | ||
- `model_step_config`: The DBOS step config to use for model request steps. No retries if omitted. | ||
|
||
For custom tools, you can annotate them directly with [`@DBOS.step`](https://docs.dbos.dev/python/reference/decorators#step) or [`@DBOS.workflow`](https://docs.dbos.dev/python/reference/decorators#workflow) decorators as needed. These decorators have no effect outside DBOS workflows, so tools remain usable in non-DBOS agents. | ||
|
||
|
||
## Step Retries | ||
|
||
On top of the automatic retries for request failures that DBOS will perform, Pydantic AI and various provider API clients also have their own request retry logic. Enabling these at the same time may cause the request to be retried more often than expected, with improper `Retry-After` handling. | ||
|
||
When using DBOS, it's recommended to not use [HTTP Request Retries](../retries.md) and to turn off your provider API client's own retry logic, for example by setting `max_retries=0` on a [custom `OpenAIProvider` API client](../models/openai.md#custom-openai-client). | ||
|
||
You can customize DBOS's retry policy using [step configuration](#step-configuration). | ||
|
||
## Observability with Logfire | ||
|
||
When using [Pydantic Logfire](../logfire.md), we **recommend disabling DBOS's built-in OpenTelemetry tracing**. | ||
DBOS automatically wraps workflow and step execution in spans, while Pydantic AI and Logfire already emit spans for the same function calls, model requests, and tool invocations. Without disabling DBOS tracing, these operations may appear twice in your trace tree. | ||
|
||
To disable DBOS traces and logs, you can set `disable_otlp=True` in `DBOSConfig`. For example: | ||
|
||
|
||
```python {title="dbos_no_traces.py" test="skip"} | ||
from dbos import DBOS, DBOSConfig | ||
|
||
dbos_config: DBOSConfig = { | ||
'name': 'pydantic_dbos_agent', | ||
'system_database_url': 'sqlite:///dbostest.sqlite', | ||
'disable_otlp': True # (1)! | ||
} | ||
DBOS(config=dbos_config) | ||
``` | ||
|
||
1. If `True`, disables OpenTelemetry tracing and logging for DBOS. Default is `False`. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
# Durable Execution | ||
|
||
Pydantic AI allows you to build durable agents that can preserve their progress across transient API failures and application errors or restarts, and handle long-running, asynchronous, and human-in-the-loop workflows with production-grade reliability. Durable agents have full support for [streaming](../agents.md#streaming-all-events) and [MCP](../mcp/client.md), with the added benefit of fault tolerance. | ||
|
||
Pydantic AI natively supports two durable execution solutions: | ||
|
||
- [Temporal](./temporal.md) | ||
- [DBOS](./dbos.md) | ||
|
||
These integrations only uses Pydantic AI's public interface, so they also serve as a reference for integrating with other durable systems. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.