WIP: Delivery timeout #856
Walkthrough
Delivery timeout support has been added throughout the system.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
rs/moq-lite/src/lite/subscribe.rs (1)
27-52: Reject invalid delivery_timeout flag values.
Right now any non-`1` flag becomes `None`, silently accepting malformed data. This should error on invalid values to preserve protocol correctness.

🛠️ Proposed fix (validate flag with bool::decode / encode)

```diff
-let delivery_timeout = if u8::decode(r, version)? == 1 {
-    Some(u64::decode(r, version)?)
-} else {
-    None
-};
+let has_timeout = bool::decode(r, version)?;
+let delivery_timeout = if has_timeout {
+    Some(u64::decode(r, version)?)
+} else {
+    None
+};
@@
-if let Some(timeout) = self.delivery_timeout {
-    1u8.encode(w, version);
-    timeout.encode(w, version);
-} else {
-    0u8.encode(w, version);
-}
+let has_timeout = self.delivery_timeout.is_some();
+has_timeout.encode(w, version);
+if let Some(timeout) = self.delivery_timeout {
+    timeout.encode(w, version);
+}
```

rs/moq-lite/src/lite/subscriber.rs (1)
312-323: Delivery timeout isn’t enforced in run_frame yet.
`_delivery_timeout` is unused and the TODO remains, so inbound frames will never trigger a timeout. Please implement the check (using the chosen frame timestamp/arrival time) or remove the parameter until supported, to avoid a false sense of enforcement.

If you want, I can draft the timeout check once you confirm which timestamp should be authoritative.
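A minimal sketch of the kind of check being requested, assuming enforcement ends up comparing a frame's age against the subscription's timeout; the function name and signature are hypothetical, not part of the PR:

```rust
use std::time::Duration;

/// Hypothetical helper: decide whether an inbound frame should be dropped
/// because it exceeded the subscription's delivery timeout. `elapsed` is the
/// time since the frame's authoritative timestamp (sent time or arrival time,
/// whichever the protocol settles on); `timeout` is the optional per-subscription
/// delivery timeout.
fn frame_expired(elapsed: Duration, timeout: Option<Duration>) -> bool {
    match timeout {
        Some(t) => elapsed > t,
        None => false, // no timeout configured: frames never expire
    }
}

fn main() {
    // A frame 2s old expires under a 1s timeout...
    assert!(frame_expired(Duration::from_secs(2), Some(Duration::from_secs(1))));
    // ...but survives a 5s timeout, and any age survives with no timeout set.
    assert!(!frame_expired(Duration::from_secs(2), Some(Duration::from_secs(5))));
    assert!(!frame_expired(Duration::from_secs(60), None));
}
```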
🤖 Fix all issues with AI agents
In `@rs/moq-lite/src/ietf/subscribe.rs`:
- Around line 83-87: The delivery_timeout presence flag currently decodes with
u8::decode(...) == 1 which silently accepts invalid values; change this to use
bool::decode(r, version) to validate the flag (or decode the u8 and explicitly
match 0/1 and return an Err for other values) and then conditionally decode the
u64 only when the flag is true; update the code around delivery_timeout
(referencing delivery_timeout, u8::decode, bool::decode, r, version) to reject
invalid encodings instead of treating any non-1 as absent.
In `@rs/moq-lite/src/ietf/subscriber.rs`:
- Around line 235-246: The code redundantly clones `track` into
`track_with_timeout` and reassigns `delivery_timeout`; remove the extra clone
and assignment and insert the original `track` directly into the subscription
state. Specifically, in the block around `let mut state = self.state.lock();`
and the `state.subscribes.insert(request_id, TrackState { ... })`, stop creating
`track_with_timeout` and use the existing `track` (which already has
`delivery_timeout` set) for the `producer` field of `TrackState`, leaving
`alias: None` unchanged.
🧹 Nitpick comments (3)
rs/moq-lite/src/model/track.rs (1)
53-65: Document `delivery_timeout` semantics (units/scope).
Public `Option<u64>` is ambiguous without units or definition (e.g., milliseconds vs ticks, per-frame vs per-group). A short doc comment would prevent misuse across call sites.

rs/moq-lite/src/lite/subscribe.rs (1)
13-19: Clarify `delivery_timeout` units in Subscribe.
A brief comment would help keep producers/consumers aligned on units and semantics.

rs/moq-lite/src/ietf/subscriber.rs (1)
400-411: TODO: Delivery timeout check not yet implemented.
The `_delivery_timeout` parameter is accepted but unused, with a TODO comment indicating future implementation using `frame.info.sent_timestamp`. This is acceptable for WIP.

Would you like me to help implement the delivery timeout check logic, or open an issue to track this task?
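A doc-comment sketch that would address both documentation nitpicks; the unit (milliseconds) and scope (per-group) are illustrative assumptions, not something the PR specifies:

```rust
/// Illustrative sketch only: the field's actual unit and scope are
/// undocumented in the PR, which is exactly what the nitpick flags.
pub struct Track {
    /// Maximum time, in milliseconds, a relay should spend trying to deliver
    /// a group of this track before dropping it. `None` disables the timeout.
    pub delivery_timeout: Option<u64>,
}

fn main() {
    let track = Track { delivery_timeout: Some(1_000) };
    assert_eq!(track.delivery_timeout, Some(1_000));
}
```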
```rust
let delivery_timeout = if u8::decode(r, version)? == 1 {
    Some(u64::decode(r, version)?)
} else {
    None
};
```
Validate the delivery_timeout presence flag.
Using u8::decode + == 1 silently accepts invalid values. Prefer bool::decode (or an explicit match) so invalid encodings are rejected.
🛠️ Proposed fix

```diff
-let delivery_timeout = if u8::decode(r, version)? == 1 {
-    Some(u64::decode(r, version)?)
-} else {
-    None
-};
+let has_timeout = bool::decode(r, version)?;
+let delivery_timeout = if has_timeout {
+    Some(u64::decode(r, version)?)
+} else {
+    None
+};
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
let has_timeout = bool::decode(r, version)?;
let delivery_timeout = if has_timeout {
    Some(u64::decode(r, version)?)
} else {
    None
};
```
```diff
+// Set this track subscription to carry the delivery timeout
+let mut track_with_timeout = track.clone();
+track_with_timeout.delivery_timeout = delivery_timeout;

 let mut state = self.state.lock();
 state.subscribes.insert(
     request_id,
     TrackState {
-        producer: track.clone(),
+        producer: track_with_timeout.clone(),
         alias: None,
     },
 );
```
🧩 Analysis chain
🏁 Scripts executed (against moq-dev/moq):

```bash
# Find the TrackProducer definition and check how Clone is implemented/derived
rg -n -A 30 'pub struct TrackProducer' --type rust
rg -n 'impl Clone for TrackProducer' --type rust -A 20
rg -n -B 5 'pub struct TrackProducer' --type rust | head -20

# Trace where delivery_timeout is assigned
rg -n 'delivery_timeout.*=' --type rust | head -30
rg -n -B 20 'producer.delivery_timeout = delivery_timeout' rs/moq-lite/src/model/broadcast.rs | head -50

# Inspect the surrounding context in subscriber.rs and broadcast.rs
cat -n rs/moq-lite/src/ietf/subscriber.rs | sed -n '220,260p'
cat -n rs/moq-lite/src/model/broadcast.rs | sed -n '200,230p'
```
Remove redundant `delivery_timeout` assignment (lines 236-237).
The `track` received from `broadcast.requested_track()` is a clone of the producer created at broadcast.rs:209-214, which already has `delivery_timeout` set. Since `TrackProducer` derives `Clone`, the field is preserved. The assignment at line 237 is redundant and can be removed; use `track` directly instead of creating `track_with_timeout`.
Hey @ac-freeman I appreciate the PR. I've implemented this probably 4 times already and I'm never happy with the result, lul. I'd like to make delivery timeout work with fanout. So when the relay gets a subscription with a timeout of 1s, it subscribes upstream with 1s timeout. But when another request comes in with 5s timeout, it updates the existing subscription to 5s. That requires piping a lot of stuff through the model so we actually have some feedback on what consumers want, not just what producers are making.
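The join/drop bookkeeping described here could be sketched roughly like this; the type and method names are hypothetical, not anything in the moq codebase:

```rust
use std::collections::BTreeMap;
use std::time::Duration;

/// Hypothetical aggregator for the fanout behavior described above: the
/// upstream subscription's timeout is the max of all downstream timeouts,
/// and it must be recomputed when a subscriber joins or drops.
#[derive(Default)]
struct TimeoutFanout {
    // timeout -> number of downstream subscribers using it
    downstream: BTreeMap<Duration, usize>,
}

impl TimeoutFanout {
    fn subscribe(&mut self, timeout: Duration) {
        *self.downstream.entry(timeout).or_insert(0) += 1;
    }

    fn unsubscribe(&mut self, timeout: Duration) {
        if let Some(count) = self.downstream.get_mut(&timeout) {
            *count -= 1;
            if *count == 0 {
                self.downstream.remove(&timeout);
            }
        }
    }

    /// Timeout to request upstream: max over downstream, None if no subscribers.
    fn upstream_timeout(&self) -> Option<Duration> {
        self.downstream.keys().next_back().copied()
    }
}

fn main() {
    let mut fan = TimeoutFanout::default();
    fan.subscribe(Duration::from_secs(1));
    fan.subscribe(Duration::from_secs(5));
    assert_eq!(fan.upstream_timeout(), Some(Duration::from_secs(5)));
    // When the 5s subscriber drops, upstream falls back to 1s.
    fan.unsubscribe(Duration::from_secs(5));
    assert_eq!(fan.upstream_timeout(), Some(Duration::from_secs(1)));
}
```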
So, upstream_timeout = max(downstream_timeouts)? The upstream timeout would need to be updated when a subscriber drops, if it had the largest timeout. Wouldn't this also cascade with large fan-out? If you have one relay five layers deep that subscribes with a 60s timeout, that would bubble all the way up. Should the non-relay subscriber timeouts also affect upstream subscriptions? I just see a risk for a rogue subscriber hurting the latency for everyone. The original publisher timeout should probably also take precedence over the first relay's subscription timeout. They might want to have some say in what the latency regime is.
Yeah, my plan is to have the publisher announce a TTL for each track. The relay should keep the data around for this amount of time, even if there are no active subscribers, just so new subscribers can get access. A relay could drop the cache early (LRU) if they're willing to refill it. Each subscriber would have its own TTL, upper-bounded by the publisher's TTL. And note this TTL would be based on when the content was received/produced, not when the subscription started.
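A rough sketch of the TTL rules described above, assuming age is measured from production/receipt time; the function names are hypothetical:

```rust
use std::time::Duration;

/// Hypothetical: each subscriber's TTL is capped by the publisher's announced TTL.
fn effective_ttl(publisher_ttl: Duration, subscriber_ttl: Duration) -> Duration {
    subscriber_ttl.min(publisher_ttl)
}

/// Hypothetical cache check: an item stays cached while its age since
/// production/receipt is within the publisher's TTL, regardless of when any
/// subscription started.
fn is_cached(age_since_produced: Duration, publisher_ttl: Duration) -> bool {
    age_since_produced <= publisher_ttl
}

fn main() {
    let publisher = Duration::from_secs(30);
    // A subscriber asking for 60s is capped to the publisher's 30s.
    assert_eq!(effective_ttl(publisher, Duration::from_secs(60)), publisher);
    // A 10s-old group is still cached; a 45s-old one is not.
    assert!(is_cached(Duration::from_secs(10), publisher));
    assert!(!is_cached(Duration::from_secs(45), publisher));
}
```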
That's my issue with relying on TTLs; you can't handle publisher congestion if there's a VOD worker with a target latency of 30s alongside a real-time user with a target latency of 1s. I use priorities (descending group order) instead so the publisher delivers the latest content first (for the 1s viewer) and later backfills old content (for the VOD worker). For me, Delivery Timeout is an optimization to avoid transmitting old data that will be immediately dropped by the receiver. It's not meant to drain the queue during congestion because the queue is being actively prioritized.
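The newest-first prioritization described here could be sketched as follows; this is illustrative only, using group sequence numbers, and is not how the actual transport scheduler is implemented:

```rust
use std::collections::BinaryHeap;

/// Hypothetical send-queue sketch for descending group order: the newest
/// group is delivered first, so a 1s-latency viewer gets fresh content
/// immediately while older groups backfill for a 30s-latency VOD worker.
/// Delivery timeout would then only prune groups the receiver would drop anyway.
fn send_order(pending: Vec<u64>) -> Vec<u64> {
    // Max-heap: the highest (newest) group sequence is popped first.
    let mut heap: BinaryHeap<u64> = pending.into_iter().collect();
    let mut order = Vec::new();
    while let Some(group) = heap.pop() {
        order.push(group);
    }
    order
}

fn main() {
    // Groups 7..=10 queued out of order during congestion go out newest-first.
    assert_eq!(send_order(vec![8, 10, 7, 9]), vec![10, 9, 8, 7]);
}
```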
I implemented simplified delivery timeouts for a research project 6 months ago. I asked Claude to help bring those changes into the latest version of the repo. This hasn't been completely tested yet, but my original implementation did work for my purposes. I haven't touched anything on the JavaScript side yet.
Paper (soon to be updated): https://arxiv.org/abs/2507.15673
Relevant issue: #678