diff --git a/src/brpc/backup_request_policy.cpp b/src/brpc/backup_request_policy.cpp
new file mode 100644
index 0000000000..d2d867fd20
--- /dev/null
+++ b/src/brpc/backup_request_policy.cpp
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/backup_request_policy.h"
+
+#include <new> // std::nothrow
+#include <gflags/gflags.h> // DEFINE_double / DEFINE_int32
+#include "butil/logging.h"
+#include "brpc/reloadable_flags.h"
+#include "bvar/reducer.h"
+#include "bvar/window.h"
+#include "butil/atomicops.h"
+#include "butil/time.h"
+
+namespace brpc {
+
+DEFINE_double(backup_request_max_ratio, -1,
+    "Maximum ratio of backup requests to total requests. "
+    "Value in (0, 1] enables rate limiting. Values <= 0 disable it "
+    "(-1 is default). Can be overridden per-channel via "
+    "ChannelOptions.backup_request_max_ratio. "
+    "Note: takes effect at Channel::Init() time; changing this flag "
+    "at runtime does not affect already-created channels.");
+
+static bool validate_backup_request_max_ratio(const char*, double v) {
+    if (v <= 0) return true; // non-positive means disabled
+    if (v <= 1.0) return true; // (0, 1] enables limiting; NaN fails both tests
+    LOG(ERROR) << "Invalid backup_request_max_ratio=" << v
+               << ", must be <= 0 (disabled) or in (0, 1]";
+    return false;
+}
+BRPC_VALIDATE_GFLAG(backup_request_max_ratio,
+                    validate_backup_request_max_ratio);
+
+DEFINE_int32(backup_request_ratio_window_size_s, 10,
+    "Window size in seconds for computing the backup request ratio. "
+    "Must be in [1, 3600].");
+
+static bool validate_backup_request_ratio_window_size_s(
+        const char*, int32_t v) {
+    if (v >= 1 && v <= 3600) return true;
+    LOG(ERROR) << "Invalid backup_request_ratio_window_size_s=" << v
+               << ", must be in [1, 3600]";
+    return false;
+}
+BRPC_VALIDATE_GFLAG(backup_request_ratio_window_size_s,
+                    validate_backup_request_ratio_window_size_s);
+
+DEFINE_int32(backup_request_ratio_update_interval_s, 5,
+    "Interval in seconds between ratio cache updates. Must be >= 1.");
+
+static bool validate_backup_request_ratio_update_interval_s(
+        const char*, int32_t v) {
+    if (v >= 1) return true;
+    LOG(ERROR) << "Invalid backup_request_ratio_update_interval_s=" << v
+               << ", must be >= 1";
+    return false;
+}
+BRPC_VALIDATE_GFLAG(backup_request_ratio_update_interval_s,
+                    validate_backup_request_ratio_update_interval_s);
+
+// Standalone statistics module for tracking backup/total request ratio
+// within a sliding time window. Each instance schedules two bvar::Window
+// sampler tasks; keep this in mind for high channel-count deployments.
+class BackupRateLimiter {
+public:
+    BackupRateLimiter(double max_backup_ratio,
+                      int window_size_seconds,
+                      int update_interval_seconds)
+        : _max_backup_ratio(max_backup_ratio)
+        , _update_interval_us(update_interval_seconds * 1000000LL)
+        , _total_count()
+        , _backup_count()
+        , _total_window(&_total_count, window_size_seconds)
+        , _backup_window(&_backup_count, window_size_seconds)
+        , _cached_ratio(0.0)
+        , _last_update_us(0) {
+    }
+
+    // Returns true if another backup request fits under the max ratio. All
+    // atomics are relaxed on purpose: this is best-effort throttling, so a
+    // slightly stale cached ratio is acceptable.
+    bool ShouldAllow() const {
+        const int64_t now_us = butil::cpuwide_time_us();
+        int64_t last_us = _last_update_us.load(butil::memory_order_relaxed);
+        double ratio = _cached_ratio.load(butil::memory_order_relaxed);
+
+        if (now_us - last_us >= _update_interval_us) {
+            if (_last_update_us.compare_exchange_strong(
+                    last_us, now_us, butil::memory_order_relaxed)) { // CAS winner refreshes
+                int64_t total = _total_window.get_value();
+                int64_t backup = _backup_window.get_value();
+                ratio = (total > 0) ? static_cast<double>(backup) / total : 0.0;
+                _cached_ratio.store(ratio, butil::memory_order_relaxed);
+            }
+        }
+
+        // max_backup_ratio >= 1.0 means no limit (ratio cannot exceed 1.0).
+        bool allow = _max_backup_ratio >= 1.0 || ratio < _max_backup_ratio;
+        if (allow) {
+            // Count backup decisions immediately for faster feedback
+            // during latency spikes (before RPCs complete).
+            _backup_count << 1;
+        }
+        return allow;
+    }
+
+    void OnRPCEnd(const Controller* /*controller*/) {
+        // Count all completed RPCs. Backup decisions are counted
+        // in ShouldAllow() at decision time for faster feedback.
+        _total_count << 1;
+    }
+
+private:
+    double _max_backup_ratio; // limit on backup/total; >= 1.0 disables throttling
+    int64_t _update_interval_us; // minimum time between cached-ratio refreshes
+
+    bvar::Adder<int64_t> _total_count;
+    mutable bvar::Adder<int64_t> _backup_count;
+    bvar::Window<bvar::Adder<int64_t> > _total_window;
+    bvar::Window<bvar::Adder<int64_t> > _backup_window;
+
+    mutable butil::atomic<double> _cached_ratio; // backup/total as of the last refresh
+    mutable butil::atomic<int64_t> _last_update_us; // cpuwide_time_us() of the last refresh
+};
+
+// Internal BackupRequestPolicy that composes a BackupRateLimiter
+// for ratio-based suppression.
+class RateLimitedBackupPolicy : public BackupRequestPolicy {
+public:
+    RateLimitedBackupPolicy(int32_t backup_request_ms,
+                            double max_backup_ratio,
+                            int window_size_seconds,
+                            int update_interval_seconds)
+        : _backup_request_ms(backup_request_ms)
+        , _rate_limiter(max_backup_ratio, window_size_seconds,
+                        update_interval_seconds) {
+    }
+
+    int32_t GetBackupRequestMs(const Controller* /*controller*/) const override {
+        return _backup_request_ms;
+    }
+
+    bool DoBackup(const Controller* /*controller*/) const override {
+        return _rate_limiter.ShouldAllow();
+    }
+
+    void OnRPCEnd(const Controller* controller) override {
+        _rate_limiter.OnRPCEnd(controller);
+    }
+
+private:
+    int32_t _backup_request_ms;
+    BackupRateLimiter _rate_limiter;
+};
+
+BackupRequestPolicy* CreateRateLimitedBackupPolicy(
+        int32_t backup_request_ms,
+        double max_backup_ratio,
+        int window_size_seconds,
+        int update_interval_seconds) {
+    if (backup_request_ms < 0) {
+        LOG(ERROR) << "Invalid backup_request_ms=" << backup_request_ms
+                   << ", must be >= 0";
+        return NULL;
+    }
+    if (!(max_backup_ratio > 0 && max_backup_ratio <= 1.0)) { // negated form also rejects NaN
+        LOG(ERROR) << "Invalid max_backup_ratio=" << max_backup_ratio
+                   << ", must be in (0, 1]";
+        return NULL;
+    }
+    if (window_size_seconds < 1 || window_size_seconds > 3600) {
+        LOG(ERROR) << "Invalid window_size_seconds=" << window_size_seconds
+                   << ", must be in [1, 3600]";
+        return NULL;
+    }
+    if (update_interval_seconds < 1) {
+        LOG(ERROR) << "Invalid update_interval_seconds="
+                   << update_interval_seconds << ", must be >= 1";
+        return NULL;
+    }
+    RateLimitedBackupPolicy* policy = new (std::nothrow) RateLimitedBackupPolicy(
+        backup_request_ms, max_backup_ratio,
+        window_size_seconds, update_interval_seconds);
+    if (!policy) {
+        LOG(ERROR) << "Fail to allocate RateLimitedBackupPolicy";
+    }
+    return policy;
+}
+
+} // namespace brpc
diff --git a/src/brpc/backup_request_policy.h b/src/brpc/backup_request_policy.h
index ea254f1dbf..a6219dc6f0 100644
--- a/src/brpc/backup_request_policy.h
+++ b/src/brpc/backup_request_policy.h
@@ -38,6 +38,24 @@ class BackupRequestPolicy {
     virtual void OnRPCEnd(const Controller* controller) = 0;
 };
 
+// Create a BackupRequestPolicy that limits the ratio of backup requests
+// to total requests within a sliding time window. When the ratio reaches
+// or exceeds max_backup_ratio, DoBackup() returns false (1.0 never limits).
+// NOTE: Backup decisions are counted immediately at DoBackup() time for
+// fast feedback. Total RPCs are counted on completion (OnRPCEnd). During
+// latency spikes the ratio may temporarily lag until RPCs complete.
+// Returns NULL on invalid parameters or allocation failure.
+// backup_request_ms: >= 0
+// max_backup_ratio: (0, 1]
+// window_size_seconds: [1, 3600]
+// update_interval_seconds: >= 1
+// The caller owns the returned pointer.
+BackupRequestPolicy* CreateRateLimitedBackupPolicy(
+    int32_t backup_request_ms,
+    double max_backup_ratio,
+    int window_size_seconds,
+    int update_interval_seconds);
+
 }
 
 #endif // BRPC_BACKUP_REQUEST_POLICY_H
diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp
index 86124c2552..d130a0c9df 100644
--- a/src/brpc/channel.cpp
+++ b/src/brpc/channel.cpp
@@ -20,6 +20,7 @@
 #include
 #include
 #include
+#include "butil/memory/scope_guard.h"
 #include "butil/time.h" // milliseconds_from_now
 #include "butil/logging.h"
 #include "butil/third_party/murmurhash3/murmurhash3.h"
