Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
decodeMultipleValuesQuery,
fetchOrderFilledEventsBounded,
generateAIDeepLink,
getFailedProposal,
getNotifiedProposals,
getOrderFilledEvents,
getPolymarketMarketInformation,
Expand All @@ -28,7 +29,9 @@ import {
ONE_SCALED,
POLYGON_BLOCKS_PER_HOUR,
PolymarketTradeInformation,
removeFailedProposal,
shouldIgnoreThirdPartyProposal,
storeFailedProposal,
storeNotifiedProposals,
Logger,
Market,
Expand Down Expand Up @@ -246,8 +249,53 @@ export async function monitorTransactionsProposedOrderBook(

/**
 * Handles a failed verification of `proposal`.
 *
 * - With `params.failureGracePeriodSeconds <= 0` the failure is alerted immediately and the
 *   proposal is persisted as notified (the behavior before the grace period existed).
 * - Otherwise the first failure is only recorded and logged as a warning. Later failures keep
 *   updating the record until the time since the first failure reaches the grace period; only
 *   then is the alert sent, the proposal persisted as notified, and the failure record removed.
 *
 * NOTE(review): `getFailedProposal` resolves `null` when no datastore is configured, so in that
 * setup every failure looks like the first one and the alert is never sent — confirm the grace
 * period is set to 0 wherever the datastore is unavailable.
 *
 * @param proposal The proposal whose check threw.
 * @param err The error raised by the check; its message is stored and logged.
 */
const logErrorAndPersist = async (proposal: OptimisticPriceRequest, err: Error) => {
  const aiDeeplink = generateAIDeepLink(proposal.proposalHash, proposal.proposalLogIndex, params.aiResultsBaseUrl);

  // When grace period is 0, alert immediately (original behavior).
  if (params.failureGracePeriodSeconds <= 0) {
    await logFailedMarketProposalVerification(logger, params.chainId, proposal, err, aiDeeplink);
    await persistNotified(proposal, logger);
    return;
  }

  const proposalKey = getProposalKeyToStore(proposal);
  const failureRecord = await getFailedProposal(proposalKey);
  const now = Date.now();

  // First failure seen for this proposal: start the grace period clock and only warn.
  if (!failureRecord) {
    await storeFailedProposal(proposalKey, {
      firstFailureAt: new Date(now).toISOString(),
      failureCount: 1,
      lastError: err.message,
    });
    logger.warn({
      at: "PolymarketMonitor",
      message: "Check failed, will retry before alerting",
      proposalHash: proposal.proposalHash,
      error: err.message,
    });
    return;
  }

  // Repeated failure: keep `firstFailureAt` untouched so the grace period is measured from the
  // first failure, and only bump the counter / last error.
  // An unparseable `firstFailureAt` yields NaN, the comparison is false, and we fall through to alert.
  const elapsed = now - new Date(failureRecord.firstFailureAt).getTime();
  if (elapsed < params.failureGracePeriodSeconds * 1000) {
    await storeFailedProposal(proposalKey, {
      ...failureRecord,
      failureCount: failureRecord.failureCount + 1,
      lastError: err.message,
    });
    logger.warn({
      at: "PolymarketMonitor",
      message: `Check failed (attempt ${failureRecord.failureCount + 1}), still within grace period`,
      proposalHash: proposal.proposalHash,
      error: err.message,
    });
    return;
  }

  // Grace period exceeded — alert and persist as notified.
  await logFailedMarketProposalVerification(logger, params.chainId, proposal, err, aiDeeplink);
  await persistNotified(proposal, logger);
  await removeFailedProposal(proposalKey);
};

await Promise.all(
Expand Down Expand Up @@ -386,6 +434,7 @@ export async function monitorTransactionsProposedOrderBook(
tradeFilterFromTimestamp,
});
if (alerted) await persistNotified(proposal, logger);
if (params.failureGracePeriodSeconds > 0) await removeFailedProposal(getProposalKeyToStore(proposal));
} catch (err) {
await logErrorAndPersist(proposal, err as Error);
}
Expand Down
39 changes: 37 additions & 2 deletions packages/monitor-v2/src/monitor-polymarket/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ export interface MonitoringParams {
paginatedEventQueryConcurrency: number;
maxTradesPerToken: number;
fillEventsChunkBlocks: number;
failureGracePeriodSeconds: number;
}
interface PolymarketMarketGraphql {
question: string;
Expand Down Expand Up @@ -851,6 +852,35 @@ export const getNotifiedProposals = async (): Promise<{
}, {});
};

