Skip to content

[APIE-1040] Add generic --wait framework; refactor Flink statement create#3361

Open
Daniel Ayaz (danielayaz) wants to merge 6 commits into
mainfrom
dayaz/apie-1040-wait-framework
Open

[APIE-1040] Add generic --wait framework; refactor Flink statement create#3361
Daniel Ayaz (danielayaz) wants to merge 6 commits into
mainfrom
dayaz/apie-1040-wait-framework

Conversation

@danielayaz
Copy link
Copy Markdown
Member

@danielayaz Daniel Ayaz (danielayaz) commented May 19, 2026

Release Notes

New Features

  • Adds shared --wait / --wait-timeout flags via pkg/cmd.AddWaitFlag and pkg/cmd.AddWaitTimeoutFlag, backed by a new generic pkg/wait.Poll[T] polling framework plus a declarative PollPhases wrapper that maps directly onto OpenAPI status enums. Reference adoption is confluent flink statement create (cloud + on-prem); no behavior change to existing --wait usage other than the timeout default (see Compatibility).

Checklist

  • I have successfully built and used a custom CLI binary, without linter issues from this PR.
  • I have clearly specified in the What section below whether this PR applies to Confluent Cloud, Confluent Platform, or both.
  • I have verified this PR in Confluent Cloud pre-prod or production environment, if applicable. (verified via integration mocks; live verification pending if reviewer requests)
  • I have verified this PR in Confluent Platform on-premises environment, if applicable. (verified via integration mocks; live verification pending if reviewer requests)
  • I have attached manual CLI verification results or screenshots in the Test & Review section below.
  • I have added appropriate CLI integration or unit tests for any new or updated commands and functionality.
  • I confirm that this PR introduces no breaking changes or backward compatibility issues. (The --wait-timeout default changes from 1m → 6h to match terraform-provider-confluent; scripts that depended on the 1m circuit-breaker should pass --wait-timeout 1m explicitly. The flag itself is unchanged.)
  • I have indicated the potential customer impact if something goes wrong in the Blast Radius section below.
  • I have put checkmarks below confirming that the feature associated with this PR is enabled in:
    • Confluent Cloud prod (N/A — refactor preserves polling semantics; new framework is additive)
    • Confluent Cloud stag (N/A — same reason)
    • Confluent Platform (N/A — same reason)
    • Check this box if the feature is enabled for certain organizations only

What

Applies to: Confluent Cloud AND Confluent Platform.

