Coverage for src / agent / services / maven / version.py: 89%

205 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-11 14:30 +0000

1# Copyright 2025-2026 Microsoft Corporation 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# http://www.apache.org/licenses/LICENSE-2.0 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15"""Version parsing, comparison, and filtering service for Maven artifacts. 

16 

17This module handles the complexity of Maven version strings, which can come 

18in various formats: 

19- Semantic: 1.2.3, 1.2.3.4 

20- Calendar: 20240115, 2024.01.15 

21- Simple numeric: 5, 1.2 

22- With qualifiers: 1.0-SNAPSHOT, 1.0-alpha, 1.0.Final 

23 

24Qualifier ordering (lowest to highest): 

25 SNAPSHOT < alpha < beta < CR < RC < ga < release < Final 

26""" 

27 

28import functools 

29import logging 

30import re 

31from typing import Literal 

32 

33from agent.services.maven.types import LatestVersions 

34 

35logger = logging.getLogger(__name__) 

36 

37 

38class VersionService: 

39 """Service for Maven version parsing, comparison, and filtering.""" 

40 

41 # Qualifier rankings (higher is more stable) 

42 QUALIFIER_RANKS: dict[str, int] = { 

43 "snapshot": 10, 

44 "alpha": 20, 

45 "a": 20, 

46 "beta": 30, 

47 "b": 30, 

48 "milestone": 40, 

49 "m": 40, 

50 "cr": 50, 

51 "rc": 60, 

52 "": 70, # No qualifier 

53 "ga": 80, 

54 "release": 90, 

55 "final": 100, 

56 "sp": 100, # Service pack 

57 } 

58 

59 # Pre-release qualifiers to filter out for stable versions 

60 PRERELEASE_PATTERNS = ["snapshot", "alpha", "beta", "rc", "-m", ".m", "cr"] 

61 

62 @staticmethod 

63 def parse_version(version_string: str) -> tuple[list[int], str]: 

64 """Parse version string into numeric components and qualifier. 

65 

66 Supports multiple version formats: 

67 - Standard semver (MAJOR.MINOR.PATCH): 1.2.3 -> ([1, 2, 3], "") 

68 - Calendar format (YYYYMMDD): 20231013 -> ([2023, 10, 13], "") 

69 - Simple numeric: 5 -> ([5, 0, 0], "") 

70 - With qualifiers: 1.2.3-SNAPSHOT -> ([1, 2, 3], "-SNAPSHOT") 

71 

72 Args: 

73 version_string: Version string to parse 

74 

75 Returns: 

76 Tuple of (numeric_components, qualifier) 

77 """ 

78 if not version_string: 

79 return [], "" 

80 

81 # Extract qualifier from version string 

82 base_version, qualifier = VersionService._extract_qualifier(version_string) 

83 

84 # Try different parsing strategies in order 

85 components = ( 

86 VersionService._try_semver_pattern(base_version) 

87 or VersionService._try_calendar_version(base_version) 

88 or VersionService._try_simple_numeric(base_version) 

89 or VersionService._extract_numeric_components(base_version) 

90 ) 

91 

92 return components, qualifier 

93 

94 @staticmethod 

95 def _extract_qualifier(version_string: str) -> tuple[str, str]: 

96 """Extract qualifier from version string. 

97 

98 Args: 

99 version_string: Full version string 

100 

101 Returns: 

102 Tuple of (base_version, qualifier) 

103 """ 

104 # Handle hyphen-separated qualifiers: 1.2.3-SNAPSHOT 

105 if "-" in version_string: 

106 base_version, qualifier = version_string.split("-", 1) 

107 return base_version, f"-{qualifier}" 

108 

109 # Handle dot-separated non-numeric qualifiers: 1.2.3.Final 

110 if "." in version_string: 

111 parts = version_string.split(".") 

112 for i, part in enumerate(parts): 

113 if part and not part.isdigit(): 

114 base = ".".join(parts[:i]) 

115 qual = "." + ".".join(parts[i:]) 

116 return base, qual 

117 

118 return version_string, "" 

119 

120 @staticmethod 

121 def _try_semver_pattern(base_version: str) -> list[int] | None: 

122 """Try to parse as standard semver pattern. 

