diff --git a/src/commandlog.c b/src/commandlog.c index e01b7f92e..2730e89ef 100644 --- a/src/commandlog.c +++ b/src/commandlog.c @@ -97,6 +97,7 @@ void commandlogInit(void) { } } + /* Push a new entry into the command log. * This function will make sure to trim the command log accordingly to the * configured max length. */ diff --git a/src/commandlog.h b/src/commandlog.h index 825014746..58a72f4c0 100644 --- a/src/commandlog.h +++ b/src/commandlog.h @@ -48,5 +48,4 @@ typedef struct commandlogEntry { /* Exported API */ void commandlogInit(void); - #endif /* __COMMANDLOG_H__ */ diff --git a/src/config.c b/src/config.c index 5ff533ab3..2de95577a 100644 --- a/src/config.c +++ b/src/config.c @@ -2663,6 +2663,12 @@ int updateAppendFsync(const char **err) { return 1; } +static int updateSyncReplicationEnabled(const char **err) { + UNUSED(err); + durabilityReset(); + return 1; +} + /* applyBind affects both TCP and TLS (if enabled) together */ static int applyBind(const char **err) { connListener *tcp_listener = listenerByType(CONN_TYPE_SOCKET); @@ -3251,6 +3257,7 @@ standardConfig static_configs[] = { createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL), createBoolConfig("lua-enable-insecure-api", "lua-enable-deprecated-api", MODIFIABLE_CONFIG | HIDDEN_CONFIG | PROTECTED_CONFIG, server.lua_enable_insecure_api, 0, NULL, updateLuaEnableInsecureApi), createBoolConfig("import-mode", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.import_mode, 0, NULL, NULL), + createBoolConfig("sync-replication", NULL, MODIFIABLE_CONFIG, server.durability.sync_replication_enabled, 0, NULL, updateSyncReplicationEnabled), /* String Configs */ createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL), diff --git a/src/reply_blocking.c b/src/reply_blocking.c index ee6e569c0..2610630f8 100644 --- a/src/reply_blocking.c +++ b/src/reply_blocking.c @@ -1,6 +1,7 @@ #include 
"reply_blocking.h" #include "expire.h" #include "server.h" +#include "zmalloc.h" #include #include @@ -41,9 +42,7 @@ static long long getSingleCommandBlockingOffsetForConsistentWrites(struct client * return 1 if durability is enabled, 0 otherwise. */ int isDurabilityEnabled(void) { - // should this have its own flag? - // or general 'durability flag' - return true; + return server.durability.sync_replication_enabled; } int isPrimaryDurabilityEnabled(void) { @@ -232,6 +231,15 @@ static int unblockClientWaitingReplicaAck(struct client *c) { return 0; } +/* Add a free function for blockedResponse entries */ +static void blockedResponseFree(void *value) { + blockedResponse *br = (blockedResponse *)value; + /* If the blocked response ever contains dynamically allocated fields, + free them here. For now the structure only holds pointers owned + elsewhere, so we simply free the structure itself. */ + zfree(br); +} + /** * Initialize the durable write client attributes when client is created */ @@ -241,7 +249,7 @@ void durableClientInit(struct client *c) { } if (c->clientDurabilityInfo.blocked_responses == NULL) { c->clientDurabilityInfo.blocked_responses = listCreate(); - listSetFreeMethod(c->clientDurabilityInfo.blocked_responses, zfree); + listSetFreeMethod(c->clientDurabilityInfo.blocked_responses, blockedResponseFree); resetPreExecutionOffset(c); c->clientDurabilityInfo.current_command_repl_offset = -1; } @@ -395,7 +403,7 @@ void blockClientOnReplOffset(struct client *c, long long blockingReplOffset) { /* If needed, we block the client and put it into our list of clients * waiting for ack from slaves. 
*/ if (isBlockingNeededForOffset(c, blockingReplOffset)) { - serverLog(LOG_DEBUG, "client should be blocked at offset %lld,", blockingReplOffset); + serverLog(LL_DEBUG, "client should be blocked at offset %lld,", blockingReplOffset); blockLastResponseIfExist(c, blockingReplOffset); if (!c->clientDurabilityInfo.durable_blocked_client) { listAddNodeTail(server.durability.clients_waiting_replica_ack, c); @@ -436,12 +444,11 @@ static void blockClientAndMonitorsOnReplOffset(struct client *c, long long block * @param consensus_ack_offset Repl offset that have been acked by the required number of replicas */ void unblockResponsesWithAckOffset(struct durable_t *durability, long long consensus_ack_offset) { - serverLog(LOG_DEBUG, "unblocking clients for consensus offset %lld,", consensus_ack_offset); + serverLog(LL_DEBUG, "unblocking clients for consensus offset %lld,", consensus_ack_offset); // Traverses through all the clients that wait for replica ack - listIter li, li_response; - listNode *ln, *ln_response; + listIter li; + listNode *ln; listRewind(durability->clients_waiting_replica_ack, &li); - blockedResponse *br = NULL; while ((ln = listNext(&li))) { client *c = ln->value; @@ -450,16 +457,16 @@ void unblockResponsesWithAckOffset(struct durable_t *durability, long long conse // ACK'ed by the required number of replicas // If the max repl offset is acked, all blocked responses will be unblocked serverAssert(c->clientDurabilityInfo.blocked_responses != NULL); - listRewind(c->clientDurabilityInfo.blocked_responses, &li_response); bool unblocked_responses = false; - while ((ln_response = listNext(&li_response))) { - br = listNodeValue(ln_response); + // Keep deleting from the front while the first response can be unblocked + while (listLength(c->clientDurabilityInfo.blocked_responses) > 0) { + listNode *first = listFirst(c->clientDurabilityInfo.blocked_responses); + blockedResponse *br = listNodeValue(first); + if (br->primary_repl_offset <= consensus_ack_offset) { 
unblockFirstResponse(c); - if (unblocked_responses == false) { - unblocked_responses = true; - } + unblocked_responses = true; } else { // As soon as we encounter a client response that has the // required reply offset greater than the replicas ACK'ed offset, @@ -486,7 +493,7 @@ void unblockResponsesWithAckOffset(struct durable_t *durability, long long conse /* Check if there are clients blocked that can be unblocked since * we received enough ACKs from replicas */ void postReplicaAck(void) { - serverLog(LOG_DEBUG, "postReplicaAck hook entered"); + serverLog(LL_DEBUG, "postReplicaAck hook entered"); if (!isPrimaryDurabilityEnabled()) { return; } @@ -650,6 +657,29 @@ void clearUncommittedKeysAcknowledged(void) { } } +void durableInitDatabase(serverDb *db) { + db->uncommitted_keys = raxNew(); + db->dirty_repl_offset = -1; + db->scan_in_progress = 0; +} + +/** + * Utility function to clear all uncommitted keys for each database + */ +static inline void clearAllUncommittedKeys(void) { + serverLog(LL_NOTICE, "Clearing all uncommitted keys for sync replication"); + for (int i = 0; i < server.dbnum; i++) { + serverDb *db = server.db[i]; + if (db == NULL) continue; + raxFree(db->uncommitted_keys); + if (db->scan_in_progress) { + raxStop(&db->next_scan_iter); + } + durableInitDatabase(db); + } + server.durability.curr_db_scan_idx = 0; +} + /*========================== Command access validation ====================== */ /** @@ -738,11 +768,13 @@ static long long getSingleCommandBlockingOffsetForReplicatingCommand(client *c) if (c->cmd->proc == moveCommand) { // TODO: support MOVE command: we need to mark the key as dirty in the destination DB // dont block for now + getKeysFreeResult(&result); return -1; } else if (c->cmd->proc == copyCommand) { // TODO: handle copy command // handle the dirty keys in the destination db // dont block for now + getKeysFreeResult(&result); return -1; } @@ -767,6 +799,7 @@ static long long 
getSingleCommandBlockingOffsetForNonReplicatingCommand(client * long long blocking_repl_offset = -1; // TODO: handle function, module, etc if (c->cmd->flags & (CMD_READONLY | CMD_WRITE)) { + serverLog(LL_DEBUG, "getSingleCommandBlockingOffsetForNonReplicatingCommand for command"); // For read/write commands that didn't generate replication data, we would block // on the highest offset of all accessed uncommitted keys and the valkey DBs itself. // Note some commands categorized as writes can perform read only operations @@ -834,12 +867,12 @@ static long long getSingleCommandBlockingOffsetForConsistentWrites(struct client * about to be executed. */ void beforeCommandTrackReplOffset(void) { - serverLog(LOG_DEBUG, "preCall hook entered"); + serverLog(LL_DEBUG, "preCall hook entered"); if (!isPrimaryDurabilityEnabled()) return; server.durability.pre_call_replication_offset = server.primary_repl_offset; server.durability.pre_call_num_ops_pending_propagation = server.also_propagate.numops; - serverLog(LOG_DEBUG, "preCall hook: pre_call_replication_offset=%lld, pre_call_num_ops_pending_propagation=%d", + serverLog(LL_DEBUG, "preCall hook: pre_call_replication_offset=%lld, pre_call_num_ops_pending_propagation=%d", server.durability.pre_call_replication_offset, server.durability.pre_call_num_ops_pending_propagation); } @@ -857,7 +890,7 @@ void beforeCommandTrackReplOffset(void) { */ void afterCommandTrackReplOffset(struct client *c) { // log debug tracing - serverLog(LOG_DEBUG, "Call hook entered for command '%s'", c->cmd->declared_name); + serverLog(LL_DEBUG, "Call hook entered for command '%s'", c->cmd->declared_name); if (!isPrimaryDurabilityEnabled() || (c->flag.blocked)) return; @@ -884,10 +917,10 @@ void afterCommandTrackReplOffset(struct client *c) { * in the reply COB of the client and all the connected monitors. 
*/ int preCommandExec(struct client *c) { - serverLog(LOG_DEBUG, "preCommandExec hook entered for command '%s'", + serverLog(LL_DEBUG, "preCommandExec hook entered for command '%s'", c->cmd ? c->cmd->declared_name : "NULL"); if (!isDurabilityEnabled()) { - serverLog(LOG_DEBUG, "preCommandExec hook: durability not enabled, allowing"); + serverLog(LL_DEBUG, "preCommandExec hook: durability not enabled, allowing"); return CMD_FILTER_ALLOW; } @@ -930,7 +963,7 @@ void postCommandExec(struct client *c) { if (!isPrimaryDurabilityEnabled()) { return; } - serverLog(LOG_DEBUG, "postCommandExec hook entered for command '%s'", + serverLog(LL_DEBUG, "postCommandExec hook entered for command '%s'", c->cmd ? c->cmd->declared_name : "NULL"); // If the command is NULL or is in a MULTI/EXEC block, then we skip // TODO: handle multi @@ -955,7 +988,6 @@ void postCommandExec(struct client *c) { /** * Function used to initialize the durability datastructures. - * TODO: exit clean up? */ void durableInit(void) { // Initialize synchronous replication @@ -963,3 +995,75 @@ void durableInit(void) { server.durability.curr_db_scan_idx = 0; server.durability.clients_waiting_replica_ack = listCreate(); } + +/** + * Function used to cleanup the durability datastructures on server shutdown. 
+ */ +void durableCleanup(void) { + serverLog(LL_DEBUG, "Cleanup reply blocking structures"); + server.durability.replica_offsets_size = 0; + zfree(server.durability.replica_offsets); + + if (server.durability.clients_waiting_replica_ack != NULL) { + listRelease(server.durability.clients_waiting_replica_ack); + server.durability.clients_waiting_replica_ack = NULL; + } + + clearAllUncommittedKeys(); +} + +/** + * Utility function to release buffer used for replica offsets + */ +static inline void releaseReplicaOffsetBuffer(void) { + server.durability.replica_offsets_size = 0; + zfree(server.durability.replica_offsets); + server.durability.replica_offsets = NULL; +} + +/** + * Function to reset primary state for synchronous replication + * This function will be invoked at primary when sync replication is dynamically disabled or it becomes a replica. + * Both cases require related resources to be reset to initial state. The only difference is how we handle the + * clients waiting for replica ACK. If sync replication is disabled, all blocked responses and keyspace + * notifications will be flushed for these clients, whose connections to primary will still be maintained. + * TODO: if the primary becomes a replica, all these clients should be disconnected and freed. 
+ */ +static inline void syncReplicationResetPrimaryState() { + // Release buffer we use for replica offsets + releaseReplicaOffsetBuffer(); + + if (listLength(server.durability.clients_waiting_replica_ack) > 0) { + // Flush all blocked responses and keyspace notifications to clients waiting for replica ACK + unblockResponsesWithAckOffset(&server.durability, LLONG_MAX); + // Make sure there are no clients waiting for replica ACK + serverAssert(listLength(server.durability.clients_waiting_replica_ack) == 0); + } +} + +/** + * Reset related resources when toggling synchronous replication + * This method is invoked when the user enables or disables durability via the CONFIG SET command + */ +void durabilityReset(void) { + if (isDurabilityEnabled()) { + // To enable sync replication, we update the pre-command offset so that the CONFIG SET command + // itself doesn't get inadvertently blocked because the primary replication offset is greater + // than the stale pre_command_replication_offset. + server.durability.pre_command_replication_offset = server.primary_repl_offset; + listIter li; + listNode *ln; + listRewind(server.clients, &li); + while ((ln = listNext(&li)) != NULL) { + client *c = listNodeValue(ln); + durableClientInit(c); + } + } else { + // To disable durable write, we need to flush all blocked responses and keyspace + // notifications, and then reset all durability related resources to initial state at the primary node. + if (iAmPrimary()) { + syncReplicationResetPrimaryState(); + } + clearAllUncommittedKeys(); + } +} diff --git a/src/reply_blocking.h b/src/reply_blocking.h index 798a25693..52429cc7b 100644 --- a/src/reply_blocking.h +++ b/src/reply_blocking.h @@ -24,6 +24,8 @@ typedef long long mstime_t; * Durability container to house all the durability related fields. 
*/ typedef struct durable_t { + /* Flag to enable/disable sync replication (durability) */ + int sync_replication_enabled; /* Uncommitted keys cleanup configuration time limit in milliseconds */ unsigned int keys_cleanup_time_limit_ms; /* The current scanning database index, starting from 0 */ @@ -98,9 +100,12 @@ typedef struct clientDurabilityInfo { * Init */ void durableInit(void); +void durableCleanup(void); +void durabilityReset(void); void durableClientInit(struct client *c); void durableClientReset(struct client *c); -/* + +/** Command processing hooks for offset and cob tracking */ void beforeCommandTrackReplOffset(void); diff --git a/src/server.c b/src/server.c index 4e5db679f..759fd1893 100644 --- a/src/server.c +++ b/src/server.c @@ -3033,12 +3033,12 @@ void initServer(void) { /* Initialize the EVAL scripting component. */ evalInit(); commandlogInit(); latencyMonitorInit(); initSharedQueryBuf(); durableInit(); /* Initialize ACL default password if it exists */ ACLUpdateDefaultUserPassword(server.requirepass); applyWatchdogPeriod(); @@ -4792,6 +4792,9 @@ int finishShutdown(void) { /* Fire the shutdown modules event. */ moduleFireServerEvent(VALKEYMODULE_EVENT_SHUTDOWN, 0, NULL); + /* Cleanup durability tracking resources. */ + durableCleanup(); + /* Remove the pid file if possible and needed. 
*/ if (server.daemonize || server.pidfile) { serverLog(LL_NOTICE, "Removing the pid file."); diff --git a/src/server.h b/src/server.h index f2e1f2629..1abc0ced9 100644 --- a/src/server.h +++ b/src/server.h @@ -950,6 +950,7 @@ typedef struct serverDb { long long dirty_repl_offset; /* Replication offset for a dirty DB */ raxIterator next_scan_iter; /* The next iterator for db scan */ int scan_in_progress; /* Flag of showing whether db is in scan or not */ + rax *reply_duration; /* Radix tree tracking reply durations for durable blocked clients */ } serverDb; /* forward declaration for functions ctx */ diff --git a/src/unit/test_entry.c b/src/unit/test_entry.c index d51d91b7e..5287d1fea 100644 --- a/src/unit/test_entry.c +++ b/src/unit/test_entry.c @@ -169,7 +169,7 @@ int test_entryUpdate(int argc, char **argv, int flags) { // Update the value so that memory usage is less than 3/4 of the current allocation size // Ensuring required_embedded_size < current_embedded_allocation_size * 3 / 4, which creates a new entry size_t current_embedded_allocation_size = entryMemUsage(e9); - sds value10 = sdsnew("xxxxxxxxxxxxxxxxxxxxx"); + sds value10 = sdsnew("xxxxxx"); sds value_copy10 = sdsdup(value10); long long expiry10 = expiry9; entry *e10 = entryUpdate(e9, value10, expiry10); diff --git a/src/unit/test_files.h b/src/unit/test_files.h index 30042dbf4..607a7fca5 100644 --- a/src/unit/test_files.h +++ b/src/unit/test_files.h @@ -214,6 +214,15 @@ int test_raxBenchmark(int argc, char **argv, int flags); int test_raxHugeKey(int argc, char **argv, int flags); int test_raxFuzz(int argc, char **argv, int flags); int test_raxRecompressHugeKey(int argc, char **argv, int flags); +int test_durableClientInitAndReset(int argc, char **argv, int flags); +int test_isDurabilityEnabled(int argc, char **argv, int flags); +int test_isPrimaryDurabilityEnabled(int argc, char **argv, int flags); +int test_isClientReplyBufferLimited(int argc, char **argv, int flags); +int test_durableInit(int argc, 
char **argv, int flags); +int test_blockLastResponseIfExist(int argc, char **argv, int flags); +int test_durablePurgeAndGetUncommittedKeyOffset(int argc, char **argv, int flags); +int test_beforeCommandTrackReplOffset(int argc, char **argv, int flags); +int test_preCommandExec(int argc, char **argv, int flags); int test_sds(int argc, char **argv, int flags); int test_typesAndAllocSize(int argc, char **argv, int flags); int test_sdsHeaderSizes(int argc, char **argv, int flags); @@ -300,6 +309,7 @@ unitTest __test_networking_c[] = {{"test_writeToReplica", test_writeToReplica}, unitTest __test_object_c[] = {{"test_object_with_key", test_object_with_key}, {"test_embedded_string_with_key", test_embedded_string_with_key}, {"test_embedded_string_with_key_and_expire", test_embedded_string_with_key_and_expire}, {"test_embedded_value", test_embedded_value}, {"test_unembed_value", test_unembed_value}, {NULL, NULL}}; unitTest __test_quicklist_c[] = {{"test_quicklistCreateList", test_quicklistCreateList}, {"test_quicklistAddToTailOfEmptyList", test_quicklistAddToTailOfEmptyList}, {"test_quicklistAddToHeadOfEmptyList", test_quicklistAddToHeadOfEmptyList}, {"test_quicklistAddToTail5xAtCompress", test_quicklistAddToTail5xAtCompress}, {"test_quicklistAddToHead5xAtCompress", test_quicklistAddToHead5xAtCompress}, {"test_quicklistAddToTail500xAtCompress", test_quicklistAddToTail500xAtCompress}, {"test_quicklistAddToHead500xAtCompress", test_quicklistAddToHead500xAtCompress}, {"test_quicklistRotateEmpty", test_quicklistRotateEmpty}, {"test_quicklistComprassionPlainNode", test_quicklistComprassionPlainNode}, {"test_quicklistNextPlainNode", test_quicklistNextPlainNode}, {"test_quicklistRotatePlainNode", test_quicklistRotatePlainNode}, {"test_quicklistRotateOneValOnce", test_quicklistRotateOneValOnce}, {"test_quicklistRotate500Val5000TimesAtCompress", test_quicklistRotate500Val5000TimesAtCompress}, {"test_quicklistPopEmpty", test_quicklistPopEmpty}, {"test_quicklistPop1StringFrom1", 
test_quicklistPop1StringFrom1}, {"test_quicklistPopHead1NumberFrom1", test_quicklistPopHead1NumberFrom1}, {"test_quicklistPopHead500From500", test_quicklistPopHead500From500}, {"test_quicklistPopHead5000From500", test_quicklistPopHead5000From500}, {"test_quicklistIterateForwardOver500List", test_quicklistIterateForwardOver500List}, {"test_quicklistIterateReverseOver500List", test_quicklistIterateReverseOver500List}, {"test_quicklistInsertAfter1Element", test_quicklistInsertAfter1Element}, {"test_quicklistInsertBefore1Element", test_quicklistInsertBefore1Element}, {"test_quicklistInsertHeadWhileHeadNodeIsFull", test_quicklistInsertHeadWhileHeadNodeIsFull}, {"test_quicklistInsertTailWhileTailNodeIsFull", test_quicklistInsertTailWhileTailNodeIsFull}, {"test_quicklistInsertOnceInElementsWhileIteratingAtCompress", test_quicklistInsertOnceInElementsWhileIteratingAtCompress}, {"test_quicklistInsertBefore250NewInMiddleOf500ElementsAtCompress", test_quicklistInsertBefore250NewInMiddleOf500ElementsAtCompress}, {"test_quicklistInsertAfter250NewInMiddleOf500ElementsAtCompress", test_quicklistInsertAfter250NewInMiddleOf500ElementsAtCompress}, {"test_quicklistDuplicateEmptyList", test_quicklistDuplicateEmptyList}, {"test_quicklistDuplicateListOf1Element", test_quicklistDuplicateListOf1Element}, {"test_quicklistDuplicateListOf500", test_quicklistDuplicateListOf500}, {"test_quicklistIndex1200From500ListAtFill", test_quicklistIndex1200From500ListAtFill}, {"test_quicklistIndex12From500ListAtFill", test_quicklistIndex12From500ListAtFill}, {"test_quicklistIndex100From500ListAtFill", test_quicklistIndex100From500ListAtFill}, {"test_quicklistIndexTooBig1From50ListAtFill", test_quicklistIndexTooBig1From50ListAtFill}, {"test_quicklistDeleteRangeEmptyList", test_quicklistDeleteRangeEmptyList}, {"test_quicklistDeleteRangeOfEntireNodeInListOfOneNode", test_quicklistDeleteRangeOfEntireNodeInListOfOneNode}, {"test_quicklistDeleteRangeOfEntireNodeWithOverflowCounts", 
test_quicklistDeleteRangeOfEntireNodeWithOverflowCounts}, {"test_quicklistDeleteMiddle100Of500List", test_quicklistDeleteMiddle100Of500List}, {"test_quicklistDeleteLessThanFillButAcrossNodes", test_quicklistDeleteLessThanFillButAcrossNodes}, {"test_quicklistDeleteNegative1From500List", test_quicklistDeleteNegative1From500List}, {"test_quicklistDeleteNegative1From500ListWithOverflowCounts", test_quicklistDeleteNegative1From500ListWithOverflowCounts}, {"test_quicklistDeleteNegative100From500List", test_quicklistDeleteNegative100From500List}, {"test_quicklistDelete10Count5From50List", test_quicklistDelete10Count5From50List}, {"test_quicklistNumbersOnlyListRead", test_quicklistNumbersOnlyListRead}, {"test_quicklistNumbersLargerListRead", test_quicklistNumbersLargerListRead}, {"test_quicklistNumbersLargerListReadB", test_quicklistNumbersLargerListReadB}, {"test_quicklistLremTestAtCompress", test_quicklistLremTestAtCompress}, {"test_quicklistIterateReverseDeleteAtCompress", test_quicklistIterateReverseDeleteAtCompress}, {"test_quicklistIteratorAtIndexTestAtCompress", test_quicklistIteratorAtIndexTestAtCompress}, {"test_quicklistLtrimTestAAtCompress", test_quicklistLtrimTestAAtCompress}, {"test_quicklistLtrimTestBAtCompress", test_quicklistLtrimTestBAtCompress}, {"test_quicklistLtrimTestCAtCompress", test_quicklistLtrimTestCAtCompress}, {"test_quicklistLtrimTestDAtCompress", test_quicklistLtrimTestDAtCompress}, {"test_quicklistVerifySpecificCompressionOfInteriorNodes", test_quicklistVerifySpecificCompressionOfInteriorNodes}, {"test_quicklistBookmarkGetUpdatedToNextItem", test_quicklistBookmarkGetUpdatedToNextItem}, {"test_quicklistBookmarkLimit", test_quicklistBookmarkLimit}, {"test_quicklistCompressAndDecompressQuicklistListpackNode", test_quicklistCompressAndDecompressQuicklistListpackNode}, {"test_quicklistCompressAndDecomressQuicklistPlainNodeLargeThanUINT32MAX", test_quicklistCompressAndDecomressQuicklistPlainNodeLargeThanUINT32MAX}, {NULL, NULL}}; unitTest 
__test_rax_c[] = {{"test_raxRandomWalk", test_raxRandomWalk}, {"test_raxIteratorUnitTests", test_raxIteratorUnitTests}, {"test_raxTryInsertUnitTests", test_raxTryInsertUnitTests}, {"test_raxRegressionTest1", test_raxRegressionTest1}, {"test_raxRegressionTest2", test_raxRegressionTest2}, {"test_raxRegressionTest3", test_raxRegressionTest3}, {"test_raxRegressionTest4", test_raxRegressionTest4}, {"test_raxRegressionTest5", test_raxRegressionTest5}, {"test_raxRegressionTest6", test_raxRegressionTest6}, {"test_raxBenchmark", test_raxBenchmark}, {"test_raxHugeKey", test_raxHugeKey}, {"test_raxFuzz", test_raxFuzz}, {"test_raxRecompressHugeKey", test_raxRecompressHugeKey}, {NULL, NULL}}; +unitTest __test_reply_blocking_c[] = {{"test_durableClientInitAndReset", test_durableClientInitAndReset}, {"test_isDurabilityEnabled", test_isDurabilityEnabled}, {"test_isPrimaryDurabilityEnabled", test_isPrimaryDurabilityEnabled}, {"test_isClientReplyBufferLimited", test_isClientReplyBufferLimited}, {"test_durableInit", test_durableInit}, {"test_blockLastResponseIfExist", test_blockLastResponseIfExist}, {"test_durablePurgeAndGetUncommittedKeyOffset", test_durablePurgeAndGetUncommittedKeyOffset}, {"test_beforeCommandTrackReplOffset", test_beforeCommandTrackReplOffset}, {"test_preCommandExec", test_preCommandExec}, {NULL, NULL}}; unitTest __test_sds_c[] = {{"test_sds", test_sds}, {"test_typesAndAllocSize", test_typesAndAllocSize}, {"test_sdsHeaderSizes", test_sdsHeaderSizes}, {"test_sdssplitargs", test_sdssplitargs}, {"test_sdsnsplitargs", test_sdsnsplitargs}, {"test_sdsnsplitargsBenchmark", test_sdsnsplitargsBenchmark}, {NULL, NULL}}; unitTest __test_sha1_c[] = {{"test_sha1", test_sha1}, {NULL, NULL}}; unitTest __test_sha256_c[] = {{"test_sha256_abc", test_sha256_abc}, {"test_sha256_large", test_sha256_large}, {"test_sha256_million_a", test_sha256_million_a}, {NULL, NULL}}; @@ -331,6 +341,7 @@ struct unitTestSuite { {"test_object.c", __test_object_c}, {"test_quicklist.c", 
__test_quicklist_c}, {"test_rax.c", __test_rax_c}, + {"test_reply_blocking.c", __test_reply_blocking_c}, {"test_sds.c", __test_sds_c}, {"test_sha1.c", __test_sha1_c}, {"test_sha256.c", __test_sha256_c}, diff --git a/src/unit/test_reply_blocking.c b/src/unit/test_reply_blocking.c new file mode 100644 index 000000000..92b988024 --- /dev/null +++ b/src/unit/test_reply_blocking.c @@ -0,0 +1,397 @@ +#include "../fmacros.h" +#include "../reply_blocking.h" +#include "test_help.h" +#include "../server.h" +#include +#include +#include + +void blockLastResponseIfExist(struct client *c, long long blocked_offset); + +static void initReplyBlockingTestEnv(void) { + static char test_logfile[] = ""; + if (server.logfile == NULL) { + /* serverLogRaw dereferences server.logfile; use stdout for unit tests. */ + server.logfile = test_logfile; + } +} + +static clientReplyBlock *createReplyBlock(size_t used) { + clientReplyBlock *block = zmalloc(sizeof(clientReplyBlock) + used); + block->size = used; + block->used = used; + block->last_header = NULL; + block->flag.buf_encoded = 0; + return block; +} + +/** + * Test durableClientInit and durableClientReset functions + * These functions initialize and reset client durability attributes + */ +int test_durableClientInitAndReset(int argc, char **argv, int flags) { + UNUSED(argc); + UNUSED(argv); + UNUSED(flags); + initReplyBlockingTestEnv(); + + // Create a mock client + client *c = zcalloc(sizeof(client)); + c->clientDurabilityInfo.blocked_responses = NULL; + c->clientDurabilityInfo.durable_blocked_client = 0; + c->clientDurabilityInfo.current_command_repl_offset = 0; + + // Test initialization when durability is disabled + server.durability.sync_replication_enabled = 0; + durableClientInit(c); + TEST_ASSERT(c->clientDurabilityInfo.blocked_responses == NULL); + + // Test initialization when durability is enabled + server.durability.sync_replication_enabled = 1; + durableClientInit(c); + TEST_ASSERT(c->clientDurabilityInfo.blocked_responses 
!= NULL); + TEST_ASSERT(listLength(c->clientDurabilityInfo.blocked_responses) == 0); + TEST_ASSERT(c->clientDurabilityInfo.offset.recorded == false); + TEST_ASSERT(c->clientDurabilityInfo.offset.reply_block == NULL); + TEST_ASSERT(c->clientDurabilityInfo.offset.byte_offset == 0); + TEST_ASSERT(c->clientDurabilityInfo.current_command_repl_offset == -1); + + // Test reset + durableClientReset(c); + TEST_ASSERT(c->clientDurabilityInfo.blocked_responses == NULL); + TEST_ASSERT(c->clientDurabilityInfo.offset.recorded == false); + TEST_ASSERT(c->clientDurabilityInfo.current_command_repl_offset == -1); + + // Reset to default + server.durability.sync_replication_enabled = 0; + + // Cleanup + zfree(c); + + return 0; +} + +/** + * Test isDurabilityEnabled function + */ +int test_isDurabilityEnabled(int argc, char **argv, int flags) { + UNUSED(argc); + UNUSED(argv); + UNUSED(flags); + + // Test that durability is disabled by default + server.durability.sync_replication_enabled = 0; + TEST_ASSERT(isDurabilityEnabled() == 0); + + // Test that durability can be enabled + server.durability.sync_replication_enabled = 1; + TEST_ASSERT(isDurabilityEnabled() == 1); + + // Reset to default + server.durability.sync_replication_enabled = 0; + + return 0; +} + +/** + * Test isPrimaryDurabilityEnabled function + */ +int test_isPrimaryDurabilityEnabled(int argc, char **argv, int flags) { + UNUSED(argc); + UNUSED(argv); + UNUSED(flags); + + // Enable sync replication for testing + server.durability.sync_replication_enabled = 1; + + // Test when server is primary (not a replica) + server.primary_host = NULL; + TEST_ASSERT(isPrimaryDurabilityEnabled() == 1); + + // Test when server is a replica + server.primary_host = sdsnew("127.0.0.1"); + TEST_ASSERT(isPrimaryDurabilityEnabled() == 0); + + // Test when durability is disabled but server is primary + server.durability.sync_replication_enabled = 0; + sdsfree(server.primary_host); + server.primary_host = NULL; + 
+    TEST_ASSERT(isPrimaryDurabilityEnabled() == 0);
+
+    // Cleanup and reset
+    server.durability.sync_replication_enabled = 0;
+
+    return 0;
+}
+
+/**
+ * Test isClientReplyBufferLimited function
+ */
+int test_isClientReplyBufferLimited(int argc, char **argv, int flags) {
+    UNUSED(argc);
+    UNUSED(argv);
+    UNUSED(flags);
+
+    // Create a mock client
+    client *c = zcalloc(sizeof(client));
+
+    // Case 1: No blocked_responses list
+    c->clientDurabilityInfo.blocked_responses = NULL;
+    TEST_ASSERT(isClientReplyBufferLimited(c) == false);
+
+    // Case 2: Empty blocked_responses list
+    c->clientDurabilityInfo.blocked_responses = listCreate();
+    TEST_ASSERT(isClientReplyBufferLimited(c) == false);
+
+    // Case 3: Non-empty blocked_responses list
+    blockedResponse *br = zcalloc(sizeof(blockedResponse));
+    br->primary_repl_offset = 100;
+    br->disallowed_byte_offset = 0;
+    br->disallowed_reply_block = NULL;
+    listAddNodeTail(c->clientDurabilityInfo.blocked_responses, br);
+    TEST_ASSERT(isClientReplyBufferLimited(c) == true);
+
+    // Cleanup
+    listSetFreeMethod(c->clientDurabilityInfo.blocked_responses, zfree);
+    listRelease(c->clientDurabilityInfo.blocked_responses);
+    zfree(c);
+
+    return 0;
+}
+
+/**
+ * Test durableInit function
+ */
+int test_durableInit(int argc, char **argv, int flags) {
+    UNUSED(argc);
+    UNUSED(argv);
+    UNUSED(flags);
+
+    // Clean up any existing state
+    if (server.durability.clients_waiting_replica_ack != NULL) {
+        listRelease(server.durability.clients_waiting_replica_ack);
+    }
+
+    // Initialize durability
+    durableInit();
+
+    // Verify initialization
+    TEST_ASSERT(server.durability.sync_replication_enabled == 0);
+    TEST_ASSERT(server.durability.clients_waiting_replica_ack != NULL);
+    TEST_ASSERT(listLength(server.durability.clients_waiting_replica_ack) == 0);
+    TEST_ASSERT(server.durability.previous_acked_offset == -1);
+    TEST_ASSERT(server.durability.curr_db_scan_idx == 0);
+
+    // Cleanup to prevent memory leak
+    listRelease(server.durability.clients_waiting_replica_ack);
+    server.durability.clients_waiting_replica_ack = NULL;
+
+    return 0;
+}
+
+/**
+ * Test blockLastResponseIfExist function
+ */
+int test_blockLastResponseIfExist(int argc, char **argv, int flags) {
+    UNUSED(argc);
+    UNUSED(argv);
+    UNUSED(flags);
+
+    // Case 1: Response appended to initial buffer
+    client *c = zcalloc(sizeof(client));
+    c->clientDurabilityInfo.blocked_responses = listCreate();
+    listSetFreeMethod(c->clientDurabilityInfo.blocked_responses, zfree);
+    c->bufpos = 5;
+    c->clientDurabilityInfo.offset.recorded = true;
+    c->clientDurabilityInfo.offset.reply_block = NULL;
+    c->clientDurabilityInfo.offset.byte_offset = 3;
+    c->reply = listCreate();
+
+    blockLastResponseIfExist(c, 42);
+    TEST_ASSERT(listLength(c->clientDurabilityInfo.blocked_responses) == 1);
+    blockedResponse *br = listNodeValue(listFirst(c->clientDurabilityInfo.blocked_responses));
+    TEST_ASSERT(br->primary_repl_offset == 42);
+    TEST_ASSERT(br->disallowed_reply_block == NULL);
+    TEST_ASSERT(br->disallowed_byte_offset == 3);
+
+    listRelease(c->clientDurabilityInfo.blocked_responses);
+    listRelease(c->reply);
+    zfree(c);
+
+    // Case 2: Response appended to reply list (first block)
+    c = zcalloc(sizeof(client));
+    c->clientDurabilityInfo.blocked_responses = listCreate();
+    listSetFreeMethod(c->clientDurabilityInfo.blocked_responses, zfree);
+    c->reply = listCreate();
+    listSetFreeMethod(c->reply, zfree);
+    clientReplyBlock *first = createReplyBlock(4);
+    clientReplyBlock *second = createReplyBlock(2);
+    listAddNodeTail(c->reply, first);
+    listAddNodeTail(c->reply, second);
+    c->bufpos = 3;
+    c->clientDurabilityInfo.offset.recorded = true;
+    c->clientDurabilityInfo.offset.reply_block = NULL;
+    c->clientDurabilityInfo.offset.byte_offset = 3;
+
+    blockLastResponseIfExist(c, 99);
+    TEST_ASSERT(listLength(c->clientDurabilityInfo.blocked_responses) == 1);
+    br = listNodeValue(listFirst(c->clientDurabilityInfo.blocked_responses));
+    TEST_ASSERT(br->primary_repl_offset == 99);
+    TEST_ASSERT(br->disallowed_reply_block == listFirst(c->reply));
+    TEST_ASSERT(br->disallowed_byte_offset == 0);
+
+    listRelease(c->clientDurabilityInfo.blocked_responses);
+    listRelease(c->reply);
+    zfree(c);
+
+    // Case 3: Response starts in next reply block
+    c = zcalloc(sizeof(client));
+    c->clientDurabilityInfo.blocked_responses = listCreate();
+    listSetFreeMethod(c->clientDurabilityInfo.blocked_responses, zfree);
+    c->reply = listCreate();
+    listSetFreeMethod(c->reply, zfree);
+    first = createReplyBlock(4);
+    second = createReplyBlock(6);
+    listAddNodeTail(c->reply, first);
+    listAddNodeTail(c->reply, second);
+    c->clientDurabilityInfo.offset.recorded = true;
+    c->clientDurabilityInfo.offset.reply_block = listFirst(c->reply);
+    c->clientDurabilityInfo.offset.byte_offset = 4;
+
+    blockLastResponseIfExist(c, 7);
+    TEST_ASSERT(listLength(c->clientDurabilityInfo.blocked_responses) == 1);
+    br = listNodeValue(listFirst(c->clientDurabilityInfo.blocked_responses));
+    TEST_ASSERT(br->primary_repl_offset == 7);
+    TEST_ASSERT(br->disallowed_reply_block == listFirst(c->reply)->next);
+    TEST_ASSERT(br->disallowed_byte_offset == 0);
+
+    listRelease(c->clientDurabilityInfo.blocked_responses);
+    listRelease(c->reply);
+    zfree(c);
+
+    return 0;
+}
+
+/**
+ * Test durablePurgeAndGetUncommittedKeyOffset function
+ */
+int test_durablePurgeAndGetUncommittedKeyOffset(int argc, char **argv, int flags) {
+    UNUSED(argc);
+    UNUSED(argv);
+    UNUSED(flags);
+
+    serverDb **old_db = server.db;
+    int old_dbnum = server.dbnum;
+    char *old_primary_host = server.primary_host;
+    int old_cluster_enabled = server.cluster_enabled;
+    long long old_previous_acked_offset = server.durability.previous_acked_offset;
+
+    server.cluster_enabled = 0;
+    server.primary_host = NULL;
+    server.dbnum = 1;
+    server.db = zcalloc(sizeof(serverDb *));
+
+    serverDb *db = zcalloc(sizeof(serverDb));
+    db->uncommitted_keys = raxNew();
+    db->dirty_repl_offset = -1;
+    db->scan_in_progress = 0;
+    server.db[0] = db;
+
+    sds key = sdsnew("key");
+    long long offset = 10;
+    raxInsert(db->uncommitted_keys, (unsigned char *)key, sdslen(key), (void *)offset, NULL);
+
+    server.durability.previous_acked_offset = 5;
+    TEST_ASSERT(durablePurgeAndGetUncommittedKeyOffset(key, db) == offset);
+
+    void *result = NULL;
+    TEST_ASSERT(raxFind(db->uncommitted_keys, (unsigned char *)key, sdslen(key), &result));
+    TEST_ASSERT((long long)result == offset);
+
+    server.durability.previous_acked_offset = 10;
+    TEST_ASSERT(durablePurgeAndGetUncommittedKeyOffset(key, db) == -1);
+    TEST_ASSERT(!raxFind(db->uncommitted_keys, (unsigned char *)key, sdslen(key), &result));
+
+    sdsfree(key);
+    raxFree(db->uncommitted_keys);
+    zfree(db);
+    zfree(server.db);
+
+    server.db = old_db;
+    server.dbnum = old_dbnum;
+    server.primary_host = old_primary_host;
+    server.cluster_enabled = old_cluster_enabled;
+    server.durability.previous_acked_offset = old_previous_acked_offset;
+
+    return 0;
+}
+
+int test_beforeCommandTrackReplOffset(int argc, char **argv, int flags) {
+    UNUSED(argc);
+    UNUSED(argv);
+    UNUSED(flags);
+
+    // Create a mock client
+    client *c = zcalloc(sizeof(client));
+    durableClientInit(c);
+    durableInit();
+
+    // Test when durability is enabled
+    server.durability.sync_replication_enabled = 1;
+    beforeCommandTrackReplOffset();
+    TEST_ASSERT(c->clientDurabilityInfo.offset.reply_block == NULL);
+    TEST_ASSERT(c->clientDurabilityInfo.offset.byte_offset == 0);
+    TEST_ASSERT(c->clientDurabilityInfo.current_command_repl_offset == 0);
+
+    // Cleanup: release per-client durability state (the blocked_responses
+    // list allocated by durableClientInit) before freeing the client,
+    // otherwise the list leaks.
+    durableClientReset(c);
+    zfree(c);
+
+    // Reset to default
+    server.durability.sync_replication_enabled = 0;
+
+    durableCleanup();
+    return 0;
+}
+
+int test_preCommandExec(int argc, char **argv, int flags) {
+    UNUSED(argc);
+    UNUSED(argv);
+    UNUSED(flags);
+
+    initReplyBlockingTestEnv();
+
+    struct serverCommand readonly_cmd = {.declared_name = "get", .flags = CMD_READONLY};
+
+    // Case 1: Durability disabled, allow command and leave tracking untouched.
+    client *c = zcalloc(sizeof(client));
+    c->cmd = &readonly_cmd;
+    c->clientDurabilityInfo.current_command_repl_offset = 123;
+    server.durability.sync_replication_enabled = 0;
+    server.durability.pre_command_replication_offset = 999;
+    server.primary_repl_offset = 555;
+    TEST_ASSERT(preCommandExec(c) == CMD_FILTER_ALLOW);
+    TEST_ASSERT(c->clientDurabilityInfo.current_command_repl_offset == 123);
+    TEST_ASSERT(server.durability.pre_command_replication_offset == 999);
+    zfree(c);
+
+    // Case 2: Durability enabled on primary, track pre-exec position.
+    c = zcalloc(sizeof(client));
+    c->cmd = &readonly_cmd;
+    c->bufpos = 7;
+    c->clientDurabilityInfo.current_command_repl_offset = 88;
+    server.durability.sync_replication_enabled = 1;
+    server.primary_host = NULL;
+    server.primary_repl_offset = 1234;
+    TEST_ASSERT(preCommandExec(c) == CMD_FILTER_ALLOW);
+    TEST_ASSERT(c->clientDurabilityInfo.current_command_repl_offset == -1);
+    TEST_ASSERT(c->clientDurabilityInfo.offset.recorded == true);
+    TEST_ASSERT(c->clientDurabilityInfo.offset.reply_block == NULL);
+    TEST_ASSERT(c->clientDurabilityInfo.offset.byte_offset == 7);
+    TEST_ASSERT(server.durability.pre_command_replication_offset == 1234);
+    // Reset per-client durability state *before* freeing the client;
+    // previously durableClientReset(c) ran after zfree(c) — use-after-free.
+    durableClientReset(c);
+    zfree(c);
+
+    server.durability.sync_replication_enabled = 0;
+    durableCleanup();
+    return 0;
+}
diff --git a/tests/durability/reply_blocking.tcl b/tests/durability/reply_blocking.tcl
new file mode 100644
index 000000000..1c26eb606
--- /dev/null
+++ b/tests/durability/reply_blocking.tcl
@@ -0,0 +1,114 @@
+# Tests for reply blocking durability feature
+# This test suite covers the synchronous replication functionality
+# that blocks client responses until replicas acknowledge writes
+
+start_server {tags {"repl durability external:skip"} overrides {sync-replication yes}} {
+    set primary [srv 0 client]
+    set primary_host [srv 0 host]
+    set primary_port [srv 0 port]
+
+    start_server {} {
+        set replica [srv 0 client]
+        set replica_host [srv 0 host]
+        set replica_port [srv 0 port]
+
+        test "Sync replication blocks replies until replica acks" {
+            assert_equal "yes" [lindex [$primary config get sync-replication] 1]
+
+            set rd [valkey_deferring_client -1]
+            $rd set durable:blocked value
+
+            set fd [$rd channel]
+            fconfigure $fd -blocking 0
+            set early_reply [read $fd]
+            fconfigure $fd -blocking 1
+            assert_equal "" $early_reply
+
+            $replica replicaof $primary_host $primary_port
+            wait_replica_online $primary
+            wait_replica_acked_ofs $primary $replica $replica_host $replica_port
+
+            assert_equal "OK" [$rd read]
+
+            $replica replicaof no one
+            wait_for_condition 50 100 {
+                [llength [$primary client list type replica]] == 0
+            } else {
+                fail "Primary didn't notice replica disconnect"
+            }
+        }
+
+        test "Sync replication blocks reads on dirty keys" {
+            assert_equal "yes" [lindex [$primary config get sync-replication] 1]
+
+            set writer [valkey_deferring_client -1]
+            $writer client reply off
+            $writer set durable:blocked dirty
+
+            set rd [valkey_deferring_client -1]
+            $rd get durable:blocked
+
+            set fd [$rd channel]
+            fconfigure $fd -blocking 0
+            set early_reply [read $fd]
+            fconfigure $fd -blocking 1
+            assert_equal "" $early_reply
+
+            $replica replicaof $primary_host $primary_port
+            wait_replica_online $primary
+            wait_replica_acked_ofs $primary $replica $replica_host $replica_port
+
+            assert_equal "dirty" [$rd read]
+        }
+
+        test "Sync replication toggling disables reply blocking" {
+            assert_equal "OK" [$primary config set sync-replication no]
+            assert_equal "no" [lindex [$primary config get sync-replication] 1]
+
+            set writer [valkey_deferring_client -1]
+            $writer client reply off
+            $writer set durable:toggle value
+
+            set rd [valkey_deferring_client -1]
+            $rd get durable:toggle
+            assert_equal "value" [$rd read]
+
+            assert_equal "OK" [$primary config set sync-replication yes]
+            assert_equal "yes" [lindex [$primary config get sync-replication] 1]
+        }
+
+        test "Disabling sync replication unblocks pending replies" {
+            assert_equal "yes" [lindex [$primary config get sync-replication] 1]
+
+            set rd [valkey_deferring_client -1]
+            $rd set durable:toggle-blocked value
+
+            set fd [$rd channel]
+            fconfigure $fd -blocking 0
+            set early_reply [read $fd]
+            assert_equal "" $early_reply
+
+            assert_equal "OK" [$primary config set sync-replication no]
+            assert_equal "no" [lindex [$primary config get sync-replication] 1]
+
+            set raw_reply ""
+            set got_reply 0
+            for {set i 0} {$i < 50} {incr i} {
+                append raw_reply [read $fd]
+                if {[string match "*\r\n" $raw_reply]} {
+                    set got_reply 1
+                    break
+                }
+                after 100
+            }
+            if {!$got_reply} {
+                fail "Reply didn't unblock after disabling sync replication"
+            }
+            fconfigure $fd -blocking 1
+            assert_match "+OK*" $raw_reply
+
+            assert_equal "OK" [$primary config set sync-replication yes]
+            assert_equal "yes" [lindex [$primary config get sync-replication] 1]
+        }
+    }
+}