Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions .github/workflows-examples/deploy.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# EXAMPLE WORKFLOW - copy this file to .github/workflows/ in your own repository.
#
# It lives in workflows-examples/ because this examples repository does not deploy
# to a real Confluent Cloud environment on push.
#
# The workflow deploys the table program of Example_08_IntegrationAndDeployment:
#
# 1. Local unit tests run on the open source Flink engine (no credentials needed).
# 2. Integration tests run against Confluent Cloud. They fail fast if any of the
# secrets below are missing, so a misconfigured pipeline cannot skip its
# verification step and still deploy.
# 3. The deploy step runs the program's main() method, which submits the statement
# under a deterministic name. Re-running the workflow with unchanged code is
# idempotent: the plugin detects that an identical statement already exists.
#
# If the pipeline code changed, the conflicting statement currently has to be deleted
# first via the manage.yml workflow. Upcoming plugin versions simplify this with
# application name prefixes and an on-conflict replace option, removing the manual step.
#
# For staging-to-production promotion, run the same job against different GitHub
# environments so that each environment provides its own secrets and protection rules.
#
# Required secrets (see the README section "Via Environment Variables"):
# CLOUD_PROVIDER, CLOUD_REGION, FLINK_API_KEY, FLINK_API_SECRET,
# ORG_ID, ENV_ID, COMPUTE_POOL_ID, TARGET_CATALOG, TARGET_DATABASE
name: Deploy Table API Program

on:
  workflow_dispatch:
  # In a real project, deploy when changes are merged:
  # push:
  #   branches: [main]

# Least privilege: the job only needs to read the repository contents. Without this
# block the GITHUB_TOKEN receives the repository's default permissions, which may
# include write access, in a job that also holds Confluent Cloud credentials.
permissions:
  contents: read

# Serialize deployments: while one run of this workflow is in progress, a newly
# triggered run waits instead of submitting the same statement at the same time.
concurrency:
  group: ${{ github.workflow }}
  cancel-in-progress: false

# Connection settings and deployment target, mapped from GitHub Actions secrets.
env:
  CLOUD_PROVIDER: ${{ secrets.CLOUD_PROVIDER }}
  CLOUD_REGION: ${{ secrets.CLOUD_REGION }}
  FLINK_API_KEY: ${{ secrets.FLINK_API_KEY }}
  FLINK_API_SECRET: ${{ secrets.FLINK_API_SECRET }}
  ORG_ID: ${{ secrets.ORG_ID }}
  ENV_ID: ${{ secrets.ENV_ID }}
  COMPUTE_POOL_ID: ${{ secrets.COMPUTE_POOL_ID }}
  TARGET_CATALOG: ${{ secrets.TARGET_CATALOG }}
  TARGET_DATABASE: ${{ secrets.TARGET_DATABASE }}

jobs:
  test-and-deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v6

      - uses: actions/setup-java@v5
        with:
          distribution: 'temurin'
          java-version: '17'
          cache: maven

      # Verification gate: a failing unit or integration test fails this step and
      # therefore prevents the deploy step below from running.
      - name: Run local unit tests and integration tests against Confluent Cloud
        run: ./mvnw -B --no-transfer-progress verify

      # Runs the program's main() method from the JAR built by the previous step.
      # The folded scalar (>) joins the lines below into a single command line.
      - name: Deploy to Confluent Cloud
        run: >
          java -cp target/flink-table-api-java-examples-1.0.jar
          io.confluent.flink.examples.table.Example_08_IntegrationAndDeployment
77 changes: 77 additions & 0 deletions .github/workflows-examples/manage.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# EXAMPLE WORKFLOW - copy this file to .github/workflows/ in your own repository.
#
# It lives in workflows-examples/ because this examples repository does not manage
# statements in a real Confluent Cloud environment.
#
# Deployment and lifecycle management are separate concerns: removing or changing code
# does not imply that a running statement should be stopped or deleted. This workflow
# makes lifecycle operations explicit. It is triggered manually from the GitHub Actions
# UI and runs Example_12_StatementLifecycle with the chosen action.
#
# Example usage (use the exact statement name as printed by the deploy step; the
# plugin may append a suffix such as '-api' to configured names, a behavior that
# upcoming plugin versions remove):
# - Check a deployment: action=status, statement-name=vendors-per-brand-api
# - Stop a statement: action=stop, statement-name=vendors-per-brand-api
# - Resume after a stop: action=resume, statement-name=vendors-per-brand-api
# - Permanently remove: action=delete, statement-name=vendors-per-brand-api
#
# Upcoming plugin versions add an application name concept that groups all statements
# of a program under a common prefix and allows listing them, so that statements no
# longer need to be addressed one by one.
#
# Required secrets (see the README section "Via Environment Variables"):
# CLOUD_PROVIDER, CLOUD_REGION, FLINK_API_KEY, FLINK_API_SECRET,
# ORG_ID, ENV_ID, COMPUTE_POOL_ID
name: Manage Flink Statements

