Add structured datasets loading capability in valkey benchmark (#2823)

## Background

Add structured dataset loading capability. CSV and TSV file formats are
supported. Use `__field:fieldname__` placeholders to substitute the
corresponding fields from the dataset file. Field values keep their
natural, varying content lengths. Dataset field placeholders can be
mixed with the existing random-value generators. Field names are
discovered automatically from the CSV/TSV header row. Use `--maxdocs`
to limit how many documents are loaded from the dataset.

Rather than modifying the existing placeholder system, we detect field
placeholders and switch to a separate code path that builds commands
from scratch using `valkeyFormatCommandArgv()`. This ensures:

- Zero impact on existing functionality
- Full support for variable-size content
- Thread-safe atomic record iteration
- Compatible with pipelining and threading modes

__Usage examples__

```sh
# Strings - Simple key-value with dataset fields
./valkey-benchmark --dataset products.csv -n 10000 SET product:__rand_int__ "__field:name__"

# Sets - Unique collections from dataset
./valkey-benchmark --dataset categories.csv -n 10000 SADD tags:__rand_int__ "__field:category__"

# CSV dataset with document limit
./valkey-benchmark --dataset wiki.csv --maxdocs 100000 -n 50000 HSET doc:__rand_int__ title "__field:title__" body "__field:abstract__"

# Mixed placeholders (dataset + random)
./valkey-benchmark --dataset terms.csv -r 5000000 -n 50000 HSET search:__rand_int__ term "__field:term__" score __rand_1st__
```

__Full-Text Search Benchmarking__

```sh
# Search hit scenarios (existing terms)
./valkey-benchmark --dataset search_terms.csv -n 50000 FT.SEARCH rd0 "__field:term__"

# Search miss scenarios (non-existent terms)
./valkey-benchmark --dataset miss_terms.csv -n 50000 FT.SEARCH rd0 "__field:term__"

# Query variations
./valkey-benchmark --dataset search_terms.csv -n 50000 FT.SEARCH rd0 "@title:__field:term__"
./valkey-benchmark --dataset search_terms.csv -n 50000 FT.SEARCH rd0 "__field:term__*"
```

__Benchmark Results__


Test environment:
- __Instance:__ AWS c7i.16xlarge, 64 vCPUs
- __Dataset:__ 5M+ Wikipedia XML documents, 5.8 GB in memory

| Configuration | Throughput | CPU Usage | Wall Time | Memory Peak |
|---------------|------------|-----------|-----------|-------------|
| Single-threaded, P1 | 93,295 RPS | 99% | 71.4s | 5.8GB |
| Multi-threaded (10), P1 | 93,332 RPS | 137% | 71.5s | 5.8GB |
| Single-threaded, P10 | 274,499 RPS | 96% | 36.1s | 5.8GB |
| Multi-threaded (4), P10 | 344,589 RPS | 161% | 32.4s | 5.8GB |

---------

Signed-off-by: Ram Prasad Voleti <ramvolet@amazon.com>
Co-authored-by: Ram Prasad Voleti <ramvolet@amazon.com>
This commit is contained in:
VoletiRam
2026-04-29 09:18:37 -07:00
committed by GitHub
parent 16ed690fec
commit 39036c7c06
6 changed files with 984 additions and 25 deletions
+1
View File
@@ -157,6 +157,7 @@ set(VALKEY_BENCHMARK_SRCS
${CMAKE_SOURCE_DIR}/src/sha256.c
${CMAKE_SOURCE_DIR}/src/util.c
${CMAKE_SOURCE_DIR}/src/valkey-benchmark.c
${CMAKE_SOURCE_DIR}/src/valkey-benchmark-dataset.c
${CMAKE_SOURCE_DIR}/src/valkey_strtod.c
${CMAKE_SOURCE_DIR}/src/adlist.c
${CMAKE_SOURCE_DIR}/src/hashtable.c
+1
View File
@@ -631,6 +631,7 @@ ENGINE_BENCHMARK_OBJ = \
strl.o \
util.o \
valkey-benchmark.o \
valkey-benchmark-dataset.o \
valkey_strtod.o \
zmalloc.o
ENGINE_CHECK_RDB_NAME=$(ENGINE_NAME)-check-rdb$(PROG_SUFFIX)
+459
View File
@@ -0,0 +1,459 @@
/* Dataset support for valkey-benchmark
*
* Copyright Valkey Contributors.
* All rights reserved.
* SPDX-License-Identifier: BSD 3-Clause
*/
#include "fmacros.h"
#include "valkey-benchmark-dataset.h"
#include "zmalloc.h"
#include <valkey/valkey.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/* Internal constants */
#define PLACEHOLDER_COUNT 10
/* Every placeholder below is exactly 12 characters long; the replacement
 * key is formatted as a 12-digit number so in-place substitution never
 * changes the command length (keeps RESP bulk-length headers valid). */
#define PLACEHOLDER_LEN 12
/* Index 0 (__rand_int__) gets a fresh key per occurrence; indices 1..9
 * share one key across all occurrences within a single command. */
static const char *PLACEHOLDERS[PLACEHOLDER_COUNT] = {
    "__rand_int__", "__rand_1st__", "__rand_2nd__", "__rand_3rd__", "__rand_4th__",
    "__rand_5th__", "__rand_6th__", "__rand_7th__", "__rand_8th__", "__rand_9th__"};
/* Forward declarations */
static bool datasetBuildFieldMap(dataset *ds, sds *template_argv, int template_argc);
static sds getFieldValue(const char *row, int column_index, char delimiter);
static bool csvDiscoverFields(dataset *ds);
static bool csvLoadDocuments(dataset *ds, int verbose);
static bool shouldStopLoading(dataset *ds);
static int findFieldIndex(dataset *ds, const char *field_name, size_t field_name_len);
static const char *extractDatasetFieldValue(dataset *ds, int field_idx, int record_index);
static sds replaceOccurrence(sds processed_arg, const char *pos, const char *replacement);
static sds processFieldsInArg(dataset *ds, sds arg, int record_index);
static sds processRandPlaceholdersForDataSet(sds cmd, _Atomic uint64_t *seq_key, int replace_placeholders, int keyspacelen, int sequential_replacement);
/* Create and fully initialize a dataset from FILENAME.
 *
 * Performs format detection, header-field discovery, optional field-map
 * construction from the command template, and record loading in one call.
 * The filename pointer is borrowed (not copied); the caller must keep it
 * alive for the dataset's lifetime.
 *
 * Returns NULL on any failure (all partial state is freed). */
dataset *datasetInit(const char *filename, int max_documents, int has_field_placeholders, sds *template_argv, int template_argc, int verbose) {
    if (!filename) return NULL;
    dataset *ds = zcalloc(sizeof(dataset));
    if (!ds) return NULL;
    ds->filename = filename;
    ds->max_documents = max_documents;
    /* Detect format from the file extension. Compare as a suffix rather
     * than with strstr(): a substring match would misclassify paths such
     * as "data.csv.tsv" or a directory named "x.csv/". Anything that is
     * not ".tsv" defaults to CSV. */
    size_t name_len = strlen(filename);
    if (name_len >= 4 && strcmp(filename + name_len - 4, ".tsv") == 0) {
        ds->format = DATASET_FORMAT_TSV;
        ds->delimiter = '\t';
    } else {
        ds->format = DATASET_FORMAT_CSV;
        ds->delimiter = ',';
    }
    if (!csvDiscoverFields(ds)) goto error;
    /* Build field map if needed (BEFORE loading) so the loader can skip
     * columns the command template never references. */
    if (has_field_placeholders && template_argv && template_argc > 0) {
        if (!datasetBuildFieldMap(ds, template_argv, template_argc)) goto error;
    } else {
        ds->used_field_count = ds->field_count;
    }
    if (!csvLoadDocuments(ds, verbose)) goto error;
    return ds;
error:
    datasetFree(ds);
    return NULL;
}
/* Release a dataset and every allocation it owns. Safe to call with NULL
 * or with a partially initialized dataset (datasetInit error path). */
void datasetFree(dataset *ds) {
    if (ds == NULL) return;

    /* Header column names: one sds per column. */
    if (ds->field_names != NULL) {
        /* Unified memory management: all formats use zmalloc + individual sdsnew */
        for (int col = 0; col < ds->field_count; col++) {
            sdsfree(ds->field_names[col]);
        }
        zfree(ds->field_names);
    }

    if (ds->field_map != NULL) zfree(ds->field_map);

    /* Records: each one owns used_field_count sds values. */
    if (ds->records != NULL) {
        for (size_t r = 0; r < ds->record_count; r++) {
            datasetRecord *rec = &ds->records[r];
            if (rec->fields == NULL) continue;
            for (int f = 0; f < ds->used_field_count; f++) {
                sdsfree(rec->fields[f]);
            }
            zfree(rec->fields);
        }
        zfree(ds->records);
    }

    zfree(ds);
}
/* Build ds->field_map from the command template.
 *
 * Scans every template argument for __field:name__ placeholders, resolves
 * each name against the header columns, and assigns every referenced
 * column a compact storage slot (0..used_field_count-1). Columns the
 * template never references keep -1 in field_map so the loader can skip
 * them entirely.
 *
 * Returns false (after printing the list of available fields to stderr)
 * when a placeholder names a column that does not exist in the header. */
bool datasetBuildFieldMap(dataset *ds, sds *template_argv, int template_argc) {
    if (!ds) return false;
    ds->field_map = zmalloc(ds->field_count * sizeof(int));
    ds->used_field_count = 0;
    for (int i = 0; i < ds->field_count; i++) {
        ds->field_map[i] = -1; /* -1 = column not referenced by the template */
    }
    for (int arg_idx = 0; arg_idx < template_argc; arg_idx++) {
        const char *arg = template_argv[arg_idx];
        const char *field_pos = strstr(arg, FIELD_PREFIX);
        while (field_pos) {
            const char *field_start = field_pos + FIELD_PREFIX_LEN;
            const char *field_end = strstr(field_start, FIELD_SUFFIX);
            if (!field_end) break; /* unterminated placeholder: stop scanning this arg */
            size_t field_name_len = field_end - field_start;
            sds field_name = sdsnewlen(field_start, field_name_len);
            int field_idx = -1;
            for (int k = 0; k < ds->field_count; k++) {
                if (!strcmp(field_name, ds->field_names[k])) {
                    field_idx = k;
                    break;
                }
            }
            if (field_idx == -1) {
                fprintf(stderr, "Error: Field placeholder '__field:%s__' not found in dataset fields\n", field_name);
                fprintf(stderr, "Available fields: ");
                for (int j = 0; j < ds->field_count; j++) {
                    fprintf(stderr, "%s%s", ds->field_names[j], (j < ds->field_count - 1) ? ", " : "\n");
                }
                sdsfree(field_name);
                return false;
            }
            if (ds->field_map[field_idx] == -1) {
                /* First reference to this column: assign the next compact slot. */
                ds->field_map[field_idx] = ds->used_field_count++;
            }
            sdsfree(field_name);
            field_pos = strstr(field_end + FIELD_SUFFIX_LEN, FIELD_PREFIX);
        }
    }
    return true;
}
/* Number of records loaded from the dataset; 0 for a NULL dataset. */
size_t datasetGetRecordCount(dataset *ds) {
    if (ds == NULL) return 0;
    return ds->record_count;
}
/* Build one complete RESP-encoded command for the given record.
 *
 * Each template argument is duplicated, its __field:name__ placeholders
 * are replaced with the record's values, and the resulting argv is
 * RESP-encoded with valkeyFormatCommandArgv(). Any __rand_*__
 * placeholders are then substituted in the encoded buffer (same-length
 * replacement, so the RESP headers stay valid).
 *
 * Returns a new sds the caller must sdsfree, or NULL on encoding failure. */
sds datasetGenerateCommand(dataset *ds, int record_index, sds *template_argv, int template_argc, _Atomic uint64_t *seq_key, int replace_placeholders, int keyspacelen, int sequential_replacement) {
    if (!ds || !template_argv) return NULL;
    sds *processed_argv = zmalloc(template_argc * sizeof(sds));
    for (int i = 0; i < template_argc; i++) {
        /* sdsdup: processFieldsInArg may free and reallocate its input. */
        processed_argv[i] = processFieldsInArg(ds, sdsdup(template_argv[i]), record_index);
    }
    char *cmd = NULL;
    int len = valkeyFormatCommandArgv(&cmd, template_argc, (const char **)processed_argv, NULL);
    if (len == -1) {
        /* Encoding failed: release the processed argv and bail out. */
        for (int i = 0; i < template_argc; i++) {
            sdsfree(processed_argv[i]);
        }
        zfree(processed_argv);
        if (cmd) {
            free(cmd);
        }
        return NULL;
    }
    sds result = sdsnewlen(cmd, len);
    free(cmd);
    result = processRandPlaceholdersForDataSet(result, seq_key, replace_placeholders,
                                               keyspacelen, sequential_replacement);
    for (int i = 0; i < template_argc; i++) {
        sdsfree(processed_argv[i]);
    }
    zfree(processed_argv);
    return result;
}
/* True once a positive --maxdocs limit has been reached; a limit of
 * zero or below means "unlimited" and never stops the loader. */
static bool shouldStopLoading(dataset *ds) {
    return ds->max_documents > 0 && (int)ds->record_count >= ds->max_documents;
}
/* Build the sds value for one field span, stripping one pair of
 * surrounding double quotes when present.
 *
 * The quoted check requires at least two characters: with len == 1 a
 * field consisting of a single '"' would satisfy both character tests
 * (start[0] and start[len-1] are the same byte) and `len -= 2` would
 * underflow size_t, producing a huge sdsnewlen request. */
static sds makeFieldValue(const char *start, size_t len) {
    if (len >= 2 && start[0] == '"' && start[len - 1] == '"') {
        start++;
        len -= 2;
    }
    return sdsnewlen(start, len);
}

/* Extract column column_index from a delimiter-separated row.
 *
 * Delimiters inside double-quoted sections are ignored, so quoted
 * values may contain the delimiter. Returns an empty sds when the row
 * has fewer columns than requested. Caller owns the returned sds. */
static sds getFieldValue(const char *row, int column_index, char delimiter) {
    int current_col = 0;
    const char *start = row;
    const char *p = row;
    int in_quotes = 0;
    while (*p) {
        if (*p == '"') {
            in_quotes = !in_quotes;
        } else if (*p == delimiter && !in_quotes) {
            if (current_col == column_index) {
                return makeFieldValue(start, (size_t)(p - start));
            }
            current_col++;
            start = p + 1;
        }
        p++;
    }
    /* Last column has no trailing delimiter; handle it after the loop. */
    if (current_col == column_index) {
        return makeFieldValue(start, (size_t)(p - start));
    }
    return sdsempty();
}
/* Read the header row of the dataset file and record the column names
 * in ds->field_names / ds->field_count. Returns false when the file
 * cannot be opened or the header cannot be read. */
static bool csvDiscoverFields(dataset *ds) {
    FILE *fp = fopen(ds->filename, "r");
    if (fp == NULL) {
        fprintf(stderr, "Cannot open dataset file: %s\n", ds->filename);
        return false;
    }
    char *header = NULL;
    size_t cap = 0;
    if (getline(&header, &cap, fp) == -1) {
        fprintf(stderr, "Cannot read header from dataset file\n");
        free(header);
        fclose(fp);
        return false;
    }
    /* Trim a trailing "\n" or "\r\n". */
    size_t hlen = strlen(header);
    if (hlen > 0 && header[hlen - 1] == '\n') header[hlen - 1] = '\0';
    if (hlen > 1 && header[hlen - 2] == '\r') header[hlen - 2] = '\0';
    /* Split on the format's delimiter and copy each name into our own
     * zmalloc'd array so datasetFree can release it uniformly. */
    char sep[2] = {ds->delimiter, '\0'};
    int ncols;
    sds *cols = sdssplitlen(header, strlen(header), sep, 1, &ncols);
    ds->field_names = zmalloc(ncols * sizeof(sds));
    for (int c = 0; c < ncols; c++) {
        ds->field_names[c] = sdsdup(cols[c]);
    }
    sdsfreesplitres(cols, ncols);
    ds->field_count = ncols;
    free(header);
    fclose(fp);
    return true;
}
/* Load all data rows into ds->records.
 *
 * Skips the header line, trims line endings, and grows the record array
 * geometrically. When a field map exists (built from the command
 * template), only the referenced columns are parsed and they are stored
 * at their compact mapped indices; otherwise every column is stored in
 * header order. Loading stops early once shouldStopLoading() reports
 * the --maxdocs limit was reached. */
static bool csvLoadDocuments(dataset *ds, int verbose) {
    FILE *fp = fopen(ds->filename, "r");
    if (!fp) return false;
    char *line = NULL;
    size_t len = 0;
    /* Consume (and discard) the header row. */
    if (getline(&line, &len, fp) == -1) {
        fprintf(stderr, "Cannot read header from dataset file\n");
        free(line);
        fclose(fp);
        return false;
    }
    size_t capacity = 1000;
    ds->records = zmalloc(sizeof(datasetRecord) * capacity);
    if (verbose) {
        fprintf(stderr, "Loading dataset from %s...\n", ds->filename);
    }
    /* Pre-compute the list of original column indices we actually need,
     * so the hot loop below does not rescan field_map per row. */
    int *load_indices = NULL;
    int load_count = 0;
    if (ds->field_map) {
        load_indices = zmalloc(ds->used_field_count * sizeof(int));
        for (int i = 0; i < ds->field_count; i++) {
            if (ds->field_map[i] >= 0) {
                load_indices[load_count++] = i;
            }
        }
    }
    while (getline(&line, &len, fp) != -1 && !shouldStopLoading(ds)) {
        if (line[0] == '\0' || line[0] == '\n') continue; /* skip blank lines */
        size_t line_len = strlen(line);
        /* Trim "\n" / "\r\n" line endings. */
        if (line_len > 0 && line[line_len - 1] == '\n') line[line_len - 1] = '\0';
        if (line_len > 1 && line[line_len - 2] == '\r') line[line_len - 2] = '\0';
        if (ds->record_count >= capacity) {
            capacity *= 2; /* geometric growth: amortized O(1) appends */
            ds->records = zrealloc(ds->records, sizeof(datasetRecord) * capacity);
        }
        datasetRecord *record = &ds->records[ds->record_count];
        record->fields = zmalloc(sizeof(sds) * ds->used_field_count);
        if (ds->field_map) {
            /* Selective load: store each needed column at its compact slot. */
            for (int j = 0; j < load_count; j++) {
                int orig_idx = load_indices[j];
                int mapped_idx = ds->field_map[orig_idx];
                record->fields[mapped_idx] = getFieldValue(line, orig_idx, ds->delimiter);
            }
        } else {
            /* No map: store every column in header order. */
            for (int i = 0; i < ds->field_count; i++) {
                record->fields[i] = getFieldValue(line, i, ds->delimiter);
            }
        }
        ds->record_count++;
        if (verbose && ds->record_count % 1000 == 0) {
            fprintf(stderr, "\rLoaded %zu documents...", ds->record_count);
            fflush(stderr);
        }
    }
    if (verbose) {
        /* Padding spaces overwrite the longer progress line above. */
        fprintf(stderr, "\rLoaded %zu documents%*s\n", ds->record_count, 20, "");
    }
    if (load_indices) zfree(load_indices);
    free(line);
    fclose(fp);
    return true;
}
/* Resolve a field name (given by pointer + length, not NUL-terminated)
 * to its storage index: the compact mapped index when a field map is in
 * use, the header column index otherwise. Returns -1 when unknown. */
static int findFieldIndex(dataset *ds, const char *field_name, size_t field_name_len) {
    for (int k = 0; k < ds->field_count; k++) {
        sds candidate = ds->field_names[k];
        if (strlen(candidate) != field_name_len) continue;
        if (memcmp(candidate, field_name, field_name_len) == 0) {
            return ds->field_map != NULL ? ds->field_map[k] : k;
        }
    }
    return -1;
}
/* Fetch the stored value of field_idx for the given record. field_idx
 * must already be a valid storage index (see findFieldIndex). */
static const char *extractDatasetFieldValue(dataset *ds, int field_idx, int record_index) {
    datasetRecord *rec = &ds->records[record_index];
    return rec->fields[field_idx];
}
/* Replace one fixed-length placeholder occurrence inside an sds buffer.
 *
 * pos must point INTO processed_arg, at the first byte of a placeholder
 * that is exactly PLACEHOLDER_LEN bytes long. Builds a new sds of the
 * final size in one allocation (prefix + replacement + suffix), frees
 * the input, and returns the new buffer. Binary-safe: uses sdslen and
 * memcpy rather than string functions on the buffer. */
static sds replaceOccurrence(sds processed_arg, const char *pos, const char *replacement) {
    size_t offset = pos - processed_arg; /* bytes before the placeholder */
    size_t replacement_len = strlen(replacement);
    /* total = prefix + replacement + everything after the placeholder */
    size_t total_len = offset + replacement_len + (sdslen(processed_arg) - offset - PLACEHOLDER_LEN);
    sds result = sdsnewlen(NULL, total_len);
    char *p = result;
    memcpy(p, processed_arg, offset);
    p += offset;
    memcpy(p, replacement, replacement_len);
    p += replacement_len;
    const char *after_start = pos + PLACEHOLDER_LEN;
    size_t after_len = sdslen(processed_arg) - offset - PLACEHOLDER_LEN;
    memcpy(p, after_start, after_len);
    sdsfree(processed_arg);
    return result;
}
/* Replace every __field:name__ placeholder in arg with the record's
 * stored value for that field.
 *
 * Takes ownership of arg and returns it (possibly reallocated); the
 * caller must sdsfree the result. Unterminated placeholders and names
 * that fail to resolve (the template was validated up front by
 * datasetBuildFieldMap, so the latter should not happen) stop the loop
 * and leave the remaining text untouched.
 *
 * NOTE(review): the scan restarts from the beginning of arg after each
 * substitution, so a field VALUE that itself contains a placeholder
 * would be expanded again — same behavior as the original code. */
static sds processFieldsInArg(dataset *ds, sds arg, int record_index) {
    const char *field_pos;
    /* Single strstr per iteration; the previous version scanned the
     * argument twice per loop (once in the condition, once in the body). */
    while ((field_pos = strstr(arg, FIELD_PREFIX)) != NULL) {
        const char *field_start = field_pos + FIELD_PREFIX_LEN;
        const char *field_end = strstr(field_start, FIELD_SUFFIX);
        if (!field_end) break; /* unterminated placeholder: leave as-is */
        size_t field_name_len = field_end - field_start;
        int field_idx = findFieldIndex(ds, field_start, field_name_len);
        if (field_idx == -1) break; /* unknown field: leave remainder untouched */
        const char *field_value = extractDatasetFieldValue(ds, field_idx, record_index);
        /* Rebuild as prefix + value + suffix, then swap into arg. */
        size_t before_len = field_pos - arg;
        const char *after_start = field_end + FIELD_SUFFIX_LEN;
        sds result = sdsnewlen(arg, before_len);
        result = sdscat(result, field_value);
        result = sdscat(result, after_start);
        sdsfree(arg);
        arg = result;
    }
    return arg;
}
/* Substitute __rand_int__ / __rand_1st__ .. __rand_9th__ inside an
 * already RESP-encoded command buffer.
 *
 * __rand_int__ (placeholder index 0) receives a fresh key for every
 * occurrence; each numbered placeholder shares one key across all of
 * its occurrences in the command. Keys come from the per-placeholder
 * atomic counters when --sequential is active, otherwise from random(),
 * and are reduced modulo keyspacelen.
 *
 * The replacement is a 12-digit number — the same length as every
 * placeholder (PLACEHOLDER_LEN) — so the RESP bulk-length headers in
 * the encoded buffer remain correct after substitution. */
static sds processRandPlaceholdersForDataSet(sds cmd, _Atomic uint64_t *seq_key, int replace_placeholders, int keyspacelen, int sequential_replacement) {
    if (!replace_placeholders || keyspacelen == 0) return cmd;
    for (int ph = 0; ph < PLACEHOLDER_COUNT; ph++) {
        if (!strstr(cmd, PLACEHOLDERS[ph])) continue; /* cheap pre-check */
        uint64_t shared_key = 0;
        /* Numbered placeholders (ph != 0) share one key per command. */
        int generate_shared_key = (ph != 0);
        if (generate_shared_key) {
            if (sequential_replacement) {
                /* relaxed order: only the counter value matters, not ordering */
                shared_key = atomic_fetch_add_explicit(&seq_key[ph], 1, memory_order_relaxed);
            } else {
                shared_key = (uint64_t)random();
            }
            shared_key %= keyspacelen;
        }
        size_t search_offset = 0;
        char *pos;
        while ((pos = strstr(cmd + search_offset, PLACEHOLDERS[ph])) != NULL) {
            uint64_t key = generate_shared_key ? shared_key : 0;
            if (!generate_shared_key) {
                /* __rand_int__: new key for every single occurrence. */
                if (sequential_replacement) {
                    key = atomic_fetch_add_explicit(&seq_key[ph], 1, memory_order_relaxed);
                } else {
                    key = (uint64_t)random();
                }
                key %= keyspacelen;
            }
            char key_str[24];
            snprintf(key_str, sizeof(key_str), "%012llu", (unsigned long long)key);
            size_t offset = pos - cmd;
            /* replaceOccurrence frees cmd and returns a new buffer. */
            cmd = replaceOccurrence(cmd, pos, key_str);
            /* Resume after the replaced text (same length as placeholder). */
            search_offset = offset + PLACEHOLDER_LEN;
        }
    }
    return cmd;
}
+64
View File
@@ -0,0 +1,64 @@
/* Dataset support for valkey-benchmark
*
* Copyright Valkey Contributors.
* All rights reserved.
* SPDX-License-Identifier: BSD 3-Clause
*/
#ifndef VALKEY_BENCHMARK_DATASET_H
#define VALKEY_BENCHMARK_DATASET_H
#include "sds.h"
#include <stdbool.h>
#include <stddef.h>
#ifndef __cplusplus
#include <stdatomic.h>
#endif
/* Dataset constants */
/* NOTE(review): MAX_DATASET_FIELDS and MAX_FIELD_NAME_LEN appear to be
 * declared limits only — no enforcement is visible in the dataset code;
 * confirm before relying on them. */
#define MAX_DATASET_FIELDS 1000
#define MAX_FIELD_NAME_LEN 512
/* Placeholder syntax is __field:NAME__ — prefix, field name, suffix. */
#define FIELD_PREFIX "__field:"
#define FIELD_PREFIX_LEN 8 /* strlen(FIELD_PREFIX) */
#define FIELD_SUFFIX "__"
#define FIELD_SUFFIX_LEN 2 /* strlen(FIELD_SUFFIX) */
/* Dataset format types */
typedef enum datasetFormat {
    DATASET_FORMAT_CSV = 0, /* comma-separated values */
    DATASET_FORMAT_TSV      /* tab-separated values */
} datasetFormat;
/* Dataset structures */
typedef struct datasetRecord {
    sds *fields; /* used_field_count values, indexed by storage index */
} datasetRecord;
typedef struct dataset {
    datasetFormat format;   /* CSV or TSV, detected from the extension */
    char delimiter;         /* ',' for CSV, '\t' for TSV */
    sds *field_names;       /* header column names (field_count entries) */
    int field_count;        /* number of columns in the header row */
    int *field_map;         /* header index -> storage index, -1 = unused; NULL when all columns are stored */
    int used_field_count;   /* number of columns actually stored per record */
    datasetRecord *records; /* loaded data rows */
    size_t record_count;    /* number of loaded rows */
    const char *filename;   /* dataset path (borrowed, not freed here) */
    int max_documents;      /* --maxdocs limit; <= 0 means unlimited */
} dataset;
/* Initialize dataset from file - returns NULL on error */
dataset *datasetInit(const char *filename, int max_documents, int has_field_placeholders, sds *template_argv, int template_argc, int verbose);
/* Free dataset and all memory */
void datasetFree(dataset *ds);
/* Get number of records */
size_t datasetGetRecordCount(dataset *ds);
#ifndef __cplusplus
/* Generate complete command for given record index (caller must sdsfree) */
sds datasetGenerateCommand(dataset *ds, int record_index, sds *template_argv, int template_argc, _Atomic uint64_t *seq_key, int replace_placeholders, int keyspacelen, int sequential_replacement);
#endif
#endif /* VALKEY_BENCHMARK_DATASET_H */
+185 -24
View File
@@ -61,6 +61,7 @@
#include "hdr_histogram.h"
#include "cli_common.h"
#include "mt19937-64.h"
#include "valkey-benchmark-dataset.h"
#define UNUSED(V) ((void)V)
#define RANDPTR_INITIAL_SIZE 8
@@ -160,6 +161,14 @@ static struct config {
atomic_uint_fast64_t last_time_ns;
uint64_t time_per_token;
uint64_t time_per_burst;
/* Dataset support */
sds dataset_file;
int max_documents; /* Maximum documents to load from dataset */
dataset *current_dataset; /* Current loaded dataset */
/* Command template for dataset mode */
int template_argc;
sds *template_argv;
int has_field_placeholders;
} config;
/* Locations of the placeholders __rand_int__, __rand_1st__,
@@ -171,6 +180,9 @@ static struct placeholders {
size_t *index_data; /* allocation holding all index data */
} placeholders;
/* Sequence keys for dataset command generation */
static _Atomic uint64_t dataset_seq_key[PLACEHOLDER_COUNT] = {0};
typedef struct _client {
valkeyContext *context;
sds obuf;
@@ -237,6 +249,7 @@ static int fetchClusterSlotsConfiguration(client c);
static void updateClusterSlotsConfiguration(void);
static long long showThroughput(struct aeEventLoop *eventLoop, long long id, void *clientData);
int runFuzzerClients(const char *host, int port, int max_commands, int parallel_clients, int cluster_mode, int num_keys, cliSSLconfig *ssl_config, const char *log_level, int fuzz_flags);
static int parseCommandTemplate(int argc, char **argv);
/* Dict callbacks */
static uint64_t dictSdsHash(const void *key);
@@ -567,6 +580,29 @@ static void resetClient(client c) {
c->pending = config.pipeline * c->seqlen;
}
/* Scan buffer for {tag} placeholders and store positions.
 *
 * Records a pointer to every "{tag}" occurrence in c->stagptr so that
 * setClusterKeyHashTag can later overwrite them in place. No-op outside
 * cluster mode. Reuses (and grows on demand) the client's existing
 * stagptr allocation. */
static void scanClusterTags(client c, char *buffer_start) {
    if (!config.cluster_mode) return;
    /* Preserve the total capacity across scans so we don't accidentally
     * shrink the allocation when staglen is reset to zero. */
    size_t total_cap = c->staglen + c->stagfree;
    c->staglen = 0;
    c->stagfree = total_cap;
    char *p = buffer_start;
    while ((p = strstr(p, "{tag}")) != NULL) {
        if (c->stagfree == 0) {
            /* Grow geometrically; first allocation uses the default size. */
            size_t new_size = total_cap ? total_cap * 2 : RANDPTR_INITIAL_SIZE;
            c->stagptr = zrealloc(c->stagptr, new_size * sizeof(char *));
            total_cap = new_size;
            c->stagfree = new_size - c->staglen;
        }
        c->stagptr[c->staglen++] = p;
        c->stagfree--;
        p += 5; /* strlen("{tag}") */
    }
}
static void setClusterKeyHashTag(client c) {
assert(c->thread_id >= 0);
clusterNode *node = c->cluster_node;
@@ -864,8 +900,33 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
return;
}
/* Really initialize: replace keys and set start time. */
if (config.replace_placeholders) replacePlaceholders(c->obuf + c->prefixlen, config.pipeline);
/* Dataset field access mode - completely independent command generation */
if (config.has_field_placeholders && config.current_dataset && config.current_dataset->record_count > 0) {
static _Atomic uint64_t record_counter = 0;
/* Generate complete pipeline commands for dataset placeholders */
sdssetlen(c->obuf, c->prefixlen);
for (int p = 0; p < config.pipeline; p++) {
uint64_t record_index = atomic_fetch_add_explicit(&record_counter, 1, memory_order_relaxed) % config.current_dataset->record_count;
sds complete_cmd = datasetGenerateCommand(config.current_dataset, record_index,
config.template_argv, config.template_argc,
dataset_seq_key, config.replace_placeholders,
config.keyspacelen, config.sequential_replacement);
c->obuf = sdscatlen(c->obuf, complete_cmd, sdslen(complete_cmd));
sdsfree(complete_cmd);
}
/* Scan generated commands for {tag} in cluster mode */
if (config.cluster_mode && c->stagptr) {
scanClusterTags(c, c->obuf + c->prefixlen);
}
} else {
/* Standard mode */
if (config.replace_placeholders) {
replacePlaceholders(c->obuf + c->prefixlen, config.pipeline);
}
}
if (config.cluster_mode && c->staglen > 0) setClusterKeyHashTag(c);
c->slots_last_update = atomic_load_explicit(&config.slots_last_update, memory_order_relaxed);
c->start = ustime();
@@ -1048,20 +1109,9 @@ static client createClient(char *cmd, int len, int seqlen, client from, int thre
c->stagptr[j] += c->prefixlen - from->prefixlen;
}
} else {
char *p = c->obuf;
c->staglen = 0;
c->stagfree = RANDPTR_INITIAL_SIZE;
c->stagptr = zmalloc(sizeof(char *) * c->stagfree);
while ((p = strstr(p, "{tag}")) != NULL) {
if (c->stagfree == 0) {
c->stagptr = zrealloc(c->stagptr, sizeof(char *) * c->staglen * 2);
c->stagfree += c->staglen;
}
c->stagptr[c->staglen++] = p;
c->stagfree--;
p += 5; /* 5 is strlen("{tag}"). */
}
scanClusterTags(c, c->obuf);
}
}
aeEventLoop *el = NULL;
@@ -1612,6 +1662,55 @@ static void updateClusterSlotsConfiguration(void) {
pthread_mutex_unlock(&config.is_updating_slots_mutex);
}
/* Free the loaded dataset, if any, and clear the config pointer so a
 * second call is a harmless no-op. */
