Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
cmake_minimum_required(VERSION 3.12)
project(mapreduce)
set(CMAKE_CXX_STANDARD 14)
set(Boost_USE_STATIC_LIBS ON) # only find static libs
set(Boost_USE_MULTITHREADED ON)
set(Boost_USE_STATIC_RUNTIME OFF)
find_package(Boost 1.36 REQUIRED system filesystem iostreams)

MESSAGE(STATUS "find boost ${Boost_INCLUDE_DIRS} ${Boost_LIBRARIES}")

include_directories(${Boost_INCLUDE_DIRS})
include_directories(include)
include_directories(include/detail)
include_directories(include/detail/intermediates)
include_directories(include/detail/schedule_policy)

add_subdirectory(examples/wordcount)
add_subdirectory(examples/prime)
add_subdirectory(examples/friends)
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
MapReduce C++ Library
=

This is a fork of cdmh/mapreduce, this fork aims to help the portabillity of the original code, for any questions please contact the original developer.

The MapReduce C++ Library implements a single-machine platform for programming using the the Google MapReduce idiom. Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key. Many real world tasks are expressible in this model, as shown in the Google paper.

map (k1,v1) --> list(k2,v2)
Expand Down
3 changes: 3 additions & 0 deletions examples/friends/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
project(friends)
add_executable(friends friends.cpp)
target_link_libraries(friends ${Boost_LIBRARIES} pthread)
3 changes: 3 additions & 0 deletions examples/prime/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
project(prime)
add_executable(prime prime.cpp)
target_link_libraries(prime ${Boost_LIBRARIES} pthread)
3 changes: 3 additions & 0 deletions examples/wordcount/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
project(wordcount)
add_executable(wordcount wordcount.cpp)
target_link_libraries(wordcount ${Boost_LIBRARIES} pthread)
29 changes: 12 additions & 17 deletions examples/wordcount/wordcount.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@
#include <crtdbg.h>
#endif


#if defined(__GNUC__) || defined(__GNUG__)
#define STRNICMP strncasecmp
#elif defined(_MSC_VER)
#define STRNICMP strnicmp
#endif

#include "wordcount.h"
#include <iostream>

Expand Down Expand Up @@ -52,20 +59,8 @@ bool std::less<std::pair<char const *, std::uintmax_t> >::operator()(
std::pair<char const *, std::uintmax_t> const &first,
std::pair<char const *, std::uintmax_t> const &second) const
{
return (strnicmp(first.first, second.first, std::min(first.second, second.second)) < 0)
|| ((first.second < second.second) && (strnicmp(first.first, second.first, std::min(first.second, second.second)) <= 0));
}

template<>
constexpr
bool std::less<std::string>::operator()(
std::string const &first,
std::string const &second) const
{
return
std::less<std::pair<char const *, std::uintmax_t>>()(
std::pair<char const *, std::uintmax_t>(first.c_str(), first.length()),
std::pair<char const *, std::uintmax_t>(second.c_str(), second.length()));
return (STRNICMP(first.first, second.first, std::min(first.second, second.second)) < 0)
|| ((first.second < second.second) && (STRNICMP(first.first, second.first, std::min(first.second, second.second)) <= 0));
}