Problem. --wait exists today on exactly two commands (flink statement create cloud + on-prem) as near-duplicate inline retry.Retry(...) blocks. Both check Status.Phase != "PENDING" to decide that a statement has settled, which is wrong: a statement in FAILING or STOPPING is also still in transition, and the CLI today prints it as if it had reached a terminal state. Many other 202-style create commands (network/*, flink compute-pool/connection, kafka cluster, ksql cluster, tableflow, ccpm) lack --wait entirely. Copy-pasting the inline pattern would compound the duplication.

Approach.

  1. pkg/wait — a generic Poll[T](ctx, Options[T]) with pluggable IsTerminal / IsFailed predicates, configurable Tick and Timeout, sentinel errors (ErrTimeout, ErrFailed), and context-cancellation support. A direct loop (not a wrapper around pkg/retry) because retry.Retry's func() error signature can't return the polled value or distinguish "still pending" from "permanently failed" from "fetch error" cleanly.

  2. pkg/wait.PollPhases + PhaseSet — declarative wrapper for the common case where readiness is a single status-phase string. Callers supply PendingPhases and FailedPhases slices; PollPhases builds the IsTerminal / IsFailed closures internally. This is the shape the cli-terraform-generator can emit deterministically from AsyncConfig.PendingStates / FailedStates (see the companion generator PR confluentinc/cli-terraform-generator[CLI-109] Add 'ccloud ps1' helper for configuring PS1 prompt #197), so future async resources adopt --wait without touching pkg/wait again.

  3. pkg/cmd/flags.goAddWaitFlag and AddWaitTimeoutFlag(cmd, defaultTimeout) helpers so future adoptions are one-liners. pkg/cmd/flags.go is the only file in pkg/cmd/ in CLAUDE.md's "edit freely" zone; the rest of pkg/cmd/ is untouched.

  4. Reference adoption. internal/flink/command_statement_create.go (1s tick) and internal/flink/command_statement_create_onprem.go (2s tick) refactored onto PollPhases with phase sets sourced from the Flink statement OpenAPI enum: pending = {PENDING, FAILING, STOPPING, DELETING}, failed = {FAILED}. IsFailed is wired so the call site can distinguish a failed-state landing from a timeout — the failed path now returns statement "x" entered failed phase "FAILED": <detail> with an Inspect the statement with confluent flink statement describe x . suggestion, while the timeout path keeps the Increase --wait-timeout suggestion.

  5. TF-aligned default. --wait-timeout default raised from 1m to 6h to match terraform-provider-confluent's statementsAPICreateTimeout. Users who want a short circuit-breaker still pass --wait-timeout explicitly. Other async resources adopting the framework will surface their own per-resource defaults via AsyncConfig.CreateTimeout (handled in the generator PR).

Notable detail (pflag gotcha). pflag.Duration interprets the first backtick-quoted word in a flag description as a type-name override. AddWaitTimeoutFlag's description therefore uses plain --wait (no backticks) so the help renders as --wait-timeout duration instead of --wait-timeout --wait. CLAUDE.md's backtick convention still applies in Short/Long/error messages.

Blast Radius

  • Confluent Cloud customers using confluent flink statement create --wait — semantic change: statements in FAILING / STOPPING / DELETING are now correctly treated as still in transition, where they were previously mis-reported as terminal. Mitigated by unit tests (TestPollPhases_AllPendingPhasesContinuePolling) and an integration test against a new failed-statement mock that exercises the differentiated error path.
  • Customers scripting around the previous 1m default — the new 6h default is more permissive; any script that depended on --wait timing out within ~1m should pass --wait-timeout 1m (or shorter) explicitly. The flag name and value type are unchanged.
  • Confluent Platform customers using confluent flink statement create --wait (on-prem) — same semantic change, same mitigation.
  • All commands using pkg/cmd/flags.go — additive helpers, no existing call site touched.
  • pkg/retry callers — unchanged; the package still exists with 5+ other call sites in internal/asyncapi, internal/kafka, internal/connect, pkg/auth. Flink statement create just no longer imports it.

References

Test & Review

Automated tests run locally (Go 1.25.7; repo pins .go-version to 1.26.1 which isn't installed locally, so 1.25.7 was used — CI will pick up 1.26.1):

Suite Result
go test ./pkg/wait/ ./pkg/cmd/ ./internal/flink/ (unit) PASS — 11 wait tests (7 baseline + 4 new: PhaseSet, PollPhases_TerminalSuccess, PollPhases_FailedPhase, PollPhases_AllPendingPhasesContinuePolling covering FAILING/STOPPING/DELETING)
make lint (golangci-lint + hunspell spell check) PASS
make integration-test -run TestCLI/TestFlinkStatementCreate$ (cloud) PASS — includes existing --wait case (unchanged golden), --wait-timeout 100ms timeout case, and new failed-statement --wait case exercising the differentiated ErrFailed error path
make integration-test -run TestCLI/TestHelp/flink/statement/create (help) PASS — both cloud and on-prem help goldens regenerated for the 6h default

Pre-existing test failure not introduced by this PR:

  • pkg/flink/internal/controller/TestInteractiveOutputControllerTestSuite fails in non-TTY environments with open /dev/tty: device not configured (tview/tcell needs a real terminal). Reproduced on baseline (origin/main without these changes). Expected to pass in Semaphore CI which provides a pseudo-TTY.

Golden file changes (all intentional):

  • create-wait.goldenunchanged (parity proof for the success path).
  • create-wait-timeout.goldenunchanged (timeout error path, error message identical).
  • create-wait-failed.goldennew (covers the wired-up IsFailed path with the differentiated error message).
  • create-help.golden / create-help-onprem.golden / create-missing-sql-failure.golden / create-missing-compute-pool-failure.golden — regenerated to reflect the new --wait-timeout default 6h0m0s. Diff is exactly one line in each.

Backwards compatibility. Per CLAUDE.md compatibility rules: adding --wait-timeout was non-breaking (additive flag). Changing the default from 1m to 6h is technically a behavior change but doesn't break the flag contract — see Blast Radius. The flag name, value type, and existing --wait boolean are all unchanged. Existing scripts that pass --wait continue to work; their wait window simply expands to match TF's.

🤖 Generated with Claude Code

…eate

Extract a generic pkg/wait.Poll[T] from the two inline retry blocks in
flink statement create (cloud + on-prem) and expose shared --wait /
--wait-timeout flag registrars in pkg/cmd/flags.go.

The framework is the foundation for adding --wait to other 202-style
async-provisioning resources (network/*, kafka cluster, ksqlDB cluster,
tableflow, ccpm, flink compute pool/connection) in follow-up tickets.

Behavior of flink statement create --wait is preserved (existing
create-wait.golden unchanged).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@danielayaz Daniel Ayaz (danielayaz) requested a review from a team as a code owner May 19, 2026 19:29
Copilot AI review requested due to automatic review settings May 19, 2026 19:29
@confluent-cla-assistant
Copy link
Copy Markdown

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a reusable wait/polling framework and shared wait flags, then refactors Flink statement creation to use them for both Cloud and on-prem paths.

Changes:

  • Adds generic pkg/wait.Poll[T] with timeout, tick, failed-state, and context-cancellation handling.
  • Adds shared --wait and --wait-timeout flag helpers in pkg/cmd.
  • Updates Flink statement create polling and adds Cloud timeout integration coverage/fixtures.

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
pkg/wait/wait.go Adds the generic polling framework.
pkg/wait/wait_test.go Adds unit tests for polling outcomes.
pkg/cmd/flags.go Adds shared wait flag helpers.
internal/flink/command_statement_create.go Refactors Cloud statement create wait behavior.
internal/flink/command_statement_create_onprem.go Refactors on-prem statement create wait behavior.
test/flink_test.go Adds Cloud wait-timeout integration coverage.
test/test-server/flink_gateway_router.go Adds a pending statement mock response.
test/fixtures/output/flink/statement/create-wait-timeout.golden Adds expected timeout output.
test/fixtures/output/flink/statement/create-missing-sql-failure.golden Updates on-prem help output.
test/fixtures/output/flink/statement/create-missing-compute-pool-failure.golden Updates on-prem help output.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread pkg/wait/wait.go Outdated
Comment on lines +31 to +33
v, err := opts.Fetch()
if err != nil {
return last, true, err
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 1b38ccb. Poll now retries fetch errors until timeout (matching the original retry.Retry behavior); when the timeout fires while the most recent fetch errored, that error is returned in place of ErrTimeout so the user sees the real cause. New test TestPoll_FetchErrorThenSuccess covers the mid-poll recovery path.

Comment on lines +173 to +174
if err != nil {
return errors.NewErrorWithSuggestions(err.Error(), "Increase `--wait-timeout` or omit `--wait`.")
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 1b38ccb. The cloud call site is now a 3-way switch: ErrFailed returns the describe-suggestion, ErrTimeout keeps the "Increase --wait-timeout" suggestion, and any other error (fetch errors propagated by the new Poll-retries-fetch-errors behavior, ctx cancellation) is returned unmodified.

Comment on lines +143 to +144
if err != nil {
return errors.NewErrorWithSuggestions(err.Error(), "Increase `--wait-timeout` or omit `--wait`.")
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 1b38ccb (same fix as the cloud call site). The on-prem path also uses the 3-way switch — only ErrTimeout carries the "Increase --wait-timeout" suggestion now.

Comment on lines +128 to +133
if shouldWait {
timeout, err := cmd.Flags().GetDuration("wait-timeout")
if err != nil {
return err
}
finalStatement, err = wait.Poll(cmd.Context(), wait.Options[cmfsdk.Statement]{
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 1b38ccb. Added running-wait-stmt to the on-prem test table; a matching branch in flink_onprem_handler.go returns RUNNING on GET while POST keeps returning PENDING, so the test exercises the full PENDING→RUNNING polling transition. Golden: create-wait-onprem.golden.

CI surfaced two help-output goldens I missed locally because their
fixture paths are constructed dynamically by TestHelp's tree walker,
not literal in any test file. Diff is the two intended changes:
  - --wait description swapped to the generic one
  - --wait-timeout duration line added

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@sonarqube-confluent
Copy link
Copy Markdown

return client.GetStatement(environmentId, name, c.Context.LastOrgId)
},
IsTerminal: func(s flinkgatewayv1.SqlV1Statement) bool {
return s.Status.GetPhase() != "PENDING"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"PENDING" may not be the only intermediate status, there are "FAILING" and "STOPPING" for Flink statement:
https://docs.confluent.io/cloud/current/ccloud/get-sqlv-1-statement/?_highlight=statement

If we think even one level higher, is there a way we can retrieve information from OpenAPI spec to determine which states are pending states, and which states are terminal states, this is the question we need to give answer to ourselves.

For example, a naive idea would be, if the status ends with "ING", we need to wait, otherwise we should terminate.
https://github.com/confluentinc/api/blob/master/flink-gateway/v1/openapi.yaml#L3313

Can you check if such idea can be applied to other async resources?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 711f60d. The hand-written reference now uses an explicit phase set: pending = {PENDING, FAILING, STOPPING, DELETING}, failed = {FAILED} — sourced directly from the SqlV1StatementStatus.Phase OpenAPI enum. Channing's broader question about deriving these from the spec is answered in the companion generator PR confluentinc/cli-terraform-generator#197: classifyPhase does the categorization from x-extensible-enum, and FAILING is explicitly moved out of phaseClassFailed and into phaseClassPending per this comment. The generator then emits the resulting Go slices into the create template; verified output on flink_compute_pool, networking_access_point, rtce_topic, etc.

Comment thread pkg/wait/wait.go Outdated
type Options[T any] struct {
Fetch func() (T, error)
IsTerminal func(T) bool
IsFailed func(T) bool
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IsFailed is not wired up to the command source file, so we can't tell if the successful wait status or failed wait status, can we handle this better?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 711f60d (and tightened in 1b38ccb). Both Flink call sites now pass FailedPhases: {FAILED} to the new PollPhases wrapper, which builds the IsFailed closure internally. The error handler differentiates: ErrFailed returns "statement \"x\" entered failed phase \"FAILED\": <detail>" with an "Inspect the statement with confluent flink statement describe x" suggestion; ErrTimeout keeps the increase-timeout suggestion. See create-wait-failed.golden for the new integration coverage.

IsTerminal: func(s cmfsdk.Statement) bool {
return s.GetStatus().Phase != "PENDING"
},
Tick: 2 * time.Second,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 * time.Second may work for Flink statement but not work for others, a short interval increases the chance of being rate limited, a long interval may upset users.

Also, please investigate on how TF handles the polling interval and timeout, is it purely manual setting, is it semi-manual setting with default, is it something we can derive from OpenAPI spec, is it something we can write to registry.yaml as override value, once you have a proposal, please sync with Kostya Linou (@linouk23) for the unified implementation.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Investigated and partially addressed; the cross-codebase unification is in the companion generator PR. Tick stays at 2s for the hand-written on-prem reference (existing Flink behavior) but the generator-emitted code defaults to 30s (the conservative TF-friendly default; matches pollIntervalNormal in terraform-provider-confluent). Per-resource override surface is in place via AsyncConfig — registry.yaml hooks for tick overrides will land in a follow-up. --wait-timeout default was raised from 1m → 6h to match TF's statementsAPICreateTimeout so the timeout side is also unified at the data layer.

cmd.Flags().String("flink-configuration", "", "The file path to hold the Flink configuration for the statement.")
cmd.Flags().Bool("wait", false, "Boolean flag to block until the statement is running or has failed.")
pcmd.AddWaitFlag(cmd)
pcmd.AddWaitTimeoutFlag(cmd, time.Minute)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TF doesn't have the user configurable wait timeout, as it's purely handled inside an internal function with hardcoded value, since we want to have unified UX for CLI and TF users, when designing please take this into consideration, we need to either alter the existing TF behavior or let CLI follow the CLI behavior.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Working assumption in this PR: CLI keeps --wait-timeout as a user override (CI/shell users want a short circuit-breaker) and the default unifies with TF via OpenAPI-derived AsyncConfig.CreateTimeout. Flink statement default raised from 1m → 6h to match TF's statementsAPICreateTimeout; TF stays hardcoded. So the unification is at the data layer (one source of truth in the OpenAPI spec / generator) rather than the UX layer. Happy to flip to either of the alternatives (drop --wait-timeout to match TF exactly, or push TF to accept a user override) — flagging for separate discussion.

Daniel Ayaz (danielayaz) and others added 4 commits May 28, 2026 16:13
…d default

Addresses review comments on PR #3361.

* `pkg/wait` — adds `PhaseSet` helper and `PollPhases` declarative wrapper
  over `Poll`. Call sites pass `PendingPhases` / `FailedPhases` slices
  instead of open-coded string compares; this maps directly onto
  `AsyncConfig.PendingStates` / `FailedStates` so cli-terraform-generator
  can emit equivalent code from the OpenAPI status enum.

* Flink statement create (cloud + on-prem) — replaces the single
  `Phase != "PENDING"` check with phase sets:
  pending = {PENDING, FAILING, STOPPING, DELETING}, failed = {FAILED}.
  Wires `IsFailed` so `ErrFailed` is distinguishable from `ErrTimeout`,
  and emits a differentiated error message ("statement \"x\" entered
  failed phase ... ; inspect with describe") vs. the timeout's
  "Increase --wait-timeout" suggestion.

* Default `--wait-timeout` raised from 1m to 6h to match
  terraform-provider-confluent's `statementsAPICreateTimeout`. Users
  who want a short circuit-breaker still pass `--wait-timeout`
  explicitly.

* Adds 4 new unit tests in `pkg/wait` covering `PhaseSet` membership,
  `PollPhases` terminal-success, failed-phase, and the transition-
  states-are-pending invariant (FAILING/STOPPING/DELETING).

* Adds an integration test case exercising the `IsFailed` path against
  a new `failed-statement` mock handler, with `create-wait-failed.golden`.

* Regenerates four help-output goldens for the new 6h default.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
… hint

Addresses the four Copilot review comments on PR #3361.

* `pkg/wait.Poll` — fetch errors are now retried until timeout instead of
  aborting polling on the first failure. This matches the historical
  `retry.Retry` behavior the original `--wait` adoption relied on (429s,
  5xxs, and network blips during polling get retried, not surfaced as
  fatal). When the timeout fires while the most recent fetch errored,
  that underlying error is returned in place of `ErrTimeout` so users
  see the real cause rather than a misleading timeout message.

* New tests: `TestPoll_FetchErrorThenSuccess` exercises recovery from a
  transient mid-poll fetch error; existing fetch-error tests are
  renamed and tightened to verify the last-error-at-timeout contract
  with short (20ms) timeouts.

* Both Flink statement create call sites — only the `ErrTimeout` path
  attaches the "Increase --wait-timeout" suggestion. `ErrFailed` keeps
  the describe-suggestion. Other errors (fetch errors propagated by
  `Poll`, context cancellation) return unmodified so the user sees the
  underlying message; suggesting a longer timeout in those cases would
  mislead.

* `test/flink_onprem_test.go` — adds an on-prem `--wait` success case
  (`running-wait-stmt`) so the CMF polling path is covered alongside
  the cloud table. Backed by a new branch in
  `test/test-server/flink_onprem_handler.go` that returns `RUNNING` on
  GET while POST keeps returning `PENDING`, exercising the
  PENDING → RUNNING transition end-to-end.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
CLI now blocks until the resource reaches a terminal state by default,
matching the Terraform provider's UX. Callers that need the historical
fire-and-forget behavior pass --no-wait.

* pkg/cmd/flags.go — AddWaitFlag default flipped to true with updated
  help text; new AddNoWaitFlag; new pcmd.ShouldWait helper that returns
  wait && !noWait so --no-wait wins over the default.
* internal/flink/command_statement_create*.go — both call sites
  register --no-wait and read via pcmd.ShouldWait.
* test tables — existing create cases that pre-dated --wait pass
  --no-wait explicitly to preserve their immediate-return goldens. The
  --wait test cases stay as-is (now redundant with the new default, but
  harmless).
* Help and error goldens regenerated for the new flag pair, the updated
  --wait description ("Pass --no-wait to opt out. (default true)"), the
  new --no-wait line, and the rewritten --wait-timeout text.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…nterval, add Delay

Addresses Channing review #11/#13/#14 on PR #197 (companion generator
PR) by tightening the wait API and the Flink statement reference
adoption.

* pkg/wait: rename Options.Tick / PhaseOptions.Tick to PollInterval to
  match terraform-provider-confluent's StateChangeConf vocabulary. Add
  Delay (initial sleep before the first Fetch) mirroring
  StateChangeConf.Delay — gives the API a moment to materialize the
  resource after a POST, reducing eventually-consistent 404s on the
  first poll. New tests TestPoll_DelayPostponesFirstFetch and
  TestPoll_DelayRespectsCtxCancellation cover the contract.

* Flink statement create (cloud + on-prem): drop the clierrors alias
  on pkg/errors and use direct == compare against wait.ErrFailed /
  wait.ErrTimeout. The wait sentinels are package-level vars returned
  directly from Poll (never wrapped), so == is correct and avoids
  importing stdlib errors alongside the CLI errors package — which
  matches the rest of the codebase's import convention.

* Timeout error now includes the latest observed phase:
  "wait timed out: statement \"x\" is still in phase \"PENDING\""
  rather than the bare "wait timed out". Updated
  create-wait-timeout.golden accordingly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@sonarqube-confluent
Copy link
Copy Markdown

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants