采用服务网格实现 Phoenix 与 Ray 分布式计算集群的异构集成


一个生产系统的技术栈选型,很少是纯粹的理想主义产物,它往往是多方权衡与妥协的结果。我们面临的挑战是构建一个高性能的数据处理管道:前端需要一个能稳定承载海量、长连接的网关,用于接收来自边缘设备的实时数据流;后端则需要一个能够利用 Python 生态进行复杂数据分析和机器学习模型推理的分布式计算集群。

Elixir 的 Phoenix 框架,凭借其在 BEAM 虚拟机上的并发能力,是构建高并发网关的不二之选。而 Python 的 Ray 框架,则为分布式计算提供了无与伦比的简洁性和强大的生态支持。问题在于,如何将这两个异构的技术栈——Elixir 与 Python——优雅、安全且可靠地粘合起来?

架构决策的十字路口

最初的方案是传统的 API 网关模式。Phoenix 作为入口,通过 HTTP 调用后端的 Python 服务。这个 Python 服务,我们选择 Sanic,因为它异步的特性与 Python 的 asyncio 生态以及 Ray 的 async API 结合得很好。

这个方案看似直接,但在真实项目中,魔鬼藏在细节里:

  1. 服务发现: Phoenix 如何知道 Sanic 服务的 IP 和端口?硬编码配置?还是引入一个额外的注册中心如 Eureka 或 Zookeeper?这增加了系统的复杂度和维护成本。
  2. 安全性: Phoenix 和 Sanic 之间的通信需要在公网上或内部网络中进行。如何保证通信的机密性和完整性?手动配置 mTLS 证书,并在两个技术栈中分别实现,这会迅速演变成一场证书管理的噩梦,尤其是在服务实例动态伸缩的场景下。
  3. 网络策略: 如何精细化控制服务间的访问权限?例如,只允许 Phoenix 服务调用 Sanic 的特定端点。传统的防火墙规则难以实现应用级别的细粒度控制。

正是在这个背景下,我们排除了传统方案,转向了服务网格(Service Mesh)。具体来说,我们选择了 HashiCorp 的 Consul Connect。它的核心思想是通过在每个服务实例旁部署一个 Sidecar 代理,将所有服务间通信的网络逻辑(如服务发现、mTLS 加密、路由、访问控制)从应用代码中剥离出来,下沉到基础设施层。

这个决策的依据是:我们宁愿在基础设施层面增加一次性的复杂性(部署和维护 Consul),来换取应用层面长期的开发简洁性、安全自动化和跨语言的统一治理能力。

系统架构概览

最终的架构设计如下:

graph TD
    subgraph Edge Devices
        D1[Device 1]
        D2[Device 2]
        D3[Device ...]
    end

    subgraph "Phoenix Gateway (Elixir)"
        A[Phoenix App] --- C1[Consul Sidecar Proxy]
    end

    subgraph "Processing Node (Python)"
        S[Sanic API] --- C2[Consul Sidecar Proxy]
        S -- Manages --> R[Ray Cluster]
        R --- DB[(SQLite)]
    end

    subgraph Ray Cluster
        direction LR
        Head[Ray Head]
        W1[Ray Worker 1]
        W2[Ray Worker 2]
        W3[Ray Worker ...]
        Head --- W1 & W2 & W3
    end

    D1 & D2 & D3 -- WebSocket/HTTP --> A
    A -- "Calls localhost:proxy_port" --> C1
    C1 -- "mTLS Tunnel" --> C2
    C2 -- "Forwards to localhost" --> S
    S -- "ray.remote()" --> Head

    W1 -- "Accesses local state" --> DB
    W2 -- "Accesses local state" --> DB

