Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
public class TransferShedderTest {
double setupLoadAvg = 0.36400000000000005;
double setupLoadStd = 0.3982762860126121;
double delta = 1e-5;

PulsarService pulsar;
NamespaceService namespaceService;
Expand Down Expand Up @@ -522,8 +523,8 @@ public void testNoOwnerLoadData() throws IllegalAccessException {
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
assertTrue(res.isEmpty());
assertEquals(counter.getBreakdownCounters().get(Skip).get(NoBundles).get(), 1);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
assertEquals(counter.getLoadStd(), setupLoadStd);
assertEquals(counter.getLoadAvg(), setupLoadAvg, delta);
assertEquals(counter.getLoadStd(), setupLoadStd, delta);
}

@Test
Expand All @@ -543,8 +544,8 @@ public void testEmptyTopBundlesLoadData() {
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
assertTrue(res.isEmpty());
assertEquals(counter.getBreakdownCounters().get(Skip).get(NoLoadData).get(), 1);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
assertEquals(counter.getLoadStd(), setupLoadStd);
assertEquals(counter.getLoadAvg(), setupLoadAvg, delta);
assertEquals(counter.getLoadStd(), setupLoadStd, delta);
}

@Test
Expand Down Expand Up @@ -587,8 +588,8 @@ public void testRecentlyUnloadedBrokers() {
expected.add(new UnloadDecision(new Unload("broker4:8080", bundleD1, Optional.of("broker2:8080")),
Success, Overloaded));
assertEquals(res, expected);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
assertEquals(counter.getLoadStd(), setupLoadStd);
assertEquals(counter.getLoadAvg(), setupLoadAvg, delta);
assertEquals(counter.getLoadStd(), setupLoadStd, delta);

var now = System.currentTimeMillis();
recentlyUnloadedBrokers.put("broker1:8080", now);
Expand Down Expand Up @@ -616,8 +617,8 @@ public void testRecentlyUnloadedBundles() {
Optional.of("broker1:8080")),
Success, Overloaded));
assertEquals(res, expected);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
assertEquals(counter.getLoadStd(), setupLoadStd);
assertEquals(counter.getLoadAvg(), setupLoadAvg, delta);
assertEquals(counter.getLoadStd(), setupLoadStd, delta);
}

@Test
Expand All @@ -635,8 +636,8 @@ public void testSheddingExcludedNamespaces() {
Optional.of("broker1:8080")),
Success, Overloaded));
assertEquals(res, expected);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
assertEquals(counter.getLoadStd(), setupLoadStd);
assertEquals(counter.getLoadAvg(), setupLoadAvg, delta);
assertEquals(counter.getLoadStd(), setupLoadStd, delta);
}

