feat(review/codex): add Codex review engine with MCP tools

Add a new Codex-based review engine that runs OpenAI Codex CLI in
full-auto mode with a Streamable HTTP MCP server providing Gitea
review tools (get_pr_info, add_review_comment, add_review_summary,
get_file_content). Includes incremental review support via
lastReviewedHead in MCP context and review prompt.

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode)
This commit is contained in:
jeffusion
2026-03-07 00:15:10 +08:00
committed by 路遥知码力
parent fdfd49be63
commit 614f66c433
5 changed files with 1031 additions and 1 deletions

View File

@@ -0,0 +1,167 @@
import config from '../../config';
import { logger } from '../../utils/logger';
import { LocalRepoManager } from '../context/local-repo-manager';
import { SandboxExec } from '../context/sandbox-exec';
import { FileReviewStore } from '../store/file-review-store';
import type { CommitReviewPayload, PullRequestReviewPayload, ReviewRun } from '../types';
import { CodexRunner } from './codex-runner';
/**
* Codex 审查引擎
*
* 与 agent ReviewEngine 类似的队列调度引擎,但使用 Codex CLI 执行审查。
* 复用 FileReviewStore 进行状态管理、LocalRepoManager 进行仓库准备。
*/
class CodexEngine {
private _store: FileReviewStore | null = null;
private started = false;
private activeRunsCount = 0;
private timer: ReturnType<typeof setInterval> | null = null;
private tickInProgress = false;
private get store(): FileReviewStore {
if (!this._store) {
this._store = new FileReviewStore(config.review.workdir);
}
return this._store;
}
private createSandboxExec(): SandboxExec {
return new SandboxExec(config.review.allowedCommands);
}
private createLocalRepoManager(sandboxExec: SandboxExec): LocalRepoManager {
return new LocalRepoManager(
config.review.workdir,
sandboxExec,
config.review.commandTimeoutMs,
config.gitea.accessToken
);
}
private createRunner(): CodexRunner {
const sandboxExec = this.createSandboxExec();
const localRepoManager = this.createLocalRepoManager(sandboxExec);
return new CodexRunner(this.store, localRepoManager);
}
async start(): Promise<void> {
if (this.started) {
return;
}
await this.store.init();
const recovered = await this.store.recoverInterruptedRuns();
if (recovered > 0) {
logger.warn('Codex Engine: 检测到未完成的审查任务,已重新入队', { recovered });
}
this.timer = setInterval(() => {
this.tick().catch((error) => {
logger.error('Codex Engine tick 失败', {
error: error instanceof Error ? error.message : String(error),
});
});
}, 1000);
this.started = true;
logger.info('Codex Review Engine 已启动', {
model: config.review.codexModel,
apiUrl: config.review.codexApiUrl,
});
}
async stop(): Promise<void> {
if (this.timer) {
clearInterval(this.timer);
this.timer = null;
}
this.started = false;
}
async enqueuePullRequest(
payload: PullRequestReviewPayload
): Promise<{ run: ReviewRun; reused: boolean }> {
await this.start();
return this.store.createOrReuseRun(payload);
}
async enqueueCommit(payload: CommitReviewPayload): Promise<{ run: ReviewRun; reused: boolean }> {
await this.start();
return this.store.createOrReuseRun(payload);
}
async listRuns(limit = 50): Promise<ReviewRun[]> {
return this.store.listRuns(limit);
}
async getRunDetails(
runId: string
): Promise<Awaited<ReturnType<FileReviewStore['getRunDetails']>>> {
return this.store.getRunDetails(runId);
}
getStore(): FileReviewStore {
return this.store;
}
private async tick(): Promise<void> {
if (this.tickInProgress) {
return;
}
this.tickInProgress = true;
try {
const maxParallel = config.review.maxParallelRuns;
if (this.activeRunsCount >= maxParallel) {
return;
}
while (this.activeRunsCount < maxParallel) {
const run = await this.store.acquireNextQueuedRun();
if (!run) {
break;
}
this.activeRunsCount++;
this.processRun(run).finally(() => {
this.activeRunsCount--;
});
}
} finally {
this.tickInProgress = false;
}
}
private async processRun(run: ReviewRun): Promise<void> {
logger.info('开始处理 Codex 审查任务', {
runId: run.id,
owner: run.owner,
repo: run.repo,
eventType: run.eventType,
activeRuns: this.activeRunsCount,
});
const runner = this.createRunner();
try {
await runner.execute(run);
const runDetails = await this.store.getRunDetails(run.id);
if (runDetails && runDetails.run.status !== 'ignored') {
await this.store.markRunSucceeded(run.id);
}
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
const failed = await this.store.markRunFailed(run.id, message);
if (!failed.requeued) {
logger.error('Codex 审查任务失败并达到重试上限', { runId: run.id, error: message });
} else {
logger.warn('Codex 审查任务失败,已重新入队重试', { runId: run.id, error: message });
}
}
}
}
export const codexEngine = new CodexEngine();

View File

@@ -0,0 +1,405 @@
import { mkdir, writeFile } from 'node:fs/promises';
import path from 'node:path';
import config from '../../config';
import { logger } from '../../utils/logger';
import type { LocalRepoManager, LocalRepoPaths } from '../context/local-repo-manager';
import type { FileReviewStore } from '../store/file-review-store';
import type { ReviewRun } from '../types';
import { type ReviewRunContext, mcpToolExecutor } from './mcp-tools';
// ---------------------------------------------------------------------------
// 默认审查提示词
// ---------------------------------------------------------------------------
const SYSTEM_INSTRUCTIONS = `你是一个专业的代码审查助手。请审查当前仓库中两个分支之间的代码变更。
## 审查步骤
1. **获取审查上下文**:调用 \`get_pr_info\` 工具获取 PR 信息owner、repo、PR number、base SHA、head SHA
2. **获取代码差异**:在终端中执行 \`git diff <baseSha>...<headSha>\` 命令,查看两个分支之间的完整差异。如果差异内容过多,可以先执行 \`git diff --stat <baseSha>...<headSha>\` 了解变更概况,再针对重要文件逐个查看。
3. **分析代码问题**:仔细审查差异中的代码,根据审查原则进行分析。
4. **发布审查结果**
- 调用 \`add_review_summary\` 发布整体审查总结
- 对发现的具体问题,调用 \`add_line_comment\` 在对应代码行添加评论
## 执行规则
- 这是自动化流程,不是对话。不要问候、不要寒暄、不要解释你将要做什么。
- 直接调用工具,不要在调用前描述你的计划或在调用后总结结果。
- 如果没有值得标记的问题,调用 add_review_summary 说明即可,不要额外输出。
- 所有审查工作完成后,回复"DONE"。不要输出其他内容。`;
const DEFAULT_REVIEW_GUIDELINES = `- 关注逻辑错误、潜在 bug、安全漏洞、性能问题、错误处理缺失
- 仅关注本次变更引入的问题,不要评论已有代码
- 只标记作者如果知道了一定会修复的真正问题
- 不要标记代码风格、格式化等非实质性问题
- 评论要简洁明了,直接说明问题和影响
- 如果没有发现值得标记的问题,在总结中说明代码变更看起来没有问题即可
- 行评论中的文件路径使用相对于仓库根目录的路径`;
/**
* 单次 Codex 审查执行器
*
* 负责:
* 1. 准备工作空间(复用 LocalRepoManager
* 2. 生成 .codex/config.toml含 MCP server 配置)
* 3. 注册 MCP 审查上下文
* 4. 启动 codex exec 子进程(自定义 prompt + MCP 工具)
* 5. 等待 Codex 通过 MCP 工具发布审查评论
* 6. 清理工作空间
*/
export class CodexRunner {
constructor(
private readonly store: FileReviewStore,
private readonly localRepoManager: LocalRepoManager
) {}
async execute(run: ReviewRun): Promise<void> {
const targetSha = run.headSha || run.commitSha;
if (!targetSha) {
await this.store.markRunIgnored(run.id, '缺少目标 sha');
return;
}
let repoPaths: LocalRepoPaths | null = null;
try {
// ── Step 1: 准备工作空间 ──────────────────────────────────
const workspaceStepStart = Date.now();
await this.store.addStep({
runId: run.id,
stepName: 'codex_prepare_workspace',
status: 'started',
startedAt: new Date(workspaceStepStart).toISOString(),
});
repoPaths = await this.localRepoManager.prepareWorkspace(
run.owner,
run.repo,
run.cloneUrl,
targetSha,
run.id,
run.headCloneUrl
);
await this.store.addStep({
runId: run.id,
stepName: 'codex_prepare_workspace',
status: 'succeeded',
startedAt: new Date(workspaceStepStart).toISOString(),
finishedAt: new Date().toISOString(),
latencyMs: Date.now() - workspaceStepStart,
});
// ── 增量审查基线解析 ─────────────────────────────────────────────────────
let lastReviewedHead: string | undefined;
if (run.eventType === 'pull_request' && run.prNumber) {
const snapshot = await this.localRepoManager.resolveReviewedRef(
repoPaths.mirrorPath,
run.prNumber
);
if (snapshot && targetSha) {
if (snapshot.baseSha === run.baseSha) {
// base 未变(追加 commit 或 force-push 修改 commit→ 增量审查
lastReviewedHead = snapshot.headSha;
logger.info('Codex 增量审查模式:使用上次审查快照', {
runId: run.id,
lastReviewedHead: snapshot.headSha,
currentHead: targetSha,
baseSha: run.baseSha,
});
} else {
// base 变了PR 分支做了 rebase→ 全量审查
logger.info('Codex PR base 已变更(可能 rebase回退全量审查', {
runId: run.id,
savedBaseSha: snapshot.baseSha,
currentBaseSha: run.baseSha,
});
}
}
}
// ── Step 2: 生成 .codex 配置 ───────────────────────────────
await this.generateCodexWorkspaceConfig(repoPaths.workspacePath, run.id);
// ── Step 3: 注册 MCP 上下文 ──────────────────────────────
const mcpContext: ReviewRunContext = {
runId: run.id,
owner: run.owner,
repo: run.repo,
prNumber: run.prNumber,
relatedPrNumber: run.relatedPrNumber,
commitSha: run.commitSha,
baseSha: run.baseSha,
headSha: run.headSha,
lastReviewedHead,
};
mcpToolExecutor.registerContext(mcpContext);
// ── Step 4: 执行 Codex CLI ────────────────────────────────
const codexStepStart = Date.now();
await this.store.addStep({
runId: run.id,
stepName: 'codex_review',
status: 'started',
startedAt: new Date(codexStepStart).toISOString(),
});
await this.runCodexProcess(repoPaths.workspacePath, run, lastReviewedHead);
await this.store.addStep({
runId: run.id,
stepName: 'codex_review',
status: 'succeeded',
startedAt: new Date(codexStepStart).toISOString(),
finishedAt: new Date().toISOString(),
latencyMs: Date.now() - codexStepStart,
});
logger.info('Codex 审查流程完成', {
runId: run.id,
owner: run.owner,
repo: run.repo,
});
// ── 审查成功:保存审查快照 ref ──────────────────────────────────────
if (run.eventType === 'pull_request' && run.prNumber && targetSha) {
try {
await this.localRepoManager.saveReviewedRef(
repoPaths!.mirrorPath,
run.prNumber,
run.baseSha!,
targetSha
);
} catch (refError) {
logger.warn('Codex 保存审查快照 ref 失败(非致命)', {
runId: run.id,
error: refError instanceof Error ? refError.message : String(refError),
});
}
}
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
await this.store.addStep({
runId: run.id,
stepName: 'codex_review',
status: 'failed',
startedAt: new Date().toISOString(),
finishedAt: new Date().toISOString(),
error: message,
});
throw error;
} finally {
// 清理 MCP 上下文
mcpToolExecutor.unregisterContext(run.id);
// 清理工作空间
if (repoPaths) {
await this.localRepoManager.cleanupWorkspace(repoPaths);
}
}
}
private async generateCodexWorkspaceConfig(workspacePath: string, runId: string): Promise<void> {
const codexConfigDir = path.join(workspacePath, '.codex');
await mkdir(codexConfigDir, { recursive: true });
const port = config.app.port;
const model = config.review.codexModel;
const apiUrl = this.normalizeApiBaseUrl(config.review.codexApiUrl);
const apiKey = config.review.codexApiKey;
// 生成 TOML 配置(含 MCP server
const tomlLines: string[] = [
`model = "${model}"`,
`model_verbosity = "low"`,
'',
'[model_providers.openai]',
`name = "OpenAI"`,
`base_url = "${apiUrl}"`,
`env_key = "OPENAI_API_KEY"`,
`requires_openai_auth = false`,
'',
'[mcp_servers.gitea-review]',
`url = "http://127.0.0.1:${port}/mcp/gitea-review"`,
`http_headers = { "X-Review-Run-Id" = "${runId}" }`,
`required = true`,
'',
];
await writeFile(path.join(codexConfigDir, 'config.toml'), tomlLines.join('\n'), 'utf-8');
if (apiKey?.trim()) {
const authJson = {
auth_mode: 'api_key',
OPENAI_API_KEY: apiKey,
};
await writeFile(path.join(codexConfigDir, 'auth.json'), JSON.stringify(authJson), 'utf-8');
}
logger.debug('已生成 Codex 配置文件', {
runId,
configPath: path.join(codexConfigDir, 'config.toml'),
model,
apiUrl,
authMode: apiKey?.trim() ? 'env+auth.json' : 'missing_api_key',
});
}
private normalizeApiBaseUrl(rawUrl: string): string {
const trimmed = rawUrl.trim().replace(/\/+$/, '');
if (!trimmed) {
return 'https://api.openai.com/v1';
}
return trimmed.endsWith('/v1') ? trimmed : `${trimmed}/v1`;
}
/**
* 构建审查提示词
*/
private buildReviewPrompt(run: ReviewRun, lastReviewedHead?: string): string {
const customPrompt = config.review.codexReviewPrompt?.trim();
const globalPrompt = config.review.globalPrompt?.trim();
const projectPrompt = this.resolveProjectPrompt(run);
const sections: string[] = [];
sections.push(SYSTEM_INSTRUCTIONS);
sections.push(`## 审查原则\n\n${customPrompt || DEFAULT_REVIEW_GUIDELINES}`);
if (globalPrompt) {
sections.push(`## 全局审查要求\n\n${globalPrompt}`);
}
if (projectPrompt) {
sections.push(`## 项目级审查要求\n\n${projectPrompt}`);
}
sections.push(
'当要求冲突时,优先级为:项目级审查要求 > 全局审查要求 > 审查原则。'
);
const contextLines: string[] = ['## 当前审查目标'];
if (run.eventType === 'pull_request') {
contextLines.push(' 类型Pull Request');
if (run.prNumber) contextLines.push(`- PR 编号:#${run.prNumber}`);
if (run.baseSha) contextLines.push(`- Base SHA${run.baseSha}`);
if (run.headSha) contextLines.push(`- Head SHA${run.headSha}`);
if (lastReviewedHead) {
contextLines.push(`- 增量审查模式:仅审查上次审查后的新变更`);
contextLines.push(`- 上次审查 SHA${lastReviewedHead}`);
contextLines.push(`- 请使用 \`git diff ${lastReviewedHead}..${run.headSha}\` 获取增量差异`);
} else {
contextLines.push(`- 请使用 \`git diff ${run.baseSha}...${run.headSha}\` 获取差异`);
}
} else if (run.eventType === 'commit_status') {
contextLines.push('- 类型Commit');
if (run.commitSha) {
contextLines.push(`- Commit SHA${run.commitSha}`);
contextLines.push(`- 请使用 \`git diff ${run.commitSha}~1...${run.commitSha}\` 获取差异`);
}
if (run.commitMessage) contextLines.push(`- Commit 信息:${run.commitMessage}`);
}
sections.push(contextLines.join('\n'));
return sections.join('\n\n');
}
private resolveProjectPrompt(_run: ReviewRun): string | undefined {
return undefined;
}
/**
* 执行 codex exec 子进程(自定义 prompt + MCP 工具)
*/
private async runCodexProcess(workspacePath: string, run: ReviewRun, lastReviewedHead?: string): Promise<void> {
const timeoutMs = config.review.codexTimeoutMs;
const codexHome = path.join(workspacePath, '.codex');
// 构建审查提示词
const prompt = this.buildReviewPrompt(run, lastReviewedHead);
// 构建命令参数codex exec --full-auto --ephemeral [PROMPT]
const args: string[] = ['exec', '--full-auto', '--ephemeral', prompt];
logger.info('启动 Codex CLI', {
runId: run.id,
args: ['codex', 'exec', '--full-auto', '--ephemeral', '<prompt>'].join(' '),
promptLength: prompt.length,
cwd: workspacePath,
timeoutMs,
});
const proc = Bun.spawn(['codex', ...args], {
cwd: workspacePath,
stdin: 'ignore',
stdout: 'pipe',
stderr: 'pipe',
env: {
...process.env,
CODEX_API_KEY: config.review.codexApiKey || '',
OPENAI_API_KEY: config.review.codexApiKey || '',
OPENAI_BASE_URL: this.normalizeApiBaseUrl(config.review.codexApiUrl),
CODEX_HOME: codexHome,
},
});
// 超时控制
const timeoutPromise = new Promise<never>((_, reject) => {
setTimeout(() => {
proc.kill();
reject(new Error(`Codex 审查超时(${timeoutMs}ms`));
}, timeoutMs);
});
// 等待进程完成
const processPromise = (async () => {
let fullStderr = '';
// 并行读取 stdout 和 stderr
const readStderr = (async () => {
try {
const text = await new Response(proc.stderr).text();
fullStderr = text;
// 按行输出到日志
for (const line of text.split('\n')) {
if (line.trim()) {
logger.debug('Codex 输出', { runId: run.id, line: line.substring(0, 500) });
}
}
} catch {
// stream 读取错误不是致命的
}
})();
const readStdout = (async () => {
try {
const text = await new Response(proc.stdout).text();
for (const line of text.split('\n')) {
if (line.trim()) {
logger.debug('Codex 结果', { runId: run.id, line: line.substring(0, 500) });
}
}
} catch {
// stream 读取错误不是致命的
}
})();
// 等待读取完成 + 进程退出
await Promise.all([readStderr, readStdout]);
const exitCode = await proc.exited;
if (exitCode !== 0) {
throw new Error(
`Codex 进程退出码 ${exitCode}${fullStderr ? `\nstderr: ${fullStderr.substring(0, 2000)}` : ''}`
);
}
logger.info('Codex 进程结束', { runId: run.id, exitCode });
})();
await Promise.race([processPromise, timeoutPromise]);
}
}

