mirror of
https://github.com/clockworklabs/SpacetimeDB.git
synced 2026-05-06 07:26:43 -04:00
keynote-2: use rust client (#4421)
This commit is contained in:
committed by
GitHub
parent
11a693d4c3
commit
6832fa1bce
Generated
+57
-24
@@ -4318,6 +4318,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"libm",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -5943,6 +5944,16 @@ dependencies = [
|
||||
"getrandom 0.3.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_distr"
|
||||
version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6a8615d50dcf34fa31f7ab52692afec947c4dd0ab803cc87cb3b0b4570ff7463"
|
||||
dependencies = [
|
||||
"num-traits",
|
||||
"rand 0.9.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rand_xorshift"
|
||||
version = "0.4.0"
|
||||
@@ -7436,9 +7447,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "spacetimedb"
|
||||
version = "1.6.0"
|
||||
version = "1.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8679cf54a7a653e6bc612bef28c219f98b9274308d8aae1e86046ed685db48b1"
|
||||
checksum = "db18cb19c7499ba4a65b1504442179a7e4aba487dc35978d90966c5ca02ee16b"
|
||||
dependencies = [
|
||||
"bytemuck",
|
||||
"derive_more 0.99.20",
|
||||
@@ -7446,10 +7457,11 @@ dependencies = [
|
||||
"log",
|
||||
"rand 0.8.5",
|
||||
"scoped-tls",
|
||||
"spacetimedb-bindings-macro 1.6.0",
|
||||
"spacetimedb-bindings-sys 1.6.0",
|
||||
"spacetimedb-lib 1.6.0",
|
||||
"spacetimedb-primitives 1.6.0",
|
||||
"serde_json",
|
||||
"spacetimedb-bindings-macro 1.9.0",
|
||||
"spacetimedb-bindings-sys 1.9.0",
|
||||
"spacetimedb-lib 1.9.0",
|
||||
"spacetimedb-primitives 1.9.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -7541,15 +7553,15 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "spacetimedb-bindings-macro"
|
||||
version = "1.6.0"
|
||||
version = "1.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9a930242493f5c875ab96903eb40fb6a90c4d3ae99597fd51da569ff22769a03"
|
||||
checksum = "47725515a53cf3344aa6bbb3f2063c7fbb5496c743f7a7c2150413acd1213c1d"
|
||||
dependencies = [
|
||||
"heck 0.4.1",
|
||||
"humantime",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"spacetimedb-primitives 1.6.0",
|
||||
"spacetimedb-primitives 1.9.0",
|
||||
"syn 2.0.107",
|
||||
]
|
||||
|
||||
@@ -7567,11 +7579,11 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "spacetimedb-bindings-sys"
|
||||
version = "1.6.0"
|
||||
version = "1.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e565dfcdd2dc3f58e0178052ec1c8ce710b013482d4fa50b511ba786a2c3bc68"
|
||||
checksum = "08201dac3ce095645dfbf407e71aba7c784a6061dace21bb4a49dd0b80d3f007"
|
||||
dependencies = [
|
||||
"spacetimedb-primitives 1.6.0",
|
||||
"spacetimedb-primitives 1.9.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -8074,9 +8086,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "spacetimedb-lib"
|
||||
version = "1.6.0"
|
||||
version = "1.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0ba57c8f1983bb144ee7e1ff28aa5882ca437f67c14a14daad4bf61d31f7040d"
|
||||
checksum = "702c08bfcd0426c45786e30f016e0a03d85f34dac3555e5b370291441297e266"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bitflags 2.10.0",
|
||||
@@ -8086,9 +8098,9 @@ dependencies = [
|
||||
"enum-as-inner",
|
||||
"hex",
|
||||
"itertools 0.12.1",
|
||||
"spacetimedb-bindings-macro 1.6.0",
|
||||
"spacetimedb-primitives 1.6.0",
|
||||
"spacetimedb-sats 1.6.0",
|
||||
"spacetimedb-bindings-macro 1.9.0",
|
||||
"spacetimedb-primitives 1.9.0",
|
||||
"spacetimedb-sats 1.9.0",
|
||||
"thiserror 1.0.69",
|
||||
]
|
||||
|
||||
@@ -8194,12 +8206,13 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "spacetimedb-primitives"
|
||||
version = "1.6.0"
|
||||
version = "1.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1dcd64c6970ca59e7b71e51952cf1a71bae2652b1eb736c19a7e528f3874894a"
|
||||
checksum = "55af71f2ccb753957ad47b19648481bd67ae458885f18df867a4d5b0a55c8c67"
|
||||
dependencies = [
|
||||
"bitflags 2.10.0",
|
||||
"either",
|
||||
"enum-as-inner",
|
||||
"itertools 0.12.1",
|
||||
"nohash-hasher",
|
||||
]
|
||||
@@ -8242,11 +8255,31 @@ dependencies = [
|
||||
"spacetimedb-lib 2.0.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "spacetimedb-rust-transfer-sim"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"clap 4.5.50",
|
||||
"futures",
|
||||
"http 1.3.1",
|
||||
"humantime",
|
||||
"itertools 0.12.1",
|
||||
"log",
|
||||
"rand 0.9.2",
|
||||
"rand_distr",
|
||||
"spacetimedb-client-api-messages",
|
||||
"spacetimedb-lib 2.0.1",
|
||||
"thiserror 1.0.69",
|
||||
"tokio",
|
||||
"tokio-tungstenite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "spacetimedb-sats"
|
||||
version = "1.6.0"
|
||||
version = "1.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "eae61d8f88bda21f56c143bc9b4ddc20100ff08ae4fed0b9444509f7c3ca4339"
|
||||
checksum = "c4a89afd9f4eded852e7355102f66f8ff346d25fe903d38ef0b6a171d32d696a"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"arrayvec",
|
||||
@@ -8263,8 +8296,8 @@ dependencies = [
|
||||
"second-stack",
|
||||
"sha3",
|
||||
"smallvec",
|
||||
"spacetimedb-bindings-macro 1.6.0",
|
||||
"spacetimedb-primitives 1.6.0",
|
||||
"spacetimedb-bindings-macro 1.9.0",
|
||||
"spacetimedb-primitives 1.9.0",
|
||||
"thiserror 1.0.69",
|
||||
]
|
||||
|
||||
@@ -9813,7 +9846,7 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"log",
|
||||
"spacetimedb 1.6.0",
|
||||
"spacetimedb 1.9.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
+3
-1
@@ -45,6 +45,7 @@ members = [
|
||||
"modules/module-test",
|
||||
"templates/basic-rs/spacetimedb",
|
||||
"templates/chat-console-rs/spacetimedb",
|
||||
"templates/keynote-2/spacetimedb-rust-client",
|
||||
"modules/sdk-test",
|
||||
"modules/sdk-test-connect-disconnect",
|
||||
"modules/sdk-test-procedure",
|
||||
@@ -210,7 +211,7 @@ home = "0.5"
|
||||
hostname = "^0.3"
|
||||
http = "1.0"
|
||||
http-body-util= "0.1.3"
|
||||
humantime = "2.1.0"
|
||||
humantime = "2.3"
|
||||
hyper = "1.0"
|
||||
hyper-util = { version = "0.1", features = ["tokio"] }
|
||||
ignore = "0.4"
|
||||
@@ -254,6 +255,7 @@ quick-xml = "0.31"
|
||||
quote = "1.0.8"
|
||||
rand08 = { package = "rand", version = "0.8" }
|
||||
rand = "0.9"
|
||||
rand_distr = "0.5.1"
|
||||
rayon = "1.8"
|
||||
rayon-core = "1.11.0"
|
||||
regex = "1"
|
||||
|
||||
Generated
+44
-3
@@ -468,8 +468,8 @@ importers:
|
||||
specifier: ^3.0.1
|
||||
version: 3.0.1
|
||||
spacetimedb:
|
||||
specifier: workspace:*
|
||||
version: link:../../crates/bindings-typescript
|
||||
specifier: ^2.0
|
||||
version: 2.0.1(@angular/core@21.1.4(@angular/compiler@21.1.4)(rxjs@7.8.2))(@tanstack/react-query@5.90.19(react@19.2.4))(react@19.2.4)(svelte@5.46.4)(undici@6.21.3)(vue@3.5.26(typescript@5.9.3))
|
||||
sql.js:
|
||||
specifier: ^1.13.0
|
||||
version: 1.14.0
|
||||
@@ -12872,6 +12872,29 @@ packages:
|
||||
space-separated-tokens@2.0.2:
|
||||
resolution: {integrity: sha512-PEGlAwrG8yXGXRjW32fGbg66JAlOAwbObuqVoJpv/mRgoWDQfgH1wDPvtzWyUSNAXBGSk8h755YDbbcEy3SH2Q==}
|
||||
|
||||
spacetimedb@2.0.1:
|
||||
resolution: {integrity: sha512-19YkLz1P+JQDlYvlegDsn3YF4OsE2Pih31e5Xx/kobTExKwx/AtFWRUSA/EiRWvMV9jaIWzMhnsVfG+1fJ39Aw==}
|
||||
peerDependencies:
|
||||
'@angular/core': '>=17.0.0'
|
||||
'@tanstack/react-query': ^5.0.0
|
||||
react: ^18.0.0 || ^19.0.0-0 || ^19.0.0
|
||||
svelte: ^4.0.0 || ^5.0.0
|
||||
undici: ^6.19.2
|
||||
vue: ^3.3.0
|
||||
peerDependenciesMeta:
|
||||
'@angular/core':
|
||||
optional: true
|
||||
'@tanstack/react-query':
|
||||
optional: true
|
||||
react:
|
||||
optional: true
|
||||
svelte:
|
||||
optional: true
|
||||
undici:
|
||||
optional: true
|
||||
vue:
|
||||
optional: true
|
||||
|
||||
spdx-correct@3.2.0:
|
||||
resolution: {integrity: sha512-kN9dJbvnySHULIluDHy32WHRUu3Og7B9sbY7tsFLctQkIqnMh3hErYgdMjTYuqmcXX+lK5T1lnUt3G7zNswmZA==}
|
||||
|
||||
@@ -25925,7 +25948,7 @@ snapshots:
|
||||
h3@2.0.1-rc.14:
|
||||
dependencies:
|
||||
rou3: 0.7.12
|
||||
srvx: 0.11.4
|
||||
srvx: 0.11.7
|
||||
|
||||
handle-thing@2.0.1: {}
|
||||
|
||||
@@ -30788,6 +30811,24 @@ snapshots:
|
||||
|
||||
space-separated-tokens@2.0.2: {}
|
||||
|
||||
spacetimedb@2.0.1(@angular/core@21.1.4(@angular/compiler@21.1.4)(rxjs@7.8.2))(@tanstack/react-query@5.90.19(react@19.2.4))(react@19.2.4)(svelte@5.46.4)(undici@6.21.3)(vue@3.5.26(typescript@5.9.3)):
|
||||
dependencies:
|
||||
base64-js: 1.5.1
|
||||
headers-polyfill: 4.0.3
|
||||
object-inspect: 1.13.4
|
||||
prettier: 3.6.2
|
||||
pure-rand: 7.0.1
|
||||
safe-stable-stringify: 2.5.0
|
||||
statuses: 2.0.2
|
||||
url-polyfill: 1.1.14
|
||||
optionalDependencies:
|
||||
'@angular/core': 21.1.4(@angular/compiler@21.1.4)(rxjs@7.8.2)
|
||||
'@tanstack/react-query': 5.90.19(react@19.2.4)
|
||||
react: 19.2.4
|
||||
svelte: 5.46.4
|
||||
undici: 6.21.3
|
||||
vue: 3.5.26(typescript@5.9.3)
|
||||
|
||||
spdx-correct@3.2.0:
|
||||
dependencies:
|
||||
spdx-expression-parse: 3.0.1
|
||||
|
||||
@@ -15,6 +15,8 @@ The demo compares SpacetimeDB and Convex by default, since both are easy for any
|
||||
|
||||
**Options:** `--systems a,b,c` | `--seconds N` | `--skip-prep` | `--no-animation`
|
||||
|
||||
**Note:** You will need to [install Rust](https://rust-lang.org/tools/install/) to run the spacetimedb benchmark, because we run a [Rust Client](#rust-client).
|
||||
|
||||
## Results Summary
|
||||
|
||||
All tests use 50 concurrent connections with a transfer workload (read-modify-write transaction between two accounts).
|
||||
@@ -151,6 +153,13 @@ SpacetimeDB supports `withConfirmedReads` mode which ensures transactions are du
|
||||
|
||||
PlanetScale results (~477 TPS) demonstrate the **significant impact of cloud database latency**. When the database is accessed over the network (even within the same cloud region), round-trip latency dominates performance. This is why SpacetimeDB's colocated architecture provides such dramatic improvements.
|
||||
|
||||
### Rust client
|
||||
|
||||
When running the benchmark for SpacetimeDB on higher-end hardware we found out that we were actually bottlnecked
|
||||
on our test TypeScript client. To get the absolute most out of the performance of SpacetimeDB we wrote a custom
|
||||
Rust client that allows us to send a much larger number of requests then we could otherwise. We didn't do this
|
||||
for the other backends/databases as they maxed out before the client.
|
||||
|
||||
## Systems Tested
|
||||
|
||||
| System | Architecture |
|
||||
|
||||
Generated
+50
-24
@@ -15,7 +15,7 @@
|
||||
"drizzle-orm": "^0.44.7",
|
||||
"express": "^5.1.0",
|
||||
"hdr-histogram-js": "^3.0.1",
|
||||
"spacetimedb": "1.11.4",
|
||||
"spacetimedb": "^2.0",
|
||||
"sql.js": "^1.13.0",
|
||||
"undici": "^6.19.2"
|
||||
},
|
||||
@@ -744,7 +744,6 @@
|
||||
"integrity": "sha512-NMv9ASNARoKksWtsq/SHakpYAYnhBrQgGD8zkLYk/jaK8jUGn08CfEdTRgYhMypUQAfzSP8W6gNLe0q19/t4VA==",
|
||||
"devOptional": true,
|
||||
"license": "MIT",
|
||||
"peer": true,
|
||||
"dependencies": {
|
||||
"@types/node": "*"
|
||||
}
|
||||
@@ -841,7 +840,6 @@
|
||||
"integrity": "sha512-NoaMtzhxOrubeL/7UZuNTrejB4MPAJ0RpxZqXQf2qXuVlTPuG6Y8p4u9dKRaue4yjmC7ZhzVO2/Yyyn25znrPQ==",
|
||||
"devOptional": true,
|
||||
"license": "MIT",
|
||||
"peer": true,
|
||||
"dependencies": {
|
||||
"@types/node": "*",
|
||||
"pg-protocol": "*",
|
||||
@@ -907,7 +905,6 @@
|
||||
"integrity": "sha512-ep8b36RKHlgWPqjNG9ToUrPiwkhwh0AEzy883mO5Xnd+cL6VBH1EvSjBAAuxLUFF2Vn/moE3Me6v9E1Lo+48GQ==",
|
||||
"devOptional": true,
|
||||
"license": "MIT",
|
||||
"peer": true,
|
||||
"dependencies": {
|
||||
"@types/emscripten": "*",
|
||||
"@types/node": "*"
|
||||
@@ -981,7 +978,6 @@
|
||||
"integrity": "sha512-gaYt9yqTbQ1iOxLpJA8FPR5PiaHP+jlg8I5EX0Rs2KFwNzhBsF40KzMZS5FwelY7RG0wzaucWdqSAJM3uNCPCg==",
|
||||
"hasInstallScript": true,
|
||||
"license": "MIT",
|
||||
"peer": true,
|
||||
"dependencies": {
|
||||
"bindings": "^1.5.0",
|
||||
"prebuild-install": "^7.1.1"
|
||||
@@ -1097,7 +1093,6 @@
|
||||
"integrity": "sha512-z3Xwlg7j2l9JY27x5Qn3Wlyos8YAp0kKRlrePAOjgjMGS5IG6E7Jnlx736vH9UVI4wUICwwhC9anYL++XeOgTQ==",
|
||||
"devOptional": true,
|
||||
"license": "MIT",
|
||||
"peer": true,
|
||||
"dependencies": {
|
||||
"@types/node": "*"
|
||||
}
|
||||
@@ -1200,7 +1195,6 @@
|
||||
"resolved": "https://registry.npmjs.org/convex/-/convex-1.29.3.tgz",
|
||||
"integrity": "sha512-tg5TXzMjpNk9m50YRtdp6US+t7ckxE4E+7DNKUCjJ2MupQs2RBSPF/z5SNN4GUmQLSfg0eMILDySzdAvjTrhnw==",
|
||||
"license": "Apache-2.0",
|
||||
"peer": true,
|
||||
"dependencies": {
|
||||
"esbuild": "0.25.4",
|
||||
"prettier": "^3.0.0"
|
||||
@@ -1667,12 +1661,6 @@
|
||||
"url": "https://opencollective.com/express"
|
||||
}
|
||||
},
|
||||
"node_modules/fast-text-encoding": {
|
||||
"version": "1.0.6",
|
||||
"resolved": "https://registry.npmjs.org/fast-text-encoding/-/fast-text-encoding-1.0.6.tgz",
|
||||
"integrity": "sha512-VhXlQgj9ioXCqGstD37E/HBeqEGV/qOD/kmbVG8h5xKBYvM1L3lR1Zn4555cQ8GkYbJa8aJSipLPndE1k6zK2w==",
|
||||
"license": "Apache-2.0"
|
||||
},
|
||||
"node_modules/fdir": {
|
||||
"version": "6.5.0",
|
||||
"resolved": "https://registry.npmjs.org/fdir/-/fdir-6.5.0.tgz",
|
||||
@@ -2512,7 +2500,6 @@
|
||||
"integrity": "sha512-enxc1h0jA/aq5oSDMvqyW3q89ra6XIIDZgCX9vkMrnz5DFTw/Ny3Li2lFQ+pt3L6MCgm/5o2o8HW9hiJji+xvw==",
|
||||
"devOptional": true,
|
||||
"license": "MIT",
|
||||
"peer": true,
|
||||
"dependencies": {
|
||||
"pg-connection-string": "^2.9.1",
|
||||
"pg-pool": "^3.10.1",
|
||||
@@ -2539,7 +2526,6 @@
|
||||
"version": "1.2.7",
|
||||
"resolved": "https://registry.npmjs.org/pg-cloudflare/-/pg-cloudflare-1.2.7.tgz",
|
||||
"integrity": "sha512-YgCtzMH0ptvZJslLM1ffsY4EuGaU0cx4XSdXLRFae8bPP4dS5xL1tNB3k2o/N64cHJpwU7dxKli/nZ2lUa5fLg==",
|
||||
"dev": true,
|
||||
"license": "MIT",
|
||||
"optional": true
|
||||
},
|
||||
@@ -2610,7 +2596,6 @@
|
||||
"integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==",
|
||||
"dev": true,
|
||||
"license": "MIT",
|
||||
"peer": true,
|
||||
"engines": {
|
||||
"node": ">=12"
|
||||
},
|
||||
@@ -2749,6 +2734,22 @@
|
||||
"once": "^1.3.1"
|
||||
}
|
||||
},
|
||||
"node_modules/pure-rand": {
|
||||
"version": "7.0.1",
|
||||
"resolved": "https://registry.npmjs.org/pure-rand/-/pure-rand-7.0.1.tgz",
|
||||
"integrity": "sha512-oTUZM/NAZS8p7ANR3SHh30kXB+zK2r2BPcEn/awJIbOvq82WoMN4p62AWWp3Hhw50G0xMsw1mhIBLqHw64EcNQ==",
|
||||
"funding": [
|
||||
{
|
||||
"type": "individual",
|
||||
"url": "https://github.com/sponsors/dubzzz"
|
||||
},
|
||||
{
|
||||
"type": "opencollective",
|
||||
"url": "https://opencollective.com/fast-check"
|
||||
}
|
||||
],
|
||||
"license": "MIT"
|
||||
},
|
||||
"node_modules/qs": {
|
||||
"version": "6.14.0",
|
||||
"resolved": "https://registry.npmjs.org/qs/-/qs-6.14.0.tgz",
|
||||
@@ -2873,6 +2874,15 @@
|
||||
],
|
||||
"license": "MIT"
|
||||
},
|
||||
"node_modules/safe-stable-stringify": {
|
||||
"version": "2.5.0",
|
||||
"resolved": "https://registry.npmjs.org/safe-stable-stringify/-/safe-stable-stringify-2.5.0.tgz",
|
||||
"integrity": "sha512-b3rppTKm9T+PsVCBEOUR46GWI7fdOs00VKZ1+9c1EWDaDMvjQc6tUwuFyIprgGgTcWoVHSKrU8H31ZHA2e0RHA==",
|
||||
"license": "MIT",
|
||||
"engines": {
|
||||
"node": ">=10"
|
||||
}
|
||||
},
|
||||
"node_modules/safer-buffer": {
|
||||
"version": "2.1.2",
|
||||
"resolved": "https://registry.npmjs.org/safer-buffer/-/safer-buffer-2.1.2.tgz",
|
||||
@@ -3093,28 +3103,46 @@
|
||||
}
|
||||
},
|
||||
"node_modules/spacetimedb": {
|
||||
"version": "1.11.4",
|
||||
"resolved": "https://registry.npmjs.org/spacetimedb/-/spacetimedb-1.11.4.tgz",
|
||||
"integrity": "sha512-Xcb42UH2dAysThPrCUARTDxeeJWAf6QrJThS89LigegLmjFd1/KvJ9k/yMrXz75JdSlJZhFvttjKZff3CakqRQ==",
|
||||
"version": "2.0.1",
|
||||
"resolved": "https://registry.npmjs.org/spacetimedb/-/spacetimedb-2.0.1.tgz",
|
||||
"integrity": "sha512-19YkLz1P+JQDlYvlegDsn3YF4OsE2Pih31e5Xx/kobTExKwx/AtFWRUSA/EiRWvMV9jaIWzMhnsVfG+1fJ39Aw==",
|
||||
"license": "ISC",
|
||||
"dependencies": {
|
||||
"base64-js": "^1.5.1",
|
||||
"fast-text-encoding": "^1.0.0",
|
||||
"headers-polyfill": "^4.0.3",
|
||||
"object-inspect": "^1.13.4",
|
||||
"prettier": "^3.3.3",
|
||||
"pure-rand": "^7.0.1",
|
||||
"safe-stable-stringify": "^2.5.0",
|
||||
"statuses": "^2.0.2",
|
||||
"url-polyfill": "^1.1.14"
|
||||
},
|
||||
"peerDependencies": {
|
||||
"@angular/core": ">=17.0.0",
|
||||
"@tanstack/react-query": "^5.0.0",
|
||||
"react": "^18.0.0 || ^19.0.0-0 || ^19.0.0",
|
||||
"undici": "^6.19.2"
|
||||
"svelte": "^4.0.0 || ^5.0.0",
|
||||
"undici": "^6.19.2",
|
||||
"vue": "^3.3.0"
|
||||
},
|
||||
"peerDependenciesMeta": {
|
||||
"@angular/core": {
|
||||
"optional": true
|
||||
},
|
||||
"@tanstack/react-query": {
|
||||
"optional": true
|
||||
},
|
||||
"react": {
|
||||
"optional": true
|
||||
},
|
||||
"svelte": {
|
||||
"optional": true
|
||||
},
|
||||
"undici": {
|
||||
"optional": true
|
||||
},
|
||||
"vue": {
|
||||
"optional": true
|
||||
}
|
||||
}
|
||||
},
|
||||
@@ -3132,8 +3160,7 @@
|
||||
"version": "1.13.0",
|
||||
"resolved": "https://registry.npmjs.org/sql.js/-/sql.js-1.13.0.tgz",
|
||||
"integrity": "sha512-RJbVP1HRDlUUXahJ7VMTcu9Rm1Nzw+EBpoPr94vnbD4LwR715F3CcxE2G2k45PewcaZ57pjetYa+LoSJLAASgA==",
|
||||
"license": "MIT",
|
||||
"peer": true
|
||||
"license": "MIT"
|
||||
},
|
||||
"node_modules/ssri": {
|
||||
"version": "13.0.0",
|
||||
@@ -3323,7 +3350,6 @@
|
||||
"resolved": "https://registry.npmjs.org/undici/-/undici-6.22.0.tgz",
|
||||
"integrity": "sha512-hU/10obOIu62MGYjdskASR3CUAiYaFTtC9Pa6vHyf//mAipSvSQg6od2CnJswq7fvzNS3zJhxoRkgNVaHurWKw==",
|
||||
"license": "MIT",
|
||||
"peer": true,
|
||||
"engines": {
|
||||
"node": ">=18.17"
|
||||
}
|
||||
|
||||
@@ -36,7 +36,7 @@
|
||||
"drizzle-orm": "^0.44.7",
|
||||
"express": "^5.1.0",
|
||||
"hdr-histogram-js": "^3.0.1",
|
||||
"spacetimedb": "workspace:*",
|
||||
"spacetimedb": "^2.0",
|
||||
"sql.js": "^1.13.0",
|
||||
"undici": "^6.19.2"
|
||||
}
|
||||
|
||||
+2197
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,21 @@
|
||||
[package]
|
||||
name = "spacetimedb-rust-transfer-sim"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
description = "Client simulating of transfers and benchmarking TPS for SpacetimeDB"
|
||||
|
||||
[dependencies]
|
||||
spacetimedb-lib.workspace = true
|
||||
spacetimedb-client-api-messages.workspace = true
|
||||
http.workspace = true
|
||||
thiserror.workspace = true
|
||||
rand.workspace = true
|
||||
rand_distr.workspace = true
|
||||
tokio.workspace = true
|
||||
tokio-tungstenite.workspace = true
|
||||
log.workspace = true
|
||||
bytes.workspace = true
|
||||
futures.workspace = true
|
||||
itertools.workspace = true
|
||||
humantime.workspace = true
|
||||
clap.workspace = true
|
||||
@@ -0,0 +1,332 @@
|
||||
#![allow(clippy::disallowed_macros)]
|
||||
|
||||
mod websocket;
|
||||
|
||||
use crate::websocket::{Recv, Send, ServerMessage, WsParams};
|
||||
use clap::{Args, Parser, Subcommand};
|
||||
use core::sync::atomic::{AtomicU64, Ordering};
|
||||
use humantime::{format_duration, parse_duration};
|
||||
use rand::{SeedableRng as _, distr::Distribution, rngs::SmallRng};
|
||||
use rand_distr::Zipf;
|
||||
use spacetimedb_client_api_messages::websocket::v1::{CallReducer, CallReducerFlags, ClientMessage, Compression};
|
||||
use spacetimedb_lib::bsatn;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
use std::{fs, thread};
|
||||
use tokio::runtime::{self, Handle, Runtime};
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
const LOCALHOST: &str = "http://localhost:3000";
|
||||
const MODULE: &str = "sim";
|
||||
|
||||
const DURATION: &str = "5s";
|
||||
const WARMUP_DURATION: &str = "5s";
|
||||
const ALPHA: f32 = 0.5;
|
||||
const CONNECTIONS: usize = 10;
|
||||
const INIT_BALANCE: i64 = 1_000_000;
|
||||
const AMOUNT: u32 = 1;
|
||||
const ACCOUNTS: u32 = 100_000;
|
||||
const CONFIRMED_READS: bool = false;
|
||||
// Max inflight reducer calls imposed by the server.
|
||||
const MAX_INFLIGHT_REDUCERS: u64 = 16384;
|
||||
|
||||
// When called from within an async context, return a handle to it (and no
|
||||
// `Runtime`), otherwise create a fresh `Runtime` and return it along with a
|
||||
// handle to it.
|
||||
fn enter_or_create_runtime(connections: usize) -> (Option<Runtime>, runtime::Handle) {
|
||||
match runtime::Handle::try_current() {
|
||||
Err(e) if e.is_missing_context() => {
|
||||
let rt = tokio::runtime::Builder::new_multi_thread()
|
||||
.enable_all()
|
||||
.worker_threads(connections)
|
||||
.thread_name("spacetimedb-background-connection")
|
||||
.build()
|
||||
.unwrap();
|
||||
let handle = rt.handle().clone();
|
||||
|
||||
(Some(rt), handle)
|
||||
}
|
||||
Ok(handle) => (None, handle),
|
||||
Err(_) => unimplemented!(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn init_conn(cli: &Common, handle: &Handle) -> (JoinHandle<()>, Recv, Send) {
|
||||
let server: &str = &cli.server;
|
||||
let uri = server.try_into().unwrap();
|
||||
let params = WsParams {
|
||||
compression: Compression::None,
|
||||
light: true,
|
||||
confirmed: cli.confirmed_reads.into(),
|
||||
};
|
||||
|
||||
let conn = websocket::WsConnection::connect(uri, &cli.module, None, None, params)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let (jh, mut rx, tx) = conn.spawn_message_loop(handle);
|
||||
|
||||
let init = rx.recv().await.unwrap();
|
||||
assert_eq!(init, ServerMessage::IdentityToken);
|
||||
|
||||
(jh, rx, tx)
|
||||
}
|
||||
|
||||
fn pick_two_distinct(mut pick: impl FnMut() -> u32, max_spins: usize) -> (u32, u32) {
|
||||
let a = pick();
|
||||
let mut b = pick();
|
||||
|
||||
let mut spins = 0;
|
||||
while a == b && spins < max_spins {
|
||||
b = pick();
|
||||
spins += 1;
|
||||
}
|
||||
|
||||
(a, b)
|
||||
}
|
||||
|
||||
fn make_transfers(accounts: u32, alpha: f32) -> Vec<(u32, u32)> {
|
||||
let dist = Zipf::new(accounts as f32, alpha).unwrap();
|
||||
let mut rng = SmallRng::seed_from_u64(0x12345678);
|
||||
(0..10_000_000)
|
||||
.filter_map(|_| {
|
||||
let (from, to) = pick_two_distinct(|| dist.sample(&mut rng) as u32, 32);
|
||||
if from >= accounts || to >= accounts || from == to {
|
||||
None
|
||||
} else {
|
||||
Some((from, to))
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn seed(cli: &Common, seed: &Seed) {
|
||||
let (_runtime, handle) = enter_or_create_runtime(1);
|
||||
let (jh, mut rx, tx) = handle.block_on(init_conn(cli, &handle));
|
||||
|
||||
let args = (cli.accounts, seed.initial_balance);
|
||||
let args = bsatn::to_vec(&args).unwrap().into();
|
||||
tx.send(ClientMessage::CallReducer(CallReducer {
|
||||
reducer: "seed".into(),
|
||||
args,
|
||||
request_id: 0,
|
||||
flags: CallReducerFlags::FullUpdate,
|
||||
}))
|
||||
.unwrap();
|
||||
|
||||
let reply = rx.blocking_recv().unwrap();
|
||||
assert_eq!(reply, ServerMessage::TransactionUpdate);
|
||||
|
||||
if !cli.quiet {
|
||||
println!("done seeding");
|
||||
}
|
||||
|
||||
jh.abort();
|
||||
}
|
||||
|
||||
fn bench(cli: &Common, bench: &Bench) {
|
||||
let (_runtime, handle) = enter_or_create_runtime(bench.connections);
|
||||
|
||||
// Dump some config parameters.
|
||||
let alpha = bench.alpha;
|
||||
let accounts = cli.accounts;
|
||||
let amount = bench.amount;
|
||||
if !cli.quiet {
|
||||
println!("Benchmark parameters:");
|
||||
println!("alpha={alpha}, amount = {amount}, accounts = {accounts}");
|
||||
println!("max inflight reducers = {}", bench.max_inflight_reducers);
|
||||
println!();
|
||||
}
|
||||
|
||||
// Parse the durations.
|
||||
let duration = parse_duration(&bench.duration).expect("invalid duration passed");
|
||||
let warmup_duration = parse_duration(&bench.warmup_duration).expect("invalid warmup duration passed");
|
||||
|
||||
// Initialize connections.
|
||||
let connections = bench.connections;
|
||||
let confirmed_reads = cli.confirmed_reads;
|
||||
if !cli.quiet {
|
||||
println!("initializing {connections} connections with confirmed-reads={confirmed_reads}");
|
||||
}
|
||||
let (join_handles, conns): (Vec<_>, Vec<_>) = (0..connections)
|
||||
.map(|_| {
|
||||
let (jh, tx, rx) = handle.block_on(init_conn(cli, &handle));
|
||||
(jh, (tx, rx))
|
||||
})
|
||||
.unzip();
|
||||
|
||||
// Pre-compute transfer pairs.
|
||||
let transfer_pairs = &make_transfers(accounts, alpha);
|
||||
let transfers_per_worker = transfer_pairs.len() / conns.len();
|
||||
|
||||
let warmup_start_all = Instant::now();
|
||||
let mut start_all = warmup_start_all;
|
||||
let barrier = &std::sync::Barrier::new(conns.len());
|
||||
let completed = Arc::new(AtomicU64::default());
|
||||
|
||||
thread::scope(|scope| {
|
||||
if !cli.quiet {
|
||||
eprintln!("warming up for {}...", format_duration(warmup_duration));
|
||||
}
|
||||
let mut start_all = Some(&mut start_all);
|
||||
for (worker_idx, (mut rx, tx)) in conns.into_iter().enumerate() {
|
||||
let completed = completed.clone();
|
||||
let start_all = start_all.take();
|
||||
scope.spawn(move || {
|
||||
let mut run = || {
|
||||
let mut transfers = 0;
|
||||
let mut transfer_idx = worker_idx * transfers_per_worker;
|
||||
|
||||
while transfers < bench.max_inflight_reducers {
|
||||
let (from, to) = match transfer_pairs.get(transfer_idx) {
|
||||
Some(x) => *x,
|
||||
None => {
|
||||
transfer_idx = 0;
|
||||
transfer_pairs[transfer_idx]
|
||||
}
|
||||
};
|
||||
transfer_idx += 1;
|
||||
|
||||
let args = (from, to, amount);
|
||||
let args = bsatn::to_vec(&args).unwrap().into();
|
||||
tx.send(ClientMessage::CallReducer(CallReducer {
|
||||
reducer: "transfer".into(),
|
||||
args,
|
||||
request_id: 0,
|
||||
flags: CallReducerFlags::FullUpdate,
|
||||
}))
|
||||
.unwrap();
|
||||
transfers += 1;
|
||||
}
|
||||
|
||||
// Block until all confirmations arrived.
|
||||
let mut recorded_transfers = 0;
|
||||
while recorded_transfers < transfers {
|
||||
match rx.blocking_recv() {
|
||||
None => unreachable!(),
|
||||
Some(ServerMessage::TransactionUpdate) => {}
|
||||
Some(_) => continue,
|
||||
}
|
||||
recorded_transfers += 1;
|
||||
}
|
||||
|
||||
transfers
|
||||
};
|
||||
|
||||
while warmup_start_all.elapsed() < warmup_duration {
|
||||
run();
|
||||
}
|
||||
|
||||
if barrier.wait().is_leader() && !cli.quiet {
|
||||
eprintln!("finished warmup...");
|
||||
eprintln!("benchmarking for {}...", format_duration(duration));
|
||||
}
|
||||
let start = Instant::now();
|
||||
if let Some(start_all) = start_all {
|
||||
*start_all = start;
|
||||
}
|
||||
|
||||
while start.elapsed() < duration {
|
||||
let transfers = run();
|
||||
completed.fetch_add(transfers, Ordering::Relaxed);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
let completed = completed.load(Ordering::Relaxed);
|
||||
let elapsed = start_all.elapsed().as_secs_f64();
|
||||
let tps = completed as f64 / elapsed;
|
||||
|
||||
if !cli.quiet {
|
||||
println!("ran for {elapsed} seconds");
|
||||
println!("completed {completed}");
|
||||
println!("throughput was {tps} TPS");
|
||||
}
|
||||
|
||||
if let Some(path) = bench.tps_write_path.as_deref() {
|
||||
let path = Path::new(path);
|
||||
fs::write(path, format!("{tps}")).expect("Failed to write TPS to file {path}");
|
||||
}
|
||||
|
||||
for handle in join_handles {
|
||||
handle.abort();
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(about)]
|
||||
struct Cli {
|
||||
#[command(subcommand)]
|
||||
command: Commands,
|
||||
}
|
||||
|
||||
#[derive(Args)]
|
||||
struct Common {
|
||||
#[arg(short, long, default_value_t = false)]
|
||||
quiet: bool,
|
||||
|
||||
#[arg(short, long, default_value = LOCALHOST)]
|
||||
server: String,
|
||||
|
||||
#[arg(short, long, default_value = MODULE)]
|
||||
module: String,
|
||||
|
||||
#[arg(long, default_value_t = CONFIRMED_READS)]
|
||||
confirmed_reads: bool,
|
||||
|
||||
#[arg(long, default_value_t = ACCOUNTS)]
|
||||
accounts: u32,
|
||||
}
|
||||
|
||||
#[derive(Subcommand)]
|
||||
enum Commands {
|
||||
Seed(Seed),
|
||||
Bench(Bench),
|
||||
}
|
||||
|
||||
#[derive(Args)]
|
||||
struct Seed {
|
||||
#[command(flatten)]
|
||||
common: Common,
|
||||
|
||||
#[arg(short, long, default_value_t = INIT_BALANCE)]
|
||||
initial_balance: i64,
|
||||
}
|
||||
|
||||
#[derive(Args)]
|
||||
struct Bench {
|
||||
#[command(flatten)]
|
||||
common: Common,
|
||||
|
||||
#[arg(short, long, default_value_t = ALPHA)]
|
||||
alpha: f32,
|
||||
|
||||
#[arg(long, default_value_t = AMOUNT)]
|
||||
amount: u32,
|
||||
|
||||
#[arg(short, long, default_value_t = CONNECTIONS)]
|
||||
connections: usize,
|
||||
|
||||
#[arg(long, default_value_t = MAX_INFLIGHT_REDUCERS)]
|
||||
max_inflight_reducers: u64,
|
||||
|
||||
#[arg(short, long, default_value = DURATION)]
|
||||
duration: String,
|
||||
|
||||
#[arg(short, long, default_value = WARMUP_DURATION)]
|
||||
warmup_duration: String,
|
||||
|
||||
#[arg(short, long)]
|
||||
tps_write_path: Option<String>,
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let cli = Cli::parse();
|
||||
|
||||
match &cli.command {
|
||||
Commands::Seed(seed_args) => seed(&seed_args.common, seed_args),
|
||||
Commands::Bench(bench_args) => bench(&bench_args.common, bench_args),
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,422 @@
|
||||
//! Low-level WebSocket plumbing.
|
||||
//!
|
||||
//! This module is internal, and may incompatibly change without warning.
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures::{SinkExt, TryStreamExt};
|
||||
use http::uri::{InvalidUri, Scheme, Uri};
|
||||
use spacetimedb_client_api_messages::websocket::common::SERVER_MSG_COMPRESSION_TAG_NONE;
|
||||
use spacetimedb_client_api_messages::websocket::v1::{BIN_PROTOCOL, ClientMessage, Compression};
|
||||
use spacetimedb_lib::de::Deserialize;
|
||||
use spacetimedb_lib::{ConnectionId, bsatn};
|
||||
use std::mem;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use thiserror::Error;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::Instant;
|
||||
use tokio::{net::TcpStream, runtime};
|
||||
use tokio_tungstenite::{
|
||||
MaybeTlsStream, WebSocketStream, connect_async_with_config,
|
||||
tungstenite::client::IntoClientRequest,
|
||||
tungstenite::protocol::{Message as WebSocketMessage, WebSocketConfig},
|
||||
};
|
||||
|
||||
fn decompress_server_message(raw: &[u8]) -> &[u8] {
|
||||
match raw {
|
||||
[SERVER_MSG_COMPRESSION_TAG_NONE, bytes @ ..] => bytes,
|
||||
_ => unimplemented!(),
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Error, Debug, Clone)]
|
||||
pub enum UriError {
|
||||
#[error("Unknown URI scheme {scheme}, expected http, https, ws or wss")]
|
||||
UnknownUriScheme { scheme: String },
|
||||
|
||||
#[error("Expected a URI without a query part, but found {query}")]
|
||||
UnexpectedQuery { query: String },
|
||||
|
||||
#[error(transparent)]
|
||||
InvalidUri {
|
||||
// `Arc` is required for `Self: Clone`, as `http::uri::InvalidUri: !Clone`.
|
||||
source: Arc<http::uri::InvalidUri>,
|
||||
},
|
||||
|
||||
#[error(transparent)]
|
||||
InvalidUriParts {
|
||||
// `Arc` is required for `Self: Clone`, as `http::uri::InvalidUriParts: !Clone`.
|
||||
source: Arc<http::uri::InvalidUriParts>,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Error, Debug, Clone)]
|
||||
pub enum WsError {
|
||||
#[error(transparent)]
|
||||
UriError(#[from] UriError),
|
||||
|
||||
#[error("Error in WebSocket connection with {uri}: {source}")]
|
||||
Tungstenite {
|
||||
uri: Uri,
|
||||
#[source]
|
||||
// `Arc` is required for `Self: Clone`, as `tungstenite::Error: !Clone`.
|
||||
source: Arc<tokio_tungstenite::tungstenite::Error>,
|
||||
},
|
||||
|
||||
#[error("Failed to deserialize WebSocket message: {source}")]
|
||||
DeserializeMessage {
|
||||
#[source]
|
||||
source: bsatn::DecodeError,
|
||||
},
|
||||
}
|
||||
|
||||
pub struct WsConnection {
|
||||
sock: WebSocketStream<MaybeTlsStream<TcpStream>>,
|
||||
}
|
||||
|
||||
fn parse_scheme(scheme: Option<Scheme>) -> Result<Scheme, UriError> {
|
||||
Ok(match scheme {
|
||||
Some(s) => match s.as_str() {
|
||||
"ws" | "wss" => s,
|
||||
"http" => "ws".parse().unwrap(),
|
||||
"https" => "wss".parse().unwrap(),
|
||||
unknown_scheme => {
|
||||
return Err(UriError::UnknownUriScheme {
|
||||
scheme: unknown_scheme.into(),
|
||||
});
|
||||
}
|
||||
},
|
||||
None => "ws".parse().unwrap(),
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Default)]
|
||||
pub struct WsParams {
|
||||
pub compression: Compression,
|
||||
pub light: bool,
|
||||
/// `Some(true)` to enable confirmed reads for the connection,
|
||||
/// `Some(false)` to disable them.
|
||||
/// `None` to not set the parameter and let the server choose.
|
||||
pub confirmed: Option<bool>,
|
||||
}
|
||||
|
||||
pub fn make_uri(
|
||||
host: Uri,
|
||||
db_name: &str,
|
||||
connection_id: Option<ConnectionId>,
|
||||
params: WsParams,
|
||||
) -> Result<Uri, UriError> {
|
||||
let mut parts = host.into_parts();
|
||||
let scheme = parse_scheme(parts.scheme.take())?;
|
||||
parts.scheme = Some(scheme);
|
||||
let mut path = if let Some(path_and_query) = parts.path_and_query {
|
||||
if let Some(query) = path_and_query.query() {
|
||||
return Err(UriError::UnexpectedQuery { query: query.into() });
|
||||
}
|
||||
path_and_query.path().to_string()
|
||||
} else {
|
||||
"/".to_string()
|
||||
};
|
||||
|
||||
// Normalize the path, ensuring it ends with `/`.
|
||||
if !path.ends_with('/') {
|
||||
path.push('/');
|
||||
}
|
||||
|
||||
path.push_str("v1/database/");
|
||||
path.push_str(db_name);
|
||||
path.push_str("/subscribe");
|
||||
|
||||
// Specify the desired compression for host->client replies.
|
||||
match params.compression {
|
||||
Compression::None => path.push_str("?compression=None"),
|
||||
Compression::Gzip => path.push_str("?compression=Gzip"),
|
||||
// The host uses the same default as the sdk,
|
||||
// but in case this changes, we prefer to be explicit now.
|
||||
Compression::Brotli => path.push_str("?compression=Brotli"),
|
||||
};
|
||||
|
||||
// Provide the connection ID if the client provided one.
|
||||
if let Some(cid) = connection_id {
|
||||
// If a connection ID is provided, append it to the path.
|
||||
path.push_str("&connection_id=");
|
||||
path.push_str(&cid.to_hex());
|
||||
}
|
||||
|
||||
// Specify the `light` mode if requested.
|
||||
if params.light {
|
||||
path.push_str("&light=true");
|
||||
}
|
||||
|
||||
// Enable confirmed reads if requested.
|
||||
if let Some(confirmed) = params.confirmed {
|
||||
path.push_str("&confirmed=");
|
||||
path.push_str(if confirmed { "true" } else { "false" });
|
||||
}
|
||||
|
||||
parts.path_and_query = Some(path.parse().map_err(|source: InvalidUri| UriError::InvalidUri {
|
||||
source: Arc::new(source),
|
||||
})?);
|
||||
Uri::from_parts(parts).map_err(|source| UriError::InvalidUriParts {
|
||||
source: Arc::new(source),
|
||||
})
|
||||
}
|
||||
|
||||
// Tungstenite doesn't offer an interface to specify a WebSocket protocol, which frankly
|
||||
// seems like a pretty glaring omission in its API. In order to insert our own protocol
|
||||
// header, we manually the `Request` constructed by
|
||||
// `tungstenite::IntoClientRequest::into_client_request`.
|
||||
|
||||
// TODO: `core` uses [Hyper](https://docs.rs/hyper/latest/hyper/) as its HTTP library
|
||||
// rather than having Tungstenite manage its own connections. Should this library do
|
||||
// the same?
|
||||
|
||||
fn make_request(
|
||||
host: Uri,
|
||||
db_name: &str,
|
||||
token: Option<&str>,
|
||||
connection_id: Option<ConnectionId>,
|
||||
params: WsParams,
|
||||
) -> Result<http::Request<()>, WsError> {
|
||||
let uri = make_uri(host, db_name, connection_id, params)?;
|
||||
let mut req = IntoClientRequest::into_client_request(uri.clone()).map_err(|source| WsError::Tungstenite {
|
||||
uri,
|
||||
source: Arc::new(source),
|
||||
})?;
|
||||
request_insert_protocol_header(&mut req);
|
||||
request_insert_auth_header(&mut req, token);
|
||||
Ok(req)
|
||||
}
|
||||
|
||||
fn request_insert_protocol_header(req: &mut http::Request<()>) {
|
||||
req.headers_mut().insert(
|
||||
http::header::SEC_WEBSOCKET_PROTOCOL,
|
||||
const { http::HeaderValue::from_static(BIN_PROTOCOL) },
|
||||
);
|
||||
}
|
||||
|
||||
fn request_insert_auth_header(req: &mut http::Request<()>, token: Option<&str>) {
|
||||
if let Some(token) = token {
|
||||
let auth = ["Bearer ", token].concat().try_into().unwrap();
|
||||
req.headers_mut().insert(http::header::AUTHORIZATION, auth);
|
||||
}
|
||||
}
|
||||
|
||||
/// If `res` evaluates to `Err(e)`, log a warning in the form `"{}: {:?}", $cause, e`.
|
||||
///
|
||||
/// Could be trivially written as a function, but macro-ifying it preserves the source location of the log.
|
||||
macro_rules! maybe_log_error {
|
||||
($cause:expr, $res:expr) => {
|
||||
if let Err(e) = $res {
|
||||
log::warn!("{}: {:?}", $cause, e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[derive(Deserialize, PartialEq, Debug)]
|
||||
pub enum ServerMessage {
|
||||
/// Informs of changes to subscribed rows.
|
||||
/// This will be removed when we switch to `SubscribeSingle`.
|
||||
InitialSubscription,
|
||||
/// Upon reducer run.
|
||||
TransactionUpdate,
|
||||
/// Upon reducer run, but limited to just the table updates.
|
||||
TransactionUpdateLight,
|
||||
/// After connecting, to inform client of its identity.
|
||||
IdentityToken,
|
||||
/// Return results to a one off SQL query.
|
||||
OneOffQueryResponse,
|
||||
/// Sent in response to a `SubscribeSingle` message. This contains the initial matching rows.
|
||||
SubscribeApplied,
|
||||
/// Sent in response to an `Unsubscribe` message. This contains the matching rows.
|
||||
UnsubscribeApplied,
|
||||
/// Communicate an error in the subscription lifecycle.
|
||||
SubscriptionError,
|
||||
/// Sent in response to a `SubscribeMulti` message. This contains the initial matching rows.
|
||||
SubscribeMultiApplied,
|
||||
/// Sent in response to an `UnsubscribeMulti` message. This contains the matching rows.
|
||||
UnsubscribeMultiApplied,
|
||||
/// Sent in response to a [`CallProcedure`] message. This contains the return value.
|
||||
ProcedureResult,
|
||||
}
|
||||
|
||||
pub type Recv = mpsc::UnboundedReceiver<ServerMessage>;
|
||||
pub type Send = mpsc::UnboundedSender<ClientMessage<Bytes>>;
|
||||
|
||||
impl WsConnection {
|
||||
pub async fn connect(
|
||||
host: Uri,
|
||||
db_name: &str,
|
||||
token: Option<&str>,
|
||||
connection_id: Option<ConnectionId>,
|
||||
params: WsParams,
|
||||
) -> Result<Self, WsError> {
|
||||
let req = make_request(host, db_name, token, connection_id, params)?;
|
||||
|
||||
// Grab the URI for error-reporting.
|
||||
let uri = req.uri().clone();
|
||||
|
||||
let (sock, _): (WebSocketStream<MaybeTlsStream<TcpStream>>, _) = connect_async_with_config(
|
||||
req,
|
||||
// TODO(kim): In order to be able to replicate module WASM blobs,
|
||||
// `cloud-next` cannot have message / frame size limits. That's
|
||||
// obviously a bad default for all other clients, though.
|
||||
Some(WebSocketConfig::default().max_frame_size(None).max_message_size(None)),
|
||||
false,
|
||||
)
|
||||
.await
|
||||
.map_err(|source| WsError::Tungstenite {
|
||||
uri,
|
||||
source: Arc::new(source),
|
||||
})?;
|
||||
Ok(WsConnection { sock })
|
||||
}
|
||||
|
||||
fn parse_response(bytes: &[u8]) -> Result<ServerMessage, WsError> {
|
||||
let bytes = decompress_server_message(bytes);
|
||||
bsatn::from_slice(bytes).map_err(|source| WsError::DeserializeMessage { source })
|
||||
}
|
||||
|
||||
fn encode_message(msg: ClientMessage<Bytes>) -> WebSocketMessage {
|
||||
WebSocketMessage::Binary(bsatn::to_vec(&msg).unwrap().into())
|
||||
}
|
||||
|
||||
async fn message_loop(
|
||||
mut self,
|
||||
incoming_messages: mpsc::UnboundedSender<ServerMessage>,
|
||||
outgoing_messages: mpsc::UnboundedReceiver<ClientMessage<Bytes>>,
|
||||
) {
|
||||
// There is a small but plausible chance that a client's socket will not
|
||||
// be notified that the remote end has closed the connection, e.g.
|
||||
// because of the remote machine being power cycled, or middleboxes
|
||||
// misbehaving.
|
||||
//
|
||||
// Unless the client uses dynamic subscriptions, it will only ever try
|
||||
// to read from the socket, and thus not notice the connection closure.
|
||||
//
|
||||
// For certain types of clients it is crucial to eventually time out
|
||||
// such connections, and attempt to reconnect. We don't, however, want
|
||||
// to flood the server with `Ping` frames unnecessarily.
|
||||
//
|
||||
// Instead, we:
|
||||
//
|
||||
// * Check every `IDLE_TIMEOUT` whether some data has arrived.
|
||||
//
|
||||
// - If not, send a `Ping` frame.
|
||||
//
|
||||
// * Check after another `IDLE_TIMEOUT` whether data has arrived.
|
||||
//
|
||||
// - If not, and we were expecting a `Pong` response, consider the
|
||||
// connection bad and exit the loop, thereby closing the socket.
|
||||
//
|
||||
// Note that the server also initiates `Ping`s, currently at `2 * IDLE_TIMEOUT`.
|
||||
// If both ends cannot communicate, we assume the server has already
|
||||
// timed out the client, and so don't bother sending a `Close` frame.
|
||||
const IDLE_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
let mut idle_timeout_interval = tokio::time::interval_at(Instant::now() + IDLE_TIMEOUT, IDLE_TIMEOUT);
|
||||
|
||||
let mut idle = true;
|
||||
let mut want_pong = false;
|
||||
|
||||
let mut outgoing_messages = Some(outgoing_messages);
|
||||
loop {
|
||||
tokio::select! {
|
||||
incoming = self.sock.try_next() => match incoming {
|
||||
Err(tokio_tungstenite::tungstenite::error::Error::ConnectionClosed) | Ok(None) => {
|
||||
log::info!("Connection closed");
|
||||
break;
|
||||
},
|
||||
|
||||
Err(e) => {
|
||||
maybe_log_error!(
|
||||
"Error reading message from read WebSocket stream",
|
||||
Result::<(), _>::Err(e)
|
||||
);
|
||||
break;
|
||||
},
|
||||
|
||||
Ok(Some(WebSocketMessage::Binary(bytes))) => {
|
||||
idle = false;
|
||||
match Self::parse_response(&bytes) {
|
||||
Err(e) => maybe_log_error!(
|
||||
"Error decoding WebSocketMessage::Binary payload",
|
||||
Result::<(), _>::Err(e)
|
||||
),
|
||||
Ok(msg) => maybe_log_error!(
|
||||
"Error sending decoded message to incoming_messages queue",
|
||||
incoming_messages.send(msg)
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Some(WebSocketMessage::Ping(_))) => {
|
||||
log::trace!("received ping");
|
||||
idle = false;
|
||||
// No need to explicitly respond with a `Pong`,
|
||||
// as tungstenite handles this automatically.
|
||||
// See [https://github.com/snapview/tokio-tungstenite/issues/88].
|
||||
},
|
||||
|
||||
Ok(Some(WebSocketMessage::Pong(_))) => {
|
||||
log::trace!("received pong");
|
||||
idle = false;
|
||||
want_pong = false;
|
||||
},
|
||||
|
||||
Ok(Some(other)) => {
|
||||
log::warn!("Unexpected WebSocket message {other:?}");
|
||||
idle = false;
|
||||
},
|
||||
},
|
||||
|
||||
_ = idle_timeout_interval.tick() => {
|
||||
if mem::replace(&mut idle, true) {
|
||||
if want_pong {
|
||||
// Nothing received while we were waiting for a pong.
|
||||
log::warn!("Connection timed out");
|
||||
break;
|
||||
}
|
||||
|
||||
log::trace!("sending client ping");
|
||||
let ping = WebSocketMessage::Ping(Bytes::new());
|
||||
if let Err(e) = self.sock.send(ping).await {
|
||||
log::warn!("Error sending ping: {e:?}");
|
||||
break;
|
||||
}
|
||||
want_pong = true;
|
||||
}
|
||||
},
|
||||
|
||||
// this is stupid. we want to handle the channel close *once*, and then disable this branch
|
||||
Some(outgoing) = async { Some(outgoing_messages.as_mut()?.recv().await) } => match outgoing {
|
||||
Some(outgoing) => {
|
||||
let msg = Self::encode_message(outgoing);
|
||||
if let Err(e) = self.sock.send(msg).await {
|
||||
log::warn!("Error sending outgoing message: {e:?}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
None => {
|
||||
maybe_log_error!("Error sending close frame", SinkExt::close(&mut self.sock).await);
|
||||
outgoing_messages = None;
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn_message_loop(
|
||||
self,
|
||||
runtime: &runtime::Handle,
|
||||
) -> (
|
||||
JoinHandle<()>,
|
||||
mpsc::UnboundedReceiver<ServerMessage>,
|
||||
mpsc::UnboundedSender<ClientMessage<Bytes>>,
|
||||
) {
|
||||
let (outgoing_send, outgoing_recv) = mpsc::unbounded_channel();
|
||||
let (incoming_send, incoming_recv) = mpsc::unbounded_channel();
|
||||
let handle = runtime.spawn(self.message_loop(incoming_send, outgoing_recv));
|
||||
(handle, incoming_recv, outgoing_send)
|
||||
}
|
||||
}
|
||||
@@ -8,6 +8,25 @@ import { RunResult } from './types.ts';
|
||||
const OP_TIMEOUT_MS = Number(process.env.BENCH_OP_TIMEOUT_MS ?? '15000');
|
||||
const MIN_OP_TIMEOUT_MS = Number(process.env.MIN_OP_TIMEOUT_MS ?? '250');
|
||||
const TAIL_SLACK_MS = Number(process.env.TAIL_SLACK_MS ?? '1000');
|
||||
const DEFAULT_PRECOMPUTED_TRANSFER_PAIRS = 10_000_000;
|
||||
|
||||
function precomputeZipfTransferPairs(
|
||||
accounts: number,
|
||||
alpha: number,
|
||||
count: number,
|
||||
): { from: Uint32Array; to: Uint32Array; count: number } {
|
||||
const pick = zipfSampler(accounts, alpha);
|
||||
const from = new Uint32Array(count);
|
||||
const to = new Uint32Array(count);
|
||||
|
||||
for (let i = 0; i < count; i++) {
|
||||
const [a, b] = pickTwoDistinct(pick);
|
||||
from[i] = a;
|
||||
to[i] = b;
|
||||
}
|
||||
|
||||
return { from, to, count };
|
||||
}
|
||||
|
||||
async function withOpTimeout<T>(
|
||||
promise: Promise<T>,
|
||||
@@ -121,7 +140,28 @@ export async function runOne({
|
||||
}
|
||||
}
|
||||
|
||||
const pick = zipfSampler(accounts, alpha);
|
||||
const precomputedPairsRaw = Number(
|
||||
process.env.BENCH_PRECOMPUTED_TRANSFER_PAIRS ??
|
||||
DEFAULT_PRECOMPUTED_TRANSFER_PAIRS,
|
||||
);
|
||||
const precomputedPairs = Number.isFinite(precomputedPairsRaw)
|
||||
? Math.max(1, Math.floor(precomputedPairsRaw))
|
||||
: DEFAULT_PRECOMPUTED_TRANSFER_PAIRS;
|
||||
|
||||
console.log(
|
||||
`[${connector.name}] precomputing ${precomputedPairs} Zipf transfer pairs...`,
|
||||
);
|
||||
const precomputeStart = performance.now();
|
||||
const transferPairs = precomputeZipfTransferPairs(
|
||||
accounts,
|
||||
alpha,
|
||||
precomputedPairs,
|
||||
);
|
||||
const precomputeElapsedMs = performance.now() - precomputeStart;
|
||||
console.log(
|
||||
`[${connector.name}] precomputed ${transferPairs.count} pairs in ${(precomputeElapsedMs / 1000).toFixed(2)}s`,
|
||||
);
|
||||
|
||||
const start = performance.now();
|
||||
const endAt = start + seconds * 1000;
|
||||
|
||||
@@ -153,6 +193,22 @@ export async function runOne({
|
||||
|
||||
async function worker(workerIndex: number) {
|
||||
const conn = workers[workerIndex];
|
||||
const pairsPerWorker = Math.max(
|
||||
1,
|
||||
Math.floor(transferPairs.count / concurrency),
|
||||
);
|
||||
let pairIndex = workerIndex * pairsPerWorker;
|
||||
|
||||
const nextTransferPair = (): [number, number] => {
|
||||
if (pairIndex >= transferPairs.count) {
|
||||
pairIndex = 0;
|
||||
}
|
||||
|
||||
const from = transferPairs.from[pairIndex]!;
|
||||
const to = transferPairs.to[pairIndex]!;
|
||||
pairIndex++;
|
||||
return [from, to];
|
||||
};
|
||||
|
||||
// non-pipelined
|
||||
if (!PIPELINED) {
|
||||
@@ -166,7 +222,7 @@ export async function runOne({
|
||||
Math.min(OP_TIMEOUT_MS, timeLeft + TAIL_SLACK_MS),
|
||||
);
|
||||
|
||||
const [from, to] = pickTwoDistinct(pick);
|
||||
const [from, to] = nextTransferPair();
|
||||
|
||||
collisionTracker.begin(from);
|
||||
collisionTracker.begin(to);
|
||||
@@ -213,7 +269,7 @@ export async function runOne({
|
||||
const unlimitedInflight = !Number.isFinite(MAX_INFLIGHT_PER_WORKER);
|
||||
|
||||
const launchOp = (dynamicTimeout: number) => {
|
||||
const [from, to] = pickTwoDistinct(pick);
|
||||
const [from, to] = nextTransferPair();
|
||||
|
||||
collisionTracker.begin(from);
|
||||
collisionTracker.begin(to);
|
||||
|
||||
@@ -7,6 +7,7 @@ import { CONNECTORS } from './connectors';
|
||||
import { runOne } from './core/runner';
|
||||
import { initConvex } from './init/init_convex';
|
||||
import { sh } from './init/utils';
|
||||
import * as fs from 'fs';
|
||||
|
||||
// Simple TCP ping - just check if something is listening on the port
|
||||
function ping(port: number, timeoutMs = 2000): Promise<boolean> {
|
||||
@@ -69,8 +70,8 @@ function hasFlag(name: string): boolean {
|
||||
}
|
||||
|
||||
const seconds = getArg('seconds', 10);
|
||||
const concurrency = getArg('concurrency', 50);
|
||||
const alpha = getArg('alpha', 1.5);
|
||||
const concurrency = getArg('concurrency', 10);
|
||||
const alpha = getArg('alpha', 0.5);
|
||||
const systems = getStringArg('systems', 'convex,spacetimedb')
|
||||
.split(',')
|
||||
.map((s) => s.trim());
|
||||
@@ -235,6 +236,7 @@ async function prepSystem(system: string): Promise<void> {
|
||||
if (system === 'spacetimedb') {
|
||||
const moduleName = process.env.STDB_MODULE || 'test-1';
|
||||
const server = process.env.STDB_SERVER || 'local';
|
||||
const server2 = process.env.STDB_SERVER || 'http://localhost:3000';
|
||||
const modulePath = process.env.STDB_MODULE_PATH || './spacetimedb';
|
||||
|
||||
// Publish module (creates DB if needed, updates if exists)
|
||||
@@ -246,15 +248,24 @@ async function prepSystem(system: string): Promise<void> {
|
||||
'--module-path',
|
||||
modulePath,
|
||||
]);
|
||||
await sh('spacetime', [
|
||||
'call',
|
||||
await sh('cargo', [
|
||||
'run',
|
||||
//"--quiet",
|
||||
"--manifest-path",
|
||||
"spacetimedb-rust-client/Cargo.toml",
|
||||
"--",
|
||||
"seed",
|
||||
//"--quiet",
|
||||
'--server',
|
||||
server,
|
||||
server2,
|
||||
"--module",
|
||||
moduleName,
|
||||
'seed',
|
||||
"--accounts",
|
||||
String(accounts),
|
||||
"--initial-balance",
|
||||
String(initialBalance),
|
||||
]);
|
||||
console.log('[spacetimedb] seed complete.');
|
||||
} else if (system === 'convex') {
|
||||
await initConvex();
|
||||
} else {
|
||||
@@ -276,11 +287,9 @@ async function prepSystem(system: string): Promise<void> {
|
||||
interface BenchResult {
|
||||
system: string;
|
||||
tps: number;
|
||||
p50_ms: number;
|
||||
p99_ms: number;
|
||||
}
|
||||
|
||||
async function runBenchmark(system: string): Promise<BenchResult | null> {
|
||||
async function runBenchmarkOther(system: string): Promise<BenchResult | null> {
|
||||
const connectorFactory = (CONNECTORS as any)[system];
|
||||
if (!connectorFactory) {
|
||||
console.log(` ${system}: Unknown connector`);
|
||||
@@ -303,11 +312,56 @@ async function runBenchmark(system: string): Promise<BenchResult | null> {
|
||||
return {
|
||||
system,
|
||||
tps: Math.round(result.tps),
|
||||
p50_ms: result.p50_ms,
|
||||
p99_ms: result.p99_ms,
|
||||
};
|
||||
}
|
||||
|
||||
async function runBenchmarkStdb(): Promise<BenchResult | null> {
|
||||
const moduleName = process.env.STDB_MODULE || 'test-1';
|
||||
const server2 = process.env.STDB_SERVER || 'http://localhost:3000';
|
||||
|
||||
await sh('cargo', [
|
||||
'run',
|
||||
//"--quiet",
|
||||
"--manifest-path",
|
||||
"spacetimedb-rust-client/Cargo.toml",
|
||||
"--",
|
||||
"bench",
|
||||
//"--quiet",
|
||||
'--server',
|
||||
server2,
|
||||
"--module",
|
||||
moduleName,
|
||||
"--duration",
|
||||
`${seconds}s`,
|
||||
"--connections",
|
||||
String(concurrency),
|
||||
"--alpha",
|
||||
String(alpha),
|
||||
"--tps-write-path",
|
||||
"spacetimedb-tps.tmp.log",
|
||||
]);
|
||||
|
||||
const tpsStr = fs.readFileSync("spacetimedb-tps.tmp.log", 'utf-8').trim();
|
||||
const tps = Number(tpsStr);
|
||||
if (isNaN(tps)) {
|
||||
console.warn(`[spacetimedb] Failed to parse TPS from file: ${tpsStr}`);
|
||||
return null;
|
||||
}
|
||||
|
||||
return {
|
||||
system: "spacetimedb",
|
||||
tps: Math.round(tps),
|
||||
};
|
||||
}
|
||||
|
||||
async function runBenchmark(system: string): Promise<BenchResult | null> {
|
||||
if (system === 'spacetimedb') {
|
||||
return await runBenchmarkStdb();
|
||||
} else {
|
||||
return await runBenchmarkOther(system);
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Display
|
||||
// ============================================================================
|
||||
@@ -381,11 +435,11 @@ async function displayResults(results: BenchResult[]): Promise<void> {
|
||||
console.log(' ' + c('cyan', '║') + ' '.repeat(boxWidth) + c('cyan', '║'));
|
||||
console.log(
|
||||
' ' +
|
||||
c('cyan', '║') +
|
||||
' '.repeat(msgPadding) +
|
||||
c('bold', c('green', msgWithEmoji)) +
|
||||
' '.repeat(rightPadding) +
|
||||
c('cyan', '║'),
|
||||
c('cyan', '║') +
|
||||
' '.repeat(msgPadding) +
|
||||
c('bold', c('green', msgWithEmoji)) +
|
||||
' '.repeat(rightPadding) +
|
||||
c('cyan', '║'),
|
||||
);
|
||||
console.log(' ' + c('cyan', '║') + ' '.repeat(boxWidth) + c('cyan', '║'));
|
||||
console.log(' ' + c('cyan', '╚' + '═'.repeat(boxWidth) + '╝'));
|
||||
@@ -438,8 +492,8 @@ async function main() {
|
||||
} else {
|
||||
console.log(
|
||||
'\n' +
|
||||
c('bold', ' [2/4] Preparing databases...') +
|
||||
c('dim', ' (skipped)\n'),
|
||||
c('bold', ' [2/4] Preparing databases...') +
|
||||
c('dim', ' (skipped)\n'),
|
||||
);
|
||||
}
|
||||
|
||||
@@ -478,8 +532,6 @@ async function main() {
|
||||
results: results.map((r) => ({
|
||||
system: r.system,
|
||||
tps: r.tps,
|
||||
p50_ms: r.p50_ms,
|
||||
p99_ms: r.p99_ms,
|
||||
})),
|
||||
},
|
||||
null,
|
||||
|
||||
Reference in New Issue
Block a user