@Test
Expand Down Expand Up @@ -674,8 +675,8 @@ public void testBundlesWithIsolationPolicies() {
expected.add(new UnloadDecision(new Unload("broker4:8080", bundleD1, Optional.of("broker1:8080")),
Success, Overloaded));
assertEquals(res, expected);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
assertEquals(counter.getLoadStd(), setupLoadStd);
assertEquals(counter.getLoadAvg(), setupLoadAvg, delta);
assertEquals(counter.getLoadStd(), setupLoadStd, delta);

// Test unload a has isolation policies broker.
ctx.brokerConfiguration().setLoadBalancerTransferEnabled(false);
Expand All @@ -685,8 +686,8 @@ public void testBundlesWithIsolationPolicies() {
expected.add(new UnloadDecision(new Unload("broker4:8080", bundleD1, Optional.empty()),
Success, Overloaded));
assertEquals(res, expected);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
assertEquals(counter.getLoadStd(), setupLoadStd);
assertEquals(counter.getLoadAvg(), setupLoadAvg, delta);
assertEquals(counter.getLoadStd(), setupLoadStd, delta);


// test setLoadBalancerSheddingBundlesWithPoliciesEnabled=false;
Expand All @@ -697,16 +698,16 @@ public void testBundlesWithIsolationPolicies() {
res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
assertTrue(res.isEmpty());
assertEquals(counter.getBreakdownCounters().get(Skip).get(NoBundles).get(), 1);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
assertEquals(counter.getLoadStd(), setupLoadStd);
assertEquals(counter.getLoadAvg(), setupLoadAvg, delta);
assertEquals(counter.getLoadStd(), setupLoadStd, delta);

// Test unload a has isolation policies broker.
ctx.brokerConfiguration().setLoadBalancerTransferEnabled(false);
res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
assertTrue(res.isEmpty());
assertEquals(counter.getBreakdownCounters().get(Skip).get(NoBundles).get(), 2);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
assertEquals(counter.getLoadStd(), setupLoadStd);
assertEquals(counter.getLoadAvg(), setupLoadAvg, delta);
assertEquals(counter.getLoadStd(), setupLoadStd, delta);
}

public BrokerLookupData getLookupData() {
Expand Down Expand Up @@ -788,8 +789,8 @@ public void testBundlesWithAntiAffinityGroup() throws MetadataStoreException {

assertTrue(res.isEmpty());
assertEquals(counter.getBreakdownCounters().get(Skip).get(NoBundles).get(), 1);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
assertEquals(counter.getLoadStd(), setupLoadStd);
assertEquals(counter.getLoadAvg(), setupLoadAvg, delta);
assertEquals(counter.getLoadStd(), setupLoadStd, delta);

doAnswer(invocationOnMock -> {
Map<String, BrokerLookupData> brokers = invocationOnMock.getArgument(0);
Expand All @@ -806,8 +807,8 @@ public void testBundlesWithAntiAffinityGroup() throws MetadataStoreException {
expected2.add(new UnloadDecision(new Unload("broker5:8080", bundleE1, Optional.of("broker1:8080")),
Success, Overloaded));
assertEquals(res2, expected2);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
assertEquals(counter.getLoadStd(), setupLoadStd);
assertEquals(counter.getLoadAvg(), setupLoadAvg, delta);
assertEquals(counter.getLoadStd(), setupLoadStd, delta);
}

@Test
Expand Down Expand Up @@ -837,8 +838,8 @@ public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String,

assertTrue(res.isEmpty());
assertEquals(counter.getBreakdownCounters().get(Skip).get(NoBundles).get(), 1);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
assertEquals(counter.getLoadStd(), setupLoadStd);
assertEquals(counter.getLoadAvg(), setupLoadAvg, delta);
assertEquals(counter.getLoadStd(), setupLoadStd, delta);
}

@Test
Expand Down Expand Up @@ -900,8 +901,8 @@ public void testTargetStd() {

assertTrue(res.isEmpty());
assertEquals(counter.getBreakdownCounters().get(Skip).get(HitCount).get(), 1);
assertEquals(counter.getLoadAvg(), 0.2000000063578288);
assertEquals(counter.getLoadStd(), 0.08164966587949089);
assertEquals(counter.getLoadAvg(), 0.2000000063578288, delta);
assertEquals(counter.getLoadStd(), 0.08164966587949089, delta);
}

@Test
Expand All @@ -919,8 +920,8 @@ public void testSingleTopBundlesLoadData() {

assertTrue(res.isEmpty());
assertEquals(counter.getBreakdownCounters().get(Skip).get(NoBundles).get(), 1);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
assertEquals(counter.getLoadStd(), setupLoadStd);
assertEquals(counter.getLoadAvg(), setupLoadAvg, delta);
assertEquals(counter.getLoadStd(), setupLoadStd, delta);
}

@Test
Expand All @@ -941,8 +942,8 @@ public void testBundleThroughputLargerThanOffloadThreshold() {
Optional.of("broker1:8080")),
Success, Overloaded));
assertEquals(res, expected);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
assertEquals(counter.getLoadStd(), setupLoadStd);
assertEquals(counter.getLoadAvg(), setupLoadAvg, delta);
assertEquals(counter.getLoadStd(), setupLoadStd, delta);
}

@Test
Expand All @@ -961,8 +962,8 @@ public void testZeroBundleThroughput() {
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
assertTrue(res.isEmpty());
assertEquals(counter.getBreakdownCounters().get(Skip).get(NoBundles).get(), 1);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
assertEquals(counter.getLoadStd(), setupLoadStd);
assertEquals(counter.getLoadAvg(), setupLoadAvg, delta);
assertEquals(counter.getLoadStd(), setupLoadStd, delta);
}


Expand All @@ -980,8 +981,8 @@ public void testTargetStdAfterTransfer() {
expected.add(new UnloadDecision(new Unload("broker5:8080", bundleE1, Optional.of("broker1:8080")),
Success, Overloaded));
assertEquals(res, expected);
assertEquals(counter.getLoadAvg(), 0.26400000000000007);
assertEquals(counter.getLoadStd(), 0.27644891028904417);
assertEquals(counter.getLoadAvg(), 0.26400000000000007, delta);
assertEquals(counter.getLoadStd(), 0.27644891028904417, delta);
}

@Test
Expand Down Expand Up @@ -1032,12 +1033,12 @@ public void testUnloadBundlesGreaterThanTargetThroughput() throws IllegalAccessE
expected.add(new UnloadDecision(
new Unload("broker1:8080", "my-tenant/my-namespaceA/0x3FFFFFFF_0x4FFFFFFF",
Optional.of("broker2:8080")), Success, Overloaded));
assertEquals(counter.getLoadAvg(), 5.05);
assertEquals(counter.getLoadStd(), 4.95);
assertEquals(counter.getLoadAvg(), 5.05, delta);
assertEquals(counter.getLoadStd(), 4.95, delta);
assertEquals(res, expected);
var stats = (TransferShedder.LoadStats)
FieldUtils.readDeclaredField(transferShedder, "stats", true);
assertEquals(stats.std(), 0.050000004900021836);
assertEquals(stats.std(), 0.050000004900021836, delta);
}

@Test
Expand Down Expand Up @@ -1103,12 +1104,12 @@ public void testUnloadBundlesLessThanTargetThroughputAfterSplit() throws Illegal
new Unload("broker2:8080", "my-tenant/my-namespaceB/0x00000000_0x0FFFFFFF",
Optional.of("broker1:8080")),
Success, Overloaded));
assertEquals(counter.getLoadAvg(), 5.05);
assertEquals(counter.getLoadStd(), 4.95);
assertEquals(counter.getLoadAvg(), 5.05, delta);
assertEquals(counter.getLoadStd(), 4.95, delta);
assertEquals(res, expected);
var stats = (TransferShedder.LoadStats)
FieldUtils.readDeclaredField(transferShedder, "stats", true);
assertEquals(stats.std(), 0.050000004900021836);
assertEquals(stats.std(), 0.050000004900021836, delta);

}

