Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions docs/cn/combo_channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,12 @@ public:
const google::protobuf::MethodDescriptor* method,
const google::protobuf::Message* request,
google::protobuf::Message* response) = 0;

virtual void MapController(int channel_index/*starting from 0*/, int channel_count,
const Controller* main_cntl, Controller* sub_cntl);
};
```
### Map

channel_index:该sub channel在ParallelChannel中的位置,从0开始计数。

Expand Down Expand Up @@ -124,6 +128,18 @@ method/request/response:ParallelChannel.CallMethod()的参数。
};
```

### MapController

channel_index:该sub channel在ParallelChannel中的位置,从0开始计数。

channel_count:ParallelChannel中sub channel的数量。

main_cntl:ParallelChannel.CallMethod()的参数。

sub_cntl:sub channel的请求对应的controller。默认实现:拷贝main_cntl的http_request和request_attachment到sub_cntl中。

注意:修改ClientSettings相关配置(如超时、重试等)是无效的,因为所有sub_cntl都是使用main_cntl的ClientSettings配置。

## ResponseMerger

response_merger把sub channel的response合并入总的response,其为NULL时,则使用response->MergeFrom(*sub_response),MergeFrom的行为可概括为“除了合并repeated字段,其余都是覆盖”。如果你需要更复杂的行为,则需实现ResponseMerger。response_merger是一个个执行的,所以你并不需要考虑多个Merge同时运行的情况。response_merger在ParallelChannel析构时被删除。response_merger内含引用计数,一个response_merger可与多个sub channel关联。
Expand Down
14 changes: 14 additions & 0 deletions docs/en/combo_channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public:
};
```

### Map

`channel_index`: The position of the sub channel inside `ParallelChannel`, starting from zero.

`channel_count`: The sub channel count inside `ParallelChannel`.
Expand Down Expand Up @@ -131,6 +133,18 @@ Common implementations of `Map()` are listed below:
};
```

### MapController

`channel_index`: The position of the sub channel inside `ParallelChannel`, starting from zero.

`channel_count`: The sub channel count inside `ParallelChannel`.

`main_cntl`:Parameters to `ParallelChannel::CallMethod()`.

`sub_cntl`:The controller corresponding to the sub-channel's requests. Default implementation: Copy the http_request and request_attachment of `main_cntl` to the `sub_cntl`.

Note: Modifying `ClientSettings` configurations (such as timeout and retries) is ineffective because all sub controllers use the `ClientSettings` configuration of `main_cntl`.

## ResponseMerger

