From 46458856ec6c093ea212c8dda706d147b3965dca Mon Sep 17 00:00:00 2001 From: svvimming Date: Wed, 3 May 2023 11:56:16 -0400 Subject: [PATCH 1/3] feat: cid-importer cron split to multithread processing using worker pool --- packages/be/crons/cid-batch-import.js | 226 ++++++++++++++++++ packages/be/crons/cid-importer.js | 281 ++++++++-------------- packages/be/crons/x_cid-importer.js | 322 ++++++++++++++++++++++++++ 3 files changed, 641 insertions(+), 188 deletions(-) create mode 100644 packages/be/crons/cid-batch-import.js create mode 100644 packages/be/crons/x_cid-importer.js diff --git a/packages/be/crons/cid-batch-import.js b/packages/be/crons/cid-batch-import.js new file mode 100644 index 00000000..fa4fbebf --- /dev/null +++ b/packages/be/crons/cid-batch-import.js @@ -0,0 +1,226 @@ +// ///////////////////////////////////////////////////// Imports + general setup +// ----------------------------------------------------------------------------- +const ModuleAlias = require('module-alias') +const Path = require('path') +const Axios = require('axios') +const Fs = require('fs-extra') +const Express = require('express') +const Util = require('util') +const Stream = require('stream') +const Pipeline = Util.promisify(Stream.pipeline) +const Spawn = require('child_process').spawn +const WorkerPool = require('workerpool') +const Mongoose = require('mongoose') +const MongooseSlugUpdater = require('mongoose-slug-updater') + +require('dotenv').config({ path: Path.resolve(__dirname, '../.env') }) + +const MC = require('../config') + +const CID_TMP_DIR = Path.resolve(`${MC.packageRoot}/tmp/cid-files`) + +// ////////////////////////////////////////////////////////////////// Initialize +MC.app = Express() +Mongoose.plugin(MongooseSlugUpdater) + +// ///////////////////////////////////////////////////////////////////// Aliases +ModuleAlias.addAliases({ + '@Root': MC.packageRoot, + '@Modules': `${MC.packageRoot}/modules` +}) + +try { + const modulesRoot = `${MC.packageRoot}/modules` + const items = Fs.readdirSync(modulesRoot) + items.forEach((name) => { + const path = `${modulesRoot}/${name}` + if (Fs.statSync(path).isDirectory()) { + const moduleName = `${name[0].toUpperCase() + name.substring(1)}` + ModuleAlias.addAlias(`@Module_${moduleName}`, path) + } + }) +} catch (e) { + console.log(e) +} + +// ///////////////////////////////////////////////////////////////////// Modules +require('@Module_Database') +require('@Module_Cidtemp') + +// /////////////////////////////////////////////////////////////////// Functions +// ------------------------------------------------------------ retrieveCidFiles +const retrieveCidFile = async (line, retryNo = 0) => { + try { + if (retryNo > 0) { + console.log(`Retry number ${retryNo}`) + } + const upload = JSON.parse(line) + // fetch file using upload cid + const response = await Axios.get(`https://${upload.cid}.ipfs.w3s.link/`, { + responseType: 'stream' + }) + // if a file already exists with this name in the temp folder, + // delete it to make way for an updated version + await deleteTemporaryFile(CID_TMP_DIR, upload.name) + // write file data to new zst file in the temp folder + await Pipeline(response.data, Fs.createWriteStream(`${CID_TMP_DIR}/${upload.name}`)) + // unpack the zst and return the inner json + return await unpackRetrievedFile(CID_TMP_DIR, { + cid: upload.cid, + name: upload.name, + updated: upload.updated, + created: upload.created + }) + } catch (e) { + console.log('====================================== [Function: unpackCids]') + console.log(e) + if (retryNo < 10) { + console.log(`Error retrieving CID ${JSON.parse(line).cid}. Retrying retrieval...`) + await retrieveCidFile(CID_TMP_DIR, line, retryNo + 1) + } else { + const cid = JSON.parse(line).cid + console.log(`Could not retrieve CID ${cid}. Max retries reached.`) + await cacheFailedCID(CID_TMP_DIR, cid) + } + } +} + +// --------------------------------------------------------------- unpackZstFile +const unpackZstFile = (file) => { + return new Promise((resolve, reject) => { + const unzstd = Spawn('unzstd', [`../tmp/cid-files/${file.name}`]) + const errors = [] + unzstd.stderr.on('data', (msg) => { + errors.push(`Error unpacking ${file.name}: ${msg.toString()}`) + }) + unzstd.on('exit', (code) => { + const err = errors.length > 0 && code !== 0 + if (err) { + console.log(errors.join('\n')) + } + resolve() + }) + }) +} + +// --------------------------------------------------------- unpackRetrievedFile +const unpackRetrievedFile = async (file) => { + try { + const jsonFilename = file.name.substring(0, file.name.length - 4) + await deleteTemporaryFile(jsonFilename) + await unpackZstFile(file) + // await deleteTemporaryFile(file.name) // TODO : don't delete zst file in tmp directory + const json = await GetFileFromDisk(`${CID_TMP_DIR}/${jsonFilename}`, true) + const fileMetadata = { + piece_cid: json.piece_cid, + payload_cid: json.payload_cid, + raw_car_file_size: json.raw_car_file_size, + dataset_slug: json.dataset, + filename: jsonFilename, + web3storageCreatedAt: file.created, + web3storageUpdatedAt: file.updated + } + await deleteTemporaryFile(jsonFilename) // TODO : do delete unpacked json file in tmp directory + return fileMetadata + } catch (e) { + console.log('============================ [Function: unpackRetrievedFiles]') + console.log(e) + } +} + +// -------------------------------------------------------------- cacheFailedCid +const cacheFailedCID = async (cid) => { + try { + await Pipeline(`${cid}\n`, Fs.createWriteStream(`${CID_TMP_DIR}/failed-cid-retrievals.txt`, { flags: 'a' })) + } catch (e) { + console.log('================================= [Function: cacheFailedCID ]') + console.log(e) + } +} + +// --------------------------------------------------------- deleteTemporaryFile +const deleteTemporaryFile = async (filename) => { + try { + if (Fs.existsSync(`${CID_TMP_DIR}/${filename}`)) { + Fs.unlinkSync(`${CID_TMP_DIR}/${filename}`) + } + } catch (e) { + console.log('============================ [Function: deleteTemporaryFile ]') + console.log(e) + } +} + +// ------------------------------------------------ writeBatchMetadataToDatabase +const writeBatchMetadataToDatabase = async (retrievedFiles) => { + try { + const operations = [] + const len = retrievedFiles.length + for (let i = 0; i < len; i++) { + const file = retrievedFiles[i] + operations.push({ + updateOne: { + filter: { payload_cid: file.payload_cid }, + update: { $set: file }, + upsert: true + } + }) + } + const response = await MC.model.Cidtemp.bulkWrite(operations, { ordered: false }) + return response.result + } catch (e) { + console.log('==================== [Function: writeBatchMetadataToDatabase]') + console.log(e) + } +} + +// ------------------------------------------------ backupFilesToBackblazeBucket +const backupFilesToBackblazeBucket = async (retrievedFiles) => { + try { + console.log('backup started') + } catch (e) { + console.log('==================== [Function: backupFilesToBackblazeBucket]') + console.log(e) + } +} + +// ////////////////////////////////////////////////////////////////////// Worker +// -------------------------------------------------------- processManifestBatch +const processManifestBatch = async (batch, batchNo) => { + try { + const len = batch.length + const uploaded = Math.floor(Math.random() * len) + const modified = len - uploaded + console.log(len, uploaded, modified) + // const retrievedFiles = [] + // for (let i = 0; i < len; i++) { + // const cidManifestItem = batch[i] + // const retrieved = await retrieveCidFile(cidManifestItem) + // if (retrieved) { retrievedFiles.push(retrieved) } + // } + // console.log(retrievedFiles) + // const databaseWriteResult = await writeBatchMetadataToDatabase(retrievedFiles) + // const batchBackupResult = await backupFilesToBackblazeBucket(retrievedFiles) + return new Promise((resolve, reject) => { + setTimeout(() => { + const result = new WorkerPool.Transfer({ + batch: batchNo, + databaseWriteResult: { + nUpserted: uploaded, + nModified: modified + }, + batchBackupResult: false + }) + resolve(result) + }, 3000) + }) + } catch (e) { + console.log('============================ [Function: processManifestBatch]') + console.log(e) + } +} + +// ////////////////////////////////////////////////////////////////// Initialize +// ----------------------------------------------------------------------------- +WorkerPool.worker({ + processManifestBatch: processManifestBatch +}) diff --git a/packages/be/crons/cid-importer.js b/packages/be/crons/cid-importer.js index a9f9e8c5..a0b61b16 100644 --- a/packages/be/crons/cid-importer.js +++ b/packages/be/crons/cid-importer.js @@ -17,14 +17,23 @@ const Util = require('util') const Stream = require('stream') const Pipeline = Util.promisify(Stream.pipeline) const readline = require('node:readline') -const Spawn = require('child_process').spawn const argv = require('minimist')(process.argv.slice(2)) +const WorkerPool = require('workerpool') + require('dotenv').config({ path: Path.resolve(__dirname, '../.env') }) const MC = require('../config') const CID_TMP_DIR = Path.resolve(`${MC.packageRoot}/tmp/cid-files`) +const numThreads = argv.threads || 16 +const Pool = WorkerPool.pool(Path.resolve(__dirname, './cid-batch-import.js'), { + maxWorkers: numThreads, + workerType: 'thread' +}) + +let startTime + // ////////////////////////////////////////////////////////////////// Initialize MC.app = Express() @@ -50,176 +59,72 @@ try { // ///////////////////////////////////////////////////////////////////// Modules require('@Module_Database') -require('@Module_Cid') -const { GetFileFromDisk, SecondsToHms } = require('@Module_Utilities') +require('@Module_Cidtemp') +const { SecondsToHms } = require('@Module_Utilities') // /////////////////////////////////////////////////////////////////// Functions -// ------------------------------------------------------------ retrieveCidFiles -const retrieveCidFile = async (line, retryNo = 0) => { +// --------------------------------------------------- performCidBatchOperations +const performCidBatchOperations = (manifest, batchNo, tasks, results) => { try { - if (retryNo > 0) { - console.log(`Retry number ${retryNo}`) - } - const upload = JSON.parse(line) - // fetch file using upload cid - const response = await Axios.get(`https://${upload.cid}.ipfs.w3s.link/`, { - responseType: 'stream' - }) - // if a file already exists with this name in the temp folder, - // delete it to make way for an updated version - await deleteTemporaryFile(upload.name) - // write file data to new zst file in the temp folder - await Pipeline(response.data, Fs.createWriteStream(`${CID_TMP_DIR}/${upload.name}`)) - // unpack the zst and return the inner json - return await unpackRetrievedFile({ - cid: upload.cid, - name: upload.name, - updated: upload.updated, - created: upload.created - }) - } catch (e) { - console.log('====================================== [Function: unpackCids]') - console.log(e) - if (retryNo < 10) { - console.log(`Error retrieving CID ${JSON.parse(line).cid}. Retrying retrieval...`) - await retrieveCidFile(line, retryNo + 1) + const batchSize = argv.pagesize || 1000 + const batch = manifest.slice(0, batchSize) + tasks.push( + Pool.exec('processManifestBatch', [batch, batchNo]).then((result) => { + results.push(result) + imported = result.databaseWriteResult.nUpserted + result.databaseWriteResult.nModified + console.log(`Batch ${result.batch} finished | ${imported} CIDs were imported to the database in this batch`) + }).catch((err) => { + console.log('================================ Error returned by worker') + console.error(err) + }) + ) + manifest.splice(0, batchSize) + if (manifest.length) { + performCidBatchOperations(manifest, ++batchNo, tasks, results) } else { - const cid = JSON.parse(line).cid - console.log(`Could not retrieve CID ${cid}. Max retries reached.`) - await cacheFailedCID(cid) - } - } -} - -// --------------------------------------------------------------- unpackZstFile -const unpackZstFile = (file) => { - return new Promise((resolve, reject) => { - const unzstd = Spawn('unzstd', [`../tmp/cid-files/${file.name}`]) - const errors = [] - unzstd.stderr.on('data', (msg) => { - errors.push(`Error unpacking ${file.name}: ${msg.toString()}`) - }) - unzstd.on('exit', (code) => { - const err = errors.length > 0 && code !== 0 - if (err) { - console.log(errors.join('\n')) - } - resolve() - }) - }) -} - -// --------------------------------------------------------- unpackRetrievedFile -const unpackRetrievedFile = async (file) => { - try { - const jsonFilename = file.name.substring(0, file.name.length - 4) - await deleteTemporaryFile(jsonFilename) - await unpackZstFile(file) - await deleteTemporaryFile(file.name) - const json = await GetFileFromDisk(`${CID_TMP_DIR}/${jsonFilename}`, true) - const fileMetadata = { - piece_cid: json.piece_cid, - payload_cid: json.payload_cid, - raw_car_file_size: json.raw_car_file_size, - dataset_slug: json.dataset, - filename: jsonFilename, - web3storageCreatedAt: file.created, - web3storageUpdatedAt: file.updated - } - await deleteTemporaryFile(jsonFilename) - return fileMetadata - } catch (e) { - console.log('============================ [Function: unpackRetrievedFiles]') - console.log(e) - } -} - -// ------------------------------------------------- writeFileMetadataToDatabase -const writeFileMetadataToDatabase = async (retrievedFiles) => { - try { - const operations = [] - const len = retrievedFiles.length - for (let i = 0; i < len; i++) { - const file = retrievedFiles[i] - operations.push({ - updateOne: { - filter: { payload_cid: file.payload_cid }, - update: { $set: file }, - upsert: true + Promise.all(tasks).catch((e) => { + console.log('=========================== Error returned by worker pool') + console.log(e) + }).then(() => { + Pool.terminate() + const len = results.length + console.log(results) + let dbTotalImported = 0 + let dbTotalModified = 0 + for (let i = 0; i < len; i++) { + const result = results[i] + dbTotalImported = dbTotalImported + result.databaseWriteResult.nUpserted + dbTotalModified = dbTotalModified + result.databaseWriteResult.nModified } + const endTime = process.hrtime()[0] + console.log(`📒 CID import finished | took ${SecondsToHms(endTime - startTime)}`) + console.log(`A total of ${dbTotalImported + dbTotalModified} CIDs were processed by the database | ${dbTotalImported} imported | ${dbTotalModified} modified`) + process.exit(0) }) } - const response = await MC.model.Cid.bulkWrite(operations, { ordered: false }) - return response.result - } catch (e) { - console.log('========================= [Function: writeCidFilesToDatabase]') - console.log(e) - } -} - -// -------------------------------------------------------------- cacheFailedCid -const cacheFailedCID = async (cid) => { - try { - await Pipeline(`${cid}\n`, Fs.createWriteStream(`${CID_TMP_DIR}/failed-cid-retrievals.txt`, { flags: 'a' })) - } catch (e) { - console.log('================================= [Function: cacheFailedCID ]') - console.log(e) - } -} - -// --------------------------------------------------------- deleteTemporaryFile -const deleteTemporaryFile = async (filename) => { - try { - if (Fs.existsSync(`${CID_TMP_DIR}/${filename}`)) { - Fs.unlinkSync(`${CID_TMP_DIR}/${filename}`) - } } catch (e) { - console.log('============================ [Function: deleteTemporaryFile ]') + console.log('======================= [Function: performCidBatchOperations]') console.log(e) } } // ------------------------------------------------- getCidFilesFromManifestList -const getCidFilesFromManifestList = async (importMax) => { +const getCidFilesFromManifestList = async () => { try { - if (Fs.existsSync(`${CID_TMP_DIR}/cid-manifest.txt`)) { - const manifest = Fs.createReadStream(`${CID_TMP_DIR}/cid-manifest.txt`) - const rl = readline.createInterface({ - input: manifest, - crlfDelay: Infinity - }) - // import all lines from the manifest to an array - const manifestCidLines = [] - for await (const line of rl) { - manifestCidLines.push(line) - } - // reverse the array to import oldest CIDs first - // begin writing to the database in batches - manifestCidLines.reverse() - let retrievedFiles = [] - let total = 0 - const batchSize = argv.pagesize || 1000 - const len = manifestCidLines.length - for (let i = 0; i < len; i++) { - if (i < importMax) { - const line = manifestCidLines[i] - console.log(`Retrieving file ${i + 1} from the CID manifest list.`) - const retrieved = await retrieveCidFile(line) - if (retrieved) { retrievedFiles.push(retrieved) } - // write retrieved file data to the database in batches of 1000 - if ((i + 1) % batchSize === 0) { - const result = await writeFileMetadataToDatabase(retrievedFiles) - total = total + result.nUpserted + result.nModified - console.log(`${result.nUpserted} new CIDs imported in this batch | ${result.nModified} CIDs updated in this batch | A total of ${total} CIDs imported/updated so far.`) - retrievedFiles = [] - } - } else { - break - } - } - const result = await writeFileMetadataToDatabase(retrievedFiles) - console.log(`${result.nUpserted} new CIDs imported in this batch | ${result.nModified} CIDs updated in this batch | A total of ${total + result.nUpserted + result.nModified} CIDs were imported/updated to the database.`) + const manifestList = Fs.createReadStream(`${CID_TMP_DIR}/cid-manifest.txt`) + const rl = readline.createInterface({ + input: manifestList, + crlfDelay: Infinity + }) + // import all lines from the manifest to an array + const manifest = [] + for await (const line of rl) { + manifest.push(line) } + // reverse the array to import oldest CIDs first + manifest.reverse() + // pass manifest on to the worker pool + performCidBatchOperations(manifest, 1, [], []) } catch (e) { console.log('===================== [Function: getCidFilesFromManifestList]') console.log(e) @@ -276,40 +181,40 @@ const createManifestFromWeb3StorageCids = async (searchParams, maxPages, lastSav // ///////////////////////////////////////////////////////////////// CidImporter const CidImporter = async () => { try { - const start = process.hrtime()[0] + startTime = process.hrtime()[0] const limit = argv.pagesize || 1000 const maxPages = argv.maxpages || Infinity - console.log(`📖 CID import started | page size of ${limit} and page maximum ${maxPages}.`) - // Get the latest upload entry from the database - const mostRecentCid = await MC.model.Cid.find().sort({ web3storageCreatedAt: -1 }).limit(1) - const mostRecentDocument = mostRecentCid[0] - console.log('Most recent CID imported:') - console.log(mostRecentDocument) - const lastSavedDate = mostRecentDocument ? new Date(mostRecentDocument.web3storageCreatedAt).getTime() : 0 - // Delete the outdated manifest file if it exists - await deleteTemporaryFile('cid-manifest.txt') - /** - * Build a manifest list of all cids not yet uploaded to the database: - * args: - * params passed to the initial api upload list request - * limit number of pages retrieved. Set to Infinity to retrieve all CIDs since the most recently uploaded saved in the db - * the most recent upload saved to our database (so as to request only more newer uploads) - */ - await createManifestFromWeb3StorageCids({ - size: limit, - page: 1, - sortBy: 'Date', - sortOrder: 'Desc' - }, maxPages, lastSavedDate) - /** - * Retrieve and unpack files one by one from the manifest list and bulkWrite contents to the database - * args: - * limit number of entries to the database (this will only be used for test) - */ - await getCidFilesFromManifestList(limit * maxPages) - const end = process.hrtime()[0] - console.log(`📒 CID import finished | took ${SecondsToHms(end - start)}`) - process.exit(0) + // console.log(`📖 CID import started | page size of ${limit} and page maximum ${maxPages}.`) + // // Get the latest upload entry from the database + // // const mostRecentCid = await MC.model.Cidtemp.find().sort({ web3storageCreatedAt: -1 }).limit(1) // TODO : uncomment line and replace with override option + // const mostRecentDocument = false // mostRecentCid[0] + // console.log('Most recent CID imported:') + // console.log(mostRecentDocument) + // const lastSavedDate = mostRecentDocument ? new Date(mostRecentDocument.web3storageCreatedAt).getTime() : 0 + // // Delete the outdated manifest file if it exists + // // await deleteTemporaryFile('cid-manifest.txt') + // if (Fs.existsSync(`${CID_TMP_DIR}/cid-manifest.txt`)) { + // Fs.unlinkSync(`${CID_TMP_DIR}/cid-manifest.txt`) + // } + // /** + // * Build a manifest list of all cids not yet uploaded to the database: + // * args: + // * params passed to the initial api upload list request + // * limit number of pages retrieved. Set to Infinity to retrieve all CIDs since the most recently uploaded saved in the db + // * the most recent upload saved to our database (so as to request only more newer uploads) + // */ + // await createManifestFromWeb3StorageCids({ + // size: limit, + // page: 1, + // sortBy: 'Date', + // sortOrder: 'Desc' + // }, maxPages, lastSavedDate) + // /** + // * Retrieve and unpack files one by one from the manifest list and bulkWrite contents to the database + // * args: + // * limit number of entries to the database (this will only be used for test) + // */ + await getCidFilesFromManifestList() } catch (e) { console.log('===================================== [Function: CidImporter]') console.log(e) diff --git a/packages/be/crons/x_cid-importer.js b/packages/be/crons/x_cid-importer.js new file mode 100644 index 00000000..9721378f --- /dev/null +++ b/packages/be/crons/x_cid-importer.js @@ -0,0 +1,322 @@ +/** + * + * ⏱️️ [Cron | weekly] CID Importer + * + * Caches CID data + * + */ + +// ///////////////////////////////////////////////////// Imports + general setup +// ----------------------------------------------------------------------------- +const ModuleAlias = require('module-alias') +const Path = require('path') +const Axios = require('axios') +const Fs = require('fs-extra') +const Express = require('express') +const Util = require('util') +const Stream = require('stream') +const Pipeline = Util.promisify(Stream.pipeline) +const readline = require('node:readline') +const Spawn = require('child_process').spawn +const argv = require('minimist')(process.argv.slice(2)) +require('dotenv').config({ path: Path.resolve(__dirname, '../.env') }) + +const MC = require('../config') + +const CID_TMP_DIR = Path.resolve(`${MC.packageRoot}/tmp/cid-files`) + +// ////////////////////////////////////////////////////////////////// Initialize +MC.app = Express() + +// ///////////////////////////////////////////////////////////////////// Aliases +ModuleAlias.addAliases({ + '@Root': MC.packageRoot, + '@Modules': `${MC.packageRoot}/modules` +}) + +try { + const modulesRoot = `${MC.packageRoot}/modules` + const items = Fs.readdirSync(modulesRoot) + items.forEach((name) => { + const path = `${modulesRoot}/${name}` + if (Fs.statSync(path).isDirectory()) { + const moduleName = `${name[0].toUpperCase() + name.substring(1)}` + ModuleAlias.addAlias(`@Module_${moduleName}`, path) + } + }) +} catch (e) { + console.log(e) +} + +// ///////////////////////////////////////////////////////////////////// Modules +require('@Module_Database') +require('@Module_Cidtemp') +const { GetFileFromDisk, SecondsToHms } = require('@Module_Utilities') + +// /////////////////////////////////////////////////////////////////// Functions +// ------------------------------------------------------------ retrieveCidFiles +const retrieveCidFile = async (line, retryNo = 0) => { + try { + if (retryNo > 0) { + console.log(`Retry number ${retryNo}`) + } + const upload = JSON.parse(line) + // fetch file using upload cid + const response = await Axios.get(`https://${upload.cid}.ipfs.w3s.link/`, { + responseType: 'stream' + }) + // if a file already exists with this name in the temp folder, + // delete it to make way for an updated version + await deleteTemporaryFile(upload.name) + // write file data to new zst file in the temp folder + await Pipeline(response.data, Fs.createWriteStream(`${CID_TMP_DIR}/${upload.name}`)) + // unpack the zst and return the inner json + return await unpackRetrievedFile({ + cid: upload.cid, + name: upload.name, + updated: upload.updated, + created: upload.created + }) + } catch (e) { + console.log('====================================== [Function: unpackCids]') + console.log(e) + if (retryNo < 10) { + console.log(`Error retrieving CID ${JSON.parse(line).cid}. Retrying retrieval...`) + await retrieveCidFile(line, retryNo + 1) + } else { + const cid = JSON.parse(line).cid + console.log(`Could not retrieve CID ${cid}. Max retries reached.`) + await cacheFailedCID(cid) + } + } +} + +// --------------------------------------------------------------- unpackZstFile +const unpackZstFile = (file) => { + return new Promise((resolve, reject) => { + const unzstd = Spawn('unzstd', [`../tmp/cid-files/${file.name}`]) + const errors = [] + unzstd.stderr.on('data', (msg) => { + errors.push(`Error unpacking ${file.name}: ${msg.toString()}`) + }) + unzstd.on('exit', (code) => { + const err = errors.length > 0 && code !== 0 + if (err) { + console.log(errors.join('\n')) + } + resolve() + }) + }) +} + +// --------------------------------------------------------- unpackRetrievedFile +const unpackRetrievedFile = async (file) => { + try { + const jsonFilename = file.name.substring(0, file.name.length - 4) + await deleteTemporaryFile(jsonFilename) + await unpackZstFile(file) + await deleteTemporaryFile(file.name) + const json = await GetFileFromDisk(`${CID_TMP_DIR}/${jsonFilename}`, true) + const fileMetadata = { + piece_cid: json.piece_cid, + payload_cid: json.payload_cid, + raw_car_file_size: json.raw_car_file_size, + dataset_slug: json.dataset, + filename: jsonFilename, + web3storageCreatedAt: file.created, + web3storageUpdatedAt: file.updated + } + await deleteTemporaryFile(jsonFilename) + return fileMetadata + } catch (e) { + console.log('============================ [Function: unpackRetrievedFiles]') + console.log(e) + } +} + +// ------------------------------------------------- writeFileMetadataToDatabase +const writeFileMetadataToDatabase = async (retrievedFiles) => { + try { + const operations = [] + const len = retrievedFiles.length + for (let i = 0; i < len; i++) { + const file = retrievedFiles[i] + operations.push({ + updateOne: { + filter: { payload_cid: file.payload_cid }, + update: { $set: file }, + upsert: true + } + }) + } + const response = await MC.model.Cidtemp.bulkWrite(operations, { ordered: false }) + return response.result + } catch (e) { + console.log('========================= [Function: writeCidFilesToDatabase]') + console.log(e) + } +} + +// -------------------------------------------------------------- cacheFailedCid +const cacheFailedCID = async (cid) => { + try { + await Pipeline(`${cid}\n`, Fs.createWriteStream(`${CID_TMP_DIR}/failed-cid-retrievals.txt`, { flags: 'a' })) + } catch (e) { + console.log('================================= [Function: cacheFailedCID ]') + console.log(e) + } +} + +// --------------------------------------------------------- deleteTemporaryFile +const deleteTemporaryFile = async (filename) => { + try { + if (Fs.existsSync(`${CID_TMP_DIR}/${filename}`)) { + Fs.unlinkSync(`${CID_TMP_DIR}/${filename}`) + } + } catch (e) { + console.log('============================ [Function: deleteTemporaryFile ]') + console.log(e) + } +} + +// ------------------------------------------------- getCidFilesFromManifestList +const getCidFilesFromManifestList = async (importMax) => { + try { + if (Fs.existsSync(`${CID_TMP_DIR}/cid-manifest.txt`)) { + const manifest = Fs.createReadStream(`${CID_TMP_DIR}/cid-manifest.txt`) + const rl = readline.createInterface({ + input: manifest, + crlfDelay: Infinity + }) + // import all lines from the manifest to an array + const manifestCidLines = [] + for await (const line of rl) { + manifestCidLines.push(line) + } + // reverse the array to import oldest CIDs first + // begin writing to the database in batches + manifestCidLines.reverse() + let retrievedFiles = [] + let total = 0 + const batchSize = argv.pagesize || 1000 + const len = manifestCidLines.length + for (let i = 0; i < len; i++) { + if (i < importMax) { + const line = manifestCidLines[i] + console.log(`Retrieving file ${i + 1} from the CID manifest list.`) + const retrieved = await retrieveCidFile(line) + if (retrieved) { retrievedFiles.push(retrieved) } + // write retrieved file data to the database in batches of 1000 + if ((i + 1) % batchSize === 0) { + const result = await writeFileMetadataToDatabase(retrievedFiles) + total = total + result.nUpserted + result.nModified + console.log(`${result.nUpserted} new CIDs imported in this batch | ${result.nModified} CIDs updated in this batch | A total of ${total} CIDs imported/updated so far.`) + retrievedFiles = [] + } + } else { + break + } + } + const result = await writeFileMetadataToDatabase(retrievedFiles) + console.log(`${result.nUpserted} new CIDs imported in this batch | ${result.nModified} CIDs updated in this batch | A total of ${total + result.nUpserted + result.nModified} CIDs were imported/updated to the database.`) + } + } catch (e) { + console.log('===================== [Function: getCidFilesFromManifestList]') + console.log(e) + } +} + +// ------------------------------------------- createManifestFromWeb3StorageCids +const createManifestFromWeb3StorageCids = async (searchParams, maxPages, lastSavedDate, count) => { + try { + const options = { headers: { Accept: 'application/json', Authorization: `Bearer ${process.env.WEB3STORAGE_TOKEN}` } } + const params = Object.keys(searchParams).map((item) => `${item}=${searchParams[item]}`).join('&') + const url = `https://api.web3.storage/user/uploads?${params}` + const response = await Axios.get(url, options) + const uploads = response.data + const len = uploads.length + let skipCount = 0 + let total = 0 + let lastSavedDateReached = false + for (let i = 0; i < len; i++) { + const upload = uploads[i] + const uploadDate = new Date(upload.created).getTime() + if (uploadDate > lastSavedDate) { + if (upload.name.endsWith('.zst')) { + const newline = JSON.stringify({ cid: upload.cid, name: upload.name, created: upload.created, updated: upload.updated }) + await Pipeline(`${newline}\n`, Fs.createWriteStream(`${CID_TMP_DIR}/cid-manifest.txt`, { flags: 'a' })) + } else { + skipCount++ + } + } else { + lastSavedDateReached = true + break + } + total = i + 1 + } + total = total - skipCount + const lineWriteCount = count ? count + total : total + console.log(`${total} new CID(s) saved to the manifest in this batch | ${skipCount} file(s) were skipped due to filetype mismatch | total of ${lineWriteCount} new CID(s) saved so far`) + if (uploads.length === searchParams.size && !lastSavedDateReached && searchParams.page < maxPages) { + searchParams.page += 1 + await createManifestFromWeb3StorageCids(searchParams, maxPages, lastSavedDate, lineWriteCount) + } else { + console.log(`Finished writing CID manifest file - a total of ${lineWriteCount} new CID(s) will be imported to the database.`) + } + } catch (e) { + console.log('=============== [Function: createManifestFromWeb3StorageCids]') + console.log(e) + if (e.response && e.response.status === 500) { + await createManifestFromWeb3StorageCids(searchParams, maxPages, lastSavedDate, count) + console.log(`Retrying fetching uploads on page ${searchParams.page}`) + } + } +} + +// ///////////////////////////////////////////////////////////////// CidImporter +const CidImporter = async () => { + try { + const start = process.hrtime()[0] + const limit = argv.pagesize || 1000 + const maxPages = argv.maxpages || Infinity + console.log(`📖 CID import started | page size of ${limit} and page maximum ${maxPages}.`) + // Get the latest upload entry from the database + const mostRecentCid = await MC.model.Cidtemp.find().sort({ web3storageCreatedAt: -1 }).limit(1) + const mostRecentDocument = mostRecentCid[0] + console.log('Most recent CID imported:') + console.log(mostRecentDocument) + const lastSavedDate = mostRecentDocument ? new Date(mostRecentDocument.web3storageCreatedAt).getTime() : 0 + // Delete the outdated manifest file if it exists + await deleteTemporaryFile('cid-manifest.txt') + /** + * Build a manifest list of all cids not yet uploaded to the database: + * args: + * params passed to the initial api upload list request + * limit number of pages retrieved. Set to Infinity to retrieve all CIDs since the most recently uploaded saved in the db + * the most recent upload saved to our database (so as to request only more newer uploads) + */ + await createManifestFromWeb3StorageCids({ + size: limit, + page: 1, + sortBy: 'Date', + sortOrder: 'Desc' + }, maxPages, lastSavedDate) + /** + * Retrieve and unpack files one by one from the manifest list and bulkWrite contents to the database + * args: + * limit number of entries to the database (this will only be used for test) + */ + await getCidFilesFromManifestList(limit * maxPages) + const end = process.hrtime()[0] + console.log(`📒 CID import finished | took ${SecondsToHms(end - start)}`) + process.exit(0) + } catch (e) { + console.log('===================================== [Function: CidImporter]') + console.log(e) + process.exit(0) + } +} + +// ////////////////////////////////////////////////////////////////// Initialize +// ----------------------------------------------------------------------------- +MC.app.on('mongoose-connected', CidImporter) From 49c1bddd8f8601402cb37e1556563bb13ef530e6 Mon Sep 17 00:00:00 2001 From: svvimming Date: Wed, 3 May 2023 12:01:05 -0400 Subject: [PATCH 2/3] feat: move workerpool dependency to be package --- package-lock.json | 16 +++++++- packages/be/modules/cidtemp/index.js | 16 ++++++++ packages/be/modules/cidtemp/model/index.js | 46 ++++++++++++++++++++++ packages/be/package.json | 3 +- 4 files changed, 78 insertions(+), 3 deletions(-) create mode 100644 packages/be/modules/cidtemp/index.js create mode 100644 packages/be/modules/cidtemp/model/index.js diff --git a/package-lock.json b/package-lock.json index 61195e98..3e4642fa 100644 --- a/package-lock.json +++ b/package-lock.json @@ -29253,6 +29253,11 @@ "microevent.ts": "~0.1.1" } }, + "node_modules/workerpool": { + "version": "6.4.0", + "resolved": "https://registry.npmjs.org/workerpool/-/workerpool-6.4.0.tgz", + "integrity": "sha512-i3KR1mQMNwY2wx20ozq2EjISGtQWDIfV56We+yGJ5yDs8jTwQiLLaqHlkBHITlCuJnYlVRmXegxFxZg7gqI++A==" + }, "node_modules/wrap-ansi": { "version": "7.0.0", "resolved": "https://registry.npmjs.org/wrap-ansi/-/wrap-ansi-7.0.0.tgz", @@ -29608,7 +29613,8 @@ "socket.io": "^4.5.2", "socket.io-client": "^4.5.2", "uuid": "^8.3.1", - "uuid-apikey": "^1.5.1" + "uuid-apikey": "^1.5.1", + "workerpool": "^6.4.0" }, "devDependencies": { "eslint": "^8.23.0", @@ -37466,7 +37472,8 @@ "socket.io": "^4.5.2", "socket.io-client": "^4.5.2", "uuid": "^8.3.1", - "uuid-apikey": "^1.5.1" + "uuid-apikey": "^1.5.1", + "workerpool": "*" } }, "better-opn": { @@ -52182,6 +52189,11 @@ "microevent.ts": "~0.1.1" } }, + "workerpool": { + "version": "6.4.0", + "resolved": "https://registry.npmjs.org/workerpool/-/workerpool-6.4.0.tgz", + "integrity": "sha512-i3KR1mQMNwY2wx20ozq2EjISGtQWDIfV56We+yGJ5yDs8jTwQiLLaqHlkBHITlCuJnYlVRmXegxFxZg7gqI++A==" + }, "wrap-ansi": { "version": "7.0.0", "resolved": "https://registry.npmjs.org/wrap-ansi/-/wrap-ansi-7.0.0.tgz", diff --git a/packages/be/modules/cidtemp/index.js b/packages/be/modules/cidtemp/index.js new file mode 100644 index 00000000..cb2ea84e --- /dev/null +++ b/packages/be/modules/cidtemp/index.js @@ -0,0 +1,16 @@ +console.log('📦 [module] cid temp') + +// ///////////////////////////////////////////////////////////////////// Imports +// ----------------------------------------------------------------------------- +const { RunStartupChecks } = require('@Module_Utilities') + +const MC = require('@Root/config') + +// ////////////////////////////////////////////////////////////// Startup Checks +// ----------------------------------------------------------------------------- +const checks = [] +RunStartupChecks(checks) + +// //////////////////////////////////////////////////////////////// Import Model +// ----------------------------------------------------------------------------- +MC.model.Cidtemp = require('@Module_Cidtemp/model') diff --git a/packages/be/modules/cidtemp/model/index.js b/packages/be/modules/cidtemp/model/index.js new file mode 100644 index 00000000..745e13b5 --- /dev/null +++ b/packages/be/modules/cidtemp/model/index.js @@ -0,0 +1,46 @@ +console.log('💿 [model] op_cids_temp') + +// ///////////////////////////////////////////////////////////////////// Imports +// ----------------------------------------------------------------------------- +const Mongoose = require('mongoose') +const Schema = Mongoose.Schema + +// ////////////////////////////////////////////////////////////////////// Schema +// ----------------------------------------------------------------------------- +const CIDtempSchema = new Schema({ + piece_cid: { + type: String, + required: true + }, + payload_cid: { + type: String, + required: true, + index: true + }, + raw_car_file_size: { + type: Number, + required: true + }, + dataset_slug: { + type: String, + required: true + }, + web3storageCreatedAt: { + type: Date, + required: true + }, + web3storageUpdatedAt: { + type: Date, + required: true + }, + filename: { + type: String, + required: true + } +}, { + timestamps: false +}) + +// ////////////////////////////////////////////////////////////////////// Export +// ----------------------------------------------------------------------------- +module.exports = Mongoose.model('op_cids_temp', CIDtempSchema) diff --git a/packages/be/package.json b/packages/be/package.json index 6f4c4edf..698305dc 100644 --- a/packages/be/package.json +++ b/packages/be/package.json @@ -52,6 +52,7 @@ "socket.io": "^4.5.2", "socket.io-client": "^4.5.2", "uuid": "^8.3.1", - "uuid-apikey": "^1.5.1" + "uuid-apikey": "^1.5.1", + "workerpool": "^6.4.0" } } From 1359d3a93aea01a7bbf22bcd497eb3d373e857e1 Mon Sep 17 00:00:00 2001 From: svvimming Date: Fri, 5 May 2023 15:13:12 -0400 Subject: [PATCH 3/3] feat: console log statements and edge case handling --- packages/be/crons/cid-batch-import.js | 128 +++++++++++------ packages/be/crons/cid-importer.js | 133 +++++++++++------- .../open-panda-dataset-meta-bk__filter.txt | 5 + 3 files changed, 170 insertions(+), 96 deletions(-) create mode 100644 packages/be/crons/open-panda-dataset-meta-bk__filter.txt diff --git a/packages/be/crons/cid-batch-import.js b/packages/be/crons/cid-batch-import.js index fa4fbebf..295ebba7 100644 --- a/packages/be/crons/cid-batch-import.js +++ b/packages/be/crons/cid-batch-import.js @@ -46,10 +46,11 @@ try { // ///////////////////////////////////////////////////////////////////// Modules require('@Module_Database') require('@Module_Cidtemp') +const { GetFileFromDisk } = require('@Module_Utilities') // /////////////////////////////////////////////////////////////////// Functions // ------------------------------------------------------------ retrieveCidFiles -const retrieveCidFile = async (line, retryNo = 0) => { +const retrieveCidFile = async (line, batchNo, retryNo = 0) => { try { if (retryNo > 0) { console.log(`Retry number ${retryNo}`) @@ -61,34 +62,34 @@ const retrieveCidFile = async (line, retryNo = 0) => { }) // if a file already exists with this name in the temp folder, // delete it to make way for an updated version - await deleteTemporaryFile(CID_TMP_DIR, upload.name) + await deleteTemporaryFile(`batch_${batchNo}/${upload.name}`) // write file data to new zst file in the temp folder - await Pipeline(response.data, Fs.createWriteStream(`${CID_TMP_DIR}/${upload.name}`)) + await Pipeline(response.data, Fs.createWriteStream(`${CID_TMP_DIR}/batch_${batchNo}/${upload.name}`)) // unpack the zst and return the inner json - return await unpackRetrievedFile(CID_TMP_DIR, { + return await unpackRetrievedFile({ cid: upload.cid, name: upload.name, updated: upload.updated, created: upload.created - }) + }, batchNo) } catch (e) { - console.log('====================================== [Function: unpackCids]') - console.log(e) if (retryNo < 10) { console.log(`Error retrieving CID ${JSON.parse(line).cid}. Retrying retrieval...`) - await retrieveCidFile(CID_TMP_DIR, line, retryNo + 1) + await retrieveCidFile(line, batchNo, retryNo + 1) } else { const cid = JSON.parse(line).cid + await cacheFailedCID(cid) + console.log('==================================== [Function: unpackCids]') console.log(`Could not retrieve CID ${cid}. Max retries reached.`) - await cacheFailedCID(CID_TMP_DIR, cid) + console.log(e) } } } // --------------------------------------------------------------- unpackZstFile -const unpackZstFile = (file) => { +const unpackZstFile = (file, batchNo) => { return new Promise((resolve, reject) => { - const unzstd = Spawn('unzstd', [`../tmp/cid-files/${file.name}`]) + const unzstd = Spawn('unzstd', [`../tmp/cid-files/batch_${batchNo}/${file.name}`]) const errors = [] unzstd.stderr.on('data', (msg) => { errors.push(`Error unpacking ${file.name}: ${msg.toString()}`) @@ -104,13 +105,11 @@ const unpackZstFile = (file) => { } // --------------------------------------------------------- unpackRetrievedFile -const unpackRetrievedFile = async (file) => { +const unpackRetrievedFile = async (file, batchNo) => { try { const jsonFilename = file.name.substring(0, file.name.length - 4) - await deleteTemporaryFile(jsonFilename) - await unpackZstFile(file) - // await deleteTemporaryFile(file.name) // TODO : don't delete zst file in tmp directory - const json = await GetFileFromDisk(`${CID_TMP_DIR}/${jsonFilename}`, true) + await unpackZstFile(file, batchNo) + const json = await GetFileFromDisk(`${CID_TMP_DIR}/batch_${batchNo}/${jsonFilename}`, true) const fileMetadata = { piece_cid: json.piece_cid, payload_cid: json.payload_cid, @@ -120,7 +119,7 @@ const unpackRetrievedFile = async (file) => { web3storageCreatedAt: file.created, web3storageUpdatedAt: file.updated } - await deleteTemporaryFile(jsonFilename) // TODO : do delete unpacked json file in tmp directory + await deleteTemporaryFile(`batch_${batchNo}/${jsonFilename}`) return fileMetadata } catch (e) { console.log('============================ [Function: unpackRetrievedFiles]') @@ -139,10 +138,10 @@ const cacheFailedCID = async (cid) => { } // --------------------------------------------------------- deleteTemporaryFile -const deleteTemporaryFile = async (filename) => { +const deleteTemporaryFile = async (path) => { try { - if (Fs.existsSync(`${CID_TMP_DIR}/${filename}`)) { - Fs.unlinkSync(`${CID_TMP_DIR}/${filename}`) + if (Fs.existsSync(`${CID_TMP_DIR}/${path}`)) { + Fs.unlinkSync(`${CID_TMP_DIR}/${path}`) } } catch (e) { console.log('============================ [Function: deleteTemporaryFile ]') @@ -173,12 +172,35 @@ const writeBatchMetadataToDatabase = async (retrievedFiles) => { } } -// ------------------------------------------------ backupFilesToBackblazeBucket -const backupFilesToBackblazeBucket = async (retrievedFiles) => { +// ------------------------------------------------- backupCidsToBackblazeBucket +const backupCidsToBackblazeBucket = async (batchNo) => { try { - console.log('backup started') + const rclone = Spawn('rclone', [ + 'copy', + `${MC.tmpRoot}/cid-files/batch_${batchNo}`, + process.env.B2_OPENPANDA_BUCKET, + '--filter-from', + `${MC.packageRoot}/crons/open-panda-dataset-meta-bk__filter.txt` + // process.env.B2_OPENPANDA_FILTER + ]) + const errors = [] + for await (const msg of rclone.stderr) { + errors.push(msg.toString()) + } + return await new Promise((resolve, reject) => { + rclone.on('exit', (code) => { + const err = errors.length > 0 && code !== 0 + err ? reject({ + success: false, + message: errors.join('\n\n') + }) : resolve({ + success: true, + message: `✓ CID batch ${batchNo} backup successful` + }) + }) + }) } catch (e) { - console.log('==================== [Function: backupFilesToBackblazeBucket]') + console.log('===================== [Function: backupCidsToBackblazeBucket]') console.log(e) } } @@ -187,31 +209,45 @@ const backupFilesToBackblazeBucket = async (retrievedFiles) => { // -------------------------------------------------------- processManifestBatch const processManifestBatch = async (batch, batchNo) => { try { + // make a temporary subdirectory for this batch + if (!Fs.existsSync(`${CID_TMP_DIR}/batch_${batchNo}`)) { + Fs.mkdirSync(`${CID_TMP_DIR}/batch_${batchNo}`) + } + // individually download each CID file in the batch + // save zst to a temp/cid-files/batch_x folder, extract and return metadata + // to the retrieved array const len = batch.length - const uploaded = Math.floor(Math.random() * len) - const modified = len - uploaded - console.log(len, uploaded, modified) - // const retrievedFiles = [] - // for (let i = 0; i < len; i++) { - // const cidManifestItem = batch[i] - // const retrieved = await retrieveCidFile(cidManifestItem) - // if (retrieved) { retrievedFiles.push(retrieved) } - // } - // console.log(retrievedFiles) - // const databaseWriteResult = await writeBatchMetadataToDatabase(retrievedFiles) - // const batchBackupResult = await backupFilesToBackblazeBucket(retrievedFiles) - return new Promise((resolve, reject) => { - setTimeout(() => { - const result = new WorkerPool.Transfer({ - batch: batchNo, - databaseWriteResult: { - nUpserted: uploaded, - nModified: modified - }, - batchBackupResult: false + const retrievedFiles = [] + for (let i = 0; i < len; i++) { + const cidManifestItem = batch[i] + const retrieved = await retrieveCidFile(cidManifestItem, batchNo) + if (retrieved) { retrievedFiles.push(retrieved) } + } + if (!retrievedFiles.length) { + throw new Error('No CIDs could be retrieved from this batch') + } + // save batch metadata to the database + const databaseWriteResult = await writeBatchMetadataToDatabase(retrievedFiles) + // backup zst files in the corresponding temp/cid-files/batch_x folder to backblaze + const batchBackupResult = await backupCidsToBackblazeBucket(batchNo) + // if the backup is successful clean up temp folder by deleting batch + if (batchBackupResult && batchBackupResult.success) { + if (Fs.existsSync(`${CID_TMP_DIR}/batch_${batchNo}`)) { + Fs.rm(`${CID_TMP_DIR}/batch_${batchNo}`, { recursive: true, force: true }) + } + } + // return results to the main thread + return await new Promise((resolve, reject) => { + if (!databaseWriteResult || !batchBackupResult) { + reject() + } else { + const result = new WorkerPool.Transfer({ + batchNo: batchNo, + databaseWriteResult: databaseWriteResult, + batchBackupResult: batchBackupResult }) resolve(result) - }, 3000) + } }) } catch (e) { console.log('============================ [Function: processManifestBatch]') diff --git a/packages/be/crons/cid-importer.js b/packages/be/crons/cid-importer.js index a0b61b16..ecdf692a 100644 --- a/packages/be/crons/cid-importer.js +++ b/packages/be/crons/cid-importer.js @@ -31,8 +31,8 @@ const Pool = WorkerPool.pool(Path.resolve(__dirname, './cid-batch-import.js'), { maxWorkers: numThreads, workerType: 'thread' }) - let startTime +let manifestLength = 0 // ////////////////////////////////////////////////////////////////// Initialize MC.app = Express() @@ -63,42 +63,70 @@ require('@Module_Cidtemp') const { SecondsToHms } = require('@Module_Utilities') // /////////////////////////////////////////////////////////////////// Functions +// ------------------------------------------------------- getCurrentImportTotal +const getCurrentImportTotal = (results) => { + const len = results.length + let dbTotalImported = 0 + let dbTotalModified = 0 + for (let i = 0; i < len; i++) { + const result = results[i] + if (result && result.databaseWriteResult) { + const newlyImported = result.databaseWriteResult.nUpserted || 0 + const newlyModified = result.databaseWriteResult.nModified || 0 + dbTotalImported = dbTotalImported + newlyImported + dbTotalModified = dbTotalModified + newlyModified + } + } + return { + imported: dbTotalImported, + modified: dbTotalModified, + total: dbTotalImported + dbTotalModified + } +} + // --------------------------------------------------- performCidBatchOperations -const performCidBatchOperations = (manifest, batchNo, tasks, results) => { +const performCidBatchOperations = (manifest, batchNo, tasks, results, retryQueue, inRetryLoop) => { try { + if (inRetryLoop) { + console.log('The following batch previously failed and is being retried:') + } const batchSize = argv.pagesize || 1000 const batch = manifest.slice(0, batchSize) tasks.push( - Pool.exec('processManifestBatch', [batch, batchNo]).then((result) => { + Pool.exec('processManifestBatch', [batch, batchNo]).then(async (result) => { + if (!result || !result.databaseWriteResult || !result.batchBackupResult) { + throw new Error(`An issue occured with batch ${batchNo}`) + } results.push(result) - imported = result.databaseWriteResult.nUpserted + result.databaseWriteResult.nModified - console.log(`Batch ${result.batch} finished | ${imported} CIDs were imported to the database in this batch`) - }).catch((err) => { + const imported = `${result.databaseWriteResult.nUpserted + result.databaseWriteResult.nModified} CIDs were imported to the db in this batch` + const activeTasks = `${Pool.stats().activeTasks} batches currently being processed` + const pendingTasks = `${Pool.stats().pendingTasks} pending batches remaining` + const currentTotals = await getCurrentImportTotal(results) + console.log(`Batch ${result.batchNo} finished | ${imported} | ${result.batchBackupResult.message} | ${activeTasks} | ${pendingTasks} | ${currentTotals.total} of ${manifestLength} CIDs completed`) + }).catch((e) => { + retryQueue.push(batch) console.log('================================ Error returned by worker') - console.error(err) + console.error(e) }) ) manifest.splice(0, batchSize) if (manifest.length) { - performCidBatchOperations(manifest, ++batchNo, tasks, results) + performCidBatchOperations(manifest, ++batchNo, tasks, results, retryQueue, inRetryLoop) + } else if (retryQueue.length) { + performCidBatchOperations(retryQueue, 1, tasks, results, [], true) } else { Promise.all(tasks).catch((e) => { console.log('=========================== Error returned by worker pool') console.log(e) - }).then(() => { - Pool.terminate() + process.exit(0) + }).then(async () => { const len = results.length - console.log(results) - let dbTotalImported = 0 - let dbTotalModified = 0 - for (let i = 0; i < len; i++) { - const result = results[i] - dbTotalImported = dbTotalImported + result.databaseWriteResult.nUpserted - dbTotalModified = dbTotalModified + result.databaseWriteResult.nModified - } const endTime = process.hrtime()[0] + const finalResults = await getCurrentImportTotal(results) console.log(`📒 CID import finished | took ${SecondsToHms(endTime - startTime)}`) - console.log(`A total of ${dbTotalImported + dbTotalModified} CIDs were processed by the database | ${dbTotalImported} imported | ${dbTotalModified} modified`) + console.log(`A total of ${finalResults.total} CIDs were processed by the database | ${finalResults.imported} imported | ${finalResults.modified} modified` | `${finalResults.total} total CIDs successfully backed up`) + console.log(`${manifestLength - finalResults.total} CIDs either already existed in the database or could not be retrieved and have been cached for the next import.`) + Pool.terminate() process.exit(0) }) } @@ -123,8 +151,9 @@ const getCidFilesFromManifestList = async () => { } // reverse the array to import oldest CIDs first manifest.reverse() + manifestLength = manifest.length // pass manifest on to the worker pool - performCidBatchOperations(manifest, 1, [], []) + performCidBatchOperations(manifest, 1, [], [], [], false) } catch (e) { console.log('===================== [Function: getCidFilesFromManifestList]') console.log(e) @@ -184,36 +213,40 @@ const CidImporter = async () => { startTime = process.hrtime()[0] const limit = argv.pagesize || 1000 const maxPages = argv.maxpages || Infinity - // console.log(`📖 CID import started | page size of ${limit} and page maximum ${maxPages}.`) - // // Get the latest upload entry from the database - // // const mostRecentCid = await MC.model.Cidtemp.find().sort({ web3storageCreatedAt: -1 }).limit(1) // TODO : uncomment line and replace with override option - // const mostRecentDocument = false // mostRecentCid[0] - // console.log('Most recent CID imported:') - // console.log(mostRecentDocument) - // const lastSavedDate = mostRecentDocument ? new Date(mostRecentDocument.web3storageCreatedAt).getTime() : 0 - // // Delete the outdated manifest file if it exists - // // await deleteTemporaryFile('cid-manifest.txt') - // if (Fs.existsSync(`${CID_TMP_DIR}/cid-manifest.txt`)) { - // Fs.unlinkSync(`${CID_TMP_DIR}/cid-manifest.txt`) - // } - // /** - // * Build a manifest list of all cids not yet uploaded to the database: - // * args: - // * params passed to the initial api upload list request - // * limit number of pages retrieved. Set to Infinity to retrieve all CIDs since the most recently uploaded saved in the db - // * the most recent upload saved to our database (so as to request only more newer uploads) - // */ - // await createManifestFromWeb3StorageCids({ - // size: limit, - // page: 1, - // sortBy: 'Date', - // sortOrder: 'Desc' - // }, maxPages, lastSavedDate) - // /** - // * Retrieve and unpack files one by one from the manifest list and bulkWrite contents to the database - // * args: - // * limit number of entries to the database (this will only be used for test) - // */ + console.log(`📖 CID import started | page size of ${limit} and page maximum ${maxPages}.`) + // Get the latest upload entry from the database + const mostRecentCid = await MC.model.Cidtemp.find().sort({ web3storageCreatedAt: -1 }).limit(1) + let mostRecentDocument = mostRecentCid[0] + if (argv.all) { + mostRecentDocument = false + } else { + console.log('Most recent CID imported:') + console.log(mostRecentDocument) + } + const lastSavedDate = mostRecentDocument ? new Date(mostRecentDocument.web3storageCreatedAt).getTime() : 0 + // Delete the outdated manifest file if it exists + // await deleteTemporaryFile('cid-manifest.txt') + if (Fs.existsSync(`${CID_TMP_DIR}/cid-manifest.txt`)) { + Fs.unlinkSync(`${CID_TMP_DIR}/cid-manifest.txt`) + } + /** + * Build a manifest list of all cids not yet uploaded to the database: + * args: + * params passed to the initial api upload list request + * limit number of pages retrieved. Set to Infinity to retrieve all CIDs since the most recently uploaded saved in the db + * the most recent upload saved to our database (so as to request only more newer uploads) + */ + await createManifestFromWeb3StorageCids({ + size: limit, + page: 1, + sortBy: 'Date', + sortOrder: 'Desc' + }, maxPages, lastSavedDate) + /** + * Retrieve and unpack files one by one from the manifest list and bulkWrite contents to the database + * args: + * limit number of entries to the database (this will only be used for test) + */ await getCidFilesFromManifestList() } catch (e) { console.log('===================================== [Function: CidImporter]') diff --git a/packages/be/crons/open-panda-dataset-meta-bk__filter.txt b/packages/be/crons/open-panda-dataset-meta-bk__filter.txt new file mode 100644 index 00000000..1e854a9d --- /dev/null +++ b/packages/be/crons/open-panda-dataset-meta-bk__filter.txt @@ -0,0 +1,5 @@ +# Filtering rules (Arrange the order of filter rules with the most restrictive first and work down.) ++ *.zst + +# Exclude everything else +- **