Skip to content

vllm.model_executor.layers.fused_moe.prepare_finalize

Modules:

Name Description
naive_dp_ep

MoEPrepareAndFinalizeNaiveDPEPModular

Bases: FusedMoEPrepareAndFinalizeModular

Naive Prepare/Finalize for the DP/EP case for Modular Kernels.

Uses Torch AR/RS or AR for dispatch/combine operations, applied to the topk weights and ids.

Source code in vllm/model_executor/layers/fused_moe/prepare_finalize/naive_dp_ep.py
class MoEPrepareAndFinalizeNaiveDPEPModular(mk.FusedMoEPrepareAndFinalizeModular):
    """
    Naive Prepare/Finalize for Dp/Ep case for Modular Kernels.

    Uses Torch AR/RS or AR for dispatch/combine operations, applied
    to the topk weights and ids.

    Both steps go through the group returned by `get_ep_group()`:
    `prepare` calls its `dispatch` and `finalize` calls its `combine`.
    """

    def __init__(
        self,
        is_sequence_parallel: bool = False,
        num_dispatchers: int = 1,
    ) -> None:
        """Store the sequence-parallel flag and the dispatcher count."""
        super().__init__()
        # Reported back unchanged by `num_dispatchers()`.
        self._num_dispatchers = num_dispatchers
        # Forwarded to every dispatch/combine call made by this object.
        self.is_sequence_parallel = is_sequence_parallel

    @property
    def activation_format(self) -> mk.FusedMoEActivationFormat:
        """Activation format of this implementation (always `Standard`)."""
        return mk.FusedMoEActivationFormat.Standard

    def max_num_tokens_per_rank(self) -> int | None:
        """Always `None` for this implementation."""
        return None

    def topk_indices_dtype(self) -> torch.dtype | None:
        """Always `None` for this implementation."""
        return None

    def num_dispatchers(self) -> int:
        """Return the dispatcher count given at construction time."""
        return self._num_dispatchers

    def output_is_reduced(self) -> bool:
        """Always `False` for this implementation."""
        return False

    def prepare(
        self,
        a1: torch.Tensor,
        topk_weights: torch.Tensor,
        topk_ids: torch.Tensor,
        num_experts: int,
        expert_map: torch.Tensor | None,
        apply_router_weight_on_input: bool,
        quant_config: FusedMoEQuantConfig,
        defer_input_quant: bool = False,
    ) -> mk.PrepareResultType:
        """Quantize and Dispatch Topk Weights and Topk Ids.

        When `apply_router_weight_on_input` is set, `a1` is first multiplied
        by `topk_weights` (only allowed when dim 1 of `topk_ids` has size 1).
        The activations are then handed to `_quantize_and_setup_dispatch` and
        dispatched together with the topk weights and ids.

        `num_experts` and `expert_map` are accepted but not read here.

        Returns:
            A 5-tuple of the dispatched activations, their scale (or `None`
            when `_quantize_and_setup_dispatch` produced no scales), `None`,
            the dispatched topk ids and the dispatched topk weights.
        """
        if apply_router_weight_on_input:
            num_selected = topk_ids.size(1)
            assert num_selected == 1, (
                "apply_router_weight_on_input is only implemented for topk=1"
            )
            # Deliberately out-of-place: an in-place multiply must not be
            # used because of shared experts overlap.
            a1 = a1 * topk_weights.to(a1.dtype)

        quantized, extra = _quantize_and_setup_dispatch(
            a1, quant_config, defer_input_quant
        )

        dispatched = get_ep_group().dispatch(
            quantized,
            topk_weights,
            topk_ids,
            is_sequence_parallel=self.is_sequence_parallel,
            extra_tensors=extra,
        )

        # The dispatch result carries one extra entry only when scales were
        # passed along as extra tensors.
        if extra is not None:
            quantized, topk_weights, topk_ids, extra = dispatched
            quantized_scale = _unwrap_scale_and_prepare_for_moe(extra, quant_config)
            return quantized, quantized_scale, None, topk_ids, topk_weights

        quantized, topk_weights, topk_ids = dispatched
        return quantized, None, None, topk_ids, topk_weights

    def finalize(
        self,
        output: torch.Tensor,
        fused_expert_output: torch.Tensor,
        topk_weights: torch.Tensor,
        topk_ids: torch.Tensor,
        apply_router_weight_on_input: bool,
        weight_and_reduce_impl: mk.TopKWeightAndReduce,
    ) -> None:
        """Weight/reduce the expert output, combine it and write it to `output`.

        A `TopKWeightAndReduceDelegate` is replaced by a fresh
        `TopKWeightAndReduceContiguous` before it is applied. The reduced
        tensor is passed through the EP group's `combine` and the result is
        copied into `output` in place; nothing is returned.
        """
        reducer = weight_and_reduce_impl
        if isinstance(reducer, TopKWeightAndReduceDelegate):
            reducer = TopKWeightAndReduceContiguous()

        # `output=None`: the value returned by `apply` is what gets combined.
        reduced = reducer.apply(
            output=None,
            fused_expert_output=fused_expert_output,
            topk_weights=topk_weights,
            topk_ids=topk_ids,
            apply_router_weight_on_input=apply_router_weight_on_input,
        )

        combined = get_ep_group().combine(
            reduced, is_sequence_parallel=self.is_sequence_parallel
        )
        output.copy_(combined)