@@ -43,6 +44,9 @@ namespace brpc {
 
 DECLARE_bool(enable_rpcz);
 DECLARE_bool(usercode_in_pthread);
+DECLARE_double(backup_request_max_ratio);
+DECLARE_int32(backup_request_ratio_window_size_s);
+DECLARE_int32(backup_request_ratio_update_interval_s);
 DEFINE_string(health_check_path, "", "Http path of health check call."
               "By default health check succeeds if the server is connectable."
               "If this flag is set, health check is not completed until a http "
@@ -63,6 +67,7 @@ ChannelOptions::ChannelOptions()
     , log_succeed_without_server(true)
     , socket_mode(SOCKET_MODE_TCP)
     , auth(NULL)
+    , backup_request_max_ratio(-1)
     , backup_request_policy(NULL)
     , retry_policy(NULL)
     , ns_filter(NULL)
@@ -164,6 +169,7 @@ Channel::Channel(ProfilerLinker)
     , _serialize_request(NULL)
     , _pack_request(NULL)
     , _get_method_name(NULL)
+    , _owns_backup_policy(false)
     , _preferred_index(-1) {
 }
 
@@ -172,12 +178,45 @@ Channel::~Channel() {
         const ChannelSignature sig = ComputeChannelSignature(_options);
         SocketMapRemove(SocketMapKey(_server_address, sig));
     }
+    // Delete the internally-created backup policy. As with a user-provided
+    // backup_request_policy, the caller must ensure no async RPCs are
+    // in-flight when the Channel is destroyed.
+    if (_owns_backup_policy) {
+        delete _options.backup_request_policy;
+    }
 }
 
 int Channel::InitChannelOptions(const ChannelOptions* options) {
+    // Save any previously created internal backup policy (re-Init case).
+    // Deletion is deferred to the end so that a failed re-Init can roll back.
+    BackupRequestPolicy* old_backup_policy = NULL;
+    bool had_old_policy = _owns_backup_policy;
+    if (_owns_backup_policy) {
+        old_backup_policy = _options.backup_request_policy;
+        _options.backup_request_policy = NULL;
+        _owns_backup_policy = false;
+    }
+
+    // On failure, restore the old policy. On success, delete it.
+    bool init_success = false;
+    BRPC_SCOPE_EXIT {
+        if (!init_success && had_old_policy) {
+            _options.backup_request_policy = old_backup_policy;
+            _owns_backup_policy = true;
+        } else {
+            delete old_backup_policy;
+        }
+    };
+
     if (options) {
         // Override default options if user provided one.
         _options = *options;
+        // If the incoming options reused the old internal policy pointer,
+        // treat it as NULL since it will be deleted on success.
+        if (_options.backup_request_policy == old_backup_policy &&
+            old_backup_policy != NULL) {
+            _options.backup_request_policy = NULL;
+        }
     }
     const Protocol* protocol = FindProtocol(_options.protocol);
     if (NULL == protocol || !protocol->support_client()) {
@@ -242,6 +281,37 @@ int Channel::InitChannelOptions(const ChannelOptions* options) {
     if (!cg.empty() && (::isspace(cg.front()) || ::isspace(cg.back()))) {
         butil::TrimWhitespace(cg, butil::TRIM_ALL, &cg);
     }
+
+    // Create rate-limited backup policy if configured.
+    // A non-negative per-channel option takes precedence over the global gflag.
+    double max_ratio = _options.backup_request_max_ratio;
+    if (max_ratio < 0) {
+        max_ratio = FLAGS_backup_request_max_ratio;
+    }
+    if (max_ratio > 1.0) {
+        LOG(WARNING) << "backup_request_max_ratio=" << max_ratio
+                     << " is out of range (0, 1], clamped to 1.0";
+        max_ratio = 1.0;
+    }
+    // User-provided backup_request_policy takes precedence; only warn here.
+    if (_options.backup_request_policy != NULL && max_ratio > 0) {
+        LOG(WARNING) << "backup_request_max_ratio=" << max_ratio
+                     << " is ignored because backup_request_policy is already set";
+    }
+    if (max_ratio > 0 && _options.backup_request_policy == NULL &&
+        _options.backup_request_ms >= 0) {
+        BackupRequestPolicy* policy = CreateRateLimitedBackupPolicy(
+            _options.backup_request_ms, max_ratio,
+            FLAGS_backup_request_ratio_window_size_s,
+            FLAGS_backup_request_ratio_update_interval_s);
+        if (policy) {
+            _options.backup_request_policy = policy;
+            _owns_backup_policy = true;
+        } else {
+            LOG(ERROR) << "Fail to create rate-limited backup policy";
+        }
+    }
+    init_success = true;
     return 0;
 }
 
diff --git a/src/brpc/channel.h b/src/brpc/channel.h
index 7c257c05d3..e8357a8595 100644
--- a/src/brpc/channel.h
+++ b/src/brpc/channel.h
@@ -116,11 +116,26 @@ struct ChannelOptions {
     // Default: NULL
     const Authenticator* auth;
 
+    // Maximum ratio of backup requests to total requests within a sliding
+    // time window. When the ratio reaches or exceeds this value, backup
+    // requests are suppressed.
+    // Value in (0, 1] enables rate limiting (> 1 is clamped to 1.0). Negative
+    // (default -1) uses FLAGS_backup_request_max_ratio. 0 disables rate limiting.
+    // Only effective when backup_request_ms >= 0 and backup_request_policy
+    // is NULL (i.e. no custom policy). When effective, an internal
+    // rate-limited BackupRequestPolicy is created and used automatically.
+    // Default: -1 (use FLAGS_backup_request_max_ratio)
+    double backup_request_max_ratio;
+
     // Customize the backup request time and whether to send backup request.
-    // Priority: `backup_request_policy' > `backup_request_ms'.
-    // Overridable by Controller.set_backup_request_ms() or
-    // Controller.set_backup_request_policy().
-    // This object is NOT owned by channel and should remain valid when channel is used.
+    // Priority: `backup_request_policy' > `backup_request_max_ratio' > `backup_request_ms'.
+    // Overridable per-RPC by Controller.set_backup_request_ms() or
+    // Controller.set_backup_request_policy(). Note: per-RPC override
+    // replaces the entire channel-level backup config including any
+    // internal rate-limited policy created by backup_request_max_ratio.
+    // When user-supplied, this object is NOT owned by channel and should
+    // remain valid during channel's lifetime. When backup_request_max_ratio
+    // creates an internal policy, that policy IS owned by channel.
     // Default: NULL
     BackupRequestPolicy* backup_request_policy;
 
@@ -263,6 +278,7 @@ friend class SelectiveChannel;
     // the RPC above has finished
     butil::intrusive_ptr _lb;
     ChannelOptions _options;
+    bool _owns_backup_policy;
     int _preferred_index;
 };
 
diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp
index 66d1fbad9b..ac16bb7d00 100644
--- a/test/brpc_channel_unittest.cpp
+++ b/test/brpc_channel_unittest.cpp
@@ -26,6 +26,7 @@
 #include
 #include "butil/time.h"
 #include "butil/macros.h"
+#include "butil/memory/scope_guard.h"
 #include "butil/logging.h"
 #include "butil/files/temp_file.h"
 #include "brpc/socket.h"
@@ -46,6 +47,9 @@
 namespace brpc {
 DECLARE_int32(idle_timeout_second);
 DECLARE_int32(max_connection_pool_size);
+DECLARE_double(backup_request_max_ratio);
+DECLARE_int32(backup_request_ratio_window_size_s);
+DECLARE_int32(backup_request_ratio_update_interval_s);
 class Server;
 class MethodStatus;
 namespace policy {
@@ -2803,6 +2807,142 @@ TEST_F(ChannelTest, backup_request_policy) {
     }
 }
 
+TEST_F(ChannelTest, backup_request_rate_limited) {
+    ASSERT_EQ(0, StartAccept(_ep));
+
+    // Test 1: Policy is created when backup_request_max_ratio > 0
+    {
+        brpc::ChannelOptions opt;
+        opt.backup_request_ms = 10;
+        opt.backup_request_max_ratio = 0.3;
+        brpc::Channel channel;
+        ASSERT_EQ(0, channel.Init(_ep, &opt));
+        ASSERT_TRUE(channel.options().backup_request_policy != NULL);
+    }
+
+    // Test 2: No policy when per-channel ratio is -1 and global gflag is also disabled
+    {
+        const double saved = brpc::FLAGS_backup_request_max_ratio;
+        brpc::FLAGS_backup_request_max_ratio = -1;
+        BRPC_SCOPE_EXIT { brpc::FLAGS_backup_request_max_ratio = saved; };
+
+        brpc::ChannelOptions opt;
+        opt.backup_request_ms = 10;
+        opt.backup_request_max_ratio = -1; // fallback to gflag (-1) = disabled
+        brpc::Channel channel;
+        ASSERT_EQ(0, channel.Init(_ep, &opt));
+        ASSERT_TRUE(channel.options().backup_request_policy == NULL);
+    }
+
+    // Test 3: max_ratio=0 explicitly disables even when global gflag is set
+    {
+        const double saved = brpc::FLAGS_backup_request_max_ratio;
+        brpc::FLAGS_backup_request_max_ratio = 0.5;
+        BRPC_SCOPE_EXIT { brpc::FLAGS_backup_request_max_ratio = saved; };
+
+        brpc::ChannelOptions opt;
+        opt.backup_request_ms = 10;
+        opt.backup_request_max_ratio = 0; // explicitly disabled
+        brpc::Channel channel;
+        ASSERT_EQ(0, channel.Init(_ep, &opt));
+        ASSERT_TRUE(channel.options().backup_request_policy == NULL);
+    }
+
+    // Test 4: max_ratio=1.0 creates policy but allows all backups
+    {
+        brpc::ChannelOptions opt;
+        opt.backup_request_ms = 10;
+        opt.backup_request_max_ratio = 1.0;
+        brpc::Channel channel;
+        ASSERT_EQ(0, channel.Init(_ep, &opt));
+        ASSERT_TRUE(channel.options().backup_request_policy != NULL);
+    }
+
+    // Test 5: max_ratio > 1.0 is clamped to 1.0, policy still created
+    {
+        brpc::ChannelOptions opt;
+        opt.backup_request_ms = 10;
+        opt.backup_request_max_ratio = 2.0;
+        brpc::Channel channel;
+        ASSERT_EQ(0, channel.Init(_ep, &opt));
+        ASSERT_TRUE(channel.options().backup_request_policy != NULL);
+    }
+
+    // Test 6: Custom policy is preserved, not replaced
+    {
+        brpc::ChannelOptions opt;
+        opt.backup_request_ms = 10;
+        opt.backup_request_max_ratio = 0.3;
+        opt.backup_request_policy = &_backup_request_policy;
+        brpc::Channel channel;
+        ASSERT_EQ(0, channel.Init(_ep, &opt));
+        ASSERT_EQ(&_backup_request_policy,
+                  channel.options().backup_request_policy);
+    }
+
+    // Test 7: No policy when backup_request_ms < 0 (backup disabled)
+    {
+        brpc::ChannelOptions opt;
+        opt.backup_request_ms = -1;
+        opt.backup_request_max_ratio = 0.3;
+        brpc::Channel channel;
+        ASSERT_EQ(0, channel.Init(_ep, &opt));
+        ASSERT_TRUE(channel.options().backup_request_policy == NULL);
+    }
+
+    // Test 8: Functional — rate limiting reduces backup requests.
+    // Use RAII guard for gflag restore so cleanup runs even on ASSERT failure.
+    {
+        const int32_t saved_interval =
+            brpc::FLAGS_backup_request_ratio_update_interval_s;
+        const int32_t saved_window =
+            brpc::FLAGS_backup_request_ratio_window_size_s;
+        const double saved_ratio = brpc::FLAGS_backup_request_max_ratio;
+        brpc::FLAGS_backup_request_ratio_update_interval_s = 1;
+        brpc::FLAGS_backup_request_ratio_window_size_s = 2;
+        BRPC_SCOPE_EXIT {
+            brpc::FLAGS_backup_request_ratio_update_interval_s = saved_interval;
+            brpc::FLAGS_backup_request_ratio_window_size_s = saved_window;
+            brpc::FLAGS_backup_request_max_ratio = saved_ratio;
+        };
+
+        brpc::ChannelOptions opt;
+        opt.backup_request_ms = 10; // 10ms backup timeout
+        opt.backup_request_max_ratio = 0.3;
+        opt.timeout_ms = 500;
+        opt.max_retry = 1;
+
+        brpc::Channel channel;
+        ASSERT_EQ(0, channel.Init(_ep, &opt));
+        ASSERT_TRUE(channel.options().backup_request_policy != NULL);
+
+        int backup_count = 0;
+        const int N = 40;
+        for (int i = 0; i < N; ++i) {
+            test::EchoRequest req;
+            test::EchoResponse res;
+            brpc::Controller cntl;
+            req.set_message(__FUNCTION__);
+            req.set_sleep_us(50000); // 50ms > 10ms, triggers backup timer
+            CallMethod(&channel, &cntl, &req, &res, false);
+            EXPECT_EQ(0, cntl.ErrorCode()) << cntl.ErrorText();
+            if (cntl.has_backup_request()) {
+                ++backup_count;
+            }
+        }
+        // 40 requests * ~50ms = ~2s. Without rate limiting all 40 would
+        // get backup requests. With max_ratio=0.3, we expect fewer.
+        LOG(INFO) << "Rate-limited backup: " << backup_count << "/" << N;
+        EXPECT_LT(backup_count, N);
+        const int upper_bound = static_cast<int>(N * 0.3) + 10; // ratio share plus slack
+        EXPECT_LE(backup_count, upper_bound)
+            << "backup_count=" << backup_count
+            << " exceeded expected upper bound " << upper_bound;
+    }
+
+    StopAndJoin();
+}
+
 TEST_F(ChannelTest, multiple_threads_single_channel) {
     srand(time(NULL));
     ASSERT_EQ(0, StartAccept(_ep));