Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
579 changes: 574 additions & 5 deletions package-lock.json

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
"simple-statistics": "^7.8.3",
"swagger-jsdoc": "^6.2.8",
"swagger-ui-express": "^5.0.0",
"utf-8-validate": "^5.0.10"
"utf-8-validate": "^5.0.10",
"winston": "^3.13.0",
"winston-loki": "^6.1.2"
}
}
34 changes: 16 additions & 18 deletions src/ECGNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import { spawn } from 'node:child_process';
import { NodeConfig } from './model/NodeConfig';
import { GetNodeConfig, sleep } from './utils/Utils';
import * as dotenv from 'dotenv';
import { Log } from './utils/Logger';
import logger from './utils/Logger';
import { LoadConfiguration } from './config/Config';
dotenv.config();

async function main() {
process.title = 'ECG_NODE';
Log(`[ECG-NODE] STARTED FOR MARKET_ID: ${MARKET_ID}`);
logger.debug(`[ECG-NODE] STARTED FOR MARKET_ID: ${MARKET_ID}`);
if (!fs.existsSync(path.join(DATA_DIR))) {
fs.mkdirSync(path.join(DATA_DIR), { recursive: true });
}
Expand Down Expand Up @@ -47,45 +47,43 @@ function isDebug() {

async function startProcessors(nodeConfig: NodeConfig) {
if (nodeConfig.processors.AUCTION_BIDDER.enabled) {
startWithSpawn('AuctionBidder');
startWithSpawn('AuctionBidder', 'ECG_NODE_AUCTION_BIDDER');
await sleep(5000);
}
if (nodeConfig.processors.LOAN_CALLER.enabled) {
startWithSpawn('LoanCaller');
startWithSpawn('LoanCaller', 'ECG_NODE_LOAN_CALLER');
await sleep(5000);
}
if (nodeConfig.processors.TERM_OFFBOARDER.enabled) {
startWithSpawn('TermOffboarder');
startWithSpawn('TermOffboarder', 'ECG_NODE_TERM_OFFBOARDER');
await sleep(5000);
}
if (nodeConfig.processors.USER_SLASHER.enabled) {
startWithSpawn('UserSlasher');
startWithSpawn('UserSlasher', 'ECG_NODE_USER_SLASHER');
await sleep(5000);
}
// if (nodeConfig.processors.TERM_ONBOARDING_WATCHER.enabled) {
// startWithSpawn('TermOnboardingWatcher');
// await sleep(5000);
// }
if (nodeConfig.processors.TESTNET_MARKET_MAKER.enabled) {
startWithSpawn('TestnetMarketMaker');
startWithSpawn('TestnetMarketMaker', 'ECG_NODE_TESTNET_MARKET_MAKER');
await sleep(5000);
}
if (nodeConfig.processors.HISTORICAL_DATA_FETCHER.enabled) {
startWithSpawn('HistoricalDataFetcher');
startWithSpawn('HistoricalDataFetcher', 'ECG_NODE_HISTORICAL_DATA_FETCHER');
await sleep(5000);
}
}

function startWithSpawn(processorName: string) {
function startWithSpawn(processorName: string, appName: string) {
const nodeProcessFullPath = path.join(process.cwd(), 'processors', `${processorName}.js`);
Log(`Starting ${nodeProcessFullPath}`);
const child = spawn('node', [nodeProcessFullPath], { stdio: 'inherit' });
logger.debug(`Starting ${nodeProcessFullPath}`);
const updatedEnv = structuredClone(process.env);
updatedEnv.APP_NAME = appName;
const child = spawn('node', [nodeProcessFullPath], { stdio: 'inherit', env: updatedEnv });

child.on('close', (code) => {
Log(`Child process exited with code ${code}. Restarting after 10sec`);
setTimeout(() => startWithSpawn(processorName), 10000);
logger.debug(`Child process exited with code ${code}. Restarting after 10sec`);
setTimeout(() => startWithSpawn(processorName, appName), 10000);
});

Log(`Started ${nodeProcessFullPath}`);
logger.debug(`Started ${nodeProcessFullPath}`);
}
main();
6 changes: 3 additions & 3 deletions src/api/Api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import swaggerUi from 'swagger-ui-express';
import swaggerJsdoc from 'swagger-jsdoc';

import dotenv from 'dotenv';
import { Log } from '../utils/Logger';
import { LoadTokens } from '../config/Config';
import logger from '../utils/Logger';
dotenv.config();
const port = process.env.API_PORT || 17777;

Expand Down Expand Up @@ -54,14 +54,14 @@ app.use('/api/markets/', marketDataRoutes);
app.use('/api/protocol/', protocolDataRoutes);

app.listen(port, () => {
Log(`⚡️[server]: Server is running. See doc: http://localhost:${port}/api-docs`);
logger.debug(`⚡️[server]: Server is running. See doc: http://localhost:${port}/api-docs`);
});

process.on('SIGINT', cleanup);
process.on('SIGTERM', cleanup);

async function cleanup() {
// do cleanup if needed
Log('shutdown requested');
logger.debug('shutdown requested');
process.exit();
}
4 changes: 2 additions & 2 deletions src/api/middlewares/LoggerMiddleware.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Request, Response, NextFunction } from 'express';
import { Log } from '../../utils/Logger';
import logger from '../../utils/Logger';

const loggerMiddleware = (req: Request, _: Response, next: NextFunction) => {
Log(`[${new Date().toISOString()}] ${req.method} ${req.originalUrl}`);
logger.debug(`[${new Date().toISOString()}] ${req.method} ${req.originalUrl}`);
next();
};

Expand Down
1 change: 0 additions & 1 deletion src/api/routes/ProtocolDataRoutes.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import express, { Request, Response } from 'express';
import SimpleCacheService from '../../services/cache/CacheService';
import MarketDataController from '../controllers/MarketDataController';
import ProtocolDataController from '../controllers/ProtocolDataController';

const router = express.Router();
Expand Down
28 changes: 14 additions & 14 deletions src/datafetch/ECGDataFetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { GetDeployBlock } from '../config/Config';
import { ReadJSON, WriteJSON } from '../utils/Utils';
import { GetWeb3Provider } from '../utils/Web3Helper';
import { FileMutex } from '../utils/FileMutex';
import { Log } from '../utils/Logger';
import logger from '../utils/Logger';
import { SendNotifications } from '../utils/Notifications';
import ProtocolDataFetcher from './fetchers/ProtocolDataFetcher';
import LendingTermsFetcher from './fetchers/LendingTermsFetcher';
Expand All @@ -26,36 +26,36 @@ export async function FetchECGData() {
const dtStart = performance.now();
const web3Provider = GetWeb3Provider();
const currentBlock = await web3Provider.getBlockNumber();
Log(`FetchECGData: fetching data up to block ${currentBlock}`);
logger.info(`FetchECGData: fetching data up to block ${currentBlock}`);

const syncData: SyncData = getSyncData();
Log('FetchECGData: start fetching');
logger.debug('FetchECGData: start fetching');
let fetchStart = performance.now();
const protocolData = await ProtocolDataFetcher.fetchAndSaveProtocolData(web3Provider);
Log(`FetchECGData: protocol data took: ${(performance.now() - fetchStart).toFixed(1)} ms`);
logger.debug(`FetchECGData: protocol data took: ${(performance.now() - fetchStart).toFixed(1)} ms`);
fetchStart = performance.now();
const terms = await LendingTermsFetcher.fetchAndSaveTerms(web3Provider, currentBlock);
Log(`FetchECGData: terms data took: ${(performance.now() - fetchStart).toFixed(1)} ms`);
logger.debug(`FetchECGData: terms data took: ${(performance.now() - fetchStart).toFixed(1)} ms`);
fetchStart = performance.now();
const loans = await LoansFetcher.fetchAndSaveLoans(web3Provider, terms, syncData, currentBlock);
Log(`FetchECGData: loan data took: ${(performance.now() - fetchStart).toFixed(1)} ms`);
logger.debug(`FetchECGData: loan data took: ${(performance.now() - fetchStart).toFixed(1)} ms`);
fetchStart = performance.now();
const gauges = await GaugesFetcher.fetchAndSaveGauges(web3Provider, syncData, currentBlock);
Log(`FetchECGData: gauges data took: ${(performance.now() - fetchStart).toFixed(1)} ms`);
logger.debug(`FetchECGData: gauges data took: ${(performance.now() - fetchStart).toFixed(1)} ms`);
fetchStart = performance.now();
const auctions = await AuctionsFetcher.fetchAndSaveAuctions(web3Provider, terms, syncData, currentBlock);
Log(`FetchECGData: auctions data took: ${(performance.now() - fetchStart).toFixed(1)} ms`);
logger.debug(`FetchECGData: auctions data took: ${(performance.now() - fetchStart).toFixed(1)} ms`);
fetchStart = performance.now();
const auctionsHouses = await AuctionsFetcher.fetchAndSaveAuctionHouses(web3Provider, terms);
Log(`FetchECGData: auction house data took: ${(performance.now() - fetchStart).toFixed(1)} ms`);
logger.debug(`FetchECGData: auction house data took: ${(performance.now() - fetchStart).toFixed(1)} ms`);
fetchStart = performance.now();
const proposals = await TermsProposalFetcher.fetchProposals(web3Provider, syncData, currentBlock);
Log(`FetchECGData: fetchProposals data took: ${(performance.now() - fetchStart).toFixed(1)} ms`);
logger.debug(`FetchECGData: fetchProposals data took: ${(performance.now() - fetchStart).toFixed(1)} ms`);
WriteJSON(path.join(DATA_DIR, 'sync.json'), syncData);
const durationMs = performance.now() - dtStart;
Log(`FetchECGData: finished fetching. Fetch duration: ${durationMs.toFixed(1)} ms`);
logger.info(`FetchECGData: finished fetching. Fetch duration: ${durationMs.toFixed(1)} ms`);
} catch (e) {
Log('FetchECGData: unknown failure', e);
logger.error('FetchECGData: unknown failure', e);
lastFetch = 0;
await SendNotifications('Data Fetcher', 'Unknown exception when fetching data', JSON.stringify(e));
} finally {
Expand Down Expand Up @@ -86,9 +86,9 @@ function getSyncData() {

export async function FetchIfTooOld() {
if (lastFetch + SECONDS_BETWEEN_FETCHES * 1000 > Date.now()) {
Log('FetchIfTooOld: no fetch needed');
logger.debug('FetchIfTooOld: no fetch needed');
} else {
Log('FetchIfTooOld: start fetching data');
logger.debug('FetchIfTooOld: start fetching data');
await FetchECGData();
}
}
32 changes: 13 additions & 19 deletions src/datafetch/EventProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
import { EventData, EventQueue } from '../utils/EventQueue';
import { buildTxUrl, sleep } from '../utils/Utils';
import { FetchECGData } from './ECGDataFetcher';
import { SendNotifications, SendNotificationsSpam } from '../utils/Notifications';
import { Log, Warn } from '../utils/Logger';
import logger from '../utils/Logger';
import { StartEventListener } from './EventWatcher';
import { MARKET_ID } from '../utils/Constants';
import { GuildToken__factory, LendingTerm__factory } from '../contracts/types';
import { GetWeb3Provider } from '../utils/Web3Helper';
import { GetGuildTokenAddress } from '../config/Config';

let lastBlockFetched = 0;
export async function StartEventProcessor() {
Log('Started the event processor');
logger.debug('Started the event processor');

// eslint-disable-next-line no-constant-condition
while (true) {
Expand All @@ -21,16 +16,17 @@ export async function StartEventProcessor() {
await ProcessAsync(event);
}
} else {
// Log('EventProcessor: sleeping');
// logger.debug('EventProcessor: sleeping');
await sleep(1000);
}
}
}

async function ProcessAsync(event: EventData) {
Log(`NEW EVENT DETECTED AT BLOCK ${event.block}: ${event.eventName}`);
logger.debug(`NEW EVENT DETECTED AT BLOCK ${event.block}: ${event.eventName}`);
if (mustUpdateProtocol(event)) {
if (lastBlockFetched < event.block) {
logger.info(`Important event ${event.eventName} detected at block ${event.block}. Restarting FetchECGData`);
await FetchECGData();
lastBlockFetched = event.block;

Expand All @@ -40,9 +36,7 @@ async function ProcessAsync(event: EventData) {
}
}

const msg = 'Updated backend data\n' + `Tx: ${buildTxUrl(event.txHash)}`;

Log(msg);
logger.debug('Updated backend data\n' + `Tx: ${buildTxUrl(event.txHash)}`);
}
}

Expand All @@ -69,7 +63,7 @@ function mustUpdateProtocol(event: EventData): boolean {
function guildTokenMustUpdate(event: EventData): boolean {
switch (event.eventName.toLowerCase()) {
default:
Log(`GuildToken ${event.eventName} is not important`);
logger.debug(`GuildToken ${event.eventName} is not important`);
return false;
case 'addgauge':
case 'removegauge':
Expand All @@ -82,40 +76,40 @@ function guildTokenMustUpdate(event: EventData): boolean {
function lendingTermMustUpdate(event: EventData): boolean {
switch (event.eventName.toLowerCase()) {
default:
Log(`LendingTerm ${event.eventName} is not important`);
logger.debug(`LendingTerm ${event.eventName} is not important`);
return false;
case 'loanopen':
case 'loanaddcollateral':
case 'loanpartialrepay':
case 'loanclose':
case 'loancall':
case 'setauctionhouse':
Log(`LendingTerm ${event.eventName} must force an update`);
logger.debug(`LendingTerm ${event.eventName} must force an update`);
return true;
}
}

function termFactoryMustUpdate(event: EventData): boolean {
switch (event.eventName.toLowerCase()) {
default:
Log(`TermFactory ${event.eventName} is not important`);
logger.debug(`TermFactory ${event.eventName} is not important`);
return false;
case 'termcreated':
Log(`TermFactory ${event.eventName} must force an update`);
logger.debug(`TermFactory ${event.eventName} must force an update`);
return true;
}
}

function onboardingMustUpdate(event: EventData): boolean {
switch (event.eventName.toLowerCase()) {
default:
Log(`Onboarding ${event.eventName} is not important`);
logger.debug(`Onboarding ${event.eventName} is not important`);
return false;
// case 'proposalexecuted': // dont check proposal executed as it will add a gauge anyway which is already fetched
case 'proposalcreated':
case 'proposalqueued':
case 'proposalcanceled':
Log(`Onboarding ${event.eventName} must force an update`);
logger.debug(`Onboarding ${event.eventName} must force an update`);
return true;
}
}
Expand Down
Loading