Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion iotdb-client/client-cpp/src/main/Common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ const char* MyStringBuffer::getOrderedByte(size_t len) {

void MyStringBuffer::putOrderedByte(char* buf, int len) {
if (isBigEndian) {
str.assign(buf, len);
str.append(buf, len);
} else {
for (int i = len - 1; i > -1; i--) {
str += buf[i];
Expand Down
14 changes: 12 additions & 2 deletions iotdb-client/client-cpp/src/main/Session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -768,8 +768,14 @@ void Session::initNodesSupplier(const std::vector<std::string>& nodeUrls) {
}

if (enableAutoFetch_) {
nodesSupplier_ =
NodesSupplier::create(endPoints, username_, password_, useSSL_, trustCertFilePath_);
// Propagate the compression setting to the node-discovery client too, so the
// background "show cluster" refresh uses the same protocol as data sessions.
// Other parameters keep their library defaults to preserve existing behavior.
nodesSupplier_ = NodesSupplier::create(
endPoints, username_, password_, useSSL_, trustCertFilePath_, /*zoneId=*/"",
ThriftConnection::THRIFT_DEFAULT_BUFFER_SIZE, ThriftConnection::THRIFT_MAX_FRAME_SIZE,
ThriftConnection::CONNECTION_TIMEOUT_IN_MS, enableRPCCompression_,
getVersionString(version));
} else {
nodesSupplier_ = make_shared<StaticNodesSupplier>(endPoints);
}
Expand Down Expand Up @@ -944,6 +950,10 @@ void Session::open(bool enableRPCCompression, int connectionTimeoutInMs) {
return;
}

// Honor compression requested either via the builder or this open() call, so
// the negotiated protocol (compact vs binary) actually reflects the setting.
enableRPCCompression_ = enableRPCCompression_ || enableRPCCompression;

try {
initDefaultSessionConnection();
} catch (const exception& e) {
Expand Down
28 changes: 16 additions & 12 deletions iotdb-client/client-cpp/src/main/Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <thread>
#include <stdexcept>
#include <cstdlib>
#include <cstdio>
#include <future>
#include <boost/date_time/gregorian/gregorian.hpp>
#include <thrift/protocol/TBinaryProtocol.h>
Expand Down Expand Up @@ -258,16 +259,17 @@ class Tablet {
template <typename T> void addValue(size_t schemaId, size_t rowIndex, const T& value) {
if (schemaId >= schemas.size()) {
char tmpStr[100];
sprintf(tmpStr,
"Tablet::addValue(), schemaId >= schemas.size(). schemaId=%ld, schemas.size()=%ld.",
schemaId, schemas.size());
snprintf(tmpStr, sizeof(tmpStr),
"Tablet::addValue(), schemaId >= schemas.size(). schemaId=%ld, schemas.size()=%ld.",
(long)schemaId, (long)schemas.size());
throw std::out_of_range(tmpStr);
}

if (rowIndex >= rowSize) {
char tmpStr[100];
sprintf(tmpStr, "Tablet::addValue(), rowIndex >= rowSize. rowIndex=%ld, rowSize.size()=%ld.",
rowIndex, rowSize);
snprintf(tmpStr, sizeof(tmpStr),
"Tablet::addValue(), rowIndex >= rowSize. rowIndex=%ld, rowSize.size()=%ld.",
(long)rowIndex, (long)rowSize);
throw std::out_of_range(tmpStr);
}

Expand Down Expand Up @@ -317,19 +319,19 @@ class Tablet {
// Check schemaId bounds
if (schemaId >= schemas.size()) {
char tmpStr[100];
sprintf(tmpStr,
"Tablet::addBinaryValueWithMeta(), schemaId >= schemas.size(). schemaId=%ld, "
"schemas.size()=%ld.",
schemaId, schemas.size());
snprintf(tmpStr, sizeof(tmpStr),
"Tablet::addBinaryValueWithMeta(), schemaId >= schemas.size(). schemaId=%ld, "
"schemas.size()=%ld.",
(long)schemaId, (long)schemas.size());
throw std::out_of_range(tmpStr);
}

// Check rowIndex bounds
if (rowIndex >= rowSize) {
char tmpStr[100];
sprintf(tmpStr,
"Tablet::addBinaryValueWithMeta(), rowIndex >= rowSize. rowIndex=%ld, rowSize=%ld.",
rowIndex, rowSize);
snprintf(tmpStr, sizeof(tmpStr),
"Tablet::addBinaryValueWithMeta(), rowIndex >= rowSize. rowIndex=%ld, rowSize=%ld.",
(long)rowIndex, (long)rowSize);
throw std::out_of_range(tmpStr);
}

Expand Down Expand Up @@ -595,6 +597,7 @@ class Session {
std::string database_;
bool enableAutoFetch_ = true;
bool enableRedirection_ = true;
bool enableRPCCompression_ = false;
std::shared_ptr<INodesSupplier> nodesSupplier_;
friend class SessionConnection;
friend class TableSession;
Expand Down Expand Up @@ -772,6 +775,7 @@ class Session {
this->database_ = builder->database;
this->enableAutoFetch_ = builder->enableAutoFetch;
this->enableRedirection_ = builder->enableRedirections;
this->enableRPCCompression_ = builder->enableRPCCompression;
this->connectTimeoutMs_ = builder->connectTimeoutMs;
this->nodeUrls_ = builder->nodeUrls;
this->useSSL_ = builder->useSSL;
Expand Down
3 changes: 3 additions & 0 deletions iotdb-client/client-cpp/src/main/SessionConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ SessionConnection::SessionConnection(Session* session_ptr, const TEndPoint& endp
retryIntervalMs(retryInterval), connectionTimeoutInMs(connectionTimeout),
sqlDialect(std::move(dialect)), database(std::move(db)) {
this->zoneId = zoneId.empty() ? getSystemDefaultZoneId() : zoneId;
// Inherit the compression setting negotiated by the owning Session so the
// chosen Thrift protocol (compact when compression is on) is consistent.
this->enableRPCCompression = session->enableRPCCompression_;
endPointList.push_back(endpoint);
init(endPoint, session->useSSL_, session->trustCertFilePath_);
}
Expand Down
266 changes: 266 additions & 0 deletions iotdb-client/client-cpp/src/main/SessionPool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "SessionPool.h"

void PooledSession::reset() {
if (session_ && pool_ != nullptr) {
pool_->putBack(session_, broken_);
}
pool_ = nullptr;
session_ = nullptr;
broken_ = false;
}

SessionPool::SessionPool(std::string host, int rpcPort, std::string username, std::string password,
size_t maxSize)
: host_(std::move(host)), rpcPort_(rpcPort), username_(std::move(username)),
password_(std::move(password)), maxSize_(maxSize == 0 ? 1 : maxSize) {}

SessionPool::SessionPool(std::vector<std::string> nodeUrls, std::string username,
std::string password, size_t maxSize)
: rpcPort_(AbstractSessionBuilder::DEFAULT_RPC_PORT), nodeUrls_(std::move(nodeUrls)),
username_(std::move(username)), password_(std::move(password)),
maxSize_(maxSize == 0 ? 1 : maxSize) {}

SessionPool::~SessionPool() {
try {
close();
} catch (const std::exception& e) {
log_debug(std::string("SessionPool::~SessionPool(), ") + e.what());
}
}

SessionPool& SessionPool::setFetchSize(int fetchSize) {
fetchSize_ = fetchSize;
return *this;
}

SessionPool& SessionPool::setZoneId(std::string zoneId) {
zoneId_ = std::move(zoneId);
return *this;
}

SessionPool& SessionPool::setSqlDialect(std::string sqlDialect) {
sqlDialect_ = std::move(sqlDialect);
return *this;
}

SessionPool& SessionPool::setDatabase(std::string database) {
database_ = std::move(database);
return *this;
}

SessionPool& SessionPool::setEnableRedirection(bool enable) {
enableRedirection_ = enable;
return *this;
}

SessionPool& SessionPool::setEnableAutoFetch(bool enable) {
enableAutoFetch_ = enable;
return *this;
}

SessionPool& SessionPool::setEnableRPCCompression(bool enable) {
enableRPCCompression_ = enable;
return *this;
}

SessionPool& SessionPool::setConnectTimeoutMs(int connectTimeoutMs) {
connectTimeoutMs_ = connectTimeoutMs;
return *this;
}

SessionPool& SessionPool::setWaitToGetSessionTimeoutMs(int64_t timeoutMs) {
waitTimeoutMs_ = timeoutMs;
return *this;
}

SessionPool& SessionPool::setUseSSL(bool useSSL) {
useSSL_ = useSSL;
return *this;
}

SessionPool& SessionPool::setTrustCertFilePath(std::string path) {
trustCertFilePath_ = std::move(path);
return *this;
}

std::shared_ptr<Session> SessionPool::constructNewSession() {
AbstractSessionBuilder builder;
builder.host = host_;
builder.rpcPort = rpcPort_;
builder.nodeUrls = nodeUrls_;
builder.username = username_;
builder.password = password_;
builder.zoneId = zoneId_;
builder.fetchSize = fetchSize_;
builder.sqlDialect = sqlDialect_;
builder.database = database_;
builder.enableAutoFetch = enableAutoFetch_;
builder.enableRedirections = enableRedirection_;
builder.enableRPCCompression = enableRPCCompression_;
builder.connectTimeoutMs = connectTimeoutMs_;
builder.useSSL = useSSL_;
builder.trustCertFilePath = trustCertFilePath_;

auto session = std::make_shared<Session>(&builder);
session->open(enableRPCCompression_, connectTimeoutMs_);
return session;
}

std::shared_ptr<Session> SessionPool::acquire(int64_t timeoutMs) {
const int64_t effectiveTimeout = timeoutMs > 0 ? timeoutMs : waitTimeoutMs_;
std::unique_lock<std::mutex> lock(mutex_);
const auto deadline =
std::chrono::steady_clock::now() + std::chrono::milliseconds(effectiveTimeout);

while (true) {
if (closed_) {
throw IoTDBException("SessionPool is closed.");
}
if (!idleQueue_.empty()) {
auto session = idleQueue_.front();
idleQueue_.pop_front();
return session;
}
if (size_ < maxSize_) {
// Reserve a slot, then build the connection outside the lock so other
// borrowers are not blocked by network/handshake latency.
++size_;
lock.unlock();
std::shared_ptr<Session> session;
try {
session = constructNewSession();
} catch (...) {
lock.lock();
--size_;
cv_.notify_one();
throw;
}
return session;
}

// Pool exhausted: wait for a Session to be returned.
if (effectiveTimeout <= 0) {
cv_.wait(lock);
} else {
if (cv_.wait_until(lock, deadline) == std::cv_status::timeout && idleQueue_.empty() &&
size_ >= maxSize_ && !closed_) {
throw IoTDBException("Wait to get session timeout in SessionPool, maxSize=" +
std::to_string(maxSize_) + ", waitTimeoutMs=" +
std::to_string(effectiveTimeout) + ".");
}
}
}
}

void SessionPool::putBack(const std::shared_ptr<Session>& session, bool broken) {
std::lock_guard<std::mutex> lock(mutex_);
if (broken || closed_) {
// Drop the Session and free its slot so a healthy replacement can be created
// on demand. The caller (PooledSession::reset) still holds the last reference
// and tears the connection down after we return, i.e. outside this lock.
--size_;
} else {
idleQueue_.push_back(session);
}
cv_.notify_one();
}

PooledSession SessionPool::getSession() {
return getSession(waitTimeoutMs_);
}

PooledSession SessionPool::getSession(int64_t timeoutMs) {
return PooledSession(this, acquire(timeoutMs));
}

void SessionPool::insertTablet(Tablet& tablet, bool sorted) {
execute([&](Session& s) { s.insertTablet(tablet, sorted); });
}

void SessionPool::insertAlignedTablet(Tablet& tablet, bool sorted) {
execute([&](Session& s) { s.insertAlignedTablet(tablet, sorted); });
}

void SessionPool::insertTablets(std::unordered_map<std::string, Tablet*>& tablets, bool sorted) {
execute([&](Session& s) { s.insertTablets(tablets, sorted); });
}

void SessionPool::insertRecord(const std::string& deviceId, int64_t time,
const std::vector<std::string>& measurements,
const std::vector<std::string>& values) {
execute([&](Session& s) { s.insertRecord(deviceId, time, measurements, values); });
}

void SessionPool::insertRecords(const std::vector<std::string>& deviceIds,
const std::vector<int64_t>& times,
const std::vector<std::vector<std::string>>& measurementsList,
const std::vector<std::vector<std::string>>& valuesList) {
execute([&](Session& s) { s.insertRecords(deviceIds, times, measurementsList, valuesList); });
}

void SessionPool::executeNonQueryStatement(const std::string& sql) {
execute([&](Session& s) { s.executeNonQueryStatement(sql); });
}

PooledSessionDataSet SessionPool::executeQueryStatement(const std::string& sql) {
PooledSession lease = getSession();
try {
auto dataSet = lease->executeQueryStatement(sql);
return PooledSessionDataSet(std::move(lease), std::move(dataSet));
} catch (const IoTDBConnectionException&) {
lease.markBroken();
throw;
}
}

PooledSessionDataSet SessionPool::executeQueryStatement(const std::string& sql,
int64_t timeoutInMs) {
PooledSession lease = getSession();
try {
auto dataSet = lease->executeQueryStatement(sql, timeoutInMs);
return PooledSessionDataSet(std::move(lease), std::move(dataSet));
} catch (const IoTDBConnectionException&) {
lease.markBroken();
throw;
}
}

void SessionPool::close() {
std::deque<std::shared_ptr<Session>> toClose;
{
std::lock_guard<std::mutex> lock(mutex_);
if (closed_) {
return;
}
closed_ = true;
toClose.swap(idleQueue_);
size_ -= toClose.size();
}
cv_.notify_all();
// Sessions destructed here (outside the lock) close their connections.
toClose.clear();
}

size_t SessionPool::activeCount() {
std::lock_guard<std::mutex> lock(mutex_);
return size_ - idleQueue_.size();
}
Loading
Loading