Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions fastdeploy/engine/async_llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ async def add_request(
)
if envs.ZMQ_SEND_BATCH_DATA and self.connection_manager is not None:
request["zmq_worker_pid"] = self.connection_manager.worker_pid
if not envs.ENABLE_V1_DATA_PROCESSOR and self.cfg.model_config.enable_mm:
if self.cfg.model_config.enable_mm:
self.request_client.send_pyobj(request)
else:
self.request_client.send_json(request)
Expand Down Expand Up @@ -543,8 +543,7 @@ async def generate(
)
else:
processed_output = response_item
if not envs.ENABLE_V1_DATA_PROCESSOR:
processed_output = RequestOutput.from_dict(processed_output)
processed_output = RequestOutput.from_dict(processed_output)
# Enrich outputs with prompt metadata on the first packet
if req_id:
prompt_meta = self._prompt_metadata.get(req_id)
Expand Down
5 changes: 2 additions & 3 deletions fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -1165,7 +1165,7 @@ def _insert_zmq_task_to_scheduler(self):
while self.running:
try:
block = True if len(added_requests) == 0 else False
if not self.cfg.model_config.enable_mm and not envs.ENABLE_V1_DATA_PROCESSOR:
if not self.cfg.model_config.enable_mm:
err, data = self.recv_request_server.receive_json_once(block)
else:
err, data = self.recv_request_server.receive_pyobj_once(block)
Expand Down Expand Up @@ -1222,8 +1222,7 @@ def _insert_zmq_task_to_scheduler(self):
continue
err_msg = None
try:
if not envs.ENABLE_V1_DATA_PROCESSOR:
request = Request.from_dict(data)
request = Request.from_dict(data)
request.metrics.scheduler_recv_req_time = time.time()
main_process_metrics.requests_number.inc()
trace_carrier = data.get("trace_carrier")
Expand Down
4 changes: 2 additions & 2 deletions fastdeploy/entrypoints/engine_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ async def add_requests(self, task):
def _send_task(self, task):
if envs.ZMQ_SEND_BATCH_DATA:
task["zmq_worker_pid"] = self.worker_pid
if not self.enable_mm and not envs.ENABLE_V1_DATA_PROCESSOR:
if not self.enable_mm:
self.zmq_client.send_json(task)
else:
if envs.FD_ENABLE_E2W_TENSOR_CONVERT:
Expand Down Expand Up @@ -599,7 +599,7 @@ async def run_control_method(self, request: ControlRequest):
req_dict = request.to_dict()
if envs.ZMQ_SEND_BATCH_DATA:
req_dict["zmq_worker_pid"] = self.worker_pid
if not self.enable_mm and not envs.ENABLE_V1_DATA_PROCESSOR:
if not self.enable_mm:
self.zmq_client.send_json(req_dict)
else:
self.zmq_client.send_pyobj(req_dict)
Expand Down
7 changes: 2 additions & 5 deletions fastdeploy/entrypoints/openai/serving_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

