apps/sim/lib/core/config/env.ts (6 changes: 3 additions & 3 deletions)
@@ -174,9 +174,9 @@ export const env = createEnv({
   KB_CONFIG_RETRY_FACTOR: z.number().optional().default(2), // Retry backoff factor
   KB_CONFIG_MIN_TIMEOUT: z.number().optional().default(1000), // Min timeout in ms
   KB_CONFIG_MAX_TIMEOUT: z.number().optional().default(10000), // Max timeout in ms
-  KB_CONFIG_CONCURRENCY_LIMIT: z.number().optional().default(20), // Queue concurrency limit
-  KB_CONFIG_BATCH_SIZE: z.number().optional().default(20), // Processing batch size
-  KB_CONFIG_DELAY_BETWEEN_BATCHES: z.number().optional().default(100), // Delay between batches in ms
+  KB_CONFIG_CONCURRENCY_LIMIT: z.number().optional().default(50), // Concurrent embedding API calls
+  KB_CONFIG_BATCH_SIZE: z.number().optional().default(2000), // Chunks to process per embedding batch
+  KB_CONFIG_DELAY_BETWEEN_BATCHES: z.number().optional().default(0), // Delay between batches in ms (0 for max speed)
   KB_CONFIG_DELAY_BETWEEN_DOCUMENTS: z.number().optional().default(50), // Delay between documents in ms
 
   // Real-time Communication
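Note: for a rough sense of what these new defaults change, here is an illustrative throughput model (a sketch only, not code from the PR; estimateWallClockMs and the 500 ms latency figure are hypothetical, and the model ignores the 8000-token-per-request cap in embeddings.ts, which can split batches further):

// Illustrative only. With 50 concurrent requests and no inter-batch delay,
// wall-clock time is roughly ceil(batches / concurrency) waves of request
// latency, instead of batches * (latency + delay) under the old serial defaults.
const concurrency = 50 // KB_CONFIG_CONCURRENCY_LIMIT
const batchSize = 2000 // KB_CONFIG_BATCH_SIZE (chunks per embedding batch)
const delayMs = 0 // KB_CONFIG_DELAY_BETWEEN_BATCHES

function estimateWallClockMs(totalChunks: number, perRequestLatencyMs: number): number {
  const batches = Math.ceil(totalChunks / batchSize)
  const waves = Math.ceil(batches / concurrency)
  return waves * perRequestLatencyMs + (batches - 1) * delayMs
}

// 100000 chunks at ~500 ms per request: 50 batches, one wave, roughly half a second,
// where the old defaults (batch 20, serial, 100 ms delay) meant 5000 requests back to back.
console.log(estimateWallClockMs(100_000, 500))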
apps/sim/lib/knowledge/documents/service.ts (8 changes: 4 additions & 4 deletions)
@@ -29,10 +29,10 @@ const TIMEOUTS = {

 // Configuration for handling large documents
 const LARGE_DOC_CONFIG = {
-  MAX_CHUNKS_PER_BATCH: 500, // Insert embeddings in batches of 500
-  MAX_EMBEDDING_BATCH: 500, // Generate embeddings in batches of 500
-  MAX_FILE_SIZE: 100 * 1024 * 1024, // 100MB max file size
-  MAX_CHUNKS_PER_DOCUMENT: 100000, // Maximum chunks allowed per document
+  MAX_CHUNKS_PER_BATCH: 500, // Insert embeddings in batches of 500
+  MAX_EMBEDDING_BATCH: env.KB_CONFIG_BATCH_SIZE || 2000, // Falls back to 2000 when KB_CONFIG_BATCH_SIZE is unset
+  MAX_FILE_SIZE: 100 * 1024 * 1024, // 100MB max file size
+  MAX_CHUNKS_PER_DOCUMENT: 100000, // Maximum chunks allowed per document
 }
 
 /**
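Note: MAX_EMBEDDING_BATCH now follows KB_CONFIG_BATCH_SIZE (falling back to 2000), while database inserts stay at 500 chunks per batch. A minimal sketch of that fixed-size slicing under the config above (toBatches is a hypothetical helper, not code from the PR):

// Slice an array into fixed-size batches, e.g. chunks into insert batches.
function toBatches<T>(items: T[], size: number): T[][] {
  const out: T[][] = []
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size))
  }
  return out
}

// A document at the MAX_CHUNKS_PER_DOCUMENT ceiling (100000 chunks) yields
// 200 insert batches of 500 and, at most, 50 embedding batches of 2000.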
apps/sim/lib/knowledge/embeddings.ts (68 changes: 45 additions & 23 deletions)
@@ -7,6 +7,7 @@ import { batchByTokenLimit, getTotalTokenCount } from '@/lib/tokenization'
 const logger = createLogger('EmbeddingUtils')
 
 const MAX_TOKENS_PER_REQUEST = 8000
+const MAX_CONCURRENT_BATCHES = env.KB_CONFIG_CONCURRENCY_LIMIT || 50
 
 export class EmbeddingAPIError extends Error {
   public status: number
@@ -121,8 +122,29 @@ async function callEmbeddingAPI(inputs: string[], config: EmbeddingConfig): Prom
 }
 
 /**
- * Generate embeddings for multiple texts with token-aware batching
- * Uses tiktoken for token counting
+ * Process batches with controlled concurrency
  */
+async function processWithConcurrency<T, R>(
+  items: T[],
+  concurrency: number,
+  processor: (item: T, index: number) => Promise<R>
+): Promise<R[]> {
+  const results: R[] = new Array(items.length)
+  let currentIndex = 0
+
+  const workers = Array.from({ length: Math.min(concurrency, items.length) }, async () => {
+    while (currentIndex < items.length) {
+      const index = currentIndex++
+      results[index] = await processor(items[index], index)
+    }
+  })
+
+  await Promise.all(workers)
+  return results
+}
+
+/**
+ * Generate embeddings for multiple texts with token-aware batching and parallel processing
+ */
 export async function generateEmbeddings(
   texts: string[],
@@ -138,35 +160,35 @@ export async function generateEmbeddings(
   const batches = batchByTokenLimit(texts, MAX_TOKENS_PER_REQUEST, embeddingModel)
 
   logger.info(
-    `Split ${texts.length} texts into ${batches.length} batches (max ${MAX_TOKENS_PER_REQUEST} tokens per batch)`
+    `Split ${texts.length} texts into ${batches.length} batches (max ${MAX_TOKENS_PER_REQUEST} tokens per batch, ${MAX_CONCURRENT_BATCHES} concurrent)`
   )
 
-  const allEmbeddings: number[][] = []
-
-  for (let i = 0; i < batches.length; i++) {
-    const batch = batches[i]
-    const batchTokenCount = getTotalTokenCount(batch, embeddingModel)
-
-    logger.info(
-      `Processing batch ${i + 1}/${batches.length}: ${batch.length} texts, ${batchTokenCount} tokens`
-    )
-
-    try {
-      const batchEmbeddings = await callEmbeddingAPI(batch, config)
-      allEmbeddings.push(...batchEmbeddings)
-
-      logger.info(
-        `Generated ${batchEmbeddings.length} embeddings for batch ${i + 1}/${batches.length}`
-      )
-    } catch (error) {
-      logger.error(`Failed to generate embeddings for batch ${i + 1}:`, error)
-      throw error
-    }
-
-    if (i + 1 < batches.length) {
-      await new Promise((resolve) => setTimeout(resolve, 100))
-    }
-  }
+  const batchResults = await processWithConcurrency(
+    batches,
+    MAX_CONCURRENT_BATCHES,
+    async (batch, i) => {
+      const batchTokenCount = getTotalTokenCount(batch, embeddingModel)
+
+      logger.info(
+        `Processing batch ${i + 1}/${batches.length}: ${batch.length} texts, ${batchTokenCount} tokens`
+      )
+
+      try {
+        const batchEmbeddings = await callEmbeddingAPI(batch, config)
+
+        logger.info(
+          `Generated ${batchEmbeddings.length} embeddings for batch ${i + 1}/${batches.length}`
+        )
+
+        return batchEmbeddings
+      } catch (error) {
+        logger.error(`Failed to generate embeddings for batch ${i + 1}:`, error)
+        throw error
+      }
+    }
+  )
+
+  const allEmbeddings = batchResults.flat()
 
   logger.info(`Successfully generated ${allEmbeddings.length} embeddings total`)

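The heart of the change is the worker-pool pattern in processWithConcurrency: up to MAX_CONCURRENT_BATCHES workers pull indexes from a shared counter, so at most that many embedding requests are in flight while results keep their input order. Because JavaScript runs these callbacks on a single thread, currentIndex++ completes before any other worker resumes, so no two workers claim the same index. A self-contained sketch of the same pattern with a toy processor (doubleSlowly is hypothetical, standing in for callEmbeddingAPI):

// Self-contained sketch of the worker-pool pattern added in embeddings.ts.
async function processWithConcurrency<T, R>(
  items: T[],
  concurrency: number,
  processor: (item: T, index: number) => Promise<R>
): Promise<R[]> {
  const results: R[] = new Array(items.length)
  let currentIndex = 0

  // Each worker claims the next index synchronously, then awaits its item.
  const workers = Array.from({ length: Math.min(concurrency, items.length) }, async () => {
    while (currentIndex < items.length) {
      const index = currentIndex++
      results[index] = await processor(items[index], index)
    }
  })

  await Promise.all(workers)
  return results
}

// Toy processor: doubles a number after a 100 ms delay.
const doubleSlowly = (n: number) =>
  new Promise<number>((resolve) => setTimeout(() => resolve(n * 2), 100))

// Ten items, at most three in flight at once; results stay in input order.
processWithConcurrency([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 3, doubleSlowly)
  .then((doubled) => console.log(doubled)) // [2, 4, ..., 20] after ~400 ms

One behavioral note: if any batch throws, Promise.all rejects immediately, but workers already mid-loop keep claiming the remaining items until the queue drains, since nothing cancels them; the PR keeps the old fail-fast throw while some requests may still be in flight.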