Add full pending-transaction subscription support (closes #14)#53
Add full pending-transaction subscription support (closes #14)#53
Conversation
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
📝 WalkthroughWalkthroughAdds end-to-end support for subscribing to full pending transactions: new Changes
Sequence DiagramsequenceDiagram
actor User
participant WS as "WsClient"
participant Sub as "Subscription Parser"
participant Prov as "Provider.parseSingleTransaction"
participant Type as "RpcTransaction"
User->>WS: subscribePendingTransactions(full=true)
WS->>WS: send eth_subscribe("newPendingTransactions", true)
Note over WS: Node emits subscription notifications
WS->>Sub: receive raw notification (params.result)
Sub->>Prov: parseSingleTransaction(raw_obj)
Prov->>Type: allocate & populate RpcTransaction
Type-->>Prov: RpcTransaction
Prov-->>Sub: RpcTransaction
Sub-->>WS: parsed RpcTransaction
WS-->>User: deliver RpcTransaction
User->>Type: freeRpcTransaction(allocator, tx)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Review rate limit: 2/5 reviews remaining, refill in 33 minutes and 14 seconds. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tests/integration_tests.zig`:
- Around line 444-455: The test currently assumes subscription notifications
contain full tx objects and calls
eth.subscription.parseTransactionFromNotification which fails for hash-only
notifications; update the test around client.next()/ev.payload to detect and
skip hash-only pending notifications (e.g., inspect ev.payload for a JSON
string/hash or catch the parse error) so that when the node emits a plain tx
hash you continue the loop instead of failing; keep the existing defer
eth.rpc_transaction.freeRpcTransaction(allocator, tx) for the successful parse
path and only call it when parseTransactionFromNotification returns a non-error
tx.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: d5334343-fd66-434f-909e-a53ff4a7cc8c
📒 Files selected for processing (7)
README.mdsrc/provider.zigsrc/root.zigsrc/rpc_transaction.zigsrc/subscription.zigsrc/ws_client.zigtests/integration_tests.zig
Implements the geth-style eth_subscribe("newPendingTransactions", true)
variant so MEV bots can read pending transactions as full Transaction
objects without the extra eth_getTransactionByHash round-trip per tx.
API changes:
- SubscriptionParams.new_pending_transactions changes from `void` to
`PendingTxParams { full: bool = false }`. Callers writing
`.{ .new_pending_transactions = {} }` must update to
`.{ .new_pending_transactions = .{} }`.
- buildSubscribeParams emits `["newPendingTransactions"]` (hashes only)
or `["newPendingTransactions", true]` (full tx).
New types and parsers:
- src/rpc_transaction.zig: RpcTransaction flat struct covering all four
transaction types (legacy, EIP-2930, EIP-1559, EIP-4844). Captures the
RPC-only fields (hash, recovered from, optional block-position fields)
that the canonical signing-time Transaction union does not. v1 omits
access_list and blob_versioned_hashes parsing -- both are rare in
mempool sniping and can be added without a breaking change.
- provider.parseSingleTransaction(allocator, obj) -> RpcTransaction.
Mirrors parseSingleLog. Handles both `input` and the legacy `data`
alias. Caller owns the input slice; release with
rpc_transaction.freeRpcTransaction.
- subscription.parseTransactionFromNotification(allocator, raw) wraps
the above for use with WsClient.next() events.
Tests:
- 5 new unit tests in provider.zig (pending EIP-1559, mined legacy,
contract creation, data alias, full notification round-trip).
- 2 new struct/free helper tests in rpc_transaction.zig.
- 1 new buildSubscribeParams test for the `full` variant.
- 1 new integration test against Anvil that subscribes, sends a tx via
the wallet, and verifies the parsed RpcTransaction matches what was
sent (hash, recipient, value, pending state).
All 1300+ unit tests pass; 19/19 integration tests pass against Anvil.
Closes #14.
Some nodes (and even some Anvil builds) ignore the `full = true`
parameter on `eth_subscribe("newPendingTransactions", true)` and stream
plain transaction hashes (JSON strings) instead of full objects. The
integration test previously called parseTransactionFromNotification
unconditionally, which would error on those payloads and surface as
test failure rather than the intended skip-and-keep-reading behavior.
Catch error.InvalidNotification from the parser and `continue` so the
loop drains until either the matching tx is observed or the attempt
budget is exhausted (which already returns early).
5ce6c60 to
e8f0c8b
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/provider.zig`:
- Around line 851-876: The code currently silently coerces malformed fields
(tx_index via parseHexU32, type_val via parseHexU8 with catch 0, and v via
parseHexU256 with orelse "0x0"), which hides invalid RPC payloads; update the
parsing of transactionIndex (tx_index) to propagate failures instead of mapping
parseHexU32 errors to null (use try or orelse return error.InvalidResponse),
change type parsing (type_val) to return error.InvalidResponse on parse failures
rather than catch 0, and require v to be present/parsable by replacing the
orelse "0x0" with either try or orelse return error.InvalidResponse; use
jsonGetString, parseHexU32, parseHexU8, parseHexU256 and error.InvalidResponse
to locate and fix these spots so malformed fields cause immediate
error.InvalidResponse instead of silent coercion.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 886bcce9-7736-4719-bcd0-3932e9193620
📒 Files selected for processing (7)
README.mdsrc/provider.zigsrc/root.zigsrc/rpc_transaction.zigsrc/subscription.zigsrc/ws_client.zigtests/integration_tests.zig
✅ Files skipped from review due to trivial changes (1)
- src/root.zig
🚧 Files skipped from review as they are similar to previous changes (4)
- src/ws_client.zig
- tests/integration_tests.zig
- src/rpc_transaction.zig
- src/subscription.zig
| const tx_index: ?u32 = if (jsonGetString(obj, "transactionIndex")) |s| | ||
| parseHexU32(s) catch null | ||
| else | ||
| null; | ||
|
|
||
| const from_addr = (try parseOptionalAddress(jsonGetString(obj, "from"))) orelse return error.InvalidResponse; | ||
| const to_addr = try parseOptionalAddress(jsonGetString(obj, "to")); | ||
| const value = try parseHexU256(jsonGetString(obj, "value") orelse "0x0"); | ||
|
|
||
| const gas = try parseHexU64(jsonGetString(obj, "gas") orelse return error.InvalidResponse); | ||
| const gas_price: ?u256 = if (jsonGetString(obj, "gasPrice")) |s| try parseHexU256(s) else null; | ||
| const max_fee: ?u256 = if (jsonGetString(obj, "maxFeePerGas")) |s| try parseHexU256(s) else null; | ||
| const max_priority: ?u256 = if (jsonGetString(obj, "maxPriorityFeePerGas")) |s| try parseHexU256(s) else null; | ||
| const max_blob_fee: ?u256 = if (jsonGetString(obj, "maxFeePerBlobGas")) |s| try parseHexU256(s) else null; | ||
|
|
||
| // `input` and `data` are aliases; geth uses `input`, parity used `data`. | ||
| const input_str = jsonGetString(obj, "input") orelse jsonGetString(obj, "data") orelse "0x"; | ||
| const input = try parseHexBytes(allocator, input_str); | ||
| errdefer allocator.free(input); | ||
|
|
||
| const v = try parseHexU256(jsonGetString(obj, "v") orelse "0x0"); | ||
| const r = try parseHash(jsonGetString(obj, "r") orelse return error.InvalidResponse); | ||
| const s = try parseHash(jsonGetString(obj, "s") orelse return error.InvalidResponse); | ||
|
|
||
| const type_val = parseHexU8(jsonGetString(obj, "type") orelse "0x0") catch 0; | ||
| const chain_id = try parseOptionalHexU64(jsonGetString(obj, "chainId")); |
There was a problem hiding this comment.
Do not silently coerce malformed transaction fields.
transactionIndex parse failures are coerced to null, type parse failures are coerced to 0, and missing v is coerced to 0x0. This can misclassify malformed RPC payloads as valid legacy transactions instead of failing fast with error.InvalidResponse.
Suggested fix
- const tx_index: ?u32 = if (jsonGetString(obj, "transactionIndex")) |s|
- parseHexU32(s) catch null
- else
- null;
+ const tx_index: ?u32 = if (jsonGetString(obj, "transactionIndex")) |s|
+ try parseHexU32(s)
+ else
+ null;
- const v = try parseHexU256(jsonGetString(obj, "v") orelse "0x0");
+ const v = try parseHexU256(jsonGetString(obj, "v") orelse return error.InvalidResponse);
- const type_val = parseHexU8(jsonGetString(obj, "type") orelse "0x0") catch 0;
+ const type_val: u8 = if (jsonGetString(obj, "type")) |t|
+ try parseHexU8(t)
+ else
+ 0;📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const tx_index: ?u32 = if (jsonGetString(obj, "transactionIndex")) |s| | |
| parseHexU32(s) catch null | |
| else | |
| null; | |
| const from_addr = (try parseOptionalAddress(jsonGetString(obj, "from"))) orelse return error.InvalidResponse; | |
| const to_addr = try parseOptionalAddress(jsonGetString(obj, "to")); | |
| const value = try parseHexU256(jsonGetString(obj, "value") orelse "0x0"); | |
| const gas = try parseHexU64(jsonGetString(obj, "gas") orelse return error.InvalidResponse); | |
| const gas_price: ?u256 = if (jsonGetString(obj, "gasPrice")) |s| try parseHexU256(s) else null; | |
| const max_fee: ?u256 = if (jsonGetString(obj, "maxFeePerGas")) |s| try parseHexU256(s) else null; | |
| const max_priority: ?u256 = if (jsonGetString(obj, "maxPriorityFeePerGas")) |s| try parseHexU256(s) else null; | |
| const max_blob_fee: ?u256 = if (jsonGetString(obj, "maxFeePerBlobGas")) |s| try parseHexU256(s) else null; | |
| // `input` and `data` are aliases; geth uses `input`, parity used `data`. | |
| const input_str = jsonGetString(obj, "input") orelse jsonGetString(obj, "data") orelse "0x"; | |
| const input = try parseHexBytes(allocator, input_str); | |
| errdefer allocator.free(input); | |
| const v = try parseHexU256(jsonGetString(obj, "v") orelse "0x0"); | |
| const r = try parseHash(jsonGetString(obj, "r") orelse return error.InvalidResponse); | |
| const s = try parseHash(jsonGetString(obj, "s") orelse return error.InvalidResponse); | |
| const type_val = parseHexU8(jsonGetString(obj, "type") orelse "0x0") catch 0; | |
| const chain_id = try parseOptionalHexU64(jsonGetString(obj, "chainId")); | |
| const tx_index: ?u32 = if (jsonGetString(obj, "transactionIndex")) |s| | |
| try parseHexU32(s) | |
| else | |
| null; | |
| const from_addr = (try parseOptionalAddress(jsonGetString(obj, "from"))) orelse return error.InvalidResponse; | |
| const to_addr = try parseOptionalAddress(jsonGetString(obj, "to")); | |
| const value = try parseHexU256(jsonGetString(obj, "value") orelse "0x0"); | |
| const gas = try parseHexU64(jsonGetString(obj, "gas") orelse return error.InvalidResponse); | |
| const gas_price: ?u256 = if (jsonGetString(obj, "gasPrice")) |s| try parseHexU256(s) else null; | |
| const max_fee: ?u256 = if (jsonGetString(obj, "maxFeePerGas")) |s| try parseHexU256(s) else null; | |
| const max_priority: ?u256 = if (jsonGetString(obj, "maxPriorityFeePerGas")) |s| try parseHexU256(s) else null; | |
| const max_blob_fee: ?u256 = if (jsonGetString(obj, "maxFeePerBlobGas")) |s| try parseHexU256(s) else null; | |
| // `input` and `data` are aliases; geth uses `input`, parity used `data`. | |
| const input_str = jsonGetString(obj, "input") orelse jsonGetString(obj, "data") orelse "0x"; | |
| const input = try parseHexBytes(allocator, input_str); | |
| errdefer allocator.free(input); | |
| const v = try parseHexU256(jsonGetString(obj, "v") orelse return error.InvalidResponse); | |
| const r = try parseHash(jsonGetString(obj, "r") orelse return error.InvalidResponse); | |
| const s = try parseHash(jsonGetString(obj, "s") orelse return error.InvalidResponse); | |
| const type_val: u8 = if (jsonGetString(obj, "type")) |t| | |
| try parseHexU8(t) | |
| else | |
| 0; | |
| const chain_id = try parseOptionalHexU64(jsonGetString(obj, "chainId")); |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/provider.zig` around lines 851 - 876, The code currently silently coerces
malformed fields (tx_index via parseHexU32, type_val via parseHexU8 with catch
0, and v via parseHexU256 with orelse "0x0"), which hides invalid RPC payloads;
update the parsing of transactionIndex (tx_index) to propagate failures instead
of mapping parseHexU32 errors to null (use try or orelse return
error.InvalidResponse), change type parsing (type_val) to return
error.InvalidResponse on parse failures rather than catch 0, and require v to be
present/parsable by replacing the orelse "0x0" with either try or orelse return
error.InvalidResponse; use jsonGetString, parseHexU32, parseHexU8, parseHexU256
and error.InvalidResponse to locate and fix these spots so malformed fields
cause immediate error.InvalidResponse instead of silent coercion.
Summary
Implements geth-style
eth_subscribe(\"newPendingTransactions\", true)so MEV bots can read pending transactions as fullRpcTransactionobjects instead of just hashes -- saves the round-tripeth_getTransactionByHashcall per pending tx.API changes
SubscriptionParams.new_pending_transactionschanges fromvoidtoPendingTxParams { full: bool = false }. Slight API break (no public consumers yet -- the WsClient was just merged in Add WsClient: resilient WebSocket subscriptions with auto-reconnect #51). Migration:buildSubscribeParamsemits[\"newPendingTransactions\"]or[\"newPendingTransactions\", true].New types and parsers
src/rpc_transaction.zig--RpcTransactionflat struct covering all four tx types (legacy / EIP-2930 / EIP-1559 / EIP-4844). Distinct from the canonical signing-timeTransactionunion; captures the RPC-only fields (hash, recoveredfrom, optional block-position fields). v1 omitsaccess_listandblob_versioned_hashesarray parsing -- both are rare in mempool sniping and can be added later without a breaking change.provider.parseSingleTransaction(allocator, obj)-- mirrorsparseSingleLog. Handles bothinputand the legacydataalias. Caller ownstx.input; release withrpc_transaction.freeRpcTransaction.subscription.parseTransactionFromNotification(allocator, raw)-- wraps the above for use withWsClient.next()events.Test plan
make cipasses (1300+ unit tests)zig build integration-testpasses against Anvil: 19/19 (one new test sends a tx, then verifies the streamedRpcTransactionmatches by hash, recipient, value, and pending state)coderabbit review --prompt-only --type committed --base origin/main: no findingsOut of scope
access_listandblob_versioned_hashesarray parsing (additive follow-up)client.nextPendingTx()convenience helper -- the existingparseTransactionFromNotificationis enough; we can add sugar after we see real usage patternsCloses #14.
Summary by CodeRabbit
New Features
Bug Fixes
Tests