Skip to content

Commit 23765b5

Browse files
authored
DPL: Enforce that dpl pipeline length is at least as long as number of TFs in flight (#15048)
1 parent 834cbc5 commit 23765b5

File tree

10 files changed

+75
-33
lines changed

10 files changed

+75
-33
lines changed

Framework/Core/include/Framework/DataRelayer.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,8 @@ class DataRelayer
102102
DataRelayer(CompletionPolicy const&,
103103
std::vector<InputRoute> const& routes,
104104
TimesliceIndex&,
105-
ServiceRegistryRef);
105+
ServiceRegistryRef,
106+
int);
106107

107108
/// This invokes the appropriate `InputRoute::danglingChecker` on every
108109
/// entry in the cache and if it returns true, it creates a new

Framework/Core/include/Framework/DefaultsHelpers.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,24 @@
1212
#ifndef O2_FRAMEWORK_DEFAULTHELPERS_H_
1313
#define O2_FRAMEWORK_DEFAULTHELPERS_H_
1414

15+
namespace fair::mq
16+
{
17+
class ProgOptions;
18+
}
19+
1520
namespace o2::framework
1621
{
1722
enum struct DeploymentMode;
23+
struct DeviceConfig;
1824

1925
struct DefaultsHelpers {
2026
static DeploymentMode deploymentMode();
2127
/// @return true if running online
2228
static bool onlineDeploymentMode();
2329
/// get max number of timeslices in the queue
24-
static unsigned int pipelineLength();
30+
static unsigned int pipelineLength(unsigned int minLength);
31+
static unsigned int pipelineLength(const fair::mq::ProgOptions& options);
32+
static unsigned int pipelineLength(const DeviceConfig& dc);
2533
};
2634
} // namespace o2::framework
2735

Framework/Core/src/ArrowSupport.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -564,7 +564,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
564564
if (dc.options.count("timeframes-rate-limit") && dc.options["timeframes-rate-limit"].defaulted() == false) {
565565
config->maxTimeframes = std::stoll(dc.options["timeframes-rate-limit"].as<std::string>());
566566
} else {
567-
config->maxTimeframes = readers * DefaultsHelpers::pipelineLength();
567+
config->maxTimeframes = readers * DefaultsHelpers::pipelineLength(dc);
568568
}
569569
static bool once = false;
570570
// Until we guarantee this is called only once...

Framework/Core/src/CommonServices.cxx

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,11 +414,13 @@ o2::framework::ServiceSpec CommonServices::dataRelayer()
414414
.name = "datarelayer",
415415
.init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
416416
auto& spec = services.get<DeviceSpec const>();
417+
int pipelineLength = DefaultsHelpers::pipelineLength(options);
417418
return ServiceHandle{TypeIdHelpers::uniqueId<DataRelayer>(),
418419
new DataRelayer(spec.completionPolicy,
419420
spec.inputs,
420421
services.get<TimesliceIndex>(),
421-
services)};
422+
services,
423+
pipelineLength)};
422424
},
423425
.configure = noConfiguration(),
424426
.kind = ServiceKind::Serial};

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1483,7 +1483,7 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref)
14831483
auto& infos = state.inputChannelInfos;
14841484

