Building an MLOps Lineage Tracking System with Laravel and Neo4j to Govern Kubeflow Workflows


An out-of-control MLOps process starts with a simple Python script and an S3 bucket. Before long, model versions become file names, datasets become folder paths, and experiment parameters are scattered across dozens of config.yaml files. When a production model starts to degrade, answering the question "Which version of the data and which set of hyperparameters was this model actually trained with?" turns into an archaeological dig. This is not just an efficiency problem; in production it directly affects the reliability and traceability of the system.

Our initial idea was to build a metadata store to record all of this. A relational database is the obvious choice, but its limitations surface quickly. The dependencies in an ML workflow form a natural graph: one dataset can train multiple models, one model can back multiple deployments, and several experiments may share the same base dataset. Tracing the full upstream dependencies of a deployment with multi-level SQL JOINs is unacceptable in both performance and complexity. This is exactly where a graph database earns its keep.

Technology Choices

Our goal is an automated lineage tracking system that integrates deeply with our existing stack.

  1. Runtime environment: Kubernetes & Kubeflow
    This is our team's MLOps standard. K8s provides elastic compute resources, while Kubeflow Pipelines orchestrates the complex training and deployment workflows. Any solution must be able to capture events and metadata from Kubeflow seamlessly.

  2. Lineage data store: Neo4j
    We chose Neo4j, a native graph database. Its node-relationship model maps directly onto our MLOps entities (datasets, experiments, models, deployments) and their relationships (USED, PRODUCED, DEPLOYED). With its query language, Cypher, deep and variable-length lineage traversal queries become remarkably efficient.

  3. Control plane and API: Laravel
    This is the unconventional choice. Most MLOps toolchains are Python-centric, but our core API services and internal tooling platform are built on Laravel. In real projects, technology decisions are shaped by team skills and existing infrastructure. Rather than introducing a new Python service (FastAPI/Flask) and the maintenance cost that comes with it, we decided to leverage Laravel's robustness, task queues, and ecosystem to build this control plane. It receives events from Kubeflow, talks to the K8s API for runtime context, and writes the processed lineage data into Neo4j.

  4. Frontend visualization
    We will build a single-page application (SPA) that calls the Laravel API and renders the graph data from Neo4j as an interactive dependency graph.

Overall Architecture

The core idea of the system is event-driven processing. When a key step in a Kubeflow Pipeline (such as training or evaluation) completes, it sends a webhook request carrying contextual information to the Laravel control plane.

graph TD
    subgraph Kubernetes Cluster
        A[Kubeflow Pipeline] -- triggers --> B(Training Pod)
        B -- on completion --> C{Send Webhook}
    end

    C -- HTTP POST --> D[Laravel Application]

    subgraph Laravel Application
        D -- 1. Receives Event --> E[Lineage Controller]
        E -- 2. Dispatches Job --> F[Queue: ProcessLineage]
    end

    subgraph Background Worker
        G[Worker] -- 3. Picks up Job --> H(LineageService)
        H -- 4a. Query Pod Info --> I[K8s API Server]
        H -- 4b. Write Graph Data --> J[Neo4j Database]
    end

    K[Frontend SPA] -- Requests Data --> D
    D -- Queries Graph --> J
    J -- Returns Lineage --> D
    D -- Returns JSON --> K

This asynchronous handling (via the Laravel queue) is essential: it keeps the response to the Kubeflow webhook fast and pushes the potentially slow interactions with the K8s API and Neo4j into the background, so the pipeline's subsequent steps are never blocked.
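
For the dispatch to actually be asynchronous, the queue connection must not be Laravel's default sync driver and a worker process has to be running. A minimal sketch, assuming Redis as the queue driver (any non-sync driver works); the ProcessLineageEvent job itself is defined in Step 3:

# .env
QUEUE_CONNECTION=redis

# A long-running worker process consumes the queued ProcessLineageEvent jobs
php artisan queue:work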

Step 1: Defining the Neo4j Graph Model

Our graph model is simple, but it covers the core lineage tracking needs.

  • Nodes (Labels):

    • Dataset: represents a dataset. Properties: name, version, path.
    • Experiment: represents a single Kubeflow Pipeline run. Properties: runId, pipelineName, parameters (JSON string).
    • Model: represents a trained model artifact. Properties: name, version, artifactUri.
    • Deployment: represents a deployed online service. Properties: endpoint, status, deploymentName.
  • Relationships (Types):

    • USED: (Experiment)-[:USED]->(Dataset)
    • PRODUCED: (Experiment)-[:PRODUCED]->(Model)
    • DEPLOYED: (Model)-[:DEPLOYED]->(Deployment)

With this model we can easily run Cypher queries that answer non-trivial questions, for example:
"Find all models that were trained from the /data/images_v2.zip dataset and are currently deployed in production."

MATCH (d:Dataset {path: '/data/images_v2.zip'})<-[:USED]-(e:Experiment)-[:PRODUCED]->(m:Model)-[:DEPLOYED]->(dep:Deployment)
WHERE dep.status = 'active'
RETURN m.name, m.version, dep.endpoint
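
Because the write path later relies on MERGE keyed on path, runId, and artifactUri, it is worth creating uniqueness constraints on those properties up front. A sketch, assuming Neo4j 5.x constraint syntax (the constraint names are arbitrary):

CREATE CONSTRAINT dataset_path_unique IF NOT EXISTS
FOR (d:Dataset) REQUIRE d.path IS UNIQUE;

CREATE CONSTRAINT experiment_run_id_unique IF NOT EXISTS
FOR (e:Experiment) REQUIRE e.runId IS UNIQUE;

CREATE CONSTRAINT model_artifact_uri_unique IF NOT EXISTS
FOR (m:Model) REQUIRE m.artifactUri IS UNIQUE;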

Step 2: Adapting the Kubeflow Pipeline to Emit Events

We define a component in the Kubeflow Pipeline Python DSL that is invoked after the training task succeeds. Its job is to collect the necessary metadata and send a POST request to the Laravel API.

A common mistake is hardcoding the URL and credentials directly in the component. In production these should be injected via K8s Secrets and ConfigMaps; see the pipeline definition sketch at the end of this step.

# pipeline_component.py
from kfp.dsl import component, Input, Artifact
import os
import requests
import json
import logging

# Setup basic logging
logging.basicConfig(level=logging.INFO)

@component(
    base_image="python:3.9-slim",
    packages_to_install=["requests"]
)
def notify_lineage_service(
    pipeline_run_id: str,
    pipeline_name: str,
    model_artifact: Input[Artifact],
    dataset_path: str,
    hyperparameters: dict
):
    """
    Sends metadata to the Laravel-based MLOps Lineage Service.
    """
    lineage_api_endpoint = os.getenv("LINEAGE_API_ENDPOINT")
    api_token = os.getenv("LINEAGE_API_TOKEN")

    if not lineage_api_endpoint or not api_token:
        logging.error("LINEAGE_API_ENDPOINT or LINEAGE_API_TOKEN not set. Skipping notification.")
        # In a real project, this might need to fail the pipeline.
        return

    payload = {
        "event_type": "training_completed",
        "run_id": pipeline_run_id,
        "pipeline_name": pipeline_name,
        "artifacts": {
            "model": {
                "uri": model_artifact.uri,
                "name": model_artifact.metadata.get("name", "default-model-name"),
                "version": model_artifact.metadata.get("version", "v1.0.0")
            }
        },
        "inputs": {
            "dataset": {
                "path": dataset_path,
                "version": "v2.1" # This should ideally come from metadata as well
            }
        },
        "parameters": hyperparameters
    }

    headers = {
        "Authorization": f"Bearer {api_token}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    }

    try:
        logging.info(f"Sending lineage data to {lineage_api_endpoint}")
        logging.info(f"Payload: {json.dumps(payload, indent=2)}")
        
        response = requests.post(
            lineage_api_endpoint, 
            data=json.dumps(payload), 
            headers=headers,
            timeout=15 # Set a reasonable timeout
        )
        
        response.raise_for_status() # Raises HTTPError for bad responses (4xx or 5xx)
        
        logging.info(f"Successfully notified lineage service. Status: {response.status_code}")
        logging.info(f"Response: {response.json()}")

    except requests.exceptions.RequestException as e:
        logging.error(f"Failed to send lineage data: {e}")
        # This is a critical failure point. We must have a retry mechanism or robust alerting.
        # For now, we just raise the exception to fail the step.
        raise e

In the Kubeflow Pipeline definition we chain this component after the training component and pass it the necessary context, such as {{workflow.name}}.
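
A minimal sketch of that wiring, assuming the KFP v2 SDK with the kfp-kubernetes extension installed. The train_model component, the pipeline name, the "lineage-api" Secret, and its key names are all hypothetical; dsl.PIPELINE_JOB_NAME_PLACEHOLDER is KFP v2's counterpart of the Argo {{workflow.name}} placeholder mentioned above.

# pipeline.py
from kfp import dsl, kubernetes
from kfp.dsl import Artifact, Output

from pipeline_component import notify_lineage_service


@dsl.component(base_image="python:3.9-slim")
def train_model(dataset_path: str, model: Output[Artifact]):
    # Stand-in for the real training step; it just writes a dummy artifact.
    with open(model.path, "w") as f:
        f.write("dummy model")


@dsl.pipeline(name="image-classifier-training")
def training_pipeline(dataset_path: str = "/data/images_v2.zip"):
    train_task = train_model(dataset_path=dataset_path)

    notify_task = notify_lineage_service(
        pipeline_run_id=dsl.PIPELINE_JOB_NAME_PLACEHOLDER,
        pipeline_name="image-classifier-training",
        model_artifact=train_task.outputs["model"],
        dataset_path=dataset_path,
        hyperparameters={"lr": 0.001, "epochs": 10},
    )

    # Inject the endpoint and token from a K8s Secret instead of hardcoding them.
    kubernetes.use_secret_as_env(
        notify_task,
        secret_name="lineage-api",
        secret_key_to_env={
            "endpoint": "LINEAGE_API_ENDPOINT",
            "token": "LINEAGE_API_TOKEN",
        },
    )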

Step 3: Building the Laravel Control Plane

This is where everything connects. We create an API endpoint, a Service class for the business logic, and a Repository that talks to Neo4j.

1. API Route and Controller

First, define a protected route in routes/api.php.

// routes/api.php
use App\Http\Controllers\Api\V1\LineageWebhookController;
use Illuminate\Support\Facades\Route;

// Routes in this file are prefixed with /api by default; we add the v1 segment explicitly.
Route::prefix('v1')->middleware('auth:sanctum')->group(function () {
    Route::post('/lineage/event', [LineageWebhookController::class, 'handle'])
        ->name('lineage.event');
});

We use Laravel Sanctum for simple token-based authentication.
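
Issuing the token that the Kubeflow component presents is a one-off step. A minimal sketch, assuming the User model uses Sanctum's HasApiTokens trait and that a dedicated service-account record exists (the email and token name are hypothetical):

// e.g. run once in a seeder or a tinker session
$user = \App\Models\User::firstOrCreate(
    ['email' => 'kubeflow@internal.example'],
    ['name' => 'Kubeflow Service Account', 'password' => bcrypt(\Illuminate\Support\Str::random(40))]
);

// The plain-text value is what goes into the K8s Secret read as LINEAGE_API_TOKEN.
$token = $user->createToken('kubeflow-lineage')->plainTextToken;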

The controller's job is to validate the input and dispatch a job onto the queue.

// app/Http/Controllers/Api/V1/LineageWebhookController.php
namespace App\Http\Controllers\Api\V1;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessLineageEvent;
use Illuminate\Http\JsonResponse;
use Illuminate\Http\Request;
use Illuminate\Support\Facades\Log;
use Illuminate\Validation\ValidationException;

class LineageWebhookController extends Controller
{
    /**
     * Handle incoming lineage events from Kubeflow.
     *
     * @param Request $request
     * @return JsonResponse
     */
    public function handle(Request $request): JsonResponse
    {
        try {
            $validatedData = $request->validate([
                'event_type' => 'required|string|in:training_completed',
                'run_id' => 'required|string|max:255',
                'pipeline_name' => 'required|string',
                'artifacts.model.uri' => 'required|string',
                'artifacts.model.name' => 'required|string',
                'artifacts.model.version' => 'required|string',
                'inputs.dataset.path' => 'required|string',
                'inputs.dataset.version' => 'required|string',
                'parameters' => 'sometimes|array',
            ]);

            // Dispatch to a queue for robust, asynchronous processing.
            // This allows us to return a response to Kubeflow immediately.
            ProcessLineageEvent::dispatch($validatedData);

            Log::info('Lineage event received and queued for processing.', ['run_id' => $validatedData['run_id']]);

            return response()->json(['message' => 'Event received and queued.'], 202);

        } catch (ValidationException $e) {
            Log::warning('Invalid lineage event received.', [
                'error' => $e->errors(),
                'payload' => $request->all()
            ]);
            return response()->json(['message' => 'Invalid payload.', 'errors' => $e->errors()], 422);
        } catch (\Exception $e) {
            Log::error('Failed to handle lineage event.', ['exception' => $e]);
            return response()->json(['message' => 'An internal server error occurred.'], 500);
        }
    }
}

2. Core Logic: Job and Service

The job delegates to the service, which performs the actual graph-database operations.

// app/Jobs/ProcessLineageEvent.php
namespace App\Jobs;

use App\Services\MlOps\LineageService;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;

class ProcessLineageEvent implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public int $tries = 3; // Attempt to run the job 3 times.
    public int $backoff = 60; // Wait 60 seconds between retries.

    public function __construct(protected array $eventData)
    {
    }

    public function handle(LineageService $lineageService): void
    {
        try {
            if ($this->eventData['event_type'] === 'training_completed') {
                $lineageService->recordTrainingRun($this->eventData);
            }
            // Future event types can be handled here.
        } catch (\Exception $e) {
            Log::critical('Fatal error processing lineage event.', [
                'run_id' => $this->eventData['run_id'] ?? 'unknown',
                'exception' => $e,
            ]);
            // Re-throw the exception to make the job fail and retry.
            throw $e;
        }
    }
}

The LineageService is where all the logic comes together. The tricky part here is that besides handling the webhook payload, we may also want to call the K8s API for richer context, such as pod logs and resource consumption. To keep things simple we omit the K8s API interaction for now, but in a real project it is a very worthwhile addition.

// app/Services/MlOps/LineageService.php
namespace App\Services\MlOps;

use App\Repositories\Neo4jLineageRepository;
use Illuminate\Support\Facades\Log;

class LineageService
{
    public function __construct(protected Neo4jLineageRepository $repository)
    {
    }

    /**
     * Records a completed training run in the Neo4j graph.
     * This method is designed to be idempotent.
     *
     * @param array $data
     * @return void
     */
    public function recordTrainingRun(array $data): void
    {
        Log::info('Processing training run in LineageService', ['run_id' => $data['run_id']]);

        $datasetInfo = $data['inputs']['dataset'];
        $modelInfo = $data['artifacts']['model'];
        $experimentInfo = [
            'runId' => $data['run_id'],
            'pipelineName' => $data['pipeline_name'],
            'parameters' => json_encode($data['parameters'] ?? []),
        ];

        // The repository handles the transactional Cypher queries.
        $this->repository->createTrainingLineage(
            $datasetInfo,
            $experimentInfo,
            $modelInfo
        );

        Log::info('Successfully recorded training run lineage.', ['run_id' => $data['run_id']]);
    }
}

3. Persistence: the Neo4j Repository

The repository pattern encapsulates all interaction with the database. We use the laudis/neo4j-php-client library.
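
The repository below obtains its client from Laravel's database manager, which assumes a Neo4j connection has been wired into config/database.php. If no such integration is available, a minimal sketch that binds the client directly in the container works just as well (NEO4J_DSN is a hypothetical env var):

// app/Providers/Neo4jServiceProvider.php
namespace App\Providers;

use Illuminate\Support\ServiceProvider;
use Laudis\Neo4j\ClientBuilder;
use Laudis\Neo4j\Contracts\ClientInterface;

class Neo4jServiceProvider extends ServiceProvider
{
    public function register(): void
    {
        $this->app->singleton(ClientInterface::class, function () {
            // Build a client against the bolt endpoint; credentials come from the env.
            return ClientBuilder::create()
                ->withDriver('default', env('NEO4J_DSN', 'bolt://neo4j:secret@localhost:7687'))
                ->build();
        });
    }
}

With this binding in place, the repository constructor could simply type-hint ClientInterface instead of going through DB::connection('neo4j').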

// app/Repositories/Neo4jLineageRepository.php
namespace App\Repositories;

use Laudis\Neo4j\Contracts\ClientInterface;
use Illuminate\Support\Facades\DB; // Assuming Neo4j connection is set up in config/database.php

class Neo4jLineageRepository
{
    protected ClientInterface $client;

    public function __construct()
    {
        // Get the client from Laravel's database manager
        $this->client = DB::connection('neo4j')->getClient();
    }
    
    /**
     * Creates the full lineage for a training run within a single transaction.
     * Uses MERGE to ensure idempotency. If nodes/relationships exist, they are matched; otherwise, they are created.
     */
    public function createTrainingLineage(array $dataset, array $experiment, array $model): void
    {
        // Using MERGE is crucial for idempotency. We don't want duplicate nodes if an event is re-processed.
        // We identify nodes by a unique property (path for dataset, runId for experiment, etc.).
        $query = <<<CYPHER
        // 1. Ensure Dataset node exists
        MERGE (d:Dataset {path: \$datasetPath})
        ON CREATE SET d.version = \$datasetVersion, d.name = split(\$datasetPath, '/')[-1]

        // 2. Ensure Experiment node exists
        MERGE (e:Experiment {runId: \$expRunId})
        ON CREATE SET e.pipelineName = \$expPipelineName, e.parameters = \$expParams

        // 3. Ensure Model node exists
        MERGE (m:Model {artifactUri: \$modelUri})
        ON CREATE SET m.name = \$modelName, m.version = \$modelVersion

        // 4. Create relationships if they don't exist
        MERGE (e)-[:USED]->(d)
        MERGE (e)-[:PRODUCED]->(m)
        CYPHER;

        $params = [
            'datasetPath' => $dataset['path'],
            'datasetVersion' => $dataset['version'],
            'expRunId' => $experiment['runId'],
            'expPipelineName' => $experiment['pipelineName'],
            'expParams' => $experiment['parameters'],
            'modelUri' => $model['uri'],
            'modelName' => $model['name'],
            'modelVersion' => $model['version'],
        ];

        // The client's `run` method executes the query.
        // For production, wrapping this in a transaction is essential.
        $this->client->writeTransaction(function ($tx) use ($query, $params) {
            return $tx->run($query, $params);
        });
    }

    /**
     * Fetches the upstream and downstream graph for a given model artifact URI.
     */
    public function getLineageForModel(string $artifactUri, int $depth = 3): array
    {
        // This query finds a model and traverses its relationships in both directions up to a certain depth.
        $query = <<<CYPHER
        MATCH (m:Model {artifactUri: \$uri})
        CALL apoc.path.subgraphAll(m, {
            maxLevel: \$depth
        })
        YIELD nodes, relationships
        RETURN nodes, relationships
        CYPHER;
        
        $result = $this->client->readTransaction(function ($tx) use ($query, $artifactUri, $depth) {
            return $tx->run($query, ['uri' => $artifactUri, 'depth' => $depth]);
        });

        // The processing of the result to a frontend-friendly format is non-trivial.
        // We need to format nodes and edges into a structure that libraries like vis.js or D3.js can consume.
        if ($result->count() === 0) {
            return ['nodes' => [], 'edges' => []];
        }

        $record = $result->first();
        $nodes = $record->get('nodes');
        $relationships = $record->get('relationships');
        
        $formattedNodes = [];
        foreach($nodes as $node) {
            $formattedNodes[] = [
                'id' => $node->getId(),
                'label' => $node->getLabels()->first(),
                'properties' => $node->getProperties()->toArray(),
            ];
        }
        
        $formattedEdges = [];
        foreach($relationships as $rel) {
             $formattedEdges[] = [
                'from' => $rel->getStartNodeId(),
                'to' => $rel->getEndNodeId(),
                'label' => $rel->getType()
            ];
        }

        return ['nodes' => $formattedNodes, 'edges' => $formattedEdges];
    }
}

Note: the getLineageForModel query above uses apoc.path.subgraphAll from the APOC (Awesome Procedures on Cypher) extension library, a very handy procedure for fetching the subgraph around a node. Make sure APOC is installed on your Neo4j instance.
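
For local development with the official Docker image, enabling APOC is a single environment variable; a sketch, assuming Neo4j 5.x (on Kubernetes the equivalent setting goes into your Helm chart values):

docker run -d --name neo4j \
  -p 7474:7474 -p 7687:7687 \
  -e NEO4J_AUTH=neo4j/your-password \
  -e NEO4J_PLUGINS='["apoc"]' \
  neo4j:5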

Step 4: Serving Data to the Frontend

Now we just need a new API endpoint so the frontend can query the lineage of a specific model.

// routes/api.php - add this inside the middleware group
// (and import App\Http\Controllers\Api\V1\ModelLineageController at the top of the file)
Route::get('/lineage/model', [ModelLineageController::class, 'show'])->name('lineage.model.show');

// app/Http/Controllers/Api/V1/ModelLineageController.php
namespace App\Http\Controllers\Api\V1;

use App\Http\Controllers\Controller;
use App\Repositories\Neo4jLineageRepository;
use Illuminate\Http\Request;

class ModelLineageController extends Controller
{
    public function __construct(protected Neo4jLineageRepository $repository)
    {
    }

    public function show(Request $request)
    {
        $validated = $request->validate([
            'uri' => 'required|string',
            'depth' => 'sometimes|integer|min:1|max:5'
        ]);

        $lineageData = $this->repository->getLineageForModel(
            $validated['uri'],
            $validated['depth'] ?? 3
        );

        if (empty($lineageData['nodes'])) {
            return response()->json(['message' => 'Model not found or has no lineage.'], 404);
        }

        return response()->json($lineageData);
    }
}
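
A quick way to exercise the endpoint from the command line, using the token issued earlier; the host and model URI below are placeholders:

curl -G "https://mlops.internal.example/api/v1/lineage/model" \
  -H "Authorization: Bearer <token>" \
  -H "Accept: application/json" \
  --data-urlencode "uri=s3://models/classifier/v4/model.pt" \
  --data-urlencode "depth=3"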

Once the frontend receives this JSON response, it can use any graph visualization library to render a dynamic, interactive lineage graph, where clicking a node reveals its detailed metadata.

Limitations of the Current Approach and Future Directions

This architecture solves the core lineage tracking problem, but it is far from perfect.

First, relying on the Kubeflow component to actively send webhooks is a potential failure point. If the network partitions or the Laravel service is briefly unavailable, events are lost. A more robust design introduces a message queue (such as RabbitMQ or NATS): the Kubeflow component publishes events to the queue and Laravel consumes them, which provides much better durability and retry guarantees.

Second, the current lineage granularity stops at the component level. A complete MLOps workflow contains finer-grained information, such as the concrete transformations applied during feature engineering and the results of data quality checks. Extending the graph model to capture these entities is the key next iteration, but it will significantly increase the system's complexity.

Finally, tracking lineage alone is not enough. Linking observability data, such as online prediction performance metrics and data drift monitoring results, into the same graph would give us an end-to-end view from data source to production performance. That requires integrating data from monitoring systems like Prometheus and attaching live performance data to the Deployment nodes.