`response_merger` merges responses from all sub channels into one for the `ParallelChannel`. When it's NULL, `response->MergeFrom(*sub_response)` is used instead, whose behavior can be summarized as "merge repeated fields and overwrite the rest". If you need more complex behavior, implement `ResponseMerger`. Multiple `response_merger` are called one by one to merge sub responses so that you do not need to consider the race conditions between merging multiple responses simultaneously. The object is deleted when `ParallelChannel ` destructs. Due to the reference counting inside, `response_merger ` can be associated with multiple sub channels.
Expand Down
23 changes: 20 additions & 3 deletions src/brpc/parallel_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,7 @@ void ParallelChannel::CallMethod(
int ndone = nchan;
int fail_limit = 1;
int success_limit = 1;
Controller::ClientSettings settings{};
DEFINE_SMALL_ARRAY(SubCall, aps, nchan, 64);

if (cntl->FailedInline()) {
Expand Down Expand Up @@ -718,12 +719,28 @@ void ParallelChannel::CallMethod(
d->SaveThreadInfoOfCallsite();
CHECK_EQ(0, bthread_id_unlock(cid));
// Don't touch `cntl' and `d' again (for async RPC)


// Apply client settings of _cntl to controllers of sub calls, except
// timeout. If we let sub channel do their timeout separately, when
// timeout happens, we get ETOOMANYFAILS rather than ERPCTIMEDOUT.
cntl->SaveClientSettings(&settings);
settings.timeout_ms = -1;
for (int i = 0, j = 0; i < nchan; ++i) {
if (!aps[i].is_skip()) {
ParallelChannelDone::SubDone* sd = d->sub_done(j++);
if (NULL != _chans[i].call_mapper) {
_chans[i].call_mapper->MapController(i, nchan, cntl, &sd->cntl);
} else {
// Forward the attachment to each sub call.
sd->cntl.request_attachment().append(cntl->request_attachment());
}
sd->cntl.ApplyClientSettings(settings);
sd->cntl.allow_done_to_run_in_place();
}
}
for (int i = 0, j = 0; i < nchan; ++i) {
if (!aps[i].is_skip()) {
ParallelChannelDone::SubDone* sd = d->sub_done(j++);
// Forward the attachment to each sub call
sd->cntl.request_attachment().append(cntl->request_attachment());
_chans[i].chan->CallMethod(sd->ap.method, &sd->cntl,
sd->ap.request, sd->ap.response, sd);
}
Expand Down
16 changes: 15 additions & 1 deletion src/brpc/parallel_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,28 @@ struct SubCall {
// }
// return SubCall(sub_method, request->sub_request(channel_index),
// response->add_sub_response(), 0);
// MapController calls to ParallelChannel to sub channels, which can have
// different controllers.
// Note:
// Modifying ClientSettings configurations (such as timeout, retries, etc.)
// is ineffective because all sub-controllers use the main controller's
// ClientSettings configuration.
// Examples:
// sub_cntl->http_request().SetHeader(...);
class CallMapper : public SharedObject {
public:
virtual SubCall Map(int channel_index/*starting from 0*/,
int channel_count,
const google::protobuf::MethodDescriptor* method,
const google::protobuf::Message* request,
google::protobuf::Message* response) {
return Map(channel_index, method, request, response);
return Map(channel_index, method, request, response);
}

virtual void MapController(int channel_index/*starting from 0*/, int channel_count,
const Controller* main_cntl, Controller* sub_cntl) {
// Forward the attachment to each sub call by default.
sub_cntl->request_attachment().append(main_cntl->request_attachment());
}

protected:
Expand Down
2 changes: 1 addition & 1 deletion src/brpc/policy/redis_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ void SerializeRedisRequest(butil::IOBuf* buf,
const RedisRequest* rr = (const RedisRequest*)request;
// If redis byte size is zero, brpc call will fail with E22. Continuous E22 may cause E112 in the end.
// So set failed and return useful error message
if (rr->ByteSize() == 0) {
if (GetProtobufByteSize(*rr) == 0) {
return cntl->SetFailed(EREQUEST, "request byte size is empty");
}
// We work around SerializeTo of pb which is just a placeholder.
Expand Down
12 changes: 6 additions & 6 deletions src/brpc/selective_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,13 +344,13 @@ int Sender::IssueRPC(int64_t start_realtime_us) {
sub_cntl->set_request_code(_main_cntl->request_code());
// Forward request attachment to the subcall
sub_cntl->request_attachment().append(_main_cntl->request_attachment());
sub_cntl->http_request() = _main_cntl->http_request();
ProtocolType protocol = _main_cntl->request_protocol();
if (PROTOCOL_HTTP == protocol || PROTOCOL_H2 == protocol) {
sub_cntl->http_request() = _main_cntl->http_request();
}

sel_out.channel()->CallMethod(_main_cntl->_method,
&r.sub_done->_cntl,
_request,
r.response,
r.sub_done);
sel_out.channel()->CallMethod(_main_cntl->_method, &r.sub_done->_cntl,
_request, r.response, r.sub_done);
return 0;
}

Expand Down
3 changes: 1 addition & 2 deletions src/brpc/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -896,8 +896,7 @@ void Socket::OnFailed(int error_code, const std::string& error_text) {
// comes online.
if (HCEnabled()) {
GetOrNewSharedPart()->circuit_breaker.MarkAsBroken();
StartHealthCheck(id(),
GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms());
StartHealthCheck(id(), GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms());
}
// Wake up all threads waiting on EPOLLOUT when closing fd
_epollout_butex->fetch_add(1, butil::memory_order_relaxed);
Expand Down
75 changes: 68 additions & 7 deletions test/brpc_channel_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,16 @@ class MyEchoService : public ::test::EchoService {
res->add_code_list(req->code());
}
res->set_receiving_socket_id(cntl->_current_call.sending_sock->id());

brpc::ProtocolType protocol = cntl->request_protocol();
if ((brpc::PROTOCOL_HTTP == protocol || brpc::PROTOCOL_H2 == protocol) &&
!req->http_header().empty()) {
ASSERT_FALSE(req->http_header().empty());
const std::string* val = cntl->http_request().GetHeader(req->http_header());
ASSERT_TRUE(val);
ASSERT_FALSE(val->empty());
cntl->http_response().SetHeader(req->http_header(), *val);
}
}
static void CallAfterRpc(std::shared_ptr<CallAfterRpcObject> str,
brpc::Controller* cntl,
Expand Down Expand Up @@ -310,8 +320,10 @@ class ChannelTest : public ::testing::Test{
bool short_connection,
const brpc::Authenticator* auth = NULL,
std::string connection_group = std::string(),
bool use_backup_request_policy = false) {
bool use_backup_request_policy = false,
brpc::ProtocolType protocol = brpc::PROTOCOL_BAIDU_STD) {
brpc::ChannelOptions opt;
opt.protocol = protocol;
if (short_connection) {
opt.connection_type = brpc::CONNECTION_TYPE_SHORT;
}
Expand Down Expand Up @@ -526,7 +538,7 @@ class ChannelTest : public ::testing::Test{
int channel_index,
const google::protobuf::MethodDescriptor* method,
const google::protobuf::Message* req_base,
google::protobuf::Message* response) {
google::protobuf::Message* response) override {
test::EchoRequest* req = brpc::Clone<test::EchoRequest>(req_base);
req->set_code(channel_index + 1/*non-zero*/);
return brpc::SubCall(method, req, response->New(),
Expand All @@ -540,7 +552,7 @@ class ChannelTest : public ::testing::Test{
int channel_index,
const google::protobuf::MethodDescriptor* method,
const google::protobuf::Message* req_base,
google::protobuf::Message* response) {
google::protobuf::Message* response) override {
if (channel_index % 2) {
return brpc::SubCall::Skip();
}
Expand All @@ -554,7 +566,7 @@ class ChannelTest : public ::testing::Test{
int channel_index,
const google::protobuf::MethodDescriptor* method,
const google::protobuf::Message* req_base,
google::protobuf::Message* res_base) {
google::protobuf::Message* res_base) override {
const test::ComboRequest* req =
dynamic_cast<const test::ComboRequest*>(req_base);
test::ComboResponse* res = dynamic_cast<test::ComboResponse*>(res_base);
Expand Down Expand Up @@ -1334,7 +1346,7 @@ class ChannelTest : public ::testing::Test{
int /*channel_index*/,
const google::protobuf::MethodDescriptor* method,
const google::protobuf::Message* req_base,
google::protobuf::Message* response) {
google::protobuf::Message* response) override {
test::EchoRequest* req = brpc::Clone<test::EchoRequest>(req_base);
req->set_sleep_us(70000); // 70ms
return brpc::SubCall(method, req, response->New(),
Expand Down Expand Up @@ -2357,7 +2369,7 @@ class BadCall : public brpc::CallMapper {
brpc::SubCall Map(int,
const google::protobuf::MethodDescriptor*,
const google::protobuf::Message*,
google::protobuf::Message*) {
google::protobuf::Message*) override {
return brpc::SubCall::Bad();
}
};
Expand All @@ -2384,7 +2396,7 @@ class SkipCall : public brpc::CallMapper {
brpc::SubCall Map(int,
const google::protobuf::MethodDescriptor*,
const google::protobuf::Message*,
google::protobuf::Message*) {
google::protobuf::Message*) override {
return brpc::SubCall::Skip();
}
};
Expand Down Expand Up @@ -2412,6 +2424,55 @@ TEST_F(ChannelTest, skip_all_channels) {
}
}

static const std::string ECHO_HTTP_HEADER = "echo-http-header";

class EchoHttpHeader : public brpc::CallMapper {
public:
brpc::SubCall Map(int channel_index, int channel_count,
const google::protobuf::MethodDescriptor* method,
const google::protobuf::Message* request,
google::protobuf::Message* response) override {
return brpc::SubCall(method, request, response->New(), brpc::DELETE_RESPONSE);
}

void MapController(int channel_index, int,
const brpc::Controller* main_cntl,
brpc::Controller* sub_cntl) override {
sub_cntl->http_request().SetHeader(ECHO_HTTP_HEADER, std::to_string(channel_index));
}
};

TEST_F(ChannelTest, http_header_parallel_channels) {
brpc::Server server;
MyEchoService service;
ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
brpc::ServerOptions opt;
ASSERT_EQ(0, server.Start(_ep, &opt));

const size_t NCHANS = 5;
brpc::ParallelChannel channel;
for (size_t i = 0; i < NCHANS; ++i) {
brpc::Channel* sub_chan = new brpc::Channel();
SetUpChannel(sub_chan, true, false, NULL, "", false, brpc::PROTOCOL_HTTP);
ASSERT_EQ(0, channel.AddChannel(sub_chan, brpc::OWNS_CHANNEL, new EchoHttpHeader, NULL));
}

brpc::Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(__FUNCTION__);
*req.mutable_http_header() = ECHO_HTTP_HEADER;
CallMethod(&channel, &cntl, &req, &res, false);

ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ((int)NCHANS, cntl.sub_count());
for (int i = 0; i < cntl.sub_count(); ++i) {
const brpc::Controller* sub_cntl = cntl.sub(i);
ASSERT_TRUE(NULL != sub_cntl) << "i=" << i;
ASSERT_EQ(std::to_string(i), *sub_cntl->http_response().GetHeader(ECHO_HTTP_HEADER));
}
}

TEST_F(ChannelTest, connection_failed_parallel) {
for (int i = 0; i <= 1; ++i) { // Flag SingleServer
for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
Expand Down
1 change: 1 addition & 0 deletions test/echo.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ message EchoRequest {
optional bool close_fd = 3;
optional int32 sleep_us = 4;
optional int32 server_fail = 5;
optional string http_header = 6;
};

message EchoResponse {
Expand Down
Loading