Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,13 @@ message PrepareCheckpointRequest{
bool estimationOnly = 2;
}

// Selects which sink(s) receive the statistics produced by a QueryStatisticsRequest.
enum StatisticsUpdateTarget {
// Forward to both the UI and the persistence sink. This is the zero value, so it is
// presumably what an unset field reads as (proto3 enum default; confirm the file's syntax).
BOTH_UI_AND_PERSISTENCE = 0;
// Forward to the UI only.
UI_ONLY = 1;
// Forward to the persistence sink only.
PERSISTENCE_ONLY = 2;
}

// Request asking the controller to collect runtime statistics from workers.
message QueryStatisticsRequest{
// Workers to query. The controller handler treats an empty list as a full-graph query.
repeated core.ActorVirtualIdentity filterByWorkers = 1;
// Which sink(s) (UI, persistence, or both) should receive the resulting statistics.
StatisticsUpdateTarget updateTarget = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ case class ExecutionStateUpdate(state: WorkflowAggregatedState) extends ClientEv

case class ExecutionStatsUpdate(operatorMetrics: Map[String, OperatorMetrics]) extends ClientEvent

/**
  * Client event carrying a snapshot of operator metrics intended for the persistence sink.
  * It has the same payload shape as [[ExecutionStatsUpdate]] but is a distinct event type,
  * so a consumer can handle UI updates and persistence separately.
  *
  * @param operatorMetrics metrics keyed by a String (presumably the operator id; confirm
  *                        against the producer of the stats map)
  */
case class RuntimeStatisticsPersist(operatorMetrics: Map[String, OperatorMetrics])
extends ClientEvent

case class ReportCurrentProcessingTuple(
operatorID: String,
tuple: Array[(Tuple, ActorVirtualIdentity)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,16 @@ object ControllerConfig {
/**
  * Builds the default controller configuration: both timer intervals are read from
  * [[ApplicationConfig]], and neither state restoration nor fault tolerance is configured.
  */
def default: ControllerConfig = {
  // Read the two intervals up front, in the same order the constructor arguments use.
  val uiIntervalMs: Option[Long] =
    Option(ApplicationConfig.getStatusUpdateIntervalInMs)
  val persistenceIntervalMs: Option[Long] =
    Option(ApplicationConfig.getRuntimeStatisticsPersistenceIntervalInMs)

  ControllerConfig(
    statusUpdateIntervalMs = uiIntervalMs,
    runtimeStatisticsPersistenceIntervalMs = persistenceIntervalMs,
    stateRestoreConfOpt = None,
    faultToleranceConfOpt = None
  )
}
}

/**
  * Configuration handed to the controller.
  *
  * @param statusUpdateIntervalMs period, in milliseconds, of the timer that requests
  *                               statistics for the UI; `None` means that timer is never
  *                               scheduled
  * @param runtimeStatisticsPersistenceIntervalMs period, in milliseconds, of the timer that
  *                               requests statistics for persistence; `None` means that
  *                               timer is never scheduled
  * @param stateRestoreConfOpt optional state-restore configuration
  * @param faultToleranceConfOpt optional fault-tolerance configuration
  */
final case class ControllerConfig(
statusUpdateIntervalMs: Option[Long],
runtimeStatisticsPersistenceIntervalMs: Option[Long],
stateRestoreConfOpt: Option[StateRestoreConfig],
faultToleranceConfOpt: Option[FaultToleranceConfig]
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import org.apache.pekko.actor.Cancellable
import org.apache.texera.amber.engine.architecture.common.AkkaActorService
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
AsyncRPCContext,
QueryStatisticsRequest
QueryStatisticsRequest,
StatisticsUpdateTarget
}
import org.apache.texera.amber.engine.architecture.rpc.controllerservice.ControllerServiceGrpc.METHOD_CONTROLLER_INITIATE_QUERY_STATISTICS
import org.apache.texera.amber.engine.common.rpc.AsyncRPCClient.ControlInvocation
Expand All @@ -36,28 +37,61 @@ class ControllerTimerService(
akkaActorService: AkkaActorService
) {
// Handle of the scheduled UI status-update timer; None while no such timer is scheduled.
var statusUpdateAskHandle: Option[Cancellable] = None
// Handle of the scheduled runtime-statistics persistence timer; None while none is scheduled.
var runtimeStatisticsAskHandle: Option[Cancellable] = None

def enableStatusUpdate(): Unit = {
if (controllerConfig.statusUpdateIntervalMs.nonEmpty && statusUpdateAskHandle.isEmpty) {
statusUpdateAskHandle = Option(
/**
  * Schedules a recurring self-addressed statistics query unless one is already scheduled.
  *
  * A timer is started only when an interval is configured and no handle exists yet; in every
  * other case the given handle is returned unchanged, so repeated calls are harmless.
  *
  * @param intervalMs   period between queries in milliseconds; `None` disables the timer
  * @param updateTarget sink(s) that each periodic query should forward its statistics to
  * @param handleOpt    handle of the currently scheduled timer, if any
  * @return the handle the caller should store: the new timer's handle, or `handleOpt`
  */
private def enableTimer(
    intervalMs: Option[Long],
    updateTarget: StatisticsUpdateTarget,
    handleOpt: Option[Cancellable]
): Option[Cancellable] =
  (intervalMs, handleOpt) match {
    case (Some(periodMs), None) =>
      Option(
        akkaActorService.sendToSelfWithFixedDelay(
          // Fire the first query immediately, then repeat every `periodMs`.
          0.milliseconds,
          FiniteDuration(periodMs, MILLISECONDS),
          ControlInvocation(
            METHOD_CONTROLLER_INITIATE_QUERY_STATISTICS,
            // An empty worker filter requests a full-graph query.
            QueryStatisticsRequest(Seq.empty, updateTarget),
            AsyncRPCContext(SELF, SELF),
            0
          )
        )
      )
    case _ =>
      // No interval configured, or a timer is already scheduled: keep the current handle.
      handleOpt
  }

def disableStatusUpdate(): Unit = {
if (statusUpdateAskHandle.nonEmpty) {
statusUpdateAskHandle.get.cancel()
statusUpdateAskHandle = Option.empty
/**
  * Cancels the timer behind `handleOpt`, if there is one.
  *
  * @param handleOpt handle of the currently scheduled timer, if any
  * @return always an empty option, for the caller to store as the new handle
  */
private def disableTimer(handleOpt: Option[Cancellable]): Option[Cancellable] = {
  handleOpt.foreach(_.cancel())
  None
}

/**
  * Starts the periodic UI statistics query using `controllerConfig.statusUpdateIntervalMs`.
  * The stored handle is passed back in, so an already-scheduled timer is left as is.
  */
def enableStatusUpdate(): Unit = {
  val uiHandle = enableTimer(
    intervalMs = controllerConfig.statusUpdateIntervalMs,
    updateTarget = StatisticsUpdateTarget.UI_ONLY,
    handleOpt = statusUpdateAskHandle
  )
  statusUpdateAskHandle = uiHandle
}

/**
  * Starts the periodic persistence-only statistics query using
  * `controllerConfig.runtimeStatisticsPersistenceIntervalMs`. The stored handle is passed
  * back in, so an already-scheduled timer is left as is.
  */
def enableRuntimeStatisticsCollection(): Unit = {
  val persistenceHandle = enableTimer(
    intervalMs = controllerConfig.runtimeStatisticsPersistenceIntervalMs,
    updateTarget = StatisticsUpdateTarget.PERSISTENCE_ONLY,
    handleOpt = runtimeStatisticsAskHandle
  )
  runtimeStatisticsAskHandle = persistenceHandle
}

/** Cancels the periodic UI statistics query, if scheduled, and clears its stored handle. */
def disableStatusUpdate(): Unit = {
  val cleared = disableTimer(statusUpdateAskHandle)
  statusUpdateAskHandle = cleared
}

/** Cancels the periodic persistence statistics query, if scheduled, and clears its handle. */
def disableRuntimeStatisticsCollection(): Unit = {
  val cleared = disableTimer(runtimeStatisticsAskHandle)
  runtimeStatisticsAskHandle = cleared
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity
import org.apache.texera.amber.engine.architecture.controller.{
ControllerAsyncRPCHandlerInitializer,
ExecutionStateUpdate,
ExecutionStatsUpdate
ExecutionStatsUpdate,
RuntimeStatisticsPersist
}
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
AsyncRPCContext,
Expand All @@ -47,6 +48,7 @@ trait PauseHandler {

override def pauseWorkflow(request: EmptyRequest, ctx: AsyncRPCContext): Future[EmptyReturn] = {
cp.controllerTimerService.disableStatusUpdate() // to be enabled in resume
cp.controllerTimerService.disableRuntimeStatisticsCollection() // to be enabled in resume
Future
.collect(
cp.workflowExecution.getRunningRegionExecutions
Expand Down Expand Up @@ -81,12 +83,10 @@ trait PauseHandler {
.toSeq
)
.map { _ =>
// update frontend workflow status
sendToClient(
ExecutionStatsUpdate(
cp.workflowExecution.getAllRegionExecutionsStats
)
)
// update frontend workflow status and persist statistics
val stats = cp.workflowExecution.getAllRegionExecutionsStats
sendToClient(ExecutionStatsUpdate(stats))
sendToClient(RuntimeStatisticsPersist(stats))
sendToClient(ExecutionStateUpdate(cp.workflowExecution.getState))
logger.info(s"workflow paused")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ import org.apache.texera.amber.engine.architecture.controller.{
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
AsyncRPCContext,
PortCompletedRequest,
QueryStatisticsRequest
QueryStatisticsRequest,
StatisticsUpdateTarget
}
import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER
Expand All @@ -50,7 +51,13 @@ trait PortCompletedHandler {
ctx: AsyncRPCContext
): Future[EmptyReturn] = {
controllerInterface
.controllerInitiateQueryStatistics(QueryStatisticsRequest(scala.Seq(ctx.sender)), CONTROLLER)
.controllerInitiateQueryStatistics(
QueryStatisticsRequest(
scala.Seq(ctx.sender),
StatisticsUpdateTarget.BOTH_UI_AND_PERSISTENCE
),
CONTROLLER
)
.map { _ =>
val globalPortId = GlobalPortIdentity(
VirtualIdentityUtils.getPhysicalOpId(ctx.sender),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@
package org.apache.texera.amber.engine.architecture.controller.promisehandlers

import com.twitter.util.Future
import org.apache.texera.amber.config.ApplicationConfig
import org.apache.texera.amber.core.virtualidentity.PhysicalOpIdentity
import org.apache.texera.amber.engine.architecture.controller.{
ControllerAsyncRPCHandlerInitializer,
ExecutionStatsUpdate
ExecutionStatsUpdate,
RuntimeStatisticsPersist
}
import org.apache.texera.amber.engine.architecture.deploysemantics.layer.WorkerExecution
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
AsyncRPCContext,
EmptyRequest,
QueryStatisticsRequest
QueryStatisticsRequest,
StatisticsUpdateTarget
}
import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
import org.apache.texera.amber.engine.architecture.rpc.controlreturns.{
Expand All @@ -47,19 +50,54 @@ trait QueryWorkerStatisticsHandler {

// True while a full-graph (unfiltered) statistics query is in flight. It is set just before
// such a query is issued and cleared once its results have been applied.
private var globalQueryStatsOngoing = false

// Minimum of the two timer intervals converted to nanoseconds.
// A full-graph worker query is skipped and served from cache when the last completed
// query falls within this window, avoiding redundant worker RPCs.
// NOTE(review): this reads ApplicationConfig directly, while the timers are driven by the
// intervals in ControllerConfig. Confirm the two always agree when a non-default
// ControllerConfig is used.
private val minQueryIntervalNs: Long =
Math.min(
ApplicationConfig.getStatusUpdateIntervalInMs,
ApplicationConfig.getRuntimeStatisticsPersistenceIntervalInMs
) * 1_000_000L

// Nanosecond timestamp of the last completed full-graph worker stats query.
// 0L stands for "no query has completed yet"; the in-flight path tests `> 0` for this.
// NOTE(review): System.nanoTime() has an arbitrary origin and may be negative or small, so
// 0L is not a guaranteed-safe sentinel. The freshness check `nanoTime - last < minInterval`
// could pass before any query has completed. Consider Option[Long] instead.
@volatile private var lastWorkerQueryTimestampNs: Long = 0L

/**
  * Sends the statistics currently held by `cp.workflowExecution` to the client sink(s)
  * selected by `updateTarget`. No worker is queried here.
  *
  * @param updateTarget `UI_ONLY` emits an [[ExecutionStatsUpdate]]; `PERSISTENCE_ONLY` emits a
  *                     [[RuntimeStatisticsPersist]]; `BOTH_UI_AND_PERSISTENCE` or an
  *                     unrecognized value emits both, the UI event first
  */
private def forwardStats(updateTarget: StatisticsUpdateTarget): Unit = {
  val snapshot = cp.workflowExecution.getAllRegionExecutionsStats
  // Decide the destinations first, then emit in a fixed order (UI, then persistence).
  val (toUi, toPersistence) = updateTarget match {
    case StatisticsUpdateTarget.UI_ONLY =>
      (true, false)
    case StatisticsUpdateTarget.PERSISTENCE_ONLY =>
      (false, true)
    case StatisticsUpdateTarget.BOTH_UI_AND_PERSISTENCE |
        StatisticsUpdateTarget.Unrecognized(_) =>
      (true, true)
  }
  if (toUi) sendToClient(ExecutionStatsUpdate(snapshot))
  if (toPersistence) sendToClient(RuntimeStatisticsPersist(snapshot))
}

override def controllerInitiateQueryStatistics(
msg: QueryStatisticsRequest,
ctx: AsyncRPCContext
): Future[EmptyReturn] = {
// Avoid issuing concurrent full-graph statistics queries.
// If a global query is already in progress, do not start another one for this request.
if (globalQueryStatsOngoing && msg.filterByWorkers.isEmpty) {
// A query is already in-flight: serve the last completed query's cached data,
// or drop silently if no prior query has finished yet.
if (lastWorkerQueryTimestampNs > 0) forwardStats(msg.updateTarget)
return EmptyReturn()
}

var opFilter: Set[PhysicalOpIdentity] = Set.empty
// Only enforce the single-query restriction for full-graph queries.
if (msg.filterByWorkers.isEmpty) {
if (System.nanoTime() - lastWorkerQueryTimestampNs < minQueryIntervalNs) {
// Cache is still fresh: the faster timer already queried workers recently.
forwardStats(msg.updateTarget)
return EmptyReturn()
}
globalQueryStatsOngoing = true
} else {
// Map the filtered worker IDs (if any) to their corresponding physical operator IDs
Expand Down Expand Up @@ -133,17 +171,17 @@ trait QueryWorkerStatisticsHandler {
Future.collect(futures).flatMap(_ => processLayers(rest))
}

// Start processing all layers and update the frontend after completion
// Start processing all layers and forward stats to the appropriate sink(s) on completion.
processLayers(layers).map { _ =>
collectedResults.foreach {
case (wExec, resp, timestamp) =>
wExec.update(timestamp, resp.metrics.workerState, resp.metrics.workerStatistics)
}
sendToClient(
ExecutionStatsUpdate(cp.workflowExecution.getAllRegionExecutionsStats)
)
// Release the global query lock if it was set
forwardStats(msg.updateTarget)
// Record the completion timestamp before releasing the lock so that any timer
// firing in between sees a valid cache entry rather than triggering a redundant query.
if (globalQueryStatsOngoing) {
lastWorkerQueryTimestampNs = System.nanoTime()
globalQueryStatsOngoing = false
}
EmptyReturn()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ package org.apache.texera.amber.engine.architecture.controller.promisehandlers
import com.twitter.util.Future
import org.apache.texera.amber.engine.architecture.controller.{
ControllerAsyncRPCHandlerInitializer,
ExecutionStatsUpdate
ExecutionStatsUpdate,
RuntimeStatisticsPersist
}
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
AsyncRPCContext,
Expand Down Expand Up @@ -57,14 +58,14 @@ trait ResumeHandler {
.toSeq
)
.map { _ =>
// update frontend status
sendToClient(
ExecutionStatsUpdate(
cp.workflowExecution.getAllRegionExecutionsStats
)
)
// update frontend status and persist statistics
val stats = cp.workflowExecution.getAllRegionExecutionsStats
sendToClient(ExecutionStatsUpdate(stats))
sendToClient(RuntimeStatisticsPersist(stats))
cp.controllerTimerService
.enableStatusUpdate() //re-enabled it since it is disabled in pause
cp.controllerTimerService
.enableRuntimeStatisticsCollection() //re-enabled it since it is disabled in pause
EmptyReturn()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ trait StartWorkflowHandler {
.coordinateRegionExecutors(cp.actorService)
.map(_ => {
cp.controllerTimerService.enableStatusUpdate()
cp.controllerTimerService.enableRuntimeStatisticsCollection()
StartWorkflowResponse(RUNNING)
})
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import org.apache.texera.amber.engine.architecture.controller.{
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
AsyncRPCContext,
EmptyRequest,
QueryStatisticsRequest
QueryStatisticsRequest,
StatisticsUpdateTarget
}
import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn
import org.apache.texera.amber.engine.common.virtualidentity.util.SELF
Expand All @@ -52,7 +53,7 @@ trait WorkerExecutionCompletedHandler {
// and the user sees the last update before completion
val statsRequest =
controllerInterface.controllerInitiateQueryStatistics(
QueryStatisticsRequest(Seq(ctx.sender)),
QueryStatisticsRequest(Seq(ctx.sender), StatisticsUpdateTarget.BOTH_UI_AND_PERSISTENCE),
mkContext(SELF)
)

Expand All @@ -64,6 +65,7 @@ trait WorkerExecutionCompletedHandler {
// after query result come back: send completed event, cleanup ,and kill workflow
sendToClient(ExecutionStateUpdate(cp.workflowExecution.getState))
cp.controllerTimerService.disableStatusUpdate()
cp.controllerTimerService.disableRuntimeStatisticsCollection()
}
})
EmptyReturn()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ package org.apache.texera.amber.engine.architecture.controller.promisehandlers
import com.twitter.util.Future
import org.apache.texera.amber.engine.architecture.controller.{
ControllerAsyncRPCHandlerInitializer,
ExecutionStatsUpdate
ExecutionStatsUpdate,
RuntimeStatisticsPersist
}
import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{
AsyncRPCContext,
Expand Down Expand Up @@ -50,11 +51,9 @@ trait WorkerStateUpdatedHandler {
.foreach(operatorExecution =>
operatorExecution.getWorkerExecution(ctx.sender).update(System.nanoTime(), msg.state)
)
sendToClient(
ExecutionStatsUpdate(
cp.workflowExecution.getAllRegionExecutionsStats
)
)
val stats = cp.workflowExecution.getAllRegionExecutionsStats
sendToClient(ExecutionStatsUpdate(stats))
sendToClient(RuntimeStatisticsPersist(stats))
EmptyReturn()
}
}
Loading
Loading