构建一个Node.js客户端代理以观测GraphQL向量API并暴露Prometheus时序指标


我们的业务场景中有一个棘手的监控需求:一个提供向量检索服务的GraphQL API。常规的GraphQL客户端监控,比如请求延迟、错误率和解析器耗时,已经通过APM工具覆盖了。但我们面临的真正痛点是,如何度量向量检索的“质量”——即返回的向量与查询向量的语义相似度分布。这种业务层面的指标,传统监控工具无能为力,而它对算法模型的迭代效果评估至关重要。

最初的构想是在业务代码中埋点,但这会造成代码侵入性强,且逻辑分散在多个服务中,难以统一维护和升级。于是,我们决定构建一个独立的、非侵入式的客户端代理层。这个代理将无缝包裹现有的GraphQL客户端,透明地拦截请求与响应,执行向量相似度计算,并将结果聚合为时序数据,暴露给Prometheus进行采集。

初步构想与技术选型

这个代理的核心职责是:

  1. 接收业务方的GraphQL查询。
  2. 转发查询到真实的GraphQL服务端。
  3. 在收到响应后,解析出其中的查询向量和结果向量。
  4. 在不阻塞Node.js事件循环的前提下,计算向量间的余弦相似度。
  5. 将相似度分布、请求延迟等指标更新到内部的Prometheus度量注册表中。
  6. 提供一个/metrics端点供Prometheus抓取。

技术栈选型相对直接:

  • 运行时: Node.js。其异步非阻塞特性非常适合构建网络代理。
  • Prometheus客户端: prom-client。这是Node.js生态中最成熟的Prometheus指标库。
  • HTTP客户端: 底层使用node-fetchaxios来与真实的GraphQL服务端通信。
  • 向量计算: 对于余弦相似度这类简单的计算,原生JavaScript即可胜任,避免引入过重的依赖。对于更复杂的计算,则考虑使用worker_threads来隔离计算密集型任务。

代理核心实现:从原型到生产级

我们从一个核心类GraphQLVectorMonitorProxy开始构建。

第一步:基础代理结构与指标定义

代理需要一个构造函数来接收目标GraphQL服务的URL和一些配置。同时,我们需要初始化所有需要暴露的Prometheus指标。

在真实项目中,一个常见的错误是滥用标签(label),导致Prometheus面临高基数(high cardinality)问题。因此,我们必须谨慎设计指标。

// src/proxy.js

const client = require('prom-client');
const fetch = require('node-fetch');
const { Worker } = require('worker_threads');
const path = require('path');

// 初始化Prometheus指标
const registry = new client.Registry();
client.collectDefaultMetrics({ register: registry });

// 指标设计:
// 1. 操作名称(operationName)是关键维度,但基数可控。
// 2. status_code用于区分网络层面和GraphQL业务层面的错误。
const metrics = {
    requestLatency: new client.Histogram({
        name: 'graphql_vector_client_request_latency_seconds',
        help: 'Latency for GraphQL vector API requests.',
        labelNames: ['operationName', 'target'],
        registers: [registry],
        buckets: [0.01, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5] // 适用于向量检索的延迟分布
    }),
    requestErrors: new client.Counter({
        name: 'graphql_vector_client_errors_total',
        help: 'Total number of errors in GraphQL vector API requests.',
        labelNames: ['operationName', 'target', 'error_type'], // network, graphql_error
        registers: [registry],
    }),
    similarityDistribution: new client.Histogram({
        name: 'graphql_vector_similarity_distribution',
        help: 'Distribution of cosine similarity scores for vector search results.',
        labelNames: ['operationName', 'target'],
        registers: [registry],
        buckets: [0.5, 0.6, 0.7, 0.8, 0.85, 0.9, 0.95, 0.98, 1.0] // 相似度通常关注高分段
    })
};