prepare

prepare(
    a1: Tensor,
    topk_weights: Tensor,
    topk_ids: Tensor,
    num_experts: int,
    expert_map: Tensor | None,
    apply_router_weight_on_input: bool,
    quant_config: FusedMoEQuantConfig,
    defer_input_quant: bool = False,
) -> PrepareResultType

Quantize the input activations `a1` (via `_quantize_and_setup_dispatch`) and dispatch them together with the top-k weights and top-k ids.

Source code in vllm/model_executor/layers/fused_moe/prepare_finalize/naive_dp_ep.py
def prepare(
    self,
    a1: torch.Tensor,
    topk_weights: torch.Tensor,
    topk_ids: torch.Tensor,
    num_experts: int,
    expert_map: torch.Tensor | None,
    apply_router_weight_on_input: bool,
    quant_config: FusedMoEQuantConfig,
    defer_input_quant: bool = False,
) -> mk.PrepareResultType:
    """Quantize and Dispatch Topk Weights and Topk Ids.

    When `apply_router_weight_on_input` is set, `a1` is first multiplied by
    `topk_weights` (only allowed when dim 1 of `topk_ids` has size 1). The
    activations then go through `_quantize_and_setup_dispatch` and are
    dispatched, together with the topk weights and ids, via the group
    returned by `get_ep_group()`.

    `num_experts` and `expert_map` are accepted but not read here.

    Returns:
        A 5-tuple of the dispatched activations, their scale (or `None` when
        no scales were produced), `None`, the dispatched topk ids and the
        dispatched topk weights.
    """
    if apply_router_weight_on_input:
        experts_per_token = topk_ids.size(1)
        assert experts_per_token == 1, (
            "apply_router_weight_on_input is only implemented for topk=1"
        )
        # Out-of-place on purpose: in-place must not be used because of
        # shared experts overlap.
        weights_in_act_dtype = topk_weights.to(a1.dtype)
        a1 = a1 * weights_in_act_dtype

    act_q, act_scales = _quantize_and_setup_dispatch(
        a1, quant_config, defer_input_quant
    )

    ep_group = get_ep_group()
    dispatched = ep_group.dispatch(
        act_q,
        topk_weights,
        topk_ids,
        is_sequence_parallel=self.is_sequence_parallel,
        extra_tensors=act_scales,
    )

    # Without scales the dispatch result has three entries, otherwise four.
    if act_scales is None:
        act_q, topk_weights, topk_ids = dispatched
        return act_q, None, None, topk_ids, topk_weights

    act_q, topk_weights, topk_ids, act_scales = dispatched
    prepared_scale = _unwrap_scale_and_prepare_for_moe(act_scales, quant_config)
    return act_q, prepared_scale, None, topk_ids, topk_weights

MoEPrepareAndFinalizeNaiveDPEPMonolithic

Bases: FusedMoEPrepareAndFinalizeMonolithic

Naive Prepare/Finalize for the DP/EP case for Modular Kernels.

Uses Torch AR/RS or AR for dispatch/combine operations, applied to the router logits (the MoE kernel runs the router internally).

