Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 51 additions & 11 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
run: |
sudo apt-get update
sudo apt-get -y install libsnappy-dev zlib1g-dev libstdc++6 liburing-dev libevent-dev libunwind-dev libgoogle-glog-dev
- name: unit-test
- name: test
run: LD_LIBRARY_PATH=/home/runner/work/Redis-On-Rocks/Redis-On-Rocks/deps/rocksdb/_build_folly/libs/glog/lib:/home/runner/work/Redis-On-Rocks/Redis-On-Rocks/deps/rocksdb/_build_folly/libs/libevent/lib make SWAP=1 unit-test -j8

# Memory-only mode tests (without SWAP)
Expand All @@ -43,6 +43,8 @@ jobs:
matrix:
platform: [ubuntu-24.04]
runs-on: ${{ matrix.platform }}
env:
MAX_MAKE_JOBS: 4
steps:
- uses: actions/checkout@v4
with:
Expand All @@ -51,17 +53,27 @@ jobs:
run: |
sudo apt-get update
sudo apt-get -y install libsnappy-dev zlib1g-dev libstdc++6 liburing-dev libevent-dev libunwind-dev libgoogle-glog-dev
- name: decide make jobs
run: |
CPU_COUNT=$(nproc)
MAKE_JOBS=$CPU_COUNT
if [ "$MAKE_JOBS" -gt "$MAX_MAKE_JOBS" ]; then
MAKE_JOBS=$MAX_MAKE_JOBS
fi
echo "MAKE_JOBS=$MAKE_JOBS" >> "$GITHUB_ENV"
- name: make
run: make -j8
run: make -j${MAKE_JOBS}
- name: test
run: make test -j8
run: make test -j${MAKE_JOBS}

# Memory-only mode with Address Sanitizer
mem-asan:
strategy:
matrix:
platform: [ubuntu-24.04]
runs-on: ${{ matrix.platform }}
env:
MAX_MAKE_JOBS: 4
steps:
- uses: actions/checkout@v4
with:
Expand All @@ -70,17 +82,27 @@ jobs:
run: |
sudo apt-get update
sudo apt-get -y install libsnappy-dev zlib1g-dev libstdc++6 liburing-dev libevent-dev libunwind-dev libgoogle-glog-dev
- name: decide make jobs
run: |
CPU_COUNT=$(nproc)
MAKE_JOBS=$CPU_COUNT
if [ "$MAKE_JOBS" -gt "$MAX_MAKE_JOBS" ]; then
MAKE_JOBS=$MAX_MAKE_JOBS
fi
echo "MAKE_JOBS=$MAKE_JOBS" >> "$GITHUB_ENV"
- name: make
run: make SANITIZER=address -j8
- name: make test
run: make test-asan -j8
run: make SANITIZER=address -j${MAKE_JOBS}
- name: test
run: make test-asan -j${MAKE_JOBS}

# SWAP mode tests (with RocksDB)
swap:
strategy:
matrix:
platform: [ubuntu-24.04]
runs-on: ${{ matrix.platform }}
env:
MAX_MAKE_JOBS: 2
steps:
- uses: actions/checkout@v4
with:
Expand All @@ -102,17 +124,27 @@ jobs:
run: |
sudo apt-get update
sudo apt-get -y install libsnappy-dev zlib1g-dev libstdc++6 liburing-dev libevent-dev libunwind-dev libgoogle-glog-dev
- name: decide make jobs
run: |
CPU_COUNT=$(nproc)
MAKE_JOBS=$CPU_COUNT
if [ "$MAKE_JOBS" -gt "$MAX_MAKE_JOBS" ]; then
MAKE_JOBS=$MAX_MAKE_JOBS
fi
echo "MAKE_JOBS=$MAKE_JOBS" >> "$GITHUB_ENV"
- name: make
run: make SWAP=1 -j8
run: make SWAP=1 -j${MAKE_JOBS}
- name: test
run: LD_LIBRARY_PATH=/home/runner/work/Redis-On-Rocks/Redis-On-Rocks/deps/rocksdb/_build_folly/libs/glog/lib:/home/runner/work/Redis-On-Rocks/Redis-On-Rocks/deps/rocksdb/_build_folly/libs/libevent/lib make SWAP=1 test -j8
run: LD_LIBRARY_PATH=/home/runner/work/Redis-On-Rocks/Redis-On-Rocks/deps/rocksdb/_build_folly/libs/glog/lib:/home/runner/work/Redis-On-Rocks/Redis-On-Rocks/deps/rocksdb/_build_folly/libs/libevent/lib make SWAP=1 test -j${MAKE_JOBS}