在这个架构中:

  • Phoenix Gateway: 作为系统的入口,处理高并发的外部连接。它不直接与 Sanic 通信,而是将请求发送到本地的 Consul Sidecar。
  • Consul Connect: Sidecar 代理 C1 和 C2 自动建立 mTLS 隧道。Phoenix 只需知道它要调用名为 processing-service 的服务,Consul 会处理服务发现和安全加密。
  • Sanic API: 扮演 Ray 客户端或驱动者的角色。它接收来自 Phoenix 的计算任务,将其转化为 Ray 的分布式任务,并提交给 Ray Cluster。
  • Ray Cluster: 执行实际的并行计算。每个 Ray Worker 节点都是一个独立的计算单元。
  • SQLite: 在这个架构中,SQLite 的定位非常明确——任务级别的本地持久化存储。每个 Ray Worker 在处理复杂任务时,可能需要缓存中间结果、记录处理日志或持久化状态。使用本地 SQLite 文件,避免了对中心化数据库的频繁网络请求,降低了延迟,也简化了 worker 节点的部署。这是一个常见的错误是,试图将 SQLite 当作分布式数据库来用,而它的真正威力在于提供零配置、高性能的单机事务性存储。

核心实现:代码与配置

让我们深入代码,看看这个架构是如何被具体实现的。假设我们已经在环境中安装并运行了 Consul agent(consul agent -dev)。

1. Phoenix 网关服务的注册与调用

首先,我们需要让 Phoenix 服务在启动时向 Consul 注册自己,并配置一个上游依赖,指向我们的 Python 处理服务。

config/config.exs

# In your Phoenix app's config
import Config

config :my_app, MyAppWeb.Endpoint,
  # ... other configs

# Consul configuration
config :my_app, :consul,
  agent_url: "http://127.0.0.1:8500",
  service_name: "phoenix-gateway",
  service_port: 4000 # The port Phoenix is running on

config :my_app, :http_client,
  # We don't point to the remote service directly.
  # Instead, we point to the local port where Consul will expose
  # the 'processing-service'. This port is defined in the sidecar registration.
  processing_service_url: "http://127.0.0.1:9091"

Consul 服务定义文件: consul/phoenix-gateway.hcl

// This file defines the service and its sidecar proxy for Consul.
service {
  name = "phoenix-gateway"
  port = 4000 // The actual application port

  // Health check for the Phoenix application
  check {
    id       = "phoenix-http-check"
    name     = "Phoenix HTTP Health Check"
    http     = "http://localhost:4000/health"
    method   = "GET"
    interval = "10s"
    timeout  = "2s"
  }

  // Define the sidecar proxy for this service
  connect {
    sidecar_service {
      // Define an upstream dependency. Consul will listen on a local port
      // and proxy connections to the 'processing-service'.
      proxy {
        upstreams {
          destination_name = "processing-service"
          local_bind_port  = 9091 // This is the port our Phoenix app will call
        }
      }
    }
  }
}

要启动 Phoenix 服务并注册到 Consul,我们使用 consul services registerconsul connect proxy 命令。

# Register the service definition
consul services register consul/phoenix-gateway.hcl

# Start the sidecar proxy for our Phoenix service
# This command blocks and manages the proxy lifecycle.
consul connect proxy -sidecar-for phoenix-gateway

现在,Phoenix 应用中的 HTTP 客户端可以直接向 http://127.0.0.1:9091 发起请求,Consul 会自动将这些请求通过 mTLS 隧道安全地转发到 processing-service 的一个健康实例上。

lib/my_app/worker.ex

defmodule MyApp.Worker do
  use Tesla

  plug Tesla.Middleware.BaseUrl, Application.get_env(:my_app, :http_client)[:processing_service_url]
  plug Tesla.Middleware.JSON

  # Function to trigger a computation task
  def trigger_computation(data) do
    # The call is simple and clean. All the complexity of service discovery
    # and mTLS is handled by the Consul sidecar.
    post("/compute", %{payload: data})
  end
end

这里的代码非常干净,它并不知道 processing-service 的物理位置,也不知道通信是加密的。这正是服务网格的价值所在。

2. Sanic + Ray 计算服务的实现

