mirror of
https://github.com/valkey-io/valkey.git
synced 2026-05-06 05:26:42 -04:00
Add cluster bus network traffic usage metric in bytes (#3396)
Adds cluster bus byte metrics to `CLUSTER INFO`, split by admin,
pub/sub, and module traffic. The change tracks sent and received bytes
on the cluster bus and exposes them as `cluster_stats_bytes_{sent,received}`
(total), `cluster_stats_pubsub_bytes_{sent,received}`, and
`cluster_stats_module_bytes_{sent,received}`.
Example:
```
> src/valkey-cli CLUSTER INFO
cluster_state:fail
...
cluster_stats_bytes_sent:46088
cluster_stats_bytes_received:46080
cluster_stats_pubsub_bytes_sent:0
cluster_stats_pubsub_bytes_received:0
cluster_stats_module_bytes_sent:0
cluster_stats_module_bytes_received:0
total_cluster_links_buffer_limit_exceeded:0
```
Fixes: https://github.com/valkey-io/valkey/issues/1929
---------
Signed-off-by: Harkrishn Patro <bunty.hari@gmail.com>
Co-authored-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
This commit is contained in:
@@ -1623,6 +1623,12 @@ void resetClusterStats(void) {
|
||||
|
||||
memset(server.cluster->stats_bus_messages_sent, 0, sizeof(server.cluster->stats_bus_messages_sent));
|
||||
memset(server.cluster->stats_bus_messages_received, 0, sizeof(server.cluster->stats_bus_messages_received));
|
||||
server.cluster->stats_bus_bytes_sent = 0;
|
||||
server.cluster->stats_bus_bytes_received = 0;
|
||||
server.cluster->stats_bus_pubsub_bytes_sent = 0;
|
||||
server.cluster->stats_bus_pubsub_bytes_received = 0;
|
||||
server.cluster->stats_bus_module_bytes_sent = 0;
|
||||
server.cluster->stats_bus_module_bytes_received = 0;
|
||||
server.cluster->stat_cluster_links_buffer_limit_exceeded = 0;
|
||||
}
|
||||
|
||||
|
||||
+32
-2
@@ -3643,12 +3643,30 @@ static inline int messageTypeSupportsLightHdr(uint16_t type) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void clusterBusAddNetworkBytesByType(uint16_t type, uint64_t bytes, bool sent) {
|
||||
sent ? (server.cluster->stats_bus_bytes_sent += bytes)
|
||||
: (server.cluster->stats_bus_bytes_received += bytes);
|
||||
|
||||
if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
|
||||
sent ? (server.cluster->stats_bus_pubsub_bytes_sent += bytes)
|
||||
: (server.cluster->stats_bus_pubsub_bytes_received += bytes);
|
||||
} else if (type == CLUSTERMSG_TYPE_MODULE) {
|
||||
sent ? (server.cluster->stats_bus_module_bytes_sent += bytes)
|
||||
: (server.cluster->stats_bus_module_bytes_received += bytes);
|
||||
}
|
||||
}
|
||||
|
||||
int clusterIsValidPacket(clusterLink *link) {
|
||||
clusterMsgHeader *hdr = (clusterMsgHeader *)link->rcvbuf;
|
||||
uint32_t totlen = ntohl(hdr->totlen);
|
||||
int is_light = IS_LIGHT_MESSAGE(ntohs(hdr->type));
|
||||
uint16_t type = ntohs(hdr->type) & ~CLUSTERMSG_MODIFIER_MASK;
|
||||
|
||||
if (type < CLUSTERMSG_TYPE_COUNT) {
|
||||
server.cluster->stats_bus_messages_received[type]++;
|
||||
clusterBusAddNetworkBytesByType(type, totlen, 0);
|
||||
}
|
||||
|
||||
if (is_light && !messageTypeSupportsLightHdr(type)) {
|
||||
serverLog(LL_NOTICE,
|
||||
"Packet of type '%s' (%u) does not support light cluster header. Marking packet as invalid.",
|
||||
@@ -3656,8 +3674,6 @@ int clusterIsValidPacket(clusterLink *link) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (type < CLUSTERMSG_TYPE_COUNT) server.cluster->stats_bus_messages_received[type]++;
|
||||
|
||||
serverLog(LL_DEBUG, "--- Processing packet of type %s, %lu bytes", clusterGetMessageTypeString(type),
|
||||
(unsigned long)totlen);
|
||||
|
||||
@@ -4477,6 +4493,7 @@ void clusterWriteHandler(connection *conn) {
|
||||
handleLinkIOError(link);
|
||||
return;
|
||||
}
|
||||
clusterBusAddNetworkBytesByType(ntohs(msg->type) & ~CLUSTERMSG_MODIFIER_MASK, nwritten, 1);
|
||||
if (msg_offset + nwritten < msg_len) {
|
||||
/* If full message wasn't written, record the offset
|
||||
* and continue sending from this point next time */
|
||||
@@ -7371,6 +7388,19 @@ sds genClusterInfoString(sds info) {
|
||||
(long long)server.cluster->stats_bus_messages_received[i]);
|
||||
}
|
||||
info = sdscatfmt(info, "cluster_stats_messages_received:%I\r\n", tot_msg_received);
|
||||
info = sdscatfmt(info,
|
||||
"cluster_stats_bytes_sent:%U\r\n"
|
||||
"cluster_stats_bytes_received:%U\r\n"
|
||||
"cluster_stats_pubsub_bytes_sent:%U\r\n"
|
||||
"cluster_stats_pubsub_bytes_received:%U\r\n"
|
||||
"cluster_stats_module_bytes_sent:%U\r\n"
|
||||
"cluster_stats_module_bytes_received:%U\r\n",
|
||||
(unsigned long long)server.cluster->stats_bus_bytes_sent,
|
||||
(unsigned long long)server.cluster->stats_bus_bytes_received,
|
||||
(unsigned long long)server.cluster->stats_bus_pubsub_bytes_sent,
|
||||
(unsigned long long)server.cluster->stats_bus_pubsub_bytes_received,
|
||||
(unsigned long long)server.cluster->stats_bus_module_bytes_sent,
|
||||
(unsigned long long)server.cluster->stats_bus_module_bytes_received);
|
||||
|
||||
info = sdscatfmt(info, "total_cluster_links_buffer_limit_exceeded:%U\r\n",
|
||||
(unsigned long long)server.cluster->stat_cluster_links_buffer_limit_exceeded);
|
||||
|
||||
@@ -471,6 +471,12 @@ struct clusterState {
|
||||
/* Messages received and sent by type. */
|
||||
long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT];
|
||||
long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT];
|
||||
uint64_t stats_bus_bytes_sent;
|
||||
uint64_t stats_bus_bytes_received;
|
||||
uint64_t stats_bus_pubsub_bytes_sent;
|
||||
uint64_t stats_bus_pubsub_bytes_received;
|
||||
uint64_t stats_bus_module_bytes_sent;
|
||||
uint64_t stats_bus_module_bytes_received;
|
||||
long long stats_pfail_nodes; /* Number of nodes in PFAIL status,
|
||||
excluding nodes without address. */
|
||||
unsigned long long stat_cluster_links_buffer_limit_exceeded; /* Total number of cluster links freed due to exceeding
|
||||
|
||||
@@ -43,6 +43,15 @@ test "errorstats: rejected call due to MOVED Redirection" {
|
||||
} ;# start_cluster
|
||||
|
||||
start_cluster 3 0 {tags {external:skip cluster} overrides {cluster-node-timeout 1000}} {
|
||||
test "cluster bus byte stats move on a healthy cluster" {
|
||||
wait_for_condition 1000 50 {
|
||||
[CI 0 cluster_stats_bytes_sent] >= 1 &&
|
||||
[CI 0 cluster_stats_bytes_received] >= 1
|
||||
} else {
|
||||
fail "R 0 cluster bus byte stats are not as expected"
|
||||
}
|
||||
}
|
||||
|
||||
test "fail reason changed" {
|
||||
# Pause one primary, so the cluster fails with not-full-coverage.
|
||||
pause_process [srv 0 pid]
|
||||
@@ -69,6 +78,8 @@ start_cluster 3 0 {tags {external:skip cluster} overrides {cluster-node-timeout
|
||||
wait_for_condition 1000 10 {
|
||||
[CI 0 cluster_stats_messages_sent] >= 1 &&
|
||||
[CI 0 cluster_stats_messages_received] >= 1 &&
|
||||
[CI 0 cluster_stats_bytes_sent] >= 1 &&
|
||||
[CI 0 cluster_stats_bytes_received] >= 1 &&
|
||||
[CI 0 total_cluster_links_buffer_limit_exceeded] >= 1
|
||||
} else {
|
||||
fail "R 0 related info fields are not as expected"
|
||||
@@ -81,6 +92,12 @@ start_cluster 3 0 {tags {external:skip cluster} overrides {cluster-node-timeout
|
||||
|
||||
assert_equal [getInfoProperty $info cluster_stats_messages_sent] 0
|
||||
assert_equal [getInfoProperty $info cluster_stats_messages_received] 0
|
||||
assert_equal [getInfoProperty $info cluster_stats_bytes_sent] 0
|
||||
assert_equal [getInfoProperty $info cluster_stats_bytes_received] 0
|
||||
assert_equal [getInfoProperty $info cluster_stats_pubsub_bytes_sent] 0
|
||||
assert_equal [getInfoProperty $info cluster_stats_pubsub_bytes_received] 0
|
||||
assert_equal [getInfoProperty $info cluster_stats_module_bytes_sent] 0
|
||||
assert_equal [getInfoProperty $info cluster_stats_module_bytes_received] 0
|
||||
assert_equal [getInfoProperty $info total_cluster_links_buffer_limit_exceeded] 0
|
||||
|
||||
R 0 config set cluster-link-sendbuf-limit 0
|
||||
|
||||
@@ -38,13 +38,24 @@ test "Test publishing to slave" {
|
||||
|
||||
start_cluster 3 0 {tags {external:skip cluster}} {
|
||||
test "Test cluster info stats for publish" {
|
||||
R 0 CONFIG RESETSTAT
|
||||
R 1 CONFIG RESETSTAT
|
||||
R 2 CONFIG RESETSTAT
|
||||
|
||||
R 0 PUBLISH hello world
|
||||
assert_equal 2 [CI 0 cluster_stats_messages_publish_sent]
|
||||
wait_for_condition 50 100 {
|
||||
[CI 1 cluster_stats_messages_publish_received] eq 1 &&
|
||||
[CI 2 cluster_stats_messages_publish_received] eq 1
|
||||
[CI 2 cluster_stats_messages_publish_received] eq 1 &&
|
||||
[CI 1 cluster_stats_pubsub_bytes_received] > 0 &&
|
||||
[CI 2 cluster_stats_pubsub_bytes_received] > 0
|
||||
} else {
|
||||
fail "node 2 or node 3 didn't receive clusterbus publish packet"
|
||||
}
|
||||
|
||||
set sender_pubsub_bytes [CI 0 cluster_stats_pubsub_bytes_sent]
|
||||
set receiver_pubsub_bytes [expr {[CI 1 cluster_stats_pubsub_bytes_received] + [CI 2 cluster_stats_pubsub_bytes_received]}]
|
||||
assert_morethan $sender_pubsub_bytes 0
|
||||
assert_equal $sender_pubsub_bytes $receiver_pubsub_bytes
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,24 +13,39 @@ start_cluster 3 0 [list config_lines $modules] {
|
||||
set node3 [srv -2 client]
|
||||
|
||||
test "Cluster module send message API - VM_SendClusterMessage" {
|
||||
R 0 CONFIG RESETSTAT
|
||||
R 1 CONFIG RESETSTAT
|
||||
R 2 CONFIG RESETSTAT
|
||||
|
||||
assert_equal OK [$node1 test.pingall]
|
||||
assert_equal 2 [CI 0 cluster_stats_messages_module_sent]
|
||||
wait_for_condition 50 100 {
|
||||
[CI 1 cluster_stats_messages_module_received] eq 1 &&
|
||||
[CI 2 cluster_stats_messages_module_received] eq 1
|
||||
[CI 2 cluster_stats_messages_module_received] eq 1 &&
|
||||
[CI 1 cluster_stats_module_bytes_received] > 0 &&
|
||||
[CI 2 cluster_stats_module_bytes_received] > 0
|
||||
} else {
|
||||
fail "node 2 or node 3 didn't receive cluster module message"
|
||||
}
|
||||
set sent_module_bytes [CI 0 cluster_stats_module_bytes_sent]
|
||||
set received_module_bytes [expr {[CI 1 cluster_stats_module_bytes_received] + [CI 2 cluster_stats_module_bytes_received]}]
|
||||
assert {$sent_module_bytes > 0}
|
||||
assert_equal $sent_module_bytes $received_module_bytes
|
||||
verify_log_message -1 "*DING (type 1) RECEIVED*Hey*" 0
|
||||
verify_log_message -2 "*DING (type 1) RECEIVED*Hey*" 0
|
||||
}
|
||||
|
||||
test "Cluster module receive message API - VM_RegisterClusterMessageReceiver" {
|
||||
wait_for_condition 50 100 {
|
||||
[CI 0 cluster_stats_messages_module_received] eq 2
|
||||
[CI 0 cluster_stats_messages_module_received] eq 2 &&
|
||||
[CI 0 cluster_stats_module_bytes_received] > 0
|
||||
} else {
|
||||
fail "node 1 didn't receive DONG messages"
|
||||
}
|
||||
set received_module_bytes [CI 0 cluster_stats_module_bytes_received]
|
||||
set sent_module_bytes [expr {[CI 1 cluster_stats_module_bytes_sent] + [CI 2 cluster_stats_module_bytes_sent]}]
|
||||
assert {$received_module_bytes > 0}
|
||||
assert_equal $received_module_bytes $sent_module_bytes
|
||||
wait_for_condition 50 100 {
|
||||
[count_log_message 0 "* <cluster> DONG (type 2) RECEIVED*"] eq 2
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user