Skip to content

vllm.entrypoints.openai.server_utils

AuthenticationMiddleware

Pure ASGI middleware that authenticates each request by checking that the Authorization Bearer token exists and equals any of the configured API keys.

Notes

There are two cases in which authentication is skipped: 1. The HTTP method is OPTIONS. 2. The request path doesn't start with /v1 (e.g. /health).

Source code in vllm/entrypoints/openai/server_utils.py
class AuthenticationMiddleware:
    """
    Pure ASGI middleware that authenticates each request by checking
    if the Authorization Bearer token exists and equals any of the
    configured API keys.

    Notes
    -----
    There are two cases in which authentication is skipped:
        1. The HTTP method is OPTIONS.
        2. The request path doesn't start with /v1 (e.g. /health).
    """

    def __init__(self, app: ASGIApp, tokens: list[str]) -> None:
        """Wrap ``app`` and remember the SHA-256 digest of every accepted token."""
        self.app = app
        # Only digests are stored; they are all the same length, which is
        # what the digest comparison in verify_token() operates on.
        self.api_tokens = [hashlib.sha256(t.encode("utf-8")).digest() for t in tokens]

    def verify_token(self, headers: Headers) -> bool:
        """Return True if ``headers`` carry a Bearer token matching a known key.

        Returns False when the Authorization header is missing or empty, or
        when its scheme is not "bearer" (compared case-insensitively).
        """
        authorization_header_value = headers.get("Authorization")
        if not authorization_header_value:
            return False

        scheme, _, param = authorization_header_value.partition(" ")
        if scheme.lower() != "bearer":
            return False

        param_hash = hashlib.sha256(param.encode("utf-8")).digest()

        # No early exit: every configured token is compared, presumably so the
        # time taken does not reveal which (if any) token matched.
        token_match = False
        for token_hash in self.api_tokens:
            token_match |= secrets.compare_digest(param_hash, token_hash)

        return token_match

    def __call__(self, scope: Scope, receive: Receive, send: Send) -> Awaitable[None]:
        """Forward the request to the wrapped app, or answer 401 Unauthorized.

        Non-HTTP/WebSocket scopes and OPTIONS requests are passed through
        untouched. Other requests are rejected only if their path (relative
        to ``root_path``) starts with /v1 and the token check fails.
        """
        # WebSocket scopes have no "method" key, so it must be read with
        # .get(); indexing would raise KeyError for every websocket connection.
        if scope["type"] not in ("http", "websocket") or scope.get("method") == "OPTIONS":
            # scope["type"] can be "lifespan" or "startup" for example,
            # in which case we don't need to do anything
            return self.app(scope, receive, send)
        root_path = scope.get("root_path", "")
        url_path = URL(scope=scope).path.removeprefix(root_path)
        headers = Headers(scope=scope)
        # Only the /v1 API surface is protected; other paths stay open.
        if url_path.startswith("/v1") and not self.verify_token(headers):
            response = JSONResponse(content={"error": "Unauthorized"}, status_code=401)
            return response(scope, receive, send)
        return self.app(scope, receive, send)

SSEDecoder

Robust Server-Sent Events decoder for streaming responses.

Source code in vllm/entrypoints/openai/server_utils.py
class SSEDecoder:
    """Robust Server-Sent Events decoder for streaming responses.

    Raw byte chunks are fed to ``decode_chunk``, which returns the events
    found on every complete line seen so far. Extracted content can be
    accumulated with ``add_content`` and read back with
    ``get_complete_content``.
    """

    def __init__(self):
        import codecs

        # Decoded text that has not yet been terminated by a newline.
        self.buffer = ""
        # Content fragments collected through add_content().
        self.content_buffer = []
        # Incremental decoder: keeps the leading bytes of a multi-byte UTF-8
        # character when a chunk boundary falls in the middle of it.
        self._utf8_decoder = codecs.getincrementaldecoder("utf-8")()

    def decode_chunk(self, chunk: bytes) -> list[dict]:
        """Decode a chunk of SSE data and return parsed events.

        Each returned event is either ``{"type": "done"}`` (for a
        ``[DONE]`` payload) or ``{"type": "data", "data": <parsed JSON>}``.
        Chunks that are not valid UTF-8 and payloads that are not valid JSON
        are skipped. Text after the last newline is kept for the next call.
        """
        import json

        try:
            chunk_str = self._utf8_decoder.decode(chunk)
        except UnicodeDecodeError:
            # Skip malformed chunks, and forget any partial character that
            # was pending so the next chunk starts from a clean state.
            self._utf8_decoder.reset()
            return []

        # Everything before the final "\n" is a complete line; the remainder
        # (possibly empty) stays buffered until more data arrives.
        *lines, self.buffer = (self.buffer + chunk_str).split("\n")

        events: list[dict] = []
        for line in lines:
            line = line.rstrip("\r")  # Handle CRLF

            # The space after "data:" is optional in the SSE format.
            if not line.startswith("data:"):
                continue

            data_str = line[5:].strip()
            if data_str == "[DONE]":
                events.append({"type": "done"})
            elif data_str:
                try:
                    event_data = json.loads(data_str)
                except json.JSONDecodeError:
                    # Skip malformed JSON
                    continue
                events.append({"type": "data", "data": event_data})

        return events

    def extract_content(self, event_data: dict) -> str:
        """Extract content from event data."""
        return _extract_content_from_chunk(event_data)

    def add_content(self, content: str) -> None:
        """Add content to the buffer; empty content is ignored."""
        if content:
            self.content_buffer.append(content)

    def get_complete_content(self) -> str:
        """Get the complete buffered content as a single string."""
        return "".join(self.content_buffer)

