mirror of
https://github.com/valkey-io/valkey.git
synced 2026-05-06 05:26:42 -04:00
Module command result callback addition (#2936)
## Add Command Result Event Notifications for Modules ### Summary 1. Adds new server events `ValkeyModuleEvent_CommandResultSuccess` and `ValkeyModuleEvent_CommandResultFailure` for that can notify subscribed modules after command execution. This enables modules to implement audit logging, error monitoring, performance tracking, and observability without modifying core server code. 2. Adds new server event `ValkeyModuleEvent_CommandResultACLDenied` for commands rejected by ACL. Together with PR #2237 this covers auditing of authentication and authorisation. ### Motivation There is currently no module API to observe command outcomes after execution or to capture ACL denied commands. Modules that need audit logging or error monitoring have no mechanism to be notified when commands succeed or fail, what arguments were used, how long they took, or how many keys were modified. This feature fills that gap using the existing `ValkeyModule_SubscribeToServerEvent()` infrastructure. ### API #### Events | Event | Description | |---|---| | `ValkeyModuleEvent_CommandResultSuccess` | Fired after a command completes successfully | | `ValkeyModuleEvent_CommandResultFailure` | Fired after a command returns an error | | `ValkeyModuleEvent_CommandACLDenied` | Fired after a command is rejected by ACL | These are separate events (not sub-events), so modules can for example only subscribe to failures without incurring any callback overhead for successful commands. #### Event Data: `ValkeyModuleCommandResultInfo` The `data` pointer passed to the callback can be cast to `ValkeyModuleCommandResultInfo`: ```c typedef struct ValkeyModuleCommandResultInfo { uint64_t version; /* Version of this structure for ABI compat. */ const char *command_name; /* Full command name (e.g., "SET", "CLIENT|LIST"). */ long long duration_us; /* Execution duration in microseconds. */ long long dirty; /* Number of keys modified. */ uint64_t client_id; /* Client ID that executed the command. */ int is_module_client; /* 1 if command was from RM_Call, 0 otherwise. */ int argc; /* Number of command arguments. */ ValkeyModuleString **argv; /* Command arguments array (zero-copy, read-only). */ int acl_deny_reason; /* ACL_DENIED_CMD/KEY/CHANNEL/AUTH; 0 for non-ACL events */ const char *acl_object; /* Denied resource name (key/channel); NULL for CMD/AUTH */ } ValkeyModuleCommandResultInfoV1; ``` The struct is versioned (`VALKEYMODULE_COMMANDRESULTINFO_VERSION`) for forward-compatible API evolution. ### Usage Example ```c /* Callback receives events for whichever event(s) you subscribed to */ void OnCommandResult(ValkeyModuleCtx *ctx, ValkeyModuleEvent eid, uint64_t subevent, void *data) { VALKEYMODULE_NOT_USED(ctx); VALKEYMODULE_NOT_USED(subevent); ValkeyModuleCommandResultInfo *info = (ValkeyModuleCommandResultInfo *)data; if (info->version != VALKEYMODULE_COMMANDRESULTINFO_VERSION) return; int failed = (eid.id == VALKEYMODULE_EVENT_COMMAND_RESULT_FAILURE); /* Access fields directly */ printf("command=%s status=%s duration=%lldus dirty=%lld client=%llu\n", info->command_name, failed ? "FAIL" : "OK", info->duration_us, info->dirty, info->client_id); /* Access argv (read-only, zero-copy) */ for (int i = 0; i < info->argc; i++) { size_t len; const char *arg = ValkeyModule_StringPtrLen(info->argv[i], &len); printf(" argv[%d] = %.*s\n", i, (int)len, arg); } } /* Subscribe in ValkeyModule_OnLoad or at runtime */ /* Option A: command failures only (recommended for audit logging) */ ValkeyModule_SubscribeToServerEvent(ctx, ValkeyModuleEvent_CommandResultFailure, OnCommandResult); /* Option B: command successes only */ ValkeyModule_SubscribeToServerEvent(ctx, ValkeyModuleEvent_CommandResultSuccess, OnCommandResult); /* Option C: both command outcomes*/ ValkeyModule_SubscribeToServerEvent(ctx, ValkeyModuleEvent_CommandResultSuccess, OnCommandResult); ValkeyModule_SubscribeToServerEvent(ctx, ValkeyModuleEvent_CommandResultFailure, OnCommandResult); /* Subscribe to ACL Denied */ ValkeyModule_SubscribeToServerEvent(ctx, ValkeyModuleEvent_CommandResultACLDenied, onCommandResult); /* Unsubscribe pass NULL callback */ ValkeyModule_SubscribeToServerEvent(ctx, ValkeyModuleEvent_CommandResultFailure, NULL); ``` ### Design Decisions - **Separate events instead of sub-events**: Modules subscribing only to failures have zero overhead for successful commands (~2ns listener-list check vs ~30ns callback invocation per command). This is critical since success events fire on the hot path of every command. - **Stack-allocated info struct**: The `ValkeyModuleCommandResultInfoV1` is built on the stack ΓÇö no heap allocation per event. - **Zero-copy argv**: Arguments are passed directly from the client's argv array. Any integer-encoded arguments (from `tryObjectEncoding()` during command execution) are decoded to string-encoded objects before being passed to the callback, ensuring compatibility with `ValkeyModule_StringPtrLen()`. - **Early exit**: If no modules are subscribed to any server events, the event firing function returns immediately before building the info struct. - **Uses existing server event infrastructure**: Follows the `ValkeyModule_SubscribeToServerEvent()` pattern used by all other server events, rather than introducing a new callback mechanism. ### Files Changed | File | Change | |---|---| | `src/valkeymodule.h` | Event IDs, event constants, `ValkeyModuleCommandResultInfoV1` struct | | `src/module.c` | `moduleFireCommandResultEvent()`, event documentation, event version entries | | `src/module.h` | Function declaration | | `src/server.c` | Call `moduleFireCommandResultEvent()` from `call()` after command execution | | `src/server.c` | Call to `moduleFireCommandACLDeniedEvent` in `processCommand` after ACL rejection | | `tests/modules/commandresult.c` | Test module exercising the full API | | `tests/unit/moduleapi/commandresult.tcl` | Integration tests | --------- Signed-off-by: martinrvisser <mvisser@hotmail.com> Signed-off-by: martinrvisser <martinrvisser@users.noreply.github.com> Co-authored-by: Ricardo Dias <rjd15372@gmail.com>
This commit is contained in:
+239
-1
@@ -418,7 +418,11 @@ typedef struct ValkeyModuleEventListener {
|
||||
ValkeyModuleEventCallback callback;
|
||||
} ValkeyModuleEventListener;
|
||||
|
||||
list *ValkeyModule_EventListeners; /* Global list of all the active events. */
|
||||
list *ValkeyModule_EventListeners; /* Global list of all the active events. */
|
||||
static int commandResultSuccessListeners = 0; /* Count of modules listening for command result success. */
|
||||
static int commandResultFailureListeners = 0; /* Count of modules listening for command result failure. */
|
||||
static int commandResultRejectedListeners = 0; /* Count of modules listening for command result rejected. */
|
||||
static int commandResultACLRejectedListeners = 0; /* Count of modules listening for command result ACL rejected. */
|
||||
|
||||
/* Data structures related to the module users */
|
||||
|
||||
@@ -11671,6 +11675,138 @@ unsigned long long VM_CommandFilterGetClientId(ValkeyModuleCommandFilterCtx *fct
|
||||
return fctx->c->id;
|
||||
}
|
||||
|
||||
/* --------------------------------------------------------------------------
|
||||
* ## Module Command Result Event
|
||||
* -------------------------------------------------------------------------- */
|
||||
|
||||
/* Fire command result server event.
|
||||
* This is invoked from call() after command execution. */
|
||||
void moduleFireCommandResultEvent(client *c,
|
||||
struct serverCommand *cmd,
|
||||
int command_failed,
|
||||
long long duration,
|
||||
long long dirty) {
|
||||
/* Fast path: skip if no modules are subscribed to the relevant command
|
||||
* result event. This is an O(1) check using a dedicated counter, avoiding
|
||||
* the cost of argv decoding and struct building when no one is listening. */
|
||||
if (command_failed) {
|
||||
if (commandResultFailureListeners == 0) return;
|
||||
} else {
|
||||
if (commandResultSuccessListeners == 0) return;
|
||||
}
|
||||
|
||||
/* Get argv - prefer original_argv if available (before any rewriting) */
|
||||
robj **argv = c->original_argv ? c->original_argv : c->argv;
|
||||
int argc = c->original_argv ? c->original_argc : c->argc;
|
||||
|
||||
/* Some commands (e.g. SET) call tryObjectEncoding() on argv entries during
|
||||
* execution, converting string args to OBJ_ENCODING_INT. ValkeyModuleString
|
||||
* only supports string-encoded objects, so we decode any INT-encoded entries
|
||||
* before passing them to the module callback. For the common case where no
|
||||
* entries are INT-encoded, this adds only a scan with no allocations. */
|
||||
robj **decoded_argv = argv;
|
||||
int needs_decode = 0;
|
||||
|
||||
for (int i = 0; i < argc; i++) {
|
||||
if (argv[i]->encoding == OBJ_ENCODING_INT) {
|
||||
needs_decode = 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (needs_decode) {
|
||||
decoded_argv = zmalloc(sizeof(robj *) * argc);
|
||||
for (int i = 0; i < argc; i++) {
|
||||
decoded_argv[i] = getDecodedObject(argv[i]);
|
||||
}
|
||||
}
|
||||
|
||||
/* Build the event data structure. The object field is NULL for
|
||||
* success and failure events — it is only populated for REJECTED events. */
|
||||
ValkeyModuleCommandResultInfoV1 info = {
|
||||
.version = VALKEYMODULE_COMMANDRESULTINFO_VERSION,
|
||||
.command_name = cmd ? cmd->fullname : NULL,
|
||||
.duration_us = duration,
|
||||
.dirty = dirty,
|
||||
.client_id = c->id,
|
||||
.is_module_client = (c->flag.module ? 1 : 0),
|
||||
.argc = argc,
|
||||
.argv = (ValkeyModuleString **)decoded_argv,
|
||||
.rejection_context = NULL,
|
||||
};
|
||||
|
||||
/* Fire the appropriate event based on success/failure */
|
||||
uint64_t event_id = command_failed ? VALKEYMODULE_EVENT_COMMAND_RESULT_FAILURE
|
||||
: VALKEYMODULE_EVENT_COMMAND_RESULT_SUCCESS;
|
||||
moduleFireServerEvent(event_id, 0, &info);
|
||||
|
||||
if (needs_decode) {
|
||||
for (int i = 0; i < argc; i++) {
|
||||
decrRefCount(decoded_argv[i]);
|
||||
}
|
||||
zfree(decoded_argv);
|
||||
}
|
||||
}
|
||||
|
||||
/* Fire command result rejected server event (non-ACL rejections).
|
||||
* Called from processCommand() for all pre-execution rejections that are not
|
||||
* ACL-related. reply_str is the full error reply string that was sent to the
|
||||
* client; it is used as rejection_context in the event info. */
|
||||
void moduleFireCommandRejectedEvent(client *c, const char *reply_str) {
|
||||
if (commandResultRejectedListeners == 0) return;
|
||||
|
||||
ValkeyModuleCommandResultInfoV1 info = {
|
||||
.version = VALKEYMODULE_COMMANDRESULTINFO_VERSION,
|
||||
.command_name = c->cmd ? c->cmd->fullname : NULL,
|
||||
.duration_us = 0,
|
||||
.dirty = 0,
|
||||
.client_id = c->id,
|
||||
.is_module_client = (c->flag.module ? 1 : 0),
|
||||
.argc = c->argc,
|
||||
.argv = (ValkeyModuleString **)c->argv,
|
||||
.rejection_context = reply_str,
|
||||
};
|
||||
|
||||
moduleFireServerEvent(VALKEYMODULE_EVENT_COMMAND_RESULT_REJECTED, 0, &info);
|
||||
}
|
||||
|
||||
/* Fire command result ACL rejected server event.
|
||||
* Called from processCommand() when ACLCheckAllPerm() denies the command, or
|
||||
* when authRequired() rejects an unauthenticated client.
|
||||
* subevent is a ValkeyModuleACLLogEntryReason value (VALKEYMODULE_ACL_LOG_*).
|
||||
* errpos is the index into c->argv of the denied key or channel for
|
||||
* VALKEYMODULE_ACL_LOG_KEY/CHANNEL; pass -1 for all other subevents. */
|
||||
void moduleFireCommandACLRejectedEvent(client *c, uint64_t subevent, int errpos) {
|
||||
if (commandResultACLRejectedListeners == 0) return;
|
||||
|
||||
char int_key_buf[LONG_STR_SIZE];
|
||||
const char *rejection_context = NULL;
|
||||
if ((subevent == VALKEYMODULE_ACL_LOG_KEY || subevent == VALKEYMODULE_ACL_LOG_CHANNEL) &&
|
||||
errpos >= 0 && errpos < c->argc) {
|
||||
robj *key_obj = c->argv[errpos];
|
||||
if (key_obj->encoding == OBJ_ENCODING_INT) {
|
||||
ll2string(int_key_buf, sizeof(int_key_buf), (long)objectGetVal(key_obj));
|
||||
rejection_context = int_key_buf;
|
||||
} else {
|
||||
rejection_context = objectGetVal(key_obj);
|
||||
}
|
||||
}
|
||||
|
||||
ValkeyModuleCommandResultInfoV1 info = {
|
||||
.version = VALKEYMODULE_COMMANDRESULTINFO_VERSION,
|
||||
.command_name = c->cmd ? c->cmd->fullname : NULL,
|
||||
.duration_us = 0,
|
||||
.dirty = 0,
|
||||
.client_id = c->id,
|
||||
.is_module_client = (c->flag.module ? 1 : 0),
|
||||
.argc = c->argc,
|
||||
.argv = (ValkeyModuleString **)c->argv,
|
||||
.rejection_context = rejection_context,
|
||||
};
|
||||
|
||||
moduleFireServerEvent(VALKEYMODULE_EVENT_COMMAND_RESULT_ACL_REJECTED, subevent, &info);
|
||||
}
|
||||
|
||||
/* For a given pointer allocated via ValkeyModule_Alloc() or
|
||||
* ValkeyModule_Realloc(), return the amount of memory allocated for it.
|
||||
* Note that this may be different (larger) than the memory we allocated
|
||||
@@ -12124,6 +12260,10 @@ static uint64_t moduleEventVersions[] = {
|
||||
VALKEYMODULE_KEYINFO_VERSION, /* VALKEYMODULE_EVENT_KEY */
|
||||
VALKEYMODULE_AUTHENTICATION_INFO_VERSION, /* VALKEYMODULE_EVENT_AUTHENTICATION_ATTEMPT */
|
||||
VALKEYMODULE_ATOMICSLOTMIGRATION_INFO_VERSION, /* VALKEYMODULE_EVENT_ATOMIC_SLOT_MIGRATION */
|
||||
VALKEYMODULE_COMMANDRESULTINFO_VERSION, /* VALKEYMODULE_EVENT_COMMAND_RESULT_SUCCESS */
|
||||
VALKEYMODULE_COMMANDRESULTINFO_VERSION, /* VALKEYMODULE_EVENT_COMMAND_RESULT_FAILURE */
|
||||
VALKEYMODULE_COMMANDRESULTINFO_VERSION, /* VALKEYMODULE_EVENT_COMMAND_RESULT_REJECTED */
|
||||
VALKEYMODULE_COMMANDRESULTINFO_VERSION, /* VALKEYMODULE_EVENT_COMMAND_RESULT_ACL_REJECTED */
|
||||
};
|
||||
|
||||
/* Register to be notified, via a callback, when the specified server event
|
||||
@@ -12465,6 +12605,79 @@ static uint64_t moduleEventVersions[] = {
|
||||
* Importing keys will not be accessible to clients unless the slot migration
|
||||
* is COMPLETED.
|
||||
*
|
||||
* * ValkeyModuleEvent_CommandResultSuccess
|
||||
*
|
||||
* Called after a command completes successfully. This event fires for every
|
||||
* successful command execution, including commands called via RM_Call.
|
||||
* Modules can subscribe to this event to monitor command execution, collect
|
||||
* metrics, or implement audit logging.
|
||||
*
|
||||
* The data pointer can be casted to a ValkeyModuleCommandResultInfo
|
||||
* structure with the following fields:
|
||||
*
|
||||
* const char *command_name; // Full command name (e.g., "SET", "CLIENT|LIST")
|
||||
* long long duration_us; // Command execution time in microseconds
|
||||
* long long dirty; // Number of keys modified by the command
|
||||
* unsigned long long client_id; // Client ID that executed the command
|
||||
* int is_module_client; // 1 if called via RM_Call, 0 otherwise
|
||||
* int argc; // Number of command arguments
|
||||
* ValkeyModuleString **argv; // Command arguments (read-only, zero-copy)
|
||||
*
|
||||
* Performance note: Subscribe only to ValkeyModuleEvent_CommandResultFailure
|
||||
* if you only need to track failures, as this avoids the overhead of firing
|
||||
* callbacks for successful commands.
|
||||
*
|
||||
* * ValkeyModuleEvent_CommandResultFailure
|
||||
*
|
||||
* Called after a command fails (returns an error). This event fires for every
|
||||
* failed command execution, including commands called via RM_Call.
|
||||
* Modules can subscribe to this event to monitor errors, implement alerting,
|
||||
* or track client misbehavior.
|
||||
*
|
||||
* The data pointer can be casted to a ValkeyModuleCommandResultInfo
|
||||
* structure with the same fields as ValkeyModuleEvent_CommandResultSuccess.
|
||||
*
|
||||
* This event is useful for monitoring command failures without the overhead
|
||||
* of receiving callbacks for all successful commands.
|
||||
*
|
||||
* * ValkeyModuleEvent_CommandResultRejected
|
||||
*
|
||||
* Called when a command is rejected before execution for non-ACL reasons
|
||||
* (unknown command, wrong arity, OOM, cluster redirect, loading,
|
||||
* busy script/module, read-only replica, Pub/Sub context, etc.).
|
||||
*
|
||||
* The subevent is always 0 for this event — there is no subevent breakdown.
|
||||
*
|
||||
* duration_us and dirty are always 0 for this event — the command never ran.
|
||||
*
|
||||
* The data pointer can be casted to a ValkeyModuleCommandResultInfo structure.
|
||||
* The `rejection_context` field carries the full error reply string that was
|
||||
* sent to the client (e.g. "-OOM command not allowed when used memory >
|
||||
* 'maxmemory'", "-ERR Command 'multi' not allowed inside a transaction").
|
||||
*
|
||||
* * ValkeyModuleEvent_CommandResultACLRejected
|
||||
*
|
||||
* Called when a command is rejected before execution due to an ACL check
|
||||
* failure, or when an unauthenticated client sends a command (NOAUTH).
|
||||
*
|
||||
* For NOAUTH, see also ValkeyModuleEvent_AuthenticationAttempt
|
||||
* which covers AUTH/HELLO command outcomes.
|
||||
*
|
||||
* The subevent is a ValkeyModuleACLLogEntryReason value:
|
||||
* - VALKEYMODULE_ACL_LOG_AUTH (0): NOAUTH — client not yet authenticated
|
||||
* - VALKEYMODULE_ACL_LOG_CMD (1): NOPERM — command not permitted
|
||||
* - VALKEYMODULE_ACL_LOG_KEY (2): NOPERM — key access denied
|
||||
* - VALKEYMODULE_ACL_LOG_CHANNEL (3): NOPERM — channel access denied
|
||||
* - VALKEYMODULE_ACL_LOG_DB (4): NOPERM — database access denied
|
||||
*
|
||||
* duration_us and dirty are always 0 for this event — the command never ran.
|
||||
*
|
||||
* The data pointer can be casted to a ValkeyModuleCommandResultInfo structure.
|
||||
* The `rejection_context` field carries subevent-specific context:
|
||||
* - VALKEYMODULE_ACL_LOG_KEY / VALKEYMODULE_ACL_LOG_CHANNEL:
|
||||
* the denied key or channel name from argv
|
||||
* - All other ACL subevents: NULL
|
||||
*
|
||||
* The function returns VALKEYMODULE_OK if the module was successfully subscribed
|
||||
* for the specified event. If the API is called from a wrong context or unsupported event
|
||||
* is given then VALKEYMODULE_ERR is returned. */
|
||||
@@ -12491,6 +12704,14 @@ int VM_SubscribeToServerEvent(ValkeyModuleCtx *ctx, ValkeyModuleEvent event, Val
|
||||
if (callback == NULL) {
|
||||
listDelNode(ValkeyModule_EventListeners, ln);
|
||||
zfree(el);
|
||||
if (event.id == VALKEYMODULE_EVENT_COMMAND_RESULT_SUCCESS)
|
||||
commandResultSuccessListeners--;
|
||||
else if (event.id == VALKEYMODULE_EVENT_COMMAND_RESULT_FAILURE)
|
||||
commandResultFailureListeners--;
|
||||
else if (event.id == VALKEYMODULE_EVENT_COMMAND_RESULT_REJECTED)
|
||||
commandResultRejectedListeners--;
|
||||
else if (event.id == VALKEYMODULE_EVENT_COMMAND_RESULT_ACL_REJECTED)
|
||||
commandResultACLRejectedListeners--;
|
||||
} else {
|
||||
el->callback = callback; /* Update the callback with the new one. */
|
||||
}
|
||||
@@ -12503,6 +12724,14 @@ int VM_SubscribeToServerEvent(ValkeyModuleCtx *ctx, ValkeyModuleEvent event, Val
|
||||
el->event = event;
|
||||
el->callback = callback;
|
||||
listAddNodeTail(ValkeyModule_EventListeners, el);
|
||||
if (event.id == VALKEYMODULE_EVENT_COMMAND_RESULT_SUCCESS)
|
||||
commandResultSuccessListeners++;
|
||||
else if (event.id == VALKEYMODULE_EVENT_COMMAND_RESULT_FAILURE)
|
||||
commandResultFailureListeners++;
|
||||
else if (event.id == VALKEYMODULE_EVENT_COMMAND_RESULT_REJECTED)
|
||||
commandResultRejectedListeners++;
|
||||
else if (event.id == VALKEYMODULE_EVENT_COMMAND_RESULT_ACL_REJECTED)
|
||||
commandResultACLRejectedListeners++;
|
||||
return VALKEYMODULE_OK;
|
||||
}
|
||||
|
||||
@@ -12529,6 +12758,10 @@ int VM_IsSubEventSupported(ValkeyModuleEvent event, int64_t subevent) {
|
||||
case VALKEYMODULE_EVENT_EVENTLOOP: return subevent < _VALKEYMODULE_SUBEVENT_EVENTLOOP_NEXT;
|
||||
case VALKEYMODULE_EVENT_CONFIG: return subevent < _VALKEYMODULE_SUBEVENT_CONFIG_NEXT;
|
||||
case VALKEYMODULE_EVENT_KEY: return subevent < _VALKEYMODULE_SUBEVENT_KEY_NEXT;
|
||||
case VALKEYMODULE_EVENT_COMMAND_RESULT_REJECTED:
|
||||
return subevent == 0;
|
||||
case VALKEYMODULE_EVENT_COMMAND_RESULT_ACL_REJECTED:
|
||||
return subevent < 5; /* ValkeyModuleACLLogEntryReason has 5 values (0-4) */
|
||||
default: break;
|
||||
}
|
||||
return 0;
|
||||
@@ -12618,6 +12851,11 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
|
||||
moduledata = data;
|
||||
} else if (eid == VALKEYMODULE_EVENT_ATOMIC_SLOT_MIGRATION) {
|
||||
moduledata = data;
|
||||
} else if (eid == VALKEYMODULE_EVENT_COMMAND_RESULT_SUCCESS ||
|
||||
eid == VALKEYMODULE_EVENT_COMMAND_RESULT_FAILURE ||
|
||||
eid == VALKEYMODULE_EVENT_COMMAND_RESULT_REJECTED ||
|
||||
eid == VALKEYMODULE_EVENT_COMMAND_RESULT_ACL_REJECTED) {
|
||||
moduledata = data;
|
||||
}
|
||||
|
||||
el->module->in_hook++;
|
||||
|
||||
@@ -23,6 +23,7 @@ struct ValkeyModuleCtx;
|
||||
struct moduleLoadQueueEntry;
|
||||
struct ValkeyModuleKeyOptCtx;
|
||||
struct ValkeyModuleCommand;
|
||||
struct ValkeyModuleCommandResult;
|
||||
struct clusterState;
|
||||
|
||||
/* Each module type implementation should export a set of methods in order
|
||||
@@ -213,6 +214,13 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid)
|
||||
unsigned long moduleNotifyKeyspaceSubscribersCnt(void);
|
||||
void firePostExecutionUnitJobs(void);
|
||||
void moduleCallCommandFilters(client *c);
|
||||
void moduleFireCommandResultEvent(client *c,
|
||||
struct serverCommand *cmd,
|
||||
int command_failed,
|
||||
long long duration,
|
||||
long long dirty);
|
||||
void moduleFireCommandRejectedEvent(client *c, const char *reply_str);
|
||||
void moduleFireCommandACLRejectedEvent(client *c, uint64_t subevent, int errpos);
|
||||
void modulePostExecutionUnitOperations(void);
|
||||
void ModuleForkDoneHandler(int exitcode, int bysignal);
|
||||
int TerminateModuleForkChild(int child_pid, int wait);
|
||||
|
||||
+65
-33
@@ -3968,14 +3968,19 @@ void call(client *c, int flags) {
|
||||
|
||||
/* Update failed command calls if required. */
|
||||
|
||||
if (!incrCommandStatsOnError(real_cmd, ERROR_COMMAND_FAILED) && c->deferred_reply_errors) {
|
||||
int command_failed = incrCommandStatsOnError(real_cmd, ERROR_COMMAND_FAILED);
|
||||
if (!command_failed && c->deferred_reply_errors) {
|
||||
/* When call is used from a module client, error stats, and total_error_replies
|
||||
* isn't updated since these errors, if handled by the module, are internal,
|
||||
* and not reflected to users. however, the commandstats does show these calls
|
||||
* (made by RM_Call), so it should log if they failed or succeeded. */
|
||||
real_cmd->failed_calls++;
|
||||
command_failed = 1;
|
||||
}
|
||||
|
||||
/* Fire command result event for subscribed modules. */
|
||||
moduleFireCommandResultEvent(c, real_cmd, command_failed, duration, dirty);
|
||||
|
||||
/* After executing command, we will close the client after writing entire
|
||||
* reply if it is set 'CLIENT_CLOSE_AFTER_COMMAND' flag. */
|
||||
if (c->flag.close_after_command) {
|
||||
@@ -4116,8 +4121,9 @@ void call(client *c, int flags) {
|
||||
* If there's a transaction is flags it as dirty, and if the command is EXEC,
|
||||
* it aborts the transaction.
|
||||
* The duration is reset, since we reject the command, and it did not record.
|
||||
* Note: 'reply' is expected to end with \r\n */
|
||||
void rejectCommand(client *c, robj *reply) {
|
||||
* Note: 'reply' is expected to end with \r\n.
|
||||
* If notify_modules is non-zero, fires ValkeyModuleEvent_CommandResultRejected. */
|
||||
void rejectCommand(client *c, robj *reply, int notify_modules) {
|
||||
flagTransaction(c);
|
||||
c->duration = 0;
|
||||
if (c->cmd) c->cmd->rejected_calls++;
|
||||
@@ -4127,12 +4133,16 @@ void rejectCommand(client *c, robj *reply) {
|
||||
/* using addReplyError* rather than addReply so that the error can be logged. */
|
||||
addReplyErrorObject(c, reply);
|
||||
}
|
||||
if (notify_modules) moduleFireCommandRejectedEvent(c, objectGetVal(reply));
|
||||
}
|
||||
|
||||
void rejectCommandSds(client *c, sds s) {
|
||||
/* notify_modules controls whether ValkeyModuleEvent_CommandResultRejected is fired.
|
||||
* The event is fired before 's' is consumed so the string remains valid for callbacks. */
|
||||
void rejectCommandSds(client *c, sds s, int notify_modules) {
|
||||
flagTransaction(c);
|
||||
c->duration = 0;
|
||||
if (c->cmd) c->cmd->rejected_calls++;
|
||||
if (notify_modules) moduleFireCommandRejectedEvent(c, s);
|
||||
if (c->cmd && c->cmd->proc == execCommand) {
|
||||
execCommandAbort(c, s);
|
||||
sdsfree(s);
|
||||
@@ -4142,7 +4152,7 @@ void rejectCommandSds(client *c, sds s) {
|
||||
}
|
||||
}
|
||||
|
||||
void rejectCommandFormat(client *c, const char *fmt, ...) {
|
||||
void rejectCommandFormat(client *c, int notify_modules, const char *fmt, ...) {
|
||||
va_list ap;
|
||||
va_start(ap, fmt);
|
||||
sds s = sdscatvprintf(sdsempty(), fmt, ap);
|
||||
@@ -4150,7 +4160,7 @@ void rejectCommandFormat(client *c, const char *fmt, ...) {
|
||||
/* Make sure there are no newlines in the string, otherwise invalid protocol
|
||||
* is emitted (The args come from the user, they may contain any character). */
|
||||
sdsmapchars(s, "\r\n", " ", 2);
|
||||
rejectCommandSds(c, s);
|
||||
rejectCommandSds(c, s, notify_modules);
|
||||
}
|
||||
|
||||
/* This is called after a command in call, we can do some maintenance job in it. */
|
||||
@@ -4335,7 +4345,8 @@ int processCommand(client *c) {
|
||||
/* AUTH and HELLO and no auth commands are valid even in
|
||||
* non-authenticated state. */
|
||||
if (!c->cmd || !(c->cmd->flags & CMD_NO_AUTH)) {
|
||||
rejectCommand(c, shared.noautherr);
|
||||
rejectCommand(c, shared.noautherr, 0);
|
||||
moduleFireCommandACLRejectedEvent(c, VALKEYMODULE_ACL_LOG_AUTH, -1);
|
||||
return C_OK;
|
||||
}
|
||||
}
|
||||
@@ -4344,13 +4355,13 @@ int processCommand(client *c) {
|
||||
sds err;
|
||||
|
||||
if (!commandCheckExistence(c, &err)) {
|
||||
rejectCommandSds(c, err);
|
||||
rejectCommandSds(c, err, 1);
|
||||
return C_OK;
|
||||
}
|
||||
if (c->read_flags & READ_FLAGS_BAD_ARITY) {
|
||||
/* Already detected this, but do it again just to get the error message. */
|
||||
serverAssert(!commandCheckArity(c->cmd, c->argc, &err));
|
||||
rejectCommandSds(c, err);
|
||||
rejectCommandSds(c, err, 1);
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
@@ -4358,12 +4369,15 @@ int processCommand(client *c) {
|
||||
if (c->cmd->flags & CMD_PROTECTED) {
|
||||
if ((c->cmd->proc == debugCommand && !allowProtectedAction(server.enable_debug_cmd, c)) ||
|
||||
(c->cmd->proc == moduleCommand && !allowProtectedAction(server.enable_module_cmd, c))) {
|
||||
rejectCommandFormat(c,
|
||||
"%s command not allowed. If the %s option is set to \"local\", "
|
||||
"you can run it from a local connection, otherwise you need to set this option "
|
||||
"in the configuration file, and then restart the server.",
|
||||
c->cmd->proc == debugCommand ? "DEBUG" : "MODULE",
|
||||
c->cmd->proc == debugCommand ? "enable-debug-command" : "enable-module-command");
|
||||
sds protected_err = sdscatprintf(
|
||||
sdsempty(),
|
||||
"%s command not allowed. If the %s option is set to \"local\", "
|
||||
"you can run it from a local connection, otherwise you need to set this option "
|
||||
"in the configuration file, and then restart the server.",
|
||||
c->cmd->proc == debugCommand ? "DEBUG" : "MODULE",
|
||||
c->cmd->proc == debugCommand ? "enable-debug-command" : "enable-module-command");
|
||||
sdsmapchars(protected_err, "\r\n", " ", 2);
|
||||
rejectCommandSds(c, protected_err, 1);
|
||||
return C_OK;
|
||||
}
|
||||
}
|
||||
@@ -4388,7 +4402,10 @@ int processCommand(client *c) {
|
||||
const int obey_client = mustObeyClient(c);
|
||||
|
||||
if (c->flag.multi && c->cmd->flags & CMD_NO_MULTI) {
|
||||
rejectCommandFormat(c, "Command '%s' not allowed inside a transaction", c->cmd->fullname);
|
||||
sds nomulti_err =
|
||||
sdscatprintf(sdsempty(), "Command '%s' not allowed inside a transaction", c->cmd->fullname);
|
||||
sdsmapchars(nomulti_err, "\r\n", " ", 2);
|
||||
rejectCommandSds(c, nomulti_err, 1);
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
@@ -4400,8 +4417,16 @@ int processCommand(client *c) {
|
||||
addACLLogEntry(c, acl_retval, (c->flag.multi) ? ACL_LOG_CTX_MULTI : ACL_LOG_CTX_TOPLEVEL, acl_errpos, NULL,
|
||||
NULL);
|
||||
sds msg = getAclErrorMessage(acl_retval, c->user, c->cmd, objectGetVal(c->argv[acl_errpos]), 0);
|
||||
rejectCommandFormat(c, "-NOPERM %s", msg);
|
||||
rejectCommandFormat(c, 0, "-NOPERM %s", msg);
|
||||
sdsfree(msg);
|
||||
uint64_t acl_subevent;
|
||||
switch (acl_retval) {
|
||||
case ACL_DENIED_DB: acl_subevent = VALKEYMODULE_ACL_LOG_DB; break;
|
||||
case ACL_DENIED_KEY: acl_subevent = VALKEYMODULE_ACL_LOG_KEY; break;
|
||||
case ACL_DENIED_CHANNEL: acl_subevent = VALKEYMODULE_ACL_LOG_CHANNEL; break;
|
||||
default: acl_subevent = VALKEYMODULE_ACL_LOG_CMD; break;
|
||||
}
|
||||
moduleFireCommandACLRejectedEvent(c, acl_subevent, acl_errpos);
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
@@ -4422,6 +4447,7 @@ int processCommand(client *c) {
|
||||
clusterRedirectClient(c, n, c->slot, error_code);
|
||||
c->duration = 0;
|
||||
c->cmd->rejected_calls++;
|
||||
moduleFireCommandRejectedEvent(c, NULL);
|
||||
return C_OK;
|
||||
}
|
||||
}
|
||||
@@ -4457,6 +4483,7 @@ int processCommand(client *c) {
|
||||
}
|
||||
c->duration = 0;
|
||||
c->cmd->rejected_calls++;
|
||||
moduleFireCommandRejectedEvent(c, "-REDIRECT");
|
||||
addReplyErrorSds(c, sdscatprintf(sdsempty(), "-REDIRECT %s:%d", server.primary_host, server.primary_port));
|
||||
}
|
||||
return C_OK;
|
||||
@@ -4496,7 +4523,7 @@ int processCommand(client *c) {
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
rejectCommand(c, shared.oomerr);
|
||||
rejectCommand(c, shared.oomerr, 1);
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
@@ -4533,7 +4560,7 @@ int processCommand(client *c) {
|
||||
sds err = writeCommandsGetDiskErrorMessage(deny_write_type);
|
||||
/* remove the newline since rejectCommandSds adds it. */
|
||||
sdssubstr(err, 0, sdslen(err) - 2);
|
||||
rejectCommandSds(c, err);
|
||||
rejectCommandSds(c, err, 1);
|
||||
return C_OK;
|
||||
}
|
||||
}
|
||||
@@ -4541,14 +4568,14 @@ int processCommand(client *c) {
|
||||
/* Don't accept write commands if there are not enough good replicas and
|
||||
* user configured the min-replicas-to-write option. */
|
||||
if (is_write_command && !checkGoodReplicasStatus()) {
|
||||
rejectCommand(c, shared.noreplicaserr);
|
||||
rejectCommand(c, shared.noreplicaserr, 1);
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
/* Don't accept write commands if this is a read only replica. But
|
||||
* accept write commands if this is our primary. */
|
||||
if (server.primary_host && server.repl_replica_ro && !obey_client && is_write_command) {
|
||||
rejectCommand(c, shared.roreplicaerr);
|
||||
rejectCommand(c, shared.roreplicaerr, 1);
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
@@ -4558,10 +4585,12 @@ int processCommand(client *c) {
|
||||
c->cmd->proc != ssubscribeCommand && c->cmd->proc != unsubscribeCommand &&
|
||||
c->cmd->proc != sunsubscribeCommand && c->cmd->proc != psubscribeCommand &&
|
||||
c->cmd->proc != punsubscribeCommand && c->cmd->proc != quitCommand && c->cmd->proc != resetCommand) {
|
||||
rejectCommandFormat(c,
|
||||
"Can't execute '%s': only (P|S)SUBSCRIBE / "
|
||||
"(P|S)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context",
|
||||
c->cmd->fullname);
|
||||
sds pubsub_err = sdscatprintf(sdsempty(),
|
||||
"Can't execute '%s': only (P|S)SUBSCRIBE / "
|
||||
"(P|S)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context",
|
||||
c->cmd->fullname);
|
||||
sdsmapchars(pubsub_err, "\r\n", " ", 2);
|
||||
rejectCommandSds(c, pubsub_err, 1);
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
@@ -4570,20 +4599,20 @@ int processCommand(client *c) {
|
||||
* link with primary. */
|
||||
if (server.primary_host && server.repl_state != REPL_STATE_CONNECTED && server.repl_serve_stale_data == 0 &&
|
||||
is_denystale_command) {
|
||||
rejectCommand(c, shared.primarydownerr);
|
||||
rejectCommand(c, shared.primarydownerr, 1);
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
/* Loading DB? Return an error if the command has not the
|
||||
* CMD_LOADING flag. */
|
||||
if (server.loading && !server.async_loading && is_denyloading_command) {
|
||||
rejectCommand(c, shared.loadingerr);
|
||||
rejectCommand(c, shared.loadingerr, 1);
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
/* During async-loading, block certain commands. */
|
||||
if (server.async_loading && is_deny_async_loading_command) {
|
||||
rejectCommand(c, shared.loadingerr);
|
||||
rejectCommand(c, shared.loadingerr, 1);
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
@@ -4596,13 +4625,15 @@ int processCommand(client *c) {
|
||||
* executed, see Github PR #7022. */
|
||||
if (isInsideYieldingLongCommand() && !(c->cmd->flags & CMD_ALLOW_BUSY)) {
|
||||
if (server.busy_module_yield_flags && server.busy_module_yield_reply) {
|
||||
rejectCommandFormat(c, "-BUSY %s", server.busy_module_yield_reply);
|
||||
sds busy_err = sdscatprintf(sdsempty(), "-BUSY %s", server.busy_module_yield_reply);
|
||||
sdsmapchars(busy_err, "\r\n", " ", 2);
|
||||
rejectCommandSds(c, busy_err, 1);
|
||||
} else if (server.busy_module_yield_flags) {
|
||||
rejectCommand(c, shared.slowmoduleerr);
|
||||
rejectCommand(c, shared.slowmoduleerr, 1);
|
||||
} else if (scriptIsEval()) {
|
||||
rejectCommand(c, shared.slowevalerr);
|
||||
rejectCommand(c, shared.slowevalerr, 1);
|
||||
} else {
|
||||
rejectCommand(c, shared.slowscripterr);
|
||||
rejectCommand(c, shared.slowscripterr, 1);
|
||||
}
|
||||
return C_OK;
|
||||
}
|
||||
@@ -4611,7 +4642,8 @@ int processCommand(client *c) {
|
||||
* The main objective here is to prevent abuse of client pause check
|
||||
* from which replicas are exempt. */
|
||||
if (c->flag.replica && (is_may_replicate_command || is_write_command || is_read_command)) {
|
||||
rejectCommandFormat(c, "Replica can't interact with the keyspace");
|
||||
sds replica_err = sdsnew("Replica can't interact with the keyspace");
|
||||
rejectCommandSds(c, replica_err, 1);
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
|
||||
+1
-1
@@ -3499,7 +3499,7 @@ struct serverMemOverhead *getMemoryOverheadData(void);
|
||||
void freeMemoryOverheadData(struct serverMemOverhead *mh);
|
||||
void checkChildrenDone(void);
|
||||
int setOOMScoreAdj(int process_class);
|
||||
void rejectCommandFormat(client *c, const char *fmt, ...);
|
||||
void rejectCommandFormat(client *c, int notify_modules, const char *fmt, ...);
|
||||
void *activeDefragAlloc(void *ptr);
|
||||
sds activeDefragSds(sds sdsptr);
|
||||
robj *activeDefragStringOb(robj *ob);
|
||||
|
||||
+47
-2
@@ -536,7 +536,11 @@ typedef void (*ValkeyModuleEventLoopOneShotFunc)(void *user_data);
|
||||
#define VALKEYMODULE_EVENT_KEY 17
|
||||
#define VALKEYMODULE_EVENT_AUTHENTICATION_ATTEMPT 18
|
||||
#define VALKEYMODULE_EVENT_ATOMIC_SLOT_MIGRATION 19
|
||||
#define _VALKEYMODULE_EVENT_NEXT 20 /* Next event flag, should be updated if a new event added. */
|
||||
#define VALKEYMODULE_EVENT_COMMAND_RESULT_SUCCESS 20
|
||||
#define VALKEYMODULE_EVENT_COMMAND_RESULT_FAILURE 21
|
||||
#define VALKEYMODULE_EVENT_COMMAND_RESULT_REJECTED 22
|
||||
#define VALKEYMODULE_EVENT_COMMAND_RESULT_ACL_REJECTED 23
|
||||
#define _VALKEYMODULE_EVENT_NEXT 24 /* Next event flag, should be updated if a new event added. */
|
||||
|
||||
typedef struct ValkeyModuleEvent {
|
||||
uint64_t id; /* VALKEYMODULE_EVENT_... defines. */
|
||||
@@ -595,7 +599,11 @@ static const ValkeyModuleEvent ValkeyModuleEvent_ReplicationRoleChanged = {VALKE
|
||||
ValkeyModuleEvent_Config = {VALKEYMODULE_EVENT_CONFIG, 1},
|
||||
ValkeyModuleEvent_Key = {VALKEYMODULE_EVENT_KEY, 1},
|
||||
ValkeyModuleEvent_AuthenticationAttempt = {VALKEYMODULE_EVENT_AUTHENTICATION_ATTEMPT, 1},
|
||||
ValkeyModuleEvent_AtomicSlotMigration = {VALKEYMODULE_EVENT_ATOMIC_SLOT_MIGRATION, 1};
|
||||
ValkeyModuleEvent_AtomicSlotMigration = {VALKEYMODULE_EVENT_ATOMIC_SLOT_MIGRATION, 1},
|
||||
ValkeyModuleEvent_CommandResultSuccess = {VALKEYMODULE_EVENT_COMMAND_RESULT_SUCCESS, 1},
|
||||
ValkeyModuleEvent_CommandResultFailure = {VALKEYMODULE_EVENT_COMMAND_RESULT_FAILURE, 1},
|
||||
ValkeyModuleEvent_CommandResultRejected = {VALKEYMODULE_EVENT_COMMAND_RESULT_REJECTED, 1},
|
||||
ValkeyModuleEvent_CommandResultACLRejected = {VALKEYMODULE_EVENT_COMMAND_RESULT_ACL_REJECTED, 1};
|
||||
|
||||
/* Those are values that are used for the 'subevent' callback argument. */
|
||||
#define VALKEYMODULE_SUBEVENT_PERSISTENCE_RDB_START 0
|
||||
@@ -675,6 +683,21 @@ static const ValkeyModuleEvent ValkeyModuleEvent_ReplicationRoleChanged = {VALKE
|
||||
#define VALKEYMODULE_SUBEVENT_ATOMIC_SLOT_MIGRATION_EXPORT_COMPLETED 5
|
||||
#define _VALKEYMODULE_SUBEVENT_ATOMIC_SLOT_MIGRATION_NEXT 6
|
||||
|
||||
/* ValkeyModuleEvent_CommandResultRejected has no subevents (subevent is always 0).
|
||||
* The rejection_context field in ValkeyModuleCommandResultInfo carries the full
|
||||
* error reply string sent to the client (e.g. "-OOM command not allowed...",
|
||||
* "-ERR Command 'xyz' not allowed inside a transaction"). */
|
||||
|
||||
/* Subevents for ValkeyModuleEvent_CommandResultACLRejected.
|
||||
* These reuse the ValkeyModuleACLLogEntryReason enum values so the same
|
||||
* constants work with both the ACL log API and this event:
|
||||
* VALKEYMODULE_ACL_LOG_AUTH = 0 (NOAUTH; rejection_context = NULL)
|
||||
* VALKEYMODULE_ACL_LOG_CMD = 1 (NOPERM command; rejection_context = NULL)
|
||||
* VALKEYMODULE_ACL_LOG_KEY = 2 (NOPERM key; rejection_context = key name)
|
||||
* VALKEYMODULE_ACL_LOG_CHANNEL = 3 (NOPERM channel; rejection_context = channel name)
|
||||
* VALKEYMODULE_ACL_LOG_DB = 4 (NOPERM database; rejection_context = NULL)
|
||||
* No additional subevent constants are needed. */
|
||||
|
||||
/* ValkeyModuleClientInfo flags.
|
||||
* Note: flags VALKEYMODULE_CLIENTINFO_FLAG_PRIMARY and below were added in Valkey 9.1 */
|
||||
#define VALKEYMODULE_CLIENTINFO_FLAG_SSL (1 << 0)
|
||||
@@ -848,6 +871,28 @@ typedef struct ValkeyModuleAtomicSlotMigrationInfo {
|
||||
|
||||
#define ValkeyModuleAtomicSlotMigrationInfo ValkeyModuleAtomicSlotMigrationInfoV1
|
||||
|
||||
#define VALKEYMODULE_COMMANDRESULTINFO_VERSION 1
|
||||
typedef struct ValkeyModuleCommandResultInfo {
|
||||
uint64_t version; /* Version of this structure for ABI compat. */
|
||||
const char *command_name; /* Command name (e.g., "SET", "GET"). */
|
||||
long long duration_us; /* Execution duration in microseconds. */
|
||||
long long dirty; /* Number of keys modified. */
|
||||
uint64_t client_id; /* Client ID that executed the command. */
|
||||
int is_module_client; /* 1 if command was from RM_Call, 0 otherwise. */
|
||||
int argc; /* Number of command arguments. */
|
||||
ValkeyModuleString **argv; /* Command arguments array (zero-copy). */
|
||||
const char *rejection_context; /* Context string; meaning depends on the event:
|
||||
* ValkeyModuleEvent_CommandResultRejected:
|
||||
* The full error reply string sent to the client
|
||||
* (e.g. "-OOM command not allowed when used memory > 'maxmemory'").
|
||||
* ValkeyModuleEvent_CommandResultACLRejected:
|
||||
* ACL_LOG_KEY (2): the denied key name.
|
||||
* ACL_LOG_CHANNEL (3): the denied channel name.
|
||||
* All other ACL subevents: NULL. */
|
||||
} ValkeyModuleCommandResultInfoV1;
|
||||
|
||||
#define ValkeyModuleCommandResultInfo ValkeyModuleCommandResultInfoV1
|
||||
|
||||
#define VALKEYMODULE_ATOMICSLOTMIGRATIONINFO_INITIALIZER_V1 {.version = 1}
|
||||
|
||||
typedef enum {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
# Build test modules
|
||||
list(APPEND MODULES_LIST "commandfilter")
|
||||
list(APPEND MODULES_LIST "commandresult")
|
||||
list(APPEND MODULES_LIST "basics")
|
||||
list(APPEND MODULES_LIST "testrdb")
|
||||
list(APPEND MODULES_LIST "fork")
|
||||
|
||||
@@ -25,6 +25,7 @@ endif
|
||||
|
||||
TEST_MODULES = \
|
||||
commandfilter.so \
|
||||
commandresult.so \
|
||||
basics.so \
|
||||
testrdb.so \
|
||||
fork.so \
|
||||
|
||||
@@ -0,0 +1,497 @@
|
||||
/* Test module for command result event API
|
||||
*
|
||||
* This module tests the VALKEYMODULE_EVENT_COMMAND_RESULT_SUCCESS,
|
||||
* VALKEYMODULE_EVENT_COMMAND_RESULT_FAILURE,
|
||||
* VALKEYMODULE_EVENT_COMMAND_RESULT_REJECTED, and
|
||||
* VALKEYMODULE_EVENT_COMMAND_RESULT_ACL_REJECTED server events.
|
||||
*
|
||||
* Commands provided:
|
||||
* - CMDRESULT.REGISTER <mode> - Register event subscription
|
||||
* (success/failure/rejected/acl_rejected/all)
|
||||
* - CMDRESULT.UNSUBSCRIBE - Unsubscribe from the event
|
||||
* - CMDRESULT.STATS - Get statistics about event invocations
|
||||
* - CMDRESULT.RESET - Reset statistics
|
||||
* - CMDRESULT.GETLOG [count] - Get the last N logged command results
|
||||
* - CMDRESULT.SUCCESS - A command that always succeeds
|
||||
* - CMDRESULT.FAIL - A command that always fails
|
||||
* - CMDRESULT.RMCALL <command> [args...] - Call a command via RM_Call
|
||||
*/
|
||||
|
||||
#include "valkeymodule.h"
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
|
||||
/* Statistics tracking */
|
||||
static struct {
|
||||
long long total_callbacks;
|
||||
long long success_count;
|
||||
long long failure_count;
|
||||
long long rejected_count;
|
||||
long long acl_denied_count;
|
||||
long long total_duration_us;
|
||||
long long total_dirty;
|
||||
} stats = {0};
|
||||
|
||||
/* Command result log entry */
|
||||
#define MAX_LOG_ENTRIES 100
|
||||
#define MAX_ARGV_LOG 10
|
||||
#define MAX_ARG_LEN 128
|
||||
|
||||
typedef struct {
|
||||
char command_name[64];
|
||||
int status; /* 0 = success, 1 = failure, 2 = acl_rejected, 3 = rejected */
|
||||
uint64_t subevent;
|
||||
long long duration;
|
||||
long long dirty;
|
||||
unsigned long long client_id;
|
||||
int is_module_client;
|
||||
int argc;
|
||||
char argv[MAX_ARGV_LOG][MAX_ARG_LEN];
|
||||
char rejection_context[MAX_ARG_LEN];
|
||||
} ResultLogEntry;
|
||||
|
||||
static ResultLogEntry result_log[MAX_LOG_ENTRIES];
|
||||
static int log_head = 0;
|
||||
static int log_count = 0;
|
||||
|
||||
/* Track subscription mode bitmask:
|
||||
* bit 0 = success, bit 1 = failure, bit 2 = rejected, bit 3 = acl_rejected */
|
||||
#define MODE_SUCCESS 0x1
|
||||
#define MODE_FAILURE 0x2
|
||||
#define MODE_REJECTED 0x4
|
||||
#define MODE_ACL_REJECTED 0x8
|
||||
static int subscription_mode = 0;
|
||||
|
||||
/* Add entry to circular log */
|
||||
void LogResult(const char *cmd_name, int status, uint64_t subevent,
|
||||
long long duration, long long dirty,
|
||||
unsigned long long client_id, int is_module_client,
|
||||
ValkeyModuleString **argv, int argc,
|
||||
const char *rejection_context) {
|
||||
ResultLogEntry *entry = &result_log[log_head];
|
||||
|
||||
strncpy(entry->command_name, cmd_name, sizeof(entry->command_name) - 1);
|
||||
entry->command_name[sizeof(entry->command_name) - 1] = '\0';
|
||||
entry->status = status;
|
||||
entry->subevent = subevent;
|
||||
entry->duration = duration;
|
||||
entry->dirty = dirty;
|
||||
entry->client_id = client_id;
|
||||
entry->is_module_client = is_module_client;
|
||||
|
||||
if (rejection_context) {
|
||||
strncpy(entry->rejection_context, rejection_context,
|
||||
sizeof(entry->rejection_context) - 1);
|
||||
entry->rejection_context[sizeof(entry->rejection_context) - 1] = '\0';
|
||||
} else {
|
||||
entry->rejection_context[0] = '\0';
|
||||
}
|
||||
|
||||
/* Store argv */
|
||||
if (argv && argc > 0) {
|
||||
entry->argc = (argc < MAX_ARGV_LOG) ? argc : MAX_ARGV_LOG;
|
||||
for (int i = 0; i < entry->argc; i++) {
|
||||
if (argv[i] == NULL) {
|
||||
strcpy(entry->argv[i], "(null)");
|
||||
continue;
|
||||
}
|
||||
size_t len;
|
||||
const char *arg = ValkeyModule_StringPtrLen(argv[i], &len);
|
||||
if (arg == NULL) {
|
||||
strcpy(entry->argv[i], "(empty)");
|
||||
continue;
|
||||
}
|
||||
size_t copy_len = (len < MAX_ARG_LEN - 1) ? len : MAX_ARG_LEN - 1;
|
||||
memcpy(entry->argv[i], arg, copy_len);
|
||||
entry->argv[i][copy_len] = '\0';
|
||||
}
|
||||
} else {
|
||||
entry->argc = 0;
|
||||
}
|
||||
|
||||
log_head = (log_head + 1) % MAX_LOG_ENTRIES;
|
||||
if (log_count < MAX_LOG_ENTRIES)
|
||||
log_count++;
|
||||
}
|
||||
|
||||
/* Command result event callback — handles success, failure, and rejected
|
||||
* events */
|
||||
void CommandResultEventCallback(ValkeyModuleCtx *ctx, ValkeyModuleEvent eid,
|
||||
uint64_t subevent, void *data) {
|
||||
VALKEYMODULE_NOT_USED(ctx);
|
||||
|
||||
ValkeyModuleCommandResultInfo *info = (ValkeyModuleCommandResultInfo *)data;
|
||||
|
||||
if (info->version != VALKEYMODULE_COMMANDRESULTINFO_VERSION)
|
||||
return;
|
||||
|
||||
stats.total_callbacks++;
|
||||
|
||||
int status;
|
||||
if (eid.id == VALKEYMODULE_EVENT_COMMAND_RESULT_ACL_REJECTED) {
|
||||
status = 2;
|
||||
stats.acl_denied_count++;
|
||||
} else if (eid.id == VALKEYMODULE_EVENT_COMMAND_RESULT_REJECTED) {
|
||||
status = 3;
|
||||
stats.rejected_count++;
|
||||
} else if (eid.id == VALKEYMODULE_EVENT_COMMAND_RESULT_FAILURE) {
|
||||
status = 1;
|
||||
stats.failure_count++;
|
||||
} else {
|
||||
status = 0;
|
||||
stats.success_count++;
|
||||
}
|
||||
|
||||
stats.total_duration_us += info->duration_us;
|
||||
stats.total_dirty += info->dirty;
|
||||
|
||||
LogResult(info->command_name ? info->command_name : "unknown", status,
|
||||
subevent, info->duration_us, info->dirty, info->client_id,
|
||||
info->is_module_client, info->argv, info->argc,
|
||||
info->rejection_context);
|
||||
}
|
||||
|
||||
/* CMDRESULT.REGISTER <mode>
|
||||
* Mode can be: "all", "success", "failure", "rejected"
|
||||
*/
|
||||
int CmdResultRegister_ValkeyCommand(ValkeyModuleCtx *ctx,
|
||||
ValkeyModuleString **argv, int argc) {
|
||||
if (argc != 2) {
|
||||
return ValkeyModule_WrongArity(ctx);
|
||||
}
|
||||
|
||||
if (subscription_mode != 0) {
|
||||
return ValkeyModule_ReplyWithError(
|
||||
ctx, "ERR already subscribed to command result events");
|
||||
}
|
||||
|
||||
size_t len;
|
||||
const char *mode_str = ValkeyModule_StringPtrLen(argv[1], &len);
|
||||
|
||||
int new_mode = 0;
|
||||
if (strcmp(mode_str, "all") == 0) {
|
||||
new_mode = MODE_SUCCESS | MODE_FAILURE | MODE_REJECTED | MODE_ACL_REJECTED;
|
||||
} else if (strcmp(mode_str, "success") == 0) {
|
||||
new_mode = MODE_SUCCESS;
|
||||
} else if (strcmp(mode_str, "failure") == 0) {
|
||||
new_mode = MODE_FAILURE;
|
||||
} else if (strcmp(mode_str, "rejected") == 0) {
|
||||
new_mode = MODE_REJECTED;
|
||||
} else if (strcmp(mode_str, "acl_rejected") == 0) {
|
||||
new_mode = MODE_ACL_REJECTED;
|
||||
} else {
|
||||
return ValkeyModule_ReplyWithError(ctx,
|
||||
"ERR invalid mode. Use: all, success, "
|
||||
"failure, rejected, or acl_rejected");
|
||||
}
|
||||
|
||||
if ((new_mode & MODE_SUCCESS) &&
|
||||
ValkeyModule_SubscribeToServerEvent(
|
||||
ctx, ValkeyModuleEvent_CommandResultSuccess,
|
||||
CommandResultEventCallback) == VALKEYMODULE_ERR) {
|
||||
return ValkeyModule_ReplyWithError(
|
||||
ctx, "ERR failed to subscribe to success event");
|
||||
}
|
||||
|
||||
if ((new_mode & MODE_FAILURE) &&
|
||||
ValkeyModule_SubscribeToServerEvent(
|
||||
ctx, ValkeyModuleEvent_CommandResultFailure,
|
||||
CommandResultEventCallback) == VALKEYMODULE_ERR) {
|
||||
if (new_mode & MODE_SUCCESS)
|
||||
ValkeyModule_SubscribeToServerEvent(
|
||||
ctx, ValkeyModuleEvent_CommandResultSuccess, NULL);
|
||||
return ValkeyModule_ReplyWithError(
|
||||
ctx, "ERR failed to subscribe to failure event");
|
||||
}
|
||||
|
||||
if ((new_mode & MODE_REJECTED) &&
|
||||
ValkeyModule_SubscribeToServerEvent(
|
||||
ctx, ValkeyModuleEvent_CommandResultRejected,
|
||||
CommandResultEventCallback) == VALKEYMODULE_ERR) {
|
||||
if (new_mode & MODE_SUCCESS)
|
||||
ValkeyModule_SubscribeToServerEvent(
|
||||
ctx, ValkeyModuleEvent_CommandResultSuccess, NULL);
|
||||
if (new_mode & MODE_FAILURE)
|
||||
ValkeyModule_SubscribeToServerEvent(
|
||||
ctx, ValkeyModuleEvent_CommandResultFailure, NULL);
|
||||
return ValkeyModule_ReplyWithError(
|
||||
ctx, "ERR failed to subscribe to rejected event");
|
||||
}
|
||||
|
||||
if ((new_mode & MODE_ACL_REJECTED) &&
|
||||
ValkeyModule_SubscribeToServerEvent(
|
||||
ctx, ValkeyModuleEvent_CommandResultACLRejected,
|
||||
CommandResultEventCallback) == VALKEYMODULE_ERR) {
|
||||
if (new_mode & MODE_SUCCESS)
|
||||
ValkeyModule_SubscribeToServerEvent(
|
||||
ctx, ValkeyModuleEvent_CommandResultSuccess, NULL);
|
||||
if (new_mode & MODE_FAILURE)
|
||||
ValkeyModule_SubscribeToServerEvent(
|
||||
ctx, ValkeyModuleEvent_CommandResultFailure, NULL);
|
||||
if (new_mode & MODE_REJECTED)
|
||||
ValkeyModule_SubscribeToServerEvent(
|
||||
ctx, ValkeyModuleEvent_CommandResultRejected, NULL);
|
||||
return ValkeyModule_ReplyWithError(
|
||||
ctx, "ERR failed to subscribe to acl_rejected event");
|
||||
}
|
||||
|
||||
subscription_mode = new_mode;
|
||||
return ValkeyModule_ReplyWithSimpleString(ctx, "OK");
|
||||
}
|
||||
|
||||
/* CMDRESULT.UNSUBSCRIBE */
|
||||
int CmdResultUnsubscribe_ValkeyCommand(ValkeyModuleCtx *ctx,
|
||||
ValkeyModuleString **argv, int argc) {
|
||||
VALKEYMODULE_NOT_USED(argv);
|
||||
|
||||
if (argc != 1) {
|
||||
return ValkeyModule_WrongArity(ctx);
|
||||
}
|
||||
|
||||
if (subscription_mode == 0) {
|
||||
return ValkeyModule_ReplyWithError(
|
||||
ctx, "ERR not subscribed to command result events");
|
||||
}
|
||||
|
||||
if (subscription_mode & MODE_SUCCESS)
|
||||
ValkeyModule_SubscribeToServerEvent(
|
||||
ctx, ValkeyModuleEvent_CommandResultSuccess, NULL);
|
||||
if (subscription_mode & MODE_FAILURE)
|
||||
ValkeyModule_SubscribeToServerEvent(
|
||||
ctx, ValkeyModuleEvent_CommandResultFailure, NULL);
|
||||
if (subscription_mode & MODE_REJECTED)
|
||||
ValkeyModule_SubscribeToServerEvent(
|
||||
ctx, ValkeyModuleEvent_CommandResultRejected, NULL);
|
||||
if (subscription_mode & MODE_ACL_REJECTED)
|
||||
ValkeyModule_SubscribeToServerEvent(
|
||||
ctx, ValkeyModuleEvent_CommandResultACLRejected, NULL);
|
||||
|
||||
subscription_mode = 0;
|
||||
return ValkeyModule_ReplyWithSimpleString(ctx, "OK");
|
||||
}
|
||||
|
||||
/* CMDRESULT.STATS
|
||||
* Returns: total_callbacks, success_count, failure_count, rejected_count,
|
||||
* total_duration_us, total_dirty
|
||||
*/
|
||||
int CmdResultStats_ValkeyCommand(ValkeyModuleCtx *ctx,
|
||||
ValkeyModuleString **argv, int argc) {
|
||||
VALKEYMODULE_NOT_USED(argv);
|
||||
|
||||
if (argc != 1) {
|
||||
return ValkeyModule_WrongArity(ctx);
|
||||
}
|
||||
|
||||
ValkeyModule_ReplyWithArray(ctx, 14);
|
||||
ValkeyModule_ReplyWithSimpleString(ctx, "total_callbacks");
|
||||
ValkeyModule_ReplyWithLongLong(ctx, stats.total_callbacks);
|
||||
ValkeyModule_ReplyWithSimpleString(ctx, "success_count");
|
||||
ValkeyModule_ReplyWithLongLong(ctx, stats.success_count);
|
||||
ValkeyModule_ReplyWithSimpleString(ctx, "failure_count");
|
||||
ValkeyModule_ReplyWithLongLong(ctx, stats.failure_count);
|
||||
ValkeyModule_ReplyWithSimpleString(ctx, "rejected_count");
|
||||
ValkeyModule_ReplyWithLongLong(ctx, stats.rejected_count);
|
||||
ValkeyModule_ReplyWithSimpleString(ctx, "acl_denied_count");
|
||||
ValkeyModule_ReplyWithLongLong(ctx, stats.acl_denied_count);
|
||||
ValkeyModule_ReplyWithSimpleString(ctx, "total_duration_us");
|
||||
ValkeyModule_ReplyWithLongLong(ctx, stats.total_duration_us);
|
||||
ValkeyModule_ReplyWithSimpleString(ctx, "total_dirty");
|
||||
ValkeyModule_ReplyWithLongLong(ctx, stats.total_dirty);
|
||||
|
||||
return VALKEYMODULE_OK;
|
||||
}
|
||||
|
||||
/* CMDRESULT.RESET */
|
||||
int CmdResultReset_ValkeyCommand(ValkeyModuleCtx *ctx,
|
||||
ValkeyModuleString **argv, int argc) {
|
||||
VALKEYMODULE_NOT_USED(argv);
|
||||
|
||||
if (argc != 1) {
|
||||
return ValkeyModule_WrongArity(ctx);
|
||||
}
|
||||
|
||||
stats.total_callbacks = 0;
|
||||
stats.success_count = 0;
|
||||
stats.failure_count = 0;
|
||||
stats.rejected_count = 0;
|
||||
stats.acl_denied_count = 0;
|
||||
stats.total_duration_us = 0;
|
||||
stats.total_dirty = 0;
|
||||
|
||||
log_head = 0;
|
||||
log_count = 0;
|
||||
|
||||
return ValkeyModule_ReplyWithSimpleString(ctx, "OK");
|
||||
}
|
||||
|
||||
/* CMDRESULT.GETLOG [count]
|
||||
* Returns the last N command results from the log
|
||||
*/
|
||||
int CmdResultGetLog_ValkeyCommand(ValkeyModuleCtx *ctx,
|
||||
ValkeyModuleString **argv, int argc) {
|
||||
if (argc > 2) {
|
||||
return ValkeyModule_WrongArity(ctx);
|
||||
}
|
||||
|
||||
long long count = log_count;
|
||||
if (argc == 2) {
|
||||
if (ValkeyModule_StringToLongLong(argv[1], &count) != VALKEYMODULE_OK) {
|
||||
return ValkeyModule_ReplyWithError(ctx, "ERR invalid count");
|
||||
}
|
||||
if (count < 0)
|
||||
count = 0;
|
||||
if (count > log_count)
|
||||
count = log_count;
|
||||
}
|
||||
|
||||
ValkeyModule_ReplyWithArray(ctx, count);
|
||||
|
||||
/* Get entries from newest to oldest */
|
||||
for (int i = 0; i < count; i++) {
|
||||
int idx = (log_head - 1 - i + MAX_LOG_ENTRIES) % MAX_LOG_ENTRIES;
|
||||
ResultLogEntry *entry = &result_log[idx];
|
||||
|
||||
const char *status_str;
|
||||
if (entry->status == 3)
|
||||
status_str = "rejected";
|
||||
else if (entry->status == 2)
|
||||
status_str = "acl_rejected";
|
||||
else if (entry->status == 1)
|
||||
status_str = "failure";
|
||||
else
|
||||
status_str = "success";
|
||||
|
||||
ValkeyModule_ReplyWithArray(ctx, 18);
|
||||
ValkeyModule_ReplyWithSimpleString(ctx, "command");
|
||||
ValkeyModule_ReplyWithCString(ctx, entry->command_name);
|
||||
ValkeyModule_ReplyWithSimpleString(ctx, "status");
|
||||
ValkeyModule_ReplyWithCString(ctx, status_str);
|
||||
ValkeyModule_ReplyWithSimpleString(ctx, "duration_us");
|
||||
ValkeyModule_ReplyWithLongLong(ctx, entry->duration);
|
||||
ValkeyModule_ReplyWithSimpleString(ctx, "dirty");
|
||||
ValkeyModule_ReplyWithLongLong(ctx, entry->dirty);
|
||||
ValkeyModule_ReplyWithSimpleString(ctx, "client_id");
|
||||
ValkeyModule_ReplyWithLongLong(ctx, entry->client_id);
|
||||
ValkeyModule_ReplyWithSimpleString(ctx, "is_module_client");
|
||||
ValkeyModule_ReplyWithLongLong(ctx, entry->is_module_client);
|
||||
ValkeyModule_ReplyWithSimpleString(ctx, "subevent");
|
||||
ValkeyModule_ReplyWithLongLong(ctx, entry->subevent);
|
||||
ValkeyModule_ReplyWithSimpleString(ctx, "rejection_context");
|
||||
ValkeyModule_ReplyWithCString(ctx, entry->rejection_context);
|
||||
ValkeyModule_ReplyWithSimpleString(ctx, "argv");
|
||||
ValkeyModule_ReplyWithArray(ctx, entry->argc);
|
||||
for (int j = 0; j < entry->argc; j++) {
|
||||
ValkeyModule_ReplyWithCString(ctx, entry->argv[j]);
|
||||
}
|
||||
}
|
||||
|
||||
return VALKEYMODULE_OK;
|
||||
}
|
||||
|
||||
/* CMDRESULT.SUCCESS
|
||||
* A command that always succeeds
|
||||
*/
|
||||
int CmdResultSuccess_ValkeyCommand(ValkeyModuleCtx *ctx,
|
||||
ValkeyModuleString **argv, int argc) {
|
||||
VALKEYMODULE_NOT_USED(argv);
|
||||
VALKEYMODULE_NOT_USED(argc);
|
||||
|
||||
return ValkeyModule_ReplyWithSimpleString(ctx, "OK");
|
||||
}
|
||||
|
||||
/* CMDRESULT.FAIL
|
||||
* A command that always fails
|
||||
*/
|
||||
int CmdResultFail_ValkeyCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
|
||||
int argc) {
|
||||
VALKEYMODULE_NOT_USED(argv);
|
||||
VALKEYMODULE_NOT_USED(argc);
|
||||
|
||||
return ValkeyModule_ReplyWithError(ctx, "ERR intentional failure");
|
||||
}
|
||||
|
||||
/* CMDRESULT.RMCALL <command> [args...]
|
||||
* Test calling a command via RM_Call - allows testing is_module_client
|
||||
* detection
|
||||
*/
|
||||
int CmdResultRMCall_ValkeyCommand(ValkeyModuleCtx *ctx,
|
||||
ValkeyModuleString **argv, int argc) {
|
||||
if (argc < 2) {
|
||||
return ValkeyModule_WrongArity(ctx);
|
||||
}
|
||||
|
||||
/* Call the command via RM_Call */
|
||||
ValkeyModuleCallReply *reply = ValkeyModule_Call(
|
||||
ctx, ValkeyModule_StringPtrLen(argv[1], NULL), "v", argv + 2, argc - 2);
|
||||
|
||||
if (!reply) {
|
||||
return ValkeyModule_ReplyWithError(ctx, "ERR call failed");
|
||||
}
|
||||
|
||||
/* Forward the reply */
|
||||
ValkeyModule_ReplyWithCallReply(ctx, reply);
|
||||
ValkeyModule_FreeCallReply(reply);
|
||||
|
||||
return VALKEYMODULE_OK;
|
||||
}
|
||||
|
||||
int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv,
|
||||
int argc) {
|
||||
VALKEYMODULE_NOT_USED(argv);
|
||||
VALKEYMODULE_NOT_USED(argc);
|
||||
|
||||
if (ValkeyModule_Init(ctx, "commandresult", 1, VALKEYMODULE_APIVER_1) ==
|
||||
VALKEYMODULE_ERR) {
|
||||
return VALKEYMODULE_ERR;
|
||||
}
|
||||
|
||||
if (ValkeyModule_CreateCommand(ctx, "cmdresult.register",
|
||||
CmdResultRegister_ValkeyCommand, "admin", 0, 0,
|
||||
0) == VALKEYMODULE_ERR) {
|
||||
return VALKEYMODULE_ERR;
|
||||
}
|
||||
|
||||
if (ValkeyModule_CreateCommand(ctx, "cmdresult.unsubscribe",
|
||||
CmdResultUnsubscribe_ValkeyCommand, "admin", 0,
|
||||
0, 0) == VALKEYMODULE_ERR) {
|
||||
return VALKEYMODULE_ERR;
|
||||
}
|
||||
|
||||
if (ValkeyModule_CreateCommand(ctx, "cmdresult.stats",
|
||||
CmdResultStats_ValkeyCommand, "readonly", 0, 0,
|
||||
0) == VALKEYMODULE_ERR) {
|
||||
return VALKEYMODULE_ERR;
|
||||
}
|
||||
|
||||
if (ValkeyModule_CreateCommand(ctx, "cmdresult.reset",
|
||||
CmdResultReset_ValkeyCommand, "admin", 0, 0,
|
||||
0) == VALKEYMODULE_ERR) {
|
||||
return VALKEYMODULE_ERR;
|
||||
}
|
||||
|
||||
if (ValkeyModule_CreateCommand(ctx, "cmdresult.getlog",
|
||||
CmdResultGetLog_ValkeyCommand, "readonly", 0,
|
||||
0, 0) == VALKEYMODULE_ERR) {
|
||||
return VALKEYMODULE_ERR;
|
||||
}
|
||||
|
||||
if (ValkeyModule_CreateCommand(ctx, "cmdresult.success",
|
||||
CmdResultSuccess_ValkeyCommand, "readonly", 0,
|
||||
0, 0) == VALKEYMODULE_ERR) {
|
||||
return VALKEYMODULE_ERR;
|
||||
}
|
||||
|
||||
if (ValkeyModule_CreateCommand(ctx, "cmdresult.fail",
|
||||
CmdResultFail_ValkeyCommand, "readonly", 0, 0,
|
||||
0) == VALKEYMODULE_ERR) {
|
||||
return VALKEYMODULE_ERR;
|
||||
}
|
||||
|
||||
if (ValkeyModule_CreateCommand(ctx, "cmdresult.rmcall",
|
||||
CmdResultRMCall_ValkeyCommand, "readonly", 0,
|
||||
0, 0) == VALKEYMODULE_ERR) {
|
||||
return VALKEYMODULE_ERR;
|
||||
}
|
||||
|
||||
return VALKEYMODULE_OK;
|
||||
}
|
||||
@@ -0,0 +1,703 @@
|
||||
set testmodule [file normalize tests/modules/commandresult.so]
|
||||
|
||||
start_server {tags {"modules"}} {
|
||||
r module load $testmodule
|
||||
|
||||
# Helper to ensure cleanup between tests
|
||||
proc cleanup_callback {} {
|
||||
catch {r cmdresult.unsubscribe}
|
||||
r cmdresult.reset
|
||||
}
|
||||
|
||||
test {Module commandresult - Subscribe to all command result events} {
|
||||
cleanup_callback
|
||||
r cmdresult.register all
|
||||
|
||||
# Execute some commands
|
||||
r cmdresult.success
|
||||
r ping
|
||||
catch {r cmdresult.fail} e
|
||||
|
||||
# Check stats
|
||||
set stats [r cmdresult.stats]
|
||||
assert {[dict get $stats total_callbacks] >= 3}
|
||||
assert {[dict get $stats success_count] >= 2}
|
||||
assert {[dict get $stats failure_count] >= 1}
|
||||
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - Subscribe to success events only} {
|
||||
cleanup_callback
|
||||
r cmdresult.register success
|
||||
|
||||
# Execute successful and failing commands
|
||||
r cmdresult.success
|
||||
r ping
|
||||
r cmdresult.success
|
||||
catch {r cmdresult.fail} e
|
||||
catch {r cmdresult.fail} e
|
||||
|
||||
# With success-only subscription, only success events are received
|
||||
set stats [r cmdresult.stats]
|
||||
assert {[dict get $stats success_count] >= 3}
|
||||
# Failures should NOT be tracked since we only subscribed to success
|
||||
assert_equal [dict get $stats failure_count] 0
|
||||
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - Subscribe to failure events only} {
|
||||
cleanup_callback
|
||||
r cmdresult.register failure
|
||||
|
||||
# Execute successful and failing commands
|
||||
r cmdresult.success
|
||||
r ping
|
||||
r cmdresult.success
|
||||
catch {r cmdresult.fail} e
|
||||
catch {r cmdresult.fail} e
|
||||
|
||||
# With failure-only subscription, only failure events are received
|
||||
set stats [r cmdresult.stats]
|
||||
assert_equal [dict get $stats failure_count] 2
|
||||
# Successes should NOT be tracked since we only subscribed to failure
|
||||
assert_equal [dict get $stats success_count] 0
|
||||
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - Callback tracks duration} {
|
||||
cleanup_callback
|
||||
r cmdresult.register all
|
||||
|
||||
r cmdresult.success
|
||||
r ping
|
||||
|
||||
set stats [r cmdresult.stats]
|
||||
# Duration should be > 0 microseconds
|
||||
assert {[dict get $stats total_duration_us] > 0}
|
||||
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - Callback tracks dirty keys} {
|
||||
cleanup_callback
|
||||
r cmdresult.register all
|
||||
|
||||
# This command modifies a key
|
||||
r SET ss 3
|
||||
|
||||
set stats [r cmdresult.stats]
|
||||
# Should have at least 1 dirty key
|
||||
assert {[dict get $stats total_dirty] >= 1}
|
||||
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - Get command log} {
|
||||
cleanup_callback
|
||||
r cmdresult.register all
|
||||
|
||||
r cmdresult.success
|
||||
catch {r cmdresult.fail} e
|
||||
r ping
|
||||
|
||||
set log [r cmdresult.getlog 3]
|
||||
assert_equal [llength $log] 3
|
||||
|
||||
# Check first entry (most recent - ping)
|
||||
set entry [lindex $log 0]
|
||||
assert {[dict get $entry command] eq "ping"}
|
||||
assert {[dict get $entry status] eq "success"}
|
||||
|
||||
# Check second entry (cmdresult.fail)
|
||||
set entry [lindex $log 1]
|
||||
assert {[dict get $entry command] eq "cmdresult.fail"}
|
||||
assert {[dict get $entry status] eq "failure"}
|
||||
|
||||
# Check third entry (cmdresult.success)
|
||||
set entry [lindex $log 2]
|
||||
assert {[dict get $entry command] eq "cmdresult.success"}
|
||||
assert {[dict get $entry status] eq "success"}
|
||||
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - Client ID is captured} {
|
||||
cleanup_callback
|
||||
r cmdresult.register all
|
||||
|
||||
r cmdresult.success
|
||||
|
||||
set log [r cmdresult.getlog 1]
|
||||
set entry [lindex $log 0]
|
||||
# Client ID should be a positive integer
|
||||
assert {[dict get $entry client_id] > 0}
|
||||
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - is_module_client detection} {
|
||||
cleanup_callback
|
||||
r cmdresult.register all
|
||||
|
||||
# Direct command - should NOT be from module client
|
||||
r ping
|
||||
|
||||
set log [r cmdresult.getlog 1]
|
||||
set entry [lindex $log 0]
|
||||
assert_equal [dict get $entry is_module_client] 0
|
||||
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - RM_Call shows is_module_client=1} {
|
||||
cleanup_callback
|
||||
r cmdresult.register all
|
||||
|
||||
# This command calls PING via RM_Call
|
||||
r cmdresult.rmcall ping
|
||||
|
||||
set log [r cmdresult.getlog 2]
|
||||
# The inner ping should have is_module_client=1
|
||||
set ping_entry [lindex $log 1]
|
||||
assert {[dict get $ping_entry command] eq "ping"}
|
||||
assert_equal [dict get $ping_entry is_module_client] 1
|
||||
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - Unsubscribe} {
|
||||
cleanup_callback
|
||||
r cmdresult.register all
|
||||
|
||||
r cmdresult.success
|
||||
# Unsubscribe also triggers a callback before unsubscribing
|
||||
r cmdresult.unsubscribe
|
||||
|
||||
# After unsubscribe, new commands shouldn't trigger callbacks
|
||||
r cmdresult.success
|
||||
r ping
|
||||
|
||||
set stats [r cmdresult.stats]
|
||||
# Should have 2 callbacks (cmdresult.success + cmdresult.unsubscribe)
|
||||
assert_equal [dict get $stats total_callbacks] 2
|
||||
|
||||
# Trying to unsubscribe again should fail
|
||||
catch {r cmdresult.unsubscribe} err
|
||||
assert_match {*not subscribed*} $err
|
||||
}
|
||||
|
||||
test {Module commandresult - Cannot subscribe twice} {
|
||||
cleanup_callback
|
||||
r cmdresult.register all
|
||||
|
||||
# Trying to subscribe again should fail
|
||||
catch {r cmdresult.register all} err
|
||||
assert_match {*already subscribed*} $err
|
||||
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - Reset clears stats and log} {
|
||||
cleanup_callback
|
||||
r cmdresult.register all
|
||||
|
||||
r cmdresult.success
|
||||
r ping
|
||||
catch {r cmdresult.fail} e
|
||||
|
||||
# Verify we have stats
|
||||
set stats [r cmdresult.stats]
|
||||
assert {[dict get $stats total_callbacks] > 0}
|
||||
|
||||
# Reset should clear everything
|
||||
cleanup_callback
|
||||
|
||||
set stats [r cmdresult.stats]
|
||||
assert_equal [dict get $stats total_callbacks] 0
|
||||
assert_equal [dict get $stats success_count] 0
|
||||
assert_equal [dict get $stats failure_count] 0
|
||||
assert_equal [dict get $stats rejected_count] 0
|
||||
assert_equal [dict get $stats acl_denied_count] 0
|
||||
|
||||
set log [r cmdresult.getlog]
|
||||
assert_equal [llength $log] 0
|
||||
}
|
||||
|
||||
test {Module commandresult - Invalid mode returns error} {
|
||||
cleanup_callback
|
||||
|
||||
catch {r cmdresult.register invalid_mode} err
|
||||
assert_match {*invalid mode*} $err
|
||||
}
|
||||
|
||||
test {Module commandresult - Command name is captured correctly} {
|
||||
cleanup_callback
|
||||
r cmdresult.register all
|
||||
|
||||
r cmdresult.success
|
||||
r set mykey myvalue
|
||||
r get mykey
|
||||
|
||||
set log [r cmdresult.getlog 3]
|
||||
|
||||
# Check that command names are correct
|
||||
set commands [list]
|
||||
foreach entry $log {
|
||||
lappend commands [dict get $entry command]
|
||||
}
|
||||
|
||||
assert {[lsearch $commands "get"] >= 0}
|
||||
assert {[lsearch $commands "set"] >= 0}
|
||||
assert {[lsearch $commands "cmdresult.success"] >= 0}
|
||||
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - Unload with active subscription} {
|
||||
cleanup_callback
|
||||
r cmdresult.register all
|
||||
|
||||
# Execute some commands to ensure callback is active
|
||||
r cmdresult.success
|
||||
r ping
|
||||
|
||||
set stats [r cmdresult.stats]
|
||||
assert {[dict get $stats total_callbacks] >= 2}
|
||||
|
||||
# Unload module while subscription is still active
|
||||
assert_equal {OK} [r module unload commandresult]
|
||||
|
||||
# Reload module for remaining tests
|
||||
r module load $testmodule
|
||||
}
|
||||
|
||||
test {Module commandresult - Multiple callbacks from different operations} {
|
||||
cleanup_callback
|
||||
r cmdresult.register all
|
||||
|
||||
# Test callbacks from various sources
|
||||
r set testkey testvalue ;# Built-in command
|
||||
r get testkey ;# Built-in command
|
||||
r cmdresult.success ;# Module command
|
||||
catch {r cmdresult.fail} e ;# Failing module command
|
||||
|
||||
set stats [r cmdresult.stats]
|
||||
# Should have at least 4 callbacks
|
||||
assert {[dict get $stats total_callbacks] >= 4}
|
||||
assert {[dict get $stats success_count] >= 3}
|
||||
assert {[dict get $stats failure_count] >= 1}
|
||||
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - Empty subscription optimization} {
|
||||
cleanup_callback
|
||||
# No subscription - this tests early return when no modules subscribed
|
||||
|
||||
# Execute commands without any subscriptions
|
||||
r cmdresult.success
|
||||
r ping
|
||||
|
||||
# Verify no callbacks were fired
|
||||
set stats [r cmdresult.stats]
|
||||
assert_equal [dict get $stats total_callbacks] 0
|
||||
}
|
||||
|
||||
test {Module commandresult - Duration is always positive} {
|
||||
cleanup_callback
|
||||
r cmdresult.register all
|
||||
|
||||
# Execute a command
|
||||
r cmdresult.success
|
||||
|
||||
set log [r cmdresult.getlog 1]
|
||||
set entry [lindex $log 0]
|
||||
|
||||
# Duration should be >= 0 microseconds
|
||||
assert {[dict get $entry duration_us] >= 0}
|
||||
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - RM_Call creates nested command execution} {
|
||||
cleanup_callback
|
||||
r cmdresult.register all
|
||||
|
||||
# cmdresult.rmcall calls PING via RM_Call
|
||||
# Both the wrapper and inner command should be tracked
|
||||
r cmdresult.rmcall ping
|
||||
|
||||
set stats [r cmdresult.stats]
|
||||
# Should see both cmdresult.rmcall and ping callbacks
|
||||
assert {[dict get $stats total_callbacks] >= 2}
|
||||
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - argv captures command arguments} {
|
||||
cleanup_callback
|
||||
r cmdresult.register all
|
||||
|
||||
# Execute a command with arguments
|
||||
r set mykey myvalue
|
||||
|
||||
set log [r cmdresult.getlog 1]
|
||||
set entry [lindex $log 0]
|
||||
|
||||
# Check argv is captured
|
||||
set argv [dict get $entry argv]
|
||||
assert_equal [lindex $argv 0] "set"
|
||||
assert_equal [lindex $argv 1] "mykey"
|
||||
assert_equal [lindex $argv 2] "myvalue"
|
||||
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - argv captures multi-argument commands} {
|
||||
cleanup_callback
|
||||
r cmdresult.register all
|
||||
|
||||
# Execute MSET with multiple key-value pairs
|
||||
r mset key1 val1 key2 val2 key3 val3
|
||||
|
||||
set log [r cmdresult.getlog 1]
|
||||
set entry [lindex $log 0]
|
||||
|
||||
set argv [dict get $entry argv]
|
||||
assert_equal [lindex $argv 0] "mset"
|
||||
assert_equal [lindex $argv 1] "key1"
|
||||
assert_equal [lindex $argv 2] "val1"
|
||||
assert_equal [lindex $argv 3] "key2"
|
||||
assert_equal [lindex $argv 4] "val2"
|
||||
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - argv with command rewriting} {
|
||||
cleanup_callback
|
||||
r cmdresult.register all
|
||||
|
||||
# EXPIRE gets rewritten to PEXPIREAT internally
|
||||
# But we should see original argv (EXPIRE)
|
||||
r set rewritekey "value"
|
||||
r expire rewritekey 100
|
||||
|
||||
set log [r cmdresult.getlog 1]
|
||||
set entry [lindex $log 0]
|
||||
|
||||
set argv [dict get $entry argv]
|
||||
# Should see original command, not rewritten one
|
||||
assert_equal [string tolower [lindex $argv 0]] "expire"
|
||||
assert_equal [lindex $argv 1] "rewritekey"
|
||||
assert_equal [lindex $argv 2] "100"
|
||||
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - High volume command tracking} {
|
||||
cleanup_callback
|
||||
r cmdresult.register all
|
||||
|
||||
# Run many commands
|
||||
for {set i 0} {$i < 100} {incr i} {
|
||||
r ping
|
||||
}
|
||||
|
||||
set stats [r cmdresult.stats]
|
||||
assert {[dict get $stats total_callbacks] >= 100}
|
||||
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - acl_rejected: command not permitted (ACL_DENIED_CMD)} {
|
||||
cleanup_callback
|
||||
r cmdresult.register acl_rejected
|
||||
|
||||
# Create a user with no command permissions and authenticate
|
||||
r acl setuser testuser_cmd on >testpass nocommands
|
||||
set rd [valkey_deferring_client]
|
||||
$rd auth testuser_cmd testpass
|
||||
$rd read
|
||||
$rd get somekey
|
||||
catch {$rd read} e
|
||||
$rd close
|
||||
|
||||
set stats [r cmdresult.stats]
|
||||
assert {[dict get $stats acl_denied_count] >= 1}
|
||||
assert_equal [dict get $stats success_count] 0
|
||||
assert_equal [dict get $stats failure_count] 0
|
||||
assert_equal [dict get $stats rejected_count] 0
|
||||
|
||||
set log [r cmdresult.getlog 1]
|
||||
set entry [lindex $log 0]
|
||||
assert_equal [dict get $entry status] "acl_rejected"
|
||||
# VALKEYMODULE_ACL_LOG_CMD = 1
|
||||
assert_equal [dict get $entry subevent] 1
|
||||
assert_equal [dict get $entry rejection_context] ""
|
||||
|
||||
r acl deluser testuser_cmd
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - acl_rejected: key pattern not permitted (ACL_DENIED_KEY)} {
|
||||
cleanup_callback
|
||||
r cmdresult.register acl_rejected
|
||||
|
||||
# Create a user allowed to run GET but only on keys matching "allowed:*"
|
||||
r acl setuser testuser_key on >testpass allcommands ~allowed:* nopass
|
||||
set rd [valkey_deferring_client]
|
||||
$rd auth testuser_key testpass
|
||||
$rd read
|
||||
$rd get denied_key
|
||||
catch {$rd read} e
|
||||
$rd close
|
||||
|
||||
set stats [r cmdresult.stats]
|
||||
assert {[dict get $stats acl_denied_count] >= 1}
|
||||
|
||||
set log [r cmdresult.getlog 1]
|
||||
set entry [lindex $log 0]
|
||||
assert_equal [dict get $entry status] "acl_rejected"
|
||||
# VALKEYMODULE_ACL_LOG_KEY = 2
|
||||
assert_equal [dict get $entry subevent] 2
|
||||
assert_equal [dict get $entry rejection_context] "denied_key"
|
||||
|
||||
r acl deluser testuser_key
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - acl_rejected: channel not permitted (ACL_DENIED_CHANNEL)} {
|
||||
cleanup_callback
|
||||
r cmdresult.register acl_rejected
|
||||
|
||||
# Create a user with allcommands but no pub/sub channel access
|
||||
r acl setuser testuser_chan on >testpass allcommands allkeys resetchannels nopass
|
||||
set rd [valkey_deferring_client]
|
||||
$rd auth testuser_chan testpass
|
||||
$rd read
|
||||
$rd subscribe secret_channel
|
||||
catch {$rd read} e
|
||||
$rd close
|
||||
|
||||
set stats [r cmdresult.stats]
|
||||
assert {[dict get $stats acl_denied_count] >= 1}
|
||||
|
||||
set log [r cmdresult.getlog 1]
|
||||
set entry [lindex $log 0]
|
||||
assert_equal [dict get $entry status] "acl_rejected"
|
||||
# VALKEYMODULE_ACL_LOG_CHANNEL = 3
|
||||
assert_equal [dict get $entry subevent] 3
|
||||
assert_equal [dict get $entry rejection_context] "secret_channel"
|
||||
|
||||
r acl deluser testuser_chan
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - acl_rejected events not fired when not subscribed} {
|
||||
cleanup_callback
|
||||
r cmdresult.register failure
|
||||
|
||||
r acl setuser testuser_nosub on >testpass nocommands nopass
|
||||
set rd [valkey_deferring_client]
|
||||
$rd auth testuser_nosub testpass
|
||||
$rd read
|
||||
$rd get somekey
|
||||
catch {$rd read} e
|
||||
$rd close
|
||||
|
||||
set stats [r cmdresult.stats]
|
||||
assert_equal [dict get $stats acl_denied_count] 0
|
||||
assert_equal [dict get $stats rejected_count] 0
|
||||
|
||||
r acl deluser testuser_nosub
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - acl_rejected: unauthenticated command (NOAUTH)} {
|
||||
cleanup_callback
|
||||
r cmdresult.register acl_rejected
|
||||
|
||||
# Enable password so new connections require authentication
|
||||
r config set requirepass testpass
|
||||
|
||||
# Open a raw unauthenticated connection and send a command without AUTH
|
||||
set rd [valkey_deferring_client_by_addr [srv 0 host] [srv 0 port]]
|
||||
$rd get somekey
|
||||
catch {$rd read} e
|
||||
$rd close
|
||||
|
||||
# Restore: the existing r session stays authenticated; just clear the password
|
||||
r config set requirepass ""
|
||||
|
||||
set stats [r cmdresult.stats]
|
||||
assert {[dict get $stats acl_denied_count] >= 1}
|
||||
|
||||
set log [r cmdresult.getlog 1]
|
||||
set entry [lindex $log 0]
|
||||
assert_equal [dict get $entry status] "acl_rejected"
|
||||
# VALKEYMODULE_ACL_LOG_AUTH = 0
|
||||
assert_equal [dict get $entry subevent] 0
|
||||
assert_equal [dict get $entry rejection_context] ""
|
||||
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - Reset clears acl_denied_count} {
|
||||
cleanup_callback
|
||||
r cmdresult.register acl_rejected
|
||||
|
||||
r acl setuser testuser_reset on >testpass nocommands nopass
|
||||
set rd [valkey_deferring_client]
|
||||
$rd auth testuser_reset testpass
|
||||
$rd read
|
||||
$rd get somekey
|
||||
catch {$rd read} e
|
||||
$rd close
|
||||
|
||||
set stats [r cmdresult.stats]
|
||||
assert {[dict get $stats acl_denied_count] >= 1}
|
||||
|
||||
r cmdresult.reset
|
||||
set stats [r cmdresult.stats]
|
||||
assert_equal [dict get $stats acl_denied_count] 0
|
||||
assert_equal [dict get $stats rejected_count] 0
|
||||
assert_equal [dict get $stats total_callbacks] 0
|
||||
|
||||
r acl deluser testuser_reset
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - rejected: unknown command (UNKNOWNCMD)} {
|
||||
cleanup_callback
|
||||
r cmdresult.register rejected
|
||||
|
||||
catch {r thisdoesnotexist} e
|
||||
|
||||
set stats [r cmdresult.stats]
|
||||
assert {[dict get $stats rejected_count] >= 1}
|
||||
|
||||
set log [r cmdresult.getlog 1]
|
||||
set entry [lindex $log 0]
|
||||
assert_equal [dict get $entry status] "rejected"
|
||||
assert_match {*unknown*command*} [string tolower [dict get $entry rejection_context]]
|
||||
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - rejected: wrong number of arguments (WRONGARITY)} {
|
||||
cleanup_callback
|
||||
r cmdresult.register rejected
|
||||
|
||||
catch {r set} e
|
||||
|
||||
set stats [r cmdresult.stats]
|
||||
assert {[dict get $stats rejected_count] >= 1}
|
||||
|
||||
set log [r cmdresult.getlog 1]
|
||||
set entry [lindex $log 0]
|
||||
assert_equal [dict get $entry status] "rejected"
|
||||
assert_equal [dict get $entry command] "set"
|
||||
assert_match {*wrong*number*arguments*} [string tolower [dict get $entry rejection_context]]
|
||||
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - rejected: command not allowed in MULTI (NOMULTI)} {
|
||||
cleanup_callback
|
||||
r cmdresult.register rejected
|
||||
|
||||
r multi
|
||||
catch {r multi} e
|
||||
r discard
|
||||
|
||||
set stats [r cmdresult.stats]
|
||||
assert {[dict get $stats rejected_count] >= 1}
|
||||
|
||||
set log [r cmdresult.getlog 1]
|
||||
set entry [lindex $log 0]
|
||||
assert_equal [dict get $entry status] "rejected"
|
||||
assert_equal [dict get $entry command] "multi"
|
||||
assert_match {*not allowed inside a transaction*} [string tolower [dict get $entry rejection_context]]
|
||||
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - rejected: command not allowed in Pub/Sub context (PUBSUB)} {
|
||||
cleanup_callback
|
||||
r cmdresult.register rejected
|
||||
|
||||
set rd [valkey_deferring_client]
|
||||
$rd subscribe testchan
|
||||
$rd read
|
||||
$rd set foo bar
|
||||
catch {$rd read} e
|
||||
$rd unsubscribe testchan
|
||||
$rd read
|
||||
$rd close
|
||||
|
||||
set stats [r cmdresult.stats]
|
||||
assert {[dict get $stats rejected_count] >= 1}
|
||||
|
||||
set log [r cmdresult.getlog 1]
|
||||
set entry [lindex $log 0]
|
||||
assert_equal [dict get $entry status] "rejected"
|
||||
assert_equal [dict get $entry command] "set"
|
||||
assert_match {*only*(p|s)subscribe*} [string tolower [dict get $entry rejection_context]]
|
||||
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - rejected: not enough replicas (NOREPLICAS)} {
|
||||
cleanup_callback
|
||||
r cmdresult.register rejected
|
||||
|
||||
r config set min-replicas-to-write 100
|
||||
|
||||
catch {r set foo bar} e
|
||||
assert_match {*NOREPLICAS*} $e
|
||||
|
||||
r config set min-replicas-to-write 0
|
||||
|
||||
set stats [r cmdresult.stats]
|
||||
assert {[dict get $stats rejected_count] >= 1}
|
||||
|
||||
set log [r cmdresult.getlog 1]
|
||||
set entry [lindex $log 0]
|
||||
assert_equal [dict get $entry status] "rejected"
|
||||
assert_equal [dict get $entry command] "set"
|
||||
assert_match {*NOREPLICAS*} [dict get $entry rejection_context]
|
||||
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Module commandresult - rejected: out of memory (OOM)} {
|
||||
cleanup_callback
|
||||
r cmdresult.register rejected
|
||||
|
||||
r config set maxmemory 1
|
||||
r config set maxmemory-policy noeviction
|
||||
|
||||
catch {r set oomkey oomval} e
|
||||
assert_match {*OOM*} $e
|
||||
|
||||
r config set maxmemory 0
|
||||
|
||||
set stats [r cmdresult.stats]
|
||||
assert {[dict get $stats rejected_count] >= 1}
|
||||
|
||||
set log [r cmdresult.getlog 1]
|
||||
set entry [lindex $log 0]
|
||||
assert_equal [dict get $entry status] "rejected"
|
||||
assert_match {*OOM*} [dict get $entry rejection_context]
|
||||
|
||||
r cmdresult.unsubscribe
|
||||
}
|
||||
|
||||
test {Unload the module - commandresult} {
|
||||
catch {r cmdresult.unsubscribe}
|
||||
assert_equal {OK} [r module unload commandresult]
|
||||
}
|
||||
}
|
||||
@@ -169,6 +169,17 @@ start_server {overrides {save {900 1}} tags {"modules"}} {
|
||||
assert { "ever_authenticated" in $flags }
|
||||
}
|
||||
|
||||
test {test module clientinfo api - primary/replica/monitor flags} {
|
||||
# Normal client should not have primary, replica, or monitor flags
|
||||
set info [r test.clientinfo]
|
||||
set flags [parse_client_flags [dict get $info flags]]
|
||||
assert { "primary" ni $flags }
|
||||
assert { "replica" ni $flags }
|
||||
assert { "monitor" ni $flags }
|
||||
assert { "module" ni $flags }
|
||||
assert { "fake" ni $flags }
|
||||
}
|
||||
|
||||
foreach cmd {rm_call vm_call_argv} {
|
||||
test "tracking with $cmd sanity" {
|
||||
set rd_trk [valkey_client]
|
||||
|
||||
Reference in New Issue
Block a user