
Commit 8a12d14

feat(sfn-s3vectors-rag-refresh-cdk): Add S3 Vectors RAG refresh pattern with CDK
- Add Step Functions state machine for automated document ingestion pipeline
- Create S3 Vectors knowledge base with distributed map for parallel processing
- Implement Lambda functions for embedding generation, validation, and rollback
- Add CDK infrastructure as code for complete stack deployment
- Include comprehensive README with deployment and testing instructions
- Add example pattern configuration and state machine visualization
- Configure TypeScript build setup with tsconfig and package dependencies
- Enable vector embedding via Amazon Bedrock Titan Text Embeddings V2
- Implement validation and automatic rollback on ingestion failure
1 parent 1e52b0c commit 8a12d14

13 files changed

Lines changed: 628 additions & 0 deletions


Lines changed: 135 additions & 0 deletions
# Knowledge base refresh pipeline with AWS Step Functions and Amazon S3 Vectors

This pattern deploys an AWS Step Functions workflow that automates the ingestion of new documents into an Amazon S3 Vectors knowledge base so AI agents always answer from up-to-date information. When new documents land in S3, the workflow fans out via Distributed Map to generate vector embeddings using Amazon Bedrock and store them with `PutVectors` in parallel. After ingestion, `QueryVectors` validates that the new content is searchable, and a Choice state either confirms success or rolls back by deleting the newly added vectors if validation fails.

Learn more about this pattern at Serverless Land Patterns: [https://serverlessland.com/patterns/sfn-s3vectors-rag-refresh-cdk](https://serverlessland.com/patterns/sfn-s3vectors-rag-refresh-cdk)

Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.

## Requirements

* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
* [AWS CLI v2](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) (latest available version) installed and configured
* [Git installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
* [AWS CDK](https://docs.aws.amazon.com/cdk/latest/guide/getting_started.html) (version 2.221.0 or later) installed and configured
* [Node.js 22.x](https://nodejs.org/) installed

## Deployment Instructions

1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:

    ```bash
    git clone https://github.com/aws-samples/serverless-patterns
    ```

1. Change directory to the pattern directory:

    ```bash
    cd serverless-patterns/sfn-s3vectors-rag-refresh-cdk
    ```

1. Install the project dependencies:

    ```bash
    npm install
    ```

1. Install the Lambda dependencies:

    ```bash
    cd lambda && npm install && cd ..
    ```

1. Deploy the CDK stack:

    ```bash
    cdk deploy
    ```

    Note: Deploy to your default AWS region. Please refer to the [AWS capabilities explorer](https://builder.aws.com/build/capabilities/explore) for feature availability in your desired region.

1. Note the outputs from the CDK deployment process. These contain the resource names used for testing.
## How it works

This pattern creates a single stack with the following resources:

1. **S3 Document Bucket** — Stores the source documents to be ingested. Upload files to the `documents/` prefix.

2. **S3 Vectors Bucket & Index** — An S3 Vectors vector bucket with a `knowledge-base` index configured for 1024-dimensional cosine similarity (matching Amazon Titan Text Embeddings V2 output).

3. **Step Functions State Machine** — Orchestrates the full ingestion pipeline:
   - **Distributed Map** fans out over every object under `s3://<bucket>/documents/`, processing up to 40 documents concurrently
   - For each document, the **EmbedAndStore** Lambda reads the file, calls Amazon Bedrock Titan Text Embeddings V2 to generate a 1024-dimensional vector, and writes it to the S3 Vectors index via `PutVectors` with the source file path as metadata
   - The **ValidateIngestion** Lambda collects the vector keys from the successful child results, generates a probe embedding, and calls `QueryVectors` to confirm that at least one newly ingested vector is returned
   - A **Choice** state checks the validation result: on success the workflow completes; on failure the **RollbackVectors** Lambda calls `DeleteVectors` to remove all newly added vectors, then the workflow fails
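The control flow above can be sketched in Amazon States Language. This is an illustrative outline only — the state names, function names, and the `<document-bucket>` placeholder are assumptions for readability, not the exact definitions the stack deploys:

```json
{
  "StartAt": "IngestDocuments",
  "States": {
    "IngestDocuments": {
      "Type": "Map",
      "ItemReader": {
        "Resource": "arn:aws:states:::s3:listObjectsV2",
        "Parameters": { "Bucket": "<document-bucket>", "Prefix": "documents/" }
      },
      "ItemProcessor": {
        "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "STANDARD" },
        "StartAt": "EmbedAndStore",
        "States": {
          "EmbedAndStore": {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Parameters": { "FunctionName": "EmbedAndStoreFunction", "Payload.$": "$" },
            "End": true
          }
        }
      },
      "MaxConcurrency": 40,
      "Next": "ValidateIngestion"
    },
    "ValidateIngestion": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": { "FunctionName": "ValidateIngestionFunction", "Payload.$": "$" },
      "ResultSelector": { "valid.$": "$.Payload.valid", "vectorKeys.$": "$.Payload.vectorKeys" },
      "Next": "IsValid"
    },
    "IsValid": {
      "Type": "Choice",
      "Choices": [
        { "Variable": "$.valid", "BooleanEquals": true, "Next": "IngestionSucceeded" }
      ],
      "Default": "RollbackVectors"
    },
    "RollbackVectors": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": { "FunctionName": "RollbackVectorsFunction", "Payload.$": "$" },
      "Next": "IngestionFailed"
    },
    "IngestionSucceeded": { "Type": "Succeed" },
    "IngestionFailed": { "Type": "Fail", "Error": "ValidationFailed" }
  }
}
```

The key design point is that validation and rollback live in the state machine rather than in the Lambdas, so a failed ingestion leaves the index exactly as it was before the run.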
## Architecture

![State Machine](state-machine.png)

## Testing

After deployment, upload sample documents and start the workflow.

### Upload test documents

```bash
BUCKET=$(aws cloudformation describe-stacks \
  --stack-name RagRefreshStack \
  --query "Stacks[0].Outputs[?OutputKey=='DocumentBucketName'].OutputValue" \
  --output text)

echo "Amazon S3 Vectors is a new vector storage capability." > /tmp/doc1.txt
echo "Step Functions Distributed Map enables parallel processing at scale." > /tmp/doc2.txt

aws s3 cp /tmp/doc1.txt s3://$BUCKET/documents/doc1.txt
aws s3 cp /tmp/doc2.txt s3://$BUCKET/documents/doc2.txt
```

### Start the workflow

```bash
STATE_MACHINE_ARN=$(aws cloudformation describe-stacks \
  --stack-name RagRefreshStack \
  --query "Stacks[0].Outputs[?OutputKey=='StateMachineArn'].OutputValue" \
  --output text)

aws stepfunctions start-execution \
  --state-machine-arn $STATE_MACHINE_ARN
```

### Monitor execution

```bash
aws stepfunctions list-executions \
  --state-machine-arn $STATE_MACHINE_ARN \
  --max-results 1
```

### Expected result

The workflow should complete successfully. In the Step Functions console you'll see:

1. Distributed Map processed both documents in parallel
2. Each document was embedded and stored as a vector
3. Validation confirmed the vectors are queryable
4. The workflow reached the `IngestionSucceeded` state

## Cleanup

1. Delete the stack:

    ```bash
    cdk destroy
    ```

1. Confirm the stack has been deleted:

    ```bash
    aws cloudformation list-stacks --stack-status-filter DELETE_COMPLETE
    ```

----
Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved.

SPDX-License-Identifier: MIT-0
Lines changed: 13 additions & 0 deletions
```typescript
#!/usr/bin/env node
import 'source-map-support/register';
import * as cdk from 'aws-cdk-lib';
import { RagRefreshStack } from '../lib/rag-refresh-stack';

const app = new cdk.App();

new RagRefreshStack(app, 'RagRefreshStack', {
  env: {
    account: process.env.CDK_DEFAULT_ACCOUNT,
    region: process.env.AWS_REGION,
  },
});
```
Lines changed: 8 additions & 0 deletions
```json
{
  "app": "npx ts-node --prefer-ts-exts bin/app.ts",
  "context": {
    "@aws-cdk/aws-lambda:recognizeLayerVersion": true,
    "@aws-cdk/core:checkSecretUsage": true,
    "@aws-cdk/core:target-partitions": ["aws", "aws-cn"]
  }
}
```
Lines changed: 68 additions & 0 deletions
```json
{
  "title": "Knowledge base refresh pipeline with AWS Step Functions & Amazon S3 Vectors",
  "description": "Automate ingestion of new documents into an Amazon S3 Vectors knowledge base using AWS Step Functions Distributed Map with validation",
  "language": "TypeScript",
  "level": "300",
  "framework": "AWS CDK",
  "introBox": {
    "headline": "How it works",
    "text": [
      "When new documents land in an S3 bucket, a Step Functions workflow fans out via Distributed Map to process each document in parallel.",
      "For each document, a Lambda function reads the content, generates vector embeddings using Amazon Bedrock, and stores them with PutVectors in the S3 Vectors vector bucket.",
      "After ingestion completes, a validation step uses QueryVectors to confirm the new content is searchable. A Choice state either confirms success or rolls back by deleting the newly added vectors if validation fails."
    ]
  },
  "gitHub": {
    "template": {
      "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/sfn-s3vectors-rag-refresh-cdk",
      "templateURL": "serverless-patterns/sfn-s3vectors-rag-refresh-cdk",
      "projectFolder": "sfn-s3vectors-rag-refresh-cdk",
      "templateFile": "lib/rag-refresh-stack.ts"
    }
  },
  "resources": {
    "bullets": [
      {
        "text": "Amazon S3 Vectors documentation",
        "link": "https://docs.aws.amazon.com/AmazonS3/latest/userguide/vectors.html"
      },
      {
        "text": "AWS Step Functions Distributed Map",
        "link": "https://docs.aws.amazon.com/step-functions/latest/dg/concepts-asl-use-map-state-distributed.html"
      },
      {
        "text": "Amazon Bedrock Embeddings",
        "link": "https://docs.aws.amazon.com/bedrock/latest/userguide/embeddings.html"
      },
      {
        "text": "AWS CDK Developer Guide",
        "link": "https://docs.aws.amazon.com/cdk/latest/guide/"
      }
    ]
  },
  "deploy": {
    "text": [
      "npm install",
      "cd lambda && npm install && cd ..",
      "cdk deploy"
    ]
  },
  "testing": {
    "text": [
      "See the GitHub repo for detailed testing instructions."
    ]
  },
  "cleanup": {
    "text": [
      "Delete the stack: <code>cdk destroy</code>."
    ]
  },
  "authors": [
    {
      "name": "Ben Freiberg",
      "image": "https://serverlessland.com/assets/images/resources/contributors/ben-freiberg.jpg",
      "bio": "Ben is a Senior Solutions Architect at Amazon Web Services (AWS) based in Frankfurt, Germany.",
      "linkedin": "benfreiberg"
    }
  ]
}
```
Lines changed: 51 additions & 0 deletions
```typescript
import { S3Client, GetObjectCommand } from '@aws-sdk/client-s3';
import { BedrockRuntimeClient, InvokeModelCommand } from '@aws-sdk/client-bedrock-runtime';
import { S3VectorsClient, PutVectorsCommand } from '@aws-sdk/client-s3vectors';

const s3 = new S3Client();
const bedrock = new BedrockRuntimeClient();
const s3vectors = new S3VectorsClient();

const DOCUMENT_BUCKET = process.env.DOCUMENT_BUCKET!;
const VECTOR_BUCKET_NAME = process.env.VECTOR_BUCKET_NAME!;
const VECTOR_INDEX_NAME = process.env.VECTOR_INDEX_NAME!;
const MODEL_ID = 'amazon.titan-embed-text-v2:0';

interface S3ItemEvent {
  Key: string;
}

export async function handler(event: S3ItemEvent) {
  const { Key } = event;

  // 1. Read the document from S3
  const getObj = await s3.send(new GetObjectCommand({
    Bucket: DOCUMENT_BUCKET,
    Key,
  }));
  const text = await getObj.Body!.transformToString('utf-8');

  // 2. Generate an embedding via Bedrock Titan Text Embeddings V2
  const invokeResp = await bedrock.send(new InvokeModelCommand({
    modelId: MODEL_ID,
    contentType: 'application/json',
    accept: 'application/json',
    body: JSON.stringify({ inputText: text, dimensions: 1024, normalize: true }),
  }));
  const embeddingResult = JSON.parse(new TextDecoder().decode(invokeResp.body));
  const embedding: number[] = embeddingResult.embedding;

  // 3. Store the vector with PutVectors
  const vectorKey = Key.replace(/\//g, '_');
  await s3vectors.send(new PutVectorsCommand({
    vectorBucketName: VECTOR_BUCKET_NAME,
    indexName: VECTOR_INDEX_NAME,
    vectors: [{
      key: vectorKey,
      data: { float32: embedding },
      metadata: { source: Key },
    }],
  }));

  return { vectorKey, documentKey: Key };
}
```
Lines changed: 10 additions & 0 deletions
```json
{
  "name": "rag-refresh-lambdas",
  "version": "1.0.0",
  "main": "index.js",
  "dependencies": {
    "@aws-sdk/client-s3": "^3.700.0",
    "@aws-sdk/client-bedrock-runtime": "^3.700.0",
    "@aws-sdk/client-s3vectors": "^3.700.0"
  }
}
```
Lines changed: 27 additions & 0 deletions
```typescript
import { S3VectorsClient, DeleteVectorsCommand } from '@aws-sdk/client-s3vectors';

const s3vectors = new S3VectorsClient();

const VECTOR_BUCKET_NAME = process.env.VECTOR_BUCKET_NAME!;
const VECTOR_INDEX_NAME = process.env.VECTOR_INDEX_NAME!;

interface RollbackEvent {
  vectorKeys: string[];
}

export async function handler(event: RollbackEvent) {
  const { vectorKeys } = event;

  // DeleteVectors accepts up to 500 keys per call
  const BATCH_SIZE = 500;
  for (let i = 0; i < vectorKeys.length; i += BATCH_SIZE) {
    const batch = vectorKeys.slice(i, i + BATCH_SIZE);
    await s3vectors.send(new DeleteVectorsCommand({
      vectorBucketName: VECTOR_BUCKET_NAME,
      indexName: VECTOR_INDEX_NAME,
      keys: batch,
    }));
  }

  return { deleted: vectorKeys.length };
}
```
Lines changed: 17 additions & 0 deletions
```json
{
  "compilerOptions": {
    "target": "ES2022",
    "module": "commonjs",
    "moduleResolution": "node",
    "lib": ["es2022"],
    "outDir": ".",
    "rootDir": ".",
    "strict": true,
    "esModuleInterop": true,
    "skipLibCheck": true,
    "forceConsistentCasingInFileNames": true,
    "resolveJsonModule": true
  },
  "include": ["*.ts"],
  "exclude": ["node_modules"]
}
```
Lines changed: 47 additions & 0 deletions
```typescript
import { BedrockRuntimeClient, InvokeModelCommand } from '@aws-sdk/client-bedrock-runtime';
import { S3VectorsClient, QueryVectorsCommand } from '@aws-sdk/client-s3vectors';

const bedrock = new BedrockRuntimeClient();
const s3vectors = new S3VectorsClient();

const VECTOR_BUCKET_NAME = process.env.VECTOR_BUCKET_NAME!;
const VECTOR_INDEX_NAME = process.env.VECTOR_INDEX_NAME!;
const MODEL_ID = 'amazon.titan-embed-text-v2:0';

interface MapChildResult {
  vectorKey: string;
  documentKey: string;
}

export async function handler(event: MapChildResult[]) {
  // 1. Collect the vector keys from the Distributed Map child results
  // (ResultWriterV2 returns child results directly as an array)
  const vectorKeys = event.map(r => r.vectorKey);

  if (vectorKeys.length === 0) {
    return { valid: false, vectorKeys };
  }

  // 2. Generate a probe embedding and query the index
  const probeText = 'vector storage and embeddings for AI applications';
  const invokeResp = await bedrock.send(new InvokeModelCommand({
    modelId: MODEL_ID,
    contentType: 'application/json',
    accept: 'application/json',
    body: JSON.stringify({ inputText: probeText, dimensions: 1024, normalize: true }),
  }));
  const probeEmbedding: number[] = JSON.parse(new TextDecoder().decode(invokeResp.body)).embedding;

  const queryResp = await s3vectors.send(new QueryVectorsCommand({
    vectorBucketName: VECTOR_BUCKET_NAME,
    indexName: VECTOR_INDEX_NAME,
    queryVector: { float32: probeEmbedding },
    topK: 5,
    returnMetadata: true,
  }));

  // 3. Validate: at least one newly ingested vector appears in the results
  const returnedKeys = new Set(queryResp.vectors?.map(v => v.key) ?? []);
  const found = vectorKeys.some(k => returnedKeys.has(k));

  return { valid: found, vectorKeys };
}
```