namespace {
Expand All @@ -81,7 +76,7 @@ double const sum(T const &durations)

void write_stats(mapreduce::results const &result)
{
if (result.map_times.size() == 0 || result.reduce_times.size() == 0)
if (result.map_times.empty() || result.reduce_times.empty())
return;

std::cout << std::endl << "\nMapReduce statistics:";
Expand All @@ -103,7 +98,7 @@ void write_stats(mapreduce::results const &result)
std::cout << "\n Reduce key processing errors : " << result.counters.reduce_key_errors;
std::cout << "\n Number of Reduce Tasks run (in parallel): " << result.counters.actual_reduce_tasks;
std::cout << "\n Number of Result Files : " << result.counters.num_result_files;
if (result.reduce_times.size() > 0)
if (!result.reduce_times.empty())
{
std::cout << "\n Fastest Reduce key processed in : " << std::min_element(result.reduce_times.cbegin(), result.reduce_times.cend())->count() << "s";
std::cout << "\n Slowest Reduce key processed in : " << std::max_element(result.reduce_times.cbegin(), result.reduce_times.cend())->count() << "s";
Expand Down Expand Up @@ -174,7 +169,7 @@ void run_wordcount(mapreduce::specification const &spec)
job.run<mapreduce::schedule_policy::sequential<Job> >(result);
#else
std::cout << "\nRunning Parallel WordCount MapReduce...";
job.run<mapreduce::schedule_policy::cpu_parallel<Job> >(result);
job.template run<mapreduce::schedule_policy::cpu_parallel<Job> >(result);
#endif
std::cout << "\nMapReduce Finished.";

Expand Down
2 changes: 1 addition & 1 deletion examples/wordcount/wordcount.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ template<typename KeyType>
struct reduce_task : public mapreduce::reduce_task<KeyType, unsigned>
{
template<typename Runtime, typename It>
void operator()(Runtime &runtime, key_type const &key, It it, It const ite) const
void operator()(Runtime &runtime, KeyType const &key, It it, It const ite) const
{
runtime.emit(key, std::accumulate(it, ite, 0));
}
Expand Down
5 changes: 3 additions & 2 deletions include/detail/datasource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#pragma once

#include <boost/iostreams/device/mapped_file.hpp>
#include <mutex>

namespace mapreduce {

Expand All @@ -15,7 +16,7 @@ template<typename Key, typename Value>
class file_handler : ::mapreduce::detail::noncopyable
{
public:
file_handler(mapreduce::specification const &spec);
explicit file_handler(mapreduce::specification const &spec);

bool const get_data(Key const &key, Value &value) const;
bool const setup_key(Key &/*key*/) const { return false; }
Expand Down Expand Up @@ -176,7 +177,7 @@ template<
class directory_iterator : mapreduce::detail::noncopyable
{
public:
directory_iterator(mapreduce::specification const &spec)
explicit directory_iterator(mapreduce::specification const &spec)
: specification_(spec),
file_handler_(spec)
{
Expand Down
17 changes: 8 additions & 9 deletions include/detail/intermediates/in_memory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class in_memory : detail::noncopyable

const_result_iterator &operator=(const_result_iterator const &other);

void increment(void)
void increment()
{
++current_.second;
if (current_.second == iterators_[current_.first]->second.end())
Expand All @@ -114,28 +114,28 @@ class in_memory : detail::noncopyable
return value_ == other.value_;
}

const_result_iterator &begin(void)
const_result_iterator &begin()
{
for (size_t loop=0; loop<outer_->num_partitions_; ++loop)
iterators_[loop] = outer_->intermediates_[loop].cbegin();
set_current();
return *this;
}

const_result_iterator &end(void)
const_result_iterator &end()
{
current_.first = std::numeric_limits<decltype(current_.first)>::max();
value_ = keyvalue_t();
iterators_.clear();
return *this;
}

keyvalue_t const &dereference(void) const
keyvalue_t const &dereference() const
{
return value_;
}

void set_current(void)
void set_current()
{
for (current_.first=0;
current_.first<outer_->num_partitions_ && iterators_[current_.first] == outer_->intermediates_[current_.first].end();
Expand Down Expand Up @@ -190,12 +190,12 @@ class in_memory : detail::noncopyable
intermediates_.resize(num_partitions_);
}

const_result_iterator begin_results(void) const
const_result_iterator begin_results() const
{
return const_result_iterator(this).begin();
}

const_result_iterator end_results(void) const
const_result_iterator end_results() const
{
return const_result_iterator(this).end();
}
Expand Down Expand Up @@ -263,7 +263,6 @@ class in_memory : detail::noncopyable
}

// receive final result
template<typename StoreResult>
bool const insert(typename reduce_task_type::key_type const &key,
typename reduce_task_type::value_type const &value,
StoreResult &store_result)
Expand All @@ -272,7 +271,7 @@ class in_memory : detail::noncopyable
}

// receive intermediate result
bool const insert(typename key_type const &key,
bool const insert(key_type const &key,
typename reduce_task_type::value_type const &value)
{
size_t const partition = (num_partitions_ == 1)? 0 : partitioner_(key, num_partitions_);
Expand Down
31 changes: 15 additions & 16 deletions include/detail/intermediates/local_disk.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#pragma once

#include <iomanip> // setw
#include "../job.hpp"
#ifdef __GNUC__
#include <iostream> // ubuntu linux
#include <fstream> // ubuntu linux
Expand Down Expand Up @@ -65,7 +66,7 @@ struct file_merger
using std::vector<std::string>::const_reference;
};

void open_files(void)
void open_files()
{
// open each file and read the first record (line) from each
while (files.size() > 0)
Expand Down Expand Up @@ -247,7 +248,7 @@ class local_disk : detail::noncopyable
kvlist_.resize(outer_->num_partitions_);
}

void increment(void)
void increment()
{
if (!kvlist_[index_].first->eof())
read_record(*kvlist_[index_].first, kvlist_[index_].second.first, kvlist_[index_].second.second);
Expand All @@ -262,7 +263,7 @@ class local_disk : detail::noncopyable
&& kvlist_[index_].second == other.kvlist_[index_].second);
}

const_result_iterator &begin(void)
const_result_iterator &begin()
{
for (size_t loop=0; loop<outer_->num_partitions_; ++loop)
{
Expand All @@ -287,19 +288,19 @@ class local_disk : detail::noncopyable
return *this;
}

const_result_iterator &end(void)
const_result_iterator &end()
{
index_ = 0;
kvlist_.clear();
return *this;
}

keyvalue_t const &dereference(void) const
keyvalue_t const &dereference() const
{
return kvlist_[index_].second;
}

void set_current(void)
void set_current()
{
index_ = 0;
while (index_<outer_->num_partitions_ && kvlist_[index_].first->eof())
Expand Down Expand Up @@ -333,11 +334,9 @@ class local_disk : detail::noncopyable
private:
struct intermediate_file_info
{
intermediate_file_info()
{
}
intermediate_file_info() = default;

intermediate_file_info(std::string fname)
explicit intermediate_file_info(std::string const& fname)
: filename(fname)
{
}
Expand All @@ -361,7 +360,7 @@ class local_disk : detail::noncopyable
std::ofstream::open(filename.c_str(), std::ios_base::binary);
}

void close(void)
void close()
{
if (is_open())
{
Expand Down Expand Up @@ -405,7 +404,7 @@ class local_disk : detail::noncopyable
return true;
}

bool const flush_cache(void)
bool const flush_cache()
{
use_cache_ = false;
for (auto it = records_.cbegin(); it != records_.cend(); ++it)
Expand Down Expand Up @@ -469,12 +468,12 @@ class local_disk : detail::noncopyable
}
}

const_result_iterator begin_results(void) const
const_result_iterator begin_results() const
{
return const_result_iterator(this).begin();
}

const_result_iterator end_results(void) const
const_result_iterator end_results() const
{
return const_result_iterator(this).end();
}
Expand All @@ -490,7 +489,7 @@ class local_disk : detail::noncopyable
}

// receive intermediate result
bool const insert(typename key_type const &key,
bool const insert(key_type const &key,
typename reduce_task_type::value_type const &value)
{
size_t const partition = partitioner_(key, num_partitions_);
Expand Down Expand Up @@ -646,7 +645,7 @@ class local_disk : detail::noncopyable
}

private:
void close_files(void)
void close_files()
{
for (auto it=intermediate_files_.cbegin(); it!=intermediate_files_.cend(); ++it)
it->second->write_stream.close();
Expand Down
Loading