From 2c3d9a142fddb10f184cefa03528d9f4663dbaa0 Mon Sep 17 00:00:00 2001 From: Symmetricity <184246+Symmetricity@users.noreply.github.com> Date: Sun, 10 May 2026 19:35:30 +0200 Subject: [PATCH] Synchronize attribute key cache refresh AttributeKeyStore publishes new keys to worker-local caches while PBF reading runs in parallel. The previous code advanced keys2indexSize before appending the matching key and refreshed the thread-local cache without holding keys2indexMutex, so another worker could read the deque while it was being mutated. Take the existing mutex before refreshing the thread-local cache and publish the new key only after it is appended to the shared store. Add a threaded AttributeKeyStore test for the concurrent lookup/insert path. --- src/attribute_store.cpp | 7 +++---- test/attribute_store.test.cpp | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/src/attribute_store.cpp b/src/attribute_store.cpp index 73ed4775..a0c31d71 100644 --- a/src/attribute_store.cpp +++ b/src/attribute_store.cpp @@ -16,13 +16,12 @@ uint16_t AttributeKeyStore::key2index(const std::string& key) { // Not found, ensure our local map is up-to-date for future calls, // and fall through to the main map. - // - // Note that we can read `keys` without a lock + std::lock_guard lock(keys2indexMutex); while (tlsKeys2IndexSize < keys2indexSize) { tlsKeys2IndexSize++; tlsKeys2Index[&keys[tlsKeys2IndexSize]] = tlsKeys2IndexSize; } - std::lock_guard lock(keys2indexMutex); + const auto& rv = keys2index.find(&key); if (rv != keys2index.end()) @@ -38,9 +37,9 @@ uint16_t AttributeKeyStore::key2index(const std::string& key) { if (newIndex >= 512) throw std::out_of_range("more than 512 unique keys"); - keys2indexSize++; keys.push_back(key); keys2index[&keys[newIndex]] = newIndex; + keys2indexSize = newIndex; return newIndex; } diff --git a/test/attribute_store.test.cpp b/test/attribute_store.test.cpp index 6058bc89..9b2fad54 100644 --- a/test/attribute_store.test.cpp +++ b/test/attribute_store.test.cpp @@ -1,5 +1,8 @@ #include #include +#include +#include +#include #include "external/minunit.h" #include "attribute_store.h" @@ -126,10 +129,42 @@ MU_TEST(test_attribute_store_capacity) { mu_check(caughtException == true); } +MU_TEST(test_attribute_key_store_threaded) { + AttributeKeyStore keys; + const int keyCount = 128; + const int threadCount = 8; + const int iterations = 100; + std::atomic start(false); + std::atomic failed(false); + std::vector threads; + + for (int thread = 0; thread < threadCount; thread++) { + threads.emplace_back([&, thread]() { + while (!start.load(std::memory_order_acquire)) {} + + for (int i = 0; i < iterations * keyCount; i++) { + const int keyNum = (i + thread * 17) % keyCount; + const std::string key = "key" + std::to_string(keyNum); + const uint16_t firstIndex = keys.key2index(key); + const uint16_t secondIndex = keys.key2index(key); + if (firstIndex == 0 || firstIndex > keyCount || firstIndex != secondIndex) + failed.store(true, std::memory_order_release); + } + }); + } + + start.store(true, std::memory_order_release); + for (std::thread& thread : threads) + thread.join(); + + mu_check(failed.load(std::memory_order_acquire) == false); +} + MU_TEST_SUITE(test_suite_attribute_store) { MU_RUN_TEST(test_attribute_store); MU_RUN_TEST(test_attribute_store_reuses); MU_RUN_TEST(test_attribute_store_capacity); + MU_RUN_TEST(test_attribute_key_store_threaded); } int main() {