mirror of
https://github.com/valkey-io/valkey.git
synced 2026-05-06 05:26:42 -04:00
Reply Copy Avoidance (#2078)
### Overview This PR introduces the ability to avoid copying the content of string object into replies (i.e. bulk string replies) and to allow I/O threads to refer directly to obj->ptr in writev iov. ### Key Changes * Added capability to reply construction allowing to interleave regular replies with copy avoid replies in client reply buffers * Extended write-to-client handlers to support copy avoid replies * Added copy avoidance of string bulk replies when copy avoidance indicated by I/O threads * Minor changes in cluster slots stats in order to support `network-bytes-out` for copy avoid replies * Copy avoidance is beneficial for performance regardless of object size only once a certain number of I/O threads is reached, so it will be enabled only starting from that number of threads. Internal configuration ``min-io-threads-copy-avoid`` introduced to manage this number of threads **Note**: When copy avoidance is disabled, the content and handling of client reply buffers remains as before this PR ### Implementation Details #### ``client`` and ``clientReplyBlock`` structs: 1. ``buf_encoded`` flag has been added to ``clientReplyBlock`` struct and to ``client`` struct for static ``c->buf`` to indicate if reply buffer is in copy avoidance mode (i.e. include headers and payloads) or not (i.e. plain replies only). 2. ``io_last_written_buf``, ``io_last_written_bufpos``, ``io_last_written_data_len`` fields added to the ``client`` struct to keep track of write state between ``writevToClient`` invocations #### Reply construction: 1. Original ```_addReplyToBuffer``` and ```_addReplyProtoToList``` have been renamed to ```_addReplyPayloadToBuffer``` and ```_addReplyPayloadToList``` and extended to support different types of payloads - regular replies and copy avoid replies. 2. New ```_addReplyToBuffer``` and ```_addReplyProtoToList``` now call ```_addReplyPayloadToBuffer``` and ```_addReplyPayloadToList``` and are used for adding **regular** replies to client reply buffers. 3. 
Newly introduced ```_addBulkOffloadToBuffer``` and ```_addBulkOffloadToList``` are used for adding **copy avoid** replies to client reply buffers. #### Write-to-client infrastructure: The ```writevToClient``` and ```_postWriteToClient``` have been significantly changed to support copy avoidance capability. #### Debug configuration: 1. ``min-io-threads-avoid-copy-reply`` - Minimum number of IO threads for copy avoidance 2. ``min-string-size-avoid-copy-reply`` - Minimum bulk string size for copy avoidance when IO threads disabled 3. ``min-string-size-avoid-copy-reply-threaded`` - Minimum bulk string size for copy avoidance when IO threads enabled ### Testing 1. Existing unit and integration tests passed. Copy avoidance enabled on tests with ``--io-threads`` flag 2. Added unit tests for copy avoidance functionality ### Performance Tests Note: the `io-threads 1` config means only the main thread with no additional io-threads, `io-threads 2` means main thread plus 1 I/O thread, `io-threads 9` means main thread plus 8 I/O threads. 
#### 512 byte object size Tests are conducted on memory optimized instances using: * 3,000,000 keys * 512 bytes object size * 1000 clients |io-threads (including main thread) |Plain Reply |Copy Avoidance | |--- |--- |--- | |7 |1,160,000 |1,160,000 | |8 |1,150,000 |1,280,000 | |9 |1,150,000 |1,330,000 | |10 |N/A |1,380,000 | |11 |N/A |1,420,000 | #### Various object size, small number of threads |iothreads |Data size |Keys |Clients |Instance type |Unstable branch |Copy Avoidance On | |--- |--- |--- |--- |--- |--- |--- | |1 |512 byte |3,000,000 |1,000 |memory optimized |195,000 |195,000 | |2 |512 byte |3,000,000 |1,000 |memory optimized |245,000 |245,000 | |3 |512 byte |3,000,000 |1,000 |memory optimized |455,000 |459,000 | |4 |512 byte |3,000,000 |1,000 |memory optimized |685,000 |685,000 | | | | | | | | | |1 |1K |3,000,000 |1,000 |memory optimized |185,000 |185,000 | |2 |1K |3,000,000 |1,000 |memory optimized |235,000 |235,000 | |3 |1K |3,000,000 |1,000 |memory optimized |450,000 |450,000 | | | | | | | | | |1 |4K |1,000,000 |1,000 |network optimized |182,000 |187,000 | |2 |4K |1,000,000 |1,000 |network optimized |240,000 |238,000 | | | | | | | | | |1 |16K |1,000,000 |500 |network optimized |100,000 |120,000 | |2 |16K |1,000,000 |500 |network optimized |140,000 |140,000 | |3 |16K |1,000,000 |500 |network optimized |275,000 |260,000 | | | | | | | | | |1 |32K |500,000 |500 |network optimized |57,000 |90,000 | |2 |32K |500,000 |500 |network optimized |110,000 |110,000 | |3 |32K |500,000 |500 |network optimized |215,000 |215,000 | | | | | | | | | |1 |64K |100,000 |500 |network optimized |30,000 |57,000 | |2 |64K |100,000 |500 |network optimized |69,000 |61,000 | |3 |64K |100,000 |500 |network optimized |120,000 |120,000 | |4 |64K |100,000 |500 |network optimized |115,000 - 175,000 |175,000 | |5 |64K |100,000 |500 |network optimized |115,000 - 165,000 |230,000 | --------- Signed-off-by: Alexander Shabanov <alexander.shabanov@gmail.com> Signed-off-by: xbasel 
<103044017+xbasel@users.noreply.github.com> Signed-off-by: Madelyn Olson <madelyneolson@gmail.com> Co-authored-by: Alexander Shabanov <alexander.shabanov@gmail.com> Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
This commit is contained in:
+17
-25
@@ -131,22 +131,23 @@ static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], lon
|
||||
}
|
||||
}
|
||||
|
||||
static int canAddNetworkBytesOut(client *c) {
|
||||
return server.cluster_slot_stats_enabled && server.cluster_enabled && c->slot != -1;
|
||||
/* Accumulates egress bytes for the slot. */
|
||||
void clusterSlotStatsAddNetworkBytesOutForSlot(int slot, unsigned long long net_bytes_out) {
|
||||
if (!clusterSlotStatsEnabled(slot)) return;
|
||||
|
||||
serverAssert(slot >= 0 && slot < CLUSTER_SLOTS);
|
||||
server.cluster->slot_stats[slot].network_bytes_out += net_bytes_out;
|
||||
}
|
||||
|
||||
/* Accumulates egress bytes upon sending RESP responses back to user clients. */
|
||||
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) {
|
||||
if (!canAddNetworkBytesOut(c)) return;
|
||||
|
||||
serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
|
||||
server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;
|
||||
clusterSlotStatsAddNetworkBytesOutForSlot(c->slot, c->net_output_bytes_curr_cmd);
|
||||
}
|
||||
|
||||
/* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes. */
|
||||
static void clusterSlotStatsUpdateNetworkBytesOutForReplication(long long len) {
|
||||
client *c = server.current_client;
|
||||
if (c == NULL || !canAddNetworkBytesOut(c)) return;
|
||||
if (c == NULL || !clusterSlotStatsEnabled(c->slot)) return;
|
||||
|
||||
/* We multiply the bytes len by the number of replicas to account for us broadcasting to multiple replicas at once. */
|
||||
len *= (long long)listLength(server.replicas);
|
||||
@@ -177,24 +178,14 @@ void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len) {
|
||||
* This type is not aggregated, to stay consistent with server.stat_net_output_bytes aggregation.
|
||||
* This function covers the internal propagation component. */
|
||||
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot) {
|
||||
/* For a blocked client, c->slot could be pre-filled.
|
||||
* Thus c->slot is backed-up for restoration after aggregation is completed. */
|
||||
int _slot = c->slot;
|
||||
c->slot = slot;
|
||||
if (!canAddNetworkBytesOut(c)) {
|
||||
/* c->slot should not change as a side effect of this function,
|
||||
* regardless of the function's early return condition. */
|
||||
c->slot = _slot;
|
||||
return;
|
||||
}
|
||||
if (!clusterSlotStatsEnabled(slot)) return;
|
||||
|
||||
serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
|
||||
server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;
|
||||
serverAssert(slot >= 0 && slot < CLUSTER_SLOTS);
|
||||
server.cluster->slot_stats[slot].network_bytes_out += c->net_output_bytes_curr_cmd;
|
||||
|
||||
/* For sharded pubsub, the client's network bytes metrics must be reset here,
|
||||
* as resetClient() is not called until subscription ends. */
|
||||
c->net_output_bytes_curr_cmd = 0;
|
||||
c->slot = _slot;
|
||||
}
|
||||
|
||||
/* Adds reply for the ORDERBY variant.
|
||||
@@ -222,9 +213,7 @@ void clusterSlotStatResetAll(void) {
|
||||
* would equate to repeating the same calculation twice.
|
||||
*/
|
||||
static int canAddCpuDuration(client *c) {
|
||||
return server.cluster_slot_stats_enabled && /* Config should be enabled. */
|
||||
server.cluster_enabled && /* Cluster mode should be enabled. */
|
||||
c->slot != -1 && /* Command should be slot specific. */
|
||||
return clusterSlotStatsEnabled(c->slot) &&
|
||||
(!server.execution_nesting || /* Either; */
|
||||
(server.execution_nesting && /* 1) Command should not be nested, or */
|
||||
c->realcmd->flags & CMD_BLOCKING)); /* 2) If command is nested, it must be due to unblocking. */
|
||||
@@ -251,8 +240,7 @@ static int canAddNetworkBytesIn(client *c) {
|
||||
* Third, blocked client is not aggregated, to avoid duplicate aggregation upon unblocking.
|
||||
* Fourth, the server is not under a MULTI/EXEC transaction, to avoid duplicate aggregation of
|
||||
* EXEC's 14 bytes RESP upon nested call()'s afterCommand(). */
|
||||
return server.cluster_enabled && server.cluster_slot_stats_enabled && c->slot != -1 && !(c->flag.blocked) &&
|
||||
!server.in_exec;
|
||||
return clusterSlotStatsEnabled(c->slot) && !(c->flag.blocked) && !server.in_exec;
|
||||
}
|
||||
|
||||
/* Adds network ingress bytes of the current command in execution,
|
||||
@@ -346,3 +334,7 @@ void clusterSlotStatsCommand(client *c) {
|
||||
addReplySubcommandSyntaxError(c);
|
||||
}
|
||||
}
|
||||
|
||||
int clusterSlotStatsEnabled(int slot) {
|
||||
return server.cluster_slot_stats_enabled && server.cluster_enabled && slot != -1;
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
/* General use-cases. */
|
||||
void clusterSlotStatReset(int slot);
|
||||
void clusterSlotStatResetAll(void);
|
||||
int clusterSlotStatsEnabled(int slot);
|
||||
|
||||
/* cpu-usec metric. */
|
||||
void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration);
|
||||
@@ -17,6 +18,7 @@ void clusterSlotStatsSetClusterMsgLength(uint32_t len);
|
||||
void clusterSlotStatsResetClusterMsgLength(void);
|
||||
|
||||
/* network-bytes-out metric. */
|
||||
void clusterSlotStatsAddNetworkBytesOutForSlot(int slot, unsigned long long net_bytes_out);
|
||||
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c);
|
||||
void clusterSlotStatsIncrNetworkBytesOutForReplication(long long len);
|
||||
void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len);
|
||||
|
||||
@@ -3262,6 +3262,9 @@ standardConfig static_configs[] = {
|
||||
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
|
||||
createIntConfig("io-threads", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, 1, IO_THREADS_MAX_NUM, server.io_threads_num, 1, INTEGER_CONFIG, NULL, updateIOThreads), /* Single threaded by default */
|
||||
createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL),
|
||||
createIntConfig("min-io-threads-avoid-copy-reply", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.min_io_threads_copy_avoid, 7, INTEGER_CONFIG, NULL, NULL),
|
||||
createIntConfig("min-string-size-avoid-copy-reply", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.min_string_size_copy_avoid, 16384, INTEGER_CONFIG, NULL, NULL),
|
||||
createIntConfig("min-string-size-avoid-copy-reply-threaded", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.min_string_size_copy_avoid_threaded, 65536, INTEGER_CONFIG, NULL, NULL),
|
||||
createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL),
|
||||
createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL),
|
||||
createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_replica_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* replica max data age factor. */
|
||||
|
||||
+6
-1
@@ -447,9 +447,14 @@ int trySendWriteToIOThreads(client *c) {
|
||||
* threads from reading data that might be invalid in their local CPU cache. */
|
||||
c->io_last_reply_block = listLast(c->reply);
|
||||
if (c->io_last_reply_block) {
|
||||
c->io_last_bufpos = ((clientReplyBlock *)listNodeValue(c->io_last_reply_block))->used;
|
||||
clientReplyBlock *block = (clientReplyBlock *)listNodeValue(c->io_last_reply_block);
|
||||
c->io_last_bufpos = block->used;
|
||||
/* If buffer is encoded force new header */
|
||||
if (block->flag.buf_encoded) block->last_header = NULL;
|
||||
} else {
|
||||
c->io_last_bufpos = (size_t)c->bufpos;
|
||||
/* If buffer is encoded force new header */
|
||||
if (c->flag.buf_encoded) c->last_header = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
|
||||
#include "memory_prefetch.h"
|
||||
#include "server.h"
|
||||
#include "io_threads.h"
|
||||
|
||||
typedef enum {
|
||||
PREFETCH_ENTRY, /* Initial state, prefetch entries associated with the given key's hash */
|
||||
@@ -120,6 +121,10 @@ static void prefetchEntry(KeyPrefetchInfo *info) {
|
||||
if (hashtableIncrementalFindStep(&info->hashtab_state) == 1) {
|
||||
/* Not done yet */
|
||||
moveToNextKey();
|
||||
} else if (server.io_threads_num >= server.min_io_threads_copy_avoid) {
|
||||
/* Copy avoidance should be more efficient without value prefetch
|
||||
* starting certain number of I/O threads */
|
||||
markKeyAsdone(info);
|
||||
} else {
|
||||
info->state = PREFETCH_VALUE;
|
||||
}
|
||||
|
||||
+513
-99
@@ -85,6 +85,33 @@ typedef struct {
|
||||
sds ip;
|
||||
} clientFilter;
|
||||
|
||||
/* Types of payloads in reply buffers (c->buf and c->reply)
|
||||
* Unencoded buffers contain plain replies only
|
||||
* Encoded buffers contain headers followed by either plain replies or
|
||||
* by bulk string references */
|
||||
typedef enum {
|
||||
PLAIN_REPLY = 0, /* plain reply */
|
||||
BULK_STR_REF /* bulk string references */
|
||||
} payloadType;
|
||||
|
||||
/* Encoded reply buffers consist from chunks
|
||||
* Each chunk contains header followed by payload
|
||||
* The packed attribute is specified because buffer is accessed at arbitrary offsets,
|
||||
* so no benefit in data structure padding and applying packed saves the space in the buffer */
|
||||
typedef struct __attribute__((__packed__)) payloadHeader {
|
||||
size_t payload_len; /* payload length in a reply buffer */
|
||||
size_t reply_len; /* actual reply length for non-plain payloads */
|
||||
uint8_t payload_type; /* one of payloadType */
|
||||
int16_t slot; /* to report network-bytes-out for BULK_STR_REF chunks */
|
||||
} payloadHeader;
|
||||
|
||||
/* To avoid copy of whole string in reply buffer
|
||||
* we store pointers to object and string itself */
|
||||
typedef struct __attribute__((__packed__)) bulkStrRef {
|
||||
robj *obj; /* pointer to object used for reference count management */
|
||||
sds str; /* pointer to string to optimize memory access by I/O thread */
|
||||
} bulkStrRef;
|
||||
|
||||
static void setProtocolError(const char *errstr, client *c);
|
||||
static void pauseClientsByClient(mstime_t end, int isPauseClientAll);
|
||||
int postponeClientRead(client *c);
|
||||
@@ -188,6 +215,35 @@ static inline int isReplicaReadyForReplData(client *replica) {
|
||||
!(replica->flag.close_asap);
|
||||
}
|
||||
|
||||
/* Decides if copy avoidance is preferred according to client type, number of I/O threads, object size
|
||||
 * May be called with NULL obj for evaluation with no regard to object size
|
||||
* Copy avoidance can be allowed only for regular Valkey clients
|
||||
* that use _writeToClient handler to write replies to client connection */
|
||||
static int isCopyAvoidPreferred(client *c, robj *obj) {
|
||||
if (c->flag.fake || isDeferredReplyEnabled(c)) return 0;
|
||||
|
||||
int type = getClientType(c);
|
||||
if (type != CLIENT_TYPE_NORMAL && type != CLIENT_TYPE_PUBSUB) return 0;
|
||||
|
||||
if (obj) {
|
||||
if (obj->encoding != OBJ_ENCODING_RAW) return 0;
|
||||
if (obj->refcount == OBJ_STATIC_REFCOUNT) return 0;
|
||||
}
|
||||
|
||||
/* Copy avoidance is preferred for any string size starting certain number of I/O threads */
|
||||
if (server.min_io_threads_copy_avoid && server.io_threads_num >= server.min_io_threads_copy_avoid) return 1;
|
||||
|
||||
if (!obj) return 0;
|
||||
|
||||
/* Main thread only. No I/O threads */
|
||||
if (server.io_threads_num == 1) {
|
||||
/* Copy avoidance is preferred starting certain string size */
|
||||
return server.min_string_size_copy_avoid && sdslen(obj->ptr) >= (size_t)server.min_string_size_copy_avoid;
|
||||
}
|
||||
/* Main thread + I/O threads */
|
||||
return server.min_string_size_copy_avoid_threaded && sdslen(obj->ptr) >= (size_t)server.min_string_size_copy_avoid_threaded;
|
||||
}
|
||||
|
||||
client *createClient(connection *conn) {
|
||||
client *c = zmalloc(sizeof(client));
|
||||
|
||||
@@ -215,6 +271,7 @@ client *createClient(connection *conn) {
|
||||
c->lib_name = NULL;
|
||||
c->lib_ver = NULL;
|
||||
c->bufpos = 0;
|
||||
c->last_header = NULL;
|
||||
c->buf_peak = c->buf_usable_size;
|
||||
c->buf_peak_last_reset_time = server.unixtime;
|
||||
c->qb_pos = 0;
|
||||
@@ -234,7 +291,6 @@ client *createClient(connection *conn) {
|
||||
c->cur_script = NULL;
|
||||
c->multibulklen = 0;
|
||||
c->bulklen = -1;
|
||||
c->sentlen = 0;
|
||||
c->raw_flag = 0;
|
||||
c->capa = 0;
|
||||
c->slot = -1;
|
||||
@@ -275,6 +331,9 @@ client *createClient(connection *conn) {
|
||||
c->commands_processed = 0;
|
||||
c->io_last_reply_block = NULL;
|
||||
c->io_last_bufpos = 0;
|
||||
c->io_last_written.buf = NULL;
|
||||
c->io_last_written.bufpos = 0;
|
||||
c->io_last_written.data_len = 0;
|
||||
return c;
|
||||
}
|
||||
|
||||
@@ -418,6 +477,41 @@ void deleteCachedResponseClient(client *recording_client) {
|
||||
* Low level functions to add more data to output buffers.
|
||||
* -------------------------------------------------------------------------- */
|
||||
|
||||
/* Updates an existing header, if possible; otherwise inserts a new one
|
||||
* Returns the length of data that can be added to the reply buffer (i.e. min(available, requested)) */
|
||||
static size_t upsertPayloadHeader(char *buf, size_t *bufpos, payloadHeader **last_header, uint8_t type, size_t len, int slot, size_t available) {
|
||||
/* Enforce min len for BULK_STR_REF chunks as whole pointers must be written to the buffer */
|
||||
size_t min_len = (type == BULK_STR_REF ? len : 1);
|
||||
if (min_len > available) return 0;
|
||||
size_t allowed_len = min(available, len);
|
||||
|
||||
// If cluster slots stats disabled set slot to -1 to prevent excessive per slot headers
|
||||
if (!clusterSlotStatsEnabled(slot)) slot = -1;
|
||||
|
||||
/* Try to add payload to last chunk if possible */
|
||||
if (*last_header != NULL && (*last_header)->payload_type == type && (*last_header)->slot == slot) {
|
||||
(*last_header)->payload_len += allowed_len;
|
||||
return allowed_len;
|
||||
}
|
||||
|
||||
/* Recheck min len condition and recalculate allowed len with a new header to be added */
|
||||
if (sizeof(payloadHeader) + min_len > available) return 0;
|
||||
available -= sizeof(payloadHeader);
|
||||
if (len > available) allowed_len = available;
|
||||
|
||||
/* Start a new payload chunk */
|
||||
*last_header = (payloadHeader *)(buf + *bufpos);
|
||||
|
||||
(*last_header)->payload_type = type;
|
||||
(*last_header)->payload_len = allowed_len;
|
||||
(*last_header)->slot = slot;
|
||||
(*last_header)->reply_len = 0;
|
||||
|
||||
*bufpos += sizeof(payloadHeader);
|
||||
|
||||
return allowed_len;
|
||||
}
|
||||
|
||||
/* Attempts to add the reply to the static buffer in the client struct.
|
||||
* Returns the length of data that is added to the reply buffer.
|
||||
*
|
||||
@@ -425,27 +519,53 @@ void deleteCachedResponseClient(client *recording_client) {
|
||||
* zmalloc_usable_size() call. Writing beyond client->buf boundaries confuses
|
||||
* sanitizer and generates a false positive out-of-bounds error */
|
||||
VALKEY_NO_SANITIZE("bounds")
|
||||
size_t _addReplyToBuffer(client *c, const char *s, size_t len) {
|
||||
size_t available = c->buf_usable_size - c->bufpos;
|
||||
static size_t _addReplyPayloadToBuffer(client *c, const void *payload, size_t len, uint8_t payload_type) {
|
||||
/* If the debug enforcing to use the reply list is enabled.*/
|
||||
if (server.debug_client_enforce_reply_list) return 0;
|
||||
/* If there already are entries in the reply list, we cannot
|
||||
* add anything more to the static buffer. */
|
||||
if (listLength(c->reply) > 0) return 0;
|
||||
|
||||
size_t reply_len = len > available ? available : len;
|
||||
memcpy(c->buf + c->bufpos, s, reply_len);
|
||||
size_t available = c->buf_usable_size - c->bufpos;
|
||||
size_t reply_len = min(available, len);
|
||||
if (c->flag.buf_encoded) {
|
||||
reply_len = upsertPayloadHeader(c->buf, &c->bufpos, &c->last_header, payload_type, len, c->slot, available);
|
||||
}
|
||||
if (!reply_len) return 0;
|
||||
|
||||
memcpy(c->buf + c->bufpos, payload, reply_len);
|
||||
c->bufpos += reply_len;
|
||||
/* We update the buffer peak after appending the reply to the buffer */
|
||||
if (c->buf_peak < (size_t)c->bufpos) c->buf_peak = (size_t)c->bufpos;
|
||||
return reply_len;
|
||||
}
|
||||
|
||||
/* Adds the reply to the reply linked list.
|
||||
static size_t _addReplyToBuffer(client *c, const char *s, size_t len) {
|
||||
if (!len) return 0;
|
||||
if (!c->bufpos) {
|
||||
c->flag.buf_encoded = isCopyAvoidPreferred(c, NULL);
|
||||
}
|
||||
return _addReplyPayloadToBuffer(c, s, len, PLAIN_REPLY);
|
||||
}
|
||||
|
||||
/* Adds bulk string reference (i.e. pointer to object and pointer to string itself) to static buffer
|
||||
* Returns non-zero value if succeeded to add */
|
||||
static size_t _addBulkStrRefToBuffer(client *c, const void *payload, size_t len) {
|
||||
if (!c->flag.buf_encoded) {
|
||||
/* If buffer is plain and not empty then can't add bulk string reference to it */
|
||||
if (c->bufpos) return 0;
|
||||
c->flag.buf_encoded = 1;
|
||||
}
|
||||
return _addReplyPayloadToBuffer(c, payload, len, BULK_STR_REF);
|
||||
}
|
||||
|
||||
/* Adds the payload to the reply linked list.
|
||||
* Note: some edits to this function need to be relayed to AddReplyFromClient. */
|
||||
void _addReplyProtoToList(client *c, list *reply_list, const char *s, size_t len) {
|
||||
static void _addReplyPayloadToList(client *c, list *reply_list, const char *payload, size_t len, uint8_t payload_type) {
|
||||
listNode *ln = listLast(reply_list);
|
||||
clientReplyBlock *tail = ln ? listNodeValue(ln) : NULL;
|
||||
/* Determine if encoded buffer is required */
|
||||
int encoded = payload_type == BULK_STR_REF || isCopyAvoidPreferred(c, NULL);
|
||||
|
||||
/* Note that 'tail' may be NULL even if we have a tail node, because when
|
||||
* addReplyDeferredLen() is used, it sets a dummy node to NULL just
|
||||
@@ -457,22 +577,40 @@ void _addReplyProtoToList(client *c, list *reply_list, const char *s, size_t len
|
||||
* new node */
|
||||
size_t avail = tail->size - tail->used;
|
||||
size_t copy = avail >= len ? len : avail;
|
||||
memcpy(tail->buf + tail->used, s, copy);
|
||||
tail->used += copy;
|
||||
s += copy;
|
||||
len -= copy;
|
||||
|
||||
if (tail->flag.buf_encoded) {
|
||||
copy = upsertPayloadHeader(tail->buf, &tail->used, &tail->last_header, payload_type, len, c->slot, avail);
|
||||
} else if (encoded) {
|
||||
/* If encoded buffer is required but tail is unencoded then pretend nothing can be added to it
|
||||
* and, as consequence, cause addition of a new tail */
|
||||
copy = 0;
|
||||
}
|
||||
|
||||
if (copy) {
|
||||
memcpy(tail->buf + tail->used, payload, copy);
|
||||
tail->used += copy;
|
||||
payload += copy;
|
||||
len -= copy;
|
||||
}
|
||||
}
|
||||
if (len) {
|
||||
/* Create a new node, make sure it is allocated to at
|
||||
* least PROTO_REPLY_CHUNK_BYTES */
|
||||
size_t usable_size;
|
||||
size_t min_reply_size = isDeferredReplyEnabled(c) ? PROTO_REPLY_MIN_BYTES : PROTO_REPLY_CHUNK_BYTES;
|
||||
size_t size = len < min_reply_size ? min_reply_size : len;
|
||||
size_t required_size = encoded ? len + sizeof(payloadHeader) : len;
|
||||
size_t size = required_size < min_reply_size ? min_reply_size : required_size;
|
||||
tail = zmalloc_usable(size + sizeof(clientReplyBlock), &usable_size);
|
||||
/* take over the allocation's internal fragmentation */
|
||||
tail->size = usable_size - sizeof(clientReplyBlock);
|
||||
tail->used = len;
|
||||
memcpy(tail->buf, s, len);
|
||||
tail->used = 0;
|
||||
tail->flag.buf_encoded = encoded;
|
||||
tail->last_header = NULL;
|
||||
if (tail->flag.buf_encoded) {
|
||||
upsertPayloadHeader(tail->buf, &tail->used, &tail->last_header, payload_type, len, c->slot, tail->size);
|
||||
}
|
||||
memcpy(tail->buf + tail->used, payload, len);
|
||||
tail->used += len;
|
||||
listAddNodeTail(reply_list, tail);
|
||||
unsigned long long *reply_bytes = (isDeferredReplyEnabled(c)) ? &c->deferred_reply_bytes : &c->reply_bytes;
|
||||
*reply_bytes += tail->size;
|
||||
@@ -481,6 +619,16 @@ void _addReplyProtoToList(client *c, list *reply_list, const char *s, size_t len
|
||||
}
|
||||
}
|
||||
|
||||
void _addReplyProtoToList(client *c, list *reply_list, const char *s, size_t len) {
|
||||
if (!len) return;
|
||||
_addReplyPayloadToList(c, reply_list, s, len, PLAIN_REPLY);
|
||||
}
|
||||
|
||||
/* Adds bulk string reference (i.e. pointer to object and pointer to string itself) to reply list */
|
||||
static void _addBulkStrRefToToList(client *c, const void *payload, size_t len) {
|
||||
_addReplyPayloadToList(c, c->reply, payload, len, BULK_STR_REF);
|
||||
}
|
||||
|
||||
/* The subscribe / unsubscribe command family has a push as a reply,
|
||||
* or in other words, it responds with a push (or several of them
|
||||
* depending on how many arguments it got), and has no reply. */
|
||||
@@ -530,6 +678,20 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) {
|
||||
if (len > reply_len) _addReplyProtoToList(c, c->reply, s + reply_len, len - reply_len);
|
||||
}
|
||||
|
||||
/* Increment reference to object and add pointer to object and
|
||||
* pointer to string itself to current reply buffer */
|
||||
static void _addBulkStrRefToBufferOrList(client *c, robj *obj) {
|
||||
if (c->flag.close_after_reply) return;
|
||||
|
||||
/* Refcount will be decremented in write completion handler by the main thread */
|
||||
incrRefCount(obj);
|
||||
|
||||
bulkStrRef str_ref = {.obj = obj, .str = obj->ptr};
|
||||
if (!_addBulkStrRefToBuffer(c, (void *)&str_ref, sizeof(str_ref))) {
|
||||
_addBulkStrRefToToList(c, (void *)&str_ref, sizeof(str_ref));
|
||||
}
|
||||
}
|
||||
|
||||
/* -----------------------------------------------------------------------------
|
||||
* Higher level functions to queue data on the client output buffer.
|
||||
* The following functions are the ones that commands implementations will call.
|
||||
@@ -823,7 +985,7 @@ void trimReplyUnusedTailSpace(client *c) {
|
||||
* Also, to avoid large memmove which happens as part of realloc, we only do
|
||||
* that if the used part is small. */
|
||||
if (tail->size - tail->used > tail->size / 4 && tail->used < PROTO_REPLY_CHUNK_BYTES &&
|
||||
c->io_write_state != CLIENT_PENDING_IO) {
|
||||
c->io_write_state != CLIENT_PENDING_IO && !tail->flag.buf_encoded) {
|
||||
size_t usable_size;
|
||||
size_t old_size = tail->size;
|
||||
tail = zrealloc_usable(tail, tail->used + sizeof(clientReplyBlock), &usable_size);
|
||||
@@ -884,8 +1046,8 @@ void setDeferredReply(client *c, void *node, const char *s, size_t length) {
|
||||
* - It has enough room already allocated
|
||||
* - And not too large (avoid large memmove)
|
||||
* - And the client is not in a pending I/O state */
|
||||
if (ln->prev != NULL && (prev = listNodeValue(ln->prev)) && prev->used < prev->size &&
|
||||
c->io_write_state != CLIENT_PENDING_IO) {
|
||||
if (ln->prev != NULL && (prev = listNodeValue(ln->prev)) && prev->size > prev->used &&
|
||||
c->io_write_state != CLIENT_PENDING_IO && !prev->flag.buf_encoded) {
|
||||
size_t len_to_copy = prev->size - prev->used;
|
||||
if (len_to_copy > length) len_to_copy = length;
|
||||
memcpy(prev->buf + prev->used, s, len_to_copy);
|
||||
@@ -900,7 +1062,7 @@ void setDeferredReply(client *c, void *node, const char *s, size_t length) {
|
||||
}
|
||||
|
||||
if (ln->next != NULL && (next = listNodeValue(ln->next)) && next->size - next->used >= length &&
|
||||
next->used < PROTO_REPLY_CHUNK_BYTES * 4 && c->io_write_state != CLIENT_PENDING_IO) {
|
||||
next->used < PROTO_REPLY_CHUNK_BYTES * 4 && c->io_write_state != CLIENT_PENDING_IO && !next->flag.buf_encoded) {
|
||||
memmove(next->buf + length, next->buf, next->used);
|
||||
memcpy(next->buf, s, length);
|
||||
c->net_output_bytes_curr_cmd += length;
|
||||
@@ -913,6 +1075,7 @@ void setDeferredReply(client *c, void *node, const char *s, size_t length) {
|
||||
/* Take over the allocation's internal fragmentation */
|
||||
buf->size = usable_size - sizeof(clientReplyBlock);
|
||||
buf->used = length;
|
||||
buf->flag.buf_encoded = 0;
|
||||
memcpy(buf->buf, s, length);
|
||||
c->net_output_bytes_curr_cmd += length;
|
||||
listNodeValue(ln) = buf;
|
||||
@@ -1177,8 +1340,20 @@ void addReplyBulkLen(client *c, robj *obj) {
|
||||
_addReplyLongLongWithPrefix(c, len, '$');
|
||||
}
|
||||
|
||||
/* Try to avoid whole bulk string copy to a reply buffer
|
||||
* If copy avoidance allowed then only pointer to object and string will be copied to the buffer */
|
||||
static int tryAvoidBulkStrCopyToReply(client *c, robj *obj) {
|
||||
if (!isCopyAvoidPreferred(c, obj)) return C_ERR;
|
||||
if (prepareClientToWrite(c) != C_OK) return C_ERR;
|
||||
|
||||
_addBulkStrRefToBufferOrList(c, obj);
|
||||
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
/* Add an Object as a bulk reply */
|
||||
void addReplyBulk(client *c, robj *obj) {
|
||||
if (tryAvoidBulkStrCopyToReply(c, obj) == C_OK) return;
|
||||
addReplyBulkLen(c, obj);
|
||||
addReply(c, obj);
|
||||
addReplyProto(c, "\r\n", 2);
|
||||
@@ -1392,6 +1567,7 @@ void AddReplyFromClient(client *dst, client *src) {
|
||||
}
|
||||
|
||||
/* First add the static buffer (either into the static buffer or reply list) */
|
||||
serverAssert(src->flag.buf_encoded == 0);
|
||||
addReplyProto(dst, src->buf, src->bufpos);
|
||||
|
||||
/* We need to check with prepareClientToWrite again (after addReplyProto)
|
||||
@@ -1824,6 +2000,7 @@ void freeClient(client *c) {
|
||||
freeClientPubSubData(c);
|
||||
|
||||
/* Free data structures. */
|
||||
releaseReplyReferences(c);
|
||||
listRelease(c->reply);
|
||||
c->reply = NULL;
|
||||
zfree_with_size(c->buf, c->buf_usable_size);
|
||||
@@ -2148,19 +2325,199 @@ static void writeToReplica(client *c) {
|
||||
c->nwritten = totwritten;
|
||||
}
|
||||
|
||||
/* Bulk string reply requires 3 iov entries -
 * length prefix ($<length>\r\n), string (<data>) and suffix (\r\n) */
#define NUM_OF_IOV_PER_BULK_STR 3
/* Bulk string prefix max size: decimal digits of a long + '$' + "\r\n". */
#define BULK_STR_LEN_PREFIX_MAX_SIZE (LONG_STR_SIZE + 3)

/* This struct is used by writevToClient to prepare the iovec array for
 * submitting to connWritev. It also carries the cross-invocation resume
 * state (last_written_len) so a partially written buffer can be continued. */
typedef struct replyIOV {
    int iovcnt;             /* number of elements in iov array */
    int iovsize;            /* capacity of iov array */
    struct iovec *iov;      /* caller-owned iovec storage being filled */
    ssize_t iov_len_total;  /* Total length of data pointed by iov array */
    size_t last_written_len; /* Length of data in the last written buffer
                              * partially written in previous writevToClient invocation;
                              * this many bytes are skipped before filling iov. */
    int limit_reached;      /* Non zero if either max iov count or NET_MAX_WRITES_PER_EVENT limit
                             * reached during iovec array preparation */
    /* Auxiliary fields for scattering BUFSTR_REF chunks from encoded buffers */
    int prfxcnt;            /* number of "$<len>\r\n" prefixes generated so far */
    char (*prefixes)[BULK_STR_LEN_PREFIX_MAX_SIZE]; /* caller-owned prefix scratch array;
                                                     * prefixes must stay alive until connWritev */
    char *crlf;             /* shared "\r\n" suffix used by every bulk string */
} replyIOV;
|
||||
|
||||
/* The bufWriteMetadata struct is used by writevToClient to record metadata
 * about scattering of one reply buffer (c->buf or a reply-list block) into
 * the iov array. After the write it is consumed by saveLastWrittenBuf to
 * compute the resume point for the next invocation. */
typedef struct bufWriteMetadata {
    char *buf;         /* the buffer this entry describes */
    size_t bufpos;     /* used length of the buffer (headers included when encoded) */
    uint64_t data_len; /* Actual bytes out. Differs from bufpos if buffer encoded,
                        * because headers/pointer payloads expand to real RESP bytes. */
    int complete;      /* Was the buffer completely scattered to iov or
                          process stopped due encountered limit */
} bufWriteMetadata;
|
||||
|
||||
/* Initialize a replyIOV that writevToClient will fill.
 * iov_arr, prefixes and crlf are caller-owned scratch buffers; they must
 * outlive the connWritev call that consumes the iov entries. */
static void initReplyIOV(client *c, int iovsize, struct iovec *iov_arr, char (*prefixes)[], char *crlf, replyIOV *reply) {
    /* Storage handed in by the caller. */
    reply->iov = iov_arr;
    reply->iovsize = iovsize;
    reply->prefixes = prefixes;
    reply->crlf = crlf;

    /* Nothing scattered yet. */
    reply->iovcnt = 0;
    reply->iov_len_total = 0;
    reply->prfxcnt = 0;
    reply->limit_reached = 0;

    /* Resume point: bytes of the first pending buffer that a previous
     * writevToClient invocation already pushed to the connection. */
    reply->last_written_len = c->io_last_written.data_len;
}
|
||||
|
||||
/* Append one contiguous byte range to the iov array being built.
 * Handles two cross-invocation concerns:
 *  - iov capacity: sets limit_reached instead of overflowing the array;
 *  - resume: bytes already written in a previous writevToClient invocation
 *    (reply->last_written_len) are skipped, possibly skipping this whole range.
 * metadata->data_len always advances by the full buf_len so that
 * saveLastWrittenBuf can account for skipped bytes too. */
static void addPlainBufferToReplyIOV(char *buf, size_t buf_len, replyIOV *reply, bufWriteMetadata *metadata) {
    if (reply->limit_reached) return;

    if (reply->iovcnt == reply->iovsize) {
        reply->limit_reached = 1;
        return;
    }

    /* Aggregate data length from the beginning of the buffer even though
     * part of the data can be skipped in this writevToClient invocation due to last_written_len */
    metadata->data_len += buf_len;

    /* Skip data written in the previous writevToClient invocation(s) */
    if (reply->last_written_len >= buf_len) {
        reply->last_written_len -= buf_len;
        return;
    }

    /* Partial skip: start mid-range; from here on nothing more is skipped. */
    reply->iov[reply->iovcnt].iov_base = buf + reply->last_written_len;
    reply->iov[reply->iovcnt].iov_len = buf_len - reply->last_written_len;
    reply->last_written_len = 0;

    reply->iov_len_total += reply->iov[reply->iovcnt++].iov_len;
}
|
||||
|
||||
/* Scatter a BULK_STR_REF payload (an array of bulkStrRef entries stored in an
 * encoded reply buffer) into the iov array. Each referenced string becomes
 * up to NUM_OF_IOV_PER_BULK_STR iov entries: "$<len>\r\n", <data>, "\r\n".
 * buf_len is the payload length in bytes, a multiple of sizeof(bulkStrRef). */
static void addBulkStringToReplyIOV(char *buf, size_t buf_len, replyIOV *reply, bufWriteMetadata *metadata) {
    bulkStrRef *str_ref = (bulkStrRef *)buf;
    while (buf_len > 0 && !reply->limit_reached) {
        size_t str_len = sdslen(str_ref->str);

        /* RESP encodes bulk strings as $<length>\r\n<data>\r\n.
         * Build the "$<len>\r\n" prefix in the caller-provided scratch slot;
         * the slot must survive until connWritev consumes the iov. */
        char *prefix = reply->prefixes[reply->prfxcnt];
        prefix[0] = '$';
        size_t num_len = ll2string(prefix + 1, sizeof(reply->prefixes[0]) - 3, str_len);
        prefix[num_len + 1] = '\r';
        prefix[num_len + 2] = '\n';

        int cnt = reply->iovcnt;
        addPlainBufferToReplyIOV(reply->prefixes[reply->prfxcnt], num_len + 3, reply, metadata);
        /* Increment prfxcnt only if prefix was added to reply in this writevToClient invocation
         * (a fully-skipped or limit-rejected prefix leaves iovcnt unchanged, so the
         * scratch slot can be reused for the next string). */
        if (reply->iovcnt > cnt) reply->prfxcnt++;
        addPlainBufferToReplyIOV(str_ref->str, str_len, reply, metadata);
        addPlainBufferToReplyIOV(reply->crlf, 2, reply, metadata);

        str_ref++;
        buf_len -= sizeof(bulkStrRef);
    }
}
|
||||
|
||||
/* Walk an encoded reply buffer — a sequence of (payloadHeader, payload)
 * records — and scatter each payload into the iov array. PLAIN_REPLY
 * payloads are raw RESP bytes; any other type is treated as bulk string
 * references. Stops early when an iov/byte limit is hit. */
static void addEncodedBufferToReplyIOV(char *buf, size_t bufpos, replyIOV *reply, bufWriteMetadata *metadata) {
    char *ptr = buf;
    while (ptr < buf + bufpos && !reply->limit_reached) {
        payloadHeader *header = (payloadHeader *)ptr;
        ptr += sizeof(payloadHeader);
        if (header->payload_type == PLAIN_REPLY) {
            addPlainBufferToReplyIOV(ptr, header->payload_len, reply, metadata);
        } else {
            uint64_t data_len = metadata->data_len;
            addBulkStringToReplyIOV(ptr, header->payload_len, reply, metadata);
            /* Store actual reply len for cluster slot stats
             * (the expanded RESP size, not the in-buffer pointer size).
             * NOTE(review): written back into the header even if the write later
             * fails — presumably safe because it is only read on release. */
            header->reply_len = metadata->data_len - data_len;
        }
        ptr += header->payload_len;
    }
}
|
||||
|
||||
/* Scatter one reply buffer (c->buf or a reply-list block) into the iov array
 * and fill its bufWriteMetadata entry. 'encoded' selects between the
 * header-framed layout and a plain RESP byte buffer. Also enforces the
 * NET_MAX_WRITES_PER_EVENT byte budget across buffers. */
static void addBufferToReplyIOV(int encoded, char *buf, size_t bufpos, replyIOV *reply, bufWriteMetadata *metadata) {
    metadata->data_len = 0;

    if (encoded) {
        addEncodedBufferToReplyIOV(buf, bufpos, reply, metadata);
        /* Encoded walk may stop mid-buffer on a limit. */
        metadata->complete = !reply->limit_reached;
    } else {
        /* A plain buffer is added as a single range; even if it was rejected
         * for lack of iov slots it is marked complete — data_len stays 0 and
         * the caller detects that nothing was taken. */
        addPlainBufferToReplyIOV(buf, bufpos, reply, metadata);
        metadata->complete = 1;
    }

    if (reply->iov_len_total > NET_MAX_WRITES_PER_EVENT) {
        reply->limit_reached = 1;
    }

    metadata->buf = buf;
    metadata->bufpos = bufpos;
}
|
||||
|
||||
/*
 * This function calculates and stores on the client next:
 * io_last_written.buf - Last buffer that has been written to the client connection
 * io_last_written.bufpos - The buffer has been written until this position
 * io_last_written.data_len - The actual length of the data written from this buffer
 * This length differs from written bufpos in case of copy avoidance
 *
 * The io_last_written.buf and io_last_written.bufpos are used by _postWriteToClient
 * to detect last client reply buffer that can be released
 *
 * The io_last_written.data_len is used by writevToClient for resuming write from the point
 * where previous writevToClient invocation stopped
 *
 * totlen is the total byte length scattered into the iov; totwritten is what
 * connWritev actually pushed (totwritten <= totlen).
 **/
static void saveLastWrittenBuf(client *c, bufWriteMetadata *metadata, int bufcnt, size_t totlen, size_t totwritten) {
    int last = bufcnt - 1;
    if (totwritten == totlen) {
        /* Everything scattered was written: the last metadata entry is the stop point. */
        c->io_last_written.buf = metadata[last].buf;
        /* Zero io_last_written.bufpos indicates buffer written incompletely */
        c->io_last_written.bufpos = (metadata[last].complete ? metadata[last].bufpos : 0);
        c->io_last_written.data_len = metadata[last].data_len;
        return;
    }

    /* Partial write: walk the buffers, charging each one's data_len against the
     * written byte count (including bytes carried over from the previous call),
     * to locate the buffer inside which the write stopped. */
    last = -1;
    int64_t remaining = totwritten + c->io_last_written.data_len;
    while (remaining > 0) remaining -= metadata[++last].data_len;
    serverAssert(last < bufcnt);

    c->io_last_written.buf = metadata[last].buf;
    /* Zero io_last_written.bufpos indicates buffer written incompletely;
     * remaining == 0 means the write ended exactly on this buffer's boundary. */
    c->io_last_written.bufpos = (metadata[last].complete && remaining == 0 ? metadata[last].bufpos : 0);
    /* remaining is <= 0 here; adding it back yields the bytes written
     * within this last buffer. */
    c->io_last_written.data_len = (size_t)(metadata[last].data_len + remaining);
}
|
||||
|
||||
/* Adjust reply->iov to point to start of unwritten blocks */
|
||||
static void proceedToUnwritten(replyIOV *reply, int nwritten) {
|
||||
while (nwritten > 0) {
|
||||
if ((size_t)nwritten < reply->iov[0].iov_len) {
|
||||
reply->iov[0].iov_base = (char *)reply->iov[0].iov_base + nwritten;
|
||||
reply->iov[0].iov_len -= nwritten;
|
||||
break;
|
||||
}
|
||||
nwritten -= reply->iov[0].iov_len;
|
||||
reply->iov++;
|
||||
reply->iovcnt--;
|
||||
}
|
||||
}
|
||||
|
||||
/* This function should be called from _writeToClient when the reply list is not empty,
|
||||
* it gathers the scattered buffers from reply list and sends them away with connWritev.
|
||||
* If we write successfully, it returns C_OK, otherwise, C_ERR is returned.
|
||||
* Sets the c->nwritten to the number of bytes the server wrote to the client.
|
||||
* Can be called from the main thread or an I/O thread */
|
||||
static int writevToClient(client *c) {
|
||||
int iovcnt = 0;
|
||||
int iovmax = min(IOV_MAX, c->conn->iovcnt);
|
||||
struct iovec iov_arr[iovmax];
|
||||
struct iovec *iov = iov_arr;
|
||||
ssize_t bufpos, iov_bytes_len = 0;
|
||||
listNode *lastblock;
|
||||
/* iov_arr can accommodate iovmax / NUM_OF_IOV_PER_BULK_STR full bulk string replies
|
||||
* and one partial bulk reply */
|
||||
char prefixes[iovmax / NUM_OF_IOV_PER_BULK_STR + 1][BULK_STR_LEN_PREFIX_MAX_SIZE];
|
||||
char crlf[2] = {'\r', '\n'};
|
||||
size_t bufcnt = 0;
|
||||
|
||||
size_t bufpos = 0;
|
||||
listNode *lastblock;
|
||||
if (inMainThread()) {
|
||||
lastblock = listLast(c->reply);
|
||||
bufpos = c->bufpos;
|
||||
@@ -2169,50 +2526,54 @@ static int writevToClient(client *c) {
|
||||
bufpos = lastblock ? (size_t)c->bufpos : c->io_last_bufpos;
|
||||
}
|
||||
|
||||
int reply_blocks = (lastblock ? listLength(c->reply) : 0);
|
||||
/* +1 is for c->buf */
|
||||
size_t replyLen = min(reply_blocks + 1, iovmax);
|
||||
bufWriteMetadata buf_metadata[replyLen];
|
||||
|
||||
replyIOV reply;
|
||||
initReplyIOV(c, iovmax, iov_arr, prefixes, crlf, &reply);
|
||||
|
||||
/* If the static reply buffer is not empty,
|
||||
* add it to the iov array for writev() as well. */
|
||||
if (bufpos > 0) {
|
||||
iov[iovcnt].iov_base = c->buf + c->sentlen;
|
||||
iov[iovcnt].iov_len = bufpos - c->sentlen;
|
||||
iov_bytes_len += iov[iovcnt++].iov_len;
|
||||
addBufferToReplyIOV(c->flag.buf_encoded, c->buf, bufpos, &reply, &buf_metadata[bufcnt++]);
|
||||
}
|
||||
/* The first node of reply list might be incomplete from the last call,
|
||||
* thus it needs to be calibrated to get the actual data address and length. */
|
||||
size_t sentlen = bufpos > 0 ? 0 : c->sentlen;
|
||||
listIter iter;
|
||||
listNode *next;
|
||||
clientReplyBlock *o;
|
||||
size_t used;
|
||||
listRewind(c->reply, &iter);
|
||||
while ((next = listNext(&iter)) && iovcnt < iovmax && iov_bytes_len < NET_MAX_WRITES_PER_EVENT) {
|
||||
o = listNodeValue(next);
|
||||
|
||||
used = o->used;
|
||||
/* Use c->io_last_bufpos as the currently used portion of the block.
|
||||
* We use io_last_bufpos instead of o->used to ensure that we only access data guaranteed to be visible to the
|
||||
* current thread. Using o->used, which may have been updated by the main thread, could lead to accessing data
|
||||
* that may not yet be visible to the current thread*/
|
||||
if (!inMainThread() && next == lastblock) used = c->io_last_bufpos;
|
||||
if (lastblock) {
|
||||
listIter iter;
|
||||
listNode *next;
|
||||
listRewind(c->reply, &iter);
|
||||
while ((next = listNext(&iter)) && !reply.limit_reached) {
|
||||
clientReplyBlock *o = listNodeValue(next);
|
||||
|
||||
size_t used = o->used;
|
||||
/* Use c->io_last_bufpos as the currently used portion of the block.
|
||||
* We use io_last_bufpos instead of o->used to ensure that we only access data guaranteed to be visible to the
|
||||
* current thread. Using o->used, which may have been updated by the main thread, could lead to accessing data
|
||||
* that may not yet be visible to the current thread*/
|
||||
if (!inMainThread() && next == lastblock) used = c->io_last_bufpos;
|
||||
|
||||
if (used == 0) { /* empty node, skip over it. */
|
||||
if (next == lastblock) break;
|
||||
continue;
|
||||
}
|
||||
|
||||
addBufferToReplyIOV(o->flag.buf_encoded, o->buf, used, &reply, &buf_metadata[bufcnt]);
|
||||
if (!buf_metadata[bufcnt].data_len) break;
|
||||
bufcnt++;
|
||||
|
||||
if (used == 0) { /* empty node, skip over it. */
|
||||
if (next == lastblock) break;
|
||||
sentlen = 0;
|
||||
continue;
|
||||
|
||||
if (reply.iovcnt == reply.iovsize) {
|
||||
reply.limit_reached = 1;
|
||||
}
|
||||
}
|
||||
|
||||
iov[iovcnt].iov_base = o->buf + sentlen;
|
||||
iov[iovcnt].iov_len = used - sentlen;
|
||||
iov_bytes_len += iov[iovcnt++].iov_len;
|
||||
|
||||
sentlen = 0;
|
||||
if (next == lastblock) break;
|
||||
}
|
||||
|
||||
serverAssert(iovcnt != 0);
|
||||
|
||||
ssize_t totwritten = 0;
|
||||
while (1) {
|
||||
int nwritten = connWritev(c->conn, iov, iovcnt);
|
||||
int nwritten = connWritev(c->conn, reply.iov, reply.iovcnt);
|
||||
if (nwritten <= 0) {
|
||||
c->write_flags |= WRITE_FLAGS_WRITE_ERROR;
|
||||
totwritten = totwritten > 0 ? totwritten : nwritten;
|
||||
@@ -2220,7 +2581,7 @@ static int writevToClient(client *c) {
|
||||
}
|
||||
totwritten += nwritten;
|
||||
|
||||
if (totwritten == iov_bytes_len) break;
|
||||
if (totwritten == reply.iov_len_total) break;
|
||||
|
||||
if (totwritten > NET_MAX_WRITES_PER_EVENT) {
|
||||
/* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
|
||||
@@ -2237,20 +2598,13 @@ static int writevToClient(client *c) {
|
||||
}
|
||||
}
|
||||
|
||||
/* proceed to the unwritten blocks */
|
||||
while (nwritten > 0) {
|
||||
if ((size_t)nwritten < iov[0].iov_len) {
|
||||
iov[0].iov_base = (char *)iov[0].iov_base + nwritten;
|
||||
iov[0].iov_len -= nwritten;
|
||||
break;
|
||||
}
|
||||
nwritten -= iov[0].iov_len;
|
||||
iov++;
|
||||
iovcnt--;
|
||||
}
|
||||
proceedToUnwritten(&reply, nwritten);
|
||||
}
|
||||
|
||||
c->nwritten = totwritten;
|
||||
if (totwritten > 0) {
|
||||
saveLastWrittenBuf(c, buf_metadata, bufcnt, reply.iov_len_total, totwritten);
|
||||
}
|
||||
return totwritten > 0 ? C_OK : C_ERR;
|
||||
}
|
||||
|
||||
@@ -2271,14 +2625,17 @@ int _writeToClient(client *c) {
|
||||
lastblock = c->io_last_reply_block;
|
||||
}
|
||||
|
||||
/* If the reply list is not empty, use writev to save system calls and TCP packets */
|
||||
if (lastblock) return writevToClient(c);
|
||||
/* If the reply list is not empty or buffer is encoded,
|
||||
* use writev to save system calls and TCP packets */
|
||||
if (lastblock || c->flag.buf_encoded) return writevToClient(c);
|
||||
|
||||
ssize_t bytes_to_write = bufpos - c->sentlen;
|
||||
/* If io_last_written_data_len is nonzero it must relate to c->buf */
|
||||
serverAssert(c->io_last_written.data_len == 0 || c->io_last_written.buf == c->buf);
|
||||
ssize_t bytes_to_write = bufpos - c->io_last_written.data_len;
|
||||
ssize_t tot_written = 0;
|
||||
|
||||
while (tot_written < bytes_to_write) {
|
||||
int nwritten = connWrite(c->conn, c->buf + c->sentlen + tot_written, bytes_to_write - tot_written);
|
||||
int nwritten = connWrite(c->conn, c->buf + c->io_last_written.data_len + tot_written, bytes_to_write - tot_written);
|
||||
if (nwritten <= 0) {
|
||||
c->write_flags |= WRITE_FLAGS_WRITE_ERROR;
|
||||
tot_written = tot_written > 0 ? tot_written : nwritten;
|
||||
@@ -2288,44 +2645,101 @@ int _writeToClient(client *c) {
|
||||
}
|
||||
|
||||
c->nwritten = tot_written;
|
||||
if (tot_written > 0) {
|
||||
c->io_last_written.buf = c->buf;
|
||||
c->io_last_written.bufpos = (tot_written == bytes_to_write ? bufpos : 0);
|
||||
c->io_last_written.data_len = c->io_last_written.data_len + tot_written;
|
||||
}
|
||||
return tot_written > 0 ? C_OK : C_ERR;
|
||||
}
|
||||
|
||||
/* Forget the write-resume state: no buffer is considered partially written.
 * Called when all pending replies were flushed or the reply state is discarded. */
void resetLastWrittenBuf(client *c) {
    c->io_last_written = (LastWrittenBuf){0};
}
|
||||
|
||||
/* Release references to string objects inside an encoded buffer.
 * Walks the (payloadHeader, payload) records: for each BULK_STR_REF payload
 * it decrements the refcount of every referenced object (taken when the
 * reference was stored) and feeds the actual reply length into the
 * per-slot network-bytes-out cluster statistics. PLAIN_REPLY payloads
 * own no references and are skipped. */
static void releaseBufReferences(char *buf, size_t bufpos) {
    char *ptr = buf;
    while (ptr < buf + bufpos) {
        payloadHeader *header = (payloadHeader *)ptr;
        ptr += sizeof(payloadHeader);

        if (header->payload_type == BULK_STR_REF) {
            /* reply_len was recorded by addEncodedBufferToReplyIOV at write time. */
            clusterSlotStatsAddNetworkBytesOutForSlot(header->slot, header->reply_len);

            bulkStrRef *str_ref = (bulkStrRef *)ptr;
            size_t len = header->payload_len;
            while (len > 0) {
                decrRefCount(str_ref->obj);
                str_ref++;
                len -= sizeof(bulkStrRef);
            }
        } else {
            /* Only the two known payload types may appear in an encoded buffer. */
            serverAssert(header->payload_type == PLAIN_REPLY);
        }

        ptr += header->payload_len;
    }
    /* The walk must land exactly on the end: anything else means a corrupt buffer. */
    serverAssert(ptr == buf + bufpos);
}
|
||||
|
||||
/* Drop every bulk-string object reference still held by the client's reply
 * buffers (static buf and reply list). Used on client teardown / reply reset
 * so copy-avoided objects do not leak. Buffer contents are left in place. */
void releaseReplyReferences(client *c) {
    /* Static buffer first. */
    if (c->bufpos > 0 && c->flag.buf_encoded) releaseBufReferences(c->buf, c->bufpos);

    /* Then every encoded block in the reply list. */
    listIter li;
    listNode *node;
    listRewind(c->reply, &li);
    while ((node = listNext(&li))) {
        clientReplyBlock *block = (clientReplyBlock *)listNodeValue(node);
        if (block->flag.buf_encoded) releaseBufReferences(block->buf, block->used);
    }
}
|
||||
|
||||
static void _postWriteToClient(client *c) {
|
||||
if (c->nwritten <= 0) return;
|
||||
server.stat_net_output_bytes += c->nwritten;
|
||||
|
||||
int last_written = 0;
|
||||
if (c->bufpos > 0) {
|
||||
/* Is this buffer is last written? */
|
||||
last_written = (c->buf == c->io_last_written.buf);
|
||||
/* If buffer is completely written */
|
||||
if (!last_written || c->bufpos == c->io_last_written.bufpos) {
|
||||
/* If encoded then release references to bulk string objects */
|
||||
if (c->flag.buf_encoded) releaseBufReferences(c->buf, c->bufpos);
|
||||
/* Reset buffer metadata */
|
||||
c->bufpos = 0;
|
||||
c->flag.buf_encoded = 0;
|
||||
c->last_header = NULL;
|
||||
/* If completely written buffer is last written then reset last written state */
|
||||
if (last_written) resetLastWrittenBuf(c);
|
||||
}
|
||||
if (last_written) return;
|
||||
}
|
||||
|
||||
listIter iter;
|
||||
listNode *next;
|
||||
clientReplyBlock *o;
|
||||
|
||||
server.stat_net_output_bytes += c->nwritten;
|
||||
|
||||
/* Locate the new node which has leftover data and
|
||||
* release all nodes in front of it. */
|
||||
ssize_t remaining = c->nwritten;
|
||||
if (c->bufpos > 0) { /* Deal with static reply buffer first. */
|
||||
int buf_len = c->bufpos - c->sentlen;
|
||||
c->sentlen += c->nwritten;
|
||||
/* If the buffer was sent, set bufpos to zero to continue with
|
||||
* the remainder of the reply. */
|
||||
if (c->nwritten >= buf_len) {
|
||||
c->bufpos = 0;
|
||||
c->sentlen = 0;
|
||||
}
|
||||
remaining -= buf_len;
|
||||
}
|
||||
listRewind(c->reply, &iter);
|
||||
while (remaining > 0) {
|
||||
next = listNext(&iter);
|
||||
o = listNodeValue(next);
|
||||
if (remaining < (ssize_t)(o->used - c->sentlen)) {
|
||||
c->sentlen += remaining;
|
||||
break;
|
||||
while ((next = listNext(&iter))) {
|
||||
clientReplyBlock *o = listNodeValue(next);
|
||||
/* Is this buffer is last written? */
|
||||
last_written = (o->buf == c->io_last_written.buf);
|
||||
/* If buffer is completely written */
|
||||
if (!last_written || o->used == c->io_last_written.bufpos) {
|
||||
c->reply_bytes -= o->size;
|
||||
/* If encoded then release references to bulk string objects */
|
||||
if (o->flag.buf_encoded) releaseBufReferences(o->buf, o->used);
|
||||
listDelNode(c->reply, next);
|
||||
/* If completely written buffer is last written then reset last written state */
|
||||
if (last_written) resetLastWrittenBuf(c);
|
||||
}
|
||||
remaining -= (ssize_t)(o->used - c->sentlen);
|
||||
c->reply_bytes -= o->size;
|
||||
listDelNode(c->reply, next);
|
||||
c->sentlen = 0;
|
||||
if (last_written) return;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2359,7 +2773,7 @@ int postWriteToClient(client *c) {
|
||||
if (!c->flag.primary) c->last_interaction = server.unixtime;
|
||||
}
|
||||
if (!clientHasPendingReplies(c)) {
|
||||
c->sentlen = 0;
|
||||
resetLastWrittenBuf(c);
|
||||
if (connHasWriteHandler(c->conn)) {
|
||||
connSetWriteHandler(c->conn, NULL);
|
||||
}
|
||||
|
||||
+2
-1
@@ -4371,8 +4371,9 @@ void replicationCachePrimary(client *c) {
|
||||
server.primary->repl_data->repl_applied = 0;
|
||||
server.primary->repl_data->read_reploff = server.primary->repl_data->reploff;
|
||||
if (c->flag.multi) discardTransaction(c);
|
||||
releaseReplyReferences(c);
|
||||
resetLastWrittenBuf(c);
|
||||
listEmpty(c->reply);
|
||||
c->sentlen = 0;
|
||||
c->reply_bytes = 0;
|
||||
c->bufpos = 0;
|
||||
resetClient(c);
|
||||
|
||||
+4
-4
@@ -956,16 +956,16 @@ int clientsCronResizeQueryBuffer(client *c) {
|
||||
* The buffer peak will be reset back to the buffer position every server.reply_buffer_peak_reset_time milliseconds
|
||||
* The function always returns 0 as it never terminates the client. */
|
||||
int clientsCronResizeOutputBuffer(client *c, mstime_t now_ms) {
|
||||
if (c->io_write_state != CLIENT_IDLE) return 0;
|
||||
/* in case the resizing is disabled return immediately */
|
||||
if (!server.reply_buffer_resizing_enabled) return 0;
|
||||
|
||||
if (c->io_write_state != CLIENT_IDLE || c->flag.buf_encoded) return 0;
|
||||
|
||||
size_t new_buffer_size = 0;
|
||||
char *oldbuf = NULL;
|
||||
const size_t buffer_target_shrink_size = c->buf_usable_size / 2;
|
||||
const size_t buffer_target_expand_size = c->buf_usable_size * 2;
|
||||
|
||||
/* in case the resizing is disabled return immediately */
|
||||
if (!server.reply_buffer_resizing_enabled) return 0;
|
||||
|
||||
if (buffer_target_shrink_size >= PROTO_REPLY_MIN_BYTES && c->buf_peak < buffer_target_shrink_size) {
|
||||
new_buffer_size = max(PROTO_REPLY_MIN_BYTES, c->buf_peak + 1);
|
||||
server.stat_reply_buffer_shrinks++;
|
||||
|
||||
+31
-6
@@ -785,10 +785,19 @@ char *getObjectTypeName(robj *);
|
||||
|
||||
struct evictionPoolEntry; /* Defined in evict.c */
|
||||
|
||||
typedef struct payloadHeader payloadHeader; /* Defined in networking.c */
|
||||
|
||||
/* Per-reply-block flag bits (kept in one byte). */
typedef struct ClientReplyBlockFlags {
    uint8_t buf_encoded : 1; /* True if reply block buf content is encoded (e.g. for copy avoidance),
                              * i.e. framed as payloadHeader records instead of raw RESP bytes */
    uint8_t reserved : 7;    /* unused; keep zero */
} ClientReplyBlockFlags;
|
||||
|
||||
/* This structure is used in order to represent the output buffer of a client,
 * which is actually a linked list of blocks like that, that is: client->reply. */
typedef struct clientReplyBlock {
    size_t size, used;          /* allocated capacity of buf / bytes currently used */
    payloadHeader *last_header; /* points to a last header in an encoded buffer,
                                 * so consecutive plain replies can extend it in place */
    ClientReplyBlockFlags flag; /* see ClientReplyBlockFlags (buf_encoded) */
    char buf[];                 /* flexible array member holding the block data */
} clientReplyBlock;
|
||||
|
||||
@@ -1017,7 +1026,7 @@ typedef struct {
|
||||
/* General */
|
||||
int saved; /* 1 if we already saved the offset (first time we call addReply*) */
|
||||
/* Offset within the static reply buffer */
|
||||
int bufpos;
|
||||
size_t bufpos;
|
||||
/* Offset within the reply block list */
|
||||
struct {
|
||||
int index;
|
||||
@@ -1058,6 +1067,7 @@ typedef struct ClientFlags {
|
||||
uint64_t prevent_prop : 1; /* Don't propagate to AOF or replicas. */
|
||||
uint64_t pending_write : 1; /* Client has output to send but a write handler is yet not installed. */
|
||||
uint64_t pending_read : 1; /* Client has output to send but a write handler is yet not installed. */
|
||||
uint64_t buf_encoded : 1; /* True if c->buf content is encoded (e.g. for copy avoidance) */
|
||||
uint64_t reply_off : 1; /* Don't send replies to client. */
|
||||
uint64_t reply_skip_next : 1; /* Set CLIENT_REPLY_SKIP for next cmd */
|
||||
uint64_t reply_skip : 1; /* Don't send just this reply. */
|
||||
@@ -1115,7 +1125,6 @@ typedef struct ClientFlags {
|
||||
or client::buf. */
|
||||
uint64_t keyspace_notified : 1; /* Indicates that a keyspace notification was triggered during the execution of the
|
||||
current command. */
|
||||
uint64_t reserved : 1; /* Reserved for future use */
|
||||
} ClientFlags;
|
||||
|
||||
typedef struct ClientPubSubData {
|
||||
@@ -1182,6 +1191,14 @@ typedef struct ClientModuleData {
|
||||
* unloaded for cleanup. Opaque for the Server Core.*/
|
||||
} ClientModuleData;
|
||||
|
||||
/* Write-resume state shared between writevToClient invocations and
 * _postWriteToClient (see saveLastWrittenBuf / resetLastWrittenBuf). */
typedef struct LastWrittenBuf {
    char *buf;       /* Last buffer that has been written to the client connection.
                      * Last buffer is either c->buf or c->reply list node (i.e. buf from a clientReplyBlock) */
    size_t bufpos;   /* The buffer has been written until this position
                      * (zero means the buffer was written incompletely) */
    size_t data_len; /* The actual reply length written from this buffer.
                      * This length differs from bufpos in case of copy avoidance */
} LastWrittenBuf;
|
||||
|
||||
typedef struct client {
|
||||
/* Basic client information and connection. */
|
||||
uint64_t id; /* Client incremental unique ID. */
|
||||
@@ -1217,12 +1234,13 @@ typedef struct client {
|
||||
list *reply; /* List of reply objects to send to the client. */
|
||||
listNode *io_last_reply_block; /* Last client reply block when sent to IO thread */
|
||||
size_t io_last_bufpos; /* The client's bufpos at the time it was sent to the IO thread */
|
||||
LastWrittenBuf io_last_written; /* Track state for last written buffer */
|
||||
unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
|
||||
size_t sentlen; /* Amount of bytes already sent in the current buffer or object being sent. */
|
||||
listNode clients_pending_write_node; /* list node in clients_pending_write or in clients_pending_io_write list */
|
||||
int bufpos;
|
||||
int original_argc; /* Num of arguments of original command if arguments were rewritten. */
|
||||
robj **original_argv; /* Arguments of original command if arguments were rewritten. */
|
||||
size_t bufpos;
|
||||
payloadHeader *last_header; /* Pointer to the last header in a buffer when using copy avoidance */
|
||||
int original_argc; /* Num of arguments of original command if arguments were rewritten. */
|
||||
robj **original_argv; /* Arguments of original command if arguments were rewritten. */
|
||||
/* Client flags and state indicators */
|
||||
union {
|
||||
uint64_t raw_flag;
|
||||
@@ -1676,6 +1694,10 @@ struct valkeyServer {
|
||||
int enable_module_cmd; /* Enable MODULE commands, see PROTECTED_ACTION_ALLOWED_* */
|
||||
int enable_debug_assert; /* Enable debug asserts */
|
||||
int debug_client_enforce_reply_list; /* Force client to always use the reply list */
|
||||
/* Reply construction copy avoidance */
|
||||
int min_io_threads_copy_avoid; /* Minimum number of IO threads for copy avoidance in reply construction */
|
||||
int min_string_size_copy_avoid_threaded; /* Minimum bulk string size for copy avoidance in reply construction when IO threads enabled */
|
||||
int min_string_size_copy_avoid; /* Minimum bulk string size for copy avoidance in reply construction when IO threads disabled */
|
||||
/* RDB / AOF loading information */
|
||||
volatile sig_atomic_t loading; /* We are loading data from disk if true */
|
||||
volatile sig_atomic_t async_loading; /* We are loading data without blocking the db being served */
|
||||
@@ -2786,6 +2808,9 @@ void ioThreadWriteToClient(void *data);
|
||||
int canParseCommand(client *c);
|
||||
int processIOThreadsReadDone(void);
|
||||
int processIOThreadsWriteDone(void);
|
||||
void releaseReplyReferences(client *c);
|
||||
void resetLastWrittenBuf(client *c);
|
||||
|
||||
|
||||
/* logreqres.c - logging of requests and responses */
|
||||
void reqresReset(client *c, int free_buf);
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -859,7 +859,7 @@ int test_listpackLpNextRandomCC(int argc, char **argv, int flags) {
|
||||
|
||||
/* Don't crash even for bad index. */
|
||||
for (int j = 0; j < 100; j++) {
|
||||
unsigned char *p;
|
||||
unsigned char *p = NULL;
|
||||
switch (j % 4) {
|
||||
case 0: p = p0; break;
|
||||
case 1: p = p1; break;
|
||||
|
||||
@@ -435,3 +435,266 @@ int test_rewriteClientCommandArgument(int argc, char **argv, int flags) {
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Build a minimal zero-initialized client suitable for reply-construction
 * unit tests: static reply buffer, empty reply list, and a fake connection
 * pointer. Free with freeReplyOffloadClient. */
static client *createTestClient(void) {
    client *c = zcalloc(sizeof(client));

    c->buf = zmalloc_usable(PROTO_REPLY_CHUNK_BYTES, &c->buf_usable_size);
    c->reply = listCreate();
    listSetFreeMethod(c->reply, freeClientReplyValue);
    listSetDupMethod(c->reply, dupClientReplyValue);
    /* dummy connection to bypass assert in closeClientOnOutputBufferLimitReached */
    c->conn = (connection *)c;
    /* Effectively disable the deferred-reply accounting limit for tests. */
    c->deferred_reply_bytes = ULLONG_MAX;

    return c;
}
|
||||
|
||||
/* Tear down a client built by createTestClient.
 * The three frees are independent: the reply list, the static reply buffer,
 * and the client struct itself. */
static void freeReplyOffloadClient(client *c) {
    zfree(c->buf);
    listRelease(c->reply);
    zfree(c);
}
|
||||
|
||||
/* Each bulk offload puts 2 pointers to a reply buffer */
|
||||
#define PTRS_LEN (sizeof(void *) * 2)
|
||||
|
||||
/* Unit test: interleaving bulk-string references (copy avoidance) and plain
 * replies inside the client's static reply buffer. Verifies header framing
 * (payload type/length), header coalescing for consecutive payloads of the
 * same type, stored object pointers, and object refcount bookkeeping. */
int test_addRepliesWithOffloadsToBuffer(int argc, char **argv, int flags) {
    UNUSED(argc);
    UNUSED(argv);
    UNUSED(flags);

    client *c = createTestClient();

    /* Test 1: Add bulk offloads to the buffer */
    robj *obj = createObject(OBJ_STRING, sdscatfmt(sdsempty(), "test"));
    _addBulkStrRefToBufferOrList(c, obj);

    /* The buffer holds a reference to obj, so its refcount must have grown. */
    TEST_ASSERT(obj->refcount == 2);
    TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + PTRS_LEN);

    payloadHeader *header1 = c->last_header;
    TEST_ASSERT(header1->payload_type == BULK_STR_REF);
    TEST_ASSERT(header1->payload_len == PTRS_LEN);

    /* The object pointer is stored right after the header. */
    robj **ptr = (robj **)(c->buf + sizeof(payloadHeader));
    TEST_ASSERT(obj == *ptr);

    robj *obj2 = createObject(OBJ_STRING, sdscatfmt(sdsempty(), "test2"));
    _addBulkStrRefToBufferOrList(c, obj2);

    /* 2 offloads expected in c->buf — same header extended, not a new one. */
    TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + 2 * PTRS_LEN);
    TEST_ASSERT(header1->payload_type == BULK_STR_REF);
    TEST_ASSERT(header1->payload_len == 2 * PTRS_LEN);

    ptr = (robj **)(c->buf + sizeof(payloadHeader) + PTRS_LEN);
    TEST_ASSERT(obj2 == *ptr);

    /* Test 2: Add plain reply to the buffer */
    const char *plain = "+OK\r\n";
    size_t plain_len = strlen(plain);
    _addReplyToBufferOrList(c, plain, plain_len);

    /* 2 offloads and plain reply expected in c->buf. So 2 headers expected as well */
    TEST_ASSERT(c->bufpos == 2 * sizeof(payloadHeader) + 2 * PTRS_LEN + plain_len);
    TEST_ASSERT(header1->payload_type == BULK_STR_REF);
    TEST_ASSERT(header1->payload_len == 2 * PTRS_LEN);
    payloadHeader *header2 = c->last_header;
    TEST_ASSERT(header2->payload_type == PLAIN_REPLY);
    TEST_ASSERT(header2->payload_len == plain_len);

    /* Add more plain replies. Check same plain reply header updated properly */
    for (int i = 0; i < 9; ++i) _addReplyToBufferOrList(c, plain, plain_len);
    TEST_ASSERT(c->bufpos == 2 * sizeof(payloadHeader) + 2 * PTRS_LEN + 10 * plain_len);
    TEST_ASSERT(header2->payload_type == PLAIN_REPLY);
    TEST_ASSERT(header2->payload_len == plain_len * 10);

    /* Test 3: Add one more bulk offload to the buffer — a plain reply sits in
     * between, so a third header must be started. */
    _addBulkStrRefToBufferOrList(c, obj);
    TEST_ASSERT(obj->refcount == 3);
    TEST_ASSERT(c->bufpos == 3 * sizeof(payloadHeader) + 3 * PTRS_LEN + 10 * plain_len);
    payloadHeader *header3 = c->last_header;
    TEST_ASSERT(header3->payload_type == BULK_STR_REF);
    ptr = (robj **)((char *)c->last_header + sizeof(payloadHeader));
    TEST_ASSERT(obj == *ptr);

    /* Drop the references held by the buffer, then our own. */
    releaseReplyReferences(c);
    decrRefCount(obj);
    decrRefCount(obj2);

    freeReplyOffloadClient(c);

    return 0;
}
|
||||
|
||||
int test_addRepliesWithOffloadsToList(int argc, char **argv, int flags) {
|
||||
UNUSED(argc);
|
||||
UNUSED(argv);
|
||||
UNUSED(flags);
|
||||
|
||||
/* Required for isCopyAvoidPreferred / isCopyAvoidIndicatedByIOThreads */
|
||||
int io_threads_num = server.io_threads_num;
|
||||
int min_io_threads_for_copy_avoid = server.min_io_threads_copy_avoid;
|
||||
server.io_threads_num = 1;
|
||||
server.min_io_threads_copy_avoid = 1;
|
||||
|
||||
client *c = createTestClient();
|
||||
|
||||
// Mock ACL
|
||||
user u;
|
||||
DefaultUser = &u;
|
||||
DefaultUser->flags = USER_FLAG_NOPASS;
|
||||
|
||||
/* Test 1: Add bulk offloads to the reply list */
|
||||
|
||||
/* Select reply length so that there is place for 2 headers and 4 bytes only
|
||||
* 4 bytes is not enough for object pointer(s)
|
||||
* This will force bulk offload to be added to reply list
|
||||
*/
|
||||
size_t reply_len = c->buf_usable_size - 2 * sizeof(payloadHeader) - 4;
|
||||
char *reply = zmalloc(reply_len);
|
||||
memset(reply, 'a', reply_len);
|
||||
_addReplyToBufferOrList(c, reply, reply_len);
|
||||
TEST_ASSERT(c->flag.buf_encoded);
|
||||
TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + reply_len);
|
||||
TEST_ASSERT(listLength(c->reply) == 0);
|
||||
|
||||
/* As bulk offload header+pointer can't be accommodated in c->buf
|
||||
* then one block is expected in c->reply */
|
||||
robj *obj = createObject(OBJ_STRING, sdscatfmt(sdsempty(), "test"));
|
||||
_addBulkStrRefToBufferOrList(c, obj);
|
||||
TEST_ASSERT(obj->refcount == 2);
|
||||
TEST_ASSERT(c->bufpos == sizeof(payloadHeader) + reply_len);
|
||||
TEST_ASSERT(listLength(c->reply) == 1);
|
||||
|
||||
/* Check bulk offload header+pointer inside c->reply */
|
||||
listIter iter;
|
||||
listRewind(c->reply, &iter);
|
||||
listNode *next = listNext(&iter);
|
||||
clientReplyBlock *blk = listNodeValue(next);
|
||||
|
||||
TEST_ASSERT(blk->used == sizeof(payloadHeader) + PTRS_LEN);
|
||||
payloadHeader *header1 = blk->last_header;
|
||||
TEST_ASSERT(header1->payload_type == BULK_STR_REF);
|
||||
TEST_ASSERT(header1->payload_len == PTRS_LEN);
|
||||
|
||||
robj **ptr = (robj **)(blk->buf + sizeof(payloadHeader));
|
||||
TEST_ASSERT(obj == *ptr);
|
||||
|
||||
/* Test 2: Add one more bulk offload to the reply list */
|
||||
_addBulkStrRefToBufferOrList(c, obj);
|
||||
TEST_ASSERT(obj->refcount == 3);
|
||||
TEST_ASSERT(listLength(c->reply) == 1);
|
||||
TEST_ASSERT(blk->used == sizeof(payloadHeader) + 2 * PTRS_LEN);
|
||||
TEST_ASSERT(header1->payload_type == BULK_STR_REF);
|
||||
TEST_ASSERT(header1->payload_len == 2 * PTRS_LEN);
|
||||
|
||||
/* Test 3: Add plain replies to cause reply list grow */
|
||||
while (reply_len < blk->size - blk->used) _addReplyToBufferOrList(c, reply, reply_len);
|
||||
_addReplyToBufferOrList(c, reply, reply_len);
|
||||
|
||||
TEST_ASSERT(listLength(c->reply) == 2);
|
||||
/* last header in 1st block */
|
||||
payloadHeader *header2 = blk->last_header;
|
||||
listRewind(c->reply, &iter);
|
||||
listNext(&iter);
|
||||
next = listNext(&iter);
|
||||
clientReplyBlock *blk2 = listNodeValue(next);
|
||||
/* last header in 2nd block */
|
||||
payloadHeader *header3 = blk2->last_header;
|
||||
TEST_ASSERT(header2->payload_type == PLAIN_REPLY && header3->payload_type == PLAIN_REPLY);
|
||||
TEST_ASSERT((header2->payload_len + header3->payload_len) % reply_len == 0);
|
||||
|
||||
releaseReplyReferences(c);
|
||||
decrRefCount(obj);
|
||||
|
||||
zfree(reply);
|
||||
|
||||
freeReplyOffloadClient(c);
|
||||
|
||||
/* Restore modified values */
|
||||
server.io_threads_num = io_threads_num;
|
||||
server.min_io_threads_copy_avoid = min_io_threads_for_copy_avoid;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int test_addBufferToReplyIOV(int argc, char **argv, int flags) {
|
||||
UNUSED(argc);
|
||||
UNUSED(argv);
|
||||
UNUSED(flags);
|
||||
|
||||
const char *expected_reply = "$5\r\nhello\r\n";
|
||||
ssize_t total_len = strlen(expected_reply);
|
||||
const int iovmax = 16;
|
||||
char crlf[2] = {'\r', '\n'};
|
||||
|
||||
/* Test 1: 1st writevToclient invocation */
|
||||
client *c = createTestClient();
|
||||
robj *obj = createObject(OBJ_STRING, sdscatfmt(sdsempty(), "hello"));
|
||||
_addBulkStrRefToBufferOrList(c, obj);
|
||||
|
||||
struct iovec iov_arr[iovmax];
|
||||
char prefixes[iovmax / 3 + 1][LONG_STR_SIZE + 3];
|
||||
bufWriteMetadata metadata[1];
|
||||
|
||||
replyIOV reply;
|
||||
initReplyIOV(c, iovmax, iov_arr, prefixes, crlf, &reply);
|
||||
addBufferToReplyIOV(c->flag.buf_encoded, c->buf, c->bufpos, &reply, &metadata[0]);
|
||||
|
||||
TEST_ASSERT(reply.iov_len_total == total_len);
|
||||
TEST_ASSERT(reply.iovcnt == 3);
|
||||
const char *ptr = expected_reply;
|
||||
for (int i = 0; i < reply.iovcnt; ++i) {
|
||||
TEST_ASSERT(memcmp(ptr, reply.iov[i].iov_base, reply.iov[i].iov_len) == 0);
|
||||
ptr += reply.iov[i].iov_len;
|
||||
}
|
||||
|
||||
/* Test 2: Last written buf/pos/data_len after 1st invocation */
|
||||
saveLastWrittenBuf(c, metadata, 1, reply.iov_len_total, 1); /* only 1 byte has been written */
|
||||
TEST_ASSERT(c->io_last_written.buf == c->buf);
|
||||
TEST_ASSERT(c->io_last_written.bufpos == 0); /* incomplete write */
|
||||
TEST_ASSERT(c->io_last_written.data_len == 1);
|
||||
|
||||
/* Test 3: 2nd writevToclient invocation */
|
||||
struct iovec iov_arr2[iovmax];
|
||||
char prefixes2[iovmax / 3 + 1][LONG_STR_SIZE + 3];
|
||||
bufWriteMetadata metadata2[1];
|
||||
|
||||
replyIOV reply2;
|
||||
initReplyIOV(c, iovmax, iov_arr2, prefixes2, crlf, &reply2);
|
||||
addBufferToReplyIOV(c->flag.buf_encoded, c->buf, c->bufpos, &reply2, &metadata2[0]);
|
||||
TEST_ASSERT(reply2.iov_len_total == total_len - 1);
|
||||
TEST_ASSERT((*(char *)reply2.iov[0].iov_base) == '5');
|
||||
|
||||
/* Test 4: Last written buf/pos/data_len after 2nd invocation */
|
||||
saveLastWrittenBuf(c, metadata2, 1, reply2.iov_len_total, 4); /* 4 more bytes has been written */
|
||||
TEST_ASSERT(c->io_last_written.buf == c->buf);
|
||||
TEST_ASSERT(c->io_last_written.bufpos == 0); /* incomplete write */
|
||||
TEST_ASSERT(c->io_last_written.data_len == 5); /* 1 + 4 */
|
||||
|
||||
/* Test 5: 3rd writevToclient invocation */
|
||||
struct iovec iov_arr3[iovmax];
|
||||
char prefixes3[iovmax / 3 + 1][LONG_STR_SIZE + 3];
|
||||
bufWriteMetadata metadata3[1];
|
||||
|
||||
replyIOV reply3;
|
||||
initReplyIOV(c, iovmax, iov_arr3, prefixes3, crlf, &reply3);
|
||||
addBufferToReplyIOV(c->flag.buf_encoded, c->buf, c->bufpos, &reply3, &metadata3[0]);
|
||||
TEST_ASSERT(reply3.iov_len_total == total_len - 5);
|
||||
TEST_ASSERT((*(char *)reply3.iov[0].iov_base) == 'e');
|
||||
|
||||
/* Test 6: Last written buf/pos/data_len after 3rd invocation */
|
||||
saveLastWrittenBuf(c, metadata3, 1, reply3.iov_len_total, reply3.iov_len_total); /* everything has been written */
|
||||
TEST_ASSERT(c->io_last_written.buf == c->buf);
|
||||
TEST_ASSERT(c->io_last_written.bufpos == c->bufpos);
|
||||
TEST_ASSERT(c->io_last_written.data_len == (size_t)total_len);
|
||||
|
||||
decrRefCount(obj);
|
||||
decrRefCount(obj);
|
||||
|
||||
freeReplyOffloadClient(c);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -639,7 +639,7 @@ int test_ziplistStressWithRandomPayloadsOfDifferentEncoding(int argc, char **arg
|
||||
/* Hold temp vars from ziplist */
|
||||
unsigned char *sstr;
|
||||
unsigned int slen;
|
||||
long long sval;
|
||||
long long sval = 0;
|
||||
|
||||
iteration = accurate ? 20000 : 20;
|
||||
for (i = 0; i < iteration; i++) {
|
||||
|
||||
@@ -111,6 +111,7 @@ proc spawn_instance {type base_port count {conf {}} {base_conf_file ""}} {
|
||||
if {$::io_threads} {
|
||||
puts $cfg "io-threads 2"
|
||||
puts $cfg "events-per-io-thread 0"
|
||||
puts $cfg "min-io-threads-avoid-copy-reply 2"
|
||||
}
|
||||
|
||||
if {$::log_req_res} {
|
||||
|
||||
@@ -8,7 +8,7 @@ set aof_base_file "$server_path/$aof_dirname/${aof_basename}.1$::base_aof_suffix
|
||||
set aof_file "$server_path/$aof_dirname/${aof_basename}.1$::incr_aof_suffix$::aof_format_suffix"
|
||||
set aof_manifest_file "$server_path/$aof_dirname/$aof_basename$::manifest_suffix"
|
||||
|
||||
tags {"aof external:skip"} {
|
||||
tags {"aof external:skip logreqres:skip"} {
|
||||
# Server can start when aof-load-truncated is set to yes and AOF
|
||||
# is truncated, with an incomplete MULTI block.
|
||||
create_aof $aof_dirpath $aof_file {
|
||||
|
||||
@@ -542,6 +542,7 @@ proc start_server {options {code undefined}} {
|
||||
if {$::io_threads} {
|
||||
dict set config "io-threads" 2
|
||||
dict set config "events-per-io-thread" 0
|
||||
dict set config "min-io-threads-avoid-copy-reply" 2
|
||||
}
|
||||
|
||||
foreach line $data {
|
||||
|
||||
@@ -52,6 +52,10 @@ proc kb {v} {
|
||||
start_server {} {
|
||||
set maxmemory_clients 3000000
|
||||
r config set maxmemory-clients $maxmemory_clients
|
||||
# Disable copy avoidance because it affects memory usage
|
||||
r config set min-io-threads-avoid-copy-reply 0
|
||||
r config set min-string-size-avoid-copy-reply 0
|
||||
r config set min-string-size-avoid-copy-reply-threaded 0
|
||||
|
||||
test "client evicted due to large argv" {
|
||||
r flushdb
|
||||
@@ -332,6 +336,10 @@ start_server {} {
|
||||
set obuf_limit [mb 3]
|
||||
r config set maxmemory-clients $maxmemory_clients
|
||||
r config set client-output-buffer-limit "normal $obuf_limit 0 0"
|
||||
# Disable copy avoidance
|
||||
r config set min-io-threads-avoid-copy-reply 0
|
||||
r config set min-string-size-avoid-copy-reply 0
|
||||
r config set min-string-size-avoid-copy-reply-threaded 0
|
||||
|
||||
test "avoid client eviction when client is freed by output buffer limit" {
|
||||
r flushdb
|
||||
@@ -385,13 +393,17 @@ start_server {} {
|
||||
}
|
||||
|
||||
start_server {} {
|
||||
# Disable copy avoidance
|
||||
r config set min-io-threads-avoid-copy-reply 0
|
||||
r config set min-string-size-avoid-copy-reply 0
|
||||
r config set min-string-size-avoid-copy-reply-threaded 0
|
||||
|
||||
test "decrease maxmemory-clients causes client eviction" {
|
||||
set maxmemory_clients [mb 4]
|
||||
set client_count 10
|
||||
set qbsize [expr ($maxmemory_clients - [mb 1]) / $client_count]
|
||||
r config set maxmemory-clients $maxmemory_clients
|
||||
|
||||
|
||||
# Make multiple clients consume together roughly 1mb less than maxmemory_clients
|
||||
set rrs {}
|
||||
for {set j 0} {$j < $client_count} {incr j} {
|
||||
@@ -426,6 +438,11 @@ start_server {} {
|
||||
}
|
||||
|
||||
start_server {} {
|
||||
# Disable copy avoidance
|
||||
r config set min-io-threads-avoid-copy-reply 0
|
||||
r config set min-string-size-avoid-copy-reply 0
|
||||
r config set min-string-size-avoid-copy-reply-threaded 0
|
||||
|
||||
test "evict clients only until below limit" {
|
||||
set client_count 10
|
||||
set client_mem [mb 1]
|
||||
@@ -434,6 +451,7 @@ start_server {} {
|
||||
r client setname control
|
||||
r client no-evict on
|
||||
|
||||
|
||||
# Make multiple clients consume together roughly 1mb less than maxmemory_clients
|
||||
set total_client_mem 0
|
||||
set max_client_mem 0
|
||||
@@ -488,6 +506,11 @@ start_server {} {
|
||||
}
|
||||
|
||||
start_server {} {
|
||||
# Disable copy avoidance
|
||||
r config set min-io-threads-avoid-copy-reply 0
|
||||
r config set min-string-size-avoid-copy-reply 0
|
||||
r config set min-string-size-avoid-copy-reply-threaded 0
|
||||
|
||||
test "evict clients in right order (large to small)" {
|
||||
# Note that each size step needs to be at least x2 larger than previous step
|
||||
# because of how the client-eviction size bucketing works
|
||||
@@ -555,6 +578,11 @@ start_server {} {
|
||||
}
|
||||
|
||||
start_server {} {
|
||||
# Disable copy avoidance
|
||||
r config set min-io-threads-avoid-copy-reply 0
|
||||
r config set min-string-size-avoid-copy-reply 0
|
||||
r config set min-string-size-avoid-copy-reply-threaded 0
|
||||
|
||||
foreach type {"client no-evict" "maxmemory-clients disabled"} {
|
||||
r flushall
|
||||
r client no-evict on
|
||||
|
||||
@@ -524,9 +524,12 @@ start_cluster 1 0 {tags {external:skip cluster}} {
|
||||
R 0 SET $key value
|
||||
# +OK\r\n --> 5 bytes
|
||||
|
||||
R 0 GET $key
|
||||
# $3\r\nvalue\r\n -> 11 bytes
|
||||
|
||||
set expected_slot_stats [
|
||||
dict create $key_slot [
|
||||
dict create network-bytes-out 5
|
||||
dict create network-bytes-out 16
|
||||
]
|
||||
]
|
||||
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
|
||||
|
||||
@@ -26,7 +26,6 @@ start_server {tags {"commandlog"} overrides {commandlog-execution-slower-than 10
|
||||
r set testkey $value
|
||||
assert_equal [r commandlog len large-request] 1
|
||||
|
||||
# for large-reply
|
||||
r config set commandlog-reply-larger-than 1024
|
||||
r ping
|
||||
assert_equal [r commandlog len large-reply] 0
|
||||
@@ -120,6 +119,9 @@ start_server {tags {"commandlog"} overrides {commandlog-execution-slower-than 10
|
||||
assert_equal {foobar} [lindex $e 5]
|
||||
|
||||
# for large-reply
|
||||
set copy_avoid [lindex [r config get min-io-threads-avoid-copy-reply] 1]
|
||||
r config set min-io-threads-avoid-copy-reply 0
|
||||
|
||||
r get testkey
|
||||
set e [lindex [r commandlog get -1 large-reply] 0]
|
||||
assert_equal [llength $e] 6
|
||||
@@ -129,7 +131,10 @@ start_server {tags {"commandlog"} overrides {commandlog-execution-slower-than 10
|
||||
assert_equal [expr {[lindex $e 2] > 1024}] 1
|
||||
assert_equal [lindex $e 3] {get testkey}
|
||||
assert_equal {foobar} [lindex $e 5]
|
||||
} {} {needs:debug}
|
||||
|
||||
# Restore min-io-threads-avoid-copy-reply value
|
||||
r config set min-io-threads-avoid-copy-reply $copy_avoid
|
||||
} {OK} {needs:debug}
|
||||
|
||||
test {COMMANDLOG slow - Certain commands are omitted that contain sensitive information} {
|
||||
r config set commandlog-slow-execution-max-len 100
|
||||
|
||||
@@ -384,6 +384,10 @@ start_server {tags {"info" "external:skip" "debug_defrag:skip"}} {
|
||||
}
|
||||
|
||||
test {stats: client input and output buffer limit disconnections} {
|
||||
# Disable copy avoidance because it affects memory usage
|
||||
set min_size [lindex [r config get min-string-size-avoid-copy-reply] 1]
|
||||
r config set min-string-size-avoid-copy-reply 0
|
||||
|
||||
r config resetstat
|
||||
set info [r info stats]
|
||||
assert_equal [getInfoProperty $info client_query_buffer_limit_disconnections] {0}
|
||||
@@ -407,6 +411,10 @@ start_server {tags {"info" "external:skip" "debug_defrag:skip"}} {
|
||||
r set key [string repeat a 100000] ;# to trigger output buffer limit check this needs to be big
|
||||
catch {r get key}
|
||||
r config set client-output-buffer-limit $org_outbuf_limit
|
||||
|
||||
# Restore copy avoidance configs
|
||||
r config set min-string-size-avoid-copy-reply $min_size
|
||||
|
||||
set info [r info stats]
|
||||
assert_equal [getInfoProperty $info client_output_buffer_limit_disconnections] {1}
|
||||
} {} {logreqres:skip} ;# same as obuf-limits.tcl, skip logreqres
|
||||
|
||||
@@ -1,7 +1,11 @@
|
||||
start_server {tags {"maxmemory" "external:skip"}} {
|
||||
start_server {tags {"maxmemory external:skip"}} {
|
||||
r config set maxmemory 11mb
|
||||
r config set maxmemory-policy allkeys-lru
|
||||
set server_pid [s process_id]
|
||||
# Disable copy avoidance because it affects memory usage
|
||||
r config set min-io-threads-avoid-copy-reply 0
|
||||
r config set min-string-size-avoid-copy-reply 0
|
||||
r config set min-string-size-avoid-copy-reply-threaded 0
|
||||
|
||||
proc init_test {client_eviction} {
|
||||
r flushdb sync
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
start_server {tags {"obuf-limits external:skip logreqres:skip"}} {
|
||||
# Disable copy avoidance because it affects memory usage
|
||||
r config set min-io-threads-avoid-copy-reply 0
|
||||
|
||||
test {CONFIG SET client-output-buffer-limit} {
|
||||
set oldval [lindex [r config get client-output-buffer-limit] 1]
|
||||
|
||||
|
||||
@@ -9,7 +9,11 @@ proc get_reply_buffer_size {cname} {
|
||||
}
|
||||
|
||||
start_server {tags {"replybufsize"}} {
|
||||
|
||||
# Disable copy avoidance because it affects memory usage
|
||||
r config set min-io-threads-avoid-copy-reply 0
|
||||
r config set min-string-size-avoid-copy-reply 0
|
||||
r config set min-string-size-avoid-copy-reply-threaded 0
|
||||
|
||||
test {verify reply buffer limits} {
|
||||
# In order to reduce test time we can set the peak reset time very low
|
||||
r debug replybuffer peak-reset-time 100
|
||||
|
||||
Reference in New Issue
Block a user