Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 34 additions & 4 deletions src/brpc/rdma/block_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,17 @@ DEFINE_int32(rdma_memory_pool_tls_cache_num, 128, "Number of cached block in tls
DEFINE_bool(rdma_memory_pool_user_specified_memory, false,
"If true, the user must call UserExtendBlockPool() to extend "
"memory. bRPC will not handle memory extension.");
DEFINE_string(rdma_recv_block_type, "default", "Default size type for recv WR: "
"default(8KB - 32B)/large(64KB - 32B)/huge(2MB - 32B)");

static RegisterCallback g_cb = NULL;

// Number of bytes in 1MB
static const size_t BYTES_IN_MB = 1048576;

static const int BLOCK_DEFAULT = 0; // 8KB
// static const int BLOCK_LARGE = 1; // 64KB
// static const int BLOCK_HUGE = 2; // 2MB
static const int BLOCK_LARGE = 1; // 64KB
static const int BLOCK_HUGE = 2; // 2MB
static const int BLOCK_SIZE_COUNT = 3;
static size_t g_block_size[BLOCK_SIZE_COUNT] = { 8192, 65536, 2 * BYTES_IN_MB };

Expand Down Expand Up @@ -183,7 +185,7 @@ static void* ExtendBlockPoolImpl(void* region_base, size_t region_size, int bloc

// Extend the block pool with a new region (with different region ID)
static void* ExtendBlockPool(size_t region_size, int block_type) {
if (region_size < 1) {
if (region_size < 1 || block_type < 0) {
errno = EINVAL;
return NULL;
}
Expand Down Expand Up @@ -325,7 +327,7 @@ bool InitBlockPool(RegisterCallback cb) {
}

if (ExtendBlockPool(FLAGS_rdma_memory_pool_initial_size_mb,
BLOCK_DEFAULT) != NULL) {
GetRdmaBlockType()) != NULL) {
return true;
Comment on lines 329 to 331
}
return false;
Expand Down Expand Up @@ -541,6 +543,34 @@ size_t GetBlockSize(int type) {
return g_block_size[type];
}

size_t GetRdmaBlockSize() {
if (FLAGS_rdma_recv_block_type == "default") {
return GetBlockSize(0);
} else if (FLAGS_rdma_recv_block_type == "large") {
return GetBlockSize(1);
} else if (FLAGS_rdma_recv_block_type == "huge") {
return GetBlockSize(2);
} else {
LOG(ERROR) << "rdma_recv_block_type incorrect "
<< "(valid value: default/large/huge)";
return 0;
}
}

int GetRdmaBlockType() {
if (FLAGS_rdma_recv_block_type == "default") {
return BLOCK_DEFAULT;
} else if (FLAGS_rdma_recv_block_type == "large") {
return BLOCK_LARGE;
} else if (FLAGS_rdma_recv_block_type == "huge") {
return BLOCK_HUGE;
} else {
LOG(ERROR) << "rdma_recv_block_type incorrect "
<< "(valid value: default/large/huge)";
return -1;
}
}

void DumpMemoryPoolInfo(std::ostream& os) {
if (!g_dump_mutex) {
return;
Expand Down
4 changes: 4 additions & 0 deletions src/brpc/rdma/block_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ uint32_t GetRegionId(const void* buf);
// type=3: BLOCK_HUGE(2MB)
size_t GetBlockSize(int type);

size_t GetRdmaBlockSize();

int GetRdmaBlockType();

// Dump memory pool information
void DumpMemoryPoolInfo(std::ostream& os);

Expand Down
11 changes: 2 additions & 9 deletions src/brpc/rdma/rdma_endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ DEFINE_int32(rdma_sq_size, 128, "SQ size for RDMA");
DEFINE_int32(rdma_rq_size, 128, "RQ size for RDMA");
DEFINE_bool(rdma_recv_zerocopy, true, "Enable zerocopy for receive side");
DEFINE_int32(rdma_zerocopy_min_size, 512, "The minimal size for receive zerocopy");
DEFINE_string(rdma_recv_block_type, "default", "Default size type for recv WR: "
"default(8KB - 32B)/large(64KB - 32B)/huge(2MB - 32B)");
DEFINE_int32(rdma_cqe_poll_once, 32, "The maximum of cqe number polled once.");
DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP.");
DEFINE_int32(rdma_prepared_qp_cnt, 1024, "Initial count of prepared QP.");
Expand Down Expand Up @@ -1628,13 +1626,8 @@ void RdmaEndpoint::DebugInfo(std::ostream& os, butil::StringPiece connector) con
}

int RdmaEndpoint::GlobalInitialize() {
if (FLAGS_rdma_recv_block_type == "default") {
g_rdma_recv_block_size = GetBlockSize(0) - IOBUF_BLOCK_HEADER_LEN;
} else if (FLAGS_rdma_recv_block_type == "large") {
g_rdma_recv_block_size = GetBlockSize(1) - IOBUF_BLOCK_HEADER_LEN;
} else if (FLAGS_rdma_recv_block_type == "huge") {
g_rdma_recv_block_size = GetBlockSize(2) - IOBUF_BLOCK_HEADER_LEN;
} else {
g_rdma_recv_block_size = GetRdmaBlockSize() - IOBUF_BLOCK_HEADER_LEN;
if (g_rdma_recv_block_size <= 0) {
LOG(ERROR) << "rdma_recv_block_type incorrect "
<< "(valid value: default/large/huge)";
errno = EINVAL;
Expand Down
1 change: 1 addition & 0 deletions src/brpc/rdma/rdma_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,7 @@ static void GlobalRdmaInitializeOrDieImpl() {
}

// Initialize RDMA memory pool (block_pool)
butil::SetDefaultBlockSize(GetRdmaBlockSize());
if (!InitBlockPool(RdmaRegisterMemory)) {
PLOG(ERROR) << "Fail to initialize RDMA memory pool";
Comment on lines 552 to 555
ExitWithError();
Expand Down
21 changes: 18 additions & 3 deletions src/butil/iobuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,24 @@
#include "butil/iobuf_profiler.h"

namespace butil {
static size_t default_block_size = 8192;

size_t GetDefaultBlockSize() {
return default_block_size;
}

// This is not thread safe
void SetDefaultBlockSize(size_t block_size) {
if (block_size <= 0) {
LOG(FATAL) << "block_size " << block_size << " should be bigger than 0!!!";
}
if (block_size / 4096 * 4096 != block_size) {
LOG(FATAL) << "block_size " << block_size << " should be multiply of 4096!!!";
}
Comment on lines +56 to +58
LOG(INFO) << "Update default_block_size from " << default_block_size << " to " << block_size;
default_block_size = block_size;
Comment on lines +45 to +60
}

namespace iobuf {

DEFINE_int32(iobuf_aligned_buf_block_size, 0, "iobuf aligned buf block size");
Expand Down Expand Up @@ -399,9 +417,6 @@ size_t IOBuf::block_count_hit_tls_threshold() {
BAIDU_CASSERT(sizeof(IOBuf::SmallView) == sizeof(IOBuf::BigView),
sizeof_small_and_big_view_should_equal);

BAIDU_CASSERT(IOBuf::DEFAULT_BLOCK_SIZE/4096*4096 == IOBuf::DEFAULT_BLOCK_SIZE,
sizeof_block_should_be_multiply_of_4096);

const IOBuf::Area IOBuf::INVALID_AREA;

IOBuf::IOBuf(const IOBuf& rhs) {
Expand Down
7 changes: 5 additions & 2 deletions src/butil/iobuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ struct ssl_st;

namespace butil {

size_t GetDefaultBlockSize();

void SetDefaultBlockSize(size_t block_size);

Comment on lines +55 to +58
// IOBuf is a non-continuous buffer that can be cut and combined w/o copying
// payload. It can be read from or flushed into file descriptors as well.
// IOBuf is [thread-compatible]. Namely using different IOBuf in different
Expand All @@ -67,7 +71,6 @@ friend class IOBufCutter;
friend class SingleIOBuf;

public:
static const size_t DEFAULT_BLOCK_SIZE = 8192;
static const size_t INITIAL_CAP = 32; // must be power of 2

struct Block;
Expand Down Expand Up @@ -775,4 +778,4 @@ inline void swap(butil::IOBuf& a, butil::IOBuf& b) {

#include "butil/iobuf_inl.h"

#endif // BUTIL_IOBUF_H
#endif // BUTIL_IOBUF_H
4 changes: 2 additions & 2 deletions src/butil/iobuf_inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ inline IOBuf::Block* create_block(const size_t block_size) {
}

inline IOBuf::Block* create_block() {
return create_block(IOBuf::DEFAULT_BLOCK_SIZE);
return create_block(butil::GetDefaultBlockSize());
}

void* cp(void *__restrict dest, const void *__restrict src, size_t n);
Expand All @@ -649,4 +649,4 @@ void* cp(void *__restrict dest, const void *__restrict src, size_t n);

} // namespace butil

#endif // BUTIL_IOBUF_INL_H
#endif // BUTIL_IOBUF_INL_H
4 changes: 2 additions & 2 deletions src/butil/single_iobuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ IOBuf::Block* SingleIOBuf::alloc_block_by_size(uint32_t data_size) {
}
}
uint32_t total_size = data_size + sizeof(IOBuf::Block);
if (total_size <= IOBuf::DEFAULT_BLOCK_SIZE) {
if (total_size <= butil::GetDefaultBlockSize()) {
_cur_block = iobuf::acquire_tls_block();
if (_cur_block != NULL) {
if (_cur_block->left_space() >= data_size) {
Expand Down Expand Up @@ -282,4 +282,4 @@ void SingleIOBuf::target_block_dec_ref(void* b) {
block->dec_ref();
}

} // namespace butil
} // namespace butil
6 changes: 3 additions & 3 deletions test/iobuf_unittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ extern IOBuf::Block* get_portal_next(IOBuf::Block const* b);
namespace {

const size_t BLOCK_OVERHEAD = 32; //impl dependent
const size_t DEFAULT_PAYLOAD = butil::IOBuf::DEFAULT_BLOCK_SIZE - BLOCK_OVERHEAD;
const size_t DEFAULT_PAYLOAD = butil::GetDefaultBlockSize() - BLOCK_OVERHEAD;

void check_tls_block() {
ASSERT_EQ((butil::IOBuf::Block*)NULL, butil::iobuf::get_tls_block_head());
Expand Down Expand Up @@ -534,7 +534,7 @@ TEST_F(IOBufTest, iobuf_sanity) {
TEST_F(IOBufTest, copy_and_assign) {
install_debug_allocator();

const size_t TARGET_SIZE = butil::IOBuf::DEFAULT_BLOCK_SIZE * 2;
const size_t TARGET_SIZE = butil::GetDefaultBlockSize() * 2;
butil::IOBuf buf0;
buf0.append("hello");
ASSERT_EQ(1u, buf0._ref_num());
Expand Down Expand Up @@ -1115,7 +1115,7 @@ TEST_F(IOBufTest, extended_backup) {
// Consume the left TLS block so that cases are easier to check.
butil::iobuf::remove_tls_block_chain();
butil::IOBuf src;
const int BLKSIZE = (i == 0 ? 1024 : butil::IOBuf::DEFAULT_BLOCK_SIZE);
const int BLKSIZE = (i == 0 ? 1024 : butil::GetDefaultBlockSize());
const int PLDSIZE = BLKSIZE - BLOCK_OVERHEAD;
butil::IOBufAsZeroCopyOutputStream out_stream1(&src, BLKSIZE);
butil::IOBufAsZeroCopyOutputStream out_stream2(&src);
Expand Down
Loading