Sentence Transformers in MLflow
Sentence Transformers have become the go-to solution for converting text into meaningful vector representations that capture semantic meaning. By combining the power of sentence transformers with MLflow's comprehensive experiment tracking, you can create a robust workflow for developing, monitoring, and deploying semantic understanding applications.
Why MLflow + Sentence Transformers?
MLflow's integration with sentence transformers creates a powerful workflow for semantic AI development:
- 📊 Embedding Quality Tracking: Monitor semantic similarity scores, embedding distributions, and model performance across different tasks
- 🔄 Model Versioning: Track the evolution of your embedding models and compare performance across architectures and fine-tuning approaches
- 📈 Semantic Evaluation: Capture similarity benchmarks, clustering metrics, and retrieval performance with comprehensive visualizations
- 🎯 Deployment Readiness: Package embedding models with proper signatures and dependencies for seamless production deployment
- 👥 Collaborative Development: Share embedding models, evaluation results, and semantic insights across teams through MLflow's intuitive interface
- 🚀 Production Integration: Deploy models for semantic search, document clustering, and recommendation systems with full lineage tracking
Core Workflows
- Basic Usage
- Semantic Search
- Model Evaluation
- Fine-Tuning
- Production Deployment
- Batch Processing Pipelines
Loading and Logging Models
MLflow makes it remarkably easy to work with sentence transformer models:
import mlflow
import mlflow.sentence_transformers
from sentence_transformers import SentenceTransformer

# Load a pre-trained model
model = SentenceTransformer("all-MiniLM-L6-v2")

# Generate sample embeddings for signature inference
sample_texts = [
    "MLflow makes machine learning development easier",
    "Sentence transformers create semantic embeddings",
]
sample_embeddings = model.encode(sample_texts)

# Infer the model signature from sample inputs and outputs
signature = mlflow.models.infer_signature(sample_texts, sample_embeddings)

# Log the model to MLflow
with mlflow.start_run():
    model_info = mlflow.sentence_transformers.log_model(
        model=model,
        name="semantic_encoder",
        signature=signature,
        input_example=sample_texts,
    )

print(f"Model logged with URI: {model_info.model_uri}")
Loading and Using Models
Once logged, you can easily load and use your models:
# Load as a sentence transformer model (preserves all functionality)
loaded_transformer = mlflow.sentence_transformers.load_model(model_info.model_uri)
embeddings = loaded_transformer.encode(["New text to encode"])
# Load as a generic MLflow model (for deployment)
loaded_pyfunc = mlflow.pyfunc.load_model(model_info.model_uri)
predictions = loaded_pyfunc.predict(["New text to encode"])
print("Embeddings shape:", embeddings.shape)
print("Predictions shape:", predictions.shape)
Understanding Model Signatures for Embeddings
Model signatures are essential for sentence transformers because they define the expected input format and output structure:
import mlflow
from sentence_transformers import SentenceTransformer
from mlflow.models import infer_signature

model = SentenceTransformer("all-MiniLM-L6-v2")

# Single sentence input
single_input = "This is a sample sentence."
single_output = model.encode(single_input)

# Multiple sentences input
batch_input = [
    "First sentence for encoding.",
    "Second sentence for batch processing.",
    "Third sentence to demonstrate batching.",
]
batch_output = model.encode(batch_input)

# Infer signature for batch processing (recommended)
signature = infer_signature(batch_input, batch_output)

with mlflow.start_run():
    model_info = mlflow.sentence_transformers.log_model(
        model=model,
        name="batch_encoder",
        signature=signature,
        input_example=batch_input,
    )
Benefits of Proper Signatures
- 📝 Input Validation: Ensures correct data formats during inference
- 🔍 API Documentation: Provides a clear specification of expected inputs and outputs
- 🚀 Deployment Readiness: Enables automatic endpoint generation and validation
- 📊 Type Safety: Prevents runtime errors in production (a sketch of reading a stored signature back follows this list)
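These benefits apply to any logged model, because the signature is persisted alongside the artifact and can be read back at any time. A minimal sketch, reusing the `model_info` returned by `log_model` above:

from mlflow.models import get_model_info

# Read back the signature stored with the logged model
info = get_model_info(model_info.model_uri)
print(info.signature)  # string inputs -> float tensor outputs, as inferred above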
Building a Semantic Search System
Here is a complete example that builds and logs a semantic search system:
import mlflow
import pandas as pd
from sentence_transformers import SentenceTransformer, util
from mlflow.models import infer_signature

# Sample document corpus
documents = [
    "Machine learning is a subset of artificial intelligence.",
    "Deep learning uses neural networks with multiple layers.",
    "Natural language processing helps computers understand text.",
    "Computer vision enables machines to interpret visual information.",
    "Reinforcement learning trains agents through trial and error.",
    "Data science combines statistics and programming for insights.",
    "Cloud computing provides scalable infrastructure resources.",
    "MLflow helps manage the machine learning lifecycle.",
]

def build_semantic_search_system():
    """Build and log a complete semantic search system."""
    with mlflow.start_run(run_name="semantic_search_system"):
        # Load the sentence transformer
        model = SentenceTransformer("all-MiniLM-L6-v2")

        # Log model parameters
        mlflow.log_params(
            {
                "model_name": "all-MiniLM-L6-v2",
                "embedding_dimension": model.get_sentence_embedding_dimension(),
                "max_seq_length": model.max_seq_length,
                "corpus_size": len(documents),
            }
        )

        # Encode the document corpus
        print("Encoding document corpus...")
        corpus_embeddings = model.encode(documents, convert_to_tensor=True)

        # Save the corpus as an artifact
        corpus_df = pd.DataFrame({"documents": documents})
        corpus_df.to_csv("corpus.csv", index=False)
        mlflow.log_artifact("corpus.csv")

        # Example queries for testing
        test_queries = [
            "What is artificial intelligence?",
            "How do neural networks work?",
            "Tell me about text processing",
            "What tools help with ML development?",
        ]

        # Perform semantic search for each query
        search_results = []
        for query in test_queries:
            print(f"\nSearching for: '{query}'")

            # Encode the query
            query_embedding = model.encode(query, convert_to_tensor=True)

            # Calculate similarities
            similarities = util.semantic_search(
                query_embedding, corpus_embeddings, top_k=3
            )[0]

            # Store results (rank counts prior hits for this query)
            for hit in similarities:
                search_results.append(
                    {
                        "query": query,
                        "document": documents[hit["corpus_id"]],
                        "similarity_score": hit["score"],
                        "rank": len([r for r in search_results if r["query"] == query])
                        + 1,
                    }
                )

            # Print top results
            for hit in similarities:
                print(f"  Score: {hit['score']:.4f} - {documents[hit['corpus_id']]}")

        # Log search results
        results_df = pd.DataFrame(search_results)
        results_df.to_csv("search_results.csv", index=False)
        mlflow.log_artifact("search_results.csv")

        # Calculate evaluation metrics
        avg_top1_score = results_df[results_df["rank"] == 1]["similarity_score"].mean()
        avg_top3_score = results_df["similarity_score"].mean()

        mlflow.log_metrics(
            {
                "avg_top1_similarity": avg_top1_score,
                "avg_top3_similarity": avg_top3_score,
                "total_queries_tested": len(test_queries),
            }
        )

        # Log the model with an inference signature
        signature = infer_signature(test_queries, model.encode(test_queries))
        model_info = mlflow.sentence_transformers.log_model(
            model=model,
            name="semantic_search_model",
            signature=signature,
            input_example=test_queries[:2],
        )

        print("\nModel logged successfully!")
        print(f"Average top-1 similarity: {avg_top1_score:.4f}")
        print(f"Average top-3 similarity: {avg_top3_score:.4f}")

        return model_info

# Run the semantic search system
model_info = build_semantic_search_system()
Using MLflow's Evaluation Framework
MLflow's comprehensive evaluation API can be adapted to sentence transformer models to assess embedding quality and semantic understanding:
import mlflow
from mlflow.models import make_metric
import pandas as pd
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from scipy.stats import pearsonr, spearmanr

def create_semantic_similarity_dataset():
    """Create a labeled dataset for semantic similarity evaluation."""
    # Sample similarity pairs with human-annotated scores (0-1 scale)
    similarity_data = [
        {
            "text1": "The cat is sleeping",
            "text2": "A cat is resting",
            "similarity": 0.85,
        },
        {
            "text1": "I love programming",
            "text2": "Coding is my passion",
            "similarity": 0.80,
        },
        {
            "text1": "The weather is nice",
            "text2": "It's raining heavily",
            "similarity": 0.15,
        },
        {
            "text1": "Machine learning is exciting",
            "text2": "AI technology fascinates me",
            "similarity": 0.75,
        },
        {
            "text1": "Python is a language",
            "text2": "The snake slithered away",
            "similarity": 0.10,
        },
        {
            "text1": "Data science projects",
            "text2": "Analytics and statistics work",
            "similarity": 0.70,
        },
    ]
    return pd.DataFrame(similarity_data)

def evaluate_embedding_model_with_mlflow(model_name):
    """Evaluate a sentence transformer using MLflow's evaluation framework."""
    with mlflow.start_run(run_name=f"eval_{model_name.replace('/', '_')}"):
        # Load model
        model = SentenceTransformer(model_name)

        # Create evaluation dataset
        eval_df = create_semantic_similarity_dataset()

        # Create a wrapper model that outputs similarity predictions
        class SimilarityPredictionModel(mlflow.pyfunc.PythonModel):
            def __init__(self, sentence_transformer_model):
                self.model = sentence_transformer_model

            def predict(self, context, model_input):
                """Predict similarity scores for text pairs."""
                # Expects an input DataFrame with 'text1' and 'text2' columns
                embeddings1 = self.model.encode(model_input["text1"].tolist())
                embeddings2 = self.model.encode(model_input["text2"].tolist())

                similarities = []
                for emb1, emb2 in zip(embeddings1, embeddings2):
                    similarity = cosine_similarity([emb1], [emb2])[0][0]
                    similarities.append(similarity)
                return similarities

        # Create wrapper model instance
        similarity_model = SimilarityPredictionModel(model)

        # Log the wrapper model for evaluation
        input_example = eval_df[["text1", "text2"]].head(2)
        signature = mlflow.models.infer_signature(
            input_example, similarity_model.predict(None, input_example)
        )

        model_info = mlflow.pyfunc.log_model(
            python_model=similarity_model,
            name="similarity_model",
            signature=signature,
            input_example=input_example,
        )
        model_uri = model_info.model_uri

        # Create custom metrics for MLflow evaluation
        def pearson_correlation_metric(eval_df, builtin_metrics):
            """Calculate the Pearson correlation between predictions and targets."""
            predictions = eval_df["prediction"]
            targets = eval_df["similarity"]
            correlation, _ = pearsonr(predictions, targets)
            return correlation

        def spearman_correlation_metric(eval_df, builtin_metrics):
            """Calculate the Spearman correlation between predictions and targets."""
            predictions = eval_df["prediction"]
            targets = eval_df["similarity"]
            correlation, _ = spearmanr(predictions, targets)
            return correlation

        def accuracy_within_threshold_metric(eval_df, builtin_metrics, threshold=0.1):
            """Calculate accuracy within a similarity threshold."""
            predictions = eval_df["prediction"]
            targets = eval_df["similarity"]
            accurate = np.abs(predictions - targets) <= threshold
            return np.mean(accurate)

        # Create MLflow metrics
        pearson_metric = make_metric(
            eval_fn=pearson_correlation_metric,
            greater_is_better=True,
            name="pearson_correlation",
        )
        spearman_metric = make_metric(
            eval_fn=spearman_correlation_metric,
            greater_is_better=True,
            name="spearman_correlation",
        )
        accuracy_metric = make_metric(
            eval_fn=lambda df, metrics: accuracy_within_threshold_metric(
                df, metrics, 0.1
            ),
            greater_is_better=True,
            name="accuracy_within_0.1",
        )

        # Prepare evaluation data for MLflow evaluate
        eval_data_for_mlflow = eval_df[["text1", "text2", "similarity"]].copy()

        # Use MLflow's evaluate API
        result = mlflow.models.evaluate(
            model_uri,
            eval_data_for_mlflow,
            targets="similarity",
            model_type="regressor",  # Similarity prediction is a regression task
            extra_metrics=[pearson_metric, spearman_metric, accuracy_metric],
        )

        # Extract our custom metrics
        metrics = {
            "pearson_correlation": result.metrics["pearson_correlation"],
            "spearman_correlation": result.metrics["spearman_correlation"],
            "accuracy_within_0.1": result.metrics["accuracy_within_0.1"],
            "mean_absolute_error": result.metrics["mean_absolute_error"],
            "root_mean_squared_error": result.metrics["root_mean_squared_error"],
        }

        print(f"Evaluation completed for {model_name}")
        print(f"Pearson correlation: {metrics['pearson_correlation']:.3f}")
        print(f"Spearman correlation: {metrics['spearman_correlation']:.3f}")
        print(f"Mean Absolute Error: {metrics['mean_absolute_error']:.3f}")

        return metrics, result

# Evaluate a single model
metrics, eval_result = evaluate_embedding_model_with_mlflow("all-MiniLM-L6-v2")
Domain-Specific Fine-Tuning
Fine-tune sentence transformers for your specific domain while tracking the entire process:
import mlflow
from sentence_transformers import SentenceTransformer, InputExample, losses
from sklearn.metrics.pairwise import cosine_similarity
from torch.utils.data import DataLoader

def fine_tune_sentence_transformer():
    """Fine-tune a sentence transformer on domain-specific data."""
    # Sample training data (in practice, use much more data)
    train_examples = [
        InputExample(texts=["Python programming", "Coding in Python"], label=0.9),
        InputExample(texts=["Machine learning model", "ML algorithm"], label=0.8),
        InputExample(texts=["Data science project", "Analytics work"], label=0.7),
        InputExample(texts=["Software development", "Cooking recipes"], label=0.1),
        InputExample(texts=["Neural networks", "Deep learning"], label=0.9),
        InputExample(texts=["Database query", "SQL programming"], label=0.8),
        InputExample(texts=["Web development", "Frontend coding"], label=0.7),
        InputExample(texts=["API integration", "Backend services"], label=0.6),
    ]

    with mlflow.start_run(run_name="fine_tuning_experiment"):
        # Log training parameters
        train_params = {
            "base_model": "all-MiniLM-L6-v2",
            "num_epochs": 3,
            "batch_size": 16,
            "learning_rate": 2e-5,
            "warmup_steps": 100,
            "training_examples": len(train_examples),
        }
        mlflow.log_params(train_params)

        # Load the base model
        model = SentenceTransformer("all-MiniLM-L6-v2")

        # Log original model properties
        original_embedding_dim = model.get_sentence_embedding_dimension()
        mlflow.log_metric("original_embedding_dimension", original_embedding_dim)

        # Create data loader
        train_dataloader = DataLoader(train_examples, shuffle=True, batch_size=16)

        # Define loss function
        train_loss = losses.CosineSimilarityLoss(model)

        # Track training progress
        class TrainingCallback:
            def __init__(self):
                self.step = 0

            def __call__(self, score, epoch, steps):
                self.step += 1
                mlflow.log_metric("training_step", self.step)
                if score is not None:
                    mlflow.log_metric("evaluation_score", score, step=epoch)

        callback = TrainingCallback()

        # Fine-tune the model
        print("Starting fine-tuning...")
        model.fit(
            train_objectives=[(train_dataloader, train_loss)],
            epochs=3,
            warmup_steps=100,
            output_path="./fine_tuned_model",
            callback=callback,
            show_progress_bar=True,
        )

        # Log the fine-tuned model
        model_info = mlflow.sentence_transformers.log_model(
            model=model,
            name="fine_tuned_model",
            input_example=["Sample domain-specific text"],
        )

        # Test the fine-tuned model on domain-specific examples
        test_pairs = [
            ("Python coding", "Programming in Python"),
            ("Machine learning", "AI algorithms"),
            ("Web development", "Cooking recipes"),  # Negative example
        ]

        for text1, text2 in test_pairs:
            embeddings = model.encode([text1, text2])
            similarity = cosine_similarity([embeddings[0]], [embeddings[1]])[0][0]
            print(f"Similarity between '{text1}' and '{text2}': {similarity:.3f}")
            mlflow.log_metric(f"similarity_{text1[:10]}_{text2[:10]}", similarity)

        print("Fine-tuning completed and model logged!")
        return model_info

# Run fine-tuning
fine_tuned_model_info = fine_tune_sentence_transformer()
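After training, it is worth reloading the logged artifact and sanity-checking it against the base model. A minimal sketch, reusing the `fine_tuned_model_info` returned above; the text pair is purely illustrative:

import mlflow
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

# Compare base vs. fine-tuned similarity on a single illustrative pair
base_model = SentenceTransformer("all-MiniLM-L6-v2")
tuned_model = mlflow.sentence_transformers.load_model(fine_tuned_model_info.model_uri)

pair = ["Python programming", "Coding in Python"]
for label, m in [("base", base_model), ("fine-tuned", tuned_model)]:
    emb = m.encode(pair)
    score = cosine_similarity([emb[0]], [emb[1]])[0][0]
    print(f"{label} similarity: {score:.3f}")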
Production-Ready Model Deployment
Create models that are ready for production deployment:
import mlflow
import numpy as np
from mlflow.models import ModelSignature
from mlflow.types.schema import Schema, ColSpec, TensorSpec
from sentence_transformers import SentenceTransformer

def create_production_ready_model():
    """Create a production-ready semantic search model."""
    with mlflow.start_run(run_name="production_semantic_search"):
        model = SentenceTransformer("all-MiniLM-L6-v2")

        # Define an explicit signature for production:
        # string inputs, float32 embeddings of shape (batch, 384)
        input_schema = Schema([ColSpec("string")])
        output_schema = Schema([TensorSpec(np.dtype("float32"), (-1, 384))])
        signature = ModelSignature(inputs=input_schema, outputs=output_schema)

        # Log with production configuration (pin dependencies in one list)
        model_info = mlflow.sentence_transformers.log_model(
            model=model,
            name="production_embedder",
            signature=signature,
            input_example=["Production ready text embedding"],
            pip_requirements=[
                "sentence-transformers==4.1.0",
                "torch>=1.11.0",
                "numpy>=1.21.0",
            ],
        )

        # Add production metadata
        mlflow.set_tags(
            {
                "environment": "production",
                "use_case": "semantic_search",
                "deployment_ready": "true",
            }
        )

        print(f"Production model ready: {model_info.model_uri}")
        return model_info

# Create the production model
production_model = create_production_ready_model()
Batch Processing Pipelines
Create efficient batch processing for large-scale embedding generation:
import time

import mlflow
import numpy as np
from sentence_transformers import SentenceTransformer

def create_batch_embedding_pipeline():
    """Create a batch processing pipeline for large-scale embedding generation."""
    with mlflow.start_run(run_name="batch_embedding_pipeline"):
        model = SentenceTransformer("all-MiniLM-L6-v2")

        # Simulate a large dataset
        large_text_dataset = [
            f"Document {i}: This is sample text for embedding generation."
            for i in range(1000)
        ]

        # Batch processing configuration
        batch_config = {
            "batch_size": 32,
            "show_progress_bar": True,
            "convert_to_numpy": True,
            "normalize_embeddings": True,
        }
        mlflow.log_params(batch_config)
        mlflow.log_param("total_documents", len(large_text_dataset))

        # Process in batches
        start_time = time.time()
        embeddings = model.encode(
            large_text_dataset,
            batch_size=batch_config["batch_size"],
            show_progress_bar=batch_config["show_progress_bar"],
            convert_to_numpy=batch_config["convert_to_numpy"],
            normalize_embeddings=batch_config["normalize_embeddings"],
        )
        processing_time = time.time() - start_time

        # Log performance metrics
        mlflow.log_metrics(
            {
                "processing_time_seconds": processing_time,
                "documents_per_second": len(large_text_dataset) / processing_time,
                "embedding_dimension": embeddings.shape[1],
                "total_embeddings": embeddings.shape[0],
            }
        )

        # Save embeddings as an artifact
        np.save("batch_embeddings.npy", embeddings)
        mlflow.log_artifact("batch_embeddings.npy")

        # Log the model used for batch processing
        mlflow.sentence_transformers.log_model(
            model=model, name="batch_processor", input_example=large_text_dataset[:5]
        )

        print(
            f"Processed {len(large_text_dataset)} documents in {processing_time:.2f} seconds"
        )
        print(f"Rate: {len(large_text_dataset) / processing_time:.1f} documents/second")

# Run the batch processing pipeline
create_batch_embedding_pipeline()
Advanced Workflows
- Model Comparison
- Custom Workflows
Systematic Multi-Model Evaluation
def comprehensive_model_comparison():
    """Compare multiple sentence transformer models systematically."""
    models_to_compare = [
        "all-MiniLM-L6-v2",
        "all-mpnet-base-v2",
        "paraphrase-albert-small-v2",
        "multi-qa-MiniLM-L6-cos-v1",
    ]

    # Parent run for the comparison experiment
    with mlflow.start_run(run_name="multi_model_evaluation"):
        all_results = {}

        for model_name in models_to_compare:
            print(f"\nEvaluating {model_name}...")

            # Nested run for each model
            with mlflow.start_run(
                run_name=f"eval_{model_name.replace('/', '_')}", nested=True
            ):
                # Evaluate using our custom function
                metrics, _ = evaluate_embedding_model_with_mlflow(model_name)
                all_results[model_name] = metrics

        # Create a comparison summary
        comparison_data = []
        for model_name, metrics in all_results.items():
            comparison_data.append(
                {
                    "model": model_name,
                    "pearson_correlation": metrics["pearson_correlation"],
                    "spearman_correlation": metrics["spearman_correlation"],
                    "mean_absolute_error": metrics["mean_absolute_error"],
                    "accuracy_within_0.1": metrics["accuracy_within_0.1"],
                }
            )

        # Log comparison results
        comparison_df = pd.DataFrame(comparison_data)
        comparison_df.to_csv("model_comparison.csv", index=False)
        mlflow.log_artifact("model_comparison.csv")

        # Find the best model
        best_model = comparison_df.loc[comparison_df["pearson_correlation"].idxmax()]
        mlflow.set_tag("best_model", best_model["model"])

        print("\n" + "=" * 60)
        print("MODEL COMPARISON SUMMARY")
        print("=" * 60)
        print(comparison_df.round(3))
        print(f"\nBest model: {best_model['model']}")
        print(f"Best Pearson correlation: {best_model['pearson_correlation']:.3f}")

# Run the comprehensive comparison
comprehensive_model_comparison()
Speed vs. Quality Trade-offs
import time

import matplotlib.pyplot as plt
import mlflow
import numpy as np
import pandas as pd
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

def analyze_speed_quality_tradeoffs():
    """Analyze the trade-off between model speed and quality."""
    model_configs = [
        {"name": "paraphrase-albert-small-v2", "category": "fast"},
        {"name": "all-MiniLM-L6-v2", "category": "balanced"},
        {"name": "all-mpnet-base-v2", "category": "quality"},
    ]

    with mlflow.start_run(run_name="speed_quality_analysis"):
        results = []

        for config in model_configs:
            model_name = config["name"]
            print(f"Analyzing {model_name}...")

            with mlflow.start_run(
                run_name=f"analysis_{model_name.replace('/', '_')}", nested=True
            ):
                model = SentenceTransformer(model_name)

                # Speed test
                test_texts = ["Sample text for speed testing"] * 100
                start_time = time.time()
                embeddings = model.encode(test_texts)
                encoding_time = time.time() - start_time

                # Quality test (simplified)
                test_pairs = [
                    ("The cat is sleeping", "A cat is resting"),
                    ("I love programming", "Coding is my passion"),
                    ("The weather is nice", "It's raining heavily"),
                ]
                similarities = []
                for text1, text2 in test_pairs:
                    emb1, emb2 = model.encode([text1, text2])
                    sim = cosine_similarity([emb1], [emb2])[0][0]
                    similarities.append(sim)

                # Calculate metrics
                speed = len(test_texts) / encoding_time
                avg_similarity = np.mean(similarities)

                result = {
                    "model": model_name,
                    "category": config["category"],
                    "speed_texts_per_sec": speed,
                    "avg_similarity_quality": avg_similarity,
                    "embedding_dim": model.get_sentence_embedding_dimension(),
                    "encoding_time": encoding_time,
                }
                results.append(result)

                # Metrics must be numeric; log the string fields as params instead
                mlflow.log_metrics(
                    {k: v for k, v in result.items() if isinstance(v, (int, float))}
                )
                mlflow.log_params(
                    {"model": model_name, "category": config["category"]}
                )

        # Create a trade-off visualization
        results_df = pd.DataFrame(results)
        plt.figure(figsize=(10, 6))
        plt.scatter(
            results_df["speed_texts_per_sec"],
            results_df["avg_similarity_quality"],
            s=results_df["embedding_dim"] / 5,  # Size points by embedding dimension
            alpha=0.7,
        )

        for i, row in results_df.iterrows():
            plt.annotate(
                row["model"].split("/")[-1],
                (row["speed_texts_per_sec"], row["avg_similarity_quality"]),
                xytext=(5, 5),
                textcoords="offset points",
            )

        plt.xlabel("Speed (texts/second)")
        plt.ylabel("Quality (avg similarity)")
        plt.title("Speed vs Quality Trade-off")
        plt.grid(True, alpha=0.3)
        plt.savefig("speed_quality_tradeoff.png")
        mlflow.log_artifact("speed_quality_tradeoff.png")
        plt.close()

        results_df.to_csv("speed_quality_analysis.csv", index=False)
        mlflow.log_artifact("speed_quality_analysis.csv")

# Run the speed-quality analysis
analyze_speed_quality_tradeoffs()
Domain-Specific Evaluation Pipelines
import mlflow
import numpy as np
import pandas as pd
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

def create_domain_evaluation_pipeline(domain_name, test_cases):
    """Create a domain-specific evaluation pipeline."""
    with mlflow.start_run(run_name=f"domain_eval_{domain_name}"):
        # Test multiple models on domain-specific tasks
        models_to_test = [
            "all-MiniLM-L6-v2",
            "all-mpnet-base-v2",
            "multi-qa-MiniLM-L6-cos-v1",
        ]

        domain_results = {}

        for model_name in models_to_test:
            print(f"Testing {model_name} on {domain_name} domain...")
            model = SentenceTransformer(model_name)

            # Domain-specific evaluation
            domain_scores = []
            for case in test_cases:
                query = case["query"]
                expected_doc = case["expected_match"]
                distractor_docs = case["distractors"]

                # Encode the query and documents
                query_emb = model.encode([query])
                doc_embs = model.encode([expected_doc] + distractor_docs)

                # Calculate similarities
                similarities = cosine_similarity(query_emb, doc_embs)[0]

                # Check whether the expected match has the highest similarity
                best_match_idx = np.argmax(similarities)
                is_correct = best_match_idx == 0  # First doc is the expected match
                confidence = similarities[0]  # Similarity to the expected match

                domain_scores.append(
                    {"correct": is_correct, "confidence": confidence, "query": query}
                )

            # Calculate domain metrics
            accuracy = np.mean([score["correct"] for score in domain_scores])
            avg_confidence = np.mean([score["confidence"] for score in domain_scores])

            domain_results[model_name] = {
                "accuracy": accuracy,
                "avg_confidence": avg_confidence,
                "detailed_scores": domain_scores,
            }

            # Log model-specific metrics
            mlflow.log_metrics(
                {
                    f"{model_name}_accuracy": accuracy,
                    f"{model_name}_confidence": avg_confidence,
                }
            )

        # Find the best model for this domain
        best_model = max(
            domain_results.keys(), key=lambda x: domain_results[x]["accuracy"]
        )

        mlflow.log_params(
            {
                "domain": domain_name,
                "num_test_cases": len(test_cases),
                "best_model_for_domain": best_model,
            }
        )

        # Save detailed results
        results_summary = pd.DataFrame(
            [
                {
                    "model": model,
                    "accuracy": results["accuracy"],
                    "avg_confidence": results["avg_confidence"],
                }
                for model, results in domain_results.items()
            ]
        )
        results_summary.to_csv(f"{domain_name}_evaluation_results.csv", index=False)
        mlflow.log_artifact(f"{domain_name}_evaluation_results.csv")

        print(f"Best model for {domain_name}: {best_model}")
        print(f"Accuracy: {domain_results[best_model]['accuracy']:.3f}")

        return domain_results

# Example: legal-domain evaluation
legal_test_cases = [
    {
        "query": "contract termination clauses",
        "expected_match": "Legal provisions regarding contract termination and breach",
        "distractors": [
            "Software development contracts and agreements",
            "Real estate purchase agreements",
            "Employment termination procedures",
        ],
    },
    {
        "query": "intellectual property rights",
        "expected_match": "Patents, trademarks, and copyright protections",
        "distractors": [
            "Physical property ownership laws",
            "Digital privacy and data protection",
            "Software licensing agreements",
        ],
    },
]

legal_results = create_domain_evaluation_pipeline("legal", legal_test_cases)
Best Practices and Optimization
Experiment Organization
- 🏷️ Consistent Tagging: Use descriptive tags to organize experiments by use case, model type, and evaluation stage (see the sketch after this list)
- 📊 Comprehensive Metrics: Track both technical metrics (encoding speed, embedding dimensions) and task-specific performance
- 📝 Documentation: Include detailed notes about experimental setup, data sources, and intended use cases
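Consistent tagging costs one call at run time. A small sketch with illustrative tag names; the specific tag scheme is an assumption, not something MLflow prescribes:

import mlflow

with mlflow.start_run(run_name="tagged_embedding_run"):
    # Illustrative tag scheme; choose keys and values that fit your team
    mlflow.set_tags(
        {
            "use_case": "semantic_search",
            "model_type": "sentence-transformer",
            "evaluation_stage": "offline_benchmark",
        }
    )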
Model Management
- 🔄 Version Control: Maintain clear versioning for models, datasets, and evaluation protocols
- 📦 Artifact Organization: Store related artifacts (datasets, evaluation results, visualizations) together
- 🚀 Deployment Readiness: Ensure models include proper signatures, dependencies, and usage examples
Performance Optimization
- ⚡ Batch Processing: Use batch encoding for higher throughput when processing multiple texts
- 🎯 Model Selection: Choose models that balance quality and speed for your specific use case
- 💾 Caching Strategies: Cache embeddings for frequently used content to reduce response times (a minimal cache sketch follows this list)
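Because encoding is deterministic for a fixed model, embeddings for repeated texts can simply be memoized. A minimal in-memory sketch; `CachedEncoder` is a hypothetical helper, not part of MLflow or sentence-transformers:

import numpy as np
from sentence_transformers import SentenceTransformer

class CachedEncoder:
    """Memoize embeddings for repeated texts (illustrative only)."""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)
        self._cache = {}  # text -> embedding

    def encode(self, texts: list) -> np.ndarray:
        # Encode only the texts we have not seen before
        missing = [t for t in texts if t not in self._cache]
        if missing:
            for text, emb in zip(missing, self.model.encode(missing)):
                self._cache[text] = emb
        return np.stack([self._cache[t] for t in texts])

encoder = CachedEncoder()
encoder.encode(["repeated query", "new text"])  # encodes both
encoder.encode(["repeated query"])  # second call is served from the cache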
- Performance Tips
- Deployment Patterns
Efficient Batch Processing
import time

import mlflow
import pandas as pd
from sentence_transformers import SentenceTransformer

def optimized_batch_encoding():
    """Demonstrate optimized batch processing techniques."""
    with mlflow.start_run(run_name="batch_optimization"):
        model = SentenceTransformer("all-MiniLM-L6-v2")

        # Simulate a large dataset
        large_dataset = [
            f"Document {i} with sample content for encoding." for i in range(5000)
        ]

        # Test different batch sizes
        batch_sizes = [16, 32, 64, 128]
        results = []

        for batch_size in batch_sizes:
            print(f"Testing batch size: {batch_size}")

            start_time = time.time()
            embeddings = model.encode(
                large_dataset,
                batch_size=batch_size,
                show_progress_bar=False,
                convert_to_tensor=False,
                normalize_embeddings=True,
            )
            processing_time = time.time() - start_time
            throughput = len(large_dataset) / processing_time

            result = {
                "batch_size": batch_size,
                "processing_time": processing_time,
                "throughput": throughput,
                "memory_efficient": batch_size <= 64,
            }
            results.append(result)

            mlflow.log_metrics(
                {
                    f"batch_{batch_size}_time": processing_time,
                    f"batch_{batch_size}_throughput": throughput,
                }
            )

        # Find the optimal batch size
        optimal_batch = max(results, key=lambda x: x["throughput"])
        mlflow.log_params(
            {
                "optimal_batch_size": optimal_batch["batch_size"],
                "optimal_throughput": optimal_batch["throughput"],
                "dataset_size": len(large_dataset),
            }
        )

        # Log results
        results_df = pd.DataFrame(results)
        results_df.to_csv("batch_optimization_results.csv", index=False)
        mlflow.log_artifact("batch_optimization_results.csv")

        print(f"Optimal batch size: {optimal_batch['batch_size']}")
        print(f"Best throughput: {optimal_batch['throughput']:.1f} docs/sec")

optimized_batch_encoding()
Production API Wrapper
import time
from typing import Dict, List

import mlflow
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

class ProductionEmbeddingService:
    """Production-ready embedding service with MLflow integration."""

    def __init__(self, model_uri: str):
        self.model = mlflow.sentence_transformers.load_model(model_uri)
        self.model_uri = model_uri

    def encode_texts(
        self, texts: List[str], normalize: bool = True, batch_size: int = 32
    ) -> np.ndarray:
        """Encode texts with production optimizations."""
        embeddings = self.model.encode(
            texts,
            batch_size=batch_size,
            convert_to_numpy=True,
            normalize_embeddings=normalize,
            show_progress_bar=False,
        )
        return embeddings

    def similarity_search(
        self, query: str, documents: List[str], top_k: int = 5
    ) -> List[Dict]:
        """Perform similarity search with ranking."""
        # Encode the query and documents
        query_embedding = self.model.encode([query])
        doc_embeddings = self.model.encode(documents)

        # Calculate similarities
        similarities = cosine_similarity(query_embedding, doc_embeddings)[0]

        # Get top-k results
        top_indices = np.argsort(similarities)[::-1][:top_k]

        results = []
        for i, idx in enumerate(top_indices):
            results.append(
                {
                    "rank": i + 1,
                    "document": documents[idx],
                    "similarity_score": float(similarities[idx]),
                    "document_index": int(idx),
                }
            )
        return results

    def health_check(self) -> Dict:
        """Service health check."""
        try:
            # Test encoding
            test_embedding = self.model.encode(["Health check test"])
            return {
                "status": "healthy",
                "model_uri": self.model_uri,
                "embedding_dimension": test_embedding.shape[1],
                "test_successful": True,
            }
        except Exception as e:
            return {"status": "unhealthy", "error": str(e), "test_successful": False}

def deploy_embedding_service():
    """Deploy the embedding service with MLflow tracking."""
    with mlflow.start_run(run_name="production_deployment"):
        # Log a model for deployment
        model = SentenceTransformer("all-MiniLM-L6-v2")
        model_info = mlflow.sentence_transformers.log_model(
            model=model,
            name="production_embedder",
            input_example=["Sample production text"],
            pip_requirements=["sentence-transformers>=4.0.0"],
        )

        # Create a service instance
        service = ProductionEmbeddingService(model_info.model_uri)

        # Test the service
        health_status = service.health_check()
        mlflow.log_params(health_status)

        # Performance test
        test_texts = ["Test document " + str(i) for i in range(100)]
        start_time = time.time()
        embeddings = service.encode_texts(test_texts)
        encoding_time = time.time() - start_time

        # Log performance metrics
        mlflow.log_metrics(
            {
                "service_encoding_time": encoding_time,
                "service_throughput": len(test_texts) / encoding_time,
                "embedding_dimension": embeddings.shape[1],
            }
        )

        mlflow.set_tags(
            {
                "deployment_ready": "true",
                "service_type": "embedding_api",
                "production_tested": "true",
            }
        )

        print("Production service deployed and tested successfully!")
        print(f"Health status: {health_status['status']}")
        print(f"Throughput: {len(test_texts) / encoding_time:.1f} texts/sec")

        return service, model_info

# Deploy the service
service, deployment_info = deploy_embedding_service()
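Once deployed, the service can be exercised directly. A short usage sketch, reusing the `service` instance from above with a small illustrative corpus:

# Illustrative corpus for trying out the wrapper
docs = [
    "MLflow tracks experiments, models, and artifacts.",
    "Sentence transformers map text to dense vectors.",
    "Kubernetes orchestrates containerized workloads.",
]

for hit in service.similarity_search("How do I track ML experiments?", docs, top_k=2):
    print(f"{hit['rank']}. ({hit['similarity_score']:.3f}) {hit['document']}")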
Real-World Applications
The MLflow-Sentence Transformers integration excels in real-world scenarios such as:
- 🔍 Document Search Systems: Build intelligent search engines that understand user intent and find relevant documents by semantic meaning
- 🏷️ Content Classification: Automatically categorize and tag content with high accuracy using semantic similarity rather than keyword matching
- 🤖 Chatbot Intent Recognition: Understand user queries and match them to appropriate responses or actions
- 📚 Knowledge Base Organization: Cluster and organize large document collections for better information retrieval
- 🔗 Recommendation Engines: Build content recommendation systems that understand semantic relationships between items
- 🌐 Cross-Lingual Applications: Develop systems that work across multiple languages through shared semantic understanding
- 📊 Data Deduplication: Identify similar or duplicate content even when it is worded differently
- 🎯 Question Answering: Match questions to relevant answers in knowledge bases or FAQs
Conclusion
The MLflow-Sentence Transformers integration provides a comprehensive foundation for building, tracking, and deploying semantic understanding applications. By combining the semantic power of sentence transformers with MLflow's experiment management, you can create workflows that are:
- 🔍 Semantically Aware: Understand the true meaning of text beyond simple keyword matching
- 🔄 Reproducible: Every embedding model and evaluation can be recreated exactly
- 📊 Comparable: Different models and approaches can be evaluated side by side with clear metrics
- 📈 Scalable: Grow from simple similarity tasks to complex semantic search systems
- 👥 Collaborative: Teams can share models, results, and insights effectively
- 🚀 Production-Ready: Deploy semantic models seamlessly with proper monitoring and versioning
Whether you are building your first semantic search system or deploying enterprise-scale text understanding applications, the MLflow-Sentence Transformers integration provides the foundation for organized, reproducible, and scalable semantic AI development.