static void cleanupDataset(void) {
    if (config.current_dataset == NULL) return;
    datasetFree(config.current_dataset);
    config.current_dataset = NULL;
}
/* Append one RESP-encoded command — arguments [start, end) of sds_args —
 * to *cmd_seq, repeated `repeat` times, and advance *seq_len by the
 * repeat count. Exits the benchmark on encoding failure. */
static void addRespCommandToSequence(sds *sds_args, size_t *argvlen, int start, int end, int repeat, sds *cmd_seq, int *seq_len) {
    char *cmd = NULL;
    int len = valkeyFormatCommandArgv(&cmd, end - start, (const char **)sds_args + start, argvlen + start);
    /* valkeyFormatCommandArgv returns -1 on failure; without this check
     * a NULL buffer and negative length would reach sdscatlen. */
    if (len == -1) {
        fprintf(stderr, "Failed to encode command sequence (out of memory?)\n");
        exit(1);
    }
    for (int j = 0; j < repeat; j++) {
        *cmd_seq = sdscatlen(*cmd_seq, cmd, len);
    }
    *seq_len += repeat;
    free(cmd);
}
/* Parse and setup command template for dataset field validation.
 *
 * Splits argv into sds arguments, scans them for __field:...__
 * placeholders, and — only when at least one is found — stores a private
 * copy of the argv in config.template_argv/template_argc for the dataset
 * command-generation path (freed at shutdown in main).
 *
 * Returns 1 on success, 0 when argv contains an invalid quoted string. */
