diff --git a/iotdb-client/client-cpp/src/main/Common.cpp b/iotdb-client/client-cpp/src/main/Common.cpp index 38b913c2270c..842150241da8 100644 --- a/iotdb-client/client-cpp/src/main/Common.cpp +++ b/iotdb-client/client-cpp/src/main/Common.cpp @@ -418,7 +418,7 @@ const char* MyStringBuffer::getOrderedByte(size_t len) { void MyStringBuffer::putOrderedByte(char* buf, int len) { if (isBigEndian) { - str.assign(buf, len); + str.append(buf, len); } else { for (int i = len - 1; i > -1; i--) { str += buf[i]; diff --git a/iotdb-client/client-cpp/src/main/Session.h b/iotdb-client/client-cpp/src/main/Session.h index cedfaeba1966..58bb0a7d3002 100644 --- a/iotdb-client/client-cpp/src/main/Session.h +++ b/iotdb-client/client-cpp/src/main/Session.h @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -258,16 +259,17 @@ class Tablet { template void addValue(size_t schemaId, size_t rowIndex, const T& value) { if (schemaId >= schemas.size()) { char tmpStr[100]; - sprintf(tmpStr, - "Tablet::addValue(), schemaId >= schemas.size(). schemaId=%ld, schemas.size()=%ld.", - schemaId, schemas.size()); + snprintf(tmpStr, sizeof(tmpStr), + "Tablet::addValue(), schemaId >= schemas.size(). schemaId=%ld, schemas.size()=%ld.", + (long)schemaId, (long)schemas.size()); throw std::out_of_range(tmpStr); } if (rowIndex >= rowSize) { char tmpStr[100]; - sprintf(tmpStr, "Tablet::addValue(), rowIndex >= rowSize. rowIndex=%ld, rowSize.size()=%ld.", - rowIndex, rowSize); + snprintf(tmpStr, sizeof(tmpStr), + "Tablet::addValue(), rowIndex >= rowSize. rowIndex=%ld, rowSize.size()=%ld.", + (long)rowIndex, (long)rowSize); throw std::out_of_range(tmpStr); } @@ -317,19 +319,19 @@ class Tablet { // Check schemaId bounds if (schemaId >= schemas.size()) { char tmpStr[100]; - sprintf(tmpStr, - "Tablet::addBinaryValueWithMeta(), schemaId >= schemas.size(). schemaId=%ld, " - "schemas.size()=%ld.", - schemaId, schemas.size()); + snprintf(tmpStr, sizeof(tmpStr), + "Tablet::addBinaryValueWithMeta(), schemaId >= schemas.size(). schemaId=%ld, " + "schemas.size()=%ld.", + (long)schemaId, (long)schemas.size()); throw std::out_of_range(tmpStr); } // Check rowIndex bounds if (rowIndex >= rowSize) { char tmpStr[100]; - sprintf(tmpStr, - "Tablet::addBinaryValueWithMeta(), rowIndex >= rowSize. rowIndex=%ld, rowSize=%ld.", - rowIndex, rowSize); + snprintf(tmpStr, sizeof(tmpStr), + "Tablet::addBinaryValueWithMeta(), rowIndex >= rowSize. rowIndex=%ld, rowSize=%ld.", + (long)rowIndex, (long)rowSize); throw std::out_of_range(tmpStr); } diff --git a/iotdb-client/client-cpp/src/main/SessionPool.cpp b/iotdb-client/client-cpp/src/main/SessionPool.cpp new file mode 100644 index 000000000000..a828f0ac2c6d --- /dev/null +++ b/iotdb-client/client-cpp/src/main/SessionPool.cpp @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "SessionPool.h" + +void PooledSession::reset() { + if (session_ && pool_ != nullptr) { + pool_->putBack(session_, broken_); + } + pool_ = nullptr; + session_ = nullptr; + broken_ = false; +} + +SessionPool::SessionPool(std::string host, int rpcPort, std::string username, std::string password, + size_t maxSize) + : host_(std::move(host)), rpcPort_(rpcPort), username_(std::move(username)), + password_(std::move(password)), maxSize_(maxSize) { + if (maxSize_ == 0) { + throw IoTDBException("SessionPool maxSize must be greater than 0."); + } +} + +SessionPool::SessionPool(std::vector nodeUrls, std::string username, + std::string password, size_t maxSize) + : rpcPort_(AbstractSessionBuilder::DEFAULT_RPC_PORT), nodeUrls_(std::move(nodeUrls)), + username_(std::move(username)), password_(std::move(password)), maxSize_(maxSize) { + if (maxSize_ == 0) { + throw IoTDBException("SessionPool maxSize must be greater than 0."); + } +} + +SessionPool::~SessionPool() { + try { + close(); + } catch (const std::exception& e) { + log_debug(std::string("SessionPool::~SessionPool(), ") + e.what()); + } +} + +SessionPool& SessionPool::setFetchSize(int fetchSize) { + fetchSize_ = fetchSize; + return *this; +} + +SessionPool& SessionPool::setZoneId(std::string zoneId) { + zoneId_ = std::move(zoneId); + return *this; +} + +SessionPool& SessionPool::setSqlDialect(std::string sqlDialect) { + sqlDialect_ = std::move(sqlDialect); + return *this; +} + +SessionPool& SessionPool::setDatabase(std::string database) { + database_ = std::move(database); + return *this; +} + +SessionPool& SessionPool::setEnableRedirection(bool enable) { + enableRedirection_ = enable; + return *this; +} + +SessionPool& SessionPool::setEnableAutoFetch(bool enable) { + enableAutoFetch_ = enable; + return *this; +} + +SessionPool& SessionPool::setEnableRPCCompression(bool enable) { + enableRPCCompression_ = enable; + return *this; +} + +SessionPool& SessionPool::setConnectTimeoutMs(int connectTimeoutMs) { + connectTimeoutMs_ = connectTimeoutMs; + return *this; +} + +SessionPool& SessionPool::setWaitToGetSessionTimeoutMs(int64_t timeoutMs) { + waitTimeoutMs_ = timeoutMs; + return *this; +} + +SessionPool& SessionPool::setUseSSL(bool useSSL) { + useSSL_ = useSSL; + return *this; +} + +SessionPool& SessionPool::setTrustCertFilePath(std::string path) { + trustCertFilePath_ = std::move(path); + return *this; +} + +std::shared_ptr SessionPool::constructNewSession() { + AbstractSessionBuilder builder; + builder.host = host_; + builder.rpcPort = rpcPort_; + builder.nodeUrls = nodeUrls_; + builder.username = username_; + builder.password = password_; + builder.zoneId = zoneId_; + builder.fetchSize = fetchSize_; + builder.sqlDialect = sqlDialect_; + builder.database = database_; + builder.enableAutoFetch = enableAutoFetch_; + builder.enableRedirections = enableRedirection_; + builder.enableRPCCompression = enableRPCCompression_; + builder.connectTimeoutMs = connectTimeoutMs_; + builder.useSSL = useSSL_; + builder.trustCertFilePath = trustCertFilePath_; + + auto session = std::make_shared(&builder); + session->open(enableRPCCompression_, connectTimeoutMs_); + return session; +} + +std::shared_ptr SessionPool::acquire(int64_t timeoutMs) { + const int64_t effectiveTimeout = timeoutMs > 0 ? timeoutMs : waitTimeoutMs_; + std::unique_lock lock(mutex_); + const auto deadline = + std::chrono::steady_clock::now() + std::chrono::milliseconds(effectiveTimeout); + + while (true) { + if (closed_) { + throw IoTDBException("SessionPool is closed."); + } + if (!idleQueue_.empty()) { + auto session = idleQueue_.front(); + idleQueue_.pop_front(); + return session; + } + if (size_ < maxSize_) { + // Reserve a slot, then build the connection outside the lock so other + // borrowers are not blocked by network/handshake latency. + ++size_; + lock.unlock(); + std::shared_ptr session; + try { + session = constructNewSession(); + } catch (...) { + lock.lock(); + --size_; + cv_.notify_one(); + throw; + } + lock.lock(); + if (closed_) { + // The pool was closed while this session was being built; do not hand it + // out. Release its slot and let it be torn down outside the lock. + --size_; + lock.unlock(); + throw IoTDBException("SessionPool is closed."); + } + return session; + } + + // Pool exhausted: wait for a Session to be returned. + if (effectiveTimeout <= 0) { + cv_.wait(lock); + } else { + if (cv_.wait_until(lock, deadline) == std::cv_status::timeout && idleQueue_.empty() && + size_ >= maxSize_ && !closed_) { + throw IoTDBException( + "Wait to get session timeout in SessionPool, maxSize=" + std::to_string(maxSize_) + + ", waitTimeoutMs=" + std::to_string(effectiveTimeout) + "."); + } + } + } +} + +void SessionPool::putBack(const std::shared_ptr& session, bool broken) { + std::lock_guard lock(mutex_); + if (broken || closed_) { + // Drop the Session and free its slot so a healthy replacement can be created + // on demand. The caller (PooledSession::reset) still holds the last reference + // and tears the connection down after we return, i.e. outside this lock. + --size_; + } else { + idleQueue_.push_back(session); + } + cv_.notify_one(); +} + +PooledSession SessionPool::getSession() { + return getSession(waitTimeoutMs_); +} + +PooledSession SessionPool::getSession(int64_t timeoutMs) { + return PooledSession(this, acquire(timeoutMs)); +} + +void SessionPool::insertTablet(Tablet& tablet, bool sorted) { + execute([&](Session& s) { s.insertTablet(tablet, sorted); }); +} + +void SessionPool::insertAlignedTablet(Tablet& tablet, bool sorted) { + execute([&](Session& s) { s.insertAlignedTablet(tablet, sorted); }); +} + +void SessionPool::insertTablets(std::unordered_map& tablets, bool sorted) { + execute([&](Session& s) { s.insertTablets(tablets, sorted); }); +} + +void SessionPool::insertRecord(const std::string& deviceId, int64_t time, + const std::vector& measurements, + const std::vector& values) { + execute([&](Session& s) { s.insertRecord(deviceId, time, measurements, values); }); +} + +void SessionPool::insertRecords(const std::vector& deviceIds, + const std::vector& times, + const std::vector>& measurementsList, + const std::vector>& valuesList) { + execute([&](Session& s) { s.insertRecords(deviceIds, times, measurementsList, valuesList); }); +} + +void SessionPool::executeNonQueryStatement(const std::string& sql) { + execute([&](Session& s) { s.executeNonQueryStatement(sql); }); +} + +PooledSessionDataSet SessionPool::executeQueryStatement(const std::string& sql) { + PooledSession lease = getSession(); + try { + auto dataSet = lease->executeQueryStatement(sql); + return PooledSessionDataSet(std::move(lease), std::move(dataSet)); + } catch (const IoTDBConnectionException&) { + lease.markBroken(); + throw; + } +} + +PooledSessionDataSet SessionPool::executeQueryStatement(const std::string& sql, + int64_t timeoutInMs) { + PooledSession lease = getSession(); + try { + auto dataSet = lease->executeQueryStatement(sql, timeoutInMs); + return PooledSessionDataSet(std::move(lease), std::move(dataSet)); + } catch (const IoTDBConnectionException&) { + lease.markBroken(); + throw; + } +} + +void SessionPool::close() { + std::deque> toClose; + { + std::lock_guard lock(mutex_); + if (closed_) { + return; + } + closed_ = true; + toClose.swap(idleQueue_); + size_ -= toClose.size(); + } + cv_.notify_all(); + // Sessions destructed here (outside the lock) close their connections. + toClose.clear(); +} + +size_t SessionPool::activeCount() { + std::lock_guard lock(mutex_); + return size_ - idleQueue_.size(); +} diff --git a/iotdb-client/client-cpp/src/main/SessionPool.h b/iotdb-client/client-cpp/src/main/SessionPool.h new file mode 100644 index 000000000000..4483dab0c514 --- /dev/null +++ b/iotdb-client/client-cpp/src/main/SessionPool.h @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef IOTDB_SESSIONPOOL_H +#define IOTDB_SESSIONPOOL_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "Session.h" + +/* + * A thread-safe pool of opened Session objects. + * + * A Session is NOT safe to use from multiple threads concurrently. SessionPool + * solves this by lending each Session to exactly one borrower at a time and + * reclaiming it afterwards, so many application threads can share a bounded set + * of physical connections without external locking. + * + * Two usage styles are supported: + * + * 1. RAII lease (recommended for arbitrary calls): + * { + * PooledSession s = pool.getSession(); // blocks up to the timeout + * s->insertTablet(tablet); // call any Session method + * } // automatically returned here + * + * 2. Convenience wrappers / generic execute() (recommended for hot paths): + * pool.insertTablet(tablet); + * pool.execute([&](Session& s) { s.insertRecord(...); }); + * + * Both styles evict a Session from the pool (instead of recycling it) when the + * operation throws IoTDBConnectionException, so a dead connection is never + * handed to the next borrower; a fresh one is created lazily on demand. + * + * Lifetime: a PooledSession returns its Session to the owning SessionPool when + * destroyed, so every PooledSession must not outlive the pool it came from. + */ +class SessionPool; + +class PooledSession { +public: + PooledSession() noexcept : pool_(nullptr), session_(nullptr), broken_(false) {} + + PooledSession(SessionPool* pool, std::shared_ptr session) + : pool_(pool), session_(std::move(session)), broken_(false) {} + + // Non-copyable: a leased Session is owned by exactly one borrower. + PooledSession(const PooledSession&) = delete; + PooledSession& operator=(const PooledSession&) = delete; + + PooledSession(PooledSession&& other) noexcept + : pool_(other.pool_), session_(std::move(other.session_)), broken_(other.broken_) { + other.pool_ = nullptr; + other.session_ = nullptr; + other.broken_ = false; + } + + PooledSession& operator=(PooledSession&& other) noexcept { + if (this != &other) { + reset(); + pool_ = other.pool_; + session_ = std::move(other.session_); + broken_ = other.broken_; + other.pool_ = nullptr; + other.session_ = nullptr; + other.broken_ = false; + } + return *this; + } + + ~PooledSession() { + reset(); + } + + Session* operator->() const { + return session_.get(); + } + + Session& operator*() const { + return *session_; + } + + explicit operator bool() const { + return static_cast(session_); + } + + // Mark the underlying connection as unusable so it is discarded (not recycled) + // when this lease is returned. Call this if you caught a connection error. + void markBroken() { + broken_ = true; + } + + // Eagerly return the Session to the pool before scope exit. + void release() { + reset(); + } + +private: + void reset(); + + SessionPool* pool_; + std::shared_ptr session_; + bool broken_; +}; + +/* + * Couples a query result set with the pooled Session that produced it. + * + * A SessionDataSet lazily fetches further result blocks over its Session's + * connection, so that Session must stay exclusively leased until iteration is + * finished. This wrapper holds the lease for exactly that long and returns the + * Session to the pool when destroyed. + */ +class PooledSessionDataSet { +public: + PooledSessionDataSet(PooledSession session, std::unique_ptr dataSet) + : session_(std::move(session)), dataSet_(std::move(dataSet)) {} + + PooledSessionDataSet(const PooledSessionDataSet&) = delete; + PooledSessionDataSet& operator=(const PooledSessionDataSet&) = delete; + PooledSessionDataSet(PooledSessionDataSet&&) noexcept = default; + PooledSessionDataSet& operator=(PooledSessionDataSet&&) noexcept = default; + + SessionDataSet* operator->() const { + return dataSet_.get(); + } + SessionDataSet& operator*() const { + return *dataSet_; + } + +private: + PooledSession session_; + std::unique_ptr dataSet_; +}; + +class SessionPool { +public: + static constexpr size_t DEFAULT_MAX_SIZE = 5; + static constexpr int64_t DEFAULT_WAIT_TIMEOUT_MS = 60 * 1000; + + // Single-host constructor. + SessionPool(std::string host, int rpcPort, std::string username, std::string password, + size_t maxSize = DEFAULT_MAX_SIZE); + + // Multi-node constructor. + SessionPool(std::vector nodeUrls, std::string username, std::string password, + size_t maxSize = DEFAULT_MAX_SIZE); + + ~SessionPool(); + + // Non-copyable, non-movable: the pool owns mutex/condition state. + SessionPool(const SessionPool&) = delete; + SessionPool& operator=(const SessionPool&) = delete; + + // ---- configuration (apply before the first getSession()) ---- + SessionPool& setFetchSize(int fetchSize); + SessionPool& setZoneId(std::string zoneId); + SessionPool& setSqlDialect(std::string sqlDialect); + SessionPool& setDatabase(std::string database); + SessionPool& setEnableRedirection(bool enable); + SessionPool& setEnableAutoFetch(bool enable); + SessionPool& setEnableRPCCompression(bool enable); + SessionPool& setConnectTimeoutMs(int connectTimeoutMs); + SessionPool& setWaitToGetSessionTimeoutMs(int64_t timeoutMs); + SessionPool& setUseSSL(bool useSSL); + SessionPool& setTrustCertFilePath(std::string path); + + // Borrow a Session. Blocks until one is free or a new one can be created, + // up to timeoutMs (<= 0 means use the pool default). Throws IoTDBException on + // timeout or when the pool is closed. + PooledSession getSession(); + PooledSession getSession(int64_t timeoutMs); + + // Generic helper: borrow a Session, run func(Session&), return/evict it, and + // forward the result. Evicts the Session on IoTDBConnectionException. + template auto execute(Func&& func) -> decltype(func(std::declval())); + + // ---- convenience wrappers for common operations (with eviction on failure) ---- + void insertTablet(Tablet& tablet, bool sorted = false); + void insertAlignedTablet(Tablet& tablet, bool sorted = false); + void insertTablets(std::unordered_map& tablets, bool sorted = false); + void insertRecord(const std::string& deviceId, int64_t time, + const std::vector& measurements, + const std::vector& values); + void insertRecords(const std::vector& deviceIds, const std::vector& times, + const std::vector>& measurementsList, + const std::vector>& valuesList); + void executeNonQueryStatement(const std::string& sql); + // The returned wrapper keeps the underlying Session leased until it is + // destroyed, so it is safe to iterate the result set across multiple fetches. + PooledSessionDataSet executeQueryStatement(const std::string& sql); + PooledSessionDataSet executeQueryStatement(const std::string& sql, int64_t timeoutInMs); + + // Close the pool: idle Sessions are closed immediately, in-use Sessions are + // closed when they are returned. Idempotent. + void close(); + + // ---- observability ---- + size_t getMaxSize() const { + return maxSize_; + } + // Number of Sessions currently borrowed. + size_t activeCount(); + +private: + friend class PooledSession; + + std::shared_ptr constructNewSession(); + std::shared_ptr acquire(int64_t timeoutMs); + void putBack(const std::shared_ptr& session, bool broken); + + // connection parameters + std::string host_; + int rpcPort_; + std::vector nodeUrls_; + std::string username_; + std::string password_; + std::string zoneId_; + int fetchSize_ = AbstractSessionBuilder::DEFAULT_FETCH_SIZE; + std::string sqlDialect_ = AbstractSessionBuilder::DEFAULT_SQL_DIALECT; + std::string database_; + bool enableRedirection_ = AbstractSessionBuilder::DEFAULT_ENABLE_REDIRECTIONS; + bool enableAutoFetch_ = AbstractSessionBuilder::DEFAULT_ENABLE_AUTO_FETCH; + bool enableRPCCompression_ = AbstractSessionBuilder::DEFAULT_ENABLE_RPC_COMPRESSION; + int connectTimeoutMs_ = AbstractSessionBuilder::DEFAULT_CONNECT_TIMEOUT_MS; + bool useSSL_ = false; + std::string trustCertFilePath_; + + // pool sizing / waiting policy + size_t maxSize_; + int64_t waitTimeoutMs_ = DEFAULT_WAIT_TIMEOUT_MS; + + // pool state, guarded by mutex_ + std::mutex mutex_; + std::condition_variable cv_; + std::deque> idleQueue_; + size_t size_ = 0; // total live Sessions (idle + borrowed) + bool closed_ = false; +}; + +template +auto SessionPool::execute(Func&& func) -> decltype(func(std::declval())) { + PooledSession lease = getSession(); + try { + return func(*lease); + } catch (const IoTDBConnectionException&) { + lease.markBroken(); + throw; + } +} + +/* + * Fluent builder for SessionPool, mirroring SessionBuilder / TableSessionBuilder. + * + * auto pool = SessionPoolBuilder() + * .host("127.0.0.1")->rpcPort(6667) + * ->username("root")->password("root") + * ->maxSize(10)->build(); + */ +class SessionPoolBuilder : public AbstractSessionBuilder { +public: + SessionPoolBuilder* host(const std::string& v) { + AbstractSessionBuilder::host = v; + return this; + } + SessionPoolBuilder* rpcPort(int v) { + AbstractSessionBuilder::rpcPort = v; + return this; + } + SessionPoolBuilder* nodeUrls(const std::vector& v) { + AbstractSessionBuilder::nodeUrls = v; + return this; + } + SessionPoolBuilder* username(const std::string& v) { + AbstractSessionBuilder::username = v; + return this; + } + SessionPoolBuilder* password(const std::string& v) { + AbstractSessionBuilder::password = v; + return this; + } + SessionPoolBuilder* zoneId(const std::string& v) { + AbstractSessionBuilder::zoneId = v; + return this; + } + SessionPoolBuilder* fetchSize(int v) { + AbstractSessionBuilder::fetchSize = v; + return this; + } + SessionPoolBuilder* database(const std::string& v) { + AbstractSessionBuilder::database = v; + return this; + } + SessionPoolBuilder* enableAutoFetch(bool v) { + AbstractSessionBuilder::enableAutoFetch = v; + return this; + } + SessionPoolBuilder* enableRedirections(bool v) { + AbstractSessionBuilder::enableRedirections = v; + return this; + } + SessionPoolBuilder* enableRPCCompression(bool v) { + AbstractSessionBuilder::enableRPCCompression = v; + return this; + } + SessionPoolBuilder* connectTimeoutMs(int v) { + AbstractSessionBuilder::connectTimeoutMs = v; + return this; + } + SessionPoolBuilder* useSSL(bool v) { + AbstractSessionBuilder::useSSL = v; + return this; + } + SessionPoolBuilder* trustCertFilePath(const std::string& v) { + AbstractSessionBuilder::trustCertFilePath = v; + return this; + } + SessionPoolBuilder* maxSize(size_t v) { + maxSize_ = v; + return this; + } + SessionPoolBuilder* waitToGetSessionTimeoutMs(int64_t v) { + waitTimeoutMs_ = v; + return this; + } + SessionPoolBuilder* sqlDialect(const std::string& v) { + AbstractSessionBuilder::sqlDialect = v; + return this; + } + + std::shared_ptr build() { + if (!AbstractSessionBuilder::nodeUrls.empty() && + (AbstractSessionBuilder::host != DEFAULT_HOST || + AbstractSessionBuilder::rpcPort != DEFAULT_RPC_PORT)) { + throw IoTDBException( + "SessionPool builder does not support setting node urls and host/rpcPort at the same " + "time."); + } + std::shared_ptr pool; + if (!AbstractSessionBuilder::nodeUrls.empty()) { + pool = std::make_shared(AbstractSessionBuilder::nodeUrls, + AbstractSessionBuilder::username, + AbstractSessionBuilder::password, maxSize_); + } else { + pool = std::make_shared( + AbstractSessionBuilder::host, AbstractSessionBuilder::rpcPort, + AbstractSessionBuilder::username, AbstractSessionBuilder::password, maxSize_); + } + pool->setFetchSize(AbstractSessionBuilder::fetchSize) + .setZoneId(AbstractSessionBuilder::zoneId) + .setSqlDialect(AbstractSessionBuilder::sqlDialect) + .setDatabase(AbstractSessionBuilder::database) + .setEnableRedirection(AbstractSessionBuilder::enableRedirections) + .setEnableAutoFetch(AbstractSessionBuilder::enableAutoFetch) + .setEnableRPCCompression(AbstractSessionBuilder::enableRPCCompression) + .setConnectTimeoutMs(AbstractSessionBuilder::connectTimeoutMs) + .setWaitToGetSessionTimeoutMs(waitTimeoutMs_) + .setUseSSL(AbstractSessionBuilder::useSSL) + .setTrustCertFilePath(AbstractSessionBuilder::trustCertFilePath); + return pool; + } + +private: + size_t maxSize_ = SessionPool::DEFAULT_MAX_SIZE; + int64_t waitTimeoutMs_ = SessionPool::DEFAULT_WAIT_TIMEOUT_MS; +}; + +#endif // IOTDB_SESSIONPOOL_H diff --git a/iotdb-client/client-cpp/src/test/cpp/sessionIT.cpp b/iotdb-client/client-cpp/src/test/cpp/sessionIT.cpp index 0bf30bbbf1be..ff850d943f1b 100644 --- a/iotdb-client/client-cpp/src/test/cpp/sessionIT.cpp +++ b/iotdb-client/client-cpp/src/test/cpp/sessionIT.cpp @@ -21,8 +21,12 @@ #include "Column.h" #include "Session.h" #include "SessionBuilder.h" +#include "SessionPool.h" #include "TsBlock.h" +#include +#include + using namespace std; extern std::shared_ptr session; @@ -898,4 +902,135 @@ TEST_CASE("Numeric column widening getters align with Java TsFile", "[column]") std::vector longValues = {1000}; auto longColumn = std::make_shared(0, 1, valueIsNull, longValues); REQUIRE(longColumn->getDouble(0) == Approx(1000.0)); -} \ No newline at end of file +} +TEST_CASE("SessionPool basic borrow/insert/query via RAII lease", "[sessionPool]") { + CaseReporter cr("SessionPool basic"); + auto pool = SessionPoolBuilder() + .host("127.0.0.1") + ->rpcPort(6667) + ->username("root") + ->password("root") + ->maxSize(3) + ->build(); + + { + PooledSession s = pool->getSession(); + try { + s->executeNonQueryStatement("delete timeseries root.test.pool.d1.*"); + } catch (const std::exception&) { + // Ignore: the timeseries may not exist yet on a fresh database. + } + } + + const int rows = 50; + for (int i = 0; i < rows; i++) { + PooledSession s = pool->getSession(); + s->insertRecord("root.test.pool.d1", i, {"s1"}, {to_string(i)}); + } + + int count = 0; + { + PooledSessionDataSet ds = pool->executeQueryStatement("select s1 from root.test.pool.d1"); + while (ds->hasNext()) { + ds->next(); + count++; + } + } + REQUIRE(count == rows); + + { + PooledSession s = pool->getSession(); + s->executeNonQueryStatement("delete timeseries root.test.pool.d1.*"); + } + pool->close(); +} + +TEST_CASE("SessionPool is safe under concurrent writers", "[sessionPool]") { + CaseReporter cr("SessionPool concurrency"); + auto pool = SessionPoolBuilder() + .host("127.0.0.1") + ->rpcPort(6667) + ->username("root") + ->password("root") + ->maxSize(4) + ->build(); + + { + PooledSession s = pool->getSession(); + try { + s->executeNonQueryStatement("delete timeseries root.test.pool.d2.*"); + } catch (const std::exception&) { + // Ignore: the timeseries may not exist yet on a fresh database. + } + } + + const int threadCount = 8; + const int rowsPerThread = 100; + std::atomic failures(0); + std::vector threads; + for (int t = 0; t < threadCount; t++) { + threads.emplace_back([&pool, t, rowsPerThread, &failures]() { + try { + for (int i = 0; i < rowsPerThread; i++) { + int64_t ts = static_cast(t) * rowsPerThread + i; + // Mix RAII and convenience APIs to exercise both borrow paths. + if (i % 2 == 0) { + pool->insertRecord("root.test.pool.d2", ts, {"s1"}, {to_string(ts)}); + } else { + PooledSession s = pool->getSession(); + s->insertRecord("root.test.pool.d2", ts, {"s1"}, {to_string(ts)}); + } + } + } catch (const std::exception& e) { + std::cerr << "writer thread failed: " << e.what() << std::endl; + failures++; + } + }); + } + for (auto& th : threads) { + th.join(); + } + REQUIRE(failures.load() == 0); + REQUIRE(pool->getMaxSize() == 4); + + int count = 0; + { + PooledSessionDataSet ds = pool->executeQueryStatement("select s1 from root.test.pool.d2"); + while (ds->hasNext()) { + ds->next(); + count++; + } + } + REQUIRE(count == threadCount * rowsPerThread); + + { + PooledSession s = pool->getSession(); + s->executeNonQueryStatement("delete timeseries root.test.pool.d2.*"); + } + pool->close(); +} + +TEST_CASE("SessionPool getSession times out when exhausted", "[sessionPool]") { + CaseReporter cr("SessionPool exhaustion timeout"); + auto pool = SessionPoolBuilder() + .host("127.0.0.1") + ->rpcPort(6667) + ->username("root") + ->password("root") + ->maxSize(1) + ->waitToGetSessionTimeoutMs(200) + ->build(); + + PooledSession held = pool->getSession(); + REQUIRE(static_cast(held)); + REQUIRE(pool->activeCount() == 1); + // The only Session is checked out, so a second borrow must time out. + REQUIRE_THROWS_AS(pool->getSession(), IoTDBException); + + held.release(); + // After returning it, a borrow succeeds again. + PooledSession reused = pool->getSession(); + REQUIRE(static_cast(reused)); + reused.release(); + pool->close(); +}