Coverage for src / agent / tools / gitlab.py: 86%
251 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 14:30 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 14:30 +0000
1# Copyright 2025-2026 Microsoft Corporation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
15"""GitLab API tools for MRs, issues, and pipelines."""
17import asyncio
18import logging
19import os
20import re
21import subprocess
22from typing import Annotated, Any
23from urllib.parse import urlparse
25from pydantic import Field
27from agent.config.schema import AgentSettings
28from agent.tools.toolset import AgentToolset
logger = logging.getLogger(__name__)

# Valid GitLab project reference: either "/"-separated segments made of word
# characters, dots and dashes (group/subgroup/project), or a purely numeric ID.
# The alternatives end with \Z rather than $ because $ also matches just before
# a trailing newline, which would let "group/project\n" pass validation.
PROJECT_PATH_PATTERN = re.compile(r"^[\w.-]+(/[\w.-]+)*\Z|^\d+\Z")
class GitLabTools(AgentToolset):
    """GitLab API tools for MRs, issues, and pipelines.

    Each public tool is a coroutine that validates ``project_id``, runs the
    blocking python-gitlab calls in a worker thread, and returns the dict built
    by ``_create_success_response`` / ``_create_error_response``. Those two
    helpers and ``self.config`` are not defined in this class; presumably they
    come from ``AgentToolset`` (verify against the base class). API failures are
    logged and reported as ``gitlab_api_error`` responses instead of being
    raised to the caller.
    """

    # Page size used when a tool is called without a usable limit.
    _DEFAULT_LIMIT = 20
    # Upper bound for ``per_page``; GitLab's REST pagination documents 100 as
    # the maximum page size.
    _MAX_PER_PAGE = 100

    def __init__(self, settings: AgentSettings):
        """Initialize GitLabTools with settings.

        Args:
            settings: Agent settings instance with GitLab configuration
        """
        super().__init__(settings)
        # Both caches start empty and are filled on first successful lookup.
        self._client_cache: Any = None
        self._token_cache: str | None = None

    def get_tools(self) -> list:
        """Get list of GitLab tools.

        Returns:
            List of GitLab tool functions (bound coroutine methods)
        """
        return [
            self.list_merge_requests,
            self.get_merge_request,
            self.create_merge_request,
            self.list_issues,
            self.get_issue,
            self.create_issue,
            self.list_pipelines,
            self.get_pipeline,
            self.get_pipeline_jobs,
        ]

    # ==================== Configuration / Client ====================

    def _get_gitlab_url(self) -> str:
        """Get GitLab URL from config or environment.

        Priority order:
            1. config.gitlab_url
            2. GITLAB_URL environment variable
            3. Default: https://community.opengroup.org

        Returns:
            GitLab instance URL
        """
        configured = getattr(self.config, "gitlab_url", None)
        if configured:
            return configured

        # Empty environment values are treated as unset; the default is for OSDU.
        return os.getenv("GITLAB_URL") or "https://community.opengroup.org"

    def _get_gitlab_token(self) -> str | None:
        """Get GitLab token from config, environment, or glab CLI.

        Priority order:
            1. Cached token (for performance)
            2. config.gitlab_token
            3. GITLAB_TOKEN environment variable
            4. glab CLI token (via glab auth status)

        Only a successfully resolved token is cached, so a failed lookup is
        retried on the next call.

        Returns:
            GitLab personal access token or None
        """
        if self._token_cache is not None:
            return self._token_cache

        # ``or`` short-circuits, so the glab subprocess only runs when neither
        # the config nor the environment provides a token.
        token = (
            getattr(self.config, "gitlab_token", None)
            or os.getenv("GITLAB_TOKEN")
            or self._read_glab_token()
        )
        if not token:
            return None

        self._token_cache = token
        return self._token_cache

    def _read_glab_token(self) -> str | None:
        """Ask the glab CLI for the token stored for the configured GitLab host.

        This is a best-effort lookup: a missing or non-executable ``glab``
        binary, a timeout, a non-zero exit status, or output without a usable
        ``Token:`` line all result in ``None``.

        Returns:
            Token reported by ``glab auth status -t`` or None
        """
        parsed = urlparse(self._get_gitlab_url())
        # A URL written without a scheme ("gitlab.example.com/x") parses with an
        # empty netloc and the host at the start of the path.
        hostname = parsed.netloc or parsed.path.split("/", 1)[0]
        if not hostname:
            return None

        try:
            result = subprocess.run(
                ["glab", "auth", "status", "-t", "--hostname", hostname],
                capture_output=True,
                text=True,
                timeout=5,
            )
        except (subprocess.TimeoutExpired, OSError):
            # OSError covers FileNotFoundError (glab not installed) as well as
            # PermissionError and other launch failures.
            return None

        if result.returncode != 0:
            return None

        # NOTE(review): some glab releases appear to print the auth status on
        # stderr rather than stdout, so both streams are scanned - confirm
        # against the glab version in use.
        for stream in (result.stdout, result.stderr):
            if not isinstance(stream, str):
                continue
            for line in stream.splitlines():
                if "Token:" not in line:
                    continue
                candidate = line.rpartition("Token:")[2].strip()
                # A value made only of asterisks is a masked placeholder.
                if candidate.strip("*"):
                    return candidate

        return None

    def _get_client(self) -> Any:
        """Get GitLab client with lazy initialization.

        The client is created once and cached on the instance.

        Returns:
            Configured gitlab.Gitlab client

        Raises:
            ImportError: If python-gitlab is not installed
            ValueError: If no authentication token is available
        """
        if self._client_cache is not None:
            return self._client_cache

        # Imported here so this module can be loaded without python-gitlab.
        try:
            import gitlab
        except ImportError as e:
            raise ImportError(
                "python-gitlab is required for GitLab tools. "
                "Install with: pip install python-gitlab"
            ) from e

        url = self._get_gitlab_url()
        token = self._get_gitlab_token()

        if not token:
            raise ValueError(
                "GitLab authentication required. Set GITLAB_TOKEN environment variable "
                "or configure via 'osdu-agent config'"
            )

        # 30 is used only when the config has no gitlab_api_timeout attribute.
        timeout = getattr(self.config, "gitlab_api_timeout", 30)

        self._client_cache = gitlab.Gitlab(
            url=url,
            private_token=token,
            timeout=timeout,
        )

        return self._client_cache

    # ==================== Shared Helpers ====================

    def _validate_project_id(self, project_id: str) -> dict | None:
        """Validate GitLab project ID format.

        Args:
            project_id: Project path (group/project) or numeric ID

        Returns:
            Error dict if invalid, None if valid
        """
        if not project_id or not project_id.strip():
            return self._create_error_response(
                error="empty_project_id",
                message="Project ID cannot be empty",
            )

        # Check for path traversal attempts
        if ".." in project_id:
            return self._create_error_response(
                error="invalid_project_id",
                message=f"Invalid project ID format: {project_id}. Path traversal not allowed.",
            )

        # fullmatch (not match) so that a trailing newline, which "$" tolerates,
        # cannot slip through.
        if not PROJECT_PATH_PATTERN.fullmatch(project_id):
            return self._create_error_response(
                error="invalid_project_id",
                message=f"Invalid project ID format: {project_id}. Use group/project path or numeric ID.",
            )

        return None

    async def _run_gitlab_call(self, func: Any, *args: Any, **kwargs: Any) -> Any:
        """Run a synchronous GitLab API call in a thread to avoid blocking.

        Args:
            func: Synchronous function to call
            *args: Positional arguments for the function
            **kwargs: Keyword arguments for the function

        Returns:
            Result from the function call
        """
        return await asyncio.to_thread(func, *args, **kwargs)

    @classmethod
    def _page_size(cls, limit: int | None) -> int:
        """Turn a caller-supplied limit into a usable ``per_page`` value.

        Args:
            limit: Requested maximum number of results, possibly None

        Returns:
            The default page size for None or non-positive limits, otherwise
            the limit capped at the maximum page size
        """
        if limit is None or limit <= 0:
            return cls._DEFAULT_LIMIT
        return min(limit, cls._MAX_PER_PAGE)

    @staticmethod
    def _username(user: Any) -> Any:
        """Return the ``username`` entry of a user mapping, or None if the user is falsy."""
        return user.get("username") if user else None

    @staticmethod
    def _usernames(users: Any) -> list:
        """Return the ``username`` entry of each user mapping; empty list if ``users`` is falsy."""
        return [user.get("username") for user in (users or [])]

    # ==================== Merge Request Tools ====================

    def _list_merge_requests_sync(
        self, project_id: str, state: str | None, limit: int
    ) -> list[dict[str, Any]]:
        """Synchronous helper for listing merge requests.

        Args:
            project_id: Project path or numeric ID
            state: State filter; None, empty, or "all" sends no state filter
            limit: Page size passed to the API as ``per_page``

        Returns:
            One summary dict per merge request returned by the API
        """
        gl = self._get_client()
        project = gl.projects.get(project_id)

        params: dict[str, Any] = {"per_page": limit}
        if state and state != "all":
            params["state"] = state

        return [
            {
                "iid": mr.iid,
                "title": mr.title,
                "author": self._username(mr.author),
                "state": mr.state,
                "source_branch": mr.source_branch,
                "target_branch": mr.target_branch,
                "web_url": mr.web_url,
            }
            for mr in project.mergerequests.list(**params)
        ]

    async def list_merge_requests(
        self,
        project_id: Annotated[str, Field(description="GitLab project path (group/project) or ID")],
        state: Annotated[
            str | None, Field(description="Filter: opened, merged, closed, all")
        ] = "opened",
        limit: Annotated[int | None, Field(description="Max results (default 20)")] = 20,
    ) -> dict:
        """List GitLab merge requests. Filter by state (opened/merged/closed/all). Returns IIDs, titles, authors."""
        error = self._validate_project_id(project_id)
        if error:
            return error

        try:
            result_list = await self._run_gitlab_call(
                self._list_merge_requests_sync, project_id, state, self._page_size(limit)
            )

            return self._create_success_response(
                result={"merge_requests": result_list, "count": len(result_list)},
                message=f"Found {len(result_list)} merge requests",
            )

        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            logger.error("Error listing merge requests: %s", e)
            return self._create_error_response(
                error="gitlab_api_error",
                message=f"Failed to list merge requests: {str(e)}",
            )

    def _get_merge_request_sync(self, project_id: str, mr_iid: int) -> dict[str, Any]:
        """Synchronous helper for getting merge request details.

        Args:
            project_id: Project path or numeric ID
            mr_iid: Merge request IID within the project

        Returns:
            Detail dict for the merge request. ``discussion_count`` is the
            number of discussions in a single ``per_page=100`` listing, so it
            may undercount when more discussions exist than that call returns.
        """
        gl = self._get_client()
        project = gl.projects.get(project_id)
        mr = project.mergerequests.get(mr_iid)

        discussion_count = len(mr.discussions.list(per_page=100))

        return {
            "iid": mr.iid,
            "title": mr.title,
            "description": mr.description,
            "author": self._username(mr.author),
            "state": mr.state,
            "source_branch": mr.source_branch,
            "target_branch": mr.target_branch,
            "labels": mr.labels,
            "reviewers": self._usernames(mr.reviewers),
            "assignees": self._usernames(mr.assignees),
            "pipeline_status": mr.head_pipeline.get("status") if mr.head_pipeline else None,
            "merge_status": mr.merge_status,
            "draft": mr.draft,
            "discussion_count": discussion_count,
            "web_url": mr.web_url,
            "created_at": mr.created_at,
            "updated_at": mr.updated_at,
        }

    async def get_merge_request(
        self,
        project_id: Annotated[str, Field(description="GitLab project path or ID")],
        mr_iid: Annotated[int, Field(description="Merge request IID (not ID)")],
    ) -> dict:
        """Get merge request details including discussions. Returns description, labels, reviewers, CI status."""
        error = self._validate_project_id(project_id)
        if error:
            return error

        try:
            result = await self._run_gitlab_call(self._get_merge_request_sync, project_id, mr_iid)

            return self._create_success_response(
                result=result,
                message=f"Retrieved MR !{mr_iid}: {result['title']}",
            )

        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            logger.error("Error getting merge request: %s", e)
            return self._create_error_response(
                error="gitlab_api_error",
                message=f"Failed to get merge request: {str(e)}",
            )

    def _create_merge_request_sync(
        self,
        project_id: str,
        source_branch: str,
        target_branch: str,
        title: str,
        description: str | None,
        labels: list[str] | None,
    ) -> dict[str, Any]:
        """Synchronous helper for creating merge request.

        Args:
            project_id: Project path or numeric ID
            source_branch: Branch to merge from
            target_branch: Branch to merge into
            title: Merge request title
            description: Optional description; omitted from the request when falsy
            labels: Optional labels; omitted from the request when falsy

        Returns:
            Summary dict of the created merge request
        """
        gl = self._get_client()
        project = gl.projects.get(project_id)

        mr_data: dict[str, Any] = {
            "source_branch": source_branch,
            "target_branch": target_branch,
            "title": title,
        }
        if description:
            mr_data["description"] = description
        if labels:
            mr_data["labels"] = labels

        mr = project.mergerequests.create(mr_data)

        return {
            "iid": mr.iid,
            "title": mr.title,
            "source_branch": mr.source_branch,
            "target_branch": mr.target_branch,
            "web_url": mr.web_url,
            "state": mr.state,
        }

    async def create_merge_request(
        self,
        project_id: Annotated[str, Field(description="GitLab project path or ID")],
        source_branch: Annotated[str, Field(description="Source branch name")],
        target_branch: Annotated[str, Field(description="Target branch name")],
        title: Annotated[str, Field(description="MR title")],
        description: Annotated[str | None, Field(description="MR description")] = None,
        labels: Annotated[list[str] | None, Field(description="Labels to apply")] = None,
    ) -> dict:
        """Create merge request from source to target branch. Returns IID and web URL for review."""
        error = self._validate_project_id(project_id)
        if error:
            return error

        if not (source_branch and target_branch and title):
            return self._create_error_response(
                error="missing_required_fields",
                message="source_branch, target_branch, and title are required",
            )

        try:
            result = await self._run_gitlab_call(
                self._create_merge_request_sync,
                project_id,
                source_branch,
                target_branch,
                title,
                description,
                labels,
            )

            return self._create_success_response(
                result=result,
                message=f"Created MR !{result['iid']}: {result['title']}",
            )

        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            logger.error("Error creating merge request: %s", e)
            return self._create_error_response(
                error="gitlab_api_error",
                message=f"Failed to create merge request: {str(e)}",
            )

    # ==================== Issue Tools ====================

    def _list_issues_sync(
        self, project_id: str, state: str | None, labels: list[str] | None, limit: int
    ) -> list[dict[str, Any]]:
        """Synchronous helper for listing issues.

        Args:
            project_id: Project path or numeric ID
            state: State filter; None, empty, or "all" sends no state filter
            labels: Optional label filter, sent as one comma-joined string
            limit: Page size passed to the API as ``per_page``

        Returns:
            One summary dict per issue returned by the API
        """
        gl = self._get_client()
        project = gl.projects.get(project_id)

        params: dict[str, Any] = {"per_page": limit}
        if state and state != "all":
            params["state"] = state
        if labels:
            params["labels"] = ",".join(labels)

        return [
            {
                "iid": issue.iid,
                "title": issue.title,
                "author": self._username(issue.author),
                "assignees": self._usernames(issue.assignees),
                "state": issue.state,
                "labels": issue.labels,
                "web_url": issue.web_url,
            }
            for issue in project.issues.list(**params)
        ]

    async def list_issues(
        self,
        project_id: Annotated[str, Field(description="GitLab project path or ID")],
        state: Annotated[str | None, Field(description="Filter: opened, closed, all")] = "opened",
        labels: Annotated[list[str] | None, Field(description="Filter by labels")] = None,
        limit: Annotated[int | None, Field(description="Max results (default 20)")] = 20,
    ) -> dict:
        """List GitLab issues. Filter by state and labels. Returns IIDs, titles, assignees."""
        error = self._validate_project_id(project_id)
        if error:
            return error

        try:
            result_list = await self._run_gitlab_call(
                self._list_issues_sync, project_id, state, labels, self._page_size(limit)
            )

            return self._create_success_response(
                result={"issues": result_list, "count": len(result_list)},
                message=f"Found {len(result_list)} issues",
            )

        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            logger.error("Error listing issues: %s", e)
            return self._create_error_response(
                error="gitlab_api_error",
                message=f"Failed to list issues: {str(e)}",
            )

    def _get_issue_sync(self, project_id: str, issue_iid: int) -> dict[str, Any]:
        """Synchronous helper for getting issue details.

        Args:
            project_id: Project path or numeric ID
            issue_iid: Issue IID within the project

        Returns:
            Detail dict for the issue
        """
        gl = self._get_client()
        project = gl.projects.get(project_id)
        issue = project.issues.get(issue_iid)

        return {
            "iid": issue.iid,
            "title": issue.title,
            "description": issue.description,
            "author": self._username(issue.author),
            "assignees": self._usernames(issue.assignees),
            "state": issue.state,
            "labels": issue.labels,
            "milestone": issue.milestone.get("title") if issue.milestone else None,
            "due_date": issue.due_date,
            "web_url": issue.web_url,
            "created_at": issue.created_at,
            "updated_at": issue.updated_at,
        }

    async def get_issue(
        self,
        project_id: Annotated[str, Field(description="GitLab project path or ID")],
        issue_iid: Annotated[int, Field(description="Issue IID (not ID)")],
    ) -> dict:
        """Get issue details. Returns description, labels, assignees, milestone."""
        error = self._validate_project_id(project_id)
        if error:
            return error

        try:
            result = await self._run_gitlab_call(self._get_issue_sync, project_id, issue_iid)

            return self._create_success_response(
                result=result,
                message=f"Retrieved issue #{issue_iid}: {result['title']}",
            )

        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            logger.error("Error getting issue: %s", e)
            return self._create_error_response(
                error="gitlab_api_error",
                message=f"Failed to get issue: {str(e)}",
            )

    def _create_issue_sync(
        self,
        project_id: str,
        title: str,
        description: str | None,
        labels: list[str] | None,
        assignee_ids: list[int] | None,
    ) -> dict[str, Any]:
        """Synchronous helper for creating issue.

        Args:
            project_id: Project path or numeric ID
            title: Issue title
            description: Optional description; omitted from the request when falsy
            labels: Optional labels; omitted from the request when falsy
            assignee_ids: Optional user IDs; omitted from the request when falsy

        Returns:
            Summary dict of the created issue
        """
        gl = self._get_client()
        project = gl.projects.get(project_id)

        issue_data: dict[str, Any] = {"title": title}
        if description:
            issue_data["description"] = description
        if labels:
            issue_data["labels"] = labels
        if assignee_ids:
            issue_data["assignee_ids"] = assignee_ids

        issue = project.issues.create(issue_data)

        return {
            "iid": issue.iid,
            "title": issue.title,
            "web_url": issue.web_url,
            "state": issue.state,
        }

    async def create_issue(
        self,
        project_id: Annotated[str, Field(description="GitLab project path or ID")],
        title: Annotated[str, Field(description="Issue title")],
        description: Annotated[str | None, Field(description="Issue description")] = None,
        labels: Annotated[list[str] | None, Field(description="Labels to apply")] = None,
        assignee_ids: Annotated[list[int] | None, Field(description="User IDs to assign")] = None,
    ) -> dict:
        """Create new issue in project. Returns IID and web URL."""
        error = self._validate_project_id(project_id)
        if error:
            return error

        if not title:
            return self._create_error_response(
                error="missing_required_fields",
                message="title is required",
            )

        try:
            result = await self._run_gitlab_call(
                self._create_issue_sync, project_id, title, description, labels, assignee_ids
            )

            return self._create_success_response(
                result=result,
                message=f"Created issue #{result['iid']}: {result['title']}",
            )

        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            logger.error("Error creating issue: %s", e)
            return self._create_error_response(
                error="gitlab_api_error",
                message=f"Failed to create issue: {str(e)}",
            )

    # ==================== Pipeline Tools ====================

    def _list_pipelines_sync(
        self, project_id: str, ref: str | None, status: str | None, limit: int
    ) -> list[dict[str, Any]]:
        """Synchronous helper for listing pipelines.

        Args:
            project_id: Project path or numeric ID
            ref: Optional branch/tag filter; omitted when falsy
            status: Optional status filter; omitted when falsy
            limit: Page size passed to the API as ``per_page``

        Returns:
            One summary dict per pipeline; ``sha`` is shortened to 8 characters
        """
        gl = self._get_client()
        project = gl.projects.get(project_id)

        params: dict[str, Any] = {"per_page": limit}
        if ref:
            params["ref"] = ref
        if status:
            params["status"] = status

        return [
            {
                "id": pipeline.id,
                "ref": pipeline.ref,
                "status": pipeline.status,
                "source": pipeline.source,
                "sha": pipeline.sha[:8] if pipeline.sha else None,
                "web_url": pipeline.web_url,
                "created_at": pipeline.created_at,
            }
            for pipeline in project.pipelines.list(**params)
        ]

    async def list_pipelines(
        self,
        project_id: Annotated[str, Field(description="GitLab project path or ID")],
        ref: Annotated[str | None, Field(description="Filter by branch/tag")] = None,
        status: Annotated[
            str | None, Field(description="Filter: running, pending, success, failed, canceled")
        ] = None,
        limit: Annotated[int | None, Field(description="Max results (default 20)")] = 20,
    ) -> dict:
        """List recent pipelines for project. Filter by ref/status. Returns IDs, statuses, timestamps."""
        error = self._validate_project_id(project_id)
        if error:
            return error

        try:
            result_list = await self._run_gitlab_call(
                self._list_pipelines_sync, project_id, ref, status, self._page_size(limit)
            )

            return self._create_success_response(
                result={"pipelines": result_list, "count": len(result_list)},
                message=f"Found {len(result_list)} pipelines",
            )

        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            logger.error("Error listing pipelines: %s", e)
            return self._create_error_response(
                error="gitlab_api_error",
                message=f"Failed to list pipelines: {str(e)}",
            )

    def _get_pipeline_sync(self, project_id: str, pipeline_id: int) -> dict[str, Any]:
        """Synchronous helper for getting pipeline details.

        Args:
            project_id: Project path or numeric ID
            pipeline_id: Pipeline ID

        Returns:
            Detail dict for the pipeline. ``stages`` maps each stage name to the
            jobs of that stage, taken from a single ``per_page=100`` job listing.
        """
        gl = self._get_client()
        project = gl.projects.get(project_id)
        pipeline = project.pipelines.get(pipeline_id)

        # Group jobs by stage, keeping the order in which the API returned them.
        stages: dict[str, list[dict[str, Any]]] = {}
        for job in pipeline.jobs.list(per_page=100):
            stages.setdefault(job.stage, []).append(
                {
                    "id": job.id,
                    "name": job.name,
                    "status": job.status,
                    "duration": job.duration,
                }
            )

        return {
            "id": pipeline.id,
            "ref": pipeline.ref,
            "status": pipeline.status,
            "source": pipeline.source,
            "sha": pipeline.sha,
            "duration": pipeline.duration,
            "stages": stages,
            "web_url": pipeline.web_url,
            "created_at": pipeline.created_at,
            "updated_at": pipeline.updated_at,
            "finished_at": pipeline.finished_at,
        }

    async def get_pipeline(
        self,
        project_id: Annotated[str, Field(description="GitLab project path or ID")],
        pipeline_id: Annotated[int, Field(description="Pipeline ID")],
    ) -> dict:
        """Get pipeline details with job breakdown. Returns stages, job statuses, duration."""
        error = self._validate_project_id(project_id)
        if error:
            return error

        try:
            result = await self._run_gitlab_call(self._get_pipeline_sync, project_id, pipeline_id)

            return self._create_success_response(
                result=result,
                message=f"Retrieved pipeline {pipeline_id}: {result['status']}",
            )

        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            logger.error("Error getting pipeline: %s", e)
            return self._create_error_response(
                error="gitlab_api_error",
                message=f"Failed to get pipeline: {str(e)}",
            )

    def _get_pipeline_jobs_sync(
        self, project_id: str, pipeline_id: int, scope: str | None
    ) -> list[dict[str, Any]]:
        """Synchronous helper for getting pipeline jobs.

        Args:
            project_id: Project path or numeric ID
            pipeline_id: Pipeline ID
            scope: Optional job scope filter; omitted when falsy

        Returns:
            One dict per job from a single ``per_page=100`` listing
        """
        gl = self._get_client()
        project = gl.projects.get(project_id)
        pipeline = project.pipelines.get(pipeline_id)

        params: dict[str, Any] = {"per_page": 100}
        if scope:
            params["scope"] = scope

        return [
            {
                "id": job.id,
                "name": job.name,
                "stage": job.stage,
                "status": job.status,
                "duration": job.duration,
                "web_url": job.web_url,
                "started_at": job.started_at,
                "finished_at": job.finished_at,
            }
            for job in pipeline.jobs.list(**params)
        ]

    async def get_pipeline_jobs(
        self,
        project_id: Annotated[str, Field(description="GitLab project path or ID")],
        pipeline_id: Annotated[int, Field(description="Pipeline ID")],
        scope: Annotated[
            str | None, Field(description="Filter: created, pending, running, failed, success")
        ] = None,
    ) -> dict:
        """Get jobs for specific pipeline. Filter by scope. Returns job names, statuses, durations."""
        error = self._validate_project_id(project_id)
        if error:
            return error

        try:
            result_list = await self._run_gitlab_call(
                self._get_pipeline_jobs_sync, project_id, pipeline_id, scope
            )

            return self._create_success_response(
                result={"jobs": result_list, "count": len(result_list)},
                message=f"Found {len(result_list)} jobs in pipeline {pipeline_id}",
            )

        except Exception as e:
            # Tool boundary: report the failure to the caller instead of raising.
            logger.error("Error getting pipeline jobs: %s", e)
            return self._create_error_response(
                error="gitlab_api_error",
                message=f"Failed to get pipeline jobs: {str(e)}",
            )