69 changes: 67 additions & 2 deletions README.md
@@ -1,5 +1,70 @@
# Estuary Examples

This repository is a collection of example projects utilizing Estuary. See subdirectories for specific projects and instructions.
A collection of hands-on, runnable examples for building real-time data pipelines with [Estuary](https://estuary.dev). Each project is self-contained and covers a production-grade pattern: change data capture (CDC) from databases like PostgreSQL, MongoDB, Oracle, and SQL Server; streaming ETL and materializations into warehouses and lakehouses; SQL, TypeScript, and Python derivations; and real-time Retrieval-Augmented Generation (RAG) and AI pipelines. Clone any folder and stream live data in minutes.

For more, check out the Estuary [blog](https://estuary.dev/blog/) and [documentation](https://docs.estuary.dev/).
## What is Estuary?

Estuary is a real-time data integration platform. It ingests change data capture (CDC) and event data from your databases, SaaS apps, and streaming sources, then materializes them into warehouses, lakehouses, vector databases, and analytics tools with millisecond latency. Learn more at [estuary.dev](https://estuary.dev) and read the [documentation](https://docs.estuary.dev).

## Database CDC Captures

| Example | Description |
| --- | --- |
| [postgres-simple-capture](./postgres-simple-capture) | Minimal, self-contained PostgreSQL CDC demo using Docker and an ngrok tunnel to stream row changes into an Estuary collection. |
| [postgres-cloudsql-simple-capture](./postgres-cloudsql-simple-capture) | Real-time PostgreSQL CDC pipeline targeting Google Cloud SQL for PostgreSQL with the Cloud SQL Python Connector. |
| [oracle-capture](./oracle-capture) | Oracle CDC capture from a free, local Oracle Database 23.6 in Docker using LogMiner-based logical replication. |
| [sqlserver-cdc-capture](./sqlserver-cdc-capture) | Self-contained SQL Server 2022 environment with Change Data Capture enabled and a continuous insert/update/delete data generator. |
| [kafka-capture](./kafka-capture) | Capture real-time IoT topics from an Amazon MSK (Apache Kafka) cluster into Estuary using AWS IAM authentication. |
| [estuary-demo-movies](./estuary-demo-movies) | Seed a `movies` table in any ANSI-SQL database as a ready-to-capture source for an Estuary capture. |
| [shipments-datagen](./shipments-datagen) | Dockerized PostgreSQL data generator that continuously mutates realistic shipments data, pre-wired for Estuary CDC. |
| [postgres-measure-wal-throughput](./postgres-measure-wal-throughput) | Measure PostgreSQL WAL throughput to size and forecast a CDC pipeline's change-event volume before you build it. |

## Materializations & Destinations

| Example | Description |
| --- | --- |
| [estuary-motherduck-demo-2025](./estuary-motherduck-demo-2025) | Stream PostgreSQL CDC into MotherDuck in real time, keeping analytical tables up to date with low latency. |
| [estuary-motherduck-orders](./estuary-motherduck-orders) | Stream a live pet-store order feed from PostgreSQL into MotherDuck (serverless DuckDB) via CDC. |
| [postgres-cdc-bigquery-dbt](./postgres-cdc-bigquery-dbt) | End-to-end ELT pipeline streaming PostgreSQL CDC into Google BigQuery, then modeling it with dbt. |
| [postgresql-cdc-databricks-fraud-detection](./postgresql-cdc-databricks-fraud-detection) | Real-time fraud detection pipeline streaming PostgreSQL CDC into Databricks for SQL-based lakehouse analysis. |
| [singlestore-webinar-2025](./singlestore-webinar-2025) | Stream PostgreSQL CDC into SingleStore in real time for low-latency analytics (Estuary x SingleStore webinar demo). |
| [mongodb-tinybird-clickstream](./mongodb-tinybird-clickstream) | Capture a live e-commerce clickstream from MongoDB Atlas and materialize it into Tinybird. |
| [sqlserver-cdc-materialize](./sqlserver-cdc-materialize) | Stream SQL Server CDC into Materialize via the Dekaf Kafka-compatible API to power an incrementally maintained view. |
| [pyiceberg-aws-glue](./pyiceberg-aws-glue) | Query an Apache Iceberg table that Estuary materialized to S3 using PyIceberg and the AWS Glue Data Catalog. |

## Derivations & Transformations

| Example | Description |
| --- | --- |
| [derivations-ad-performance](./derivations-ad-performance) | Real-time ad performance analytics joining impression and click streams with a stateful TypeScript derivation. |
| [derivations-sql-full-outer-join](./derivations-sql-full-outer-join) | Implement a full outer join across two collections with a SQLite-backed Estuary SQL derivation. |
| [python-derivations](./python-derivations) | Four Python derivation patterns: stateless transforms, stateful aggregation, streaming joins, and ML feature engineering. |

## Real-Time RAG & AI

| Example | Description |
| --- | --- |
| [google-sheets-pinecone-rag](./google-sheets-pinecone-rag) | End-to-end real-time RAG: stream Google Sheets rows to Pinecone embeddings and serve a Streamlit chatbot. |
| [mongodb-pinecone-rag](./mongodb-pinecone-rag) | Stream MongoDB product reviews to a Pinecone vector index for a real-time RAG Streamlit chat app. |
| [snowflake-cdc-pinecone-rag](./snowflake-cdc-pinecone-rag) | Stream Snowflake CDC into Pinecone vectors in real time, queried by a Streamlit RAG chatbot. |

## Streaming, Lakehouse & Stream Processing

| Example | Description |
| --- | --- |
| [dekaf-kcat](./dekaf-kcat) | Consume a live Estuary collection from the CLI with kcat over Estuary's Kafka-compatible Dekaf API. |
| [dekaf-python](./dekaf-python) | Consume a real-time Estuary collection in Python with `confluent-kafka`, Dekaf, and an Avro schema registry. |
| [estuary-bytewax](./estuary-bytewax) | Stream MongoDB CDC into a Bytewax Python dataflow via Dekaf to compute tumbling-window metrics. |
| [streaming-lakehouse-iceberg-duckdb](./streaming-lakehouse-iceberg-duckdb) | Build a streaming lakehouse: PostgreSQL CDC into Apache Iceberg on S3 (AWS Glue), queried with PyIceberg/DuckDB. |
| [shipments_eta](./shipments_eta) | Real-time freight ETA tracking from MongoDB CDC to Tinybird/ClickHouse with a Next.js dashboard via Dekaf. |

## Demos, Workshops & Webinars

| Example | Description |
| --- | --- |
| [hands-on-lab-postgres-motherduck](./hands-on-lab-postgres-motherduck) | Guided hands-on lab: PostgreSQL CDC to MotherDuck with soft delete, hard delete, and SCD2 materialization patterns. |
| [estuary-coaelsce-demo-2025](./estuary-coaelsce-demo-2025) | Self-contained PostgreSQL CDC fraud-detection demo with anomaly injection (Estuary x Coalesce 2025). |

---

Built with [Estuary](https://estuary.dev). Read the [blog](https://estuary.dev/blog/), explore the [documentation](https://docs.estuary.dev), or get started in the [dashboard](https://dashboard.estuary.dev).
119 changes: 119 additions & 0 deletions dekaf-kcat/README.md
@@ -0,0 +1,119 @@
# Consume an Estuary Collection from the CLI with kcat (Kafka) via Dekaf

Read a live Estuary collection straight from your terminal using [kcat](https://github.com/edenhill/kcat) (the Kafka CLI, formerly `kafkacat`) over Estuary's Kafka-compatible **Dekaf** API. The included `consume.sh` connects to a Dekaf bootstrap endpoint over `SASL_SSL` / `PLAIN` and tails the public demo `demo/wikipedia/recentchange-sampled` collection, a real-time stream of Wikipedia edit events, with no Estuary-specific tooling required.

Because Dekaf speaks the Kafka wire protocol, any existing Kafka consumer (kcat, the Java client, `confluent-kafka`, Spark, Flink, ksqlDB, etc.) can read an Estuary collection as if it were a Kafka topic.

## How it works

Estuary [captures](https://docs.estuary.dev/concepts/captures/) data from sources into [collections](https://docs.estuary.dev/concepts/collections/) — schematized JSON streams backed by cloud storage. **Dekaf** exposes those collections through a Kafka-compatible endpoint, so a Kafka consumer can subscribe to a collection as a topic.

```
Source ──capture──▶ Estuary collection ──Dekaf (Kafka API)──▶ kcat (this example)
(demo/wikipedia/recentchange-sampled)
```

- **Bootstrap server**: the Dekaf endpoint, addressed like a Kafka broker.
- **Topic**: the Estuary collection name.
- **Auth**: SASL `PLAIN` over TLS. Username is the Dekaf task name (or `{}` for public demo topics); password is an Estuary access token (empty for public demo topics).
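
The three settings above map one-to-one onto kcat flags. As a rough sketch (the helper name `build_kcat_args` is hypothetical and not part of this example), a small Python function can assemble the same argument list, using only the flags that appear in `consume.sh`:

```python
import shlex
from typing import List


def build_kcat_args(bootstrap: str, topic: str,
                    username: str = "{}", password: str = "") -> List[str]:
    """Assemble a kcat consumer command line for a Dekaf endpoint.

    Hypothetical helper for illustration. The defaults mirror the public
    demo credentials described above ('{}' username, empty password).
    """
    return [
        "kcat", "-C",                        # consumer mode
        "-b", bootstrap,                     # Dekaf bootstrap server
        "-t", topic,                         # collection name used as the topic
        "-X", "security.protocol=sasl_ssl",
        "-X", "sasl.mechanisms=PLAIN",
        "-X", f"sasl.username={username}",
        "-X", f"sasl.password={password}",
    ]


if __name__ == "__main__":
    args = build_kcat_args("dekaf.estuary-data.com:9092",
                           "demo/wikipedia/recentchange-sampled")
    # shlex.join quotes each argument so the line can be pasted into a shell.
    print(shlex.join(args))
```

Passing the list to `subprocess.run(args)` would launch kcat directly, which avoids shell quoting issues with the `{}` username.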

## What's included

- `consume.sh` — a single `kcat` consumer invocation that connects to Dekaf and prints messages from the public Wikipedia recent-changes demo collection.

## Prerequisites

- **kcat** installed and on your `PATH`:
  - macOS: `brew install kcat`
  - Debian/Ubuntu: `apt-get install kcat`
  - Or see the [kcat install docs](https://github.com/edenhill/kcat#install).
- Nothing else for the public demo topic. To read **your own** collections you need a free [Estuary account](https://dashboard.estuary.dev) and an Estuary access token.

## Running it

The script as committed:

```bash
kcat -C \
-b dekaf.estuary-data.com:9092 \
-t demo/wikipedia/recentchange-sampled \
-X security.protocol=sasl_ssl \
-X sasl.mechanisms=PLAIN \
-X sasl.username='{}' \
-X sasl.password=''
```

Run it:

```bash
chmod +x consume.sh
./consume.sh
```

`kcat -C` runs in **consumer** mode and streams the Wikipedia `recentchange` events to stdout. Press `Ctrl-C` to stop.

Flags explained:

| Flag | Value | Meaning |
| --- | --- | --- |
| `-C` | — | Consumer mode |
| `-b` | `dekaf.estuary-data.com:9092` | Dekaf bootstrap server (Kafka broker) |
| `-t` | `demo/wikipedia/recentchange-sampled` | Collection name, used as the Kafka topic |
| `-X security.protocol` | `sasl_ssl` | Encrypted connection with SASL auth |
| `-X sasl.mechanisms` | `PLAIN` | SASL PLAIN mechanism |
| `-X sasl.username` | `{}` | Public demo placeholder (use your Dekaf task name for private collections) |
| `-X sasl.password` | (empty) | Public demo placeholder (use your Estuary access token for private collections) |

> **Note on the bootstrap host:** the script uses Estuary's production Dekaf endpoint, `dekaf.estuary-data.com:9092` (with the schema registry at `https://dekaf.estuary-data.com`). A legacy host, `dekaf.fly.dev:9092`, appeared in older versions of this example but no longer resolves — use `dekaf.estuary-data.com:9092`. See the [Dekaf reading guide](https://docs.estuary.dev/guides/dekaf_reading_collections_from_kafka/) for the authoritative endpoint and connection settings.

## Reading your own collections

To consume a private collection instead of the public demo:

1. Sign in to the [Estuary dashboard](https://dashboard.estuary.dev) and create a [Dekaf materialization](https://docs.estuary.dev/guides/dekaf_reading_collections_from_kafka/) (or use an existing one) to expose your collection over the Kafka API.
2. Generate an Estuary access token (refresh token) from the dashboard.
3. Run kcat with the production endpoint, your Dekaf task name as the username, and the token as the password:

```bash
export DEKAF_TASK_NAME="your-org/your-dekaf-task"
export DEKAF_ACCESS_TOKEN="your-estuary-access-token"

kcat -C \
-b dekaf.estuary-data.com:9092 \
-t your-collection-name \
-X security.protocol=sasl_ssl \
-X sasl.mechanisms=PLAIN \
-X sasl.username="$DEKAF_TASK_NAME" \
-X sasl.password="$DEKAF_ACCESS_TOKEN"
```

The Kafka topic name is the collection name as exposed by your Dekaf task.

## Verify

If the connection is working, you'll see a continuous stream of JSON Wikipedia edit events printed to your terminal. To consume from the beginning of the available data instead of the live tail, add `-o beginning`:

```bash
kcat -C -o beginning \
-b dekaf.estuary-data.com:9092 \
-t demo/wikipedia/recentchange-sampled \
-X security.protocol=sasl_ssl \
-X sasl.mechanisms=PLAIN \
-X sasl.username='{}' \
-X sasl.password=''
```

To print only a fixed number of messages and exit, add `-c <N>` (for example `-c 10`).
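
To post-process the stream rather than just watch it, you can pipe kcat into a script. The sketch below rests on two assumptions: that kcat prints one JSON document per line (its default newline delimiter), and that events carry the `id`, `meta.domain`, `timestamp`, and `title` fields that the `dekaf-python` example prints. The function name `summarize` is hypothetical.

```python
import json
from typing import Any, Iterable, Iterator, Tuple


def summarize(lines: Iterable[str]) -> Iterator[Tuple[Any, Any, Any, Any]]:
    """Yield (id, domain, timestamp, title) for each JSON line.

    Blank lines and lines that are not JSON objects are skipped so that
    one malformed line does not stop the stream.
    """
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue
        if not isinstance(event, dict):
            continue
        meta = event.get("meta")
        domain = meta.get("domain") if isinstance(meta, dict) else None
        yield (event.get("id"), domain, event.get("timestamp"), event.get("title"))


if __name__ == "__main__":
    # Made-up sample line standing in for kcat output.
    sample = ['{"id": 1, "meta": {"domain": "en.wikipedia.org"}, '
              '"timestamp": 1700000000, "title": "Example"}']
    for row in summarize(sample):
        print(row)
```

To run it against live data, replace `sample` with `sys.stdin` (after `import sys`) and pipe the consumer into the script, for example `./consume.sh | python summarize.py`, where `summarize.py` is a file name chosen for this illustration.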

## Next steps

- Consume the same collection from a Python application with the Avro schema registry: see the [`dekaf-python`](../dekaf-python) example in this repository. Note that the Python example subscribes to the topic as `recentchange-sampled` while this kcat example uses the fully-qualified `demo/wikipedia/recentchange-sampled`; both refer to the same demo collection, so use whichever topic string the example you are running already specifies.
- Wire any other Kafka client (Spark, Flink, ksqlDB, the Java client) to an Estuary collection through Dekaf.
- Build your own pipeline: create a [capture](https://dashboard.estuary.dev/captures), land it in a collection, and expose it over Dekaf.

## References

- Dekaf — reading collections from Kafka: https://docs.estuary.dev/guides/dekaf_reading_collections_from_kafka/
- Estuary documentation: https://docs.estuary.dev
- Estuary dashboard: https://dashboard.estuary.dev
- kcat (Kafka CLI): https://github.com/edenhill/kcat
2 changes: 1 addition & 1 deletion dekaf-kcat/consume.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
kcat -C \
- -b dekaf.fly.dev:9092 \
+ -b dekaf.estuary-data.com:9092 \
-t demo/wikipedia/recentchange-sampled \
-X security.protocol=sasl_ssl \
-X sasl.mechanisms=PLAIN \
121 changes: 121 additions & 0 deletions dekaf-python/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# Consume an Estuary Collection in Python with Kafka and Avro via Estuary Dekaf

Read a real-time Estuary collection from Python using the `confluent-kafka` client, Estuary's **Dekaf** Kafka-compatible API, and an Avro schema registry. This example consumes the `recentchange-sampled` collection (Wikimedia RecentChange events) over SASL_SSL, deserializes each record against the registry-managed Avro schema, and prints it. There is no Kafka cluster, Debezium, or schema registry to operate: Dekaf exposes any Estuary collection as a Kafka topic.

## How it works

Estuary captures source data into **collections** (a schematized, real-time data lake of JSON in cloud storage). **Dekaf** is Estuary's Kafka-compatible streaming API: it lets any Kafka consumer read a collection as if it were a Kafka topic, complete with a Confluent-style schema registry that serves Avro schemas for each collection.

```
Estuary collection (recentchange-sampled)
Dekaf ── Kafka-compatible broker + Avro schema registry
broker: dekaf.estuary-data.com:9092 (SASL_SSL / PLAIN)
schema registry: https://dekaf.estuary-data.com
main.py ── confluent-kafka Consumer + AvroDeserializer ──► prints records
```

`main.py`:
1. Connects a `confluent_kafka.Consumer` to `dekaf.estuary-data.com:9092` using `security.protocol=SASL_SSL`, `sasl.mechanism=PLAIN`, username = the Dekaf task name, password = an Estuary access token.
2. Connects a `SchemaRegistryClient` to `https://dekaf.estuary-data.com` with the same credentials (`basic.auth.user.info`).
3. Fetches the latest Avro schema for the `recentchange-sampled-value` subject and builds an `AvroDeserializer`.
4. Subscribes to the `recentchange-sampled` topic and polls in a loop, deserializing each message value and printing `id`, `meta.domain`, `timestamp`, and `title`.
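
The four steps above can be sketched roughly as follows. This is an illustration written from the description, not a copy of the repository's `main.py`: the function names (`consumer_config`, `registry_config`, `consume`) are hypothetical, and the `group.id` and `auto.offset.reset` values are taken from the configuration reference further down.

```python
from typing import Dict

BOOTSTRAP = "dekaf.estuary-data.com:9092"
REGISTRY_URL = "https://dekaf.estuary-data.com"


def consumer_config(task_name: str, token: str, group_id: str = "my-group") -> Dict[str, str]:
    """Step 1: Kafka consumer settings for Dekaf."""
    return {
        "bootstrap.servers": BOOTSTRAP,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.username": task_name,
        "sasl.password": token,
        "group.id": group_id,
        "auto.offset.reset": "latest",
    }


def registry_config(task_name: str, token: str) -> Dict[str, str]:
    """Step 2: schema registry settings, same credentials as 'user:password'."""
    return {"url": REGISTRY_URL, "basic.auth.user.info": f"{task_name}:{token}"}


def consume(topic: str, task_name: str, token: str) -> None:
    """Steps 3 and 4: fetch the value schema, subscribe, and poll until Ctrl+C."""
    # Imported lazily so the config helpers work without confluent-kafka installed.
    from confluent_kafka import Consumer
    from confluent_kafka.schema_registry import SchemaRegistryClient
    from confluent_kafka.schema_registry.avro import AvroDeserializer
    from confluent_kafka.serialization import MessageField, SerializationContext

    registry = SchemaRegistryClient(registry_config(task_name, token))
    schema_str = registry.get_latest_version(f"{topic}-value").schema.schema_str
    deserialize = AvroDeserializer(schema_registry_client=registry, schema_str=schema_str)

    consumer = Consumer(consumer_config(task_name, token))
    consumer.subscribe([topic])
    try:
        while True:
            msg = consumer.poll(1.0)  # wait up to one second for a message
            if msg is None or msg.error():
                continue  # this sketch skips empty polls and error events
            record = deserialize(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
            if record is None:
                continue  # message had no value to decode
            print((record["id"], record["meta"]["domain"], record["timestamp"], record["title"]))
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()
```

Keeping the two config builders separate from the poll loop makes the connection settings easy to inspect or unit test without a network connection.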

## What's included

- **`main.py`** — the consumer. Holds the Dekaf endpoint, schema registry URL, target topic (`recentchange-sampled`), Kafka/SASL config, Avro deserialization, and the poll loop.
- **`requirements.txt`** — single dependency: `confluent-kafka[avro,schemaregistry,rules]` (the Kafka client plus Avro + schema-registry extras). Note `main.py` also uses `python-dotenv` (`load_dotenv`) to read credentials from a `.env` file — install it as well (see below).

## Prerequisites

- **Python 3.8+** and `pip`.
- A **free Estuary account** — sign up at [https://dashboard.estuary.dev](https://dashboard.estuary.dev).
- The `recentchange-sampled` **collection available in your Estuary account**. This is the Wikimedia RecentChange sample stream. If you don't already have it, create a capture for the Wikimedia / public demo source (or any collection of your own) and update the `topic` variable in `main.py` to match the collection name you want to read.
- A **Dekaf access token** (an Estuary refresh/access token) to authenticate the consumer and schema registry.

> Dekaf authenticates with `sasl.mechanism=PLAIN` where the username is the Dekaf **task name** and the password is an Estuary **access token**. Public demo topics can be read with username `{}` and an empty password, but this example is wired for an authenticated collection via `DEKAF_TASK_NAME` and `DEKAF_ACCESS_TOKEN`.

## Setup

Install the dependencies:

```bash
pip install -r requirements.txt
pip install python-dotenv
```

Create a `.env` file in this directory with your Dekaf credentials:

```bash
# .env
DEKAF_TASK_NAME=your-dekaf-task-name
DEKAF_ACCESS_TOKEN=your-estuary-access-token
```

These map directly to the consumer's `sasl.username` / `sasl.password` and the schema registry's `basic.auth.user.info` in `main.py`.
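
A missing or empty variable otherwise tends to surface later as an opaque authentication failure, so it can help to validate the credentials up front. The following is a minimal sketch, assuming the two variable names above; `load_credentials` is a hypothetical helper, not a function from `main.py`.

```python
import os
from typing import Mapping, Tuple


def load_credentials(env: Mapping[str, str] = os.environ) -> Tuple[str, str]:
    """Return (task_name, access_token), raising if either is unset or empty."""
    required = ("DEKAF_TASK_NAME", "DEKAF_ACCESS_TOKEN")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return env["DEKAF_TASK_NAME"], env["DEKAF_ACCESS_TOKEN"]


if __name__ == "__main__":
    try:
        from dotenv import load_dotenv  # provided by python-dotenv
        load_dotenv()  # loads .env values into os.environ; existing variables win
    except ImportError:
        pass  # fall back to variables already exported in the shell
    try:
        task_name, _token = load_credentials()
        print(f"Using Dekaf task: {task_name}")
    except RuntimeError as exc:
        print(exc)
```

The returned pair can be used for `sasl.username` / `sasl.password`, and joined as `f"{task_name}:{token}"` for `basic.auth.user.info`.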

### Getting your Dekaf credentials

1. In the [Estuary dashboard](https://dashboard.estuary.dev), open the collection you want to consume (here, `recentchange-sampled`).
2. Create or open a **Dekaf** materialization/task for it — its name is your `DEKAF_TASK_NAME`.
3. Generate an **access token** (or refresh token) from your account settings — this is your `DEKAF_ACCESS_TOKEN`.

See the Dekaf guide for the exact steps: [Reading Estuary collections from Kafka (Dekaf)](https://docs.estuary.dev/guides/dekaf_reading_collections_from_kafka/).

## Running it

```bash
python main.py
```

You should see one line per record as events stream in, for example:

```
('<id>', '<meta.domain>', '<timestamp>', '<title>')
```

Stop with `Ctrl+C` — the script catches `KeyboardInterrupt` and closes the consumer cleanly.

## Configuration reference

All connection settings live at the top of `main.py`:

| Setting | Value | Notes |
| --- | --- | --- |
| `bootstrap.servers` | `dekaf.estuary-data.com` | Dekaf broker (port `9092`) |
| `security.protocol` | `SASL_SSL` | Required by Dekaf |
| `sasl.mechanism` | `PLAIN` | Required by Dekaf |
| `sasl.username` | `DEKAF_TASK_NAME` | Dekaf task name |
| `sasl.password` | `DEKAF_ACCESS_TOKEN` | Estuary access token |
| Schema registry URL | `https://dekaf.estuary-data.com` | Confluent-compatible Avro registry |
| `group.id` | `my-group` | Consumer group |
| `auto.offset.reset` | `latest` | Set to `earliest` to read from the start of the collection |
| `topic` | `recentchange-sampled` | The Estuary collection name |

To consume a different collection, change `topic` to that collection's name; the deserializer automatically resolves the `<topic>-value` subject from the schema registry.

## Verify

- Watch `main.py`'s output — a steady stream of printed records confirms Dekaf is serving the collection and the Avro schema resolved correctly.
- Cross-check against the collection in the [Estuary dashboard](https://dashboard.estuary.dev) (open the collection and view recent documents).
- With [flowctl](https://docs.estuary.dev/concepts/flowctl/) you can read the same collection directly:

```bash
flowctl collections read --collection recentchange-sampled --uncommitted | head
```

## Next steps

- Point any other Kafka client (kcat, Kafka Connect, Flink, Spark, Tinybird, ClickPipes, etc.) at the same Dekaf endpoint — the credentials and schema registry are identical across clients.
- Swap the `print` for your own processing: write to a database, push to another stream, or run real-time analytics.
- Build a full pipeline: capture a source into a collection, optionally transform it with a [derivation](https://docs.estuary.dev/concepts/derivations/), and consume it here.

## References

- Dekaf guide: [Reading Estuary collections from Kafka](https://docs.estuary.dev/guides/dekaf_reading_collections_from_kafka/)
- Estuary docs: [https://docs.estuary.dev](https://docs.estuary.dev)
- Estuary dashboard: [https://dashboard.estuary.dev](https://dashboard.estuary.dev)
- `confluent-kafka` Python client: [https://github.com/confluentinc/confluent-kafka-python](https://github.com/confluentinc/confluent-kafka-python)