View File

@@ -0,0 +1,187 @@
import { Hono } from 'hono';
import { logger } from '../../utils/logger';
import { MCP_TOOLS, mcpToolExecutor } from './mcp-tools';
/**
* Streamable HTTP MCP handler
*
* Codex 通过 StreamableHttp transport 发送标准 JSON-RPC 2.0 请求。
* 我们实现最小子集initialize、tools/list、tools/call。
*
* 路由挂载到 /mcp/gitea-review
*/
interface JsonRpcMessage {
jsonrpc: '2.0';
id?: string | number | null;
method: string;
params?: Record<string, unknown>;
}
function jsonRpcResponse(id: string | number | null, result: unknown) {
return {
jsonrpc: '2.0' as const,
id,
result,
};
}
function jsonRpcError(id: string | number | null, code: number, message: string) {
return {
jsonrpc: '2.0' as const,
id,
error: { code, message },
};
}
const SERVER_INFO = {
name: 'gitea-review',
version: '1.0.0',
};
const SERVER_CAPABILITIES = {
tools: {},
};
export const mcpRouter = new Hono();
/**
* POST /mcp/gitea-review — 处理所有 MCP JSON-RPC 请求
*
* Codex StreamableHttp transport 通过 POST 发送 JSON-RPC
* 通过 X-Review-Run-Id header 区分审查会话。
*/
mcpRouter.post('/', async (c) => {
const runId = c.req.header('X-Review-Run-Id') || '';
let body: JsonRpcMessage | JsonRpcMessage[];
try {
body = await c.req.json();
} catch {
return c.json(jsonRpcError(null, -32700, 'Parse error'), 400);
}
const messages = Array.isArray(body) ? body : [body];
// 按 MCP Streamable HTTP 规范:
// - 通知(没有 id不期待响应
// - 如果所有消息都是通知,返回 202 Accepted 空 body
// - 如果包含请求(有 id返回对应响应
const requests: JsonRpcMessage[] = [];
const notifications: JsonRpcMessage[] = [];
for (const msg of messages) {
if (msg.id !== undefined && msg.id !== null) {
requests.push(msg);
} else {
notifications.push(msg);
}
}
// 处理通知(仅日志,不返回响应)
for (const notif of notifications) {
handleNotification(runId, notif);
}
// 如果没有需要响应的请求,返回 202 Accepted
if (requests.length === 0) {
return c.body(null, 202);
}
// 处理请求
const results = await Promise.all(requests.map((req) => handleRequest(runId, req)));
// 单个请求返回单个对象,批量返回数组
if (!Array.isArray(body)) {
return c.json(results[0]);
}
return c.json(results);
});
/**
* GET /mcp/gitea-review — SSE 端点Streamable HTTP MCP 的 server-initiated 通道)
*
* Codex 在连接时会先 GET 此端点建立 SSE 连接。
* 我们的 MCP server 不需要主动推送,所以保持连接存活即可。
*/
mcpRouter.get('/', (c) => {
// 返回 SSE 流保持连接Codex 需要此端点存在)
c.header('Content-Type', 'text/event-stream');
c.header('Cache-Control', 'no-cache');
c.header('Connection', 'keep-alive');
// 发送一个空的 SSE 注释保持连接
return new Response(
new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode(': keepalive\n\n'));
// 不关闭 — Codex 会自行断开
},
}),
{
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
},
}
);
});
/**
* DELETE /mcp/gitea-review — 会话结束信号
*/
mcpRouter.delete('/', (c) => {
return c.json({ ok: true });
});
/** 处理 JSON-RPC 通知(无 id不返回响应 */
function handleNotification(runId: string, notif: JsonRpcMessage): void {
logger.debug('MCP JSON-RPC 通知', { method: notif.method, runId });
// notifications/initialized 等通知仅记录日志,无需其他处理
}
/** 处理 JSON-RPC 请求(有 id需要返回响应 */
async function handleRequest(runId: string, req: JsonRpcMessage) {
if (!req.jsonrpc || req.jsonrpc !== '2.0') {
return jsonRpcError(req.id ?? null, -32600, 'Invalid Request: not JSON-RPC 2.0');
}
logger.debug('MCP JSON-RPC 请求', { method: req.method, runId, id: req.id });
switch (req.method) {
case 'initialize':
return jsonRpcResponse(req.id!, {
protocolVersion: '2025-03-26',
capabilities: SERVER_CAPABILITIES,
serverInfo: SERVER_INFO,
});
case 'tools/list':
return jsonRpcResponse(req.id!, {
tools: MCP_TOOLS,
});
case 'tools/call': {
const params = req.params as
| { name: string; arguments?: Record<string, unknown> }
| undefined;
if (!params?.name) {
return jsonRpcError(req.id!, -32602, 'Invalid params: missing tool name');
}
if (!runId) {
return jsonRpcError(req.id!, -32602, 'Missing X-Review-Run-Id header');
}
const result = await mcpToolExecutor.callTool(runId, params.name, params.arguments || {});
return jsonRpcResponse(req.id!, result);
}
case 'ping':
return jsonRpcResponse(req.id!, {});
default:
return jsonRpcError(req.id!, -32601, `Method not found: ${req.method}`);
}
}