Source code in vllm/model_executor/layers/fused_moe/prepare_finalize/naive_dp_ep.py
class MoEPrepareAndFinalizeNaiveDPEPMonolithic(mk.FusedMoEPrepareAndFinalizeMonolithic):
    """
    Naive Prepare/Finalize for Dp/Ep case for Modular Kernels.

    Uses Torch AR/RS or AR for dispatch/combine operations, applied
    to the router logits (the MoE kernel runs the router internally).

    Both steps go through the group returned by `get_ep_group()`:
    `prepare` calls its `dispatch_router_logits` and `finalize` calls its
    `combine`.
    """

    def __init__(
        self,
        is_sequence_parallel: bool = False,
        num_dispatchers: int = 1,
    ) -> None:
        """Store the sequence-parallel flag and the dispatcher count."""
        super().__init__()
        # Reported back unchanged by `num_dispatchers()`.
        self._num_dispatchers = num_dispatchers
        # Forwarded to every dispatch/combine call made by this object.
        self.is_sequence_parallel = is_sequence_parallel

    @property
    def activation_format(self) -> mk.FusedMoEActivationFormat:
        """Activation format of this implementation (always `Standard`)."""
        return mk.FusedMoEActivationFormat.Standard

    def max_num_tokens_per_rank(self) -> int | None:
        """Always `None` for this implementation."""
        return None

    def topk_indices_dtype(self) -> torch.dtype | None:
        """Always `None` for this implementation."""
        return None

    def num_dispatchers(self) -> int:
        """Return the dispatcher count given at construction time."""
        return self._num_dispatchers

    def output_is_reduced(self) -> bool:
        """Always `False` for this implementation."""
        return False

    def prepare(
        self,
        a1: torch.Tensor,
        router_logits: torch.Tensor,
        quant_config: FusedMoEQuantConfig,
        defer_input_quant: bool = False,
    ) -> mk.PrepareMonolithicResultType:
        """Quantize and Dispatch Router Logits.

        `a1` goes through `_quantize_and_setup_dispatch` and is then
        dispatched together with `router_logits`.

        Returns:
            A 3-tuple of the dispatched activations, their scale (or `None`
            when `_quantize_and_setup_dispatch` produced no scales) and the
            dispatched router logits.
        """
        quantized, extra = _quantize_and_setup_dispatch(
            a1, quant_config, defer_input_quant
        )

        dispatched = get_ep_group().dispatch_router_logits(
            quantized,
            router_logits,
            is_sequence_parallel=self.is_sequence_parallel,
            extra_tensors=extra,
        )

        # The dispatch result carries one extra entry only when scales were
        # passed along as extra tensors.
        if extra is not None:
            quantized, router_logits, extra = dispatched
            quantized_scale = _unwrap_scale_and_prepare_for_moe(extra, quant_config)
            return quantized, quantized_scale, router_logits

        quantized, router_logits = dispatched
        return quantized, None, router_logits

    def finalize(
        self,
        fused_expert_output: torch.Tensor,
    ) -> torch.Tensor:
        """Return `fused_expert_output` after the EP group's `combine`."""
        return get_ep_group().combine(
            fused_expert_output, is_sequence_parallel=self.is_sequence_parallel
        )

prepare

prepare(
    a1: Tensor,
    router_logits: Tensor,
    quant_config: FusedMoEQuantConfig,
    defer_input_quant: bool = False,
) -> PrepareMonolithicResultType

Quantize the input activations `a1` (via `_quantize_and_setup_dispatch`) and dispatch them together with the router logits.

Source code in vllm/model_executor/layers/fused_moe/prepare_finalize/naive_dp_ep.py
def prepare(
    self,
    a1: torch.Tensor,
    router_logits: torch.Tensor,
    quant_config: FusedMoEQuantConfig,
    defer_input_quant: bool = False,
) -> mk.PrepareMonolithicResultType:
    """Quantize and Dispatch Router Logits.

    `a1` goes through `_quantize_and_setup_dispatch` and is then dispatched,
    together with `router_logits`, via the group returned by
    `get_ep_group()`.

    Returns:
        A 3-tuple of the dispatched activations, their scale (or `None` when
        no scales were produced) and the dispatched router logits.
    """
    act_q, act_scales = _quantize_and_setup_dispatch(
        a1, quant_config, defer_input_quant
    )

    ep_group = get_ep_group()
    dispatched = ep_group.dispatch_router_logits(
        act_q,
        router_logits,
        is_sequence_parallel=self.is_sequence_parallel,
        extra_tensors=act_scales,
    )

    # Without scales the dispatch result has two entries, otherwise three.
    if act_scales is None:
        act_q, router_logits = dispatched
        return act_q, None, router_logits

    act_q, router_logits, act_scales = dispatched
    prepared_scale = _unwrap_scale_and_prepare_for_moe(act_scales, quant_config)
    return act_q, prepared_scale, router_logits