Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ struct ReservoirQuantileState {
void FillReservoir(idx_t sample_size, T element) {
if (pos < sample_size) {
v[pos++] = element;
r_samp->InitializeReservoirWeights(pos, len);
r_samp->InitializeReservoir(pos, len);
} else {
D_ASSERT(r_samp->next_index_to_sample >= r_samp->num_entries_to_skip_b4_next_sample);
if (r_samp->next_index_to_sample == r_samp->num_entries_to_skip_b4_next_sample) {
Expand Down
4 changes: 0 additions & 4 deletions src/catalog/catalog_entry/duck_table_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,6 @@ unique_ptr<BaseStatistics> DuckTableEntry::GetStatistics(ClientContext &context,
return storage->GetStatistics(context, column.StorageOid());
}

// Return this table's sample by delegating to the underlying storage
// (overrides the base TableCatalogEntry::GetSample, which returns nullptr).
unique_ptr<BlockingSample> DuckTableEntry::GetSample() {
	return storage->GetSample();
}

unique_ptr<CatalogEntry> DuckTableEntry::AlterEntry(CatalogTransaction transaction, AlterInfo &info) {
if (transaction.HasContext()) {
return AlterEntry(transaction.GetContext(), info);
Expand Down
4 changes: 0 additions & 4 deletions src/catalog/catalog_entry/table_catalog_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ LogicalIndex TableCatalogEntry::GetColumnIndex(string &column_name, bool if_exis
return entry;
}

// Base implementation: a generic catalog table exposes no sample.
// Derived entries (e.g. DuckTableEntry) override this to return a storage-backed sample.
unique_ptr<BlockingSample> TableCatalogEntry::GetSample() {
	return nullptr;
}

bool TableCatalogEntry::ColumnExists(const string &name) const {
return columns.ColumnExists(name);
}
Expand Down
18 changes: 0 additions & 18 deletions src/common/enum_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3249,24 +3249,6 @@ SampleType EnumUtil::FromString<SampleType>(const char *value) {
return static_cast<SampleType>(StringUtil::StringToEnum(GetSampleTypeValues(), 3, "SampleType", value));
}

// Lookup table mapping SamplingState enum values to their string names,
// used by the ToChars/FromString specializations below.
const StringUtil::EnumStringLiteral *GetSamplingStateValues() {
	static constexpr StringUtil::EnumStringLiteral values[] {
		{ static_cast<uint32_t>(SamplingState::RANDOM), "RANDOM" },
		{ static_cast<uint32_t>(SamplingState::RESERVOIR), "RESERVOIR" }
	};
	return values;
}

// Format a SamplingState as its string name.
// The literal 2 is the entry count of GetSamplingStateValues().
template<>
const char* EnumUtil::ToChars<SamplingState>(SamplingState value) {
	return StringUtil::EnumToString(GetSamplingStateValues(), 2, "SamplingState", static_cast<uint32_t>(value));
}

// Parse a SamplingState from its string name.
// The literal 2 is the entry count of GetSamplingStateValues().
template<>
SamplingState EnumUtil::FromString<SamplingState>(const char *value) {
	return static_cast<SamplingState>(StringUtil::StringToEnum(GetSamplingStateValues(), 2, "SamplingState", value));
}

const StringUtil::EnumStringLiteral *GetScanTypeValues() {
static constexpr StringUtil::EnumStringLiteral values[] {
{ static_cast<uint32_t>(ScanType::TABLE), "TABLE" },
Expand Down
4 changes: 2 additions & 2 deletions src/execution/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ add_subdirectory(nested_loop_join)
add_subdirectory(operator)
add_subdirectory(physical_plan)
add_subdirectory(index)
add_subdirectory(sample)

add_library_unity(
duckdb_execution
Expand All @@ -18,7 +17,8 @@ add_library_unity(
perfect_aggregate_hashtable.cpp
physical_operator.cpp
physical_plan_generator.cpp
radix_partitioned_hashtable.cpp)
radix_partitioned_hashtable.cpp
reservoir_sample.cpp)
set(ALL_OBJECT_FILES
${ALL_OBJECT_FILES} $<TARGET_OBJECTS:duckdb_execution>
PARENT_SCOPE)
324 changes: 324 additions & 0 deletions src/execution/reservoir_sample.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,324 @@
#include "duckdb/execution/reservoir_sample.hpp"
#include "duckdb/common/types/data_chunk.hpp"
#include "duckdb/common/pair.hpp"

namespace duckdb {

// Write the sampled rows to "serializer" by forwarding to DataChunk::Serialize.
void ReservoirChunk::Serialize(Serializer &serializer) const {
	chunk.Serialize(serializer);
}

// Reconstruct a ReservoirChunk previously written by Serialize.
unique_ptr<ReservoirChunk> ReservoirChunk::Deserialize(Deserializer &deserializer) {
	auto reservoir_chunk = make_uniq<ReservoirChunk>();
	reservoir_chunk->chunk.Deserialize(deserializer);
	return reservoir_chunk;
}

// Construct a fixed-size reservoir that keeps at most "sample_count" rows.
// "seed" is forwarded to BlockingSample for the random engine; the reservoir
// chunk itself is allocated lazily on the first AddToReservoir call.
ReservoirSample::ReservoirSample(Allocator &allocator, idx_t sample_count, int64_t seed)
    : BlockingSample(seed), allocator(allocator), sample_count(sample_count), reservoir_initialized(false) {
}

// Convenience overload: same as above but uses the process-wide default allocator.
ReservoirSample::ReservoirSample(idx_t sample_count, int64_t seed)
    : ReservoirSample(Allocator::DefaultAllocator(), sample_count, seed) {
}

// Feed one input chunk into the reservoir (Algorithm A-Res style reservoir
// sampling). Rows are copied verbatim until the reservoir holds sample_count
// rows; after that, a row is only admitted when the precomputed skip distance
// says it was selected, in which case it replaces the minimum-weight entry.
void ReservoirSample::AddToReservoir(DataChunk &input) {
	if (sample_count == 0) {
		// sample count is 0, means no samples were requested
		return;
	}
	// every incoming row counts as "seen", whether it ends up sampled or not
	old_base_reservoir_sample.num_entries_seen_total += input.size();
	// Input: A population V of n weighted items
	// Output: A reservoir R with a size m
	// 1: The first m items of V are inserted into R
	// first we need to check if the reservoir already has "m" elements
	if (!reservoir_data_chunk || reservoir_data_chunk->size() < sample_count) {
		if (FillReservoir(input) == 0) {
			// entire chunk was consumed by reservoir
			return;
		}
		// otherwise FillReservoir sliced "input" down to the leftover rows,
		// which fall through to the weighted-sampling phase below
	}
	D_ASSERT(reservoir_data_chunk);
	D_ASSERT(reservoir_data_chunk->size() == sample_count);
	// Initialize the weights if they have not been already
	if (old_base_reservoir_sample.reservoir_weights.empty()) {
		old_base_reservoir_sample.InitializeReservoir(reservoir_data_chunk->size(), sample_count);
	}
	// find the position of next_index_to_sample relative to number of seen entries (num_entries_to_skip_b4_next_sample)
	idx_t remaining = input.size();
	idx_t base_offset = 0;
	while (true) {
		idx_t offset = old_base_reservoir_sample.next_index_to_sample -
		               old_base_reservoir_sample.num_entries_to_skip_b4_next_sample;
		if (offset >= remaining) {
			// not in this chunk! increment current count and go to the next chunk
			old_base_reservoir_sample.num_entries_to_skip_b4_next_sample += remaining;
			return;
		}
		// in this chunk! replace the element
		// (ReplaceElement also draws the next skip distance via SetNextEntry)
		ReplaceElement(input, base_offset + offset);
		// shift the chunk forward
		remaining -= offset;
		base_offset += offset;
	}
}

// Hand out up to STANDARD_VECTOR_SIZE sampled rows, or nullptr once the
// reservoir is exhausted. Rows are taken from the BACK of the reservoir so a
// single selection vector suffices and the remainder shrinks in place.
unique_ptr<DataChunk> ReservoirSample::GetChunk() {
	if (!reservoir_data_chunk || reservoir_data_chunk->size() == 0) {
		// nothing (left) to return
		return nullptr;
	}
	const auto total_samples = reservoir_data_chunk->size();
	if (total_samples <= STANDARD_VECTOR_SIZE) {
		// everything fits in one chunk - transfer ownership of the reservoir itself
		return std::move(reservoir_data_chunk);
	}
	// more rows than fit in a single chunk: slice off the trailing
	// STANDARD_VECTOR_SIZE rows and keep the rest in the reservoir
	const auto leftover = total_samples - STANDARD_VECTOR_SIZE;
	SelectionVector tail_sel(STANDARD_VECTOR_SIZE);
	for (idx_t sel_idx = 0; sel_idx < STANDARD_VECTOR_SIZE; sel_idx++) {
		tail_sel.set_index(sel_idx, leftover + sel_idx);
	}
	auto result = make_uniq<DataChunk>();
	result->Initialize(allocator, reservoir_data_chunk->GetTypes());
	result->Slice(*reservoir_data_chunk, tail_sel, STANDARD_VECTOR_SIZE);
	result->SetCardinality(STANDARD_VECTOR_SIZE);
	// the returned rows are no longer part of the reservoir
	reservoir_data_chunk->SetCardinality(leftover);
	return result;
}

// A-Res step 8: the reservoir row holding the minimum key is overwritten,
// column by column, with row "index_in_chunk" of "input". The weight
// bookkeeping is then updated (with_weight >= 0 forces a specific key,
// used when merging samples).
void ReservoirSample::ReplaceElement(DataChunk &input, idx_t index_in_chunk, double with_weight) {
	D_ASSERT(input.ColumnCount() == reservoir_data_chunk->ColumnCount());
	// the victim row stays fixed for the whole copy, so read it once
	const auto victim_row = old_base_reservoir_sample.min_weighted_entry_index;
	for (idx_t col = 0; col < input.ColumnCount(); col++) {
		reservoir_data_chunk->SetValue(col, victim_row, input.GetValue(col, index_in_chunk));
	}
	// pop the minimum key, insert the replacement's key, draw the next skip distance
	old_base_reservoir_sample.ReplaceElement(with_weight);
}

// Lazily allocate the reservoir chunk using the column layout of the first
// input chunk, sized for the full sample_count.
void ReservoirSample::InitializeReservoir(DataChunk &input) {
	reservoir_data_chunk = make_uniq<DataChunk>();
	reservoir_data_chunk->Initialize(allocator, input.GetTypes(), sample_count);
	// pre-size every column's validity mask for sample_count entries
	const auto column_count = reservoir_data_chunk->ColumnCount();
	for (idx_t col = 0; col < column_count; col++) {
		FlatVector::Validity(reservoir_data_chunk->data[col]).Initialize(sample_count);
	}
	reservoir_initialized = true;
}

// Copy as many rows of "input" as still fit into the reservoir (up to
// sample_count rows total). Returns 0 when the entire chunk was absorbed;
// otherwise slices "input" in place down to the leftover rows and returns
// their count so the caller can run weighted sampling on them.
// NOTE: mutates "input" (Flatten, SetCardinality, Slice) - callers rely on this.
idx_t ReservoirSample::FillReservoir(DataChunk &input) {
	idx_t chunk_count = input.size();
	// flatten so rows can be appended/addressed positionally
	input.Flatten();
	auto num_added_samples = reservoir_data_chunk ? reservoir_data_chunk->size() : 0;
	D_ASSERT(num_added_samples <= sample_count);

	// required count is what we still need to add to the reservoir
	idx_t required_count;
	if (num_added_samples + chunk_count >= sample_count) {
		// have to limit the count of the chunk
		required_count = sample_count - num_added_samples;
	} else {
		// we copy the entire chunk
		required_count = chunk_count;
	}
	// temporarily shrink the chunk to the rows that go straight into the reservoir
	input.SetCardinality(required_count);

	// initialize the reservoir
	if (!reservoir_initialized) {
		InitializeReservoir(input);
	}
	reservoir_data_chunk->Append(input, false, nullptr, required_count);
	// sets up the weight queue; a no-op unless required_count == sample_count
	// (see BaseReservoirSampling::InitializeReservoir)
	old_base_reservoir_sample.InitializeReservoir(required_count, sample_count);

	// check if there are still elements remaining in the Input data chunk that should be
	// randomly sampled and potentially added. This happens if we are on a boundary
	// for example, input.size() is 1024, but our sample size is 10
	if (required_count == chunk_count) {
		// we are done here
		return 0;
	}
	// we still need to process a part of the chunk
	// create a selection vector of the remaining elements
	SelectionVector sel(STANDARD_VECTOR_SIZE);
	for (idx_t i = required_count; i < chunk_count; i++) {
		sel.set_index(i - required_count, i);
	}
	// slice the input vector and continue
	input.Slice(sel, chunk_count - required_count);
	return input.size();
}

// A fixed-size reservoir sample needs no finalization step; this exists only
// to satisfy the BlockingSample interface.
void ReservoirSample::Finalize() {
}

// Construct a percentage-based sample: rows are collected in batches of
// RESERVOIR_THRESHOLD, and each finished batch keeps a fixed-size reservoir of
// sample_percentage * RESERVOIR_THRESHOLD rows.
// "percentage" is given in [0, 100] and normalized to a fraction here.
ReservoirSamplePercentage::ReservoirSamplePercentage(Allocator &allocator, double percentage, int64_t seed)
    : BlockingSample(seed), allocator(allocator), sample_percentage(percentage / 100.0), current_count(0),
      is_finalized(false) {
	// per-batch reservoir size; truncation means very small percentages can yield 0
	reservoir_sample_size = idx_t(sample_percentage * RESERVOIR_THRESHOLD);
	current_sample = make_uniq<ReservoirSample>(allocator, reservoir_sample_size, random.NextRandomInteger());
}

// Convenience overload: same as above but uses the process-wide default allocator.
ReservoirSamplePercentage::ReservoirSamplePercentage(double percentage, int64_t seed)
    : ReservoirSamplePercentage(Allocator::DefaultAllocator(), percentage, seed) {
}

// Route one input chunk into the current per-batch reservoir. When the chunk
// straddles a RESERVOIR_THRESHOLD boundary it is split: the first part tops
// off the current sample (which is then moved to finished_samples) and the
// remainder seeds a freshly allocated sample.
void ReservoirSamplePercentage::AddToReservoir(DataChunk &input) {
	old_base_reservoir_sample.num_entries_seen_total += input.size();
	if (current_count + input.size() > RESERVOIR_THRESHOLD) {
		// we don't have enough space in our current reservoir
		// first check what we still need to append to the current sample
		idx_t append_to_current_sample_count = RESERVOIR_THRESHOLD - current_count;
		idx_t append_to_next_sample = input.size() - append_to_current_sample_count;
		if (append_to_current_sample_count > 0) {
			// we have elements remaining, first add them to the current sample
			if (append_to_next_sample > 0) {
				// we need to also add to the next sample
				// copy-slice into a scratch chunk so "input" itself stays intact
				// for the second half below
				DataChunk new_chunk;
				new_chunk.InitializeEmpty(input.GetTypes());
				new_chunk.Slice(input, *FlatVector::IncrementalSelectionVector(), append_to_current_sample_count);
				new_chunk.Flatten();
				current_sample->AddToReservoir(new_chunk);
			} else {
				// the whole remainder of the batch comes from this chunk;
				// shrink it in place and hand it over
				input.Flatten();
				input.SetCardinality(append_to_current_sample_count);
				current_sample->AddToReservoir(input);
			}
		}
		if (append_to_next_sample > 0) {
			// slice the input for the remainder
			SelectionVector sel(append_to_next_sample);
			for (idx_t i = append_to_current_sample_count; i < append_to_next_sample + append_to_current_sample_count;
			     i++) {
				sel.set_index(i - append_to_current_sample_count, i);
			}
			input.Slice(sel, append_to_next_sample);
		}
		// now our first sample is filled: append it to the set of finished samples
		finished_samples.push_back(std::move(current_sample));

		// allocate a new sample, and potentially add the remainder of the current input to that sample
		current_sample = make_uniq<ReservoirSample>(allocator, reservoir_sample_size, random.NextRandomInteger());
		if (append_to_next_sample > 0) {
			current_sample->AddToReservoir(input);
		}
		current_count = append_to_next_sample;
	} else {
		// we can just append to the current sample
		current_count += input.size();
		current_sample->AddToReservoir(input);
	}
}

// Return the next chunk of sampled rows, draining finished_samples
// front-to-back; nullptr once everything has been handed out.
unique_ptr<DataChunk> ReservoirSamplePercentage::GetChunk() {
	// make sure the still-filling current sample has been folded into finished_samples
	if (!is_finalized) {
		Finalize();
	}
	while (!finished_samples.empty()) {
		auto result = finished_samples.front()->GetChunk();
		if (result && result->size() > 0) {
			return result;
		}
		// this sample is exhausted - drop it and try the next one
		finished_samples.erase(finished_samples.begin());
	}
	return nullptr;
}

// Fold the partially filled current sample into finished_samples so GetChunk
// can start returning rows.
void ReservoirSamplePercentage::Finalize() {
	// need to finalize the current sample, if any
	// we are finalizing, so we are starting to return chunks. Our last chunk has
	// sample_percentage * RESERVOIR_THRESHOLD entries that hold samples.
	// if our current count is less than the sample_percentage * RESERVOIR_THRESHOLD
	// then we have sampled too much for the current_sample and we need to redo the sample
	// otherwise we can just push the current sample back
	// Imagine sampling 70% of 100 rows (so 70 rows). We allocate sample_percentage * RESERVOIR_THRESHOLD
	// -----------------------------------------
	// note: "finished_samples.empty()" forces the resample path for the very first
	// (and only) batch, so a partial batch is downsampled to the right fraction
	auto sampled_more_than_required =
	    static_cast<double>(current_count) > sample_percentage * RESERVOIR_THRESHOLD || finished_samples.empty();
	if (current_count > 0 && sampled_more_than_required) {
		// create a new sample sized to the fraction of the rows actually seen in this batch
		auto new_sample_size = idx_t(round(sample_percentage * static_cast<double>(current_count)));
		auto new_sample = make_uniq<ReservoirSample>(allocator, new_sample_size, random.NextRandomInteger());
		// drain the over-full current sample through the smaller reservoir
		while (true) {
			auto chunk = current_sample->GetChunk();
			if (!chunk || chunk->size() == 0) {
				break;
			}
			new_sample->AddToReservoir(*chunk);
		}
		finished_samples.push_back(std::move(new_sample));
	} else {
		finished_samples.push_back(std::move(current_sample));
	}
	// when finalizing, current_sample is null. All samples are now in finished samples.
	current_sample = nullptr;
	is_finalized = true;
}

// Initialize the A-Res bookkeeping state; "seed" seeds the random engine.
// All counters start at zero: no rows seen, no skip distance drawn yet.
BaseReservoirSampling::BaseReservoirSampling(int64_t seed) : random(seed) {
	next_index_to_sample = 0;
	min_weight_threshold = 0;
	min_weighted_entry_index = 0;
	num_entries_to_skip_b4_next_sample = 0;
	num_entries_seen_total = 0;
}

// Default constructor delegates with seed -1
// (presumably RandomEngine treats -1 as "pick a random seed" - confirm against RandomEngine).
BaseReservoirSampling::BaseReservoirSampling() : BaseReservoirSampling(-1) {
}

// Create the weight queue once the reservoir reaches capacity.
// Until cur_size == sample_size this is a no-op: the first m rows enter the
// reservoir unconditionally and need no keys yet.
void BaseReservoirSampling::InitializeReservoir(idx_t cur_size, idx_t sample_size) {
	if (cur_size != sample_size) {
		return;
	}
	//! 2. For each item vi in R: Calculate a key ki = random(0, 1)
	//! The threshold to enter the reservoir T_w is the minimum key of R.
	//! Keys are stored negated so the priority queue (a max-heap) exposes the
	//! minimum key at the top in O(1).
	for (idx_t entry_idx = 0; entry_idx < sample_size; entry_idx++) {
		auto key = random.NextRandom();
		reservoir_weights.emplace(-key, entry_idx);
	}
	// draw the skip distance to the first replacement candidate
	SetNextEntry();
}

// Draw the skip distance to the next sampled row (the exponential-jumps step
// of A-Res) and cache the current minimum key / its reservoir slot.
// Precondition: reservoir_weights is non-empty.
void BaseReservoirSampling::SetNextEntry() {
	//! 4. Let r = random(0, 1) and Xw = log(r) / log(T_w)
	auto &min_key = reservoir_weights.top();
	// keys are stored negated (max-heap yields the minimum key), so negate back
	double t_w = -min_key.first;
	double r = random.NextRandom();
	double x_w = log(r) / log(t_w);
	//! 5. From the current item vc skip items until item vi , such that:
	//! 6. wc +wc+1 +···+wi−1 < Xw <= wc +wc+1 +···+wi−1 +wi
	//! since all our weights are 1 (uniform sampling), we can just determine the amount of elements to skip
	min_weight_threshold = t_w;
	min_weighted_entry_index = min_key.second;
	// always skip at least 1 so the same row is never re-selected immediately
	next_index_to_sample = MaxValue<idx_t>(1, idx_t(round(x_w)));
	num_entries_to_skip_b4_next_sample = 0;
}

// Replace the minimum-key entry's weight after its row has been overwritten,
// then redraw the next skip distance.
// with_weight >= 0 forces a specific key (used when merging two reservoirs);
// otherwise a fresh key is drawn from (min_weight_threshold, 1).
void BaseReservoirSampling::ReplaceElement(double with_weight) {
	//! replace the entry in the reservoir
	//! pop the minimum entry
	reservoir_weights.pop();
	//! now update the reservoir
	//! 8. Let tw = Tw i , r2 = random(tw,1) and vi's key: ki = (r2)1/wi
	//! 9. The new threshold Tw is the new minimum key of R
	//! we generate a random number between (min_weight_threshold, 1)
	double r2 = random.NextRandom(min_weight_threshold, 1);

	//! if we are merging two reservoir samples use the weight passed
	if (with_weight >= 0) {
		r2 = with_weight;
	}
	//! now we insert the new weight into the reservoir
	//! (negated, keeping the min-key-on-top invariant of the max-heap)
	reservoir_weights.emplace(-r2, min_weighted_entry_index);
	//! we update the min entry with the new min entry in the reservoir
	SetNextEntry();
}

} // namespace duckdb
Loading
Loading