add_content

add_content(content: str) -> None

Add content to the buffer.

Source code in vllm/entrypoints/openai/server_utils.py
def add_content(self, content: str) -> None:
    """Append ``content`` to the content buffer, ignoring empty values."""
    if not content:
        return
    self.content_buffer.append(content)

decode_chunk

decode_chunk(chunk: bytes) -> list[dict]

Decode a chunk of SSE data and return parsed events.

Source code in vllm/entrypoints/openai/server_utils.py
def decode_chunk(self, chunk: bytes) -> list[dict]:
    """Decode a chunk of SSE data and return parsed events.

    Each returned event is either ``{"type": "done"}`` (for a ``[DONE]``
    payload) or ``{"type": "data", "data": <parsed JSON>}``. Chunks that
    are not valid UTF-8 and payloads that are not valid JSON are skipped.
    Text after the last newline stays in ``self.buffer`` for the next call.
    """
    import codecs
    import json

    # An incremental decoder keeps the leading bytes of a multi-byte UTF-8
    # character when a chunk boundary falls in the middle of it. It is
    # created on first use so this method does not depend on __init__.
    decoder = getattr(self, "_utf8_decoder", None)
    if decoder is None:
        decoder = codecs.getincrementaldecoder("utf-8")()
        self._utf8_decoder = decoder

    try:
        chunk_str = decoder.decode(chunk)
    except UnicodeDecodeError:
        # Skip malformed chunks, and forget any pending partial character.
        decoder.reset()
        return []

    # Everything before the final "\n" is a complete line; the remainder
    # (possibly empty) stays buffered until more data arrives.
    *lines, self.buffer = (self.buffer + chunk_str).split("\n")

    events: list[dict] = []
    for line in lines:
        line = line.rstrip("\r")  # Handle CRLF

        # The space after "data:" is optional in the SSE format.
        if not line.startswith("data:"):
            continue

        data_str = line[5:].strip()
        if data_str == "[DONE]":
            events.append({"type": "done"})
        elif data_str:
            try:
                event_data = json.loads(data_str)
            except json.JSONDecodeError:
                # Skip malformed JSON
                continue
            events.append({"type": "data", "data": event_data})

    return events

extract_content

extract_content(event_data: dict) -> str

Extract content from event data.

Source code in vllm/entrypoints/openai/server_utils.py
def extract_content(self, event_data: dict) -> str:
    """Return the content carried by ``event_data``.

    Delegates to the module-level ``_extract_content_from_chunk`` helper.
    """
    content = _extract_content_from_chunk(event_data)
    return content

get_complete_content

get_complete_content() -> str

Get the complete buffered content.

Source code in vllm/entrypoints/openai/server_utils.py
def get_complete_content(self) -> str:
    """Return all buffered content fragments joined into one string."""
    fragments = self.content_buffer
    return "".join(fragments)

XRequestIdMiddleware

Middleware that sets the X-Request-Id header for each response to a random uuid4 (hex) value if the header isn't already present in the request; otherwise, it uses the provided request id.