/** Failure bookkeeping for one proposal, saved under the "FailedProposals" key path. */
export interface FailedProposalRecord {
/** ISO-8601 timestamp (from `Date#toISOString`) of the first failed check; the grace period is measured from it. */
firstFailureAt: string;
/** Number of failed checks recorded so far; starts at 1 and is incremented on each later failure within the grace period. */
failureCount: number;
/** `message` of the most recent error recorded for the proposal. */
lastError: string;
}

/**
 * Looks up the failure record saved for a proposal.
 *
 * @param proposalKey Identifier used as the entity name under "FailedProposals".
 * @returns The stored record, or `null` when no datastore is configured or nothing is stored.
 */
export const getFailedProposal = async (proposalKey: string): Promise<FailedProposalRecord | null> => {
  if (datastore) {
    const [stored] = await datastore.get(datastore.key(["FailedProposals", proposalKey]));
    // The entity is trusted to have the record shape; it is not validated here.
    if (stored) return stored as FailedProposalRecord;
  }
  return null;
};

/**
 * Saves (creating or overwriting) the failure record for a proposal.
 * Does nothing when no datastore is configured.
 *
 * @param proposalKey Identifier used as the entity name under "FailedProposals".
 * @param record Failure bookkeeping to persist.
 */
export const storeFailedProposal = async (proposalKey: string, record: FailedProposalRecord): Promise<void> => {
  if (datastore) {
    const entityKey = datastore.key(["FailedProposals", proposalKey]);
    await datastore.save({ key: entityKey, data: record });
  }
};

/**
 * Best-effort removal of the failure record for a proposal.
 * Does nothing when no datastore is configured and never rejects: any error raised while
 * building the key or deleting the entity is swallowed.
 *
 * @param proposalKey Identifier used as the entity name under "FailedProposals".
 */
export const removeFailedProposal = async (proposalKey: string): Promise<void> => {
  if (datastore) {
    try {
      await datastore.delete(datastore.key(["FailedProposals", proposalKey]));
    } catch {
      // Ignore — entity may not exist
    }
  }
};