# SWAP mode with Address Sanitizer
swap-asan:
strategy:
matrix:
platform: [ubuntu-24.04]
runs-on: ${{ matrix.platform }}
env:
MAX_MAKE_JOBS: 2
steps:
- uses: actions/checkout@v4
with:
Expand All @@ -134,7 +166,15 @@ jobs:
run: |
sudo apt-get update
sudo apt-get -y install libsnappy-dev zlib1g-dev libstdc++6 liburing-dev libevent-dev libunwind-dev libgoogle-glog-dev
- name: decide make jobs
run: |
CPU_COUNT=$(nproc)
MAKE_JOBS=$CPU_COUNT
if [ "$MAKE_JOBS" -gt "$MAX_MAKE_JOBS" ]; then
MAKE_JOBS=$MAX_MAKE_JOBS
fi
echo "MAKE_JOBS=$MAKE_JOBS" >> "$GITHUB_ENV"
- name: make
run: make SWAP=1 SANITIZER=address -j8
- name: make test-asan
run: LD_LIBRARY_PATH=/home/runner/work/Redis-On-Rocks/Redis-On-Rocks/deps/rocksdb/_build_folly/libs/glog/lib:/home/runner/work/Redis-On-Rocks/Redis-On-Rocks/deps/rocksdb/_build_folly/libs/libevent/lib make SWAP=1 test-asan -j8
run: make SWAP=1 SANITIZER=address -j${MAKE_JOBS}
- name: test
run: LD_LIBRARY_PATH=/home/runner/work/Redis-On-Rocks/Redis-On-Rocks/deps/rocksdb/_build_folly/libs/glog/lib:/home/runner/work/Redis-On-Rocks/Redis-On-Rocks/deps/rocksdb/_build_folly/libs/libevent/lib make SWAP=1 test-asan -j${MAKE_JOBS}
8 changes: 8 additions & 0 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -566,11 +566,19 @@ distclean: clean

.PHONY: distclean

ifdef SWAP
test: $(REDIS_SERVER_NAME) $(REDIS_CHECK_AOF_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME)
@(cd ..; ./runtest --timeout 3600)

test-asan: $(REDIS_SERVER_NAME) $(REDIS_CHECK_AOF_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME)
@(cd ..; ./runtest --tags -nosanitizer --asan --timeout 3600)
else
test: $(REDIS_SERVER_NAME) $(REDIS_CHECK_AOF_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME)
@(cd ..; ./runtest)

test-asan: $(REDIS_SERVER_NAME) $(REDIS_CHECK_AOF_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME)
@(cd ..; ./runtest --tags -nosanitizer --asan)
endif

test-modules: $(REDIS_SERVER_NAME)
@(cd ..; ./runtest-moduleapi)
Expand Down
101 changes: 81 additions & 20 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,57 @@ static int updateClientOutputBufferLimit(sds *args, int arg_len, const char **er
return 1;
}

#ifdef ENABLE_SWAP
static int updateSwapBatchLimit(sds *args, int arg_len, const char **err) {
int j;
int intentions[SWAP_UTILS] = {0};
int counts[SWAP_UTILS] = {0};
unsigned long long mems[SWAP_UTILS] = {0};

if (arg_len == 0 || (arg_len % 3) != 0) {
if (err) *err = "wrong number of arguments";
return 0;
}

for (j = 0; j < arg_len; j += 3) {
int intention = getSwapIntentionByName(args[j]);
long long count;
int mem_err = 0;
unsigned long long mem;

if (intention <= 0 || intention == SWAP_UTILS) {
if (err) *err = "Unrecognized or unsupported swap intention name.";
return 0;
}

if (!string2ll(args[j+1], sdslen(args[j+1]), &count) ||
count < 0 || count > INT_MAX)
{
if (err) *err = "Invalid count setting in swap batch limit configuration.";
return 0;
}

mem = memtoull(args[j+2], &mem_err);
if (mem_err) {
if (err) *err = "Invalid memory setting in swap batch limit configuration.";
return 0;
}

intentions[intention] = 1;
counts[intention] = count;
mems[intention] = mem;
}

for (j = 1; j < SWAP_UTILS; j++) {
if (!intentions[j]) continue;
server.swap_batch_limits[j].count = counts[j];
server.swap_batch_limits[j].mem = mems[j];
}

return 1;
}
#endif

