Skip to content

Commit 8134ff0

Browse files
Zainullin DamirZainullin Damir
authored andcommitted
++
1 parent 88e6fb3 commit 8134ff0

18 files changed

Lines changed: 606 additions & 611 deletions

include/ipfixprobe/outputPlugin/outputStorage/allocationBuffer2.hpp

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,23 +27,24 @@ class AllocationBuffer2 : public AllocationBufferBase<ElementType> {
2727
}
2828
}
2929

30-
ElementType* allocate() noexcept override
30+
ElementType* allocate([[maybe_unused]] const uint8_t writerId) noexcept override
3131
{
3232
static thread_local std::mt19937 gen(std::random_device {}());
3333
static thread_local std::uniform_int_distribution<> dist(0, 31);
3434
static thread_local uint64_t threadQueueIndex = dist(gen);
3535
while (true) {
3636
threadQueueIndex = (threadQueueIndex + 1) % m_queues.size();
3737
// const uint64_t queueIndex = m_nextQueue++ % m_queues.size();
38-
const std::optional<ElementType*> res = m_queues[threadQueueIndex].tryPop();
39-
40-
if (res.has_value()) {
41-
return *res;
38+
ElementType* res = m_queues[threadQueueIndex].tryPop();
39+
if (res != nullptr) {
40+
res->ElementType::~ElementType();
41+
new (res) ElementType();
42+
return res;
4243
}
4344
}
4445
}
4546

46-
void deallocate(ElementType* element) noexcept override
47+
void deallocate(ElementType* element, [[maybe_unused]] const uint8_t writerId) noexcept override
4748
{
4849
static thread_local std::mt19937 gen(std::random_device {}());
4950
static thread_local std::uniform_int_distribution<> dist(0, 31);
@@ -60,14 +61,14 @@ class AllocationBuffer2 : public AllocationBufferBase<ElementType> {
6061
private:
6162
class Queue {
6263
public:
63-
std::optional<ElementType*> tryPop() noexcept
64+
ElementType* tryPop() noexcept
6465
{
6566
if (!tryLock()) {
66-
return std::nullopt;
67+
return nullptr;
6768
}
6869
if (pointers.empty()) {
6970
unlock();
70-
return std::nullopt;
71+
return nullptr;
7172
}
7273
ElementType* res = pointers.back();
7374
pointers.pop_back();

include/ipfixprobe/outputPlugin/outputStorage/allocationBuffer3.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class AllocationBuffer3 : public AllocationBufferBase<ElementType> {
2828
}
2929
}
3030

31-
ElementType* allocate() noexcept override
31+
ElementType* allocate([[maybe_unused]] const uint8_t writerId) noexcept override
3232
{
3333
static thread_local std::mt19937 gen(std::random_device {}());
3434
static thread_local std::uniform_int_distribution<> dist(0, 31);
@@ -44,7 +44,7 @@ class AllocationBuffer3 : public AllocationBufferBase<ElementType> {
4444
}
4545
}
4646

47-
void deallocate(ElementType* element) noexcept override
47+
void deallocate(ElementType* element, [[maybe_unused]] const uint8_t writerId) noexcept override
4848
{
4949
static thread_local std::mt19937 gen(std::random_device {}());
5050
static thread_local std::uniform_int_distribution<> dist(0, 31);

include/ipfixprobe/outputPlugin/outputStorage/allocationBufferBase.hpp

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,27 @@ namespace ipxp::output {
55
template<typename ElementType>
66
class AllocationBufferBase {
77
public:
8-
virtual ElementType* allocate() noexcept = 0;
8+
static_assert(
9+
std::is_default_constructible_v<ElementType>,
10+
"ElementType must be default constructible");
911

10-
virtual void deallocate(ElementType* element) noexcept = 0;
12+
virtual ElementType* allocate(const uint8_t writerId) noexcept = 0;
13+
14+
virtual void deallocate(ElementType* element, const uint8_t writerId) noexcept = 0;
1115

1216
virtual void unregisterWriter() noexcept {}
1317

1418
virtual void registerWriter() noexcept {}
1519

1620
virtual ~AllocationBufferBase() = default;
21+
22+
void replace(ElementType*& oldValue, ElementType* newValue, const uint8_t writerId) noexcept
23+
{
24+
if (oldValue != nullptr) {
25+
deallocate(oldValue, writerId);
26+
}
27+
oldValue = newValue;
28+
}
1729
};
1830

1931
} // namespace ipxp::output

include/ipfixprobe/outputPlugin/outputStorage/allocationBufferR.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class AllocationBufferR : public AllocationBufferBase<ElementType> {
3131
void registerWriter() noexcept override { m_controlBlock->registerWriter(); }
3232
void unregisterWriter() noexcept override { m_controlBlock->unregisterWriter(); }
3333

34-
ElementType* allocate() noexcept override
34+
ElementType* allocate([[maybe_unused]] const uint8_t writerId) noexcept override
3535
{
3636
const std::optional<uint16_t> readPos = std::invoke([&]() {
3737
std::optional<uint16_t> res = std::nullopt;
@@ -49,7 +49,7 @@ class AllocationBufferR : public AllocationBufferBase<ElementType> {
4949
return res;
5050
}
5151

52-
void deallocate(ElementType* element) noexcept override
52+
void deallocate(ElementType* element, [[maybe_unused]] const uint8_t writerId) noexcept override
5353
{
5454
const std::optional<uint16_t> writePos = m_controlBlock->getWritePos();
5555
if (!writePos.has_value()) {

include/ipfixprobe/outputPlugin/outputStorage/b2OutputStorage.hpp

Lines changed: 63 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -16,152 +16,141 @@
1616

1717
namespace ipxp::output {
1818

19-
class B2OutputStorage : public BOutputStorage {
19+
template<typename ElementType>
20+
class B2OutputStorage : public BOutputStorage<ElementType> {
2021
public:
2122
explicit B2OutputStorage(const uint8_t writersCount) noexcept
22-
: BOutputStorage(writersCount)
23+
: BOutputStorage<ElementType>(writersCount)
2324
{
2425
}
2526

26-
bool storeContainer(ContainerWrapper container, const uint8_t writerIndex) noexcept override
27+
bool write(ElementType* element, const uint8_t writerIndex) noexcept override
2728
{
28-
if (container.empty()) {
29-
throw std::runtime_error("Trying to store empty container.");
30-
}
31-
WriterData& writerData = m_writersData[writerIndex].get();
29+
typename BOutputStorage<ElementType>::WriterData& writerData
30+
= this->m_writersData[writerIndex].get();
3231
const uint16_t containersLeft = writerData.bucketAllocation.containersLeft();
3332
switch (containersLeft) {
3433
case 1: {
35-
auto& y = getNextContainer(writerData.bucketAllocation);
36-
if (!y.empty() && y.getContainer().readTimes == 0) {
37-
throw std::runtime_error("XXX");
38-
}
39-
y.assign(container, *m_allocationBuffer);
40-
// getNextContainer(writerData.bucketAllocation).assign(container, *m_allocationBuffer);
34+
this->m_allocationBuffer->replace(
35+
this->getNextElement(writerData.bucketAllocation),
36+
element,
37+
writerIndex);
4138
}
4239
[[fallthrough]];
4340
case 0:
4441
break;
4542
default: {
46-
auto& z = getNextContainer(writerData.bucketAllocation);
47-
if (!z.empty() && z.getContainer().readTimes == 0) {
48-
// throw std::runtime_error("XXX");
49-
}
50-
z.assign(container, *m_allocationBuffer);
43+
this->m_allocationBuffer->replace(
44+
this->getNextElement(writerData.bucketAllocation),
45+
element,
46+
writerIndex);
5147
return true;
5248
}
5349
}
5450

5551
uint8_t loopCounter = 0;
5652
BackoffScheme backoffScheme(2, std::numeric_limits<std::size_t>::max());
57-
// const uint16_t initialPosition = writerData.writePosition;
5853
do {
5954
const bool overflowed = writerData.randomShift();
60-
d_writerShifts++;
6155
if (overflowed) {
62-
writerData.cachedLowestReaderGeneration = m_lowestReaderGeneration.load();
63-
if (containersLeft == 0) {
64-
// container.deallocate(*m_allocationBuffer);
65-
}
66-
d_writerYields++;
56+
writerData.cachedLowestReaderGeneration = this->m_lowestReaderGeneration.load();
57+
if (containersLeft == 0) {}
6758
backoffScheme.backoff();
6859
}
6960

70-
if (m_buckets[writerData.writePosition].generation
61+
if (this->m_buckets[writerData.writePosition].generation
7162
>= writerData.cachedLowestReaderGeneration
72-
|| !BucketAllocation::isValidBucketIndex(
73-
m_buckets[writerData.writePosition].bucketIndex)
74-
|| !m_buckets[writerData.writePosition].lock.try_lock()) {
63+
|| !BOutputStorage<ElementType>::BucketAllocation::isValidBucketIndex(
64+
this->m_buckets[writerData.writePosition].bucketIndex)
65+
|| !this->m_buckets[writerData.writePosition].lock.try_lock()) {
7566
continue;
7667
}
77-
if (m_buckets[writerData.writePosition].generation
68+
if (this->m_buckets[writerData.writePosition].generation
7869
>= writerData.cachedLowestReaderGeneration
79-
|| !BucketAllocation::isValidBucketIndex(
80-
m_buckets[writerData.writePosition].bucketIndex)) {
81-
m_buckets[writerData.writePosition].lock.unlock();
70+
|| !BOutputStorage<ElementType>::BucketAllocation::isValidBucketIndex(
71+
this->m_buckets[writerData.writePosition].bucketIndex)) {
72+
this->m_buckets[writerData.writePosition].lock.unlock();
8273
continue;
8374
}
8475
break;
8576
} while (true);
8677

87-
m_buckets[writerData.writePosition].bucketIndex
88-
= writerData.bucketAllocation.reset(m_buckets[writerData.writePosition].bucketIndex);
78+
this->m_buckets[writerData.writePosition].bucketIndex = writerData.bucketAllocation.reset(
79+
this->m_buckets[writerData.writePosition].bucketIndex);
8980
std::atomic_thread_fence(std::memory_order_release);
9081

9182
const uint64_t highestReaderGeneration
92-
= m_highestReaderGeneration.load(std::memory_order_acquire);
93-
if (writerData.generation < highestReaderGeneration + WINDOW_SIZE) {
94-
writerData.generation = highestReaderGeneration + WINDOW_SIZE;
83+
= this->m_highestReaderGeneration.load(std::memory_order_acquire);
84+
if (writerData.generation
85+
< highestReaderGeneration + BOutputStorage<ElementType>::WINDOW_SIZE) {
86+
writerData.generation
87+
= highestReaderGeneration + BOutputStorage<ElementType>::WINDOW_SIZE;
9588
// casMax(m_highestWriterGeneration, writerData.generation);
9689
}
97-
m_buckets[writerData.writePosition].generation = writerData.generation;
90+
this->m_buckets[writerData.writePosition].generation = writerData.generation;
9891

99-
m_buckets[writerData.writePosition].lock.unlock();
92+
this->m_buckets[writerData.writePosition].lock.unlock();
10093

10194
if (containersLeft == 0) {
102-
auto& z = getNextContainer(writerData.bucketAllocation);
103-
if (!z.empty() && z.getContainer().readTimes == 0) {
104-
throw std::runtime_error("XXX");
105-
}
106-
z.assign(container, *m_allocationBuffer);
107-
// getNextContainer(writerData.bucketAllocation).assign(container, *m_allocationBuffer);
95+
this->m_allocationBuffer->replace(
96+
this->getNextElement(writerData.bucketAllocation),
97+
element,
98+
writerIndex);
10899
}
109100
return true;
110101
}
111102

112-
std::optional<ReferenceCounterHandler<OutputContainer>> getContainer(
103+
const ElementType* read(
113104
const std::size_t readerGroupIndex,
114105
const uint8_t localReaderIndex,
115106
const uint8_t globalReaderIndex) noexcept override
116107
{
117-
ReaderData& readerData = m_readersData[globalReaderIndex].get();
108+
typename BOutputStorage<ElementType>::ReaderData& readerData
109+
= this->m_readersData[globalReaderIndex].get();
118110
// const uint64_t readPosition = readerData.readPosition;
119111
if (readerData.bucketAllocation.containersLeft()) {
120-
return std::make_optional<ReferenceCounterHandler<OutputContainer>>(
121-
getReferenceCounter(getNextContainer(readerData.bucketAllocation)));
112+
return this->getNextElement(readerData.bucketAllocation);
122113
}
123114

124115
uint8_t loopCounter = 0;
125116
uint64_t cachedGeneration;
126117
uint16_t cachedBucketIndex;
127118
const uint16_t initialPosition = readerData.readPosition;
128119
do {
129-
readerData.shift(m_readerGroupSizes[readerGroupIndex], localReaderIndex);
130-
d_readerShifts++;
120+
readerData.shift(this->m_readerGroupSizes[readerGroupIndex], localReaderIndex);
131121

132-
auto& y = m_buckets[readerData.readPosition];
133-
if (readerData.isOnBufferBegin(m_readerGroupSizes[readerGroupIndex])) {
134-
if (!writersPresent()) {
122+
auto& y = this->m_buckets[readerData.readPosition];
123+
if (readerData.isOnBufferBegin(this->m_readerGroupSizes[readerGroupIndex])) {
124+
if (!this->writersPresent()) {
135125
readerData.generation++;
136126
updateLowestReaderGeneration(globalReaderIndex);
137-
return std::nullopt;
127+
return nullptr;
138128
}
139129
if (!readerData.seenValidBucket) {
140130
updateLowestReaderGeneration(globalReaderIndex);
141131
std::this_thread::yield();
142-
d_readerYields++;
143132
readerData.skipLoop = true;
144-
return std::nullopt;
133+
return nullptr;
145134
}
146135
readerData.generation++;
147136
readerData.seenValidBucket = false;
148137
readerData.skipLoop = false;
149138
updateLowestReaderGeneration(globalReaderIndex);
150139
}
151-
cachedGeneration = m_buckets[readerData.readPosition].generation;
140+
cachedGeneration = this->m_buckets[readerData.readPosition].generation;
152141
std::atomic_thread_fence(std::memory_order_acquire);
153-
cachedBucketIndex = m_buckets[readerData.readPosition].bucketIndex;
142+
cachedBucketIndex = this->m_buckets[readerData.readPosition].bucketIndex;
154143
if (cachedGeneration >= readerData.generation + 2) {
155144
readerData.seenValidBucket = true;
156145
}
157146
} while (cachedGeneration != readerData.generation
158-
|| !BucketAllocation::isValidBucketIndex(cachedBucketIndex));
147+
|| !BOutputStorage<ElementType>::BucketAllocation::isValidBucketIndex(
148+
cachedBucketIndex));
159149

160150
readerData.seenValidBucket = true;
161-
readerData.bucketAllocation.reset(m_buckets[readerData.readPosition].bucketIndex);
151+
readerData.bucketAllocation.reset(this->m_buckets[readerData.readPosition].bucketIndex);
162152

163-
return std::make_optional<ReferenceCounterHandler<OutputContainer>>(
164-
getReferenceCounter(getNextContainer(readerData.bucketAllocation)));
153+
return this->getNextElement(readerData.bucketAllocation);
165154
}
166155

167156
/*bool finished([[maybe_unused]] const std::size_t readerGroupIndex) noexcept override
@@ -172,12 +161,14 @@ class B2OutputStorage : public BOutputStorage {
172161
protected:
173162
void updateLowestReaderGeneration(const uint8_t globalReaderIndex) noexcept
174163
{
175-
boost::container::static_vector<uint64_t, MAX_READERS_COUNT> readerGenerations
176-
= m_readersData
177-
| std::views::transform([](const CacheAlligned<ReaderData>& readerDataAlligned) {
178-
return readerDataAlligned->generation;
179-
})
180-
| std::ranges::to<boost::container::static_vector<uint64_t, MAX_READERS_COUNT>>();
164+
const auto readerGenerations
165+
= this->m_readersData
166+
| std::views::transform(
167+
[](const CacheAlligned<typename BOutputStorage<ElementType>::ReaderData>&
168+
readerDataAlligned) { return readerDataAlligned->generation; })
169+
| std::ranges::to<boost::container::static_vector<
170+
uint64_t,
171+
OutputStorage<ElementType>::MAX_READERS_COUNT>>();
181172
const uint64_t highestReaderGeneration = *std::ranges::max_element(readerGenerations);
182173
/*uint64_t expected;
183174
do {
@@ -189,11 +180,11 @@ class B2OutputStorage : public BOutputStorage {
189180
expected,
190181
highestReaderGeneration,
191182
std::memory_order_release));*/
192-
casMax(m_highestReaderGeneration, highestReaderGeneration);
183+
casMax(this->m_highestReaderGeneration, highestReaderGeneration);
193184
// m_highestReaderGeneration = highestReaderGeneration;
194185
const uint64_t lowestReaderGeneration = *std::ranges::min_element(readerGenerations);
195186
// casMin(m_lowestReaderGeneration, lowestReaderGeneration);
196-
m_lowestReaderGeneration = lowestReaderGeneration;
187+
this->m_lowestReaderGeneration = lowestReaderGeneration;
197188
}
198189

199190
std::atomic<uint64_t> m_highestWriterGeneration {0};

0 commit comments

Comments
 (0)