构建支持多模态召回的插件化推荐引擎Kit


团队里不同业务线的推荐场景越来越多,一个痛点也愈发明显:每个新场景,无论是商品推荐、文章推荐,还是最近的图片素材推荐,我们都在重复搭建相似的数据处理、向量化、召回流程。代码库里充斥着大量高度耦合、难以复用的脚本,每次需求变更都像是在雷区里排线。一个业务线的 embedding 模型更新,可能会意外影响另一个看似无关的召回任务。维护成本已经高到无法忽视。

我们需要一个标准化的解决方案,一个“推荐召回工具箱”(Kit),而不是一堆散乱的脚本。这个 Kit 的核心目标是解耦推荐召回链路中的各个组件:数据源、Embedding 模型、向量存储。理想状态下,产品经理提出一个新的推荐需求,我们只需要通过配置文件组合不同的插件,就能快速上线一个原型,而不是重写整个数据管道。

初步构想的核心是微内核与插件化架构。内核(RecommendationKit)负责编排整个流程,它不关心数据具体从哪里来(MySQL?CSV?),也不关心文本或图片如何被转换成向量,更不关心向量最终存入哪个数据库(Pinecone?Milvus?)。这些具体实现都由独立的、可插拔的插件来完成。

graph TD
    subgraph RecommendationKit Core
        A[Orchestrator]
    end

    subgraph Plugins
        B1[DataSource: CSV]
        B2[DataSource: PostgreSQL]
        B3[...]

        C1[Embedding: SentenceTransformer]
        C2[Embedding: OpenAI CLIP]
        C3[...]

        D1[VectorStore: Pinecone]
        D2[VectorStore: Local FAISS]
        D3[...]
    end

    E[config.yaml] --> A

    A -- "load_data()" --> B1
    A -- "embed()" --> C1
    A -- "upsert()" --> D1

    style A fill:#f9f,stroke:#333,stroke-width:2px
    style B1 fill:#ccf,stroke:#333,stroke-width:1px
    style C1 fill:#cfc,stroke:#333,stroke-width:1px
    style D1 fill:#fec,stroke:#333,stroke-width:1px

在技术选型上,向量数据库我们选择了 Pinecone。原因很简单:作为初创团队,维护一个自建的 Milvus 或 Weaviate 集群的人力成本太高。Pinecone 的 Serverless 模式提供了开箱即用的高可用性和弹性伸缩,让我们能专注于业务逻辑而非底层运维。它的性能在我们的测试中也完全满足要求。这个决定是基于成本和维护性的务实考量。

定义插件接口

一切始于接口定义。清晰、稳定的接口是插件化架构的基石。我们需要三个核心接口:DataSourceEmbeddingStrategyVectorStore.

# rec_kit/interfaces.py

from abc import ABC, abstractmethod
from typing import List, Dict, Any, Iterator, Union
import numpy as np

# 数据载体,确保插件间传递的数据结构一致
class Document:
    def __init__(self, doc_id: str, content: Union[str, Any], metadata: Dict[str, Any]):
        self.id = doc_id
        self.content = content
        self.metadata = metadata

    def __repr__(self):
        return f"Document(id={self.id}, metadata={self.metadata})"

# 向量载体
class Vector:
    def __init__(self, vec_id: str, values: List[float], metadata: Dict[str, Any]):
        self.id = vec_id
        self.values = values
        self.metadata = metadata

    def __repr__(self):
        return f"Vector(id={self.id}, metadata={self.metadata})"

class DataSource(ABC):
    """
    数据源接口,负责加载原始数据。
    使用迭代器模式以支持大规模数据集,避免一次性加载到内存。
    """
    @abstractmethod
    def __init__(self, config: Dict[str, Any]):
        pass

    @abstractmethod
    def stream_documents(self) -> Iterator[Document]:
        """以流式方式产生文档对象"""
        pass

class EmbeddingStrategy(ABC):
    """
    向量化策略接口,负责将文档内容转换为向量。
    """
    @abstractmethod
    def __init__(self, config: Dict[str, Any]):
        pass

    @abstractmethod
    def embed(self, documents: List[Document]) -> List[Vector]:
        """批量将文档内容转换为向量"""
        pass

    @property
    @abstractmethod
    def dimension(self) -> int:
        """返回向量维度"""
        pass


