Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@clickup/ent-framework",
"description": "A PostgreSQL graph-database-alike library with microsharding and row-level security",
"version": "2.13.7",
"version": "2.13.8",
"license": "MIT",
"keywords": [
"postgresql",
Expand Down
60 changes: 49 additions & 11 deletions src/abstract/Cluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
objectHash,
maybeCall,
jsonHash,
jitter,
} from "../internal/misc";
import { Registry } from "../internal/Registry";
import type { Client } from "./Client";
Expand Down Expand Up @@ -45,6 +46,8 @@ export interface ClusterOptions<TClient extends Client, TNode> {
localCache?: LocalCache | null;
/** How often to run Shards rediscovery in normal circumstances. */
shardsDiscoverIntervalMs?: MaybeCallable<number>;
/** Jitter for shardsDiscoverIntervalMs. */
shardsDiscoverIntervalJitter?: MaybeCallable<number>;
/** How often to recheck for changes in options.islands (typically, often,
* since it's assumed that options.islands calculation is cheap). If the
* Cluster configuration is changed, then we trigger rediscovery ASAP. */
Expand Down Expand Up @@ -97,6 +100,7 @@ export class Cluster<TClient extends Client, TNode = DesperateAny> {
> = {
localCache: null,
shardsDiscoverIntervalMs: 10000,
shardsDiscoverIntervalJitter: 0.2,
shardsDiscoverRecheckIslandsIntervalMs: 500,
locateIslandErrorRetryCount: 2,
locateIslandErrorRediscoverClusterDelayMs: 1000,
Expand Down Expand Up @@ -183,7 +187,11 @@ export class Cluster<TClient extends Client, TNode = DesperateAny> {
this.shardNoByID = client.shardNoByID.bind(client);

this.shardsDiscoverCache = new CachedRefreshedValue({
delayMs: () => maybeCall(this.options.shardsDiscoverIntervalMs),
delayMs: () =>
Math.round(
maybeCall(this.options.shardsDiscoverIntervalMs) *
jitter(maybeCall(this.options.shardsDiscoverIntervalJitter)),
),
warningTimeoutMs: () => maybeCall(this.options.shardsDiscoverIntervalMs),
deps: {
delayMs: () =>
Expand All @@ -206,16 +214,38 @@ export class Cluster<TClient extends Client, TNode = DesperateAny> {
* Signals the Cluster to keep the Clients pre-warmed, e.g. open. (It's up to
* the particular Client's implementation, what does a "pre-warmed Client"
* mean; typically, it's keeping some minimal number of pooled connections.)
*
* Except when `randomizedDelayMs` is passed as 0, the actual prewarm (and
* Islands discovery) queries will run with a randomized delay between N/2 and
* N ms. It is better to operate in such mode: if multiple Node processes
* start simultaneously in the cluster, then the randomization helps to avoid
* new connections burst (new connections establishment is expensive for e.g.
* pgbouncer or when DB is accessed over SSL).
*/
prewarm(): void {
prewarm(
randomizedDelayMs: number = 5000,
onInitialPrewarm?: (delayMs: number) => void,
): void {
if (this.prewarmEnabled) {
return;
}

this.prewarmEnabled = true;
runInVoid(async () => {
for (const island of await this.islands()) {
for (const client of island.clients) {
client.prewarm();
}
}
});
const initialDelayMs = randomizedDelayMs
? Math.round(random(randomizedDelayMs / 2, randomizedDelayMs))
: 0;
setTimeout(
() =>
runInVoid(async () => {
onInitialPrewarm?.(initialDelayMs);
for (const island of await this.islands()) {
for (const client of island.clients) {
client.prewarm();
}
}
}),
initialDelayMs,
);
}

/**
Expand Down Expand Up @@ -402,7 +432,11 @@ export class Cluster<TClient extends Client, TNode = DesperateAny> {
const startTime = performance.now();
await pTimeout(
this.shardsDiscoverCache.waitRefresh(),
maybeCall(this.options.shardsDiscoverIntervalMs) * 2,
Math.round(
maybeCall(this.options.shardsDiscoverIntervalMs) *
jitter(maybeCall(this.options.shardsDiscoverIntervalJitter)) *
2,
),
"Timed out while waiting for whole-Cluster Shards discovery.",
).catch((error) =>
this.options.loggers.swallowedErrorLogger({
Expand Down Expand Up @@ -433,7 +467,11 @@ export class Cluster<TClient extends Client, TNode = DesperateAny> {
const startTime = performance.now();
await pTimeout(
island.rediscover(),
maybeCall(this.options.shardsDiscoverIntervalMs) * 2,
Math.round(
maybeCall(this.options.shardsDiscoverIntervalMs) *
jitter(maybeCall(this.options.shardsDiscoverIntervalJitter)) *
2,
),
`Timed out while waiting for Island ${island.no} Shards discovery.`,
).catch((error) =>
this.options.loggers.swallowedErrorLogger({
Expand Down
11 changes: 8 additions & 3 deletions src/abstract/LocalCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ export class LocalCache<TValue extends {} = never> {
this.options = defaults({}, options, LocalCache.DEFAULT_OPTIONS);
this.cleanupTimeout = setTimeout(
() => this.onCleanupTimer(),
this.options.cleanupFirstRunDelayMs * jitter(this.options.cleanupJitter),
Math.round(
this.options.cleanupFirstRunDelayMs *
jitter(this.options.cleanupJitter),
),
);
}

Expand Down Expand Up @@ -170,8 +173,10 @@ export class LocalCache<TValue extends {} = never> {
);
this.cleanupTimeout = setTimeout(
() => this.onCleanupTimer(),
(this.options.expirationMs / this.options.cleanupRoundsPerExpiration) *
jitter(this.options.cleanupJitter),
Math.round(
(this.options.expirationMs / this.options.cleanupRoundsPerExpiration) *
jitter(this.options.cleanupJitter),
),
);
}

Expand Down
4 changes: 2 additions & 2 deletions src/ent/Inverse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ export class Inverse<TClient extends Client, TTable extends Table> {
}),
);
if (row) {
await this.run(vc, this.shard(id1), this.inverseSchema.delete(row.id));
await this.run(vc, this.shard(id1), this.inverseSchema.delete(row[ID]));
}
}

Expand All @@ -143,7 +143,7 @@ export class Inverse<TClient extends Client, TTable extends Table> {
* Creates an Inverse schema which derives its id field's autoInsert from the
* passed id2 schema. The returned schema is heavily cached, so batching for
* it works efficiently even for different id2 schemas and different Inverse
* types (actually, it would work the same way even without @Memoize since
* types (actually, it would work the same way even without `@Memoize` since
* Runner batches by schema hash, not by schema object instance, but anyways).
*/
@Memoize(
Expand Down
10 changes: 8 additions & 2 deletions src/ent/QueryCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { VCWithQueryCache } from "./VCFlavor";
const OPS = [
"loadNullable",
"loadByNullable",
"selectBy",
"select",
"count",
"exists",
Expand Down Expand Up @@ -86,9 +87,14 @@ export class QueryCache {
}

/**
* Deletes cache slots or keys for an Ent.
* Deletes cache slots or keys for an Ent. If key is null, skips the deletion.
* If key is undefined (i.e. not passed), then deletes all slots.
*/
delete(EntClass: AnyClass, ops: Op[], key?: string): this {
delete(EntClass: AnyClass, ops: readonly Op[], key?: string | null): this {
if (key === null) {
return this;
}

const byOp = this.byEntClass?.get(EntClass);
if (!byOp) {
return this;
Expand Down
39 changes: 28 additions & 11 deletions src/ent/VCTrace.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
* part of the trace value.
*/
const RANDOM_BITS = 10;

const RANDOM_BITS_MASK = Math.pow(2, RANDOM_BITS) - 1;

/**
Expand All @@ -16,16 +15,34 @@ export class VCTrace {
readonly trace: string;

constructor(trace?: string) {
this.trace = trace ?? createRandomTrace();
this.trace = trace ?? this.createRandomTrace();
}
}

/**
* Returns a stringified uint63 (0 - 9223372036854775807).
*/
function createRandomTrace(): string {
return (
(BigInt(Date.now()) << BigInt(RANDOM_BITS)) |
BigInt(Math.trunc(Math.random() * RANDOM_BITS_MASK) & RANDOM_BITS_MASK)
).toString();
/**
* In case the trace was created by this tool, tries to extract the date of
* its creation. As a sanity check, verifies that this date is not too far
* away from the present time.
*/
tryExtractCreationDate(): Date | null {
try {
const ts = BigInt(this.trace) >> BigInt(RANDOM_BITS);
const minTs = Date.now() - 1000 * 3600 * 24 * 365;
const maxTs = Date.now() + 1000 * 3600 * 24;
return BigInt(minTs) < ts && ts < BigInt(maxTs)
? new Date(Number(ts))
: null;
} catch {
return null;
}
}

/**
* Returns a stringified uint63 (0 - 9223372036854775807).
*/
private createRandomTrace(): string {
return (
(BigInt(Date.now()) << BigInt(RANDOM_BITS)) |
BigInt(Math.trunc(Math.random() * RANDOM_BITS_MASK) & RANDOM_BITS_MASK)
).toString();
}
}
4 changes: 2 additions & 2 deletions src/ent/__tests__/Ent.composite-pk.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { Require } from "../rules/Require";
import { GLOBAL_SHARD } from "../ShardAffinity";
import { createVC } from "./test-utils";

export class EntTestUser extends BaseEnt(
class EntTestUser extends BaseEnt(
testCluster,
new PgSchema(
'ent.composite-pk"test_user',
Expand Down Expand Up @@ -39,7 +39,7 @@ export class EntTestUser extends BaseEnt(
}
}

export class EntTestComposite extends BaseEnt(
class EntTestComposite extends BaseEnt(
testCluster,
new PgSchema(
'ent.composite-pk"test_composite',
Expand Down
10 changes: 5 additions & 5 deletions src/ent/__tests__/Ent.generic.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { createVC, expectToMatchSnapshot } from "./test-utils";
/**
* Company
*/
export class EntTestCompany extends BaseEnt(
class EntTestCompany extends BaseEnt(
testCluster,
new PgSchema(
'ent.generic"company',
Expand Down Expand Up @@ -58,7 +58,7 @@ export class EntTestCompany extends BaseEnt(
/**
* User -> Company
*/
export class EntTestUser extends BaseEnt(
class EntTestUser extends BaseEnt(
testCluster,
new PgSchema(
'ent.generic"user',
Expand Down Expand Up @@ -110,7 +110,7 @@ export class EntTestUser extends BaseEnt(
/**
* Post -> User -> Company
*/
export class EntTestPost extends BaseEnt(
class EntTestPost extends BaseEnt(
testCluster,
new PgSchema(
'ent.generic"post',
Expand Down Expand Up @@ -165,7 +165,7 @@ export class EntTestPost extends BaseEnt(
/**
* Comment -> Post -> User -> Company
*/
export class EntTestComment extends BaseEnt(
class EntTestComment extends BaseEnt(
testCluster,
new PgSchema(
'ent.generic"comment',
Expand Down Expand Up @@ -210,7 +210,7 @@ export class EntTestComment extends BaseEnt(
/**
* Like -> Post -> User -> Company
*/
export class EntTestLike extends BaseEnt(
class EntTestLike extends BaseEnt(
testCluster,
new PgSchema(
'ent.generic"like',
Expand Down
2 changes: 1 addition & 1 deletion src/ent/__tests__/QueryCache.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { AllowIf } from "../rules/AllowIf";
import { GLOBAL_SHARD } from "../ShardAffinity";
import { createVC } from "./test-utils";

export class EntTestCompany extends BaseEnt(
class EntTestCompany extends BaseEnt(
testCluster,
new PgSchema(
'query-cache"company',
Expand Down
6 changes: 3 additions & 3 deletions src/ent/__tests__/Triggers.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const $EPHEMERAL2 = Symbol("$EPHEMERAL2");
/**
* User
*/
export class EntTestUser extends BaseEnt(
class EntTestUser extends BaseEnt(
testCluster,
new PgSchema(
'ent.triggers"user',
Expand Down Expand Up @@ -65,7 +65,7 @@ export class EntTestUser extends BaseEnt(
/**
* Headline -> User -> Company
*/
export class EntTestHeadline extends BaseEnt(
class EntTestHeadline extends BaseEnt(
testCluster,
new PgSchema(
'ent.triggers"headline',
Expand Down Expand Up @@ -269,7 +269,7 @@ export class EntTestHeadline extends BaseEnt(
/**
* Country
*/
export class EntTestCountry extends BaseEnt(
class EntTestCountry extends BaseEnt(
testCluster,
new PgSchema(
'ent.triggers"country',
Expand Down
Loading
Loading