From 2f35cc51f779443911cb79c4e891d8542ebff7fb Mon Sep 17 00:00:00 2001 From: spricoder Date: Sun, 31 May 2026 11:42:39 +0800 Subject: [PATCH 1/9] Wire RPC compression flag through Session to its connections The enableRPCCompression option set via Session::open(bool) or the session builder was never propagated to SessionConnection, whose flag was hardcoded to false, so the compact Thrift protocol never took effect. Thread the flag from the builder/open() into both the data SessionConnection and the node-discovery NodesSupplier client so compression actually applies. --- iotdb-client/client-cpp/src/main/Session.cpp | 14 ++++++++++++-- iotdb-client/client-cpp/src/main/Session.h | 2 ++ .../client-cpp/src/main/SessionConnection.cpp | 3 +++ 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/iotdb-client/client-cpp/src/main/Session.cpp b/iotdb-client/client-cpp/src/main/Session.cpp index a2b07ed7afb93..6227bca416108 100644 --- a/iotdb-client/client-cpp/src/main/Session.cpp +++ b/iotdb-client/client-cpp/src/main/Session.cpp @@ -768,8 +768,14 @@ void Session::initNodesSupplier(const std::vector& nodeUrls) { } if (enableAutoFetch_) { - nodesSupplier_ = - NodesSupplier::create(endPoints, username_, password_, useSSL_, trustCertFilePath_); + // Propagate the compression setting to the node-discovery client too, so the + // background "show cluster" refresh uses the same protocol as data sessions. + // Other parameters keep their library defaults to preserve existing behavior. + nodesSupplier_ = NodesSupplier::create( + endPoints, username_, password_, useSSL_, trustCertFilePath_, /*zoneId=*/"", + ThriftConnection::THRIFT_DEFAULT_BUFFER_SIZE, ThriftConnection::THRIFT_MAX_FRAME_SIZE, + ThriftConnection::CONNECTION_TIMEOUT_IN_MS, enableRPCCompression_, + getVersionString(version)); } else { nodesSupplier_ = make_shared(endPoints); } @@ -944,6 +950,10 @@ void Session::open(bool enableRPCCompression, int connectionTimeoutInMs) { return; } + // Honor compression requested either via the builder or this open() call, so + // the negotiated protocol (compact vs binary) actually reflects the setting. + enableRPCCompression_ = enableRPCCompression_ || enableRPCCompression; + try { initDefaultSessionConnection(); } catch (const exception& e) { diff --git a/iotdb-client/client-cpp/src/main/Session.h b/iotdb-client/client-cpp/src/main/Session.h index cedfaeba1966d..4a629565af0a3 100644 --- a/iotdb-client/client-cpp/src/main/Session.h +++ b/iotdb-client/client-cpp/src/main/Session.h @@ -595,6 +595,7 @@ class Session { std::string database_; bool enableAutoFetch_ = true; bool enableRedirection_ = true; + bool enableRPCCompression_ = false; std::shared_ptr nodesSupplier_; friend class SessionConnection; friend class TableSession; @@ -772,6 +773,7 @@ class Session { this->database_ = builder->database; this->enableAutoFetch_ = builder->enableAutoFetch; this->enableRedirection_ = builder->enableRedirections; + this->enableRPCCompression_ = builder->enableRPCCompression; this->connectTimeoutMs_ = builder->connectTimeoutMs; this->nodeUrls_ = builder->nodeUrls; this->useSSL_ = builder->useSSL; diff --git a/iotdb-client/client-cpp/src/main/SessionConnection.cpp b/iotdb-client/client-cpp/src/main/SessionConnection.cpp index a7fa3beb6ffc4..1746d2049cb72 100644 --- a/iotdb-client/client-cpp/src/main/SessionConnection.cpp +++ b/iotdb-client/client-cpp/src/main/SessionConnection.cpp @@ -39,6 +39,9 @@ SessionConnection::SessionConnection(Session* session_ptr, const TEndPoint& endp retryIntervalMs(retryInterval), connectionTimeoutInMs(connectionTimeout), sqlDialect(std::move(dialect)), database(std::move(db)) { this->zoneId = zoneId.empty() ? getSystemDefaultZoneId() : zoneId; + // Inherit the compression setting negotiated by the owning Session so the + // chosen Thrift protocol (compact when compression is on) is consistent. + this->enableRPCCompression = session->enableRPCCompression_; endPointList.push_back(endpoint); init(endPoint, session->useSSL_, session->trustCertFilePath_); } From 8566fd9182c5100e5ed39e18d8496841c566cf15 Mon Sep 17 00:00:00 2001 From: spricoder Date: Sun, 31 May 2026 11:43:18 +0800 Subject: [PATCH 2/9] Use snprintf for Tablet bounds-check error messages Tablet::addValue and the OBJECT-value overload formatted out-of-range diagnostics with sprintf into a fixed 100-byte stack buffer, risking an overflow. Switch to snprintf bounded by sizeof(buffer) and cast the size_t arguments to long to match the %ld format. --- iotdb-client/client-cpp/src/main/Session.h | 26 ++++++++++++---------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/iotdb-client/client-cpp/src/main/Session.h b/iotdb-client/client-cpp/src/main/Session.h index 4a629565af0a3..3f32b6c67d69d 100644 --- a/iotdb-client/client-cpp/src/main/Session.h +++ b/iotdb-client/client-cpp/src/main/Session.h @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -258,16 +259,17 @@ class Tablet { template void addValue(size_t schemaId, size_t rowIndex, const T& value) { if (schemaId >= schemas.size()) { char tmpStr[100]; - sprintf(tmpStr, - "Tablet::addValue(), schemaId >= schemas.size(). schemaId=%ld, schemas.size()=%ld.", - schemaId, schemas.size()); + snprintf(tmpStr, sizeof(tmpStr), + "Tablet::addValue(), schemaId >= schemas.size(). schemaId=%ld, schemas.size()=%ld.", + (long)schemaId, (long)schemas.size()); throw std::out_of_range(tmpStr); } if (rowIndex >= rowSize) { char tmpStr[100]; - sprintf(tmpStr, "Tablet::addValue(), rowIndex >= rowSize. rowIndex=%ld, rowSize.size()=%ld.", - rowIndex, rowSize); + snprintf(tmpStr, sizeof(tmpStr), + "Tablet::addValue(), rowIndex >= rowSize. rowIndex=%ld, rowSize.size()=%ld.", + (long)rowIndex, (long)rowSize); throw std::out_of_range(tmpStr); } @@ -317,19 +319,19 @@ class Tablet { // Check schemaId bounds if (schemaId >= schemas.size()) { char tmpStr[100]; - sprintf(tmpStr, - "Tablet::addBinaryValueWithMeta(), schemaId >= schemas.size(). schemaId=%ld, " - "schemas.size()=%ld.", - schemaId, schemas.size()); + snprintf(tmpStr, sizeof(tmpStr), + "Tablet::addBinaryValueWithMeta(), schemaId >= schemas.size(). schemaId=%ld, " + "schemas.size()=%ld.", + (long)schemaId, (long)schemas.size()); throw std::out_of_range(tmpStr); } // Check rowIndex bounds if (rowIndex >= rowSize) { char tmpStr[100]; - sprintf(tmpStr, - "Tablet::addBinaryValueWithMeta(), rowIndex >= rowSize. rowIndex=%ld, rowSize=%ld.", - rowIndex, rowSize); + snprintf(tmpStr, sizeof(tmpStr), + "Tablet::addBinaryValueWithMeta(), rowIndex >= rowSize. rowIndex=%ld, rowSize=%ld.", + (long)rowIndex, (long)rowSize); throw std::out_of_range(tmpStr); } From 0dbcdd16ae0bcf17d790271b45f83ee5b2266d50 Mon Sep 17 00:00:00 2001 From: spricoder Date: Sun, 31 May 2026 11:43:29 +0800 Subject: [PATCH 3/9] Append big-endian bytes in MyStringBuffer instead of overwriting On big-endian hosts MyStringBuffer::putOrderedByte used str.assign, which replaced the whole buffer with each numeric write and corrupted previously serialized content. Use str.append so bytes accumulate, matching the little-endian path. --- iotdb-client/client-cpp/src/main/Common.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-client/client-cpp/src/main/Common.cpp b/iotdb-client/client-cpp/src/main/Common.cpp index 38b913c2270ca..842150241da83 100644 --- a/iotdb-client/client-cpp/src/main/Common.cpp +++ b/iotdb-client/client-cpp/src/main/Common.cpp @@ -418,7 +418,7 @@ const char* MyStringBuffer::getOrderedByte(size_t len) { void MyStringBuffer::putOrderedByte(char* buf, int len) { if (isBigEndian) { - str.assign(buf, len); + str.append(buf, len); } else { for (int i = len - 1; i > -1; i--) { str += buf[i]; From 6e635c37ca074d132b61b6096e90fc0028bf1e7c Mon Sep 17 00:00:00 2001 From: spricoder Date: Sun, 31 May 2026 11:43:49 +0800 Subject: [PATCH 4/9] Add thread-safe SessionPool to the C++ client Introduce SessionPool and SessionPoolBuilder so multiple threads can share a bounded set of connections without external locking. A single Session is not safe to use concurrently, so the pool lends each Session to one borrower at a time via an RAII PooledSession handle and reclaims it on scope exit. Sessions are created outside the lock to avoid blocking other borrowers during the handshake, and getSession() blocks up to a configurable timeout when the pool is exhausted. Query results are returned as a PooledSessionDataSet that keeps the Session leased until the result set is fully read, since SessionDataSet lazily fetches further blocks over the same connection. Connections that raise IoTDBConnectionException are evicted rather than recycled. Add integration tests covering basic borrow/insert/query, concurrent writers, and exhaustion-timeout behavior. --- .../client-cpp/src/main/SessionPool.cpp | 266 ++++++++++++ .../client-cpp/src/main/SessionPool.h | 394 ++++++++++++++++++ .../client-cpp/src/test/cpp/sessionIT.cpp | 129 +++++- 3 files changed, 788 insertions(+), 1 deletion(-) create mode 100644 iotdb-client/client-cpp/src/main/SessionPool.cpp create mode 100644 iotdb-client/client-cpp/src/main/SessionPool.h diff --git a/iotdb-client/client-cpp/src/main/SessionPool.cpp b/iotdb-client/client-cpp/src/main/SessionPool.cpp new file mode 100644 index 0000000000000..681078a41cb0d --- /dev/null +++ b/iotdb-client/client-cpp/src/main/SessionPool.cpp @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "SessionPool.h" + +void PooledSession::reset() { + if (session_ && pool_ != nullptr) { + pool_->putBack(session_, broken_); + } + pool_ = nullptr; + session_ = nullptr; + broken_ = false; +} + +SessionPool::SessionPool(std::string host, int rpcPort, std::string username, std::string password, + size_t maxSize) + : host_(std::move(host)), rpcPort_(rpcPort), username_(std::move(username)), + password_(std::move(password)), maxSize_(maxSize == 0 ? 1 : maxSize) {} + +SessionPool::SessionPool(std::vector nodeUrls, std::string username, + std::string password, size_t maxSize) + : rpcPort_(AbstractSessionBuilder::DEFAULT_RPC_PORT), nodeUrls_(std::move(nodeUrls)), + username_(std::move(username)), password_(std::move(password)), + maxSize_(maxSize == 0 ? 1 : maxSize) {} + +SessionPool::~SessionPool() { + try { + close(); + } catch (const std::exception& e) { + log_debug(std::string("SessionPool::~SessionPool(), ") + e.what()); + } +} + +SessionPool& SessionPool::setFetchSize(int fetchSize) { + fetchSize_ = fetchSize; + return *this; +} + +SessionPool& SessionPool::setZoneId(std::string zoneId) { + zoneId_ = std::move(zoneId); + return *this; +} + +SessionPool& SessionPool::setSqlDialect(std::string sqlDialect) { + sqlDialect_ = std::move(sqlDialect); + return *this; +} + +SessionPool& SessionPool::setDatabase(std::string database) { + database_ = std::move(database); + return *this; +} + +SessionPool& SessionPool::setEnableRedirection(bool enable) { + enableRedirection_ = enable; + return *this; +} + +SessionPool& SessionPool::setEnableAutoFetch(bool enable) { + enableAutoFetch_ = enable; + return *this; +} + +SessionPool& SessionPool::setEnableRPCCompression(bool enable) { + enableRPCCompression_ = enable; + return *this; +} + +SessionPool& SessionPool::setConnectTimeoutMs(int connectTimeoutMs) { + connectTimeoutMs_ = connectTimeoutMs; + return *this; +} + +SessionPool& SessionPool::setWaitToGetSessionTimeoutMs(int64_t timeoutMs) { + waitTimeoutMs_ = timeoutMs; + return *this; +} + +SessionPool& SessionPool::setUseSSL(bool useSSL) { + useSSL_ = useSSL; + return *this; +} + +SessionPool& SessionPool::setTrustCertFilePath(std::string path) { + trustCertFilePath_ = std::move(path); + return *this; +} + +std::shared_ptr SessionPool::constructNewSession() { + AbstractSessionBuilder builder; + builder.host = host_; + builder.rpcPort = rpcPort_; + builder.nodeUrls = nodeUrls_; + builder.username = username_; + builder.password = password_; + builder.zoneId = zoneId_; + builder.fetchSize = fetchSize_; + builder.sqlDialect = sqlDialect_; + builder.database = database_; + builder.enableAutoFetch = enableAutoFetch_; + builder.enableRedirections = enableRedirection_; + builder.enableRPCCompression = enableRPCCompression_; + builder.connectTimeoutMs = connectTimeoutMs_; + builder.useSSL = useSSL_; + builder.trustCertFilePath = trustCertFilePath_; + + auto session = std::make_shared(&builder); + session->open(enableRPCCompression_, connectTimeoutMs_); + return session; +} + +std::shared_ptr SessionPool::acquire(int64_t timeoutMs) { + const int64_t effectiveTimeout = timeoutMs > 0 ? timeoutMs : waitTimeoutMs_; + std::unique_lock lock(mutex_); + const auto deadline = + std::chrono::steady_clock::now() + std::chrono::milliseconds(effectiveTimeout); + + while (true) { + if (closed_) { + throw IoTDBException("SessionPool is closed."); + } + if (!idleQueue_.empty()) { + auto session = idleQueue_.front(); + idleQueue_.pop_front(); + return session; + } + if (size_ < maxSize_) { + // Reserve a slot, then build the connection outside the lock so other + // borrowers are not blocked by network/handshake latency. + ++size_; + lock.unlock(); + std::shared_ptr session; + try { + session = constructNewSession(); + } catch (...) { + lock.lock(); + --size_; + cv_.notify_one(); + throw; + } + return session; + } + + // Pool exhausted: wait for a Session to be returned. + if (effectiveTimeout <= 0) { + cv_.wait(lock); + } else { + if (cv_.wait_until(lock, deadline) == std::cv_status::timeout && idleQueue_.empty() && + size_ >= maxSize_ && !closed_) { + throw IoTDBException("Wait to get session timeout in SessionPool, maxSize=" + + std::to_string(maxSize_) + ", waitTimeoutMs=" + + std::to_string(effectiveTimeout) + "."); + } + } + } +} + +void SessionPool::putBack(const std::shared_ptr& session, bool broken) { + std::lock_guard lock(mutex_); + if (broken || closed_) { + // Drop the Session and free its slot so a healthy replacement can be created + // on demand. The caller (PooledSession::reset) still holds the last reference + // and tears the connection down after we return, i.e. outside this lock. + --size_; + } else { + idleQueue_.push_back(session); + } + cv_.notify_one(); +} + +PooledSession SessionPool::getSession() { + return getSession(waitTimeoutMs_); +} + +PooledSession SessionPool::getSession(int64_t timeoutMs) { + return PooledSession(this, acquire(timeoutMs)); +} + +void SessionPool::insertTablet(Tablet& tablet, bool sorted) { + execute([&](Session& s) { s.insertTablet(tablet, sorted); }); +} + +void SessionPool::insertAlignedTablet(Tablet& tablet, bool sorted) { + execute([&](Session& s) { s.insertAlignedTablet(tablet, sorted); }); +} + +void SessionPool::insertTablets(std::unordered_map& tablets, bool sorted) { + execute([&](Session& s) { s.insertTablets(tablets, sorted); }); +} + +void SessionPool::insertRecord(const std::string& deviceId, int64_t time, + const std::vector& measurements, + const std::vector& values) { + execute([&](Session& s) { s.insertRecord(deviceId, time, measurements, values); }); +} + +void SessionPool::insertRecords(const std::vector& deviceIds, + const std::vector& times, + const std::vector>& measurementsList, + const std::vector>& valuesList) { + execute([&](Session& s) { s.insertRecords(deviceIds, times, measurementsList, valuesList); }); +} + +void SessionPool::executeNonQueryStatement(const std::string& sql) { + execute([&](Session& s) { s.executeNonQueryStatement(sql); }); +} + +PooledSessionDataSet SessionPool::executeQueryStatement(const std::string& sql) { + PooledSession lease = getSession(); + try { + auto dataSet = lease->executeQueryStatement(sql); + return PooledSessionDataSet(std::move(lease), std::move(dataSet)); + } catch (const IoTDBConnectionException&) { + lease.markBroken(); + throw; + } +} + +PooledSessionDataSet SessionPool::executeQueryStatement(const std::string& sql, + int64_t timeoutInMs) { + PooledSession lease = getSession(); + try { + auto dataSet = lease->executeQueryStatement(sql, timeoutInMs); + return PooledSessionDataSet(std::move(lease), std::move(dataSet)); + } catch (const IoTDBConnectionException&) { + lease.markBroken(); + throw; + } +} + +void SessionPool::close() { + std::deque> toClose; + { + std::lock_guard lock(mutex_); + if (closed_) { + return; + } + closed_ = true; + toClose.swap(idleQueue_); + size_ -= toClose.size(); + } + cv_.notify_all(); + // Sessions destructed here (outside the lock) close their connections. + toClose.clear(); +} + +size_t SessionPool::activeCount() { + std::lock_guard lock(mutex_); + return size_ - idleQueue_.size(); +} diff --git a/iotdb-client/client-cpp/src/main/SessionPool.h b/iotdb-client/client-cpp/src/main/SessionPool.h new file mode 100644 index 0000000000000..a54135469c819 --- /dev/null +++ b/iotdb-client/client-cpp/src/main/SessionPool.h @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef IOTDB_SESSIONPOOL_H +#define IOTDB_SESSIONPOOL_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "Session.h" + +/* + * A thread-safe pool of opened Session objects. + * + * A Session is NOT safe to use from multiple threads concurrently. SessionPool + * solves this by lending each Session to exactly one borrower at a time and + * reclaiming it afterwards, so many application threads can share a bounded set + * of physical connections without external locking. + * + * Two usage styles are supported: + * + * 1. RAII lease (recommended for arbitrary calls): + * { + * PooledSession s = pool.getSession(); // blocks up to the timeout + * s->insertTablet(tablet); // call any Session method + * } // automatically returned here + * + * 2. Convenience wrappers / generic execute() (recommended for hot paths): + * pool.insertTablet(tablet); + * pool.execute([&](Session& s) { s.insertRecord(...); }); + * + * Both styles evict a Session from the pool (instead of recycling it) when the + * operation throws IoTDBConnectionException, so a dead connection is never + * handed to the next borrower; a fresh one is created lazily on demand. + * + * Lifetime: a PooledSession returns its Session to the owning SessionPool when + * destroyed, so every PooledSession must not outlive the pool it came from. + */ +class SessionPool; + +class PooledSession { +public: + PooledSession() noexcept : pool_(nullptr), session_(nullptr), broken_(false) {} + + PooledSession(SessionPool* pool, std::shared_ptr session) + : pool_(pool), session_(std::move(session)), broken_(false) {} + + // Non-copyable: a leased Session is owned by exactly one borrower. + PooledSession(const PooledSession&) = delete; + PooledSession& operator=(const PooledSession&) = delete; + + PooledSession(PooledSession&& other) noexcept + : pool_(other.pool_), session_(std::move(other.session_)), broken_(other.broken_) { + other.pool_ = nullptr; + other.session_ = nullptr; + other.broken_ = false; + } + + PooledSession& operator=(PooledSession&& other) noexcept { + if (this != &other) { + reset(); + pool_ = other.pool_; + session_ = std::move(other.session_); + broken_ = other.broken_; + other.pool_ = nullptr; + other.session_ = nullptr; + other.broken_ = false; + } + return *this; + } + + ~PooledSession() { + reset(); + } + + Session* operator->() const { + return session_.get(); + } + + Session& operator*() const { + return *session_; + } + + explicit operator bool() const { + return static_cast(session_); + } + + // Mark the underlying connection as unusable so it is discarded (not recycled) + // when this lease is returned. Call this if you caught a connection error. + void markBroken() { + broken_ = true; + } + + // Eagerly return the Session to the pool before scope exit. + void release() { + reset(); + } + +private: + void reset(); + + SessionPool* pool_; + std::shared_ptr session_; + bool broken_; +}; + +/* + * Couples a query result set with the pooled Session that produced it. + * + * A SessionDataSet lazily fetches further result blocks over its Session's + * connection, so that Session must stay exclusively leased until iteration is + * finished. This wrapper holds the lease for exactly that long and returns the + * Session to the pool when destroyed. + */ +class PooledSessionDataSet { +public: + PooledSessionDataSet(PooledSession session, std::unique_ptr dataSet) + : session_(std::move(session)), dataSet_(std::move(dataSet)) {} + + PooledSessionDataSet(const PooledSessionDataSet&) = delete; + PooledSessionDataSet& operator=(const PooledSessionDataSet&) = delete; + PooledSessionDataSet(PooledSessionDataSet&&) noexcept = default; + PooledSessionDataSet& operator=(PooledSessionDataSet&&) noexcept = default; + + SessionDataSet* operator->() const { + return dataSet_.get(); + } + SessionDataSet& operator*() const { + return *dataSet_; + } + +private: + PooledSession session_; + std::unique_ptr dataSet_; +}; + +class SessionPool { +public: + static constexpr size_t DEFAULT_MAX_SIZE = 5; + static constexpr int64_t DEFAULT_WAIT_TIMEOUT_MS = 60 * 1000; + + // Single-host constructor. + SessionPool(std::string host, int rpcPort, std::string username, std::string password, + size_t maxSize = DEFAULT_MAX_SIZE); + + // Multi-node constructor. + SessionPool(std::vector nodeUrls, std::string username, std::string password, + size_t maxSize = DEFAULT_MAX_SIZE); + + ~SessionPool(); + + // Non-copyable, non-movable: the pool owns mutex/condition state. + SessionPool(const SessionPool&) = delete; + SessionPool& operator=(const SessionPool&) = delete; + + // ---- configuration (apply before the first getSession()) ---- + SessionPool& setFetchSize(int fetchSize); + SessionPool& setZoneId(std::string zoneId); + SessionPool& setSqlDialect(std::string sqlDialect); + SessionPool& setDatabase(std::string database); + SessionPool& setEnableRedirection(bool enable); + SessionPool& setEnableAutoFetch(bool enable); + SessionPool& setEnableRPCCompression(bool enable); + SessionPool& setConnectTimeoutMs(int connectTimeoutMs); + SessionPool& setWaitToGetSessionTimeoutMs(int64_t timeoutMs); + SessionPool& setUseSSL(bool useSSL); + SessionPool& setTrustCertFilePath(std::string path); + + // Borrow a Session. Blocks until one is free or a new one can be created, + // up to timeoutMs (<= 0 means use the pool default). Throws IoTDBException on + // timeout or when the pool is closed. + PooledSession getSession(); + PooledSession getSession(int64_t timeoutMs); + + // Generic helper: borrow a Session, run func(Session&), return/evict it, and + // forward the result. Evicts the Session on IoTDBConnectionException. + template + auto execute(Func&& func) -> decltype(func(std::declval())); + + // ---- convenience wrappers for common operations (with eviction on failure) ---- + void insertTablet(Tablet& tablet, bool sorted = false); + void insertAlignedTablet(Tablet& tablet, bool sorted = false); + void insertTablets(std::unordered_map& tablets, bool sorted = false); + void insertRecord(const std::string& deviceId, int64_t time, + const std::vector& measurements, + const std::vector& values); + void insertRecords(const std::vector& deviceIds, + const std::vector& times, + const std::vector>& measurementsList, + const std::vector>& valuesList); + void executeNonQueryStatement(const std::string& sql); + // The returned wrapper keeps the underlying Session leased until it is + // destroyed, so it is safe to iterate the result set across multiple fetches. + PooledSessionDataSet executeQueryStatement(const std::string& sql); + PooledSessionDataSet executeQueryStatement(const std::string& sql, int64_t timeoutInMs); + + // Close the pool: idle Sessions are closed immediately, in-use Sessions are + // closed when they are returned. Idempotent. + void close(); + + // ---- observability ---- + size_t getMaxSize() const { + return maxSize_; + } + // Number of Sessions currently borrowed. + size_t activeCount(); + +private: + friend class PooledSession; + + std::shared_ptr constructNewSession(); + std::shared_ptr acquire(int64_t timeoutMs); + void putBack(const std::shared_ptr& session, bool broken); + + // connection parameters + std::string host_; + int rpcPort_; + std::vector nodeUrls_; + std::string username_; + std::string password_; + std::string zoneId_; + int fetchSize_ = AbstractSessionBuilder::DEFAULT_FETCH_SIZE; + std::string sqlDialect_ = AbstractSessionBuilder::DEFAULT_SQL_DIALECT; + std::string database_; + bool enableRedirection_ = AbstractSessionBuilder::DEFAULT_ENABLE_REDIRECTIONS; + bool enableAutoFetch_ = AbstractSessionBuilder::DEFAULT_ENABLE_AUTO_FETCH; + bool enableRPCCompression_ = AbstractSessionBuilder::DEFAULT_ENABLE_RPC_COMPRESSION; + int connectTimeoutMs_ = AbstractSessionBuilder::DEFAULT_CONNECT_TIMEOUT_MS; + bool useSSL_ = false; + std::string trustCertFilePath_; + + // pool sizing / waiting policy + size_t maxSize_; + int64_t waitTimeoutMs_ = DEFAULT_WAIT_TIMEOUT_MS; + + // pool state, guarded by mutex_ + std::mutex mutex_; + std::condition_variable cv_; + std::deque> idleQueue_; + size_t size_ = 0; // total live Sessions (idle + borrowed) + bool closed_ = false; +}; + +template +auto SessionPool::execute(Func&& func) -> decltype(func(std::declval())) { + PooledSession lease = getSession(); + try { + return func(*lease); + } catch (const IoTDBConnectionException&) { + lease.markBroken(); + throw; + } +} + +/* + * Fluent builder for SessionPool, mirroring SessionBuilder / TableSessionBuilder. + * + * auto pool = SessionPoolBuilder() + * .host("127.0.0.1")->rpcPort(6667) + * ->username("root")->password("root") + * ->maxSize(10)->build(); + */ +class SessionPoolBuilder : public AbstractSessionBuilder { +public: + SessionPoolBuilder* host(const std::string& v) { + AbstractSessionBuilder::host = v; + return this; + } + SessionPoolBuilder* rpcPort(int v) { + AbstractSessionBuilder::rpcPort = v; + return this; + } + SessionPoolBuilder* nodeUrls(const std::vector& v) { + AbstractSessionBuilder::nodeUrls = v; + return this; + } + SessionPoolBuilder* username(const std::string& v) { + AbstractSessionBuilder::username = v; + return this; + } + SessionPoolBuilder* password(const std::string& v) { + AbstractSessionBuilder::password = v; + return this; + } + SessionPoolBuilder* zoneId(const std::string& v) { + AbstractSessionBuilder::zoneId = v; + return this; + } + SessionPoolBuilder* fetchSize(int v) { + AbstractSessionBuilder::fetchSize = v; + return this; + } + SessionPoolBuilder* database(const std::string& v) { + AbstractSessionBuilder::database = v; + return this; + } + SessionPoolBuilder* enableAutoFetch(bool v) { + AbstractSessionBuilder::enableAutoFetch = v; + return this; + } + SessionPoolBuilder* enableRedirections(bool v) { + AbstractSessionBuilder::enableRedirections = v; + return this; + } + SessionPoolBuilder* enableRPCCompression(bool v) { + AbstractSessionBuilder::enableRPCCompression = v; + return this; + } + SessionPoolBuilder* connectTimeoutMs(int v) { + AbstractSessionBuilder::connectTimeoutMs = v; + return this; + } + SessionPoolBuilder* useSSL(bool v) { + AbstractSessionBuilder::useSSL = v; + return this; + } + SessionPoolBuilder* trustCertFilePath(const std::string& v) { + AbstractSessionBuilder::trustCertFilePath = v; + return this; + } + SessionPoolBuilder* maxSize(size_t v) { + maxSize_ = v; + return this; + } + SessionPoolBuilder* waitToGetSessionTimeoutMs(int64_t v) { + waitTimeoutMs_ = v; + return this; + } + SessionPoolBuilder* sqlDialect(const std::string& v) { + AbstractSessionBuilder::sqlDialect = v; + return this; + } + + std::shared_ptr build() { + if (!AbstractSessionBuilder::nodeUrls.empty() && + (AbstractSessionBuilder::host != DEFAULT_HOST || + AbstractSessionBuilder::rpcPort != DEFAULT_RPC_PORT)) { + throw IoTDBException( + "SessionPool builder does not support setting node urls and host/rpcPort at the same " + "time."); + } + std::shared_ptr pool; + if (!AbstractSessionBuilder::nodeUrls.empty()) { + pool = std::make_shared(AbstractSessionBuilder::nodeUrls, + AbstractSessionBuilder::username, + AbstractSessionBuilder::password, maxSize_); + } else { + pool = std::make_shared( + AbstractSessionBuilder::host, AbstractSessionBuilder::rpcPort, + AbstractSessionBuilder::username, AbstractSessionBuilder::password, maxSize_); + } + pool->setFetchSize(AbstractSessionBuilder::fetchSize) + .setZoneId(AbstractSessionBuilder::zoneId) + .setSqlDialect(AbstractSessionBuilder::sqlDialect) + .setDatabase(AbstractSessionBuilder::database) + .setEnableRedirection(AbstractSessionBuilder::enableRedirections) + .setEnableAutoFetch(AbstractSessionBuilder::enableAutoFetch) + .setEnableRPCCompression(AbstractSessionBuilder::enableRPCCompression) + .setConnectTimeoutMs(AbstractSessionBuilder::connectTimeoutMs) + .setWaitToGetSessionTimeoutMs(waitTimeoutMs_) + .setUseSSL(AbstractSessionBuilder::useSSL) + .setTrustCertFilePath(AbstractSessionBuilder::trustCertFilePath); + return pool; + } + +private: + size_t maxSize_ = SessionPool::DEFAULT_MAX_SIZE; + int64_t waitTimeoutMs_ = SessionPool::DEFAULT_WAIT_TIMEOUT_MS; +}; + +#endif // IOTDB_SESSIONPOOL_H diff --git a/iotdb-client/client-cpp/src/test/cpp/sessionIT.cpp b/iotdb-client/client-cpp/src/test/cpp/sessionIT.cpp index 0bf30bbbf1be9..673942099bac0 100644 --- a/iotdb-client/client-cpp/src/test/cpp/sessionIT.cpp +++ b/iotdb-client/client-cpp/src/test/cpp/sessionIT.cpp @@ -21,8 +21,12 @@ #include "Column.h" #include "Session.h" #include "SessionBuilder.h" +#include "SessionPool.h" #include "TsBlock.h" +#include +#include + using namespace std; extern std::shared_ptr session; @@ -898,4 +902,127 @@ TEST_CASE("Numeric column widening getters align with Java TsFile", "[column]") std::vector longValues = {1000}; auto longColumn = std::make_shared(0, 1, valueIsNull, longValues); REQUIRE(longColumn->getDouble(0) == Approx(1000.0)); -} \ No newline at end of file +} +TEST_CASE("SessionPool basic borrow/insert/query via RAII lease", "[sessionPool]") { + CaseReporter cr("SessionPool basic"); + auto pool = SessionPoolBuilder() + .host("127.0.0.1") + ->rpcPort(6667) + ->username("root") + ->password("root") + ->maxSize(3) + ->build(); + + { + PooledSession s = pool->getSession(); + s->executeNonQueryStatement("delete timeseries root.test.pool.d1.*"); + } + + const int rows = 50; + for (int i = 0; i < rows; i++) { + PooledSession s = pool->getSession(); + s->insertRecord("root.test.pool.d1", i, {"s1"}, {to_string(i)}); + } + + int count = 0; + { + PooledSessionDataSet ds = pool->executeQueryStatement("select s1 from root.test.pool.d1"); + while (ds->hasNext()) { + ds->next(); + count++; + } + } + REQUIRE(count == rows); + + { + PooledSession s = pool->getSession(); + s->executeNonQueryStatement("delete timeseries root.test.pool.d1.*"); + } + pool->close(); +} + +TEST_CASE("SessionPool is safe under concurrent writers", "[sessionPool]") { + CaseReporter cr("SessionPool concurrency"); + auto pool = SessionPoolBuilder() + .host("127.0.0.1") + ->rpcPort(6667) + ->username("root") + ->password("root") + ->maxSize(4) + ->build(); + + { + PooledSession s = pool->getSession(); + s->executeNonQueryStatement("delete timeseries root.test.pool.d2.*"); + } + + const int threadCount = 8; + const int rowsPerThread = 100; + std::atomic failures(0); + std::vector threads; + for (int t = 0; t < threadCount; t++) { + threads.emplace_back([&pool, t, rowsPerThread, &failures]() { + try { + for (int i = 0; i < rowsPerThread; i++) { + int64_t ts = static_cast(t) * rowsPerThread + i; + // Mix RAII and convenience APIs to exercise both borrow paths. + if (i % 2 == 0) { + pool->insertRecord("root.test.pool.d2", ts, {"s1"}, {to_string(ts)}); + } else { + PooledSession s = pool->getSession(); + s->insertRecord("root.test.pool.d2", ts, {"s1"}, {to_string(ts)}); + } + } + } catch (const std::exception& e) { + std::cerr << "writer thread failed: " << e.what() << std::endl; + failures++; + } + }); + } + for (auto& th : threads) { + th.join(); + } + REQUIRE(failures.load() == 0); + REQUIRE(pool->getMaxSize() == 4); + + int count = 0; + { + PooledSessionDataSet ds = pool->executeQueryStatement("select s1 from root.test.pool.d2"); + while (ds->hasNext()) { + ds->next(); + count++; + } + } + REQUIRE(count == threadCount * rowsPerThread); + + { + PooledSession s = pool->getSession(); + s->executeNonQueryStatement("delete timeseries root.test.pool.d2.*"); + } + pool->close(); +} + +TEST_CASE("SessionPool getSession times out when exhausted", "[sessionPool]") { + CaseReporter cr("SessionPool exhaustion timeout"); + auto pool = SessionPoolBuilder() + .host("127.0.0.1") + ->rpcPort(6667) + ->username("root") + ->password("root") + ->maxSize(1) + ->waitToGetSessionTimeoutMs(200) + ->build(); + + PooledSession held = pool->getSession(); + REQUIRE(static_cast(held)); + REQUIRE(pool->activeCount() == 1); + // The only Session is checked out, so a second borrow must time out. + REQUIRE_THROWS_AS(pool->getSession(), IoTDBException); + + held.release(); + // After returning it, a borrow succeeds again. + PooledSession reused = pool->getSession(); + REQUIRE(static_cast(reused)); + reused.release(); + pool->close(); +} From 8ca5a42f4d8bb3ee4bce4937c183656e13cbe5c2 Mon Sep 17 00:00:00 2001 From: spricoder Date: Mon, 1 Jun 2026 14:05:04 +0800 Subject: [PATCH 5/9] Reject zero maxSize in SessionPool instead of clamping to 1 Address review feedback: maxSize is size_t, so a non-positive check reduces to == 0 (and "<= 0" would be a tautological-comparison warning under -Wall). Rather than silently clamping an invalid 0 to 1, fail fast by throwing IoTDBException so the misuse surfaces at construction time. --- iotdb-client/client-cpp/src/main/SessionPool.cpp | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/iotdb-client/client-cpp/src/main/SessionPool.cpp b/iotdb-client/client-cpp/src/main/SessionPool.cpp index 681078a41cb0d..9f1591a816e83 100644 --- a/iotdb-client/client-cpp/src/main/SessionPool.cpp +++ b/iotdb-client/client-cpp/src/main/SessionPool.cpp @@ -31,13 +31,20 @@ void PooledSession::reset() { SessionPool::SessionPool(std::string host, int rpcPort, std::string username, std::string password, size_t maxSize) : host_(std::move(host)), rpcPort_(rpcPort), username_(std::move(username)), - password_(std::move(password)), maxSize_(maxSize == 0 ? 1 : maxSize) {} + password_(std::move(password)), maxSize_(maxSize) { + if (maxSize_ == 0) { + throw IoTDBException("SessionPool maxSize must be greater than 0."); + } +} SessionPool::SessionPool(std::vector nodeUrls, std::string username, std::string password, size_t maxSize) : rpcPort_(AbstractSessionBuilder::DEFAULT_RPC_PORT), nodeUrls_(std::move(nodeUrls)), - username_(std::move(username)), password_(std::move(password)), - maxSize_(maxSize == 0 ? 1 : maxSize) {} + username_(std::move(username)), password_(std::move(password)), maxSize_(maxSize) { + if (maxSize_ == 0) { + throw IoTDBException("SessionPool maxSize must be greater than 0."); + } +} SessionPool::~SessionPool() { try { From b066827e8d75e875ffd0315c60cd2b7f1011b4b2 Mon Sep 17 00:00:00 2001 From: spricoder Date: Mon, 1 Jun 2026 14:16:46 +0800 Subject: [PATCH 6/9] Tolerate missing timeseries in SessionPool test cleanup The pre-test cleanup deleted root.test.pool.* timeseries unconditionally, which threw 508 (does not exist) on a fresh database and failed the new [sessionPool] cases. Ignore that error since the cleanup is best-effort. --- iotdb-client/client-cpp/src/test/cpp/sessionIT.cpp | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/iotdb-client/client-cpp/src/test/cpp/sessionIT.cpp b/iotdb-client/client-cpp/src/test/cpp/sessionIT.cpp index 673942099bac0..ff850d943f1b9 100644 --- a/iotdb-client/client-cpp/src/test/cpp/sessionIT.cpp +++ b/iotdb-client/client-cpp/src/test/cpp/sessionIT.cpp @@ -915,7 +915,11 @@ TEST_CASE("SessionPool basic borrow/insert/query via RAII lease", "[sessionPool] { PooledSession s = pool->getSession(); - s->executeNonQueryStatement("delete timeseries root.test.pool.d1.*"); + try { + s->executeNonQueryStatement("delete timeseries root.test.pool.d1.*"); + } catch (const std::exception&) { + // Ignore: the timeseries may not exist yet on a fresh database. + } } const int rows = 50; @@ -953,7 +957,11 @@ TEST_CASE("SessionPool is safe under concurrent writers", "[sessionPool]") { { PooledSession s = pool->getSession(); - s->executeNonQueryStatement("delete timeseries root.test.pool.d2.*"); + try { + s->executeNonQueryStatement("delete timeseries root.test.pool.d2.*"); + } catch (const std::exception&) { + // Ignore: the timeseries may not exist yet on a fresh database. + } } const int threadCount = 8; From bc155cc33173e072a609bd4433e7aff334e198d2 Mon Sep 17 00:00:00 2001 From: spricoder Date: Mon, 1 Jun 2026 14:17:55 +0800 Subject: [PATCH 7/9] Revert "Wire RPC compression flag through Session to its connections" This reverts commit 2f35cc51f7. Honoring the compression flag makes the client negotiate the compact Thrift protocol, which the binary-only IoTDB server used by the C++ integration tests cannot speak, breaking the pre-existing ts_session_open_with_compression smoke test (it had only passed because the flag was a no-op). Compression needs a compact-protocol-enabled test server, so it will be reintroduced in a dedicated PR with the matching server-side test support. SessionPool keeps its compression option for forward compatibility; it is currently a no-op, as the rest of the client has always been. --- iotdb-client/client-cpp/src/main/Session.cpp | 14 ++------------ iotdb-client/client-cpp/src/main/Session.h | 2 -- .../client-cpp/src/main/SessionConnection.cpp | 3 --- 3 files changed, 2 insertions(+), 17 deletions(-) diff --git a/iotdb-client/client-cpp/src/main/Session.cpp b/iotdb-client/client-cpp/src/main/Session.cpp index 6227bca416108..a2b07ed7afb93 100644 --- a/iotdb-client/client-cpp/src/main/Session.cpp +++ b/iotdb-client/client-cpp/src/main/Session.cpp @@ -768,14 +768,8 @@ void Session::initNodesSupplier(const std::vector& nodeUrls) { } if (enableAutoFetch_) { - // Propagate the compression setting to the node-discovery client too, so the - // background "show cluster" refresh uses the same protocol as data sessions. - // Other parameters keep their library defaults to preserve existing behavior. - nodesSupplier_ = NodesSupplier::create( - endPoints, username_, password_, useSSL_, trustCertFilePath_, /*zoneId=*/"", - ThriftConnection::THRIFT_DEFAULT_BUFFER_SIZE, ThriftConnection::THRIFT_MAX_FRAME_SIZE, - ThriftConnection::CONNECTION_TIMEOUT_IN_MS, enableRPCCompression_, - getVersionString(version)); + nodesSupplier_ = + NodesSupplier::create(endPoints, username_, password_, useSSL_, trustCertFilePath_); } else { nodesSupplier_ = make_shared(endPoints); } @@ -950,10 +944,6 @@ void Session::open(bool enableRPCCompression, int connectionTimeoutInMs) { return; } - // Honor compression requested either via the builder or this open() call, so - // the negotiated protocol (compact vs binary) actually reflects the setting. - enableRPCCompression_ = enableRPCCompression_ || enableRPCCompression; - try { initDefaultSessionConnection(); } catch (const exception& e) { diff --git a/iotdb-client/client-cpp/src/main/Session.h b/iotdb-client/client-cpp/src/main/Session.h index 3f32b6c67d69d..58bb0a7d30028 100644 --- a/iotdb-client/client-cpp/src/main/Session.h +++ b/iotdb-client/client-cpp/src/main/Session.h @@ -597,7 +597,6 @@ class Session { std::string database_; bool enableAutoFetch_ = true; bool enableRedirection_ = true; - bool enableRPCCompression_ = false; std::shared_ptr nodesSupplier_; friend class SessionConnection; friend class TableSession; @@ -775,7 +774,6 @@ class Session { this->database_ = builder->database; this->enableAutoFetch_ = builder->enableAutoFetch; this->enableRedirection_ = builder->enableRedirections; - this->enableRPCCompression_ = builder->enableRPCCompression; this->connectTimeoutMs_ = builder->connectTimeoutMs; this->nodeUrls_ = builder->nodeUrls; this->useSSL_ = builder->useSSL; diff --git a/iotdb-client/client-cpp/src/main/SessionConnection.cpp b/iotdb-client/client-cpp/src/main/SessionConnection.cpp index 1746d2049cb72..a7fa3beb6ffc4 100644 --- a/iotdb-client/client-cpp/src/main/SessionConnection.cpp +++ b/iotdb-client/client-cpp/src/main/SessionConnection.cpp @@ -39,9 +39,6 @@ SessionConnection::SessionConnection(Session* session_ptr, const TEndPoint& endp retryIntervalMs(retryInterval), connectionTimeoutInMs(connectionTimeout), sqlDialect(std::move(dialect)), database(std::move(db)) { this->zoneId = zoneId.empty() ? getSystemDefaultZoneId() : zoneId; - // Inherit the compression setting negotiated by the owning Session so the - // chosen Thrift protocol (compact when compression is on) is consistent. - this->enableRPCCompression = session->enableRPCCompression_; endPointList.push_back(endpoint); init(endPoint, session->useSSL_, session->trustCertFilePath_); } From 7036d7d637ffe101776afdeecb4b78549cddc894 Mon Sep 17 00:00:00 2001 From: spricoder Date: Mon, 1 Jun 2026 14:31:01 +0800 Subject: [PATCH 8/9] fix format --- iotdb-client/client-cpp/src/main/SessionPool.cpp | 6 +++--- iotdb-client/client-cpp/src/main/SessionPool.h | 6 ++---- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/iotdb-client/client-cpp/src/main/SessionPool.cpp b/iotdb-client/client-cpp/src/main/SessionPool.cpp index 9f1591a816e83..cdc99a245f9ef 100644 --- a/iotdb-client/client-cpp/src/main/SessionPool.cpp +++ b/iotdb-client/client-cpp/src/main/SessionPool.cpp @@ -170,9 +170,9 @@ std::shared_ptr SessionPool::acquire(int64_t timeoutMs) { } else { if (cv_.wait_until(lock, deadline) == std::cv_status::timeout && idleQueue_.empty() && size_ >= maxSize_ && !closed_) { - throw IoTDBException("Wait to get session timeout in SessionPool, maxSize=" + - std::to_string(maxSize_) + ", waitTimeoutMs=" + - std::to_string(effectiveTimeout) + "."); + throw IoTDBException( + "Wait to get session timeout in SessionPool, maxSize=" + std::to_string(maxSize_) + + ", waitTimeoutMs=" + std::to_string(effectiveTimeout) + "."); } } } diff --git a/iotdb-client/client-cpp/src/main/SessionPool.h b/iotdb-client/client-cpp/src/main/SessionPool.h index a54135469c819..4483dab0c5144 100644 --- a/iotdb-client/client-cpp/src/main/SessionPool.h +++ b/iotdb-client/client-cpp/src/main/SessionPool.h @@ -197,8 +197,7 @@ class SessionPool { // Generic helper: borrow a Session, run func(Session&), return/evict it, and // forward the result. Evicts the Session on IoTDBConnectionException. - template - auto execute(Func&& func) -> decltype(func(std::declval())); + template auto execute(Func&& func) -> decltype(func(std::declval())); // ---- convenience wrappers for common operations (with eviction on failure) ---- void insertTablet(Tablet& tablet, bool sorted = false); @@ -207,8 +206,7 @@ class SessionPool { void insertRecord(const std::string& deviceId, int64_t time, const std::vector& measurements, const std::vector& values); - void insertRecords(const std::vector& deviceIds, - const std::vector& times, + void insertRecords(const std::vector& deviceIds, const std::vector& times, const std::vector>& measurementsList, const std::vector>& valuesList); void executeNonQueryStatement(const std::string& sql); From 7d5fa7ed0c037e463212643e5594c2fb3a85ad2f Mon Sep 17 00:00:00 2001 From: spricoder Date: Mon, 1 Jun 2026 16:49:22 +0800 Subject: [PATCH 9/9] Discard SessionPool session if pool closed during construction Address review feedback: acquire() releases the lock while building a new connection, so a concurrent close() could set closed_ after the slot was reserved, and the freshly opened session would still be handed out from a closed pool. Re-check closed_ under the lock after construction; if the pool was closed meanwhile, release the slot, tear the session down outside the lock, and throw instead of returning it. --- iotdb-client/client-cpp/src/main/SessionPool.cpp | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/iotdb-client/client-cpp/src/main/SessionPool.cpp b/iotdb-client/client-cpp/src/main/SessionPool.cpp index cdc99a245f9ef..a828f0ac2c6da 100644 --- a/iotdb-client/client-cpp/src/main/SessionPool.cpp +++ b/iotdb-client/client-cpp/src/main/SessionPool.cpp @@ -161,6 +161,14 @@ std::shared_ptr SessionPool::acquire(int64_t timeoutMs) { cv_.notify_one(); throw; } + lock.lock(); + if (closed_) { + // The pool was closed while this session was being built; do not hand it + // out. Release its slot and let it be torn down outside the lock. + --size_; + lock.unlock(); + throw IoTDBException("SessionPool is closed."); + } return session; }