
Commit 17876f9

Thrift: result-set heartbeat to prevent operation-handle idle eviction
The Databricks SQL warehouse reaps a query's operation handle after roughly
20-30 minutes of driver idleness. Once that happens, any subsequent
TFetchResults against the handle returns HTTP 404 / RESOURCE_DOES_NOT_EXIST
and the result set is permanently broken -- the driver's retry policy
classifies the error as non-retryable, so the user sees their query fail
even though the data has already been computed. This affects any workflow
where a consumer iterates slowly over a large result set: notebook sessions,
BI tools paginating user-facing tables, anywhere a fetch can sit idle for
tens of minutes between calls.

Adds a per-ThriftResultSet daemon thread that periodically calls
GetOperationStatus while rows are still pending on the server, mirroring the
C# ADBC driver's DatabricksOperationStatusPoller and the JDBC work in
databricks-jdbc PR #1415.

- New: ResultHeartbeatManager (backend/thrift_result_heartbeat_manager.py).
  Daemon thread, 60s default interval, 10-consecutive-failure stop,
  terminal-state self-stop. Bypasses the make_request retry budget so a
  transient failure can't stall inside a single poll.
- New: ThriftDatabricksClient._heartbeat_poll helper (acquires the existing
  _request_lock; bypasses make_request).
- New: Connection kwargs enable_heartbeat (default True) and
  heartbeat_interval_seconds (default 60).
- Wired into ThriftResultSet: started in __init__ when the server still has
  rows to deliver; stopped (a) the moment _fill_results_buffer sees
  has_more_rows flip to False, (b) in ResultSet.close(), (c) transitively
  from Cursor.close()/Connection.close().
- 16 new unit tests (8 manager + 8 wiring), 3 new env-var-gated e2e tests.

Verified end-to-end on a real warehouse against all three Thrift result
dispositions (Arrow inline, cloud fetch, pre-Arrow Column inline):
heartbeat=False + 30-min idle reliably reproduces the 404; heartbeat=True
(the default) succeeds with ~29 GetOperationStatus polls during the idle
window.
Co-authored-by: Isaac
Signed-off-by: Sreekanth Vadigi <sreekanth.vadigi@databricks.com>
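The keepalive loop the commit describes (periodic polls while the result set is draining, prompt exit on stop, self-stop after a run of consecutive failures) follows a standard threading.Event pattern. A standalone sketch with hypothetical names (MiniHeartbeat is not the driver's class, and the failure cutoff is shortened from 10 to 3 for the demo):

```python
import threading


class MiniHeartbeat:
    """Toy version of the commit's keepalive loop (hypothetical names)."""

    MAX_CONSECUTIVE_FAILURES = 3  # the real manager uses 10

    def __init__(self, poll_fn, interval_seconds):
        self._poll_fn = poll_fn  # raises on failure, like _heartbeat_poll
        self._interval = interval_seconds
        self._stop_event = threading.Event()
        self._failures = 0
        self.poll_count = 0
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def stop(self, timeout=1.0):
        # Event.wait() in _run returns True as soon as this is set.
        self._stop_event.set()
        self._thread.join(timeout=timeout)

    def _run(self):
        while not self._stop_event.wait(self._interval):
            try:
                self._poll_fn()
            except Exception:
                self._failures += 1
                if self._failures >= self.MAX_CONSECUTIVE_FAILURES:
                    return  # self-stop, mirrors the consecutive-failure cutoff
                continue
            self._failures = 0
            self.poll_count += 1


def failing_poll():
    raise RuntimeError("simulated transient failure")


# A poll stub that always fails: the loop self-stops after 3 attempts.
hb = MiniHeartbeat(poll_fn=failing_poll, interval_seconds=0.01)
hb.start()
hb._thread.join(timeout=2.0)
assert not hb._thread.is_alive()  # thread exited on its own
assert hb.poll_count == 0         # no poll ever succeeded
```

With a succeeding poll function the same loop would increment poll_count once per interval until stop() is called, which is the behavior the commit's e2e test observes (~29 polls over a 30-minute idle window at a 60s interval).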
1 parent cbd6a88 commit 17876f9

7 files changed

Lines changed: 685 additions & 0 deletions


src/databricks/sql/backend/thrift_backend.py

Lines changed: 14 additions & 0 deletions
@@ -669,6 +669,20 @@ def _poll_for_status(self, op_handle):
         )
         return self.make_request(self._client.GetOperationStatus, req)

+    def _heartbeat_poll(self, op_handle):
+        """
+        Single-shot GetOperationStatus for the result-set heartbeat. Bypasses
+        make_request() so a transient failure does NOT stall inside the
+        driver's long retry budget — ResultHeartbeatManager counts failures
+        itself and self-stops after MAX_CONSECUTIVE_FAILURES.
+        """
+        req = ttypes.TGetOperationStatusReq(
+            operationHandle=op_handle,
+            getProgressUpdate=False,
+        )
+        with self._request_lock:
+            return self._client.GetOperationStatus(req)
+
     def _create_arrow_table(self, t_row_set, lz4_compressed, schema_bytes, description):
         if t_row_set.columns is not None:
             (
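The reason _heartbeat_poll skips make_request is easiest to see in a toy comparison (the stand-ins below are hypothetical, not the driver's real retry policy): a retrying wrapper absorbs transient failures for its whole budget and surfaces only one error to the caller, while a single-shot call surfaces every failure immediately so the manager can count them toward its cutoff:

```python
calls = {"n": 0}


def flaky_rpc():
    calls["n"] += 1
    raise ConnectionError("transient")


def with_retry_budget(fn, attempts=5):
    # Stand-in for make_request: the caller sees ONE failure only after
    # the whole retry budget is spent.
    last = None
    for _ in range(attempts):
        try:
            return fn()
        except ConnectionError as e:
            last = e
    raise last


def single_shot(fn):
    # Stand-in for _heartbeat_poll: every failure is immediately visible.
    return fn()


# Retrying wrapper: 5 RPCs issued, but only 1 caller-visible failure.
try:
    with_retry_budget(flaky_rpc)
except ConnectionError:
    pass
assert calls["n"] == 5

# Single-shot: 3 RPCs, 3 caller-visible failures the manager can count.
calls["n"] = 0
visible = 0
for _ in range(3):
    try:
        single_shot(flaky_rpc)
    except ConnectionError:
        visible += 1
assert calls["n"] == 3 and visible == 3
```

If the heartbeat used the retrying path, a single poll could block for the length of the retry budget, pushing the next poll past the eviction window; the single-shot shape keeps each poll cheap and lets the failure counter do the back-off.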
src/databricks/sql/backend/thrift_result_heartbeat_manager.py

Lines changed: 172 additions & 0 deletions
@@ -0,0 +1,172 @@
+"""
+Background heartbeat for the Thrift backend.
+
+Why this exists
+---------------
+The warehouse evicts an operation/command handle after roughly 20-25 minutes
+of driver idleness. Once that happens, any subsequent TFetchResults against
+the handle returns HTTP 404 / RESOURCE_DOES_NOT_EXIST and the result set is
+permanently broken — the driver's retry policy classifies the error as
+non-retryable.
+
+This manager keeps the handle alive while a consumer is slowly draining
+results. While a ThriftResultSet has rows still pending on the server, a
+daemon thread issues a periodic GetOperationStatus against the operation
+handle. The keepalive stops as soon as the server has finished delivering
+data (the last TFetchResults returns hasMoreRows=False) or the result set is
+closed.
+
+Design mirrors the C# ADBC driver's DatabricksOperationStatusPoller.
+"""
+
+from __future__ import annotations
+
+import logging
+import threading
+from typing import Optional
+
+from databricks.sql.thrift_api.TCLIService import ttypes
+
+logger = logging.getLogger(__name__)
+
+
+class ResultHeartbeatManager:
+    """Per-ResultSet background keepalive against operation-handle eviction."""
+
+    DEFAULT_INTERVAL_SECONDS = 60
+    DEFAULT_STOP_TIMEOUT_SECONDS = 5.0
+    MAX_CONSECUTIVE_FAILURES = 10
+
+    # Operation states that indicate the server has released the handle (or
+    # is about to). No point continuing to heartbeat against any of these.
+    # FINISHED_STATE is intentionally NOT terminal: it means query execution
+    # finished but the handle is still alive for result streaming.
+    _TERMINAL_STATES = frozenset(
+        {
+            ttypes.TOperationState.CANCELED_STATE,
+            ttypes.TOperationState.CLOSED_STATE,
+            ttypes.TOperationState.ERROR_STATE,
+            ttypes.TOperationState.TIMEDOUT_STATE,
+            ttypes.TOperationState.UKNOWN_STATE,  # spelling matches the Thrift IDL
+        }
+    )
+
+    def __init__(
+        self,
+        *,
+        backend,
+        op_handle,
+        interval_seconds: int,
+        statement_id_hex: str,
+    ) -> None:
+        self._backend = backend
+        self._op_handle = op_handle
+        self._interval_seconds = interval_seconds
+        self._statement_id_hex = statement_id_hex
+        self._stop_event = threading.Event()
+        self._thread: Optional[threading.Thread] = None
+        self._consecutive_failures = 0
+        # Successful poll count — exposed for tests / ad-hoc debugging.
+        # Intentionally not surfaced through the telemetry pipeline; see the
+        # plan's "Telemetry: not added" section for why.
+        self._poll_count = 0
+        self._lock = threading.Lock()
+
+    def start(self) -> None:
+        """
+        Spawn the daemon thread. Calling twice is a no-op with a warning —
+        not an exception, because this guard sits in ResultSet construction
+        and a defensive failure should not abort the user's query.
+        """
+        with self._lock:
+            if self._thread is not None:
+                logger.warning(
+                    "ResultHeartbeatManager.start() called twice for "
+                    "statement %s; ignoring",
+                    self._statement_id_hex,
+                )
+                return
+            self._thread = threading.Thread(
+                target=self._run,
+                name="databricks-sql-heartbeat-%s" % self._statement_id_hex,
+                daemon=True,
+            )
+            self._thread.start()
+            logger.debug(
+                "heartbeat manager started for statement %s (interval=%ss)",
+                self._statement_id_hex,
+                self._interval_seconds,
+            )
+
+    def stop(self, timeout: float = DEFAULT_STOP_TIMEOUT_SECONDS) -> None:
+        """
+        Signal the loop to exit, then join with a bounded timeout.
+
+        Idempotent. If the join elapses without the thread terminating
+        (e.g. wedged in a blocking socket read), emit a single DEBUG log
+        line and return — the daemon thread will die with the interpreter.
+        """
+        with self._lock:
+            self._stop_event.set()
+            thread = self._thread
+        if thread is None:
+            return
+        thread.join(timeout=timeout)
+        if thread.is_alive():
+            logger.debug(
+                "heartbeat thread for statement %s did not terminate "
+                "within %ss; letting daemon thread die with interpreter",
+                self._statement_id_hex,
+                timeout,
+            )
+
+    def _run(self) -> None:
+        # Event.wait returns True if the event was set during the wait
+        # (i.e. stop was signaled), in which case we exit cleanly.
+        while not self._stop_event.wait(self._interval_seconds):
+            if not self._poll_once():
+                return
+
+    def _poll_once(self) -> bool:
+        """
+        Issue a single GetOperationStatus. Return True to keep polling,
+        False to self-stop the manager.
+        """
+        try:
+            resp = self._backend._heartbeat_poll(self._op_handle)
+        except Exception as e:
+            self._consecutive_failures += 1
+            logger.debug(
+                "heartbeat poll failed for statement %s "
+                "(consecutive_failures=%d): %s",
+                self._statement_id_hex,
+                self._consecutive_failures,
+                e,
+            )
+            if self._consecutive_failures >= self.MAX_CONSECUTIVE_FAILURES:
+                logger.warning(
+                    "heartbeat manager stopping after %d consecutive "
+                    "failures for statement %s",
+                    self._consecutive_failures,
+                    self._statement_id_hex,
+                )
+                return False
+            return True
+
+        self._consecutive_failures = 0
+        self._poll_count += 1
+        state = getattr(resp, "operationState", None)
+        logger.debug(
+            "heartbeat poll ok for statement %s (state=%s)",
+            self._statement_id_hex,
+            ttypes.TOperationState._VALUES_TO_NAMES.get(state, state),
+        )
+        if state in self._TERMINAL_STATES:
+            logger.debug(
+                "heartbeat poll for statement %s observed terminal "
+                "operation state %s; stopping",
+                self._statement_id_hex,
+                ttypes.TOperationState._VALUES_TO_NAMES.get(state, state),
+            )
+            return False
+        return True
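The `while not self._stop_event.wait(...)` idiom in `_run` is doing real work: `threading.Event.wait(timeout)` returns True the moment the event is set, so `stop()` interrupts the sleep immediately instead of waiting out a full heartbeat interval. A quick standalone check of that property:

```python
import threading
import time

stop = threading.Event()
ticks = []


def run():
    # Same shape as _run: sleep up to the interval, exit promptly on stop.
    while not stop.wait(10.0):  # 10 s interval, like a long heartbeat gap
        ticks.append(time.monotonic())


t = threading.Thread(target=run, daemon=True)
start = time.monotonic()
t.start()
stop.set()  # signal stop almost immediately after starting
t.join(timeout=2.0)
elapsed = time.monotonic() - start

assert not t.is_alive()   # the loop exited
assert elapsed < 2.0      # well before the 10 s interval elapsed
assert ticks == []        # the loop body never ran
```

This is why stop() can afford a short join timeout: the only way the join can elapse is if the thread is wedged inside the poll RPC itself, not inside the sleep.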

src/databricks/sql/client.py

Lines changed: 13 additions & 0 deletions
@@ -264,6 +264,14 @@ def read(self) -> Optional[OAuthToken]:
         #   (True by default)
         # use_cloud_fetch
         #   Enable use of cloud fetch to extract large query results in parallel via cloud storage
+        # enable_heartbeat
+        #   When True (default), each Thrift ResultSet that still has rows pending on the server
+        #   spawns a daemon thread that periodically issues TGetOperationStatus against the
+        #   operation handle to keep it alive past the warehouse's idle-eviction window
+        #   (~20-25 min). Pass enable_heartbeat=False to opt out.
+        # heartbeat_interval_seconds
+        #   Interval between heartbeat polls in seconds (default 60). Has no effect when
+        #   enable_heartbeat is False.

         logger.debug(
             "Connection.__init__(server_hostname=%s, http_path=%s)",
@@ -295,6 +303,11 @@ def read(self) -> Optional[OAuthToken]:
         self.disable_pandas = kwargs.get("_disable_pandas", False)
         self.lz4_compression = kwargs.get("enable_query_result_lz4_compression", True)
         self.use_cloud_fetch = kwargs.get("use_cloud_fetch", True)
+        # Per-ResultSet background GetOperationStatus keepalive against
+        # server-side operation-handle idle eviction. See
+        # backend/thrift_result_heartbeat_manager.py for details.
+        self.enable_heartbeat = kwargs.get("enable_heartbeat", True)
+        self.heartbeat_interval_seconds = kwargs.get("heartbeat_interval_seconds", 60)
         self._cursors = []  # type: List[Cursor]
         self.telemetry_batch_size = kwargs.get(
             "telemetry_batch_size", TelemetryClientFactory.DEFAULT_BATCH_SIZE
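The two kwargs.get lines above follow the driver's usual opt-in pattern: absent kwargs fall back to defaults, and passing the kwarg at connect time overrides them. A toy stand-in (ToyConnection is hypothetical, not the driver's Connection class) makes the resulting behavior concrete:

```python
class ToyConnection:
    # Mirrors the two kwargs.get lines added to Connection.__init__.
    def __init__(self, **kwargs):
        self.enable_heartbeat = kwargs.get("enable_heartbeat", True)
        self.heartbeat_interval_seconds = kwargs.get("heartbeat_interval_seconds", 60)


# No kwargs: heartbeat is on, polling every 60 s.
default = ToyConnection()
assert default.enable_heartbeat is True
assert default.heartbeat_interval_seconds == 60

# Explicit opt-out: no daemon thread is ever started.
opted_out = ToyConnection(enable_heartbeat=False)
assert opted_out.enable_heartbeat is False

# Tighter interval, e.g. for a warehouse with a shorter eviction window.
fast = ToyConnection(heartbeat_interval_seconds=15)
assert fast.heartbeat_interval_seconds == 15
```

On the real Connection these would be passed as extra keyword arguments at connect time, alongside the existing options such as use_cloud_fetch.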

src/databricks/sql/result_set.py

Lines changed: 62 additions & 0 deletions
@@ -163,6 +163,14 @@ def fetchall_arrow(self) -> "pyarrow.Table":
         """Fetch all remaining rows as an Arrow table."""
         pass

+    def _stop_heartbeat(self) -> None:
+        """
+        Stop any background heartbeat associated with this result set.
+
+        Base-class no-op; the Thrift result set overrides this.
+        """
+        return None
+
     def close(self) -> None:
         """
         Close the result set.
@@ -171,6 +179,10 @@ def close(self) -> None:
         been closed on the server for some other reason, issue a request to the server to close it.
         """
         try:
+            # Stop the heartbeat BEFORE close_command so the manager doesn't
+            # race against the close RPC over the same Thrift transport.
+            self._stop_heartbeat()
+
             if self.results is not None:
                 self.results.close()
             else:
@@ -222,6 +234,10 @@ def __init__(
         :param has_more_rows: Whether there are more rows to fetch
         """
         self.num_chunks = 0
+        # Initialize before any code path that could call _stop_heartbeat
+        # (e.g. _fill_results_buffer below, if the initial fetch flips
+        # has_more_rows to False).
+        self._heartbeat_manager = None

         # Initialize ThriftResultSet-specific attributes
         self._use_cloud_fetch = use_cloud_fetch
@@ -270,6 +286,45 @@ def __init__(
         if not self.results:
             self._fill_results_buffer()

+        # Start the background keepalive once the result set is fully
+        # constructed and we know the server still has more rows to deliver.
+        # This must happen AFTER the initial _fill_results_buffer above,
+        # because that call may flip has_more_rows to False.
+        if self._heartbeat_eligible():
+            from databricks.sql.backend.thrift_result_heartbeat_manager import (
+                ResultHeartbeatManager,
+            )
+
+            self._heartbeat_manager = ResultHeartbeatManager(
+                backend=self.backend,
+                op_handle=self.command_id.to_thrift_handle(),
+                interval_seconds=connection.heartbeat_interval_seconds,
+                statement_id_hex=self.command_id.to_hex_guid(),
+            )
+            self._heartbeat_manager.start()
+
+    def _heartbeat_eligible(self) -> bool:
+        if not getattr(self.connection, "enable_heartbeat", False):
+            return False
+        if self.has_been_closed_server_side:
+            return False
+        if not self.has_more_rows:
+            return False
+        # Defensive: command_id can be None in tests / mocks. Also,
+        # to_thrift_handle returns None for non-Thrift command IDs.
+        if self.command_id is None:
+            return False
+        return self.command_id.to_thrift_handle() is not None
+
+    def _stop_heartbeat(self) -> None:
+        manager = self._heartbeat_manager
+        if manager is None:
+            return
+        # Clear the attribute first so re-entry is a no-op even if stop()
+        # itself is slow.
+        self._heartbeat_manager = None
+        manager.stop()
+
     def _fill_results_buffer(self):
         results, has_more_rows, result_links_count = self.backend.fetch_results(
             command_id=self.command_id,
@@ -286,6 +341,13 @@ def _fill_results_buffer(self):
         self.has_more_rows = has_more_rows
         self.num_chunks += result_links_count

+        # Server has finished delivering rows for this statement — no point
+        # keeping the operation handle alive even if the local buffer still
+        # holds rows the consumer hasn't drained. Matches C# ADBC's stop at
+        # end-of-results inside ReadNextRecordBatchAsync.
+        if not has_more_rows:
+            self._stop_heartbeat()
+
     def _convert_columnar_table(self, table):
         column_names = [c[0] for c in self.description]
         ResultRow = Row(*column_names)
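The clear-before-stop dance in _stop_heartbeat makes re-entry cheap: the attribute is nulled before the (possibly slow, join-bounded) stop() call, so the several paths that can all reach it (_fill_results_buffer on end-of-results, ResultSet.close(), Cursor.close(), Connection.close()) compose safely. A minimal standalone illustration with hypothetical names:

```python
class SlowManager:
    """Stand-in for ResultHeartbeatManager; stop() pretends to join a thread."""

    def __init__(self):
        self.stop_calls = 0

    def stop(self):
        self.stop_calls += 1


class Holder:
    """Stand-in for ThriftResultSet's heartbeat wiring."""

    def __init__(self):
        self._heartbeat_manager = SlowManager()

    def _stop_heartbeat(self):
        manager = self._heartbeat_manager
        if manager is None:
            return
        # Clear first so re-entry (e.g. end-of-results followed by
        # close(), or close() called twice) is a no-op.
        self._heartbeat_manager = None
        manager.stop()


h = Holder()
m = h._heartbeat_manager
h._stop_heartbeat()
h._stop_heartbeat()  # second call hits the None guard and returns
assert m.stop_calls == 1
assert h._heartbeat_manager is None
```

Note this single-threaded sketch does not make concurrent _stop_heartbeat calls atomic; in the driver, ResultHeartbeatManager.stop() is itself idempotent under its own lock, which is what makes the unguarded clear safe.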
