Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 27 additions & 20 deletions src/VecSim/algorithms/svs/svs.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,21 @@ struct SVSIndexBase
{
SVSIndexBase() : num_marked_deleted{0} {};
virtual ~SVSIndexBase() = default;

// Singleton accessor for the shared SVS thread pool.
// Always valid — initialized with size 1 (write-in-place mode: 0 worker threads,
// only the calling thread participates). Resized on VecSim_UpdateThreadPoolSize() calls.
static std::shared_ptr<VecSimSVSThreadPoolImpl> getSharedThreadPool() {
static auto shared_pool = std::make_shared<VecSimSVSThreadPoolImpl>(1);
return shared_pool;
}
virtual int addVectors(const void *vectors_data, const labelType *labels, size_t n) = 0;
virtual int deleteVectors(const labelType *labels, size_t n) = 0;
virtual bool isLabelExists(labelType label) const = 0;
virtual size_t indexStorageSize() const = 0;
virtual size_t getNumThreads() const = 0;
virtual void setNumThreads(size_t numThreads) = 0;
virtual size_t getThreadPoolCapacity() const = 0;
virtual size_t getParallelism() const = 0;
virtual void setParallelism(size_t parallelism) = 0;
virtual size_t getPoolSize() const = 0;
virtual bool isCompressed() const = 0;
size_t getNumMarkedDeleted() const { return num_marked_deleted; }

Expand All @@ -66,9 +74,9 @@ struct SVSIndexBase
};

/** Thread Management Strategy:
* - addVector(): Requires numThreads == 1
* - addVectors(): Allows any numThreads value, but prohibits n=1 with numThreads>1
* - Callers are responsible for setting appropriate thread counts
* - addVector(): Requires parallelism == 1
* - addVectors(): Allows any parallelism value, but prohibits n=1 with parallelism>1
* - Callers are responsible for setting appropriate parallelism
**/
template <typename MetricType, typename DataType, bool isMulti, size_t QuantBits,
size_t ResidualBits, bool IsLeanVec>
Expand Down Expand Up @@ -251,12 +259,12 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
this->impl_ = std::move(svs_handler->impl);
}

// Assuming numThreads was updated to reflect the number of available threads before this
// Assuming parallelism was updated to reflect the number of available threads before this
// function was called.
// This function assumes that the caller has already set numThreads to the appropriate value
// This function assumes that the caller has already set parallelism to the appropriate value
// for the operation.
// Important NOTE: For single vector operations (n=1), numThreads should be 1.
// For bulk operations (n>1), numThreads should reflect the number of available threads.
// Important NOTE: For single vector operations (n=1), parallelism should be 1.
// For bulk operations (n>1), parallelism should reflect the number of available threads.
int addVectorsImpl(const void *vectors_data, const labelType *labels, size_t n) {
if (n == 0) {
return 0;
Expand Down Expand Up @@ -361,8 +369,7 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
svs_details::getOrDefault(params.leanvec_dim, SVS_VAMANA_DEFAULT_LEANVEC_DIM)},
epsilon{svs_details::getOrDefault(params.epsilon, SVS_VAMANA_DEFAULT_EPSILON)},
is_two_level_lvq{isTwoLevelLVQ(params.quantBits)},
threadpool_{std::max(size_t{SVS_VAMANA_DEFAULT_NUM_THREADS}, params.num_threads)},
impl_{nullptr} {
threadpool_{SVSIndexBase::getSharedThreadPool(), this->logCallbackCtx}, impl_{nullptr} {
logger_ = makeLogger();
}

Expand Down Expand Up @@ -412,8 +419,8 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
.maxCandidatePoolSize = this->buildParams.max_candidate_pool_size,
.pruneTo = this->buildParams.prune_to,
.useSearchHistory = this->buildParams.use_full_search_history,
.numThreads = this->getThreadPoolCapacity(),
.lastReservedThreads = this->getNumThreads(),
.numThreads = this->getPoolSize(),
.lastReservedThreads = this->getParallelism(),
.numberOfMarkedDeletedNodes = this->num_marked_deleted,
.searchWindowSize = this->search_window_size,
.searchBufferCapacity = this->search_buffer_capacity,
Expand Down Expand Up @@ -517,16 +524,16 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl

int addVector(const void *vector_data, labelType label) override {
// Enforce single-threaded execution for single vector operations to ensure optimal
// performance and consistent behavior. Callers must set numThreads=1 before calling this
// performance and consistent behavior. Callers must set parallelism=1 before calling this
// method.
assert(getNumThreads() == 1 && "Can't use more than one thread to insert a single vector");
assert(getParallelism() == 1 && "Can't use more than one thread to insert a single vector");
return addVectorsImpl(vector_data, &label, 1);
}

int addVectors(const void *vectors_data, const labelType *labels, size_t n) override {
// Prevent misuse: single vector operations should use addVector(), not addVectors() with
// n=1 This ensures proper thread management and API contract enforcement.
assert(!(n == 1 && getNumThreads() > 1) &&
assert(!(n == 1 && getParallelism() > 1) &&
"Can't use more than one thread to insert a single vector");
return addVectorsImpl(vectors_data, labels, n);
}
Expand All @@ -541,10 +548,10 @@ class SVSIndex : public VecSimIndexAbstract<svs_details::vecsim_dt<DataType>, fl
return impl_ ? impl_->has_id(label) : false;
}

size_t getNumThreads() const override { return threadpool_.size(); }
void setNumThreads(size_t numThreads) override { threadpool_.resize(numThreads); }
size_t getParallelism() const override { return threadpool_.getParallelism(); }
void setParallelism(size_t parallelism) override { threadpool_.setParallelism(parallelism); }

size_t getThreadPoolCapacity() const override { return threadpool_.capacity(); }
size_t getPoolSize() const override { return threadpool_.poolSize(); }

bool isCompressed() const override { return storage_traits_t::is_compressed(); }

Expand Down
23 changes: 11 additions & 12 deletions src/VecSim/algorithms/svs/svs_tiered.h
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
// No need to run GC on an empty index.
return;
}
svs_index->setNumThreads(std::min(availableThreads, index->backendIndex->indexSize()));
svs_index->setParallelism(std::min(availableThreads, index->backendIndex->indexSize()));
// VecSimIndexAbstract::runGC() is protected
static_cast<VecSimIndexInterface *>(index->backendIndex)->runGC();
}
Expand All @@ -601,7 +601,7 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
return;
}

