diff --git a/core/base/crdt.ts b/core/base/crdt.ts index 93ef20e2c..018078585 100644 --- a/core/base/crdt.ts +++ b/core/base/crdt.ts @@ -83,7 +83,7 @@ export class CRDTImpl implements CRDT { // self reference to fullfill HasCRDT readonly crdt: CRDT; - readonly ledgerParent?: Ledger; + ledgerParent?: Ledger; constructor(sthis: SuperThis, opts: CRDTOpts, parent?: Ledger) { this.sthis = sthis; @@ -236,6 +236,7 @@ export class CRDTImpl implements CRDT { this.indexBlockstore ? this.indexBlockstore.close() : Promise.resolve(), this.clock.close(), ]); + this.ledgerParent = undefined; } async destroy(): Promise { diff --git a/core/blockstore/attachable-store.ts b/core/blockstore/attachable-store.ts index 42363e8ab..ac6ba51fa 100644 --- a/core/blockstore/attachable-store.ts +++ b/core/blockstore/attachable-store.ts @@ -289,7 +289,7 @@ export async function createAttachedStores( } export class AttachedRemotesImpl implements AttachedStores { - private readonly _remotes = new KeyedResolvOnce(); + private _remotes = new KeyedResolvOnce(); readonly loadable: Loadable; // readonly attactedFileStore: DataStore; @@ -330,6 +330,17 @@ export class AttachedRemotesImpl implements AttachedStores { return new ActiveStoreImpl(this._local.stores as LocalDataAndMetaAndWalStore, this); } + /** + * Returns the local store if set, or undefined if already reset. + * Use this for safe access during teardown. + */ + localOrUndefined(): LocalActiveStore | undefined { + if (!this._local) { + return undefined; + } + return new ActiveStoreImpl(this._local.stores as LocalDataAndMetaAndWalStore, this); + } + activate(store: DataAndMetaStore | CoerceURI): ActiveStore { // console.log( // "activate", @@ -374,7 +385,7 @@ export class AttachedRemotesImpl implements AttachedStores { } // needed for React Statemanagement - readonly _keyedAttachable = new KeyedResolvOnce(); + private _keyedAttachable = new KeyedResolvOnce(); async attach(attachable: Attachable, onAttach: (at: Attached) => Promise): Promise { const keyed = attachable.configHash(this.loadable.blockstoreParent?.crdtParent?.ledgerParent); // console.log("attach-enter", keyed, this.loadable.blockstoreParent?.crdtParent?.ledgerParent?.name); @@ -465,4 +476,16 @@ export class AttachedRemotesImpl implements AttachedStores { }); return ret; } + + /** + * Reset remote attachments for teardown. + * Note: _local is intentionally NOT cleared here because async operations + * (meta stream handler, write queue) may still reference it during shutdown. + * The local store is properly cleaned up via detach(). + */ + reset(): void { + this._remotes = new KeyedResolvOnce(); + this._keyedAttachable = new KeyedResolvOnce(); + // Don't clear _local - async operations may still need it during shutdown + } } diff --git a/core/blockstore/loader.ts b/core/blockstore/loader.ts index 0f050f98c..78f39c53c 100644 --- a/core/blockstore/loader.ts +++ b/core/blockstore/loader.ts @@ -157,15 +157,15 @@ class CommitAction implements CommitParams { export class Loader implements Loadable { // readonly name: string; - readonly blockstoreParent?: BlockFetcher; + blockstoreParent?: BlockFetcher; readonly ebOpts: BlockstoreRuntime; readonly logger: Logger; readonly commitQueue: CommitQueueIf; isCompacting = false; - readonly cidCache: KeyedResolvOnce; + private cidCache: KeyedResolvOnce; private readonly maxConcurrentCarReader: ReturnType; private readonly maxConcurrentWrite = pLimit(1); - readonly seenCompacted: LRUSet; + private seenCompacted: LRUSet; // readonly processedCars: Set = new Set(); readonly sthis: SuperThis; readonly taskManager: TaskManager; @@ -299,6 +299,11 @@ export class Loader implements Loadable { // private getBlockCache = new Map(); private seenMeta: LRUSet; + private readonly cacheSizes: { + readonly carCacheSize: number; + readonly metaCacheSize: number; + readonly compactCacheSize: number; + }; keyBag(): Promise { return getKeyBag(this.sthis, this.ebOpts.keyBag); @@ -306,7 +311,36 @@ export class Loader implements Loadable { private readonly onceReady: ResolveOnce = new ResolveOnce(); - metaStreamReader!: ReadableStreamDefaultReader; + /** + * Create a new CID cache for block lookups. + */ + private buildCidCache() { + return new KeyedResolvOnce({ + lru: { + maxEntries: this.cacheSizes.carCacheSize, + }, + }); + } + + /** + * Create a new cache for seen metadata groups. + */ + private buildSeenMeta() { + return new LRUSet({ + maxEntries: this.cacheSizes.metaCacheSize, + }); + } + + /** + * Create a new cache for seen compaction events. + */ + private buildSeenCompacted() { + return new LRUSet({ + maxEntries: this.cacheSizes.compactCacheSize, + }); + } + + metaStreamReader?: ReadableStreamDefaultReader; async ready(): Promise { return this.onceReady.once(async () => { await createAttachedStores( @@ -407,19 +441,31 @@ export class Loader implements Loadable { await this.attachedStores.detach(); // console.log("close-3"); await this.metaStreamReader?.cancel("close"); + this.metaStreamReader = undefined; // console.log("close-4"); // const toClose = await Promise.all([this.carStore(), this.metaStore(), this.fileStore(), this.WALStore()]); // await Promise.all(toClose.map((store) => store.close())); + this.attachedStores.reset(); + await this.taskManager.close(); + this.currentMeta = []; + this.carLog.update([]); + this.cidCache = this.buildCidCache(); + this.seenMeta = this.buildSeenMeta(); + this.seenCompacted = this.buildSeenCompacted(); + this.blockstoreParent = undefined; } + /** + * Destroys all local stores. Safe to call after close(). + */ async destroy() { // console.log("destroy", this.attachedStores.local().baseStores().map((store) => store.url().toString())); - await Promise.all( - this.attachedStores - .local() - .baseStores() - .map((store) => store.destroy()), - ); + const local = this.attachedStores.localOrUndefined?.(); + if (!local) { + // Already reset by close(), nothing to destroy + return; + } + await Promise.all(local.baseStores().map((store) => store.destroy())); } readonly id: string; @@ -439,17 +485,14 @@ export class Loader implements Loadable { "Loader", ); this.logger = ensureLogger(sthis, "Loader"); - this.cidCache = new KeyedResolvOnce({ - lru: { - maxEntries: parseInt(this.ebOpts.storeUrls.car.getParam(PARAM.CAR_CACHE_SIZE, "1000000"), 10), - }, - }); - this.seenMeta = new LRUSet({ - maxEntries: parseInt(this.ebOpts.storeUrls.meta.getParam(PARAM.CAR_META_CACHE_SIZE, "1000"), 10), - }); - this.seenCompacted = new LRUSet({ - maxEntries: parseInt(this.ebOpts.storeUrls.car.getParam(PARAM.CAR_COMPACT_CACHE_SIZE, "1000"), 10), - }); + this.cacheSizes = { + carCacheSize: parseInt(this.ebOpts.storeUrls.car.getParam(PARAM.CAR_CACHE_SIZE, "1000000"), 10), + metaCacheSize: parseInt(this.ebOpts.storeUrls.meta.getParam(PARAM.CAR_META_CACHE_SIZE, "1000"), 10), + compactCacheSize: parseInt(this.ebOpts.storeUrls.car.getParam(PARAM.CAR_COMPACT_CACHE_SIZE, "1000"), 10), + }; + this.cidCache = this.buildCidCache(); + this.seenMeta = this.buildSeenMeta(); + this.seenCompacted = this.buildSeenCompacted(); this.maxConcurrentCarReader = pLimit(parseInt(this.ebOpts.storeUrls.car.getParam(PARAM.CAR_PARALLEL, "10"), 10)); // console.log("maxConcurrentCarReader", this.maxConcurrentCarReader.concurrency); diff --git a/core/blockstore/transaction.ts b/core/blockstore/transaction.ts index c7d238199..c05a36de2 100644 --- a/core/blockstore/transaction.ts +++ b/core/blockstore/transaction.ts @@ -122,7 +122,7 @@ export class BaseBlockstoreImpl implements BlockFetcher { readonly ebOpts: BlockstoreRuntime; readonly sthis: SuperThis; - readonly crdtParent?: CRDT; + crdtParent?: CRDT; readonly loader: Loadable; // readonly name?: string; @@ -133,7 +133,8 @@ export class BaseBlockstoreImpl implements BlockFetcher { } async close(): Promise { - // no-op + this.transactions.clear(); + this.crdtParent = undefined; } async destroy(): Promise { @@ -221,8 +222,9 @@ export class EncryptedBlockstore extends BaseBlockstoreImpl { return this.loader.ready(); } - close(): Promise { - return this.loader.close(); + async close(): Promise { + await this.loader.close(); + await super.close(); } destroy(): Promise { diff --git a/core/runtime/task-manager.ts b/core/runtime/task-manager.ts index 68f3b5d39..e66cc79ad 100644 --- a/core/runtime/task-manager.ts +++ b/core/runtime/task-manager.ts @@ -21,6 +21,7 @@ export class TaskManager implements TaskManagerIf { private queue: TaskItem[] = []; private isProcessing = false; + private isClosed = false; readonly logger: Logger; readonly params: TaskManagerParams; @@ -33,6 +34,7 @@ export class TaskManager implements TaskManagerIf { } async handleEvent(cid: CarClockLink, parents: CarClockHead, dbMeta: DbMeta, store: ActiveStore) { + if (this.isClosed) return; for (const parent of parents) { this.eventsWeHandled.add(parent.toString()); } @@ -43,6 +45,7 @@ export class TaskManager implements TaskManagerIf { private async processQueue() { if (this.isProcessing) return; + if (this.isClosed) return; this.isProcessing = true; const filteredQueue = this.queue.filter(({ cid }) => !this.eventsWeHandled.has(cid)); const first = filteredQueue[0]; @@ -63,9 +66,19 @@ export class TaskManager implements TaskManagerIf { this.logger.Warn().Err(err).Msg("retry to process event block"); } finally { this.isProcessing = false; - if (this.queue.length > 0) { + if (!this.isClosed && this.queue.length > 0) { void this.processQueue(); } } } + + /** + * Stop processing and clear queued tasks. + */ + async close(): Promise { + this.isClosed = true; + this.queue = []; + this.eventsWeHandled.clear(); + this.isProcessing = false; + } } diff --git a/core/tests/blockstore/standalone.test.ts b/core/tests/blockstore/standalone.test.ts index c03893db0..fd723acb4 100644 --- a/core/tests/blockstore/standalone.test.ts +++ b/core/tests/blockstore/standalone.test.ts @@ -4,7 +4,6 @@ import pLimit from "@fireproof/vendor/p-limit"; import { ensureSuperThis, sleep } from "@fireproof/core-runtime"; import { CRDT, PARAM, LedgerOpts } from "@fireproof/core-types-base"; import { describe, it, vi, expect, beforeEach, afterEach } from "vitest"; -import { Loader } from "@fireproof/core-blockstore"; import { CRDTImpl, fireproof } from "@fireproof/core-base"; describe("standalone", () => { @@ -44,12 +43,14 @@ describe("standalone", () => { }, }); const fn = vi.fn(); - const loader = db.ledger.crdt.blockstore.loader as Loader; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const loader = db.ledger.crdt.blockstore.loader as any; loader.cidCache.onSet(fn); expect(fn).toHaveBeenCalledTimes(0); await db.ready(); expect( - loader.cidCache.values().map((i) => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + loader.cidCache.values().map((i: any) => { const v = i.value.Ok(); return { type: v.item.type, diff --git a/core/types/base/types.ts b/core/types/base/types.ts index f7bf23e05..426d925dd 100644 --- a/core/types/base/types.ts +++ b/core/types/base/types.ts @@ -500,7 +500,7 @@ export interface BaseBlockstore { } export interface CRDT extends ReadyCloseDestroy, HasLogger, HasSuperThis, HasCRDT { - readonly ledgerParent?: Ledger; + ledgerParent?: Ledger; readonly logger: Logger; readonly sthis: SuperThis; // self reference to fullfill HasCRDT diff --git a/core/types/blockstore/types.ts b/core/types/blockstore/types.ts index caf3b3a02..ff975ef9b 100644 --- a/core/types/blockstore/types.ts +++ b/core/types/blockstore/types.ts @@ -554,11 +554,17 @@ export interface AttachedStores { // metaStore(): Promise; local(): LocalActiveStore; + /** + * Returns the local store if set, or undefined if already reset. + * Use this for safe access during teardown. + */ + localOrUndefined(): LocalActiveStore | undefined; forRemotes(actionFn: (store: ActiveStore) => Promise): Promise; remotes(): ActiveStore[]; activate(store: DataAndMetaStore | CoerceURI): ActiveStore; attach(attached: Attachable, onAttach: (at: Attached) => Promise): Promise; detach(): Promise; + reset(): void; } export interface BaseAttachedStores { @@ -669,7 +675,7 @@ export interface Loadable { // readonly name: string; // = ""; readonly sthis: SuperThis; readonly logger: Logger; - readonly blockstoreParent?: BlockFetcher; + blockstoreParent?: BlockFetcher; readonly ebOpts: BlockstoreRuntime; readonly carLog: CarLog;