diff --git a/Framework/Core/include/Framework/DataModelViews.h b/Framework/Core/include/Framework/DataModelViews.h index 53d6e6615b96e..dd8d65ea16459 100644 --- a/Framework/Core/include/Framework/DataModelViews.h +++ b/Framework/Core/include/Framework/DataModelViews.h @@ -16,6 +16,7 @@ #include "DomainInfoHeader.h" #include "SourceInfoHeader.h" #include "Headers/DataHeader.h" +#include "Framework/DataRef.h" #include "Framework/TimesliceSlot.h" #include #include @@ -80,10 +81,7 @@ struct count_parts { } }; -struct DataRefIndices { - size_t headerIdx; - size_t payloadIdx; -}; +// DataRefIndices is defined in Framework/DataRef.h struct get_pair { size_t pairId; diff --git a/Framework/Core/include/Framework/DataRef.h b/Framework/Core/include/Framework/DataRef.h index d4cba88b19333..aad667c8c33bd 100644 --- a/Framework/Core/include/Framework/DataRef.h +++ b/Framework/Core/include/Framework/DataRef.h @@ -12,6 +12,7 @@ #define FRAMEWORK_DATAREF_H #include // for size_t +#include namespace o2 { @@ -29,6 +30,15 @@ struct DataRef { size_t payloadSize = 0; }; +/// Raw indices into the message vector for one (header, payload) pair. +/// Kept in a lightweight header so InputSpan can use it without pulling in FairMQ. +struct DataRefIndices { + size_t headerIdx; + size_t payloadIdx; + bool operator==(const DataRefIndices&) const = default; + auto operator<=>(const DataRefIndices&) const = default; +}; + } // namespace framework } // namespace o2 diff --git a/Framework/Core/include/Framework/InputRecord.h b/Framework/Core/include/Framework/InputRecord.h index 96963f88524be..d2e152c1bcacc 100644 --- a/Framework/Core/include/Framework/InputRecord.h +++ b/Framework/Core/include/Framework/InputRecord.h @@ -13,6 +13,7 @@ #include "Framework/DataRef.h" #include "Framework/DataRefUtils.h" +#include "Framework/InputSpan.h" #include "Framework/InputRoute.h" #include "Framework/TypeTraits.h" #include "Framework/TableConsumer.h" @@ -202,6 +203,15 @@ class InputRecord [[nodiscard]] size_t getNofParts(int pos) const; + /// O(1) access to the part described by @a indices in slot @a pos. + [[nodiscard]] DataRef getAtIndices(int pos, DataRefIndices indices) const; + + /// O(1) advance from @a current to the next part's indices in slot @a pos. + [[nodiscard]] DataRefIndices nextIndices(int pos, DataRefIndices current) const + { + return mSpan.nextIndices(pos, current); + } + // Given a binding by string, return the associated DataRef DataRef getDataRefByString(const char* bindingName, int part = 0) const { @@ -568,8 +578,8 @@ class InputRecord Iterator() = delete; - Iterator(ParentType const* parent, size_t position = 0, size_t size = 0) - : mPosition(position), mSize(size > position ? size : position), mParent(parent), mElement{nullptr, nullptr, nullptr} + Iterator(ParentType const* parent, bool isEnd = false) + : mPosition(isEnd ? parent->size() : 0), mSize(parent->size()), mParent(parent), mElement{nullptr, nullptr, nullptr} { if (mPosition < mSize) { if (mParent->isValid(mPosition)) { @@ -678,18 +688,29 @@ class InputRecord using reference = typename BaseType::reference; using pointer = typename BaseType::pointer; using ElementType = typename std::remove_const::type; - using iterator = Iterator; - using const_iterator = Iterator; + using iterator = InputSpan::Iterator; + using const_iterator = InputSpan::Iterator; + + InputRecordIterator(InputRecord const* parent, bool isEnd = false) + : BaseType(parent, isEnd) + { + } + + /// Initial indices for part-level iteration: first part starts at {headerIdx=0, payloadIdx=1}. + [[nodiscard]] DataRefIndices initialIndices() const { return {0, 1}; } + /// Sentinel used by nextIndicesGetter to signal end-of-slot. + [[nodiscard]] DataRefIndices endIndices() const { return {size_t(-1), size_t(-1)}; } - InputRecordIterator(InputRecord const* parent, size_t position = 0, size_t size = 0) - : BaseType(parent, position, size) + /// Get element at the given raw message indices in O(1). + [[nodiscard]] ElementType getAtIndices(DataRefIndices indices) const { + return this->parent()->getAtIndices(this->position(), indices); } - /// Get element at {slotindex, partindex} - [[nodiscard]] ElementType getByPos(size_t pos) const + /// Advance @a current to the next part's indices in O(1). + [[nodiscard]] DataRefIndices nextIndices(DataRefIndices current) const { - return this->parent()->getByPos(this->position(), pos); + return this->parent()->nextIndices(this->position(), current); } /// Check if slot is valid, index of part is not used @@ -709,12 +730,12 @@ class InputRecord [[nodiscard]] const_iterator begin() const { - return const_iterator(this, 0, size()); + return const_iterator(this, size() == 0); } [[nodiscard]] const_iterator end() const { - return const_iterator(this, size()); + return const_iterator(this, true); } }; @@ -723,12 +744,12 @@ class InputRecord [[nodiscard]] const_iterator begin() const { - return {this, 0, size()}; + return {this, false}; } [[nodiscard]] const_iterator end() const { - return {this, size()}; + return {this, true}; } InputSpan& span() diff --git a/Framework/Core/include/Framework/InputSpan.h b/Framework/Core/include/Framework/InputSpan.h index cf8c8acda6796..dbe270f0e030d 100644 --- a/Framework/Core/include/Framework/InputSpan.h +++ b/Framework/Core/include/Framework/InputSpan.h @@ -14,8 +14,8 @@ #include "Framework/DataRef.h" #include -extern template class std::function; -extern template class std::function; +extern template class std::function; +extern template class std::function; namespace o2::framework { @@ -32,37 +32,48 @@ class InputSpan InputSpan(InputSpan const&) = delete; InputSpan(InputSpan&&) = default; - /// @a getter is the mapping between an element of the span referred by - /// index and the buffer associated. - /// @a size is the number of elements in the span. - InputSpan(std::function getter, size_t size); + /// Navigate the message store via the DataRefIndices protocol. + /// get_next_pair (DataModelViews.h) provides O(1) sequential advancement for nextIndicesGetter. + InputSpan(std::function nofPartsGetter, + std::function refCountGetter, + std::function indicesGetter, + std::function nextIndicesGetter, + size_t size); - /// @a getter is the mapping between an element of the span referred by - /// index and the buffer associated. - /// @a size is the number of elements in the span. - InputSpan(std::function getter, size_t size); + /// @a i-th element of the InputSpan (O(partidx) sequential scan via indices protocol) + [[nodiscard]] DataRef get(size_t i, size_t partidx = 0) const + { + DataRefIndices idx{0, 1}; + for (size_t p = 0; p < partidx; ++p) { + idx = mNextIndicesGetter(i, idx); + } + return mIndicesGetter(i, idx); + } - /// @a getter is the mapping between an element of the span referred by - /// index and the buffer associated. - /// @nofPartsGetter is the getter for the number of parts associated with an index - /// @a size is the number of elements in the span. - InputSpan(std::function getter, std::function nofPartsGetter, std::function refCountGetter, size_t size); + /// Return the DataRef for the part described by @a indices in slot @a slotIdx in O(1). + [[nodiscard]] DataRef getAtIndices(size_t slotIdx, DataRefIndices indices) const + { + return mIndicesGetter(slotIdx, indices); + } - /// @a i-th element of the InputSpan - [[nodiscard]] DataRef get(size_t i, size_t partidx = 0) const + /// Advance from @a current to the indices of the next part in slot @a slotIdx in O(1). + [[nodiscard]] DataRefIndices nextIndices(size_t slotIdx, DataRefIndices current) const { - return mGetter(i, partidx); + return mNextIndicesGetter(slotIdx, current); } + // --- slot-level Iterator protocol (headerIdx doubles as slot position) --- + [[nodiscard]] DataRefIndices initialIndices() const { return {0, 0}; } + [[nodiscard]] DataRefIndices endIndices() const { return {mSize, 0}; } + [[nodiscard]] DataRef getAtIndices(DataRefIndices indices) const { return mIndicesGetter(indices.headerIdx, {0, 1}); } + [[nodiscard]] DataRefIndices nextIndices(DataRefIndices current) const { return {current.headerIdx + 1, 0}; } + /// @a number of parts in the i-th element of the InputSpan [[nodiscard]] size_t getNofParts(size_t i) const { if (i >= mSize) { return 0; } - if (!mNofPartsGetter) { - return 1; - } return mNofPartsGetter(i); } @@ -94,7 +105,8 @@ class InputSpan return get(i).payload; } - /// an iterator class working on position within the a parent class + /// An iterator over the elements of a parent container using the DataRefIndices protocol. + /// ParentT must provide: initialIndices(), getAtIndices(DataRefIndices), nextIndices(DataRefIndices). template class Iterator { @@ -110,23 +122,23 @@ class InputSpan Iterator() = delete; - Iterator(ParentType const* parent, size_t position = 0, size_t size = 0) - : mPosition(position), mSize(size > position ? size : position), mParent(parent), mElement{} + Iterator(ParentType const* parent, bool isEnd = false) + : mParent(parent), + mCurrentIndices(isEnd ? parent->endIndices() : parent->initialIndices()), + mElement{} { - if (mPosition < mSize) { - mElement = mParent->get(mPosition); + if (mCurrentIndices != mParent->endIndices()) { + mElement = mParent->getAtIndices(mCurrentIndices); } } - ~Iterator() = default; - // prefix increment SelfType& operator++() { - if (mPosition < mSize && ++mPosition < mSize) { - mElement = mParent->get(mPosition); + mCurrentIndices = mParent->nextIndices(mCurrentIndices); + if (mCurrentIndices != mParent->endIndices()) { + mElement = mParent->getAtIndices(mCurrentIndices); } else { - // reset the element to the default value of the type mElement = ElementType{}; } return *this; @@ -145,16 +157,14 @@ class InputSpan return mElement; } - // comparison bool operator==(const SelfType& rh) const { - return mPosition == rh.mPosition; + return mCurrentIndices == rh.mCurrentIndices; } - // comparison - bool operator!=(const SelfType& rh) const + auto operator<=>(const SelfType& rh) const { - return mPosition != rh.mPosition; + return mCurrentIndices <=> rh.mCurrentIndices; } // return pointer to parent instance @@ -163,22 +173,21 @@ class InputSpan return mParent; } - // return current position + // return current position (headerIdx serves as the slot index for slot-level iteration) [[nodiscard]] size_t position() const { - return mPosition; + return mCurrentIndices.headerIdx; } private: - size_t mPosition; - size_t mSize; ParentType const* mParent; + DataRefIndices mCurrentIndices; ElementType mElement; }; /// @class InputSpanIterator - /// An iterator over the input slots - /// It supports an iterator interface to access the parts in the slot + /// An iterator over the input slots. + /// It supports an iterator interface to access the parts in the slot. template class InputSpanIterator : public Iterator { @@ -192,24 +201,26 @@ class InputSpan using iterator = Iterator; using const_iterator = Iterator; - InputSpanIterator(InputSpan const* parent, size_t position = 0, size_t size = 0) - : BaseType(parent, position, size) + InputSpanIterator(InputSpan const* parent, bool isEnd = false) + : BaseType(parent, isEnd) { } - /// Get element at {slotindex, partindex} - [[nodiscard]] ElementType get(size_t pos) const + /// Initial indices for part-level iteration: first part starts at {headerIdx=0, payloadIdx=1}. + [[nodiscard]] DataRefIndices initialIndices() const { return {0, 1}; } + /// Sentinel used by nextIndicesGetter to signal end-of-slot. + [[nodiscard]] DataRefIndices endIndices() const { return {size_t(-1), size_t(-1)}; } + + /// Get element at the given raw message indices in O(1). + [[nodiscard]] ElementType getAtIndices(DataRefIndices indices) const { - return this->parent()->get(this->position(), pos); + return this->parent()->getAtIndices(this->position(), indices); } - /// Check if slot is valid, index of part is not used - [[nodiscard]] bool isValid(size_t = 0) const + /// Advance @a current to the next part's indices in O(1). + [[nodiscard]] DataRefIndices nextIndices(DataRefIndices current) const { - if (this->position() < this->parent()->size()) { - return this->parent()->isValid(this->position()); - } - return false; + return this->parent()->nextIndices(this->position(), current); } /// Get number of parts in input slot @@ -218,15 +229,14 @@ class InputSpan return this->parent()->getNofParts(this->position()); } - // iterator for the part access [[nodiscard]] const_iterator begin() const { - return const_iterator(this, 0, size()); + return const_iterator(this, size() == 0); } [[nodiscard]] const_iterator end() const { - return const_iterator(this, size()); + return const_iterator(this, true); } }; @@ -236,19 +246,19 @@ class InputSpan // supporting read-only access and returning const_iterator [[nodiscard]] const_iterator begin() const { - return {this, 0, size()}; + return {this, false}; } - // supporting read-only access and returning const_iterator [[nodiscard]] const_iterator end() const { - return {this, size()}; + return {this, true}; } private: - std::function mGetter; std::function mNofPartsGetter; std::function mRefCountGetter; + std::function mIndicesGetter; + std::function mNextIndicesGetter; size_t mSize; }; diff --git a/Framework/Core/src/CompletionPolicyHelpers.cxx b/Framework/Core/src/CompletionPolicyHelpers.cxx index 2b49b8dfa9acd..cc593ee7a2ed9 100644 --- a/Framework/Core/src/CompletionPolicyHelpers.cxx +++ b/Framework/Core/src/CompletionPolicyHelpers.cxx @@ -325,9 +325,9 @@ CompletionPolicy CompletionPolicyHelpers::consumeWhenAnyWithAllConditions(const // But I don't see any possibility to handle this in a better way. // Iterate on all specs and all inputs simultaneously - for (size_t i = 0; i < inputs.size(); ++i) { - char const* header = inputs.header(i); - auto& spec = specs[i]; + for (auto it = inputs.begin(), end = inputs.end(); it != end; ++it) { + char const* header = (*it).header; + auto& spec = specs[it.position()]; // In case a condition object is not there, we need to wait. if (header != nullptr) { canConsume = true; diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index 6b90747550278..b062f2bf68a75 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -2133,25 +2133,24 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v } else { currentSetOfInputs = relayer.consumeExistingInputsForTimeslice(slot); } - auto getter = [¤tSetOfInputs](size_t i, size_t partindex) -> DataRef { - if ((currentSetOfInputs[i] | count_payloads{}) > partindex) { - const char* headerptr = nullptr; - const char* payloadptr = nullptr; - size_t payloadSize = 0; - // - each input can have multiple parts - // - "part" denotes a sequence of messages belonging together, the first message of the - // sequence is the header message - // - each part has one or more payload messages - // - InputRecord provides all payloads as header-payload pair - auto const indices = currentSetOfInputs[i] | get_pair{partindex}; - auto const& headerMsg = currentSetOfInputs[i][indices.headerIdx]; - auto const& payloadMsg = currentSetOfInputs[i][indices.payloadIdx]; - headerptr = static_cast(headerMsg->GetData()); - payloadptr = payloadMsg ? static_cast(payloadMsg->GetData()) : nullptr; - payloadSize = payloadMsg ? payloadMsg->GetSize() : 0; - return DataRef{nullptr, headerptr, payloadptr, payloadSize}; + // Convert raw message indices directly to a DataRef in O(1). + // Used both by the sequential PartIterator and as the fallback for positional access. + auto indicesGetter = [¤tSetOfInputs](size_t i, DataRefIndices indices) -> DataRef { + auto const& msgs = currentSetOfInputs[i]; + if (msgs.size() <= indices.headerIdx) { + return DataRef{}; } - return DataRef{}; + auto const& headerMsg = msgs[indices.headerIdx]; + char const* payloadData = nullptr; + size_t payloadSize = 0; + if (msgs.size() > indices.payloadIdx && msgs[indices.payloadIdx]) { + payloadData = static_cast(msgs[indices.payloadIdx]->GetData()); + payloadSize = msgs[indices.payloadIdx]->GetSize(); + } + return DataRef{nullptr, + headerMsg ? static_cast(headerMsg->GetData()) : nullptr, + payloadData, + payloadSize}; }; auto nofPartsGetter = [¤tSetOfInputs](size_t i) -> size_t { return (currentSetOfInputs[i] | count_payloads{}); @@ -2160,7 +2159,11 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v auto& header = static_cast(*(currentSetOfInputs[idx] | get_header{0})); return header.GetRefCount(); }; - return InputSpan{getter, nofPartsGetter, refCountGetter, currentSetOfInputs.size()}; + auto nextIndicesGetter = [¤tSetOfInputs](size_t i, DataRefIndices current) -> DataRefIndices { + auto next = currentSetOfInputs[i] | get_next_pair{current}; + return next.headerIdx < currentSetOfInputs[i].size() ? next : DataRefIndices{size_t(-1), size_t(-1)}; + }; + return InputSpan{nofPartsGetter, refCountGetter, indicesGetter, nextIndicesGetter, currentSetOfInputs.size()}; }; auto markInputsAsDone = [ref](TimesliceSlot slot) -> void { diff --git a/Framework/Core/src/DataRelayer.cxx b/Framework/Core/src/DataRelayer.cxx index fc9966ffad643..7adf5b5c97fbb 100644 --- a/Framework/Core/src/DataRelayer.cxx +++ b/Framework/Core/src/DataRelayer.cxx @@ -212,18 +212,6 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector(header->GetData()), - reinterpret_cast(payload ? payload->GetData() : nullptr), - payload ? payload->GetSize() : 0}; - } - return DataRef{}; - }; auto nPartsGetter = [&partial](size_t idx) { return partial[idx] | count_parts{}; }; @@ -231,7 +219,24 @@ DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector(*(partial[idx] | get_header{0})); return header.GetRefCount(); }; - InputSpan span{getter, nPartsGetter, refCountGetter, static_cast(partial.size())}; + auto indicesGetter = [&partial](size_t idx, DataRefIndices indices) -> DataRef { + if (!partial[idx].empty()) { + auto const& headerMsg = partial[idx][indices.headerIdx]; + auto const& payloadMsg = partial[idx][indices.payloadIdx]; + if (headerMsg) { + return DataRef{nullptr, + reinterpret_cast(headerMsg->GetData()), + payloadMsg ? reinterpret_cast(payloadMsg->GetData()) : nullptr, + payloadMsg ? payloadMsg->GetSize() : 0}; + } + } + return DataRef{}; + }; + auto nextIndicesGetter = [&partial](size_t idx, DataRefIndices current) -> DataRefIndices { + auto next = partial[idx] | get_next_pair{current}; + return next.headerIdx < partial[idx].size() ? next : DataRefIndices{size_t(-1), size_t(-1)}; + }; + InputSpan span{nPartsGetter, refCountGetter, indicesGetter, nextIndicesGetter, static_cast(partial.size())}; // Setup the input span if (expirator.checker(services, timestamp.value, span) == false) { @@ -789,18 +794,6 @@ void DataRelayer::getReadyToProcess(std::vector& comp throw runtime_error_f("Completion police %s has no callback set", mCompletionPolicy.name.c_str()); } auto partial = getPartialRecord(li); - // TODO: get the data ref from message model - auto getter = [&partial](size_t idx, size_t part) { - if (!partial[idx].empty() && (partial[idx] | get_header{part}).get()) { - auto header = (partial[idx] | get_header{part}).get(); - auto payload = (partial[idx] | get_payload{part, 0}).get(); - return DataRef{nullptr, - reinterpret_cast(header->GetData()), - reinterpret_cast(payload ? payload->GetData() : nullptr), - payload ? payload->GetSize() : 0}; - } - return DataRef{}; - }; auto nPartsGetter = [&partial](size_t idx) { return partial[idx] | count_parts{}; }; @@ -808,7 +801,24 @@ void DataRelayer::getReadyToProcess(std::vector& comp auto& header = static_cast(*(partial[idx] | get_header{0})); return header.GetRefCount(); }; - InputSpan span{getter, nPartsGetter, refCountGetter, static_cast(partial.size())}; + auto indicesGetter = [&partial](size_t idx, DataRefIndices indices) -> DataRef { + if (!partial[idx].empty()) { + auto const& headerMsg = partial[idx][indices.headerIdx]; + auto const& payloadMsg = partial[idx][indices.payloadIdx]; + if (headerMsg) { + return DataRef{nullptr, + reinterpret_cast(headerMsg->GetData()), + payloadMsg ? reinterpret_cast(payloadMsg->GetData()) : nullptr, + payloadMsg ? payloadMsg->GetSize() : 0}; + } + } + return DataRef{}; + }; + auto nextIndicesGetter = [&partial](size_t idx, DataRefIndices current) -> DataRefIndices { + auto next = partial[idx] | get_next_pair{current}; + return next.headerIdx < partial[idx].size() ? next : DataRefIndices{size_t(-1), size_t(-1)}; + }; + InputSpan span{nPartsGetter, refCountGetter, indicesGetter, nextIndicesGetter, static_cast(partial.size())}; CompletionPolicy::CompletionOp action = mCompletionPolicy.callbackFull(span, mInputs, mContext); auto& variables = mTimesliceIndex.getVariablesForSlot(slot); diff --git a/Framework/Core/src/InputRecord.cxx b/Framework/Core/src/InputRecord.cxx index 18b341704ffcb..7bc9907b13ba4 100644 --- a/Framework/Core/src/InputRecord.cxx +++ b/Framework/Core/src/InputRecord.cxx @@ -139,6 +139,16 @@ size_t InputRecord::getNofParts(int pos) const } return mSpan.getNofParts(pos); } + +DataRef InputRecord::getAtIndices(int pos, DataRefIndices indices) const +{ + auto ref = mSpan.getAtIndices(pos, indices); + if (pos >= 0 && pos < (int)mInputsSchema.size()) { + ref.spec = &mInputsSchema[pos].matcher; + } + return ref; +} + size_t InputRecord::size() const { return mSpan.size(); diff --git a/Framework/Core/src/InputSpan.cxx b/Framework/Core/src/InputSpan.cxx index d1dffc85602a5..ccea2d1dd66ed 100644 --- a/Framework/Core/src/InputSpan.cxx +++ b/Framework/Core/src/InputSpan.cxx @@ -11,29 +11,17 @@ #include "Framework/InputSpan.h" -template class std::function; -template class std::function; +template class std::function; +template class std::function; namespace o2::framework { -InputSpan::InputSpan(std::function getter, size_t size) - : mGetter{}, mNofPartsGetter{}, mSize{size} -{ - mGetter = [getter](size_t index, size_t) -> DataRef { - return getter(index); - }; -} - -InputSpan::InputSpan(std::function getter, size_t size) - : mGetter{getter}, mNofPartsGetter{}, mSize{size} -{ -} - -InputSpan::InputSpan(std::function getter, - std::function nofPartsGetter, +InputSpan::InputSpan(std::function nofPartsGetter, std::function refCountGetter, + std::function indicesGetter, + std::function nextIndicesGetter, size_t size) - : mGetter{getter}, mNofPartsGetter{nofPartsGetter}, mRefCountGetter(refCountGetter), mSize{size} + : mNofPartsGetter{nofPartsGetter}, mRefCountGetter(refCountGetter), mIndicesGetter{std::move(indicesGetter)}, mNextIndicesGetter{std::move(nextIndicesGetter)}, mSize{size} { } diff --git a/Framework/Core/test/benchmark_InputRecord.cxx b/Framework/Core/test/benchmark_InputRecord.cxx index 69fc3c970c1e1..e3ec00ac815ed 100644 --- a/Framework/Core/test/benchmark_InputRecord.cxx +++ b/Framework/Core/test/benchmark_InputRecord.cxx @@ -47,7 +47,12 @@ static void BM_InputRecordGenericGetters(benchmark::State& state) createRoute("z_source", spec3)}; // First of all we test if an empty registry behaves as expected, raising a // bunch of exceptions. - InputSpan span{[](size_t) { return DataRef{nullptr, nullptr, nullptr}; }, 0}; + InputSpan span{ + [](size_t) -> size_t { return 0; }, + nullptr, + [](size_t, DataRefIndices) { return DataRef{nullptr, nullptr, nullptr}; }, + [](size_t, DataRefIndices) -> DataRefIndices { return {size_t(-1), size_t(-1)}; }, + 0}; ServiceRegistry registry; InputRecord emptyRecord(schema, span, registry); @@ -82,7 +87,12 @@ static void BM_InputRecordGenericGetters(benchmark::State& state) createMessage(dh1, 1); createMessage(dh2, 2); createEmpty(); - InputSpan span2{[&inputs](size_t i) { return DataRef{nullptr, static_cast(inputs[2 * i]), static_cast(inputs[2 * i + 1])}; }, inputs.size() / 2}; + InputSpan span2{ + [](size_t) -> size_t { return 1; }, + nullptr, + [&inputs](size_t i, DataRefIndices idx) { return DataRef{nullptr, static_cast(inputs[2 * i + idx.headerIdx]), static_cast(inputs[2 * i + idx.payloadIdx])}; }, + [](size_t, DataRefIndices) -> DataRefIndices { return {size_t(-1), size_t(-1)}; }, + inputs.size() / 2}; InputRecord record{schema, span2, registry}; for (auto _ : state) { diff --git a/Framework/Core/test/test_CompletionPolicy.cxx b/Framework/Core/test/test_CompletionPolicy.cxx index 059f20b352b3d..cc16ba95ba8f2 100644 --- a/Framework/Core/test/test_CompletionPolicy.cxx +++ b/Framework/Core/test/test_CompletionPolicy.cxx @@ -55,7 +55,12 @@ TEST_CASE("TestCompletionPolicy_callback") std::vector policies{ {"test", matcher, callback}}; CompletionPolicy::InputSetElement ref{nullptr, reinterpret_cast(stack.data()), nullptr}; - InputSpan const& inputs{[&ref](size_t) { return ref; }, 1}; + InputSpan const inputs{ + [](size_t) -> size_t { return 1; }, + nullptr, + [&ref](size_t, DataRefIndices) -> DataRef { return ref; }, + [](size_t, DataRefIndices) -> DataRefIndices { return {size_t(-1), size_t(-1)}; }, + 1}; std::vector specs; ServiceRegistryRef servicesRef{services}; for (auto& policy : policies) { diff --git a/Framework/Core/test/test_InputRecord.cxx b/Framework/Core/test/test_InputRecord.cxx index 4eb1265dcff53..355e52539ea5a 100644 --- a/Framework/Core/test/test_InputRecord.cxx +++ b/Framework/Core/test/test_InputRecord.cxx @@ -47,7 +47,10 @@ TEST_CASE("TestInputRecord") // First of all we test if an empty registry behaves as expected, raising a // bunch of exceptions. InputSpan span{ - [](size_t) { return DataRef{nullptr, nullptr, nullptr}; }, + [](size_t) -> size_t { return 0; }, + nullptr, + [](size_t, DataRefIndices) { return DataRef{nullptr, nullptr, nullptr}; }, + [](size_t, DataRefIndices) -> DataRefIndices { return {size_t(-1), size_t(-1)}; }, 0}; ServiceRegistry registry; InputRecord emptyRecord(schema, span, registry); @@ -91,7 +94,12 @@ TEST_CASE("TestInputRecord") createMessage(dh1, 1); createMessage(dh2, 2); createEmpty(); - InputSpan span2{[&inputs](size_t i) { return DataRef{nullptr, static_cast(inputs[2 * i]), static_cast(inputs[2 * i + 1])}; }, inputs.size() / 2}; + InputSpan span2{ + [](size_t) -> size_t { return 1; }, + nullptr, + [&inputs](size_t i, DataRefIndices idx) { return DataRef{nullptr, static_cast(inputs[2 * i + idx.headerIdx]), static_cast(inputs[2 * i + idx.payloadIdx])}; }, + [](size_t, DataRefIndices) -> DataRefIndices { return {size_t(-1), size_t(-1)}; }, + inputs.size() / 2}; InputRecord record{schema, span2, registry}; // Checking we can get the whole ref by name diff --git a/Framework/Core/test/test_InputRecordWalker.cxx b/Framework/Core/test/test_InputRecordWalker.cxx index 9af3c0dd2dbe2..1fcfea1ba1587 100644 --- a/Framework/Core/test/test_InputRecordWalker.cxx +++ b/Framework/Core/test/test_InputRecordWalker.cxx @@ -35,16 +35,12 @@ struct DataSet { using Messages = std::vector; using CheckType = std::vector; DataSet(std::vector&& s, Messages&& m, CheckType&& v, ServiceRegistryRef registry) - : schema{std::move(s)}, messages{std::move(m)}, span{[this](size_t i, size_t part) { - REQUIRE(i < this->messages.size()); - REQUIRE(part < this->messages[i].second.size() / 2); - auto header = static_cast(this->messages[i].second.at(2 * part)->data()); - auto payload = static_cast(this->messages[i].second.at(2 * part + 1)->data()); - return DataRef{nullptr, header, payload}; - }, - [this](size_t i) { return i < this->messages.size() ? messages[i].second.size() / 2 : 0; }, nullptr, this->messages.size()}, - record{schema, span, registry}, - values{std::move(v)} + : schema{std::move(s)}, messages{std::move(m)}, span{[this](size_t i) { return i < this->messages.size() ? messages[i].second.size() / 2 : 0; }, nullptr, [this](size_t i, DataRefIndices idx) { + auto header = static_cast(this->messages[i].second.at(idx.headerIdx)->data()); + auto payload = static_cast(this->messages[i].second.at(idx.payloadIdx)->data()); + return DataRef{nullptr, header, payload}; }, [this](size_t i, DataRefIndices current) -> DataRefIndices { + size_t next = current.headerIdx + 2; + return next < this->messages[i].second.size() ? DataRefIndices{next, next + 1} : DataRefIndices{size_t(-1), size_t(-1)}; }, this->messages.size()}, record{schema, span, registry}, values{std::move(v)} { REQUIRE(messages.size() == schema.size()); } diff --git a/Framework/Core/test/test_InputSpan.cxx b/Framework/Core/test/test_InputSpan.cxx index c5682aea80b6c..dc31085e741fd 100644 --- a/Framework/Core/test/test_InputSpan.cxx +++ b/Framework/Core/test/test_InputSpan.cxx @@ -30,14 +30,18 @@ TEST_CASE("TestInputSpan") routeNo++; } - auto getter = [&inputs](size_t i, size_t part) { - return DataRef{nullptr, inputs[i].at(part * 2).data(), inputs[i].at(part * 2 + 1).data()}; - }; auto nPartsGetter = [&inputs](size_t i) { return inputs[i].size() / 2; }; + auto indicesGetter = [&inputs](size_t i, DataRefIndices indices) { + return DataRef{nullptr, inputs[i].at(indices.headerIdx).data(), inputs[i].at(indices.payloadIdx).data()}; + }; + auto nextIndicesGetter = [&inputs](size_t i, DataRefIndices current) -> DataRefIndices { + size_t next = current.headerIdx + 2; + return next < inputs[i].size() ? DataRefIndices{next, next + 1} : DataRefIndices{size_t(-1), size_t(-1)}; + }; - InputSpan span{getter, nPartsGetter, nullptr, inputs.size()}; + InputSpan span{nPartsGetter, nullptr, indicesGetter, nextIndicesGetter, inputs.size()}; REQUIRE(span.size() == inputs.size()); routeNo = 0; for (; routeNo < span.size(); ++routeNo) { diff --git a/Framework/Utils/test/RawPageTestData.h b/Framework/Utils/test/RawPageTestData.h index a6b800f7cba32..29ac4eeba6b5b 100644 --- a/Framework/Utils/test/RawPageTestData.h +++ b/Framework/Utils/test/RawPageTestData.h @@ -42,13 +42,17 @@ struct DataSet { DataSet(std::vector&& s, Messages&& m, std::vector&& v, ServiceRegistryRef registry) : schema{std::move(s)}, messages{std::move(m)}, - span{[this](size_t i, size_t part) { - auto header = static_cast(this->messages[i].at(2 * part)->data()); - auto payload = static_cast(this->messages[i].at(2 * part + 1)->data()); + span{[this](size_t i) { return i < this->messages.size() ? messages[i].size() / 2 : 0; }, + nullptr, + [this](size_t i, DataRefIndices idx) { + auto header = static_cast(this->messages[i].at(idx.headerIdx)->data()); + auto payload = static_cast(this->messages[i].at(idx.payloadIdx)->data()); return DataRef{nullptr, header, payload}; }, - [this](size_t i) { return i < this->messages.size() ? messages[i].size() / 2 : 0; }, - nullptr, + [this](size_t i, DataRefIndices current) -> DataRefIndices { + size_t next = current.headerIdx + 2; + return next < this->messages[i].size() ? DataRefIndices{next, next + 1} : DataRefIndices{size_t(-1), size_t(-1)}; + }, this->messages.size()}, record{schema, span, registry}, values{std::move(v)} diff --git a/Framework/Utils/test/test_RootTreeWriter.cxx b/Framework/Utils/test/test_RootTreeWriter.cxx index 62e1eb62cb4f1..e372fb4e1302e 100644 --- a/Framework/Utils/test/test_RootTreeWriter.cxx +++ b/Framework/Utils/test/test_RootTreeWriter.cxx @@ -224,10 +224,14 @@ TEST_CASE("test_RootTreeWriter") {InputSpec{"input8", "TST", "SRLZDVEC"}, 7, "input8", 0}, // }; - auto getter = [&store](size_t i) -> DataRef { - return DataRef{nullptr, static_cast(store[2 * i]->GetData()), static_cast(store[2 * i + 1]->GetData())}; - }; - InputSpan span{getter, store.size() / 2}; + InputSpan span{ + [](size_t) -> size_t { return 1; }, + nullptr, + [&store](size_t i, DataRefIndices idx) -> DataRef { + return DataRef{nullptr, static_cast(store[2 * i + idx.headerIdx]->GetData()), static_cast(store[2 * i + idx.payloadIdx]->GetData())}; + }, + [](size_t, DataRefIndices) -> DataRefIndices { return {size_t(-1), size_t(-1)}; }, + store.size() / 2}; ServiceRegistry registry; InputRecord inputs{ schema, diff --git a/Utilities/DataSampling/src/Dispatcher.cxx b/Utilities/DataSampling/src/Dispatcher.cxx index 3ff0ba661fd93..22dd457a1211a 100644 --- a/Utilities/DataSampling/src/Dispatcher.cxx +++ b/Utilities/DataSampling/src/Dispatcher.cxx @@ -122,7 +122,7 @@ void Dispatcher::run(ProcessingContext& ctx) for (auto inputIt = ctx.inputs().begin(); inputIt != ctx.inputs().end(); inputIt++) { - const DataRef& firstPart = inputIt.getByPos(0); + const DataRef& firstPart = *inputIt; if (firstPart.header == nullptr) { continue; }