123 

124 Args: 

125 base_version: Base version without qualifier 

126 

127 Returns: 

128 List of version components or None if not semver 

129 """ 

130 # Standard semver: MAJOR.MINOR.PATCH 

131 semver_match = re.match(r"^(\d+)\.(\d+)\.(\d+)$", base_version) 

132 if semver_match: 

133 return [int(x) for x in semver_match.groups()] 

134 

135 # Partial semver: MAJOR.MINOR 

136 partial_match = re.match(r"^(\d+)\.(\d+)$", base_version) 

137 if partial_match: 

138 major, minor = map(int, partial_match.groups()) 

139 return [major, minor, 0] 

140 

141 # Extended semver: MAJOR.MINOR.PATCH.BUILD 

142 extended_match = re.match(r"^(\d+)\.(\d+)\.(\d+)\.(\d+)$", base_version) 

143 if extended_match: 

144 return [int(x) for x in extended_match.groups()] 

145 

146 return None 

147 

148 @staticmethod 

149 def _try_calendar_version(base_version: str) -> list[int] | None: 

150 """Try to parse as calendar version (YYYYMMDD). 

151 

152 Args: 

153 base_version: Base version without qualifier 

154 

155 Returns: 

156 List of [year, month, day] or None if not calendar format 

157 """ 

158 if len(base_version) != 8 or not base_version.isdigit(): 

159 return None 

160 

161 try: 

162 year = int(base_version[:4]) 

163 month = int(base_version[4:6]) 

164 day = int(base_version[6:8]) 

165 

166 # Validate date range 

167 if 1900 <= year <= 2100 and 1 <= month <= 12 and 1 <= day <= 31: 

168 return [year, month, day] 

169 except (ValueError, IndexError): 

170 pass 

171 

172 return None 

173 

174 @staticmethod 

175 def _try_simple_numeric(base_version: str) -> list[int] | None: 

176 """Try to parse as simple numeric version. 

177 

178 Args: 

179 base_version: Base version without qualifier 

180 

181 Returns: 

182 List of [major, 0, 0] or None if not simple numeric 

183 """ 

184 if base_version.isdigit(): 

185 # Could be calendar - check first 

186 calendar = VersionService._try_calendar_version(base_version) 

187 if calendar: 

188 return calendar 

189 return [int(base_version), 0, 0] 

190 return None 

191 

192 @staticmethod 

193 def _extract_numeric_components(base_version: str) -> list[int]: 

194 """Extract numeric components from dotted version. 

195 

196 Args: 

197 base_version: Base version without qualifier 

198 

199 Returns: 

200 List of numeric components, padded to at least 3 elements 

201 """ 

202 components = [] 

203 for part in base_version.split("."): 

204 if part.isdigit(): 

205 components.append(int(part)) 

206 else: 

207 break 

208 

209 # Pad with zeros to ensure at least [major, minor, patch] 

210 while len(components) < 3: 

211 components.append(0) 

212 

213 return components if components != [0, 0, 0] else [0, 0, 0] 

214 

215 @staticmethod 

216 def compare_versions(version1: str, version2: str) -> int: 

217 """Compare two version strings semantically. 

218 

219 Args: 

220 version1: First version string 

221 version2: Second version string 

222 

223 Returns: 

224 -1 if version1 < version2 

225 0 if version1 == version2 

226 1 if version1 > version2 