import fastdeploy.envs as envs
import fastdeploy.metrics.trace as tracing
from fastdeploy.engine.request import Request, RequestOutput
from fastdeploy.engine.request import RequestOutput
from fastdeploy.entrypoints.openai.protocol import (
ChatCompletionRequest,
ChatCompletionResponse,
Expand Down Expand Up @@ -145,10 +145,7 @@ async def create_chat_completion(self, request: ChatCompletionRequest):
prompt_tokens = None
max_tokens = None
try:
if not envs.ENABLE_V1_DATA_PROCESSOR:
current_req_dict = request.to_dict_for_infer(f"{request_id}_0")
else:
current_req_dict = Request.from_generic_request(request, request_id=f"{request_id}_0")
current_req_dict = request.to_dict_for_infer(f"{request_id}_0")
if "chat_template" not in current_req_dict:
current_req_dict["chat_template"] = self.chat_template
current_req_dict["metrics"]["arrival_time"] = time.time()
Expand Down
7 changes: 2 additions & 5 deletions fastdeploy/entrypoints/openai/serving_completion.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

import fastdeploy.envs as envs
import fastdeploy.metrics.trace as tracing
from fastdeploy.engine.request import Request, RequestOutput
from fastdeploy.engine.request import RequestOutput
from fastdeploy.entrypoints.openai.protocol import (
CompletionLogprobs,
CompletionRequest,
Expand Down Expand Up @@ -178,10 +178,7 @@ async def create_completion(self, request: CompletionRequest):
try:
for idx, prompt in enumerate(request_prompts):
request_id_idx = f"{request_id}_{idx}"
if not envs.ENABLE_V1_DATA_PROCESSOR:
current_req_dict = request.to_dict_for_infer(request_id_idx, prompt)
else:
current_req_dict = Request.from_generic_request(request, request_id=f"{request_id}_0")
current_req_dict = request.to_dict_for_infer(request_id_idx, prompt)
current_req_dict["metrics"]["arrival_time"] = time.time()
prompt_token_ids = await self.engine_client.format_and_add_data(current_req_dict) # tokenize
if isinstance(prompt_token_ids, np.ndarray):
Expand Down
29 changes: 7 additions & 22 deletions fastdeploy/entrypoints/openai/serving_embedding.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,17 @@
"""

import base64
import time
from collections.abc import AsyncGenerator
from typing import Literal, Union

import numpy as np
from typing_extensions import assert_never, override

import fastdeploy.envs as envs
from fastdeploy.engine.pooling_params import PoolingParams
from fastdeploy.engine.request import (
EmbeddingOutput,
EmbeddingRequestOutput,
PoolingRequestOutput,
Request,
)
from fastdeploy.entrypoints.openai.protocol import (
EmbeddingCompletionRequest,
Expand Down Expand Up @@ -69,25 +66,13 @@ def __init__(self, engine_client, models, cfg, pid, ips, max_waiting_time, chat_
@override
def _request_to_dict(self, ctx: ServeContext):
request: EmbeddingRequest = ctx.request
if not envs.ENABLE_V1_DATA_PROCESSOR:
request_dict = super()._request_to_dict(ctx)
if hasattr(request, "to_pooling_params"):
pooling_params: PoolingParams = request.to_pooling_params()
pooling_params.verify("embed", self.cfg.model_config)
request_dict["pooling_params"] = pooling_params.to_dict()
request_dict["metrics"] = {}
return request_dict
else:
request_obj = None
if hasattr(request, "to_pooling_params"):
pooling_params: PoolingParams = request.to_pooling_params()
pooling_params.verify("embed", self.cfg.model_config)
request_obj = Request.from_generic_request(
req=request, request_id=ctx.request_id, pooling_params=pooling_params
)
request_obj.metrics.arrival_time = time.time()
super()._process_chat_template_kwargs(request_obj)
return request_obj
request_dict = super()._request_to_dict(ctx)
if hasattr(request, "to_pooling_params"):
pooling_params: PoolingParams = request.to_pooling_params()
pooling_params.verify("embed", self.cfg.model_config)
request_dict["pooling_params"] = pooling_params.to_dict()
request_dict["metrics"] = {}
return request_dict

@override
def _request_to_batch_dicts(self, ctx: ServeContext):
Expand Down
30 changes: 8 additions & 22 deletions fastdeploy/entrypoints/openai/serving_reward.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@
# limitations under the License.
"""

import time
from collections.abc import AsyncGenerator

from typing_extensions import override

import fastdeploy.envs as envs
from fastdeploy.engine.pooling_params import PoolingParams
from fastdeploy.engine.request import PoolingRequestOutput, Request, RewardRequestOutput
from fastdeploy.engine.request import PoolingRequestOutput, RewardRequestOutput
from fastdeploy.entrypoints.openai.protocol import (
ChatRewardData,
ChatRewardRequest,
Expand All @@ -46,25 +44,13 @@ def __init__(self, engine_client, models, cfg, pid, ips, max_waiting_time, chat_
@override
def _request_to_dict(self, ctx: ServeContext):
request: ChatRewardRequest = ctx.request
if not envs.ENABLE_V1_DATA_PROCESSOR:
request_dict = super()._request_to_dict(ctx)
if hasattr(request, "to_pooling_params"):
pooling_params: PoolingParams = request.to_pooling_params()
pooling_params.verify("reward", self.cfg.model_config)
request_dict["pooling_params"] = pooling_params.to_dict()
request_dict["metrics"] = {}
return request_dict
else:
request_obj: Request = None
if hasattr(request, "to_pooling_params"):
pooling_params: PoolingParams = request.to_pooling_params()
pooling_params.verify("reward", self.cfg.model_config)
request_obj = Request.from_generic_request(
req=request, request_id=ctx.request_id, pooling_params=pooling_params
)
request_obj.metrics.arrival_time = time.time()
super()._process_chat_template_kwargs(request_obj)
return request_obj
request_dict = super()._request_to_dict(ctx)
if hasattr(request, "to_pooling_params"):
pooling_params: PoolingParams = request.to_pooling_params()
pooling_params.verify("reward", self.cfg.model_config)
request_dict["pooling_params"] = pooling_params.to_dict()
request_dict["metrics"] = {}
return request_dict

@override
def _request_to_batch_dicts(self, ctx: ServeContext):
Expand Down
2 changes: 0 additions & 2 deletions fastdeploy/envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ def _validate_split_kv_size(value: int) -> int:
"EXPORTER_OTLP_HEADERS": lambda: os.getenv("EXPORTER_OTLP_HEADERS"),
# enable kv cache block scheduler v1 (no need for kv_cache_ratio)
"ENABLE_V1_KVCACHE_SCHEDULER": lambda: int(os.getenv("ENABLE_V1_KVCACHE_SCHEDULER", "1")),
# enable data processor v2
"ENABLE_V1_DATA_PROCESSOR": lambda: int(os.getenv("ENABLE_V1_DATA_PROCESSOR", "0")),
# set prealloc block num for decoder
Comment on lines 94 to 97
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR 标题目前不符合仓库模板要求的标签格式(需要包含至少一个形如 [BugFix]/[DataProcessor] 的 tag),且 PR 描述的 Motivation/Modifications 等部分为空。建议按 .github/pull_request_template.md 补全:在标题加上合适标签,并在描述里说明为何移除该开关、是否存在迁移/兼容性影响以及如何验证(例如跑哪些测试/命令)。

Copilot uses AI. Check for mistakes.
"FD_ENC_DEC_BLOCK_NUM": lambda: int(os.getenv("FD_ENC_DEC_BLOCK_NUM", "2")),
# enable max prefill of one execute step
Expand Down
51 changes: 10 additions & 41 deletions fastdeploy/input/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from fastdeploy.config import ErnieArchitectures, ModelConfig
from fastdeploy.entrypoints.openai.tool_parsers import ToolParserManager
from fastdeploy.reasoning import ReasoningParserManager
from fastdeploy.utils import envs
from fastdeploy.utils import llm_logger as logger


Expand Down Expand Up @@ -83,25 +82,15 @@ def create_processor(self):
logger.info(f"Plugin input processor not available ({e}), using built-in processor")
if not self.model_config.enable_mm:
if not ErnieArchitectures.contains_ernie_arch(architecture):
if not envs.ENABLE_V1_DATA_PROCESSOR:
from fastdeploy.input.text_processor import DataProcessor
else:
from fastdeploy.input.v1.text_processor import DataProcessor
from fastdeploy.input.text_processor import DataProcessor

Comment on lines 82 to 86
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

当前 PR 删除了整个 fastdeploy/input/v1 包,但仓库里仍有测试在引用这些模块(例如 tests/model_executor/test_thinking_budget.py 仍在 import fastdeploy.input.v1.ernie4_5_vl_processor / fastdeploy.input.v1.text_processor)。这会导致单测在导入阶段直接失败。建议同步更新/删除这些 v1 引用(改为使用现有的非 v1 处理器/测试路径),或保留一个兼容层(例如在旧路径下提供 re-export/alias),确保 CI 能通过。

Copilot uses AI. Check for mistakes.
self.processor = DataProcessor(
model_name_or_path=self.model_name_or_path,
reasoning_parser_obj=reasoning_parser_obj,
tool_parser_obj=tool_parser_obj,
)
else:
if not envs.ENABLE_V1_DATA_PROCESSOR:
from fastdeploy.input.ernie4_5_processor import (
Ernie4_5Processor,
)
else:
from fastdeploy.input.v1.ernie4_5_processor import (
Ernie4_5Processor,
)
from fastdeploy.input.ernie4_5_processor import Ernie4_5Processor

self.processor = Ernie4_5Processor(
model_name_or_path=self.model_name_or_path,
Expand All @@ -110,14 +99,9 @@ def create_processor(self):
)
else:
if ErnieArchitectures.contains_ernie_arch(architecture):
if not envs.ENABLE_V1_DATA_PROCESSOR:
from fastdeploy.input.ernie4_5_vl_processor import (
Ernie4_5_VLProcessor,
)
else:
from fastdeploy.input.v1.ernie4_5_vl_processor import (
Ernie4_5_VLProcessor,
)
from fastdeploy.input.ernie4_5_vl_processor import (
Ernie4_5_VLProcessor,
)

self.processor = Ernie4_5_VLProcessor(
model_name_or_path=self.model_name_or_path,
Expand All @@ -128,14 +112,9 @@ def create_processor(self):
enable_processor_cache=self.enable_processor_cache,
)
elif "PaddleOCRVL" in architecture:
if not envs.ENABLE_V1_DATA_PROCESSOR:
from fastdeploy.input.paddleocr_vl_processor import (
PaddleOCRVLProcessor,
)
else:
from fastdeploy.input.v1.paddleocr_vl_processor import (
PaddleOCRVLProcessor,
)
from fastdeploy.input.paddleocr_vl_processor import (
PaddleOCRVLProcessor,
)

self.processor = PaddleOCRVLProcessor(
config=self.model_config,
Expand All @@ -145,12 +124,7 @@ def create_processor(self):
reasoning_parser_obj=reasoning_parser_obj,
)
elif "Qwen2_5_VL" in architecture:
if not envs.ENABLE_V1_DATA_PROCESSOR:
from fastdeploy.input.qwen_vl_processor import QwenVLProcessor
else:
from fastdeploy.input.v1.qwen_vl_processor import (
QwenVLProcessor,
)
from fastdeploy.input.qwen_vl_processor import QwenVLProcessor

self.processor = QwenVLProcessor(
config=self.model_config,
Expand All @@ -161,12 +135,7 @@ def create_processor(self):
enable_processor_cache=self.enable_processor_cache,
)
elif "Qwen3VL" in architecture:
if not envs.ENABLE_V1_DATA_PROCESSOR:
from fastdeploy.input.qwen3_vl_processor import Qwen3VLProcessor
else:
from fastdeploy.input.v1.qwen3_vl_processor import (
Qwen3VLProcessor,
)
from fastdeploy.input.qwen3_vl_processor import Qwen3VLProcessor

self.processor = Qwen3VLProcessor(
config=self.model_config,
Expand Down
15 changes: 0 additions & 15 deletions fastdeploy/input/v1/__init__.py

This file was deleted.

Loading
Loading