14851485
if (context.balancingInputs) {
1486-
static int pipelineLength = DefaultsHelpers::pipelineLength();
1486+
static int pipelineLength = DefaultsHelpers::pipelineLength(*ref.get<RawDeviceService>().device()->fConfig);
14871487
static uint64_t ahead = getenv("DPL_MAX_CHANNEL_AHEAD") ? std::atoll(getenv("DPL_MAX_CHANNEL_AHEAD")) : std::max(8, std::min(pipelineLength - 48, pipelineLength / 2));
14881488
auto newEnd = std::remove_if(pollOrder.begin(), pollOrder.end(), [&infos, limitNew = currentOldest.value + ahead](int a) -> bool {
14891489
return infos[a].oldestForChannel.value > limitNew;
@@ -2259,12 +2259,14 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
22592259
return false;
22602260
}
22612261

2262-
auto postUpdateStats = [ref](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t tStart, uint64_t tStartMilli) {
2262+
int pipelineLength = DefaultsHelpers::pipelineLength(*ref.get<RawDeviceService>().device()->fConfig);
2263+
2264+
auto postUpdateStats = [ref, pipelineLength](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t tStart, uint64_t tStartMilli) {
22632265
auto& stats = ref.get<DataProcessingStats>();
22642266
auto& states = ref.get<DataProcessingStates>();
22652267
std::atomic_thread_fence(std::memory_order_release);
22662268
char relayerSlotState[1024];
2267-
int written = snprintf(relayerSlotState, 1024, "%d ", DefaultsHelpers::pipelineLength());
2269+
int written = snprintf(relayerSlotState, 1024, "%d ", pipelineLength);
22682270
char* buffer = relayerSlotState + written;
22692271
for (size_t ai = 0; ai != record.size(); ai++) {
22702272
buffer[ai] = record.isValid(ai) ? '3' : '0';
@@ -2291,11 +2293,11 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
22912293
count++;
22922294
};
22932295

2294-
auto preUpdateStats = [ref](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t) {
2296+
auto preUpdateStats = [ref, pipelineLength](DataRelayer::RecordAction const& action, InputRecord const& record, uint64_t) {
22952297
auto& states = ref.get<DataProcessingStates>();
22962298
std::atomic_thread_fence(std::memory_order_release);
22972299
char relayerSlotState[1024];
2298-
snprintf(relayerSlotState, 1024, "%d ", DefaultsHelpers::pipelineLength());
2300+
snprintf(relayerSlotState, 1024, "%d ", pipelineLength);
22992301
char* buffer = strchr(relayerSlotState, ' ') + 1;
23002302
for (size_t ai = 0; ai != record.size(); ai++) {
23012303
buffer[ai] = record.isValid(ai) ? '2' : '0';

Framework/Core/src/DataRelayer.cxx

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include "Framework/DataProcessingStates.h"
3838
#include "Framework/DataTakingContext.h"
3939
#include "Framework/DefaultsHelpers.h"
40+
#include "Framework/RawDeviceService.h"
4041

4142
#include "Headers/DataHeaderHelpers.h"
4243
#include "Framework/Formatters.h"
@@ -48,6 +49,7 @@
4849
#include <fairmq/Channel.h>
4950
#include <functional>
5051
#include <fairmq/shmem/Message.h>
52+
#include <fairmq/Device.h>
5153
#include <fmt/format.h>
5254
#include <fmt/ostream.h>
5355
#include <span>
@@ -70,7 +72,8 @@ constexpr int INVALID_INPUT = -1;
7072
DataRelayer::DataRelayer(const CompletionPolicy& policy,
7173
std::vector<InputRoute> const& routes,
7274
TimesliceIndex& index,
73-
ServiceRegistryRef services)
75+
ServiceRegistryRef services,
76+
int pipelineLength)
7477
: mContext{services},
7578
mTimesliceIndex{index},
7679
mCompletionPolicy{policy},
@@ -81,7 +84,17 @@ DataRelayer::DataRelayer(const CompletionPolicy& policy,
8184
std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
8285

8386
if (policy.configureRelayer == nullptr) {
84-
static int pipelineLength = DefaultsHelpers::pipelineLength();
87+
if (pipelineLength == -1) {
88+
auto getPipelineLengthHelper = [&services]() {
89+
try {
90+
return DefaultsHelpers::pipelineLength(*services.get<RawDeviceService>().device()->fConfig);
91+
} catch (...) {
92+
return DefaultsHelpers::pipelineLength(0);
93+
}
94+
};
95+
static int detectedPipelineLength = getPipelineLengthHelper();
96+
pipelineLength = detectedPipelineLength;
97+
}
8598
setPipelineLength(pipelineLength);
8699
} else {
87100
policy.configureRelayer(*this);

Framework/Core/src/DefaultsHelpers.cxx

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,30 +11,45 @@
1111

1212
#include "Framework/DefaultsHelpers.h"
1313
#include "Framework/DataTakingContext.h"
14+
#include "Framework/DeviceConfig.h"
15+
#include <fairmq/ProgOptions.h>
16+
1417
#include <cstdlib>
1518
#include <cstring>
1619
#include <stdexcept>
1720

1821
namespace o2::framework
1922
{
2023

21-
unsigned int DefaultsHelpers::pipelineLength()
24+
unsigned int DefaultsHelpers::pipelineLength(unsigned int minLength)
2225
{
2326
static bool override = getenv("DPL_DEFAULT_PIPELINE_LENGTH");
2427
if (override) {
2528
static unsigned int retval = atoi(getenv("DPL_DEFAULT_PIPELINE_LENGTH"));
26-
return retval;
29+
return std::max(minLength, retval);
2730
}
2831
DeploymentMode deploymentMode = DefaultsHelpers::deploymentMode();
2932
// just some reasonable numbers
3033
// The number should really be tuned at runtime for each processor.
3134
if (deploymentMode == DeploymentMode::OnlineDDS || deploymentMode == DeploymentMode::OnlineECS || deploymentMode == DeploymentMode::FST) {
32-
return 512;
35+
return std::max(minLength, 512u);
3336
} else {
34-
return 64;
37+
return std::max(minLength, 64u);
3538
}
3639
}
3740

41+
unsigned int DefaultsHelpers::pipelineLength(const DeviceConfig& dc)
42+
{
43+
static unsigned int minLength = dc.options.count("timeframes-rate-limit") ? std::max(0, atoi(dc.options["timeframes-rate-limit"].as<std::string>().c_str())) : 0;
44+
return pipelineLength(minLength);
45+
}
46+
47+
unsigned int DefaultsHelpers::pipelineLength(const fair::mq::ProgOptions& options)
48+
{
49+
static unsigned int minLength = options.Count("timeframes-rate-limit") ? std::max(0, atoi(options.GetValue<std::string>("timeframes-rate-limit").c_str())) : 0;
50+
return pipelineLength(minLength);
51+
}
52+
3853
static DeploymentMode getDeploymentMode_internal()
3954
{
4055
char* explicitMode = getenv("O2_DPL_DEPLOYMENT_MODE");

Framework/Core/src/runDataProcessing.cxx

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -817,7 +817,8 @@ void spawnDevice(uv_loop_t* loop,
817817
.sendInitialValue = true,
818818
});
819819

820-
for (size_t i = 0; i < DefaultsHelpers::pipelineLength(); ++i) {
820+
unsigned int pipelineLength = DefaultsHelpers::pipelineLength(DeviceConfig{varmap});
821+
for (size_t i = 0; i < pipelineLength; ++i) {
821822
allStates.back().registerState(DataProcessingStates::StateSpec{
822823
.name = fmt::format("matcher_variables/{}", i),
823824
.stateId = static_cast<short>((short)(ProcessingStateId::CONTEXT_VARIABLES_BASE) + i),
@@ -826,7 +827,7 @@ void spawnDevice(uv_loop_t* loop,
826827
});
827828
}
828829

829-
for (size_t i = 0; i < DefaultsHelpers::pipelineLength(); ++i) {
830+
for (size_t i = 0; i < pipelineLength; ++i) {
830831
allStates.back().registerState(DataProcessingStates::StateSpec{
831832
.name = fmt::format("data_relayer/{}", i),
832833
.stateId = static_cast<short>((short)(ProcessingStateId::DATA_RELAYER_BASE) + i),

Framework/Core/test/benchmark_DataRelayer.cxx

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ static void BM_RelaySingleSlot(benchmark::State& state)
6565
TimesliceIndex index{1, infos};
6666
auto policy = CompletionPolicyHelpers::consumeWhenAny();
6767
ServiceRegistry registry;
68-
DataRelayer relayer(policy, inputs, index, {registry});
68+
DataRelayer relayer(policy, inputs, index, {registry}, -1);
6969
relayer.setPipelineLength(4);
7070

7171
// Let's create a dummy O2 Message with two headers in the stack:
@@ -118,7 +118,7 @@ static void BM_RelayMultipleSlots(benchmark::State& state)
118118

119119
auto policy = CompletionPolicyHelpers::consumeWhenAny();
120120
ServiceRegistry registry;
121-
DataRelayer relayer(policy, inputs, index, {registry});
121+
DataRelayer relayer(policy, inputs, index, {registry}, -1);
122122
relayer.setPipelineLength(4);
123123

124124
// Let's create a dummy O2 Message with two headers in the stack:
@@ -177,7 +177,7 @@ static void BM_RelayMultipleRoutes(benchmark::State& state)
177177

178178
auto policy = CompletionPolicyHelpers::consumeWhenAny();
179179
ServiceRegistry registry;
180-
DataRelayer relayer(policy, inputs, index, {registry});
180+
DataRelayer relayer(policy, inputs, index, {registry}, -1);
181181
relayer.setPipelineLength(4);
182182

183183
// Let's create a dummy O2 Message with two headers in the stack:
@@ -254,7 +254,7 @@ static void BM_RelaySplitParts(benchmark::State& state)
254254

255255
auto policy = CompletionPolicyHelpers::consumeWhenAny();
256256
ServiceRegistry registry;
257-
DataRelayer relayer(policy, inputs, index, {registry});
257+
DataRelayer relayer(policy, inputs, index, {registry}, -1);
258258
relayer.setPipelineLength(4);
259259

260260
// Let's create a dummy O2 Message with two headers in the stack:
@@ -314,7 +314,7 @@ static void BM_RelayMultiplePayloads(benchmark::State& state)
314314

315315
auto policy = CompletionPolicyHelpers::consumeWhenAny();
316316
ServiceRegistry registry;
317-
DataRelayer relayer(policy, inputs, index, {registry});
317+
DataRelayer relayer(policy, inputs, index, {registry}, -1);
318318
relayer.setPipelineLength(4);
319319

320320
// DataHeader matching the one provided in the input

Framework/Core/test/test_DataRelayer.cxx

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ TEST_CASE("DataRelayer")
8383
ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
8484

8585
auto policy = CompletionPolicyHelpers::consumeWhenAny();
86-
DataRelayer relayer(policy, inputs, index, {registry});
86+
DataRelayer relayer(policy, inputs, index, {registry}, -1);
8787
relayer.setPipelineLength(4);
8888

8989
// Let's create a dummy O2 Message with two headers in the stack:
@@ -133,7 +133,7 @@ TEST_CASE("DataRelayer")
133133
ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
134134

135135
auto policy = CompletionPolicyHelpers::consumeWhenAny();
136-
DataRelayer relayer(policy, inputs, index, {registry});
136+
DataRelayer relayer(policy, inputs, index, {registry}, -1);
137137
relayer.setPipelineLength(4);
138138

139139
// Let's create a dummy O2 Message with two headers in the stack:
@@ -195,7 +195,7 @@ TEST_CASE("DataRelayer")
195195
ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
196196

197197
auto policy = CompletionPolicyHelpers::consumeWhenAll();
198-
DataRelayer relayer(policy, inputs, index, {registry});
198+
DataRelayer relayer(policy, inputs, index, {registry}, -1);
199199
relayer.setPipelineLength(4);
200200

201201
auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
@@ -276,7 +276,7 @@ TEST_CASE("DataRelayer")
276276
ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
277277

278278
auto policy = CompletionPolicyHelpers::consumeWhenAll();
279-
DataRelayer relayer(policy, inputs, index, {registry});
279+
DataRelayer relayer(policy, inputs, index, {registry}, -1);
280280
relayer.setPipelineLength(3);
281281

282282
auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
@@ -359,7 +359,7 @@ TEST_CASE("DataRelayer")
359359
std::vector<InputChannelInfo> infos{1};
360360
TimesliceIndex index{1, infos};
361361
ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
362-
DataRelayer relayer(policy, inputs, index, {registry});
362+
DataRelayer relayer(policy, inputs, index, {registry}, -1);
363363
// Only two messages to fill the cache.
364364
relayer.setPipelineLength(2);
365365

@@ -437,7 +437,7 @@ TEST_CASE("DataRelayer")
437437
ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
438438

439439
auto policy = CompletionPolicyHelpers::processWhenAny();
440-
DataRelayer relayer(policy, inputs, index, {registry});
440+
DataRelayer relayer(policy, inputs, index, {registry}, -1);
441441
// Only two messages to fill the cache.
442442
relayer.setPipelineLength(2);
443443

@@ -509,7 +509,7 @@ TEST_CASE("DataRelayer")
509509
ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
510510

511511
auto policy = CompletionPolicyHelpers::processWhenAny();
512-
DataRelayer relayer(policy, inputs, index, {registry});
512+
DataRelayer relayer(policy, inputs, index, {registry}, -1);
513513
// Only two messages to fill the cache.
514514
relayer.setPipelineLength(3);
515515

@@ -568,7 +568,7 @@ TEST_CASE("DataRelayer")
568568
ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
569569

570570
auto policy = CompletionPolicyHelpers::processWhenAny();
571-
DataRelayer relayer(policy, inputs, index, {registry});
571+
DataRelayer relayer(policy, inputs, index, {registry}, -1);
572572
// Only two messages to fill the cache.
573573
relayer.setPipelineLength(1);
574574

@@ -629,7 +629,7 @@ TEST_CASE("DataRelayer")
629629
ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
630630

631631
auto policy = CompletionPolicyHelpers::processWhenAny();
632-
DataRelayer relayer(policy, inputs, index, {registry});
632+
DataRelayer relayer(policy, inputs, index, {registry}, -1);
633633
// Only two messages to fill the cache.
634634
relayer.setPipelineLength(1);
635635

@@ -698,7 +698,7 @@ TEST_CASE("DataRelayer")
698698
ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
699699

700700
auto policy = CompletionPolicyHelpers::consumeWhenAny();
701-
DataRelayer relayer(policy, inputs, index, {registry});
701+
DataRelayer relayer(policy, inputs, index, {registry}, -1);
702702
relayer.setPipelineLength(4);
703703

704704
DataHeader dh{"CLUSTERS", "TPC", 0};
@@ -752,7 +752,7 @@ TEST_CASE("DataRelayer")
752752
ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
753753

754754
auto policy = CompletionPolicyHelpers::consumeWhenAny();
755-
DataRelayer relayer(policy, inputs, index, {registry});
755+
DataRelayer relayer(policy, inputs, index, {registry}, -1);
756756
relayer.setPipelineLength(4);
757757

758758
auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");

0 commit comments

Comments
 (0)