Skip to content

Commit e8e980a

Browse files
committed
Reduced allocation overhead, eliminated CPU-spinning waits, cleaner waiter coordination in the ConnectionPool
1 parent 86fc3c1 commit e8e980a

1 file changed

Lines changed: 134 additions & 95 deletions

File tree

src/javaxt/sql/ConnectionPool.java

Lines changed: 134 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.util.concurrent.*;
1212
import java.util.concurrent.atomic.AtomicBoolean;
1313
import java.util.concurrent.atomic.AtomicInteger;
14+
import java.util.concurrent.locks.LockSupport;
1415

1516
//******************************************************************************
1617
//** ConnectionPool
@@ -49,6 +50,7 @@ public class ConnectionPool {
4950
private final ConcurrentLinkedQueue<PooledConnectionWrapper> recycledConnections = new ConcurrentLinkedQueue<>();
5051
private final ConcurrentHashMap<PooledConnection, PooledConnectionWrapper> connectionWrappers = new ConcurrentHashMap<>();
5152
private volatile PooledConnection connectionInTransition;
53+
private final ConcurrentLinkedQueue<Thread> waiters = new ConcurrentLinkedQueue<>();
5254

5355
// Validation caching for performance optimization
5456
private final ConcurrentHashMap<PooledConnection, Long> validationCache = new ConcurrentHashMap<>();
@@ -140,29 +142,12 @@ public ConnectionPool(ConnectionPoolDataSource dataSource, int maxConnections, M
140142
public Connection getConnection() throws SQLException {
141143
long time = System.currentTimeMillis();
142144
long timeoutTime = time + timeoutMs;
143-
int triesWithoutDelay = getInactiveConnections() + 1;
144-
145-
while (true) {
145+
while (System.currentTimeMillis() < timeoutTime) {
146146
Connection conn = getConnection(time, timeoutTime);
147-
if (conn != null) {
148-
return conn;
149-
}
150-
triesWithoutDelay--;
151-
if (triesWithoutDelay <= 0) {
152-
triesWithoutDelay = 0;
153-
try {
154-
// Intentional sleep to avoid busy waiting when no connections are available
155-
Thread.sleep(250);
156-
}
157-
catch (InterruptedException e) {
158-
throw new RuntimeException("Interrupted while waiting for a valid database connection.", e);
159-
}
160-
}
147+
if (conn != null) return conn;
161148
time = System.currentTimeMillis();
162-
if (time >= timeoutTime) {
163-
throw new TimeoutException("Timeout while waiting for a valid database connection.");
164-
}
165149
}
150+
throw new TimeoutException("Timeout while waiting for a valid database connection.");
166151
}
167152

168153

@@ -291,6 +276,11 @@ public void close() throws SQLException {
291276
}
292277

293278
connectionWrappers.clear();
279+
280+
Thread w;
281+
while ((w = waiters.poll()) != null) {
282+
LockSupport.unpark(w);
283+
}
294284
}
295285

296286

@@ -398,51 +388,84 @@ private Connection getConnection(long time, long timeoutTime) {
398388
//** acquireConnection
399389
//**************************************************************************
400390
private Connection acquireConnection(long timeoutMs) throws SQLException {
391+
throwIfDisposed();
392+
393+
long deadline = System.currentTimeMillis() + timeoutMs;
394+
Thread me = Thread.currentThread();
395+
while (true) {
396+
Connection conn = tryAcquireOnce();
397+
if (conn != null) return conn;
398+
399+
long remainingMs = deadline - System.currentTimeMillis();
400+
if (remainingMs <= 0) throw new TimeoutException();
401+
402+
waiters.offer(me);
403+
try {
404+
conn = tryAcquireOnce();
405+
if (conn != null) return conn;
406+
407+
408+
throwIfDisposed();
409+
LockSupport.parkNanos(this, TimeUnit.MILLISECONDS.toNanos(remainingMs));
410+
throwIfDisposed();
411+
412+
if (Thread.interrupted()) {
413+
throw new RuntimeException("Interrupted while waiting for a database connection.");
414+
}
415+
}
416+
finally {
417+
waiters.remove(me);
418+
}
419+
}
420+
}
421+
422+
private void throwIfDisposed() {
401423
if (isDisposed.get()) {
402424
throw new IllegalStateException("Connection pool has been disposed.");
403425
}
426+
}
404427

405-
long startTime = System.currentTimeMillis();
406-
long timeoutTime = startTime + timeoutMs;
407428

408-
while (System.currentTimeMillis() < timeoutTime) {
409-
// First, try to get a recycled connection
410-
Connection recycledConn = getRecycledConnection();
411-
if (recycledConn != null) {
412-
return recycledConn;
413-
}
429+
//**************************************************************************
430+
//** tryAcquireOnce
431+
//**************************************************************************
432+
/** Single non-blocking attempt to obtain a connection: first from the
433+
* recycled queue, then by creating a new one if the pool isn't full.
434+
* Returns null if neither path yielded a connection.
435+
*/
436+
private Connection tryAcquireOnce() throws SQLException {
437+
if (isDisposed.get()) return null;
414438

415-
// No recycled connection available, try to create a new one
416-
// Use atomic counter for lock-free connection limit control
417-
int currentTotal = totalConnections.get();
418-
if (currentTotal < maxConnections) {
419-
if (totalConnections.compareAndSet(currentTotal, currentTotal + 1)) {
420-
try {
421-
Connection conn = createNewConnection();
422-
if (conn != null) {
423-
return conn;
424-
} else {
425-
// Connection creation failed, decrement counter
426-
totalConnections.decrementAndGet();
427-
}
428-
} catch (SQLException e) {
429-
// Connection creation failed, decrement counter
430-
totalConnections.decrementAndGet();
431-
// Continue to next iteration to try again
432-
}
439+
Connection conn = getRecycledConnection();
440+
if (conn != null) return conn;
441+
442+
int currentTotal = totalConnections.get();
443+
if (currentTotal < maxConnections) {
444+
if (totalConnections.compareAndSet(currentTotal, currentTotal + 1)) {
445+
try {
446+
Connection c = createNewConnection();
447+
if (c != null) return c;
448+
totalConnections.decrementAndGet();
449+
}
450+
catch (SQLException e) {
451+
totalConnections.decrementAndGet();
452+
throw e;
433453
}
434-
}
435-
// If we couldn't create a connection, wait a bit and try again
436-
try {
437-
Thread.sleep(10);
438-
}
439-
catch (InterruptedException e) {
440-
Thread.currentThread().interrupt();
441-
throw new RuntimeException("Interrupted while waiting for a database connection.", e);
442454
}
443455
}
456+
return null;
457+
}
458+
444459

445-
throw new TimeoutException();
460+
//**************************************************************************
461+
//** wakeOneWaiter
462+
//**************************************************************************
463+
/** Unparks the longest-waiting thread, if any. Called whenever a connection
464+
* is returned to the recycled queue or a slot is freed via dispose.
465+
*/
466+
private void wakeOneWaiter() {
467+
Thread t = waiters.poll();
468+
if (t != null) LockSupport.unpark(t);
446469
}
447470

448471

@@ -498,14 +521,12 @@ private Connection getRecycledConnection() throws SQLException {
498521
// This updates the underlying connection reference without creating a new wrapper object
499522
wrapper.connection.open(rawConn, database);
500523

501-
// Connection successfully acquired! Now update the wrapper state
502-
// IMPORTANT: Only update wrapper and map AFTER successfully acquiring the connection
503-
// This prevents race conditions where the wrapper state changes before the connection is ready
504-
PooledConnectionWrapper updatedWrapper = wrapper.markUsed();
505-
connectionWrappers.put(pconn, updatedWrapper);
524+
// Connection successfully acquired - mark wrapper as used in place.
525+
wrapper.markUsed();
506526

507-
return updatedWrapper.connection; // Return reused wrapper with fresh connection
508-
} catch (SQLException e) {
527+
return wrapper.connection; // Return reused wrapper with fresh connection
528+
}
529+
catch (SQLException e) {
509530
connectionInTransition = null;
510531
// Failed to acquire connection
511532

@@ -525,14 +546,17 @@ private Connection getRecycledConnection() throws SQLException {
525546

526547
pconn.removeConnectionEventListener(poolConnectionEventListener);
527548
pconn.close();
528-
} catch (SQLException ex) {
549+
}
550+
catch (SQLException ex) {
529551
// Ignore close errors for failed connections
530-
} finally {
552+
}
553+
finally {
531554
doPurgeConnection.set(false);
532555
totalConnections.decrementAndGet();
533556
}
534557
return null;
535-
} finally {
558+
}
559+
finally {
536560
connectionInTransition = null;
537561
}
538562
}
@@ -562,7 +586,8 @@ private Connection createNewConnection() throws SQLException {
562586

563587
// totalConnections was already incremented in acquireConnection
564588
return connection;
565-
} catch (SQLException e) {
589+
}
590+
catch (SQLException e) {
566591
connectionInTransition = null;
567592
// Connection creation failed, decrement the activeConnections counter we just incremented
568593
activeConnections.decrementAndGet();
@@ -572,17 +597,21 @@ private Connection createNewConnection() throws SQLException {
572597
try {
573598
pconn.removeConnectionEventListener(poolConnectionEventListener);
574599
pconn.close();
575-
} catch (SQLException ex) {
600+
}
601+
catch (SQLException ex) {
576602
// Ignore close errors for failed connections
577-
} finally {
603+
}
604+
finally {
578605
doPurgeConnection.set(false);
579606
totalConnections.decrementAndGet();
580607
}
581608
throw e;
582-
} finally {
609+
}
610+
finally {
583611
connectionInTransition = null;
584612
}
585-
} catch (SQLException e) {
613+
}
614+
catch (SQLException e) {
586615
throw e;
587616
}
588617
}
@@ -623,7 +652,8 @@ private void stopHealthMonitoring() {
623652
healthCheckExecutor.shutdownNow();
624653
log("Health monitoring thread did not terminate gracefully");
625654
}
626-
} catch (InterruptedException e) {
655+
}
656+
catch (InterruptedException e) {
627657
healthCheckExecutor.shutdownNow();
628658
Thread.currentThread().interrupt();
629659
}
@@ -825,17 +855,10 @@ private void recycleConnection (PooledConnection pconn) {
825855
// Get the existing wrapper
826856
PooledConnectionWrapper wrapper = connectionWrappers.get(pconn);
827857

828-
// Check if this connection is already in the recycled queue (double-close protection)
829-
if (wrapper != null) {
830-
for (PooledConnectionWrapper w : recycledConnections) {
831-
if (w.pooledConnection == pconn) {
832-
log("Warning: Connection already recycled, ignoring duplicate close");
833-
return;
834-
}
835-
}
836-
}
837-
838-
// Use atomic decrement to avoid TOCTOU race condition
858+
// Use atomic decrement to avoid TOCTOU race condition. Double-close
859+
// protection is handled below by the activeConnections < 0 check; the
860+
// previous O(n) queue scan was redundant with that and showed up as a
861+
// measurable cost in high-throughput tests.
839862
int prev = activeConnections.decrementAndGet();
840863

841864
if (prev < 0) {
@@ -857,10 +880,12 @@ private void recycleConnection (PooledConnection pconn) {
857880

858881
// Get the existing wrapper and update its usage time
859882
if (wrapper != null) {
860-
// Update the wrapper with current usage time and add to recycled connections
861-
// The javaxt.sql.Connection wrapper is reused - no new object creation!
862-
wrapper = wrapper.markUsed();
883+
// Mark the wrapper as used (in place - no allocation) and return
884+
// it to the recycled queue. Same instance stays in connectionWrappers.
885+
wrapper.markUsed();
863886
recycledConnections.offer(wrapper);
887+
// Wake one parked waiter (if any) so it can pick this up.
888+
wakeOneWaiter();
864889
}
865890
else {
866891
// This shouldn't happen in normal operation - log a warning
@@ -924,6 +949,12 @@ private void disposeConnection (PooledConnection pconn) {
924949
log("Error while closing database connection: "+e.toString());
925950
}
926951
assertInnerState();
952+
953+
// A slot freed up: wake a parked waiter so it can try to create a
954+
// replacement connection. (recycleConnection wakes a waiter for the
955+
// "got a recycled" path; this handles the "got a free totalConnections
956+
// slot" path.)
957+
wakeOneWaiter();
927958
}
928959

929960

@@ -994,15 +1025,20 @@ public void connectionErrorOccurred (ConnectionEvent event) {
9941025
private static class PooledConnectionWrapper {
9951026
private final Connection connection;
9961027
private final PooledConnection pooledConnection;
997-
private long createdTime;
998-
private long lastUsedTime;
999-
private final boolean isWarmup;
1028+
private final long createdTime;
1029+
// lastUsedTime and isWarmup are mutated in place by markUsed() on every
1030+
// acquire. They are read from other threads (stats, health-check) so
1031+
// declare them volatile to avoid torn 64-bit reads on JDK 8 and to
1032+
// publish updates without synchronization.
1033+
private volatile long lastUsedTime;
1034+
private volatile boolean isWarmup;
10001035

10011036
PooledConnectionWrapper(Connection connection, PooledConnection pooledConnection, boolean isWarmup) {
10021037
this.connection = connection;
10031038
this.pooledConnection = pooledConnection;
1004-
this.createdTime = System.currentTimeMillis();
1005-
this.lastUsedTime = System.currentTimeMillis();
1039+
long now = System.currentTimeMillis();
1040+
this.createdTime = now;
1041+
this.lastUsedTime = now;
10061042
this.isWarmup = isWarmup;
10071043
}
10081044

@@ -1014,11 +1050,14 @@ boolean isExpired(long maxAgeMs) {
10141050
return System.currentTimeMillis() - createdTime > maxAgeMs;
10151051
}
10161052

1017-
PooledConnectionWrapper markUsed() {
1018-
PooledConnectionWrapper wrapper = new PooledConnectionWrapper(connection, pooledConnection, false);
1019-
wrapper.createdTime = createdTime;
1020-
wrapper.lastUsedTime = System.currentTimeMillis();
1021-
return wrapper;
1053+
/** Mutates the wrapper in place to mark it as just-used. The wrapper
1054+
* object is reused across acquire/recycle cycles - no allocation, no
1055+
* map update required, since both the queue and the connectionWrappers
1056+
* map continue to reference the same instance.
1057+
*/
1058+
void markUsed() {
1059+
this.lastUsedTime = System.currentTimeMillis();
1060+
this.isWarmup = false;
10221061
}
10231062
}
10241063

0 commit comments

Comments
 (0)