View File

@@ -0,0 +1,271 @@
import { giteaService } from '../../services/gitea';
import { logger } from '../../utils/logger';
import type { FileReviewStore } from '../store/file-review-store';
/**
* MCP 工具定义 — 描述 Codex 可以调用的工具
*/
export const MCP_TOOLS = [
{
name: 'get_pr_info',
description:
'获取当前 Pull Request 或 Commit 的元信息,包括 owner、repo、PR number、base SHA、head SHA 等。调用此工具了解审查目标的上下文。',
inputSchema: {
type: 'object' as const,
properties: {},
required: [] as string[],
},
},
{
name: 'add_review_summary',
description:
'发布代码审查总结评论到 Pull Request 或 Commit。在完成所有代码分析后调用此工具提交你的总体审查结论。',
inputSchema: {
type: 'object' as const,
properties: {
summary: {
type: 'string',
description: '审查总结内容Markdown 格式)',
},
},
required: ['summary'],
},
},
{
name: 'add_line_comment',
description: '对代码的特定行添加审查评论。仅针对发现严重问题的代码行调用此工具。',
inputSchema: {
type: 'object' as const,
properties: {
path: {
type: 'string',
description: '文件路径(相对于仓库根目录)',
},
line: {
type: 'number',
description: '代码行号(新文件中的行号)',
},
comment: {
type: 'string',
description: '评论内容',
},
},
required: ['path', 'line', 'comment'],
},
},
];
/**
* 审查上下文 — 由 CodexRunner 创建,传递给 MCP handler
*/
export interface ReviewRunContext {
runId: string;
owner: string;
repo: string;
prNumber?: number;
relatedPrNumber?: number;
commitSha?: string;
baseSha?: string;
headSha?: string;
lastReviewedHead?: string;
}
/**
* MCP 工具执行器
*/
export class McpToolExecutor {
/** 活跃的审查上下文,按 runId 索引 */
private contexts = new Map<string, ReviewRunContext>();
constructor(private readonly store?: FileReviewStore) {}
registerContext(ctx: ReviewRunContext): void {
this.contexts.set(ctx.runId, ctx);
logger.debug('MCP 注册审查上下文', { runId: ctx.runId, owner: ctx.owner, repo: ctx.repo });
}
unregisterContext(runId: string): void {
this.contexts.delete(runId);
logger.debug('MCP 注销审查上下文', { runId });
}
getContext(runId: string): ReviewRunContext | undefined {
return this.contexts.get(runId);
}
/**
* 执行 MCP 工具调用
*/
async callTool(
runId: string,
toolName: string,
args: Record<string, unknown>
): Promise<{ content: Array<{ type: string; text: string }>; isError?: boolean }> {
const ctx = this.contexts.get(runId);
if (!ctx) {
return {
content: [{ type: 'text', text: `错误:找不到审查上下文 (runId=${runId})` }],
isError: true,
};
}
try {
switch (toolName) {
case 'get_pr_info':
return this.handleGetPrInfo(ctx);
case 'add_review_summary':
return await this.handleAddReviewSummary(ctx, args as { summary: string });
case 'add_line_comment':
return await this.handleAddLineComment(
ctx,
args as { path: string; line: number; comment: string }
);
default:
return {
content: [{ type: 'text', text: `未知工具: ${toolName}` }],
isError: true,
};
}
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
logger.error('MCP 工具调用失败', { runId, toolName, error: message });
return {
content: [{ type: 'text', text: `工具执行失败: ${message}` }],
isError: true,
};
}
}
private handleGetPrInfo(ctx: ReviewRunContext) {
const info: Record<string, unknown> = {
owner: ctx.owner,
repo: ctx.repo,
prNumber: ctx.prNumber,
baseSha: ctx.baseSha,
headSha: ctx.headSha,
commitSha: ctx.commitSha || ctx.headSha,
};
if (ctx.lastReviewedHead) {
info.lastReviewedHead = ctx.lastReviewedHead;
info.reviewMode = 'incremental';
} else {
info.reviewMode = 'full';
}
return {
content: [{ type: 'text', text: JSON.stringify(info, null, 2) }],
};
}
private async handleAddReviewSummary(ctx: ReviewRunContext, args: { summary: string }) {
const body = `## AI \u4ee3\u7801\u5ba1\u67e5\u7ed3\u679c\n\n${args.summary}`;
// \u4f18\u5148\u901a\u8fc7 PR \u53d1\u5e03\u8bc4\u8bba
let prNumber = ctx.prNumber;
// \u5982\u679c\u6ca1\u6709\u76f4\u63a5\u7684 prNumber\uff0c\u5c1d\u8bd5\u901a\u8fc7 relatedPrNumber \u6216 API \u67e5\u627e\u5173\u8054 PR
if (!prNumber) {
prNumber = ctx.relatedPrNumber;
if (!prNumber && ctx.commitSha) {
const related = await giteaService.getRelatedPullRequest(ctx.owner, ctx.repo, ctx.commitSha);
prNumber = related?.number;
}
}
if (prNumber) {
await giteaService.addPullRequestComment(ctx.owner, ctx.repo, prNumber, body);
logger.info('Codex MCP: \u5df2\u53d1\u5e03 PR \u5ba1\u67e5\u603b\u7ed3', { runId: ctx.runId, prNumber });
} else if (ctx.commitSha) {
await giteaService.addCommitComment(ctx.owner, ctx.repo, ctx.commitSha, body);
logger.info('Codex MCP: \u5df2\u53d1\u5e03 Commit \u5ba1\u67e5\u603b\u7ed3', {
runId: ctx.runId,
commitSha: ctx.commitSha,
});
} else {
return {
content: [{ type: 'text', text: '\u65e0\u6cd5\u53d1\u5e03\uff1a\u7f3a\u5c11 PR number \u6216 commit SHA' }],
isError: true,
};
}
// 记录到 store
if (this.store) {
try {
await this.store.addCommentRecord({
runId: ctx.runId,
status: 'published',
body,
});
} catch (storeError) {
logger.warn('MCP: 持久化 summary comment record 失败(非致命)', {
runId: ctx.runId,
error: storeError instanceof Error ? storeError.message : String(storeError),
});
}
}
return {
content: [{ type: 'text', text: '审查总结已发布成功' }],
};
}
private async handleAddLineComment(
ctx: ReviewRunContext,
args: { path: string; line: number; comment: string }
) {
const commitId = ctx.headSha || ctx.commitSha;
if (!commitId) {
return {
content: [{ type: 'text', text: '无法添加行评论:缺少 commit SHA' }],
isError: true,
};
}
let prNumber = ctx.prNumber || ctx.relatedPrNumber;
if (!prNumber) {
const related = await giteaService.getRelatedPullRequest(ctx.owner, ctx.repo, commitId);
prNumber = related?.number;
}
if (!prNumber) {
return {
content: [{ type: 'text', text: '无法添加行评论:找不到关联的 Pull Request' }],
isError: true,
};
}
await giteaService.addLineComments(ctx.owner, ctx.repo, prNumber, commitId, [
{ path: args.path, line: args.line, comment: args.comment },
]);
logger.info('Codex MCP: 已发布行评论', {
runId: ctx.runId,
path: args.path,
line: args.line,
});
// 记录到 store
if (this.store) {
try {
await this.store.addCommentRecord({
runId: ctx.runId,
status: 'published',
path: args.path,
line: args.line,
body: args.comment,
});
} catch (storeError) {
logger.warn('MCP: 持久化 line comment record 失败(非致命)', {
runId: ctx.runId,
error: storeError instanceof Error ? storeError.message : String(storeError),
});
}
}
return {
content: [{ type: 'text', text: `行评论已添加: ${args.path}:${args.line}` }],
};
}
}
/** 全局单例 */
export const mcpToolExecutor = new McpToolExecutor();

View File

@@ -1,4 +1,4 @@
export type ReviewEngineMode = 'legacy' | 'agent';
export type ReviewEngineMode = 'legacy' | 'agent' | 'codex';
export type ReviewEventType = 'pull_request' | 'commit_status';