跳到主要内容

自定义服务应用程序

MLflow 的自定义服务应用程序允许您构建超越简单预测端点的复杂模型服务解决方案。使用 PyFunc 框架,您可以创建具有复杂预处理、后处理、多模型推理和业务逻辑集成的自定义应用程序。

概述

MLflow 中的自定义服务应用程序使用 mlflow.pyfunc.PythonModel 类构建,该类提供了一个灵活的框架,用于创建具有自定义逻辑的可部署模型。当您需要时,这种方法是理想选择

  • 🔄 实现高级预处理和后处理逻辑
  • 🧠 在单个服务管道中组合多个模型
  • ✅ 应用业务规则和自定义验证检查
  • 🔣 支持多样化的输入和输出数据格式
  • 🌐 与外部系统或数据库无缝集成

自定义 PyFunc 模型

自定义模型

以下是一个自定义 PyFunc 模型的示例

import mlflow
import pandas as pd
import json
from typing import Dict, List, Any
import openai # or any other LLM client


class CustomLLMModel(mlflow.pyfunc.PythonModel):
def load_context(self, context):
"""Load LLM configuration and initialize client"""
# Load model configuration from artifacts
config_path = context.artifacts.get("config", "config.json")
with open(config_path, "r") as f:
self.config = json.load(f)

# Initialize LLM client
self.client = openai.OpenAI(api_key=self.config["api_key"])
self.model_name = self.config["model_name"]
self.system_prompt = self.config.get(
"system_prompt", "You are a helpful assistant."
)

def predict(self, context, model_input):
"""Core LLM prediction logic"""
if isinstance(model_input, pd.DataFrame):
# Handle DataFrame input with prompts
responses = []
for _, row in model_input.iterrows():
user_prompt = row.get("prompt", row.get("input", ""))
processed_prompt = self._preprocess_prompt(user_prompt)
response = self._generate_response(processed_prompt)
post_processed = self._postprocess_response(response)
responses.append(post_processed)
return pd.DataFrame({"response": responses})
elif isinstance(model_input, dict):
# Handle single prompt
user_prompt = model_input.get("prompt", model_input.get("input", ""))
processed_prompt = self._preprocess_prompt(user_prompt)
response = self._generate_response(processed_prompt)
return self._postprocess_response(response)
else:
# Handle string input
processed_prompt = self._preprocess_prompt(str(model_input))
response = self._generate_response(processed_prompt)
return self._postprocess_response(response)

def _preprocess_prompt(self, prompt: str) -> str:
"""Custom prompt preprocessing logic"""
# Example: Add context, format prompt, apply templates
template = self.config.get("prompt_template", "{prompt}")
return template.format(prompt=prompt)

def _generate_response(self, prompt: str) -> str:
"""Core LLM inference"""
try:
response = self.client.chat.completions.create(
model=self.model_name,
messages=[
{"role": "system", "content": self.system_prompt},
{"role": "user", "content": prompt},
],
temperature=self.config.get("temperature", 0.7),
max_tokens=self.config.get("max_tokens", 1000),
)
return response.choices[0].message.content
except Exception as e:
return f"Error generating response: {str(e)}"

def _postprocess_response(self, response: str) -> str:
"""Custom response postprocessing logic"""
# Example: format output, apply filters, extract specific parts
if self.config.get("strip_whitespace", True):
response = response.strip()

max_length = self.config.get("max_response_length")
if max_length and len(response) > max_length:
response = response[:max_length] + "..."

return response


# Example configuration
config = {
"api_key": "your-api-key",
"model_name": "gpt-4",
"system_prompt": "You are an expert data analyst. Provide clear, concise answers.",
"temperature": 0.3,
"max_tokens": 500,
"prompt_template": "Context: Data Analysis Task\n\nQuestion: {prompt}\n\nAnswer:",
"strip_whitespace": True,
"max_response_length": 1000,
}

# Save configuration
with open("config.json", "w") as f:
json.dump(config, f)

# Log the model
with mlflow.start_run():
# Log configuration as artifact
mlflow.log_artifact("config.json")

# Create input example
input_example = pd.DataFrame(
{"prompt": ["What is machine learning?", "Explain neural networks"]}
)

model_info = mlflow.pyfunc.log_model(
name="custom_llm_model",
python_model=CustomLLMModel(),
artifacts={"config": "config.json"},
input_example=input_example,
)

多模型集成

创建一个结合了具有不同优势的多个 LLM 的自定义应用程序

import mlflow
import mlflow.pyfunc
import pandas as pd
import json
import openai
import anthropic
from typing import List, Dict, Any


