feat(tasks): add task run event stream sender #2110

Open
tatoalo wants to merge 2 commits into main from feat/tasks-event-stream-sender

Conversation

@tatoalo
Contributor

@tatoalo tatoalo commented May 8, 2026

Problem

The agent server needs a resilient sender for task run live events before it can deliver events directly to the backend. The sender should preserve event order, tolerate retries, and avoid making agent execution wait on each individual live event write.

Changes

  • added TaskRunEventStreamSender for NDJSON event ingest requests to the PostHog backend
  • added sequence sync and rebase handling so retries can recover from already-accepted events
  • added batching, request timeouts, stop-time draining, completion handshakes, oversized-event drops, and bounded backpressure
  • added unit coverage tests for various scenarios
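
To make the NDJSON and rebase ideas above concrete, here is a minimal sketch. It is not the code from this PR: the `StreamEvent` shape and the `toNdjson` and `rebase` helpers are hypothetical names chosen for illustration, and the real envelope fields are not shown in this thread.

```typescript
// Hypothetical event envelope; the PR's actual fields may differ.
interface StreamEvent {
  seq: number;
  payload: unknown;
}

// Serialise a batch as NDJSON: one JSON object per line, each newline-terminated.
function toNdjson(events: StreamEvent[]): string {
  return events.map((e) => JSON.stringify(e) + "\n").join("");
}

// Rebase after a sequence sync: drop events the server reports as already
// accepted, so a retry only resends the remainder and order is preserved.
function rebase(buffer: StreamEvent[], lastAcceptedSeq: number): StreamEvent[] {
  return buffer.filter((e) => e.seq > lastAcceptedSeq);
}
```

If a retry learns that the server already holds everything up to `seq: 2`, calling `rebase(buffer, 2)` leaves only later events in the buffer, which avoids duplicate writes without reordering.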

Contributor Author

tatoalo commented May 8, 2026

This stack of pull requests is managed by Graphite. Learn more about stacking.

@tatoalo tatoalo marked this pull request as ready for review May 8, 2026 15:38
@greptile-apps
Contributor

greptile-apps Bot commented May 8, 2026

Prompt To Fix All With AI
Fix the following 3 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 3
packages/agent/src/server/event-stream-sender.test.ts:169-203
**Assertion inside mock is silently swallowed on failure**

`expect(init?.body).toBe("")` on line 172 lives inside the async mock. If the assertion throws, it rejects the promise returned by `fetchMock`, which is caught by `flush()`'s catch block and logged as a warning — the test still passes, masking the regression. The same pattern appears at lines 332–336 in "drains multiple capped batches on stop" where `expect(…X-PostHog-Event-Stream-Complete…).toBeUndefined()` is also inside the mock body. Move these assertions outside the mock (collect call args with `fetchMock.mock.calls` after `await sender.stop()` and assert there) to ensure vitest actually fails the test on violation.
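
A framework-free sketch of this failure mode and the suggested fix, under stated assumptions: `flush` here is a hypothetical stand-in for the sender's method that catches and logs transport errors, and `recordingFetch` plays the role that `fetchMock.mock.calls` plays in vitest.

```typescript
type FetchLike = (url: string, init?: { body?: string }) => Promise<{ ok: boolean }>;

// Hypothetical stand-in for flush(): transport errors are caught and only logged.
async function flush(fetchImpl: FetchLike, body: string, warnings: string[]): Promise<void> {
  try {
    await fetchImpl("https://example.invalid/ingest", { body });
  } catch (err) {
    // An assertion thrown inside a mock lands here and never fails the test.
    warnings.push(String(err));
  }
}

// Recording mock: it stores call arguments and never asserts.
function recordingFetch(): {
  fetchImpl: FetchLike;
  calls: Array<{ url: string; body?: string }>;
} {
  const calls: Array<{ url: string; body?: string }> = [];
  const fetchImpl: FetchLike = async (url, init) => {
    calls.push({ url, body: init?.body });
    return { ok: true };
  };
  return { fetchImpl, calls };
}
```

With this shape, the test awaits `flush` (or `sender.stop()`), then asserts on `calls` in the test body, where a failed expectation propagates to the test runner instead of into the sender's catch block.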

### Issue 2 of 3
packages/agent/src/server/event-stream-sender.test.ts:205-304
**Prefer parameterised tests for drop scenarios**

"drops new events before assigning sequence when the buffer is full" (line 205) and "drops oversized events before assigning sequence" (line 254) test the identical observable behaviour — enqueue two events, the second is silently dropped, only the first reaches the server with `seq: 1`. The only difference is the drop trigger (`maxBufferedEvents: 1` vs `maxEventBytes: 120`). Per the team's rule, these should be a single `it.each` parameterised test sharing the mock and assertion logic rather than two separate near-identical blocks.

### Issue 3 of 3
packages/agent/src/server/event-stream-sender.ts:403-406
**Oversized-event size estimate is larger than the actual serialised size**

The size check serialises the envelope with `Number.MAX_SAFE_INTEGER` (16 digits) as the placeholder `seq`. The actual assigned sequence number will always be far smaller (e.g., 1 digit for `seq: 1`), so `eventBytes` can overestimate the real size by up to ~15 bytes. For events right at the `maxEventBytes` boundary this causes valid events to be incorrectly dropped. A safer upper bound would be the current `this.sequence + 1` (the next seq to be assigned), or a small fixed constant like `999_999` rather than `Number.MAX_SAFE_INTEGER`.
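
The overestimate can be checked with a small sketch. The envelope shape `{ seq, event }` and the helper name `envelopeBytes` are assumptions for illustration; the PR's real envelope may carry other fields, but the digit-count difference is the same.

```typescript
// UTF-8 byte length of a serialised envelope for a given sequence number.
// The { seq, event } shape is hypothetical.
function envelopeBytes(seq: number, event: unknown): number {
  return new TextEncoder().encode(JSON.stringify({ seq, event })).length;
}

const sampleEvent = { type: "log", message: "hello" };

// Placeholder estimate: Number.MAX_SAFE_INTEGER is 9007199254740991 (16 digits).
const placeholderBytes = envelopeBytes(Number.MAX_SAFE_INTEGER, sampleEvent);

// Actual size for the first event, whose seq has a single digit.
const actualBytes = envelopeBytes(1, sampleEvent);

// The gap is 15 bytes: 16 digits versus 1 digit.
const overestimate = placeholderBytes - actualBytes;
```

Estimating with the next sequence number to be assigned instead keeps the check exact for the event being enqueued.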

Reviews (1): Last reviewed commit: "feat(tasks): add task run event stream s..." | Re-trigger Greptile

Comment thread packages/agent/src/server/event-stream-sender.test.ts Outdated
Comment thread packages/agent/src/server/event-stream-sender.test.ts Outdated
Comment thread packages/agent/src/server/event-stream-sender.ts
@tatoalo tatoalo force-pushed the feat/tasks-event-stream-sender branch from ba262d3 to 70c5092 Compare May 8, 2026 15:50
@greptile-apps
Contributor

greptile-apps Bot commented May 8, 2026

Prompt To Fix All With AI
Fix the following 1 code review issue. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 1
packages/agent/src/server/event-stream-sender.ts:204-219
**AbortController + timeout fetch pattern repeated three times**

The same `AbortController` / `setTimeout` / `try { await fetch } finally { clearTimeout }` block appears identically in `syncSequenceWithServer` (lines 310–325), `sendBufferedEvents` (lines 204–219), and `completeTransport` (lines 246–265). This violates the team's OnceAndOnlyOnce rule. A private helper such as `fetchWithTimeout(init: RequestInit): Promise<Response>` would centralize the pattern and make future changes (e.g. adjusting abort semantics) a single-site edit.
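
One possible shape for such a helper is sketched below. It differs from the signature suggested above in that the fetch function is injected, purely so the sketch can be exercised with a stub; `FetchFn` and the parameter names are hypothetical and not taken from the PR.

```typescript
// Minimal fetch-like signature so the helper can be driven by a stub.
type FetchFn = (url: string, init: { signal: AbortSignal }) => Promise<{ status: number }>;

// Aborts the request after timeoutMs and always clears the timer,
// whether the request resolves, rejects, or is aborted.
async function fetchWithTimeout(
  fetchFn: FetchFn,
  url: string,
  timeoutMs: number,
): Promise<{ status: number }> {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  try {
    return await fetchFn(url, { signal: controller.signal });
  } finally {
    clearTimeout(timer);
  }
}
```

Each of the three call sites would then pass its own URL and timeout, and any change to abort semantics happens in one place.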

Reviews (2): Last reviewed commit: "feat(tasks): add task run event stream s..." | Re-trigger Greptile

@tatoalo tatoalo self-assigned this May 8, 2026
@tatoalo tatoalo requested a review from a team May 8, 2026 16:53
Contributor

@joshsny joshsny left a comment

I don't think we want to take the approach of batching into POST requests in production. The 50ms batching is okay-ish, but the time it takes to form the connection, validate the auth in Django, and post to the stream could add up to a few hundred ms, which will make the stream very choppy.

I'd suggest we open a POST request and stream the body in over some period (say 5 mins or until disconnect), then rely on the same "ask for last event" logic when reconnecting.

@tatoalo tatoalo force-pushed the feat/tasks-event-stream-sender branch from 70c5092 to 0c05cf5 Compare May 18, 2026 15:08
@tatoalo tatoalo force-pushed the feat/tasks-event-stream-sender branch from 0c05cf5 to 768b082 Compare May 18, 2026 15:17
@tatoalo
Contributor Author

tatoalo commented May 18, 2026

@joshsny good point. I changed this to avoid relying on a long-lived in-flight POST, so the agent server still posts directly to Django with ordered seqs, but it now uses short, sequenced NDJSON request windows. That should maintain the reliability and ordering properties while avoiding proxy/tunnel resets and the latency concerns from multiple steps 🙏🏻


2 participants