Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ uv.lock

# Data and Logs
data/
data-dev/
logs/
logs-dev/
*.db
*.sqlite
*.sqlite3
Expand Down
21 changes: 21 additions & 0 deletions docker-compose.dev.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
services:
webui:
build: .
shm_size: "1gb"
ports:
- "16666:1455"
- "6081:6080"
environment:
- WEBUI_HOST=0.0.0.0
- WEBUI_PORT=1455
- DISPLAY=:99
- ENABLE_VNC=1
- VNC_PORT=5900
- NOVNC_PORT=6080
- DEBUG=0
- LOG_LEVEL=info
- WEBUI_ACCESS_PASSWORD=admin123
volumes:
- ./data-dev:/app/data
- ./logs-dev:/app/logs
restart: unless-stopped
70 changes: 70 additions & 0 deletions docs/reviews/CR-REGISTRATION-WAIT-STRATEGY-2026-04-02.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Registration Wait Strategy Review

## Scope

- Branch: `feature/registration-wait-strategy`
- Base: `upstream/main`
- Goal: add a configurable global wait strategy for batch registration and surface the active mode in settings and registration UI

## Verification

### Automated Tests

Command:

```bash
docker exec codex-console-dev-webui-1 python -m pytest \
tests/test_settings_registration_auto_fields.py \
tests/test_registration_wait_strategy.py -q
```

Result:

```text
...... [100%]
6 passed in 1.91s
```

### Runtime Checks

Command:

```bash
python3 - <<'PY'
import urllib.request
for url in ['http://127.0.0.1:16666/login', 'http://127.0.0.1:15555/login']:
with urllib.request.urlopen(url, timeout=10) as r:
print(url, r.status)
PY
```

Result:

```text
http://127.0.0.1:16666/login 200
http://127.0.0.1:15555/login 200
```

- Dev service confirmed on `16666`
- Formal service confirmed on `15555`
- Dev service rebuilt after the final UI color adjustment

### Review Check

Command:

```bash
coderabbit review --prompt-only --base upstream/main -t committed
```

Result:

```text
Review completed: No findings
```

## Conclusion

- No blocking findings found in code review
- Global wait strategy is persisted, consumed by pipeline scheduling, and visible in both settings and registration UI
- Formal service remained unaffected during dev verification
14 changes: 14 additions & 0 deletions src/config/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ class RoleTag(str, Enum):
CHILD = "child"


class RegistrationWaitStrategy(str, Enum):
"""批量注册等待策略"""
START = "start"
COMPLETION = "completion"


class PoolState(str, Enum):
"""账号池状态"""
TEAM_POOL = "team_pool"
Expand Down Expand Up @@ -84,6 +90,14 @@ def normalize_role_tag(value: str) -> str:
return RoleTag.NONE.value


def normalize_registration_wait_strategy(value: str) -> str:
"""标准化批量等待策略,未知值降级为 start。"""
text = str(value or "").strip().lower()
if text == RegistrationWaitStrategy.COMPLETION.value:
return RegistrationWaitStrategy.COMPLETION.value
return RegistrationWaitStrategy.START.value


