Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -233,3 +233,4 @@ libs/redis/docs/.Trash*
*.pyc
.ai
.claude
TASK_MEMORY.md
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ A memory layer for AI agents using Redis as the vector database.

- **Dual Interface**: REST API and Model Context Protocol (MCP) server
- **Two-Tier Memory**: Working memory (session-scoped) and long-term memory (persistent)
- **Configurable Memory Strategies**: Customize how memories are extracted (discrete, summary, preferences, custom)
- **Semantic Search**: Vector-based similarity search with metadata filtering
- **Flexible Backends**: Pluggable vector store factory system
- **AI Integration**: Automatic topic extraction, entity recognition, and conversation summarization
Expand Down
6 changes: 5 additions & 1 deletion agent_memory_server/docket_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
from docket import Docket

from agent_memory_server.config import settings
from agent_memory_server.extraction import extract_discrete_memories
from agent_memory_server.extraction import (
extract_discrete_memories,
extract_memories_with_strategy,
)
from agent_memory_server.long_term_memory import (
compact_long_term_memories,
delete_long_term_memories,
Expand All @@ -31,6 +34,7 @@
index_long_term_memories,
compact_long_term_memories,
extract_discrete_memories,
extract_memories_with_strategy,
promote_working_memory_to_long_term,
delete_long_term_memories,
forget_long_term_memories,
Expand Down
131 changes: 128 additions & 3 deletions agent_memory_server/extraction.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from transformers import AutoModelForTokenClassification, AutoTokenizer, pipeline

from agent_memory_server.config import settings
from agent_memory_server.filters import DiscreteMemoryExtracted
from agent_memory_server.filters import DiscreteMemoryExtracted, MemoryType
from agent_memory_server.llms import (
AnthropicClientWrapper,
OpenAIClientWrapper,
Expand Down Expand Up @@ -312,8 +312,8 @@ async def extract_discrete_memories(
client = await get_model_client(settings.generation_model)

# Use vectorstore adapter to find messages that need discrete memory extraction
# TODO: Sort out circular imports
from agent_memory_server.filters import MemoryType
# Local imports to avoid circular dependencies:
# long_term_memory imports from extraction, so we import locally here
from agent_memory_server.long_term_memory import index_long_term_memories
from agent_memory_server.vectorstore_factory import get_vectorstore_adapter

Expand Down Expand Up @@ -408,3 +408,128 @@ async def extract_discrete_memories(
long_term_memories,
deduplicate=deduplicate,
)


async def extract_memories_with_strategy(
    memories: list[MemoryRecord] | None = None,
    deduplicate: bool = True,
) -> None:
    """
    Extract memories using their configured strategies.

    This function replaces extract_discrete_memories for strategy-aware extraction.
    Each memory record contains its extraction strategy configuration.

    Args:
        memories: Records to run extraction on. When empty or None, the
            vector store is searched for every "message" memory whose
            ``discrete_memory_extracted`` flag is "f".
        deduplicate: Forwarded to ``index_long_term_memories`` when the newly
            extracted memories are indexed.

    Side effects:
        * Records with no text are deleted from the vector store.
        * Every record that goes through extraction is written back with
          ``discrete_memory_extracted="t"``, even when extraction raised.
        * Extracted memories are indexed as new long-term memories.
    """
    import json

    # Local imports to avoid circular dependencies:
    # long_term_memory imports from extraction, so we import locally here
    from agent_memory_server.long_term_memory import index_long_term_memories
    from agent_memory_server.memory_strategies import get_memory_strategy
    from agent_memory_server.vectorstore_factory import get_vectorstore_adapter

    adapter = await get_vectorstore_adapter()

    if not memories:
        # If no memories are provided, search for any messages in long-term memory
        # that haven't been processed for extraction
        page_size = 25
        memories = []
        offset = 0
        while True:
            search_result = await adapter.search_memories(
                query="",  # Empty query to get all messages
                memory_type=MemoryType(eq="message"),
                discrete_memory_extracted=DiscreteMemoryExtracted(eq="f"),
                limit=page_size,
                offset=offset,
            )

            logger.info(
                f"Found {len(search_result.memories)} memories to extract: {[m.id for m in search_result.memories]}"
            )

            memories += search_result.memories

            # A short page means the result set is exhausted.
            if len(search_result.memories) < page_size:
                break

            offset += page_size

    # Group memories by extraction strategy for batch processing.
    # Each group keeps the original config dict next to its members; the
    # dict key is only used to decide which records share a strategy instance.
    strategy_groups: dict[tuple, tuple] = {}
    for memory in memories:
        if memory is None:
            # Nothing to delete or extract from: there is no id to act on.
            logger.warning("Skipping empty (None) memory entry during extraction")
            continue

        if not memory.text:
            logger.info(f"Deleting memory with no text: {memory}")
            await adapter.delete_memories([memory.id])
            continue

        # NOTE(review): assumes a missing config should behave like an empty
        # one -- confirm against the MemoryRecord field default.
        config = memory.extraction_strategy_config or {}

        # Serialize the config to build the key: config values may be lists
        # or dicts, which cannot be hashed (or reliably sorted) as tuples.
        strategy_key = (
            memory.extraction_strategy,
            json.dumps(config, sort_keys=True, default=str),
        )
        if strategy_key not in strategy_groups:
            strategy_groups[strategy_key] = (
                memory.extraction_strategy,
                dict(config),
                [],
            )
        strategy_groups[strategy_key][2].append(memory)

    all_new_memories = []
    all_updated_memories = []

    # Process each strategy group
    for strategy_name, config_dict, strategy_memories in strategy_groups.values():
        logger.info(
            f"Processing {len(strategy_memories)} memories with strategy: {strategy_name}"
        )

        # Get strategy instance
        try:
            strategy = get_memory_strategy(strategy_name, **config_dict)
        except ValueError as e:
            logger.error(f"Unknown strategy {strategy_name}: {e}")
            # Fall back to discrete strategy
            strategy = get_memory_strategy("discrete")

        # Process memories with this strategy
        for memory in strategy_memories:
            try:
                extracted_memories = await strategy.extract_memories(memory.text)
                all_new_memories.extend(extracted_memories)
            except Exception as e:
                # Best effort: a failing record must not abort the whole batch.
                logger.error(
                    f"Error extracting memory {memory.id} with strategy {strategy_name}: {e}"
                )

            # Mark the memory as processed whether or not extraction
            # succeeded, to avoid infinite retry of a failing record.
            all_updated_memories.append(
                memory.model_copy(update={"discrete_memory_extracted": "t"})
            )

    # Update processed memories
    if all_updated_memories:
        await adapter.update_memories(all_updated_memories)

    # Index new extracted memories
    if all_new_memories:
        long_term_memories = [
            MemoryRecord(
                id=str(ulid.ULID()),
                text=new_memory["text"],
                memory_type=new_memory.get("type", "episodic"),
                topics=new_memory.get("topics", []),
                entities=new_memory.get("entities", []),
                discrete_memory_extracted="t",
                extraction_strategy="discrete",  # These are already extracted
                extraction_strategy_config={},
            )
            for new_memory in all_new_memories
        ]

        await index_long_term_memories(
            long_term_memories,
            deduplicate=deduplicate,
        )
23 changes: 21 additions & 2 deletions agent_memory_server/long_term_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@

from agent_memory_server.config import settings
from agent_memory_server.dependencies import get_background_tasks
from agent_memory_server.extraction import extract_discrete_memories, handle_extraction
from agent_memory_server.extraction import (
extract_memories_with_strategy,
handle_extraction,
)
from agent_memory_server.filters import (
CreatedAt,
Entities,
Expand Down Expand Up @@ -846,7 +849,7 @@ async def index_long_term_memories(
# them as separate long-term memory records. This process also
# runs deduplication if requested.
await background_tasks.add_task(
extract_discrete_memories,
extract_memories_with_strategy,
memories=needs_extraction,
deduplicate=deduplicate,
)
Expand Down Expand Up @@ -1370,6 +1373,14 @@ async def promote_working_memory_to_long_term(
current_memory = deduped_memory or memory
current_memory.persisted_at = datetime.now(UTC)

# Set extraction strategy configuration from working memory
current_memory.extraction_strategy = (
current_working_memory.long_term_memory_strategy.strategy
)
current_memory.extraction_strategy_config = (
current_working_memory.long_term_memory_strategy.config
)

# Index the memory in long-term storage
await index_long_term_memories(
[current_memory],
Expand Down Expand Up @@ -1432,6 +1443,14 @@ async def promote_working_memory_to_long_term(
current_memory = deduped_memory or memory_record
current_memory.persisted_at = datetime.now(UTC)

# Set extraction strategy configuration from working memory
current_memory.extraction_strategy = (
current_working_memory.long_term_memory_strategy.strategy
)
current_memory.extraction_strategy_config = (
current_working_memory.long_term_memory_strategy.config
)

# Collect memory record for batch indexing
message_records_to_index.append(current_memory)

Expand Down
Loading