class GraphQLVectorMonitorProxy {
    /**
     * @param {object} options
     * @param {string} options.targetEndpoint The real GraphQL endpoint URL.
     * @param {string} options.targetName A friendly name for the target, used in labels.
     * @param {number} [options.timeout=10000] Request timeout in milliseconds.
     */
    constructor({ targetEndpoint, targetName, timeout = 10000 }) {
        if (!targetEndpoint || !targetName) {
            throw new Error('targetEndpoint and targetName are required.');
        }
        this.targetEndpoint = targetEndpoint;
        this.targetName = targetName;
        this.timeout = timeout;
        this.worker = new Worker(path.resolve(__dirname, 'vectorWorker.js'));

        this.worker.on('message', (result) => {
            // 从工作线程接收计算结果并更新指标
            if (result.type === 'similarity' && result.scores.length > 0) {
                const { operationName, scores } = result.payload;
                scores.forEach(score => {
                    metrics.similarityDistribution.observe({ operationName, target: this.targetName }, score);
                });
            }
        });
        
        this.worker.on('error', (err) => {
            // 生产环境中必须有日志记录
            console.error('Vector worker thread error:', err);
        });
    }

    getMetrics() {
        return registry.metrics();
    }

    /**
     * Executes the GraphQL query and handles monitoring.
     * @param {object} payload The standard GraphQL request payload.
     * @param {string} payload.query The GraphQL query string.
     * @param {object} [payload.variables] The query variables.
     * @param {string} [payload.operationName] The operation name.
     * @returns {Promise<object>} The GraphQL response data.
     */
    async execute({ query, variables, operationName }) {
        const end = metrics.requestLatency.startTimer({ operationName, target: this.targetName });
        
        try {
            const controller = new AbortController();
            const timeoutId = setTimeout(() => controller.abort(), this.timeout);

            const response = await fetch(this.targetEndpoint, {
                method: 'POST',
                headers: { 'Content-Type': 'application/json' },
                body: JSON.stringify({ query, variables, operationName }),
                signal: controller.signal,
            });

            clearTimeout(timeoutId);

            const responseBody = await response.json();

            if (!response.ok || responseBody.errors) {
                metrics.requestErrors.inc({ operationName, target: this.targetName, error_type: 'graphql_error' });
                // 即使有错,也直接返回给调用方处理
                return responseBody;
            }

            // 异步、非阻塞地将向量计算任务派发给工作线程
            // 这里的关键在于,主线程不等待计算结果,立即返回响应数据。
            this.dispatchVectorCalculation(operationName, variables, responseBody.data);
            
            return responseBody;

        } catch (error) {
            const errorType = error.name === 'AbortError' ? 'timeout' : 'network';
            metrics.requestErrors.inc({ operationName, target: this.targetName, error_type: errorType });
            // 抛出错误,让上游业务感知到失败
            throw error;
        } finally {
            end();
        }
    }

    /**
     * A non-blocking method to send vector data to a worker thread for processing.
     * @param {string} operationName
     * @param {object} requestVariables
     * @param {object} responseData
     */
    dispatchVectorCalculation(operationName, requestVariables, responseData) {
        // 这里的路径解析逻辑是业务强相关的,必须根据实际的GraphQL Schema来定
        // 假设:请求变量中有 `queryVector`,响应数据中有 `search.results` 列表,每个结果包含 `embedding` 字段
        const queryVector = requestVariables?.queryVector;
        const resultItems = responseData?.search?.results;

        if (queryVector && Array.isArray(resultItems) && resultItems.length > 0) {
            const resultVectors = resultItems.map(item => item.embedding).filter(Boolean);
            
            if (resultVectors.length > 0) {
                this.worker.postMessage({
                    type: 'calculateSimilarity',
                    payload: {
                        queryVector,
                        resultVectors,
                        operationName
                    }
                });
            }
        }
    }
    
    shutdown() {
        return this.worker.terminate();
    }
}

module.exports = { GraphQLVectorMonitorProxy, registry };

这里的关键决策是,向量计算必须不能阻塞事件循环。一次向量检索可能返回数十个高维向量,循环计算余弦相似度虽然不复杂,但在高并发下会累积成显著的CPU负载,导致主线程延迟。因此,我们引入了worker_threadsdispatchVectorCalculation方法只负责提取数据并发送给工作线程,这是一个非常轻量的操作,主线程可以立即将GraphQL响应返回给业务调用方,确保低延迟。

第二步:实现向量计算的工作线程

