构建统一AI入口:千问/ChatGPT/豆包/文心一言的API聚合架构实战

2次阅读
没有评论

共计 2700 个字符,预计需要花费 7 分钟才能阅读完成。

image.webp

背景痛点

在企业级 AI 应用中,同时接入多个大模型服务已成为常态。以我们团队的实际需求为例,需要同时对接阿里的千问、OpenAI 的 ChatGPT、字节的豆包和百度的文心一言。这种多模型并存的场景下,我们遇到了几个典型问题:

构建统一 AI 入口:千问 /ChatGPT/ 豆包 / 文心一言的 API 聚合架构实战

  1. 协议差异
  2. 千问使用 RESTful JSON 格式
  3. 文心一言要求 Protocol Buffers 编码
  4. ChatGPT 的流式响应需要 SSE 处理

  5. 性能波动

  6. 豆包 API 平均响应时间在 200-800ms 波动
  7. ChatGPT 在高峰时段可能出现 1.5s+ 延迟

  8. 成本控制

  9. 文心一言按调用次数计费
  10. ChatGPT 基于 token 数量收费
  11. 豆包有并发连接数限制

架构设计

我们采用适配器模式 + 智能路由的核心架构,通过三层结构实现统一接入:

@startuml
skinparam monochrome true

rectangle "API 网关层" {
    component "鉴权代理"
    component "协议转换"
}

rectangle "路由中台" {
    component "适配器工厂" as AdapterFactory
    component "动态路由器" as Router
    database "权重配置" as Weights
}

rectangle "厂商接口" {
    component "千问"
    component "ChatGPT"
    component "豆包"
    component "文心一言"
}

鉴权代理 --> 协议转换
协议转换 --> AdapterFactory
AdapterFactory --> Router
Router --> Weights
Router --> 千问
Router --> ChatGPT
Router --> 豆包
Router --> 文心一言
@enduml

关键设计点:

  1. 适配器标准化
  2. 定义统一请求体 AIRequest 包含:

    record AIRequest(
        String query,
        List<Message> history,
        ModelType preferredModel
    ) {}

  3. 动态路由算法

  4. 基于最近 5 分钟成功率、响应时间和成本权重计算得分
  5. 公式:score = (0.4×成功率) + (0.3×(1- 响应时间占比)) + (0.3×成本系数)

代码实现

熔断限流配置

使用 Resilience4j 实现带自适应能力的保护机制:

@Bean
public CircuitBreakerConfig circuitBreakerConfig() {return CircuitBreakerConfig.custom()
        .failureRateThreshold(50) // 失败率阈值
        .waitDurationInOpenState(Duration.ofSeconds(30))
        .ringBufferSizeInHalfOpenState(10)
        .ringBufferSizeInClosedState(100)
        .recordExceptions(
            IOException.class, 
            TimeoutException.class,
            AIApiException.class)
        .build();}

@Bean
public RateLimiterConfig rateLimiterConfig() {return RateLimiterConfig.custom()
        .limitRefreshPeriod(Duration.ofSeconds(1))
        .limitForPeriod(50) // 初始 QPS
        .timeoutDuration(Duration.ofMillis(500))
        .dynamicConfiguration(() -> getCurrentModelTraffic() * 0.8 // 动态调整)
        .build();}

权重自适应逻辑

public class SmartRouter {private final Map<ModelType, Double> modelWeights = new ConcurrentHashMap<>();

    @Scheduled(fixedRate = 300_000)
    void updateWeights() {statsService.getModelStats().forEach((model, stats) -> {double score = calculateScore(stats);
            modelWeights.put(model, score);
        });
    }

    private double calculateScore(ModelStats stats) {double availabilityScore = stats.successRate() * 0.4;
        double latencyScore = (1 - stats.avgLatency()/1000.0) * 0.3;
        double costScore = (1 - normalizeCost(stats.costPerCall())) * 0.3;

        return availabilityScore + latencyScore + costScore;
    }
}

生产考量

幂等性处理方案对比

模型 建议方案 注意事项
千问 客户端生成 UUID 需自行维护去重窗口
ChatGPT 使用 idempotency-key 请求头 官方支持但有时效限制
文心一言 签名 + 参数 hash 作为唯一键 需考虑流式响应场景

数据安全方案

  1. 传输层
  2. 所有请求经过零信任网关
  3. 敏感字段使用国密 SM4 加密

  4. 日志处理

    @Around("execution(* com..ai.*.*(..))")
    public Object sanitizeLog(ProceedingJoinPoint pjp) {Object[] args = Arrays.stream(pjp.getArgs())
            .map(arg -> arg instanceof String s ? s.replaceAll("(?<=.{3}).", "*") : arg)
            .toArray();
        return pjp.proceed(args);
    }

避坑指南

  1. 文心一言流式响应
  2. 必须配置 Connection: keep-alive
  3. 建议使用连接池并设置:

    httpclient:
      max-connections: 50
      keep-alive: 30s

  4. ChatGPT 计费误差

  5. 实际 token 数比 API 返回多 5 -8%
  6. 建议代码预计算:
    int estimateTokens(String text) {return (int)(text.length() * 0.75); // 经验系数
    }

思考题

当用户会话需要在不同模型间切换时(如从千问转到 ChatGPT),如何设计上下文保持方案?考虑以下维度:

  1. 对话历史摘要生成策略
  2. 跨模型的知识对齐问题
  3. 上下文 token 数量的优化压缩

欢迎在评论区分享你的架构设计思路。

正文完
 0
评论(没有评论)