Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,6 @@ class CloneBatchInfo(BaseModel):
latest_commit_in_repo: str | None = Field(
None, description="Hash of the latest commit in repo"
)
edge_commit: str | None = Field(
default=None,
description="The oldest commit in the current batch, used to track progress during incremental processing.",
)
prev_batch_edge_commit: str | None = Field(
default=None,
description="The edge commit from the previous batch, used to track progress during incremental processing.",
)
clone_with_batches: bool = Field(
default=True, description="Whether repo is cloned with batches"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import tempfile
import time
from collections.abc import AsyncIterator
from datetime import datetime, timezone
from decimal import Decimal

import aiofiles
Expand All @@ -22,6 +23,7 @@
NetworkError,
RateLimitError,
RemoteServerError,
ReOnboardingRequiredError,
)
from crowdgit.models import CloneBatchInfo, Repository, ServiceExecution
from crowdgit.services.base.base_service import BaseService
Expand All @@ -31,6 +33,10 @@
get_repo_name,
run_shell_command,
)
from crowdgit.settings import (
STUCK_ONBOARDING_REPO_TIMEOUT_HOURS,
STUCK_RECURRENT_REPO_TIMEOUT_HOURS,
)

DEFAULT_STORAGE_OPTIMIZATION_THRESHOLD_MB = 10000
INITIAL_BATCH_DEEPEN = 50
Expand Down Expand Up @@ -70,19 +76,59 @@ async def _configure_global_git_client(self, path: str) -> None:
["git", "config", "--global", "fetch.recurseSubmodules", "false"], cwd=path
)

async def _check_if_final_batch(self, path: str, target_commit_hash: str | None) -> bool:
async def _is_shallow_clone(self, path: str) -> bool:
    """Report whether the git repository at ``path`` is a shallow clone.

    Runs ``git rev-parse --is-shallow-repository`` inside ``path`` and returns
    True when the command output contains the text ``true``.
    """
    shallow_query = ["git", "rev-parse", "--is-shallow-repository"]
    output = await run_shell_command(shallow_query, cwd=path)
    return "true" in output

def _is_timeout_reached(self, repository: Repository) -> bool:
    """Tell whether ``repository`` has been locked for longer than its allowed window.

    The elapsed time is measured from ``repository.locked_at`` (converted to UTC)
    until now. A repository without a ``last_processed_commit`` is compared against
    STUCK_ONBOARDING_REPO_TIMEOUT_HOURS; any other repository is compared against
    STUCK_RECURRENT_REPO_TIMEOUT_HOURS.

    Returns True, after logging a warning, when the elapsed hours are greater than
    or equal to the applicable limit; otherwise returns False.
    """
    # NOTE(review): assumes repository.locked_at is a datetime (not None) — confirm with caller.
    now_utc = datetime.now(timezone.utc)
    locked_at_utc = repository.locked_at.astimezone(timezone.utc)
    processing_duration_hours = (now_utc - locked_at_utc).total_seconds() / 3600

    if repository.last_processed_commit is None:
        # No processed commit yet: the onboarding limit applies.
        limit_hours = STUCK_ONBOARDING_REPO_TIMEOUT_HOURS
    else:
        limit_hours = STUCK_RECURRENT_REPO_TIMEOUT_HOURS

    if processing_duration_hours < limit_hours:
        return False

    self.logger.warning(
        f"Repo {repository.url} is stuck for {processing_duration_hours:.1f} hours — queuing for re-onboarding"
    )
    return True

async def _check_if_final_batch(self, path: str, repository: Repository) -> bool:
    """
    Check whether the current batch is the final one.

    Args:
        path: Working directory of the cloned repository.
        repository: Repository being processed; its ``last_processed_commit`` is
            the target commit that must be reachable for the batch to be final.

    Returns True when:
    - full history fetched (onboarding — no last_processed_commit to look for), or
    - all commits between last_processed_commit and HEAD are available (no shallow boundary in range).

    Returns False when more deepening is still needed.

    Raises ReOnboardingRequiredError when:
    - force push detected (full clone but last_processed_commit missing), or
    - processing timeout exceeded while still deepening.
    """
    is_shallow_clone = await self._is_shallow_clone(path)
    is_full_clone = not is_shallow_clone

    target_commit_hash = repository.last_processed_commit
    if not target_commit_hash:
        # Without a target commit the batch is final only once nothing is left to fetch.
        return is_full_clone

    if await self._has_required_commit_range(path, target_commit_hash):
        return True

    # The target commit range is not available at this point.
    # The timeout check runs first so its warning is logged even on a force push.
    timeout_reached = self._is_timeout_reached(repository)
    # Full history present but target still missing: the commit no longer exists upstream.
    force_push_detected = is_full_clone
    if timeout_reached or force_push_detected:
        raise ReOnboardingRequiredError()

    return False