工作线程的职责单一且明确:接收向量数据,执行计算,返回结果。

// src/vectorWorker.js

const { parentPort } = require('worker_threads');

/**
 * Calculates the dot product of two vectors.
 * @param {number[]} vecA
 * @param {number[]} vecB
 * @returns {number}
 */
function dotProduct(vecA, vecB) {
    let product = 0;
    for (let i = 0; i < vecA.length; i++) {
        product += vecA[i] * vecB[i];
    }
    return product;
}

/**
 * Calculates the magnitude (L2 norm) of a vector.
 * @param {number[]} vec
 * @returns {number}
 */
function magnitude(vec) {
    let sumOfSquares = 0;
    for (let i = 0; i < vec.length; i++) {
        sumOfSquares += vec[i] * vec[i];
    }
    return Math.sqrt(sumOfSquares);
}

/**
 * Calculates the cosine similarity between two vectors.
 * @param {number[]} vecA
 * @param {number[]} vecB
 * @returns {number} Returns NaN if either vector has zero magnitude.
 */
function cosineSimilarity(vecA, vecB) {
    if (vecA.length !== vecB.length) {
        // 在真实项目中,应该有更健壮的日志和错误处理
        console.error("Vectors have different dimensions.");
        return 0;
    }
    const magA = magnitude(vecA);
    const magB = magnitude(vecB);

    if (magA === 0 || magB === 0) {
        return 0; // Or handle as an error case
    }
    
    return dotProduct(vecA, vecB) / (magA * magB);
}

parentPort.on('message', (task) => {
    if (task.type === 'calculateSimilarity') {
        const { queryVector, resultVectors, operationName } = task.payload;
        
        try {
            const scores = resultVectors.map(vec => cosineSimilarity(queryVector, vec));
            
            // 将计算结果发送回主线程
            parentPort.postMessage({
                type: 'similarity',
                payload: {
                    operationName,
                    scores
                }
            });
        } catch (error) {
            // 错误处理:确保工作线程不会因单次计算失败而崩溃
            console.error('Error during similarity calculation:', error);
             parentPort.postMessage({
                type: 'error',
                payload: {
                    operationName,
                    message: error.message
                }
            });
        }
    }
});

这个工作线程是无状态的,只负责纯计算。这样设计使得它可以轻松地水平扩展(如果未来需要一个工作线程池)。

第三步:集成与暴露Metrics端点

现在我们需要一个服务来承载这个代理,并暴露/metrics端点。使用ExpressKoa都很简单。

// src/server.js

const express = require('express');
const { GraphQLVectorMonitorProxy } = require('./proxy');

const app = express();
app.use(express.json());

// 配置代理实例
const proxy = new GraphQLVectorMonitorProxy({
    targetEndpoint: 'http://your-real-graphql-api:4000/graphql',
    targetName: 'vector-search-service'
});

// 代理端点,业务系统将请求此端点而非直接请求原始API
app.post('/graphql-proxy', async (req, res) => {
    const { query, variables, operationName } = req.body;

    if (!query) {
        return res.status(400).json({ errors: [{ message: 'Query is required.' }] });
    }

    try {
        const result = await proxy.execute({ query, variables, operationName });
        res.json(result);
    } catch (error) {
        // 这里的错误已经被代理内部记录了,现在需要给客户端一个合理的响应
        console.error(`[Proxy] Request failed for operation ${operationName}:`, error.message);
        res.status(502).json({ errors: [{ message: 'Bad Gateway: Upstream service error.' }] });
    }
});

// Prometheus metrics endpoint
app.get('/metrics', async (req, res) => {
    try {
        res.set('Content-Type', proxy.getMetrics.contentType);
        res.end(await proxy.getMetrics());
    } catch (ex) {
        res.status(500).end(ex);
    }
});

const PORT = process.env.PORT || 3000;
const server = app.listen(PORT, () => {
    console.log(`GraphQL Vector Monitor Proxy listening on port ${PORT}`);
    console.log(`Metrics available at http://localhost:${PORT}/metrics`);
});

