"""Collecte et agregation des donnees repos Gitea.""" from __future__ import annotations from dataclasses import dataclass from gitea_dashboard.client import GiteaClient @dataclass class RepoData: """Donnees agregees d'un repo.""" name: str full_name: str description: str open_issues: int # open_issues_count - open_pr_counter is_fork: bool is_archived: bool is_mirror: bool latest_release: dict | None # {tag_name, published_at} ou None milestones: list[dict] # [{title, open_issues, closed_issues, due_on}] last_commit_date: str | None # ISO 8601, ex: "2026-03-10T14:30:00Z" @dataclass class MilestoneData: """Donnees agregees d'une milestone avec son repo parent.""" repo_name: str title: str open_issues: int closed_issues: int progress_pct: int # Pourcentage de completion (0-100) due_on: str | None # ISO 8601 ou None state: str # "open" ou "closed" def _matches_any(name: str, patterns: list[str]) -> bool: """Return True if name contains any of the patterns (case-insensitive).""" name_lower = name.lower() return any(p.lower() in name_lower for p in patterns) def collect_all( client: GiteaClient, include: list[str] | None = None, exclude: list[str] | None = None, ) -> list[RepoData]: """Collecte les donnees des repos, avec filtrage optionnel. Args: client: Client API Gitea. include: Si fourni, ne garde que les repos dont le nom contient au moins une des sous-chaines (insensible a la casse). exclude: Si fourni, exclut les repos dont le nom contient au moins une des sous-chaines (insensible a la casse). Ordre d'application : include d'abord (si present), puis exclude. Si include est None ou vide, tous les repos sont inclus avant l'etape exclude. """ repos = client.get_repos() # Filtrage post-fetch : l'API Gitea ne supporte pas le filtre par nom repos = _filter_repos(repos, include, exclude) result: list[RepoData] = [] for repo in repos: owner = repo["owner"]["login"] name = repo["name"] commit = client.get_latest_commit(owner, name) last_commit_date = commit["created"] if commit else None result.append( RepoData( name=name, full_name=repo["full_name"], description=repo.get("description", "") or "", open_issues=repo["open_issues_count"] - repo["open_pr_counter"], is_fork=repo["fork"], is_archived=repo["archived"], is_mirror=repo["mirror"], latest_release=client.get_latest_release(owner, name), milestones=client.get_milestones(owner, name), last_commit_date=last_commit_date, ) ) return result def _filter_repos( repos: list[dict], include: list[str] | None = None, exclude: list[str] | None = None, ) -> list[dict]: """Filtre les repos par include/exclude (logique partagee).""" if include: repos = [r for r in repos if _matches_any(r["name"], include)] if exclude: repos = [r for r in repos if not _matches_any(r["name"], exclude)] return repos def collect_milestones( client: GiteaClient, include: list[str] | None = None, exclude: list[str] | None = None, ) -> list[MilestoneData]: """Collecte les milestones de tous les repos accessibles. Reutilise la logique de filtrage de collect_all (include/exclude). Pour chaque repo filtre, appelle client.get_milestones() avec state=all. Retourne une liste plate de MilestoneData triee par repo puis milestone. """ repos = client.get_repos() repos = _filter_repos(repos, include, exclude) result: list[MilestoneData] = [] for repo in repos: owner = repo["owner"]["login"] name = repo["name"] milestones = client.get_milestones(owner, name, state="all") for ms in milestones: total = ms["open_issues"] + ms["closed_issues"] pct = round(ms["closed_issues"] / total * 100) if total > 0 else 0 result.append( MilestoneData( repo_name=name, title=ms["title"], open_issues=ms["open_issues"], closed_issues=ms["closed_issues"], progress_pct=pct, due_on=ms.get("due_on"), state=ms.get("state", "open"), ) ) return result