后端服务同样需要向 Consul 注册。

Consul 服务定义文件: consul/processing-service.hcl

service {
  name = "processing-service"
  port = 8000 // The port our Sanic app listens on

  check {
    id       = "sanic-http-check"
    name     = "Sanic HTTP Health Check"
    http     = "http://localhost:8000/health"
    method   = "GET"
    interval = "10s"
    timeout  = "2s"
  }

  connect {
    sidecar_service {} // No upstreams needed for this service
  }
}

启动命令与 Phoenix 类似:

consul services register consul/processing-service.hcl
consul connect proxy -sidecar-for processing-service

现在是 Sanic 应用的核心代码。它接收请求,初始化 Ray,分发任务,并在任务中使用 SQLite。

processing_service.py

import asyncio
import logging
import sqlite3
import os
from sanic import Sanic, response
import ray

# --- Logging Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Ray Initialization ---
# In a real project, this would connect to an existing Ray cluster.
# For demonstration, we initialize a local one.
if not ray.is_initialized():
    ray.init(num_cpus=4)
    logging.info("Ray cluster initialized.")

app = Sanic("ProcessingService")

# --- Database Helper for Ray Workers ---
def get_db_connection(worker_id: str):
    """
    Each worker gets its own SQLite DB file. This prevents write contention.
    The database path could be a temporary directory or a mounted volume.
    """
    db_path = f"/tmp/worker_{worker_id}.db"
    conn = sqlite3.connect(db_path)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS task_state (
            task_id TEXT PRIMARY KEY,
            status TEXT NOT NULL,
            result REAL,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    conn.commit()
    return conn

# --- Ray Remote Function (The actual workhorse) ---
@ray.remote
def process_data_partition(partition_id: int, data: list) -> tuple:
    """
    A non-trivial task that simulates processing a data partition.
    It performs a calculation and uses a local SQLite DB to log its state.
    """
    worker_id = ray.get_runtime_context().get_worker_id()
    logging.info(f"Worker {worker_id} starting on partition {partition_id}.")
    
    # Simulate complex computation
    result = sum(x * x for x in data) / len(data) if data else 0
    
    # --- Using SQLite for local state ---
    # Here's the key: SQLite is used for worker-local, ephemeral state.
    # It's not a shared database, which would be an anti-pattern.
    conn = None
    try:
        conn = get_db_connection(worker_id)
        cursor = conn.cursor()
        cursor.execute(
            "INSERT OR REPLACE INTO task_state (task_id, status, result) VALUES (?, ?, ?)",
            (f"partition_{partition_id}", "COMPLETED", result)
        )
        conn.commit()
        logging.info(f"Worker {worker_id} saved state for partition {partition_id} to its local SQLite DB.")
    except sqlite3.Error as e:
        logging.error(f"Worker {worker_id} DB error: {e}")
        # In production, add more robust error handling or retry logic.
        if conn:
            conn.rollback()
    finally:
        if conn:
            conn.close()
            
    # Simulate I/O bound work
    asyncio.sleep(0.5)
    
    return partition_id, result

# --- Sanic API Endpoint ---
@app.post("/compute")
async def compute_handler(request):
    """
    Receives data, splits it, and distributes it to the Ray cluster.
    """
    try:
        payload = request.json.get("payload")
        if not isinstance(payload, list):
            return response.json({"error": "Payload must be a list of numbers"}, status=400)
    except Exception:
        return response.json({"error": "Invalid JSON"}, status=400)

    # A common pattern: split a large task into smaller partitions for parallel processing.
    num_partitions = 4
    partitions = [payload[i::num_partitions] for i in range(num_partitions)]
    
    logging.info(f"Distributing task across {len(partitions)} partitions.")
    
    # Asynchronously execute tasks on Ray
    # This is non-blocking for the Sanic event loop.
    futures = [process_data_partition.remote(i, p) for i, p in enumerate(partitions)]
    
    try:
        # Wait for all Ray tasks to complete with a timeout
        results = await asyncio.wait_for(asyncio.gather(*futures), timeout=30.0)
    except asyncio.TimeoutError:
        logging.error("Ray computation timed out.")
        return response.json({"error": "Computation timed out"}, status=504)

    # Aggregate results
    aggregated_results = dict(results)
    final_average = sum(aggregated_results.values()) / len(aggregated_results) if aggregated_results else 0
    
    return response.json({
        "status": "success",
        "partition_results": aggregated_results,
        "final_average": final_average
    })

@app.get("/health")
async def health_check(request):
    return response.text("OK")

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000, workers=1)