// 优雅停机
process.on('SIGTERM', () => {
    console.log('SIGTERM signal received: closing HTTP server');
    server.close(() => {
        console.log('HTTP server closed');
        proxy.shutdown().then(() => {
            console.log('Worker thread terminated');
            process.exit(0);
        });
    });
});

这个服务封装了代理,提供了一个新的入口/graphql-proxy,并正确地实现了/metrics端点。同时,包含了优雅停机的逻辑,这在容器化部署环境中至关重要,能确保在进程退出前,所有正在处理的请求和任务都能妥善完成。

架构与数据流

整个系统的数据流和组件交互可以用一个简单的图来表示。

sequenceDiagram
    participant ClientApp as 客户端应用
    participant ProxyServer as Node.js代理服务
    participant WorkerThread as 向量计算线程
    participant GraphQL_API as 目标GraphQL API
    participant Prometheus as Prometheus Server

    ClientApp->>+ProxyServer: POST /graphql-proxy (GraphQL Query)
    ProxyServer->>+GraphQL_API: Forward GraphQL Query
    GraphQL_API-->>-ProxyServer: GraphQL Response (with vectors)
    ProxyServer-->>-ClientApp: GraphQL Response (immediately)
    
    par
        ProxyServer-->>WorkerThread: postMessage (vector data)
    and
        Prometheus->>+ProxyServer: GET /metrics
        ProxyServer-->>-Prometheus: Expose metrics text
    end
    
    WorkerThread->>WorkerThread: Calculate Cosine Similarity
    WorkerThread-->>ProxyServer: postMessage (similarity scores)
    ProxyServer->>ProxyServer: Update Prometheus Histogram

这个架构的优势在于解耦。代理服务对客户端应用和GraphQL API都是透明的,它像一个中间件一样存在,专门负责可观测性数据的生成,而不影响主业务逻辑。

单元测试思路

对于这样一个核心组件,完备的测试是必不可少的。

  1. Proxy单元测试:
    • 使用nockmsw来模拟后端的GraphQL API。
    • 测试成功路径:验证execute方法是否正确转发请求、返回响应,并触发了dispatchVectorCalculation
    • 测试失败路径:模拟网络错误、超时、GraphQL errors数组,验证requestErrors计数器是否正确增加。
    • 测试指标更新:断言prom-client的指标在调用后是否具有期望的值。由于向量计算是异步的,需要巧妙地测试其最终一致性。可以mock工作线程的postMessage来验证主线程的行为。
  2. Worker单元测试:
    • 这是一个纯计算模块,测试非常直接。
    • 提供不同维度、包含零向量、长度不匹配的向量数组,验证cosineSimilarity函数的边缘情况处理。
    • 测试其消息处理逻辑,确保它能正确响应calculateSimilarity类型的消息。

局限性与未来迭代路径

当前实现虽然解决了核心问题,但在生产环境中仍有几个需要考虑的局限性:

  1. 单工作线程瓶颈: 如果GraphQL API的QPS非常高,单个工作线程可能成为瓶颈。未来的迭代可以将此升级为动态的工作线程池,根据负载来增减worker数量。
  2. 向量数据提取的硬编码: dispatchVectorCalculation中解析请求变量和响应体以获取向量的路径是硬编码的。一个更灵活的设计是允许通过配置来指定这些JSON路径,使代理能够适应不同的GraphQL Schema。
  3. 背压处理: 当主线程产生数据的速度远超工作线程的处理速度时,可能会导致内存中待处理消息队列的积压。需要实现一种背压机制,例如当队列长度超过阈值时,暂时停止派发新的计算任务,甚至可以采样处理。
  4. 高基数风险的持续警惕: 虽然我们已经注意到了operationName,但如果未来业务要求增加更多维度的标签(如用户ID、租户ID),必须评估其对Prometheus存储和查询性能的影响。对于这类高基数场景,可能需要转向ClickHouse等更适合的存储方案,或者在代理层进行预聚合。

这个代理的构建过程,是从一个具体的、非标准的可观测性需求出发,通过分层、解耦和对性能瓶颈的预判,逐步实现的一个健壮、可维护的系统组件。它没有止步于一个简单的脚本,而是包含了错误处理、异步隔离、配置化和优雅停机等生产环境所必需的特性。


  目录