Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ccpp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ jobs:
python tests/test.py
python tests/test2.py
python tests/test_runtime.py
python tests/test_stream.py
- name: Alpha158 test
working-directory: ./
run: |
Expand Down
19 changes: 16 additions & 3 deletions KunQuant/Driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# get the cpu architecture of the machine
from KunQuant.jit.env import cpu_arch as _cpu_arch

required_version = "0x64100003"
required_version = "kun::VERSION"
@dataclass
class KunCompilerConfig:
partition_factor : int = 3
Expand Down Expand Up @@ -232,7 +232,8 @@ def push_source(is_simple=False):
def query_temp_buf_id(tempname: str, window: int) -> int:
input_windows[tempname] = window
return insert_name_str(tempname, "TEMP").idx
src, decl = codegen_cpp(module_name, func, input_name_to_idx, ins, outs, options, stream_mode, query_temp_buf_id, input_windows, generated_cross_sectional_func, dtype, blocking_len, not allow_unaligned, is_single_source)
stream_state_buffer_init = []
src, decl = codegen_cpp(module_name, func, input_name_to_idx, ins, outs, options, stream_mode, query_temp_buf_id, input_windows, stream_state_buffer_init, generated_cross_sectional_func, dtype, blocking_len, not allow_unaligned, is_single_source)
impl_src.append(src)
decl_src.append(decl)
newparti = _Partition(func.name, len(partitions), pins, pouts)
Expand Down Expand Up @@ -309,6 +310,17 @@ def query_temp_buf_id(tempname: str, window: int) -> int:
{parti_dep_src2}
}}
''')

if len(stream_state_buffer_init) > 0:
stream_state_str = "\n ".join(stream_state_buffer_init)
impl_src.append(f'''static std::vector<StateBufferPtr> __init_state_buffers(size_t stock_count) {{
std::vector<StateBufferPtr> buffers;
buffers.reserve({len(stream_state_buffer_init)});
{stream_state_str}
return buffers;
}}
''')

dty = dtype[0].upper() + dtype[1:]
impl_src.append(f'''KUN_EXPORT Module {module_name}{{
{required_version},
Expand All @@ -320,7 +332,8 @@ def query_temp_buf_id(tempname: str, window: int) -> int:
MemoryLayout::{output_layout},
{blocking_len},
Datatype::{dty},
{"0" if allow_unaligned else "1"}
{"0" if allow_unaligned else "1"},
{"nullptr" if len(stream_state_buffer_init) == 0 else "__init_state_buffers"}
}};''')
push_source()
if not is_single_source:
Expand Down
15 changes: 15 additions & 0 deletions KunQuant/Op.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,21 @@ def generate_init_code(self, idx: str, elem_type: str, simd_lanes: int, inputs:
inputs: the input variables of the op
'''
return f"{self.get_func_or_class_full_name(elem_type, simd_lanes)} {self.get_state_variable_name_prefix()}{idx};"
def generate_init_code_stream(self, local_idx: str, buffer_idx: int, elem_type: str, simd_lanes: int, inputs: List[str], aligned: bool) -> Tuple[str, str]:
    '''
    Generate the C++ code that binds and allocates this op's state variable in stream mode.

    local_idx: the output variable name index, appended to the state variable name prefix
    buffer_idx: the index of this op's buffer in __ctx->state_buffers
    elem_type: the element type of the state variable
    simd_lanes: SIMD lanes
    inputs: the input variables of the op (not used by this default implementation)
    aligned: not used by this default implementation

    Returns:
        (binding_code, buffer_init_code)
        binding_code: declares, in the compute function, a reference to the state
            object stored at __stock_idx of the state buffer
        buffer_init_code: the initializer statement that appends a newly made
            state buffer to `buffers`
    '''
    typename = self.get_func_or_class_full_name(elem_type, simd_lanes)
    var_name = f"{self.get_state_variable_name_prefix()}{local_idx}"
    binding_code = f"{typename}& {var_name} = __ctx->state_buffers[{buffer_idx}]->get<{typename}>(__stock_idx);"
    buffer_init_code = f"buffers.emplace_back(makeStateBuffer<{typename}>(stock_count, {simd_lanes}));"
    return binding_code, buffer_init_code


