
Speed up node boot times by parallelizing buffer acquisition #19025

Open
jtuglu1 wants to merge 2 commits into apache:master from jtuglu1:speed-up-broker-launch-time

Conversation

@jtuglu1
Contributor

@jtuglu1 jtuglu1 commented Feb 14, 2026

Description

Brokers/Historicals/Peons currently allocate buffers serially on boot. For large quantities of large buffers (100+ buffers @ ~2GB per buffer), this can mean waiting several minutes just to acquire the needed memory (in our case, upwards of 6 minutes for brokers and 5+ seconds for peons), which isn't great. This is because boot effectively performs 100+ sequential malloc/mmap calls, each needing 2GB of zeroed memory. This change parallelizes buffer acquisition in proportion to the number of cores available on the machine. The thread pool used for the IntStream is temporary and is released once allocation finishes (this happens before the broker comes online and is serving queries anyway).
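For concreteness, a minimal sketch of the parallel-allocation idea (illustrative only, not the exact code in this PR; the real pool construction lives in DefaultBlockingPool's constructor):

// Minimal sketch: allocate bufferCount direct buffers concurrently instead of in a serial loop.
// Class and method names here are illustrative, not from the PR.
import java.nio.ByteBuffer;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelBufferAllocSketch
{
  public static List<ByteBuffer> allocate(final int bufferCount, final int bufferSizeBytes)
  {
    return IntStream.range(0, bufferCount)
                    .parallel() // fans the allocateDirect calls out across worker threads
                    .mapToObj(i -> ByteBuffer.allocateDirect(bufferSizeBytes))
                    .collect(Collectors.toList());
  }
}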

This is very helpful for both deployments and auto-scaling, since newly added nodes can begin providing value to the cluster more quickly. It can also help with task launch times. For applications that run with -XX:+AlwaysPreTouch, serial allocation is even slower, since the JVM pre-allocates/touches all the necessary memory pages up front.

Once the compiler version is ≥ 21, we can consider the MemorySegment API: allocate a single "wide" buffer and slice into it. That should be one large malloc/mmap call, which should return much faster.
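If we go that route, a rough sketch of the single wide allocation using the Foreign Memory API (preview in JDK 21, finalized in JDK 22) could look like the following; names here are illustrative, not code from this PR:

// Rough sketch only: one large allocation, sliced into per-buffer ByteBuffer views.
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class WideBufferSliceSketch
{
  public static List<ByteBuffer> slices(final int bufferCount, final int bufferSizeBytes)
  {
    // A single allocation backs every buffer; the shared arena is intentionally never closed
    // here, since merge buffers live for the lifetime of the process.
    final Arena arena = Arena.ofShared();
    final MemorySegment wide = arena.allocate((long) bufferCount * bufferSizeBytes);

    final List<ByteBuffer> buffers = new ArrayList<>(bufferCount);
    for (int i = 0; i < bufferCount; i++) {
      // Each slice stays under Integer.MAX_VALUE bytes, so it can be exposed as a ByteBuffer view.
      buffers.add(wide.asSlice((long) i * bufferSizeBytes, bufferSizeBytes).asByteBuffer());
    }
    return buffers;
  }
}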

Benchmarks

Allocate 10 and 100 merge buffers @ 2GB each, using optimal JVM memory flags, running on JDK 21 (code compiled with JDK 17):

Before

Benchmark                                              (bufferCount)  (bufferSizeBytes)  Mode  Cnt       Score      Error  Units
DefaultBlockingPoolConstructorBenchmark.constructPool             10         2000000000    ss    5   11860.140 ±   84.047  ms/op
DefaultBlockingPoolConstructorBenchmark.constructPool            100         2000000000    ss    5  118666.210 ± 1413.217  ms/op

After

Benchmark                                              (bufferCount)  (bufferSizeBytes)  Mode  Cnt     Score     Error  Units
DefaultBlockingPoolConstructorBenchmark.constructPool             10         2000000000    ss    5  1407.045 ±  11.297  ms/op
DefaultBlockingPoolConstructorBenchmark.constructPool            100         2000000000    ss    5  5329.244 ± 723.182  ms/op

Overall, this results in a measured ~10x reduction in launch time in the worst case and up to ~100x in the best case. Brokers now boot ~7 minutes faster and peons ~5 seconds faster on our workloads.

Benchmark File
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.benchmark.collections;

import com.google.common.base.Supplier;
import org.apache.druid.collections.DefaultBlockingPool;
import org.apache.druid.java.util.common.ByteBufferUtils;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.openjdk.jmh.runner.options.TimeValue;

import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Measures how long {@link DefaultBlockingPool}'s constructor takes to pre-allocate N buffers.
 *
 * This is meant to reflect broker/historical startup behavior, which constructs a merge buffer pool backed by direct
 * {@link ByteBuffer}s.
 */
@BenchmarkMode(Mode.SingleShotTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Warmup(iterations = 3)
@Measurement(iterations = 5)
@Fork(1)
@State(Scope.Thread)
public class DefaultBlockingPoolConstructorBenchmark
{
  @Param({"10", "100", "500"})
  public int bufferCount;

  /**
   * Size of each direct buffer to allocate.
   *
   * Note that {@link ByteBuffer} capacities are limited to {@link Integer#MAX_VALUE}, so a "2GB" buffer cannot be
   * exactly 2GiB (2,147,483,648). Use a value <= 2,147,483,647.
   */
  @Param({"2000000000"})
  public int bufferSizeBytes;

  private Supplier<ByteBuffer> generator;
  private DefaultBlockingPool<ByteBuffer> pool;
  private ByteBuffer[] allocatedBuffers;

  @Setup(Level.Invocation)
  public void setupInvocation()
  {
    allocatedBuffers = new ByteBuffer[bufferCount];
    final AtomicInteger allocateIndex = new AtomicInteger(0);
    generator = () -> {
      final int i = allocateIndex.getAndIncrement();
      final ByteBuffer buffer = ByteBuffer.allocateDirect(bufferSizeBytes);
      // generator is invoked from a parallel stream in DefaultBlockingPool ctor
      if (i >= 0 && i < allocatedBuffers.length) {
        allocatedBuffers[i] = buffer;
      }
      return buffer;
    };
    pool = null;
  }

  @TearDown(Level.Invocation)
  public void tearDownInvocation()
  {
    pool = null;
    if (allocatedBuffers != null) {
      for (ByteBuffer buffer : allocatedBuffers) {
        if (buffer != null) {
          ByteBufferUtils.free(buffer);
        }
      }
      allocatedBuffers = null;
    }
  }

  @Benchmark
  public void constructPool(final Blackhole blackhole)
  {
    pool = new DefaultBlockingPool<>(generator, bufferCount);
    blackhole.consume(pool);
  }

  /**
   * Convenience entrypoint to run from an IDE without building the shaded benchmarks jar.
   *
   * Note: to run with large direct buffers, set your IDE run configuration VM options, e.g.
   * {@code -Xmx200g -XX:MaxDirectMemorySize=200g}.
   */
  public static void main(String[] args) throws RunnerException
  {
    final Options opt = new OptionsBuilder()
        .include(".*" + DefaultBlockingPoolConstructorBenchmark.class.getSimpleName() + ".*")
        .forks(1)
        .warmupIterations(3)
        .warmupTime(TimeValue.seconds(1))
        .measurementIterations(5)
        .measurementTime(TimeValue.seconds(1))
        .build();

    new Runner(opt).run();
  }
}

Release note

Speed up broker boot times by parallelizing merge buffer initialization


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@jtuglu1 jtuglu1 marked this pull request as ready for review February 14, 2026 20:57
Brokers currently allocate buffers serially on boot. For large numbers of buffers (100+), this can mean waiting several minutes to acquire the needed memory. This change parallelizes the acquisition of the buffers.
@jtuglu1 jtuglu1 force-pushed the speed-up-broker-launch-time branch from 1fed5c0 to 408e2ff on February 15, 2026 20:40
Contributor

@maytasm maytasm left a comment

LGTM!

@jtuglu1
Contributor Author

jtuglu1 commented Feb 16, 2026

I will need to adjust the approach, since the test failures are due to deadlock/starvation from using the common pool for .parallel(). Given the gains, I think it's fine to spin up a temporary FJP to do the allocations for both temp/merge buffers.

@abhishekrb19
Contributor

Brokers currently allocate buffers serially on boot. Speed up broker boot times by parallelizing merger buffer initialization

Just a quick clarification - isn't this applicable to all servers, including Historicals and Peons? Or did you notice the bottleneck primarily on the Brokers?

@jtuglu1
Contributor Author

jtuglu1 commented Feb 17, 2026

Brokers currently allocate buffers serially on boot. Speed up broker boot times by parallelizing merger buffer initialization

Just a quick clarification - isn't this applicable to all servers, including Historicals and Peons? Or did you notice the bottleneck primarily on the Brokers?

Yeah this will speed up all servers that pre-alloc some # of buffers.

@jtuglu1 jtuglu1 changed the title from "Speed up broker boot times by parallelizing buffer acquisition" to "Speed up node boot times by parallelizing buffer acquisition" on Feb 17, 2026
@kfaraz
Contributor

kfaraz commented Feb 19, 2026

@jtuglu1 , to address the CI failures, would it make sense to not fully parallelize the allocation and instead use batches of size equal to the number of cores? Claude seems to think that this would reduce the contention in the JVM direct mem allocator.
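For concreteness, a hypothetical sketch of that batched approach (illustrative only, not code from this PR):

// Hypothetical sketch: allocate at most `cores` buffers at a time, waiting for each batch
// to finish before starting the next. Names here are illustrative.
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BatchedAllocSketch
{
  public static List<ByteBuffer> allocate(final int bufferCount, final int bufferSizeBytes)
      throws InterruptedException, ExecutionException
  {
    final int cores = Runtime.getRuntime().availableProcessors();
    final ExecutorService exec = Executors.newFixedThreadPool(cores);
    final List<ByteBuffer> buffers = new ArrayList<>(bufferCount);
    try {
      for (int start = 0; start < bufferCount; start += cores) {
        final int batchSize = Math.min(cores, bufferCount - start);
        final List<Callable<ByteBuffer>> batch = new ArrayList<>(batchSize);
        for (int i = 0; i < batchSize; i++) {
          batch.add(() -> ByteBuffer.allocateDirect(bufferSizeBytes));
        }
        // invokeAll blocks until the whole batch completes before the next batch starts.
        for (Future<ByteBuffer> future : exec.invokeAll(batch)) {
          buffers.add(future.get());
        }
      }
      return buffers;
    }
    finally {
      exec.shutdown();
    }
  }
}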

@jtuglu1
Contributor Author

jtuglu1 commented Feb 19, 2026

@jtuglu1 , to address the CI failures, would it make sense to not fully parallelize the allocation and instead use batches of size equal to the number of cores? Claude seems to think that this would reduce the contention in the JVM direct mem allocator.

I think this can still, in the worst case, run into issues, since you're not guaranteeing a completion deadline on the buffer-alloc tasks. This means you can still occupy the common thread pool's threads, which might cause other deadlock issues. To resolve this, I've created a temporary FJP to perform the allocs (sketched below). Normally, doing this sort of thing would be prohibitive; however, FJP threads are created lazily, and there are at most 2 production usages of this pool per node, so we're spinning up at most 2 dedicated, short-lived allocation pools per node, and only once (on boot), which I think is reasonable. LMK if you disagree.

An alternative would be to make a static, shared FJP in the class.
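For reference, a rough sketch of the temporary-pool idea (illustrative only, not the exact PR code):

// Rough sketch: run the parallel allocation inside a dedicated ForkJoinPool so the JVM-wide
// common pool is never touched, then shut the pool down once all buffers are acquired.
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class TemporaryPoolAllocSketch
{
  public static List<ByteBuffer> allocate(final int bufferCount, final int bufferSizeBytes)
      throws InterruptedException, ExecutionException
  {
    final ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
    try {
      // Parallel-stream tasks forked from inside a ForkJoinPool worker run in that pool
      // rather than in ForkJoinPool.commonPool(), avoiding common-pool starvation.
      return pool.submit(
          () -> IntStream.range(0, bufferCount)
                         .parallel()
                         .mapToObj(i -> ByteBuffer.allocateDirect(bufferSizeBytes))
                         .collect(Collectors.toList())
      ).get();
    }
    finally {
      pool.shutdown(); // the pool only exists for boot-time allocation
    }
  }
}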

@jtuglu1 jtuglu1 requested a review from kfaraz February 19, 2026 20:15
@jtuglu1
Contributor Author

jtuglu1 commented Feb 19, 2026

@gianm @clintropolis any thoughts here?

@jtuglu1 jtuglu1 force-pushed the speed-up-broker-launch-time branch from ade2804 to 05e7af5 on February 19, 2026 21:30