Coverage for src / agent / memory / store.py: 100%
46 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 14:30 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 14:30 +0000
# Copyright 2025-2026 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""In-memory implementation of memory storage.

This module provides the default in-memory storage for agent memories.
"""
20import logging
21from datetime import datetime
23from agent.config.schema import AgentSettings
24from agent.memory.manager import MemoryManager
26logger = logging.getLogger(__name__)
class InMemoryStore(MemoryManager):
    """In-memory storage implementation for agent memories.

    Stores messages with metadata in a plain Python list, providing keyword
    search and retrieval without external dependencies. Nothing is persisted:
    the contents live only as long as the instance does.

    Every public method returns whatever ``_create_success_response`` or
    ``_create_error_response`` builds; those helpers are not defined in this
    class (presumably inherited from ``MemoryManager``).

    Attributes:
        config: Agent configuration. It is handed to
            ``MemoryManager.__init__``; this class does not assign it itself.
        memories: List of stored memory entries, oldest first.

    Example:
        >>> store = InMemoryStore(config)  # config: an AgentSettings instance
        >>> await store.add([{"role": "user", "content": "Hello"}])
    """

    def __init__(self, config: AgentSettings) -> None:
        """Initialize an empty in-memory store.

        Args:
            config: Agent configuration with memory settings; forwarded
                unchanged to the base-class initializer.
        """
        super().__init__(config)
        self.memories: list[dict] = []

    async def add(self, messages: list[dict]) -> dict:
        """Add messages to memory storage.

        Each valid message is appended as an entry with the keys ``id``,
        ``role``, ``content``, ``timestamp`` and ``metadata``. Entries are
        kept in insertion (chronological) order. Items that are not dicts, or
        that lack a ``role`` or ``content`` key, are logged and skipped rather
        than failing the whole call.

        Args:
            messages: List of message dicts with ``role`` and ``content``
                (and optionally ``metadata``).

        Returns:
            Error response (``invalid_input``) when ``messages`` is empty;
            otherwise a success response whose result is the list of IDs
            assigned to the stored messages (possibly empty if every message
            was skipped).

        Example:
            >>> await store.add([
            ...     {"role": "user", "content": "My name is Alice"},
            ...     {"role": "assistant", "content": "Nice to meet you, Alice!"}
            ... ])
        """
        if not messages:
            return self._create_error_response(
                error="invalid_input", message="No messages provided"
            )

        added_ids: list[int] = []

        for msg in messages:
            # Validate message structure before touching its keys.
            if not isinstance(msg, dict) or "role" not in msg or "content" not in msg:
                logger.warning(f"Skipping invalid message: {msg}")
                continue

            entry = {
                # The ID is the entry's index in the list, so IDs restart at 0
                # after clear().
                "id": len(self.memories),
                "role": msg["role"],
                "content": msg["content"],
                # One timestamp per message (local time, ISO 8601).
                "timestamp": datetime.now().isoformat(),
                "metadata": msg.get("metadata", {}),
            }

            self.memories.append(entry)
            added_ids.append(entry["id"])

        logger.debug(f"Added {len(added_ids)} messages to memory")

        return self._create_success_response(
            result=added_ids, message=f"Added {len(added_ids)} messages to memory"
        )

    async def search(self, query: str, limit: int = 5) -> dict:
        """Search memories by keyword query.

        The query is lower-cased and split on whitespace. A memory matches
        when at least one term occurs as a substring of its lower-cased
        content. Matches are ranked by the number of matching terms, then by
        timestamp, then by position in the store, all descending, so the most
        relevant and most recent entries come first.

        Args:
            query: Search query string (whitespace-separated keywords).
            limit: Maximum number of results; zero or a negative value yields
                no results.

        Returns:
            Error response (``invalid_query``) when the query is empty or
            whitespace-only; otherwise a success response whose result is the
            list of matching memory entries, best match first.

        Example:
            >>> result = await store.search("Alice name", limit=5)
        """
        if not query or not query.strip():
            return self._create_error_response(
                error="invalid_query", message="Search query cannot be empty"
            )

        keywords = query.lower().split()

        # Each item: (match_count, timestamp, position, memory).
        scored: list[tuple] = []
        for position, memory in enumerate(self.memories):
            content = memory.get("content", "")
            # add() stores content as given, so it may be None or a non-string
            # payload; coerce instead of crashing on .lower().
            if content is None:
                content = ""
            elif not isinstance(content, str):
                content = str(content)
            content_lower = content.lower()

            match_count = sum(1 for keyword in keywords if keyword in content_lower)
            if match_count > 0:
                scored.append(
                    (match_count, memory.get("timestamp", ""), position, memory)
                )

        # Position breaks ties between identical timestamps, so a later entry
        # still outranks an earlier one.
        scored.sort(key=lambda item: item[:3], reverse=True)

        # Clamp the limit: a negative slice bound would drop results from the
        # end of the ranking instead of returning none.
        results = [item[3] for item in scored[: max(limit, 0)]]

        logger.debug(f"Search for '{query}' returned {len(results)} results")

        return self._create_success_response(
            result=results,
            message=f"Found {len(results)} matching memories for query: {query}",
        )

    async def get_all(self) -> dict:
        """Get all memories from storage.

        Returns:
            Success response whose result is a new list holding every stored
            entry, oldest first. The list is a shallow copy, so callers cannot
            add or remove entries from the store through it.
        """
        return self._create_success_response(
            result=list(self.memories), message="Retrieved all memories"
        )

    async def get_recent(self, limit: int = 10) -> dict:
        """Get recent memories.

        Returns the most recent memories based on storage order
        (chronological); the newest entry is last in the result.

        Args:
            limit: Number of recent memories to retrieve; zero or a negative
                value yields an empty result.

        Returns:
            Success response whose result is a list of up to ``limit``
            entries taken from the end of the store.
        """
        # Guard limit <= 0 explicitly: memories[-0:] is the whole list and a
        # negative limit would skip entries from the front.
        recent = self.memories[-limit:] if limit > 0 else []
        return self._create_success_response(
            result=recent, message=f"Retrieved {len(recent)} recent memories"
        )

    async def clear(self) -> dict:
        """Clear all memories from storage.

        Returns:
            Success response with a ``None`` result and a message stating how
            many entries were removed.
        """
        count = len(self.memories)
        # Rebind rather than clear in place, so lists already handed out are
        # left untouched.
        self.memories = []
        return self._create_success_response(result=None, message=f"Cleared {count} memories")