@@ -602,141 +602,142 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
602602 // replace origins in Preslice declarations
603603 homogeneous_apply_refs_sized<numElements>([&newOrigin](auto & element) { return analysis_task_parsers::replaceOrigin (element, newOrigin); }, *task.get ());
604604
605- auto algo = AlgorithmSpec::InitCallback{[task = task, expressionInfos, inputInfos, newOrigin, newOriginStr](InitContext& ic) mutable {
606- Cache bindingsKeys;
607- Cache bindingsKeysUnsorted;
608- // add preslice declarations to slicing cache definition
609- homogeneous_apply_refs_sized<numElements>([&bindingsKeys, &bindingsKeysUnsorted](auto & element) { return analysis_task_parsers::registerCache (element, bindingsKeys, bindingsKeysUnsorted); }, *task.get ());
610-
611- homogeneous_apply_refs_sized<numElements>([&ic](auto && element) { return analysis_task_parsers::prepareOption (ic, element); }, *task.get ());
612- homogeneous_apply_refs_sized<numElements>([&ic](auto && element) { return analysis_task_parsers::prepareService (ic, element); }, *task.get ());
613-
614- auto & callbacks = ic.services ().get <CallbackService>();
615- auto eoscb = [task](EndOfStreamContext& eosContext) {
616- homogeneous_apply_refs_sized<numElements>([&eosContext](auto & element) {
605+ auto algo = AlgorithmSpec::InitCallback{
606+ [task = task, expressionInfos, inputInfos, newOrigin, newOriginStr](InitContext& ic) mutable {
607+ Cache bindingsKeys;
608+ Cache bindingsKeysUnsorted;
609+ // add preslice declarations to slicing cache definition
610+ homogeneous_apply_refs_sized<numElements>([&bindingsKeys, &bindingsKeysUnsorted](auto & element) { return analysis_task_parsers::registerCache (element, bindingsKeys, bindingsKeysUnsorted); }, *task.get ());
611+
612+ homogeneous_apply_refs_sized<numElements>([&ic](auto && element) { return analysis_task_parsers::prepareOption (ic, element); }, *task.get ());
613+ homogeneous_apply_refs_sized<numElements>([&ic](auto && element) { return analysis_task_parsers::prepareService (ic, element); }, *task.get ());
614+
615+ auto & callbacks = ic.services ().get <CallbackService>();
616+ auto eoscb = [task](EndOfStreamContext& eosContext) {
617+ homogeneous_apply_refs_sized<numElements>([&eosContext](auto & element) {
617618 analysis_task_parsers::postRunService (eosContext, element);
618619 analysis_task_parsers::postRunOutput (eosContext, element);
619620 return true ; },
620- *task.get ());
621- eosContext.services ().get <ControlService>().readyToQuit (QuitRequest::Me);
622- };
621+ *task.get ());
622+ eosContext.services ().get <ControlService>().readyToQuit (QuitRequest::Me);
623+ };
623624
624- callbacks.set <CallbackService::Id::EndOfStream>(eoscb);
625+ callbacks.set <CallbackService::Id::EndOfStream>(eoscb);
625626
626- // / call the task's init() function first as it may manipulate the task's elements
627- if constexpr (requires { task->init (ic); }) {
628- task->init (ic);
629- }
630-
631- // / update configurables in filters and partitions
632- homogeneous_apply_refs_sized<numElements>(
633- [&ic](auto & element) -> bool { return analysis_task_parsers::updatePlaceholders (ic, element); },
634- *task.get ());
635- // / create expression trees for filters gandiva trees matched to schemas and store the pointers into expressionInfos
636- homogeneous_apply_refs_sized<numElements>([&expressionInfos](auto & element) {
637- return analysis_task_parsers::createExpressionTrees (expressionInfos, element);
638- },
639- *task.get ());
627+ // / call the task's init() function first as it may manipulate the task's elements
628+ if constexpr (requires { task->init (ic); }) {
629+ task->init (ic);
630+ }
640631
641- // / parse process functions to enable requested grouping caches - note that at this state process configurables have their final values
642- if constexpr ( requires { &T::process; }) {
643- AnalysisDataProcessorBuilder::cacheFromArgs (&T::process, true , bindingsKeys, bindingsKeysUnsorted);
644- }
645- homogeneous_apply_refs_sized<numElements>(
646- [&bindingsKeys, &bindingsKeysUnsorted ](auto & x ) {
647- return AnalysisDataProcessorBuilder::requestCacheFromArgs (x, bindingsKeys, bindingsKeysUnsorted );
632+ // / update configurables in filters and partitions
633+ homogeneous_apply_refs_sized<numElements>(
634+ [&ic]( auto & element) -> bool { return analysis_task_parsers::updatePlaceholders (ic, element); },
635+ *task. get ());
636+ // / create expression trees for filters gandiva trees matched to schemas and store the pointers into expressionInfos
637+ homogeneous_apply_refs_sized<numElements>([&expressionInfos ](auto & element ) {
638+ return analysis_task_parsers::createExpressionTrees (expressionInfos, element );
648639 },
649- *task.get ());
640+ *task.get ());
650641
651- // / replace origin in slicing caches
652- std::ranges::transform (bindingsKeys, bindingsKeys.begin (), [&newOrigin](Entry& entry) {
653- if ((entry.matcher .origin == header::DataOrigin{" AOD" }) && (newOrigin != header::DataOrigin{" AOD" })) {
654- entry.matcher = replaceOrigin (entry.matcher , newOrigin);
655- }
656- return entry;
657- });
658- std::ranges::transform (bindingsKeysUnsorted, bindingsKeysUnsorted.begin (), [&newOrigin](Entry& entry) {
659- if ((entry.matcher .origin == header::DataOrigin{" AOD" }) && (newOrigin != header::DataOrigin{" AOD" })) {
660- entry.matcher = replaceOrigin (entry.matcher , newOrigin);
642+ // / parse process functions to enable requested grouping caches - note that at this state process configurables have their final values
643+ if constexpr (requires { &T::process; }) {
644+ AnalysisDataProcessorBuilder::cacheFromArgs (&T::process, true , bindingsKeys, bindingsKeysUnsorted);
661645 }
662- return entry;
663- });
646+ homogeneous_apply_refs_sized<numElements>(
647+ [&bindingsKeys, &bindingsKeysUnsorted](auto & x) {
648+ return AnalysisDataProcessorBuilder::requestCacheFromArgs (x, bindingsKeys, bindingsKeysUnsorted);
649+ },
650+ *task.get ());
664651
665- ic.services ().get <ArrowTableSlicingCacheDef>().setCaches (std::move (bindingsKeys));
666- ic.services ().get <ArrowTableSlicingCacheDef>().setCachesUnsorted (std::move (bindingsKeysUnsorted));
667- ic.services ().get <ArrowTableSlicingCacheDef>().setOrigin (newOrigin);
652+ // / replace origin in slicing caches
653+ std::ranges::transform (bindingsKeys, bindingsKeys.begin (), [&newOrigin](Entry& entry) {
654+ if ((entry.matcher .origin == header::DataOrigin{" AOD" }) && (newOrigin != header::DataOrigin{" AOD" })) {
655+ entry.matcher = replaceOrigin (entry.matcher , newOrigin);
656+ }
657+ return entry;
658+ });
659+ std::ranges::transform (bindingsKeysUnsorted, bindingsKeysUnsorted.begin (), [&newOrigin](Entry& entry) {
660+ if ((entry.matcher .origin == header::DataOrigin{" AOD" }) && (newOrigin != header::DataOrigin{" AOD" })) {
661+ entry.matcher = replaceOrigin (entry.matcher , newOrigin);
662+ }
663+ return entry;
664+ });
665+
666+ ic.services ().get <ArrowTableSlicingCacheDef>().setCaches (std::move (bindingsKeys));
667+ ic.services ().get <ArrowTableSlicingCacheDef>().setCachesUnsorted (std::move (bindingsKeysUnsorted));
668+ ic.services ().get <ArrowTableSlicingCacheDef>().setOrigin (newOrigin);
668669#if (FAIRMQ_VERSION_DEC >= 111000)
669- PointerReconstructor pointerReconstructor (nullptr );
670- bool hasCCDBTables = !ic.services ().get <DanglingEdgesContext>().requestedTIMs .empty ();
671-
672- return [task, expressionInfos, inputInfos, newOrigin, hasCCDBTables, pointerReconstructor](ProcessingContext& pc) mutable {
673- if (hasCCDBTables && (!pointerReconstructor)) {
674- auto & proxy = pc.services ().get <FairMQDeviceProxy>();
675- auto & spec = pc.services ().get <DanglingEdgesContext>().requestedTIMs .front ();
676- pointerReconstructor = proxy.getShmPointerReconstructor (spec, 0 );
677- }
670+ PointerReconstructor pointerReconstructor (nullptr );
671+ bool hasCCDBTables = !ic.services ().get <DanglingEdgesContext>().requestedTIMs .empty ();
672+
673+ return [task, expressionInfos, inputInfos, newOrigin, hasCCDBTables, pointerReconstructor](ProcessingContext& pc) mutable {
674+ if (hasCCDBTables && (!pointerReconstructor)) {
675+ auto & proxy = pc.services ().get <FairMQDeviceProxy>();
676+ auto & spec = pc.services ().get <DanglingEdgesContext>().requestedTIMs .front ();
677+ pointerReconstructor = proxy.getShmPointerReconstructor (spec, 0 );
678+ }
678679#else
679- return [task, expressionInfos, inputInfos, newOrigin, hasCCDBTables](ProcessingContext& pc) mutable {
680+ return [task, expressionInfos, inputInfos, newOrigin, hasCCDBTables](ProcessingContext& pc) mutable {
680681#endif
681- // load the ccdb object from their cache
682- homogeneous_apply_refs_sized<numElements>([&pc](auto & element) { return analysis_task_parsers::newDataframeCondition (pc.inputs (), element); }, *task.get ());
683- // reset partitions once per dataframe
684- homogeneous_apply_refs_sized<numElements>([](auto & element) { return analysis_task_parsers::newDataframePartition (element); }, *task.get ());
685- // reset selections for the next dataframe
686- std::ranges::for_each (expressionInfos, [](auto & info) { info.resetSelection = true ; });
687- // reset pre-slice for the next dataframe
688- auto & slices = pc.services ().get <ArrowTableSlicingCache>();
689- homogeneous_apply_refs_sized<numElements>([&slices](auto & element) {
690- return analysis_task_parsers::updateSliceInfo (element, slices);
691- },
692- *(task.get ()));
693- // initialize local caches
694- homogeneous_apply_refs_sized<numElements>([&pc](auto & element) { return analysis_task_parsers::initializeCache (pc, element); }, *(task.get ()));
695- // prepare outputs
696- homogeneous_apply_refs_sized<numElements>([&pc](auto & element) { return analysis_task_parsers::prepareOutput (pc, element); }, *task.get ());
697- // execute run()
698- if constexpr (requires { task->run (pc); }) {
699- task->run (pc);
700- }
701- // execute process()
702- if constexpr (requires { &T::process; }) {
703- auto loc = std::ranges::find_if (inputInfos, [](auto const & info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId<decltype (&T::process)>(); });
704- auto matchers = loc == inputInfos.end () ? std::vector<std::pair<int , ConcreteDataMatcher>>{} : loc->matchers ;
682+ // load the ccdb object from their cache
683+ homogeneous_apply_refs_sized<numElements>([&pc](auto & element) { return analysis_task_parsers::newDataframeCondition (pc.inputs (), element); }, *task.get ());
684+ // reset partitions once per dataframe
685+ homogeneous_apply_refs_sized<numElements>([](auto & element) { return analysis_task_parsers::newDataframePartition (element); }, *task.get ());
686+ // reset selections for the next dataframe
687+ std::ranges::for_each (expressionInfos, [](auto & info) { info.resetSelection = true ; });
688+ // reset pre-slice for the next dataframe
689+ auto & slices = pc.services ().get <ArrowTableSlicingCache>();
690+ homogeneous_apply_refs_sized<numElements>([&slices](auto & element) {
691+ return analysis_task_parsers::updateSliceInfo (element, slices);
692+ },
693+ *(task.get ()));
694+ // initialize local caches
695+ homogeneous_apply_refs_sized<numElements>([&pc](auto & element) { return analysis_task_parsers::initializeCache (pc, element); }, *(task.get ()));
696+ // prepare outputs
697+ homogeneous_apply_refs_sized<numElements>([&pc](auto & element) { return analysis_task_parsers::prepareOutput (pc, element); }, *task.get ());
698+ // execute run()
699+ if constexpr (requires { task->run (pc); }) {
700+ task->run (pc);
701+ }
702+ // execute process()
703+ if constexpr (requires { &T::process; }) {
704+ auto loc = std::ranges::find_if (inputInfos, [](auto const & info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId<decltype (&T::process)>(); });
705+ auto matchers = loc == inputInfos.end () ? std::vector<std::pair<int , ConcreteDataMatcher>>{} : loc->matchers ;
705706#if (FAIRMQ_VERSION_DEC >= 111000)
706- AnalysisDataProcessorBuilder::invokeProcess (*(task.get ()), pc.inputs (), matchers, pointerReconstructor, &T::process, expressionInfos, slices, newOrigin);
707+ AnalysisDataProcessorBuilder::invokeProcess (*(task.get ()), pc.inputs (), matchers, pointerReconstructor, &T::process, expressionInfos, slices, newOrigin);
707708#else
708- AnalysisDataProcessorBuilder::invokeProcess (*(task.get ()), pc.inputs (), matchers, &T::process, expressionInfos, slices, newOrigin);
709+ AnalysisDataProcessorBuilder::invokeProcess (*(task.get ()), pc.inputs (), matchers, &T::process, expressionInfos, slices, newOrigin);
709710#endif
710- }
711- // execute optional process()
712- homogeneous_apply_refs_sized<numElements>(
711+ }
712+ // execute optional process()
713+ homogeneous_apply_refs_sized<numElements>(
713714#if (FAIRMQ_VERSION_DEC >= 111000)
714- [&pc, &expressionInfos, &task, &slices, &inputInfos, &newOrigin, &pointerReconstructor](auto & x) {
715+ [&pc, &expressionInfos, &task, &slices, &inputInfos, &newOrigin, &pointerReconstructor](auto & x) {
715716#else
716- [&pc, &expressionInfos, &task, &slices, &inputInfos, &newOrigin](auto & x) {
717+ [&pc, &expressionInfos, &task, &slices, &inputInfos, &newOrigin](auto & x) {
717718#endif
718- if constexpr (is_process_configurable<decltype (x)>) {
719- if (x.value == true ) {
720- auto loc = std::ranges::find_if (inputInfos, [](auto const & info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId<decltype (x.process )>(); });
721- auto matchers = loc == inputInfos.end () ? std::vector<std::pair<int , ConcreteDataMatcher>>{} : loc->matchers ;
719+ if constexpr (is_process_configurable<decltype (x)>) {
720+ if (x.value == true ) {
721+ auto loc = std::ranges::find_if (inputInfos, [](auto const & info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId<decltype (x.process )>(); });
722+ auto matchers = loc == inputInfos.end () ? std::vector<std::pair<int , ConcreteDataMatcher>>{} : loc->matchers ;
722723#if (FAIRMQ_VERSION_DEC >= 111000)
723- AnalysisDataProcessorBuilder::invokeProcess (*task.get (), pc.inputs (), matchers, pointerReconstructor, x.process , expressionInfos, slices, newOrigin);
724+ AnalysisDataProcessorBuilder::invokeProcess (*task.get (), pc.inputs (), matchers, pointerReconstructor, x.process , expressionInfos, slices, newOrigin);
724725#else
725- AnalysisDataProcessorBuilder::invokeProcess (*task.get (), pc.inputs (), matchers, x.process , expressionInfos, slices, newOrigin);
726+ AnalysisDataProcessorBuilder::invokeProcess (*task.get (), pc.inputs (), matchers, x.process , expressionInfos, slices, newOrigin);
726727#endif
727- return true ;
728+ return true ;
729+ }
730+ return false ;
728731 }
729732 return false ;
730- }
731- return false ;
732- },
733- *task.get ());
734- // prepare delayed outputs
735- homogeneous_apply_refs_sized<numElements>([&pc](auto & element) { return analysis_task_parsers::prepareDelayedOutput (pc, element); }, *task.get ());
736- // finalize outputs
737- homogeneous_apply_refs_sized<numElements>([&pc](auto & element) { return analysis_task_parsers::finalizeOutput (pc, element); }, *task.get ());
738- };
739- }
733+ },
734+ *task.get ());
735+ // prepare delayed outputs
736+ homogeneous_apply_refs_sized<numElements>([&pc](auto & element) { return analysis_task_parsers::prepareDelayedOutput (pc, element); }, *task.get ());
737+ // finalize outputs
738+ homogeneous_apply_refs_sized<numElements>([&pc](auto & element) { return analysis_task_parsers::finalizeOutput (pc, element); }, *task.get ());
739+ };
740+ }
740741 };
741742
742743 return {
0 commit comments