[Sync replication] Make sync replication configurable (#3038)

This PR

1. Add a configurable flag `sync-replication` to turn the feature on and
off. It handles the toggling on and off of the feature
2. Adds the beginnings of test coverage for the feature. 


```
❯ ./valkey-unit-tests --single test_reply_blocking.c
Tests will run with seed=C9E48F2E3BBB54FF8CED7B81482C794FE17CC776D9F400F0C75E0FDE5F98133C03DE9F61EDBEA1CA7C7E7214D52131A8578975156F148EFDAAE4E4DB1D50D815
[START] - test_reply_blocking.c
[ok] - test_reply_blocking.c:test_durableClientInitAndReset
[ok] - test_reply_blocking.c:test_isDurabilityEnabled
[ok] - test_reply_blocking.c:test_isPrimaryDurabilityEnabled
[ok] - test_reply_blocking.c:test_isClientReplyBufferLimited
[ok] - test_reply_blocking.c:test_durableInit
[ok] - test_reply_blocking.c:test_blockLastResponseIfExist
[ok] - test_reply_blocking.c:test_durablePurgeAndGetUncommittedKeyOffset
34625:C 09 Jan 2026 18:16:21.209 . preCall hook entered
34625:C 09 Jan 2026 18:16:21.212 . preCall hook: pre_call_replication_offset=0, pre_call_num_ops_pending_propagation=0
34625:C 09 Jan 2026 18:16:21.212 . Cleanup reply blocking structures
34625:C 09 Jan 2026 18:16:21.212 * Clearing all uncommitted keys for AMZ sync replication
[ok] - test_reply_blocking.c:test_beforeCommandTrackReplOffset
34625:C 09 Jan 2026 18:16:21.212 . preCommandExec hook entered for command 'get'
34625:C 09 Jan 2026 18:16:21.212 . preCommandExec hook: durability not enabled, allowing
34625:C 09 Jan 2026 18:16:21.212 . preCommandExec hook entered for command 'get'
34625:C 09 Jan 2026 18:16:21.212 . Cleanup reply blocking structures
34625:C 09 Jan 2026 18:16:21.212 * Clearing all uncommitted keys for AMZ sync replication
[ok] - test_reply_blocking.c:test_preCommandExec
[END] - test_reply_blocking.c: 10 tests, 10 passed, 0 failed
1 test suites executed, 1 passed, 0 failed
```

```
❯ ./runtest --single durability/reply_blocking
Cleanup: may take some time... OK
Starting test server at port 21079
[ready]: 34851
Testing durability/reply_blocking
[ok]: Sync replication blocks replies until replica acks (115 ms)
[ok]: Sync replication blocks reads on dirty keys (1450 ms)
[ok]: Sync replication toggling disables reply blocking (4 ms)
[ok]: Disabling sync replication unblocks pending replies (3 ms)
[ok]: Check for memory leaks (pid 34879) (5553 ms)
[ok]: Check for memory leaks (pid 34855) (2194 ms)
[1/1 done]: durability/reply_blocking (11 seconds)

                   The End

Execution time of different units:
  11 seconds - durability/reply_blocking

Test Summary: 6 passed, 0 failed
```

---------

Signed-off-by: jjuleslasarte <jules.lasarte@gmail.com>
This commit is contained in:
jjuleslasarte
2026-01-09 14:43:37 -08:00
committed by GitHub
parent 1797768d5d
commit bc2295e78f
11 changed files with 668 additions and 31 deletions
+1
View File
@@ -97,6 +97,7 @@ void commandlogInit(void) {
}
}
/* Push a new entry into the command log.
* This function will make sure to trim the command log accordingly to the
* configured max length. */
-1
View File
@@ -48,5 +48,4 @@ typedef struct commandlogEntry {
/* Exported API */
void commandlogInit(void);
#endif /* __COMMANDLOG_H__ */
+7
View File
@@ -2663,6 +2663,12 @@ int updateAppendFsync(const char **err) {
return 1;
}
static int updateSynReplicationEnabled(const char **err) {
UNUSED(err);
durabilityReset();
return 1;
}
/* applyBind affects both TCP and TLS (if enabled) together */
static int applyBind(const char **err) {
connListener *tcp_listener = listenerByType(CONN_TYPE_SOCKET);
@@ -3251,6 +3257,7 @@ standardConfig static_configs[] = {
createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL),
createBoolConfig("lua-enable-insecure-api", "lua-enable-deprecated-api", MODIFIABLE_CONFIG | HIDDEN_CONFIG | PROTECTED_CONFIG, server.lua_enable_insecure_api, 0, NULL, updateLuaEnableInsecureApi),
createBoolConfig("import-mode", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.import_mode, 0, NULL, NULL),
createBoolConfig("sync-replication", NULL, MODIFIABLE_CONFIG, server.durability.sync_replication_enabled, 0, NULL, updateSynReplicationEnabled),
/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
+127 -23
View File
@@ -1,6 +1,7 @@
#include "reply_blocking.h"
#include "expire.h"
#include "server.h"
#include "zmalloc.h"
#include <assert.h>
#include <math.h>
@@ -41,9 +42,7 @@ static long long getSingleCommandBlockingOffsetForConsistentWrites(struct client
* return 1 if durability is enabled, 0 otherwise.
*/
int isDurabilityEnabled(void) {
// should this have its own flag?
// or general 'durability flag'
return true;
return server.durability.sync_replication_enabled;
}
int isPrimaryDurabilityEnabled(void) {
@@ -232,6 +231,15 @@ static int unblockClientWaitingReplicaAck(struct client *c) {
return 0;
}
/* Add a free function for blockedResponse entries */
static void blockedResponseFree(void *value) {
blockedResponse *br = (blockedResponse *)value;
/* If the blocked response ever contains dynamically allocated fields,
free them here. For now the structure only holds pointers owned
elsewhere, so we simply free the structure itself. */
zfree(br);
}
/**
* Initialize the durable write client attributes when client is created
*/
@@ -241,7 +249,7 @@ void durableClientInit(struct client *c) {
}
if (c->clientDurabilityInfo.blocked_responses == NULL) {
c->clientDurabilityInfo.blocked_responses = listCreate();
listSetFreeMethod(c->clientDurabilityInfo.blocked_responses, zfree);
listSetFreeMethod(c->clientDurabilityInfo.blocked_responses, blockedResponseFree);
resetPreExecutionOffset(c);
c->clientDurabilityInfo.current_command_repl_offset = -1;
}
@@ -395,7 +403,7 @@ void blockClientOnReplOffset(struct client *c, long long blockingReplOffset) {
/* If needed, we block the client and put it into our list of clients
* waiting for ack from slaves. */
if (isBlockingNeededForOffset(c, blockingReplOffset)) {
serverLog(LOG_DEBUG, "client should be blocked at offset %lld,", blockingReplOffset);
serverLog(LL_DEBUG, "client should be blocked at offset %lld,", blockingReplOffset);
blockLastResponseIfExist(c, blockingReplOffset);
if (!c->clientDurabilityInfo.durable_blocked_client) {
listAddNodeTail(server.durability.clients_waiting_replica_ack, c);
@@ -436,12 +444,11 @@ static void blockClientAndMonitorsOnReplOffset(struct client *c, long long block
* @param consensus_ack_offset Repl offset that have been acked by the required number of replicas
*/
void unblockResponsesWithAckOffset(struct durable_t *durability, long long consensus_ack_offset) {
serverLog(LOG_DEBUG, "unblocking clients for consensus offset %lld,", consensus_ack_offset);
serverLog(LL_DEBUG, "unblocking clients for consensus offset %lld,", consensus_ack_offset);
// Traverses through all the clients that wait for replica ack
listIter li, li_response;
listNode *ln, *ln_response;
listIter li;
listNode *ln;
listRewind(durability->clients_waiting_replica_ack, &li);
blockedResponse *br = NULL;
while ((ln = listNext(&li))) {
client *c = ln->value;
@@ -450,16 +457,16 @@ void unblockResponsesWithAckOffset(struct durable_t *durability, long long conse
// ACK'ed by the required number of replicas
// If the max repl offset is acked, all blocked responses will be unblocked
serverAssert(c->clientDurabilityInfo.blocked_responses != NULL);
listRewind(c->clientDurabilityInfo.blocked_responses, &li_response);
bool unblocked_responses = false;
while ((ln_response = listNext(&li_response))) {
br = listNodeValue(ln_response);
// Keep deleting from the front while the first response can be unblocked
while (listLength(c->clientDurabilityInfo.blocked_responses) > 0) {
listNode *first = listFirst(c->clientDurabilityInfo.blocked_responses);
blockedResponse *br = listNodeValue(first);
if (br->primary_repl_offset <= consensus_ack_offset) {
unblockFirstResponse(c);
if (unblocked_responses == false) {
unblocked_responses = true;
}
unblocked_responses = true;
} else {
// As soon as we encounter a client response that has the
// required reply offset greater than the replicas ACK'ed offset,
@@ -486,7 +493,7 @@ void unblockResponsesWithAckOffset(struct durable_t *durability, long long conse
/* Check if there are clients blocked that can be unblocked since
* we received enough ACKs from replicas */
void postReplicaAck(void) {
serverLog(LOG_DEBUG, "postReplicaAck hook entered");
serverLog(LL_DEBUG, "postReplicaAck hook entered");
if (!isPrimaryDurabilityEnabled()) {
return;
}
@@ -650,6 +657,29 @@ void clearUncommittedKeysAcknowledged(void) {
}
}
void durableInitDatabase(serverDb *db) {
db->uncommitted_keys = raxNew();
db->dirty_repl_offset = -1;
db->scan_in_progress = 0;
}
/**
* Utility function to clear all uncommitted keys for each database
*/
static inline void clearAllUncommittedKeys(void) {
serverLog(LL_NOTICE, "Clearing all uncommitted keys for sync replication");
for (int i = 0; i < server.dbnum; i++) {
serverDb *db = server.db[i];
if (db == NULL) continue;
raxFree(db->uncommitted_keys);
if (db->scan_in_progress) {
raxStop(&db->next_scan_iter);
}
durableInitDatabase(db);
}
server.durability.curr_db_scan_idx = 0;
}
/*========================== Command access validation ====================== */
/**
@@ -738,11 +768,13 @@ static long long getSingleCommandBlockingOffsetForReplicatingCommand(client *c)
if (c->cmd->proc == moveCommand) {
// TODO: support MOVE command: we need to mark the key as dirty in the destination DB
// dont block for now
getKeysFreeResult(&result);
return -1;
} else if (c->cmd->proc == copyCommand) {
// TODO: handle copy command
// handle the dirty keys in the destination db
// dont block for now
getKeysFreeResult(&result);
return -1;
}
@@ -767,6 +799,7 @@ static long long getSingleCommandBlockingOffsetForNonReplicatingCommand(client *
long long blocking_repl_offset = -1;
// TODO: handle function, module, etc
if (c->cmd->flags & (CMD_READONLY | CMD_WRITE)) {
serverLog(LL_DEBUG, "getSingleCommandBlockingOffsetForNonReplicatingCommand for command");
// For read/write commands that didn't generate replication data, we would block
// on the highest offset of all accessed uncommitted keys and the valkey DBs itself.
// Note some commands categorized as writes can perform read only operations
@@ -834,12 +867,12 @@ static long long getSingleCommandBlockingOffsetForConsistentWrites(struct client
* about to be executed.
*/
void beforeCommandTrackReplOffset(void) {
serverLog(LOG_DEBUG, "preCall hook entered");
serverLog(LL_DEBUG, "preCall hook entered");
if (!isPrimaryDurabilityEnabled()) return;
server.durability.pre_call_replication_offset = server.primary_repl_offset;
server.durability.pre_call_num_ops_pending_propagation = server.also_propagate.numops;
serverLog(LOG_DEBUG, "preCall hook: pre_call_replication_offset=%lld, pre_call_num_ops_pending_propagation=%d",
serverLog(LL_DEBUG, "preCall hook: pre_call_replication_offset=%lld, pre_call_num_ops_pending_propagation=%d",
server.durability.pre_call_replication_offset, server.durability.pre_call_num_ops_pending_propagation);
}
@@ -857,7 +890,7 @@ void beforeCommandTrackReplOffset(void) {
*/
void afterCommandTrackReplOffset(struct client *c) {
// log debug tracing
serverLog(LOG_DEBUG, "Call hook entered for command '%s'", c->cmd->declared_name);
serverLog(LL_DEBUG, "Call hook entered for command '%s'", c->cmd->declared_name);
if (!isPrimaryDurabilityEnabled() || (c->flag.blocked))
return;
@@ -884,10 +917,10 @@ void afterCommandTrackReplOffset(struct client *c) {
* in the reply COB of the client and all the connected monitors.
*/
int preCommandExec(struct client *c) {
serverLog(LOG_DEBUG, "preCommandExec hook entered for command '%s'",
serverLog(LL_DEBUG, "preCommandExec hook entered for command '%s'",
c->cmd ? c->cmd->declared_name : "NULL");
if (!isDurabilityEnabled()) {
serverLog(LOG_DEBUG, "preCommandExec hook: durability not enabled, allowing");
serverLog(LL_DEBUG, "preCommandExec hook: durability not enabled, allowing");
return CMD_FILTER_ALLOW;
}
@@ -930,7 +963,7 @@ void postCommandExec(struct client *c) {
if (!isPrimaryDurabilityEnabled()) {
return;
}
serverLog(LOG_DEBUG, "postCommandExec hook entered for command '%s'",
serverLog(LL_DEBUG, "postCommandExec hook entered for command '%s'",
c->cmd ? c->cmd->declared_name : "NULL");
// If the command is NULL or is in a MULTI/EXEC block, then we skip
// TODO: handle multi
@@ -955,7 +988,6 @@ void postCommandExec(struct client *c) {
/**
* Function used to initialize the durability datastructures.
* TODO: exit clean up?
*/
void durableInit(void) {
// Initialize synchronous replication
@@ -963,3 +995,75 @@ void durableInit(void) {
server.durability.curr_db_scan_idx = 0;
server.durability.clients_waiting_replica_ack = listCreate();
}
/**
* Function used to cleanup the durability datastructures on server shutdown.
*/
void durableCleanup(void) {
serverLog(LL_DEBUG, "Cleanup reply blocking structures");
server.durability.replica_offsets_size = 0;
zfree(server.durability.replica_offsets);
if (server.durability.clients_waiting_replica_ack != NULL) {
listRelease(server.durability.clients_waiting_replica_ack);
server.durability.clients_waiting_replica_ack = NULL;
}
clearAllUncommittedKeys();
}
/**
* Utility function to release buffer used for replica offsets
*/
static inline void releaseReplicaOffsetBuffer(void) {
server.durability.replica_offsets_size = 0;
zfree(server.durability.replica_offsets);
server.durability.replica_offsets = NULL;
}
/**
* Function to reset primary state for synchronous replication
* This function will be invoked at primary when sync replication is dynamically disabled or it becomes a replica.
* Both cases require related resources to be reset to initial state. The only difference is how we handle the
* clients waiting for replica ACK. If sync replication is disabled, all blocked responses and keyspace
* notifications will be flushed for these clients, whose connections to primary will still be maintained.
* TODO: if the primary becomes a replica, all these clients should be disconnected and freed.
*/
static inline void syncReplicationResetPrimaryState() {
// Release buffer we use for replica offsets
releaseReplicaOffsetBuffer();
if (listLength(server.durability.clients_waiting_replica_ack) > 0) {
// Flush all blocked response and keyspace notifications to clients waiting for replica ACK
unblockResponsesWithAckOffset(&server.durability, LLONG_MAX);
// Make sure there is no clients waiting for replica ACK
serverAssert(listLength(server.durability.clients_waiting_replica_ack) == 0);
}
}
/**
* Reset related resources when disabling synchronous replication
* This method is invoked when user turns off durability via config set command
*/
void durabilityReset(void) {
if (isDurabilityEnabled()) {
// To enable sync replication, we update the pre-command offset so that the CONFIG SET command
// itself doesn't get inadvertently blocked because the primary replication offset is greater
// than the stale pre_command_replication_offset.
server.durability.pre_command_replication_offset = server.primary_repl_offset;
listIter li;
listNode *ln;
listRewind(server.clients, &li);
while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);
durableClientInit(c);
}
} else {
// To disable durable write, we need to flush all blocked responses and keyspace
// notifications, and then reset all durability related resources to initial state at the primary node.
if (iAmPrimary()) {
syncReplicationResetPrimaryState();
}
clearAllUncommittedKeys();
}
}
+6 -1
View File
@@ -24,6 +24,8 @@ typedef long long mstime_t;
* Durability container to house all the durability related fields.
*/
typedef struct durable_t {
/* Flag to enable/disable sync replication (durability) */
int sync_replication_enabled;
/* Uncommitted keys cleanup configuration time limit in milliseconds */
unsigned int keys_cleanup_time_limit_ms;
/* The current scanning database index, starting from 0 */
@@ -98,9 +100,12 @@ typedef struct clientDurabilityInfo {
* Init
*/
void durableInit(void);
void durableCleanup(void);
void durabilityReset(void);
void durableClientInit(struct client *c);
void durableClientReset(struct client *c);
/*
/**
Command processing hooks for offset and cob tracking
*/
void beforeCommandTrackReplOffset(void);
+3 -5
View File
@@ -3033,12 +3033,7 @@ void initServer(void) {
/* Initialize the EVAL scripting component. */
evalInit();
commandlogInit();
latencyMonitorInit();
initSharedQueryBuf();
durableInit();
/* Initialize ACL default password if it exists */
ACLUpdateDefaultUserPassword(server.requirepass);
applyWatchdogPeriod();
@@ -4792,6 +4787,9 @@ int finishShutdown(void) {
/* Fire the shutdown modules event. */
moduleFireServerEvent(VALKEYMODULE_EVENT_SHUTDOWN, 0, NULL);
/* Cleanup durability tracking resources. */
durableCleanup();
/* Remove the pid file if possible and needed. */
if (server.daemonize || server.pidfile) {
serverLog(LL_NOTICE, "Removing the pid file.");
+1
View File
@@ -950,6 +950,7 @@ typedef struct serverDb {
long long dirty_repl_offset; /* Replication offset for a dirty DB */
raxIterator next_scan_iter; /* The next iterator for db scan */
int scan_in_progress; /* Flag of showing whether db is in scan or not */
rax *reply_duration; /* Radix tree tracking reply durations for durable blocked clients */
} serverDb;
/* forward declaration for functions ctx */
+1 -1
View File
@@ -169,7 +169,7 @@ int test_entryUpdate(int argc, char **argv, int flags) {
// Update the value so that memory usage is less than 3/4 of the current allocation size
// Ensuring required_embedded_size < current_embedded_allocation_size * 3 / 4, which creates a new entry
size_t current_embedded_allocation_size = entryMemUsage(e9);
sds value10 = sdsnew("xxxxxxxxxxxxxxxxxxxxx");
sds value10 = sdsnew("xxxxxx");
sds value_copy10 = sdsdup(value10);
long long expiry10 = expiry9;
entry *e10 = entryUpdate(e9, value10, expiry10);
File diff suppressed because one or more lines are too long
+397
View File
@@ -0,0 +1,397 @@
#include "../fmacros.h"
#include "../reply_blocking.h"
#include "test_help.h"
#include "../server.h"
#include <stdio.h>
#include <limits.h>
#include <string.h>
void blockLastResponseIfExist(struct client *c, long long blocked_offset);
static void initReplyBlockingTestEnv(void) {
static char test_logfile[] = "";
if (server.logfile == NULL) {
/* serverLogRaw dereferences server.logfile; use stdout for unit tests. */
server.logfile = test_logfile;
}
}
static clientReplyBlock *createReplyBlock(size_t used) {
clientReplyBlock *block = zmalloc(sizeof(clientReplyBlock) + used);
block->size = used;
block->used = used;
block->last_header = NULL;
block->flag.buf_encoded = 0;
return block;
}
/**
* Test durableClientInit and durableClientReset functions
* These functions initialize and reset client durability attributes
*/
int test_durableClientInitAndReset(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
initReplyBlockingTestEnv();
// Create a mock client
client *c = zcalloc(sizeof(client));
c->clientDurabilityInfo.blocked_responses = NULL;
c->clientDurabilityInfo.durable_blocked_client = 0;
c->clientDurabilityInfo.current_command_repl_offset = 0;
// Test initialization when durability is disabled
server.durability.sync_replication_enabled = 0;
durableClientInit(c);
TEST_ASSERT(c->clientDurabilityInfo.blocked_responses == NULL);
// Test initialization when durability is enabled
server.durability.sync_replication_enabled = 1;
durableClientInit(c);
TEST_ASSERT(c->clientDurabilityInfo.blocked_responses != NULL);
TEST_ASSERT(listLength(c->clientDurabilityInfo.blocked_responses) == 0);
TEST_ASSERT(c->clientDurabilityInfo.offset.recorded == false);
TEST_ASSERT(c->clientDurabilityInfo.offset.reply_block == NULL);
TEST_ASSERT(c->clientDurabilityInfo.offset.byte_offset == 0);
TEST_ASSERT(c->clientDurabilityInfo.current_command_repl_offset == -1);
// Test reset
durableClientReset(c);
TEST_ASSERT(c->clientDurabilityInfo.blocked_responses == NULL);
TEST_ASSERT(c->clientDurabilityInfo.offset.recorded == false);
TEST_ASSERT(c->clientDurabilityInfo.current_command_repl_offset == -1);
// Reset to default
server.durability.sync_replication_enabled = 0;
// Cleanup
zfree(c);
return 0;
}
/**
* Test isDurabilityEnabled function
*/
int test_isDurabilityEnabled(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
// Test that durability is disabled by default
server.durability.sync_replication_enabled = 0;
TEST_ASSERT(isDurabilityEnabled() == 0);
// Test that durability can be enabled
server.durability.sync_replication_enabled = 1;
TEST_ASSERT(isDurabilityEnabled() == 1);
// Reset to default
server.durability.sync_replication_enabled = 0;
return 0;
}
/**
* Test isPrimaryDurabilityEnabled function
*/
int test_isPrimaryDurabilityEnabled(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
// Enable sync replication for testing
server.durability.sync_replication_enabled = 1;
// Test when server is primary (not a replica)
server.primary_host = NULL;
TEST_ASSERT(isPrimaryDurabilityEnabled() == 1);
// Test when server is a replica
server.primary_host = sdsnew("127.0.0.1");
TEST_ASSERT(isPrimaryDurabilityEnabled() == 0);
// Test when durability is disabled but server is primary
server.durability.sync_replication_enabled = 0;
sdsfree(server.primary_host);
server.primary_host = NULL;
TEST_ASSERT(isPrimaryDurabilityEnabled() == 0);
// Cleanup and reset
server.durability.sync_replication_enabled = 0;
return 0;
}
/**
* Test isClientReplyBufferLimited function
*/
int test_isClientReplyBufferLimited(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
// Create a mock client
client *c = zcalloc(sizeof(client));
// Case 1: No blocked_responses list
c->clientDurabilityInfo.blocked_responses = NULL;
TEST_ASSERT(isClientReplyBufferLimited(c) == false);
// Case 2: Empty blocked_responses list
c->clientDurabilityInfo.blocked_responses = listCreate();
TEST_ASSERT(isClientReplyBufferLimited(c) == false);
// Case 3: Non-empty blocked_responses list
blockedResponse *br = zcalloc(sizeof(blockedResponse));
br->primary_repl_offset = 100;
br->disallowed_byte_offset = 0;
br->disallowed_reply_block = NULL;
listAddNodeTail(c->clientDurabilityInfo.blocked_responses, br);
TEST_ASSERT(isClientReplyBufferLimited(c) == true);
// Cleanup
listSetFreeMethod(c->clientDurabilityInfo.blocked_responses, zfree);
listRelease(c->clientDurabilityInfo.blocked_responses);
zfree(c);
return 0;
}
/**
* Test durableInit function
*/
int test_durableInit(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
// Clean up any existing state
if (server.durability.clients_waiting_replica_ack != NULL) {
listRelease(server.durability.clients_waiting_replica_ack);
}
// Initialize durability
durableInit();
// Verify initialization
TEST_ASSERT(server.durability.sync_replication_enabled == 0);
TEST_ASSERT(server.durability.clients_waiting_replica_ack != NULL);
TEST_ASSERT(listLength(server.durability.clients_waiting_replica_ack) == 0);
TEST_ASSERT(server.durability.previous_acked_offset == -1);
TEST_ASSERT(server.durability.curr_db_scan_idx == 0);
// Cleanup to prevent memory leak
listRelease(server.durability.clients_waiting_replica_ack);
server.durability.clients_waiting_replica_ack = NULL;
return 0;
}
/**
* Test blockLastResponseIfExist function
*/
int test_blockLastResponseIfExist(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
// Case 1: Response appended to initial buffer
client *c = zcalloc(sizeof(client));
c->clientDurabilityInfo.blocked_responses = listCreate();
listSetFreeMethod(c->clientDurabilityInfo.blocked_responses, zfree);
c->bufpos = 5;
c->clientDurabilityInfo.offset.recorded = true;
c->clientDurabilityInfo.offset.reply_block = NULL;
c->clientDurabilityInfo.offset.byte_offset = 3;
c->reply = listCreate();
blockLastResponseIfExist(c, 42);
TEST_ASSERT(listLength(c->clientDurabilityInfo.blocked_responses) == 1);
blockedResponse *br = listNodeValue(listFirst(c->clientDurabilityInfo.blocked_responses));
TEST_ASSERT(br->primary_repl_offset == 42);
TEST_ASSERT(br->disallowed_reply_block == NULL);
TEST_ASSERT(br->disallowed_byte_offset == 3);
listRelease(c->clientDurabilityInfo.blocked_responses);
listRelease(c->reply);
zfree(c);
// Case 2: Response appended to reply list (first block)
c = zcalloc(sizeof(client));
c->clientDurabilityInfo.blocked_responses = listCreate();
listSetFreeMethod(c->clientDurabilityInfo.blocked_responses, zfree);
c->reply = listCreate();
listSetFreeMethod(c->reply, zfree);
clientReplyBlock *first = createReplyBlock(4);
clientReplyBlock *second = createReplyBlock(2);
listAddNodeTail(c->reply, first);
listAddNodeTail(c->reply, second);
c->bufpos = 3;
c->clientDurabilityInfo.offset.recorded = true;
c->clientDurabilityInfo.offset.reply_block = NULL;
c->clientDurabilityInfo.offset.byte_offset = 3;
blockLastResponseIfExist(c, 99);
TEST_ASSERT(listLength(c->clientDurabilityInfo.blocked_responses) == 1);
br = listNodeValue(listFirst(c->clientDurabilityInfo.blocked_responses));
TEST_ASSERT(br->primary_repl_offset == 99);
TEST_ASSERT(br->disallowed_reply_block == listFirst(c->reply));
TEST_ASSERT(br->disallowed_byte_offset == 0);
listRelease(c->clientDurabilityInfo.blocked_responses);
listRelease(c->reply);
zfree(c);
// Case 3: Response starts in next reply block
c = zcalloc(sizeof(client));
c->clientDurabilityInfo.blocked_responses = listCreate();
listSetFreeMethod(c->clientDurabilityInfo.blocked_responses, zfree);
c->reply = listCreate();
listSetFreeMethod(c->reply, zfree);
first = createReplyBlock(4);
second = createReplyBlock(6);
listAddNodeTail(c->reply, first);
listAddNodeTail(c->reply, second);
c->clientDurabilityInfo.offset.recorded = true;
c->clientDurabilityInfo.offset.reply_block = listFirst(c->reply);
c->clientDurabilityInfo.offset.byte_offset = 4;
blockLastResponseIfExist(c, 7);
TEST_ASSERT(listLength(c->clientDurabilityInfo.blocked_responses) == 1);
br = listNodeValue(listFirst(c->clientDurabilityInfo.blocked_responses));
TEST_ASSERT(br->primary_repl_offset == 7);
TEST_ASSERT(br->disallowed_reply_block == listFirst(c->reply)->next);
TEST_ASSERT(br->disallowed_byte_offset == 0);
listRelease(c->clientDurabilityInfo.blocked_responses);
listRelease(c->reply);
zfree(c);
return 0;
}
/**
* Test durablePurgeAndGetUncommittedKeyOffset function
*/
int test_durablePurgeAndGetUncommittedKeyOffset(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
serverDb **old_db = server.db;
int old_dbnum = server.dbnum;
char *old_primary_host = server.primary_host;
int old_cluster_enabled = server.cluster_enabled;
long long old_previous_acked_offset = server.durability.previous_acked_offset;
server.cluster_enabled = 0;
server.primary_host = NULL;
server.dbnum = 1;
server.db = zcalloc(sizeof(serverDb *));
serverDb *db = zcalloc(sizeof(serverDb));
db->uncommitted_keys = raxNew();
db->dirty_repl_offset = -1;
db->scan_in_progress = 0;
server.db[0] = db;
sds key = sdsnew("key");
long long offset = 10;
raxInsert(db->uncommitted_keys, (unsigned char *)key, sdslen(key), (void *)offset, NULL);
server.durability.previous_acked_offset = 5;
TEST_ASSERT(durablePurgeAndGetUncommittedKeyOffset(key, db) == offset);
void *result = NULL;
TEST_ASSERT(raxFind(db->uncommitted_keys, (unsigned char *)key, sdslen(key), &result));
TEST_ASSERT((long long)result == offset);
server.durability.previous_acked_offset = 10;
TEST_ASSERT(durablePurgeAndGetUncommittedKeyOffset(key, db) == -1);
TEST_ASSERT(!raxFind(db->uncommitted_keys, (unsigned char *)key, sdslen(key), &result));
sdsfree(key);
raxFree(db->uncommitted_keys);
zfree(db);
zfree(server.db);
server.db = old_db;
server.dbnum = old_dbnum;
server.primary_host = old_primary_host;
server.cluster_enabled = old_cluster_enabled;
server.durability.previous_acked_offset = old_previous_acked_offset;
return 0;
}
int test_beforeCommandTrackReplOffset(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
// Create a mock client
client *c = zcalloc(sizeof(client));
durableClientInit(c);
durableInit();
// Test when durability is enabled
server.durability.sync_replication_enabled = 1;
beforeCommandTrackReplOffset();
TEST_ASSERT(c->clientDurabilityInfo.offset.reply_block == NULL);
TEST_ASSERT(c->clientDurabilityInfo.offset.byte_offset == 0);
TEST_ASSERT(c->clientDurabilityInfo.current_command_repl_offset == 0);
// Cleanup
zfree(c);
// Reset to default
server.durability.sync_replication_enabled = 0;
durableCleanup();
return 0;
}
int test_preCommandExec(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
initReplyBlockingTestEnv();
struct serverCommand readonly_cmd = {.declared_name = "get", .flags = CMD_READONLY};
// Case 1: Durability disabled, allow command and leave tracking untouched.
client *c = zcalloc(sizeof(client));
c->cmd = &readonly_cmd;
c->clientDurabilityInfo.current_command_repl_offset = 123;
server.durability.sync_replication_enabled = 0;
server.durability.pre_command_replication_offset = 999;
server.primary_repl_offset = 555;
TEST_ASSERT(preCommandExec(c) == CMD_FILTER_ALLOW);
TEST_ASSERT(c->clientDurabilityInfo.current_command_repl_offset == 123);
TEST_ASSERT(server.durability.pre_command_replication_offset == 999);
zfree(c);
// Case 2: Durability enabled on primary, track pre-exec position.
c = zcalloc(sizeof(client));
c->cmd = &readonly_cmd;
c->bufpos = 7;
c->clientDurabilityInfo.current_command_repl_offset = 88;
server.durability.sync_replication_enabled = 1;
server.primary_host = NULL;
server.primary_repl_offset = 1234;
TEST_ASSERT(preCommandExec(c) == CMD_FILTER_ALLOW);
TEST_ASSERT(c->clientDurabilityInfo.current_command_repl_offset == -1);
TEST_ASSERT(c->clientDurabilityInfo.offset.recorded == true);
TEST_ASSERT(c->clientDurabilityInfo.offset.reply_block == NULL);
TEST_ASSERT(c->clientDurabilityInfo.offset.byte_offset == 7);
TEST_ASSERT(server.durability.pre_command_replication_offset == 1234);
zfree(c);
server.durability.sync_replication_enabled = 0;
durableCleanup();
durableClientReset(c);
return 0;
}
+114
View File
@@ -0,0 +1,114 @@
# Tests for reply blocking durability feature
# This test suite covers the synchronous replication functionality
# that blocks client responses until replicas acknowledge writes
start_server {tags {"repl durability external:skip"} overrides {sync-replication yes}} {
set primary [srv 0 client]
set primary_host [srv 0 host]
set primary_port [srv 0 port]
start_server {} {
set replica [srv 0 client]
set replica_host [srv 0 host]
set replica_port [srv 0 port]
test "Sync replication blocks replies until replica acks" {
assert_equal "yes" [lindex [$primary config get sync-replication] 1]
set rd [valkey_deferring_client -1]
$rd set durable:blocked value
set fd [$rd channel]
fconfigure $fd -blocking 0
set early_reply [read $fd]
fconfigure $fd -blocking 1
assert_equal "" $early_reply
$replica replicaof $primary_host $primary_port
wait_replica_online $primary
wait_replica_acked_ofs $primary $replica $replica_host $replica_port
assert_equal "OK" [$rd read]
$replica replicaof no one
wait_for_condition 50 100 {
[llength [$primary client list type replica]] == 0
} else {
fail "Primary didn't notice replica disconnect"
}
}
test "Sync replication blocks reads on dirty keys" {
assert_equal "yes" [lindex [$primary config get sync-replication] 1]
set writer [valkey_deferring_client -1]
$writer client reply off
$writer set durable:blocked dirty
set rd [valkey_deferring_client -1]
$rd get durable:blocked
set fd [$rd channel]
fconfigure $fd -blocking 0
set early_reply [read $fd]
fconfigure $fd -blocking 1
assert_equal "" $early_reply
$replica replicaof $primary_host $primary_port
wait_replica_online $primary
wait_replica_acked_ofs $primary $replica $replica_host $replica_port
assert_equal "dirty" [$rd read]
}
test "Sync replication toggling disables reply blocking" {
assert_equal "OK" [$primary config set sync-replication no]
assert_equal "no" [lindex [$primary config get sync-replication] 1]
set writer [valkey_deferring_client -1]
$writer client reply off
$writer set durable:toggle value
set rd [valkey_deferring_client -1]
$rd get durable:toggle
assert_equal "value" [$rd read]
assert_equal "OK" [$primary config set sync-replication yes]
assert_equal "yes" [lindex [$primary config get sync-replication] 1]
}
test "Disabling sync replication unblocks pending replies" {
assert_equal "yes" [lindex [$primary config get sync-replication] 1]
set rd [valkey_deferring_client -1]
$rd set durable:toggle-blocked value
set fd [$rd channel]
fconfigure $fd -blocking 0
set early_reply [read $fd]
assert_equal "" $early_reply
assert_equal "OK" [$primary config set sync-replication no]
assert_equal "no" [lindex [$primary config get sync-replication] 1]
set raw_reply ""
set got_reply 0
for {set i 0} {$i < 50} {incr i} {
append raw_reply [read $fd]
if {[string match "*\r\n" $raw_reply]} {
set got_reply 1
break
}
after 100
}
if {!$got_reply} {
fail "Reply didn't unblock after disabling sync replication"
}
fconfigure $fd -blocking 1
assert_match "+OK*" $raw_reply
assert_equal "OK" [$primary config set sync-replication yes]
assert_equal "yes" [lindex [$primary config get sync-replication] 1]
}
}
}