feat(cancel): add Cancel RPC + orchestrator cancel pipeline#157
Conversation
…c log Two CI failures on PR #157: - TestCancelRequest_InvalidSqid expected codes.InvalidArgument but the gateway returned a plain fmt.Errorf that gRPC defaults to codes.Unknown. Added a unary interceptor in example/server/gateway/main.go that translates any error matching controller.IsInvalidRequest into codes.InvalidArgument. Benefits Land too without touching the controllers' return shapes. - TestCancelRequest_BeforeBatch read request_log exactly once after observing state=cancelled, racing the orchestrator's async log publish. Wrapped the assertion in require.Eventually so the check waits for the log consumer to persist the terminal entry. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
72bdf6e to
164917b
Compare
What base controller layer? Is this referring to core/errs? If so, it looks like the only retryable error types are context-cancellation and infra-errors -- I don't see any special handing of these specific storage errors. |
It's referring to the ConsumerController aka BaseController for all queue consumers |
I see -- it looks like there is no base controller implemented. So as things stand today (unless I'm missing something), there is no code that maps |
| // kicked off, and buildsignal's own halted short-circuit takes over. Both | ||
| // truly-terminal and mid-cancellation batches reach the same observable | ||
| // behaviour (no build performed). | ||
| if entity.IsBatchStateHalted(batch.State) { |
There was a problem hiding this comment.
just thinking if this can be wired up into the Consumer Controller in some way,
Basically if each controller provides a way to get the deserialized payload, then consumer can just make a callback using the same entity before calling the process and short-circuit for all the consumer controllers.
| // push-already-committed race is acknowledged elsewhere) and do not fan out | ||
| // (speculate owns the terminal write to Cancelled and the downstream | ||
| // dependent / conclude publishes). | ||
| if batch.State == entity.BatchStateCancelling { |
There was a problem hiding this comment.
should this use the IsHalted utility?
https://github.com/uber/submitqueue/blob/main/core/consumer/controller.go |
Should we make sure these error types are retriable? Otherwise we fall into the DLQ code-path here instead of the retry code-path: https://github.com/uber/submitqueue/blob/main/core/consumer/consumer.go#L374 |
|
|
Adds a gateway `Cancel(sqid)` RPC that publishes to a new TopicKeyCancel and an orchestrator cancel controller that performs the actual state transitions. Cancellation uses a two-step pattern: first the request is marked RequestStateCancelling (non-terminal intent), then either transitioned directly to RequestStateCancelled (un-batched path) or handed off via batch-cancel + conclude fan-out (batched path). The Cancelling intent is non-terminal so a concurrent merge or failure may still win the race and prevail with Landed or Error — conclude reconciles to the actual terminal outcome. Every stage controller (validate, batch, score, build, buildsignal, merge, conclude, speculate) is now cancellation-aware. Speculate treats cancelled deps as out-of-the-way (drop and continue) instead of cascade-failing. Conclude maps BatchStateCancelled → RequestStateCancelled. Cancellation is asynchronous and not guaranteed; gateway.proto comments direct callers to check the actual outcome via the separate status API. Infrastructure changes that fell out of building this: - gRPC error mapping: a unary interceptor in example/server/gateway/main.go translates any error matching controller.IsInvalidRequest into codes.InvalidArgument so invalid-sqid (and Land's invalid inputs) no longer surface as codes.Unknown. - PublishLog message ID scoping: log entries are now keyed by `<requestID>/<status>` so distinct statuses for the same request coexist on the queue's `(topic, partition_key, id)` unique index (start writes `started`, cancel writes `cancelling` then `cancelled`). Same-status retries still dedupe. Regression coverage: unit tests for every new controller and the two-step transition, plus e2e tests for cancel-before-batch (including idempotent re-cancel) and the InvalidArgument mapping. The idempotent re-cancel e2e test depends on the MySQL queue `Insert` becoming idempotent on (topic, partition_key, id) — extracted to a separate PR. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
164917b to
4e01daf
Compare
Summary
Cancel(sqid)RPC that publishes aCancelRequesttoTopicKeyCancel; processing is asynchronous and cancellation is not guaranteed (a request that has already merged, or that races to completion, may still land or error — callers should check the actual outcome via the separate status / request-log API).RequestStateCancelling(non-terminal intent) → eitherRequestStateCancelleddirectly (un-batched) or via batch cancel + conclude fan-out (batched). The Cancelling intent is non-terminal so concurrent merge / failure may still win and prevail withLanded/Error.IsRequestStateHalted/ batch-terminal short-circuit. Speculate treats cancelled deps as out-of-the-way (drop and continue) instead of cascade-failing; conclude mapsBatchStateCancelled→RequestStateCancelled.storage.ErrNotFoundandstorage.ErrVersionMismatchpropagate as-is so the base controller layer can map them to retryable, matching the pattern in every other orchestrator controller.Test plan
make fmt && make lint && make check-tidy && make check-gazelle && make check-mockscleanbazel test //... --test_tag_filters=-integration,-e2e(32 unit-test targets) all passmake local-start: Land → Cancel before batch → confirmRequestStateCancelledand no batch createdCancelled, contained requests reconcile toCancelled, dependent batches respeculateOut of scope (deferred)
BuildStatusCancelledwrite).🤖 Generated with Claude Code