valkey/cmake/Modules/SourceFiles.cmake
Akash Kumar a3a839972f Redesign IO threading communication model (#3324)
#### Summary
This PR redesigns the IO threading communication model, replacing the
inefficient client-list polling approach with a high-performance,
lock-free queue architecture. This change improves throughput by
**8–17%** across various workloads and lays the groundwork for
offloading command execution to IO threads in following PRs.



### Performance Comparison: Unstable vs New IO Queues

| Type | Operation | Unstable Branch (M TPS) | New IO Queues (M TPS) | Difference (%) |
| :--- | :--- | :--- | :--- | :--- |
| **CME**<sup>1</sup> | SET | 1.02 | 1.19 | **+16.67%** |
| **CME** | GET | 1.30 | 1.47 | **+13.08%** |
| **CMD**<sup>2</sup> | SET | 1.15 | 1.35 | **+17.39%** |
| **CMD** | GET | 1.52 | 1.64 | **+7.89%** |

<sup>1</sup> Amazon terminology for cluster mode
<sup>2</sup> Amazon terminology for standalone mode, i.e. config
`cluster-enabled no`

- Test Configuration: 8 IO threads • 400 clients • 512-byte values • 3M
keys

#### Motivation
The previous IO model had several limitations that created performance
bottlenecks:
* **Inefficient Polling:** The main thread lacks a direct notification
mechanism for completed work. Instead, it must constantly iterate
through a list of all pending clients to check their state, wasting
significant CPU cycles.
* **Manual Load Balancing:** Jobs are assigned to specific threads
upfront. This requires the main thread to predict which thread to use,
often leaving some threads idle while others are overloaded.
* **Static Scaling:** Thread activation relies on a fixed heuristic
(e.g., 1 thread per 2 events). This approach fails to adapt to varying
workloads, such as TLS connections or differing read/write sizes.

### The Solution
To address these inefficiencies, this PR replaces the current model,
which relies solely on per-thread SPSC queues, with three specialized
queues that handle communication and load balancing more effectively.

#### 1. Main > IO: Shared Queue (Single Producer Multi Consumer)
Single queue from the main-thread to IO threads.
* **Automatic Load Balancing:** All threads pull from the same source.
Busy threads take less work, and idle threads take more, so we don't
need to manually select a thread.
* **Adaptive Scaling:** We now use the queue depth to decide when to add
or remove threads. If the queue is full, we scale up; if it's empty, we
scale down.
* *Ignition:* To get things started before the queue fills up, we
monitor the main thread's CPU. If usage goes over 30%, we wake up the
first IO thread.
* **Implementation:** To prevent contention among consumers, each item
in the ring buffer is padded to reside in its own cache line. Sequence
numbers are utilized to indicate whether a cell is empty or populated,
allowing threads to safely claim work.
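
The sequence-number scheme described above can be sketched as a bounded ring with one producer and several consumers. This is only an illustration under assumptions: the names `sharedRing`, `ringCell`, `ringEnqueue` and `ringDequeue`, the 64-byte cache line, and the ring size are hypothetical and are not taken from the PR.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define CACHE_LINE 64 /* assumed cache-line size */
#define RING_SIZE 8   /* hypothetical capacity, must be a power of two */

/* Each cell is aligned (and therefore padded) to its own cache line, so
 * consumers claiming neighbouring cells do not false-share. */
typedef struct {
    _Alignas(CACHE_LINE) atomic_size_t seq; /* == pos: empty, == pos + 1: populated */
    void *job;
} ringCell;

typedef struct {
    ringCell cells[RING_SIZE];
    _Alignas(CACHE_LINE) size_t tail;        /* touched only by the single producer */
    _Alignas(CACHE_LINE) atomic_size_t head; /* shared by all consumers */
} sharedRing;

static void ringInit(sharedRing *q) {
    for (size_t i = 0; i < RING_SIZE; i++) {
        atomic_init(&q->cells[i].seq, i);
        q->cells[i].job = NULL;
    }
    q->tail = 0;
    atomic_init(&q->head, 0);
}

/* Producer side (one thread only). Returns false when the ring is full. */
static bool ringEnqueue(sharedRing *q, void *job) {
    ringCell *cell = &q->cells[q->tail & (RING_SIZE - 1)];
    /* The cell is free only once its sequence number has caught up with tail. */
    if (atomic_load_explicit(&cell->seq, memory_order_acquire) != q->tail) return false;
    cell->job = job;
    atomic_store_explicit(&cell->seq, q->tail + 1, memory_order_release);
    q->tail++;
    return true;
}

/* Consumer side (any thread). Returns NULL when no job is available. */
static void *ringDequeue(sharedRing *q) {
    size_t pos = atomic_load_explicit(&q->head, memory_order_relaxed);
    for (;;) {
        ringCell *cell = &q->cells[pos & (RING_SIZE - 1)];
        size_t seq = atomic_load_explicit(&cell->seq, memory_order_acquire);
        ptrdiff_t diff = (ptrdiff_t)(seq - (pos + 1));
        if (diff == 0) {
            /* Populated: try to claim it by advancing head. On failure the
             * CAS reloads pos and the loop retries. */
            if (atomic_compare_exchange_weak_explicit(&q->head, &pos, pos + 1,
                                                      memory_order_relaxed,
                                                      memory_order_relaxed)) {
                void *job = cell->job;
                /* Mark the cell empty for the producer's next lap. */
                atomic_store_explicit(&cell->seq, pos + RING_SIZE, memory_order_release);
                return job;
            }
        } else if (diff < 0) {
            return NULL; /* cell not produced yet: the queue is empty */
        } else {
            /* Another consumer already took this cell: reload head. */
            pos = atomic_load_explicit(&q->head, memory_order_relaxed);
        }
    }
}
```

In this sketch a failed `ringEnqueue()` is the "queue is full" signal that a scale-up decision could use, and a `NULL` from `ringDequeue()` is the matching "queue is empty" signal.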

#### 2. IO > Main: The Response Channel (MPSC Queue)
We replaced the old polling loop with a response queue.
* **Faster Completion:** IO threads push completed jobs into this
queue. The main thread detects new data simply by checking if the queue
is not empty, removing the need to scan pending clients.
* **Contention Management:** To avoid lock contention, each thread
reserves a slot by atomically incrementing the tail index. In the rare
event that the queue is full, pending jobs are buffered in a local
temporary list until space becomes available.
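
A minimal sketch of such a response channel follows. The source does not say whether the tail is advanced with a fetch-add or a compare-and-swap; this sketch assumes a CAS so that a full queue can be detected before a slot is reserved. All names (`respQueue`, `respTryPush`, `respPop`, `localBacklog`, `respSend`) and sizes are hypothetical, and the fixed-size backlog array stands in for the local temporary list.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define RESP_SIZE 4    /* hypothetical capacity, must be a power of two */
#define BACKLOG_MAX 16 /* hypothetical bound on the per-thread backlog */

typedef struct {
    _Atomic(void *) slots[RESP_SIZE]; /* NULL means "not written yet" */
    atomic_size_t tail;               /* next slot to reserve, shared by producers */
    atomic_size_t head;               /* next slot to read, advanced by the consumer */
} respQueue;

typedef struct {
    void *jobs[BACKLOG_MAX];
    size_t count;
} localBacklog;

static void respInit(respQueue *q) {
    for (size_t i = 0; i < RESP_SIZE; i++) atomic_init(&q->slots[i], NULL);
    atomic_init(&q->tail, 0);
    atomic_init(&q->head, 0);
}

/* Producer side (any IO thread). Returns false when the queue is full. */
static bool respTryPush(respQueue *q, void *job) {
    size_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed);
    for (;;) {
        size_t head = atomic_load_explicit(&q->head, memory_order_acquire);
        if ((ptrdiff_t)(tail - head) >= RESP_SIZE) return false;
        /* Reserve slot `tail` by atomically incrementing the shared index. */
        if (atomic_compare_exchange_weak_explicit(&q->tail, &tail, tail + 1,
                                                  memory_order_relaxed,
                                                  memory_order_relaxed))
            break;
    }
    atomic_store_explicit(&q->slots[tail & (RESP_SIZE - 1)], job, memory_order_release);
    return true;
}

/* Consumer side (main thread only). Returns NULL when nothing is ready. */
static void *respPop(respQueue *q) {
    size_t head = atomic_load_explicit(&q->head, memory_order_relaxed);
    _Atomic(void *) *slot = &q->slots[head & (RESP_SIZE - 1)];
    void *job = atomic_load_explicit(slot, memory_order_acquire);
    if (job == NULL) return NULL; /* empty, or reserved but not yet written */
    atomic_store_explicit(slot, NULL, memory_order_relaxed);
    atomic_store_explicit(&q->head, head + 1, memory_order_release);
    return job;
}

/* Retry buffered jobs in order; returns how many are still buffered. */
static size_t backlogFlush(respQueue *q, localBacklog *b) {
    size_t sent = 0;
    while (sent < b->count && respTryPush(q, b->jobs[sent])) sent++;
    memmove(b->jobs, b->jobs + sent, (b->count - sent) * sizeof(void *));
    b->count -= sent;
    return b->count;
}

/* Send a job, falling back to the thread-local backlog when the queue is full. */
static void respSend(respQueue *q, localBacklog *b, void *job) {
    if (backlogFlush(q, b) == 0 && respTryPush(q, job)) return;
    assert(b->count < BACKLOG_MAX); /* a real list would grow instead */
    b->jobs[b->count++] = job;
}
```

Flushing the backlog before pushing a new job keeps each thread's completions in order in this sketch; whether the PR preserves that ordering is not stated in the description.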

#### 3. Main > IO (Thread-Specific): Private Inbox (SPSC Queue)
We kept the existing Single-Producer Single-Consumer (SPSC) queues for
tasks that must happen on a specific thread (like freeing memory
allocated by that thread). IO threads always check their private inbox
before looking at the shared queue.

### Changes Required
* **Async client release**
The main thread no longer busy-waits for IO threads to finish with a
client. Since the client must be popped from the multi-producer queue
before it can be released, clients with pending IO are now marked for
asynchronous closure.
* **client eviction logic**
Updated evictClients() to account for memory pending release (clients
marked close_asap). freeClient() now returns a status code (1 for freed,
0 for async-close) to ensure the eviction loop does not over-evict by
ignoring memory that is about to be reclaimed.
* **events-per-io-thread config**
Replaced the `events-per-io-thread` configuration with
`io-threads-always-active`, as we no longer track events. Since this
config is used only for tests, no backward compatibility issue arises.
* **packed job instead of handlers**
Jobs are now represented as tagged pointers (using lower 3 bits for job
type) instead of separate `{handler, data}` structs. This reduces memory
overhead and allows jobs to be passed through the queues as single
pointers.
* **head caching in spsc queue**
The SPSC queue now caches the `head` index on the producer side
(`head_cache`) to avoid frequent atomic loads. The producer only
refreshes from the atomic `head` when the cache indicates the queue
might be full, reducing cross-thread cache-line bouncing.
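
The packed job representation mentioned in the list above (job type stored in the lower 3 bits of a pointer) can be illustrated as follows. The type names, the enum values and the helper functions are hypothetical; the only assumption carried over from the description is that job data is at least 8-byte aligned, which leaves the low 3 bits free.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical job types; up to 8 fit in a 3-bit tag. */
typedef enum { JOB_READ = 0, JOB_WRITE = 1, JOB_POLL = 2, JOB_FREE = 3 } jobType;

#define JOB_TAG_BITS 3
#define JOB_TAG_MASK ((uintptr_t)((1u << JOB_TAG_BITS) - 1))

/* A job is one pointer-sized word: data pointer in the high bits, type in the low 3. */
typedef uintptr_t packedJob;

static inline packedJob jobPack(void *data, jobType type) {
    uintptr_t p = (uintptr_t)data;
    assert((p & JOB_TAG_MASK) == 0);         /* data must be 8-byte aligned */
    assert((uintptr_t)type <= JOB_TAG_MASK); /* type must fit in the tag */
    return p | (uintptr_t)type;
}

static inline jobType jobGetType(packedJob job) {
    return (jobType)(job & JOB_TAG_MASK);
}

static inline void *jobGetData(packedJob job) {
    return (void *)(job & ~JOB_TAG_MASK);
}
```

Because the whole job fits in one word, it can be stored in a queue slot directly instead of as a separate `{handler, data}` pair.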

* **deferred commit in SPSC queue**
`spscEnqueue()` supports batching via a `commit` flag. Multiple jobs can
be enqueued with `commit=false`, then flushed with a single
`spscCommit()` call, reducing atomic operations and cache-line bouncing.
* **rollback on fullness check failure**
When `spmcEnqueue()` fails due to a full queue, the client state is
rolled back (e.g., `io_write_state` reset to `CLIENT_IDLE`). This
rollback approach removes the need to call an expensive `isFull` check
before every enqueue: we simply attempt the enqueue and revert if it
fails.
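
The head caching and deferred commit ideas described in the list above can be sketched together in one small SPSC ring. This is an illustration only: `inboxQueue`, `inboxEnqueue`, `inboxCommit` and `inboxDequeue` are hypothetical names, and the actual `spscEnqueue()`/`spscCommit()` signatures are not shown in the description.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define INBOX_SIZE 4 /* hypothetical capacity, must be a power of two */

typedef struct {
    void *jobs[INBOX_SIZE];
    /* Producer-private state: no atomics needed. */
    size_t tail_local; /* next slot to write, including uncommitted jobs */
    size_t head_cache; /* last value of head seen by the producer */
    /* Shared state. */
    atomic_size_t tail; /* published tail, visible to the consumer */
    atomic_size_t head; /* advanced by the consumer */
} inboxQueue;

static void inboxInit(inboxQueue *q) {
    q->tail_local = 0;
    q->head_cache = 0;
    atomic_init(&q->tail, 0);
    atomic_init(&q->head, 0);
}

/* Publish every job enqueued so far with a single release store. */
static void inboxCommit(inboxQueue *q) {
    atomic_store_explicit(&q->tail, q->tail_local, memory_order_release);
}

/* Producer side. With commit=false the job stays invisible to the
 * consumer until inboxCommit() is called. Returns false when full. */
static bool inboxEnqueue(inboxQueue *q, void *job, bool commit) {
    if (q->tail_local - q->head_cache >= INBOX_SIZE) {
        /* The cache says "maybe full": only now read the shared head. */
        q->head_cache = atomic_load_explicit(&q->head, memory_order_acquire);
        if (q->tail_local - q->head_cache >= INBOX_SIZE) return false;
    }
    q->jobs[q->tail_local & (INBOX_SIZE - 1)] = job;
    q->tail_local++;
    if (commit) inboxCommit(q);
    return true;
}

/* Consumer side. Returns NULL when no committed job is available. */
static void *inboxDequeue(inboxQueue *q) {
    size_t head = atomic_load_explicit(&q->head, memory_order_relaxed);
    if (head == atomic_load_explicit(&q->tail, memory_order_acquire)) return NULL;
    void *job = q->jobs[head & (INBOX_SIZE - 1)];
    atomic_store_explicit(&q->head, head + 1, memory_order_release);
    return job;
}
```

In the common case the producer touches only its private fields and the slot it writes, so the shared `head` is read only when the cached value suggests the ring may be full, and the shared `tail` is written once per batch.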

* **epoll offloading via SPSC at high thread counts**
When `active_io_threads_num > 9`, poll jobs are sent to per-thread SPSC
queues (round-robin). Since threads check their private queue first,
this ensures poll jobs are processed promptly without waiting behind
jobs in the shared SPMC queue.
* **avoid offload write before read comes back**
Added a check `if (c->io_read_state == CLIENT_PENDING_IO) return C_OK`
in `trySendWriteToIOThreads()`. In the previous per-thread SPSC
implementation, we could send consecutive read and write jobs for the
same client knowing a single thread would handle them in order. With the
shared SPMC queue, different threads may pick up the jobs, so we must
wait for the read to complete before sending a write to avoid 2 threads
handling the same client.
* **removing pending_read_list_node from client and
clients_pending_io_read/write lists from server**
Removed `pending_read_list_node` from the `client` struct and
`clients_pending_io_read`/`clients_pending_io_write` lists from
`valkeyServer`, as the new MPSC queue eliminates the need for these
tracking structures.
* **added inst metrics for pending io jobs**
Added `instantaneous_io_pending_jobs` metric via `STATS_METRIC_IO_WAIT`
to track average queue depth over time.
* **added stat for current active threads number**
Added `active_io_threads_num` to the INFO stats output for better
visibility.
* **added internal inst metric for main-thread cpu (non apple
compliant)**
Added `STATS_METRIC_MAIN_THREAD_CPU_SYS` to track main thread CPU usage
via `getrusage(RUSAGE_THREAD)`. This powers the "ignition" policy: when
CPU exceeds 30%, the first IO thread is activated. `RUSAGE_THREAD` is
Linux-specific, so macOS falls back to event-count heuristics.
* **added stat for pending read and writes for io**
Added `io_threaded_reads_pending` and `io_threaded_writes_pending` stats
to track how many read/write jobs are currently in-flight to IO threads.
* **added volatile for crashed**
Changed `server.crashed` from `int` to `volatile int` to ensure the
crash flag is visible across threads immediately, allowing IO threads to
detect a crash and stop sending responses back to the main thread to
avoid deadlock on crash.
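
For the main-thread CPU metric mentioned in the list above, a rough sketch of per-thread CPU sampling with `getrusage(RUSAGE_THREAD)` and the 30% ignition threshold could look like this. The helper names are hypothetical. The description does not say whether the metric counts system time only (as the `_SYS` suffix might suggest) or user plus system time; this sketch assumes the sum of both. The `RUSAGE_SELF` fallback is there only so the sketch compiles where `RUSAGE_THREAD` is unavailable and is not the event-count fallback the PR describes.

```c
#define _GNU_SOURCE /* RUSAGE_THREAD is a Linux/glibc extension */
#include <sys/resource.h>
#include <sys/time.h>

#ifdef RUSAGE_THREAD
#define CPU_SAMPLE_WHO RUSAGE_THREAD
#else
#define CPU_SAMPLE_WHO RUSAGE_SELF /* whole-process fallback for this sketch only */
#endif

#define IGNITION_CPU_PERCENT 30.0 /* threshold quoted in the description */

/* CPU time (user + system) consumed so far by the calling thread, in
 * microseconds, or -1 if getrusage() fails. */
static long long threadCpuMicros(void) {
    struct rusage ru;
    if (getrusage(CPU_SAMPLE_WHO, &ru) != 0) return -1;
    long long sec = (long long)ru.ru_utime.tv_sec + (long long)ru.ru_stime.tv_sec;
    long long usec = (long long)ru.ru_utime.tv_usec + (long long)ru.ru_stime.tv_usec;
    return sec * 1000000LL + usec;
}

/* CPU utilisation in percent between two samples taken wall_elapsed_us apart. */
static double cpuPercent(long long cpu_before_us, long long cpu_after_us,
                         long long wall_elapsed_us) {
    if (wall_elapsed_us <= 0) return 0.0;
    return 100.0 * (double)(cpu_after_us - cpu_before_us) / (double)wall_elapsed_us;
}

/* Ignition policy: wake the first IO thread once usage goes above the threshold. */
static int shouldIgnite(double cpu_percent) {
    return cpu_percent > IGNITION_CPU_PERCENT;
}
```

A caller would sample `threadCpuMicros()` from the main thread at two points in time, pass both samples and the elapsed wall-clock time to `cpuPercent()`, and consult `shouldIgnite()` on the result.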

---------

Signed-off-by: Uri Yagelnik <uriy@amazon.com>
Signed-off-by: akash kumar <akumdev@amazon.com>
Co-authored-by: Uri Yagelnik <uriy@amazon.com>
Co-authored-by: Dan Touitou <dan.touitou@gmail.com>
2026-04-13 13:48:20 -07:00


# -------------------------------------------------
# Define the sources to be built
# -------------------------------------------------
# valkey-server source files
set(VALKEY_SERVER_SRCS
${CMAKE_SOURCE_DIR}/src/threads_mngr.c
${CMAKE_SOURCE_DIR}/src/adlist.c
${CMAKE_SOURCE_DIR}/src/vector.c
${CMAKE_SOURCE_DIR}/src/quicklist.c
${CMAKE_SOURCE_DIR}/src/ae.c
${CMAKE_SOURCE_DIR}/src/anet.c
${CMAKE_SOURCE_DIR}/src/hashtable.c
${CMAKE_SOURCE_DIR}/src/kvstore.c
${CMAKE_SOURCE_DIR}/src/sds.c
${CMAKE_SOURCE_DIR}/src/zmalloc.c
${CMAKE_SOURCE_DIR}/src/lzf_c.c
${CMAKE_SOURCE_DIR}/src/lzf_d.c
${CMAKE_SOURCE_DIR}/src/pqsort.c
${CMAKE_SOURCE_DIR}/src/zipmap.c
${CMAKE_SOURCE_DIR}/src/sha1.c
${CMAKE_SOURCE_DIR}/src/ziplist.c
${CMAKE_SOURCE_DIR}/src/release.c
${CMAKE_SOURCE_DIR}/src/memory_prefetch.c
${CMAKE_SOURCE_DIR}/src/io_threads.c
${CMAKE_SOURCE_DIR}/src/networking.c
${CMAKE_SOURCE_DIR}/src/util.c
${CMAKE_SOURCE_DIR}/src/object.c
${CMAKE_SOURCE_DIR}/src/db.c
${CMAKE_SOURCE_DIR}/src/replication.c
${CMAKE_SOURCE_DIR}/src/rdb.c
${CMAKE_SOURCE_DIR}/src/t_string.c
${CMAKE_SOURCE_DIR}/src/t_list.c
${CMAKE_SOURCE_DIR}/src/t_set.c
${CMAKE_SOURCE_DIR}/src/t_zset.c
${CMAKE_SOURCE_DIR}/src/t_hash.c
${CMAKE_SOURCE_DIR}/src/config.c
${CMAKE_SOURCE_DIR}/src/aof.c
${CMAKE_SOURCE_DIR}/src/pubsub.c
${CMAKE_SOURCE_DIR}/src/multi.c
${CMAKE_SOURCE_DIR}/src/debug.c
${CMAKE_SOURCE_DIR}/src/sort.c
${CMAKE_SOURCE_DIR}/src/intset.c
${CMAKE_SOURCE_DIR}/src/syncio.c
${CMAKE_SOURCE_DIR}/src/cluster.c
${CMAKE_SOURCE_DIR}/src/cluster_migrateslots.c
${CMAKE_SOURCE_DIR}/src/cluster_legacy.c
${CMAKE_SOURCE_DIR}/src/cluster_slot_stats.c
${CMAKE_SOURCE_DIR}/src/crc16.c
${CMAKE_SOURCE_DIR}/src/crc16_slottable.c
${CMAKE_SOURCE_DIR}/src/commandlog.c
${CMAKE_SOURCE_DIR}/src/eval.c
${CMAKE_SOURCE_DIR}/src/bio.c
${CMAKE_SOURCE_DIR}/src/rio.c
${CMAKE_SOURCE_DIR}/src/rand.c
${CMAKE_SOURCE_DIR}/src/memtest.c
${CMAKE_SOURCE_DIR}/src/syscheck.c
${CMAKE_SOURCE_DIR}/src/crcspeed.c
${CMAKE_SOURCE_DIR}/src/crccombine.c
${CMAKE_SOURCE_DIR}/src/crc64.c
${CMAKE_SOURCE_DIR}/src/bitops.c
${CMAKE_SOURCE_DIR}/src/sentinel.c
${CMAKE_SOURCE_DIR}/src/notify.c
${CMAKE_SOURCE_DIR}/src/setproctitle.c
${CMAKE_SOURCE_DIR}/src/blocked.c
${CMAKE_SOURCE_DIR}/src/hyperloglog.c
${CMAKE_SOURCE_DIR}/src/latency.c
${CMAKE_SOURCE_DIR}/src/sparkline.c
${CMAKE_SOURCE_DIR}/src/valkey-check-rdb.c
${CMAKE_SOURCE_DIR}/src/valkey-check-aof.c
${CMAKE_SOURCE_DIR}/src/valkey_strtod.c
${CMAKE_SOURCE_DIR}/src/geo.c
${CMAKE_SOURCE_DIR}/src/lazyfree.c
${CMAKE_SOURCE_DIR}/src/module.c
${CMAKE_SOURCE_DIR}/src/lrulfu.c
${CMAKE_SOURCE_DIR}/src/evict.c
${CMAKE_SOURCE_DIR}/src/expire.c
${CMAKE_SOURCE_DIR}/src/geohash.c
${CMAKE_SOURCE_DIR}/src/geohash_helper.c
${CMAKE_SOURCE_DIR}/src/childinfo.c
${CMAKE_SOURCE_DIR}/src/allocator_defrag.c
${CMAKE_SOURCE_DIR}/src/defrag.c
${CMAKE_SOURCE_DIR}/src/siphash.c
${CMAKE_SOURCE_DIR}/src/rax.c
${CMAKE_SOURCE_DIR}/src/t_stream.c
${CMAKE_SOURCE_DIR}/src/listpack.c
${CMAKE_SOURCE_DIR}/src/localtime.c
${CMAKE_SOURCE_DIR}/src/lolwut.c
${CMAKE_SOURCE_DIR}/src/lolwut5.c
${CMAKE_SOURCE_DIR}/src/lolwut6.c
${CMAKE_SOURCE_DIR}/src/lolwut9.c
${CMAKE_SOURCE_DIR}/src/acl.c
${CMAKE_SOURCE_DIR}/src/tracking.c
${CMAKE_SOURCE_DIR}/src/socket.c
${CMAKE_SOURCE_DIR}/src/tls.c
${CMAKE_SOURCE_DIR}/src/rdma.c
${CMAKE_SOURCE_DIR}/src/sha256.c
${CMAKE_SOURCE_DIR}/src/timeout.c
${CMAKE_SOURCE_DIR}/src/setcpuaffinity.c
${CMAKE_SOURCE_DIR}/src/monotonic.c
${CMAKE_SOURCE_DIR}/src/mt19937-64.c
${CMAKE_SOURCE_DIR}/src/resp_parser.c
${CMAKE_SOURCE_DIR}/src/call_reply.c
${CMAKE_SOURCE_DIR}/src/script.c
${CMAKE_SOURCE_DIR}/src/functions.c
${CMAKE_SOURCE_DIR}/src/scripting_engine.c
${CMAKE_SOURCE_DIR}/src/trace/trace.c
${CMAKE_SOURCE_DIR}/src/trace/trace_rdb.c
${CMAKE_SOURCE_DIR}/src/trace/trace_aof.c
${CMAKE_SOURCE_DIR}/src/trace/trace_commands.c
${CMAKE_SOURCE_DIR}/src/trace/trace_db.c
${CMAKE_SOURCE_DIR}/src/trace/trace_cluster.c
${CMAKE_SOURCE_DIR}/src/trace/trace_server.c
${CMAKE_SOURCE_DIR}/src/commands.c
${CMAKE_SOURCE_DIR}/src/strl.c
${CMAKE_SOURCE_DIR}/src/connection.c
${CMAKE_SOURCE_DIR}/src/unix.c
${CMAKE_SOURCE_DIR}/src/server.c
${CMAKE_SOURCE_DIR}/src/logreqres.c
${CMAKE_SOURCE_DIR}/src/entry.c
${CMAKE_SOURCE_DIR}/src/vset.c
${CMAKE_SOURCE_DIR}/src/fifo.c
${CMAKE_SOURCE_DIR}/src/mutexqueue.c
${CMAKE_SOURCE_DIR}/src/queues.c)
# valkey-cli
set(VALKEY_CLI_SRCS
${CMAKE_SOURCE_DIR}/src/anet.c
${CMAKE_SOURCE_DIR}/src/adlist.c
${CMAKE_SOURCE_DIR}/src/hashtable.c
${CMAKE_SOURCE_DIR}/src/sds.c
${CMAKE_SOURCE_DIR}/src/sha256.c
${CMAKE_SOURCE_DIR}/src/util.c
${CMAKE_SOURCE_DIR}/src/valkey-cli.c
${CMAKE_SOURCE_DIR}/src/valkey_strtod.c
${CMAKE_SOURCE_DIR}/src/zmalloc.c
${CMAKE_SOURCE_DIR}/src/release.c
${CMAKE_SOURCE_DIR}/src/ae.c
${CMAKE_SOURCE_DIR}/src/serverassert.c
${CMAKE_SOURCE_DIR}/src/crcspeed.c
${CMAKE_SOURCE_DIR}/src/crccombine.c
${CMAKE_SOURCE_DIR}/src/crc64.c
${CMAKE_SOURCE_DIR}/src/siphash.c
${CMAKE_SOURCE_DIR}/src/crc16.c
${CMAKE_SOURCE_DIR}/src/monotonic.c
${CMAKE_SOURCE_DIR}/src/cli_common.c
${CMAKE_SOURCE_DIR}/src/mt19937-64.c
${CMAKE_SOURCE_DIR}/src/strl.c
${CMAKE_SOURCE_DIR}/src/cli_commands.c)
# valkey-benchmark
set(VALKEY_BENCHMARK_SRCS
${CMAKE_SOURCE_DIR}/src/ae.c
${CMAKE_SOURCE_DIR}/src/anet.c
${CMAKE_SOURCE_DIR}/src/sds.c
${CMAKE_SOURCE_DIR}/src/sha256.c
${CMAKE_SOURCE_DIR}/src/util.c
${CMAKE_SOURCE_DIR}/src/valkey-benchmark.c
${CMAKE_SOURCE_DIR}/src/valkey_strtod.c
${CMAKE_SOURCE_DIR}/src/adlist.c
${CMAKE_SOURCE_DIR}/src/hashtable.c
${CMAKE_SOURCE_DIR}/src/zmalloc.c
${CMAKE_SOURCE_DIR}/src/serverassert.c
${CMAKE_SOURCE_DIR}/src/release.c
${CMAKE_SOURCE_DIR}/src/crcspeed.c
${CMAKE_SOURCE_DIR}/src/crccombine.c
${CMAKE_SOURCE_DIR}/src/crc64.c
${CMAKE_SOURCE_DIR}/src/siphash.c
${CMAKE_SOURCE_DIR}/src/crc16.c
${CMAKE_SOURCE_DIR}/src/crc16_slottable.c
${CMAKE_SOURCE_DIR}/src/monotonic.c
${CMAKE_SOURCE_DIR}/src/cli_common.c
${CMAKE_SOURCE_DIR}/src/mt19937-64.c
${CMAKE_SOURCE_DIR}/src/strl.c
${CMAKE_SOURCE_DIR}/src/fuzzer_client.c
${CMAKE_SOURCE_DIR}/src/fuzzer_command_generator.c)
# valkey-rdma module
set(VALKEY_RDMA_MODULE_SRCS ${CMAKE_SOURCE_DIR}/src/rdma.c)
# valkey-tls module
set(VALKEY_TLS_MODULE_SRCS ${CMAKE_SOURCE_DIR}/src/tls.c)