From f6cdc4ae1ec52ab0ebb8e67b9d7d7edd3f902bd4 Mon Sep 17 00:00:00 2001 From: Eoghan O'Hara Date: Fri, 30 Jan 2026 12:01:57 +0000 Subject: [PATCH 01/16] initial commit --- code/common/datareplay.q | 2 ++ 1 file changed, 2 insertions(+) diff --git a/code/common/datareplay.q b/code/common/datareplay.q index 0a67eb3e8..f45d1561e 100644 --- a/code/common/datareplay.q +++ b/code/common/datareplay.q @@ -108,3 +108,5 @@ tablesToDataStream:{[params] }; \d . + +//! initial commit \ No newline at end of file From 73ca3e0f6bbcca19691fb577a90e2de6a87010a9 Mon Sep 17 00:00:00 2001 From: Conor McLarnon Date: Wed, 4 Feb 2026 14:00:41 +0000 Subject: [PATCH 02/16] If .backtest.init[] is called on any proc, this turns it into a backtest process and overrides pub/sub/upd --- code/common/backtest.q | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 code/common/backtest.q diff --git a/code/common/backtest.q b/code/common/backtest.q new file mode 100644 index 000000000..94e0d2a4c --- /dev/null +++ b/code/common/backtest.q @@ -0,0 +1,35 @@ +\d .backtest + +/TESTING:TO BE REMOVED +test:`name`version`tabs`sts`ets`interval`timer`timerfunc!(`vwappublisher;1;`trade;2026.01.22D00:00:00.00;2026.01.22D01:00:00.00;0D00:10:00.00;1b;`.vwapsub.logvwap); + +initRan:0b; + +init:{[] + requiredProc:`backtestdb; + .servers.registerfromdiscovery[requiredProc;1b]; + .backtest.rdb:neg first exec w from .servers.SERVERS where procname=requiredProc; + `upd set .backtest.upd; + .backtest.id:first 1?0Ng; + .backtest.initRan:1b; + }; + +upd:{[t;d] + /Running old upd function + .[` sv `,.proc.proctype,`upd;(t;d)]; + }; + +/ To run backtest, optional where +run:{[params] + if[not initRan;'"Please run .backtest.init to override functions to backtest before running .backtest.run";]; + if[not all `name`version`tabs`sts`ets`interval`timer`timerfunc in key params;'"Please ensure all mandatory params have been populated";]; + / Remove optional where when not required + if[`where in key params; if[not count params`where;params:`where _params]]; + params[`h]:first exec w from .servers.SERVERS where proctype=`hdb; + msgs:.datareplay.tablesToDataStream `name`version _params; + / Publishing configuration of backtest + rdb(`upd;`config;(id;.z.p;;;;;enlist query;params). params`name`version`sts`ets); + value each msgs`msg; + }; + +\d . From 4c35fcb702812d8634b54d592d978bf9d27a5a95 Mon Sep 17 00:00:00 2001 From: Conor McLarnon Date: Wed, 4 Feb 2026 15:42:35 +0000 Subject: [PATCH 03/16] Storing query passed to hdb for upsert to config table --- code/common/datareplay.q | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/code/common/datareplay.q b/code/common/datareplay.q index f45d1561e..cc79e69a9 100644 --- a/code/common/datareplay.q +++ b/code/common/datareplay.q @@ -65,7 +65,7 @@ tableToDataStream:{[params] // Have hdb evaluate select statement. t:@[params[`h]; - (eval;(?;params[`tn];enlist wherec;0b;())); + (eval;.backtest.query:(?;params[`tn];enlist wherec;0b;())); {.lg.e[`dataloader;"Failed to evauluate query on hdb: ",x]} ]; @@ -108,5 +108,3 @@ tablesToDataStream:{[params] }; \d . - -//! initial commit \ No newline at end of file From 14b25ed2c721537a5369f99c68b1e6c364f9c8bf Mon Sep 17 00:00:00 2001 From: Conor McLarnon Date: Thu, 5 Feb 2026 15:22:40 +0000 Subject: [PATCH 04/16] Adding backtest instance to replay the data which is called from engine running backtest --- code/common/backtest.q | 49 ++++++++++++++++++++++-------------- code/processes/backtestpub.q | 21 ++++++++++++++++ 2 files changed, 51 insertions(+), 19 deletions(-) create mode 100644 code/processes/backtestpub.q diff --git a/code/common/backtest.q b/code/common/backtest.q index 94e0d2a4c..d06da2d3d 100644 --- a/code/common/backtest.q +++ b/code/common/backtest.q @@ -1,35 +1,46 @@ \d .backtest -/TESTING:TO BE REMOVED -test:`name`version`tabs`sts`ets`interval`timer`timerfunc!(`vwappublisher;1;`trade;2026.01.22D00:00:00.00;2026.01.22D01:00:00.00;0D00:10:00.00;1b;`.vwapsub.logvwap); - +/ Params to be passed to .backtest.run to kick off backtest, edit to fit usecase +test:`name`version`tabs`sts`ets`interval`timer`timerfunc!(`;1;`;0Np;0Np;0D00:10:00.00;0b;`); initRan:0b; +/ TO BE DELETED, TESTING ONLY +test:`name`version`tabs`sts`ets`interval`timer`timerfunc!(`vwappublisher;1;`trade;2026.01.22D00:00:00.00;2026.01.22D01:00:00.00;0D00:10:00.00;1b;`.vwapsub.logvwap); + init:{[] - requiredProc:`backtestdb; - .servers.registerfromdiscovery[requiredProc;1b]; - .backtest.rdb:neg first exec w from .servers.SERVERS where procname=requiredProc; - `upd set .backtest.upd; - .backtest.id:first 1?0Ng; + requiredProcs:`backtestdb`backtestpub; + .servers.registerfromdiscovery[requiredProcs;1b]; + .backtest.rdbh:neg first exec w from .servers.SERVERS where procname=`backtestdb; + .backtest.pubh:neg first exec w from .servers.SERVERS where procname=`backtestpub; + `.u.pub set .backtest.pub; .backtest.initRan:1b; }; -upd:{[t;d] - /Running old upd function - .[` sv `,.proc.proctype,`upd;(t;d)]; - }; +/ Receive full message from datareplay, extract details from msg before running msg func +extractmessage:{[msgs] + .dbg.msg:msgs; + msg:msgs`msg; + .backtest.simtime:msgs`time; + .backtest.name:first msg; + value msg + }; + +pub:{[t;d] + .dbg.pub:(t;d); + rdbh(`upd;`output;(.z.p;id;simtime;name;d)); + }; / To run backtest, optional where run:{[params] + if[.proc.procname=`backtestpub;'"Backtest should be ran from the process you are backtesting not backtest instance itself"]; if[not initRan;'"Please run .backtest.init to override functions to backtest before running .backtest.run";]; if[not all `name`version`tabs`sts`ets`interval`timer`timerfunc in key params;'"Please ensure all mandatory params have been populated";]; - / Remove optional where when not required - if[`where in key params; if[not count params`where;params:`where _params]]; - params[`h]:first exec w from .servers.SERVERS where proctype=`hdb; - msgs:.datareplay.tablesToDataStream `name`version _params; - / Publishing configuration of backtest - rdb(`upd;`config;(id;.z.p;;;;;enlist query;params). params`name`version`sts`ets); - value each msgs`msg; + / Remove optional where, when not required + if[`wherei in key params; if[not count params`where;params:`where _params]]; + / Random guid generated to easily pair up config to output + .backtest.id:first -1?0Ng; + / Kick off backtest from backtestpub which will replay the data back through the process running backtest + pubh(`.backtest.datareplay;params;.backtest.id); }; \d . diff --git a/code/processes/backtestpub.q b/code/processes/backtestpub.q new file mode 100644 index 000000000..ed68b514c --- /dev/null +++ b/code/processes/backtestpub.q @@ -0,0 +1,21 @@ +\d .backtest + +init:{[] + .servers.CONNECTIONS:`hdb`backtestdb; + .servers.startup[]; + .backtest.hdb:first exec w from .servers.SERVERS where proctype=`hdb; + .backtest.rdb:neg first exec w from .servers.SERVERS where procname=`backtestdb; + }; + +datareplay:{[params;id] + .dbg.replay:(params;id;.z.w); + params[`h]:first exec w from .servers.SERVERS where proctype=`hdb; + msgs:.datareplay.tablesToDataStream `name`version _params; + / Publishing configuration of backtest, needs to run after datareplay to pick up dataconfig + rdb(`upd;`config;(id;.z.p;;;;;enlist query;`h _params). params`name`version`sts`ets); + {neg[.z.w](`.backtest.extractmessage;x)}each msgs + }; + +\d . + +.backtest.init[]; From b8b5ebe18e030f69f74da632469d1b891828e312 Mon Sep 17 00:00:00 2001 From: Conor McLarnon Date: Fri, 6 Feb 2026 10:22:06 +0000 Subject: [PATCH 05/16] Updates to backtest instance + common override script --- code/common/backtest.q | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/code/common/backtest.q b/code/common/backtest.q index d06da2d3d..6cb3925b9 100644 --- a/code/common/backtest.q +++ b/code/common/backtest.q @@ -32,15 +32,21 @@ pub:{[t;d] / To run backtest, optional where run:{[params] + params:validaterun[params]; + / Random guid generated to match config to output + .backtest.id:first -1?0Ng; + / Kick off backtest from backtestpub which will replay the data back through the process running backtest + pubh(`.backtest.datareplay;params;.backtest.id); + }; + +validaterun:{[params] if[.proc.procname=`backtestpub;'"Backtest should be ran from the process you are backtesting not backtest instance itself"]; if[not initRan;'"Please run .backtest.init to override functions to backtest before running .backtest.run";]; if[not all `name`version`tabs`sts`ets`interval`timer`timerfunc in key params;'"Please ensure all mandatory params have been populated";]; + if[count where null params;'"Not all mandatory keys have been populated"]; / Remove optional where, when not required - if[`wherei in key params; if[not count params`where;params:`where _params]]; - / Random guid generated to easily pair up config to output - .backtest.id:first -1?0Ng; - / Kick off backtest from backtestpub which will replay the data back through the process running backtest - pubh(`.backtest.datareplay;params;.backtest.id); + if[`where in key params; if[not count params`where;params:`where _params]]; + params }; \d . From 00a929f20babf60fdfc9ccfb44c6b19d06dc2d0e Mon Sep 17 00:00:00 2001 From: Conor McLarnon Date: Fri, 6 Feb 2026 13:57:43 +0000 Subject: [PATCH 06/16] Break out intervals to separate upd/timers --- code/common/datareplay.q | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/code/common/datareplay.q b/code/common/datareplay.q index cc79e69a9..d0d7fe3f2 100644 --- a/code/common/datareplay.q +++ b/code/common/datareplay.q @@ -6,18 +6,20 @@ getBuckets:{[s;e;p](s+p*til(ceiling 1+e%p)-(ceiling s%p))} // params[`t] is table data // params[`tc] is time column to cut on // params[`tn] is table name -// params[`interval] is the time interval to bucket the messages into. +// params[`timerinterval] is the timer time interval to bucket the messages into. +// params[`datainterval] is the data time interval to bucket the messages into. tableDataToDataStream:{[params] + .dbg.params:params; // Sort table by time column. params[`t]:params[`tc] xasc delete date from params[`t]; // get all times from table t_times:params[`t][params[`tc]]; - $[not null params[`interval]; + $[not null params[`datainterval]; [ // if there is an interval, bucket messages into this interval - // make bukets of ten second intervals - times:getBuckets[params[`sts];params[`ets];params[`interval]]; + // make buckets of ten second intervals + times:getBuckets[params[`sts];params[`ets];params[`datainterval]]; // put start time in fornt of t_times t_times:params[`sts],t_times; @@ -38,7 +40,7 @@ tableDataToDataStream:{[params] // Return table of times and message chunks -1_([]time:time;msg:{(`upd;x;y)}[params[`tn]] each msgs) ]; - // if there is no intevral, cut by distinct time. + // if there is no interval, cut by distinct time. ([] time:distinct t_times; msg:{(`upd;x;$[1 Date: Fri, 6 Feb 2026 14:19:22 +0000 Subject: [PATCH 07/16] Update intervals --- code/common/backtest.q | 8 ++-- code/processes/vwapsub.q | 96 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 4 deletions(-) create mode 100644 code/processes/vwapsub.q diff --git a/code/common/backtest.q b/code/common/backtest.q index 6cb3925b9..f56c7d296 100644 --- a/code/common/backtest.q +++ b/code/common/backtest.q @@ -1,11 +1,11 @@ \d .backtest / Params to be passed to .backtest.run to kick off backtest, edit to fit usecase -test:`name`version`tabs`sts`ets`interval`timer`timerfunc!(`;1;`;0Np;0Np;0D00:10:00.00;0b;`); +test:`name`version`tabs`sts`ets`datainterval`timer`timerinterval`timerfunc!(`;1;`;0Np;0Np;0Nn;0b;0Nn;`); initRan:0b; / TO BE DELETED, TESTING ONLY -test:`name`version`tabs`sts`ets`interval`timer`timerfunc!(`vwappublisher;1;`trade;2026.01.22D00:00:00.00;2026.01.22D01:00:00.00;0D00:10:00.00;1b;`.vwapsub.logvwap); +test:`name`version`tabs`sts`ets`datainterval`timer`timerinterval`timerfunc!(`vwappublisher;1;`trade;2026.01.22D00:00:00.00;2026.01.22D01:00:00.00;0Nn;1b;0D00:10:00.00;`.vwapsub.logvwap); init:{[] requiredProcs:`backtestdb`backtestpub; @@ -42,8 +42,8 @@ run:{[params] validaterun:{[params] if[.proc.procname=`backtestpub;'"Backtest should be ran from the process you are backtesting not backtest instance itself"]; if[not initRan;'"Please run .backtest.init to override functions to backtest before running .backtest.run";]; - if[not all `name`version`tabs`sts`ets`interval`timer`timerfunc in key params;'"Please ensure all mandatory params have been populated";]; - if[count where null params;'"Not all mandatory keys have been populated"]; + if[not all (key[test]except `where) in key params;'"Please ensure all mandatory params have been populated";]; + if[count where null `datainterval _params;'"Not all mandatory keys have been populated"]; / Remove optional where, when not required if[`where in key params; if[not count params`where;params:`where _params]]; params diff --git a/code/processes/vwapsub.q b/code/processes/vwapsub.q new file mode 100644 index 000000000..33b817bc4 --- /dev/null +++ b/code/processes/vwapsub.q @@ -0,0 +1,96 @@ +\d .vwapsub // enter vwapsub namespace + +tickerplanttypes:@[value;`tickerplanttypes;`segmentedtickerplant]; // tickerplant types to subscribe to +hdbtypes:@[value;`hdbtypes;`hdb]; // hdbtypes to connect to + +// datareplay settings +realtime:@[value;`realtime;0b]; // use realtime feed or datareplay. default is 0b (datareplay) +replayinterval:@[value;`replayinterval;0Nn]; // interval to run upd at (optional) +timerinterval:@[value;`timerinterval;0D00:10:00.00]; // interval to run calcvwap at +replaysts:@[value;`replaysts;2026.01.22D01:00:00.00]; // start time of data to retreive from hdb +replayets:@[value;`replayets;2026.01.23D17:00:00.00]; // end time of data to retrieve from hdb +requiredprocs:value(`hdbtypes`tickerplanttypes)realtime; // required processes +tpcheckcycles:@[value;`tpcheckcycles;0W]; // specify the number of times the process will check for requiredprocs +tpconnsleep:@[value;`tpconnsleep;10]; // number of seconds between attempts to connect to the source tickerplant + +// Add hdb and tickerplant to connections list for TorQ +.servers.CONNECTIONS:tickerplanttypes,hdbtypes,`rdb; + +// upd function gets sum of price*size and sum of size by sym +// and adds it to the running total inside the vwap table +// This can be used to calculate current vwap quickly. +upd:{[t;d] + .dbg.upd:(t;d); + if[t~`trade; + `vwap set (`.[`vwap]) + select spts:sum price*size,ssize:sum size by sym from d; + ]; + .u.pub[`vwap;(`.[`vwap])] + }; + +// Calculates vwap at current time and adds it to the vwaptimes table, at time t. +calcvwap:{ + //`vwaptimes insert `time`vwap!(t;(select vwap:spts%ssize by sym from `.[`vwap])); + select vwap:spts%ssize by sym from `.[`vwap] + }; + +// replay data set +datareplay:{[] + // Turn off timer + system"t 0"; + + // Block process until all required processes are connected + .servers.startupdepcycles[requiredprocs;tpconnsleep;tpcheckcycles]; + + // Retrieve handle to hdb from TorQ serverlist + h:first exec w from .servers.SERVERS where proctype in .vwapsub.hdbtypes; + + // whc:(parse"select from t where ex=\"N\"") 2; // example of optional where clause + + params: (!) . flip ((`tabs;`trade); + (`h;h); + (`sts;replaysts); + (`ets;replayets); + //(`where;whc); // Optional where clause + (`replayinterval;replayinterval); + (`timer;1b); + (`timerinterval;timerinterval); + (`timerfunc;`.vwapsub.logvwap)); + + // Run datareplay utility using avove parameters + msgs:.datareplay.tablesToDataStream params; + + // Execute each message. + value each msgs`msg; + }; + +logvwap:{.dbg.log:x;`vwaptimes insert `time`vwap!(x;.vwapsub.calcvwap[]);.u.pub[`vwaptimes;(`.[`vwaptimes])]}; + +// subscribe to tickerplant types +subscribe:{[] + // Block process until all required processes are connected + .servers.startupdepcycles[requiredprocs;tpconnsleep;tpcheckcycles]; + + if[count s:.sub.getsubscriptionhandles[tickerplanttypes;();()!()]; + .lg.o[`subscribe;"found available tickerplant, attempting to subscribe"]; + .sub.subscribe[`trade;`;0b;0b;first s]; + ]; + .timer.rep[`timestamp$.proc.cd[]+00:00;0Wp;timerinterval;(`logvwapnow;`);0h;"Run logvwapnow at set interval";1b] + } + +\d . + +vwap:([sym:`$()]spts:`float$();ssize:`int$()); +vwaptimes:([]time:`timestamp$();vwap:()); + +logvwapnow:{.vwapsub.logvwap[.z.p]}; + +// set upd function at top level +upd:.vwapsub.upd; + +// Perform server discovery +.servers.startup[]; + +/// use tickerplant or datareplay +/$[.vwapsub.realtime; +/ .vwapsub.subscribe[]; // sub to tickerplant +/ .vwapsub.datareplay[]]; // replay hdb data From c46fd9ca418ce2ea80913f60fb231ed5944a2a8f Mon Sep 17 00:00:00 2001 From: Conor McLarnon Date: Fri, 6 Feb 2026 14:35:29 +0000 Subject: [PATCH 08/16] Update intervals --- code/common/backtest.q | 6 +-- code/common/datareplay.q | 12 +++-- code/processes/vwapsub.q | 96 ---------------------------------------- 3 files changed, 8 insertions(+), 106 deletions(-) delete mode 100644 code/processes/vwapsub.q diff --git a/code/common/backtest.q b/code/common/backtest.q index f56c7d296..833113bb2 100644 --- a/code/common/backtest.q +++ b/code/common/backtest.q @@ -1,11 +1,11 @@ \d .backtest / Params to be passed to .backtest.run to kick off backtest, edit to fit usecase -test:`name`version`tabs`sts`ets`datainterval`timer`timerinterval`timerfunc!(`;1;`;0Np;0Np;0Nn;0b;0Nn;`); +test:`name`version`tabs`sts`ets`replayinterval`timer`timerinterval`timerfunc!(`;1;`;0Np;0Np;0Nn;0b;0Nn;`); initRan:0b; / TO BE DELETED, TESTING ONLY -test:`name`version`tabs`sts`ets`datainterval`timer`timerinterval`timerfunc!(`vwappublisher;1;`trade;2026.01.22D00:00:00.00;2026.01.22D01:00:00.00;0Nn;1b;0D00:10:00.00;`.vwapsub.logvwap); +test:`name`version`tabs`sts`ets`replayinterval`timer`timerinterval`timerfunc!(`vwappublisher;1;`trade;2026.01.22D00:00:00.00;2026.01.22D01:00:00.00;0Nn;1b;0D00:10:00.00;`.vwapsub.logvwap); init:{[] requiredProcs:`backtestdb`backtestpub; @@ -43,7 +43,7 @@ validaterun:{[params] if[.proc.procname=`backtestpub;'"Backtest should be ran from the process you are backtesting not backtest instance itself"]; if[not initRan;'"Please run .backtest.init to override functions to backtest before running .backtest.run";]; if[not all (key[test]except `where) in key params;'"Please ensure all mandatory params have been populated";]; - if[count where null `datainterval _params;'"Not all mandatory keys have been populated"]; + if[count where null `replayinterval _params;'"Not all mandatory keys have been populated"]; / Remove optional where, when not required if[`where in key params; if[not count params`where;params:`where _params]]; params diff --git a/code/common/datareplay.q b/code/common/datareplay.q index d0d7fe3f2..8b736b355 100644 --- a/code/common/datareplay.q +++ b/code/common/datareplay.q @@ -6,8 +6,7 @@ getBuckets:{[s;e;p](s+p*til(ceiling 1+e%p)-(ceiling s%p))} // params[`t] is table data // params[`tc] is time column to cut on // params[`tn] is table name -// params[`timerinterval] is the timer time interval to bucket the messages into. -// params[`datainterval] is the data time interval to bucket the messages into. +// params[`replayinterval] is the data time interval to bucket the messages into. tableDataToDataStream:{[params] .dbg.params:params; // Sort table by time column. @@ -16,10 +15,10 @@ tableDataToDataStream:{[params] // get all times from table t_times:params[`t][params[`tc]]; - $[not null params[`datainterval]; + $[not null params[`replayinterval]; [ // if there is an interval, bucket messages into this interval // make buckets of ten second intervals - times:getBuckets[params[`sts];params[`ets];params[`datainterval]]; + times:getBuckets[params[`sts];params[`ets];params[`replayinterval]]; // put start time in fornt of t_times t_times:params[`sts],t_times; @@ -79,7 +78,7 @@ tableToDataStream:{[params] // params[`tp] is the increment between times // params[`timerfunc] is the timer function to use getTimers:{[params] - times:getBuckets[params[`sts];params[`ets];params[`replayinterval]]; + times:getBuckets[params[`sts];params[`ets];params[`timerinterval]]; ([]time:times;msg:params[`timerfunc],'times) } @@ -92,11 +91,10 @@ getTimers:{[params] // params[`timer] is whether or not to retrieve timer - Default 0b // params[`h] is handle to hdb - Default 0 (self) // params[`timerinterval] is the time interval to bucket the timer messages into. - Not Required -// params[`datainterval] is the time interval to bucket the upd messages into. - Not Required // prarms[`tc] is the time column of the tables specified - Defualt `time // params[`timerfunc] is the timer function to use in timer messages - Default `.z.ts tablesToDataStream:{[params] - defaults:`timer`h`syms`datainterval`replayinterval`tc`timerfunc`where!(0b;0;`symbol$();`timespan$0n;`timespan$0n;`time;`.z.ts;()); + defaults:`timer`h`syms`replayinterval`timerinterval`tc`timerfunc`where!(0b;0;`symbol$();`timespan$0n;`timespan$0n;`time;`.z.ts;()); params:defaults,params; // check for default parameters `tabs`sts`ets diff --git a/code/processes/vwapsub.q b/code/processes/vwapsub.q deleted file mode 100644 index 33b817bc4..000000000 --- a/code/processes/vwapsub.q +++ /dev/null @@ -1,96 +0,0 @@ -\d .vwapsub // enter vwapsub namespace - -tickerplanttypes:@[value;`tickerplanttypes;`segmentedtickerplant]; // tickerplant types to subscribe to -hdbtypes:@[value;`hdbtypes;`hdb]; // hdbtypes to connect to - -// datareplay settings -realtime:@[value;`realtime;0b]; // use realtime feed or datareplay. default is 0b (datareplay) -replayinterval:@[value;`replayinterval;0Nn]; // interval to run upd at (optional) -timerinterval:@[value;`timerinterval;0D00:10:00.00]; // interval to run calcvwap at -replaysts:@[value;`replaysts;2026.01.22D01:00:00.00]; // start time of data to retreive from hdb -replayets:@[value;`replayets;2026.01.23D17:00:00.00]; // end time of data to retrieve from hdb -requiredprocs:value(`hdbtypes`tickerplanttypes)realtime; // required processes -tpcheckcycles:@[value;`tpcheckcycles;0W]; // specify the number of times the process will check for requiredprocs -tpconnsleep:@[value;`tpconnsleep;10]; // number of seconds between attempts to connect to the source tickerplant - -// Add hdb and tickerplant to connections list for TorQ -.servers.CONNECTIONS:tickerplanttypes,hdbtypes,`rdb; - -// upd function gets sum of price*size and sum of size by sym -// and adds it to the running total inside the vwap table -// This can be used to calculate current vwap quickly. -upd:{[t;d] - .dbg.upd:(t;d); - if[t~`trade; - `vwap set (`.[`vwap]) + select spts:sum price*size,ssize:sum size by sym from d; - ]; - .u.pub[`vwap;(`.[`vwap])] - }; - -// Calculates vwap at current time and adds it to the vwaptimes table, at time t. -calcvwap:{ - //`vwaptimes insert `time`vwap!(t;(select vwap:spts%ssize by sym from `.[`vwap])); - select vwap:spts%ssize by sym from `.[`vwap] - }; - -// replay data set -datareplay:{[] - // Turn off timer - system"t 0"; - - // Block process until all required processes are connected - .servers.startupdepcycles[requiredprocs;tpconnsleep;tpcheckcycles]; - - // Retrieve handle to hdb from TorQ serverlist - h:first exec w from .servers.SERVERS where proctype in .vwapsub.hdbtypes; - - // whc:(parse"select from t where ex=\"N\"") 2; // example of optional where clause - - params: (!) . flip ((`tabs;`trade); - (`h;h); - (`sts;replaysts); - (`ets;replayets); - //(`where;whc); // Optional where clause - (`replayinterval;replayinterval); - (`timer;1b); - (`timerinterval;timerinterval); - (`timerfunc;`.vwapsub.logvwap)); - - // Run datareplay utility using avove parameters - msgs:.datareplay.tablesToDataStream params; - - // Execute each message. - value each msgs`msg; - }; - -logvwap:{.dbg.log:x;`vwaptimes insert `time`vwap!(x;.vwapsub.calcvwap[]);.u.pub[`vwaptimes;(`.[`vwaptimes])]}; - -// subscribe to tickerplant types -subscribe:{[] - // Block process until all required processes are connected - .servers.startupdepcycles[requiredprocs;tpconnsleep;tpcheckcycles]; - - if[count s:.sub.getsubscriptionhandles[tickerplanttypes;();()!()]; - .lg.o[`subscribe;"found available tickerplant, attempting to subscribe"]; - .sub.subscribe[`trade;`;0b;0b;first s]; - ]; - .timer.rep[`timestamp$.proc.cd[]+00:00;0Wp;timerinterval;(`logvwapnow;`);0h;"Run logvwapnow at set interval";1b] - } - -\d . - -vwap:([sym:`$()]spts:`float$();ssize:`int$()); -vwaptimes:([]time:`timestamp$();vwap:()); - -logvwapnow:{.vwapsub.logvwap[.z.p]}; - -// set upd function at top level -upd:.vwapsub.upd; - -// Perform server discovery -.servers.startup[]; - -/// use tickerplant or datareplay -/$[.vwapsub.realtime; -/ .vwapsub.subscribe[]; // sub to tickerplant -/ .vwapsub.datareplay[]]; // replay hdb data From 0bf9d5c1fd85d09d93fca0e2779e7954066d6b1d Mon Sep 17 00:00:00 2001 From: Conor McLarnon Date: Mon, 9 Feb 2026 17:19:29 +0000 Subject: [PATCH 09/16] Updating backtest for demo --- code/common/backtest.q | 43 +++++++++++++++++++++++++----------- code/processes/backtestdb.q | 4 ++++ code/processes/backtestpub.q | 8 +++++-- 3 files changed, 40 insertions(+), 15 deletions(-) create mode 100644 code/processes/backtestdb.q diff --git a/code/common/backtest.q b/code/common/backtest.q index 833113bb2..d144e42cb 100644 --- a/code/common/backtest.q +++ b/code/common/backtest.q @@ -1,17 +1,18 @@ \d .backtest / Params to be passed to .backtest.run to kick off backtest, edit to fit usecase -test:`name`version`tabs`sts`ets`replayinterval`timer`timerinterval`timerfunc!(`;1;`;0Np;0Np;0Nn;0b;0Nn;`); +requiredparams:`name`version`tabs`sts`ets`replayinterval`timer`timerinterval`timerfunc!(`;1;`;0Np;0Np;0Nn;0b;0Nn;`); initRan:0b; / TO BE DELETED, TESTING ONLY test:`name`version`tabs`sts`ets`replayinterval`timer`timerinterval`timerfunc!(`vwappublisher;1;`trade;2026.01.22D00:00:00.00;2026.01.22D01:00:00.00;0Nn;1b;0D00:10:00.00;`.vwapsub.logvwap); init:{[] - requiredProcs:`backtestdb`backtestpub; - .servers.registerfromdiscovery[requiredProcs;1b]; - .backtest.rdbh:neg first exec w from .servers.SERVERS where procname=`backtestdb; - .backtest.pubh:neg first exec w from .servers.SERVERS where procname=`backtestpub; + proctypes:exec proctype from(("SSSS";enlist",")0:hsym`$getenv`TORQPROCESSES)where proctype like "*backtest*"; + .servers.registerfromdiscovery[proctypes;1b]; + servers:select w, procname from .servers.SERVERS where proctype in proctypes; + .backtest.rdbh:neg first exec w from servers where procname like "*db"; + .backtest.pubh:neg first exec w from servers where procname like "*pub*"; `.u.pub set .backtest.pub; .backtest.initRan:1b; }; @@ -32,20 +33,36 @@ pub:{[t;d] / To run backtest, optional where run:{[params] - params:validaterun[params]; + params:validateparams[params]; / Random guid generated to match config to output .backtest.id:first -1?0Ng; / Kick off backtest from backtestpub which will replay the data back through the process running backtest pubh(`.backtest.datareplay;params;.backtest.id); }; -validaterun:{[params] - if[.proc.procname=`backtestpub;'"Backtest should be ran from the process you are backtesting not backtest instance itself"]; - if[not initRan;'"Please run .backtest.init to override functions to backtest before running .backtest.run";]; - if[not all (key[test]except `where) in key params;'"Please ensure all mandatory params have been populated";]; - if[count where null `replayinterval _params;'"Not all mandatory keys have been populated"]; - / Remove optional where, when not required - if[`where in key params; if[not count params`where;params:`where _params]]; +validateparams:{[params] + if[.proc.procname like "*backtest*"; + '"Backtest should be ran from the process you are backtesting not backtest instance itself"; + ]; + if[not initRan; + '"Please run .backtest.init to override functions to backtest before running .backtest.run"; + ]; + / Check param datatypes + if[any wrongtyp:not {(type x y)=type .backtest.requiredparams y}[params;]each kp:key[params]except `where; + '"The following param key(s) is not of the correct datatype - ","," sv string kp where wrongtyp; + ]; + / Check params keys complete + if[any missingparams:not (requiredkeys:key requiredparams) in key params; + '"Not all mandatory params have been used - ", "," sv string requiredkeys where missingparams; + ]; + / Check params keys non null on complusary keys, (replay interval optional but timer interval must be populated when timer 1b) + if[count missingparams:where null checkparams:(`replayinterval,$[params`timer;();`timerinterval`timerfunc]) _params; + '"Not all mandatory params have been populated - ", "," sv string missingparams; + ]; + / Remove optional where, when not used + if[`where in key params; + if[not count params`where;params:`where _params] + ]; params }; diff --git a/code/processes/backtestdb.q b/code/processes/backtestdb.q new file mode 100644 index 000000000..dffd43821 --- /dev/null +++ b/code/processes/backtestdb.q @@ -0,0 +1,4 @@ +upd:{[t;d] + .dbg.upd:(t;d); + t insert d + }; diff --git a/code/processes/backtestpub.q b/code/processes/backtestpub.q index ed68b514c..9fc5c2d2d 100644 --- a/code/processes/backtestpub.q +++ b/code/processes/backtestpub.q @@ -1,18 +1,22 @@ \d .backtest init:{[] - .servers.CONNECTIONS:`hdb`backtestdb; + .servers.CONNECTIONS:`hdb,outputdbtype; .servers.startup[]; + / Used to get data from source to replay as real time .backtest.hdb:first exec w from .servers.SERVERS where proctype=`hdb; + / Used to send results of the backtest .backtest.rdb:neg first exec w from .servers.SERVERS where procname=`backtestdb; }; datareplay:{[params;id] .dbg.replay:(params;id;.z.w); - params[`h]:first exec w from .servers.SERVERS where proctype=`hdb; + params[`h]:hdb; + / Return the messages to be replayed msgs:.datareplay.tablesToDataStream `name`version _params; / Publishing configuration of backtest, needs to run after datareplay to pick up dataconfig rdb(`upd;`config;(id;.z.p;;;;;enlist query;`h _params). params`name`version`sts`ets); + / Kick off upd/timers on the instance being backtested {neg[.z.w](`.backtest.extractmessage;x)}each msgs }; From 6063c9fed3aa9cf4f3b5af93f4aa835c62a119b4 Mon Sep 17 00:00:00 2001 From: Conor McLarnon Date: Tue, 10 Feb 2026 10:03:36 +0000 Subject: [PATCH 10/16] Backtest updates, tuesday demo --- code/common/backtest.q | 9 ++++++--- code/processes/backtestpub.q | 8 +++++--- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/code/common/backtest.q b/code/common/backtest.q index d144e42cb..6ecf1328c 100644 --- a/code/common/backtest.q +++ b/code/common/backtest.q @@ -8,11 +8,14 @@ initRan:0b; test:`name`version`tabs`sts`ets`replayinterval`timer`timerinterval`timerfunc!(`vwappublisher;1;`trade;2026.01.22D00:00:00.00;2026.01.22D01:00:00.00;0Nn;1b;0D00:10:00.00;`.vwapsub.logvwap); init:{[] - proctypes:exec proctype from(("SSSS";enlist",")0:hsym`$getenv`TORQPROCESSES)where proctype like "*backtest*"; + system"l ",getenv[`KDBCONFIG],"/settings/backtest.q"; + proctypes:outputdbtype,pubproctype; + rdbname:dbproc; + pubname:pubproc; .servers.registerfromdiscovery[proctypes;1b]; servers:select w, procname from .servers.SERVERS where proctype in proctypes; - .backtest.rdbh:neg first exec w from servers where procname like "*db"; - .backtest.pubh:neg first exec w from servers where procname like "*pub*"; + .backtest.rdbh:neg first exec w from servers where procname=rdbname; + .backtest.pubh:neg first exec w from servers where procname=pubname; `.u.pub set .backtest.pub; .backtest.initRan:1b; }; diff --git a/code/processes/backtestpub.q b/code/processes/backtestpub.q index 9fc5c2d2d..6eba3edc7 100644 --- a/code/processes/backtestpub.q +++ b/code/processes/backtestpub.q @@ -1,12 +1,14 @@ \d .backtest init:{[] - .servers.CONNECTIONS:`hdb,outputdbtype; + .servers.CONNECTIONS:inputdbtype,outputdbtype; .servers.startup[]; + hdbtype:inputdbtype; + rdbname:dbproc; / Used to get data from source to replay as real time - .backtest.hdb:first exec w from .servers.SERVERS where proctype=`hdb; + .backtest.hdb:first exec w from .servers.SERVERS where proctype=hdbtype; / Used to send results of the backtest - .backtest.rdb:neg first exec w from .servers.SERVERS where procname=`backtestdb; + .backtest.rdb:neg first exec w from .servers.SERVERS where procname=rdbname; }; datareplay:{[params;id] From 36366d1efd1639d37792f4cc8441e9140e8677b4 Mon Sep 17 00:00:00 2001 From: Conor McLarnon Date: Tue, 10 Feb 2026 10:04:52 +0000 Subject: [PATCH 11/16] Adding settings for names --- config/settings/backtest.q | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 config/settings/backtest.q diff --git a/config/settings/backtest.q b/config/settings/backtest.q new file mode 100644 index 000000000..13e152cd6 --- /dev/null +++ b/config/settings/backtest.q @@ -0,0 +1,9 @@ +\d .backtest + +dbproc:`backtestdb; +inputdbtype:`hdb; +outputdbtype:`backtestdb; +pubproc:`backtestpub; +pubproctype:`backtest; + +\d . From ff03872fe9d345181b6e1cfcd38c5f953c18d636 Mon Sep 17 00:00:00 2001 From: Conor McLarnon Date: Tue, 10 Feb 2026 10:58:10 +0000 Subject: [PATCH 12/16] Tidy up hardcoding procnames --- code/common/backtest.q | 9 +++------ code/processes/backtestpub.q | 10 ++++------ config/settings/backtest.q | 9 +++++++-- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/code/common/backtest.q b/code/common/backtest.q index 6ecf1328c..6bc0f9dab 100644 --- a/code/common/backtest.q +++ b/code/common/backtest.q @@ -9,13 +9,10 @@ test:`name`version`tabs`sts`ets`replayinterval`timer`timerinterval`timerfunc!(`v init:{[] system"l ",getenv[`KDBCONFIG],"/settings/backtest.q"; - proctypes:outputdbtype,pubproctype; - rdbname:dbproc; - pubname:pubproc; .servers.registerfromdiscovery[proctypes;1b]; - servers:select w, procname from .servers.SERVERS where proctype in proctypes; - .backtest.rdbh:neg first exec w from servers where procname=rdbname; - .backtest.pubh:neg first exec w from servers where procname=pubname; + servers:.servers.getservers[`proctype;`backtest`backtestdb;()!();0b;0b]; + .backtest.rdbh:neg first exec w from .servers.getservers[`procname;dbprocname;()!();0b;0b]; + .backtest.pubh:neg first exec w from .servers.getservers[`procname;pubprocname;()!();0b;0b]; `.u.pub set .backtest.pub; .backtest.initRan:1b; }; diff --git a/code/processes/backtestpub.q b/code/processes/backtestpub.q index 6eba3edc7..a047c7722 100644 --- a/code/processes/backtestpub.q +++ b/code/processes/backtestpub.q @@ -3,21 +3,19 @@ init:{[] .servers.CONNECTIONS:inputdbtype,outputdbtype; .servers.startup[]; - hdbtype:inputdbtype; - rdbname:dbproc; / Used to get data from source to replay as real time - .backtest.hdb:first exec w from .servers.SERVERS where proctype=hdbtype; + .backtest.hdbh:first exec w from .servers.getservers[`proctype;inputdbtype;()!();0b;0b]; / Used to send results of the backtest - .backtest.rdb:neg first exec w from .servers.SERVERS where procname=rdbname; + .backtest.rdbh:neg first exec w from .servers.getservers[`procname;dbprocname;()!();0b;0b]; }; datareplay:{[params;id] .dbg.replay:(params;id;.z.w); - params[`h]:hdb; + params[`h]:hdbh; / Return the messages to be replayed msgs:.datareplay.tablesToDataStream `name`version _params; / Publishing configuration of backtest, needs to run after datareplay to pick up dataconfig - rdb(`upd;`config;(id;.z.p;;;;;enlist query;`h _params). params`name`version`sts`ets); + rdbh(`upd;`config;(id;.z.p;;;;;enlist query;`h _params). params`name`version`sts`ets); / Kick off upd/timers on the instance being backtested {neg[.z.w](`.backtest.extractmessage;x)}each msgs }; diff --git a/config/settings/backtest.q b/config/settings/backtest.q index 13e152cd6..c623b13b6 100644 --- a/config/settings/backtest.q +++ b/config/settings/backtest.q @@ -1,9 +1,14 @@ \d .backtest -dbproc:`backtestdb; +/ Used for data replay inputdbtype:`hdb; +/Used to capture results +dbprocname:`backtestdb; outputdbtype:`backtestdb; -pubproc:`backtestpub; +/Used to publish data from replay to engine and results +pubprocname:`backtestpub; pubproctype:`backtest; +/ Used by engine getting backtested +proctypes:outputdbtype,pubproctype; \d . From 2353a788fdf535d8e04a4ff8e81d48e1d1fa1615 Mon Sep 17 00:00:00 2001 From: Conor McLarnon Date: Tue, 10 Feb 2026 15:04:29 +0000 Subject: [PATCH 13/16] Adding removal of tp sub --- code/common/backtest.q | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/code/common/backtest.q b/code/common/backtest.q index 6bc0f9dab..2c7d420fd 100644 --- a/code/common/backtest.q +++ b/code/common/backtest.q @@ -10,6 +10,8 @@ test:`name`version`tabs`sts`ets`replayinterval`timer`timerinterval`timerfunc!(`v init:{[] system"l ",getenv[`KDBCONFIG],"/settings/backtest.q"; .servers.registerfromdiscovery[proctypes;1b]; + / Unsubscribe from tp (Probably a better way of doing this to get the exact upstream process) + .servers.removerows exec i from .servers.SERVERS where proctype like "*tickerplant"; servers:.servers.getservers[`proctype;`backtest`backtestdb;()!();0b;0b]; .backtest.rdbh:neg first exec w from .servers.getservers[`procname;dbprocname;()!();0b;0b]; .backtest.pubh:neg first exec w from .servers.getservers[`procname;pubprocname;()!();0b;0b]; @@ -40,6 +42,7 @@ run:{[params] pubh(`.backtest.datareplay;params;.backtest.id); }; +/ Cannot run .backtest.run params until params is in correct format with helpful instruction on how to fix validateparams:{[params] if[.proc.procname like "*backtest*"; '"Backtest should be ran from the process you are backtesting not backtest instance itself"; @@ -66,4 +69,5 @@ validateparams:{[params] params }; + \d . From e511f8100e32139712ced7beb3db7fca8a220984 Mon Sep 17 00:00:00 2001 From: Conor McLarnon Date: Tue, 10 Feb 2026 16:53:06 +0000 Subject: [PATCH 14/16] Updating names of procs --- config/settings/backtest.q | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config/settings/backtest.q b/config/settings/backtest.q index c623b13b6..3a1228fe7 100644 --- a/config/settings/backtest.q +++ b/config/settings/backtest.q @@ -3,10 +3,10 @@ / Used for data replay inputdbtype:`hdb; /Used to capture results -dbprocname:`backtestdb; +dbprocname:`backtestrdb1; outputdbtype:`backtestdb; /Used to publish data from replay to engine and results -pubprocname:`backtestpub; +pubprocname:`backtestpub1; pubproctype:`backtest; / Used by engine getting backtested proctypes:outputdbtype,pubproctype; From 5704930aa5160d86948dc8566b74456342b80a89 Mon Sep 17 00:00:00 2001 From: Conor McLarnon Date: Tue, 10 Feb 2026 16:55:39 +0000 Subject: [PATCH 15/16] Adding settings for db --- config/settings/backtestdb.q | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 config/settings/backtestdb.q diff --git a/config/settings/backtestdb.q b/config/settings/backtestdb.q new file mode 100644 index 000000000..770089bc4 --- /dev/null +++ b/config/settings/backtestdb.q @@ -0,0 +1,2 @@ +config:([] backtestid:`guid$(); time:`timestamp$(); backtesttype:`symbol$(); backtestversion:`int$(); simstart:`timestamp$(); simend:`timestamp$(); dataconfig:(); componentconfig:()); +output:([] time:`timestamp$(); backtestid:`guid$(); simtime:`timestamp$(); name:`symbol$(); data:()); From 9323cbdac66ddcda4ed4cb0094165b99dc6eed7b Mon Sep 17 00:00:00 2001 From: Conor McLarnon Date: Wed, 11 Feb 2026 11:19:16 +0000 Subject: [PATCH 16/16] Update torq to add backtest mode --- torq.q | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/torq.q b/torq.q index dfab9433d..c7455b686 100644 --- a/torq.q +++ b/torq.q @@ -59,7 +59,8 @@ stdoptionusage:@[value;`stdoptionusage;"Standard options: [-localtime]:\t\t\tuse local time instead of GMT [-usage]:\t\t\tprint usage info [-test]:\t\t\tset to run unit tests - [-jsonlogs]:\t\t\toutput logs in json format"] + [-jsonlogs]:\t\t\toutput logs in json format" + [-backtest]:\t\t\tset to run backtest mode] // extra info - used to extend the usage info extrausage:@[value;`extrausage;""] @@ -711,3 +712,8 @@ if[(`test in key .proc.params); .lg.e[`init;"environment variable KDBTESTS undefined"] ] ]; + +// Overwrite pub/sub channels when in backtest mode +if[`backtest in key .proc.params; + .backtest.init[]; + ];