Commit 58b796f
chore(bench): kernel-vs-Thrift performance baseline harness
One-shot benchmark script under scripts/ that runs each (backend × SQL-shape) combination N+1 times against a live warehouse, drops the first run (cache warm-up), and reports min/median/max for session-open, time-to-first-row, drain, and RSS delta. Not a CI gate — single-machine, single-warehouse, high-variance script meant to be re-run by hand when we want a baseline.

Shapes:

- SELECT 1 (pure round-trip latency, no data)
- range(10k) (inline result, ~10K rows)
- range(1M) (crosses CloudFetch threshold; currently panics on the kernel backend — see kernel issue #19, nested block_on bug)
- wide-uuid(100k) (wider rows, Arrow serialization)
- metadata.catalogs (metadata round-trip)

Output is a Markdown table you can paste into a PR. Run with:

    set -a && source ~/.databricks/pecotesting-creds && set +a
    export DATABRICKS_SERVER_HOSTNAME=${DATABRICKS_HOST#https://}
    .venv/bin/python scripts/bench_kernel_vs_thrift.py

Co-authored-by: Isaac
Signed-off-by: Vikrant Puppala <vikrant.puppala@databricks.com>
1 parent aa8bd24 commit 58b796f

1 file changed

Lines changed: 294 additions & 0 deletions

File tree

scripts/bench_kernel_vs_thrift.py

@@ -0,0 +1,294 @@
"""Benchmark the kernel-backed connector against the Thrift backend.

One-shot script, not a CI gate. Runs each (backend × SQL-shape)
combination N+1 times against a live warehouse, drops the first
run (cache warm-up), and reports min / median / max wall-clock for
session-open, time-to-first-row, drain, and RSS delta.

Usage:

    set -a && source ~/.databricks/pecotesting-creds && set +a
    # If DATABRICKS_HOST is set but DATABRICKS_SERVER_HOSTNAME is
    # not, normalise it (matches the e2e suite's convention).
    export DATABRICKS_SERVER_HOSTNAME=${DATABRICKS_SERVER_HOSTNAME:-${DATABRICKS_HOST#https://}}
    .venv/bin/python scripts/bench_kernel_vs_thrift.py
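    # While iterating you can restrict the matrix (an optional
    # convenience using the flags defined in main() below):
    #   .venv/bin/python scripts/bench_kernel_vs_thrift.py \
    #       --samples 3 --shapes "SELECT 1" --backends kernel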

Honest disclaimers:

- Single warehouse, single machine, single network route. High
  server-side variance is expected.
- Server-side caches warm differently between back-to-back runs;
  the first-run drop helps but doesn't eliminate it.
- Comparison is **kernel-backed vs Thrift**. The pure-Python
  native SEA backend (``backend/sea/``) is no longer reachable via
  ``use_sea=True`` after this PR, so it's not included.
- RSS delta is process-wide and includes pyarrow tables we hold
  in scope during the drain. Two-orders-of-magnitude differences
  are signal; 10% differences are noise.

The output is a Markdown table you can paste into a PR description.
"""

from __future__ import annotations

import argparse
import gc
import os
import resource
import statistics
import sys
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import databricks.sql as sql


# ─── Config ──────────────────────────────────────────────────────


@dataclass(frozen=True)
class Shape:
    name: str
    sql: Optional[str]  # None means it's a metadata call
    metadata_call: Optional[str]  # e.g. "catalogs"
    expected_rows: Optional[int]  # None when we don't assert


SHAPES: List[Shape] = [
    Shape("SELECT 1", "SELECT 1 AS n", None, 1),
    Shape("range(10k)", "SELECT * FROM range(10000)", None, 10_000),
    Shape("range(1M)", "SELECT * FROM range(1000000)", None, 1_000_000),
    Shape(
        "wide-uuid(100k)",
        "SELECT id, uuid() AS u FROM range(100000)",
        None,
        100_000,
    ),
    Shape("metadata.catalogs", None, "catalogs", None),
]


BACKENDS: List[Tuple[str, Dict]] = [
    ("thrift", {"use_sea": False}),
    ("kernel", {"use_sea": True}),
]


# ─── Measurement ─────────────────────────────────────────────────


@dataclass
class SampleMetrics:
    open_s: float
    ttfr_s: float
    drain_s: float
    rows: int
    rss_delta_kb: int


def _rss_kb() -> int:
    # ru_maxrss is in KB on Linux and bytes on macOS — this script is
    # primarily for Linux CI / dev shells, so we document the macOS
    # caveat and move on.
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
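# Portability note (a hedged sketch, not part of the original script):
# on macOS, ru_maxrss is reported in bytes rather than KB, so a
# platform-aware variant of _rss_kb() could normalise like this:
#
#     def _rss_kb() -> int:
#         raw = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
#         return raw // 1024 if sys.platform == "darwin" else raw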
def run_one(backend_kwargs: Dict, shape: Shape, conn_params: Dict) -> SampleMetrics:
    """Open a fresh connection, run the shape, drain, return metrics."""
    gc.collect()
    rss_before = _rss_kb()

    t0 = time.perf_counter()
    conn = sql.connect(**conn_params, **backend_kwargs)
    t_open = time.perf_counter()
    try:
        cur = conn.cursor()
        try:
            t_pre_exec = time.perf_counter()
            if shape.sql is not None:
                cur.execute(shape.sql)
            else:
                getattr(cur, shape.metadata_call)()
            # First row marks the end of poll + first-fetch latency.
            first = cur.fetchmany(1)
            t_ttfr = time.perf_counter()
            # Drain the rest.
            tail_rows = 0
            while True:
                chunk = cur.fetchmany(10_000)
                if not chunk:
                    break
                tail_rows += len(chunk)
            t_drain = time.perf_counter()
            total_rows = len(first) + tail_rows
            if shape.expected_rows is not None and total_rows != shape.expected_rows:
                raise RuntimeError(
                    f"{shape.name}: expected {shape.expected_rows} rows, got {total_rows}"
                )
        finally:
            cur.close()
    finally:
        conn.close()

    rss_after = _rss_kb()
    return SampleMetrics(
        open_s=t_open - t0,
        ttfr_s=t_ttfr - t_pre_exec,
        drain_s=t_drain - t_pre_exec,
        rows=total_rows,
        rss_delta_kb=max(0, rss_after - rss_before),
    )


# ─── Aggregation ─────────────────────────────────────────────────


@dataclass
class Aggregated:
    open_min: float
    open_med: float
    open_max: float
    ttfr_min: float
    ttfr_med: float
    ttfr_max: float
    drain_min: float
    drain_med: float
    drain_max: float
    rows: int
    rss_med_kb: int


def aggregate(samples: List[SampleMetrics]) -> Aggregated:
    o = [s.open_s for s in samples]
    t = [s.ttfr_s for s in samples]
    d = [s.drain_s for s in samples]
    r = [s.rss_delta_kb for s in samples]
    return Aggregated(
        open_min=min(o), open_med=statistics.median(o), open_max=max(o),
        ttfr_min=min(t), ttfr_med=statistics.median(t), ttfr_max=max(t),
        drain_min=min(d), drain_med=statistics.median(d), drain_max=max(d),
        rows=samples[0].rows,
        rss_med_kb=int(statistics.median(r)),
    )


def fmt_ms(seconds: float) -> str:
    return f"{seconds * 1000:.0f}"


def fmt_rate(rows: int, seconds: float) -> str:
    if seconds <= 0:
        return "—"
    return f"{int(rows / seconds):,}"


# ─── Driver ──────────────────────────────────────────────────────


def build_conn_params() -> Dict:
    host = os.environ.get("DATABRICKS_SERVER_HOSTNAME") or os.environ.get("DATABRICKS_HOST", "")
    host = host.replace("https://", "").rstrip("/")
    http_path = os.environ.get("DATABRICKS_HTTP_PATH", "")
    token = os.environ.get("DATABRICKS_TOKEN", "")
    if not (host and http_path and token):
        sys.exit(
            "Missing credentials. Set DATABRICKS_SERVER_HOSTNAME (or _HOST), "
            "DATABRICKS_HTTP_PATH, DATABRICKS_TOKEN before running."
        )
    return {
        "server_hostname": host,
        "http_path": http_path,
        "access_token": token,
    }


def main() -> int:
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--samples", type=int, default=5,
        help="Sample runs per (backend, shape). First run is dropped as warm-up. Default: 5.",
    )
    parser.add_argument(
        "--shapes", nargs="*",
        help="Subset of shapes to run by name. Default: all. Choices: "
        + ", ".join(s.name for s in SHAPES),
    )
    parser.add_argument(
        "--backends", nargs="*", choices=[b for b, _ in BACKENDS],
        help="Subset of backends. Default: both.",
    )
    args = parser.parse_args()

    conn_params = build_conn_params()
    shapes = [s for s in SHAPES if not args.shapes or s.name in args.shapes]
    backends = [(n, k) for (n, k) in BACKENDS if not args.backends or n in args.backends]

    if not shapes:
        sys.exit(f"No shapes match {args.shapes!r}")
    if not backends:
        sys.exit(f"No backends match {args.backends!r}")

    # results[(shape_name, backend_name)] = Aggregated
    results: Dict[Tuple[str, str], Aggregated] = {}

    total_runs = len(shapes) * len(backends) * (args.samples + 1)
    print(
        f"Running {total_runs} samples "
        f"({len(shapes)} shapes × {len(backends)} backends × {args.samples + 1} runs/cell)\n",
        flush=True,
    )

    for shape in shapes:
        for backend_name, backend_kwargs in backends:
            print(f"  {shape.name:24s} on {backend_name:8s} … ", end="", flush=True)
            samples: List[SampleMetrics] = []
            # args.samples + 1 runs because we drop the first as warm-up.
            for run_idx in range(args.samples + 1):
                try:
                    m = run_one(backend_kwargs, shape, conn_params)
                except Exception as exc:
                    print(f"\n    run {run_idx} FAILED: {exc}", flush=True)
                    raise
                if run_idx == 0:
                    continue  # warm-up
                samples.append(m)
            agg = aggregate(samples)
            results[(shape.name, backend_name)] = agg
            print(
                f"open={fmt_ms(agg.open_med)}ms "
                f"ttfr={fmt_ms(agg.ttfr_med)}ms "
                f"drain={fmt_ms(agg.drain_med)}ms "
                f"rows={agg.rows:,} "
                f"rss+={agg.rss_med_kb}kb",
                flush=True,
            )

    # ─── Report ─────────────────────────────────────────────────
    print("\n" + "=" * 70)
    print(f"Results (median across {args.samples} samples; warm-up dropped):")
    print("=" * 70)
    for shape in shapes:
        print(f"\n### {shape.name}")
        if shape.metadata_call:
            print(f"_metadata: cursor.{shape.metadata_call}()_")
        else:
            print(f"_SQL: `{shape.sql}`_")
        print()
        print("| backend | open (ms) | ttfr (ms) | drain (ms) | rows/s | rss Δ (KB) |")
        print("|---|---|---|---|---|---|")
        for backend_name, _ in backends:
            agg = results.get((shape.name, backend_name))
            if agg is None:
                print(f"| {backend_name} | (skipped) | | | | |")
                continue
            print(
                f"| {backend_name} | "
                f"{fmt_ms(agg.open_med)} ({fmt_ms(agg.open_min)}–{fmt_ms(agg.open_max)}) | "
                f"{fmt_ms(agg.ttfr_med)} ({fmt_ms(agg.ttfr_min)}–{fmt_ms(agg.ttfr_max)}) | "
                f"{fmt_ms(agg.drain_med)} ({fmt_ms(agg.drain_min)}–{fmt_ms(agg.drain_max)}) | "
                f"{fmt_rate(agg.rows, agg.drain_med)} | "
                f"{agg.rss_med_kb} |"
            )

    return 0


if __name__ == "__main__":
    raise SystemExit(main())
