Coverage for src / agent / update.py: 90%
129 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 14:30 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 14:30 +0000
1# Copyright 2025-2026 Microsoft Corporation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""Self-update functionality for osdu-agent.
17This module provides functions to check for updates, detect the package installer,
18query GitLab for the latest version and release notes, and execute upgrades.
19"""
21import logging
22import shutil
23import subprocess
24from importlib.metadata import PackageNotFoundError, version
25from typing import Literal
27import gitlab
28from packaging.version import Version
29from rich.console import Console
30from rich.markdown import Markdown
31from rich.panel import Panel
33from agent.config.constants import (
34 OSDU_GITLAB_URL,
35 PACKAGE_NAME,
36 PACKAGE_REGISTRY_PROJECT_ID,
37 PACKAGE_REGISTRY_URL,
38)
40logger = logging.getLogger(__name__)
41console = Console()
44def get_current_version() -> str:
45 """Get the currently installed version of osdu-agent.
47 Returns:
48 The installed version string, or "0.1.0" if not found (development fallback).
49 """
50 try:
51 return version(PACKAGE_NAME)
52 except PackageNotFoundError:
53 return "0.1.0" # Fallback for development
56def compare_versions(current: str, latest: str) -> int:
57 """Compare two version strings.
59 Args:
60 current: The current version string.
61 latest: The latest version string to compare against.
63 Returns:
64 -1 if current < latest, 0 if equal, 1 if current > latest.
65 """
66 try:
67 v_current = Version(current)
68 v_latest = Version(latest)
69 if v_current < v_latest:
70 return -1
71 elif v_current > v_latest:
72 return 1
73 return 0
74 except Exception:
75 # Fallback to string comparison if parsing fails
76 return 0 if current == latest else -1
79def detect_installer() -> Literal["uv", "pipx"] | None:
80 """Detect which package manager installed osdu-agent.
82 Checks uv first, then pipx, by querying their tool lists.
84 Returns:
85 "uv" if installed via uv tool, "pipx" if installed via pipx,
86 or None if neither can confirm the installation.
87 """
88 # Check uv first
89 if shutil.which("uv"):
90 try:
91 result = subprocess.run(
92 ["uv", "tool", "list"],
93 capture_output=True,
94 text=True,
95 timeout=10,
96 )
97 if PACKAGE_NAME in result.stdout:
98 return "uv"
99 except (subprocess.TimeoutExpired, subprocess.SubprocessError):
100 pass
102 # Check pipx
103 if shutil.which("pipx"):
104 try:
105 result = subprocess.run(
106 ["pipx", "list"],
107 capture_output=True,
108 text=True,
109 timeout=10,
110 )
111 if PACKAGE_NAME in result.stdout:
112 return "pipx"
113 except (subprocess.TimeoutExpired, subprocess.SubprocessError):
114 pass
116 return None
119def fetch_latest_version(gitlab_token: str | None = None) -> str | None:
120 """Fetch the latest version from GitLab Package Registry.
122 Args:
123 gitlab_token: Optional GitLab private token for API access.
125 Returns:
126 The latest version string, or None if unable to fetch.
127 """
128 try:
129 gl = gitlab.Gitlab(OSDU_GITLAB_URL, private_token=gitlab_token)
130 project = gl.projects.get(PACKAGE_REGISTRY_PROJECT_ID)
132 # Get packages for this project (only need the latest one)
133 packages = project.packages.list(
134 package_name=PACKAGE_NAME,
135 package_type="pypi",
136 order_by="created_at",
137 sort="desc",
138 per_page=1,
139 get_all=False, # Suppress pagination warning - we only want the latest
140 )
142 if packages:
143 return str(packages[0].version)
144 return None
145 except Exception as e:
146 logger.debug(f"Failed to fetch latest version: {e}")
147 return None
150def fetch_releases(gitlab_token: str | None = None, limit: int = 5) -> list[dict]:
151 """Fetch recent releases from GitLab.
153 Args:
154 gitlab_token: Optional GitLab private token for API access.
155 limit: Maximum number of releases to fetch.
157 Returns:
158 List of release dictionaries with tag_name, name, description, released_at.
159 """
160 try:
161 gl = gitlab.Gitlab(OSDU_GITLAB_URL, private_token=gitlab_token)
162 project = gl.projects.get(PACKAGE_REGISTRY_PROJECT_ID)
163 releases = project.releases.list(per_page=limit)
165 return [
166 {
167 "tag_name": r.tag_name,
168 "name": r.name,
169 "description": r.description,
170 "released_at": r.released_at,
171 }
172 for r in releases
173 ]
174 except Exception as e:
175 logger.debug(f"Failed to fetch releases: {e}")
176 return []
179def format_changelog(
180 releases: list[dict],
181 current_version: str,
182 latest_version: str,
183) -> str:
184 """Format changelog entries between current and latest version.
186 Args:
187 releases: List of release dictionaries from fetch_releases().
188 current_version: The currently installed version.
189 latest_version: The latest available version.
191 Returns:
192 Formatted changelog string, or "No changelog available" if none found.
193 """
194 changelog_parts = []
196 for release in releases:
197 tag = release["tag_name"]
198 release_version = tag.lstrip("v")
200 # Skip versions older than or equal to current
201 if compare_versions(current_version, release_version) >= 0:
202 continue
204 # Skip versions newer than latest (shouldn't happen normally)
205 if compare_versions(latest_version, release_version) < 0:
206 continue
208 description = release.get("description") or "No release notes"
209 changelog_parts.append(description)
211 return "\n\n---\n\n".join(changelog_parts) if changelog_parts else "No changelog available"
214def display_changelog(changelog: str) -> None:
215 """Display formatted changelog in terminal using Rich.
217 Args:
218 changelog: The changelog text in markdown format.
219 """
220 md = Markdown(changelog)
221 console.print(Panel(md, title="What's New", border_style="cyan"))
224def upgrade_package(installer: Literal["uv", "pipx"]) -> tuple[bool, str]:
225 """Execute upgrade command and return result.
227 Args:
228 installer: The package manager to use ("uv" or "pipx").
230 Returns:
231 Tuple of (success, output) where success is True if upgrade succeeded.
232 """
233 try:
234 if installer == "uv":
235 cmd = [
236 "uv",
237 "tool",
238 "upgrade",
239 PACKAGE_NAME,
240 "--extra-index-url",
241 PACKAGE_REGISTRY_URL,
242 ]
243 elif installer == "pipx":
244 cmd = [
245 "pipx",
246 "upgrade",
247 PACKAGE_NAME,
248 "--pip-args",
249 f"--extra-index-url {PACKAGE_REGISTRY_URL}",
250 ]
251 else:
252 return False, f"Unknown installer: {installer}"
254 result = subprocess.run(
255 cmd,
256 capture_output=True,
257 text=True,
258 timeout=180, # 3 minutes timeout
259 )
261 output = result.stdout + result.stderr
262 return result.returncode == 0, output
264 except subprocess.TimeoutExpired:
265 return False, "Upgrade timed out after 3 minutes"
266 except FileNotFoundError:
267 return False, f"{installer} command not found"
268 except Exception as e:
269 return False, str(e)
272def update_bundled_skills() -> tuple[bool, str]:
273 """Trigger update for bundled skills with CLIs.
275 Discovers all bundled skills that have CLI tools installed and runs
276 their update commands.
278 Returns:
279 Tuple of (success, message) indicating update result.
280 """
281 # List of bundled skills with CLIs that support 'update' command
282 bundled_cli_skills = ["osdu-quality", "osdu-engagement", "osdu-activity"]
284 updated = []
285 skipped = []
286 failed = []
288 for skill_name in bundled_cli_skills:
289 cli_path = shutil.which(skill_name)
290 if not cli_path:
291 skipped.append(skill_name)
292 continue
294 try:
295 console.print(f"\n[bold]Updating {skill_name}...[/bold]")
296 console.print(f"[dim white]Running: {skill_name} update[/dim white]")
298 result = subprocess.run(
299 [skill_name, "update"],
300 capture_output=False, # Show output in real-time
301 timeout=300, # 5 minutes timeout
302 )
304 if result.returncode == 0:
305 updated.append(skill_name)
306 else:
307 failed.append(f"{skill_name} (exit code {result.returncode})")
309 except subprocess.TimeoutExpired:
310 failed.append(f"{skill_name} (timeout)")
311 except FileNotFoundError:
312 skipped.append(skill_name)
313 except Exception as e:
314 failed.append(f"{skill_name} ({e})")
316 # Build result message
317 messages = []
318 if updated:
319 messages.append(f"Updated: {', '.join(updated)}")
320 if skipped:
321 messages.append(f"Skipped (not installed): {', '.join(skipped)}")
322 if failed:
323 messages.append(f"Failed: {', '.join(failed)}")
325 success = len(failed) == 0
326 message = "; ".join(messages) if messages else "No bundled skills to update"
328 return success, message