Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Local development runs as **multiple app instances** (PM2) on top of **one share

- Shared infra (Docker Compose project `shipsec-infra`): Postgres/Temporal/Redpanda/Redis/MinIO/Loki on fixed ports.
- Per-instance apps: `shipsec-{frontend,backend,worker}-N`.
- Isolation is via per-instance DB + Temporal namespace/task queue + Kafka topic suffixing (not per-instance infra containers).
- Isolation is via per-instance DB + Temporal namespace/task queue + Kafka topic suffixing + instance-scoped Kafka consumer groups/client IDs (not per-instance infra containers).
- The workspace can have an **active instance** (stored in `.shipsec-instance`, gitignored).

**Agent rule:** before running any dev commands, ensure you’re targeting the intended instance.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { afterEach, beforeEach, describe, expect, test } from 'bun:test';

import { AgentTraceIngestService } from '../agent-trace-ingest.service';
import type { AgentTraceRepository } from '../agent-trace.repository';

const ORIGINAL_ENV = { ...process.env };

function restoreEnv(): void {
process.env = { ...ORIGINAL_ENV };
}

describe('AgentTraceIngestService', () => {
beforeEach(() => {
restoreEnv();
process.env.LOG_KAFKA_BROKERS = 'localhost:19092';
delete process.env.SHIPSEC_INSTANCE;
delete process.env.AGENT_TRACE_KAFKA_GROUP_ID;
delete process.env.AGENT_TRACE_KAFKA_CLIENT_ID;
});

afterEach(() => {
restoreEnv();
});

test('uses legacy defaults when SHIPSEC_INSTANCE is unset', () => {
const repository = { append: async () => undefined } as unknown as AgentTraceRepository;
const service = new AgentTraceIngestService(repository) as unknown as {
kafkaGroupId: string;
kafkaClientId: string;
};

expect(service.kafkaGroupId).toBe('shipsec-agent-trace-ingestor');
expect(service.kafkaClientId).toBe('shipsec-backend-agent-trace');
});

test('uses instance-scoped defaults when SHIPSEC_INSTANCE is set', () => {
process.env.SHIPSEC_INSTANCE = '7';
const repository = { append: async () => undefined } as unknown as AgentTraceRepository;
const service = new AgentTraceIngestService(repository) as unknown as {
kafkaGroupId: string;
kafkaClientId: string;
};

expect(service.kafkaGroupId).toBe('shipsec-agent-trace-ingestor-7');
expect(service.kafkaClientId).toBe('shipsec-backend-agent-trace-7');
});

test('prefers explicit env vars over defaults', () => {
process.env.SHIPSEC_INSTANCE = '3';
process.env.AGENT_TRACE_KAFKA_GROUP_ID = 'custom-agent-trace-group';
process.env.AGENT_TRACE_KAFKA_CLIENT_ID = 'custom-agent-trace-client';
const repository = { append: async () => undefined } as unknown as AgentTraceRepository;
const service = new AgentTraceIngestService(repository) as unknown as {
kafkaGroupId: string;
kafkaClientId: string;
};

expect(service.kafkaGroupId).toBe('custom-agent-trace-group');
expect(service.kafkaClientId).toBe('custom-agent-trace-client');
});
});
11 changes: 9 additions & 2 deletions backend/src/agent-trace/agent-trace-ingest.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,16 @@ export class AgentTraceIngestService implements OnModuleInit, OnModuleDestroy {
// Use instance-aware topic name
const topicResolver = getTopicResolver();
this.kafkaTopic = topicResolver.getAgentTraceTopic();
const instanceId = process.env.SHIPSEC_INSTANCE;
const defaultGroupId = instanceId
? `shipsec-agent-trace-ingestor-${instanceId}`
: 'shipsec-agent-trace-ingestor';
const defaultClientId = instanceId
? `shipsec-backend-agent-trace-${instanceId}`
: 'shipsec-backend-agent-trace';

this.kafkaGroupId = process.env.AGENT_TRACE_KAFKA_GROUP_ID ?? 'shipsec-agent-trace-ingestor';
this.kafkaClientId = process.env.AGENT_TRACE_KAFKA_CLIENT_ID ?? 'shipsec-backend-agent-trace';
this.kafkaGroupId = process.env.AGENT_TRACE_KAFKA_GROUP_ID ?? defaultGroupId;
this.kafkaClientId = process.env.AGENT_TRACE_KAFKA_CLIENT_ID ?? defaultClientId;
}

