Skip to content

Commit c33d57a

Browse files
committed
feat: implement caching for user admission check
1 parent 12688ec commit c33d57a

File tree

8 files changed

+119
-14
lines changed

8 files changed

+119
-14
lines changed

package-lock.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@
8686
"@types/express": "4.17.15",
8787
"@types/js-yaml": "4.0.5",
8888
"@types/mocha": "^9.1.1",
89-
"@types/node": "^24.0.0",
89+
"@types/node": "^24.12.2",
9090
"@types/pg": "^8.6.5",
9191
"@types/ramda": "^0.28.13",
9292
"@types/sinon": "^10.0.11",

src/@types/adapters.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ export type IWebSocketAdapter = EventEmitter & {
2121
export interface ICacheAdapter {
2222
getKey(key: string): Promise<string>
2323
hasKey(key: string): Promise<boolean>
24-
setKey(key: string, value: string): Promise<boolean>
24+
setKey(key: string, value: string, expirySeconds?: number): Promise<boolean>
2525
addToSortedSet(key: string, set: Record<string, string> | Record<string, string>[]): Promise<number>
2626
removeRangeByScoreFromSortedSet(key: string, min: number, max: number): Promise<number>
2727
getRangeFromSortedSet(key: string, start: number, stop: number): Promise<string[]>

src/adapters/redis-adapter.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ export class RedisAdapter implements ICacheAdapter {
4242
}
4343

4444
private onClientError(error: Error) {
45-
console.error('Unable to connect to Redis.', error)
45+
debug('Unable to connect to Redis.', error)
4646
// throw error
4747
}
4848

@@ -58,9 +58,12 @@ export class RedisAdapter implements ICacheAdapter {
5858
return this.client.get(key)
5959
}
6060

61-
public async setKey(key: string, value: string): Promise<boolean> {
61+
public async setKey(key: string, value: string, expirySeconds?: number): Promise<boolean> {
6262
await this.connection
63-
debug('get %s key', key)
63+
debug('set %s key', key)
64+
if (typeof expirySeconds === 'number') {
65+
return 'OK' === await this.client.set(key, value, { EX: expirySeconds })
66+
}
6467
return 'OK' === await this.client.set(key, value)
6568
}
6669

src/constants/caching.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
export enum CacheAdmissionState {
2+
ADMITTED = 'admitted',
3+
BLOCKED_NOT_ADMITTED = 'blocked_not_admitted',
4+
BLOCKED_INSUFFICIENT_BALANCE = 'blocked_insufficient_balance',
5+
}

src/factories/message-handler-factory.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,23 @@
1+
import { ICacheAdapter, IWebSocketAdapter } from '../@types/adapters'
12
import { IEventRepository, IUserRepository } from '../@types/repositories'
23
import { IncomingMessage, MessageType } from '../@types/messages'
34
import { createSettings } from './settings-factory'
45
import { EventMessageHandler } from '../handlers/event-message-handler'
56
import { eventStrategyFactory } from './event-strategy-factory'
6-
import { IWebSocketAdapter } from '../@types/adapters'
7+
import { getCacheClient } from '../cache/client'
8+
import { RedisAdapter } from '../adapters/redis-adapter'
79
import { slidingWindowRateLimiterFactory } from './rate-limiter-factory'
810
import { SubscribeMessageHandler } from '../handlers/subscribe-message-handler'
911
import { UnsubscribeMessageHandler } from '../handlers/unsubscribe-message-handler'
1012

13+
let cacheAdapter: ICacheAdapter | undefined = undefined
14+
const getCache = (): ICacheAdapter => {
15+
if (!cacheAdapter) {
16+
cacheAdapter = new RedisAdapter(getCacheClient())
17+
}
18+
return cacheAdapter
19+
}
20+
1121
export const messageHandlerFactory = (
1222
eventRepository: IEventRepository,
1323
userRepository: IUserRepository,
@@ -22,6 +32,7 @@ export const messageHandlerFactory = (
2232
userRepository,
2333
createSettings,
2434
slidingWindowRateLimiterFactory,
35+
getCache(),
2536
)
2637
}
2738
case MessageType.REQ:

src/handlers/event-message-handler.ts

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@ import {
1515
} from '../utils/event'
1616
import { IEventRepository, IUserRepository } from '../@types/repositories'
1717
import { IEventStrategy, IMessageHandler } from '../@types/message-handlers'
18+
import { CacheAdmissionState } from '../constants/caching'
1819
import { createCommandResult } from '../utils/messages'
1920
import { createLogger } from '../factories/logger-factory'
2021
import { Factory } from '../@types/base'
22+
import { ICacheAdapter } from '../@types/adapters'
2123
import { IncomingEventMessage } from '../@types/messages'
2224
import { IRateLimiter } from '../@types/utils'
2325
import { IWebSocketAdapter } from '../@types/adapters'
@@ -33,6 +35,7 @@ export class EventMessageHandler implements IMessageHandler {
3335
protected readonly userRepository: IUserRepository,
3436
private readonly settings: () => Settings,
3537
private readonly slidingWindowRateLimiter: Factory<IRateLimiter>,
38+
private readonly cache: ICacheAdapter,
3639
) {}
3740

3841
public async handleMessage(message: IncomingEventMessage): Promise<void> {
@@ -92,7 +95,6 @@ export class EventMessageHandler implements IMessageHandler {
9295
try {
9396
await strategy.execute(event)
9497
} catch (error) {
95-
console.error('error handling message', message, error)
9698
this.webSocket.emit(WebSocketAdapterEvent.Message, createCommandResult(event.id, false, 'error: unable to process event'))
9799
}
98100
}
@@ -313,17 +315,44 @@ export class EventMessageHandler implements IMessageHandler {
313315
return
314316
}
315317

316-
// const hasKey = await this.cache.hasKey(`${event.pubkey}:is-admitted`)
317-
// TODO: use cache
318+
const cacheKey = `${event.pubkey}:is-admitted`
319+
320+
try {
321+
const cachedValue = await this.cache.getKey(cacheKey)
322+
if (cachedValue === CacheAdmissionState.ADMITTED) {
323+
debug('cache hit for %s admission: admitted', event.pubkey)
324+
return
325+
}
326+
if (cachedValue === CacheAdmissionState.BLOCKED_NOT_ADMITTED) {
327+
debug('cache hit for %s admission: blocked', event.pubkey)
328+
return 'blocked: pubkey not admitted'
329+
}
330+
if (cachedValue === CacheAdmissionState.BLOCKED_INSUFFICIENT_BALANCE) {
331+
debug('cache hit for %s admission: insufficient balance', event.pubkey)
332+
return 'blocked: insufficient balance'
333+
}
334+
} catch (error) {
335+
debug('cache error for %s: %o', event.pubkey, error)
336+
}
337+
318338
const user = await this.userRepository.findByPubkey(event.pubkey)
319339
if (!user || !user.isAdmitted) {
340+
this.cacheSet(cacheKey, CacheAdmissionState.BLOCKED_NOT_ADMITTED, 60)
320341
return 'blocked: pubkey not admitted'
321342
}
322343

323344
const minBalance = currentSettings.limits?.event?.pubkey?.minBalance ?? 0n
324345
if (minBalance > 0n && user.balance < minBalance) {
346+
this.cacheSet(cacheKey, CacheAdmissionState.BLOCKED_INSUFFICIENT_BALANCE, 60)
325347
return 'blocked: insufficient balance'
326348
}
349+
350+
this.cacheSet(cacheKey, CacheAdmissionState.ADMITTED, 300)
351+
}
352+
353+
private cacheSet(key: string, value: string, ttl: number): void {
354+
this.cache.setKey(key, value, ttl)
355+
.catch((error) => debug('unable to cache %s: %o', key, error))
327356
}
328357

329358
protected addExpirationMetadata(event: Event): Event | ExpiringEvent {

test/unit/handlers/event-message-handler.spec.ts

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ chai.use(chaiAsPromised)
1010

1111
import { EventLimits, Settings } from '../../../src/@types/settings'
1212
import { IncomingEventMessage, MessageType } from '../../../src/@types/messages'
13+
import { CacheAdmissionState } from '../../../src/constants/caching'
1314
import { Event } from '../../../src/@types/event'
1415
import { EventKinds } from '../../../src/constants/base'
1516
import { EventMessageHandler } from '../../../src/handlers/event-message-handler'
@@ -88,7 +89,8 @@ describe('EventMessageHandler', () => {
8889
() => ({
8990
info: { relay_url: 'relay_url' },
9091
}) as any,
91-
() => ({ hit: async () => false })
92+
() => ({ hit: async () => false }),
93+
{ hasKey: async () => false, setKey: async () => true } as any,
9294
)
9395
})
9496

@@ -262,7 +264,8 @@ describe('EventMessageHandler', () => {
262264
{ hasActiveRequestToVanish: async () => false } as any,
263265
userRepository,
264266
() => settings,
265-
() => ({ hit: async () => false })
267+
() => ({ hit: async () => false }),
268+
{ hasKey: async () => false, setKey: async () => true } as any,
266269
)
267270
})
268271

@@ -738,7 +741,8 @@ describe('EventMessageHandler', () => {
738741
{ hasActiveRequestToVanish: async () => false } as any,
739742
userRepository,
740743
() => settings,
741-
() => ({ hit: rateLimiterHitStub })
744+
() => ({ hit: rateLimiterHitStub }),
745+
{ hasKey: async () => false, setKey: async () => true } as any,
742746
)
743747
})
744748

@@ -953,6 +957,7 @@ describe('EventMessageHandler', () => {
953957
let webSocket: IWebSocketAdapter
954958
let getRelayPublicKeyStub: SinonStub
955959
let userRepositoryFindByPubkeyStub: SinonStub
960+
let cacheStub: any
956961

957962
beforeEach(() => {
958963
settings = {
@@ -1000,13 +1005,19 @@ describe('EventMessageHandler', () => {
10001005
userRepository = {
10011006
findByPubkey: userRepositoryFindByPubkeyStub,
10021007
} as any
1008+
cacheStub = {
1009+
hasKey: sandbox.stub().resolves(false),
1010+
getKey: sandbox.stub().resolves(null),
1011+
setKey: sandbox.stub().resolves(true),
1012+
}
10031013
handler = new EventMessageHandler(
10041014
webSocket,
10051015
() => null,
10061016
{ hasActiveRequestToVanish: async () => false } as any,
10071017
userRepository,
10081018
() => settings,
1009-
() => ({ hit: async () => false })
1019+
() => ({ hit: async () => false }),
1020+
cacheStub,
10101021
)
10111022
})
10121023

@@ -1108,5 +1119,51 @@ describe('EventMessageHandler', () => {
11081119

11091120
return expect((handler as any).isUserAdmitted(event)).to.eventually.be.undefined
11101121
})
1122+
1123+
describe('caching', () => {
1124+
it('fulfills with undefined and uses cache hit for admitted user without hitting DB', async () => {
1125+
cacheStub.getKey.resolves(CacheAdmissionState.ADMITTED)
1126+
1127+
await expect((handler as any).isUserAdmitted(event)).to.eventually.be.undefined
1128+
expect(userRepositoryFindByPubkeyStub).not.to.have.been.called
1129+
})
1130+
1131+
it('fulfills with reason and uses cache hit for blocked user without hitting DB', async () => {
1132+
cacheStub.getKey.resolves(CacheAdmissionState.BLOCKED_NOT_ADMITTED)
1133+
1134+
await expect((handler as any).isUserAdmitted(event)).to.eventually.equal('blocked: pubkey not admitted')
1135+
expect(userRepositoryFindByPubkeyStub).not.to.have.been.called
1136+
})
1137+
1138+
it('fulfills with reason and uses cache hit for insufficient balance without hitting DB', async () => {
1139+
cacheStub.getKey.resolves(CacheAdmissionState.BLOCKED_INSUFFICIENT_BALANCE)
1140+
1141+
await expect((handler as any).isUserAdmitted(event)).to.eventually.equal('blocked: insufficient balance')
1142+
expect(userRepositoryFindByPubkeyStub).not.to.have.been.called
1143+
})
1144+
1145+
it('caches blocked status with 60s ttl when user is not found', async () => {
1146+
userRepositoryFindByPubkeyStub.resolves(undefined)
1147+
1148+
await (handler as any).isUserAdmitted(event)
1149+
expect(cacheStub.setKey).to.have.been.calledWith(`${event.pubkey}:is-admitted`, CacheAdmissionState.BLOCKED_NOT_ADMITTED, 60)
1150+
})
1151+
1152+
it('caches insufficient balance status with 60s ttl when user balance is too low', async () => {
1153+
settings.limits.event.pubkey.minBalance = 100n
1154+
userRepositoryFindByPubkeyStub.resolves({ isAdmitted: true, balance: 50n })
1155+
1156+
await (handler as any).isUserAdmitted(event)
1157+
expect(cacheStub.setKey).to.have.been.calledWith(`${event.pubkey}:is-admitted`, CacheAdmissionState.BLOCKED_INSUFFICIENT_BALANCE, 60)
1158+
})
1159+
1160+
it('caches admitted status with 300s ttl when user is admitted and has balance', async () => {
1161+
settings.limits.event.pubkey.minBalance = 100n
1162+
userRepositoryFindByPubkeyStub.resolves({ isAdmitted: true, balance: 150n })
1163+
1164+
await (handler as any).isUserAdmitted(event)
1165+
expect(cacheStub.setKey).to.have.been.calledWith(`${event.pubkey}:is-admitted`, CacheAdmissionState.ADMITTED, 300)
1166+
})
1167+
})
11111168
})
11121169
})

0 commit comments

Comments
 (0)