Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 205 additions & 0 deletions src/brpc/backup_request_policy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "brpc/backup_request_policy.h"

#include <new> // std::nothrow
#include <gflags/gflags.h>
#include "butil/logging.h"
#include "brpc/reloadable_flags.h"
#include "bvar/reducer.h"
#include "bvar/window.h"
#include "butil/atomicops.h"
#include "butil/time.h"

namespace brpc {

DEFINE_double(backup_request_max_ratio, -1,
"Maximum ratio of backup requests to total requests. "
"Value in (0, 1] enables rate limiting. Values <= 0 disable it "
"(-1 is default). Can be overridden per-channel via "
"ChannelOptions.backup_request_max_ratio. "
"Note: takes effect at Channel::Init() time; changing this flag "
"at runtime does not affect already-created channels.");

static bool validate_backup_request_max_ratio(const char*, double v) {
if (v <= 0) return true; // non-positive means disabled
if (v <= 1.0) return true;
LOG(ERROR) << "Invalid backup_request_max_ratio=" << v
<< ", must be <= 0 (disabled) or in (0, 1]";
return false;
}
BRPC_VALIDATE_GFLAG(backup_request_max_ratio,
validate_backup_request_max_ratio);

DEFINE_int32(backup_request_ratio_window_size_s, 10,
"Window size in seconds for computing the backup request ratio. "
"Must be in [1, 3600].");

static bool validate_backup_request_ratio_window_size_s(
const char*, int32_t v) {
if (v >= 1 && v <= 3600) return true;
LOG(ERROR) << "Invalid backup_request_ratio_window_size_s=" << v
<< ", must be in [1, 3600]";
return false;
}
BRPC_VALIDATE_GFLAG(backup_request_ratio_window_size_s,
validate_backup_request_ratio_window_size_s);

DEFINE_int32(backup_request_ratio_update_interval_s, 5,
"Interval in seconds between ratio cache updates. Must be >= 1.");

static bool validate_backup_request_ratio_update_interval_s(
const char*, int32_t v) {
if (v >= 1) return true;
LOG(ERROR) << "Invalid backup_request_ratio_update_interval_s=" << v
<< ", must be >= 1";
return false;
}
BRPC_VALIDATE_GFLAG(backup_request_ratio_update_interval_s,
validate_backup_request_ratio_update_interval_s);

// Standalone statistics module for tracking backup/total request ratio
// within a sliding time window. Each instance schedules two bvar::Window
// sampler tasks; keep this in mind for high channel-count deployments.
class BackupRateLimiter {
public:
BackupRateLimiter(double max_backup_ratio,
int window_size_seconds,
int update_interval_seconds)
: _max_backup_ratio(max_backup_ratio)
, _update_interval_us(update_interval_seconds * 1000000LL)
, _total_count()
, _backup_count()
, _total_window(&_total_count, window_size_seconds)
, _backup_window(&_backup_count, window_size_seconds)
, _cached_ratio(0.0)
, _last_update_us(0) {
}

// All atomic operations use relaxed ordering intentionally.
// This is best-effort rate limiting: a slightly stale ratio is
// acceptable for approximate throttling.
bool ShouldAllow() const {
const int64_t now_us = butil::cpuwide_time_us();
int64_t last_us = _last_update_us.load(butil::memory_order_relaxed);
double ratio = _cached_ratio.load(butil::memory_order_relaxed);

if (now_us - last_us >= _update_interval_us) {
if (_last_update_us.compare_exchange_strong(
last_us, now_us, butil::memory_order_relaxed)) {
int64_t total = _total_window.get_value();
int64_t backup = _backup_window.get_value();
ratio = (total > 0) ? static_cast<double>(backup) / total : 0.0;
_cached_ratio.store(ratio, butil::memory_order_relaxed);
}
}

// max_backup_ratio >= 1.0 means no limit (ratio cannot exceed 1.0).
bool allow = _max_backup_ratio >= 1.0 || ratio < _max_backup_ratio;
if (allow) {
// Count backup decisions immediately for faster feedback
// during latency spikes (before RPCs complete).
_backup_count << 1;
}
return allow;
}

void OnRPCEnd(const Controller* /*controller*/) {
// Count all completed RPCs. Backup decisions are counted
// in ShouldAllow() at decision time for faster feedback.
_total_count << 1;
}

private:
double _max_backup_ratio;
int64_t _update_interval_us;

bvar::Adder<int64_t> _total_count;
mutable bvar::Adder<int64_t> _backup_count;
bvar::Window<bvar::Adder<int64_t>> _total_window;
bvar::Window<bvar::Adder<int64_t>> _backup_window;

mutable butil::atomic<double> _cached_ratio;
mutable butil::atomic<int64_t> _last_update_us;
};

// Internal BackupRequestPolicy that composes a BackupRateLimiter
// for ratio-based suppression.
class RateLimitedBackupPolicy : public BackupRequestPolicy {
public:
RateLimitedBackupPolicy(int32_t backup_request_ms,
double max_backup_ratio,
int window_size_seconds,
int update_interval_seconds)
: _backup_request_ms(backup_request_ms)
, _rate_limiter(max_backup_ratio, window_size_seconds,
update_interval_seconds) {
}

int32_t GetBackupRequestMs(const Controller* /*controller*/) const override {
return _backup_request_ms;
}

bool DoBackup(const Controller* /*controller*/) const override {
return _rate_limiter.ShouldAllow();
}

void OnRPCEnd(const Controller* controller) override {
_rate_limiter.OnRPCEnd(controller);
}

private:
int32_t _backup_request_ms;
BackupRateLimiter _rate_limiter;
};

BackupRequestPolicy* CreateRateLimitedBackupPolicy(
int32_t backup_request_ms,
double max_backup_ratio,
int window_size_seconds,
int update_interval_seconds) {
if (backup_request_ms < 0) {
LOG(ERROR) << "Invalid backup_request_ms=" << backup_request_ms
<< ", must be >= 0";
return NULL;
}
if (max_backup_ratio <= 0 || max_backup_ratio > 1.0) {
LOG(ERROR) << "Invalid max_backup_ratio=" << max_backup_ratio
<< ", must be in (0, 1]";
return NULL;
}
if (window_size_seconds < 1 || window_size_seconds > 3600) {
LOG(ERROR) << "Invalid window_size_seconds=" << window_size_seconds
<< ", must be in [1, 3600]";
return NULL;
}
if (update_interval_seconds < 1) {
LOG(ERROR) << "Invalid update_interval_seconds="
<< update_interval_seconds << ", must be >= 1";
return NULL;
}
RateLimitedBackupPolicy* policy = new (std::nothrow) RateLimitedBackupPolicy(
backup_request_ms, max_backup_ratio,
window_size_seconds, update_interval_seconds);
if (!policy) {
LOG(ERROR) << "Fail to allocate RateLimitedBackupPolicy";
}
return policy;
}

} // namespace brpc
18 changes: 18 additions & 0 deletions src/brpc/backup_request_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,24 @@ class BackupRequestPolicy {
virtual void OnRPCEnd(const Controller* controller) = 0;
};

