@@ -187,9 +187,8 @@ async def run( # noqa: C901
187
187
messages = ctx_messages .messages
188
188
ctx_messages .used = True
189
189
190
- message_history = _clean_message_history (ctx .state .message_history )
191
- # Add message history to the `capture_run_messages` list, which will be empty at this point
192
- messages .extend (message_history )
190
+ # Replace the `capture_run_messages` list with the message history
191
+ messages [:] = _clean_message_history (ctx .state .message_history )
193
192
# Use the `capture_run_messages` list as the message history so that new messages are added to it
194
193
ctx .state .message_history = messages
195
194
ctx .deps .new_message_index = len (messages )
@@ -456,7 +455,18 @@ async def _prepare_request(
456
455
# This will raise errors for any tool name conflicts
457
456
ctx .deps .tool_manager = await ctx .deps .tool_manager .for_run_step (run_context )
458
457
459
- message_history = await _process_message_history (ctx .state , ctx .deps .history_processors , run_context )
458
+ original_history = ctx .state .message_history [:]
459
+ message_history = await _process_message_history (original_history , ctx .deps .history_processors , run_context )
460
+ # Never merge the new `ModelRequest` with the one preceding it, to keep `new_messages()` from accidentally including part of the existing message history
461
+ message_history = [* _clean_message_history (message_history [:- 1 ]), message_history [- 1 ]]
462
+ # `ctx.state.message_history` is the same list used by `capture_run_messages`, so we should replace its contents, not the reference
463
+ ctx .state .message_history [:] = message_history
464
+ # Update the new message index to ensure `result.new_messages()` returns the correct messages
465
+ ctx .deps .new_message_index -= len (original_history ) - len (message_history )
466
+
467
+ # Do one more cleaning pass to merge possible consecutive trailing `ModelRequest`s into one, with tool call parts before user parts,
468
+ # but don't store it in the message history on state.
469
+ # See `tests/test_tools.py::test_parallel_tool_return_with_deferred` for an example where this is necessary
460
470
message_history = _clean_message_history (message_history )
461
471
462
472
model_request_parameters = await _prepare_request_parameters (ctx )
@@ -1087,12 +1097,11 @@ def build_agent_graph(
1087
1097
1088
1098
1089
1099
async def _process_message_history (
1090
- state : GraphAgentState ,
1100
+ messages : list [ _messages . ModelMessage ] ,
1091
1101
processors : Sequence [HistoryProcessor [DepsT ]],
1092
1102
run_context : RunContext [DepsT ],
1093
1103
) -> list [_messages .ModelMessage ]:
1094
1104
"""Process message history through a sequence of processors."""
1095
- messages = state .message_history
1096
1105
for processor in processors :
1097
1106
takes_ctx = is_takes_ctx (processor )
1098
1107
@@ -1110,8 +1119,12 @@ async def _process_message_history(
1110
1119
sync_processor = cast (_HistoryProcessorSync , processor )
1111
1120
messages = await run_in_executor (sync_processor , messages )
1112
1121
1113
- # Replaces the message history in the state with the processed messages
1114
- state .message_history = messages
1122
+ if len (messages ) == 0 :
1123
+ raise exceptions .UserError ('Processed history cannot be empty.' )
1124
+
1125
+ if not isinstance (messages [- 1 ], _messages .ModelRequest ):
1126
+ raise exceptions .UserError ('Processed history must end with a `ModelRequest`.' )
1127
+
1115
1128
return messages
1116
1129
1117
1130
0 commit comments