class GloablStatefulOpTrait(StatefulOpTrait):
Expand Down
7 changes: 6 additions & 1 deletion KunQuant/ops/MiscOp.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import KunQuant
from KunQuant.Op import AcceptSingleValueInputTrait, Input, OpBase, WindowedTrait, SinkOpTrait, CrossSectionalOp, GlobalStatefulProducerTrait, GloablStatefulOpTrait, StateConsumerTrait, UnaryElementwiseOp, BinaryElementwiseOp
from typing import List, Union
from typing import List, Tuple, Union

class BackRef(OpBase, WindowedTrait):
'''
Expand Down Expand Up @@ -100,6 +100,11 @@ def get_single_value_input_id(self) -> int:
def get_state_variable_name_prefix(self) -> str:
return "ema_"

def generate_init_code_stream(self, local_idx: str, buffer_idx: str, elem_type: str, simd_lanes: int, inputs: List[str], aligned: bool) -> Tuple[str, str]:
    '''
    Stream-mode state initialization for this op.

    Rejects ops built with a second input (reported below as "init_val") and
    otherwise defers to the parent implementation with the same arguments.
    '''
    has_second_input = len(self.inputs) == 2
    if has_second_input:
        raise RuntimeError("EMA with init_val is not supported in stream mode")
    parent = super()
    return parent.generate_init_code_stream(local_idx, buffer_idx, elem_type, simd_lanes, inputs, aligned)

def generate_init_code(self, idx: str, elem_type: str, simd_lanes: int, inputs: List[str], aligned: bool) -> str:
initv = "NAN"
if len(self.inputs) == 2:
Expand Down
11 changes: 8 additions & 3 deletions KunQuant/passes/CodegenCpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def _generate_cross_sectional_func_name(op: GenericCrossSectionalOp, inputs: Lis
name.append(layout)
return f"{op.__class__.__name__}_{'_'.join(name)}"

def codegen_cpp(prefix: str, f: Function, input_name_to_idx: Dict[str, int], inputs: List[Tuple[Input, bool]], outputs: List[Tuple[Output, bool]], options: dict, stream_mode: bool, query_temp_buffer_id, stream_window_size: Dict[str, int], generated_cross_sectional_func: Set[str], elem_type: str, simd_lanes: int, aligned: bool, static: bool) -> Tuple[str, str]:
def codegen_cpp(prefix: str, f: Function, input_name_to_idx: Dict[str, int], inputs: List[Tuple[Input, bool]], outputs: List[Tuple[Output, bool]], options: dict, stream_mode: bool, query_temp_buffer_id, stream_window_size: Dict[str, int], stream_state_buffer_init: List[str], generated_cross_sectional_func: Set[str], elem_type: str, simd_lanes: int, aligned: bool, static: bool) -> Tuple[str, str]:
if len(f.ops) == 3 and isinstance(f.ops[1], SimpleCrossSectionalOp):
return "", f'''static auto stage_{prefix}__{f.name} = {f.ops[1].__class__.__name__}Stocks<Mapper{f.ops[0].attrs["layout"]}<{elem_type}, {simd_lanes}>, Mapper{f.ops[2].attrs["layout"]}<{elem_type}, {simd_lanes}>>;'''

Expand Down Expand Up @@ -281,14 +281,19 @@ def codegen_cpp(prefix: str, f: Function, input_name_to_idx: Dict[str, int], inp
funcname = "SkipListArgMin"
scope.scope.append(_CppSingleLine(scope, f'auto v{idx} = {funcname}<{elem_type}, {simd_lanes}>(v{inp[0]}, i);'))
elif isinstance(op, GloablStatefulOpTrait):
if stream_mode: raise RuntimeError(f"Stream Mode does not support {op.__class__.__name__}")
assert(op.get_parent() is None)
args = {}
if isinstance(op, WindowedTrait):
buf_name = _get_buffer_name(op.inputs[0], inp[0])
args["buf_name"] = buf_name
vargs = [f"v{inpv}" for inpv in inp]
toplevel.scope.insert(-1, _CppSingleLine(toplevel, op.generate_init_code(idx, elem_type, simd_lanes, vargs, aligned)))
if stream_mode:
cur_idx = len(stream_state_buffer_init)
var_init_code, init_buffer_code = op.generate_init_code_stream(idx, cur_idx, elem_type, simd_lanes, vargs, aligned)
toplevel.scope.insert(-1, _CppSingleLine(toplevel, var_init_code))
stream_state_buffer_init.append(init_buffer_code)
else:
toplevel.scope.insert(-1, _CppSingleLine(toplevel, op.generate_init_code(idx, elem_type, simd_lanes, vargs, aligned)))
scope.scope.append(_CppSingleLine(scope, op.generate_step_code(idx, "i", vargs, **args)))
elif isinstance(op, Select):
scope.scope.append(_CppSingleLine(scope, f"auto v{idx} = Select(v{inp[0]}, v{inp[1]}, v{inp[2]});"))
Expand Down
3 changes: 3 additions & 0 deletions cpp/Kun/Context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

#include "Stage.hpp"
#include "StreamBuffer.hpp"
#include "StateBuffer.hpp"
#include <atomic>
#include <memory>
#include <stdlib.h>
#include <stddef.h>

namespace kun {

static const uint64_t VERSION = 0x64100004;
struct KUN_API RuntimeStage {
const Stage *stage;
Context *ctx;
Expand Down Expand Up @@ -127,6 +129,7 @@ struct Context {
size_t simd_len;
Datatype dtype;
bool is_stream;
StateBufferPtr* state_buffers;
};

KUN_API std::shared_ptr<Executor> createSingleThreadExecutor();
Expand Down
10 changes: 10 additions & 0 deletions cpp/Kun/MathUtil.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#pragma once
#include <cstddef>

namespace kun {

// These helpers are constexpr (and therefore implicitly inline) rather than
// living in an anonymous namespace: a header-level anonymous namespace gives
// every translation unit its own private copy of each function.

// Returns ceil(x / y) for unsigned operands.
// Computed as quotient plus a remainder flag so that it cannot wrap around the
// way (x + y - 1) / y does when x is close to the maximum size_t value.
// Precondition: y != 0 (division by zero is undefined behaviour).
constexpr size_t divideAndCeil(size_t x, size_t y) {
    return x / y + (x % y != 0 ? 1 : 0);
}

// Rounds x up to the nearest multiple of y.
// Precondition: y != 0. The result wraps if the rounded value does not fit in
// size_t.
constexpr size_t roundUp(size_t x, size_t y) {
    return divideAndCeil(x, y) * y;
}

} // namespace kun
7 changes: 4 additions & 3 deletions cpp/Kun/Module.hpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#pragma once

#include "Stage.hpp"
#include <memory>
#include "StateBuffer.hpp"
#include <functional>
#include <vector>

namespace kun {

Expand All @@ -12,7 +13,6 @@ enum class MemoryLayout {
STREAM,
};


struct Module {
size_t required_version;
size_t num_stages;
Expand All @@ -24,11 +24,12 @@ struct Module {
size_t blocking_len;
Datatype dtype;
size_t aligned;
std::vector<StateBufferPtr> (*init_state_buffers)(size_t num_stocks);
};

struct Library {
void *handle;
std::function<void(Library*)> dtor;
std::function<void(Library *)> dtor;
KUN_API const Module *getModule(const char *name);
KUN_API static std::shared_ptr<Library> load(const char *filename);
Library(const Library &) = delete;
Expand Down
1 change: 1 addition & 0 deletions cpp/Kun/Ops.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ struct ExpMovingAvg {
using simd_int_t =
kun_simd::vec<typename kun_simd::fp_trait<T>::int_t, stride>;
simd_t v;
ExpMovingAvg() : v{NAN} {}
ExpMovingAvg(const simd_t &init) : v{init} {}
static constexpr T weight_latest = T(2.0) / (window + 1);
simd_t step(simd_t cur, size_t index) {
Expand Down
2 changes: 2 additions & 0 deletions cpp/Kun/RunGraph.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "Context.hpp"
#include "Module.hpp"
#include "StateBuffer.hpp"
#include <unordered_map>
#include <string>
#include <vector>
Expand Down Expand Up @@ -34,6 +35,7 @@ struct AlignedPtr {

struct KUN_API StreamContext {
std::vector<AlignedPtr> buffers;
std::vector<StateBufferPtr> state_buffers;
Context ctx;
const Module *m;
StreamContext(std::shared_ptr<Executor> exec, const Module *m,
Expand Down
41 changes: 31 additions & 10 deletions cpp/Kun/Runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,6 @@
#define kunAlignedFree(x) free(x)
#endif

#ifdef __AVX__
#define MALLOC_ALIGNMENT 64 // AVX-512 alignment
#else
#define MALLOC_ALIGNMENT 16 // NEON alignment
#endif

#if CHECKED_PTR
#include <assert.h>
#include <sys/mman.h>
Expand Down Expand Up @@ -76,11 +70,10 @@ void checkedDealloc(void *ptr, size_t sz) {
#endif

namespace kun {
static const uint64_t VERSION = 0x64100003;

void Buffer::alloc(size_t count, size_t use_count, size_t elem_size) {
if (!ptr) {
ptr = (float *)kunAlignedAlloc(MALLOC_ALIGNMENT, count * elem_size);
ptr = (float *)kunAlignedAlloc(KUN_MALLOC_ALIGNMENT, count * elem_size);
refcount = (int)use_count;
#if CHECKED_PTR
size = count * elem_size;
Expand Down Expand Up @@ -230,7 +223,7 @@ void corrWith(std::shared_ptr<Executor> exec, MemoryLayout layout,
length,
8,
Datatype::Float,
false};
false, nullptr};
std::vector<RuntimeStage> &stages = ctx.stages;
stages.reserve(buffers.size());
for (size_t i = 0; i < buffers.size(); i++) {
Expand Down Expand Up @@ -284,7 +277,7 @@ void runGraph(std::shared_ptr<Executor> exec, const Module *m,
length,
m->blocking_len,
m->dtype,
false};
false, nullptr};
std::vector<RuntimeStage> &stages = ctx.stages;
stages.reserve(m->num_stages);
for (size_t i = 0; i < m->num_stages; i++) {
Expand Down Expand Up @@ -368,6 +361,12 @@ StreamContext::StreamContext(std::shared_ptr<Executor> exec, const Module *m,
throw std::runtime_error(
"Cannot run batch mode module via StreamContext");
}
if (m->init_state_buffers) {
state_buffers = m->init_state_buffers(num_stocks);
for (auto &buf : state_buffers) {
buf->initialize();
}
}
std::vector<Buffer> rtlbuffers;
rtlbuffers.reserve(m->num_buffers);
buffers.reserve(m->num_buffers);
Expand Down Expand Up @@ -404,6 +403,7 @@ StreamContext::StreamContext(std::shared_ptr<Executor> exec, const Module *m,
ctx.dtype = m->dtype;
ctx.is_stream = true;
ctx.simd_len = m->blocking_len;
ctx.state_buffers = state_buffers.data();
}

size_t StreamContext::queryBufferHandle(const char *name) const {
Expand Down Expand Up @@ -458,4 +458,25 @@ void StreamContext::run() {
}

StreamContext::~StreamContext() = default;


// Allocates one aligned block that holds the StateBuffer header followed by
// room for num_objs objects of elem_size bytes each, and fills in the header.
// The objects themselves are NOT constructed here: the buffer is returned with
// initialized == 0 and ctor_fn/dtor_fn are only stored for later use.
// The block must be released with kunAlignedFree (see StateBuffer::Deleter).
// Throws std::runtime_error when the allocation fails.
StateBuffer *StateBuffer::make(size_t num_objs, size_t elem_size,
                               CtorFn_t ctor_fn, DtorFn_t dtor_fn) {
    void *raw = kunAlignedAlloc(KUN_MALLOC_ALIGNMENT,
                                sizeof(StateBuffer) + num_objs * elem_size);
    if (!raw) {
        // without this check the header writes below would dereference null
        throw std::runtime_error("Failed to allocate memory for StateBuffer");
    }
    auto buf = static_cast<StateBuffer *>(raw);
    buf->num_objs = num_objs;
    // the header stores the element size in 32 bits; make the narrowing explicit
    buf->elem_size = static_cast<uint32_t>(elem_size);
    buf->initialized = 0;
    buf->ctor_fn = ctor_fn;
    buf->dtor_fn = dtor_fn;
    return buf;
}

// Deleter used by StateBufferPtr: runs the stored destructor callback when the
// buffer is marked as initialized, then frees the allocation itself.
void StateBuffer::Deleter::operator()(StateBuffer *buf) {
    const bool has_live_objects = buf->initialized != 0;
    if (has_live_objects) {
        buf->dtor_fn(buf);
    }
    kunAlignedFree(buf);
}
} // namespace kun
72 changes: 72 additions & 0 deletions cpp/Kun/StateBuffer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#pragma once

#include "Base.hpp"
#include "MathUtil.hpp"
#include <cstdint>
#include <memory>

namespace kun {

// Byte alignment passed to kunAlignedAlloc for runtime allocations and used
// in the alignas() specifiers of the StateBuffer header and payload.
// NOTE: this is a preprocessor macro, so it is visible globally even though
// it is defined inside namespace kun.
#ifdef __AVX__
#define KUN_MALLOC_ALIGNMENT 64 // AVX-512 alignment
#else
// fallback for every build without __AVX__
#define KUN_MALLOC_ALIGNMENT 16 // NEON alignment
#endif

// A type-erased array of per-stock state objects stored in a single aligned
// allocation: this header is immediately followed by the object storage
// (`buf`). Instances are created only through make() (the constructor is
// private) and are owned through StateBufferPtr, whose Deleter runs the stored
// destructor callback and frees the allocation.
struct StateBuffer {
    // callbacks that construct / destroy all objects stored in a buffer
    using DtorFn_t = void (*)(StateBuffer *obj);
    using CtorFn_t = void (*)(StateBuffer *obj);

    // number of objects stored in buf
    alignas(KUN_MALLOC_ALIGNMENT) size_t num_objs;
    // size in bytes of one stored object, as passed to make()
    uint32_t elem_size;
    // non-zero once the objects in buf have been constructed
    uint32_t initialized;
    CtorFn_t ctor_fn;
    DtorFn_t dtor_fn;
    // start of the object storage; zero-length trailing array (compiler
    // extension), the real size is decided by the allocation in make()
    alignas(KUN_MALLOC_ALIGNMENT) char buf[0];

    // Allocates a buffer for num_objs objects of elem_size bytes each. The
    // objects are not constructed until initialize() is called.
    KUN_API static StateBuffer *make(size_t num_objs, size_t elem_size,
                                     CtorFn_t ctor_fn, DtorFn_t dtor_fn);

    // for std::unique_ptr
    struct Deleter {
        KUN_API void operator()(StateBuffer *buf);
    };

    // Returns a reference to the idx-th object, viewing the storage as an
    // array of T. No bounds or type checking is done; the offset is computed
    // from sizeof(T), not from elem_size.
    // NOTE(review): T is presumably the type the buffer was created for, so
    // that sizeof(T) == elem_size - confirm against callers.
    template <typename T>
    T &get(size_t idx) {
        return *reinterpret_cast<T *>(buf + idx * sizeof(T));
    }

    // Constructs all objects through ctor_fn. The buffer is marked as
    // initialized only after ctor_fn has returned: if a constructor throws,
    // the flag stays 0, so neither destroy() nor the Deleter will run
    // destructors over storage that was never constructed.
    void initialize() {
        ctor_fn(this);
        initialized = 1;
    }
    // Destroys all objects through dtor_fn if they were constructed, and
    // marks the buffer as uninitialized. The allocation itself is kept.
    void destroy() {
        if (initialized) {
            dtor_fn(this);
        }
        initialized = 0;
    }

  private:
    // instances must be created via make()
    StateBuffer() = default;
};

using StateBufferPtr = std::unique_ptr<StateBuffer, StateBuffer::Deleter>;

// Creates a StateBuffer with room for one T per group of simd_len stocks
// (rounded up), and registers callbacks that value-initialize and destroy
// every T in the buffer. The callbacks are only stored here; they run when
// the buffer is initialized / destroyed.
template <typename T>
StateBufferPtr makeStateBuffer(size_t num_stocks, size_t simd_len) {
    const size_t group_count = divideAndCeil(num_stocks, simd_len);

    // placement-construct a T in every slot of the buffer
    StateBuffer::CtorFn_t construct_all = [](StateBuffer *self) {
        for (size_t slot = 0; slot != self->num_objs; ++slot) {
            new (&self->get<T>(slot)) T();
        }
    };
    // run the destructor of the T in every slot of the buffer
    StateBuffer::DtorFn_t destroy_all = [](StateBuffer *self) {
        for (size_t slot = 0; slot != self->num_objs; ++slot) {
            self->get<T>(slot).~T();
        }
    };

    StateBuffer *raw =
        StateBuffer::make(group_count, sizeof(T), construct_all, destroy_all);
    return StateBufferPtr(raw);
}
} // namespace kun
Loading