Pre process messages in parallel, take 1

This commit is contained in:
James Gilles
2025-05-19 14:37:20 -04:00
parent 75e1a746ee
commit 8d16648e7e
+109 -62
View File
@@ -387,6 +387,29 @@ namespace SpacetimeDB
}
);
/// <summary>
/// A collection of updates to the same table that need to be pre-processed.
/// This is the unit of work that is spread across worker threads.
/// </summary>
internal struct UpdatesToPreProcess
{
/// <summary>
/// The table handle to use to parse rows.
/// You should only use this in a thread-safe way.
/// </summary>
public IRemoteTableHandle Table;
/// <summary>
/// The updates to parse.
/// </summary>
public List<TableUpdate> Updates;
/// <summary>
/// The delta to fill with data. Starts out empty.
/// </summary>
public MultiDictionaryDelta<object, PreHashedRow> Delta;
}
#if UNITY_WEBGL && !UNITY_EDITOR
IEnumerator PreProcessMessages()
#else
@@ -416,8 +439,9 @@ namespace SpacetimeDB
}
}
IEnumerable<(IRemoteTableHandle, TableUpdate)> GetTables(DatabaseUpdate updates)
IEnumerable<UpdatesToPreProcess> GetUpdatesToPreProcess(DatabaseUpdate updates, ProcessedDatabaseUpdate dbOps)
{
Dictionary<IRemoteTableHandle, UpdatesToPreProcess> tableToUpdates = new(32);
foreach (var update in updates.Tables)
{
var tableName = update.TableName;
@@ -427,21 +451,33 @@ namespace SpacetimeDB
Log.Error($"Unknown table name: {tableName}");
continue;
}
yield return (table, update);
if (tableToUpdates.ContainsKey(table))
{
tableToUpdates[table].Updates.Add(update);
}
else
{
var delta = dbOps.DeltaForTable(table);
tableToUpdates[table] = new()
{
Table = table,
Updates = new() { update },
Delta = delta
};
}
}
return tableToUpdates.Values;
}
ProcessedDatabaseUpdate PreProcessLegacySubscription(InitialSubscription initSub)
{
var dbOps = ProcessedDatabaseUpdate.New();
// This is all of the inserts
int cap = initSub.DatabaseUpdate.Tables.Sum(a => (int)a.NumRows);
// First apply all of the state
foreach (var (table, update) in GetTables(initSub.DatabaseUpdate))
Parallel.ForEach(GetUpdatesToPreProcess(initSub.DatabaseUpdate, dbOps), (todo) =>
{
PreProcessInsertOnlyTable(table, update, dbOps);
}
PreProcessInsertOnlyTable(todo);
});
return dbOps;
}
@@ -452,88 +488,92 @@ namespace SpacetimeDB
ProcessedDatabaseUpdate PreProcessSubscribeMultiApplied(SubscribeMultiApplied subscribeMultiApplied)
{
var dbOps = ProcessedDatabaseUpdate.New();
foreach (var (table, update) in GetTables(subscribeMultiApplied.Update))
Parallel.ForEach(GetUpdatesToPreProcess(subscribeMultiApplied.Update, dbOps), (todo) =>
{
PreProcessInsertOnlyTable(table, update, dbOps);
}
PreProcessInsertOnlyTable(todo);
});
return dbOps;
}
void PreProcessInsertOnlyTable(IRemoteTableHandle table, TableUpdate update, ProcessedDatabaseUpdate dbOps)
void PreProcessInsertOnlyTable(UpdatesToPreProcess todo)
{
var delta = dbOps.DeltaForTable(table);
foreach (var cqu in update.Updates)
foreach (var update in todo.Updates)
{
var qu = DecompressDecodeQueryUpdate(cqu);
if (qu.Deletes.RowsData.Count > 0)
foreach (var cqu in update.Updates)
{
Log.Warn("Non-insert during an insert-only server message!");
}
var (insertReader, insertRowCount) = ParseRowList(qu.Inserts);
for (var i = 0; i < insertRowCount; i++)
{
var obj = Decode(table, insertReader, out var pk);
delta.Add(pk, obj);
var qu = DecompressDecodeQueryUpdate(cqu);
if (qu.Deletes.RowsData.Count > 0)
{
Log.Warn("Non-insert during an insert-only server message!");
}
var (insertReader, insertRowCount) = ParseRowList(qu.Inserts);
for (var i = 0; i < insertRowCount; i++)
{
var obj = Decode(todo.Table, insertReader, out var pk);
todo.Delta.Add(pk, obj);
}
}
}
}
void PreProcessDeleteOnlyTable(IRemoteTableHandle table, TableUpdate update, ProcessedDatabaseUpdate dbOps)
void PreProcessDeleteOnlyTable(UpdatesToPreProcess todo)
{
var delta = dbOps.DeltaForTable(table);
foreach (var cqu in update.Updates)
foreach (var update in todo.Updates)
{
var qu = DecompressDecodeQueryUpdate(cqu);
if (qu.Inserts.RowsData.Count > 0)
foreach (var cqu in update.Updates)
{
Log.Warn("Non-delete during a delete-only operation!");
}
var qu = DecompressDecodeQueryUpdate(cqu);
if (qu.Inserts.RowsData.Count > 0)
{
Log.Warn("Non-delete during a delete-only operation!");
}
var (deleteReader, deleteRowCount) = ParseRowList(qu.Deletes);
for (var i = 0; i < deleteRowCount; i++)
{
var obj = Decode(table, deleteReader, out var pk);
delta.Remove(pk, obj);
var (deleteReader, deleteRowCount) = ParseRowList(qu.Deletes);
for (var i = 0; i < deleteRowCount; i++)
{
var obj = Decode(todo.Table, deleteReader, out var pk);
todo.Delta.Remove(pk, obj);
}
}
}
}
void PreProcessTable(IRemoteTableHandle table, TableUpdate update, ProcessedDatabaseUpdate dbOps)
void PreProcessTable(UpdatesToPreProcess todo)
{
var delta = dbOps.DeltaForTable(table);
foreach (var cqu in update.Updates)
foreach (var update in todo.Updates)
{
var qu = DecompressDecodeQueryUpdate(cqu);
// Because we are accumulating into a MultiDictionaryDelta that will be applied all-at-once
// to the table, it doesn't matter that we call Add before Remove here.
var (insertReader, insertRowCount) = ParseRowList(qu.Inserts);
for (var i = 0; i < insertRowCount; i++)
foreach (var compressableQueryUpdate in update.Updates)
{
var obj = Decode(table, insertReader, out var pk);
delta.Add(pk, obj);
}
var qu = DecompressDecodeQueryUpdate(compressableQueryUpdate);
var (deleteReader, deleteRowCount) = ParseRowList(qu.Deletes);
for (var i = 0; i < deleteRowCount; i++)
{
var obj = Decode(table, deleteReader, out var pk);
delta.Remove(pk, obj);
// Because we are accumulating into a MultiDictionaryDelta that will be applied all-at-once
// to the table, it doesn't matter that we call Add before Remove here.
var (insertReader, insertRowCount) = ParseRowList(qu.Inserts);
for (var i = 0; i < insertRowCount; i++)
{
var obj = Decode(todo.Table, insertReader, out var pk);
todo.Delta.Add(pk, obj);
}
var (deleteReader, deleteRowCount) = ParseRowList(qu.Deletes);
for (var i = 0; i < deleteRowCount; i++)
{
var obj = Decode(todo.Table, deleteReader, out var pk);
todo.Delta.Remove(pk, obj);
}
}
}
}
ProcessedDatabaseUpdate PreProcessUnsubscribeMultiApplied(UnsubscribeMultiApplied unsubMultiApplied)
{
var dbOps = ProcessedDatabaseUpdate.New();
foreach (var (table, update) in GetTables(unsubMultiApplied.Update))
Parallel.ForEach(GetUpdatesToPreProcess(unsubMultiApplied.Update, dbOps), (todo) =>
{
PreProcessDeleteOnlyTable(table, update, dbOps);
}
PreProcessDeleteOnlyTable(todo);
});
return dbOps;
}
@@ -542,10 +582,10 @@ namespace SpacetimeDB
{
var dbOps = ProcessedDatabaseUpdate.New();
foreach (var (table, update) in GetTables(updates))
Parallel.ForEach(GetUpdatesToPreProcess(updates, dbOps), (todo) =>
{
PreProcessTable(table, update, dbOps);
}
PreProcessTable(todo);
});
return dbOps;
}
@@ -904,7 +944,14 @@ namespace SpacetimeDB
}
}
// Note: this method is called from unit tests.
/// <summary>
/// Callback for receiving a message from the websocket.
/// Note: this method is invoked on the websocket thread, not on the main thread.
/// That's fine, since all it does is push a message to a queue.
/// Note: this method is called from unit tests.
/// </summary>
/// <param name="bytes"></param>
/// <param name="timestamp"></param>
internal void OnMessageReceived(byte[] bytes, DateTime timestamp) =>
_messageQueue.Add(new UnprocessedMessage { bytes = bytes, timestamp = timestamp });