Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions fns_cli/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@ def __init__(self, config: AppConfig) -> None:
self._running = False
self._handlers: dict[str, Callable[..., Coroutine]] = {}
self._binary_handler: Callable[..., Coroutine] | None = None
self._on_reconnect: Callable[[], Coroutine] | None = None
self._msg_queue: list[str | bytes] = []
self._ready_event = asyncio.Event()

def on_reconnect(self, handler: Callable[[], Coroutine]) -> None:
self._on_reconnect = handler

def on(self, action: str, handler: Callable[..., Coroutine]) -> None:
self._handlers[action] = handler

Expand Down Expand Up @@ -178,6 +182,11 @@ async def _on_auth_response(self, msg: WSMessage) -> None:
await self._raw_send(client_info.encode())
await self._flush_queue()
self._ready_event.set()
if self._connect_count > 1 and self._on_reconnect:
try:
await self._on_reconnect()
except Exception:
log.exception("Reconnect handler error")
else:
err = data.get("msg", data.get("message", "unknown"))
log.error("Authentication failed (code=%s): %s", code, err)
Expand Down
51 changes: 43 additions & 8 deletions fns_cli/file_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ def __init__(self, engine: SyncEngine) -> None:
self.vault_path = engine.vault_path
self._sync_complete = False
self._download_sessions: dict[str, _DownloadSession] = {}
self._expected_modify = 0
self._expected_delete = 0
self._received_modify = 0
self._received_delete = 0
self._got_end = False

@property
def is_sync_complete(self) -> bool:
Expand All @@ -78,8 +83,16 @@ def register_handlers(self) -> None:
ws.on(ACTION_FILE_SYNC_END, self._on_sync_end)
ws.on_binary(self._on_binary_chunk)

async def request_sync(self) -> None:
def _reset_counters(self) -> None:
self._sync_complete = False
self._got_end = False
self._expected_modify = 0
self._expected_delete = 0
self._received_modify = 0
self._received_delete = 0

async def request_sync(self) -> None:
self._reset_counters()
last_time = self.engine.state.last_file_sync_time
ctx = str(uuid.uuid4())
files = self._collect_local_files()
Expand Down Expand Up @@ -169,6 +182,8 @@ async def _on_sync_update(self, msg: WSMessage) -> None:
# a chunked download via FileChunkDownload.
log.info("← FileSyncUpdate (requesting chunk download): %s", rel_path)
await self._request_chunk_download(rel_path, data)
self._received_modify += 1
self._check_complete()
return

full = self.vault_path / rel_path
Expand All @@ -181,7 +196,6 @@ async def _on_sync_update(self, msg: WSMessage) -> None:
full.write_bytes(content)
else:
log.warning("Unexpected content type for %s: %s", rel_path, type(content))
return
if mtime and full.exists():
ts = mtime / 1000.0
os.utime(full, (ts, ts))
Expand All @@ -191,6 +205,9 @@ async def _on_sync_update(self, msg: WSMessage) -> None:
finally:
self.engine.unignore_file(rel_path)

self._received_modify += 1
self._check_complete()