227 """ 

228 # Handle empty versions 

229 if not version1 and not version2: 

230 return 0 

231 if not version1: 

232 return -1 

233 if not version2: 

234 return 1 

235 

236 # Parse versions 

237 v1_numeric, v1_qualifier = VersionService.parse_version(version1) 

238 v2_numeric, v2_qualifier = VersionService.parse_version(version2) 

239 

240 # Compare numeric components 

241 numeric_cmp = VersionService._compare_numeric_components(v1_numeric, v2_numeric) 

242 if numeric_cmp != 0: 

243 return numeric_cmp 

244 

245 # Compare qualifiers 

246 return VersionService._compare_qualifiers(v1_qualifier, v2_qualifier) 

247 

248 @staticmethod 

249 def _compare_numeric_components(v1: list[int], v2: list[int]) -> int: 

250 """Compare numeric version components.""" 

251 # strict=False intentional: we handle unequal lengths below 

252 for c1, c2 in zip(v1, v2, strict=False): 

253 if c1 < c2: 

254 return -1 

255 if c1 > c2: 

256 return 1 

257 

258 # If one has more components 

259 if len(v1) < len(v2): 

260 return -1 

261 if len(v1) > len(v2): 

262 return 1 

263 

264 return 0 

265 

266 @staticmethod 

267 def _compare_qualifiers(q1: str, q2: str) -> int: 

268 """Compare version qualifiers.""" 

269 # Handle .Final special case 

270 if q1.lower() == ".final" and not q2: 

271 return 1 

272 if q2.lower() == ".final" and not q1: 

273 return -1 

274 

275 # No qualifier is higher than most qualifiers 

276 if not q1 and q2: 

277 return 1 

278 if q1 and not q2: 

279 return -1 

280 

281 # Compare by rank 

282 rank1 = VersionService._get_qualifier_rank(q1) 

283 rank2 = VersionService._get_qualifier_rank(q2) 

284 

285 if rank1 != rank2: 

286 return 1 if rank1 > rank2 else -1 

287 

288 # Lexicographic comparison for same rank 

289 if q1 < q2: 

290 return -1 

291 if q1 > q2: 

292 return 1 

293 

294 return 0 

295 

296 @staticmethod 

297 def _get_qualifier_rank(qualifier: str) -> int: 

298 """Get rank for a qualifier string.""" 

299 clean = qualifier.lower().replace("-", "").replace(".", "") 

300 

301 for key, rank in VersionService.QUALIFIER_RANKS.items(): 

302 if key in clean: 

303 return rank 

304 

305 return 0 

306 

307 @staticmethod 

308 def sort_versions(versions: list[str], descending: bool = True) -> list[str]: 

309 """Sort version strings semantically. 

310 

311 Args: 

312 versions: List of version strings 

313 descending: If True, sort newest first (default: True) 

314 

315 Returns: 

316 Sorted list of version strings 

317 """ 

318 return sorted( 

319 versions, 

320 key=functools.cmp_to_key(VersionService.compare_versions), 

321 reverse=descending, 

322 ) 

323 

324 @staticmethod 

325 def get_latest_version(versions: list[str], include_snapshots: bool = False) -> str | None: 

326 """Find the latest version from a list. 

327 

328 Args: 

329 versions: List of version strings 

330 include_snapshots: Include SNAPSHOT versions (default: False) 

331 

332 Returns: 

333 Latest version string or None if list is empty 

334 """ 

335 if not versions: 

336 return None 

337 

338 # Filter snapshots if needed 

339 filtered = versions 

340 if not include_snapshots: 

341 filtered = [v for v in versions if "snapshot" not in v.lower()] 

342 if not filtered: 

343 filtered = versions # Fall back to all if no non-snapshots 

344 

345 # Prioritize .Final versions 

346 final_versions = [v for v in filtered if ".Final" in v] 

347 if final_versions: 

348 return VersionService.sort_versions(final_versions)[0] 

349 

350 sorted_versions = VersionService.sort_versions(filtered) 

351 return sorted_versions[0] if sorted_versions else None 

352 

353 @staticmethod 

354 def filter_stable_versions(versions: list[str]) -> list[str]: 

355 """Filter out pre-release versions. 

356 

357 Args: 

358 versions: List of version strings 

359 

360 Returns: 

361 List of stable (non-pre-release) versions 

362 """ 

363 stable = [] 

364 for version in versions: 

365 lower = version.lower() 

366 if not any(pattern in lower for pattern in VersionService.PRERELEASE_PATTERNS): 

367 stable.append(version) 

368 return stable 

369 

370 @staticmethod 

371 def filter_by_update_type( 

372 versions: list[str], 

373 current_version: str, 

374 update_type: Literal["major", "minor", "patch"], 

375 ) -> list[str]: 

376 """Filter versions by update type relative to current version. 

