Files
SpacetimeDB/crates/bindings-csharp/Runtime/ProcedureContext.cs
rekhoff 39f01289e5 C# implementation of Transactions for Procedures (#3809)
# Description of Changes

Implements the C# equivalent of #3638

This implementation uses inheritance, where abstract base classes (like
`ProcedureContextBase` in `ProcedureContext.cs`) store the core of the
implementation, and then generated wrappers (like `ProcedureContext` in
the generated FFI.cs file) inherit from them.

For error handling, we work like Rust's implementation of `Result<T,E>`
but we require `where E : Exception` because of how exceptions work in
C#. Transaction-level failures come back as a `TxOutcome` and user
errors should follow the `Result<T,E>` pattern. In this implementation,
`UnwrapOrThrow()` throws the stored exception directly, in keeping with
C#'s exception-based error handling.

Unlike the Rust implementation's direct `Result` propagation, we are
using an `AbortGuard` pattern (in `ProcedureContext.cs`) for exception
handling, which uses `IDisposable` for automatic cleanup.

Most changes should have fairly similar Rust-equivalents beyond that.
For module authors, the changes here allow the transaction logic to
work like:
```csharp
ctx.TryWithTx<ResultType, Exception>(tx => {
    // transaction logic
    return Result<ResultType, Exception>.Ok(result);
});
```
This change includes a number of tests added to the
`sdks/csharp/examples~/regression-tests/`'s `server` and `client` to
validate the behavior of the changes. `server` changes provide further
usage examples for module authors.

# API and ABI breaking changes

Should not be a breaking change

# Expected complexity level and risk

2

# Testing

- [x] Created Regression Tests that show transactions in procedures
working in various ways, all of which pass.
2025-12-18 18:41:47 +00:00

295 lines
8.2 KiB
C#

namespace SpacetimeDB;
using System.Diagnostics.CodeAnalysis;
using Internal;
/// <summary>
/// Outcome of an operation that either produced a value of type
/// <typeparamref name="T"/> or failed with an exception of type
/// <typeparamref name="E"/>.
/// </summary>
/// <remarks>
/// Build instances with <c>Ok</c> and <c>Err</c>. A <c>default</c> instance
/// reports failure (<see cref="IsSuccess"/> is <c>false</c>) and carries no
/// error object; <see cref="UnwrapOrThrow"/> handles that case explicitly.
/// </remarks>
public readonly struct Result<T, E>(bool isSuccess, T? value, E? error)
    where E : Exception
{
    /// <summary><c>true</c> when this result was created as a success.</summary>
    public bool IsSuccess { get; } = isSuccess;

    /// <summary>The success value; <c>default</c> for results created by <c>Err</c>.</summary>
    public T? Value { get; } = value;

    /// <summary>The failure; <c>null</c> for results created by <c>Ok</c>.</summary>
    public E? Error { get; } = error;

    /// <summary>Creates a successful result wrapping <paramref name="value"/>.</summary>
    public static Result<T, E> Ok(T value) => new(true, value, null);

    /// <summary>Creates a failed result wrapping <paramref name="error"/>.</summary>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="error"/> is <c>null</c>. Rejecting it here keeps
    /// <see cref="UnwrapOrElse"/> and <see cref="Match{TResult}"/> from handing a
    /// null error to their callbacks later on.
    /// </exception>
    public static Result<T, E> Err(E error) =>
        new(false, default, error ?? throw new ArgumentNullException(nameof(error)));

    /// <summary>
    /// Returns the success value, or throws the stored error.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The result is a failure but holds no error object (e.g. a <c>default</c> instance).
    /// </exception>
    public T UnwrapOrThrow()
    {
        if (IsSuccess)
        {
            return Value!;
        }

        if (Error is { } failure)
        {
            throw failure;
        }

        throw new InvalidOperationException("Result failed without an error object.");
    }

    /// <summary>Returns the success value, or <paramref name="defaultValue"/> on failure.</summary>
    public T UnwrapOr(T defaultValue) => IsSuccess ? Value! : defaultValue;

    /// <summary>Returns the success value, or the value computed by <paramref name="f"/> from the error.</summary>
    public T UnwrapOrElse(Func<E, T> f) => IsSuccess ? Value! : f(Error!);

    /// <summary>
    /// Projects the result through <paramref name="onOk"/> on success or
    /// <paramref name="onErr"/> on failure and returns whichever value was produced.
    /// </summary>
    public TResult Match<TResult>(Func<T, TResult> onOk, Func<E, TResult> onErr) =>
        IsSuccess ? onOk(Value!) : onErr(Error!);
}
#pragma warning disable STDB_UNSTABLE
public abstract class ProcedureContextBase(
Identity sender,
ConnectionId? connectionId,
Random random,
Timestamp time
) : Internal.IInternalProcedureContext
{
/// <summary>
/// Forwards to <c>Internal.IProcedureContext.GetIdentity()</c>.
/// NOTE(review): presumably the identity of the running module — confirm against that helper.
/// </summary>
public static Identity Identity => Internal.IProcedureContext.GetIdentity();
/// <summary>The <c>sender</c> value supplied at construction.</summary>
public Identity Sender { get; } = sender;
/// <summary>The <c>connectionId</c> value supplied at construction; may be <c>null</c>.</summary>
public ConnectionId? ConnectionId { get; } = connectionId;
/// <summary>The <see cref="Random"/> instance supplied at construction.</summary>
public Random Rng { get; } = random;
/// <summary>
/// Starts as the <c>time</c> constructor argument and is overwritten by
/// <see cref="EnterTxContext"/> each time a transaction context is entered.
/// </summary>
public Timestamp Timestamp { get; private set; } = time;
/// <summary>
/// Built once at construction by <c>AuthCtx.BuildFromSystemTables(connectionId, sender)</c>.
/// </summary>
public AuthCtx SenderAuth { get; } = AuthCtx.BuildFromSystemTables(connectionId, sender);
// Inner transaction context: assigned by EnterTxContext, reset to null by ExitTxContext.
private Internal.TxContext? txContext;
// User-facing wrapper, created on first use in RequireTxContext and re-pointed at the
// current inner context on every call.
private ProcedureTxContextBase? cachedUserTxContext;
/// <summary>
/// Creates the user-facing transaction wrapper around <paramref name="inner"/>.
/// Called by <c>RequireTxContext</c> only when no wrapper has been cached yet.
/// </summary>
protected abstract ProcedureTxContextBase CreateTxContext(Internal.TxContext inner);
/// <summary>
/// Supplies the <see cref="LocalBase"/> that <see cref="EnterTxContext"/> passes as the
/// first argument when it constructs a new <c>Internal.TxContext</c>.
/// </summary>
protected internal abstract LocalBase CreateLocal();
/// <summary>
/// Returns the user-facing transaction wrapper bound to the currently stored
/// inner transaction context.
/// </summary>
/// <remarks>
/// The wrapper is created through <c>CreateTxContext</c> the first time it is
/// needed and reused afterwards; it is refreshed with the current inner context
/// on every call.
/// </remarks>
/// <exception cref="InvalidOperationException">No inner transaction context is stored.</exception>
private protected ProcedureTxContextBase RequireTxContext()
{
    if (txContext is not { } current)
    {
        throw new InvalidOperationException("Transaction context was not initialised.");
    }

    var wrapper = cachedUserTxContext ?? CreateTxContext(current);
    cachedUserTxContext = wrapper;
    wrapper.Refresh(current);
    return wrapper;
}
/// <summary>
/// Records a new transaction timestamp and stores an inner transaction context
/// carrying it.
/// </summary>
/// <param name="timestampMicros">
/// Raw value handed to the <see cref="Timestamp"/> constructor
/// (presumably microseconds since the Unix epoch — confirm against the host ABI).
/// </param>
/// <returns>The inner context that is now stored on this instance.</returns>
/// <remarks>
/// When an inner context is already stored it is re-stamped through
/// <c>WithTimestamp</c>; otherwise a new one is built from <c>CreateLocal()</c>
/// and this context's sender, connection id, auth context and RNG.
/// </remarks>
public Internal.TxContext EnterTxContext(long timestampMicros)
{
    var enteredAt = new Timestamp(timestampMicros);
    Timestamp = enteredAt;

    var next = txContext?.WithTimestamp(enteredAt);
    next ??= new Internal.TxContext(
        CreateLocal(),
        Sender,
        ConnectionId,
        enteredAt,
        SenderAuth,
        Rng
    );

    txContext = next;
    return next;
}
/// <summary>
/// Clears the stored inner transaction context. Until <c>EnterTxContext</c> is
/// called again, <c>RequireTxContext</c> throws.
/// </summary>
public void ExitTxContext()
{
    txContext = null;
}
/// <summary>
/// Result of running a transaction: either a value of type
/// <typeparamref name="TResult"/> or the exception that made it fail.
/// </summary>
public readonly struct TxOutcome<TResult>(bool isSuccess, TResult? value, Exception? error)
{
    /// <summary><c>true</c> when the outcome was created by <see cref="Success"/>.</summary>
    public bool IsSuccess { get; } = isSuccess;

    /// <summary>The produced value; <c>default</c> for failures.</summary>
    public TResult? Value { get; } = value;

    /// <summary>The recorded failure; <c>null</c> for successes.</summary>
    public Exception? Error { get; } = error;

    /// <summary>Creates a successful outcome holding <paramref name="value"/>.</summary>
    public static TxOutcome<TResult> Success(TResult value)
    {
        return new(true, value, null);
    }

    /// <summary>Creates a failed outcome holding <paramref name="error"/>.</summary>
    public static TxOutcome<TResult> Failure(Exception error)
    {
        return new(false, default, error);
    }

    /// <summary>
    /// Returns the value on success; otherwise throws the recorded error, or an
    /// <see cref="InvalidOperationException"/> when no error was recorded.
    /// </summary>
    public TResult UnwrapOrThrow()
    {
        if (IsSuccess)
        {
            return Value!;
        }

        throw Error
            ?? new InvalidOperationException("Transaction failed without an error object.");
    }

    /// <summary>
    /// Returns the value on success; otherwise throws the recorded error, or the
    /// exception produced by <paramref name="fallbackFactory"/> when no error was
    /// recorded. The factory is only invoked in that last case.
    /// </summary>
    public TResult UnwrapOrThrow(Func<Exception> fallbackFactory)
    {
        if (IsSuccess)
        {
            return Value!;
        }

        throw Error ?? fallbackFactory();
    }
}
/// <summary>
/// Runs <paramref name="body"/> inside a transaction and returns its value.
/// </summary>
/// <remarks>
/// The body's return value is wrapped in a successful <c>Result</c> and executed
/// through <c>TryWithTx</c>; any failure captured there (including an exception
/// thrown by the body) is thrown from this method by <c>UnwrapOrThrow</c>.
/// </remarks>
[Experimental("STDB_UNSTABLE")]
public TResult WithTx<TResult>(Func<ProcedureTxContextBase, TResult> body)
{
    var outcome = TryWithTx<TResult, Exception>(
        tx => Result<TResult, Exception>.Ok(body(tx))
    );
    return outcome.UnwrapOrThrow();
}
/// <summary>
/// Runs <paramref name="body"/> inside a transaction and reports what happened
/// as a <c>TxOutcome</c> instead of throwing.
/// </summary>
/// <param name="body">
/// Transaction logic. Returning a failed <c>Result</c> yields a failed outcome
/// carrying that result's error.
/// </param>
/// <returns>
/// A successful outcome with the body's value, or a failed outcome carrying either
/// the body's error or any exception thrown while running the transaction.
/// </returns>
[Experimental("STDB_UNSTABLE")]
public TxOutcome<TResult> TryWithTx<TResult, TError>(
    Func<ProcedureTxContextBase, Result<TResult, TError>> body
)
    where TError : Exception
{
    Result<TResult, TError> txResult;
    try
    {
        txResult = RunWithRetry(body);
    }
    catch (Exception thrown)
    {
        // Exceptions are folded into the outcome rather than propagated.
        return TxOutcome<TResult>.Failure(thrown);
    }

    if (txResult.IsSuccess)
    {
        return TxOutcome<TResult>.Success(txResult.Value!);
    }

    return TxOutcome<TResult>.Failure(txResult.Error!);
}
// Private transaction management methods (Rust-like encapsulation)
/// <summary>
/// Calls <c>procedure_start_mut_tx</c>, passes its status to
/// <c>ErrnoHelpers.ThrowIfError</c>, and returns the value the call wrote to its
/// <c>out</c> argument (used by <c>RunOnce</c> as the transaction timestamp).
/// </summary>
private long StartMutTx()
{
    Internal.FFI.ErrnoHelpers.ThrowIfError(
        Internal.FFI.procedure_start_mut_tx(out var startedAtMicros)
    );
    return startedAtMicros;
}
/// <summary>
/// Calls <c>procedure_commit_mut_tx</c> and passes its status to
/// <c>ErrnoHelpers.ThrowIfError</c>.
/// </summary>
private void CommitMutTx()
{
    Internal.FFI.ErrnoHelpers.ThrowIfError(Internal.FFI.procedure_commit_mut_tx());
}
/// <summary>
/// Calls <c>procedure_abort_mut_tx</c> and passes its status to
/// <c>ErrnoHelpers.ThrowIfError</c>.
/// </summary>
private void AbortMutTx()
{
    Internal.FFI.ErrnoHelpers.ThrowIfError(Internal.FFI.procedure_abort_mut_tx());
}
/// <summary>
/// Attempts to commit the current transaction, re-running the work once if the
/// first commit fails with a <c>StdbException</c>.
/// </summary>
/// <param name="retryBody">
/// Re-executes the transaction body; returns <c>true</c> when the re-run
/// succeeded and a second commit should be attempted.
/// </param>
/// <returns>
/// <c>true</c> when a commit call completed without throwing; <c>false</c> when the
/// first commit raised <c>TransactionNotAnonymousException</c> or when
/// <paramref name="retryBody"/> returned <c>false</c>.
/// </returns>
/// <remarks>
/// Exceptions from <paramref name="retryBody"/> or from the second commit are not
/// caught here.
/// </remarks>
private bool CommitMutTxWithRetry(Func<bool> retryBody)
{
    try
    {
        CommitMutTx();
        return true;
    }
    catch (TransactionNotAnonymousException)
    {
        return false;
    }
    catch (StdbException)
    {
        Log.Warn("Committing anonymous transaction failed; retrying once.");
    }

    // Only reachable after the StdbException handler above fell through.
    if (!retryBody())
    {
        return false;
    }

    CommitMutTx();
    return true;
}
/// <summary>
/// Executes <paramref name="body"/> in a transaction and, when it succeeds,
/// commits it — allowing one re-run of the body if the commit has to be retried.
/// </summary>
/// <returns>
/// The result of the most recent execution of <paramref name="body"/>. A failed
/// first run is returned immediately without attempting a commit.
/// </returns>
/// <remarks>
/// The boolean returned by <c>CommitMutTxWithRetry</c> does not influence the
/// returned value.
/// NOTE(review): that means a successful result is still returned when the commit
/// helper reports <c>false</c> without re-running the body — confirm this is intended.
/// </remarks>
private Result<TResult, TError> RunWithRetry<TResult, TError>(
    Func<ProcedureTxContextBase, Result<TResult, TError>> body
)
    where TError : Exception
{
    var latest = RunOnce(body);
    if (latest.IsSuccess)
    {
        // The retry callback replaces `latest`, so the caller sees the re-run's result.
        _ = CommitMutTxWithRetry(() =>
        {
            latest = RunOnce(body);
            return latest.IsSuccess;
        });
    }

    return latest;
}
/// <summary>
/// Starts a mutable transaction, runs <paramref name="body"/> against it once,
/// and aborts the transaction unless the body reported success.
/// </summary>
/// <param name="body">Transaction logic to execute.</param>
/// <returns>
/// The body's result. On success the transaction is left open for the caller to
/// commit; on failure it has already been aborted.
/// </returns>
/// <remarks>
/// If anything between starting the transaction and the body returning throws, the
/// guard aborts the transaction while the exception propagates.
/// </remarks>
private Result<TResult, TError> RunOnce<TResult, TError>(
    Func<ProcedureTxContextBase, Result<TResult, TError>> body
)
    where TError : Exception
{
    var micros = StartMutTx();
    using var guard = new AbortGuard(AbortMutTx);
    EnterTxContext(micros);
    var result = body(RequireTxContext());

    // The body returned normally, so the guard is no longer needed. Disarm it
    // *before* the explicit abort below: if AbortMutTx throws, the guard must not
    // call it a second time during unwinding and mask the original error.
    guard.Disarm();
    if (!result.IsSuccess)
    {
        AbortMutTx();
    }

    return result;
}
/// <summary>
/// Runs the supplied abort action when disposed, unless <see cref="Disarm"/> was
/// called first. Intended for use with a <c>using</c> declaration so the action
/// runs when the scope is left by an exception.
/// </summary>
private sealed class AbortGuard(Action abort) : IDisposable
{
    private readonly Action abort = abort;
    private bool disarmed;

    /// <summary>Prevents <see cref="Dispose"/> from running the abort action.</summary>
    public void Disarm() => disarmed = true;

    /// <summary>
    /// Runs the abort action at most once; later calls, and calls after
    /// <see cref="Disarm"/>, do nothing.
    /// </summary>
    public void Dispose()
    {
        if (disarmed)
        {
            return;
        }

        // Mark as spent before invoking so a repeated Dispose cannot abort twice,
        // even if the action itself throws.
        disarmed = true;
        abort();
    }
}
}
/// <summary>
/// User-facing view over an <c>Internal.TxContext</c>. Every public member simply
/// reads the corresponding member of the wrapped inner context.
/// </summary>
public abstract class ProcedureTxContextBase(Internal.TxContext inner)
{
    /// <summary>The inner context currently being wrapped.</summary>
    internal Internal.TxContext Inner { get; private set; } = inner;

    /// <summary>Re-points this wrapper at <paramref name="inner"/>.</summary>
    internal void Refresh(Internal.TxContext inner)
    {
        Inner = inner;
    }

    /// <summary>The inner context's <c>Db</c>, cast to <see cref="LocalBase"/>.</summary>
    public LocalBase Db
    {
        get { return (LocalBase)Inner.Db; }
    }

    /// <summary>The inner context's <c>Sender</c>.</summary>
    public Identity Sender
    {
        get { return Inner.Sender; }
    }

    /// <summary>The inner context's <c>ConnectionId</c>.</summary>
    public ConnectionId? ConnectionId
    {
        get { return Inner.ConnectionId; }
    }

    /// <summary>The inner context's <c>Timestamp</c>.</summary>
    public Timestamp Timestamp
    {
        get { return Inner.Timestamp; }
    }

    /// <summary>The inner context's <c>SenderAuth</c>.</summary>
    public AuthCtx SenderAuth
    {
        get { return Inner.SenderAuth; }
    }

    /// <summary>The inner context's <c>Rng</c>.</summary>
    public Random Rng
    {
        get { return Inner.Rng; }
    }
}
/// <summary>
/// Abstract base, derived from <c>Internal.Local</c>, for the database handle type
/// returned by <c>ProcedureContextBase.CreateLocal</c> and exposed as
/// <c>ProcedureTxContextBase.Db</c>. Declares no members of its own.
/// </summary>
public abstract class LocalBase : Internal.Local { }
/// <summary>
/// Concrete <see cref="ProcedureContextBase"/> that hands out a single
/// <see cref="Local"/> instance and a single cached <see cref="ProcedureTxContext"/>.
/// </summary>
public sealed partial class RuntimeProcedureContext(
    Identity sender,
    ConnectionId? connectionId,
    Random random,
    Timestamp timestamp
) : ProcedureContextBase(sender, connectionId, random, timestamp)
{
    // One Local per context instance, returned from every CreateLocal call.
    private readonly Local _db = new();

    // Created on the first CreateTxContext call and returned on all later ones.
    private ProcedureTxContext? _cached;

    /// <summary>Returns this context's <see cref="Local"/> instance.</summary>
    protected internal override LocalBase CreateLocal()
    {
        return _db;
    }

    /// <summary>
    /// Returns the cached <see cref="ProcedureTxContext"/>, constructing it around
    /// <paramref name="inner"/> if none exists yet.
    /// </summary>
    protected override ProcedureTxContextBase CreateTxContext(Internal.TxContext inner)
    {
        _cached ??= new ProcedureTxContext(inner);
        return _cached;
    }
}
/// <summary>
/// Sealed <see cref="ProcedureTxContextBase"/> whose <see cref="Db"/> is typed as
/// <see cref="Local"/>.
/// </summary>
public sealed class ProcedureTxContext : ProcedureTxContextBase
{
    /// <summary>Wraps <paramref name="inner"/>; only constructible inside this assembly.</summary>
    internal ProcedureTxContext(Internal.TxContext inner)
        : base(inner)
    {
    }

    /// <summary>
    /// Hides the base <c>Db</c> property and returns the same object cast to
    /// <see cref="Local"/>.
    /// </summary>
    public new Local Db
    {
        get { return (Local)base.Db; }
    }
}
/// <summary>
/// Sealed <see cref="LocalBase"/> with no members of its own; this is the type
/// instantiated by <c>RuntimeProcedureContext</c> and exposed as
/// <c>ProcedureTxContext.Db</c>.
/// </summary>
public sealed class Local : LocalBase { }
#pragma warning restore STDB_UNSTABLE