51 changes: 0 additions & 51 deletions tests/entrypoints/openai/test_basic.py
@@ -1,8 +1,6 @@
-import asyncio
 from http import HTTPStatus
 from typing import List
 
-import openai
 import pytest
 import pytest_asyncio
 import requests
@@ -105,52 +103,3 @@ async def test_check_health(server: RemoteOpenAIServer):
     response = requests.get(server.url_for("health"))
 
     assert response.status_code == HTTPStatus.OK
-
-
-@pytest.mark.parametrize(
-    "server_args",
-    [
-        pytest.param(["--max-model-len", "10100"],
-                     id="default-frontend-multiprocessing"),
-        pytest.param(
-            ["--disable-frontend-multiprocessing", "--max-model-len", "10100"],
-            id="disable-frontend-multiprocessing")
-    ],
-    indirect=True,
-)
-@pytest.mark.asyncio
-async def test_request_cancellation(server: RemoteOpenAIServer):
-    # clunky test: send an ungodly amount of load in with short timeouts
-    # then ensure that it still responds quickly afterwards
-
-    chat_input = [{"role": "user", "content": "Write a long story"}]
-    client = server.get_async_client(timeout=0.5)
-    tasks = []
-    # Request about 2 million tokens
-    for _ in range(200):
-        task = asyncio.create_task(
-            client.chat.completions.create(messages=chat_input,
-                                           model=MODEL_NAME,
-                                           max_tokens=10000,
-                                           extra_body={"min_tokens": 10000}))
-        tasks.append(task)
-
-    done, pending = await asyncio.wait(tasks,
-                                       return_when=asyncio.ALL_COMPLETED)
-
-    # Make sure all requests were sent to the server and timed out
-    # (We don't want to hide other errors like 400s that would invalidate this
-    # test)
-    assert len(pending) == 0
-    for d in done:
-        with pytest.raises(openai.APITimeoutError):
-            d.result()
-
-    # If the server had not cancelled all the other requests, then it would not
-    # be able to respond to this one within the timeout
-    client = server.get_async_client(timeout=5)
-    response = await client.chat.completions.create(messages=chat_input,
-                                                    model=MODEL_NAME,
-                                                    max_tokens=10)
-
-    assert len(response.choices) == 1
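
Note: the deleted test asserted that, under heavy load with a 0.5-second client timeout, every flooded request surfaces as openai.APITimeoutError while the server cancels the work and stays responsive. A minimal sketch of that assertion pattern, assuming a running OpenAI-compatible server (the base_url, api_key, and model name below are placeholders):

import openai
import pytest

# Placeholder endpoint and model; substitute your own deployment.
client = openai.AsyncOpenAI(base_url="http://localhost:8000/v1",
                            api_key="EMPTY",
                            timeout=0.5,
                            max_retries=0)

@pytest.mark.asyncio
async def test_short_timeout_raises():
    # A long generation cannot finish within 0.5 s, so the client
    # raises openai.APITimeoutError.
    with pytest.raises(openai.APITimeoutError):
        await client.chat.completions.create(
            messages=[{"role": "user", "content": "Write a long story"}],
            model="placeholder-model",
            max_tokens=10000)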
6 changes: 5 additions & 1 deletion tests/test_utils.py
@@ -1,6 +1,7 @@
 import asyncio
 import os
 import socket
+from functools import partial
 from typing import AsyncIterator, Tuple
 
 import pytest
@@ -25,7 +26,10 @@ async def mock_async_iterator(idx: int):
         print(f"iterator {idx} cancelled")
 
     iterators = [mock_async_iterator(i) for i in range(3)]
-    merged_iterator = merge_async_iterators(*iterators)
+    merged_iterator = merge_async_iterators(*iterators,
+                                            is_cancelled=partial(asyncio.sleep,
+                                                                 0,
+                                                                 result=False))
 
     async def stream_output(generator: AsyncIterator[Tuple[int, str]]):
         async for idx, output in generator:
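
Note: partial(asyncio.sleep, 0, result=False) in the updated test builds a zero-argument awaitable that always answers "not cancelled". A self-contained illustration of the idiom:

import asyncio
from functools import partial

# Awaiting this yields control to the event loop once (a 0-second
# sleep) and then returns False, i.e. "the request is not cancelled".
never_cancelled = partial(asyncio.sleep, 0, result=False)

async def main() -> None:
    assert await never_cancelled() is False

asyncio.run(main())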
11 changes: 6 additions & 5 deletions tests/utils.py
@@ -163,11 +163,12 @@ def get_client(self):
             api_key=self.DUMMY_API_KEY,
         )
 
-    def get_async_client(self, **kwargs):
-        return openai.AsyncOpenAI(base_url=self.url_for("v1"),
-                                  api_key=self.DUMMY_API_KEY,
-                                  max_retries=0,
-                                  **kwargs)
+    def get_async_client(self):
+        return openai.AsyncOpenAI(
+            base_url=self.url_for("v1"),
+            api_key=self.DUMMY_API_KEY,
+            max_retries=0,
+        )
 
 
 def _test_completion(
46 changes: 19 additions & 27 deletions vllm/engine/async_llm_engine.py
@@ -1065,20 +1065,16 @@ async def generate(
         >>> # Process and return the final output
         >>> ...
         """
-        try:
-            async for output in await self.add_request(
-                request_id,
-                prompt,
-                sampling_params,
-                lora_request=lora_request,
-                trace_headers=trace_headers,
-                prompt_adapter_request=prompt_adapter_request,
-                priority=priority,
-            ):
-                yield LLMEngine.validate_output(output, RequestOutput)
-        except asyncio.CancelledError:
-            await self.abort(request_id)
-            raise
+        async for output in await self.add_request(
+            request_id,
+            prompt,
+            sampling_params,
+            lora_request=lora_request,
+            trace_headers=trace_headers,
+            prompt_adapter_request=prompt_adapter_request,
+            priority=priority,
+        ):
+            yield LLMEngine.validate_output(output, RequestOutput)
 
     async def encode(
         self,
@@ -1151,19 +1147,15 @@ async def encode(
         >>> # Process and return the final output
         >>> ...
         """
-        try:
-            async for output in await self.add_request(
-                request_id,
-                prompt,
-                pooling_params,
-                lora_request=lora_request,
-                trace_headers=trace_headers,
-                priority=priority,
-            ):
-                yield LLMEngine.validate_output(output, PoolingRequestOutput)
-        except asyncio.CancelledError:
-            await self.abort(request_id)
-            raise
+        async for output in await self.add_request(
+            request_id,
+            prompt,
+            pooling_params,
+            lora_request=lora_request,
+            trace_headers=trace_headers,
+            priority=priority,
+        ):
+            yield LLMEngine.validate_output(output, PoolingRequestOutput)
 
     async def abort(self, request_id: str) -> None:
         """Abort a request.
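
Note: dropping the try/except means cancellation of the consuming task no longer triggers an engine-side abort at this layer; disconnect handling moves to the iterate_with_cancellation and merge_async_iterators wrappers in the entrypoints below. The removed pattern, reduced to a standalone sketch (names here are illustrative):

import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable

async def yield_with_abort(
    stream: AsyncIterator[Any],
    abort: Callable[[str], Awaitable[None]],
    request_id: str,
) -> AsyncIterator[Any]:
    # Forward engine outputs; if the consumer is cancelled (e.g. the
    # HTTP client went away), tell the engine to abort the request,
    # then re-raise so the cancellation still propagates.
    try:
        async for output in stream:
            yield output
    except asyncio.CancelledError:
        await abort(request_id)
        raise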
11 changes: 4 additions & 7 deletions vllm/entrypoints/api_server.py
@@ -17,11 +17,11 @@
 from vllm.engine.arg_utils import AsyncEngineArgs
 from vllm.engine.async_llm_engine import AsyncLLMEngine
 from vllm.entrypoints.launcher import serve_http
-from vllm.entrypoints.utils import with_cancellation
 from vllm.logger import init_logger
 from vllm.sampling_params import SamplingParams
 from vllm.usage.usage_lib import UsageContext
-from vllm.utils import FlexibleArgumentParser, random_uuid
+from vllm.utils import (FlexibleArgumentParser, iterate_with_cancellation,
+                        random_uuid)
 from vllm.version import __version__ as VLLM_VERSION
 
 logger = init_logger("vllm.entrypoints.api_server")
@@ -47,18 +47,15 @@ async def generate(request: Request) -> Response:
     - other fields: the sampling parameters (See `SamplingParams` for details).
     """
     request_dict = await request.json()
-    return await _generate(request_dict, raw_request=request)
-
-
-@with_cancellation
-async def _generate(request_dict: dict, raw_request: Request) -> Response:
     prompt = request_dict.pop("prompt")
     stream = request_dict.pop("stream", False)
     sampling_params = SamplingParams(**request_dict)
     request_id = random_uuid()
 
     assert engine is not None
     results_generator = engine.generate(prompt, sampling_params, request_id)
+    results_generator = iterate_with_cancellation(
+        results_generator, is_cancelled=request.is_disconnected)
 
     # Streaming case
     async def stream_results() -> AsyncGenerator[bytes, None]:
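
Note: iterate_with_cancellation wraps a result stream so it stops once the client disconnects. A simplified sequential sketch of the contract (the real vLLM helper awaits the stream and the probe concurrently; this version only conveys the semantics):

import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable

async def iterate_with_cancellation_sketch(
    iterator: AsyncIterator[Any],
    is_cancelled: Callable[[], Awaitable[bool]],
) -> AsyncIterator[Any]:
    # Check the probe between items; raise CancelledError so the caller
    # can tear down the underlying request.
    async for item in iterator:
        if await is_cancelled():
            raise asyncio.CancelledError("client disconnected")
        yield item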
8 changes: 0 additions & 8 deletions vllm/entrypoints/openai/api_server.py
@@ -59,7 +59,6 @@
 from vllm.entrypoints.openai.serving_tokenization import (
     OpenAIServingTokenization)
 from vllm.entrypoints.openai.tool_parsers import ToolParserManager
-from vllm.entrypoints.utils import with_cancellation
 from vllm.logger import init_logger
 from vllm.usage.usage_lib import UsageContext
 from vllm.utils import (FlexibleArgumentParser, get_open_zmq_ipc_path,
@@ -312,7 +311,6 @@ async def health(raw_request: Request) -> Response:


 @router.post("/tokenize")
-@with_cancellation
 async def tokenize(request: TokenizeRequest, raw_request: Request):
     handler = tokenization(raw_request)

@@ -327,7 +325,6 @@ async def tokenize(request: TokenizeRequest, raw_request: Request):


 @router.post("/detokenize")
-@with_cancellation
 async def detokenize(request: DetokenizeRequest, raw_request: Request):
     handler = tokenization(raw_request)

@@ -356,7 +353,6 @@ async def show_version():


 @router.post("/v1/chat/completions")
-@with_cancellation
 async def create_chat_completion(request: ChatCompletionRequest,
                                  raw_request: Request):
     handler = chat(raw_request)
@@ -377,7 +373,6 @@ async def create_chat_completion(request: ChatCompletionRequest,


 @router.post("/v1/completions")
-@with_cancellation
 async def create_completion(request: CompletionRequest, raw_request: Request):
     handler = completion(raw_request)
     if handler is None:
@@ -395,7 +390,6 @@ async def create_completion(request: CompletionRequest, raw_request: Request):


 @router.post("/v1/embeddings")
-@with_cancellation
 async def create_embedding(request: EmbeddingRequest, raw_request: Request):
     handler = embedding(raw_request)
     if handler is None:
@@ -413,7 +407,6 @@ async def create_embedding(request: EmbeddingRequest, raw_request: Request):


 @router.post("/score")
-@with_cancellation
 async def create_score(request: ScoreRequest, raw_request: Request):
     handler = score(raw_request)
     if handler is None:
@@ -431,7 +424,6 @@ async def create_score(request: ScoreRequest, raw_request: Request):


 @router.post("/v1/score")
-@with_cancellation
 async def create_score_v1(request: ScoreRequest, raw_request: Request):
     logger.warning(
         "To indicate that Score API is not part of standard OpenAI API, we "
5 changes: 5 additions & 0 deletions vllm/entrypoints/openai/serving_chat.py
Expand Up @@ -32,6 +32,7 @@
 from vllm.sequence import Logprob
 from vllm.transformers_utils.tokenizer import AnyTokenizer, MistralTokenizer
 from vllm.transformers_utils.tokenizers import maybe_serialize_tool_calls
+from vllm.utils import iterate_with_cancellation
 
 logger = init_logger(__name__)
 
@@ -233,6 +234,10 @@ async def create_chat_completion(
         assert len(generators) == 1
         result_generator, = generators
 
+        if raw_request:
+            result_generator = iterate_with_cancellation(
+                result_generator, raw_request.is_disconnected)
+
         # Streaming response
         if request.stream:
             return self.chat_completion_stream_generator(
3 changes: 2 additions & 1 deletion vllm/entrypoints/openai/serving_completion.py
@@ -159,7 +159,8 @@ async def create_completion(
             # TODO: Use a vllm-specific Validation Error
             return self.create_error_response(str(e))
 
-        result_generator = merge_async_iterators(*generators)
+        result_generator = merge_async_iterators(
+            *generators, is_cancelled=raw_request.is_disconnected)
 
         model_name = self._get_model_name(lora_request)
         num_prompts = len(engine_prompts)
5 changes: 4 additions & 1 deletion vllm/entrypoints/openai/serving_embedding.py
@@ -202,7 +202,10 @@ async def create_embedding(
             # TODO: Use a vllm-specific Validation Error
             return self.create_error_response(str(e))
 
-        result_generator = merge_async_iterators(*generators)
+        result_generator = merge_async_iterators(
+            *generators,
+            is_cancelled=raw_request.is_disconnected if raw_request else None,
+        )
 
         num_prompts = len(engine_prompts)
 
5 changes: 4 additions & 1 deletion vllm/entrypoints/openai/serving_score.py
@@ -186,7 +186,10 @@ async def create_score(
             # TODO: Use a vllm-specific Validation Error
             return self.create_error_response(str(e))
 
-        result_generator = merge_async_iterators(*generators)
+        result_generator = merge_async_iterators(
+            *generators,
+            is_cancelled=raw_request.is_disconnected if raw_request else None,
+        )
 
         num_prompts = len(engine_prompts)
 
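
Note: merge_async_iterators fans several per-prompt streams into one, tagging each item with the index of its source iterator, and here also takes an optional is_cancelled probe. A simplified sketch of the semantics (not vLLM's exact implementation):

import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Optional, Tuple

async def merge_async_iterators_sketch(
    *iterators: AsyncIterator[Any],
    is_cancelled: Optional[Callable[[], Awaitable[bool]]] = None,
) -> AsyncIterator[Tuple[int, Any]]:
    # Drain every iterator into one queue, tagging items with the index
    # of the iterator they came from; a sentinel per iterator signals
    # exhaustion.
    queue: "asyncio.Queue[Any]" = asyncio.Queue()
    _DONE = object()

    async def drain(i: int, it: AsyncIterator[Any]) -> None:
        try:
            async for item in it:
                await queue.put((i, item))
        finally:
            await queue.put(_DONE)

    tasks = [asyncio.create_task(drain(i, it))
             for i, it in enumerate(iterators)]
    remaining = len(tasks)
    try:
        while remaining:
            if is_cancelled is not None and await is_cancelled():
                raise asyncio.CancelledError("client disconnected")
            entry = await queue.get()
            if entry is _DONE:
                remaining -= 1
            else:
                yield entry
    finally:
        for task in tasks:
            task.cancel()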
57 changes: 0 additions & 57 deletions vllm/entrypoints/utils.py

This file was deleted.
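Note: this module provided the with_cancellation decorator removed from the endpoints above. Its verbatim contents are not shown in this diff; the following is only a plausible reconstruction of such a decorator: run the handler as a task, watch raw_request.is_disconnected(), and cancel the handler if the client goes away first.

import asyncio
import functools

def with_cancellation(handler):
    # Plausible sketch only, not the deleted file's verbatim code.

    @functools.wraps(handler)
    async def wrapper(*args, raw_request, **kwargs):
        handler_task = asyncio.create_task(
            handler(*args, raw_request=raw_request, **kwargs))

        async def watch_disconnect():
            while not await raw_request.is_disconnected():
                await asyncio.sleep(0.3)

        watcher_task = asyncio.create_task(watch_disconnect())
        done, pending = await asyncio.wait(
            {handler_task, watcher_task},
            return_when=asyncio.FIRST_COMPLETED)
        for task in pending:
            task.cancel()
        if handler_task in done:
            return handler_task.result()
        return None  # Client disconnected; no one is waiting for a reply.

    return wrapper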
