Skip to content

Commit 8e064a5

Browse files
committed
[FLINK-33653][runtime] Introduce a benchmark for default scheduler slot request matching.
1 parent 703253c commit 8e064a5

File tree

1 file changed

+122
-0
lines changed

1 file changed

+122
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.scheduler.benchmark.scheduling.slot.matching.strategy;
20+
21+
import org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
22+
import org.apache.flink.runtime.clusterframework.types.AllocationID;
23+
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
24+
import org.apache.flink.runtime.jobmaster.SlotRequestId;
25+
import org.apache.flink.runtime.jobmaster.slotpool.PendingRequest;
26+
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
27+
import org.apache.flink.runtime.jobmaster.slotpool.RequestSlotMatchingStrategy;
28+
import org.apache.flink.runtime.jobmaster.slotpool.SimpleRequestSlotMatchingStrategy;
29+
import org.apache.flink.runtime.jobmaster.slotpool.TasksBalancedRequestSlotMatchingStrategy;
30+
import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot;
31+
import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
32+
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
33+
import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase;
34+
35+
import org.openjdk.jmh.annotations.Benchmark;
36+
import org.openjdk.jmh.annotations.BenchmarkMode;
37+
import org.openjdk.jmh.annotations.Level;
38+
import org.openjdk.jmh.annotations.Mode;
39+
import org.openjdk.jmh.annotations.Param;
40+
import org.openjdk.jmh.annotations.Setup;
41+
import org.openjdk.jmh.infra.Blackhole;
42+
import org.openjdk.jmh.runner.RunnerException;
43+
44+
import java.util.ArrayList;
45+
import java.util.Collection;
46+
import java.util.Collections;
47+
import java.util.HashMap;
48+
49+
import static org.apache.flink.scheduler.benchmark.scheduling.slot.matching.resolver.SlotMatchingResolverBenchmarkExecutor.SLOTS_PER_TASKS_MANAGER;
50+
import static org.apache.flink.scheduler.benchmark.scheduling.slot.matching.resolver.SlotMatchingResolverBenchmarkExecutor.TASK_MANAGERS;
51+
import static org.apache.flink.scheduler.benchmark.scheduling.slot.matching.resolver.SlotMatchingResolverBenchmarkExecutor.getTaskManagerLocation;
52+
import static org.apache.flink.scheduler.benchmark.scheduling.slot.matching.resolver.SlotMatchingResolverBenchmarkExecutor.newGrainfinedResourceProfile;
53+
54+
/** The executor to drive {@link RequestSlotMatchingStrategy}. */
55+
public class RequestSlotMatchingStrategyBenchmarkExecutor extends SchedulerBenchmarkExecutorBase {
56+
57+
private static final Collection<PhysicalSlot> slots = new ArrayList<>();
58+
private static final Collection<PendingRequest> slotRequests = new ArrayList<>();
59+
60+
static {
61+
// For requested groups and slots.
62+
for (int tmIndex = 0; tmIndex < TASK_MANAGERS; tmIndex++) {
63+
64+
TaskManagerLocation tml = getTaskManagerLocation(tmIndex + 1);
65+
66+
for (int slotIndex = 0; slotIndex < SLOTS_PER_TASKS_MANAGER; slotIndex++) {
67+
ResourceProfile profile = newGrainfinedResourceProfile(slotIndex);
68+
69+
slots.add(new TestingSlot(new AllocationID(), profile, tml));
70+
slotRequests.add(getPendingRequest(slotIndex + 1, slotIndex));
71+
}
72+
}
73+
}
74+
75+
private static PendingRequest getPendingRequest(float loading, int slotIndex) {
76+
return PendingRequest.createNormalRequest(
77+
new SlotRequestId(),
78+
newGrainfinedResourceProfile(slotIndex),
79+
new DefaultLoadingWeight(loading),
80+
Collections.emptyList());
81+
}
82+
83+
@Param({"NONE", "TASKS"})
84+
private TaskManagerLoadBalanceMode taskManagerLoadBalanceMode;
85+
86+
private RequestSlotMatchingStrategy requestSlotMatchingStrategy;
87+
88+
public static void main(String[] args) throws RunnerException {
89+
runBenchmark(RequestSlotMatchingStrategyBenchmarkExecutor.class);
90+
}
91+
92+
@Setup(Level.Trial)
93+
public void setup() throws Exception {
94+
requestSlotMatchingStrategy = getRequestSlotMatchingStrategy();
95+
}
96+
97+
@Benchmark
98+
@BenchmarkMode(Mode.SingleShotTime)
99+
public void runSlotsMatching(Blackhole blackhole) {
100+
blackhole.consume(
101+
requestSlotMatchingStrategy.matchRequestsAndSlots(
102+
slots, slotRequests, new HashMap<>()));
103+
}
104+
105+
private RequestSlotMatchingStrategy getRequestSlotMatchingStrategy() {
106+
switch (taskManagerLoadBalanceMode) {
107+
case TASKS:
108+
this.requestSlotMatchingStrategy =
109+
TasksBalancedRequestSlotMatchingStrategy.INSTANCE;
110+
break;
111+
case NONE:
112+
this.requestSlotMatchingStrategy = SimpleRequestSlotMatchingStrategy.INSTANCE;
113+
break;
114+
default:
115+
throw new UnsupportedOperationException(
116+
String.format(
117+
"Unsupported task manager load balance mode '%s' in %s",
118+
taskManagerLoadBalanceMode, getClass().getName()));
119+
}
120+
return requestSlotMatchingStrategy;
121+
}
122+
}

0 commit comments

Comments
 (0)