diff --git a/sim/src/test_dmclock.h b/sim/src/test_dmclock.h index 7f1e554..9728b45 100644 --- a/sim/src/test_dmclock.h +++ b/sim/src/test_dmclock.h @@ -29,13 +29,14 @@ namespace crimson { }; using DmcQueue = dmc::PushPriorityQueue; + using DmcServiceTracker = dmc::ServiceTracker; using DmcServer = sim::SimulatedServer; - using DmcClient = sim::SimulatedClient, + using DmcClient = sim::SimulatedClient; diff --git a/sim/src/test_dmclock_main.cc b/sim/src/test_dmclock_main.cc index 679b392..f59b735 100644 --- a/sim/src/test_dmclock_main.cc +++ b/sim/src/test_dmclock_main.cc @@ -237,9 +237,9 @@ int main(int argc, char* argv[]) { void test::client_data(std::ostream& out, - test::MySim* sim, - test::MySim::ClientFilter client_disp_filter, - int head_w, int data_w, int data_prec) { + test::MySim* sim, + test::MySim::ClientFilter client_disp_filter, + int head_w, int data_w, int data_prec) { // report how many ops were done by reservation and proportion for // each client @@ -270,9 +270,9 @@ void test::client_data(std::ostream& out, void test::server_data(std::ostream& out, - test::MySim* sim, - test::MySim::ServerFilter server_disp_filter, - int head_w, int data_w, int data_prec) { + test::MySim* sim, + test::MySim::ServerFilter server_disp_filter, + int head_w, int data_w, int data_prec) { out << std::setw(head_w) << "res_ops:"; int total_r = 0; for (uint i = 0; i < sim->get_server_count(); ++i) { diff --git a/src/dmclock_client.h b/src/dmclock_client.h index 92f4cf8..e0280ab 100644 --- a/src/dmclock_client.h +++ b/src/dmclock_client.h @@ -22,38 +22,132 @@ namespace crimson { namespace dmclock { - struct ServerInfo { + + // OrigTracker is a best-effort implementation of the the original + // dmClock calculations of delta and rho. It adheres to an + // interface, implemented via a template type, that allows it to + // be replaced with an alternative. The interface consists of the + // static create, prepare_req, resp_update, and get_last_delta + // functions. + class OrigTracker { Counter delta_prev_req; Counter rho_prev_req; uint32_t my_delta; uint32_t my_rho; - ServerInfo(Counter _delta_prev_req, - Counter _rho_prev_req) : - delta_prev_req(_delta_prev_req), - rho_prev_req(_rho_prev_req), + public: + + OrigTracker(Counter global_delta, + Counter global_rho) : + delta_prev_req(global_delta), + rho_prev_req(global_rho), my_delta(0), my_rho(0) - { - // empty + { /* empty */ } + + static inline OrigTracker create(Counter the_delta, Counter the_rho) { + return OrigTracker(the_delta, the_rho); } - inline void req_update(Counter delta, Counter rho) { - delta_prev_req = delta; - rho_prev_req = rho; + inline ReqParams prepare_req(Counter& the_delta, Counter& the_rho) { + Counter delta_out = 1 + the_delta - delta_prev_req - my_delta; + Counter rho_out = 1 + the_rho - rho_prev_req - my_rho; + delta_prev_req = the_delta; + rho_prev_req = the_rho; my_delta = 0; my_rho = 0; + return ReqParams(uint32_t(delta_out), uint32_t(rho_out)); } - inline void resp_update(PhaseType phase) { + inline void resp_update(PhaseType phase, + Counter& the_delta, + Counter& the_rho) { + ++the_delta; ++my_delta; - if (phase == PhaseType::reservation) ++my_rho; + if (phase == PhaseType::reservation) { + ++the_rho; + ++my_rho; + } + } + + inline Counter get_last_delta() const { + return delta_prev_req; } - }; + }; // struct OrigTracker + + + // BorrowingTracker always returns a positive delta and rho. If + // not enough responses have come in to allow that, we will borrow + // a future response and repay it later. + class BorrowingTracker { + Counter delta_prev_req; + Counter rho_prev_req; + Counter delta_borrow; + Counter rho_borrow; + + public: + + BorrowingTracker(Counter global_delta, Counter global_rho) : + delta_prev_req(global_delta), + rho_prev_req(global_rho), + delta_borrow(0), + rho_borrow(0) + { /* empty */ } + + static inline BorrowingTracker create(Counter the_delta, + Counter the_rho) { + return BorrowingTracker(the_delta, the_rho); + } + + inline Counter calc_with_borrow(const Counter& global, + const Counter& previous, + Counter& borrow) { + Counter result = global - previous; + if (0 == result) { + // if no replies have come in, borrow one from the future + ++borrow; + return 1; + } else if (result > borrow) { + // if we can give back all of what we borrowed, do so + result -= borrow; + borrow = 0; + return result; + } else { + // can only return part of what was borrowed in order to + // return positive + borrow = borrow - result + 1; + return 1; + } + } + + inline ReqParams prepare_req(Counter& the_delta, Counter& the_rho) { + Counter delta_out = + calc_with_borrow(the_delta, delta_prev_req, delta_borrow); + Counter rho_out = + calc_with_borrow(the_rho, rho_prev_req, rho_borrow); + delta_prev_req = the_delta; + rho_prev_req = the_rho; + return ReqParams(uint32_t(delta_out), uint32_t(rho_out)); + } + + inline void resp_update(PhaseType phase, + Counter& the_delta, + Counter& the_rho) { + ++the_delta; + if (phase == PhaseType::reservation) { + ++the_rho; + } + } + + inline Counter get_last_delta() const { + return delta_prev_req; + } + }; // struct BorrowingTracker // S is server identifier type - template + // T is the server info class that adheres to ServerTrackerIfc interface + template class ServiceTracker { // we don't want to include gtest.h just for FRIEND_TEST friend class dmclock_client_server_erase_Test; @@ -64,7 +158,7 @@ namespace crimson { Counter delta_counter; // # reqs completed Counter rho_counter; // # reqs completed via reservation - std::map server_map; + std::map server_map; mutable std::mutex data_mtx; // protects Counters and map using DataGuard = std::lock_guard; @@ -72,7 +166,7 @@ namespace crimson { // clean config std::deque clean_mark_points; - Duration clean_age; // age at which ServerInfo cleaned + Duration clean_age; // age at which server tracker cleaned // NB: All threads declared at end, so they're destructed firs! @@ -119,20 +213,13 @@ namespace crimson { // this code can only run if a request did not precede the // response or if the record was cleaned up b/w when // the request was made and now - ServerInfo si(delta_counter, rho_counter); - si.resp_update(phase); - server_map.emplace(server_id, si); - } else { - it->second.resp_update(phase); - } - - ++delta_counter; - if (PhaseType::reservation == phase) { - ++rho_counter; + auto i = server_map.emplace(server_id, + T::create(delta_counter, rho_counter)); + it = i.first; } + it->second.resp_update(phase, delta_counter, rho_counter); } - /* * Returns the ReqParams for the given server. */ @@ -140,17 +227,11 @@ namespace crimson { DataGuard g(data_mtx); auto it = server_map.find(server); if (server_map.end() == it) { - server_map.emplace(server, ServerInfo(delta_counter, rho_counter)); + server_map.emplace(server, + T::create(delta_counter, rho_counter)); return ReqParams(1, 1); } else { - Counter delta = - 1 + delta_counter - it->second.delta_prev_req - it->second.my_delta; - Counter rho = - 1 + rho_counter - it->second.rho_prev_req - it->second.my_rho; - - it->second.req_update(delta_counter, rho_counter); - - return ReqParams(uint32_t(delta), uint32_t(rho)); + return it->second.prepare_req(delta_counter, rho_counter); } } @@ -182,7 +263,7 @@ namespace crimson { i != server_map.end(); /* empty */) { auto i2 = i++; - if (i2->second.delta_prev_req <= earliest) { + if (i2->second.get_last_delta() <= earliest) { server_map.erase(i2); } } diff --git a/test/test_dmclock_client.cc b/test/test_dmclock_client.cc index ee4172d..94ba2c6 100644 --- a/test/test_dmclock_client.cc +++ b/test/test_dmclock_client.cc @@ -109,7 +109,6 @@ namespace crimson { dmc::ServiceTracker st(std::chrono::seconds(2), std::chrono::seconds(3)); - auto rp1 = st.get_req_params(server1); EXPECT_EQ(1u, rp1.delta) << @@ -128,6 +127,7 @@ namespace crimson { "rho should be 1 with no intervening reservation responses by" << "other servers"; + // RESPONSE st.track_resp(server1, dmc::PhaseType::priority); auto rp3 = st.get_req_params(server1); @@ -139,11 +139,12 @@ namespace crimson { "rho should be 1 with no intervening reservation responses by" << "other servers"; + // RESPONSE st.track_resp(server2, dmc::PhaseType::priority); auto rp4 = st.get_req_params(server1); - EXPECT_EQ(2u, rp4.delta) << + EXPECT_EQ(1u, rp4.delta) << "delta should be 2 with one intervening priority response by " << "another server"; EXPECT_EQ(1u, rp4.rho) << @@ -159,19 +160,18 @@ namespace crimson { "rho should be 1 with no intervening reservation responses by" << "other servers"; + // RESPONSE st.track_resp(server2, dmc::PhaseType::reservation); auto rp6 = st.get_req_params(server1); - EXPECT_EQ(2u, rp6.delta) << + EXPECT_EQ(1u, rp6.delta) << "delta should be 2 with one intervening reservation response by " << "another server"; - EXPECT_EQ(2u, rp6.rho) << + EXPECT_EQ(1u, rp6.rho) << "rho should be 2 with one intervening reservation responses by " << "another server"; - // auto rp6_b = st.get_req_params(server2); - st.track_resp(server2, dmc::PhaseType::reservation); st.track_resp(server1, dmc::PhaseType::priority); st.track_resp(server2, dmc::PhaseType::priority); @@ -183,19 +183,19 @@ namespace crimson { auto rp7 = st.get_req_params(server1); EXPECT_EQ(5u, rp7.delta) << - "delta should be 5 with fourintervening responses by " << + "delta should be 5 with four intervening responses by " << "another server"; - EXPECT_EQ(3u, rp7.rho) << - "rho should be 3 with two intervening reservation responses by " << + EXPECT_EQ(1u, rp7.rho) << + "rho should be 1 with two intervening reservation responses by " << "another server"; auto rp7b = st.get_req_params(server2); - EXPECT_EQ(4u, rp7b.delta) << - "delta should be 4 with three intervening responses by " << + EXPECT_EQ(9u, rp7b.delta) << + "delta should be 9 with three intervening responses by " << "another server"; - EXPECT_EQ(2u, rp7b.rho) << - "rho should be 2 with one intervening reservation responses by " << + EXPECT_EQ(4u, rp7b.rho) << + "rho should be 4 with one intervening reservation responses by " << "another server"; auto rp8 = st.get_req_params(server1); @@ -215,5 +215,85 @@ namespace crimson { "rho should be 1 with no intervening reservation responses by " << "another server"; } // TEST + + + // NB: the BorrowingTracker has not been fully tested and the + // expected values below have not yet been compared with the + // theoretically correct values. + TEST(dmclock_client, orig_tracker_delta_rho_values) { + using ServerId = int; + + ServerId server1 = 101; + ServerId server2 = 7; + + dmc::ServiceTracker + st(std::chrono::seconds(2), std::chrono::seconds(3)); + + auto rp1 = st.get_req_params(server1); + + EXPECT_EQ(1u, rp1.delta); + EXPECT_EQ(1u, rp1.rho); + + auto rp2 = st.get_req_params(server1); + + EXPECT_EQ(1u, rp2.delta); + EXPECT_EQ(1u, rp2.rho); + + st.track_resp(server1, dmc::PhaseType::priority); + + auto rp3 = st.get_req_params(server1); + + EXPECT_EQ(1u, rp3.delta); + EXPECT_EQ(1u, rp3.rho); + + st.track_resp(server2, dmc::PhaseType::priority); + + auto rp4 = st.get_req_params(server1); + + EXPECT_EQ(2u, rp4.delta); + EXPECT_EQ(1u, rp4.rho); + + auto rp5 = st.get_req_params(server1); + + EXPECT_EQ(1u, rp5.delta); + EXPECT_EQ(1u, rp5.rho); + + st.track_resp(server2, dmc::PhaseType::reservation); + + auto rp6 = st.get_req_params(server1); + + EXPECT_EQ(2u, rp6.delta); + EXPECT_EQ(2u, rp6.rho); + + // auto rp6_b = st.get_req_params(server2); + + st.track_resp(server2, dmc::PhaseType::reservation); + st.track_resp(server1, dmc::PhaseType::priority); + st.track_resp(server2, dmc::PhaseType::priority); + st.track_resp(server2, dmc::PhaseType::reservation); + st.track_resp(server1, dmc::PhaseType::reservation); + st.track_resp(server1, dmc::PhaseType::priority); + st.track_resp(server2, dmc::PhaseType::priority); + + auto rp7 = st.get_req_params(server1); + + EXPECT_EQ(5u, rp7.delta); + EXPECT_EQ(3u, rp7.rho); + + auto rp7b = st.get_req_params(server2); + + EXPECT_EQ(4u, rp7b.delta); + EXPECT_EQ(2u, rp7b.rho); + + auto rp8 = st.get_req_params(server1); + + EXPECT_EQ(1u, rp8.delta); + EXPECT_EQ(1u, rp8.rho); + + auto rp8b = st.get_req_params(server2); + EXPECT_EQ(1u, rp8b.delta); + EXPECT_EQ(1u, rp8b.rho); + } // TEST + } // namespace dmclock } // namespace crimson