async def _has_required_commit_range(self, path: str, target_commit_hash: str) -> bool:
try:
await run_shell_command(
["git", "rev-parse", "--verify", f"{target_commit_hash}^{{commit}}"], cwd=path
Expand Down Expand Up @@ -225,13 +271,14 @@ async def _update_batch_info(
self,
batch_info: CloneBatchInfo,
repo_path: str,
target_commit_hash: str | None,
repository: Repository,
clone_with_batches: bool,
) -> None:
"""Update batch info with repo path and final batch status.

For full clones (clone_with_batches=False): Marks as final batch immediately.
For batched clones (clone_with_batches=True): Checks if target commit reached or full history fetched.
For batched clones (clone_with_batches=True): Delegates to _check_if_final_batch
to determine if required commit range is available or timeout/force-push occurred.
"""
batch_info.repo_path = repo_path
batch_info.clone_with_batches = clone_with_batches
Expand All @@ -248,22 +295,7 @@ async def _update_batch_info(
batch_info.is_final_batch = True
return

batch_info.is_final_batch = await self._check_if_final_batch(repo_path, target_commit_hash)
batch_info.edge_commit = await self._get_edge_commit(repo_path)

async def _get_edge_commit(self, repo_path: str) -> str | None:
    """
    Return the edge commit of a shallow clone, or None when there is none.

    The boundary commits are obtained from ``_read_shallow_file`` (the .git/shallow
    file lists the boundary commit(s) when history is truncated, and is absent once
    the full history has been cloned). The first boundary entry is logged and
    returned as the edge commit.
    """
    shallow_boundaries = await self._read_shallow_file(repo_path)
    if shallow_boundaries:
        oldest_commit = shallow_boundaries[0]
        self.logger.info(f"Edge commit: {oldest_commit}")
        return oldest_commit
    # No boundaries reported: nothing to mark as the edge.
    return None
batch_info.is_final_batch = await self._check_if_final_batch(repo_path, repository)

async def _cleanup_temp_directory(self, temp_repo_path: str, repo_id: str) -> None:
"""
Expand Down Expand Up @@ -456,37 +488,35 @@ async def clone_batches_generator(
temp_repo_path, remote, repository.branch, repository.last_processed_commit
)
await self._update_batch_info(
batch_info, temp_repo_path, repository.last_processed_commit, clone_with_batches
batch_info, temp_repo_path, repository, clone_with_batches
)
batch_end_time = time.time()
total_execution_time += round(batch_end_time - batch_start_time, 2)
total_execution_time += round(time.time() - batch_start_time, 2)

yield batch_info
if working_dir_cleanup:
await self._cleanup_working_directory(temp_repo_path)

batch_info.is_first_batch = False
if batch_info.is_final_batch:
self.logger.info("Clone complete — no deepening needed, all commits available")
return

while not batch_info.is_final_batch:
batch_start_time = time.time()
batch_info.prev_batch_edge_commit = await self._get_edge_commit(temp_repo_path)
await self._clone_next_batch(temp_repo_path, batch_depth, remote)
await self._update_batch_info(
batch_info,
temp_repo_path,
repository.last_processed_commit,
clone_with_batches,
batch_info.is_final_batch = await self._check_if_final_batch(
temp_repo_path, repository
)
batch_end_time = time.time()
total_execution_time += round(batch_end_time - batch_start_time, 2)
total_execution_time += round(time.time() - batch_start_time, 2)

yield batch_info
if not batch_info.is_final_batch:
# exponential deepen increment to speed up fetching
batch_depth = min(
batch_depth * BATCH_DEEPEN_MULTIPLIER,
MAX_BATCH_DEEPEN,
)

yield batch_info

except Exception as e:
# Handle both CrowdGitError and generic Exception
execution_status = ExecutionStatus.FAILURE
Expand Down
Loading
Loading