mirror of
https://github.com/clockworklabs/SpacetimeDB.git
synced 2026-05-14 03:37:55 -04:00
39f01289e5
# Description of Changes Implements the C# equivalent of #3638 This implementation uses inheritance, where abstract base classes (like `ProcedureContextBase` in `ProcedureContext.cs`) store the core of the implementation, and then generated wrappers (like `ProcedureContext` in the generated FFI.cs file) inherit from them. For error handling, we work like Rust's implementation of `Result<T,E>` but we require `where E : Exception` because of how exceptions work in C#. Transaction-level failures come back as a `TxOutcome` and user errors should follow the `Result<T,E>` pattern. In this implementation, `UnwrapOrThrow()` throws exceptions directly because of C#'s error handling pattern. Unlike the Rust implementation's direct `Result` propagation, we are using an `AbortGuard` pattern (in `ProcedureContext.cs`) for exception handling, which uses `IDisposable` for automatic cleanup. Most changes should have fairly similar Rust-equivalents beyond that. For module authors, the changes here allow for the transaction logic to work like: ```csharp ctx.TryWithTx<ResultType, Exception>(tx => { // transaction logic return Result<ResultType, Exception>.Ok(result); }); ``` This change includes a number of tests added to the `sdks/csharp/examples~/regression-tests/`'s `server` and `client` to validate the behavior of the changes. `server` changes provide further usage examples for module authors. # API and ABI breaking changes Should not be a breaking change # Expected complexity level and risk 2 # Testing - [x] Created Regression Tests that show transactions in procedures working in various ways, all of which pass.
295 lines
8.2 KiB
C#
295 lines
8.2 KiB
C#
namespace SpacetimeDB;
|
|
|
|
using System.Diagnostics.CodeAnalysis;
|
|
using Internal;
|
|
|
|
/// <summary>
/// Outcome of an operation that either produced a value of type <typeparamref name="T"/>
/// or failed with an exception of type <typeparamref name="E"/>.
/// </summary>
/// <remarks>
/// Because this is a struct, <c>default(Result&lt;T, E&gt;)</c> is a failed result whose
/// <see cref="Error"/> is <c>null</c>. Every member that needs the error object reports
/// that state with an <see cref="InvalidOperationException"/> instead of handing
/// <c>null</c> to caller-supplied delegates.
/// </remarks>
/// <typeparam name="T">Type of the success value.</typeparam>
/// <typeparam name="E">Type of the failure; must derive from <see cref="Exception"/>.</typeparam>
public readonly struct Result<T, E>(bool isSuccess, T? value, E? error)
    where E : Exception
{
    /// <summary><c>true</c> when the result was created as a success.</summary>
    public bool IsSuccess { get; } = isSuccess;

    /// <summary>The value supplied at construction (<c>default</c> when created via <see cref="Err"/>).</summary>
    public T? Value { get; } = value;

    /// <summary>The error supplied at construction (<c>null</c> when created via <see cref="Ok"/>).</summary>
    public E? Error { get; } = error;

    /// <summary>Creates a successful result carrying <paramref name="value"/>.</summary>
    public static Result<T, E> Ok(T value) => new(true, value, null);

    /// <summary>Creates a failed result carrying <paramref name="error"/>.</summary>
    public static Result<T, E> Err(E error) => new(false, default, error);

    /// <summary>
    /// Returns the value of a successful result; otherwise throws the stored error.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The result is a failure but carries no error object.
    /// </exception>
    public T UnwrapOrThrow() => IsSuccess ? Value! : throw RequireError();

    /// <summary>
    /// Returns the value of a successful result, or <paramref name="defaultValue"/> on failure.
    /// </summary>
    public T UnwrapOr(T defaultValue) => IsSuccess ? Value! : defaultValue;

    /// <summary>
    /// Returns the value of a successful result, or the value computed by
    /// <paramref name="f"/> from the stored error on failure.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The result is a failure but carries no error object.
    /// </exception>
    public T UnwrapOrElse(Func<E, T> f) => IsSuccess ? Value! : f(RequireError());

    /// <summary>
    /// Invokes <paramref name="onOk"/> with the value on success, or
    /// <paramref name="onErr"/> with the stored error on failure, and returns the outcome.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The result is a failure but carries no error object.
    /// </exception>
    public TResult Match<TResult>(Func<T, TResult> onOk, Func<E, TResult> onErr) =>
        IsSuccess ? onOk(Value!) : onErr(RequireError());

    // Single place that enforces "a failure must carry an error", so the handlers
    // declared as taking a non-null E are never invoked with null.
    private E RequireError() =>
        Error ?? throw new InvalidOperationException("Result failed without an error object.");
}
|
|
|
|
#pragma warning disable STDB_UNSTABLE
|
|
/// <summary>
/// Base class for procedure contexts. It holds the caller information passed to the
/// constructor and implements running a delegate inside a mutable transaction
/// (<see cref="WithTx{TResult}"/> / <see cref="TryWithTx{TResult, TError}"/>).
/// Derived classes supply the concrete database view and transaction-context wrapper
/// through <see cref="CreateLocal"/> and <see cref="CreateTxContext"/>.
/// </summary>
public abstract class ProcedureContextBase(
    Identity sender,
    ConnectionId? connectionId,
    Random random,
    Timestamp time
) : Internal.IInternalProcedureContext
{
    /// <summary>Identity returned by <c>Internal.IProcedureContext.GetIdentity()</c>.</summary>
    public static Identity Identity => Internal.IProcedureContext.GetIdentity();

    /// <summary>The <c>sender</c> passed to the constructor.</summary>
    public Identity Sender { get; } = sender;

    /// <summary>The <c>connectionId</c> passed to the constructor; may be <c>null</c>.</summary>
    public ConnectionId? ConnectionId { get; } = connectionId;

    /// <summary>The <c>random</c> instance passed to the constructor.</summary>
    public Random Rng { get; } = random;

    /// <summary>
    /// Initially the <c>time</c> passed to the constructor; overwritten by every call to
    /// <see cref="EnterTxContext"/>.
    /// </summary>
    public Timestamp Timestamp { get; private set; } = time;

    /// <summary>Built once from <c>connectionId</c> and <c>sender</c> via <c>AuthCtx.BuildFromSystemTables</c>.</summary>
    public AuthCtx SenderAuth { get; } = AuthCtx.BuildFromSystemTables(connectionId, sender);

    // Current low-level transaction context; null until EnterTxContext is called and
    // again after ExitTxContext.
    private Internal.TxContext? txContext;

    // User-facing wrapper around txContext, created lazily and then reused.
    private ProcedureTxContextBase? cachedUserTxContext;

    /// <summary>Creates the user-facing wrapper for <paramref name="inner"/>.</summary>
    protected abstract ProcedureTxContextBase CreateTxContext(Internal.TxContext inner);

    /// <summary>Provides the database view handed to a newly created <c>Internal.TxContext</c>.</summary>
    protected internal abstract LocalBase CreateLocal();

    /// <summary>
    /// Returns the user-facing transaction context, creating it on first use and
    /// pointing it at the current inner context on every call.
    /// </summary>
    /// <exception cref="InvalidOperationException">No transaction context has been entered.</exception>
    private protected ProcedureTxContextBase RequireTxContext()
    {
        var inner =
            txContext
            ?? throw new InvalidOperationException("Transaction context was not initialised.");
        cachedUserTxContext ??= CreateTxContext(inner);
        cachedUserTxContext.Refresh(inner);
        return cachedUserTxContext;
    }

    /// <summary>
    /// Updates <see cref="Timestamp"/> from <paramref name="timestampMicros"/> and makes a
    /// transaction context current: an existing context is re-stamped through
    /// <c>WithTimestamp</c>, otherwise a new one is built from this context's fields.
    /// </summary>
    /// <param name="timestampMicros">Value used to construct the new <see cref="Timestamp"/>.</param>
    /// <returns>The transaction context that is now current.</returns>
    public Internal.TxContext EnterTxContext(long timestampMicros)
    {
        var timestamp = new Timestamp(timestampMicros);
        Timestamp = timestamp;
        txContext =
            txContext?.WithTimestamp(timestamp)
            ?? new Internal.TxContext(
                CreateLocal(),
                Sender,
                ConnectionId,
                timestamp,
                SenderAuth,
                Rng
            );
        return txContext;
    }

    /// <summary>Clears the current transaction context.</summary>
    public void ExitTxContext() => txContext = null;

    /// <summary>
    /// Outcome of <see cref="TryWithTx{TResult, TError}"/>: either a value or the exception
    /// that made the transaction fail.
    /// </summary>
    /// <typeparam name="TResult">Type of the success value.</typeparam>
    public readonly struct TxOutcome<TResult>(bool isSuccess, TResult? value, Exception? error)
    {
        /// <summary><c>true</c> when the outcome was created as a success.</summary>
        public bool IsSuccess { get; } = isSuccess;

        /// <summary>The value supplied at construction (<c>default</c> for failures).</summary>
        public TResult? Value { get; } = value;

        /// <summary>The exception supplied at construction (<c>null</c> for successes).</summary>
        public Exception? Error { get; } = error;

        /// <summary>Creates a successful outcome carrying <paramref name="value"/>.</summary>
        public static TxOutcome<TResult> Success(TResult value) => new(true, value, null);

        /// <summary>Creates a failed outcome carrying <paramref name="error"/>.</summary>
        public static TxOutcome<TResult> Failure(Exception error) => new(false, default, error);

        /// <summary>
        /// Returns the value on success; otherwise throws <see cref="Error"/>, or an
        /// <see cref="InvalidOperationException"/> when no error object is present.
        /// </summary>
        public TResult UnwrapOrThrow() =>
            IsSuccess
                ? Value!
                : throw (
                    Error
                    ?? new InvalidOperationException("Transaction failed without an error object.")
                );

        /// <summary>
        /// Returns the value on success; otherwise throws <see cref="Error"/>, or the
        /// exception produced by <paramref name="fallbackFactory"/> when no error object
        /// is present.
        /// </summary>
        public TResult UnwrapOrThrow(Func<Exception> fallbackFactory) =>
            IsSuccess ? Value! : throw (Error ?? fallbackFactory());
    }

    /// <summary>
    /// Runs <paramref name="body"/> through <see cref="TryWithTx{TResult, TError}"/> and
    /// returns its value, throwing whatever error the outcome carries on failure.
    /// </summary>
    [Experimental("STDB_UNSTABLE")]
    public TResult WithTx<TResult>(Func<ProcedureTxContextBase, TResult> body) =>
        TryWithTx(tx => Result<TResult, Exception>.Ok(body(tx))).UnwrapOrThrow();

    /// <summary>
    /// Runs <paramref name="body"/> inside a mutable transaction and reports the result
    /// as a <see cref="TxOutcome{TResult}"/>. A failed <see cref="Result{T, E}"/> returned
    /// by <paramref name="body"/> and any exception thrown while running or committing
    /// are both converted into a failure outcome; nothing is thrown to the caller.
    /// </summary>
    [Experimental("STDB_UNSTABLE")]
    public TxOutcome<TResult> TryWithTx<TResult, TError>(
        Func<ProcedureTxContextBase, Result<TResult, TError>> body
    )
        where TError : Exception
    {
        try
        {
            var result = RunWithRetry(body);
            return result.IsSuccess
                ? TxOutcome<TResult>.Success(result.Value!)
                : TxOutcome<TResult>.Failure(result.Error!);
        }
        catch (Exception ex)
        {
            return TxOutcome<TResult>.Failure(ex);
        }
    }

    // Private transaction management methods (Rust-like encapsulation)

    /// <summary>
    /// Calls <c>procedure_start_mut_tx</c>, throws if it reports an error status, and
    /// returns the microsecond value it produced (used by callers as the transaction timestamp).
    /// </summary>
    private long StartMutTx()
    {
        var status = Internal.FFI.procedure_start_mut_tx(out var micros);
        Internal.FFI.ErrnoHelpers.ThrowIfError(status);
        return micros;
    }

    /// <summary>Calls <c>procedure_commit_mut_tx</c> and throws if it reports an error status.</summary>
    private void CommitMutTx()
    {
        var status = Internal.FFI.procedure_commit_mut_tx();
        Internal.FFI.ErrnoHelpers.ThrowIfError(status);
    }

    /// <summary>Calls <c>procedure_abort_mut_tx</c> and throws if it reports an error status.</summary>
    private void AbortMutTx()
    {
        var status = Internal.FFI.procedure_abort_mut_tx();
        Internal.FFI.ErrnoHelpers.ThrowIfError(status);
    }

    /// <summary>
    /// Commits the current transaction. If the commit throws a <c>StdbException</c>,
    /// <paramref name="retryBody"/> is run once and, when it reports success, the commit
    /// is attempted a second time (a second failure propagates).
    /// </summary>
    /// <returns>
    /// <c>true</c> when a commit succeeded; <c>false</c> when the commit threw
    /// <c>TransactionNotAnonymousException</c> or <paramref name="retryBody"/> returned <c>false</c>.
    /// </returns>
    private bool CommitMutTxWithRetry(Func<bool> retryBody)
    {
        try
        {
            CommitMutTx();
            return true;
        }
        catch (TransactionNotAnonymousException)
        {
            return false;
        }
        catch (StdbException)
        {
            Log.Warn("Committing anonymous transaction failed; retrying once.");
            if (retryBody())
            {
                CommitMutTx();
                return true;
            }
            return false;
        }
    }

    /// <summary>
    /// Runs <paramref name="body"/> once and, if it succeeded, commits; when the commit
    /// has to be retried the body is re-run and the returned result is the one from the
    /// latest run.
    /// </summary>
    private Result<TResult, TError> RunWithRetry<TResult, TError>(
        Func<ProcedureTxContextBase, Result<TResult, TError>> body
    )
        where TError : Exception
    {
        var result = RunOnce(body);
        if (!result.IsSuccess)
        {
            // RunOnce has already aborted the transaction; there is nothing to commit.
            return result;
        }

        bool Retry()
        {
            result = RunOnce(body);
            return result.IsSuccess;
        }

        // The boolean is intentionally ignored: whether or not a commit went through,
        // the caller receives the result of the most recent run of `body`.
        _ = CommitMutTxWithRetry(Retry);
        return result;
    }

    /// <summary>
    /// Starts a mutable transaction, runs <paramref name="body"/> against it and returns
    /// its result. On success the transaction is left open for the caller to commit; on
    /// a failed result, or when <paramref name="body"/> throws, the transaction is aborted.
    /// </summary>
    private Result<TResult, TError> RunOnce<TResult, TError>(
        Func<ProcedureTxContextBase, Result<TResult, TError>> body
    )
        where TError : Exception
    {
        var micros = StartMutTx();
        // Aborts the transaction if anything below throws before the guard is disarmed.
        using var guard = new AbortGuard(AbortMutTx);
        EnterTxContext(micros);
        var txCtx = RequireTxContext();

        var result = body(txCtx);

        // From here on the outcome is handled explicitly. Disarm *before* aborting so
        // that an abort which throws is not attempted a second time by the guard
        // (which would replace the original exception).
        guard.Disarm();
        if (!result.IsSuccess)
        {
            AbortMutTx();
        }
        return result;
    }

    /// <summary>
    /// Invokes the supplied abort action on <see cref="Dispose"/> unless
    /// <see cref="Disarm"/> was called first.
    /// </summary>
    private sealed class AbortGuard(Action abort) : IDisposable
    {
        private readonly Action abort = abort;
        private bool disarmed;

        /// <summary>Prevents <see cref="Dispose"/> from invoking the abort action.</summary>
        public void Disarm() => disarmed = true;

        /// <summary>Invokes the abort action if the guard is still armed.</summary>
        public void Dispose()
        {
            if (!disarmed)
            {
                abort();
            }
        }
    }
}
|
|
|
|
/// <summary>
/// User-facing view over an <c>Internal.TxContext</c>. Every public property simply
/// forwards to the wrapped context, which can be swapped with <see cref="Refresh"/>.
/// </summary>
public abstract class ProcedureTxContextBase(Internal.TxContext inner)
{
    /// <summary>The wrapped context that all public properties read from.</summary>
    internal Internal.TxContext Inner { get; private set; } = inner;

    /// <summary>Replaces the wrapped context with <paramref name="inner"/>.</summary>
    internal void Refresh(Internal.TxContext inner)
    {
        Inner = inner;
    }

    /// <summary>The wrapped context's <c>Db</c>, cast to <see cref="LocalBase"/>.</summary>
    public LocalBase Db
    {
        get { return (LocalBase)Inner.Db; }
    }

    /// <summary>The wrapped context's <c>Sender</c>.</summary>
    public Identity Sender
    {
        get { return Inner.Sender; }
    }

    /// <summary>The wrapped context's <c>ConnectionId</c>.</summary>
    public ConnectionId? ConnectionId
    {
        get { return Inner.ConnectionId; }
    }

    /// <summary>The wrapped context's <c>Timestamp</c>.</summary>
    public Timestamp Timestamp
    {
        get { return Inner.Timestamp; }
    }

    /// <summary>The wrapped context's <c>SenderAuth</c>.</summary>
    public AuthCtx SenderAuth
    {
        get { return Inner.SenderAuth; }
    }

    /// <summary>The wrapped context's <c>Rng</c>.</summary>
    public Random Rng
    {
        get { return Inner.Rng; }
    }
}
|
|
|
|
/// <summary>
/// Abstract database-view type used by procedure contexts; extends
/// <c>Internal.Local</c> without adding members.
/// </summary>
public abstract class LocalBase : Internal.Local
{
}
|
|
|
|
/// <summary>
/// Concrete <see cref="ProcedureContextBase"/> that backs transactions with a single
/// <see cref="Local"/> instance and a single cached <see cref="ProcedureTxContext"/>.
/// </summary>
public sealed partial class RuntimeProcedureContext(
    Identity sender,
    ConnectionId? connectionId,
    Random random,
    Timestamp timestamp
) : ProcedureContextBase(sender, connectionId, random, timestamp)
{
    // One database view per context, handed out by every CreateLocal call.
    private readonly Local _db = new();

    // Transaction-context wrapper, created on the first CreateTxContext call.
    private ProcedureTxContext? _cached;

    /// <summary>Returns this context's <see cref="Local"/> instance.</summary>
    protected internal override LocalBase CreateLocal()
    {
        return _db;
    }

    /// <summary>
    /// Returns the cached wrapper, constructing it around <paramref name="inner"/> the
    /// first time; later calls return the same object regardless of the argument.
    /// </summary>
    protected override ProcedureTxContextBase CreateTxContext(Internal.TxContext inner)
    {
        if (_cached is null)
        {
            _cached = new ProcedureTxContext(inner);
        }

        return _cached;
    }
}
|
|
|
|
/// <summary>
/// Sealed <see cref="ProcedureTxContextBase"/> whose <see cref="Db"/> is typed as
/// <see cref="Local"/>.
/// </summary>
public sealed class ProcedureTxContext : ProcedureTxContextBase
{
    /// <summary>Wraps <paramref name="inner"/>; only constructible within this assembly.</summary>
    internal ProcedureTxContext(Internal.TxContext inner)
        : base(inner)
    {
    }

    /// <summary>The base class's <c>Db</c>, cast to <see cref="Local"/>.</summary>
    public new Local Db
    {
        get { return (Local)base.Db; }
    }
}
|
|
|
|
/// <summary>
/// Sealed <see cref="LocalBase"/> with no members of its own.
/// </summary>
public sealed class Local : LocalBase
{
}
|
|
|
|
#pragma warning restore STDB_UNSTABLE
|