chore: git integration with full clones (CM-1182)#4124
Conversation
Signed-off-by: Uroš Marolt <uros@marolt.me>
There was a problem hiding this comment.
Pull request overview
Switches the git integration worker from ephemeral shallow clones + deepen-loop batching to a persistent full clone on an FSS-backed PVC with incremental git fetch, fixing ~85% commit loss on merge-heavy repos. Bundled in are unrelated stuck-repo lock-heartbeat changes, a unified STUCK_REPO_TIMEOUT_HOURS, and supporting model/test/compose updates.
Changes:
- Clone path rewritten: stable on-disk path via
REPO_STORAGE_ROOT,_incremental_fetch/_wipe_and_reclone, single-yieldclone_batches_generator. - Commit processing collapsed to hash-streaming (
rev-list --reverse+ per-batchgit log --no-walk+ checkpoint after each hash batch); old shallow-edge/batched code paths removed. - Stuck-repo handling moved from time-based
ReOnboardingRequiredErrorto SQL lock heartbeat + unified timeout;CROWD_DB_PORTcast to int;load_env_varempty-string fallback; new EmptyRepo/Forbidden/Permission error mapping; Docker compose volume + working_dir change.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| services/apps/git_integration/src/crowdgit/services/clone/clone_service.py | Rewrite to persistent clone + incremental fetch with REPO_STORAGE_ROOT fallback. |
| services/apps/git_integration/src/crowdgit/services/commit/commit_service.py | Hash-streaming pipeline replaces shallow-edge batched git log; per-batch checkpoint. |
| services/apps/git_integration/src/crowdgit/worker/repository_worker.py | Lock-heartbeat loop, new error mapping, removal of stuck-repo timer logic. |
| services/apps/git_integration/src/crowdgit/database/crud.py | Stale-lock reclaim via timeout, update_lock_heartbeat, failed retry interval. |
| services/apps/git_integration/src/crowdgit/settings.py | New REPO_STORAGE_ROOT/heartbeat/timeout settings, int-cast port, empty-string fallback. |
| services/apps/git_integration/src/crowdgit/services/utils.py | ls-remote fallback now checks stdout non-empty. |
| services/apps/git_integration/src/crowdgit/models/clone_batch.py | Removed unused edge/clone_with_batches fields. |
| services/apps/git_integration/src/test/test_activity_extraction.py | Patched new update_last_processed_commit and trimmed CloneBatchInfo fields. |
| services/apps/git_integration/src/test/outputs/test-repo_expected.json | Updated fixtures to new output shape (adds username, new timestamps/SHAs). |
| services/apps/git_integration/src/test/outputs/test-repo_actual.json | Regenerated actual-output fixture matching expected. |
| scripts/services/git-integration.yaml | Adds git-integration-repos named volume + REPO_STORAGE_ROOT env. |
Comments suppressed due to low confidence (6)
services/apps/git_integration/src/crowdgit/services/clone/clone_service.py:225
shutil.rmtreeis synchronous and is invoked here directly inside async code paths (_wipe_and_reclone, the exception handler at line 340, and_cleanup_temp_directory_with_retrieswas previously the path for cleanups). For multi-GB persistent clones on shared FSS storage, this will block the event loop for many seconds, freezing the heartbeat loop and any other concurrent worker tasks. Consider running rmtree viaasyncio.to_thread(orloop.run_in_executor) to avoid stalling the event loop while wiping a corrupted/changed persistent clone.
async def _wipe_and_reclone(self, repo_path: str, remote: str) -> None:
"""Wipe an existing persistent clone and replace it with a fresh full clone."""
self.logger.info(f"Wiping and re-cloning {remote}")
shutil.rmtree(repo_path)
os.makedirs(repo_path)
await self._perform_full_clone(repo_path, remote)
services/apps/git_integration/src/crowdgit/services/commit/commit_service.py:269
- Checkpointing at
batch_hashes[-1](the newest commit in the hash slice produced bygit rev-list --reverse) is correct for linear histories but, on a branchy DAG, the next run's rangebatch_hashes[-1]..HEADwill reach back through any commits in earlier hash batches that are not ancestors ofbatch_hashes[-1]. Those commits will be re-fetched and re-processed (deduped downstream, but still wasted work, especially after a crash mid-repo). Consider usinggit merge-base(or rev-list^batch_hashes[-1]) semantics, or checkpointing the boundary hash that is guaranteed to be the topological predecessor of the unprocessed set.
for batch_num, i in enumerate(range(0, len(hashes), self.HASH_BATCH_SIZE), 1):
batch_hashes = hashes[i : i + self.HASH_BATCH_SIZE]
raw_commits = await self._fetch_commits_batch(repo_path, batch_hashes)
await self._process_activities_from_commits(raw_commits, batch_info, repository)
# Checkpoint at last hash in batch (oldest-first from rev-list = newest in batch).
# Next run resumes from batch_hashes[-1]..HEAD.
await update_last_processed_commit(
repo_id=repository.id,
commit_hash=batch_hashes[-1],
branch=branch,
)
self.logger.info(
f"Hash batch {batch_num}/{total_batches} checkpointed at {batch_hashes[-1]}"
)
services/apps/git_integration/src/crowdgit/services/clone/clone_service.py:202
os.walkover the entire.gitdirectory tree on every incremental run can be measurably expensive on large repos (e.g. linux/torvalds with hundreds of thousands of loose-object subdirs and packs). Lock files of interest live in well-known locations (.git/index.lock,.git/HEAD.lock,.git/refs/**/*.lock,.git/packed-refs.lock,.git/shallow.lock). Consider checking those specific paths rather than scanning the whole tree on every fetch.
git_dir = os.path.join(repo_path, ".git")
for dirpath, _, filenames in os.walk(git_dir):
for filename in filenames:
if filename.endswith(".lock"):
lockpath = os.path.join(dirpath, filename)
try:
os.remove(lockpath)
self.logger.warning(f"Removed stale lockfile: {lockpath}")
except OSError as e:
self.logger.warning(f"Failed to remove stale lockfile {lockpath}: {e}")
services/apps/git_integration/src/crowdgit/services/clone/clone_service.py:340
DiskSpaceErroris treated as transient and the persistent clone is kept on disk after a "no space left on device" failure. In practice disk-full almost always corresponds to a partially-written fetch/pack on the PVC that needs cleanup before the next run can succeed, so excluding it from the wipe set risks the repo getting stuck in a permanently-failing state. Consider treating disk-space errors as non-transient (or at least running agit gc/cleanup before the next attempt).
_TRANSIENT_ERRORS = (RateLimitError, NetworkError, RemoteServerError, DiskSpaceError, CommandTimeoutError)
if stable_path and os.path.isdir(os.path.join(stable_path, ".git")):
if not isinstance(e, _TRANSIENT_ERRORS):
self.logger.warning(f"Wiping persistent clone at {stable_path} after non-transient failure")
shutil.rmtree(stable_path, ignore_errors=True)
services/apps/git_integration/src/crowdgit/worker/repository_worker.py:263
- Removing
ReOnboardingRequiredErrorhandling and the_ensure_repo_not_stuckdetection means there is no longer any code path that transitions a repo toRepositoryState.PENDING_REONBOARD. With the new heartbeat-based stale-lock detection a repo that genuinely needs re-onboarding (e.g. force-pushed branch withlast_processed_commitno longer reachable from HEAD) will now loop forever: the heartbeat keeps the lock fresh, processing fails with a generic exception → state goes toFAILED, thenFAILED_RETRY_INTERVAL_HOURSlater it is re-acquired with the same stalelast_processed_commitand fails again. Worth confirming there is a separate mechanism that detects "last_processed_commit no longer reachable" and recovers, otherwise the previous PENDING_REONBOARD flow needs an equivalent in the new design.
logger.info("Incremental processing completed successfully")
processing_state = RepositoryState.COMPLETED
except EmptyRepoError:
logger.info(f"Repository {repository.url} is empty, marking as completed")
processing_state = RepositoryState.COMPLETED
except ParentRepoInvalidError as e:
logger.error(f"Parent repo validation failed: {repr(e)}")
processing_state = RepositoryState.REQUIRES_PARENT
except (RepoAuthRequiredError, ForbiddenError, RepoPermissionError) as e:
logger.error(f"Repository requires authentication: {repr(e)}")
processing_state = RepositoryState.AUTH_REQUIRED
except Exception as e:
logger.error(f"Processing failed with error: {repr(e)}")
processing_state = RepositoryState.FAILED
services/apps/git_integration/src/crowdgit/services/clone/clone_service.py:339
- Long line (>120 chars) — the tuple definition for
_TRANSIENT_ERRORSand the log message just below exceed the codebase's apparent line length convention seen elsewhere in this file. Consider wrapping for readability.
_TRANSIENT_ERRORS = (RateLimitError, NetworkError, RemoteServerError, DiskSpaceError, CommandTimeoutError)
if stable_path and os.path.isdir(os.path.join(stable_path, ".git")):
if not isinstance(e, _TRANSIENT_ERRORS):
self.logger.warning(f"Wiping persistent clone at {stable_path} after non-transient failure")
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Signed-off-by: Uroš Marolt <uros@marolt.me>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 12 comments.
Comments suppressed due to low confidence (2)
services/apps/git_integration/src/crowdgit/services/clone/clone_service.py:202
- Recursively walking the entire
.gitdirectory tree to delete every file whose name ends with.lockis too coarse. Git creates short-lived.lockfiles (e.g.index.lock,HEAD.lock,<refname>.lock, packed-refs.lock) under normal concurrent-operation contention; if for any reason a legitimate git subprocess is still running in this directory (e.g. an unkilled child from a previous async task, an external scanner, or a future code path that runs git in the background under the same flock) this loop will corrupt its in-progress operation. At minimum, scope this to a known whitelist of well-known stale lock paths (index.lock,HEAD.lock,config.lock,packed-refs.lock) and/or check file age before deleting.
git_dir = os.path.join(repo_path, ".git")
for dirpath, _, filenames in os.walk(git_dir):
for filename in filenames:
if filename.endswith(".lock"):
lockpath = os.path.join(dirpath, filename)
try:
os.remove(lockpath)
self.logger.warning(f"Removed stale lockfile: {lockpath}")
except OSError as e:
self.logger.warning(f"Failed to remove stale lockfile {lockpath}: {e}")
services/apps/git_integration/src/crowdgit/services/clone/clone_service.py:218
get_default_branch(in services/utils.py) returns its result without stripping trailing whitespace (return output[len(prefix):]). When the newly-added_incremental_fetchuses it via["git", "fetch", "--no-tags", "origin", default_branch]and["git", "reset", "--hard", f"origin/{default_branch}"], a trailing newline indefault_branchwill cause both commands to fail with an unknown-ref error and trigger the full-clone wipe path on every incremental run. Please confirmrun_shell_commandstrips trailing newlines, or call.strip()ondefault_branchbefore using it in the new fetch/reset commands.
default_branch = await get_default_branch(repo_path)
if default_branch == "*":
raise CommandExecutionError(
f"Cannot fetch {remote}: no remote tracking branch found (detached HEAD)",
returncode=1,
)
self.logger.info(f"Fetching {default_branch} from {remote}")
await run_shell_command(
["git", "fetch", "--no-tags", "origin", default_branch], cwd=repo_path
)
# Update working tree so file-scanning services (licensee, rg, vuln scanner) see
# current file content, not the state from the previous run.
await run_shell_command(
["git", "reset", "--hard", f"origin/{default_branch}"], cwd=repo_path
)
Signed-off-by: Uroš Marolt <uros@marolt.me>
Signed-off-by: Uroš Marolt <uros@marolt.me>
Signed-off-by: Uroš Marolt <uros@marolt.me>
Signed-off-by: Uroš Marolt <uros@marolt.me>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, have a team admin enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 42fd9f8. Configure here.
Signed-off-by: Uroš Marolt <uros@marolt.me>

