From f605b94ae3b1182565997724f25801a8051ce580 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Mon, 18 May 2026 11:02:57 +0200 Subject: [PATCH 1/7] chore: git integration with full clones MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- scripts/services/git-integration.yaml | 9 +- .../src/crowdgit/database/crud.py | 29 +- .../src/crowdgit/models/clone_batch.py | 11 - .../crowdgit/services/clone/clone_service.py | 340 +++++--------- .../services/commit/commit_service.py | 215 +++------ .../src/crowdgit/services/utils.py | 10 +- .../git_integration/src/crowdgit/settings.py | 14 +- .../src/crowdgit/worker/repository_worker.py | 89 ++-- .../src/test/outputs/test-repo_actual.json | 403 +++++++++-------- .../src/test/outputs/test-repo_expected.json | 417 +++++++++--------- .../src/test/test_activity_extraction.py | 31 +- 11 files changed, 718 insertions(+), 850 deletions(-) diff --git a/scripts/services/git-integration.yaml b/scripts/services/git-integration.yaml index b04389ad44..149ff358a0 100644 --- a/scripts/services/git-integration.yaml +++ b/scripts/services/git-integration.yaml @@ -4,13 +4,14 @@ x-env-args: &env-args NODE_ENV: docker SERVICE: git-integration SHELL: /bin/sh + REPO_STORAGE_ROOT: /var/lib/crowdgit services: git-integration: build: context: ../../ dockerfile: ./scripts/services/docker/Dockerfile.git_integration - working_dir: /usr/crowd/app/services/apps/git_integration + working_dir: /usr/crowd/app env_file: - ../../backend/.env.dist.local - ../../backend/.env.dist.composed @@ -23,6 +24,8 @@ services: - crowd-bridge ports: - '8085:8085' + volumes: + - git-integration-repos:/var/lib/crowdgit git-integration-dev: build: @@ -44,7 +47,11 @@ services: - crowd-bridge volumes: - ../../services/apps/git_integration/src:/usr/crowd/app/services/apps/git_integration/src + - git-integration-repos:/var/lib/crowdgit networks: crowd-bridge: external: true + +volumes: + git-integration-repos: diff --git a/services/apps/git_integration/src/crowdgit/database/crud.py b/services/apps/git_integration/src/crowdgit/database/crud.py index 41c665777a..4c35f4039a 100644 --- a/services/apps/git_integration/src/crowdgit/database/crud.py +++ b/services/apps/git_integration/src/crowdgit/database/crud.py @@ -8,9 +8,11 @@ from crowdgit.models.repository import Repository from crowdgit.models.service_execution import ServiceExecution from crowdgit.settings import ( + FAILED_RETRY_INTERVAL_HOURS, MAX_CONCURRENT_ONBOARDINGS, MAX_INTEGRATION_RESULTS, REPOSITORY_UPDATE_INTERVAL_HOURS, + STUCK_REPO_TIMEOUT_HOURS, ) from .connection import get_db_connection @@ -74,7 +76,7 @@ async def acquire_onboarding_repo() -> Repository | None: JOIN git."repositoryProcessing" rp ON rp."repositoryId" = r.id CROSS JOIN current_onboarding_count c WHERE rp.state = $2 - AND rp."lockedAt" IS NULL + AND (rp."lockedAt" IS NULL OR rp."lockedAt" < NOW() - INTERVAL '1 hour' * $4::numeric) AND r."deletedAt" IS NULL AND c.count < $3 ORDER BY rp.priority ASC, rp."createdAt" ASC @@ -93,7 +95,7 @@ async def acquire_onboarding_repo() -> Repository | None: """ return await acquire_repository( onboarding_repo_sql_query, - (RepositoryState.PROCESSING, RepositoryState.PENDING, MAX_CONCURRENT_ONBOARDINGS), + (RepositoryState.PROCESSING, RepositoryState.PENDING, MAX_CONCURRENT_ONBOARDINGS, STUCK_REPO_TIMEOUT_HOURS), ) @@ -141,9 +143,11 @@ async def acquire_recurrent_repo() -> Repository | None: FROM public.repositories r JOIN git."repositoryProcessing" rp ON rp."repositoryId" = r.id WHERE NOT (rp.state = ANY($2)) - AND rp."lockedAt" IS NULL + AND (rp."lockedAt" IS NULL OR rp."lockedAt" < NOW() - INTERVAL '1 hour' * $4::numeric) AND r."deletedAt" IS NULL - AND rp."lastProcessedAt" < NOW() - INTERVAL '1 hour' * $3 + AND rp."lastProcessedAt" < NOW() - INTERVAL '1 hour' * ( + CASE WHEN rp.state = 'failed' THEN $5::numeric ELSE $3::numeric END + ) AND NOT ( r.url LIKE '%gerrit.automotivelinux.org%' AND EXISTS (SELECT 1 FROM automotivelinux_processing) @@ -170,7 +174,7 @@ async def acquire_recurrent_repo() -> Repository | None: ) return await acquire_repository( recurrent_repo_sql_query, - (RepositoryState.PROCESSING, states_to_exclude, REPOSITORY_UPDATE_INTERVAL_HOURS), + (RepositoryState.PROCESSING, states_to_exclude, REPOSITORY_UPDATE_INTERVAL_HOURS, STUCK_REPO_TIMEOUT_HOURS, FAILED_RETRY_INTERVAL_HOURS), ) @@ -197,7 +201,7 @@ async def acquire_pending_reonboard_repo() -> Repository | None: FROM public.repositories r JOIN git."repositoryProcessing" rp ON rp."repositoryId" = r.id WHERE rp.state = $1 - AND rp."lockedAt" IS NULL + AND (rp."lockedAt" IS NULL OR rp."lockedAt" < NOW() - INTERVAL '1 hour' * $3::numeric) AND r."deletedAt" IS NULL ORDER BY rp.priority ASC, rp."lastProcessedAt" ASC LIMIT 1 @@ -218,7 +222,7 @@ async def acquire_pending_reonboard_repo() -> Repository | None: """ return await acquire_repository( pending_reonboard_sql_query, - (RepositoryState.PENDING_REONBOARD, RepositoryState.PROCESSING), + (RepositoryState.PENDING_REONBOARD, RepositoryState.PROCESSING, STUCK_REPO_TIMEOUT_HOURS), ) @@ -268,6 +272,17 @@ async def release_repo(repo_id: str): return str(result) +async def update_lock_heartbeat(repo_id: str): + """Refresh lockedAt timestamp for an actively-processing repo to prevent stale-lock reclaim.""" + sql_query = """ + UPDATE git."repositoryProcessing" + SET "lockedAt" = NOW(), + "updatedAt" = NOW() + WHERE "repositoryId" = $1 + """ + await execute(sql_query, (repo_id,)) + + async def update_last_processed_commit(repo_id: str, commit_hash: str, branch: str | None = None): """ Update last processed commit and optionally the branch after processing diff --git a/services/apps/git_integration/src/crowdgit/models/clone_batch.py b/services/apps/git_integration/src/crowdgit/models/clone_batch.py index 6e98d04f99..3a3e337a01 100644 --- a/services/apps/git_integration/src/crowdgit/models/clone_batch.py +++ b/services/apps/git_integration/src/crowdgit/models/clone_batch.py @@ -11,17 +11,6 @@ class CloneBatchInfo(BaseModel): latest_commit_in_repo: str | None = Field( None, description="Hash of the latest commit in repo" ) - edge_commit: str | None = Field( - default=None, - description="The oldest commit in the current batch, used to track progress during incremental processing.", - ) - prev_batch_edge_commit: str | None = Field( - default=None, - description="The edge commit from the previous batch, used to track progress during incremental processing.", - ) - clone_with_batches: bool = Field( - default=True, description="Whether repo is cloned with batches" - ) class Config: """Pydantic configuration""" diff --git a/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py b/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py index b563316e73..52ec411de4 100644 --- a/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py +++ b/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py @@ -5,7 +5,6 @@ from collections.abc import AsyncIterator from decimal import Decimal -import aiofiles from tenacity import ( retry, retry_if_exception_type, @@ -18,7 +17,9 @@ from crowdgit.enums import ErrorCode, ExecutionStatus, OperationType from crowdgit.errors import ( CommandExecutionError, + CommandTimeoutError, CrowdGitError, + DiskSpaceError, NetworkError, RateLimitError, RemoteServerError, @@ -31,6 +32,7 @@ get_repo_name, run_shell_command, ) +from crowdgit.settings import REPO_STORAGE_ROOT DEFAULT_STORAGE_OPTIMIZATION_THRESHOLD_MB = 10000 @@ -57,43 +59,6 @@ def __init__(self): def _is_gerrit_remote(remote: str) -> bool: return any(pattern in remote for pattern in GERRIT_PATTERNS) - async def _check_if_final_batch(self, path: str, target_commit_hash: str | None) -> bool: - """ - Final batch is determined if: - - full history is cloned (no longer shallow_clone) - - target commit reached - """ - is_shallow_clone = await run_shell_command( - ["git", "rev-parse", "--is-shallow-repository"], cwd=path - ) - if "false" in is_shallow_clone: - return True - if not target_commit_hash: - return False - try: - await run_shell_command( - ["git", "rev-parse", "--verify", f"{target_commit_hash}^{{commit}}"], cwd=path - ) - self.logger.info(f"Target commit {target_commit_hash} reached") - return True - except CommandExecutionError: - return False - - @retry_on_clone_error - async def _perform_minimal_clone(self, path: str, remote: str) -> None: - """ - Perform minimal clone of depth=1 - """ - # increasing post buffer to avoid RPC failed error - await run_shell_command( - ["git", "config", "--global", "http.postBuffer", "524288000"], cwd=path - ) - self.logger.info("Initializing minimal clone") - await run_shell_command( - ["git", "clone", "--depth=1", "--no-tags", "--single-branch", remote, "."], cwd=path - ) - self.logger.info("Minimal clone initialized successfully") - async def _get_repo_size_mb(self, repo_path: str) -> float: try: result = await run_shell_command(["du", "-sm", repo_path], cwd=repo_path) @@ -144,73 +109,16 @@ async def _optimize_repository_storage( self.logger.error(f"Failed to perform git gc: {repr(e)}") # Don't raise - gc failure shouldn't stop processing - @retry_on_clone_error - async def _clone_next_batch(self, repo_path: str, batch_depth: int, remote: str): - default_branch = await get_default_branch(repo_path) - self.logger.info( - f"Fetching an additional {batch_depth} commits from {default_branch} branch" - ) - try: - await run_shell_command( - ["git", "fetch", "origin", default_branch, f"--deepen={batch_depth}"], - cwd=repo_path, - ) - except RemoteServerError: - if self._is_gerrit_remote(remote): - self.logger.warning( - "Gerrit server error on --deepen, falling back to unshallow fetch" - ) - await run_shell_command(["git", "fetch", "--unshallow"], cwd=repo_path) - else: - raise - # Optimize repository storage using git garbage collection - await self._optimize_repository_storage(repo_path) - - async def _update_batch_info( - self, - batch_info: CloneBatchInfo, - repo_path: str, - target_commit_hash: str | None, - clone_with_batches: bool, - ) -> None: - """Update batch info with repo path and final batch status. - - For full clones (clone_with_batches=False): Marks as final batch immediately. - For batched clones (clone_with_batches=True): Checks if target commit reached or full history fetched. - """ + async def _update_batch_info(self, batch_info: CloneBatchInfo, repo_path: str) -> None: batch_info.repo_path = repo_path - batch_info.clone_with_batches = clone_with_batches + batch_info.is_final_batch = True if batch_info.is_first_batch: - # Set latest commit only from first batch latest_commit_output = await run_shell_command( ["git", "rev-parse", "HEAD"], cwd=repo_path, ) batch_info.latest_commit_in_repo = latest_commit_output.strip() - if not clone_with_batches: - # Full clone: always final batch since entire repository history is available - batch_info.is_final_batch = True - return - - batch_info.is_final_batch = await self._check_if_final_batch(repo_path, target_commit_hash) - batch_info.edge_commit = await self._get_edge_commit(repo_path) - - async def _get_edge_commit(self, repo_path: str): - """ - Returns the edge commit of a shallow clone by reading the .git/shallow file, - which contains the boundary commit(s) when history is truncated. - - If the full history has been cloned, the .git/shallow file does not exist. - """ - shallow_file = os.path.join(repo_path, ".git", "shallow") - try: - async with aiofiles.open(shallow_file, "r", encoding="utf-8") as f: - oldest_commit = (await f.readline()).strip() - self.logger.info(f"Edge commit: {oldest_commit}") - return oldest_commit - except FileNotFoundError: - return None async def _cleanup_temp_directory(self, temp_repo_path: str, repo_id: str) -> None: """ @@ -253,61 +161,6 @@ async def _cleanup_temp_directory_with_retries(self, temp_repo_path: str) -> Non self.logger.info(f"cleaning temp dir {temp_repo_path}") shutil.rmtree(temp_repo_path) - @retry( - stop=stop_after_attempt(3), - wait=wait_fixed(2), - reraise=False, - ) - async def _cleanup_working_directory(self, repo_path: str) -> None: - """ - Remove all files and directories from the repository except the .git directory. - This helps reduce disk usage while preserving git history for commit processing. - """ - self.logger.info(f"Cleaning working directory: {repo_path}") - - # Use find command to remove everything except .git directory - await run_shell_command( - [ - "find", - ".", - "-mindepth", - "1", - "-maxdepth", - "1", - "!", - "-name", - ".git", - "-exec", - "rm", - "-rf", - "{}", - "+", - ], - cwd=repo_path, - ) - - self.logger.info("Working directory cleanup completed") - - async def _calculate_batch_depth(self, repo_path: str, remote: str) -> int: - calculated_depth = None - total_branches_tags = await run_shell_command( - ["git", "ls-remote", "--heads", "--tags", remote], cwd=repo_path - ) - total_branches_tags = len(total_branches_tags.splitlines()) - if total_branches_tags <= 200: - # Small repo, get a decent amount of history - calculated_depth = 100 - elif total_branches_tags <= 1000: - # Medium repo, get a moderate amount of history - calculated_depth = 50 - else: - # Large repo, get less history - calculated_depth = 5 - self.logger.info( - f"total_branches_tags={total_branches_tags}, calculated_depth={calculated_depth}" - ) - return calculated_depth - @retry_on_clone_error async def _perform_full_clone(self, repo_path: str, remote: str): """Perform full repository clone""" @@ -317,6 +170,60 @@ async def _perform_full_clone(self, repo_path: str, remote: str): ) self.logger.info(f"Successfully completed full clone of repository: {remote}") + def _stable_repo_path(self, repo_id: str) -> str | None: + """Returns a stable on-disk path for the repo when REPO_STORAGE_ROOT is set, else None.""" + if REPO_STORAGE_ROOT is None: + return None + return os.path.join(REPO_STORAGE_ROOT, str(repo_id)) + + async def _is_repo_valid(self, repo_path: str) -> bool: + """Return True if the local clone is usable: HEAD resolves and a remote tracking branch exists.""" + try: + await run_shell_command(["git", "rev-parse", "HEAD"], cwd=repo_path) + branch = await get_default_branch(repo_path) + return branch != "*" + except Exception: + return False + + @retry_on_clone_error + async def _incremental_fetch(self, repo_path: str, remote: str) -> None: + """Fetch latest commits into a persistent clone and update the working tree.""" + # Remove stale git lockfiles left by a previous crash before touching the repo. + git_dir = os.path.join(repo_path, ".git") + for dirpath, _, filenames in os.walk(git_dir): + for filename in filenames: + if filename.endswith(".lock"): + lockpath = os.path.join(dirpath, filename) + try: + os.remove(lockpath) + self.logger.warning(f"Removed stale lockfile: {lockpath}") + except OSError as e: + self.logger.warning(f"Failed to remove stale lockfile {lockpath}: {e}") + + default_branch = await get_default_branch(repo_path) + if default_branch == "*": + raise CommandExecutionError( + f"Cannot fetch {remote}: no remote tracking branch found (detached HEAD)", + returncode=1, + ) + self.logger.info(f"Fetching {default_branch} from {remote}") + await run_shell_command( + ["git", "fetch", "--no-tags", "origin", default_branch], cwd=repo_path + ) + # Update working tree so file-scanning services (licensee, rg, vuln scanner) see + # current file content, not the state from the previous run. + await run_shell_command( + ["git", "reset", "--hard", f"origin/{default_branch}"], cwd=repo_path + ) + await self._optimize_repository_storage(repo_path) + + async def _wipe_and_reclone(self, repo_path: str, remote: str) -> None: + """Wipe an existing persistent clone and replace it with a fresh full clone.""" + self.logger.info(f"Wiping and re-cloning {remote}") + shutil.rmtree(repo_path) + os.makedirs(repo_path) + await self._perform_full_clone(repo_path, remote) + async def has_default_branch_changed(self, remote: str, saved_branch: str | None) -> bool: """Check if the default branch has changed compared to the saved branch Args: @@ -350,57 +257,23 @@ async def has_default_branch_changed(self, remote: str, saved_branch: str | None # On error, assume no change to avoid unnecessary re-cloning return False - async def determine_clone_strategy( - self, repo_path: str, remote: str, branch: str | None, last_processed_commit: str | None - ) -> bool: - """Determine whether to use full clone or minimal clone strategy based on repository state. - - Args: - repo_path: Local path where repository will be cloned - remote: Remote repository URL (e.g., 'https://github.com/user/repo') - branch: Current saved branch name or None for new repositories - last_processed_commit: Last processed commit hash or None for new repositories - - Returns: (clone_with_batches) - bool: False for full clone (clone_with_batches=False), True for minimal clone (clone_with_batches=True) - - Strategy: - - Full clone: New repositories (last_processed_commit=None) or branch changed - - Minimal clone: Existing repositories with unchanged branch for incremental processing - """ - - self.logger.info( - f"Starting clone decision for {remote} (branch: {branch}, last_commit: {last_processed_commit})" - ) - - default_branch_changed = await self.has_default_branch_changed(remote, branch) - - if not last_processed_commit or default_branch_changed: - reason = "new repository" if not last_processed_commit else "branch changed" - self.logger.info(f"Performing full clone for {remote} - reason: {reason}") - await self._perform_full_clone(repo_path, remote) - return False - - self.logger.info( - f"Performing minimal clone for {remote} - existing repository with unchanged branch" - ) - await self._perform_minimal_clone(repo_path, remote) - return True - async def clone_batches_generator( self, repository: Repository, - working_dir_cleanup: bool | None = False, ) -> AsyncIterator[CloneBatchInfo]: """ - Async generator that yields CloneBatchInfo for repository cloning. + Async generator that yields a single CloneBatchInfo per call. - For new repositories (clone_with_batches=False): Performs full clone to avoid inefficient batching (stacked git objects). + When REPO_STORAGE_ROOT is set (staging/prod k8s): uses a stable on-disk path per repo. + First run performs a full clone; subsequent runs do an incremental git fetch + working + tree reset. No temp dir cleanup — the clone persists across runs. - For existing repositories (clone_with_batches=True): Uses incremental batched - processing to fetch only new commits since last processing. + When REPO_STORAGE_ROOT is unset (local dev / Docker Compose): falls back to the legacy + ephemeral behaviour — tempfile.mkdtemp + full clone + cleanup in finally. Behaviour is + identical to before this change. """ - temp_repo_path = None + repo_path = None + stable_path = self._stable_repo_path(repository.id) execution_status = ExecutionStatus.SUCCESS error_code = None error_message = None @@ -408,60 +281,69 @@ async def clone_batches_generator( remote = repository.url.removesuffix(".git") batch_info = CloneBatchInfo( - repo_path=temp_repo_path, + repo_path=repo_path, remote=remote, is_final_batch=False, is_first_batch=True, ) try: - temp_repo_path = tempfile.mkdtemp(prefix=f"{get_repo_name(remote)}_") batch_start_time = time.time() - clone_with_batches = await self.determine_clone_strategy( - temp_repo_path, remote, repository.branch, repository.last_processed_commit - ) - if clone_with_batches: - batch_depth = await self._calculate_batch_depth(temp_repo_path, remote) - await self._update_batch_info( - batch_info, temp_repo_path, repository.last_processed_commit, clone_with_batches - ) - batch_end_time = time.time() - total_execution_time += round(batch_end_time - batch_start_time, 2) + if stable_path is not None: + # Persistent path — used when REPO_STORAGE_ROOT env var is set (k8s). + repo_path = stable_path + os.makedirs(repo_path, exist_ok=True) + + if os.path.isdir(os.path.join(repo_path, ".git")): + if not await self._is_repo_valid(repo_path): + self.logger.warning( + f"Repo at {repo_path} is in an invalid state, wiping and re-cloning" + ) + await self._wipe_and_reclone(repo_path, remote) + else: + default_branch_changed = await self.has_default_branch_changed( + remote, repository.branch + ) + if default_branch_changed: + await self._wipe_and_reclone(repo_path, remote) + else: + await self._incremental_fetch(repo_path, remote) + else: + # Defensive: wipe in case a prior crash left checkout artifacts with no .git. + shutil.rmtree(repo_path, ignore_errors=True) + os.makedirs(repo_path) + await self._perform_full_clone(repo_path, remote) + else: + # Ephemeral path — legacy behaviour for local dev / Docker Compose. + repo_path = tempfile.mkdtemp(prefix=f"{get_repo_name(remote)}_") + await self._perform_full_clone(repo_path, remote) - yield batch_info - if working_dir_cleanup: - await self._cleanup_working_directory(temp_repo_path) - - batch_info.is_first_batch = False - while not batch_info.is_final_batch: - batch_start_time = time.time() - batch_info.prev_batch_edge_commit = await self._get_edge_commit(temp_repo_path) - await self._clone_next_batch(temp_repo_path, batch_depth, remote) - await self._update_batch_info( - batch_info, - temp_repo_path, - repository.last_processed_commit, - clone_with_batches, - ) - batch_end_time = time.time() - total_execution_time += round(batch_end_time - batch_start_time, 2) + await self._update_batch_info(batch_info, repo_path) + total_execution_time += round(time.time() - batch_start_time, 2) - yield batch_info + yield batch_info except Exception as e: - # Handle both CrowdGitError and generic Exception execution_status = ExecutionStatus.FAILURE error_message = e.error_message if isinstance(e, CrowdGitError) else repr(e) error_code = ( e.error_code.value if isinstance(e, CrowdGitError) else ErrorCode.UNKNOWN.value ) self.logger.error(f"Cloning failed: {error_message}") + # Wipe persistent clone only on non-transient failures that indicate local git + # corruption. Transient/remote errors (network, rate-limit, disk-full, timeout) + # leave the local clone intact — no point re-cloning 6+ GB because GitHub was down. + _TRANSIENT_ERRORS = (RateLimitError, NetworkError, RemoteServerError, DiskSpaceError, CommandTimeoutError) + if stable_path and os.path.isdir(os.path.join(stable_path, ".git")): + if not isinstance(e, _TRANSIENT_ERRORS): + self.logger.warning(f"Wiping persistent clone at {stable_path} after non-transient failure") + shutil.rmtree(stable_path, ignore_errors=True) raise finally: - if temp_repo_path and os.path.exists(temp_repo_path): - await self._cleanup_temp_directory(temp_repo_path, repository.id) + # Only clean up ephemeral temp dirs. Persistent clones are intentionally kept. + if stable_path is None and repo_path and os.path.exists(repo_path): + await self._cleanup_temp_directory(repo_path, repository.id) - # Save metrics service_execution = ServiceExecution( repo_id=repository.id, operation_type=OperationType.CLONE, diff --git a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py index 013167d13d..f1c5890036 100644 --- a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py +++ b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py @@ -9,14 +9,13 @@ from typing import Any import orjson -from loguru import logger from pydantic import validate_email -from tenacity import retry, stop_after_attempt, wait_fixed from crowdgit.database.crud import ( batch_check_parent_activities, batch_insert_activities, save_service_execution, + update_last_processed_commit, ) from crowdgit.enums import ( DataSinkWorkerQueueMessageType, @@ -54,6 +53,7 @@ class CommitService(BaseService): _EMAIL_TYPE = "email" MAX_CHUNK_SIZE = 250 + HASH_BATCH_SIZE = 1000 def __init__(self, queue_service: QueueService): super().__init__() @@ -126,18 +126,7 @@ async def process_single_batch_commits( batch_start_time = time.time() try: - self.logger.info( - f"Starting commits processing for new batch having commits older than {batch_info.prev_batch_edge_commit}" - ) - raw_commits = await self._execute_git_log( - batch_info.repo_path, - batch_info.clone_with_batches, - batch_info.prev_batch_edge_commit, - batch_info.edge_commit, - repository.last_processed_commit, - ) - - await self._process_activities_from_commits(raw_commits, batch_info, repository) + await self._process_with_hash_streaming(batch_info, repository) batch_end_time = time.time() batch_time = round(batch_end_time - batch_start_time, 2) @@ -215,18 +204,69 @@ async def _get_commit_reference(self, repo_path: str) -> str: return "HEAD" return f"origin/{default_branch}" - def _build_git_log_command(self, repo_path: str, commit_range: str) -> list[str]: - """Build git log commands for commits and numstats.""" - return [ + async def _get_commit_hashes(self, repo_path: str, commit_range: str) -> list[str]: + """Stream commit hashes oldest-first via rev-list. No buffering in git subprocess.""" + output = await run_shell_command( + ["git", "-C", repo_path, "rev-list", "--reverse", commit_range] + ) + return [h for h in output.strip().splitlines() if h.strip()] + + async def _fetch_commits_batch(self, repo_path: str, hashes: list[str]) -> str: + """Fetch full commit data for exactly the given hashes (no parent traversal).""" + cmd = [ "git", + "-c", "core.abbrevCommit=false", "-C", repo_path, "log", - commit_range, + "--no-walk=unsorted", "--cc", "--numstat", f"--pretty=format:{self.git_log_format}", - ] + ] + hashes + return await run_shell_command(cmd) + + async def _process_with_hash_streaming( + self, batch_info: CloneBatchInfo, repository: Repository + ) -> None: + """Process all commits for a full clone via hash-based streaming. + + Uses git rev-list to get hashes (no subprocess buffering), then fetches + commit data in batches of HASH_BATCH_SIZE. Checkpoints after each batch + so a crash at most replays one batch worth of commits. + """ + repo_path = batch_info.repo_path + commit_reference = await self._get_commit_reference(repo_path) + if repository.last_processed_commit: + commit_range = f"{repository.last_processed_commit}..{commit_reference}" + self.logger.info(f"Processing incremental range: {commit_range}") + else: + commit_range = commit_reference + self.logger.info(f"Processing full history in {commit_reference}") + + hashes = await self._get_commit_hashes(repo_path, commit_range) + if not hashes: + self.logger.info("No new commits to process") + return + + branch = await get_default_branch(repo_path) + total_batches = (len(hashes) + self.HASH_BATCH_SIZE - 1) // self.HASH_BATCH_SIZE + self.logger.info(f"Processing {len(hashes)} commits in {total_batches} hash batches") + + for batch_num, i in enumerate(range(0, len(hashes), self.HASH_BATCH_SIZE), 1): + batch_hashes = hashes[i : i + self.HASH_BATCH_SIZE] + raw_commits = await self._fetch_commits_batch(repo_path, batch_hashes) + await self._process_activities_from_commits(raw_commits, batch_info, repository) + # Checkpoint at last hash in batch (oldest-first from rev-list = newest in batch). + # Next run resumes from batch_hashes[-1]..HEAD. + await update_last_processed_commit( + repo_id=repository.id, + commit_hash=batch_hashes[-1], + branch=branch, + ) + self.logger.info( + f"Hash batch {batch_num}/{total_batches} checkpointed at {batch_hashes[-1]}" + ) def _parse_numstats(self, raw_numstats: str) -> tuple[int, int]: """ @@ -250,96 +290,6 @@ def _parse_numstats(self, raw_numstats: str) -> tuple[int, int]: return (insertions, deletions) - async def _get_optimized_commit_range( - self, - repo_path: str, - edge_commit: str, - prev_batch_edge_commit: str, - last_processed_commit: str | None, - ) -> str: - """ - Optimize commit range by using last_processed_commit if available in current batch. - - For middle batches, returns the slice of history between the last batch's edge and this one's. - If last_processed_commit exists in the current batch and is not the shallow boundary, - uses it as the range start to skip already-processed commits. - - Args: - repo_path: Local repository path - edge_commit: Current batch's edge commit (shallow boundary) - prev_batch_edge_commit: Previous batch's edge commit (upper bound of range) - last_processed_commit: Last commit that was successfully processed (if any) - - Returns: - Git commit range string (e.g., "commit_a..commit_b") - """ - - default_commit_range = f"{edge_commit}..{prev_batch_edge_commit}" - - if last_processed_commit and last_processed_commit != edge_commit: - try: - self.logger.info("Checking last processed commit existence in current batch") - await run_shell_command( - ["git", "cat-file", "-e", last_processed_commit], cwd=repo_path - ) - self.logger.info("Found! using optimized range") - default_commit_range = f"{last_processed_commit}..{prev_batch_edge_commit}" - except Exception: - self.logger.info("last processed commit not found in range") - return default_commit_range - - @retry( - stop=stop_after_attempt(5), - wait=wait_fixed(1), - reraise=True, - ) - async def _execute_git_log( - self, - repo_path: str, - clone_with_batches: bool, - prev_batch_edge_commit: str | None = None, - edge_commit: str | None = None, - last_processed_commit: str | None = None, - ) -> str: - """Execute git log command and return raw output.""" - # Ensure abbreviated commits are disabled - await run_shell_command( - ["git", "-C", repo_path, "config", "core.abbrevCommit", "false"], cwd=repo_path - ) - - self.logger.info("Running git log commands...") - - if not clone_with_batches: - commit_reference = await self._get_commit_reference(repo_path) - self.logger.info( - f"Full repo cloned in single batch, getting all commits in {commit_reference}" - ) - raw_commits_cmd = self._build_git_log_command(repo_path, commit_reference) - return await run_shell_command(raw_commits_cmd) - - if not prev_batch_edge_commit: - return "" - - if edge_commit: - commit_range = await self._get_optimized_commit_range( - repo_path, edge_commit, prev_batch_edge_commit, last_processed_commit - ) - self.logger.info(f"Processing middle batch: {commit_range}") - else: - # Final batch: Get all commits from the last known edge to the root. - commit_range = prev_batch_edge_commit - self.logger.info(f"Processing final batch from: {prev_batch_edge_commit} to root") - - raw_commits_cmd = self._build_git_log_command(repo_path, commit_range) - - self.logger.info(f"Executing git log for range: {commit_range}") - return await run_shell_command(raw_commits_cmd) - - def should_skip_commit(self, raw_commit: str | None, edge_commit: str | None) -> bool: - """Check if commit should be skipped based on edge commit comparison.""" - # Only skip the boundary commit of the current shallow clone. - return not raw_commit or (edge_commit and raw_commit.startswith(edge_commit)) - def clean_up_username(self, name: str): name = re.sub(r"(?i)Reviewed[- ]by:", "", name) name = re.sub(r"(?i)from:", "", name) @@ -700,17 +650,7 @@ async def process_commits_chunk( batch_info: CloneBatchInfo, repository: Repository, ) -> None: - """ - Process a chunk of raw commit texts into activities and write them to DB and Kafka. - - Args: - commit_texts_chunk: List of commit text strings to process - repo_path: Path to the repository - edge_commit_hash: Edge commit hash for filtering - remote: Remote repository URL - segment_id: Segment identifier - integration_id: Integration identifier - """ + """Process a chunk of raw commit texts into activities and write them to DB and Kafka.""" activities_db = [] activities_queue = [] bad_commits = 0 @@ -718,7 +658,7 @@ async def process_commits_chunk( commit = None for full_commit_text in commit_texts_chunk: - if self.should_skip_commit(full_commit_text, batch_info.edge_commit): + if not full_commit_text: continue commit_text, numstats_text = full_commit_text.split(self.NUMSTAT_SPLITTER) commit_lines = commit_text.strip().splitlines() @@ -802,7 +742,7 @@ async def _process_activities_from_commits( del raw_commits gc.collect() - logger.info(f"Actual number of commits to be processed: {len(commit_texts)}") + self.logger.info(f"Actual number of commits to be processed: {len(commit_texts)}") # Update total_commits metric if self._metrics_context: @@ -820,22 +760,14 @@ async def _process_activities_from_commits( max_concurrent = 2 semaphore = asyncio.Semaphore(max_concurrent) - self.logger.info(f"Processing with max_concurrent={max_concurrent}") - completed_chunks = 0 async def process_single_chunk(chunk_start_idx: int, chunk_end_idx: int): - nonlocal completed_chunks, total_chunks - + nonlocal completed_chunks async with semaphore: chunk = commit_texts[chunk_start_idx:chunk_end_idx] # noqa: F821 try: - # Process chunk and write to DB/Kafka - await self.process_commits_chunk( - chunk, - batch_info, - repository, - ) + await self.process_commits_chunk(chunk, batch_info, repository) completed_chunks += 1 self.logger.info(f"Progress: {completed_chunks}/{total_chunks} chunks") del chunk @@ -843,24 +775,23 @@ async def process_single_chunk(chunk_start_idx: int, chunk_end_idx: int): self.logger.error(f"Error processing chunk: {repr(e)}") raise - tasks = [ - process_single_chunk(i, min(i + chunk_size, len(commit_texts))) + running_tasks = [ + asyncio.create_task( + process_single_chunk(i, min(i + chunk_size, len(commit_texts))) + ) for i in range(0, len(commit_texts), chunk_size) ] - self.logger.info( - f"Created {len(tasks)} tasks. Processing with max {max_concurrent} concurrent tasks" - ) - try: - await asyncio.gather(*tasks) - + await asyncio.gather(*running_tasks) self.logger.info(f"All {total_chunks} chunks processed successfully.") - except Exception as e: self.logger.error( f"Error during chunk processing at chunk {completed_chunks}/{total_chunks}: {e}" ) + for t in running_tasks: + t.cancel() + await asyncio.gather(*running_tasks, return_exceptions=True) raise finally: @@ -972,7 +903,7 @@ def _validate_and_adjust_datetime(self, commit_datetime: str, author_datetime: s return commit_datetime - except ValueError: + except (ValueError, TypeError): self.logger.warning( f"Invalid commit datetime format: {commit_datetime}, using author datetime" ) diff --git a/services/apps/git_integration/src/crowdgit/services/utils.py b/services/apps/git_integration/src/crowdgit/services/utils.py index d3e96f7bef..633972c138 100644 --- a/services/apps/git_integration/src/crowdgit/services/utils.py +++ b/services/apps/git_integration/src/crowdgit/services/utils.py @@ -115,11 +115,15 @@ async def get_remote_default_branch(remote_url: str) -> str | None: # Extract branch name from "ref: refs/heads/main" return line.split("refs/heads/")[-1].split("\t")[0] - # Fallback: if symbolic ref not available, try common branches + # Fallback: if symbolic ref not available, try common branches. + # ls-remote exits 0 even when branch absent — must check stdout is non-empty. for branch in ["main", "master"]: try: - await run_shell_command(["git", "ls-remote", "--heads", remote_url, branch]) - return branch + output = await run_shell_command( + ["git", "ls-remote", "--heads", remote_url, branch] + ) + if output.strip(): + return branch except CommandExecutionError: continue diff --git a/services/apps/git_integration/src/crowdgit/settings.py b/services/apps/git_integration/src/crowdgit/settings.py index 447f2f3342..9a81a8f493 100644 --- a/services/apps/git_integration/src/crowdgit/settings.py +++ b/services/apps/git_integration/src/crowdgit/settings.py @@ -2,14 +2,14 @@ def load_env_var(key: str, required=True, default=None): - value = os.getenv(key, default) + value = os.getenv(key) or default if required and value is None: raise OSError(f"Missing required environment variable: {key}") return value CROWD_DB_WRITE_HOST = load_env_var("CROWD_DB_WRITE_HOST") -CROWD_DB_PORT = load_env_var("CROWD_DB_PORT") +CROWD_DB_PORT = int(load_env_var("CROWD_DB_PORT")) CROWD_DB_USERNAME = load_env_var("CROWD_DB_USERNAME") CROWD_DB_PASSWORD = load_env_var("CROWD_DB_PASSWORD") CROWD_DB_DATABASE = load_env_var("CROWD_DB_DATABASE") @@ -38,9 +38,7 @@ def load_env_var(key: str, required=True, default=None): WORKER_SHUTDOWN_TIMEOUT_SEC = int(load_env_var("WORKER_SHUTDOWN_TIMEOUT_SEC", default="3600")) MAX_CONCURRENT_ONBOARDINGS = int(load_env_var("MAX_CONCURRENT_ONBOARDINGS", default="3")) MAX_INTEGRATION_RESULTS = int(load_env_var("MAX_INTEGRATION_RESULTS", default="5000000")) -STUCK_ONBOARDING_REPO_TIMEOUT_HOURS = int( - load_env_var("STUCK_ONBOARDING_REPO_TIMEOUT_HOURS", default="12") -) -STUCK_RECURRENT_REPO_TIMEOUT_HOURS = int( - load_env_var("STUCK_RECURRENT_REPO_TIMEOUT_HOURS", default="4") -) +LOCK_HEARTBEAT_INTERVAL_SEC = int(load_env_var("LOCK_HEARTBEAT_INTERVAL_SEC", default="300")) +STUCK_REPO_TIMEOUT_HOURS = int(load_env_var("STUCK_REPO_TIMEOUT_HOURS", default="1")) +FAILED_RETRY_INTERVAL_HOURS = int(load_env_var("FAILED_RETRY_INTERVAL_HOURS", default="1")) +REPO_STORAGE_ROOT = load_env_var("REPO_STORAGE_ROOT", required=False, default=None) diff --git a/services/apps/git_integration/src/crowdgit/worker/repository_worker.py b/services/apps/git_integration/src/crowdgit/worker/repository_worker.py index 3733d59af3..1c26544e54 100644 --- a/services/apps/git_integration/src/crowdgit/worker/repository_worker.py +++ b/services/apps/git_integration/src/crowdgit/worker/repository_worker.py @@ -1,5 +1,4 @@ import asyncio -from datetime import datetime, timezone from crowdgit.database.crud import ( acquire_repo_for_processing, @@ -7,13 +6,16 @@ mark_repo_as_processed, release_repo, update_last_processed_commit, + update_lock_heartbeat, update_repository_licenses, ) from crowdgit.enums import RepositoryState from crowdgit.errors import ( + EmptyRepoError, + ForbiddenError, InternalError, ParentRepoInvalidError, - ReOnboardingRequiredError, + PermissionError as RepoPermissionError, RepoAuthRequiredError, ) @@ -31,8 +33,7 @@ ) from crowdgit.services.utils import get_default_branch, get_repo_name from crowdgit.settings import ( - STUCK_ONBOARDING_REPO_TIMEOUT_HOURS, - STUCK_RECURRENT_REPO_TIMEOUT_HOURS, + LOCK_HEARTBEAT_INTERVAL_SEC, WORKER_ERROR_BACKOFF_SEC, WORKER_POLLING_INTERVAL_SEC, ) @@ -96,33 +97,15 @@ async def shutdown(self): logger.info("Worker services shutdown triggered") - async def _ensure_repo_not_stuck(self, repository: Repository): - """ - Check if repo is stuck and raise the appropriate exception if so. - Repos can get stuck in processing state for different reasons: - - Worker crash or restart (e.g. pod eviction due OOM, deployment after timeout, ...) - - `last_processed_commit` is no loger valid due to force-push, dangling-commit, or so... - - Race condition: remote is going under breaking changes at the same time we're processing it - - Network issues breaking the clone/pull operation - """ - # detection - processing_duration_hours = ( - datetime.now(timezone.utc) - repository.locked_at.astimezone(timezone.utc) - ).total_seconds() / 3600 - repo_stuck: bool = ( - repository.last_processed_commit - and processing_duration_hours >= STUCK_RECURRENT_REPO_TIMEOUT_HOURS - ) or ( - repository.last_processed_commit is None # onboarding - and processing_duration_hours >= STUCK_ONBOARDING_REPO_TIMEOUT_HOURS - ) - - # handling - if repo_stuck: - logger.warning( - f"Repo {repository.url} is stuck for {processing_duration_hours} hours — queuing for re-onboarding" - ) - raise ReOnboardingRequiredError() + async def _heartbeat_loop(self, repo_id: str, stop_event: asyncio.Event) -> None: + """Periodically refresh lockedAt so the repo is not reclaimed as a stale lock.""" + while not stop_event.is_set(): + await asyncio.sleep(LOCK_HEARTBEAT_INTERVAL_SEC) + if not stop_event.is_set(): + try: + await update_lock_heartbeat(repo_id) + except Exception as e: + logger.warning(f"Heartbeat update failed for repo {repo_id}: {e}") async def _process_repositories(self): """ @@ -138,14 +121,26 @@ async def _process_repositories(self): logger.debug("No repositories to process") return - await self._process_single_repository(available_repo_to_process) + stop_heartbeat = asyncio.Event() + heartbeat_task = asyncio.create_task( + self._heartbeat_loop(available_repo_to_process.id, stop_heartbeat) + ) + try: + await self._process_single_repository(available_repo_to_process) + finally: + stop_heartbeat.set() + heartbeat_task.cancel() + try: + await heartbeat_task + except asyncio.CancelledError: + pass except Exception as e: logger.error( f"Failed to process repository {available_repo_to_process} with error {e}" ) finally: if available_repo_to_process: - logger.info("releasing repo: ", available_repo_to_process.url) + logger.info(f"releasing repo: {available_repo_to_process.url}") await release_repo(available_repo_to_process.id) logger.info(f"Repo {available_repo_to_process.url} released!") @@ -231,10 +226,7 @@ async def _process_single_repository(self, repository: Repository): # Validate and get parent repo if this is a fork repository.parent_repo = await self._validate_and_get_parent_repo(repository) - async for batch_info in self.clone_service.clone_batches_generator( - repository, - working_dir_cleanup=True, - ): + async for batch_info in self.clone_service.clone_batches_generator(repository): logger.info(f"Clone batch info: {batch_info}") if batch_info.is_first_batch: await self.software_value_service.run(repository.id, batch_info.repo_path) @@ -249,24 +241,21 @@ async def _process_single_repository(self, repository: Repository): batch_info, ) - if batch_info.is_final_batch: - await update_last_processed_commit( - repo_id=repository.id, - commit_hash=batch_info.latest_commit_in_repo, - branch=await get_default_branch(batch_info.repo_path), - ) - else: - await self._ensure_repo_not_stuck(repository) + await update_last_processed_commit( + repo_id=repository.id, + commit_hash=batch_info.latest_commit_in_repo, + branch=await get_default_branch(batch_info.repo_path), + ) logger.info("Incremental processing completed successfully") processing_state = RepositoryState.COMPLETED - except ReOnboardingRequiredError: - logger.info(f"Repo {repository.url} needs re-onboarding, deferring until weekend") - processing_state = RepositoryState.PENDING_REONBOARD + except EmptyRepoError: + logger.info(f"Repository {repository.url} is empty, marking as completed") + processing_state = RepositoryState.COMPLETED except ParentRepoInvalidError as e: logger.error(f"Parent repo validation failed: {repr(e)}") processing_state = RepositoryState.REQUIRES_PARENT - except RepoAuthRequiredError as e: + except (RepoAuthRequiredError, ForbiddenError, RepoPermissionError) as e: logger.error(f"Repository requires authentication: {repr(e)}") processing_state = RepositoryState.AUTH_REQUIRED except Exception as e: @@ -276,7 +265,7 @@ async def _process_single_repository(self, repository: Repository): # Reset logger context for all services self._reset_all_contexts() - logger.info(f"Updating ${repository.url} state to ${processing_state}") + logger.info(f"Updating {repository.url} state to {processing_state}") await mark_repo_as_processed(repository.id, processing_state) logger.info("Completed processing repository: {}", repository.url) diff --git a/services/apps/git_integration/src/test/outputs/test-repo_actual.json b/services/apps/git_integration/src/test/outputs/test-repo_actual.json index aeb4bb4106..5eca14cb9b 100644 --- a/services/apps/git_integration/src/test/outputs/test-repo_actual.json +++ b/services/apps/git_integration/src/test/outputs/test-repo_actual.json @@ -1,33 +1,34 @@ [ { "type": "authored-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "d38f1a8036c1b3177d94682f226a9fe6cb387240", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "00bc152e7840d47fa31cdd89f9d3f53d3e12621a", "sourceParentId": "", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Merge pull request from external contributor\n\nThis commit was authored by a contributor but committed by a maintainer.\n\nSigned-off-by: Linda Contributor ", + "body": "Add configuration file\n\nThis commit adds the main configuration file for the project.\n\nSigned-off-by: Bob Reviewer ", + "username": "bob@example.com", "attributes": { - "insertions": 3, + "insertions": 1, "timezone": "UTC+01:00", "deletions": 0, - "lines": 3, + "lines": 1, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Linda Contributor", + "displayName": "Bob Reviewer", "identities": [ { "platform": "git", - "value": "linda@external.com", + "value": "bob@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "linda@external.com", + "value": "bob@example.com", "type": "email", "verified": false } @@ -36,34 +37,35 @@ "segmentId": "test-segment-id" }, { - "type": "committed-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "4619a3b615c79fe4be0de4a82150318898d8ea49", - "sourceParentId": "d38f1a8036c1b3177d94682f226a9fe6cb387240", + "type": "signed-off-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "01095aa2caab05d8d08f2c4cb17ceb834294802b", + "sourceParentId": "8007a6c8648839cb4624521d986fb60945828f5d", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Merge pull request from external contributor\n\nThis commit was authored by a contributor but committed by a maintainer.\n\nSigned-off-by: Linda Contributor ", + "body": "Fix typo in config\n\nSigned-off-by: Ivan >\nReviewed-by: Jane <", + "username": "ivan@example.com", "attributes": { - "insertions": 3, + "insertions": 1, "timezone": "UTC+01:00", - "deletions": 0, - "lines": 3, + "deletions": 1, + "lines": 0, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Mike Maintainer", + "displayName": "Ivan", "identities": [ { "platform": "git", - "value": "mike@example.com", + "value": "ivan@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "mike@example.com", + "value": "ivan@example.com", "type": "email", "verified": false } @@ -72,34 +74,35 @@ "segmentId": "test-segment-id" }, { - "type": "signed-off-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "a321f844e34e4c4b2bd6b2997681fcf2dd5f47f5", - "sourceParentId": "d38f1a8036c1b3177d94682f226a9fe6cb387240", + "type": "reviewed-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "44c7cc646f0b66c6e643bc0c2cb4fb3390865fda", + "sourceParentId": "8007a6c8648839cb4624521d986fb60945828f5d", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Merge pull request from external contributor\n\nThis commit was authored by a contributor but committed by a maintainer.\n\nSigned-off-by: Linda Contributor ", + "body": "Fix typo in config\n\nSigned-off-by: Ivan >\nReviewed-by: Jane <", + "username": "jane@example.com", "attributes": { - "insertions": 3, + "insertions": 1, "timezone": "UTC+01:00", - "deletions": 0, - "lines": 3, + "deletions": 1, + "lines": 0, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Linda Contributor", + "displayName": "Jane", "identities": [ { "platform": "git", - "value": "linda@external.com", + "value": "jane@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "linda@external.com", + "value": "jane@example.com", "type": "email", "verified": false } @@ -109,33 +112,34 @@ }, { "type": "authored-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "12b4d83852e06c5b3fa0e339c395bad0ee65200f", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "46e950cf179b11a1f6b6a8f69fb40ae67eb9be56", "sourceParentId": "", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Quick fix for production issue", + "body": "Merge pull request from external contributor\n\nThis commit was authored by a contributor but committed by a maintainer.\n\nSigned-off-by: Linda Contributor ", + "username": "linda@external.com", "attributes": { - "insertions": 1, + "insertions": 3, "timezone": "UTC+01:00", "deletions": 0, - "lines": 1, + "lines": 3, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Kelly Hotfix", + "displayName": "Linda Contributor", "identities": [ { "platform": "git", - "value": "kelly@example.com", + "value": "linda@external.com", "type": "username", "verified": true }, { "platform": "git", - "value": "kelly@example.com", + "value": "linda@external.com", "type": "email", "verified": false } @@ -145,33 +149,34 @@ }, { "type": "authored-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "afd82faba62589717df9e3c32172b310b68f808b", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "4aebf3f1c07d7ce4413c6b0fc6d11293f9cd807c", "sourceParentId": "", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Fix typo in config\n\nSigned-off-by: Ivan >\nReviewed-by: Jane <", + "body": "Quick fix for production issue", + "username": "kelly@example.com", "attributes": { "insertions": 1, "timezone": "UTC+01:00", - "deletions": 1, - "lines": 0, + "deletions": 0, + "lines": 1, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Ivan Typo", + "displayName": "Kelly Hotfix", "identities": [ { "platform": "git", - "value": "ivan@example.com", + "value": "kelly@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "ivan@example.com", + "value": "kelly@example.com", "type": "email", "verified": false } @@ -180,34 +185,35 @@ "segmentId": "test-segment-id" }, { - "type": "signed-off-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "2bcde409c58ed7a5620b3f1a8c18c0572067eb62", - "sourceParentId": "afd82faba62589717df9e3c32172b310b68f808b", + "type": "reviewed-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "710c60a55b0c4353ca1a84694227c61ed5fc34a8", + "sourceParentId": "982f1963705ca80dc1194cc085a7351745459457", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Fix typo in config\n\nSigned-off-by: Ivan >\nReviewed-by: Jane <", + "body": "Refactor authentication module\n\nRemoved old code and added new implementation.\n\nReviewed-by: Alice Developer ", + "username": "alice@example.com", "attributes": { - "insertions": 1, + "insertions": 8, "timezone": "UTC+01:00", - "deletions": 1, - "lines": 0, + "deletions": 2, + "lines": 6, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Ivan", + "displayName": "Alice Developer", "identities": [ { "platform": "git", - "value": "ivan@example.com", + "value": "alice@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "ivan@example.com", + "value": "alice@example.com", "type": "email", "verified": false } @@ -216,13 +222,14 @@ "segmentId": "test-segment-id" }, { - "type": "reviewed-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "97ea4072257c1e377a60803c810c7e3af5d051af", - "sourceParentId": "afd82faba62589717df9e3c32172b310b68f808b", + "type": "authored-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "8007a6c8648839cb4624521d986fb60945828f5d", + "sourceParentId": "", "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Fix typo in config\n\nSigned-off-by: Ivan >\nReviewed-by: Jane <", + "username": "ivan@example.com", "attributes": { "insertions": 1, "timezone": "UTC+01:00", @@ -233,17 +240,17 @@ }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Jane", + "displayName": "Ivan Typo", "identities": [ { "platform": "git", - "value": "jane@example.com", + "value": "ivan@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "jane@example.com", + "value": "ivan@example.com", "type": "email", "verified": false } @@ -252,34 +259,35 @@ "segmentId": "test-segment-id" }, { - "type": "authored-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "28c2ba317c4e202c25cd684a0c181f54c37d78a0", - "sourceParentId": "", + "type": "reviewed-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "8881dc51913f1234107b34be2b176a34e06b18c5", + "sourceParentId": "98fc9496af492345dff6dd47895695986ced362c", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Update documentation\n\nUpdated the README with new instructions.\n\nReviewed-By: No Email Here\nSigned-off-by: Henry Docs ", + "body": "Fix critical security vulnerability\n\nThis patch addresses a security issue in the authentication module.\n\nSigned-off-by: Charlie Developer \nReviewed-by: Alice Developer \nTested-by: David Tester ", + "username": "alice@example.com", "attributes": { - "insertions": 4, + "insertions": 3, "timezone": "UTC+01:00", - "deletions": 1, + "deletions": 0, "lines": 3, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Henry Docs", + "displayName": "Alice Developer", "identities": [ { "platform": "git", - "value": "henry@example.com", + "value": "alice@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "henry@example.com", + "value": "alice@example.com", "type": "email", "verified": false } @@ -288,34 +296,35 @@ "segmentId": "test-segment-id" }, { - "type": "signed-off-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "9a5760b9f9e05e54acac8c29eb555bc51014a8a1", - "sourceParentId": "28c2ba317c4e202c25cd684a0c181f54c37d78a0", + "type": "authored-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "8ae70441565d9ed1036b5b3546658b1cad914dcc", + "sourceParentId": "", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Update documentation\n\nUpdated the README with new instructions.\n\nReviewed-By: No Email Here\nSigned-off-by: Henry Docs ", + "body": "Implement new feature together\n\nThis feature was developed in pair programming session.\n\nCo-authored-by: Eve Pair \nSigned-off-by: Frank Lead ", + "username": "frank@example.com", "attributes": { - "insertions": 4, + "insertions": 2, "timezone": "UTC+01:00", - "deletions": 1, - "lines": 3, + "deletions": 0, + "lines": 2, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Henry Docs", + "displayName": "Frank Lead", "identities": [ { "platform": "git", - "value": "henry@example.com", + "value": "frank@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "henry@example.com", + "value": "frank@example.com", "type": "email", "verified": false } @@ -324,34 +333,35 @@ "segmentId": "test-segment-id" }, { - "type": "authored-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "4df11752fae718f054a20d22bec2078c1d1f08fa", - "sourceParentId": "", + "type": "signed-off-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "95c22aca2d51f6c40859061c5df1a2aa1b513646", + "sourceParentId": "8ae70441565d9ed1036b5b3546658b1cad914dcc", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Refactor authentication module\n\nRemoved old code and added new implementation.\n\nReviewed-by: Alice Developer ", + "body": "Implement new feature together\n\nThis feature was developed in pair programming session.\n\nCo-authored-by: Eve Pair \nSigned-off-by: Frank Lead ", + "username": "frank@example.com", "attributes": { - "insertions": 8, + "insertions": 2, "timezone": "UTC+01:00", - "deletions": 2, - "lines": 6, + "deletions": 0, + "lines": 2, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Grace Refactorer", + "displayName": "Frank Lead", "identities": [ { "platform": "git", - "value": "grace@example.com", + "value": "frank@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "grace@example.com", + "value": "frank@example.com", "type": "email", "verified": false } @@ -360,13 +370,14 @@ "segmentId": "test-segment-id" }, { - "type": "reviewed-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "11b4c273bc98507e605d6f3af0dfb273e24649f1", - "sourceParentId": "4df11752fae718f054a20d22bec2078c1d1f08fa", + "type": "authored-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "982f1963705ca80dc1194cc085a7351745459457", + "sourceParentId": "", "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Refactor authentication module\n\nRemoved old code and added new implementation.\n\nReviewed-by: Alice Developer ", + "username": "grace@example.com", "attributes": { "insertions": 8, "timezone": "UTC+01:00", @@ -377,17 +388,17 @@ }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Alice Developer", + "displayName": "Grace Refactorer", "identities": [ { "platform": "git", - "value": "alice@example.com", + "value": "grace@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "alice@example.com", + "value": "grace@example.com", "type": "email", "verified": false } @@ -397,33 +408,34 @@ }, { "type": "authored-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "eacba0e12f931b0cc6b551f4680e41c024b02afe", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "98fc9496af492345dff6dd47895695986ced362c", "sourceParentId": "", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Implement new feature together\n\nThis feature was developed in pair programming session.\n\nCo-authored-by: Eve Pair \nSigned-off-by: Frank Lead ", + "body": "Fix critical security vulnerability\n\nThis patch addresses a security issue in the authentication module.\n\nSigned-off-by: Charlie Developer \nReviewed-by: Alice Developer \nTested-by: David Tester ", + "username": "charlie@example.com", "attributes": { - "insertions": 2, + "insertions": 3, "timezone": "UTC+01:00", "deletions": 0, - "lines": 2, + "lines": 3, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Frank Lead", + "displayName": "Charlie Developer", "identities": [ { "platform": "git", - "value": "frank@example.com", + "value": "charlie@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "frank@example.com", + "value": "charlie@example.com", "type": "email", "verified": false } @@ -432,34 +444,35 @@ "segmentId": "test-segment-id" }, { - "type": "co-authored-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "60a829c7b43c060ee991a6a4601cd9e0ae7058ea", - "sourceParentId": "eacba0e12f931b0cc6b551f4680e41c024b02afe", + "type": "authored-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "af49c9408dea4a8567ed90f1dc355045fb55e516", + "sourceParentId": "", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Implement new feature together\n\nThis feature was developed in pair programming session.\n\nCo-authored-by: Eve Pair \nSigned-off-by: Frank Lead ", + "body": "Initial commit - basic structure", + "username": "alice@example.com", "attributes": { - "insertions": 2, + "insertions": 3, "timezone": "UTC+01:00", "deletions": 0, - "lines": 2, + "lines": 3, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Eve Pair", + "displayName": "Alice Developer", "identities": [ { "platform": "git", - "value": "eve@example.com", + "value": "alice@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "eve@example.com", + "value": "alice@example.com", "type": "email", "verified": false } @@ -468,34 +481,35 @@ "segmentId": "test-segment-id" }, { - "type": "signed-off-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "ea096a751ec8751d06c82a861a2f769a105bb762", - "sourceParentId": "eacba0e12f931b0cc6b551f4680e41c024b02afe", + "type": "committed-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "ba71081e8d434642b001d6c806eef43351b23ee1", + "sourceParentId": "46e950cf179b11a1f6b6a8f69fb40ae67eb9be56", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Implement new feature together\n\nThis feature was developed in pair programming session.\n\nCo-authored-by: Eve Pair \nSigned-off-by: Frank Lead ", + "body": "Merge pull request from external contributor\n\nThis commit was authored by a contributor but committed by a maintainer.\n\nSigned-off-by: Linda Contributor ", + "username": "mike@example.com", "attributes": { - "insertions": 2, + "insertions": 3, "timezone": "UTC+01:00", "deletions": 0, - "lines": 2, + "lines": 3, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Frank Lead", + "displayName": "Mike Maintainer", "identities": [ { "platform": "git", - "value": "frank@example.com", + "value": "mike@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "frank@example.com", + "value": "mike@example.com", "type": "email", "verified": false } @@ -505,33 +519,34 @@ }, { "type": "authored-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "c73621865a578ddcc419f976382ef701efa33da4", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "c59d9e2b5cd7b1f177f08f1aeaae891470426fca", "sourceParentId": "", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Fix critical security vulnerability\n\nThis patch addresses a security issue in the authentication module.\n\nSigned-off-by: Charlie Developer \nReviewed-by: Alice Developer \nTested-by: David Tester ", + "body": "Update documentation\n\nUpdated the README with new instructions.\n\nReviewed-By: No Email Here\nSigned-off-by: Henry Docs ", + "username": "henry@example.com", "attributes": { - "insertions": 3, + "insertions": 4, "timezone": "UTC+01:00", - "deletions": 0, + "deletions": 1, "lines": 3, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Charlie Developer", + "displayName": "Henry Docs", "identities": [ { "platform": "git", - "value": "charlie@example.com", + "value": "henry@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "charlie@example.com", + "value": "henry@example.com", "type": "email", "verified": false } @@ -541,33 +556,34 @@ }, { "type": "signed-off-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "5d4675a0cabb19b066b94873a1f4b8f7caf805e7", - "sourceParentId": "c73621865a578ddcc419f976382ef701efa33da4", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "c6f50bbdb46f82c43d943f6ddd827d53d0b37dd4", + "sourceParentId": "c59d9e2b5cd7b1f177f08f1aeaae891470426fca", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Fix critical security vulnerability\n\nThis patch addresses a security issue in the authentication module.\n\nSigned-off-by: Charlie Developer \nReviewed-by: Alice Developer \nTested-by: David Tester ", + "body": "Update documentation\n\nUpdated the README with new instructions.\n\nReviewed-By: No Email Here\nSigned-off-by: Henry Docs ", + "username": "henry@example.com", "attributes": { - "insertions": 3, + "insertions": 4, "timezone": "UTC+01:00", - "deletions": 0, + "deletions": 1, "lines": 3, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Charlie Developer", + "displayName": "Henry Docs", "identities": [ { "platform": "git", - "value": "charlie@example.com", + "value": "henry@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "charlie@example.com", + "value": "henry@example.com", "type": "email", "verified": false } @@ -576,34 +592,35 @@ "segmentId": "test-segment-id" }, { - "type": "reviewed-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "eb1fb44cc5fb7cdaf526ef9268956b7158936887", - "sourceParentId": "c73621865a578ddcc419f976382ef701efa33da4", + "type": "co-authored-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "ddc284210a8f6a1b6a004055ce0b2617bf4c58ec", + "sourceParentId": "8ae70441565d9ed1036b5b3546658b1cad914dcc", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Fix critical security vulnerability\n\nThis patch addresses a security issue in the authentication module.\n\nSigned-off-by: Charlie Developer \nReviewed-by: Alice Developer \nTested-by: David Tester ", + "body": "Implement new feature together\n\nThis feature was developed in pair programming session.\n\nCo-authored-by: Eve Pair \nSigned-off-by: Frank Lead ", + "username": "eve@example.com", "attributes": { - "insertions": 3, + "insertions": 2, "timezone": "UTC+01:00", "deletions": 0, - "lines": 3, + "lines": 2, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Alice Developer", + "displayName": "Eve Pair", "identities": [ { "platform": "git", - "value": "alice@example.com", + "value": "eve@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "alice@example.com", + "value": "eve@example.com", "type": "email", "verified": false } @@ -613,12 +630,13 @@ }, { "type": "tested-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "9e230ef529e800d109a7374d8fd4182322508e3d", - "sourceParentId": "c73621865a578ddcc419f976382ef701efa33da4", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "eaaae913674aee329a813b3888b80593d109846e", + "sourceParentId": "98fc9496af492345dff6dd47895695986ced362c", "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Fix critical security vulnerability\n\nThis patch addresses a security issue in the authentication module.\n\nSigned-off-by: Charlie Developer \nReviewed-by: Alice Developer \nTested-by: David Tester ", + "username": "david@example.com", "attributes": { "insertions": 3, "timezone": "UTC+01:00", @@ -648,34 +666,35 @@ "segmentId": "test-segment-id" }, { - "type": "authored-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "4fec97a2520ab0aae9454ef0c060d0f1dfc6aa4d", - "sourceParentId": "", + "type": "signed-off-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "f00370b20a58a4369e8f8be1dcb26e10a3c238f4", + "sourceParentId": "46e950cf179b11a1f6b6a8f69fb40ae67eb9be56", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Add configuration file\n\nThis commit adds the main configuration file for the project.\n\nSigned-off-by: Bob Reviewer ", + "body": "Merge pull request from external contributor\n\nThis commit was authored by a contributor but committed by a maintainer.\n\nSigned-off-by: Linda Contributor ", + "username": "linda@external.com", "attributes": { - "insertions": 1, + "insertions": 3, "timezone": "UTC+01:00", "deletions": 0, - "lines": 1, + "lines": 3, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Bob Reviewer", + "displayName": "Linda Contributor", "identities": [ { "platform": "git", - "value": "bob@example.com", + "value": "linda@external.com", "type": "username", "verified": true }, { "platform": "git", - "value": "bob@example.com", + "value": "linda@external.com", "type": "email", "verified": false } @@ -685,33 +704,34 @@ }, { "type": "signed-off-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "e52dc2b2ec76720ce7c8233613d110da5378b014", - "sourceParentId": "4fec97a2520ab0aae9454ef0c060d0f1dfc6aa4d", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "f560cbfab0c80a432b3928c7dafb20dcf44378ca", + "sourceParentId": "98fc9496af492345dff6dd47895695986ced362c", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Add configuration file\n\nThis commit adds the main configuration file for the project.\n\nSigned-off-by: Bob Reviewer ", + "body": "Fix critical security vulnerability\n\nThis patch addresses a security issue in the authentication module.\n\nSigned-off-by: Charlie Developer \nReviewed-by: Alice Developer \nTested-by: David Tester ", + "username": "charlie@example.com", "attributes": { - "insertions": 1, + "insertions": 3, "timezone": "UTC+01:00", "deletions": 0, - "lines": 1, + "lines": 3, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Bob Reviewer", + "displayName": "Charlie Developer", "identities": [ { "platform": "git", - "value": "bob@example.com", + "value": "charlie@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "bob@example.com", + "value": "charlie@example.com", "type": "email", "verified": false } @@ -720,34 +740,35 @@ "segmentId": "test-segment-id" }, { - "type": "authored-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "2aaf02cef5d7a0fd7f916eb5b64456c4e82a7e58", - "sourceParentId": "", + "type": "signed-off-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "f69cd3c5788ae63e1940d0e3c85fddb4365975aa", + "sourceParentId": "00bc152e7840d47fa31cdd89f9d3f53d3e12621a", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Initial commit - basic structure", + "body": "Add configuration file\n\nThis commit adds the main configuration file for the project.\n\nSigned-off-by: Bob Reviewer ", + "username": "bob@example.com", "attributes": { - "insertions": 3, + "insertions": 1, "timezone": "UTC+01:00", "deletions": 0, - "lines": 3, + "lines": 1, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Alice Developer", + "displayName": "Bob Reviewer", "identities": [ { "platform": "git", - "value": "alice@example.com", + "value": "bob@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "alice@example.com", + "value": "bob@example.com", "type": "email", "verified": false } @@ -755,4 +776,4 @@ }, "segmentId": "test-segment-id" } -] +] \ No newline at end of file diff --git a/services/apps/git_integration/src/test/outputs/test-repo_expected.json b/services/apps/git_integration/src/test/outputs/test-repo_expected.json index b7a95182bd..5eca14cb9b 100644 --- a/services/apps/git_integration/src/test/outputs/test-repo_expected.json +++ b/services/apps/git_integration/src/test/outputs/test-repo_expected.json @@ -1,33 +1,34 @@ [ { - "type": "reviewed-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "11b4c273bc98507e605d6f3af0dfb273e24649f1", - "sourceParentId": "4df11752fae718f054a20d22bec2078c1d1f08fa", + "type": "authored-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "00bc152e7840d47fa31cdd89f9d3f53d3e12621a", + "sourceParentId": "", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Refactor authentication module\n\nRemoved old code and added new implementation.\n\nReviewed-by: Alice Developer ", + "body": "Add configuration file\n\nThis commit adds the main configuration file for the project.\n\nSigned-off-by: Bob Reviewer ", + "username": "bob@example.com", "attributes": { - "insertions": 8, + "insertions": 1, "timezone": "UTC+01:00", - "deletions": 2, - "lines": 6, + "deletions": 0, + "lines": 1, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Alice Developer", + "displayName": "Bob Reviewer", "identities": [ { "platform": "git", - "value": "alice@example.com", + "value": "bob@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "alice@example.com", + "value": "bob@example.com", "type": "email", "verified": false } @@ -36,34 +37,35 @@ "segmentId": "test-segment-id" }, { - "type": "authored-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "12b4d83852e06c5b3fa0e339c395bad0ee65200f", - "sourceParentId": "", + "type": "signed-off-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "01095aa2caab05d8d08f2c4cb17ceb834294802b", + "sourceParentId": "8007a6c8648839cb4624521d986fb60945828f5d", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Quick fix for production issue", + "body": "Fix typo in config\n\nSigned-off-by: Ivan >\nReviewed-by: Jane <", + "username": "ivan@example.com", "attributes": { "insertions": 1, "timezone": "UTC+01:00", - "deletions": 0, - "lines": 1, + "deletions": 1, + "lines": 0, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Kelly Hotfix", + "displayName": "Ivan", "identities": [ { "platform": "git", - "value": "kelly@example.com", + "value": "ivan@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "kelly@example.com", + "value": "ivan@example.com", "type": "email", "verified": false } @@ -72,34 +74,35 @@ "segmentId": "test-segment-id" }, { - "type": "authored-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "28c2ba317c4e202c25cd684a0c181f54c37d78a0", - "sourceParentId": "", + "type": "reviewed-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "44c7cc646f0b66c6e643bc0c2cb4fb3390865fda", + "sourceParentId": "8007a6c8648839cb4624521d986fb60945828f5d", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Update documentation\n\nUpdated the README with new instructions.\n\nReviewed-By: No Email Here\nSigned-off-by: Henry Docs ", + "body": "Fix typo in config\n\nSigned-off-by: Ivan >\nReviewed-by: Jane <", + "username": "jane@example.com", "attributes": { - "insertions": 4, + "insertions": 1, "timezone": "UTC+01:00", "deletions": 1, - "lines": 3, + "lines": 0, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Henry Docs", + "displayName": "Jane", "identities": [ { "platform": "git", - "value": "henry@example.com", + "value": "jane@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "henry@example.com", + "value": "jane@example.com", "type": "email", "verified": false } @@ -109,12 +112,13 @@ }, { "type": "authored-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "2aaf02cef5d7a0fd7f916eb5b64456c4e82a7e58", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "46e950cf179b11a1f6b6a8f69fb40ae67eb9be56", "sourceParentId": "", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Initial commit - basic structure", + "body": "Merge pull request from external contributor\n\nThis commit was authored by a contributor but committed by a maintainer.\n\nSigned-off-by: Linda Contributor ", + "username": "linda@external.com", "attributes": { "insertions": 3, "timezone": "UTC+01:00", @@ -125,17 +129,17 @@ }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Alice Developer", + "displayName": "Linda Contributor", "identities": [ { "platform": "git", - "value": "alice@example.com", + "value": "linda@external.com", "type": "username", "verified": true }, { "platform": "git", - "value": "alice@example.com", + "value": "linda@external.com", "type": "email", "verified": false } @@ -144,34 +148,35 @@ "segmentId": "test-segment-id" }, { - "type": "signed-off-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "2bcde409c58ed7a5620b3f1a8c18c0572067eb62", - "sourceParentId": "afd82faba62589717df9e3c32172b310b68f808b", + "type": "authored-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "4aebf3f1c07d7ce4413c6b0fc6d11293f9cd807c", + "sourceParentId": "", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Fix typo in config\n\nSigned-off-by: Ivan >\nReviewed-by: Jane <", + "body": "Quick fix for production issue", + "username": "kelly@example.com", "attributes": { "insertions": 1, "timezone": "UTC+01:00", - "deletions": 1, - "lines": 0, + "deletions": 0, + "lines": 1, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Ivan", + "displayName": "Kelly Hotfix", "identities": [ { "platform": "git", - "value": "ivan@example.com", + "value": "kelly@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "ivan@example.com", + "value": "kelly@example.com", "type": "email", "verified": false } @@ -180,34 +185,35 @@ "segmentId": "test-segment-id" }, { - "type": "committed-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "4619a3b615c79fe4be0de4a82150318898d8ea49", - "sourceParentId": "d38f1a8036c1b3177d94682f226a9fe6cb387240", + "type": "reviewed-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "710c60a55b0c4353ca1a84694227c61ed5fc34a8", + "sourceParentId": "982f1963705ca80dc1194cc085a7351745459457", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Merge pull request from external contributor\n\nThis commit was authored by a contributor but committed by a maintainer.\n\nSigned-off-by: Linda Contributor ", + "body": "Refactor authentication module\n\nRemoved old code and added new implementation.\n\nReviewed-by: Alice Developer ", + "username": "alice@example.com", "attributes": { - "insertions": 3, + "insertions": 8, "timezone": "UTC+01:00", - "deletions": 0, - "lines": 3, + "deletions": 2, + "lines": 6, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Mike Maintainer", + "displayName": "Alice Developer", "identities": [ { "platform": "git", - "value": "mike@example.com", + "value": "alice@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "mike@example.com", + "value": "alice@example.com", "type": "email", "verified": false } @@ -217,33 +223,34 @@ }, { "type": "authored-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "4df11752fae718f054a20d22bec2078c1d1f08fa", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "8007a6c8648839cb4624521d986fb60945828f5d", "sourceParentId": "", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Refactor authentication module\n\nRemoved old code and added new implementation.\n\nReviewed-by: Alice Developer ", + "body": "Fix typo in config\n\nSigned-off-by: Ivan >\nReviewed-by: Jane <", + "username": "ivan@example.com", "attributes": { - "insertions": 8, + "insertions": 1, "timezone": "UTC+01:00", - "deletions": 2, - "lines": 6, + "deletions": 1, + "lines": 0, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Grace Refactorer", + "displayName": "Ivan Typo", "identities": [ { "platform": "git", - "value": "grace@example.com", + "value": "ivan@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "grace@example.com", + "value": "ivan@example.com", "type": "email", "verified": false } @@ -252,34 +259,35 @@ "segmentId": "test-segment-id" }, { - "type": "authored-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "4fec97a2520ab0aae9454ef0c060d0f1dfc6aa4d", - "sourceParentId": "", + "type": "reviewed-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "8881dc51913f1234107b34be2b176a34e06b18c5", + "sourceParentId": "98fc9496af492345dff6dd47895695986ced362c", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Add configuration file\n\nThis commit adds the main configuration file for the project.\n\nSigned-off-by: Bob Reviewer ", + "body": "Fix critical security vulnerability\n\nThis patch addresses a security issue in the authentication module.\n\nSigned-off-by: Charlie Developer \nReviewed-by: Alice Developer \nTested-by: David Tester ", + "username": "alice@example.com", "attributes": { - "insertions": 1, + "insertions": 3, "timezone": "UTC+01:00", "deletions": 0, - "lines": 1, + "lines": 3, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Bob Reviewer", + "displayName": "Alice Developer", "identities": [ { "platform": "git", - "value": "bob@example.com", + "value": "alice@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "bob@example.com", + "value": "alice@example.com", "type": "email", "verified": false } @@ -288,34 +296,35 @@ "segmentId": "test-segment-id" }, { - "type": "signed-off-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "5d4675a0cabb19b066b94873a1f4b8f7caf805e7", - "sourceParentId": "c73621865a578ddcc419f976382ef701efa33da4", + "type": "authored-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "8ae70441565d9ed1036b5b3546658b1cad914dcc", + "sourceParentId": "", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Fix critical security vulnerability\n\nThis patch addresses a security issue in the authentication module.\n\nSigned-off-by: Charlie Developer \nReviewed-by: Alice Developer \nTested-by: David Tester ", + "body": "Implement new feature together\n\nThis feature was developed in pair programming session.\n\nCo-authored-by: Eve Pair \nSigned-off-by: Frank Lead ", + "username": "frank@example.com", "attributes": { - "insertions": 3, + "insertions": 2, "timezone": "UTC+01:00", "deletions": 0, - "lines": 3, + "lines": 2, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Charlie Developer", + "displayName": "Frank Lead", "identities": [ { "platform": "git", - "value": "charlie@example.com", + "value": "frank@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "charlie@example.com", + "value": "frank@example.com", "type": "email", "verified": false } @@ -324,13 +333,14 @@ "segmentId": "test-segment-id" }, { - "type": "co-authored-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "60a829c7b43c060ee991a6a4601cd9e0ae7058ea", - "sourceParentId": "eacba0e12f931b0cc6b551f4680e41c024b02afe", + "type": "signed-off-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "95c22aca2d51f6c40859061c5df1a2aa1b513646", + "sourceParentId": "8ae70441565d9ed1036b5b3546658b1cad914dcc", "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Implement new feature together\n\nThis feature was developed in pair programming session.\n\nCo-authored-by: Eve Pair \nSigned-off-by: Frank Lead ", + "username": "frank@example.com", "attributes": { "insertions": 2, "timezone": "UTC+01:00", @@ -341,17 +351,17 @@ }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Eve Pair", + "displayName": "Frank Lead", "identities": [ { "platform": "git", - "value": "eve@example.com", + "value": "frank@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "eve@example.com", + "value": "frank@example.com", "type": "email", "verified": false } @@ -360,34 +370,35 @@ "segmentId": "test-segment-id" }, { - "type": "reviewed-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "97ea4072257c1e377a60803c810c7e3af5d051af", - "sourceParentId": "afd82faba62589717df9e3c32172b310b68f808b", + "type": "authored-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "982f1963705ca80dc1194cc085a7351745459457", + "sourceParentId": "", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Fix typo in config\n\nSigned-off-by: Ivan >\nReviewed-by: Jane <", + "body": "Refactor authentication module\n\nRemoved old code and added new implementation.\n\nReviewed-by: Alice Developer ", + "username": "grace@example.com", "attributes": { - "insertions": 1, + "insertions": 8, "timezone": "UTC+01:00", - "deletions": 1, - "lines": 0, + "deletions": 2, + "lines": 6, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Jane", + "displayName": "Grace Refactorer", "identities": [ { "platform": "git", - "value": "jane@example.com", + "value": "grace@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "jane@example.com", + "value": "grace@example.com", "type": "email", "verified": false } @@ -396,34 +407,35 @@ "segmentId": "test-segment-id" }, { - "type": "signed-off-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "9a5760b9f9e05e54acac8c29eb555bc51014a8a1", - "sourceParentId": "28c2ba317c4e202c25cd684a0c181f54c37d78a0", + "type": "authored-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "98fc9496af492345dff6dd47895695986ced362c", + "sourceParentId": "", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Update documentation\n\nUpdated the README with new instructions.\n\nReviewed-By: No Email Here\nSigned-off-by: Henry Docs ", + "body": "Fix critical security vulnerability\n\nThis patch addresses a security issue in the authentication module.\n\nSigned-off-by: Charlie Developer \nReviewed-by: Alice Developer \nTested-by: David Tester ", + "username": "charlie@example.com", "attributes": { - "insertions": 4, + "insertions": 3, "timezone": "UTC+01:00", - "deletions": 1, + "deletions": 0, "lines": 3, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Henry Docs", + "displayName": "Charlie Developer", "identities": [ { "platform": "git", - "value": "henry@example.com", + "value": "charlie@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "henry@example.com", + "value": "charlie@example.com", "type": "email", "verified": false } @@ -432,13 +444,14 @@ "segmentId": "test-segment-id" }, { - "type": "tested-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "9e230ef529e800d109a7374d8fd4182322508e3d", - "sourceParentId": "c73621865a578ddcc419f976382ef701efa33da4", + "type": "authored-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "af49c9408dea4a8567ed90f1dc355045fb55e516", + "sourceParentId": "", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Fix critical security vulnerability\n\nThis patch addresses a security issue in the authentication module.\n\nSigned-off-by: Charlie Developer \nReviewed-by: Alice Developer \nTested-by: David Tester ", + "body": "Initial commit - basic structure", + "username": "alice@example.com", "attributes": { "insertions": 3, "timezone": "UTC+01:00", @@ -449,17 +462,17 @@ }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "David Tester", + "displayName": "Alice Developer", "identities": [ { "platform": "git", - "value": "david@example.com", + "value": "alice@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "david@example.com", + "value": "alice@example.com", "type": "email", "verified": false } @@ -468,13 +481,14 @@ "segmentId": "test-segment-id" }, { - "type": "signed-off-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "a321f844e34e4c4b2bd6b2997681fcf2dd5f47f5", - "sourceParentId": "d38f1a8036c1b3177d94682f226a9fe6cb387240", + "type": "committed-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "ba71081e8d434642b001d6c806eef43351b23ee1", + "sourceParentId": "46e950cf179b11a1f6b6a8f69fb40ae67eb9be56", "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Merge pull request from external contributor\n\nThis commit was authored by a contributor but committed by a maintainer.\n\nSigned-off-by: Linda Contributor ", + "username": "mike@example.com", "attributes": { "insertions": 3, "timezone": "UTC+01:00", @@ -485,17 +499,17 @@ }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Linda Contributor", + "displayName": "Mike Maintainer", "identities": [ { "platform": "git", - "value": "linda@external.com", + "value": "mike@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "linda@external.com", + "value": "mike@example.com", "type": "email", "verified": false } @@ -505,33 +519,34 @@ }, { "type": "authored-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "afd82faba62589717df9e3c32172b310b68f808b", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "c59d9e2b5cd7b1f177f08f1aeaae891470426fca", "sourceParentId": "", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Fix typo in config\n\nSigned-off-by: Ivan >\nReviewed-by: Jane <", + "body": "Update documentation\n\nUpdated the README with new instructions.\n\nReviewed-By: No Email Here\nSigned-off-by: Henry Docs ", + "username": "henry@example.com", "attributes": { - "insertions": 1, + "insertions": 4, "timezone": "UTC+01:00", "deletions": 1, - "lines": 0, + "lines": 3, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Ivan Typo", + "displayName": "Henry Docs", "identities": [ { "platform": "git", - "value": "ivan@example.com", + "value": "henry@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "ivan@example.com", + "value": "henry@example.com", "type": "email", "verified": false } @@ -540,34 +555,35 @@ "segmentId": "test-segment-id" }, { - "type": "authored-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "c73621865a578ddcc419f976382ef701efa33da4", - "sourceParentId": "", + "type": "signed-off-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "c6f50bbdb46f82c43d943f6ddd827d53d0b37dd4", + "sourceParentId": "c59d9e2b5cd7b1f177f08f1aeaae891470426fca", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Fix critical security vulnerability\n\nThis patch addresses a security issue in the authentication module.\n\nSigned-off-by: Charlie Developer \nReviewed-by: Alice Developer \nTested-by: David Tester ", + "body": "Update documentation\n\nUpdated the README with new instructions.\n\nReviewed-By: No Email Here\nSigned-off-by: Henry Docs ", + "username": "henry@example.com", "attributes": { - "insertions": 3, + "insertions": 4, "timezone": "UTC+01:00", - "deletions": 0, + "deletions": 1, "lines": 3, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Charlie Developer", + "displayName": "Henry Docs", "identities": [ { "platform": "git", - "value": "charlie@example.com", + "value": "henry@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "charlie@example.com", + "value": "henry@example.com", "type": "email", "verified": false } @@ -576,34 +592,35 @@ "segmentId": "test-segment-id" }, { - "type": "authored-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "d38f1a8036c1b3177d94682f226a9fe6cb387240", - "sourceParentId": "", + "type": "co-authored-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "ddc284210a8f6a1b6a004055ce0b2617bf4c58ec", + "sourceParentId": "8ae70441565d9ed1036b5b3546658b1cad914dcc", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Merge pull request from external contributor\n\nThis commit was authored by a contributor but committed by a maintainer.\n\nSigned-off-by: Linda Contributor ", + "body": "Implement new feature together\n\nThis feature was developed in pair programming session.\n\nCo-authored-by: Eve Pair \nSigned-off-by: Frank Lead ", + "username": "eve@example.com", "attributes": { - "insertions": 3, + "insertions": 2, "timezone": "UTC+01:00", "deletions": 0, - "lines": 3, + "lines": 2, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Linda Contributor", + "displayName": "Eve Pair", "identities": [ { "platform": "git", - "value": "linda@external.com", + "value": "eve@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "linda@external.com", + "value": "eve@example.com", "type": "email", "verified": false } @@ -612,34 +629,35 @@ "segmentId": "test-segment-id" }, { - "type": "signed-off-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "e52dc2b2ec76720ce7c8233613d110da5378b014", - "sourceParentId": "4fec97a2520ab0aae9454ef0c060d0f1dfc6aa4d", + "type": "tested-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "eaaae913674aee329a813b3888b80593d109846e", + "sourceParentId": "98fc9496af492345dff6dd47895695986ced362c", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Add configuration file\n\nThis commit adds the main configuration file for the project.\n\nSigned-off-by: Bob Reviewer ", + "body": "Fix critical security vulnerability\n\nThis patch addresses a security issue in the authentication module.\n\nSigned-off-by: Charlie Developer \nReviewed-by: Alice Developer \nTested-by: David Tester ", + "username": "david@example.com", "attributes": { - "insertions": 1, + "insertions": 3, "timezone": "UTC+01:00", "deletions": 0, - "lines": 1, + "lines": 3, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Bob Reviewer", + "displayName": "David Tester", "identities": [ { "platform": "git", - "value": "bob@example.com", + "value": "david@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "bob@example.com", + "value": "david@example.com", "type": "email", "verified": false } @@ -649,33 +667,34 @@ }, { "type": "signed-off-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "ea096a751ec8751d06c82a861a2f769a105bb762", - "sourceParentId": "eacba0e12f931b0cc6b551f4680e41c024b02afe", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "f00370b20a58a4369e8f8be1dcb26e10a3c238f4", + "sourceParentId": "46e950cf179b11a1f6b6a8f69fb40ae67eb9be56", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Implement new feature together\n\nThis feature was developed in pair programming session.\n\nCo-authored-by: Eve Pair \nSigned-off-by: Frank Lead ", + "body": "Merge pull request from external contributor\n\nThis commit was authored by a contributor but committed by a maintainer.\n\nSigned-off-by: Linda Contributor ", + "username": "linda@external.com", "attributes": { - "insertions": 2, + "insertions": 3, "timezone": "UTC+01:00", "deletions": 0, - "lines": 2, + "lines": 3, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Frank Lead", + "displayName": "Linda Contributor", "identities": [ { "platform": "git", - "value": "frank@example.com", + "value": "linda@external.com", "type": "username", "verified": true }, { "platform": "git", - "value": "frank@example.com", + "value": "linda@external.com", "type": "email", "verified": false } @@ -684,34 +703,35 @@ "segmentId": "test-segment-id" }, { - "type": "authored-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "eacba0e12f931b0cc6b551f4680e41c024b02afe", - "sourceParentId": "", + "type": "signed-off-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "f560cbfab0c80a432b3928c7dafb20dcf44378ca", + "sourceParentId": "98fc9496af492345dff6dd47895695986ced362c", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Implement new feature together\n\nThis feature was developed in pair programming session.\n\nCo-authored-by: Eve Pair \nSigned-off-by: Frank Lead ", + "body": "Fix critical security vulnerability\n\nThis patch addresses a security issue in the authentication module.\n\nSigned-off-by: Charlie Developer \nReviewed-by: Alice Developer \nTested-by: David Tester ", + "username": "charlie@example.com", "attributes": { - "insertions": 2, + "insertions": 3, "timezone": "UTC+01:00", "deletions": 0, - "lines": 2, + "lines": 3, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Frank Lead", + "displayName": "Charlie Developer", "identities": [ { "platform": "git", - "value": "frank@example.com", + "value": "charlie@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "frank@example.com", + "value": "charlie@example.com", "type": "email", "verified": false } @@ -720,34 +740,35 @@ "segmentId": "test-segment-id" }, { - "type": "reviewed-commit", - "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "eb1fb44cc5fb7cdaf526ef9268956b7158936887", - "sourceParentId": "c73621865a578ddcc419f976382ef701efa33da4", + "type": "signed-off-commit", + "timestamp": "2026-03-27T17:48:52+01:00", + "sourceId": "f69cd3c5788ae63e1940d0e3c85fddb4365975aa", + "sourceParentId": "00bc152e7840d47fa31cdd89f9d3f53d3e12621a", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Fix critical security vulnerability\n\nThis patch addresses a security issue in the authentication module.\n\nSigned-off-by: Charlie Developer \nReviewed-by: Alice Developer \nTested-by: David Tester ", + "body": "Add configuration file\n\nThis commit adds the main configuration file for the project.\n\nSigned-off-by: Bob Reviewer ", + "username": "bob@example.com", "attributes": { - "insertions": 3, + "insertions": 1, "timezone": "UTC+01:00", "deletions": 0, - "lines": 3, + "lines": 1, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Alice Developer", + "displayName": "Bob Reviewer", "identities": [ { "platform": "git", - "value": "alice@example.com", + "value": "bob@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "alice@example.com", + "value": "bob@example.com", "type": "email", "verified": false } @@ -755,4 +776,4 @@ }, "segmentId": "test-segment-id" } -] +] \ No newline at end of file diff --git a/services/apps/git_integration/src/test/test_activity_extraction.py b/services/apps/git_integration/src/test/test_activity_extraction.py index a960f0f21a..82dbb26aaf 100644 --- a/services/apps/git_integration/src/test/test_activity_extraction.py +++ b/services/apps/git_integration/src/test/test_activity_extraction.py @@ -107,10 +107,7 @@ def batch_info(): remote="https://github.com/test/repo.git", is_first_batch=True, is_final_batch=True, - clone_with_batches=False, latest_commit_in_repo=None, # Will be set by service - prev_batch_edge_commit=None, - edge_commit=None, ) @@ -144,6 +141,9 @@ async def mock_save_execution(execution): """Mock service execution save.""" pass + async def mock_update_last_processed_commit(*args, **kwargs): + pass + with patch( "crowdgit.services.commit.commit_service.batch_insert_activities", mock_batch_insert ): @@ -151,10 +151,14 @@ async def mock_save_execution(execution): "crowdgit.services.commit.commit_service.save_service_execution", mock_save_execution, ): - # Process commits - await commit_service.process_single_batch_commits( - repository=test_repository, batch_info=batch_info - ) + with patch( + "crowdgit.services.commit.commit_service.update_last_processed_commit", + mock_update_last_processed_commit, + ): + # Process commits + await commit_service.process_single_batch_commits( + repository=test_repository, batch_info=batch_info + ) # Verify activities were extracted assert len(captured_activities_db) > 0, "No activities were extracted" @@ -269,6 +273,9 @@ async def mock_batch_insert(activities): async def mock_save_execution(execution): pass + async def mock_update_last_processed_commit(*args, **kwargs): + pass + with patch( "crowdgit.services.commit.commit_service.batch_insert_activities", mock_batch_insert ): @@ -276,9 +283,13 @@ async def mock_save_execution(execution): "crowdgit.services.commit.commit_service.save_service_execution", mock_save_execution, ): - await commit_service.process_single_batch_commits( - repository=test_repository, batch_info=batch_info - ) + with patch( + "crowdgit.services.commit.commit_service.update_last_processed_commit", + mock_update_last_processed_commit, + ): + await commit_service.process_single_batch_commits( + repository=test_repository, batch_info=batch_info + ) # Extract activity types from DB records # DB format: (result_id, state, json_data, tenant_id, integration_id) From ddad3d43f9ac21e84757e77971ed8031713db9db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Mon, 18 May 2026 12:13:01 +0200 Subject: [PATCH 2/7] fix: comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../src/crowdgit/database/crud.py | 23 ++++++- .../git_integration/src/crowdgit/errors.py | 2 +- .../crowdgit/services/clone/clone_service.py | 66 +++++++++++++------ .../services/commit/commit_service.py | 10 +-- .../src/crowdgit/services/utils.py | 4 +- .../git_integration/src/crowdgit/settings.py | 2 +- .../src/crowdgit/worker/repository_worker.py | 9 ++- 7 files changed, 83 insertions(+), 33 deletions(-) diff --git a/services/apps/git_integration/src/crowdgit/database/crud.py b/services/apps/git_integration/src/crowdgit/database/crud.py index 4c35f4039a..8508e41160 100644 --- a/services/apps/git_integration/src/crowdgit/database/crud.py +++ b/services/apps/git_integration/src/crowdgit/database/crud.py @@ -95,7 +95,12 @@ async def acquire_onboarding_repo() -> Repository | None: """ return await acquire_repository( onboarding_repo_sql_query, - (RepositoryState.PROCESSING, RepositoryState.PENDING, MAX_CONCURRENT_ONBOARDINGS, STUCK_REPO_TIMEOUT_HOURS), + ( + RepositoryState.PROCESSING, + RepositoryState.PENDING, + MAX_CONCURRENT_ONBOARDINGS, + STUCK_REPO_TIMEOUT_HOURS, + ), ) @@ -174,7 +179,13 @@ async def acquire_recurrent_repo() -> Repository | None: ) return await acquire_repository( recurrent_repo_sql_query, - (RepositoryState.PROCESSING, states_to_exclude, REPOSITORY_UPDATE_INTERVAL_HOURS, STUCK_REPO_TIMEOUT_HOURS, FAILED_RETRY_INTERVAL_HOURS), + ( + RepositoryState.PROCESSING, + states_to_exclude, + REPOSITORY_UPDATE_INTERVAL_HOURS, + STUCK_REPO_TIMEOUT_HOURS, + FAILED_RETRY_INTERVAL_HOURS, + ), ) @@ -194,7 +205,13 @@ async def can_onboard_more(): async def acquire_pending_reonboard_repo() -> Repository | None: - """Acquire a pending_reonboard repo for re-onboarding (only called on weekends).""" + """Acquire a pending_reonboard repo for re-onboarding (only called on weekends). + + PENDING_REONBOARD is no longer produced automatically (ReOnboardingRequiredError was removed + in CM-1185). This function stays to drain any legacy rows that pre-date the change, and to + allow the CM-1186 backfill script to set state='pending_reonboard' for repos that need a + full re-ingest on the weekend. + """ pending_reonboard_sql_query = f""" WITH selected_repo AS ( SELECT r.id diff --git a/services/apps/git_integration/src/crowdgit/errors.py b/services/apps/git_integration/src/crowdgit/errors.py index 0dd85185c7..6606cdafba 100644 --- a/services/apps/git_integration/src/crowdgit/errors.py +++ b/services/apps/git_integration/src/crowdgit/errors.py @@ -47,7 +47,7 @@ class NetworkError(CrowdGitError): @dataclass -class PermissionError(CrowdGitError): +class RepoPermissionError(CrowdGitError): error_message: str = "Permission denied" error_code: ErrorCode = ErrorCode.PERMISSION_ERROR diff --git a/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py b/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py index 52ec411de4..24377dfeed 100644 --- a/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py +++ b/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py @@ -1,3 +1,4 @@ +import fcntl import os import shutil import tempfile @@ -294,25 +295,44 @@ async def clone_batches_generator( repo_path = stable_path os.makedirs(repo_path, exist_ok=True) - if os.path.isdir(os.path.join(repo_path, ".git")): - if not await self._is_repo_valid(repo_path): - self.logger.warning( - f"Repo at {repo_path} is in an invalid state, wiping and re-cloning" - ) - await self._wipe_and_reclone(repo_path, remote) - else: - default_branch_changed = await self.has_default_branch_changed( - remote, repository.branch - ) - if default_branch_changed: + # OS-level flock guards against the DB lock expiry window: if a pod crashes and + # another acquires the DB lock after STUCK_REPO_TIMEOUT_HOURS, this flock ensures + # the new pod does not mutate the directory while the crashed pod's git subprocess + # is still running. Non-blocking: raises OSError(EWOULDBLOCK) if held elsewhere. + flock_path = os.path.join(repo_path, ".crowdgit.lock") + flock_fd = open(flock_path, "w") + try: + fcntl.flock(flock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + except OSError: + flock_fd.close() + raise CommandExecutionError( + f"OS flock busy for {repo_path} — another process still holds the repo", + returncode=1, + ) from None + + try: + if os.path.isdir(os.path.join(repo_path, ".git")): + if not await self._is_repo_valid(repo_path): + self.logger.warning( + f"Repo at {repo_path} is in an invalid state, wiping and re-cloning" + ) await self._wipe_and_reclone(repo_path, remote) else: - await self._incremental_fetch(repo_path, remote) - else: - # Defensive: wipe in case a prior crash left checkout artifacts with no .git. - shutil.rmtree(repo_path, ignore_errors=True) - os.makedirs(repo_path) - await self._perform_full_clone(repo_path, remote) + default_branch_changed = await self.has_default_branch_changed( + remote, repository.branch + ) + if default_branch_changed: + await self._wipe_and_reclone(repo_path, remote) + else: + await self._incremental_fetch(repo_path, remote) + else: + # Defensive: wipe in case a prior crash left checkout artifacts with no .git. + shutil.rmtree(repo_path, ignore_errors=True) + os.makedirs(repo_path) + await self._perform_full_clone(repo_path, remote) + finally: + fcntl.flock(flock_fd, fcntl.LOCK_UN) + flock_fd.close() else: # Ephemeral path — legacy behaviour for local dev / Docker Compose. repo_path = tempfile.mkdtemp(prefix=f"{get_repo_name(remote)}_") @@ -333,10 +353,18 @@ async def clone_batches_generator( # Wipe persistent clone only on non-transient failures that indicate local git # corruption. Transient/remote errors (network, rate-limit, disk-full, timeout) # leave the local clone intact — no point re-cloning 6+ GB because GitHub was down. - _TRANSIENT_ERRORS = (RateLimitError, NetworkError, RemoteServerError, DiskSpaceError, CommandTimeoutError) + _TRANSIENT_ERRORS = ( + RateLimitError, + NetworkError, + RemoteServerError, + DiskSpaceError, + CommandTimeoutError, + ) if stable_path and os.path.isdir(os.path.join(stable_path, ".git")): if not isinstance(e, _TRANSIENT_ERRORS): - self.logger.warning(f"Wiping persistent clone at {stable_path} after non-transient failure") + self.logger.warning( + f"Wiping persistent clone at {stable_path} after non-transient failure" + ) shutil.rmtree(stable_path, ignore_errors=True) raise finally: diff --git a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py index f1c5890036..7364e8a304 100644 --- a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py +++ b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py @@ -10,6 +10,7 @@ import orjson from pydantic import validate_email +from tenacity import retry, stop_after_attempt, wait_fixed from crowdgit.database.crud import ( batch_check_parent_activities, @@ -204,6 +205,7 @@ async def _get_commit_reference(self, repo_path: str) -> str: return "HEAD" return f"origin/{default_branch}" + @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), reraise=True) async def _get_commit_hashes(self, repo_path: str, commit_range: str) -> list[str]: """Stream commit hashes oldest-first via rev-list. No buffering in git subprocess.""" output = await run_shell_command( @@ -211,11 +213,13 @@ async def _get_commit_hashes(self, repo_path: str, commit_range: str) -> list[st ) return [h for h in output.strip().splitlines() if h.strip()] + @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), reraise=True) async def _fetch_commits_batch(self, repo_path: str, hashes: list[str]) -> str: """Fetch full commit data for exactly the given hashes (no parent traversal).""" cmd = [ "git", - "-c", "core.abbrevCommit=false", + "-c", + "core.abbrevCommit=false", "-C", repo_path, "log", @@ -776,9 +780,7 @@ async def process_single_chunk(chunk_start_idx: int, chunk_end_idx: int): raise running_tasks = [ - asyncio.create_task( - process_single_chunk(i, min(i + chunk_size, len(commit_texts))) - ) + asyncio.create_task(process_single_chunk(i, min(i + chunk_size, len(commit_texts)))) for i in range(0, len(commit_texts), chunk_size) ] diff --git a/services/apps/git_integration/src/crowdgit/services/utils.py b/services/apps/git_integration/src/crowdgit/services/utils.py index 633972c138..502d408aed 100644 --- a/services/apps/git_integration/src/crowdgit/services/utils.py +++ b/services/apps/git_integration/src/crowdgit/services/utils.py @@ -9,10 +9,10 @@ EmptyRepoError, ForbiddenError, NetworkError, - PermissionError, RateLimitError, RemoteServerError, RepoAuthRequiredError, + RepoPermissionError, ValidationError, ) from crowdgit.logger import logger @@ -175,7 +175,7 @@ async def get_default_branch(repo_path: str) -> str: # (stderr_patterns, exception_class) ({"No space left on device"}, DiskSpaceError), ({"Network is unreachable", "Connection refused", "Connection timed out"}, NetworkError), - ({"Permission denied"}, PermissionError), + ({"Permission denied"}, RepoPermissionError), ({"The requested URL returned error: 403"}, ForbiddenError), ({"The requested URL returned error: 429"}, RateLimitError), ({"The requested URL returned error: 5"}, RemoteServerError), diff --git a/services/apps/git_integration/src/crowdgit/settings.py b/services/apps/git_integration/src/crowdgit/settings.py index 9a81a8f493..b0e2138ef6 100644 --- a/services/apps/git_integration/src/crowdgit/settings.py +++ b/services/apps/git_integration/src/crowdgit/settings.py @@ -2,7 +2,7 @@ def load_env_var(key: str, required=True, default=None): - value = os.getenv(key) or default + value = os.getenv(key, default) if required and value is None: raise OSError(f"Missing required environment variable: {key}") return value diff --git a/services/apps/git_integration/src/crowdgit/worker/repository_worker.py b/services/apps/git_integration/src/crowdgit/worker/repository_worker.py index 1c26544e54..6fe4f83169 100644 --- a/services/apps/git_integration/src/crowdgit/worker/repository_worker.py +++ b/services/apps/git_integration/src/crowdgit/worker/repository_worker.py @@ -15,11 +15,9 @@ ForbiddenError, InternalError, ParentRepoInvalidError, - PermissionError as RepoPermissionError, RepoAuthRequiredError, + RepoPermissionError, ) - -# Import configured loguru logger from crowdgit.logger from crowdgit.logger import logger from crowdgit.models import Repository from crowdgit.services import ( @@ -99,6 +97,11 @@ async def shutdown(self): async def _heartbeat_loop(self, repo_id: str, stop_event: asyncio.Event) -> None: """Periodically refresh lockedAt so the repo is not reclaimed as a stale lock.""" + # Refresh immediately so the first tick is not delayed by LOCK_HEARTBEAT_INTERVAL_SEC. + try: + await update_lock_heartbeat(repo_id) + except Exception as e: + logger.warning(f"Heartbeat update failed for repo {repo_id}: {e}") while not stop_event.is_set(): await asyncio.sleep(LOCK_HEARTBEAT_INTERVAL_SEC) if not stop_event.is_set(): From fb6bed1089ca7616bd8f0d9175afacca7348c227 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Mon, 18 May 2026 14:51:36 +0200 Subject: [PATCH 3/7] fix: comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../src/crowdgit/models/clone_batch.py | 4 +++ .../crowdgit/services/clone/clone_service.py | 22 +++++++++---- .../services/commit/commit_service.py | 11 ++++++- .../git_integration/src/crowdgit/settings.py | 2 +- .../src/crowdgit/worker/repository_worker.py | 31 ++++++++++++++----- 5 files changed, 54 insertions(+), 16 deletions(-) diff --git a/services/apps/git_integration/src/crowdgit/models/clone_batch.py b/services/apps/git_integration/src/crowdgit/models/clone_batch.py index 3a3e337a01..7cf5ca0494 100644 --- a/services/apps/git_integration/src/crowdgit/models/clone_batch.py +++ b/services/apps/git_integration/src/crowdgit/models/clone_batch.py @@ -11,6 +11,10 @@ class CloneBatchInfo(BaseModel): latest_commit_in_repo: str | None = Field( None, description="Hash of the latest commit in repo" ) + branch_changed: bool = Field( + default=False, + description="True when default branch changed and clone was wiped — last_processed_commit is stale and must be ignored", + ) class Config: """Pydantic configuration""" diff --git a/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py b/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py index 24377dfeed..4a9ae07004 100644 --- a/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py +++ b/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py @@ -24,6 +24,7 @@ NetworkError, RateLimitError, RemoteServerError, + RepoLockingError, ) from crowdgit.models import CloneBatchInfo, Repository, ServiceExecution from crowdgit.services.base.base_service import BaseService @@ -299,15 +300,22 @@ async def clone_batches_generator( # another acquires the DB lock after STUCK_REPO_TIMEOUT_HOURS, this flock ensures # the new pod does not mutate the directory while the crashed pod's git subprocess # is still running. Non-blocking: raises OSError(EWOULDBLOCK) if held elsewhere. - flock_path = os.path.join(repo_path, ".crowdgit.lock") - flock_fd = open(flock_path, "w") + # + # Lock file lives OUTSIDE repo_path so _wipe_and_reclone (rmtree + makedirs) does + # not unlink the inode that the kernel's flock is attached to — if it did, a second + # opener would get a fresh inode and a fresh lock, silently bypassing exclusion. + # "a" mode avoids truncating the file on concurrent open() calls (harmless, but + # cleaner than "w"). + flock_path = os.path.join(REPO_STORAGE_ROOT, f"{repository.id}.lock") + flock_fd = open(flock_path, "a") try: fcntl.flock(flock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) except OSError: flock_fd.close() - raise CommandExecutionError( - f"OS flock busy for {repo_path} — another process still holds the repo", - returncode=1, + # RepoLockingError is in _TRANSIENT_ERRORS so the outer except will NOT wipe + # the persistent clone that the other process is actively using. + raise RepoLockingError( + f"OS flock busy for {repo_path} — another process still holds the repo" ) from None try: @@ -323,12 +331,13 @@ async def clone_batches_generator( ) if default_branch_changed: await self._wipe_and_reclone(repo_path, remote) + batch_info.branch_changed = True else: await self._incremental_fetch(repo_path, remote) else: # Defensive: wipe in case a prior crash left checkout artifacts with no .git. shutil.rmtree(repo_path, ignore_errors=True) - os.makedirs(repo_path) + os.makedirs(repo_path, exist_ok=True) await self._perform_full_clone(repo_path, remote) finally: fcntl.flock(flock_fd, fcntl.LOCK_UN) @@ -359,6 +368,7 @@ async def clone_batches_generator( RemoteServerError, DiskSpaceError, CommandTimeoutError, + RepoLockingError, # flock busy: another pod holds repo, nothing was touched locally ) if stable_path and os.path.isdir(os.path.join(stable_path, ".git")): if not isinstance(e, _TRANSIENT_ERRORS): diff --git a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py index 7364e8a304..7c02b6061a 100644 --- a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py +++ b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py @@ -241,10 +241,14 @@ async def _process_with_hash_streaming( """ repo_path = batch_info.repo_path commit_reference = await self._get_commit_reference(repo_path) - if repository.last_processed_commit: + if repository.last_processed_commit and not batch_info.branch_changed: commit_range = f"{repository.last_processed_commit}..{commit_reference}" self.logger.info(f"Processing incremental range: {commit_range}") else: + if batch_info.branch_changed: + self.logger.info( + f"Branch changed for {batch_info.remote} — ignoring stale last_processed_commit, processing full history" + ) commit_range = commit_reference self.logger.info(f"Processing full history in {commit_reference}") @@ -263,6 +267,11 @@ async def _process_with_hash_streaming( await self._process_activities_from_commits(raw_commits, batch_info, repository) # Checkpoint at last hash in batch (oldest-first from rev-list = newest in batch). # Next run resumes from batch_hashes[-1]..HEAD. + # Note: in merge-heavy repos batch_hashes[-1] may not be an ancestor of every earlier + # hash in the same batch (rev-list topological order interleaves parallel branches). + # On crash-resume, commits that aren't reachable from batch_hashes[-1] are re-fetched + # and re-processed — duplicates are prevented by batch_check_parent_activities which + # deduplicates on commit hash before insert. No commits are skipped. await update_last_processed_commit( repo_id=repository.id, commit_hash=batch_hashes[-1], diff --git a/services/apps/git_integration/src/crowdgit/settings.py b/services/apps/git_integration/src/crowdgit/settings.py index b0e2138ef6..e72794fb6a 100644 --- a/services/apps/git_integration/src/crowdgit/settings.py +++ b/services/apps/git_integration/src/crowdgit/settings.py @@ -39,6 +39,6 @@ def load_env_var(key: str, required=True, default=None): MAX_CONCURRENT_ONBOARDINGS = int(load_env_var("MAX_CONCURRENT_ONBOARDINGS", default="3")) MAX_INTEGRATION_RESULTS = int(load_env_var("MAX_INTEGRATION_RESULTS", default="5000000")) LOCK_HEARTBEAT_INTERVAL_SEC = int(load_env_var("LOCK_HEARTBEAT_INTERVAL_SEC", default="300")) -STUCK_REPO_TIMEOUT_HOURS = int(load_env_var("STUCK_REPO_TIMEOUT_HOURS", default="1")) +STUCK_REPO_TIMEOUT_HOURS = int(load_env_var("STUCK_REPO_TIMEOUT_HOURS", default="4")) FAILED_RETRY_INTERVAL_HOURS = int(load_env_var("FAILED_RETRY_INTERVAL_HOURS", default="1")) REPO_STORAGE_ROOT = load_env_var("REPO_STORAGE_ROOT", required=False, default=None) diff --git a/services/apps/git_integration/src/crowdgit/worker/repository_worker.py b/services/apps/git_integration/src/crowdgit/worker/repository_worker.py index 6fe4f83169..302391fbdd 100644 --- a/services/apps/git_integration/src/crowdgit/worker/repository_worker.py +++ b/services/apps/git_integration/src/crowdgit/worker/repository_worker.py @@ -32,6 +32,7 @@ from crowdgit.services.utils import get_default_branch, get_repo_name from crowdgit.settings import ( LOCK_HEARTBEAT_INTERVAL_SEC, + STUCK_REPO_TIMEOUT_HOURS, WORKER_ERROR_BACKOFF_SEC, WORKER_POLLING_INTERVAL_SEC, ) @@ -97,18 +98,32 @@ async def shutdown(self): async def _heartbeat_loop(self, repo_id: str, stop_event: asyncio.Event) -> None: """Periodically refresh lockedAt so the repo is not reclaimed as a stale lock.""" + consecutive_failures = 0 + max_failures_before_alarm = max( + 1, (STUCK_REPO_TIMEOUT_HOURS * 3600) // (LOCK_HEARTBEAT_INTERVAL_SEC * 2) + ) + + async def _beat() -> None: + nonlocal consecutive_failures + try: + await update_lock_heartbeat(repo_id) + consecutive_failures = 0 + except Exception as e: + consecutive_failures += 1 + if consecutive_failures >= max_failures_before_alarm: + logger.error( + f"Heartbeat failed {consecutive_failures}x for repo {repo_id} — " + f"lock may expire soon (timeout={STUCK_REPO_TIMEOUT_HOURS}h): {e}" + ) + else: + logger.warning(f"Heartbeat update failed for repo {repo_id}: {e}") + # Refresh immediately so the first tick is not delayed by LOCK_HEARTBEAT_INTERVAL_SEC. - try: - await update_lock_heartbeat(repo_id) - except Exception as e: - logger.warning(f"Heartbeat update failed for repo {repo_id}: {e}") + await _beat() while not stop_event.is_set(): await asyncio.sleep(LOCK_HEARTBEAT_INTERVAL_SEC) if not stop_event.is_set(): - try: - await update_lock_heartbeat(repo_id) - except Exception as e: - logger.warning(f"Heartbeat update failed for repo {repo_id}: {e}") + await _beat() async def _process_repositories(self): """ From 5f5d69661d010fd64e03e94987033106c6f9c9b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Mon, 18 May 2026 15:07:30 +0200 Subject: [PATCH 4/7] fix: comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../crowdgit/services/clone/clone_service.py | 40 +++++++++++-------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py b/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py index 4a9ae07004..70a041b74a 100644 --- a/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py +++ b/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py @@ -325,6 +325,7 @@ async def clone_batches_generator( f"Repo at {repo_path} is in an invalid state, wiping and re-cloning" ) await self._wipe_and_reclone(repo_path, remote) + batch_info.branch_changed = True else: default_branch_changed = await self.has_default_branch_changed( remote, repository.branch @@ -339,6 +340,26 @@ async def clone_batches_generator( shutil.rmtree(repo_path, ignore_errors=True) os.makedirs(repo_path, exist_ok=True) await self._perform_full_clone(repo_path, remote) + except Exception as inner_exc: + # Wipe while flock is still held so no other pod can grab the flock and start + # operating on a directory we are about to delete. Releasing the flock first + # (in finally below) and wiping in the outer except creates a race window. + _TRANSIENT = ( + RateLimitError, + NetworkError, + RemoteServerError, + DiskSpaceError, + CommandTimeoutError, + RepoLockingError, + ) + if not isinstance(inner_exc, _TRANSIENT) and os.path.isdir( + os.path.join(stable_path, ".git") + ): + self.logger.warning( + f"Wiping persistent clone at {stable_path} after non-transient failure" + ) + shutil.rmtree(stable_path, ignore_errors=True) + raise finally: fcntl.flock(flock_fd, fcntl.LOCK_UN) flock_fd.close() @@ -359,23 +380,8 @@ async def clone_batches_generator( e.error_code.value if isinstance(e, CrowdGitError) else ErrorCode.UNKNOWN.value ) self.logger.error(f"Cloning failed: {error_message}") - # Wipe persistent clone only on non-transient failures that indicate local git - # corruption. Transient/remote errors (network, rate-limit, disk-full, timeout) - # leave the local clone intact — no point re-cloning 6+ GB because GitHub was down. - _TRANSIENT_ERRORS = ( - RateLimitError, - NetworkError, - RemoteServerError, - DiskSpaceError, - CommandTimeoutError, - RepoLockingError, # flock busy: another pod holds repo, nothing was touched locally - ) - if stable_path and os.path.isdir(os.path.join(stable_path, ".git")): - if not isinstance(e, _TRANSIENT_ERRORS): - self.logger.warning( - f"Wiping persistent clone at {stable_path} after non-transient failure" - ) - shutil.rmtree(stable_path, ignore_errors=True) + # Wipe-on-failure is handled inside the flock-held inner except block above, + # so the rmtree always happens before the flock is released (no race window). raise finally: # Only clean up ephemeral temp dirs. Persistent clones are intentionally kept. From f667bce896ad814fafa1486f7aba8c11cb4cc6d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Mon, 18 May 2026 20:28:36 +0200 Subject: [PATCH 5/7] fix: comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../crowdgit/services/clone/clone_service.py | 35 +++++++++++++------ .../services/commit/commit_service.py | 24 ++++++++++--- .../src/crowdgit/worker/repository_worker.py | 4 ++- 3 files changed, 46 insertions(+), 17 deletions(-) diff --git a/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py b/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py index 70a041b74a..4314790833 100644 --- a/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py +++ b/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py @@ -191,16 +191,21 @@ async def _is_repo_valid(self, repo_path: str) -> bool: async def _incremental_fetch(self, repo_path: str, remote: str) -> None: """Fetch latest commits into a persistent clone and update the working tree.""" # Remove stale git lockfiles left by a previous crash before touching the repo. + # Target known locations only — os.walk(.git) on large repos (e.g. linux kernel) blocks + # the event loop for seconds scanning hundreds of thousands of pack/ref files. git_dir = os.path.join(repo_path, ".git") - for dirpath, _, filenames in os.walk(git_dir): - for filename in filenames: - if filename.endswith(".lock"): - lockpath = os.path.join(dirpath, filename) - try: - os.remove(lockpath) - self.logger.warning(f"Removed stale lockfile: {lockpath}") - except OSError as e: - self.logger.warning(f"Failed to remove stale lockfile {lockpath}: {e}") + _KNOWN_LOCKFILES = [ + os.path.join(git_dir, "index.lock"), + os.path.join(git_dir, "HEAD.lock"), + os.path.join(git_dir, "packed-refs.lock"), + ] + for lockpath in _KNOWN_LOCKFILES: + if os.path.exists(lockpath): + try: + os.remove(lockpath) + self.logger.warning(f"Removed stale lockfile: {lockpath}") + except OSError as e: + self.logger.warning(f"Failed to remove stale lockfile {lockpath}: {e}") default_branch = await get_default_branch(repo_path) if default_branch == "*": @@ -224,7 +229,13 @@ async def _wipe_and_reclone(self, repo_path: str, remote: str) -> None: self.logger.info(f"Wiping and re-cloning {remote}") shutil.rmtree(repo_path) os.makedirs(repo_path) - await self._perform_full_clone(repo_path, remote) + try: + await self._perform_full_clone(repo_path, remote) + except Exception: + # Clone failed — wipe the partial .git so the next run starts fresh rather than + # attempting an incremental fetch against a half-written clone. + shutil.rmtree(repo_path, ignore_errors=True) + raise async def has_default_branch_changed(self, remote: str, saved_branch: str | None) -> bool: """Check if the default branch has changed compared to the saved branch @@ -348,9 +359,11 @@ async def clone_batches_generator( RateLimitError, NetworkError, RemoteServerError, - DiskSpaceError, CommandTimeoutError, RepoLockingError, + # DiskSpaceError intentionally excluded: a full-disk clone leaves a + # partial .git that _is_repo_valid may accept on the next run, causing + # _incremental_fetch to run against a corrupt local clone. Wipe instead. ) if not isinstance(inner_exc, _TRANSIENT) and os.path.isdir( os.path.join(stable_path, ".git") diff --git a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py index 7c02b6061a..53e40a931f 100644 --- a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py +++ b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py @@ -207,7 +207,7 @@ async def _get_commit_reference(self, repo_path: str) -> str: @retry(stop=stop_after_attempt(5), wait=wait_fixed(1), reraise=True) async def _get_commit_hashes(self, repo_path: str, commit_range: str) -> list[str]: - """Stream commit hashes oldest-first via rev-list. No buffering in git subprocess.""" + """Return commit hashes oldest-first via rev-list. Output is buffered in-process (~41 bytes × N commits).""" output = await run_shell_command( ["git", "-C", repo_path, "rev-list", "--reverse", commit_range] ) @@ -242,8 +242,20 @@ async def _process_with_hash_streaming( repo_path = batch_info.repo_path commit_reference = await self._get_commit_reference(repo_path) if repository.last_processed_commit and not batch_info.branch_changed: - commit_range = f"{repository.last_processed_commit}..{commit_reference}" - self.logger.info(f"Processing incremental range: {commit_range}") + # Verify the commit is reachable in the current clone before using it as a range + # boundary. In the ephemeral path (no REPO_STORAGE_ROOT) a branch change is not + # flagged via batch_info, so last_processed_commit may point to the old branch. + try: + await run_shell_command( + ["git", "-C", repo_path, "cat-file", "-e", f"{repository.last_processed_commit}^{{commit}}"] + ) + commit_range = f"{repository.last_processed_commit}..{commit_reference}" + self.logger.info(f"Processing incremental range: {commit_range}") + except Exception: + self.logger.warning( + f"last_processed_commit {repository.last_processed_commit} not reachable in current clone — processing full history" + ) + commit_range = commit_reference else: if batch_info.branch_changed: self.logger.info( @@ -270,8 +282,10 @@ async def _process_with_hash_streaming( # Note: in merge-heavy repos batch_hashes[-1] may not be an ancestor of every earlier # hash in the same batch (rev-list topological order interleaves parallel branches). # On crash-resume, commits that aren't reachable from batch_hashes[-1] are re-fetched - # and re-processed — duplicates are prevented by batch_check_parent_activities which - # deduplicates on commit hash before insert. No commits are skipped. + # and re-processed. Note: batch_check_parent_activities deduplication only runs for + # fork repos (when parent_repo is set). For non-fork repos, replayed activities are + # re-inserted; downstream consumers deduplicate on (sourceId, type, platform). + # No commits are skipped. await update_last_processed_commit( repo_id=repository.id, commit_hash=batch_hashes[-1], diff --git a/services/apps/git_integration/src/crowdgit/worker/repository_worker.py b/services/apps/git_integration/src/crowdgit/worker/repository_worker.py index 302391fbdd..bafdd2443d 100644 --- a/services/apps/git_integration/src/crowdgit/worker/repository_worker.py +++ b/services/apps/git_integration/src/crowdgit/worker/repository_worker.py @@ -118,7 +118,9 @@ async def _beat() -> None: else: logger.warning(f"Heartbeat update failed for repo {repo_id}: {e}") - # Refresh immediately so the first tick is not delayed by LOCK_HEARTBEAT_INTERVAL_SEC. + # Fire one beat before the loop so the first refresh happens as soon as the task is + # scheduled (the task may not run until the first await inside _process_single_repository, + # but this minimises the gap vs. waiting a full LOCK_HEARTBEAT_INTERVAL_SEC). await _beat() while not stop_event.is_set(): await asyncio.sleep(LOCK_HEARTBEAT_INTERVAL_SEC) From 42fd9f84939566b88cbb4eb7e15619fc0d48f07a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Mon, 18 May 2026 20:37:06 +0200 Subject: [PATCH 6/7] fix: comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../git_integration/src/crowdgit/services/clone/clone_service.py | 1 - 1 file changed, 1 deletion(-) diff --git a/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py b/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py index 4314790833..6a42d7bc7d 100644 --- a/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py +++ b/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py @@ -20,7 +20,6 @@ CommandExecutionError, CommandTimeoutError, CrowdGitError, - DiskSpaceError, NetworkError, RateLimitError, RemoteServerError, From 53d87a2f45ea856bec37d9138a086e18c8401090 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Mon, 18 May 2026 20:58:11 +0200 Subject: [PATCH 7/7] fix: comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../src/crowdgit/services/clone/clone_service.py | 2 +- .../src/crowdgit/services/commit/commit_service.py | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py b/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py index 6a42d7bc7d..126c68b048 100644 --- a/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py +++ b/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py @@ -173,7 +173,7 @@ async def _perform_full_clone(self, repo_path: str, remote: str): def _stable_repo_path(self, repo_id: str) -> str | None: """Returns a stable on-disk path for the repo when REPO_STORAGE_ROOT is set, else None.""" - if REPO_STORAGE_ROOT is None: + if not REPO_STORAGE_ROOT: return None return os.path.join(REPO_STORAGE_ROOT, str(repo_id)) diff --git a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py index 53e40a931f..b06678b8ab 100644 --- a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py +++ b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py @@ -247,7 +247,14 @@ async def _process_with_hash_streaming( # flagged via batch_info, so last_processed_commit may point to the old branch. try: await run_shell_command( - ["git", "-C", repo_path, "cat-file", "-e", f"{repository.last_processed_commit}^{{commit}}"] + [ + "git", + "-C", + repo_path, + "cat-file", + "-e", + f"{repository.last_processed_commit}^{{commit}}", + ] ) commit_range = f"{repository.last_processed_commit}..{commit_reference}" self.logger.info(f"Processing incremental range: {commit_range}")