Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions agent_memory_server/docket_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

from agent_memory_server.config import settings
from agent_memory_server.extraction import (
extract_discrete_memories,
extract_memories_with_strategy,
)
from agent_memory_server.long_term_memory import (
Expand All @@ -33,7 +32,6 @@
summarize_session,
index_long_term_memories,
compact_long_term_memories,
extract_discrete_memories,
extract_memories_with_strategy,
promote_working_memory_to_long_term,
delete_long_term_memories,
Expand Down
196 changes: 0 additions & 196 deletions agent_memory_server/extraction.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import json
import os
from datetime import datetime
from typing import TYPE_CHECKING, Any

import ulid
Expand Down Expand Up @@ -215,201 +214,6 @@ async def handle_extraction(text: str) -> tuple[list[str], list[str]]:
return topics, entities


DISCRETE_EXTRACTION_PROMPT = """
You are a long-memory manager. Your job is to analyze text and extract
information that might be useful in future conversations with users.

CURRENT CONTEXT:
Current date and time: {current_datetime}

Extract two types of memories:
1. EPISODIC: Personal experiences specific to a user or agent.
Example: "User prefers window seats" or "User had a bad experience in Paris"

2. SEMANTIC: User preferences and general knowledge outside of your training data.
Example: "Trek discontinued the Trek 520 steel touring bike in 2023"

CONTEXTUAL GROUNDING REQUIREMENTS:
When extracting memories, you must resolve all contextual references to their concrete referents:

1. PRONOUNS: Replace ALL pronouns (he/she/they/him/her/them/his/hers/theirs) with the actual person's name, EXCEPT for the application user, who must always be referred to as "User".
- "He loves coffee" → "User loves coffee" (if "he" refers to the user)
- "I told her about it" → "User told colleague about it" (if "her" refers to a colleague)
- "Her experience is valuable" → "User's experience is valuable" (if "her" refers to the user)
- "My name is Alice and I prefer tea" → "User prefers tea" (do NOT store the application user's given name in text)
- NEVER leave pronouns unresolved - always replace with the specific person's name

2. TEMPORAL REFERENCES: Convert relative time expressions to absolute dates/times using the current datetime provided above
- "yesterday" → specific date (e.g., "March 15, 2025" if current date is March 16, 2025)
- "last year" → specific year (e.g., "2024" if current year is 2025)
- "three months ago" → specific month/year (e.g., "December 2024" if current date is March 2025)
- "next week" → specific date range (e.g., "December 22-28, 2024" if current date is December 15, 2024)
- "tomorrow" → specific date (e.g., "December 16, 2024" if current date is December 15, 2024)
- "last month" → specific month/year (e.g., "November 2024" if current date is December 2024)

3. SPATIAL REFERENCES: Resolve place references to specific locations
- "there" → "San Francisco" (if referring to San Francisco)
- "that place" → "Chez Panisse restaurant" (if referring to that restaurant)
- "here" → "the office" (if referring to the office)

4. DEFINITE REFERENCES: Resolve definite articles to specific entities
- "the meeting" → "the quarterly planning meeting"
- "the document" → "the budget proposal document"

For each memory, return a JSON object with the following fields:
- type: str -- The memory type, either "episodic" or "semantic"
- text: str -- The actual information to store (with all contextual references grounded)
- topics: list[str] -- The topics of the memory (top {top_k_topics})
- entities: list[str] -- The entities of the memory

Return a list of memories, for example:
{{
"memories": [
{{
"type": "semantic",
"text": "User prefers window seats",
"topics": ["travel", "airline"],
"entities": ["User", "window seat"],
}},
{{
"type": "episodic",
"text": "Trek discontinued the Trek 520 steel touring bike in 2023",
"topics": ["travel", "bicycle"],
"entities": ["Trek", "Trek 520 steel touring bike"],
}},
]
}}

IMPORTANT RULES:
1. Only extract information that would be genuinely useful for future interactions.
2. Do not extract procedural knowledge - that is handled by the system's built-in tools and prompts.
3. You are a large language model - do not extract facts that you already know.
4. CRITICAL: ALWAYS ground ALL contextual references - never leave ANY pronouns, relative times, or vague place references unresolved. For the application user, always use "User" instead of their given name to avoid stale naming if they change their profile name later.
5. MANDATORY: Replace every instance of "he/she/they/him/her/them/his/hers/theirs" with the actual person's name.
6. MANDATORY: Replace possessive pronouns like "her experience" with "User's experience" (if "her" refers to the user).
7. If you cannot determine what a contextual reference refers to, either omit that memory or use generic terms like "someone" instead of ungrounded pronouns.

Message:
{message}

STEP-BY-STEP PROCESS:
1. First, identify all pronouns in the text: he, she, they, him, her, them, his, hers, theirs
2. Determine what person each pronoun refers to based on the context
3. Replace every single pronoun with the actual person's name
4. Extract the grounded memories with NO pronouns remaining

Extracted memories:
"""


async def extract_discrete_memories(
memories: list[MemoryRecord] | None = None,
deduplicate: bool = True,
):
"""
Extract episodic and semantic memories from text using an LLM.
"""
client = await get_model_client(settings.generation_model)

# Use vectorstore adapter to find messages that need discrete memory extraction
# Local imports to avoid circular dependencies:
# long_term_memory imports from extraction, so we import locally here
from agent_memory_server.long_term_memory import index_long_term_memories
from agent_memory_server.vectorstore_factory import get_vectorstore_adapter

adapter = await get_vectorstore_adapter()

if not memories:
# If no memories are provided, search for any messages in long-term memory
# that haven't been processed for discrete extraction

memories = []
offset = 0
while True:
search_result = await adapter.search_memories(
query="", # Empty query to get all messages
memory_type=MemoryType(eq="message"),
discrete_memory_extracted=DiscreteMemoryExtracted(eq="f"),
limit=25,
offset=offset,
)

logger.info(
f"Found {len(search_result.memories)} memories to extract: {[m.id for m in search_result.memories]}"
)

memories += search_result.memories

if len(search_result.memories) < 25:
break

offset += 25

new_discrete_memories = []
updated_memories = []

for memory in memories:
if not memory or not memory.text:
logger.info(f"Deleting memory with no text: {memory}")
await adapter.delete_memories([memory.id])
continue

async for attempt in AsyncRetrying(stop=stop_after_attempt(3)):
with attempt:
response = await client.create_chat_completion(
model=settings.generation_model,
prompt=DISCRETE_EXTRACTION_PROMPT.format(
message=memory.text,
top_k_topics=settings.top_k_topics,
current_datetime=datetime.now().strftime(
"%A, %B %d, %Y at %I:%M %p %Z"
),
),
response_format={"type": "json_object"},
)
try:
new_message = json.loads(response.choices[0].message.content)
except json.JSONDecodeError:
logger.error(
f"Error decoding JSON: {response.choices[0].message.content}"
)
raise
try:
assert isinstance(new_message, dict)
assert isinstance(new_message["memories"], list)
except AssertionError:
logger.error(
f"Invalid response format: {response.choices[0].message.content}"
)
raise
new_discrete_memories.extend(new_message["memories"])

# Update the memory to mark it as processed using the vectorstore adapter
updated_memory = memory.model_copy(update={"discrete_memory_extracted": "t"})
updated_memories.append(updated_memory)

if updated_memories:
await adapter.update_memories(updated_memories)

if new_discrete_memories:
long_term_memories = [
MemoryRecord(
id=str(ulid.ULID()),
text=new_memory["text"],
memory_type=new_memory.get("type", "episodic"),
topics=new_memory.get("topics", []),
entities=new_memory.get("entities", []),
discrete_memory_extracted="t",
)
for new_memory in new_discrete_memories
]

await index_long_term_memories(
long_term_memories,
deduplicate=deduplicate,
)


async def extract_memories_with_strategy(
memories: list[MemoryRecord] | None = None,
deduplicate: bool = True,
Expand Down
Loading