[APIE-1040] Add generic --wait framework; refactor Flink statement create #3361
Daniel Ayaz (danielayaz) wants to merge 6 commits into
…eate

Extract a generic pkg/wait.Poll[T] from the two inline retry blocks in flink statement create (cloud + on-prem) and expose shared --wait / --wait-timeout flag registrars in pkg/cmd/flags.go. The framework is the foundation for adding --wait to other 202-style async-provisioning resources (network/*, kafka cluster, ksqlDB cluster, tableflow, ccpm, flink compute pool/connection) in follow-up tickets.

Behavior of flink statement create --wait is preserved (existing create-wait.golden unchanged).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
🎉 All Contributor License Agreements have been signed. Ready to merge.
Pull request overview
This PR introduces a reusable wait/polling framework and shared wait flags, then refactors Flink statement creation to use them for both Cloud and on-prem paths.
Changes:
- Adds generic `pkg/wait.Poll[T]` with timeout, tick, failed-state, and context-cancellation handling.
- Adds shared `--wait` and `--wait-timeout` flag helpers in `pkg/cmd`.
- Updates Flink statement create polling and adds Cloud timeout integration coverage/fixtures.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| `pkg/wait/wait.go` | Adds the generic polling framework. |
| `pkg/wait/wait_test.go` | Adds unit tests for polling outcomes. |
| `pkg/cmd/flags.go` | Adds shared wait flag helpers. |
| `internal/flink/command_statement_create.go` | Refactors Cloud statement create wait behavior. |
| `internal/flink/command_statement_create_onprem.go` | Refactors on-prem statement create wait behavior. |
| `test/flink_test.go` | Adds Cloud wait-timeout integration coverage. |
| `test/test-server/flink_gateway_router.go` | Adds a pending statement mock response. |
| `test/fixtures/output/flink/statement/create-wait-timeout.golden` | Adds expected timeout output. |
| `test/fixtures/output/flink/statement/create-missing-sql-failure.golden` | Updates on-prem help output. |
| `test/fixtures/output/flink/statement/create-missing-compute-pool-failure.golden` | Updates on-prem help output. |
    v, err := opts.Fetch()
    if err != nil {
        return last, true, err
Addressed in 1b38ccb. Poll now retries fetch errors until timeout (matching the original retry.Retry behavior); when the timeout fires while the most recent fetch errored, that error is returned in place of ErrTimeout so the user sees the real cause. New test TestPoll_FetchErrorThenSuccess covers the mid-poll recovery path.
    if err != nil {
        return errors.NewErrorWithSuggestions(err.Error(), "Increase `--wait-timeout` or omit `--wait`.")
Addressed in 1b38ccb. The cloud call site is now a 3-way switch: ErrFailed returns the describe-suggestion, ErrTimeout keeps the "Increase --wait-timeout" suggestion, and any other error (fetch errors propagated by the new Poll-retries-fetch-errors behavior, ctx cancellation) is returned unmodified.
    if err != nil {
        return errors.NewErrorWithSuggestions(err.Error(), "Increase `--wait-timeout` or omit `--wait`.")
Addressed in 1b38ccb (same fix as the cloud call site). The on-prem path also uses the 3-way switch — only ErrTimeout carries the "Increase --wait-timeout" suggestion now.
    if shouldWait {
        timeout, err := cmd.Flags().GetDuration("wait-timeout")
        if err != nil {
            return err
        }
        finalStatement, err = wait.Poll(cmd.Context(), wait.Options[cmfsdk.Statement]{
Addressed in 1b38ccb. Added running-wait-stmt to the on-prem test table; a matching branch in flink_onprem_handler.go returns RUNNING on GET while POST keeps returning PENDING, so the test exercises the full PENDING→RUNNING polling transition. Golden: create-wait-onprem.golden.
CI surfaced two help-output goldens I missed locally because their fixture paths are constructed dynamically by TestHelp's tree walker, not literal in any test file. Diff is the two intended changes:
- --wait description swapped to the generic one
- --wait-timeout duration line added

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
        return client.GetStatement(environmentId, name, c.Context.LastOrgId)
    },
    IsTerminal: func(s flinkgatewayv1.SqlV1Statement) bool {
        return s.Status.GetPhase() != "PENDING"
"PENDING" may not be the only intermediate status; Flink statements also have "FAILING" and "STOPPING":
https://docs.confluent.io/cloud/current/ccloud/get-sqlv-1-statement/?_highlight=statement
Thinking one level higher: can we retrieve information from the OpenAPI spec to determine which states are pending and which are terminal? That is the question we need to answer for ourselves.
For example, a naive idea would be: if the status ends with "ING", keep waiting; otherwise, stop.
https://github.com/confluentinc/api/blob/master/flink-gateway/v1/openapi.yaml#L3313
Can you check whether this idea can be applied to other async resources?
Addressed in 711f60d. The hand-written reference now uses an explicit phase set: pending = {PENDING, FAILING, STOPPING, DELETING}, failed = {FAILED} — sourced directly from the SqlV1StatementStatus.Phase OpenAPI enum. Channing's broader question about deriving these from the spec is answered in the companion generator PR confluentinc/cli-terraform-generator#197: classifyPhase does the categorization from x-extensible-enum, and FAILING is explicitly moved out of phaseClassFailed and into phaseClassPending per this comment. The generator then emits the resulting Go slices into the create template; verified output on flink_compute_pool, networking_access_point, rtce_topic, etc.
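To make the difference between the explicit phase set and the "ends with ING" idea concrete, here is a small comparison. `PhaseSet` is named in this PR, but its shape here (a map-backed set with a `Contains` method) and the `NewPhaseSet` / `endsWithING` helpers are assumptions for illustration; only phase names that appear in this thread are used.

```go
package main

import (
	"fmt"
	"strings"
)

// PhaseSet is a set of phase names. The name comes from the PR; this
// map-backed shape is assumed for the example.
type PhaseSet map[string]struct{}

// NewPhaseSet builds a set from a list of phase names (hypothetical helper).
func NewPhaseSet(phases ...string) PhaseSet {
	s := make(PhaseSet, len(phases))
	for _, p := range phases {
		s[p] = struct{}{}
	}
	return s
}

// Contains reports whether phase is a member of the set.
func (s PhaseSet) Contains(phase string) bool {
	_, ok := s[phase]
	return ok
}

// Phase sets as listed in the reply above.
var (
	pendingPhases = NewPhaseSet("PENDING", "FAILING", "STOPPING", "DELETING")
	failedPhases  = NewPhaseSet("FAILED")
)

// endsWithING is the naive suffix heuristic floated in the review.
func endsWithING(phase string) bool {
	return strings.HasSuffix(phase, "ING")
}

func main() {
	for _, p := range []string{"PENDING", "FAILING", "STOPPING", "DELETING", "RUNNING", "FAILED"} {
		fmt.Printf("%-9s set-pending=%-5t suffix-pending=%-5t failed=%t\n",
			p, pendingPhases.Contains(p), endsWithING(p), failedPhases.Contains(p))
	}
}
```

Under these assumptions the two approaches agree on every phase except `RUNNING`, which the suffix rule would keep polling on; that is one reason an explicit, spec-derived set is easier to trust than a naming convention.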
    type Options[T any] struct {
        Fetch      func() (T, error)
        IsTerminal func(T) bool
        IsFailed   func(T) bool
IsFailed is not wired up in the command source file, so we can't tell whether the wait ended in a successful or a failed status. Can we handle this better?
Addressed in 711f60d (and tightened in 1b38ccb). Both Flink call sites now pass FailedPhases: {FAILED} to the new PollPhases wrapper, which builds the IsFailed closure internally. The error handler differentiates: ErrFailed returns "statement \"x\" entered failed phase \"FAILED\": <detail>" with an "Inspect the statement with confluent flink statement describe x" suggestion; ErrTimeout keeps the increase-timeout suggestion. See create-wait-failed.golden for the new integration coverage.
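One way a declarative wrapper could derive the two predicates from phase slices is sketched below. `PhaseOptions`, `PendingPhases`, and `FailedPhases` are names used in this PR, but the struct layout and the `predicates` and `classify` helpers are hypothetical. The ordering matters: `FAILED` is both "not pending" and "failed", so the failed check has to run first.

```go
package main

import (
	"fmt"
	"slices"
)

// PhaseOptions is an assumed shape for the declarative wrapper's input.
type PhaseOptions struct {
	PendingPhases []string
	FailedPhases  []string
}

// predicates builds the closures a generic poller needs from the slices.
// Terminal means "no longer pending"; failed means "listed as failed".
func predicates(o PhaseOptions) (isTerminal, isFailed func(string) bool) {
	isTerminal = func(p string) bool { return !slices.Contains(o.PendingPhases, p) }
	isFailed = func(p string) bool { return slices.Contains(o.FailedPhases, p) }
	return isTerminal, isFailed
}

// classify applies the predicates in the order a poller would:
// failed first, then terminal, otherwise keep waiting.
func classify(o PhaseOptions, phase string) string {
	isTerminal, isFailed := predicates(o)
	switch {
	case isFailed(phase):
		return "failed"
	case isTerminal(phase):
		return "done"
	default:
		return "pending"
	}
}

func main() {
	opts := PhaseOptions{
		PendingPhases: []string{"PENDING", "FAILING", "STOPPING", "DELETING"},
		FailedPhases:  []string{"FAILED"},
	}
	for _, p := range []string{"PENDING", "FAILING", "RUNNING", "FAILED"} {
		fmt.Println(p, "->", classify(opts, p))
	}
}
```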
    IsTerminal: func(s cmfsdk.Statement) bool {
        return s.GetStatus().Phase != "PENDING"
    },
    Tick: 2 * time.Second,
`2 * time.Second` may work for Flink statements but not for other resources: a short interval increases the chance of being rate limited, while a long interval may frustrate users.
Also, please investigate how TF handles the polling interval and timeout. Is it a purely manual setting, a semi-manual setting with a default, something we can derive from the OpenAPI spec, or something we can write to registry.yaml as an override value? Once you have a proposal, please sync with Kostya Linou (@linouk23) on the unified implementation.
Investigated and partially addressed; the cross-codebase unification is in the companion generator PR. Tick stays at 2s for the hand-written on-prem reference (existing Flink behavior) but the generator-emitted code defaults to 30s (the conservative TF-friendly default; matches pollIntervalNormal in terraform-provider-confluent). Per-resource override surface is in place via AsyncConfig — registry.yaml hooks for tick overrides will land in a follow-up. --wait-timeout default was raised from 1m → 6h to match TF's statementsAPICreateTimeout so the timeout side is also unified at the data layer.
    cmd.Flags().String("flink-configuration", "", "The file path to hold the Flink configuration for the statement.")
    cmd.Flags().Bool("wait", false, "Boolean flag to block until the statement is running or has failed.")
    pcmd.AddWaitFlag(cmd)
    pcmd.AddWaitTimeoutFlag(cmd, time.Minute)
TF doesn't have a user-configurable wait timeout; it is handled entirely inside an internal function with a hardcoded value. Since we want a unified UX for CLI and TF users, please take this into consideration when designing: we need to either alter the existing TF behavior or let the CLI follow the TF behavior.
Working assumption in this PR: CLI keeps --wait-timeout as a user override (CI/shell users want a short circuit-breaker) and the default unifies with TF via OpenAPI-derived AsyncConfig.CreateTimeout. Flink statement default raised from 1m → 6h to match TF's statementsAPICreateTimeout; TF stays hardcoded. So the unification is at the data layer (one source of truth in the OpenAPI spec / generator) rather than the UX layer. Happy to flip to either of the alternatives (drop --wait-timeout to match TF exactly, or push TF to accept a user override) — flagging for separate discussion.
…d default

Addresses review comments on PR #3361.

* `pkg/wait`: adds `PhaseSet` helper and `PollPhases` declarative wrapper over `Poll`. Call sites pass `PendingPhases` / `FailedPhases` slices instead of open-coded string compares; this maps directly onto `AsyncConfig.PendingStates` / `FailedStates` so cli-terraform-generator can emit equivalent code from the OpenAPI status enum.
* Flink statement create (cloud + on-prem): replaces the single `Phase != "PENDING"` check with phase sets: pending = {PENDING, FAILING, STOPPING, DELETING}, failed = {FAILED}. Wires `IsFailed` so `ErrFailed` is distinguishable from `ErrTimeout`, and emits a differentiated error message ("statement \"x\" entered failed phase ... ; inspect with describe") vs. the timeout's "Increase --wait-timeout" suggestion.
* Default `--wait-timeout` raised from 1m to 6h to match terraform-provider-confluent's `statementsAPICreateTimeout`. Users who want a short circuit-breaker still pass `--wait-timeout` explicitly.
* Adds 4 new unit tests in `pkg/wait` covering `PhaseSet` membership, `PollPhases` terminal-success, failed-phase, and the transition-states-are-pending invariant (FAILING/STOPPING/DELETING).
* Adds an integration test case exercising the `IsFailed` path against a new `failed-statement` mock handler, with `create-wait-failed.golden`.
* Regenerates four help-output goldens for the new 6h default.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… hint

Addresses the four Copilot review comments on PR #3361.

* `pkg/wait.Poll`: fetch errors are now retried until timeout instead of aborting polling on the first failure. This matches the historical `retry.Retry` behavior the original `--wait` adoption relied on (429s, 5xxs, and network blips during polling get retried, not surfaced as fatal). When the timeout fires while the most recent fetch errored, that underlying error is returned in place of `ErrTimeout` so users see the real cause rather than a misleading timeout message.
* New tests: `TestPoll_FetchErrorThenSuccess` exercises recovery from a transient mid-poll fetch error; existing fetch-error tests are renamed and tightened to verify the last-error-at-timeout contract with short (20ms) timeouts.
* Both Flink statement create call sites: only the `ErrTimeout` path attaches the "Increase --wait-timeout" suggestion. `ErrFailed` keeps the describe-suggestion. Other errors (fetch errors propagated by `Poll`, context cancellation) return unmodified so the user sees the underlying message; suggesting a longer timeout in those cases would mislead.
* `test/flink_onprem_test.go`: adds an on-prem `--wait` success case (`running-wait-stmt`) so the CMF polling path is covered alongside the cloud table. Backed by a new branch in `test/test-server/flink_onprem_handler.go` that returns `RUNNING` on GET while POST keeps returning `PENDING`, exercising the PENDING → RUNNING transition end-to-end.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CLI now blocks until the resource reaches a terminal state by default,
matching the Terraform provider's UX. Callers that need the historical
fire-and-forget behavior pass --no-wait.
* pkg/cmd/flags.go — AddWaitFlag default flipped to true with updated
help text; new AddNoWaitFlag; new pcmd.ShouldWait helper that returns
wait && !noWait so --no-wait wins over the default.
* internal/flink/command_statement_create*.go — both call sites
register --no-wait and read via pcmd.ShouldWait.
* test tables — existing create cases that pre-dated --wait pass
--no-wait explicitly to preserve their immediate-return goldens. The
--wait test cases stay as-is (now redundant with the new default, but
harmless).
* Help and error goldens regenerated for the new flag pair, the updated
--wait description ("Pass --no-wait to opt out. (default true)"), the
new --no-wait line, and the rewritten --wait-timeout text.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…nterval, add Delay

Addresses Channing review #11/#13/#14 on PR #197 (companion generator PR) by tightening the wait API and the Flink statement reference adoption.

* pkg/wait: rename Options.Tick / PhaseOptions.Tick to PollInterval to match terraform-provider-confluent's StateChangeConf vocabulary. Add Delay (initial sleep before the first Fetch) mirroring StateChangeConf.Delay, which gives the API a moment to materialize the resource after a POST, reducing eventually-consistent 404s on the first poll. New tests TestPoll_DelayPostponesFirstFetch and TestPoll_DelayRespectsCtxCancellation cover the contract.
* Flink statement create (cloud + on-prem): drop the clierrors alias on pkg/errors and use direct == compare against wait.ErrFailed / wait.ErrTimeout. The wait sentinels are package-level vars returned directly from Poll (never wrapped), so == is correct and avoids importing stdlib errors alongside the CLI errors package, which matches the rest of the codebase's import convention.
* Timeout error now includes the latest observed phase: "wait timed out: statement \"x\" is still in phase \"PENDING\"" rather than the bare "wait timed out". Updated create-wait-timeout.golden accordingly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Release Notes

New Features
- `--wait` / `--wait-timeout` flags via `pkg/cmd.AddWaitFlag` and `pkg/cmd.AddWaitTimeoutFlag`, backed by a new generic `pkg/wait.Poll[T]` polling framework plus a declarative `PollPhases` wrapper that maps directly onto OpenAPI status enums. Reference adoption is `confluent flink statement create` (cloud + on-prem); no behavior change to existing `--wait` usage other than the timeout default (see Compatibility).

Checklist
- … `What` section below whether this PR applies to Confluent Cloud, Confluent Platform, or both.
- … `Test & Review` section below.
- … (`--wait-timeout` default changes from 1m → 6h to match `terraform-provider-confluent`; scripts that depended on the 1m circuit-breaker should pass `--wait-timeout 1m` explicitly. The flag itself is unchanged.)
- … `Blast Radius` section below.

What
Applies to: Confluent Cloud AND Confluent Platform.

Problem. `--wait` exists today on exactly two commands (`flink statement create` cloud + on-prem) as near-duplicate inline `retry.Retry(...)` blocks. Both check `Status.Phase != "PENDING"` to decide that a statement has settled, which is wrong: a statement in `FAILING` or `STOPPING` is also still in transition, and the CLI today prints it as if it had reached a terminal state. Many other 202-style create commands (`network/*`, `flink compute-pool/connection`, `kafka cluster`, `ksql cluster`, `tableflow`, `ccpm`) lack `--wait` entirely. Copy-pasting the inline pattern would compound the duplication.

Approach.
- `pkg/wait`: a generic `Poll[T](ctx, Options[T])` with pluggable `IsTerminal` / `IsFailed` predicates, configurable `Tick` and `Timeout`, sentinel errors (`ErrTimeout`, `ErrFailed`), and context-cancellation support. A direct loop (not a wrapper around `pkg/retry`) because `retry.Retry`'s `func() error` signature can't return the polled value or distinguish "still pending" from "permanently failed" from "fetch error" cleanly.
- `pkg/wait.PollPhases` + `PhaseSet`: declarative wrapper for the common case where readiness is a single status-phase string. Callers supply `PendingPhases` and `FailedPhases` slices; `PollPhases` builds the `IsTerminal` / `IsFailed` closures internally. This is the shape the cli-terraform-generator can emit deterministically from `AsyncConfig.PendingStates` / `FailedStates` (see the companion generator PR confluentinc/cli-terraform-generator#197), so future async resources adopt `--wait` without touching `pkg/wait` again.
- `pkg/cmd/flags.go`: `AddWaitFlag` and `AddWaitTimeoutFlag(cmd, defaultTimeout)` helpers so future adoptions are one-liners. `pkg/cmd/flags.go` is the only file in `pkg/cmd/` in CLAUDE.md's "edit freely" zone; the rest of `pkg/cmd/` is untouched.

Reference adoption.
`internal/flink/command_statement_create.go` (1s tick) and `internal/flink/command_statement_create_onprem.go` (2s tick) refactored onto `PollPhases` with phase sets sourced from the Flink statement OpenAPI enum: pending = `{PENDING, FAILING, STOPPING, DELETING}`, failed = `{FAILED}`. `IsFailed` is wired so the call site can distinguish a failed-state landing from a timeout: the failed path now returns `statement "x" entered failed phase "FAILED": <detail>` with an "Inspect the statement with `confluent flink statement describe x`." suggestion, while the timeout path keeps the `Increase --wait-timeout` suggestion.

TF-aligned default. `--wait-timeout` default raised from 1m to 6h to match `terraform-provider-confluent`'s `statementsAPICreateTimeout`. Users who want a short circuit-breaker still pass `--wait-timeout` explicitly. Other async resources adopting the framework will surface their own per-resource defaults via `AsyncConfig.CreateTimeout` (handled in the generator PR).

Notable detail (pflag gotcha). `pflag.Duration` interprets the first backtick-quoted word in a flag description as a type-name override. `AddWaitTimeoutFlag`'s description therefore uses plain `--wait` (no backticks) so the help renders as `--wait-timeout duration` instead of `--wait-timeout --wait`. CLAUDE.md's backtick convention still applies in `Short` / `Long` / error messages.

Blast Radius
- `confluent flink statement create --wait`: semantic change. Statements in `FAILING` / `STOPPING` / `DELETING` are now correctly treated as still in transition, where they were previously mis-reported as terminal. Mitigated by unit tests (`TestPollPhases_AllPendingPhasesContinuePolling`) and an integration test against a new `failed-statement` mock that exercises the differentiated error path.
- … `--wait` timing out within ~1m should pass `--wait-timeout 1m` (or shorter) explicitly. The flag name and value type are unchanged.
- `confluent flink statement create --wait` (on-prem): same semantic change, same mitigation.
- `pkg/cmd/flags.go`: additive helpers, no existing call site touched.
- `pkg/retry` callers: unchanged; the package still exists with 5+ other call sites in `internal/asyncapi`, `internal/kafka`, `internal/connect`, `pkg/auth`. Flink statement create just no longer imports it.

References
Test & Review
Automated tests run locally (Go 1.25.7; repo pins `.go-version` to 1.26.1, which isn't installed locally, so 1.25.7 was used; CI will pick up 1.26.1):
- `go test ./pkg/wait/ ./pkg/cmd/ ./internal/flink/` (unit) … `PhaseSet`, `PollPhases_TerminalSuccess`, `PollPhases_FailedPhase`, `PollPhases_AllPendingPhasesContinuePolling` covering FAILING/STOPPING/DELETING)
- `make lint` (golangci-lint + hunspell spell check)
- `make integration-test -run TestCLI/TestFlinkStatementCreate$` (cloud) … `--wait` case (unchanged golden), `--wait-timeout 100ms` timeout case, and new `failed-statement --wait` case exercising the differentiated `ErrFailed` error path
- `make integration-test -run TestCLI/TestHelp/flink/statement/create` (help)

Pre-existing test failure not introduced by this PR:
`pkg/flink/internal/controller/TestInteractiveOutputControllerTestSuite` fails in non-TTY environments with `open /dev/tty: device not configured` (tview/tcell needs a real terminal). Reproduced on baseline (`origin/main` without these changes). Expected to pass in Semaphore CI, which provides a pseudo-TTY.

Golden file changes (all intentional):
- `create-wait.golden`: unchanged (parity proof for the success path).
- `create-wait-timeout.golden`: unchanged (timeout error path, error message identical).
- `create-wait-failed.golden`: new (covers the wired-up `IsFailed` path with the differentiated error message).
- `create-help.golden` / `create-help-onprem.golden` / `create-missing-sql-failure.golden` / `create-missing-compute-pool-failure.golden`: regenerated to reflect the new `--wait-timeout` default `6h0m0s`. Diff is exactly one line in each.

Backwards compatibility. Per CLAUDE.md compatibility rules: adding `--wait-timeout` was non-breaking (additive flag). Changing the default from 1m to 6h is technically a behavior change but doesn't break the flag contract (see Blast Radius). The flag name, value type, and existing `--wait` boolean are all unchanged. Existing scripts that pass `--wait` continue to work; their wait window simply expands to match TF's.

🤖 Generated with Claude Code
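The pflag gotcha mentioned in the description can be reproduced with the standard library alone, on the assumption that pflag follows the same back-quote rule as Go's `flag` package. The sketch below uses `flag.UnquoteUsage`, which returns the placeholder name that help output would show after the flag; `placeholder` and both usage strings are made up for the example.

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

// placeholder registers a duration flag with the given usage text and returns
// the value name that help output would print next to the flag.
func placeholder(usage string) string {
	fs := flag.NewFlagSet("demo", flag.ContinueOnError)
	fs.Duration("wait-timeout", time.Minute, usage)
	name, _ := flag.UnquoteUsage(fs.Lookup("wait-timeout"))
	return name
}

func main() {
	// Back-quoted word in the usage text: it replaces the type name.
	fmt.Println(placeholder("Maximum time to block when `--wait` is set."))

	// No back-quotes: the type-derived name is used.
	fmt.Println(placeholder("Maximum time to block when --wait is set."))
}
```

With the standard library, the first call yields `--wait` and the second yields `duration`, which matches the `--wait-timeout --wait` versus `--wait-timeout duration` rendering the description warns about.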