From f045de581ee485ddf3b42a5fe5f0130416774f7c Mon Sep 17 00:00:00 2001 From: Rich Chiodo Date: Fri, 7 Oct 2022 16:22:41 -0700 Subject: [PATCH 1/9] Use BroadcastChannel to pass messages instead of parentPort --- sync-api-common/src/browser/connection.ts | 38 ++++++++++++--------- sync-api-common/src/browser/ril.ts | 4 +-- sync-api-common/src/common/connection.ts | 28 +++++++++++++--- sync-api-common/src/node/connection.ts | 40 ++++++++++++----------- sync-api-common/src/node/ril.ts | 4 +-- testbeds/package-lock.json | 13 +++++--- testbeds/python/extension.ts | 4 +-- testbeds/python/worker.ts | 2 +- 8 files changed, 81 insertions(+), 52 deletions(-) diff --git a/sync-api-common/src/browser/connection.ts b/sync-api-common/src/browser/connection.ts index c4fb64ad..f5d18c3c 100644 --- a/sync-api-common/src/browser/connection.ts +++ b/sync-api-common/src/browser/connection.ts @@ -4,42 +4,48 @@ * ------------------------------------------------------------------------------------------ */ import RAL from '../common/ral'; -import { BaseServiceConnection, BaseClientConnection, Message, RequestType } from '../common/connection'; +import { BaseServiceConnection, BaseClientConnection, Message, RequestType, KnownConnectionIds, BroadcastChannelName } from '../common/connection'; export class ClientConnection extends BaseClientConnection { - private readonly port: MessagePort | Worker | DedicatedWorkerGlobalScope; + private readonly channel: BroadcastChannel; - constructor(port: MessagePort | Worker | DedicatedWorkerGlobalScope) { - super(); - this.port = port; - this.port.onmessage = ((event: MessageEvent) => { - this.handleMessage(event.data); + constructor() { + super(self.location.pathname); + this.channel = new BroadcastChannel(BroadcastChannelName); + this.channel.addEventListener('message', (ev: MessageEvent) => { + try { + if (ev.data.dest === this.connectionId || ev.data.dest === KnownConnectionIds.All) { + this.handleMessage(ev.data); + } + } catch (error) { + RAL().console.error(error); + } }); } protected postMessage(sharedArrayBuffer: SharedArrayBuffer) { - this.port.postMessage(sharedArrayBuffer); + this.channel.postMessage(sharedArrayBuffer); } } export class ServiceConnection extends BaseServiceConnection { - private readonly port: MessagePort | Worker | DedicatedWorkerGlobalScope; + private readonly channel: BroadcastChannel; - constructor(port: MessagePort | Worker | DedicatedWorkerGlobalScope) { - super(); - this.port = port; - this.port.onmessage = (async (event: MessageEvent) => { + constructor() { + super(KnownConnectionIds.Main); + this.channel = new BroadcastChannel(BroadcastChannelName); + this.channel.addEventListener('message', async (ev: MessageEvent) => { try { - await this.handleMessage(event.data); + await this.handleMessage(ev.data); } catch (error) { RAL().console.error(error); } }); } - protected postMessage(message: Message): void { - this.port.postMessage(message); + protected postMessage(message: Message) { + this.channel.postMessage(message); } } \ No newline at end of file diff --git a/sync-api-common/src/browser/ril.ts b/sync-api-common/src/browser/ril.ts index d8b92c4d..3ced3dff 100644 --- a/sync-api-common/src/browser/ril.ts +++ b/sync-api-common/src/browser/ril.ts @@ -21,7 +21,7 @@ class TestServiceConnection { @@ -60,7 +60,7 @@ const _ril: RIL = Object.freeze({ $testing: Object.freeze({ ClientConnection: Object.freeze({ create() { - return new ClientConnection(self); + return new ClientConnection(); } }), ServiceConnection: Object.freeze({ diff --git a/sync-api-common/src/common/connection.ts b/sync-api-common/src/common/connection.ts index f7e2dc10..26fea660 100644 --- a/sync-api-common/src/common/connection.ts +++ b/sync-api-common/src/common/connection.ts @@ -13,6 +13,7 @@ export type u64 = number; export type size = u32; export type Message = { + dest: string; method: string; params?: Params; }; @@ -53,6 +54,8 @@ export type Params = { export type Request = { id: number; + src: string; + dest: string; } & Message; export namespace Request { @@ -85,6 +88,13 @@ export type RequestType = MessageType & ({ result?: TypedArray | object | null; }); +export const BroadcastChannelName = `@vscode/sync-api`; + +export enum KnownConnectionIds { + Main = 'main', + All = 'all' +} + class NoResult { public static readonly kind = 0 as const; constructor() { @@ -582,7 +592,7 @@ export abstract class BaseClientConnection; private readyCallbacks: PromiseCallbacks | undefined; - constructor() { + constructor(protected connectionId: string) { this.id = 1; this.textEncoder = RAL().TextEncoder.create(); this.textDecoder = RAL().TextDecoder.create(); @@ -592,6 +602,7 @@ export abstract class BaseClientConnection { + this._sendRequest('$/checkready'); return this.readyPromise; } @@ -599,7 +610,7 @@ export abstract class BaseClientConnection; private readonly requestResults: Map; + private sentReady = false; - constructor() { + constructor(protected readonly connectionId: string) { this.textDecoder = RAL().TextDecoder.create(); this.textEncoder = RAL().TextEncoder.create(); this.requestHandlers = new Map(); @@ -809,6 +821,11 @@ export abstract class BaseServiceConnection extends BaseClientConnection { - private readonly port: MessagePort | Worker; + private readonly channel: BroadcastChannel; - constructor(port: MessagePort | Worker) { - super(); - this.port = port; - this.port.on('message', (message: Message) => { + constructor() { + super(isMainThread ? KnownConnectionIds.Main : threadId.toString()); + this.channel = new BroadcastChannel(BroadcastChannelName); + this.channel.onmessage = (message: any) => { try { - this.handleMessage(message); + if (message.data.dest === this.connectionId || message.data.dest === KnownConnectionIds.All) { + this.handleMessage(message.data); + } } catch (error) { RAL().console.error(error); } - }); + }; } protected postMessage(sharedArrayBuffer: SharedArrayBuffer) { - this.port.postMessage(sharedArrayBuffer); + this.channel.postMessage(sharedArrayBuffer); } } export class ServiceConnection extends BaseServiceConnection { - private readonly port: MessagePort | Worker; + private readonly channel: BroadcastChannel; - constructor(port: MessagePort | Worker) { - super(); - this.port = port; - this.port.on('message', async (sharedArrayBuffer: SharedArrayBuffer) => { + constructor() { + super(isMainThread ? KnownConnectionIds.Main : threadId.toString()); + this.channel = new BroadcastChannel(BroadcastChannelName); + this.channel.onmessage = async (message: any) => { try { - await this.handleMessage(sharedArrayBuffer); + await this.handleMessage(message.data as SharedArrayBuffer); } catch (error) { RAL().console.error(error); } - }); + }; } - protected postMessage(message: Message): void { - this.port.postMessage(message); + protected postMessage(message: Message) { + this.channel.postMessage(message); } } \ No newline at end of file diff --git a/sync-api-common/src/node/ril.ts b/sync-api-common/src/node/ril.ts index e8c29ec9..eaf305d2 100644 --- a/sync-api-common/src/node/ril.ts +++ b/sync-api-common/src/node/ril.ts @@ -18,7 +18,7 @@ class TestServiceConnection { @@ -63,7 +63,7 @@ const _ril: RIL = Object.freeze({ if (!parentPort) { throw new Error(`No parent port defined. Shouldn't happen in test setup`); } - return new ClientConnection(parentPort); + return new ClientConnection(); } }), ServiceConnection: Object.freeze({ diff --git a/testbeds/package-lock.json b/testbeds/package-lock.json index e4a7544c..ba709ed1 100644 --- a/testbeds/package-lock.json +++ b/testbeds/package-lock.json @@ -10,7 +10,7 @@ "vscode-uri": "3.0.3" }, "devDependencies": { - "@types/vscode": "^1.67.0" + "@types/vscode": "^1.71.0" } }, "../sync-api-client": { @@ -52,9 +52,10 @@ "devDependencies": {} }, "node_modules/@types/vscode": { - "version": "1.68.0", - "dev": true, - "license": "MIT" + "version": "1.72.0", + "resolved": "https://registry.npmjs.org/@types/vscode/-/vscode-1.72.0.tgz", + "integrity": "sha512-WvHluhUo+lQvE3I4wUagRpnkHuysB4qSyOQUyIAS9n9PYMJjepzTUD8Jyks0YeXoPD0UGctjqp2u84/b3v6Ydw==", + "dev": true }, "node_modules/vscode-uri": { "version": "3.0.3", @@ -63,7 +64,9 @@ }, "dependencies": { "@types/vscode": { - "version": "1.68.0", + "version": "1.72.0", + "resolved": "https://registry.npmjs.org/@types/vscode/-/vscode-1.72.0.tgz", + "integrity": "sha512-WvHluhUo+lQvE3I4wUagRpnkHuysB4qSyOQUyIAS9n9PYMJjepzTUD8Jyks0YeXoPD0UGctjqp2u84/b3v6Ydw==", "dev": true }, "vscode-uri": { diff --git a/testbeds/python/extension.ts b/testbeds/python/extension.ts index b3585fe5..dab70cbd 100644 --- a/testbeds/python/extension.ts +++ b/testbeds/python/extension.ts @@ -23,7 +23,7 @@ export async function activate(_context: ExtensionContext) { const key = Date.now(); const worker = new Worker(path.join(__dirname, './worker.js')); - const connection = new ServiceConnection(worker); + const connection = new ServiceConnection(); const apiService = new ApiService('Python Run', connection, { exitHandler: (_rval) => { connectionState.delete(key); @@ -41,7 +41,7 @@ export async function activate(_context: ExtensionContext) { commands.registerCommand('testbed-python.runInteractive', () => { const key = Date.now(); const worker = new Worker(path.join(__dirname, './worker.js')); - const connection = new ServiceConnection(worker); + const connection = new ServiceConnection(); const apiService = new ApiService('Python Shell', connection, { exitHandler: (_rval) => { connectionState.delete(key); diff --git a/testbeds/python/worker.ts b/testbeds/python/worker.ts index 8560a97b..15244160 100644 --- a/testbeds/python/worker.ts +++ b/testbeds/python/worker.ts @@ -15,7 +15,7 @@ if (parentPort === null) { process.exit(); } -const connection = new ClientConnection(parentPort); +const connection = new ClientConnection(); connection.serviceReady().then(async (params) => { const name = 'Python Shell'; const apiClient = new ApiClient(connection); From 98a1afc96b35c4334a171556f6ed4ee38ff49304 Mon Sep 17 00:00:00 2001 From: rchiodo Date: Fri, 7 Oct 2022 17:15:18 -0700 Subject: [PATCH 2/9] Path to get test bed working --- testbeds/python/worker.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testbeds/python/worker.ts b/testbeds/python/worker.ts index 15244160..04877e76 100644 --- a/testbeds/python/worker.ts +++ b/testbeds/python/worker.ts @@ -37,7 +37,7 @@ connection.serviceReady().then(async (params) => { mapDir.push({ name: path.posix.join(path.posix.sep, 'workspaces', folder.name), uri: folder.uri }); } } - const pythonRoot = URI.file(`/home/dirkb/Projects/dbaeumer/python-3.11.0rc`); + const pythonRoot = URI.file(`/home/rich/Python-3.11.0b5-wasm32-wasi-16`); mapDir.push({ name: path.posix.sep, uri: pythonRoot }); const exitHandler = (rval: number): void => { apiClient.process.procExit(rval); From 60bc06863a6c82139c3628aa841b65ae2d194fb5 Mon Sep 17 00:00:00 2001 From: rchiodo Date: Mon, 10 Oct 2022 11:14:08 -0700 Subject: [PATCH 3/9] Make sure to dispose broadcast channel --- sync-api-common/src/browser/connection.ts | 46 ++++++++++++------- sync-api-common/src/browser/ril.ts | 1 + sync-api-common/src/common/connection.ts | 6 +++ sync-api-common/src/common/test/tests.ts | 1 + .../src/common/test/workers/tests.ts | 4 +- sync-api-common/src/node/connection.ts | 17 ++++++- sync-api-common/src/node/ril.ts | 2 +- 7 files changed, 57 insertions(+), 20 deletions(-) diff --git a/sync-api-common/src/browser/connection.ts b/sync-api-common/src/browser/connection.ts index f5d18c3c..2d814202 100644 --- a/sync-api-common/src/browser/connection.ts +++ b/sync-api-common/src/browser/connection.ts @@ -13,20 +13,27 @@ export class ClientConnection { - try { - if (ev.data.dest === this.connectionId || ev.data.dest === KnownConnectionIds.All) { - this.handleMessage(ev.data); - } - } catch (error) { - RAL().console.error(error); - } - }); + this.channel.addEventListener('message', this._handleMessageEvent.bind(this)); + } + + dispose() { + this.channel.removeEventListener('message', this._handleMessageEvent.bind(this)); + this.channel.close(); } protected postMessage(sharedArrayBuffer: SharedArrayBuffer) { this.channel.postMessage(sharedArrayBuffer); } + + _handleMessageEvent(ev: MessageEvent) { + try { + if (ev.data.dest === this.connectionId || ev.data.dest === KnownConnectionIds.All) { + this.handleMessage(ev.data); + } + } catch (error) { + RAL().console.error(error); + } + } } export class ServiceConnection extends BaseServiceConnection { @@ -36,16 +43,23 @@ export class ServiceConnection { - try { - await this.handleMessage(ev.data); - } catch (error) { - RAL().console.error(error); - } - }); + this.channel.addEventListener('message', this._handleMessageEvent.bind(this)); + } + + dispose() { + this.channel.removeEventListener('message', this._handleMessageEvent.bind(this)); + this.channel.close(); } protected postMessage(message: Message) { this.channel.postMessage(message); } + + async _handleMessageEvent(ev: MessageEvent) { + try { + await this.handleMessage(ev.data); + } catch (error) { + RAL().console.error(error); + } + } } \ No newline at end of file diff --git a/sync-api-common/src/browser/ril.ts b/sync-api-common/src/browser/ril.ts index 3ced3dff..da4968ac 100644 --- a/sync-api-common/src/browser/ril.ts +++ b/sync-api-common/src/browser/ril.ts @@ -26,6 +26,7 @@ class TestServiceConnection { this.worker.terminate(); + this.dispose(); return Promise.resolve(0); } } diff --git a/sync-api-common/src/common/connection.ts b/sync-api-common/src/common/connection.ts index 26fea660..32eeb28b 100644 --- a/sync-api-common/src/common/connection.ts +++ b/sync-api-common/src/common/connection.ts @@ -582,6 +582,7 @@ export class RPCError extends Error { export interface ClientConnection { readonly sendRequest: SendRequestSignatures; serviceReady(): Promise; + dispose(): void; } export abstract class BaseClientConnection implements ClientConnection { @@ -732,6 +733,8 @@ export abstract class BaseClientConnection { readonly onRequest: HandleRequestSignatures; signalReady(): void; + dispose(): void; } export abstract class BaseServiceConnection implements ServiceConnection { @@ -896,4 +900,6 @@ export abstract class BaseServiceConnection { try { - if (message.data.dest === this.connectionId || message.data.dest === KnownConnectionIds.All) { + if (message.data?.dest === this.connectionId || message.data?.dest === KnownConnectionIds.All) { this.handleMessage(message.data); } } catch (error) { @@ -26,6 +26,11 @@ export class ClientConnection {}; + this.channel.close(); + } + protected postMessage(sharedArrayBuffer: SharedArrayBuffer) { this.channel.postMessage(sharedArrayBuffer); } @@ -40,13 +45,21 @@ export class ServiceConnection { try { - await this.handleMessage(message.data as SharedArrayBuffer); + // Skip broadcast messages that aren't SharedArrayBuffers + if (message.data?.byteLength) { + await this.handleMessage(message.data as SharedArrayBuffer); + } } catch (error) { RAL().console.error(error); } }; } + dispose() { + this.channel.onmessage = () => {}; + this.channel.close(); + } + protected postMessage(message: Message) { this.channel.postMessage(message); } diff --git a/sync-api-common/src/node/ril.ts b/sync-api-common/src/node/ril.ts index eaf305d2..dc769b51 100644 --- a/sync-api-common/src/node/ril.ts +++ b/sync-api-common/src/node/ril.ts @@ -2,7 +2,6 @@ * Copyright (c) Microsoft Corporation. All rights reserved. * Licensed under the MIT License. See License.txt in the project root for license information. * ------------------------------------------------------------------------------------------ */ -import * as path from 'path'; import { TextDecoder } from 'util'; import { parentPort, Worker } from 'worker_threads'; @@ -22,6 +21,7 @@ class TestServiceConnection { + this.dispose(); return this.worker.terminate(); } } From 03d2db6996276caad96e44932cd9dbcb03afd22b Mon Sep 17 00:00:00 2001 From: rchiodo Date: Mon, 10 Oct 2022 11:20:43 -0700 Subject: [PATCH 4/9] Allow for multiple 'channels' --- sync-api-common/src/browser/connection.ts | 8 ++++---- sync-api-common/src/common/connection.ts | 2 +- sync-api-common/src/node/connection.ts | 8 ++++---- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/sync-api-common/src/browser/connection.ts b/sync-api-common/src/browser/connection.ts index 2d814202..08ca79b5 100644 --- a/sync-api-common/src/browser/connection.ts +++ b/sync-api-common/src/browser/connection.ts @@ -10,9 +10,9 @@ export class ClientConnection { try { if (message.data?.dest === this.connectionId || message.data?.dest === KnownConnectionIds.All) { @@ -40,9 +40,9 @@ export class ServiceConnection { try { // Skip broadcast messages that aren't SharedArrayBuffers From dcd971a23cec134a747002fe19a4d578b54d387d Mon Sep 17 00:00:00 2001 From: rchiodo Date: Tue, 11 Oct 2022 11:39:21 -0700 Subject: [PATCH 5/9] Browser tests should handle messages like node --- sync-api-common/src/browser/connection.ts | 4 +++- sync-api-tests/src/api.test.ts | 3 +++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/sync-api-common/src/browser/connection.ts b/sync-api-common/src/browser/connection.ts index 08ca79b5..1287ec72 100644 --- a/sync-api-common/src/browser/connection.ts +++ b/sync-api-common/src/browser/connection.ts @@ -57,7 +57,9 @@ export class ServiceConnection string, scheme: }); connection.signalReady(); }); + + connection.terminate(); + if (assertionError !== undefined) { throw new assert.AssertionError(assertionError); } From b57068cb549d3d10267a84126df9f553b79c39f4 Mon Sep 17 00:00:00 2001 From: rchiodo Date: Wed, 12 Oct 2022 14:45:42 -0700 Subject: [PATCH 6/9] Another idea that doesn't work either :( --- build/bin/runBrowserTests.js | 3 +- sync-api-common/src/browser/connection.ts | 56 +++++++------ sync-api-common/src/browser/ril.ts | 1 + sync-api-common/src/common/connection.ts | 24 +++++- sync-api-common/src/common/test/tsconfig.json | 2 +- .../src/common/test/workers/tests.ts | 3 +- sync-api-common/src/node/connection.ts | 78 +++++++++++-------- sync-api-common/tsconfig.base.json | 2 +- sync-api-tests/tsconfig.base.json | 2 +- 9 files changed, 107 insertions(+), 64 deletions(-) diff --git a/build/bin/runBrowserTests.js b/build/bin/runBrowserTests.js index f20b7a37..6aed1643 100644 --- a/build/bin/runBrowserTests.js +++ b/build/bin/runBrowserTests.js @@ -77,9 +77,10 @@ async function runTests(location) { }); server.listen(8080, '127.0.0.1', async () => { let failCount = 0; - const browser = await playwright['chromium'].launch({ headless: true, devtools: false }); + const browser = await playwright['chromium'].launch({ headless: true, devtools: true }); const context = await browser.newContext(); const page = await context.newPage(); + page.on('console', msg => console.log('LOG FROM INSIDE PAGE: ', msg)) const emitter = new events.EventEmitter(); emitter.on('fail', () => { failCount++; diff --git a/sync-api-common/src/browser/connection.ts b/sync-api-common/src/browser/connection.ts index 1287ec72..66f159ea 100644 --- a/sync-api-common/src/browser/connection.ts +++ b/sync-api-common/src/browser/connection.ts @@ -8,60 +8,70 @@ import { BaseServiceConnection, BaseClientConnection, Message, RequestType, Know export class ClientConnection extends BaseClientConnection { - private readonly channel: BroadcastChannel; + private readonly broadcastChannel: BroadcastChannel; + private readonly messageChannel: MessageChannel; + private readonly sendPort: MessagePort; constructor(channelName: string = BroadcastChannelName) { super(self.location.pathname); - this.channel = new BroadcastChannel(channelName); - this.channel.addEventListener('message', this._handleMessageEvent.bind(this)); + console.log(`Creating client with name ${channelName} with origin ${origin}`); + this.broadcastChannel = new BroadcastChannel(channelName); + this.messageChannel = new MessageChannel(); + this.sendPort = this.messageChannel.port1; + this.sendPort.addEventListener('message', this._handleMessage.bind(this)); + this.broadcastChannel.postMessage(this.createPortBroadcastMessage(this.messageChannel.port2)); } dispose() { - this.channel.removeEventListener('message', this._handleMessageEvent.bind(this)); - this.channel.close(); + this.sendPort.removeEventListener('message', this._handleMessage.bind(this)); + this.messageChannel.port1.close(); + this.messageChannel.port2.close(); + this.broadcastChannel.close(); } - protected postMessage(sharedArrayBuffer: SharedArrayBuffer) { - this.channel.postMessage(sharedArrayBuffer); + protected override postMessage(sharedArrayBuffer: SharedArrayBuffer): void { + this.sendPort.postMessage(sharedArrayBuffer); } - _handleMessageEvent(ev: MessageEvent) { + private _handleMessage(message: any) { try { - if (ev.data.dest === this.connectionId || ev.data.dest === KnownConnectionIds.All) { - this.handleMessage(ev.data); + if (message.data?.dest === this.connectionId || message.data?.dest === KnownConnectionIds.All) { + this.handleMessage(message.data); } } catch (error) { RAL().console.error(error); } + } } export class ServiceConnection extends BaseServiceConnection { - private readonly channel: BroadcastChannel; + private readonly broadcastChannel: BroadcastChannel; + private readonly clientPorts: Map; constructor(channelName: string = BroadcastChannelName) { super(KnownConnectionIds.Main); - this.channel = new BroadcastChannel(channelName); - this.channel.addEventListener('message', this._handleMessageEvent.bind(this)); + console.log(`Creating server with name ${channelName} with origin ${origin}`); + this.broadcastChannel = new BroadcastChannel(channelName); + this.clientPorts = new Map(); + this.broadcastChannel.addEventListener('message', this.handleBroadcastMessage.bind(this)); } dispose() { - this.channel.removeEventListener('message', this._handleMessageEvent.bind(this)); - this.channel.close(); + this.clientPorts.clear(); + this.broadcastChannel.removeEventListener('message', this.handleBroadcastMessage.bind(this)); + this.broadcastChannel.close(); } protected postMessage(message: Message) { - this.channel.postMessage(message); + const clientPort = this.clientPorts.get(message.dest); + clientPort?.postMessage(message); } - async _handleMessageEvent(ev: MessageEvent) { - try { - if (ev.data?.byteLength) { - await this.handleMessage(ev.data); - } - } catch (error) { - RAL().console.error(error); + protected onBroadcastPort(message: Message): void { + if (message.params && message.src && message.params.port) { + this.clientPorts.set(message.src, message.params.port as MessagePort); } } } \ No newline at end of file diff --git a/sync-api-common/src/browser/ril.ts b/sync-api-common/src/browser/ril.ts index da4968ac..e13bcb70 100644 --- a/sync-api-common/src/browser/ril.ts +++ b/sync-api-common/src/browser/ril.ts @@ -25,6 +25,7 @@ class TestServiceConnection { + console.log('terminating service connection'); this.worker.terminate(); this.dispose(); return Promise.resolve(0); diff --git a/sync-api-common/src/common/connection.ts b/sync-api-common/src/common/connection.ts index 68cc4dee..0c1b200c 100644 --- a/sync-api-common/src/common/connection.ts +++ b/sync-api-common/src/common/connection.ts @@ -14,6 +14,7 @@ export type size = u32; export type Message = { dest: string; + src: string; method: string; params?: Params; }; @@ -592,7 +593,6 @@ export abstract class BaseClientConnection; private readyCallbacks: PromiseCallbacks | undefined; - constructor(protected connectionId: string) { this.id = 1; this.textEncoder = RAL().TextEncoder.create(); @@ -607,6 +607,10 @@ export abstract class BaseClientConnection = this._sendRequest as SendRequestSignatures; private _sendRequest(method: string, arg1?: Params | ResultType | number, arg2?: ResultType | number, arg3?: number): { errno: 0; data: any } | { errno: RPCErrno } { @@ -674,6 +678,7 @@ export abstract class BaseClientConnection { const header = new Uint32Array(sharedArrayBuffer, SyncSize.total, HeaderSize.total / 4); const requestOffset = header[HeaderIndex.messageOffset]; @@ -811,7 +823,9 @@ export abstract class BaseServiceConnection) => void): Promise { const connection = RAL().$testing.ClientConnection.create()!; - await connection.serviceReady(); try { + console.log('Awaiting ready in the client'); + await connection.serviceReady(); test(connection); } catch (error) { if (error instanceof assert.AssertionError) { diff --git a/sync-api-common/src/node/connection.ts b/sync-api-common/src/node/connection.ts index 1288df19..42390c08 100644 --- a/sync-api-common/src/node/connection.ts +++ b/sync-api-common/src/node/connection.ts @@ -3,64 +3,78 @@ * Licensed under the MIT License. See License.txt in the project root for license information. * ------------------------------------------------------------------------------------------ */ -import { BroadcastChannel, isMainThread, threadId } from 'worker_threads'; +import { BroadcastChannel, isMainThread, MessageChannel, MessagePort, threadId } from 'worker_threads'; import RAL from '../common/ral'; -import { BaseServiceConnection, BaseClientConnection, Message, RequestType, BroadcastChannelName, KnownConnectionIds } from '../common/connection'; +import { BaseServiceConnection, BaseClientConnection, Message, RequestType, BroadcastChannelName, KnownConnectionIds, Notification } from '../common/connection'; export class ClientConnection extends BaseClientConnection { - private readonly channel: BroadcastChannel; + private readonly broadcastChannel: BroadcastChannel; + private readonly messageChannel: MessageChannel; + private readonly sendPort: MessagePort; constructor(channelName: string = BroadcastChannelName) { super(isMainThread ? KnownConnectionIds.Main : threadId.toString()); - this.channel = new BroadcastChannel(channelName); - this.channel.onmessage = (message: any) => { - try { - if (message.data?.dest === this.connectionId || message.data?.dest === KnownConnectionIds.All) { - this.handleMessage(message.data); - } - } catch (error) { - RAL().console.error(error); - } - }; + this.broadcastChannel = new BroadcastChannel(channelName); + this.messageChannel = new MessageChannel(); + this.sendPort = this.messageChannel.port1; + this.sendPort.on('message', this._handleMessage.bind(this)); + + // Need to send the port as transfer item, but official api doesn't support that. Make it type unsafe + // to send the extra data + const broadcastMessage = this.broadcastChannel.postMessage.bind(this.broadcastChannel) as any; + broadcastMessage(this.createPortBroadcastMessage(this.messageChannel.port2), [this.messageChannel.port2]); } dispose() { - this.channel.onmessage = () => {}; - this.channel.close(); + this.messageChannel.port1.close(); + this.messageChannel.port2.close(); + this.broadcastChannel.close(); } - protected postMessage(sharedArrayBuffer: SharedArrayBuffer) { - this.channel.postMessage(sharedArrayBuffer); + protected override postMessage(sharedArrayBuffer: SharedArrayBuffer): void { + this.sendPort.postMessage(sharedArrayBuffer); + } + + private _handleMessage(message: any) { + try { + if (message.data?.dest === this.connectionId || message.data?.dest === KnownConnectionIds.All) { + this.handleMessage(message.data); + } + } catch (error) { + RAL().console.error(error); + } + } } export class ServiceConnection extends BaseServiceConnection { - private readonly channel: BroadcastChannel; + private readonly broadcastChannel: BroadcastChannel; + private readonly clientPorts: Map; constructor(channelName: string = BroadcastChannelName) { super(isMainThread ? KnownConnectionIds.Main : threadId.toString()); - this.channel = new BroadcastChannel(channelName); - this.channel.onmessage = async (message: any) => { - try { - // Skip broadcast messages that aren't SharedArrayBuffers - if (message.data?.byteLength) { - await this.handleMessage(message.data as SharedArrayBuffer); - } - } catch (error) { - RAL().console.error(error); - } - }; + this.broadcastChannel = new BroadcastChannel(channelName); + this.broadcastChannel.onmessage = this.handleBroadcastMessage.bind(this); + this.clientPorts = new Map(); } dispose() { - this.channel.onmessage = () => {}; - this.channel.close(); + this.clientPorts.clear(); + this.broadcastChannel.onmessage = () => {}; + this.broadcastChannel.close(); } protected postMessage(message: Message) { - this.channel.postMessage(message); + const clientPort = this.clientPorts.get(message.dest); + clientPort?.postMessage(message); + } + + protected onBroadcastPort(message: Message): void { + if (message.params && message.src && message.params.port) { + this.clientPorts.set(message.src, message.params.port as MessagePort); + } } } \ No newline at end of file diff --git a/sync-api-common/tsconfig.base.json b/sync-api-common/tsconfig.base.json index 3924b7d9..a2c5c87d 100644 --- a/sync-api-common/tsconfig.base.json +++ b/sync-api-common/tsconfig.base.json @@ -7,7 +7,7 @@ "declaration": true, "stripInternal": true, "target": "es2020", - "lib": [ "es2020" ], + "lib": [ "es2020", "DOM" ], "module": "Node16", "moduleResolution": "Node16", } diff --git a/sync-api-tests/tsconfig.base.json b/sync-api-tests/tsconfig.base.json index 3407ef0b..3376de3f 100644 --- a/sync-api-tests/tsconfig.base.json +++ b/sync-api-tests/tsconfig.base.json @@ -7,7 +7,7 @@ "declaration": true, "stripInternal": true, "target": "es2020", - "lib": [ "es2020" ], + "lib": [ "es2020", "DOM" ], "module": "Node16", "moduleResolution": "Node16", "types": [ From b67db2c4ae7ddd38b79dac38088218b810ed078a Mon Sep 17 00:00:00 2001 From: rchiodo Date: Wed, 12 Oct 2022 15:33:12 -0700 Subject: [PATCH 7/9] Hack to get working in node --- .../src/common/test/workers/tests.ts | 1 - sync-api-common/src/node/connection.ts | 26 ++++++++++++------- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/sync-api-common/src/common/test/workers/tests.ts b/sync-api-common/src/common/test/workers/tests.ts index e9fad29d..e3b922ec 100644 --- a/sync-api-common/src/common/test/workers/tests.ts +++ b/sync-api-common/src/common/test/workers/tests.ts @@ -28,7 +28,6 @@ export function assertResult(result: { errno: 0; data: TypedArray } | { errno: R export async function runSingle(test: (connection: ClientConnection) => void): Promise { const connection = RAL().$testing.ClientConnection.create()!; try { - console.log('Awaiting ready in the client'); await connection.serviceReady(); test(connection); } catch (error) { diff --git a/sync-api-common/src/node/connection.ts b/sync-api-common/src/node/connection.ts index 42390c08..39fcad88 100644 --- a/sync-api-common/src/node/connection.ts +++ b/sync-api-common/src/node/connection.ts @@ -21,10 +21,11 @@ export class ClientConnection c.postMessage(message)); + } else { + const clientPort = this.clientPorts.get(message.dest); + clientPort?.postMessage(message); + } } protected onBroadcastPort(message: Message): void { if (message.params && message.src && message.params.port) { + const messagePort = message.params.port as MessagePort; + messagePort.on('message', this.handleMessage.bind(this)); this.clientPorts.set(message.src, message.params.port as MessagePort); } } From 394f0e6b351a26efc1815c7645d728fc4d7daba1 Mon Sep 17 00:00:00 2001 From: rchiodo Date: Wed, 12 Oct 2022 15:35:12 -0700 Subject: [PATCH 8/9] Add comment about node specific implementation --- sync-api-common/src/node/connection.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sync-api-common/src/node/connection.ts b/sync-api-common/src/node/connection.ts index 39fcad88..e8e7e365 100644 --- a/sync-api-common/src/node/connection.ts +++ b/sync-api-common/src/node/connection.ts @@ -22,7 +22,8 @@ export class ClientConnection Date: Wed, 12 Oct 2022 15:44:36 -0700 Subject: [PATCH 9/9] Same trick doesn't seem to work for browser --- sync-api-common/src/browser/connection.ts | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/sync-api-common/src/browser/connection.ts b/sync-api-common/src/browser/connection.ts index 66f159ea..08ddb7a4 100644 --- a/sync-api-common/src/browser/connection.ts +++ b/sync-api-common/src/browser/connection.ts @@ -19,7 +19,10 @@ export class ClientConnection c.postMessage(message)); + } else { + const clientPort = this.clientPorts.get(message.dest); + clientPort?.postMessage(message); + } } protected onBroadcastPort(message: Message): void { if (message.params && message.src && message.params.port) { + const messagePort = message.params.port as MessagePort; + messagePort.addEventListener('message', this._handleClientMessage.bind(this)); this.clientPorts.set(message.src, message.params.port as MessagePort); } } + + private _handleClientMessage(ev: MessageEvent) { + if (ev.data?.byteLength) { + this.handleMessage(ev.data); + } + } } \ No newline at end of file