From 0aae39fb56282e0414ddb808d1d6f43aa0711048 Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Wed, 20 May 2026 18:43:32 +0100 Subject: [PATCH 1/4] fix: clone all batches before processing commits to avoid broken range Signed-off-by: Mouad BANI --- .../crowdgit/services/clone/clone_service.py | 5 +- .../services/commit/commit_service.py | 78 +++---------------- .../src/crowdgit/worker/repository_worker.py | 9 +-- 3 files changed, 18 insertions(+), 74 deletions(-) diff --git a/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py b/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py index 430127b1e1..ebe277f1e4 100644 --- a/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py +++ b/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py @@ -249,7 +249,6 @@ async def _update_batch_info( return batch_info.is_final_batch = await self._check_if_final_batch(repo_path, target_commit_hash) - batch_info.edge_commit = await self._get_edge_commit(repo_path) async def _get_edge_commit(self, repo_path: str) -> str | None: """ @@ -468,7 +467,6 @@ async def clone_batches_generator( batch_info.is_first_batch = False while not batch_info.is_final_batch: batch_start_time = time.time() - batch_info.prev_batch_edge_commit = await self._get_edge_commit(temp_repo_path) await self._clone_next_batch(temp_repo_path, batch_depth, remote) await self._update_batch_info( batch_info, @@ -479,7 +477,6 @@ async def clone_batches_generator( batch_end_time = time.time() total_execution_time += round(batch_end_time - batch_start_time, 2) - yield batch_info if not batch_info.is_final_batch: # exponential deepen increment to speed up fetching batch_depth = min( @@ -487,6 +484,8 @@ async def clone_batches_generator( MAX_BATCH_DEEPEN, ) + yield batch_info + except Exception as e: # Handle both CrowdGitError and generic Exception execution_status = ExecutionStatus.FAILURE diff --git a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py index 013167d13d..718c288ac0 100644 --- a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py +++ b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py @@ -132,8 +132,6 @@ async def process_single_batch_commits( raw_commits = await self._execute_git_log( batch_info.repo_path, batch_info.clone_with_batches, - batch_info.prev_batch_edge_commit, - batch_info.edge_commit, repository.last_processed_commit, ) @@ -250,44 +248,6 @@ def _parse_numstats(self, raw_numstats: str) -> tuple[int, int]: return (insertions, deletions) - async def _get_optimized_commit_range( - self, - repo_path: str, - edge_commit: str, - prev_batch_edge_commit: str, - last_processed_commit: str | None, - ) -> str: - """ - Optimize commit range by using last_processed_commit if available in current batch. - - For middle batches, returns the slice of history between the last batch's edge and this one's. - If last_processed_commit exists in the current batch and is not the shallow boundary, - uses it as the range start to skip already-processed commits. - - Args: - repo_path: Local repository path - edge_commit: Current batch's edge commit (shallow boundary) - prev_batch_edge_commit: Previous batch's edge commit (upper bound of range) - last_processed_commit: Last commit that was successfully processed (if any) - - Returns: - Git commit range string (e.g., "commit_a..commit_b") - """ - - default_commit_range = f"{edge_commit}..{prev_batch_edge_commit}" - - if last_processed_commit and last_processed_commit != edge_commit: - try: - self.logger.info("Checking last processed commit existence in current batch") - await run_shell_command( - ["git", "cat-file", "-e", last_processed_commit], cwd=repo_path - ) - self.logger.info("Found! using optimized range") - default_commit_range = f"{last_processed_commit}..{prev_batch_edge_commit}" - except Exception: - self.logger.info("last processed commit not found in range") - return default_commit_range - @retry( stop=stop_after_attempt(5), wait=wait_fixed(1), @@ -297,8 +257,6 @@ async def _execute_git_log( self, repo_path: str, clone_with_batches: bool, - prev_batch_edge_commit: str | None = None, - edge_commit: str | None = None, last_processed_commit: str | None = None, ) -> str: """Execute git log command and return raw output.""" @@ -311,34 +269,22 @@ async def _execute_git_log( if not clone_with_batches: commit_reference = await self._get_commit_reference(repo_path) - self.logger.info( - f"Full repo cloned in single batch, getting all commits in {commit_reference}" + self.logger.info(f"Full clone: getting all commits in {commit_reference}") + return await run_shell_command( + self._build_git_log_command(repo_path, commit_reference) ) - raw_commits_cmd = self._build_git_log_command(repo_path, commit_reference) - return await run_shell_command(raw_commits_cmd) - - if not prev_batch_edge_commit: - return "" - if edge_commit: - commit_range = await self._get_optimized_commit_range( - repo_path, edge_commit, prev_batch_edge_commit, last_processed_commit - ) - self.logger.info(f"Processing middle batch: {commit_range}") + # Batched clone: deepening is complete, run last_processed..HEAD + if last_processed_commit: + commit_range = f"{last_processed_commit}..HEAD" else: - # Final batch: Get all commits from the last known edge to the root. - commit_range = prev_batch_edge_commit - self.logger.info(f"Processing final batch from: {prev_batch_edge_commit} to root") - - raw_commits_cmd = self._build_git_log_command(repo_path, commit_range) - + commit_range = "HEAD" self.logger.info(f"Executing git log for range: {commit_range}") - return await run_shell_command(raw_commits_cmd) + return await run_shell_command(self._build_git_log_command(repo_path, commit_range)) - def should_skip_commit(self, raw_commit: str | None, edge_commit: str | None) -> bool: - """Check if commit should be skipped based on edge commit comparison.""" - # Only skip the boundary commit of the current shallow clone. - return not raw_commit or (edge_commit and raw_commit.startswith(edge_commit)) + def should_skip_commit(self, raw_commit: str | None) -> bool: + """Check if commit should be skipped.""" + return not raw_commit def clean_up_username(self, name: str): name = re.sub(r"(?i)Reviewed[- ]by:", "", name) @@ -718,7 +664,7 @@ async def process_commits_chunk( commit = None for full_commit_text in commit_texts_chunk: - if self.should_skip_commit(full_commit_text, batch_info.edge_commit): + if self.should_skip_commit(full_commit_text): continue commit_text, numstats_text = full_commit_text.split(self.NUMSTAT_SPLITTER) commit_lines = commit_text.strip().splitlines() diff --git a/services/apps/git_integration/src/crowdgit/worker/repository_worker.py b/services/apps/git_integration/src/crowdgit/worker/repository_worker.py index 292eb9c7ca..3f74301a8e 100644 --- a/services/apps/git_integration/src/crowdgit/worker/repository_worker.py +++ b/services/apps/git_integration/src/crowdgit/worker/repository_worker.py @@ -244,12 +244,11 @@ async def _process_single_repository(self, repository: Repository): await self.maintainer_service.process_maintainers(repository, batch_info) licenses = await self.license_service.detect(batch_info.repo_path) await update_repository_licenses(repository.id, licenses) - await self.commit_service.process_single_batch_commits( - repository, - batch_info, - ) - if batch_info.is_final_batch: + await self.commit_service.process_single_batch_commits( + repository, + batch_info, + ) await update_last_processed_commit( repo_id=repository.id, commit_hash=batch_info.latest_commit_in_repo, From 67ee965d64ec434688d41d792c151abd32b98891 Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Thu, 21 May 2026 12:56:34 +0100 Subject: [PATCH 2/4] chore: refactoring after changing commits processing approach Signed-off-by: Mouad BANI --- .../src/crowdgit/models/clone_batch.py | 8 - .../crowdgit/services/clone/clone_service.py | 109 ++++++++----- .../services/commit/commit_service.py | 152 ++++++------------ .../src/crowdgit/worker/repository_worker.py | 43 +---- .../src/test/test_activity_extraction.py | 4 +- 5 files changed, 123 insertions(+), 193 deletions(-) diff --git a/services/apps/git_integration/src/crowdgit/models/clone_batch.py b/services/apps/git_integration/src/crowdgit/models/clone_batch.py index 6e98d04f99..9e203fe177 100644 --- a/services/apps/git_integration/src/crowdgit/models/clone_batch.py +++ b/services/apps/git_integration/src/crowdgit/models/clone_batch.py @@ -11,14 +11,6 @@ class CloneBatchInfo(BaseModel): latest_commit_in_repo: str | None = Field( None, description="Hash of the latest commit in repo" ) - edge_commit: str | None = Field( - default=None, - description="The oldest commit in the current batch, used to track progress during incremental processing.", - ) - prev_batch_edge_commit: str | None = Field( - default=None, - description="The edge commit from the previous batch, used to track progress during incremental processing.", - ) clone_with_batches: bool = Field( default=True, description="Whether repo is cloned with batches" ) diff --git a/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py b/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py index ebe277f1e4..18239578ca 100644 --- a/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py +++ b/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py @@ -3,6 +3,7 @@ import tempfile import time from collections.abc import AsyncIterator +from datetime import datetime, timezone from decimal import Decimal import aiofiles @@ -22,6 +23,7 @@ NetworkError, RateLimitError, RemoteServerError, + ReOnboardingRequiredError, ) from crowdgit.models import CloneBatchInfo, Repository, ServiceExecution from crowdgit.services.base.base_service import BaseService @@ -31,6 +33,10 @@ get_repo_name, run_shell_command, ) +from crowdgit.settings import ( + STUCK_ONBOARDING_REPO_TIMEOUT_HOURS, + STUCK_RECURRENT_REPO_TIMEOUT_HOURS, +) DEFAULT_STORAGE_OPTIMIZATION_THRESHOLD_MB = 10000 INITIAL_BATCH_DEEPEN = 50 @@ -69,20 +75,64 @@ async def _configure_global_git_client(self, path: str) -> None: await run_shell_command( ["git", "config", "--global", "fetch.recurseSubmodules", "false"], cwd=path ) + # Ensure abbreviated commits are disabled + await run_shell_command( + ["git", "-C", path, "config", "core.abbrevCommit", "false"], cwd=path + ) + + async def _is_shallow_clone(self, path: str) -> bool: + result = await run_shell_command(["git", "rev-parse", "--is-shallow-repository"], cwd=path) + return "true" in result + + def _is_timeout_reached(self, repository: Repository) -> bool: + processing_duration_hours = ( + datetime.now(timezone.utc) - repository.locked_at.astimezone(timezone.utc) + ).total_seconds() / 3600 + is_onboarding = repository.last_processed_commit is None + timeout_hours = ( + STUCK_ONBOARDING_REPO_TIMEOUT_HOURS + if is_onboarding + else STUCK_RECURRENT_REPO_TIMEOUT_HOURS + ) + if processing_duration_hours >= timeout_hours: + self.logger.warning( + f"Repo {repository.url} is stuck for {processing_duration_hours:.1f} hours — queuing for re-onboarding" + ) + return True + return False - async def _check_if_final_batch(self, path: str, target_commit_hash: str | None) -> bool: + async def _check_if_final_batch(self, path: str, repository: Repository) -> bool: """ - Final batch is determined if: - - full history is cloned (no longer shallow_clone) - - target commit reached + Check whether the current batch is the final one. + + Returns True when: + - full history fetched (onboarding — no last_processed_commit to look for), or + - all commits between last_processed_commit and HEAD are available (no shallow boundary in range). + + Raises ReOnboardingRequiredError when: + - force push detected (full clone but last_processed_commit missing), or + - processing timeout exceeded while still deepening. """ - is_shallow_clone = await run_shell_command( - ["git", "rev-parse", "--is-shallow-repository"], cwd=path - ) - if "false" in is_shallow_clone: + is_shallow_clone = await self._is_shallow_clone(path) + is_full_clone = not is_shallow_clone + + target_commit_hash = repository.last_processed_commit + if is_full_clone and not target_commit_hash: + # fully cloned at once return True - if not target_commit_hash: - return False + + reached_target_commit = await self._has_required_commit_range(path, target_commit_hash) + if reached_target_commit: + return True + + timeout_reached = self._is_timeout_reached(repository) + force_push_detected = is_full_clone and not reached_target_commit + if timeout_reached or force_push_detected: + raise ReOnboardingRequiredError() + + return False + + async def _has_required_commit_range(self, path: str, target_commit_hash: str) -> bool: try: await run_shell_command( ["git", "rev-parse", "--verify", f"{target_commit_hash}^{{commit}}"], cwd=path @@ -225,13 +275,14 @@ async def _update_batch_info( self, batch_info: CloneBatchInfo, repo_path: str, - target_commit_hash: str | None, + repository: Repository, clone_with_batches: bool, ) -> None: """Update batch info with repo path and final batch status. For full clones (clone_with_batches=False): Marks as final batch immediately. - For batched clones (clone_with_batches=True): Checks if target commit reached or full history fetched. + For batched clones (clone_with_batches=True): Delegates to _check_if_final_batch + to determine if required commit range is available or timeout/force-push occurred. """ batch_info.repo_path = repo_path batch_info.clone_with_batches = clone_with_batches @@ -248,21 +299,7 @@ async def _update_batch_info( batch_info.is_final_batch = True return - batch_info.is_final_batch = await self._check_if_final_batch(repo_path, target_commit_hash) - - async def _get_edge_commit(self, repo_path: str) -> str | None: - """ - Returns the edge commit of a shallow clone by reading the .git/shallow file, - which contains the boundary commit(s) when history is truncated. - - If the full history has been cloned, the .git/shallow file does not exist. - """ - boundaries = await self._read_shallow_file(repo_path) - if not boundaries: - return None - oldest_commit = boundaries[0] - self.logger.info(f"Edge commit: {oldest_commit}") - return oldest_commit + batch_info.is_final_batch = await self._check_if_final_batch(repo_path, repository) async def _cleanup_temp_directory(self, temp_repo_path: str, repo_id: str) -> None: """ @@ -455,10 +492,9 @@ async def clone_batches_generator( temp_repo_path, remote, repository.branch, repository.last_processed_commit ) await self._update_batch_info( - batch_info, temp_repo_path, repository.last_processed_commit, clone_with_batches + batch_info, temp_repo_path, repository, clone_with_batches ) - batch_end_time = time.time() - total_execution_time += round(batch_end_time - batch_start_time, 2) + total_execution_time += round(time.time() - batch_start_time, 2) yield batch_info if working_dir_cleanup: @@ -468,14 +504,10 @@ async def clone_batches_generator( while not batch_info.is_final_batch: batch_start_time = time.time() await self._clone_next_batch(temp_repo_path, batch_depth, remote) - await self._update_batch_info( - batch_info, - temp_repo_path, - repository.last_processed_commit, - clone_with_batches, + batch_info.is_final_batch = await self._check_if_final_batch( + temp_repo_path, repository ) - batch_end_time = time.time() - total_execution_time += round(batch_end_time - batch_start_time, 2) + total_execution_time += round(time.time() - batch_start_time, 2) if not batch_info.is_final_batch: # exponential deepen increment to speed up fetching @@ -484,7 +516,8 @@ async def clone_batches_generator( MAX_BATCH_DEEPEN, ) - yield batch_info + if clone_with_batches: + yield batch_info except Exception as e: # Handle both CrowdGitError and generic Exception diff --git a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py index 718c288ac0..496847f659 100644 --- a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py +++ b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py @@ -17,6 +17,7 @@ batch_check_parent_activities, batch_insert_activities, save_service_execution, + update_last_processed_commit, ) from crowdgit.enums import ( DataSinkWorkerQueueMessageType, @@ -26,7 +27,7 @@ IntegrationResultType, OperationType, ) -from crowdgit.errors import CrowdGitError +from crowdgit.errors import CrowdGitError, InternalError from crowdgit.models import CloneBatchInfo, Repository, ServiceExecution from crowdgit.services.base.base_service import BaseService from crowdgit.services.commit.activitymap import ActivityMap @@ -58,8 +59,6 @@ class CommitService(BaseService): def __init__(self, queue_service: QueueService): super().__init__() self.queue_service = queue_service - # Metrics tracking for current repository - self._metrics_context = None @property def git_log_format(self) -> str: @@ -96,7 +95,7 @@ def is_valid_datetime(self, commit_datetime: str) -> bool: except ValueError: return False - async def process_single_batch_commits( + async def process_batch_commits( self, repository: Repository, batch_info: CloneBatchInfo, @@ -108,27 +107,18 @@ async def process_single_batch_commits( repository: Repository object containing segment and integration info batch_info: Clone batch information with paths and commit boundaries """ - # Initialize metrics context on first call - if self._metrics_context is None: - self._metrics_context = { - "start_time": time.time(), - "total_execution_time": 0.0, - "execution_status": ExecutionStatus.SUCCESS, - "error_code": None, - "error_message": None, - "total_commits": 0, - "processed_commits": 0, - "bad_commits": 0, - "skipped_activities": 0, - "total_activities": 0, - } + self._metrics_context = { + "total_commits": 0, + "processed_commits": 0, + "bad_commits": 0, + "skipped_activities": 0, + "total_activities": 0, + } - batch_start_time = time.time() + start_time = time.time() try: - self.logger.info( - f"Starting commits processing for new batch having commits older than {batch_info.prev_batch_edge_commit}" - ) + self.logger.info(f"Starting commits processing for {batch_info.remote}") raw_commits = await self._execute_git_log( batch_info.repo_path, batch_info.clone_with_batches, @@ -137,75 +127,45 @@ async def process_single_batch_commits( await self._process_activities_from_commits(raw_commits, batch_info, repository) - batch_end_time = time.time() - batch_time = round(batch_end_time - batch_start_time, 2) - self._metrics_context["total_execution_time"] += batch_time + execution_time = round(time.time() - start_time, 2) - self.logger.info( - f"Batch activity processed from {batch_info.remote} in {batch_time}sec" - ) + self.logger.info(f"Commits processed from {batch_info.remote} in {execution_time}sec") - # Save metrics if this is the final batch - if batch_info.is_final_batch: - service_execution = ServiceExecution( - repo_id=repository.id, - operation_type=OperationType.COMMIT, - status=self._metrics_context["execution_status"], - error_code=self._metrics_context["error_code"], - error_message=self._metrics_context["error_message"], - execution_time_sec=Decimal( - str(round(self._metrics_context["total_execution_time"], 2)) - ), - metrics={ - "total_commits": self._metrics_context["total_commits"], - "processed_commits": self._metrics_context["processed_commits"], - "bad_commits": self._metrics_context["bad_commits"], - "skipped_activities": self._metrics_context["skipped_activities"], - "total_activities": self._metrics_context["total_activities"], - }, - ) - await save_service_execution(service_execution) - # Reset metrics context after saving - self._metrics_context = None + service_execution = ServiceExecution( + repo_id=repository.id, + operation_type=OperationType.COMMIT, + status=ExecutionStatus.SUCCESS, + execution_time_sec=Decimal(str(execution_time)), + metrics=self._metrics_context, + ) + await save_service_execution(service_execution) except Exception as e: - # Update metrics context with error info - batch_end_time = time.time() - batch_time = round(batch_end_time - batch_start_time, 2) - self._metrics_context["total_execution_time"] += batch_time - self._metrics_context["execution_status"] = ExecutionStatus.FAILURE - error_message = e.error_message if isinstance(e, CrowdGitError) else repr(e) - self._metrics_context["error_code"] = ( - e.error_code.value if isinstance(e, CrowdGitError) else ErrorCode.UNKNOWN.value - ) - self._metrics_context["error_message"] = error_message - self.logger.error(f"Commit processing failed: {error_message}") - # Save metrics on error + execution_time = round(time.time() - start_time, 2) + service_execution = ServiceExecution( repo_id=repository.id, operation_type=OperationType.COMMIT, - status=self._metrics_context["execution_status"], - error_code=self._metrics_context["error_code"], - error_message=self._metrics_context["error_message"], - execution_time_sec=Decimal( - str(round(self._metrics_context["total_execution_time"], 2)) + status=ExecutionStatus.FAILURE, + error_code=( + e.error_code.value if isinstance(e, CrowdGitError) else ErrorCode.UNKNOWN.value ), - metrics={ - "total_commits": self._metrics_context["total_commits"], - "processed_commits": self._metrics_context["processed_commits"], - "bad_commits": self._metrics_context["bad_commits"], - "skipped_activities": self._metrics_context["skipped_activities"], - "total_activities": self._metrics_context["total_activities"], - }, + error_message=error_message, + execution_time_sec=Decimal(str(execution_time)), + metrics=self._metrics_context, ) await save_service_execution(service_execution) - # Reset metrics context after saving - self._metrics_context = None raise + await update_last_processed_commit( + repo_id=repository.id, + commit_hash=batch_info.latest_commit_in_repo, + branch=await get_default_branch(batch_info.repo_path), + ) + async def _get_commit_reference(self, repo_path: str) -> str: """Get the commit reference for git log command.""" default_branch = await get_default_branch(repo_path) @@ -260,10 +220,6 @@ async def _execute_git_log( last_processed_commit: str | None = None, ) -> str: """Execute git log command and return raw output.""" - # Ensure abbreviated commits are disabled - await run_shell_command( - ["git", "-C", repo_path, "config", "core.abbrevCommit", "false"], cwd=repo_path - ) self.logger.info("Running git log commands...") @@ -275,10 +231,12 @@ async def _execute_git_log( ) # Batched clone: deepening is complete, run last_processed..HEAD - if last_processed_commit: - commit_range = f"{last_processed_commit}..HEAD" - else: - commit_range = "HEAD" + if not last_processed_commit: + raise InternalError( + error_message="clone_with_batches=True requires last_processed_commit to be set" + ) + + commit_range = f"{last_processed_commit}..HEAD" self.logger.info(f"Executing git log for range: {commit_range}") return await run_shell_command(self._build_git_log_command(repo_path, commit_range)) @@ -646,17 +604,7 @@ async def process_commits_chunk( batch_info: CloneBatchInfo, repository: Repository, ) -> None: - """ - Process a chunk of raw commit texts into activities and write them to DB and Kafka. - - Args: - commit_texts_chunk: List of commit text strings to process - repo_path: Path to the repository - edge_commit_hash: Edge commit hash for filtering - remote: Remote repository URL - segment_id: Segment identifier - integration_id: Integration identifier - """ + """Process a chunk of raw commit texts into activities and write them to DB and Kafka.""" activities_db = [] activities_queue = [] bad_commits = 0 @@ -720,12 +668,10 @@ async def process_commits_chunk( self.logger.info( f"Processed {processed_commits} commits, skipped {bad_commits} invalid commits, filtered {skipped_activities} activities from parent repo in {batch_info.repo_path}" ) - # Update metrics context - if self._metrics_context: - self._metrics_context["processed_commits"] += processed_commits - self._metrics_context["bad_commits"] += bad_commits - self._metrics_context["total_activities"] += len(activities_db) - self._metrics_context["skipped_activities"] += skipped_activities + self._metrics_context["processed_commits"] += processed_commits + self._metrics_context["bad_commits"] += bad_commits + self._metrics_context["total_activities"] += len(activities_db) + self._metrics_context["skipped_activities"] += skipped_activities # Write activities to database and queue if activities_db: @@ -750,9 +696,7 @@ async def _process_activities_from_commits( logger.info(f"Actual number of commits to be processed: {len(commit_texts)}") - # Update total_commits metric - if self._metrics_context: - self._metrics_context["total_commits"] += len(commit_texts) + self._metrics_context["total_commits"] = len(commit_texts) if len(commit_texts) == 0: self.logger.info("No commits to be processed") diff --git a/services/apps/git_integration/src/crowdgit/worker/repository_worker.py b/services/apps/git_integration/src/crowdgit/worker/repository_worker.py index 3f74301a8e..5158d14cc2 100644 --- a/services/apps/git_integration/src/crowdgit/worker/repository_worker.py +++ b/services/apps/git_integration/src/crowdgit/worker/repository_worker.py @@ -1,12 +1,10 @@ import asyncio -from datetime import datetime, timezone from crowdgit.database.crud import ( acquire_repo_for_processing, get_recently_processed_repository_by_url, mark_repo_as_processed, release_repo, - update_last_processed_commit, update_repository_licenses, ) from crowdgit.enums import RepositoryState @@ -29,10 +27,8 @@ SoftwareValueService, VulnerabilityScannerService, ) -from crowdgit.services.utils import get_default_branch, get_repo_name +from crowdgit.services.utils import get_repo_name from crowdgit.settings import ( - STUCK_ONBOARDING_REPO_TIMEOUT_HOURS, - STUCK_RECURRENT_REPO_TIMEOUT_HOURS, WORKER_ERROR_BACKOFF_SEC, WORKER_POLLING_INTERVAL_SEC, ) @@ -96,34 +92,6 @@ async def shutdown(self): logger.info("Worker services shutdown triggered") - async def _ensure_repo_not_stuck(self, repository: Repository): - """ - Check if repo is stuck and raise the appropriate exception if so. - Repos can get stuck in processing state for different reasons: - - Worker crash or restart (e.g. pod eviction due OOM, deployment after timeout, ...) - - `last_processed_commit` is no loger valid due to force-push, dangling-commit, or so... - - Race condition: remote is going under breaking changes at the same time we're processing it - - Network issues breaking the clone/pull operation - """ - # detection - processing_duration_hours = ( - datetime.now(timezone.utc) - repository.locked_at.astimezone(timezone.utc) - ).total_seconds() / 3600 - repo_stuck: bool = ( - repository.last_processed_commit - and processing_duration_hours >= STUCK_RECURRENT_REPO_TIMEOUT_HOURS - ) or ( - repository.last_processed_commit is None # onboarding - and processing_duration_hours >= STUCK_ONBOARDING_REPO_TIMEOUT_HOURS - ) - - # handling - if repo_stuck: - logger.warning( - f"Repo {repository.url} is stuck for {processing_duration_hours} hours — queuing for re-onboarding" - ) - raise ReOnboardingRequiredError() - async def _process_repositories(self): """ Process repositories by priority - check acquire_repo_for_processing() @@ -245,17 +213,10 @@ async def _process_single_repository(self, repository: Repository): licenses = await self.license_service.detect(batch_info.repo_path) await update_repository_licenses(repository.id, licenses) if batch_info.is_final_batch: - await self.commit_service.process_single_batch_commits( + await self.commit_service.process_batch_commits( repository, batch_info, ) - await update_last_processed_commit( - repo_id=repository.id, - commit_hash=batch_info.latest_commit_in_repo, - branch=await get_default_branch(batch_info.repo_path), - ) - else: - await self._ensure_repo_not_stuck(repository) logger.info("Incremental processing completed successfully") processing_state = RepositoryState.COMPLETED diff --git a/services/apps/git_integration/src/test/test_activity_extraction.py b/services/apps/git_integration/src/test/test_activity_extraction.py index a960f0f21a..718c724697 100644 --- a/services/apps/git_integration/src/test/test_activity_extraction.py +++ b/services/apps/git_integration/src/test/test_activity_extraction.py @@ -152,7 +152,7 @@ async def mock_save_execution(execution): mock_save_execution, ): # Process commits - await commit_service.process_single_batch_commits( + await commit_service.process_batch_commits( repository=test_repository, batch_info=batch_info ) @@ -276,7 +276,7 @@ async def mock_save_execution(execution): "crowdgit.services.commit.commit_service.save_service_execution", mock_save_execution, ): - await commit_service.process_single_batch_commits( + await commit_service.process_batch_commits( repository=test_repository, batch_info=batch_info ) From 638c3406892652bbea513391a5b1bba59226c44c Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Thu, 21 May 2026 13:20:29 +0100 Subject: [PATCH 3/4] fix: review suggestions Signed-off-by: Mouad BANI --- .../crowdgit/services/clone/clone_service.py | 4 - .../services/commit/commit_service.py | 4 + .../src/test/outputs/test-repo_actual.json | 363 +++++++++--------- .../src/test/outputs/test-repo_expected.json | 23 +- .../src/test/test_activity_extraction.py | 55 ++- 5 files changed, 253 insertions(+), 196 deletions(-) diff --git a/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py b/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py index 18239578ca..71e9e3e449 100644 --- a/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py +++ b/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py @@ -75,10 +75,6 @@ async def _configure_global_git_client(self, path: str) -> None: await run_shell_command( ["git", "config", "--global", "fetch.recurseSubmodules", "false"], cwd=path ) - # Ensure abbreviated commits are disabled - await run_shell_command( - ["git", "-C", path, "config", "core.abbrevCommit", "false"], cwd=path - ) async def _is_shallow_clone(self, path: str) -> bool: result = await run_shell_command(["git", "rev-parse", "--is-shallow-repository"], cwd=path) diff --git a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py index 496847f659..7cb3622372 100644 --- a/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py +++ b/services/apps/git_integration/src/crowdgit/services/commit/commit_service.py @@ -220,6 +220,10 @@ async def _execute_git_log( last_processed_commit: str | None = None, ) -> str: """Execute git log command and return raw output.""" + # Ensure abbreviated commits are disabled + await run_shell_command( + ["git", "-C", repo_path, "config", "core.abbrevCommit", "false"], cwd=repo_path + ) self.logger.info("Running git log commands...") diff --git a/services/apps/git_integration/src/test/outputs/test-repo_actual.json b/services/apps/git_integration/src/test/outputs/test-repo_actual.json index aeb4bb4106..8b677ac140 100644 --- a/services/apps/git_integration/src/test/outputs/test-repo_actual.json +++ b/services/apps/git_integration/src/test/outputs/test-repo_actual.json @@ -1,33 +1,34 @@ [ { - "type": "authored-commit", + "type": "reviewed-commit", "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "d38f1a8036c1b3177d94682f226a9fe6cb387240", - "sourceParentId": "", + "sourceId": "11b4c273bc98507e605d6f3af0dfb273e24649f1", + "sourceParentId": "4df11752fae718f054a20d22bec2078c1d1f08fa", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Merge pull request from external contributor\n\nThis commit was authored by a contributor but committed by a maintainer.\n\nSigned-off-by: Linda Contributor ", + "body": "Refactor authentication module\n\nRemoved old code and added new implementation.\n\nReviewed-by: Alice Developer ", + "username": "alice@example.com", "attributes": { - "insertions": 3, + "insertions": 8, "timezone": "UTC+01:00", - "deletions": 0, - "lines": 3, + "deletions": 2, + "lines": 6, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Linda Contributor", + "displayName": "Alice Developer", "identities": [ { "platform": "git", - "value": "linda@external.com", + "value": "alice@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "linda@external.com", + "value": "alice@example.com", "type": "email", "verified": false } @@ -36,34 +37,35 @@ "segmentId": "test-segment-id" }, { - "type": "committed-commit", + "type": "authored-commit", "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "4619a3b615c79fe4be0de4a82150318898d8ea49", - "sourceParentId": "d38f1a8036c1b3177d94682f226a9fe6cb387240", + "sourceId": "12b4d83852e06c5b3fa0e339c395bad0ee65200f", + "sourceParentId": "", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Merge pull request from external contributor\n\nThis commit was authored by a contributor but committed by a maintainer.\n\nSigned-off-by: Linda Contributor ", + "body": "Quick fix for production issue", + "username": "kelly@example.com", "attributes": { - "insertions": 3, + "insertions": 1, "timezone": "UTC+01:00", "deletions": 0, - "lines": 3, + "lines": 1, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Mike Maintainer", + "displayName": "Kelly Hotfix", "identities": [ { "platform": "git", - "value": "mike@example.com", + "value": "kelly@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "mike@example.com", + "value": "kelly@example.com", "type": "email", "verified": false } @@ -72,34 +74,35 @@ "segmentId": "test-segment-id" }, { - "type": "signed-off-commit", + "type": "authored-commit", "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "a321f844e34e4c4b2bd6b2997681fcf2dd5f47f5", - "sourceParentId": "d38f1a8036c1b3177d94682f226a9fe6cb387240", + "sourceId": "28c2ba317c4e202c25cd684a0c181f54c37d78a0", + "sourceParentId": "", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Merge pull request from external contributor\n\nThis commit was authored by a contributor but committed by a maintainer.\n\nSigned-off-by: Linda Contributor ", + "body": "Update documentation\n\nUpdated the README with new instructions.\n\nReviewed-By: No Email Here\nSigned-off-by: Henry Docs ", + "username": "henry@example.com", "attributes": { - "insertions": 3, + "insertions": 4, "timezone": "UTC+01:00", - "deletions": 0, + "deletions": 1, "lines": 3, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Linda Contributor", + "displayName": "Henry Docs", "identities": [ { "platform": "git", - "value": "linda@external.com", + "value": "henry@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "linda@external.com", + "value": "henry@example.com", "type": "email", "verified": false } @@ -110,32 +113,33 @@ { "type": "authored-commit", "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "12b4d83852e06c5b3fa0e339c395bad0ee65200f", + "sourceId": "2aaf02cef5d7a0fd7f916eb5b64456c4e82a7e58", "sourceParentId": "", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Quick fix for production issue", + "body": "Initial commit - basic structure", + "username": "alice@example.com", "attributes": { - "insertions": 1, + "insertions": 3, "timezone": "UTC+01:00", "deletions": 0, - "lines": 1, + "lines": 3, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Kelly Hotfix", + "displayName": "Alice Developer", "identities": [ { "platform": "git", - "value": "kelly@example.com", + "value": "alice@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "kelly@example.com", + "value": "alice@example.com", "type": "email", "verified": false } @@ -144,13 +148,14 @@ "segmentId": "test-segment-id" }, { - "type": "authored-commit", + "type": "signed-off-commit", "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "afd82faba62589717df9e3c32172b310b68f808b", - "sourceParentId": "", + "sourceId": "2bcde409c58ed7a5620b3f1a8c18c0572067eb62", + "sourceParentId": "afd82faba62589717df9e3c32172b310b68f808b", "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Fix typo in config\n\nSigned-off-by: Ivan >\nReviewed-by: Jane <", + "username": "ivan@example.com", "attributes": { "insertions": 1, "timezone": "UTC+01:00", @@ -161,7 +166,7 @@ }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Ivan Typo", + "displayName": "Ivan", "identities": [ { "platform": "git", @@ -180,34 +185,35 @@ "segmentId": "test-segment-id" }, { - "type": "signed-off-commit", + "type": "committed-commit", "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "2bcde409c58ed7a5620b3f1a8c18c0572067eb62", - "sourceParentId": "afd82faba62589717df9e3c32172b310b68f808b", + "sourceId": "4619a3b615c79fe4be0de4a82150318898d8ea49", + "sourceParentId": "d38f1a8036c1b3177d94682f226a9fe6cb387240", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Fix typo in config\n\nSigned-off-by: Ivan >\nReviewed-by: Jane <", + "body": "Merge pull request from external contributor\n\nThis commit was authored by a contributor but committed by a maintainer.\n\nSigned-off-by: Linda Contributor ", + "username": "mike@example.com", "attributes": { - "insertions": 1, + "insertions": 3, "timezone": "UTC+01:00", - "deletions": 1, - "lines": 0, + "deletions": 0, + "lines": 3, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Ivan", + "displayName": "Mike Maintainer", "identities": [ { "platform": "git", - "value": "ivan@example.com", + "value": "mike@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "ivan@example.com", + "value": "mike@example.com", "type": "email", "verified": false } @@ -216,34 +222,35 @@ "segmentId": "test-segment-id" }, { - "type": "reviewed-commit", + "type": "authored-commit", "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "97ea4072257c1e377a60803c810c7e3af5d051af", - "sourceParentId": "afd82faba62589717df9e3c32172b310b68f808b", + "sourceId": "4df11752fae718f054a20d22bec2078c1d1f08fa", + "sourceParentId": "", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Fix typo in config\n\nSigned-off-by: Ivan >\nReviewed-by: Jane <", + "body": "Refactor authentication module\n\nRemoved old code and added new implementation.\n\nReviewed-by: Alice Developer ", + "username": "grace@example.com", "attributes": { - "insertions": 1, + "insertions": 8, "timezone": "UTC+01:00", - "deletions": 1, - "lines": 0, + "deletions": 2, + "lines": 6, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Jane", + "displayName": "Grace Refactorer", "identities": [ { "platform": "git", - "value": "jane@example.com", + "value": "grace@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "jane@example.com", + "value": "grace@example.com", "type": "email", "verified": false } @@ -254,32 +261,33 @@ { "type": "authored-commit", "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "28c2ba317c4e202c25cd684a0c181f54c37d78a0", + "sourceId": "4fec97a2520ab0aae9454ef0c060d0f1dfc6aa4d", "sourceParentId": "", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Update documentation\n\nUpdated the README with new instructions.\n\nReviewed-By: No Email Here\nSigned-off-by: Henry Docs ", + "body": "Add configuration file\n\nThis commit adds the main configuration file for the project.\n\nSigned-off-by: Bob Reviewer ", + "username": "bob@example.com", "attributes": { - "insertions": 4, + "insertions": 1, "timezone": "UTC+01:00", - "deletions": 1, - "lines": 3, + "deletions": 0, + "lines": 1, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Henry Docs", + "displayName": "Bob Reviewer", "identities": [ { "platform": "git", - "value": "henry@example.com", + "value": "bob@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "henry@example.com", + "value": "bob@example.com", "type": "email", "verified": false } @@ -290,32 +298,33 @@ { "type": "signed-off-commit", "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "9a5760b9f9e05e54acac8c29eb555bc51014a8a1", - "sourceParentId": "28c2ba317c4e202c25cd684a0c181f54c37d78a0", + "sourceId": "5d4675a0cabb19b066b94873a1f4b8f7caf805e7", + "sourceParentId": "c73621865a578ddcc419f976382ef701efa33da4", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Update documentation\n\nUpdated the README with new instructions.\n\nReviewed-By: No Email Here\nSigned-off-by: Henry Docs ", + "body": "Fix critical security vulnerability\n\nThis patch addresses a security issue in the authentication module.\n\nSigned-off-by: Charlie Developer \nReviewed-by: Alice Developer \nTested-by: David Tester ", + "username": "charlie@example.com", "attributes": { - "insertions": 4, + "insertions": 3, "timezone": "UTC+01:00", - "deletions": 1, + "deletions": 0, "lines": 3, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Henry Docs", + "displayName": "Charlie Developer", "identities": [ { "platform": "git", - "value": "henry@example.com", + "value": "charlie@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "henry@example.com", + "value": "charlie@example.com", "type": "email", "verified": false } @@ -324,34 +333,35 @@ "segmentId": "test-segment-id" }, { - "type": "authored-commit", + "type": "co-authored-commit", "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "4df11752fae718f054a20d22bec2078c1d1f08fa", - "sourceParentId": "", + "sourceId": "60a829c7b43c060ee991a6a4601cd9e0ae7058ea", + "sourceParentId": "eacba0e12f931b0cc6b551f4680e41c024b02afe", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Refactor authentication module\n\nRemoved old code and added new implementation.\n\nReviewed-by: Alice Developer ", + "body": "Implement new feature together\n\nThis feature was developed in pair programming session.\n\nCo-authored-by: Eve Pair \nSigned-off-by: Frank Lead ", + "username": "eve@example.com", "attributes": { - "insertions": 8, + "insertions": 2, "timezone": "UTC+01:00", - "deletions": 2, - "lines": 6, + "deletions": 0, + "lines": 2, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Grace Refactorer", + "displayName": "Eve Pair", "identities": [ { "platform": "git", - "value": "grace@example.com", + "value": "eve@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "grace@example.com", + "value": "eve@example.com", "type": "email", "verified": false } @@ -362,32 +372,33 @@ { "type": "reviewed-commit", "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "11b4c273bc98507e605d6f3af0dfb273e24649f1", - "sourceParentId": "4df11752fae718f054a20d22bec2078c1d1f08fa", + "sourceId": "97ea4072257c1e377a60803c810c7e3af5d051af", + "sourceParentId": "afd82faba62589717df9e3c32172b310b68f808b", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Refactor authentication module\n\nRemoved old code and added new implementation.\n\nReviewed-by: Alice Developer ", + "body": "Fix typo in config\n\nSigned-off-by: Ivan >\nReviewed-by: Jane <", + "username": "jane@example.com", "attributes": { - "insertions": 8, + "insertions": 1, "timezone": "UTC+01:00", - "deletions": 2, - "lines": 6, + "deletions": 1, + "lines": 0, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Alice Developer", + "displayName": "Jane", "identities": [ { "platform": "git", - "value": "alice@example.com", + "value": "jane@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "alice@example.com", + "value": "jane@example.com", "type": "email", "verified": false } @@ -396,34 +407,35 @@ "segmentId": "test-segment-id" }, { - "type": "authored-commit", + "type": "signed-off-commit", "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "eacba0e12f931b0cc6b551f4680e41c024b02afe", - "sourceParentId": "", + "sourceId": "9a5760b9f9e05e54acac8c29eb555bc51014a8a1", + "sourceParentId": "28c2ba317c4e202c25cd684a0c181f54c37d78a0", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Implement new feature together\n\nThis feature was developed in pair programming session.\n\nCo-authored-by: Eve Pair \nSigned-off-by: Frank Lead ", + "body": "Update documentation\n\nUpdated the README with new instructions.\n\nReviewed-By: No Email Here\nSigned-off-by: Henry Docs ", + "username": "henry@example.com", "attributes": { - "insertions": 2, + "insertions": 4, "timezone": "UTC+01:00", - "deletions": 0, - "lines": 2, + "deletions": 1, + "lines": 3, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Frank Lead", + "displayName": "Henry Docs", "identities": [ { "platform": "git", - "value": "frank@example.com", + "value": "henry@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "frank@example.com", + "value": "henry@example.com", "type": "email", "verified": false } @@ -432,34 +444,35 @@ "segmentId": "test-segment-id" }, { - "type": "co-authored-commit", + "type": "tested-commit", "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "60a829c7b43c060ee991a6a4601cd9e0ae7058ea", - "sourceParentId": "eacba0e12f931b0cc6b551f4680e41c024b02afe", + "sourceId": "9e230ef529e800d109a7374d8fd4182322508e3d", + "sourceParentId": "c73621865a578ddcc419f976382ef701efa33da4", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Implement new feature together\n\nThis feature was developed in pair programming session.\n\nCo-authored-by: Eve Pair \nSigned-off-by: Frank Lead ", + "body": "Fix critical security vulnerability\n\nThis patch addresses a security issue in the authentication module.\n\nSigned-off-by: Charlie Developer \nReviewed-by: Alice Developer \nTested-by: David Tester ", + "username": "david@example.com", "attributes": { - "insertions": 2, + "insertions": 3, "timezone": "UTC+01:00", "deletions": 0, - "lines": 2, + "lines": 3, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Eve Pair", + "displayName": "David Tester", "identities": [ { "platform": "git", - "value": "eve@example.com", + "value": "david@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "eve@example.com", + "value": "david@example.com", "type": "email", "verified": false } @@ -470,32 +483,33 @@ { "type": "signed-off-commit", "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "ea096a751ec8751d06c82a861a2f769a105bb762", - "sourceParentId": "eacba0e12f931b0cc6b551f4680e41c024b02afe", + "sourceId": "a321f844e34e4c4b2bd6b2997681fcf2dd5f47f5", + "sourceParentId": "d38f1a8036c1b3177d94682f226a9fe6cb387240", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Implement new feature together\n\nThis feature was developed in pair programming session.\n\nCo-authored-by: Eve Pair \nSigned-off-by: Frank Lead ", + "body": "Merge pull request from external contributor\n\nThis commit was authored by a contributor but committed by a maintainer.\n\nSigned-off-by: Linda Contributor ", + "username": "linda@external.com", "attributes": { - "insertions": 2, + "insertions": 3, "timezone": "UTC+01:00", "deletions": 0, - "lines": 2, + "lines": 3, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Frank Lead", + "displayName": "Linda Contributor", "identities": [ { "platform": "git", - "value": "frank@example.com", + "value": "linda@external.com", "type": "username", "verified": true }, { "platform": "git", - "value": "frank@example.com", + "value": "linda@external.com", "type": "email", "verified": false } @@ -506,32 +520,33 @@ { "type": "authored-commit", "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "c73621865a578ddcc419f976382ef701efa33da4", + "sourceId": "afd82faba62589717df9e3c32172b310b68f808b", "sourceParentId": "", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Fix critical security vulnerability\n\nThis patch addresses a security issue in the authentication module.\n\nSigned-off-by: Charlie Developer \nReviewed-by: Alice Developer \nTested-by: David Tester ", + "body": "Fix typo in config\n\nSigned-off-by: Ivan >\nReviewed-by: Jane <", + "username": "ivan@example.com", "attributes": { - "insertions": 3, + "insertions": 1, "timezone": "UTC+01:00", - "deletions": 0, - "lines": 3, + "deletions": 1, + "lines": 0, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Charlie Developer", + "displayName": "Ivan Typo", "identities": [ { "platform": "git", - "value": "charlie@example.com", + "value": "ivan@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "charlie@example.com", + "value": "ivan@example.com", "type": "email", "verified": false } @@ -540,13 +555,14 @@ "segmentId": "test-segment-id" }, { - "type": "signed-off-commit", + "type": "authored-commit", "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "5d4675a0cabb19b066b94873a1f4b8f7caf805e7", - "sourceParentId": "c73621865a578ddcc419f976382ef701efa33da4", + "sourceId": "c73621865a578ddcc419f976382ef701efa33da4", + "sourceParentId": "", "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Fix critical security vulnerability\n\nThis patch addresses a security issue in the authentication module.\n\nSigned-off-by: Charlie Developer \nReviewed-by: Alice Developer \nTested-by: David Tester ", + "username": "charlie@example.com", "attributes": { "insertions": 3, "timezone": "UTC+01:00", @@ -576,13 +592,14 @@ "segmentId": "test-segment-id" }, { - "type": "reviewed-commit", + "type": "authored-commit", "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "eb1fb44cc5fb7cdaf526ef9268956b7158936887", - "sourceParentId": "c73621865a578ddcc419f976382ef701efa33da4", + "sourceId": "d38f1a8036c1b3177d94682f226a9fe6cb387240", + "sourceParentId": "", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Fix critical security vulnerability\n\nThis patch addresses a security issue in the authentication module.\n\nSigned-off-by: Charlie Developer \nReviewed-by: Alice Developer \nTested-by: David Tester ", + "body": "Merge pull request from external contributor\n\nThis commit was authored by a contributor but committed by a maintainer.\n\nSigned-off-by: Linda Contributor ", + "username": "linda@external.com", "attributes": { "insertions": 3, "timezone": "UTC+01:00", @@ -593,17 +610,17 @@ }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Alice Developer", + "displayName": "Linda Contributor", "identities": [ { "platform": "git", - "value": "alice@example.com", + "value": "linda@external.com", "type": "username", "verified": true }, { "platform": "git", - "value": "alice@example.com", + "value": "linda@external.com", "type": "email", "verified": false } @@ -612,34 +629,35 @@ "segmentId": "test-segment-id" }, { - "type": "tested-commit", + "type": "signed-off-commit", "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "9e230ef529e800d109a7374d8fd4182322508e3d", - "sourceParentId": "c73621865a578ddcc419f976382ef701efa33da4", + "sourceId": "e52dc2b2ec76720ce7c8233613d110da5378b014", + "sourceParentId": "4fec97a2520ab0aae9454ef0c060d0f1dfc6aa4d", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Fix critical security vulnerability\n\nThis patch addresses a security issue in the authentication module.\n\nSigned-off-by: Charlie Developer \nReviewed-by: Alice Developer \nTested-by: David Tester ", + "body": "Add configuration file\n\nThis commit adds the main configuration file for the project.\n\nSigned-off-by: Bob Reviewer ", + "username": "bob@example.com", "attributes": { - "insertions": 3, + "insertions": 1, "timezone": "UTC+01:00", "deletions": 0, - "lines": 3, + "lines": 1, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "David Tester", + "displayName": "Bob Reviewer", "identities": [ { "platform": "git", - "value": "david@example.com", + "value": "bob@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "david@example.com", + "value": "bob@example.com", "type": "email", "verified": false } @@ -648,34 +666,35 @@ "segmentId": "test-segment-id" }, { - "type": "authored-commit", + "type": "signed-off-commit", "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "4fec97a2520ab0aae9454ef0c060d0f1dfc6aa4d", - "sourceParentId": "", + "sourceId": "ea096a751ec8751d06c82a861a2f769a105bb762", + "sourceParentId": "eacba0e12f931b0cc6b551f4680e41c024b02afe", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Add configuration file\n\nThis commit adds the main configuration file for the project.\n\nSigned-off-by: Bob Reviewer ", + "body": "Implement new feature together\n\nThis feature was developed in pair programming session.\n\nCo-authored-by: Eve Pair \nSigned-off-by: Frank Lead ", + "username": "frank@example.com", "attributes": { - "insertions": 1, + "insertions": 2, "timezone": "UTC+01:00", "deletions": 0, - "lines": 1, + "lines": 2, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Bob Reviewer", + "displayName": "Frank Lead", "identities": [ { "platform": "git", - "value": "bob@example.com", + "value": "frank@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "bob@example.com", + "value": "frank@example.com", "type": "email", "verified": false } @@ -684,34 +703,35 @@ "segmentId": "test-segment-id" }, { - "type": "signed-off-commit", + "type": "authored-commit", "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "e52dc2b2ec76720ce7c8233613d110da5378b014", - "sourceParentId": "4fec97a2520ab0aae9454ef0c060d0f1dfc6aa4d", + "sourceId": "eacba0e12f931b0cc6b551f4680e41c024b02afe", + "sourceParentId": "", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Add configuration file\n\nThis commit adds the main configuration file for the project.\n\nSigned-off-by: Bob Reviewer ", + "body": "Implement new feature together\n\nThis feature was developed in pair programming session.\n\nCo-authored-by: Eve Pair \nSigned-off-by: Frank Lead ", + "username": "frank@example.com", "attributes": { - "insertions": 1, + "insertions": 2, "timezone": "UTC+01:00", "deletions": 0, - "lines": 1, + "lines": 2, "isMerge": false, "isMainBranch": true }, "url": "https://github.com/test/repo.git", "member": { - "displayName": "Bob Reviewer", + "displayName": "Frank Lead", "identities": [ { "platform": "git", - "value": "bob@example.com", + "value": "frank@example.com", "type": "username", "verified": true }, { "platform": "git", - "value": "bob@example.com", + "value": "frank@example.com", "type": "email", "verified": false } @@ -720,13 +740,14 @@ "segmentId": "test-segment-id" }, { - "type": "authored-commit", + "type": "reviewed-commit", "timestamp": "2025-10-12T16:15:49+01:00", - "sourceId": "2aaf02cef5d7a0fd7f916eb5b64456c4e82a7e58", - "sourceParentId": "", + "sourceId": "eb1fb44cc5fb7cdaf526ef9268956b7158936887", + "sourceParentId": "c73621865a578ddcc419f976382ef701efa33da4", "platform": "git", "channel": "https://github.com/test/repo.git", - "body": "Initial commit - basic structure", + "body": "Fix critical security vulnerability\n\nThis patch addresses a security issue in the authentication module.\n\nSigned-off-by: Charlie Developer \nReviewed-by: Alice Developer \nTested-by: David Tester ", + "username": "alice@example.com", "attributes": { "insertions": 3, "timezone": "UTC+01:00", @@ -755,4 +776,4 @@ }, "segmentId": "test-segment-id" } -] +] \ No newline at end of file diff --git a/services/apps/git_integration/src/test/outputs/test-repo_expected.json b/services/apps/git_integration/src/test/outputs/test-repo_expected.json index b7a95182bd..8b677ac140 100644 --- a/services/apps/git_integration/src/test/outputs/test-repo_expected.json +++ b/services/apps/git_integration/src/test/outputs/test-repo_expected.json @@ -7,6 +7,7 @@ "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Refactor authentication module\n\nRemoved old code and added new implementation.\n\nReviewed-by: Alice Developer ", + "username": "alice@example.com", "attributes": { "insertions": 8, "timezone": "UTC+01:00", @@ -43,6 +44,7 @@ "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Quick fix for production issue", + "username": "kelly@example.com", "attributes": { "insertions": 1, "timezone": "UTC+01:00", @@ -79,6 +81,7 @@ "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Update documentation\n\nUpdated the README with new instructions.\n\nReviewed-By: No Email Here\nSigned-off-by: Henry Docs ", + "username": "henry@example.com", "attributes": { "insertions": 4, "timezone": "UTC+01:00", @@ -115,6 +118,7 @@ "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Initial commit - basic structure", + "username": "alice@example.com", "attributes": { "insertions": 3, "timezone": "UTC+01:00", @@ -151,6 +155,7 @@ "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Fix typo in config\n\nSigned-off-by: Ivan >\nReviewed-by: Jane <", + "username": "ivan@example.com", "attributes": { "insertions": 1, "timezone": "UTC+01:00", @@ -187,6 +192,7 @@ "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Merge pull request from external contributor\n\nThis commit was authored by a contributor but committed by a maintainer.\n\nSigned-off-by: Linda Contributor ", + "username": "mike@example.com", "attributes": { "insertions": 3, "timezone": "UTC+01:00", @@ -223,6 +229,7 @@ "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Refactor authentication module\n\nRemoved old code and added new implementation.\n\nReviewed-by: Alice Developer ", + "username": "grace@example.com", "attributes": { "insertions": 8, "timezone": "UTC+01:00", @@ -259,6 +266,7 @@ "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Add configuration file\n\nThis commit adds the main configuration file for the project.\n\nSigned-off-by: Bob Reviewer ", + "username": "bob@example.com", "attributes": { "insertions": 1, "timezone": "UTC+01:00", @@ -295,6 +303,7 @@ "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Fix critical security vulnerability\n\nThis patch addresses a security issue in the authentication module.\n\nSigned-off-by: Charlie Developer \nReviewed-by: Alice Developer \nTested-by: David Tester ", + "username": "charlie@example.com", "attributes": { "insertions": 3, "timezone": "UTC+01:00", @@ -331,6 +340,7 @@ "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Implement new feature together\n\nThis feature was developed in pair programming session.\n\nCo-authored-by: Eve Pair \nSigned-off-by: Frank Lead ", + "username": "eve@example.com", "attributes": { "insertions": 2, "timezone": "UTC+01:00", @@ -367,6 +377,7 @@ "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Fix typo in config\n\nSigned-off-by: Ivan >\nReviewed-by: Jane <", + "username": "jane@example.com", "attributes": { "insertions": 1, "timezone": "UTC+01:00", @@ -403,6 +414,7 @@ "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Update documentation\n\nUpdated the README with new instructions.\n\nReviewed-By: No Email Here\nSigned-off-by: Henry Docs ", + "username": "henry@example.com", "attributes": { "insertions": 4, "timezone": "UTC+01:00", @@ -439,6 +451,7 @@ "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Fix critical security vulnerability\n\nThis patch addresses a security issue in the authentication module.\n\nSigned-off-by: Charlie Developer \nReviewed-by: Alice Developer \nTested-by: David Tester ", + "username": "david@example.com", "attributes": { "insertions": 3, "timezone": "UTC+01:00", @@ -475,6 +488,7 @@ "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Merge pull request from external contributor\n\nThis commit was authored by a contributor but committed by a maintainer.\n\nSigned-off-by: Linda Contributor ", + "username": "linda@external.com", "attributes": { "insertions": 3, "timezone": "UTC+01:00", @@ -511,6 +525,7 @@ "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Fix typo in config\n\nSigned-off-by: Ivan >\nReviewed-by: Jane <", + "username": "ivan@example.com", "attributes": { "insertions": 1, "timezone": "UTC+01:00", @@ -547,6 +562,7 @@ "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Fix critical security vulnerability\n\nThis patch addresses a security issue in the authentication module.\n\nSigned-off-by: Charlie Developer \nReviewed-by: Alice Developer \nTested-by: David Tester ", + "username": "charlie@example.com", "attributes": { "insertions": 3, "timezone": "UTC+01:00", @@ -583,6 +599,7 @@ "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Merge pull request from external contributor\n\nThis commit was authored by a contributor but committed by a maintainer.\n\nSigned-off-by: Linda Contributor ", + "username": "linda@external.com", "attributes": { "insertions": 3, "timezone": "UTC+01:00", @@ -619,6 +636,7 @@ "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Add configuration file\n\nThis commit adds the main configuration file for the project.\n\nSigned-off-by: Bob Reviewer ", + "username": "bob@example.com", "attributes": { "insertions": 1, "timezone": "UTC+01:00", @@ -655,6 +673,7 @@ "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Implement new feature together\n\nThis feature was developed in pair programming session.\n\nCo-authored-by: Eve Pair \nSigned-off-by: Frank Lead ", + "username": "frank@example.com", "attributes": { "insertions": 2, "timezone": "UTC+01:00", @@ -691,6 +710,7 @@ "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Implement new feature together\n\nThis feature was developed in pair programming session.\n\nCo-authored-by: Eve Pair \nSigned-off-by: Frank Lead ", + "username": "frank@example.com", "attributes": { "insertions": 2, "timezone": "UTC+01:00", @@ -727,6 +747,7 @@ "platform": "git", "channel": "https://github.com/test/repo.git", "body": "Fix critical security vulnerability\n\nThis patch addresses a security issue in the authentication module.\n\nSigned-off-by: Charlie Developer \nReviewed-by: Alice Developer \nTested-by: David Tester ", + "username": "alice@example.com", "attributes": { "insertions": 3, "timezone": "UTC+01:00", @@ -755,4 +776,4 @@ }, "segmentId": "test-segment-id" } -] +] \ No newline at end of file diff --git a/services/apps/git_integration/src/test/test_activity_extraction.py b/services/apps/git_integration/src/test/test_activity_extraction.py index 718c724697..ad85fcf82b 100644 --- a/services/apps/git_integration/src/test/test_activity_extraction.py +++ b/services/apps/git_integration/src/test/test_activity_extraction.py @@ -102,15 +102,17 @@ def test_repository(): @pytest.fixture def batch_info(): """Create CloneBatchInfo for testing.""" + ensure_test_repo_exists() + head_hash = subprocess.check_output( + ["git", "rev-parse", "HEAD"], cwd=str(TEST_REPO_PATH), text=True + ).strip() return CloneBatchInfo( repo_path=str(TEST_REPO_PATH), remote="https://github.com/test/repo.git", is_first_batch=True, is_final_batch=True, clone_with_batches=False, - latest_commit_in_repo=None, # Will be set by service - prev_batch_edge_commit=None, - edge_commit=None, + latest_commit_in_repo=head_hash, ) @@ -144,17 +146,23 @@ async def mock_save_execution(execution): """Mock service execution save.""" pass - with patch( - "crowdgit.services.commit.commit_service.batch_insert_activities", mock_batch_insert - ): - with patch( + with ( + patch( + "crowdgit.services.commit.commit_service.batch_insert_activities", + mock_batch_insert, + ), + patch( "crowdgit.services.commit.commit_service.save_service_execution", mock_save_execution, - ): - # Process commits - await commit_service.process_batch_commits( - repository=test_repository, batch_info=batch_info - ) + ), + patch( + "crowdgit.services.commit.commit_service.update_last_processed_commit", + AsyncMock(), + ), + ): + await commit_service.process_batch_commits( + repository=test_repository, batch_info=batch_info + ) # Verify activities were extracted assert len(captured_activities_db) > 0, "No activities were extracted" @@ -269,16 +277,23 @@ async def mock_batch_insert(activities): async def mock_save_execution(execution): pass - with patch( - "crowdgit.services.commit.commit_service.batch_insert_activities", mock_batch_insert - ): - with patch( + with ( + patch( + "crowdgit.services.commit.commit_service.batch_insert_activities", + mock_batch_insert, + ), + patch( "crowdgit.services.commit.commit_service.save_service_execution", mock_save_execution, - ): - await commit_service.process_batch_commits( - repository=test_repository, batch_info=batch_info - ) + ), + patch( + "crowdgit.services.commit.commit_service.update_last_processed_commit", + AsyncMock(), + ), + ): + await commit_service.process_batch_commits( + repository=test_repository, batch_info=batch_info + ) # Extract activity types from DB records # DB format: (result_id, state, json_data, tenant_id, integration_id) From e4a59cd466e861c3860698b2e1d918b324208c1c Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Thu, 21 May 2026 13:32:55 +0100 Subject: [PATCH 4/4] fix: double yield Signed-off-by: Mouad BANI --- .../src/crowdgit/services/clone/clone_service.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py b/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py index 71e9e3e449..2705a4df27 100644 --- a/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py +++ b/services/apps/git_integration/src/crowdgit/services/clone/clone_service.py @@ -497,6 +497,10 @@ async def clone_batches_generator( await self._cleanup_working_directory(temp_repo_path) batch_info.is_first_batch = False + if batch_info.is_final_batch: + self.logger.info("Clone complete — no deepening needed, all commits available") + return + while not batch_info.is_final_batch: batch_start_time = time.time() await self._clone_next_batch(temp_repo_path, batch_depth, remote) @@ -506,14 +510,12 @@ async def clone_batches_generator( total_execution_time += round(time.time() - batch_start_time, 2) if not batch_info.is_final_batch: - # exponential deepen increment to speed up fetching batch_depth = min( batch_depth * BATCH_DEEPEN_MULTIPLIER, MAX_BATCH_DEEPEN, ) - if clone_with_batches: - yield batch_info + yield batch_info except Exception as e: # Handle both CrowdGitError and generic Exception