From f22f80060fe534074dcfbe7286802a57f6497bef Mon Sep 17 00:00:00 2001 From: Noa Date: Wed, 29 Apr 2026 21:40:29 -0500 Subject: [PATCH] [TS] Allow brotli to be specified for compression and reorganize some websocket stuff (#4561) # Description of Changes [Some runtimes](https://developer.mozilla.org/en-US/docs/Web/API/DecompressionStream#browser_compatibility) support brotli for `DecompressionStream`, so I figure we may as well allow it. Also reorganizes some of the websocket code for better separation of concerns. # Expected complexity level and risk 1 # Testing - [ ] - [ ] --- .../src/lib/binary_writer.ts | 2 +- .../src/sdk/db_connection_builder.ts | 23 +++-- .../src/sdk/db_connection_impl.ts | 26 +++--- .../bindings-typescript/src/sdk/decompress.ts | 30 ++---- .../src/sdk/websocket_decompress_adapter.ts | 91 +++---------------- .../src/sdk/websocket_test_adapter.ts | 28 ++---- .../src/sdk/websocket_v3_frames.ts | 24 ++--- crates/bindings-typescript/src/sdk/ws.ts | 84 ++++++++++++++++- .../test-app/tsconfig.app.json | 2 +- .../tests/db_connection.test.ts | 20 ++-- 10 files changed, 162 insertions(+), 168 deletions(-) diff --git a/crates/bindings-typescript/src/lib/binary_writer.ts b/crates/bindings-typescript/src/lib/binary_writer.ts index 6370e4815..048ecccb5 100644 --- a/crates/bindings-typescript/src/lib/binary_writer.ts +++ b/crates/bindings-typescript/src/lib/binary_writer.ts @@ -63,7 +63,7 @@ export default class BinaryWriter { return fromByteArray(this.getBuffer()); } - getBuffer(): Uint8Array { + getBuffer(): Uint8Array { return new Uint8Array(this.buffer.buffer, 0, this.offset); } diff --git a/crates/bindings-typescript/src/sdk/db_connection_builder.ts b/crates/bindings-typescript/src/sdk/db_connection_builder.ts index 43bda10f6..282cab02d 100644 --- a/crates/bindings-typescript/src/sdk/db_connection_builder.ts +++ b/crates/bindings-typescript/src/sdk/db_connection_builder.ts @@ -8,6 +8,7 @@ import type { } from '../'; import { ensureMinimumVersionOrThrow } from './version'; import { WebsocketDecompressAdapter } from './websocket_decompress_adapter'; +import type { WebSocketFactory } from './ws'; /** * The database client connection to a SpacetimeDB server. @@ -23,10 +24,10 @@ export class DbConnectionBuilder> { #identity?: Identity; #token?: string; #emitter: EventEmitter = new EventEmitter(); - #compression: 'gzip' | 'none' = 'gzip'; + #compression: 'gzip' | 'brotli' | 'none' = 'gzip'; #lightMode: boolean = false; #confirmedReads?: boolean; - #createWSFn: typeof WebsocketDecompressAdapter.createWebSocketFn; + #createWSFn: WebSocketFactory; /** * Creates a new `DbConnectionBuilder` database client and set the initial parameters. @@ -42,7 +43,7 @@ export class DbConnectionBuilder> { config: DbConnectionConfig> ) => DbConnection ) { - this.#createWSFn = WebsocketDecompressAdapter.createWebSocketFn; + this.#createWSFn = WebsocketDecompressAdapter.openWebSocket; } /** @@ -82,9 +83,7 @@ export class DbConnectionBuilder> { return this; } - withWSFn( - createWSFn: typeof WebsocketDecompressAdapter.createWebSocketFn - ): this { + withWSFn(createWSFn: WebSocketFactory): this { this.#createWSFn = createWSFn; return this; } @@ -94,7 +93,17 @@ export class DbConnectionBuilder> { * * @param compression The compression algorithm to use for the connection. */ - withCompression(compression: 'gzip' | 'none'): this { + withCompression(compression: 'gzip' | 'brotli' | 'none'): this { + if (compression === 'brotli') { + try { + new DecompressionStream('brotli' as CompressionFormat); + } catch (e) { + throw new TypeError( + `Brotli compression is not supported by the runtime. Please choose a different compression method.`, + { cause: e } + ); + } + } this.#compression = compression; return this; } diff --git a/crates/bindings-typescript/src/sdk/db_connection_impl.ts b/crates/bindings-typescript/src/sdk/db_connection_impl.ts index 954954040..1e604349f 100644 --- a/crates/bindings-typescript/src/sdk/db_connection_impl.ts +++ b/crates/bindings-typescript/src/sdk/db_connection_impl.ts @@ -37,10 +37,6 @@ import { type PendingCallback, type TableUpdate as CacheTableUpdate, } from './table_cache.ts'; -import { - WebsocketDecompressAdapter, - type WebsocketAdapter, -} from './websocket_decompress_adapter.ts'; import { SubscriptionBuilderImpl, SubscriptionHandleImpl, @@ -60,6 +56,7 @@ import type { ProceduresView } from './procedures.ts'; import type { Values } from '../lib/type_util.ts'; import type { TransactionUpdate } from './client_api/types.ts'; import { InternalError, SenderError } from '../lib/errors.ts'; +import type { WebSocketAdapter, WebSocketFactory } from './ws.ts'; import { normalizeWsProtocol, PREFERRED_WS_PROTOCOLS, @@ -101,8 +98,8 @@ export type DbConnectionConfig = { identity?: Identity; token?: string; emitter: EventEmitter; - createWSFn: typeof WebsocketDecompressAdapter.createWebSocketFn; - compression: 'gzip' | 'none'; + createWSFn: WebSocketFactory; + compression: 'gzip' | 'brotli' | 'none'; lightMode: boolean; confirmedReads?: boolean; remoteModule: RemoteModule; @@ -186,7 +183,7 @@ export class DbConnectionImpl #inboundQueue: Uint8Array[] = []; #inboundQueueOffset = 0; #isDrainingInboundQueue = false; - #outboundQueue: Uint8Array[] = []; + #outboundQueue: Uint8Array[] = []; #isOutboundFlushScheduled = false; #negotiatedWsProtocol: NegotiatedWsProtocol = V2_WS_PROTOCOL; #subscriptionManager = new SubscriptionManager(); @@ -224,8 +221,8 @@ export class DbConnectionImpl // private fields. // We use them in testing. private clientCache: ClientCache; - private ws?: WebsocketAdapter; - private wsPromise: Promise; + private ws?: WebSocketAdapter; + private wsPromise: Promise; constructor({ uri, @@ -612,7 +609,7 @@ export class DbConnectionImpl return this.#mergeTableUpdates(updates); } - #flushOutboundQueue(wsResolved: WebsocketAdapter): void { + #flushOutboundQueue(wsResolved: WebSocketAdapter): void { if (this.#negotiatedWsProtocol === V3_WS_PROTOCOL) { this.#flushOutboundQueueV3(wsResolved); return; @@ -620,14 +617,14 @@ export class DbConnectionImpl this.#flushOutboundQueueV2(wsResolved); } - #flushOutboundQueueV2(wsResolved: WebsocketAdapter): void { + #flushOutboundQueueV2(wsResolved: WebSocketAdapter): void { const pending = this.#outboundQueue.splice(0); for (const message of pending) { wsResolved.send(message); } } - #flushOutboundQueueV3(wsResolved: WebsocketAdapter): void { + #flushOutboundQueueV3(wsResolved: WebSocketAdapter): void { if (this.#outboundQueue.length === 0) { return; } @@ -692,7 +689,10 @@ export class DbConnectionImpl #reducerArgsEncoder = new BinaryWriter(1024); #clientMessageEncoder = new BinaryWriter(1024); - #sendEncodedMessage(encoded: Uint8Array, describe: () => string): void { + #sendEncodedMessage( + encoded: Uint8Array, + describe: () => string + ): void { stdbLogger('trace', describe); if (this.ws && this.isActive) { if (this.#negotiatedWsProtocol === V2_WS_PROTOCOL) { diff --git a/crates/bindings-typescript/src/sdk/decompress.ts b/crates/bindings-typescript/src/sdk/decompress.ts index cdc49a224..8c35e0d90 100644 --- a/crates/bindings-typescript/src/sdk/decompress.ts +++ b/crates/bindings-typescript/src/sdk/decompress.ts @@ -1,12 +1,11 @@ export async function decompress( - buffer: Uint8Array, - // Leaving it here to expand to brotli when it lands in the browsers and NodeJS - type: 'gzip', + buffer: Uint8Array, + type: CompressionFormat, chunkSize: number = 128 * 1024 // 128KB ): Promise { // Create a single ReadableStream to handle chunks let offset = 0; - const readableStream = new ReadableStream({ + const readableStream = new ReadableStream({ pull(controller) { if (offset < buffer.length) { // Slice a chunk of the buffer and enqueue it @@ -29,24 +28,9 @@ export async function decompress( const decompressedStream = readableStream.pipeThrough(decompressionStream); // Collect the decompressed chunks efficiently - const reader = decompressedStream.getReader(); - const chunks: Uint8Array[] = []; - let totalLength = 0; - let result: any; - - while (!(result = await reader.read()).done) { - chunks.push(result.value); - totalLength += result.value.length; + const chunks = []; + for await (const chunk of decompressedStream) { + chunks.push(chunk); } - - // Allocate a single Uint8Array for the decompressed data - const decompressedArray = new Uint8Array(totalLength); - let chunkOffset = 0; - - for (const chunk of chunks) { - decompressedArray.set(chunk, chunkOffset); - chunkOffset += chunk.length; - } - - return decompressedArray; + return new Blob(chunks).bytes(); } diff --git a/crates/bindings-typescript/src/sdk/websocket_decompress_adapter.ts b/crates/bindings-typescript/src/sdk/websocket_decompress_adapter.ts index b5db13a81..884dba30f 100644 --- a/crates/bindings-typescript/src/sdk/websocket_decompress_adapter.ts +++ b/crates/bindings-typescript/src/sdk/websocket_decompress_adapter.ts @@ -1,22 +1,10 @@ import { decompress } from './decompress'; -import { resolveWS } from './ws'; +import { openWebSocket, type WebSocketAdapter, type WebSocketArgs } from './ws'; -export interface WebsocketAdapter { - readonly protocol: string; - send(msg: Uint8Array): void; - close(): void; - - set onclose(handler: (ev: CloseEvent) => void); - set onopen(handler: () => void); - set onmessage(handler: (msg: { data: Uint8Array }) => void); - set onerror(handler: (msg: ErrorEvent) => void); -} - -export class WebsocketDecompressAdapter implements WebsocketAdapter { +export class WebsocketDecompressAdapter implements WebSocketAdapter { get protocol(): string { return this.#ws.protocol; } - set onclose(handler: (ev: CloseEvent) => void) { this.#ws.onclose = handler; } @@ -35,16 +23,17 @@ export class WebsocketDecompressAdapter implements WebsocketAdapter { #ws: WebSocket; - async #decompress(buffer: Uint8Array): Promise { + async #decompress(buffer: Uint8Array): Promise { const tag = buffer[0]; const data = buffer.subarray(1); switch (tag) { case 0: return data; case 1: - throw new Error( - 'Brotli Compression not supported. Please use gzip or none compression in withCompression method on DbConnection.' - ); + // Some runtimes support brotli, but it's not yet defined in `lib.dom.d.ts`. + // We assert runtime support in `DbConnectionBuilder.withCompression`, so + // this cast is safe. + return await decompress(data, 'brotli' as CompressionFormat); case 2: return await decompress(data, 'gzip'); default: @@ -54,7 +43,7 @@ export class WebsocketDecompressAdapter implements WebsocketAdapter { } } - send(msg: Uint8Array): void { + send(msg: Uint8Array): void { this.#ws.send(msg); } @@ -63,68 +52,12 @@ export class WebsocketDecompressAdapter implements WebsocketAdapter { } constructor(ws: WebSocket) { - ws.binaryType = 'arraybuffer'; - this.#ws = ws; } - static async createWebSocketFn({ - url, - nameOrAddress, - wsProtocol, - authToken, - compression, - lightMode, - confirmedReads, - }: { - url: URL; - wsProtocol: string | string[]; - nameOrAddress: string; - authToken?: string; - compression: 'gzip' | 'none'; - lightMode: boolean; - confirmedReads?: boolean; - }): Promise { - const headers = new Headers(); - - const WS = await resolveWS(); - - // We swap our original token to a shorter-lived token - // to avoid sending the original via query params. - let temporaryAuthToken: string | undefined = undefined; - if (authToken) { - headers.set('Authorization', `Bearer ${authToken}`); - const tokenUrl = new URL('v1/identity/websocket-token', url); - tokenUrl.protocol = url.protocol === 'wss:' ? 'https:' : 'http:'; - - const response = await fetch(tokenUrl, { method: 'POST', headers }); - if (response.ok) { - const { token } = await response.json(); - temporaryAuthToken = token; - } else { - return Promise.reject( - new Error(`Failed to verify token: ${response.statusText}`) - ); - } - } - - const databaseUrl = new URL(`v1/database/${nameOrAddress}/subscribe`, url); - if (temporaryAuthToken) { - databaseUrl.searchParams.set('token', temporaryAuthToken); - } - databaseUrl.searchParams.set( - 'compression', - compression === 'gzip' ? 'Gzip' : 'None' - ); - if (lightMode) { - databaseUrl.searchParams.set('light', 'true'); - } - if (confirmedReads !== undefined) { - databaseUrl.searchParams.set('confirmed', confirmedReads.toString()); - } - - const ws = new WS(databaseUrl.toString(), wsProtocol); - - return new WebsocketDecompressAdapter(ws); + static async openWebSocket( + args: WebSocketArgs + ): Promise { + return new this(await openWebSocket(args)); } } diff --git a/crates/bindings-typescript/src/sdk/websocket_test_adapter.ts b/crates/bindings-typescript/src/sdk/websocket_test_adapter.ts index 4c2c48b42..b32e7744a 100644 --- a/crates/bindings-typescript/src/sdk/websocket_test_adapter.ts +++ b/crates/bindings-typescript/src/sdk/websocket_test_adapter.ts @@ -1,17 +1,17 @@ import BinaryReader from '../lib/binary_reader.ts'; import BinaryWriter from '../lib/binary_writer.ts'; import { ClientMessage, ServerMessage } from './client_api/types'; -import type { WebsocketAdapter } from './websocket_decompress_adapter'; +import type { WebSocketAdapter, WebSocketFactory } from './ws'; import { PREFERRED_WS_PROTOCOLS, V3_WS_PROTOCOL } from './websocket_protocols'; import { decodeClientMessagesV3, encodeServerMessagesV3, } from './websocket_v3_frames.ts'; -class WebsocketTestAdapter implements WebsocketAdapter { +class WebsocketTestAdapter implements WebSocketAdapter { protocol: string = ''; - messageQueue: Uint8Array[]; + messageQueue: Uint8Array[]; outgoingMessages: ClientMessage[]; closed: boolean; supportedProtocols: string[]; @@ -41,7 +41,7 @@ class WebsocketTestAdapter implements WebsocketAdapter { set onerror(_handler: (msg: ErrorEvent) => void) {} - send(message: Uint8Array): void { + send(message: Uint8Array): void { const rawMessage = message.slice(); const outgoingMessages = this.protocol === V3_WS_PROTOCOL @@ -85,28 +85,16 @@ class WebsocketTestAdapter implements WebsocketAdapter { this.#onmessage({ data: outboundData }); } - async createWebSocketFn(_args: { - url: URL; - wsProtocol: string | string[]; - nameOrAddress: string; - authToken?: string; - compression: 'gzip' | 'none'; - lightMode: boolean; - confirmedReads?: boolean; - }): Promise { - const requestedProtocols = Array.isArray(_args.wsProtocol) - ? _args.wsProtocol - : [_args.wsProtocol]; - const negotiatedProtocol = requestedProtocols.find(protocol => + openWebSocket: WebSocketFactory = async ({ wsProtocol }) => { + const negotiatedProtocol = wsProtocol.find(protocol => this.supportedProtocols.includes(protocol) ); if (!negotiatedProtocol) { - return Promise.reject(new Error('No compatible websocket protocol')); + throw new Error('No compatible websocket protocol'); } this.protocol = negotiatedProtocol; return this; - } + }; } -export type { WebsocketTestAdapter }; export default WebsocketTestAdapter; diff --git a/crates/bindings-typescript/src/sdk/websocket_v3_frames.ts b/crates/bindings-typescript/src/sdk/websocket_v3_frames.ts index 36ea683ec..e6613101c 100644 --- a/crates/bindings-typescript/src/sdk/websocket_v3_frames.ts +++ b/crates/bindings-typescript/src/sdk/websocket_v3_frames.ts @@ -28,9 +28,9 @@ function ensureMessageCount( function concatenateMessagesV3( writer: BinaryWriter, - messages: readonly Uint8Array[], + messages: readonly Uint8Array[], messageCount: number = messages.length -): Uint8Array { +): Uint8Array { ensureMessageCount(messages, messageCount); writer.clear(); for (let i = 0; i < messageCount; i++) { @@ -41,15 +41,15 @@ function concatenateMessagesV3( function splitMessagesV3( reader: BinaryReader, - data: Uint8Array, + data: Uint8Array, deserialize: (reader: BinaryReader) => unknown -): Uint8Array[] { +): Uint8Array[] { reader.reset(data); if (reader.remaining === 0) { throw new RangeError(EMPTY_V3_PAYLOAD_ERR); } - const messages: Uint8Array[] = []; + const messages: Uint8Array[] = []; while (reader.remaining > 0) { const startOffset = reader.offset; deserialize(reader); @@ -60,7 +60,7 @@ function splitMessagesV3( } export function countClientMessagesForV3Frame( - messages: readonly Uint8Array[], + messages: readonly Uint8Array[], maxFrameBytes: number ): number { ensureMessages(messages); @@ -86,13 +86,15 @@ export function countClientMessagesForV3Frame( export function encodeClientMessagesV3( writer: BinaryWriter, - messages: readonly Uint8Array[], + messages: readonly Uint8Array[], messageCount: number = messages.length -): Uint8Array { +): Uint8Array { return concatenateMessagesV3(writer, messages, messageCount); } -export function decodeClientMessagesV3(data: Uint8Array): Uint8Array[] { +export function decodeClientMessagesV3( + data: Uint8Array +): Uint8Array[] { return splitMessagesV3(new BinaryReader(data), data, reader => ClientMessage.deserialize(reader) ); @@ -100,8 +102,8 @@ export function decodeClientMessagesV3(data: Uint8Array): Uint8Array[] { export function encodeServerMessagesV3( writer: BinaryWriter, - messages: readonly Uint8Array[] -): Uint8Array { + messages: readonly Uint8Array[] +): Uint8Array { return concatenateMessagesV3(writer, messages); } diff --git a/crates/bindings-typescript/src/sdk/ws.ts b/crates/bindings-typescript/src/sdk/ws.ts index fb7279d6e..1c3172ad9 100644 --- a/crates/bindings-typescript/src/sdk/ws.ts +++ b/crates/bindings-typescript/src/sdk/ws.ts @@ -1,9 +1,9 @@ import { stdbLogger } from './logger'; -export async function resolveWS(): Promise { +async function resolveWS(): Promise { // Browser or Node >= 22 (or any env that exposes global WebSocket) - if (typeof (globalThis as any).WebSocket !== 'undefined') { - return (globalThis as any).WebSocket as typeof WebSocket; + if (typeof WebSocket !== 'undefined') { + return WebSocket; } // Node without a global WebSocket: lazily load undici's polyfill. @@ -25,3 +25,81 @@ export async function resolveWS(): Promise { throw err; } } + +export interface WebSocketAdapter { + readonly protocol: string; + send(msg: Uint8Array): void; + close(): void; + + set onclose(handler: (ev: CloseEvent) => void); + set onopen(handler: () => void); + set onmessage(handler: (msg: { data: Uint8Array }) => void); + set onerror(handler: (msg: ErrorEvent) => void); +} + +export interface WebSocketArgs { + url: URL; + wsProtocol: string[]; + nameOrAddress: string; + authToken?: string; + compression: 'gzip' | 'brotli' | 'none'; + lightMode: boolean; + confirmedReads?: boolean; +} +export type WebSocketFactory = ( + args: WebSocketArgs +) => Promise; + +/** + * Open a WebSocket to the database specified by the given `WebSocketArgs`. + * @returns a WebSocket with `binaryType` set to `arraybuffer`. + */ +export async function openWebSocket({ + url, + nameOrAddress, + wsProtocol, + authToken, + compression, + lightMode, + confirmedReads, +}: WebSocketArgs): Promise { + const headers = new Headers(); + + const WS = await resolveWS(); + + // We swap our original token to a shorter-lived token + // to avoid sending the original via query params. + let temporaryAuthToken: string | undefined; + if (authToken) { + headers.set('Authorization', `Bearer ${authToken}`); + const tokenUrl = new URL('v1/identity/websocket-token', url); + tokenUrl.protocol = url.protocol === 'wss:' ? 'https:' : 'http:'; + + const response = await fetch(tokenUrl, { method: 'POST', headers }); + if (response.ok) { + const { token } = await response.json(); + temporaryAuthToken = token; + } else { + throw new Error(`Failed to verify token: ${response.statusText}`); + } + } + + const databaseUrl = new URL(`v1/database/${nameOrAddress}/subscribe`, url); + if (temporaryAuthToken) { + databaseUrl.searchParams.set('token', temporaryAuthToken); + } + databaseUrl.searchParams.set( + 'compression', + { gzip: 'Gzip', brotli: 'Brotli', none: 'None' }[compression] ?? 'None' + ); + if (lightMode) { + databaseUrl.searchParams.set('light', 'true'); + } + if (confirmedReads !== undefined) { + databaseUrl.searchParams.set('confirmed', confirmedReads.toString()); + } + + const ws = new WS(databaseUrl.toString(), wsProtocol); + ws.binaryType = 'arraybuffer'; + return ws; +} diff --git a/crates/bindings-typescript/test-app/tsconfig.app.json b/crates/bindings-typescript/test-app/tsconfig.app.json index 563997710..85a14ac73 100644 --- a/crates/bindings-typescript/test-app/tsconfig.app.json +++ b/crates/bindings-typescript/test-app/tsconfig.app.json @@ -4,7 +4,7 @@ "tsBuildInfoFile": "./node_modules/.tmp/tsconfig.app.tsbuildinfo", "target": "ES2020", "useDefineForClassFields": true, - "lib": ["ESNext", "DOM", "DOM.Iterable"], + "lib": ["ESNext", "DOM", "DOM.Iterable", "DOM.AsyncIterable"], "module": "ESNext", "skipLibCheck": true, diff --git a/crates/bindings-typescript/tests/db_connection.test.ts b/crates/bindings-typescript/tests/db_connection.test.ts index 1161c14aa..0580518e2 100644 --- a/crates/bindings-typescript/tests/db_connection.test.ts +++ b/crates/bindings-typescript/tests/db_connection.test.ts @@ -174,7 +174,7 @@ describe('DbConnection', () => { const client = DbConnection.builder() .withUri('ws://127.0.0.1:1234') .withDatabaseName('db') - .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter) as any) + .withWSFn(wsAdapter.openWebSocket) .onConnect(() => { called = true; onConnectPromise.resolve(); @@ -201,7 +201,7 @@ describe('DbConnection', () => { const client = DbConnection.builder() .withUri('ws://127.0.0.1:1234') .withDatabaseName('db') - .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter) as any) + .withWSFn(wsAdapter.openWebSocket) .build(); await client['wsPromise']; @@ -231,7 +231,7 @@ describe('DbConnection', () => { const client = DbConnection.builder() .withUri('ws://127.0.0.1:1234') .withDatabaseName('db') - .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter) as any) + .withWSFn(wsAdapter.openWebSocket) .build(); await client['wsPromise']; @@ -259,7 +259,7 @@ describe('DbConnection', () => { const client = DbConnection.builder() .withUri('ws://127.0.0.1:1234') .withDatabaseName('db') - .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter) as any) + .withWSFn(wsAdapter.openWebSocket) .onDisconnect(() => { onDisconnectPromise.resolve(); }) @@ -285,7 +285,7 @@ describe('DbConnection', () => { const client = DbConnection.builder() .withUri('ws://127.0.0.1:1234') .withDatabaseName('db') - .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter) as any) + .withWSFn(wsAdapter.openWebSocket) .build(); await client['wsPromise']; @@ -327,7 +327,7 @@ describe('DbConnection', () => { const client = DbConnection.builder() .withUri('ws://127.0.0.1:1234') .withDatabaseName('db') - .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter) as any) + .withWSFn(wsAdapter.openWebSocket) .onConnect(() => { onConnectPromise.resolve(); }) @@ -393,7 +393,7 @@ describe('DbConnection', () => { const client = DbConnection.builder() .withUri('ws://127.0.0.1:1234') .withDatabaseName('db') - .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter) as any) + .withWSFn(wsAdapter.openWebSocket) .onConnect(() => { onConnectPromise.resolve(); }) @@ -438,7 +438,7 @@ describe('DbConnection', () => { const client = DbConnection.builder() .withUri('ws://127.0.0.1:1234') .withDatabaseName('db') - .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter) as any) + .withWSFn(wsAdapter.openWebSocket) .onConnect(() => { onConnectPromise.resolve(); }) @@ -715,7 +715,7 @@ describe('DbConnection', () => { const client = DbConnection.builder() .withUri('ws://127.0.0.1:1234') .withDatabaseName('db') - .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter) as any) + .withWSFn(wsAdapter.openWebSocket) .onConnect(() => {}) .build(); @@ -806,7 +806,7 @@ describe('DbConnection', () => { const client = DbConnection.builder() .withUri('ws://127.0.0.1:1234') .withDatabaseName('db') - .withWSFn(wsAdapter.createWebSocketFn.bind(wsAdapter) as any) + .withWSFn(wsAdapter.openWebSocket) .build(); await client['wsPromise']; wsAdapter.acceptConnection();