Coverage for src / agent / memory / store.py: 100%

46 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-11 14:30 +0000

1# Copyright 2025-2026 Microsoft Corporation 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""In-memory implementation of memory storage. 

16 

17This module provides the default in-memory storage for agent memories. 

18""" 

19 

20import logging 

21from datetime import datetime 

22 

23from agent.config.schema import AgentSettings 

24from agent.memory.manager import MemoryManager 

25 

26logger = logging.getLogger(__name__) 

27 

28 

class InMemoryStore(MemoryManager):
    """In-memory storage implementation for agent memories.

    Messages are kept in a plain list on the instance, together with a
    little metadata (id, timestamp). The store offers keyword search and
    simple retrieval helpers and needs no external dependencies.

    Attributes:
        config: Agent configuration, passed to the base-class initializer.
        memories: List of stored memory entries, oldest first.

    Example:
        >>> store = InMemoryStore(config)  # config is an AgentSettings instance
        >>> await store.add([{"role": "user", "content": "Hello"}])
    """

    def __init__(self, config: AgentSettings):
        """Initialize in-memory store.

        Args:
            config: Agent configuration with memory settings
        """
        super().__init__(config)
        self.memories: list[dict] = []

    @staticmethod
    def _content_as_text(content: object) -> str:
        """Return *content* as a string suitable for keyword matching.

        ``add`` stores the ``content`` value unchanged, so it is not
        guaranteed to be a string. ``None`` becomes an empty string and any
        other non-string value is converted with ``str()``.
        """
        if isinstance(content, str):
            return content
        return "" if content is None else str(content)

    async def add(self, messages: list[dict]) -> dict:
        """Add messages to memory storage.

        Each valid message is appended to ``self.memories`` with an id, the
        role and content, a timestamp and optional metadata. Entries that are
        not dicts, or that lack a ``role`` or ``content`` key, are logged and
        skipped.

        Args:
            messages: List of message dicts with role and content

        Returns:
            Error response when ``messages`` is empty; otherwise a success
            response whose result is the list of ids that were added (which
            may be empty if every message was skipped).

        Example:
            >>> await store.add([
            ...     {"role": "user", "content": "My name is Alice"},
            ...     {"role": "assistant", "content": "Nice to meet you, Alice!"}
            ... ])
        """
        if not messages:
            return self._create_error_response(
                error="invalid_input", message="No messages provided"
            )

        added_memories = []

        for msg in messages:
            # Validate message structure
            if not isinstance(msg, dict) or "role" not in msg or "content" not in msg:
                logger.warning("Skipping invalid message: %s", msg)
                continue

            memory_entry = {
                # Position in the list doubles as the id; entries are never
                # removed individually, so ids stay unique until clear().
                "id": len(self.memories),
                "role": msg["role"],
                "content": msg["content"],
                # One timestamp per message. NOTE: datetime.now() without a
                # tz argument yields a naive local-time value.
                "timestamp": datetime.now().isoformat(),
                "metadata": msg.get("metadata", {}),
            }

            self.memories.append(memory_entry)
            added_memories.append(memory_entry["id"])

        logger.debug("Added %d messages to memory", len(added_memories))

        return self._create_success_response(
            result=added_memories, message=f"Added {len(added_memories)} messages to memory"
        )

    async def search(self, query: str, limit: int = 5) -> dict:
        """Search memories by keyword query.

        The query is lower-cased and split on whitespace. A memory matches
        when at least one keyword occurs as a substring of its lower-cased
        content. Matches are ordered by number of keywords found, then by
        timestamp, both descending.

        Args:
            query: Search query string (keywords)
            limit: Maximum number of results; zero or a negative value
                yields no results

        Returns:
            Error response when the query is empty or whitespace only;
            otherwise a success response with the matching memories.

        Example:
            >>> result = await store.search("Alice name", limit=5)
        """
        if not query or not query.strip():
            return self._create_error_response(
                error="invalid_query", message="Search query cannot be empty"
            )

        keywords = query.lower().split()

        # Search and rank by relevance
        matches = []
        for memory in self.memories:
            # Content may be a non-string value; normalise before matching.
            content_lower = self._content_as_text(memory.get("content", "")).lower()

            match_count = sum(1 for keyword in keywords if keyword in content_lower)

            if match_count > 0:
                matches.append((memory, match_count))

        # Sort by relevance (match count) and recency
        matches.sort(key=lambda x: (x[1], x[0].get("timestamp", "")), reverse=True)

        # Clamp the limit: a negative slice bound would drop entries from the
        # end of the ranking instead of limiting the result size.
        results = [memory for memory, _ in matches[: max(limit, 0)]]

        logger.debug("Search for '%s' returned %d results", query, len(results))

        return self._create_success_response(
            result=results,
            message=f"Found {len(results)} matching memories for query: {query}",
        )

    async def get_all(self) -> dict:
        """Get all memories from storage.

        Returns:
            Success response whose result is ``self.memories`` (the stored
            list itself, not a copy).
        """
        return self._create_success_response(result=self.memories, message="Retrieved all memories")

    async def get_recent(self, limit: int = 10) -> dict:
        """Get recent memories.

        Returns the last ``limit`` entries in storage order; entries are
        appended as they are added, so later entries are more recent.

        Args:
            limit: Number of recent memories to retrieve; zero or a negative
                value yields an empty result

        Returns:
            Structured response dict with recent memories
        """
        # Guard limit <= 0 explicitly: ``memories[-0:]`` is the whole list,
        # not an empty one.
        recent = self.memories[-limit:] if limit > 0 else []
        return self._create_success_response(
            result=recent, message=f"Retrieved {len(recent)} recent memories"
        )

    async def clear(self) -> dict:
        """Clear all memories from storage.

        Rebinds ``self.memories`` to a new empty list, so lists handed out
        earlier are not emptied.

        Returns:
            Success response whose message reports how many entries were
            removed.
        """
        count = len(self.memories)
        self.memories = []
        return self._create_success_response(result=None, message=f"Cleared {count} memories")