diff --git a/templates/keynote-2/DEVELOP.md b/templates/keynote-2/DEVELOP.md index e18a1002a..fc4deee14 100644 --- a/templates/keynote-2/DEVELOP.md +++ b/templates/keynote-2/DEVELOP.md @@ -256,7 +256,6 @@ cd templates/keynote-2 pnpm run bench-dist-coordinator -- \ --test test-1 \ --connector spacetimedb \ - --warmup-seconds 15 \ --window-seconds 30 \ --verify 1 \ --stdb-url ws://127.0.0.1:3000 \ @@ -268,9 +267,10 @@ pnpm run bench-dist-coordinator -- \ Notes: -- `--warmup-seconds` is the unmeasured warmup period. Generators submit requests during warmup, but those transactions are excluded from TPS. +- Before measurement begins, the coordinator waits for every participating generator to start its epoch and acknowledge that it is running. - `--window-seconds` is the measured interval. - `--verify 1` preserves the existing benchmark semantics by running one verification pass centrally after the epoch completes. +- If a generator never acknowledges start, the coordinator fails the epoch after `--start-ack-timeout-seconds` seconds. The default is `60`. - The coordinator derives the HTTP metrics endpoint from `--stdb-url` by switching to `http://` or `https://` and appending `/v1/metrics`. - For a real multi-machine run, change `--bind 127.0.0.1` to `--bind 0.0.0.0` so remote generators can reach the coordinator. - For a real multi-machine run, set `--stdb-url` to the server machine's reachable address. @@ -367,8 +367,8 @@ The result contains: #### Operational notes - Start the coordinator before the generators. -- Generators begin submitting requests when the coordinator enters `warmup`, not when the measured window begins. -- Throughput is measured only from the committed transaction counter delta recorded after warmup, so warmup transactions are excluded. +- Generators begin submitting requests when the coordinator enters `starting`. +- Throughput is measured only from the committed transaction counter delta recorded after all participating generators have acknowledged start, so startup traffic is excluded. - For this distributed TypeScript mode, each connection runs closed-loop with one request at a time. There is no pipelining in this flow. - Late generators are allowed to register and become ready while an epoch is already running, but they only participate in the next epoch. - The coordinator does not use heartbeats. It includes generators that most recently reported `ready`. diff --git a/templates/keynote-2/src/core/runner.ts b/templates/keynote-2/src/core/runner.ts index 3078d9850..80054d492 100644 --- a/templates/keynote-2/src/core/runner.ts +++ b/templates/keynote-2/src/core/runner.ts @@ -376,11 +376,6 @@ export async function runOne({ return { start, completedWithinWindow, completedTotal, committedDelta }; }; - // const warmUpSeconds = 5; - // console.log(`[${connector.name}] Warming up for ${warmUpSeconds}s...`); - // await run(warmUpSeconds); - // console.log(`[${connector.name}] Finished warmup.`); - console.log(`[${connector.name}] Starting workers for ${seconds}s run...`); const { start, completedWithinWindow, completedTotal, committedDelta } = diff --git a/templates/keynote-2/src/core/spacetimeMetrics.ts b/templates/keynote-2/src/core/spacetimeMetrics.ts index a49e0f7e8..8bc088814 100644 --- a/templates/keynote-2/src/core/spacetimeMetrics.ts +++ b/templates/keynote-2/src/core/spacetimeMetrics.ts @@ -12,22 +12,26 @@ function formatErrorWithCause(err: unknown): string { return `${err.message}${cause}`; } -export async function fetchMetrics(url: string): Promise { +async function fetchText(url: string, label: string): Promise { let res: Response; try { res = await fetch(url); } catch (err) { - throw new Error(`metrics GET ${url} failed: ${formatErrorWithCause(err)}`); + throw new Error(`${label} GET ${url} failed: ${formatErrorWithCause(err)}`); } if (!res.ok) { throw new Error( - `metrics GET ${url} failed: ${res.status} ${res.statusText}`, + `${label} GET ${url} failed: ${res.status} ${res.statusText}`, ); } return await res.text(); } +export async function fetchMetrics(url: string): Promise { + return await fetchText(url, 'metrics'); +} + export function parseMetricCounter( body: string, metricName: string, diff --git a/templates/keynote-2/src/distributed/coordinator.ts b/templates/keynote-2/src/distributed/coordinator.ts index 0478a0f8c..105cf8899 100644 --- a/templates/keynote-2/src/distributed/coordinator.ts +++ b/templates/keynote-2/src/distributed/coordinator.ts @@ -29,6 +29,7 @@ import type { RegisterRequest, StartEpochRequest, StartEpochResponse, + StartedRequest, StoppedRequest, } from './protocol.ts'; import { isoNow, sleep, writeJsonFile } from './util.ts'; @@ -47,6 +48,7 @@ type ActiveEpoch = { label: string | null; participantIds: string[]; participantConnections: number; + startedAcks: Set; stopAcks: Set; }; @@ -110,7 +112,7 @@ async function runVerification( class DistributedCoordinator { private readonly testName: string; private readonly connectorName: string; - private readonly warmupMs: number; + private readonly startAckTimeoutMs: number; private readonly windowMs: number; private readonly verifyAfterEpoch: boolean; private readonly stopAckTimeoutMs: number; @@ -129,7 +131,7 @@ class DistributedCoordinator { constructor(opts: { testName: string; connectorName: string; - warmupMs: number; + startAckTimeoutMs: number; windowMs: number; verifyAfterEpoch: boolean; stopAckTimeoutMs: number; @@ -140,7 +142,7 @@ class DistributedCoordinator { }) { this.testName = opts.testName; this.connectorName = opts.connectorName; - this.warmupMs = opts.warmupMs; + this.startAckTimeoutMs = opts.startAckTimeoutMs; this.windowMs = opts.windowMs; this.verifyAfterEpoch = opts.verifyAfterEpoch; this.stopAckTimeoutMs = opts.stopAckTimeoutMs; @@ -194,6 +196,24 @@ class DistributedCoordinator { return this.snapshot(); } + started(body: StartedRequest): CoordinatorState { + const generator = this.requireGenerator(body.id); + if (!this.currentEpoch || body.epoch !== this.currentEpoch.epoch) { + throw new Error( + `Generator "${body.id}" acknowledged unexpected epoch ${body.epoch}`, + ); + } + if (generator.activeEpoch !== body.epoch) { + throw new Error( + `Generator "${body.id}" is not assigned to epoch ${body.epoch}`, + ); + } + + generator.localState = 'running'; + this.currentEpoch.startedAcks.add(body.id); + return this.snapshot(); + } + stopped(body: StoppedRequest): CoordinatorState { const generator = this.requireGenerator(body.id); generator.localState = 'ready'; @@ -252,18 +272,19 @@ class DistributedCoordinator { (sum, generator) => sum + generator.openedConnections, 0, ), + startedAcks: new Set(), stopAcks: new Set(), }; for (const participantId of activeEpoch.participantIds) { const generator = this.generators.get(participantId); if (!generator) continue; - generator.localState = 'running'; + generator.localState = 'starting'; generator.activeEpoch = activeEpoch.epoch; } this.currentEpoch = activeEpoch; - this.phase = 'warmup'; + this.phase = 'starting'; this.epochTask = this.runEpoch(activeEpoch) .catch((err) => { const msg = err instanceof Error ? err.message : String(err); @@ -297,9 +318,14 @@ class DistributedCoordinator { try { console.log( - `[coordinator] epoch ${activeEpoch.epoch} warmup for ${(this.warmupMs / 1000).toFixed(1)}s`, + `[coordinator] epoch ${activeEpoch.epoch} waiting for start acknowledgements from ${activeEpoch.participantIds.length} generators`, ); - await sleep(this.warmupMs); + const pendingStarts = await this.waitForStarts(activeEpoch); + if (pendingStarts.length > 0) { + throw new Error( + `Missing start acknowledgements from: ${pendingStarts.join(', ')}`, + ); + } const before = await getSpacetimeCommittedTransfers(this.stdbUrl); if (before == null) { @@ -379,7 +405,6 @@ class DistributedCoordinator { label: activeEpoch.label, test: this.testName, connector: this.connectorName, - warmupSeconds: this.warmupMs / 1000, windowSeconds: this.windowMs / 1000, actualWindowSeconds, participantIds: activeEpoch.participantIds, @@ -405,6 +430,25 @@ class DistributedCoordinator { console.log(`[coordinator] wrote epoch ${result.epoch} result to ${outPath}`); } + private async waitForStarts(activeEpoch: ActiveEpoch): Promise { + const deadline = Date.now() + this.startAckTimeoutMs; + + while (Date.now() < deadline) { + if (activeEpoch.startedAcks.size >= activeEpoch.participantIds.length) { + return []; + } + await sleep(250); + } + + const pending = activeEpoch.participantIds.filter( + (id) => !activeEpoch.startedAcks.has(id), + ); + console.warn( + `[coordinator] start acknowledgements timed out for epoch ${activeEpoch.epoch}: ${pending.join(', ')}`, + ); + return pending; + } + private async waitForStops(activeEpoch: ActiveEpoch): Promise { const deadline = Date.now() + this.stopAckTimeoutMs; @@ -450,7 +494,11 @@ async function main(): Promise { new URL('../../runs/distributed/', import.meta.url), ); const resultsDir = getStringFlag(flags, 'results-dir', defaultResultsDir); - const warmupSeconds = getNumberFlag(flags, 'warmup-seconds', 15); + const startAckTimeoutSeconds = getNumberFlag( + flags, + 'start-ack-timeout-seconds', + 60, + ); const windowSeconds = getNumberFlag(flags, 'window-seconds', 60); const stopAckTimeoutSeconds = getNumberFlag( flags, @@ -479,7 +527,7 @@ async function main(): Promise { const coordinator = new DistributedCoordinator({ testName, connectorName, - warmupMs: warmupSeconds * 1000, + startAckTimeoutMs: startAckTimeoutSeconds * 1000, windowMs: windowSeconds * 1000, verifyAfterEpoch, stopAckTimeoutMs: stopAckTimeoutSeconds * 1000, @@ -517,6 +565,12 @@ async function main(): Promise { return; } + if (method === 'POST' && path === '/started') { + const body = await readJsonBody(req); + json(res, 200, coordinator.started(body)); + return; + } + if (method === 'POST' && path === '/stopped') { const body = await readJsonBody(req); json(res, 200, coordinator.stopped(body)); @@ -550,7 +604,7 @@ async function main(): Promise { }); console.log( - `[coordinator] listening on http://${bind}:${port} test=${testName} connector=${connectorName} warmup=${warmupSeconds}s window=${windowSeconds}s verify=${verifyAfterEpoch ? 'on' : 'off'} stdb=${stdbUrl} compression=${stdbCompression}`, + `[coordinator] listening on http://${bind}:${port} test=${testName} connector=${connectorName} start_ack_timeout=${startAckTimeoutSeconds}s window=${windowSeconds}s verify=${verifyAfterEpoch ? 'on' : 'off'} stdb=${stdbUrl} compression=${stdbCompression}`, ); } diff --git a/templates/keynote-2/src/distributed/generator.ts b/templates/keynote-2/src/distributed/generator.ts index dffb68949..62a429d78 100644 --- a/templates/keynote-2/src/distributed/generator.ts +++ b/templates/keynote-2/src/distributed/generator.ts @@ -89,6 +89,18 @@ async function main(): Promise { let activeEpoch: number | null = null; let stopping = false; + const startActiveEpoch = async (epoch: number) => { + console.log(`[generator ${id}] starting epoch ${epoch}`); + await session.startEpoch(epoch); + activeEpoch = epoch; + await retryUntilSuccess('[generator] started', async () => { + await postJson(coordinatorUrl, '/started', { + id, + epoch, + }); + }, pollMs, controlRetries, () => !stopping); + }; + const stopActiveEpoch = async () => { if (activeEpoch == null) return; @@ -147,17 +159,15 @@ async function main(): Promise { state.participants.includes(id); const shouldKeepRunning = isParticipant && - (state.phase === 'warmup' || state.phase === 'measure'); + (state.phase === 'starting' || state.phase === 'measure'); if (!activeEpoch) { if ( - state.phase === 'warmup' && + state.phase === 'starting' && state.currentEpoch != null && state.participants.includes(id) ) { - console.log(`[generator ${id}] starting epoch ${state.currentEpoch}`); - await session.startEpoch(state.currentEpoch); - activeEpoch = state.currentEpoch; + await startActiveEpoch(state.currentEpoch); } } else if (!shouldKeepRunning) { await stopActiveEpoch(); diff --git a/templates/keynote-2/src/distributed/protocol.ts b/templates/keynote-2/src/distributed/protocol.ts index 91b128f0e..a7286d821 100644 --- a/templates/keynote-2/src/distributed/protocol.ts +++ b/templates/keynote-2/src/distributed/protocol.ts @@ -1,6 +1,10 @@ -export type GeneratorLocalState = 'registered' | 'ready' | 'running'; +export type GeneratorLocalState = + | 'registered' + | 'ready' + | 'starting' + | 'running'; -export type CoordinatorPhase = 'idle' | 'warmup' | 'measure' | 'stop'; +export type CoordinatorPhase = 'idle' | 'starting' | 'measure' | 'stop'; export type GeneratorSnapshot = { id: string; @@ -16,7 +20,6 @@ export type EpochResult = { label: string | null; test: string; connector: string; - warmupSeconds: number; windowSeconds: number; actualWindowSeconds: number; participantIds: string[]; @@ -54,6 +57,11 @@ export type ReadyRequest = { openedConnections: number; }; +export type StartedRequest = { + id: string; + epoch: number; +}; + export type StoppedRequest = { id: string; epoch: number;