Source code in vllm/entrypoints/openai/server_utils.py
class XRequestIdMiddleware:
    """
    Middleware that sets the X-Request-Id header on each response.

    The value is the X-Request-Id sent with the request when that header
    is present; otherwise a random uuid4 (hex) value is used.
    """

    def __init__(self, app: ASGIApp) -> None:
        self.app = app

    def __call__(self, scope: Scope, receive: Receive, send: Send) -> Awaitable[None]:
        handles_requests = scope["type"] in ("http", "websocket")
        if not handles_requests:
            # Other scope types are passed straight through.
            return self.app(scope, receive, send)

        # Headers of the incoming request, read by the wrapper below.
        incoming_headers = Headers(scope=scope)

        async def send_with_request_id(message: Message) -> None:
            """
            Replacement send callable: adds X-Request-Id to the headers
            of the response-start message, then forwards every message.
            """
            if message["type"] != "http.response.start":
                await send(message)
                return

            outgoing_headers = MutableHeaders(raw=message["headers"])
            generated_id = uuid.uuid4().hex
            outgoing_headers.append(
                "X-Request-Id",
                incoming_headers.get("X-Request-Id", generated_id),
            )
            await send(message)

        return self.app(scope, receive, send_with_request_id)

_extract_content_from_chunk

_extract_content_from_chunk(chunk_data: dict) -> str

Extract content from a streaming response chunk.

Source code in vllm/entrypoints/openai/server_utils.py
def _extract_content_from_chunk(chunk_data: dict) -> str:
    """Extract content from a streaming response chunk."""
    try:
        from vllm.entrypoints.openai.chat_completion.protocol import (
            ChatCompletionStreamResponse,
        )
        from vllm.entrypoints.openai.completion.protocol import (
            CompletionStreamResponse,
        )

        # Try using Completion types for type-safe parsing
        if chunk_data.get("object") == "chat.completion.chunk":
            chat_response = ChatCompletionStreamResponse.model_validate(chunk_data)
            if chat_response.choices and chat_response.choices[0].delta.content:
                return chat_response.choices[0].delta.content
        elif chunk_data.get("object") == "text_completion":
            completion_response = CompletionStreamResponse.model_validate(chunk_data)
            if completion_response.choices and completion_response.choices[0].text:
                return completion_response.choices[0].text
    except pydantic.ValidationError:
        # Fallback to manual parsing
        if "choices" in chunk_data and chunk_data["choices"]:
            choice = chunk_data["choices"][0]
            if "delta" in choice and choice["delta"].get("content"):
                return choice["delta"]["content"]
            elif choice.get("text"):
                return choice["text"]
    return ""

_log_non_streaming_response

_log_non_streaming_response(response_body: list) -> None

Log non-streaming response.

Source code in vllm/entrypoints/openai/server_utils.py
def _log_non_streaming_response(response_body: list) -> None:
    """Log non-streaming response.

    ``response_body`` is the list of body sections of the response. All
    sections are joined before decoding, so an empty list is logged as an
    empty body instead of raising IndexError, and a body split over several
    sections is logged in full. Bodies that cannot be decoded as text are
    logged as ``<binary_data>``.
    """
    try:
        decoded_body = b"".join(response_body).decode()
        logger.info("response_body={%s}", decoded_body)
    except UnicodeDecodeError:
        logger.info("response_body={<binary_data>}")

_log_streaming_response

_log_streaming_response(
    response, response_body: list
) -> None

Log streaming response with robust SSE parsing.

Source code in vllm/entrypoints/openai/server_utils.py
def _log_streaming_response(response, response_body: list) -> None:
    """Log streaming response with robust SSE parsing.

    Replaces ``response.body_iterator`` with an iterator that re-yields every
    chunk of ``response_body`` unchanged while decoding the chunks as
    Server-Sent Events. When the ``[DONE]`` event is seen, the accumulated
    content (truncated to a fixed number of characters) and the number of
    chunks seen so far are logged once.
    """
    from starlette.concurrency import iterate_in_threadpool

    # Upper bound on the number of content characters written to the log.
    max_logged_chars = 2048

    sse_decoder = SSEDecoder()
    chunk_count = 0

    def buffered_iterator():
        nonlocal chunk_count
        completion_logged = False

        for chunk in response_body:
            chunk_count += 1
            # Forward the chunk before any parsing.
            yield chunk

            if completion_logged:
                # Logging is finished, but any remaining chunks must still
                # reach the client, so keep iterating instead of returning.
                continue

            # Parse SSE events from chunk
            for event in sse_decoder.decode_chunk(chunk):
                if event["type"] == "data":
                    content = sse_decoder.extract_content(event["data"])
                    sse_decoder.add_content(content)
                elif event["type"] == "done":
                    # Log complete content when done
                    full_content = sse_decoder.get_complete_content()
                    if full_content:
                        # Truncate if too long
                        if len(full_content) > max_logged_chars:
                            full_content = (
                                full_content[:max_logged_chars] + "...[truncated]"
                            )
                        logger.info(
                            "response_body={streaming_complete: content=%r, chunks=%d}",
                            full_content,
                            chunk_count,
                        )
                    else:
                        logger.info(
                            "response_body={streaming_complete: no_content, chunks=%d}",
                            chunk_count,
                        )
                    completion_logged = True
                    break

    response.body_iterator = iterate_in_threadpool(buffered_iterator())
    logger.info("response_body={streaming_started: chunks=%d}", len(response_body))

