diff --git a/sdks/csharp/src/SpacetimeDBClient.cs b/sdks/csharp/src/SpacetimeDBClient.cs index b0303b2460..92c3d58691 100644 --- a/sdks/csharp/src/SpacetimeDBClient.cs +++ b/sdks/csharp/src/SpacetimeDBClient.cs @@ -387,6 +387,29 @@ namespace SpacetimeDB } ); + /// + /// A collection of updates to the same table that need to be pre-processed. + /// This is the unit of work that is spread across worker threads. + /// + internal struct UpdatesToPreProcess + { + /// + /// The table handle to use to parse rows. + /// You should only use this in a thread-safe way. + /// + public IRemoteTableHandle Table; + + /// + /// The updates to parse. + /// + public List Updates; + + /// + /// The delta to fill with data. Starts out empty. + /// + public MultiDictionaryDelta Delta; + } + #if UNITY_WEBGL && !UNITY_EDITOR IEnumerator PreProcessMessages() #else @@ -416,8 +439,9 @@ namespace SpacetimeDB } } - IEnumerable<(IRemoteTableHandle, TableUpdate)> GetTables(DatabaseUpdate updates) + IEnumerable GetUpdatesToPreProcess(DatabaseUpdate updates, ProcessedDatabaseUpdate dbOps) { + Dictionary tableToUpdates = new(32); foreach (var update in updates.Tables) { var tableName = update.TableName; @@ -427,21 +451,33 @@ namespace SpacetimeDB Log.Error($"Unknown table name: {tableName}"); continue; } - yield return (table, update); + if (tableToUpdates.ContainsKey(table)) + { + tableToUpdates[table].Updates.Add(update); + } + else + { + var delta = dbOps.DeltaForTable(table); + tableToUpdates[table] = new() + { + Table = table, + Updates = new() { update }, + Delta = delta + }; + } } + + return tableToUpdates.Values; } ProcessedDatabaseUpdate PreProcessLegacySubscription(InitialSubscription initSub) { var dbOps = ProcessedDatabaseUpdate.New(); - // This is all of the inserts - int cap = initSub.DatabaseUpdate.Tables.Sum(a => (int)a.NumRows); - // First apply all of the state - foreach (var (table, update) in GetTables(initSub.DatabaseUpdate)) + Parallel.ForEach(GetUpdatesToPreProcess(initSub.DatabaseUpdate, dbOps), (todo) => { - PreProcessInsertOnlyTable(table, update, dbOps); - } + PreProcessInsertOnlyTable(todo); + }); return dbOps; } @@ -452,88 +488,92 @@ namespace SpacetimeDB ProcessedDatabaseUpdate PreProcessSubscribeMultiApplied(SubscribeMultiApplied subscribeMultiApplied) { var dbOps = ProcessedDatabaseUpdate.New(); - foreach (var (table, update) in GetTables(subscribeMultiApplied.Update)) + Parallel.ForEach(GetUpdatesToPreProcess(subscribeMultiApplied.Update, dbOps), (todo) => { - PreProcessInsertOnlyTable(table, update, dbOps); - } + PreProcessInsertOnlyTable(todo); + }); return dbOps; } - void PreProcessInsertOnlyTable(IRemoteTableHandle table, TableUpdate update, ProcessedDatabaseUpdate dbOps) + void PreProcessInsertOnlyTable(UpdatesToPreProcess todo) { - var delta = dbOps.DeltaForTable(table); - - foreach (var cqu in update.Updates) + foreach (var update in todo.Updates) { - var qu = DecompressDecodeQueryUpdate(cqu); - if (qu.Deletes.RowsData.Count > 0) + foreach (var cqu in update.Updates) { - Log.Warn("Non-insert during an insert-only server message!"); - } - var (insertReader, insertRowCount) = ParseRowList(qu.Inserts); - for (var i = 0; i < insertRowCount; i++) - { - var obj = Decode(table, insertReader, out var pk); - delta.Add(pk, obj); + var qu = DecompressDecodeQueryUpdate(cqu); + if (qu.Deletes.RowsData.Count > 0) + { + Log.Warn("Non-insert during an insert-only server message!"); + } + var (insertReader, insertRowCount) = ParseRowList(qu.Inserts); + for (var i = 0; i < insertRowCount; i++) + { + var obj = Decode(todo.Table, insertReader, out var pk); + todo.Delta.Add(pk, obj); + } } } } - void PreProcessDeleteOnlyTable(IRemoteTableHandle table, TableUpdate update, ProcessedDatabaseUpdate dbOps) + void PreProcessDeleteOnlyTable(UpdatesToPreProcess todo) { - var delta = dbOps.DeltaForTable(table); - foreach (var cqu in update.Updates) + foreach (var update in todo.Updates) { - var qu = DecompressDecodeQueryUpdate(cqu); - if (qu.Inserts.RowsData.Count > 0) + foreach (var cqu in update.Updates) { - Log.Warn("Non-delete during a delete-only operation!"); - } + var qu = DecompressDecodeQueryUpdate(cqu); + if (qu.Inserts.RowsData.Count > 0) + { + Log.Warn("Non-delete during a delete-only operation!"); + } - var (deleteReader, deleteRowCount) = ParseRowList(qu.Deletes); - for (var i = 0; i < deleteRowCount; i++) - { - var obj = Decode(table, deleteReader, out var pk); - delta.Remove(pk, obj); + var (deleteReader, deleteRowCount) = ParseRowList(qu.Deletes); + for (var i = 0; i < deleteRowCount; i++) + { + var obj = Decode(todo.Table, deleteReader, out var pk); + todo.Delta.Remove(pk, obj); + } } } } - void PreProcessTable(IRemoteTableHandle table, TableUpdate update, ProcessedDatabaseUpdate dbOps) + void PreProcessTable(UpdatesToPreProcess todo) { - var delta = dbOps.DeltaForTable(table); - foreach (var cqu in update.Updates) + foreach (var update in todo.Updates) { - var qu = DecompressDecodeQueryUpdate(cqu); - - // Because we are accumulating into a MultiDictionaryDelta that will be applied all-at-once - // to the table, it doesn't matter that we call Add before Remove here. - - var (insertReader, insertRowCount) = ParseRowList(qu.Inserts); - for (var i = 0; i < insertRowCount; i++) + foreach (var compressableQueryUpdate in update.Updates) { - var obj = Decode(table, insertReader, out var pk); - delta.Add(pk, obj); - } + var qu = DecompressDecodeQueryUpdate(compressableQueryUpdate); - var (deleteReader, deleteRowCount) = ParseRowList(qu.Deletes); - for (var i = 0; i < deleteRowCount; i++) - { - var obj = Decode(table, deleteReader, out var pk); - delta.Remove(pk, obj); + // Because we are accumulating into a MultiDictionaryDelta that will be applied all-at-once + // to the table, it doesn't matter that we call Add before Remove here. + + var (insertReader, insertRowCount) = ParseRowList(qu.Inserts); + for (var i = 0; i < insertRowCount; i++) + { + var obj = Decode(todo.Table, insertReader, out var pk); + todo.Delta.Add(pk, obj); + } + + var (deleteReader, deleteRowCount) = ParseRowList(qu.Deletes); + for (var i = 0; i < deleteRowCount; i++) + { + var obj = Decode(todo.Table, deleteReader, out var pk); + todo.Delta.Remove(pk, obj); + } } } - } ProcessedDatabaseUpdate PreProcessUnsubscribeMultiApplied(UnsubscribeMultiApplied unsubMultiApplied) { var dbOps = ProcessedDatabaseUpdate.New(); - foreach (var (table, update) in GetTables(unsubMultiApplied.Update)) + Parallel.ForEach(GetUpdatesToPreProcess(unsubMultiApplied.Update, dbOps), (todo) => { - PreProcessDeleteOnlyTable(table, update, dbOps); - } + PreProcessDeleteOnlyTable(todo); + }); return dbOps; } @@ -542,10 +582,10 @@ namespace SpacetimeDB { var dbOps = ProcessedDatabaseUpdate.New(); - foreach (var (table, update) in GetTables(updates)) + Parallel.ForEach(GetUpdatesToPreProcess(updates, dbOps), (todo) => { - PreProcessTable(table, update, dbOps); - } + PreProcessTable(todo); + }); return dbOps; } @@ -904,7 +944,14 @@ namespace SpacetimeDB } } - // Note: this method is called from unit tests. + /// + /// Callback for receiving a message from the websocket. + /// Note: this method is invoked on the websocket thread, not on the main thread. + /// That's fine, since all it does is push a message to a queue. + /// Note: this method is called from unit tests. + /// + /// + /// internal void OnMessageReceived(byte[] bytes, DateTime timestamp) => _messageQueue.Add(new UnprocessedMessage { bytes = bytes, timestamp = timestamp });