Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -2754,9 +2754,9 @@ void replicationSetMaster(char *ip, int port) {
/* Update oom_score_adj */
setOOMScoreAdj(-1);

/* Force our slaves to resync with us as well. They may hopefully be able
* to partially resync with us, but we can notify the replid change. */
disconnectSlaves();
/* Disconnecting here prematurely causes a cascade reconnect storm
* before we even know the sync outcome,
* which breaks topology changes under load. See Redis commit cee3d67f. */
cancelReplicationHandshake(0);
/* Before destroying our master state, create a cached master using
* our own parameters, to later PSYNC with the new master. */
Expand Down
11 changes: 9 additions & 2 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "latency.h"
#include "atomicvar.h"
#include "mt19937-64.h"
#include "xredis_gtid.h"

#include <time.h>
#include <signal.h>
Expand Down Expand Up @@ -3516,6 +3517,7 @@ void initServer(void) {
memset(server.master_uuid,'0',CONFIG_RUN_ID_SIZE);
server.master_uuid[CONFIG_RUN_ID_SIZE] = 0;
server.master_uuid_len = CONFIG_RUN_ID_SIZE;
serverGtidEmbeddedClear();
server.gtid_executed = gtidSetNew();
gtidSetCurrentUuidSetUpdate(server.gtid_executed,server.uuid,server.uuid_len);
server.gtid_lost = gtidSetNew();
Expand Down Expand Up @@ -4203,7 +4205,7 @@ void call(client *c, int flags) {
/* Call propagate() only if at least one of AOF / replication
* propagation is needed. Note that modules commands handle replication
* in an explicit way, so we never replicate them automatically. */
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE) && c->cmd->proc != gtidCommand)
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
}

Expand All @@ -4227,9 +4229,13 @@ void call(client *c, int flags) {
* in case the nested MULTI/EXEC.
*
* And if the array contains only one command, no need to
* wrap it, since the single command is atomic. */
* wrap it, since the single command is atomic.
*
* gtidCommand only rewrites what the inner command already
* decided to propagate; do not add another MULTI/EXEC layer. */
if (server.also_propagate.numops > 1 &&
!(c->cmd->flags & CMD_MODULE) &&
c->cmd->proc != gtidCommand &&
!(c->flags & CLIENT_MULTI) &&
!(flags & CMD_CALL_NOWRAP))
{
Expand All @@ -4252,6 +4258,7 @@ void call(client *c, int flags) {
}
}
redisOpArrayFree(&server.also_propagate);
serverGtidEmbeddedClear();
}
server.also_propagate = prev_also_propagate;

Expand Down
7 changes: 7 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1743,6 +1743,13 @@ struct redisServer {
long long gtid_sync_stat[GTID_SYNC_TYPES];
int gtid_gaplog_enabled;
gtidGaplog* gtid_gap_log;
/* Caller-supplied GTID identity for the current GTID-wrapped command.
* Set by gtidCommand(), so that rewriting ops
* with the caller's uuid/gno instead of auto-allocating a new one. */
char *gtid_embedded_uuid;
size_t gtid_embedded_uuid_len;
gno_t gtid_embedded_gno; /* 0 means "not set" (gno starts from 1) */
int gtid_embedded_dbid;

/* importing mode */
mstime_t importing_end_time; /* in milliseconds */
Expand Down
33 changes: 30 additions & 3 deletions tests/helpers/gen_write_load.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,44 @@ source tests/support/redis.tcl

set ::tlsdir "tests/tls"

proc gen_write_load {host port seconds tls db} {
# Continuously sends SET commands to the server. If key is omitted, a random key
# is used for every SET command. The value is always random.
proc gen_write_load {host port seconds tls db {key ""} {size 0} {sleep 0}} {
set start_time [clock seconds]
set r [redis $host $port 1 $tls]
$r client setname LOAD_HANDLER
$r select $db

# fixed size value
if {$size != 0} {
set value [string repeat "x" $size]
}

while 1 {
$r set [expr rand()] [expr rand()]
if {$size == 0} {
set value [expr rand()]
}

if {$key == ""} {
if {[catch {$r set [expr rand()] $value} err]} {
exit 0
}
} else {
if {[catch {$r set $key $value} err]} {
exit 0
}
}
if {[clock seconds]-$start_time > $seconds} {
exit 0
}
if {$sleep ne 0} {
after $sleep
}
}
}

gen_write_load [lindex $argv 0] [lindex $argv 1] [lindex $argv 2] [lindex $argv 3] [lindex $argv 4]
if {[llength $argv] >= 8} {
gen_write_load [lindex $argv 0] [lindex $argv 1] [lindex $argv 2] [lindex $argv 3] [lindex $argv 4] [lindex $argv 5] [lindex $argv 6] [lindex $argv 7]
} else {
gen_write_load [lindex $argv 0] [lindex $argv 1] [lindex $argv 2] [lindex $argv 3] [lindex $argv 4]
}
29 changes: 29 additions & 0 deletions tests/modules/propagate.c
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,25 @@ int propagateTestSimpleCommand(RedisModuleCtx *ctx, RedisModuleString **argv, in
return REDISMODULE_OK;
}

int propagateTestSingleCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);

RedisModule_Replicate(ctx,"INCR","c","single-counter");
RedisModule_ReplyWithSimpleString(ctx,"OK");
return REDISMODULE_OK;
}

int propagateTestNoReplicateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);

RedisModule_ReplyWithSimpleString(ctx,"OK");
return REDISMODULE_OK;
}

int propagateTestMixedCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
REDISMODULE_NOT_USED(argv);
Expand Down Expand Up @@ -237,6 +256,16 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
"",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;

if (RedisModule_CreateCommand(ctx,"propagate-test.single",
propagateTestSingleCommand,
"",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;

if (RedisModule_CreateCommand(ctx,"propagate-test.noreplicate",
propagateTestNoReplicateCommand,
"",1,1,1) == REDISMODULE_ERR)
return REDISMODULE_ERR;

if (RedisModule_CreateCommand(ctx,"propagate-test.mixed",
propagateTestMixedCommand,
"",1,1,1) == REDISMODULE_ERR)
Expand Down
7 changes: 4 additions & 3 deletions tests/support/util.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -515,10 +515,11 @@ proc find_valgrind_errors {stderr on_termination} {
}

# Execute a background process writing random data for the specified number
# of seconds to the specified Redis instance.
proc start_write_load {host port seconds} {
# of seconds to the specified Redis instance. If key is omitted, a random key
# is used for every SET command.
proc start_write_load {host port seconds {key ""} {size 0} {sleep 0}} {
set tclsh [info nameofexecutable]
exec $tclsh tests/helpers/gen_write_load.tcl $host $port $seconds $::tls $::target_db &
exec $tclsh tests/helpers/gen_write_load.tcl $host $port $seconds $::tls $::target_db $key $size $sleep &
}

# Stop a process generating write load executed with start_write_load.
Expand Down
10 changes: 9 additions & 1 deletion tests/test_helper.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ source tests/support/util.tcl
source tests/support/gtid.tcl

set ::gtid_tests {
gtid/gtid
gtid/6_x/gtid
gtid/6_x/aof
gtid/6_x/sync
gtid/gtid
gtid/gtid_seq
gtid/gtid_module_replicate
gtid/replication-psync
gtid/sync
gtid/xsync
Expand Down Expand Up @@ -173,6 +177,7 @@ set ::baseport 21111; # initial port for spawned redis servers
set ::portcount 8000; # we don't wanna use more than 10000 to avoid collision with cluster bus ports
set ::traceleaks 0
set ::valgrind 0
set ::asan 0
set ::durable 0
set ::tls 0
set ::stack_logging 0
Expand Down Expand Up @@ -625,6 +630,7 @@ proc send_data_packet {fd status data} {
proc print_help_screen {} {
puts [join {
"--valgrind Run the test over valgrind."
"--asan Hint tests that the server was built with ASAN."
"--durable suppress test crashes and keep running"
"--stack-logging Enable OSX leaks/malloc stack logging."
"--accurate Run slow randomized tests for more iterations."
Expand Down Expand Up @@ -685,6 +691,8 @@ for {set j 0} {$j < [llength $argv]} {incr j} {
incr j
} elseif {$opt eq {--valgrind}} {
set ::valgrind 1
} elseif {$opt eq {--asan}} {
set ::asan 1
} elseif {$opt eq {--stack-logging}} {
if {[string match {*Darwin*} [exec uname -a]]} {
set ::stack_logging 1
Expand Down
Loading