engine_error_handler async

engine_error_handler(
    req: Request, exc: EngineDeadError | EngineGenerateError
)

VLLM V1 AsyncLLM catches exceptions and returns only two types: EngineGenerateError and EngineDeadError.

EngineGenerateError is raised by the per request generate() method. This error could be request specific (and therefore recoverable - e.g. if there is an error in input processing).

EngineDeadError is raised by the background output_handler method. This error is global and therefore not recoverable.

We register these @app.exception_handlers to return nice responses to the end user if they occur and shut down if needed. See https://fastapi.tiangolo.com/tutorial/handling-errors/ for more details on how exception handlers work.

If an exception is encountered in a StreamingResponse generator, the exception is not raised, since we already sent a 200 status. Rather, we send an error message as the next chunk. Since the exception is not raised, this means that the server will not automatically shut down. Instead, we use the watchdog background task to check for the errored state.

Source code in vllm/entrypoints/openai/server_utils.py
async def engine_error_handler(
    req: Request, exc: EngineDeadError | EngineGenerateError
):
    """
    Exception handler for the two error types surfaced by
    VLLM V1 AsyncLLM: EngineGenerateError and EngineDeadError.

    EngineGenerateError is raised by the per request generate()
    method. This error could be request specific (and therefore
    recoverable - e.g. if there is an error in input processing).

    EngineDeadError is raised by the background output_handler
    method. This error is global and therefore not recoverable.

    These handlers are registered as @app.exception_handlers so the
    end user gets a proper response and the server shuts down if needed.
    See https://fastapi.tiangolo.com/tutorial/handling-errors/
    for more details on how exception handlers work.

    If an exception is encountered in a StreamingResponse
    generator, the exception is not raised, since a 200 status has
    already been sent. Rather, an error message is sent as the next
    chunk. Since the exception is not raised, the server will not
    automatically shut down. Instead, the watchdog background task
    is used to check for the errored state.
    """
    app_state = req.app.state

    if app_state.args.log_error_stack:
        # The request id is only available once request metadata has been
        # attached to the request state.
        if hasattr(req.state, "request_metadata"):
            request_id = req.state.request_metadata.request_id
        else:
            request_id = None
        logger.exception("Engine Exception caught. Request id: %s", request_id)

    terminate_if_errored(
        server=app_state.server,
        engine=app_state.engine_client,
    )
    return Response(status_code=HTTPStatus.INTERNAL_SERVER_ERROR)

get_uvicorn_log_config

get_uvicorn_log_config(args: Namespace) -> dict | None

Get the uvicorn log config based on the provided arguments.

Priority: 1. If log_config_file is specified, use it 2. If disable_access_log_for_endpoints is specified, create a config with the access log filter 3. Otherwise, return None (use uvicorn defaults)

Source code in vllm/entrypoints/openai/server_utils.py
def get_uvicorn_log_config(args: Namespace) -> dict | None:
    """
    Build the uvicorn log config from the parsed arguments.

    Resolution order:
    1. A config loaded from ``args.log_config_file`` wins if one is returned.
    2. Otherwise, if ``args.disable_access_log_for_endpoints`` is set, a
       config with the access log filter for those paths is created.
    3. Otherwise None is returned (use uvicorn defaults).
    """
    # A config file takes precedence over everything else.
    file_config = load_log_config(args.log_config_file)
    if file_config is not None:
        return file_config

    endpoints_spec = args.disable_access_log_for_endpoints
    if not endpoints_spec:
        return None

    from vllm.logging_utils import create_uvicorn_log_config

    # The option is a comma-separated string; blank entries are dropped.
    excluded_paths = [
        path for raw_path in endpoints_spec.split(",") if (path := raw_path.strip())
    ]
    return create_uvicorn_log_config(
        excluded_paths=excluded_paths,
        log_level=args.uvicorn_log_level,
    )