class MultiLLMEnsemble(mlflow.pyfunc.PythonModel):
def load_context(self, context):
"""Load multiple LLM configurations from artifacts"""
# Load ensemble configuration
config_path = context.artifacts["ensemble_config"]
with open(config_path, "r") as f:
self.config = json.load(f)

# Initialize multiple LLM clients
self.llm_clients = {}

# OpenAI client
if "openai" in self.config["models"]:
self.llm_clients["openai"] = openai.OpenAI(
api_key=self.config["models"]["openai"]["api_key"]
)

# Anthropic client
if "anthropic" in self.config["models"]:
self.llm_clients["anthropic"] = anthropic.Anthropic(
api_key=self.config["models"]["anthropic"]["api_key"]
)

# Add other LLM clients as needed

self.voting_strategy = self.config.get("voting_strategy", "weighted_average")
self.model_weights = self.config.get("model_weights", {})

def predict(self, context, model_input):
"""Ensemble prediction with multiple LLMs"""
if isinstance(model_input, pd.DataFrame):
responses = []
for _, row in model_input.iterrows():
prompt = row.get("prompt", row.get("input", ""))
task_type = row.get("task_type", "general")
ensemble_response = self._generate_ensemble_response(prompt, task_type)
responses.append(ensemble_response)
return pd.DataFrame({"response": responses})
else:
prompt = model_input.get("prompt", str(model_input))
task_type = (
model_input.get("task_type", "general")
if isinstance(model_input, dict)
else "general"
)
return self._generate_ensemble_response(prompt, task_type)

def _generate_ensemble_response(
self, prompt: str, task_type: str = "general"
) -> str:
"""Generate responses from multiple LLMs and combine them"""
responses = {}

# Get task-specific model configuration
task_config = self.config.get("task_routing", {}).get(task_type, {})
active_models = task_config.get("models", list(self.llm_clients.keys()))

# Generate responses from each active model
for model_name in active_models:
if model_name in self.llm_clients:
response = self._generate_single_response(model_name, prompt, task_type)
responses[model_name] = response

# Combine responses based on voting strategy
return self._combine_responses(responses, task_type)

def _generate_single_response(
self, model_name: str, prompt: str, task_type: str
) -> str:
"""Generate response from a single LLM"""
model_config = self.config["models"][model_name]

try:
if model_name == "openai":
response = self.llm_clients["openai"].chat.completions.create(
model=model_config["model_name"],
messages=[
{
"role": "system",
"content": model_config.get("system_prompt", ""),
},
{"role": "user", "content": prompt},
],
temperature=model_config.get("temperature", 0.7),
max_tokens=model_config.get("max_tokens", 1000),
)
return response.choices[0].message.content

elif model_name == "anthropic":
response = self.llm_clients["anthropic"].messages.create(
model=model_config["model_name"],
max_tokens=model_config.get("max_tokens", 1000),
temperature=model_config.get("temperature", 0.7),
messages=[{"role": "user", "content": prompt}],
)
return response.content[0].text

# Add other LLM implementations here

except Exception as e:
return f"Error from {model_name}: {str(e)}"

def _combine_responses(self, responses: Dict[str, str], task_type: str) -> str:
"""Combine multiple LLM responses using specified strategy"""
if self.voting_strategy == "best_for_task":
# Route to best model for specific task type
task_config = self.config.get("task_routing", {}).get(task_type, {})
preferred_model = task_config.get("preferred_model")
if preferred_model and preferred_model in responses:
return responses[preferred_model]

elif self.voting_strategy == "consensus":
# Return response if multiple models agree (simplified)
response_list = list(responses.values())
if len(set(response_list)) == 1:
return response_list[0]
else:
# If no consensus, return the longest response
return max(response_list, key=len)

elif self.voting_strategy == "weighted_combination":
# Combine responses with weights (simplified text combination)
combined_response = "Combined insights:\n\n"
for model_name, response in responses.items():
weight = self.model_weights.get(model_name, 1.0)
combined_response += (
f"[{model_name.upper()} - Weight: {weight}]: {response}\n\n"
)
return combined_response

# Default: return first available response
return list(responses.values())[0] if responses else "No response generated"


# Example ensemble configuration
ensemble_config = {
"voting_strategy": "best_for_task",
"models": {
"openai": {
"api_key": "your-openai-key",
"model_name": "gpt-4",
"system_prompt": "You are a helpful assistant specialized in technical analysis.",
"temperature": 0.3,
"max_tokens": 800,
},
"anthropic": {
"api_key": "your-anthropic-key",
"model_name": "claude-3-sonnet-20240229",
"temperature": 0.5,
"max_tokens": 1000,
},
},
"task_routing": {
"code_analysis": {"models": ["openai"], "preferred_model": "openai"},
"creative_writing": {"models": ["anthropic"], "preferred_model": "anthropic"},
"general": {"models": ["openai", "anthropic"], "preferred_model": "openai"},
},
"model_weights": {"openai": 0.6, "anthropic": 0.4},
}

