diff --git a/fns_cli/client.py b/fns_cli/client.py index 0450d75..47395fb 100644 --- a/fns_cli/client.py +++ b/fns_cli/client.py @@ -33,9 +33,13 @@ def __init__(self, config: AppConfig) -> None: self._running = False self._handlers: dict[str, Callable[..., Coroutine]] = {} self._binary_handler: Callable[..., Coroutine] | None = None + self._on_reconnect: Callable[[], Coroutine] | None = None self._msg_queue: list[str | bytes] = [] self._ready_event = asyncio.Event() + def on_reconnect(self, handler: Callable[[], Coroutine]) -> None: + self._on_reconnect = handler + def on(self, action: str, handler: Callable[..., Coroutine]) -> None: self._handlers[action] = handler @@ -178,6 +182,11 @@ async def _on_auth_response(self, msg: WSMessage) -> None: await self._raw_send(client_info.encode()) await self._flush_queue() self._ready_event.set() + if self._connect_count > 1 and self._on_reconnect: + try: + await self._on_reconnect() + except Exception: + log.exception("Reconnect handler error") else: err = data.get("msg", data.get("message", "unknown")) log.error("Authentication failed (code=%s): %s", code, err) diff --git a/fns_cli/file_sync.py b/fns_cli/file_sync.py index d387762..086fcae 100644 --- a/fns_cli/file_sync.py +++ b/fns_cli/file_sync.py @@ -62,6 +62,11 @@ def __init__(self, engine: SyncEngine) -> None: self.vault_path = engine.vault_path self._sync_complete = False self._download_sessions: dict[str, _DownloadSession] = {} + self._expected_modify = 0 + self._expected_delete = 0 + self._received_modify = 0 + self._received_delete = 0 + self._got_end = False @property def is_sync_complete(self) -> bool: @@ -78,8 +83,16 @@ def register_handlers(self) -> None: ws.on(ACTION_FILE_SYNC_END, self._on_sync_end) ws.on_binary(self._on_binary_chunk) - async def request_sync(self) -> None: + def _reset_counters(self) -> None: self._sync_complete = False + self._got_end = False + self._expected_modify = 0 + self._expected_delete = 0 + self._received_modify = 0 + self._received_delete = 0 + + async def request_sync(self) -> None: + self._reset_counters() last_time = self.engine.state.last_file_sync_time ctx = str(uuid.uuid4()) files = self._collect_local_files() @@ -169,6 +182,8 @@ async def _on_sync_update(self, msg: WSMessage) -> None: # a chunked download via FileChunkDownload. log.info("← FileSyncUpdate (requesting chunk download): %s", rel_path) await self._request_chunk_download(rel_path, data) + self._received_modify += 1 + self._check_complete() return full = self.vault_path / rel_path @@ -181,7 +196,6 @@ async def _on_sync_update(self, msg: WSMessage) -> None: full.write_bytes(content) else: log.warning("Unexpected content type for %s: %s", rel_path, type(content)) - return if mtime and full.exists(): ts = mtime / 1000.0 os.utime(full, (ts, ts)) @@ -191,6 +205,9 @@ async def _on_sync_update(self, msg: WSMessage) -> None: finally: self.engine.unignore_file(rel_path) + self._received_modify += 1 + self._check_complete() + async def _request_chunk_download(self, rel_path: str, data: dict) -> None: """Send FileChunkDownload request for a file that needs chunked transfer.""" msg = WSMessage(ACTION_FILE_CHUNK_DOWNLOAD, { @@ -217,6 +234,9 @@ async def _on_sync_delete(self, msg: WSMessage) -> None: finally: self.engine.unignore_file(rel_path) + self._received_delete += 1 + self._check_complete() + async def _on_sync_rename(self, msg: WSMessage) -> None: data = _extract_inner(msg.data) old_path: str = data.get("oldPath", "") @@ -305,18 +325,33 @@ async def _on_sync_end(self, msg: WSMessage) -> None: self.engine.state.last_file_sync_time = last_time self.engine.state.save() - need_modify = data.get("needModifyCount", 0) - need_delete = data.get("needDeleteCount", 0) + self._expected_modify = data.get("needModifyCount", 0) + self._expected_delete = data.get("needDeleteCount", 0) need_upload = data.get("needUploadCount", 0) + self._got_end = True log.info( "← FileSyncEnd (lastTime=%d, needModify=%d, needDelete=%d, needUpload=%d)", - last_time, need_modify, need_delete, need_upload, + last_time, self._expected_modify, self._expected_delete, need_upload, ) - # Mark sync complete — individual messages arrive after this. - # The sync_engine wait loop will keep running until this flag is set. - self._sync_complete = True + total = self._expected_modify + self._expected_delete + if total == 0: + self._sync_complete = True + else: + self._check_complete() + + def _check_complete(self) -> None: + if not self._got_end: + return + total_expected = self._expected_modify + self._expected_delete + total_received = self._received_modify + self._received_delete + if total_received >= total_expected: + log.info( + "FileSync complete: %d modified, %d deleted", + self._received_modify, self._received_delete, + ) + self._sync_complete = True def _collect_local_files(self) -> list[dict]: """Collect non-note, non-excluded local files with hashes for FileSync.""" diff --git a/fns_cli/sync_engine.py b/fns_cli/sync_engine.py index e4bc6ca..8511013 100644 --- a/fns_cli/sync_engine.py +++ b/fns_cli/sync_engine.py @@ -100,6 +100,14 @@ async def run(self) -> None: from .watcher import VaultWatcher watcher = VaultWatcher(self, loop) + async def _on_reconnect() -> None: + log.info("Reconnected — re-syncing") + self._watch_enabled = False + await self._initial_sync() + self._watch_enabled = True + + self.ws_client.on_reconnect(_on_reconnect) + ws_task = asyncio.create_task(self.ws_client.run()) try: @@ -133,11 +141,13 @@ async def sync_once(self) -> None: log.error("Failed to authenticate within 30s") return - await self._initial_sync() - if self.config.sync.sync_notes: await self.note_sync.request_full_sync() await self._wait_note_sync(timeout=120) + + if self.config.sync.sync_files or self.config.sync.sync_config: + await self.file_sync.request_sync() + await self._wait_file_sync(timeout=300) finally: await self.ws_client.close() ws_task.cancel() diff --git a/fns_cli/watcher.py b/fns_cli/watcher.py index 0c6261c..e9cca16 100644 --- a/fns_cli/watcher.py +++ b/fns_cli/watcher.py @@ -91,7 +91,11 @@ def on_moved(self, event): new_rel = self._rel(event.dest_path) except ValueError: return - if self.engine.is_excluded(new_rel): + if ( + self.engine.is_ignored(old_rel) + or self.engine.is_ignored(new_rel) + or self.engine.is_excluded(new_rel) + ): return self._schedule( f"mv:{old_rel}:{new_rel}",