Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ ctest -R test_dataframe_writer --test-args --gtest_filter=SparkIntegrationTest.P
# Run Test Suite directly
# --------------------------------
./test_<suite_name>

# --------------------------------
# Run a single test case directly and show its output
# --------------------------------
./test_dataframe --gtest_filter=SparkIntegrationTest.DropDuplicates
```

### Mem Checks (Valgrind)
Expand Down
37 changes: 37 additions & 0 deletions src/dataframe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -689,4 +689,41 @@ DataFrameWriter DataFrame::write()
config.user_id = user_id_;

return DataFrameWriter(stub_, plan_.root(), config);
}

/**
 * @brief Removes duplicate rows across all columns (equivalent to distinct()).
 * @return A new DataFrame whose plan deduplicates this one on every column.
 */
DataFrame DataFrame::dropDuplicates()
{
    // An empty subset tells the overload to key on all columns.
    return dropDuplicates(std::vector<std::string>{});
}

/**
 * @brief Removes duplicate rows, keyed on the given subset of columns.
 *
 * Builds a new Spark Connect plan whose root is a Deduplicate relation
 * wrapping the current plan.
 *
 * @param subset Column names to use as deduplication keys; when empty,
 *               all columns are used (same as the no-argument overload).
 * @return A new DataFrame carrying the deduplicated plan.
 */
DataFrame DataFrame::dropDuplicates(const std::vector<std::string> &subset)
{
    spark::connect::Plan dedup_plan;
    auto *dedup = dedup_plan.mutable_root()->mutable_deduplicate();

    // Chain the current plan (if any) as the input of the Deduplicate node.
    if (plan_.has_root())
    {
        dedup->mutable_input()->CopyFrom(plan_.root());
    }

    if (subset.empty())
    {
        // No explicit keys: deduplicate on every column.
        dedup->set_all_columns_as_keys(true);
    }
    else
    {
        for (const auto &key_column : subset)
        {
            dedup->add_column_names(key_column);
        }
    }

    return DataFrame(stub_, dedup_plan, session_id_, user_id_);
}

/**
 * @brief snake_case alias for dropDuplicates() (mirrors the PySpark API).
 */
DataFrame DataFrame::drop_duplicates()
{
    // Forward directly to the subset overload with no key columns,
    // which is exactly what dropDuplicates() does.
    return dropDuplicates(std::vector<std::string>{});
}

/**
 * @brief snake_case alias for dropDuplicates(subset) (mirrors the PySpark API).
 * @param subset Column names to use as deduplication keys.
 */
DataFrame DataFrame::drop_duplicates(const std::vector<std::string> &subset)
{
    return this->dropDuplicates(subset);
}
21 changes: 21 additions & 0 deletions src/dataframe.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,27 @@ class DataFrame
*/
DataFrameWriter write();

/**
 * @brief Returns a new DataFrame with duplicate rows removed,
 *        considering all columns as keys (equivalent to distinct()).
 * @return A new DataFrame whose plan deduplicates this one.
 */

DataFrame dropDuplicates();
/**
 * @brief Returns a new DataFrame with duplicate rows removed,
 *        considering only the given subset of columns as keys.
 * @param subset Column names used as deduplication keys; an empty
 *               subset deduplicates on all columns.
 * @return A new DataFrame whose plan deduplicates this one.
 */
DataFrame dropDuplicates(const std::vector<std::string>& subset);

/**
 * @brief snake_case alias for dropDuplicates() (mirrors the PySpark API).
 */
DataFrame drop_duplicates();

/**
 * @brief snake_case alias for dropDuplicates(subset) (mirrors the PySpark API).
 */
DataFrame drop_duplicates(const std::vector<std::string>& subset);

private:
std::shared_ptr<spark::connect::SparkConnectService::Stub> stub_;
spark::connect::Plan plan_;
Expand Down
26 changes: 25 additions & 1 deletion tests/dataframe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,4 +274,28 @@ TEST_F(SparkIntegrationTest, WhereFilter)

auto filtered_df = df.where("age < 25");
filtered_df.show();
}
}

TEST_F(SparkIntegrationTest, DropDuplicates)
{
    // Build a small in-memory table containing duplicate rows via a
    // VALUES clause (raw string literal keeps the SQL readable).
    auto source_df = spark->sql(R"(
        SELECT *
        FROM VALUES
        (14, 'Tom'),
        (14, 'Tom'),
        (14, 'Alice'),
        (14, 'Alice'),
        (14, 'Bob'),
        (14, 'Bob'),
        (15, 'Tom'),
        (15, 'John')
        AS people(age, name)
    )");

    // Deduplicate across all columns, then only on the "age" column.
    auto fully_deduped = source_df.dropDuplicates();
    fully_deduped.show();

    auto age_deduped = source_df.dropDuplicates({"age"});
    age_deduped.show();
}