diff --git a/.github/workflows/github-action-build.yml b/.github/workflows/github-action-build.yml index af2a60920d5..6db42a6f768 100644 --- a/.github/workflows/github-action-build.yml +++ b/.github/workflows/github-action-build.yml @@ -117,6 +117,11 @@ jobs: python-version: '3.11' - name: Show Python run: python --version || python3 --version + - name: Install dependencies + run: | + python -m pip install --upgrade pip + if [ -f amber/requirements.txt ]; then pip install -r amber/requirements.txt; fi + if [ -f amber/operator-requirements.txt ]; then pip install -r amber/operator-requirements.txt; fi - name: Setup sbt launcher uses: sbt/setup-sbt@3e125ece5c3e5248e18da9ed8d2cce3d335ec8dd # v1.1.14 - uses: coursier/cache-action@4e2615869d13561d626ed48655e1a39e5b192b3c # v6.4.9 diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto index d714f64a154..b22c1bdf7c2 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto @@ -25,7 +25,6 @@ import "org/apache/texera/amber/engine/architecture/worker/statistics.proto"; import "org/apache/texera/amber/engine/architecture/sendsemantics/partitionings.proto"; import "scalapb/scalapb.proto"; import "google/protobuf/timestamp.proto"; -import "google/protobuf/any.proto"; option (scalapb.options) = { scope: FILE, @@ -40,12 +39,12 @@ message ControlRequest { TakeGlobalCheckpointRequest takeGlobalCheckpointRequest = 2; DebugCommandRequest debugCommandRequest = 3; EvaluatePythonExpressionRequest evaluatePythonExpressionRequest = 4; - ModifyLogicRequest modifyLogicRequest = 5; - RetryWorkflowRequest retryWorkflowRequest = 6; - ConsoleMessageTriggeredRequest consoleMessageTriggeredRequest = 8; - PortCompletedRequest portCompletedRequest = 9; - WorkerStateUpdatedRequest workerStateUpdatedRequest = 10; - LinkWorkersRequest linkWorkersRequest = 11; + RetryWorkflowRequest retryWorkflowRequest = 5; + ConsoleMessageTriggeredRequest consoleMessageTriggeredRequest = 6; + PortCompletedRequest portCompletedRequest = 7; + WorkerStateUpdatedRequest workerStateUpdatedRequest = 8; + LinkWorkersRequest linkWorkersRequest = 9; + WorkflowReconfigureRequest workflowReconfigureRequest = 10; // request for worker AddInputChannelRequest addInputChannelRequest = 50; @@ -119,7 +118,7 @@ message TakeGlobalCheckpointRequest { } message WorkflowReconfigureRequest{ - ModifyLogicRequest reconfiguration = 1 [(scalapb.field).no_box = true]; + repeated UpdateExecutorRequest reconfiguration = 1; string reconfigurationId = 2; } @@ -134,10 +133,6 @@ message EvaluatePythonExpressionRequest { string operatorId = 2; } -message ModifyLogicRequest { - repeated UpdateExecutorRequest updateRequest = 1; -} - message RetryWorkflowRequest { repeated core.ActorVirtualIdentity workers = 1; } @@ -260,8 +255,7 @@ message InitializeExecutorRequest { message UpdateExecutorRequest { core.PhysicalOpIdentity targetOpId = 1 [(scalapb.field).no_box = true]; - google.protobuf.Any newExecutor = 2 [(scalapb.field).no_box = true]; - google.protobuf.Any stateTransferFunc = 3; + core.OpExecInitInfo newExecInitInfo = 2; } message PrepareCheckpointRequest{ diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto index 70d189a3411..27b4727ee98 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto @@ -45,4 +45,5 @@ service ControllerService { rpc LinkWorkers(LinkWorkersRequest) returns (EmptyReturn); rpc ControllerInitiateQueryStatistics(QueryStatisticsRequest) returns (EmptyReturn); rpc RetryWorkflow(RetryWorkflowRequest) returns (EmptyReturn); + rpc ReconfigureWorkflow(WorkflowReconfigureRequest) returns (EmptyReturn); } \ No newline at end of file diff --git a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto index dbcd6d8a5e0..21944ffefc6 100644 --- a/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto +++ b/amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto @@ -50,4 +50,5 @@ service WorkerService { rpc DebugCommand(DebugCommandRequest) returns (EmptyReturn); rpc EvaluatePythonExpression(EvaluatePythonExpressionRequest) returns (EvaluatedValue); rpc NoOperation(EmptyRequest) returns (EmptyReturn); + rpc UpdateExecutor(UpdateExecutorRequest) returns (EmptyReturn); } \ No newline at end of file diff --git a/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py b/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py index fe33b2dec0d..f2b6d16d46a 100644 --- a/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py +++ b/amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py @@ -15,14 +15,19 @@ # specific language governing permissions and limitations # under the License. -# from proto.org.apache.texera.amber.engine.architecture.worker import UpdateExecutorV2 -# from core.architecture.handlers.control.control_handler_base import ControlHandler -# from core.architecture.managers.context import Context -# -# -# class UpdateExecutorHandler(ControlHandler): -# cmd = UpdateExecutorV2 -# -# def __call__(self, context: Context, command: cmd, *args, **kwargs): -# context.executor_manager.update_executor(command.code, command.is_source) -# return None +from core.architecture.handlers.control.control_handler_base import ControlHandler +from proto.org.apache.texera.amber.engine.architecture.rpc import ( + EmptyReturn, + UpdateExecutorRequest, +) +from core.util import get_one_of +from proto.org.apache.texera.amber.core import OpExecWithCode + + +class UpdateExecutorHandler(ControlHandler): + async def update_executor(self, req: UpdateExecutorRequest) -> EmptyReturn: + op_exec_with_code: OpExecWithCode = get_one_of(req.new_exec_init_info) + self.context.executor_manager.update_executor( + op_exec_with_code.code, self.context.executor_manager.executor.is_source + ) + return EmptyReturn() diff --git a/amber/src/main/python/core/architecture/rpc/async_rpc_handler_initializer.py b/amber/src/main/python/core/architecture/rpc/async_rpc_handler_initializer.py index c2574028a13..146cf91b0d3 100644 --- a/amber/src/main/python/core/architecture/rpc/async_rpc_handler_initializer.py +++ b/amber/src/main/python/core/architecture/rpc/async_rpc_handler_initializer.py @@ -45,6 +45,9 @@ from core.architecture.handlers.control.resume_worker_handler import ResumeWorkerHandler from core.architecture.handlers.control.start_channel_handler import StartChannelHandler from core.architecture.handlers.control.start_worker_handler import StartWorkerHandler +from core.architecture.handlers.control.update_executor_handler import ( + UpdateExecutorHandler, +) class AsyncRPCHandlerInitializer( @@ -64,5 +67,6 @@ class AsyncRPCHandlerInitializer( StartChannelHandler, EndChannelHandler, NoOperationHandler, + UpdateExecutorHandler, ): pass diff --git a/amber/src/main/python/core/runnables/main_loop.py b/amber/src/main/python/core/runnables/main_loop.py index d73c655734f..794224c97f3 100644 --- a/amber/src/main/python/core/runnables/main_loop.py +++ b/amber/src/main/python/core/runnables/main_loop.py @@ -118,7 +118,13 @@ def _check_and_process_control(self) -> None: or not self._input_queue.is_data_enabled() ): next_entry = self.interruptible_get() - self._process_dcm(next_entry) + match( + next_entry, + DCMElement, + self._process_dcm, + ECMElement, + self._process_ecm, + ) @overrides def pre_start(self) -> None: diff --git a/amber/src/main/python/core/runnables/network_receiver.py b/amber/src/main/python/core/runnables/network_receiver.py index fd42a8f589b..d5a765c5739 100644 --- a/amber/src/main/python/core/runnables/network_receiver.py +++ b/amber/src/main/python/core/runnables/network_receiver.py @@ -90,7 +90,8 @@ def data_handler(command: bytes, table: Table) -> int: # Explicitly set is_control to trigger lazy computation. # If not set, it may be computed at different times, # causing hash inconsistencies. - data_header.tag.is_control = False + if not data_header.tag.is_control: + data_header.tag.is_control = False payload = match( data_header.payload_type, "Data", diff --git a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py index ea6ddc5e43f..b341f700814 100644 --- a/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py +++ b/amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/__init__.py @@ -13,7 +13,6 @@ ) import betterproto -import betterproto.lib.google.protobuf as betterproto_lib_google_protobuf import grpclib from betterproto.grpc.grpclib_server import ServiceBase @@ -78,23 +77,23 @@ class ControlRequest(betterproto.Message): evaluate_python_expression_request: "EvaluatePythonExpressionRequest" = ( betterproto.message_field(4, group="sealed_value") ) - modify_logic_request: "ModifyLogicRequest" = betterproto.message_field( - 5, group="sealed_value" - ) retry_workflow_request: "RetryWorkflowRequest" = betterproto.message_field( - 6, group="sealed_value" + 5, group="sealed_value" ) console_message_triggered_request: "ConsoleMessageTriggeredRequest" = ( - betterproto.message_field(8, group="sealed_value") + betterproto.message_field(6, group="sealed_value") ) port_completed_request: "PortCompletedRequest" = betterproto.message_field( - 9, group="sealed_value" + 7, group="sealed_value" ) worker_state_updated_request: "WorkerStateUpdatedRequest" = ( - betterproto.message_field(10, group="sealed_value") + betterproto.message_field(8, group="sealed_value") ) link_workers_request: "LinkWorkersRequest" = betterproto.message_field( - 11, group="sealed_value" + 9, group="sealed_value" + ) + workflow_reconfigure_request: "WorkflowReconfigureRequest" = ( + betterproto.message_field(10, group="sealed_value") ) add_input_channel_request: "AddInputChannelRequest" = betterproto.message_field( 50, group="sealed_value" @@ -192,7 +191,7 @@ class TakeGlobalCheckpointRequest(betterproto.Message): @dataclass(eq=False, repr=False) class WorkflowReconfigureRequest(betterproto.Message): - reconfiguration: "ModifyLogicRequest" = betterproto.message_field(1) + reconfiguration: List["UpdateExecutorRequest"] = betterproto.message_field(1) reconfiguration_id: str = betterproto.string_field(2) @@ -208,11 +207,6 @@ class EvaluatePythonExpressionRequest(betterproto.Message): operator_id: str = betterproto.string_field(2) -@dataclass(eq=False, repr=False) -class ModifyLogicRequest(betterproto.Message): - update_request: List["UpdateExecutorRequest"] = betterproto.message_field(1) - - @dataclass(eq=False, repr=False) class RetryWorkflowRequest(betterproto.Message): workers: List["___core__.ActorVirtualIdentity"] = betterproto.message_field(1) @@ -366,10 +360,7 @@ class InitializeExecutorRequest(betterproto.Message): @dataclass(eq=False, repr=False) class UpdateExecutorRequest(betterproto.Message): target_op_id: "___core__.PhysicalOpIdentity" = betterproto.message_field(1) - new_executor: "betterproto_lib_google_protobuf.Any" = betterproto.message_field(2) - state_transfer_func: "betterproto_lib_google_protobuf.Any" = ( - betterproto.message_field(3) - ) + new_exec_init_info: "___core__.OpExecInitInfo" = betterproto.message_field(2) @dataclass(eq=False, repr=False) @@ -1030,6 +1021,23 @@ async def no_operation( metadata=metadata, ) + async def update_executor( + self, + update_executor_request: "UpdateExecutorRequest", + *, + timeout: Optional[float] = None, + deadline: Optional["Deadline"] = None, + metadata: Optional["MetadataLike"] = None + ) -> "EmptyReturn": + return await self._unary_unary( + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/UpdateExecutor", + update_executor_request, + EmptyReturn, + timeout=timeout, + deadline=deadline, + metadata=metadata, + ) + class ControllerServiceStub(betterproto.ServiceStub): async def retrieve_workflow_state( @@ -1287,6 +1295,23 @@ async def retry_workflow( metadata=metadata, ) + async def reconfigure_workflow( + self, + workflow_reconfigure_request: "WorkflowReconfigureRequest", + *, + timeout: Optional[float] = None, + deadline: Optional["Deadline"] = None, + metadata: Optional["MetadataLike"] = None + ) -> "EmptyReturn": + return await self._unary_unary( + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ReconfigureWorkflow", + workflow_reconfigure_request, + EmptyReturn, + timeout=timeout, + deadline=deadline, + metadata=metadata, + ) + class RpcTesterBase(ServiceBase): @@ -1547,6 +1572,11 @@ async def evaluate_python_expression( async def no_operation(self, empty_request: "EmptyRequest") -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + async def update_executor( + self, update_executor_request: "UpdateExecutorRequest" + ) -> "EmptyReturn": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + async def __rpc_add_input_channel( self, stream: "grpclib.server.Stream[AddInputChannelRequest, EmptyReturn]" ) -> None: @@ -1689,6 +1719,13 @@ async def __rpc_no_operation( response = await self.no_operation(request) await stream.send_message(response) + async def __rpc_update_executor( + self, stream: "grpclib.server.Stream[UpdateExecutorRequest, EmptyReturn]" + ) -> None: + request = await stream.recv_message() + response = await self.update_executor(request) + await stream.send_message(response) + def __mapping__(self) -> Dict[str, grpclib.const.Handler]: return { "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/AddInputChannel": grpclib.const.Handler( @@ -1811,6 +1848,12 @@ def __mapping__(self) -> Dict[str, grpclib.const.Handler]: EmptyRequest, EmptyReturn, ), + "/org.apache.texera.amber.engine.architecture.rpc.WorkerService/UpdateExecutor": grpclib.const.Handler( + self.__rpc_update_executor, + grpclib.const.Cardinality.UNARY_UNARY, + UpdateExecutorRequest, + EmptyReturn, + ), } @@ -1888,6 +1931,11 @@ async def retry_workflow( ) -> "EmptyReturn": raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + async def reconfigure_workflow( + self, workflow_reconfigure_request: "WorkflowReconfigureRequest" + ) -> "EmptyReturn": + raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED) + async def __rpc_retrieve_workflow_state( self, stream: "grpclib.server.Stream[EmptyRequest, RetrieveWorkflowStateResponse]", @@ -1998,6 +2046,13 @@ async def __rpc_retry_workflow( response = await self.retry_workflow(request) await stream.send_message(response) + async def __rpc_reconfigure_workflow( + self, stream: "grpclib.server.Stream[WorkflowReconfigureRequest, EmptyReturn]" + ) -> None: + request = await stream.recv_message() + response = await self.reconfigure_workflow(request) + await stream.send_message(response) + def __mapping__(self) -> Dict[str, grpclib.const.Handler]: return { "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/RetrieveWorkflowState": grpclib.const.Handler( @@ -2090,4 +2145,10 @@ def __mapping__(self) -> Dict[str, grpclib.const.Handler]: RetryWorkflowRequest, EmptyReturn, ), + "/org.apache.texera.amber.engine.architecture.rpc.ControllerService/ReconfigureWorkflow": grpclib.const.Handler( + self.__rpc_reconfigure_workflow, + grpclib.const.Cardinality.UNARY_UNARY, + WorkflowReconfigureRequest, + EmptyReturn, + ), } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala index 4d9a36bab43..2902173364e 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala @@ -46,6 +46,7 @@ class ControllerAsyncRPCHandlerInitializer( with DebugCommandHandler with TakeGlobalCheckpointHandler with EmbeddedControlMessageHandler - with RetrieveWorkflowStateHandler { + with RetrieveWorkflowStateHandler + with ReconfigurationHandler { val actorId: ActorVirtualIdentity = cp.actorId } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala new file mode 100644 index 00000000000..210d7c5b98c --- /dev/null +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.engine.architecture.controller.promisehandlers + +import com.twitter.util.Future +import org.apache.texera.amber.core.virtualidentity.{ + ChannelIdentity, + EmbeddedControlMessageIdentity +} +import org.apache.texera.amber.engine.architecture.controller.ControllerAsyncRPCHandlerInitializer +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmbeddedControlMessageType.ALL_ALIGNMENT +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ + AsyncRPCContext, + WorkflowReconfigureRequest +} +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn +import org.apache.texera.amber.engine.common.FriesReconfigurationAlgorithm +import org.apache.texera.amber.engine.common.virtualidentity.util.CONTROLLER +import org.apache.texera.amber.util.VirtualIdentityUtils +import org.apache.texera.amber.engine.architecture.rpc.workerservice.WorkerServiceGrpc.METHOD_UPDATE_EXECUTOR + +import scala.collection.mutable + +trait ReconfigurationHandler { + this: ControllerAsyncRPCHandlerInitializer => + + override def reconfigureWorkflow( + msg: WorkflowReconfigureRequest, + ctx: AsyncRPCContext + ): Future[EmptyReturn] = { + if ( + msg.reconfiguration.exists(req => + cp.workflowScheduler.physicalPlan.getOperator(req.targetOpId).isSourceOperator + ) + ) { + throw new IllegalStateException( + "Reconfiguration cannot be applied to source operators" + ) + } + val futures = mutable.ArrayBuffer[Future[_]]() + val friesComponents = + FriesReconfigurationAlgorithm.getReconfigurations(cp.workflowExecutionCoordinator, msg) + friesComponents.foreach { friesComponent => + if (friesComponent.scope.size == 1) { + val updateExecutorRequest = friesComponent.reconfigurations.head + val workerIds = cp.workflowExecution + .getLatestOperatorExecution(updateExecutorRequest.targetOpId) + .getWorkerIds + workerIds.foreach { worker => + futures.append(workerInterface.updateExecutor(updateExecutorRequest, mkContext(worker))) + } + } else { + val channelScope = cp.workflowExecution.getRunningRegionExecutions + .flatMap(regionExecution => + regionExecution.getAllLinkExecutions + .map(_._2) + .flatMap(linkExecution => linkExecution.getAllChannelExecutions.map(_._1)) + ) + .filter(channelId => { + friesComponent.scope + .contains(VirtualIdentityUtils.getPhysicalOpId(channelId.fromWorkerId)) && + friesComponent.scope + .contains(VirtualIdentityUtils.getPhysicalOpId(channelId.toWorkerId)) + }) + val controlChannels = friesComponent.sources.flatMap { source => + cp.workflowExecution.getLatestOperatorExecution(source).getWorkerIds.flatMap { worker => + Seq( + ChannelIdentity(CONTROLLER, worker, isControl = true), + ChannelIdentity(worker, CONTROLLER, isControl = true) + ) + } + } + val finalScope = channelScope ++ controlChannels + val cmdMapping = + friesComponent.reconfigurations.flatMap { updateReq => + val workers = + cp.workflowExecution.getLatestOperatorExecution(updateReq.targetOpId).getWorkerIds + workers.map(worker => + worker.name -> createInvocation( + METHOD_UPDATE_EXECUTOR.getBareMethodName, + updateReq, + worker + ) + ) + }.toMap + futures += cmdMapping.map { + case (_, (_, singleWorkerUpdateFuture)) => singleWorkerUpdateFuture + } + friesComponent.sources.foreach { source => + cp.workflowExecution.getLatestOperatorExecution(source).getWorkerIds.foreach { worker => + sendECM( + EmbeddedControlMessageIdentity(msg.reconfigurationId), + ALL_ALIGNMENT, + finalScope.toSet, + cmdMapping.map(x => (x._1, x._2._1)), + ChannelIdentity(actorId, worker, isControl = true) + ) + } + } + } + } + Future.collect(futures.toList).map { _ => + EmptyReturn() + } + } + +} diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala index 2abcdf66975..6b0c62ac3f2 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala @@ -20,6 +20,13 @@ package org.apache.texera.amber.engine.architecture.worker import com.twitter.util.Future +import org.apache.texera.amber.core.executor.{ + ExecFactory, + OpExecInitInfo, + OpExecSource, + OpExecWithClassName, + OpExecWithCode +} import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ AsyncRPCContext, @@ -32,6 +39,9 @@ import org.apache.texera.amber.engine.architecture.rpc.workerservice.WorkerServi import org.apache.texera.amber.engine.architecture.worker.promisehandlers._ import org.apache.texera.amber.engine.common.AmberLogging import org.apache.texera.amber.engine.common.rpc.AsyncRPCHandlerInitializer +import org.apache.texera.amber.operator.source.cache.CacheSourceOpExec + +import java.net.URI class DataProcessorRPCHandlerInitializer(val dp: DataProcessor) extends AsyncRPCHandlerInitializer(dp.asyncRPCClient, dp.asyncRPCServer) @@ -52,9 +62,12 @@ class DataProcessorRPCHandlerInitializer(val dp: DataProcessor) with FlushNetworkBufferHandler with RetrieveStateHandler with PrepareCheckpointHandler - with FinalizeCheckpointHandler { + with FinalizeCheckpointHandler + with UpdateExecutorHandler { val actorId: ActorVirtualIdentity = dp.actorId + var cachedTotalWorkerCount = 0 + override def debugCommand( request: DebugCommandRequest, ctx: AsyncRPCContext @@ -69,4 +82,17 @@ class DataProcessorRPCHandlerInitializer(val dp: DataProcessor) ??? override def noOperation(request: EmptyRequest, ctx: AsyncRPCContext): Future[EmptyReturn] = ??? + + def setupExecutor(execInitInfo: OpExecInitInfo, workerIdx: Int, workerCount: Int): Unit = { + dp.executor = execInitInfo match { + case OpExecWithClassName(className, descString) => + ExecFactory.newExecFromJavaClassName(className, descString, workerIdx, workerCount) + case OpExecWithCode(code, _) => + ExecFactory.newExecFromJavaCode(code) + case OpExecSource(storageUri, _) => + new CacheSourceOpExec(URI.create(storageUri)) + case OpExecInitInfo.Empty => + throw new IllegalArgumentException("Empty executor initialization info") + } + } } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala index bf45d8eff9a..212a980e5ed 100644 --- a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala @@ -20,18 +20,14 @@ package org.apache.texera.amber.engine.architecture.worker.promisehandlers import com.twitter.util.Future -import org.apache.texera.amber.core.executor._ import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ AsyncRPCContext, InitializeExecutorRequest } import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn import org.apache.texera.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer -import org.apache.texera.amber.operator.source.cache.CacheSourceOpExec import org.apache.texera.amber.util.VirtualIdentityUtils -import java.net.URI - trait InitializeExecutorHandler { this: DataProcessorRPCHandlerInitializer => @@ -41,18 +37,8 @@ trait InitializeExecutorHandler { ): Future[EmptyReturn] = { dp.serializationManager.setOpInitialization(req) val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId) - val workerCount = req.totalWorkerCount - dp.executor = req.opExecInitInfo match { - case OpExecWithClassName(className, descString) => - ExecFactory.newExecFromJavaClassName(className, descString, workerIdx, workerCount) - case OpExecWithCode(code, _) => - ExecFactory.newExecFromJavaCode(code) - case OpExecSource(storageUri, _) => - new CacheSourceOpExec(URI.create(storageUri)) - case OpExecInitInfo.Empty => - throw new IllegalArgumentException("Empty executor initialization info") - } + cachedTotalWorkerCount = req.totalWorkerCount + setupExecutor(req.opExecInitInfo, workerIdx, cachedTotalWorkerCount) EmptyReturn() } - } diff --git a/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala new file mode 100644 index 00000000000..8ed9ebdc595 --- /dev/null +++ b/amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.engine.architecture.worker.promisehandlers + +import com.twitter.util.Future +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ + AsyncRPCContext, + UpdateExecutorRequest +} +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.EmptyReturn +import org.apache.texera.amber.engine.architecture.worker.DataProcessorRPCHandlerInitializer +import org.apache.texera.amber.util.VirtualIdentityUtils + +trait UpdateExecutorHandler { + this: DataProcessorRPCHandlerInitializer => + + override def updateExecutor( + request: UpdateExecutorRequest, + ctx: AsyncRPCContext + ): Future[EmptyReturn] = { + val workerIdx = VirtualIdentityUtils.getWorkerIndex(actorId) + // Close the existing executor (if any) before replacing it to avoid resource leaks. + val oldExecutor = dp.executor + if (oldExecutor != null) { + oldExecutor.close() + } + setupExecutor(request.newExecInitInfo, workerIdx, cachedTotalWorkerCount) + dp.executor.open() + EmptyReturn() + } + +} diff --git a/amber/src/main/scala/org/apache/texera/web/service/FriesReconfigurationAlgorithm.scala b/amber/src/main/scala/org/apache/texera/amber/engine/common/FriesReconfigurationAlgorithm.scala similarity index 72% rename from amber/src/main/scala/org/apache/texera/web/service/FriesReconfigurationAlgorithm.scala rename to amber/src/main/scala/org/apache/texera/amber/engine/common/FriesReconfigurationAlgorithm.scala index c2a15106b19..c13e7801190 100644 --- a/amber/src/main/scala/org/apache/texera/web/service/FriesReconfigurationAlgorithm.scala +++ b/amber/src/main/scala/org/apache/texera/amber/engine/common/FriesReconfigurationAlgorithm.scala @@ -17,13 +17,13 @@ * under the License. */ -package org.apache.texera.web.service +package org.apache.texera.amber.engine.common import org.apache.texera.amber.core.virtualidentity.PhysicalOpIdentity import org.apache.texera.amber.core.workflow.PhysicalPlan import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ - ModifyLogicRequest, - PropagateEmbeddedControlMessageRequest + UpdateExecutorRequest, + WorkflowReconfigureRequest } import org.apache.texera.amber.engine.architecture.scheduling.{Region, WorkflowExecutionCoordinator} import org.jgrapht.alg.connectivity.ConnectivityInspector @@ -34,28 +34,33 @@ import scala.jdk.CollectionConverters.SetHasAsScala object FriesReconfigurationAlgorithm { + case class FriesComponent( + sources: Set[PhysicalOpIdentity], + scope: Set[PhysicalOpIdentity], + reconfigurations: Set[UpdateExecutorRequest] + ) + private def getOneToManyOperators(region: Region): Set[PhysicalOpIdentity] = { region.getOperators.filter(op => op.isOneToManyOp).map(op => op.id) } - def scheduleReconfigurations( + def getReconfigurations( workflowExecutionCoordinator: WorkflowExecutionCoordinator, - reconfiguration: ModifyLogicRequest, - epochMarkerId: String - ): Set[PropagateEmbeddedControlMessageRequest] = { + reconfiguration: WorkflowReconfigureRequest + ): Set[FriesComponent] = { // independently schedule reconfigurations for each region: workflowExecutionCoordinator.getExecutingRegions - .flatMap(region => computeMCS(region, reconfiguration, epochMarkerId)) + .flatMap(region => computeMCS(region, reconfiguration, reconfiguration.reconfigurationId)) } private def computeMCS( region: Region, - reconfiguration: ModifyLogicRequest, + reconfiguration: WorkflowReconfigureRequest, epochMarkerId: String - ): List[PropagateEmbeddedControlMessageRequest] = { + ): List[FriesComponent] = { // add all reconfiguration operators to M - val reconfigOps = reconfiguration.updateRequest.map(req => req.targetOpId).toSet + val reconfigOps = reconfiguration.reconfiguration.map(req => req.targetOpId).toSet val M = mutable.Set.empty ++ reconfigOps // for each one-to-many operator, add it to M if its downstream has a reconfiguration operator @@ -101,30 +106,20 @@ object FriesReconfigurationAlgorithm { // find the MCS components, // for each component, send an epoch marker to each of its source operators - val epochMarkers = new ArrayBuffer[PropagateEmbeddedControlMessageRequest]() + val epochMarkers = new ArrayBuffer[FriesComponent]() val connectedSets = new ConnectivityInspector(mcsPlan.dag).connectedSets() connectedSets.forEach(component => { val componentSet = component.asScala.toSet val componentPlan = mcsPlan.getSubPlan(componentSet) - - // generate the reconfiguration command for this component - // val reconfigCommands = - // reconfiguration.updateRequest - // .filter(req => component.contains(req.targetOpId)) - // val reconfigTargets = reconfigCommands.map(_.targetOpId) - // - // // find the source operators of the component - // val sources = componentSet.intersect(mcsPlan.getSourceOperatorIds) - // epochMarkers += PropagateEmbeddedControlMessageRequest( - // sources.toSeq, - // EmbeddedControlMessageIdentity(epochMarkerId), - // ALL_ALIGNMENT, - // componentPlan.operators.map(_.id).toSeq, - // reconfigTargets, - // ModifyLogicRequest(reconfigCommands), - // METHOD_MODIFY_LOGIC.getBareMethodName - // ) + val reconfigCommands = + reconfiguration.reconfiguration + .filter(req => component.contains(req.targetOpId)) + .toSet + + // find the source operators of the component + val sources = componentSet.intersect(mcsPlan.getSourceOperatorIds) + epochMarkers += FriesComponent(sources, componentPlan.operators.map(_.id), reconfigCommands) }) epochMarkers.toList } diff --git a/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala new file mode 100644 index 00000000000..718478d0e72 --- /dev/null +++ b/amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.engine.e2e + +import com.twitter.util.{Await, Duration, Promise} +import com.typesafe.scalalogging.Logger +import org.apache.pekko.actor.{ActorSystem, Props} +import org.apache.pekko.testkit.{ImplicitSender, TestKit} +import org.apache.pekko.util.Timeout +import org.apache.texera.amber.clustering.SingleNodeListener +import org.apache.texera.amber.core.executor.{OpExecInitInfo, OpExecWithCode} +import org.apache.texera.amber.core.storage.DocumentFactory +import org.apache.texera.amber.core.storage.model.VirtualDocument +import org.apache.texera.amber.core.tuple.Tuple +import org.apache.texera.amber.core.virtualidentity.OperatorIdentity +import org.apache.texera.amber.core.workflow.{PortIdentity, WorkflowContext} +import org.apache.texera.amber.engine.architecture.controller.{ + ControllerConfig, + ExecutionStateUpdate +} +import org.apache.texera.amber.engine.architecture.rpc.controlcommands.{ + EmptyRequest, + UpdateExecutorRequest, + WorkflowReconfigureRequest +} +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED +import org.apache.texera.amber.engine.common.AmberRuntime +import org.apache.texera.amber.engine.common.client.AmberClient +import org.apache.texera.amber.engine.e2e.TestUtils.{ + cleanupWorkflowExecutionData, + initiateTexeraDBForTestCases, + setUpWorkflowExecutionData +} +import org.apache.texera.amber.operator.{LogicalOp, TestOperators} +import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource.getResultUriByLogicalPortId +import org.apache.texera.workflow.LogicalLink +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Outcome, Retries} +import org.scalatest.flatspec.AnyFlatSpecLike + +import scala.concurrent.duration._ + +class ModifyLogicSpec + extends TestKit(ActorSystem("ModifyLogicSpec", AmberRuntime.akkaConfig)) + with ImplicitSender + with AnyFlatSpecLike + with BeforeAndAfterAll + with BeforeAndAfterEach + with Retries { + + /** + * This block retries each test once if it fails. + * In the CI environment, there is a chance that executeWorkflow does not receive "COMPLETED" status. + * Until we find the root cause of this issue, we use a retry mechanism here to stabilize CI runs. + */ + override def withFixture(test: NoArgTest): Outcome = + withRetry { super.withFixture(test) } + + implicit val timeout: Timeout = Timeout(5.seconds) + + val logger = Logger("ModifyLogicSpecLogger") + val ctx = new WorkflowContext() + + override protected def beforeEach(): Unit = { + setUpWorkflowExecutionData() + } + + override protected def afterEach(): Unit = { + cleanupWorkflowExecutionData() + } + + override def beforeAll(): Unit = { + system.actorOf(Props[SingleNodeListener](), "cluster-info") + // These test cases access postgres in CI, but occasionally the jdbc driver cannot be found during CI run. + // Explicitly load the JDBC driver to avoid flaky CI failures. + Class.forName("org.postgresql.Driver") + initiateTexeraDBForTestCases() + } + + override def afterAll(): Unit = { + TestKit.shutdownActorSystem(system) + } + + def shouldReconfigure( + operators: List[LogicalOp], + links: List[LogicalLink], + targetOps: Seq[LogicalOp], + newOpExecInitInfo: OpExecInitInfo + ): Map[OperatorIdentity, List[Tuple]] = { + val workflow = + TestUtils.buildWorkflow(operators, links, ctx) + val client = + new AmberClient( + system, + workflow.context, + workflow.physicalPlan, + ControllerConfig.default, + error => {} + ) + val completion = Promise[Unit]() + var result: Map[OperatorIdentity, List[Tuple]] = null + client + .registerCallback[ExecutionStateUpdate](evt => { + if (evt.state == COMPLETED) { + result = workflow.logicalPlan.getTerminalOperatorIds + .filter(terminalOpId => { + val uri = getResultUriByLogicalPortId( + workflow.context.executionId, + terminalOpId, + PortIdentity() + ) + uri.nonEmpty + }) + .map(terminalOpId => { + //TODO: remove the delay after fixing the issue of reporting "completed" status too early. + Thread.sleep(1000) + val uri = getResultUriByLogicalPortId( + workflow.context.executionId, + terminalOpId, + PortIdentity() + ).get + terminalOpId -> DocumentFactory + .openDocument(uri) + ._1 + .asInstanceOf[VirtualDocument[Tuple]] + .get() + .toList + }) + .toMap + completion.setDone() + } + }) + Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ())) + Await.result(client.controllerInterface.pauseWorkflow(EmptyRequest(), ())) + Thread.sleep(4000) + val physicalOps = targetOps.flatMap(op => + workflow.physicalPlan.getPhysicalOpsOfLogicalOp(op.operatorIdentifier) + ) + Await.result( + client.controllerInterface.reconfigureWorkflow( + WorkflowReconfigureRequest( + reconfiguration = physicalOps.map(op => UpdateExecutorRequest(op.id, newOpExecInitInfo)), + reconfigurationId = "test-reconfigure-1" + ), + () + ) + ) + Await.result(client.controllerInterface.resumeWorkflow(EmptyRequest(), ())) + Thread.sleep(400) + Await.result(completion, Duration.fromMinutes(1)) + result + } + + "Engine" should "be able to modify a python UDF worker in workflow" in { + val sourceOpDesc = TestOperators.smallCsvScanOpDesc() + val udfOpDesc = TestOperators.pythonOpDesc() + val code = """ + |from pytexera import * + | + |class ProcessTupleOperator(UDFOperatorV2): + | @overrides + | def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: + | tuple_['Region'] = tuple_['Region'] + '_reconfigured' + | yield tuple_ + |""".stripMargin + + val result = shouldReconfigure( + List(sourceOpDesc, udfOpDesc), + List( + LogicalLink( + sourceOpDesc.operatorIdentifier, + PortIdentity(), + udfOpDesc.operatorIdentifier, + PortIdentity() + ) + ), + Seq(udfOpDesc), + OpExecWithCode(code, "python") + ) + assert(result(udfOpDesc.operatorIdentifier).exists { t => + t.getField("Region").asInstanceOf[String].contains("_reconfigured") + }) + } + + "Engine" should "be able to modify a java operator in workflow" in { + val sourceOpDesc = TestOperators.mediumCsvScanOpDesc() + val keywordMatchNoneOpDesc = TestOperators.keywordSearchOpDesc("Region", "ShouldMatchNone") + val keywordMatchManyOpDesc = TestOperators.keywordSearchOpDesc("Region", "Asia") + val result = shouldReconfigure( + List(sourceOpDesc, keywordMatchNoneOpDesc), + List( + LogicalLink( + sourceOpDesc.operatorIdentifier, + PortIdentity(), + keywordMatchNoneOpDesc.operatorIdentifier, + PortIdentity() + ) + ), + Seq(keywordMatchNoneOpDesc), + keywordMatchManyOpDesc.getPhysicalOp(ctx.workflowId, ctx.executionId).opExecInitInfo + ) + assert(result(keywordMatchNoneOpDesc.operatorIdentifier).nonEmpty) + } + + "Engine" should "not be able to modify a source operator in workflow" in { + val sourceOpDesc = TestOperators.mediumCsvScanOpDesc() + val sourceOpDesc2 = TestOperators.mediumCsvScanOpDesc() + val keywordMatchNoneOpDesc = TestOperators.keywordSearchOpDesc("Region", "ShouldMatchNone") + val ex = intercept[Throwable] { + shouldReconfigure( + List(sourceOpDesc, keywordMatchNoneOpDesc), + List( + LogicalLink( + sourceOpDesc.operatorIdentifier, + PortIdentity(), + keywordMatchNoneOpDesc.operatorIdentifier, + PortIdentity() + ) + ), + Seq(sourceOpDesc), + sourceOpDesc2.getPhysicalOp(ctx.workflowId, ctx.executionId).opExecInitInfo + ) + } + assert( + ex.getMessage == "java.lang.IllegalStateException: Reconfiguration cannot be applied to source operators" + ) + } + + "Engine" should "propagate reconfiguration through a source operator in workflow" in { + val sourceOpDesc = TestOperators.pythonSourceOpDesc(10000) + val udfOpDesc = TestOperators.pythonOpDesc() + val code = """ + |from pytexera import * + | + |class ProcessTupleOperator(UDFOperatorV2): + | @overrides + | def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: + | tuple_['field_1'] = tuple_['field_1'] + '_reconfigured' + | yield tuple_ + |""".stripMargin + val result = shouldReconfigure( + List(sourceOpDesc, udfOpDesc), + List( + LogicalLink( + sourceOpDesc.operatorIdentifier, + PortIdentity(), + udfOpDesc.operatorIdentifier, + PortIdentity() + ) + ), + Seq(udfOpDesc), + OpExecWithCode(code, "python") + ) + assert(result(udfOpDesc.operatorIdentifier).exists { t => + t.getField("field_1").asInstanceOf[String].contains("_reconfigured") + }) + } + + "Engine" should "be able to modify two python UDFs in workflow" in { + val sourceOpDesc = TestOperators.smallCsvScanOpDesc() + val udfOpDesc1 = TestOperators.pythonOpDesc() + val udfOpDesc2 = TestOperators.pythonOpDesc() + val code = """ + |from pytexera import * + | + |class ProcessTupleOperator(UDFOperatorV2): + | @overrides + | def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: + | tuple_['Region'] = tuple_['Region'] + '_reconfigured' + | yield tuple_ + |""".stripMargin + + val result = shouldReconfigure( + List(sourceOpDesc, udfOpDesc1, udfOpDesc2), + List( + LogicalLink( + sourceOpDesc.operatorIdentifier, + PortIdentity(), + udfOpDesc1.operatorIdentifier, + PortIdentity() + ), + LogicalLink( + udfOpDesc1.operatorIdentifier, + PortIdentity(), + udfOpDesc2.operatorIdentifier, + PortIdentity() + ) + ), + Seq(udfOpDesc1, udfOpDesc2), + OpExecWithCode(code, "python") + ) + assert(result(udfOpDesc2.operatorIdentifier).exists { t => + t.getField("Region").asInstanceOf[String].contains("_reconfigured_reconfigured") + }) + } + +} diff --git a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala index ab7e8dc2de5..268b06ff8be 100644 --- a/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala +++ b/common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala @@ -20,6 +20,7 @@ package org.apache.texera.amber.operator import org.apache.texera.amber.core.storage.FileResolver +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType} import org.apache.texera.amber.operator.aggregate.{ AggregateOpDesc, AggregationFunction, @@ -32,6 +33,7 @@ import org.apache.texera.amber.operator.source.scan.json.JSONLScanSourceOpDesc import org.apache.texera.amber.operator.source.sql.asterixdb.AsterixDBSourceOpDesc import org.apache.texera.amber.operator.source.sql.mysql.MySQLSourceOpDesc import org.apache.texera.amber.operator.udf.python.PythonUDFOpDescV2 +import org.apache.texera.amber.operator.udf.python.source.PythonUDFSourceOpDescV2 import java.nio.file.Path @@ -171,6 +173,7 @@ object TestOperators { def pythonOpDesc(): PythonUDFOpDescV2 = { val udf = new PythonUDFOpDescV2() udf.workers = 1 + udf.retainInputColumns = true udf.code = """ |from pytexera import * | @@ -181,4 +184,21 @@ object TestOperators { |""".stripMargin udf } + + def pythonSourceOpDesc(numTuple: Int): PythonUDFSourceOpDescV2 = { + val udf = new PythonUDFSourceOpDescV2() + udf.workers = 1 + udf.columns = List(new Attribute("field_1", AttributeType.STRING)) + udf.code = s""" + |from pytexera import * + | + |class UDFSourceOperator(UDFSourceOperator): + | @overrides + | def produce(self) -> Iterator[Union[TupleLike, TableLike, None]]: + | for i in range($numTuple): + | yield {'field_1': str(i) } + |""".stripMargin + udf + } + }