/* Note this is here to support detecting we're running a config set from
* within conf file parsing. This is only needed to support the deprecated
* abnormal aggregate `save T C` functionality. Remove in the future. */
Expand Down Expand Up @@ -597,22 +648,6 @@ void loadServerConfigFromString(char *config) {
err = "Target command name already exists"; goto loaderr;
}
}
#ifdef ENABLE_SWAP
} else if (!strcasecmp(argv[0],"swap-batch-limit") &&
argc == 4)
{
int intention = getSwapIntentionByName(argv[1]), count;
unsigned long long mem;

if (intention <= 0 || intention == SWAP_UTILS) {
err = "Unrecognized or unsupported swap intention name.";
goto loaderr;
}
count = atoi(argv[2]);
mem = memtoull(argv[3],NULL);
server.swap_batch_limits[intention].count = count;
server.swap_batch_limits[intention].mem = mem;
#endif
} else if (!strcasecmp(argv[0],"user") && argc >= 2) {
int argc_err;
if (ACLAppendUserForLoading(argv,argc,&argc_err) == C_ERR) {
Expand Down Expand Up @@ -1572,9 +1607,9 @@ void rewriteConfigClientOutputBufferLimitOption(standardConfig *config, const ch

#ifdef ENABLE_SWAP
/* Rewrite the swap-batch-limit option. */
void rewriteConfigSwapBatchlimitOption(struct rewriteConfigState *state) {
void rewriteConfigSwapBatchlimitOption(standardConfig *config, const char *name, struct rewriteConfigState *state) {
UNUSED(config);
int j;
char *option = "swap-batch-limit";

for (j = 1; j < SWAP_UTILS; j++) {
int force = (server.swap_batch_limits[j].count !=
Expand All @@ -1589,8 +1624,8 @@ void rewriteConfigSwapBatchlimitOption(struct rewriteConfigState *state) {

const char *typename = swapIntentionName(j);
line = sdscatprintf(sdsempty(),"%s %s %d %s",
option, typename, server.swap_batch_limits[j].count, mem);
rewriteConfigRewriteLine(state,option,line,force);
name, typename, server.swap_batch_limits[j].count, mem);
rewriteConfigRewriteLine(state,name,line,force);
}
}
#endif
Expand Down Expand Up @@ -3202,6 +3237,29 @@ static sds getConfigClientOutputBufferLimitOption(standardConfig *config) {
return buf;
}

#ifdef ENABLE_SWAP
static int setConfigSwapBatchLimitOption(standardConfig *config, sds *argv, int argc, const char **err) {
UNUSED(config);
return updateSwapBatchLimit(argv, argc, err);
}

static sds getConfigSwapBatchLimitOption(standardConfig *config) {
UNUSED(config);
sds buf = sdsempty();
int j;

for (j = 1; j < SWAP_UTILS; j++) {
buf = sdscatprintf(buf, "%s %d %llu",
swapIntentionName(j),
server.swap_batch_limits[j].count,
server.swap_batch_limits[j].mem);
if (j != SWAP_UTILS - 1)
buf = sdscatlen(buf, " ", 1);
}
return buf;
}
#endif

/* Parse an array of CONFIG_OOM_COUNT sds strings, validate and populate
* server.oom_score_adj_values if valid.
*/
Expand Down Expand Up @@ -3813,6 +3871,9 @@ standardConfig static_configs[] = {
createSpecialConfig("dir", NULL, MODIFIABLE_CONFIG | PROTECTED_CONFIG | DENY_LOADING_CONFIG, setConfigDirOption, getConfigDirOption, rewriteConfigDirOption, NULL),
createSpecialConfig("save", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigSaveOption, getConfigSaveOption, rewriteConfigSaveOption, NULL),
createSpecialConfig("client-output-buffer-limit", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigClientOutputBufferLimitOption, getConfigClientOutputBufferLimitOption, rewriteConfigClientOutputBufferLimitOption, NULL),
#ifdef ENABLE_SWAP
createSpecialConfig("swap-batch-limit", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigSwapBatchLimitOption, getConfigSwapBatchLimitOption, rewriteConfigSwapBatchlimitOption, NULL),
#endif
createSpecialConfig("oom-score-adj-values", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigOOMScoreAdjValuesOption, getConfigOOMScoreAdjValuesOption, rewriteConfigOOMScoreAdjValuesOption, updateOOMScoreAdj),
createSpecialConfig("notify-keyspace-events", NULL, MODIFIABLE_CONFIG, setConfigNotifyKeyspaceEventsOption, getConfigNotifyKeyspaceEventsOption, rewriteConfigNotifyKeyspaceEventsOption, NULL),
createSpecialConfig("bind", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigBindOption, getConfigBindOption, rewriteConfigBindOption, applyBind),
Expand Down
22 changes: 21 additions & 1 deletion src/ctrip_swap.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,23 @@
#include "server.h"
#include <rocksdb/c.h>
#include "atomicvar.h"

/* objectMeta stores heap pointers (listMeta*, bitmapMeta*) in a 60-bit
* bitfield (ptr:60). ASAN's LeakSanitizer performs a conservative scan of
* reachable memory at process exit: it reads every pointer-sized word and
* checks whether the value equals a live heap address. The raw memory word
* for ptr:60 is (heap_addr << 4) | swap_type_nibble, which is NOT a valid
* heap address, so LSan reports these objects as direct leaks.
*
* Use __lsan_ignore_object() to tell LSan "this object is intentionally
* reachable via an unconventional (bitfield) pointer; do not report it."
* The macro is a no-op in non-ASAN builds. */
#ifdef __SANITIZE_ADDRESS__
#include <sanitizer/lsan_interface.h>
#define LSAN_IGNORE_OBJECT(ptr) __lsan_ignore_object(ptr)
#else
#define LSAN_IGNORE_OBJECT(ptr) ((void)(ptr))
#endif
#include "ctrip_lru_cache.h"
#include "ctrip_cuckoo_filter.h"
#include "ctrip_swap_adlist.h"
Expand Down Expand Up @@ -563,6 +580,10 @@ static inline void *objectMetaGetPtr(objectMeta *object_meta) {
return (void*)(unsigned long long)object_meta->ptr;
}
static inline void objectMetaSetPtr(objectMeta *object_meta, void *ptr) {
/* Inform LSan that ptr is reachable via the ptr:60 bitfield. Without this,
* ASAN reports ptr as a direct leak because (heap_addr<<4)|swap_type is not
* a recognizable heap pointer. */
if (ptr) LSAN_IGNORE_OBJECT(ptr);
object_meta->ptr = (unsigned long long)ptr;
}

Expand Down Expand Up @@ -2460,7 +2481,6 @@ void _rdbSaveBackground(client *c, swapCtx *ctx);

typedef struct swapRdbSaveCtx {
long key_count;
size_t processed;
long long info_updated_time;
list *hot_keys_extension;
redisDb *rehash_paused_db;
Expand Down
6 changes: 0 additions & 6 deletions src/ctrip_swap_meta.c
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,6 @@ swapScanSession *swapScanSessionsAssign(swapScanSessions *sessions) {
listDelNode(sessions->free, ln);
id = sessionId2RaxKey(session->session_id);
raxInsert(sessions->assigned,(unsigned char*)&id,sizeof(id),session,NULL);
// TODO remove serverLog(LL_WARNING, "[xxx] insert %lu => %ld", id, session->session_id);
} else {
/* Try assign the least active, assign fails if session is not
* idled long enough. */
Expand Down Expand Up @@ -599,8 +598,6 @@ void swapScanSessionUnassign(swapScanSessions *sessions, swapScanSession *sessio
void *session_;
uint64_t id = sessionId2RaxKey(session->session_id);

// TODO remove serverLog(LL_WARNING, "[xxx] remove %lu => %ld", id, session->session_id);

if (raxRemove(sessions->assigned, (unsigned char *)&id, sizeof(id),
&session_)) {
serverAssert(session == session_);
Expand All @@ -615,9 +612,6 @@ swapScanSession *swapScanSessionsFind(swapScanSessions *sessions,
swapScanSession *session = NULL;
if (!raxFind(sessions->assigned,
(unsigned char*)&id, sizeof(id), &session)) session = NULL;

// TODO remove serverLog(LL_WARNING, "[xxx] find %lu => %lu", id, (session ? session->session_id : 999));

return session;
}

Expand Down
17 changes: 5 additions & 12 deletions src/ctrip_swap_rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,6 @@ int rdbSaveInfoSetSfrctx(rdbSaveInfo *rsiptr, swapForkRocksdbCtx *sfrctx) {

void swapRdbSaveCtxInit(swapRdbSaveCtx *ctx, int rdbflags, int rordb) {
ctx->key_count = 0;
ctx->processed = 0;
ctx->info_updated_time = 0;
ctx->hot_keys_extension = NULL;
ctx->rehash_paused_db = NULL;
Expand Down Expand Up @@ -1077,17 +1076,11 @@ int swapRdbSaveKeyValuePair(rio *rdb, redisDb *db, robj *key, robj *o,
void swapRdbSaveProgress(rio *rdb, swapRdbSaveCtx *ctx) {
char *pname = (ctx->rdbflags & RDBFLAGS_AOF_PREAMBLE) ? "AOF rewrite" : "RDB";

/* When this RDB is produced as part of an AOF rewrite, move
* accumulated diff from parent to child while rewriting in
* order to have a smaller final write. */
if (ctx->rdbflags & RDBFLAGS_AOF_PREAMBLE
// LATTE_TO_DO && rdb->processed_bytes > ctx->processed+AOF_READ_DIFF_INTERVAL_BYTES
)
{
ctx->processed = rdb->processed_bytes;
//LATTE_TO_DO
// aofReadDiffFromParent();
}
/* Note: in older Redis, aofReadDiffFromParent() was called here to pull
* parent-accumulated diffs via pipe during AOF rewrite. Redis 8 replaced
* that mechanism with multi-part AOF (manifest + INCR files): the parent
* now writes new commands directly to a new INCR AOF file opened before
* the fork, so the child no longer needs to read any diff. */

/* Update child info every 1 second (approximately).
* in order to avoid calling mstime() on each iteration, we will
Expand Down
Loading
Loading