async def _request_chunk_download(self, rel_path: str, data: dict) -> None:
"""Send FileChunkDownload request for a file that needs chunked transfer."""
msg = WSMessage(ACTION_FILE_CHUNK_DOWNLOAD, {
Expand All @@ -217,6 +234,9 @@ async def _on_sync_delete(self, msg: WSMessage) -> None:
finally:
self.engine.unignore_file(rel_path)

self._received_delete += 1
self._check_complete()

async def _on_sync_rename(self, msg: WSMessage) -> None:
data = _extract_inner(msg.data)
old_path: str = data.get("oldPath", "")
Expand Down Expand Up @@ -305,18 +325,33 @@ async def _on_sync_end(self, msg: WSMessage) -> None:
self.engine.state.last_file_sync_time = last_time
self.engine.state.save()

need_modify = data.get("needModifyCount", 0)
need_delete = data.get("needDeleteCount", 0)
self._expected_modify = data.get("needModifyCount", 0)
self._expected_delete = data.get("needDeleteCount", 0)
need_upload = data.get("needUploadCount", 0)

self._got_end = True
log.info(
"← FileSyncEnd (lastTime=%d, needModify=%d, needDelete=%d, needUpload=%d)",
last_time, need_modify, need_delete, need_upload,
last_time, self._expected_modify, self._expected_delete, need_upload,
)

# Mark sync complete — individual messages arrive after this.
# The sync_engine wait loop will keep running until this flag is set.
self._sync_complete = True
total = self._expected_modify + self._expected_delete
if total == 0:
self._sync_complete = True
else:
self._check_complete()

def _check_complete(self) -> None:
if not self._got_end:
return
total_expected = self._expected_modify + self._expected_delete
total_received = self._received_modify + self._received_delete
if total_received >= total_expected:
log.info(
"FileSync complete: %d modified, %d deleted",
self._received_modify, self._received_delete,
)
self._sync_complete = True
Comment on lines +344 to +354

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): Using '>=' here can hide protocol mismatches or double-processing of messages.

With total_received >= total_expected, duplicate or double-counted messages will still mark the sync as complete, hiding protocol mismatches. If the protocol guarantees exact counts, consider using == and logging when total_received > total_expected so these issues are visible. Even if you keep the tolerance, logging any overshoot would help surface subtle bugs.

Suggested change
def _check_complete(self) -> None:
if not self._got_end:
return
total_expected = self._expected_modify + self._expected_delete
total_received = self._received_modify + self._received_delete
if total_received >= total_expected:
log.info(
"FileSync complete: %d modified, %d deleted",
self._received_modify, self._received_delete,
)
self._sync_complete = True
def _check_complete(self) -> None:
if not self._got_end:
return
total_expected = self._expected_modify + self._expected_delete
total_received = self._received_modify + self._received_delete
# Not done yet; still waiting for more messages.
if total_received < total_expected:
return
if total_received == total_expected:
log.info(
"FileSync complete: %d modified, %d deleted",
self._received_modify,
self._received_delete,
)
else:
# We received more messages than expected; this may indicate a protocol
# mismatch or double-processing, so surface it explicitly.
log.warning(
"FileSync received more messages than expected "
"(expected=%d, received=%d; modified=%d, deleted=%d). "
"Treating sync as complete.",
total_expected,
total_received,
self._received_modify,
self._received_delete,
)
self._sync_complete = True


def _collect_local_files(self) -> list[dict]:
"""Collect non-note, non-excluded local files with hashes for FileSync."""
Expand Down
14 changes: 12 additions & 2 deletions fns_cli/sync_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,14 @@ async def run(self) -> None:
from .watcher import VaultWatcher
watcher = VaultWatcher(self, loop)

async def _on_reconnect() -> None:
log.info("Reconnected — re-syncing")
self._watch_enabled = False
await self._initial_sync()
self._watch_enabled = True

self.ws_client.on_reconnect(_on_reconnect)

ws_task = asyncio.create_task(self.ws_client.run())

try:
Expand Down Expand Up @@ -133,11 +141,13 @@ async def sync_once(self) -> None:
log.error("Failed to authenticate within 30s")
return

await self._initial_sync()

if self.config.sync.sync_notes:
await self.note_sync.request_full_sync()
await self._wait_note_sync(timeout=120)

if self.config.sync.sync_files or self.config.sync.sync_config:
await self.file_sync.request_sync()
await self._wait_file_sync(timeout=300)
finally:
await self.ws_client.close()
ws_task.cancel()
Expand Down
6 changes: 5 additions & 1 deletion fns_cli/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,11 @@ def on_moved(self, event):
new_rel = self._rel(event.dest_path)
except ValueError:
return
if self.engine.is_excluded(new_rel):
if (
self.engine.is_ignored(old_rel)
or self.engine.is_ignored(new_rel)
or self.engine.is_excluded(new_rel)
):
return
self._schedule(
f"mv:{old_rel}:{new_rel}",
Expand Down
Loading