From 52c39a9997aa2e8187828ce54afeae58003d9a47 Mon Sep 17 00:00:00 2001 From: chenBright Date: Mon, 2 Feb 2026 19:50:34 +0800 Subject: [PATCH 1/2] Copy http headers from main controller to sub controller --- src/brpc/parallel_channel.cpp | 3 +++ src/brpc/selective_channel.cpp | 4 +++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/brpc/parallel_channel.cpp b/src/brpc/parallel_channel.cpp index 130712bfb9..66b8f0cb75 100644 --- a/src/brpc/parallel_channel.cpp +++ b/src/brpc/parallel_channel.cpp @@ -724,6 +724,9 @@ void ParallelChannel::CallMethod( ParallelChannelDone::SubDone* sd = d->sub_done(j++); // Forward the attachment to each sub call sd->cntl.request_attachment().append(cntl->request_attachment()); + if (PROTOCOL_HTTP == cntl->request_protocol()) { + sd->cntl.http_request() = cntl->http_request(); + } _chans[i].chan->CallMethod(sd->ap.method, &sd->cntl, sd->ap.request, sd->ap.response, sd); } diff --git a/src/brpc/selective_channel.cpp b/src/brpc/selective_channel.cpp index ec93354121..00810b04ed 100644 --- a/src/brpc/selective_channel.cpp +++ b/src/brpc/selective_channel.cpp @@ -344,7 +344,9 @@ int Sender::IssueRPC(int64_t start_realtime_us) { sub_cntl->set_request_code(_main_cntl->request_code()); // Forward request attachment to the subcall sub_cntl->request_attachment().append(_main_cntl->request_attachment()); - sub_cntl->http_request() = _main_cntl->http_request(); + if (PROTOCOL_HTTP == _main_cntl->request_protocol()) { + sub_cntl->http_request() = _main_cntl->http_request(); + } sel_out.channel()->CallMethod(_main_cntl->_method, &r.sub_done->_cntl, From 913dffd5e88a13d55fb572b2a97ed8941bfaed88 Mon Sep 17 00:00:00 2001 From: chenBright Date: Wed, 25 Feb 2026 17:08:01 +0800 Subject: [PATCH 2/2] Support custom modification of sub controllers --- docs/cn/combo_channel.md | 16 +++++++ docs/en/combo_channel.md | 14 ++++++ src/brpc/parallel_channel.cpp | 24 ++++++++-- src/brpc/parallel_channel.h | 16 ++++++- src/brpc/policy/redis_protocol.cpp | 2 +- src/brpc/selective_channel.cpp | 10 ++-- src/brpc/socket.cpp | 3 +- test/brpc_channel_unittest.cpp | 75 +++++++++++++++++++++++++++--- test/echo.proto | 1 + 9 files changed, 139 insertions(+), 22 deletions(-) diff --git a/docs/cn/combo_channel.md b/docs/cn/combo_channel.md index e11c79b46b..fba4f6be69 100644 --- a/docs/cn/combo_channel.md +++ b/docs/cn/combo_channel.md @@ -60,8 +60,12 @@ public: const google::protobuf::MethodDescriptor* method, const google::protobuf::Message* request, google::protobuf::Message* response) = 0; + + virtual void MapController(int channel_index/*starting from 0*/, int channel_count, + const Controller* main_cntl, Controller* sub_cntl); }; ``` +### Map channel_index:该sub channel在ParallelChannel中的位置,从0开始计数。 @@ -124,6 +128,18 @@ method/request/response:ParallelChannel.CallMethod()的参数。 }; ``` +### MapController + +channel_index:该sub channel在ParallelChannel中的位置,从0开始计数。 + +channel_count:ParallelChannel中sub channel的数量。 + +main_cntl:ParallelChannel.CallMethod()的参数。 + +sub_cntl:sub channel的请求对应的controller。默认实现:拷贝main_cntl的http_request和request_attachment到sub_cntl中。 + +注意:修改ClientSettings相关配置(如超时、重试等)是无效的,因为所有sub_cntl都是使用main_cntl的ClientSettings配置。 + ## ResponseMerger response_merger把sub channel的response合并入总的response,其为NULL时,则使用response->MergeFrom(*sub_response),MergeFrom的行为可概括为“除了合并repeated字段,其余都是覆盖”。如果你需要更复杂的行为,则需实现ResponseMerger。response_merger是一个个执行的,所以你并不需要考虑多个Merge同时运行的情况。response_merger在ParallelChannel析构时被删除。response_merger内含引用计数,一个response_merger可与多个sub channel关联。 diff --git a/docs/en/combo_channel.md b/docs/en/combo_channel.md index 686fad59c1..ab68188fe2 100644 --- a/docs/en/combo_channel.md +++ b/docs/en/combo_channel.md @@ -63,6 +63,8 @@ public: }; ``` +### Map + `channel_index`: The position of the sub channel inside `ParallelChannel`, starting from zero. `channel_count`: The sub channel count inside `ParallelChannel`. @@ -131,6 +133,18 @@ Common implementations of `Map()` are listed below: }; ``` +### MapController + +`channel_index`: The position of the sub channel inside `ParallelChannel`, starting from zero. + +`channel_count`: The sub channel count inside `ParallelChannel`. + +`main_cntl`:Parameters to `ParallelChannel::CallMethod()`. + +`sub_cntl`:The controller corresponding to the sub-channel's requests. Default implementation: Copy the http_request and request_attachment of `main_cntl` to the `sub_cntl`. + +Note: Modifying `ClientSettings` configurations (such as timeout and retries) is ineffective because all sub controllers use the `ClientSettings` configuration of `main_cntl`. + ## ResponseMerger `response_merger` merges responses from all sub channels into one for the `ParallelChannel`. When it's NULL, `response->MergeFrom(*sub_response)` is used instead, whose behavior can be summarized as "merge repeated fields and overwrite the rest". If you need more complex behavior, implement `ResponseMerger`. Multiple `response_merger` are called one by one to merge sub responses so that you do not need to consider the race conditions between merging multiple responses simultaneously. The object is deleted when `ParallelChannel ` destructs. Due to the reference counting inside, `response_merger ` can be associated with multiple sub channels. diff --git a/src/brpc/parallel_channel.cpp b/src/brpc/parallel_channel.cpp index 66b8f0cb75..de2b86f1c4 100644 --- a/src/brpc/parallel_channel.cpp +++ b/src/brpc/parallel_channel.cpp @@ -612,6 +612,7 @@ void ParallelChannel::CallMethod( int ndone = nchan; int fail_limit = 1; int success_limit = 1; + Controller::ClientSettings settings{}; DEFINE_SMALL_ARRAY(SubCall, aps, nchan, 64); if (cntl->FailedInline()) { @@ -718,15 +719,28 @@ void ParallelChannel::CallMethod( d->SaveThreadInfoOfCallsite(); CHECK_EQ(0, bthread_id_unlock(cid)); // Don't touch `cntl' and `d' again (for async RPC) - + + // Apply client settings of _cntl to controllers of sub calls, except + // timeout. If we let sub channel do their timeout separately, when + // timeout happens, we get ETOOMANYFAILS rather than ERPCTIMEDOUT. + cntl->SaveClientSettings(&settings); + settings.timeout_ms = -1; for (int i = 0, j = 0; i < nchan; ++i) { if (!aps[i].is_skip()) { ParallelChannelDone::SubDone* sd = d->sub_done(j++); - // Forward the attachment to each sub call - sd->cntl.request_attachment().append(cntl->request_attachment()); - if (PROTOCOL_HTTP == cntl->request_protocol()) { - sd->cntl.http_request() = cntl->http_request(); + if (NULL != _chans[i].call_mapper) { + _chans[i].call_mapper->MapController(i, nchan, cntl, &sd->cntl); + } else { + // Forward the attachment to each sub call. + sd->cntl.request_attachment().append(cntl->request_attachment()); } + sd->cntl.ApplyClientSettings(settings); + sd->cntl.allow_done_to_run_in_place(); + } + } + for (int i = 0, j = 0; i < nchan; ++i) { + if (!aps[i].is_skip()) { + ParallelChannelDone::SubDone* sd = d->sub_done(j++); _chans[i].chan->CallMethod(sd->ap.method, &sd->cntl, sd->ap.request, sd->ap.response, sd); } diff --git a/src/brpc/parallel_channel.h b/src/brpc/parallel_channel.h index 84e5f342cb..292213c1ad 100644 --- a/src/brpc/parallel_channel.h +++ b/src/brpc/parallel_channel.h @@ -91,6 +91,14 @@ struct SubCall { // } // return SubCall(sub_method, request->sub_request(channel_index), // response->add_sub_response(), 0); +// MapController calls to ParallelChannel to sub channels, which can have +// different controllers. +// Note: +// Modifying ClientSettings configurations (such as timeout, retries, etc.) +// is ineffective because all sub-controllers use the main controller's +// ClientSettings configuration. +// Examples: +// sub_cntl->http_request().SetHeader(...); class CallMapper : public SharedObject { public: virtual SubCall Map(int channel_index/*starting from 0*/, @@ -98,7 +106,13 @@ class CallMapper : public SharedObject { const google::protobuf::MethodDescriptor* method, const google::protobuf::Message* request, google::protobuf::Message* response) { - return Map(channel_index, method, request, response); + return Map(channel_index, method, request, response); + } + + virtual void MapController(int channel_index/*starting from 0*/, int channel_count, + const Controller* main_cntl, Controller* sub_cntl) { + // Forward the attachment to each sub call by default. + sub_cntl->request_attachment().append(main_cntl->request_attachment()); } protected: diff --git a/src/brpc/policy/redis_protocol.cpp b/src/brpc/policy/redis_protocol.cpp index f8acf49d6a..9e8e148ebf 100644 --- a/src/brpc/policy/redis_protocol.cpp +++ b/src/brpc/policy/redis_protocol.cpp @@ -283,7 +283,7 @@ void SerializeRedisRequest(butil::IOBuf* buf, const RedisRequest* rr = (const RedisRequest*)request; // If redis byte size is zero, brpc call will fail with E22. Continuous E22 may cause E112 in the end. // So set failed and return useful error message - if (rr->ByteSize() == 0) { + if (GetProtobufByteSize(*rr) == 0) { return cntl->SetFailed(EREQUEST, "request byte size is empty"); } // We work around SerializeTo of pb which is just a placeholder. diff --git a/src/brpc/selective_channel.cpp b/src/brpc/selective_channel.cpp index 00810b04ed..567ffa51b8 100644 --- a/src/brpc/selective_channel.cpp +++ b/src/brpc/selective_channel.cpp @@ -344,15 +344,13 @@ int Sender::IssueRPC(int64_t start_realtime_us) { sub_cntl->set_request_code(_main_cntl->request_code()); // Forward request attachment to the subcall sub_cntl->request_attachment().append(_main_cntl->request_attachment()); - if (PROTOCOL_HTTP == _main_cntl->request_protocol()) { + ProtocolType protocol = _main_cntl->request_protocol(); + if (PROTOCOL_HTTP == protocol || PROTOCOL_H2 == protocol) { sub_cntl->http_request() = _main_cntl->http_request(); } - sel_out.channel()->CallMethod(_main_cntl->_method, - &r.sub_done->_cntl, - _request, - r.response, - r.sub_done); + sel_out.channel()->CallMethod(_main_cntl->_method, &r.sub_done->_cntl, + _request, r.response, r.sub_done); return 0; } diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index b132f2acea..c123fb6b6e 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -896,8 +896,7 @@ void Socket::OnFailed(int error_code, const std::string& error_text) { // comes online. if (HCEnabled()) { GetOrNewSharedPart()->circuit_breaker.MarkAsBroken(); - StartHealthCheck(id(), - GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms()); + StartHealthCheck(id(), GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms()); } // Wake up all threads waiting on EPOLLOUT when closing fd _epollout_butex->fetch_add(1, butil::memory_order_relaxed); diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp index 66d1fbad9b..86bee89105 100644 --- a/test/brpc_channel_unittest.cpp +++ b/test/brpc_channel_unittest.cpp @@ -176,6 +176,16 @@ class MyEchoService : public ::test::EchoService { res->add_code_list(req->code()); } res->set_receiving_socket_id(cntl->_current_call.sending_sock->id()); + + brpc::ProtocolType protocol = cntl->request_protocol(); + if ((brpc::PROTOCOL_HTTP == protocol || brpc::PROTOCOL_H2 == protocol) && + !req->http_header().empty()) { + ASSERT_FALSE(req->http_header().empty()); + const std::string* val = cntl->http_request().GetHeader(req->http_header()); + ASSERT_TRUE(val); + ASSERT_FALSE(val->empty()); + cntl->http_response().SetHeader(req->http_header(), *val); + } } static void CallAfterRpc(std::shared_ptr str, brpc::Controller* cntl, @@ -310,8 +320,10 @@ class ChannelTest : public ::testing::Test{ bool short_connection, const brpc::Authenticator* auth = NULL, std::string connection_group = std::string(), - bool use_backup_request_policy = false) { + bool use_backup_request_policy = false, + brpc::ProtocolType protocol = brpc::PROTOCOL_BAIDU_STD) { brpc::ChannelOptions opt; + opt.protocol = protocol; if (short_connection) { opt.connection_type = brpc::CONNECTION_TYPE_SHORT; } @@ -526,7 +538,7 @@ class ChannelTest : public ::testing::Test{ int channel_index, const google::protobuf::MethodDescriptor* method, const google::protobuf::Message* req_base, - google::protobuf::Message* response) { + google::protobuf::Message* response) override { test::EchoRequest* req = brpc::Clone(req_base); req->set_code(channel_index + 1/*non-zero*/); return brpc::SubCall(method, req, response->New(), @@ -540,7 +552,7 @@ class ChannelTest : public ::testing::Test{ int channel_index, const google::protobuf::MethodDescriptor* method, const google::protobuf::Message* req_base, - google::protobuf::Message* response) { + google::protobuf::Message* response) override { if (channel_index % 2) { return brpc::SubCall::Skip(); } @@ -554,7 +566,7 @@ class ChannelTest : public ::testing::Test{ int channel_index, const google::protobuf::MethodDescriptor* method, const google::protobuf::Message* req_base, - google::protobuf::Message* res_base) { + google::protobuf::Message* res_base) override { const test::ComboRequest* req = dynamic_cast(req_base); test::ComboResponse* res = dynamic_cast(res_base); @@ -1334,7 +1346,7 @@ class ChannelTest : public ::testing::Test{ int /*channel_index*/, const google::protobuf::MethodDescriptor* method, const google::protobuf::Message* req_base, - google::protobuf::Message* response) { + google::protobuf::Message* response) override { test::EchoRequest* req = brpc::Clone(req_base); req->set_sleep_us(70000); // 70ms return brpc::SubCall(method, req, response->New(), @@ -2357,7 +2369,7 @@ class BadCall : public brpc::CallMapper { brpc::SubCall Map(int, const google::protobuf::MethodDescriptor*, const google::protobuf::Message*, - google::protobuf::Message*) { + google::protobuf::Message*) override { return brpc::SubCall::Bad(); } }; @@ -2384,7 +2396,7 @@ class SkipCall : public brpc::CallMapper { brpc::SubCall Map(int, const google::protobuf::MethodDescriptor*, const google::protobuf::Message*, - google::protobuf::Message*) { + google::protobuf::Message*) override { return brpc::SubCall::Skip(); } }; @@ -2412,6 +2424,55 @@ TEST_F(ChannelTest, skip_all_channels) { } } +static const std::string ECHO_HTTP_HEADER = "echo-http-header"; + +class EchoHttpHeader : public brpc::CallMapper { +public: + brpc::SubCall Map(int channel_index, int channel_count, + const google::protobuf::MethodDescriptor* method, + const google::protobuf::Message* request, + google::protobuf::Message* response) override { + return brpc::SubCall(method, request, response->New(), brpc::DELETE_RESPONSE); + } + + void MapController(int channel_index, int, + const brpc::Controller* main_cntl, + brpc::Controller* sub_cntl) override { + sub_cntl->http_request().SetHeader(ECHO_HTTP_HEADER, std::to_string(channel_index)); + } +}; + +TEST_F(ChannelTest, http_header_parallel_channels) { + brpc::Server server; + MyEchoService service; + ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE)); + brpc::ServerOptions opt; + ASSERT_EQ(0, server.Start(_ep, &opt)); + + const size_t NCHANS = 5; + brpc::ParallelChannel channel; + for (size_t i = 0; i < NCHANS; ++i) { + brpc::Channel* sub_chan = new brpc::Channel(); + SetUpChannel(sub_chan, true, false, NULL, "", false, brpc::PROTOCOL_HTTP); + ASSERT_EQ(0, channel.AddChannel(sub_chan, brpc::OWNS_CHANNEL, new EchoHttpHeader, NULL)); + } + + brpc::Controller cntl; + test::EchoRequest req; + test::EchoResponse res; + req.set_message(__FUNCTION__); + *req.mutable_http_header() = ECHO_HTTP_HEADER; + CallMethod(&channel, &cntl, &req, &res, false); + + ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText(); + ASSERT_EQ((int)NCHANS, cntl.sub_count()); + for (int i = 0; i < cntl.sub_count(); ++i) { + const brpc::Controller* sub_cntl = cntl.sub(i); + ASSERT_TRUE(NULL != sub_cntl) << "i=" << i; + ASSERT_EQ(std::to_string(i), *sub_cntl->http_response().GetHeader(ECHO_HTTP_HEADER)); + } +} + TEST_F(ChannelTest, connection_failed_parallel) { for (int i = 0; i <= 1; ++i) { // Flag SingleServer for (int j = 0; j <= 1; ++j) { // Flag Asynchronous diff --git a/test/echo.proto b/test/echo.proto index 970ef1dbb1..c9fa8acee4 100644 --- a/test/echo.proto +++ b/test/echo.proto @@ -27,6 +27,7 @@ message EchoRequest { optional bool close_fd = 3; optional int32 sleep_us = 4; optional int32 server_fail = 5; + optional string http_header = 6; }; message EchoResponse {