
Conversation

@ac-freeman

I implemented simplified delivery timeouts for a research project six months ago. I asked Claude to help bring those changes into the latest version of the repo. This hasn't been completely tested yet, but my original implementation did work for my purposes. I haven't touched anything on the JavaScript side yet.

Paper (soon to be updated): https://arxiv.org/abs/2507.15673
Relevant issue: #678

@coderabbitai
Contributor

coderabbitai bot commented Jan 21, 2026

Walkthrough

Delivery timeout support has been added throughout the system. A new DeliveryTimeout error variant was introduced to the Error enum. The Subscribe message structure was extended to include an optional delivery_timeout field for both IETF and lite protocols. The subscribe_track method now accepts an additional delivery_timeout parameter. The broadcast request channel now transmits TrackProducer paired with delivery_timeout instead of TrackProducer alone. FrameConsumer tracks arrival time, and TrackProducer includes a delivery_timeout field. Delivery timeout propagation flows through publisher and subscriber paths with associated error handling and timeout enforcement logic at the frame transmission level.
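As a rough sketch of the shapes described above (illustrative only; apart from delivery_timeout, the field names and types below are assumptions, not the crate's actual definitions):

// Hypothetical subset of the new message/producer shapes; only
// `delivery_timeout` itself is taken from the walkthrough above.
pub struct Subscribe {
	pub track: String,
	/// Requested delivery timeout; units still need to be documented.
	pub delivery_timeout: Option<u64>,
}

#[derive(Clone)]
pub struct TrackProducer {
	pub name: String,
	pub delivery_timeout: Option<u64>,
}

// The broadcast request channel now carries the pair instead of the producer alone.
type TrackRequest = (TrackProducer, Option<u64>);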

🚥 Pre-merge checks | ✅ 1 passed | ❌ 2 failed

❌ Failed checks (1 warning, 1 inconclusive)
  • Docstring Coverage — ⚠️ Warning. Docstring coverage is 25.58%, which is below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
  • Title check — ❓ Inconclusive. The title 'WIP: Delivery timeout' is a vague, generic label that doesn't clearly convey the specific changes in this changeset. Resolution: provide a more descriptive title that summarizes the main feature being added, e.g. 'Add delivery timeout support for broadcast subscriptions' or 'Implement delivery timeout mechanism across publisher and subscriber paths'.

✅ Passed checks (1 passed)
  • Description check — ✅ Passed. The description provides context about the implementation being a reintroduction of prior work for research purposes, including relevant issue and paper references, which relates to the changeset's delivery timeout feature additions.


✨ Finishing touches
  • 📝 Generate docstrings


Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
rs/moq-lite/src/lite/subscribe.rs (1)

27-52: Reject invalid delivery_timeout flag values.
Right now any non-1 flag becomes None, silently accepting malformed data. This should error on invalid values to preserve protocol correctness.

🛠️ Proposed fix (validate flag with bool::decode / encode)
-		let delivery_timeout = if u8::decode(r, version)? == 1 {
-			Some(u64::decode(r, version)?)
-		} else {
-			None
-		};
+		let has_timeout = bool::decode(r, version)?;
+		let delivery_timeout = if has_timeout {
+			Some(u64::decode(r, version)?)
+		} else {
+			None
+		};

@@
-		if let Some(timeout) = self.delivery_timeout {
-			1u8.encode(w, version);
-			timeout.encode(w, version);
-		} else {
-			0u8.encode(w, version);
-		}
+		let has_timeout = self.delivery_timeout.is_some();
+		has_timeout.encode(w, version);
+		if let Some(timeout) = self.delivery_timeout {
+			timeout.encode(w, version);
+		}
rs/moq-lite/src/lite/subscriber.rs (1)

312-323: Delivery timeout isn’t enforced in run_frame yet.

_delivery_timeout is unused and the TODO remains, so inbound frames will never trigger a timeout. Please implement the check (using the chosen frame timestamp/arrival time) or remove the parameter until supported to avoid a false sense of enforcement.

If you want, I can draft the timeout check once you confirm which timestamp should be authoritative.
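For reference, a minimal arrival-time check might look like the following (a standalone sketch only; the Error::DeliveryTimeout variant comes from the walkthrough, the arrival Instant stands in for whatever FrameConsumer records, and milliseconds is an assumed unit):

use std::time::{Duration, Instant};

#[derive(Debug)]
enum Error {
	DeliveryTimeout,
}

// Hypothetical helper: reject a frame that has waited in the queue longer
// than the subscriber's delivery timeout.
fn check_delivery_timeout(arrived_at: Instant, delivery_timeout: Option<u64>) -> Result<(), Error> {
	if let Some(ms) = delivery_timeout {
		if arrived_at.elapsed() > Duration::from_millis(ms) {
			return Err(Error::DeliveryTimeout);
		}
	}
	Ok(())
}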

🤖 Fix all issues with AI agents
In `@rs/moq-lite/src/ietf/subscribe.rs`:
- Around line 83-87: The delivery_timeout presence flag currently decodes with
u8::decode(...) == 1 which silently accepts invalid values; change this to use
bool::decode(r, version) to validate the flag (or decode the u8 and explicitly
match 0/1 and return an Err for other values) and then conditionally decode the
u64 only when the flag is true; update the code around delivery_timeout
(referencing delivery_timeout, u8::decode, bool::decode, r, version) to reject
invalid encodings instead of treating any non-1 as absent.

In `@rs/moq-lite/src/ietf/subscriber.rs`:
- Around line 235-246: The code redundantly clones `track` into
`track_with_timeout` and reassigns `delivery_timeout`; remove the extra clone
and assignment and insert the original `track` directly into the subscription
state. Specifically, in the block around `let mut state = self.state.lock();`
and the `state.subscribes.insert(request_id, TrackState { ... })`, stop creating
`track_with_timeout` and use the existing `track` (which already has
`delivery_timeout` set) for the `producer` field of `TrackState`, leaving
`alias: None` unchanged.
🧹 Nitpick comments (3)
rs/moq-lite/src/model/track.rs (1)

53-65: Document delivery_timeout semantics (units/scope).
Public Option<u64> is ambiguous without units or definition (e.g., milliseconds vs ticks, per-frame vs per-group). A short doc comment would prevent misuse across call sites.
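For example, a doc comment along these lines would settle it (the milliseconds / per-frame choice here is only one option, not something the PR specifies):

/// Maximum time a frame may wait to be delivered before it is skipped.
///
/// Example semantics only: interpreted as milliseconds, measured per frame
/// from when the frame was produced/received; `None` means no timeout.
pub delivery_timeout: Option<u64>,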

rs/moq-lite/src/lite/subscribe.rs (1)

13-19: Clarify delivery_timeout units in Subscribe.
A brief comment would help keep producers/consumers aligned on units and semantics.

rs/moq-lite/src/ietf/subscriber.rs (1)

400-411: TODO: Delivery timeout check not yet implemented.

The _delivery_timeout parameter is accepted but unused, with a TODO comment indicating future implementation using frame.info.sent_timestamp. This is acceptable for WIP.

Would you like me to help implement the delivery timeout check logic, or open an issue to track this task?
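If sent_timestamp ends up being authoritative, the check could be as small as this (an untested sketch; it assumes the timestamp is a SystemTime and the timeout is in milliseconds):

use std::time::{Duration, SystemTime};

// Returns true when the frame is older than the subscriber's delivery timeout.
fn frame_expired(sent_timestamp: SystemTime, delivery_timeout: Option<u64>) -> bool {
	match delivery_timeout {
		Some(ms) => SystemTime::now()
			.duration_since(sent_timestamp)
			.unwrap_or(Duration::ZERO) // clock skew: treat the frame as fresh
			> Duration::from_millis(ms),
		None => false,
	}
}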

Comment on lines +83 to +87
		let delivery_timeout = if u8::decode(r, version)? == 1 {
			Some(u64::decode(r, version)?)
		} else {
			None
		};
Contributor


⚠️ Potential issue | 🟡 Minor

Validate the delivery_timeout presence flag.

Using u8::decode + == 1 silently accepts invalid values. Prefer bool::decode (or an explicit match) so invalid encodings are rejected.

🛠️ Proposed fix
-		let delivery_timeout = if u8::decode(r, version)? == 1 {
-			Some(u64::decode(r, version)?)
-		} else {
-			None
-		};
+		let has_timeout = bool::decode(r, version)?;
+		let delivery_timeout = if has_timeout {
+			Some(u64::decode(r, version)?)
+		} else {
+			None
+		};

Comment on lines +235 to 246
+		// Set this track subscription to carry the delivery timeout
+		let mut track_with_timeout = track.clone();
+		track_with_timeout.delivery_timeout = delivery_timeout;

		let mut state = self.state.lock();
		state.subscribes.insert(
			request_id,
			TrackState {
-				producer: track.clone(),
+				producer: track_with_timeout.clone(),
				alias: None,
			},
		);
Contributor


⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find TrackProducer definition
rg -n -A 30 'pub struct TrackProducer' --type rust

echo "---"

# Check for Clone implementation
rg -n 'impl Clone for TrackProducer' --type rust -A 20

Repository: moq-dev/moq

Length of output: 3727


🏁 Script executed:

#!/bin/bash
# Also check if TrackProducer has #[derive(Clone)]
rg -n -B 5 'pub struct TrackProducer' --type rust | head -20

Repository: moq-dev/moq

Length of output: 899


🏁 Script executed:

#!/bin/bash
# Check broadcast.rs at line 214 area to understand initial delivery_timeout setup
rg -n -B 5 -A 5 'delivery_timeout' rs/moq-lite/src/ietf/broadcast.rs | head -40

Repository: moq-dev/moq

Length of output: 128


🏁 Script executed:

#!/bin/bash
# Find broadcast.rs
find . -name 'broadcast.rs' -type f

# Also look for where delivery_timeout is being set
rg -n 'delivery_timeout.*=' --type rust | head -30

Repository: moq-dev/moq

Length of output: 1598


🏁 Script executed:

#!/bin/bash
# Check the context in subscriber.rs around lines 235-246
cat -n rs/moq-lite/src/ietf/subscriber.rs | sed -n '230,250p'

Repository: moq-dev/moq

Length of output: 814


🏁 Script executed:

#!/bin/bash
# Look for the requested_track method in broadcast to see how delivery_timeout is set
rg -n -B 5 -A 10 'fn requested_track\|pub fn requested_track' --type rust

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

#!/bin/bash
# Show the tokio::select block in subscriber.rs
cat -n rs/moq-lite/src/ietf/subscriber.rs | sed -n '220,240p'

Repository: moq-dev/moq

Length of output: 956


🏁 Script executed:

#!/bin/bash
# Check broadcast.rs around line 214 to see context
cat -n rs/moq-lite/src/model/broadcast.rs | sed -n '200,230p'

Repository: moq-dev/moq

Length of output: 1315


🏁 Script executed:

#!/bin/bash
# Look for the method that returns/sends the track from broadcast
rg -n -B 20 'producer.delivery_timeout = delivery_timeout' rs/moq-lite/src/model/broadcast.rs | head -50

Repository: moq-dev/moq

Length of output: 856


🏁 Script executed:

#!/bin/bash
# Find the requested_track method definition in broadcast
rg -n -B 5 -A 20 'fn requested_track\|pub fn requested_track' rs/moq-lite/src/model/broadcast.rs

Repository: moq-dev/moq

Length of output: 37


🏁 Script executed:

#!/bin/bash
# Check if there's any additional context about how track is used after assignment
cat -n rs/moq-lite/src/ietf/subscriber.rs | sed -n '235,260p'

Repository: moq-dev/moq

Length of output: 933


Remove redundant delivery_timeout assignment (lines 236-237).

The track received from broadcast.requested_track() is a clone of the producer created at broadcast.rs:209-214, which already has delivery_timeout set. Since TrackProducer derives Clone, the field is preserved. The assignment at line 237 is redundant and can be removed; use track directly instead of creating track_with_timeout.
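The resulting insertion would then be roughly (a sketch against the diff above, not a tested change):

		// `track` already carries the delivery_timeout set on the broadcast side.
		let mut state = self.state.lock();
		state.subscribes.insert(
			request_id,
			TrackState {
				producer: track.clone(),
				alias: None,
			},
		);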


@kixelated
Collaborator

Hey @ac-freeman, I appreciate the PR. I've implemented this probably 4 times already and I'm never happy with the result, lul.

I'd like to make delivery timeout work with fanout. So when the relay gets a subscription with a timeout of 1s, it subscribes upstream with 1s timeout. But when another request comes in with 5s timeout, it updates the existing subscription to 5s. That requires piping a lot of stuff through the model so we actually have some feedback on what consumers want, not just what producers are making.

@ac-freeman
Author

So, upstream_timeout = max(downstream_timeouts)? The upstream timeout would need to be updated when a subscriber drops, if it had the largest timeout. Wouldn't this also cascade with large fan-out? If you have one relay five layers deep that subscribes with a 60s timeout, that would bubble all the way up. Should the non-relay subscriber timeouts also affect upstream subscriptions? I just see a risk for a rogue subscriber hurting the latency for everyone.

The original publisher timeout should probably also take precedence over the first relay's subscription timeout. They might want to have some say in what the latency regime is.
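To make the bookkeeping concrete, a relay tracking max(downstream_timeouts) might look like this (purely illustrative; none of these types exist in the PR, and it sidesteps the precedence and rogue-subscriber questions above):

use std::collections::HashMap;

// Upstream timeout = max of downstream timeouts; it has to be recomputed when
// the subscriber holding the maximum drops, and it grows monotonically with
// fan-out depth (the cascade concern raised above).
#[derive(Default)]
struct TimeoutAggregate {
	downstream: HashMap<u64, u64>, // subscriber id -> requested timeout (ms)
}

impl TimeoutAggregate {
	/// Returns the timeout the relay should request upstream after the change.
	fn subscribe(&mut self, id: u64, timeout_ms: u64) -> Option<u64> {
		self.downstream.insert(id, timeout_ms);
		self.upstream()
	}

	fn unsubscribe(&mut self, id: u64) -> Option<u64> {
		self.downstream.remove(&id);
		self.upstream()
	}

	fn upstream(&self) -> Option<u64> {
		self.downstream.values().copied().max() // None when nobody is subscribed
	}
}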

@kixelated
Collaborator

kixelated commented Feb 5, 2026

> So, upstream_timeout = max(downstream_timeouts)? The upstream timeout would need to be updated when a subscriber drops, if it had the largest timeout. Wouldn't this also cascade with large fan-out? If you have one relay five layers deep that subscribes with a 60s timeout, that would bubble all the way up. Should the non-relay subscriber timeouts also affect upstream subscriptions? I just see a risk for a rogue subscriber hurting the latency for everyone.
>
> The original publisher timeout should probably also take precedence over the first relay's subscription timeout. They might want to have some say in what the latency regime is.

Yeah, my plan is to have the publisher announce a TTL for each track. The relay should keep the data around for this amount of time, even if there are no active subscribers, just so new subscribers can get access. A relay could drop the cache early (LRU) if it's willing to refill it.

Each subscriber would have its own TTL, upper-bounded by the publisher's TTL. And note this TTL would be based on when the content was received/produced, not when the subscription started.
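As a sketch of that rule (illustrative only, nothing here is in the PR): clamp the subscriber's TTL to the publisher's, and age cached content from when it was received.

use std::time::{Duration, Instant};

// Subscriber TTLs are capped by the publisher's announced TTL.
fn effective_ttl(publisher_ttl: Duration, requested: Option<Duration>) -> Duration {
	requested.map_or(publisher_ttl, |r| r.min(publisher_ttl))
}

// Expiry is measured from when the content was received/produced,
// not from when the subscription started.
fn expired(received_at: Instant, ttl: Duration) -> bool {
	received_at.elapsed() > ttl
}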

> I just see a risk for a rogue subscriber hurting the latency for everyone.

That's my issue with relying on TTLs; you can't handle publisher congestion if there's a VOD worker with a target latency of 30s alongside a real-time user with a target latency of 1s. I use priorities (descending group order) instead, so the publisher delivers the latest content first (for the 1s viewer) and later backfills old content (for the VOD worker).

For me, Delivery Timeout is an optimization to avoid transmitting old data that will be immediately dropped by the receiver. It's not meant to drain the queue during congestion because the queue is being actively prioritized.
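The priority scheme reads roughly like this as code (a sketch, not this repo's scheduler): a max-heap keyed on group sequence, so the newest group always goes out first and older groups are only backfilled with spare capacity.

use std::collections::BinaryHeap;

#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct QueuedGroup {
	sequence: u64, // higher sequence = newer group = higher send priority
}

// BinaryHeap is a max-heap, so pop() yields the newest group first.
fn next_group(queue: &mut BinaryHeap<QueuedGroup>) -> Option<QueuedGroup> {
	queue.pop()
}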
