From 1a2c2fbecaa879209f5ada18106932a50c56b09f Mon Sep 17 00:00:00 2001 From: Brett Bonner Date: Tue, 30 Dec 2025 18:29:46 -0800 Subject: [PATCH 1/3] Release blockstore references on close --- core/base/crdt.ts | 3 +- core/blockstore/attachable-store.ts | 10 ++++- core/blockstore/loader.ts | 63 +++++++++++++++++++++-------- core/blockstore/transaction.ts | 9 +++-- core/runtime/task-manager.ts | 13 ++++++ core/types/base/types.ts | 2 +- core/types/blockstore/types.ts | 3 +- 7 files changed, 79 insertions(+), 24 deletions(-) diff --git a/core/base/crdt.ts b/core/base/crdt.ts index 93ef20e2c..018078585 100644 --- a/core/base/crdt.ts +++ b/core/base/crdt.ts @@ -83,7 +83,7 @@ export class CRDTImpl implements CRDT { // self reference to fullfill HasCRDT readonly crdt: CRDT; - readonly ledgerParent?: Ledger; + ledgerParent?: Ledger; constructor(sthis: SuperThis, opts: CRDTOpts, parent?: Ledger) { this.sthis = sthis; @@ -236,6 +236,7 @@ export class CRDTImpl implements CRDT { this.indexBlockstore ? this.indexBlockstore.close() : Promise.resolve(), this.clock.close(), ]); + this.ledgerParent = undefined; } async destroy(): Promise { diff --git a/core/blockstore/attachable-store.ts b/core/blockstore/attachable-store.ts index 42363e8ab..9d6d77900 100644 --- a/core/blockstore/attachable-store.ts +++ b/core/blockstore/attachable-store.ts @@ -289,7 +289,7 @@ export async function createAttachedStores( } export class AttachedRemotesImpl implements AttachedStores { - private readonly _remotes = new KeyedResolvOnce(); + private _remotes = new KeyedResolvOnce(); readonly loadable: Loadable; // readonly attactedFileStore: DataStore; @@ -374,7 +374,7 @@ export class AttachedRemotesImpl implements AttachedStores { } // needed for React Statemanagement - readonly _keyedAttachable = new KeyedResolvOnce(); + private _keyedAttachable = new KeyedResolvOnce(); async attach(attachable: Attachable, onAttach: (at: Attached) => Promise): Promise { const keyed = attachable.configHash(this.loadable.blockstoreParent?.crdtParent?.ledgerParent); // console.log("attach-enter", keyed, this.loadable.blockstoreParent?.crdtParent?.ledgerParent?.name); @@ -465,4 +465,10 @@ export class AttachedRemotesImpl implements AttachedStores { }); return ret; } + + reset(): void { + this._remotes = new KeyedResolvOnce(); + this._keyedAttachable = new KeyedResolvOnce(); + this._local = undefined; + } } diff --git a/core/blockstore/loader.ts b/core/blockstore/loader.ts index 0f050f98c..cf48190e6 100644 --- a/core/blockstore/loader.ts +++ b/core/blockstore/loader.ts @@ -157,15 +157,15 @@ class CommitAction implements CommitParams { export class Loader implements Loadable { // readonly name: string; - readonly blockstoreParent?: BlockFetcher; + blockstoreParent?: BlockFetcher; readonly ebOpts: BlockstoreRuntime; readonly logger: Logger; readonly commitQueue: CommitQueueIf; isCompacting = false; - readonly cidCache: KeyedResolvOnce; + private cidCache: KeyedResolvOnce; private readonly maxConcurrentCarReader: ReturnType; private readonly maxConcurrentWrite = pLimit(1); - readonly seenCompacted: LRUSet; + private seenCompacted: LRUSet; // readonly processedCars: Set = new Set(); readonly sthis: SuperThis; readonly taskManager: TaskManager; @@ -299,6 +299,11 @@ export class Loader implements Loadable { // private getBlockCache = new Map(); private seenMeta: LRUSet; + private readonly cacheSizes: { + readonly carCacheSize: number; + readonly metaCacheSize: number; + readonly compactCacheSize: number; + }; keyBag(): Promise { return getKeyBag(this.sthis, this.ebOpts.keyBag); @@ -306,7 +311,27 @@ export class Loader implements Loadable { private readonly onceReady: ResolveOnce = new ResolveOnce(); - metaStreamReader!: ReadableStreamDefaultReader; + private buildCidCache() { + return new KeyedResolvOnce({ + lru: { + maxEntries: this.cacheSizes.carCacheSize, + }, + }); + } + + private buildSeenMeta() { + return new LRUSet({ + maxEntries: this.cacheSizes.metaCacheSize, + }); + } + + private buildSeenCompacted() { + return new LRUSet({ + maxEntries: this.cacheSizes.compactCacheSize, + }); + } + + metaStreamReader?: ReadableStreamDefaultReader; async ready(): Promise { return this.onceReady.once(async () => { await createAttachedStores( @@ -321,7 +346,7 @@ export class Loader implements Loadable { ); const local = this.attachedStores.local(); // console.log("ready", this.id); - this.metaStreamReader = local.active.meta.stream().getReader(); + this.metaStreamReader = local.active.meta.stream().getReader(); // console.log("attach-local", local.active.car.url().pathname); await this.waitFirstMeta(this.metaStreamReader, local, { meta: this.ebOpts.meta, origin: local.active.car.url() }); }); @@ -407,9 +432,18 @@ export class Loader implements Loadable { await this.attachedStores.detach(); // console.log("close-3"); await this.metaStreamReader?.cancel("close"); + this.metaStreamReader = undefined; // console.log("close-4"); // const toClose = await Promise.all([this.carStore(), this.metaStore(), this.fileStore(), this.WALStore()]); // await Promise.all(toClose.map((store) => store.close())); + this.attachedStores.reset?.(); + this.taskManager.close(); + this.currentMeta = []; + this.carLog.update([]); + this.cidCache = this.buildCidCache(); + this.seenMeta = this.buildSeenMeta(); + this.seenCompacted = this.buildSeenCompacted(); + this.blockstoreParent = undefined; } async destroy() { @@ -439,17 +473,14 @@ export class Loader implements Loadable { "Loader", ); this.logger = ensureLogger(sthis, "Loader"); - this.cidCache = new KeyedResolvOnce({ - lru: { - maxEntries: parseInt(this.ebOpts.storeUrls.car.getParam(PARAM.CAR_CACHE_SIZE, "1000000"), 10), - }, - }); - this.seenMeta = new LRUSet({ - maxEntries: parseInt(this.ebOpts.storeUrls.meta.getParam(PARAM.CAR_META_CACHE_SIZE, "1000"), 10), - }); - this.seenCompacted = new LRUSet({ - maxEntries: parseInt(this.ebOpts.storeUrls.car.getParam(PARAM.CAR_COMPACT_CACHE_SIZE, "1000"), 10), - }); + this.cacheSizes = { + carCacheSize: parseInt(this.ebOpts.storeUrls.car.getParam(PARAM.CAR_CACHE_SIZE, "1000000"), 10), + metaCacheSize: parseInt(this.ebOpts.storeUrls.meta.getParam(PARAM.CAR_META_CACHE_SIZE, "1000"), 10), + compactCacheSize: parseInt(this.ebOpts.storeUrls.car.getParam(PARAM.CAR_COMPACT_CACHE_SIZE, "1000"), 10), + }; + this.cidCache = this.buildCidCache(); + this.seenMeta = this.buildSeenMeta(); + this.seenCompacted = this.buildSeenCompacted(); this.maxConcurrentCarReader = pLimit(parseInt(this.ebOpts.storeUrls.car.getParam(PARAM.CAR_PARALLEL, "10"), 10)); // console.log("maxConcurrentCarReader", this.maxConcurrentCarReader.concurrency); diff --git a/core/blockstore/transaction.ts b/core/blockstore/transaction.ts index c7d238199..7a6fd502a 100644 --- a/core/blockstore/transaction.ts +++ b/core/blockstore/transaction.ts @@ -122,7 +122,7 @@ export class BaseBlockstoreImpl implements BlockFetcher { readonly ebOpts: BlockstoreRuntime; readonly sthis: SuperThis; - readonly crdtParent?: CRDT; + crdtParent?: CRDT; readonly loader: Loadable; // readonly name?: string; @@ -133,7 +133,8 @@ export class BaseBlockstoreImpl implements BlockFetcher { } async close(): Promise { - // no-op + this.transactions.clear(); + this.crdtParent = undefined; } async destroy(): Promise { @@ -222,7 +223,9 @@ export class EncryptedBlockstore extends BaseBlockstoreImpl { } close(): Promise { - return this.loader.close(); + return this.loader.close().finally(() => { + void super.close(); + }); } destroy(): Promise { diff --git a/core/runtime/task-manager.ts b/core/runtime/task-manager.ts index 68f3b5d39..b30d75f95 100644 --- a/core/runtime/task-manager.ts +++ b/core/runtime/task-manager.ts @@ -21,6 +21,7 @@ export class TaskManager implements TaskManagerIf { private queue: TaskItem[] = []; private isProcessing = false; + private isClosed = false; readonly logger: Logger; readonly params: TaskManagerParams; @@ -33,6 +34,7 @@ export class TaskManager implements TaskManagerIf { } async handleEvent(cid: CarClockLink, parents: CarClockHead, dbMeta: DbMeta, store: ActiveStore) { + if (this.isClosed) return; for (const parent of parents) { this.eventsWeHandled.add(parent.toString()); } @@ -43,6 +45,7 @@ export class TaskManager implements TaskManagerIf { private async processQueue() { if (this.isProcessing) return; + if (this.isClosed) return; this.isProcessing = true; const filteredQueue = this.queue.filter(({ cid }) => !this.eventsWeHandled.has(cid)); const first = filteredQueue[0]; @@ -63,9 +66,19 @@ export class TaskManager implements TaskManagerIf { this.logger.Warn().Err(err).Msg("retry to process event block"); } finally { this.isProcessing = false; + if (this.isClosed) { + return; + } if (this.queue.length > 0) { void this.processQueue(); } } } + + close(): void { + this.isClosed = true; + this.queue = []; + this.eventsWeHandled.clear(); + this.isProcessing = false; + } } diff --git a/core/types/base/types.ts b/core/types/base/types.ts index f7bf23e05..426d925dd 100644 --- a/core/types/base/types.ts +++ b/core/types/base/types.ts @@ -500,7 +500,7 @@ export interface BaseBlockstore { } export interface CRDT extends ReadyCloseDestroy, HasLogger, HasSuperThis, HasCRDT { - readonly ledgerParent?: Ledger; + ledgerParent?: Ledger; readonly logger: Logger; readonly sthis: SuperThis; // self reference to fullfill HasCRDT diff --git a/core/types/blockstore/types.ts b/core/types/blockstore/types.ts index caf3b3a02..8a10f2a58 100644 --- a/core/types/blockstore/types.ts +++ b/core/types/blockstore/types.ts @@ -559,6 +559,7 @@ export interface AttachedStores { activate(store: DataAndMetaStore | CoerceURI): ActiveStore; attach(attached: Attachable, onAttach: (at: Attached) => Promise): Promise; detach(): Promise; + reset?(): void; } export interface BaseAttachedStores { @@ -669,7 +670,7 @@ export interface Loadable { // readonly name: string; // = ""; readonly sthis: SuperThis; readonly logger: Logger; - readonly blockstoreParent?: BlockFetcher; + blockstoreParent?: BlockFetcher; readonly ebOpts: BlockstoreRuntime; readonly carLog: CarLog; From 8cbedd52ac81482e96647c6dacfb7e6b8bd9dbb9 Mon Sep 17 00:00:00 2001 From: Brett Bonner Date: Tue, 30 Dec 2025 20:19:59 -0800 Subject: [PATCH 2/3] Tighten loader teardown behavior --- core/blockstore/attachable-store.ts | 3 +++ core/blockstore/loader.ts | 13 +++++++++++-- core/blockstore/transaction.ts | 7 +++---- core/runtime/task-manager.ts | 5 ++++- core/types/blockstore/types.ts | 2 +- 5 files changed, 22 insertions(+), 8 deletions(-) diff --git a/core/blockstore/attachable-store.ts b/core/blockstore/attachable-store.ts index 9d6d77900..cc9d2f98c 100644 --- a/core/blockstore/attachable-store.ts +++ b/core/blockstore/attachable-store.ts @@ -466,6 +466,9 @@ export class AttachedRemotesImpl implements AttachedStores { return ret; } + /** + * Reset local and remote attachments for teardown. + */ reset(): void { this._remotes = new KeyedResolvOnce(); this._keyedAttachable = new KeyedResolvOnce(); diff --git a/core/blockstore/loader.ts b/core/blockstore/loader.ts index cf48190e6..a18a97407 100644 --- a/core/blockstore/loader.ts +++ b/core/blockstore/loader.ts @@ -311,6 +311,9 @@ export class Loader implements Loadable { private readonly onceReady: ResolveOnce = new ResolveOnce(); + /** + * Create a new CID cache for block lookups. + */ private buildCidCache() { return new KeyedResolvOnce({ lru: { @@ -319,12 +322,18 @@ export class Loader implements Loadable { }); } + /** + * Create a new cache for seen metadata groups. + */ private buildSeenMeta() { return new LRUSet({ maxEntries: this.cacheSizes.metaCacheSize, }); } + /** + * Create a new cache for seen compaction events. + */ private buildSeenCompacted() { return new LRUSet({ maxEntries: this.cacheSizes.compactCacheSize, @@ -436,8 +445,8 @@ export class Loader implements Loadable { // console.log("close-4"); // const toClose = await Promise.all([this.carStore(), this.metaStore(), this.fileStore(), this.WALStore()]); // await Promise.all(toClose.map((store) => store.close())); - this.attachedStores.reset?.(); - this.taskManager.close(); + this.attachedStores.reset(); + await this.taskManager.close(); this.currentMeta = []; this.carLog.update([]); this.cidCache = this.buildCidCache(); diff --git a/core/blockstore/transaction.ts b/core/blockstore/transaction.ts index 7a6fd502a..c05a36de2 100644 --- a/core/blockstore/transaction.ts +++ b/core/blockstore/transaction.ts @@ -222,10 +222,9 @@ export class EncryptedBlockstore extends BaseBlockstoreImpl { return this.loader.ready(); } - close(): Promise { - return this.loader.close().finally(() => { - void super.close(); - }); + async close(): Promise { + await this.loader.close(); + await super.close(); } destroy(): Promise { diff --git a/core/runtime/task-manager.ts b/core/runtime/task-manager.ts index b30d75f95..10d64a79f 100644 --- a/core/runtime/task-manager.ts +++ b/core/runtime/task-manager.ts @@ -75,7 +75,10 @@ export class TaskManager implements TaskManagerIf { } } - close(): void { + /** + * Stop processing and clear queued tasks. + */ + async close(): Promise { this.isClosed = true; this.queue = []; this.eventsWeHandled.clear(); diff --git a/core/types/blockstore/types.ts b/core/types/blockstore/types.ts index 8a10f2a58..af656ab55 100644 --- a/core/types/blockstore/types.ts +++ b/core/types/blockstore/types.ts @@ -559,7 +559,7 @@ export interface AttachedStores { activate(store: DataAndMetaStore | CoerceURI): ActiveStore; attach(attached: Attachable, onAttach: (at: Attached) => Promise): Promise; detach(): Promise; - reset?(): void; + reset(): void; } export interface BaseAttachedStores { From 10bcbf1fc668b02ac0652a8c80390362f899f11c Mon Sep 17 00:00:00 2001 From: bbopen Date: Tue, 30 Dec 2025 20:38:01 -0800 Subject: [PATCH 3/3] fix(blockstore): safely access local store during teardown MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add localOrUndefined() method to AttachedStores interface - Update destroy() to handle already-reset state safely - Fix reset() to not clear _local - async operations may still need it - Fix lint error: remove return statement from finally block These changes fix "local store not set" errors that occurred when async operations (meta stream handler, write queue) continued running after close() was called. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- core/blockstore/attachable-store.ts | 18 ++++++++++++++++-- core/blockstore/loader.ts | 17 ++++++++++------- core/runtime/task-manager.ts | 5 +---- core/tests/blockstore/standalone.test.ts | 7 ++++--- core/types/blockstore/types.ts | 5 +++++ 5 files changed, 36 insertions(+), 16 deletions(-) diff --git a/core/blockstore/attachable-store.ts b/core/blockstore/attachable-store.ts index cc9d2f98c..ac6ba51fa 100644 --- a/core/blockstore/attachable-store.ts +++ b/core/blockstore/attachable-store.ts @@ -330,6 +330,17 @@ export class AttachedRemotesImpl implements AttachedStores { return new ActiveStoreImpl(this._local.stores as LocalDataAndMetaAndWalStore, this); } + /** + * Returns the local store if set, or undefined if already reset. + * Use this for safe access during teardown. + */ + localOrUndefined(): LocalActiveStore | undefined { + if (!this._local) { + return undefined; + } + return new ActiveStoreImpl(this._local.stores as LocalDataAndMetaAndWalStore, this); + } + activate(store: DataAndMetaStore | CoerceURI): ActiveStore { // console.log( // "activate", @@ -467,11 +478,14 @@ export class AttachedRemotesImpl implements AttachedStores { } /** - * Reset local and remote attachments for teardown. + * Reset remote attachments for teardown. + * Note: _local is intentionally NOT cleared here because async operations + * (meta stream handler, write queue) may still reference it during shutdown. + * The local store is properly cleaned up via detach(). */ reset(): void { this._remotes = new KeyedResolvOnce(); this._keyedAttachable = new KeyedResolvOnce(); - this._local = undefined; + // Don't clear _local - async operations may still need it during shutdown } } diff --git a/core/blockstore/loader.ts b/core/blockstore/loader.ts index a18a97407..78f39c53c 100644 --- a/core/blockstore/loader.ts +++ b/core/blockstore/loader.ts @@ -355,7 +355,7 @@ export class Loader implements Loadable { ); const local = this.attachedStores.local(); // console.log("ready", this.id); - this.metaStreamReader = local.active.meta.stream().getReader(); + this.metaStreamReader = local.active.meta.stream().getReader(); // console.log("attach-local", local.active.car.url().pathname); await this.waitFirstMeta(this.metaStreamReader, local, { meta: this.ebOpts.meta, origin: local.active.car.url() }); }); @@ -455,14 +455,17 @@ export class Loader implements Loadable { this.blockstoreParent = undefined; } + /** + * Destroys all local stores. Safe to call after close(). + */ async destroy() { // console.log("destroy", this.attachedStores.local().baseStores().map((store) => store.url().toString())); - await Promise.all( - this.attachedStores - .local() - .baseStores() - .map((store) => store.destroy()), - ); + const local = this.attachedStores.localOrUndefined?.(); + if (!local) { + // Already reset by close(), nothing to destroy + return; + } + await Promise.all(local.baseStores().map((store) => store.destroy())); } readonly id: string; diff --git a/core/runtime/task-manager.ts b/core/runtime/task-manager.ts index 10d64a79f..e66cc79ad 100644 --- a/core/runtime/task-manager.ts +++ b/core/runtime/task-manager.ts @@ -66,10 +66,7 @@ export class TaskManager implements TaskManagerIf { this.logger.Warn().Err(err).Msg("retry to process event block"); } finally { this.isProcessing = false; - if (this.isClosed) { - return; - } - if (this.queue.length > 0) { + if (!this.isClosed && this.queue.length > 0) { void this.processQueue(); } } diff --git a/core/tests/blockstore/standalone.test.ts b/core/tests/blockstore/standalone.test.ts index c03893db0..fd723acb4 100644 --- a/core/tests/blockstore/standalone.test.ts +++ b/core/tests/blockstore/standalone.test.ts @@ -4,7 +4,6 @@ import pLimit from "@fireproof/vendor/p-limit"; import { ensureSuperThis, sleep } from "@fireproof/core-runtime"; import { CRDT, PARAM, LedgerOpts } from "@fireproof/core-types-base"; import { describe, it, vi, expect, beforeEach, afterEach } from "vitest"; -import { Loader } from "@fireproof/core-blockstore"; import { CRDTImpl, fireproof } from "@fireproof/core-base"; describe("standalone", () => { @@ -44,12 +43,14 @@ describe("standalone", () => { }, }); const fn = vi.fn(); - const loader = db.ledger.crdt.blockstore.loader as Loader; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const loader = db.ledger.crdt.blockstore.loader as any; loader.cidCache.onSet(fn); expect(fn).toHaveBeenCalledTimes(0); await db.ready(); expect( - loader.cidCache.values().map((i) => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + loader.cidCache.values().map((i: any) => { const v = i.value.Ok(); return { type: v.item.type, diff --git a/core/types/blockstore/types.ts b/core/types/blockstore/types.ts index af656ab55..ff975ef9b 100644 --- a/core/types/blockstore/types.ts +++ b/core/types/blockstore/types.ts @@ -554,6 +554,11 @@ export interface AttachedStores { // metaStore(): Promise; local(): LocalActiveStore; + /** + * Returns the local store if set, or undefined if already reset. + * Use this for safe access during teardown. + */ + localOrUndefined(): LocalActiveStore | undefined; forRemotes(actionFn: (store: ActiveStore) => Promise): Promise; remotes(): ActiveStore[]; activate(store: DataAndMetaStore | CoerceURI): ActiveStore;