Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions include/detail/datasource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#pragma once

#include <boost/iostreams/device/mapped_file.hpp>
#include <mutex>

namespace mapreduce {

Expand All @@ -15,7 +16,7 @@ template<typename Key, typename Value>
class file_handler : ::mapreduce::detail::noncopyable
{
public:
file_handler(mapreduce::specification const &spec);
explicit file_handler(mapreduce::specification const &spec);

bool const get_data(Key const &key, Value &value) const;
bool const setup_key(Key &/*key*/) const { return false; }
Expand Down Expand Up @@ -176,7 +177,7 @@ template<
class directory_iterator : mapreduce::detail::noncopyable
{
public:
directory_iterator(mapreduce::specification const &spec)
explicit directory_iterator(mapreduce::specification const &spec)
: specification_(spec),
file_handler_(spec)
{
Expand Down
17 changes: 8 additions & 9 deletions include/detail/intermediates/in_memory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class in_memory : detail::noncopyable

const_result_iterator &operator=(const_result_iterator const &other);

void increment(void)
void increment()
{
++current_.second;
if (current_.second == iterators_[current_.first]->second.end())
Expand All @@ -114,28 +114,28 @@ class in_memory : detail::noncopyable
return value_ == other.value_;
}

const_result_iterator &begin(void)
const_result_iterator &begin()
{
for (size_t loop=0; loop<outer_->num_partitions_; ++loop)
iterators_[loop] = outer_->intermediates_[loop].cbegin();
set_current();
return *this;
}

const_result_iterator &end(void)
const_result_iterator &end()
{
current_.first = std::numeric_limits<decltype(current_.first)>::max();
value_ = keyvalue_t();
iterators_.clear();
return *this;
}

keyvalue_t const &dereference(void) const
keyvalue_t const &dereference() const
{
return value_;
}

void set_current(void)
void set_current()
{
for (current_.first=0;
current_.first<outer_->num_partitions_ && iterators_[current_.first] == outer_->intermediates_[current_.first].end();
Expand Down Expand Up @@ -190,12 +190,12 @@ class in_memory : detail::noncopyable
intermediates_.resize(num_partitions_);
}

const_result_iterator begin_results(void) const
const_result_iterator begin_results() const
{
return const_result_iterator(this).begin();
}

const_result_iterator end_results(void) const
const_result_iterator end_results() const
{
return const_result_iterator(this).end();
}
Expand Down Expand Up @@ -263,7 +263,6 @@ class in_memory : detail::noncopyable
}

// receive final result
template<typename StoreResult>
bool const insert(typename reduce_task_type::key_type const &key,
typename reduce_task_type::value_type const &value,
StoreResult &store_result)
Expand All @@ -272,7 +271,7 @@ class in_memory : detail::noncopyable
}

// receive intermediate result
bool const insert(typename key_type const &key,
bool const insert(key_type const &key,
typename reduce_task_type::value_type const &value)
{
size_t const partition = (num_partitions_ == 1)? 0 : partitioner_(key, num_partitions_);
Expand Down
31 changes: 15 additions & 16 deletions include/detail/intermediates/local_disk.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#pragma once

#include <iomanip> // setw
#include "../job.hpp"
#ifdef __GNUC__
#include <iostream> // ubuntu linux
#include <fstream> // ubuntu linux
Expand Down Expand Up @@ -65,7 +66,7 @@ struct file_merger
using std::vector<std::string>::const_reference;
};

void open_files(void)
void open_files()
{
// open each file and read the first record (line) from each
while (files.size() > 0)
Expand Down Expand Up @@ -247,7 +248,7 @@ class local_disk : detail::noncopyable
kvlist_.resize(outer_->num_partitions_);
}

void increment(void)
void increment()
{
if (!kvlist_[index_].first->eof())
read_record(*kvlist_[index_].first, kvlist_[index_].second.first, kvlist_[index_].second.second);
Expand All @@ -262,7 +263,7 @@ class local_disk : detail::noncopyable
&& kvlist_[index_].second == other.kvlist_[index_].second);
}

const_result_iterator &begin(void)
const_result_iterator &begin()
{
for (size_t loop=0; loop<outer_->num_partitions_; ++loop)
{
Expand All @@ -287,19 +288,19 @@ class local_disk : detail::noncopyable
return *this;
}

const_result_iterator &end(void)
const_result_iterator &end()
{
index_ = 0;
kvlist_.clear();
return *this;
}

keyvalue_t const &dereference(void) const
keyvalue_t const &dereference() const
{
return kvlist_[index_].second;
}

void set_current(void)
void set_current()
{
index_ = 0;
while (index_<outer_->num_partitions_ && kvlist_[index_].first->eof())
Expand Down Expand Up @@ -333,11 +334,9 @@ class local_disk : detail::noncopyable
private:
struct intermediate_file_info
{
intermediate_file_info()
{
}
intermediate_file_info() = default;

intermediate_file_info(std::string fname)
explicit intermediate_file_info(std::string const& fname)
: filename(fname)
{
}
Expand All @@ -361,7 +360,7 @@ class local_disk : detail::noncopyable
std::ofstream::open(filename.c_str(), std::ios_base::binary);
}

void close(void)
void close()
{
if (is_open())
{
Expand Down Expand Up @@ -405,7 +404,7 @@ class local_disk : detail::noncopyable
return true;
}

bool const flush_cache(void)
bool const flush_cache()
{
use_cache_ = false;
for (auto it = records_.cbegin(); it != records_.cend(); ++it)
Expand Down Expand Up @@ -469,12 +468,12 @@ class local_disk : detail::noncopyable
}
}

const_result_iterator begin_results(void) const
const_result_iterator begin_results() const
{
return const_result_iterator(this).begin();
}

const_result_iterator end_results(void) const
const_result_iterator end_results() const
{
return const_result_iterator(this).end();
}
Expand All @@ -490,7 +489,7 @@ class local_disk : detail::noncopyable
}

// receive intermediate result
bool const insert(typename key_type const &key,
bool const insert(key_type const &key,
typename reduce_task_type::value_type const &value)
{
size_t const partition = partitioner_(key, num_partitions_);
Expand Down Expand Up @@ -646,7 +645,7 @@ class local_disk : detail::noncopyable
}

private:
void close_files(void)
void close_files()
{
for (auto it=intermediate_files_.cbegin(); it!=intermediate_files_.cend(); ++it)
it->second->write_stream.close();
Expand Down
16 changes: 9 additions & 7 deletions include/detail/job.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

#pragma once

#include "datasource.hpp"

namespace mapreduce {

template<typename T> uintmax_t const length(T const &str);
Expand Down Expand Up @@ -65,7 +67,7 @@ class job : detail::noncopyable
public:
typedef ReduceTask reduce_task_type;

map_task_runner(job &j)
explicit map_task_runner(job &j)
: job_(j),
intermediate_store_(job_.number_of_partitions())
{
Expand All @@ -91,7 +93,7 @@ class job : detail::noncopyable
return intermediate_store_.insert(key, value);
}

intermediate_store_type &intermediate_store(void)
intermediate_store_type &intermediate_store()
{
return intermediate_store_;
}
Expand All @@ -117,7 +119,7 @@ class job : detail::noncopyable
{
}

void reduce(void)
void reduce()
{
intermediate_store_.reduce(partition_, *this);
}
Expand Down Expand Up @@ -151,12 +153,12 @@ class job : detail::noncopyable
{
}

const_result_iterator begin_results(void) const
const_result_iterator begin_results() const
{
return intermediate_store_.begin_results();
}

const_result_iterator end_results(void) const
const_result_iterator end_results() const
{
return intermediate_store_.end_results();
}
Expand All @@ -170,12 +172,12 @@ class job : detail::noncopyable
return true;
}

size_t const number_of_partitions(void) const
size_t const number_of_partitions() const
{
return specification_.reduce_tasks;
}

size_t const number_of_map_tasks(void) const
size_t const number_of_map_tasks() const
{
return specification_.map_tasks;
}
Expand Down
2 changes: 1 addition & 1 deletion include/detail/platform.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ std::string get_temporary_filename(std::string &pathname)
{
const std::string tmp = "/tmp/XXXXXX";
char* tmpfile = const_cast<char*>(tmp.c_str());
mkstemp(tmpfile);
auto tmp2 = mkstemp(tmpfile);
std::string res(tmpfile);
return res;
}
Expand Down
4 changes: 2 additions & 2 deletions include/mapreduce.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class joined_thread_group : public std::vector<std::thread>
join_all();
}

void join_all(void)
void join_all()
{
for (auto &thread : *this)
{
Expand Down Expand Up @@ -146,7 +146,7 @@ void run(mapreduce::specification &spec, mapreduce::results &result)
{
typename Job::datasource_type datasource(spec);
Job job(datasource, spec);
job.run<mapreduce::schedule_policy::cpu_parallel<Job> >(result);
job.template run<mapreduce::schedule_policy::cpu_parallel<Job> >(result);
}

} // namespace mapreduce
Expand Down