Summary
Replace ephemeral shallow clone (
git clone --depth=1+--deepen=5) with persistent full clone on FSS-backed PVC + incrementalgit fetch. Fixes ~85% commit loss on merge-heavy repos (e.g.torvalds/linux) where second-parent objects from sub-maintainer merges sat outside the shallow boundary and were never walked. Steady-state cost drops (fetch ~10–60s vs reclone ~75s).Changes
clone_service.py: rewrite. Added_stable_repo_path(repo_id)(usesREPO_STORAGE_ROOTwhen set, else falls back totempfile.mkdtemp— backward compatible for local/Docker). Added_incremental_fetch.clone_batches_generatorcollapsed to single yield (is_final_batch=True); no batched-depth loop; normtreeinfinallywhen persistent path is used.commit_service.py:_execute_git_logsimplified to singlelast_processed_commit..origin/<branch>range. Batched-iteration / edge-commit branches now dead (slated for Phase 4 cleanup, CM-1187).models/clone_batch.py: trimmed unused fields.settings.py: newREPO_STORAGE_ROOT(required=False, default=None). Also unrelated stuck-repo fixes bundled here —STUCK_REPO_TIMEOUT_HOURS(replaces split onboarding/recurrent timeouts),LOCK_HEARTBEAT_INTERVAL_SEC,FAILED_RETRY_INTERVAL_HOURS,CROWD_DB_PORTcast toint,load_env_varempty-string fallback (os.getenv(key) or default).worker/repository_worker.py,database/crud.py,services/utils.py: supporting changes for new lock-heartbeat + single stuck-repo timeout.scripts/services/git-integration.yaml: Docker Compose — named volumegit-integration-reposmounted at/var/lib/crowdgit,REPO_STORAGE_ROOTenv,working_dirmoved to repo root.test_activity_extraction.pyupdated to new output shape.Concurrency safety: existing
SELECT … FOR UPDATE … SKIP LOCKEDper-repo lock already serializes access across all 5 replicas — shared RWX PVC safe without StatefulSet.Rollback: revert + redeploy previous image. Orphaned
.gitdirs on FSS are inert.Type of change
JIRA ticket
https://linuxfoundation.atlassian.net/browse/CM-1185 (parent epic: https://linuxfoundation.atlassian.net/browse/CM-1182)
Note
High Risk
High risk because it rewrites core repo cloning/commit-walking and lock acquisition behavior, affecting ingestion correctness and concurrency safety as well as persistent disk usage.
Overview
Switches git-integration from ephemeral cloning to persistent, full on-disk clones when
REPO_STORAGE_ROOTis set, using incrementalgit fetch+ hard reset on subsequent runs and OS-levelflockto avoid concurrent directory mutation.Repository locking is hardened: DB acquisition queries can now reclaim stale
lockedAtafterSTUCK_REPO_TIMEOUT_HOURS, a new worker heartbeat periodically refreshes the lock, and failed repos are retried on a shorterFAILED_RETRY_INTERVAL_HOURScadence.Commit processing is reworked to stream commits by hash batches (
rev-list+git log --no-walk) with per-batch checkpointing viaupdate_last_processed_commit, and branch-change handling now forces full-history processing.Supporting updates include new settings/env (
REPO_STORAGE_ROOT, heartbeat/timeout knobs), Docker Compose volume mounting for repo storage, renamingPermissionErrortoRepoPermissionErrorin error classification/handling, and test fixture updates to match the new extraction/checkpointing behavior.Reviewed by Cursor Bugbot for commit 53d87a2. Bugbot is set up for automated code reviews on this repo. Configure here.