diff --git a/llm_bench/README.md b/llm_bench/README.md
index 94d5a79..92fe456 100644
--- a/llm_bench/README.md
+++ b/llm_bench/README.md
@@ -65,6 +65,7 @@ Generation options:
 - `--chat`: specify to call chat API instead of raw completions
 - `--stream`: stream the result back. Enabling this gives "time to first token" and "time per token" metrics
 - (optional) `--logprobs`: corresponds to `logprobs` API parameter. For some providers, it's needed for output token counting in streaming mode.
+- (optional) `--num-queries`: stop sending requests after the specified number of queries has been reached.
 
 ### Writing results
 
diff --git a/llm_bench/load_test.py b/llm_bench/load_test.py
index 81c7209..3554be8 100644
--- a/llm_bench/load_test.py
+++ b/llm_bench/load_test.py
@@ -147,6 +147,8 @@ class InitTracker:
     logging_params = None
     environment = None
     tokenizer = None
+    req_sent = 0
+    req_quit_called = False
 
     @classmethod
     def notify_init(cls, environment, logging_params):
@@ -163,13 +165,18 @@ def notify_init(cls, environment, logging_params):
     @classmethod
     def notify_first_request(cls):
         with cls.lock:
-            if cls.environment.parsed_options.qps is not None and cls.first_request_done == 0:
+            if (
+                cls.environment.parsed_options.qps is not None
+                and cls.first_request_done == 0
+                and cls.environment.parsed_options.num_queries is None
+            ):
                 # if in QPS mode, reset after first successful request comes back
                 cls.reset_stats()
             cls.first_request_done += 1
             if (
                 cls.environment.parsed_options.qps is not None
                 and cls.first_request_done == 0
+                and cls.environment.parsed_options.num_queries is None
                 and cls.users == cls.first_request_done
             ):
                 # if in fixed load mode, reset after all users issued one request (we're in a steady state)
@@ -202,6 +209,44 @@ def load_tokenizer(cls, dir):
         cls.tokenizer.add_bos_token = False
         cls.tokenizer.add_eos_token = False
         return cls.tokenizer
+
+    @classmethod
+    def try_acquire_request(cls):
+        """
+        Thread-safe: attempt to reserve one request slot.
+
+        Returns True if the caller should proceed to send a request (always
+        the case when --num-queries is not set).
+        Returns False once num_queries requests have already been reserved.
+        The first rejected call also starts a single background thread that
+        asks the runner to stop, so that no further requests are issued.
+        """
+        if cls.environment.parsed_options.num_queries is None:
+            return True
+        with cls.lock:
+            if cls.req_sent >= cls.environment.parsed_options.num_queries:
+                if not cls.req_quit_called:
+                    # only the first rejected caller schedules the shutdown
+                    cls.req_quit_called = True
+
+                    def do_quit():
+                        try:
+                            if cls.environment and getattr(cls.environment, "runner", None):
+                                print(f"Reached num-queries={cls.environment.parsed_options.num_queries}, signaling shutdown after in-flight requests finish")
+                                # NOTE(review): confirm the installed locust Runner.stop() accepts
+                                # `graceful` and that `user_count` is assignable; if either raises,
+                                # the except below only prints and the run is not stopped.
+                                cls.environment.runner.stop(graceful=True)
+                                cls.environment.runner.user_count = 0
+                        except Exception as e:
+                            print(f"Failed to quit runner: {e}")
+
+                    # run the stop in a background thread rather than inline in the calling task
+                    t = threading.Thread(target=do_quit, daemon=True)
+                    t.start()
+                return False
+            cls.req_sent += 1
+            return True
 
 
 events.spawning_complete.add_listener(InitTracker.notify_spawning_complete)
@@ -752,6 +797,8 @@ def insert_image_placeholders(self, prompt, num_images, prompt_images_positionin
 
     @task
     def generate_text(self):
+        if not InitTracker.try_acquire_request():
+            return
         max_tokens = self.max_tokens_sampler.sample()
         prompt, images = self._get_input()
         data = self.provider_formatter.format_payload(prompt, max_tokens, images)
@@ -1028,6 +1075,12 @@ def init_parser(parser):
         default="constant",
         help="Must be used with --qps. Specifies how to space out requests: equally ('constant') or by sampling wait times from a distribution ('uniform' or 'exponential'). Expected QPS is going to match --qps",
     )
+    parser.add_argument(
+        "--num-queries",
+        type=int,
+        default=None,
+        help="Stop sending requests after reaching specified number of queries.",
+    )
     parser.add_argument(
         "--burst",
         type=float,