diff --git a/.github/workflows/ccpp.yml b/.github/workflows/ccpp.yml index 86fe071..8131ea1 100644 --- a/.github/workflows/ccpp.yml +++ b/.github/workflows/ccpp.yml @@ -106,6 +106,7 @@ jobs: python tests/test.py python tests/test2.py python tests/test_runtime.py + python tests/test_stream.py - name: Alpha158 test working-directory: ./ run: | diff --git a/KunQuant/Driver.py b/KunQuant/Driver.py index 5c37e83..b9ec17c 100644 --- a/KunQuant/Driver.py +++ b/KunQuant/Driver.py @@ -10,7 +10,7 @@ # get the cpu architecture of the machine from KunQuant.jit.env import cpu_arch as _cpu_arch -required_version = "0x64100003" +required_version = "kun::VERSION" @dataclass class KunCompilerConfig: partition_factor : int = 3 @@ -232,7 +232,8 @@ def push_source(is_simple=False): def query_temp_buf_id(tempname: str, window: int) -> int: input_windows[tempname] = window return insert_name_str(tempname, "TEMP").idx - src, decl = codegen_cpp(module_name, func, input_name_to_idx, ins, outs, options, stream_mode, query_temp_buf_id, input_windows, generated_cross_sectional_func, dtype, blocking_len, not allow_unaligned, is_single_source) + stream_state_buffer_init = [] + src, decl = codegen_cpp(module_name, func, input_name_to_idx, ins, outs, options, stream_mode, query_temp_buf_id, input_windows, stream_state_buffer_init, generated_cross_sectional_func, dtype, blocking_len, not allow_unaligned, is_single_source) impl_src.append(src) decl_src.append(decl) newparti = _Partition(func.name, len(partitions), pins, pouts) @@ -309,6 +310,17 @@ def query_temp_buf_id(tempname: str, window: int) -> int: {parti_dep_src2} }} ''') + + if len(stream_state_buffer_init) > 0: + stream_state_str = "\n ".join(stream_state_buffer_init) + impl_src.append(f'''static std::vector __init_state_buffers(size_t stock_count) {{ + std::vector buffers; + buffers.reserve({len(stream_state_buffer_init)}); + {stream_state_str} + return buffers; +}} +''') + dty = dtype[0].upper() + dtype[1:] impl_src.append(f'''KUN_EXPORT Module {module_name}{{ {required_version}, @@ -320,7 +332,8 @@ def query_temp_buf_id(tempname: str, window: int) -> int: MemoryLayout::{output_layout}, {blocking_len}, Datatype::{dty}, - {"0" if allow_unaligned else "1"} + {"0" if allow_unaligned else "1"}, + {"nullptr" if len(stream_state_buffer_init) == 0 else "__init_state_buffers"} }};''') push_source() if not is_single_source: diff --git a/KunQuant/Op.py b/KunQuant/Op.py index 1c81503..bd96014 100644 --- a/KunQuant/Op.py +++ b/KunQuant/Op.py @@ -474,6 +474,21 @@ def generate_init_code(self, idx: str, elem_type: str, simd_lanes: int, inputs: inputs: the input variables of the op ''' return f"{self.get_func_or_class_full_name(elem_type, simd_lanes)} {self.get_state_variable_name_prefix()}{idx};" + def generate_init_code_stream(self, local_idx: str, buffer_idx: str, elem_type: str, simd_lanes: int, inputs: List[str], aligned: bool) -> Tuple[str, str]: + ''' + generate the code for the initialization of the state variable + local_idx: the output variable name index + buffer_idx: the buffer index in ctx->state_buffers + elem_type: the element type of the state variable + simd_lanes: SIMD lanes + inputs: the input variables of the op + + Returns: + ["the code for the initialization of the state variable in the compute function", "initalizer code for the state buffer"] + ''' + typename = self.get_func_or_class_full_name(elem_type, simd_lanes) + return f"{typename}& {self.get_state_variable_name_prefix()}{local_idx} = __ctx->state_buffers[{buffer_idx}]->get<{typename}>(__stock_idx);",\ + f"buffers.emplace_back(makeStateBuffer<{typename}>(stock_count, {simd_lanes}));" class GloablStatefulOpTrait(StatefulOpTrait): diff --git a/KunQuant/ops/MiscOp.py b/KunQuant/ops/MiscOp.py index de98aa0..218d487 100644 --- a/KunQuant/ops/MiscOp.py +++ b/KunQuant/ops/MiscOp.py @@ -1,6 +1,6 @@ import KunQuant from KunQuant.Op import AcceptSingleValueInputTrait, Input, OpBase, WindowedTrait, SinkOpTrait, CrossSectionalOp, GlobalStatefulProducerTrait, GloablStatefulOpTrait, StateConsumerTrait, UnaryElementwiseOp, BinaryElementwiseOp -from typing import List, Union +from typing import List, Tuple, Union class BackRef(OpBase, WindowedTrait): ''' @@ -100,6 +100,11 @@ def get_single_value_input_id(self) -> int: def get_state_variable_name_prefix(self) -> str: return "ema_" + def generate_init_code_stream(self, local_idx: str, buffer_idx: str, elem_type: str, simd_lanes: int, inputs: List[str], aligned: bool) -> Tuple[str, str]: + if len(self.inputs) == 2: + raise RuntimeError("EMA with init_val is not supported in stream mode") + return super().generate_init_code_stream(local_idx, buffer_idx, elem_type, simd_lanes, inputs, aligned) + def generate_init_code(self, idx: str, elem_type: str, simd_lanes: int, inputs: List[str], aligned: bool) -> str: initv = "NAN" if len(self.inputs) == 2: diff --git a/KunQuant/passes/CodegenCpp.py b/KunQuant/passes/CodegenCpp.py index 05bf184..83c4904 100644 --- a/KunQuant/passes/CodegenCpp.py +++ b/KunQuant/passes/CodegenCpp.py @@ -92,7 +92,7 @@ def _generate_cross_sectional_func_name(op: GenericCrossSectionalOp, inputs: Lis name.append(layout) return f"{op.__class__.__name__}_{'_'.join(name)}" -def codegen_cpp(prefix: str, f: Function, input_name_to_idx: Dict[str, int], inputs: List[Tuple[Input, bool]], outputs: List[Tuple[Output, bool]], options: dict, stream_mode: bool, query_temp_buffer_id, stream_window_size: Dict[str, int], generated_cross_sectional_func: Set[str], elem_type: str, simd_lanes: int, aligned: bool, static: bool) -> Tuple[str, str]: +def codegen_cpp(prefix: str, f: Function, input_name_to_idx: Dict[str, int], inputs: List[Tuple[Input, bool]], outputs: List[Tuple[Output, bool]], options: dict, stream_mode: bool, query_temp_buffer_id, stream_window_size: Dict[str, int], stream_state_buffer_init: List[str], generated_cross_sectional_func: Set[str], elem_type: str, simd_lanes: int, aligned: bool, static: bool) -> Tuple[str, str]: if len(f.ops) == 3 and isinstance(f.ops[1], SimpleCrossSectionalOp): return "", f'''static auto stage_{prefix}__{f.name} = {f.ops[1].__class__.__name__}Stocks, Mapper{f.ops[2].attrs["layout"]}<{elem_type}, {simd_lanes}>>;''' @@ -281,14 +281,19 @@ def codegen_cpp(prefix: str, f: Function, input_name_to_idx: Dict[str, int], inp funcname = "SkipListArgMin" scope.scope.append(_CppSingleLine(scope, f'auto v{idx} = {funcname}<{elem_type}, {simd_lanes}>(v{inp[0]}, i);')) elif isinstance(op, GloablStatefulOpTrait): - if stream_mode: raise RuntimeError(f"Stream Mode does not support {op.__class__.__name__}") assert(op.get_parent() is None) args = {} if isinstance(op, WindowedTrait): buf_name = _get_buffer_name(op.inputs[0], inp[0]) args["buf_name"] = buf_name vargs = [f"v{inpv}" for inpv in inp] - toplevel.scope.insert(-1, _CppSingleLine(toplevel, op.generate_init_code(idx, elem_type, simd_lanes, vargs, aligned))) + if stream_mode: + cur_idx = len(stream_state_buffer_init) + var_init_code, init_buffer_code = op.generate_init_code_stream(idx, cur_idx, elem_type, simd_lanes, vargs, aligned) + toplevel.scope.insert(-1, _CppSingleLine(toplevel, var_init_code)) + stream_state_buffer_init.append(init_buffer_code) + else: + toplevel.scope.insert(-1, _CppSingleLine(toplevel, op.generate_init_code(idx, elem_type, simd_lanes, vargs, aligned))) scope.scope.append(_CppSingleLine(scope, op.generate_step_code(idx, "i", vargs, **args))) elif isinstance(op, Select): scope.scope.append(_CppSingleLine(scope, f"auto v{idx} = Select(v{inp[0]}, v{inp[1]}, v{inp[2]});")) diff --git a/cpp/Kun/Context.hpp b/cpp/Kun/Context.hpp index a33ed37..76271fc 100644 --- a/cpp/Kun/Context.hpp +++ b/cpp/Kun/Context.hpp @@ -2,6 +2,7 @@ #include "Stage.hpp" #include "StreamBuffer.hpp" +#include "StateBuffer.hpp" #include #include #include @@ -9,6 +10,7 @@ namespace kun { +static const uint64_t VERSION = 0x64100004; struct KUN_API RuntimeStage { const Stage *stage; Context *ctx; @@ -127,6 +129,7 @@ struct Context { size_t simd_len; Datatype dtype; bool is_stream; + StateBufferPtr* state_buffers; }; KUN_API std::shared_ptr createSingleThreadExecutor(); diff --git a/cpp/Kun/MathUtil.hpp b/cpp/Kun/MathUtil.hpp new file mode 100644 index 0000000..71211a5 --- /dev/null +++ b/cpp/Kun/MathUtil.hpp @@ -0,0 +1,10 @@ +#pragma once +#include + +namespace kun { +namespace { +size_t divideAndCeil(size_t x, size_t y) { return (x + y - 1) / y; } +size_t roundUp(size_t x, size_t y) { return divideAndCeil(x, y) * y; } + +} // namespace +} // namespace kun \ No newline at end of file diff --git a/cpp/Kun/Module.hpp b/cpp/Kun/Module.hpp index 4e36c90..da65d4a 100644 --- a/cpp/Kun/Module.hpp +++ b/cpp/Kun/Module.hpp @@ -1,8 +1,9 @@ #pragma once #include "Stage.hpp" -#include +#include "StateBuffer.hpp" #include +#include namespace kun { @@ -12,7 +13,6 @@ enum class MemoryLayout { STREAM, }; - struct Module { size_t required_version; size_t num_stages; @@ -24,11 +24,12 @@ struct Module { size_t blocking_len; Datatype dtype; size_t aligned; + std::vector (*init_state_buffers)(size_t num_stocks); }; struct Library { void *handle; - std::function dtor; + std::function dtor; KUN_API const Module *getModule(const char *name); KUN_API static std::shared_ptr load(const char *filename); Library(const Library &) = delete; diff --git a/cpp/Kun/Ops.hpp b/cpp/Kun/Ops.hpp index eac8615..4923384 100644 --- a/cpp/Kun/Ops.hpp +++ b/cpp/Kun/Ops.hpp @@ -348,6 +348,7 @@ struct ExpMovingAvg { using simd_int_t = kun_simd::vec::int_t, stride>; simd_t v; + ExpMovingAvg() : v{NAN} {} ExpMovingAvg(const simd_t &init) : v{init} {} static constexpr T weight_latest = T(2.0) / (window + 1); simd_t step(simd_t cur, size_t index) { diff --git a/cpp/Kun/RunGraph.hpp b/cpp/Kun/RunGraph.hpp index 966fafd..3e1ab14 100644 --- a/cpp/Kun/RunGraph.hpp +++ b/cpp/Kun/RunGraph.hpp @@ -2,6 +2,7 @@ #include "Context.hpp" #include "Module.hpp" +#include "StateBuffer.hpp" #include #include #include @@ -34,6 +35,7 @@ struct AlignedPtr { struct KUN_API StreamContext { std::vector buffers; + std::vector state_buffers; Context ctx; const Module *m; StreamContext(std::shared_ptr exec, const Module *m, diff --git a/cpp/Kun/Runtime.cpp b/cpp/Kun/Runtime.cpp index 0dc784b..38c2fa1 100644 --- a/cpp/Kun/Runtime.cpp +++ b/cpp/Kun/Runtime.cpp @@ -18,12 +18,6 @@ #define kunAlignedFree(x) free(x) #endif -#ifdef __AVX__ -#define MALLOC_ALIGNMENT 64 // AVX-512 alignment -#else -#define MALLOC_ALIGNMENT 16 // NEON alignment -#endif - #if CHECKED_PTR #include #include @@ -76,11 +70,10 @@ void checkedDealloc(void *ptr, size_t sz) { #endif namespace kun { -static const uint64_t VERSION = 0x64100003; void Buffer::alloc(size_t count, size_t use_count, size_t elem_size) { if (!ptr) { - ptr = (float *)kunAlignedAlloc(MALLOC_ALIGNMENT, count * elem_size); + ptr = (float *)kunAlignedAlloc(KUN_MALLOC_ALIGNMENT, count * elem_size); refcount = (int)use_count; #if CHECKED_PTR size = count * elem_size; @@ -230,7 +223,7 @@ void corrWith(std::shared_ptr exec, MemoryLayout layout, length, 8, Datatype::Float, - false}; + false, nullptr}; std::vector &stages = ctx.stages; stages.reserve(buffers.size()); for (size_t i = 0; i < buffers.size(); i++) { @@ -284,7 +277,7 @@ void runGraph(std::shared_ptr exec, const Module *m, length, m->blocking_len, m->dtype, - false}; + false, nullptr}; std::vector &stages = ctx.stages; stages.reserve(m->num_stages); for (size_t i = 0; i < m->num_stages; i++) { @@ -368,6 +361,12 @@ StreamContext::StreamContext(std::shared_ptr exec, const Module *m, throw std::runtime_error( "Cannot run batch mode module via StreamContext"); } + if (m->init_state_buffers) { + state_buffers = m->init_state_buffers(num_stocks); + for (auto &buf : state_buffers) { + buf->initialize(); + } + } std::vector rtlbuffers; rtlbuffers.reserve(m->num_buffers); buffers.reserve(m->num_buffers); @@ -404,6 +403,7 @@ StreamContext::StreamContext(std::shared_ptr exec, const Module *m, ctx.dtype = m->dtype; ctx.is_stream = true; ctx.simd_len = m->blocking_len; + ctx.state_buffers = state_buffers.data(); } size_t StreamContext::queryBufferHandle(const char *name) const { @@ -458,4 +458,25 @@ void StreamContext::run() { } StreamContext::~StreamContext() = default; + + +StateBuffer *StateBuffer::make(size_t num_objs, size_t elem_size, + CtorFn_t ctor_fn, DtorFn_t dtor_fn) { + auto ret = kunAlignedAlloc(KUN_MALLOC_ALIGNMENT, + sizeof(StateBuffer) + num_objs * elem_size); + auto buf = (StateBuffer *)ret; + buf->num_objs = num_objs; + buf->elem_size = elem_size; + buf->initialized = 0; + buf->ctor_fn = ctor_fn; + buf->dtor_fn = dtor_fn; + return buf; +} + +void StateBuffer::Deleter::operator()(StateBuffer *buf) { + if (buf->initialized) { + buf->dtor_fn(buf); + } + kunAlignedFree(buf); +} } // namespace kun diff --git a/cpp/Kun/StateBuffer.hpp b/cpp/Kun/StateBuffer.hpp new file mode 100644 index 0000000..ca588cd --- /dev/null +++ b/cpp/Kun/StateBuffer.hpp @@ -0,0 +1,72 @@ +#pragma once + +#include "Base.hpp" +#include "MathUtil.hpp" +#include +#include + +namespace kun { + +#ifdef __AVX__ +#define KUN_MALLOC_ALIGNMENT 64 // AVX-512 alignment +#else +#define KUN_MALLOC_ALIGNMENT 16 // NEON alignment +#endif + +struct StateBuffer { + using DtorFn_t = void (*)(StateBuffer *obj); + using CtorFn_t = void (*)(StateBuffer *obj); + + alignas(KUN_MALLOC_ALIGNMENT) size_t num_objs; + uint32_t elem_size; + uint32_t initialized; + CtorFn_t ctor_fn; + DtorFn_t dtor_fn; + alignas(KUN_MALLOC_ALIGNMENT) char buf[0]; + + KUN_API static StateBuffer *make(size_t num_objs, size_t elem_size, + CtorFn_t ctor_fn, DtorFn_t dtor_fn); + + // for std::unique_ptr + struct Deleter { + KUN_API void operator()(StateBuffer *buf); + }; + + template + T &get(size_t idx) { + return *reinterpret_cast(buf + idx * sizeof(T)); + } + + void initialize() { + initialized = 1; + ctor_fn(this); + } + void destroy() { + if (initialized) { + dtor_fn(this); + } + initialized = 0; + } + + private: + StateBuffer() = default; +}; + +using StateBufferPtr = std::unique_ptr; + +template +StateBufferPtr makeStateBuffer(size_t num_stocks, size_t simd_len) { + return StateBufferPtr(StateBuffer::make( + divideAndCeil(num_stocks, simd_len), sizeof(T), + [](StateBuffer *obj) { + for (size_t i = 0; i < obj->num_objs; i++) { + new (&obj->get(i)) T(); + } + }, + [](StateBuffer *obj) { + for (size_t i = 0; i < obj->num_objs; i++) { + obj->get(i).~T(); + } + })); +} +} // namespace kun \ No newline at end of file diff --git a/cpp/Kun/StreamBuffer.hpp b/cpp/Kun/StreamBuffer.hpp index 57e5e7b..0c67529 100644 --- a/cpp/Kun/StreamBuffer.hpp +++ b/cpp/Kun/StreamBuffer.hpp @@ -1,14 +1,10 @@ #pragma once #include "Base.hpp" +#include "MathUtil.hpp" #include #include namespace kun { -namespace { -size_t divideAndCeil(size_t x, size_t y) { return (x + y - 1) / y; } -size_t roundUp(size_t x, size_t y) { return divideAndCeil(x, y) * y; } - -} // namespace template struct StreamBuffer { // [#stock_count of float data] diff --git a/cpp/Python/PyBinding.cpp b/cpp/Python/PyBinding.cpp index 5616771..5edaa94 100644 --- a/cpp/Python/PyBinding.cpp +++ b/cpp/Python/PyBinding.cpp @@ -55,12 +55,13 @@ struct ModuleHandle { const std::shared_ptr &lib) : modu{modu}, lib{lib} {} }; -struct StreamContextWrapper : kun::StreamContext { +struct StreamContextWrapper { std::shared_ptr lib; + kun::StreamContext ctx; StreamContextWrapper(std::shared_ptr exec, const ModuleHandle *m, size_t num_stocks) - : kun::StreamContext{std::move(exec), m->modu, num_stocks}, - lib{m->lib} {} + : lib{m->lib}, ctx{std::move(exec), m->modu, num_stocks} + {} }; } // namespace @@ -402,9 +403,13 @@ PYBIND11_MODULE(KunRunner, m) { py::class_(m, "StreamContext") .def(py::init, const ModuleHandle *, size_t>()) - .def("queryBufferHandle", &StreamContextWrapper::queryBufferHandle) + .def("queryBufferHandle", + [](StreamContextWrapper &t, const char *name) { + return t.ctx.queryBufferHandle(name); + }) .def("getCurrentBuffer", - [](StreamContextWrapper &ths, size_t handle) -> py::buffer { + [](StreamContextWrapper &t, size_t handle) -> py::buffer { + auto &ths = t.ctx; if (ths.m->dtype == kun::Datatype::Double) { auto buf = ths.getCurrentBufferPtrDouble(handle); return py::array_t{ @@ -416,7 +421,8 @@ PYBIND11_MODULE(KunRunner, m) { }) .def( "pushData", - [](StreamContextWrapper &ths, size_t handle, py::array data) { + [](StreamContextWrapper &t, size_t handle, py::array data) { + auto &ths = t.ctx; py::ssize_t ndim; if (ths.m->dtype == kun::Datatype::Float) { if (!py::isinstance>( @@ -442,5 +448,5 @@ PYBIND11_MODULE(KunRunner, m) { ths.pushData(handle, (const double *)data.data()); } }) - .def("run", &StreamContextWrapper::run); + .def("run", [](StreamContextWrapper &t) { t.ctx.run(); }); } \ No newline at end of file diff --git a/tests/cpp/TestRuntime.cpp b/tests/cpp/TestRuntime.cpp index 4d9b570..206793c 100644 --- a/tests/cpp/TestRuntime.cpp +++ b/tests/cpp/TestRuntime.cpp @@ -87,7 +87,7 @@ Stage *stage2_dep[] = {&stages[2]}; } // namespace KUN_EXPORT Module testRuntimeModule{ - 0x64100003, + kun::VERSION, arraySize(stages), stages, arraySize(buffers), diff --git a/tests/test_stream.py b/tests/test_stream.py new file mode 100644 index 0000000..78e19a8 --- /dev/null +++ b/tests/test_stream.py @@ -0,0 +1,47 @@ +import numpy as np +import pandas as pd +from KunQuant.jit import cfake +from KunQuant.Op import Input, Output, Builder +from KunQuant.Stage import Function +from KunQuant.Op import * +from KunQuant.ops import * +from KunQuant.runner import KunRunner as kr + + +def test_stream(): + builder = Builder() + with builder: + inp1 = Input("a") + Output(WindowedQuantile(inp1, 10, 0.49), "quantile") + Output(ExpMovingAvg(inp1, 10), "ema") + Output(WindowedLinearRegressionSlope(inp1, 10), "slope") + f = Function(builder.ops) + lib = cfake.compileit([("stream_test", f, cfake.KunCompilerConfig(dtype="double", input_layout="STREAM", output_layout="STREAM"))], + "stream_test", cfake.CppCompilerConfig()) + + executor = kr.createSingleThreadExecutor() + stream = kr.StreamContext(executor, lib.getModule("stream_test"), 24) + a = np.random.rand(100, 24) + handle_a = stream.queryBufferHandle("a") + handle_quantile = stream.queryBufferHandle("quantile") + handle_ema = stream.queryBufferHandle("ema") + handle_slope = stream.queryBufferHandle("slope") + out = np.empty((100, 24)) + ema = np.empty((100, 24)) + slope = np.empty((100, 24)) + for i in range(100): + stream.pushData(handle_a, a[i]) + stream.run() + out[i] = stream.getCurrentBuffer(handle_quantile) + ema[i] = stream.getCurrentBuffer(handle_ema) + slope[i] = stream.getCurrentBuffer(handle_slope) + df = pd.DataFrame(a) + expected_quantile = df.rolling(10).quantile(0.49, interpolation='linear').to_numpy() + expected_ema = df.ewm(span=10, adjust=False, ignore_na=True).mean().to_numpy() + expected_slope = df.rolling(10).apply(lambda x: np.polyfit(np.arange(len(x)), x, 1)[0]).to_numpy() + np.testing.assert_allclose(out, expected_quantile, atol=1e-6, rtol=1e-4, equal_nan=True) + np.testing.assert_allclose(ema, expected_ema, atol=1e-6, rtol=1e-4, equal_nan=True) + np.testing.assert_allclose(slope[10:], expected_slope[10:], atol=1e-6, rtol=1e-4, equal_nan=True) + +test_stream() +print("test_stream passed") \ No newline at end of file diff --git a/tests/tests.sh b/tests/tests.sh index 5c80b8d..85902b8 100644 --- a/tests/tests.sh +++ b/tests/tests.sh @@ -4,6 +4,8 @@ python tests/test.py python tests/test2.py echo "KunQuant runtime tests" python tests/test_runtime.py +echo "KunQuant stream tests" +python tests/test_stream.py echo "KunQuant runtime tests (AVX)" KUN_TEST_NO_AVX2=1 python tests/test_runtime.py echo "KunQuant alpha101 tests" diff --git a/tests/tests_arm.sh b/tests/tests_arm.sh index 8d61eba..58427a2 100644 --- a/tests/tests_arm.sh +++ b/tests/tests_arm.sh @@ -17,6 +17,8 @@ python tests/test.py python tests/test2.py echo "KunQuant runtime tests" python tests/test_runtime.py +echo "KunQuant stream tests" +python tests/test_stream.py echo "KunQuant alpha101 tests" python tests/test_alpha101.py arm echo "KunQuant alpha158 tests"