diff --git a/storage-node/.eslintrc.js b/storage-node/.eslintrc.js
index cc0fb2f316..872cb89882 100644
--- a/storage-node/.eslintrc.js
+++ b/storage-node/.eslintrc.js
@@ -11,5 +11,6 @@ module.exports = {
     'no-unused-vars': 'off', // Required by the typescript rule below
     '@typescript-eslint/no-unused-vars': ['error'],
     '@typescript-eslint/no-floating-promises': 'error',
+    'no-useless-constructor': 'off',
   },
 }
diff --git a/storage-node/package.json b/storage-node/package.json
index 4fae3846c1..9fe36e34ed 100644
--- a/storage-node/package.json
+++ b/storage-node/package.json
@@ -57,6 +57,7 @@
     "sleep-promise": "^9.1.0",
     "subscriptions-transport-ws": "^0.11.0",
     "superagent": "^6.1.0",
+    "superagent-node-http-timings": "1.0.1",
     "tslib": "^1",
     "url-join": "^4.0.1",
     "uuid": "^8.3.2",
diff --git a/storage-node/src/@types/superagent-node-http-timings/index.d.ts b/storage-node/src/@types/superagent-node-http-timings/index.d.ts
new file mode 100644
index 0000000000..2c279e3743
--- /dev/null
+++ b/storage-node/src/@types/superagent-node-http-timings/index.d.ts
@@ -0,0 +1 @@
+declare module 'superagent-node-http-timings'
diff --git a/storage-node/src/commands/server.ts b/storage-node/src/commands/server.ts
index f64c67bc1d..c7ca0aaf69 100644
--- a/storage-node/src/commands/server.ts
+++ b/storage-node/src/commands/server.ts
@@ -388,16 +388,7 @@ async function runSyncWithInterval(
   while (true) {
     try {
       logger.info(`Resume syncing....`)
-      await performSync(
-        api,
-        buckets,
-        syncWorkersNumber,
-        syncWorkersTimeout,
-        qnApi,
-        uploadsDirectory,
-        tempDirectory,
-        hostId
-      )
+      await performSync(buckets, syncWorkersNumber, syncWorkersTimeout, qnApi, uploadsDirectory, tempDirectory, hostId)
       logger.info(`Sync run complete. Next run in ${syncIntervalMinutes} minute(s).`)
       await sleep(sleepInterval)
     } catch (err) {
diff --git a/storage-node/src/commands/util/fetch-bucket.ts b/storage-node/src/commands/util/fetch-bucket.ts
index 3fb885510e..c46ce98f4c 100644
--- a/storage-node/src/commands/util/fetch-bucket.ts
+++ b/storage-node/src/commands/util/fetch-bucket.ts
@@ -70,7 +70,6 @@ export default class FetchBucket extends Command {
     try {
       await performSync(
-        undefined,
         [bucketId],
         flags.syncWorkersNumber,
         flags.syncWorkersTimeout,
diff --git a/storage-node/src/services/queryNode/queries/queries.graphql b/storage-node/src/services/queryNode/queries/queries.graphql
index 0bf24b24d9..4325ae16a0 100644
--- a/storage-node/src/services/queryNode/queries/queries.graphql
+++ b/storage-node/src/services/queryNode/queries/queries.graphql
@@ -96,6 +96,8 @@ query getBagConnection($bucketIds: [ID!], $limit: Int, $cursor: String) {

 fragment DataObjectByBagIdsDetails on StorageDataObject {
   id
+  size
+  ipfsHash
   storageBagId
 }
diff --git a/storage-node/src/services/sync/remoteStorageData.ts b/storage-node/src/services/sync/remoteStorageData.ts
deleted file mode 100644
index 42fd748746..0000000000
--- a/storage-node/src/services/sync/remoteStorageData.ts
+++ /dev/null
@@ -1,63 +0,0 @@
-import superagent from 'superagent'
-import urljoin from 'url-join'
-import logger from '../logger'
-import NodeCache from 'node-cache'
-
-// Expiration period in seconds for the local cache.
-const ExpirationPeriod: number = 3 * 60 // minutes
-
-// Max data entries in local cache
-const MaxEntries = 10000
-
-// Local in-memory cache for data object IDs by operator URL.
-const availableIDsCache = new NodeCache({
-  stdTTL: ExpirationPeriod,
-  deleteOnExpire: true,
-  maxKeys: MaxEntries,
-})
-
-// Local in-memory cache for faulty operator URL. Prevents fetching from the
-// offline storage nodes.
-const badOperatorUrls = new NodeCache({
-  stdTTL: ExpirationPeriod,
-  deleteOnExpire: true,
-  maxKeys: MaxEntries,
-})
-
-/**
- * Queries the remote storage node for its data object IDs from the storage.
- * It caches the result (including errors) for some limited time.
- *
- * @param operatorUrl - remote storage node URL
- */
-export async function getRemoteDataObjects(operatorUrl: string, hostId: string): Promise<string[]> {
-  const url = urljoin(operatorUrl, 'api/v1/state/data-objects')
-
-  const faultyOperator = badOperatorUrls.has(operatorUrl)
-  if (faultyOperator) {
-    logger.debug(`Sync - cached error for the ${url} skipping ....`)
-    return []
-  }
-
-  const cachedData = availableIDsCache.get<string[]>(url)
-  if (cachedData) {
-    logger.debug(`Sync - getting from cache available data for ${url}`)
-    return cachedData
-  }
-
-  try {
-    logger.debug(`Sync - fetching available data for ${url}`)
-    const timeoutMs = 120 * 1000 // 2 min
-    const response = await superagent.get(url).timeout(timeoutMs).set('X-COLOSSUS-HOST-ID', hostId)
-
-    const data = response.body
-    availableIDsCache.set(url, data, ExpirationPeriod)
-
-    return data
-  } catch (err) {
-    logger.error(`Sync - fetching data error from ${url}: ${err}`)
-    badOperatorUrls.set(operatorUrl, null, ExpirationPeriod)
-  }
-
-  return []
-}
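For context on what this deleted module provided: per-operator data-object listings, and fetch failures, were memoized with node-cache TTLs, so offline nodes were skipped until their "bad" marker expired. A minimal sketch of that negative-caching pattern, assuming node-cache's standard `has`/`set` API (the fetcher is a hypothetical stand-in):

```ts
import NodeCache from 'node-cache'

// Entries expire after `stdTTL` seconds, so a failing operator is
// retried automatically once its marker ages out.
const badOperatorUrls = new NodeCache({ stdTTL: 3 * 60, deleteOnExpire: true })

async function fetchWithNegativeCache(url: string, fetcher: (u: string) => Promise<string[]>): Promise<string[]> {
  if (badOperatorUrls.has(url)) {
    return [] // failure cached recently - skip this operator for now
  }
  try {
    return await fetcher(url)
  } catch {
    badOperatorUrls.set(url, null) // remember the failure for the TTL window
    return []
  }
}
```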
diff --git a/storage-node/src/services/sync/storageObligations.ts b/storage-node/src/services/sync/storageObligations.ts
index 05d495b4c0..d987be472d 100644
--- a/storage-node/src/services/sync/storageObligations.ts
+++ b/storage-node/src/services/sync/storageObligations.ts
@@ -76,6 +76,16 @@ type DataObject = {
   /**
    * Assigned bag ID
    */
   bagId: string
+
+  /**
+   * Data Object size (in bytes)
+   */
+  size: string
+
+  /**
+   * Data Object hash
+   */
+  ipfsHash: string
 }

 /**
@@ -109,6 +119,8 @@ export async function getStorageObligationsFromRuntime(
     })),
     dataObjects: assignedDataObjects.map((dataObject) => ({
       id: dataObject.id,
+      size: dataObject.size,
+      ipfsHash: dataObject.ipfsHash,
       bagId: dataObject.storageBagId,
     })),
   }
diff --git a/storage-node/src/services/sync/synchronizer.ts b/storage-node/src/services/sync/synchronizer.ts
index 0b32c7b170..335a243258 100644
--- a/storage-node/src/services/sync/synchronizer.ts
+++ b/storage-node/src/services/sync/synchronizer.ts
@@ -1,11 +1,10 @@
-import { ApiPromise } from '@polkadot/api'
 import _ from 'lodash'
 import { getDataObjectIDs } from '../../services/caching/localDataObjects'
 import logger from '../../services/logger'
 import { QueryNodeApi } from '../queryNode/api'
 import { DataObligations, getStorageObligationsFromRuntime } from './storageObligations'
-import { DownloadFileTask, PrepareDownloadFileTask, SyncTask } from './tasks'
-import { TaskProcessorSpawner, TaskSink, WorkingStack } from './workingProcess'
+import { DownloadFileTask } from './tasks'
+import { TaskProcessorSpawner, WorkingStack } from './workingProcess'

 /**
  * Temporary directory name for data uploading.
@@ -31,12 +30,11 @@ export const PendingDirName = 'pending'
  * @param qnApi - Query Node API
  * @param uploadDirectory - local directory to get file names from
  * @param tempDirectory - local directory for temporary data uploading
- * @param operatorUrl - (optional) defines the data source URL. If not set
+ * @param selectedOperatorUrl - (optional) defines the data source URL. If not set
  * the source URL is resolved for each data object separately using the Query
  * Node information about the storage providers.
  */
 export async function performSync(
-  api: ApiPromise | undefined,
   buckets: string[],
   asyncWorkersNumber: number,
   asyncWorkersTimeout: number,
@@ -44,37 +42,31 @@ export async function performSync(
   uploadDirectory: string,
   tempDirectory: string,
   hostId: string,
-  operatorUrl?: string
+  selectedOperatorUrl?: string
 ): Promise<void> {
   logger.info('Started syncing...')
   const [model, files] = await Promise.all([getStorageObligationsFromRuntime(qnApi, buckets), getDataObjectIDs()])

-  const requiredIds = model.dataObjects.map((obj) => obj.id)
+  const required = model.dataObjects

-  const added = _.difference(requiredIds, files)
-  const removed = _.difference(files, requiredIds)
+  const added = _.differenceWith(required, files, (required, file) => required.id === file)
+  const removed = _.differenceWith(files, required, (file, required) => file === required.id)

   logger.debug(`Sync - new objects: ${added.length}`)
   logger.debug(`Sync - obsolete objects: ${removed.length}`)

   const workingStack = new WorkingStack()

-  let addedTasks: SyncTask[]
-  if (operatorUrl === undefined) {
-    addedTasks = await getPrepareDownloadTasks(
-      api,
-      model,
-      buckets,
-      added,
-      uploadDirectory,
-      tempDirectory,
-      workingStack,
-      asyncWorkersTimeout,
-      hostId
-    )
-  } else {
-    addedTasks = await getDownloadTasks(operatorUrl, added, uploadDirectory, tempDirectory, asyncWorkersTimeout, hostId)
-  }
+  const addedTasks = await getDownloadTasks(
+    model,
+    buckets,
+    added,
+    uploadDirectory,
+    tempDirectory,
+    asyncWorkersTimeout,
+    hostId,
+    selectedOperatorUrl
+  )

   logger.debug(`Sync - started processing...`)
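The switch from `_.difference` to `_.differenceWith` is what lets `added` carry whole data objects (with `size` and `ipfsHash`) rather than bare IDs, while still matching against local file names by ID. A minimal sketch of the comparator semantics, with hypothetical values:

```ts
import _ from 'lodash'

// Obligations from the query node vs. object IDs found on local disk:
const required = [
  { id: '1', size: '100', ipfsHash: 'QmAaa' },
  { id: '2', size: '200', ipfsHash: 'QmBbb' },
]
const files = ['2', '3']

// Required on-chain but missing locally - download these (full objects, not bare IDs):
_.differenceWith(required, files, (req, file) => req.id === file) // [{ id: '1', ... }]

// Present locally but no longer required - candidates for cleanup:
_.differenceWith(files, required, (file, req) => file === req.id) // ['3']
```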
@@ -87,28 +79,28 @@
 }

 /**
- * Creates the download preparation tasks.
+ * Creates the download tasks.
  *
- * @param api - Runtime API promise
- * @param ownBuckets - list of bucket ids operated this node
  * @param dataObligations - defines the current data obligations for the node
- * @param addedIds - data object IDs to download
+ * @param ownBuckets - list of bucket ids operated by this node
+ * @param added - data objects to download
  * @param uploadDirectory - local directory for data uploading
  * @param tempDirectory - local directory for temporary data uploading
- * @param taskSink - a destination for the newly created tasks
  * @param asyncWorkersTimeout - downloading asset timeout
+ * @param hostId - Random host UUID assigned to each node during bootstrap
+ * @param selectedOperatorUrl - operator URL selected for syncing objects
  */
-async function getPrepareDownloadTasks(
-  api: ApiPromise | undefined,
+async function getDownloadTasks(
   dataObligations: DataObligations,
   ownBuckets: string[],
-  addedIds: string[],
+  added: DataObligations['dataObjects'],
   uploadDirectory: string,
   tempDirectory: string,
-  taskSink: TaskSink,
   asyncWorkersTimeout: number,
-  hostId: string
-): Promise<PrepareDownloadFileTask[]> {
+  hostId: string,
+  selectedOperatorUrl?: string
+): Promise<DownloadFileTask[]> {
   const bagIdByDataObjectId = new Map()
   for (const entry of dataObligations.dataObjects) {
     bagIdByDataObjectId.set(entry.id, entry.bagId)
@@ -148,53 +140,27 @@
     bagOperatorsUrlsById.set(entry.id, operatorUrls)
   }

-  const tasks = addedIds.map((id) => {
+  const tasks = added.map((dataObject) => {
     let operatorUrls: string[] = [] // can be empty after look up
     let bagId = null
-    if (bagIdByDataObjectId.has(id)) {
-      bagId = bagIdByDataObjectId.get(id)
+    if (bagIdByDataObjectId.has(dataObject.id)) {
+      bagId = bagIdByDataObjectId.get(dataObject.id)
       if (bagOperatorsUrlsById.has(bagId)) {
         operatorUrls = bagOperatorsUrlsById.get(bagId)
       }
     }

-    return new PrepareDownloadFileTask(
-      operatorUrls,
-      hostId,
-      bagId,
-      id,
+    return new DownloadFileTask(
+      selectedOperatorUrl ? [selectedOperatorUrl] : operatorUrls,
+      dataObject.id,
+      dataObject.size,
+      dataObject.ipfsHash,
       uploadDirectory,
       tempDirectory,
-      taskSink,
       asyncWorkersTimeout,
-      api
+      hostId
     )
   })

   return tasks
 }
-
-/**
- * Creates the download file tasks.
- *
- * @param operatorUrl - defines the data source URL.
- * @param addedIds - data object IDs to download
- * @param uploadDirectory - local directory for data uploading
- * @param tempDirectory - local directory for temporary data uploading
- * @param downloadTimeout - asset downloading timeout (in minutes)
- */
-async function getDownloadTasks(
-  operatorUrl: string,
-  addedIds: string[],
-  uploadDirectory: string,
-  tempDirectory: string,
-  downloadTimeout: number,
-  hostId: string
-): Promise<DownloadFileTask[]> {
-  const addedTasks = addedIds.map(
-    (fileName) =>
-      new DownloadFileTask(operatorUrl, fileName, undefined, uploadDirectory, tempDirectory, downloadTimeout, hostId)
-  )
-
-  return addedTasks
-}
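The new `added: DataObligations['dataObjects']` parameter uses a TypeScript indexed-access type, so the task builder reuses the obligations' element type instead of redeclaring it. A trimmed sketch (the real `DataObligations` interface has more fields):

```ts
type DataObligations = {
  dataObjects: { id: string; size: string; ipfsHash: string; bagId: string }[]
}

// Indexed-access type: the parameter cannot drift out of sync with the
// shape returned by getStorageObligationsFromRuntime.
function getDownloadTasksSketch(added: DataObligations['dataObjects']): void {
  for (const obj of added) {
    console.log(obj.id, obj.size, obj.ipfsHash)
  }
}
```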
diff --git a/storage-node/src/services/sync/tasks.ts b/storage-node/src/services/sync/tasks.ts
index f0b168014f..0bc069b4fc 100644
--- a/storage-node/src/services/sync/tasks.ts
+++ b/storage-node/src/services/sync/tasks.ts
@@ -1,10 +1,9 @@
-import { ApiPromise } from '@polkadot/api'
-import { hexToString } from '@polkadot/util'
 import fs from 'fs'
 import _ from 'lodash'
 import path from 'path'
 import { pipeline } from 'stream'
 import superagent from 'superagent'
+import superagentTimings from 'superagent-node-http-timings'
 import urljoin from 'url-join'
 import { promisify } from 'util'
 import { v4 as uuidv4 } from 'uuid'
@@ -15,14 +14,39 @@ import {
   getDataObjectIdFromCache,
 } from '../caching/localDataObjects'
 import { isNewDataObject } from '../caching/newUploads'
-import { parseBagId } from '../helpers/bagTypes'
 import { hashFile } from '../helpers/hashing'
-import { getRemoteDataObjects } from './remoteStorageData'
-import { TaskSink } from './workingProcess'

 const fsPromises = fs.promises

+class DownloadSpeedTracker {
+  private downloadSpeedsByUrl: Map<string, number[]>
+
+  constructor(private maxRecordsPerUrl: number) {
+    this.downloadSpeedsByUrl = new Map()
+  }
+
+  recordDownloadSpeed(url: string, speed: number): void {
+    const downloadSpeeds = this.downloadSpeedsByUrl.get(url) || []
+    downloadSpeeds.push(speed)
+
+    // Keep only the last `maxRecordsPerUrl` records
+    if (downloadSpeeds.length > this.maxRecordsPerUrl) {
+      downloadSpeeds.shift()
+    }
+
+    // Update the map with the new download speeds list
+    this.downloadSpeedsByUrl.set(url, downloadSpeeds)
+  }
+
+  getAverageDownloadSpeed(url: string): number {
+    const downloadSpeeds = this.downloadSpeedsByUrl.get(url)
+    return _.mean(downloadSpeeds) || 0
+  }
+}
+
+const downloadSpeedTracker = new DownloadSpeedTracker(10)
+
 /**
- * Defines syncronization task abstraction.
+ * Defines synchronization task abstraction.
  */
 export interface SyncTask {
   /**
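A usage sketch of the tracker above (hypothetical URLs and speeds), showing the sliding window and the `0` default for URLs with no samples:

```ts
const tracker = new DownloadSpeedTracker(3) // keep at most 3 samples per URL

tracker.recordDownloadSpeed('https://node-a.example', 500) // speeds in bytes/ms
tracker.recordDownloadSpeed('https://node-a.example', 700)
tracker.recordDownloadSpeed('https://node-a.example', 900)
tracker.recordDownloadSpeed('https://node-a.example', 1100) // evicts the oldest sample (500)

tracker.getAverageDownloadSpeed('https://node-a.example') // (700 + 900 + 1100) / 3 = 900
tracker.getAverageDownloadSpeed('https://node-b.example') // 0 - no samples recorded yet
```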
@@ -77,54 +101,71 @@ export class DeleteLocalFileTask implements SyncTask {
 /**
  * Download the file from the remote storage node to the local storage.
  */
 export class DownloadFileTask implements SyncTask {
-  dataObjectId: string
-  expectedHash?: string
-  uploadsDirectory: string
-  tempDirectory: string
-  url: string
-  downloadTimeout: number
-  hostId: string
-
   constructor(
-    baseUrl: string,
-    dataObjectId: string,
-    expectedHash: string | undefined,
-    uploadsDirectory: string,
-    tempDirectory: string,
-    downloadTimeout: number,
-    hostId: string
-  ) {
-    this.dataObjectId = dataObjectId
-    this.expectedHash = expectedHash
-    this.uploadsDirectory = uploadsDirectory
-    this.tempDirectory = tempDirectory
-    this.downloadTimeout = downloadTimeout
-    this.url = urljoin(baseUrl, 'api/v1/files', dataObjectId)
-    this.hostId = hostId
-  }
+    private baseUrls: string[],
+    private dataObjectId: string,
+    private expectedSize: string,
+    private expectedHash: string,
+    private uploadsDirectory: string,
+    private tempDirectory: string,
+    private downloadTimeout: number,
+    private hostId: string
+  ) {}

   description(): string {
-    return `Sync - downloading file: ${this.url} to ${this.uploadsDirectory} ....`
+    return `Sync - trying to download object: ${this.dataObjectId} ....`
   }

   async execute(): Promise<void> {
+    const baseUrls: string[] = this.getBaseUrlsOrderedByAvgDownloadSpeed()
+
+    for (const baseUrl of baseUrls) {
+      logger.debug(`Sync - storage node URL chosen for download: ${baseUrl}`)
+
+      const filepath = path.join(this.uploadsDirectory, this.dataObjectId)
+
+      // Try downloading file
+      await this.tryDownload(baseUrl, filepath)
+
+      // If the download succeeded, the file is in place and we are done
+      if (fs.existsSync(filepath)) {
+        return
+      }
+    }
+
+    logger.warn(`Sync - failed to download ${this.dataObjectId} from all candidate operator URLs`)
+  }
+
+  async tryDownload(baseUrl: string, filepath: string): Promise<void> {
     const streamPipeline = promisify(pipeline)
-    const filepath = path.join(this.uploadsDirectory, this.dataObjectId)

     // We create tempfile first to mitigate partial downloads on app (or remote node) crash.
     // This partial downloads will be cleaned up during the next sync iteration.
     const tempFilePath = path.join(this.tempDirectory, uuidv4())
+    const url = urljoin(baseUrl, 'api/v1/files', this.dataObjectId)
     try {
       const timeoutMs = this.downloadTimeout * 60 * 1000
       // Casting because of:
       // https://stackoverflow.com/questions/38478034/pipe-superagent-response-to-express-response
       const request = superagent
-        .get(this.url)
+        .get(url)
+        .use(
+          superagentTimings((err: unknown, result: { status: number; timings: { total: number } }) => {
+            if (err) {
+              logger.error(`Sync - error measuring download time for ${url}: ${err}`, { err })
+              return
+            }
+
+            // Record download speed for given operator (speed = bytes/ms)
+            if (result.status === 200) {
+              downloadSpeedTracker.recordDownloadSpeed(baseUrl, parseInt(this.expectedSize) / result.timings.total)
+            }
+          })
+        )
         .timeout(timeoutMs)
         .set('X-COLOSSUS-HOST-ID', this.hostId) as unknown as NodeJS.ReadableStream

       const fileStream = fs.createWriteStream(tempFilePath)

       request.on('response', (res) => {
-        if (!res.ok) {
+        if (!res.ok && res.statusCode !== 404) {
           logger.error(`Sync - unexpected status code(${res.statusCode}) for ${res?.request?.url}`)
         }
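The plugin callback above records speed in bytes per millisecond: the object's expected size (known from the query node) divided by `result.timings.total`. The absolute unit does not matter, since speeds are only compared against each other when ranking operators. Worked example with hypothetical numbers:

```ts
const expectedSize = '10485760' // a 10 MiB object, size as reported by the query node
const totalMs = 2000 // result.timings.total: the request took 2 seconds

const speed = parseInt(expectedSize) / totalMs // 5242.88 bytes/ms, i.e. ~5.2 MB/s
```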
@@ -132,19 +173,19 @@ export class DownloadFileTask implements SyncTask {
         // prematurely aborted/closed due to timeout and the response still was not completed
         // See: https://github.com/nodejs/node/blob/cd171576b2d1376dae3eb371b6da5ccf04dc4a85/lib/_http_client.js#L439-L441
         res.on('error', (err: Error) => {
-          logger.error(`Sync - fetching data error for ${this.url}: ${err}`, { err })
+          logger.error(`Sync - fetching data error for ${url}: ${err}`, { err })
         })
       })

       request.on('error', (err) => {
-        logger.error(`Sync - fetching data error for ${this.url}: ${err}`, { err })
+        logger.error(`Sync - fetching data error for ${url}: ${err}`, { err })
       })

       await streamPipeline(request, fileStream)
       await this.verifyDownloadedFile(tempFilePath)
       await fsPromises.rename(tempFilePath, filepath)
       await addDataObjectIdToCache(this.dataObjectId)
     } catch (err) {
-      logger.error(`Sync - fetching data error for ${this.url}: ${err}`, { err })
+      logger.error(`Sync - fetching data error for ${url}: ${err}`, { err })
       try {
         logger.warn(`Cleaning up file ${tempFilePath}`)
         await fsPromises.unlink(tempFilePath)
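The cleanup in this hunk is one half of the write-verify-rename pattern: an object only appears under `uploadsDirectory` once it is fully streamed and hash-verified, and a failed attempt leaves only a temp file to unlink. A sketch of the pattern (`downloadTo` and `verifyHash` are hypothetical stand-ins for the streaming and hashing code above):

```ts
import fs from 'fs'
import path from 'path'
import { v4 as uuidv4 } from 'uuid'

declare function downloadTo(tempFilePath: string): Promise<void> // hypothetical: streams the HTTP body to disk
declare function verifyHash(tempFilePath: string): Promise<void> // hypothetical: throws on hash mismatch

async function safeDownload(tempDir: string, uploadsDir: string, objectId: string): Promise<void> {
  const tempFilePath = path.join(tempDir, uuidv4())
  await downloadTo(tempFilePath)
  await verifyHash(tempFilePath) // a corrupt download never reaches uploadsDir
  // Atomic only when tempDir and uploadsDir are on the same filesystem;
  // across filesystems rename fails with EXDEV and needs a copy + unlink fallback.
  await fs.promises.rename(tempFilePath, path.join(uploadsDir, objectId))
}
```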
@@ -159,115 +200,21 @@
    * @param filePath downloaded file path
    */
   async verifyDownloadedFile(filePath: string): Promise<void> {
-    if (!_.isEmpty(this.expectedHash)) {
-      const hash = await hashFile(filePath)
+    const hash = await hashFile(filePath)

-      if (hash !== this.expectedHash) {
-        throw new Error(`Invalid file hash. Expected: ${this.expectedHash} - real: ${hash}`)
-      }
+    if (hash !== this.expectedHash) {
+      throw new Error(`Invalid file hash. Expected: ${this.expectedHash} - real: ${hash}`)
     }
   }
-}
-
-/**
- * Resolve remote storage node URLs and creates file downloading tasks (DownloadFileTask).
- */
-export class PrepareDownloadFileTask implements SyncTask {
-  bagId: string
-  dataObjectId: string
-  operatorUrlCandidates: string[]
-  taskSink: TaskSink
-  uploadsDirectory: string
-  tempDirectory: string
-  api?: ApiPromise
-  downloadTimeout: number
-  hostId: string
-
-  constructor(
-    operatorUrlCandidates: string[],
-    hostId: string,
-    bagId: string,
-    dataObjectId: string,
-    uploadsDirectory: string,
-    tempDirectory: string,
-    taskSink: TaskSink,
-    downloadTimeout: number,
-    api?: ApiPromise
-  ) {
-    this.api = api
-    this.bagId = bagId
-    this.dataObjectId = dataObjectId
-    this.taskSink = taskSink
-    this.operatorUrlCandidates = operatorUrlCandidates
-    this.uploadsDirectory = uploadsDirectory
-    this.tempDirectory = tempDirectory
-    this.downloadTimeout = downloadTimeout
-    this.hostId = hostId
-  }
-
-  description(): string {
-    return `Sync - preparing for download of: ${this.dataObjectId} ....`
-  }
-
-  async execute(): Promise<void> {
-    // Create an array of operator URL indices to maintain a random URL choice
-    // cannot use the original array because we shouldn't modify the original data.
-    // And cloning it seems like a heavy operation.
-    const operatorUrlIndices: number[] = [...Array(this.operatorUrlCandidates.length).keys()]
-
-    if (_.isEmpty(this.bagId)) {
-      logger.error(`Sync - invalid task - no bagId for ${this.dataObjectId}`)
-      return
-    }
-
-    while (!_.isEmpty(operatorUrlIndices)) {
-      const randomUrlIndex = _.sample(operatorUrlIndices)
-      if (randomUrlIndex === undefined) {
-        logger.warn(`Sync - cannot get a random URL`)
-        break
-      }
-
-      const randomUrl = this.operatorUrlCandidates[randomUrlIndex]
-      logger.debug(`Sync - random storage node URL was chosen ${randomUrl}`)
-
-      // Remove random url from the original list.
-      _.remove(operatorUrlIndices, (index) => index === randomUrlIndex)
-
-      try {
-        const chosenBaseUrl = randomUrl
-        const [remoteOperatorIds, hash] = await Promise.all([
-          getRemoteDataObjects(chosenBaseUrl, this.hostId),
-          this.getExpectedHash(),
-        ])
-
-        if (remoteOperatorIds.includes(this.dataObjectId)) {
-          const newTask = new DownloadFileTask(
-            chosenBaseUrl,
-            this.dataObjectId,
-            hash,
-            this.uploadsDirectory,
-            this.tempDirectory,
-            this.downloadTimeout,
-            this.hostId
-          )
-
-          return this.taskSink.add([newTask])
-        }
-      } catch (err) {
-        logger.error(`Sync - fetching data error for ${this.dataObjectId}: ${err}`, { err })
-      }
-    }
-    logger.warn(`Sync - cannot get operator URLs for ${this.dataObjectId}`)
-  }
+  getBaseUrlsOrderedByAvgDownloadSpeed(): string[] {
+    const urlsWithAvgSpeeds = []

-  async getExpectedHash(): Promise<string | undefined> {
-    if (this.api !== undefined) {
-      const convertedBagId = parseBagId(this.bagId)
-      const dataObject = await this.api.query.storage.dataObjectsById(convertedBagId, this.dataObjectId)
-      return hexToString(dataObject.ipfsContentId.toString())
+    for (const baseUrl of this.baseUrls) {
+      const avgSpeed = downloadSpeedTracker.getAverageDownloadSpeed(baseUrl)
+      urlsWithAvgSpeeds.push({ baseUrl, avgSpeed })
     }
-    return undefined
+    return urlsWithAvgSpeeds.sort((a, b) => b.avgSpeed - a.avgSpeed).map((item) => item.baseUrl)
   }
 }
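One consequence of `getBaseUrlsOrderedByAvgDownloadSpeed` worth keeping in mind: operators with no recorded samples average to `0` and sort last, so an untried node is only attempted after every previously measured one fails. With hypothetical values:

```ts
const urlsWithAvgSpeeds = [
  { baseUrl: 'https://node-a.example', avgSpeed: 900 }, // measured
  { baseUrl: 'https://node-b.example', avgSpeed: 1500 }, // measured, fastest so far
  { baseUrl: 'https://node-c.example', avgSpeed: 0 }, // never measured
]

urlsWithAvgSpeeds.sort((a, b) => b.avgSpeed - a.avgSpeed).map((item) => item.baseUrl)
// => ['https://node-b.example', 'https://node-a.example', 'https://node-c.example']
```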
"https://registry.yarnpkg.com/superagent-node-http-timings/-/superagent-node-http-timings-1.0.1.tgz#e529ac02773753279536aa718ea8060a0db05a5b" + integrity sha512-d+iwOeDGnKrcJ8U7YMF6lXYaKMB/grpDPVbsH9YPy6vZsH8+uRT9NjxpELrn6QlH9pPDgNANiw71PxUdwSBfrQ== + superagent@^6.1.0: version "6.1.0" resolved "https://registry.yarnpkg.com/superagent/-/superagent-6.1.0.tgz#09f08807bc41108ef164cfb4be293cebd480f4a6"