Coverage for src / agent / memory / manager.py: 100%

20 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-11 14:30 +0000

1# Copyright 2025-2026 Microsoft Corporation 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Abstract base class for memory managers. 

16 

17This module defines the interface for memory management operations. 

18""" 

19 

20from abc import ABC, abstractmethod 

21from typing import Any 

22 

23from agent.config.schema import AgentSettings 

24from agent.utils.responses import create_error_response, create_success_response 

25 

26 

27class MemoryManager(ABC): 

28 """Abstract base class for agent memory management. 

29 

30 Defines the interface for memory operations that all memory implementations 

31 must support. Follows the AgentToolset pattern with dependency injection 

32 and structured responses. 

33 

34 Example: 

35 >>> class CustomMemory(MemoryManager): 

36 ... async def add(self, messages): 

37 ... # Implementation 

38 ... return self._create_success_response(result=[], message="Added") 

39 """ 

40 

41 def __init__(self, config: AgentSettings): 

42 """Initialize memory manager with settings. 

43 

44 Args: 

45 config: Agent configuration with memory settings 

46 """ 

47 self.config = config 

48 

49 @abstractmethod 

50 async def add(self, messages: list[dict]) -> dict: 

51 """Add messages to memory storage. 

52 

53 Args: 

54 messages: List of message dicts with role and content 

55 

56 Returns: 

57 Structured response dict with success status 

58 

59 Example: 

60 >>> result = await manager.add([ 

61 ... {"role": "user", "content": "My name is Alice"} 

62 ... ]) 

63 """ 

64 pass 

65 

66 @abstractmethod 

67 async def search(self, query: str, limit: int = 5) -> dict: 

68 """Search memories by keyword query. 

69 

70 Args: 

71 query: Search query string 

72 limit: Maximum number of results 

73 

74 Returns: 

75 Structured response dict with matching memories 

76 

77 Example: 

78 >>> result = await manager.search("Alice", limit=5) 

79 """ 

80 pass 

81 

82 @abstractmethod 

83 async def get_all(self) -> dict: 

84 """Get all memories from storage. 

85 

86 Returns: 

87 Structured response dict with all memories 

88 

89 Example: 

90 >>> result = await manager.get_all() 

91 """ 

92 pass 

93 

94 @abstractmethod 

95 async def get_recent(self, limit: int = 10) -> dict: 

96 """Get recent memories. 

97 

98 Args: 

99 limit: Number of recent memories to retrieve 

100 

101 Returns: 

102 Structured response dict with recent memories 

103 

104 Example: 

105 >>> result = await manager.get_recent(limit=10) 

106 """ 

107 pass 

108 

109 @abstractmethod 

110 async def clear(self) -> dict: 

111 """Clear all memories from storage. 

112 

113 Returns: 

114 Structured response dict with success status 

115 

116 Example: 

117 >>> result = await manager.clear() 

118 """ 

119 pass 

120 

121 async def retrieve_for_context(self, messages: list[dict], limit: int = 10) -> dict: 

122 """Retrieve memories relevant for context injection. 

123 

124 Default implementation extracts query from latest user message and searches, 

125 falling back to recent memories if no query available. Backends can override 

126 for optimized semantic retrieval. 

127 

128 Args: 

129 messages: Current conversation messages (list of dicts with role/content) 

130 limit: Maximum number of memories to retrieve 

131 

132 Returns: 

133 Structured response dict with relevant memories 

134 

135 Example: 

136 >>> result = await manager.retrieve_for_context( 

137 ... [{"role": "user", "content": "What's my name?"}], 

138 ... limit=5 

139 ... ) 

140 

141 Note: 

142 Implementations can use different strategies: 

143 - InMemoryStore: Keyword-based search from current message 

144 - Mem0Store: Semantic similarity search (future) 

145 """ 

146 # Extract query from latest user message 

147 query = None 

148 for msg in reversed(messages): 

149 if isinstance(msg, dict) and msg.get("role") == "user": 

150 query = msg.get("content", "").strip() 

151 break 

152 

153 # Search if we have a query, otherwise get recent 

154 if query: 

155 return await self.search(query, limit=limit) 

156 else: 

157 return await self.get_recent(limit=limit) 

158 

159 def _create_success_response(self, result: Any, message: str = "") -> dict: 

160 """Create standardized success response. 

161 

162 Args: 

163 result: Operation result 

164 message: Optional success message 

165 

166 Returns: 

167 Structured response dict with success=True 

168 """ 

169 return create_success_response(result, message) 

170 

171 def _create_error_response(self, error: str, message: str) -> dict: 

172 """Create standardized error response. 

173 

174 Args: 

175 error: Machine-readable error code 

176 message: Human-friendly error message 

177 

178 Returns: 

179 Structured response dict with success=False 

180 """ 

181 return create_error_response(error, message)