系统监控的复杂性往往不在于单个组件,而在于组件之间的“缝隙”。一个请求进入API,写入SQL Server,然后发布一条消息到Google Cloud Pub/Sub,由另一个后台服务消费。当消费者服务抛出一个异常时,我们在Sentry里看到一个孤立的错误报告。在Jaeger里,我们可能看到两条断开的链路:一条是API到Pub/Sub发布的,另一条是消费者服务自己启动的。它们之间没有任何关联。这种断裂的遥测数据在排查问题时是致命的,尤其是在复杂的分布式系统中。
我们面临的真实挑战就是缝合这个缺口。目标是,无论一个业务流程如何跨越同步调用(如数据库操作)和异步消息传递,我们都能在Jaeger中看到一条完整的、连贯的分布式链路。同时,任何环节的错误,都应该能在Sentry中被捕获,并且能精确地关联到这条完整的链路上。
问题的根源在于上下文的丢失。分布式追踪的核心是上下文传播(Context Propagation),通常是通过W3C Trace Context标准实现的traceparent
头。在HTTP服务间调用时,这通常是自动的。但当请求流经一个消息队列时,这个上下文必须被手动或通过特定工具库序列化到消息中,然后在消费者端再反序列化出来,才能将链路延续下去。
我们将从一个典型的.NET项目入手,一步步构建这个统一的可观测性管道。这个过程将暴露许多现实世界中的陷阱,并给出生产级的解决方案。
第一阶段:生产者服务的基础链路构建
我们从一个ASP.NET Core Web API项目开始,它负责接收外部请求,与SQL Server交互,并向Pub/Sub发布消息。我们的第一步是确保API和数据库的交互能够被正确追踪。
在真实项目中,我们会使用OpenTelemetry
作为标准,因为它提供了与供应商无关的API和SDK,能够同时对接到Jaeger(用于追踪)和Sentry(用于错误和性能监控)。
首先,配置生产者服务的依赖项 (Producer.API.csproj
):
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
</PropertyGroup>
<ItemGroup>
<!-- 核心 OpenTelemetry 包 -->
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.7.0" />
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.7.1" />
<PackageReference Include="OpenTelemetry.Instrumentation.Http" Version="1.7.1" />
<!-- SQL Server 客户端追踪 -->
<PackageReference Include="OpenTelemetry.Instrumentation.SqlClient" Version="1.6.0-beta.3" />
<!-- Jaeger 导出器 -->
<PackageReference Include="OpenTelemetry.Exporter.Jaeger" Version="1.6.0" />
<!-- Sentry 与 OpenTelemetry 集成 -->
<PackageReference Include="Sentry.OpenTelemetry" Version="4.2.0" />
<!-- Google Cloud Pub/Sub 客户端 -->
<PackageReference Include="Google.Cloud.PubSub.V1" Version="3.8.0" />
<!-- 数据库驱动 -->
<PackageReference Include="Microsoft.Data.SqlClient" Version="5.1.2" />
</ItemGroup>
</Project>
接下来是核心的Program.cs
配置。这里的关键是将所有需要的Instrumentation
(检测工具)和Exporter
(导出器)注册到OpenTelemetry的TracerProvider
。
// Program.cs in Producer.API
using System.Diagnostics;
using Google.Cloud.PubSub.V1;
using Microsoft.Data.SqlClient;
using OpenTelemetry;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
var builder = WebApplication.CreateBuilder(args);
// 定义服务名称,这在分布式追踪中至关重要
const string serviceName = "Producer.API";
const string serviceVersion = "1.0.0";
// 1. 配置 OpenTelemetry
builder.Services.AddOpenTelemetry()
.ConfigureResource(resource => resource
.AddService(serviceName: serviceName, serviceVersion: serviceVersion))
.WithTracing(tracing => tracing
.AddSource(serviceName) // 添加自定义 ActivitySource
.AddAspNetCoreInstrumentation() // 自动检测 ASP.NET Core 请求
.AddHttpClientInstrumentation() // 自动检测 HttpClient 调用
.AddSqlClientInstrumentation(options => // 自动检测 SQL Server 调用
{
options.SetDbStatementForText = true; // 记录 SQL 语句
options.RecordException = true;
})
.AddJaegerExporter(options => // 配置 Jaeger 导出器
{
options.AgentHost = builder.Configuration.GetValue<string>("Jaeger:Host");
options.AgentPort = builder.Configuration.GetValue<int>("Jaeger:Port");
})
// 关键一步: 添加 Sentry 处理器,它会把追踪信息关联到 Sentry 事件
.AddSentry()
);
// 2. 注册自定义的 ActivitySource,用于手动创建 Span
builder.Services.AddSingleton(new ActivitySource(serviceName));
// 3. 注册 Google Cloud Pub/Sub PublisherClient
// 在真实项目中,ProjectId 和 TopicId 会来自配置
builder.Services.AddSingleton(await new PublisherClientBuilder
{
TopicName = new TopicName("your-gcp-project-id", "my-topic")
}.BuildAsync());
var app = builder.Build();
// Web API 端点
app.MapPost("/orders", async (
OrderRequest request,
PublisherClient publisher,
ActivitySource activitySource,
ILogger<Program> logger) =>
{
// 模拟数据库操作
using (var activity = activitySource.StartActivity("Database:SaveOrder", ActivityKind.Client))
{
activity?.SetTag("db.system", "sqlserver");
activity?.SetTag("order.id", request.OrderId);
// 这里的连接字符串应来自配置
const string connectionString = "Server=tcp:your_server.database.windows.net,1433;Initial Catalog=your_db;...";
await using var connection = new SqlConnection(connectionString);
await connection.OpenAsync();
var command = new SqlCommand("INSERT INTO Orders (Id, Product, Amount) VALUES (@Id, @Product, @Amount)", connection);
command.Parameters.AddWithValue("@Id", request.OrderId);
command.Parameters.AddWithValue("@Product", request.Product);
command.Parameters.AddWithValue("@Amount", request.Amount);
await command.ExecuteNonQueryAsync();
logger.LogInformation("Order {OrderId} saved to database.", request.OrderId);
}
// 此时,链路可以从 API 入口追踪到数据库操作
// 但接下来发布消息时,链路将中断
// 后续会在这里添加上下文传播逻辑
var message = new PubsubMessage
{
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(request))
};
await publisher.PublishAsync(message);
logger.LogInformation("Message for Order {OrderId} published.", request.OrderId);
return Results.Accepted();
});
// 配置 Sentry SDK
builder.WebHost.UseSentry(options =>
{
// DSN 应该从安全配置中读取
options.Dsn = "your-sentry-dsn";
options.TracesSampleRate = 1.0; // 在生产中调整采样率
options.UseOpenTelemetry(); // 启用 Sentry 与 OpenTelemetry 的集成
});
app.Run();
public record OrderRequest(string OrderId, string Product, decimal Amount);
此时,如果我们启动Jaeger和这个API服务,然后发送一个POST请求到/orders
,我们会在Jaeger UI中看到一条链路。这条链路包含了从POST /orders
开始的根Span,以及一个子Span Database:SaveOrder
,甚至还有一个SqlClient
自动生成的更底层的DB Span。这证明了我们的第一阶段目标达成:同步调用链是完整的。
第二阶段:暴露并解决异步边界的断裂问题
现在我们创建消费者服务。它是一个简单的后台工作进程(BackgroundService),负责监听Pub/Sub主题并处理消息。
消费者服务的项目文件 (Consumer.Worker.csproj
) 与生产者类似,但不需要AspNetCore
检测,而是需要一个宿主环境。
// Program.cs in Consumer.Worker
using System.Diagnostics;
using Consumer.Worker;
using Google.Cloud.PubSub.V1;
using OpenTelemetry;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
const string serviceName = "Consumer.Worker";
const string serviceVersion = "1.0.0";
IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
services.AddOpenTelemetry()
.ConfigureResource(resource => resource
.AddService(serviceName: serviceName, serviceVersion: serviceVersion))
.WithTracing(tracing => tracing
.AddSource(serviceName)
.AddJaegerExporter(options => // 配置 Jaeger
{
options.AgentHost = hostContext.Configuration.GetValue<string>("Jaeger:Host");
options.AgentPort = hostContext.Configuration.GetValue<int>("Jaeger:Port");
})
.AddSentry());
services.AddSingleton(new ActivitySource(serviceName));
// 注册 Pub/Sub SubscriberClient
services.AddSingleton(async (provider) => await new SubscriberClientBuilder
{
SubscriptionName = new SubscriptionName("your-gcp-project-id", "my-subscription"),
ClientCreationSettings = new ClientCreationSettings(credentials: provider.GetService<Google.Api.Gax.Grpc.ChannelCredentials>())
}.BuildAsync());
services.AddHostedService<Worker>();
})
.UseSentry(options =>
{
options.Dsn = "your-sentry-dsn";
options.TracesSampleRate = 1.0;
options.UseOpenTelemetry();
})
.Build();
host.Run();
Worker.cs
是消费逻辑的核心:
// Worker.cs in Consumer.Worker
public class Worker : BackgroundService
{
private readonly ILogger<Worker> _logger;
private readonly SubscriberClient _subscriber;
private readonly ActivitySource _activitySource;
public Worker(ILogger<Worker> logger, SubscriberClient subscriber, ActivitySource activitySource)
{
_logger = logger;
_subscriber = subscriber;
_activitySource = activitySource;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _subscriber.StartAsync((message, token) =>
{
// 在这里,我们创建了一个新的根 Span
// 因为没有从消息中提取任何上下文
using (var activity = _activitySource.StartActivity("ProcessOrderMessage", ActivityKind.Consumer))
{
var orderJson = message.Data.ToStringUtf8();
_logger.LogInformation("Received message: {messageId}, Body: {body}", message.MessageId, orderJson);
activity?.SetTag("messaging.system", "gcp_pubsub");
activity?.SetTag("messaging.message_id", message.MessageId);
// 模拟处理逻辑
ProcessOrder(orderJson);
}
// 确认消息
return Task.FromResult(SubscriberClient.Reply.Ack);
});
}
private void ProcessOrder(string orderJson)
{
_logger.LogInformation("Processing order: {order}", orderJson);
// 模拟一些工作
Thread.Sleep(100);
}
}
现在,如果我们运行整个系统(生产者API和消费者Worker),然后向API发送请求,我们会观察到预期的“断裂”现象:
- 在Jaeger中,出现一条源自
Producer.API
的链路,它在PublishAsync
调用后就结束了。 - 同时,出现另一条完全独立的、源自
Consumer.Worker
的新链路,它的根Span是ProcessOrderMessage
。 - 这两条链路之间没有任何父子关系。这就是上下文丢失的直接后果。
第三阶段:手动注入与提取追踪上下文
要修复这个问题,我们必须手动扮演Instrumentation
库的角色:在发布消息前,将当前的追踪上下文注入到消息的属性中;在消费消息时,从属性中提取它,并用它来启动新的Span。
W3C Trace Context 标准定义了两个关键的HTTP头:traceparent
和tracestate
。我们将它们作为Pub/Sub消息的Attributes
来传递。
为此,我们创建一个辅助类TraceContextPropagator
来封装这个逻辑。
// TraceContextPropagator.cs (a shared utility class)
using System.Diagnostics;
using Google.Cloud.PubSub.V1;
using OpenTelemetry;
using OpenTelemetry.Context.Propagation;
public static class TraceContextPropagator
{
private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;
// 将当前 Activity Context 注入到 PubsubMessage 的 Attributes 中
public static void Inject(PubsubMessage message)
{
// 这是一个关键的步骤,它使用 OpenTelemetry 的默认传播器(通常是 W3C TraceContext)
// 将当前的 Activity 上下文(traceId, spanId 等)序列化到一个字典中。
Propagator.Inject(new PropagationContext(Activity.Current?.Context ?? default, Baggage.Current), message,
(msg, key, value) =>
{
// Action<T, string, string> for setting attributes
if (msg.Attributes.ContainsKey(key))
{
msg.Attributes[key] = value;
}
else
{
msg.Attributes.Add(key, value);
}
});
}
// 从 PubsubMessage 的 Attributes 中提取父上下文
public static PropagationContext Extract(PubsubMessage message)
{
// 这是一个逆向过程,从消息的属性中读取 W3C TraceContext 信息,
// 并将其反序列化为一个 PropagationContext 对象,这个对象可以用来建立父子Span关系。
return Propagator.Extract(default, message, (msg, key) =>
{
msg.Attributes.TryGetValue(key, out var value);
return new[] { value };
});
}
}
现在,我们来改造生产者和消费者的代码。
在生产者 Producer.API
中:
修改/orders
端点的消息发布部分:
// ... inside app.MapPost("/orders", ...)
// ...
var message = new PubsubMessage
{
Data = Google.Protobuf.ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(request))
};
// 关键步骤: 在发布前注入追踪上下文
using (var activity = activitySource.StartActivity("PubSub:Publish", ActivityKind.Producer))
{
TraceContextPropagator.Inject(message);
activity?.SetTag("messaging.system", "gcp_pubsub");
activity?.SetTag("messaging.destination.name", "my-topic");
await publisher.PublishAsync(message);
}
logger.LogInformation("Message for Order {OrderId} published with trace context.", request.OrderId);
// ...
在消费者 Consumer.Worker
中:
修改ExecuteAsync
的消息处理回调:
// ... inside Worker.ExecuteAsync ...
await _subscriber.StartAsync((message, token) =>
{
// 关键步骤: 提取父上下文
var parentContext = TraceContextPropagator.Extract(message);
// 将提取的上下文与当前的 Baggage 关联起来
Baggage.Current = parentContext.Baggage;
// 使用提取的父上下文来启动新的 Activity (Span)
// 这样 OpenTelemetry 才能建立正确的父子关系
using (var activity = _activitySource.StartActivity("ProcessOrderMessage", ActivityKind.Consumer, parentContext.ActivityContext))
{
var orderJson = message.Data.ToStringUtf8();
_logger.LogInformation("Received message: {messageId}, Body: {body}", message.MessageId, orderJson);
activity?.SetTag("messaging.system", "gcp_pubsub");
activity?.SetTag("messaging.message_id", message.MessageId);
ProcessOrder(orderJson);
}
return Task.FromResult(SubscriberClient.Reply.Ack);
});
// ...
重启整个系统并再次发送请求。现在去Jaeger UI查看,奇迹发生了:你会看到一条完整的链路图。它从Producer.API
的HTTP请求开始,经过数据库操作,然后是一个PubSub:Publish
Span,紧接着是Consumer.Worker
的ProcessOrderMessage
Span。它们通过正确的父子关系连接在一起,形成了一个完整的业务流程视图。我们成功地缝合了异步边界。
第四阶段:集成Sentry,完成闭环
最后一步是验证Sentry的集成。Sentry的UseOpenTelemetry()
扩展非常强大,它会自动捕获由OpenTelemetry创建的Span,并将它们作为性能事务(Transaction)发送到Sentry。更重要的是,当一个异常发生时,Sentry SDK会捕获当前的Activity.Current.Context
,也就是当前的Span信息(Trace ID, Span ID),并将其附加到错误事件上。
我们在消费者中模拟一个处理失败的场景:
// Modify Worker.cs
private void ProcessOrder(string orderJson)
{
using var activity = _activitySource.StartActivity("SimulateProcessing");
try
{
_logger.LogInformation("Processing order: {order}", orderJson);
Thread.Sleep(100);
// 模拟一个可预测的错误
if (orderJson.Contains("fail"))
{
throw new InvalidOperationException($"Failed to process order: {orderJson}");
}
_logger.LogInformation("Order processed successfully.");
}
catch (Exception ex)
{
// Sentry SDK 会自动捕获未处理的异常
// 我们也可以手动捕获并记录,Sentry 仍能关联上下文
_logger.LogError(ex, "An error occurred during order processing.");
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
// 重抛异常,让 Sentry SDK 捕获
throw;
}
}
现在,发送一个包含"product": "fail"
的请求。
- 在 Jaeger 中: 你会看到完整的链路,并且
ProcessOrderMessage
或SimulateProcessing
这个Span会被标记为错误状态。 - 在 Sentry 中: 你会看到一个新的
InvalidOperationException
问题。打开它,神奇之处在于:- 在事件详情中,你会看到
Trace ID
和Span ID
。 - 有一个“View in Jaeger”或类似的链接(取决于Sentry的集成配置),点击它可以直接跳转到Jaeger UI中对应的、完整的分布式链路。
- 在Sentry的”Performance”页面,你也能看到这个完整的事务,从API入口一直到消费者失败。
- 在事件详情中,你会看到
我们已经完成了端到端的统一可观测性管道。
sequenceDiagram participant Client participant Producer.API participant SQL Server participant Google Cloud Pub/Sub participant Consumer.Worker participant Jaeger participant Sentry Client->>+Producer.API: POST /orders Producer.API->>Producer.API: Start Trace (TraceID: T1) Producer.API->>+SQL Server: INSERT INTO Orders SQL Server-->>-Producer.API: Success Producer.API->>Producer.API: Get Current TraceContext (T1) Producer.API->>Google Cloud Pub/Sub: Publish(Message + TraceContext) Producer.API-->>-Client: 202 Accepted Note right of Google Cloud Pub/Sub: Message with T1 waits in topic Consumer.Worker->>+Google Cloud Pub/Sub: Pull Message Google Cloud Pub/Sub-->>-Consumer.Worker: Return Message with T1 Consumer.Worker->>Consumer.Worker: Extract TraceContext (T1) Consumer.Worker->>Consumer.Worker: Start Child Span from T1 Consumer.Worker->>Consumer.Worker: ProcessOrder() -> throws Exception Consumer.Worker->>Sentry: Report Exception with TraceID T1 Note over Producer.API, Consumer.Worker: All spans are sent to Jaeger Producer.API->>Jaeger: Send Spans Consumer.Worker->>Jaeger: Send Spans
方案的局限性与未来迭代
这个手动注入和提取上下文的方案虽然有效,但在生产环境中存在维护成本。一个常见的错误是,开发人员在添加新的消息发布或消费逻辑时,忘记调用TraceContextPropagator
,这将导致链路再次断裂。理想的长期解决方案是等待或贡献官方的OpenTelemetry.Instrumentation.GoogleCloudPubSub
库,使其能像SqlClient
或AspNetCore
一样自动完成上下文传播。
此外,本示例中为了清晰起见,将采样率设为100% (TracesSampleRate = 1.0
)。在流量大的生产系统中,这会带来巨大的性能和成本开销。必须引入更智能的采样策略,例如基于头部的采样(Head-based Sampling)或更高级的尾部采样(Tail-based Sampling),后者允许系统在链路完整结束后再决定是否保留这条链路,对于定位偶发错误尤其有效。
最后,我们只传递了traceparent
。OpenTelemetry的Baggage
API允许在追踪上下文中携带业务数据(如userId
、tenantId
),这些数据会自动传播到下游所有服务。这对于在Sentry错误或Jaeger链路中快速定位到具体用户或租户的问题非常有价值,是下一步优化的重要方向。