Coverage for src / agent / update.py: 90%

129 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-11 14:30 +0000

1# Copyright 2025-2026 Microsoft Corporation 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Self-update functionality for osdu-agent. 

16 

17This module provides functions to check for updates, detect the package installer, 

18query GitLab for the latest version and release notes, and execute upgrades. 

19""" 

20 

21import logging 

22import shutil 

23import subprocess 

24from importlib.metadata import PackageNotFoundError, version 

25from typing import Literal 

26 

27import gitlab 

28from packaging.version import Version 

29from rich.console import Console 

30from rich.markdown import Markdown 

31from rich.panel import Panel 

32 

33from agent.config.constants import ( 

34 OSDU_GITLAB_URL, 

35 PACKAGE_NAME, 

36 PACKAGE_REGISTRY_PROJECT_ID, 

37 PACKAGE_REGISTRY_URL, 

38) 

39 

40logger = logging.getLogger(__name__) 

41console = Console() 

42 

43 

44def get_current_version() -> str: 

45 """Get the currently installed version of osdu-agent. 

46 

47 Returns: 

48 The installed version string, or "0.1.0" if not found (development fallback). 

49 """ 

50 try: 

51 return version(PACKAGE_NAME) 

52 except PackageNotFoundError: 

53 return "0.1.0" # Fallback for development 

54 

55 

56def compare_versions(current: str, latest: str) -> int: 

57 """Compare two version strings. 

58 

59 Args: 

60 current: The current version string. 

61 latest: The latest version string to compare against. 

62 

63 Returns: 

64 -1 if current < latest, 0 if equal, 1 if current > latest. 

65 """ 

66 try: 

67 v_current = Version(current) 

68 v_latest = Version(latest) 

69 if v_current < v_latest: 

70 return -1 

71 elif v_current > v_latest: 

72 return 1 

73 return 0 

74 except Exception: 

75 # Fallback to string comparison if parsing fails 

76 return 0 if current == latest else -1 

77 

78 

79def detect_installer() -> Literal["uv", "pipx"] | None: 

80 """Detect which package manager installed osdu-agent. 

81 

82 Checks uv first, then pipx, by querying their tool lists. 

83 

84 Returns: 

85 "uv" if installed via uv tool, "pipx" if installed via pipx, 

86 or None if neither can confirm the installation. 

87 """ 

88 # Check uv first 

89 if shutil.which("uv"): 

90 try: 

91 result = subprocess.run( 

92 ["uv", "tool", "list"], 

93 capture_output=True, 

94 text=True, 

95 timeout=10, 

96 ) 

97 if PACKAGE_NAME in result.stdout: 

98 return "uv" 

99 except (subprocess.TimeoutExpired, subprocess.SubprocessError): 

100 pass 

101 

102 # Check pipx 

103 if shutil.which("pipx"): 

104 try: 

105 result = subprocess.run( 

106 ["pipx", "list"], 

107 capture_output=True, 

108 text=True, 

109 timeout=10, 

110 ) 

111 if PACKAGE_NAME in result.stdout: 

112 return "pipx" 

113 except (subprocess.TimeoutExpired, subprocess.SubprocessError): 

114 pass 

115 

116 return None 

117 

118 

119def fetch_latest_version(gitlab_token: str | None = None) -> str | None: 

120 """Fetch the latest version from GitLab Package Registry. 

121 

122 Args: 

123 gitlab_token: Optional GitLab private token for API access. 

124 

125 Returns: 

126 The latest version string, or None if unable to fetch. 

127 """ 

128 try: 

129 gl = gitlab.Gitlab(OSDU_GITLAB_URL, private_token=gitlab_token) 

130 project = gl.projects.get(PACKAGE_REGISTRY_PROJECT_ID) 

131 

132 # Get packages for this project (only need the latest one) 

133 packages = project.packages.list( 

134 package_name=PACKAGE_NAME, 

135 package_type="pypi", 

136 order_by="created_at", 

137 sort="desc", 

138 per_page=1, 

139 get_all=False, # Suppress pagination warning - we only want the latest 

140 ) 

141 

142 if packages: 

143 return str(packages[0].version) 

144 return None 

145 except Exception as e: 

146 logger.debug(f"Failed to fetch latest version: {e}") 

147 return None 

148 

149 

150def fetch_releases(gitlab_token: str | None = None, limit: int = 5) -> list[dict]: 

151 """Fetch recent releases from GitLab. 

152 

153 Args: 

154 gitlab_token: Optional GitLab private token for API access. 

155 limit: Maximum number of releases to fetch. 

156 

157 Returns: 

158 List of release dictionaries with tag_name, name, description, released_at. 

159 """ 

160 try: 

161 gl = gitlab.Gitlab(OSDU_GITLAB_URL, private_token=gitlab_token) 

162 project = gl.projects.get(PACKAGE_REGISTRY_PROJECT_ID) 

163 releases = project.releases.list(per_page=limit) 

164 

165 return [ 

166 { 

167 "tag_name": r.tag_name, 

168 "name": r.name, 

169 "description": r.description, 

170 "released_at": r.released_at, 

171 } 

172 for r in releases 

173 ] 

174 except Exception as e: 

175 logger.debug(f"Failed to fetch releases: {e}") 

176 return [] 

177 

178 

179def format_changelog( 

180 releases: list[dict], 

181 current_version: str, 

182 latest_version: str, 

183) -> str: 

184 """Format changelog entries between current and latest version. 