class VectorStore(ABC):
    """
    向量存储接口,负责向量的存储和检索。
    """
    @abstractmethod
    def __init__(self, config: Dict[str, Any]):
        pass

    @abstractmethod
    def create_index_if_not_exists(self, dimension: int):
        """确保索引存在,如果不存在则根据维度创建"""
        pass

    @abstractmethod
    def upsert(self, vectors: List[Vector], batch_size: int = 100):
        """批量插入或更新向量"""
        pass

    @abstractmethod
    def query(self, query_vector: List[float], top_k: int, filter_criteria: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        """执行向量查询"""
        pass

这里的关键设计在于:

  1. 统一数据结构: DocumentVector 作为标准的数据传输对象(DTO),确保了插件之间解耦。
  2. 流式处理: DataSourcestream_documents 返回一个迭代器,这对于处理G级别甚至T级别的数据至关重要,可以有效控制内存占用。
  3. 批量操作: embedupsert 都设计为批量操作,这是在与外部服务(如模型API、向量数据库)交互时提升性能的常用手段。

实现核心插件:以 Pinecone 和 SentenceTransformer 为例

接口只是蓝图,现在需要具体的实现。我们先实现最常用的文本推荐场景所需的一套插件。

Pinecone 向量存储插件

这个插件是与 Pinecone 服务交互的封装。一个常见的错误是在每个操作中都初始化客户端,正确的做法是在 __init__ 中初始化一次并复用。配置和 API Key 必须通过外部传入,而不是硬编码。

# rec_kit/plugins/vector_stores/pinecone_store.py

import os
import logging
from typing import List, Dict, Any
from pinecone import Pinecone as PineconeClient, PodSpec
from tenacity import retry, stop_after_attempt, wait_random_exponential

from rec_kit.interfaces import VectorStore, Vector

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class PineconeVectorStore(VectorStore):
    def __init__(self, config: Dict[str, Any]):
        api_key = os.getenv("PINECONE_API_KEY")
        if not api_key:
            raise ValueError("PINECONE_API_KEY environment variable not set.")
        
        self.index_name = config.get("index_name")
        if not self.index_name:
            raise ValueError("`index_name` must be provided in config.")
        
        self.environment = os.getenv("PINECONE_ENVIRONMENT", "gcp-starter") # 默认环境
        
        # 生产级代码必须考虑客户端的初始化和复用
        try:
            self.client = PineconeClient(api_key=api_key)
            self.index = None
        except Exception as e:
            logger.error(f"Failed to initialize Pinecone client: {e}")
            raise

    def create_index_if_not_exists(self, dimension: int):
        if self.index_name not in self.client.list_indexes().names():
            logger.info(f"Index '{self.index_name}' not found. Creating a new one with dimension {dimension}.")
            try:
                self.client.create_index(
                    name=self.index_name,
                    dimension=dimension,
                    metric="cosine", # 对于embedding,cosine相似度通常是好的选择
                    spec=PodSpec(environment=self.environment)
                )
                logger.info(f"Index '{self.index_name}' created successfully.")
            except Exception as e:
                logger.error(f"Failed to create Pinecone index: {e}")
                raise
        else:
            logger.info(f"Index '{self.index_name}' already exists.")
        
        self.index = self.client.Index(self.index_name)

    # 增加重试机制,应对网络波动或Pinecone的临时不可用
    @retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(5))
    def upsert(self, vectors: List[Vector], batch_size: int = 100):
        if not self.index:
            raise RuntimeError("Index is not initialized. Call create_index_if_not_exists first.")
        
        logger.info(f"Upserting {len(vectors)} vectors in batches of {batch_size}...")
        for i in range(0, len(vectors), batch_size):
            batch = vectors[i:i + batch_size]
            vectors_to_upsert = [
                {"id": v.id, "values": v.values, "metadata": v.metadata} for v in batch
            ]
            try:
                self.index.upsert(vectors=vectors_to_upsert)
            except Exception as e:
                logger.error(f"Error during batch upsert to Pinecone: {e}")
                # 抛出异常以触发tenacity重试
                raise
        logger.info("Upsert completed.")

    @retry(wait=wait_random_exponential(min=1, max=30), stop=stop_after_attempt(3))
    def query(self, query_vector: List[float], top_k: int, filter_criteria: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        if not self.index:
            raise RuntimeError("Index is not initialized.")
        
        try:
            results = self.index.query(
                vector=query_vector,
                top_k=top_k,
                filter=filter_criteria,
                include_metadata=True
            )
            return [
                {"id": match.id, "score": match.score, "metadata": match.metadata}
                for match in results.matches
            ]
        except Exception as e:
            logger.error(f"Error during query from Pinecone: {e}")
            raise

SentenceTransformer 文本向量化插件

这个插件封装了一个本地的 embedding 模型,避免了对外部 API 的依赖和费用。

# rec_kit/plugins/embedding_strategies/sentence_transformer_strategy.py

from typing import List, Dict, Any
from sentence_transformers import SentenceTransformer
import numpy as np
import logging

from rec_kit.interfaces import EmbeddingStrategy, Document, Vector

logger = logging.getLogger(__name__)

class SentenceTransformerStrategy(EmbeddingStrategy):
    def __init__(self, config: Dict[str, Any]):
        model_name = config.get("model_name", "all-MiniLM-L6-v2")
        logger.info(f"Loading SentenceTransformer model: {model_name}")
        try:
            self.model = SentenceTransformer(model_name)
            self._dimension = self.model.get_sentence_embedding_dimension()
        except Exception as e:
            logger.error(f"Failed to load model {model_name}: {e}")
            raise

    def embed(self, documents: List[Document]) -> List[Vector]:
        # 只处理content是字符串的情况
        contents = [doc.content for doc in documents if isinstance(doc.content, str)]
        if not contents:
            return []
            
        logger.info(f"Embedding {len(contents)} documents...")
        # encode方法本身就是优化的批量操作
        embeddings = self.model.encode(contents, show_progress_bar=False)
        
        vectors = []
        doc_idx = 0
        for doc in documents:
            if isinstance(doc.content, str):
                vectors.append(
                    Vector(
                        vec_id=doc.id,
                        values=embeddings[doc_idx].tolist(),
                        metadata=doc.metadata
                    )
                )
                doc_idx += 1
        return vectors

    @property
    def dimension(self) -> int:
        return self._dimension

微内核:编排与动态加载

内核是整个 Kit 的大脑。它负责解析配置,动态加载并初始化所需的插件,然后按照 数据读取 -> 向量化 -> 存储 的顺序驱动整个流程。动态加载是关键,它使得我们可以在不修改内核代码的情况下,仅通过增加插件文件和修改配置来扩展系统功能。

# rec_kit/core.py

import yaml
import importlib
import logging
from typing import Dict, Any, List

from rec_kit.interfaces import DataSource, EmbeddingStrategy, VectorStore, Document

logger = logging.getLogger(__name__)

class RecommendationKit:
    def __init__(self, config_path: str):
        logger.info(f"Initializing RecommendationKit with config: {config_path}")
        with open(config_path, 'r') as f:
            self.config = yaml.safe_load(f)

        self.data_source: DataSource = self._load_plugin("data_source")
        self.embedding_strategy: EmbeddingStrategy = self._load_plugin("embedding_strategy")
        self.vector_store: VectorStore = self._load_plugin("vector_store")

        # 核心的校验步骤:确保向量存储的维度和模型维度匹配
        self.vector_store.create_index_if_not_exists(self.embedding_strategy.dimension)

    def _load_plugin(self, plugin_type: str):
        """
        动态加载插件。
        这里的实现比较简单,生产环境可能需要更复杂的插件发现机制。
        """
        try:
            plugin_config = self.config[plugin_type]
            module_path = plugin_config["module"]
            class_name = plugin_config["class"]
            params = plugin_config.get("params", {})

            logger.info(f"Loading plugin: {module_path}.{class_name}")
            
            module = importlib.import_module(module_path)
            plugin_class = getattr(module, class_name)
            
            return plugin_class(params)
        except (KeyError, ImportError, AttributeError) as e:
            logger.error(f"Failed to load plugin of type '{plugin_type}': {e}")
            raise ValueError(f"Configuration error for plugin '{plugin_type}'") from e

    def run_indexing(self, batch_size: int = 64):
        """
        执行完整的索引构建流程
        """
        logger.info("Starting indexing process...")
        doc_stream = self.data_source.stream_documents()
        
        doc_batch: List[Document] = []
        total_processed = 0
        
        for document in doc_stream:
            doc_batch.append(document)
            if len(doc_batch) >= batch_size:
                self._process_batch(doc_batch)
                total_processed += len(doc_batch)
                logger.info(f"Processed {total_processed} documents so far.")
                doc_batch = []
        
        # 处理最后一批不足batch_size的数据
        if doc_batch:
            self._process_batch(doc_batch)
            total_processed += len(doc_batch)

        logger.info(f"Indexing process finished. Total documents processed: {total_processed}")

    def _process_batch(self, documents: List[Document]):
        vectors = self.embedding_strategy.embed(documents)
        if vectors:
            self.vector_store.upsert(vectors)

    def search(self, query_content: Any, top_k: int = 10, filter_criteria: Dict = None) -> List[Dict]:
        """
        执行查询
        """
        logger.info(f"Performing search for query content with top_k={top_k}")
        # 将查询内容包装成Document,以便复用embed方法
        query_doc = Document(doc_id="query", content=query_content, metadata={})
        query_vector = self.embedding_strategy.embed([query_doc])[0]
        
        return self.vector_store.query(query_vector.values, top_k, filter_criteria)

配置文件与实战

现在,所有组件都已就绪。我们可以通过一个简单的 YAML 文件来定义一个完整的推荐召回管道。

# config.yaml

data_source:
  module: rec_kit.plugins.data_sources.csv_source
  class: CsvDataSource
  params:
    file_path: "./data/sample_articles.csv"
    id_column: "article_id"
    content_column: "title"
    metadata_columns: ["category", "publish_date"]

embedding_strategy:
  module: rec_kit.plugins.embedding_strategies.sentence_transformer_strategy
  class: SentenceTransformerStrategy
  params:
    model_name: "paraphrase-multilingual-MiniLM-L12-v2"

vector_store:
  module: rec_kit.plugins.vector_stores.pinecone_store
  class: PineconeVectorStore
  params:
    index_name: "article-recommendation-index"

为了让这个配置能跑起来,还需要一个 CsvDataSource 插件。

# rec_kit/plugins/data_sources/csv_source.py
import csv
from typing import Dict, Any, Iterator, List
from rec_kit.interfaces import DataSource, Document
import logging

logger = logging.getLogger(__name__)

class CsvDataSource(DataSource):
    def __init__(self, config: Dict[str, Any]):
        self.file_path = config["file_path"]
        self.id_column = config["id_column"]
        self.content_column = config["content_column"]
        self.metadata_columns = config.get("metadata_columns", [])
        logger.info(f"CSVDataSource initialized for file: {self.file_path}")

    def stream_documents(self) -> Iterator[Document]:
        try:
            with open(self.file_path, mode='r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                for row in reader:
                    metadata = {col: row.get(col) for col in self.metadata_columns if col in row}
                    yield Document(
                        doc_id=row[self.id_column],
                        content=row[self.content_column],
                        metadata=metadata
                    )
        except FileNotFoundError:
            logger.error(f"CSV file not found at: {self.file_path}")
            raise
        except KeyError as e:
            logger.error(f"Column not found in CSV: {e}. Check your config.")
            raise

最后,一个执行入口脚本:

# main.py
import os
import logging
from dotenv import load_dotenv
from rec_kit.core import RecommendationKit

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def setup_environment():
    """加载环境变量,这是管理密钥的最佳实践"""
    load_dotenv()
    if not os.getenv("PINECONE_API_KEY"):
        logging.warning("PINECONE_API_KEY is not set. Please create a .env file.")
        exit(1)

def run():
    setup_environment()
    
    # 实例化Kit,所有魔法都从这里开始
    kit = RecommendationKit(config_path="config.yaml")

    # 运行索引流程
    # 在真实项目中,这会是一个离线的、定时的ETL任务
    # kit.run_indexing()

    # 执行在线查询
    query_text = "最新的AI技术进展"
    logging.info(f"\nSearching for: '{query_text}'")
    
    # 示例:增加元数据过滤
    filters = {"category": {"$eq": "Technology"}}
    
    results = kit.search(query_text, top_k=5, filter_criteria=filters)

    logging.info("Search Results:")
    for res in results:
        print(f"  - ID: {res['id']}, Score: {res['score']:.4f}, Metadata: {res['metadata']}")

if __name__ == "__main__":
    run()

现在,如果我们需要支持图片推荐,该怎么做?无需修改任何核心代码。我们只需要:

  1. 实现一个新的 ImageDataSource,从文件夹或数据库读取图片。
  2. 实现一个新的 CLIPEmbeddingStrategy,使用 CLIP 模型将图片转换为向量。
  3. 创建一个新的 config_image.yaml 文件,将 data_sourceembedding_strategy 指向新的插件实现。

这种扩展性正是我们最初追求的目标。

局限性与未来迭代

这个 Kit 目前只解决了召回(Recall)阶段的问题,一个完整的生产级推荐系统远比这复杂。它缺少了关键的排序(Ranking)和重排序(Re-ranking)阶段。当前的设计并没有为这些阶段留下接口。

其次,插件加载机制过于简单,直接依赖 importlib 和文件路径。在大型项目中,更健壮的方案是使用 Python 的 entry_points 机制,实现真正的插件自发现,让插件以独立的包形式存在和分发。

未来的迭代方向很明确:

  1. 增加排序器插件(Ranker): 在 search 流程中引入一个可插拔的排序步骤,它可以是一个简单的基于业务规则的排序器,也可以是一个复杂的机器学习模型。
  2. 异步化改造: 对于在线查询,embed 步骤可能会成为瓶颈。可以考虑使用异步框架(如 asyncio),将模型推理等 IO 密集型操作异步化,以提高查询接口的吞吐量。
  3. 特征工程与管理: 引入一个 FeatureStore 插件的接口,用于在召回和排序阶段获取实时或离线特征,这对于个性化推荐至关重要。
  4. 更完善的错误处理与监控: 集成更详细的监控指标(例如,每个插件的处理延迟、成功率),并对插件执行失败提供更优雅的降级策略。

  目录