From 6f7fe34535d2fc979dadca91aa505c2280a070c2 Mon Sep 17 00:00:00 2001 From: randomkang <550941794@qq.com> Date: Tue, 19 May 2026 01:01:00 +0800 Subject: [PATCH 1/7] detect and negotiate mtu --- src/brpc/rdma/rdma_endpoint.cpp | 40 ++++++++++++++++++++++++++++----- src/brpc/rdma/rdma_endpoint.h | 3 ++- src/brpc/rdma/rdma_helper.cpp | 30 +++++++++++++++++++++++++ src/brpc/rdma/rdma_helper.h | 1 + 4 files changed, 67 insertions(+), 7 deletions(-) diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index c69bf8ec07..a3981d3ba2 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -81,14 +81,16 @@ static const size_t RESERVED_WR_NUM = 3; // block size (4B) // sq size (2B) // rq size (2B) +// mtu type (2B) +// lid size (2B) // GID (16B) // QP number (4B) static const char* MAGIC_STR = "RDMA"; static const size_t MAGIC_STR_LEN = 4; -static const size_t HELLO_MSG_LEN_MIN = 40; +static const size_t HELLO_MSG_LEN_MIN = 42; // static const size_t HELLO_MSG_LEN_MAX = 4096; static const size_t ACK_MSG_LEN = 4; -static uint16_t g_rdma_hello_msg_len = 40; // In Byte +static uint16_t g_rdma_hello_msg_len = 42; // In Byte static uint16_t g_rdma_hello_version = 2; static uint16_t g_rdma_impl_version = 1; static uint32_t g_rdma_recv_block_size = 0; @@ -115,6 +117,7 @@ struct HelloMessage { uint32_t block_size; uint16_t sq_size; uint16_t rq_size; + uint16_t mtu_type; uint16_t lid; ibv_gid gid; uint32_t qp_num; @@ -130,6 +133,7 @@ void HelloMessage::Serialize(void* data) const { current_pos += 2; // move forward 4 Bytes *(current_pos++) = butil::HostToNet16(sq_size); *(current_pos++) = butil::HostToNet16(rq_size); + *(current_pos++) = butil::HostToNet16(mtu_type); *(current_pos++) = butil::HostToNet16(lid); memcpy(current_pos, gid.raw, 16); uint32_t* qp_num_pos = (uint32_t*)((char*)current_pos + 16); @@ -145,6 +149,7 @@ void HelloMessage::Deserialize(void* data) { current_pos += 2; // move forward 4 Bytes sq_size = butil::NetToHost16(*current_pos++); rq_size = butil::NetToHost16(*current_pos++); + mtu_type = butil::NetToHost16(*current_pos++); lid = butil::NetToHost16(*current_pos++); memcpy(gid.raw, current_pos, 16); qp_num = butil::NetToHost32(*(uint32_t*)((char*)current_pos + 16)); @@ -435,6 +440,7 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) { << "Start handshake on " << s->_local_side; uint8_t data[g_rdma_hello_msg_len]; + uint16_t local_mtu_type = detect_mtu(GetRdmaContext(), GetRdmaPortNum()); // First initialize CQ and QP resources ep->_state = C_ALLOC_QPCQ; @@ -455,6 +461,7 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) { local_msg.block_size = g_rdma_recv_block_size; local_msg.sq_size = ep->_sq_size; local_msg.rq_size = ep->_rq_size; + local_msg.mtu_type = local_mtu_type; local_msg.lid = GetRdmaLid(); local_msg.gid = GetRdmaGid(); if (BAIDU_LIKELY(ep->_resource)) { @@ -534,7 +541,9 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) { ep->_local_window_capacity, butil::memory_order_relaxed); ep->_state = C_BRINGUP_QP; - if (ep->BringUpQp(remote_msg.lid, remote_msg.gid, remote_msg.qp_num) < 0) { + // use the minimum of local mtu type and remote mtu type + uint16_t min_mtu_type = std::min(local_mtu_type, remote_msg.mtu_type); + if (ep->BringUpQp(remote_msg.lid, remote_msg.gid, remote_msg.qp_num, min_mtu_type) < 0) { LOG(WARNING) << "Fail to bringup QP, fallback to tcp:" << s->description(); rdma_transport->_rdma_state = RdmaTransport::RDMA_OFF; } else { @@ -582,6 +591,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { << "Start handshake on " << s->description(); uint8_t data[g_rdma_hello_msg_len]; + uint16_t local_mtu_type = detect_mtu(GetRdmaContext(), GetRdmaPortNum()); ep->_state = S_HELLO_WAIT; if (ep->ReadFromFd(data, MAGIC_STR_LEN) < 0) { @@ -652,7 +662,9 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { rdma_transport->_rdma_state = RdmaTransport::RDMA_OFF; } else { ep->_state = S_BRINGUP_QP; - if (ep->BringUpQp(remote_msg.lid, remote_msg.gid, remote_msg.qp_num) < 0) { + // use the minimum of local mtu type and remote mtu type + uint16_t min_mtu_type = std::min(local_mtu_type, remote_msg.mtu_type); + if (ep->BringUpQp(remote_msg.lid, remote_msg.gid, remote_msg.qp_num, min_mtu_type) < 0) { LOG(WARNING) << "Fail to bringup QP, fallback to tcp:" << s->description(); rdma_transport->_rdma_state = RdmaTransport::RDMA_OFF; @@ -673,6 +685,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { local_msg.block_size = g_rdma_recv_block_size; local_msg.sq_size = ep->_sq_size; local_msg.rq_size = ep->_rq_size; + local_msg.mtu_type = local_mtu_type; local_msg.hello_ver = g_rdma_hello_version; local_msg.impl_ver = g_rdma_impl_version; if (BAIDU_LIKELY(ep->_resource)) { @@ -1232,12 +1245,27 @@ int RdmaEndpoint::AllocateResources() { return 0; } -int RdmaEndpoint::BringUpQp(uint16_t lid, ibv_gid gid, uint32_t qp_num) { +int RdmaEndpoint::BringUpQp(uint16_t lid, ibv_gid gid, uint32_t qp_num, uint16_t mtu_type) { if (BAIDU_UNLIKELY(g_skip_rdma_init)) { // For UT return 0; } + if (mtu_type == IBV_MTU_256) { + LOG(INFO) << "negotiated mtu is 256"; + } else if (mtu_type == IBV_MTU_512) { + LOG(INFO) << "negotiated mtu is 512"; + } else if (mtu_type == IBV_MTU_1024) { + LOG(INFO) << "negotiated mtu is 1024"; + } else if (mtu_type == IBV_MTU_2048) { + LOG(INFO) << "negotiated mtu is 2048"; + } else if (mtu_type == IBV_MTU_4096) { + LOG(INFO) << "negotiated mtu is 4096"; + } else { + LOG(ERROR) << "unknown mtu " << mtu_type; + return -1; + } + ibv_qp_attr attr; attr.qp_state = IBV_QPS_INIT; @@ -1275,7 +1303,7 @@ int RdmaEndpoint::BringUpQp(uint16_t lid, ibv_gid gid, uint32_t qp_num) { } attr.qp_state = IBV_QPS_RTR; - attr.path_mtu = IBV_MTU_1024; // TODO: support more mtu in future + attr.path_mtu = ibv_mtu(mtu_type); attr.ah_attr.grh.dgid = gid; attr.ah_attr.grh.flow_label = 0; attr.ah_attr.grh.sgid_index = GetRdmaGidIndex(); diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h index 54a008f1f7..82c2911fa5 100644 --- a/src/brpc/rdma/rdma_endpoint.h +++ b/src/brpc/rdma/rdma_endpoint.h @@ -193,10 +193,11 @@ friend class Socket; // lid: remote LID // gid: remote GID // qp_num: remote QP number + // mtu_type: the minimum of local mtu_type and remote mtu_type // Return: // 0: success // -1: failed, errno set - int BringUpQp(uint16_t lid, ibv_gid gid, uint32_t qp_num); + int BringUpQp(uint16_t lid, ibv_gid gid, uint32_t qp_num, uint16_t mtu_type); // Get event from comp channel and ack the events int GetAndAckEvents(SocketUniquePtr& s); diff --git a/src/brpc/rdma/rdma_helper.cpp b/src/brpc/rdma/rdma_helper.cpp index 768bf615e2..00987f7ec7 100644 --- a/src/brpc/rdma/rdma_helper.cpp +++ b/src/brpc/rdma/rdma_helper.cpp @@ -701,6 +701,36 @@ bool SupportedByRdma(std::string protocol) { return false; } +uint16_t detect_mtu(struct ibv_context* ctx, int port_num) { + struct ibv_port_attr port_attr; + + if (ibv_query_port(ctx, port_num, &port_attr)) { + LOG(ERROR) << "ibv_query_port failed"; + return 0; + } + + LOG(INFO) << "local active mtu type:" << port_attr.active_mtu + << ", max mtu type:" << port_attr.max_mtu; + + ibv_mtu local_mtu_type = port_attr.active_mtu; + if (local_mtu_type == IBV_MTU_256) { + LOG(INFO) << "local mtu is 256"; + } else if (local_mtu_type == IBV_MTU_512) { + LOG(INFO) << "local mtu is 512"; + } else if (local_mtu_type == IBV_MTU_1024) { + LOG(INFO) << "local mtu is 1024"; + } else if (local_mtu_type == IBV_MTU_2048) { + LOG(INFO) << "local mtu is 2048"; + } else if (local_mtu_type == IBV_MTU_4096) { + LOG(INFO) << "local mtu is 4096"; + } else { + LOG(ERROR) << "unknown mtu " << local_mtu_type; + return 0; + } + + return local_mtu_type; +} + bool InitPollingModeWithTag(bthread_tag_t tag, std::function callback, std::function init_fn, diff --git a/src/brpc/rdma/rdma_helper.h b/src/brpc/rdma/rdma_helper.h index 052763325b..afabda7eb0 100644 --- a/src/brpc/rdma/rdma_helper.h +++ b/src/brpc/rdma/rdma_helper.h @@ -89,6 +89,7 @@ void GlobalDisableRdma(); // If the given protocol supported by RDMA bool SupportedByRdma(std::string protocol); +uint16_t detect_mtu(struct ibv_context* ctx, int port_num); } // namespace rdma } // namespace brpc #else From 52c3987b4273714834d8db1e2ee86c7b837110f6 Mon Sep 17 00:00:00 2001 From: randomkang <550941794@qq.com> Date: Wed, 20 May 2026 00:11:07 +0800 Subject: [PATCH 2/7] reorg code --- src/brpc/rdma/rdma_endpoint.cpp | 24 +++++++----- src/brpc/rdma/rdma_helper.cpp | 65 +++++++++++++++++++-------------- src/brpc/rdma/rdma_helper.h | 2 +- 3 files changed, 53 insertions(+), 38 deletions(-) diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index a3981d3ba2..ded5a67298 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -81,10 +81,10 @@ static const size_t RESERVED_WR_NUM = 3; // block size (4B) // sq size (2B) // rq size (2B) -// mtu type (2B) // lid size (2B) // GID (16B) // QP number (4B) +// mtu type (2B) static const char* MAGIC_STR = "RDMA"; static const size_t MAGIC_STR_LEN = 4; static const size_t HELLO_MSG_LEN_MIN = 42; @@ -117,10 +117,10 @@ struct HelloMessage { uint32_t block_size; uint16_t sq_size; uint16_t rq_size; - uint16_t mtu_type; uint16_t lid; ibv_gid gid; uint32_t qp_num; + uint16_t mtu_type; }; void HelloMessage::Serialize(void* data) const { @@ -133,11 +133,13 @@ void HelloMessage::Serialize(void* data) const { current_pos += 2; // move forward 4 Bytes *(current_pos++) = butil::HostToNet16(sq_size); *(current_pos++) = butil::HostToNet16(rq_size); - *(current_pos++) = butil::HostToNet16(mtu_type); *(current_pos++) = butil::HostToNet16(lid); memcpy(current_pos, gid.raw, 16); - uint32_t* qp_num_pos = (uint32_t*)((char*)current_pos + 16); + current_pos += 8; + uint32_t* qp_num_pos = (uint32_t*)(current_pos); *qp_num_pos = butil::HostToNet32(qp_num); + current_pos += 2; + *(current_pos) = butil::HostToNet16(mtu_type); } void HelloMessage::Deserialize(void* data) { @@ -149,10 +151,12 @@ void HelloMessage::Deserialize(void* data) { current_pos += 2; // move forward 4 Bytes sq_size = butil::NetToHost16(*current_pos++); rq_size = butil::NetToHost16(*current_pos++); - mtu_type = butil::NetToHost16(*current_pos++); lid = butil::NetToHost16(*current_pos++); memcpy(gid.raw, current_pos, 16); - qp_num = butil::NetToHost32(*(uint32_t*)((char*)current_pos + 16)); + current_pos += 8; + qp_num = butil::NetToHost32(*(uint32_t*)(current_pos)); + current_pos += 2; + mtu_type = butil::NetToHost16(*current_pos); } RdmaResource::~RdmaResource() { @@ -440,7 +444,7 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) { << "Start handshake on " << s->_local_side; uint8_t data[g_rdma_hello_msg_len]; - uint16_t local_mtu_type = detect_mtu(GetRdmaContext(), GetRdmaPortNum()); + uint16_t local_mtu_type = GetLocalMtuType(); // First initialize CQ and QP resources ep->_state = C_ALLOC_QPCQ; @@ -461,7 +465,6 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) { local_msg.block_size = g_rdma_recv_block_size; local_msg.sq_size = ep->_sq_size; local_msg.rq_size = ep->_rq_size; - local_msg.mtu_type = local_mtu_type; local_msg.lid = GetRdmaLid(); local_msg.gid = GetRdmaGid(); if (BAIDU_LIKELY(ep->_resource)) { @@ -470,6 +473,7 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) { // Only happens in UT local_msg.qp_num = 0; } + local_msg.mtu_type = local_mtu_type; memcpy(data, MAGIC_STR, 4); local_msg.Serialize((char*)data + 4); if (ep->WriteToFd(data, g_rdma_hello_msg_len) < 0) { @@ -591,7 +595,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { << "Start handshake on " << s->description(); uint8_t data[g_rdma_hello_msg_len]; - uint16_t local_mtu_type = detect_mtu(GetRdmaContext(), GetRdmaPortNum()); + uint16_t local_mtu_type = GetLocalMtuType(); ep->_state = S_HELLO_WAIT; if (ep->ReadFromFd(data, MAGIC_STR_LEN) < 0) { @@ -685,7 +689,6 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { local_msg.block_size = g_rdma_recv_block_size; local_msg.sq_size = ep->_sq_size; local_msg.rq_size = ep->_rq_size; - local_msg.mtu_type = local_mtu_type; local_msg.hello_ver = g_rdma_hello_version; local_msg.impl_ver = g_rdma_impl_version; if (BAIDU_LIKELY(ep->_resource)) { @@ -694,6 +697,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { // Only happens in UT local_msg.qp_num = 0; } + local_msg.mtu_type = local_mtu_type; } memcpy(data, MAGIC_STR, 4); local_msg.Serialize((char*)data + 4); diff --git a/src/brpc/rdma/rdma_helper.cpp b/src/brpc/rdma/rdma_helper.cpp index 00987f7ec7..2fca644c3b 100644 --- a/src/brpc/rdma/rdma_helper.cpp +++ b/src/brpc/rdma/rdma_helper.cpp @@ -90,6 +90,8 @@ static int g_comp_vector_index = 0; butil::atomic g_rdma_available(false); +static uint16_t local_mtu_type = IBV_MTU_4096; + DEFINE_int32(rdma_max_sge, 0, "Max SGE num in a WR"); DEFINE_string(rdma_device, "", "The name of the HCA device used " "(Empty means using the first active device)"); @@ -455,6 +457,36 @@ static ibv_context* OpenDevice(int num_total, int* num_available_devices) { return ret_context; } +static uint16_t detect_mtu(struct ibv_context* ctx, int port_num) { + struct ibv_port_attr port_attr; + + if (ibv_query_port(ctx, port_num, &port_attr)) { + LOG(ERROR) << "ibv_query_port failed"; + return 0; + } + + LOG(INFO) << "local active mtu type:" << port_attr.active_mtu + << ", max mtu type:" << port_attr.max_mtu; + + uint16_t mtu_type = port_attr.active_mtu; + if (mtu_type == IBV_MTU_256) { + LOG(INFO) << "local mtu is 256"; + } else if (mtu_type == IBV_MTU_512) { + LOG(INFO) << "local mtu is 512"; + } else if (mtu_type == IBV_MTU_1024) { + LOG(INFO) << "local mtu is 1024"; + } else if (mtu_type == IBV_MTU_2048) { + LOG(INFO) << "local mtu is 2048"; + } else if (mtu_type == IBV_MTU_4096) { + LOG(INFO) << "local mtu is 4096"; + } else { + LOG(ERROR) << "unknown mtu type " << mtu_type; + return 0; + } + + return mtu_type; +} + static void GlobalRdmaInitializeOrDieImpl() { if (BAIDU_UNLIKELY(g_skip_rdma_init)) { // Just for UT @@ -549,6 +581,11 @@ static void GlobalRdmaInitializeOrDieImpl() { g_max_sge = attr.max_sge; } + local_mtu_type = detect_mtu(g_context, g_port_num); + if (!local_mtu_type) { + PLOG(ERROR) << "Fail to get local mtu type"; + ExitWithError(); + } // Initialize RDMA memory pool (block_pool) if (!InitBlockPool(RdmaRegisterMemory)) { PLOG(ERROR) << "Fail to initialize RDMA memory pool"; @@ -701,33 +738,7 @@ bool SupportedByRdma(std::string protocol) { return false; } -uint16_t detect_mtu(struct ibv_context* ctx, int port_num) { - struct ibv_port_attr port_attr; - - if (ibv_query_port(ctx, port_num, &port_attr)) { - LOG(ERROR) << "ibv_query_port failed"; - return 0; - } - - LOG(INFO) << "local active mtu type:" << port_attr.active_mtu - << ", max mtu type:" << port_attr.max_mtu; - - ibv_mtu local_mtu_type = port_attr.active_mtu; - if (local_mtu_type == IBV_MTU_256) { - LOG(INFO) << "local mtu is 256"; - } else if (local_mtu_type == IBV_MTU_512) { - LOG(INFO) << "local mtu is 512"; - } else if (local_mtu_type == IBV_MTU_1024) { - LOG(INFO) << "local mtu is 1024"; - } else if (local_mtu_type == IBV_MTU_2048) { - LOG(INFO) << "local mtu is 2048"; - } else if (local_mtu_type == IBV_MTU_4096) { - LOG(INFO) << "local mtu is 4096"; - } else { - LOG(ERROR) << "unknown mtu " << local_mtu_type; - return 0; - } - +uint16_t GetLocalMtuType() { return local_mtu_type; } diff --git a/src/brpc/rdma/rdma_helper.h b/src/brpc/rdma/rdma_helper.h index afabda7eb0..8f0f472e4b 100644 --- a/src/brpc/rdma/rdma_helper.h +++ b/src/brpc/rdma/rdma_helper.h @@ -89,7 +89,7 @@ void GlobalDisableRdma(); // If the given protocol supported by RDMA bool SupportedByRdma(std::string protocol); -uint16_t detect_mtu(struct ibv_context* ctx, int port_num); +uint16_t GetLocalMtuType(); } // namespace rdma } // namespace brpc #else From 7a5413c947c6b07e77f4e2ad6aceb09921892248 Mon Sep 17 00:00:00 2001 From: randomkang <550941794@qq.com> Date: Thu, 21 May 2026 00:09:48 +0800 Subject: [PATCH 3/7] compatible with older versions --- src/brpc/rdma/rdma_endpoint.cpp | 100 ++++++++++++++++++++++++++------ 1 file changed, 81 insertions(+), 19 deletions(-) diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index ded5a67298..e2946c52c1 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -87,7 +87,7 @@ static const size_t RESERVED_WR_NUM = 3; // mtu type (2B) static const char* MAGIC_STR = "RDMA"; static const size_t MAGIC_STR_LEN = 4; -static const size_t HELLO_MSG_LEN_MIN = 42; +static const size_t HELLO_MSG_LEN_MIN = 40; // static const size_t HELLO_MSG_LEN_MAX = 4096; static const size_t ACK_MSG_LEN = 4; static uint16_t g_rdma_hello_msg_len = 42; // In Byte @@ -108,9 +108,12 @@ static butil::Mutex* g_rdma_resource_mutex = NULL; static RdmaResource* g_rdma_resource_list = NULL; struct HelloMessage { - void Serialize(void* data) const; - void Deserialize(void* data); + void BaseSerialize(void* data) const; + void ExtSerialize(void* data) const; + void BaseDeserialize(void* data); + void ExtDeserialize(void* data, uint16_t ext_len); + // base fields uint16_t msg_len; uint16_t hello_ver; uint16_t impl_ver; @@ -120,10 +123,12 @@ struct HelloMessage { uint16_t lid; ibv_gid gid; uint32_t qp_num; + + // extern fields uint16_t mtu_type; }; -void HelloMessage::Serialize(void* data) const { +void HelloMessage::BaseSerialize(void* data) const { uint16_t* current_pos = (uint16_t*)data; *(current_pos++) = butil::HostToNet16(msg_len); *(current_pos++) = butil::HostToNet16(hello_ver); @@ -138,11 +143,14 @@ void HelloMessage::Serialize(void* data) const { current_pos += 8; uint32_t* qp_num_pos = (uint32_t*)(current_pos); *qp_num_pos = butil::HostToNet32(qp_num); - current_pos += 2; +} + +void HelloMessage::ExtSerialize(void* data) const { + uint16_t* current_pos = (uint16_t*)data; *(current_pos) = butil::HostToNet16(mtu_type); } -void HelloMessage::Deserialize(void* data) { +void HelloMessage::BaseDeserialize(void* data) { uint16_t* current_pos = (uint16_t*)data; msg_len = butil::NetToHost16(*current_pos++); hello_ver = butil::NetToHost16(*current_pos++); @@ -155,8 +163,23 @@ void HelloMessage::Deserialize(void* data) { memcpy(gid.raw, current_pos, 16); current_pos += 8; qp_num = butil::NetToHost32(*(uint32_t*)(current_pos)); - current_pos += 2; - mtu_type = butil::NetToHost16(*current_pos); +} + +uint16_t HelloMessage::ExtDeserialize(void* data, uint16_t ext_len) { + if (ext_len == 0) { + return 0; + } + + // try to deserialize mtu_type + if (ext_len < 2) { + LOG(FATAL) << "illegal HelloMessage, ext len is " << ext_len << ", should not be less than 2!!!"; + } + uint16_t* current_pos = (uint16_t*)data; + mtu_type = butil::NetToHost16(*current_pos++); + + ext_len -= 2; + + return ext_len; } RdmaResource::~RdmaResource() { @@ -475,7 +498,8 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) { } local_msg.mtu_type = local_mtu_type; memcpy(data, MAGIC_STR, 4); - local_msg.Serialize((char*)data + 4); + local_msg.BaseSerialize((char*)data + 4); + local_msg.ExtSerialize((char*)data + HELLO_MSG_LEN_MIN); if (ep->WriteToFd(data, g_rdma_hello_msg_len) < 0) { const int saved_errno = errno; PLOG(WARNING) << "Fail to send hello message to server:" << s->description(); @@ -513,7 +537,7 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) { return NULL; } HelloMessage remote_msg; - remote_msg.Deserialize(data); + remote_msg.BaseDeserialize(data); if (remote_msg.msg_len < HELLO_MSG_LEN_MIN) { LOG(WARNING) << "Fail to parse Hello Message length from server:" << s->description(); @@ -523,9 +547,27 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) { return NULL; } + // In older versions of brpc, IBV_MTU_1024 is the default mtu type, + // So we set remote_mtu IBV_MTU_1024 at default to be ompatible with older versions. + uint16_t remote_mtu_type = IBV_MTU_1024; if (remote_msg.msg_len > HELLO_MSG_LEN_MIN) { - // TODO: Read Hello Message customized data - // Just for future use, should not happen now + // Read Hello Message customized data + uint16_t remote_msg_ext_len = remote_msg.msg_len - HELLO_MSG_LEN_MIN; + uint8_t ext_data[remote_msg_ext_len]; + if (ep->ReadFromFd(ext_data, remote_msg_ext_len) < 0) { + const int saved_errno = errno; + PLOG(WARNING) << "Fail to get Hello Message ext fields from server:" << s->description(); + s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s", + s->description().c_str(), berror(saved_errno)); + ep->_state = FAILED; + return NULL; + } + remote_msg.ExtDeserialize(ext_data, remote_msg_ext_len); + if (remote_msg_ext_len >= 2) { + // mtu_type field is valid + remote_mtu_type = remote_msg.mtu_type; + } + // TODO: other extern fields } if (!HelloNegotiationValid(remote_msg)) { @@ -546,7 +588,7 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) { ep->_state = C_BRINGUP_QP; // use the minimum of local mtu type and remote mtu type - uint16_t min_mtu_type = std::min(local_mtu_type, remote_msg.mtu_type); + uint16_t min_mtu_type = std::min(local_mtu_type, remote_mtu_type); if (ep->BringUpQp(remote_msg.lid, remote_msg.gid, remote_msg.qp_num, min_mtu_type) < 0) { LOG(WARNING) << "Fail to bringup QP, fallback to tcp:" << s->description(); rdma_transport->_rdma_state = RdmaTransport::RDMA_OFF; @@ -619,7 +661,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { return NULL; } - if (ep->ReadFromFd(data, g_rdma_hello_msg_len - MAGIC_STR_LEN) < 0) { + if (ep->ReadFromFd(data, HELLO_MSG_LEN_MIN - MAGIC_STR_LEN) < 0) { const int saved_errno = errno; PLOG(WARNING) << "Fail to read Hello Message from client:" << s->description(); s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s", @@ -629,7 +671,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { } HelloMessage remote_msg; - remote_msg.Deserialize(data); + remote_msg.BaseDeserialize(data); if (remote_msg.msg_len < HELLO_MSG_LEN_MIN) { LOG(WARNING) << "Fail to parse Hello Message length from client:" << s->description(); @@ -638,9 +680,28 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { ep->_state = FAILED; return NULL; } + + // In older versions of brpc, IBV_MTU_1024 is the default mtu type, + // So we set remote_mtu IBV_MTU_1024 at default to be ompatible with older versions. + uint16_t remote_mtu_type = IBV_MTU_1024; if (remote_msg.msg_len > HELLO_MSG_LEN_MIN) { - // TODO: Read Hello Message customized header - // Just for future use, should not happen now + // Read Hello Message customized data + uint16_t remote_msg_ext_len = remote_msg.msg_len - HELLO_MSG_LEN_MIN; + uint8_t ext_data[remote_msg_ext_len]; + if (ep->ReadFromFd(ext_data, remote_msg_ext_len) < 0) { + const int saved_errno = errno; + PLOG(WARNING) << "Fail to get Hello Message ext fields from client:" << s->description(); + s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s", + s->description().c_str(), berror(saved_errno)); + ep->_state = FAILED; + return NULL; + } + remote_msg.ExtDeserialize(ext_data, remote_msg_ext_len); + if (remote_msg_ext_len >= 2) { + // mtu_type field is valid + remote_mtu_type = remote_msg.mtu_type; + } + // TODO: other extern fields } if (!HelloNegotiationValid(remote_msg)) { @@ -667,7 +728,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { } else { ep->_state = S_BRINGUP_QP; // use the minimum of local mtu type and remote mtu type - uint16_t min_mtu_type = std::min(local_mtu_type, remote_msg.mtu_type); + uint16_t min_mtu_type = std::min(local_mtu_type, remote_mtu_type); if (ep->BringUpQp(remote_msg.lid, remote_msg.gid, remote_msg.qp_num, min_mtu_type) < 0) { LOG(WARNING) << "Fail to bringup QP, fallback to tcp:" << s->description(); @@ -700,7 +761,8 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { local_msg.mtu_type = local_mtu_type; } memcpy(data, MAGIC_STR, 4); - local_msg.Serialize((char*)data + 4); + local_msg.BaseSerialize((char*)data + 4); + local_msg.ExtSerialize((char*)data + HELLO_MSG_LEN_MIN); if (ep->WriteToFd(data, g_rdma_hello_msg_len) < 0) { const int saved_errno = errno; PLOG(WARNING) << "Fail to send Hello Message to client:" << s->description(); From 33666c9bc7d4d6261d5c72781f0006781864f8e2 Mon Sep 17 00:00:00 2001 From: randomkang <550941794@qq.com> Date: Thu, 21 May 2026 00:24:42 +0800 Subject: [PATCH 4/7] Fix code --- src/brpc/rdma/rdma_endpoint.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index e2946c52c1..65a1a8c857 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -111,7 +111,7 @@ struct HelloMessage { void BaseSerialize(void* data) const; void ExtSerialize(void* data) const; void BaseDeserialize(void* data); - void ExtDeserialize(void* data, uint16_t ext_len); + uint16_t ExtDeserialize(void* data, uint16_t ext_len); // base fields uint16_t msg_len; From 73628784bc1bfad66616a924a139a05ed6efee5d Mon Sep 17 00:00:00 2001 From: randomkang <550941794@qq.com> Date: Thu, 21 May 2026 00:29:00 +0800 Subject: [PATCH 5/7] fix code --- src/brpc/rdma/rdma_endpoint.cpp | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index 65a1a8c857..7305f40846 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -107,6 +107,9 @@ static const uint32_t ACK_MSG_RDMA_OK = 0x1; static butil::Mutex* g_rdma_resource_mutex = NULL; static RdmaResource* g_rdma_resource_list = NULL; +// The HelloMessage should have all base fields, and the new versions of HelloMessage +// maybe have some extern fields. + struct HelloMessage { void BaseSerialize(void* data) const; void ExtSerialize(void* data) const; @@ -170,16 +173,17 @@ uint16_t HelloMessage::ExtDeserialize(void* data, uint16_t ext_len) { return 0; } + uint16_t remain_ext_len = ext_len; + // try to deserialize mtu_type - if (ext_len < 2) { - LOG(FATAL) << "illegal HelloMessage, ext len is " << ext_len << ", should not be less than 2!!!"; + if (remain_ext_len < 2) { + LOG(FATAL) << "illegal HelloMessage, remain ext len is " << remain_ext_len << ", should not be less than 2!!!"; } uint16_t* current_pos = (uint16_t*)data; mtu_type = butil::NetToHost16(*current_pos++); + remain_ext_len -= 2; - ext_len -= 2; - - return ext_len; + return remain_ext_len; } RdmaResource::~RdmaResource() { From 9cd256595cc21cd164051a64a92c0d29b33182f5 Mon Sep 17 00:00:00 2001 From: randomkang <550941794@qq.com> Date: Thu, 21 May 2026 23:37:09 +0800 Subject: [PATCH 6/7] FLAGS_rdma_extend --- src/brpc/rdma/rdma_endpoint.cpp | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index 7305f40846..8fc1c18273 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -67,6 +67,7 @@ DEFINE_int32(rdma_poller_num, 1, "Poller number in RDMA polling mode."); DEFINE_bool(rdma_poller_yield, false, "Yield thread in RDMA polling mode."); DEFINE_bool(rdma_disable_bthread, false, "Disable bthread in RDMA"); DEFINE_bool(rdma_ece, false, "Open ece in RDMA, should use this feature when rdma nics are from the same merchant."); +DEFINE_bool(rdma_extend, false, "Use the extend fields to negotiate the advance feature of rdma, such as mtu."); static const size_t IOBUF_BLOCK_HEADER_LEN = 32; // implementation-dependent @@ -503,8 +504,12 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) { local_msg.mtu_type = local_mtu_type; memcpy(data, MAGIC_STR, 4); local_msg.BaseSerialize((char*)data + 4); - local_msg.ExtSerialize((char*)data + HELLO_MSG_LEN_MIN); - if (ep->WriteToFd(data, g_rdma_hello_msg_len) < 0) { + // If FLAGS_rdma_extend is not open, only send base fields of HelloMessage + if (FLAGS_rdma_extend) { + local_msg.ExtSerialize((char*)data + HELLO_MSG_LEN_MIN); + } + size_t msg_len = FLAGS_rdma_extend ? g_rdma_hello_msg_len : HELLO_MSG_LEN_MIN; + if (ep->WriteToFd(data, msg_len) < 0) { const int saved_errno = errno; PLOG(WARNING) << "Fail to send hello message to server:" << s->description(); s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s", @@ -554,7 +559,7 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) { // In older versions of brpc, IBV_MTU_1024 is the default mtu type, // So we set remote_mtu IBV_MTU_1024 at default to be ompatible with older versions. uint16_t remote_mtu_type = IBV_MTU_1024; - if (remote_msg.msg_len > HELLO_MSG_LEN_MIN) { + if (FLAGS_rdma_extend && remote_msg.msg_len > HELLO_MSG_LEN_MIN) { // Read Hello Message customized data uint16_t remote_msg_ext_len = remote_msg.msg_len - HELLO_MSG_LEN_MIN; uint8_t ext_data[remote_msg_ext_len]; @@ -688,7 +693,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { // In older versions of brpc, IBV_MTU_1024 is the default mtu type, // So we set remote_mtu IBV_MTU_1024 at default to be ompatible with older versions. uint16_t remote_mtu_type = IBV_MTU_1024; - if (remote_msg.msg_len > HELLO_MSG_LEN_MIN) { + if (FLAGS_rdma_extend && remote_msg.msg_len > HELLO_MSG_LEN_MIN) { // Read Hello Message customized data uint16_t remote_msg_ext_len = remote_msg.msg_len - HELLO_MSG_LEN_MIN; uint8_t ext_data[remote_msg_ext_len]; @@ -766,8 +771,12 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { } memcpy(data, MAGIC_STR, 4); local_msg.BaseSerialize((char*)data + 4); - local_msg.ExtSerialize((char*)data + HELLO_MSG_LEN_MIN); - if (ep->WriteToFd(data, g_rdma_hello_msg_len) < 0) { + // If FLAGS_rdma_extend is not open, only send base fields of HelloMessage + if (FLAGS_rdma_extend) { + local_msg.ExtSerialize((char*)data + HELLO_MSG_LEN_MIN); + } + size_t msg_len = FLAGS_rdma_extend ? g_rdma_hello_msg_len : HELLO_MSG_LEN_MIN; + if (ep->WriteToFd(data, msg_len) < 0) { const int saved_errno = errno; PLOG(WARNING) << "Fail to send Hello Message to client:" << s->description(); s->SetFailed(saved_errno, "Fail to complete rdma handshake from %s: %s", From 19ca407331593dcb27cf58ad64f011139ead9973 Mon Sep 17 00:00:00 2001 From: randomkang <550941794@qq.com> Date: Thu, 21 May 2026 23:39:47 +0800 Subject: [PATCH 7/7] IbvQueryPort --- src/brpc/rdma/rdma_helper.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/brpc/rdma/rdma_helper.cpp b/src/brpc/rdma/rdma_helper.cpp index 2fca644c3b..97afe604ff 100644 --- a/src/brpc/rdma/rdma_helper.cpp +++ b/src/brpc/rdma/rdma_helper.cpp @@ -460,8 +460,8 @@ static ibv_context* OpenDevice(int num_total, int* num_available_devices) { static uint16_t detect_mtu(struct ibv_context* ctx, int port_num) { struct ibv_port_attr port_attr; - if (ibv_query_port(ctx, port_num, &port_attr)) { - LOG(ERROR) << "ibv_query_port failed"; + if (IbvQueryPort(ctx, port_num, &port_attr)) { + LOG(ERROR) << "IbvQueryPort failed"; return 0; }