Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/github-action-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ jobs:
python-version: '3.11'
- name: Show Python
run: python --version || python3 --version
- name: Install dependencies
run: |
python -m pip install --upgrade pip
if [ -f amber/requirements.txt ]; then pip install -r amber/requirements.txt; fi
if [ -f amber/operator-requirements.txt ]; then pip install -r amber/operator-requirements.txt; fi
- name: Setup sbt launcher
uses: sbt/setup-sbt@3e125ece5c3e5248e18da9ed8d2cce3d335ec8dd # v1.1.14
- uses: coursier/cache-action@4e2615869d13561d626ed48655e1a39e5b192b3c # v6.4.9
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import "org/apache/texera/amber/engine/architecture/worker/statistics.proto";
import "org/apache/texera/amber/engine/architecture/sendsemantics/partitionings.proto";
import "scalapb/scalapb.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/any.proto";

option (scalapb.options) = {
scope: FILE,
Expand All @@ -40,12 +39,12 @@ message ControlRequest {
TakeGlobalCheckpointRequest takeGlobalCheckpointRequest = 2;
DebugCommandRequest debugCommandRequest = 3;
EvaluatePythonExpressionRequest evaluatePythonExpressionRequest = 4;
ModifyLogicRequest modifyLogicRequest = 5;
RetryWorkflowRequest retryWorkflowRequest = 6;
ConsoleMessageTriggeredRequest consoleMessageTriggeredRequest = 8;
PortCompletedRequest portCompletedRequest = 9;
WorkerStateUpdatedRequest workerStateUpdatedRequest = 10;
LinkWorkersRequest linkWorkersRequest = 11;
RetryWorkflowRequest retryWorkflowRequest = 5;
ConsoleMessageTriggeredRequest consoleMessageTriggeredRequest = 6;
PortCompletedRequest portCompletedRequest = 7;
WorkerStateUpdatedRequest workerStateUpdatedRequest = 8;
LinkWorkersRequest linkWorkersRequest = 9;
WorkflowReconfigureRequest workflowReconfigureRequest = 10;

// request for worker
AddInputChannelRequest addInputChannelRequest = 50;
Expand Down Expand Up @@ -119,7 +118,7 @@ message TakeGlobalCheckpointRequest {
}

message WorkflowReconfigureRequest{
ModifyLogicRequest reconfiguration = 1 [(scalapb.field).no_box = true];
repeated UpdateExecutorRequest reconfiguration = 1;
string reconfigurationId = 2;
}

Expand All @@ -134,10 +133,6 @@ message EvaluatePythonExpressionRequest {
string operatorId = 2;
}

message ModifyLogicRequest {
repeated UpdateExecutorRequest updateRequest = 1;
}

message RetryWorkflowRequest {
repeated core.ActorVirtualIdentity workers = 1;
}
Expand Down Expand Up @@ -260,8 +255,7 @@ message InitializeExecutorRequest {

message UpdateExecutorRequest {
core.PhysicalOpIdentity targetOpId = 1 [(scalapb.field).no_box = true];
google.protobuf.Any newExecutor = 2 [(scalapb.field).no_box = true];
google.protobuf.Any stateTransferFunc = 3;
core.OpExecInitInfo newExecInitInfo = 2;
}

message PrepareCheckpointRequest{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,5 @@ service ControllerService {
rpc LinkWorkers(LinkWorkersRequest) returns (EmptyReturn);
rpc ControllerInitiateQueryStatistics(QueryStatisticsRequest) returns (EmptyReturn);
rpc RetryWorkflow(RetryWorkflowRequest) returns (EmptyReturn);
rpc ReconfigureWorkflow(WorkflowReconfigureRequest) returns (EmptyReturn);
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,5 @@ service WorkerService {
rpc DebugCommand(DebugCommandRequest) returns (EmptyReturn);
rpc EvaluatePythonExpression(EvaluatePythonExpressionRequest) returns (EvaluatedValue);
rpc NoOperation(EmptyRequest) returns (EmptyReturn);
rpc UpdateExecutor(UpdateExecutorRequest) returns (EmptyReturn);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@
# specific language governing permissions and limitations
# under the License.

# from proto.org.apache.texera.amber.engine.architecture.worker import UpdateExecutorV2
# from core.architecture.handlers.control.control_handler_base import ControlHandler
# from core.architecture.managers.context import Context
#
#
# class UpdateExecutorHandler(ControlHandler):
# cmd = UpdateExecutorV2
#
# def __call__(self, context: Context, command: cmd, *args, **kwargs):
# context.executor_manager.update_executor(command.code, command.is_source)
# return None
from core.architecture.handlers.control.control_handler_base import ControlHandler
from proto.org.apache.texera.amber.engine.architecture.rpc import (
EmptyReturn,
UpdateExecutorRequest,
)
from core.util import get_one_of
from proto.org.apache.texera.amber.core import OpExecWithCode


class UpdateExecutorHandler(ControlHandler):
async def update_executor(self, req: UpdateExecutorRequest) -> EmptyReturn:
op_exec_with_code: OpExecWithCode = get_one_of(req.new_exec_init_info)
self.context.executor_manager.update_executor(
op_exec_with_code.code, self.context.executor_manager.executor.is_source
)
return EmptyReturn()
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
from core.architecture.handlers.control.resume_worker_handler import ResumeWorkerHandler
from core.architecture.handlers.control.start_channel_handler import StartChannelHandler
from core.architecture.handlers.control.start_worker_handler import StartWorkerHandler
from core.architecture.handlers.control.update_executor_handler import (
UpdateExecutorHandler,
)


class AsyncRPCHandlerInitializer(
Expand All @@ -64,5 +67,6 @@ class AsyncRPCHandlerInitializer(
StartChannelHandler,
EndChannelHandler,
NoOperationHandler,
UpdateExecutorHandler,
):
pass
8 changes: 7 additions & 1 deletion amber/src/main/python/core/runnables/main_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,13 @@ def _check_and_process_control(self) -> None:
or not self._input_queue.is_data_enabled()
):
next_entry = self.interruptible_get()
self._process_dcm(next_entry)
match(
next_entry,
DCMElement,
self._process_dcm,
ECMElement,
self._process_ecm,
)

@overrides
def pre_start(self) -> None:
Expand Down
3 changes: 2 additions & 1 deletion amber/src/main/python/core/runnables/network_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ def data_handler(command: bytes, table: Table) -> int:
# Explicitly set is_control to trigger lazy computation.
# If not set, it may be computed at different times,
# causing hash inconsistencies.
data_header.tag.is_control = False
if not data_header.tag.is_control:
data_header.tag.is_control = False
payload = match(
data_header.payload_type,
"Data",
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class ControllerAsyncRPCHandlerInitializer(
with DebugCommandHandler
with TakeGlobalCheckpointHandler
with EmbeddedControlMessageHandler
with RetrieveWorkflowStateHandler {
with RetrieveWorkflowStateHandler
with ReconfigurationHandler {
val actorId: ActorVirtualIdentity = cp.actorId
}
Loading
Loading