Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
592ff64
init
aglinxinyuan Feb 9, 2026
b95465d
update
aglinxinyuan Feb 11, 2026
f83041a
Merge branch 'main' into xinyuan-loop-feb
aglinxinyuan Feb 11, 2026
d9d0cd9
update
aglinxinyuan Feb 11, 2026
144ae29
update
aglinxinyuan Feb 11, 2026
7b13fef
update
aglinxinyuan Feb 11, 2026
bd5ac3a
update
aglinxinyuan Feb 11, 2026
19be0c1
update
aglinxinyuan Feb 11, 2026
706884f
update
aglinxinyuan Feb 11, 2026
21e6a41
update
aglinxinyuan Feb 11, 2026
24da3e3
update
aglinxinyuan Feb 11, 2026
2ba0fa4
update
aglinxinyuan Feb 11, 2026
a05ffd1
update
aglinxinyuan Feb 11, 2026
44fc0e7
update
aglinxinyuan Feb 11, 2026
846aac2
update
aglinxinyuan Feb 12, 2026
6be7dc5
update
aglinxinyuan Feb 12, 2026
d8338d1
update
aglinxinyuan Feb 12, 2026
36e517e
fix
aglinxinyuan Feb 13, 2026
a4bfbdb
fix
aglinxinyuan Feb 13, 2026
393faac
Merge branch 'main' into xinyuan-loop-feb
aglinxinyuan Feb 14, 2026
cbb2fc7
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-loop-feb
aglinxinyuan Feb 14, 2026
084f602
update
aglinxinyuan Feb 15, 2026
ae0d4ed
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-loop-feb
aglinxinyuan Feb 15, 2026
55d5cec
update
aglinxinyuan Feb 15, 2026
b8faf93
update
aglinxinyuan Feb 15, 2026
a53506a
update
aglinxinyuan Feb 15, 2026
d44a664
update
aglinxinyuan Feb 15, 2026
1cd48fd
update
aglinxinyuan Feb 15, 2026
e35a332
update
aglinxinyuan Feb 16, 2026
4d18d1d
Merge branch 'xiaozhen-sync-region-kill' into xinyuan-loop-feb
aglinxinyuan Feb 17, 2026
30a8562
update
aglinxinyuan Feb 24, 2026
b717fb0
update
aglinxinyuan Feb 24, 2026
da8d6ed
Merge remote-tracking branch 'origin/xinyuan-loop-feb' into xinyuan-l…
aglinxinyuan Feb 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ message ControlRequest {
EmptyRequest emptyRequest = 56;
PrepareCheckpointRequest prepareCheckpointRequest = 57;
QueryStatisticsRequest queryStatisticsRequest = 58;
EndIterationRequest endIterationRequest = 59;

// request for testing
Ping ping = 100;
Expand Down Expand Up @@ -271,4 +272,9 @@ message PrepareCheckpointRequest{

message QueryStatisticsRequest{
repeated core.ActorVirtualIdentity filterByWorkers = 1;
}

// Control request instructing a worker to end the current loop iteration.
message EndIterationRequest{
// Identity of the worker/actor of the LoopStart operator driving this loop.
// NOTE(review): proto style is lower_snake_case (loop_start_id); renaming
// changes generated accessors and JSON mapping, so it is left as-is here.
core.ActorVirtualIdentity LoopStartId = 1 [(scalapb.field).no_box = true];
// Index of the iteration being ended — TODO confirm whether 0- or 1-based
// against the sender (Schedule.loopNext starts counting at 1).
int32 iteration = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ service WorkerService {
rpc EndWorker(EmptyRequest) returns (EmptyReturn);
rpc StartChannel(EmptyRequest) returns (EmptyReturn);
rpc EndChannel(EmptyRequest) returns (EmptyReturn);
rpc EndIteration(EndIterationRequest) returns (EmptyReturn);
rpc DebugCommand(DebugCommandRequest) returns (EmptyReturn);
rpc EvaluatePythonExpression(EvaluatePythonExpressionRequest) returns (EvaluatedValue);
rpc NoOperation(EmptyRequest) returns (EmptyReturn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ abstract class AmberProcessor(
with Serializable {

/** FIFO & exactly once */
val inputGateway: InputGateway = new NetworkInputGateway(this.actorId)
val inputGateway: NetworkInputGateway = new NetworkInputGateway(this.actorId)

// 1. Unified Output
val outputGateway: NetworkOutputGateway =
Expand All @@ -55,7 +55,7 @@ abstract class AmberProcessor(
}
)
// 2. RPC Layer
val asyncRPCClient = new AsyncRPCClient(outputGateway, actorId)
val asyncRPCClient = new AsyncRPCClient(inputGateway, outputGateway, actorId)
val asyncRPCServer: AsyncRPCServer =
new AsyncRPCServer(outputGateway, actorId)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class WorkflowScheduler(
this.physicalPlan = updatedPhysicalPlan
}

def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else schedule.next()
def getNextRegions: Set[Region] = if (!schedule.hasNext) Set() else schedule.loopNext()

def hasPendingRegions: Boolean = schedule != null && schedule.hasNext

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ case class WorkflowExecution() {
* @throws AssertionError if the `RegionExecution` has already been initialized.
*/
def initRegionExecution(region: Region): RegionExecution = {
regionExecutions.remove(region.id)
// ensure the region execution hasn't been initialized already.
assert(
!regionExecutions.contains(region.id),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class AmberFIFOChannel(val channelId: ChannelIdentity) extends AmberLogging {
private var portId: Option[PortIdentity] = None

def acceptMessage(msg: WorkflowFIFOMessage): Unit = {
//channel remove
val seq = msg.sequenceNumber
val payload = msg.payload
if (isDuplicated(seq)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,8 @@ class NetworkInputGateway(val actorId: ActorVirtualIdentity)
enforcers += enforcer
}

/**
 * Drops the control input channel arriving from the given actor so this
 * gateway no longer tracks FIFO state for it (e.g. after the worker is
 * stopped and its actor ref removed).
 */
def removeControlChannel(from: ActorVirtualIdentity): Unit = {
  val controlChannelId = ChannelIdentity(from, actorId, isControl = true)
  inputChannels.remove(controlChannelId)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,8 @@ class NetworkOutputGateway(
idToSequenceNums.getOrElseUpdate(channelId, new AtomicLong()).getAndIncrement()
}

/**
 * Drops the outgoing control channel toward the given actor, discarding its
 * sequence-number counter so a later channel to the same actor restarts at 0.
 */
def removeControlChannel(to: ActorVirtualIdentity): Unit = {
  val controlChannelId = ChannelIdentity(actorId, to, isControl = true)
  idToSequenceNums.remove(controlChannelId)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,14 @@ package org.apache.texera.amber.engine.architecture.messaginglayer
import org.apache.texera.amber.core.state.State
import org.apache.texera.amber.core.storage.DocumentFactory
import org.apache.texera.amber.core.storage.model.BufferedItemWriter
import org.apache.texera.amber.core.storage.result.ResultSchema
import org.apache.texera.amber.core.tuple._
import org.apache.texera.amber.core.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}
import org.apache.texera.amber.core.workflow.{PhysicalLink, PortIdentity}
import org.apache.texera.amber.engine.architecture.messaginglayer.OutputManager.{
DPOutputIterator,
getBatchSize,
toPartitioner
}
import org.apache.texera.amber.engine.architecture.messaginglayer.OutputManager.{DPOutputIterator, getBatchSize, toPartitioner}
import org.apache.texera.amber.engine.architecture.sendsemantics.partitioners._
import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings._
import org.apache.texera.amber.engine.architecture.worker.managers.{
OutputPortResultWriterThread,
PortStorageWriterTerminateSignal
}
import org.apache.texera.amber.engine.architecture.worker.managers.{OutputPortResultWriterThread, PortStorageWriterTerminateSignal}
import org.apache.texera.amber.engine.common.AmberLogging
import org.apache.texera.amber.util.VirtualIdentityUtils

Expand Down Expand Up @@ -124,6 +118,10 @@ class OutputManager(
: mutable.HashMap[PortIdentity, OutputPortResultWriterThread] =
mutable.HashMap()

// Per-output-port writers for ECM (loop end-control-message) marker rows,
// populated in setupOutputStorageWriterThread next to the result writer.
// NOTE(review): name breaks lowerCamelCase convention (`ecmWriters`);
// renaming would touch call sites, so it is only flagged here.
private val ECMWriters
: mutable.HashMap[PortIdentity, BufferedItemWriter[Tuple]] =
mutable.HashMap()

/**
* Add down stream operator and its corresponding Partitioner.
*
Expand Down Expand Up @@ -232,6 +230,23 @@ class OutputManager(
})
}

/**
 * Writes an ECM (end-of-loop control) marker row to the registered ECM
 * storage writer(s).
 *
 * @param tuple        NOTE(review): currently unused — the written row is
 *                     built from `worker.name` only; confirm whether `tuple`
 *                     was meant to supply the row content.
 * @param outputPortId when given, only that port's ECM writer (if one was
 *                     registered) receives the marker; when None, every
 *                     registered ECM writer does.
 */
def saveECMToStorageIfNeeded(
    tuple: Tuple,
    outputPortId: Option[PortIdentity] = None
): Unit = {
  // Select target writers directly instead of the original get-then-filter
  // double lookup on the same key.
  val targetWriters: Iterable[BufferedItemWriter[Tuple]] = outputPortId match {
    case Some(portId) => this.ECMWriters.get(portId).toList
    case None         => this.ECMWriters.values
  }
  targetWriters.foreach { writer =>
    // NOTE(review): ecmSchema declares two attributes (LoopStartId, iteration)
    // but only one value is supplied here — confirm the intended row shape.
    writer.putOne(new Tuple(ResultSchema.ecmSchema, Array(worker.name)))
  }
}

/**
* Signal the port storage writer to flush the remaining buffer and wait for commits to finish so that
* the output port is properly completed. If the output port does not need storage, no action will be done.
Expand Down Expand Up @@ -280,6 +295,10 @@ class OutputManager(
}

private def setupOutputStorageWriterThread(portId: PortIdentity, storageUri: URI): Unit = {
this.ECMWriters(portId) = DocumentFactory
.createDocument(storageUri.resolve("ecm"), ResultSchema.ecmSchema)
.writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
.asInstanceOf[BufferedItemWriter[Tuple]]
val bufferedItemWriter = DocumentFactory
.openDocument(storageUri)
._1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ class RegionExecutionCoordinator(
val actorRef = actorRefService.getActorRef(workerId)
// Remove the actorRef so that no other actors can find the worker and send messages.
actorRefService.removeActorRef(workerId)
asyncRPCClient.inputGateway.removeControlChannel(workerId)
asyncRPCClient.outputGateway.removeControlChannel(workerId)
gracefulStop(actorRef, ScalaDuration(5, TimeUnit.SECONDS)).asTwitter()
}
}.toSeq
Expand Down Expand Up @@ -569,7 +571,18 @@ class RegionExecutionCoordinator(
region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
val schema =
schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing"))
DocumentFactory.createDocument(storageUriToAdd, schema)

if (region.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopEnd-operator-"))) {
try {
DocumentFactory.openDocument(storageUriToAdd)
} catch {
case _: Exception =>
DocumentFactory.createDocument(storageUriToAdd, schema)
}
} else {
DocumentFactory.createDocument(storageUriToAdd, schema)
}

WorkflowExecutionsResource.insertOperatorPortResultUri(
eid = eid,
globalPortId = outputPortId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,15 @@

package org.apache.texera.amber.engine.architecture.scheduling

import org.apache.texera.amber.core.executor.OpExecWithClassName
import org.apache.texera.amber.operator.loop.{LoopEndOpDesc, LoopStartOpDesc}
import org.apache.texera.amber.util.JSONUtils.objectMapper

case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterator[Set[Region]] {
// Level currently being scheduled; starts at the smallest level key (0 when empty).
private var currentLevel = levelSets.keys.minOption.getOrElse(0)
// Level to rewind to when a LoopEnd region is reached; captured when a LoopStart is seen.
private var loopStartLevel = currentLevel
// Total iterations requested by the LoopStart descriptor (default 1 = no repetition).
private var iteration = 1
// Iterations executed so far for the current loop (1-based counter).
private var i = 1

def getRegions: List[Region] = levelSets.values.flatten.toList

Expand All @@ -31,4 +38,31 @@ case class Schedule(private val levelSets: Map[Int, Set[Region]]) extends Iterat
currentLevel += 1
regions
}

/**
 * Returns the regions of the current schedule level and advances the level
 * pointer, rewinding to the pre-loop level while a LoopStart's configured
 * iteration count has not yet been reached.
 */
def loopNext(): Set[Region] = {
  val regions = levelSets(currentLevel)

  // If this level contains a LoopStart operator, read the configured
  // iteration count from THAT operator's descriptor (the original code read
  // regions.head.getOperators.head, which is not necessarily the operator
  // that matched the predicate) and remember the level to rewind to.
  regions.iterator
    .flatMap(_.getOperators)
    .find(_.id.logicalOpId.id.startsWith("LoopStart-operator-"))
    .foreach { loopStartOp =>
      iteration = objectMapper
        .readValue(
          loopStartOp.opExecInitInfo.asInstanceOf[OpExecWithClassName].descString,
          classOf[LoopStartOpDesc]
        )
        .iteration
      loopStartLevel = currentLevel - 1
    }

  // If this level contains a LoopEnd operator and more iterations remain,
  // jump back to the level just before the loop started.
  // NOTE(review): `i` is never reset, so a second loop in the same schedule
  // would not re-iterate — confirm the single-loop-per-schedule assumption.
  val reachedLoopEnd =
    regions.exists(_.getOperators.exists(_.id.logicalOpId.id.startsWith("LoopEnd-operator-")))
  if (reachedLoopEnd && i < iteration) {
    currentLevel = loopStartLevel
    i += 1
  }
  currentLevel += 1
  regions
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import org.apache.texera.amber.core.virtualidentity.{
EmbeddedControlMessageIdentity
}
import org.apache.texera.amber.engine.architecture.controller.ClientEvent
import org.apache.texera.amber.engine.architecture.messaginglayer.NetworkOutputGateway
import org.apache.texera.amber.engine.architecture.messaginglayer.{
NetworkInputGateway,
NetworkOutputGateway
}
import org.apache.texera.amber.engine.architecture.rpc.controlcommands._
import org.apache.texera.amber.engine.architecture.rpc.controllerservice.ControllerServiceFs2Grpc
import org.apache.texera.amber.engine.architecture.rpc.controlreturns.{
Expand Down Expand Up @@ -125,7 +128,8 @@ object AsyncRPCClient {
}

class AsyncRPCClient(
outputGateway: NetworkOutputGateway,
val inputGateway: NetworkInputGateway,
val outputGateway: NetworkOutputGateway,
val actorId: ActorVirtualIdentity
) extends AmberLogging {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ object WorkflowExecutionsResource {
OPERATOR_PORT_EXECUTIONS.RESULT_URI
)
.values(eid.id.toInt, globalPortId.serializeAsString, uri.toString)
.onConflict()
.doNothing()
.execute()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,9 @@ object ResultSchema {
val consoleMessagesSchema: Schema = new Schema(
new Attribute("message", AttributeType.STRING)
)

// Schema for ECM (loop end-control-message) marker rows: which LoopStart
// operator drove the loop, and which iteration the marker belongs to.
val ecmSchema: Schema = new Schema(
new Attribute("LoopStartId", AttributeType.STRING),
new Attribute("iteration", AttributeType.INTEGER)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.iceberg.io.{DataWriter, OutputFile}
import org.apache.iceberg.parquet.Parquet
import org.apache.iceberg.{Schema, Table}

import java.nio.file.Paths
import java.nio.file.{Files, Path, Paths}
import scala.collection.mutable.ArrayBuffer

/**
Expand Down Expand Up @@ -107,9 +107,12 @@ private[storage] class IcebergTableWriter[T](
private def flushBuffer(): Unit = {
if (buffer.nonEmpty) {
// Create a unique file path using the writer's identifier and the filename index
val filepath = Paths.get(table.location()).resolve(s"${writerIdentifier}_${filenameIdx}")
// Increment the filename index by 1
filenameIdx += 1
var filepath: Path = null
do {
filepath = Paths.get(table.location()).resolve(s"${writerIdentifier}_$filenameIdx")
filenameIdx+= 1
} while (Files.exists(filepath))

val outputFile: OutputFile = table.io().newOutputFile(filepath.toString)
// Create a Parquet data writer to write a new file
val dataWriter: DataWriter[Record] = Parquet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ import org.apache.texera.amber.operator.visualization.volcanoPlot.VolcanoPlotOpD
import org.apache.texera.amber.operator.visualization.waterfallChart.WaterfallChartOpDesc
import org.apache.texera.amber.operator.visualization.wordCloud.WordCloudOpDesc
import org.apache.commons.lang3.builder.{EqualsBuilder, HashCodeBuilder, ToStringBuilder}
import org.apache.texera.amber.operator.loop.{LoopEndOpDesc, LoopStartOpDesc}
import org.apache.texera.amber.operator.sklearn.testing.SklearnTestingOpDesc
import org.apache.texera.amber.operator.visualization.stripChart.StripChartOpDesc

Expand Down Expand Up @@ -203,6 +204,8 @@ trait StateTransferFunc
new Type(value = classOf[TypeCastingOpDesc], name = "TypeCasting"),
new Type(value = classOf[LimitOpDesc], name = "Limit"),
new Type(value = classOf[SleepOpDesc], name = "Sleep"),
new Type(value = classOf[LoopStartOpDesc], name = "LoopStart"),
new Type(value = classOf[LoopEndOpDesc], name = "LoopEnd"),
new Type(value = classOf[RandomKSamplingOpDesc], name = "RandomKSampling"),
new Type(value = classOf[ReservoirSamplingOpDesc], name = "ReservoirSampling"),
new Type(value = classOf[HashJoinOpDesc[String]], name = "HashJoin"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.texera.amber.operator.loop

import org.apache.texera.amber.core.executor.OpExecWithClassName
import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PhysicalOp}
import org.apache.texera.amber.operator.LogicalOp
import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo}
import org.apache.texera.amber.util.JSONUtils.objectMapper

/**
 * Logical descriptor for the Loop End operator: a single-worker,
 * non-parallelizable operator with one input and one output port.
 */
class LoopEndOpDesc extends LogicalOp {

  /** Builds the physical operator, serializing this descriptor for the executor. */
  override def getPhysicalOp(
      workflowId: WorkflowIdentity,
      executionId: ExecutionIdentity
  ): PhysicalOp = {
    // Serialize this descriptor so LoopEndOpExec can be reconstructed on a worker.
    val execInitInfo = OpExecWithClassName(
      "org.apache.texera.amber.operator.loop.LoopEndOpExec",
      objectMapper.writeValueAsString(this)
    )
    val basePhysicalOp = PhysicalOp.oneToOnePhysicalOp(
      workflowId,
      executionId,
      operatorIdentifier,
      execInitInfo
    )
    basePhysicalOp
      .withInputPorts(operatorInfo.inputPorts)
      .withOutputPorts(operatorInfo.outputPorts)
      .withParallelizable(false)
      .withSuggestedWorkerNum(1)
  }

  /** Static metadata: one input port, one output port, grouped under control operators. */
  override def operatorInfo: OperatorInfo =
    OperatorInfo(
      "Loop End",
      "Loop End",
      OperatorGroupConstants.CONTROL_GROUP,
      inputPorts = List(InputPort()),
      outputPorts = List(OutputPort())
    )
}
Loading
Loading