Remove warmup from distributed keynote bench (#4757)

# Description of Changes

Replaces the warmup period in the distributed version of the `keynote-2`
benchmark with an explicit start barrier.

1. Removes `--warmup-seconds` from the distributed benchmark flow
2. Adds an explicit `starting` phase where generators start their local
epoch and POST `/started`
3. Makes the coordinator wait for all participant start acknowledgements
before beginning the measured window
4. Adds `--start-ack-timeout-seconds` as the timeout for that start
barrier
5. Removes `warmupSeconds` from the distributed benchmark
protocol/result types

# API and ABI breaking changes

N/A

# Expected complexity level and risk

1.5

# Testing

N/A
This commit is contained in:
joshua-spacetime
2026-04-07 17:44:46 -07:00
committed by GitHub
parent 29a9d063a3
commit 0586258ece
6 changed files with 102 additions and 31 deletions
+4 -4
View File
@@ -256,7 +256,6 @@ cd templates/keynote-2
pnpm run bench-dist-coordinator -- \
--test test-1 \
--connector spacetimedb \
--warmup-seconds 15 \
--window-seconds 30 \
--verify 1 \
--stdb-url ws://127.0.0.1:3000 \
@@ -268,9 +267,10 @@ pnpm run bench-dist-coordinator -- \
Notes:
- `--warmup-seconds` is the unmeasured warmup period. Generators submit requests during warmup, but those transactions are excluded from TPS.
- Before measurement begins, the coordinator waits for every participating generator to start its epoch and acknowledge that it is running.
- `--window-seconds` is the measured interval.
- `--verify 1` preserves the existing benchmark semantics by running one verification pass centrally after the epoch completes.
- If a generator never acknowledges start, the coordinator fails the epoch once the `--start-ack-timeout-seconds` timeout elapses. The default is `60` seconds.
- The coordinator derives the HTTP metrics endpoint from `--stdb-url` by switching to `http://` or `https://` and appending `/v1/metrics`.
- For a real multi-machine run, change `--bind 127.0.0.1` to `--bind 0.0.0.0` so remote generators can reach the coordinator.
- For a real multi-machine run, set `--stdb-url` to the server machine's reachable address.
@@ -367,8 +367,8 @@ The result contains:
#### Operational notes
- Start the coordinator before the generators.
- Generators begin submitting requests when the coordinator enters `warmup`, not when the measured window begins.
- Throughput is measured only from the committed transaction counter delta recorded after warmup, so warmup transactions are excluded.
- Generators begin submitting requests when the coordinator enters `starting`.
- Throughput is measured only from the committed transaction counter delta recorded after all participating generators have acknowledged start, so startup traffic is excluded.
- For this distributed TypeScript mode, each connection runs closed-loop with one request at a time. There is no pipelining in this flow.
- Late generators are allowed to register and become ready while an epoch is already running, but they only participate in the next epoch.
- The coordinator does not use heartbeats. An epoch includes the generators whose most recently reported state is `ready`.
-5
View File
@@ -376,11 +376,6 @@ export async function runOne({
return { start, completedWithinWindow, completedTotal, committedDelta };
};
// const warmUpSeconds = 5;
// console.log(`[${connector.name}] Warming up for ${warmUpSeconds}s...`);
// await run(warmUpSeconds);
// console.log(`[${connector.name}] Finished warmup.`);
console.log(`[${connector.name}] Starting workers for ${seconds}s run...`);
const { start, completedWithinWindow, completedTotal, committedDelta } =
@@ -12,22 +12,26 @@ function formatErrorWithCause(err: unknown): string {
return `${err.message}${cause}`;
}
export async function fetchMetrics(url: string): Promise<string> {
async function fetchText(url: string, label: string): Promise<string> {
let res: Response;
try {
res = await fetch(url);
} catch (err) {
throw new Error(`metrics GET ${url} failed: ${formatErrorWithCause(err)}`);
throw new Error(`${label} GET ${url} failed: ${formatErrorWithCause(err)}`);
}
if (!res.ok) {
throw new Error(
`metrics GET ${url} failed: ${res.status} ${res.statusText}`,
`${label} GET ${url} failed: ${res.status} ${res.statusText}`,
);
}
return await res.text();
}
export async function fetchMetrics(url: string): Promise<string> {
return await fetchText(url, 'metrics');
}
export function parseMetricCounter(
body: string,
metricName: string,
@@ -29,6 +29,7 @@ import type {
RegisterRequest,
StartEpochRequest,
StartEpochResponse,
StartedRequest,
StoppedRequest,
} from './protocol.ts';
import { isoNow, sleep, writeJsonFile } from './util.ts';
@@ -47,6 +48,7 @@ type ActiveEpoch = {
label: string | null;
participantIds: string[];
participantConnections: number;
startedAcks: Set<string>;
stopAcks: Set<string>;
};
@@ -110,7 +112,7 @@ async function runVerification(
class DistributedCoordinator {
private readonly testName: string;
private readonly connectorName: string;
private readonly warmupMs: number;
private readonly startAckTimeoutMs: number;
private readonly windowMs: number;
private readonly verifyAfterEpoch: boolean;
private readonly stopAckTimeoutMs: number;
@@ -129,7 +131,7 @@ class DistributedCoordinator {
constructor(opts: {
testName: string;
connectorName: string;
warmupMs: number;
startAckTimeoutMs: number;
windowMs: number;
verifyAfterEpoch: boolean;
stopAckTimeoutMs: number;
@@ -140,7 +142,7 @@ class DistributedCoordinator {
}) {
this.testName = opts.testName;
this.connectorName = opts.connectorName;
this.warmupMs = opts.warmupMs;
this.startAckTimeoutMs = opts.startAckTimeoutMs;
this.windowMs = opts.windowMs;
this.verifyAfterEpoch = opts.verifyAfterEpoch;
this.stopAckTimeoutMs = opts.stopAckTimeoutMs;
@@ -194,6 +196,24 @@ class DistributedCoordinator {
return this.snapshot();
}
/**
 * Records a generator's start acknowledgement for the epoch in progress.
 *
 * @param body - The generator id and the epoch number it reports as started.
 * @returns The coordinator state snapshot taken after the acknowledgement
 *   has been recorded.
 * @throws Error when no epoch is in progress, when the acknowledged epoch is
 *   not the current one, or when the generator's `activeEpoch` differs from
 *   the acknowledged epoch.
 */
started(body: StartedRequest): CoordinatorState {
  const { id, epoch } = body;
  const sender = this.requireGenerator(id);

  // The acknowledgement must target the epoch the coordinator is running now.
  const epochInProgress = this.currentEpoch;
  if (epochInProgress == null || epochInProgress.epoch !== epoch) {
    throw new Error(
      `Generator "${id}" acknowledged unexpected epoch ${epoch}`,
    );
  }

  // ...and the sender must have been assigned to that same epoch.
  if (sender.activeEpoch !== epoch) {
    throw new Error(`Generator "${id}" is not assigned to epoch ${epoch}`);
  }

  sender.localState = 'running';
  // A Set makes repeated acknowledgements from the same generator harmless.
  epochInProgress.startedAcks.add(id);
  return this.snapshot();
}
stopped(body: StoppedRequest): CoordinatorState {
const generator = this.requireGenerator(body.id);
generator.localState = 'ready';
@@ -252,18 +272,19 @@ class DistributedCoordinator {
(sum, generator) => sum + generator.openedConnections,
0,
),
startedAcks: new Set<string>(),
stopAcks: new Set<string>(),
};
for (const participantId of activeEpoch.participantIds) {
const generator = this.generators.get(participantId);
if (!generator) continue;
generator.localState = 'running';
generator.localState = 'starting';
generator.activeEpoch = activeEpoch.epoch;
}
this.currentEpoch = activeEpoch;
this.phase = 'warmup';
this.phase = 'starting';
this.epochTask = this.runEpoch(activeEpoch)
.catch((err) => {
const msg = err instanceof Error ? err.message : String(err);
@@ -297,9 +318,14 @@ class DistributedCoordinator {
try {
console.log(
`[coordinator] epoch ${activeEpoch.epoch} warmup for ${(this.warmupMs / 1000).toFixed(1)}s`,
`[coordinator] epoch ${activeEpoch.epoch} waiting for start acknowledgements from ${activeEpoch.participantIds.length} generators`,
);
await sleep(this.warmupMs);
const pendingStarts = await this.waitForStarts(activeEpoch);
if (pendingStarts.length > 0) {
throw new Error(
`Missing start acknowledgements from: ${pendingStarts.join(', ')}`,
);
}
const before = await getSpacetimeCommittedTransfers(this.stdbUrl);
if (before == null) {
@@ -379,7 +405,6 @@ class DistributedCoordinator {
label: activeEpoch.label,
test: this.testName,
connector: this.connectorName,
warmupSeconds: this.warmupMs / 1000,
windowSeconds: this.windowMs / 1000,
actualWindowSeconds,
participantIds: activeEpoch.participantIds,
@@ -405,6 +430,25 @@ class DistributedCoordinator {
console.log(`[coordinator] wrote epoch ${result.epoch} result to ${outPath}`);
}
/**
 * Waits until every participant of `activeEpoch` has acknowledged start, or
 * until `startAckTimeoutMs` has elapsed, whichever comes first.
 *
 * Progress is judged by membership: a participant is pending until its id is
 * present in `activeEpoch.startedAcks`. The pending list is re-evaluated after
 * the final sleep, so an acknowledgement that lands just before the deadline
 * is not reported as a timeout.
 *
 * @param activeEpoch - The epoch whose start acknowledgements are awaited.
 * @returns The ids of participants that have not acknowledged start; an empty
 *   array when all of them have.
 */
private async waitForStarts(activeEpoch: ActiveEpoch): Promise<string[]> {
  const pollIntervalMs = 250;
  const deadline = Date.now() + this.startAckTimeoutMs;
  const pendingStarts = (): string[] =>
    activeEpoch.participantIds.filter(
      (id) => !activeEpoch.startedAcks.has(id),
    );

  let pending = pendingStarts();
  while (pending.length > 0) {
    const remainingMs = deadline - Date.now();
    // Written as a negated `>` so a NaN timeout ends the wait instead of
    // looping forever.
    if (!(remainingMs > 0)) break;
    // Never sleep past the deadline.
    await sleep(Math.min(pollIntervalMs, remainingMs));
    pending = pendingStarts();
  }

  // Only warn when something is actually missing.
  if (pending.length > 0) {
    console.warn(
      `[coordinator] start acknowledgements timed out for epoch ${activeEpoch.epoch}: ${pending.join(', ')}`,
    );
  }
  return pending;
}
private async waitForStops(activeEpoch: ActiveEpoch): Promise<string[]> {
const deadline = Date.now() + this.stopAckTimeoutMs;
@@ -450,7 +494,11 @@ async function main(): Promise<void> {
new URL('../../runs/distributed/', import.meta.url),
);
const resultsDir = getStringFlag(flags, 'results-dir', defaultResultsDir);
const warmupSeconds = getNumberFlag(flags, 'warmup-seconds', 15);
const startAckTimeoutSeconds = getNumberFlag(
flags,
'start-ack-timeout-seconds',
60,
);
const windowSeconds = getNumberFlag(flags, 'window-seconds', 60);
const stopAckTimeoutSeconds = getNumberFlag(
flags,
@@ -479,7 +527,7 @@ async function main(): Promise<void> {
const coordinator = new DistributedCoordinator({
testName,
connectorName,
warmupMs: warmupSeconds * 1000,
startAckTimeoutMs: startAckTimeoutSeconds * 1000,
windowMs: windowSeconds * 1000,
verifyAfterEpoch,
stopAckTimeoutMs: stopAckTimeoutSeconds * 1000,
@@ -517,6 +565,12 @@ async function main(): Promise<void> {
return;
}
if (method === 'POST' && path === '/started') {
const body = await readJsonBody<StartedRequest>(req);
json(res, 200, coordinator.started(body));
return;
}
if (method === 'POST' && path === '/stopped') {
const body = await readJsonBody<StoppedRequest>(req);
json(res, 200, coordinator.stopped(body));
@@ -550,7 +604,7 @@ async function main(): Promise<void> {
});
console.log(
`[coordinator] listening on http://${bind}:${port} test=${testName} connector=${connectorName} warmup=${warmupSeconds}s window=${windowSeconds}s verify=${verifyAfterEpoch ? 'on' : 'off'} stdb=${stdbUrl} compression=${stdbCompression}`,
`[coordinator] listening on http://${bind}:${port} test=${testName} connector=${connectorName} start_ack_timeout=${startAckTimeoutSeconds}s window=${windowSeconds}s verify=${verifyAfterEpoch ? 'on' : 'off'} stdb=${stdbUrl} compression=${stdbCompression}`,
);
}
@@ -89,6 +89,18 @@ async function main(): Promise<void> {
let activeEpoch: number | null = null;
let stopping = false;
// Starts the given epoch on the local session, records it as the active epoch,
// and then reports the start to the coordinator by POSTing to `/started`.
const startActiveEpoch = async (epoch: number) => {
  console.log(`[generator ${id}] starting epoch ${epoch}`);
  await session.startEpoch(epoch);
  activeEpoch = epoch;

  // One attempt at delivering the start acknowledgement.
  const acknowledgeStart = async (): Promise<void> => {
    await postJson<CoordinatorState>(coordinatorUrl, '/started', {
      id,
      epoch,
    });
  };
  // Predicate handed to retryUntilSuccess; true while the generator is not
  // shutting down. NOTE(review): presumably gates further retries — confirm
  // against retryUntilSuccess.
  const notStopping = (): boolean => !stopping;

  await retryUntilSuccess(
    '[generator] started',
    acknowledgeStart,
    pollMs,
    controlRetries,
    notStopping,
  );
};
const stopActiveEpoch = async () => {
if (activeEpoch == null) return;
@@ -147,17 +159,15 @@ async function main(): Promise<void> {
state.participants.includes(id);
const shouldKeepRunning =
isParticipant &&
(state.phase === 'warmup' || state.phase === 'measure');
(state.phase === 'starting' || state.phase === 'measure');
if (!activeEpoch) {
if (
state.phase === 'warmup' &&
state.phase === 'starting' &&
state.currentEpoch != null &&
state.participants.includes(id)
) {
console.log(`[generator ${id}] starting epoch ${state.currentEpoch}`);
await session.startEpoch(state.currentEpoch);
activeEpoch = state.currentEpoch;
await startActiveEpoch(state.currentEpoch);
}
} else if (!shouldKeepRunning) {
await stopActiveEpoch();
@@ -1,6 +1,10 @@
export type GeneratorLocalState = 'registered' | 'ready' | 'running';
export type GeneratorLocalState =
| 'registered'
| 'ready'
| 'starting'
| 'running';
export type CoordinatorPhase = 'idle' | 'warmup' | 'measure' | 'stop';
export type CoordinatorPhase = 'idle' | 'starting' | 'measure' | 'stop';
export type GeneratorSnapshot = {
id: string;
@@ -16,7 +20,6 @@ export type EpochResult = {
label: string | null;
test: string;
connector: string;
warmupSeconds: number;
windowSeconds: number;
actualWindowSeconds: number;
participantIds: string[];
@@ -54,6 +57,11 @@ export type ReadyRequest = {
openedConnections: number;
};
/**
 * Request body a generator POSTs to the coordinator's `/started` endpoint to
 * acknowledge that it has started an epoch.
 */
export type StartedRequest = {
// Id of the acknowledging generator; the coordinator looks the generator up by it.
id: string;
// Epoch number being acknowledged; the coordinator rejects it unless it matches the current epoch.
epoch: number;
};
export type StoppedRequest = {
id: string;
epoch: number;