Skip to content

Conversation

@mzabaluev
Copy link
Contributor

Rationale for this change

The PartitionEvaluator implementation for NthValue in DataFusion has a few shortcomings:

  • When nulls are ignored (meaning the count should skip over them), the evaluation collects an array of all valid indices, to select at most one index accordingly to the First/Last/Nth case.
  • The memoize implementation gives up in the same condition, even after performing part of the logic!

What changes are included in this PR?

Use only as much iteration over the valid indices as needed for the function case, without collecting all indices.
The memoize implementation does the right thing for FirstValue with ignore_nulls set to true, or returns early for other function cases.

Are these changes tested?

All existing tests pass for FirstValue/LastValue/NthValue.

Are there any user-facing changes?

No.

Instead of collecting all valid indices per batch in PartitionEvaluator
for NthValue, use the iterator as appropriate for the case.
Even tn the worst case of negative index larger than 1, only a sliding
window of N last valid indices is needed.
Handle the case when FirstValue is called with ignore_nulls set to true,
can prune the partition on the first non-null value.
Also return early for the other function cases in the same condition,
rather than grinding some logic only to discard the results.
@github-actions github-actions bot added the functions Changes to functions implementation label Dec 25, 2025
Copy link
Contributor

@Jefffrey Jefffrey left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be good if there were some benchmarks to check this is indeed a performance boost

Comment on lines 475 to 476
let slice = array.slice(range.start, n_range);
if let Some(nulls) = slice.nulls() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can also be more strict by checking if the null_count instead of simply checking for the presence of the null buffer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That will be another branch, and the iterator returned by nulls.valid_indices() being empty will do the right thing anyway?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can chain it like

            if let Some(nulls) = slice.nulls()
                && nulls.null_count() > 0
            {

and the iterator returned by nulls.valid_indices() being empty will do the right thing anyway?

But I think it would still iterate through the whole null buffer anyway? So if we're looking via a performance lens perhaps this approach is worth considering.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, I somehow confused the null count check with valid_indices retrieving no values, while it's the opposite. Also I'm a bit new to Arrow/Datafusion, so it's not clear to me if there are non-pathological scenarios that could produce an array with the null buffer present, but containing no nulls.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would indeed be a weird/pedantic case; however here it might be appropriate as we are slicing the original array so it could be likely we have an array with nulls but slice into a region with no nulls 🤔

// for the sliding window that will be discarded in the end.
return None;
}
let mut window = VecDeque::with_capacity(reverse_index);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we reverse the iterator of valid_indices() to avoid need of this queue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, BitIndexIterator is not bidirectional.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The queue is a bad solution, indeed. I think a simple ring buffer will do much better here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, BitIndexIterator is not bidirectional.

Ah you're right, I was mistakenly thinking of BitIterator

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if we do something like this it can work without needing a separate container?

