目录
  1. 1. 一、API 调用链路
  2. 2. 二、claude.ts — API 核心
    1. 2.1. 2.1 主要导出函数
    2. 2.2. 2.2 请求参数构建
    3. 2.3. 2.3 System Prompt 格式化
  3. 3. 三、Streaming 事件流
    1. 3.1. 3.1 SSE 事件类型
    2. 3.2. 3.2 流式处理过程
  4. 4. 四、认证系统
    1. 4.1. 4.1 认证方式优先级
    2. 4.2. 4.2 OAuth 流程
    3. 4.3. 4.3 API Key 轮换
  5. 5. 五、重试策略
    1. 5.1. 错误分类
  6. 6. 六、Prompt Cache 优化
  7. 7. 七、Token 计数
  8. 8. 八、多提供商支持
  9. 9. 九、API 预连接
  10. 10. 十、Usage 追踪
【Claude Code源码剖析】08-API 通信与 Streaming

Claude Code 与 Anthropic API 之间的通信层,3420 行的核心模块。


一、API 调用链路

query.ts (Agentic Loop)


services/api/claude.ts (3420 行, API 核心)

├─ 构建请求参数 (system prompt, messages, tools, betas)
├─ 添加认证 (API Key / OAuth / AWS / GCP)
├─ 配置重试策略


@anthropic-ai/sdk (官方 SDK)

├─ Streaming 请求 (SSE)


Anthropic API Server


SSE 事件流 → 解析 → 转换 → yield 给 query.ts

二、claude.ts — API 核心

2.1 主要导出函数

// Streaming API 调用 (主要使用)
export async function* streamClaude(params: {
systemPrompt: SystemPrompt;
messages: Message[];
tools: Tools;
model: string;
maxTokens: number;
thinkingConfig?: ThinkingConfig;
// ...
}): AsyncGenerator<StreamEvent> {
// 1. 构建 BetaMessageStreamParams
// 2. 调用 SDK stream
// 3. 解析并 yield 事件
}

// 获取模型最大输出 token
export function getMaxOutputTokensForModel(model: string): number;

2.2 请求参数构建

function buildRequestParams(config): BetaMessageStreamParams {
return {
model: config.model, // "claude-sonnet-4-20250514"
max_tokens: config.maxTokens, // 16384 (默认)
system: formatSystemPrompt(config.systemPrompt),
messages: normalizeMessagesForAPI(config.messages),
tools: config.tools.map(toolToAPISchema),
stream: true,

// Betas (新功能)
betas: getMergedBetas(config.model),

// 思维链
thinking: config.thinkingConfig ? {
type: 'enabled',
budget_tokens: config.thinkingConfig.budgetTokens,
} : undefined,

// 缓存控制
...getCacheControlParams(config),

// 额外头部
extra_headers: {
...getAttributionHeader(),
...getCLISyspromptPrefix(),
},
};
}

2.3 System Prompt 格式化

// System Prompt 被拆分为多个缓存段,最大化 prompt cache 命中率
function formatSystemPrompt(prompt: SystemPrompt): ContentBlockParam[] {
return [
// Segment 1: 核心指令 (很少变化,高缓存命中)
{
type: 'text',
text: prompt.coreInstructions,
cache_control: { type: 'ephemeral' },
},
// Segment 2: 工具描述 (较少变化)
{
type: 'text',
text: prompt.toolDescriptions,
cache_control: { type: 'ephemeral' },
},
// Segment 3: 环境上下文 (每次会话变化)
{
type: 'text',
text: prompt.environmentContext,
},
];
}

三、Streaming 事件流

3.1 SSE 事件类型

type StreamEvent =
| { type: 'message_start'; message: BetaMessage }
| { type: 'content_block_start'; content_block: ContentBlock }
| { type: 'content_block_delta'; delta: ContentDelta }
| { type: 'content_block_stop' }
| { type: 'message_delta'; delta: { stop_reason: StopReason } }
| { type: 'message_stop' }

// Content Block 类型
type ContentBlock =
| { type: 'text'; text: string }
| { type: 'tool_use'; id: string; name: string; input: unknown }
| { type: 'thinking'; thinking: string } // 思维链
| { type: 'server_tool_use'; ... } // 服务器端工具

// Stop Reason
type StopReason =
| 'end_turn' // 正常结束
| 'tool_use' // 需要执行工具
| 'max_tokens' // 达到最大 token
| 'stop_sequence' // 命中停止序列

3.2 流式处理过程

// 简化的事件处理流程
for await (const event of sdk.stream(params)) {
switch (event.type) {
case 'content_block_start':
if (event.content_block.type === 'text') {
// 开始新的文本块
yield { type: 'text_start' };
} else if (event.content_block.type === 'tool_use') {
// 开始新的工具调用 → 可以提前执行
streamingToolExecutor.addTool(event.content_block);
}
break;

case 'content_block_delta':
if (event.delta.type === 'text_delta') {
// 文本增量 → 实时渲染到终端
yield { type: 'text', text: event.delta.text };
} else if (event.delta.type === 'input_json_delta') {
// 工具输入增量 → 累积 JSON
accumulateToolInput(event.delta.partial_json);
}
break;

case 'message_delta':
if (event.delta.stop_reason === 'tool_use') {
// 需要执行工具 → 收集结果后继续循环
}
break;
}
}

