diff --git a/packages/integration-platform/src/manifests/aws/checks/__tests__/s3-buckets.test.ts b/packages/integration-platform/src/manifests/aws/checks/__tests__/s3-buckets.test.ts new file mode 100644 index 000000000..46ac91f69 --- /dev/null +++ b/packages/integration-platform/src/manifests/aws/checks/__tests__/s3-buckets.test.ts @@ -0,0 +1,175 @@ +import { + GetBucketEncryptionCommand, + GetPublicAccessBlockCommand, + ListBucketsCommand, + type S3Client, +} from '@aws-sdk/client-s3'; +import { describe, expect, it } from 'bun:test'; +import { gatherBuckets } from '../s3-buckets'; + +interface BpaResponse { + PublicAccessBlockConfiguration?: { + BlockPublicAcls?: boolean; + IgnorePublicAcls?: boolean; + BlockPublicPolicy?: boolean; + RestrictPublicBuckets?: boolean; + }; +} + +const FULLY_BLOCKED: BpaResponse = { + PublicAccessBlockConfiguration: { + BlockPublicAcls: true, + IgnorePublicAcls: true, + BlockPublicPolicy: true, + RestrictPublicBuckets: true, + }, +}; + +const namedError = (name: string, message: string): Error => { + const err = new Error(message); + err.name = name; + return err; +}; + +/** Fake S3Client whose `send` dispatches on the command type. */ +function fakeClient(opts: { + buckets: Array<{ name: string; region?: string }>; + onPublicAccess?: (bucket: string) => BpaResponse | Promise; +}): S3Client { + const send = async (command: unknown): Promise => { + if (command instanceof ListBucketsCommand) { + return { + Buckets: opts.buckets.map((b) => ({ + Name: b.name, + BucketRegion: b.region, + })), + ContinuationToken: undefined, + }; + } + if (command instanceof GetPublicAccessBlockCommand) { + const bucket = String(command.input.Bucket); + return opts.onPublicAccess ? await opts.onPublicAccess(bucket) : FULLY_BLOCKED; + } + if (command instanceof GetBucketEncryptionCommand) { + return { ServerSideEncryptionConfiguration: { Rules: [{}] } }; + } + throw new Error('unexpected command'); + }; + return { send } as unknown as S3Client; +} + +describe('gatherBuckets', () => { + it('reads every bucket and preserves input order', async () => { + const client = fakeClient({ + buckets: [{ name: 'a' }, { name: 'b' }, { name: 'c' }], + }); + const infos = await gatherBuckets(client, { + encryption: false, + publicAccess: true, + }); + expect(infos.map((i) => i.name)).toEqual(['a', 'b', 'c']); + for (const info of infos) { + expect(info.publicAccessDetermined).toBe(true); + expect(info.bucketBpa).not.toBeNull(); + } + }); + + it('isolates a per-bucket read failure (one bad bucket cannot fail the run)', async () => { + const client = fakeClient({ + buckets: [{ name: 'ok' }, { name: 'denied' }, { name: 'ok2' }], + onPublicAccess: (bucket) => { + if (bucket === 'denied') { + throw namedError('AccessDenied', 'not authorized to perform'); + } + return FULLY_BLOCKED; + }, + }); + const infos = await gatherBuckets(client, { + encryption: false, + publicAccess: true, + }); + + const denied = infos.find((i) => i.name === 'denied'); + expect(denied?.publicAccessDetermined).toBe(false); + expect(denied?.publicAccessReadFailure).toBeDefined(); + + for (const name of ['ok', 'ok2']) { + const info = infos.find((i) => i.name === name); + expect(info?.publicAccessDetermined).toBe(true); + expect(info?.bucketBpa).not.toBeNull(); + } + }); + + it('treats NoSuchPublicAccessBlockConfiguration as no bucket-level config (a genuine finding, not an error)', async () => { + const client = fakeClient({ + buckets: [{ name: 'nocfg' }], + onPublicAccess: () => { + throw namedError('NoSuchPublicAccessBlockConfiguration', 'none'); + }, + }); + const infos = await gatherBuckets(client, { + encryption: false, + publicAccess: true, + }); + expect(infos[0].publicAccessDetermined).toBe(true); + expect(infos[0].bucketBpa).toBeNull(); + }); + + it('routes a regioned bucket to its per-region client and keeps order', async () => { + const regionalServed: string[] = []; + const baseServed: string[] = []; + const regional = fakeClient({ + buckets: [], + onPublicAccess: (b) => { + regionalServed.push(b); + return FULLY_BLOCKED; + }, + }); + const base = fakeClient({ + buckets: [{ name: 'a' }, { name: 'b', region: 'eu-west-1' }, { name: 'c' }], + onPublicAccess: (b) => { + baseServed.push(b); + return FULLY_BLOCKED; + }, + }); + + const infos = await gatherBuckets(base, { + encryption: false, + publicAccess: true, + clientForRegion: () => regional, + }); + + expect(infos.map((i) => i.name)).toEqual(['a', 'b', 'c']); + expect(regionalServed).toContain('b'); + expect(baseServed).toContain('a'); + expect(baseServed).toContain('c'); + expect(baseServed).not.toContain('b'); + }); + + it('reads buckets concurrently but bounded (regression guard: serial reads time out the gateway)', async () => { + const buckets = Array.from({ length: 50 }, (_, i) => ({ name: `b${i}` })); + let inFlight = 0; + let maxInFlight = 0; + const client = fakeClient({ + buckets, + onPublicAccess: async () => { + inFlight++; + maxInFlight = Math.max(maxInFlight, inFlight); + await new Promise((resolve) => setTimeout(resolve, 5)); + inFlight--; + return FULLY_BLOCKED; + }, + }); + + const infos = await gatherBuckets(client, { + encryption: false, + publicAccess: true, + }); + + expect(infos).toHaveLength(50); + // Serial execution would never exceed 1 in flight — this is the regression. + expect(maxInFlight).toBeGreaterThan(1); + // ...but it must stay bounded (BUCKET_READ_CONCURRENCY = 20). + expect(maxInFlight).toBeLessThanOrEqual(20); + }); +}); diff --git a/packages/integration-platform/src/manifests/aws/checks/s3-buckets.ts b/packages/integration-platform/src/manifests/aws/checks/s3-buckets.ts index 7dea96ab4..c9616afee 100644 --- a/packages/integration-platform/src/manifests/aws/checks/s3-buckets.ts +++ b/packages/integration-platform/src/manifests/aws/checks/s3-buckets.ts @@ -95,6 +95,111 @@ export async function listAllBuckets( } } +/** + * Per-bucket reads are independent and idempotent, so run them with bounded + * concurrency. Reading a large bucket fleet serially was exceeding the API + * gateway's idle timeout on the scheduled/auto run path (surfacing to the + * caller as a 504); a bounded pool keeps even thousands of buckets well under + * that ceiling without opening an unbounded number of sockets. + */ +const BUCKET_READ_CONCURRENCY = 20; + +/** Run `fn` over `items` with at most `limit` in flight, preserving order. */ +async function mapWithConcurrency( + items: T[], + limit: number, + fn: (item: T) => Promise, +): Promise { + const results = new Array(items.length); + let cursor = 0; + const worker = async (): Promise => { + while (cursor < items.length) { + const index = cursor++; + results[index] = await fn(items[index]); + } + }; + const workerCount = Math.min(Math.max(1, limit), items.length); + await Promise.all(Array.from({ length: workerCount }, () => worker())); + return results; +} + +interface BucketReadOptions { + encryption: boolean; + publicAccess: boolean; + log?: (message: string) => void; +} + +/** + * Read one bucket's encryption + Block Public Access posture. Never throws: a + * read error is recorded on the returned info (`*Determined: false`) so one bad + * bucket cannot fail the whole run. + */ +async function readBucketInfo( + client: S3Client, + name: string, + opts: BucketReadOptions, +): Promise { + let encrypted = false; + let encryptionDetermined = true; + let encryptionReadFailure: ReadFailure | undefined; + let bucketBpa: BpaFlags | null = null; + let publicAccessDetermined = true; + let publicAccessReadFailure: ReadFailure | undefined; + + if (opts.encryption) { + try { + const enc = await client.send(new GetBucketEncryptionCommand({ Bucket: name })); + encrypted = (enc.ServerSideEncryptionConfiguration?.Rules?.length ?? 0) > 0; + } catch (err) { + // "no encryption configured" is a genuine finding; any other error + // (permissions/transient) is indeterminate → exclude from evaluation. + if (err instanceof Error && /ServerSideEncryptionConfigurationNotFound/i.test(err.name)) { + encrypted = false; + } else { + encryptionDetermined = false; + encryptionReadFailure = toReadFailure(err); + opts.log?.(`S3 ${name}: encryption read failed — ${encryptionReadFailure.error}`); + } + } + } + + if (opts.publicAccess) { + try { + const pab = await client.send(new GetPublicAccessBlockCommand({ Bucket: name })); + const c = pab.PublicAccessBlockConfiguration; + bucketBpa = { + blockPublicAcls: Boolean(c?.BlockPublicAcls), + ignorePublicAcls: Boolean(c?.IgnorePublicAcls), + blockPublicPolicy: Boolean(c?.BlockPublicPolicy), + restrictPublicBuckets: Boolean(c?.RestrictPublicBuckets), + }; + } catch (err) { + // "no bucket-level config" is a genuine finding (account-level may still + // cover it); any other error (AccessDenied/transient) is indeterminate → + // exclude from evaluation so we don't report a false public-access failure. + if (err instanceof Error && /NoSuchPublicAccessBlockConfiguration/i.test(err.name)) { + bucketBpa = null; // no bucket-level config + } else { + publicAccessDetermined = false; + publicAccessReadFailure = toReadFailure(err); + opts.log?.( + `S3 ${name}: Block Public Access read failed — ${publicAccessReadFailure.error}`, + ); + } + } + } + + return { + name, + encrypted, + encryptionDetermined, + encryptionReadFailure, + bucketBpa, + publicAccessDetermined, + publicAccessReadFailure, + }; +} + export async function gatherBuckets( s3: S3Client, opts: { @@ -107,75 +212,8 @@ export async function gatherBuckets( ): Promise { const buckets = await listAllBuckets(s3, opts.log); - const infos: S3BucketInfo[] = []; - for (const { name, region } of buckets) { - const client = - region && opts.clientForRegion ? opts.clientForRegion(region) : s3; - let encrypted = false; - let encryptionDetermined = true; - let encryptionReadFailure: ReadFailure | undefined; - let bucketBpa: BpaFlags | null = null; - let publicAccessDetermined = true; - let publicAccessReadFailure: ReadFailure | undefined; - - if (opts.encryption) { - try { - const enc = await client.send(new GetBucketEncryptionCommand({ Bucket: name })); - encrypted = (enc.ServerSideEncryptionConfiguration?.Rules?.length ?? 0) > 0; - } catch (err) { - // "no encryption configured" is a genuine finding; any other error - // (permissions/transient) is indeterminate → exclude from evaluation. - if ( - err instanceof Error && - /ServerSideEncryptionConfigurationNotFound/i.test(err.name) - ) { - encrypted = false; - } else { - encryptionDetermined = false; - encryptionReadFailure = toReadFailure(err); - opts.log?.( - `S3 ${name}: encryption read failed — ${encryptionReadFailure.error}`, - ); - } - } - } - if (opts.publicAccess) { - try { - const pab = await client.send(new GetPublicAccessBlockCommand({ Bucket: name })); - const c = pab.PublicAccessBlockConfiguration; - bucketBpa = { - blockPublicAcls: Boolean(c?.BlockPublicAcls), - ignorePublicAcls: Boolean(c?.IgnorePublicAcls), - blockPublicPolicy: Boolean(c?.BlockPublicPolicy), - restrictPublicBuckets: Boolean(c?.RestrictPublicBuckets), - }; - } catch (err) { - // "no bucket-level config" is a genuine finding (account-level may still - // cover it); any other error (AccessDenied/transient) is indeterminate → - // exclude from evaluation so we don't report a false public-access failure. - if ( - err instanceof Error && - /NoSuchPublicAccessBlockConfiguration/i.test(err.name) - ) { - bucketBpa = null; // no bucket-level config - } else { - publicAccessDetermined = false; - publicAccessReadFailure = toReadFailure(err); - opts.log?.( - `S3 ${name}: Block Public Access read failed — ${publicAccessReadFailure.error}`, - ); - } - } - } - infos.push({ - name, - encrypted, - encryptionDetermined, - encryptionReadFailure, - bucketBpa, - publicAccessDetermined, - publicAccessReadFailure, - }); - } - return infos; + return mapWithConcurrency(buckets, BUCKET_READ_CONCURRENCY, ({ name, region }) => { + const client = region && opts.clientForRegion ? opts.clientForRegion(region) : s3; + return readBucketInfo(client, name, opts); + }); }