// Create a BackupRequestPolicy that limits the ratio of backup requests
// to total requests within a sliding time window. When the ratio reaches
// or exceeds max_backup_ratio, DoBackup() returns false.
// NOTE: Backup decisions are counted immediately at DoBackup() time for
// fast feedback. Total RPCs are counted on completion (OnRPCEnd). During
// latency spikes the ratio may temporarily lag until RPCs complete.
// Returns NULL on invalid parameters or allocation failure.
// backup_request_ms: >= 0
// max_backup_ratio: (0, 1]
// window_size_seconds: [1, 3600]
// update_interval_seconds: >= 1
// The caller owns the returned pointer.
BackupRequestPolicy* CreateRateLimitedBackupPolicy(
int32_t backup_request_ms,
double max_backup_ratio,
int window_size_seconds,
int update_interval_seconds);

}

#endif // BRPC_BACKUP_REQUEST_POLICY_H
70 changes: 70 additions & 0 deletions src/brpc/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <google/protobuf/descriptor.h>
#include <gflags/gflags.h>
#include <memory>
#include "butil/memory/scope_guard.h"
#include "butil/time.h" // milliseconds_from_now
#include "butil/logging.h"
#include "butil/third_party/murmurhash3/murmurhash3.h"
Expand All @@ -43,6 +44,9 @@ namespace brpc {

DECLARE_bool(enable_rpcz);
DECLARE_bool(usercode_in_pthread);
DECLARE_double(backup_request_max_ratio);
DECLARE_int32(backup_request_ratio_window_size_s);
DECLARE_int32(backup_request_ratio_update_interval_s);
DEFINE_string(health_check_path, "", "Http path of health check call."
"By default health check succeeds if the server is connectable."
"If this flag is set, health check is not completed until a http "
Expand All @@ -63,6 +67,7 @@ ChannelOptions::ChannelOptions()
, log_succeed_without_server(true)
, socket_mode(SOCKET_MODE_TCP)
, auth(NULL)
, backup_request_max_ratio(-1)
, backup_request_policy(NULL)
, retry_policy(NULL)
, ns_filter(NULL)
Expand Down Expand Up @@ -164,6 +169,7 @@ Channel::Channel(ProfilerLinker)
, _serialize_request(NULL)
, _pack_request(NULL)
, _get_method_name(NULL)
, _owns_backup_policy(false)
, _preferred_index(-1) {
}

Expand All @@ -172,12 +178,45 @@ Channel::~Channel() {
const ChannelSignature sig = ComputeChannelSignature(_options);
SocketMapRemove(SocketMapKey(_server_address, sig));
}
// Delete internally-created backup policy. Like user-provided
// backup_request_policy, the caller must ensure no async RPCs are
// in-flight when the Channel is destroyed.
if (_owns_backup_policy) {
delete _options.backup_request_policy;
}
}


int Channel::InitChannelOptions(const ChannelOptions* options) {
// Save any previously created internal backup policy (re-Init case).
// Deletion is deferred to the end so failed re-Init can rollback.
BackupRequestPolicy* old_backup_policy = NULL;
bool had_old_policy = _owns_backup_policy;
if (_owns_backup_policy) {
old_backup_policy = _options.backup_request_policy;
_options.backup_request_policy = NULL;
_owns_backup_policy = false;
}

// On failure, rollback old policy. On success, delete it.
bool init_success = false;
BRPC_SCOPE_EXIT {
if (!init_success && had_old_policy) {
_options.backup_request_policy = old_backup_policy;
_owns_backup_policy = true;
} else {
delete old_backup_policy;
}
};

if (options) { // Override default options if user provided one.
_options = *options;
// If the incoming options reused the old internal policy pointer,
// treat it as NULL since it will be deleted on success.
if (_options.backup_request_policy == old_backup_policy &&
old_backup_policy != NULL) {
_options.backup_request_policy = NULL;
}
}
const Protocol* protocol = FindProtocol(_options.protocol);
if (NULL == protocol || !protocol->support_client()) {
Expand Down Expand Up @@ -242,6 +281,37 @@ int Channel::InitChannelOptions(const ChannelOptions* options) {
if (!cg.empty() && (::isspace(cg.front()) || ::isspace(cg.back()))) {
butil::TrimWhitespace(cg, butil::TRIM_ALL, &cg);
}

// Create rate-limited backup policy if configured.
// Per-channel option takes precedence over the global gflag.
double max_ratio = _options.backup_request_max_ratio;
if (max_ratio < 0) {
max_ratio = FLAGS_backup_request_max_ratio;
}
if (max_ratio > 1.0) {
LOG(WARNING) << "backup_request_max_ratio=" << max_ratio
<< " is out of range (0, 1], clamped to 1.0";
max_ratio = 1.0;
}
// User-provided backup_request_policy takes precedence.
if (_options.backup_request_policy != NULL && max_ratio > 0) {
LOG(WARNING) << "backup_request_max_ratio=" << max_ratio
<< " is ignored because backup_request_policy is already set";
}
if (max_ratio > 0 && _options.backup_request_policy == NULL &&
_options.backup_request_ms >= 0) {
BackupRequestPolicy* policy = CreateRateLimitedBackupPolicy(
_options.backup_request_ms, max_ratio,
FLAGS_backup_request_ratio_window_size_s,
FLAGS_backup_request_ratio_update_interval_s);
if (policy) {
_options.backup_request_policy = policy;
_owns_backup_policy = true;
} else {
LOG(ERROR) << "Fail to create rate-limited backup policy";
}
}
init_success = true;
return 0;
}

Expand Down
24 changes: 20 additions & 4 deletions src/brpc/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,26 @@ struct ChannelOptions {
// Default: NULL
const Authenticator* auth;

// Maximum ratio of backup requests to total requests within a sliding
// time window. When the ratio reaches or exceeds this value, backup
// requests are suppressed.
// Value in (0, 1] enables rate limiting. -1 (default) uses global gflag
// FLAGS_backup_request_max_ratio. 0 explicitly disables rate limiting.
// Only effective when backup_request_ms >= 0 and backup_request_policy
// is NULL (i.e. no custom policy). When effective, an internal
// rate-limited BackupRequestPolicy is created and used automatically.
// Default: -1 (use FLAGS_backup_request_max_ratio)
double backup_request_max_ratio;

// Customize the backup request time and whether to send backup request.
// Priority: `backup_request_policy' > `backup_request_ms'.
// Overridable by Controller.set_backup_request_ms() or
// Controller.set_backup_request_policy().
// This object is NOT owned by channel and should remain valid when channel is used.
// Priority: `backup_request_policy' > `backup_request_max_ratio' > `backup_request_ms'.
// Overridable per-RPC by Controller.set_backup_request_ms() or
// Controller.set_backup_request_policy(). Note: per-RPC override
// replaces the entire channel-level backup config including any
// internal rate-limited policy created by backup_request_max_ratio.
// When user-supplied, this object is NOT owned by channel and should
// remain valid during channel's lifetime. When backup_request_max_ratio
// creates an internal policy, that policy IS owned by channel.
// Default: NULL
BackupRequestPolicy* backup_request_policy;

Expand Down Expand Up @@ -263,6 +278,7 @@ friend class SelectiveChannel;
// the RPC above has finished
butil::intrusive_ptr<SharedLoadBalancer> _lb;
ChannelOptions _options;
bool _owns_backup_policy;
int _preferred_index;
};

Expand Down
Loading
Loading