377 

378 Args: 

379 versions: List of available versions 

380 current_version: Current version to compare against 

381 update_type: Type of update to filter for 

382 

383 Returns: 

384 Filtered list of versions matching the update type 

385 """ 

386 current_components, _ = VersionService.parse_version(current_version) 

387 while len(current_components) < 3: 

388 current_components.append(0) 

389 

390 curr_major, curr_minor = current_components[0], current_components[1] 

391 

392 # Filter stable versions first 

393 stable = VersionService.filter_stable_versions(versions) 

394 

395 if update_type == "major": 

396 return stable # All stable versions for major 

397 

398 if update_type == "minor": 

399 # Same major version 

400 return [v for v in stable if VersionService.parse_version(v)[0][0] == curr_major] 

401 

402 if update_type == "patch": 

403 # Same major.minor version 

404 result = [] 

405 for v in stable: 

406 components, _ = VersionService.parse_version(v) 

407 if len(components) >= 2: 

408 if components[0] == curr_major and components[1] == curr_minor: 

409 result.append(v) 

410 return result 

411 

412 return [] 

413 

414 @staticmethod 

415 def find_latest_versions(versions: list[str], current_version: str) -> LatestVersions: 

416 """Find latest versions by update type. 

417 

418 Args: 

419 versions: List of available versions 

420 current_version: Current version 

421 

422 Returns: 

423 LatestVersions with latest major, minor, patch, and overall versions 

424 """ 

425 # Get current version components 

426 curr_components, _ = VersionService.parse_version(current_version) 

427 while len(curr_components) < 3: 

428 curr_components.append(0) 

429 curr_major, curr_minor = curr_components[0], curr_components[1] 

430 

431 stable = VersionService.filter_stable_versions(versions) 

432 sorted_versions = VersionService.sort_versions(stable) 

433 

434 latest_overall = sorted_versions[0] if sorted_versions else None 

435 

436 # Find latest major (any newer major version) 

437 latest_major = None 

438 for v in sorted_versions: 

439 v_components, _ = VersionService.parse_version(v) 

440 if v_components and v_components[0] > curr_major: 

441 latest_major = v 

442 break 

443 

444 # Find latest minor (same major, any newer minor) 

445 latest_minor = None 

446 for v in sorted_versions: 

447 v_components, _ = VersionService.parse_version(v) 

448 if len(v_components) >= 2: 

449 if v_components[0] == curr_major and v_components[1] > curr_minor: 

450 latest_minor = v 

451 break 

452 

453 # Find latest patch (same major.minor, any newer patch) 

454 latest_patch = None 

455 curr_patch = curr_components[2] if len(curr_components) > 2 else 0 

456 for v in sorted_versions: 

457 v_components, _ = VersionService.parse_version(v) 

458 if len(v_components) >= 3: 

459 if ( 

460 v_components[0] == curr_major 

461 and v_components[1] == curr_minor 

462 and v_components[2] > curr_patch 

463 ): 

464 latest_patch = v 

465 break 

466 

467 return LatestVersions( 

468 latest_major=latest_major, 

469 latest_minor=latest_minor, 

470 latest_patch=latest_patch, 

471 latest_overall=latest_overall, 

472 ) 

473 

474 @staticmethod 

475 def is_update_available( 

476 current_version: str, available_versions: list[str] 

477 ) -> tuple[bool, bool, bool]: 

478 """Check if updates are available. 

479 

480 Args: 

481 current_version: Current version 

482 available_versions: List of available versions 

483 

484 Returns: 

485 Tuple of (has_major_update, has_minor_update, has_patch_update) 

486 """ 

487 latest = VersionService.find_latest_versions(available_versions, current_version) 

488 

489 return ( 

490 latest.latest_major is not None, 

491 latest.latest_minor is not None, 

492 latest.latest_patch is not None, 

493 )