Skip to content

Add full pending-transaction subscription support (closes #14)#53

Open
koko1123 wants to merge 2 commits intomainfrom
feat/pending-tx-full
Open

Add full pending-transaction subscription support (closes #14)#53
koko1123 wants to merge 2 commits intomainfrom
feat/pending-tx-full

Conversation

@koko1123
Copy link
Copy Markdown
Contributor

@koko1123 koko1123 commented May 1, 2026

Summary

Implements geth-style eth_subscribe(\"newPendingTransactions\", true) so MEV bots can read pending transactions as full RpcTransaction objects instead of just hashes -- saves the round-trip eth_getTransactionByHash call per pending tx.

API changes

  • SubscriptionParams.new_pending_transactions changes from void to PendingTxParams { full: bool = false }. Slight API break (no public consumers yet -- the WsClient was just merged in Add WsClient: resilient WebSocket subscriptions with auto-reconnect #51). Migration:
    - .{ .new_pending_transactions = {} }
    + .{ .new_pending_transactions = .{} }                  // hashes only (default)
    + .{ .new_pending_transactions = .{ .full = true } }    // full Transaction objects
  • buildSubscribeParams emits [\"newPendingTransactions\"] or [\"newPendingTransactions\", true].

New types and parsers

  • src/rpc_transaction.zig -- RpcTransaction flat struct covering all four tx types (legacy / EIP-2930 / EIP-1559 / EIP-4844). Distinct from the canonical signing-time Transaction union; captures the RPC-only fields (hash, recovered from, optional block-position fields). v1 omits access_list and blob_versioned_hashes array parsing -- both are rare in mempool sniping and can be added later without a breaking change.
  • provider.parseSingleTransaction(allocator, obj) -- mirrors parseSingleLog. Handles both input and the legacy data alias. Caller owns tx.input; release with rpc_transaction.freeRpcTransaction.
  • subscription.parseTransactionFromNotification(allocator, raw) -- wraps the above for use with WsClient.next() events.

Test plan

  • make ci passes (1300+ unit tests)
  • zig build integration-test passes against Anvil: 19/19 (one new test sends a tx, then verifies the streamed RpcTransaction matches by hash, recipient, value, and pending state)
  • coderabbit review --prompt-only --type committed --base origin/main: no findings

Out of scope

  • access_list and blob_versioned_hashes array parsing (additive follow-up)
  • A typed client.nextPendingTx() convenience helper -- the existing parseTransactionFromNotification is enough; we can add sugar after we see real usage patterns
  • Pending-tx filter helpers (to/from/value-threshold). Issue Pending transaction subscription (newPendingTransactions full tx) #14 mentions these as a follow-up

Closes #14.

Summary by CodeRabbit

  • New Features

    • WebSocket subscriptions can deliver full pending transaction payloads (complete transaction details).
    • Introduces a structured transaction type and robust parsing that accepts legacy and EIP-1559 fee formats and normalizes calldata fields with memory-safe handling.
  • Bug Fixes

    • Reconnection/resubscribe now preserves original pending-transaction parameters.
  • Tests

    • Added integration and unit tests covering full pending subscriptions, parsing variants, and memory cleanup.

@vercel
Copy link
Copy Markdown

vercel Bot commented May 1, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Actions Updated (UTC)
eth-zig Ready Ready Preview, Comment May 1, 2026 8:24pm

Request Review

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 1, 2026

📝 Walkthrough

Walkthrough

Adds end-to-end support for subscribing to full pending transactions: new RpcTransaction type, parsing utilities for single transactions and subscription notifications, extended subscription params (PendingTxParams with full), small WebSocket resubscribe fix, and unit + integration tests exercising the flow.

Changes

Cohort / File(s) Summary
New Transaction Type
src/rpc_transaction.zig
Adds exported RpcTransaction struct and freeRpcTransaction to own/release heap-backed calldata; tests for field shapes and deallocation.
Transaction Parsing (provider)
src/provider.zig
Adds pub fn parseSingleTransaction(allocator, obj) !rpc_transaction_mod.RpcTransaction — parses required/optional fields, supports legacy and EIP-1559/EIP-4844 fees, handles input/data aliasing and allocs input bytes; includes unit tests.
Subscription Params & Notification Parsing
src/subscription.zig
Changes new_pending_transactions variant to PendingTxParams { full: bool }, updates subscribe param generation to include boolean when full, and adds parseTransactionFromNotification delegating to provider parser; test updates.
WS Client Resubscribe Fix
src/ws_client.zig
cloneParams now preserves the original new_pending_transactions payload so reconnect/resubscribe replays full flag.
Module Export / Test Inclusion
src/root.zig
Re-exports rpc_transaction and includes it in test compilation.
Examples & Integration Test
README.md, tests/integration_tests.zig
README example updated to show full = true; new WebSocket integration test subscribes with full=true, emits a tx, consumes notifications and parses full RpcTransaction, asserting expected fields and pending status.

Sequence Diagram

sequenceDiagram
    actor User
    participant WS as "WsClient"
    participant Sub as "Subscription Parser"
    participant Prov as "Provider.parseSingleTransaction"
    participant Type as "RpcTransaction"

    User->>WS: subscribePendingTransactions(full=true)
    WS->>WS: send eth_subscribe("newPendingTransactions", true)
    Note over WS: Node emits subscription notifications
    WS->>Sub: receive raw notification (params.result)
    Sub->>Prov: parseSingleTransaction(raw_obj)
    Prov->>Type: allocate & populate RpcTransaction
    Type-->>Prov: RpcTransaction
    Prov-->>Sub: RpcTransaction
    Sub-->>WS: parsed RpcTransaction
    WS-->>User: deliver RpcTransaction
    User->>Type: freeRpcTransaction(allocator, tx)
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Poem

🐰 I nibble mempool crumbs and hop with glee,
Full txs arrive — not just a hash to see,
Bytes decoded, fees and sigs in tow,
I free the input when the bunnies go,
Hooray for websockets streaming mempool flow!

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and specifically summarizes the main change: adding support for full pending-transaction subscriptions, which is the primary objective of this changeset.
Linked Issues check ✅ Passed The PR fully implements the requirements from issue #14: eth_subscribe with full pending transactions support [#14], RPC transaction parsing [#14], WebSocket integration [#14], and includes comprehensive tests [#14].
Out of Scope Changes check ✅ Passed All changes are directly within scope: new RpcTransaction type, transaction parsers, subscription parameter updates, and integration tests for full pending-transaction subscriptions.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/pending-tx-full

Review rate limit: 2/5 reviews remaining, refill in 33 minutes and 14 seconds.

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@tests/integration_tests.zig`:
- Around line 444-455: The test currently assumes subscription notifications
contain full tx objects and calls
eth.subscription.parseTransactionFromNotification which fails for hash-only
notifications; update the test around client.next()/ev.payload to detect and
skip hash-only pending notifications (e.g., inspect ev.payload for a JSON
string/hash or catch the parse error) so that when the node emits a plain tx
hash you continue the loop instead of failing; keep the existing defer
eth.rpc_transaction.freeRpcTransaction(allocator, tx) for the successful parse
path and only call it when parseTransactionFromNotification returns a non-error
tx.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: d5334343-fd66-434f-909e-a53ff4a7cc8c

📥 Commits

Reviewing files that changed from the base of the PR and between 6cf28b5 and 5ce6c60.

📒 Files selected for processing (7)
  • README.md
  • src/provider.zig
  • src/root.zig
  • src/rpc_transaction.zig
  • src/subscription.zig
  • src/ws_client.zig
  • tests/integration_tests.zig

Comment thread tests/integration_tests.zig
koko1123 added 2 commits May 1, 2026 16:20
Implements the geth-style eth_subscribe("newPendingTransactions", true)
variant so MEV bots can read pending transactions as full Transaction
objects without the extra eth_getTransactionByHash round-trip per tx.

API changes:
- SubscriptionParams.new_pending_transactions changes from `void` to
  `PendingTxParams { full: bool = false }`. Callers writing
  `.{ .new_pending_transactions = {} }` must update to
  `.{ .new_pending_transactions = .{} }`.
- buildSubscribeParams emits `["newPendingTransactions"]` (hashes only)
  or `["newPendingTransactions", true]` (full tx).

New types and parsers:
- src/rpc_transaction.zig: RpcTransaction flat struct covering all four
  transaction types (legacy, EIP-2930, EIP-1559, EIP-4844). Captures the
  RPC-only fields (hash, recovered from, optional block-position fields)
  that the canonical signing-time Transaction union does not. v1 omits
  access_list and blob_versioned_hashes parsing -- both are rare in
  mempool sniping and can be added without a breaking change.
- provider.parseSingleTransaction(allocator, obj) -> RpcTransaction.
  Mirrors parseSingleLog. Handles both `input` and the legacy `data`
  alias. Caller owns the input slice; release with
  rpc_transaction.freeRpcTransaction.
- subscription.parseTransactionFromNotification(allocator, raw) wraps
  the above for use with WsClient.next() events.

Tests:
- 5 new unit tests in provider.zig (pending EIP-1559, mined legacy,
  contract creation, data alias, full notification round-trip).
- 2 new struct/free helper tests in rpc_transaction.zig.
- 1 new buildSubscribeParams test for the `full` variant.
- 1 new integration test against Anvil that subscribes, sends a tx via
  the wallet, and verifies the parsed RpcTransaction matches what was
  sent (hash, recipient, value, pending state).

All 1300+ unit tests pass; 19/19 integration tests pass against Anvil.

Closes #14.
Some nodes (and even some Anvil builds) ignore the `full = true`
parameter on `eth_subscribe("newPendingTransactions", true)` and stream
plain transaction hashes (JSON strings) instead of full objects. The
integration test previously called parseTransactionFromNotification
unconditionally, which would error on those payloads and surface as
test failure rather than the intended skip-and-keep-reading behavior.

Catch error.InvalidNotification from the parser and `continue` so the
loop drains until either the matching tx is observed or the attempt
budget is exhausted (which already returns early).
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/provider.zig`:
- Around line 851-876: The code currently silently coerces malformed fields
(tx_index via parseHexU32, type_val via parseHexU8 with catch 0, and v via
parseHexU256 with orelse "0x0"), which hides invalid RPC payloads; update the
parsing of transactionIndex (tx_index) to propagate failures instead of mapping
parseHexU32 errors to null (use try or orelse return error.InvalidResponse),
change type parsing (type_val) to return error.InvalidResponse on parse failures
rather than catch 0, and require v to be present/parsable by replacing the
orelse "0x0" with either try or orelse return error.InvalidResponse; use
jsonGetString, parseHexU32, parseHexU8, parseHexU256 and error.InvalidResponse
to locate and fix these spots so malformed fields cause immediate
error.InvalidResponse instead of silent coercion.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 886bcce9-7736-4719-bcd0-3932e9193620

📥 Commits

Reviewing files that changed from the base of the PR and between 5ce6c60 and e8f0c8b.

📒 Files selected for processing (7)
  • README.md
  • src/provider.zig
  • src/root.zig
  • src/rpc_transaction.zig
  • src/subscription.zig
  • src/ws_client.zig
  • tests/integration_tests.zig
✅ Files skipped from review due to trivial changes (1)
  • src/root.zig
🚧 Files skipped from review as they are similar to previous changes (4)
  • src/ws_client.zig
  • tests/integration_tests.zig
  • src/rpc_transaction.zig
  • src/subscription.zig

Comment thread src/provider.zig
Comment on lines +851 to +876
const tx_index: ?u32 = if (jsonGetString(obj, "transactionIndex")) |s|
parseHexU32(s) catch null
else
null;

const from_addr = (try parseOptionalAddress(jsonGetString(obj, "from"))) orelse return error.InvalidResponse;
const to_addr = try parseOptionalAddress(jsonGetString(obj, "to"));
const value = try parseHexU256(jsonGetString(obj, "value") orelse "0x0");

const gas = try parseHexU64(jsonGetString(obj, "gas") orelse return error.InvalidResponse);
const gas_price: ?u256 = if (jsonGetString(obj, "gasPrice")) |s| try parseHexU256(s) else null;
const max_fee: ?u256 = if (jsonGetString(obj, "maxFeePerGas")) |s| try parseHexU256(s) else null;
const max_priority: ?u256 = if (jsonGetString(obj, "maxPriorityFeePerGas")) |s| try parseHexU256(s) else null;
const max_blob_fee: ?u256 = if (jsonGetString(obj, "maxFeePerBlobGas")) |s| try parseHexU256(s) else null;

// `input` and `data` are aliases; geth uses `input`, parity used `data`.
const input_str = jsonGetString(obj, "input") orelse jsonGetString(obj, "data") orelse "0x";
const input = try parseHexBytes(allocator, input_str);
errdefer allocator.free(input);

const v = try parseHexU256(jsonGetString(obj, "v") orelse "0x0");
const r = try parseHash(jsonGetString(obj, "r") orelse return error.InvalidResponse);
const s = try parseHash(jsonGetString(obj, "s") orelse return error.InvalidResponse);

const type_val = parseHexU8(jsonGetString(obj, "type") orelse "0x0") catch 0;
const chain_id = try parseOptionalHexU64(jsonGetString(obj, "chainId"));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Do not silently coerce malformed transaction fields.

transactionIndex parse failures are coerced to null, type parse failures are coerced to 0, and missing v is coerced to 0x0. This can misclassify malformed RPC payloads as valid legacy transactions instead of failing fast with error.InvalidResponse.

Suggested fix
-    const tx_index: ?u32 = if (jsonGetString(obj, "transactionIndex")) |s|
-        parseHexU32(s) catch null
-    else
-        null;
+    const tx_index: ?u32 = if (jsonGetString(obj, "transactionIndex")) |s|
+        try parseHexU32(s)
+    else
+        null;

-    const v = try parseHexU256(jsonGetString(obj, "v") orelse "0x0");
+    const v = try parseHexU256(jsonGetString(obj, "v") orelse return error.InvalidResponse);

-    const type_val = parseHexU8(jsonGetString(obj, "type") orelse "0x0") catch 0;
+    const type_val: u8 = if (jsonGetString(obj, "type")) |t|
+        try parseHexU8(t)
+    else
+        0;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const tx_index: ?u32 = if (jsonGetString(obj, "transactionIndex")) |s|
parseHexU32(s) catch null
else
null;
const from_addr = (try parseOptionalAddress(jsonGetString(obj, "from"))) orelse return error.InvalidResponse;
const to_addr = try parseOptionalAddress(jsonGetString(obj, "to"));
const value = try parseHexU256(jsonGetString(obj, "value") orelse "0x0");
const gas = try parseHexU64(jsonGetString(obj, "gas") orelse return error.InvalidResponse);
const gas_price: ?u256 = if (jsonGetString(obj, "gasPrice")) |s| try parseHexU256(s) else null;
const max_fee: ?u256 = if (jsonGetString(obj, "maxFeePerGas")) |s| try parseHexU256(s) else null;
const max_priority: ?u256 = if (jsonGetString(obj, "maxPriorityFeePerGas")) |s| try parseHexU256(s) else null;
const max_blob_fee: ?u256 = if (jsonGetString(obj, "maxFeePerBlobGas")) |s| try parseHexU256(s) else null;
// `input` and `data` are aliases; geth uses `input`, parity used `data`.
const input_str = jsonGetString(obj, "input") orelse jsonGetString(obj, "data") orelse "0x";
const input = try parseHexBytes(allocator, input_str);
errdefer allocator.free(input);
const v = try parseHexU256(jsonGetString(obj, "v") orelse "0x0");
const r = try parseHash(jsonGetString(obj, "r") orelse return error.InvalidResponse);
const s = try parseHash(jsonGetString(obj, "s") orelse return error.InvalidResponse);
const type_val = parseHexU8(jsonGetString(obj, "type") orelse "0x0") catch 0;
const chain_id = try parseOptionalHexU64(jsonGetString(obj, "chainId"));
const tx_index: ?u32 = if (jsonGetString(obj, "transactionIndex")) |s|
try parseHexU32(s)
else
null;
const from_addr = (try parseOptionalAddress(jsonGetString(obj, "from"))) orelse return error.InvalidResponse;
const to_addr = try parseOptionalAddress(jsonGetString(obj, "to"));
const value = try parseHexU256(jsonGetString(obj, "value") orelse "0x0");
const gas = try parseHexU64(jsonGetString(obj, "gas") orelse return error.InvalidResponse);
const gas_price: ?u256 = if (jsonGetString(obj, "gasPrice")) |s| try parseHexU256(s) else null;
const max_fee: ?u256 = if (jsonGetString(obj, "maxFeePerGas")) |s| try parseHexU256(s) else null;
const max_priority: ?u256 = if (jsonGetString(obj, "maxPriorityFeePerGas")) |s| try parseHexU256(s) else null;
const max_blob_fee: ?u256 = if (jsonGetString(obj, "maxFeePerBlobGas")) |s| try parseHexU256(s) else null;
// `input` and `data` are aliases; geth uses `input`, parity used `data`.
const input_str = jsonGetString(obj, "input") orelse jsonGetString(obj, "data") orelse "0x";
const input = try parseHexBytes(allocator, input_str);
errdefer allocator.free(input);
const v = try parseHexU256(jsonGetString(obj, "v") orelse return error.InvalidResponse);
const r = try parseHash(jsonGetString(obj, "r") orelse return error.InvalidResponse);
const s = try parseHash(jsonGetString(obj, "s") orelse return error.InvalidResponse);
const type_val: u8 = if (jsonGetString(obj, "type")) |t|
try parseHexU8(t)
else
0;
const chain_id = try parseOptionalHexU64(jsonGetString(obj, "chainId"));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/provider.zig` around lines 851 - 876, The code currently silently coerces
malformed fields (tx_index via parseHexU32, type_val via parseHexU8 with catch
0, and v via parseHexU256 with orelse "0x0"), which hides invalid RPC payloads;
update the parsing of transactionIndex (tx_index) to propagate failures instead
of mapping parseHexU32 errors to null (use try or orelse return
error.InvalidResponse), change type parsing (type_val) to return
error.InvalidResponse on parse failures rather than catch 0, and require v to be
present/parsable by replacing the orelse "0x0" with either try or orelse return
error.InvalidResponse; use jsonGetString, parseHexU32, parseHexU8, parseHexU256
and error.InvalidResponse to locate and fix these spots so malformed fields
cause immediate error.InvalidResponse instead of silent coercion.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Pending transaction subscription (newPendingTransactions full tx)

1 participant