# Save configuration
with open("ensemble_config.json", "w") as f:
json.dump(ensemble_config, f)

# Log the ensemble model
with mlflow.start_run():
# Log configuration as artifact
mlflow.log_artifact("ensemble_config.json")

# Create input example
input_example = pd.DataFrame(
{
"prompt": ["Explain quantum computing", "Write a creative story about AI"],
"task_type": ["general", "creative_writing"],
}
)

mlflow.pyfunc.log_model(
name="multi_llm_ensemble",
python_model=MultiLLMEnsemble(),
artifacts={"ensemble_config": "ensemble_config.json"},
input_example=input_example,
)

服务自定义应用程序

本地服务

创建并保存自定义应用程序后,在本地提供服务

# Serve from saved model path
mlflow models serve -m ./path/to/custom/model -p 5000

# Serve from Model Registry
mlflow models serve -m "models:/CustomApp/Production" -p 5000

Docker 部署

为您的自定义应用程序构建 Docker 镜像

# Build Docker image
mlflow models build-docker -m ./path/to/custom/model -n custom-app

# Run the container
docker run -p 5000:8080 custom-app

测试自定义应用程序

测试您的自定义服务应用程序

import requests
import pandas as pd
import json

# Prepare test data
test_data = pd.DataFrame(
{
"feature1": [1.0, 2.0, 3.0],
"feature2": [0.5, 1.5, 2.5],
"customer_value": [5000, 15000, 3000],
}
)

# Convert to the expected input format
input_data = {"inputs": test_data.to_dict("records")}

# Make prediction request
response = requests.post(
"https://:5000/invocations",
headers={"Content-Type": "application/json"},
data=json.dumps(input_data),
)

print("Response:", response.json())

自定义应用程序的最佳实践

错误处理

实施全面的错误处理

def predict(self, context, model_input):
try:
# Validate input
self._validate_input(model_input)

# Process and predict
result = self._process_prediction(model_input)

return result

except ValueError as e:
# Handle validation errors
return {"error": f"Validation error: {str(e)}"}
except Exception as e:
# Handle unexpected errors
return {"error": f"Prediction failed: {str(e)}"}

性能优化

  • 💤 延迟加载:推迟加载大型工件直到需要时
  • 🗂️ 缓存:存储并重用频繁计算的结果
  • 📦 批处理:在单个高效操作中处理多个输入
  • 🧹 内存管理:在每个请求或任务后释放未使用的资源

测试与验证

  • 🧪 单元测试:独立测试自定义模型的各个组件
  • 🔗 集成测试:端到端验证完整的预测管道
  • ✅ 输出验证:确保正确的输出格式和健壮的错误处理
  • 🚀 性能测试:使用实际数据量和负载评估可扩展性

文档

彻底记录您的自定义应用程序

  • 📥 输入/输出规范:明确定义预期的输入格式和输出结构
  • ⚙️ 业务逻辑:记录核心逻辑和决策规则
  • ⚡ 性能特征:描述预期的吞吐量、延迟和资源使用情况
  • ❗ 错误处理:说明如何检测、管理和通信错误

与 Databricks 集成

在 Databricks Managed MLflow 中,自定义应用程序可以利用额外的功能

  • ☁️ 无服务器计算:根据需求自动扩展
  • 🔐 安全集成:内置身份验证和授权
  • 📈 监控:高级指标和日志记录功能
  • 🗂️ 版本管理:通过 Unity Catalog 实现无缝模型版本管理

请注意,与 MLflow OSS 相比,Databricks 中服务端点的创建和管理方式有所不同,为企业部署提供了额外的 UI 和 API 功能。

故障排除

常见问题

  1. 导入错误:确保所有依赖项都在 conda 环境中指定
  2. 工件加载:验证工件路径是否正确且可访问
  3. 内存问题:使用大型模型或数据集时监控内存使用情况
  4. 序列化:在记录不可序列化的模型时,使用 models-from-code 功能

调试技巧

  • 🧾 启用追踪以跟踪执行流程
  • 🧪 在集成之前单独测试组件
  • 📊 使用小型测试数据集进行初步验证
  • 🖥️ 在开发过程中监控资源使用情况

自定义服务应用程序提供了灵活性,可构建与您的业务需求无缝集成的生产就绪型 ML 系统,同时保持 MLflow 服务基础设施的可靠性和可扩展性。