static int parseCommandTemplate(int argc, char **argv) {
    sds *sds_args = getSdsArrayFromArgv(argc, argv, 0);
    if (!sds_args) {
        fprintf(stderr, "Invalid quoted string\n");
        return 0;
    }
    /* Detect field placeholders */
    config.has_field_placeholders = 0;
    for (int i = 0; i < argc; i++) {
        if (strstr(sds_args[i], FIELD_PREFIX)) {
            config.has_field_placeholders = 1;
            break;
        }
    }
    if (config.has_field_placeholders) {
        /* Deep-copy: sds_args is freed below, the template must outlive it. */
        config.template_argc = argc;
        config.template_argv = zmalloc(argc * sizeof(sds));
        for (int i = 0; i < argc; i++) {
            config.template_argv[i] = sdsdup(sds_args[i]);
        }
    }
    sdsfreesplitres(sds_args, argc);
    return 1;
}
/* Generate random data for the benchmark. See #7196. */
static void genBenchmarkRandomData(char *data, int count) {
static uint32_t state = 1234;
@@ -1785,6 +1884,13 @@ int parseOptions(int argc, char **argv) {
config.num_functions = atoi(argv[++i]);
} else if (!strcmp(argv[i], "--num-keys-in-fcall")) {
config.num_keys_in_fcall = atoi(argv[++i]);
} else if (!strcmp(argv[i], "--dataset")) {
if (lastarg) goto invalid;
config.dataset_file = sdsnew(argv[++i]);
} else if (!strcmp(argv[i], "--maxdocs")) {
if (lastarg) goto invalid;
config.max_documents = atoi(argv[++i]);
if (config.max_documents <= 0) config.max_documents = -1;
} else if (!strcmp(argv[i], "--help")) {
exit_status = 0;
goto usage;
@@ -1915,6 +2021,8 @@ usage:
"__rand_1st__ Like __rand_int__ but multiple occurrences will have the same\n"
" value. __rand_2nd__ through __rand_9th__ are also available.\n"
" __data__ Replaced with data of the size specified by the -d option.\n"
" __field:name__ Replaced with data from the specified field/column in the\n"
" dataset. Requires --dataset option.\n"
" {tag} Replaced with a tag that routes the command to each node in\n"
" a cluster. Include this in key names when running in cluster\n"
" mode.\n"
@@ -2000,7 +2108,11 @@ usage:
" loaded when running the 'function_load' test. (default 10).\n"
" --num-keys-in-fcall <num>\n"
" Sets the number of keys passed to FCALL command when running\n"
" the 'fcall' test. (default 1)\n",
" the 'fcall' test. (default 1)\n"
" --dataset <file> Path to CSV/TSV dataset file for field placeholder replacement.\n"
" All fields auto-detected with natural content lengths.\n"
" --maxdocs <num> Maximum number of documents to load from dataset file.\n"
" Default: unlimited.\n",
tls_usage,
rdma_usage,
" --mptcp Enable an MPTCP connection.\n"
@@ -2218,12 +2330,54 @@ int main(int argc, char **argv) {
config.num_functions = 10;
config.num_keys_in_fcall = 1;
config.resp3 = 0;
config.dataset_file = NULL;
config.max_documents = -1; /* -1 = unlimited */
config.current_dataset = NULL;
config.template_argc = 0;
config.template_argv = NULL;
config.has_field_placeholders = 0;
resetPlaceholders();
i = parseOptions(argc, argv);
argc -= i;
argv += i;
/* Setup dataset if specified */
if (config.dataset_file) {
if (argc == 0) {
fprintf(stderr, "Error: Dataset mode requires a command with field placeholders\n");
fprintf(stderr, "Example: SET doc:__rand_int__ \"__field:content__\"\n");
exit(1);
}
/* Parse command template and setup field placeholder detection */
if (!parseCommandTemplate(argc, argv)) {
exit(1);
}
/* Dataset mode requires at least one field placeholder in the command
* template. Without it, the dataset would be initialized and reported
* but never used for command generation. */
if (!config.has_field_placeholders) {
fprintf(stderr,
"Error: Dataset mode requires at least one field placeholder\n");
fprintf(stderr,
"Example: SET doc:__rand_int__ \"__field:content__\"\n");
exit(1);
}
/* Initialize dataset - single call does everything atomically */
int verbose = !config.csv && !config.quiet;
config.current_dataset = datasetInit(config.dataset_file,
config.max_documents,
config.has_field_placeholders,
config.template_argv, config.template_argc,
verbose);
if (!config.current_dataset) {
fprintf(stderr, "Failed to initialize dataset\n");
exit(1);
}
}
/* Set default for requests if not specified */
if (config.requests < 0) config.requests = 100000;
@@ -2382,18 +2536,15 @@ int main(int argc, char **argv) {
} else if (i == argc || strcmp(";", sds_args[i]) == 0) {
cmd = NULL;
if (i == start) continue;
/* End of command. RESP-encode and append to sequence. */
len = valkeyFormatCommandArgv(&cmd, i - start,
(const char **)sds_args + start,
argvlen + start);
for (int j = 0; j < repeat; j++) {
cmd_seq = sdscatlen(cmd_seq, cmd, len);
}
seq_len += repeat;
free(cmd);
addRespCommandToSequence(sds_args, argvlen, start, i, repeat, &cmd_seq, &seq_len);
start = i + 1;
repeat = 1;
} else if (strstr(sds_args[i], "__data__")) {
if (config.current_dataset) {
fprintf(stderr, "Error: __data__ placeholders cannot be used with --dataset option\n");
exit(1);
}
/* Replace data placeholders with data of length given by -d. */
int num_parts;
sds *parts = sdssplitlen(sds_args[i], sdslen(sds_args[i]),
@@ -2412,6 +2563,7 @@ int main(int argc, char **argv) {
sds_args[i] = newarg;
argvlen[i] = sdslen(sds_args[i]);
}
/* NOTE: Field placeholder processing is handled above in the command-level loop to ensure row consistency */
}
len = sdslen(cmd_seq);
/* adjust the datasize to the parsed command */
@@ -2632,6 +2784,15 @@ int main(int argc, char **argv) {
freeCliConnInfo(config.conn_info);
if (config.server_config != NULL) freeServerConfig(config.server_config);
resetPlaceholders();
cleanupDataset();
/* Clean up command template */
if (config.template_argv) {
for (int i = 0; i < config.template_argc; i++) {
sdsfree(config.template_argv[i]);
}
zfree(config.template_argv);
}
return 0;
}
+274 -1
View File
@@ -9,7 +9,7 @@ proc cmdstat {cmd} {
proc common_bench_setup {cmd} {
r config resetstat
r flushall
if {[catch { exec {*}$cmd } output]} {
if {[catch { exec {*}$cmd 2>@1 } output]} {
set first_line [lindex [split $output "\n"] 0]
puts [colorstr red "valkey-benchmark non zero code, the output is: $output"]
fail "valkey-benchmark non zero code. first line: $first_line"
@@ -199,6 +199,222 @@ tags {"benchmark network external:skip logreqres:skip"} {
assert {$different_count > 0}
}
# Quoted-field handling: a delimiter embedded in a quoted value must not
# split the field (RFC-4180-style quoting).
test {benchmark: dataset CSV with quoted field values containing commas} {
    # A value with an embedded comma must be quoted and parse correctly
    set csv_data "id,title\n1,\"Book, Part 1\"\n"
    set csv_file [tmpfile "quoted.csv"]
    set fd [open $csv_file w]
    puts $fd $csv_data
    close $fd
    set cmd [valkeybenchmark $master_host $master_port \
        "--dataset $csv_file -n 2 -r 10 -- SET doc:__rand_int__ \"__field:title__\""]
    common_bench_setup $cmd
    # Verify the quoted comma-containing value was stored intact
    set keys [r keys "doc:*"]
    assert {[llength $keys] > 0}
    foreach key $keys {
        assert_equal "Book, Part 1" [r get $key]
    }
    file delete $csv_file
}
# Happy path: __field:...__ placeholders pull whole-row values into HSET.
test {benchmark: dataset CSV with field placeholders} {
    # Create test CSV dataset
    set csv_data "title,content,author\nTest Title 1,Test Content 1,Author 1\nTest Title 2,Test Content 2,Author 2"
    set csv_file [tmpfile "dataset.csv"]
    set fd [open $csv_file w]
    puts $fd $csv_data
    close $fd
    set cmd [valkeybenchmark $master_host $master_port "--dataset $csv_file -n 4 -r 10 -- HSET doc:__rand_int__ title \"__field:title__\" content \"__field:content__\""]
    common_bench_setup $cmd
    assert_match {*calls=4,*} [cmdstat hset]
    # Verify field data was inserted correctly
    set keys [r keys "doc:*"]
    assert {[llength $keys] > 0}
    set sample_key [lindex $keys 0]
    set title [r hget $sample_key title]
    set content [r hget $sample_key content]
    assert {$title eq "Test Title 1" || $title eq "Test Title 2"}
    assert {$content eq "Test Content 1" || $content eq "Test Content 2"}
    file delete $csv_file
}
# --maxdocs must cap loading: records beyond the limit never appear.
test {benchmark: dataset with maxdocs limit} {
    # Create test dataset with multiple rows
    set csv_data "name,value\nitem1,value1\nitem2,value2\nitem3,value3\nitem4,value4"
    set csv_file [tmpfile "dataset.csv"]
    set fd [open $csv_file w]
    puts $fd $csv_data
    close $fd
    set cmd [valkeybenchmark $master_host $master_port "--dataset $csv_file --maxdocs 2 -n 4 -r 10 -- SET item:__rand_int__ \"__field:value__\""]
    common_bench_setup $cmd
    assert_match {*calls=4,*} [cmdstat set]
    # Should only use first 2 documents due to maxdocs limit
    set keys [r keys "item:*"]
    assert {[llength $keys] > 0}
    # Verify ALL keys only contain values from first 2 documents
    set unique_values {}
    foreach key $keys {
        set value [r get $key]
        assert {$value eq "value1" || $value eq "value2"}
        if {[lsearch $unique_values $value] == -1} {
            lappend unique_values $value
        }
    }
    file delete $csv_file
}
# Validation: an unknown field name in a placeholder must abort with a
# diagnostic rather than run with garbage.
test {benchmark: dataset error handling - invalid field} {
    set csv_data "name,value\nitem1,value1"
    set csv_file [tmpfile "dataset.csv"]
    set fd [open $csv_file w]
    puts $fd $csv_data
    close $fd
    set cmd [valkeybenchmark $master_host $master_port "--dataset $csv_file -n 1 -- SET item:__rand_int__ \"__field:invalid_field__\""]
    # Should fail with invalid field error
    if {[catch { exec {*}$cmd } error]} {
        assert_match "*not found in dataset fields*" $error
    } else {
        fail "Expected error for invalid field placeholder"
    }
    file delete $csv_file
}
# TSV support: '\t' delimiter detected from the .tsv extension.
test {benchmark: dataset TSV with field placeholders} {
    # Create test TSV dataset (tab-separated values)
    set tsv_data "name\tvalue\tcount\nitem1\tvalue1\t100\nitem2\tvalue2\t200"
    set tsv_file [tmpfile "dataset.tsv"]
    set fd [open $tsv_file w]
    puts $fd $tsv_data
    close $fd
    set cmd [valkeybenchmark $master_host $master_port "--dataset $tsv_file -n 4 -r 10 -- HSET tsv_doc:__rand_int__ name \"__field:name__\" value \"__field:value__\" count __field:count__"]
    common_bench_setup $cmd
    assert_match {*calls=4,*} [cmdstat hset]
    # Verify TSV field data was inserted correctly
    set keys [r keys "tsv_doc:*"]
    assert {[llength $keys] > 0}
    set sample_key [lindex $keys 0]
    set name [r hget $sample_key name]
    set value [r hget $sample_key value]
    set count [r hget $sample_key count]
    assert {$name eq "item1" || $name eq "item2"}
    assert {$value eq "value1" || $value eq "value2"}
    assert {$count eq "100" || $count eq "200"}
    file delete $tsv_file
}
# A --maxdocs larger than the file must not fail; the loader just takes
# everything available and the benchmark cycles through it.
test {benchmark: dataset with maxdocs larger than available documents} {
    # Create test dataset with only 2 rows but request maxdocs=5
    set csv_data "name,value\nitem1,value1\nitem2,value2"
    set csv_file [tmpfile "dataset.csv"]
    set fd [open $csv_file w]
    puts $fd $csv_data
    close $fd
    set cmd [valkeybenchmark $master_host $master_port "--dataset $csv_file --maxdocs 5 -n 4 -r 10 -- SET item:__rand_int__ \"__field:value__\""]
    common_bench_setup $cmd
    assert_match {*calls=4,*} [cmdstat set]
    # Should gracefully use all available documents (2), cycling through them
    set keys [r keys "item:*"]
    assert {[llength $keys] > 0}
    # All values should still be only from available documents
    foreach key $keys {
        set value [r get $key]
        assert {$value eq "value1" || $value eq "value2"}
    }
    file delete $csv_file
}
# Mixed mode: __field:...__ and __rand_*__ placeholders must coexist in
# one command (rand values are always 12-digit zero-padded numbers).
test {benchmark: mixed placeholders - dataset fields and rand placeholders} {
    # Test combining __field:name__ with __rand_int__ placeholders
    set csv_data "category,description\nuser,User Management\norder,Order Processing"
    set csv_file [tmpfile "dataset.csv"]
    set fd [open $csv_file w]
    puts $fd $csv_data
    close $fd
    set cmd [valkeybenchmark $master_host $master_port "--dataset $csv_file -n 6 -r 100 -- HSET mixed:__rand_int__ category \"__field:category__\" desc \"__field:description__\" score __rand_1st__"]
    common_bench_setup $cmd
    assert_match {*calls=6,*} [cmdstat hset]
    # Verify both field and random placeholders work together
    set keys [r keys "mixed:*"]
    assert {[llength $keys] > 0}
    set sample_key [lindex $keys 0]
    set category [r hget $sample_key category]
    set desc [r hget $sample_key desc]
    set score [r hget $sample_key score]
    # Field placeholders should contain dataset values
    assert {$category eq "user" || $category eq "order"}
    assert {$desc eq "User Management" || $desc eq "Order Processing"}
    # Random placeholder should be a 12-digit number
    assert {[string length $score] == 12}
    assert {[string is digit $score]}
    file delete $csv_file
}
# --dataset without any __field:...__ placeholder is a usage error.
test {benchmark: dataset mode requires field placeholders} {
    set csv_data "name,value\nitem1,value1\nitem2,value2"
    set csv_file [tmpfile "dataset.csv"]
    set fd [open $csv_file w]
    puts $fd $csv_data
    close $fd
    # Dataset mode should require field placeholders in the command
    set cmd [valkeybenchmark $master_host $master_port "--dataset $csv_file -n 10 -r 10 -t set"]
    # Should fail with error about missing field placeholders
    if {[catch { exec {*}$cmd } error]} {
        assert_match "*Dataset mode requires a command with field placeholders*" $error
    } else {
        fail "Expected error for dataset mode without field placeholders"
    }
    file delete $csv_file
}
# Diagnostics go to stderr, so --csv output on stdout stays machine-parsable.
test {benchmark: dataset with --csv produces clean CSV only} {
    # Create test CSV dataset
    set csv_data "name,value\ntest1,val1\ntest2,val2"
    set csv_file [tmpfile "csv_clean.csv"]
    set fd [open $csv_file w]
    puts $fd $csv_data
    close $fd
    set cmd [valkeybenchmark $master_host $master_port "--dataset $csv_file --csv -n 4 -r 10 -- SET item:__rand_int__ \"__field:value__\""]
    set output [common_bench_setup $cmd]
    # Should NOT contain dataset diagnostic messages (those go to stderr)
    assert_no_match "*Dataset:*documents*" $output
    assert_no_match "*Loading*dataset*" $output
    # Should contain valid CSV format
    assert_match "*\"*\",\"*\",\"*\",\"*\",\"*\",\"*\",\"*\",\"*\"*" $output
    file delete $csv_file
}
test {benchmark: sequential zadd results in expected number of keys} {
set cmd [valkeybenchmark $master_host $master_port "-r 50 -n 50 --sequential -t zadd"]
common_bench_setup $cmd
@@ -360,3 +576,60 @@ tags {"benchmark network external:skip logreqres:skip"} {
}
}
}
# Cluster mode tests
tags {"benchmark network external:skip cluster"} {
    start_cluster 3 0 {tags {"external:skip"}} {
        # Dataset mode must interoperate with cluster {tag} routing:
        # scanClusterTags rescans the regenerated buffer, so the literal
        # "{tag}" must be replaced with a per-node hash tag.
        test "Cluster benchmark: dataset with {tag} validates replacement" {
            # Create test CSV dataset
            set csv_data "id,name\n1,Item1\n2,Item2"
            set csv_file [tmpfile "cluster_ds.csv"]
            set fd [open $csv_file w]
            puts $fd $csv_data
            close $fd
            # Get cluster node
            set host [srv 0 host]
            set port [srv 0 port]
            # Run with --cluster, {tag}, and dataset
            set cmd [valkeybenchmark $host $port "--cluster \
                --dataset $csv_file -n 2 -r 100 --sequential \
                HSET doc:\{tag\}:__rand_int__ name __field:name__"]
            if {[catch { exec {*}$cmd 2>@1 } output]} {
                if {![string match "*HSET*" $output]} {
                    fail "Benchmark failed: $output"
                }
            }
            # Collect all keys from all nodes
            set all_keys {}
            for {set id 0} {$id < 3} {incr id} {
                set keys [R $id keys "doc:*"]
                foreach key $keys {
                    lappend all_keys [list $id $key]
                }
            }
            # Should have exactly 2 keys total
            assert {[llength $all_keys] == 2}
            # Verify each key
            foreach key_info $all_keys {
                lassign $key_info node_id key
                # {tag} replaced (not literal)
                assert {![string match "*\{tag\}*" $key]}
                # Has cluster hash tag format
                assert {[string match "doc:\{*\}:*" $key]}
                # __rand_int__ replaced with 12 digits
                assert {[regexp {:\d{12}$} $key]}
                # Dataset field inserted
                set name [R $node_id hget $key name]
                assert {$name in {Item1 Item2}}
            }
            file delete $csv_file
        }
    }
}