diff --git a/.github/workflows-examples/deploy.yml b/.github/workflows-examples/deploy.yml new file mode 100644 index 0000000..35f0a37 --- /dev/null +++ b/.github/workflows-examples/deploy.yml @@ -0,0 +1,63 @@ +# EXAMPLE WORKFLOW - copy this file to .github/workflows/ in your own repository. +# +# It lives in workflows-examples/ because this examples repository does not deploy +# to a real Confluent Cloud environment on push. +# +# The workflow deploys the table program of Example_08_IntegrationAndDeployment: +# +# 1. Local unit tests run on the open source Flink engine (no credentials needed). +# 2. Integration tests run against Confluent Cloud. They fail fast if any of the +# secrets below are missing, so a misconfigured pipeline cannot skip its +# verification step and still deploy. +# 3. The deploy step runs the program's main() method, which submits the statement +# under a deterministic name. Re-running the workflow with unchanged code is +# idempotent: the plugin detects that an identical statement already exists. +# +# If the pipeline code changed, the conflicting statement currently has to be deleted +# first via the manage.yml workflow. Upcoming plugin versions simplify this with +# application name prefixes and an on-conflict replace option, removing the manual step. +# +# For staging-to-production promotion, run the same job against different GitHub +# environments so that each environment provides its own secrets and protection rules. +# +# Required secrets (see the README section "Via Environment Variables"): +# CLOUD_PROVIDER, CLOUD_REGION, FLINK_API_KEY, FLINK_API_SECRET, +# ORG_ID, ENV_ID, COMPUTE_POOL_ID, TARGET_CATALOG, TARGET_DATABASE +name: Deploy Table API Program + +on: + workflow_dispatch: + # In a real project, deploy when changes are merged: + # push: + # branches: [main] + +env: + CLOUD_PROVIDER: ${{ secrets.CLOUD_PROVIDER }} + CLOUD_REGION: ${{ secrets.CLOUD_REGION }} + FLINK_API_KEY: ${{ secrets.FLINK_API_KEY }} + FLINK_API_SECRET: ${{ secrets.FLINK_API_SECRET }} + ORG_ID: ${{ secrets.ORG_ID }} + ENV_ID: ${{ secrets.ENV_ID }} + COMPUTE_POOL_ID: ${{ secrets.COMPUTE_POOL_ID }} + TARGET_CATALOG: ${{ secrets.TARGET_CATALOG }} + TARGET_DATABASE: ${{ secrets.TARGET_DATABASE }} + +jobs: + test-and-deploy: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + + - uses: actions/setup-java@v5 + with: + distribution: 'temurin' + java-version: '17' + cache: maven + + - name: Run local unit tests and integration tests against Confluent Cloud + run: ./mvnw -B --no-transfer-progress verify + + - name: Deploy to Confluent Cloud + run: > + java -cp target/flink-table-api-java-examples-1.0.jar + io.confluent.flink.examples.table.Example_08_IntegrationAndDeployment diff --git a/.github/workflows-examples/manage.yml b/.github/workflows-examples/manage.yml new file mode 100644 index 0000000..87dc6a8 --- /dev/null +++ b/.github/workflows-examples/manage.yml @@ -0,0 +1,77 @@ +# EXAMPLE WORKFLOW - copy this file to .github/workflows/ in your own repository. +# +# It lives in workflows-examples/ because this examples repository does not manage +# statements in a real Confluent Cloud environment. +# +# Deployment and lifecycle management are separate concerns: removing or changing code +# does not imply that a running statement should be stopped or deleted. This workflow +# makes lifecycle operations explicit. It is triggered manually from the GitHub Actions +# UI and runs Example_12_StatementLifecycle with the chosen action. +# +# Example usage (use the exact statement name as printed by the deploy step; the +# plugin may append a suffix such as '-api' to configured names, which upcoming +# plugin versions remove): +# - Check a deployment: action=status, statement-name=vendors-per-brand-api +# - Stop a statement: action=stop, statement-name=vendors-per-brand-api +# - Resume after a stop: action=resume, statement-name=vendors-per-brand-api +# - Permanently remove: action=delete, statement-name=vendors-per-brand-api +# +# Upcoming plugin versions add an application name concept that groups all statements +# of a program under a common prefix and allows listing them, so that statements no +# longer need to be addressed one by one. +# +# Required secrets (see the README section "Via Environment Variables"): +# CLOUD_PROVIDER, CLOUD_REGION, FLINK_API_KEY, FLINK_API_SECRET, +# ORG_ID, ENV_ID, COMPUTE_POOL_ID +name: Manage Flink Statements + +on: + workflow_dispatch: + inputs: + action: + description: 'Action to perform' + required: true + type: choice + options: + - status + - stop + - resume + - delete + statement-name: + description: 'Full statement name as printed by the deploy step (e.g. vendors-per-brand-api)' + required: true + +env: + CLOUD_PROVIDER: ${{ secrets.CLOUD_PROVIDER }} + CLOUD_REGION: ${{ secrets.CLOUD_REGION }} + FLINK_API_KEY: ${{ secrets.FLINK_API_KEY }} + FLINK_API_SECRET: ${{ secrets.FLINK_API_SECRET }} + ORG_ID: ${{ secrets.ORG_ID }} + ENV_ID: ${{ secrets.ENV_ID }} + COMPUTE_POOL_ID: ${{ secrets.COMPUTE_POOL_ID }} + +jobs: + manage: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + + - uses: actions/setup-java@v5 + with: + distribution: 'temurin' + java-version: '17' + cache: maven + + - name: Build + run: ./mvnw -B --no-transfer-progress -DskipTests package + + - name: Run lifecycle action + # Inputs are passed via env and quoted instead of interpolated into the + # script, so that free-text input cannot inject shell commands. + env: + ACTION: ${{ inputs.action }} + STATEMENT_NAME: ${{ inputs.statement-name }} + run: > + java -cp target/flink-table-api-java-examples-1.0.jar + io.confluent.flink.examples.table.Example_12_StatementLifecycle + "$ACTION" "$STATEMENT_NAME" diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..bda1d77 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,27 @@ +# Continuous integration for this repository. +# +# Runs entirely without Confluent Cloud connectivity: code format check (Spotless), +# compilation, local unit tests on the open source Flink engine, and the fat JAR build. +# Integration tests (*IT) require Confluent Cloud credentials and would fail without +# them, so this workflow skips them explicitly with -DskipITs. +name: CI + +on: + pull_request: + push: + branches: [master] + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v6 + + - uses: actions/setup-java@v5 + with: + distribution: 'temurin' + java-version: '17' + cache: maven + + - name: Build and test + run: ./mvnw -B --no-transfer-progress verify -DskipITs diff --git a/.gitignore b/.gitignore index ae8186c..a45b90d 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ target/ .java-version *.DS_Store cloud.properties -dependency-reduced-pom.xml \ No newline at end of file +dependency-reduced-pom.xml +*.env diff --git a/README.md b/README.md index ffada6a..f16e466 100644 --- a/README.md +++ b/README.md @@ -90,6 +90,20 @@ public static void main(String[] args) { // pipeline.execute().await(); ``` +## Developer Journey + +The examples in this repository follow the journey of taking a table program from a first +experiment all the way to a production deployment: + +| Stage | What you do | Where to look | +|-------|-------------|---------------| +| Get started | Configure a connection to Confluent Cloud and run a first program | `Example_00` - `Example_02`, [Getting Started](#getting-started) | +| Build | Transform tables, build pipelines, work with data types, UDFs, and structured objects | `Example_03` - `Example_07`, `Example_09` - `Example_11`, `TableProgramTemplate` | +| Test locally | Run unit tests on mock data, without Confluent Cloud connectivity | `src/test/java/`, [Testing Table Programs](#testing-table-programs) | +| Test on Confluent Cloud | Run integration tests against the real service | `Example_08_IntegrationAndDeploymentIT`, [Testing Table Programs](#testing-table-programs) | +| Deploy | Submit statements with deterministic names from a CI/CD pipeline | `Example_08_IntegrationAndDeployment`, [CI/CD with GitHub Actions](#cicd-with-github-actions) | +| Operate | Check status, stop, resume, and delete deployed statements | `Example_12_StatementLifecycle`, `.github/workflows-examples/manage.yml` | + ## Getting Started ### Prerequisites @@ -456,6 +470,97 @@ ConfluentSettings settings3 = ConfluentSettings.newBuilder() .build(); ``` +## Testing Table Programs + +Table programs can be tested in three tiers, from fastest feedback to highest fidelity: + +1. **Unit tests on plain logic.** UDFs and other business logic are plain Java classes and can be + tested with JUnit alone: no Flink engine, no Confluent Cloud connectivity, and no artifact + upload required. See `Example_09_FunctionsTest`. +2. **Local pipeline tests on the open source Flink engine.** Pipeline logic that is structured as + a function from input `Table`s to an output `Table` (see `VendorsPerBrand` in + `Example_08_IntegrationAndDeployment`) can be executed locally with mock data from + `fromValues()`, without Confluent Cloud connectivity. See + `Example_08_IntegrationAndDeploymentTest` and run with `./mvnw test`. +3. **Integration tests against Confluent Cloud.** The same pipeline logic runs on the real + service with the exact Confluent semantics, on a Kafka-backed table that is made bounded with + dynamic options. See `Example_08_IntegrationAndDeploymentIT` and run with `./mvnw verify`. + These tests require the connection environment variables (see + [Via Environment Variables](#via-environment-variables)) plus `TARGET_CATALOG` (the name of + your Confluent Cloud environment) and `TARGET_DATABASE` (the name of a Kafka cluster with + write access), and fail fast when any are missing, so a CI pipeline cannot silently skip its + verification step and still report success. To build without Confluent Cloud credentials on + purpose, skip them explicitly: `./mvnw verify -DskipITs`. + +### How local testing works + +The Confluent plugin executes all statements on Confluent Cloud; it does not contain a local +engine. Local tests therefore run on the open source Flink engine (planner, runtime, and an +embedded mini-cluster), which this project adds as test-scoped dependencies. + +The plugin and the open source planner cannot share a runtime classpath: both register their +engine factories under the identifier `default`, and `TableEnvironment.create(...)` fails with +`Multiple factories for identifier 'default'` if both are present. This project resolves the +conflict with classpath exclusions in the `pom.xml`: + +- `./mvnw test` (surefire) excludes the Confluent plugin, so unit tests run on the open source + engine. +- `./mvnw verify` (failsafe, test classes named `*IT`) excludes the open source planner, so + integration tests run against Confluent Cloud. + +Keep pipeline logic free of `io.confluent.flink.plugin` imports so that unit tests can execute it +locally. + +NOTE: IDEs ignore these classpath exclusions, so run the tests via `./mvnw test` and +`./mvnw verify` instead of the IDE's test runner. To use the IDE's test runner anyway, replicate +the exclusion in the test's run configuration (IntelliJ IDEA: Modify options -> Modify classpath -> +Exclude): exclude the `confluent-flink-table-api-java-plugin` JAR for unit tests, or the +`flink-table-planner-loader` JAR for integration tests. For production projects, the cleaner +structure is a multi-module build: one module contains the pipeline logic with only +`flink-table-api-java` and the open source test dependencies, and another module adds the +Confluent plugin and the deployment entrypoints. + +### Local testing limitations + +The local engine is not identical to Confluent Cloud: + +- The `$rowtime` system column and other Confluent system columns do not exist locally. +- There is no local catalog mirroring your Confluent Cloud schemas. Mock tables are declared + manually with `fromValues()` and must be kept in sync with the real schemas. +- Confluent-specific SQL syntax (such as `DISTRIBUTED INTO ... BUCKETS`) and Confluent-provided + functions are not available. + +Local tests give fast feedback on transformation logic; integration tests against Confluent Cloud +remain the source of truth. + +## CI/CD with GitHub Actions + +The repository contains workflows that show how a table program moves through a CI/CD pipeline: + +- `.github/workflows/ci.yml` runs in this repository on every pull request: code format check, + compilation, local unit tests, and the fat JAR build. It requires no Confluent Cloud + credentials. +- `.github/workflows-examples/deploy.yml` is a template for your own repository: it runs the + integration tests against Confluent Cloud and then deploys the program by running its `main()` + method. Because the statement name is deterministic (see + `Example_08_IntegrationAndDeployment`), re-running the workflow with unchanged code is + idempotent. +- `.github/workflows-examples/manage.yml` is a template for explicit lifecycle operations: + checking status, stopping, resuming, or deleting a deployed statement by name (see + `Example_12_StatementLifecycle`). Deployment and lifecycle management are separate concerns; + removing code does not imply that a running statement should be stopped or deleted. + +The workflows authenticate via the environment variables described in +[Via Environment Variables](#via-environment-variables), mapped from GitHub Actions secrets. In +addition, the integration tests and the deploy entrypoint read `TARGET_CATALOG` (the name of the +Confluent Cloud environment) and `TARGET_DATABASE` (the name of the Kafka cluster) to determine +where tables may be created. For staging-to-production promotion, run the same deploy job against +different GitHub environments so that each environment provides its own secrets and protection +rules. + +Upcoming plugin versions simplify these workflows further with application name prefixes, +an on-conflict replace option for redeployments, and listing all statements of an application. + ## Documentation for Confluent Utilities ### Confluent Tools diff --git a/pom.xml b/pom.xml index 6e143c5..76bf7af 100644 --- a/pom.xml +++ b/pom.xml @@ -18,6 +18,9 @@ ${target.java.version} 2.26.0 3.6.0 + 5.12.2 + 3.27.3 + 3.5.4 @@ -53,6 +56,40 @@ ${confluent-plugin.version} + + + org.apache.flink + flink-table-planner-loader + ${flink.version} + test + + + org.apache.flink + flink-table-runtime + ${flink.version} + test + + + org.apache.flink + flink-clients + ${flink.version} + test + + + org.junit.jupiter + junit-jupiter + ${junit.version} + test + + + org.assertj + assertj-core + ${assertj.version} + test + + @@ -119,6 +156,45 @@ + + + org.apache.maven.plugins + maven-surefire-plugin + ${surefire.version} + + + io.confluent.flink:confluent-flink-table-api-java-plugin + + + + + + + org.apache.maven.plugins + maven-failsafe-plugin + ${surefire.version} + + + org.apache.flink:flink-table-planner-loader + + + + + + integration-test + verify + + + + + diff --git a/src/main/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeployment.java b/src/main/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeployment.java index e71c1cc..d8816a8 100644 --- a/src/main/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeployment.java +++ b/src/main/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeployment.java @@ -1,221 +1,142 @@ package io.confluent.flink.examples.table; +import io.confluent.flink.plugin.ConfluentPluginOptions; import io.confluent.flink.plugin.ConfluentSettings; import io.confluent.flink.plugin.ConfluentTools; import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableResult; -import org.apache.flink.types.Row; -import org.apache.flink.util.CloseableIterator; - -import java.util.List; -import java.util.UUID; -import java.util.stream.Collectors; +import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.lit; -import static org.apache.flink.table.api.Expressions.withAllColumns; /** - * An example that illustrates how to embed a table program into a CI/CD pipeline for continuous - * testing and rollout. + * An example that illustrates how to structure, test, and deploy a table program for production use + * in a CI/CD pipeline. * - *

