Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions KunQuant/Driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def compileit(f: Function, module_name: str, partition_factor = 3, dtype = "floa
blocking_len = suggested_len[dtype]
if element_size[dtype] * blocking_len not in simd_len:
raise RuntimeError(f"Blocking length {blocking_len} is not supported for {dtype} on {_cpu_arch}")
options['blocking_len'] = blocking_len
if output_layout not in ["STs", "TS", "STREAM"]:
raise RuntimeError("Bad output_layout name " + output_layout)
if input_layout not in ["STs", "TS", "STREAM"]:
Expand Down
15 changes: 15 additions & 0 deletions KunQuant/Op.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ def traverse_replace_map(op: 'OpBase', replace_map: Dict['OpBase', 'OpBase']) ->
return traverse_replace_map(found, replace_map)

class AcceptSingleValueInputTrait(ABC):
'''
The ops that accept a time_len=1 array as input
'''
@abstractmethod
def get_single_value_input_id(self) -> int:
    '''
    Return the id of the input that is allowed to be a time_len=1 array.
    Presumably this is a position in `self.inputs` - confirm against the
    concrete ops that override it.

    The method takes `self` like any other instance method, so that it can be
    invoked on an instance (or through super()) without raising TypeError.
    Concrete ops must override it; the abstract body returns nothing.
    '''
    pass
Expand Down Expand Up @@ -479,6 +482,18 @@ class GloablStatefulOpTrait(StatefulOpTrait):
'''
pass

class GlobalStatefulProducerTrait(GloablStatefulOpTrait):
    '''
    Marker trait for ops that keep an internal state which is carried from one
    time step to the next, and whose state has to be read through an op that
    carries StateConsumerTrait.
    '''

class StateConsumerTrait:
    '''
    Marker trait for ops that consume the state produced by an op carrying
    GlobalStatefulProducerTrait. It declares no members of its own.
    '''

class ReductionOp(OpBase, StatefulOpTrait):
'''
Base class of all reduction ops. A reduction op takes inputs that is originated from a IterValue. The input must be in a loop (v.get_parent() is a loop). The data produced
Expand Down
130 changes: 96 additions & 34 deletions KunQuant/ops/CompOp.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
from .ReduceOp import ReduceAdd, ReduceMul, ReduceArgMax, ReduceRank, ReduceMin, ReduceMax, ReduceDecayLinear, ReduceArgMin
from KunQuant.Op import ConstantOp, OpBase, CompositiveOp, WindowedTrait, ForeachBackWindow, WindowedTempOutput, Builder, IterValue, WindowLoopIndex
from .ElewiseOp import And, DivConst, GreaterThan, LessThan, Or, Select, SetInfOrNanToValue, Sub, Mul, Sqrt, SubConst, Div, CmpOp, Exp, Log, Min, Max, Equals, Abs
from .MiscOp import Accumulator, BackRef, SetAccumulator, WindowedLinearRegression, WindowedLinearRegressionResiImpl, WindowedLinearRegressionRSqaureImpl, WindowedLinearRegressionSlopeImpl, ReturnFirstValue
from .MiscOp import Accumulator, BackRef, SetAccumulator, WindowedLinearRegression, WindowedLinearRegressionResiImpl,\
WindowedLinearRegressionRSqaureImpl, WindowedLinearRegressionSlopeImpl, ReturnFirstValue, SkipListState, SkipListQuantile, SkipListRank, SkipListMin, SkipListMax,\
SkipListArgMin
from collections import OrderedDict
from typing import Union, List, Tuple, Dict
import math

def _is_fast_stat(opt: dict, attrs: dict) -> bool:
return not opt.get("no_fast_stat", True) and not attrs.get("no_fast_stat", False)

def _decide_use_skip_list(window: int, blocking_len: int) -> bool:
naive_cost = window
skip_list_cost = math.log2(window) * blocking_len * 5
return skip_list_cost < naive_cost

class WindowedCompositiveOp(CompositiveOp, WindowedTrait):
def __init__(self, v: OpBase, window: int, v2 = None) -> None:
inputs = [v]
Expand All @@ -17,7 +24,7 @@ def __init__(self, v: OpBase, window: int, v2 = None) -> None:
super().__init__(inputs, [("window", window)])

class WindowedReduce(WindowedCompositiveOp):
def make_reduce(self, v: OpBase) -> OpBase:
def make_reduce(self, v: OpBase, newest: OpBase) -> OpBase:
raise RuntimeError("Not implemented")

def decompose(self, options: dict) -> List[OpBase]:
Expand All @@ -26,7 +33,7 @@ def decompose(self, options: dict) -> List[OpBase]:
v0 = WindowedTempOutput(self.inputs[0], self.attrs["window"])
v1 = ForeachBackWindow(v0, self.attrs["window"])
itr = IterValue(v1, v0)
v2 = self.make_reduce(itr)
v2 = self.make_reduce(itr, self.inputs[0])
return b.ops

class WindowedSum(WindowedReduce):
Expand All @@ -35,7 +42,7 @@ class WindowedSum(WindowedReduce):
For indices < window-1, the output will be NaN
similar to pandas.DataFrame.rolling(n).sum()
'''
def make_reduce(self, v: OpBase) -> OpBase:
def make_reduce(self, v: OpBase, newest: OpBase) -> OpBase:
return ReduceAdd(v)

class WindowedProduct(WindowedReduce):
Expand All @@ -44,26 +51,55 @@ class WindowedProduct(WindowedReduce):
For indices < window-1, the output will be NaN
similar to pandas.DataFrame.rolling(n).product()
'''
def make_reduce(self, v: OpBase) -> OpBase:
def make_reduce(self, v: OpBase, newest: OpBase) -> OpBase:
return ReduceMul(v)

class WindowedMin(WindowedReduce):
class _WindowedMinMaxBase(WindowedReduce):
    '''
    Common base of the windowed ops that have two possible lowerings: the
    naive reduction inherited from WindowedReduce, and a skip-list based one.
    _decide_use_skip_list() picks between them from the window length and the
    "blocking_len" compile option.
    '''
    def on_skip_list(self, skplist: SkipListState, cur: OpBase) -> OpBase:
        '''
        Create the op that reads the result out of the skip list state.
        `skplist` is the SkipListState and `cur` is the newest input value.
        Subclasses must override this.
        '''
        raise RuntimeError("Not implemented")

    def decompose(self, options: dict) -> List[OpBase]:
        '''
        Lower this op. Returns the list of ops recorded by the Builder on the
        skip-list path, or whatever WindowedReduce.decompose returns otherwise.
        '''
        window = self.attrs["window"]
        if not _decide_use_skip_list(window, options["blocking_len"]):
            return super().decompose(options)
        builder = Builder(self.get_parent())
        with builder:
            current = self.inputs[0]
            # the skip list state is fed with the BackRef of the input by
            # `window` together with the newest value
            expired = BackRef(current, window)
            state = SkipListState(expired, current, window)
            self.on_skip_list(state, current)
        return builder.ops

class WindowedMin(_WindowedMinMaxBase):
'''
Min of a rolling look back window, including the current newest data.
For indices < window-1, the output will be NaN
similar to pandas.DataFrame.rolling(n).min()
'''
def make_reduce(self, v: OpBase) -> OpBase:
def make_reduce(self, v: OpBase, newest: OpBase) -> OpBase:
return ReduceMin(v)

class WindowedMax(WindowedReduce):
def on_skip_list(self, skplist: SkipListState, cur: OpBase) -> OpBase:
return SkipListMin(skplist)


class WindowedMax(_WindowedMinMaxBase):
'''
Max of a rolling look back window, including the current newest data.
For indices < window-1, the output will be NaN
similar to pandas.DataFrame.rolling(n).max()
'''
def make_reduce(self, v: OpBase) -> OpBase:
def make_reduce(self, v: OpBase, newest: OpBase) -> OpBase:
return ReduceMax(v)

def on_skip_list(self, skplist: SkipListState, cur: OpBase) -> OpBase:
return SkipListMax(skplist)

# small - sum
def _kahan_sub(mask: OpBase, sum: OpBase, small: OpBase, compensation: OpBase) -> Union[OpBase, OpBase]:
Expand Down Expand Up @@ -424,51 +460,54 @@ def decompose(self, options: dict) -> List[OpBase]:
return b.ops


class TsArgMax(WindowedCompositiveOp):
class TsArgMax(WindowedReduce):
'''
ArgMax in a rolling look back window, including the current newest data.
The result should be the index of the max element in the rolling window. The index of the oldest element of the rolling window is 1.
Similar to df.rolling(window).apply(np.argmax) + 1
'''
def decompose(self, options: dict) -> List[OpBase]:
b = Builder(self.get_parent())
with b:
v0 = WindowedTempOutput(self.inputs[0], self.attrs["window"])
v1 = ForeachBackWindow(v0, self.attrs["window"])
v2 = ReduceArgMax(IterValue(v1, v0))
v3 = SubConst(v2, self.attrs["window"], True)
return b.ops
window = self.attrs["window"]
blocking_len = options["blocking_len"]
if _decide_use_skip_list(window, blocking_len):
b = Builder(self.get_parent())
with b:
TsArgMin(0-self.inputs[0], window)
return b.ops
else:
return super().decompose(options)

def make_reduce(self, v: OpBase, newest: OpBase) -> OpBase:
v2 = ReduceArgMax(v)
return self.attrs["window"] - v2

class TsArgMin(WindowedCompositiveOp):
class TsArgMin(_WindowedMinMaxBase):
'''
ArgMin in a rolling look back window, including the current newest data.
The result should be the index of the min element in the rolling window. The index of the oldest element of the rolling window is 1.
Similar to df.rolling(window).apply(np.argmin) + 1
'''
def decompose(self, options: dict) -> List[OpBase]:
b = Builder(self.get_parent())
with b:
v0 = WindowedTempOutput(self.inputs[0], self.attrs["window"])
v1 = ForeachBackWindow(v0, self.attrs["window"])
v2 = ReduceArgMin(IterValue(v1, v0))
v3 = SubConst(v2, self.attrs["window"], True)
return b.ops
def on_skip_list(self, skplist: SkipListState, cur: OpBase) -> OpBase:
return SkipListArgMin([skplist], [])

def make_reduce(self, v: OpBase, newest: OpBase) -> OpBase:
v2 = ReduceArgMin(v)
return self.attrs["window"] - v2


class TsRank(WindowedCompositiveOp):
class TsRank(_WindowedMinMaxBase):
'''
Time series rank of the newest data in a rolling look back window, including the current newest data.
Let num_values_less = the number of values in rolling window that is less than the current newest data.
Let num_values_eq = the number of values in rolling window that is equal to the current newest data.
rank = num_values_less + (num_values_eq + 1) / 2
Similar to df.rolling(window).rank()
'''
def decompose(self, options: dict) -> List[OpBase]:
b = Builder(self.get_parent())
with b:
v0 = WindowedTempOutput(self.inputs[0], self.attrs["window"])
v1 = ForeachBackWindow(v0, self.attrs["window"])
v2 = ReduceRank(IterValue(v1, v0), self.inputs[0])
return b.ops
def on_skip_list(self, skplist: SkipListState, cur: OpBase) -> OpBase:
return SkipListRank(skplist, cur)

def make_reduce(self, v: OpBase, newest: OpBase) -> OpBase:
return ReduceRank(v, newest)

class Clip(CompositiveOp):
'''
Expand Down Expand Up @@ -636,4 +675,27 @@ def decompose(self, options: dict) -> List[OpBase]:
filtered = Select(index >= max_bar_index, IterValue(each, v), inf)
trough = ReduceMin(filtered)
(peak - trough) / peak
return b.ops


class WindowedQuantile(CompositiveOp, WindowedTrait):
    '''
    Quantile `q` of a rolling look back window of `window` rows.
    Similar to pd.rolling(window).quantile(q, interpolation='linear')
    It is decomposed into BackRef -> SkipListState -> SkipListQuantile.
    '''
    def __init__(self, v: OpBase, window: int, q: float) -> None:
        attrs = [("window", window), ("q", q)]
        super().__init__([v], attrs)

    def required_input_window(self) -> int:
        '''
        One more row than the window itself (matches what BackRef reports for
        the same window).
        '''
        window = self.attrs["window"]
        return window + 1

    def decompose(self, options: dict) -> List[OpBase]:
        '''
        Lower this op into the skip list ops. `options` is not read here.
        Returns the ops recorded by the Builder, in creation order.
        '''
        builder = Builder(self.get_parent())
        window = self.attrs["window"]
        source = self.inputs[0]
        quantile = self.attrs["q"]
        with builder:
            expired = BackRef(source, window)
            state = SkipListState(expired, source, window)
            SkipListQuantile(state, quantile)
        return builder.ops
75 changes: 51 additions & 24 deletions KunQuant/ops/MiscOp.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import KunQuant
from KunQuant.Op import AcceptSingleValueInputTrait, Input, OpBase, WindowedTrait, SinkOpTrait, CrossSectionalOp, GloablStatefulOpTrait, UnaryElementwiseOp, BinaryElementwiseOp
from KunQuant.Op import AcceptSingleValueInputTrait, Input, OpBase, WindowedTrait, SinkOpTrait, CrossSectionalOp, GlobalStatefulProducerTrait, GloablStatefulOpTrait, StateConsumerTrait, UnaryElementwiseOp, BinaryElementwiseOp
from typing import List, Union

class BackRef(OpBase, WindowedTrait):
Expand All @@ -12,16 +12,6 @@ def __init__(self, v: OpBase, window: int) -> None:
def required_input_window(self) -> int:
    '''
    Number of input rows this op asks for: the "window" attribute plus one.
    Presumably the extra row accounts for the current row in addition to the
    one `window` steps back - confirm against the callers.
    '''
    window = self.attrs["window"]
    return window + 1

class WindowedQuantile(OpBase, WindowedTrait):
'''
Quantile in `window` rows ago.
Similar to pd.rolling(window).quantile(q, interpolation='linear')
'''
def __init__(self, v: OpBase, window: int, q: float) -> None:
super().__init__([v], [("window", window), ("q", q)])

def required_input_window(self) -> int:
return self.attrs["window"] + 1

class FastWindowedSum(OpBase, WindowedTrait, GloablStatefulOpTrait):
'''
Expand All @@ -39,7 +29,7 @@ def get_state_variable_name_prefix(self) -> str:
def generate_step_code(self, idx: str, time_idx: str, inputs: List[str], buf_name: str) -> str:
    '''
    Build the generated statement (C++-looking source text) for one step of
    this op.

    idx: suffix used for both the result variable `v{idx}` and the state
         variable `sum_{idx}`
    time_idx: expression text of the current time index
    inputs: expression texts of the op inputs; only inputs[0] is used
    buf_name: expression text of the buffer handed to step()
    '''
    step_stmt = f"auto v{idx} = sum_{idx}.step({buf_name}, {inputs[0]}, {time_idx});"
    return step_stmt

class Accumulator(OpBase, GloablStatefulOpTrait):
class Accumulator(OpBase, GlobalStatefulProducerTrait):
'''
Accumulator is a stateful op that accumulates the input value over time.
It can be used to compute running totals, moving averages, etc.'''
Expand All @@ -64,7 +54,7 @@ def verify(self, func) -> None:
raise RuntimeError(f"Accumulator {self.attrs['name']} is not used with any SetAccumulator")
return super().verify(func)

class SetAccumulator(OpBase):
class SetAccumulator(OpBase, StateConsumerTrait):
'''
Set the value of an Accumulator to a value, if mask is set. Otherwise, it does nothing.
'''
Expand Down Expand Up @@ -120,7 +110,7 @@ def generate_init_code(self, idx: str, elem_type: str, simd_lanes: int, inputs:
def generate_step_code(self, idx: str, time_idx: str, inputs: List[str]) -> str:
    '''
    Build the generated statement (C++-looking source text) for one step of
    this op.

    idx: suffix used for both the result variable `v{idx}` and the state
         variable `ema_{idx}`
    time_idx: expression text of the current time index
    inputs: expression texts of the op inputs; only inputs[0] is used
    '''
    step_stmt = f"auto v{idx} = ema_{idx}.step({inputs[0]}, {time_idx});"
    return step_stmt

class WindowedLinearRegression(OpBase, WindowedTrait, GloablStatefulOpTrait):
class WindowedLinearRegression(OpBase, WindowedTrait, GlobalStatefulProducerTrait):
'''
Compute states of Windowed Linear Regression
'''
Expand All @@ -135,37 +125,74 @@ def get_state_variable_name_prefix(self) -> str:

def generate_step_code(self, idx: str, time_idx: str, inputs: List[str], buf_name: str) -> str:
    '''
    Build the generated statement (C++-looking source text) for one step of
    this op. The result `v{idx}` is bound as a const reference to what
    `linear_{idx}.step(...)` returns.

    idx: suffix used for both the result variable and the state variable
    time_idx: expression text of the current time index
    inputs: expression texts of the op inputs; only inputs[0] is used
    buf_name: expression text of the buffer handed to step()
    '''
    step_stmt = f"const auto& v{idx} = linear_{idx}.step({buf_name}, {inputs[0]}, {time_idx});"
    return step_stmt

class WindowedLinearRegressionImplBase(OpBase):
def __init__(self, v: OpBase) -> None:
super().__init__([v])



class WindowedLinearRegressionConsumerTrait(StateConsumerTrait):
def verify(self, func: 'KunQuant.Stage.Function') -> None:
if len(self.inputs) < 1 or not isinstance(self.inputs[0], WindowedLinearRegression):
raise RuntimeError("WindowedLinearRegressionImpl expects WindowedLinearRegression Op as input")
return super().verify(func)
return OpBase.verify(self, func)

class WindowedLinearRegressionImplUnaryBase(WindowedLinearRegressionConsumerTrait, UnaryElementwiseOp):
    '''
    Base of the unary elementwise ops that read a WindowedLinearRegression
    state. The consumer trait is listed first, so its verify() is the one
    found first in the method resolution order.
    '''

class WindowedLinearRegressionConsumerTrait:
class WindowedLinearRegressionImplBinaryBase(WindowedLinearRegressionConsumerTrait, BinaryElementwiseOp):
    '''
    Base of the binary elementwise ops that read a WindowedLinearRegression
    state. The consumer trait is listed first, so its verify() is the one
    found first in the method resolution order.
    '''

class WindowedLinearRegressionRSqaureImpl(UnaryElementwiseOp, WindowedLinearRegressionConsumerTrait):
class WindowedLinearRegressionRSqaureImpl(WindowedLinearRegressionImplUnaryBase):
    '''
    R-square of a Windowed Linear Regression, read from the state produced by
    a WindowedLinearRegression op.
    '''

class WindowedLinearRegressionSlopeImpl(UnaryElementwiseOp, WindowedLinearRegressionConsumerTrait):
class WindowedLinearRegressionSlopeImpl(WindowedLinearRegressionImplUnaryBase):
    '''
    Compute the slope of Windowed Linear Regression, read from the state
    produced by a WindowedLinearRegression op.
    '''
    pass

class WindowedLinearRegressionResiImpl(BinaryElementwiseOp, WindowedLinearRegressionConsumerTrait):
class WindowedLinearRegressionResiImpl(WindowedLinearRegressionImplBinaryBase):
    '''
    Compute the residual ("Resi") of Windowed Linear Regression. It is a
    binary op: the first input is the state produced by a
    WindowedLinearRegression op; the second is presumably the value the
    residual is taken for - confirm against the callers.
    '''
    pass

class SkipListState(OpBase, GlobalStatefulProducerTrait):
    '''
    Stateful producer op that maintains a skip list of the input values.
    Its inputs are [oldvalue, value] and it carries a "window" attribute.
    The state is meant to be read by ops deriving from SkipListConsumerOp.
    '''
    def __init__(self, oldvalue: OpBase, value: OpBase, window: int) -> None:
        operands = [oldvalue, value]
        super().__init__(operands, [("window", window)])

    def get_state_variable_name_prefix(self) -> str:
        '''
        Prefix of the generated state variable; generate_step_code() uses the
        same prefix followed by the op index.
        '''
        prefix = "skip_list_"
        return prefix

    def generate_step_code(self, idx: str, time_idx: str, inputs: List[str]) -> str:
        '''
        Build the generated statement for one step: the result `v{idx}` is
        bound as a reference to what `skip_list_{idx}.step(...)` returns.
        inputs[0] and inputs[1] are the expression texts of oldvalue and value.
        '''
        step_stmt = f"auto& v{idx} = skip_list_{idx}.step({inputs[0]}, {inputs[1]}, {time_idx});"
        return step_stmt

class SkipListConsumerOp(StateConsumerTrait):
    '''
    Mixin for the ops that read from a SkipListState. It has to be combined
    with an op class that provides `inputs` and a verify() further down the
    method resolution order.
    '''
    def verify(self, func: 'KunQuant.Stage.Function') -> None:
        '''
        Check that the first input is a SkipListState, then continue with the
        next verify() in the method resolution order.
        Raises RuntimeError if there is no input or it is of another type.
        '''
        producer = self.inputs[0] if len(self.inputs) >= 1 else None
        if not isinstance(producer, SkipListState):
            raise RuntimeError("SkipListConsumerOp expects SkipListState Op as input")
        return super().verify(func)

class SkipListQuantile(SkipListConsumerOp, OpBase):
    '''
    Reads the quantile `q` out of a SkipListState.
    `v` is expected to be the SkipListState (checked by verify()), and `q` is
    stored as the "q" attribute.
    '''
    def __init__(self, v: OpBase, q: float) -> None:
        attrs = [("q", q)]
        super().__init__([v], attrs)

class SkipListRank(SkipListConsumerOp, BinaryElementwiseOp):
    '''
    Binary consumer of a SkipListState: the first input is the state, the
    second is the value to rank (TsRank passes the newest input value).
    '''

class SkipListMin(SkipListConsumerOp, UnaryElementwiseOp):
    '''
    Unary consumer of a SkipListState, created by WindowedMin on its
    skip-list path to read the minimum.
    '''

class SkipListMax(SkipListConsumerOp, UnaryElementwiseOp):
    '''
    Unary consumer of a SkipListState, created by WindowedMax on its
    skip-list path to read the maximum.
    '''

class SkipListArgMin(SkipListConsumerOp, OpBase):
    '''
    Consumer of a SkipListState, created by TsArgMin on its skip-list path.
    It defines no constructor of its own, so it is built with the inherited
    one, e.g. SkipListArgMin([skplist], []).
    '''


class GenericCrossSectionalOp(CrossSectionalOp):
'''
Cross sectional op with customized C++ implementation.
Expand Down
Loading