Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/base/crdt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ export class CRDTImpl implements CRDT {
// self reference to fullfill HasCRDT
readonly crdt: CRDT;

readonly ledgerParent?: Ledger;
ledgerParent?: Ledger;

constructor(sthis: SuperThis, opts: CRDTOpts, parent?: Ledger) {
this.sthis = sthis;
Expand Down Expand Up @@ -236,6 +236,7 @@ export class CRDTImpl implements CRDT {
this.indexBlockstore ? this.indexBlockstore.close() : Promise.resolve(),
this.clock.close(),
]);
this.ledgerParent = undefined;
}

async destroy(): Promise<void> {
Expand Down
27 changes: 25 additions & 2 deletions core/blockstore/attachable-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ export async function createAttachedStores(
}

export class AttachedRemotesImpl implements AttachedStores {
private readonly _remotes = new KeyedResolvOnce<Attached>();
private _remotes = new KeyedResolvOnce<Attached>();

readonly loadable: Loadable;
// readonly attactedFileStore: DataStore;
Expand Down Expand Up @@ -330,6 +330,17 @@ export class AttachedRemotesImpl implements AttachedStores {
return new ActiveStoreImpl(this._local.stores as LocalDataAndMetaAndWalStore, this);
}

/**
* Returns the local store if set, or undefined if already reset.
* Use this for safe access during teardown.
*/
localOrUndefined(): LocalActiveStore | undefined {
if (!this._local) {
return undefined;
}
return new ActiveStoreImpl(this._local.stores as LocalDataAndMetaAndWalStore, this);
}

activate(store: DataAndMetaStore | CoerceURI): ActiveStore {
// console.log(
// "activate",
Expand Down Expand Up @@ -374,7 +385,7 @@ export class AttachedRemotesImpl implements AttachedStores {
}

// needed for React Statemanagement
readonly _keyedAttachable = new KeyedResolvOnce<Attached>();
private _keyedAttachable = new KeyedResolvOnce<Attached>();
async attach(attachable: Attachable, onAttach: (at: Attached) => Promise<Attached>): Promise<Attached> {
const keyed = attachable.configHash(this.loadable.blockstoreParent?.crdtParent?.ledgerParent);
// console.log("attach-enter", keyed, this.loadable.blockstoreParent?.crdtParent?.ledgerParent?.name);
Expand Down Expand Up @@ -465,4 +476,16 @@ export class AttachedRemotesImpl implements AttachedStores {
});
return ret;
}

/**
* Reset remote attachments for teardown.
* Note: _local is intentionally NOT cleared here because async operations
* (meta stream handler, write queue) may still reference it during shutdown.
* The local store is properly cleaned up via detach().
*/
reset(): void {
this._remotes = new KeyedResolvOnce<Attached>();
this._keyedAttachable = new KeyedResolvOnce<Attached>();
// Don't clear _local - async operations may still need it during shutdown
}
}
85 changes: 64 additions & 21 deletions core/blockstore/loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,15 +157,15 @@ class CommitAction implements CommitParams {

export class Loader implements Loadable {
// readonly name: string;
readonly blockstoreParent?: BlockFetcher;
blockstoreParent?: BlockFetcher;
readonly ebOpts: BlockstoreRuntime;
readonly logger: Logger;
readonly commitQueue: CommitQueueIf<CarGroup>;
isCompacting = false;
readonly cidCache: KeyedResolvOnce<FPBlock>;
private cidCache: KeyedResolvOnce<FPBlock>;
private readonly maxConcurrentCarReader: ReturnType<typeof pLimit>;
private readonly maxConcurrentWrite = pLimit(1);
readonly seenCompacted: LRUSet<string>;
private seenCompacted: LRUSet<string>;
// readonly processedCars: Set<string> = new Set<string>();
readonly sthis: SuperThis;
readonly taskManager: TaskManager;
Expand Down Expand Up @@ -299,14 +299,48 @@ export class Loader implements Loadable {

// private getBlockCache = new Map<string, AnyBlock>();
private seenMeta: LRUSet<string>;
private readonly cacheSizes: {
readonly carCacheSize: number;
readonly metaCacheSize: number;
readonly compactCacheSize: number;
};

keyBag(): Promise<KeyBagIf> {
return getKeyBag(this.sthis, this.ebOpts.keyBag);
}

private readonly onceReady: ResolveOnce<void> = new ResolveOnce<void>();

metaStreamReader!: ReadableStreamDefaultReader<DbMeta[]>;
/**
* Create a new CID cache for block lookups.
*/
private buildCidCache() {
return new KeyedResolvOnce<FPBlock>({
lru: {
maxEntries: this.cacheSizes.carCacheSize,
},
});
}

/**
* Create a new cache for seen metadata groups.
*/
private buildSeenMeta() {
return new LRUSet<string>({
maxEntries: this.cacheSizes.metaCacheSize,
});
}

/**
* Create a new cache for seen compaction events.
*/
private buildSeenCompacted() {
return new LRUSet<string>({
maxEntries: this.cacheSizes.compactCacheSize,
});
}

metaStreamReader?: ReadableStreamDefaultReader<DbMeta[]>;
async ready(): Promise<void> {
return this.onceReady.once(async () => {
await createAttachedStores(
Expand Down Expand Up @@ -407,19 +441,31 @@ export class Loader implements Loadable {
await this.attachedStores.detach();
// console.log("close-3");
await this.metaStreamReader?.cancel("close");
this.metaStreamReader = undefined;
// console.log("close-4");
// const toClose = await Promise.all([this.carStore(), this.metaStore(), this.fileStore(), this.WALStore()]);
// await Promise.all(toClose.map((store) => store.close()));
this.attachedStores.reset();
await this.taskManager.close();
this.currentMeta = [];
this.carLog.update([]);
this.cidCache = this.buildCidCache();
this.seenMeta = this.buildSeenMeta();
this.seenCompacted = this.buildSeenCompacted();
this.blockstoreParent = undefined;
}

/**
* Destroys all local stores. Safe to call after close().
*/
async destroy() {
// console.log("destroy", this.attachedStores.local().baseStores().map((store) => store.url().toString()));
await Promise.all(
this.attachedStores
.local()
.baseStores()
.map((store) => store.destroy()),
);
const local = this.attachedStores.localOrUndefined?.();
if (!local) {
// Already reset by close(), nothing to destroy
return;
}
await Promise.all(local.baseStores().map((store) => store.destroy()));
}

readonly id: string;
Expand All @@ -439,17 +485,14 @@ export class Loader implements Loadable {
"Loader",
);
this.logger = ensureLogger(sthis, "Loader");
this.cidCache = new KeyedResolvOnce({
lru: {
maxEntries: parseInt(this.ebOpts.storeUrls.car.getParam(PARAM.CAR_CACHE_SIZE, "1000000"), 10),
},
});
this.seenMeta = new LRUSet({
maxEntries: parseInt(this.ebOpts.storeUrls.meta.getParam(PARAM.CAR_META_CACHE_SIZE, "1000"), 10),
});
this.seenCompacted = new LRUSet({
maxEntries: parseInt(this.ebOpts.storeUrls.car.getParam(PARAM.CAR_COMPACT_CACHE_SIZE, "1000"), 10),
});
this.cacheSizes = {
carCacheSize: parseInt(this.ebOpts.storeUrls.car.getParam(PARAM.CAR_CACHE_SIZE, "1000000"), 10),
metaCacheSize: parseInt(this.ebOpts.storeUrls.meta.getParam(PARAM.CAR_META_CACHE_SIZE, "1000"), 10),
compactCacheSize: parseInt(this.ebOpts.storeUrls.car.getParam(PARAM.CAR_COMPACT_CACHE_SIZE, "1000"), 10),
};
this.cidCache = this.buildCidCache();
this.seenMeta = this.buildSeenMeta();
this.seenCompacted = this.buildSeenCompacted();
this.maxConcurrentCarReader = pLimit(parseInt(this.ebOpts.storeUrls.car.getParam(PARAM.CAR_PARALLEL, "10"), 10));
// console.log("maxConcurrentCarReader", this.maxConcurrentCarReader.concurrency);

Expand Down
10 changes: 6 additions & 4 deletions core/blockstore/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ export class BaseBlockstoreImpl implements BlockFetcher {
readonly ebOpts: BlockstoreRuntime;
readonly sthis: SuperThis;

readonly crdtParent?: CRDT;
crdtParent?: CRDT;

readonly loader: Loadable;
// readonly name?: string;
Expand All @@ -133,7 +133,8 @@ export class BaseBlockstoreImpl implements BlockFetcher {
}

async close(): Promise<void> {
// no-op
this.transactions.clear();
this.crdtParent = undefined;
}

async destroy(): Promise<void> {
Expand Down Expand Up @@ -221,8 +222,9 @@ export class EncryptedBlockstore extends BaseBlockstoreImpl {
return this.loader.ready();
}

close(): Promise<void> {
return this.loader.close();
async close(): Promise<void> {
await this.loader.close();
await super.close();
}

destroy(): Promise<void> {
Expand Down
15 changes: 14 additions & 1 deletion core/runtime/task-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export class TaskManager implements TaskManagerIf {

private queue: TaskItem[] = [];
private isProcessing = false;
private isClosed = false;

readonly logger: Logger;
readonly params: TaskManagerParams;
Expand All @@ -33,6 +34,7 @@ export class TaskManager implements TaskManagerIf {
}

async handleEvent(cid: CarClockLink, parents: CarClockHead, dbMeta: DbMeta, store: ActiveStore) {
if (this.isClosed) return;
for (const parent of parents) {
this.eventsWeHandled.add(parent.toString());
}
Expand All @@ -43,6 +45,7 @@ export class TaskManager implements TaskManagerIf {

private async processQueue() {
if (this.isProcessing) return;
if (this.isClosed) return;
this.isProcessing = true;
const filteredQueue = this.queue.filter(({ cid }) => !this.eventsWeHandled.has(cid));
const first = filteredQueue[0];
Expand All @@ -63,9 +66,19 @@ export class TaskManager implements TaskManagerIf {
this.logger.Warn().Err(err).Msg("retry to process event block");
} finally {
this.isProcessing = false;
if (this.queue.length > 0) {
if (!this.isClosed && this.queue.length > 0) {
void this.processQueue();
}
}
}

/**
* Stop processing and clear queued tasks.
*/
async close(): Promise<void> {
this.isClosed = true;
this.queue = [];
this.eventsWeHandled.clear();
this.isProcessing = false;
}
}
7 changes: 4 additions & 3 deletions core/tests/blockstore/standalone.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import pLimit from "@fireproof/vendor/p-limit";
import { ensureSuperThis, sleep } from "@fireproof/core-runtime";
import { CRDT, PARAM, LedgerOpts } from "@fireproof/core-types-base";
import { describe, it, vi, expect, beforeEach, afterEach } from "vitest";
import { Loader } from "@fireproof/core-blockstore";
import { CRDTImpl, fireproof } from "@fireproof/core-base";

describe("standalone", () => {
Expand Down Expand Up @@ -44,12 +43,14 @@ describe("standalone", () => {
},
});
const fn = vi.fn();
const loader = db.ledger.crdt.blockstore.loader as Loader;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const loader = db.ledger.crdt.blockstore.loader as any;
loader.cidCache.onSet(fn);
expect(fn).toHaveBeenCalledTimes(0);
await db.ready();
expect(
loader.cidCache.values().map((i) => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
loader.cidCache.values().map((i: any) => {
const v = i.value.Ok();
return {
type: v.item.type,
Expand Down
2 changes: 1 addition & 1 deletion core/types/base/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ export interface BaseBlockstore {
}

export interface CRDT extends ReadyCloseDestroy, HasLogger, HasSuperThis, HasCRDT {
readonly ledgerParent?: Ledger;
ledgerParent?: Ledger;
readonly logger: Logger;
readonly sthis: SuperThis;
// self reference to fullfill HasCRDT
Expand Down
8 changes: 7 additions & 1 deletion core/types/blockstore/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -554,11 +554,17 @@ export interface AttachedStores {
// metaStore(): Promise<MetaStore>;

local(): LocalActiveStore;
/**
* Returns the local store if set, or undefined if already reset.
* Use this for safe access during teardown.
*/
localOrUndefined(): LocalActiveStore | undefined;
forRemotes(actionFn: (store: ActiveStore) => Promise<unknown>): Promise<void>;
remotes(): ActiveStore[];
activate(store: DataAndMetaStore | CoerceURI): ActiveStore;
attach(attached: Attachable, onAttach: (at: Attached) => Promise<Attached>): Promise<Attached>;
detach(): Promise<void>;
reset(): void;
}

export interface BaseAttachedStores {
Expand Down Expand Up @@ -669,7 +675,7 @@ export interface Loadable {
// readonly name: string; // = "";
readonly sthis: SuperThis;
readonly logger: Logger;
readonly blockstoreParent?: BlockFetcher;
blockstoreParent?: BlockFetcher;
readonly ebOpts: BlockstoreRuntime;
readonly carLog: CarLog;

Expand Down