diff --git a/docs/reviews/CR-ACCOUNT-BATCH-ACTIONS-2026-04-02.md b/docs/reviews/CR-ACCOUNT-BATCH-ACTIONS-2026-04-02.md new file mode 100644 index 00000000..bd0c9986 --- /dev/null +++ b/docs/reviews/CR-ACCOUNT-BATCH-ACTIONS-2026-04-02.md @@ -0,0 +1,95 @@ +# Account Batch Actions Review + +## Scope + +- Branch: `feature/account-batch-action-tooltips` +- Base: `upstream/main` +- Goal: + - fix the three broken batch action routes on the accounts page + - keep button labels stable in idle state + - replace native `title` hints with hover bubbles shown below the buttons + +## Verification + +### Static Check + +Command: + +```bash +python3 -m py_compile src/web/routes/accounts.py src/web/routes/payment.py +``` + +Result: + +```text +exit code 0 +``` + +### Runtime Check + +Isolated instance: + +- URL: `http://127.0.0.1:16667` +- Access password: `admin123` + +Command: + +```bash +python3 - <<'PY' +import urllib.parse, urllib.request, http.cookiejar, json +jar = http.cookiejar.CookieJar() +opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(jar)) +login_data = urllib.parse.urlencode({'password': 'admin123'}).encode() +login_req = urllib.request.Request('http://127.0.0.1:16667/login', data=login_data, method='POST') +login_req.add_header('Content-Type', 'application/x-www-form-urlencoded') +login_resp = opener.open(login_req, timeout=10) +print('login_status', login_resp.status) +accounts_resp = opener.open('http://127.0.0.1:16667/accounts', timeout=10) +print('accounts_status', accounts_resp.status) +for path, poll_prefix in [ + ('/api/accounts/batch-refresh/async', '/api/accounts/tasks/'), + ('/api/accounts/batch-validate/async', '/api/accounts/tasks/'), + ('/api/payment/accounts/batch-check-subscription/async', '/api/payment/ops/tasks/'), +]: + req = urllib.request.Request( + f'http://127.0.0.1:16667{path}', + data=json.dumps({'ids': [], 'select_all': True}).encode(), + method='POST', + headers={'Content-Type': 'application/json'}, + ) + resp = opener.open(req, timeout=20) + payload = json.loads(resp.read().decode() or '{}') + task_id = payload.get('id') or payload.get('task_id') + print(path, resp.status, task_id) + if task_id: + poll = opener.open(f'http://127.0.0.1:16667{poll_prefix}{task_id}', timeout=20) + poll_payload = json.loads(poll.read().decode() or '{}') + print(poll_prefix, poll.status, poll_payload.get('status')) +PY +``` + +Result: + +```text +login_status 200 +accounts_status 200 +/api/accounts/batch-refresh/async 200 accounts-batch-refresh-f0b2d40566ba +/api/accounts/tasks/ 200 running +/api/accounts/batch-validate/async 200 accounts-batch-validate-1d5627590eb7 +/api/accounts/tasks/ 200 completed +/api/payment/accounts/batch-check-subscription/async 200 payment-batch-check-subscription-227ec45d862f +/api/payment/ops/tasks/ 200 completed +``` + +### UI Check + +- Hovering `刷新Token` shows a custom bubble below the button +- Hovering `验证Token` shows a custom bubble below the button +- Hovering `检测订阅` shows a custom bubble below the button +- When selection count changes, these three buttons keep stable idle labels + +## Conclusion + +- The broken batch action routes are fixed on this branch +- Hover help now matches the requested interaction model +- No formal environment deployment was required for this review diff --git a/src/web/routes/accounts.py b/src/web/routes/accounts.py index 104e27a3..c731ae07 100644 --- a/src/web/routes/accounts.py +++ b/src/web/routes/accounts.py @@ -7,6 +7,8 @@ import logging import re import threading +import time +import uuid import zipfile import base64 from datetime import datetime, timedelta, timezone @@ -50,6 +52,449 @@ ) _QUICK_REFRESH_WORKFLOW_LOCK = threading.Lock() +_ACCOUNT_TASK_POLL_INTERVAL_SECONDS = 0.25 + + +def _account_task_id(task_type: str) -> str: + normalized = re.sub(r"[^a-z0-9]+", "-", str(task_type or "").strip().lower()).strip("-") or "task" + return f"accounts-{normalized}-{uuid.uuid4().hex[:12]}" + + +def _get_account_task_or_404(task_id: str) -> Dict[str, Any]: + snapshot = task_manager.get_domain_task("accounts", task_id) + if not snapshot: + raise HTTPException(status_code=404, detail="任务不存在") + return snapshot + + +def _wait_account_task_if_paused(task_id: str) -> bool: + while True: + snapshot = task_manager.get_domain_task("accounts", task_id) or {} + if bool(snapshot.get("cancel_requested")): + return False + if not bool(snapshot.get("pause_requested")): + return True + task_manager.update_domain_task( + "accounts", + task_id, + status="paused", + paused=True, + message="任务已暂停,等待继续", + ) + time.sleep(_ACCOUNT_TASK_POLL_INTERVAL_SECONDS) + + +def _finalize_account_async_task( + task_id: str, + *, + status: str, + message: str, + result: Dict[str, Any], + error: Optional[str] = None, +) -> None: + task_manager.update_domain_task( + "accounts", + task_id, + status=status, + paused=False, + pause_requested=False, + finished_at=datetime.utcnow().isoformat(), + message=message, + error=error, + result=result, + ) + task_manager.release_domain_slot("accounts", task_id) + + +def _submit_account_async_task(task_id: str, runner, payload: Dict[str, Any]) -> None: + try: + task_manager.executor.submit(runner, task_id, payload) + except Exception as exc: + logger.exception("提交 accounts 异步任务失败: task_id=%s error=%s", task_id, exc) + task_manager.update_domain_task( + "accounts", + task_id, + status="failed", + finished_at=datetime.utcnow().isoformat(), + message=f"任务提交失败: {exc}", + error=str(exc), + ) + raise HTTPException(status_code=500, detail="任务提交失败") from exc + + +def _run_batch_refresh_async(task_id: str, request_data: Dict[str, Any]) -> None: + acquired, running, quota = task_manager.try_acquire_domain_slot("accounts", task_id) + if not acquired: + reason = f"并发配额已满(running={running}, quota={quota})" + task_manager.update_domain_task( + "accounts", + task_id, + status="failed", + finished_at=datetime.utcnow().isoformat(), + message=reason, + error=reason, + ) + return + + request = BatchRefreshRequest(**request_data) + proxy = _get_proxy(request.proxy) + result = {"success_count": 0, "failed_count": 0, "errors": [], "details": []} + + try: + with get_db() as db: + ids = resolve_account_ids( + db, request.ids, request.select_all, + request.status_filter, request.email_service_filter, request.search_filter + ) + + total = len(ids) + task_manager.update_domain_task( + "accounts", + task_id, + status="running", + started_at=datetime.utcnow().isoformat(), + paused=False, + message="批量刷新 Token 执行中", + progress={"completed": 0, "total": total}, + ) + + for index, account_id in enumerate(ids, start=1): + if task_manager.is_domain_task_cancel_requested("accounts", task_id): + _finalize_account_async_task( + task_id, + status="cancelled", + message="批量刷新已取消", + result=result, + ) + return + if not _wait_account_task_if_paused(task_id): + _finalize_account_async_task( + task_id, + status="cancelled", + message="批量刷新已取消", + result=result, + ) + return + + task_manager.update_domain_task( + "accounts", + task_id, + status="running", + paused=False, + message=f"正在刷新第 {index}/{total} 个账号", + ) + + detail: Dict[str, Any] = {"id": account_id, "success": False} + try: + refresh_result = do_refresh(account_id, proxy) + if refresh_result.success: + result["success_count"] += 1 + detail["success"] = True + detail["status"] = AccountStatus.ACTIVE.value + else: + result["failed_count"] += 1 + detail["error"] = refresh_result.error_message + detail["status"] = AccountStatus.FAILED.value + result["errors"].append({"id": account_id, "error": refresh_result.error_message}) + except Exception as exc: + result["failed_count"] += 1 + detail["error"] = str(exc) + detail["status"] = AccountStatus.FAILED.value + result["errors"].append({"id": account_id, "error": str(exc)}) + + result["details"].append(detail) + task_manager.append_domain_task_detail("accounts", task_id, detail) + task_manager.set_domain_task_progress("accounts", task_id, completed=index, total=total) + + _finalize_account_async_task( + task_id, + status="completed", + message=f"批量刷新完成:成功 {result['success_count']},失败 {result['failed_count']}", + result=result, + ) + except Exception as exc: + logger.exception("批量刷新异步任务失败: task_id=%s error=%s", task_id, exc) + _finalize_account_async_task( + task_id, + status="failed", + message=f"批量刷新异常: {exc}", + result=result, + error=str(exc), + ) + + +def _run_batch_validate_async(task_id: str, request_data: Dict[str, Any]) -> None: + acquired, running, quota = task_manager.try_acquire_domain_slot("accounts", task_id) + if not acquired: + reason = f"并发配额已满(running={running}, quota={quota})" + task_manager.update_domain_task( + "accounts", + task_id, + status="failed", + finished_at=datetime.utcnow().isoformat(), + message=reason, + error=reason, + ) + return + + request = BatchValidateRequest(**request_data) + proxy = _get_proxy(request.proxy) + result = {"valid_count": 0, "invalid_count": 0, "details": []} + + try: + with get_db() as db: + ids = resolve_account_ids( + db, request.ids, request.select_all, + request.status_filter, request.email_service_filter, request.search_filter + ) + + total = len(ids) + task_manager.update_domain_task( + "accounts", + task_id, + status="running", + started_at=datetime.utcnow().isoformat(), + paused=False, + message="批量验证 Token 执行中", + progress={"completed": 0, "total": total}, + ) + + for index, account_id in enumerate(ids, start=1): + if task_manager.is_domain_task_cancel_requested("accounts", task_id): + _finalize_account_async_task( + task_id, + status="cancelled", + message="批量验证已取消", + result=result, + ) + return + if not _wait_account_task_if_paused(task_id): + _finalize_account_async_task( + task_id, + status="cancelled", + message="批量验证已取消", + result=result, + ) + return + + task_manager.update_domain_task( + "accounts", + task_id, + status="running", + paused=False, + message=f"正在验证第 {index}/{total} 个账号", + ) + + detail: Dict[str, Any] = {"id": account_id, "valid": False, "status": AccountStatus.FAILED.value} + try: + is_valid, error = do_validate(account_id, proxy) + detail["valid"] = bool(is_valid) + detail["error"] = error + detail["status"] = AccountStatus.ACTIVE.value if is_valid else AccountStatus.FAILED.value + if is_valid: + result["valid_count"] += 1 + else: + result["invalid_count"] += 1 + except Exception as exc: + try: + with get_db() as db: + account = crud.get_account_by_id(db, account_id) + if account and account.status != AccountStatus.FAILED.value: + crud.update_account(db, account_id, status=AccountStatus.FAILED.value) + except Exception: + logger.debug("异步验证写回 failed 状态失败: account_id=%s", account_id, exc_info=True) + detail["error"] = str(exc) + result["invalid_count"] += 1 + + result["details"].append(detail) + task_manager.append_domain_task_detail("accounts", task_id, detail) + task_manager.set_domain_task_progress("accounts", task_id, completed=index, total=total) + + _finalize_account_async_task( + task_id, + status="completed", + message=f"批量验证完成:有效 {result['valid_count']},无效 {result['invalid_count']}", + result=result, + ) + except Exception as exc: + logger.exception("批量验证异步任务失败: task_id=%s error=%s", task_id, exc) + _finalize_account_async_task( + task_id, + status="failed", + message=f"批量验证异常: {exc}", + result=result, + error=str(exc), + ) + + +def _resolve_overview_account_ids(request: "OverviewRefreshRequest") -> List[int]: + with get_db() as db: + ids = resolve_account_ids( + db, + request.ids, + request.select_all, + request.status_filter, + request.email_service_filter, + request.search_filter, + ) + if ids: + return ids + candidates = db.query(Account).filter( + func.lower(Account.subscription_type).in_(PAID_SUBSCRIPTION_TYPES) + ).order_by(Account.created_at.desc()).all() + return [acc.id for acc in candidates if not _is_overview_card_removed(acc)] + + +def _run_overview_refresh_async(task_id: str, request_data: Dict[str, Any]) -> None: + acquired, running, quota = task_manager.try_acquire_domain_slot("accounts", task_id) + if not acquired: + reason = f"并发配额已满(running={running}, quota={quota})" + task_manager.update_domain_task( + "accounts", + task_id, + status="failed", + finished_at=datetime.utcnow().isoformat(), + message=reason, + error=reason, + ) + return + + request = OverviewRefreshRequest(**request_data) + proxy = _get_proxy(request.proxy) + result = {"success_count": 0, "failed_count": 0, "details": []} + + try: + ids = _resolve_overview_account_ids(request) + total = len(ids) + task_manager.update_domain_task( + "accounts", + task_id, + status="running", + started_at=datetime.utcnow().isoformat(), + paused=False, + message="账号总览刷新执行中", + progress={"completed": 0, "total": total}, + ) + + with get_db() as db: + for index, account_id in enumerate(ids, start=1): + if task_manager.is_domain_task_cancel_requested("accounts", task_id): + _finalize_account_async_task( + task_id, + status="cancelled", + message="账号总览刷新已取消", + result=result, + ) + return + if not _wait_account_task_if_paused(task_id): + _finalize_account_async_task( + task_id, + status="cancelled", + message="账号总览刷新已取消", + result=result, + ) + return + + task_manager.update_domain_task( + "accounts", + task_id, + status="running", + paused=False, + message=f"正在刷新第 {index}/{total} 个总览", + ) + + account = crud.get_account_by_id(db, account_id) + detail: Dict[str, Any] = {"id": account_id, "success": False} + if not account: + result["failed_count"] += 1 + detail["error"] = "账号不存在" + elif (not _is_paid_subscription(account.subscription_type)) or _is_overview_card_removed(account): + detail["error"] = "账号不在 Codex 卡片范围内,已跳过" + else: + account_proxy = (account.proxy_used or "").strip() or proxy + overview, updated = _get_account_overview_data( + db, + account, + force_refresh=request.force, + proxy=account_proxy, + allow_network=True, + ) + if updated: + db.commit() + if overview.get("hourly_quota", {}).get("status") == "unknown" and overview.get("weekly_quota", {}).get("status") == "unknown": + result["failed_count"] += 1 + detail["error"] = overview.get("error") or "未获取到配额数据" + else: + result["success_count"] += 1 + detail["success"] = True + detail["plan_type"] = overview.get("plan_type") + + result["details"].append(detail) + task_manager.append_domain_task_detail("accounts", task_id, detail) + task_manager.set_domain_task_progress("accounts", task_id, completed=index, total=total) + + _finalize_account_async_task( + task_id, + status="completed", + message=f"账号总览刷新完成:成功 {result['success_count']},失败 {result['failed_count']}", + result=result, + ) + except Exception as exc: + logger.exception("账号总览异步刷新失败: task_id=%s error=%s", task_id, exc) + _finalize_account_async_task( + task_id, + status="failed", + message=f"账号总览刷新异常: {exc}", + result=result, + error=str(exc), + ) + + +def cancel_account_async_task(task_id: str) -> Dict[str, Any]: + _get_account_task_or_404(task_id) + snapshot = task_manager.request_domain_task_cancel("accounts", task_id) + return { + "success": True, + "task_id": task_id, + "status": "cancelling", + "task": snapshot, + } + + +def pause_account_async_task(task_id: str) -> Dict[str, Any]: + _get_account_task_or_404(task_id) + snapshot = task_manager.request_domain_task_pause("accounts", task_id) + return { + "success": True, + "task_id": task_id, + "status": "paused", + "task": snapshot, + } + + +def resume_account_async_task(task_id: str) -> Dict[str, Any]: + _get_account_task_or_404(task_id) + snapshot = task_manager.request_domain_task_resume("accounts", task_id) + return { + "success": True, + "task_id": task_id, + "status": "running", + "task": snapshot, + } + + +def retry_account_async_task(task_id: str) -> Dict[str, Any]: + snapshot = _get_account_task_or_404(task_id) + payload = dict(snapshot.get("payload") or {}) + task_type = str(snapshot.get("task_type") or "").strip().lower() + + if task_type == "batch_refresh": + return create_batch_refresh_async_task(BatchRefreshRequest(**payload)) + if task_type == "batch_validate": + return create_batch_validate_async_task(BatchValidateRequest(**payload)) + if task_type == "overview_refresh": + return create_overview_refresh_async_task(OverviewRefreshRequest(**payload)) + raise HTTPException(status_code=400, detail="当前任务类型不支持重试") def _is_retryable_validate_error(error_message: Optional[str]) -> bool: @@ -1340,6 +1785,23 @@ async def refresh_accounts_overview(request: OverviewRefreshRequest): return result +@router.post("/overview/refresh/async") +def create_overview_refresh_async_task(request: OverviewRefreshRequest): + payload = request.model_dump() + ids = _resolve_overview_account_ids(request) + task_id = _account_task_id("overview-refresh") + snapshot = task_manager.register_domain_task( + domain="accounts", + task_id=task_id, + task_type="overview_refresh", + payload=payload, + progress={"completed": 0, "total": len(ids)}, + max_retries=3, + ) + _submit_account_async_task(task_id, _run_overview_refresh_async, payload) + return snapshot + + @router.get("/current") async def get_current_account(): """获取当前已切换的账号""" @@ -1948,6 +2410,28 @@ async def batch_refresh_tokens(request: BatchRefreshRequest, background_tasks: B return results +@router.post("/batch-refresh/async") +def create_batch_refresh_async_task(request: BatchRefreshRequest): + payload = request.model_dump() + with get_db() as db: + ids = resolve_account_ids( + db, request.ids, request.select_all, + request.status_filter, request.email_service_filter, request.search_filter + ) + + task_id = _account_task_id("batch-refresh") + snapshot = task_manager.register_domain_task( + domain="accounts", + task_id=task_id, + task_type="batch_refresh", + payload=payload, + progress={"completed": 0, "total": len(ids)}, + max_retries=3, + ) + _submit_account_async_task(task_id, _run_batch_refresh_async, payload) + return snapshot + + @router.post("/{account_id}/refresh") async def refresh_account_token(account_id: int, request: Optional[TokenRefreshRequest] = Body(default=None)): """刷新单个账号的 Token""" @@ -2020,6 +2504,28 @@ async def batch_validate_tokens(request: BatchValidateRequest): return _run_batch_validate_tokens(request) +@router.post("/batch-validate/async") +def create_batch_validate_async_task(request: BatchValidateRequest): + payload = request.model_dump() + with get_db() as db: + ids = resolve_account_ids( + db, request.ids, request.select_all, + request.status_filter, request.email_service_filter, request.search_filter + ) + + task_id = _account_task_id("batch-validate") + snapshot = task_manager.register_domain_task( + domain="accounts", + task_id=task_id, + task_type="batch_validate", + payload=payload, + progress={"completed": 0, "total": len(ids)}, + max_retries=3, + ) + _submit_account_async_task(task_id, _run_batch_validate_async, payload) + return snapshot + + def run_quick_refresh_workflow(source: str = "manual") -> Dict[str, Any]: if not _QUICK_REFRESH_WORKFLOW_LOCK.acquire(blocking=False): raise RuntimeError("quick_refresh_workflow_busy") @@ -2097,6 +2603,11 @@ async def validate_account_token(account_id: int, request: Optional[TokenValidat } +@router.get("/tasks/{task_id}") +def get_account_async_task(task_id: str): + return _get_account_task_or_404(task_id) + + # ============== CPA 上传相关 ============== class CPAUploadRequest(BaseModel): diff --git a/src/web/routes/payment.py b/src/web/routes/payment.py index c1dbbcd1..d7d56f5b 100644 --- a/src/web/routes/payment.py +++ b/src/web/routes/payment.py @@ -6,7 +6,7 @@ import os import re import uuid -from typing import Optional, List +from typing import Optional, List, Dict, Any from datetime import datetime import time from urllib.parse import urlparse, urlunparse @@ -36,6 +36,7 @@ from ...core.openai.random_billing import generate_random_billing_profile from ...core.openai.token_refresh import TokenRefreshManager from ...core.dynamic_proxy import get_proxy_url_for_task +from ..task_manager import task_manager logger = logging.getLogger(__name__) router = APIRouter() @@ -64,6 +65,74 @@ "country, region, or territory not supported", "request_forbidden", ) +_PAYMENT_TASK_POLL_INTERVAL_SECONDS = 0.25 + + +def _payment_task_id(task_type: str) -> str: + normalized = re.sub(r"[^a-z0-9]+", "-", str(task_type or "").strip().lower()).strip("-") or "task" + return f"payment-{normalized}-{uuid.uuid4().hex[:12]}" + + +def _get_payment_task_or_404(task_id: str) -> Dict[str, Any]: + snapshot = task_manager.get_domain_task("payment", task_id) + if not snapshot: + raise HTTPException(status_code=404, detail="任务不存在") + return snapshot + + +def _wait_payment_task_if_paused(task_id: str) -> bool: + while True: + snapshot = task_manager.get_domain_task("payment", task_id) or {} + if bool(snapshot.get("cancel_requested")): + return False + if not bool(snapshot.get("pause_requested")): + return True + task_manager.update_domain_task( + "payment", + task_id, + status="paused", + paused=True, + message="任务已暂停,等待继续", + ) + time.sleep(_PAYMENT_TASK_POLL_INTERVAL_SECONDS) + + +def _finalize_payment_async_task( + task_id: str, + *, + status: str, + message: str, + result: Dict[str, Any], + error: Optional[str] = None, +) -> None: + task_manager.update_domain_task( + "payment", + task_id, + status=status, + paused=False, + pause_requested=False, + finished_at=datetime.utcnow().isoformat(), + message=message, + error=error, + result=result, + ) + task_manager.release_domain_slot("payment", task_id) + + +def _submit_payment_async_task(task_id: str, runner, payload: Dict[str, Any]) -> None: + try: + task_manager.executor.submit(runner, task_id, payload) + except Exception as exc: + logger.exception("提交 payment 异步任务失败: task_id=%s error=%s", task_id, exc) + task_manager.update_domain_task( + "payment", + task_id, + status="failed", + finished_at=datetime.utcnow().isoformat(), + message=f"任务提交失败: {exc}", + error=str(exc), + ) + raise HTTPException(status_code=500, detail="任务提交失败") from exc def _is_retryable_subscription_check_error(error_message: Optional[str]) -> bool: @@ -3261,6 +3330,177 @@ def delete_bind_card_task(task_id: int): # ============== 订阅状态 ============== + +def _run_batch_check_subscription_async(task_id: str, request_data: Dict[str, Any]) -> None: + acquired, running, quota = task_manager.try_acquire_domain_slot("payment", task_id) + if not acquired: + reason = f"并发配额已满(running={running}, quota={quota})" + task_manager.update_domain_task( + "payment", + task_id, + status="failed", + finished_at=datetime.utcnow().isoformat(), + message=reason, + error=reason, + ) + return + + request = BatchCheckSubscriptionRequest(**request_data) + explicit_proxy = _normalize_proxy_value(request.proxy) + result = {"success_count": 0, "failed_count": 0, "details": []} + + try: + with get_db() as db: + ids = resolve_account_ids( + db, request.ids, request.select_all, + request.status_filter, request.email_service_filter, request.search_filter + ) + + total = len(ids) + task_manager.update_domain_task( + "payment", + task_id, + status="running", + started_at=datetime.utcnow().isoformat(), + paused=False, + message="批量检测订阅执行中", + progress={"completed": 0, "total": total}, + ) + + with get_db() as db: + for index, account_id in enumerate(ids, start=1): + if task_manager.is_domain_task_cancel_requested("payment", task_id): + _finalize_payment_async_task( + task_id, + status="cancelled", + message="批量检测订阅已取消", + result=result, + ) + return + if not _wait_payment_task_if_paused(task_id): + _finalize_payment_async_task( + task_id, + status="cancelled", + message="批量检测订阅已取消", + result=result, + ) + return + + task_manager.update_domain_task( + "payment", + task_id, + status="running", + paused=False, + message=f"正在检测第 {index}/{total} 个账号的订阅", + ) + + account = db.query(Account).filter(Account.id == account_id).first() + detail: Dict[str, Any] = {"id": account_id, "email": None, "success": False} + if not account: + result["failed_count"] += 1 + detail["error"] = "账号不存在" + else: + detail["email"] = account.email + try: + runtime_proxy = _resolve_runtime_proxy(explicit_proxy, account) + subscription_detail, refreshed = _check_subscription_detail_with_retry( + db=db, + account=account, + proxy=runtime_proxy, + allow_token_refresh=True, + ) + status = str(subscription_detail.get("status") or "free").lower() + confidence = str(subscription_detail.get("confidence") or "low").lower() + + if status in ("plus", "team"): + account.subscription_type = status + account.subscription_at = utcnow_naive() + elif status == "free" and confidence == "high": + account.subscription_type = None + account.subscription_at = None + + db.commit() + result["success_count"] += 1 + detail.update( + { + "success": True, + "subscription_type": status, + "confidence": confidence, + "source": subscription_detail.get("source"), + "token_refreshed": refreshed, + } + ) + except Exception as exc: + result["failed_count"] += 1 + detail["error"] = str(exc) + + result["details"].append(detail) + task_manager.append_domain_task_detail("payment", task_id, detail) + task_manager.set_domain_task_progress("payment", task_id, completed=index, total=total) + + _finalize_payment_async_task( + task_id, + status="completed", + message=f"批量检测订阅完成:成功 {result['success_count']},失败 {result['failed_count']}", + result=result, + ) + except Exception as exc: + logger.exception("批量检测订阅异步任务失败: task_id=%s error=%s", task_id, exc) + _finalize_payment_async_task( + task_id, + status="failed", + message=f"批量检测订阅异常: {exc}", + result=result, + error=str(exc), + ) + + +def get_payment_op_task(task_id: str) -> Dict[str, Any]: + return _get_payment_task_or_404(task_id) + + +def cancel_payment_op_task(task_id: str) -> Dict[str, Any]: + _get_payment_task_or_404(task_id) + snapshot = task_manager.request_domain_task_cancel("payment", task_id) + return { + "success": True, + "task_id": task_id, + "status": "cancelling", + "task": snapshot, + } + + +def pause_payment_op_task(task_id: str) -> Dict[str, Any]: + _get_payment_task_or_404(task_id) + snapshot = task_manager.request_domain_task_pause("payment", task_id) + return { + "success": True, + "task_id": task_id, + "status": "paused", + "task": snapshot, + } + + +def resume_payment_op_task(task_id: str) -> Dict[str, Any]: + _get_payment_task_or_404(task_id) + snapshot = task_manager.request_domain_task_resume("payment", task_id) + return { + "success": True, + "task_id": task_id, + "status": "running", + "task": snapshot, + } + + +def retry_payment_op_task(task_id: str) -> Dict[str, Any]: + snapshot = _get_payment_task_or_404(task_id) + payload = dict(snapshot.get("payload") or {}) + task_type = str(snapshot.get("task_type") or "").strip().lower() + if task_type == "batch_check_subscription": + return create_batch_check_subscription_async_task(BatchCheckSubscriptionRequest(**payload)) + raise HTTPException(status_code=400, detail="当前任务类型不支持重试") + + @router.post("/accounts/batch-check-subscription") def batch_check_subscription(request: BatchCheckSubscriptionRequest): """批量检测账号订阅状态""" @@ -3322,6 +3562,33 @@ def batch_check_subscription(request: BatchCheckSubscriptionRequest): return results +@router.post("/accounts/batch-check-subscription/async") +def create_batch_check_subscription_async_task(request: BatchCheckSubscriptionRequest): + payload = request.model_dump() + with get_db() as db: + ids = resolve_account_ids( + db, request.ids, request.select_all, + request.status_filter, request.email_service_filter, request.search_filter + ) + + task_id = _payment_task_id("batch-check-subscription") + snapshot = task_manager.register_domain_task( + domain="payment", + task_id=task_id, + task_type="batch_check_subscription", + payload=payload, + progress={"completed": 0, "total": len(ids)}, + max_retries=3, + ) + _submit_payment_async_task(task_id, _run_batch_check_subscription_async, payload) + return snapshot + + +@router.get("/ops/tasks/{task_id}") +def get_payment_operation_task(task_id: str): + return get_payment_op_task(task_id) + + @router.post("/accounts/{account_id}/mark-subscription") def mark_subscription(account_id: int, request: MarkSubscriptionRequest): """手动标记账号订阅类型""" diff --git a/static/css/style.css b/static/css/style.css index 9fc6ad43..0695686d 100644 --- a/static/css/style.css +++ b/static/css/style.css @@ -1064,6 +1064,66 @@ body { gap: var(--spacing-sm); } +.hover-help { + position: relative; + display: inline-flex; + align-items: center; +} + +.hover-help-bubble { + position: absolute; + top: calc(100% + 8px); + left: 50%; + z-index: 120; + width: 260px; + padding: 10px 12px; + border: 1px solid var(--border); + border-radius: var(--radius); + background: var(--surface); + box-shadow: var(--shadow-lg); + color: var(--text-primary); + font-size: 0.75rem; + line-height: 1.5; + opacity: 0; + visibility: hidden; + transform: translateX(-50%) translateY(-4px); + transition: opacity var(--transition), transform var(--transition), visibility var(--transition); + pointer-events: none; +} + +.hover-help-bubble::before { + content: ''; + position: absolute; + top: -6px; + left: 50%; + width: 10px; + height: 10px; + background: var(--surface); + border-top: 1px solid var(--border); + border-left: 1px solid var(--border); + transform: translateX(-50%) rotate(45deg); +} + +.hover-help-bubble strong { + display: block; + margin-bottom: 4px; + color: var(--text-primary); + font-size: 0.8125rem; + font-weight: 600; +} + +.hover-help-bubble span { + display: block; + color: var(--text-secondary); +} + +.hover-help:hover .hover-help-bubble, +.hover-help:focus-within .hover-help-bubble { + opacity: 1; + visibility: visible; + transform: translateX(-50%) translateY(0); +} + .form-select, .form-input { padding: 8px 12px; diff --git a/static/js/accounts.js b/static/js/accounts.js index e681e213..150c7ffb 100644 --- a/static/js/accounts.js +++ b/static/js/accounts.js @@ -1037,10 +1037,10 @@ function updateBatchButtons() { } elements.batchDeleteBtn.textContent = count > 0 ? `🗑️ 删除 (${count})` : '🗑️ 批量删除'; - elements.batchRefreshBtn.textContent = count > 0 ? `🔄 刷新 (${count})` : '🔄 刷新Token'; - elements.batchValidateBtn.textContent = count > 0 ? `✅ 验证 (${count})` : '✅ 验证Token'; + elements.batchRefreshBtn.textContent = '🔄 刷新Token'; + elements.batchValidateBtn.textContent = '✅ 验证Token'; elements.batchUploadBtn.textContent = count > 0 ? `☁️ 上传 (${count})` : '☁️ 上传'; - elements.batchCheckSubBtn.textContent = count > 0 ? `🔍 检测 (${count})` : '🔍 检测订阅'; + elements.batchCheckSubBtn.textContent = '🔍 检测订阅'; const pausableCount = getPausableBatchTasks().length; const resumableCount = getResumableBatchTasks().length; diff --git a/templates/accounts.html b/templates/accounts.html index 0ea5dff5..c3e4099e 100644 --- a/templates/accounts.html +++ b/templates/accounts.html @@ -335,15 +335,36 @@