Ordering::Less => {
    let reverse_index = (-self.n) as usize;
    let total_len = nulls.len();
    let null_count = nulls.null_count();
    let valid_indices_len = total_len - null_count;
    if reverse_index > valid_indices_len {
        return None;
    }
    nulls
        .valid_indices()
        .nth(valid_indices_len - reverse_index)
        .map(|idx| idx + offset)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion! With it, the implementation is faster across the board accordingly to the added benchmark.

Comment on lines +374 to +397
if self.ignore_nulls {
match self.state.kind {
// Prune on first non-null output in case of FIRST_VALUE
NthValueKind::First => {
if let Some(nulls) = out.nulls() {
if self.state.finalized_result.is_none() {
if let Some(valid_index) = nulls.valid_indices().next() {
let result =
ScalarValue::try_from_array(out, valid_index)?;
self.state.finalized_result = Some(result);
} else {
// The output is empty or all nulls, ignore
}
}
if state.window_frame_range.start < state.window_frame_range.end {
state.window_frame_range.start =
state.window_frame_range.end - 1;
}
return Ok(());
} else {
// Fall through to the main case because there are no nulls
}
}
// Do not memoize for other kinds when nulls are ignored
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic here is really hard to follow with all the nesting present

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Between early returns and falling through to the main case, I don't know if I can make it more readable.

@mzabaluev
Copy link
Contributor Author

Would be good if there were some benchmarks to check this is indeed a performance boost

I have added a benchmark. It shows significant improvement in many cases, but it also shows that the VecDeque is problematic. I will try to replace it with a ring buffer.

@Jefffrey
Copy link
Contributor

Jefffrey commented Jan 4, 2026

Would be good if there were some benchmarks to check this is indeed a performance boost

I have added a benchmark. It shows significant improvement in many cases, but it also shows that the VecDeque is problematic. I will try to replace it with a ring buffer.

Would you be able to post the benchmark results for us to see?

@mzabaluev
Copy link
Contributor Author

Benchmark results against the branch base
nth_value_ignore_nulls/first_value_expanding/0%_nulls
                        time:   [229.32 µs 229.97 µs 230.68 µs]
                        change: [−4.1052% −3.6770% −3.2233%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 11 outliers among 100 measurements (11.00%)
  3 (3.00%) low mild
  7 (7.00%) high mild
  1 (1.00%) high severe
nth_value_ignore_nulls/last_value_expanding/0%_nulls
                        time:   [229.63 µs 230.18 µs 230.78 µs]
                        change: [−4.1494% −3.7747% −3.3808%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 10 outliers among 100 measurements (10.00%)
  2 (2.00%) low mild
  8 (8.00%) high mild
nth_value_ignore_nulls/nth_value_10_expanding/0%_nulls
                        time:   [231.01 µs 231.38 µs 231.78 µs]
                        change: [−3.6131% −3.3527% −3.0648%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 5 outliers among 100 measurements (5.00%)
  3 (3.00%) high mild
  2 (2.00%) high severe
nth_value_ignore_nulls/nth_value_neg10_expanding/0%_nulls
                        time:   [231.72 µs 232.23 µs 232.83 µs]
                        change: [−3.2292% −2.9837% −2.7387%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 7 outliers among 100 measurements (7.00%)
  1 (1.00%) low mild
  3 (3.00%) high mild
  3 (3.00%) high severe
nth_value_ignore_nulls/first_value_sliding_100/0%_nulls
                        time:   [231.63 µs 231.99 µs 232.41 µs]
                        change: [−3.1696% −2.9230% −2.6831%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 6 outliers among 100 measurements (6.00%)
  2 (2.00%) low mild
  2 (2.00%) high mild
  2 (2.00%) high severe
nth_value_ignore_nulls/last_value_sliding_100/0%_nulls
                        time:   [231.89 µs 232.27 µs 232.71 µs]
                        change: [−3.3218% −3.0671% −2.8237%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 7 outliers among 100 measurements (7.00%)
  1 (1.00%) low severe
  1 (1.00%) low mild
  5 (5.00%) high mild
nth_value_ignore_nulls/first_value_expanding/30%_nulls
                        time:   [519.53 µs 521.02 µs 522.69 µs]
                        change: [−98.032% −98.025% −98.018%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 5 outliers among 100 measurements (5.00%)
  1 (1.00%) low mild
  3 (3.00%) high mild
  1 (1.00%) high severe
nth_value_ignore_nulls/last_value_expanding/30%_nulls
                        time:   [21.887 ms 21.925 ms 21.970 ms]
                        change: [−15.961% −15.752% −15.531%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 7 outliers among 100 measurements (7.00%)
  2 (2.00%) high mild
  5 (5.00%) high severe
nth_value_ignore_nulls/nth_value_10_expanding/30%_nulls
                        time:   [605.66 µs 606.99 µs 608.56 µs]
                        change: [−97.648% −97.640% −97.632%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 7 outliers among 100 measurements (7.00%)
  1 (1.00%) low severe
  1 (1.00%) low mild
  5 (5.00%) high mild
nth_value_ignore_nulls/nth_value_neg10_expanding/30%_nulls
                        time:   [12.734 ms 12.765 ms 12.804 ms]
                        change: [−50.084% −49.902% −49.690%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 15 outliers among 100 measurements (15.00%)
  10 (10.00%) high mild
  5 (5.00%) high severe
nth_value_ignore_nulls/first_value_sliding_100/30%_nulls
                        time:   [459.39 µs 460.95 µs 462.60 µs]
                        change: [−73.979% −73.883% −73.779%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high mild
Benchmarking nth_value_ignore_nulls/last_value_sliding_100/30%_nulls: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.5s, enable flat sampling, or reduce sample count to 60.
nth_value_ignore_nulls/last_value_sliding_100/30%_nulls
                        time:   [1.0836 ms 1.0861 ms 1.0889 ms]
                        change: [−37.896% −37.683% −37.418%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 3 outliers among 100 measurements (3.00%)
  1 (1.00%) high mild
  2 (2.00%) high severe
nth_value_ignore_nulls/first_value_expanding/50%_nulls
                        time:   [526.57 µs 528.12 µs 529.98 µs]
                        change: [−96.396% −96.382% −96.369%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 7 outliers among 100 measurements (7.00%)
  1 (1.00%) low mild
  1 (1.00%) high mild
  5 (5.00%) high severe
nth_value_ignore_nulls/last_value_expanding/50%_nulls
                        time:   [10.949 ms 10.972 ms 11.000 ms]
                        change: [−24.558% −24.348% −24.116%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 6 outliers among 100 measurements (6.00%)
  2 (2.00%) high mild
  4 (4.00%) high severe
nth_value_ignore_nulls/nth_value_10_expanding/50%_nulls
                        time:   [622.68 µs 624.14 µs 625.84 µs]
                        change: [−95.905% −95.890% −95.876%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 3 outliers among 100 measurements (3.00%)
  3 (3.00%) high mild
nth_value_ignore_nulls/nth_value_neg10_expanding/50%_nulls
                        time:   [8.6783 ms 8.6978 ms 8.7208 ms]
                        change: [−42.546% −42.387% −42.196%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
  5 (5.00%) low mild
  3 (3.00%) high severe
nth_value_ignore_nulls/first_value_sliding_100/50%_nulls
                        time:   [464.08 µs 465.38 µs 466.74 µs]
                        change: [−66.353% −66.223% −66.101%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 9 outliers among 100 measurements (9.00%)
  2 (2.00%) low mild
  5 (5.00%) high mild
  2 (2.00%) high severe
nth_value_ignore_nulls/last_value_sliding_100/50%_nulls
                        time:   [929.50 µs 931.49 µs 933.57 µs]
                        change: [−36.937% −36.708% −36.466%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 12 outliers among 100 measurements (12.00%)
  2 (2.00%) low mild
  7 (7.00%) high mild
  3 (3.00%) high severe
nth_value_ignore_nulls/first_value_expanding/80%_nulls
                        time:   [526.65 µs 527.57 µs 528.64 µs]
                        change: [−90.400% −90.362% −90.327%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
  1 (1.00%) low severe
  3 (3.00%) low mild
  2 (2.00%) high mild
  2 (2.00%) high severe
nth_value_ignore_nulls/last_value_expanding/80%_nulls
                        time:   [3.0262 ms 3.0319 ms 3.0383 ms]
                        change: [−41.230% −41.085% −40.926%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 3 outliers among 100 measurements (3.00%)
  1 (1.00%) low mild
  2 (2.00%) high severe
nth_value_ignore_nulls/nth_value_10_expanding/80%_nulls
                        time:   [614.14 µs 615.48 µs 616.94 µs]
                        change: [−88.219% −88.182% −88.145%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 12 outliers among 100 measurements (12.00%)
  1 (1.00%) low severe
  3 (3.00%) low mild
  4 (4.00%) high mild
  4 (4.00%) high severe
nth_value_ignore_nulls/nth_value_neg10_expanding/80%_nulls
                        time:   [2.6999 ms 2.7116 ms 2.7247 ms]
                        change: [−48.929% −48.694% −48.420%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 13 outliers among 100 measurements (13.00%)
  1 (1.00%) high mild
  12 (12.00%) high severe
nth_value_ignore_nulls/first_value_sliding_100/80%_nulls
                        time:   [467.46 µs 468.83 µs 470.30 µs]
                        change: [−56.178% −56.001% −55.832%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild
nth_value_ignore_nulls/last_value_sliding_100/80%_nulls
                        time:   [708.05 µs 710.02 µs 712.01 µs]
                        change: [−33.955% −33.664% −33.402%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
  1 (1.00%) low severe
  4 (4.00%) low mild
  1 (1.00%) high mild
  2 (2.00%) high severe

nth_value_nulls_comparison/first_value/ignore_nulls
                        time:   [530.18 µs 531.19 µs 532.34 µs]
                        change: [−96.352% −96.344% −96.335%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
  2 (2.00%) low severe
  1 (1.00%) low mild
  3 (3.00%) high mild
  2 (2.00%) high severe
nth_value_nulls_comparison/first_value/respect_nulls
                        time:   [73.979 µs 74.062 µs 74.151 µs]
                        change: [−1.9098% −1.6540% −1.3922%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
  7 (7.00%) high mild
  1 (1.00%) high severe
nth_value_nulls_comparison/nth_value_10/ignore_nulls
                        time:   [621.26 µs 622.50 µs 623.92 µs]
                        change: [−95.832% −95.817% −95.799%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 6 outliers among 100 measurements (6.00%)
  1 (1.00%) low severe
  2 (2.00%) low mild
  2 (2.00%) high mild
  1 (1.00%) high severe
nth_value_nulls_comparison/nth_value_10/respect_nulls
                        time:   [74.362 µs 74.470 µs 74.615 µs]
                        change: [−1.2308% −0.9994% −0.7589%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 9 outliers among 100 measurements (9.00%)
  2 (2.00%) low mild
  2 (2.00%) high mild
  5 (5.00%) high severe

Copy link
Contributor

@Jefffrey Jefffrey left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The evaluate looks good to me; I have some questions around memoize but they're mainly because I am not as familiar with this part of the codebase 😅

Comment on lines +377 to +379
NthValueKind::First => {
if let Some(nulls) = out.nulls() {
if self.state.finalized_result.is_none() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a small pedantic case here; so if we have no null buffer (no nulls) then we fallback to existing behaviour below. If we have a null buffer, we try memoize if we haven't already but we'll still return from this point onwards (we won't fallback). But if the null buffer has no nulls then we don't fall through like before, we just handle the return here.

I guess it is the same result and I don't think it's too much of a concern at runtime, but it's just a potential path I find inconsistent when reading this code flow 🤔

Comment on lines +381 to +383
let result =
ScalarValue::try_from_array(out, valid_index)?;
self.state.finalized_result = Some(result);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I notice for below we memoize the last value in the out column whereas here we do the first; does it make a difference?

Comment on lines +388 to +391
if state.window_frame_range.start < state.window_frame_range.end {
state.window_frame_range.start =
state.window_frame_range.end - 1;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this equivalent to the is_prunable check (for NthValueKind::First) where it ensures n_range > 0 and then if pruning it does state.window_frame_range.start = state.window_frame_range.end.saturating_sub(buffer_size)?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

functions Changes to functions implementation

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants