Because we cannot rely on production data in this example, the program sets up some - * Kafka-backed tables with data during the {@code setup} phase. + *
The example separates two concerns that often end up entangled: * - *
Afterward, the program can operate in two modes: one for integration testing ({@code test} - * phase) and one for deployment ({@code deploy} phase). + *
A CI/CD workflow could execute the following: + *
A CI/CD workflow (e.g. GitHub Actions, see {@code .github/workflows-examples/}) could execute + * the following: * *
- * export EXAMPLE_JAR=./target/flink-table-api-java-examples-1.0.jar - * export EXAMPLE_CLASS=io.confluent.flink.examples.table.Example_08_IntegrationAndDeployment - * java -jar $EXAMPLE_JAR $EXAMPLE_CLASS setup - * java -jar $EXAMPLE_JAR $EXAMPLE_CLASS test - * java -jar $EXAMPLE_JAR $EXAMPLE_CLASS deploy + * # Run local unit tests, no Confluent Cloud connectivity required + * ./mvnw test + * + * # Run integration tests against Confluent Cloud (see README for all variables) + * export FLINK_API_KEY=... FLINK_API_SECRET=... + * export TARGET_CATALOG=... TARGET_DATABASE=... + * ./mvnw verify + * + * # Deploy + * ./mvnw package + * java -cp target/flink-table-api-java-examples-1.0.jar \ + * io.confluent.flink.examples.table.Example_08_IntegrationAndDeployment ** - *
NOTE: This example requires write access to a Kafka cluster. Fill out the given variables - * below with target catalog/database if this is fine for you. + *
All connection and target configuration is provided via environment variables, so the same JAR + * can be promoted from a staging to a production environment by switching the variables (for + * example with GitHub Actions environments). * - *
ALSO NOTE: The example submits an unbounded background statement. Make sure to stop the - * statement in the Web UI afterward to clean up resources. + *
The statement is submitted under the deterministic name {@code vendors-per-brand}, so + * re-running the deployment with unchanged code is idempotent: the plugin detects that a statement + * with the same name and specification already exists. If the code changed, the conflicting + * statement must currently be deleted first (see {@code Example_12_StatementLifecycle}). Upcoming + * plugin versions simplify this with application name prefixes and an on-conflict replace option. * - *
The complete CI/CD workflow performs the following steps: + *
NOTE: This example requires write access to a Kafka cluster, configured via the environment + * variables TARGET_CATALOG (environment name) and TARGET_DATABASE (Kafka cluster name). * - *
ALSO NOTE: The example submits an unbounded background statement. Use {@code + * Example_12_StatementLifecycle} or the Web UI to stop and delete the statement afterward to clean + * up resources. */ public class Example_08_IntegrationAndDeployment { - // Fill this with an environment you have write access to - static final String TARGET_CATALOG = ""; - - // Fill this with a Kafka cluster you have write access to - static final String TARGET_DATABASE = ""; - - // Fill this with names of the Kafka Topics you want to create - static final String SOURCE_TABLE = "ProductsMock"; + // Name of the Kafka topic that stores the results static final String TARGET_TABLE = "VendorsPerBrand"; - // The following SQL will be tested on a finite subset of data before - // it gets deployed to production. - // In production, it will run on unbounded input. - // The '%s' parameterizes the SQL for testing. - static final String SQL = - "SELECT brand, COUNT(*) AS vendors FROM ProductsMock %s GROUP BY brand"; + // Deterministic statement name that makes the deployment addressable across CI/CD runs + static final String STATEMENT_NAME = "vendors-per-brand"; + + /** + * The pipeline logic under test: counts the number of vendors per brand. + * + *
This class must not reference any {@code io.confluent.flink.plugin} classes so that unit
+ * tests can run it on the open source Flink engine without the plugin on the classpath.
+ */
+ public static class VendorsPerBrand {
+ public static Table buildPipeline(Table products) {
+ return products.groupBy($("brand")).select($("brand"), lit(1).count().as("vendors"));
+ }
+ }
- // All logic is defined in a main() method. It can run both in an IDE or CI/CD system.
- public static void main(String[] args) throws Exception {
- if (args.length == 0) {
+ // The main() method only performs the deployment. Testing happens in JUnit (see above),
+ // lifecycle operations are a separate concern (see Example_12_StatementLifecycle).
+ public static void main(String[] args) {
+ if (args.length > 0) {
+ // Earlier versions of this example took a setup/test/deploy mode argument. Reject
+ // arguments so that a stale invocation cannot silently perform a deployment.
throw new IllegalArgumentException(
- "No mode specified. Possible values are 'setup', 'test', or 'deploy'.");
+ "This example does not take arguments: it always deploys. Testing happens in"
+ + " JUnit, lifecycle operations in Example_12_StatementLifecycle.");
}
- EnvironmentSettings settings = ConfluentSettings.fromResource("/cloud.properties");
+ // Reads CLOUD_PROVIDER, CLOUD_REGION, FLINK_API_KEY, FLINK_API_SECRET, ORG_ID, ENV_ID,
+ // and COMPUTE_POOL_ID from environment variables. In GitHub Actions, these map naturally
+ // to repository or environment secrets.
+ EnvironmentSettings settings = ConfluentSettings.fromGlobalVariables();
TableEnvironment env = TableEnvironment.create(settings);
- env.useCatalog(TARGET_CATALOG);
- env.useDatabase(TARGET_DATABASE);
-
- String mode = args[0];
- switch (mode) {
- case "setup":
- setupProgram(env);
- break;
- case "test":
- testProgram(env);
- break;
- case "deploy":
- deployProgram(env);
- break;
- default:
- throw new IllegalArgumentException("Unknown mode: " + mode);
- }
- }
- // --------------------------------------------------------------------------------------------
- // Setup Phase
- // --------------------------------------------------------------------------------------------
+ env.useCatalog(requireEnv("TARGET_CATALOG"));
+ env.useDatabase(requireEnv("TARGET_DATABASE"));
- private static void setupProgram(TableEnvironment env) throws Exception {
- System.out.println("Running setup...");
-
- System.out.println("Creating table..." + SOURCE_TABLE);
- // Create a mock table that has exactly the same schema as the example `products` table.
- // The LIKE clause is very convenient for this task which is why we use SQL here.
- // Since we use little data, a bucket of 1 is important to satisfy the `scan.bounded.mode`
- // during testing.
+ System.out.println("Creating table... " + TARGET_TABLE);
+ // The pipeline owns its output table and creates it on the first deployment.
env.executeSql(
String.format(
"CREATE TABLE IF NOT EXISTS `%s`\n"
- + "DISTRIBUTED INTO 1 BUCKETS\n"
- + "LIKE `examples`.`marketplace`.`products` (EXCLUDING OPTIONS)",
- SOURCE_TABLE));
-
- System.out.println("Start filling table...");
- // Let Flink copy generated data into the mock table. Note that the statement is unbounded
- // and submitted as a background statement by default.
- TableResult pipelineResult =
- env.from("`examples`.`marketplace`.`products`")
- .select(withAllColumns())
- .insertInto(SOURCE_TABLE)
- .execute();
-
- System.out.println("Waiting for at least 200 elements in table...");
- // We start a second Flink statement for monitoring how the copying progresses
- TableResult countResult = env.from(SOURCE_TABLE).select(lit(1).count()).as("c").execute();
- // This waits for the condition to be met:
- try (CloseableIterator Deployment (see {@code Example_08_IntegrationAndDeployment}) and lifecycle management are
+ * separate concerns: removing or changing code does not imply that a running statement should be
+ * stopped or deleted. This program makes those operations explicit, for example via a manually
+ * triggered GitHub Actions workflow (see {@code .github/workflows-examples/manage.yml}).
+ *
+ * Supported actions:
+ *
+ * NOTE: Statements are addressed by their exact name, which is why deployments should use
+ * deterministic statement names. Use the name printed by the deploy step: the plugin may append a
+ * suffix such as '-api' to configured names. Upcoming plugin versions guarantee exact names and add
+ * an application name concept that groups all statements of a program under a common prefix and
+ * allows listing them.
+ */
+public class Example_12_StatementLifecycle {
+
+ public static void main(String[] args) {
+ if (args.length < 2) {
+ throw new IllegalArgumentException(
+ "Usage: While the unit tests (see {@code Example_08_IntegrationAndDeploymentTest}) verify the logic
+ * locally, these tests verify it on the real service: the exact Confluent SQL semantics, the
+ * Confluent catalog, and Kafka-backed tables.
+ *
+ * The tests run during {@code ./mvnw verify} and fail fast when the required environment
+ * variables are not set, so a CI pipeline cannot silently skip its verification step (e.g. a deploy
+ * pipeline with a missing secret) and still report success. Builds that intentionally run without
+ * Confluent Cloud credentials skip integration tests explicitly with {@code ./mvnw verify
+ * -DskipITs}. The tests require the standard connection variables (see the README's "Via
+ * Environment Variables" section) plus TARGET_CATALOG and TARGET_DATABASE pointing to an
+ * environment and Kafka cluster with write access.
+ *
+ * Because we cannot rely on production data in this example, the test fixture creates a mock
+ * Kafka-backed table and fills it with data from the marketplace examples table. Dynamic options
+ * then make the table bounded, so the pipeline terminates and its result can be asserted.
+ *
+ * NOTE: When running from the IDE instead of {@code ./mvnw verify}, exclude the {@code
+ * flink-table-planner-loader} JAR from the test's run configuration classpath (the opposite of the
+ * unit tests; IntelliJ IDEA: Modify options -> Modify classpath -> Exclude) and set the required
+ * environment variables in the run configuration.
+ */
+class Example_08_IntegrationAndDeploymentIT {
+
+ // Name of the mock Kafka topic that emulates the production input
+ static final String SOURCE_TABLE = "ProductsMock";
+
+ static TableEnvironment env;
+
+ @BeforeAll
+ // The timeout runs the setup in a separate thread so that it can be interrupted even while
+ // blocked on statement results, e.g. when the compute pool has no capacity for the fill
+ // statement. Without it, a stuck setup would hang until the CI job timeout.
+ @Timeout(value = 15, unit = TimeUnit.MINUTES, threadMode = Timeout.ThreadMode.SEPARATE_THREAD)
+ static void setUpMockTable() throws Exception {
+ requireEnvironment();
+ env = TableEnvironment.create(ConfluentSettings.fromGlobalVariables());
+ env.useCatalog(System.getenv("TARGET_CATALOG"));
+ env.useDatabase(System.getenv("TARGET_DATABASE"));
+
+ System.out.println("Creating table... " + SOURCE_TABLE);
+ // Create a mock table that has exactly the same schema as the example `products` table.
+ // The LIKE clause is very convenient for this task which is why we use SQL here.
+ // Since we use little data, a bucket of 1 is important to satisfy the
+ // `scan.bounded.mode` during testing.
+ env.executeSql(
+ String.format(
+ "CREATE TABLE IF NOT EXISTS `%s`\n"
+ + "DISTRIBUTED INTO 1 BUCKETS\n"
+ + "LIKE `examples`.`marketplace`.`products` (EXCLUDING OPTIONS)",
+ SOURCE_TABLE));
+
+ System.out.println("Start filling table...");
+ // Let Flink copy generated data into the mock table. Note that the statement is
+ // unbounded and submitted as a background statement by default.
+ TableResult pipelineResult =
+ env.from("`examples`.`marketplace`.`products`")
+ .select(withAllColumns())
+ .insertInto(SOURCE_TABLE)
+ .execute();
+
+ long count = 0;
+ try {
+ System.out.println("Waiting for at least 200 elements in table...");
+ // A second Flink statement monitors how the copying progresses. The foreground
+ // statement is stopped automatically when its iterator is closed.
+ TableResult countResult =
+ env.from(SOURCE_TABLE).select(lit(1).count()).as("c").execute();
+ try (CloseableIterator These tests run entirely locally on the open source Flink engine with mock data from {@code
+ * fromValues()}. No Confluent Cloud connectivity, credentials, or compute pool are required, which
+ * makes them suitable for fast feedback during development and for CI runs on pull requests.
+ *
+ * NOTE: The Confluent plugin and the open source planner cannot share a runtime classpath (both
+ * register engine factories under the identifier 'default'), so these tests must be executed via
+ * {@code ./mvnw test}, where the surefire configuration excludes the plugin. Running them directly
+ * from the IDE fails with "Multiple factories for identifier 'default'". To run them from the IDE
+ * anyway, exclude the {@code confluent-flink-table-api-java-plugin} JAR from the test's run
+ * configuration classpath (IntelliJ IDEA: Modify options -> Modify classpath -> Exclude).
+ *
+ * ALSO NOTE: The local engine is not identical to Confluent Cloud. Confluent-specific features
+ * such as the {@code $rowtime} system column, the Confluent catalog, and Confluent SQL extensions
+ * are not available locally. Use the integration tests (see {@code
+ * Example_08_IntegrationAndDeploymentIT}) to verify behavior against the real service.
+ */
+class Example_08_IntegrationAndDeploymentTest {
+
+ private static Table mockProducts(TableEnvironment env) {
+ return env.fromValues(
+ DataTypes.ROW(
+ DataTypes.FIELD("name", DataTypes.STRING()),
+ DataTypes.FIELD("brand", DataTypes.STRING())),
+ row("MacBook", "Apple"),
+ row("iPhone", "Apple"),
+ row("Galaxy", "Samsung"));
+ }
+
+ @Test
+ void countsVendorsPerBrandInBatchMode() throws Exception {
+ // Batch mode computes the final result over the finite mock data, which makes
+ // assertions straightforward.
+ TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inBatchMode());
+
+ Table result =
+ Example_08_IntegrationAndDeployment.VendorsPerBrand.buildPipeline(
+ mockProducts(env));
+
+ assertThat(collectRows(result))
+ .containsExactlyInAnyOrder(Row.of("Apple", 2L), Row.of("Samsung", 1L));
+ }
+
+ @Test
+ void countsVendorsPerBrandInStreamingMode() throws Exception {
+ // Streaming mode emits a changelog: an insert for the first product of a brand,
+ // followed by update_before/update_after pairs as more products arrive. This mirrors
+ // how the statement behaves on Confluent Cloud.
+ TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+
+ Table result =
+ Example_08_IntegrationAndDeployment.VendorsPerBrand.buildPipeline(
+ mockProducts(env));
+
+ List UDFs are plain Java classes, so their logic can be tested with JUnit alone: no Flink engine,
+ * no Confluent Cloud connectivity, and no artifact upload are required. This is the fastest test
+ * tier and should cover the bulk of a UDF's business logic before it is registered and exercised on
+ * Confluent Cloud.
+ */
+class Example_09_FunctionsTest {
+
+ @Test
+ void customTaxReturnsRatePerLocation() {
+ Example_09_Functions.CustomTax tax = new Example_09_Functions.CustomTax();
+
+ assertThat(tax.eval("USA")).isEqualTo(10);
+ assertThat(tax.eval("EU")).isEqualTo(5);
+ assertThat(tax.eval("Mars")).isEqualTo(0);
+ }
+
+ @Test
+ void explodeEmitsOneRowPerElement() {
+ Example_09_Functions.Explode explode = new Example_09_Functions.Explode();
+
+ // Table functions emit rows via a collector, which tests can replace with a list
+ List
+ * export FLINK_API_KEY=... FLINK_API_SECRET=... (see README for all variables)
+ *
+ * java -cp target/flink-table-api-java-examples-1.0.jar \
+ * io.confluent.flink.examples.table.Example_12_StatementLifecycle status vendors-per-brand-api
+ *
+ *
+ *
+ *
+ *
+ *