From 0ec237179d7c6a19147710fb3959cf2b7dba29a4 Mon Sep 17 00:00:00 2001 From: Menooker Date: Wed, 10 Dec 2025 22:50:40 +0800 Subject: [PATCH 1/8] skip list sort --- KunQuant/Driver.py | 1 + KunQuant/Op.py | 15 ++++ KunQuant/ops/CompOp.py | 130 +++++++++++++++++++++-------- KunQuant/ops/MiscOp.py | 70 +++++++++++----- KunQuant/passes/CodegenCpp.py | 11 ++- KunQuant/passes/Partitioner.py | 8 +- cpp/Kun/Ops/Quantile.hpp | 147 +++++++++++++++++++++++++++------ tests/capi/test_c.cpp | 5 +- tests/test_runtime.py | 57 +++++++++++++ 9 files changed, 354 insertions(+), 90 deletions(-) diff --git a/KunQuant/Driver.py b/KunQuant/Driver.py index 48b61dc..5c37e83 100644 --- a/KunQuant/Driver.py +++ b/KunQuant/Driver.py @@ -106,6 +106,7 @@ def compileit(f: Function, module_name: str, partition_factor = 3, dtype = "floa blocking_len = suggested_len[dtype] if element_size[dtype] * blocking_len not in simd_len: raise RuntimeError(f"Blocking length {blocking_len} is not supported for {dtype} on {_cpu_arch}") + options['blocking_len'] = blocking_len if output_layout not in ["STs", "TS", "STREAM"]: raise RuntimeError("Bad output_layout name " + output_layout) if input_layout not in ["STs", "TS", "STREAM"]: diff --git a/KunQuant/Op.py b/KunQuant/Op.py index 35c301c..1c81503 100644 --- a/KunQuant/Op.py +++ b/KunQuant/Op.py @@ -74,6 +74,9 @@ def traverse_replace_map(op: 'OpBase', replace_map: Dict['OpBase', 'OpBase']) -> return traverse_replace_map(found, replace_map) class AcceptSingleValueInputTrait(ABC): + ''' + The ops that accept a time_len=1 array as input + ''' @abstractmethod def get_single_value_input_id() -> int: pass @@ -479,6 +482,18 @@ class GloablStatefulOpTrait(StatefulOpTrait): ''' pass +class GlobalStatefulProducerTrait(GloablStatefulOpTrait): + ''' + The ops that have an internal state, and the state is carried between different time steps, and the state must be consumed by a StateConsumerTrait + ''' + pass + +class StateConsumerTrait: + ''' + The ops that consume a state from a GlobalStatefulProducerTrait + ''' + pass + class ReductionOp(OpBase, StatefulOpTrait): ''' Base class of all reduction ops. A reduction op takes inputs that is originated from a IterValue. The input must be in a loop (v.get_parent() is a loop). The data produced diff --git a/KunQuant/ops/CompOp.py b/KunQuant/ops/CompOp.py index c972c00..934bc30 100644 --- a/KunQuant/ops/CompOp.py +++ b/KunQuant/ops/CompOp.py @@ -1,7 +1,9 @@ from .ReduceOp import ReduceAdd, ReduceMul, ReduceArgMax, ReduceRank, ReduceMin, ReduceMax, ReduceDecayLinear, ReduceArgMin from KunQuant.Op import ConstantOp, OpBase, CompositiveOp, WindowedTrait, ForeachBackWindow, WindowedTempOutput, Builder, IterValue, WindowLoopIndex from .ElewiseOp import And, DivConst, GreaterThan, LessThan, Or, Select, SetInfOrNanToValue, Sub, Mul, Sqrt, SubConst, Div, CmpOp, Exp, Log, Min, Max, Equals, Abs -from .MiscOp import Accumulator, BackRef, SetAccumulator, WindowedLinearRegression, WindowedLinearRegressionResiImpl, WindowedLinearRegressionRSqaureImpl, WindowedLinearRegressionSlopeImpl, ReturnFirstValue +from .MiscOp import Accumulator, BackRef, SetAccumulator, WindowedLinearRegression, WindowedLinearRegressionResiImpl,\ + WindowedLinearRegressionRSqaureImpl, WindowedLinearRegressionSlopeImpl, ReturnFirstValue, SkipListState, SkipListQuantile, SkipListRank, SkipListMin, SkipListMax,\ + SkipListArgMin from collections import OrderedDict from typing import Union, List, Tuple, Dict import math @@ -9,6 +11,11 @@ def _is_fast_stat(opt: dict, attrs: dict) -> bool: return not opt.get("no_fast_stat", True) and not attrs.get("no_fast_stat", False) +def _decide_use_skip_list(window: int, blocking_len: int) -> bool: + naive_cost = window + skip_list_cost = math.log2(window) * blocking_len * 5 + return skip_list_cost < naive_cost + class WindowedCompositiveOp(CompositiveOp, WindowedTrait): def __init__(self, v: OpBase, window: int, v2 = None) -> None: inputs = [v] @@ -17,7 +24,7 @@ def __init__(self, v: OpBase, window: int, v2 = None) -> None: super().__init__(inputs, [("window", window)]) class WindowedReduce(WindowedCompositiveOp): - def make_reduce(self, v: OpBase) -> OpBase: + def make_reduce(self, v: OpBase, newest: OpBase) -> OpBase: raise RuntimeError("Not implemented") def decompose(self, options: dict) -> List[OpBase]: @@ -26,7 +33,7 @@ def decompose(self, options: dict) -> List[OpBase]: v0 = WindowedTempOutput(self.inputs[0], self.attrs["window"]) v1 = ForeachBackWindow(v0, self.attrs["window"]) itr = IterValue(v1, v0) - v2 = self.make_reduce(itr) + v2 = self.make_reduce(itr, self.inputs[0]) return b.ops class WindowedSum(WindowedReduce): @@ -35,7 +42,7 @@ class WindowedSum(WindowedReduce): For indices < window-1, the output will be NaN similar to pandas.DataFrame.rolling(n).sum() ''' - def make_reduce(self, v: OpBase) -> OpBase: + def make_reduce(self, v: OpBase, newest: OpBase) -> OpBase: return ReduceAdd(v) class WindowedProduct(WindowedReduce): @@ -44,26 +51,55 @@ class WindowedProduct(WindowedReduce): For indices < window-1, the output will be NaN similar to pandas.DataFrame.rolling(n).product() ''' - def make_reduce(self, v: OpBase) -> OpBase: + def make_reduce(self, v: OpBase, newest: OpBase) -> OpBase: return ReduceMul(v) -class WindowedMin(WindowedReduce): +class _WindowedMinMaxBase(WindowedReduce): + ''' + Base class for windowed min/max ops that can use skip list. If the window is small enough, use naive linear scan. Otherwise, use skip list + with Log(window) complexity. + ''' + def on_skip_list(self, skplist: SkipListState, cur: OpBase) -> OpBase: + raise RuntimeError("Not implemented") + + def decompose(self, options: dict) -> List[OpBase]: + window = self.attrs["window"] + blocking_len = options["blocking_len"] + if _decide_use_skip_list(window, blocking_len): + b = Builder(self.get_parent()) + with b: + newv = self.inputs[0] + oldv = BackRef(newv, window) + v2 = SkipListState(oldv, newv, window) + self.on_skip_list(v2, newv) + return b.ops + else: + return super().decompose(options) + +class WindowedMin(_WindowedMinMaxBase): ''' Min of a rolling look back window, including the current newest data. For indices < window-1, the output will be NaN similar to pandas.DataFrame.rolling(n).min() ''' - def make_reduce(self, v: OpBase) -> OpBase: + def make_reduce(self, v: OpBase, newest: OpBase) -> OpBase: return ReduceMin(v) -class WindowedMax(WindowedReduce): + def on_skip_list(self, skplist: SkipListState, cur: OpBase) -> OpBase: + return SkipListMin(skplist) + + +class WindowedMax(_WindowedMinMaxBase): ''' Max of a rolling look back window, including the current newest data. For indices < window-1, the output will be NaN similar to pandas.DataFrame.rolling(n).max() ''' - def make_reduce(self, v: OpBase) -> OpBase: + def make_reduce(self, v: OpBase, newest: OpBase) -> OpBase: return ReduceMax(v) + + def on_skip_list(self, skplist: SkipListState, cur: OpBase) -> OpBase: + return SkipListMax(skplist) # small - sum def _kahan_sub(mask: OpBase, sum: OpBase, small: OpBase, compensation: OpBase) -> Union[OpBase, OpBase]: @@ -424,37 +460,42 @@ def decompose(self, options: dict) -> List[OpBase]: return b.ops -class TsArgMax(WindowedCompositiveOp): +class TsArgMax(WindowedReduce): ''' ArgMax in a rolling look back window, including the current newest data. The result should be the index of the max element in the rolling window. The index of the oldest element of the rolling window is 1. Similar to df.rolling(window).apply(np.argmax) + 1 ''' def decompose(self, options: dict) -> List[OpBase]: - b = Builder(self.get_parent()) - with b: - v0 = WindowedTempOutput(self.inputs[0], self.attrs["window"]) - v1 = ForeachBackWindow(v0, self.attrs["window"]) - v2 = ReduceArgMax(IterValue(v1, v0)) - v3 = SubConst(v2, self.attrs["window"], True) - return b.ops + window = self.attrs["window"] + blocking_len = options["blocking_len"] + if _decide_use_skip_list(window, blocking_len): + b = Builder(self.get_parent()) + with b: + TsArgMin(0-self.inputs[0], window) + return b.ops + else: + return super().decompose(options) + + def make_reduce(self, v: OpBase, newest: OpBase) -> OpBase: + v2 = ReduceArgMax(v) + return self.attrs["window"] - v2 -class TsArgMin(WindowedCompositiveOp): +class TsArgMin(_WindowedMinMaxBase): ''' ArgMin in a rolling look back window, including the current newest data. The result should be the index of the min element in the rolling window. The index of the oldest element of the rolling window is 1. Similar to df.rolling(window).apply(np.argmin) + 1 ''' - def decompose(self, options: dict) -> List[OpBase]: - b = Builder(self.get_parent()) - with b: - v0 = WindowedTempOutput(self.inputs[0], self.attrs["window"]) - v1 = ForeachBackWindow(v0, self.attrs["window"]) - v2 = ReduceArgMin(IterValue(v1, v0)) - v3 = SubConst(v2, self.attrs["window"], True) - return b.ops + def on_skip_list(self, skplist: SkipListState, cur: OpBase) -> OpBase: + return SkipListArgMin([skplist], []) + + def make_reduce(self, v: OpBase, newest: OpBase) -> OpBase: + v2 = ReduceArgMin(v) + return self.attrs["window"] - v2 + -class TsRank(WindowedCompositiveOp): +class TsRank(_WindowedMinMaxBase): ''' Time series rank of the newest data in a rolling look back window, including the current newest data. Let num_values_less = the number of values in rolling window that is less than the current newest data. @@ -462,13 +503,11 @@ class TsRank(WindowedCompositiveOp): rank = num_values_less + (num_values_eq + 1) / 2 Similar to df.rolling(window).rank() ''' - def decompose(self, options: dict) -> List[OpBase]: - b = Builder(self.get_parent()) - with b: - v0 = WindowedTempOutput(self.inputs[0], self.attrs["window"]) - v1 = ForeachBackWindow(v0, self.attrs["window"]) - v2 = ReduceRank(IterValue(v1, v0), self.inputs[0]) - return b.ops + def on_skip_list(self, skplist: SkipListState, cur: OpBase) -> OpBase: + return SkipListRank(skplist, cur) + + def make_reduce(self, v: OpBase, newest: OpBase) -> OpBase: + return ReduceRank(v, newest) class Clip(CompositiveOp): ''' @@ -636,4 +675,27 @@ def decompose(self, options: dict) -> List[OpBase]: filtered = Select(index >= max_bar_index, IterValue(each, v), inf) trough = ReduceMin(filtered) (peak - trough) / peak + return b.ops + + +class WindowedQuantile(CompositiveOp, WindowedTrait): + ''' + Quantile in `window` rows ago. + Similar to pd.rolling(window).quantile(q, interpolation='linear') + ''' + def __init__(self, v: OpBase, window: int, q: float) -> None: + super().__init__([v], [("window", window), ("q", q)]) + + def required_input_window(self) -> int: + return self.attrs["window"] + 1 + + def decompose(self, options: dict) -> List[OpBase]: + b = Builder(self.get_parent()) + window = self.attrs["window"] + v = self.inputs[0] + q = self.attrs["q"] + with b: + old = BackRef(v, window) + v2 = SkipListState(old, v, window) + v3 = SkipListQuantile(v2, q) return b.ops \ No newline at end of file diff --git a/KunQuant/ops/MiscOp.py b/KunQuant/ops/MiscOp.py index e021209..f3e7d92 100644 --- a/KunQuant/ops/MiscOp.py +++ b/KunQuant/ops/MiscOp.py @@ -1,5 +1,5 @@ import KunQuant -from KunQuant.Op import AcceptSingleValueInputTrait, Input, OpBase, WindowedTrait, SinkOpTrait, CrossSectionalOp, GloablStatefulOpTrait, UnaryElementwiseOp, BinaryElementwiseOp +from KunQuant.Op import AcceptSingleValueInputTrait, Input, OpBase, WindowedTrait, SinkOpTrait, CrossSectionalOp, GlobalStatefulProducerTrait, GloablStatefulOpTrait, StateConsumerTrait, UnaryElementwiseOp, BinaryElementwiseOp from typing import List, Union class BackRef(OpBase, WindowedTrait): @@ -12,16 +12,6 @@ def __init__(self, v: OpBase, window: int) -> None: def required_input_window(self) -> int: return self.attrs["window"] + 1 -class WindowedQuantile(OpBase, WindowedTrait): - ''' - Quantile in `window` rows ago. - Similar to pd.rolling(window).quantile(q, interpolation='linear') - ''' - def __init__(self, v: OpBase, window: int, q: float) -> None: - super().__init__([v], [("window", window), ("q", q)]) - - def required_input_window(self) -> int: - return self.attrs["window"] + 1 class FastWindowedSum(OpBase, WindowedTrait, GloablStatefulOpTrait): ''' @@ -39,7 +29,7 @@ def get_state_variable_name_prefix(self) -> str: def generate_step_code(self, idx: str, time_idx: str, inputs: List[str], buf_name: str) -> str: return f"auto v{idx} = sum_{idx}.step({buf_name}, {inputs[0]}, {time_idx});" -class Accumulator(OpBase, GloablStatefulOpTrait): +class Accumulator(OpBase, GlobalStatefulProducerTrait): ''' Accumulator is a stateful op that accumulates the input value over time. It can be used to compute running totals, moving averages, etc.''' @@ -64,7 +54,7 @@ def verify(self, func) -> None: raise RuntimeError(f"Accumulator {self.attrs['name']} is not used with any SetAccumulator") return super().verify(func) -class SetAccumulator(OpBase): +class SetAccumulator(OpBase, StateConsumerTrait): ''' Set the value of an Accumulator to a value, if mask is set. Otherwise, it does nothing. ''' @@ -120,7 +110,7 @@ def generate_init_code(self, idx: str, elem_type: str, simd_lanes: int, inputs: def generate_step_code(self, idx: str, time_idx: str, inputs: List[str]) -> str: return f"auto v{idx} = ema_{idx}.step({inputs[0]}, {time_idx});" -class WindowedLinearRegression(OpBase, WindowedTrait, GloablStatefulOpTrait): +class WindowedLinearRegression(OpBase, WindowedTrait, GlobalStatefulProducerTrait): ''' Compute states of Windowed Linear Regression ''' @@ -135,8 +125,12 @@ def get_state_variable_name_prefix(self) -> str: def generate_step_code(self, idx: str, time_idx: str, inputs: List[str], buf_name: str) -> str: return f"const auto& v{idx} = linear_{idx}.step({buf_name}, {inputs[0]}, {time_idx});" - -class WindowedLinearRegressionImplBase(OpBase): + + +class WindowedLinearRegressionConsumerTrait(StateConsumerTrait): + pass + +class WindowedLinearRegressionImplBase(UnaryElementwiseOp, WindowedLinearRegressionConsumerTrait): def __init__(self, v: OpBase) -> None: super().__init__([v]) @@ -145,27 +139,61 @@ def verify(self, func: 'KunQuant.Stage.Function') -> None: raise RuntimeError("WindowedLinearRegressionImpl expects WindowedLinearRegression Op as input") return super().verify(func) -class WindowedLinearRegressionConsumerTrait: - pass -class WindowedLinearRegressionRSqaureImpl(UnaryElementwiseOp, WindowedLinearRegressionConsumerTrait): +class WindowedLinearRegressionRSqaureImpl(WindowedLinearRegressionImplBase): ''' Compute RSqaure of Windowed Linear Regression ''' pass -class WindowedLinearRegressionSlopeImpl(UnaryElementwiseOp, WindowedLinearRegressionConsumerTrait): +class WindowedLinearRegressionSlopeImpl(WindowedLinearRegressionImplBase): ''' Compute RSqaure of Windowed Linear Regression ''' pass -class WindowedLinearRegressionResiImpl(BinaryElementwiseOp, WindowedLinearRegressionConsumerTrait): +class WindowedLinearRegressionResiImpl(WindowedLinearRegressionImplBase): ''' Compute RSqaure of Windowed Linear Regression ''' pass +class SkipListState(OpBase, GlobalStatefulProducerTrait): + ''' + SkipListState is a stateful op that maintains a skip list of the input values. + ''' + def __init__(self, oldvalue: OpBase, value: OpBase, window: int) -> None: + super().__init__([oldvalue, value], [("window", window)]) + + def get_state_variable_name_prefix(self) -> str: + return "skip_list_" + + def generate_step_code(self, idx: str, time_idx: str, inputs: List[str]) -> str: + return f"auto& v{idx} = skip_list_{idx}.step({inputs[0]}, {inputs[1]}, {time_idx});" + +class SkipListConsumerOp(StateConsumerTrait): + def verify(self, func: 'KunQuant.Stage.Function') -> None: + if len(self.inputs) < 1 or not isinstance(self.inputs[0], SkipListState): + raise RuntimeError("SkipListConsumerOp expects SkipListState Op as input") + return super().verify(func) + +class SkipListQuantile(SkipListConsumerOp, OpBase): + def __init__(self, v: OpBase, q: float) -> None: + super().__init__([v], [("q", q)]) + +class SkipListRank(SkipListConsumerOp, BinaryElementwiseOp): + pass + +class SkipListMin(SkipListConsumerOp, UnaryElementwiseOp): + pass + +class SkipListMax(SkipListConsumerOp, UnaryElementwiseOp): + pass + +class SkipListArgMin(SkipListConsumerOp, OpBase): + pass + + class GenericCrossSectionalOp(CrossSectionalOp): ''' Cross sectional op with customized C++ implementation. diff --git a/KunQuant/passes/CodegenCpp.py b/KunQuant/passes/CodegenCpp.py index 9f83913..05bf184 100644 --- a/KunQuant/passes/CodegenCpp.py +++ b/KunQuant/passes/CodegenCpp.py @@ -272,11 +272,14 @@ def codegen_cpp(prefix: str, f: Function, input_name_to_idx: Dict[str, int], inp buf_name = _get_buffer_name(op.inputs[0], inp[0]) funcname = "windowedRef" scope.scope.append(_CppSingleLine(scope, f'auto v{idx} = {funcname}<{elem_type}, {simd_lanes}, {op.attrs["window"]}>({buf_name}, i);')) - elif isinstance(op, WindowedQuantile): + elif isinstance(op, SkipListQuantile): assert(op.get_parent() is None) - buf_name = _get_buffer_name(op.inputs[0], inp[0]) - funcname = "windowedQuantile" - scope.scope.append(_CppSingleLine(scope, f'auto v{idx} = {funcname}<{elem_type}, {simd_lanes}, {op.attrs["window"]}>({buf_name}, i, {_float_value_to_float(op.attrs["q"], elem_type)});')) + funcname = "SkipListQuantile" + scope.scope.append(_CppSingleLine(scope, f'auto v{idx} = {funcname}<{elem_type}, {simd_lanes}>(v{inp[0]}, {_float_value_to_float(op.attrs["q"], elem_type)});')) + elif isinstance(op, SkipListArgMin): + assert(op.get_parent() is None) + funcname = "SkipListArgMin" + scope.scope.append(_CppSingleLine(scope, f'auto v{idx} = {funcname}<{elem_type}, {simd_lanes}>(v{inp[0]}, i);')) elif isinstance(op, GloablStatefulOpTrait): if stream_mode: raise RuntimeError(f"Stream Mode does not support {op.__class__.__name__}") assert(op.get_parent() is None) diff --git a/KunQuant/passes/Partitioner.py b/KunQuant/passes/Partitioner.py index 7a5eed6..cfe64fd 100644 --- a/KunQuant/passes/Partitioner.py +++ b/KunQuant/passes/Partitioner.py @@ -1,5 +1,5 @@ -from KunQuant.Op import OpBase, Output, Input, CrossSectionalOp, GraphSourceTrait, ConstantOp, ReductionOp, BoolOpTrait -from KunQuant.ops.MiscOp import Accumulator, SetAccumulator, WindowedLinearRegressionConsumerTrait, WindowedLinearRegression, ReturnFirstValue +from KunQuant.Op import OpBase, Output, Input, CrossSectionalOp, GraphSourceTrait, ConstantOp, ReductionOp, BoolOpTrait, GlobalStatefulProducerTrait, StateConsumerTrait +from KunQuant.ops.MiscOp import ReturnFirstValue from KunQuant.Stage import Function, OpInfo from KunQuant.ops import GenericPartition from typing import List, Dict, Set, Tuple @@ -17,7 +17,7 @@ def _is_non_output_op(op: OpBase) -> bool: ''' return true if the op cannot be at the edge of a partition as output ''' - if op.get_parent() is not None or isinstance(op, (WindowedLinearRegression, BoolOpTrait, Accumulator)): + if op.get_parent() is not None or isinstance(op, (GlobalStatefulProducerTrait, BoolOpTrait)): return True return False @@ -25,7 +25,7 @@ def _is_fast_select_op(op: OpBase) -> bool: ''' return true if the op should be selected ASAP ''' - if isinstance(op, (ReductionOp, WindowedLinearRegressionConsumerTrait, SetAccumulator, ReturnFirstValue)) or _is_non_output_op(op): + if isinstance(op, (ReductionOp, StateConsumerTrait, ReturnFirstValue)) or _is_non_output_op(op): return True else: # don't stop at boolean op diff --git a/cpp/Kun/Ops/Quantile.hpp b/cpp/Kun/Ops/Quantile.hpp index d7cfa48..ea3bc0d 100644 --- a/cpp/Kun/Ops/Quantile.hpp +++ b/cpp/Kun/Ops/Quantile.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -10,42 +11,136 @@ namespace kun { namespace ops { -namespace quantile { -template -T call(T *sorted, int length, T q) { - T findex = (length - 1) * q; - int index = findex; - T fraction = findex - index; - if (std::abs(fraction) < 1e-6) { - return sorted[index]; +namespace { +template +struct SkipListStateImpl { + using simd_t = kun_simd::vec; + SkipList skipList[simdLen]; + int lastInsertRank[simdLen]; + size_t window; + SkipListStateImpl(size_t window) : window(window) { + for (int i = 0; i < simdLen; i++) { + skipList[i].init(window); + } } - T i = sorted[index]; - T j = sorted[index + 1]; - return i + (j - i) * fraction; -} + SkipListStateImpl &step(const simd_t &oldvalue, const simd_t &value, + size_t index) { + alignas(alignof(simd_t)) T values[simdLen]; + simd_t::store(oldvalue, values); + for (int i = 0; i < simdLen; i++) { + if (!std::isnan(values[i])) { + skipList[i].remove(values[i]); + } + } + simd_t::store(value, values); + for (int i = 0; i < simdLen; i++) { + if (!std::isnan(values[i])) { + lastInsertRank[i] = skipList[i].insert(values[i], index); + } + } + return *this; + } +}; + +template +struct SkipListState : SkipListStateImpl { + SkipListState() : SkipListStateImpl(window) {} +}; +} // namespace -} // namespace quantile -template -kun_simd::vec windowedQuantile(TInput &input, size_t index, T q) { +// https://github.com/pandas-dev/pandas/blob/main/pandas/_libs/window/aggregations.pyx + +template +kun_simd::vec SkipListQuantile(SkipListStateImpl &state, + T q) { alignas(alignof(kun_simd::vec)) T result[stride]; - T sorted[window]; + size_t index; + bool found; for (int i = 0; i < stride; i++) { - int cnt = 0; - for (int j = 0; j < window; j++) { - auto v = input.getWindowLane(index, j, i); - if (!std::isnan(v)) { - sorted[cnt] = v; - cnt++; - } + int nobs = state.skipList[i].size(); + if (nobs != state.window) { + result[i] = NAN; + continue; } - if (cnt > 0) { - std::sort(sorted, sorted + cnt); - result[i] = quantile::call(sorted, cnt, q); + T idx_with_fraction = q * (nobs - 1); + int idx = static_cast(idx_with_fraction); + if (idx == idx_with_fraction) { + result[i] = state.skipList[i].get(idx, index, found); } else { + auto vlow = state.skipList[i].get(idx, index, found); + auto vhigh = state.skipList[i].get(idx + 1, index, found); + result[i] = vlow + (vhigh - vlow) * (idx_with_fraction - idx); + } + } + return kun_simd::vec::load(result); +} + +template +kun_simd::vec SkipListRank(SkipListStateImpl &state, + const kun_simd::vec &cur) { + alignas(alignof(kun_simd::vec)) T result[stride]; + alignas(alignof(kun_simd::vec)) T curval[stride]; + kun_simd::vec::store(cur, curval); + size_t index; + bool found; + for (int i = 0; i < stride; i++) { + int nobs = state.skipList[i].size(); + if (nobs != state.window) { result[i] = NAN; + continue; } + double rank = state.lastInsertRank[i] + 1; + double rank_min = state.skipList[i].minRank(curval[i]) + 1; + rank = (((rank * (rank + 1) / 2) - ((rank_min - 1) * rank_min / 2)) / + (rank - rank_min + 1)); + result[i] = rank; } return kun_simd::vec::load(result); } + +template +kun_simd::vec SkipListMinMax(SkipListStateImpl &state, bool is_min) { + alignas(alignof(kun_simd::vec)) T result[stride]; + size_t index; + bool found; + for (int i = 0; i < stride; i++) { + int nobs = state.skipList[i].size(); + if (nobs != state.window) { + result[i] = NAN; + continue; + } + int rank = is_min ? 0 : state.window - 1; + result[i] = state.skipList[i].get(rank, index, found); + } + return kun_simd::vec::load(result); +} + +template +kun_simd::vec SkipListMin(SkipListStateImpl &state) { + return SkipListMinMax(state, true); +} + +template +kun_simd::vec SkipListMax(SkipListStateImpl &state) { + return SkipListMinMax(state, false); +} + +template +kun_simd::vec SkipListArgMin(SkipListStateImpl &state, size_t cur_idx) { + alignas(alignof(kun_simd::vec)) T result[stride]; + size_t index; + bool found; + for (int i = 0; i < stride; i++) { + int nobs = state.skipList[i].size(); + if (nobs != state.window) { + result[i] = NAN; + continue; + } + state.skipList[i].get(0, index, found); + result[i] = index + nobs - cur_idx; + } + return kun_simd::vec::load(result); +} + } // namespace ops } // namespace kun \ No newline at end of file diff --git a/tests/capi/test_c.cpp b/tests/capi/test_c.cpp index 03fee50..acf031b 100644 --- a/tests/capi/test_c.cpp +++ b/tests/capi/test_c.cpp @@ -9,7 +9,7 @@ printf("CHECK(" #V ") faild\n"); \ return 3; \ } - +extern bool testSkipList(); static int testBatch(const char *libpath) { // prepare inputs const size_t num_stocks = 24; @@ -123,6 +123,9 @@ static int testStream(const char *libpath) { } int main(int args, char **argv) { + if (!testSkipList()) { + return 3; + } if (args != 3) { printf("Bad args\n"); return 2; diff --git a/tests/test_runtime.py b/tests/test_runtime.py index 5a2b300..022ddb4 100644 --- a/tests/test_runtime.py +++ b/tests/test_runtime.py @@ -194,6 +194,59 @@ def test_covar(lib): #################################### +def check_quantile(): + builder = Builder() + with builder: + inp1 = Input("a") + out1 = Output(WindowedQuantile(inp1, 10, 0.49), "ou1") + f = Function(builder.ops) + return "test_quantile", f, KunCompilerConfig(input_layout="TS", output_layout="TS", dtype="double", options={"no_fast_stat": 'no_warn'}) + +def test_quantile(lib): + modu = lib.getModule("test_quantile") + assert(modu) + inp = np.random.rand(200, 24).astype("float64") + executor = kr.createSingleThreadExecutor() + out = kr.runGraph(executor, modu, {"a": inp}, 0, 200) + outquantile = out["ou1"] + df = pd.DataFrame(inp) + expected_quantile = df.rolling(10).quantile(0.49, interpolation='linear').to_numpy() + np.testing.assert_allclose(outquantile, expected_quantile, rtol=1e-6, equal_nan=True) + +#################################### + +def check_large_rank(): + builder = Builder() + with builder: + inp1 = Input("a") + out1 = Output(TsRank(inp1, 200), "ou1") + out2 = Output(WindowedMin(inp1, 200), "min") + out3 = Output(WindowedMax(inp1, 200), "max") + out4 = Output(TsArgMin(inp1, 200), "argmin") + out5 = Output(TsArgMax(inp1, 200), "argmax") + f = Function(builder.ops) + return "test_large_rank", f, KunCompilerConfig(input_layout="TS", output_layout="TS", dtype="double", options={"no_fast_stat": 'no_warn'}) + +def test_large_rank(lib): + modu = lib.getModule("test_large_rank") + assert(modu) + inp = np.random.rand(2000, 24).astype("float64") + # test with duplicates + inp[400:410,:] = -1 + # inp[1400:1410,:] = 10 + executor = kr.createSingleThreadExecutor() + out = kr.runGraph(executor, modu, {"a": inp}, 0, 2000) + outrank = out["ou1"] + df = pd.DataFrame(inp) + expected_rank = df.rolling(200).rank().to_numpy() + np.testing.assert_allclose(outrank, expected_rank, rtol=1e-6, equal_nan=True) + np.testing.assert_allclose(out["min"], df.rolling(200).min().to_numpy(), rtol=1e-6, equal_nan=True) + np.testing.assert_allclose(out["max"], df.rolling(200).max().to_numpy(), rtol=1e-6, equal_nan=True) + np.testing.assert_allclose(out["argmin"], df.rolling(200).apply(np.argmin)+1, rtol=1e-6, equal_nan=True) + np.testing.assert_allclose(out["argmax"], df.rolling(200).apply(np.argmax)+1, rtol=1e-6, equal_nan=True) + +#################################### + def RefExpMovingAvg(v: pd.DataFrame): return v.ewm(span=5, adjust=False, ignore_na=True).mean() @@ -625,6 +678,8 @@ def rolling_max_dd(x, window_size, min_periods=1): check_aligned(), check_rank_alpha029(), check_covar(), + check_quantile(), + check_large_rank(), ] lib = cfake.compileit(funclist, "test", cfake.CppCompilerConfig(machine=get_compiler_flags())) @@ -649,4 +704,6 @@ def rolling_max_dd(x, window_size, min_periods=1): test_aligned(lib) test_loop_index() test_covar(lib) +test_quantile(lib) +test_large_rank(lib) print("done") From c8fd721aba9ec98a7ef98f6e73d53f34eb7b6ed5 Mon Sep 17 00:00:00 2001 From: Menooker Date: Thu, 11 Dec 2025 21:12:43 +0800 Subject: [PATCH 2/8] add skiplist --- cpp/Kun/SkipList.cpp | 297 +++++++++++++++++++++++++++++++++++ cpp/Kun/SkipList.hpp | 29 ++++ tests/capi/test_skiplist.cpp | 68 ++++++++ 3 files changed, 394 insertions(+) create mode 100644 cpp/Kun/SkipList.cpp create mode 100644 cpp/Kun/SkipList.hpp create mode 100644 tests/capi/test_skiplist.cpp diff --git a/cpp/Kun/SkipList.cpp b/cpp/Kun/SkipList.cpp new file mode 100644 index 0000000..f216bf3 --- /dev/null +++ b/cpp/Kun/SkipList.cpp @@ -0,0 +1,297 @@ +/* +Modified from +https://github.com/pandas-dev/pandas/blob/main/pandas/_libs/include/pandas/skiplist.h +Original copyright: +Copyright (c) 2016, PyData Development Team +All rights reserved. + +Distributed under the terms of the BSD Simplified License. + +The full license is in the LICENSE file, distributed with this software. + +Flexibly-sized, index-able skiplist data structure for maintaining a sorted +list of values + +Port of Wes McKinney's Cython version of Raymond Hettinger's original pure +Python recipe (https://rhettinger.wordpress.com/2010/02/06/lost-knowledge/) +*/ + +#include "SkipList.hpp" +#include +#include +#include +#include +#include +namespace kun { + +namespace detail { +template +class RcPtr { + T *ptr; + void deref() { + if (ptr) { + ptr->ref_count--; + if (ptr->ref_count == 0) { + delete ptr; + } + } + } + + public: + explicit RcPtr(T *ptr) noexcept : ptr{ptr} {} + RcPtr() noexcept : ptr{nullptr} {} + ~RcPtr() { deref(); } + RcPtr(const RcPtr &other) noexcept : ptr{other.ptr} { + if (ptr) { + ptr->ref_count++; + } + } + RcPtr(RcPtr &&other) noexcept : ptr{other.ptr} { other.ptr = nullptr; } + RcPtr &operator=(std::nullptr_t) noexcept { + deref(); + ptr = nullptr; + return *this; + } + RcPtr &operator=(RcPtr &&other) noexcept { + if (this == &other) { + return *this; + } + deref(); + ptr = other.ptr; + other.ptr = nullptr; + return *this; + } + RcPtr &operator=(const RcPtr &other) noexcept { + if (this == &other) { + return *this; + } + deref(); + ptr = other.ptr; + if (ptr) { + ptr->ref_count++; + } + return *this; + } + T *operator->() const { return ptr; } + T &operator*() const { return *ptr; } + bool operator==(const RcPtr &other) const { return ptr == other.ptr; } + bool operator!=(const RcPtr &other) const { return ptr != other.ptr; } + T *get() const { return ptr; } +}; + +static inline float __skiplist_nanf(void) { + const union { + int __i; + float __f; + } __bint = {0x7fc00000UL}; + return __bint.__f; +} +#define PANDAS_NAN ((double)__skiplist_nanf()) + +static inline double Log2(double val) { return std::log(val) / std::log(2.); } + +static inline double urand(void) { + return ((double)rand() + 1) / ((double)RAND_MAX + 2); +} + +static inline int int_min(int a, int b) { return a < b ? a : b; } + +struct Node { + std::vector> next; + std::unique_ptr width; + double value; + size_t index; + int is_nil; + int getLevels() const { return next.size(); } + int ref_count; + + Node(double value, size_t index, int levels) + : next(levels), width(new int[levels]), value(value), index(index), + is_nil(0), ref_count(1) {} + + // 1 if left < right, 0 if left == right, -1 if left > right + int cmp(double value) const { + if (is_nil || this->value > value) { + return -1; + } else if (this->value < value) { + return 1; + } else { + return 0; + } + } +}; + +struct SkipListImpl { + RcPtr head; + std::vector tmp_chain; + std::unique_ptr tmp_steps; + int size; + int getMaxLevels() const { return tmp_chain.size(); }; + SkipListImpl(int size) { + SkipListImpl *result = this; + int maxlevels, i; + + maxlevels = 1 + Log2((double)size); + result->tmp_chain = std::vector(maxlevels); + result->tmp_steps = std::unique_ptr(new int[maxlevels]); + result->size = 0; + + head = RcPtr(new Node(PANDAS_NAN, 0, maxlevels)); + RcPtr NIL{new Node(0.0, 0, 0)}; + NIL->is_nil = 1; + + for (i = 0; i < maxlevels; ++i) { + head->next[i] = NIL; + head->width[i] = 1; + } + } + + double get(int i, size_t &index, bool &ret) const { + int level; + + if (i < 0 || i >= size) { + ret = false; + return 0; + } + + Node *node = head.get(); + ++i; + for (int level = getMaxLevels() - 1; level >= 0; --level) { + while (node->width[level] <= i) { + i -= node->width[level]; + node = node->next[level].get(); + } + } + + ret = true; + index = node->index; + return node->value; + } + + // Returns the lowest rank of all elements with value `value`, as opposed to + // the + // highest rank returned by `skiplist_insert`. + int minRank(double value) const { + int rank = 0; + + Node *node = head.get(); + for (int level = getMaxLevels() - 1; level >= 0; --level) { + while (node->next[level]->cmp(value) > 0) { + rank += node->width[level]; + node = node->next[level].get(); + } + } + + return rank; + } + + // Returns the rank of the inserted element. When there are duplicates, + // `rank` is the highest of the group, i.e. the 'max' method of + // https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.rank.html + int insert(double value, size_t index) { + // Node *node, *prevnode, *newnode, *next_at_level; + int *steps_at_level = tmp_steps.get(); + // int size, steps, level, rank = 0; + int rank = 0; + + memset(steps_at_level, 0, getMaxLevels() * sizeof(int)); + + Node *node = head.get(); + + for (int level = getMaxLevels() - 1; level >= 0; --level) { + Node *next_at_level = node->next[level].get(); + while (next_at_level->cmp(value) >= 0) { + steps_at_level[level] += node->width[level]; + rank += node->width[level]; + node = next_at_level; + next_at_level = node->next[level].get(); + } + tmp_chain[level] = node; + } + + int size = int_min(getMaxLevels(), 1 - ((int)Log2(urand()))); + + RcPtr newnode{new Node(value, index, size)}; + int steps = 0; + + for (int level = 0; level < size; ++level) { + auto prevnode = tmp_chain[level]; + newnode->next[level] = prevnode->next[level]; + newnode->width[level] = prevnode->width[level] - steps; + prevnode->next[level] = newnode; + prevnode->width[level] = steps + 1; + + steps += steps_at_level[level]; + } + + for (int level = size; level < getMaxLevels(); ++level) { + tmp_chain[level]->width[level] += 1; + } + + ++(this->size); + + return rank; + } + + bool remove(double value) { + Node *node = head.get(); + + for (int level = getMaxLevels() - 1; level >= 0; --level) { + Node *next_at_level = node->next[level].get(); + while (next_at_level->cmp(value) > 0) { + node = next_at_level; + next_at_level = node->next[level].get(); + } + tmp_chain[level] = node; + } + + if (value != tmp_chain[0]->next[0]->value) { + return false; + } + + int size = tmp_chain[0]->next[0]->getLevels(); + + for (int level = 0; level < size; ++level) { + Node *prevnode = tmp_chain[level]; + + RcPtr tmpnode = prevnode->next[level]; + + prevnode->width[level] += tmpnode->width[level] - 1; + prevnode->next[level] = tmpnode->next[level]; + + tmpnode->next[level] = nullptr; + } + + for (int level = size; level < getMaxLevels(); ++level) { + --(tmp_chain[level]->width[level]); + } + + --(this->size); + return true; + } +}; + +} // namespace detail + +SkipList::SkipList(int size) : impl{new detail::SkipListImpl(size)} {} +SkipList::SkipList() = default; +void SkipList::init(int size) { + impl = std::unique_ptr(new detail::SkipListImpl(size)); +} + +SkipList::~SkipList() = default; + +int SkipList::insert(double value, size_t index) { + return impl->insert(value, index); +} + +bool SkipList::remove(double value) { return impl->remove(value); } + +int SkipList::minRank(double value) { return impl->minRank(value); } + +double SkipList::get(int rank, size_t &index, bool &found) { + return impl->get(rank, index, found); +} + +int SkipList::size() const { return impl->size; } +} // namespace kun \ No newline at end of file diff --git a/cpp/Kun/SkipList.hpp b/cpp/Kun/SkipList.hpp new file mode 100644 index 0000000..f58da9a --- /dev/null +++ b/cpp/Kun/SkipList.hpp @@ -0,0 +1,29 @@ +#pragma once +#include "Base.hpp" +#include +#include + +namespace kun { + +namespace detail { + struct SkipListImpl; +} + +struct KUN_API SkipList { + std::unique_ptr impl; + SkipList(int size); + SkipList(); + void init(int size); + ~SkipList(); + // Returns the rank of the inserted element. When there are duplicates + // `rank` is the highest of the group, i.e. the 'max' method of + // https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.rank.html + int insert(double value, size_t index); + // remove the first inserted element with the given value + bool remove(double value); + int minRank(double value); + double get(int rank, size_t& index, bool& found); + int size() const; +}; + +} \ No newline at end of file diff --git a/tests/capi/test_skiplist.cpp b/tests/capi/test_skiplist.cpp new file mode 100644 index 0000000..58e6aeb --- /dev/null +++ b/tests/capi/test_skiplist.cpp @@ -0,0 +1,68 @@ +#include +#include + +#define CHECK(V) \ + if (!(V)) { \ + printf("CHECK(" #V ") at " __FILE__ ":%d failed\n", __LINE__); \ + return false; \ + } + +bool testSkipList() { + kun::SkipList list(16); + list.insert(0, 0); + list.insert(0, 1); + for (int i = 2; i < 16; i++) { + list.insert(i, i); + } + // (0, 0), (0, 1), (2, 2), (3, 3), (4, 4), ..., (15, 15) + size_t index; + bool found; + auto result = list.get(0, index, found); + CHECK(found); + CHECK(result == 0); + CHECK(index == 0); + + // (0, 1), (2, 2), (3, 3), (4, 4), ..., (15, 15) + CHECK(list.remove(0)); + result = list.get(0, index, found); + CHECK(found); + CHECK(result == 0); + CHECK(index == 1); + + // (0, 1), (0, 16), (0, 17), (2, 2), (3, 3), (4, 4), ..., (15, 15) + CHECK(list.insert(0, 16) == 1); + CHECK(list.insert(0, 17) == 2); + result = list.get(0, index, found); + CHECK(found); + CHECK(result == 0); + CHECK(index == 1); + + // (0, 16), (0, 17), (2, 2), (3, 3), (4, 4), ..., (15, 15) + CHECK(list.remove(0)); + result = list.get(0, index, found); + CHECK(found); + CHECK(result == 0); + CHECK(index == 16); + + auto rank = list.minRank(0); + CHECK(rank == 0); + + // (0, 16), (0, 17), (2, 2), ..., (13, 13), (13, 18), (14, 14), ..., (15, + // 15) + CHECK(list.insert(13, 18) == 14); + CHECK(list.minRank(13) == 13); + result = list.get(13, index, found); + CHECK(found); + CHECK(result == 13); + CHECK(index == 13); + result = list.get(14, index, found); + CHECK(found); + CHECK(result == 13); + CHECK(index == 18); + result = list.get(15, index, found); + CHECK(found); + CHECK(result == 14); + CHECK(index == 14); + + return true; +} \ No newline at end of file From de1c03575b76fc3d423ede840e34fb07c67d8a56 Mon Sep 17 00:00:00 2001 From: Menooker Date: Fri, 12 Dec 2025 19:53:13 +0800 Subject: [PATCH 3/8] fix --- KunQuant/passes/Partitioner.py | 20 ++++++++++++++------ cpp/Kun/Ops/Quantile.hpp | 4 ++-- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/KunQuant/passes/Partitioner.py b/KunQuant/passes/Partitioner.py index cfe64fd..7f5c82f 100644 --- a/KunQuant/passes/Partitioner.py +++ b/KunQuant/passes/Partitioner.py @@ -103,7 +103,7 @@ def _is_bad_op_to_add(op: OpBase, partiton: _Partition): has_bad_input = True return has_bad_input, connected_to_parti -def _select_next(ready_ops: List[Tuple[OpBase, int]], info: Dict[OpBase, _PartitionOpInfo], partiton: _Partition, f: Function) -> OpBase: +def _select_next(ready_ops: List[Tuple[OpBase, int]], info: Dict[OpBase, _PartitionOpInfo], partiton: _Partition, f: Function, edge_ops: Dict[OpBase, bool]) -> OpBase: ''' Select the next op to put into the partition. 1. If there is CrossSectionalOp, always select it first @@ -113,7 +113,8 @@ def _select_next(ready_ops: List[Tuple[OpBase, int]], info: Dict[OpBase, _Partit ''' cur_best = (-1, -1, -1) # (is_loop, critical, score) cur_best_op: OpBase = None - edge_ops = partiton.get_edge_ops(f) + if edge_ops is None: + edge_ops = partiton.get_edge_ops(f) for op, idx in ready_ops: if isinstance(op, CrossSectionalOp): return op @@ -152,7 +153,13 @@ def _select_next(ready_ops: List[Tuple[OpBase, int]], info: Dict[OpBase, _Partit cur_best = score_tuple cur_best_op = op return cur_best_op - + +def _has_critical_ops(edge_ops: Dict[OpBase, bool]) -> bool: + for op, is_in_loop in edge_ops.items(): + if is_in_loop: + return True + return False + def _partition(f: Function, partition_thres = 3) -> List[_Partition]: opinfo = _collect_op_info(f) partitions: List[_Partition] = [] @@ -162,7 +169,7 @@ def _partition(f: Function, partition_thres = 3) -> List[_Partition]: while len(ready_ops): partition = _Partition(OrderedDict(), set()) # print("============\nnew partition:", partition) - selected = _select_next(ready_ops, opinfo, partition, f) + selected = _select_next(ready_ops, opinfo, partition, f, None) while selected: # remove the pending dependency. If an op is ready, put into ready queue def maintain_ready_queue(s_op: OpBase): @@ -198,7 +205,8 @@ def maintain_ready_queue(s_op: OpBase): partition.add(opinfo, inp) partition.add(opinfo, selected) # print("@@@add ", selected) - if partition.num_outputs > partition_thres: + next_edge_ops = partition.get_edge_ops(f) + if partition.num_outputs > partition_thres and not _has_critical_ops(next_edge_ops): # if an output is directly connected with the partition, add it direct_output = None for candidate, bat in ready_ops: @@ -213,7 +221,7 @@ def maintain_ready_queue(s_op: OpBase): continue # too many outputs visited, make a new partition break - selected = _select_next(ready_ops, opinfo, partition, f) + selected = _select_next(ready_ops, opinfo, partition, f, next_edge_ops) if partition.ops.__len__(): partitions.append(partition) if to_visit.__len__() != 0: diff --git a/cpp/Kun/Ops/Quantile.hpp b/cpp/Kun/Ops/Quantile.hpp index ea3bc0d..2511126 100644 --- a/cpp/Kun/Ops/Quantile.hpp +++ b/cpp/Kun/Ops/Quantile.hpp @@ -42,9 +42,9 @@ struct SkipListStateImpl { } }; -template +template struct SkipListState : SkipListStateImpl { - SkipListState() : SkipListStateImpl(window) {} + SkipListState() : SkipListStateImpl(expectedwindow) {} }; } // namespace From 9c62b8958191108ac38e8d4f7f04a0f3f259b3bf Mon Sep 17 00:00:00 2001 From: Menooker Date: Fri, 12 Dec 2025 20:59:11 +0800 Subject: [PATCH 4/8] fix --- KunQuant/ops/MiscOp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/KunQuant/ops/MiscOp.py b/KunQuant/ops/MiscOp.py index f3e7d92..263f8bf 100644 --- a/KunQuant/ops/MiscOp.py +++ b/KunQuant/ops/MiscOp.py @@ -132,7 +132,7 @@ class WindowedLinearRegressionConsumerTrait(StateConsumerTrait): class WindowedLinearRegressionImplBase(UnaryElementwiseOp, WindowedLinearRegressionConsumerTrait): def __init__(self, v: OpBase) -> None: - super().__init__([v]) + super().__init__(v) def verify(self, func: 'KunQuant.Stage.Function') -> None: if len(self.inputs) < 1 or not isinstance(self.inputs[0], WindowedLinearRegression): From 863d98ce6cbe3fa3dcc567257abbdb8c59914eae Mon Sep 17 00:00:00 2001 From: Menooker Date: Fri, 12 Dec 2025 21:07:46 +0800 Subject: [PATCH 5/8] fix --- KunQuant/ops/MiscOp.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/KunQuant/ops/MiscOp.py b/KunQuant/ops/MiscOp.py index 263f8bf..de98aa0 100644 --- a/KunQuant/ops/MiscOp.py +++ b/KunQuant/ops/MiscOp.py @@ -128,31 +128,30 @@ def generate_step_code(self, idx: str, time_idx: str, inputs: List[str], buf_nam class WindowedLinearRegressionConsumerTrait(StateConsumerTrait): - pass - -class WindowedLinearRegressionImplBase(UnaryElementwiseOp, WindowedLinearRegressionConsumerTrait): - def __init__(self, v: OpBase) -> None: - super().__init__(v) - def verify(self, func: 'KunQuant.Stage.Function') -> None: if len(self.inputs) < 1 or not isinstance(self.inputs[0], WindowedLinearRegression): raise RuntimeError("WindowedLinearRegressionImpl expects WindowedLinearRegression Op as input") - return super().verify(func) + return OpBase.verify(self, func) +class WindowedLinearRegressionImplUnaryBase(WindowedLinearRegressionConsumerTrait, UnaryElementwiseOp): + pass + +class WindowedLinearRegressionImplBinaryBase(WindowedLinearRegressionConsumerTrait, BinaryElementwiseOp): + pass -class WindowedLinearRegressionRSqaureImpl(WindowedLinearRegressionImplBase): +class WindowedLinearRegressionRSqaureImpl(WindowedLinearRegressionImplUnaryBase): ''' Compute RSqaure of Windowed Linear Regression ''' pass -class WindowedLinearRegressionSlopeImpl(WindowedLinearRegressionImplBase): +class WindowedLinearRegressionSlopeImpl(WindowedLinearRegressionImplUnaryBase): ''' Compute RSqaure of Windowed Linear Regression ''' pass -class WindowedLinearRegressionResiImpl(WindowedLinearRegressionImplBase): +class WindowedLinearRegressionResiImpl(WindowedLinearRegressionImplBinaryBase): ''' Compute RSqaure of Windowed Linear Regression ''' From 70bcb21bbf40ffd3d1c5ea67fa6bef4c923f1d7a Mon Sep 17 00:00:00 2001 From: Menooker Date: Fri, 12 Dec 2025 21:42:57 +0800 Subject: [PATCH 6/8] bump --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 6d9d01e..0316be1 100644 --- a/setup.py +++ b/setup.py @@ -73,7 +73,7 @@ def __init__(self, name, path, sourcedir=""): setup( name="KunQuant", - version="0.1.8" + git_ver, + version="0.1.9" + git_ver, description="A compiler, optimizer and executor for financial expressions and factors", long_description=open("Readme.md", encoding='utf-8').read(), long_description_content_type="text/markdown", From 01f52e2b49e83782d41f374e992df1e6618f4f24 Mon Sep 17 00:00:00 2001 From: Menooker Date: Fri, 12 Dec 2025 21:47:43 +0800 Subject: [PATCH 7/8] fix 158 --- tests/test_alpha158.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_alpha158.py b/tests/test_alpha158.py index 943df2a..48ec046 100644 --- a/tests/test_alpha158.py +++ b/tests/test_alpha158.py @@ -45,7 +45,7 @@ def check_alpha158(avx512, keep, tempdir): else: simd_len = 2 target = [("alpha158", f, KunCompilerConfig(dtype='double', blocking_len=simd_len, partition_factor=4, - output_layout="TS", input_layout="TS", options={"opt_reduce": True, "fast_log": True}))] + output_layout="TS", input_layout="TS", options={"opt_reduce": True, "fast_log": True, "no_fast_stat": True}))] if avx512: machine = cfake.X64CPUFlags(avx512=True, avx512dq=True, avx512vl=True) else: From ab955aabe309f7fdd7d6650aae98b6c7e71bf156 Mon Sep 17 00:00:00 2001 From: Menooker Date: Fri, 12 Dec 2025 22:58:47 +0800 Subject: [PATCH 8/8] fix mac seed --- tests/capi/test_c.cpp | 1 + tests/test_alpha158.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/capi/test_c.cpp b/tests/capi/test_c.cpp index acf031b..90d6931 100644 --- a/tests/capi/test_c.cpp +++ b/tests/capi/test_c.cpp @@ -130,6 +130,7 @@ int main(int args, char **argv) { printf("Bad args\n"); return 2; } + srand(114514); auto ret = testBatch(argv[1]); if (ret) { return ret; diff --git a/tests/test_alpha158.py b/tests/test_alpha158.py index 48ec046..943df2a 100644 --- a/tests/test_alpha158.py +++ b/tests/test_alpha158.py @@ -45,7 +45,7 @@ def check_alpha158(avx512, keep, tempdir): else: simd_len = 2 target = [("alpha158", f, KunCompilerConfig(dtype='double', blocking_len=simd_len, partition_factor=4, - output_layout="TS", input_layout="TS", options={"opt_reduce": True, "fast_log": True, "no_fast_stat": True}))] + output_layout="TS", input_layout="TS", options={"opt_reduce": True, "fast_log": True}))] if avx512: machine = cfake.X64CPUFlags(avx512=True, avx512dq=True, avx512vl=True) else: