Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions content/src/components.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ export async function initComponentsWithEnv(env: Environment): Promise<AppCompon
const deployedEntitiesBloomFilter = createDeployedEntitiesBloomFilter({ database, logs, clock })
const activeEntities = createActiveEntitiesComponent({ database, env, logs, metrics, denylist, sequentialExecutor })

await activeEntities.initialize(database)

const deployer = createDeployer({
metrics,
storage,
Expand Down
17 changes: 17 additions & 0 deletions content/src/controller/handlers/active-entities-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,20 @@ export async function getActiveEntitiesHandler(
body: entities
}
}

// Method: GET
export async function getActiveEntitiesScenesHandler(
context: HandlerContextWithPath<'activeEntities' | 'denylist', '/entities/active/scenes'>
): Promise<{ status: 200; body: Pick<Entity, 'id' | 'pointers' | 'timestamp'>[] }> {
const { activeEntities, denylist } = context.components
const entities: Entity[] = activeEntities.getAllCachedScenes().filter((result) => !denylist.isDenylisted(result.id))
const mapping = entities.map((entity) => ({
id: entity.id,
pointers: entity.pointers,
timestamp: entity.timestamp
}))
return {
status: 200,
body: mapping
}
}
21 changes: 11 additions & 10 deletions content/src/controller/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,24 @@ import { Router } from '@well-known-components/http-server'
import { multipartParserWrapper } from '@well-known-components/multipart-wrapper'
import { EnvironmentConfig } from '../Environment'
import { GlobalContext } from '../types'
import { getActiveEntitiesHandler } from './handlers/active-entities-handler'
import { getActiveEntitiesHandler, getActiveEntitiesScenesHandler } from './handlers/active-entities-handler'
import { createEntity } from './handlers/create-entity-handler'
import { createErrorHandler, preventExecutionIfBoostrapping } from './middlewares'
import { getFailedDeploymentsHandler } from './handlers/failed-deployments-handler'
import { getEntitiesByPointerPrefixHandler } from './handlers/filter-by-urn-handler'
import { getActiveEntityIdsByDeploymentHashHandler } from './handlers/get-active-entities-by-deployment-hash-handler'
import { getEntityAuditInformationHandler } from './handlers/get-audit-handler'
import { getAvailableContentHandler } from './handlers/get-available-content-handler'
import { getPointerChangesHandler } from './handlers/pointer-changes-handler'
import { getStatusHandler } from './handlers/status-handler'
import { getSnapshotsHandler } from './handlers/get-snapshots-handler'
import { getEntitiesHandler } from './handlers/get-entities-handler'
import { getChallengeHandler } from './handlers/get-challenge-handler'
import { getContentHandler } from './handlers/get-content-handler'
import { getEntityThumbnailHandler } from './handlers/get-entity-thumbnail-handler'
import { getDeploymentsHandler } from './handlers/get-deployments-handler'
import { getEntitiesHandler } from './handlers/get-entities-handler'
import { getEntityImageHandler } from './handlers/get-entity-image-handler'
import { getEntityThumbnailHandler } from './handlers/get-entity-thumbnail-handler'
import { getERC721EntityHandler } from './handlers/get-erc721-entity-handler'
import { getDeploymentsHandler } from './handlers/get-deployments-handler'
import { getChallengeHandler } from './handlers/get-challenge-handler'
import { getActiveEntityIdsByDeploymentHashHandler } from './handlers/get-active-entities-by-deployment-hash-handler'
import { getSnapshotsHandler } from './handlers/get-snapshots-handler'
import { getPointerChangesHandler } from './handlers/pointer-changes-handler'
import { getStatusHandler } from './handlers/status-handler'
import { createErrorHandler, preventExecutionIfBoostrapping } from './middlewares'

// We return the entire router because it will be easier to test than a whole server
export async function setupRouter({ components }: GlobalContext): Promise<Router<GlobalContext>> {
Expand All @@ -42,6 +42,7 @@ export async function setupRouter({ components }: GlobalContext): Promise<Router
router.get('/entities/:type', getEntitiesHandler) // TODO: Deprecate
router.get('/entities/active/collections/:collectionUrn', getEntitiesByPointerPrefixHandler)
router.post('/entities/active', getActiveEntitiesHandler)
router.get('/entities/active/scenes', getActiveEntitiesScenesHandler)
router.head('/contents/:hashId', getContentHandler)
router.get('/contents/:hashId', getContentHandler)
router.get('/available-content', getAvailableContentHandler)
Expand Down
44 changes: 29 additions & 15 deletions content/src/logic/deployments.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,13 +224,16 @@ export async function getDeployments(
export async function getDeploymentsForActiveEntities(
database: DatabaseClient,
entityIds?: string[],
pointers?: string[]
pointers?: string[],
entityType?: EntityType
): Promise<Deployment[]> {
// Generate the select according the info needed
const bothPresent = entityIds && entityIds.length > 0 && pointers && pointers.length > 0
const nonePresent = !entityIds && !pointers
if (bothPresent || nonePresent) {
throw Error('in getDeploymentsForActiveEntities ids or pointers must be present, but not both')
// Validate that only one parameter is provided
const providedParams = [entityIds && entityIds.length > 0, pointers && pointers.length > 0, entityType].filter(
Boolean
)

if (providedParams.length !== 1) {
throw Error('getDeploymentsForActiveEntities requires exactly one of: entityIds, pointers, or entityType')
}

const query: SQLStatement = SQL`
Expand All @@ -248,15 +251,23 @@ export async function getDeploymentsForActiveEntities(
FROM deployments AS dep1
WHERE dep1.deleter_deployment IS NULL
AND `.append(
entityIds
entityIds && entityIds.length > 0
? SQL`dep1.entity_id = ANY (${entityIds})`
: SQL`dep1.entity_pointers && ${pointers!.map((p) => p.toLowerCase())}`
: pointers && pointers.length > 0
? SQL`dep1.entity_pointers && ${pointers.map((p) => p.toLowerCase())}`
: SQL`dep1.entity_type = ${entityType}`
)

const historicalDeploymentsResponse = await database.queryWithValues(query, 'get_active_entities')
const BATCH_SIZE = 1000
const deploymentsResult: HistoricalDeployment[] = []
const cursor = await database.streamQuery<HistoricalDeploymentsRow>(
query,
{ batchSize: BATCH_SIZE },
'get_active_entities'
)

const deploymentsResult: HistoricalDeployment[] = historicalDeploymentsResponse.rows.map(
(row: HistoricalDeploymentsRow): HistoricalDeployment => ({
for await (const row of cursor) {
deploymentsResult.push({
deploymentId: row.id,
entityType: row.entity_type,
entityId: row.entity_id,
Expand All @@ -269,12 +280,15 @@ export async function getDeploymentsForActiveEntities(
localTimestamp: row.local_timestamp,
overwrittenBy: row.overwritten_by ?? undefined
})
)

const deploymentIds = deploymentsResult.map(({ deploymentId }) => deploymentId)
}

const content = await getContentFiles(database, deploymentIds)
// Batch fetch all content files at once
const content = await getContentFiles(
database,
deploymentsResult.map((d) => d.deploymentId)
)

// Map results to final format
return deploymentsResult.map((result) => ({
entityVersion: result.version as EntityVersion,
entityType: result.entityType as EntityType,
Expand Down
112 changes: 98 additions & 14 deletions content/src/ports/activeEntities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ export type ActiveEntities = IBaseComponent & {
* Note: only used in stale profiles GC
*/
clearPointers(pointers: string[]): Promise<void>

/**
* Initialize the cache with the active entities for the cached entity types
*/
initialize(database: DatabaseClient): Promise<void>

/**
* Get all cached scenes
*/
getAllCachedScenes(): Entity[]
}

/**
Expand All @@ -81,9 +91,6 @@ export function createActiveEntitiesComponent(
components: Pick<AppComponents, 'database' | 'env' | 'logs' | 'metrics' | 'denylist' | 'sequentialExecutor'>
): ActiveEntities {
const logger = components.logs.getLogger('ActiveEntities')
const cache = new LRU<string, Entity | NotActiveEntity>({
max: components.env.getConfig(EnvironmentConfig.ENTITIES_CACHE_SIZE)
})

const collectionUrnsByPrefixCache = new LRU<string, string[]>({
ttl: 1000 * 60 * 60 * 24, // 24 hours
Expand All @@ -94,6 +101,47 @@ export function createActiveEntitiesComponent(

const normalizePointerCacheKey = (pointer: string) => pointer.toLowerCase()

const cache = new LRU<string, Entity | NotActiveEntity>({
max: components.env.getConfig(EnvironmentConfig.ENTITIES_CACHE_SIZE)
})
const fixedCache = new Map<string, Entity | NotActiveEntity>()

const createLRUandFixedCache = (maxLRU: number, fixedTypes: EntityType[]) => {
return {
get(entityId: string) {
return cache.get(entityId) || fixedCache.get(entityId)
},
set(entityId: string, entity: Entity | NotActiveEntity) {
const isFixed = fixedCache.has(entityId) || (typeof entity !== 'string' && fixedTypes.includes(entity.type))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ain't the second part of the condition enough to use the fixedCache? (I mean if entity type is SCENE we should already store it regardless if it is already stored there, right?)

if (isFixed) {
return fixedCache.set(entityId, entity)
}
return cache.set(entityId, entity)
},
setFixed(entityId: string, entity: Entity | NotActiveEntity) {
return fixedCache.set(entityId, entity)
},
get max() {
return cache.max
},
clear() {
cache.clear()
fixedCache.clear()
},
has(entityId: string) {
return cache.has(entityId) || fixedCache.has(entityId)
}
}
}

const cachedEntityntityTypes = [EntityType.SCENE]

// Entities cache by key=entityId
const entityCache = createLRUandFixedCache(
components.env.getConfig(EnvironmentConfig.ENTITIES_CACHE_SIZE),
cachedEntityntityTypes
)

const createEntityByPointersCache = (): Map<string, string | NotActiveEntity> => {
const entityIdByPointers = new Map<string, string | NotActiveEntity>()
return {
Expand All @@ -116,7 +164,7 @@ export function createActiveEntitiesComponent(
const entityIdByPointers = createEntityByPointersCache()

// init gauge metrics
components.metrics.observe('dcl_entities_cache_storage_max_size', {}, cache.max)
components.metrics.observe('dcl_entities_cache_storage_max_size', {}, entityCache.max)
Object.values(EntityType).forEach((entityType) => {
components.metrics.observe('dcl_entities_cache_storage_size', { entity_type: entityType }, 0)
})
Expand All @@ -133,9 +181,9 @@ export function createActiveEntitiesComponent(
// pointer now have a different active entity, let's update the old one
const entityId = entityIdByPointers.get(pointer)
if (isPointingToEntity(entityId)) {
const entity = cache.get(entityId) // it should be present
const entity = entityCache.get(entityId) // it should be present
if (isEntityPresent(entity)) {
cache.set(entityId, 'NOT_ACTIVE_ENTITY')
entityCache.set(entityId, 'NOT_ACTIVE_ENTITY')
for (const pointer of entity.pointers) {
entityIdByPointers.set(pointer, 'NOT_ACTIVE_ENTITY')
}
Expand All @@ -158,7 +206,7 @@ export function createActiveEntitiesComponent(
entityIdByPointers.set(pointer, isEntityPresent(entity) ? entity.id : entity)
}
if (isEntityPresent(entity)) {
cache.set(entity.id, entity)
entityCache.set(entity.id, entity)
components.metrics.increment('dcl_entities_cache_storage_size', { entity_type: entity.type })
// Store in the db the new entity pointed by pointers
await updateActiveDeployments(database, pointers, entity.id)
Expand Down Expand Up @@ -196,7 +244,7 @@ export function createActiveEntitiesComponent(
)

for (const entityId of entityIdsWithoutActiveEntity) {
cache.set(entityId, 'NOT_ACTIVE_ENTITY')
entityCache.set(entityId, 'NOT_ACTIVE_ENTITY')
logger.debug('entityId has no active entity', { entityId })
}
}
Expand Down Expand Up @@ -227,6 +275,19 @@ export function createActiveEntitiesComponent(
return entities
}

async function populateEntityType(database: DatabaseClient, entityType: EntityType): Promise<void> {
const deployments = await getDeploymentsForActiveEntities(database, undefined, undefined, entityType)

logger.info('Populating cache for entity type', { entityType, deployments: deployments.length })

for (const deployment of deployments) {
reportCacheAccess(deployment.entityType, 'miss')
}

const entities = mapDeploymentsToEntities(deployments)
await updateCache(database, entities, {})
}

/**
* Retrieve active entities by their ids
*/
Expand All @@ -236,7 +297,7 @@ export function createActiveEntitiesComponent(
const onCache: (Entity | NotActiveEntity)[] = []
const remaining: string[] = []
for (const entityId of uniqueEntityIds) {
const entity = cache.get(entityId)
const entity = entityCache.get(entityId)
if (entity) {
onCache.push(entity)
if (isEntityPresent(entity)) {
Expand Down Expand Up @@ -278,6 +339,8 @@ export function createActiveEntitiesComponent(
}
}

logger.info('Retrieving entities by pointers', { pointers: remaining.length })

// once we get the ids, retrieve from cache or find
const entityIds = Array.from(uniqueEntityIds.values())
const entitiesById = await withIds(database, entityIds)
Expand Down Expand Up @@ -313,7 +376,7 @@ export function createActiveEntitiesComponent(
for (const pointer of pointers) {
if (entityIdByPointers.has(pointer)) {
const entityId = entityIdByPointers.get(pointer)!
cache.set(entityId, 'NOT_ACTIVE_ENTITY')
entityCache.set(entityId, 'NOT_ACTIVE_ENTITY')
entityIdByPointers.set(pointer, 'NOT_ACTIVE_ENTITY')
}
}
Expand All @@ -322,7 +385,27 @@ export function createActiveEntitiesComponent(
function reset() {
entityIdByPointers.clear()
collectionUrnsByPrefixCache.clear()
cache.clear()
entityCache.clear()
}

async function initialize(database: DatabaseClient) {
logger.info('Initializing active entities cache', {
entityTypes: cachedEntityntityTypes.map((t) => t.toString()).toString()
})
try {
for (const entityType of cachedEntityntityTypes) {
await populateEntityType(database, entityType)
}
logger.info('Active entities cache initialized')
} catch (error) {
logger.error('Error initializing active entities cache', { error })
}
}

function getAllCachedScenes() {
return Array.from(fixedCache.values()).filter(
(entity): entity is Entity => typeof entity !== 'string' && entity.type === EntityType.SCENE
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar with this check, but can we prevent typeof entity !== string on setting time instead of retrieve time?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The LRU cache was already with this workaround for unsetting pointers with no longer active entities. I haven't had the intention of modifying this current behavior

)
}

return {
Expand All @@ -333,10 +416,11 @@ export function createActiveEntitiesComponent(
update,
clear,
clearPointers,

initialize,
getAllCachedScenes,
getCachedEntity(idOrPointer) {
if (cache.has(idOrPointer)) {
const cachedEntity = cache.get(idOrPointer)
if (entityCache.has(idOrPointer)) {
const cachedEntity = entityCache.get(idOrPointer)
return isEntityPresent(cachedEntity) ? cachedEntity.id : cachedEntity
}
return entityIdByPointers.get(idOrPointer)
Expand Down
Loading