185 

186 Args: 

187 releases: List of release dictionaries from fetch_releases(). 

188 current_version: The currently installed version. 

189 latest_version: The latest available version. 

190 

191 Returns: 

192 Formatted changelog string, or "No changelog available" if none found. 

193 """ 

194 changelog_parts = [] 

195 

196 for release in releases: 

197 tag = release["tag_name"] 

198 release_version = tag.lstrip("v") 

199 

200 # Skip versions older than or equal to current 

201 if compare_versions(current_version, release_version) >= 0: 

202 continue 

203 

204 # Skip versions newer than latest (shouldn't happen normally) 

205 if compare_versions(latest_version, release_version) < 0: 

206 continue 

207 

208 description = release.get("description") or "No release notes" 

209 changelog_parts.append(description) 

210 

211 return "\n\n---\n\n".join(changelog_parts) if changelog_parts else "No changelog available" 

212 

213 

214def display_changelog(changelog: str) -> None: 

215 """Display formatted changelog in terminal using Rich. 

216 

217 Args: 

218 changelog: The changelog text in markdown format. 

219 """ 

220 md = Markdown(changelog) 

221 console.print(Panel(md, title="What's New", border_style="cyan")) 

222 

223 

224def upgrade_package(installer: Literal["uv", "pipx"]) -> tuple[bool, str]: 

225 """Execute upgrade command and return result. 

226 

227 Args: 

228 installer: The package manager to use ("uv" or "pipx"). 

229 

230 Returns: 

231 Tuple of (success, output) where success is True if upgrade succeeded. 

232 """ 

233 try: 

234 if installer == "uv": 

235 cmd = [ 

236 "uv", 

237 "tool", 

238 "upgrade", 

239 PACKAGE_NAME, 

240 "--extra-index-url", 

241 PACKAGE_REGISTRY_URL, 

242 ] 

243 elif installer == "pipx": 

244 cmd = [ 

245 "pipx", 

246 "upgrade", 

247 PACKAGE_NAME, 

248 "--pip-args", 

249 f"--extra-index-url {PACKAGE_REGISTRY_URL}", 

250 ] 

251 else: 

252 return False, f"Unknown installer: {installer}" 

253 

254 result = subprocess.run( 

255 cmd, 

256 capture_output=True, 

257 text=True, 

258 timeout=180, # 3 minutes timeout 

259 ) 

260 

261 output = result.stdout + result.stderr 

262 return result.returncode == 0, output 

263 

264 except subprocess.TimeoutExpired: 

265 return False, "Upgrade timed out after 3 minutes" 

266 except FileNotFoundError: 

267 return False, f"{installer} command not found" 

268 except Exception as e: 

269 return False, str(e) 

270 

271 

272def update_bundled_skills() -> tuple[bool, str]: 

273 """Trigger update for bundled skills with CLIs. 

274 

275 Discovers all bundled skills that have CLI tools installed and runs 

276 their update commands. 

277 

278 Returns: 

279 Tuple of (success, message) indicating update result. 

280 """ 

281 # List of bundled skills with CLIs that support 'update' command 

282 bundled_cli_skills = ["osdu-quality", "osdu-engagement", "osdu-activity"] 

283 

284 updated = [] 

285 skipped = [] 

286 failed = [] 

287 

288 for skill_name in bundled_cli_skills: 

289 cli_path = shutil.which(skill_name) 

290 if not cli_path: 

291 skipped.append(skill_name) 

292 continue 

293 

294 try: 

295 console.print(f"\n[bold]Updating {skill_name}...[/bold]") 

296 console.print(f"[dim white]Running: {skill_name} update[/dim white]") 

297 

298 result = subprocess.run( 

299 [skill_name, "update"], 

300 capture_output=False, # Show output in real-time 

301 timeout=300, # 5 minutes timeout 

302 ) 

303 

304 if result.returncode == 0: 

305 updated.append(skill_name) 

306 else: 

307 failed.append(f"{skill_name} (exit code {result.returncode})") 

308 

309 except subprocess.TimeoutExpired: 

310 failed.append(f"{skill_name} (timeout)") 

311 except FileNotFoundError: 

312 skipped.append(skill_name) 

313 except Exception as e: 

314 failed.append(f"{skill_name} ({e})") 

315 

316 # Build result message 

317 messages = [] 

318 if updated: 

319 messages.append(f"Updated: {', '.join(updated)}") 

320 if skipped: 

321 messages.append(f"Skipped (not installed): {', '.join(skipped)}") 

322 if failed: 

323 messages.append(f"Failed: {', '.join(failed)}") 

324 

325 success = len(failed) == 0 

326 message = "; ".join(messages) if messages else "No bundled skills to update" 

327 

328 return success, message