From e31b66991da7855f0057ccb51e5ed0057abf03d0 Mon Sep 17 00:00:00 2001 From: Oleksandr Brezhniev Date: Thu, 11 Sep 2025 00:46:53 +0100 Subject: [PATCH 1/9] Remove multiExp chunking - it works faster without it. Pass array buffers to worker threads instead of arrays (make it compatible with SharedArrayBuffer) --- src/engine_multiexp.js | 48 +++++++++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/src/engine_multiexp.js b/src/engine_multiexp.js index 38d813f..002c828 100644 --- a/src/engine_multiexp.js +++ b/src/engine_multiexp.js @@ -53,11 +53,15 @@ export default function buildMultiexp(curve, groupName) { const bitChunkSize = pTSizes[log2(nPoints)]; const nChunks = Math.floor((sScalar*8 - 1) / bitChunkSize) +1; + console.log("buffBases len", buffBases.byteLength, "sGIn", sGIn, "nPoints", nPoints); + console.log("buffScalars len", buffScalars.byteLength, "sScalar", sScalar); + console.log("bitChunkSize", bitChunkSize, "nChunks", nChunks); + const opPromises = []; for (let i=0; iMAX_CHUNK_SIZE) chunkSize = MAX_CHUNK_SIZE; - if (chunkSizeMAX_CHUNK_SIZE) chunkSize = MAX_CHUNK_SIZE; + // if (chunkSize { + // if (logger) logger.debug(`Multiexp end: ${logText}: ${i}/${nPoints}`); + // return r; + // })); + // } const opPromises = []; - for (let i=0; i { - if (logger) logger.debug(`Multiexp end: ${logText}: ${i}/${nPoints}`); - return r; - })); - } + opPromises.push(_multiExpChunk(buffBases, buffScalars, inType, logger, logText).then( (r) => { + if (logger) logger.debug(`Multiexp end: ${logText}: ${nPoints}`); + return r; + })); const result = await Promise.all(opPromises); From 1d086533f3f7e6c6e636d3fb14722355a95155cd Mon Sep 17 00:00:00 2001 From: Oleksandr Brezhniev Date: Thu, 11 Sep 2025 00:49:04 +0100 Subject: [PATCH 2/9] Create SharedArrayBuffers inside BigBuffer --- src/bigbuffer.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/bigbuffer.js b/src/bigbuffer.js index 1064a02..6900d57 100644 --- a/src/bigbuffer.js +++ b/src/bigbuffer.js @@ -8,7 +8,8 @@ export default class BigBuffer { this.byteLength = size; for (let i=0; i Date: Sat, 13 Sep 2025 01:59:10 +0100 Subject: [PATCH 3/9] Increase max size for underlying buffer of BigBuffer when run in nodejs --- src/bigbuffer.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bigbuffer.js b/src/bigbuffer.js index 6900d57..e2247e3 100644 --- a/src/bigbuffer.js +++ b/src/bigbuffer.js @@ -1,5 +1,5 @@ -const PAGE_SIZE = 1<<30; +const PAGE_SIZE = ( typeof Buffer !== "undefined" && Buffer.constants && Buffer.constants.MAX_LENGTH ) ? 
Buffer.constants.MAX_LENGTH : (1 << 30); export default class BigBuffer { From eb5ffe10e3990c89ac1a5e6e88e5b8229aa9af96 Mon Sep 17 00:00:00 2001 From: Oleksandr Brezhniev Date: Sat, 13 Sep 2025 02:02:36 +0100 Subject: [PATCH 4/9] Fallback to old slicing logic when buffers are not SharedArrayBuffers --- src/engine_multiexp.js | 57 +++++++++++++++++++++++++----------------- 1 file changed, 34 insertions(+), 23 deletions(-) diff --git a/src/engine_multiexp.js b/src/engine_multiexp.js index 002c828..243e57b 100644 --- a/src/engine_multiexp.js +++ b/src/engine_multiexp.js @@ -120,31 +120,42 @@ export default function buildMultiexp(curve, groupName) { throw new Error("Scalar size does not match"); } - // const bitChunkSize = pTSizes[log2(nPoints)]; - // const nChunks = Math.floor((sScalar*8 - 1) / bitChunkSize) +1; - // - // let chunkSize; - // chunkSize = Math.floor(nPoints / (tm.concurrency /nChunks)); - // if (chunkSize>MAX_CHUNK_SIZE) chunkSize = MAX_CHUNK_SIZE; - // if (chunkSize { - // if (logger) logger.debug(`Multiexp end: ${logText}: ${i}/${nPoints}`); - // return r; - // })); - // } + console.log("buffBases.buffer instanceof SharedArrayBuffer", buffBases.buffer instanceof SharedArrayBuffer); + console.log("buffScalars.buffer instanceof SharedArrayBuffer", buffScalars.buffer instanceof SharedArrayBuffer); const opPromises = []; - opPromises.push(_multiExpChunk(buffBases, buffScalars, inType, logger, logText).then( (r) => { - if (logger) logger.debug(`Multiexp end: ${logText}: ${nPoints}`); - return r; - })); + if (buffBases.buffer + && (buffBases.buffer instanceof SharedArrayBuffer) + && buffScalars.buffer + && (buffScalars.buffer instanceof SharedArrayBuffer) + ) + { + // If we are working with SharedArrayBuffers, we can do it in one chunk because memory is shared between threads + if (logger) logger.debug(`Multiexp start: ${logText}: ${nPoints}`); + opPromises.push(_multiExpChunk(buffBases, buffScalars, inType, logger, logText).then( (r) => { + if (logger) logger.debug(`Multiexp end: ${logText}: ${nPoints}`); + return r; + })); + } else { + const bitChunkSize = pTSizes[log2(nPoints)]; + const nChunks = Math.floor((sScalar*8 - 1) / bitChunkSize) +1; + + let chunkSize; + chunkSize = Math.floor(nPoints / (tm.concurrency /nChunks)); + if (chunkSize>MAX_CHUNK_SIZE) chunkSize = MAX_CHUNK_SIZE; + if (chunkSize { + if (logger) logger.debug(`Multiexp end: ${logText}: ${i}/${nPoints}`); + return r; + })); + } + } const result = await Promise.all(opPromises); From ee037e62b3d65bc3fd106117cd3cb4b7e5614da3 Mon Sep 17 00:00:00 2001 From: Oleksandr Brezhniev Date: Mon, 22 Sep 2025 02:41:22 +0100 Subject: [PATCH 5/9] Error handling in workers. Terminate on worker error. 
Linter fixes --- src/threadman.js | 19 ++++++++++++++----- src/threadman_thread.js | 25 +++++++++++++++---------- 2 files changed, 29 insertions(+), 15 deletions(-) diff --git a/src/threadman.js b/src/threadman.js index f80901b..6f1cc43 100644 --- a/src/threadman.js +++ b/src/threadman.js @@ -110,7 +110,7 @@ export default async function buildThreadManager(wasm, singleThread) { concurrency = os.cpus().length; } - if(concurrency == 0){ + if(concurrency === 0){ concurrency = 2; } @@ -151,6 +151,15 @@ export default async function buildThreadManager(wasm, singleThread) { data = e; } + // handle errors + if (data.error) { + console.log("Worker error", data.error); + + tm.working[i]=false; + tm.pendingDeferreds[i].reject(data.error); + throw new Error(data.error); + } + tm.working[i]=false; tm.pendingDeferreds[i].resolve(data); tm.processWorks(); @@ -166,19 +175,19 @@ export class ThreadManager { } startSyncOp() { - if (this.oldPFree != 0) throw new Error("Sync operation in progress"); + if (this.oldPFree !== 0) throw new Error("Sync operation in progress"); this.oldPFree = this.u32[0]; } endSyncOp() { - if (this.oldPFree == 0) throw new Error("No sync operation in progress"); + if (this.oldPFree === 0) throw new Error("No sync operation in progress"); this.u32[0] = this.oldPFree; this.oldPFree = 0; } postAction(workerId, e, transfers, _deferred) { if (this.working[workerId]) { - throw new Error("Posting a job t a working worker"); + throw new Error("Posting a job to a working worker"); } this.working[workerId] = true; @@ -190,7 +199,7 @@ export class ThreadManager { processWorks() { for (let i=0; (i 0); i++) { - if (this.working[i] == false) { + if (this.working[i] === false) { const work = this.actionQueue.shift(); this.postAction(i, work.data, work.transfers, work.deferred); } diff --git a/src/threadman_thread.js b/src/threadman_thread.js index 09951ba..726dbe9 100644 --- a/src/threadman_thread.js +++ b/src/threadman_thread.js @@ -14,15 +14,20 @@ export default function thread(self) { data = e; } - if (data[0].cmd == "INIT") { - init(data[0]).then(function() { - self.postMessage(data.result); - }); - } else if (data[0].cmd == "TERMINATE") { - self.close(); - } else { - const res = runTask(data); - self.postMessage(res); + try { + if (data[0].cmd === "INIT") { + init(data[0]).then(function() { + self.postMessage(data.result); + }); + } else if (data[0].cmd === "TERMINATE") { + self.close(); + } else { + const res = runTask(data); + self.postMessage(res); + } + } catch (err) { + // Catch any error and send it back to main thread + self.postMessage({error: err.message}); } }; } @@ -72,7 +77,7 @@ export default function thread(self) { } function runTask(task) { - if (task[0].cmd == "INIT") { + if (task[0].cmd === "INIT") { return init(task[0]); } const ctx = { From 8c51af04e233f312ec480d2c7a0b46868943358f Mon Sep 17 00:00:00 2001 From: Oleksandr Brezhniev Date: Mon, 22 Sep 2025 02:52:25 +0100 Subject: [PATCH 6/9] Remove passing shared array buffers in one chunk. Fix nChunks calculation - drastically improve memory usage. Increase min chunk size to 1<<15 (32k) - speed improvement on smaller circuits. Serial chunk processing - better mem usage. 
Linter fixes --- src/engine_multiexp.js | 109 ++++++++++++++++++++--------------------- 1 file changed, 52 insertions(+), 57 deletions(-) diff --git a/src/engine_multiexp.js b/src/engine_multiexp.js index 243e57b..c29591f 100644 --- a/src/engine_multiexp.js +++ b/src/engine_multiexp.js @@ -23,16 +23,16 @@ export default function buildMultiexp(curve, groupName) { let sGIn; let fnName; - if (groupName == "G1") { - if (inType == "affine") { + if (groupName === "G1") { + if (inType === "affine") { fnName = "g1m_multiexpAffine_chunk"; sGIn = G.F.n8*2; } else { fnName = "g1m_multiexp_chunk"; sGIn = G.F.n8*3; } - } else if (groupName == "G2") { - if (inType == "affine") { + } else if (groupName === "G2") { + if (inType === "affine") { fnName = "g2m_multiexpAffine_chunk"; sGIn = G.F.n8*2; } else { @@ -44,33 +44,30 @@ export default function buildMultiexp(curve, groupName) { } const nPoints = Math.floor(buffBases.byteLength / sGIn); - if (nPoints == 0) return G.zero; + if (nPoints === 0) return G.zero; const sScalar = Math.floor(buffScalars.byteLength / nPoints); - if( sScalar * nPoints != buffScalars.byteLength) { + if( sScalar * nPoints !== buffScalars.byteLength) { throw new Error("Scalar size does not match"); } const bitChunkSize = pTSizes[log2(nPoints)]; const nChunks = Math.floor((sScalar*8 - 1) / bitChunkSize) +1; - console.log("buffBases len", buffBases.byteLength, "sGIn", sGIn, "nPoints", nPoints); - console.log("buffScalars len", buffScalars.byteLength, "sScalar", sScalar); - console.log("bitChunkSize", bitChunkSize, "nChunks", nChunks); const opPromises = []; for (let i=0; i { - if (logger) logger.debug(`Multiexp end: ${logText}: ${nPoints}`); - return r; - })); - } else { - const bitChunkSize = pTSizes[log2(nPoints)]; - const nChunks = Math.floor((sScalar*8 - 1) / bitChunkSize) +1; - - let chunkSize; - chunkSize = Math.floor(nPoints / (tm.concurrency /nChunks)); - if (chunkSize>MAX_CHUNK_SIZE) chunkSize = MAX_CHUNK_SIZE; - if (chunkSize { - if (logger) logger.debug(`Multiexp end: ${logText}: ${i}/${nPoints}`); - return r; - })); - } + const bitChunkSize = pTSizes[log2(nPoints)]; + let nChunks = Math.floor((sScalar*8 - 1) / bitChunkSize) +1; + + if (groupName === "G2") { + // G2 has bigger points, so we reduce chunk size to optimize memory usage + nChunks *= 2; } - const result = await Promise.all(opPromises); + let chunkSize; + //chunkSize = Math.floor(nPoints / (tm.concurrency /nChunks)); + chunkSize = Math.floor(nPoints / nChunks); + if (chunkSize>MAX_CHUNK_SIZE) chunkSize = MAX_CHUNK_SIZE; + if (chunkSize { + // if (logger) logger.debug(`Multiexp end: ${logText}: ${i}/${nPoints}`); + // return r; + // })); + const r = await _multiExpChunk(buffBasesChunk, buffScalarsChunk, inType, logger, logText); + if (logger) logger.debug(`Multiexp end: ${logText}: ${i}/${nPoints}`); + result.push(r); + } + + //result = await Promise.all(opPromises); let res = G.zero; for (let i=result.length-1; i>=0; i--) { From af3b168ed24372b832876a28209a9b9aaf8bb381 Mon Sep 17 00:00:00 2001 From: Oleksandr Brezhniev Date: Thu, 25 Sep 2025 01:32:11 +0100 Subject: [PATCH 7/9] MultiExp: - remove chunking of chunks (removes unneeded copying of the same data to different worker jobs), - make nChunks multiple of tm.concurrency for optimal load balancing - switch back to promises from awaits (allows parallel execution of chunks) - rollback min chunk size - transfer buffer ownership to worker threads (removes memory copying for large arrays!!!) 
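Note on the last bullet: worker postMessage takes an optional transfer list, and any ArrayBuffer named in it is moved to the receiving thread instead of being structured-cloned. A minimal sketch of just that mechanic (illustrative only; "./worker.js" and its message shape are hypothetical, not part of this codebase):

    import { Worker } from "worker_threads";

    const worker = new Worker("./worker.js");   // hypothetical worker script
    const chunk = new Uint8Array(1 << 20);      // 1 MiB chunk to hand off

    // Naming chunk.buffer in the transfer list moves ownership to the worker.
    // No bytes are copied, and the sender's view is detached afterwards.
    worker.postMessage({ cmd: "PROCESS", chunk }, [chunk.buffer]);
    console.log(chunk.byteLength);              // 0: the memory now belongs to the worker

This is also why the chunks in the diff below are produced with slice() before queuing: Uint8Array.prototype.slice copies into a fresh ArrayBuffer, so transferring the slice's buffer detaches only that private copy and never the caller's data. SharedArrayBuffers are not transferable at all, which is consistent with dropping the shared-buffer fast path in PATCH 6/9.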
--- src/engine_multiexp.js | 72 ++++++++++++++++++++---------------------- 1 file changed, 35 insertions(+), 37 deletions(-) diff --git a/src/engine_multiexp.js b/src/engine_multiexp.js index c29591f..498e3cd 100644 --- a/src/engine_multiexp.js +++ b/src/engine_multiexp.js @@ -10,6 +10,7 @@ const pTSizes = [ export default function buildMultiexp(curve, groupName) { const G = curve[groupName]; const tm = G.tm; + async function _multiExpChunk(buffBases, buffScalars, inType, logger, logText) { if ( ! (buffBases instanceof Uint8Array) ) { if (logger) logger.error(`${logText} _multiExpChunk buffBases is not Uint8Array`); @@ -25,18 +26,18 @@ export default function buildMultiexp(curve, groupName) { let fnName; if (groupName === "G1") { if (inType === "affine") { - fnName = "g1m_multiexpAffine_chunk"; + fnName = "g1m_multiexpAffine"; sGIn = G.F.n8*2; } else { - fnName = "g1m_multiexp_chunk"; + fnName = "g1m_multiexp"; sGIn = G.F.n8*3; } } else if (groupName === "G2") { if (inType === "affine") { - fnName = "g2m_multiexpAffine_chunk"; + fnName = "g2m_multiexpAffine"; sGIn = G.F.n8*2; } else { - fnName = "g2m_multiexp_chunk"; + fnName = "g2m_multiexp"; sGIn = G.F.n8*3; } } else { @@ -51,30 +52,26 @@ export default function buildMultiexp(curve, groupName) { } const bitChunkSize = pTSizes[log2(nPoints)]; - const nChunks = Math.floor((sScalar*8 - 1) / bitChunkSize) +1; - const opPromises = []; - for (let i=0; iMAX_CHUNK_SIZE) chunkSize = MAX_CHUNK_SIZE; if (chunkSize { - // if (logger) logger.debug(`Multiexp end: ${logText}: ${i}/${nPoints}`); - // return r; - // })); - const r = await _multiExpChunk(buffBasesChunk, buffScalarsChunk, inType, logger, logText); - if (logger) logger.debug(`Multiexp end: ${logText}: ${i}/${nPoints}`); - result.push(r); + opPromises.push(_multiExpChunk(buffBasesChunk, buffScalarsChunk, inType, logger, logText).then((r) => { + if (logger) logger.debug(`Multiexp end: ${logText}: ${i}/${nPoints}`); + return r; + })); } - //result = await Promise.all(opPromises); + result = await Promise.all(opPromises); let res = G.zero; for (let i=result.length-1; i>=0; i--) { From 9ca91e44ab8f06c12a61d2325aa47a48d36f3b67 Mon Sep 17 00:00:00 2001 From: Oleksandr Brezhniev Date: Thu, 25 Sep 2025 01:34:58 +0100 Subject: [PATCH 8/9] Better error handling in threadman --- src/threadman.js | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/threadman.js b/src/threadman.js index 6f1cc43..38f46fc 100644 --- a/src/threadman.js +++ b/src/threadman.js @@ -153,11 +153,9 @@ export default async function buildThreadManager(wasm, singleThread) { // handle errors if (data.error) { - console.log("Worker error", data.error); - tm.working[i]=false; - tm.pendingDeferreds[i].reject(data.error); - throw new Error(data.error); + tm.pendingDeferreds[i].reject("Worker error: " + data.error); + throw new Error("Worker error: " + data.error); } tm.working[i]=false; @@ -198,6 +196,9 @@ export class ThreadManager { } processWorks() { + if (this.workers.length === 0 && this.actionQueue.length > 0) { + throw new Error("No workers initialized"); + } for (let i=0; (i 0); i++) { if (this.working[i] === false) { const work = this.actionQueue.shift(); From 6b39fc12e77cff1b069e1dd7d95afa4018ae67af Mon Sep 17 00:00:00 2001 From: Oleksandr Brezhniev Date: Fri, 26 Sep 2025 14:41:11 +0100 Subject: [PATCH 9/9] Transfer buffer ownership to worker threads everywhere else --- src/engine_applykey.js | 6 ++++-- src/engine_batchconvert.js | 2 +- src/engine_fft.js | 14 +++++++------- src/engine_pairing.js | 2 +- 
src/wasm_field1.js | 2 +- 5 files changed, 14 insertions(+), 12 deletions(-) diff --git a/src/engine_applykey.js b/src/engine_applykey.js index 5857d8c..294c9ff 100644 --- a/src/engine_applykey.js +++ b/src/engine_applykey.js @@ -64,10 +64,12 @@ export default function buildBatchApplyKey(curve, groupName) { const task = []; + const b = buff.slice(i*pointsPerChunk*sGin, i*pointsPerChunk*sGin + n*sGin); + task.push({ cmd: "ALLOCSET", var: 0, - buff: buff.slice(i*pointsPerChunk*sGin, i*pointsPerChunk*sGin + n*sGin) + buff: b }); task.push({cmd: "ALLOCSET", var: 1, buff: t}); task.push({cmd: "ALLOCSET", var: 2, buff: inc}); @@ -96,7 +98,7 @@ export default function buildBatchApplyKey(curve, groupName) { } task.push({cmd: "GET", out: 0, var: 3, len: n*sGout}); - opPromises.push(tm.queueAction(task)); + opPromises.push(tm.queueAction(task, [b.buffer])); t = Fr.mul(t, Fr.exp(inc, n)); } diff --git a/src/engine_batchconvert.js b/src/engine_batchconvert.js index 0acf524..1adb3bf 100644 --- a/src/engine_batchconvert.js +++ b/src/engine_batchconvert.js @@ -29,7 +29,7 @@ export default function buildBatchConvert(tm, fnName, sIn, sOut) { {cmd: "GET", out: 0, var: 1, len:sOut * n}, ]; opPromises.push( - tm.queueAction(task) + tm.queueAction(task, [buffChunk.buffer]) ); } diff --git a/src/engine_fft.js b/src/engine_fft.js index 5ae955b..2dae88d 100644 --- a/src/engine_fft.js +++ b/src/engine_fft.js @@ -146,7 +146,7 @@ export default function buildFFT(curve, groupName) { } else { task.push({cmd: "GET", out:0, var: 0, len: sMid*pointsInChunk}); } - promises.push(tm.queueAction(task).then( (r) => { + promises.push(tm.queueAction(task, [buffChunk.buffer]).then( (r) => { if (logger) logger.debug(`${loggerTxt}: fft ${bits} mix end: ${i}/${nChunks}`); return r; })); @@ -203,7 +203,7 @@ export default function buildFFT(curve, groupName) { task.push({cmd: "GET", out: 0, var: 0, len: pointsInChunk*sMid}); task.push({cmd: "GET", out: 1, var: 1, len: pointsInChunk*sMid}); } - opPromises.push(tm.queueAction(task).then( (r) => { + opPromises.push(tm.queueAction(task, [chunks[o1].buffer, chunks[o2].buffer, first.buffer ]).then( (r) => { if (logger) logger.debug(`${loggerTxt}: fft ${bits} join ${i}/${bits} ${j+1}/${nGroups} ${k}/${nChunksPerGroup/2}`); return r; })); @@ -402,7 +402,7 @@ export default function buildFFT(curve, groupName) { task.push({cmd: "GET", out: 0, var: 0, len: n*sOut}); task.push({cmd: "GET", out: 1, var: 1, len: n*sOut}); opPromises.push( - tm.queueAction(task).then( (r) => { + tm.queueAction(task, [b1.buffer, b2.buffer, firstChunk.buffer]).then((r) => { if (logger) logger.debug(`${loggerTxt}: fftJoinExt End: ${i}/${nPoints}`); return r; }) @@ -550,7 +550,7 @@ export default function buildFFT(curve, groupName) { } task.push({cmd: "GET", out: 0, var: 0, len: pointsPerChunk*sG}); opPromises.push( - tm.queueAction(task) + tm.queueAction(task, [b.buffer]) ); } @@ -585,7 +585,7 @@ export default function buildFFT(curve, groupName) { ]}); task.push({cmd: "GET", out: 0, var: 0, len: pointsPerChunk*sG}); task.push({cmd: "GET", out: 1, var: 1, len: pointsPerChunk*sG}); - opPromises.push(tm.queueAction(task)); + opPromises.push(tm.queueAction(task, [chunks[o1].buffer, chunks[o2].buffer, first.buffer])); } } @@ -664,7 +664,7 @@ export default function buildFFT(curve, groupName) { task.push({cmd: "GET", out: 0, var: 0, len: pointsPerChunk*sG}); task.push({cmd: "GET", out: 1, var: 1, len: pointsPerChunk*sG}); opPromises.push( - tm.queueAction(task) + tm.queueAction(task, [b1.buffer, b2.buffer, 
firstChunk.buffer]) ); } @@ -740,7 +740,7 @@ export default function buildFFT(curve, groupName) { ]}); task.push({cmd: "GET", out: 0, var: 0, len: n*sGout}); opPromises.push( - tm.queueAction(task) + tm.queueAction(task, [b.buffer]) ); } diff --git a/src/engine_pairing.js b/src/engine_pairing.js index 5a802ce..39095d2 100644 --- a/src/engine_pairing.js +++ b/src/engine_pairing.js @@ -60,7 +60,7 @@ export default function buildPairing(curve) { task.push({cmd: "GET", out: 0, var: 4, len: curve.Gt.n8}); opPromises.push( - tm.queueAction(task) + tm.queueAction(task, [g1Buff.buffer, g2Buff.buffer]) ); } diff --git a/src/wasm_field1.js b/src/wasm_field1.js index e5c3b64..7dbd8bb 100644 --- a/src/wasm_field1.js +++ b/src/wasm_field1.js @@ -282,7 +282,7 @@ export default class WasmField1 { {cmd: "GET", out: 0, var: 1, len:sOut * n}, ]; opPromises.push( - this.tm.queueAction(task) + this.tm.queueAction(task, [buffChunk.buffer]) ); }
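Postscript on the error handling introduced in PATCH 5/9 and tightened in PATCH 8/9: an exception is caught inside the worker, posted back as an {error} payload, and turned into a rejected promise on the main thread. A self-contained sketch of that round trip (the inline worker body and the "BOOM" command are made up for illustration; the real wiring lives in src/threadman.js and src/threadman_thread.js):

    import { Worker } from "worker_threads";

    // Worker body as a string so the sketch runs on its own (eval: true
    // executes it as CommonJS, hence require instead of import).
    const workerSrc = `
        const { parentPort } = require("worker_threads");
        parentPort.on("message", (task) => {
            try {
                if (task.cmd === "BOOM") throw new Error("task failed");
                parentPort.postMessage({ ok: true });
            } catch (err) {
                // Report the failure instead of letting the worker die silently.
                parentPort.postMessage({ error: err.message });
            }
        });
    `;

    const worker = new Worker(workerSrc, { eval: true });

    worker.on("message", (data) => {
        // Same check as in src/threadman.js: an {error} payload must reject
        // the pending deferred rather than resolve it as a normal result.
        if (data && data.error) console.error("Worker error: " + data.error);
        else console.log("result", data);
        worker.terminate();
    });

    worker.postMessage({ cmd: "BOOM" });  // prints: Worker error: task failed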
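One more note, on PATCH 2/9 and 3/9 together: BigBuffer now allocates its pages as SharedArrayBuffers, and under Node the page size is raised from 1 GiB to Buffer.constants.MAX_LENGTH. The hunks shown allocate SharedArrayBuffer unconditionally; the guard in the sketch below is an addition of this note (an assumption, not the patch's behavior) for environments where SharedArrayBuffer is unavailable, such as a browser page that is not cross-origin isolated:

    // Page size: Node's hard Buffer limit when available, else 1 GiB.
    const PAGE_SIZE =
        (typeof Buffer !== "undefined" && Buffer.constants && Buffer.constants.MAX_LENGTH)
            ? Buffer.constants.MAX_LENGTH
            : (1 << 30);

    function allocPage(n) {
        // A SharedArrayBuffer page can be read by worker threads without
        // copying; fall back to an ordinary ArrayBuffer where it is missing
        // (such pages are structured-cloned on postMessage instead).
        if (typeof SharedArrayBuffer !== "undefined") {
            return new Uint8Array(new SharedArrayBuffer(n));
        }
        return new Uint8Array(n);
    }

    // Usage inside the BigBuffer constructor (conceptual):
    //   for (let i = 0; i < size; i += PAGE_SIZE) {
    //       this.buffers.push(allocPage(Math.min(PAGE_SIZE, size - i)));
    //   }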