# Manual trigger only: lifecycle operations are always an explicit operator decision.
on:
  workflow_dispatch:
    inputs:
      action:
        description: 'Action to perform'
        required: true
        type: choice
        options:
          - status
          - stop
          - resume
          - delete
      statement-name:
        description: 'Full statement name as printed by the deploy step (e.g. vendors-per-brand-api)'
        required: true
        type: string

# Least privilege: the job only needs to read the repository contents. Without this
# block the GITHUB_TOKEN receives the repository's default permissions, which may
# include write access, in a job that also holds Confluent Cloud credentials.
permissions:
  contents: read

# Connection settings, mapped from GitHub Actions secrets.
env:
  CLOUD_PROVIDER: ${{ secrets.CLOUD_PROVIDER }}
  CLOUD_REGION: ${{ secrets.CLOUD_REGION }}
  FLINK_API_KEY: ${{ secrets.FLINK_API_KEY }}
  FLINK_API_SECRET: ${{ secrets.FLINK_API_SECRET }}
  ORG_ID: ${{ secrets.ORG_ID }}
  ENV_ID: ${{ secrets.ENV_ID }}
  COMPUTE_POOL_ID: ${{ secrets.COMPUTE_POOL_ID }}

jobs:
  manage:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v6

      - uses: actions/setup-java@v5
        with:
          distribution: 'temurin'
          java-version: '17'
          cache: maven

      # Tests are skipped: this workflow only needs the JAR to run the lifecycle tool.
      - name: Build
        run: ./mvnw -B --no-transfer-progress -DskipTests package

      - name: Run lifecycle action
        # Inputs are passed via env and quoted instead of interpolated into the
        # script, so that free-text input cannot inject shell commands.
        env:
          ACTION: ${{ inputs.action }}
          STATEMENT_NAME: ${{ inputs.statement-name }}
        # The folded scalar (>) joins the lines below into a single command line.
        run: >
          java -cp target/flink-table-api-java-examples-1.0.jar
          io.confluent.flink.examples.table.Example_12_StatementLifecycle
          "$ACTION" "$STATEMENT_NAME"
27 changes: 27 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Continuous integration for this repository.
#
# Runs entirely without Confluent Cloud connectivity: code format check (Spotless),
# compilation, local unit tests on the open source Flink engine, and the fat JAR build.
# Integration tests (*IT) require Confluent Cloud credentials and would fail without
# them, so this workflow skips them explicitly with -DskipITs.
name: CI

# Runs for every pull request and for every push to master.
on:
  pull_request:
  push:
    branches: [master]

