diff --git a/CMakeLists.txt b/CMakeLists.txt
new file mode 100644
index 0000000..735683a
--- /dev/null
+++ b/CMakeLists.txt
@@ -0,0 +1,19 @@
+cmake_minimum_required(VERSION 3.12)
+project(mapreduce)
+set(CMAKE_CXX_STANDARD 14)
+set(Boost_USE_STATIC_LIBS ON) # only find static libs
+set(Boost_USE_MULTITHREADED ON)
+set(Boost_USE_STATIC_RUNTIME OFF)
+find_package(Boost 1.36 REQUIRED system filesystem iostreams)
+
+MESSAGE(STATUS "find boost ${Boost_INCLUDE_DIRS} ${Boost_LIBRARIES}")
+
+include_directories(${Boost_INCLUDE_DIRS})
+include_directories(include)
+include_directories(include/detail)
+include_directories(include/detail/intermediates)
+include_directories(include/detail/schedule_policy)
+
+add_subdirectory(examples/wordcount)
+add_subdirectory(examples/prime)
+add_subdirectory(examples/friends)
diff --git a/README.md b/README.md
index 2bfa1f5..9096689 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,8 @@
 MapReduce C++ Library
 =
+
+This is a fork of cdmh/mapreduce that aims to improve the portability of the original code. For any questions, please contact the original developer.
+
 The MapReduce C++ Library implements a single-machine platform for programming using the the Google MapReduce idiom. Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key. Many real world tasks are expressible in this model, as shown in the Google paper.
 
 map (k1,v1) --> list(k2,v2)
diff --git a/examples/friends/CMakeLists.txt b/examples/friends/CMakeLists.txt
new file mode 100644
index 0000000..3ffc202
--- /dev/null
+++ b/examples/friends/CMakeLists.txt
@@ -0,0 +1,3 @@
+project(friends)
+add_executable(friends friends.cpp)
+target_link_libraries(friends ${Boost_LIBRARIES} pthread)
diff --git a/examples/prime/CMakeLists.txt b/examples/prime/CMakeLists.txt
new file mode 100644
index 0000000..551712d
--- /dev/null
+++ b/examples/prime/CMakeLists.txt
@@ -0,0 +1,3 @@
+project(prime)
+add_executable(prime prime.cpp)
+target_link_libraries(prime ${Boost_LIBRARIES} pthread)
diff --git a/examples/wordcount/CMakeLists.txt b/examples/wordcount/CMakeLists.txt
new file mode 100644
index 0000000..c0a05a9
--- /dev/null
+++ b/examples/wordcount/CMakeLists.txt
@@ -0,0 +1,3 @@
+project(wordcount)
+add_executable(wordcount wordcount.cpp)
+target_link_libraries(wordcount ${Boost_LIBRARIES} pthread)
diff --git a/examples/wordcount/wordcount.cpp b/examples/wordcount/wordcount.cpp
index f0e3cee..3542f3e 100644
--- a/examples/wordcount/wordcount.cpp
+++ b/examples/wordcount/wordcount.cpp
@@ -20,6 +20,13 @@
 #include
 #endif
 
+
+#if defined(__GNUC__) || defined(__GNUG__)
+#define STRNICMP strncasecmp
+#elif defined(_MSC_VER)
+#define STRNICMP strnicmp
+#endif
+
 #include "wordcount.h"
 
 #include
@@ -52,20 +59,8 @@ bool std::less >::operator()(
     std::pair const &first,
     std::pair const &second) const
 {
-    return (strnicmp(first.first, second.first, std::min(first.second, second.second)) < 0)
-        || ((first.second < second.second) && (strnicmp(first.first, second.first, std::min(first.second, second.second)) <= 0));
-}
-
-template<>
-constexpr
-bool std::less::operator()(
-    std::string const &first,
-    std::string const &second) const
-{
-    return
-        std::less>()(
-            std::pair(first.c_str(), first.length()),
-            std::pair(second.c_str(), second.length()));
+    return (STRNICMP(first.first, second.first, std::min(first.second, second.second)) < 0)
+        || ((first.second < second.second) && (STRNICMP(first.first, second.first, std::min(first.second, second.second)) <= 0));
 }
 
 namespace {
@@ -81,7 +76,7 @@ double const sum(T const &durations)
 
 void write_stats(mapreduce::results const &result)
 {
-    if (result.map_times.size() == 0 || result.reduce_times.size() == 0)
+    if (result.map_times.empty() || result.reduce_times.empty())
         return;
 
     std::cout << std::endl << "\nMapReduce statistics:";
@@ -103,7 +98,7 @@ void write_stats(mapreduce::results const &result)
     std::cout << "\n Reduce key processing errors : " << result.counters.reduce_key_errors;
     std::cout << "\n Number of Reduce Tasks run (in parallel): " << result.counters.actual_reduce_tasks;
     std::cout << "\n Number of Result Files : " << result.counters.num_result_files;
-    if (result.reduce_times.size() > 0)
+    if (!result.reduce_times.empty())
     {
         std::cout << "\n Fastest Reduce key processed in : " << std::min_element(result.reduce_times.cbegin(), result.reduce_times.cend())->count() << "s";
         std::cout << "\n Slowest Reduce key processed in : " << std::max_element(result.reduce_times.cbegin(), result.reduce_times.cend())->count() << "s";
@@ -174,7 +169,7 @@ void run_wordcount(mapreduce::specification const &spec)
     job.run >(result);
 #else
     std::cout << "\nRunning Parallel WordCount MapReduce...";
-    job.run >(result);
+    job.template run >(result);
 #endif
 
     std::cout << "\nMapReduce Finished.";
diff --git a/examples/wordcount/wordcount.h b/examples/wordcount/wordcount.h
index 739f047..4c37ce6 100644
--- a/examples/wordcount/wordcount.h
+++ b/examples/wordcount/wordcount.h
@@ -46,7 +46,7 @@ template
 struct reduce_task : public mapreduce::reduce_task
 {
     template
-    void operator()(Runtime &runtime, key_type const &key, It it, It const ite) const
+    void operator()(Runtime &runtime, KeyType const &key, It it, It const ite) const
     {
         runtime.emit(key, std::accumulate(it, ite, 0));
     }
diff --git a/include/detail/datasource.hpp b/include/detail/datasource.hpp
index 042c2c3..7b4b43d 100644
--- a/include/detail/datasource.hpp
+++ b/include/detail/datasource.hpp
@@ -4,6 +4,7 @@
 #pragma once
 
 #include
+#include
 
 namespace mapreduce {
 
@@ -15,7 +16,7 @@ template
 class file_handler : ::mapreduce::detail::noncopyable
 {
   public:
-    file_handler(mapreduce::specification const &spec);
+    explicit file_handler(mapreduce::specification const &spec);
 
     bool const get_data(Key const &key, Value &value) const;
     bool const setup_key(Key &/*key*/) const { return false; }
@@ -176,7 +177,7 @@ template<
 class directory_iterator : mapreduce::detail::noncopyable
 {
   public:
-    directory_iterator(mapreduce::specification const &spec)
+    explicit directory_iterator(mapreduce::specification const &spec)
       : specification_(spec),
         file_handler_(spec)
     {
diff --git a/include/detail/intermediates/in_memory.hpp b/include/detail/intermediates/in_memory.hpp
index d222c51..f6f267c 100644
--- a/include/detail/intermediates/in_memory.hpp
+++ b/include/detail/intermediates/in_memory.hpp
@@ -93,7 +93,7 @@ class in_memory : detail::noncopyable
 
     const_result_iterator &operator=(const_result_iterator const &other);
 
-    void increment(void)
+    void increment()
     {
         ++current_.second;
         if (current_.second == iterators_[current_.first]->second.end())
@@ -114,7 +114,7 @@ class in_memory : detail::noncopyable
         return value_ == other.value_;
     }
 
-    const_result_iterator &begin(void)
+    const_result_iterator &begin()
     {
         for (size_t loop=0; loop<outer_->num_partitions_; ++loop)
             iterators_[loop] = outer_->intermediates_[loop].cbegin();
@@ -122,7 +122,7 @@ class in_memory : detail::noncopyable
         return *this;
     }
 
-    const_result_iterator &end(void)
+    const_result_iterator &end()
     {
         current_.first = std::numeric_limits::max();
         value_ = keyvalue_t();
@@ -130,12 +130,12 @@ class in_memory : detail::noncopyable
         return *this;
     }
 
-    keyvalue_t const &dereference(void) const
+    keyvalue_t const &dereference() const
     {
         return value_;
     }
 
-    void set_current(void)
+    void set_current()
     {
         for (current_.first=0;
             current_.first<outer_->num_partitions_ && iterators_[current_.first] == outer_->intermediates_[current_.first].end();
@@ -190,12 +190,12 @@ class in_memory : detail::noncopyable
         intermediates_.resize(num_partitions_);
     }
 
-    const_result_iterator begin_results(void) const
+    const_result_iterator begin_results() const
     {
         return const_result_iterator(this).begin();
     }
 
-    const_result_iterator end_results(void) const
+    const_result_iterator end_results() const
     {
         return const_result_iterator(this).end();
     }
@@ -263,7 +263,6 @@ class in_memory : detail::noncopyable
     }
 
     // receive final result
-    template
     bool const insert(typename reduce_task_type::key_type const &key,
                       typename reduce_task_type::value_type const &value,
                       StoreResult &store_result)
@@ -272,7 +271,7 @@ class in_memory : detail::noncopyable
     }
 
     // receive intermediate result
-    bool const insert(typename key_type const &key,
+    bool const insert(key_type const &key,
                       typename reduce_task_type::value_type const &value)
     {
         size_t const partition = (num_partitions_ == 1)? 0 : partitioner_(key, num_partitions_);
diff --git a/include/detail/intermediates/local_disk.hpp b/include/detail/intermediates/local_disk.hpp
index 641d556..642a624 100644
--- a/include/detail/intermediates/local_disk.hpp
+++ b/include/detail/intermediates/local_disk.hpp
@@ -4,6 +4,7 @@
 #pragma once
 
 #include // setw
+#include "../job.hpp"
 #ifdef __GNUC__
 #include // ubuntu linux
 #include // ubuntu linux
@@ -65,7 +66,7 @@ struct file_merger
         using std::vector::const_reference;
     };
 
-    void open_files(void)
+    void open_files()
     {
         // open each file and read the first record (line) from each
         while (files.size() > 0)
@@ -247,7 +248,7 @@ class local_disk : detail::noncopyable
         kvlist_.resize(outer_->num_partitions_);
     }
 
-    void increment(void)
+    void increment()
     {
         if (!kvlist_[index_].first->eof())
            read_record(*kvlist_[index_].first, kvlist_[index_].second.first, kvlist_[index_].second.second);
@@ -262,7 +263,7 @@ class local_disk : detail::noncopyable
             && kvlist_[index_].second == other.kvlist_[index_].second);
     }
 
-    const_result_iterator &begin(void)
+    const_result_iterator &begin()
     {
         for (size_t loop=0; loop<outer_->num_partitions_; ++loop)
         {
@@ -287,19 +288,19 @@ class local_disk : detail::noncopyable
         return *this;
     }
 
-    const_result_iterator &end(void)
+    const_result_iterator &end()
     {
         index_ = 0;
         kvlist_.clear();
         return *this;
     }
 
-    keyvalue_t const &dereference(void) const
+    keyvalue_t const &dereference() const
     {
         return kvlist_[index_].second;
     }
 
-    void set_current(void)
+    void set_current()
     {
         index_ = 0;
         while (index_<outer_->num_partitions_ && kvlist_[index_].first->eof())
@@ -333,11 +334,9 @@ class local_disk : detail::noncopyable
   private:
     struct intermediate_file_info
     {
-        intermediate_file_info()
-        {
-        }
+        intermediate_file_info() = default;
 
-        intermediate_file_info(std::string fname)
+        explicit intermediate_file_info(std::string const& fname)
           : filename(fname)
         {
         }
@@ -361,7 +360,7 @@ class local_disk : detail::noncopyable
             std::ofstream::open(filename.c_str(), std::ios_base::binary);
         }
 
-        void close(void)
+        void close()
         {
             if (is_open())
             {
@@ -405,7 +404,7 @@
             return true;
         }
 
-        bool const flush_cache(void)
+        bool const flush_cache()
         {
             use_cache_ = false;
             for (auto it = records_.cbegin(); it != records_.cend(); ++it)
@@ -469,12 +468,12 @@ class local_disk : detail::noncopyable
         }
     }
 
-    const_result_iterator begin_results(void) const
+    const_result_iterator begin_results() const
     {
         return const_result_iterator(this).begin();
     }
 
-    const_result_iterator end_results(void) const
+    const_result_iterator end_results() const
     {
         return const_result_iterator(this).end();
     }
@@ -490,7 +489,7 @@ class local_disk : detail::noncopyable
     }
 
     // receive intermediate result
-    bool const insert(typename key_type const &key,
+    bool const insert(key_type const &key,
                       typename reduce_task_type::value_type const &value)
     {
         size_t const partition = partitioner_(key, num_partitions_);
@@ -646,7 +645,7 @@ class local_disk : detail::noncopyable
     }
 
   private:
-    void close_files(void)
+    void close_files()
     {
         for (auto it=intermediate_files_.cbegin(); it!=intermediate_files_.cend(); ++it)
             it->second->write_stream.close();
diff --git a/include/detail/job.hpp b/include/detail/job.hpp
index d88e120..db609b4 100644
--- a/include/detail/job.hpp
+++ b/include/detail/job.hpp
@@ -3,6 +3,8 @@
 
 #pragma once
 
+#include "datasource.hpp"
+
 namespace mapreduce {
 
 template uintmax_t const length(T const &str);
@@ -65,7 +67,7 @@ class job : detail::noncopyable
   public:
     typedef ReduceTask reduce_task_type;
 
-    map_task_runner(job &j)
+    explicit map_task_runner(job &j)
       : job_(j),
        intermediate_store_(job_.number_of_partitions())
     {
@@ -91,7 +93,7 @@ class job : detail::noncopyable
         return intermediate_store_.insert(key, value);
     }
 
-    intermediate_store_type &intermediate_store(void)
+    intermediate_store_type &intermediate_store()
     {
         return intermediate_store_;
     }
@@ -117,7 +119,7 @@ class job : detail::noncopyable
     {
     }
 
-    void reduce(void)
+    void reduce()
     {
         intermediate_store_.reduce(partition_, *this);
     }
@@ -151,12 +153,12 @@ class job : detail::noncopyable
     {
     }
 
-    const_result_iterator begin_results(void) const
+    const_result_iterator begin_results() const
     {
         return intermediate_store_.begin_results();
     }
 
-    const_result_iterator end_results(void) const
+    const_result_iterator end_results() const
     {
         return intermediate_store_.end_results();
     }
@@ -170,12 +172,12 @@ class job : detail::noncopyable
         return true;
     }
 
-    size_t const number_of_partitions(void) const
+    size_t const number_of_partitions() const
    {
         return specification_.reduce_tasks;
    }
 
-    size_t const number_of_map_tasks(void) const
+    size_t const number_of_map_tasks() const
    {
         return specification_.map_tasks;
    }
diff --git a/include/mapreduce.hpp b/include/mapreduce.hpp
index fa24996..f5f571f 100644
--- a/include/mapreduce.hpp
+++ b/include/mapreduce.hpp
@@ -53,7 +53,7 @@ class joined_thread_group : public std::vector
         join_all();
     }
 
-    void join_all(void)
+    void join_all()
     {
         for (auto &thread : *this)
         {
@@ -146,7 +146,7 @@ void run(mapreduce::specification &spec, mapreduce::results &result)
 {
     typename Job::datasource_type datasource(spec);
     Job job(datasource, spec);
-    job.run >(result);
+    job.template run >(result);
 }
 
 } // namespace mapreduce
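
Note on the STRNICMP shim added to wordcount.cpp (editor's note, not part of the patch): strnicmp is a Windows/MSVC name with no counterpart in glibc, so the patch maps a STRNICMP macro to strncasecmp under GCC and keeps strnicmp under MSVC. The self-contained sketch below (a hypothetical demo.cpp) illustrates the same idea; it assumes strncasecmp is declared in <strings.h> on POSIX toolchains and uses MSVC's _strnicmp spelling, whereas the patch itself uses strnicmp.

    // demo.cpp -- illustrative sketch of the portability shim used in wordcount.cpp
    #include <iostream>

    #if defined(_MSC_VER)
      #include <string.h>
      #define STRNICMP _strnicmp      // MSVC's conforming name (the patch uses strnicmp)
    #else
      #include <strings.h>            // declares strncasecmp on POSIX systems
      #define STRNICMP strncasecmp
    #endif

    int main()
    {
        char const *a = "MapReduce";
        char const *b = "MAPREDUCE library";

        // Case-insensitive comparison of the first 9 characters.
        std::cout << (STRNICMP(a, b, 9) == 0 ? "equal" : "different") << '\n';
        return 0;
    }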
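
The change from job.run<...>(result) to job.template run<...>(result) in mapreduce.hpp and wordcount.cpp is a dependent-name fix: inside a function template, the job object's type depends on a template parameter, so calling a member function template with explicit template arguments requires the template keyword for GCC and Clang to parse the "<" as a template argument list rather than a less-than operator. A minimal standalone sketch follows (hypothetical names Worker, Sequential, drive; not taken from the library).

    // template_keyword.cpp -- illustrative only; shows why ".template" is needed
    #include <iostream>

    struct Worker
    {
        template<typename Policy>
        void run() { std::cout << "running\n"; }
    };

    struct Sequential { };

    // J is a template parameter, so "j" has a dependent type inside drive().
    template<typename J>
    void drive(J &j)
    {
        // Without the "template" keyword this is parsed as (j.run < Sequential) > (),
        // which GCC and Clang reject; the keyword marks run as a member template.
        j.template run<Sequential>();
    }

    int main()
    {
        Worker w;
        drive(w);
        return 0;
    }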