Expand Down Expand Up @@ -1142,19 +1143,19 @@ public void testUnloadBundlesGreaterThanTargetThroughputAfterSplit() throws Ille
expected.add(new UnloadDecision(
new Unload("broker1:8080",
res.stream().filter(x -> x.getUnload().sourceBroker().equals("broker1:8080")).findFirst().get()
.getUnload().serviceUnit(), Optional.of("broker2:8080")),
.getUnload().serviceUnit(), Optional.of("broker2:8080")),
Success, Overloaded));
expected.add(new UnloadDecision(
new Unload("broker2:8080",
res.stream().filter(x -> x.getUnload().sourceBroker().equals("broker2:8080")).findFirst().get()
.getUnload().serviceUnit(), Optional.of("broker1:8080")),
Success, Overloaded));
assertEquals(counter.getLoadAvg(), 0.74);
assertEquals(counter.getLoadStd(), 0.26);
assertEquals(counter.getLoadAvg(), 0.74, delta);
assertEquals(counter.getLoadStd(), 0.26, delta);
assertEquals(res, expected);
var stats = (TransferShedder.LoadStats)
FieldUtils.readDeclaredField(transferShedder, "stats", true);
assertEquals(stats.std(), 2.5809568279517847E-8);
assertEquals(stats.std(), 2.5809568279517847E-8, delta);
}

@Test
Expand All @@ -1180,8 +1181,8 @@ public void testMinBrokerWithLowTraffic() throws IllegalAccessException {
expected.add(new UnloadDecision(new Unload("broker4:8080", bundleD1, Optional.of("broker2:8080")),
Success, Underloaded));
assertEquals(res, expected);
assertEquals(counter.getLoadAvg(), 0.26400000000000007);
assertEquals(counter.getLoadStd(), 0.27644891028904417);
assertEquals(counter.getLoadAvg(), 0.26400000000000007, delta);
assertEquals(counter.getLoadStd(), 0.27644891028904417, delta);
}

@Test
Expand All @@ -1204,8 +1205,8 @@ public void testMinBrokerWithLowerLoadThanAvg() throws IllegalAccessException {
expected.add(new UnloadDecision(new Unload("broker4:8080", bundleD1, Optional.of("broker2:8080")),
Success, Underloaded));
assertEquals(res, expected);
assertEquals(counter.getLoadAvg(), 0.262);
assertEquals(counter.getLoadStd(), 0.2780935094532054);
assertEquals(counter.getLoadAvg(), 0.262, delta);
assertEquals(counter.getLoadStd(), 0.2780935094532054, delta);
}

@Test
Expand All @@ -1222,8 +1223,8 @@ public void testMaxNumberOfTransfersPerShedderCycle() {
expected.add(new UnloadDecision(new Unload("broker4:8080", bundleD1, Optional.of("broker2:8080")),
Success, Overloaded));
assertEquals(res, expected);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
assertEquals(counter.getLoadStd(), setupLoadStd);
assertEquals(counter.getLoadAvg(), setupLoadAvg, delta);
assertEquals(counter.getLoadStd(), setupLoadStd, delta);
}