# Least privilege: the build only needs to read the repository contents. Without
# this block the GITHUB_TOKEN receives the repository's default permissions, which
# may include write access, while building pull-request code.
permissions:
  contents: read

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v6

      - uses: actions/setup-java@v5
        with:
          distribution: 'temurin'
          java-version: '17'
          cache: maven

      # -DskipITs skips the *IT integration tests, which need Confluent Cloud
      # credentials that this workflow does not provide.
      - name: Build and test
        run: ./mvnw -B --no-transfer-progress verify -DskipITs
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ target/
.java-version
*.DS_Store
cloud.properties
dependency-reduced-pom.xml
dependency-reduced-pom.xml
*.env
105 changes: 105 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,20 @@ public static void main(String[] args) {
// pipeline.execute().await();
```

## Developer Journey

The examples in this repository follow the journey of taking a table program from a first
experiment all the way to a production deployment:

| Stage | What you do | Where to look |
|-------|-------------|---------------|
| Get started | Configure a connection to Confluent Cloud and run a first program | `Example_00` - `Example_02`, [Getting Started](#getting-started) |
| Build | Transform tables, build pipelines, work with data types, UDFs, and structured objects | `Example_03` - `Example_07`, `Example_09` - `Example_11`, `TableProgramTemplate` |
| Test locally | Run unit tests on mock data, without Confluent Cloud connectivity | `src/test/java/`, [Testing Table Programs](#testing-table-programs) |
| Test on Confluent Cloud | Run integration tests against the real service | `Example_08_IntegrationAndDeploymentIT`, [Testing Table Programs](#testing-table-programs) |
| Deploy | Submit statements with deterministic names from a CI/CD pipeline | `Example_08_IntegrationAndDeployment`, [CI/CD with GitHub Actions](#cicd-with-github-actions) |
| Operate | Check status, stop, resume, and delete deployed statements | `Example_12_StatementLifecycle`, `.github/workflows-examples/manage.yml` |

## Getting Started

### Prerequisites
Expand Down Expand Up @@ -456,6 +470,97 @@ ConfluentSettings settings3 = ConfluentSettings.newBuilder()
.build();
```

## Testing Table Programs

Table programs can be tested in three tiers, from fastest feedback to highest fidelity:

1. **Unit tests on plain logic.** UDFs and other business logic are plain Java classes and can be
tested with JUnit alone: no Flink engine, no Confluent Cloud connectivity, and no artifact
upload required. See `Example_09_FunctionsTest`.
2. **Local pipeline tests on the open source Flink engine.** Pipeline logic that is structured as
a function from input `Table`s to an output `Table` (see `VendorsPerBrand` in
`Example_08_IntegrationAndDeployment`) can be executed locally with mock data from
`fromValues()`, without Confluent Cloud connectivity. See
`Example_08_IntegrationAndDeploymentTest` and run with `./mvnw test`.
3. **Integration tests against Confluent Cloud.** The same pipeline logic runs on the real
service with the exact Confluent semantics, on a Kafka-backed table that is made bounded with
dynamic options. See `Example_08_IntegrationAndDeploymentIT` and run with `./mvnw verify`.
These tests require the connection environment variables (see
[Via Environment Variables](#via-environment-variables)) plus `TARGET_CATALOG` (the name of
your Confluent Cloud environment) and `TARGET_DATABASE` (the name of a Kafka cluster with
write access), and fail fast when any are missing, so a CI pipeline cannot silently skip its
verification step and still report success. To build without Confluent Cloud credentials on
purpose, skip them explicitly: `./mvnw verify -DskipITs`.

### How local testing works

The Confluent plugin executes all statements on Confluent Cloud; it does not contain a local
engine. Local tests therefore run on the open source Flink engine (planner, runtime, and an
embedded mini-cluster), which this project adds as test-scoped dependencies.

The plugin and the open source planner cannot share a runtime classpath: both register their
engine factories under the identifier `default`, and `TableEnvironment.create(...)` fails with
`Multiple factories for identifier 'default'` if both are present. This project resolves the
conflict with classpath exclusions in the `pom.xml`:

- `./mvnw test` (surefire) excludes the Confluent plugin, so unit tests run on the open source
engine.
- `./mvnw verify` (failsafe, test classes named `*IT`) excludes the open source planner, so
integration tests run against Confluent Cloud.

Keep pipeline logic free of `io.confluent.flink.plugin` imports so that unit tests can execute it
locally.

NOTE: IDEs ignore these classpath exclusions, so run the tests via `./mvnw test` and
`./mvnw verify` instead of the IDE's test runner. To use the IDE's test runner anyway, replicate
the exclusion in the test's run configuration (IntelliJ IDEA: Modify options -> Modify classpath ->
Exclude): exclude the `confluent-flink-table-api-java-plugin` JAR for unit tests, or the
`flink-table-planner-loader` JAR for integration tests. For production projects, the cleaner
structure is a multi-module build: one module contains the pipeline logic with only
`flink-table-api-java` and the open source test dependencies, and another module adds the
Confluent plugin and the deployment entrypoints.

### Local testing limitations

The local engine is not identical to Confluent Cloud:

- The `$rowtime` system column and other Confluent system columns do not exist locally.
- There is no local catalog mirroring your Confluent Cloud schemas. Mock tables are declared
manually with `fromValues()` and must be kept in sync with the real schemas.
- Confluent-specific SQL syntax (such as `DISTRIBUTED INTO ... BUCKETS`) and Confluent-provided
functions are not available.

Local tests give fast feedback on transformation logic; integration tests against Confluent Cloud
remain the source of truth.

## CI/CD with GitHub Actions

The repository contains workflows that show how a table program moves through a CI/CD pipeline:

- `.github/workflows/ci.yml` runs in this repository on every pull request and on every push to
`master`: code format check, compilation, local unit tests, and the fat JAR build. It requires
no Confluent Cloud credentials.
- `.github/workflows-examples/deploy.yml` is a template for your own repository: it runs the
integration tests against Confluent Cloud and then deploys the program by running its `main()`
method. Because the statement name is deterministic (see
`Example_08_IntegrationAndDeployment`), re-running the workflow with unchanged code is
idempotent.
- `.github/workflows-examples/manage.yml` is a template for explicit lifecycle operations:
checking status, stopping, resuming, or deleting a deployed statement by name (see
`Example_12_StatementLifecycle`). Deployment and lifecycle management are separate concerns;
removing code does not imply that a running statement should be stopped or deleted.

The workflows authenticate via the environment variables described in
[Via Environment Variables](#via-environment-variables), mapped from GitHub Actions secrets. In
addition, the integration tests and the deploy entrypoint read `TARGET_CATALOG` (the name of the
Confluent Cloud environment) and `TARGET_DATABASE` (the name of the Kafka cluster) to determine
where tables may be created. For staging-to-production promotion, run the same deploy job against
different GitHub environments so that each environment provides its own secrets and protection
rules.

Upcoming plugin versions simplify these workflows further with application name prefixes,
an on-conflict replace option for redeployments, and listing all statements of an application.

## Documentation for Confluent Utilities

### Confluent Tools
Expand Down
76 changes: 76 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.26.0</log4j.version>
<spotless.version>3.6.0</spotless.version>
<junit.version>5.12.2</junit.version>
<assertj.version>3.27.3</assertj.version>
<surefire.version>3.5.4</surefire.version>
</properties>

<repositories>
Expand Down Expand Up @@ -53,6 +56,40 @@
<version>${confluent-plugin.version}</version>
</dependency>

<!-- Open source Flink engine, test scope only. Unit tests use it to execute pipeline
     logic on mock data, with no Confluent Cloud connectivity. -->

<!-- Open source planner. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-loader</artifactId>
    <version>${flink.version}</version>
    <scope>test</scope>
</dependency>

<!-- Open source table runtime. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime</artifactId>
    <version>${flink.version}</version>
    <scope>test</scope>
</dependency>

<!-- Brings in the embedded mini-cluster that runs the local jobs. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>test</scope>
</dependency>
<!-- JUnit Jupiter, used by the unit tests and the integration tests. -->
<dependency>
    <groupId>org.junit.jupiter</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>${junit.version}</version>
    <scope>test</scope>
</dependency>

<!-- AssertJ assertions for test code. -->
<dependency>
    <groupId>org.assertj</groupId>
    <artifactId>assertj-core</artifactId>
    <version>${assertj.version}</version>
    <scope>test</scope>
</dependency>

<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
Expand Down Expand Up @@ -119,6 +156,45 @@
</executions>
</plugin>

<!-- Surefire runs the unit tests (mvn test) locally on the OSS Flink engine.
     The Confluent plugin is removed from the unit test classpath because it and the
     OSS planner both register Executor/Planner factories under the identifier
     'default' and therefore cannot coexist at runtime.
     NOTE: IDEs do not apply this exclusion; run the tests with ./mvnw test. -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-surefire-plugin</artifactId>
    <version>${surefire.version}</version>
    <configuration>
        <classpathDependencyExcludes>
            <classpathDependencyExclude>io.confluent.flink:confluent-flink-table-api-java-plugin</classpathDependencyExclude>
        </classpathDependencyExcludes>
    </configuration>
</plugin>

<!-- Failsafe runs the integration tests (mvn verify, classes named *IT) against
     Confluent Cloud through the Confluent plugin. Here the OSS planner is the one
     removed from the classpath, mirroring the unit test setup.
     The tests fail fast when no Confluent Cloud credentials are configured; pass
     -DskipITs to skip them explicitly. -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-failsafe-plugin</artifactId>
    <!-- Versioned through the same property as the Surefire plugin. -->
    <version>${surefire.version}</version>
    <configuration>
        <classpathDependencyExcludes>
            <classpathDependencyExclude>org.apache.flink:flink-table-planner-loader</classpathDependencyExclude>
        </classpathDependencyExcludes>
    </configuration>
    <executions>
        <execution>
            <goals>
                <goal>integration-test</goal>
                <goal>verify</goal>
            </goals>
        </execution>
    </executions>
</plugin>

<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
Expand Down
Loading