def normalize_pool_state(value: str) -> str:
"""标准化池状态,未知值降级为 candidate_pool。"""
text = str(value or "").strip().lower()
Expand Down
8 changes: 8 additions & 0 deletions src/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,12 @@ class SettingDefinition:
category=SettingCategory.REGISTRATION,
description="注册间隔最大值(秒)"
),
"registration_wait_strategy": SettingDefinition(
db_key="registration.wait_strategy",
default_value="start",
category=SettingCategory.REGISTRATION,
description="批量注册等待策略(start=启动间隔, completion=完成间隔)"
),
"registration_entry_flow": SettingDefinition(
db_key="registration.entry_flow",
default_value="native",
Expand Down Expand Up @@ -585,6 +591,7 @@ class SettingDefinition:
"registration_default_password_length": int,
"registration_sleep_min": int,
"registration_sleep_max": int,
"registration_wait_strategy": str,
"registration_entry_flow": str,
"registration_auto_enabled": bool,
"registration_auto_check_interval": int,
Expand Down Expand Up @@ -874,6 +881,7 @@ def proxy_url(self) -> Optional[str]:
registration_default_password_length: int = 12
registration_sleep_min: int = 5
registration_sleep_max: int = 30
registration_wait_strategy: str = "start"
registration_entry_flow: str = "native"
registration_auto_enabled: bool = False
registration_auto_check_interval: int = 60
Expand Down
55 changes: 52 additions & 3 deletions src/web/routes/registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

from ...config.constants import (
RoleTag,
RegistrationWaitStrategy,
normalize_role_tag,
normalize_registration_wait_strategy,
role_tag_to_account_label,
)
from ...database import crud
Expand Down Expand Up @@ -43,6 +45,20 @@
batch_tasks: Dict[str, dict] = {}


def _wait_strategy_label(strategy: str) -> str:
normalized = normalize_registration_wait_strategy(strategy)
if normalized == RegistrationWaitStrategy.COMPLETION.value:
return "完成间隔"
return "启动间隔"


def _current_wait_strategy(settings: Optional[Settings] = None) -> str:
current_settings = settings or get_settings()
return normalize_registration_wait_strategy(
getattr(current_settings, "registration_wait_strategy", RegistrationWaitStrategy.START.value)
)


def _cancel_batch_tasks(batch_id: str) -> None:
batch = batch_tasks.get(batch_id)
if not batch:
Expand Down Expand Up @@ -968,6 +984,7 @@ async def run_batch_pipeline(
interval_min: int,
interval_max: int,
concurrency: int,
wait_strategy: str = RegistrationWaitStrategy.START.value,
auto_upload_cpa: bool = False,
cpa_service_ids: List[int] = None,
auto_upload_sub2api: bool = False,
Expand All @@ -986,7 +1003,11 @@ async def run_batch_pipeline(
semaphore = asyncio.Semaphore(concurrency)
counter_lock = asyncio.Lock()
running_tasks_list = []
add_batch_log(f"[系统] 流水线模式启动,并发数: {concurrency},总任务: {len(task_uuids)}")
launch_state = {"launched": 0}
normalized_wait_strategy = normalize_registration_wait_strategy(wait_strategy)
add_batch_log(
f"[系统] 流水线模式启动,并发数: {concurrency},总任务: {len(task_uuids)},等待策略: {_wait_strategy_label(normalized_wait_strategy)}"
)

async def _run_and_release(idx: int, uuid: str, pfx: str):
try:
Expand Down Expand Up @@ -1014,6 +1035,21 @@ async def _run_and_release(idx: int, uuid: str, pfx: str):
add_batch_log(f"{pfx} [失败] 注册失败: {t.error_message}")
update_batch_status(completed=new_completed, success=new_success, failed=new_failed)
finally:
should_delay_release = (
normalized_wait_strategy == RegistrationWaitStrategy.COMPLETION.value
and launch_state["launched"] < len(task_uuids)
and not task_manager.is_batch_cancelled(batch_id)
)
if should_delay_release:
wait_time = random.randint(interval_min, interval_max)
add_batch_log(f"{pfx} 完成后等待 {wait_time} 秒,再启动后续任务")
logger.info(
"批量任务 %s: 任务 %s 完成后等待 %s 秒再释放并发槽",
batch_id,
uuid,
wait_time,
)
await asyncio.sleep(wait_time)
semaphore.release()

try:
Expand All @@ -1035,9 +1071,15 @@ async def _run_and_release(idx: int, uuid: str, pfx: str):
add_batch_log(f"{prefix} 开始注册...")
t = asyncio.create_task(_run_and_release(i, task_uuid, prefix))
running_tasks_list.append(t)
launch_state["launched"] = i + 1

if i < len(task_uuids) - 1 and not task_manager.is_batch_cancelled(batch_id):
if (
normalized_wait_strategy == RegistrationWaitStrategy.START.value
and i < len(task_uuids) - 1
and not task_manager.is_batch_cancelled(batch_id)
):
wait_time = random.randint(interval_min, interval_max)
add_batch_log(f"[系统] 等待 {wait_time} 秒后启动下一个任务")
logger.info(f"批量任务 {batch_id}: 等待 {wait_time} 秒后启动下一个任务")
await asyncio.sleep(wait_time)

Expand Down Expand Up @@ -1087,6 +1129,7 @@ async def run_batch_registration(
interval_max: int,
concurrency: int = 1,
mode: str = "pipeline",
wait_strategy: str = RegistrationWaitStrategy.START.value,
auto_upload_cpa: bool = False,
cpa_service_ids: List[int] = None,
auto_upload_sub2api: bool = False,
Expand All @@ -1113,6 +1156,7 @@ async def run_batch_registration(
batch_id, task_uuids, email_service_type, proxy,
email_service_config, email_service_id,
interval_min, interval_max, concurrency,
wait_strategy=wait_strategy,
auto_upload_cpa=auto_upload_cpa, cpa_service_ids=cpa_service_ids,
auto_upload_sub2api=auto_upload_sub2api, sub2api_service_ids=sub2api_service_ids,
auto_upload_tm=auto_upload_tm, tm_service_ids=tm_service_ids,
Expand All @@ -1135,6 +1179,7 @@ async def run_auto_registration_batch(plan, settings: Settings) -> str:
interval_min = max(0, int(settings.registration_auto_interval_min))
interval_max = max(interval_min, int(settings.registration_auto_interval_max))
concurrency = max(1, int(settings.registration_auto_concurrency))
wait_strategy = _current_wait_strategy(settings)
email_service_id = int(settings.registration_auto_email_service_id or 0) or None
proxy = settings.registration_auto_proxy.strip() or None

Expand Down Expand Up @@ -1178,6 +1223,7 @@ async def run_auto_registration_batch(plan, settings: Settings) -> str:
interval_max=interval_max,
concurrency=concurrency,
mode=mode,
wait_strategy=wait_strategy,
auto_upload_cpa=True,
cpa_service_ids=[plan.cpa_service_id],
auto_upload_sub2api=False,
Expand Down Expand Up @@ -1339,6 +1385,7 @@ async def _start_batch_registration_internal(
request.interval_max,
request.concurrency,
request.mode,
_current_wait_strategy(),
request.auto_upload_cpa,
request.cpa_service_ids,
request.auto_upload_sub2api,
Expand Down Expand Up @@ -1432,6 +1479,7 @@ async def _start_outlook_batch_registration_internal(
request.interval_max,
request.concurrency,
request.mode,
_current_wait_strategy(),
request.auto_upload_cpa,
request.cpa_service_ids,
request.auto_upload_sub2api,
Expand Down Expand Up @@ -2084,6 +2132,7 @@ async def run_outlook_batch_registration(
interval_max: int,
concurrency: int = 1,
mode: str = "pipeline",
wait_strategy: str = RegistrationWaitStrategy.START.value,
auto_upload_cpa: bool = False,
cpa_service_ids: List[int] = None,
auto_upload_sub2api: bool = False,
Expand Down Expand Up @@ -2129,6 +2178,7 @@ async def run_outlook_batch_registration(
interval_max=interval_max,
concurrency=concurrency,
mode=mode,
wait_strategy=wait_strategy,
auto_upload_cpa=auto_upload_cpa,
cpa_service_ids=cpa_service_ids,
auto_upload_sub2api=auto_upload_sub2api,
Expand Down Expand Up @@ -2371,4 +2421,3 @@ async def delete_scheduled_registration_job(job_uuid: str):
raise HTTPException(status_code=400, detail="无法删除执行中的计划任务")
crud.delete_scheduled_registration_job(db, job_uuid)
return {'success': True, 'message': '计划任务已删除'}

12 changes: 12 additions & 0 deletions src/web/routes/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from pydantic import BaseModel

from ...config.settings import get_settings, update_settings
from ...config.constants import RegistrationWaitStrategy, normalize_registration_wait_strategy
from ...core.auto_registration import (
trigger_auto_registration_check,
update_auto_registration_state,
Expand Down Expand Up @@ -54,6 +55,7 @@ class RegistrationSettings(BaseModel):
default_password_length: int = 12
sleep_min: int = 5
sleep_max: int = 30
wait_strategy: str = RegistrationWaitStrategy.START.value
entry_flow: str = "native"
auto_enabled: bool = False
auto_check_interval: int = 60
Expand Down Expand Up @@ -99,6 +101,7 @@ async def get_all_settings():

entry_flow_raw = str(settings.registration_entry_flow or "native").strip().lower()
entry_flow = "abcard" if entry_flow_raw == "abcard" else "native"
wait_strategy = normalize_registration_wait_strategy(getattr(settings, "registration_wait_strategy", "start"))

return {
"proxy": {
Expand All @@ -120,6 +123,7 @@ async def get_all_settings():
"default_password_length": settings.registration_default_password_length,
"sleep_min": settings.registration_sleep_min,
"sleep_max": settings.registration_sleep_max,
"wait_strategy": wait_strategy,
"entry_flow": entry_flow,
"auto_enabled": settings.registration_auto_enabled,
"auto_check_interval": settings.registration_auto_check_interval,
Expand Down Expand Up @@ -310,13 +314,15 @@ async def get_registration_settings():

entry_flow_raw = str(settings.registration_entry_flow or "native").strip().lower()
entry_flow = "abcard" if entry_flow_raw == "abcard" else "native"
wait_strategy = normalize_registration_wait_strategy(getattr(settings, "registration_wait_strategy", "start"))

return {
"max_retries": settings.registration_max_retries,
"timeout": settings.registration_timeout,
"default_password_length": settings.registration_default_password_length,
"sleep_min": settings.registration_sleep_min,
"sleep_max": settings.registration_sleep_max,
"wait_strategy": wait_strategy,
"entry_flow": entry_flow,
"auto_enabled": settings.registration_auto_enabled,
"auto_check_interval": settings.registration_auto_check_interval,
Expand Down Expand Up @@ -344,6 +350,11 @@ async def update_registration_settings(request: RegistrationSettings):
if request.sleep_min < 1 or request.sleep_max < request.sleep_min:
raise HTTPException(status_code=400, detail="注册等待时间参数无效")

wait_strategy_raw = str(request.wait_strategy or RegistrationWaitStrategy.START.value).strip().lower()
if wait_strategy_raw not in {RegistrationWaitStrategy.START.value, RegistrationWaitStrategy.COMPLETION.value}:
raise HTTPException(status_code=400, detail="等待策略仅支持 start / completion")
wait_strategy = normalize_registration_wait_strategy(wait_strategy_raw)

flow_raw = (request.entry_flow or "native").strip().lower()
# 兼容旧前端历史值:outlook -> native(Outlook 邮箱会在运行时自动走 outlook 链路)。
flow = "native" if flow_raw == "outlook" else flow_raw
Expand Down Expand Up @@ -399,6 +410,7 @@ async def update_registration_settings(request: RegistrationSettings):
registration_default_password_length=request.default_password_length,
registration_sleep_min=request.sleep_min,
registration_sleep_max=request.sleep_max,
registration_wait_strategy=wait_strategy,
registration_entry_flow=flow,
registration_auto_enabled=request.auto_enabled,
registration_auto_check_interval=request.auto_check_interval,
Expand Down
Loading