diff --git a/src/config/btc.sample.conf b/src/config/btc.sample.conf index f5eb3e405..55aed91cd 100644 --- a/src/config/btc.sample.conf +++ b/src/config/btc.sample.conf @@ -125,6 +125,11 @@ REQUEST_TIMEOUT = 30000 # Request timeout in milliseconds MAX_REQUESTS_PER_SECOND = 50 # Rate limit: max requests per second per client MAX_SUBSCRIPTIONS = 10 # Maximum subscriptions per client (blocks, epochs) +[API.MEMPOOL] +MAX_ADDRESSES = 20 # Maximum number of addresses per query +DEFAULT_LIMIT = 25 # Default number of transactions returned +MAX_LIMIT = 100 # Maximum number of transactions a client can request + [DATABASE] HOST = "127.0.0.1" # Database host PORT = 25485 diff --git a/src/protocols/OPNetAPIProtocol.proto b/src/protocols/OPNetAPIProtocol.proto index ffb669ad2..aa70840c9 100644 --- a/src/protocols/OPNetAPIProtocol.proto +++ b/src/protocols/OPNetAPIProtocol.proto @@ -599,6 +599,7 @@ message SubmitEpochResponse { enum SubscriptionTypeEnum { BLOCKS = 0; EPOCHS = 1; + MEMPOOL = 2; } message SubscribeBlocksRequest { @@ -645,3 +646,117 @@ message NewEpochNotification { string epochNumber = 2; string epochHash = 3; } + +// ============================================================================ +// Mempool Methods +// ============================================================================ + +// Request: retrieve aggregate mempool statistics (no parameters required). +message GetMempoolInfoRequest { + uint32 requestId = 1; +} + +// Response: aggregate mempool statistics. +message GetMempoolInfoResponse { + // Total number of pending transactions in the mempool. + uint32 count = 1; + // Number of pending OPNet-specific transactions. + uint32 opnetCount = 2; + // Total byte size of the mempool. + uint64 size = 3; +} + +// Request: fetch a single pending mempool transaction by hash. +message GetPendingTransactionRequest { + uint32 requestId = 1; + // The 64-character hex transaction hash. + string hash = 2; +} + +// A single transaction output as exposed by the mempool API. +message MempoolTransactionOutput { + // Destination address, or empty for unspendable outputs. + string address = 1; + // The vout index within the transaction. + uint32 outputIndex = 2; + // Output value in satoshis (decimal string). + string value = 3; + // Hex-encoded scriptPubKey. + string scriptPubKey = 4; +} + +// A single transaction input as exposed by the mempool API. +message MempoolTransactionInput { + // The txid of the referenced output. + string transactionId = 1; + // The vout index of the referenced output. + uint32 outputIndex = 2; +} + +// Full representation of a pending mempool transaction. +message PendingTransactionResponse { + // Internal transaction identifier (txid). + string id = 1; + // ISO-8601 timestamp of when the transaction was first seen. + string firstSeen = 2; + // Block height at which the transaction was observed (0x-prefixed hex). + string blockHeight = 3; + // Theoretical gas limit for OPNet execution (0x-prefixed hex). + string theoreticalGasLimit = 4; + // Priority fee attached to the transaction (0x-prefixed hex). + string priorityFee = 5; + // Whether this transaction targets an OPNet contract. + bool isOPNet = 6; + // Whether the transaction was submitted as a PSBT. + bool psbt = 7; + // The transaction inputs. + repeated MempoolTransactionInput inputs = 8; + // The transaction outputs. + repeated MempoolTransactionOutput outputs = 9; + // The full raw transaction as a hex string. + string raw = 10; +} + +// Request: fetch the latest pending transactions with optional address filter. +message GetLatestPendingTransactionsRequest { + uint32 requestId = 1; + // A single address to auto-resolve into all derived wallet address types. + string address = 2; + // Explicit list of addresses to filter by. + repeated string addresses = 3; + // Maximum number of transactions to return. + uint32 limit = 4; +} + +// Response: list of latest pending mempool transactions. +message LatestPendingTransactionsResponse { + // The matching pending transactions. + repeated PendingTransactionResponse transactions = 1; +} + +// Mempool Subscription + +// Request: subscribe to new mempool transaction notifications. +message SubscribeMempoolRequest { + uint32 requestId = 1; +} + +// Response: mempool subscription confirmation. +message SubscribeMempoolResponse { + // Server-assigned subscription identifier. + uint32 subscriptionId = 1; + // The subscription type (MEMPOOL). + SubscriptionTypeEnum type = 2; +} + +// Server push: a new transaction entered the mempool. +message NewMempoolTransactionNotification { + // The subscription identifier this notification belongs to. + uint32 subscriptionId = 1; + // The txid of the new mempool transaction. + string txId = 2; + // Whether this transaction targets an OPNet contract. + bool isOPNet = 3; + // Unix timestamp in milliseconds when the transaction was observed. + uint64 timestamp = 4; +} diff --git a/src/src/api/enums/Routes.ts b/src/src/api/enums/Routes.ts index 1abf8e8e2..7af595670 100644 --- a/src/src/api/enums/Routes.ts +++ b/src/src/api/enums/Routes.ts @@ -35,6 +35,11 @@ export enum Routes { SUBMIT_EPOCH = 'epoch/submit', EPOCH_TEMPLATE = 'epoch/template', + /** Mempool */ + MEMPOOL_INFO = 'mempool/info', + MEMPOOL_TRANSACTION = 'mempool/transaction', + MEMPOOL_TRANSACTIONS = 'mempool/transactions', + /** Other */ PROTOBUF_SCHEMA = 'protobuf/schema', PROTOBUF_API_SCHEMA = 'protobuf/api-schema', diff --git a/src/src/api/json-rpc/routes/JSONRpcRoute.ts b/src/src/api/json-rpc/routes/JSONRpcRoute.ts index 7789e1311..86a17545c 100644 --- a/src/src/api/json-rpc/routes/JSONRpcRoute.ts +++ b/src/src/api/json-rpc/routes/JSONRpcRoute.ts @@ -40,4 +40,9 @@ export const JSONRpcRouteMethods: JSONRpcRoute = { [JSONRpcMethods.GET_EPOCH_BY_HASH]: Routes.EPOCH_BY_HASH, [JSONRpcMethods.GET_EPOCH_TEMPLATE]: Routes.EPOCH_TEMPLATE, [JSONRpcMethods.SUBMIT_EPOCH]: Routes.SUBMIT_EPOCH, + + /** Mempool */ + [JSONRpcMethods.GET_MEMPOOL_INFO]: Routes.MEMPOOL_INFO, + [JSONRpcMethods.GET_PENDING_TRANSACTION]: Routes.MEMPOOL_TRANSACTION, + [JSONRpcMethods.GET_LATEST_PENDING_TRANSACTIONS]: Routes.MEMPOOL_TRANSACTIONS, }; diff --git a/src/src/api/json-rpc/types/enums/JSONRpcMethods.ts b/src/src/api/json-rpc/types/enums/JSONRpcMethods.ts index b4d84fa40..132cc525f 100644 --- a/src/src/api/json-rpc/types/enums/JSONRpcMethods.ts +++ b/src/src/api/json-rpc/types/enums/JSONRpcMethods.ts @@ -37,6 +37,15 @@ export enum JSONRpcMethods { GET_EPOCH_TEMPLATE = 'btc_getEpochTemplate', SUBMIT_EPOCH = 'btc_submitEpoch', + /** Mempool */ + + /** Retrieve aggregate mempool statistics (count, OPNet count, byte size). */ + GET_MEMPOOL_INFO = 'btc_getMempoolInfo', + /** Fetch a single pending mempool transaction by its hash. */ + GET_PENDING_TRANSACTION = 'btc_getPendingTransaction', + /** Fetch the latest pending mempool transactions, optionally filtered by address(es). */ + GET_LATEST_PENDING_TRANSACTIONS = 'btc_getLatestPendingTransactions', + /** Simulation */ CALL = 'btc_call', } diff --git a/src/src/api/json-rpc/types/interfaces/params/mempool/GetLatestPendingTransactionsParams.ts b/src/src/api/json-rpc/types/interfaces/params/mempool/GetLatestPendingTransactionsParams.ts new file mode 100644 index 000000000..5f19b55c8 --- /dev/null +++ b/src/src/api/json-rpc/types/interfaces/params/mempool/GetLatestPendingTransactionsParams.ts @@ -0,0 +1,21 @@ +import { JSONRpcMethods } from '../../../enums/JSONRpcMethods.js'; +import { JSONRpcParams } from '../../JSONRpcParams.js'; + +/** Object-form parameters for {@link JSONRpcMethods.GET_LATEST_PENDING_TRANSACTIONS}. */ +export interface GetLatestPendingTransactionsParamsAsObject + extends JSONRpcParams { + /** A single address to auto-resolve into all derived wallet address types. */ + readonly address?: string; + /** Explicit list of addresses to filter mempool transactions by. */ + readonly addresses?: string[]; + /** Maximum number of transactions to return. Clamped to `Config.API.MEMPOOL.MAX_LIMIT`. */ + readonly limit?: number; +} + +/** Array-form parameters: `[address?, addresses?, limit?]`. */ +export type GetLatestPendingTransactionsParamsAsArray = [string?, string[]?, number?]; + +/** Accepted parameter shapes for `btc_getLatestPendingTransactions`. */ +export type GetLatestPendingTransactionsParams = + | GetLatestPendingTransactionsParamsAsObject + | GetLatestPendingTransactionsParamsAsArray; diff --git a/src/src/api/json-rpc/types/interfaces/params/mempool/GetMempoolInfoParams.ts b/src/src/api/json-rpc/types/interfaces/params/mempool/GetMempoolInfoParams.ts new file mode 100644 index 000000000..865015737 --- /dev/null +++ b/src/src/api/json-rpc/types/interfaces/params/mempool/GetMempoolInfoParams.ts @@ -0,0 +1,12 @@ +import { JSONRpcMethods } from '../../../enums/JSONRpcMethods.js'; +import { JSONRpcParams } from '../../JSONRpcParams.js'; + +/** Object-form parameters for {@link JSONRpcMethods.GET_MEMPOOL_INFO} (none required). */ +export interface GetMempoolInfoParamsAsObject + extends JSONRpcParams {} + +/** Array-form parameters (empty). */ +export type GetMempoolInfoParamsAsArray = []; + +/** Accepted parameter shapes for `btc_getMempoolInfo`. */ +export type GetMempoolInfoParams = GetMempoolInfoParamsAsObject | GetMempoolInfoParamsAsArray; diff --git a/src/src/api/json-rpc/types/interfaces/params/mempool/GetPendingTransactionParams.ts b/src/src/api/json-rpc/types/interfaces/params/mempool/GetPendingTransactionParams.ts new file mode 100644 index 000000000..0badec8f3 --- /dev/null +++ b/src/src/api/json-rpc/types/interfaces/params/mempool/GetPendingTransactionParams.ts @@ -0,0 +1,17 @@ +import { JSONRpcMethods } from '../../../enums/JSONRpcMethods.js'; +import { JSONRpcParams } from '../../JSONRpcParams.js'; + +/** Object-form parameters for {@link JSONRpcMethods.GET_PENDING_TRANSACTION}. */ +export interface GetPendingTransactionParamsAsObject + extends JSONRpcParams { + /** The 64-character hex transaction hash. */ + readonly hash: string; +} + +/** Array-form parameters: `[hash]`. */ +export type GetPendingTransactionParamsAsArray = [string]; + +/** Accepted parameter shapes for `btc_getPendingTransaction`. */ +export type GetPendingTransactionParams = + | GetPendingTransactionParamsAsObject + | GetPendingTransactionParamsAsArray; diff --git a/src/src/api/json-rpc/types/interfaces/results/mempool/GetLatestPendingTransactionsResult.ts b/src/src/api/json-rpc/types/interfaces/results/mempool/GetLatestPendingTransactionsResult.ts new file mode 100644 index 000000000..832fdd845 --- /dev/null +++ b/src/src/api/json-rpc/types/interfaces/results/mempool/GetLatestPendingTransactionsResult.ts @@ -0,0 +1,14 @@ +import { JSONRpcMethods } from '../../../enums/JSONRpcMethods.js'; +import { JSONRpc2ResultData } from '../../JSONRpc2ResultData.js'; +import { MempoolTransactionData } from './MempoolTransactionData.js'; + +/** Payload shape for the `btc_getLatestPendingTransactions` result. */ +export interface GetLatestPendingTransactionsResultData { + /** The list of pending mempool transactions matching the query. */ + readonly transactions: MempoolTransactionData[]; +} + +/** Result type for the `btc_getLatestPendingTransactions` JSON-RPC method. */ +export type GetLatestPendingTransactionsResult = + JSONRpc2ResultData & + GetLatestPendingTransactionsResultData; diff --git a/src/src/api/json-rpc/types/interfaces/results/mempool/GetMempoolInfoResult.ts b/src/src/api/json-rpc/types/interfaces/results/mempool/GetMempoolInfoResult.ts new file mode 100644 index 000000000..979d4a1bf --- /dev/null +++ b/src/src/api/json-rpc/types/interfaces/results/mempool/GetMempoolInfoResult.ts @@ -0,0 +1,16 @@ +import { JSONRpcMethods } from '../../../enums/JSONRpcMethods.js'; +import { JSONRpc2ResultData } from '../../JSONRpc2ResultData.js'; + +/** Core mempool statistics returned by `btc_getMempoolInfo`. */ +export interface MempoolInfoData { + /** Total number of pending transactions in the mempool. */ + readonly count: number; + /** Number of pending OPNet-specific transactions in the mempool. */ + readonly opnetCount: number; + /** Total byte size of the mempool. */ + readonly size: number; +} + +/** Result type for the `btc_getMempoolInfo` JSON-RPC method. */ +export type GetMempoolInfoResult = JSONRpc2ResultData & + MempoolInfoData; diff --git a/src/src/api/json-rpc/types/interfaces/results/mempool/GetPendingTransactionResult.ts b/src/src/api/json-rpc/types/interfaces/results/mempool/GetPendingTransactionResult.ts new file mode 100644 index 000000000..4e102ec74 --- /dev/null +++ b/src/src/api/json-rpc/types/interfaces/results/mempool/GetPendingTransactionResult.ts @@ -0,0 +1,7 @@ +import { JSONRpcMethods } from '../../../enums/JSONRpcMethods.js'; +import { JSONRpc2ResultData } from '../../JSONRpc2ResultData.js'; +import { MempoolTransactionData } from './MempoolTransactionData.js'; + +/** Result type for the `btc_getPendingTransaction` JSON-RPC method. */ +export type GetPendingTransactionResult = + JSONRpc2ResultData & MempoolTransactionData; diff --git a/src/src/api/json-rpc/types/interfaces/results/mempool/MempoolTransactionData.ts b/src/src/api/json-rpc/types/interfaces/results/mempool/MempoolTransactionData.ts new file mode 100644 index 000000000..169fcdbec --- /dev/null +++ b/src/src/api/json-rpc/types/interfaces/results/mempool/MempoolTransactionData.ts @@ -0,0 +1,43 @@ +/** A single transaction input as exposed by the mempool API. */ +export interface MempoolTransactionInput { + /** The txid of the referenced output. */ + readonly transactionId: string; + /** The vout index of the referenced output. */ + readonly outputIndex: number; +} + +/** A single transaction output as exposed by the mempool API. */ +export interface MempoolTransactionOutput { + /** The destination address, or `null` for unspendable outputs. */ + readonly address: string | null; + /** The vout index within the transaction. */ + readonly outputIndex: number; + /** The output value in satoshis (decimal string). */ + readonly value: string; + /** The hex-encoded scriptPubKey. */ + readonly scriptPubKey: string; +} + +/** Full representation of a pending mempool transaction returned by the API. */ +export interface MempoolTransactionData { + /** Internal transaction identifier (txid). */ + readonly id: string; + /** ISO-8601 timestamp of when the transaction was first seen. */ + readonly firstSeen: string; + /** Block height at which the transaction was observed (`0x`-prefixed hex). */ + readonly blockHeight: string; + /** Theoretical gas limit for OPNet execution (`0x`-prefixed hex). */ + readonly theoreticalGasLimit: string; + /** Priority fee attached to the transaction (`0x`-prefixed hex). */ + readonly priorityFee: string; + /** Whether this transaction targets an OPNet contract. */ + readonly isOPNet: boolean; + /** Whether the transaction was submitted as a PSBT. */ + readonly psbt: boolean; + /** The transaction inputs. */ + readonly inputs: MempoolTransactionInput[]; + /** The transaction outputs. */ + readonly outputs: MempoolTransactionOutput[]; + /** The full raw transaction as a hex string. */ + readonly raw: string; +} diff --git a/src/src/api/routes/DefinedRoutes.ts b/src/src/api/routes/DefinedRoutes.ts index dcb468c17..5c5d1c535 100644 --- a/src/src/api/routes/DefinedRoutes.ts +++ b/src/src/api/routes/DefinedRoutes.ts @@ -28,6 +28,9 @@ import { EpochByHash } from './api/v1/epochs/EpochByHash.js'; import { SubmitEpochRoute } from './api/v1/epochs/SubmitEpochRoute.js'; import { GetEpochTemplateRoute } from './api/v1/epochs/GetEpochTemplateRoute.js'; import { BlockByChecksum } from './api/v1/block/BlockByChecksum.js'; +import { GetMempoolInfo } from './api/v1/mempool/GetMempoolInfo.js'; +import { GetPendingTransaction } from './api/v1/mempool/GetPendingTransaction.js'; +import { GetLatestPendingTransactions } from './api/v1/mempool/GetLatestPendingTransactions.js'; export const DefinedRoutes: { [key in Routes]: Route; @@ -67,6 +70,11 @@ export const DefinedRoutes: { [Routes.BROADCAST_TRANSACTION]: new BroadcastTransaction(), [Routes.TRANSACTION_PREIMAGE]: new GetPreimage(), + /** Mempool */ + [Routes.MEMPOOL_INFO]: new GetMempoolInfo(), + [Routes.MEMPOOL_TRANSACTION]: new GetPendingTransaction(), + [Routes.MEMPOOL_TRANSACTIONS]: new GetLatestPendingTransactions(), + /** Others */ [Routes.PROTOBUF_SCHEMA]: new ProtobufSchema(), [Routes.PROTOBUF_API_SCHEMA]: new ProtobufAPISchema(), diff --git a/src/src/api/routes/api/v1/mempool/GetLatestPendingTransactions.ts b/src/src/api/routes/api/v1/mempool/GetLatestPendingTransactions.ts new file mode 100644 index 000000000..bec74b2ab --- /dev/null +++ b/src/src/api/routes/api/v1/mempool/GetLatestPendingTransactions.ts @@ -0,0 +1,207 @@ +import { Request } from '@btc-vision/hyper-express/types/components/http/Request.js'; +import { Response } from '@btc-vision/hyper-express/types/components/http/Response.js'; +import { MiddlewareNext } from '@btc-vision/hyper-express/types/components/middleware/MiddlewareNext.js'; +import { Config } from '../../../../../config/Config.js'; +import { Routes, RouteType } from '../../../../enums/Routes.js'; +import { JSONRpcMethods } from '../../../../json-rpc/types/enums/JSONRpcMethods.js'; +import { GetLatestPendingTransactionsParams } from '../../../../json-rpc/types/interfaces/params/mempool/GetLatestPendingTransactionsParams.js'; +import { GetLatestPendingTransactionsResult } from '../../../../json-rpc/types/interfaces/results/mempool/GetLatestPendingTransactionsResult.js'; +import { + PublicKeyInfo, + IPubKeyNotFoundError, +} from '../../../../json-rpc/types/interfaces/results/address/PublicKeyInfoResult.js'; +import { Route } from '../../../Route.js'; +import { MempoolTransactionConverter } from './MempoolTransactionConverter.js'; + +/** + * Route handler for fetching the latest pending transactions from the mempool. + * + * @remarks + * Supports optional address-based filtering and pagination via the `limit` parameter. + * When a single address is supplied, it is auto-resolved to all derived wallet address + * types (p2tr, p2op, p2pkh, p2wpkh, p2sh-p2wpkh) so the caller does not need to know + * the full set of addresses associated with a key. + * + * Limits are governed by {@link Config.API.MEMPOOL}. + */ +export class GetLatestPendingTransactions extends Route< + Routes.MEMPOOL_TRANSACTIONS, + JSONRpcMethods.GET_LATEST_PENDING_TRANSACTIONS, + GetLatestPendingTransactionsResult +> { + constructor() { + super(Routes.MEMPOOL_TRANSACTIONS, RouteType.GET); + } + + /** + * Retrieves the latest pending mempool transactions, optionally filtered by address(es). + * + * @param params - Optional filter / pagination parameters. + * @returns The matching pending transactions. + * @throws If storage is not initialised or the address count exceeds the configured maximum. + */ + public async getData( + params?: GetLatestPendingTransactionsParams, + ): Promise { + if (!this.storage) { + throw new Error('Storage not initialized'); + } + + const decoded = this.getDecodedParams(params); + let addresses: string[] | undefined = decoded.addresses; + const limit = Math.max(1, Math.min(decoded.limit, Config.API.MEMPOOL.MAX_LIMIT)); + + // If a single address is provided, auto-resolve all address types + if (decoded.address && !addresses) { + addresses = await this.resolveAddresses(decoded.address); + } + + if (addresses && addresses.length > Config.API.MEMPOOL.MAX_ADDRESSES) { + throw new Error(`Too many addresses. Maximum is ${Config.API.MEMPOOL.MAX_ADDRESSES}.`); + } + + const txs = await this.storage.getLatestPendingTransactions(addresses, limit); + + return { + transactions: MempoolTransactionConverter.convertMany(txs), + }; + } + + /** {@inheritDoc Route.getDataRPC} */ + public async getDataRPC( + params?: GetLatestPendingTransactionsParams, + ): Promise { + return await this.getData(params); + } + + protected initialize(): void {} + + /** + * GET /api/v1/mempool/transactions + * @tag Mempool + * @summary Get latest pending transactions + * @description Get the latest pending transactions from the mempool. Optionally filter by address. + * @queryParam {string} [address] - A single address to auto-resolve all wallet address types. + * @queryParam {string} [addresses] - Comma-separated list of addresses to filter by. + * @queryParam {number} [limit] - Maximum number of results (default 25, max 100). + * @response 200 - Returns the latest pending transactions. + * @response 400 - Something went wrong. + * @response default - Unexpected error + * @responseContent {GetLatestPendingTransactionsResultData} 200.application/json + */ + protected async onRequest(_req: Request, res: Response, _next?: MiddlewareNext): Promise { + try { + const params = this.getParams(_req, res); + if (!params) { + return; + } + + const data = await this.getData(params); + + if (data) { + this.safeJson(res, 200, data); + } else { + this.safeJson(res, 400, { error: 'Could not fetch pending transactions.' }); + } + } catch (err) { + this.handleDefaultError(res, err as Error); + } + } + + /** + * Extracts query-string parameters from an HTTP request. + * + * @param req - The incoming HTTP request. + * @param _res - The HTTP response (unused). + * @returns Parsed parameters, or `undefined` when invalid. + */ + protected getParams( + req: Request, + _res: Response, + ): GetLatestPendingTransactionsParams | undefined { + const address = req.query.address as string | undefined; + const addressesStr = req.query.addresses as string | undefined; + const limitStr = req.query.limit as string | undefined; + + const addresses = addressesStr + ? addressesStr.split(',').filter((a) => a.length > 0) + : undefined; + const parsed = limitStr ? parseInt(limitStr, 10) : undefined; + const limit = parsed && !isNaN(parsed) ? parsed : undefined; + + return { + address, + addresses, + limit, + }; + } + + /** + * Resolves a single address to all known wallet address types via public-key lookup. + * + * @param address - The address to resolve. + * @returns Deduplicated array of all derived addresses (including the original). + */ + private async resolveAddresses(address: string): Promise { + if (!this.storage) { + throw new Error('Storage not initialized'); + } + + const result = await this.storage.getAddressOrPublicKeysInformation([address]); + const allAddresses: string[] = [address]; + + for (const key of Object.keys(result)) { + const info = result[key]; + if (this.isPubKeyNotFoundError(info)) { + continue; + } + + const pubKeyInfo = info as PublicKeyInfo; + if (pubKeyInfo.p2tr) allAddresses.push(pubKeyInfo.p2tr); + if (pubKeyInfo.p2op) allAddresses.push(pubKeyInfo.p2op); + if (pubKeyInfo.p2pkh) allAddresses.push(pubKeyInfo.p2pkh); + if (pubKeyInfo.p2wpkh) allAddresses.push(pubKeyInfo.p2wpkh); + if (pubKeyInfo.p2shp2wpkh) allAddresses.push(pubKeyInfo.p2shp2wpkh); + } + + // Deduplicate + return [...new Set(allAddresses)]; + } + + /** Type-guard that narrows a public-key result to the error variant. */ + private isPubKeyNotFoundError( + info: PublicKeyInfo | IPubKeyNotFoundError, + ): info is IPubKeyNotFoundError { + return 'error' in info; + } + + /** + * Normalises both array-style and object-style RPC parameters into a uniform shape. + * + * @param params - Raw RPC parameters (array or object form). + * @returns Normalised parameter object with a guaranteed `limit` value. + */ + private getDecodedParams(params?: GetLatestPendingTransactionsParams): { + address?: string; + addresses?: string[]; + limit: number; + } { + if (!params) { + return { limit: Config.API.MEMPOOL.DEFAULT_LIMIT }; + } + + if (Array.isArray(params)) { + return { + address: params[0] ?? undefined, + addresses: params[1] ?? undefined, + limit: params[2] ?? Config.API.MEMPOOL.DEFAULT_LIMIT, + }; + } + + return { + address: params.address, + addresses: params.addresses, + limit: params.limit ?? Config.API.MEMPOOL.DEFAULT_LIMIT, + }; + } +} diff --git a/src/src/api/routes/api/v1/mempool/GetMempoolInfo.ts b/src/src/api/routes/api/v1/mempool/GetMempoolInfo.ts new file mode 100644 index 000000000..be7a2109d --- /dev/null +++ b/src/src/api/routes/api/v1/mempool/GetMempoolInfo.ts @@ -0,0 +1,75 @@ +import { Request } from '@btc-vision/hyper-express/types/components/http/Request.js'; +import { Response } from '@btc-vision/hyper-express/types/components/http/Response.js'; +import { MiddlewareNext } from '@btc-vision/hyper-express/types/components/middleware/MiddlewareNext.js'; +import { Routes, RouteType } from '../../../../enums/Routes.js'; +import { JSONRpcMethods } from '../../../../json-rpc/types/enums/JSONRpcMethods.js'; +import { GetMempoolInfoParams } from '../../../../json-rpc/types/interfaces/params/mempool/GetMempoolInfoParams.js'; +import { GetMempoolInfoResult } from '../../../../json-rpc/types/interfaces/results/mempool/GetMempoolInfoResult.js'; +import { Route } from '../../../Route.js'; + +/** + * Route handler that returns live mempool statistics (total count, OPNet count, byte size). + */ +export class GetMempoolInfo extends Route< + Routes.MEMPOOL_INFO, + JSONRpcMethods.GET_MEMPOOL_INFO, + GetMempoolInfoResult +> { + constructor() { + super(Routes.MEMPOOL_INFO, RouteType.GET); + } + + /** + * Fetches aggregate mempool statistics from storage. + * + * @param _params - Unused; the endpoint accepts no parameters. + * @returns Mempool info containing total count, OPNet count, and byte size. + * @throws If storage is not initialised. + */ + public async getData(_params?: GetMempoolInfoParams): Promise { + if (!this.storage) { + throw new Error('Storage not initialized'); + } + + const info = await this.storage.getMempoolInfo(); + + return { + count: info.count, + opnetCount: info.opnetCount, + size: info.size, + }; + } + + /** {@inheritDoc Route.getDataRPC} */ + public async getDataRPC( + _params?: GetMempoolInfoParams, + ): Promise { + return await this.getData(); + } + + protected initialize(): void {} + + /** + * GET /api/v1/mempool/info + * @tag Mempool + * @summary Get mempool information + * @description Returns live mempool statistics including transaction count and OPNet breakdown. + * @response 200 - Returns mempool info. + * @response 400 - Something went wrong. + * @response default - Unexpected error + * @responseContent {MempoolInfoData} 200.application/json + */ + protected async onRequest(_req: Request, res: Response, _next?: MiddlewareNext): Promise { + try { + const data = await this.getData(); + + if (data) { + this.safeJson(res, 200, data); + } else { + this.safeJson(res, 400, { error: 'Could not fetch mempool info.' }); + } + } catch (err) { + this.handleDefaultError(res, err as Error); + } + } +} diff --git a/src/src/api/routes/api/v1/mempool/GetPendingTransaction.ts b/src/src/api/routes/api/v1/mempool/GetPendingTransaction.ts new file mode 100644 index 000000000..b9e219ea6 --- /dev/null +++ b/src/src/api/routes/api/v1/mempool/GetPendingTransaction.ts @@ -0,0 +1,131 @@ +import { Request } from '@btc-vision/hyper-express/types/components/http/Request.js'; +import { Response } from '@btc-vision/hyper-express/types/components/http/Response.js'; +import { MiddlewareNext } from '@btc-vision/hyper-express/types/components/middleware/MiddlewareNext.js'; +import { Routes, RouteType } from '../../../../enums/Routes.js'; +import { JSONRpcMethods } from '../../../../json-rpc/types/enums/JSONRpcMethods.js'; +import { GetPendingTransactionParams } from '../../../../json-rpc/types/interfaces/params/mempool/GetPendingTransactionParams.js'; +import { GetPendingTransactionResult } from '../../../../json-rpc/types/interfaces/results/mempool/GetPendingTransactionResult.js'; +import { Route } from '../../../Route.js'; +import { MempoolTransactionConverter } from './MempoolTransactionConverter.js'; + +/** + * Route handler that retrieves a single pending mempool transaction by its hash. + */ +export class GetPendingTransaction extends Route< + Routes.MEMPOOL_TRANSACTION, + JSONRpcMethods.GET_PENDING_TRANSACTION, + GetPendingTransactionResult +> { + constructor() { + super(Routes.MEMPOOL_TRANSACTION, RouteType.GET); + } + + /** + * Fetches a single pending transaction from the mempool. + * + * @param params - Must contain the 64-character hex transaction hash. + * @returns The transaction data converted to the API response shape. + * @throws If storage is not initialised, the hash is invalid, or the transaction is not found. + */ + public async getData( + params: GetPendingTransactionParams, + ): Promise { + if (!this.storage) { + throw new Error('Storage not initialized'); + } + + const [hash] = this.getDecodedParams(params); + if (hash.length !== 64) throw new Error(`Invalid hash length: ${hash.length}`); + + const tx = await this.storage.getMempoolTransaction(hash); + if (!tx) { + throw new Error(`Pending transaction ${hash} not found.`); + } + + return MempoolTransactionConverter.convert(tx) as GetPendingTransactionResult; + } + + /** {@inheritDoc Route.getDataRPC} */ + public async getDataRPC( + params: GetPendingTransactionParams, + ): Promise { + return await this.getData(params); + } + + protected initialize(): void {} + + /** + * GET /api/v1/mempool/transaction + * @tag Mempool + * @summary Get a pending transaction by hash + * @description Get the detailed information of a pending mempool transaction by its hash. + * @queryParam {string} hash - The hash of the transaction. + * @response 200 - Returns the pending transaction details. + * @response 400 - Something went wrong. + * @response default - Unexpected error + * @responseContent {MempoolTransactionData} 200.application/json + */ + protected async onRequest(_req: Request, res: Response, _next?: MiddlewareNext): Promise { + try { + const params = this.getParams(_req, res); + if (!params) { + return; + } + + const data = await this.getData(params); + + if (data) { + this.safeJson(res, 200, data); + } else { + this.safeJson(res, 400, { error: 'Pending transaction not found.' }); + } + } catch (err) { + this.handleDefaultError(res, err as Error); + } + } + + /** + * Extracts and validates the transaction hash from the HTTP query string. + * + * @param req - The incoming HTTP request. + * @param res - The HTTP response (used for error replies). + * @returns Parsed parameters, or `undefined` when the hash is missing / invalid. + */ + protected getParams(req: Request, res: Response): GetPendingTransactionParams | undefined { + if (!req.query) { + throw new Error('Invalid params.'); + } + + const hash = req.query.hash as string; + + if (!hash || (hash && hash.length !== 64)) { + this.safeJson(res, 400, { error: 'Invalid hash.' }); + return; + } + + return { + hash, + }; + } + + /** + * Normalises both array-style and object-style RPC parameters. + * + * @param params - Raw RPC parameters. + * @returns A single-element tuple containing the validated hash string. + * @throws If the hash is missing. + */ + private getDecodedParams(params: GetPendingTransactionParams): [string] { + let hash: string | undefined; + + if (Array.isArray(params)) { + hash = params.shift(); + } else { + hash = params.hash; + } + + if (!hash) throw new Error(`Invalid hash.`); + + return [hash]; + } +} diff --git a/src/src/api/routes/api/v1/mempool/MempoolTransactionConverter.ts b/src/src/api/routes/api/v1/mempool/MempoolTransactionConverter.ts new file mode 100644 index 000000000..3175bdae4 --- /dev/null +++ b/src/src/api/routes/api/v1/mempool/MempoolTransactionConverter.ts @@ -0,0 +1,47 @@ +import { toHex } from '@btc-vision/bitcoin'; +import { IMempoolTransactionObj } from '../../../../../db/interfaces/IMempoolTransaction.js'; +import { MempoolTransactionData } from '../../../../json-rpc/types/interfaces/results/mempool/MempoolTransactionData.js'; + +/** + * Converts raw mempool database objects into the API-facing {@link MempoolTransactionData} shape. + */ +export class MempoolTransactionConverter { + /** + * Converts a single mempool transaction from its database representation to the API format. + * + * @param tx - The raw mempool transaction object from storage. + * @returns A serialisable API response object with hex-encoded numeric fields. + */ + public static convert(tx: IMempoolTransactionObj): MempoolTransactionData { + return { + id: tx.id, + firstSeen: tx.firstSeen ? tx.firstSeen.toISOString() : new Date(0).toISOString(), + blockHeight: '0x' + tx.blockHeight.toString(16), + theoreticalGasLimit: '0x' + tx.theoreticalGasLimit.toString(16), + priorityFee: '0x' + tx.priorityFee.toString(16), + isOPNet: tx.isOPNet, + psbt: tx.psbt, + inputs: tx.inputs.map((input) => ({ + transactionId: input.transactionId, + outputIndex: input.outputIndex, + })), + outputs: tx.outputs.map((output) => ({ + address: output.address, + outputIndex: output.outputIndex, + value: output.value.toString(), + scriptPubKey: toHex(output.data), + })), + raw: toHex(tx.data), + }; + } + + /** + * Batch-converts an array of mempool transactions. + * + * @param txs - The raw mempool transaction objects from storage. + * @returns An array of API-formatted transaction objects. + */ + public static convertMany(txs: IMempoolTransactionObj[]): MempoolTransactionData[] { + return txs.map(MempoolTransactionConverter.convert); + } +} diff --git a/src/src/api/routes/api/v1/transaction/BroadcastTransaction.ts b/src/src/api/routes/api/v1/transaction/BroadcastTransaction.ts index 7c44caf3c..33cddf94a 100644 --- a/src/src/api/routes/api/v1/transaction/BroadcastTransaction.ts +++ b/src/src/api/routes/api/v1/transaction/BroadcastTransaction.ts @@ -20,6 +20,7 @@ import { BroadcastOPNetRequest } from '../../../../../threading/interfaces/threa import { TransactionSizeValidator } from '../../../../../poc/mempool/data-validator/TransactionSizeValidator.js'; import { Config } from '../../../../../config/Config.js'; import { fromBase64, fromHex, Transaction } from '@btc-vision/bitcoin'; +import { WSManager } from '../../../../websocket/WebSocketManager.js'; export class BroadcastTransaction extends Route< Routes.BROADCAST_TRANSACTION, @@ -97,10 +98,17 @@ export class BroadcastTransaction extends Route< }; } - return { + const mergedResult = { ...(verification as BroadcastTransactionResult), ...(result as BroadcastTransactionResult), } as BroadcastTransactionResult; + + // Notify mempool subscribers of the new transaction + if (mergedResult.success && mergedResult.result) { + WSManager.onMempoolTransaction(mergedResult.result, true); + } + + return mergedResult; } return verification; diff --git a/src/src/api/websocket/OpcodeRegistry.ts b/src/src/api/websocket/OpcodeRegistry.ts index 96c97f56e..448a1e162 100644 --- a/src/src/api/websocket/OpcodeRegistry.ts +++ b/src/src/api/websocket/OpcodeRegistry.ts @@ -375,6 +375,32 @@ export class OpcodeRegistry extends APIProtobufLoader { WebSocketResponseOpcode.EPOCH_SUBMIT_RESULT, ); + // Mempool + this.packetBuilders[APIPacketType.GetMempoolInfoRequest] = this.createPacket( + APIPacketType.GetMempoolInfoRequest, + WebSocketRequestOpcode.GET_MEMPOOL_INFO, + ); + this.packetBuilders[APIPacketType.GetMempoolInfoResponse] = this.createPacket( + APIPacketType.GetMempoolInfoResponse, + WebSocketResponseOpcode.MEMPOOL_INFO, + ); + this.packetBuilders[APIPacketType.GetPendingTransactionRequest] = this.createPacket( + APIPacketType.GetPendingTransactionRequest, + WebSocketRequestOpcode.GET_PENDING_TRANSACTION, + ); + this.packetBuilders[APIPacketType.PendingTransactionResponse] = this.createPacket( + APIPacketType.PendingTransactionResponse, + WebSocketResponseOpcode.PENDING_TRANSACTION, + ); + this.packetBuilders[APIPacketType.GetLatestPendingTransactionsRequest] = this.createPacket( + APIPacketType.GetLatestPendingTransactionsRequest, + WebSocketRequestOpcode.GET_LATEST_PENDING_TRANSACTIONS, + ); + this.packetBuilders[APIPacketType.LatestPendingTransactionsResponse] = this.createPacket( + APIPacketType.LatestPendingTransactionsResponse, + WebSocketResponseOpcode.LATEST_PENDING_TRANSACTIONS, + ); + // Subscriptions this.packetBuilders[APIPacketType.SubscribeBlocksRequest] = this.createPacket( APIPacketType.SubscribeBlocksRequest, @@ -392,6 +418,14 @@ export class OpcodeRegistry extends APIProtobufLoader { APIPacketType.SubscribeEpochsResponse, WebSocketResponseOpcode.SUBSCRIPTION_CREATED, ); + this.packetBuilders[APIPacketType.SubscribeMempoolRequest] = this.createPacket( + APIPacketType.SubscribeMempoolRequest, + WebSocketRequestOpcode.SUBSCRIBE_MEMPOOL, + ); + this.packetBuilders[APIPacketType.SubscribeMempoolResponse] = this.createPacket( + APIPacketType.SubscribeMempoolResponse, + WebSocketResponseOpcode.SUBSCRIPTION_CREATED, + ); this.packetBuilders[APIPacketType.UnsubscribeRequest] = this.createPacket( APIPacketType.UnsubscribeRequest, WebSocketRequestOpcode.UNSUBSCRIBE, @@ -410,6 +444,10 @@ export class OpcodeRegistry extends APIProtobufLoader { APIPacketType.NewEpochNotification, WebSocketResponseOpcode.NEW_EPOCH_NOTIFICATION, ); + this.packetBuilders[APIPacketType.NewMempoolTransactionNotification] = this.createPacket( + APIPacketType.NewMempoolTransactionNotification, + WebSocketResponseOpcode.NEW_MEMPOOL_TX_NOTIFICATION, + ); // Register response packets for (const packet of Object.values(this.packetBuilders)) { @@ -546,6 +584,26 @@ export class OpcodeRegistry extends APIProtobufLoader { WebSocketResponseOpcode.PREIMAGE, ); + // Mempool + register( + WebSocketRequestOpcode.GET_MEMPOOL_INFO, + APIPacketType.GetMempoolInfoRequest, + APIPacketType.GetMempoolInfoResponse, + WebSocketResponseOpcode.MEMPOOL_INFO, + ); + register( + WebSocketRequestOpcode.GET_PENDING_TRANSACTION, + APIPacketType.GetPendingTransactionRequest, + APIPacketType.PendingTransactionResponse, + WebSocketResponseOpcode.PENDING_TRANSACTION, + ); + register( + WebSocketRequestOpcode.GET_LATEST_PENDING_TRANSACTIONS, + APIPacketType.GetLatestPendingTransactionsRequest, + APIPacketType.LatestPendingTransactionsResponse, + WebSocketResponseOpcode.LATEST_PENDING_TRANSACTIONS, + ); + // Addresses register( WebSocketRequestOpcode.GET_BALANCE, @@ -646,6 +704,12 @@ export class OpcodeRegistry extends APIProtobufLoader { APIPacketType.SubscribeEpochsResponse, WebSocketResponseOpcode.SUBSCRIPTION_CREATED, ); + register( + WebSocketRequestOpcode.SUBSCRIBE_MEMPOOL, + APIPacketType.SubscribeMempoolRequest, + APIPacketType.SubscribeMempoolResponse, + WebSocketResponseOpcode.SUBSCRIPTION_CREATED, + ); register( WebSocketRequestOpcode.UNSUBSCRIBE, APIPacketType.UnsubscribeRequest, diff --git a/src/src/api/websocket/WebSocketManager.ts b/src/src/api/websocket/WebSocketManager.ts index 20146e778..5fcad4334 100644 --- a/src/src/api/websocket/WebSocketManager.ts +++ b/src/src/api/websocket/WebSocketManager.ts @@ -441,6 +441,53 @@ export class WebSocketManager extends Logger { } } + /** + * Broadcasts a new mempool transaction notification to all subscribed WebSocket clients. + * + * @param txId - The txid of the transaction that entered the mempool. + * @param isOPNet - Whether the transaction targets an OPNet contract. + */ + public onMempoolTransaction(txId: string, isOPNet: boolean): void { + if (!this.enabled) { + return; + } + + const notificationPacket = APIRegistry.getPacketBuilder( + APIPacketType.NewMempoolTransactionNotification, + ); + if (!notificationPacket) { + return; + } + + const notification = { + subscriptionId: 0, // Will be set per-client + txId, + isOPNet, + timestamp: BigInt(Date.now()), + }; + + for (const client of this.clients.values()) { + if (!client.isHandshakeCompleted()) { + continue; + } + + // Check if client has mempool subscription + for (const [subId, sub] of client.getSubscriptions()) { + if (sub.type === SubscriptionType.MEMPOOL) { + notification.subscriptionId = subId; + try { + const packed = notificationPacket.pack(notification); + client.send(packed); + } catch (error) { + this.error( + `Failed to send mempool notification to ${client.clientId}: ${error}`, + ); + } + } + } + } + } + /** * Broadcast a message to all connected and handshaked clients */ diff --git a/src/src/api/websocket/handlers/HandlerRegistry.ts b/src/src/api/websocket/handlers/HandlerRegistry.ts index c10451190..070196f62 100644 --- a/src/src/api/websocket/handlers/HandlerRegistry.ts +++ b/src/src/api/websocket/handlers/HandlerRegistry.ts @@ -33,6 +33,9 @@ import { GetEpochTemplateRequest, GetGasRequest, GetLatestEpochRequest, + GetLatestPendingTransactionsWsRequest, + GetMempoolInfoWsRequest, + GetPendingTransactionWsRequest, GetPreimageRequest, GetPublicKeyInfoRequest, GetReorgRequest, @@ -43,6 +46,7 @@ import { SubmitEpochRequest, SubscribeBlocksRequest, SubscribeEpochsRequest, + SubscribeMempoolWsRequest, UnsubscribeRequest, } from '../types/requests/WebSocketRequestTypes.js'; @@ -71,6 +75,9 @@ import { EpochByHash } from '../../routes/api/v1/epochs/EpochByHash.js'; import { GetEpochTemplateRoute } from '../../routes/api/v1/epochs/GetEpochTemplateRoute.js'; import { SubmitEpochRoute } from '../../routes/api/v1/epochs/SubmitEpochRoute.js'; import { SubmissionStatus } from '../../json-rpc/types/interfaces/results/epochs/SubmittedEpochResult.js'; +import { GetMempoolInfo } from '../../routes/api/v1/mempool/GetMempoolInfo.js'; +import { GetPendingTransaction } from '../../routes/api/v1/mempool/GetPendingTransaction.js'; +import { GetLatestPendingTransactions } from '../../routes/api/v1/mempool/GetLatestPendingTransactions.js'; /** * Converts bigint to string for protobuf serialization (uint64 needs special handling) @@ -265,6 +272,7 @@ export class HandlerRegistry extends Logger { public registerAll(): void { this.registerBlockHandlers(); this.registerTransactionHandlers(); + this.registerMempoolHandlers(); this.registerAddressHandlers(); this.registerChainHandlers(); this.registerStateHandlers(); @@ -455,6 +463,50 @@ export class HandlerRegistry extends Logger { ); } + /** Registers WebSocket handlers for mempool query opcodes (info, single tx, latest txs). */ + private registerMempoolHandlers(): void { + // GET_MEMPOOL_INFO + APIRegistry.registerHandler( + WebSocketRequestOpcode.GET_MEMPOOL_INFO, + async (_request: PackedMessage) => { + const route = DefinedRoutes[Routes.MEMPOOL_INFO] as GetMempoolInfo; + const result = await route.getData(); + + return { + count: result.count, + opnetCount: result.opnetCount, + size: BigInt(result.size), + }; + }, + ); + + // GET_PENDING_TRANSACTION + APIRegistry.registerHandler( + WebSocketRequestOpcode.GET_PENDING_TRANSACTION, + async (request: PackedMessage) => { + const route = DefinedRoutes[Routes.MEMPOOL_TRANSACTION] as GetPendingTransaction; + const result = await route.getData({ hash: request.hash }); + + return result; + }, + ); + + // GET_LATEST_PENDING_TRANSACTIONS + APIRegistry.registerHandler( + WebSocketRequestOpcode.GET_LATEST_PENDING_TRANSACTIONS, + async (request: PackedMessage) => { + const route = DefinedRoutes[Routes.MEMPOOL_TRANSACTIONS] as GetLatestPendingTransactions; + const result = await route.getData({ + address: request.address || undefined, + addresses: request.addresses?.length ? request.addresses : undefined, + limit: request.limit || undefined, + }); + + return result; + }, + ); + } + private registerAddressHandlers(): void { // GET_BALANCE APIRegistry.registerHandler( @@ -808,6 +860,36 @@ export class HandlerRegistry extends Logger { }, ); + // SUBSCRIBE_MEMPOOL + APIRegistry.registerHandler( + WebSocketRequestOpcode.SUBSCRIBE_MEMPOOL, + ( + _request: PackedMessage, + _requestId: number, + clientId: string, + ) => { + const client = WSManager.getClient(clientId); + if (!client) { + throw new WebSocketAPIError(InternalError.INTERNAL_ERROR); + } + + // Check if already subscribed + if (client.hasSubscription(SubscriptionType.MEMPOOL)) { + throw new WebSocketAPIError(ResourceError.SUBSCRIPTION_ALREADY_EXISTS); + } + + const subscriptionId = client.addSubscription(SubscriptionType.MEMPOOL); + if (subscriptionId === null) { + throw new WebSocketAPIError(ResourceError.MAX_SUBSCRIPTIONS_REACHED); + } + + return { + subscriptionId, + type: SubscriptionType.MEMPOOL, + }; + }, + ); + // UNSUBSCRIBE APIRegistry.registerHandler( WebSocketRequestOpcode.UNSUBSCRIBE, diff --git a/src/src/api/websocket/packets/types/APIPacketTypes.ts b/src/src/api/websocket/packets/types/APIPacketTypes.ts index b11650601..e5ad31763 100644 --- a/src/src/api/websocket/packets/types/APIPacketTypes.ts +++ b/src/src/api/websocket/packets/types/APIPacketTypes.ts @@ -66,17 +66,37 @@ export enum APIPacketType { SubmitEpochRequest = 'SubmitEpochRequest', SubmitEpochResponse = 'SubmitEpochResponse', + // Mempool + /** Request: aggregate mempool statistics. */ + GetMempoolInfoRequest = 'GetMempoolInfoRequest', + /** Response: aggregate mempool statistics. */ + GetMempoolInfoResponse = 'GetMempoolInfoResponse', + /** Request: single pending transaction by hash. */ + GetPendingTransactionRequest = 'GetPendingTransactionRequest', + /** Response: single pending mempool transaction. */ + PendingTransactionResponse = 'PendingTransactionResponse', + /** Request: latest pending transactions with optional address filter. */ + GetLatestPendingTransactionsRequest = 'GetLatestPendingTransactionsRequest', + /** Response: list of latest pending mempool transactions. */ + LatestPendingTransactionsResponse = 'LatestPendingTransactionsResponse', + // Subscriptions SubscribeBlocksRequest = 'SubscribeBlocksRequest', SubscribeBlocksResponse = 'SubscribeBlocksResponse', SubscribeEpochsRequest = 'SubscribeEpochsRequest', SubscribeEpochsResponse = 'SubscribeEpochsResponse', + /** Request: subscribe to new mempool transaction notifications. */ + SubscribeMempoolRequest = 'SubscribeMempoolRequest', + /** Response: mempool subscription confirmation. */ + SubscribeMempoolResponse = 'SubscribeMempoolResponse', UnsubscribeRequest = 'UnsubscribeRequest', UnsubscribeResponse = 'UnsubscribeResponse', // Notifications NewBlockNotification = 'NewBlockNotification', NewEpochNotification = 'NewEpochNotification', + /** Server push notification: a new transaction entered the mempool. */ + NewMempoolTransactionNotification = 'NewMempoolTransactionNotification', // Common types (for internal use) BlockIdentifier = 'BlockIdentifier', diff --git a/src/src/api/websocket/types/enums/SubscriptionType.ts b/src/src/api/websocket/types/enums/SubscriptionType.ts index ec746e5aa..872568426 100644 --- a/src/src/api/websocket/types/enums/SubscriptionType.ts +++ b/src/src/api/websocket/types/enums/SubscriptionType.ts @@ -8,6 +8,9 @@ export enum SubscriptionType { /** Subscribe to new epoch notifications */ EPOCHS = 1, + + /** Subscribe to new mempool transaction notifications */ + MEMPOOL = 2, } /** @@ -19,6 +22,8 @@ export function getSubscriptionTypeName(type: SubscriptionType): string { return 'BLOCKS'; case SubscriptionType.EPOCHS: return 'EPOCHS'; + case SubscriptionType.MEMPOOL: + return 'MEMPOOL'; default: return 'UNKNOWN'; } diff --git a/src/src/api/websocket/types/opcodes/WebSocketOpcodes.ts b/src/src/api/websocket/types/opcodes/WebSocketOpcodes.ts index 522e8a379..ce4ae153b 100644 --- a/src/src/api/websocket/types/opcodes/WebSocketOpcodes.ts +++ b/src/src/api/websocket/types/opcodes/WebSocketOpcodes.ts @@ -21,6 +21,14 @@ export enum WebSocketRequestOpcode { BROADCAST_TRANSACTION = 0x22, GET_PREIMAGE = 0x23, + // Mempool Methods (0x24 - 0x2F) + /** Request aggregate mempool statistics. */ + GET_MEMPOOL_INFO = 0x24, + /** Request a single pending transaction by hash. */ + GET_PENDING_TRANSACTION = 0x25, + /** Request the latest pending transactions (with optional address filter). */ + GET_LATEST_PENDING_TRANSACTIONS = 0x26, + // Address Methods (0x30 - 0x3F) GET_BALANCE = 0x30, GET_UTXOS = 0x31, @@ -45,6 +53,8 @@ export enum WebSocketRequestOpcode { // Subscription Methods (0x70 - 0x7F) SUBSCRIBE_BLOCKS = 0x70, SUBSCRIBE_EPOCHS = 0x71, + /** Subscribe to new mempool transaction notifications. */ + SUBSCRIBE_MEMPOOL = 0x72, UNSUBSCRIBE = 0x7f, } @@ -71,6 +81,12 @@ export enum WebSocketResponseOpcode { TRANSACTION_RECEIPT = 0xa1, BROADCAST_RESULT = 0xa2, PREIMAGE = 0xa3, + /** Response containing aggregate mempool statistics. */ + MEMPOOL_INFO = 0xa4, + /** Response containing a single pending mempool transaction. */ + PENDING_TRANSACTION = 0xa5, + /** Response containing the latest pending mempool transactions. */ + LATEST_PENDING_TRANSACTIONS = 0xa6, // Address Method Responses (0xB0 - 0xBF) BALANCE = 0xb0, @@ -98,6 +114,8 @@ export enum WebSocketResponseOpcode { // Server Push Notifications NEW_BLOCK_NOTIFICATION = 0xf8, NEW_EPOCH_NOTIFICATION = 0xf9, + /** Server push: a new transaction entered the mempool. */ + NEW_MEMPOOL_TX_NOTIFICATION = 0xfa, } /** @@ -126,6 +144,10 @@ export const RequestToResponseOpcode: Readonly< [WebSocketRequestOpcode.BROADCAST_TRANSACTION]: WebSocketResponseOpcode.BROADCAST_RESULT, [WebSocketRequestOpcode.GET_PREIMAGE]: WebSocketResponseOpcode.PREIMAGE, + [WebSocketRequestOpcode.GET_MEMPOOL_INFO]: WebSocketResponseOpcode.MEMPOOL_INFO, + [WebSocketRequestOpcode.GET_PENDING_TRANSACTION]: WebSocketResponseOpcode.PENDING_TRANSACTION, + [WebSocketRequestOpcode.GET_LATEST_PENDING_TRANSACTIONS]: WebSocketResponseOpcode.LATEST_PENDING_TRANSACTIONS, + [WebSocketRequestOpcode.GET_BALANCE]: WebSocketResponseOpcode.BALANCE, [WebSocketRequestOpcode.GET_UTXOS]: WebSocketResponseOpcode.UTXOS, [WebSocketRequestOpcode.GET_PUBLIC_KEY_INFO]: WebSocketResponseOpcode.PUBLIC_KEY_INFO, @@ -145,6 +167,7 @@ export const RequestToResponseOpcode: Readonly< [WebSocketRequestOpcode.SUBSCRIBE_BLOCKS]: WebSocketResponseOpcode.SUBSCRIPTION_CREATED, [WebSocketRequestOpcode.SUBSCRIBE_EPOCHS]: WebSocketResponseOpcode.SUBSCRIPTION_CREATED, + [WebSocketRequestOpcode.SUBSCRIBE_MEMPOOL]: WebSocketResponseOpcode.SUBSCRIPTION_CREATED, [WebSocketRequestOpcode.UNSUBSCRIBE]: WebSocketResponseOpcode.UNSUBSCRIBE_RESULT, }; @@ -165,6 +188,9 @@ export const OpcodeNames: Readonly> = { [WebSocketRequestOpcode.GET_TRANSACTION_RECEIPT]: 'GET_TRANSACTION_RECEIPT', [WebSocketRequestOpcode.BROADCAST_TRANSACTION]: 'BROADCAST_TRANSACTION', [WebSocketRequestOpcode.GET_PREIMAGE]: 'GET_PREIMAGE', + [WebSocketRequestOpcode.GET_MEMPOOL_INFO]: 'GET_MEMPOOL_INFO', + [WebSocketRequestOpcode.GET_PENDING_TRANSACTION]: 'GET_PENDING_TRANSACTION', + [WebSocketRequestOpcode.GET_LATEST_PENDING_TRANSACTIONS]: 'GET_LATEST_PENDING_TRANSACTIONS', [WebSocketRequestOpcode.GET_BALANCE]: 'GET_BALANCE', [WebSocketRequestOpcode.GET_UTXOS]: 'GET_UTXOS', [WebSocketRequestOpcode.GET_PUBLIC_KEY_INFO]: 'GET_PUBLIC_KEY_INFO', @@ -180,6 +206,7 @@ export const OpcodeNames: Readonly> = { [WebSocketRequestOpcode.SUBMIT_EPOCH]: 'SUBMIT_EPOCH', [WebSocketRequestOpcode.SUBSCRIBE_BLOCKS]: 'SUBSCRIBE_BLOCKS', [WebSocketRequestOpcode.SUBSCRIBE_EPOCHS]: 'SUBSCRIBE_EPOCHS', + [WebSocketRequestOpcode.SUBSCRIBE_MEMPOOL]: 'SUBSCRIBE_MEMPOOL', [WebSocketRequestOpcode.UNSUBSCRIBE]: 'UNSUBSCRIBE', // Response opcodes @@ -194,6 +221,9 @@ export const OpcodeNames: Readonly> = { [WebSocketResponseOpcode.TRANSACTION_RECEIPT]: 'TRANSACTION_RECEIPT', [WebSocketResponseOpcode.BROADCAST_RESULT]: 'BROADCAST_RESULT', [WebSocketResponseOpcode.PREIMAGE]: 'PREIMAGE', + [WebSocketResponseOpcode.MEMPOOL_INFO]: 'MEMPOOL_INFO', + [WebSocketResponseOpcode.PENDING_TRANSACTION]: 'PENDING_TRANSACTION', + [WebSocketResponseOpcode.LATEST_PENDING_TRANSACTIONS]: 'LATEST_PENDING_TRANSACTIONS', [WebSocketResponseOpcode.BALANCE]: 'BALANCE', [WebSocketResponseOpcode.UTXOS]: 'UTXOS', [WebSocketResponseOpcode.PUBLIC_KEY_INFO]: 'PUBLIC_KEY_INFO', @@ -209,4 +239,5 @@ export const OpcodeNames: Readonly> = { [WebSocketResponseOpcode.UNSUBSCRIBE_RESULT]: 'UNSUBSCRIBE_RESULT', [WebSocketResponseOpcode.NEW_BLOCK_NOTIFICATION]: 'NEW_BLOCK_NOTIFICATION', [WebSocketResponseOpcode.NEW_EPOCH_NOTIFICATION]: 'NEW_EPOCH_NOTIFICATION', + [WebSocketResponseOpcode.NEW_MEMPOOL_TX_NOTIFICATION]: 'NEW_MEMPOOL_TX_NOTIFICATION', }; diff --git a/src/src/api/websocket/types/requests/WebSocketRequestTypes.ts b/src/src/api/websocket/types/requests/WebSocketRequestTypes.ts index 3b624e2c1..b0e700e16 100644 --- a/src/src/api/websocket/types/requests/WebSocketRequestTypes.ts +++ b/src/src/api/websocket/types/requests/WebSocketRequestTypes.ts @@ -142,6 +142,29 @@ export interface SubmitEpochRequest extends BaseRequest { readonly signature: string; } +// ============================================================================ +// Mempool Requests +// ============================================================================ + +/** WebSocket request for mempool statistics (no additional fields). */ +export interface GetMempoolInfoWsRequest extends BaseRequest {} + +/** WebSocket request to fetch a single pending transaction by hash. */ +export interface GetPendingTransactionWsRequest extends BaseRequest { + /** The 64-character hex transaction hash. */ + readonly hash: string; +} + +/** WebSocket request to fetch the latest pending transactions. */ +export interface GetLatestPendingTransactionsWsRequest extends BaseRequest { + /** A single address to auto-resolve into all derived wallet address types. */ + readonly address?: string; + /** Explicit list of addresses to filter by. */ + readonly addresses?: string[]; + /** Maximum number of transactions to return. */ + readonly limit?: number; +} + // ============================================================================ // Subscription Requests // ============================================================================ @@ -150,6 +173,9 @@ export interface SubscribeBlocksRequest extends BaseRequest {} export interface SubscribeEpochsRequest extends BaseRequest {} +/** WebSocket request to subscribe to new mempool transaction notifications. */ +export interface SubscribeMempoolWsRequest extends BaseRequest {} + export interface UnsubscribeRequest extends BaseRequest { readonly subscriptionId: number; } diff --git a/src/src/config/BtcIndexerConfigLoader.ts b/src/src/config/BtcIndexerConfigLoader.ts index b4a0bc22a..e8cfbccf4 100644 --- a/src/src/config/BtcIndexerConfigLoader.ts +++ b/src/src/config/BtcIndexerConfigLoader.ts @@ -152,6 +152,12 @@ export class BtcIndexerConfigManager extends ConfigManager, + ): void { + if ( + parsedConfig.MAX_ADDRESSES !== undefined && + typeof parsedConfig.MAX_ADDRESSES !== 'number' + ) { + throw new Error(`Oops the property API.MEMPOOL.MAX_ADDRESSES is not a number.`); + } + + if ( + parsedConfig.DEFAULT_LIMIT !== undefined && + typeof parsedConfig.DEFAULT_LIMIT !== 'number' + ) { + throw new Error(`Oops the property API.MEMPOOL.DEFAULT_LIMIT is not a number.`); + } + + if (parsedConfig.MAX_LIMIT !== undefined && typeof parsedConfig.MAX_LIMIT !== 'number') { + throw new Error(`Oops the property API.MEMPOOL.MAX_LIMIT is not a number.`); + } + } + private verifyBase58Configs(parsedConfig: Partial): void { if ( typeof parsedConfig.PUBKEY_ADDRESS !== 'string' && @@ -1259,10 +1292,25 @@ export class BtcIndexerConfigManager extends ConfigManager { return this.convertToObj(result); } + public async getMempoolInfo(): Promise<{ + count: number; + opnetCount: number; + size: number; + }> { + const collection = this.getCollection(); + const options: AggregateOptions = this.getOptions() as AggregateOptions; + options.allowDiskUse = true; + + try { + const aggregation: Document[] = [ + { + $group: { + _id: null, + count: { $sum: 1 }, + opnetCount: { + $sum: { $cond: [{ $eq: ['$isOPNet', true] }, 1, 0] }, + }, + size: { $sum: { $bsonSize: '$$ROOT' } }, + }, + }, + { + $project: { + _id: 0, + count: 1, + opnetCount: 1, + size: 1, + }, + }, + ]; + + const results = await collection + .aggregate<{ count: number; opnetCount: number; size: number }>( + aggregation, + options, + ) + .toArray(); + + if (!results.length) { + return { count: 0, opnetCount: 0, size: 0 }; + } + + return results[0]; + } catch (e) { + this.error(`Can not fetch mempool info: ${(e as Error).stack}`); + throw e; + } + } + + public async getLatestTransactions( + addresses?: string[], + limit: number = 25, + ): Promise { + const collection = this.getCollection(); + const options: AggregateOptions = this.getOptions() as AggregateOptions; + options.allowDiskUse = true; + + const pipeline: Document[] = []; + + if (addresses && addresses.length > 0) { + pipeline.push({ + $match: { + 'outputs.address': { $in: addresses }, + }, + }); + } + + pipeline.push( + { $sort: { firstSeen: -1 } }, + { $limit: limit }, + ); + + try { + const results = (await collection + .aggregate(pipeline, options) + .toArray()) as IMempoolTransaction[]; + + return results.map(this.convertToObj.bind(this)); + } catch (e) { + this.error(`Can not fetch latest pending transactions: ${(e as Error).stack}`); + throw e; + } + } + /*public async purgeOldTransactions(currentBlock: bigint): Promise { // If the transaction is older than 20 blocks, we must purge it. const criteria: Filter = { diff --git a/src/src/vm/storage/VMStorage.ts b/src/src/vm/storage/VMStorage.ts index e75be4798..b5fc79df2 100644 --- a/src/src/vm/storage/VMStorage.ts +++ b/src/src/vm/storage/VMStorage.ts @@ -8,6 +8,7 @@ import { BlockHeaderDocument, } from '../../db/interfaces/IBlockHeaderBlockDocument.js'; import { IReorgData, IReorgDocument } from '../../db/interfaces/IReorgDocument.js'; +import { IMempoolTransactionObj } from '../../db/interfaces/IMempoolTransaction.js'; import { ITransactionDocument } from '../../db/interfaces/ITransactionDocument.js'; import { IParsedBlockWitnessDocument } from '../../db/models/IBlockWitnessDocument.js'; import { MemoryValue, ProvenMemoryValue, ProvenPointers } from './types/MemoryValue.js'; @@ -22,7 +23,6 @@ import { SafeBigInt } from '../../api/routes/safe/BlockParamsConverter.js'; import { ITargetEpochDocument } from '../../db/documents/interfaces/ITargetEpochDocument.js'; import { AttestationProof } from '../../blockchain-indexer/processor/block/merkle/EpochMerkleTree.js'; import { ChallengeSolution } from '../../blockchain-indexer/processor/interfaces/TransactionPreimage.js'; -import { IMempoolTransactionObj } from '../../db/interfaces/IMempoolTransaction.js'; import { IMLDSAPublicKey, MLDSAUpdateData } from '../../db/interfaces/IMLDSAPublicKey.js'; import { MLDSAPublicKeyExists } from '../../db/repositories/MLDSAPublicKeysRepository.js'; @@ -114,6 +114,21 @@ export abstract class VMStorage extends Logger { hash: string, ): Promise | undefined>; + public abstract getMempoolInfo(): Promise<{ + count: number; + opnetCount: number; + size: number; + }>; + + public abstract getMempoolTransaction( + id: string, + ): Promise; + + public abstract getLatestPendingTransactions( + addresses?: string[], + limit?: number, + ): Promise; + public abstract saveTransactions( transaction: ITransactionDocument[], ): Promise; diff --git a/src/src/vm/storage/databases/VMMongoStorage.ts b/src/src/vm/storage/databases/VMMongoStorage.ts index e93ead025..d98757ba1 100644 --- a/src/src/vm/storage/databases/VMMongoStorage.ts +++ b/src/src/vm/storage/databases/VMMongoStorage.ts @@ -11,6 +11,7 @@ import { IBlockHeaderBlockDocument, } from '../../../db/interfaces/IBlockHeaderBlockDocument.js'; import { IReorgData, IReorgDocument } from '../../../db/interfaces/IReorgDocument.js'; +import { IMempoolTransactionObj } from '../../../db/interfaces/IMempoolTransaction.js'; import { ITransactionDocument } from '../../../db/interfaces/ITransactionDocument.js'; import { IParsedBlockWitnessDocument } from '../../../db/models/IBlockWitnessDocument.js'; import { BlockRepository } from '../../../db/repositories/BlockRepository.js'; @@ -38,7 +39,6 @@ import { TargetEpochRepository } from '../../../db/repositories/TargetEpochRepos import { ITargetEpochDocument } from '../../../db/documents/interfaces/ITargetEpochDocument.js'; import { AttestationProof } from '../../../blockchain-indexer/processor/block/merkle/EpochMerkleTree.js'; import { ChallengeSolution } from '../../../blockchain-indexer/processor/interfaces/TransactionPreimage.js'; -import { IMempoolTransactionObj } from '../../../db/interfaces/IMempoolTransaction.js'; import { SpentUTXOSOutputTransaction, UTXOsOutputTransactions, @@ -519,6 +519,36 @@ export class VMMongoStorage extends VMStorage { return await this.transactionRepository.getTransactionByHash(hash); } + public async getMempoolInfo(): Promise<{ + count: number; + opnetCount: number; + size: number; + }> { + if (!this.mempoolRepository) { + throw new Error('Mempool repository not initialized'); + } + return await this.mempoolRepository.getMempoolInfo(); + } + + public async getMempoolTransaction( + id: string, + ): Promise { + if (!this.mempoolRepository) { + throw new Error('Mempool repository not initialized'); + } + return await this.mempoolRepository.getTransactionById(id); + } + + public async getLatestPendingTransactions( + addresses?: string[], + limit?: number, + ): Promise { + if (!this.mempoolRepository) { + throw new Error('Mempool repository not initialized'); + } + return await this.mempoolRepository.getLatestTransactions(addresses, limit); + } + public async close(): Promise { if (this.config.DEBUG_LEVEL >= DebugLevel.ALL) { this.debug('Closing database');