@Test
Expand All @@ -1238,8 +1239,8 @@ public void testLoadBalancerSheddingConditionHitCountThreshold() {
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
assertTrue(res.isEmpty());
assertEquals(counter.getBreakdownCounters().get(Skip).get(HitCount).get(), i + 1);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
assertEquals(counter.getLoadStd(), setupLoadStd);
assertEquals(counter.getLoadAvg(), setupLoadAvg, delta);
assertEquals(counter.getLoadStd(), setupLoadStd, delta);
}
var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
var expected = new HashSet<UnloadDecision>();
Expand All @@ -1248,8 +1249,8 @@ public void testLoadBalancerSheddingConditionHitCountThreshold() {
expected.add(new UnloadDecision(new Unload("broker4:8080", bundleD1, Optional.of("broker2:8080")),
Success, Overloaded));
assertEquals(res, expected);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
assertEquals(counter.getLoadStd(), setupLoadStd);
assertEquals(counter.getLoadAvg(), setupLoadAvg, delta);
assertEquals(counter.getLoadStd(), setupLoadStd, delta);
}

@Test
Expand All @@ -1268,8 +1269,8 @@ public void testRemainingTopBundles() {
expected.add(new UnloadDecision(new Unload("broker4:8080", bundleD1, Optional.of("broker2:8080")),
Success, Overloaded));
assertEquals(res, expected);
assertEquals(counter.getLoadAvg(), setupLoadAvg);
assertEquals(counter.getLoadStd(), setupLoadStd);
assertEquals(counter.getLoadAvg(), setupLoadAvg, delta);
assertEquals(counter.getLoadStd(), setupLoadStd, delta);
}

@Test
Expand All @@ -1289,14 +1290,14 @@ public void testLoadMoreThan100() throws IllegalAccessException {
expected.add(new UnloadDecision(new Unload("broker4:8080", bundleD1, Optional.of("broker2:8080")),
Success, Overloaded));
assertEquals(res, expected);
assertEquals(counter.getLoadAvg(), 2.4240000000000004);
assertEquals(counter.getLoadStd(), 3.8633332758124816);
assertEquals(counter.getLoadAvg(), 2.4240000000000004, delta);
assertEquals(counter.getLoadStd(), 3.8633332758124816, delta);


var stats = (TransferShedder.LoadStats)
FieldUtils.readDeclaredField(transferShedder, "stats", true);
assertEquals(stats.avg(), 2.4240000000000004);
assertEquals(stats.std(), 2.781643776903451);
assertEquals(stats.avg(), 2.4240000000000004, delta);
assertEquals(stats.std(), 2.781643776903451, delta);
}

@Test
Expand Down Expand Up @@ -1328,8 +1329,8 @@ public void testOverloadOutlier() {
new Unload("broker99:8080", "my-tenant/my-namespace99/0x00000000_0x0FFFFFFF",
Optional.of("broker83:8080")), Success, Underloaded))
);
assertEquals(counter.getLoadAvg(), 0.019900000000000008, 0.00001);
assertEquals(counter.getLoadStd(), 0.09850375627355534, 0.00001);
assertEquals(counter.getLoadAvg(), 0.019900000000000008, delta);
assertEquals(counter.getLoadStd(), 0.09850375627355534, delta);
}

@Test
Expand All @@ -1343,8 +1344,8 @@ public void testUnderloadOutlier() {
new Unload("broker98:8080", "my-tenant/my-namespace98/0x00000000_0x0FFFFFFF",
Optional.of("broker99:8080")), Success, Underloaded));
assertEquals(res, expected);
assertEquals(counter.getLoadAvg(), 0.9704000000000005, 0.00001);
assertEquals(counter.getLoadStd(), 0.09652895938523735, 0.00001);
assertEquals(counter.getLoadAvg(), 0.9704000000000005, delta);
assertEquals(counter.getLoadStd(), 0.09652895938523735, delta);
}

@Test
Expand Down Expand Up @@ -1406,8 +1407,8 @@ public void testHighVarianceLoadStats() {
}
stats.update(loadStore, availableBrokers, Map.of(), conf);

assertEquals(stats.avg(), 0.9417647058823528);
assertEquals(stats.std(), 0.23294117647058868);
assertEquals(stats.avg(), 0.9417647058823528, delta);
assertEquals(stats.std(), 0.23294117647058868, delta);
}

@Test
Expand All @@ -1424,7 +1425,7 @@ public void testLowVarianceLoadStats() {
loadStore.pushAsync("broker" + i + ":8080", getCpuLoad(ctx, loads[i], "broker" + i + ":8080"));
}
stats.update(loadStore, availableBrokers, Map.of(), conf);
assertEquals(stats.avg(), 3.9449999999999994);
assertEquals(stats.std(), 0.028722813232795824);
assertEquals(stats.avg(), 3.9449999999999994, delta);
assertEquals(stats.std(), 0.028722813232795824, delta);
}
}
Loading