190 changes: 190 additions & 0 deletions .ai/analytics-output-port-design.md
@@ -0,0 +1,190 @@
# Analytics Output Port Design

## Status: Approved
## Date: 2025-01-21

## Problem Statement

When connecting a component's `rawOutput` (which contains complex nested JSON) to the Analytics Sink, OpenSearch hits the default field limit of 1000 fields. This is because:

1. **Dynamic mapping explosion**: Elasticsearch/OpenSearch creates a field for every unique JSON path
2. **Nested structures**: Arrays with objects like `issues[0].metadata.schema` create many paths
3. **Varying schemas**: Different scanner outputs accumulate unique field paths over time

Example error:
```
illegal_argument_exception: Limit of total fields [1000] has been exceeded
```

## Solution

### Design Decisions

1. **Each component owns its analytics schema**
   - Components output structured `list<json>` through dedicated ports (`findings`, `results`, `secrets`, `issues`)
   - Component authors define the structure appropriate for their tool
   - No generic "one schema fits all" approach

2. **Analytics Sink accepts `list<json>`**
   - Input type: `z.array(z.record(z.string(), z.unknown()))`
   - Each item in the array is indexed as a separate document
   - Rejects arbitrary nested objects; the payload must be an array

3. **Same timestamp for all findings in a batch**
   - All findings from one component execution share the same `@timestamp`
   - Captured once at the start of indexing, applied to all documents

4. **Nested `shipsec` context**
   - Workflow context stored under `shipsec.*` namespace
   - Prevents field name collisions with component data
   - Clear separation: component fields at root, system fields under `shipsec`

5. **Nested objects serialized before indexing**
   - Any nested object or array within a finding is JSON-stringified
   - Prevents field explosion from dynamic mapping
   - Trade-off: can't query inside serialized fields directly, but the mapping stays bounded and the field-limit failures described above are avoided (see the sketch after this list)

6. **No `data` wrapper**
   - Original PRD design wrapped component output in a `data` field
   - New design: finding fields are at the top level for easier querying

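A minimal sketch of decision 5, assuming the indexer only inspects each finding's top-level keys (the helper name `flattenFinding` is illustrative, not the actual function in `opensearch-indexer.ts`):

```typescript
// Illustrative sketch: nested objects and arrays are JSON-stringified so that
// dynamic mapping only ever sees flat primitive values at the document root.
function flattenFinding(finding: Record<string, unknown>): Record<string, unknown> {
  const flat: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(finding)) {
    flat[key] =
      value !== null && typeof value === 'object' ? JSON.stringify(value) : value;
  }
  return flat;
}
```
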
### Document Structure

**Before (PRD design):**
```json
{
  "workflow_id": "...",
  "workflow_name": "...",
  "run_id": "...",
  "node_ref": "...",
  "component_id": "...",
  "@timestamp": "...",
  "asset_key": "...",
  "data": {
    "check_id": "DB_RLS_DISABLED",
    "severity": "CRITICAL",
    "metadata": { "schema": "public", "table": "users" }
  }
}
```

**After (new design):**
```json
{
  "check_id": "DB_RLS_DISABLED",
  "severity": "CRITICAL",
  "title": "RLS Disabled on Table: users",
  "resource": "public.users",
  "metadata": "{\"schema\":\"public\",\"table\":\"users\"}",
  "scanner": "supabase-scanner",
  "asset_key": "abcdefghij1234567890",
  "finding_hash": "a1b2c3d4e5f67890",

  "shipsec": {
    "organization_id": "org_123",
    "run_id": "shipsec-run-xxx",
    "workflow_id": "d1d33161-929f-4af4-9a64-xxx",
    "workflow_name": "Supabase Security Audit",
    "component_id": "core.analytics.sink",
    "node_ref": "analytics-sink-1"
  },

  "@timestamp": "2025-01-21T10:30:00.000Z"
}
```

### Component Output Ports

Components should use their existing structured list outputs:

| Component | Port | Type | Notes |
|-----------|------|------|-------|
| Nuclei | `results` | `z.array(z.record(z.string(), z.unknown()))` | Scanner + asset_key added |
| TruffleHog | `results` | `z.array(z.record(z.string(), z.unknown()))` | Scanner + asset_key added |
| Supabase Scanner | `results` | `z.array(z.record(z.string(), z.unknown()))` | Scanner + asset_key added |

All `results` ports include:
- `scanner`: Scanner identifier (e.g., `'nuclei'`, `'trufflehog'`, `'supabase-scanner'`)
- `asset_key`: Primary asset identifier from the finding
- `finding_hash`: Stable hash for deduplication (16-char hex from SHA-256)
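
A sketch of what this convention could look like in zod (the schema literal matches the table above; the interface is illustrative and not enforced by the port type):

```typescript
import { z } from 'zod';

// Declared port type from the table above: a list of open JSON records.
const resultsPortSchema = z.array(z.record(z.string(), z.unknown()));

// Illustrative convention only (not enforced by the schema): every element
// carries these shared fields alongside its scanner-specific ones.
interface SharedResultFields {
  scanner: string; // e.g. 'nuclei', 'trufflehog', 'supabase-scanner'
  asset_key: string; // primary asset identifier from the finding
  finding_hash: string; // 16-char hex hash for deduplication (see below)
}
```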

### Finding Hash for Deduplication

The `finding_hash` enables tracking findings across workflow runs:

**Generation:**
```typescript
import { createHash } from 'crypto';

function generateFindingHash(...fields: (string | undefined | null)[]): string {
  const normalized = fields.map((f) => (f ?? '').toLowerCase().trim()).join('|');
  return createHash('sha256').update(normalized).digest('hex').slice(0, 16);
}
```

**Key fields per scanner:**
| Scanner | Hash Fields |
|---------|-------------|
| Nuclei | `templateId + host + matchedAt` |
| TruffleHog | `DetectorType + Redacted + filePath` |
| Supabase Scanner | `check_id + projectRef + resource` |
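
For example, a Nuclei finding's hash could be derived like this (the values are illustrative, not taken from a real scan):

```typescript
// Same generateFindingHash as above; field order follows the table.
const hash = generateFindingHash(
  'http-missing-security-headers', // templateId
  'api.example.com',               // host
  'https://api.example.com/login', // matchedAt
);
// Yields a stable 16-character hex string that stays identical across runs
// as long as the same finding keeps appearing.
```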

**Use cases:**
- **New vs recurring**: Is this finding appearing for the first time?
- **First-seen / last-seen**: When did we first detect this? Is it still present?
- **Resolution tracking**: Findings that stop appearing may be resolved
- **Deduplication**: Remove duplicates in dashboards across runs

### `shipsec` Context Fields

The indexer automatically adds these fields under `shipsec`:

| Field | Description |
|-------|-------------|
| `organization_id` | Organization that owns the workflow |
| `run_id` | Unique identifier for this workflow execution |
| `workflow_id` | ID of the workflow definition |
| `workflow_name` | Human-readable workflow name |
| `component_id` | Component type (e.g., `core.analytics.sink`) |
| `node_ref` | Node reference in the workflow graph |
| `asset_key` | Auto-detected or specified asset identifier |
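
As a rough sketch of how these pieces combine into one document (reusing the illustrative `flattenFinding` helper from the Design Decisions section; none of these names are the actual indexer API):

```typescript
interface ShipsecContext {
  organization_id: string;
  run_id: string;
  workflow_id: string;
  workflow_name: string;
  component_id: string;
  node_ref: string;
  asset_key?: string;
}

// Hypothetical assembly step: component fields stay at the root, system
// context lives under `shipsec`, and one timestamp is shared per batch.
function toDocument(
  finding: Record<string, unknown>,
  context: ShipsecContext,
  timestamp: string,
): Record<string, unknown> {
  return {
    ...flattenFinding(finding),
    shipsec: { ...context },
    '@timestamp': timestamp,
  };
}
```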

### Querying in OpenSearch

With this structure, users can:
- Filter by organization: `shipsec.organization_id: "org_123"`
- Filter by workflow: `shipsec.workflow_id: "xxx"`
- Filter by run: `shipsec.run_id: "xxx"`
- Filter by asset: `asset_key: "api.example.com"`
- Filter by scanner: `scanner: "nuclei"`
- Filter by component-specific fields: `severity: "CRITICAL"`
- Aggregate by severity: `terms` aggregation on `severity` field
- Track finding history: `finding_hash: "a1b2c3d4" | sort @timestamp`
- Find recurring findings: Group by `finding_hash`, count occurrences
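
As a sketch of the last two items using the OpenSearch JS client (the index pattern matches the setup script; the filter values are examples):

```typescript
import { Client } from '@opensearch-project/opensearch';

// Illustrative query: count documents per finding_hash to surface findings
// that keep reappearing across workflow runs, with their latest occurrence.
async function findRecurringFindings(client: Client) {
  const response = await client.search({
    index: 'security-findings-*',
    body: {
      size: 0,
      query: {
        bool: {
          filter: [
            { term: { 'shipsec.organization_id': 'org_123' } },
            { term: { severity: 'CRITICAL' } },
          ],
        },
      },
      aggs: {
        recurring: {
          terms: { field: 'finding_hash', size: 50 },
          aggs: { last_seen: { max: { field: '@timestamp' } } },
        },
      },
    },
  });

  return response.body.aggregations;
}
```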

### Trade-offs

| Decision | Pro | Con |
|----------|-----|-----|
| Serialize nested objects | Prevents field explosion | Can't query inside serialized fields |
| `shipsec` namespace | No field collision | Slightly more verbose queries |
| No generic schema | Better fit per component | Less consistency across components |
| Same timestamp per batch | Accurate (same scan time) | Can't distinguish individual finding times |

### Implementation Files

1. `/worker/src/utils/opensearch-indexer.ts` - Add `shipsec` context, serialize nested objects
2. `/worker/src/components/core/analytics-sink.ts` - Accept `list<json>`, consistent timestamp
3. Component files - Ensure structured output, add `results` port where missing

### Backward Compatibility

- Existing workflows connecting `rawOutput` to Analytics Sink will still work
- Analytics Sink continues to accept any data type for backward compatibility
- New `list<json>` processing only triggers when input is an array
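
A sketch of the guard this implies (the handler and helper names are hypothetical; the real logic lives in `analytics-sink.ts`):

```typescript
// Hypothetical stand-ins for the sink's existing indexing paths.
declare function indexFindings(findings: Record<string, unknown>[]): Promise<void>;
declare function indexLegacyPayload(payload: unknown): Promise<void>;

async function handleAnalyticsInput(input: unknown): Promise<void> {
  if (Array.isArray(input)) {
    // New path: each element of a list<json> payload becomes its own document.
    await indexFindings(input as Record<string, unknown>[]);
  } else {
    // Legacy path: rawOutput and other non-array payloads keep the old behaviour.
    await indexLegacyPayload(input);
  }
}
```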

### Future Considerations

1. **Index templates**: Create OpenSearch index template with explicit mappings for `shipsec.*` fields
2. **Field discovery**: Build UI to show available fields from indexed data
3. **Schema validation**: Optional strict mode to validate findings against expected schema
4 changes: 4 additions & 0 deletions Dockerfile
@@ -89,6 +89,7 @@ ARG VITE_DEFAULT_ORG_ID=local-dev
ARG VITE_GIT_SHA=unknown
ARG VITE_PUBLIC_POSTHOG_KEY=""
ARG VITE_PUBLIC_POSTHOG_HOST=""
ARG VITE_OPENSEARCH_DASHBOARDS_URL=""

ENV VITE_AUTH_PROVIDER=${VITE_AUTH_PROVIDER}
ENV VITE_CLERK_PUBLISHABLE_KEY=${VITE_CLERK_PUBLISHABLE_KEY}
@@ -98,6 +99,7 @@ ENV VITE_DEFAULT_ORG_ID=${VITE_DEFAULT_ORG_ID}
ENV VITE_GIT_SHA=${VITE_GIT_SHA}
ENV VITE_PUBLIC_POSTHOG_KEY=${VITE_PUBLIC_POSTHOG_KEY}
ENV VITE_PUBLIC_POSTHOG_HOST=${VITE_PUBLIC_POSTHOG_HOST}
ENV VITE_OPENSEARCH_DASHBOARDS_URL=${VITE_OPENSEARCH_DASHBOARDS_URL}

# Set working directory for frontend
USER shipsec
@@ -129,6 +131,7 @@ ARG VITE_DEFAULT_ORG_ID=local-dev
ARG VITE_GIT_SHA=unknown
ARG VITE_PUBLIC_POSTHOG_KEY=""
ARG VITE_PUBLIC_POSTHOG_HOST=""
ARG VITE_OPENSEARCH_DASHBOARDS_URL=""

ENV VITE_AUTH_PROVIDER=${VITE_AUTH_PROVIDER}
ENV VITE_CLERK_PUBLISHABLE_KEY=${VITE_CLERK_PUBLISHABLE_KEY}
@@ -138,6 +141,7 @@ ENV VITE_DEFAULT_ORG_ID=${VITE_DEFAULT_ORG_ID}
ENV VITE_GIT_SHA=${VITE_GIT_SHA}
ENV VITE_PUBLIC_POSTHOG_KEY=${VITE_PUBLIC_POSTHOG_KEY}
ENV VITE_PUBLIC_POSTHOG_HOST=${VITE_PUBLIC_POSTHOG_HOST}
ENV VITE_OPENSEARCH_DASHBOARDS_URL=${VITE_OPENSEARCH_DASHBOARDS_URL}

# Set working directory for frontend
USER shipsec
21 changes: 16 additions & 5 deletions backend/.env.example
@@ -32,6 +32,8 @@ AUTH_PROVIDER="local"
# If AUTH_LOCAL_ALLOW_UNAUTHENTICATED=false, clients must present AUTH_LOCAL_API_KEY in the Authorization header.
AUTH_LOCAL_ALLOW_UNAUTHENTICATED="true"
AUTH_LOCAL_API_KEY=""
# Required in production for session auth cookie signing
SESSION_SECRET=""

# Clerk provider options
# Required when AUTH_PROVIDER="clerk"
@@ -44,15 +46,24 @@ PLATFORM_SERVICE_TOKEN=""
# Optional: override request timeout in milliseconds (default 5000)
PLATFORM_API_TIMEOUT_MS=""

# OpenSearch configuration
OPENSEARCH_URL="http://localhost:9200"
OPENSEARCH_INDEX_PREFIX="logs-tenant"
# OPENSEARCH_USERNAME=""
# OPENSEARCH_PASSWORD=""
# OpenSearch configuration for security analytics indexing
# Optional: if not set, security analytics indexing will be disabled
OPENSEARCH_URL=""
OPENSEARCH_USERNAME=""
OPENSEARCH_PASSWORD=""

# OpenSearch Dashboards configuration for analytics visualization
# Optional: if not set, Dashboards link will not appear in frontend sidebar
# Example: "http://localhost:5601" or "https://dashboards.example.com"
OPENSEARCH_DASHBOARDS_URL=""

# Secret encryption key (must be exactly 32 characters, NOT hex-encoded)
# Generate with: openssl rand -base64 24 | head -c 32
SECRET_STORE_MASTER_KEY="CHANGE_ME_32_CHAR_SECRET_KEY!!!!"

# Redis configuration for rate limiting and caching
# Optional: if not set, rate limiting will use in-memory storage (not recommended for production)
REDIS_URL=""

# Kafka / Redpanda configuration for node I/O, log, and event ingestion
LOG_KAFKA_BROKERS="localhost:19092"
7 changes: 6 additions & 1 deletion backend/package.json
@@ -14,18 +14,22 @@
"generate:openapi": "bun scripts/generate-openapi.ts",
"migration:push": "bun x drizzle-kit push",
"migration:smoke": "bun scripts/migration-smoke.ts",
"delete:runs": "bun scripts/delete-all-workflow-runs.ts"
"delete:runs": "bun scripts/delete-all-workflow-runs.ts",
"setup:opensearch": "bun scripts/setup-opensearch.ts"
},
"dependencies": {
"@clerk/backend": "^2.29.5",
"@clerk/types": "^4.101.13",
"@grpc/grpc-js": "^1.14.3",
"@nest-lab/throttler-storage-redis": "^1.1.0",
"@nestjs/common": "^10.4.22",
"@nestjs/config": "^3.3.0",
"@nestjs/core": "^10.4.22",
"@nestjs/microservices": "^11.1.13",
"@nestjs/platform-express": "^10.4.22",
"@nestjs/swagger": "^11.2.5",
"@nestjs/throttler": "^6.5.0",
"@opensearch-project/opensearch": "^3.5.1",
"@shipsec/backend-client": "workspace:*",
"@shipsec/component-sdk": "workspace:*",
"@shipsec/shared": "workspace:*",
@@ -62,6 +66,7 @@
"@eslint/js": "^9.39.2",
"@nestjs/testing": "^10.4.22",
"@types/bcryptjs": "^3.0.0",
"@types/cookie-parser": "^1.4.10",
"@types/express-serve-static-core": "^4.19.8",
"@types/har-format": "^1.2.16",
"@types/multer": "^2.0.0",
93 changes: 93 additions & 0 deletions backend/scripts/setup-opensearch.ts
@@ -0,0 +1,93 @@
import { Client } from '@opensearch-project/opensearch';
import { config } from 'dotenv';

// Load environment variables
config();

async function main() {
  const url = process.env.OPENSEARCH_URL;
  const username = process.env.OPENSEARCH_USERNAME;
  const password = process.env.OPENSEARCH_PASSWORD;

  if (!url) {
    console.error('❌ OPENSEARCH_URL environment variable is required');
    process.exit(1);
  }

  console.log('🔍 Connecting to OpenSearch...');

  const client = new Client({
    node: url,
    auth: username && password ? { username, password } : undefined,
    ssl: {
      rejectUnauthorized: process.env.NODE_ENV === 'production',
    },
  });

  try {
    // Test connection
    const healthCheck = await client.cluster.health();
    console.log(`✅ Connected to OpenSearch cluster (status: ${healthCheck.body.status})`);

    // Create index template for security-findings-*
    const templateName = 'security-findings-template';
    console.log(`\n📋 Creating index template: ${templateName}`);

    await client.indices.putIndexTemplate({
      name: templateName,
      body: {
        index_patterns: ['security-findings-*'],
        template: {
          settings: {
            number_of_shards: 1,
            number_of_replicas: 1,
          },
          mappings: {
            properties: {
              '@timestamp': { type: 'date' },
              // Root-level analytics fields
              scanner: { type: 'keyword' },
              severity: { type: 'keyword' },
              finding_hash: { type: 'keyword' },
              asset_key: { type: 'keyword' },
              // Workflow context under shipsec namespace
              shipsec: {
                type: 'object',
                dynamic: true,
                properties: {
                  organization_id: { type: 'keyword' },
                  run_id: { type: 'keyword' },
                  workflow_id: { type: 'keyword' },
                  workflow_name: { type: 'keyword' },
                  component_id: { type: 'keyword' },
                  node_ref: { type: 'keyword' },
                  asset_key: { type: 'keyword' },
                },
              },
            },
          },
        },
      },
    });

    console.log(`✅ Index template '${templateName}' created successfully`);
    console.log('\n📊 Template configuration:');
    console.log(' - Index pattern: security-findings-*');
    console.log(' - Shards: 1, Replicas: 1');
    console.log(' - Mappings: @timestamp (date)');
    console.log(' root: scanner, severity, finding_hash, asset_key (keyword)');
    console.log(' shipsec.*: organization_id, run_id, workflow_id, workflow_name,');
    console.log(' component_id, node_ref, asset_key (keyword)');
    console.log('\n🎉 OpenSearch setup completed successfully!');
  } catch (error) {
    console.error('❌ OpenSearch setup failed');
    console.error(error);
    process.exit(1);
  }
}

main().catch((error) => {
  console.error('❌ Unexpected error during OpenSearch setup');
  console.error(error);
  process.exit(1);
});