Because we cannot rely on production data in this example, the program sets up some - * Kafka-backed tables with data during the {@code setup} phase. + *

The example separates two concerns that often end up entangled: * - *

Afterward, the program can operate in two modes: one for integration testing ({@code test} - * phase) and one for deployment ({@code deploy} phase). + *

    + *
  • The pipeline logic in {@link VendorsPerBrand} is plain Table API code without any + * Confluent-specific dependencies. It receives its input table as a parameter instead of + * resolving it from a catalog. This makes the logic executable on the open source Flink + * engine in local unit tests, where the input is mocked with {@code fromValues()} (see {@code + * Example_08_IntegrationAndDeploymentTest}), as well as on Confluent Cloud, where the input + * is a Kafka-backed table (see {@code Example_08_IntegrationAndDeploymentIT}). + *
  • The {@link #main(String[])} method is the deployment entrypoint. It wires the pipeline to + * Confluent Cloud and submits it as a long-running background statement. + *
* - *

A CI/CD workflow could execute the following: + *

A CI/CD workflow (e.g. GitHub Actions, see {@code .github/workflows-examples/}) could execute + * the following: * *

- *     export EXAMPLE_JAR=./target/flink-table-api-java-examples-1.0.jar
- *     export EXAMPLE_CLASS=io.confluent.flink.examples.table.Example_08_IntegrationAndDeployment
- *     java -jar $EXAMPLE_JAR $EXAMPLE_CLASS setup
- *     java -jar $EXAMPLE_JAR $EXAMPLE_CLASS test
- *     java -jar $EXAMPLE_JAR $EXAMPLE_CLASS deploy
+ *     # Run local unit tests, no Confluent Cloud connectivity required
+ *     ./mvnw test
+ *
+ *     # Run integration tests against Confluent Cloud (see README for all variables)
+ *     export FLINK_API_KEY=... FLINK_API_SECRET=...
+ *     export TARGET_CATALOG=... TARGET_DATABASE=...
+ *     ./mvnw verify
+ *
+ *     # Deploy
+ *     ./mvnw package
+ *     java -cp target/flink-table-api-java-examples-1.0.jar \
+ *         io.confluent.flink.examples.table.Example_08_IntegrationAndDeployment
  * 
* - *

NOTE: This example requires write access to a Kafka cluster. Fill out the given variables - * below with target catalog/database if this is fine for you. + *

All connection and target configuration is provided via environment variables, so the same JAR + * can be promoted from a staging to a production environment by switching the variables (for + * example with GitHub Actions environments). * - *

ALSO NOTE: The example submits an unbounded background statement. Make sure to stop the - * statement in the Web UI afterward to clean up resources. + *

The statement is submitted under the deterministic name {@code vendors-per-brand}, so + * re-running the deployment with unchanged code is idempotent: the plugin detects that a statement + * with the same name and specification already exists. If the code changed, the conflicting + * statement must currently be deleted first (see {@code Example_12_StatementLifecycle}). Upcoming + * plugin versions simplify this with application name prefixes and an on-conflict replace option. * - *

The complete CI/CD workflow performs the following steps: + *

NOTE: This example requires write access to a Kafka cluster, configured via the environment + * variables TARGET_CATALOG (environment name) and TARGET_DATABASE (Kafka cluster name). * - *

    - *
  1. Create Kafka table 'ProductsMock' and 'VendorsPerBrand'. - *
  2. Fill Kafka table 'ProductsMock' with data from marketplace examples table 'products'. - *
  3. Test the given SQL on a subset of data in 'ProductsMock' with the help of dynamic options. - *
  4. Deploy an unbounded version of the tested SQL that write into 'VendorsPerBrand'. - *
+ *

ALSO NOTE: The example submits an unbounded background statement. Use {@code + * Example_12_StatementLifecycle} or the Web UI to stop and delete the statement afterward to clean + * up resources. */ public class Example_08_IntegrationAndDeployment { - // Fill this with an environment you have write access to - static final String TARGET_CATALOG = ""; - - // Fill this with a Kafka cluster you have write access to - static final String TARGET_DATABASE = ""; - - // Fill this with names of the Kafka Topics you want to create - static final String SOURCE_TABLE = "ProductsMock"; + // Name of the Kafka topic that stores the results static final String TARGET_TABLE = "VendorsPerBrand"; - // The following SQL will be tested on a finite subset of data before - // it gets deployed to production. - // In production, it will run on unbounded input. - // The '%s' parameterizes the SQL for testing. - static final String SQL = - "SELECT brand, COUNT(*) AS vendors FROM ProductsMock %s GROUP BY brand"; + // Deterministic statement name that makes the deployment addressable across CI/CD runs + static final String STATEMENT_NAME = "vendors-per-brand"; + + /** + * The pipeline logic under test: counts the number of vendors per brand. + * + *

This class must not reference any {@code io.confluent.flink.plugin} classes so that unit + * tests can run it on the open source Flink engine without the plugin on the classpath. + */ + public static class VendorsPerBrand { + public static Table buildPipeline(Table products) { + return products.groupBy($("brand")).select($("brand"), lit(1).count().as("vendors")); + } + } - // All logic is defined in a main() method. It can run both in an IDE or CI/CD system. - public static void main(String[] args) throws Exception { - if (args.length == 0) { + // The main() method only performs the deployment. Testing happens in JUnit (see above), + // lifecycle operations are a separate concern (see Example_12_StatementLifecycle). + public static void main(String[] args) { + if (args.length > 0) { + // Earlier versions of this example took a setup/test/deploy mode argument. Reject + // arguments so that a stale invocation cannot silently perform a deployment. throw new IllegalArgumentException( - "No mode specified. Possible values are 'setup', 'test', or 'deploy'."); + "This example does not take arguments: it always deploys. Testing happens in" + + " JUnit, lifecycle operations in Example_12_StatementLifecycle."); } - EnvironmentSettings settings = ConfluentSettings.fromResource("/cloud.properties"); + // Reads CLOUD_PROVIDER, CLOUD_REGION, FLINK_API_KEY, FLINK_API_SECRET, ORG_ID, ENV_ID, + // and COMPUTE_POOL_ID from environment variables. In GitHub Actions, these map naturally + // to repository or environment secrets. + EnvironmentSettings settings = ConfluentSettings.fromGlobalVariables(); TableEnvironment env = TableEnvironment.create(settings); - env.useCatalog(TARGET_CATALOG); - env.useDatabase(TARGET_DATABASE); - - String mode = args[0]; - switch (mode) { - case "setup": - setupProgram(env); - break; - case "test": - testProgram(env); - break; - case "deploy": - deployProgram(env); - break; - default: - throw new IllegalArgumentException("Unknown mode: " + mode); - } - } - // -------------------------------------------------------------------------------------------- - // Setup Phase - // -------------------------------------------------------------------------------------------- + env.useCatalog(requireEnv("TARGET_CATALOG")); + env.useDatabase(requireEnv("TARGET_DATABASE")); - private static void setupProgram(TableEnvironment env) throws Exception { - System.out.println("Running setup..."); - - System.out.println("Creating table..." + SOURCE_TABLE); - // Create a mock table that has exactly the same schema as the example `products` table. - // The LIKE clause is very convenient for this task which is why we use SQL here. - // Since we use little data, a bucket of 1 is important to satisfy the `scan.bounded.mode` - // during testing. + System.out.println("Creating table... " + TARGET_TABLE); + // The pipeline owns its output table and creates it on the first deployment. env.executeSql( String.format( "CREATE TABLE IF NOT EXISTS `%s`\n" - + "DISTRIBUTED INTO 1 BUCKETS\n" - + "LIKE `examples`.`marketplace`.`products` (EXCLUDING OPTIONS)", - SOURCE_TABLE)); - - System.out.println("Start filling table..."); - // Let Flink copy generated data into the mock table. Note that the statement is unbounded - // and submitted as a background statement by default. - TableResult pipelineResult = - env.from("`examples`.`marketplace`.`products`") - .select(withAllColumns()) - .insertInto(SOURCE_TABLE) - .execute(); - - System.out.println("Waiting for at least 200 elements in table..."); - // We start a second Flink statement for monitoring how the copying progresses - TableResult countResult = env.from(SOURCE_TABLE).select(lit(1).count()).as("c").execute(); - // This waits for the condition to be met: - try (CloseableIterator iterator = countResult.collect()) { - while (iterator.hasNext()) { - Row row = iterator.next(); - long count = row.getFieldAs("c"); - if (count >= 200L) { - System.out.println("200 elements reached. Stopping..."); - break; - } - } - } - - // By using a closable iterator, the foreground statement will be stopped automatically when - // the iterator is closed. But the background statement still needs a manual stop. - ConfluentTools.stopStatement(pipelineResult); - - System.out.println("Creating table..." + TARGET_TABLE); - // Create a table for storing the results after deployment. - env.executeSql( - String.format( - "CREATE TABLE IF NOT EXISTS `%s` \n" + "(brand STRING, vendors BIGINT, PRIMARY KEY(brand) NOT ENFORCED)\n" + "DISTRIBUTED INTO 1 BUCKETS", TARGET_TABLE)); - } - - // -------------------------------------------------------------------------------------------- - // Test Phase - // -------------------------------------------------------------------------------------------- - - private static void testProgram(TableEnvironment env) { - System.out.println("Running test..."); - - // Dynamic options allow influencing parts of a table scan. In this case, they define a - // range (from start offset '0' to end offset '100') how to read from Kafka. Effectively, - // they make the table bounded. If all tables are finite, the statement can terminate. - // This allows us to run checks on the result. - String dynamicOptions = - "/*+ OPTIONS(\n" - + "'scan.startup.mode' = 'specific-offsets',\n" - + "'scan.startup.specific-offsets' = 'partition: 0, offset: 0',\n" - + "'scan.bounded.mode' = 'specific-offsets',\n" - + "'scan.bounded.specific-offsets' = 'partition: 0, offset: 100'\n" - + ") */"; - - System.out.println("Requesting test data..."); - TableResult result = env.executeSql(String.format(SQL, dynamicOptions)); - List rows = ConfluentTools.collectMaterialized(result); + System.out.println("Deploying statement... " + STATEMENT_NAME); + // A deterministic statement name makes the deployment idempotent and the statement + // addressable for lifecycle operations (status, stop, resume, delete) in later CI/CD + // runs. Without it, every run would submit a new statement under a random name. + env.getConfig().set(ConfluentPluginOptions.CLIENT_STATEMENT_NAME, STATEMENT_NAME); + + // The same pipeline logic that was tested locally and against Confluent Cloud now runs + // unbounded on the continuously generated rows of the examples catalog. + Table products = env.from("`examples`.`marketplace`.`products`"); + TableResult result = + VendorsPerBrand.buildPipeline(products).insertInto(TARGET_TABLE).execute(); + + // The API might add suffixes to manual statement names such as '-sql' or '-api'. For the + // final submitted name, use the provided tools. Upcoming plugin versions guarantee that + // the submitted name is exactly the configured one. System.out.println( - "Test data:\n" - + rows.stream().map(Row::toString).collect(Collectors.joining("\n"))); - - // Use the testing framework of your choice and add checks to verify the - // correctness of the test data - boolean testSuccessful = - rows.stream() - .map(r -> r.getFieldAs("brand")) - .anyMatch(brand -> brand.equals("Apple")); - if (testSuccessful) { - System.out.println("Success. Ready for deployment."); - } else { - throw new IllegalStateException("Test was not successful"); - } + "Statement has been deployed as: " + ConfluentTools.getStatementName(result)); } - // -------------------------------------------------------------------------------------------- - // Deploy Phase - // -------------------------------------------------------------------------------------------- - - private static void deployProgram(TableEnvironment env) { - System.out.println("Running deploy..."); - - // It is possible to give a better statement name for deployment but make sure that the name - // is unique across environment and region. - String statementName = "vendors-per-brand-" + UUID.randomUUID(); - env.getConfig().set("client.statement-name", statementName); - - // Execute the SQL without dynamic options. - // The result is unbounded and piped into the target table. - TableResult insertIntoResult = - env.sqlQuery(String.format(SQL, "")).insertInto(TARGET_TABLE).execute(); - - // The API might add suffixes to manual statement names such as '-sql' or '-api'. - // For the final submitted name, use the provided tools. - String finalName = ConfluentTools.getStatementName(insertIntoResult); - - System.out.println("Statement has been deployed as: " + finalName); + private static String requireEnv(String name) { + String value = System.getenv(name); + if (value == null || value.isBlank()) { + throw new IllegalArgumentException("Environment variable '" + name + "' is required."); + } + return value; } } diff --git a/src/main/java/io/confluent/flink/examples/table/Example_12_StatementLifecycle.java b/src/main/java/io/confluent/flink/examples/table/Example_12_StatementLifecycle.java new file mode 100644 index 0000000..4588a38 --- /dev/null +++ b/src/main/java/io/confluent/flink/examples/table/Example_12_StatementLifecycle.java @@ -0,0 +1,80 @@ +package io.confluent.flink.examples.table; + +import io.confluent.flink.plugin.ConfluentSettings; +import io.confluent.flink.plugin.StatementHandle; +import io.confluent.flink.plugin.model.SqlV1StatementStatus; + +import org.apache.flink.table.api.TableEnvironment; + +/** + * An example that illustrates how to manage the lifecycle of deployed statements from a CI/CD + * pipeline or the command line, without requiring the Confluent CLI. + * + *

Deployment (see {@code Example_08_IntegrationAndDeployment}) and lifecycle management are + * separate concerns: removing or changing code does not imply that a running statement should be + * stopped or deleted. This program makes those operations explicit, for example via a manually + * triggered GitHub Actions workflow (see {@code .github/workflows-examples/manage.yml}). + * + *

+ *     export FLINK_API_KEY=... FLINK_API_SECRET=... (see README for all variables)
+ *
+ *     java -cp target/flink-table-api-java-examples-1.0.jar \
+ *         io.confluent.flink.examples.table.Example_12_StatementLifecycle status vendors-per-brand-api
+ * 
+ * + *

Supported actions: + * + *

    + *
  • {@code status}: print the current phase and warnings of a statement + *
  • {@code stop}: pause a running statement + *
  • {@code resume}: restart a stopped statement + *
  • {@code delete}: permanently remove a statement + *
+ * + *

NOTE: Statements are addressed by their exact name, which is why deployments should use + * deterministic statement names. Use the name printed by the deploy step: the plugin may append a + * suffix such as '-api' to configured names. Upcoming plugin versions guarantee exact names and add + * an application name concept that groups all statements of a program under a common prefix and + * allows listing them. + */ +public class Example_12_StatementLifecycle { + + public static void main(String[] args) { + if (args.length < 2) { + throw new IllegalArgumentException( + "Usage: "); + } + String action = args[0]; + String statementName = args[1]; + + TableEnvironment env = TableEnvironment.create(ConfluentSettings.fromGlobalVariables()); + + // A handle gives access to an already deployed statement by name + StatementHandle handle = StatementHandle.fromName(env, statementName); + + switch (action) { + case "status": + SqlV1StatementStatus status = handle.getSqlV1Statement().getStatus(); + System.out.println("Phase: " + status.getPhase()); + if (status.getDetail() != null) { + System.out.println("Detail: " + status.getDetail()); + } + handle.getWarnings().forEach(warning -> System.out.println("Warning: " + warning)); + break; + case "stop": + handle.stop(); + System.out.println("Statement has been stopped: " + statementName); + break; + case "resume": + handle.resume(); + System.out.println("Statement has been resumed: " + statementName); + break; + case "delete": + handle.delete(); + System.out.println("Statement has been deleted: " + statementName); + break; + default: + throw new IllegalArgumentException("Unknown action: " + action); + } + } +} diff --git a/src/test/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeploymentIT.java b/src/test/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeploymentIT.java new file mode 100644 index 0000000..7e0ff7c --- /dev/null +++ b/src/test/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeploymentIT.java @@ -0,0 +1,191 @@ +package io.confluent.flink.examples.table; + +import io.confluent.flink.plugin.ConfluentPluginOptions; +import io.confluent.flink.plugin.ConfluentSettings; +import io.confluent.flink.plugin.ConfluentTools; + +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.flink.table.api.Expressions.lit; +import static org.apache.flink.table.api.Expressions.withAllColumns; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.fail; + +/** + * Integration tests for the pipeline logic of {@link Example_08_IntegrationAndDeployment}, executed + * against Confluent Cloud. + * + *

While the unit tests (see {@code Example_08_IntegrationAndDeploymentTest}) verify the logic + * locally, these tests verify it on the real service: the exact Confluent SQL semantics, the + * Confluent catalog, and Kafka-backed tables. + * + *

The tests run during {@code ./mvnw verify} and fail fast when the required environment + * variables are not set, so a CI pipeline cannot silently skip its verification step (e.g. a deploy + * pipeline with a missing secret) and still report success. Builds that intentionally run without + * Confluent Cloud credentials skip integration tests explicitly with {@code ./mvnw verify + * -DskipITs}. The tests require the standard connection variables (see the README's "Via + * Environment Variables" section) plus TARGET_CATALOG and TARGET_DATABASE pointing to an + * environment and Kafka cluster with write access. + * + *

Because we cannot rely on production data in this example, the test fixture creates a mock + * Kafka-backed table and fills it with data from the marketplace examples table. Dynamic options + * then make the table bounded, so the pipeline terminates and its result can be asserted. + * + *

NOTE: When running from the IDE instead of {@code ./mvnw verify}, exclude the {@code + * flink-table-planner-loader} JAR from the test's run configuration classpath (the opposite of the + * unit tests; IntelliJ IDEA: Modify options -> Modify classpath -> Exclude) and set the required + * environment variables in the run configuration. + */ +class Example_08_IntegrationAndDeploymentIT { + + // Name of the mock Kafka topic that emulates the production input + static final String SOURCE_TABLE = "ProductsMock"; + + static TableEnvironment env; + + @BeforeAll + // The timeout runs the setup in a separate thread so that it can be interrupted even while + // blocked on statement results, e.g. when the compute pool has no capacity for the fill + // statement. Without it, a stuck setup would hang until the CI job timeout. + @Timeout(value = 15, unit = TimeUnit.MINUTES, threadMode = Timeout.ThreadMode.SEPARATE_THREAD) + static void setUpMockTable() throws Exception { + requireEnvironment(); + env = TableEnvironment.create(ConfluentSettings.fromGlobalVariables()); + env.useCatalog(System.getenv("TARGET_CATALOG")); + env.useDatabase(System.getenv("TARGET_DATABASE")); + + System.out.println("Creating table... " + SOURCE_TABLE); + // Create a mock table that has exactly the same schema as the example `products` table. + // The LIKE clause is very convenient for this task which is why we use SQL here. + // Since we use little data, a bucket of 1 is important to satisfy the + // `scan.bounded.mode` during testing. + env.executeSql( + String.format( + "CREATE TABLE IF NOT EXISTS `%s`\n" + + "DISTRIBUTED INTO 1 BUCKETS\n" + + "LIKE `examples`.`marketplace`.`products` (EXCLUDING OPTIONS)", + SOURCE_TABLE)); + + System.out.println("Start filling table..."); + // Let Flink copy generated data into the mock table. Note that the statement is + // unbounded and submitted as a background statement by default. + TableResult pipelineResult = + env.from("`examples`.`marketplace`.`products`") + .select(withAllColumns()) + .insertInto(SOURCE_TABLE) + .execute(); + + long count = 0; + try { + System.out.println("Waiting for at least 200 elements in table..."); + // A second Flink statement monitors how the copying progresses. The foreground + // statement is stopped automatically when its iterator is closed. + TableResult countResult = + env.from(SOURCE_TABLE).select(lit(1).count()).as("c").execute(); + try (CloseableIterator iterator = countResult.collect()) { + while (count < 200L && iterator.hasNext()) { + count = iterator.next().getFieldAs("c"); + } + } + } finally { + // The fill statement is unbounded and must always be cleaned up, even when the wait + // above fails, as it would otherwise keep running and consuming CFUs. It is also + // deleted rather than just stopped: it gets a random name on every run, and stopped + // statements would accumulate in the environment. + String fillStatement = ConfluentTools.getStatementName(pipelineResult); + ConfluentTools.stopStatement(env, fillStatement); + ConfluentTools.deleteStatement(env, fillStatement); + } + if (count < 200L) { + fail( + "The mock table only reached " + + count + + " elements before the monitoring statement terminated."); + } + System.out.println("200 elements reached."); + } + + // Fails fast with a clear message instead of skipping, so that a CI pipeline with missing + // secrets cannot report a successful verification that never ran. The connection variable + // names come from ConfluentPluginOptions, so the list cannot drift from the plugin contract. + private static void requireEnvironment() { + List required = new ArrayList<>(); + // A properties file referenced via FLINK_PROPERTIES is a valid alternative to the + // discrete connection variables (see the README's "Configuration" section). + if (isBlank(System.getenv(ConfluentPluginOptions.VAR_FLINK_PROPERTIES))) { + required.add(ConfluentPluginOptions.VAR_CLOUD_PROVIDER); + required.add(ConfluentPluginOptions.VAR_CLOUD_REGION); + required.add(ConfluentPluginOptions.VAR_FLINK_API_KEY); + required.add(ConfluentPluginOptions.VAR_FLINK_API_SECRET); + required.add(ConfluentPluginOptions.VAR_ORG_ID); + required.add(ConfluentPluginOptions.VAR_ENV_ID); + required.add(ConfluentPluginOptions.VAR_COMPUTE_POOL_ID); + } + required.add("TARGET_CATALOG"); + required.add("TARGET_DATABASE"); + List missing = + required.stream() + .filter(name -> isBlank(System.getenv(name))) + .collect(Collectors.toList()); + if (!missing.isEmpty()) { + fail( + "Integration tests verify the pipeline against Confluent Cloud and require the" + + " environment variables " + + missing + + ". Set them (see the README section 'Via Environment Variables') or" + + " skip the integration tests explicitly with -DskipITs."); + } + } + + private static boolean isBlank(String value) { + return value == null || value.isBlank(); + } + + @Test + void countsVendorsPerBrandOnBoundedData() { + // Dynamic options allow influencing parts of a table scan. In this case, they define a + // range (from start offset '0' to end offset '100') how to read from Kafka. Effectively, + // they make the table bounded. If all tables are finite, the statement can terminate. + // This allows us to run checks on the result. + Table boundedProducts = + env.sqlQuery( + String.format( + "SELECT * FROM `%s` /*+ OPTIONS(\n" + + "'scan.startup.mode' = 'specific-offsets',\n" + + "'scan.startup.specific-offsets' = 'partition: 0, offset: 0',\n" + + "'scan.bounded.mode' = 'specific-offsets',\n" + + "'scan.bounded.specific-offsets' = 'partition: 0, offset: 100'\n" + + ") */", + SOURCE_TABLE)); + + // The exact same pipeline logic that the unit tests run locally + Table result = + Example_08_IntegrationAndDeployment.VendorsPerBrand.buildPipeline(boundedProducts); + + List rows = ConfluentTools.collectMaterialized(result.execute()); + + assertThat(rows).isNotEmpty(); + assertThat(rows) + .allSatisfy( + row -> { + assertThat(row.getFieldAs("brand")).isNotBlank(); + assertThat(row.getFieldAs("vendors")).isPositive(); + }); + // The examples data generator produces a fixed set of brands. Checking for a known one + // guards against reading the wrong data, not just producing plausible-looking rows. + assertThat(rows).extracting(row -> row.getFieldAs("brand")).contains("Apple"); + } +} diff --git a/src/test/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeploymentTest.java b/src/test/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeploymentTest.java new file mode 100644 index 0000000..948facc --- /dev/null +++ b/src/test/java/io/confluent/flink/examples/table/Example_08_IntegrationAndDeploymentTest.java @@ -0,0 +1,109 @@ +package io.confluent.flink.examples.table; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.CloseableIterator; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.table.api.Expressions.row; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Unit tests for the pipeline logic of {@link Example_08_IntegrationAndDeployment}. + * + *

These tests run entirely locally on the open source Flink engine with mock data from {@code + * fromValues()}. No Confluent Cloud connectivity, credentials, or compute pool are required, which + * makes them suitable for fast feedback during development and for CI runs on pull requests. + * + *

NOTE: The Confluent plugin and the open source planner cannot share a runtime classpath (both + * register engine factories under the identifier 'default'), so these tests must be executed via + * {@code ./mvnw test}, where the surefire configuration excludes the plugin. Running them directly + * from the IDE fails with "Multiple factories for identifier 'default'". To run them from the IDE + * anyway, exclude the {@code confluent-flink-table-api-java-plugin} JAR from the test's run + * configuration classpath (IntelliJ IDEA: Modify options -> Modify classpath -> Exclude). + * + *

ALSO NOTE: The local engine is not identical to Confluent Cloud. Confluent-specific features + * such as the {@code $rowtime} system column, the Confluent catalog, and Confluent SQL extensions + * are not available locally. Use the integration tests (see {@code + * Example_08_IntegrationAndDeploymentIT}) to verify behavior against the real service. + */ +class Example_08_IntegrationAndDeploymentTest { + + private static Table mockProducts(TableEnvironment env) { + return env.fromValues( + DataTypes.ROW( + DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("brand", DataTypes.STRING())), + row("MacBook", "Apple"), + row("iPhone", "Apple"), + row("Galaxy", "Samsung")); + } + + @Test + void countsVendorsPerBrandInBatchMode() throws Exception { + // Batch mode computes the final result over the finite mock data, which makes + // assertions straightforward. + TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inBatchMode()); + + Table result = + Example_08_IntegrationAndDeployment.VendorsPerBrand.buildPipeline( + mockProducts(env)); + + assertThat(collectRows(result)) + .containsExactlyInAnyOrder(Row.of("Apple", 2L), Row.of("Samsung", 1L)); + } + + @Test + void countsVendorsPerBrandInStreamingMode() throws Exception { + // Streaming mode emits a changelog: an insert for the first product of a brand, + // followed by update_before/update_after pairs as more products arrive. This mirrors + // how the statement behaves on Confluent Cloud. + TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode()); + + Table result = + Example_08_IntegrationAndDeployment.VendorsPerBrand.buildPipeline( + mockProducts(env)); + + List changelog = collectRows(result); + assertThat(materialize(changelog)) + .containsExactlyInAnyOrder(Row.of("Apple", 2L), Row.of("Samsung", 1L)); + } + + private static List collectRows(Table table) throws Exception { + List rows = new ArrayList<>(); + try (CloseableIterator iterator = table.execute().collect()) { + iterator.forEachRemaining(rows::add); + } + return rows; + } + + // Applies the changelog to derive the final result, similar to what + // ConfluentTools.collectMaterialized() does for statements running on Confluent Cloud. + // Rows are copied so that the caller's changelog is left untouched. + private static List materialize(List changelog) { + List state = new ArrayList<>(); + for (Row row : changelog) { + Row copy = Row.copy(row); + copy.setKind(RowKind.INSERT); + switch (row.getKind()) { + case INSERT: + case UPDATE_AFTER: + state.add(copy); + break; + case UPDATE_BEFORE: + case DELETE: + state.remove(copy); + break; + } + } + return state; + } +} diff --git a/src/test/java/io/confluent/flink/examples/table/Example_09_FunctionsTest.java b/src/test/java/io/confluent/flink/examples/table/Example_09_FunctionsTest.java new file mode 100644 index 0000000..3c24541 --- /dev/null +++ b/src/test/java/io/confluent/flink/examples/table/Example_09_FunctionsTest.java @@ -0,0 +1,43 @@ +package io.confluent.flink.examples.table; + +import org.apache.flink.api.common.functions.util.ListCollector; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Unit tests for the User-Defined Functions of {@link Example_09_Functions}. + * + *

UDFs are plain Java classes, so their logic can be tested with JUnit alone: no Flink engine, + * no Confluent Cloud connectivity, and no artifact upload are required. This is the fastest test + * tier and should cover the bulk of a UDF's business logic before it is registered and exercised on + * Confluent Cloud. + */ +class Example_09_FunctionsTest { + + @Test + void customTaxReturnsRatePerLocation() { + Example_09_Functions.CustomTax tax = new Example_09_Functions.CustomTax(); + + assertThat(tax.eval("USA")).isEqualTo(10); + assertThat(tax.eval("EU")).isEqualTo(5); + assertThat(tax.eval("Mars")).isEqualTo(0); + } + + @Test + void explodeEmitsOneRowPerElement() { + Example_09_Functions.Explode explode = new Example_09_Functions.Explode(); + + // Table functions emit rows via a collector, which tests can replace with a list + List collected = new ArrayList<>(); + explode.setCollector(new ListCollector<>(collected)); + + explode.eval(List.of("Apples", "Bananas")); + + assertThat(collected).containsExactly("Apples", "Bananas"); + } +}