diff --git a/CMakeLists.txt b/CMakeLists.txt index 99ddd11..25d0dbd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -15,6 +15,7 @@ option(KUN_AVX512 "Enable AVX512 instruction set" OFF) option(KUN_AVX512DQ "Enable AVX512DQ instruction set" OFF) option(KUN_AVX512VL "Enable AVX512VL instruction set" OFF) option(KUN_NO_AVX2 "Disable AVX2 and FMA instruction set" OFF) +option(KUN_SANITIZER "Enable sanitizer" OFF) if (CMAKE_CXX_COMPILER_ID MATCHES "(Clang|GNU|AppleClang)") @@ -36,6 +37,9 @@ if (CMAKE_CXX_COMPILER_ID MATCHES "(Clang|GNU|AppleClang)") if(KUN_AVX512VL) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mavx512vl") endif() + if(KUN_SANITIZER) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address -static-libasan") + endif() else() set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4251 /wd4200 /wd4305") if (NOT KUN_NO_AVX2) diff --git a/KunQuant/passes/CodegenCpp.py b/KunQuant/passes/CodegenCpp.py index 83c4904..59d8f54 100644 --- a/KunQuant/passes/CodegenCpp.py +++ b/KunQuant/passes/CodegenCpp.py @@ -97,13 +97,13 @@ def codegen_cpp(prefix: str, f: Function, input_name_to_idx: Dict[str, int], inp return "", f'''static auto stage_{prefix}__{f.name} = {f.ops[1].__class__.__name__}Stocks, Mapper{f.ops[2].attrs["layout"]}<{elem_type}, {simd_lanes}>>;''' is_cross_sectional = _is_cross_sectional(f) - time_or_stock, ctx_or_stage = ("__time_idx", "RuntimeStage *stage") if is_cross_sectional else ("__stock_idx", "Context* __ctx") + time_or_stock = "__time_idx" if is_cross_sectional else "__stock_idx" func_name = _generate_cross_sectional_func_name(is_cross_sectional, inputs, outputs) if is_cross_sectional else f.name - header = f'''{"static " if static else ""}void stage_{prefix}__{func_name}({ctx_or_stage}, size_t {time_or_stock}, size_t __total_time, size_t __start, size_t __length) ''' + header = f'''{"static " if static else ""}void stage_{prefix}__{func_name}(RuntimeStage *stage, size_t {time_or_stock}, size_t __total_time, size_t __start, size_t __length) ''' if static: decl = "" else: - decl = f'''extern void stage_{prefix}__{func_name}({ctx_or_stage}, size_t {time_or_stock}, size_t __total_time, size_t __start, size_t __length);''' + decl = f'''extern void stage_{prefix}__{func_name}(RuntimeStage *stage, size_t {time_or_stock}, size_t __total_time, size_t __start, size_t __length);''' if is_cross_sectional: decl = f"{decl}\nstatic auto stage_{prefix}__{f.name} = stage_{prefix}__{func_name};" if func_name in generated_cross_sectional_func: @@ -138,6 +138,7 @@ def codegen_cpp(prefix: str, f: Function, input_name_to_idx: Dict[str, int], inp }}''', decl toplevel = _CppScope(None) + toplevel.scope.append(_CppSingleLine(toplevel, f"auto __ctx = stage->ctx;")) buffer_type: Dict[OpBase, str] = dict() ptrname = "" if elem_type == "float" else "D" for inp, buf_kind in inputs: diff --git a/Readme.md b/Readme.md index 20e9d28..4445d4b 100644 --- a/Readme.md +++ b/Readme.md @@ -118,7 +118,7 @@ lib = cfake.compileit([("alpha101", f, KunCompilerConfig(input_layout="TS", outp modu = lib.getModule("alpha101") ``` -We will explain the function `cfake.compileit` in [Customize.md](./Customize.md). Let's continue to see how to use the compiled `lib`. +We will explain the function `cfake.compileit` in [Customize.md](./doc/Customize.md). Let's continue to see how to use the compiled `lib`. Load your stock data. In this example, load from local pandas files. We assume the open, close, high, low, volumn and amount data for different stocks are stored in different files. @@ -231,7 +231,7 @@ Note that the executors are reusable. A multithread executor is actually a threa ## Customized factors -KunQuant is a tool for general expressions. You can further read [Customize.md](./Customize.md) for how you can compile your own customized factors. This document also provides infomation on +KunQuant is a tool for general expressions. You can further read [Customize.md](./doc/Customize.md) for how you can compile your own customized factors. This document also provides infomation on * building and keeping the compilation result for later use * Loading existing compiled factor library * enabling AVX512 @@ -290,34 +290,22 @@ On x86-64 CPUs, AVX2-FMA is used by default in the built KunQuant core library. ## Streaming mode -KunQuant can be configured to generate factor libraries for streaming, when the data arrive one at a time. See [Stream.md](./Stream.md) +KunQuant can be configured to generate factor libraries for streaming, when the data arrive one at a time. See [Stream.md](./doc/Stream.md) -## Row-to-row correlation (for IC/IR calculation) +## Utility functions -```python -from KunQuant.runner import KunRunner as kr -data1 = ... # np.ndarray of shape [time*stocks]. For example, a factor's results -data2 = ... # np.ndarray of shape [time*stocks]. For example, a factor's results -valid_in = {"alpha1": data1, "alpha2": data2} -returns = ... # np.ndarray of shape [time*stocks]. For example, the rank of returns -valid_corr = {"alpha1": np.empty((time,), dtype="float32"), "alpha2": np.empty((time,), dtype="float32")} -kr.corrWith(executor, valid_in, returns, valid_corr, layout = "TS", rank_inputs = True) -# outputs in valid_corr -alpha1_ic = valid_corr["alpha1"].mean() -``` - -The parameter `rank_inputs=True` will first compute rank in the first input array (e.g. `valid_in` above) and compute the correlation with the second input (e.g. `returns` above). It will not compute the rank of the second input. +To compute row-to-row correlation (for IC/IR calculation) and aggregrating functions (like `pd.groupby(...)`), please see [Utility.md](./doc/Utility.md). ## Using C-style APIs -KunQuant provides C-style APIs to call the generated factor code in shared libraries. See [CAPI.md](./CAPI.md) +KunQuant provides C-style APIs to call the generated factor code in shared libraries. See [CAPI.md](./doc/CAPI.md) ## Operator definitions -See [Operators.md](./Operators.md) +See [Operators.md](./doc/Operators.md) -To add new operators, see [NewOperators.md](./NewOperators.md) +To add new operators, see [NewOperators.md](./doc/NewOperators.md) ## Testing and validation diff --git a/cpp/Kun/Aggregration.cpp b/cpp/Kun/Aggregration.cpp new file mode 100644 index 0000000..76befb5 --- /dev/null +++ b/cpp/Kun/Aggregration.cpp @@ -0,0 +1,122 @@ + +#include +#include +#include +#include +#include + +namespace kun { +namespace ops { + +template +static void aggregration(RuntimeStage *stage, size_t stock_idx, + size_t __total_time, size_t __start, size_t __length) { + auto num_stock = stage->ctx->stock_count; + auto &buffers = stage->ctx->buffers; + auto &module_in_buffers = stage->stage->in_buffers; + auto &module_out_buffers = stage->stage->out_buffers; + auto &lablebuf = buffers[module_in_buffers[0]->id]; + auto &inbuf_orig = buffers[module_in_buffers[1]->id]; + + auto &sumbuf_orig = buffers[module_out_buffers[AGGREGRATION_SUM]->id]; + auto &minbuf_orig = buffers[module_out_buffers[AGGREGRATION_MIN]->id]; + auto &maxbuf_orig = buffers[module_out_buffers[AGGREGRATION_MAX]->id]; + auto &firstbuf_orig = buffers[module_out_buffers[AGGREGRATION_FIRST]->id]; + auto &lastbuf_orig = buffers[module_out_buffers[AGGREGRATION_LAST]->id]; + auto &countbuf_orig = buffers[module_out_buffers[AGGREGRATION_COUNT]->id]; + auto &meanbuf_orig = buffers[module_out_buffers[AGGREGRATION_MEAN]->id]; + + T *labels = lablebuf.getPtr() + __start; + InputTS inbuf{inbuf_orig.getPtr(), stock_idx, num_stock, + __total_time, __start}; + + OutputTS sumbuf{sumbuf_orig.getPtr(), stock_idx, num_stock, + __total_time, 0}; + OutputTS minbuf{minbuf_orig.getPtr(), stock_idx, num_stock, + __total_time, 0}; + OutputTS maxbuf{maxbuf_orig.getPtr(), stock_idx, num_stock, + __total_time, 0}; + OutputTS firstbuf{firstbuf_orig.getPtr(), stock_idx, + num_stock, __total_time, 0}; + OutputTS lastbuf{lastbuf_orig.getPtr(), stock_idx, num_stock, + __total_time, 0}; + OutputTS countbuf{countbuf_orig.getPtr(), stock_idx, + num_stock, __total_time, 0}; + OutputTS meanbuf{meanbuf_orig.getPtr(), stock_idx, num_stock, + __total_time, 0}; + + using SimdT = kun_simd::vec; + ReduceMin reduce_min; + ReduceMax reduce_max; + ReduceAdd reduce_add; + SimdT first = inbuf.step(0); + SimdT last; + SimdT count{0}; + + auto todo_count = num_stock - stock_idx * simdlen; + todo_count = todo_count > simdlen ? simdlen : todo_count; + auto mask = SimdT::make_mask(todo_count); + T last_label = labels[0]; + size_t store_idx = 0; + for (size_t i = 0; i < __length; i++) { + auto label = labels[i]; + auto cur = inbuf.step(i); + if (label != last_label) { + if (sumbuf.buf) + sumbuf.store(store_idx, reduce_add, mask); + if (minbuf.buf) + minbuf.store(store_idx, reduce_min, mask); + if (maxbuf.buf) + maxbuf.store(store_idx, reduce_max, mask); + if (firstbuf.buf) + firstbuf.store(store_idx, first, mask); + if (lastbuf.buf) + lastbuf.store(store_idx, last, mask); + if (countbuf.buf) + countbuf.store(store_idx, count, mask); + if (meanbuf.buf) + meanbuf.store(store_idx, reduce_add / count, mask); + store_idx++; + first = cur; + reduce_min = ReduceMin{}; + reduce_max = ReduceMax{}; + reduce_add = ReduceAdd{}; + count = T{0}; + } + count = count + T{1}; + last = cur; + last_label = label; + reduce_max.step(cur, i); + reduce_min.step(cur, i); + reduce_add.step(cur, i); + } + if (sumbuf.buf) + sumbuf.store(store_idx, reduce_add, mask); + if (minbuf.buf) + minbuf.store(store_idx, reduce_min, mask); + if (maxbuf.buf) + maxbuf.store(store_idx, reduce_max, mask); + if (firstbuf.buf) + firstbuf.store(store_idx, first, mask); + if (lastbuf.buf) + lastbuf.store(store_idx, last, mask); + if (countbuf.buf) + countbuf.store(store_idx, count, mask); + if (meanbuf.buf) + meanbuf.store(store_idx, reduce_add / count, mask); +} + +void aggregrationFloat(RuntimeStage *stage, size_t stock_idx, + size_t __total_time, size_t __start, size_t __length) { + aggregration( + stage, stock_idx, __total_time, __start, __length); +} + +void aggregrationDouble(RuntimeStage *stage, size_t stock_idx, + size_t __total_time, size_t __start, size_t __length) { + aggregration( + stage, stock_idx, __total_time, __start, __length); +} + +} // namespace ops +} // namespace kun \ No newline at end of file diff --git a/cpp/Kun/Base.hpp b/cpp/Kun/Base.hpp index 06daeb0..4fb3589 100644 --- a/cpp/Kun/Base.hpp +++ b/cpp/Kun/Base.hpp @@ -4,7 +4,7 @@ #ifdef __cplusplus namespace kun { struct Context; -static constexpr size_t time_stride = 8; +constexpr size_t time_stride = 8; } // namespace kun #endif @@ -25,4 +25,14 @@ static constexpr size_t time_stride = 8; // g++ has an strange behavior, it needs T to be // exported if we want to export func #define KUN_TEMPLATE_ARG KUN_API -#endif \ No newline at end of file +#endif + + +#ifdef __AVX__ +#define KUN_DEFAULT_FLOAT_SIMD_LEN 8 +#define KUN_DEFAULT_DOUBLE_SIMD_LEN 4 +#else +// neon +#define KUN_DEFAULT_FLOAT_SIMD_LEN 4 +#define KUN_DEFAULT_DOUBLE_SIMD_LEN 2 +#endif diff --git a/cpp/Kun/CorrWith.cpp b/cpp/Kun/CorrWith.cpp index 5187189..44f2218 100644 --- a/cpp/Kun/CorrWith.cpp +++ b/cpp/Kun/CorrWith.cpp @@ -8,10 +8,10 @@ namespace ops { RuntimeStage * stage, size_t time_idx, size_t __total_time, \ size_t __start, size_t __length); -DEF_INSTANCE(CorrWith, MapperSTs) -DEF_INSTANCE(CorrWith, MapperTS) -DEF_INSTANCE(RankCorrWith, MapperSTs) -DEF_INSTANCE(RankCorrWith, MapperTS) +DEF_INSTANCE(CorrWith, MapperSTsFloat) +DEF_INSTANCE(CorrWith, MapperTSFloat) +DEF_INSTANCE(RankCorrWith, MapperSTsFloat) +DEF_INSTANCE(RankCorrWith, MapperTSFloat) } // namespace ops } // namespace kun \ No newline at end of file diff --git a/cpp/Kun/CorrWith.hpp b/cpp/Kun/CorrWith.hpp index 245d64d..4ef7cea 100644 --- a/cpp/Kun/CorrWith.hpp +++ b/cpp/Kun/CorrWith.hpp @@ -9,7 +9,7 @@ namespace kun { namespace ops { template -void KUN_TEMPLATE_EXPORT CorrWith(RuntimeStage *stage, size_t time_idx, +void CorrWith(RuntimeStage *stage, size_t time_idx, size_t __total_time, size_t __start, size_t __length) { auto num_stocks = stage->ctx->stock_count; @@ -54,7 +54,7 @@ void KUN_TEMPLATE_EXPORT CorrWith(RuntimeStage *stage, size_t time_idx, } template -void KUN_TEMPLATE_EXPORT RankCorrWith(RuntimeStage *stage, size_t time_idx, +void RankCorrWith(RuntimeStage *stage, size_t time_idx, size_t __total_time, size_t __start, size_t __length) { auto num_stocks = stage->ctx->stock_count; @@ -120,22 +120,22 @@ void KUN_TEMPLATE_EXPORT RankCorrWith(RuntimeStage *stage, size_t time_idx, } } -extern template void CorrWith>(RuntimeStage *stage, +extern template void CorrWith(RuntimeStage *stage, size_t time_idx, size_t __total_time, size_t __start, size_t __length); -extern template void CorrWith>(RuntimeStage *stage, +extern template void CorrWith(RuntimeStage *stage, size_t time_idx, size_t __total_time, size_t __start, size_t __length); -extern template void RankCorrWith>(RuntimeStage *stage, +extern template void RankCorrWith(RuntimeStage *stage, size_t time_idx, size_t __total_time, size_t __start, size_t __length); -extern template void RankCorrWith>(RuntimeStage *stage, +extern template void RankCorrWith(RuntimeStage *stage, size_t time_idx, size_t __total_time, size_t __start, diff --git a/cpp/Kun/LayoutMappers.hpp b/cpp/Kun/LayoutMappers.hpp index 979e3e7..91be213 100644 --- a/cpp/Kun/LayoutMappers.hpp +++ b/cpp/Kun/LayoutMappers.hpp @@ -55,6 +55,13 @@ struct KUN_TEMPLATE_ARG MapperSTREAM { } }; +using MapperSTsFloat = MapperSTs; +using MapperTSFloat = MapperTS; +using MapperSTREAMFloat = MapperSTREAM; +using MapperSTsDouble = MapperSTs; +using MapperTSDouble = MapperTS; +using MapperSTREAMDouble = MapperSTREAM; + namespace { template struct ExtractInputBuffer { diff --git a/cpp/Kun/Rank.cpp b/cpp/Kun/Rank.cpp index c70d440..6c3c405 100644 --- a/cpp/Kun/Rank.cpp +++ b/cpp/Kun/Rank.cpp @@ -8,11 +8,11 @@ namespace ops { RuntimeStage * stage, size_t time_idx, size_t __total_time, \ size_t __start, size_t __length); -DEF_INSTANCE(MapperSTs, MapperSTs) -DEF_INSTANCE(MapperSTs, MapperTS) -DEF_INSTANCE(MapperTS, MapperTS) -DEF_INSTANCE(MapperTS, MapperSTs) -DEF_INSTANCE(MapperSTREAM, MapperSTREAM) +DEF_INSTANCE(MapperSTsFloat, MapperSTsFloat) +DEF_INSTANCE(MapperSTsFloat, MapperTSFloat) +DEF_INSTANCE(MapperTSFloat, MapperTSFloat) +DEF_INSTANCE(MapperTSFloat, MapperSTsFloat) +DEF_INSTANCE(MapperSTREAMFloat, MapperSTREAMFloat) } // namespace ops } // namespace kun \ No newline at end of file diff --git a/cpp/Kun/Rank.hpp b/cpp/Kun/Rank.hpp index 193f2a7..8938aeb 100644 --- a/cpp/Kun/Rank.hpp +++ b/cpp/Kun/Rank.hpp @@ -56,19 +56,19 @@ void KUN_TEMPLATE_EXPORT RankStocks(RuntimeStage *stage, size_t time_idx, } } -extern template void RankStocks, MapperSTs>( +extern template void RankStocks( RuntimeStage *stage, size_t time_idx, size_t __total_time, size_t __start, size_t __length); -extern template void RankStocks, MapperTS>( +extern template void RankStocks( RuntimeStage *stage, size_t time_idx, size_t __total_time, size_t __start, size_t __length); -extern template void RankStocks, MapperTS>( +extern template void RankStocks( RuntimeStage *stage, size_t time_idx, size_t __total_time, size_t __start, size_t __length); -extern template void RankStocks, MapperSTs>( +extern template void RankStocks( RuntimeStage *stage, size_t time_idx, size_t __total_time, size_t __start, size_t __length); -extern template void RankStocks, MapperSTREAM>( +extern template void RankStocks( RuntimeStage *stage, size_t time_idx, size_t __total_time, size_t __start, size_t __length); diff --git a/cpp/Kun/RunGraph.hpp b/cpp/Kun/RunGraph.hpp index ddb6775..0470bf4 100644 --- a/cpp/Kun/RunGraph.hpp +++ b/cpp/Kun/RunGraph.hpp @@ -3,8 +3,8 @@ #include "Context.hpp" #include "Module.hpp" #include "StateBuffer.hpp" -#include #include +#include #include namespace kun { @@ -12,21 +12,18 @@ KUN_API void runGraph(std::shared_ptr exec, const Module *m, std::unordered_map &buffers, size_t num_stocks, size_t total_time, size_t cur_time, size_t length); -KUN_API void corrWith(std::shared_ptr exec, MemoryLayout layout, bool rank_inputs, - std::vector& buffers, - float* corr_with_buffer, - std::vector& outbuffers, - size_t num_stocks, size_t total_time, size_t cur_time, - size_t length); +KUN_API void corrWith(std::shared_ptr exec, MemoryLayout layout, + bool rank_inputs, std::vector &buffers, + float *corr_with_buffer, std::vector &outbuffers, + size_t num_stocks, size_t total_time, size_t cur_time, + size_t length); struct AlignedPtr { - void* ptr; + void *ptr; size_t size; - char* get() const noexcept { - return (char*)ptr; - } - AlignedPtr(void* ptr, size_t size) noexcept; - AlignedPtr(AlignedPtr&& other) noexcept; - AlignedPtr& operator=(AlignedPtr&& other) noexcept; + char *get() const noexcept { return (char *)ptr; } + AlignedPtr(void *ptr, size_t size) noexcept; + AlignedPtr(AlignedPtr &&other) noexcept; + AlignedPtr &operator=(AlignedPtr &&other) noexcept; void release() noexcept; ~AlignedPtr(); }; @@ -37,7 +34,7 @@ struct KUN_API StreamContext { Context ctx; const Module *m; StreamContext(std::shared_ptr exec, const Module *m, - size_t num_stocks, InputStreamBase* states = nullptr); + size_t num_stocks, InputStreamBase *states = nullptr); // query the buffer handle of a named buffer size_t queryBufferHandle(const char *name) const; // get the current readable position of the named buffer. The returned @@ -49,10 +46,32 @@ struct KUN_API StreamContext { void pushData(size_t handle, const float *data); void pushData(size_t handle, const double *data); void run(); - StreamContext(const StreamContext&) = delete; - StreamContext& operator=(const StreamContext&) = delete; + StreamContext(const StreamContext &) = delete; + StreamContext &operator=(const StreamContext &) = delete; ~StreamContext(); - bool serializeStates(OutputStreamBase* stream); + bool serializeStates(OutputStreamBase *stream); }; +enum AggregrationKind { + AGGREGRATION_SUM = 0, + AGGREGRATION_MIN, + AGGREGRATION_MAX, + AGGREGRATION_FIRST, + AGGREGRATION_LAST, + AGGREGRATION_COUNT, + AGGREGRATION_MEAN, + AGGREGRATION_NUM_KINDS, +}; + +struct AggregrationOutput { + float *buffers[AGGREGRATION_NUM_KINDS] = {nullptr}; +}; + +KUN_API void aggregrate(std::shared_ptr exec, + size_t num_aggregrations, float **buffers, + float **labels, Datatype dtype, + const AggregrationOutput *outbuffers, + size_t num_stocks, size_t total_time, size_t cur_time, + size_t length); + } // namespace kun \ No newline at end of file diff --git a/cpp/Kun/Runtime.cpp b/cpp/Kun/Runtime.cpp index af42860..475de22 100644 --- a/cpp/Kun/Runtime.cpp +++ b/cpp/Kun/Runtime.cpp @@ -72,7 +72,7 @@ void checkedDealloc(void *ptr, size_t sz) { namespace kun { void Buffer::alloc(size_t count, size_t use_count, size_t elem_size) { - if (!ptr) { + if (!ptr && count > 0) { ptr = (float *)kunAlignedAlloc(KUN_MALLOC_ALIGNMENT, count * elem_size); refcount = (int)use_count; #if CHECKED_PTR @@ -94,7 +94,7 @@ void Buffer::deref() { Buffer::~Buffer() { if (ptr && refcount.load() >= 0) { - free(ptr); + kunAlignedFree(ptr); } } @@ -107,15 +107,12 @@ size_t RuntimeStage::getNumTasks() const { bool RuntimeStage::doJob() { auto cur_idx = doing_index.load(); auto num_tasks = getNumTasks(); + auto total_time = ctx->total_time; + auto start = ctx->start; + auto length = ctx->length; while (cur_idx < num_tasks) { if (doing_index.compare_exchange_strong(cur_idx, cur_idx + 1)) { - if (stage->kind == TaskExecKind::SLICE_BY_STOCK) { - stage->f.f(ctx, cur_idx, ctx->total_time, ctx->start, - ctx->length); - } else { - stage->f.rankf(this, cur_idx, ctx->total_time, ctx->start, - ctx->length); - } + stage->f.f(this, cur_idx, total_time, start, length); if (!onDone(1)) { return false; } @@ -163,6 +160,23 @@ bool RuntimeStage::onDone(size_t cnt) { return true; } +static void setContextStagesAndRun(Context &ctx, Stage *mstages, + size_t num_stages, Executor *exec) { + std::vector &stages = ctx.stages; + stages.reserve(num_stages); + for (size_t i = 0; i < num_stages; i++) { + auto &stage = mstages[i]; + stages.emplace_back(&stage, &ctx); + } + for (size_t i = 0; i < num_stages; i++) { + auto &stage = mstages[i]; + if (stage.orig_pending == 0) { + stages[i].enqueue(); + } + } + exec->runUntilDone(); +} + void corrWith(std::shared_ptr exec, MemoryLayout layout, bool rank_inputs, std::vector &buffers, float *corr_with_buffer, std::vector &outbuffers, @@ -171,15 +185,15 @@ void corrWith(std::shared_ptr exec, MemoryLayout layout, decltype(&ops::CorrWith>) thefunc = nullptr; if (layout == MemoryLayout::TS) { if (rank_inputs) { - thefunc = &ops::RankCorrWith>; + thefunc = &ops::RankCorrWith; } else { - thefunc = &ops::CorrWith>; + thefunc = &ops::CorrWith; } } else { if (rank_inputs) { - thefunc = &ops::RankCorrWith>; + thefunc = &ops::RankCorrWith; } else { - thefunc = &ops::CorrWith>; + thefunc = &ops::CorrWith; } } std::vector buffer_info; @@ -211,7 +225,7 @@ void corrWith(std::shared_ptr exec, MemoryLayout layout, auto *outbuf = &temp.back(); mstages.emplace_back(Stage{thefunc, nullptr, 0, inbuf, 2, outbuf, 1, 0, TaskExecKind::SLICE_BY_TIME, - buffer_info.size()}); + mstages.size()}); } Context ctx{std::move(rtlbuffers), {}, @@ -223,20 +237,75 @@ void corrWith(std::shared_ptr exec, MemoryLayout layout, length, 8, Datatype::Float, - false, nullptr}; - std::vector &stages = ctx.stages; - stages.reserve(buffers.size()); - for (size_t i = 0; i < buffers.size(); i++) { - auto &stage = mstages[i]; - stages.emplace_back(&stage, &ctx); + false, + nullptr}; + setContextStagesAndRun(ctx, mstages.data(), mstages.size(), exec.get()); +} + +namespace ops { +void aggregrationFloat(RuntimeStage *stage, size_t stock_idx, + size_t __total_time, size_t __start, size_t __length); +void aggregrationDouble(RuntimeStage *stage, size_t stock_idx, + size_t __total_time, size_t __start, size_t __length); +} // namespace ops + +void aggregrate(std::shared_ptr exec, size_t num_aggregrations, + float **buffers, float **labels, Datatype dtype, + const AggregrationOutput *outbuffers, size_t num_stocks, + size_t total_time, size_t cur_time, size_t length) { + decltype(&ops::aggregrationFloat) thefunc = nullptr; + size_t simd_len = 0; + if (dtype == Datatype::Float) { + simd_len = KUN_DEFAULT_FLOAT_SIMD_LEN; + thefunc = &ops::aggregrationFloat; + } else { + simd_len = KUN_DEFAULT_DOUBLE_SIMD_LEN; + thefunc = &ops::aggregrationDouble; } - for (size_t i = 0; i < buffers.size(); i++) { - auto &stage = mstages[i]; - if (stage.orig_pending == 0) { - stages[i].enqueue(); + std::vector buffer_info; + std::vector rtlbuffers; + std::vector mstages; + std::vector temp; + // for each aggregration, we need num_aggregration_buffers + 2 buffers + // 1 for the labels, 1 for the input and num_aggregration_buffers for the + // output + rtlbuffers.reserve((AGGREGRATION_NUM_KINDS + 2) * num_aggregrations); + buffer_info.reserve((AGGREGRATION_NUM_KINDS + 2) * num_aggregrations); + mstages.reserve(num_aggregrations); + temp.reserve((AGGREGRATION_NUM_KINDS + 2) * num_aggregrations); + auto pushBuffer = [&](float *buffer, BufferKind kind, size_t num_users) { + rtlbuffers.emplace_back(buffer, total_time); + buffer_info.emplace_back( + BufferInfo{buffer_info.size(), "", num_users, kind, 0, 0}); + temp.push_back(&buffer_info.back()); + }; + for (size_t i = 0; i < num_aggregrations; i++) { + float *input = buffers[i]; + float *label = labels[i]; + pushBuffer(label, BufferKind::INPUT, 1); + auto *inbuf = &temp.back(); + pushBuffer(input, BufferKind::INPUT, 1); + auto *outbuf = &temp.back() + 1; + for (size_t j = 0; j < AGGREGRATION_NUM_KINDS; j++) { + pushBuffer(outbuffers[i].buffers[j], BufferKind::OUTPUT, 0); } + mstages.emplace_back( + Stage{thefunc, nullptr, 0, inbuf, 2, outbuf, AGGREGRATION_NUM_KINDS, + 0, TaskExecKind::SLICE_BY_STOCK, mstages.size()}); } - exec->runUntilDone(); + Context ctx{std::move(rtlbuffers), + {}, + exec, + 0, + num_stocks, + total_time, + cur_time, + length, + simd_len, + dtype, + false, + nullptr}; + setContextStagesAndRun(ctx, mstages.data(), mstages.size(), exec.get()); } void runGraph(std::shared_ptr exec, const Module *m, @@ -277,20 +346,9 @@ void runGraph(std::shared_ptr exec, const Module *m, length, m->blocking_len, m->dtype, - false, nullptr}; - std::vector &stages = ctx.stages; - stages.reserve(m->num_stages); - for (size_t i = 0; i < m->num_stages; i++) { - auto &stage = m->stages[i]; - stages.emplace_back(&stage, &ctx); - } - for (size_t i = 0; i < m->num_stages; i++) { - auto &stage = m->stages[i]; - if (stage.orig_pending == 0) { - stages[i].enqueue(); - } - } - exec->runUntilDone(); + false, + nullptr}; + setContextStagesAndRun(ctx, m->stages, m->num_stages, exec.get()); } AlignedPtr::AlignedPtr(void *ptr, size_t size) noexcept { @@ -430,12 +488,14 @@ size_t StreamContext::queryBufferHandle(const char *name) const { const float *StreamContext::getCurrentBufferPtrFloat(size_t handle) const { auto buf = (StreamBuffer *)buffers.at(handle).get(); - return buf->getCurrentBufferPtr(ctx.stock_count, m->buffers[handle].window, m->blocking_len); + return buf->getCurrentBufferPtr(ctx.stock_count, m->buffers[handle].window, + m->blocking_len); } const double *StreamContext::getCurrentBufferPtrDouble(size_t handle) const { auto buf = (StreamBuffer *)buffers.at(handle).get(); - return buf->getCurrentBufferPtr(ctx.stock_count, m->buffers[handle].window, m->blocking_len); + return buf->getCurrentBufferPtr(ctx.stock_count, m->buffers[handle].window, + m->blocking_len); } void StreamContext::pushData(size_t handle, const float *data) { @@ -471,14 +531,13 @@ void StreamContext::run() { StreamContext::~StreamContext() = default; - -bool StreamContext::serializeStates(OutputStreamBase* stream) { - for(auto& buf: buffers) { - if(!stream->write(buf.get(), buf.size)) { +bool StreamContext::serializeStates(OutputStreamBase *stream) { + for (auto &buf : buffers) { + if (!stream->write(buf.get(), buf.size)) { return false; } } - for (auto& ptr: state_buffers) { + for (auto &ptr : state_buffers) { if (!ptr->serialize(stream)) { return false; } @@ -487,7 +546,8 @@ bool StreamContext::serializeStates(OutputStreamBase* stream) { } StateBuffer *StateBuffer::make(size_t num_objs, size_t elem_size, - CtorFn_t ctor_fn, DtorFn_t dtor_fn, SerializeFn_t serialize_fn, + CtorFn_t ctor_fn, DtorFn_t dtor_fn, + SerializeFn_t serialize_fn, DeserializeFn_t deserialize_fn) { auto ret = kunAlignedAlloc(KUN_MALLOC_ALIGNMENT, diff --git a/cpp/Kun/Scale.cpp b/cpp/Kun/Scale.cpp index 5e1ad57..19ee03d 100644 --- a/cpp/Kun/Scale.cpp +++ b/cpp/Kun/Scale.cpp @@ -7,11 +7,11 @@ namespace ops { RuntimeStage * stage, size_t time_idx, size_t __total_time, \ size_t __start, size_t __length); -DEF_INSTANCE(MapperSTs, MapperSTs) -DEF_INSTANCE(MapperSTs, MapperTS) -DEF_INSTANCE(MapperTS, MapperTS) -DEF_INSTANCE(MapperTS, MapperSTs) -DEF_INSTANCE(MapperSTREAM, MapperSTREAM) +DEF_INSTANCE(MapperSTsFloat, MapperSTsFloat) +DEF_INSTANCE(MapperSTsFloat, MapperTSFloat) +DEF_INSTANCE(MapperTSFloat, MapperTSFloat) +DEF_INSTANCE(MapperTSFloat, MapperSTsFloat) +DEF_INSTANCE(MapperSTREAMFloat, MapperSTREAMFloat) } // namespace ops } // namespace kun \ No newline at end of file diff --git a/cpp/Kun/Scale.hpp b/cpp/Kun/Scale.hpp index b808f4b..4c79005 100644 --- a/cpp/Kun/Scale.hpp +++ b/cpp/Kun/Scale.hpp @@ -45,24 +45,24 @@ KUN_TEMPLATE_EXPORT void ScaleStocks(RuntimeStage *stage, size_t time_idx, } } -extern template void ScaleStocks, MapperSTs>( +extern template void ScaleStocks( RuntimeStage *stage, size_t time_idx, size_t __total_time, size_t __start, size_t __length); -extern template void ScaleStocks, MapperTS>( - RuntimeStage *stage, size_t time_idx, size_t __total_time, size_t __start, - size_t __length); -extern template void ScaleStocks, MapperTS>( - RuntimeStage *stage, size_t time_idx, size_t __total_time, size_t __start, - size_t __length); -extern template void ScaleStocks, MapperSTs>( +extern template void +ScaleStocks(RuntimeStage *stage, size_t time_idx, + size_t __total_time, size_t __start, + size_t __length); +extern template void +ScaleStocks(RuntimeStage *stage, size_t time_idx, + size_t __total_time, size_t __start, + size_t __length); +extern template void +ScaleStocks(RuntimeStage *stage, size_t time_idx, + size_t __total_time, size_t __start, + size_t __length); +extern template void ScaleStocks( RuntimeStage *stage, size_t time_idx, size_t __total_time, size_t __start, size_t __length); -extern template void -ScaleStocks, MapperSTREAM>(RuntimeStage *stage, - size_t time_idx, - size_t __total_time, - size_t __start, - size_t __length); } // namespace ops } // namespace kun \ No newline at end of file diff --git a/cpp/Kun/Stage.hpp b/cpp/Kun/Stage.hpp index 815571c..4bd8b17 100644 --- a/cpp/Kun/Stage.hpp +++ b/cpp/Kun/Stage.hpp @@ -11,7 +11,7 @@ namespace kun { struct RuntimeStage; -using FuncType = void (*)(Context *__ctx, size_t __stock_idx, +using FuncType = void (*)(RuntimeStage *stage, size_t __stock_idx, size_t __total_time, size_t __start, size_t __length); using RankFuncType = void (*)(RuntimeStage *stage, size_t __stock_idx, size_t __total_time, size_t __start, size_t __length); @@ -43,7 +43,6 @@ union FuncHolder RankFuncType rankf; FuncType f; constexpr FuncHolder(RankFuncType r): rankf{r} {} - constexpr FuncHolder(FuncType r): f{r} {} }; diff --git a/cpp/KunSIMD/cpu/neon/cast.hpp b/cpp/KunSIMD/cpu/neon/cast.hpp index 1a5224e..5f7be5e 100644 --- a/cpp/KunSIMD/cpu/neon/cast.hpp +++ b/cpp/KunSIMD/cpu/neon/cast.hpp @@ -73,4 +73,12 @@ INLINE vec_s64x2 fast_cast(vec_f64x2 v) { } +INLINE vec_f32x4::Masktype vec_f32x4::make_mask(int N) { + return vec_s32x4{N} > vec_s32x4{0, 1, 2, 3}; +} + +INLINE vec_f64x2::Masktype vec_f64x2::make_mask(int N) { + return vec_s64x2{N} > vec_s64x2{0, 1}; +} + } // namespace kun_simd \ No newline at end of file diff --git a/cpp/KunSIMD/cpu/neon/common.hpp b/cpp/KunSIMD/cpu/neon/common.hpp new file mode 100644 index 0000000..9f8a3be --- /dev/null +++ b/cpp/KunSIMD/cpu/neon/common.hpp @@ -0,0 +1,23 @@ +#pragma once + +namespace kun_simd { +namespace { +template +void simulate_masked_store(T (&v)[N], U (&mask)[N], T *p) { + for (int i = 0; i < N; i++) { + if (mask[i]) { + p[i] = v[i]; + } + } +} + +template +void simulate_masked_load(T (&v)[N], U (&mask)[N], const T *p) { + for (int i = 0; i < N; i++) { + if (mask[i]) { + v[i] = p[i]; + } + } +} +} // namespace +} // namespace kun_simd diff --git a/cpp/KunSIMD/cpu/neon/f32x4.hpp b/cpp/KunSIMD/cpu/neon/f32x4.hpp index 7f57019..f8eccb6 100644 --- a/cpp/KunSIMD/cpu/neon/f32x4.hpp +++ b/cpp/KunSIMD/cpu/neon/f32x4.hpp @@ -19,6 +19,7 @@ #include #include "../common.hpp" #include +#include #include "s32x4.hpp" namespace kun_simd { @@ -45,7 +46,18 @@ struct alignas(16) vec { static INLINE vec load_aligned(const float *p) { return vld1q_f32(p); } static INLINE void store(vec v, float *p) { vst1q_f32(p, v.v); } static INLINE void store_aligned(vec v, float *p) { vst1q_f32(p, v.v); } + static Masktype make_mask(int N); + // slow simulated implementation + static INLINE vec masked_load(const float *p, Masktype mask) { + vec ret{0}; + simulate_masked_load(ret.raw, mask.raw, p); + return ret; + } + // slow simulated implementation + static INLINE void masked_store(vec v, float *p, Masktype mask) { + simulate_masked_store(v.raw, mask.raw, p); + } operator float32x4_t() const { return v; } }; diff --git a/cpp/KunSIMD/cpu/neon/f64x2.hpp b/cpp/KunSIMD/cpu/neon/f64x2.hpp index 4e9a488..9d95cf4 100644 --- a/cpp/KunSIMD/cpu/neon/f64x2.hpp +++ b/cpp/KunSIMD/cpu/neon/f64x2.hpp @@ -19,6 +19,7 @@ #include #include "../common.hpp" #include +#include #include "s64x2.hpp" namespace kun_simd { @@ -43,7 +44,18 @@ struct alignas(16) vec { static INLINE vec load_aligned(const double *p) { return vld1q_f64(p); } static INLINE void store(vec v, double *p) { vst1q_f64(p, v.v); } static INLINE void store_aligned(vec v, double *p) { vst1q_f64(p, v.v); } - + static Masktype make_mask(int N); + + // slow simulated implementation + static INLINE vec masked_load(const double *p, Masktype mask) { + vec ret{0}; + simulate_masked_load(ret.raw, mask.raw, p); + return ret; + } + // slow simulated implementation + static INLINE void masked_store(vec v, double *p, Masktype mask) { + simulate_masked_store(v.raw, mask.raw, p); + } operator float64x2_t() const { return v; } }; diff --git a/cpp/Python/PyBinding.cpp b/cpp/Python/PyBinding.cpp index 04fe641..6f744c4 100644 --- a/cpp/Python/PyBinding.cpp +++ b/cpp/Python/PyBinding.cpp @@ -13,9 +13,23 @@ #include #include #include +#include namespace py = pybind11; +static std::string shapeToString(const std::vector &shape) { + std::stringstream ss; + ss << "("; + for (size_t i = 0; i < shape.size(); i++) { + ss << shape[i]; + if (i != shape.size() - 1) { + ss << ", "; + } + } + ss << ")"; + return ss.str(); +} + static void expectContiguousShape(kun::Datatype dtype, const py::buffer_info &info, const char *name, const std::vector &shape) { @@ -29,7 +43,9 @@ static void expectContiguousShape(kun::Datatype dtype, } // ST8s layout if (info.ndim != shape.size() || info.shape != shape) { - throw std::runtime_error(std::string("Bad shape at ") + name); + std::stringstream ss; + ss << "Bad shape at " << name << " expected " << shapeToString(shape) << " but got " << shapeToString(info.shape); + throw std::runtime_error(ss.str()); } for (auto s : info.shape) { if (s <= 0) { @@ -64,6 +80,67 @@ struct StreamContextWrapper { : lib{m->lib}, ctx{std::move(exec), m->modu, num_stocks, states} {} }; + +void *checkInput(const py::buffer_info &info, const std::string &name, + kun::MemoryLayout mlayout, kun::Datatype dtype, + py::ssize_t &known_S, py::ssize_t &known_T, + py::ssize_t &knownNumStocks, py::ssize_t simd_len) { + if (mlayout == kun::MemoryLayout::STs) { + // ST8t layout + if (info.ndim != 3) { + throw std::runtime_error("Bad STs shape at " + name); + } + auto S = info.shape[0]; + auto T = info.shape[1]; + if (known_S == 0) { + known_S = S; + known_T = T; + knownNumStocks = known_S * simd_len; + } + expectContiguousShape(dtype, info, name.c_str(), + {known_S, known_T, simd_len}); + } else if (mlayout == kun::MemoryLayout::TS) { + // TS layout + if (info.ndim != 2) { + throw std::runtime_error("Bad TS shape at " + name); + } + auto S = info.shape[1]; + auto T = info.shape[0]; + if (known_S == 0) { + known_S = S / simd_len; + knownNumStocks = S; + } + if (known_T == 0) { + known_T = T; + } + expectContiguousShape(dtype, info, name.c_str(), + {known_T, knownNumStocks}); + } else { + throw std::runtime_error("Unknown layout at " + name); + } + return info.ptr; +} + +kun::AggregrationKind getAggregrationKind(const std::string &name) { + if (name == "sum") { + return kun::AggregrationKind::AGGREGRATION_SUM; + } else if (name == "min") { + return kun::AggregrationKind::AGGREGRATION_MIN; + } else if (name == "max") { + return kun::AggregrationKind::AGGREGRATION_MAX; + } else if (name == "first") { + return kun::AggregrationKind::AGGREGRATION_FIRST; + } else if (name == "last") { + return kun::AggregrationKind::AGGREGRATION_LAST; + } else if (name == "count") { + return kun::AggregrationKind::AGGREGRATION_COUNT; + } else if (name == "mean") { + return kun::AggregrationKind::AGGREGRATION_MEAN; + } else { + throw std::runtime_error("Unknown aggregration kind: " + name); + } +} + } // namespace PYBIND11_MODULE(KunRunner, m) { @@ -338,53 +415,21 @@ PYBIND11_MODULE(KunRunner, m) { py::ssize_t known_S = 0; py::ssize_t known_T = 0; py::ssize_t knownNumStocks = 0; - py::ssize_t simd_len = 8; + py::ssize_t simd_len = KUN_DEFAULT_FLOAT_SIMD_LEN; std::vector bufinputs; std::vector bufoutputs; - auto check_input = [&](py::buffer buf_obj, - const std::string &name) -> float * { - auto info = buf_obj.request(); - if (mlayout == kun::MemoryLayout::STs) { - // ST8t layout - if (info.ndim != 3) { - throw std::runtime_error("Bad STs shape at " + name); - } - auto S = info.shape[0]; - auto T = info.shape[1]; - if (known_S == 0) { - known_S = S; - known_T = T; - knownNumStocks = known_S * simd_len; - } - expectContiguousShape(kun::Datatype::Float, info, - name.c_str(), - {known_S, known_T, simd_len}); - } else if (mlayout == kun::MemoryLayout::TS) { - // TS layout - if (info.ndim != 2) { - throw std::runtime_error("Bad TS shape at " + name); - } - auto S = info.shape[1]; - auto T = info.shape[0]; - if (known_S == 0) { - known_S = S / simd_len; - known_T = T; - knownNumStocks = S; - } - expectContiguousShape(kun::Datatype::Float, info, - name.c_str(), - {known_T, knownNumStocks}); - } else { - throw std::runtime_error("Unknown layout at " + name); - } - return (float *)info.ptr; - }; - float *bufcorr_with = check_input(corr_with, "corr_with"); + + float *bufcorr_with = (float *)checkInput( + corr_with.request(), "corr_with", mlayout, kun::Datatype::Float, + known_S, known_T, knownNumStocks, simd_len); int idx = -1; for (auto buf_obj : inputs) { idx += 1; - bufinputs.push_back(check_input( - buf_obj, std::string("buffer_") + std::to_string(idx))); + bufinputs.push_back((float *)checkInput( + buf_obj.request(), + std::string("buffer_") + std::to_string(idx), mlayout, + kun::Datatype::Float, known_S, known_T, knownNumStocks, + simd_len)); } py::array::ShapeContainer expected_out_shape{known_T}; for (size_t i = 0; i < outs.size(); i++) { @@ -401,6 +446,65 @@ PYBIND11_MODULE(KunRunner, m) { py::arg("outs"), py::arg("layout") = "TS", py::arg("rank_inputs") = false); + m.def( + "aggregrate", + [](std::shared_ptr exec, + const std::vector &inputs, + const std::vector &labels, + const std::vector &outs) { + if (inputs.size() != labels.size() || inputs.size() != outs.size()) + throw std::runtime_error( + "number of inputs, labels and outputs should match"); + if (inputs.size() == 0) + return; + kun::Datatype dtype = inputs[0].request().format == + py::format_descriptor::format() + ? kun::Datatype::Float + : kun::Datatype::Double; + py::ssize_t known_S = 0; + py::ssize_t known_T_input = 0; + py::ssize_t knownNumStocks = 0; + py::ssize_t simd_len = dtype == kun::Datatype::Float + ? KUN_DEFAULT_FLOAT_SIMD_LEN + : KUN_DEFAULT_DOUBLE_SIMD_LEN; + std::vector bufinputs; + std::vector buflabels; + std::vector bufoutputs; + bufinputs.reserve(inputs.size()); + buflabels.reserve(labels.size()); + bufoutputs.reserve(inputs.size()); + + for (size_t i = 0; i < inputs.size(); i++) { + auto input = inputs[i].request(); + auto label = labels[i].request(); + py::dict out = outs[i]; + checkInput(input, std::string("buffer_") + std::to_string(i), + kun::MemoryLayout::TS, dtype, known_S, known_T_input, + knownNumStocks, simd_len); + expectContiguousShape(dtype, label, "label", {known_T_input}); + bufinputs.push_back((float *)input.ptr); + buflabels.push_back((float *)label.ptr); + py::ssize_t known_T_output = 0; + kun::AggregrationOutput output{}; + for (auto kv : out) { + auto name = py::cast(kv.first); + auto &value = kv.second; + auto idx = getAggregrationKind(name); + auto output_ptr = checkInput( + py::cast(value).request(), + std::string("output_") + name + std::to_string(idx), + kun::MemoryLayout::TS, dtype, known_S, known_T_output, + knownNumStocks, simd_len); + output.buffers[idx] = (float *)output_ptr; + } + bufoutputs.emplace_back(output); + } + + kun::aggregrate(exec, inputs.size(), bufinputs.data(), + buflabels.data(), dtype, bufoutputs.data(), + knownNumStocks, known_T_input, 0, known_T_input); + }, + py::arg("exec"), py::arg("inputs"), py::arg("labels"), py::arg("outs")); py::class_(m, "StreamContext") .def(py::init, const ModuleHandle *, size_t>()) diff --git a/CAPI.md b/doc/CAPI.md similarity index 97% rename from CAPI.md rename to doc/CAPI.md index e5f39c6..5e34e10 100644 --- a/CAPI.md +++ b/doc/CAPI.md @@ -49,7 +49,7 @@ void transpose_ST8s(float* in, float* out, int stocks, int time) { Again, you need to make sure the number of stocks is a multiple of 8. -Then, similar to the Python example in [Readme.md](./Readme.md), we need to create the executor, load library and get the module: +Then, similar to the Python example in [Readme.md](../Readme.md), we need to create the executor, load library and get the module: ```C KunExecutorHandle exec = kunCreateSingleThreadExecutor(); diff --git a/Customize.md b/doc/Customize.md similarity index 99% rename from Customize.md rename to doc/Customize.md index 80686a3..21c946d 100644 --- a/Customize.md +++ b/doc/Customize.md @@ -6,7 +6,7 @@ This document describes how you can build your own factors. You can invoke KunQuant as a Python library to generate high performance C++ source code for your own factors. KunQuant also provides predefined factors of Alpha101, at the Python module KunQuant.predefined.Alpha101. -First, you need to install KunQuant. See [Readme.md](./Readme.md). +First, you need to install KunQuant. See [Readme.md](../Readme.md). Then in Python code, import the needed classes and functions. @@ -99,7 +99,7 @@ lib = kr.Library.load("/path/to/a/dir/your_lib_name/your_lib_name.so") modu = lib.getModule("my_library_name") ``` -And use the `modu` object just like in the example in [Readme](./Readme.md). +And use the `modu` object just like in the example in [Readme](../Readme.md). ## Compiler options diff --git a/NewOperators.md b/doc/NewOperators.md similarity index 100% rename from NewOperators.md rename to doc/NewOperators.md diff --git a/Operators.md b/doc/Operators.md similarity index 100% rename from Operators.md rename to doc/Operators.md diff --git a/Stream.md b/doc/Stream.md similarity index 91% rename from Stream.md rename to doc/Stream.md index 904080d..8bf83e7 100644 --- a/Stream.md +++ b/doc/Stream.md @@ -4,7 +4,7 @@ If you use KunQuant in online services, when the data for each tick are received ## Building Streaming mode Factor libraries -It is almost the same as the steps in [Customize.md](./Customize.md) and [Readme.md](./Readme.md). The main difference is that you need to specify `output_layout="STREAM"` in `generate.py` of your Factor library generator. `project/Alpha101Stream` is an example of Alpha101 in streaming mode. You can check the difference of `projects/Alpha101/generate.py` and `project/Alpha101Stream/generate.py`. Except the difference in the names, the only difference is at the line +It is almost the same as the steps in [Customize.md](./Customize.md) and [Readme.md](../Readme.md). The main difference is that you need to specify `output_layout="STREAM"` in `generate.py` of your Factor library generator. `project/Alpha101Stream` is an example of Alpha101 in streaming mode. You can check the difference of `projects/Alpha101/generate.py` and `project/Alpha101Stream/generate.py`. Except the difference in the names, the only difference is at the line ```python src = compileit(f, "alpha_101_stream", partition_factor=8, output_layout="STREAM", options={"opt_reduce": False, "fast_log": True}) diff --git a/doc/Utility.md b/doc/Utility.md new file mode 100644 index 0000000..3e05f5c --- /dev/null +++ b/doc/Utility.md @@ -0,0 +1,74 @@ +# Utility functions + +KunQuant provides C++ implemented utility functions to compute Row-to-row correlation and Aggregation. They are accelerated by SIMD and can be parallelized by multi-threading. + +## Row-to-row correlation (for IC/IR calculation) + +```python +KunQuant.runner.KunRunner.corrWith(executor, inputs, corr_with, outputs, layout = layout, rank_inputs = rank_inputs) +``` + +Compute row-to-row corr values of a list of matrices `inputs` with a single matrix `corr_with`. And write the results to the pre-allocated matrices in `outputs`. That is `outputs[i] = corr(inputs[i], corr_with)`. Each element of list `inputs` should be a matrix of `TS` or `STs` layout. Let the time-dimension of `inputs[i]` be `T`. `outputs[i]` should be a buffer (e.g. allocated by `np.empty(...)`) of 1D shape of `[T]`. `corr_with` should have the same time-dimension as `T`. Each element in `outputs[i]` should be the correlation of a row in `inputs[i]` and `corr_with`. + +Example: + +```python +from KunQuant.runner import KunRunner as kr +data1 = ... # np.ndarray of shape [time*stocks]. For example, a factor's results +data2 = ... # np.ndarray of shape [time*stocks]. For example, a factor's results +valid_in = {"alpha1": data1, "alpha2": data2} +returns = ... # np.ndarray of shape [time*stocks]. For example, the rank of returns +valid_corr = {"alpha1": np.empty((time,), dtype="float32"), "alpha2": np.empty((time,), dtype="float32")} +kr.corrWith(executor, valid_in, returns, valid_corr, layout = "TS", rank_inputs = True) +# outputs in valid_corr +alpha1_ic = valid_corr["alpha1"].mean() +``` + +The parameter `rank_inputs=True` will first compute rank in the first input array (e.g. `valid_in` above) and compute the correlation with the second input (e.g. `returns` above). It will not compute the rank of the second input. + +## Aggregation Functions + +KunQuant provides utility functions for aggregration, including min, max, first, last, count, sum, mean. Users can specify an matrix to aggregate (of shape `[num_time x num_stocks]`) and a 1D vector as the label (of shape `[num_time]`, with same `num_time` of the matrix). The aggregration is performed in the `time` dimension. The labels should be in the same datatype of the matrix and should be monotinically incresing. The rows of the matrix with the same label (indexed by the row-id) will be aggregrated together. + +For example, aggregating by the day, like pandas `a_df.groupby(a_df.index.date).sum()`: + +```python +from KunQuant.runner import KunRunner as kr +# randomly generate time stamps between 2026-01-01 to 2026-01-07 +dates = pd.date_range(start="2026-01-01", end="2026-01-07", freq="min") +# select 240 time stamps +labels = pd.DataFrame(sorted(np.random.choice(dates, size=240))) +# time=240 stocks=16 +a = np.random.rand(240, 16).astype(dtype) +a_df = pd.DataFrame(a, index=labels[0]) +# labels are the day-in-month, 1 to 6. len(labels) == 240 +labels = a_df.index.day.to_numpy().astype(dtype) +# 6 days in total +out_length = 6 +out_a = {"sum": np.empty((length, 16), dtype=dtype), "min": np.empty((length, 16), dtype=dtype), + "max": np.empty((length, 16), dtype=dtype)} +executor = kr.createMultiThreadExecutor(3) +kr.aggregrate(executor, [a], [labels], [out_a]) +# reference result in pandas +np.testing.assert_allclose(out_a["sum"], a_df.groupby(a_df.index.date).sum()) +np.testing.assert_allclose(out_a["min"], a_df.groupby(a_df.index.date).min()) +np.testing.assert_allclose(out_a["max"], a_df.groupby(a_df.index.date).max()) +``` + +You can also pass a list of matrices, labels and outputs to utilize multithreading in `kr.aggregrate`: + +```python +a = np.random.rand(...).astype(dtype) +b = np.random.rand(...).astype(dtype) +c = np.random.rand(...).astype(dtype) +labels_a = ... +labels_b = ... +labels_c = ... +out_a = {"sum": ..., "min": ...} +out_b = {"min": ..., "min": ...} +out_c = {"mean": ...} +executor = kr.createMultiThreadExecutor(3) +kr.aggregrate(executor, [a,b,c], [labels_a, labels_b, labels_c], [out_a, out_b, out_c]) +``` + +⚠️⚠️⚠️ **Note** that the output buffers should be large enough in `time` dimension. KunQuant will not check if it is large enough to hold the result. If it is not large enough, undefined behavior will occur. \ No newline at end of file diff --git a/setup.py b/setup.py index 0316be1..29790e7 100644 --- a/setup.py +++ b/setup.py @@ -27,6 +27,10 @@ def build_extension(self, ext): f"-DPYTHON_EXECUTABLE={os.sys.executable}", f"-DCMAKE_BUILD_TYPE={release_or_debug}" ] + if "KUN_SANITIZER" in os.environ and os.environ["KUN_SANITIZER"] != "0": + cmake_args += [f"-DKUN_SANITIZER=ON"] + else: + cmake_args += [f"-DKUN_SANITIZER=OFF"] build_args = ["cmake", "--build", "."] devbuild = False if "KUN_BUILD_TESTS" in os.environ and os.environ["KUN_BUILD_TESTS"] != "0": diff --git a/tests/cpp/TestRuntime.cpp b/tests/cpp/TestRuntime.cpp index 206793c..f0d41e0 100644 --- a/tests/cpp/TestRuntime.cpp +++ b/tests/cpp/TestRuntime.cpp @@ -7,8 +7,9 @@ static constexpr size_t num_splits = 4; const static size_t simd_len = 8; -static void stage1(Context *__ctx, size_t __stock_idx, size_t __total_time, +static void stage1(RuntimeStage *stage, size_t __stock_idx, size_t __total_time, size_t __start, size_t __length) { + auto __ctx = stage->ctx; float *base = &__ctx->buffers[0].ptr[__stock_idx * __total_time * simd_len + __start * simd_len]; float *out = @@ -18,8 +19,9 @@ static void stage1(Context *__ctx, size_t __stock_idx, size_t __total_time, } } -static void stage2(Context *__ctx, size_t __stock_idx, size_t __total_time, +static void stage2(RuntimeStage *stage, size_t __stock_idx, size_t __total_time, size_t __start, size_t __length) { + auto __ctx = stage->ctx; float *base = &__ctx->buffers[1].ptr[__stock_idx * __total_time * simd_len + __start * simd_len]; float *out = @@ -29,8 +31,9 @@ static void stage2(Context *__ctx, size_t __stock_idx, size_t __total_time, } } -static void stage3(Context *__ctx, size_t __stock_idx, size_t __total_time, +static void stage3(RuntimeStage *stage, size_t __stock_idx, size_t __total_time, size_t __start, size_t __length) { + auto __ctx = stage->ctx; float *base1 = &__ctx->buffers[1].ptr[__stock_idx * __total_time * simd_len + __start * simd_len]; float *base2 = diff --git a/tests/test_runtime.py b/tests/test_runtime.py index 955ad52..de7c9d9 100644 --- a/tests/test_runtime.py +++ b/tests/test_runtime.py @@ -10,10 +10,64 @@ from KunQuant.Op import * from KunQuant.ops import * from KunQuant.predefined.Alpha101 import * -from KunQuant.runner import KunRunner as kr +from KunQuant.runner import KunRunner as kr import sys from KunQuant.jit.env import cpu_arch +def test_aggregrate(dtype): + a = np.random.rand(240, 16).astype(dtype) + b = np.random.rand(240, 16).astype(dtype) + c = np.random.rand(240, 16).astype(dtype) + # randomly generate time stamps between 2026-01-01 to 2026-01-07 + dates = pd.date_range(start="2026-01-01", end="2026-01-07", freq="min") + labels = pd.DataFrame(sorted(np.random.choice(dates, size=240))) + adf = pd.DataFrame(a, index=labels[0]) + bdf = pd.DataFrame(b, index=labels[0]) + cdf = pd.DataFrame(c, index=labels[0]) + # compute the aggregrate of a sum of all rows for each day + a_sum = adf.groupby(adf.index.date).sum() + a_min = adf.groupby(adf.index.date).min() + a_max = adf.groupby(adf.index.date).max() + a_first = adf.groupby(adf.index.date).first() + a_last = adf.groupby(adf.index.date).last() + a_count = adf.groupby(adf.index.date).count() + a_mean = adf.groupby(adf.index.date).mean() + # compute the aggregrate of b sum of all rows for each day + b_sum = bdf.groupby(bdf.index.date).sum() + b_min = bdf.groupby(bdf.index.date).min() + b_max = bdf.groupby(bdf.index.date).max() + + c_sum = cdf.groupby(cdf.index.date).sum() + + # get day of month of adf.index + labels = adf.index.day.to_numpy().astype(dtype) + length = len(a_sum) + + executor = kr.createMultiThreadExecutor(3) + + def make_emp(): + return np.empty((length, 16), dtype=dtype) + out_a = {"sum": make_emp(), "min": make_emp(), + "max": make_emp(), "first": make_emp(), + "last": make_emp(), "count": make_emp(), "mean": make_emp()} + out_b = {"sum": make_emp(), "min": make_emp(), + "max": make_emp()} + out_c = {"sum": make_emp()} + kr.aggregrate(executor, [a, b, c], [labels, labels, labels], [out_a, out_b, out_c]) + np.testing.assert_allclose(out_a["sum"], a_sum, atol=1e-6, rtol=1e-4, equal_nan=True) + np.testing.assert_allclose(out_a["min"], a_min, atol=1e-6, rtol=1e-4, equal_nan=True) + np.testing.assert_allclose(out_a["max"], a_max, atol=1e-6, rtol=1e-4, equal_nan=True) + np.testing.assert_allclose(out_a["first"], a_first, atol=1e-6, rtol=1e-4, equal_nan=True) + np.testing.assert_allclose(out_a["last"], a_last, atol=1e-6, rtol=1e-4, equal_nan=True) + np.testing.assert_allclose(out_a["count"], a_count, atol=1e-6, rtol=1e-4, equal_nan=True) + np.testing.assert_allclose(out_a["mean"], a_mean, atol=1e-6, rtol=1e-4, equal_nan=True) + + np.testing.assert_allclose(out_b["sum"], b_sum, atol=1e-6, rtol=1e-4, equal_nan=True) + np.testing.assert_allclose(out_b["min"], b_min, atol=1e-6, rtol=1e-4, equal_nan=True) + np.testing.assert_allclose(out_b["max"], b_max, atol=1e-6, rtol=1e-4, equal_nan=True) + + np.testing.assert_allclose(out_c["sum"], c_sum, atol=1e-6, rtol=1e-4, equal_nan=True) + def get_compiler_flags(): if os.environ.get("KUN_TEST_NO_AVX2", "0") != "0": print("building AVX without AVX2") @@ -61,7 +115,7 @@ def test_corrwith(): a2 = pd.DataFrame(a).rank(axis=1, pct=True).astype("float32") b2 = pd.DataFrame(b).rank(axis=1, pct=True).astype("float32") ret2 = pd.DataFrame(b).rank(axis=1, pct=True).astype("float32") - kr.corrWith(executor, [a2.to_numpy(),b2.to_numpy()], ret2.to_numpy(), [out1, out2], "TS", rank_inputs = True) + kr.corrWith(executor, [a, b], ret2.to_numpy(), [out1, out2], "TS", rank_inputs = True) ex1 = a2.corrwith(ret2, axis=1) np.testing.assert_allclose(out1, ex1, atol=1e-6, rtol=1e-4, equal_nan=True) ex1 = b2.corrwith(ret2, axis=1) @@ -597,7 +651,6 @@ def test_stream_lifetime_gh_issue_41(): #################################### def repro_crash_gh_issue_71(): - print("Building factors...") builder = Builder() with builder: inp1 = Input("close") @@ -607,7 +660,7 @@ def repro_crash_gh_issue_71(): for i in range(20): Output(WindowedLinearRegressionSlope(inp1, 10 + i), f"beta_{i}") f = Function(builder.ops) - return "test_repro_crash_gh_issue_71", f, KunCompilerConfig(partition_factor=3, output_layout="STREAM", options={"opt_reduce": False, "fast_log": True}) + return "test_repro_crash_gh_issue_71", f, KunCompilerConfig(partition_factor=3, input_layout="STREAM", output_layout="STREAM", options={"opt_reduce": False, "fast_log": True}) def test_repro_crash_gh_issue_71(lib): num_symbols = 24 @@ -685,6 +738,8 @@ def rolling_max_dd(x, window_size, min_periods=1): test_stream_lifetime_gh_issue_41() test_corrwith() +test_aggregrate("float32") +test_aggregrate("float64") funclist = [ check_1(), check_TS(),