From abec00bc2f5356a664e5c4ecfe6df7c52cc9787f Mon Sep 17 00:00:00 2001
From: Zeeshan Akram <97m.zeeshan@gmail.com>
Date: Tue, 26 Dec 2023 14:19:52 +0000
Subject: [PATCH 1/5] delete remoteStorageData.ts file

---
 .../src/services/sync/remoteStorageData.ts    | 63 -------------------
 1 file changed, 63 deletions(-)
 delete mode 100644 storage-node/src/services/sync/remoteStorageData.ts

diff --git a/storage-node/src/services/sync/remoteStorageData.ts b/storage-node/src/services/sync/remoteStorageData.ts
deleted file mode 100644
index 42fd748746..0000000000
--- a/storage-node/src/services/sync/remoteStorageData.ts
+++ /dev/null
@@ -1,63 +0,0 @@
-import superagent from 'superagent'
-import urljoin from 'url-join'
-import logger from '../logger'
-import NodeCache from 'node-cache'
-
-// Expiration period in seconds for the local cache.
-const ExpirationPeriod: number = 3 * 60 // minutes
-
-// Max data entries in local cache
-const MaxEntries = 10000
-
-// Local in-memory cache for data object IDs by operator URL.
-const availableIDsCache = new NodeCache({
-  stdTTL: ExpirationPeriod,
-  deleteOnExpire: true,
-  maxKeys: MaxEntries,
-})
-
-// Local in-memory cache for faulty operator URL. Prevents fetching from the
-// offline storage nodes.
-const badOperatorUrls = new NodeCache({
-  stdTTL: ExpirationPeriod,
-  deleteOnExpire: true,
-  maxKeys: MaxEntries,
-})
-
-/**
- * Queries the remote storage node for its data object IDs from the storage.
- * It caches the result (including errors) for some limited time.
- *
- * @param operatorUrl - remote storage node URL
- */
-export async function getRemoteDataObjects(operatorUrl: string, hostId: string): Promise<string[]> {
-  const url = urljoin(operatorUrl, 'api/v1/state/data-objects')
-
-  const faultyOperator = badOperatorUrls.has(operatorUrl)
-  if (faultyOperator) {
-    logger.debug(`Sync - cached error for the ${url} skipping ....`)
-    return []
-  }
-
-  const cachedData = availableIDsCache.get<string[]>(url)
-  if (cachedData) {
-    logger.debug(`Sync - getting from cache available data for ${url}`)
-    return cachedData
-  }
-
-  try {
-    logger.debug(`Sync - fetching available data for ${url}`)
-    const timeoutMs = 120 * 1000 // 2 min
-    const response = await superagent.get(url).timeout(timeoutMs).set('X-COLOSSUS-HOST-ID', hostId)
-
-    const data = response.body
-    availableIDsCache.set(url, data, ExpirationPeriod)
-
-    return data
-  } catch (err) {
-    logger.error(`Sync - fetching data error from ${url}: ${err}`)
-    badOperatorUrls.set(operatorUrl, null, ExpirationPeriod)
-  }
-
-  return []
-}

From 313579f4951a40c7731120f72d5af70da488eb86 Mon Sep 17 00:00:00 2001
From: Zeeshan Akram <97m.zeeshan@gmail.com>
Date: Tue, 26 Dec 2023 14:21:07 +0000
Subject: [PATCH 2/5] remove 'api' parameter from performSync function & fetch
 object hash from QN

---
 storage-node/src/commands/server.ts                   | 11 +----------
 storage-node/src/commands/util/fetch-bucket.ts        |  1 -
 .../src/services/queryNode/queries/queries.graphql    |  1 +
 storage-node/src/services/sync/storageObligations.ts  |  6 ++++++
 4 files changed, 8 insertions(+), 11 deletions(-)

diff --git a/storage-node/src/commands/server.ts b/storage-node/src/commands/server.ts
index f64c67bc1d..c7ca0aaf69 100644
--- a/storage-node/src/commands/server.ts
+++ b/storage-node/src/commands/server.ts
@@ -388,16 +388,7 @@ async function runSyncWithInterval(
   while (true) {
     try {
      logger.info(`Resume syncing....`)
-      await performSync(
-        api,
-        buckets,
-        syncWorkersNumber,
-        syncWorkersTimeout,
-        qnApi,
-        uploadsDirectory,
-        tempDirectory,
-        hostId
-      )
+      await performSync(buckets, syncWorkersNumber, syncWorkersTimeout, qnApi, uploadsDirectory, tempDirectory, hostId)
       logger.info(`Sync run complete. Next run in ${syncIntervalMinutes} minute(s).`)
       await sleep(sleepInterval)
     } catch (err) {
diff --git a/storage-node/src/commands/util/fetch-bucket.ts b/storage-node/src/commands/util/fetch-bucket.ts
index 3fb885510e..c46ce98f4c 100644
--- a/storage-node/src/commands/util/fetch-bucket.ts
+++ b/storage-node/src/commands/util/fetch-bucket.ts
@@ -70,7 +70,6 @@ export default class FetchBucket extends Command {
 
     try {
       await performSync(
-        undefined,
         [bucketId],
         flags.syncWorkersNumber,
         flags.syncWorkersTimeout,
diff --git a/storage-node/src/services/queryNode/queries/queries.graphql b/storage-node/src/services/queryNode/queries/queries.graphql
index 0bf24b24d9..71ff1ffb1d 100644
--- a/storage-node/src/services/queryNode/queries/queries.graphql
+++ b/storage-node/src/services/queryNode/queries/queries.graphql
@@ -96,6 +96,7 @@ query getBagConnection($bucketIds: [ID!], $limit: Int, $cursor: String) {
 
 fragment DataObjectByBagIdsDetails on StorageDataObject {
   id
+  ipfsHash
   storageBagId
 }
 
diff --git a/storage-node/src/services/sync/storageObligations.ts b/storage-node/src/services/sync/storageObligations.ts
index 05d495b4c0..012590d024 100644
--- a/storage-node/src/services/sync/storageObligations.ts
+++ b/storage-node/src/services/sync/storageObligations.ts
@@ -76,6 +76,11 @@ type DataObject = {
    * Assigned bag ID
    */
   bagId: string
+
+  /**
+   * Data Object hash
+   */
+  ipfsHash: string
 }
 
 /**
@@ -110,6 +115,7 @@ export async function getStorageObligationsFromRuntime(
     dataObjects: assignedDataObjects.map((dataObject) => ({
       id: dataObject.id,
       bagId: dataObject.storageBagId,
+      ipfsHash: dataObject.ipfsHash,
    })),
  }
 
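Note: with this patch performSync() no longer needs a runtime API handle, because the
object hashes it previously read from the runtime are now fetched from the query node
via the new ipfsHash field above. A minimal sketch of the resulting call shape,
mirroring the updated call in server.ts (the bucket IDs, worker counts and directories
below are illustrative placeholders, not values from this repository):

    // Sketch only: qnApi is an already-constructed QueryNodeApi instance.
    await performSync(
      ['1'],            // bucket IDs operated by this node
      20,               // syncWorkersNumber
      30,               // syncWorkersTimeout (minutes)
      qnApi,
      '/data/uploads',  // uploadsDirectory
      '/data/temp',     // tempDirectory
      hostId
    )
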
From 482df7583088a549abfb3cb3bbdc15417ece340c Mon Sep 17 00:00:00 2001
From: Zeeshan Akram <97m.zeeshan@gmail.com>
Date: Tue, 26 Dec 2023 14:29:59 +0000
Subject: [PATCH 3/5] directly download data objects from nodes instead of
 pre-checking the existence first

---
 .../src/services/sync/synchronizer.ts         | 103 +++------
 storage-node/src/services/sync/tasks.ts       | 195 +++++-------------
 2 files changed, 88 insertions(+), 210 deletions(-)

diff --git a/storage-node/src/services/sync/synchronizer.ts b/storage-node/src/services/sync/synchronizer.ts
index 0b32c7b170..a5685fd00a 100644
--- a/storage-node/src/services/sync/synchronizer.ts
+++ b/storage-node/src/services/sync/synchronizer.ts
@@ -1,11 +1,10 @@
-import { ApiPromise } from '@polkadot/api'
 import _ from 'lodash'
 import { getDataObjectIDs } from '../../services/caching/localDataObjects'
 import logger from '../../services/logger'
 import { QueryNodeApi } from '../queryNode/api'
 import { DataObligations, getStorageObligationsFromRuntime } from './storageObligations'
-import { DownloadFileTask, PrepareDownloadFileTask, SyncTask } from './tasks'
-import { TaskProcessorSpawner, TaskSink, WorkingStack } from './workingProcess'
+import { DownloadFileTask } from './tasks'
+import { TaskProcessorSpawner, WorkingStack } from './workingProcess'
 
 /**
  * Temporary directory name for data uploading.
@@ -31,12 +30,11 @@ export const PendingDirName = 'pending'
  * @param qnApi - Query Node API
  * @param uploadDirectory - local directory to get file names from
  * @param tempDirectory - local directory for temporary data uploading
- * @param operatorUrl - (optional) defines the data source URL. If not set
+ * @param selectedOperatorUrl - (optional) defines the data source URL. If not set
  * the source URL is resolved for each data object separately using the Query
 * Node information about the storage providers.
  */
 export async function performSync(
-  api: ApiPromise | undefined,
   buckets: string[],
   asyncWorkersNumber: number,
   asyncWorkersTimeout: number,
   qnApi: QueryNodeApi,
   uploadDirectory: string,
   tempDirectory: string,
   hostId: string,
-  operatorUrl?: string
+  selectedOperatorUrl?: string
 ): Promise<void> {
   logger.info('Started syncing...')
   const [model, files] = await Promise.all([getStorageObligationsFromRuntime(qnApi, buckets), getDataObjectIDs()])
 
-  const requiredIds = model.dataObjects.map((obj) => obj.id)
+  const required = model.dataObjects
 
-  const added = _.difference(requiredIds, files)
-  const removed = _.difference(files, requiredIds)
+  const added = _.differenceWith(required, files, (required, file) => required.id === file)
+  const removed = _.differenceWith(files, required, (file, required) => file === required.id)
 
   logger.debug(`Sync - new objects: ${added.length}`)
   logger.debug(`Sync - obsolete objects: ${removed.length}`)
 
   const workingStack = new WorkingStack()
 
-  let addedTasks: SyncTask[]
-  if (operatorUrl === undefined) {
-    addedTasks = await getPrepareDownloadTasks(
-      api,
-      model,
-      buckets,
-      added,
-      uploadDirectory,
-      tempDirectory,
-      workingStack,
-      asyncWorkersTimeout,
-      hostId
-    )
-  } else {
-    addedTasks = await getDownloadTasks(operatorUrl, added, uploadDirectory, tempDirectory, asyncWorkersTimeout, hostId)
-  }
+  const addedTasks = await getDownloadTasks(
+    model,
+    buckets,
+    added,
+    uploadDirectory,
+    tempDirectory,
+    asyncWorkersTimeout,
+    hostId,
+    selectedOperatorUrl
+  )
 
   logger.debug(`Sync - started processing...`)
 
 }
 
 /**
- * Creates the download preparation tasks.
+ * Creates the download tasks.
  *
- * @param api - Runtime API promise
- * @param ownBuckets - list of bucket ids operated this node
  * @param dataObligations - defines the current data obligations for the node
+ * @param ownBuckets - list of bucket ids operated by this node
  * @param addedIds - data object IDs to download
  * @param uploadDirectory - local directory for data uploading
  * @param tempDirectory - local directory for temporary data uploading
  * @param taskSink - a destination for the newly created tasks
  * @param asyncWorkersTimeout - downloading asset timeout
+ * @param hostId - Random host UUID assigned to each node during bootstrap
+ * @param selectedOperatorUrl - operator URL selected for syncing objects
  */
-async function getPrepareDownloadTasks(
-  api: ApiPromise | undefined,
+async function getDownloadTasks(
   dataObligations: DataObligations,
   ownBuckets: string[],
-  addedIds: string[],
+  added: DataObligations['dataObjects'],
   uploadDirectory: string,
   tempDirectory: string,
-  taskSink: TaskSink,
   asyncWorkersTimeout: number,
-  hostId: string
-): Promise<PrepareDownloadFileTask[]> {
+  hostId: string,
+  selectedOperatorUrl?: string
+): Promise<DownloadFileTask[]> {
   const bagIdByDataObjectId = new Map()
   for (const entry of dataObligations.dataObjects) {
     bagIdByDataObjectId.set(entry.id, entry.bagId)
   }
     bagOperatorsUrlsById.set(entry.id, operatorUrls)
   }
 
-  const tasks = addedIds.map((id) => {
+  const tasks = added.map((dataObject) => {
     let operatorUrls: string[] = [] // can be empty after look up
     let bagId = null
-    if (bagIdByDataObjectId.has(id)) {
-      bagId = bagIdByDataObjectId.get(id)
+    if (bagIdByDataObjectId.has(dataObject.id)) {
+      bagId = bagIdByDataObjectId.get(dataObject.id)
       if (bagOperatorsUrlsById.has(bagId)) {
         operatorUrls = bagOperatorsUrlsById.get(bagId)
       }
     }
 
-    return new PrepareDownloadFileTask(
-      operatorUrls,
-      hostId,
-      bagId,
-      id,
+    return new DownloadFileTask(
+      selectedOperatorUrl ? [selectedOperatorUrl] : operatorUrls,
+      dataObject.id,
+      dataObject.ipfsHash,
       uploadDirectory,
       tempDirectory,
-      taskSink,
       asyncWorkersTimeout,
-      api
+      hostId
     )
   })
 
   return tasks
 }
-
-/**
- * Creates the download file tasks.
- *
- * @param operatorUrl - defines the data source URL.
- * @param addedIds - data object IDs to download
- * @param uploadDirectory - local directory for data uploading
- * @param tempDirectory - local directory for temporary data uploading
- * @param downloadTimeout - asset downloading timeout (in minutes)
- */
-async function getDownloadTasks(
-  operatorUrl: string,
-  addedIds: string[],
-  uploadDirectory: string,
-  tempDirectory: string,
-  downloadTimeout: number,
-  hostId: string
-): Promise<DownloadFileTask[]> {
-  const addedTasks = addedIds.map(
-    (fileName) =>
-      new DownloadFileTask(operatorUrl, fileName, undefined, uploadDirectory, tempDirectory, downloadTimeout, hostId)
-  )
-
-  return addedTasks
-}
diff --git a/storage-node/src/services/sync/tasks.ts b/storage-node/src/services/sync/tasks.ts
index f0b168014f..2e83ac8541 100644
--- a/storage-node/src/services/sync/tasks.ts
+++ b/storage-node/src/services/sync/tasks.ts
@@ -1,5 +1,3 @@
-import { ApiPromise } from '@polkadot/api'
-import { hexToString } from '@polkadot/util'
 import fs from 'fs'
 import _ from 'lodash'
 import path from 'path'
@@ -15,10 +13,7 @@ import {
   getDataObjectIdFromCache,
 } from '../caching/localDataObjects'
 import { isNewDataObject } from '../caching/newUploads'
-import { parseBagId } from '../helpers/bagTypes'
 import { hashFile } from '../helpers/hashing'
-import { getRemoteDataObjects } from './remoteStorageData'
-import { TaskSink } from './workingProcess'
 const fsPromises = fs.promises
 
 /**
  * Download the file from the remote storage node to the local storage.
  */
 export class DownloadFileTask implements SyncTask {
-  dataObjectId: string
-  expectedHash?: string
-  uploadsDirectory: string
-  tempDirectory: string
-  url: string
-  downloadTimeout: number
-  hostId: string
+  operatorUrls: string[]
 
   constructor(
-    baseUrl: string,
-    dataObjectId: string,
-    expectedHash: string | undefined,
-    uploadsDirectory: string,
-    tempDirectory: string,
-    downloadTimeout: number,
-    hostId: string
+    baseUrls: string[],
+    private dataObjectId: string,
+    private expectedHash: string,
+    private uploadsDirectory: string,
+    private tempDirectory: string,
+    private downloadTimeout: number,
+    private hostId: string
   ) {
-    this.dataObjectId = dataObjectId
-    this.expectedHash = expectedHash
-    this.uploadsDirectory = uploadsDirectory
-    this.tempDirectory = tempDirectory
-    this.downloadTimeout = downloadTimeout
-    this.url = urljoin(baseUrl, 'api/v1/files', dataObjectId)
-    this.hostId = hostId
+    this.operatorUrls = baseUrls.map((baseUrl) => urljoin(baseUrl, 'api/v1/files', dataObjectId))
   }
 
   description(): string {
-    return `Sync - downloading file: ${this.url} to ${this.uploadsDirectory} ....`
+    return `Sync - Trying for download of: ${this.dataObjectId} ....`
   }
 
   async execute(): Promise<void> {
+    // Create an array of operator URL indices to maintain a random URL choice
+    // cannot use the original array because we shouldn't modify the original data.
+    // And cloning it seems like a heavy operation.
+    const operatorUrlIndices: number[] = [...Array(this.operatorUrls.length).keys()]
+
+    while (!_.isEmpty(operatorUrlIndices)) {
+      const randomUrlIndex = _.sample(operatorUrlIndices)
+      if (randomUrlIndex === undefined) {
+        logger.warn(`Sync - cannot get a random URL`)
+        break
+      }
+
+      const chosenBaseUrl = this.operatorUrls[randomUrlIndex]
+      logger.debug(`Sync - random storage node URL was chosen ${chosenBaseUrl}`)
+
+      // Remove random url from the original list.
+      _.remove(operatorUrlIndices, (index) => index === randomUrlIndex)
+
+      const filepath = path.join(this.uploadsDirectory, this.dataObjectId)
+      try {
+        // Try downloading file
+        await this.tryDownload(chosenBaseUrl, filepath)
+
+        // if download succeeds, break the loop
+        if (fs.existsSync(filepath)) {
+          return
+        }
+      } catch (err) {
+        logger.error(`Sync - fetching data error for ${this.dataObjectId}: ${err}`, { err })
+      }
+    }
+
+    logger.warn(`Sync - cannot get operator URLs for ${this.dataObjectId}`)
+  }
+
+  async tryDownload(url: string, filepath: string): Promise<void> {
     const streamPipeline = promisify(pipeline)
-    const filepath = path.join(this.uploadsDirectory, this.dataObjectId)
     // We create tempfile first to mitigate partial downloads on app (or remote node) crash.
     // This partial downloads will be cleaned up during the next sync iteration.
     const tempFilePath = path.join(this.tempDirectory, uuidv4())
     try {
       const timeoutMs = this.downloadTimeout * 60 * 1000
       // Casting because of:
       // https://stackoverflow.com/questions/38478034/pipe-superagent-response-to-express-response
       const request = superagent
-        .get(this.url)
+        .get(url)
         .timeout(timeoutMs)
         .set('X-COLOSSUS-HOST-ID', this.hostId) as unknown as NodeJS.ReadableStream
       const fileStream = fs.createWriteStream(tempFilePath)
 
       request.on('response', (res) => {
         if (!res.ok) {
-          logger.error(`Sync - unexpected status code(${res.statusCode}) for ${res?.request?.url}`)
+          logger.debug(`Sync - unexpected status code(${res.statusCode}) for ${res?.request?.url}`)
         }
 
         // Handle 'error' event on Response too, because it will be emitted if request was
         // prematurely aborted/closed due to timeout and the response still was not completed
         // See: https://github.com/nodejs/node/blob/cd171576b2d1376dae3eb371b6da5ccf04dc4a85/lib/_http_client.js#L439-L441
         res.on('error', (err: Error) => {
-          logger.error(`Sync - fetching data error for ${this.url}: ${err}`, { err })
+          logger.error(`Sync - fetching data error for ${url}: ${err}`, { err })
         })
       })
 
       request.on('error', (err) => {
-        logger.error(`Sync - fetching data error for ${this.url}: ${err}`, { err })
+        logger.error(`Sync - fetching data error for ${url}: ${err}`, { err })
       })
 
       await streamPipeline(request, fileStream)
       await this.verifyDownloadedFile(tempFilePath)
       await fsPromises.rename(tempFilePath, filepath)
       await addDataObjectIdToCache(this.dataObjectId)
     } catch (err) {
-      logger.error(`Sync - fetching data error for ${this.url}: ${err}`, { err })
+      logger.error(`Sync - fetching data error for ${url}: ${err}`, { err })
       try {
         logger.warn(`Cleaning up file ${tempFilePath}`)
         await fsPromises.unlink(tempFilePath)
   /**
    * @param filePath downloaded file path
    */
   async verifyDownloadedFile(filePath: string): Promise<void> {
-    if (!_.isEmpty(this.expectedHash)) {
-      const hash = await hashFile(filePath)
+    const hash = await hashFile(filePath)
 
-      if (hash !== this.expectedHash) {
-        throw new Error(`Invalid file hash. Expected: ${this.expectedHash} - real: ${hash}`)
-      }
+    if (hash !== this.expectedHash) {
+      throw new Error(`Invalid file hash. Expected: ${this.expectedHash} - real: ${hash}`)
     }
   }
 }
-
-/**
- * Resolve remote storage node URLs and creates file downloading tasks (DownloadFileTask).
- */
-export class PrepareDownloadFileTask implements SyncTask {
-  bagId: string
-  dataObjectId: string
-  operatorUrlCandidates: string[]
-  taskSink: TaskSink
-  uploadsDirectory: string
-  tempDirectory: string
-  api?: ApiPromise
-  downloadTimeout: number
-  hostId: string
-
-  constructor(
-    operatorUrlCandidates: string[],
-    hostId: string,
-    bagId: string,
-    dataObjectId: string,
-    uploadsDirectory: string,
-    tempDirectory: string,
-    taskSink: TaskSink,
-    downloadTimeout: number,
-    api?: ApiPromise
-  ) {
-    this.api = api
-    this.bagId = bagId
-    this.dataObjectId = dataObjectId
-    this.taskSink = taskSink
-    this.operatorUrlCandidates = operatorUrlCandidates
-    this.uploadsDirectory = uploadsDirectory
-    this.tempDirectory = tempDirectory
-    this.downloadTimeout = downloadTimeout
-    this.hostId = hostId
-  }
-
-  description(): string {
-    return `Sync - preparing for download of: ${this.dataObjectId} ....`
-  }
-
-  async execute(): Promise<void> {
-    // Create an array of operator URL indices to maintain a random URL choice
-    // cannot use the original array because we shouldn't modify the original data.
-    // And cloning it seems like a heavy operation.
-    const operatorUrlIndices: number[] = [...Array(this.operatorUrlCandidates.length).keys()]
-
-    if (_.isEmpty(this.bagId)) {
-      logger.error(`Sync - invalid task - no bagId for ${this.dataObjectId}`)
-      return
-    }
-
-    while (!_.isEmpty(operatorUrlIndices)) {
-      const randomUrlIndex = _.sample(operatorUrlIndices)
-      if (randomUrlIndex === undefined) {
-        logger.warn(`Sync - cannot get a random URL`)
-        break
-      }
-
-      const randomUrl = this.operatorUrlCandidates[randomUrlIndex]
-      logger.debug(`Sync - random storage node URL was chosen ${randomUrl}`)
-
-      // Remove random url from the original list.
-      _.remove(operatorUrlIndices, (index) => index === randomUrlIndex)
-
-      try {
-        const chosenBaseUrl = randomUrl
-        const [remoteOperatorIds, hash] = await Promise.all([
-          getRemoteDataObjects(chosenBaseUrl, this.hostId),
-          this.getExpectedHash(),
-        ])
-
-        if (remoteOperatorIds.includes(this.dataObjectId)) {
-          const newTask = new DownloadFileTask(
-            chosenBaseUrl,
-            this.dataObjectId,
-            hash,
-            this.uploadsDirectory,
-            this.tempDirectory,
-            this.downloadTimeout,
-            this.hostId
-          )
-
-          return this.taskSink.add([newTask])
-        }
-      } catch (err) {
-        logger.error(`Sync - fetching data error for ${this.dataObjectId}: ${err}`, { err })
-      }
-    }
-
-    logger.warn(`Sync - cannot get operator URLs for ${this.dataObjectId}`)
-  }
-
-  async getExpectedHash(): Promise<string | undefined> {
-    if (this.api !== undefined) {
-      const convertedBagId = parseBagId(this.bagId)
-      const dataObject = await this.api.query.storage.dataObjectsById(convertedBagId, this.dataObjectId)
-      return hexToString(dataObject.ipfsContentId.toString())
-    }
-
-    return undefined
-  }
-}
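Note: after this patch a DownloadFileTask receives every candidate operator URL plus
the expected IPFS hash up front (both resolved from the query node), so the
PrepareDownloadFileTask indirection and the remoteStorageData pre-check are gone. A
rough usage sketch under the constructor shape introduced here (the URLs, object ID,
hash, directories and timeout are illustrative placeholders):

    // Sketch only: wiring one download task through the working stack.
    const task = new DownloadFileTask(
      ['https://operator-a.example.com', 'https://operator-b.example.com'], // candidate base URLs
      '42',                 // data object ID
      'QmExampleIpfsHash',  // expected hash, as returned by the query node
      '/data/uploads',
      '/data/temp',
      30,                   // download timeout in minutes
      hostId
    )
    const workingStack = new WorkingStack()
    await workingStack.add([task]) // drained by an existing TaskProcessorSpawner

The task tries the candidate URLs in random order and stops at the first download
whose file hash verifies.
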
From 4a2293a7e1565ef1b83f89bc9ac5a0d3926c185c Mon Sep 17 00:00:00 2001
From: Zeeshan Akram <97m.zeeshan@gmail.com>
Date: Tue, 26 Dec 2023 14:45:36 +0000
Subject: [PATCH 4/5] [Colossus] don't log message if data object does not
 exist on remote node

---
 storage-node/src/services/sync/tasks.ts | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/storage-node/src/services/sync/tasks.ts b/storage-node/src/services/sync/tasks.ts
index 2e83ac8541..d586dfea3b 100644
--- a/storage-node/src/services/sync/tasks.ts
+++ b/storage-node/src/services/sync/tasks.ts
@@ -142,8 +142,8 @@ export class DownloadFileTask implements SyncTask {
       const fileStream = fs.createWriteStream(tempFilePath)
 
       request.on('response', (res) => {
-        if (!res.ok) {
-          logger.debug(`Sync - unexpected status code(${res.statusCode}) for ${res?.request?.url}`)
+        if (!res.ok && res.statusCode !== 404) {
+          logger.error(`Sync - unexpected status code(${res.statusCode}) for ${res?.request?.url}`)
         }

From 156094770b604f2c9d722437d976a63bc7513b92 Mon Sep 17 00:00:00 2001
From: Zeeshan Akram <97m.zeeshan@gmail.com>
Date: Wed, 27 Dec 2023 15:55:59 +0500
Subject: [PATCH 5/5] [Colossus] select data object download URLs based on the
 avg download speed

---
 storage-node/.eslintrc.js                          |   1 +
 storage-node/package.json                          |   1 +
 .../superagent-node-http-timings/index.d.ts        |   1 +
 .../queryNode/queries/queries.graphql              |   1 +
 .../src/services/sync/storageObligations.ts        |   8 +-
 .../src/services/sync/synchronizer.ts              |   1 +
 storage-node/src/services/sync/tasks.ts            | 100 ++++++++++++------
 yarn.lock                                          |   5 +
 8 files changed, 84 insertions(+), 34 deletions(-)
 create mode 100644 storage-node/src/@types/superagent-node-http-timings/index.d.ts

diff --git a/storage-node/.eslintrc.js b/storage-node/.eslintrc.js
index cc0fb2f316..872cb89882 100644
--- a/storage-node/.eslintrc.js
+++ b/storage-node/.eslintrc.js
@@ -11,5 +11,6 @@ module.exports = {
     'no-unused-vars': 'off', // Required by the typescript rule below
     '@typescript-eslint/no-unused-vars': ['error'],
     '@typescript-eslint/no-floating-promises': 'error',
+    'no-useless-constructor': 'off',
   },
 }
diff --git a/storage-node/package.json b/storage-node/package.json
index 4fae3846c1..9fe36e34ed 100644
--- a/storage-node/package.json
+++ b/storage-node/package.json
@@ -57,6 +57,7 @@
     "sleep-promise": "^9.1.0",
     "subscriptions-transport-ws": "^0.11.0",
     "superagent": "^6.1.0",
+    "superagent-node-http-timings": "1.0.1",
     "tslib": "^1",
     "url-join": "^4.0.1",
     "uuid": "^8.3.2",
diff --git a/storage-node/src/@types/superagent-node-http-timings/index.d.ts b/storage-node/src/@types/superagent-node-http-timings/index.d.ts
new file mode 100644
index 0000000000..2c279e3743
--- /dev/null
+++ b/storage-node/src/@types/superagent-node-http-timings/index.d.ts
@@ -0,0 +1 @@
+declare module 'superagent-node-http-timings'
diff --git a/storage-node/src/services/queryNode/queries/queries.graphql b/storage-node/src/services/queryNode/queries/queries.graphql
index 71ff1ffb1d..4325ae16a0 100644
--- a/storage-node/src/services/queryNode/queries/queries.graphql
+++ b/storage-node/src/services/queryNode/queries/queries.graphql
@@ -96,6 +96,7 @@ query getBagConnection($bucketIds: [ID!], $limit: Int, $cursor: String) {
 
 fragment DataObjectByBagIdsDetails on StorageDataObject {
   id
+  size
   ipfsHash
   storageBagId
 }
diff --git a/storage-node/src/services/sync/storageObligations.ts b/storage-node/src/services/sync/storageObligations.ts
index 012590d024..d987be472d 100644
--- a/storage-node/src/services/sync/storageObligations.ts
+++ b/storage-node/src/services/sync/storageObligations.ts
@@ -77,6 +77,11 @@ type DataObject = {
    */
   bagId: string
 
+  /**
+   * Data Object size (in bytes)
+   */
+  size: string
+
   /**
    * Data Object hash
    */
     dataObjects: assignedDataObjects.map((dataObject) => ({
       id: dataObject.id,
-      bagId: dataObject.storageBagId,
+      size: dataObject.size,
       ipfsHash: dataObject.ipfsHash,
+      bagId: dataObject.storageBagId,
     })),
   }
 
diff --git a/storage-node/src/services/sync/synchronizer.ts b/storage-node/src/services/sync/synchronizer.ts
index a5685fd00a..335a243258 100644
--- a/storage-node/src/services/sync/synchronizer.ts
+++ b/storage-node/src/services/sync/synchronizer.ts
@@ -153,6 +153,7 @@ async function getDownloadTasks(
     return new DownloadFileTask(
       selectedOperatorUrl ? [selectedOperatorUrl] : operatorUrls,
       dataObject.id,
+      dataObject.size,
       dataObject.ipfsHash,
       uploadDirectory,
       tempDirectory,
diff --git a/storage-node/src/services/sync/tasks.ts b/storage-node/src/services/sync/tasks.ts
index d586dfea3b..0bc069b4fc 100644
--- a/storage-node/src/services/sync/tasks.ts
+++ b/storage-node/src/services/sync/tasks.ts
@@ -3,6 +3,7 @@ import _ from 'lodash'
 import path from 'path'
 import { pipeline } from 'stream'
 import superagent from 'superagent'
+import superagentTimings from 'superagent-node-http-timings'
 import urljoin from 'url-join'
 import { promisify } from 'util'
 import { v4 as uuidv4 } from 'uuid'
@@ -16,8 +17,36 @@ import {
 import { isNewDataObject } from '../caching/newUploads'
 import { hashFile } from '../helpers/hashing'
 const fsPromises = fs.promises
 
+class DownloadSpeedTracker {
+  private downloadSpeedsByUrl: Map<string, number[]>
+
+  constructor(private maxRecordsPerUrl: number) {
+    this.downloadSpeedsByUrl = new Map()
+  }
+
+  recordDownloadSpeed(url: string, speed: number): void {
+    const downloadSpeeds = this.downloadSpeedsByUrl.get(url) || []
+    downloadSpeeds.push(speed)
+
+    // Keep only the last `maxRecordsPerUrl` records
+    if (downloadSpeeds.length > this.maxRecordsPerUrl) {
+      downloadSpeeds.shift()
+    }
+
+    // Update the map with the new download times list
+    this.downloadSpeedsByUrl.set(url, downloadSpeeds)
+  }
+
+  getAverageDownloadSpeed(url: string): number {
+    const downloadSpeeds = this.downloadSpeedsByUrl.get(url)
+    return _.mean(downloadSpeeds) || 0
+  }
+}
+
+const downloadSpeedTracker = new DownloadSpeedTracker(10)
+
 /**
- * Defines syncronization task abstraction.
+ * Defines synchronization task abstraction.
  */
 export interface SyncTask {
   /**
  * Download the file from the remote storage node to the local storage.
  */
 export class DownloadFileTask implements SyncTask {
-  operatorUrls: string[]
-
   constructor(
-    baseUrls: string[],
+    private baseUrls: string[],
     private dataObjectId: string,
+    private expectedSize: string,
     private expectedHash: string,
     private uploadsDirectory: string,
     private tempDirectory: string,
     private downloadTimeout: number,
     private hostId: string
-  ) {
-    this.operatorUrls = baseUrls.map((baseUrl) => urljoin(baseUrl, 'api/v1/files', dataObjectId))
-  }
+  ) {}
 
   description(): string {
     return `Sync - Trying for download of: ${this.dataObjectId} ....`
   }
 
   async execute(): Promise<void> {
-    // Create an array of operator URL indices to maintain a random URL choice
-    // cannot use the original array because we shouldn't modify the original data.
-    // And cloning it seems like a heavy operation.
-    const operatorUrlIndices: number[] = [...Array(this.operatorUrls.length).keys()]
-
-    while (!_.isEmpty(operatorUrlIndices)) {
-      const randomUrlIndex = _.sample(operatorUrlIndices)
-      if (randomUrlIndex === undefined) {
-        logger.warn(`Sync - cannot get a random URL`)
-        break
-      }
-
-      const chosenBaseUrl = this.operatorUrls[randomUrlIndex]
-      logger.debug(`Sync - random storage node URL was chosen ${chosenBaseUrl}`)
+    const baseUrls: string[] = this.getBaseUrlsOrderedByAvgDownloadSpeed()
 
-      // Remove random url from the original list.
-      _.remove(operatorUrlIndices, (index) => index === randomUrlIndex)
+    for (const baseUrl of baseUrls) {
+      logger.debug(`Sync - storage node URL chosen for download: ${baseUrl}`)
 
       const filepath = path.join(this.uploadsDirectory, this.dataObjectId)
-      try {
-        // Try downloading file
-        await this.tryDownload(chosenBaseUrl, filepath)
 
-        // if download succeeds, break the loop
-        if (fs.existsSync(filepath)) {
-          return
-        }
-      } catch (err) {
-        logger.error(`Sync - fetching data error for ${this.dataObjectId}: ${err}`, { err })
+      // Try downloading file
+      await this.tryDownload(baseUrl, filepath)
+
+      // if download succeeds, break the loop
+      if (fs.existsSync(filepath)) {
+        return
       }
     }
 
     logger.warn(`Sync - cannot get operator URLs for ${this.dataObjectId}`)
   }
 
-  async tryDownload(url: string, filepath: string): Promise<void> {
+  async tryDownload(baseUrl: string, filepath: string): Promise<void> {
     const streamPipeline = promisify(pipeline)
     // We create tempfile first to mitigate partial downloads on app (or remote node) crash.
     // This partial downloads will be cleaned up during the next sync iteration.
     const tempFilePath = path.join(this.tempDirectory, uuidv4())
+    const url = urljoin(baseUrl, 'api/v1/files', this.dataObjectId)
     try {
       const timeoutMs = this.downloadTimeout * 60 * 1000
       // Casting because of:
       // https://stackoverflow.com/questions/38478034/pipe-superagent-response-to-express-response
       const request = superagent
         .get(url)
+        .use(
+          superagentTimings((err: unknown, result: { status: number; timings: { total: number } }) => {
+            if (err) {
+              logger.error(`Sync - error measuring download time for ${url}: ${err}`, { err })
+            }
+
+            // Record download speed for given operator (speed = bytes/ms)
+            if (result.status === 200) {
+              downloadSpeedTracker.recordDownloadSpeed(baseUrl, parseInt(this.expectedSize) / result.timings.total)
+            }
+          })
+        )
         .timeout(timeoutMs)
         .set('X-COLOSSUS-HOST-ID', this.hostId) as unknown as NodeJS.ReadableStream
       const fileStream = fs.createWriteStream(tempFilePath)
       throw new Error(`Invalid file hash. Expected: ${this.expectedHash} - real: ${hash}`)
     }
   }
+
+  getBaseUrlsOrderedByAvgDownloadSpeed(): string[] {
+    const urlsWithAvgSpeeds = []
+
+    for (const baseUrl of this.baseUrls) {
+      const avgSpeed = downloadSpeedTracker.getAverageDownloadSpeed(baseUrl)
+      urlsWithAvgSpeeds.push({ baseUrl, avgSpeed })
+    }
+
+    return urlsWithAvgSpeeds.sort((a, b) => b.avgSpeed - a.avgSpeed).map((item) => item.baseUrl)
+  }
 }
diff --git a/yarn.lock b/yarn.lock
index d84814f1c1..f7d93d8bf0 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -20508,6 +20508,11 @@ subscriptions-transport-ws@^0.9.18, subscriptions-transport-ws@^0.9.19, subscrip
     symbol-observable "^1.0.4"
     ws "^5.2.0 || ^6.0.0 || ^7.0.0"
 
+superagent-node-http-timings@1.0.1:
+  version "1.0.1"
+  resolved "https://registry.yarnpkg.com/superagent-node-http-timings/-/superagent-node-http-timings-1.0.1.tgz#e529ac02773753279536aa718ea8060a0db05a5b"
+  integrity sha512-d+iwOeDGnKrcJ8U7YMF6lXYaKMB/grpDPVbsH9YPy6vZsH8+uRT9NjxpELrn6QlH9pPDgNANiw71PxUdwSBfrQ==
+
 superagent@^6.1.0:
   version "6.1.0"
   resolved "https://registry.yarnpkg.com/superagent/-/superagent-6.1.0.tgz#09f08807bc41108ef164cfb4be293cebd480f4a6"
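Note: tryDownload() now feeds per-request timing data into DownloadSpeedTracker, which
keeps a rolling window of up to ten throughput samples (bytes/ms) per operator URL,
and execute() tries URLs in descending order of their rolling mean, so consistently
fast operators are attempted first and unmeasured ones fall to the back. A
self-contained sketch of that selection logic, mirroring the class above (the URLs
and speeds are made up):

    import _ from 'lodash'

    const maxRecordsPerUrl = 10
    const speedsByUrl = new Map<string, number[]>()

    // Record one throughput sample (bytes/ms), keeping only the newest samples.
    function recordDownloadSpeed(url: string, speed: number): void {
      const speeds = speedsByUrl.get(url) || []
      speeds.push(speed)
      if (speeds.length > maxRecordsPerUrl) {
        speeds.shift() // drop the oldest sample
      }
      speedsByUrl.set(url, speeds)
    }

    // Order candidate URLs by rolling average speed; unmeasured URLs average to 0.
    function orderByAvgDownloadSpeed(urls: string[]): string[] {
      return urls
        .map((url) => ({ url, avg: _.mean(speedsByUrl.get(url)) || 0 }))
        .sort((a, b) => b.avg - a.avg)
        .map((entry) => entry.url)
    }

    recordDownloadSpeed('https://fast.example.com', 2.5)
    recordDownloadSpeed('https://slow.example.com', 0.4)
    // -> fast, then slow, then the never-measured URL
    console.log(orderByAvgDownloadSpeed(['https://slow.example.com', 'https://never-measured.example.com', 'https://fast.example.com']))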