async onModuleInit(): Promise<void> {
Expand Down
70 changes: 70 additions & 0 deletions backend/src/node-io/__tests__/node-io-ingest.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { afterEach, beforeEach, describe, expect, test } from 'bun:test';

import { NodeIOIngestService } from '../node-io-ingest.service';
import type { NodeIORepository } from '../node-io.repository';

const ORIGINAL_ENV = { ...process.env };

function restoreEnv(): void {
process.env = { ...ORIGINAL_ENV };
}

describe('NodeIOIngestService', () => {
beforeEach(() => {
restoreEnv();
process.env.LOG_KAFKA_BROKERS = 'localhost:19092';
delete process.env.SHIPSEC_INSTANCE;
delete process.env.NODE_IO_KAFKA_GROUP_ID;
delete process.env.NODE_IO_KAFKA_CLIENT_ID;
});

afterEach(() => {
restoreEnv();
});

test('uses legacy defaults when SHIPSEC_INSTANCE is unset', () => {
const repository = {
recordStart: async () => undefined,
recordCompletion: async () => undefined,
} as unknown as NodeIORepository;
const service = new NodeIOIngestService(repository) as unknown as {
kafkaGroupId: string;
kafkaClientId: string;
};

expect(service.kafkaGroupId).toBe('shipsec-node-io-ingestor');
expect(service.kafkaClientId).toBe('shipsec-backend-node-io');
});

test('uses instance-scoped defaults when SHIPSEC_INSTANCE is set', () => {
process.env.SHIPSEC_INSTANCE = '4';
const repository = {
recordStart: async () => undefined,
recordCompletion: async () => undefined,
} as unknown as NodeIORepository;
const service = new NodeIOIngestService(repository) as unknown as {
kafkaGroupId: string;
kafkaClientId: string;
};

expect(service.kafkaGroupId).toBe('shipsec-node-io-ingestor-4');
expect(service.kafkaClientId).toBe('shipsec-backend-node-io-4');
});

test('prefers explicit env vars over defaults', () => {
process.env.SHIPSEC_INSTANCE = '9';
process.env.NODE_IO_KAFKA_GROUP_ID = 'custom-node-io-group';
process.env.NODE_IO_KAFKA_CLIENT_ID = 'custom-node-io-client';
const repository = {
recordStart: async () => undefined,
recordCompletion: async () => undefined,
} as unknown as NodeIORepository;
const service = new NodeIOIngestService(repository) as unknown as {
kafkaGroupId: string;
kafkaClientId: string;
};

expect(service.kafkaGroupId).toBe('custom-node-io-group');
expect(service.kafkaClientId).toBe('custom-node-io-client');
});
});
11 changes: 9 additions & 2 deletions backend/src/node-io/node-io-ingest.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,16 @@ export class NodeIOIngestService implements OnModuleInit, OnModuleDestroy {
// Use instance-aware topic name
const topicResolver = getTopicResolver();
this.kafkaTopic = topicResolver.getNodeIOTopic();
const instanceId = process.env.SHIPSEC_INSTANCE;
const defaultGroupId = instanceId
? `shipsec-node-io-ingestor-${instanceId}`
: 'shipsec-node-io-ingestor';
const defaultClientId = instanceId
? `shipsec-backend-node-io-${instanceId}`
: 'shipsec-backend-node-io';

this.kafkaGroupId = process.env.NODE_IO_KAFKA_GROUP_ID ?? 'shipsec-node-io-ingestor';
this.kafkaClientId = process.env.NODE_IO_KAFKA_CLIENT_ID ?? 'shipsec-backend-node-io';
this.kafkaGroupId = process.env.NODE_IO_KAFKA_GROUP_ID ?? defaultGroupId;
this.kafkaClientId = process.env.NODE_IO_KAFKA_CLIENT_ID ?? defaultClientId;
}

async onModuleInit(): Promise<void> {
Expand Down
10 changes: 10 additions & 0 deletions pm2.config.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,16 @@ module.exports = {
EVENT_KAFKA_TOPIC: process.env.EVENT_KAFKA_TOPIC || 'telemetry.events',
EVENT_KAFKA_CLIENT_ID: process.env.EVENT_KAFKA_CLIENT_ID || `shipsec-backend-events-${instanceNum}`,
EVENT_KAFKA_GROUP_ID: process.env.EVENT_KAFKA_GROUP_ID || `shipsec-event-ingestor-${instanceNum}`,
NODE_IO_KAFKA_TOPIC: process.env.NODE_IO_KAFKA_TOPIC || 'telemetry.node-io',
NODE_IO_KAFKA_CLIENT_ID:
process.env.NODE_IO_KAFKA_CLIENT_ID || `shipsec-backend-node-io-${instanceNum}`,
NODE_IO_KAFKA_GROUP_ID:
process.env.NODE_IO_KAFKA_GROUP_ID || `shipsec-node-io-ingestor-${instanceNum}`,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve legacy Kafka group defaults when instance is unset

In the backend PM2 env block, this default always appends -${instanceNum} even when SHIPSEC_INSTANCE is not provided, because instanceNum falls back to '0' earlier in the file. That means PM2-launched backends without SHIPSEC_INSTANCE now get new group IDs (same pattern also exists for agent trace below), so the service-level “legacy fallback when unset” is bypassed and consumers can replay from the beginning (fromBeginning: true), duplicating ingest data for existing single-instance PM2 deployments.

Useful? React with 👍 / 👎.

AGENT_TRACE_KAFKA_TOPIC: process.env.AGENT_TRACE_KAFKA_TOPIC || 'telemetry.agent-trace',
AGENT_TRACE_KAFKA_CLIENT_ID:
process.env.AGENT_TRACE_KAFKA_CLIENT_ID || `shipsec-backend-agent-trace-${instanceNum}`,
AGENT_TRACE_KAFKA_GROUP_ID:
process.env.AGENT_TRACE_KAFKA_GROUP_ID || `shipsec-agent-trace-ingestor-${instanceNum}`,
ENABLE_INGEST_SERVICES: process.env.ENABLE_INGEST_SERVICES || 'true',
INTERNAL_SERVICE_TOKEN: process.env.INTERNAL_SERVICE_TOKEN || 'local-internal-token',
TEMPORAL_ADDRESS: process.env.TEMPORAL_ADDRESS || 'localhost:7233',
Expand Down