diff --git a/KunQuant/Driver.py b/KunQuant/Driver.py index b9ec17c..513a75d 100644 --- a/KunQuant/Driver.py +++ b/KunQuant/Driver.py @@ -211,6 +211,7 @@ def push_source(is_simple=False): is_single_source = split_source == 0 # the set of names of custom cross sectional functions generated_cross_sectional_func = set() + stream_state_buffer_init = [] for func in impl: if split_source > 0 and cur_count > split_source: push_source() @@ -232,7 +233,6 @@ def push_source(is_simple=False): def query_temp_buf_id(tempname: str, window: int) -> int: input_windows[tempname] = window return insert_name_str(tempname, "TEMP").idx - stream_state_buffer_init = [] src, decl = codegen_cpp(module_name, func, input_name_to_idx, ins, outs, options, stream_mode, query_temp_buf_id, input_windows, stream_state_buffer_init, generated_cross_sectional_func, dtype, blocking_len, not allow_unaligned, is_single_source) impl_src.append(src) decl_src.append(decl) diff --git a/cpp/Kun/Runtime.cpp b/cpp/Kun/Runtime.cpp index 7e23860..af42860 100644 --- a/cpp/Kun/Runtime.cpp +++ b/cpp/Kun/Runtime.cpp @@ -327,7 +327,9 @@ template char *StreamBuffer::make(size_t stock_count, size_t window_size, size_t simd_len) { auto ret = kunAlignedAlloc( - sizeof(T) * simd_len, StreamBuffer::getBufferSize(stock_count, window_size, simd_len)); + KUN_MALLOC_ALIGNMENT, + roundUp(StreamBuffer::getBufferSize(stock_count, window_size, simd_len), + KUN_MALLOC_ALIGNMENT)); auto buf = (StreamBuffer *)ret; auto data = buf->getBuffer(); auto rounded_stock_count = roundUp(stock_count, simd_len); @@ -487,8 +489,10 @@ bool StreamContext::serializeStates(OutputStreamBase* stream) { StateBuffer *StateBuffer::make(size_t num_objs, size_t elem_size, CtorFn_t ctor_fn, DtorFn_t dtor_fn, SerializeFn_t serialize_fn, DeserializeFn_t deserialize_fn) { - auto ret = kunAlignedAlloc(KUN_MALLOC_ALIGNMENT, - sizeof(StateBuffer) + num_objs * elem_size); + auto ret = + kunAlignedAlloc(KUN_MALLOC_ALIGNMENT, + roundUp(sizeof(StateBuffer) + num_objs * elem_size, + KUN_MALLOC_ALIGNMENT)); auto buf = (StateBuffer *)ret; buf->num_objs = num_objs; buf->elem_size = elem_size; diff --git a/tests/test_runtime.py b/tests/test_runtime.py index 24cf58e..955ad52 100644 --- a/tests/test_runtime.py +++ b/tests/test_runtime.py @@ -596,6 +596,31 @@ def test_stream_lifetime_gh_issue_41(): #################################### +def repro_crash_gh_issue_71(): + print("Building factors...") + builder = Builder() + with builder: + inp1 = Input("close") + # Generate MANY factors to stress memory/heap + for i in range(20): + Output(WindowedQuantile(inp1, 10, 0.5 + i * 0.01), f"qtl_{i}") + for i in range(20): + Output(WindowedLinearRegressionSlope(inp1, 10 + i), f"beta_{i}") + f = Function(builder.ops) + return "test_repro_crash_gh_issue_71", f, KunCompilerConfig(partition_factor=3, output_layout="STREAM", options={"opt_reduce": False, "fast_log": True}) + +def test_repro_crash_gh_issue_71(lib): + num_symbols = 24 + executor = kr.createSingleThreadExecutor() + modu = lib.getModule("test_repro_crash_gh_issue_71") + stream = kr.StreamContext(executor, modu, num_symbols) + data = np.random.rand(num_symbols).astype("float32") + h_close = stream.queryBufferHandle("close") + for i in range(20): + stream.pushData(h_close, data) + stream.run() + +#################################### def create_stream_double(): builder = Builder() @@ -680,6 +705,7 @@ def rolling_max_dd(x, window_size, min_periods=1): check_covar(), check_quantile(), check_large_rank(), + repro_crash_gh_issue_71(), ] lib = cfake.compileit(funclist, "test", cfake.CppCompilerConfig(machine=get_compiler_flags())) @@ -706,4 +732,5 @@ def rolling_max_dd(x, window_size, min_periods=1): test_covar(lib) test_quantile(lib) test_large_rank(lib) +test_repro_crash_gh_issue_71(lib) print("done")