diff --git a/AGENTS.md b/AGENTS.md index fac41939..4363dbc1 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -41,7 +41,7 @@ Local development runs as **multiple app instances** (PM2) on top of **one share - Shared infra (Docker Compose project `shipsec-infra`): Postgres/Temporal/Redpanda/Redis/MinIO/Loki on fixed ports. - Per-instance apps: `shipsec-{frontend,backend,worker}-N`. -- Isolation is via per-instance DB + Temporal namespace/task queue + Kafka topic suffixing (not per-instance infra containers). +- Isolation is via per-instance DB + Temporal namespace/task queue + Kafka topic suffixing + instance-scoped Kafka consumer groups/client IDs (not per-instance infra containers). - The workspace can have an **active instance** (stored in `.shipsec-instance`, gitignored). **Agent rule:** before running any dev commands, ensure you’re targeting the intended instance. diff --git a/backend/src/agent-trace/__tests__/agent-trace-ingest.service.spec.ts b/backend/src/agent-trace/__tests__/agent-trace-ingest.service.spec.ts new file mode 100644 index 00000000..60acf933 --- /dev/null +++ b/backend/src/agent-trace/__tests__/agent-trace-ingest.service.spec.ts @@ -0,0 +1,61 @@ +import { afterEach, beforeEach, describe, expect, test } from 'bun:test'; + +import { AgentTraceIngestService } from '../agent-trace-ingest.service'; +import type { AgentTraceRepository } from '../agent-trace.repository'; + +const ORIGINAL_ENV = { ...process.env }; + +function restoreEnv(): void { + process.env = { ...ORIGINAL_ENV }; +} + +describe('AgentTraceIngestService', () => { + beforeEach(() => { + restoreEnv(); + process.env.LOG_KAFKA_BROKERS = 'localhost:19092'; + delete process.env.SHIPSEC_INSTANCE; + delete process.env.AGENT_TRACE_KAFKA_GROUP_ID; + delete process.env.AGENT_TRACE_KAFKA_CLIENT_ID; + }); + + afterEach(() => { + restoreEnv(); + }); + + test('uses legacy defaults when SHIPSEC_INSTANCE is unset', () => { + const repository = { append: async () => undefined } as unknown as AgentTraceRepository; + const service = new AgentTraceIngestService(repository) as unknown as { + kafkaGroupId: string; + kafkaClientId: string; + }; + + expect(service.kafkaGroupId).toBe('shipsec-agent-trace-ingestor'); + expect(service.kafkaClientId).toBe('shipsec-backend-agent-trace'); + }); + + test('uses instance-scoped defaults when SHIPSEC_INSTANCE is set', () => { + process.env.SHIPSEC_INSTANCE = '7'; + const repository = { append: async () => undefined } as unknown as AgentTraceRepository; + const service = new AgentTraceIngestService(repository) as unknown as { + kafkaGroupId: string; + kafkaClientId: string; + }; + + expect(service.kafkaGroupId).toBe('shipsec-agent-trace-ingestor-7'); + expect(service.kafkaClientId).toBe('shipsec-backend-agent-trace-7'); + }); + + test('prefers explicit env vars over defaults', () => { + process.env.SHIPSEC_INSTANCE = '3'; + process.env.AGENT_TRACE_KAFKA_GROUP_ID = 'custom-agent-trace-group'; + process.env.AGENT_TRACE_KAFKA_CLIENT_ID = 'custom-agent-trace-client'; + const repository = { append: async () => undefined } as unknown as AgentTraceRepository; + const service = new AgentTraceIngestService(repository) as unknown as { + kafkaGroupId: string; + kafkaClientId: string; + }; + + expect(service.kafkaGroupId).toBe('custom-agent-trace-group'); + expect(service.kafkaClientId).toBe('custom-agent-trace-client'); + }); +}); diff --git a/backend/src/agent-trace/agent-trace-ingest.service.ts b/backend/src/agent-trace/agent-trace-ingest.service.ts index 61bfb322..290aca41 100644 --- a/backend/src/agent-trace/agent-trace-ingest.service.ts +++ b/backend/src/agent-trace/agent-trace-ingest.service.ts @@ -26,9 +26,16 @@ export class AgentTraceIngestService implements OnModuleInit, OnModuleDestroy { // Use instance-aware topic name const topicResolver = getTopicResolver(); this.kafkaTopic = topicResolver.getAgentTraceTopic(); + const instanceId = process.env.SHIPSEC_INSTANCE; + const defaultGroupId = instanceId + ? `shipsec-agent-trace-ingestor-${instanceId}` + : 'shipsec-agent-trace-ingestor'; + const defaultClientId = instanceId + ? `shipsec-backend-agent-trace-${instanceId}` + : 'shipsec-backend-agent-trace'; - this.kafkaGroupId = process.env.AGENT_TRACE_KAFKA_GROUP_ID ?? 'shipsec-agent-trace-ingestor'; - this.kafkaClientId = process.env.AGENT_TRACE_KAFKA_CLIENT_ID ?? 'shipsec-backend-agent-trace'; + this.kafkaGroupId = process.env.AGENT_TRACE_KAFKA_GROUP_ID ?? defaultGroupId; + this.kafkaClientId = process.env.AGENT_TRACE_KAFKA_CLIENT_ID ?? defaultClientId; } async onModuleInit(): Promise { diff --git a/backend/src/node-io/__tests__/node-io-ingest.service.spec.ts b/backend/src/node-io/__tests__/node-io-ingest.service.spec.ts new file mode 100644 index 00000000..d7666d0d --- /dev/null +++ b/backend/src/node-io/__tests__/node-io-ingest.service.spec.ts @@ -0,0 +1,70 @@ +import { afterEach, beforeEach, describe, expect, test } from 'bun:test'; + +import { NodeIOIngestService } from '../node-io-ingest.service'; +import type { NodeIORepository } from '../node-io.repository'; + +const ORIGINAL_ENV = { ...process.env }; + +function restoreEnv(): void { + process.env = { ...ORIGINAL_ENV }; +} + +describe('NodeIOIngestService', () => { + beforeEach(() => { + restoreEnv(); + process.env.LOG_KAFKA_BROKERS = 'localhost:19092'; + delete process.env.SHIPSEC_INSTANCE; + delete process.env.NODE_IO_KAFKA_GROUP_ID; + delete process.env.NODE_IO_KAFKA_CLIENT_ID; + }); + + afterEach(() => { + restoreEnv(); + }); + + test('uses legacy defaults when SHIPSEC_INSTANCE is unset', () => { + const repository = { + recordStart: async () => undefined, + recordCompletion: async () => undefined, + } as unknown as NodeIORepository; + const service = new NodeIOIngestService(repository) as unknown as { + kafkaGroupId: string; + kafkaClientId: string; + }; + + expect(service.kafkaGroupId).toBe('shipsec-node-io-ingestor'); + expect(service.kafkaClientId).toBe('shipsec-backend-node-io'); + }); + + test('uses instance-scoped defaults when SHIPSEC_INSTANCE is set', () => { + process.env.SHIPSEC_INSTANCE = '4'; + const repository = { + recordStart: async () => undefined, + recordCompletion: async () => undefined, + } as unknown as NodeIORepository; + const service = new NodeIOIngestService(repository) as unknown as { + kafkaGroupId: string; + kafkaClientId: string; + }; + + expect(service.kafkaGroupId).toBe('shipsec-node-io-ingestor-4'); + expect(service.kafkaClientId).toBe('shipsec-backend-node-io-4'); + }); + + test('prefers explicit env vars over defaults', () => { + process.env.SHIPSEC_INSTANCE = '9'; + process.env.NODE_IO_KAFKA_GROUP_ID = 'custom-node-io-group'; + process.env.NODE_IO_KAFKA_CLIENT_ID = 'custom-node-io-client'; + const repository = { + recordStart: async () => undefined, + recordCompletion: async () => undefined, + } as unknown as NodeIORepository; + const service = new NodeIOIngestService(repository) as unknown as { + kafkaGroupId: string; + kafkaClientId: string; + }; + + expect(service.kafkaGroupId).toBe('custom-node-io-group'); + expect(service.kafkaClientId).toBe('custom-node-io-client'); + }); +}); diff --git a/backend/src/node-io/node-io-ingest.service.ts b/backend/src/node-io/node-io-ingest.service.ts index f7695cce..0459f0d5 100644 --- a/backend/src/node-io/node-io-ingest.service.ts +++ b/backend/src/node-io/node-io-ingest.service.ts @@ -46,9 +46,16 @@ export class NodeIOIngestService implements OnModuleInit, OnModuleDestroy { // Use instance-aware topic name const topicResolver = getTopicResolver(); this.kafkaTopic = topicResolver.getNodeIOTopic(); + const instanceId = process.env.SHIPSEC_INSTANCE; + const defaultGroupId = instanceId + ? `shipsec-node-io-ingestor-${instanceId}` + : 'shipsec-node-io-ingestor'; + const defaultClientId = instanceId + ? `shipsec-backend-node-io-${instanceId}` + : 'shipsec-backend-node-io'; - this.kafkaGroupId = process.env.NODE_IO_KAFKA_GROUP_ID ?? 'shipsec-node-io-ingestor'; - this.kafkaClientId = process.env.NODE_IO_KAFKA_CLIENT_ID ?? 'shipsec-backend-node-io'; + this.kafkaGroupId = process.env.NODE_IO_KAFKA_GROUP_ID ?? defaultGroupId; + this.kafkaClientId = process.env.NODE_IO_KAFKA_CLIENT_ID ?? defaultClientId; } async onModuleInit(): Promise { diff --git a/pm2.config.cjs b/pm2.config.cjs index 71452c39..b48939a8 100644 --- a/pm2.config.cjs +++ b/pm2.config.cjs @@ -271,6 +271,16 @@ module.exports = { EVENT_KAFKA_TOPIC: process.env.EVENT_KAFKA_TOPIC || 'telemetry.events', EVENT_KAFKA_CLIENT_ID: process.env.EVENT_KAFKA_CLIENT_ID || `shipsec-backend-events-${instanceNum}`, EVENT_KAFKA_GROUP_ID: process.env.EVENT_KAFKA_GROUP_ID || `shipsec-event-ingestor-${instanceNum}`, + NODE_IO_KAFKA_TOPIC: process.env.NODE_IO_KAFKA_TOPIC || 'telemetry.node-io', + NODE_IO_KAFKA_CLIENT_ID: + process.env.NODE_IO_KAFKA_CLIENT_ID || `shipsec-backend-node-io-${instanceNum}`, + NODE_IO_KAFKA_GROUP_ID: + process.env.NODE_IO_KAFKA_GROUP_ID || `shipsec-node-io-ingestor-${instanceNum}`, + AGENT_TRACE_KAFKA_TOPIC: process.env.AGENT_TRACE_KAFKA_TOPIC || 'telemetry.agent-trace', + AGENT_TRACE_KAFKA_CLIENT_ID: + process.env.AGENT_TRACE_KAFKA_CLIENT_ID || `shipsec-backend-agent-trace-${instanceNum}`, + AGENT_TRACE_KAFKA_GROUP_ID: + process.env.AGENT_TRACE_KAFKA_GROUP_ID || `shipsec-agent-trace-ingestor-${instanceNum}`, ENABLE_INGEST_SERVICES: process.env.ENABLE_INGEST_SERVICES || 'true', INTERNAL_SERVICE_TOKEN: process.env.INTERNAL_SERVICE_TOKEN || 'local-internal-token', TEMPORAL_ADDRESS: process.env.TEMPORAL_ADDRESS || 'localhost:7233',