export const parseEnvList = (env: NodeJS.ProcessEnv, key: string, defaultValue: string[]): string[] => {
const rawValue = env[key];
if (!rawValue) return defaultValue;
Expand Down Expand Up @@ -895,8 +925,8 @@ export const initMonitoringParams = async (
const pollingDelay = env.POLLING_DELAY ? Number(env.POLLING_DELAY) : 60;

const maxBlockLookBack = env.MAX_BLOCK_LOOK_BACK ? Number(env.MAX_BLOCK_LOOK_BACK) : 3499;
const retryAttempts = env.RETRY_ATTEMPTS ? Number(env.RETRY_ATTEMPTS) : 1;
const retryDelayMs = env.RETRY_DELAY_MS ? Number(env.RETRY_DELAY_MS) : 0;
const retryAttempts = env.RETRY_ATTEMPTS ? Number(env.RETRY_ATTEMPTS) : 3;
const retryDelayMs = env.RETRY_DELAY_MS ? Number(env.RETRY_DELAY_MS) : 1000;

const unknownProposalNotificationInterval = env.UNKNOWN_PROPOSAL_NOTIFICATION_INTERVAL
? Number(env.UNKNOWN_PROPOSAL_NOTIFICATION_INTERVAL)
Expand Down Expand Up @@ -936,6 +966,10 @@ export const initMonitoringParams = async (

const orderBookBatchSize = env.ORDER_BOOK_BATCH_SIZE ? Number(env.ORDER_BOOK_BATCH_SIZE) : 499;

// Grace period before alerting on failures. 0 = alert immediately (original behavior).
// Default 630s = 10.5 minutes = 2 consecutive 5-minute serverless runs + 30s buffer.
const failureGracePeriodSeconds = env.FAILURE_GRACE_PERIOD_SECONDS ? Number(env.FAILURE_GRACE_PERIOD_SECONDS) : 630;

// Rate limit and retry with exponential backoff and jitter to handle rate limiting and errors from the APIs.
const httpClient = createHttpClient({
axios: { timeout: httpTimeout },
Expand Down Expand Up @@ -985,6 +1019,7 @@ export const initMonitoringParams = async (
paginatedEventQueryConcurrency,
maxTradesPerToken,
fillEventsChunkBlocks,
failureGracePeriodSeconds,
};
};

Expand Down
211 changes: 211 additions & 0 deletions packages/monitor-v2/test/PolymarketMonitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ describe("PolymarketNotifier", function () {
paginatedEventQueryConcurrency: 5,
maxTradesPerToken: 50,
fillEventsChunkBlocks: 30,
failureGracePeriodSeconds: 0, // Disable grace period by default to preserve existing test behavior
};
};

Expand Down Expand Up @@ -167,6 +168,11 @@ describe("PolymarketNotifier", function () {
// Tests that need to override this should call fetchBoundedStub.restore() first
fetchBoundedStub = sandbox.stub(commonModule, "fetchOrderFilledEventsBounded").resolves(new Map());

// Stub failure grace period Datastore functions (no-ops by default)
sandbox.stub(commonModule, "getFailedProposal").resolves(null);
sandbox.stub(commonModule, "storeFailedProposal").resolves();
sandbox.stub(commonModule, "removeFailedProposal").resolves();

// Fund staker and stake tokens.
const TEN_MILLION = ethers.utils.parseEther("10000000");
await (await votingToken.addMinter(await deployer.getAddress())).wait();
Expand Down Expand Up @@ -1393,4 +1399,209 @@ describe("PolymarketNotifier", function () {
assert.deepEqual(result[1], [], "token2 returns empty array");
});
});

describe("failure grace period", function () {
  // Text contained in the error-level notification the tests look for.
  const FAILED_VERIFICATION_MESSAGE = "Failed to verify proposed market";

  type MonitorParams = Parameters<typeof monitorTransactionsProposedOrderBook>[1];

  // Default monitoring params with only the failure grace period overridden.
  const makeGracePeriodParams = async (gracePeriodSeconds: number) => {
    const base = await createMonitoringParams();
    return { ...base, failureGracePeriodSeconds: gracePeriodSeconds };
  };

  // Minimal proposal fixture returned by the stubbed proposal query.
  const makeMockProposal = async (): Promise<OptimisticPriceRequest> => ({
    proposalHash: "0xgracetest",
    requester: await deployer.getAddress(),
    proposer: await deployer.getAddress(),
    identifier: "0x5945535f4f525f4e4f5f51554552590000000000000000000000000000000000",
    proposedPrice: ONE,
    requestTimestamp: ethers.BigNumber.from(Date.now()),
    proposalBlockNumber: 12345,
    ancillaryData: ethers.utils.hexlify(ancillaryData),
    requestHash: "0xrequesthash",
    requestLogIndex: 0,
    proposalTimestamp: ethers.BigNumber.from(Date.now()),
    proposalExpirationTimestamp: ethers.BigNumber.from(Date.now() + 1000 * 60 * 60 * 24),
    proposalLogIndex: 0,
  });

  // Makes the proposal query return exactly one proposal, and only for the "v2" version.
  const stubSingleV2Proposal = async (): Promise<void> => {
    const mockProposal = await makeMockProposal();
    sandbox.stub(commonModule, "getPolymarketProposedPriceRequestsOO").callsFake(async (_p, v) => {
      return v === "v2" ? [mockProposal] : [];
    });
  };

  // Makes the market information lookup fail so the monitor takes its failure path.
  const stubFailingMarketCheck = (): void => {
    mockFunctionThrowsError("getPolymarketMarketInformation", "Network timeout");
    mockFunctionWithReturnValue("getPolymarketOrderBooks", asBooksRecord(emptyOrders));
    mockSyncFunctionWithReturnValue("getOrderFilledEvents", emptyTradeInformation);
  };

  // Simulates a failure record whose first failure happened `minutesSinceFirstFailure` minutes ago.
  const stubExistingFailureRecord = (minutesSinceFirstFailure: number, failureCount: number): void => {
    (commonModule.getFailedProposal as sinon.SinonStub).resolves({
      firstFailureAt: new Date(Date.now() - minutesSinceFirstFailure * 60 * 1000).toISOString(),
      failureCount,
      lastError: "Previous error",
    });
  };

  // Requests and proposes a price on-chain, runs the monitor once and returns every logged entry.
  const proposeAndRunMonitor = async (params: MonitorParams) => {
    await oov2.requestPrice(identifier, 1, ancillaryData, votingToken.address, 0);
    await oov2.proposePrice(await deployer.getAddress(), identifier, 1, ancillaryData, ONE);

    const spy = sinon.spy();
    const spyLogger = createNewLogger([new SpyTransport({}, { spy })]);
    await monitorTransactionsProposedOrderBook(spyLogger, params);
    return spy.getCalls().map((c) => c.lastArg);
  };

  // Entries that are the "Failed to verify" error notification.
  const failedVerificationAlerts = (logs: Array<{ message?: string } | undefined>) =>
    logs.filter((a) => a?.message?.includes(FAILED_VERIFICATION_MESSAGE));

  it("First failure within grace period: warns but does not alert", async function () {
    const params = await makeGracePeriodParams(900); // 15 min grace period

    await stubSingleV2Proposal();
    stubFailingMarketCheck();

    // getFailedProposal returns null (first failure)
    // storeFailedProposal and removeFailedProposal already stubbed in beforeEach

    const logs = await proposeAndRunMonitor(params);

    // Should get a warn log, NOT an error alert
    const warnLogs = logs.filter((a) => a?.message === "Check failed, will retry before alerting");
    assert.equal(warnLogs.length, 1, "Should log a warning for first failure");

    // Should NOT have the "Failed to verify" error notification
    assert.equal(failedVerificationAlerts(logs).length, 0, "Should not send error notification on first failure");

    // storeFailedProposal should have been called
    assert.isTrue((commonModule.storeFailedProposal as sinon.SinonStub).calledOnce, "Should store failure record");
  });

  it("Subsequent failure within grace period: warns with attempt count", async function () {
    const params = await makeGracePeriodParams(900);

    await stubSingleV2Proposal();
    stubFailingMarketCheck();

    // Simulate existing failure record from a recent first failure (within grace period)
    stubExistingFailureRecord(5, 1); // 5 min ago

    const logs = await proposeAndRunMonitor(params);

    // Should get a warn log with attempt count
    const warnLogs = logs.filter((a) => a?.message?.includes("still within grace period"));
    assert.equal(warnLogs.length, 1, "Should log retry warning");
    assert.include(warnLogs[0].message, "attempt 2", "Should include attempt count");

    // Should NOT have error notification
    assert.equal(failedVerificationAlerts(logs).length, 0, "Should not alert within grace period");
  });

  it("Failure after grace period exceeded: sends alert and cleans up", async function () {
    const params = await makeGracePeriodParams(900);

    await stubSingleV2Proposal();
    stubFailingMarketCheck();

    // Simulate failure record from 20 min ago (exceeds 15 min grace period)
    stubExistingFailureRecord(20, 3); // 20 min ago

    const logs = await proposeAndRunMonitor(params);

    // Should now have the "Failed to verify" error notification
    assert.equal(failedVerificationAlerts(logs).length, 1, "Should send error alert after grace period");

    // removeFailedProposal should have been called to clean up
    assert.isTrue(
      (commonModule.removeFailedProposal as sinon.SinonStub).called,
      "Should remove failure record after alerting"
    );
  });

  it("Successful check clears any prior failure record", async function () {
    const params = await makeGracePeriodParams(900);

    await stubSingleV2Proposal();

    // Every dependency of the check succeeds, so the monitor takes its success path.
    sandbox.stub(commonModule, "getPolymarketMarketInformation").resolves(marketInfo);
    sandbox.stub(commonModule, "getPolymarketOrderBooks").resolves(asBooksRecord(emptyOrders));
    sandbox.stub(commonModule, "getOrderFilledEvents").returns([[], []]);
    sandbox.stub(commonModule, "isInitialConfirmationLogged").resolves(true);

    await proposeAndRunMonitor(params);

    // removeFailedProposal should have been called on the success path
    assert.isTrue(
      (commonModule.removeFailedProposal as sinon.SinonStub).called,
      "Should clean up failure record on success"
    );
  });

  it("Grace period of 0 alerts immediately (backward-compatible)", async function () {
    const params = await makeGracePeriodParams(0);

    await stubSingleV2Proposal();
    stubFailingMarketCheck();

    const logs = await proposeAndRunMonitor(params);

    // Should get the immediate error alert (no grace period)
    assert.equal(failedVerificationAlerts(logs).length, 1, "Grace period of 0 should alert immediately");

    // Should NOT have called storeFailedProposal (skips grace period logic entirely)
    assert.isFalse(
      (commonModule.storeFailedProposal as sinon.SinonStub).called,
      "Should not store failure record when grace period is 0"
    );
  });
});
});