这段代码展示了几个关键点:

  1. Sanic 作为驱动: Sanic 本身不执行重计算,它的角色是接收请求、验证输入、将任务分解并提交给 Ray。它的异步特性使得它在等待 Ray 任务完成时不会阻塞,可以继续处理其他请求。
  2. Ray 的并行计算: process_data_partition 函数被 @ray.remote 装饰,意味着它可以被调度到 Ray 集群的任意一个 Worker 上并行执行。
  3. SQLite 的正确定位: get_db_connection 函数为每个 Ray worker 创建一个独立的 SQLite 数据库文件。这完美地契合了 SQLite 的优势:快速、零配置、事务性的本地存储。它用于记录任务状态,而不是在 worker 之间共享数据。

3. 用 Consul Intentions 锁定安全

目前,任何注册在 Consul 中的服务都可以调用 processing-service。在生产环境中,我们需要收紧权限。这通过 Consul Intentions 实现,它是一种声明式的访问策略。

我们可以通过 UI 或命令行创建一个 Intention,只允许 phoenix-gateway 调用 processing-service

# Create an intention: allow phoenix-gateway -> processing-service
consul intention create -allow phoenix-gateway processing-service

执行此命令后,Consul 会立即更新其 Sidecar 代理的配置。任何来自其他服务(例如,一个恶意的、或配置错误的服务)的连接请求都将被 Sidecar 在网络层面直接拒绝,甚至无法到达 Sanic 应用。这种默认拒绝、显式允许的零信任网络模型,极大地提升了系统的安全性。

架构的扩展性与局限性

这套架构并非银弹,理解其边界至关重要。

扩展性:

  • 计算能力扩展: 增加 Ray 集群的 Worker 节点即可水平扩展计算能力,这个过程对 Phoenix 和 Sanic 是透明的。
  • 服务扩展: 可以在服务网格中轻松加入新的服务(例如,一个用 Go 编写的日志服务),并使用 Intentions 精确控制它与其他服务的交互,而无需修改现有服务的代码。
  • 网关扩展: Phoenix 网关本身可以部署多个实例,并通过负载均衡器对外提供服务。Consul 会自动处理到后端 processing-service 的负载均衡。

局限性:

  • Consul 运维成本: Consul 集群自身成为了关键的基础设施,需要保证其高可用性,这带来了额外的运维负担。
  • Sidecar 性能开销: 尽管 Consul 的 Sidecar(基于 Envoy)性能很高,但它仍然为每次网络调用增加了一层代理,会引入微秒级的延迟。对于纳秒级延迟要求的场景(如高频交易),这可能是不可接受的。
  • SQLite 的状态隔离: 本地 SQLite 的方案意味着 Ray worker 之间的状态是隔离的。如果任务需要共享状态,则需要引入分布式缓存(如 Redis)或分布式文件系统,这会增加架构的复杂性。
  • 开发环境的复杂性: 本地开发需要运行 Consul agent 和 Sidecar 代理,这比直接运行应用要复杂一些。需要通过 Docker Compose 或类似的工具来简化开发环境的搭建。

最终,这个架构选择是用基础设施的复杂性,换取了应用层的简洁、安全与语言无关性。对于需要整合多种技术栈、对安全和可维护性有较高要求的分布式系统而言,这通常是一笔划算的交易。


  目录