diff --git a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp index 0ed8d1033135..c6e32782709a 100644 --- a/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp +++ b/src/Storages/MergeTree/ExportPartitionManifestUpdatingTask.cpp @@ -36,12 +36,32 @@ namespace FailPoints namespace { + /// Work item describing a commit-recovery attempt that has been deferred out of + /// the `poll()` critical section. Captures everything by value so it can be + /// executed safely after `export_merge_tree_partition_mutex` has been released. + struct CommitRecoveryWork + { + ExportReplicatedMergeTreePartitionManifest metadata; + std::string entry_path; + StoragePtr destination_storage; + ContextPtr context; + }; + /* Remove expired entries and fix non-committed exports that have already exported all parts. Return values: - true: the cleanup was successful, the entry is removed from the entries_by_key container and the function returns true. Proceed to the next entry. - false: the cleanup was not successful, the entry is not removed from the entries_by_key container and the function returns false. + + Side outputs: + - `deferred_commits`: when a PENDING entry has all parts processed but the export was + never committed, this function appends a CommitRecoveryWork item to be executed by + the caller after releasing the storage-wide mutex. The actual commit() call (which + performs network I/O to the destination catalog and S3) MUST NOT run under the lock. + The function still returns `false` in that case so the outer poll() loop falls through + to `addTask`, keeping the in-memory entry consistent regardless of whether the + deferred commit ultimately succeeds. */ bool tryCleanup( const zkutil::ZooKeeperPtr & zk, @@ -53,7 +73,8 @@ namespace const ExportReplicatedMergeTreePartitionManifest & metadata, const time_t now, const bool is_pending, - auto & entries_by_key + auto & entries_by_key, + std::vector & deferred_commits ) { bool has_expired = metadata.create_time < now - static_cast(metadata.ttl_seconds); @@ -153,8 +174,8 @@ namespace if (parts_in_processing_or_pending.empty()) { - LOG_INFO(log, "ExportPartition Manifest Updating Task: Cleanup found PENDING for {} with all parts exported, try to fix it by committing the export", entry_path); - + LOG_INFO(log, "ExportPartition Manifest Updating Task: Cleanup found PENDING for {} with all parts exported, deferring commit recovery to post-lock phase", entry_path); + const auto destination_storage_id = StorageID(QualifiedTableName {metadata.destination_database, metadata.destination_table}); const auto destination_storage = DatabaseCatalog::instance().tryGetTable(destination_storage_id, context); if (!destination_storage) @@ -163,43 +184,24 @@ namespace return false; } - /// it sounds like a replica exported the last part, but was not able to commit the export. Try to fix it - try - { - ExportPartitionUtils::commit(metadata, destination_storage, zk, log, entry_path, context, storage); - } - catch (const Exception & e) - { - LOG_WARNING(log, - "ExportPartition Manifest Updating Task: " - "Caught exception while committing export for {}: {}", - entry_path, e.message()); - - /// Bump commit-attempts counter; transition to FAILED once the budget is exhausted. - /// This is the primary retry path for the commit phase — handlePartExportSuccess - /// only fires once (on the last part's completion); subsequent retries come from here. - const bool became_failed = ExportPartitionUtils::handleCommitFailure( - zk, - entry_path, - metadata.max_retries, - log); - - if (became_failed) - { - LOG_WARNING(log, - "ExportPartition Manifest Updating Task: " - "Commit for {} transitioned to FAILED after exhausting max_retries={}", - entry_path, metadata.max_retries); - } - - /// Return false so the next poll re-enters the cleanup path: - /// - if FAILED: status != PENDING on re-read, cleanup is a no-op - /// until the entry expires (handled by the first tryCleanup branch). - /// - if still PENDING: next poll increments the counter again. - return false; - } - - return true; + /// A replica exported the last part but the commit never landed. Capture everything + /// needed to run commit() outside `export_merge_tree_partition_mutex`. The + /// commit path performs network I/O (REST catalog + S3) with up to + /// MAX_TRANSACTION_RETRIES = 100 retries; holding the storage-wide mutex across + /// that work is what caused `system.replicated_partition_exports` to hang. + /// + /// Returning false here keeps the outer poll() loop on the normal path: it will + /// call addTask() so the in-memory container reflects the PENDING entry. The + /// status watch registered by poll() will transition the local entry to + /// COMPLETED/FAILED once the deferred commit (or a peer's commit) updates + /// /status in ZooKeeper. + deferred_commits.push_back(CommitRecoveryWork{ + .metadata = metadata, + .entry_path = entry_path, + .destination_storage = destination_storage, + .context = context, + }); + return false; } } @@ -553,25 +555,44 @@ std::vector ExportPartitionManifestUpdatingTask:: std::vector ExportPartitionManifestUpdatingTask::getPartitionExportsInfoLocal() const { - std::lock_guard lock(storage.export_merge_tree_partition_mutex); + /// Snapshot just the fields we need under the lock, then build the public + /// ReplicatedPartitionExportInfo vector after releasing it. This keeps the critical + /// section O(entries) of cheap struct copies (manifest + enum) rather than also + /// covering the formatting / Array construction performed by the caller. + struct EntrySnapshot + { + ExportReplicatedMergeTreePartitionManifest manifest; + ExportReplicatedMergeTreePartitionTaskEntry::Status status; + }; + + std::vector snapshots; + + { + std::lock_guard lock(storage.export_merge_tree_partition_mutex); + + snapshots.reserve(storage.export_merge_tree_partition_task_entries_by_key.size()); + for (const auto & entry : storage.export_merge_tree_partition_task_entries_by_key) + snapshots.push_back(EntrySnapshot{entry.manifest, entry.status}); + } std::vector infos; + infos.reserve(snapshots.size()); - for (const auto & entry : storage.export_merge_tree_partition_task_entries_by_key) + for (auto & snapshot : snapshots) { ReplicatedPartitionExportInfo info; - info.destination_database = entry.manifest.destination_database; - info.destination_table = entry.manifest.destination_table; - info.partition_id = entry.manifest.partition_id; - info.transaction_id = entry.manifest.transaction_id; - info.query_id = entry.manifest.query_id; - info.create_time = entry.manifest.create_time; - info.source_replica = entry.manifest.source_replica; - info.parts_count = entry.manifest.number_of_parts; - info.parts_to_do = entry.manifest.parts.size(); - info.parts = entry.manifest.parts; - info.status = magic_enum::enum_name(entry.status); + info.destination_database = snapshot.manifest.destination_database; + info.destination_table = snapshot.manifest.destination_table; + info.partition_id = snapshot.manifest.partition_id; + info.transaction_id = snapshot.manifest.transaction_id; + info.query_id = snapshot.manifest.query_id; + info.create_time = snapshot.manifest.create_time; + info.source_replica = snapshot.manifest.source_replica; + info.parts_count = snapshot.manifest.number_of_parts; + info.parts_to_do = snapshot.manifest.parts.size(); + info.parts = std::move(snapshot.manifest.parts); + info.status = magic_enum::enum_name(snapshot.status); infos.emplace_back(std::move(info)); } @@ -581,123 +602,190 @@ std::vector ExportPartitionManifestUpdatingTask:: void ExportPartitionManifestUpdatingTask::poll() { - std::lock_guard lock(storage.export_merge_tree_partition_mutex); - - LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Polling for new entries for table {}. Current number of entries: {}", storage.getStorageID().getNameForLogs(), storage.export_merge_tree_partition_task_entries_by_key.size()); + /// Commit-recovery work collected while the storage-wide mutex is held. + /// Executed AFTER the mutex is released - committing to Iceberg/REST-catalog can take + /// many seconds (up to MAX_TRANSACTION_RETRIES=100 catalog round-trips) and blocking + /// `system.replicated_partition_exports` for that long is what we are fixing here. + std::vector deferred_commits; auto zk = storage.getZooKeeper(); - + const std::string exports_path = fs::path(storage.zookeeper_path) / "exports"; const std::string cleanup_lock_path = fs::path(storage.zookeeper_path) / "exports_cleanup_lock"; + /// The `exports_cleanup_lock` is an ephemeral ZK node that serializes cleanup work + /// across replicas: only the replica holding it walks `tryCleanup` (entry expiry + + /// commit recovery). It MUST outlive the deferred-commit loop below; otherwise a peer + /// replica's next poll() could acquire it and race us on the same commit-recovery work, + /// duplicating REST-catalog round-trips and snapshot writes. The EphemeralNodeHolder + /// destructor removes the node, so we declare it at function scope and let it die + /// at the end of poll() - after all deferred commits have completed. + /// Acquired here (no mutex needed - it is just a ZK ephemeral create). auto cleanup_lock = zkutil::EphemeralNodeHolder::tryCreate(cleanup_lock_path, *zk, storage.replica_name); if (cleanup_lock) { LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Cleanup lock acquired, will remove stale entries"); } - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildrenWatch); + { + std::lock_guard lock(storage.export_merge_tree_partition_mutex); + + LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Polling for new entries for table {}. Current number of entries: {}", storage.getStorageID().getNameForLogs(), storage.export_merge_tree_partition_task_entries_by_key.size()); - Coordination::Stat stat; - const auto children = zk->getChildrenWatch(exports_path, &stat, storage.export_merge_tree_partition_watch_callback); - const std::unordered_set zk_children(children.begin(), children.end()); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildrenWatch); - const auto now = time(nullptr); + Coordination::Stat stat; + const auto children = zk->getChildrenWatch(exports_path, &stat, storage.export_merge_tree_partition_watch_callback); + const std::unordered_set zk_children(children.begin(), children.end()); - auto & entries_by_key = storage.export_merge_tree_partition_task_entries_by_key; + const auto now = time(nullptr); - /// Load new entries - /// If we have the cleanup lock, also remove stale entries from zk and local - /// Upload dangling commit files if any - for (const auto & key : zk_children) - { - const std::string entry_path = fs::path(exports_path) / key; + auto & entries_by_key = storage.export_merge_tree_partition_task_entries_by_key; - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); - std::string metadata_json; - if (!zk->tryGet(fs::path(entry_path) / "metadata.json", metadata_json)) + /// Load new entries + /// If we have the cleanup lock, also remove stale entries from zk and local + /// Upload dangling commit files if any + for (const auto & key : zk_children) { - LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Skipping {}: missing metadata.json", key); - continue; - } + const std::string entry_path = fs::path(exports_path) / key; - const auto metadata = ExportReplicatedMergeTreePartitionManifest::fromJsonString(metadata_json); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); + std::string metadata_json; + if (!zk->tryGet(fs::path(entry_path) / "metadata.json", metadata_json)) + { + LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Skipping {}: missing metadata.json", key); + continue; + } - const auto local_entry = entries_by_key.find(key); + const auto metadata = ExportReplicatedMergeTreePartitionManifest::fromJsonString(metadata_json); - /// If the zk entry has been replaced with export_merge_tree_partition_force_export, checking only for the export key is not enough - /// we need to make sure it is the same transaction id. If it is not, it needs to be replaced. - bool has_local_entry_and_is_up_to_date = local_entry != entries_by_key.end() - && local_entry->manifest.transaction_id == metadata.transaction_id; + const auto local_entry = entries_by_key.find(key); - /// If the entry is up to date and we don't have the cleanup lock, early exit, nothing to be done. - if (!cleanup_lock && has_local_entry_and_is_up_to_date) - continue; + /// If the zk entry has been replaced with export_merge_tree_partition_force_export, checking only for the export key is not enough + /// we need to make sure it is the same transaction id. If it is not, it needs to be replaced. + bool has_local_entry_and_is_up_to_date = local_entry != entries_by_key.end() + && local_entry->manifest.transaction_id == metadata.transaction_id; - std::weak_ptr weak_manifest_updater = storage.export_merge_tree_partition_manifest_updater; + /// If the entry is up to date and we don't have the cleanup lock, early exit, nothing to be done. + if (!cleanup_lock && has_local_entry_and_is_up_to_date) + continue; - auto status_watch_callback = std::make_shared([weak_manifest_updater, key](const Coordination::WatchResponse &) - { - /// If the table is dropped but the watch is not removed, we need to prevent use after free - /// below code assumes that if manifest updater is still alive, the status handling task is also alive - if (auto manifest_updater = weak_manifest_updater.lock()) + std::weak_ptr weak_manifest_updater = storage.export_merge_tree_partition_manifest_updater; + + auto status_watch_callback = std::make_shared([weak_manifest_updater, key](const Coordination::WatchResponse &) + { + /// If the table is dropped but the watch is not removed, we need to prevent use after free + /// below code assumes that if manifest updater is still alive, the status handling task is also alive + if (auto manifest_updater = weak_manifest_updater.lock()) + { + manifest_updater->addStatusChange(key); + manifest_updater->storage.export_merge_tree_partition_status_handling_task->schedule(); + } + }); + + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetWatch); + std::string status_string; + if (!zk->tryGetWatch(fs::path(entry_path) / "status", status_string, nullptr, status_watch_callback)) { - manifest_updater->addStatusChange(key); - manifest_updater->storage.export_merge_tree_partition_status_handling_task->schedule(); + LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Skipping {}: missing status", key); + continue; } - }); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetWatch); - std::string status_string; - if (!zk->tryGetWatch(fs::path(entry_path) / "status", status_string, nullptr, status_watch_callback)) - { - LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Skipping {}: missing status", key); - continue; - } + const auto status = magic_enum::enum_cast(status_string); + if (!status) + { + LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Invalid status {} for task {}, skipping", status_string, key); + continue; + } - const auto status = magic_enum::enum_cast(status_string); - if (!status) - { - LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Invalid status {} for task {}, skipping", status_string, key); - continue; - } + /// if we have the cleanup lock, try to cleanup + /// if we successfully cleaned it up, early exit + if (cleanup_lock) + { + bool cleanup_successful = tryCleanup( + zk, + entry_path, + storage.log.load(), + storage.getContext(), + storage, + key, + metadata, + now, + *status == ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, + entries_by_key, + deferred_commits); + + if (cleanup_successful) + continue; + } - /// if we have the cleanup lock, try to cleanup - /// if we successfully cleaned it up, early exit - if (cleanup_lock) - { - bool cleanup_successful = tryCleanup( - zk, - entry_path, - storage.log.load(), - storage.getContext(), - storage, - key, - metadata, - now, - *status == ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING, - entries_by_key); - - if (cleanup_successful) + if (has_local_entry_and_is_up_to_date) + { + LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Skipping {}: already exists", key); continue; - } + } - if (has_local_entry_and_is_up_to_date) - { - LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Skipping {}: already exists", key); - continue; + addTask(metadata, *status, key, entries_by_key); } - addTask(metadata, *status, key, entries_by_key); + /// Remove entries that were deleted by someone else + removeStaleEntries(zk_children, entries_by_key); + + LOG_INFO(storage.log, "ExportPartition Manifest Updating task: finished polling for new entries. Number of entries: {}", entries_by_key.size()); } + /// `export_merge_tree_partition_mutex` released here. Everything below runs without it + /// so concurrent readers of `system.replicated_partition_exports` and other writers are + /// not blocked by the (potentially slow) catalog round-trips below. + /// + /// `cleanup_lock` (the ZK ephemeral node) is INTENTIONALLY still held here and is only + /// destructed at end of function. This preserves the existing cross-replica invariant: + /// at any moment only one replica is performing commit recovery for a given table, so + /// peer replicas will not race us on the same `commit()` calls below. + /// + /// Shutdown safety: this function runs on a BackgroundSchedulePool task that + /// `StorageReplicatedMergeTree::shutdown()` deactivates before clearing the entry + /// container. Deactivation waits for the currently-running invocation (this very call) + /// to return before proceeding, so the deferred commits below complete (or throw) before + /// any teardown observes empty `export_merge_tree_partition_task_entries`. All work + /// items capture their inputs by value, so they are independent from container state. + + for (const auto & work : deferred_commits) + { + const auto log_ptr = storage.log.load(); - /// Remove entries that were deleted by someone else - removeStaleEntries(zk_children, entries_by_key); + /// A replica exported the last part but the commit never landed. Try to fix it. + try + { + ExportPartitionUtils::commit(work.metadata, work.destination_storage, zk, log_ptr, work.entry_path, work.context, storage); + } + catch (const Exception & e) + { + LOG_WARNING(log_ptr, + "ExportPartition Manifest Updating Task: " + "Caught exception while committing export for {}: {}", + work.entry_path, e.message()); + + /// Bump commit-attempts counter; transition to FAILED once the budget is exhausted. + /// This is the primary retry path for the commit phase — handlePartExportSuccess + /// only fires once (on the last part's completion); subsequent retries come from here. + const bool became_failed = ExportPartitionUtils::handleCommitFailure( + zk, + work.entry_path, + work.metadata.max_retries, + log_ptr); - LOG_INFO(storage.log, "ExportPartition Manifest Updating task: finished polling for new entries. Number of entries: {}", entries_by_key.size()); + if (became_failed) + { + LOG_WARNING(log_ptr, + "ExportPartition Manifest Updating Task: " + "Commit for {} transitioned to FAILED after exhausting max_retries={}", + work.entry_path, work.metadata.max_retries); + } + } + } storage.export_merge_tree_partition_select_task->schedule(); }