auto total_threads = this->GetSVSIndex()->getThreadPoolCapacity();
auto total_threads = this->GetSVSIndex()->getPoolSize();
auto jobs = SVSMultiThreadJob::createJobs(
this->allocator, SVS_BATCH_UPDATE_JOB, updateSVSIndexWrapper, this, total_threads,
std::chrono::microseconds(updateJobWaitTime), &uncompletedJobs);
Expand All @@ -614,7 +614,7 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
return;
}

auto total_threads = this->GetSVSIndex()->getThreadPoolCapacity();
auto total_threads = this->GetSVSIndex()->getPoolSize();
auto jobs = SVSMultiThreadJob::createJobs(
this->allocator, SVS_GC_JOB, SVSIndexGCWrapper, this, total_threads,
std::chrono::microseconds(updateJobWaitTime), &uncompletedJobs);
Expand Down Expand Up @@ -677,13 +677,14 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
} // release frontend index

executeTracingCallback("UpdateJob::before_add_to_svs");
{ // lock backend index for writing and add vectors there
if (!labels_to_move.empty()) {
// lock backend index for writing and add vectors there
std::shared_lock main_shared_lock(this->mainIndexGuard);
auto svs_index = GetSVSIndex();
assert(labels_to_move.size() == vectors_to_move.size() / this->frontendIndex->getDim());
if (this->backendIndex->indexSize() == 0) {
// If backend index is empty, we need to initialize it first.
svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size()));
svs_index->setParallelism(std::min(availableThreads, labels_to_move.size()));
auto impl = svs_index->createImpl(vectors_to_move.data(), labels_to_move.data(),
labels_to_move.size());

Expand All @@ -696,7 +697,7 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
main_shared_lock.unlock();
std::lock_guard lock(this->mainIndexGuard);
// Upgrade to unique lock to add vectors
svs_index->setNumThreads(std::min(availableThreads, labels_to_move.size()));
svs_index->setParallelism(std::min(availableThreads, labels_to_move.size()));
svs_index->addVectors(vectors_to_move.data(), labels_to_move.data(),
labels_to_move.size());
}
Expand Down Expand Up @@ -819,11 +820,9 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
// prevent update job from running in parallel and lock any access to the backend
// index
std::scoped_lock lock(this->updateJobMutex, this->mainIndexGuard);
// Set available thread count to 1 for single vector write-in-place operation.
// This maintains the contract that single vector operations use exactly one thread.
// TODO: Replace this setNumThreads(1) call with an assertion once we establish
// a contract that write-in-place mode guarantees numThreads == 1.
svs_index->setNumThreads(1);
// Defensive: ensure single-threaded operation for write-in-place mode.
// parallelism_ defaults to 1, so this is a no-op in the normal case.
svs_index->setParallelism(1);
return this->backendIndex->addVector(storage_blob.get(), label);
}
}
Expand Down Expand Up @@ -1077,7 +1076,7 @@ class TieredSVSIndex : public VecSimTieredIndex<DataType, float> {
return;
}
// Force single thread for write-in-place mode.
this->GetSVSIndex()->setNumThreads(1);
this->GetSVSIndex()->setParallelism(1);
// VecSimIndexAbstract::runGC() is protected
static_cast<VecSimIndexInterface *>(this->backendIndex)->runGC();
return;
Expand Down
Loading
Loading