四、认证系统

4.1 认证方式优先级

1. API Key (环境变量 ANTHROPIC_API_KEY)
2. OAuth Token (Anthropic Console 登录)
3. AWS Bedrock (IAM 认证)
4. Google Cloud Vertex AI (GCP 认证)
5. File Descriptor (--api-key-fd 传入)

4.2 OAuth 流程

// services/oauth/client.ts
// 1. 打开浏览器 → Anthropic Console 登录
// 2. 用户授权
// 3. 回调 → 获取 access_token + refresh_token
// 4. 存入 macOS Keychain / 系统安全存储
// 5. access_token 过期 → 自动刷新

// 启动时预取:
startKeychainPrefetch(); // 在 main.tsx 最顶部并行预取

4.3 API Key 轮换

// 当 401 错误时:
// 1. 如果是 OAuth → 尝试 refresh token
// 2. 如果是 API Key → 提示用户更新
// 3. 如果是 AWS/GCP → 尝试重新获取临时凭证

五、重试策略

// services/api/withRetry.ts
async function withRetry<T>(
fn: () => Promise<T>,
config: RetryConfig,
): Promise<T> {
let lastError: Error;

for (let attempt = 0; attempt < config.maxRetries; attempt++) {
try {
return await fn();
} catch (error) {
lastError = error;
const retryInfo = categorizeRetryableAPIError(error);

if (!retryInfo.retryable) throw error;

// 指数退避
const delay = Math.min(
retryInfo.retryAfter || config.baseDelay * Math.pow(2, attempt),
config.maxDelay
);

await sleep(delay);
}
}

throw lastError;
}

错误分类

状态码 分类 处理
429 Rate Limit 重试 (指数退避)
500 Server Error 重试
502/503 Server Unavailable 重试
401 Unauthorized 刷新 token 后重试
400 (prompt_too_long) 上下文溢出 压缩后重试
400 (其他) 请求错误 不重试
网络错误 Connection Error 重试

六、Prompt Cache 优化

// services/api/promptCacheBreakDetection.ts
// Anthropic API 支持 prompt cache:
// 相同的 system prompt + 相同的消息前缀 → 缓存命中 → 更快更便宜

// Claude Code 的优化策略:
// 1. System prompt 分段,高频部分标记 cache_control
// 2. 消息历史只追加不修改 (append-only)
// 3. 压缩时保留前缀不变
// 4. 检测缓存破裂 → 日志报告

function notifyCompaction(): void {
// 压缩会导致缓存失效,记录这次破裂事件
}

七、Token 计数

// utils/tokens.ts
export function tokenCountWithEstimation(messages: Message[]): number {
// 1. 如果有 API 返回的精确计数 → 使用它
if (lastResponseUsage) {
return lastResponseUsage.input_tokens;
}

// 2. 否则使用估算
// 粗略估算: 1 token ≈ 4 characters (英文)
// 更准确: 使用 tiktoken 或类似库
const totalChars = messages.reduce(
(sum, msg) => sum + getContentText(msg).length,
0
);
return Math.ceil(totalChars / 4);
}

// 上下文窗口配置
export function getContextWindowForModel(model: string): number {
// claude-sonnet-4: 200,000 tokens
// claude-opus-4: 200,000 tokens
// claude-haiku-3.5: 200,000 tokens
// 默认: 200,000 tokens
}

八、多提供商支持

// utils/model/providers.ts
type APIProvider =
| 'anthropic' // 官方 API
| 'bedrock' // AWS Bedrock
| 'vertex' // Google Vertex AI

function getAPIProvider(): APIProvider {
if (process.env.ANTHROPIC_BEDROCK_BASE_URL) return 'bedrock';
if (process.env.ANTHROPIC_VERTEX_BASE_URL) return 'vertex';
return 'anthropic';
}

// 不同提供商的参数差异:
// - Bedrock: 需要 AWS IAM 签名,不同的 model ID 格式
// - Vertex: 需要 GCP access token,不同的 endpoint
// - Anthropic: 标准 API Key

九、API 预连接

// utils/apiPreconnect.ts
// 在 init() 阶段就建立 TCP 连接(不发数据)
// 后续 API 调用直接复用连接,减少首次请求延迟

export function preconnectAnthropicApi(): void {
// DNS 解析 + TCP 握手 + TLS 握手
// 大约节省 100-300ms 的首次请求时间
}

十、Usage 追踪

// services/api/usage.ts + services/api/logging.ts
type NonNullableUsage = {
input_tokens: number;
output_tokens: number;
cache_creation_input_tokens: number;
cache_read_input_tokens: number;
};

// 每次 API 响应后累加
export function accumulateUsage(totalUsage, responseUsage): NonNullableUsage {
return {
input_tokens: totalUsage.input_tokens + responseUsage.input_tokens,
output_tokens: totalUsage.output_tokens + responseUsage.output_tokens,
cache_creation_input_tokens: totalUsage.cache_creation_input_tokens +
(responseUsage.cache_creation_input_tokens || 0),
cache_read_input_tokens: totalUsage.cache_read_input_tokens +
(responseUsage.cache_read_input_tokens || 0),
};
}
打赏
  • 微信
  • 支付宝

评论