From f08e6ef98c757def66301e96bd9d898ce51b124b Mon Sep 17 00:00:00 2001 From: iHildy Date: Sat, 11 Apr 2026 22:12:45 -0700 Subject: [PATCH 1/3] fix: expand e2e coverage and docs for session sync compatibility --- .agents/skills/opencode-sync-sandbox/SKILL.md | 16 +- README.md | 13 +- scripts/e2e/github_two_instance.py | 356 ++++++++++++++++-- src/sync/apply.test.ts | 278 ++++++++++++++ src/sync/apply.ts | 58 ++- src/sync/paths.test.ts | 26 ++ src/sync/paths.ts | 12 + 7 files changed, 729 insertions(+), 30 deletions(-) create mode 100644 src/sync/apply.test.ts diff --git a/.agents/skills/opencode-sync-sandbox/SKILL.md b/.agents/skills/opencode-sync-sandbox/SKILL.md index 63806d2..c3e3d44 100644 --- a/.agents/skills/opencode-sync-sandbox/SKILL.md +++ b/.agents/skills/opencode-sync-sandbox/SKILL.md @@ -19,10 +19,20 @@ Follow this workflow whenever a task changes sync behavior, config handling, com - `bun run build` 6. Run full isolated E2E before declaring success: - `./.agents/skills/opencode-sync-sandbox/scripts/run-e2e.sh` -7. Report exact evidence: what changed, what tests ran, and E2E artifact path. + - Run E2E variants that directly exercise the features changed in this task + (for example: sessions, secrets, or other changed sync flags/paths). +7. Report exact evidence: what changed, what tests ran, and an E2E artifact review. Do not skip E2E for changes that affect sync workflows, path resolution, repo operations, or command execution. +## E2E Reporting Requirements + +- Do not report the E2E artifact path unless the user explicitly asks for it. +- Read `results/summary.json`, command result payloads under `results/`, and relevant `logs/*.log` tails. +- Summarize the executed flow, pass/fail status, key warnings/errors, and any suspicious signals. +- If warnings exist, explain whether they are expected, tolerated, or need follow-up. 
+- For feature-flagged runs, report the exact artifact-level behavior for each changed feature (including restart requirements when relevant). + ## Sandbox Rules - Keep all E2E state inside `.memory/e2e/runs//`. @@ -44,6 +54,10 @@ Use this clone to confirm command/server/tool behavior and avoid assumptions. - `./.agents/skills/opencode-sync-sandbox/scripts/preflight.sh` - Full two-instance GitHub E2E: - `./.agents/skills/opencode-sync-sandbox/scripts/run-e2e.sh` + - Enable secrets coverage: + `./.agents/skills/opencode-sync-sandbox/scripts/run-e2e.sh --enable-secrets` + - Enable session database coverage (implies secrets): + `./.agents/skills/opencode-sync-sandbox/scripts/run-e2e.sh --enable-sessions` For additional usage flags, run: diff --git a/README.md b/README.md index 2819bf4..878676a 100644 --- a/README.md +++ b/README.md @@ -117,11 +117,18 @@ Sync your opencode sessions (conversation history from `/sessions`) across machi Synced session data: -- `~/.local/share/opencode/storage/session/` - Session files -- `~/.local/share/opencode/storage/message/` - Message history -- `~/.local/share/opencode/storage/part/` - Message parts +- `~/.local/share/opencode/opencode.db` - Primary SQLite session database (current opencode versions) +- `~/.local/share/opencode/opencode.db-wal` and `~/.local/share/opencode/opencode.db-shm` - SQLite sidecars synced with `opencode.db` when present (for WAL consistency) +- `~/.local/share/opencode/storage/session/` - Legacy session files (pre-SQLite migration compatibility) +- `~/.local/share/opencode/storage/message/` - Legacy message history (pre-SQLite migration compatibility) +- `~/.local/share/opencode/storage/part/` - Legacy message parts (pre-SQLite migration compatibility) - `~/.local/share/opencode/storage/session_diff/` - Session diffs +opencode handles JSON-to-SQLite migration automatically when `opencode.db` is missing, so syncing both +formats supports users who have not migrated yet and users already on SQLite. 
+ +After pulling session changes, restart opencode to ensure the latest session state is loaded. + ### Prompt Stash (private repos only) Sync your stashed prompts and prompt history across machines by setting `"includePromptStash": true`. This requires `includeSecrets` to also be enabled since prompts may contain sensitive data. diff --git a/scripts/e2e/github_two_instance.py b/scripts/e2e/github_two_instance.py index 9d5c43b..e757d76 100755 --- a/scripts/e2e/github_two_instance.py +++ b/scripts/e2e/github_two_instance.py @@ -6,9 +6,12 @@ import fcntl import json import os +import re import shutil import signal +import sqlite3 import subprocess +import tempfile import threading import time import urllib.error @@ -315,7 +318,11 @@ def parse_args() -> argparse.Namespace: ) parser.add_argument('--owner', help='GitHub owner for ephemeral test repos. Defaults to gh api user login.') parser.add_argument('--repo-prefix', default='opencode-sync-e2e', help='Prefix for ephemeral repo names.') - parser.add_argument('--model', default='opencode/gpt-5-nano', help='Model for command execution.') + parser.add_argument( + '--model', + default='opencode/big-pickle', + help='Model for command execution (override for lower-cost/faster runs if desired).', + ) parser.add_argument( '--timeout-sec', type=int, @@ -332,6 +339,16 @@ def parse_args() -> argparse.Namespace: action='store_true', help='Run preflight checks only and exit.', ) + parser.add_argument( + '--enable-secrets', + action='store_true', + help='Enable includeSecrets=true in opencode-synced config during the E2E flow.', + ) + parser.add_argument( + '--enable-sessions', + action='store_true', + help='Enable includeSessions=true (implies includeSecrets) and run session-db assertions.', + ) return parser.parse_args() @@ -522,6 +539,29 @@ def create_session(client: ApiClient) -> str: return session_id +def create_named_session(client: ApiClient, title: str) -> str: + payload = client.post_json('/session', {'title': title}, 
timeout_sec=40) + if not isinstance(payload, dict) or 'id' not in payload: + raise RuntimeError(f'Unexpected /session response: {payload}') + session_id = str(payload['id']) + client.patch_json( + f'/session/{urllib.parse.quote(session_id)}', + {'permission': SESSION_PERMISSION_RULES}, + timeout_sec=40, + ) + return session_id + + +def rename_session(client: ApiClient, session_id: str, title: str) -> None: + payload = client.patch_json( + f'/session/{urllib.parse.quote(session_id)}', + {'title': title}, + timeout_sec=40, + ) + if not isinstance(payload, dict): + raise RuntimeError(f'Unexpected response while renaming session {session_id}: {payload}') + + def run_command(client: ApiClient, session_id: str, command: str, arguments: str, timeout_sec: int) -> dict[str, Any]: payload = client.post_json( f'/session/{urllib.parse.quote(session_id)}/command', @@ -571,6 +611,158 @@ def append_sentinel(path: Path, sentinel: str) -> None: path.write_text(updated, encoding='utf-8') +def update_sync_config_flags( + config_path: Path, + *, + include_secrets: bool, + include_sessions: bool, +) -> dict[str, Any]: + if not config_path.exists(): + raise RuntimeError(f'Expected sync config at {config_path}, but it does not exist.') + + raw = config_path.read_text(encoding='utf-8') + try: + config = json.loads(raw) + except json.JSONDecodeError as error: + try: + config = parse_jsonc(raw) + except Exception as parse_error: + raise RuntimeError( + f'Unable to parse sync config as JSON/JSONC at {config_path}: {error}; {parse_error}' + ) from parse_error + + if include_sessions: + include_secrets = True + if include_secrets: + config['includeSecrets'] = True + if include_sessions: + config['includeSessions'] = True + + write_json(config_path, config) + return config + + +def parse_jsonc(content: str) -> dict[str, Any]: + output_chars: list[str] = [] + in_string = False + in_single_line = False + in_multi_line = False + escape_next = False + index = 0 + length = len(content) + + while 
index < length: + current = content[index] + next_char = content[index + 1] if index + 1 < length else '' + + if in_single_line: + if current == '\n': + in_single_line = False + output_chars.append(current) + index += 1 + continue + + if in_multi_line: + if current == '*' and next_char == '/': + in_multi_line = False + index += 2 + continue + index += 1 + continue + + if in_string: + output_chars.append(current) + if escape_next: + escape_next = False + elif current == '\\': + escape_next = True + elif current == '"': + in_string = False + index += 1 + continue + + if current == '/' and next_char == '/': + in_single_line = True + index += 2 + continue + + if current == '/' and next_char == '*': + in_multi_line = True + index += 2 + continue + + if current == '"': + in_string = True + output_chars.append(current) + index += 1 + continue + + output_chars.append(current) + index += 1 + + cleaned = ''.join(output_chars) + # Remove trailing commas before object/array close. + cleaned = re.sub(r',(\s*[}\]])', r'\1', cleaned) + parsed = json.loads(cleaned) + if not isinstance(parsed, dict): + raise RuntimeError('Expected root object in JSONC config.') + return parsed + + +def wait_for_file(path: Path, timeout_sec: int) -> None: + deadline = time.time() + timeout_sec + while time.time() < deadline: + if path.exists(): + return + time.sleep(0.5) + raise E2EFailure(f'Expected file to exist within timeout: {path}') + + +def read_session_title_from_db(db_path: Path, session_id: str) -> str | None: + if not db_path.exists(): + return None + snapshot_dir = Path(tempfile.mkdtemp(prefix='opencode-sync-db-read-')) + snapshot_db = snapshot_dir / db_path.name + shutil.copy2(db_path, snapshot_db) + for suffix in ('-wal', '-shm'): + source_sidecar = Path(f'{db_path}{suffix}') + if source_sidecar.exists(): + shutil.copy2(source_sidecar, Path(f'{snapshot_db}{suffix}')) + conn = sqlite3.connect(str(snapshot_db)) + try: + cursor = conn.execute('SELECT title FROM "session" WHERE id = ?', 
(session_id,)) + row = cursor.fetchone() + if row is None: + return None + value = row[0] + return str(value) if value is not None else None + finally: + conn.close() + shutil.rmtree(snapshot_dir, ignore_errors=True) + + +def wait_for_db_session_title( + *, + db_path: Path, + session_id: str, + expected_title: str, + timeout_sec: int, + label: str, +) -> None: + deadline = time.time() + timeout_sec + last_title: str | None = None + while time.time() < deadline: + last_title = read_session_title_from_db(db_path, session_id) + if last_title == expected_title: + return + time.sleep(1) + + raise E2EFailure( + f'{label} failed: expected session title "{expected_title}" for {session_id} in {db_path}, ' + f'last observed title={last_title!r}' + ) + + def get_default_branch(full_repo: str) -> str: endpoint = f'repos/{full_repo}' response = run_cmd(['gh', 'api', endpoint, '--jq', '.default_branch']) @@ -690,6 +882,8 @@ def run_e2e(args: argparse.Namespace) -> int: real_home = Path(os.environ.get('HOME', str(Path.home()))) gh_token, detected_owner = preflight(real_home) owner = args.owner or detected_owner + enable_sessions = bool(args.enable_sessions) + enable_secrets = bool(args.enable_secrets or enable_sessions) if args.preflight_only: log('Preflight-only mode complete.') return 0 @@ -758,6 +952,10 @@ def run_e2e(args: argparse.Namespace) -> int: 'head': baseline_state.head, 'status_count': len(baseline_state.status_lines), }, + 'sync_flags': { + 'includeSecrets': enable_secrets, + 'includeSessions': enable_sessions, + }, 'status': 'running', } @@ -800,6 +998,52 @@ def run_e2e(args: argparse.Namespace) -> int: raise E2EFailure(f'sync-init completed but repo was not created: {full_repo}') branch = get_default_branch(full_repo) + synced_session_id: str | None = None + session_title_after_link: str | None = None + session_title_after_pull: str | None = None + machine_a_local_db = machine_a.xdg_data_home / 'opencode' / 'opencode.db' + machine_b_local_db = 
machine_b.xdg_data_home / 'opencode' / 'opencode.db' + machine_b_repo_db = ( + machine_b.xdg_data_home / 'opencode' / 'opencode-synced' / 'repo' / 'data' / 'opencode.db' + ) + + if enable_secrets: + print_banner('Enable secrets/session sync options on machine A') + machine_a_sync_config = machine_a.opencode_config_root / 'opencode-synced.jsonc' + updated_config = update_sync_config_flags( + machine_a_sync_config, + include_secrets=enable_secrets, + include_sessions=enable_sessions, + ) + summary['machine_a_sync_config'] = { + 'path': str(machine_a_sync_config), + 'includeSecrets': bool(updated_config.get('includeSecrets')), + 'includeSessions': bool(updated_config.get('includeSessions')), + } + + if enable_sessions: + print_banner('Create + rename synced session on machine A') + initial_title = f'opencode-sync-e2e session initial ({run_id})' + session_title_after_link = f'opencode-sync-e2e session linked ({run_id})' + session_title_after_pull = f'opencode-sync-e2e session pulled ({run_id})' + synced_session_id = create_named_session(client_a, initial_title) + rename_session(client_a, synced_session_id, session_title_after_link) + wait_for_file(machine_a_local_db, timeout_sec=args.timeout_sec) + wait_for_db_session_title( + db_path=machine_a_local_db, + session_id=synced_session_id, + expected_title=session_title_after_link, + timeout_sec=args.timeout_sec, + label='machine-a session rename before push #1', + ) + summary['session_sync'] = { + 'synced_session_id': synced_session_id, + 'title_after_link': session_title_after_link, + 'title_after_pull': session_title_after_pull, + 'machine_a_db': str(machine_a_local_db), + 'machine_b_db': str(machine_b_local_db), + 'machine_b_repo_db': str(machine_b_repo_db), + } sentinel1 = f'opencode-sync-e2e sentinel 1 ({run_id})' sentinel2 = f'opencode-sync-e2e sentinel 2 ({run_id})' @@ -823,28 +1067,42 @@ def run_e2e(args: argparse.Namespace) -> int: wait_for_remote_sentinel(full_repo, branch, sentinel1, 
timeout_sec=args.timeout_sec) print_banner('sync-link on machine B') - run_and_validate_command( - client=client_b, - session_id=session_b, - command='sync-link', - arguments=repo_name, - timeout_sec=args.timeout_sec, - result_path=results_dir / 'machine-b-sync-link.json', - active_repo_root=repo_root, - baseline_state=baseline_state, - label='sync-link on machine B', - ) - machine_b_sync_config = machine_b.opencode_config_root / 'opencode-synced.jsonc' - if not machine_b_sync_config.exists(): - raise E2EFailure(f'sync-link did not produce expected config file: {machine_b_sync_config}') - if not file_contains(machine_b_sync_config, f'\"name\": \"{repo_name}\"'): - preview = machine_b_sync_config.read_text(encoding='utf-8', errors='replace') - raise E2EFailure( - 'sync-link bound machine B to an unexpected repo.\n' - f'Expected repo name: {repo_name}\n' - f'Config path: {machine_b_sync_config}\n' - f'Config contents:\n{preview}' + max_link_attempts = 3 + for attempt in range(1, max_link_attempts + 1): + suffix = '' if attempt == 1 else f'-attempt-{attempt}' + run_and_validate_command( + client=client_b, + session_id=session_b, + command='sync-link', + arguments=repo_name, + timeout_sec=args.timeout_sec, + result_path=results_dir / f'machine-b-sync-link{suffix}.json', + active_repo_root=repo_root, + baseline_state=baseline_state, + label=f'sync-link on machine B (attempt {attempt})', + ) + + if machine_b_sync_config.exists() and file_contains(machine_b_sync_config, f'\"name\": \"{repo_name}\"'): + break + + if attempt == max_link_attempts: + if not machine_b_sync_config.exists(): + raise E2EFailure(f'sync-link did not produce expected config file: {machine_b_sync_config}') + preview = machine_b_sync_config.read_text(encoding='utf-8', errors='replace') + raise E2EFailure( + 'sync-link bound machine B to an unexpected repo.\n' + f'Expected repo name: {repo_name}\n' + f'Config path: {machine_b_sync_config}\n' + f'Config contents:\n{preview}' + ) + + preview = '' + if 
machine_b_sync_config.exists(): + preview = machine_b_sync_config.read_text(encoding='utf-8', errors='replace') + log( + 'WARNING: sync-link did not bind to the expected repo on this attempt. ' + f'Retrying with explicit repo argument.\nConfig preview:\n{preview}' ) agents_b = machine_b.opencode_config_root / 'AGENTS.md' @@ -862,6 +1120,40 @@ def run_e2e(args: argparse.Namespace) -> int: 'local sync repo contains sentinel1 and replication is confirmed.' ) + if enable_sessions: + if not synced_session_id or not session_title_after_link: + raise E2EFailure('Session sync validation state is missing after sync-link.') + print_banner('Verify session sync on machine B after sync-link') + wait_for_file(machine_b_repo_db, timeout_sec=args.timeout_sec) + wait_for_db_session_title( + db_path=machine_b_repo_db, + session_id=synced_session_id, + expected_title=session_title_after_link, + timeout_sec=args.timeout_sec, + label='machine-b repo session title after sync-link', + ) + local_title_after_link = read_session_title_from_db(machine_b_local_db, synced_session_id) + if local_title_after_link != session_title_after_link: + log( + 'WARNING: machine-b local session DB did not immediately reflect synced title after sync-link. ' + 'This is expected while opencode is running; restart is required for session visibility.' 
+ ) + if isinstance(summary.get('session_sync'), dict): + summary['session_sync']['machine_b_local_title_after_link'] = local_title_after_link + + if enable_sessions: + if not synced_session_id or not session_title_after_pull: + raise E2EFailure('Session sync validation state is missing before second push.') + print_banner('Rename synced session on machine A before second push') + rename_session(client_a, synced_session_id, session_title_after_pull) + wait_for_db_session_title( + db_path=machine_a_local_db, + session_id=synced_session_id, + expected_title=session_title_after_pull, + timeout_sec=args.timeout_sec, + label='machine-a session rename before push #2', + ) + print_banner('sync-push sentinel 2 from machine A') append_sentinel(agents_a, sentinel2) run_and_validate_command( @@ -896,6 +1188,26 @@ def run_e2e(args: argparse.Namespace) -> int: 'local sync repo contains sentinel2 and replication is confirmed.' ) + if enable_sessions: + if not synced_session_id or not session_title_after_pull: + raise E2EFailure('Session sync validation state is missing after second pull.') + print_banner('Verify session sync on machine B after sync-pull') + wait_for_db_session_title( + db_path=machine_b_repo_db, + session_id=synced_session_id, + expected_title=session_title_after_pull, + timeout_sec=args.timeout_sec, + label='machine-b repo session title after sync-pull', + ) + local_title_after_pull = read_session_title_from_db(machine_b_local_db, synced_session_id) + if local_title_after_pull != session_title_after_pull: + log( + 'WARNING: machine-b local session DB did not immediately reflect synced title after sync-pull. ' + 'This is expected while opencode is running; restart is required for session visibility.' 
+ ) + if isinstance(summary.get('session_sync'), dict): + summary['session_sync']['machine_b_local_title_after_pull'] = local_title_after_pull + assert_git_state_unchanged(repo_root, baseline_state, 'after E2E flow') summary['status'] = 'passed' diff --git a/src/sync/apply.test.ts b/src/sync/apply.test.ts new file mode 100644 index 0000000..97fd212 --- /dev/null +++ b/src/sync/apply.test.ts @@ -0,0 +1,278 @@ +import { promises as fs } from 'node:fs'; +import { tmpdir } from 'node:os'; +import path from 'node:path'; +import { describe, expect, it } from 'vitest'; +import { syncLocalToRepo, syncRepoToLocal } from './apply.js'; +import type { ExtraPathPlan, SyncItem, SyncPlan } from './paths.js'; + +const EMPTY_EXTRA_PLAN: ExtraPathPlan = { + allowlist: [], + manifestPath: '', + entries: [], +}; + +function createPlan(repoRoot: string, homeDir: string, items: SyncItem[]): SyncPlan { + return { + items, + extraSecrets: { + ...EMPTY_EXTRA_PLAN, + manifestPath: path.join(repoRoot, 'secrets', 'extra.json'), + }, + extraConfigs: { + ...EMPTY_EXTRA_PLAN, + manifestPath: path.join(repoRoot, 'config', 'extra.json'), + }, + repoRoot, + homeDir, + platform: 'linux', + }; +} + +async function withTempDir(run: (root: string) => Promise): Promise { + const root = await fs.mkdtemp(path.join(tmpdir(), 'opencode-sync-apply-')); + try { + await run(root); + } finally { + await fs.rm(root, { recursive: true, force: true }); + } +} + +describe('syncLocalToRepo preserveWhenMissing', () => { + it('copies updated opencode.db from local to repo when present', async () => { + await withTempDir(async (root) => { + const repoRoot = path.join(root, 'repo'); + const localRoot = path.join(root, 'local'); + const repoDbPath = path.join(repoRoot, 'data', 'opencode.db'); + const localDbPath = path.join(localRoot, 'opencode.db'); + await fs.mkdir(path.dirname(repoDbPath), { recursive: true }); + await fs.mkdir(path.dirname(localDbPath), { recursive: true }); + await fs.writeFile(repoDbPath, 
'old-db-content', 'utf8'); + await fs.writeFile(localDbPath, 'new-db-content', 'utf8'); + + const plan = createPlan(repoRoot, localRoot, [ + { + localPath: localDbPath, + repoPath: repoDbPath, + type: 'file', + isSecret: true, + isConfigFile: false, + preserveWhenMissing: true, + }, + ]); + + await syncLocalToRepo(plan, null); + + const content = await fs.readFile(repoDbPath, 'utf8'); + expect(content).toBe('new-db-content'); + }); + }); + + it('copies sqlite sidecars with opencode.db when present', async () => { + await withTempDir(async (root) => { + const repoRoot = path.join(root, 'repo'); + const localRoot = path.join(root, 'local'); + const repoDbPath = path.join(repoRoot, 'data', 'opencode.db'); + const localDbPath = path.join(localRoot, 'opencode.db'); + await fs.mkdir(path.dirname(repoDbPath), { recursive: true }); + await fs.mkdir(path.dirname(localDbPath), { recursive: true }); + await fs.writeFile(localDbPath, 'new-db-content', 'utf8'); + await fs.writeFile(`${localDbPath}-wal`, 'new-wal-content', 'utf8'); + await fs.writeFile(`${localDbPath}-shm`, 'new-shm-content', 'utf8'); + + const plan = createPlan(repoRoot, localRoot, [ + { + localPath: localDbPath, + repoPath: repoDbPath, + type: 'file', + isSecret: true, + isConfigFile: false, + preserveWhenMissing: true, + }, + ]); + + await syncLocalToRepo(plan, null); + + await expect(fs.readFile(repoDbPath, 'utf8')).resolves.toBe('new-db-content'); + await expect(fs.readFile(`${repoDbPath}-wal`, 'utf8')).resolves.toBe('new-wal-content'); + await expect(fs.readFile(`${repoDbPath}-shm`, 'utf8')).resolves.toBe('new-shm-content'); + }); + }); + + it('keeps repo opencode.db when local file is missing', async () => { + await withTempDir(async (root) => { + const repoRoot = path.join(root, 'repo'); + const localRoot = path.join(root, 'local'); + const repoDbPath = path.join(repoRoot, 'data', 'opencode.db'); + await fs.mkdir(path.dirname(repoDbPath), { recursive: true }); + await fs.writeFile(repoDbPath, 
'remote-db-content', 'utf8'); + await fs.writeFile(`${repoDbPath}-wal`, 'remote-wal-content', 'utf8'); + await fs.writeFile(`${repoDbPath}-shm`, 'remote-shm-content', 'utf8'); + + const plan = createPlan(repoRoot, localRoot, [ + { + localPath: path.join(localRoot, 'opencode.db'), + repoPath: repoDbPath, + type: 'file', + isSecret: true, + isConfigFile: false, + preserveWhenMissing: true, + }, + ]); + + await syncLocalToRepo(plan, null); + + const content = await fs.readFile(repoDbPath, 'utf8'); + expect(content).toBe('remote-db-content'); + await expect(fs.readFile(`${repoDbPath}-wal`, 'utf8')).resolves.toBe('remote-wal-content'); + await expect(fs.readFile(`${repoDbPath}-shm`, 'utf8')).resolves.toBe('remote-shm-content'); + }); + }); + + it('removes stale sqlite sidecars when local opencode.db has none', async () => { + await withTempDir(async (root) => { + const repoRoot = path.join(root, 'repo'); + const localRoot = path.join(root, 'local'); + const repoDbPath = path.join(repoRoot, 'data', 'opencode.db'); + const localDbPath = path.join(localRoot, 'opencode.db'); + await fs.mkdir(path.dirname(repoDbPath), { recursive: true }); + await fs.mkdir(path.dirname(localDbPath), { recursive: true }); + await fs.writeFile(repoDbPath, 'old-db-content', 'utf8'); + await fs.writeFile(`${repoDbPath}-wal`, 'stale-wal-content', 'utf8'); + await fs.writeFile(`${repoDbPath}-shm`, 'stale-shm-content', 'utf8'); + await fs.writeFile(localDbPath, 'fresh-db-content', 'utf8'); + + const plan = createPlan(repoRoot, localRoot, [ + { + localPath: localDbPath, + repoPath: repoDbPath, + type: 'file', + isSecret: true, + isConfigFile: false, + preserveWhenMissing: true, + }, + ]); + + await syncLocalToRepo(plan, null); + + await expect(fs.readFile(repoDbPath, 'utf8')).resolves.toBe('fresh-db-content'); + await expect(fs.stat(`${repoDbPath}-wal`)).rejects.toMatchObject({ code: 'ENOENT' }); + await expect(fs.stat(`${repoDbPath}-shm`)).rejects.toMatchObject({ code: 'ENOENT' }); + }); + }); + + 
it('keeps repo legacy session directory when local directory is missing', async () => { + await withTempDir(async (root) => { + const repoRoot = path.join(root, 'repo'); + const localRoot = path.join(root, 'local'); + const repoSessionPath = path.join(repoRoot, 'data', 'storage', 'session'); + const repoSessionFile = path.join(repoSessionPath, 'session-1.json'); + await fs.mkdir(repoSessionPath, { recursive: true }); + await fs.writeFile(repoSessionFile, '{"id":"session-1"}', 'utf8'); + + const plan = createPlan(repoRoot, localRoot, [ + { + localPath: path.join(localRoot, 'storage', 'session'), + repoPath: repoSessionPath, + type: 'dir', + isSecret: true, + isConfigFile: false, + preserveWhenMissing: true, + }, + ]); + + await syncLocalToRepo(plan, null); + + const content = await fs.readFile(repoSessionFile, 'utf8'); + expect(content).toBe('{"id":"session-1"}'); + }); + }); + + it('still deletes non-session items when local source is missing', async () => { + await withTempDir(async (root) => { + const repoRoot = path.join(root, 'repo'); + const localRoot = path.join(root, 'local'); + const repoFilePath = path.join(repoRoot, 'data', 'auth.json'); + await fs.mkdir(path.dirname(repoFilePath), { recursive: true }); + await fs.writeFile(repoFilePath, '{"token":"value"}', 'utf8'); + + const plan = createPlan(repoRoot, localRoot, [ + { + localPath: path.join(localRoot, 'auth.json'), + repoPath: repoFilePath, + type: 'file', + isSecret: true, + isConfigFile: false, + }, + ]); + + await syncLocalToRepo(plan, null); + + await expect(fs.stat(repoFilePath)).rejects.toMatchObject({ code: 'ENOENT' }); + }); + }); +}); + +describe('syncRepoToLocal for session database', () => { + it('copies opencode.db and sqlite sidecars from repo to local', async () => { + await withTempDir(async (root) => { + const repoRoot = path.join(root, 'repo'); + const localRoot = path.join(root, 'local'); + const repoDbPath = path.join(repoRoot, 'data', 'opencode.db'); + const localDbPath = 
path.join(localRoot, 'opencode.db'); + await fs.mkdir(path.dirname(repoDbPath), { recursive: true }); + await fs.writeFile(repoDbPath, 'repo-db-content', 'utf8'); + await fs.writeFile(`${repoDbPath}-wal`, 'repo-wal-content', 'utf8'); + await fs.writeFile(`${repoDbPath}-shm`, 'repo-shm-content', 'utf8'); + + const plan = createPlan(repoRoot, localRoot, [ + { + localPath: localDbPath, + repoPath: repoDbPath, + type: 'file', + isSecret: true, + isConfigFile: false, + preserveWhenMissing: true, + }, + ]); + + await syncRepoToLocal(plan, null); + + const content = await fs.readFile(localDbPath, 'utf8'); + expect(content).toBe('repo-db-content'); + await expect(fs.readFile(`${localDbPath}-wal`, 'utf8')).resolves.toBe('repo-wal-content'); + await expect(fs.readFile(`${localDbPath}-shm`, 'utf8')).resolves.toBe('repo-shm-content'); + }); + }); + + it('removes stale local sqlite sidecars when repo opencode.db has none', async () => { + await withTempDir(async (root) => { + const repoRoot = path.join(root, 'repo'); + const localRoot = path.join(root, 'local'); + const repoDbPath = path.join(repoRoot, 'data', 'opencode.db'); + const localDbPath = path.join(localRoot, 'opencode.db'); + await fs.mkdir(path.dirname(repoDbPath), { recursive: true }); + await fs.mkdir(path.dirname(localDbPath), { recursive: true }); + await fs.writeFile(repoDbPath, 'repo-db-content', 'utf8'); + await fs.writeFile(localDbPath, 'old-db-content', 'utf8'); + await fs.writeFile(`${localDbPath}-wal`, 'stale-wal-content', 'utf8'); + await fs.writeFile(`${localDbPath}-shm`, 'stale-shm-content', 'utf8'); + + const plan = createPlan(repoRoot, localRoot, [ + { + localPath: localDbPath, + repoPath: repoDbPath, + type: 'file', + isSecret: true, + isConfigFile: false, + preserveWhenMissing: true, + }, + ]); + + await syncRepoToLocal(plan, null); + + await expect(fs.readFile(localDbPath, 'utf8')).resolves.toBe('repo-db-content'); + await expect(fs.stat(`${localDbPath}-wal`)).rejects.toMatchObject({ code: 'ENOENT' 
}); + await expect(fs.stat(`${localDbPath}-shm`)).rejects.toMatchObject({ code: 'ENOENT' }); + }); + }); +}); diff --git a/src/sync/apply.ts b/src/sync/apply.ts index 98bc65e..90b75b4 100644 --- a/src/sync/apply.ts +++ b/src/sync/apply.ts @@ -21,6 +21,9 @@ import { normalizePath } from './paths.js'; type ExtraPathType = 'file' | 'dir'; +const SESSION_DB_NAME = 'opencode.db'; +const SESSION_DB_SIDECAR_SUFFIXES = ['-wal', '-shm'] as const; + interface ExtraPathManifestItem { relativePath: string; type: ExtraPathType; @@ -94,11 +97,13 @@ export async function syncLocalToRepo( for (const item of plan.items) { if (item.isConfigFile) { const sanitized = sanitizedConfigs.get(item.localPath); - await copyConfigForRepo(item, overridesForStrip, plan.repoRoot, sanitized); + await copyConfigForRepo(item, overridesForStrip, plan.repoRoot, sanitized, { + removeWhenMissing: !item.preserveWhenMissing, + }); continue; } - await copyItem(item.localPath, item.repoPath, item.type, true); + await copyItem(item.localPath, item.repoPath, item.type, !item.preserveWhenMissing); } await writeExtraPathManifest(plan, plan.extraConfigs); @@ -111,6 +116,15 @@ async function copyItem( type: SyncItem['type'], removeWhenMissing = false ): Promise { + if ( + type === 'file' && + (path.basename(sourcePath) === SESSION_DB_NAME || + path.basename(destinationPath) === SESSION_DB_NAME) + ) { + await copySessionDbBundle(sourcePath, destinationPath, removeWhenMissing); + return; + } + if (!(await pathExists(sourcePath))) { if (removeWhenMissing) { await removePath(destinationPath); @@ -131,10 +145,14 @@ async function copyConfigForRepo( item: SyncItem, overrides: Record | null, repoRoot: string, - configOverride?: Record + configOverride?: Record, + options: { removeWhenMissing?: boolean } = {} ): Promise { + const removeWhenMissing = options.removeWhenMissing ?? 
true; if (!(await pathExists(item.localPath))) { - await removePath(item.repoPath); + if (removeWhenMissing) { + await removePath(item.repoPath); + } return; } @@ -198,6 +216,38 @@ async function copyFileWithMode(sourcePath: string, destinationPath: string): Pr await chmodIfExists(destinationPath, stat.mode & 0o777); } +async function copySessionDbBundle( + sourceDbPath: string, + destinationDbPath: string, + removeWhenMissing: boolean +): Promise { + if (!(await pathExists(sourceDbPath))) { + if (removeWhenMissing) { + await removePath(destinationDbPath); + await removeSessionDbSidecars(destinationDbPath); + } + return; + } + + await copyFileWithMode(sourceDbPath, destinationDbPath); + + for (const suffix of SESSION_DB_SIDECAR_SUFFIXES) { + const sourceSidecarPath = `${sourceDbPath}${suffix}`; + const destinationSidecarPath = `${destinationDbPath}${suffix}`; + if (await pathExists(sourceSidecarPath)) { + await copyFileWithMode(sourceSidecarPath, destinationSidecarPath); + continue; + } + await removePath(destinationSidecarPath); + } +} + +async function removeSessionDbSidecars(dbPath: string): Promise { + for (const suffix of SESSION_DB_SIDECAR_SUFFIXES) { + await removePath(`${dbPath}${suffix}`); + } +} + async function copyDirRecursive(sourcePath: string, destinationPath: string): Promise { const stat = await fs.stat(sourcePath); await fs.mkdir(destinationPath, { recursive: true }); diff --git a/src/sync/paths.test.ts b/src/sync/paths.test.ts index 9ede236..4a0bfbd 100644 --- a/src/sync/paths.test.ts +++ b/src/sync/paths.test.ts @@ -105,6 +105,32 @@ describe('buildSyncPlan', () => { expect(plan.extraConfigs.allowlist.length).toBe(1); }); + it('includes sqlite and legacy session paths when includeSessions is true', () => { + const env = { HOME: '/home/test' } as NodeJS.ProcessEnv; + const locations = resolveSyncLocations(env, 'linux'); + const config: SyncConfig = { + repo: { owner: 'acme', name: 'config' }, + includeSecrets: true, + includeSessions: true, + }; + 
+ const plan = buildSyncPlan(normalizeSyncConfig(config), locations, '/repo', 'linux'); + const expectedSessionPaths = [ + '/.local/share/opencode/opencode.db', + '/.local/share/opencode/storage/session', + '/.local/share/opencode/storage/message', + '/.local/share/opencode/storage/part', + '/.local/share/opencode/storage/session_diff', + ]; + + for (const suffix of expectedSessionPaths) { + const sessionItem = plan.items.find((item) => item.localPath.endsWith(suffix)); + expect(sessionItem).toBeTruthy(); + expect(sessionItem?.isSecret).toBe(true); + expect(sessionItem?.preserveWhenMissing).toBe(true); + } + }); + it('excludes auth files when using 1password backend', () => { const env = { HOME: '/home/test' } as NodeJS.ProcessEnv; const locations = resolveSyncLocations(env, 'linux'); diff --git a/src/sync/paths.ts b/src/sync/paths.ts index 995118d..958dace 100644 --- a/src/sync/paths.ts +++ b/src/sync/paths.ts @@ -27,6 +27,7 @@ export interface SyncItem { type: SyncItemType; isSecret: boolean; isConfigFile: boolean; + preserveWhenMissing?: boolean; } export interface ExtraPathPlan { @@ -53,6 +54,7 @@ const DEFAULT_STATE_NAME = 'sync-state.json'; const CONFIG_DIRS = ['agent', 'command', 'mode', 'tool', 'themes', 'plugin']; const SESSION_DIRS = ['storage/session', 'storage/message', 'storage/part', 'storage/session_diff']; +const SESSION_DB_FILE = 'opencode.db'; const PROMPT_STASH_FILES = ['prompt-stash.jsonl', 'prompt-history.jsonl']; const MODEL_FAVORITES_FILE = 'model.json'; @@ -245,6 +247,15 @@ export function buildSyncPlan( } if (config.includeSessions) { + items.push({ + localPath: path.join(dataRoot, SESSION_DB_FILE), + repoPath: path.join(repoDataRoot, SESSION_DB_FILE), + type: 'file', + isSecret: true, + isConfigFile: false, + preserveWhenMissing: true, + }); + for (const dirName of SESSION_DIRS) { items.push({ localPath: path.join(dataRoot, dirName), @@ -252,6 +263,7 @@ export function buildSyncPlan( type: 'dir', isSecret: true, isConfigFile: false, + 
preserveWhenMissing: true, }); } } From b93480bf8ad306cb18423b669163bb288048d1d9 Mon Sep 17 00:00:00 2001 From: Ian Hildebrand <25069719+iHildy@users.noreply.github.com> Date: Sun, 12 Apr 2026 19:20:48 -0700 Subject: [PATCH 2/3] feat: add Turso session backend for concurrent-safe sync (#54) * feat: add Turso session backend with setup and migration commands * fix: make sync-link repo selection deterministic for e2e (#55) * fix: make sync-link repo selection deterministic for e2e * fix: harden turso runtime session sync safety * fix: harden repo reference parsing for PR feedback * refactor: reduce turso sync io overhead * fix: harden turso session sync transport and timeouts --- README.md | 62 +- scripts/e2e/github_two_instance.py | 164 ++- src/command/sync-link.md | 2 +- src/command/sync-sessions-backend.md | 13 + src/command/sync-sessions-cleanup-git.md | 7 + src/command/sync-sessions-migrate-turso.md | 11 + src/command/sync-sessions-setup-turso.md | 10 + src/index.ts | 42 + src/sync/config.test.ts | 68 + src/sync/config.ts | 64 + src/sync/paths.test.ts | 25 + src/sync/paths.ts | 4 +- src/sync/repo.test.ts | 56 +- src/sync/repo.ts | 93 +- src/sync/service.ts | 730 ++++++++++- src/sync/turso.test.ts | 151 +++ src/sync/turso.ts | 1369 ++++++++++++++++++++ 17 files changed, 2824 insertions(+), 47 deletions(-) create mode 100644 src/command/sync-sessions-backend.md create mode 100644 src/command/sync-sessions-cleanup-git.md create mode 100644 src/command/sync-sessions-migrate-turso.md create mode 100644 src/command/sync-sessions-setup-turso.md create mode 100644 src/sync/turso.test.ts create mode 100644 src/sync/turso.ts diff --git a/README.md b/README.md index 878676a..a3c5e19 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,13 @@ Create `~/.config/opencode/opencode-synced.jsonc`: "includeSecrets": false, "includeMcpSecrets": false, "includeSessions": false, + "sessionBackend": { + "type": "git", + "turso": { + "syncIntervalSec": 15, + "autoSetup": true, + }, + }, 
"includePromptStash": false, "includeModelFavorites": true, "extraSecretPaths": [], @@ -105,27 +112,56 @@ in a private repo, set `"includeMcpSecrets": true` (requires `includeSecrets`). ### Sessions (private repos only) -Sync your opencode sessions (conversation history from `/sessions`) across machines by setting `"includeSessions": true`. This requires `includeSecrets` to also be enabled since sessions may contain sensitive data. +Session sync remains opt-in via `"includeSessions": true` (and requires `"includeSecrets": true`). +Session backend defaults to Git for backward compatibility. Turso is recommended for users running +multiple active machines concurrently. ```jsonc { "repo": { ... }, "includeSecrets": true, - "includeSessions": true + "includeSessions": true, + "sessionBackend": { + "type": "git", // or "turso" + "turso": { + "database": "my-opencode-config-sessions", // optional + "url": "libsql://...", // optional + "syncIntervalSec": 15, // default 15 + "autoSetup": true, // default true + }, + }, } ``` -Synced session data: +#### Git backend (`sessionBackend.type = "git"`, default) + +Best-effort session artifact sync via Git paths: + +- `~/.local/share/opencode/opencode.db` +- `~/.local/share/opencode/opencode.db-wal` and `~/.local/share/opencode/opencode.db-shm` +- `~/.local/share/opencode/storage/session/` +- `~/.local/share/opencode/storage/message/` +- `~/.local/share/opencode/storage/part/` +- `~/.local/share/opencode/storage/session_diff/` + +This mode can conflict with concurrent writers. + +#### Turso backend (`sessionBackend.type = "turso"`) + +Concurrent-safe snapshot backend for sessions: + +- Session artifacts are **not** synced through Git paths. +- Config + secrets continue using the normal Git sync flow. +- Startup performs a Turso session pull before regular config sync. +- Background loop runs `pull -> push -> pull` on the configured interval. +- Manual `/sync-pull` and `/sync-push` trigger a foreground session sync cycle too. 
-- `~/.local/share/opencode/opencode.db` - Primary SQLite session database (current opencode versions) -- `~/.local/share/opencode/opencode.db-wal` and `~/.local/share/opencode/opencode.db-shm` - SQLite sidecars synced with `opencode.db` when present (for WAL consistency) -- `~/.local/share/opencode/storage/session/` - Legacy session files (pre-SQLite migration compatibility) -- `~/.local/share/opencode/storage/message/` - Legacy message history (pre-SQLite migration compatibility) -- `~/.local/share/opencode/storage/part/` - Legacy message parts (pre-SQLite migration compatibility) -- `~/.local/share/opencode/storage/session_diff/` - Session diffs +Turso setup is machine-local and idempotent: -opencode handles JSON-to-SQLite migration automatically when `opencode.db` is missing, so syncing both -formats supports users who have not migrated yet and users already on SQLite. +- Auto-installs Turso CLI when needed (best effort). +- Runs headless Turso login flow when needed. +- Creates/reuses the Turso database + token. +- Stores credentials in a local machine-only file (`0600`) outside the sync repo. After pulling session changes, restart opencode to ensure the latest session state is loaded. @@ -182,6 +218,10 @@ Env var naming rules: | `/sync-pull` | Fetch and apply remote config | | `/sync-push` | Commit and push local changes | | `/sync-enable-secrets` | Enable secrets sync (private repos only) | +| `/sync-sessions-backend ` | Switch session backend | +| `/sync-sessions-setup-turso` | Install/auth/provision Turso on this machine | +| `/sync-sessions-migrate-turso` | Bootstrap + switch from Git session sync to Turso | +| `/sync-sessions-cleanup-git` | Remove deprecated Git session artifacts after migration | | `/sync-resolve` | Auto-resolve uncommitted changes using AI |
diff --git a/scripts/e2e/github_two_instance.py b/scripts/e2e/github_two_instance.py index e757d76..9f51f75 100755 --- a/scripts/e2e/github_two_instance.py +++ b/scripts/e2e/github_two_instance.py @@ -71,7 +71,11 @@ class ServerInstance: model: str gh_token: str real_git_config: Path | None + real_home: Path + real_xdg_config: Path real_xdg_data: Path + strict_link_repo: str | None = None + disable_auto_repo_discovery: bool = False process: subprocess.Popen[str] | None = None process_group_id: int | None = None @@ -130,6 +134,22 @@ def prepare_filesystem(self) -> None: if source.exists(): destination.write_bytes(source.read_bytes()) + # Seed Turso CLI auth state from the real machine into the isolated sandbox. + turso_source_dirs = [ + self.real_xdg_config / 'turso', + self.real_home / 'Library' / 'Application Support' / 'turso', + ] + turso_destination_dirs = [ + self.xdg_config_home / 'turso', + self.home / 'Library' / 'Application Support' / 'turso', + ] + for source_dir in turso_source_dirs: + if not source_dir.exists() or not source_dir.is_dir(): + continue + for destination_dir in turso_destination_dirs: + destination_dir.parent.mkdir(parents=True, exist_ok=True) + shutil.copytree(source_dir, destination_dir, dirs_exist_ok=True) + config_payload = { '$schema': 'https://opencode.ai/config.json', 'model': self.model, @@ -156,6 +176,10 @@ def start(self, logs_dir: Path) -> None: env['GH_TOKEN'] = self.gh_token if self.real_git_config: env['GIT_CONFIG_GLOBAL'] = str(self.real_git_config) + if self.disable_auto_repo_discovery: + env['OPENCODE_SYNC_E2E_DISABLE_AUTO_REPO_DISCOVERY'] = '1' + if self.strict_link_repo: + env['OPENCODE_SYNC_E2E_STRICT_LINK_REPO'] = self.strict_link_repo command = [ 'opencode', @@ -349,6 +373,12 @@ def parse_args() -> argparse.Namespace: action='store_true', help='Enable includeSessions=true (implies includeSecrets) and run session-db assertions.', ) + parser.add_argument( + '--session-backend', + choices=['git', 'turso'], + 
default='git', + help='Session backend to validate when --enable-sessions is enabled.', + ) return parser.parse_args() @@ -573,6 +603,22 @@ def run_command(client: ApiClient, session_id: str, command: str, arguments: str return payload +def seed_sync_link_repo_instruction(client: ApiClient, session_id: str, full_repo: str) -> None: + instruction = ( + 'For this E2E run, always use the exact GitHub repo ' + f'"{full_repo}" for sync-link commands. ' + 'Do not auto-discover or substitute any default repo names.' + ) + client.post_json( + f'/session/{urllib.parse.quote(session_id)}/prompt_async', + { + 'noReply': True, + 'parts': [{'type': 'text', 'text': instruction}], + }, + timeout_sec=40, + ) + + def response_error(payload: dict[str, Any]) -> str | None: info = payload.get('info') if not isinstance(info, dict): @@ -616,6 +662,7 @@ def update_sync_config_flags( *, include_secrets: bool, include_sessions: bool, + session_backend: str, ) -> dict[str, Any]: if not config_path.exists(): raise RuntimeError(f'Expected sync config at {config_path}, but it does not exist.') @@ -637,6 +684,9 @@ def update_sync_config_flags( config['includeSecrets'] = True if include_sessions: config['includeSessions'] = True + config['sessionBackend'] = { + 'type': session_backend, + } write_json(config_path, config) return config @@ -884,6 +934,9 @@ def run_e2e(args: argparse.Namespace) -> int: owner = args.owner or detected_owner enable_sessions = bool(args.enable_sessions) enable_secrets = bool(args.enable_secrets or enable_sessions) + session_backend = args.session_backend if enable_sessions else 'git' + if session_backend == 'turso' and not enable_sessions: + raise RuntimeError('--session-backend turso requires --enable-sessions.') if args.preflight_only: log('Preflight-only mode complete.') return 0 @@ -924,6 +977,8 @@ def run_e2e(args: argparse.Namespace) -> int: model=args.model, gh_token=gh_token, real_git_config=(real_home / '.gitconfig') if (real_home / '.gitconfig').exists() else 
None, + real_home=real_home, + real_xdg_config=Path(os.environ.get('XDG_CONFIG_HOME', str(real_home / '.config'))), real_xdg_data=Path(os.environ.get('XDG_DATA_HOME', str(real_home / '.local' / 'share'))), ) machine_b = ServerInstance( @@ -935,7 +990,11 @@ def run_e2e(args: argparse.Namespace) -> int: model=args.model, gh_token=gh_token, real_git_config=(real_home / '.gitconfig') if (real_home / '.gitconfig').exists() else None, + real_home=real_home, + real_xdg_config=Path(os.environ.get('XDG_CONFIG_HOME', str(real_home / '.config'))), real_xdg_data=Path(os.environ.get('XDG_DATA_HOME', str(real_home / '.local' / 'share'))), + strict_link_repo=full_repo, + disable_auto_repo_discovery=True, ) summary: dict[str, Any] = { @@ -955,6 +1014,7 @@ def run_e2e(args: argparse.Namespace) -> int: 'sync_flags': { 'includeSecrets': enable_secrets, 'includeSessions': enable_sessions, + 'sessionBackend': session_backend, }, 'status': 'running', } @@ -977,7 +1037,9 @@ def run_e2e(args: argparse.Namespace) -> int: session_a = create_session(client_a) session_b = create_session(client_b) + seed_sync_link_repo_instruction(client_b, session_b, full_repo) summary['sessions'] = {'machine_a': session_a, 'machine_b': session_b} + summary['strict_link_repo'] = full_repo log(f'machine-a session: {session_a}') log(f'machine-b session: {session_b}') @@ -1001,6 +1063,7 @@ def run_e2e(args: argparse.Namespace) -> int: synced_session_id: str | None = None session_title_after_link: str | None = None session_title_after_pull: str | None = None + using_turso_backend = enable_sessions and session_backend == 'turso' machine_a_local_db = machine_a.xdg_data_home / 'opencode' / 'opencode.db' machine_b_local_db = machine_b.xdg_data_home / 'opencode' / 'opencode.db' machine_b_repo_db = ( @@ -1014,13 +1077,29 @@ def run_e2e(args: argparse.Namespace) -> int: machine_a_sync_config, include_secrets=enable_secrets, include_sessions=enable_sessions, + session_backend=session_backend, ) 
summary['machine_a_sync_config'] = { 'path': str(machine_a_sync_config), 'includeSecrets': bool(updated_config.get('includeSecrets')), 'includeSessions': bool(updated_config.get('includeSessions')), + 'sessionBackend': session_backend, } + if using_turso_backend: + print_banner('migrate sessions backend to Turso on machine A') + run_and_validate_command( + client=client_a, + session_id=session_a, + command='sync-sessions-migrate-turso', + arguments='', + timeout_sec=args.timeout_sec, + result_path=results_dir / 'machine-a-sync-sessions-migrate-turso.json', + active_repo_root=repo_root, + baseline_state=baseline_state, + label='sync-sessions-migrate-turso on machine A', + ) + if enable_sessions: print_banner('Create + rename synced session on machine A') initial_title = f'opencode-sync-e2e session initial ({run_id})' @@ -1042,7 +1121,8 @@ def run_e2e(args: argparse.Namespace) -> int: 'title_after_pull': session_title_after_pull, 'machine_a_db': str(machine_a_local_db), 'machine_b_db': str(machine_b_local_db), - 'machine_b_repo_db': str(machine_b_repo_db), + 'session_backend': session_backend, + 'machine_b_repo_db': str(machine_b_repo_db) if not using_turso_backend else None, } sentinel1 = f'opencode-sync-e2e sentinel 1 ({run_id})' @@ -1075,7 +1155,7 @@ def run_e2e(args: argparse.Namespace) -> int: client=client_b, session_id=session_b, command='sync-link', - arguments=repo_name, + arguments=full_repo, timeout_sec=args.timeout_sec, result_path=results_dir / f'machine-b-sync-link{suffix}.json', active_repo_root=repo_root, @@ -1083,7 +1163,9 @@ def run_e2e(args: argparse.Namespace) -> int: label=f'sync-link on machine B (attempt {attempt})', ) - if machine_b_sync_config.exists() and file_contains(machine_b_sync_config, f'\"name\": \"{repo_name}\"'): + if machine_b_sync_config.exists() and file_contains( + machine_b_sync_config, f'\"owner\": \"{owner}\"' + ) and file_contains(machine_b_sync_config, f'\"name\": \"{repo_name}\"'): break if attempt == max_link_attempts: @@ 
-1092,7 +1174,7 @@ def run_e2e(args: argparse.Namespace) -> int: preview = machine_b_sync_config.read_text(encoding='utf-8', errors='replace') raise E2EFailure( 'sync-link bound machine B to an unexpected repo.\n' - f'Expected repo name: {repo_name}\n' + f'Expected repo: {full_repo}\n' f'Config path: {machine_b_sync_config}\n' f'Config contents:\n{preview}' ) @@ -1124,14 +1206,42 @@ def run_e2e(args: argparse.Namespace) -> int: if not synced_session_id or not session_title_after_link: raise E2EFailure('Session sync validation state is missing after sync-link.') print_banner('Verify session sync on machine B after sync-link') - wait_for_file(machine_b_repo_db, timeout_sec=args.timeout_sec) - wait_for_db_session_title( - db_path=machine_b_repo_db, - session_id=synced_session_id, - expected_title=session_title_after_link, - timeout_sec=args.timeout_sec, - label='machine-b repo session title after sync-link', - ) + if using_turso_backend: + run_and_validate_command( + client=client_b, + session_id=session_b, + command='sync-pull', + arguments='', + timeout_sec=args.timeout_sec, + result_path=results_dir / 'machine-b-sync-pull-after-link.json', + active_repo_root=repo_root, + baseline_state=baseline_state, + label='sync-pull on machine B after sync-link (turso)', + ) + wait_for_file(machine_b_local_db, timeout_sec=args.timeout_sec) + try: + wait_for_db_session_title( + db_path=machine_b_local_db, + session_id=synced_session_id, + expected_title=session_title_after_link, + timeout_sec=args.timeout_sec, + label='machine-b local session title after sync-link (turso)', + ) + except E2EFailure: + log( + 'WARNING: machine-b local session DB did not immediately reflect synced title after ' + 'sync-link in Turso mode. This is expected while opencode is running; restart is ' + 'required for local session visibility.' 
+ ) + else: + wait_for_file(machine_b_repo_db, timeout_sec=args.timeout_sec) + wait_for_db_session_title( + db_path=machine_b_repo_db, + session_id=synced_session_id, + expected_title=session_title_after_link, + timeout_sec=args.timeout_sec, + label='machine-b repo session title after sync-link', + ) local_title_after_link = read_session_title_from_db(machine_b_local_db, synced_session_id) if local_title_after_link != session_title_after_link: log( @@ -1192,13 +1302,29 @@ def run_e2e(args: argparse.Namespace) -> int: if not synced_session_id or not session_title_after_pull: raise E2EFailure('Session sync validation state is missing after second pull.') print_banner('Verify session sync on machine B after sync-pull') - wait_for_db_session_title( - db_path=machine_b_repo_db, - session_id=synced_session_id, - expected_title=session_title_after_pull, - timeout_sec=args.timeout_sec, - label='machine-b repo session title after sync-pull', - ) + if using_turso_backend: + try: + wait_for_db_session_title( + db_path=machine_b_local_db, + session_id=synced_session_id, + expected_title=session_title_after_pull, + timeout_sec=args.timeout_sec, + label='machine-b local session title after sync-pull (turso)', + ) + except E2EFailure: + log( + 'WARNING: machine-b local session DB did not immediately reflect synced title after ' + 'sync-pull in Turso mode. This is expected while opencode is running; restart is ' + 'required for local session visibility.' 
+ ) + else: + wait_for_db_session_title( + db_path=machine_b_repo_db, + session_id=synced_session_id, + expected_title=session_title_after_pull, + timeout_sec=args.timeout_sec, + label='machine-b repo session title after sync-pull', + ) local_title_after_pull = read_session_title_from_db(machine_b_local_db, synced_session_id) if local_title_after_pull != session_title_after_pull: log( diff --git a/src/command/sync-link.md b/src/command/sync-link.md index 8491f78..9c64e64 100644 --- a/src/command/sync-link.md +++ b/src/command/sync-link.md @@ -6,7 +6,7 @@ You MUST call the `opencode_sync` tool with `command="link"`. Do not answer with plain text only. Argument handling: -- If `$ARGUMENTS` is non-empty, pass `repo="$ARGUMENTS"`. +- If `$ARGUMENTS` is non-empty, pass `repo="$ARGUMENTS"` exactly as provided. Do not rewrite or shorten it. - If `$ARGUMENTS` is empty, let the tool auto-discover. Reminder: diff --git a/src/command/sync-sessions-backend.md b/src/command/sync-sessions-backend.md new file mode 100644 index 0000000..0f2ab6f --- /dev/null +++ b/src/command/sync-sessions-backend.md @@ -0,0 +1,13 @@ +--- +description: Switch session sync backend between git and turso +--- + +You MUST call the `opencode_sync` tool with `command="sessions-backend"`. + +Argument handling: +- `$ARGUMENTS` must be either `git` or `turso`. +- Pass `sessionBackend` with that exact value. + +Behavior: +- If backend is `git`, switch to best-effort git session sync. +- If backend is `turso`, run setup unless the user explicitly asked not to. diff --git a/src/command/sync-sessions-cleanup-git.md b/src/command/sync-sessions-cleanup-git.md new file mode 100644 index 0000000..f051872 --- /dev/null +++ b/src/command/sync-sessions-cleanup-git.md @@ -0,0 +1,7 @@ +--- +description: Remove deprecated git session artifacts after Turso migration +--- + +You MUST call the `opencode_sync` tool with `command="sessions-cleanup-git"`. 
+ +Use this only after sessions are running on Turso and the user confirms fallback artifacts are no longer needed. diff --git a/src/command/sync-sessions-migrate-turso.md b/src/command/sync-sessions-migrate-turso.md new file mode 100644 index 0000000..6867060 --- /dev/null +++ b/src/command/sync-sessions-migrate-turso.md @@ -0,0 +1,11 @@ +--- +description: Migrate session sync from git artifacts to Turso backend +--- + +You MUST call the `opencode_sync` tool with `command="sessions-migrate-turso"`. + +Behavior: +- Ensure Turso setup is complete. +- Bootstrap remote Turso sessions from the current local session DB. +- Switch session backend to Turso. +- Preserve existing git session artifacts for temporary fallback. diff --git a/src/command/sync-sessions-setup-turso.md b/src/command/sync-sessions-setup-turso.md new file mode 100644 index 0000000..fd43664 --- /dev/null +++ b/src/command/sync-sessions-setup-turso.md @@ -0,0 +1,10 @@ +--- +description: Install/auth/provision Turso for session sync on this machine +--- + +You MUST call the `opencode_sync` tool with `command="sessions-setup-turso"`. + +Behavior: +- Run Turso CLI install if missing. +- Run headless Turso login when needed. +- Provision/reuse the configured Turso session database and machine-local credential. 
diff --git a/src/index.ts b/src/index.ts index 1f314ce..dc085ee 100644 --- a/src/index.ts +++ b/src/index.ts @@ -128,6 +128,10 @@ export const opencodeConfigSync: Plugin = async (ctx) => { 'secrets-pull', 'secrets-push', 'secrets-status', + 'sessions-backend', + 'sessions-setup-turso', + 'sessions-migrate-turso', + 'sessions-cleanup-git', ]) .describe('Sync command to execute'), repo: tool.schema.string().optional().describe('Repo owner/name or URL'), @@ -144,6 +148,10 @@ export const opencodeConfigSync: Plugin = async (ctx) => { .boolean() .optional() .describe('Enable session sync (requires includeSecrets)'), + sessionBackend: tool.schema + .enum(['git', 'turso']) + .optional() + .describe('Session sync backend when includeSessions=true'), includePromptStash: tool.schema .boolean() .optional() @@ -157,6 +165,14 @@ export const opencodeConfigSync: Plugin = async (ctx) => { extraSecretPaths: tool.schema.array(tool.schema.string()).optional(), extraConfigPaths: tool.schema.array(tool.schema.string()).optional(), localRepoPath: tool.schema.string().optional().describe('Override local repo path'), + setupTurso: tool.schema + .boolean() + .optional() + .describe('Run Turso setup (install/auth/provision) when Turso backend is selected'), + migrateSessions: tool.schema + .boolean() + .optional() + .describe('Bootstrap remote Turso sessions from local session DB before switching backend'), }, async execute(args) { try { @@ -173,8 +189,11 @@ export const opencodeConfigSync: Plugin = async (ctx) => { includeSecrets: args.includeSecrets, includeMcpSecrets: args.includeMcpSecrets, includeSessions: args.includeSessions, + sessionBackend: args.sessionBackend, includePromptStash: args.includePromptStash, includeModelFavorites: args.includeModelFavorites, + setupTurso: args.setupTurso, + migrateSessions: args.migrateSessions, create: args.create, private: args.private, extraSecretPaths: args.extraSecretPaths, @@ -208,6 +227,26 @@ export const opencodeConfigSync: Plugin = async 
(ctx) => { includeMcpSecrets: args.includeMcpSecrets, }); } + if (args.command === 'sessions-backend') { + return await service.sessionsBackend({ + backend: args.sessionBackend, + setupTurso: args.setupTurso, + migrateSessions: args.migrateSessions, + }); + } + if (args.command === 'sessions-setup-turso') { + return await service.sessionsSetupTurso({ + forceTokenRefresh: args.setupTurso, + }); + } + if (args.command === 'sessions-migrate-turso') { + return await service.sessionsMigrateTurso({ + setupTurso: args.setupTurso, + }); + } + if (args.command === 'sessions-cleanup-git') { + return await service.sessionsCleanupGit(); + } if (args.command === 'resolve') { return await service.resolve(); } @@ -231,6 +270,9 @@ export const opencodeConfigSync: Plugin = async (ctx) => { tool: { opencode_sync: syncTool, }, + async event(input) { + await service.handleEvent(input.event); + }, async config(config) { config.command = config.command ?? {}; diff --git a/src/sync/config.test.ts b/src/sync/config.test.ts index 66b8894..cd5cfc2 100644 --- a/src/sync/config.test.ts +++ b/src/sync/config.test.ts @@ -8,7 +8,9 @@ import { canCommitMcpSecrets, chmodIfExists, deepMerge, + isTursoSessionBackend, normalizeSecretsBackend, + normalizeSessionBackend, normalizeSyncConfig, parseJsonc, stripOverrides, @@ -84,6 +86,72 @@ describe('normalizeSyncConfig', () => { expect(normalized.extraSecretPaths).toEqual([]); expect(normalized.extraConfigPaths).toEqual([]); }); + + it('defaults session backend to git', () => { + const normalized = normalizeSyncConfig({ includeSessions: true }); + expect(normalized.sessionBackend.type).toBe('git'); + expect(normalized.sessionBackend.turso.syncIntervalSec).toBe(15); + expect(normalized.sessionBackend.turso.autoSetup).toBe(true); + }); + + it('normalizes turso backend settings', () => { + const normalized = normalizeSyncConfig({ + includeSessions: true, + sessionBackend: { + type: 'turso', + turso: { + database: 'my-db', + url: 'libsql://my-db.turso.io', + 
syncIntervalSec: 8.7, + autoSetup: false, + }, + }, + }); + + expect(normalized.sessionBackend).toEqual({ + type: 'turso', + turso: { + database: 'my-db', + url: 'libsql://my-db.turso.io', + syncIntervalSec: 8, + autoSetup: false, + }, + }); + }); +}); + +describe('normalizeSessionBackend', () => { + it('falls back to git when type is missing or invalid', () => { + expect(normalizeSessionBackend(undefined).type).toBe('git'); + expect( + normalizeSessionBackend({ + type: 'git', + }).type + ).toBe('git'); + }); +}); + +describe('isTursoSessionBackend', () => { + it('requires includeSessions and turso type', () => { + expect( + isTursoSessionBackend({ + includeSessions: false, + sessionBackend: { type: 'turso' }, + }) + ).toBe(false); + expect( + isTursoSessionBackend({ + includeSessions: true, + sessionBackend: { type: 'git' }, + }) + ).toBe(false); + expect( + isTursoSessionBackend({ + includeSessions: true, + sessionBackend: { type: 'turso' }, + }) + ).toBe(true); + }); }); describe('normalizeSecretsBackend', () => { diff --git a/src/sync/config.ts b/src/sync/config.ts index b0d4af5..5438eda 100644 --- a/src/sync/config.ts +++ b/src/sync/config.ts @@ -24,12 +24,39 @@ export interface SecretsBackendConfig { documents?: SecretsBackendDocuments; } +export type SessionBackendType = 'git' | 'turso'; + +export interface TursoSessionBackendSettings { + database?: string; + url?: string; + syncIntervalSec?: number; + autoSetup?: boolean; +} + +export interface SessionBackendConfig { + type?: SessionBackendType; + turso?: TursoSessionBackendSettings; +} + +export interface NormalizedTursoSessionBackendSettings { + database?: string; + url?: string; + syncIntervalSec: number; + autoSetup: boolean; +} + +export interface NormalizedSessionBackendConfig { + type: SessionBackendType; + turso: NormalizedTursoSessionBackendSettings; +} + export interface SyncConfig { repo?: SyncRepoConfig; localRepoPath?: string; includeSecrets?: boolean; includeMcpSecrets?: boolean; 
includeSessions?: boolean; + sessionBackend?: SessionBackendConfig; includePromptStash?: boolean; includeModelFavorites?: boolean; secretsBackend?: SecretsBackendConfig; @@ -41,6 +68,7 @@ export interface NormalizedSyncConfig extends SyncConfig { includeSecrets: boolean; includeMcpSecrets: boolean; includeSessions: boolean; + sessionBackend: NormalizedSessionBackendConfig; includePromptStash: boolean; includeModelFavorites: boolean; secretsBackend?: SecretsBackendConfig; @@ -53,6 +81,8 @@ export interface SyncState { lastPush?: string; lastRemoteUpdate?: string; lastSecretsHash?: string; + lastSessionPull?: string; + lastSessionPush?: string; } export async function pathExists(filePath: string): Promise { @@ -103,6 +133,39 @@ export function normalizeSecretsBackend( return { type: '1password', vault, documents }; } +function normalizeTursoBackendSettings( + input: SessionBackendConfig['turso'] +): NormalizedTursoSessionBackendSettings { + const syncIntervalRaw = input?.syncIntervalSec; + const syncIntervalSec = + typeof syncIntervalRaw === 'number' && Number.isFinite(syncIntervalRaw) && syncIntervalRaw > 0 + ? Math.floor(syncIntervalRaw) + : 15; + + return { + database: typeof input?.database === 'string' ? input.database : undefined, + url: typeof input?.url === 'string' ? input.url : undefined, + syncIntervalSec, + autoSetup: input?.autoSetup !== false, + }; +} + +export function normalizeSessionBackend( + input: SyncConfig['sessionBackend'] +): NormalizedSessionBackendConfig { + const type = input?.type === 'turso' ? 
'turso' : 'git'; + return { + type, + turso: normalizeTursoBackendSettings(input?.turso), + }; +} + +export function isTursoSessionBackend(config: SyncConfig | NormalizedSyncConfig): boolean { + if (!config.includeSessions) return false; + const normalized = normalizeSessionBackend(config.sessionBackend); + return normalized.type === 'turso'; +} + export function normalizeSyncConfig(config: SyncConfig): NormalizedSyncConfig { const includeSecrets = Boolean(config.includeSecrets); const includeModelFavorites = config.includeModelFavorites !== false; @@ -110,6 +173,7 @@ export function normalizeSyncConfig(config: SyncConfig): NormalizedSyncConfig { includeSecrets, includeMcpSecrets: includeSecrets ? Boolean(config.includeMcpSecrets) : false, includeSessions: Boolean(config.includeSessions), + sessionBackend: normalizeSessionBackend(config.sessionBackend), includePromptStash: Boolean(config.includePromptStash), includeModelFavorites, secretsBackend: normalizeSecretsBackend(config.secretsBackend), diff --git a/src/sync/paths.test.ts b/src/sync/paths.test.ts index 4a0bfbd..1bcf830 100644 --- a/src/sync/paths.test.ts +++ b/src/sync/paths.test.ts @@ -131,6 +131,31 @@ describe('buildSyncPlan', () => { } }); + it('excludes git session paths when using turso session backend', () => { + const env = { HOME: '/home/test' } as NodeJS.ProcessEnv; + const locations = resolveSyncLocations(env, 'linux'); + const config: SyncConfig = { + repo: { owner: 'acme', name: 'config' }, + includeSecrets: true, + includeSessions: true, + sessionBackend: { + type: 'turso', + }, + }; + + const plan = buildSyncPlan(normalizeSyncConfig(config), locations, '/repo', 'linux'); + const sessionItems = plan.items.filter( + (item) => + item.localPath.endsWith('/.local/share/opencode/opencode.db') || + item.localPath.includes('/.local/share/opencode/storage/session') || + item.localPath.includes('/.local/share/opencode/storage/message') || + item.localPath.includes('/.local/share/opencode/storage/part') 
|| + item.localPath.includes('/.local/share/opencode/storage/session_diff') + ); + + expect(sessionItems).toEqual([]); + }); + it('excludes auth files when using 1password backend', () => { const env = { HOME: '/home/test' } as NodeJS.ProcessEnv; const locations = resolveSyncLocations(env, 'linux'); diff --git a/src/sync/paths.ts b/src/sync/paths.ts index 958dace..921fcb0 100644 --- a/src/sync/paths.ts +++ b/src/sync/paths.ts @@ -1,7 +1,7 @@ import crypto from 'node:crypto'; import path from 'node:path'; import type { NormalizedSyncConfig, SyncConfig } from './config.js'; -import { hasSecretsBackend } from './config.js'; +import { hasSecretsBackend, isTursoSessionBackend } from './config.js'; export interface XdgPaths { homeDir: string; @@ -246,7 +246,7 @@ export function buildSyncPlan( ); } - if (config.includeSessions) { + if (config.includeSessions && !isTursoSessionBackend(config)) { items.push({ localPath: path.join(dataRoot, SESSION_DB_FILE), repoPath: path.join(repoDataRoot, SESSION_DB_FILE), diff --git a/src/sync/repo.test.ts b/src/sync/repo.test.ts index 9efee54..7919475 100644 --- a/src/sync/repo.test.ts +++ b/src/sync/repo.test.ts @@ -1,6 +1,6 @@ import { describe, expect, it } from 'vitest'; -import { parseRepoVisibility } from './repo.js'; +import { parseRepoReference, parseRepoVisibility } from './repo.js'; describe('parseRepoVisibility', () => { it('parses private status', () => { @@ -12,3 +12,57 @@ describe('parseRepoVisibility', () => { expect(() => parseRepoVisibility('{"private": true}')).toThrow(); }); }); + +describe('parseRepoReference', () => { + it('parses short repo name with authenticated-user fallback', () => { + expect(parseRepoReference('my-opencode-config', 'ihildy')).toEqual({ + owner: 'ihildy', + name: 'my-opencode-config', + }); + }); + + it('parses explicit owner/repo input', () => { + expect(parseRepoReference('acme/opencode-sync', 'ignored')).toEqual({ + owner: 'acme', + name: 'opencode-sync', + }); + }); + + it('parses GitHub 
https repo URLs', () => { + expect(parseRepoReference('https://github.com/acme/opencode-sync.git', 'ignored')).toEqual({ + owner: 'acme', + name: 'opencode-sync', + }); + }); + + it('parses GitHub ssh:// repo URLs', () => { + expect(parseRepoReference('ssh://git@github.com/acme/opencode-sync.git', 'ignored')).toEqual({ + owner: 'acme', + name: 'opencode-sync', + }); + }); + + it('parses GitHub SSH repo URLs', () => { + expect(parseRepoReference('git@github.com:acme/opencode-sync.git', 'ignored')).toEqual({ + owner: 'acme', + name: 'opencode-sync', + }); + }); + + it('parses GitHub SSH repo URLs with trailing slash', () => { + expect(parseRepoReference('git@github.com:acme/opencode-sync.git/', 'ignored')).toEqual({ + owner: 'acme', + name: 'opencode-sync', + }); + }); + + it('returns null for invalid repo references', () => { + expect(parseRepoReference('https://example.com/acme/opencode-sync', 'ignored')).toBeNull(); + expect( + parseRepoReference('https://github.com/acme/opencode-sync/issues', 'ignored') + ).toBeNull(); + expect(parseRepoReference('acme/opencode/sync', 'ignored')).toBeNull(); + expect(parseRepoReference('git@notgithub:acme/opencode-sync', 'ignored')).toBeNull(); + expect(parseRepoReference(' ', 'ihildy')).toBeNull(); + }); +}); diff --git a/src/sync/repo.ts b/src/sync/repo.ts index 86c71d2..c5ffa9b 100644 --- a/src/sync/repo.ts +++ b/src/sync/repo.ts @@ -272,19 +272,66 @@ export interface FoundRepo { isPrivate: boolean; } -export async function findSyncRepo($: Shell, repoName?: string): Promise { +export interface FindSyncRepoOptions { + disableAutoDiscovery?: boolean; +} + +export interface RepoReference { + owner: string; + name: string; +} + +export function parseRepoReference(input: string, fallbackOwner: string): RepoReference | null { + const raw = input.trim(); + if (!raw) return null; + + const fromHttpUrl = parseGitHubHttpRepo(raw); + if (fromHttpUrl) return fromHttpUrl; + + const fromSshUrl = parseGitHubSshRepo(raw); + if (fromSshUrl) 
return fromSshUrl; + + if (raw.includes('/')) { + const parts = raw.split('/').filter(Boolean); + if (parts.length !== 2) return null; + const [owner, repoRaw] = parts; + if (owner.includes(':') || owner.includes('@')) return null; + const name = normalizeRepoName(repoRaw); + if (!owner || !name) return null; + return { owner, name }; + } + + const name = normalizeRepoName(raw); + if (!name || !fallbackOwner) return null; + return { owner: fallbackOwner, name }; +} + +export async function findSyncRepo( + $: Shell, + repoName?: string, + options: FindSyncRepoOptions = {} +): Promise { const owner = await getAuthenticatedUser($); // If user provided a specific name, check that first if (repoName) { - const exists = await repoExists($, `${owner}/${repoName}`); + const target = parseRepoReference(repoName, owner); + if (!target) { + return null; + } + const repoIdentifier = `${target.owner}/${target.name}`; + const exists = await repoExists($, repoIdentifier); if (exists) { - const isPrivate = await checkRepoPrivate($, `${owner}/${repoName}`); - return { owner, name: repoName, isPrivate }; + const isPrivate = await checkRepoPrivate($, repoIdentifier); + return { owner: target.owner, name: target.name, isPrivate }; } return null; } + if (options.disableAutoDiscovery) { + return null; + } + // Search through likely repo names for (const name of LIKELY_SYNC_REPO_NAMES) { const exists = await repoExists($, `${owner}/${name}`); @@ -305,3 +352,41 @@ async function checkRepoPrivate($: Shell, repoIdentifier: string): Promise Promise; + handleEvent: (_event: unknown) => Promise; status: () => Promise; init: (_options: InitOptions) => Promise; link: (_options: LinkOptions) => Promise; @@ -88,6 +100,14 @@ export interface SyncService { extraSecretPaths?: string[]; includeMcpSecrets?: boolean; }) => Promise; + sessionsBackend: (_options: { + backend?: 'git' | 'turso'; + setupTurso?: boolean; + migrateSessions?: boolean; + }) => Promise; + sessionsSetupTurso: (_options?: { 
forceTokenRefresh?: boolean }) => Promise; + sessionsMigrateTurso: (_options?: { setupTurso?: boolean }) => Promise; + sessionsCleanupGit: () => Promise; resolve: () => Promise; } @@ -95,6 +115,15 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { const locations = resolveSyncLocations(); const log = createLogger(ctx.client); const lockPath = path.join(path.dirname(locations.statePath), 'sync.lock'); + const strictLinkRepo = resolveStrictLinkRepo(process.env.OPENCODE_SYNC_E2E_STRICT_LINK_REPO); + const disableAutoRepoDiscovery = + process.env.OPENCODE_SYNC_E2E_DISABLE_AUTO_REPO_DISCOVERY === '1' || strictLinkRepo !== null; + let tursoSyncTimer: ReturnType | null = null; + let tursoSyncIntervalSec = 15; + const activeSessionIds = new Set(); + const pendingTursoSyncReasons = new Set(); + let tursoIdleFlushTimer: ReturnType | null = null; + let tursoFlushInFlight = false; const formatLockInfo = (info: SyncLockInfo | null): string => { if (!info) return 'Another sync is already in progress.'; @@ -250,7 +279,317 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { return await action(resolved.backend); }; + const stopTursoSyncLoop = (): void => { + if (!tursoSyncTimer) return; + clearInterval(tursoSyncTimer); + tursoSyncTimer = null; + if (tursoIdleFlushTimer) { + clearTimeout(tursoIdleFlushTimer); + tursoIdleFlushTimer = null; + } + }; + + const sleep = async (ms: number): Promise => + await new Promise((resolve) => { + setTimeout(resolve, ms); + }); + + const formatTursoCycleSummary = (cycle: { + pullBefore: { status: string }; + push: { status: string }; + pullAfter: { status: string }; + }): string => { + return `Turso sessions cycle: pull=${cycle.pullBefore.status}, push=${cycle.push.status}, pull=${cycle.pullAfter.status}`; + }; + + const resolveTursoPreferenceFromReasons = ( + reasons: string[], + trigger: string + ): TursoSyncPreference => { + const values = [trigger, ...reasons].map((entry) => 
entry.toLowerCase()); + const hasPush = values.some((entry) => entry.includes('push') || entry.includes('migrate')); + const hasPull = values.some( + (entry) => entry.includes('pull') || entry.includes('startup') || entry.includes('link') + ); + if (hasPush) return 'push'; + if (hasPull) return 'pull'; + return 'auto'; + }; + + const runTursoSetup = async ( + config: NormalizedSyncConfig, + options: { allowLogin: boolean; forceTokenRefresh?: boolean; allowAutoInstall?: boolean } + ) => { + const backend = createTursoSessionBackend({ locations, config, log }); + return await backend.ensureSetup({ + allowLogin: options.allowLogin, + forceTokenRefresh: options.forceTokenRefresh, + allowAutoInstall: options.allowAutoInstall ?? config.sessionBackend.turso.autoSetup, + }); + }; + + const runTursoCycleWithRetry = async ( + config: NormalizedSyncConfig, + reason: string, + options: { attempts?: number; preference?: TursoSyncPreference; allowLocalPull?: boolean } = {} + ): Promise<{ summary: string }> => { + const backend = createTursoSessionBackend({ locations, config, log }); + const attempts = options.attempts ?? 3; + const preference = options.preference ?? 'auto'; + const allowLocalPull = options.allowLocalPull ?? 
true; + let backoffMs = 500; + let attempt = 1; + + while (attempt <= attempts) { + try { + const cycle = await backend.syncCycle({ preference, allowLocalPull }); + const now = new Date().toISOString(); + const stateUpdate: { lastSessionPull?: string; lastSessionPush?: string } = {}; + if (cycle.pullBefore.status !== 'skipped' || cycle.pullAfter.status !== 'skipped') { + stateUpdate.lastSessionPull = now; + } + if (cycle.push.status !== 'skipped') { + stateUpdate.lastSessionPush = now; + } + if (stateUpdate.lastSessionPull || stateUpdate.lastSessionPush) { + await updateState(locations, stateUpdate); + } + + return { summary: formatTursoCycleSummary(cycle) }; + } catch (error) { + const retryable = isRetryableTursoError(error); + if (attempt < attempts && retryable) { + log.warn('Retrying Turso session sync cycle', { + reason, + attempt, + preference, + allowLocalPull, + error: formatError(error), + backoffMs, + }); + await sleep(backoffMs); + backoffMs *= 2; + attempt += 1; + continue; + } + throw error; + } + } + + throw new SyncCommandError(`Turso session sync failed after ${attempts} attempts (${reason}).`); + }; + + const refreshActiveSessionsFromServer = async (): Promise => { + try { + const response = await ctx.client.session.status({}); + const statusMap = unwrapData>(response); + if (!isRecord(statusMap)) { + return false; + } + + activeSessionIds.clear(); + for (const [sessionId, status] of Object.entries(statusMap)) { + if (isBusySessionStatus(status)) { + activeSessionIds.add(sessionId); + } + } + return true; + } catch (error) { + log.warn('Failed to query session activity state', { error: formatError(error) }); + return false; + } + }; + + const areAllSessionsIdle = async (): Promise => { + const first = await refreshActiveSessionsFromServer(); + if (!first || activeSessionIds.size > 0) { + return false; + } + await sleep(200); + + const second = await refreshActiveSessionsFromServer(); + if (!second) { + return false; + } + return 
activeSessionIds.size === 0; + }; + + const queueTursoSync = (reason: string): void => { + pendingTursoSyncReasons.add(reason); + }; + + const flushQueuedTursoSync = async ( + trigger: string, + latestConfig?: NormalizedSyncConfig | null + ): Promise<{ summary?: string; warning?: string; deferred: boolean }> => { + if (pendingTursoSyncReasons.size === 0) { + return { deferred: false }; + } + if (tursoFlushInFlight) { + return { deferred: true }; + } + + tursoFlushInFlight = true; + try { + const latest = latestConfig ?? (await loadSyncConfig(locations)); + if (!latest || !isTursoSessionBackend(latest)) { + pendingTursoSyncReasons.clear(); + stopTursoSyncLoop(); + return { deferred: false }; + } + + const idle = await areAllSessionsIdle(); + if (!idle) { + return { deferred: true }; + } + + const reasons = [...pendingTursoSyncReasons]; + pendingTursoSyncReasons.clear(); + const preference = resolveTursoPreferenceFromReasons(reasons, trigger); + const allowLocalPull = trigger === 'startup'; + + try { + const cycle = await runTursoCycleWithRetry(latest, `${trigger}:${reasons.join(',')}`, { + preference, + allowLocalPull, + }); + return { summary: cycle.summary, deferred: false }; + } catch (error) { + for (const reason of reasons) { + pendingTursoSyncReasons.add(reason); + } + const warning = `Turso session sync warning: ${formatError(error)}`; + log.warn(warning, { trigger, reasons }); + return { warning, deferred: false }; + } + } finally { + tursoFlushInFlight = false; + } + }; + + const scheduleTursoIdleFlush = (): void => { + if (tursoIdleFlushTimer) { + return; + } + tursoIdleFlushTimer = setTimeout(() => { + tursoIdleFlushTimer = null; + void skipIfBusy(async () => { + await flushQueuedTursoSync('idle-event'); + }); + }, 250); + }; + + const runForegroundTursoCycle = async ( + config: NormalizedSyncConfig, + reason: string + ): Promise => { + if (!isTursoSessionBackend(config)) return null; + queueTursoSync(reason); + scheduleTursoIdleFlush(); + return 'Turso 
sessions sync queued; runtime local pull is deferred until startup.'; + }; + + const runTursoStartupPull = async (config: NormalizedSyncConfig): Promise => { + if (!isTursoSessionBackend(config)) return null; + const setup = await runTursoSetup(config, { allowLogin: false }); + if (!setup.ready) { + return `Turso session setup pending: ${setup.message}`; + } + + queueTursoSync('startup'); + const result = await flushQueuedTursoSync('startup', config); + if (result.deferred) { + scheduleTursoIdleFlush(); + return 'Turso startup sync deferred until all sessions are idle.'; + } + if (result.warning) { + return result.warning; + } + if (result.summary) { + return `Turso startup sync: ${result.summary}`; + } + return null; + }; + + const ensureTursoSyncLoop = (config: NormalizedSyncConfig): void => { + if (!isTursoSessionBackend(config)) { + stopTursoSyncLoop(); + return; + } + + const nextInterval = config.sessionBackend.turso.syncIntervalSec; + if (tursoSyncTimer && tursoSyncIntervalSec === nextInterval) { + return; + } + + stopTursoSyncLoop(); + tursoSyncIntervalSec = nextInterval; + tursoSyncTimer = setInterval(() => { + void skipIfBusy(async () => { + const latest = await loadSyncConfig(locations); + if (!latest || !isTursoSessionBackend(latest)) { + stopTursoSyncLoop(); + return; + } + + queueTursoSync('background'); + const result = await flushQueuedTursoSync('background', latest); + if (result.deferred) { + return; + } + if (result.warning) { + log.warn(result.warning, { reason: 'background' }); + } + }); + }, nextInterval * 1000); + }; + + const onEvent = async (event: unknown): Promise => { + if (!isRecord(event)) { + return; + } + + const eventType = typeof event.type === 'string' ? event.type : ''; + const properties = isRecord(event.properties) ? event.properties : null; + if (!properties) { + return; + } + + let idleSignal = false; + if (eventType === 'session.status') { + const sessionId = typeof properties.sessionID === 'string' ? 
properties.sessionID : null; + const status = isRecord(properties.status) ? properties.status : null; + const statusType = status && typeof status.type === 'string' ? status.type : null; + if (sessionId && statusType === 'idle') { + activeSessionIds.delete(sessionId); + idleSignal = true; + } else if (sessionId) { + activeSessionIds.add(sessionId); + } + } else if (eventType === 'session.idle') { + const sessionId = typeof properties.sessionID === 'string' ? properties.sessionID : null; + if (sessionId) { + activeSessionIds.delete(sessionId); + idleSignal = true; + } + } else if (eventType === 'session.deleted') { + const info = isRecord(properties.info) ? properties.info : null; + const sessionId = info && typeof info.id === 'string' ? info.id : null; + if (sessionId) { + activeSessionIds.delete(sessionId); + idleSignal = true; + } + } + + if (idleSignal && pendingTursoSyncReasons.size > 0) { + scheduleTursoIdleFlush(); + } + }; + return { + handleEvent: async (event: unknown) => { + await onEvent(event); + }, startupSync: () => skipIfBusy(async () => { let config: ReturnType | null = null; @@ -267,6 +606,7 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { return; } if (!config) { + stopTursoSyncLoop(); await showToast( ctx.client, 'Configure opencode-synced with /sync-init or link to an existing repo with /sync-link', @@ -276,10 +616,24 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { } try { assertValidSecretsBackend(config); + let tursoWarning: string | null = null; + if (isTursoSessionBackend(config)) { + try { + tursoWarning = await runTursoStartupPull(config); + } catch (error) { + tursoWarning = `Turso session startup pull failed: ${formatError(error)}`; + } + } + await runStartup(ctx, locations, config, log, { ensureAuthFilesNotTracked, runSecretsPullIfConfigured, }); + ensureTursoSyncLoop(config); + if (tursoWarning) { + log.warn(tursoWarning); + await showToast(ctx.client, tursoWarning, 'warning'); + 
} } catch (error) { log.error('Startup sync failed', { error: formatError(error) }); await showToast(ctx.client, formatError(error), 'error'); @@ -315,11 +669,28 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { const includeSecrets = config.includeSecrets ? 'enabled' : 'disabled'; const includeMcpSecrets = config.includeMcpSecrets ? 'enabled' : 'disabled'; const includeSessions = config.includeSessions ? 'enabled' : 'disabled'; + const sessionBackendType = config.sessionBackend.type; + const sessionBackendLabel = !config.includeSessions + ? `${sessionBackendType} (inactive; includeSessions disabled)` + : sessionBackendType === 'turso' + ? 'turso (concurrent-safe backend enabled)' + : 'git (best effort, may conflict with concurrent writers)'; const includePromptStash = config.includePromptStash ? 'enabled' : 'disabled'; const includeModelFavorites = config.includeModelFavorites ? 'enabled' : 'disabled'; const secretsBackend = config.secretsBackend?.type ?? 'none'; const lastPull = state.lastPull ?? 'never'; const lastPush = state.lastPush ?? 'never'; + const lastSessionPull = state.lastSessionPull ?? 'never'; + const lastSessionPush = state.lastSessionPush ?? 
'never'; + let tursoStatusLine: string | null = null; + if (config.includeSessions && sessionBackendType === 'turso') { + try { + const tursoStatus = await createTursoSessionBackend({ locations, config, log }).status(); + tursoStatusLine = `Turso status: ${tursoStatus}`; + } catch (error) { + tursoStatusLine = `Turso status: unavailable (${formatError(error)})`; + } + } let changesLabel = 'clean'; if (!cloned) { @@ -338,12 +709,18 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { `Secrets backend: ${secretsBackend}`, `MCP secrets: ${includeMcpSecrets}`, `Sessions: ${includeSessions}`, + `Session backend: ${sessionBackendLabel}`, + `Last session pull: ${lastSessionPull}`, + `Last session push: ${lastSessionPush}`, `Prompt stash: ${includePromptStash}`, `Model favorites: ${includeModelFavorites}`, `Last pull: ${lastPull}`, `Last push: ${lastPush}`, `Working tree: ${changesLabel}`, ]; + if (tursoStatusLine) { + statusLines.push(tursoStatusLine); + } return statusLines.join('\n'); }, @@ -366,6 +743,25 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { await ensureRepoCloned(ctx.$, config, repoRoot); await ensureSecretsPolicy(ctx, config); + const initNotes: string[] = []; + if (isTursoSessionBackend(config) && options.setupTurso !== false) { + const setup = await runTursoSetup(config, { allowLogin: true }); + initNotes.push(setup.message); + if (setup.loginUrl) { + initNotes.push(`Complete Turso login at: ${setup.loginUrl}`); + } + if (setup.loginCode) { + initNotes.push(`Login code: ${setup.loginCode}`); + } + } + + if (isTursoSessionBackend(config) && options.migrateSessions) { + const cycle = await runTursoCycleWithRetry(config, 'init-migrate', { + preference: 'push', + }); + initNotes.push(`Session bootstrap: ${cycle.summary}`); + } + if (created) { const overrides = await loadOverrides(locations); const plan = buildSyncPlan(config, locations, repoRoot); @@ -389,23 +785,42 @@ export function 
createSyncService(ctx: SyncServiceContext): SyncService { `Branch: ${resolveRepoBranch(config)}`, `Local repo: ${repoRoot}`, ]; + if (initNotes.length > 0) { + lines.push('', ...initNotes); + } + ensureTursoSyncLoop(config); return lines.join('\n'); }), link: (options: LinkOptions) => runExclusive(async () => { - const found = await findSyncRepo(ctx.$, options.repo); + if (disableAutoRepoDiscovery && !options.repo) { + const expectation = strictLinkRepo + ? ` Provide the exact repo: ${strictLinkRepo.owner}/${strictLinkRepo.name}.` + : ''; + throw new SyncCommandError( + 'Repo auto-discovery is disabled in this environment. ' + + 'Run /sync-link with an explicit repo argument.' + + expectation + ); + } + + const found = await findSyncRepo(ctx.$, options.repo, { + disableAutoDiscovery: disableAutoRepoDiscovery, + }); if (!found) { const searchedFor = options.repo ? `"${options.repo}"` - : 'common sync repo names (my-opencode-config, opencode-config, etc.)'; + : disableAutoRepoDiscovery + ? '(none; auto-discovery disabled)' + : 'common sync repo names (my-opencode-config, opencode-config, etc.)'; const lines = [ `Could not find an existing sync repo. 
Searched for: ${searchedFor}`, '', 'To link to an existing repo, run:', - ' /sync-link ', + ' /sync-link ', '', 'To create a new sync repo, run:', ' /sync-init', @@ -413,6 +828,17 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { return lines.join('\n'); } + if (strictLinkRepo) { + const linkedIdentifier = `${found.owner}/${found.name}`.toLowerCase(); + const expectedIdentifier = `${strictLinkRepo.owner}/${strictLinkRepo.name}`.toLowerCase(); + if (linkedIdentifier !== expectedIdentifier) { + throw new SyncCommandError( + `Strict link mode expected repo ${strictLinkRepo.owner}/${strictLinkRepo.name}, ` + + `but resolved ${found.owner}/${found.name}.` + ); + } + } + const config = normalizeSyncConfig({ repo: { owner: found.owner, name: found.name }, includeSecrets: false, @@ -440,6 +866,22 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { lastRemoteUpdate: new Date().toISOString(), }); + const linkNotes: string[] = []; + const syncedConfig = await loadSyncConfig(locations); + if (syncedConfig && isTursoSessionBackend(syncedConfig)) { + const setup = await runTursoSetup(syncedConfig, { allowLogin: true }); + linkNotes.push(setup.message); + if (setup.loginUrl) { + linkNotes.push(`Complete Turso login at: ${setup.loginUrl}`); + } + if (setup.loginCode) { + linkNotes.push(`Login code: ${setup.loginCode}`); + } + ensureTursoSyncLoop(syncedConfig); + } else if (syncedConfig) { + ensureTursoSyncLoop(syncedConfig); + } + const lines = [ `Linked to existing sync repo: ${found.owner}/${found.name}`, '', @@ -452,6 +894,9 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { ? 'To enable secrets sync, run: /sync-enable-secrets' : 'Note: Repo is public. Secrets sync is disabled.', ]; + if (linkNotes.length > 0) { + lines.push('', ...linkNotes); + } await showToast(ctx.client, 'Config synced. 
Restart opencode to apply.', 'info'); return lines.join('\n'); @@ -475,6 +920,11 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { const update = await fetchAndFastForward(ctx.$, repoRoot, branch); if (!update.updated) { + const tursoSummary = await runForegroundTursoCycle(config, 'pull-up-to-date'); + ensureTursoSyncLoop(config); + if (tursoSummary) { + return ['Already up to date.', tursoSummary].join('\n'); + } return 'Already up to date.'; } @@ -488,7 +938,16 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { lastRemoteUpdate: new Date().toISOString(), }); + const tursoSummary = await runForegroundTursoCycle(config, 'pull-updated'); + ensureTursoSyncLoop(config); + await showToast(ctx.client, 'Config updated. Restart opencode to apply.', 'info'); + if (tursoSummary) { + return [ + 'Remote config applied. Restart opencode to use new settings.', + tursoSummary, + ].join('\n'); + } return 'Remote config applied. Restart opencode to use new settings.'; }), push: () => @@ -516,18 +975,29 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { const dirty = await hasLocalChanges(ctx.$, repoRoot); if (!dirty) { + const tursoSummary = await runForegroundTursoCycle(config, 'push-no-config-diff'); + ensureTursoSyncLoop(config); try { const secretsResult = await runSecretsPushIfConfigured(config); + const lines: string[] = []; if (secretsResult === 'pushed') { - return 'No local changes to push. Secrets updated.'; + lines.push('No local changes to push. Secrets updated.'); + } else if (secretsResult === 'skipped') { + lines.push('No local changes to push. Secrets unchanged.'); + } else { + lines.push('No local changes to push.'); } - if (secretsResult === 'skipped') { - return 'No local changes to push. 
Secrets unchanged.'; + if (tursoSummary) { + lines.push(tursoSummary); } - return 'No local changes to push.'; + return lines.join('\n'); } catch (error) { log.warn('Secrets push failed after sync check', { error: formatError(error) }); - return `No local changes to push. Secrets push failed: ${formatError(error)}`; + const lines = [`No local changes to push. Secrets push failed: ${formatError(error)}`]; + if (tursoSummary) { + lines.push(tursoSummary); + } + return lines.join('\n'); } } @@ -547,8 +1017,18 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { lastPush: new Date().toISOString(), }); + const tursoSummary = await runForegroundTursoCycle(config, 'push-updated'); + ensureTursoSyncLoop(config); + if (secretsFailure) { - return `Pushed changes: ${message}. Secrets push failed: ${secretsFailure}`; + const lines = [`Pushed changes: ${message}. Secrets push failed: ${secretsFailure}`]; + if (tursoSummary) { + lines.push(tursoSummary); + } + return lines.join('\n'); + } + if (tursoSummary) { + return [`Pushed changes: ${message}`, tursoSummary].join('\n'); } return `Pushed changes: ${message}`; }), @@ -592,6 +1072,200 @@ export function createSyncService(ctx: SyncServiceContext): SyncService { return 'Secrets sync enabled for this repo.'; }), + sessionsBackend: (options: { + backend?: 'git' | 'turso'; + setupTurso?: boolean; + migrateSessions?: boolean; + }) => + runExclusive(async () => { + const config = await getConfigOrThrow(locations); + if (!config.includeSessions) { + throw new SyncCommandError( + 'Session sync is disabled. Enable includeSessions=true before selecting a backend.' 
+ ); + } + + const backend = options.backend; + if (backend !== 'git' && backend !== 'turso') { + throw new SyncCommandError('Specify a valid backend: git or turso.'); + } + + const nextConfig = normalizeSyncConfig({ + ...config, + sessionBackend: { + ...config.sessionBackend, + type: backend, + }, + }); + + const notes: string[] = []; + if (backend === 'turso') { + if (options.setupTurso !== false) { + const setup = await runTursoSetup(nextConfig, { allowLogin: true }); + notes.push(setup.message); + if (setup.loginUrl) { + notes.push(`Complete Turso login at: ${setup.loginUrl}`); + } + if (setup.loginCode) { + notes.push(`Login code: ${setup.loginCode}`); + } + } + + if (options.migrateSessions) { + const cycle = await runTursoCycleWithRetry(nextConfig, 'sessions-backend-migrate', { + preference: 'push', + }); + notes.push(`Session bootstrap: ${cycle.summary}`); + } + } + + await writeSyncConfig(locations, nextConfig); + ensureTursoSyncLoop(nextConfig); + + const lines = [ + `Session backend switched to ${backend}.`, + backend === 'git' + ? 'Git mode is best effort and may conflict with concurrent writers.' + : 'Turso concurrent-safe backend enabled.', + ]; + + if (notes.length > 0) { + lines.push('', ...notes); + } + + return lines.join('\n'); + }), + sessionsSetupTurso: (options?: { forceTokenRefresh?: boolean }) => + runExclusive(async () => { + const config = await getConfigOrThrow(locations); + if (!config.includeSessions) { + throw new SyncCommandError( + 'Session sync is disabled. Enable includeSessions=true before Turso setup.' + ); + } + + const tursoConfig = isTursoSessionBackend(config) + ? 
config + : normalizeSyncConfig({ + ...config, + sessionBackend: { + ...config.sessionBackend, + type: 'turso', + }, + }); + + const setup = await runTursoSetup(tursoConfig, { + allowLogin: true, + forceTokenRefresh: options?.forceTokenRefresh, + allowAutoInstall: true, + }); + + const lines = [setup.message]; + if (setup.loginUrl) { + lines.push(`Complete Turso login at: ${setup.loginUrl}`); + } + if (setup.loginCode) { + lines.push(`Login code: ${setup.loginCode}`); + } + if (setup.ready && isTursoSessionBackend(config)) { + ensureTursoSyncLoop(config); + } + + return lines.join('\n'); + }), + sessionsMigrateTurso: (options?: { setupTurso?: boolean }) => + runExclusive(async () => { + const config = await getConfigOrThrow(locations); + if (!config.includeSessions) { + throw new SyncCommandError( + 'Session sync is disabled. Enable includeSessions=true before migration.' + ); + } + + const migratedConfig = normalizeSyncConfig({ + ...config, + sessionBackend: { + ...config.sessionBackend, + type: 'turso', + }, + }); + + if (options?.setupTurso !== false) { + const setup = await runTursoSetup(migratedConfig, { allowLogin: true }); + if (!setup.ready) { + const lines = [setup.message]; + if (setup.loginUrl) { + lines.push(`Complete Turso login at: ${setup.loginUrl}`); + } + if (setup.loginCode) { + lines.push(`Login code: ${setup.loginCode}`); + } + return lines.join('\n'); + } + } else { + const setup = await runTursoSetup(migratedConfig, { allowLogin: false }); + if (!setup.ready) { + throw new SyncCommandError(setup.message); + } + } + + const cycle = await runTursoCycleWithRetry(migratedConfig, 'sessions-migrate-turso', { + preference: 'push', + }); + await writeSyncConfig(locations, migratedConfig); + ensureTursoSyncLoop(migratedConfig); + + return [ + 'Session migration to Turso completed.', + `Bootstrap result: ${cycle.summary}`, + 'Git session artifacts were left in the sync repo for temporary fallback.', + 'After stabilization, run /sync-sessions-cleanup-git 
to remove deprecated repo session files.', + ].join('\n'); + }), + sessionsCleanupGit: () => + runExclusive(async () => { + const config = await getConfigOrThrow(locations); + if (!isTursoSessionBackend(config)) { + throw new SyncCommandError( + 'Cleanup is only available when includeSessions=true and sessionBackend=turso.' + ); + } + + const repoRoot = resolveRepoRoot(config, locations); + await ensureRepoCloned(ctx.$, config, repoRoot); + + const preDirty = await hasLocalChanges(ctx.$, repoRoot); + if (preDirty) { + throw new SyncCommandError( + `Local sync repo has uncommitted changes. Resolve in ${repoRoot} before cleanup.` + ); + } + + const deprecatedPaths = [ + path.join(repoRoot, 'data', 'opencode.db'), + path.join(repoRoot, 'data', 'opencode.db-wal'), + path.join(repoRoot, 'data', 'opencode.db-shm'), + path.join(repoRoot, 'data', 'storage', 'session'), + path.join(repoRoot, 'data', 'storage', 'message'), + path.join(repoRoot, 'data', 'storage', 'part'), + path.join(repoRoot, 'data', 'storage', 'session_diff'), + ]; + + for (const target of deprecatedPaths) { + await fs.rm(target, { recursive: true, force: true }); + } + + const dirty = await hasLocalChanges(ctx.$, repoRoot); + if (!dirty) { + return 'No deprecated Git session artifacts were found.'; + } + + const branch = await resolveBranch(ctx, config, repoRoot); + await commitAll(ctx.$, repoRoot, 'chore: remove deprecated git session artifacts'); + await pushBranch(ctx.$, repoRoot, branch); + await updateState(locations, { lastPush: new Date().toISOString() }); + return 'Deprecated Git session artifacts removed and pushed.'; + }), resolve: () => runExclusive(async () => { const config = await getConfigOrThrow(locations); @@ -765,6 +1439,11 @@ async function buildConfigFromInit($: Shell, options: InitOptions) { includeSecrets: options.includeSecrets ?? false, includeMcpSecrets: options.includeMcpSecrets ?? false, includeSessions: options.includeSessions ?? 
false, + sessionBackend: options.sessionBackend + ? { + type: options.sessionBackend, + } + : undefined, includePromptStash: options.includePromptStash ?? false, includeModelFavorites: options.includeModelFavorites ?? true, extraSecretPaths: options.extraSecretPaths ?? [], @@ -825,6 +1504,24 @@ function formatError(error: unknown): string { return String(error); } +function isRecord(value: unknown): value is Record { + return typeof value === 'object' && value !== null; +} + +function isBusySessionStatus(status: unknown): boolean { + if (!isRecord(status)) { + return true; + } + const statusType = typeof status.type === 'string' ? status.type : ''; + if (statusType === 'idle') { + return false; + } + if (statusType === 'busy' || statusType === 'retry') { + return true; + } + return true; +} + interface ResolutionDecision { action: 'commit' | 'reset' | 'manual'; message?: string; @@ -929,3 +1626,18 @@ function parseResolutionDecision(text: string): ResolutionDecision { return { action: 'manual', reason: 'Failed to parse AI decision' }; } } + +function resolveStrictLinkRepo(raw: string | undefined): { owner: string; name: string } | null { + if (!raw) return null; + const value = raw.trim(); + if (!value) return null; + + const parsed = parseRepoReference(value, '__opencode_sync_no_owner__'); + if (!parsed || parsed.owner === '__opencode_sync_no_owner__') { + throw new SyncCommandError( + 'OPENCODE_SYNC_E2E_STRICT_LINK_REPO must be an explicit owner/repo or GitHub repo URL.' 
+ ); + } + + return parsed; +} diff --git a/src/sync/turso.test.ts b/src/sync/turso.test.ts new file mode 100644 index 0000000..cffbee9 --- /dev/null +++ b/src/sync/turso.test.ts @@ -0,0 +1,151 @@ +import { describe, expect, it } from 'vitest'; +import { normalizeSyncConfig } from './config.js'; +import type { SyncLocations } from './paths.js'; +import { + estimateBase64EncodedLength, + estimateSnapshotPayloadBase64Bytes, + extractHeadlessLoginHints, + extractRows, + isRetryableTursoError, + isSnapshotPayloadSizeAllowed, + MAX_TURSO_SNAPSHOT_BASE64_BYTES, + resolveSessionDbPaths, + resolveTursoCredentialPath, + resolveTursoDatabaseName, +} from './turso.js'; + +function createLocations(): SyncLocations { + return { + xdg: { + homeDir: '/home/test', + configDir: '/home/test/.config', + dataDir: '/home/test/.local/share', + stateDir: '/home/test/.local/state', + }, + configRoot: '/home/test/.config/opencode', + syncConfigPath: '/home/test/.config/opencode/opencode-synced.jsonc', + overridesPath: '/home/test/.config/opencode/opencode-synced.overrides.jsonc', + statePath: '/home/test/.local/share/opencode/sync-state.json', + defaultRepoDir: '/home/test/.local/share/opencode/opencode-synced/repo', + }; +} + +describe('resolveTursoDatabaseName', () => { + it('uses explicit database name when configured', () => { + const config = normalizeSyncConfig({ + repo: { owner: 'acme', name: 'my-opencode-config' }, + includeSessions: true, + sessionBackend: { + type: 'turso', + turso: { + database: 'Custom DB Name', + }, + }, + }); + + expect(resolveTursoDatabaseName(config)).toBe('custom-db-name'); + }); + + it('derives database name from repo when not explicitly configured', () => { + const config = normalizeSyncConfig({ + repo: { owner: 'acme', name: 'my-opencode-config' }, + includeSessions: true, + sessionBackend: { type: 'turso' }, + }); + + expect(resolveTursoDatabaseName(config)).toBe('my-opencode-config-sessions'); + }); +}); + +describe('extractHeadlessLoginHints', () => 
{ + it('extracts login url and code from headless output', () => { + const text = [ + 'To authenticate, open:', + 'https://auth.turso.tech/activate', + 'Then enter code: ABCD-EFGH', + ].join('\n'); + + expect(extractHeadlessLoginHints(text)).toEqual({ + url: 'https://auth.turso.tech/activate', + code: 'ABCD-EFGH', + }); + }); +}); + +describe('isRetryableTursoError', () => { + it('detects retryable errors', () => { + expect(isRetryableTursoError(new Error('database is busy'))).toBe(true); + expect(isRetryableTursoError(new Error('HTTP 503'))).toBe(true); + expect(isRetryableTursoError(new Error('rate limit exceeded'))).toBe(true); + }); + + it('does not mark non-retryable errors as retryable', () => { + expect(isRetryableTursoError(new Error('invalid auth token'))).toBe(false); + }); +}); + +describe('path helpers', () => { + it('resolves credential path and session db paths', () => { + const locations = createLocations(); + expect(resolveTursoCredentialPath(locations)).toBe( + '/home/test/.local/share/opencode/opencode-synced/turso-session.json' + ); + expect(resolveSessionDbPaths(locations)).toEqual({ + dbPath: '/home/test/.local/share/opencode/opencode.db', + walPath: '/home/test/.local/share/opencode/opencode.db-wal', + shmPath: '/home/test/.local/share/opencode/opencode.db-shm', + }); + }); +}); + +describe('extractRows', () => { + it('extracts rows from top-level execute result shape', () => { + const rows = [[{ type: 'text', value: 'hello' }]]; + expect(extractRows({ rows })).toEqual(rows); + expect(extractRows({ result: { rows } })).toEqual(rows); + }); + + it('extracts rows from Turso v2 pipeline execute envelope', () => { + const rows = [[{ type: 'text', value: 'hello' }]]; + expect( + extractRows({ + type: 'ok', + response: { + type: 'execute', + result: { + rows, + }, + }, + }) + ).toEqual(rows); + }); + + it('returns empty list when result has no rows', () => { + expect(extractRows({ type: 'ok' })).toEqual([]); + expect(extractRows(null)).toEqual([]); 
+ }); +}); + +describe('snapshot payload sizing', () => { + it('estimates base64 length for byte counts', () => { + expect(estimateBase64EncodedLength(0)).toBe(0); + expect(estimateBase64EncodedLength(1)).toBe(4); + expect(estimateBase64EncodedLength(2)).toBe(4); + expect(estimateBase64EncodedLength(3)).toBe(4); + expect(estimateBase64EncodedLength(4)).toBe(8); + }); + + it('estimates combined snapshot payload size', () => { + const total = estimateSnapshotPayloadBase64Bytes({ + dbByteLength: 4, + walByteLength: 3, + shmByteLength: 1, + }); + expect(total).toBe(16); + }); + + it('checks whether snapshot payload size is within limit', () => { + expect(isSnapshotPayloadSizeAllowed(MAX_TURSO_SNAPSHOT_BASE64_BYTES)).toBe(true); + expect(isSnapshotPayloadSizeAllowed(MAX_TURSO_SNAPSHOT_BASE64_BYTES + 1)).toBe(false); + }); +}); diff --git a/src/sync/turso.ts b/src/sync/turso.ts new file mode 100644 index 0000000..678b59f --- /dev/null +++ b/src/sync/turso.ts @@ -0,0 +1,1369 @@ +import { spawn } from 'node:child_process'; +import crypto from 'node:crypto'; +import { promises as fs } from 'node:fs'; +import os from 'node:os'; +import path from 'node:path'; +import type { NormalizedSyncConfig } from './config.js'; +import { isPlainObject, isTursoSessionBackend, pathExists, writeJsonFile } from './config.js'; +import { SyncCommandError } from './errors.js'; +import type { SyncLocations } from './paths.js'; + +const SESSION_SYNC_TABLE = 'opencode_session_sync_snapshot'; +const CREDENTIAL_VERSION = 1; +const TURSO_INSTALL_SCRIPT = 'curl -sSfL https://get.tur.so/install.sh | bash'; +const TURSO_SQL_TIMEOUT_MS = 30_000; +const TURSO_PROCESS_KILL_GRACE_MS = 2_000; +export const MAX_TURSO_SNAPSHOT_BASE64_BYTES = 8 * 1024 * 1024; +const TURSO_EXECUTABLE_CANDIDATES = [ + 'turso', + '/opt/homebrew/bin/turso', + '/usr/local/bin/turso', + '~/.turso/turso', + '~/.local/bin/turso', +] as const; + +type TursoSqlArg = { type: 'text'; value: string } | { type: 'null' }; + +interface 
TursoPipelineExecuteRequest { + type: 'execute'; + stmt: { + sql: string; + args?: TursoSqlArg[]; + }; +} + +interface TursoPipelineCloseRequest { + type: 'close'; +} + +interface SessionSnapshot { + db: Buffer; + wal: Buffer | null; + shm: Buffer | null; + sha256: string; +} + +interface TursoSessionCredential { + version: number; + database: string; + url: string; + httpUrl: string; + token: string; + machineId: string; + createdAt: string; + updatedAt: string; + syncState?: { + lastKnownSnapshotSha?: string; + updatedAt?: string; + }; +} + +interface TursoCommandResult { + code: number; + stdout: string; + stderr: string; + timedOut: boolean; +} + +interface TimeoutSignalHandle { + signal: AbortSignal; + cleanup: () => void; +} + +export interface TursoSyncLogger { + debug: (message: string, metadata?: Record) => void; + info: (message: string, metadata?: Record) => void; + warn: (message: string, metadata?: Record) => void; + error: (message: string, metadata?: Record) => void; +} + +export interface TursoSetupResult { + ready: boolean; + changed: boolean; + message: string; + loginUrl?: string; + loginCode?: string; +} + +export interface TursoSessionSyncResult { + status: 'synced' | 'unchanged' | 'skipped'; + message: string; + sha256?: string; +} + +export interface TursoSessionSyncCycleResult { + pullBefore: TursoSessionSyncResult; + push: TursoSessionSyncResult; + pullAfter: TursoSessionSyncResult; +} + +export type TursoSyncPreference = 'auto' | 'pull' | 'push'; + +export interface TursoSessionSetupOptions { + allowLogin?: boolean; + allowAutoInstall?: boolean; + forceTokenRefresh?: boolean; +} + +export interface TursoSessionBackend { + ensureSetup: (_options?: TursoSessionSetupOptions) => Promise; + status: () => Promise; + pull: () => Promise; + push: () => Promise; + syncCycle: (_options?: { + preference?: TursoSyncPreference; + allowLocalPull?: boolean; + }) => Promise; +} + +export function createTursoSessionBackend(options: { + locations: 
SyncLocations; + config: NormalizedSyncConfig; + log: TursoSyncLogger; +}): TursoSessionBackend { + const { locations, config, log } = options; + const paths = resolveSessionDbPaths(locations); + const credentialPath = resolveTursoCredentialPath(locations); + const backendConfig = config.sessionBackend.turso; + + const ensureSetup = async ( + setupOptions: TursoSessionSetupOptions = {} + ): Promise => { + if (!isTursoSessionBackend(config)) { + throw new SyncCommandError('Turso session backend is not enabled.'); + } + + const expectedDatabase = resolveTursoDatabaseName(config); + const allowAutoInstall = setupOptions.allowAutoInstall ?? backendConfig.autoSetup; + const existing = await readTursoCredential(credentialPath); + if ( + existing && + existing.database === expectedDatabase && + !setupOptions.forceTokenRefresh && + (await isCredentialUsable(existing)) + ) { + return { + ready: true, + changed: false, + message: `Turso session backend ready (${existing.database}).`, + }; + } + + const executable = await ensureTursoExecutable(locations, allowAutoInstall); + const authenticated = await isTursoAuthenticated(executable); + if (!authenticated) { + if (!setupOptions.allowLogin) { + return { + ready: false, + changed: false, + message: + 'Turso CLI is not authenticated. Run /sync-sessions-setup-turso to complete headless login.', + }; + } + + const headlessAuth = await runTursoHeadlessAuth(executable); + if (!headlessAuth.ready) { + return { + ready: false, + changed: false, + message: headlessAuth.message, + loginUrl: headlessAuth.loginUrl, + loginCode: headlessAuth.loginCode, + }; + } + + const authenticatedAfterLogin = await isTursoAuthenticated(executable); + if (!authenticatedAfterLogin) { + return { + ready: false, + changed: false, + message: + 'Turso CLI login did not complete successfully. 
Re-run /sync-sessions-setup-turso.', + }; + } + } + + await ensureTursoDatabaseExists(executable, expectedDatabase); + const detectedUrl = await resolveDatabaseUrl(executable, expectedDatabase); + const configuredUrl = backendConfig.url?.trim() || detectedUrl; + const httpUrl = await resolveHttpUrl(executable, expectedDatabase, configuredUrl); + + let token = existing?.token ?? ''; + if (!token || setupOptions.forceTokenRefresh) { + token = await createTursoToken(executable, expectedDatabase); + } + + let credential: TursoSessionCredential = { + version: CREDENTIAL_VERSION, + database: expectedDatabase, + url: configuredUrl, + httpUrl, + token, + machineId: resolveMachineId(), + createdAt: existing?.createdAt ?? new Date().toISOString(), + updatedAt: new Date().toISOString(), + syncState: existing?.syncState, + }; + + if (!(await isCredentialUsable(credential))) { + token = await createTursoToken(executable, expectedDatabase); + credential = { + ...credential, + token, + updatedAt: new Date().toISOString(), + }; + if (!(await isCredentialUsable(credential))) { + throw new SyncCommandError( + 'Failed to validate Turso session credentials after token provisioning.' 
+ ); + } + } + + await writeCredential(credentialPath, credential); + + return { + ready: true, + changed: true, + message: `Turso session backend is configured for database "${expectedDatabase}".`, + }; + }; + + const requireCredential = async (): Promise => { + const setup = await ensureSetup({ allowLogin: false }); + if (!setup.ready) { + throw new SyncCommandError(setup.message); + } + + const credential = await readTursoCredential(credentialPath); + if (!credential) { + throw new SyncCommandError('Turso credentials are missing after setup.'); + } + return credential; + }; + + const writeKnownSha = async ( + credential: TursoSessionCredential, + nextSha: string | null + ): Promise => { + if (!nextSha) return; + if (credential.syncState?.lastKnownSnapshotSha === nextSha) { + return; + } + + const updated: TursoSessionCredential = { + ...credential, + syncState: { + ...credential.syncState, + lastKnownSnapshotSha: nextSha, + updatedAt: new Date().toISOString(), + }, + updatedAt: new Date().toISOString(), + }; + await writeCredential(credentialPath, updated); + }; + + const applyPull = async ( + credential: TursoSessionCredential, + remoteSnapshot: { + db: Buffer; + wal: Buffer | null; + shm: Buffer | null; + sha256: string; + machineId: string; + updatedAt: string; + } + ): Promise => { + await writeLocalSessionSnapshot(paths, remoteSnapshot); + await writeKnownSha(credential, remoteSnapshot.sha256); + return { + status: 'synced', + sha256: remoteSnapshot.sha256, + message: `Pulled sessions from Turso snapshot (${remoteSnapshot.machineId}).`, + }; + }; + + const applyPush = async ( + credential: TursoSessionCredential, + localSnapshot: SessionSnapshot + ): Promise => { + await upsertRemoteSnapshot(credential, localSnapshot, resolveMachineId()); + await writeKnownSha(credential, localSnapshot.sha256); + return { + status: 'synced', + sha256: localSnapshot.sha256, + message: 'Pushed local sessions to Turso.', + }; + }; + + const pull = async (): Promise => { + 
const credential = await requireCredential(); + await ensureSnapshotTable(credential); + + const [remoteSnapshot, localSnapshot] = await Promise.all([ + fetchRemoteSnapshot(credential), + readLocalSessionSnapshot(paths), + ]); + if (!remoteSnapshot) { + return { + status: 'skipped', + message: 'No remote session snapshot found in Turso yet.', + }; + } + + if (localSnapshot && localSnapshot.sha256 === remoteSnapshot.sha256) { + return { + status: 'unchanged', + sha256: remoteSnapshot.sha256, + message: 'Local sessions already match Turso snapshot.', + }; + } + + return await applyPull(credential, remoteSnapshot); + }; + + const push = async (): Promise => { + const localSnapshot = await readLocalSessionSnapshot(paths); + if (!localSnapshot) { + return { + status: 'skipped', + message: `Local session database not found at ${paths.dbPath}.`, + }; + } + + const credential = await requireCredential(); + await ensureSnapshotTable(credential); + + const remoteSnapshot = await fetchRemoteSnapshot(credential); + if (remoteSnapshot && remoteSnapshot.sha256 === localSnapshot.sha256) { + return { + status: 'unchanged', + sha256: localSnapshot.sha256, + message: 'Turso snapshot already matches local sessions.', + }; + } + + return await applyPush(credential, localSnapshot); + }; + + const status = async (): Promise => { + const credential = await readTursoCredential(credentialPath); + if (!credential) { + return 'Turso backend selected, but machine-local credentials are not set up.'; + } + + const usable = await isCredentialUsable(credential); + if (!usable) { + return `Turso credential exists for "${credential.database}", but token validation failed.`; + } + + return `Turso backend ready (${credential.database}); concurrent-safe backend enabled.`; + }; + + const syncCycle = async ( + options: { preference?: TursoSyncPreference; allowLocalPull?: boolean } = {} + ): Promise => { + const preference = options.preference ?? 'auto'; + const allowLocalPull = options.allowLocalPull ?? 
true; + const credential = await requireCredential(); + await ensureSnapshotTable(credential); + + const [remoteSnapshot, localSnapshot] = await Promise.all([ + fetchRemoteSnapshot(credential), + readLocalSessionSnapshot(paths), + ]); + const knownSha = credential.syncState?.lastKnownSnapshotSha?.trim() || null; + const pullBefore: TursoSessionSyncResult = { + status: 'skipped', + message: 'No pull required.', + }; + const pushResult: TursoSessionSyncResult = { + status: 'skipped', + message: 'No push required.', + }; + const pullAfter: TursoSessionSyncResult = { + status: 'skipped', + message: 'No final pull required.', + }; + const localPullDeferredMessage = + 'Remote session snapshot available, but local apply is deferred until startup to avoid live SQLite replacement.'; + + if (!localSnapshot && !remoteSnapshot) { + pullBefore.message = `Local session database not found at ${paths.dbPath}.`; + pushResult.message = 'No remote session snapshot found in Turso yet.'; + pullAfter.message = 'No final pull required.'; + return { pullBefore, push: pushResult, pullAfter }; + } + + if (localSnapshot && remoteSnapshot && localSnapshot.sha256 === remoteSnapshot.sha256) { + await writeKnownSha(credential, localSnapshot.sha256); + pullBefore.status = 'unchanged'; + pullBefore.sha256 = localSnapshot.sha256; + pullBefore.message = 'Local and remote session snapshots already match.'; + pushResult.status = 'unchanged'; + pushResult.sha256 = localSnapshot.sha256; + pushResult.message = 'No session changes to push.'; + pullAfter.status = 'unchanged'; + pullAfter.sha256 = localSnapshot.sha256; + pullAfter.message = 'No session changes to pull.'; + return { pullBefore, push: pushResult, pullAfter }; + } + + const localSha = localSnapshot?.sha256 ?? null; + const remoteSha = remoteSnapshot?.sha256 ?? null; + const localChanged = knownSha ? localSha !== knownSha : localSnapshot !== null; + const remoteChanged = knownSha + ? 
remoteSha !== null && remoteSha !== knownSha + : remoteSnapshot !== null; + + const shouldPreferPush = + preference === 'push' || + (preference === 'auto' && knownSha !== null && localChanged && !remoteChanged); + const shouldPreferPull = + preference === 'pull' || + (preference === 'auto' && knownSha !== null && !localChanged && remoteChanged); + + if (!knownSha) { + if (remoteSnapshot && (shouldPreferPull || !localSnapshot)) { + if (!allowLocalPull) { + pullBefore.status = 'skipped'; + pullBefore.sha256 = remoteSnapshot.sha256; + pullBefore.message = localPullDeferredMessage; + pushResult.status = 'skipped'; + pushResult.message = + 'Skipped push because remote snapshot is newer and runtime local pull is disabled.'; + return { pullBefore, push: pushResult, pullAfter }; + } + + const pulled = await applyPull(credential, remoteSnapshot); + pullBefore.status = pulled.status; + pullBefore.sha256 = pulled.sha256; + pullBefore.message = pulled.message; + pushResult.status = 'skipped'; + pushResult.message = 'Skipped push after pull-preferred bootstrap.'; + pullAfter.status = 'unchanged'; + pullAfter.sha256 = pulled.sha256; + pullAfter.message = 'No final pull required.'; + return { pullBefore, push: pushResult, pullAfter }; + } + + if (localSnapshot) { + const pushed = await applyPush(credential, localSnapshot); + pushResult.status = pushed.status; + pushResult.sha256 = pushed.sha256; + pushResult.message = pushed.message; + pullBefore.status = 'skipped'; + pullBefore.message = 'Skipped initial pull during push-preferred bootstrap.'; + pullAfter.status = 'skipped'; + pullAfter.message = 'Skipped final pull during push-preferred bootstrap.'; + return { pullBefore, push: pushResult, pullAfter }; + } + } + + if (localChanged && !remoteChanged && localSnapshot) { + const pushed = await applyPush(credential, localSnapshot); + pushResult.status = pushed.status; + pushResult.sha256 = pushed.sha256; + pushResult.message = pushed.message; + return { pullBefore, push: 
pushResult, pullAfter }; + } + + if (!localChanged && remoteChanged && remoteSnapshot) { + if (!allowLocalPull) { + pullBefore.status = 'skipped'; + pullBefore.sha256 = remoteSnapshot.sha256; + pullBefore.message = localPullDeferredMessage; + return { pullBefore, push: pushResult, pullAfter }; + } + + const pulled = await applyPull(credential, remoteSnapshot); + pullBefore.status = pulled.status; + pullBefore.sha256 = pulled.sha256; + pullBefore.message = pulled.message; + return { pullBefore, push: pushResult, pullAfter }; + } + + if (localChanged && remoteChanged) { + if (shouldPreferPull && remoteSnapshot) { + if (!allowLocalPull) { + pullBefore.status = 'skipped'; + pullBefore.sha256 = remoteSnapshot.sha256; + pullBefore.message = `${localPullDeferredMessage} (conflict deferred by pull preference)`; + return { pullBefore, push: pushResult, pullAfter }; + } + + const pulled = await applyPull(credential, remoteSnapshot); + pullBefore.status = pulled.status; + pullBefore.sha256 = pulled.sha256; + pullBefore.message = `${pulled.message} (resolved by pull preference)`; + return { pullBefore, push: pushResult, pullAfter }; + } + + if (shouldPreferPush && localSnapshot) { + const pushed = await applyPush(credential, localSnapshot); + pushResult.status = pushed.status; + pushResult.sha256 = pushed.sha256; + pushResult.message = `${pushed.message} (resolved by push preference)`; + return { pullBefore, push: pushResult, pullAfter }; + } + + if (localSnapshot) { + const pushed = await applyPush(credential, localSnapshot); + pushResult.status = pushed.status; + pushResult.sha256 = pushed.sha256; + pushResult.message = `${pushed.message} (resolved by auto preference)`; + return { pullBefore, push: pushResult, pullAfter }; + } + } + + if (remoteSnapshot && !localSnapshot) { + if (!allowLocalPull) { + pullBefore.status = 'skipped'; + pullBefore.sha256 = remoteSnapshot.sha256; + pullBefore.message = localPullDeferredMessage; + return { pullBefore, push: pushResult, pullAfter 
}; + } + + const pulled = await applyPull(credential, remoteSnapshot); + pullBefore.status = pulled.status; + pullBefore.sha256 = pulled.sha256; + pullBefore.message = pulled.message; + return { pullBefore, push: pushResult, pullAfter }; + } + + pullBefore.status = 'unchanged'; + pullBefore.sha256 = localSha ?? remoteSha ?? undefined; + pullBefore.message = 'No Turso session changes detected.'; + log.debug('Completed Turso session sync cycle', { + pullBefore: pullBefore.status, + push: pushResult.status, + pullAfter: pullAfter.status, + preference, + allowLocalPull, + knownSha, + localSha, + remoteSha, + }); + return { + pullBefore, + push: pushResult, + pullAfter, + }; + }; + + return { + ensureSetup, + status, + pull, + push, + syncCycle, + }; +} + +export function resolveTursoCredentialPath(locations: SyncLocations): string { + return path.join(locations.xdg.dataDir, 'opencode', 'opencode-synced', 'turso-session.json'); +} + +export function resolveSessionDbPaths(locations: SyncLocations): { + dbPath: string; + walPath: string; + shmPath: string; +} { + const dataRoot = path.join(locations.xdg.dataDir, 'opencode'); + const dbPath = path.join(dataRoot, 'opencode.db'); + return { + dbPath, + walPath: `${dbPath}-wal`, + shmPath: `${dbPath}-shm`, + }; +} + +export function resolveTursoDatabaseName(config: NormalizedSyncConfig): string { + const explicit = config.sessionBackend.turso.database?.trim(); + if (explicit) { + return sanitizeDatabaseName(explicit); + } + + const repoName = config.repo?.name?.trim() || 'opencode-config'; + return sanitizeDatabaseName(`${repoName}-sessions`); +} + +export function extractHeadlessLoginHints(text: string): { url?: string; code?: string } { + const urlMatch = text.match(/https?:\/\/[^\s)]+/i); + const codeMatch = + text.match(/\b[A-Z0-9]{4}(?:-[A-Z0-9]{4})+\b/) ?? 
text.match(/\b[A-Z0-9]{6,}\b/); + return { + url: urlMatch?.[0], + code: codeMatch?.[0], + }; +} + +export function isRetryableTursoError(error: unknown): boolean { + const message = + error instanceof Error ? error.message.toLowerCase() : String(error).toLowerCase(); + return [ + 'busy', + 'timeout', + 'temporar', + 'connection reset', + 'connection refused', + 'econnreset', + 'etimedout', + '429', + '500', + '502', + '503', + '504', + 'rate limit', + ].some((token) => message.includes(token)); +} + +export function estimateBase64EncodedLength(byteLength: number): number { + if (!Number.isFinite(byteLength) || byteLength <= 0) { + return 0; + } + return Math.ceil(byteLength / 3) * 4; +} + +export function estimateSnapshotPayloadBase64Bytes(input: { + dbByteLength: number; + walByteLength?: number | null; + shmByteLength?: number | null; +}): number { + return ( + estimateBase64EncodedLength(input.dbByteLength) + + estimateBase64EncodedLength(input.walByteLength ?? 0) + + estimateBase64EncodedLength(input.shmByteLength ?? 0) + ); +} + +export function isSnapshotPayloadSizeAllowed(totalBase64Bytes: number): boolean { + return totalBase64Bytes <= MAX_TURSO_SNAPSHOT_BASE64_BYTES; +} + +function sanitizeDatabaseName(input: string): string { + const cleaned = input + .trim() + .toLowerCase() + .replace(/[^a-z0-9-]+/g, '-') + .replace(/-+/g, '-') + .replace(/^-+|-+$/g, ''); + const fallback = cleaned || 'opencode-sessions'; + return fallback.slice(0, 58); +} + +function resolveMachineId(): string { + const user = process.env.USER ?? process.env.USERNAME ?? 
'unknown'; + const host = os.hostname() || 'unknown-host'; + return `${user}@${host}`; +} + +async function readTursoCredential(filePath: string): Promise { + if (!(await pathExists(filePath))) { + return null; + } + + try { + const raw = await fs.readFile(filePath, 'utf8'); + const parsed = JSON.parse(raw) as unknown; + if (!isPlainObject(parsed)) return null; + + const credential = parsed as Partial; + if ( + typeof credential.database !== 'string' || + typeof credential.url !== 'string' || + typeof credential.httpUrl !== 'string' || + typeof credential.token !== 'string' || + typeof credential.machineId !== 'string' + ) { + return null; + } + + const syncStateInput = isPlainObject(credential.syncState) ? credential.syncState : null; + const lastKnownSnapshotSha = + syncStateInput && typeof syncStateInput.lastKnownSnapshotSha === 'string' + ? syncStateInput.lastKnownSnapshotSha + : undefined; + const syncUpdatedAt = + syncStateInput && typeof syncStateInput.updatedAt === 'string' + ? syncStateInput.updatedAt + : undefined; + + return { + version: Number(credential.version ?? CREDENTIAL_VERSION), + database: credential.database, + url: credential.url, + httpUrl: trimTrailingSlash(credential.httpUrl), + token: credential.token, + machineId: credential.machineId, + createdAt: typeof credential.createdAt === 'string' ? credential.createdAt : '', + updatedAt: typeof credential.updatedAt === 'string' ? credential.updatedAt : '', + syncState: + lastKnownSnapshotSha || syncUpdatedAt + ? 
/**
 * Persist the Turso credential file as strict JSON with owner-only
 * permissions (0600), creating the parent directory if needed.
 */
async function writeCredential(
  filePath: string,
  credential: TursoSessionCredential
): Promise<void> {
  await fs.mkdir(path.dirname(filePath), { recursive: true });
  await writeJsonFile(filePath, credential, { jsonc: false, mode: 0o600 });
}

/**
 * Cheap liveness probe: a credential is usable iff a trivial `SELECT 1`
 * executes successfully against the database with its token.
 */
async function isCredentialUsable(credential: TursoSessionCredential): Promise<boolean> {
  try {
    await executeSql(credential, 'SELECT 1');
    return true;
  } catch {
    return false;
  }
}

/**
 * Locate the Turso CLI executable, installing it when permitted.
 *
 * Order: detect an existing executable; otherwise, when `allowAutoInstall`
 * is set and the platform is not Windows, run the official install script,
 * falling back to Homebrew if the script fails and `brew` is available.
 * Throws SyncCommandError when no executable can be obtained.
 */
async function ensureTursoExecutable(
  locations: SyncLocations,
  allowAutoInstall: boolean
): Promise<string> {
  const detected = await detectTursoExecutable(locations);
  if (detected) {
    return detected;
  }

  if (!allowAutoInstall) {
    throw new SyncCommandError(
      'Turso CLI not found. Run /sync-sessions-setup-turso to install it.'
    );
  }

  if (process.platform !== 'win32') {
    // Primary install path: the official curl-based script.
    const installScript = await runCommand('bash', ['-c', TURSO_INSTALL_SCRIPT], {
      timeoutMs: 120000,
    });
    if (installScript.code !== 0) {
      // Optional fallback when Homebrew is already present.
      const brewInstalled = await isCommandExecutable('brew');
      if (brewInstalled) {
        await runCommand('brew', ['tap', 'libsql/sqld'], { timeoutMs: 60000 });
        await runCommand('brew', ['tap', 'tursodatabase/tap'], { timeoutMs: 60000 });
        const brewInstall = await runCommand('brew', ['install', 'turso'], { timeoutMs: 120000 });
        if (brewInstall.code !== 0) {
          // Surface both failure outputs so the user sees why each path failed.
          throw new SyncCommandError(
            'Failed to install Turso CLI with install script and Homebrew fallback: ' +
              `${combineCommandOutput(installScript)}\n${combineCommandOutput(brewInstall)}`
          );
        }
      } else {
        throw new SyncCommandError(
          `Failed to install Turso CLI with install script: ${combineCommandOutput(installScript)}`
        );
      }
    }
  } else {
    throw new SyncCommandError(
      'Automatic Turso CLI install is not supported on this platform. Install Turso manually, then retry.'
    );
  }

  // Re-detect after install: this process's PATH may be stale, so detection
  // also checks the standard install locations.
  const afterInstall = await detectTursoExecutable(locations);
  if (!afterInstall) {
    throw new SyncCommandError(
      'Turso CLI installation completed, but executable was not found in PATH or standard install locations.'
    );
  }

  return afterInstall;
}

/**
 * Drive `turso auth login --headless`, then `auth signup --headless` if
 * login does not complete, harvesting an activation URL and one-time code
 * from either command's output for the caller to show the user.
 *
 * Returns `ready: true` only after `auth whoami` confirms authentication.
 */
async function runTursoHeadlessAuth(
  executable: string
): Promise<{ ready: boolean; message: string; loginUrl?: string; loginCode?: string }> {
  const loginResult = await runTursoCommand(executable, ['auth', 'login', '--headless'], {
    timeoutMs: 180000,
  });
  if (loginResult.code === 0 && !loginResult.timedOut) {
    const authenticated = await isTursoAuthenticated(executable);
    if (authenticated) {
      return {
        ready: true,
        message: 'Turso CLI login completed.',
      };
    }
  }

  // Login did not complete: gather hints from both login and signup output,
  // since new users need signup while existing users need login.
  const loginCombined = combineCommandOutput(loginResult);
  const loginHints = extractHeadlessLoginHints(loginCombined);
  const signupResult = await runTursoCommand(executable, ['auth', 'signup', '--headless'], {
    timeoutMs: 180000,
  });
  const signupCombined = combineCommandOutput(signupResult);
  const signupHints = extractHeadlessLoginHints(signupCombined);

  const loginUrl = loginHints.url ?? signupHints.url;
  const loginCode = loginHints.code ?? signupHints.code;
  return {
    ready: false,
    message:
      'Turso login/signup requires browser authorization. Complete the headless auth URL and rerun /sync-sessions-setup-turso.',
    loginUrl,
    loginCode,
  };
}
path.resolve(candidate.replace(/^~(?=\/)/, locations.xdg.homeDir)) + : candidate; + if (await isCommandExecutable(resolved)) { + return resolved; + } + } + return null; +} + +async function isCommandExecutable(command: string): Promise { + try { + const probe = await runCommand(command, ['--version'], { timeoutMs: 10000 }); + return probe.code === 0; + } catch { + return false; + } +} + +async function isTursoAuthenticated(executable: string): Promise { + const status = await runTursoCommand(executable, ['auth', 'whoami']); + return status.code === 0; +} + +async function ensureTursoDatabaseExists(executable: string, database: string): Promise { + const show = await runTursoCommand(executable, ['db', 'show', database, '--url']); + if (show.code === 0) { + return; + } + + const create = await runTursoCommand(executable, ['db', 'create', database], { + timeoutMs: 120000, + }); + if (create.code !== 0) { + throw new SyncCommandError( + `Failed to create Turso database "${database}": ${combineCommandOutput(create)}` + ); + } +} + +async function resolveDatabaseUrl(executable: string, database: string): Promise { + const result = await runTursoCommand(executable, ['db', 'show', database, '--url']); + if (result.code !== 0) { + throw new SyncCommandError( + `Failed to resolve Turso database URL: ${combineCommandOutput(result)}` + ); + } + return result.stdout.trim(); +} + +async function resolveHttpUrl( + executable: string, + database: string, + baseUrl: string +): Promise { + if (baseUrl.startsWith('http://') || baseUrl.startsWith('https://')) { + return trimTrailingSlash(baseUrl); + } + if (baseUrl.startsWith('libsql://')) { + return trimTrailingSlash(`https://${baseUrl.slice('libsql://'.length)}`); + } + + const result = await runTursoCommand(executable, ['db', 'show', database, '--http-url']); + if (result.code !== 0) { + throw new SyncCommandError(`Failed to resolve Turso HTTP URL: ${combineCommandOutput(result)}`); + } + + return 
/**
 * Create a database auth token via the Turso CLI.
 *
 * Tries `--expiration never` first, then retries without the flag for CLI
 * versions that reject it. The token is pulled from the output by matching
 * a JWT-shaped dotted triple; failing that, the last non-empty output line
 * is assumed to be the token.
 */
async function createTursoToken(executable: string, database: string): Promise<string> {
  let result = await runTursoCommand(executable, [
    'db',
    'tokens',
    'create',
    database,
    '--expiration',
    'never',
  ]);
  if (result.code !== 0) {
    // Some CLI versions do not support --expiration; retry with defaults.
    result = await runTursoCommand(executable, ['db', 'tokens', 'create', database]);
  }

  if (result.code !== 0) {
    throw new SyncCommandError(`Failed to create Turso token: ${combineCommandOutput(result)}`);
  }

  const combined = combineCommandOutput(result);
  // NOTE(review): this pattern also matches any dotted triple (e.g. a
  // version string "1.2.3") if one appears in the output — confirm the CLI
  // output stays token-only, or anchor the match more tightly.
  const jwtMatch = combined.match(/[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+/);
  if (jwtMatch) {
    return jwtMatch[0];
  }

  const lines = combined
    .split('\n')
    .map((entry) => entry.trim())
    .filter(Boolean);
  const candidate = lines[lines.length - 1];
  if (!candidate) {
    throw new SyncCommandError('Turso token command did not return a token value.');
  }
  return candidate;
}

/** Thin wrapper so every Turso CLI invocation shares one spawn/timeout path. */
async function runTursoCommand(
  executable: string,
  args: string[],
  options: { timeoutMs?: number } = {}
): Promise<TursoCommandResult> {
  return await runCommand(executable, args, options);
}

/**
 * Spawn a child process with captured stdout/stderr and an optional timeout.
 *
 * On timeout the child receives SIGTERM, escalating to SIGKILL after
 * TURSO_PROCESS_KILL_GRACE_MS if it has not exited; `timedOut` is set on
 * the result. Resolves on 'close' (null exit code reported as 1); rejects
 * only on spawn errors. stdin is ignored.
 */
async function runCommand(
  executable: string,
  args: string[],
  options: { timeoutMs?: number } = {}
): Promise<TursoCommandResult> {
  return await new Promise((resolve, reject) => {
    const child = spawn(executable, args, {
      stdio: ['ignore', 'pipe', 'pipe'],
      env: process.env,
    });

    let stdout = '';
    let stderr = '';
    let timedOut = false;
    let timeout: NodeJS.Timeout | null = null;
    let forceKillTimeout: NodeJS.Timeout | null = null;

    // Clear both timers so a finished process cannot be signalled later.
    const clearCommandTimers = (): void => {
      if (timeout) clearTimeout(timeout);
      if (forceKillTimeout) clearTimeout(forceKillTimeout);
    };

    if (options.timeoutMs && options.timeoutMs > 0) {
      timeout = setTimeout(() => {
        timedOut = true;
        try {
          child.kill('SIGTERM');
        } catch {
          // Process already gone; nothing to escalate.
          return;
        }
        forceKillTimeout = setTimeout(() => {
          // Skip SIGKILL when the process exited during the grace period.
          if (child.exitCode !== null || child.signalCode !== null) return;
          try {
            child.kill('SIGKILL');
          } catch {
            // Process can already be gone by the time fallback runs.
          }
        }, TURSO_PROCESS_KILL_GRACE_MS);
      }, options.timeoutMs);
    }

    child.stdout.on('data', (chunk: Buffer | string) => {
      stdout += chunk.toString();
    });
    child.stderr.on('data', (chunk: Buffer | string) => {
      stderr += chunk.toString();
    });

    child.on('error', (error) => {
      clearCommandTimers();
      reject(error);
    });

    child.on('close', (code) => {
      clearCommandTimers();
      resolve({
        code: code ?? 1,
        stdout,
        stderr,
        timedOut,
      });
    });
  });
}

/**
 * Join trimmed stdout and stderr for error reporting, flagging timeouts so
 * failures distinguish "timed out" from "exited non-zero".
 */
function combineCommandOutput(result: TursoCommandResult): string {
  const parts = [result.stdout.trim(), result.stderr.trim()].filter(Boolean);
  const joined = parts.join('\n');
  if (result.timedOut) {
    return joined ? `${joined}\n(command timed out)` : 'command timed out';
  }
  return joined;
}
await fs.readFile(paths.shmPath) : null; + return { + db, + wal, + shm, + sha256: computeSnapshotSha256(db, wal, shm), + }; +} + +async function writeLocalSessionSnapshot( + paths: { dbPath: string; walPath: string; shmPath: string }, + snapshot: { + db: Buffer; + wal: Buffer | null; + shm: Buffer | null; + sha256: string; + machineId: string; + updatedAt: string; + } +): Promise { + await writeBufferAtomically(paths.dbPath, snapshot.db); + if (snapshot.wal) { + await writeBufferAtomically(paths.walPath, snapshot.wal); + } else { + await fs.rm(paths.walPath, { force: true }); + } + + if (snapshot.shm) { + await writeBufferAtomically(paths.shmPath, snapshot.shm); + } else { + await fs.rm(paths.shmPath, { force: true }); + } +} + +async function writeBufferAtomically(targetPath: string, payload: Buffer): Promise { + await fs.mkdir(path.dirname(targetPath), { recursive: true }); + const tempPath = `${targetPath}.tmp-${process.pid}-${Date.now()}-${crypto.randomUUID()}`; + await fs.writeFile(tempPath, payload, { mode: 0o600 }); + await fs.rename(tempPath, targetPath); +} + +function computeSnapshotSha256(db: Buffer, wal: Buffer | null, shm: Buffer | null): string { + const hash = crypto.createHash('sha256'); + hash.update(Buffer.from(String(db.byteLength))); + hash.update(Buffer.from([0])); + hash.update(db); + hash.update(Buffer.from([0])); + if (wal) { + hash.update(Buffer.from(String(wal.byteLength))); + hash.update(Buffer.from([0])); + hash.update(wal); + } + hash.update(Buffer.from([0])); + if (shm) { + hash.update(Buffer.from(String(shm.byteLength))); + hash.update(Buffer.from([0])); + hash.update(shm); + } + return hash.digest('hex'); +} + +async function ensureSnapshotTable(credential: TursoSessionCredential): Promise { + const sql = [ + `CREATE TABLE IF NOT EXISTS ${SESSION_SYNC_TABLE} (`, + 'id INTEGER PRIMARY KEY CHECK (id = 1),', + 'updated_at TEXT NOT NULL,', + 'machine_id TEXT NOT NULL,', + 'payload_sha256 TEXT NOT NULL,', + 'payload_db_b64 TEXT NOT 
NULL,', + 'payload_wal_b64 TEXT,', + 'payload_shm_b64 TEXT', + ')', + ].join(' '); + await executeSql(credential, sql); +} + +async function fetchRemoteSnapshot(credential: TursoSessionCredential): Promise<{ + db: Buffer; + wal: Buffer | null; + shm: Buffer | null; + sha256: string; + machineId: string; + updatedAt: string; +} | null> { + const query = [ + `SELECT json_object(`, + "'updatedAt', updated_at,", + "'machineId', machine_id,", + "'sha256', payload_sha256,", + "'db', payload_db_b64,", + "'wal', payload_wal_b64,", + "'shm', payload_shm_b64", + `) FROM ${SESSION_SYNC_TABLE} WHERE id = 1`, + ].join(' '); + + const payload = await querySingleText(credential, query); + if (!payload) { + return null; + } + + let parsed: unknown; + try { + parsed = JSON.parse(payload) as unknown; + } catch { + throw new SyncCommandError('Turso snapshot payload is not valid JSON.'); + } + + if (!isPlainObject(parsed)) { + throw new SyncCommandError('Turso snapshot payload has unexpected structure.'); + } + + const dbB64 = typeof parsed.db === 'string' ? parsed.db : ''; + const walB64 = typeof parsed.wal === 'string' ? parsed.wal : null; + const shmB64 = typeof parsed.shm === 'string' ? parsed.shm : null; + const sha256 = typeof parsed.sha256 === 'string' ? parsed.sha256 : ''; + const machineId = typeof parsed.machineId === 'string' ? parsed.machineId : 'unknown'; + const updatedAt = typeof parsed.updatedAt === 'string' ? parsed.updatedAt : ''; + + if (!dbB64 || !sha256) { + throw new SyncCommandError('Turso snapshot payload is missing required session data.'); + } + + return { + db: Buffer.from(dbB64, 'base64'), + wal: walB64 ? Buffer.from(walB64, 'base64') : null, + shm: shmB64 ? 
Buffer.from(shmB64, 'base64') : null, + sha256, + machineId, + updatedAt, + }; +} + +async function upsertRemoteSnapshot( + credential: TursoSessionCredential, + snapshot: SessionSnapshot, + machineId: string +): Promise { + const dbPayloadBase64 = snapshot.db.toString('base64'); + const walPayloadBase64 = snapshot.wal ? snapshot.wal.toString('base64') : null; + const shmPayloadBase64 = snapshot.shm ? snapshot.shm.toString('base64') : null; + const payloadBase64Bytes = estimateSnapshotPayloadBase64Bytes({ + dbByteLength: snapshot.db.byteLength, + walByteLength: snapshot.wal?.byteLength, + shmByteLength: snapshot.shm?.byteLength, + }); + if (!isSnapshotPayloadSizeAllowed(payloadBase64Bytes)) { + throw new SyncCommandError( + `Session snapshot payload is too large for Turso upload ` + + `(${payloadBase64Bytes} base64 bytes; max ${MAX_TURSO_SNAPSHOT_BASE64_BYTES}). ` + + 'Chunked uploads are not supported yet.' + ); + } + + const upsert = [ + `INSERT INTO ${SESSION_SYNC_TABLE} (`, + 'id, updated_at, machine_id, payload_sha256, payload_db_b64, payload_wal_b64, payload_shm_b64', + `) VALUES (1, ?, ?, ?, ?, ?, ?)`, + 'ON CONFLICT(id) DO UPDATE SET', + 'updated_at = excluded.updated_at,', + 'machine_id = excluded.machine_id,', + 'payload_sha256 = excluded.payload_sha256,', + 'payload_db_b64 = excluded.payload_db_b64,', + 'payload_wal_b64 = excluded.payload_wal_b64,', + 'payload_shm_b64 = excluded.payload_shm_b64', + ].join(' '); + + const args: TursoSqlArg[] = [ + { type: 'text', value: new Date().toISOString() }, + { type: 'text', value: machineId }, + { type: 'text', value: snapshot.sha256 }, + { type: 'text', value: dbPayloadBase64 }, + walPayloadBase64 ? { type: 'text', value: walPayloadBase64 } : { type: 'null' }, + shmPayloadBase64 ? 
{ type: 'text', value: shmPayloadBase64 } : { type: 'null' }, + ]; + + await executeSql(credential, upsert, args); +} + +async function querySingleText( + credential: TursoSessionCredential, + sql: string, + args: TursoSqlArg[] = [] +): Promise { + const results = await executeSql(credential, sql, args); + if (results.length === 0) { + return null; + } + const first = results[0]; + const rows = extractRows(first); + if (rows.length === 0) { + return null; + } + + const firstRow = rows[0]; + if (Array.isArray(firstRow)) { + return decodeSqlCellToText(firstRow[0] ?? null); + } + if (isPlainObject(firstRow)) { + const values = Object.values(firstRow); + return decodeSqlCellToText(values[0] ?? null); + } + return null; +} + +async function executeSql( + credential: TursoSessionCredential, + sql: string, + args: TursoSqlArg[] = [] +): Promise { + const body = { + requests: [ + { + type: 'execute', + stmt: { + sql, + ...(args.length > 0 ? { args } : {}), + }, + } satisfies TursoPipelineExecuteRequest, + { type: 'close' } satisfies TursoPipelineCloseRequest, + ], + }; + + const timeout = createTimeoutSignal(TURSO_SQL_TIMEOUT_MS); + let response: Response; + try { + response = await fetch(`${credential.httpUrl}/v2/pipeline`, { + method: 'POST', + headers: { + Authorization: `Bearer ${credential.token}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify(body), + signal: timeout.signal, + }); + } catch (error) { + if (isAbortError(error)) { + throw new SyncCommandError(`Turso SQL request timed out after ${TURSO_SQL_TIMEOUT_MS}ms.`); + } + throw new SyncCommandError(`Turso SQL request failed: ${formatUnknownError(error)}`); + } finally { + timeout.cleanup(); + } + + const text = await response.text(); + if (!response.ok) { + throw new SyncCommandError( + `Turso SQL request failed (${response.status}): ${text.slice(0, 500)}` + ); + } + + let parsed: unknown; + try { + parsed = text ? 
(JSON.parse(text) as unknown) : {}; + } catch { + throw new SyncCommandError('Turso SQL response was not valid JSON.'); + } + + if (!isPlainObject(parsed) || !Array.isArray(parsed.results)) { + throw new SyncCommandError('Turso SQL response had unexpected shape.'); + } + + for (const result of parsed.results) { + if (!isPlainObject(result)) continue; + const resultError = extractSqlError(result); + if (resultError) { + throw new SyncCommandError(`Turso SQL error: ${resultError}`); + } + } + + return parsed.results; +} + +function extractSqlError(result: Record): string | null { + if (result.error) { + return String(result.error); + } + + const response = isPlainObject(result.response) ? result.response : null; + if (!response) return null; + + if (response.error) { + return String(response.error); + } + + return null; +} + +export function extractRows(result: unknown): unknown[] { + if (!isPlainObject(result)) return []; + if (Array.isArray(result.rows)) return result.rows; + + const resultNode = isPlainObject(result.result) ? result.result : null; + if (resultNode && Array.isArray(resultNode.rows)) return resultNode.rows; + + const responseNode = isPlainObject(result.response) ? result.response : null; + if (!responseNode) return []; + + if (Array.isArray(responseNode.rows)) return responseNode.rows; + + const responseResultNode = isPlainObject(responseNode.result) ? 
responseNode.result : null; + if (responseResultNode && Array.isArray(responseResultNode.rows)) { + return responseResultNode.rows; + } + + return []; +} + +function decodeSqlCellToText(cell: unknown): string | null { + if (cell === null || cell === undefined) return null; + if (typeof cell === 'string') return cell; + if (typeof cell === 'number' || typeof cell === 'bigint') return String(cell); + + if (!isPlainObject(cell)) return null; + if (cell.type === 'null') return null; + if (typeof cell.value === 'string') return cell.value; + if (typeof cell.value === 'number' || typeof cell.value === 'bigint') return String(cell.value); + if (typeof cell.base64 === 'string') { + return Buffer.from(cell.base64, 'base64').toString('utf8'); + } + + return null; +} + +function trimTrailingSlash(value: string): string { + return value.replace(/\/+$/, ''); +} + +function createTimeoutSignal(timeoutMs: number): TimeoutSignalHandle { + const abortSignalWithTimeout = AbortSignal as typeof AbortSignal & { + timeout?: (_ms: number) => AbortSignal; + }; + if (typeof abortSignalWithTimeout.timeout === 'function') { + return { signal: abortSignalWithTimeout.timeout(timeoutMs), cleanup: () => {} }; + } + + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), timeoutMs); + return { + signal: controller.signal, + cleanup: () => { + clearTimeout(timeout); + }, + }; +} + +function isAbortError(error: unknown): boolean { + return error instanceof Error && error.name === 'AbortError'; +} + +function formatUnknownError(error: unknown): string { + return error instanceof Error ? 
error.message : String(error); +} From 536d3da240bcffd37a29d818541278d0db9215a0 Mon Sep 17 00:00:00 2001 From: iHildy Date: Sun, 12 Apr 2026 20:10:05 -0700 Subject: [PATCH 3/3] fix: avoid mutating string values when parsing JSONC --- scripts/e2e/github_two_instance.py | 40 +++++++++++++++++++++++++++--- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/scripts/e2e/github_two_instance.py b/scripts/e2e/github_two_instance.py index 9f51f75..214be6c 100755 --- a/scripts/e2e/github_two_instance.py +++ b/scripts/e2e/github_two_instance.py @@ -6,7 +6,6 @@ import fcntl import json import os -import re import shutil import signal import sqlite3 @@ -751,8 +750,43 @@ def parse_jsonc(content: str) -> dict[str, Any]: index += 1 cleaned = ''.join(output_chars) - # Remove trailing commas before object/array close. - cleaned = re.sub(r',(\s*[}\]])', r'\1', cleaned) + # Remove trailing commas before object/array close outside string values. + without_trailing_commas: list[str] = [] + in_cleaned_string = False + escape_in_cleaned_string = False + cleaned_index = 0 + cleaned_length = len(cleaned) + while cleaned_index < cleaned_length: + current = cleaned[cleaned_index] + if in_cleaned_string: + without_trailing_commas.append(current) + if escape_in_cleaned_string: + escape_in_cleaned_string = False + elif current == '\\': + escape_in_cleaned_string = True + elif current == '"': + in_cleaned_string = False + cleaned_index += 1 + continue + + if current == '"': + in_cleaned_string = True + without_trailing_commas.append(current) + cleaned_index += 1 + continue + + if current == ',': + next_token_index = cleaned_index + 1 + while next_token_index < cleaned_length and cleaned[next_token_index].isspace(): + next_token_index += 1 + if next_token_index < cleaned_length and cleaned[next_token_index] in ('}', ']'): + cleaned_index += 1 + continue + + without_trailing_commas.append(current) + cleaned_index += 1 + + cleaned = ''.join(without_trailing_commas) parsed = 
json.loads(cleaned) if not isinstance(parsed, dict): raise RuntimeError('Expected root object in JSONC config.')