From c78a85165516b4b0db3699716b8f0bc1ef331bfe Mon Sep 17 00:00:00 2001 From: meiravgri Date: Tue, 31 Mar 2026 16:29:00 +0000 Subject: [PATCH 1/9] **Step 1: Rename SVS thread pool internal methods to match shared pool terminology** MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rename `setNumThreads`/`getNumThreads` → `setParallelism`/`getParallelism` and `getThreadPoolCapacity` → `getPoolSize` across VectorSimilarity. Public info API fields (`numThreads`, `lastReservedThreads`, `NUM_THREADS`, `LAST_RESERVED_NUM_THREADS`) remain unchanged. No behavioral changes. --- src/VecSim/algorithms/svs/svs.h | 36 +++++++++++++------------- src/VecSim/algorithms/svs/svs_tiered.h | 18 ++++++------- tests/benchmark/bm_utils.h | 4 +-- tests/benchmark/bm_vecsim_svs.h | 2 +- tests/unit/test_svs_fp16.cpp | 6 ++--- tests/unit/test_svs_tiered.cpp | 12 ++++----- 6 files changed, 39 insertions(+), 39 deletions(-) diff --git a/src/VecSim/algorithms/svs/svs.h b/src/VecSim/algorithms/svs/svs.h index 8b03514f0..9d5ee1c3c 100644 --- a/src/VecSim/algorithms/svs/svs.h +++ b/src/VecSim/algorithms/svs/svs.h @@ -41,9 +41,9 @@ struct SVSIndexBase virtual int deleteVectors(const labelType *labels, size_t n) = 0; virtual bool isLabelExists(labelType label) const = 0; virtual size_t indexStorageSize() const = 0; - virtual size_t getNumThreads() const = 0; - virtual void setNumThreads(size_t numThreads) = 0; - virtual size_t getThreadPoolCapacity() const = 0; + virtual size_t getParallelism() const = 0; + virtual void setParallelism(size_t parallelism) = 0; + virtual size_t getPoolSize() const = 0; virtual bool isCompressed() const = 0; size_t getNumMarkedDeleted() const { return num_marked_deleted; } @@ -66,9 +66,9 @@ struct SVSIndexBase }; /** Thread Management Strategy: - * - addVector(): Requires numThreads == 1 - * - addVectors(): Allows any numThreads value, but prohibits n=1 with numThreads>1 - * - Callers are responsible for setting appropriate thread 
counts + * - addVector(): Requires parallelism == 1 + * - addVectors(): Allows any parallelism value, but prohibits n=1 with parallelism>1 + * - Callers are responsible for setting appropriate parallelism **/ template @@ -251,12 +251,12 @@ class SVSIndex : public VecSimIndexAbstract, fl this->impl_ = std::move(svs_handler->impl); } - // Assuming numThreads was updated to reflect the number of available threads before this + // Assuming parallelism was updated to reflect the number of available threads before this // function was called. - // This function assumes that the caller has already set numThreads to the appropriate value + // This function assumes that the caller has already set parallelism to the appropriate value // for the operation. - // Important NOTE: For single vector operations (n=1), numThreads should be 1. - // For bulk operations (n>1), numThreads should reflect the number of available threads. + // Important NOTE: For single vector operations (n=1), parallelism should be 1. + // For bulk operations (n>1), parallelism should reflect the number of available threads. 
int addVectorsImpl(const void *vectors_data, const labelType *labels, size_t n) { if (n == 0) { return 0; @@ -412,8 +412,8 @@ class SVSIndex : public VecSimIndexAbstract, fl .maxCandidatePoolSize = this->buildParams.max_candidate_pool_size, .pruneTo = this->buildParams.prune_to, .useSearchHistory = this->buildParams.use_full_search_history, - .numThreads = this->getThreadPoolCapacity(), - .lastReservedThreads = this->getNumThreads(), + .numThreads = this->getPoolSize(), + .lastReservedThreads = this->getParallelism(), .numberOfMarkedDeletedNodes = this->num_marked_deleted, .searchWindowSize = this->search_window_size, .searchBufferCapacity = this->search_buffer_capacity, @@ -517,16 +517,16 @@ class SVSIndex : public VecSimIndexAbstract, fl int addVector(const void *vector_data, labelType label) override { // Enforce single-threaded execution for single vector operations to ensure optimal - // performance and consistent behavior. Callers must set numThreads=1 before calling this + // performance and consistent behavior. Callers must set parallelism=1 before calling this // method. - assert(getNumThreads() == 1 && "Can't use more than one thread to insert a single vector"); + assert(getParallelism() == 1 && "Can't use more than one thread to insert a single vector"); return addVectorsImpl(vector_data, &label, 1); } int addVectors(const void *vectors_data, const labelType *labels, size_t n) override { // Prevent misuse: single vector operations should use addVector(), not addVectors() with // n=1 This ensures proper thread management and API contract enforcement. - assert(!(n == 1 && getNumThreads() > 1) && + assert(!(n == 1 && getParallelism() > 1) && "Can't use more than one thread to insert a single vector"); return addVectorsImpl(vectors_data, labels, n); } @@ -541,10 +541,10 @@ class SVSIndex : public VecSimIndexAbstract, fl return impl_ ? 
impl_->has_id(label) : false; } - size_t getNumThreads() const override { return threadpool_.size(); } - void setNumThreads(size_t numThreads) override { threadpool_.resize(numThreads); } + size_t getParallelism() const override { return threadpool_.size(); } + void setParallelism(size_t parallelism) override { threadpool_.resize(parallelism); } - size_t getThreadPoolCapacity() const override { return threadpool_.capacity(); } + size_t getPoolSize() const override { return threadpool_.capacity(); } bool isCompressed() const override { return storage_traits_t::is_compressed(); } diff --git a/src/VecSim/algorithms/svs/svs_tiered.h b/src/VecSim/algorithms/svs/svs_tiered.h index 2eac66d24..8ca67bf5a 100644 --- a/src/VecSim/algorithms/svs/svs_tiered.h +++ b/src/VecSim/algorithms/svs/svs_tiered.h @@ -587,7 +587,7 @@ class TieredSVSIndex : public VecSimTieredIndex { // No need to run GC on an empty index. return; } - svs_index->setNumThreads(std::min(availableThreads, index->backendIndex->indexSize())); + svs_index->setParallelism(std::min(availableThreads, index->backendIndex->indexSize())); // VecSimIndexAbstract::runGC() is protected static_cast(index->backendIndex)->runGC(); } @@ -601,7 +601,7 @@ class TieredSVSIndex : public VecSimTieredIndex { return; } - auto total_threads = this->GetSVSIndex()->getThreadPoolCapacity(); + auto total_threads = this->GetSVSIndex()->getPoolSize(); auto jobs = SVSMultiThreadJob::createJobs( this->allocator, SVS_BATCH_UPDATE_JOB, updateSVSIndexWrapper, this, total_threads, std::chrono::microseconds(updateJobWaitTime), &uncompletedJobs); @@ -614,7 +614,7 @@ class TieredSVSIndex : public VecSimTieredIndex { return; } - auto total_threads = this->GetSVSIndex()->getThreadPoolCapacity(); + auto total_threads = this->GetSVSIndex()->getPoolSize(); auto jobs = SVSMultiThreadJob::createJobs( this->allocator, SVS_GC_JOB, SVSIndexGCWrapper, this, total_threads, std::chrono::microseconds(updateJobWaitTime), &uncompletedJobs); @@ -683,7 +683,7 @@ 
class TieredSVSIndex : public VecSimTieredIndex { assert(labels_to_move.size() == vectors_to_move.size() / this->frontendIndex->getDim()); if (this->backendIndex->indexSize() == 0) { // If backend index is empty, we need to initialize it first. - svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size())); + svs_index->setParallelism(std::min(availableThreads, labels_to_move.size())); auto impl = svs_index->createImpl(vectors_to_move.data(), labels_to_move.data(), labels_to_move.size()); @@ -696,7 +696,7 @@ class TieredSVSIndex : public VecSimTieredIndex { main_shared_lock.unlock(); std::lock_guard lock(this->mainIndexGuard); // Upgrade to unique lock to add vectors - svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size())); + svs_index->setParallelism(std::min(availableThreads, labels_to_move.size())); svs_index->addVectors(vectors_to_move.data(), labels_to_move.data(), labels_to_move.size()); } @@ -821,9 +821,9 @@ class TieredSVSIndex : public VecSimTieredIndex { std::scoped_lock lock(this->updateJobMutex, this->mainIndexGuard); // Set available thread count to 1 for single vector write-in-place operation. // This maintains the contract that single vector operations use exactly one thread. - // TODO: Replace this setNumThreads(1) call with an assertion once we establish - // a contract that write-in-place mode guarantees numThreads == 1. - svs_index->setNumThreads(1); + // TODO: Replace this setParallelism(1) call with an assertion once we establish + // a contract that write-in-place mode guarantees parallelism == 1. + svs_index->setParallelism(1); return this->backendIndex->addVector(storage_blob.get(), label); } } @@ -1077,7 +1077,7 @@ class TieredSVSIndex : public VecSimTieredIndex { return; } // Force single thread for write-in-place mode. 
- this->GetSVSIndex()->setNumThreads(1); + this->GetSVSIndex()->setParallelism(1); // VecSimIndexAbstract::runGC() is protected static_cast(this->backendIndex)->runGC(); return; diff --git a/tests/benchmark/bm_utils.h b/tests/benchmark/bm_utils.h index 88f3e2e5d..e952794ad 100644 --- a/tests/benchmark/bm_utils.h +++ b/tests/benchmark/bm_utils.h @@ -30,9 +30,9 @@ CreateTieredSVSParams(VecSimParams &svs_params, tieredIndexMock &mock_thread_poo template static void verifyNumThreads(TieredSVSIndex *tiered_index, size_t expected_num_threads, size_t expected_capcity, std::string msg = "") { - ASSERT_EQ(tiered_index->GetSVSIndex()->getThreadPoolCapacity(), expected_capcity) + ASSERT_EQ(tiered_index->GetSVSIndex()->getPoolSize(), expected_capcity) << msg << ": thread pool capacity mismatch"; - size_t num_reserved_threads = tiered_index->GetSVSIndex()->getNumThreads(); + size_t num_reserved_threads = tiered_index->GetSVSIndex()->getParallelism(); if (num_reserved_threads < expected_num_threads) { std::cout << msg << ": WARNING: last reserved threads (" << num_reserved_threads << ") is less than expected (" << expected_num_threads << ")." << std::endl; diff --git a/tests/benchmark/bm_vecsim_svs.h b/tests/benchmark/bm_vecsim_svs.h index fe6c92493..c4b167421 100644 --- a/tests/benchmark/bm_vecsim_svs.h +++ b/tests/benchmark/bm_vecsim_svs.h @@ -115,7 +115,7 @@ class BM_VecSimSVS : public BM_VecSimGeneral { tiered_params.primaryIndexParams->algoParams.svsParams.num_threads; size_t num_threads = params_threadpool_size ? 
params_threadpool_size : mock_thread_pool.thread_pool_size; - tiered_index->GetSVSIndex()->setNumThreads(num_threads); + tiered_index->GetSVSIndex()->setParallelism(num_threads); test_utils::verifyNumThreads(tiered_index, num_threads, num_threads, std::string("CreateTieredSVSIndex")); diff --git a/tests/unit/test_svs_fp16.cpp b/tests/unit/test_svs_fp16.cpp index 59c11c0a1..82937ef32 100644 --- a/tests/unit/test_svs_fp16.cpp +++ b/tests/unit/test_svs_fp16.cpp @@ -2242,8 +2242,8 @@ class FP16SVSTieredIndexTest : public FP16SVSTest { } void verifyNumThreads(TieredSVSIndex *tiered_index, size_t expected_num_threads, size_t expected_capcity) { - ASSERT_EQ(tiered_index->GetSVSIndex()->getNumThreads(), expected_num_threads); - ASSERT_EQ(tiered_index->GetSVSIndex()->getThreadPoolCapacity(), expected_capcity); + ASSERT_EQ(tiered_index->GetSVSIndex()->getParallelism(), expected_num_threads); + ASSERT_EQ(tiered_index->GetSVSIndex()->getPoolSize(), expected_capcity); } TieredSVSIndex *CreateTieredSVSIndex(const TieredIndexParams &tiered_params, @@ -2258,7 +2258,7 @@ class FP16SVSTieredIndexTest : public FP16SVSTest { // Set number of available threads to 1 unless specified otherwise, // so we can insert one vector at a time directly to svs. 
- tiered_index->GetSVSIndex()->setNumThreads(num_available_threads); + tiered_index->GetSVSIndex()->setParallelism(num_available_threads); size_t params_threadpool_size = tiered_params.primaryIndexParams->algoParams.svsParams.num_threads; size_t expected_capacity = diff --git a/tests/unit/test_svs_tiered.cpp b/tests/unit/test_svs_tiered.cpp index 3d9c0bd0d..528df8794 100644 --- a/tests/unit/test_svs_tiered.cpp +++ b/tests/unit/test_svs_tiered.cpp @@ -92,8 +92,8 @@ class SVSTieredIndexTest : public ::testing::Test { void verifyNumThreads(TieredSVSIndex *tiered_index, size_t expected_num_threads, size_t expected_capcity) { - ASSERT_EQ(tiered_index->GetSVSIndex()->getNumThreads(), expected_num_threads); - ASSERT_EQ(tiered_index->GetSVSIndex()->getThreadPoolCapacity(), expected_capcity); + ASSERT_EQ(tiered_index->GetSVSIndex()->getParallelism(), expected_num_threads); + ASSERT_EQ(tiered_index->GetSVSIndex()->getPoolSize(), expected_capcity); } TieredSVSIndex *CreateTieredSVSIndex(const TieredIndexParams &tiered_params, tieredIndexMock &mock_thread_pool, @@ -104,11 +104,11 @@ class SVSTieredIndexTest : public ::testing::Test { // Set the created tiered index in the index external context (it will take ownership over // the index, and we'll need to release the ctx at the end of the test. mock_thread_pool.ctx->index_strong_ref.reset(tiered_index); - // Set numThreads to 1 by default to allow direct calls to SVS addVector() API, + // Set parallelism to 1 by default to allow direct calls to SVS addVector() API, // which requires exactly 1 thread. When using tiered index addVector API, - // the thread count is managed internally according to the operation and threadpool - // capacity, so testing parallelism remains intact. - tiered_index->GetSVSIndex()->setNumThreads(num_available_threads); + // the parallelism is managed internally according to the operation and pool + // size, so testing parallelism remains intact. 
+ tiered_index->GetSVSIndex()->setParallelism(num_available_threads); size_t params_threadpool_size = tiered_params.primaryIndexParams->algoParams.svsParams.num_threads; size_t expected_capacity = From c488bf03c6f1c6fd58f8913e7e736ae1964f4cf6 Mon Sep 17 00:00:00 2001 From: meiravgri Date: Fri, 3 Apr 2026 04:41:22 +0000 Subject: [PATCH 2/9] introduce VecSim_UpdateThreadPoolSize api --- src/VecSim/algorithms/svs/svs.h | 8 ++++++++ src/VecSim/vec_sim.cpp | 11 +++++++++++ src/VecSim/vec_sim.h | 10 ++++++++++ 3 files changed, 29 insertions(+) diff --git a/src/VecSim/algorithms/svs/svs.h b/src/VecSim/algorithms/svs/svs.h index 9d5ee1c3c..2786aa363 100644 --- a/src/VecSim/algorithms/svs/svs.h +++ b/src/VecSim/algorithms/svs/svs.h @@ -37,6 +37,14 @@ struct SVSIndexBase { SVSIndexBase() : num_marked_deleted{0} {}; virtual ~SVSIndexBase() = default; + + // Singleton accessor for the shared SVS thread pool. + // Returns a reference to the static shared_ptr, which is initially null. + // Created lazily on first VecSim_UpdateThreadPoolSize(n > 0) call. 
+ static std::shared_ptr &getSharedThreadPool() { + static std::shared_ptr shared_pool; + return shared_pool; + } virtual int addVectors(const void *vectors_data, const labelType *labels, size_t n) = 0; virtual int deleteVectors(const labelType *labels, size_t n) = 0; virtual bool isLabelExists(labelType label) const = 0; diff --git a/src/VecSim/vec_sim.cpp b/src/VecSim/vec_sim.cpp index 004d575b7..a4b8201bc 100644 --- a/src/VecSim/vec_sim.cpp +++ b/src/VecSim/vec_sim.cpp @@ -15,6 +15,7 @@ #include "VecSim/vec_sim_index.h" #include "VecSim/vec_sim_adhoc_bf_ctx.h" #include "VecSim/types/bfloat16.h" +#include "VecSim/algorithms/svs/svs.h" #include #include "memory.h" @@ -34,6 +35,16 @@ extern "C" void VecSim_SetLogCallbackFunction(logCallbackFunction callback) { extern "C" void VecSim_SetWriteMode(VecSimWriteMode mode) { VecSimIndex::setWriteMode(mode); } +extern "C" void VecSim_UpdateThreadPoolSize(size_t new_size) { + if (new_size == 0) { + VecSimIndex::setWriteMode(VecSim_WriteInPlace); + } else { + VecSimIndex::setWriteMode(VecSim_WriteAsync); + } + // TODO Step 3: Create/resize the shared SVS thread pool singleton via + // SVSIndexBase::getSharedThreadPool() +} + static VecSimResolveCode _ResolveParams_EFRuntime(VecSimAlgo index_type, VecSimRawParam rparam, VecSimQueryParams *qparams, VecsimQueryType query_type) { diff --git a/src/VecSim/vec_sim.h b/src/VecSim/vec_sim.h index 30f19065d..14e1bf65d 100644 --- a/src/VecSim/vec_sim.h +++ b/src/VecSim/vec_sim.h @@ -312,6 +312,16 @@ void VecSim_SetTestLogContext(const char *test_name, const char *test_type); */ void VecSim_SetWriteMode(VecSimWriteMode mode); +/** + * @brief Update the shared SVS thread pool size and set the write mode accordingly. + * If new_size == 0, sets write mode to in-place (no background threads). + * If new_size > 0, sets write mode to async and resizes the shared thread pool. + * @note Should be called from the main thread only. 
+ * + * @param new_size the new thread pool size (0 = in-place mode, >0 = async mode). + */ +void VecSim_UpdateThreadPoolSize(size_t new_size); + #ifdef __cplusplus } #endif From 344538169019e97191cb266c0d1c690f9b84974e Mon Sep 17 00:00:00 2001 From: meiravgri Date: Sun, 5 Apr 2026 11:03:19 +0000 Subject: [PATCH 3/9] imp new VecSimSVSThreadPoolImpl --- src/VecSim/algorithms/svs/svs_utils.h | 235 ++++++++++++++++++++------ 1 file changed, 183 insertions(+), 52 deletions(-) diff --git a/src/VecSim/algorithms/svs/svs_utils.h b/src/VecSim/algorithms/svs/svs_utils.h index 2e240358f..f05ceb94b 100644 --- a/src/VecSim/algorithms/svs/svs_utils.h +++ b/src/VecSim/algorithms/svs/svs_utils.h @@ -19,8 +19,10 @@ #include "svs/cpuid.h" #endif +#include #include #include +#include #include #include @@ -341,93 +343,222 @@ struct SVSGraphBuilder { } }; -// Custom thread pool for SVS index +// A slot in the shared SVS thread pool. Wraps an SVS Thread with an occupancy flag +// used by the rental mechanism. Stored via shared_ptr so that a renter's reference +// keeps the slot (and its OS thread) alive even if the pool shrinks mid-rental. +struct ThreadSlot { + svs::threads::Thread thread; + std::atomic occupied{false}; + + ThreadSlot() = default; + + // Non-copyable, non-movable (atomic is not movable) + ThreadSlot(const ThreadSlot &) = delete; + ThreadSlot &operator=(const ThreadSlot &) = delete; + ThreadSlot(ThreadSlot &&) = delete; + ThreadSlot &operator=(ThreadSlot &&) = delete; +}; + +// RAII guard for threads rented from the shared pool. On destruction, marks all +// rented slots as unoccupied (lock-free atomic stores). Holds shared_ptr references +// to keep slots alive even if the pool shrinks while the rental is active. 
+class RentedThreads { +public: + RentedThreads() = default; + + // Move-only + RentedThreads(RentedThreads &&other) noexcept : slots_(std::move(other.slots_)) {} + RentedThreads &operator=(RentedThreads &&other) noexcept { + release(); + slots_ = std::move(other.slots_); + return *this; + } + RentedThreads(const RentedThreads &) = delete; + RentedThreads &operator=(const RentedThreads &) = delete; + + ~RentedThreads() { release(); } + + void add(std::shared_ptr slot) { slots_.push_back(std::move(slot)); } + + size_t count() const { return slots_.size(); } + + svs::threads::Thread &operator[](size_t i) { return slots_[i]->thread; } + +private: + void release() { + for (auto &slot : slots_) { + slot->occupied.store(false, std::memory_order_release); + } + slots_.clear(); + } + + std::vector> slots_; +}; + +// Shared thread pool for SVS indexes with rental model. // Based on svs::threads::NativeThreadPoolBase with changes: -// * Number of threads is fixed on construction time -// * Pool is resizable in bounds of pre-allocated threads +// * Pool is physically resizable (creates/destroys OS threads) +// * Threads are rented for the duration of a parallel_for call +// * Multiple callers can rent disjoint subsets of threads concurrently +// * Shrinking while threads are rented is safe (shared_ptr lifecycle) class VecSimSVSThreadPoolImpl { public: - // Allocate `num_threads - 1` threads since the main thread participates in the work - // as well. - explicit VecSimSVSThreadPoolImpl(size_t num_threads = 1) - : size_{num_threads}, threads_(num_threads - 1) {} + // Create a pool with `num_threads` total parallelism (including the calling thread). + // Spawns `num_threads - 1` worker OS threads. num_threads must be >= 1. + // When no threads are needed (write-in-place mode), the pool should not be created + // at all — the singleton should remain nullptr. 
+ explicit VecSimSVSThreadPoolImpl(size_t num_threads = 1) { + assert(num_threads && "VecSimSVSThreadPoolImpl should not be created with 0 threads"); + slots_.reserve(num_threads - 1); + for (size_t i = 0; i < num_threads - 1; ++i) { + slots_.push_back(std::make_shared()); + } + } + + // Total parallelism: worker slots + 1 (the calling thread always participates). + size_t size() const { + std::lock_guard lock{pool_mutex_}; + return slots_.size() + 1; + } - size_t capacity() const { return threads_.size() + 1; } - size_t size() const { return size_; } + // Alias for size(). Capacity and size are always equal since resize is physical. + // TODO: is it needed? can we remove one of them? + size_t capacity() const { return size(); } - // Support resize - do not modify threads container just limit the size + // Physically resize the pool. Creates new OS threads on grow, shuts down idle threads + // on shrink. new_size is total parallelism including the calling thread (minimum 1). + // Occupied threads (held by renters) survive shrink via shared_ptr — their OS thread + // is joined when the last shared_ptr reference is dropped (in ~RentedThreads). void resize(size_t new_size) { - std::lock_guard lock{use_mutex_}; - size_ = std::clamp(new_size, size_t{1}, threads_.size() + 1); + new_size = std::max(new_size, size_t{1}); + size_t target_workers = new_size - 1; + + std::lock_guard lock{pool_mutex_}; + + if (target_workers > slots_.size()) { + // Grow: spawn new worker threads + slots_.reserve(target_workers); + for (size_t i = slots_.size(); i < target_workers; ++i) { + slots_.push_back(std::make_shared()); + } + } else if (target_workers < slots_.size()) { + // Shrink: remove slots from the back. + // If a slot is occupied, the renter's shared_ptr keeps it alive; the OS thread + // will be joined when ~ThreadSlot runs after ~RentedThreads releases it. + // If a slot is idle, dropping the shared_ptr here triggers immediate shutdown. 
+ slots_.resize(target_workers); + } } - void parallel_for(std::function f, size_t n) { - if (n > size_) { - throw svs::threads::ThreadingException("Number of tasks exceeds the thread pool size"); + // Rent up to `count` worker threads from the pool. Returns an RAII guard that + // automatically releases the threads when destroyed. + // The SVS pool is sized to match the RediSearch thread pool, and RediSearch controls + // scheduling via reserve jobs, so all requested slots should always be available. + // Getting fewer threads than requested indicates a bug in the scheduling logic. + RentedThreads rent(size_t count) { + RentedThreads rented; + if (count == 0) { + return rented; + } + + std::lock_guard lock{pool_mutex_}; + for (auto &slot : slots_) { + if (rented.count() >= count) { + break; + } + bool expected = false; + if (slot->occupied.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { + rented.add(slot); + } } + + if (rented.count() < count) { + svs::logging::warn("SVS thread pool: rented {} threads out of {} requested " + "(pool has {} slots). This should not happen.", + rented.count(), count, slots_.size()); + assert(false && "Failed to rent the expected number of SVS threads"); + } + return rented; + } + + // Execute `f` in parallel with `n` partitions. The calling thread runs partition 0, + // and up to `n-1` worker threads are rented for partitions 1..n-1. + // Same signature as the SVS ThreadPool concept. + void parallel_for(std::function f, size_t n) { if (n == 0) { return; - } else if (n == 1) { - // Run on the main function. + } + if (n == 1) { + // Single partition: run on the calling thread, no rental needed. try { f(0); } catch (const std::exception &error) { - manage_exception_during_run(error.what()); + // No workers to check — just rethrow with formatted message. 
+ auto msg = fmt::format("Thread 0: {}\n", error.what()); + throw svs::threads::ThreadingException{std::move(msg)}; } return; - } else { - std::lock_guard lock{use_mutex_}; - for (size_t i = 0; i < n - 1; ++i) { - threads_[i].assign({&f, i + 1}); - } - // Run on the main function. - try { - f(0); - } catch (const std::exception &error) { - manage_exception_during_run(error.what()); - } + } - // Wait until all threads are done. - // If any thread fails, then we're throwing. - for (size_t i = 0; i < size_ - 1; ++i) { - auto &thread = threads_[i]; - thread.wait(); - if (!thread.is_okay()) { - manage_exception_during_run(); - } - } + // Rent n-1 worker threads + auto rented = rent(n - 1); + assert(rented.count() == n - 1); + size_t num_workers = n - 1; + + // Assign work to rented workers (partitions 1..n-1) + for (size_t i = 0; i < num_workers; ++i) { + rented[i].assign({&f, i + 1}); + } + + // Run partition 0 on the calling thread + std::string main_thread_error; + try { + f(0); + } catch (const std::exception &error) { + main_thread_error = error.what(); } + + // Wait for all rented workers and collect errors. + // RentedThreads destructor will release the slots after this block. + manage_workers_after_run(main_thread_error, rented, num_workers); } - void manage_exception_during_run(const std::string &thread_0_message = {}) { +private: + // Wait for all rented workers to finish. If any worker (or the main thread) threw, + // restart crashed workers and throw a combined exception. + void manage_workers_after_run(const std::string &main_thread_error, RentedThreads &rented, + size_t rented_count) { auto message = std::string{}; auto inserter = std::back_inserter(message); - if (!thread_0_message.empty()) { - fmt::format_to(inserter, "Thread 0: {}\n", thread_0_message); + bool has_error = !main_thread_error.empty(); + + if (has_error) { + fmt::format_to(inserter, "Thread 0: {}\n", main_thread_error); } - // Manage all other exceptions thrown, restarting crashed threads. 
- for (size_t i = 0; i < size_ - 1; ++i) { - auto &thread = threads_[i]; + for (size_t i = 0; i < rented_count; ++i) { + auto &thread = rented[i]; thread.wait(); if (!thread.is_okay()) { + has_error = true; try { thread.unsafe_get_exception(); } catch (const std::exception &error) { fmt::format_to(inserter, "Thread {}: {}\n", i + 1, error.what()); } - // Restart the thread. - threads_[i].shutdown(); - threads_[i] = svs::threads::Thread{}; + // Restart the crashed thread so the slot is usable again. + thread.shutdown(); + thread = svs::threads::Thread{}; } } - throw svs::threads::ThreadingException{std::move(message)}; + + if (has_error) { + throw svs::threads::ThreadingException{std::move(message)}; + } } -private: - std::mutex use_mutex_; - size_t size_; - std::vector threads_; + mutable std::mutex pool_mutex_; + std::vector> slots_; }; // Copy-movable wrapper for VecSimSVSThreadPoolImpl From 13454cb548218f6c7ac080f47d88b14de2af3b99 Mon Sep 17 00:00:00 2001 From: meiravgri Date: Sun, 5 Apr 2026 13:50:53 +0000 Subject: [PATCH 4/9] imp VecSimSVSThreadPool --- src/VecSim/algorithms/svs/svs.h | 15 +++--- src/VecSim/algorithms/svs/svs_utils.h | 49 ++++++++++++++---- tests/unit/test_svs_tiered.cpp | 72 +++++++++++++++++++-------- 3 files changed, 97 insertions(+), 39 deletions(-) diff --git a/src/VecSim/algorithms/svs/svs.h b/src/VecSim/algorithms/svs/svs.h index 2786aa363..cd03370bf 100644 --- a/src/VecSim/algorithms/svs/svs.h +++ b/src/VecSim/algorithms/svs/svs.h @@ -39,10 +39,10 @@ struct SVSIndexBase virtual ~SVSIndexBase() = default; // Singleton accessor for the shared SVS thread pool. - // Returns a reference to the static shared_ptr, which is initially null. - // Created lazily on first VecSim_UpdateThreadPoolSize(n > 0) call. + // Always valid — initialized with size 1 (write-in-place mode: 0 worker threads, + // only the calling thread participates). Resized on VecSim_UpdateThreadPoolSize() calls. 
static std::shared_ptr &getSharedThreadPool() { - static std::shared_ptr shared_pool; + static auto shared_pool = std::make_shared(1); return shared_pool; } virtual int addVectors(const void *vectors_data, const labelType *labels, size_t n) = 0; @@ -369,8 +369,7 @@ class SVSIndex : public VecSimIndexAbstract, fl svs_details::getOrDefault(params.leanvec_dim, SVS_VAMANA_DEFAULT_LEANVEC_DIM)}, epsilon{svs_details::getOrDefault(params.epsilon, SVS_VAMANA_DEFAULT_EPSILON)}, is_two_level_lvq{isTwoLevelLVQ(params.quantBits)}, - threadpool_{std::max(size_t{SVS_VAMANA_DEFAULT_NUM_THREADS}, params.num_threads)}, - impl_{nullptr} { + threadpool_{SVSIndexBase::getSharedThreadPool()}, impl_{nullptr} { logger_ = makeLogger(); } @@ -549,10 +548,10 @@ class SVSIndex : public VecSimIndexAbstract, fl return impl_ ? impl_->has_id(label) : false; } - size_t getParallelism() const override { return threadpool_.size(); } - void setParallelism(size_t parallelism) override { threadpool_.resize(parallelism); } + size_t getParallelism() const override { return threadpool_.getParallelism(); } + void setParallelism(size_t parallelism) override { threadpool_.setParallelism(parallelism); } - size_t getPoolSize() const override { return threadpool_.capacity(); } + size_t getPoolSize() const override { return threadpool_.poolSize(); } bool isCompressed() const override { return storage_traits_t::is_compressed(); } diff --git a/src/VecSim/algorithms/svs/svs_utils.h b/src/VecSim/algorithms/svs/svs_utils.h index f05ceb94b..48ad9d72e 100644 --- a/src/VecSim/algorithms/svs/svs_utils.h +++ b/src/VecSim/algorithms/svs/svs_utils.h @@ -405,8 +405,8 @@ class VecSimSVSThreadPoolImpl { public: // Create a pool with `num_threads` total parallelism (including the calling thread). // Spawns `num_threads - 1` worker OS threads. num_threads must be >= 1. - // When no threads are needed (write-in-place mode), the pool should not be created - // at all — the singleton should remain nullptr. 
+ // In write-in-place mode, the pool is created with num_threads == 1 (0 worker threads, + // only the calling thread participates). explicit VecSimSVSThreadPoolImpl(size_t num_threads = 1) { assert(num_threads && "VecSimSVSThreadPoolImpl should not be created with 0 threads"); slots_.reserve(num_threads - 1); @@ -561,21 +561,50 @@ class VecSimSVSThreadPoolImpl { std::vector> slots_; }; -// Copy-movable wrapper for VecSimSVSThreadPoolImpl +// Per-index wrapper around the shared VecSimSVSThreadPoolImpl singleton. +// Lightweight, copyable (SVS stores a copy via ThreadPoolHandle). Both the original +// and SVS's copy share the same pool_ and parallelism_ via shared_ptr, so state +// changes propagate automatically. +// Satisfies the svs::threads::ThreadPool concept (size() + parallel_for). +// The pool is always valid — in write-in-place mode it has size 1 (0 worker threads). class VecSimSVSThreadPool { private: - std::shared_ptr pool_; + std::shared_ptr pool_; // shared across all indexes + std::shared_ptr> parallelism_; // per-index, shared across copies public: - explicit VecSimSVSThreadPool(size_t num_threads = 1) - : pool_{std::make_shared(num_threads)} {} + // Construct with reference to the shared pool singleton. + // parallelism_ starts at 0 — caller must call setParallelism() before parallel_for(). + explicit VecSimSVSThreadPool(std::shared_ptr pool) + : pool_(std::move(pool)), parallelism_(std::make_shared>(0)) { + assert(pool_ && "Pool must not be null"); + } + + // Set the degree of parallelism for this index's next operation. + // n must be the number of threads actually reserved by the caller (i.e., the + // RediSearch workers that checked in via ReserveThreadJob). This is what allows + // us to assert n <= pool size: reserved workers are occupied RediSearch threads, + // so the pool cannot shrink while they are held, and n cannot exceed the pool size. 
+ void setParallelism(size_t n) { + assert(n >= 1 && "Parallelism must be at least 1 (the calling thread)"); + assert(n <= pool_->size() && "Parallelism exceeds shared pool size"); + parallelism_->store(n); + } + size_t getParallelism() const { return parallelism_->load(); } - size_t capacity() const { return pool_->capacity(); } - size_t size() const { return pool_->size(); } + // Returns per-index parallelism. SVS uses this for task partitioning (ThreadPool concept). + size_t size() const { return parallelism_->load(); } + // Shared pool size — used by scheduling to decide how many reserve jobs to submit. + size_t poolSize() const { return pool_->size(); } + + // Delegates to the shared pool's parallel_for. + // n may be less than parallelism_ when the problem size is smaller than the + // thread count (SVS computes n = min(arg.size(), pool.size())). + // n must not exceed parallelism_ — we only have that many threads reserved. void parallel_for(std::function f, size_t n) { + assert(parallelism_->load() > 0 && "setParallelism must be called before parallel_for"); + assert(n <= parallelism_->load() && "n exceeds reserved thread count (parallelism)"); pool_->parallel_for(std::move(f), n); } - - void resize(size_t new_size) { pool_->resize(new_size); } }; diff --git a/tests/unit/test_svs_tiered.cpp b/tests/unit/test_svs_tiered.cpp index 528df8794..d079d64e5 100644 --- a/tests/unit/test_svs_tiered.cpp +++ b/tests/unit/test_svs_tiered.cpp @@ -3823,46 +3823,76 @@ TYPED_TEST(SVSTieredIndexTestBasic, testDeletedJournalMulti) { } TEST(SVSTieredIndexTest, testThreadPool) { - // Test VecSimSVSThreadPool + // Test VecSimSVSThreadPool with shared pool const size_t num_threads = 4; - auto pool = VecSimSVSThreadPool(num_threads); - ASSERT_EQ(pool.capacity(), num_threads); + auto shared_pool = std::make_shared(num_threads); + auto pool = VecSimSVSThreadPool(shared_pool); + ASSERT_EQ(pool.poolSize(), num_threads); + ASSERT_EQ(pool.size(), 0); // parallelism starts at 0 + 
ASSERT_EQ(pool.getParallelism(), 0); + + // Must call setParallelism before parallel_for + pool.setParallelism(num_threads); ASSERT_EQ(pool.size(), num_threads); + ASSERT_EQ(pool.getParallelism(), num_threads); std::atomic_int counter(0); auto task = [&counter](size_t i) { counter += i + 1; }; - // exeed the number of threads - ASSERT_THROW(pool.parallel_for(task, 10), svs::threads::ThreadingException); + // n > parallelism is a bug — asserts in debug mode +#if !defined(RUNNING_ON_VALGRIND) && !defined(NDEBUG) + ASSERT_DEATH(pool.parallel_for(task, 10), "n exceeds reserved thread count"); +#endif counter = 0; - pool.parallel_for(task, 4); + pool.parallel_for(task, num_threads); ASSERT_EQ(counter, 10); // 1+2+3+4 = 10 - pool.resize(1); - ASSERT_EQ(pool.capacity(), 4); - ASSERT_EQ(pool.size(), 1); - // exeed the new pool size - ASSERT_THROW(pool.parallel_for(task, 4), svs::threads::ThreadingException); + // n < parallelism is valid (SVS may partition into fewer tasks for small problems) + counter = 0; + pool.parallel_for(task, 2); + ASSERT_EQ(counter, 3); // 1+2 = 3 + + // setParallelism changes per-index parallelism, not the shared pool + pool.setParallelism(1); + ASSERT_EQ(pool.poolSize(), num_threads); // shared pool unchanged + ASSERT_EQ(pool.size(), 1); // per-index parallelism + ASSERT_EQ(pool.getParallelism(), 1); counter = 0; pool.parallel_for(task, 1); ASSERT_EQ(counter, 1); // 0+1 = 1 - pool.resize(0); - ASSERT_EQ(pool.capacity(), 4); - ASSERT_EQ(pool.size(), 1); + // setParallelism boundary checks — asserts in debug mode +#if !defined(RUNNING_ON_VALGRIND) && !defined(NDEBUG) + ASSERT_DEATH(pool.setParallelism(0), "Parallelism must be at least 1"); + ASSERT_DEATH(pool.setParallelism(10), "Parallelism exceeds shared pool size"); +#endif - pool.resize(5); - ASSERT_EQ(pool.capacity(), 4); - ASSERT_EQ(pool.size(), 4); + // Test write-in-place mode (pool with size 1) + auto inplace_shared = std::make_shared(1); + auto inplace_pool = 
VecSimSVSThreadPool(inplace_shared); + inplace_pool.setParallelism(1); + ASSERT_EQ(inplace_pool.size(), 1); + ASSERT_EQ(inplace_pool.poolSize(), 1); + counter = 0; + inplace_pool.parallel_for(task, 1); + ASSERT_EQ(counter, 1); + + // parallel_for without setParallelism asserts in debug mode +#if !defined(RUNNING_ON_VALGRIND) && !defined(NDEBUG) + auto unset_pool = VecSimSVSThreadPool(shared_pool); + ASSERT_DEATH(unset_pool.parallel_for(task, 1), + "setParallelism must be called before parallel_for"); +#endif - // Test VecSimSVSThreadPool for exception handling + // Test exception handling auto err_task = [](size_t) { throw std::runtime_error("Test exception"); }; - - ASSERT_NO_THROW(pool.parallel_for(err_task, 0)); // no task - no err + // n=0 is a no-op (no tasks to run, no error) + ASSERT_NO_THROW(pool.parallel_for(err_task, 0)); + pool.setParallelism(num_threads); ASSERT_THROW(pool.parallel_for(err_task, 1), svs::threads::ThreadingException); - ASSERT_THROW(pool.parallel_for(err_task, 4), svs::threads::ThreadingException); + ASSERT_THROW(pool.parallel_for(err_task, num_threads), svs::threads::ThreadingException); } #else // HAVE_SVS From 3c69a481f3e9e58d1c6b160aa99445ea46583653 Mon Sep 17 00:00:00 2001 From: meiravgri Date: Sun, 5 Apr 2026 13:58:51 +0000 Subject: [PATCH 5/9] add resize handling --- src/VecSim/vec_sim.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/VecSim/vec_sim.cpp b/src/VecSim/vec_sim.cpp index a4b8201bc..c9237d6fe 100644 --- a/src/VecSim/vec_sim.cpp +++ b/src/VecSim/vec_sim.cpp @@ -41,8 +41,10 @@ extern "C" void VecSim_UpdateThreadPoolSize(size_t new_size) { } else { VecSimIndex::setWriteMode(VecSim_WriteAsync); } - // TODO Step 3: Create/resize the shared SVS thread pool singleton via - // SVSIndexBase::getSharedThreadPool() + // Resize the shared SVS thread pool. resize() clamps to minimum size 1. + // new_size == 0 → pool size 1 (only calling thread, write-in-place). 
+ // new_size > 0 → pool size new_size (new_size - 1 worker threads). + SVSIndexBase::getSharedThreadPool()->resize(std::max(new_size, size_t{1})); } static VecSimResolveCode _ResolveParams_EFRuntime(VecSimAlgo index_type, VecSimRawParam rparam, From 99eae2304dae6f58d633303dbdc7a2049ad70a95 Mon Sep 17 00:00:00 2001 From: meiravgri Date: Mon, 6 Apr 2026 07:38:07 +0000 Subject: [PATCH 6/9] pass per-index log context to SVS thread pool logging --- src/VecSim/algorithms/svs/svs.h | 4 +-- src/VecSim/algorithms/svs/svs_utils.h | 35 +++++++++++++++------------ 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/src/VecSim/algorithms/svs/svs.h b/src/VecSim/algorithms/svs/svs.h index cd03370bf..f44135865 100644 --- a/src/VecSim/algorithms/svs/svs.h +++ b/src/VecSim/algorithms/svs/svs.h @@ -41,7 +41,7 @@ struct SVSIndexBase // Singleton accessor for the shared SVS thread pool. // Always valid — initialized with size 1 (write-in-place mode: 0 worker threads, // only the calling thread participates). Resized on VecSim_UpdateThreadPoolSize() calls.
- static std::shared_ptr &getSharedThreadPool() { + static std::shared_ptr getSharedThreadPool() { static auto shared_pool = std::make_shared(1); return shared_pool; } @@ -369,7 +369,7 @@ class SVSIndex : public VecSimIndexAbstract, fl svs_details::getOrDefault(params.leanvec_dim, SVS_VAMANA_DEFAULT_LEANVEC_DIM)}, epsilon{svs_details::getOrDefault(params.epsilon, SVS_VAMANA_DEFAULT_EPSILON)}, is_two_level_lvq{isTwoLevelLVQ(params.quantBits)}, - threadpool_{SVSIndexBase::getSharedThreadPool()}, impl_{nullptr} { + threadpool_{SVSIndexBase::getSharedThreadPool(), this->logCallbackCtx}, impl_{nullptr} { logger_ = makeLogger(); } diff --git a/src/VecSim/algorithms/svs/svs_utils.h b/src/VecSim/algorithms/svs/svs_utils.h index 48ad9d72e..c8daee631 100644 --- a/src/VecSim/algorithms/svs/svs_utils.h +++ b/src/VecSim/algorithms/svs/svs_utils.h @@ -9,6 +9,7 @@ #pragma once #include "VecSim/query_results.h" +#include "VecSim/vec_sim_interface.h" #include "VecSim/types/float16.h" #include "svs/core/distance.h" @@ -421,10 +422,6 @@ class VecSimSVSThreadPoolImpl { return slots_.size() + 1; } - // Alias for size(). Capacity and size are always equal since resize is physical. - // TODO: is it needed? can we remove one of them? - size_t capacity() const { return size(); } - // Physically resize the pool. Creates new OS threads on grow, shuts down idle threads // on shrink. new_size is total parallelism including the calling thread (minimum 1). // Occupied threads (held by renters) survive shrink via shared_ptr — their OS thread @@ -455,7 +452,7 @@ class VecSimSVSThreadPoolImpl { // The SVS pool is sized to match the RediSearch thread pool, and RediSearch controls // scheduling via reserve jobs, so all requested slots should always be available. // Getting fewer threads than requested indicates a bug in the scheduling logic. 
- RentedThreads rent(size_t count) { + RentedThreads rent(size_t count, void *log_ctx = nullptr) { RentedThreads rented; if (count == 0) { return rented; @@ -473,9 +470,13 @@ class VecSimSVSThreadPoolImpl { } if (rented.count() < count) { - svs::logging::warn("SVS thread pool: rented {} threads out of {} requested " - "(pool has {} slots). This should not happen.", - rented.count(), count, slots_.size()); + auto msg = fmt::format("SVS thread pool: rented {} threads out of {} requested " + "(pool has {} slots). This should not happen.", + rented.count(), count, slots_.size()); + if (VecSimIndexInterface::logCallback) { + assert(log_ctx && "Log context must be provided when logging is available"); + VecSimIndexInterface::logCallback(log_ctx, "warning", msg.c_str()); + } assert(false && "Failed to rent the expected number of SVS threads"); } return rented; @@ -484,7 +485,7 @@ class VecSimSVSThreadPoolImpl { // Execute `f` in parallel with `n` partitions. The calling thread runs partition 0, // and up to `n-1` worker threads are rented for partitions 1..n-1. // Same signature as the SVS ThreadPool concept. - void parallel_for(std::function f, size_t n) { + void parallel_for(std::function f, size_t n, void *log_ctx = nullptr) { if (n == 0) { return; } @@ -501,9 +502,8 @@ class VecSimSVSThreadPoolImpl { } // Rent n-1 worker threads - auto rented = rent(n - 1); - assert(rented.count() == n - 1); - size_t num_workers = n - 1; + auto rented = rent(n - 1, log_ctx); + size_t num_workers = rented.count(); // Assign work to rented workers (partitions 1..n-1) for (size_t i = 0; i < num_workers; ++i) { @@ -571,12 +571,15 @@ class VecSimSVSThreadPool { private: std::shared_ptr pool_; // shared across all indexes std::shared_ptr> parallelism_; // per-index, shared across copies + void *log_ctx_ = nullptr; // per-index log context public: // Construct with reference to the shared pool singleton. // parallelism_ starts at 0 — caller must call setParallelism() before parallel_for(). 
- explicit VecSimSVSThreadPool(std::shared_ptr pool) - : pool_(std::move(pool)), parallelism_(std::make_shared>(0)) { + explicit VecSimSVSThreadPool(std::shared_ptr pool, + void *log_ctx = nullptr) + : pool_(std::move(pool)), parallelism_(std::make_shared>(0)), + log_ctx_(log_ctx) { assert(pool_ && "Pool must not be null"); } @@ -598,13 +601,13 @@ class VecSimSVSThreadPool { // Shared pool size — used by scheduling to decide how many reserve jobs to submit. size_t poolSize() const { return pool_->size(); } - // Delegates to the shared pool's parallel_for. + // Delegates to the shared pool's parallel_for, passing the per-index log context. // n may be less than parallelism_ when the problem size is smaller than the // thread count (SVS computes n = min(arg.size(), pool.size())). // n must not exceed parallelism_ — we only have that many threads reserved. void parallel_for(std::function f, size_t n) { assert(parallelism_->load() > 0 && "setParallelism must be called before parallel_for"); assert(n <= parallelism_->load() && "n exceeds reserved thread count (parallelism)"); - pool_->parallel_for(std::move(f), n); + pool_->parallel_for(std::move(f), n, log_ctx_); } }; From 0a34edfb9626e6aeccc13b94ba1f4e811fb4d98b Mon Sep 17 00:00:00 2001 From: meiravgri Date: Mon, 6 Apr 2026 10:29:31 +0000 Subject: [PATCH 7/9] default parallelism to 1 --- src/VecSim/algorithms/svs/svs_tiered.h | 8 ++++---- src/VecSim/algorithms/svs/svs_utils.h | 7 ++++--- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/VecSim/algorithms/svs/svs_tiered.h b/src/VecSim/algorithms/svs/svs_tiered.h index 8ca67bf5a..85165e7ff 100644 --- a/src/VecSim/algorithms/svs/svs_tiered.h +++ b/src/VecSim/algorithms/svs/svs_tiered.h @@ -819,10 +819,10 @@ class TieredSVSIndex : public VecSimTieredIndex { // prevent update job from running in parallel and lock any access to the backend // index std::scoped_lock lock(this->updateJobMutex, this->mainIndexGuard); - // Set available thread count to 1 for 
single vector write-in-place operation. - // This maintains the contract that single vector operations use exactly one thread. - // TODO: Replace this setParallelism(1) call with an assertion once we establish - // a contract that write-in-place mode guarantees parallelism == 1. + // Defensive: ensure single-threaded operation for write-in-place mode. + // parallelism_ defaults to 1, so this is a no-op in the normal case. + assert(svs_index->getParallelism() == 1 && + "Parallelism should be 1 for write-in-place mode, but it is not"); svs_index->setParallelism(1); return this->backendIndex->addVector(storage_blob.get(), label); } diff --git a/src/VecSim/algorithms/svs/svs_utils.h b/src/VecSim/algorithms/svs/svs_utils.h index c8daee631..f1f35e8b6 100644 --- a/src/VecSim/algorithms/svs/svs_utils.h +++ b/src/VecSim/algorithms/svs/svs_utils.h @@ -575,10 +575,12 @@ class VecSimSVSThreadPool { public: // Construct with reference to the shared pool singleton. - // parallelism_ starts at 0 — caller must call setParallelism() before parallel_for(). + // parallelism_ starts at 1 (the calling thread always participates), matching the + // pool's minimum size. Safe for immediate use in write-in-place mode without an + // explicit setParallelism() call. explicit VecSimSVSThreadPool(std::shared_ptr pool, void *log_ctx = nullptr) - : pool_(std::move(pool)), parallelism_(std::make_shared>(0)), + : pool_(std::move(pool)), parallelism_(std::make_shared>(1)), log_ctx_(log_ctx) { assert(pool_ && "Pool must not be null"); } @@ -606,7 +608,6 @@ class VecSimSVSThreadPool { // thread count (SVS computes n = min(arg.size(), pool.size())). // n must not exceed parallelism_ — we only have that many threads reserved. 
void parallel_for(std::function f, size_t n) { - assert(parallelism_->load() > 0 && "setParallelism must be called before parallel_for"); assert(n <= parallelism_->load() && "n exceeds reserved thread count (parallelism)"); pool_->parallel_for(std::move(f), n, log_ctx_); } From d7108220293c6d0de2739788428670814322571b Mon Sep 17 00:00:00 2001 From: meiravgri Date: Mon, 6 Apr 2026 12:38:11 +0000 Subject: [PATCH 8/9] fix VecSim_UpdateThreadPoolSize to include current thread --- src/VecSim/vec_sim.cpp | 7 ++++--- src/VecSim/vec_sim.h | 3 ++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/VecSim/vec_sim.cpp b/src/VecSim/vec_sim.cpp index c9237d6fe..1484d7071 100644 --- a/src/VecSim/vec_sim.cpp +++ b/src/VecSim/vec_sim.cpp @@ -41,10 +41,11 @@ extern "C" void VecSim_UpdateThreadPoolSize(size_t new_size) { } else { VecSimIndex::setWriteMode(VecSim_WriteAsync); } - // Resize the shared SVS thread pool. resize() clamps to minimum size 1. + // Resize the shared SVS thread pool. new_size is the number of RediSearch worker + // threads. The SVS pool's total parallelism includes the calling thread (+1). // new_size == 0 → pool size 1 (only calling thread, write-in-place). - // new_size > 0 → pool size new_size (new_size - 1 worker threads). - SVSIndexBase::getSharedThreadPool()->resize(std::max(new_size, size_t{1})); + // new_size > 0 → pool size new_size + 1 (new_size workers + calling thread). + SVSIndexBase::getSharedThreadPool()->resize(new_size + 1); } static VecSimResolveCode _ResolveParams_EFRuntime(VecSimAlgo index_type, VecSimRawParam rparam, diff --git a/src/VecSim/vec_sim.h b/src/VecSim/vec_sim.h index 14e1bf65d..e9ffd792c 100644 --- a/src/VecSim/vec_sim.h +++ b/src/VecSim/vec_sim.h @@ -318,7 +318,8 @@ void VecSim_SetWriteMode(VecSimWriteMode mode); * If new_size > 0, sets write mode to async and resizes the shared thread pool. * @note Should be called from the main thread only. 
* - * @param new_size the new thread pool size (0 = in-place mode, >0 = async mode). + * @param new_size the number of worker threads (0 = in-place mode, >0 = async mode). + * The total SVS pool parallelism will be new_size + 1 (workers + calling thread). */ void VecSim_UpdateThreadPoolSize(size_t new_size); From 1381cd14a5113dfaeacb9f00e7bf08a1a600848d Mon Sep 17 00:00:00 2001 From: meiravgri Date: Mon, 6 Apr 2026 14:01:33 +0000 Subject: [PATCH 9/9] revert resize --- src/VecSim/algorithms/svs/svs_tiered.h | 5 +- src/VecSim/vec_sim.cpp | 7 +- src/VecSim/vec_sim.h | 3 +- tests/unit/test_svs.cpp | 4 +- tests/unit/test_svs_fp16.cpp | 24 ++-- tests/unit/test_svs_tiered.cpp | 162 +++++++++++-------------- 6 files changed, 88 insertions(+), 117 deletions(-) diff --git a/src/VecSim/algorithms/svs/svs_tiered.h b/src/VecSim/algorithms/svs/svs_tiered.h index 85165e7ff..6d34779bb 100644 --- a/src/VecSim/algorithms/svs/svs_tiered.h +++ b/src/VecSim/algorithms/svs/svs_tiered.h @@ -677,7 +677,8 @@ class TieredSVSIndex : public VecSimTieredIndex { } // release frontend index executeTracingCallback("UpdateJob::before_add_to_svs"); - { // lock backend index for writing and add vectors there + if (!labels_to_move.empty()) { + // lock backend index for writing and add vectors there std::shared_lock main_shared_lock(this->mainIndexGuard); auto svs_index = GetSVSIndex(); assert(labels_to_move.size() == vectors_to_move.size() / this->frontendIndex->getDim()); @@ -821,8 +822,6 @@ class TieredSVSIndex : public VecSimTieredIndex { std::scoped_lock lock(this->updateJobMutex, this->mainIndexGuard); // Defensive: ensure single-threaded operation for write-in-place mode. // parallelism_ defaults to 1, so this is a no-op in the normal case. 
- assert(svs_index->getParallelism() == 1 && - "Parallelism should be 1 for write-in-place mode, but it is not"); svs_index->setParallelism(1); return this->backendIndex->addVector(storage_blob.get(), label); } diff --git a/src/VecSim/vec_sim.cpp b/src/VecSim/vec_sim.cpp index 1484d7071..c9237d6fe 100644 --- a/src/VecSim/vec_sim.cpp +++ b/src/VecSim/vec_sim.cpp @@ -41,11 +41,10 @@ extern "C" void VecSim_UpdateThreadPoolSize(size_t new_size) { } else { VecSimIndex::setWriteMode(VecSim_WriteAsync); } - // Resize the shared SVS thread pool. new_size is the number of RediSearch worker - // threads. The SVS pool's total parallelism includes the calling thread (+1). + // Resize the shared SVS thread pool. resize() clamps to minimum size 1. // new_size == 0 → pool size 1 (only calling thread, write-in-place). - // new_size > 0 → pool size new_size + 1 (new_size workers + calling thread). - SVSIndexBase::getSharedThreadPool()->resize(new_size + 1); + // new_size > 0 → pool size new_size (new_size - 1 worker threads). + SVSIndexBase::getSharedThreadPool()->resize(std::max(new_size, size_t{1})); } static VecSimResolveCode _ResolveParams_EFRuntime(VecSimAlgo index_type, VecSimRawParam rparam, diff --git a/src/VecSim/vec_sim.h b/src/VecSim/vec_sim.h index e9ffd792c..14e1bf65d 100644 --- a/src/VecSim/vec_sim.h +++ b/src/VecSim/vec_sim.h @@ -318,8 +318,7 @@ void VecSim_SetWriteMode(VecSimWriteMode mode); * If new_size > 0, sets write mode to async and resizes the shared thread pool. * @note Should be called from the main thread only. * - * @param new_size the number of worker threads (0 = in-place mode, >0 = async mode). - * The total SVS pool parallelism will be new_size + 1 (workers + calling thread). + * @param new_size the new thread pool size (0 = in-place mode, >0 = async mode). 
*/ void VecSim_UpdateThreadPoolSize(size_t new_size); diff --git a/tests/unit/test_svs.cpp b/tests/unit/test_svs.cpp index 8dd35dd8e..f70681425 100644 --- a/tests/unit/test_svs.cpp +++ b/tests/unit/test_svs.cpp @@ -1676,8 +1676,8 @@ TYPED_TEST(SVSTest, test_svs_parameter_combinations_and_defaults) { .constructionWindowSize = 100, .maxCandidatePoolSize = 500, .pruneTo = 55, - .useSearchHistory = false, // VecSimOption_DISABLE - .numThreads = 4, + .useSearchHistory = false, // VecSimOption_DISABLE + .numThreads = SVS_VAMANA_DEFAULT_NUM_THREADS, // Deprecated, expect default to be used .numberOfMarkedDeletedNodes = 0, .searchWindowSize = 20, .searchBufferCapacity = 40, diff --git a/tests/unit/test_svs_fp16.cpp b/tests/unit/test_svs_fp16.cpp index 82937ef32..2f57b1c16 100644 --- a/tests/unit/test_svs_fp16.cpp +++ b/tests/unit/test_svs_fp16.cpp @@ -2228,9 +2228,6 @@ class FP16SVSTieredIndexTest : public FP16SVSTest { size_t update_threshold = SVS_VAMANA_DEFAULT_UPDATE_THRESHOLD) { // trainingThreshold = training_threshold; // updateThreshold = update_threshold; - if (svs_params.algoParams.svsParams.num_threads == 0) { - svs_params.algoParams.svsParams.num_threads = mock_thread_pool.thread_pool_size; - } return TieredIndexParams{ .jobQueue = &mock_thread_pool.jobQ, .jobQueueCtx = mock_thread_pool.ctx, @@ -2249,6 +2246,10 @@ class FP16SVSTieredIndexTest : public FP16SVSTest { TieredSVSIndex *CreateTieredSVSIndex(const TieredIndexParams &tiered_params, tieredIndexMock &mock_thread_pool, size_t num_available_threads = 1) { + // Resize the shared SVS thread pool singleton to match the mock thread pool size, + // simulating what RediSearch does via VecSim_UpdateThreadPoolSize during module init. 
+ SVSIndexBase::getSharedThreadPool()->resize(mock_thread_pool.thread_pool_size); + auto *tiered_index = reinterpret_cast *>(TieredFactory::NewIndex(&tiered_params)); @@ -2256,14 +2257,7 @@ class FP16SVSTieredIndexTest : public FP16SVSTest { // the index, and we'll need to release the ctx at the end of the test. mock_thread_pool.ctx->index_strong_ref.reset(tiered_index); - // Set number of available threads to 1 unless specified otherwise, - // so we can insert one vector at a time directly to svs. - tiered_index->GetSVSIndex()->setParallelism(num_available_threads); - size_t params_threadpool_size = - tiered_params.primaryIndexParams->algoParams.svsParams.num_threads; - size_t expected_capacity = - params_threadpool_size ? params_threadpool_size : mock_thread_pool.thread_pool_size; - verifyNumThreads(tiered_index, num_available_threads, expected_capacity); + verifyNumThreads(tiered_index, num_available_threads, mock_thread_pool.thread_pool_size); return tiered_index; } @@ -2812,10 +2806,10 @@ TYPED_TEST(FP16SVSTieredIndexTest, KNNSearch) { TYPED_TEST(FP16SVSTieredIndexTest, deleteVector) { // Create TieredSVS index instance with a mock queue. size_t dim = 4; - SVSParams params = {.dim = dim, .metric = VecSimMetric_L2, .num_threads = 1}; + SVSParams params = {.dim = dim, .metric = VecSimMetric_L2}; this->SetTypeParams(params); VecSimParams svs_params = CreateParams(params); - auto mock_thread_pool = tieredIndexMock(); + auto mock_thread_pool = tieredIndexMock(1); auto *tiered_index = this->CreateTieredSVSIndex(svs_params, mock_thread_pool, 1, 1); ASSERT_NO_FATAL_FAILURE(this->verify_index(tiered_index)); @@ -2876,10 +2870,10 @@ TYPED_TEST(FP16SVSTieredIndexTest, deleteVector) { TYPED_TEST(FP16SVSTieredIndexTest, deleteVectorMulti) { // Create TieredSVS index instance with a mock queue. 
size_t dim = 4; - SVSParams params = {.dim = dim, .metric = VecSimMetric_L2, .multi = true, .num_threads = 1}; + SVSParams params = {.dim = dim, .metric = VecSimMetric_L2, .multi = true}; this->SetTypeParams(params); VecSimParams svs_params = CreateParams(params); - auto mock_thread_pool = tieredIndexMock(); + auto mock_thread_pool = tieredIndexMock(1); auto *tiered_index = this->CreateTieredSVSIndex(svs_params, mock_thread_pool, 1, 1); ASSERT_NO_FATAL_FAILURE(this->verify_index(tiered_index)); diff --git a/tests/unit/test_svs_tiered.cpp b/tests/unit/test_svs_tiered.cpp index d079d64e5..c3d67a5f4 100644 --- a/tests/unit/test_svs_tiered.cpp +++ b/tests/unit/test_svs_tiered.cpp @@ -76,9 +76,6 @@ class SVSTieredIndexTest : public ::testing::Test { trainingThreshold = training_threshold; updateThreshold = update_threshold; svs_params.algoParams.svsParams.quantBits = index_type_t::get_quant_bits(); - if (svs_params.algoParams.svsParams.num_threads == 0) { - svs_params.algoParams.svsParams.num_threads = mock_thread_pool.thread_pool_size; - } return TieredIndexParams{ .jobQueue = &mock_thread_pool.jobQ, .jobQueueCtx = mock_thread_pool.ctx, @@ -90,14 +87,16 @@ class SVSTieredIndexTest : public ::testing::Test { .updateJobWaitTime = update_job_wait_time}}}; } - void verifyNumThreads(TieredSVSIndex *tiered_index, size_t expected_num_threads, - size_t expected_capcity) { - ASSERT_EQ(tiered_index->GetSVSIndex()->getParallelism(), expected_num_threads); + void verifyNumThreads(TieredSVSIndex *tiered_index, size_t expected_capcity) { + ASSERT_EQ(tiered_index->GetSVSIndex()->getParallelism(), 1); ASSERT_EQ(tiered_index->GetSVSIndex()->getPoolSize(), expected_capcity); } TieredSVSIndex *CreateTieredSVSIndex(const TieredIndexParams &tiered_params, - tieredIndexMock &mock_thread_pool, - size_t num_available_threads = 1) { + tieredIndexMock &mock_thread_pool) { + // Resize the shared SVS thread pool singleton to match the mock thread pool size, + // simulating what RediSearch does 
via VecSim_UpdateThreadPoolSize during module init. + VecSim_UpdateThreadPoolSize(mock_thread_pool.thread_pool_size); + auto *tiered_index = reinterpret_cast *>(TieredFactory::NewIndex(&tiered_params)); @@ -108,12 +107,8 @@ class SVSTieredIndexTest : public ::testing::Test { // which requires exactly 1 thread. When using tiered index addVector API, // the parallelism is managed internally according to the operation and pool // size, so testing parallelism remains intact. - tiered_index->GetSVSIndex()->setParallelism(num_available_threads); - size_t params_threadpool_size = - tiered_params.primaryIndexParams->algoParams.svsParams.num_threads; - size_t expected_capacity = - params_threadpool_size ? params_threadpool_size : mock_thread_pool.thread_pool_size; - verifyNumThreads(tiered_index, num_available_threads, expected_capacity); + // tiered_index->GetSVSIndex()->setParallelism(num_available_threads); + verifyNumThreads(tiered_index, mock_thread_pool.thread_pool_size); return tiered_index; } @@ -122,18 +117,17 @@ class SVSTieredIndexTest : public ::testing::Test { CreateTieredSVSIndex(VecSimParams &svs_params, tieredIndexMock &mock_thread_pool, size_t training_threshold = TestsDefaultTrainingThreshold, size_t update_threshold = TestsDefaultUpdateThreshold, - size_t update_job_wait_time = SVS_DEFAULT_UPDATE_JOB_WAIT_TIME, - size_t num_available_threads = 1) { + size_t update_job_wait_time = SVS_DEFAULT_UPDATE_JOB_WAIT_TIME) { svs_params.algoParams.svsParams.quantBits = index_type_t::get_quant_bits(); TieredIndexParams tiered_params = CreateTieredSVSParams(svs_params, mock_thread_pool, training_threshold, update_threshold, update_job_wait_time); - return CreateTieredSVSIndex(tiered_params, mock_thread_pool, num_available_threads); + return CreateTieredSVSIndex(tiered_params, mock_thread_pool); } void SetUp() override { // Restore the write mode to default. 
- VecSim_SetWriteMode(VecSim_WriteAsync); + VecSim_UpdateThreadPoolSize(0); // Limit VecSim log level to avoid printing too much information VecSimIndexInterface::setLogCallbackFunction(svsTestLogCallBackNoDebug); } @@ -199,11 +193,10 @@ TYPED_TEST(SVSTieredIndexTest, ThreadsReservation) { } std::chrono::milliseconds timeout{1000}; // long enough to reserve all threads - SVSParams params = { - .type = TypeParam::get_index_type(), .dim = 4, .metric = VecSimMetric_L2, .num_threads = 1}; + SVSParams params = {.type = TypeParam::get_index_type(), .dim = 4, .metric = VecSimMetric_L2}; VecSimParams svs_params = CreateParams(params); - auto mock_thread_pool = tieredIndexMock(); - mock_thread_pool.thread_pool_size = num_threads; + auto mock_thread_pool = tieredIndexMock(num_threads); + ASSERT_EQ(mock_thread_pool.thread_pool_size, num_threads); // Create TieredSVS index instance with a mock queue. auto *tiered_index = this->CreateTieredSVSIndex(svs_params, mock_thread_pool); @@ -281,24 +274,21 @@ TYPED_TEST(SVSTieredIndexTest, TestDebugInfoThreadCount) { constexpr size_t update_threshold = 10; constexpr size_t update_job_wait_time = 10000; constexpr size_t dim = 4; - SVSParams params = {.type = TypeParam::get_index_type(), - .dim = dim, - .metric = VecSimMetric_L2, - .num_threads = num_threads}; + SVSParams params = {.type = TypeParam::get_index_type(), .dim = dim, .metric = VecSimMetric_L2}; VecSimParams svs_params = CreateParams(params); - auto mock_thread_pool = tieredIndexMock(); - mock_thread_pool.thread_pool_size = num_threads; + auto mock_thread_pool = tieredIndexMock(num_threads); + ASSERT_EQ(mock_thread_pool.thread_pool_size, num_threads); // Create TieredSVS index instance with a mock queue. 
- auto *tiered_index = - this->CreateTieredSVSIndex(svs_params, mock_thread_pool, training_threshold, - update_threshold, update_job_wait_time, num_threads); + auto *tiered_index = this->CreateTieredSVSIndex( + svs_params, mock_thread_pool, training_threshold, update_threshold, update_job_wait_time); ASSERT_INDEX(tiered_index); - // Verify initial state: both fields should equal configured thread count + // Verify initial state: numThreads reflects the shared pool size, + // lastReservedThreads starts at 1 (parallelism_ is initialized to 1). VecSimIndexDebugInfo backendIndexInfo = tiered_index->GetBackendIndex()->debugInfo(); ASSERT_EQ(backendIndexInfo.svsInfo.numThreads, num_threads); - ASSERT_EQ(backendIndexInfo.svsInfo.lastReservedThreads, num_threads); + ASSERT_EQ(backendIndexInfo.svsInfo.lastReservedThreads, 1); // Get the allocator from the tiered index. auto allocator = tiered_index->getAllocator(); @@ -332,7 +322,9 @@ TYPED_TEST(SVSTieredIndexTest, TestDebugInfoThreadCount) { while (tiered_index->GetBackendIndex()->indexSize() != training_threshold) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } - // Verify: numThreads unchanged, lastReservedThreads reflects actual availability + // Verify: numThreads (pool size) unchanged, lastReservedThreads (parallelism) reflects + // reduced availability — one thread was occupied, so at most num_threads - 1 could reserve. + // Use LE because OS timing may cause fewer threads to check in before the reservation timeout. 
backendIndexInfo = tiered_index->GetBackendIndex()->debugInfo(); ASSERT_EQ(backendIndexInfo.svsInfo.numThreads, num_threads); ASSERT_LE(backendIndexInfo.svsInfo.lastReservedThreads, num_threads - 1); @@ -346,7 +338,9 @@ TYPED_TEST(SVSTieredIndexTest, TestDebugInfoThreadCount) { } mock_thread_pool.thread_pool_join(); - // Verify: numThreads unchanged, lastReservedThreads reflects we used all configured threads + // Verify: numThreads (pool size) unchanged, lastReservedThreads (parallelism) reflects + // that all configured threads were available for this operation. + // Use LE because OS timing may cause fewer threads to check in before the reservation timeout. backendIndexInfo = tiered_index->GetBackendIndex()->debugInfo(); ASSERT_EQ(backendIndexInfo.svsInfo.numThreads, num_threads); ASSERT_LE(backendIndexInfo.svsInfo.lastReservedThreads, num_threads); @@ -354,8 +348,8 @@ TYPED_TEST(SVSTieredIndexTest, TestDebugInfoThreadCount) { TYPED_TEST(SVSTieredIndexTest, TestDebugInfoThreadCountWriteInPlace) { // Test that write-in-place mode correctly reports thread usage in debug info. - // Even when the index is configured with multiple threads, write-in-place operations - // should use only 1 thread and lastReservedThreads should reflect this. + // Even when the shared pool has multiple threads, write-in-place operations + // should use only 1 thread and lastReservedThreads (parallelism) should reflect this. // Set thread_pool_size to 4 or actual number of available CPUs const auto num_threads = std::min(4U, getAvailableCPUs()); @@ -364,31 +358,28 @@ TYPED_TEST(SVSTieredIndexTest, TestDebugInfoThreadCountWriteInPlace) { GTEST_SKIP() << "No threads available"; } - // Test svs when mode is write in place, but the index is configured with multiple threads. + // Test svs when mode is write in place, but the shared pool has multiple threads. 
constexpr size_t training_threshold = 10; constexpr size_t update_threshold = 10; constexpr size_t update_job_wait_time = 10000; constexpr size_t dim = 4; - SVSParams params = {.type = TypeParam::get_index_type(), - .dim = dim, - .metric = VecSimMetric_L2, - .num_threads = num_threads}; + SVSParams params = {.type = TypeParam::get_index_type(), .dim = dim, .metric = VecSimMetric_L2}; VecSimParams svs_params = CreateParams(params); - auto mock_thread_pool = tieredIndexMock(); - mock_thread_pool.thread_pool_size = num_threads; + auto mock_thread_pool = tieredIndexMock(num_threads); + ASSERT_EQ(mock_thread_pool.thread_pool_size, num_threads); // Create TieredSVS index instance with a mock queue. - auto *tiered_index = - this->CreateTieredSVSIndex(svs_params, mock_thread_pool, training_threshold, - update_threshold, update_job_wait_time, num_threads); + auto *tiered_index = this->CreateTieredSVSIndex( + svs_params, mock_thread_pool, training_threshold, update_threshold, update_job_wait_time); ASSERT_INDEX(tiered_index); - // Set to mode to write in place even though the index is configured with multiple threads. + // Set mode to write in place even though the shared pool has multiple threads. VecSim_SetWriteMode(VecSim_WriteInPlace); - // Verify initial state: both fields should equal configured thread count + // Verify initial state: numThreads reflects the shared pool size, + // lastReservedThreads starts at 1 (parallelism_ is initialized to 1). VecSimIndexDebugInfo backendIndexInfo = tiered_index->GetBackendIndex()->debugInfo(); ASSERT_EQ(backendIndexInfo.svsInfo.numThreads, num_threads); - ASSERT_EQ(backendIndexInfo.svsInfo.lastReservedThreads, num_threads); + ASSERT_EQ(backendIndexInfo.svsInfo.lastReservedThreads, 1); // Get the allocator from the tiered index. 
auto allocator = tiered_index->getAllocator(); @@ -403,8 +394,8 @@ TYPED_TEST(SVSTieredIndexTest, TestDebugInfoThreadCountWriteInPlace) { while (tiered_index->GetBackendIndex()->indexSize() != training_threshold) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } - // Verify: numThreads unchanged, lastReservedThreads reflects we only used one thread (main - // thread) + // Verify: numThreads (pool size) unchanged, lastReservedThreads (parallelism) reflects + // we only used one thread (write-in-place calls updateSVSIndexWrapper with availableThreads=1). backendIndexInfo = tiered_index->GetBackendIndex()->debugInfo(); ASSERT_EQ(backendIndexInfo.svsInfo.numThreads, num_threads); ASSERT_EQ(backendIndexInfo.svsInfo.lastReservedThreads, 1); @@ -413,7 +404,7 @@ TYPED_TEST(SVSTieredIndexTest, TestDebugInfoThreadCountWriteInPlace) { size_t num_vectors = 10; for (size_t i = 0; i < num_vectors; ++i) { GenerateAndAddVector(tiered_index, dim, training_threshold + i); - // Verify: numThreads unchanged, lastReservedThreads reflects we used only one thread + // Verify: numThreads (pool size) unchanged, lastReservedThreads (parallelism) = 1 backendIndexInfo = tiered_index->GetBackendIndex()->debugInfo(); ASSERT_EQ(backendIndexInfo.svsInfo.numThreads, num_threads); ASSERT_EQ(backendIndexInfo.svsInfo.lastReservedThreads, 1); @@ -1035,10 +1026,9 @@ TYPED_TEST(SVSTieredIndexTest, deleteVector) { SVSParams params = {.type = TypeParam::get_index_type(), .dim = dim, .metric = VecSimMetric_L2, - .multi = TypeParam::isMulti(), - .num_threads = 1}; + .multi = TypeParam::isMulti()}; VecSimParams svs_params = CreateParams(params); - auto mock_thread_pool = tieredIndexMock(); + auto mock_thread_pool = tieredIndexMock(1); auto *tiered_index = this->CreateTieredSVSIndex(svs_params, mock_thread_pool, 1, 1); ASSERT_INDEX(tiered_index); @@ -1101,12 +1091,9 @@ TYPED_TEST(SVSTieredIndexTestBasic, markedDeleted) { size_t dim = 4; constexpr size_t n = 10; constexpr size_t transfer_trigger 
= n; - SVSParams params = {.type = TypeParam::get_index_type(), - .dim = dim, - .metric = VecSimMetric_L2, - .num_threads = 1}; + SVSParams params = {.type = TypeParam::get_index_type(), .dim = dim, .metric = VecSimMetric_L2}; VecSimParams svs_params = CreateParams(params); - auto mock_thread_pool = tieredIndexMock(); + auto mock_thread_pool = tieredIndexMock(1); auto *tiered_index = this->CreateTieredSVSIndex(svs_params, mock_thread_pool, transfer_trigger, transfer_trigger); @@ -1180,13 +1167,10 @@ TYPED_TEST(SVSTieredIndexTestBasic, markedDeleted) { TYPED_TEST(SVSTieredIndexTestBasic, deleteVectorMulti) { // Create TieredSVS index instance with a mock queue. size_t dim = 4; - SVSParams params = {.type = TypeParam::get_index_type(), - .dim = dim, - .metric = VecSimMetric_L2, - .multi = true, - .num_threads = 1}; + SVSParams params = { + .type = TypeParam::get_index_type(), .dim = dim, .metric = VecSimMetric_L2, .multi = true}; VecSimParams svs_params = CreateParams(params); - auto mock_thread_pool = tieredIndexMock(); + auto mock_thread_pool = tieredIndexMock(1); auto *tiered_index = this->CreateTieredSVSIndex(svs_params, mock_thread_pool, 1, 1); auto allocator = tiered_index->getAllocator(); @@ -2604,13 +2588,10 @@ TYPED_TEST(SVSTieredIndexTestBasic, overwriteVectorBasic) { // Create TieredSVS index instance with a mock queue. 
size_t dim = 4; size_t n = 1000; - SVSParams params = {.type = TypeParam::get_index_type(), - .dim = dim, - .metric = VecSimMetric_L2, - .multi = false, - .num_threads = 1}; + SVSParams params = { + .type = TypeParam::get_index_type(), .dim = dim, .metric = VecSimMetric_L2, .multi = false}; VecSimParams svs_params = CreateParams(params); - auto mock_thread_pool = tieredIndexMock(); + auto mock_thread_pool = tieredIndexMock(1); auto *tiered_index = this->CreateTieredSVSIndex(svs_params, mock_thread_pool, 1, 1); ASSERT_INDEX(tiered_index); @@ -3828,23 +3809,23 @@ TEST(SVSTieredIndexTest, testThreadPool) { auto shared_pool = std::make_shared(num_threads); auto pool = VecSimSVSThreadPool(shared_pool); ASSERT_EQ(pool.poolSize(), num_threads); - ASSERT_EQ(pool.size(), 0); // parallelism starts at 0 - ASSERT_EQ(pool.getParallelism(), 0); - - // Must call setParallelism before parallel_for - pool.setParallelism(num_threads); - ASSERT_EQ(pool.size(), num_threads); - ASSERT_EQ(pool.getParallelism(), num_threads); + ASSERT_EQ(pool.size(), 1); // parallelism starts at 1 (calling thread) + ASSERT_EQ(pool.getParallelism(), 1); std::atomic_int counter(0); auto task = [&counter](size_t i) { counter += i + 1; }; + // Can use parallel_for immediately with parallelism 1 + pool.parallel_for(task, 1); + ASSERT_EQ(counter, 1); // 0+1 = 1 + // n > parallelism is a bug — asserts in debug mode #if !defined(RUNNING_ON_VALGRIND) && !defined(NDEBUG) ASSERT_DEATH(pool.parallel_for(task, 10), "n exceeds reserved thread count"); #endif counter = 0; + pool.setParallelism(4); pool.parallel_for(task, num_threads); ASSERT_EQ(counter, 10); // 1+2+3+4 = 10 @@ -3854,14 +3835,14 @@ TEST(SVSTieredIndexTest, testThreadPool) { ASSERT_EQ(counter, 3); // 1+2 = 3 // setParallelism changes per-index parallelism, not the shared pool - pool.setParallelism(1); + pool.setParallelism(2); ASSERT_EQ(pool.poolSize(), num_threads); // shared pool unchanged - ASSERT_EQ(pool.size(), 1); // per-index parallelism - 
ASSERT_EQ(pool.getParallelism(), 1); + ASSERT_EQ(pool.size(), 2); // per-index parallelism + ASSERT_EQ(pool.getParallelism(), 2); counter = 0; - pool.parallel_for(task, 1); - ASSERT_EQ(counter, 1); // 0+1 = 1 + pool.parallel_for(task, 2); + ASSERT_EQ(counter, 3); // 1+2 = 3 // setParallelism boundary checks — asserts in debug mode #if !defined(RUNNING_ON_VALGRIND) && !defined(NDEBUG) @@ -3879,12 +3860,11 @@ TEST(SVSTieredIndexTest, testThreadPool) { inplace_pool.parallel_for(task, 1); ASSERT_EQ(counter, 1); - // parallel_for without setParallelism asserts in debug mode -#if !defined(RUNNING_ON_VALGRIND) && !defined(NDEBUG) - auto unset_pool = VecSimSVSThreadPool(shared_pool); - ASSERT_DEATH(unset_pool.parallel_for(task, 1), - "setParallelism must be called before parallel_for"); -#endif + // parallel_for works immediately with default parallelism 1 + auto default_pool = VecSimSVSThreadPool(shared_pool); + counter = 0; + default_pool.parallel_for(task, 1); + ASSERT_EQ(counter, 1); // 0+1 = 1 // Test exception handling auto err_task = [](size_t) { throw std::runtime_error("Test exception"); };