From 592ff644f6bf6eb79493209005c2fc415c93ebe1 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Mon, 9 Feb 2026 01:48:57 -0800 Subject: [PATCH 01/28] init --- .../controller/WorkflowScheduler.scala | 7 +- .../execution/RegionExecution.scala | 2 - .../execution/WorkflowExecution.scala | 6 +- .../architecture/scheduling/Schedule.scala | 9 +++ .../promisehandlers/AssignPortHandler.scala | 3 +- .../InitializeExecutorHandler.scala | 1 + .../workflow/WorkflowExecutionsResource.scala | 2 + .../texera/amber/operator/LogicalOp.scala | 31 +++------ .../amber/operator/loop/LoopEndOpDesc.scala | 53 +++++++++++++++ .../amber/operator/loop/LoopEndOpExec.scala | 8 +++ .../amber/operator/loop/LoopStartOpDesc.scala | 62 ++++++++++++++++++ .../amber/operator/loop/LoopStartOpExec.scala | 44 +++++++++++++ .../workflow-editor.component.ts | 1 - .../src/assets/operator_images/LoopEnd.png | Bin 0 -> 5865 bytes .../src/assets/operator_images/LoopStart.png | Bin 0 -> 2138 bytes 15 files changed, 195 insertions(+), 34 deletions(-) create mode 100644 common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala create mode 100644 common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala create mode 100644 common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala create mode 100644 common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpExec.scala create mode 100644 frontend/src/assets/operator_images/LoopEnd.png create mode 100644 frontend/src/assets/operator_images/LoopStart.png diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala index 9dcf3ad4bfc..e2239f99a9b 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala @@ -52,6 +52,9 @@ class WorkflowScheduler( this.physicalPlan = updatedPhysicalPlan } - def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else schedule.next() - + def getNextRegions: Set[Region] = { + val region : Set[Region] = if (!schedule.hasNext) Set() else schedule.loopNext() + println("current Region: " + region) + region + } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala index d5939c2e3b1..e905c2b0449 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala @@ -59,8 +59,6 @@ case class RegionExecution(region: Region) { physicalOpId: PhysicalOpIdentity, inheritOperatorExecution: Option[OperatorExecution] = None ): OperatorExecution = { - assert(!operatorExecutions.contains(physicalOpId), "OperatorExecution already exists.") - operatorExecutions.getOrElseUpdate( physicalOpId, inheritOperatorExecution diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala index dea9b692a4f..31409b180af 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala @@ -44,11 +44,7 @@ case class WorkflowExecution() { * @throws AssertionError if the `RegionExecution` has already been initialized. */ def initRegionExecution(region: Region): RegionExecution = { - // ensure the region execution hasn't been initialized already. - assert( - !regionExecutions.contains(region.id), - s"RegionExecution of ${region.id} already initialized." - ) + regionExecutions.remove(region.id) regionExecutions.getOrElseUpdate(region.id, RegionExecution(region)) } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala index 6f34c9ed1e5..d0ba5268091 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala @@ -21,6 +21,7 @@ package org.apache.texera.amber.engine.architecture.scheduling case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterator[Set[Region]] { private var currentLevel = levelSets.keys.minOption.getOrElse(0) + private var loopStartLevel = currentLevel def getRegions: List[Region] = levelSets.values.flatten.toList @@ -28,6 +29,14 @@ case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterat override def next(): Set[Region] = { val regions = levelSets(currentLevel) + if(regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopStart-operator-")))) loopStartLevel = currentLevel + currentLevel += 1 + regions + } + + def loopNext(): Set[Region] = { + val regions = levelSets(currentLevel) + if(regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopEnd-operator-")))) currentLevel = loopStartLevel currentLevel += 1 regions } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala index fe959733abb..1cc725dff83 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala @@ -41,6 +41,7 @@ trait AssignPortHandler { this: DataProcessorRPCHandlerInitializer => override def assignPort(msg: AssignPortRequest, ctx: AsyncRPCContext): Future[EmptyReturn] = { + println("ergergerge") val schema = Schema.fromRawSchema(msg.schema) if (msg.input) { val inputPortURIStrs = msg.storageUris.toList @@ -55,7 +56,7 @@ trait AssignPortHandler { // Same as AddInputChannelHandler dp.inputGateway.getChannel(channelId).setPortId(msg.portId) dp.inputManager.getPort(msg.portId).channels.add(channelId) - dp.stateManager.assertState(READY, RUNNING, PAUSED) + //dp.stateManager.assertState(READY, RUNNING, PAUSED) } } else { val storageURIOption: Option[URI] = msg.storageUris.head match { diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala index bf45d8eff9a..cc1a32594b6 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala @@ -39,6 +39,7 @@ trait InitializeExecutorHandler { req: InitializeExecutorRequest, ctx: AsyncRPCContext ): Future[EmptyReturn] = { + println(s"Initializing executor with request: $req") dp.serializationManager.setOpInitialization(req) val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId) val workerCount = req.totalWorkerCount diff --git a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala index 72fb1c364e5..92582afdd2b 100644 --- a/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala +++ b/amber/src/main/scala/org/apache/texera/web/resource/dashboard/user/workflow/WorkflowExecutionsResource.scala @@ -247,6 +247,8 @@ object WorkflowExecutionsResource { OPERATOR_PORT_EXECUTIONS.RESULT_URI ) .values(eid.id.toInt, globalPortId.serializeAsString, uri.toString) + .onConflict() + .doNothing() .execute() } diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala index eb319a82d1d..ee57514212b 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala @@ -24,15 +24,8 @@ import com.fasterxml.jackson.annotation._ import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle import org.apache.texera.amber.core.executor.OperatorExecutor import org.apache.texera.amber.core.tuple.Schema -import org.apache.texera.amber.core.virtualidentity.{ - ExecutionIdentity, - OperatorIdentity, - WorkflowIdentity -} -import org.apache.texera.amber.core.workflow.WorkflowContext.{ - DEFAULT_EXECUTION_ID, - DEFAULT_WORKFLOW_ID -} +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, OperatorIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.WorkflowContext.{DEFAULT_EXECUTION_ID, DEFAULT_WORKFLOW_ID} import org.apache.texera.amber.core.workflow.{PhysicalOp, PhysicalPlan, PortIdentity} import org.apache.texera.amber.operator.aggregate.AggregateOpDesc import org.apache.texera.amber.operator.cartesianProduct.CartesianProductOpDesc @@ -42,22 +35,14 @@ import org.apache.texera.amber.operator.distinct.DistinctOpDesc import org.apache.texera.amber.operator.dummy.DummyOpDesc import org.apache.texera.amber.operator.filter.SpecializedFilterOpDesc import org.apache.texera.amber.operator.hashJoin.HashJoinOpDesc -import org.apache.texera.amber.operator.huggingFace.{ - HuggingFaceIrisLogisticRegressionOpDesc, - HuggingFaceSentimentAnalysisOpDesc, - HuggingFaceSpamSMSDetectionOpDesc, - HuggingFaceTextSummarizationOpDesc -} +import org.apache.texera.amber.operator.huggingFace.{HuggingFaceIrisLogisticRegressionOpDesc, HuggingFaceSentimentAnalysisOpDesc, HuggingFaceSpamSMSDetectionOpDesc, HuggingFaceTextSummarizationOpDesc} import org.apache.texera.amber.operator.ifStatement.IfOpDesc import org.apache.texera.amber.operator.intersect.IntersectOpDesc import org.apache.texera.amber.operator.intervalJoin.IntervalJoinOpDesc import org.apache.texera.amber.operator.keywordSearch.KeywordSearchOpDesc import org.apache.texera.amber.operator.limit.LimitOpDesc import org.apache.texera.amber.operator.machineLearning.Scorer.MachineLearningScorerOpDesc -import org.apache.texera.amber.operator.machineLearning.sklearnAdvanced.KNNTrainer.{ - SklearnAdvancedKNNClassifierTrainerOpDesc, - SklearnAdvancedKNNRegressorTrainerOpDesc -} +import org.apache.texera.amber.operator.machineLearning.sklearnAdvanced.KNNTrainer.{SklearnAdvancedKNNClassifierTrainerOpDesc, SklearnAdvancedKNNRegressorTrainerOpDesc} import org.apache.texera.amber.operator.machineLearning.sklearnAdvanced.SVCTrainer.SklearnAdvancedSVCTrainerOpDesc import org.apache.texera.amber.operator.machineLearning.sklearnAdvanced.SVRTrainer.SklearnAdvancedSVRTrainerOpDesc import org.apache.texera.amber.operator.metadata.{OPVersion, OperatorInfo, PropertyNameConstants} @@ -71,10 +56,7 @@ import org.apache.texera.amber.operator.sleep.SleepOpDesc import org.apache.texera.amber.operator.sort.{SortOpDesc, StableMergeSortOpDesc} import org.apache.texera.amber.operator.sortPartitions.SortPartitionsOpDesc import org.apache.texera.amber.operator.source.apis.reddit.RedditSearchSourceOpDesc -import org.apache.texera.amber.operator.source.apis.twitter.v2.{ - TwitterFullArchiveSearchSourceOpDesc, - TwitterSearchSourceOpDesc -} +import org.apache.texera.amber.operator.source.apis.twitter.v2.{TwitterFullArchiveSearchSourceOpDesc, TwitterSearchSourceOpDesc} import org.apache.texera.amber.operator.source.fetcher.URLFetcherOpDesc import org.apache.texera.amber.operator.source.scan.FileScanSourceOpDesc import org.apache.texera.amber.operator.source.scan.arrow.ArrowSourceOpDesc @@ -137,6 +119,7 @@ import org.apache.texera.amber.operator.visualization.volcanoPlot.VolcanoPlotOpD import org.apache.texera.amber.operator.visualization.waterfallChart.WaterfallChartOpDesc import org.apache.texera.amber.operator.visualization.wordCloud.WordCloudOpDesc import org.apache.commons.lang3.builder.{EqualsBuilder, HashCodeBuilder, ToStringBuilder} +import org.apache.texera.amber.operator.loop.{LoopEndOpDesc, LoopStartOpDesc} import org.apache.texera.amber.operator.visualization.stripChart.StripChartOpDesc import java.util.UUID @@ -202,6 +185,8 @@ trait StateTransferFunc new Type(value = classOf[TypeCastingOpDesc], name = "TypeCasting"), new Type(value = classOf[LimitOpDesc], name = "Limit"), new Type(value = classOf[SleepOpDesc], name = "Sleep"), + new Type(value = classOf[LoopStartOpDesc], name = "LoopStart"), + new Type(value = classOf[LoopEndOpDesc], name = "LoopEnd"), new Type(value = classOf[RandomKSamplingOpDesc], name = "RandomKSampling"), new Type(value = classOf[ReservoirSamplingOpDesc], name = "ReservoirSampling"), new Type(value = classOf[HashJoinOpDesc[String]], name = "HashJoin"), diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala new file mode 100644 index 00000000000..f56068e9036 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.loop + +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PhysicalOp} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} + +class LoopEndOpDesc extends LogicalOp { + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithClassName("org.apache.texera.amber.operator.loop.LoopEndOpExec") + ) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withSuggestedWorkerNum(1) + } + + override def operatorInfo: OperatorInfo = + OperatorInfo( + "Loop End", + "Loop End", + OperatorGroupConstants.CONTROL_GROUP, + inputPorts = List(InputPort()), + outputPorts = List(OutputPort()) + ) +} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala new file mode 100644 index 00000000000..60f18cd5fc9 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala @@ -0,0 +1,8 @@ +package org.apache.texera.amber.operator.loop + +import org.apache.texera.amber.core.executor.OperatorExecutor +import org.apache.texera.amber.core.tuple.{Tuple, TupleLike} + +class LoopEndOpExec extends OperatorExecutor { + override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = Iterator(tuple) +} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala new file mode 100644 index 00000000000..4f3e86eb624 --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.loop + +import com.fasterxml.jackson.annotation.JsonProperty +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PhysicalOp} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} +import org.apache.texera.amber.util.JSONUtils.objectMapper + +class LoopStartOpDesc extends LogicalOp { + + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { + PhysicalOp + .oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithClassName( + "org.apache.texera.amber.operator.loop.LoopStartOpExec", + objectMapper.writeValueAsString(this) + ) + ) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withSuggestedWorkerNum(1) + .withParallelizable(false) + } + + override def operatorInfo: OperatorInfo = + OperatorInfo( + "Loop Start", + "Loop Start", + OperatorGroupConstants.CONTROL_GROUP, + inputPorts = List(InputPort()), + outputPorts = List(OutputPort()) + ) + +} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpExec.scala new file mode 100644 index 00000000000..1ff10c650cb --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpExec.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.loop + +import org.apache.texera.amber.core.executor.OperatorExecutor +import org.apache.texera.amber.core.tuple.{Tuple, TupleLike} +import org.apache.texera.amber.util.JSONUtils.objectMapper + +import scala.collection.mutable + +class LoopStartOpExec(descString: String) extends OperatorExecutor { + private val data = new mutable.ArrayBuffer[Tuple] + private var currentIteration = 0 + + override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = { + data.append(tuple) + Iterator.empty + } + + override def onFinish(port: Int): Iterator[TupleLike] = { + currentIteration += 1 + data.iterator + } + +} + + diff --git a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts index b23f92caf32..185b723b778 100644 --- a/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts +++ b/frontend/src/app/workspace/component/workflow-editor/workflow-editor.component.ts @@ -341,7 +341,6 @@ export class WorkflowEditorComponent implements OnInit, AfterViewInit, OnDestroy body: { fill: "rgba(158,158,158,0.2)", pointerEvents: "none", - visibility: "hidden", }, }, }, diff --git a/frontend/src/assets/operator_images/LoopEnd.png b/frontend/src/assets/operator_images/LoopEnd.png new file mode 100644 index 0000000000000000000000000000000000000000..ee0f9ab6faccd328c102f214a7547d3e34d8359b GIT binary patch literal 5865 zcmVPy0qe(P?J^Czx6vRgiNy(U0Jp~gW+wJeo>FF6J zv)#V8`#arz@9BT?$35r!zVkc3^qfB5Mles!2yn63hL)Czs8n_X^S=Vv1(x+u03U(x zw}GeyL>8ERFmHopZ36HFlo}CsI${gOizOdGK38`k{#z@ z0LKG37R*P3=tuxoiu(3&tdALqidki^V-aG#x}8IXROixtAl!S_|ebFwduQ`ESz7 zbjWWA!Ukv4O65`j7wZ5mr#j}LP?(eVvu>^l%f;g1u&m1fd@ZLWO2h2?;QMz`x%|g8 zl*iSXOTu!ocp!+b29XmNfTo?#?1K3=DwqG9Ck%2$Se`Ls0yb^B0l<}c0y*Qh-j24m zSu}U<#*Et;n3o&Evg2F~=9?0Ai{@d6bl9le!$dcd>(0}mE{=}O6P9iJSP*>=z$tMS zYUV~lU>TUNAtNe;3TxWfHkYHj3yLDwV#I#U*Bvu(0mQ2kYVYyhm7M68OmOyH1TYUdIDwl7~3I>@WY#2Q?4@75X1sr2X zhye5BAi6jU(P5b&EIZC=0J;F|X-JY;1XL(v2f6Og)3h*k!m{HiPt~1if?#ylguvzG zx_8wyBf-I`5|(XyE{HBka6Lwp4g{F*AgLzA@=ZJefa|EJsK{hR*vTAn) zvr1r8XZ5{7q#gf6nX9h83WX`u)%BM|<&GgtD_}u%716^WQrV|@dR`3@0)bb+yaLQ` zpj=)>ij;LetjJs}?hm4aK{Oe_VPWFqp>>K(XBzV=FrTL@`b!&O*>R2s^P@S51$YU- zQ}F#~FmBwlG=KiPy4b+er|*m{TTX;!oebd906w8hcinpWViC?E*L}Wj84)LHA#CWm zQg?Z~M6@f7VN0Oy8(UkuY5x44Gz=I?Cr_U~5nH#aq)Bx@eO6ZN8fFKxDiCs}v}a75 zu;H8aiHwrl6>_cRw#T%;`Ts|C%qXH zCfrK1XKzk@12q}IGiUCAjT^58aD4{udArGVznJ9G6DMpY+^t0SgXji{KS@b~FDWZH zk}=N$^H(MHRF$2u8#htHW+0G7m8-mp%H>C^0@)-3xL7;~mi2uAs$6JVPhtTxi4iso z5qu&oM0(wxcVokb%jltpDk<8KN!5J*`33B>lZvqVW{L)?>#o?LiHQ(42)+hhPAQ(X zM|~kvSJxvcZbmLtb4SOy!A(gzw+Skq_L#Uq;K>lywwI)mDZsoGt*z6uM_=89)n4R+c?}AMbE&KA_w_8)%o!a4?&vrQmC7R^+D98s zbr$QWi?Ct&pck|)4p^x|>r59FImE**I!md%5|zs7v~b~@ za@%DjD+KtWiw;1cpzhLjIT@Cb>#892$WP6L4XOwN_e9nZXP`O;5Piu3M+uYzy z%ls>i$Ae31shU_5VZ$h_H#Nn#z6;Ab)tp&tBU}=rgEOt~KdI$IOvTTRAAdm9sIJ-w z>o`BqR!o@aG;yXu5>9Cha^-C^)lf%q~72=*S-=8>I zZ``p7Yuh*UW?tSTiiYoh$>d;(BAlr3Fh}fXi3%MmjQK|LyjdgZ7@M$;vqrQlHqrf5 zDw#g9qPw5GaBgqEPdFQQExGQ9kz=D0mK|pbfJMnKHT^Y zMPKzBD26Fh$#oYGoj(>~Ws6}j&mzyeF~)+-)D#h5+g5Ed5Pn*R{b*Koqh_IllnmX_TcsXlA7Gd8GCyKUP#v7*XDUU0(+ zYunF)=me3CV4g#sH&aBm5tOzqzWo0|1dvCo_Auu;Oz(-*2q$45}AkS$UK*6 zmjNaJQiJmKeQlwnp1t~Oa&1po+r9`yWg)Fklk5JwkZdD1Wdzu9{#&e32y>ACT^(h7uwg|uzS-Z)C6@fKtLWQ$zZF1f6 zV8Y5}7PPeNX>zr)Mz7@U33IlpahXP?$S!WN5%!?I-R4*32VR4%94T_&r0 z+mLo~v3NQxt6OAos%}9@*b8A<|0SZNrDYcrJ0OB=CWVHv1@DL$=llQBM_9*E318O> zD0!7!_b>suMr4K&V8?j{z@Y*r-rPr6+kOy4QxjLfd@p(4MTyHb0yB*O+x9{bjak8^ zt_3hJ>L=_=AgZfL!kCIpW3O=k>X`?T{}k@8&eZRfE`DLs;ZFHL?1=i%K+S&sMJ1Trbw{> zsCf}7Rs2{T39f}@-68;liP91PH7_EiqPEm6flSe2Q=Kp`+!C=%5_I;46G)W3 zyop>_$u>_DA|S;p^M-9gGU-^d+#A-MVRh_Z)9H{q!gG@pR>g2Kg~n&eLUL3G%~&99b$COJLi_g6M_Bu`tnEi9?zvfpXDW zCLT6VJjjKuvhlDc;~*C#$di$x83zgowr!Ksl_y4I+*ZLD>-8pUHshP0NAq-K&6>Id zd1OXLZI-V~U@E_4bnEkJmVD(GIp+ajYHQ__b#+>;r?!@zzhRSXiMBdJu9X|E(^v=Lkl0d8+U0?dD? zHZI2Rdy(q~9TvCi<{jS0J`17~VhA67D)SujyqO}hjnLE(VB4MzqDw^#V1AA~@5FxD z!wD-_r$Fi4jFy(&P2Vn&txZK}7(1{Izz!k?`2KZNF5fyjVW&;|7%G)lMU*R{Lktkp zV+3nKK(Lr#pZ$fxp|oJZYoii2JQhf~X_fdMluCP<-UnLvB&sCH-D+=F-#imV6on0u z2g@FIBnSvwEM5i6x=qB0KmlJ#o_B|sd?Q#30%Dm3{nq>b)l@EjKMG+d!Lt4$xaxJ- zamRgW_Uz4qI*e#R1bF7m9q`_JZ}nCU+FeYI@9#(D^6ILD4PJtdvlPG?V#b*Iv0~8b z5G+-laFFfY80wY`R)Y=F;W5xu0@PTs4?G{CN%rWo6 zvQDM0E_K*9k46NzqvI&}{*wT97FWx>9^=O!Ky&AAjH0V1!iH~IslHyr?PXpAqR)}* zz7b_HW=wnp*l`X9^RGZOqH8q1hG7yeA=jN3Rc_6M4N3bHfK#F>i8*d1DwWe|;lej# z$}!WcAQ1K@ctYHh^`RZh$aOzm%f{44SgBS?qg$>%_RcbI+FG`@wz0vR6~)Q=Oh5nL{*m9 zICD>%2$WjyS+}?NWOCi#)w76Lgbm)ZvULo`T9fgJ)wAQ_9Bm$~p*F@$^N&r~@My3E zMEYX@n3tlpb-D?diO$?0kY(%E`C4KFhML9vICw4S@MVZ&u6b746A zVWz-c0KS>l<^VXITsO9>H!;G7q`eHlohgE{625+>ast=4dB5}fo zZ(7+R98uP$>K*_$W5R@6o2kM~P?2Th#%lpwFI+bu$}Wsp3?s_K(ZWvu@e)4RuqJ(%1h8UwOO=g~Defj$~ql4M?kU!H9mTDSkF8 z-#3GK0W8bSbNTw9OuO$pAetstOgJi{gU6}(R_4!F%gw4sw3;JI*zi1iJeZ#V(Ju8G zDLC(G06)Q|O^fJ(2mT?bClS$MYoVzCz5?Kri3m+n2=hB2QrB0{Cn7BXQ5~6jL?%9%S$9hqCjpRS?!#EGZ;7SS)FIz@JxClEbC+dD!BO*Q8zQSaq1XAbdKbkwSR5e2s`LZ`*4On z#G>x;t6^K8SHQdi%x|DvUKQuoA^55<7WW6y!62Fp;4ly!3E%&v|=H`>}R zNcx_&n%Yzd+iw)xcKUAIY6zL%ha-@%yR{aGQzdNpF_w0~Of7hsu|Q&hEcJkyI$=Z7 zs?bDLnkbEU+Inzn#yKjUwIhiT!8qV$g0LZJ_XP92R5BOhAlM9Z`|i*$jvIWP1XP(J z?7+D;y&SP5@N7I8F>|nz1|)NY4d1rPGxo#WUwtLLTRP>bD=VUJNqeqpT~a0qJMgyE zU96*4^cr=1okcEmz>#Uf25;NGvPj?4Sj$TX?pf8b49wTmT0ThUVlq$Ifw!&WT-@t9 zx+y2I2s-yLzOKGlfE#P6A{bw3eG+m**x+s3=OMd6TT$iuFwNVn+tJoGOSi|WUXXG{ z*g-c52ZHEoO+6Rt#WvgXm|Za67NtykmKKsr!VV(t;jk>_DKq_QqqFKh`2JnGqN}2t z6>Vg$2|GaCzUFr-r%6=@H_ylj%tN6thZZb&EzbZo1YrlgarXexbP#m};d|y;3+65` z&mXOKYu+GgD8dc~pB-lkfUh@c2xB*Zd&zYd=Z%on>}^=W4tncO0x%UsUv6m6net3A z{}jL?DwkJR+xWcx-tdIo?yalNx?8uN4I-7)cS2q_ZNP5k=fK>J*4D@Jem0G|7fnUj zAx8!k9rjreoeJPnIXUyHl$_s!`AHD{It$TLQHhq+xTYoSPyoX&D#rsj7R*P734vBp zK-VG14-y}VmIF|oR-QLqbTkywsAHotF@-xjEv#PsaR8HHS%-jGm8(=`m-bE6J*?cK zz+1f^h^s)P%7wlTUk`V)0_`k4SDLq_~R%4M5l?=V1$00000NkvXXu0mjf{#zRE literal 0 HcmV?d00001 diff --git a/frontend/src/assets/operator_images/LoopStart.png b/frontend/src/assets/operator_images/LoopStart.png new file mode 100644 index 0000000000000000000000000000000000000000..7e5be023cdf6b64dd1bc140e5d75980455afeefb GIT binary patch literal 2138 zcmV-g2&MOlP)Px-6iGxuRCr$Pok@1vFbsy}TQ#e+JKw6wt+cDNN^a$bl4voq27m;>LBHSYqAkVY z$0rb+#7kG<@A`TD-M{p=3agGPM=EfMd@k!_*RSip?%sX$Fa2BAilA^a1?mO0>wb6l z?j5k2v68i*x_G;KNw`$Lu656Tc9Eo zuwW)j8SQ}zShIknU1*!20#9EcQB z6cz^>1a(cxTka+6Y@h~k>Vdf?xiG9J0X0-|fOc49G29qr3Tieum%s|bdO}cVHSgWK zRd`?mYB7L@zzV^7YEZqNBe1F-Ru$AD0sZM+llKS!)Id<7mwzo{Qv-F_Gg`;Ajf=V5 zcEPBP*Z_3}D+OT1L9Nzsk<(YzwwE$XlA1;kSW!?b@}lb|h!7T_9?%@2san@xK`cu5 z4NPGHDnVMNrYfLLkh*xZogYJ3fJ!!d(!e#T>u=Ub0F_K1J7MHlAR1IaI-00i9oPgi z9*BT73e;we!|nF0U1^@ zs7;RU1&&!a`tnAT$9|9)ZqY&gy58B9DER@RKu1jgs7DGB{-0nA5~$U$I4!jz2t$w; zqkgJLlynDUghNo9^B97VVtR@~AU#e0tKZ~t7t{tJ75|b-a~jdPl25tgMD{_gXRF?k zXh|dcz09jVg*qRav^@^AC+J29unB4*p2DAq9F9D=pW8RejG4((AV;*f{{Mr`-7oK{ z+EsgPgPL!YKX^Y6rKgkdgTv61*9U7K6zNnkYM3epFlN!gQDWcT~H&}?8PnHV1_x_cm~zVnY^)I>;EKn0{3-V~!vN)Oxq z1Xa7+7l8B!s6zQ=uyYedn!q}B{|`_Nx_vRvhYQil;ar^?Yf$$#9B^gF6_Z@Wbki%x ztAm4^J*sB6k1kr5l+3wnhXBsW4WL>Slvzs@udINoSG{+mM^%k8;7yegDI0JR1JDygFuPXM(N0BWhGCaCVhT#aw=V=z?de(toRjZVSEj!2b;lCwiOAI&NI40$xA|c}btX{twu`E7^XstQe;!h`;?wV2 z%Jo4-_kRvhHM@QHbw`UcQ|SIrz^bA*?VPi-I)#csRl~5VdQ*UDgxEX`@Jqdlp9+)We4}*#tzgGR)hetLR`( zU|H*=iJ9@^;87n?j-wUVwYy&yI3H%)=vntJ?akv=w?Xq4_}$+EM50(-LrH< z#{4!zlKpGYwa)zOG_vem6=HDBqaRts+|g(kR99Km4F_4&$W*8>sEl|tQ-hhs9_Vun z>gTnz5P+#bA;|?nZ4OexEW{zGgI7fekWRZ0s4d!pa$Jc7Dqv0LRAQSWb7U`2X@e|? zo0Q0)PVs8qDJuir)QU*(sEv*6XNeh_us{W@2^ces5|+~3$lf=OsRtia46fp6Wg4dG za4SL!HmH+;m@# zDsgnbrDHOvO~+D)^f3}JKy6IZ+tl4B?Q;+TwJ~8C^f8h$L2X1+(!N)UiejKPA}mF- zxHJ)4oya1?$hF7Y9dSk14)CI8Pk&?3lD+Fpmu$lmB4(g-(a!au8T+XF? z&3-<|(l-l&nh{o0IL%^I3J3n{a?L|445|^V{+yco^kdRID20W+G-h2trJ@70V|d`a zt^4IY5lN{uHH`wO6RauwE@t@doj)}B__y3}7UfZ?b`FmA)(Wr6Db&ar0hK8vsx4#E z73EQ58h~eJ1sL0ssI2 literal 0 HcmV?d00001 From b95465d2e0fe28fe4edc05cf955fe2b18ac28419 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Tue, 10 Feb 2026 18:19:19 -0800 Subject: [PATCH 02/28] update --- .../worker/promisehandlers/InitializeExecutorHandler.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala index cc1a32594b6..bf45d8eff9a 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala @@ -39,7 +39,6 @@ trait InitializeExecutorHandler { req: InitializeExecutorRequest, ctx: AsyncRPCContext ): Future[EmptyReturn] = { - println(s"Initializing executor with request: $req") dp.serializationManager.setOpInitialization(req) val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId) val workerCount = req.totalWorkerCount From d9d0cd94388ef21f1e32289d8f0e5d359553443f Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Tue, 10 Feb 2026 18:30:19 -0800 Subject: [PATCH 03/28] update --- .../architecture/controller/execution/RegionExecution.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala index e905c2b0449..d5939c2e3b1 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/RegionExecution.scala @@ -59,6 +59,8 @@ case class RegionExecution(region: Region) { physicalOpId: PhysicalOpIdentity, inheritOperatorExecution: Option[OperatorExecution] = None ): OperatorExecution = { + assert(!operatorExecutions.contains(physicalOpId), "OperatorExecution already exists.") + operatorExecutions.getOrElseUpdate( physicalOpId, inheritOperatorExecution From 144ae29f27dbb693f8f499da10ebb73ea1e6b88b Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Tue, 10 Feb 2026 18:37:00 -0800 Subject: [PATCH 04/28] update --- .../architecture/worker/promisehandlers/AssignPortHandler.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala index 1cc725dff83..57a7782cf55 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala @@ -41,7 +41,6 @@ trait AssignPortHandler { this: DataProcessorRPCHandlerInitializer => override def assignPort(msg: AssignPortRequest, ctx: AsyncRPCContext): Future[EmptyReturn] = { - println("ergergerge") val schema = Schema.fromRawSchema(msg.schema) if (msg.input) { val inputPortURIStrs = msg.storageUris.toList From 7b13fef2bd720015728fbe4051ad7ff5a4a9cdd5 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Tue, 10 Feb 2026 18:39:58 -0800 Subject: [PATCH 05/28] update --- .../engine/architecture/controller/WorkflowScheduler.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala index e2239f99a9b..385779fd38d 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/WorkflowScheduler.scala @@ -52,9 +52,5 @@ class WorkflowScheduler( this.physicalPlan = updatedPhysicalPlan } - def getNextRegions: Set[Region] = { - val region : Set[Region] = if (!schedule.hasNext) Set() else schedule.loopNext() - println("current Region: " + region) - region - } + def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else schedule.loopNext() } From bd5ac3aba6c1449bf7c4e62af64ad3b7bd3163c7 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Tue, 10 Feb 2026 18:45:46 -0800 Subject: [PATCH 06/28] update --- .../amber/operator/loop/LoopEndOpExec.scala | 19 +++++++++++++++++++ .../amber/operator/loop/LoopStartOpExec.scala | 18 +----------------- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala index 60f18cd5fc9..9083c1e77aa 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.texera.amber.operator.loop import org.apache.texera.amber.core.executor.OperatorExecutor diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpExec.scala index 1ff10c650cb..f15d90a6cca 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpExec.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpExec.scala @@ -18,27 +18,11 @@ */ package org.apache.texera.amber.operator.loop - import org.apache.texera.amber.core.executor.OperatorExecutor import org.apache.texera.amber.core.tuple.{Tuple, TupleLike} -import org.apache.texera.amber.util.JSONUtils.objectMapper - -import scala.collection.mutable class LoopStartOpExec(descString: String) extends OperatorExecutor { - private val data = new mutable.ArrayBuffer[Tuple] - private var currentIteration = 0 - - override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = { - data.append(tuple) - Iterator.empty - } - - override def onFinish(port: Int): Iterator[TupleLike] = { - currentIteration += 1 - data.iterator - } - + override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = Iterator(tuple) } From 19be0c1a3b4282f6c1830b254de6b78a6f2c705e Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Tue, 10 Feb 2026 19:28:06 -0800 Subject: [PATCH 07/28] update --- .../architecture/worker/promisehandlers/AssignPortHandler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala index 57a7782cf55..fe959733abb 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala @@ -55,7 +55,7 @@ trait AssignPortHandler { // Same as AddInputChannelHandler dp.inputGateway.getChannel(channelId).setPortId(msg.portId) dp.inputManager.getPort(msg.portId).channels.add(channelId) - //dp.stateManager.assertState(READY, RUNNING, PAUSED) + dp.stateManager.assertState(READY, RUNNING, PAUSED) } } else { val storageURIOption: Option[URI] = msg.storageUris.head match { From 706884f0b9e32c60bf7bfb6586fc10ab98b8758c Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Tue, 10 Feb 2026 20:32:39 -0800 Subject: [PATCH 08/28] update --- .../amber/engine/architecture/common/AmberProcessor.scala | 4 ++-- .../texera/amber/engine/common/rpc/AsyncRPCClient.scala | 8 ++++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala index e7763073232..22811b46417 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/common/AmberProcessor.scala @@ -43,7 +43,7 @@ abstract class AmberProcessor( with Serializable { /** FIFO & exactly once */ - val inputGateway: InputGateway = new NetworkInputGateway(this.actorId) + val inputGateway: NetworkInputGateway = new NetworkInputGateway(this.actorId) // 1. Unified Output val outputGateway: NetworkOutputGateway = @@ -55,7 +55,7 @@ abstract class AmberProcessor( } ) // 2. RPC Layer - val asyncRPCClient = new AsyncRPCClient(outputGateway, actorId) + val asyncRPCClient = new AsyncRPCClient(inputGateway, outputGateway, actorId) val asyncRPCServer: AsyncRPCServer = new AsyncRPCServer(outputGateway, actorId) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala index 704ebd7f476..f7e26803b47 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/common/rpc/AsyncRPCClient.scala @@ -27,7 +27,10 @@ import org.apache.texera.amber.core.virtualidentity.{ EmbeddedControlMessageIdentity } import org.apache.texera.amber.engine.architecture.controller.ClientEvent -import org.apache.texera.amber.engine.architecture.messaginglayer.NetworkOutputGateway +import org.apache.texera.amber.engine.architecture.messaginglayer.{ + NetworkInputGateway, + NetworkOutputGateway +} import org.apache.texera.amber.engine.architecture.rpc.controlcommands._ import org.apache.texera.amber.engine.architecture.rpc.controllerservice.ControllerServiceFs2Grpc import org.apache.texera.amber.engine.architecture.rpc.controlreturns.{ @@ -125,7 +128,8 @@ object AsyncRPCClient { } class AsyncRPCClient( - outputGateway: NetworkOutputGateway, + val inputGateway: NetworkInputGateway, + val outputGateway: NetworkOutputGateway, val actorId: ActorVirtualIdentity ) extends AmberLogging { From 21e6a41bb503d23ab1d67af3c142e6237528ac66 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Tue, 10 Feb 2026 20:33:28 -0800 Subject: [PATCH 09/28] update --- .../architecture/messaginglayer/NetworkInputGateway.scala | 4 ++++ .../architecture/messaginglayer/NetworkOutputGateway.scala | 4 ++++ .../architecture/scheduling/RegionExecutionCoordinator.scala | 2 ++ 3 files changed, 10 insertions(+) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala index 5cfd8aabc04..1d3ee3cb72c 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkInputGateway.scala @@ -86,4 +86,8 @@ class NetworkInputGateway(val actorId: ActorVirtualIdentity) enforcers += enforcer } + def removeControlChannel(from: ActorVirtualIdentity): Unit = { + inputChannels.remove(ChannelIdentity(from, actorId, isControl = true)) + } + } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala index 929a30f4efa..e35e819d41f 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/NetworkOutputGateway.scala @@ -94,4 +94,8 @@ class NetworkOutputGateway( idToSequenceNums.getOrElseUpdate(channelId, new AtomicLong()).getAndIncrement() } + def removeControlChannel(to: ActorVirtualIdentity): Unit = { + idToSequenceNums.remove(ChannelIdentity(actorId, to, isControl = true)) + } + } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 7e5b228801f..4b861de6574 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -166,6 +166,8 @@ class RegionExecutionCoordinator( val actorRef = actorRefService.getActorRef(workerId) // Remove the actorRef so that no other actors can find the worker and send messages. actorRefService.removeActorRef(workerId) + asyncRPCClient.outputGateway.removeControlChannel(workerId) + asyncRPCClient.inputGateway.removeControlChannel(workerId) gracefulStop(actorRef, Duration(5, TimeUnit.SECONDS)).asTwitter() } }.toSeq From 24da3e38ac1498a1f4cc1da0342d2d436cfdcc28 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Wed, 11 Feb 2026 00:27:38 -0800 Subject: [PATCH 10/28] update --- .../amber/engine/architecture/scheduling/Schedule.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala index d0ba5268091..c4b9c34c6cb 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala @@ -22,6 +22,7 @@ package org.apache.texera.amber.engine.architecture.scheduling case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterator[Set[Region]] { private var currentLevel = levelSets.keys.minOption.getOrElse(0) private var loopStartLevel = currentLevel + private var i = 0 def getRegions: List[Region] = levelSets.values.flatten.toList @@ -36,7 +37,10 @@ case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterat def loopNext(): Set[Region] = { val regions = levelSets(currentLevel) - if(regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopEnd-operator-")))) currentLevel = loopStartLevel + if(regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopEnd-operator-"))) && i < 5) { + currentLevel = loopStartLevel + i+=1 + } currentLevel += 1 regions } From 2ba0fa400d9a993352259e97583324c5bb62ef91 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Wed, 11 Feb 2026 01:39:46 -0800 Subject: [PATCH 11/28] update --- .../apache/texera/amber/operator/loop/LoopEndOpDesc.scala | 8 ++++++++ .../apache/texera/amber/operator/loop/LoopEndOpExec.scala | 7 ++++++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala index f56068e9036..72f97f7c898 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala @@ -19,6 +19,8 @@ package org.apache.texera.amber.operator.loop +import com.fasterxml.jackson.annotation.JsonProperty +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle import org.apache.texera.amber.core.executor.OpExecWithClassName import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PhysicalOp} @@ -26,6 +28,12 @@ import org.apache.texera.amber.operator.LogicalOp import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} class LoopEndOpDesc extends LogicalOp { + + @JsonProperty(required = true) + @JsonSchemaTitle("Iteration Number") + var iteration: Int = _ + + override def getPhysicalOp( workflowId: WorkflowIdentity, executionId: ExecutionIdentity diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala index 9083c1e77aa..cb9393ed4f8 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala @@ -21,7 +21,12 @@ package org.apache.texera.amber.operator.loop import org.apache.texera.amber.core.executor.OperatorExecutor import org.apache.texera.amber.core.tuple.{Tuple, TupleLike} +import org.apache.texera.amber.util.JSONUtils.objectMapper + + +class LoopEndOpExec(descString: String) extends OperatorExecutor { + private val desc: LoopEndOpDesc = objectMapper.readValue(descString, classOf[LoopEndOpDesc]) + val iteration: Int = desc.iteration -class LoopEndOpExec extends OperatorExecutor { override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = Iterator(tuple) } From a05ffd1f780a8c1911df7e16b408206ab3820804 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Wed, 11 Feb 2026 02:06:08 -0800 Subject: [PATCH 12/28] update --- .../apache/texera/amber/operator/loop/LoopEndOpDesc.scala | 7 ++++--- .../apache/texera/amber/operator/loop/LoopEndOpExec.scala | 5 +---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala index 72f97f7c898..31719229f0b 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala @@ -19,13 +19,14 @@ package org.apache.texera.amber.operator.loop -import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription} import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle import org.apache.texera.amber.core.executor.OpExecWithClassName import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PhysicalOp} import org.apache.texera.amber.operator.LogicalOp import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} +import org.apache.texera.amber.util.JSONUtils.objectMapper class LoopEndOpDesc extends LogicalOp { @@ -33,7 +34,6 @@ class LoopEndOpDesc extends LogicalOp { @JsonSchemaTitle("Iteration Number") var iteration: Int = _ - override def getPhysicalOp( workflowId: WorkflowIdentity, executionId: ExecutionIdentity @@ -43,10 +43,11 @@ class LoopEndOpDesc extends LogicalOp { workflowId, executionId, operatorIdentifier, - OpExecWithClassName("org.apache.texera.amber.operator.loop.LoopEndOpExec") + OpExecWithClassName("org.apache.texera.amber.operator.loop.LoopEndOpExec",objectMapper.writeValueAsString(this)) ) .withInputPorts(operatorInfo.inputPorts) .withOutputPorts(operatorInfo.outputPorts) + .withParallelizable(false) .withSuggestedWorkerNum(1) } diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala index cb9393ed4f8..b1e6deace19 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala @@ -24,9 +24,6 @@ import org.apache.texera.amber.core.tuple.{Tuple, TupleLike} import org.apache.texera.amber.util.JSONUtils.objectMapper -class LoopEndOpExec(descString: String) extends OperatorExecutor { - private val desc: LoopEndOpDesc = objectMapper.readValue(descString, classOf[LoopEndOpDesc]) - val iteration: Int = desc.iteration - +class LoopEndOpExec extends OperatorExecutor { override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = Iterator(tuple) } From 44fc0e795bc02d064c3e517bd0004615bbd7fa6d Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Wed, 11 Feb 2026 02:06:12 -0800 Subject: [PATCH 13/28] update --- .../architecture/scheduling/Schedule.scala | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala index c4b9c34c6cb..20f29942218 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala @@ -19,10 +19,14 @@ package org.apache.texera.amber.engine.architecture.scheduling +import org.apache.texera.amber.core.executor.OpExecWithClassName +import org.apache.texera.amber.operator.loop.LoopEndOpDesc +import org.apache.texera.amber.util.JSONUtils.objectMapper + case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterator[Set[Region]] { private var currentLevel = levelSets.keys.minOption.getOrElse(0) private var loopStartLevel = currentLevel - private var i = 0 + private var i = 1 def getRegions: List[Region] = levelSets.values.flatten.toList @@ -37,10 +41,14 @@ case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterat def loopNext(): Set[Region] = { val regions = levelSets(currentLevel) - if(regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopEnd-operator-"))) && i < 5) { - currentLevel = loopStartLevel - i+=1 + + if(regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopEnd-operator-")))) { + if (i < objectMapper.readValue(regions.head.getOperators.head.opExecInitInfo.asInstanceOf[OpExecWithClassName].descString, classOf[LoopEndOpDesc]).iteration) { + currentLevel = loopStartLevel + i+=1 + } } + currentLevel += 1 regions } From 846aac2187ef8c5b1b9db0488970e29fadd57c0b Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Wed, 11 Feb 2026 17:15:05 -0800 Subject: [PATCH 14/28] update --- .../org/apache/texera/amber/operator/loop/LoopEndOpExec.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala index b1e6deace19..d3e736c0e3a 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpExec.scala @@ -21,9 +21,7 @@ package org.apache.texera.amber.operator.loop import org.apache.texera.amber.core.executor.OperatorExecutor import org.apache.texera.amber.core.tuple.{Tuple, TupleLike} -import org.apache.texera.amber.util.JSONUtils.objectMapper - -class LoopEndOpExec extends OperatorExecutor { +class LoopEndOpExec(descString: String) extends OperatorExecutor { override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = Iterator(tuple) } From 6be7dc5bcf3b0aaf6df92e4eff59ddc03e2f9096 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Wed, 11 Feb 2026 18:35:07 -0800 Subject: [PATCH 15/28] update --- .../messaginglayer/AmberFIFOChannel.scala | 22 +++++++++---------- .../RegionExecutionCoordinator.scala | 4 ++-- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala index d81b4239ba7..7917721c9c8 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala @@ -41,18 +41,18 @@ class AmberFIFOChannel(val channelId: ChannelIdentity) extends AmberLogging { private var portId: Option[PortIdentity] = None def acceptMessage(msg: WorkflowFIFOMessage): Unit = { - val seq = msg.sequenceNumber - val payload = msg.payload - if (isDuplicated(seq)) { - logger.debug( - s"received duplicated message $payload with seq = $seq while current seq = $current" - ) - } else if (isAhead(seq)) { - logger.debug(s"received ahead message $payload with seq = $seq while current seq = $current") - stash(seq, msg) - } else { + //val seq = msg.sequenceNumber + //val payload = msg.payload + //if (isDuplicated(seq)) { + // logger.debug( + // s"received duplicated message $payload with seq = $seq while current seq = $current" + // ) + //} else if (isAhead(seq)) { + // logger.debug(s"received ahead message $payload with seq = $seq while current seq = $current") + // stash(seq, msg) + //} else { enforceFIFO(msg) - } + //} } def getCurrentSeq: Long = current diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 4b861de6574..2cddb29ba12 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -166,8 +166,8 @@ class RegionExecutionCoordinator( val actorRef = actorRefService.getActorRef(workerId) // Remove the actorRef so that no other actors can find the worker and send messages. actorRefService.removeActorRef(workerId) - asyncRPCClient.outputGateway.removeControlChannel(workerId) - asyncRPCClient.inputGateway.removeControlChannel(workerId) + //asyncRPCClient.outputGateway.removeControlChannel(workerId) + //asyncRPCClient.inputGateway.removeControlChannel(workerId) gracefulStop(actorRef, Duration(5, TimeUnit.SECONDS)).asTwitter() } }.toSeq From d8338d124197c1343ecb7b03cb9adf4e307effe3 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Wed, 11 Feb 2026 23:08:34 -0800 Subject: [PATCH 16/28] update --- .../messaginglayer/AmberFIFOChannel.scala | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala index 7917721c9c8..2556c55a36d 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala @@ -41,18 +41,17 @@ class AmberFIFOChannel(val channelId: ChannelIdentity) extends AmberLogging { private var portId: Option[PortIdentity] = None def acceptMessage(msg: WorkflowFIFOMessage): Unit = { - //val seq = msg.sequenceNumber - //val payload = msg.payload - //if (isDuplicated(seq)) { - // logger.debug( - // s"received duplicated message $payload with seq = $seq while current seq = $current" - // ) - //} else if (isAhead(seq)) { - // logger.debug(s"received ahead message $payload with seq = $seq while current seq = $current") - // stash(seq, msg) - //} else { + //channel remove + val seq = msg.sequenceNumber + val payload = msg.payload + if (isDuplicated(seq)) { + logger.debug(s"received duplicated message $payload with seq = $seq while current seq = $current") + } else if (isAhead(seq)) { + logger.debug(s"received ahead message $payload with seq = $seq while current seq = $current") + stash(seq, msg) + } else { enforceFIFO(msg) - //} + } } def getCurrentSeq: Long = current From 36e517e463f171499cad3753c0fbc5497157cb0b Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Fri, 13 Feb 2026 13:21:28 -0800 Subject: [PATCH 17/28] fix --- .../architecture/scheduling/RegionExecutionCoordinator.scala | 4 ++-- .../worker/promisehandlers/AssignPortHandler.scala | 2 +- .../architecture/worker/promisehandlers/EndHandler.scala | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 2cddb29ba12..4b861de6574 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -166,8 +166,8 @@ class RegionExecutionCoordinator( val actorRef = actorRefService.getActorRef(workerId) // Remove the actorRef so that no other actors can find the worker and send messages. actorRefService.removeActorRef(workerId) - //asyncRPCClient.outputGateway.removeControlChannel(workerId) - //asyncRPCClient.inputGateway.removeControlChannel(workerId) + asyncRPCClient.outputGateway.removeControlChannel(workerId) + asyncRPCClient.inputGateway.removeControlChannel(workerId) gracefulStop(actorRef, Duration(5, TimeUnit.SECONDS)).asTwitter() } }.toSeq diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala index fe959733abb..57a7782cf55 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala @@ -55,7 +55,7 @@ trait AssignPortHandler { // Same as AddInputChannelHandler dp.inputGateway.getChannel(channelId).setPortId(msg.portId) dp.inputManager.getPort(msg.portId).channels.add(channelId) - dp.stateManager.assertState(READY, RUNNING, PAUSED) + //dp.stateManager.assertState(READY, RUNNING, PAUSED) } } else { val storageURIOption: Option[URI] = msg.storageUris.head match { diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandler.scala index 2a6a20b3d3e..472eb09016f 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndHandler.scala @@ -49,7 +49,7 @@ trait EndHandler { s"${dp.inputManager.inputMessageQueue.peek()}" ) } - assert(dp.inputManager.inputMessageQueue.isEmpty) + //assert(dp.inputManager.inputMessageQueue.isEmpty) // Now we can safely acknowledge that this worker can be terminated. EmptyReturn() } From a4bfbdb2181d87947922ba8be390d023256a6979 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Fri, 13 Feb 2026 13:54:04 -0800 Subject: [PATCH 18/28] fix --- .../scheduling/RegionExecutionCoordinator.scala | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 4b861de6574..318d3b2c602 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -536,7 +536,20 @@ class RegionExecutionCoordinator( region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3 val schema = schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing")) - DocumentFactory.createDocument(storageUriToAdd, schema) + + + if (region.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopEnd-operator-"))) { + try { + DocumentFactory.openDocument(storageUriToAdd) + } catch { + case _: Exception => + DocumentFactory.createDocument(storageUriToAdd, schema) + } + } + else { + DocumentFactory.createDocument(storageUriToAdd, schema) + } + WorkflowExecutionsResource.insertOperatorPortResultUri( eid = eid, globalPortId = outputPortId, From 084f602990c43f7cb60c63b13a70b04d0d883314 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sat, 14 Feb 2026 19:46:25 -0800 Subject: [PATCH 19/28] update --- .../messaginglayer/AmberFIFOChannel.scala | 4 ++- .../RegionExecutionCoordinator.scala | 21 +++++++------- .../architecture/scheduling/Schedule.scala | 21 +++++++++++--- .../promisehandlers/AssignPortHandler.scala | 2 +- .../texera/amber/operator/LogicalOp.scala | 28 +++++++++++++++---- .../amber/operator/loop/LoopEndOpDesc.scala | 5 +++- .../amber/operator/loop/LoopStartOpExec.scala | 2 -- 7 files changed, 58 insertions(+), 25 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala index 2556c55a36d..b7611c8f8f9 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/AmberFIFOChannel.scala @@ -45,7 +45,9 @@ class AmberFIFOChannel(val channelId: ChannelIdentity) extends AmberLogging { val seq = msg.sequenceNumber val payload = msg.payload if (isDuplicated(seq)) { - logger.debug(s"received duplicated message $payload with seq = $seq while current seq = $current") + logger.debug( + s"received duplicated message $payload with seq = $seq while current seq = $current" + ) } else if (isAhead(seq)) { logger.debug(s"received ahead message $payload with seq = $seq while current seq = $current") stash(seq, msg) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 0f105410972..5cfbc593917 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -210,14 +210,15 @@ class RegionExecutionCoordinator( regionExecution: RegionExecution, attempt: Int = 1 ): Future[Unit] = { - terminateWorkers(regionExecution).rescue { case err => - logger.warn( - s"Failed to terminate region ${region.id.id} on attempt $attempt. Retrying in ${killRetryDelay.inMilliseconds} ms.", - err - ) - Future - .sleep(killRetryDelay)(killRetryTimer) - .flatMap(_ => terminateWorkersWithRetry(regionExecution, attempt + 1)) + terminateWorkers(regionExecution).rescue { + case err => + logger.warn( + s"Failed to terminate region ${region.id.id} on attempt $attempt. Retrying in ${killRetryDelay.inMilliseconds} ms.", + err + ) + Future + .sleep(killRetryDelay)(killRetryTimer) + .flatMap(_ => terminateWorkersWithRetry(regionExecution, attempt + 1)) } } @@ -572,7 +573,6 @@ class RegionExecutionCoordinator( val schema = schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing")) - if (region.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopEnd-operator-"))) { try { DocumentFactory.openDocument(storageUriToAdd) @@ -580,8 +580,7 @@ class RegionExecutionCoordinator( case _: Exception => DocumentFactory.createDocument(storageUriToAdd, schema) } - } - else { + } else { DocumentFactory.createDocument(storageUriToAdd, schema) } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala index 20f29942218..93d96d640ac 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala @@ -34,7 +34,9 @@ case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterat override def next(): Set[Region] = { val regions = levelSets(currentLevel) - if(regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopStart-operator-")))) loopStartLevel = currentLevel + if ( + regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopStart-operator-"))) + ) loopStartLevel = currentLevel currentLevel += 1 regions } @@ -42,10 +44,21 @@ case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterat def loopNext(): Set[Region] = { val regions = levelSets(currentLevel) - if(regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopEnd-operator-")))) { - if (i < objectMapper.readValue(regions.head.getOperators.head.opExecInitInfo.asInstanceOf[OpExecWithClassName].descString, classOf[LoopEndOpDesc]).iteration) { + if ( + regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopEnd-operator-"))) + ) { + if ( + i < objectMapper + .readValue( + regions.head.getOperators.head.opExecInitInfo + .asInstanceOf[OpExecWithClassName] + .descString, + classOf[LoopEndOpDesc] + ) + .iteration + ) { currentLevel = loopStartLevel - i+=1 + i += 1 } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala index 57a7782cf55..3fb489e1055 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala @@ -55,7 +55,7 @@ trait AssignPortHandler { // Same as AddInputChannelHandler dp.inputGateway.getChannel(channelId).setPortId(msg.portId) dp.inputManager.getPort(msg.portId).channels.add(channelId) - //dp.stateManager.assertState(READY, RUNNING, PAUSED) + //dp.stateManager.assertState(READY, RUNNING, PAUSED) } } else { val storageURIOption: Option[URI] = msg.storageUris.head match { diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala index ee57514212b..1b92674864c 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/LogicalOp.scala @@ -24,8 +24,15 @@ import com.fasterxml.jackson.annotation._ import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle import org.apache.texera.amber.core.executor.OperatorExecutor import org.apache.texera.amber.core.tuple.Schema -import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, OperatorIdentity, WorkflowIdentity} -import org.apache.texera.amber.core.workflow.WorkflowContext.{DEFAULT_EXECUTION_ID, DEFAULT_WORKFLOW_ID} +import org.apache.texera.amber.core.virtualidentity.{ + ExecutionIdentity, + OperatorIdentity, + WorkflowIdentity +} +import org.apache.texera.amber.core.workflow.WorkflowContext.{ + DEFAULT_EXECUTION_ID, + DEFAULT_WORKFLOW_ID +} import org.apache.texera.amber.core.workflow.{PhysicalOp, PhysicalPlan, PortIdentity} import org.apache.texera.amber.operator.aggregate.AggregateOpDesc import org.apache.texera.amber.operator.cartesianProduct.CartesianProductOpDesc @@ -35,14 +42,22 @@ import org.apache.texera.amber.operator.distinct.DistinctOpDesc import org.apache.texera.amber.operator.dummy.DummyOpDesc import org.apache.texera.amber.operator.filter.SpecializedFilterOpDesc import org.apache.texera.amber.operator.hashJoin.HashJoinOpDesc -import org.apache.texera.amber.operator.huggingFace.{HuggingFaceIrisLogisticRegressionOpDesc, HuggingFaceSentimentAnalysisOpDesc, HuggingFaceSpamSMSDetectionOpDesc, HuggingFaceTextSummarizationOpDesc} +import org.apache.texera.amber.operator.huggingFace.{ + HuggingFaceIrisLogisticRegressionOpDesc, + HuggingFaceSentimentAnalysisOpDesc, + HuggingFaceSpamSMSDetectionOpDesc, + HuggingFaceTextSummarizationOpDesc +} import org.apache.texera.amber.operator.ifStatement.IfOpDesc import org.apache.texera.amber.operator.intersect.IntersectOpDesc import org.apache.texera.amber.operator.intervalJoin.IntervalJoinOpDesc import org.apache.texera.amber.operator.keywordSearch.KeywordSearchOpDesc import org.apache.texera.amber.operator.limit.LimitOpDesc import org.apache.texera.amber.operator.machineLearning.Scorer.MachineLearningScorerOpDesc -import org.apache.texera.amber.operator.machineLearning.sklearnAdvanced.KNNTrainer.{SklearnAdvancedKNNClassifierTrainerOpDesc, SklearnAdvancedKNNRegressorTrainerOpDesc} +import org.apache.texera.amber.operator.machineLearning.sklearnAdvanced.KNNTrainer.{ + SklearnAdvancedKNNClassifierTrainerOpDesc, + SklearnAdvancedKNNRegressorTrainerOpDesc +} import org.apache.texera.amber.operator.machineLearning.sklearnAdvanced.SVCTrainer.SklearnAdvancedSVCTrainerOpDesc import org.apache.texera.amber.operator.machineLearning.sklearnAdvanced.SVRTrainer.SklearnAdvancedSVRTrainerOpDesc import org.apache.texera.amber.operator.metadata.{OPVersion, OperatorInfo, PropertyNameConstants} @@ -56,7 +71,10 @@ import org.apache.texera.amber.operator.sleep.SleepOpDesc import org.apache.texera.amber.operator.sort.{SortOpDesc, StableMergeSortOpDesc} import org.apache.texera.amber.operator.sortPartitions.SortPartitionsOpDesc import org.apache.texera.amber.operator.source.apis.reddit.RedditSearchSourceOpDesc -import org.apache.texera.amber.operator.source.apis.twitter.v2.{TwitterFullArchiveSearchSourceOpDesc, TwitterSearchSourceOpDesc} +import org.apache.texera.amber.operator.source.apis.twitter.v2.{ + TwitterFullArchiveSearchSourceOpDesc, + TwitterSearchSourceOpDesc +} import org.apache.texera.amber.operator.source.fetcher.URLFetcherOpDesc import org.apache.texera.amber.operator.source.scan.FileScanSourceOpDesc import org.apache.texera.amber.operator.source.scan.arrow.ArrowSourceOpDesc diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala index 31719229f0b..8288db6aa7f 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala @@ -43,7 +43,10 @@ class LoopEndOpDesc extends LogicalOp { workflowId, executionId, operatorIdentifier, - OpExecWithClassName("org.apache.texera.amber.operator.loop.LoopEndOpExec",objectMapper.writeValueAsString(this)) + OpExecWithClassName( + "org.apache.texera.amber.operator.loop.LoopEndOpExec", + objectMapper.writeValueAsString(this) + ) ) .withInputPorts(operatorInfo.inputPorts) .withOutputPorts(operatorInfo.outputPorts) diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpExec.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpExec.scala index f15d90a6cca..1d0c4cbe9e2 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpExec.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpExec.scala @@ -24,5 +24,3 @@ import org.apache.texera.amber.core.tuple.{Tuple, TupleLike} class LoopStartOpExec(descString: String) extends OperatorExecutor { override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = Iterator(tuple) } - - From 55d5cec6fb7cb890b87df278ce113d4e2f24016e Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sat, 14 Feb 2026 20:43:38 -0800 Subject: [PATCH 20/28] update --- .../amber/engine/architecture/scheduling/Schedule.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala index 93d96d640ac..435096cda1f 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala @@ -34,16 +34,15 @@ case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterat override def next(): Set[Region] = { val regions = levelSets(currentLevel) - if ( - regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopStart-operator-"))) - ) loopStartLevel = currentLevel currentLevel += 1 regions } def loopNext(): Set[Region] = { val regions = levelSets(currentLevel) - + if ( + regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopStart-operator-"))) + ) loopStartLevel = currentLevel - 1 if ( regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopEnd-operator-"))) ) { From b8faf93e5adaff8b4c1aea5b7c681426ab63edaf Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sun, 15 Feb 2026 00:45:20 -0800 Subject: [PATCH 21/28] update --- .../architecture/scheduling/RegionExecutionCoordinator.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index 5cfbc593917..cd1541aaf7e 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -180,8 +180,8 @@ class RegionExecutionCoordinator( val actorRef = actorRefService.getActorRef(workerId) // Remove the actorRef so that no other actors can find the worker and send messages. actorRefService.removeActorRef(workerId) - asyncRPCClient.outputGateway.removeControlChannel(workerId) asyncRPCClient.inputGateway.removeControlChannel(workerId) + asyncRPCClient.outputGateway.removeControlChannel(workerId) gracefulStop(actorRef, ScalaDuration(5, TimeUnit.SECONDS)).asTwitter() } }.toSeq @@ -210,8 +210,7 @@ class RegionExecutionCoordinator( regionExecution: RegionExecution, attempt: Int = 1 ): Future[Unit] = { - terminateWorkers(regionExecution).rescue { - case err => + terminateWorkers(regionExecution).rescue { case err => logger.warn( s"Failed to terminate region ${region.id.id} on attempt $attempt. Retrying in ${killRetryDelay.inMilliseconds} ms.", err From a53506afca890849710a601e1df21ba1a54ba6f3 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sun, 15 Feb 2026 00:45:45 -0800 Subject: [PATCH 22/28] update --- .../architecture/worker/promisehandlers/AssignPortHandler.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala index 3fb489e1055..fe959733abb 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/AssignPortHandler.scala @@ -55,7 +55,7 @@ trait AssignPortHandler { // Same as AddInputChannelHandler dp.inputGateway.getChannel(channelId).setPortId(msg.portId) dp.inputManager.getPort(msg.portId).channels.add(channelId) - //dp.stateManager.assertState(READY, RUNNING, PAUSED) + dp.stateManager.assertState(READY, RUNNING, PAUSED) } } else { val storageURIOption: Option[URI] = msg.storageUris.head match { From d44a664f9f81448d2f13ded582df0061c6210c3a Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sun, 15 Feb 2026 00:46:33 -0800 Subject: [PATCH 23/28] update --- .../controller/execution/WorkflowExecution.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala index 6071a4dc504..2de29f31fdd 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/execution/WorkflowExecution.scala @@ -45,6 +45,11 @@ case class WorkflowExecution() { */ def initRegionExecution(region: Region): RegionExecution = { regionExecutions.remove(region.id) + // ensure the region execution hasn't been initialized already. + assert( + !regionExecutions.contains(region.id), + s"RegionExecution of ${region.id} already initialized." + ) regionExecutions.getOrElseUpdate(region.id, RegionExecution(region)) } From 1cd48fd12b88ab599b6262fb4b98c0a0baad487b Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sun, 15 Feb 2026 00:47:24 -0800 Subject: [PATCH 24/28] update --- .../scheduling/RegionExecutionCoordinator.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala index cd1541aaf7e..a6b7e173ccb 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala @@ -211,13 +211,13 @@ class RegionExecutionCoordinator( attempt: Int = 1 ): Future[Unit] = { terminateWorkers(regionExecution).rescue { case err => - logger.warn( - s"Failed to terminate region ${region.id.id} on attempt $attempt. Retrying in ${killRetryDelay.inMilliseconds} ms.", - err - ) - Future - .sleep(killRetryDelay)(killRetryTimer) - .flatMap(_ => terminateWorkersWithRetry(regionExecution, attempt + 1)) + logger.warn( + s"Failed to terminate region ${region.id.id} on attempt $attempt. Retrying in ${killRetryDelay.inMilliseconds} ms.", + err + ) + Future + .sleep(killRetryDelay)(killRetryTimer) + .flatMap(_ => terminateWorkersWithRetry(regionExecution, attempt + 1)) } } From e35a33286c52b28749cda6a8911ca4db47726142 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Sun, 15 Feb 2026 16:21:38 -0800 Subject: [PATCH 25/28] update --- .../architecture/scheduling/Schedule.scala | 27 ++++++++++--------- .../amber/operator/loop/LoopEndOpDesc.scala | 7 ----- .../amber/operator/loop/LoopStartOpDesc.scala | 3 +++ 3 files changed, 17 insertions(+), 20 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala index 435096cda1f..2f067437466 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/Schedule.scala @@ -20,12 +20,13 @@ package org.apache.texera.amber.engine.architecture.scheduling import org.apache.texera.amber.core.executor.OpExecWithClassName -import org.apache.texera.amber.operator.loop.LoopEndOpDesc +import org.apache.texera.amber.operator.loop.{LoopEndOpDesc, LoopStartOpDesc} import org.apache.texera.amber.util.JSONUtils.objectMapper case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterator[Set[Region]] { private var currentLevel = levelSets.keys.minOption.getOrElse(0) private var loopStartLevel = currentLevel + private var iteration = 1 private var i = 1 def getRegions: List[Region] = levelSets.values.flatten.toList @@ -42,25 +43,25 @@ case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterat val regions = levelSets(currentLevel) if ( regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopStart-operator-"))) - ) loopStartLevel = currentLevel - 1 + ) { + iteration = objectMapper + .readValue( + regions.head.getOperators.head.opExecInitInfo + .asInstanceOf[OpExecWithClassName] + .descString, + classOf[LoopStartOpDesc] + ) + .iteration + loopStartLevel = currentLevel - 1 + } if ( regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopEnd-operator-"))) ) { - if ( - i < objectMapper - .readValue( - regions.head.getOperators.head.opExecInitInfo - .asInstanceOf[OpExecWithClassName] - .descString, - classOf[LoopEndOpDesc] - ) - .iteration - ) { + if (i < iteration) { currentLevel = loopStartLevel i += 1 } } - currentLevel += 1 regions } diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala index 8288db6aa7f..08330db5841 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala @@ -19,8 +19,6 @@ package org.apache.texera.amber.operator.loop -import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription} -import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle import org.apache.texera.amber.core.executor.OpExecWithClassName import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PhysicalOp} @@ -29,11 +27,6 @@ import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, Operat import org.apache.texera.amber.util.JSONUtils.objectMapper class LoopEndOpDesc extends LogicalOp { - - @JsonProperty(required = true) - @JsonSchemaTitle("Iteration Number") - var iteration: Int = _ - override def getPhysicalOp( workflowId: WorkflowIdentity, executionId: ExecutionIdentity diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala index 4f3e86eb624..473db52584a 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala @@ -29,6 +29,9 @@ import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, Operat import org.apache.texera.amber.util.JSONUtils.objectMapper class LoopStartOpDesc extends LogicalOp { + @JsonProperty(required = true) + @JsonSchemaTitle("Iteration Number") + var iteration: Int = _ override def getPhysicalOp( workflowId: WorkflowIdentity, From 30a85623a7abb8b30411d79a05b790ceaba59d05 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Tue, 24 Feb 2026 14:26:48 -0800 Subject: [PATCH 26/28] update --- .../architecture/rpc/controlcommands.proto | 6 +++ .../architecture/rpc/workerservice.proto | 1 + .../messaginglayer/OutputManager.scala | 37 ++++++++++++++----- .../core/storage/result/ResultSchema.scala | 5 +++ 4 files changed, 40 insertions(+), 9 deletions(-) diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto index ed17be236dc..f73f8707d6f 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto @@ -57,6 +57,7 @@ message ControlRequest { EmptyRequest emptyRequest = 56; PrepareCheckpointRequest prepareCheckpointRequest = 57; QueryStatisticsRequest queryStatisticsRequest = 58; + EndIterationRequest endIterationRequest = 59; // request for testing Ping ping = 100; @@ -271,4 +272,9 @@ message PrepareCheckpointRequest{ message QueryStatisticsRequest{ repeated core.ActorVirtualIdentity filterByWorkers = 1; +} + +message EndIterationRequest{ + core.ActorVirtualIdentity LoopStartId = 1 [(scalapb.field).no_box = true]; + int32 iteration = 2; } \ No newline at end of file diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto index dbcd6d8a5e0..a2c4d6ef9b7 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto @@ -47,6 +47,7 @@ service WorkerService { rpc EndWorker(EmptyRequest) returns (EmptyReturn); rpc StartChannel(EmptyRequest) returns (EmptyReturn); rpc EndChannel(EmptyRequest) returns (EmptyReturn); + rpc EndIteration(EndIterationRequest) returns (EmptyReturn); rpc DebugCommand(DebugCommandRequest) returns (EmptyReturn); rpc EvaluatePythonExpression(EvaluatePythonExpressionRequest) returns (EvaluatedValue); rpc NoOperation(EmptyRequest) returns (EmptyReturn); diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala index 4ab3d18056f..202d83d1638 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -22,20 +22,14 @@ package org.apache.texera.amber.engine.architecture.messaginglayer import org.apache.texera.amber.core.state.State import org.apache.texera.amber.core.storage.DocumentFactory import org.apache.texera.amber.core.storage.model.BufferedItemWriter +import org.apache.texera.amber.core.storage.result.ResultSchema import org.apache.texera.amber.core.tuple._ import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity} import org.apache.texera.amber.core.workflow.{PhysicalLink, PortIdentity} -import org.apache.texera.amber.engine.architecture.messaginglayer.OutputManager.{ - DPOutputIterator, - getBatchSize, - toPartitioner -} +import org.apache.texera.amber.engine.architecture.messaginglayer.OutputManager.{DPOutputIterator, getBatchSize, toPartitioner} import org.apache.texera.amber.engine.architecture.sendsemantics.partitioners._ import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings._ -import org.apache.texera.amber.engine.architecture.worker.managers.{ - OutputPortResultWriterThread, - PortStorageWriterTerminateSignal -} +import org.apache.texera.amber.engine.architecture.worker.managers.{OutputPortResultWriterThread, PortStorageWriterTerminateSignal} import org.apache.texera.amber.engine.common.AmberLogging import org.apache.texera.amber.util.VirtualIdentityUtils @@ -124,6 +118,10 @@ class OutputManager( : mutable.HashMap[PortIdentity, OutputPortResultWriterThread] = mutable.HashMap() + private val ECMWriters + : mutable.HashMap[PortIdentity, BufferedItemWriter[Tuple]] = + mutable.HashMap() + /** * Add down stream operator and its corresponding Partitioner. * @@ -232,6 +230,23 @@ class OutputManager( }) } + def saveECMToStorageIfNeeded( + tuple: Tuple, + outputPortId: Option[PortIdentity] = None + ): Unit = { + (outputPortId match { + case Some(portId) => + this.ECMWriters.get(portId) match { + case Some(_) => this.ECMWriters.filter(_._1 == portId) + case None => Map.empty + } + case None => this.ECMWriters + }).foreach({ + case (portId, writer) => + writer.putOne(new Tuple(ResultSchema.ecmSchema, Array(worker.name))) + }) + } + /** * Singal the port storage writer to flush the remaining buffer and wait for commits to finish so that * the output port is properly completed. If the output port does not need storage, no action will be done. @@ -280,6 +295,10 @@ class OutputManager( } private def setupOutputStorageWriterThread(portId: PortIdentity, storageUri: URI): Unit = { + this.ECMWriters(portId) = DocumentFactory + .createDocument(storageUri.resolve("ecm"), ResultSchema.ecmSchema) + .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString) + .asInstanceOf[BufferedItemWriter[Tuple]] val bufferedItemWriter = DocumentFactory .openDocument(storageUri) ._1 diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/ResultSchema.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/ResultSchema.scala index ade33283f7f..4be34b856d6 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/ResultSchema.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/ResultSchema.scala @@ -39,4 +39,9 @@ object ResultSchema { val consoleMessagesSchema: Schema = new Schema( new Attribute("message", AttributeType.STRING) ) + + val ecmSchema: Schema = new Schema( + new Attribute("LoopStartId", AttributeType.STRING), + new Attribute("iteration", AttributeType.INTEGER) + ) } From b717fb0d46d6c2b82e49b2c4f356bd9484add129 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Tue, 24 Feb 2026 15:36:36 -0800 Subject: [PATCH 27/28] update --- .../storage/result/iceberg/IcebergTableWriter.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala index 549cb4b9d17..e5e2a5719d2 100644 --- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala +++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergTableWriter.scala @@ -29,7 +29,7 @@ import org.apache.iceberg.io.{DataWriter, OutputFile} import org.apache.iceberg.parquet.Parquet import org.apache.iceberg.{Schema, Table} -import java.nio.file.Paths +import java.nio.file.{Files, Path, Paths} import scala.collection.mutable.ArrayBuffer /** @@ -107,9 +107,12 @@ private[storage] class IcebergTableWriter[T]( private def flushBuffer(): Unit = { if (buffer.nonEmpty) { // Create a unique file path using the writer's identifier and the filename index - val filepath = Paths.get(table.location()).resolve(s"${writerIdentifier}_${filenameIdx}") - // Increment the filename index by 1 - filenameIdx += 1 + var filepath: Path = null + do { + filepath = Paths.get(table.location()).resolve(s"${writerIdentifier}_$filenameIdx") + filenameIdx+= 1 + } while (Files.exists(filepath)) + val outputFile: OutputFile = table.io().newOutputFile(filepath.toString) // Create a Parquet data writer to write a new file val dataWriter: DataWriter[Record] = Parquet From 04fe614bf0d8c377357fc6a806220275d0709af2 Mon Sep 17 00:00:00 2001 From: Xinyuan Lin Date: Thu, 26 Feb 2026 21:11:54 -0800 Subject: [PATCH 28/28] update --- .../messaginglayer/OutputManager.scala | 2 +- .../DataProcessorRPCHandlerInitializer.scala | 1 + .../promisehandlers/EndIterationHandler.scala | 44 +++++++++++ .../operator/loop/LoopStartv2OpDesc.scala | 77 +++++++++++++++++++ 4 files changed, 123 insertions(+), 1 deletion(-) create mode 100644 amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala create mode 100644 common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartv2OpDesc.scala diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala index 202d83d1638..8f0c0f6feef 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala @@ -243,7 +243,7 @@ class OutputManager( case None => this.ECMWriters }).foreach({ case (portId, writer) => - writer.putOne(new Tuple(ResultSchema.ecmSchema, Array(worker.name))) + writer.putOne(new Tuple(ResultSchema.ecmSchema, Array("erge"))) }) } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala index 2abcdf66975..a49860c1e11 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala @@ -45,6 +45,7 @@ class DataProcessorRPCHandlerInitializer(val dp: DataProcessor) with ResumeHandler with StartHandler with EndHandler + with EndIterationHandler with StartChannelHandler with EndChannelHandler with AssignPortHandler diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala new file mode 100644 index 00000000000..e1827c7e949 --- /dev/null +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/EndIterationHandler.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.engine.architecture.worker.promisehandlers + +import com.twitter.util.Future +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{AsyncRPCContext, EmptyRequest, EndIterationRequest} +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn +import org.apache.texera.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer +import org.apache.texera.amber.operator.loop.LoopEndOpExec + +trait EndIterationHandler { + this: DataProcessorRPCHandlerInitializer => + + override def endIteration( + request: EndIterationRequest, + ctx: AsyncRPCContext + ): Future[EmptyReturn] = { + dp.executor match { + case _: LoopEndOpExec => + //workerInterface.nextIteration(EmptyRequest(), mkContext(request.worker)) + case _ => + //dp.processOnFinish() + //dp.outputManager.finalizeIteration(request.worker) + } + EmptyReturn() + } +} \ No newline at end of file diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartv2OpDesc.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartv2OpDesc.scala new file mode 100644 index 00000000000..5f5050b384a --- /dev/null +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartv2OpDesc.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.loop + +import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import org.apache.texera.amber.core.executor.OpExecWithCode +import org.apache.texera.amber.core.tuple.Schema +import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity} +import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PhysicalOp, PortIdentity, SchemaPropagationFunc} +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo} + +class LoopStartv2OpDesc extends LogicalOp { + override def getPhysicalOp( + workflowId: WorkflowIdentity, + executionId: ExecutionIdentity + ): PhysicalOp = { + val pythonCode = + try { + generatePythonCode() + } catch { + case ex: Throwable => + s"#EXCEPTION DURING CODE GENERATION: ${ex.getMessage}" + } + PhysicalOp.oneToOnePhysicalOp( + workflowId, + executionId, + operatorIdentifier, + OpExecWithCode(pythonCode, "python") + ) + .withInputPorts(operatorInfo.inputPorts) + .withOutputPorts(operatorInfo.outputPorts) + .withSuggestedWorkerNum(1) + .withParallelizable(false) + } + + @JsonSchemaTitle("Output columns") + var lambdaAttributeUnits: List[LambdaAttributeUnit] = List() + + + override def operatorInfo: OperatorInfo = + OperatorInfo( + "Python Table Reducer", + "Reduce Table to Tuple", + OperatorGroupConstants.PYTHON_GROUP, + inputPorts = List(InputPort()), + outputPorts = List(OutputPort()) + ) + + def generatePythonCode(): String = { + s""" + |from pytexera import * + |class ProcessTableOperator(UDFTableOperator): + | + | @overrides + | def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]: + | yield table + |""".stripMargin + } +}