Building a Versioned Feature Store Control Plane and Serving Proxy with DVC and GitOps


After the team grew, the feature engineering process in our machine learning projects started to spiral out of control. In the beginning, feature definitions and generation logic were scattered across Jupyter Notebooks and standalone Python scripts. Data scientists worked in isolation: shipping a new feature usually meant manually running a piece of loosely tested SQL, updating a shared table, and hoping nothing broke in downstream model training or online inference. Version control amounted to filename suffixes like _v2, _final, and _fixed, and feature lineage and reproducibility became anyone's guess. Whenever we needed to roll back a feature, or compare how different feature versions affected model performance, the whole process was guesswork and risk.

The core pain point was clear: we lacked a mechanism for bringing feature engineering under software engineering best practices. A feature definition (typically SQL transformation logic) is code, and it should be versioned, reviewed, and deployed automatically. That idea led to the initial design for an internal platform: a versioned feature store built on GitOps principles.

The design goals for the system were:

  1. Feature definitions as code: all feature generation logic (SQL scripts) must be managed in a Git repository.
  2. Versioning and reproducibility: use DVC (Data Version Control) to track the relationship between a SQL script and the concrete data version it produces.
  3. Automated workflow: feature definitions merged to the main branch should automatically be executed, validated, and registered in a central feature registry.
  4. Flexible online serving: a serving proxy routes traffic to different feature service versions based on a request header, enabling A/B tests and canary releases.

Technology choices followed the principles of being lightweight, efficient, and cloud native. We settled on the following stack:

  • Control plane API: Koa (Node.js). It is lightweight, and its middleware model suits a simple RESTful API that receives commands from the CI/CD pipeline and manipulates the backing feature registry.
  • Feature registry and offline store: PostgreSQL. A stable, reliable relational database, sufficient for storing both feature metadata and offline feature data.
  • Versioning core: DVC. It integrates seamlessly with Git and defines data processing stages via dvc.yaml, a natural fit for our "SQL script -> feature table" pipeline.
  • Serving proxy and traffic routing: Envoy Proxy. Its dynamic configuration and routing capabilities are the key to versioned online feature serving.

Architecture Overview

Before diving into code, here is the overall architecture and data flow.

graph TD
    subgraph Git Repository
        A[Data Scientist: git push] --> B{CI/CD Pipeline};
        C[features/user_avg_spend.sql]
        D[dvc.yaml]
    end

    subgraph CI/CD Runner
        B -- on merge to main --> E[dvc repro];
        E -- executes SQL --> F[(PostgreSQL Offline Store)];
        E -- after success --> G[curl -X POST /register];
    end

    subgraph Control & Storage Plane
        G --> H{Koa Control Plane API};
        H -- writes metadata --> I[(PostgreSQL Feature Registry)];
        F -- feature data --> I;
    end

    subgraph Serving Plane
        J[Client Request] -- x-feature-version: 1.1 --> K{Envoy Proxy};
        K -- route based on header --> L[Feature Service v1.0];
        K -- route based on header --> M[Feature Service v1.1];
        L --> I;
        M --> I;
    end

    style F fill:#d3d3d3,stroke:#333,stroke-width:2px
    style I fill:#d3d3d3,stroke:#333,stroke-width:2px

The workflow:

  1. A data scientist adds or modifies a SQL file under features/ in the Git repository, defining a new feature.
  2. At the same time, they update dvc.yaml to describe how that SQL file materializes a table.
  3. They open a pull request, which is reviewed and merged into the main branch.
  4. The CI/CD pipeline is triggered and runs dvc repro. DVC executes the corresponding SQL and materializes the result into the PostgreSQL offline store.
  5. After dvc repro succeeds, a pipeline script calls the Koa control plane's /register endpoint, writing the new feature version's metadata (name, version, table location, and so on) into the feature registry.
  6. On the serving side, Envoy Proxy intercepts all feature-fetch requests. It inspects a version header and routes each request precisely to the service instance running the corresponding feature version.

Database and Project Layout

First, we need a PostgreSQL database for feature metadata. The table structure is simple but critical.

SQL Schema (registry.sql)

CREATE TABLE IF NOT EXISTS feature_registry (
    id SERIAL PRIMARY KEY,
    feature_name VARCHAR(255) NOT NULL,
    version VARCHAR(50) NOT NULL,
    status VARCHAR(20) NOT NULL DEFAULT 'PENDING', -- PENDING, ACTIVE, DEPRECATED
    description TEXT,
    value_type VARCHAR(50), -- e.g., NUMERIC, CATEGORICAL
    source_sql_path VARCHAR(512), -- Path to the SQL file in Git
    offline_table_name VARCHAR(255), -- The materialized table in the DB
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(feature_name, version)
);

-- Simple function to update the `updated_at` timestamp on row update
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
   NEW.updated_at = NOW();
   RETURN NEW;
END;
$$ language 'plpgsql';

CREATE TRIGGER update_feature_registry_updated_at
BEFORE UPDATE ON feature_registry
FOR EACH ROW
EXECUTE FUNCTION update_updated_at_column();
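
With this schema, a serving-side consumer needs to decide which registry row to use. A minimal sketch of that resolution logic (pickServableVersion is a hypothetical helper, not part of the control plane; row shape mirrors feature_registry above):

```javascript
// Hypothetical helper a feature service could use: serve the most recently
// created row whose status is ACTIVE, ignoring DEPRECATED versions.
function pickServableVersion(rows) {
  return rows
    .filter((r) => r.status === 'ACTIVE')
    .sort((a, b) => new Date(b.created_at) - new Date(a.created_at))[0] || null;
}

// Example rows as they might come back from `SELECT * FROM feature_registry`.
const rows = [
  { feature_name: 'user_7day_avg_spend', version: 'abc123', status: 'ACTIVE',
    created_at: '2024-01-01T00:00:00Z' },
  { feature_name: 'user_7day_avg_spend', version: 'def456', status: 'ACTIVE',
    created_at: '2024-02-01T00:00:00Z' },
  { feature_name: 'user_7day_avg_spend', version: 'old001', status: 'DEPRECATED',
    created_at: '2023-12-01T00:00:00Z' },
];

console.log(pickServableVersion(rows).version); // def456
```

The UNIQUE(feature_name, version) constraint guarantees this lookup is unambiguous per version; the ORDER BY created_at convention here matches the GET endpoint shown later.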

Our project layout keeps feature definitions, DVC configuration, and control plane code cleanly separated.

feature-store/
├── .dvc/
├── features/
│   └── user_7day_avg_spend.sql
├── services/
│   └── control-plane/
│       ├── src/
│       │   ├── db.js
│       │   ├── routes.js
│       │   └── server.js
│       ├── package.json
│       └── .env
├── dvc.yaml
└── .gitignore

The DVC and GitOps Pipeline

DVC is the bridge between Git and data. The dvc.yaml file defines our data processing stages.

dvc.yaml

# dvc.yaml
stages:
  generate_user_7day_avg_spend:
    # Command to execute. It reads DB connection settings from environment
    # variables and runs the checked-in SQL file, which should be idempotent
    # (CREATE TABLE IF NOT EXISTS ... AS SELECT ... via dblink to source_db).
    cmd: >-
      PGPASSWORD=$DB_PASSWORD
      psql -h $DB_HOST -U $DB_USER -d $DB_NAME
      -f features/user_7day_avg_spend.sql
      && mkdir -p data && touch data/user_7day_avg_spend.flag
    # This stage depends on the SQL file. If the file changes, DVC re-runs it.
    deps:
      - features/user_7day_avg_spend.sql
    # We don't track the output data directly with DVC, as it lives in a
    # managed DB. Instead we emit a placeholder flag file so the stage has a
    # trackable output; the cmd must create it, or `dvc repro` fails.
    # The real output is the DB table. A post-hook in the CI pipeline
    # handles registration.
    outs:
      - data/user_7day_avg_spend.flag

In a real CI/CD pipeline (e.g., GitHub Actions), the workflow looks like this:

# .github/workflows/feature-ci.yaml (conceptual)
name: Feature Engineering CI

on:
  push:
    branches:
      - main
    paths:
      - 'features/**.sql'
      - 'dvc.yaml'

jobs:
  process-and-register:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
        with:
          # The `git diff-tree` step below needs the parent commit,
          # so fetch one level beyond the default shallow clone.
          fetch-depth: 2

      - name: Setup DVC
        uses: iterative/setup-dvc@v1

      - name: Run DVC pipeline
        env:
          DB_HOST: ${{ secrets.DB_HOST }}
          DB_USER: ${{ secrets.DB_USER }}
          DB_PASSWORD: ${{ secrets.DB_PASSWORD }}
          DB_NAME: ${{ secrets.DB_NAME }}
        run: |
          dvc repro

      - name: Register Feature Version
        run: |
          # Parse the feature name from the changed SQL file's name and use
          # the short Git commit hash as the version.
          FEATURE_SQL_PATH=$(git diff-tree --no-commit-id --name-only -r HEAD | grep 'features/.*\.sql' | head -n 1)
          FEATURE_NAME=$(basename "$FEATURE_SQL_PATH" .sql)
          VERSION=$(git rev-parse --short HEAD)
          
          curl -X POST http://control-plane.internal:3000/api/v1/register \
            -H "Content-Type: application/json" \
            -d '{
              "featureName": "'"$FEATURE_NAME"'",
              "version": "'"$VERSION"'",
              "description": "Auto-registered from CI pipeline",
              "valueType": "NUMERIC",
              "sourceSqlPath": "'"$FEATURE_SQL_PATH"'",
              "offlineTableName": "'"$FEATURE_NAME"'"
            }'
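
The inline JSON quoting in that curl call is easy to get wrong. A small Node helper could build the same payload and let JSON.stringify handle the escaping (a sketch; buildRegisterPayload is hypothetical and not part of the pipeline above):

```javascript
// Hypothetical helper: construct the /register payload that the curl step
// sends, instead of splicing shell variables into a JSON string by hand.
function buildRegisterPayload({ sqlPath, commitHash }) {
  // Feature name convention: the SQL file's basename without the extension.
  const featureName = sqlPath.split('/').pop().replace(/\.sql$/, '');
  return {
    featureName,
    version: commitHash.slice(0, 7), // short hash, as `git rev-parse --short` typically yields
    description: 'Auto-registered from CI pipeline',
    valueType: 'NUMERIC',
    sourceSqlPath: sqlPath,
    offlineTableName: featureName, // convention: table named after the feature
  };
}

const payload = buildRegisterPayload({
  sqlPath: 'features/user_7day_avg_spend.sql',
  commitHash: 'abc1234def5678',
});
console.log(JSON.stringify(payload));
```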

The pitfall here is that DVC is naturally suited to tracking data on a filesystem. When the output is a database table, we use a workaround: dvc repro executes the SQL, but we do not let DVC track database state directly. Instead, after DVC succeeds, a CI script calls our control plane API to record the metadata, connecting the Git/DVC world to our application state (the feature registry).
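
One refinement of this workaround: instead of an empty `touch`, write a small summary of the materialized table into the flag file, so the DVC-tracked output actually changes when the data does. A sketch (buildFlagContent and the field names are assumptions, not part of the pipeline above):

```javascript
// Hypothetical flag-file generator: record table name, row count, and a
// timestamp so DVC sees a content change whenever the materialization does.
function buildFlagContent({ tableName, rowCount, generatedAt }) {
  return JSON.stringify(
    { table: tableName, rows: rowCount, generated_at: generatedAt },
    null,
    2
  );
}

const flag = buildFlagContent({
  tableName: 'user_7day_avg_spend',
  rowCount: 1542, // e.g., from SELECT COUNT(*) after materialization
  generatedAt: '2024-03-01T00:00:00Z',
});
console.log(flag);
// In the pipeline, this string would be written to data/user_7day_avg_spend.flag.
```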

The Koa Control Plane API

The control plane is a small Koa application exposing an internal API endpoint for feature registration.

services/control-plane/src/db.js

// src/db.js
const { Pool } = require('pg');

// In a real project, use a robust configuration management system.
// Here we use environment variables for simplicity.
const pool = new Pool({
  host: process.env.DB_HOST || 'localhost',
  user: process.env.DB_USER || 'user',
  password: process.env.DB_PASSWORD || 'password',
  database: process.env.DB_NAME || 'feature_store',
  port: parseInt(process.env.DB_PORT || '5432', 10),
});

pool.on('error', (err, client) => {
  console.error('Unexpected error on idle client', err);
  process.exit(-1);
});

module.exports = {
  query: (text, params) => pool.query(text, params),
};

services/control-plane/src/routes.js

// src/routes.js
const Router = require('@koa/router');
const db = require('./db');

const router = new Router({
  prefix: '/api/v1'
});

router.post('/register', async (ctx) => {
  const {
    featureName,
    version,
    description,
    valueType,
    sourceSqlPath,
    offlineTableName
  } = ctx.request.body;

  // Basic validation
  if (!featureName || !version || !sourceSqlPath || !offlineTableName) {
    ctx.status = 400;
    ctx.body = { error: 'Missing required fields: featureName, version, sourceSqlPath, offlineTableName' };
    return;
  }

  try {
    const queryText = `
      INSERT INTO feature_registry(feature_name, version, status, description, value_type, source_sql_path, offline_table_name)
      VALUES($1, $2, $3, $4, $5, $6, $7)
      ON CONFLICT (feature_name, version) 
      DO UPDATE SET
        status = EXCLUDED.status,
        description = EXCLUDED.description,
        value_type = EXCLUDED.value_type,
        source_sql_path = EXCLUDED.source_sql_path,
        offline_table_name = EXCLUDED.offline_table_name,
        updated_at = NOW()
      RETURNING *;
    `;
    const values = [featureName, version, 'ACTIVE', description, valueType, sourceSqlPath, offlineTableName];
    
    const { rows } = await db.query(queryText, values);
    
    // A common mistake is not logging structured context.
    console.log({
        message: 'Feature version registered/updated successfully',
        featureName,
        version,
        registryId: rows[0].id,
    });

    ctx.status = 201;
    ctx.body = rows[0];
  } catch (err) {
    console.error({
        message: 'Error registering feature',
        error: err.message,
        stack: err.stack,
        requestBody: ctx.request.body
    });
    ctx.status = 500;
    ctx.body = { error: 'Internal Server Error' };
  }
});

router.get('/features/:name', async (ctx) => {
    // This endpoint could be used by feature services to discover active versions
    const { name } = ctx.params;
    try {
        const { rows } = await db.query(
            'SELECT * FROM feature_registry WHERE feature_name = $1 AND status = $2 ORDER BY created_at DESC',
            [name, 'ACTIVE']
        );
        if (rows.length === 0) {
            ctx.status = 404;
            ctx.body = { error: `No active features found with name: ${name}` };
            return;
        }
        ctx.body = rows;
    } catch (err) {
        console.error(`Error fetching feature: ${name}`, err);
        ctx.status = 500;
        ctx.body = { error: 'Internal Server Error' };
    }
});


module.exports = router;

services/control-plane/src/server.js

// src/server.js
require('dotenv').config(); // Load .env file

const Koa = require('koa');
const bodyParser = require('koa-bodyparser');
const router = require('./routes');

const app = new Koa();
const PORT = process.env.PORT || 3000;

// Centralized error handling middleware
app.use(async (ctx, next) => {
  try {
    await next();
  } catch (err) {
    ctx.status = err.status || 500;
    ctx.body = { error: err.message || 'Internal Server Error' };
    // In production, you'd want more robust logging here.
    console.error(`Unhandled error on ${ctx.method} ${ctx.url}:`, err);
  }
});

app.use(bodyParser());
app.use(router.routes());
app.use(router.allowedMethods());

// Graceful shutdown logic
const server = app.listen(PORT, () => {
  console.log(`Control Plane API server running on port ${PORT}`);
});

process.on('SIGTERM', () => {
  console.log('SIGTERM signal received: closing HTTP server');
  server.close(() => {
    console.log('HTTP server closed');
    // Here you would also close DB connections, etc.
    process.exit(0);
  });
});

The Koa app is deliberately minimal: it accepts JSON, validates it, writes to the database, and provides basic logging and error handling. A production deployment would also need detailed structured logging, request tracing IDs, and authentication/authorization.
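
As one example of those missing pieces, a minimal API-key check can be added as Koa middleware. This is a sketch under two assumptions: the CI pipeline sends the key in an `x-api-key` header, and the expected value arrives via an environment variable (both conventions are hypothetical, not part of the design above):

```javascript
// Minimal API-key middleware sketch. Rejects requests whose x-api-key
// header does not match the configured key; otherwise passes through.
function apiKeyAuth(expectedKey) {
  return async (ctx, next) => {
    if (!expectedKey || ctx.get('x-api-key') !== expectedKey) {
      ctx.status = 401;
      ctx.body = { error: 'Unauthorized' };
      return;
    }
    await next();
  };
}

// In server.js this would be registered before the router:
//   app.use(apiKeyAuth(process.env.CONTROL_PLANE_API_KEY));
module.exports = apiKeyAuth;
```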

Dynamic Routing with Envoy Proxy

This is the step that brings versioning online. Suppose we have two versions of the feature service, feature-service-v-abc123 and feature-service-v-def456, each corresponding to a Git commit. We want the HTTP header x-feature-version to control where traffic goes.

Envoy configuration is verbose, but powerful.

envoy.yaml

# envoy.yaml
static_resources:
  listeners:
  - name: listener_0
    address:
      socket_address:
        address: 0.0.0.0
        port_value: 8000
    filter_chains:
    - filters:
      - name: envoy.filters.network.http_connection_manager
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
          stat_prefix: ingress_http
          route_config:
            name: local_route
            virtual_hosts:
            - name: feature_service_vhost
              domains: ["*"]
              routes:
              # Routes are evaluated in order; the specific header-based
              # matches must come before the default route, or they would
              # never be reached.
              - match:
                  prefix: "/features/user_7day_avg_spend"
                  headers:
                  - name: x-feature-version
                    string_match:
                      exact: "abc123" # Corresponds to a specific git hash / version
                route:
                  cluster: feature_service_stable # Maps to the pods running this version
              - match:
                  prefix: "/features/user_7day_avg_spend"
                  headers:
                  - name: x-feature-version
                    string_match:
                      exact: "def456" # A newer version
                route:
                  cluster: feature_service_canary
              # Default route for requests without a recognized version header.
              # Shifting the weights enables percentage-based canary rollout.
              - match:
                  prefix: "/features/user_7day_avg_spend"
                route:
                  weighted_clusters:
                    clusters:
                    - name: feature_service_stable
                      weight: 100
                    - name: feature_service_canary
                      weight: 0
                decorator:
                  operation: "get_feature"
          http_filters:
          - name: envoy.filters.http.router
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router

  clusters:
  - name: feature_service_stable
    connect_timeout: 0.25s
    type: STRICT_DNS
    lb_policy: ROUND_ROBIN
    load_assignment:
      cluster_name: feature_service_stable
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                # In Kubernetes, this would be the service name.
                address: feature-service-v-abc123.internal
                port_value: 8080
  - name: feature_service_canary
    connect_timeout: 0.25s
    type: STRICT_DNS
    lb_policy: ROUND_ROBIN
    load_assignment:
      cluster_name: feature_service_canary
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: feature-service-v-def456.internal
                port_value: 8080

This Envoy configuration defines:

  1. An HTTP listener on port 8000.
  2. A virtual host matching all domains.
  3. An ordered set of routing rules:
    • Two header-based matches come first: a request with x-feature-version: abc123 is sent to the feature_service_stable cluster, while def456 goes to feature_service_canary. Because Envoy evaluates routes in order, these specific matches must precede the default.
    • A lower-priority default route sends all requests without a matching header to feature_service_stable. By adjusting the weighted_clusters weights, we can run percentage-based canary releases.
  4. Two clusters, feature_service_stable and feature_service_canary, pointing at the backend addresses of the two service versions.
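
Envoy picks the first route whose match succeeds, top to bottom. The routing table above is therefore equivalent to this small lookup (illustration only; pickCluster is a hypothetical function, not Envoy API):

```javascript
// First-match-wins routing, mirroring the envoy.yaml route order:
// header matches first, then the weighted default route.
function pickCluster(path, headers) {
  if (!path.startsWith('/features/user_7day_avg_spend')) return null;
  const v = headers['x-feature-version'];
  if (v === 'abc123') return 'feature_service_stable';
  if (v === 'def456') return 'feature_service_canary';
  // Default route: weighted 100/0 toward stable in the config above.
  return 'feature_service_stable';
}

console.log(pickCluster('/features/user_7day_avg_spend',
                        { 'x-feature-version': 'def456' }));
// feature_service_canary
```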

In production, this configuration would be delivered dynamically through Envoy's xDS APIs rather than as a static file. Our control plane can be extended to generate and push new Envoy configuration automatically after registering a new feature version and deploying the corresponding service instances.
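
A sketch of that direction: deriving per-version route entries from feature_registry rows. The field names follow Envoy's v3 route configuration, but the cluster naming scheme (feature_service_&lt;version&gt;) and the buildVersionRoutes helper are assumptions for illustration:

```javascript
// Generate one header-matched route per ACTIVE version of a feature,
// in the shape Envoy's v3 RouteConfiguration expects.
function buildVersionRoutes(featureName, rows) {
  return rows
    .filter((r) => r.feature_name === featureName && r.status === 'ACTIVE')
    .map((r) => ({
      match: {
        prefix: `/features/${featureName}`,
        headers: [
          { name: 'x-feature-version', string_match: { exact: r.version } },
        ],
      },
      route: { cluster: `feature_service_${r.version}` },
    }));
}

const routes = buildVersionRoutes('user_7day_avg_spend', [
  { feature_name: 'user_7day_avg_spend', version: 'abc123', status: 'ACTIVE' },
  { feature_name: 'user_7day_avg_spend', version: 'def456', status: 'ACTIVE' },
  { feature_name: 'user_7day_avg_spend', version: 'old001', status: 'DEPRECATED' },
]);
console.log(routes.length); // 2
```

A fuller implementation would append the weighted default route and push the result to Envoy via an xDS management server rather than printing it.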

Limitations and Future Work

The system solves the core pain points described at the start, but it is far from a complete feature platform.

  • Batch only: the current design is entirely batch-oriented. SQL scripts run periodically, which cannot support feature computation over real-time event streams. Streaming features would require a stream processing engine such as Flink or Spark Streaming, at a significant increase in architectural complexity.
  • Online/offline consistency: the system only guarantees versioned offline feature generation. How online services fetch these features (query PostgreSQL directly, or sync to a low-latency store such as Redis or DynamoDB?) is not designed in detail. Ensuring that online serving returns data defined identically to what offline training used is a major challenge.
  • Deployment automation: we assume CI/CD deploys new service instances, but the concrete mechanism (e.g., a Kubernetes Operator or Helm templates dynamically creating Deployments and Services) is not covered. That is a substantial engineering problem in its own right.
  • Schema management: when a changed feature definition alters the output table's schema, how do we manage downstream compatibility? This calls for a schema management mechanism along the lines of Protobuf or Avro, plus mandatory compatibility checks in the CI pipeline.

The next iterations are clear. First, build a high-performance online serving layer that syncs data from PostgreSQL into a low-latency KV store. Second, explore integrating a stream processing framework to support real-time features. Finally, fully automate Envoy configuration management and bind it tightly to the Kubernetes deployment lifecycle, forming a truly dynamic, self-healing feature service mesh.

