feat(tasks): add task run event stream sender#2110
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking. |
Prompt To Fix All With AIFix the following 3 code review issues. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 3
packages/agent/src/server/event-stream-sender.test.ts:169-203
**Assertion inside mock is silently swallowed on failure**
`expect(init?.body).toBe("")` on line 172 lives inside the async mock. If the assertion throws, it rejects the promise returned by `fetchMock`, which is caught by `flush()`'s catch block and logged as a warning — the test still passes, masking the regression. The same pattern appears at lines 332–336 in "drains multiple capped batches on stop" where `expect(…X-PostHog-Event-Stream-Complete…).toBeUndefined()` is also inside the mock body. Move these assertions outside the mock (collect call args with `fetchMock.mock.calls` after `await sender.stop()` and assert there) to ensure vitest actually fails the test on violation.
### Issue 2 of 3
packages/agent/src/server/event-stream-sender.test.ts:205-304
**Prefer parameterised tests for drop scenarios**
"drops new events before assigning sequence when the buffer is full" (line 205) and "drops oversized events before assigning sequence" (line 254) test the identical observable behaviour — enqueue two events, the second is silently dropped, only the first reaches the server with `seq: 1`. The only difference is the drop trigger (`maxBufferedEvents: 1` vs `maxEventBytes: 120`). Per the team's rule, these should be a single `it.each` parameterised test sharing the mock and assertion logic rather than two separate near-identical blocks.
### Issue 3 of 3
packages/agent/src/server/event-stream-sender.ts:403-406
**Oversized-event size estimate is larger than the actual serialised size**
The size check serialises the envelope with `Number.MAX_SAFE_INTEGER` (16 digits) as the placeholder `seq`. The actual assigned sequence number will always be far smaller (e.g., 1 digit for `seq: 1`), so `eventBytes` can overestimate the real size by up to ~15 bytes. For events right at the `maxEventBytes` boundary this causes valid events to be incorrectly dropped. A safer upper bound would be the current `this.sequence + 1` (the next seq to be assigned), or a small fixed constant like `999_999` rather than `Number.MAX_SAFE_INTEGER`.
Reviews (1): Last reviewed commit: "feat(tasks): add task run event stream s..." | Re-trigger Greptile |
ba262d3 to
70c5092
Compare
Prompt To Fix All With AIFix the following 1 code review issue. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 1
packages/agent/src/server/event-stream-sender.ts:204-219
**AbortController + timeout fetch pattern repeated three times**
The same `AbortController` / `setTimeout` / `try { await fetch } finally { clearTimeout }` block appears identically in `syncSequenceWithServer` (lines 310–325), `sendBufferedEvents` (lines 204–219), and `completeTransport` (lines 246–265). This violates the team's OnceAndOnlyOnce rule. A private helper such as `fetchWithTimeout(init: RequestInit): Promise<Response>` would centralize the pattern and make future changes (e.g. adjusting abort semantics) a single-site edit.
Reviews (2): Last reviewed commit: "feat(tasks): add task run event stream s..." | Re-trigger Greptile |
joshsny
left a comment
There was a problem hiding this comment.
I don't think we want to go the approach of batching into POST requests in production, the 50ms batching is okay-ish but the time it takes to form the connection, validate the auth in django and post it to the stream could add up to a few hundred ms, which will make the stream very choppy.
I'd suggest we open a POST request and stream the body in and do that over some period (say 5 mins or until disconnect) and then we rely on the same logic of "ask for last event" when reconnecting again
70c5092 to
0c05cf5
Compare
0c05cf5 to
768b082
Compare
|
@joshsny good point, I changed this to avoid relying on a long-lived in-flight POST wo we have the agent server still posts directly to Django with ordered seqs, but it now uses short sequenced NDJSON request windows. That should maintain the reliability/ordering properties while avoiding proxy/tunnel resets and latency concerns from multi steps 🙏🏻 |

Problem
agent server needs a resilient sender for task run live events before it can deliver events directly to the backend. It should preserve event order, tolerate retries, and avoid making agent execution wait on each individual live event write.
Changes
TaskRunEventStreamSenderfor NDJSON event ingest requests to PostHog BE