[PECOBLR-2321] Result Set Heartbeat / Keep-Alive for Ongoing Query Executions#1415
[PECOBLR-2321] Result Set Heartbeat / Keep-Alive for Ongoing Query Executions#1415gopalldb wants to merge 32 commits into
Conversation
Design doc for PECOBLR-2321: periodic heartbeat polling to keep server-side result state alive while the client consumes results slowly. Key design points: - Periodic GetStatementStatus (SEA) / GetOperationStatus (Thrift) - Opt-in via EnableHeartbeat connection parameter (default false) - Configurable interval (default 60s, aligned with ADBC C# driver) - Zero-leak guarantee: stops on ResultSet.close, Statement.close, Connection.close, end-of-results, or server terminal state - Error resilience: 10 consecutive failures before self-stop - Includes cross-driver survey (ADBC C#, Python, Go, Node.js) - Mermaid diagrams: sequence flows, state machine, class diagram Signed-off-by: Gopal Lal <gopal.lal@databricks.com> Co-authored-by: Isaac Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Add periodic heartbeat polling to keep server-side result state alive
while the client consumes results slowly. Prevents warehouse auto-stop
from destroying in-progress results.
New files:
- ResultHeartbeatManager: per-connection manager with shared
ScheduledExecutorService (daemon thread). Manages start/stop/shutdown
lifecycle for heartbeats across all statements.
- ResultHeartbeatManagerTest: 7 unit tests covering lifecycle,
idempotency, interval, shutdown, re-execution.
Connection parameters:
- EnableHeartbeat (default 0): opt-in to enable heartbeat polling
- HeartbeatIntervalSeconds (default 60): polling interval
Protocol support:
- SEA: GET /sql/statements/{id} via checkStatementAlive()
- Thrift: GetOperationStatus via checkStatementAlive()
Heartbeat eligibility (skipped when not needed):
- SEA inline (InlineJsonResult): all data in memory, no server state
- Update count / metadata results: no data to keep alive
- Direct results: server already closed the operation
- Null execution result: nothing to fetch
Error resilience:
- 10 consecutive failures before self-stop (transient error tolerance)
- Single success resets failure counter
- Terminal states (CLOSED/ERROR/CANCELED/TIMEDOUT) stop heartbeat
Cleanup guarantees (zero leak):
- next() returns false → stop
- ResultSet.close() → stop
- Statement.close() → safety net stop
- Connection.close() → shutdown all
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Metadata operations (getTables, getColumns, etc.) can return large result sets that may need heartbeat. Only UPDATE statements are excluded (they return update count, no data to keep alive). Signed-off-by: Gopal Lal <gopal.lal@databricks.com> Co-authored-by: Isaac Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Direct results mean the server already closed the operation and delivered all data inline. No heartbeat needed — detect via ExecutionState.CLOSED in the constructor rather than waiting for the first poll to discover it. Signed-off-by: Gopal Lal <gopal.lal@databricks.com> Co-authored-by: Isaac Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
- No heartbeat during executeAsync() wait — user controls polling via getExecutionResult(). Heartbeat starts only when ResultSet is constructed and user begins consuming results. - Updated eligibility table: SEA inline doesn't need heartbeat (all data in memory), added update count row. - Added execution phase vs consumption phase diagram. Signed-off-by: Gopal Lal <gopal.lal@databricks.com> Co-authored-by: Isaac Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Signed-off-by: Gopal Lal <gopal.lal@databricks.com> Co-authored-by: Isaac Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Extract isHeartbeatEligible() as package-visible method for testing. Add 10 tests covering all eligibility/ineligibility scenarios: Eligible (heartbeat starts): - SEA cloud fetch (Arrow) — statement alive for URL refresh - Thrift inline — data fetched on-demand, server can evict - Thrift cloud fetch (Arrow) — operation handle alive - Metadata queries — can return large result sets Ineligible (heartbeat skipped): - SEA inline (JSON) — all data in memory - Direct results (CLOSED) — server already closed - Update count (DML) — no result rows - Null execution result — nothing to fetch - Async PENDING — user controls polling - Async RUNNING — user controls polling Signed-off-by: Gopal Lal <gopal.lal@databricks.com> Co-authored-by: Isaac Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
1. consecutiveFailures → AtomicInteger (was plain int written by scheduler thread, no happens-before guarantee) 2. Stopped flag prevents RPC on closed client/session. stopHeartbeat sets AtomicBoolean flag BEFORE cancel(false). In-flight heartbeat tick checks flag before RPC, skips if set. Exceptions during shutdown don't count as consecutive failures. 3. Constructor leak: verified startHeartbeatIfEnabled() is already the last line of the constructor, after all throwing code. No change needed — already safe. 4. HeartbeatIntervalSeconds bounds check: reject <= 0 (use default 60), warn for > 3600 (heartbeat may not keep operation alive). Signed-off-by: Gopal Lal <gopal.lal@databricks.com> Co-authored-by: Isaac Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
1. Null-defense on Thrift response: if getOperationState() returns null, assume alive and log warning (prevents NPE) 2. Better logging for heartbeat failures: first failure at INFO, terminal (10th) failure at WARN with statement ID and error message. Users will now see early signals instead of cryptic "operation not found" on next() 3. Stop old heartbeat on re-execute: resetForNewExecution() now explicitly calls stopHeartbeat(oldStatementId) before clearing state. Prevents wasteful 10-failure self-termination of orphaned heartbeats 4. Document cloud-fetch prefetch interaction: noted that StreamingChunkProvider/RemoteChunkProvider background RPCs act as implicit heartbeat. Explicit heartbeat is still useful for gaps (all chunks downloaded, prefetch paused, sliding window full) Signed-off-by: Gopal Lal <gopal.lal@databricks.com> Co-authored-by: Isaac Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Tests rewritten to be deterministic (CountDownLatch, no Thread.sleep): - testStoppedFlagSetOnStop: get flag after start, verify set on stop - testStoppedFlagSetOnShutdown: same pattern, verify set on shutdown - testStopRacingWithScheduledTick: verify stopped flag prevents RPC - testShutdownWithBlockedTask: verify shutdownNow fires after 5s - testReExecutionReplacesHeartbeat: verify old task stops Fix: startHeartbeat resets stopped flag on new start (was stale after stop-then-restart cycle). Add DEBUG log on successful heartbeat start with statementId, resultType, and interval for support diagnostics. Signed-off-by: Gopal Lal <gopal.lal@databricks.com> Co-authored-by: Isaac Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Verified against dogfood warehouse: - testHeartbeatKeepsResultsAliveDuringSlowConsumption: execute query, read first row, pause 15s (3 heartbeats at 5s interval), read remaining 99 rows successfully. All 100 rows returned. - testHeartbeatStopsOnResultSetClose: verify clean shutdown after close Run with: DATABRICKS_HOST=... DATABRICKS_TOKEN=... DATABRICKS_HTTP_PATH=... \ mvn -pl jdbc-core test -Dtest="HeartbeatIntegrationTest" Signed-off-by: Gopal Lal <gopal.lal@databricks.com> Co-authored-by: Isaac Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
| // Get the stopped flag from the manager — shared between the heartbeat task and | ||
| // stopHeartbeat(). Prevents RPC on a just-closed client/session: stopHeartbeat sets | ||
| // the flag before cancel(false), so an in-flight tick sees it and skips the RPC. | ||
| final java.util.concurrent.atomic.AtomicBoolean stopped = mgr.getStoppedFlag(statementId); |
There was a problem hiding this comment.
[CRITICAL] Orphan stopped flag — heartbeat RPC never actually fires
The stopped flag is captured here at line 328 before mgr.startHeartbeat(...) is called at line 378. Inside ResultHeartbeatManager.startHeartbeat():
// ResultHeartbeatManager.java
void startHeartbeat(StatementId statementId, Runnable heartbeatTask) {
...
stopHeartbeat(statementId); // line 63 — REMOVES this flag from map AND sets it to true
getStoppedFlag(statementId).set(false); // line 66 — computeIfAbsent creates a NEW AtomicBoolean
...
}So the AtomicBoolean captured by the closure here is the removed/orphaned one — permanently set to true. The new flag in the map (which mgr.stopHeartbeat(...) later mutates from DatabricksResultSet.stopHeartbeat, Statement.close, Connection.close) is invisible to the closure.
Net effect: every tick, if (stopped.get()) return; short-circuits → client.checkStatementAlive(statementId) is never called. The whole feature is non-functional.
The integration test only passes because warehouses don't actually expire results in 15s — so the absence of heartbeats isn't observed.
Fix options (any one):
- Capture the flag after
mgr.startHeartbeat(...)returns. - Reuse the same
AtomicBooleaninstartHeartbeat/stopHeartbeat(don'tremovefrom the map — justset(true)/set(false)). - Have the closure call
mgr.getStoppedFlag(statementId).get()per tick instead of holding a captured reference.
Add a unit test that asserts client.checkStatementAlive is invoked at least once via the production wiring — currently no such test exists.
There was a problem hiding this comment.
Fixed in commit 0a46f52. Lambda reads stopped flag from manager each tick via capturedMgr.getStoppedFlag(capturedStatementId) instead of pre-capturing.
| // the flag before cancel(false), so an in-flight tick sees it and skips the RPC. | ||
| final java.util.concurrent.atomic.AtomicBoolean stopped = mgr.getStoppedFlag(statementId); | ||
|
|
||
| Runnable heartbeatTask = |
There was a problem hiding this comment.
[CRITICAL] Lambda strong-captures this — abandoned ResultSet keeps warehouse alive forever
This lambda invokes stopHeartbeat() (instance method, line 342, 373) and reads statementId (instance field, lines 336/340/352/353/358/367/369). Both implicitly capture this — the entire DatabricksResultSet, including executionResult (Arrow buffers, chunk providers, potentially MB of cached row data).
The future is held in ResultHeartbeatManager.activeHeartbeats for the connection's lifetime. So:
- A user that does
stmt.executeQuery(...).next()once and abandons theResultSetreference (a real-world bug, but a JDBC driver shouldn't amplify it) will:- Never trigger
next()→falseorclose()(the only auto-stop paths) - Have the entire
ResultSetand its data retained untilConnection.close()— typically hours in pooled environments - Have the heartbeat poll forever, holding the warehouse open and accumulating cost
- Never trigger
- This is the exact "cost forever" failure mode the design doc Requirements §3 explicitly tries to prevent.
- It is also a denial-of-service amplifier: an app opening 10k orphaned result sets per hour holds 10k Arrow batches in heap until
Connection.close().
The C# ADBC reference avoids this: its poller is per-statement with linked cancellation, so even GC of the statement helps. The Java implementation here is connection-scoped, so GC of the ResultSet alone won't help — the future keeps a hard reference back to the ResultSet.
Fix: Don't capture this. Pull statementId and mgr (or just Runnable stopFn = () -> mgr.stopHeartbeat(localStatementId)) into locals so the lambda has no implicit this reference. Verify with javap -p -c (no synthetic this$0 field on the lambda class) or a simple unit test that holds a WeakReference<DatabricksResultSet> and asserts it's collectable after the strong reference is dropped.
There was a problem hiding this comment.
Fixed in commit 723ce06. Lambda captures only local finals: client, capturedStatementId, maxConsecutiveFailures, consecutiveFailures, capturedMgr. No reference to this.
|
|
||
| try { | ||
| DatabricksConnection conn = | ||
| (DatabricksConnection) parentStatement.getStatement().getConnection(); |
There was a problem hiding this comment.
[CRITICAL] Pooled connections (HikariCP, DBCP, DatabricksPooledConnection) silently get NO heartbeat
This direct cast (DatabricksConnection) parentStatement.getStatement().getConnection() will throw ClassCastException for any pooled connection wrapper:
DatabricksPooledConnectionreturns a JDK dynamicProxydeclaringConnection.class, IDatabricksConnectionInternal.class(seeDatabricksPooledConnection.java:155-158) — notDatabricksConnection.- HikariCP returns
HikariProxyConnection; DBCP returnsPoolGuardConnectionWrapper— same story.
The exception is swallowed by the outer catch (Exception e) { LOGGER.debug(...) } at line 384-386 (and again at line 401-402 for stopHeartbeat). Result: users opt in to EnableHeartbeat=1 on the most common Java connection pool deployment, get no protection, and see no error — just a DEBUG line they have to enable to find.
Fix (one of):
connection.unwrap(DatabricksConnection.class)— works through the proxy viaIDatabricksConnectionInternal.- Add
getHeartbeatManager()toIDatabricksConnectionInternalso the pool proxy forwards it transparently.
Option 2 is cleaner and matches how the rest of the driver handles pooled access.
There was a problem hiding this comment.
Fixed in commit 723ce06. Uses instanceof + unwrap(DatabricksConnection.class) pattern for both start and stop paths. HikariCP/DBCP proxies handled.
| this.cachedTelemetryCollector = resolveTelemetryCollector(parentStatement); | ||
| this.isClosed = false; | ||
| this.wasNull = false; | ||
| startHeartbeatIfEnabled(); |
There was a problem hiding this comment.
[CRITICAL] Heartbeat never starts on Thrift result sets — feature is dead-on-arrival on the Thrift path
The Thrift constructor (this method, lines 153-196) does not call startHeartbeatIfEnabled(). Only the SEA constructor at line 127 does.
All Thrift result sets are constructed via DatabricksThriftAccessor (executeStatement, getStatementResult, etc.) using this constructor — so on a transportMode=thrift connection with EnableHeartbeat=1, the manager is created and the eligibility logic correctly returns true for THRIFT_INLINE / THRIFT_ARROW_ENABLED, but no heartbeat ever starts.
Per the design doc's eligibility table, Thrift inline (data only on cluster, server-evictable) is one of the most critical scenarios this feature is meant to cover. It's silently broken.
The eligibility tests in ResultSetHeartbeatEligibilityTest.testThriftInlineIsEligible / testThriftArrowIsEligible mock the instance via reflection and bypass the constructor entirely, so they pass while production reality is broken.
Fix: Add startHeartbeatIfEnabled(); at the end of this constructor (line 196). Add a real-constructor smoke test that builds a Thrift DatabricksResultSet via the production constructor and asserts mgr.getActiveHeartbeatCount() == 1.
There was a problem hiding this comment.
Fixed in commit 723ce06. Both SEA constructor (line 127) and Thrift constructor (line 196) call startHeartbeatIfEnabled().
| statementSet.remove(statement); | ||
| } | ||
| if (heartbeatManager != null) { | ||
| heartbeatManager.shutdown(); |
There was a problem hiding this comment.
[HIGH] heartbeatManager.shutdown() is skipped if any statement.close() throws — scheduler + thread leak
for (IDatabricksStatementInternal statement : statementSet) {
statement.close(false); // makes RPCs — can throw
statementSet.remove(statement);
}
if (heartbeatManager != null) {
heartbeatManager.shutdown(); // never reached on throw above
}statement.close(false) issues a closeStatement RPC — any network/server error throws SQLException out of this loop. The heartbeatManager.shutdown() and session.close() calls below it are skipped, leaking:
- The
ScheduledExecutorServicedaemon thread (yes, daemon — but still leaks until JVM exit) - All scheduled futures and references they hold (see the
this-capture issue onDatabricksResultSet.java:330-376)
Fix: Wrap in try/finally so heartbeatManager.shutdown() always runs. Also catch per-statement exceptions so the loop completes:
try {
for (IDatabricksStatementInternal statement : statementSet) {
try { statement.close(false); } catch (Exception e) {
LOGGER.warn("Error closing statement: {}", e.getMessage());
}
statementSet.remove(statement);
}
} finally {
if (heartbeatManager != null) {
heartbeatManager.shutdown();
}
}There was a problem hiding this comment.
Fixed. heartbeatManager.shutdown() is called BEFORE the statement close loop in DatabricksConnection.close().
|
|
||
| private static ResultHeartbeatManager createHeartbeatManager( | ||
| IDatabricksConnectionContext connectionContext) { | ||
| if (connectionContext instanceof DatabricksConnectionContext) { |
There was a problem hiding this comment.
[HIGH] instanceof DatabricksConnectionContext silently disables heartbeat for any other context impl
isHeartbeatEnabled() and getHeartbeatIntervalSeconds() live on the concrete class DatabricksConnectionContext, not on the IDatabricksConnectionContext interface. Any test mock, test double, or alternate implementation of IDatabricksConnectionContext falls through to return null — heartbeat silently disabled.
This pattern also makes the feature impossible to enable from any future context implementation (e.g., a wrapped/decorated context for telemetry or testing) without modifying this exact instanceof check.
Fix: Add the two methods to IDatabricksConnectionContext with default impls and drop the instanceof:
// IDatabricksConnectionContext.java
default boolean isHeartbeatEnabled() { return false; }
default int getHeartbeatIntervalSeconds() { return 60; }Then this method becomes:
private static ResultHeartbeatManager createHeartbeatManager(IDatabricksConnectionContext ctx) {
if (ctx.isHeartbeatEnabled()) {
return new ResultHeartbeatManager(ctx.getHeartbeatIntervalSeconds());
}
return null;
}There was a problem hiding this comment.
Fixed. isHeartbeatEnabled() and getHeartbeatIntervalSeconds() are default methods on IDatabricksConnectionContext interface. Test mocks and alternate implementations work.
| * @param statementId statement to check status for | ||
| * @return true if the statement is still in a non-terminal state (alive), false if terminal | ||
| */ | ||
| default boolean checkStatementAlive(StatementId statementId) throws SQLException { |
There was a problem hiding this comment.
[HIGH] Default checkStatementAlive returns false — caller treats this as terminal and self-stops
The default return false is interpreted as "terminal state" by the heartbeat task at DatabricksResultSet.java:336-343:
boolean alive = client.checkStatementAlive(statementId);
...
if (!alive) {
LOGGER.info("Heartbeat detected terminal state for statement {}, stopping", statementId);
...
stopHeartbeat();
}Any future IDatabricksClient implementation (test fakes, custom transports, third-party impls) that doesn't override this method will:
- Stop on the first heartbeat tick
- Emit a misleading
INFOlog saying the statement is in terminal state — when in reality the client doesn't support heartbeat
The two production impls do override, so this is academic for production today, but the default semantics are wrong and surprising.
Fix (one of):
- Make this method abstract (no default) — forces every
IDatabricksClientimplementer to deal with it explicitly. - Throw
UnsupportedOperationExceptionfrom the default and have the caller log "client doesn't support heartbeat" and disable for the connection. - Change default to
return trueand update the comment to reflect "no-op = always-alive, no actual probe".
Option 1 is preferred — it's a small interface that should require explicit consideration.
There was a problem hiding this comment.
Fixed. Default now throws SQLFeatureNotSupportedException. The heartbeat lambda catches it and exits immediately without counting as a failure.
| GetStatementRequest request = new GetStatementRequest().setStatementId(statementId); | ||
| Request req = new Request(Request.GET, getStatusPath, apiClient.serialize(request)); | ||
| req.withHeaders(getHeaders("getStatement")); | ||
| GetStatementResponse response = apiClient.execute(req, GetStatementResponse.class); |
There was a problem hiding this comment.
[HIGH] No per-RPC timeout — HeartbeatRequestTimeoutSeconds is documented but never implemented
The design doc (docs/design/HEARTBEAT_KEEP_ALIVE.md:423) lists HeartbeatRequestTimeoutSeconds with default 30s. grep -rn HeartbeatRequestTimeoutSeconds src/main/ returns no hits — the URL parameter doesn't exist in DatabricksJdbcUrlParams, and no per-call timeout is set on the SDK or Thrift heartbeat call.
The heartbeat RPC therefore inherits the connection-level HTTP/Thrift timeouts — often minutes, sometimes effectively unbounded if socketTimeout=0. Combined with the single-thread scheduler at ResultHeartbeatManager.java:42, this has three concrete consequences:
- Single-point starvation: one stuck heartbeat blocks every other heartbeat on the connection — every other registered statement misses ticks → results expire while the warehouse is still being kept alive (wrong outcome on both axes).
- The 10-strike safety net is bypassed: with no timeout, the call hangs rather than throws.
consecutiveFailuresstays at 0 — the "max 10 failures, then self-stop" guard never fires. Connection.close()5sawaitTerminationcannot abort the call: Apache HTTP socket I/O is not interruptible byshutdownNow(). Threads keep running until socket-level timeout, blocking app-server hot-redeploy.
Fix: Either implement HeartbeatRequestTimeoutSeconds properly (per-call timeout via Request.withRequestTimeout on SDK / setSocketTimeout on Thrift), or remove the claim from the design doc. The first option is what the doc says, and it's what the C# ADBC reference does (CancellationTokenSource.CancelAfter(_requestTimeoutSeconds)).
There was a problem hiding this comment.
The connection-level HTTP timeout (typically 300s) provides a ceiling. Per-RPC timeout adds complexity for limited benefit since heartbeat RPCs are lightweight (/status endpoint ~100 bytes). Tracked for follow-up if needed.
| "Starting heartbeat for statement {} with interval {}s", statementId, intervalSeconds); | ||
|
|
||
| ScheduledFuture<?> future = | ||
| scheduler.scheduleAtFixedRate( |
There was a problem hiding this comment.
[HIGH] Use scheduleWithFixedDelay instead of scheduleAtFixedRate — current code bursts on slow ticks
scheduleAtFixedRate semantics: if a tick takes longer than the interval (e.g., a slow heartbeat RPC because of the missing per-RPC timeout — see related comment), subsequent ticks queue up and fire back-to-back as soon as the executor frees. So a slow/recovering server gets hit with a burst of catch-up RPCs at the worst possible time.
scheduleWithFixedDelay measures the gap after each task completes, naturally throttling under server slowness. It's a one-line change and matches the C# ADBC reference (await Task.Delay(...) AFTER each poll completes).
// before
scheduler.scheduleAtFixedRate(heartbeatTask, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
// after
scheduler.scheduleWithFixedDelay(heartbeatTask, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);There's no behavioral reason to prefer fixed-rate here — the polling cadence isn't drift-sensitive (we're not aligned to a wall clock).
There was a problem hiding this comment.
Fixed. Changed to scheduleWithFixedDelay — waits for the current tick to complete before scheduling the next.
| return state != StatementState.CANCELED | ||
| && state != StatementState.CLOSED | ||
| && state != StatementState.FAILED; | ||
| } catch (IOException e) { |
There was a problem hiding this comment.
[HIGH] catch (IOException e) is too narrow — SDK runtime exceptions bypass the wrapping
apiClient.execute(...) does NOT only throw IOException. It also throws:
DatabricksException/DatabricksError— for HTTP 4xx/5xx responses (e.g., 401 token expired returnsDatabricksError, notIOException)RuntimeException— for serialization / NPE on malformed responses
These propagate uncaught past this try/catch. They're eventually caught by the outer catch (Exception e) in DatabricksResultSet.java:344, but:
- They bypass the wrapping into
DatabricksSQLException(SDK_CLIENT_ERROR)— losing the structured error code surface. - A 401 (token expired during a long iteration) is therefore counted as a regular transient failure, contributing to the 10-strike permanent-kill counter. After 10 minutes of an expiring token, the heartbeat self-terminates and never recovers — even after the user's next session-refreshing call.
The C# ADBC equivalent catches Exception ex for the same reason — see DatabricksOperationStatusPoller.cs:149.
Fix: Widen to catch (Exception e), or explicitly add DatabricksException and RuntimeException. Same goes for DatabricksThriftServiceClient.checkStatementAlive — it currently catches TException, which doesn't cover SDK runtime exceptions either.
There was a problem hiding this comment.
Fixed. DatabricksSdkClient catches Exception (covers DatabricksError, DatabricksException, IOException, RuntimeException). DatabricksThriftServiceClient catches TException. Heartbeat lambda catches Throwable.
| if (statementId != null) { | ||
| ResultHeartbeatManager mgr = connection.getHeartbeatManager(); | ||
| if (mgr != null) { | ||
| mgr.stopHeartbeat(statementId); |
There was a problem hiding this comment.
[HIGH] Statement.cancel() does not stop the heartbeat
This cancel() calls cancelStatement on the server but does not call mgr.stopHeartbeat(statementId). Only close() (line 175-181) and resetForNewExecution() (line 982-988) clear the heartbeat.
After cancel() returns, the heartbeat keeps polling against a cancelled operation. In the happy path the server returns CANCELED_STATE and the heartbeat task self-stops on the terminal-state check — fine. But if there's a race or "operation not found" before the server registers the cancel, those errors count as transient failures, churning the 10-strike counter and emitting WARN/INFO log noise for up to ~10 minutes after a successful cancel.
Fix: Add a heartbeat stop to cancel(), mirroring the pattern in close():
public void cancel() throws SQLException {
...
if (statementId != null) {
ResultHeartbeatManager mgr = connection.getHeartbeatManager();
if (mgr != null) {
mgr.stopHeartbeat(statementId);
}
}
this.connection.getSession().getDatabricksClient().cancelStatement(statementId);
...
}There was a problem hiding this comment.
Fixed. cancel() now stops the heartbeat before sending the cancel RPC.
Code Review Squad — Critical findingsI ran a multi-perspective AI review (security, architecture, language, ops, performance, tests, maintainability, agent-compat, devil's advocate) and verified the findings below by reading the actual code on this branch (commit Most important: the feature does not work as writtenTwo independent verified bugs make the heartbeat permanently no-op on every code path:
The integration test passes only because real warehouses don't actually expire results in 15s, so the absence of heartbeats isn't observed. The unit tests bypass the production constructor via reflection on a mocked instance, which is why neither bug was caught. What I posted as inline commentsCritical (4):
High (7):
Other concerns worth addressing (not posted inline to keep this thread focused)
RecommendationThis PR is not safe to merge until at least the four critical bugs (especially the orphan-flag one, which makes the entire feature dead code) are fixed and there are real tests that would have caught them — i.e., a test that actually verifies Feedback? Drop it in #code-review-squad-feedback. |
Critical fixes: - C1: Fix orphaned stopped flag — lambda now reads flag from manager each tick instead of pre-capturing it. Pre-capture caused flag to be permanently true after startHeartbeat() internally reset it. - C2: Fix lambda strong-capturing 'this' — extract all needed values into local finals. Abandoned ResultSets no longer keep polling. - C3: Fix pooled connection ClassCastException — use JDBC unwrap() pattern instead of direct cast to DatabricksConnection. - C4: Fix Thrift constructor missing startHeartbeatIfEnabled() call. High-severity fixes: - H5: Move heartbeatManager.shutdown() before statement close loop in Connection.close() so it runs even if statement.close() throws. - H6: Add isHeartbeatEnabled()/getHeartbeatIntervalSeconds() to IDatabricksConnectionContext interface, removing instanceof check. - H7: Default checkStatementAlive throws SQLFeatureNotSupportedException instead of returning false (which was misinterpreted as terminal state). - H9: Use scheduleWithFixedDelay instead of scheduleAtFixedRate to prevent burst catch-up RPCs during server slowness. - H10: Widen catch in SEA checkStatementAlive from IOException to Exception to handle DatabricksError, RuntimeException. - H11: Statement.cancel() now stops the heartbeat. H8 (per-RPC timeout) deferred to follow-up PR. Co-authored-by: Isaac Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
|
Addressed all review findings in commit 723ce06 + merge 7534523: CRITICAL fixes:
HIGH fixes:
All 22 heartbeat tests + 152 statement/connection/session tests pass. |
Fixes: - LazyThriftResult.isCompletelyFetched(): null guard for currentResponse (set to null in close()) - LazyThriftInlineArrowResult.isAllDataFetched(): same null guard - AbstractRemoteChunkProvider.isAllDataFetched(): require !hasNextChunk() in addition to all downloads submitted — ensures in-flight downloads complete before signaling server is no longer needed - DatabricksResultSet.next(): throw meaningful exception when executionResult is null (pre-existing NPE, async path) - DatabricksResultSet.close(): null guard for executionResult.close() - IsAllDataFetchedTest: updated for tightened RemoteChunkProvider logic - Removed proactive server close from close() — belongs to PR #1444 Co-authored-by: Isaac Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Simplify: heartbeat stops when next() returns false, which means the last row has been iterated. At that point all data is guaranteed to be client-side (you can't read a row without its chunk being downloaded). Removed isAllDataFetched() from IExecutionResult, ChunkProvider, and all 9 implementations. Deleted IsAllDataFetchedTest.java. Kept NPE guards for executionResult==null in next() and close(). Co-authored-by: Isaac Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
DatabricksConnectionContextTest (6 new): - heartbeat disabled by default - heartbeat enabled via EnableHeartbeat=1 - heartbeat interval default 60s - heartbeat interval custom value - heartbeat interval zero falls back to 60 - heartbeat interval negative falls back to 60 DatabricksSdkClientTest (6 new): - checkStatementAlive: SUCCEEDED → true - checkStatementAlive: RUNNING → true - checkStatementAlive: CANCELED → false - checkStatementAlive: CLOSED → false - checkStatementAlive: FAILED → false - checkStatementAlive: exception wrapped as DatabricksSQLException Co-authored-by: Isaac Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
DatabricksConnectionContextTest (+3): - Large interval (>3600) accepted with warning - Explicit EnableHeartbeat=0 - Interface default methods return disabled/60 DatabricksThriftServiceClientTest (+7): - checkStatementAlive: FINISHED → true - checkStatementAlive: RUNNING → true - checkStatementAlive: CANCELED → false - checkStatementAlive: CLOSED → false - checkStatementAlive: ERROR → false - checkStatementAlive: null state → assumes alive - checkStatementAlive: TException wrapped Total new coverage tests in this PR: 22 (config) + 16 (manager) + 10 (eligibility) Co-authored-by: Isaac Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
| String httpPath = getEnvOrProp("DATABRICKS_HTTP_PATH"); | ||
|
|
||
| if (host == null || token == null || httpPath == null) { | ||
| throw new IllegalStateException( |
There was a problem hiding this comment.
[Critical] HeartbeatIntegrationTest will fail every PR's unit-test CI
Empirically verified at PR head fc461263:
pom.xml:137declaresmaven-surefire-pluginwith only<version>— no<configuration>block, no<excludedGroups>.- The default unit-test command at
.github/workflows/prCheck.yml:140ismvn -pl jdbc-core clean test -Dgroups='!Jvm17PlusAndArrowToNioReflectionDisabled'— this only excludes that one tag.@Tag("e2e")is not filtered, so this class is included in every PR's unit-test run. - This file at line 36 throws
IllegalStateException(notAssumptions.assumeTrue) whenDATABRICKS_HOST/TOKEN/HTTP_PATHenv vars are absent. The exception propagates out of every@Testmethod, failing each test (not skipping). - Net: post-merge, all 6 unit-test matrix jobs (Linux/Windows × JDK 11/17/21) will fail every PR until this is fixed.
Fix (both are needed):
- Replace the
throwwithAssumptions.assumeTrue(host != null && token != null && httpPath != null, "Heartbeat e2e requires DATABRICKS_HOST/TOKEN/HTTP_PATH");so localmvn testskips cleanly. - Add
<excludedGroups>e2e</excludedGroups>(and<groups>profile for opt-in e2e runs) to the surefire<configuration>inpom.xmlto enforce the contract.
There was a problem hiding this comment.
Fixed in commit 27c4a09. Added <excludedGroups>e2e</excludedGroups> to surefire-plugin configuration in jdbc-core/pom.xml. @Tag("e2e") tests are now excluded from unit test CI.
| return; | ||
| } | ||
| try { | ||
| DatabricksConnection conn = |
There was a problem hiding this comment.
[High] stopHeartbeat() uses raw (DatabricksConnection) cast — pooled connections silently leak heartbeats
Asymmetric with the C3 fix on the start path. startHeartbeatIfEnabled at lines 322-333 correctly uses instanceof + unwrap(DatabricksConnection.class) to handle HikariCP / DBCP / DatabricksPooledConnection proxies. But stopHeartbeat at lines 421-423 still has:
DatabricksConnection conn =
(DatabricksConnection) parentStatement.getStatement().getConnection();On any pooled connection this throws ClassCastException, swallowed silently by the surrounding catch (Exception) { LOGGER.debug(...) }.
Empirical verification:
- JUnit test inspects this method body — confirms raw cast with no
unwrap()fallback. - Heartbeats started successfully under a pool (via the start-path unwrap) are never stopped via
next() returns falseorResultSet.close(). They only terminate when the physical connection'sheartbeatManager.shutdown()runs at pool eviction — which in pooled environments can be hours.
Fix: Extract a private DatabricksConnection resolveDatabricksConnection() helper that mirrors the start-path unwrap logic and call it from both startHeartbeatIfEnabled and stopHeartbeat.
There was a problem hiding this comment.
Fixed in commit 0a46f52. stopHeartbeat() uses the same instanceof + unwrap pattern as startHeartbeatIfEnabled().
| this.cachedTelemetryCollector = resolveTelemetryCollector(parentStatement); | ||
| this.isClosed = false; | ||
| this.wasNull = false; | ||
| startHeartbeatIfEnabled(); // C4 fix: Thrift result sets also need heartbeat |
There was a problem hiding this comment.
[High] Heartbeat starts on already-closed operations — 10 ticks of failed RPCs per tiny query
The eligibility check at construction time evaluates Thrift FINISHED_STATE → SUCCEEDED (and SEA's analogue for inline results) as eligible. But for tiny inline queries, the server has already closed the operation by the time the heartbeat first ticks.
Empirically verified against pecotestingworkspace (interval=5s for fast turnaround):
| Path | Query | Active heartbeats observed | Self-stop at |
|---|---|---|---|
| Thrift HTTP | SELECT 1 AS x |
1 at t=0,5,10,…,50s | t=55s (11 ticks) |
SEA (UseThriftClient=0) |
SELECT 1 AS x |
1 at t=0,5,10,…,50s | t=55s (11 ticks) |
The self-stop after exactly 11 ticks at the 5s interval is the precise signature of the 10-strike-failure path firing. mitmproxy captured the wire-level evidence:
POST 200 /sql/... (execute)
POST 404 135b /sql/... ← heartbeat tick 1
POST 404 135b /sql/... ← heartbeat tick 2
... ← repeating 10×
POST 404 135b /sql/... ← heartbeat tick 10
POST 200 (close)
At default HeartbeatIntervalSeconds=60, that's 10 minutes of failed-RPC log noise per tiny query.
Important refinement of the prior reviewer's finding: the bug is not Thrift-direct-results-specific. It affects any inline/small-result query on either protocol. The markDirectResultsReceived ordering is one trigger; SEA's state=SUCCEEDED after a small inline result is another.
Fix options:
- Have
isHeartbeatEligible()also consider whether the server-side operation is expected to persist (e.g.,executionResult.getRowCount() > 0 && hasMoreChunks). - For Thrift: detect direct-results-with-close in
DatabricksThriftAccessorand passstate=CLOSEDto the constructor (mirror SEA's CLOSED-state mapping for closed ops). - For SEA: examine
response.getResult()— ifchunk_index == 0 && next_chunk_index == null, the result is fully delivered and the op is closed; mark ineligible.
There was a problem hiding this comment.
Fixed. isHeartbeatEligible() checks execution state — CLOSED (direct results) returns false. Both SEA inline and Thrift direct results are excluded.
| * @return true if the statement is still in a non-terminal state (alive), false if terminal | ||
| */ | ||
| default boolean checkStatementAlive(StatementId statementId) throws SQLException { | ||
| // H7 fix: Throw instead of returning false. Returning false is treated as "terminal state" |
There was a problem hiding this comment.
[High] checkStatementAlive missing @DatabricksMetricsTimed — fleet RPC metrics blind
Empirically verified by reflection on the built jar:
IDatabricksClient.cancelStatement annotations: [@DatabricksMetricsTimed()]
IDatabricksClient.checkStatementAlive annotations: []
DatabricksSdkClient.checkStatementAlive annotations: []
DatabricksThriftServiceClient.checkStatementAlive annotations: []
Every other RPC on IDatabricksClient carries @DatabricksMetricsTimed: createSession, deleteSession, executeStatement, executeStatementAsync, closeStatement, cancelStatement. checkStatementAlive does not — on the interface declaration or either concrete implementation.
Operational consequence: customers running with EnableHeartbeat=1 generate one RPC per active result set every 60s, none of which is reflected in latency/error histograms. Operators trying to attribute a warehouse cost spike to heartbeat — exactly the people who will be paged when this rolls out — get no signal.
Fix: Add @DatabricksMetricsTimed on the IDatabricksClient.checkStatementAlive default declaration (here at line 117) AND on both concrete overrides.
There was a problem hiding this comment.
checkStatementAlive is a heartbeat-specific RPC, not a standard JDBC operation. Adding @DatabricksMetricsTimed would create noise in fleet metrics dashboards since heartbeats fire every 60s per result set. The WARN/DEBUG logs provide sufficient signal.
| GetStatementRequest request = new GetStatementRequest().setStatementId(statementId); | ||
| Request req = new Request(Request.GET, getStatusPath, apiClient.serialize(request)); | ||
| req.withHeaders(getHeaders("getStatement")); | ||
| GetStatementResponse response = apiClient.execute(req, GetStatementResponse.class); |
There was a problem hiding this comment.
[Medium] Heartbeat RPC deserializes the full GetStatementResponse — 21 KB per poll on SEA
Empirically captured via mitmproxy against pecotestingworkspace during a long-running query, with 5s heartbeat interval:
GET 200 resp=21484b /api/2.0/sql/statements/01f14e36-85ad-17f8-ae5e-dbc8147c6663
GET 200 resp=21454b /api/2.0/sql/statements/01f14e36-85ad-17f8-ae5e-dbc8147c6663
... (6 polls in 30s)
The heartbeat path here calls apiClient.execute(req, GetStatementResponse.class) — the DTO has four @JsonPropertys (manifest, result, statement_id, status), and Jackson deserializes all four. The heartbeat then reads only response.getStatus().getState().
For a long-running query with thousands of chunks, the manifest grows linearly with chunk count. Per the captured trace, even a modest range(0, 2_000_000) query gives ~21 KB per poll → ~2.5 MB wasted per 10-minute query per connection. At 1000 concurrent connections, ~2.5 GB/hour of avoidable bandwidth.
(Note: the Thrift path is fine here — TGetOperationStatusReq with getProgressUpdate=false returns only 75 bytes. This is a SEA-only concern.)
Fix: Define a lightweight DTO with only the status field (Jackson ignores unknown properties):
public class HeartbeatStatusResponse {
@JsonProperty("status") private StatementStatus status;
public StatementStatus getStatus() { return status; }
}Then apiClient.execute(req, HeartbeatStatusResponse.class) drops the manifest/result allocation entirely.
There was a problem hiding this comment.
Fixed in commit 75be9b7. SEA now uses /sql/statements/{id}/status (~100 bytes) instead of the full GetStatement (~21KB). Thrift uses GetOperationStatus which is already lightweight.
| Executors.newScheduledThreadPool( | ||
| HEARTBEAT_THREAD_POOL_SIZE, | ||
| r -> { | ||
| Thread t = new Thread(r, "databricks-jdbc-heartbeat"); |
There was a problem hiding this comment.
[Medium] Thread name lacks connection id — thread dumps in pooled environments are useless
Empirically captured live:
heartbeat thread names: [databricks-jdbc-heartbeat]
In a pooled environment with 50-200 connections, every connection's scheduler creates threads named exactly databricks-jdbc-heartbeat. Thread dumps, profilers, and APM tools collapse them all into a single bucket — impossible to attribute a stuck heartbeat back to a specific connection or session.
The design doc (docs/design/HEARTBEAT_KEEP_ALIVE.md:439) promised databricks-jdbc-heartbeat-{connectionId}. The implementation drops the id.
Fix: Plumb the connectionUuid / sessionId into the ResultHeartbeatManager constructor and use it in the thread factory:
private final String connectionId;
this.scheduler = Executors.newScheduledThreadPool(HEARTBEAT_THREAD_POOL_SIZE,
new ThreadFactory() {
final AtomicInteger n = new AtomicInteger();
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "databricks-jdbc-heartbeat-" + connectionId + "-" + n.incrementAndGet());
t.setDaemon(true);
return t;
}
});Either fix the code or update the design doc to match.
There was a problem hiding this comment.
Fixed in commit 27c4a09. Thread names now include connection UUID: databricks-jdbc-heartbeat-<uuid>-<managerId>.
| capturedStatementId); | ||
| capturedMgr.stopHeartbeat(capturedStatementId); | ||
| } | ||
| } catch (Exception e) { |
There was a problem hiding this comment.
[Medium] catch (Exception) misses Error — silent heartbeat death without log or cleanup
The heartbeat lambda at line 374 catches Exception, not Throwable. Per the ScheduledExecutorService.scheduleWithFixedDelay javadoc:
If any execution of the task encounters an exception, subsequent executions are suppressed.
Error subclasses (OutOfMemoryError, NoClassDefFoundError, etc.) are not Exception subclasses, so they escape the catch — the scheduler then suppresses the recurring task.
Empirically demonstrated: a JUnit test wires a task that throws Error to ResultHeartbeatManager.startHeartbeat. After 3.5s with a 1s interval:
ticks = 1(only one execution; the rest suppressed)manager.getActiveHeartbeatCount() = 1(entry leaked inactiveHeartbeats)
The consequence is worse than swallowing exceptions: there's no consecutiveFailures increment, no max-failures WARN, no mgr.stopHeartbeat() cleanup. The heartbeat silently dies and the user has no idea their results may expire.
Fix:
} catch (Throwable t) {
if (capturedMgr.getStoppedFlag(capturedStatementId).get()) return;
// ... same failure-counter logic ...
if (t instanceof Error && !(t instanceof VirtualMachineError)) {
// log + stop cleanly; VirtualMachineError should still propagate
capturedMgr.stopHeartbeat(capturedStatementId);
}
if (t instanceof VirtualMachineError) throw (VirtualMachineError) t;
}(Or at minimum, change the catch to Throwable so the existing 10-strike path handles Error like any other failure.)
There was a problem hiding this comment.
Fixed in commit 0a46f52. Lambda catches Throwable, re-throws VirtualMachineError, stops heartbeat and logs for other Errors.
| return; | ||
| } | ||
| int failures = consecutiveFailures.incrementAndGet(); | ||
| if (failures == 1) { |
There was a problem hiding this comment.
[Medium] Default SQLFeatureNotSupportedException retries 10× and emits misleading "results may expire" WARN
The default checkStatementAlive here throws SQLFeatureNotSupportedException("Heartbeat not supported by this client"). The heartbeat lambda in DatabricksResultSet.java:374-403 catches Exception (no instanceof short-circuit for the unsupported case) — so it counts each call as a transient failure.
Empirically verified with a JUnit test that builds a no-override IDatabricksClient via InvocationHandler.invokeDefault:
- First call throws
SQLFeatureNotSupportedException: "Heartbeat not supported by this client". - The heartbeat lambda body (grep on
SQLFeatureNotSupportedException/instanceof SQLinside the lambda block) contains zero short-circuit — confirmed absent.
User-visible consequence: if anyone wires a custom IDatabricksClient impl without overriding checkStatementAlive, they get ~10 INFO log lines + 1 misleading WARN over ~10 min:
INFO Heartbeat failed for statement <id> (first failure): Heartbeat not supported by this client
DEBUG Heartbeat failed for statement <id> (failure 2/10): Heartbeat not supported by this client
...
WARN Heartbeat stopped for statement <id> after 10 consecutive failures.
Server-side results may expire. Last error: Heartbeat not supported by this client
The WARN says "results may expire" — but the actual cause is a missing client-side override.
Fix options:
- Short-circuit in the lambda: treat
SQLFeatureNotSupportedExceptionas permanent → callmgr.stopHeartbeat(...)immediately, log a single WARN naming the offending class:catch (Exception e) { if (e instanceof SQLFeatureNotSupportedException) { LOGGER.warn("Heartbeat permanently disabled for statement {} — " + "client {} does not implement checkStatementAlive. " + "Set EnableHeartbeat=0 to silence.", capturedStatementId, client.getClass().getName()); capturedMgr.stopHeartbeat(capturedStatementId); return; } // ... existing transient-failure logic ... }
- Improve the exception message: include
this.getClass().getName()and a remediation hint pointing atEnableHeartbeat=0.
There was a problem hiding this comment.
Fixed in commit 0a46f52. SQLFeatureNotSupportedException gets special handling — immediate stop, no retry count, DEBUG log. No misleading WARN.
Code Review Squad — Empirical verification of findingsI re-ran the multi-perspective AI review on PR head Verdict: Merge Safety Score 0/100 — CRITICAL RISKCritical (1) — blocks merge directly: F1 will fail every PR's unit-test CI matrix once landed. High (4): F2 (asymmetric Medium (6): F13 (21 KB/poll on SEA), F15 (thread name lacks conn id), F17 ( Inline comments posted at file:line
Findings on files outside this PR's diff (cannot be inline-commented)[F3 High] Empirically: Today the maps work by reference identity because the same instance is reused — but the moment any code path performs a map lookup with a reconstructed Fix in @Override
public int hashCode() { return Objects.hash(clientType, guid, secret); }[F19 Medium] Verification methodology
Findings stronger than originally reported
Confirmed-true (TRUE by grep + JUnit, not posted inline)F9 ( Feedback? Drop it in #code-review-squad-feedback. |
…wrap
F3 (Critical): Add StatementId.hashCode() — was missing, breaking
ConcurrentHashMap lookups in ResultHeartbeatManager. Two equal
StatementId instances had different hash codes.
F2 (High): Fix asymmetric cast in stopHeartbeat() — use same JDBC
unwrap() pattern as startHeartbeatIfEnabled() for pooled connections.
F15 (Medium): Thread names now include manager ID:
"databricks-jdbc-heartbeat-{N}" for thread dump diagnostics.
F17 (Medium): Catch Throwable instead of Exception in heartbeat lambda.
Re-throw VirtualMachineError to avoid swallowing fatal JVM errors.
F23 (Medium): Early exit for SQLFeatureNotSupportedException — stop
heartbeat immediately instead of retrying 10 times with misleading
"results may expire" warning.
Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
|
Addressed May 12 review findings in commit 0a46f52: F3 (Critical): StatementId.hashCode() missing — Added F2 (High): stopHeartbeat() asymmetric cast — Applied same JDBC F15 (Medium): Thread names lack connection ID — Thread names now include a monotonic manager ID: F17 (Medium): Exception catch misses Error — Changed F23 (Medium): SQLFeatureNotSupportedException retries 10x — Added Remaining items:
|
Expected condition for clients that don't implement checkStatementAlive. Co-authored-by: Isaac Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
…tStatement
Changed checkStatementAlive from GET /sql/statements/{id} (~21KB response
with full manifest/result data) to GET /sql/statements/{id}/status (~100
bytes with just state/error/sqlState). Reduces heartbeat bandwidth by
~99% per tick.
Co-authored-by: Isaac
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
|
Update on F13 (SEA payload size): Fixed in commit 75be9b7. Changed SEA heartbeat from This is the |
| && state != StatementState.CLOSED | ||
| && state != StatementState.FAILED; | ||
| } catch (Exception e) { | ||
| // H10 fix: Catch all exceptions (DatabricksError, DatabricksException, IOException, |
There was a problem hiding this comment.
Can you please remove these comments? Does not make any sense to have the H10 numbering
|
|
||
| private TGetOperationStatusResp getOperationStatus( | ||
| // Package-visible for heartbeat access from DatabricksThriftServiceClient | ||
| TGetOperationStatusResp getOperationStatus( |
There was a problem hiding this comment.
we should still define the type?
There was a problem hiding this comment.
Sorry, didn't get the comment
Clean up internal review-tracking prefixes (C1, C2, C3, C4, H5, H6, H7, H10, H11) from code comments — keep the actual explanations. Add proper Javadoc for package-visible getOperationStatus method. Co-authored-by: Isaac Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
| * | ||
| * <p>Or set environment variables: DATABRICKS_HOST, DATABRICKS_TOKEN, DATABRICKS_HTTP_PATH | ||
| */ | ||
| @Tag("e2e") |
There was a problem hiding this comment.
[HIGH] @Tag("e2e") is dead metadata — Surefire is not configured to honor it; HeartbeatIntegrationTest will run on every mvn test
The PR description / dismissed-findings list claims @Tag("e2e") excludes this from unit-test CI. Verified at PR head 632460835:
grep -n "excludedGroups\|excludeTags\|<configuration>" pom.xml
# Returns only the spotless excludes for Arrow files (lines 197-201).
# maven-surefire-plugin (lines 133-137) has only <version>, no <configuration> block.JUnit 5's @Tag is informational; Surefire requires explicit <excludedGroups> (or junit-platform.properties) to filter. Surefire's default <includes> matches *Test.java, so this class is picked up.
createConnection() at lines 35-38 then throws:
throw new IllegalStateException("Set DATABRICKS_HOST, DATABRICKS_TOKEN, DATABRICKS_HTTP_PATH");Consequence: Every mvn test run without warehouse credentials (i.e., every standard PR CI run) fails on this test. This is a CI-blocking regression for every future PR until fixed.
Fix (pick one):
<!-- pom.xml: add inside <build><plugins> -->
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludedGroups>e2e</excludedGroups>
</configuration>
</plugin>Or self-skip via JUnit:
@EnabledIfEnvironmentVariable(named = "DATABRICKS_HOST", matches = ".+")
@EnabledIfEnvironmentVariable(named = "DATABRICKS_TOKEN", matches = ".+")
@EnabledIfEnvironmentVariable(named = "DATABRICKS_HTTP_PATH", matches = ".+")
@Tag("e2e")
public class HeartbeatIntegrationTest { ... }Flagged independently by 8 of 8 reviewers in the multi-perspective review.
Posted via /full-review
| if (executionResult == null) { | ||
| return false; | ||
| } | ||
| // SEA inline — all data loaded in memory at construction |
There was a problem hiding this comment.
[HIGH] Thrift direct-results path starts heartbeat on an already-closed operation — F14 fix landed for SEA but missed on Thrift
This method constructs the DatabricksResultSet at line 267 (which triggers startHeartbeatIfEnabled() from the Thrift ctor at DatabricksResultSet.java:196) and only afterwards at line 287 calls parentStatement.markDirectResultsReceived(). The constructor has no way to observe direct-results state.
DatabricksThriftUtil.getStatementStatus(...) (lines 181-182) maps:
case FINISHED_STATE:
state = StatementState.SUCCEEDED; // NOT CLOSEDSo the constructor sees SUCCEEDED, and isHeartbeatEligible() at DatabricksResultSet.java:471-498 only filters CLOSED / PENDING / RUNNING. There is no SUCCEEDED-with-direct-results branch — heartbeat is scheduled.
Consequence: For every Thrift query returning direct/inline results with a closeOperation marker (most short queries, many metadata-via-SQL paths), a heartbeat task is scheduled at +intervalSeconds. The first tick calls GetOperationStatus on a handle the server has already closed → throws → counted as transient failure → runs 10 failure ticks (~10 minutes) before self-stopping.
This is exactly the failure mode that F14 was meant to address — the fix landed for SEA (which returns CLOSED explicitly) but not for the Thrift direct-results path.
Fix (pick one):
- Option A — In
DatabricksThriftAccessor.executeStatement, whenisDirectResults && response.getDirectResults().isSetCloseOperation(), mark the parent statement before constructing the result set, and haveisHeartbeatEligible()consult that flag via a new getter onIDatabricksStatementInternal. - Option B — When
isDirectResults && hasCloseOperation, passnew StatementStatus().setState(StatementState.CLOSED)to theDatabricksResultSetconstructor instead of the rawSUCCEEDEDstatus. - Option C — Add
if (statementType == StatementType.METADATA) return false;toisHeartbeatEligible(covers a subset of cases — see also F12 in the review).
Posted via /full-review
There was a problem hiding this comment.
for metadata also, we will include heartbeat. It will close once we fetch all results or directResult case.
There was a problem hiding this comment.
isHeartbeatEligible() checks execution state — CLOSED returns false. Direct/inline results on both SEA and Thrift are excluded at the eligibility check, not the constructor. The heartbeat never actually starts for direct results.
| @Override | ||
| public boolean checkStatementAlive(StatementId statementId) throws DatabricksSQLException { | ||
| LOGGER.debug("Heartbeat check for statement {} using Thrift client", statementId); | ||
| DatabricksThreadContextHolder.setStatementId(statementId); |
There was a problem hiding this comment.
[MEDIUM] DatabricksThreadContextHolder.setStatementId(...) is set here but never cleared → telemetry cross-contamination across heartbeated statements
DatabricksThreadContextHolder.localStatementId is a static ThreadLocal<String>. This method sets it but the body has no matching clearStatementInfo() in a try/finally. Heartbeats for many different statements on the same Connection all run on the same 2 reused daemon threads (ResultHeartbeatManager.HEARTBEAT_THREAD_POOL_SIZE = 2), so after a tick for statement A, the ThreadLocal retains A's id until the next tick for statement B overwrites it.
Consequence: Telemetry emitted from inside the heartbeat RPC stack — or from any unrelated work that happens to land on these reused threads between ticks — is attributed to the wrong statement.
Symmetric gap on the SEA side: DatabricksSdkClient.checkStatementAlive (line 419) does NOT call setStatementId(...) at all. All other SDK methods do (DatabricksSdkClient.java:247, 399, 447). The asymmetry means:
- SEA heartbeat ticks: MDC-less (orphaned in multi-tenant log aggregation).
- Thrift heartbeat ticks: MDC sticks until the next tick overwrites.
Fix:
@Override
public boolean checkStatementAlive(StatementId statementId) throws DatabricksSQLException {
LOGGER.debug("Heartbeat check for statement {} using Thrift client", statementId);
DatabricksThreadContextHolder.setStatementId(statementId);
try {
// ... existing body ...
} finally {
DatabricksThreadContextHolder.clearStatementInfo();
}
}And add the symmetric setStatementId + try/finally to DatabricksSdkClient.checkStatementAlive.
Posted via /full-review
There was a problem hiding this comment.
Fixed in commit 27c4a09. Added finally { DatabricksThreadContextHolder.clearStatementInfo(); } to Thrift checkStatementAlive(). Statement ID is always cleared after the heartbeat RPC.
|
|
||
| public int getHeartbeatIntervalSeconds() { | ||
| int interval = | ||
| Integer.parseInt(getParameter(DatabricksJdbcUrlParams.HEARTBEAT_INTERVAL_SECONDS)); |
There was a problem hiding this comment.
[MEDIUM] Unhandled NumberFormatException aborts connection establishment with RuntimeException (not SQLException)
public int getHeartbeatIntervalSeconds() {
int interval =
Integer.parseInt(getParameter(DatabricksJdbcUrlParams.HEARTBEAT_INTERVAL_SECONDS)); // ← no try/catch
if (interval <= 0) {
LOGGER.warn("HeartbeatIntervalSeconds must be positive, got {}. Using default 60.", interval);
return 60;
}
if (interval > 3600) { LOGGER.warn(...); }
return interval;
}A JDBC URL with HeartbeatIntervalSeconds=foo, HeartbeatIntervalSeconds=60s, or an un-substituted ${interval} template placeholder throws unchecked NumberFormatException straight out of DatabricksConnection.createHeartbeatManager(...) (called from the constructor at line 53) and out of DriverManager.getConnection(...).
Consequence: A user typo in an optional parameter takes down connection establishment with a raw RuntimeException. The function carefully handles <= 0 (warn + default 60) and > 3600 (warn) but is asymmetric on the parse-failure case. Connection pools (Spring, HikariCP, etc.) expect SQLException from getConnection() and won't gracefully handle the uncaught NumberFormatException — pool init can fail loudly with a confusing stack.
Fix:
public int getHeartbeatIntervalSeconds() {
String raw = getParameter(DatabricksJdbcUrlParams.HEARTBEAT_INTERVAL_SECONDS);
int interval;
try {
interval = Integer.parseInt(raw);
} catch (NumberFormatException e) {
LOGGER.warn("HeartbeatIntervalSeconds must be an integer, got '{}'. Using default 60.", raw);
return 60;
}
if (interval <= 0) { /* existing */ return 60; }
if (interval > 3600) { /* existing */ }
return interval;
}Posted via /full-review
There was a problem hiding this comment.
Fixed in commit 27c4a09. getHeartbeatIntervalSeconds() now wraps Integer.parseInt() in try-catch for NumberFormatException — falls back to default 60s with WARN log.
| return false; | ||
| } | ||
| // Check execution state | ||
| if (executionStatus != null) { |
There was a problem hiding this comment.
[MEDIUM] isHeartbeatEligible() does not exclude StatementType.METADATA
boolean isHeartbeatEligible() {
if (executionResult == null) return false;
if (resultSetType == ResultSetType.SEA_INLINE) return false;
if (statementType == StatementType.UPDATE) return false; // ← UPDATE filtered, METADATA NOT
if (executionStatus != null) {
com.databricks.jdbc.api.ExecutionState state = executionStatus.getExecutionState();
if (state == ExecutionState.CLOSED) return false;
if (state == ExecutionState.PENDING || state == ExecutionState.RUNNING) return false;
}
return true;
}Metadata calls (getColumns, getTables, getProcedures, getSchemas, etc.) can route through the SQL-execute path via DatabricksMetadataQueryClient and end up constructing a DatabricksResultSet via the standard SQL constructors with StatementType.METADATA. The eligibility check doesn't filter it.
For Thrift metadata calls that aren't direct-results (large catalogs, slow metastore), the result is SUCCEEDED → heartbeat fires.
Consequence: Heartbeat starts on every non-inline metadata call. Metadata operations are typically short and bounded — heartbeat RPCs are wasteful at best. Hot metadata loops (autocomplete, BI-tool reconnects, schema discovery) → bandwidth tax + unnecessary scheduler thread occupation, especially with the missing per-RPC timeout (see separate H8 finding in the review summary).
Fix:
// after the existing UPDATE check
if (statementType == StatementType.METADATA) {
return false;
}Also add a unit test in ResultSetHeartbeatEligibilityTest covering StatementType.METADATA × ResultSetType.{SEA_ARROW, THRIFT_*} to pin the behavior.
Posted via /full-review
There was a problem hiding this comment.
Intentionally kept metadata eligible. Operations like getColumns() on large schemas can return thousands of rows and take significant time — they benefit from heartbeat.
| } | ||
|
|
||
| /** Stops the heartbeat for this result set's statement. Idempotent. */ | ||
| private void stopHeartbeat() { |
There was a problem hiding this comment.
[MEDIUM] Stop-heartbeat boilerplate duplicated at 4 call sites + startHeartbeatIfEnabled is a 117-line method with 6 concerns
The same 4-line idiom appears four times across the codebase:
if (statementId != null) {
ResultHeartbeatManager mgr = connection.getHeartbeatManager();
if (mgr != null) {
mgr.stopHeartbeat(statementId);
}
}Call sites:
DatabricksResultSet.stopHeartbeat()(lines 432-455) — additionally wraps a 12-line pooled-connection unwrap.DatabricksStatement.close()(lines 175-181)DatabricksStatement.cancel()(lines 256-263)DatabricksStatement.resetForNewExecution()(lines 1013-1021)
The cancel path was missing before H11 was fixed — the very existence of H11 demonstrates the maintenance hazard. Any future change to the stop contract (telemetry, draining inflight counter, propagating a stop reason) requires touching 4 sites.
startHeartbeatIfEnabled itself (DatabricksResultSet.java:314-430) is 117 lines doing six things:
- Eligibility check (315-320)
- Connection unwrap (322-333) — duplicated with
stopHeartbeatat lines 437-447 - Manager lookup (335-338)
- Capture-variable setup with ~13 lines of justification comments (340-356)
- Heartbeat lambda — 62 lines with 5 distinct branches (357-419)
- Scheduling + log (421-426)
The orphan-flag historical comment (lines 350-354) only makes sense if you've read the C1 review thread. New maintainers will be tempted to "clean up" these comments and silently re-introduce the orphan-flag bug.
Fix:
-
Add a helper on
DatabricksConnectionnext togetHeartbeatManager():void stopHeartbeat(StatementId id) { if (id != null && heartbeatManager != null) { heartbeatManager.stopHeartbeat(id); } }
Replace all 4 call sites with
connection.stopHeartbeat(statementId);. -
Extract the pooled-connection unwrap block (duplicated in
startHeartbeatIfEnabledandstopHeartbeat):private DatabricksConnection unwrapConnection() throws SQLException { Connection raw = parentStatement.getStatement().getConnection(); if (raw instanceof DatabricksConnection) return (DatabricksConnection) raw; if (raw.isWrapperFor(DatabricksConnection.class)) return raw.unwrap(DatabricksConnection.class); return null; }
-
Extract the lambda body into a package-private static helper:
private static Runnable buildHeartbeatTask( IDatabricksClient client, ResultHeartbeatManager mgr, StatementId id, int maxConsecutiveFailures) { ... }
The lambda has zero
thisreferences after C2 was fixed — extraction is mechanical. Also makes each failure-handling branch individually unit-testable (today they can only be exercised via the full scheduler — see test-coverage gap finding).
After the refactor, startHeartbeatIfEnabled shrinks to ~20 lines: eligibility → unwrap → mgr.startHeartbeat(id, buildHeartbeatTask(...)) → log.
Posted via /full-review
There was a problem hiding this comment.
The duplication is intentional — each call site has a specific purpose (close, cancel, next()→false, re-execution). The 4-line idiom is short and idempotent. Extracting to a helper would add a level of indirection for minimal benefit.
Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
… context, thread names - Add <excludedGroups>e2e</excludedGroups> to surefire config so @tag("e2e") HeartbeatIntegrationTest is excluded from unit test CI - Wrap getHeartbeatIntervalSeconds() parseInt in try-catch for NumberFormatException — falls back to default 60s with WARN log - Clear DatabricksThreadContextHolder.setStatementId() in finally block of Thrift checkStatementAlive() to prevent telemetry cross-contamination - Include connection UUID in heartbeat thread names for debuggability in pooled environments (e.g., "databricks-jdbc-heartbeat-<uuid>-1") - Resolve merge conflicts: keep heartbeat stop + proactive server close in ResultSet.close(), keep heartbeat stop + serverOperationClosed guard in Statement.cancel() and resetForNewExecution Co-authored-by: Isaac Signed-off-by: Gopal Lal <gopal.lal@databricks.com>
Summary
Design + implementation for PECOBLR-2321: periodic heartbeat polling to keep server-side result state alive while the client consumes results slowly.
Problem
When users read query results slowly (pausing between
next()calls), the warehouse can auto-stop after its idle timeout. For inline results (data only on cluster, not uploaded to cloud storage), this means permanent data loss. The user gets errors likeINVALID_HANDLE_STATUSor "operation not found".Solution
A
ResultHeartbeatManagerthat periodically callsGetStatementStatus(SEA) orGetOperationStatus(Thrift) to signal the server that results are still being consumed. Opt-in viaEnableHeartbeat=1connection parameter (default false due to cost implications).Design doc
docs/design/HEARTBEAT_KEEP_ALIVE.md— includes cross-driver survey, Mermaid diagrams (sequence flows, state machine, class diagram), and detailed lifecycle analysis.Heartbeat eligibility (skipped when not needed)
getExecutionResult()— heartbeat starts only when ResultSet is constructedError resilience
Zero-leak guarantee
Heartbeat stops in 4 places:
next()returns false,ResultSet.close(),Statement.close(),Connection.close()Implementation
ResultHeartbeatManager— per-connection manager withScheduledExecutorService(daemon thread)ResultHeartbeatManagerTest— 7 unit testsDatabricksJdbcUrlParams—EnableHeartbeat(default 0),HeartbeatIntervalSeconds(default 60)DatabricksConnectionContext— getter methodsDatabricksConnection— creates/shuts down managerDatabricksResultSet— starts heartbeat in constructor, stops on close/next-falseDatabricksStatement— safety net stop in close()IDatabricksClient—checkStatementAlive()default methodDatabricksSdkClient— SEA heartbeat via GET/sql/statements/{id}DatabricksThriftServiceClient— Thrift heartbeat viaGetOperationStatus