mirror of
https://github.com/jeffusion/gitea-ai-assistant.git
synced 2026-03-27 10:05:50 +00:00
feat(review/codex): add Codex review engine with MCP tools
Add a new Codex-based review engine that runs OpenAI Codex CLI in full-auto mode with a Streamable HTTP MCP server providing Gitea review tools (get_pr_info, add_review_comment, add_review_summary, get_file_content). Includes incremental review support via lastReviewedHead in MCP context and review prompt. Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode)
This commit is contained in:
167
src/review/codex/codex-engine.ts
Normal file
167
src/review/codex/codex-engine.ts
Normal file
@@ -0,0 +1,167 @@
|
||||
import config from '../../config';
|
||||
import { logger } from '../../utils/logger';
|
||||
import { LocalRepoManager } from '../context/local-repo-manager';
|
||||
import { SandboxExec } from '../context/sandbox-exec';
|
||||
import { FileReviewStore } from '../store/file-review-store';
|
||||
import type { CommitReviewPayload, PullRequestReviewPayload, ReviewRun } from '../types';
|
||||
import { CodexRunner } from './codex-runner';
|
||||
|
||||
/**
|
||||
* Codex 审查引擎
|
||||
*
|
||||
* 与 agent ReviewEngine 类似的队列调度引擎,但使用 Codex CLI 执行审查。
|
||||
* 复用 FileReviewStore 进行状态管理、LocalRepoManager 进行仓库准备。
|
||||
*/
|
||||
class CodexEngine {
|
||||
private _store: FileReviewStore | null = null;
|
||||
private started = false;
|
||||
private activeRunsCount = 0;
|
||||
private timer: ReturnType<typeof setInterval> | null = null;
|
||||
private tickInProgress = false;
|
||||
|
||||
private get store(): FileReviewStore {
|
||||
if (!this._store) {
|
||||
this._store = new FileReviewStore(config.review.workdir);
|
||||
}
|
||||
return this._store;
|
||||
}
|
||||
|
||||
private createSandboxExec(): SandboxExec {
|
||||
return new SandboxExec(config.review.allowedCommands);
|
||||
}
|
||||
|
||||
private createLocalRepoManager(sandboxExec: SandboxExec): LocalRepoManager {
|
||||
return new LocalRepoManager(
|
||||
config.review.workdir,
|
||||
sandboxExec,
|
||||
config.review.commandTimeoutMs,
|
||||
config.gitea.accessToken
|
||||
);
|
||||
}
|
||||
|
||||
private createRunner(): CodexRunner {
|
||||
const sandboxExec = this.createSandboxExec();
|
||||
const localRepoManager = this.createLocalRepoManager(sandboxExec);
|
||||
return new CodexRunner(this.store, localRepoManager);
|
||||
}
|
||||
|
||||
async start(): Promise<void> {
|
||||
if (this.started) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
await this.store.init();
|
||||
const recovered = await this.store.recoverInterruptedRuns();
|
||||
if (recovered > 0) {
|
||||
logger.warn('Codex Engine: 检测到未完成的审查任务,已重新入队', { recovered });
|
||||
}
|
||||
|
||||
this.timer = setInterval(() => {
|
||||
this.tick().catch((error) => {
|
||||
logger.error('Codex Engine tick 失败', {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
});
|
||||
}, 1000);
|
||||
|
||||
this.started = true;
|
||||
logger.info('Codex Review Engine 已启动', {
|
||||
model: config.review.codexModel,
|
||||
apiUrl: config.review.codexApiUrl,
|
||||
});
|
||||
}
|
||||
|
||||
async stop(): Promise<void> {
|
||||
if (this.timer) {
|
||||
clearInterval(this.timer);
|
||||
this.timer = null;
|
||||
}
|
||||
this.started = false;
|
||||
}
|
||||
|
||||
async enqueuePullRequest(
|
||||
payload: PullRequestReviewPayload
|
||||
): Promise<{ run: ReviewRun; reused: boolean }> {
|
||||
await this.start();
|
||||
return this.store.createOrReuseRun(payload);
|
||||
}
|
||||
|
||||
async enqueueCommit(payload: CommitReviewPayload): Promise<{ run: ReviewRun; reused: boolean }> {
|
||||
await this.start();
|
||||
return this.store.createOrReuseRun(payload);
|
||||
}
|
||||
|
||||
async listRuns(limit = 50): Promise<ReviewRun[]> {
|
||||
return this.store.listRuns(limit);
|
||||
}
|
||||
|
||||
async getRunDetails(
|
||||
runId: string
|
||||
): Promise<Awaited<ReturnType<FileReviewStore['getRunDetails']>>> {
|
||||
return this.store.getRunDetails(runId);
|
||||
}
|
||||
|
||||
getStore(): FileReviewStore {
|
||||
return this.store;
|
||||
}
|
||||
|
||||
private async tick(): Promise<void> {
|
||||
if (this.tickInProgress) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.tickInProgress = true;
|
||||
try {
|
||||
const maxParallel = config.review.maxParallelRuns;
|
||||
if (this.activeRunsCount >= maxParallel) {
|
||||
return;
|
||||
}
|
||||
|
||||
while (this.activeRunsCount < maxParallel) {
|
||||
const run = await this.store.acquireNextQueuedRun();
|
||||
if (!run) {
|
||||
break;
|
||||
}
|
||||
|
||||
this.activeRunsCount++;
|
||||
this.processRun(run).finally(() => {
|
||||
this.activeRunsCount--;
|
||||
});
|
||||
}
|
||||
} finally {
|
||||
this.tickInProgress = false;
|
||||
}
|
||||
}
|
||||
|
||||
private async processRun(run: ReviewRun): Promise<void> {
|
||||
logger.info('开始处理 Codex 审查任务', {
|
||||
runId: run.id,
|
||||
owner: run.owner,
|
||||
repo: run.repo,
|
||||
eventType: run.eventType,
|
||||
activeRuns: this.activeRunsCount,
|
||||
});
|
||||
|
||||
const runner = this.createRunner();
|
||||
|
||||
try {
|
||||
await runner.execute(run);
|
||||
|
||||
const runDetails = await this.store.getRunDetails(run.id);
|
||||
if (runDetails && runDetails.run.status !== 'ignored') {
|
||||
await this.store.markRunSucceeded(run.id);
|
||||
}
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
const failed = await this.store.markRunFailed(run.id, message);
|
||||
if (!failed.requeued) {
|
||||
logger.error('Codex 审查任务失败并达到重试上限', { runId: run.id, error: message });
|
||||
} else {
|
||||
logger.warn('Codex 审查任务失败,已重新入队重试', { runId: run.id, error: message });
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const codexEngine = new CodexEngine();
|
||||
405
src/review/codex/codex-runner.ts
Normal file
405
src/review/codex/codex-runner.ts
Normal file
@@ -0,0 +1,405 @@
|
||||
import { mkdir, writeFile } from 'node:fs/promises';
|
||||
import path from 'node:path';
|
||||
import config from '../../config';
|
||||
import { logger } from '../../utils/logger';
|
||||
import type { LocalRepoManager, LocalRepoPaths } from '../context/local-repo-manager';
|
||||
import type { FileReviewStore } from '../store/file-review-store';
|
||||
import type { ReviewRun } from '../types';
|
||||
import { type ReviewRunContext, mcpToolExecutor } from './mcp-tools';
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// 默认审查提示词
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const SYSTEM_INSTRUCTIONS = `你是一个专业的代码审查助手。请审查当前仓库中两个分支之间的代码变更。
|
||||
|
||||
## 审查步骤
|
||||
|
||||
1. **获取审查上下文**:调用 \`get_pr_info\` 工具获取 PR 信息(owner、repo、PR number、base SHA、head SHA)。
|
||||
2. **获取代码差异**:在终端中执行 \`git diff <baseSha>...<headSha>\` 命令,查看两个分支之间的完整差异。如果差异内容过多,可以先执行 \`git diff --stat <baseSha>...<headSha>\` 了解变更概况,再针对重要文件逐个查看。
|
||||
3. **分析代码问题**:仔细审查差异中的代码,根据审查原则进行分析。
|
||||
4. **发布审查结果**:
|
||||
- 调用 \`add_review_summary\` 发布整体审查总结
|
||||
- 对发现的具体问题,调用 \`add_line_comment\` 在对应代码行添加评论
|
||||
|
||||
## 执行规则
|
||||
|
||||
- 这是自动化流程,不是对话。不要问候、不要寒暄、不要解释你将要做什么。
|
||||
- 直接调用工具,不要在调用前描述你的计划或在调用后总结结果。
|
||||
- 如果没有值得标记的问题,调用 add_review_summary 说明即可,不要额外输出。
|
||||
- 所有审查工作完成后,回复"DONE"。不要输出其他内容。`;
|
||||
|
||||
const DEFAULT_REVIEW_GUIDELINES = `- 关注逻辑错误、潜在 bug、安全漏洞、性能问题、错误处理缺失
|
||||
- 仅关注本次变更引入的问题,不要评论已有代码
|
||||
- 只标记作者如果知道了一定会修复的真正问题
|
||||
- 不要标记代码风格、格式化等非实质性问题
|
||||
- 评论要简洁明了,直接说明问题和影响
|
||||
- 如果没有发现值得标记的问题,在总结中说明代码变更看起来没有问题即可
|
||||
- 行评论中的文件路径使用相对于仓库根目录的路径`;
|
||||
|
||||
/**
|
||||
* 单次 Codex 审查执行器
|
||||
*
|
||||
* 负责:
|
||||
* 1. 准备工作空间(复用 LocalRepoManager)
|
||||
* 2. 生成 .codex/config.toml(含 MCP server 配置)
|
||||
* 3. 注册 MCP 审查上下文
|
||||
* 4. 启动 codex exec 子进程(自定义 prompt + MCP 工具)
|
||||
* 5. 等待 Codex 通过 MCP 工具发布审查评论
|
||||
* 6. 清理工作空间
|
||||
*/
|
||||
export class CodexRunner {
|
||||
constructor(
|
||||
private readonly store: FileReviewStore,
|
||||
private readonly localRepoManager: LocalRepoManager
|
||||
) {}
|
||||
|
||||
async execute(run: ReviewRun): Promise<void> {
|
||||
const targetSha = run.headSha || run.commitSha;
|
||||
if (!targetSha) {
|
||||
await this.store.markRunIgnored(run.id, '缺少目标 sha');
|
||||
return;
|
||||
}
|
||||
|
||||
let repoPaths: LocalRepoPaths | null = null;
|
||||
|
||||
try {
|
||||
// ── Step 1: 准备工作空间 ──────────────────────────────────
|
||||
const workspaceStepStart = Date.now();
|
||||
await this.store.addStep({
|
||||
runId: run.id,
|
||||
stepName: 'codex_prepare_workspace',
|
||||
status: 'started',
|
||||
startedAt: new Date(workspaceStepStart).toISOString(),
|
||||
});
|
||||
|
||||
repoPaths = await this.localRepoManager.prepareWorkspace(
|
||||
run.owner,
|
||||
run.repo,
|
||||
run.cloneUrl,
|
||||
targetSha,
|
||||
run.id,
|
||||
run.headCloneUrl
|
||||
);
|
||||
|
||||
await this.store.addStep({
|
||||
runId: run.id,
|
||||
stepName: 'codex_prepare_workspace',
|
||||
status: 'succeeded',
|
||||
startedAt: new Date(workspaceStepStart).toISOString(),
|
||||
finishedAt: new Date().toISOString(),
|
||||
latencyMs: Date.now() - workspaceStepStart,
|
||||
});
|
||||
|
||||
// ── 增量审查基线解析 ─────────────────────────────────────────────────────
|
||||
let lastReviewedHead: string | undefined;
|
||||
if (run.eventType === 'pull_request' && run.prNumber) {
|
||||
const snapshot = await this.localRepoManager.resolveReviewedRef(
|
||||
repoPaths.mirrorPath,
|
||||
run.prNumber
|
||||
);
|
||||
if (snapshot && targetSha) {
|
||||
if (snapshot.baseSha === run.baseSha) {
|
||||
// base 未变(追加 commit 或 force-push 修改 commit)→ 增量审查
|
||||
lastReviewedHead = snapshot.headSha;
|
||||
logger.info('Codex 增量审查模式:使用上次审查快照', {
|
||||
runId: run.id,
|
||||
lastReviewedHead: snapshot.headSha,
|
||||
currentHead: targetSha,
|
||||
baseSha: run.baseSha,
|
||||
});
|
||||
} else {
|
||||
// base 变了(PR 分支做了 rebase)→ 全量审查
|
||||
logger.info('Codex PR base 已变更(可能 rebase),回退全量审查', {
|
||||
runId: run.id,
|
||||
savedBaseSha: snapshot.baseSha,
|
||||
currentBaseSha: run.baseSha,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── Step 2: 生成 .codex 配置 ───────────────────────────────
|
||||
await this.generateCodexWorkspaceConfig(repoPaths.workspacePath, run.id);
|
||||
|
||||
// ── Step 3: 注册 MCP 上下文 ──────────────────────────────
|
||||
const mcpContext: ReviewRunContext = {
|
||||
runId: run.id,
|
||||
owner: run.owner,
|
||||
repo: run.repo,
|
||||
prNumber: run.prNumber,
|
||||
relatedPrNumber: run.relatedPrNumber,
|
||||
commitSha: run.commitSha,
|
||||
baseSha: run.baseSha,
|
||||
headSha: run.headSha,
|
||||
lastReviewedHead,
|
||||
};
|
||||
mcpToolExecutor.registerContext(mcpContext);
|
||||
|
||||
// ── Step 4: 执行 Codex CLI ────────────────────────────────
|
||||
const codexStepStart = Date.now();
|
||||
await this.store.addStep({
|
||||
runId: run.id,
|
||||
stepName: 'codex_review',
|
||||
status: 'started',
|
||||
startedAt: new Date(codexStepStart).toISOString(),
|
||||
});
|
||||
|
||||
await this.runCodexProcess(repoPaths.workspacePath, run, lastReviewedHead);
|
||||
|
||||
await this.store.addStep({
|
||||
runId: run.id,
|
||||
stepName: 'codex_review',
|
||||
status: 'succeeded',
|
||||
startedAt: new Date(codexStepStart).toISOString(),
|
||||
finishedAt: new Date().toISOString(),
|
||||
latencyMs: Date.now() - codexStepStart,
|
||||
});
|
||||
|
||||
logger.info('Codex 审查流程完成', {
|
||||
runId: run.id,
|
||||
owner: run.owner,
|
||||
repo: run.repo,
|
||||
});
|
||||
|
||||
// ── 审查成功:保存审查快照 ref ──────────────────────────────────────
|
||||
if (run.eventType === 'pull_request' && run.prNumber && targetSha) {
|
||||
try {
|
||||
await this.localRepoManager.saveReviewedRef(
|
||||
repoPaths!.mirrorPath,
|
||||
run.prNumber,
|
||||
run.baseSha!,
|
||||
targetSha
|
||||
);
|
||||
} catch (refError) {
|
||||
logger.warn('Codex 保存审查快照 ref 失败(非致命)', {
|
||||
runId: run.id,
|
||||
error: refError instanceof Error ? refError.message : String(refError),
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
await this.store.addStep({
|
||||
runId: run.id,
|
||||
stepName: 'codex_review',
|
||||
status: 'failed',
|
||||
startedAt: new Date().toISOString(),
|
||||
finishedAt: new Date().toISOString(),
|
||||
error: message,
|
||||
});
|
||||
throw error;
|
||||
} finally {
|
||||
// 清理 MCP 上下文
|
||||
mcpToolExecutor.unregisterContext(run.id);
|
||||
|
||||
// 清理工作空间
|
||||
if (repoPaths) {
|
||||
await this.localRepoManager.cleanupWorkspace(repoPaths);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async generateCodexWorkspaceConfig(workspacePath: string, runId: string): Promise<void> {
|
||||
const codexConfigDir = path.join(workspacePath, '.codex');
|
||||
await mkdir(codexConfigDir, { recursive: true });
|
||||
|
||||
const port = config.app.port;
|
||||
const model = config.review.codexModel;
|
||||
const apiUrl = this.normalizeApiBaseUrl(config.review.codexApiUrl);
|
||||
const apiKey = config.review.codexApiKey;
|
||||
|
||||
// 生成 TOML 配置(含 MCP server)
|
||||
const tomlLines: string[] = [
|
||||
`model = "${model}"`,
|
||||
`model_verbosity = "low"`,
|
||||
'',
|
||||
'[model_providers.openai]',
|
||||
`name = "OpenAI"`,
|
||||
`base_url = "${apiUrl}"`,
|
||||
`env_key = "OPENAI_API_KEY"`,
|
||||
`requires_openai_auth = false`,
|
||||
'',
|
||||
'[mcp_servers.gitea-review]',
|
||||
`url = "http://127.0.0.1:${port}/mcp/gitea-review"`,
|
||||
`http_headers = { "X-Review-Run-Id" = "${runId}" }`,
|
||||
`required = true`,
|
||||
'',
|
||||
];
|
||||
|
||||
await writeFile(path.join(codexConfigDir, 'config.toml'), tomlLines.join('\n'), 'utf-8');
|
||||
|
||||
if (apiKey?.trim()) {
|
||||
const authJson = {
|
||||
auth_mode: 'api_key',
|
||||
OPENAI_API_KEY: apiKey,
|
||||
};
|
||||
await writeFile(path.join(codexConfigDir, 'auth.json'), JSON.stringify(authJson), 'utf-8');
|
||||
}
|
||||
|
||||
logger.debug('已生成 Codex 配置文件', {
|
||||
runId,
|
||||
configPath: path.join(codexConfigDir, 'config.toml'),
|
||||
model,
|
||||
apiUrl,
|
||||
authMode: apiKey?.trim() ? 'env+auth.json' : 'missing_api_key',
|
||||
});
|
||||
}
|
||||
|
||||
private normalizeApiBaseUrl(rawUrl: string): string {
|
||||
const trimmed = rawUrl.trim().replace(/\/+$/, '');
|
||||
if (!trimmed) {
|
||||
return 'https://api.openai.com/v1';
|
||||
}
|
||||
return trimmed.endsWith('/v1') ? trimmed : `${trimmed}/v1`;
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建审查提示词
|
||||
*/
|
||||
private buildReviewPrompt(run: ReviewRun, lastReviewedHead?: string): string {
|
||||
const customPrompt = config.review.codexReviewPrompt?.trim();
|
||||
const globalPrompt = config.review.globalPrompt?.trim();
|
||||
const projectPrompt = this.resolveProjectPrompt(run);
|
||||
|
||||
const sections: string[] = [];
|
||||
|
||||
sections.push(SYSTEM_INSTRUCTIONS);
|
||||
|
||||
sections.push(`## 审查原则\n\n${customPrompt || DEFAULT_REVIEW_GUIDELINES}`);
|
||||
|
||||
if (globalPrompt) {
|
||||
sections.push(`## 全局审查要求\n\n${globalPrompt}`);
|
||||
}
|
||||
|
||||
if (projectPrompt) {
|
||||
sections.push(`## 项目级审查要求\n\n${projectPrompt}`);
|
||||
}
|
||||
|
||||
sections.push(
|
||||
'当要求冲突时,优先级为:项目级审查要求 > 全局审查要求 > 审查原则。'
|
||||
);
|
||||
|
||||
const contextLines: string[] = ['## 当前审查目标'];
|
||||
|
||||
if (run.eventType === 'pull_request') {
|
||||
contextLines.push('‐ 类型:Pull Request');
|
||||
if (run.prNumber) contextLines.push(`- PR 编号:#${run.prNumber}`);
|
||||
if (run.baseSha) contextLines.push(`- Base SHA:${run.baseSha}`);
|
||||
if (run.headSha) contextLines.push(`- Head SHA:${run.headSha}`);
|
||||
if (lastReviewedHead) {
|
||||
contextLines.push(`- 增量审查模式:仅审查上次审查后的新变更`);
|
||||
contextLines.push(`- 上次审查 SHA:${lastReviewedHead}`);
|
||||
contextLines.push(`- 请使用 \`git diff ${lastReviewedHead}..${run.headSha}\` 获取增量差异`);
|
||||
} else {
|
||||
contextLines.push(`- 请使用 \`git diff ${run.baseSha}...${run.headSha}\` 获取差异`);
|
||||
}
|
||||
} else if (run.eventType === 'commit_status') {
|
||||
contextLines.push('- 类型:Commit');
|
||||
if (run.commitSha) {
|
||||
contextLines.push(`- Commit SHA:${run.commitSha}`);
|
||||
contextLines.push(`- 请使用 \`git diff ${run.commitSha}~1...${run.commitSha}\` 获取差异`);
|
||||
}
|
||||
if (run.commitMessage) contextLines.push(`- Commit 信息:${run.commitMessage}`);
|
||||
}
|
||||
|
||||
sections.push(contextLines.join('\n'));
|
||||
return sections.join('\n\n');
|
||||
}
|
||||
|
||||
private resolveProjectPrompt(_run: ReviewRun): string | undefined {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行 codex exec 子进程(自定义 prompt + MCP 工具)
|
||||
*/
|
||||
private async runCodexProcess(workspacePath: string, run: ReviewRun, lastReviewedHead?: string): Promise<void> {
|
||||
const timeoutMs = config.review.codexTimeoutMs;
|
||||
const codexHome = path.join(workspacePath, '.codex');
|
||||
|
||||
// 构建审查提示词
|
||||
const prompt = this.buildReviewPrompt(run, lastReviewedHead);
|
||||
|
||||
// 构建命令参数:codex exec --full-auto --ephemeral [PROMPT]
|
||||
const args: string[] = ['exec', '--full-auto', '--ephemeral', prompt];
|
||||
|
||||
logger.info('启动 Codex CLI', {
|
||||
runId: run.id,
|
||||
args: ['codex', 'exec', '--full-auto', '--ephemeral', '<prompt>'].join(' '),
|
||||
promptLength: prompt.length,
|
||||
cwd: workspacePath,
|
||||
timeoutMs,
|
||||
});
|
||||
|
||||
const proc = Bun.spawn(['codex', ...args], {
|
||||
cwd: workspacePath,
|
||||
stdin: 'ignore',
|
||||
stdout: 'pipe',
|
||||
stderr: 'pipe',
|
||||
env: {
|
||||
...process.env,
|
||||
CODEX_API_KEY: config.review.codexApiKey || '',
|
||||
OPENAI_API_KEY: config.review.codexApiKey || '',
|
||||
OPENAI_BASE_URL: this.normalizeApiBaseUrl(config.review.codexApiUrl),
|
||||
CODEX_HOME: codexHome,
|
||||
},
|
||||
});
|
||||
|
||||
// 超时控制
|
||||
const timeoutPromise = new Promise<never>((_, reject) => {
|
||||
setTimeout(() => {
|
||||
proc.kill();
|
||||
reject(new Error(`Codex 审查超时(${timeoutMs}ms)`));
|
||||
}, timeoutMs);
|
||||
});
|
||||
|
||||
// 等待进程完成
|
||||
const processPromise = (async () => {
|
||||
let fullStderr = '';
|
||||
|
||||
// 并行读取 stdout 和 stderr
|
||||
const readStderr = (async () => {
|
||||
try {
|
||||
const text = await new Response(proc.stderr).text();
|
||||
fullStderr = text;
|
||||
// 按行输出到日志
|
||||
for (const line of text.split('\n')) {
|
||||
if (line.trim()) {
|
||||
logger.debug('Codex 输出', { runId: run.id, line: line.substring(0, 500) });
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// stream 读取错误不是致命的
|
||||
}
|
||||
})();
|
||||
|
||||
const readStdout = (async () => {
|
||||
try {
|
||||
const text = await new Response(proc.stdout).text();
|
||||
for (const line of text.split('\n')) {
|
||||
if (line.trim()) {
|
||||
logger.debug('Codex 结果', { runId: run.id, line: line.substring(0, 500) });
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// stream 读取错误不是致命的
|
||||
}
|
||||
})();
|
||||
|
||||
// 等待读取完成 + 进程退出
|
||||
await Promise.all([readStderr, readStdout]);
|
||||
const exitCode = await proc.exited;
|
||||
|
||||
if (exitCode !== 0) {
|
||||
throw new Error(
|
||||
`Codex 进程退出码 ${exitCode}${fullStderr ? `\nstderr: ${fullStderr.substring(0, 2000)}` : ''}`
|
||||
);
|
||||
}
|
||||
|
||||
logger.info('Codex 进程结束', { runId: run.id, exitCode });
|
||||
})();
|
||||
|
||||
await Promise.race([processPromise, timeoutPromise]);
|
||||
}
|
||||
}
|
||||
187
src/review/codex/mcp-handler.ts
Normal file
187
src/review/codex/mcp-handler.ts
Normal file
@@ -0,0 +1,187 @@
|
||||
import { Hono } from 'hono';
|
||||
import { logger } from '../../utils/logger';
|
||||
import { MCP_TOOLS, mcpToolExecutor } from './mcp-tools';
|
||||
|
||||
/**
|
||||
* Streamable HTTP MCP handler
|
||||
*
|
||||
* Codex 通过 StreamableHttp transport 发送标准 JSON-RPC 2.0 请求。
|
||||
* 我们实现最小子集:initialize、tools/list、tools/call。
|
||||
*
|
||||
* 路由挂载到 /mcp/gitea-review
|
||||
*/
|
||||
|
||||
interface JsonRpcMessage {
|
||||
jsonrpc: '2.0';
|
||||
id?: string | number | null;
|
||||
method: string;
|
||||
params?: Record<string, unknown>;
|
||||
}
|
||||
|
||||
function jsonRpcResponse(id: string | number | null, result: unknown) {
|
||||
return {
|
||||
jsonrpc: '2.0' as const,
|
||||
id,
|
||||
result,
|
||||
};
|
||||
}
|
||||
|
||||
function jsonRpcError(id: string | number | null, code: number, message: string) {
|
||||
return {
|
||||
jsonrpc: '2.0' as const,
|
||||
id,
|
||||
error: { code, message },
|
||||
};
|
||||
}
|
||||
|
||||
const SERVER_INFO = {
|
||||
name: 'gitea-review',
|
||||
version: '1.0.0',
|
||||
};
|
||||
|
||||
const SERVER_CAPABILITIES = {
|
||||
tools: {},
|
||||
};
|
||||
|
||||
export const mcpRouter = new Hono();
|
||||
|
||||
/**
|
||||
* POST /mcp/gitea-review — 处理所有 MCP JSON-RPC 请求
|
||||
*
|
||||
* Codex StreamableHttp transport 通过 POST 发送 JSON-RPC,
|
||||
* 通过 X-Review-Run-Id header 区分审查会话。
|
||||
*/
|
||||
mcpRouter.post('/', async (c) => {
|
||||
const runId = c.req.header('X-Review-Run-Id') || '';
|
||||
|
||||
let body: JsonRpcMessage | JsonRpcMessage[];
|
||||
try {
|
||||
body = await c.req.json();
|
||||
} catch {
|
||||
return c.json(jsonRpcError(null, -32700, 'Parse error'), 400);
|
||||
}
|
||||
|
||||
const messages = Array.isArray(body) ? body : [body];
|
||||
|
||||
// 按 MCP Streamable HTTP 规范:
|
||||
// - 通知(没有 id)不期待响应
|
||||
// - 如果所有消息都是通知,返回 202 Accepted 空 body
|
||||
// - 如果包含请求(有 id),返回对应响应
|
||||
const requests: JsonRpcMessage[] = [];
|
||||
const notifications: JsonRpcMessage[] = [];
|
||||
|
||||
for (const msg of messages) {
|
||||
if (msg.id !== undefined && msg.id !== null) {
|
||||
requests.push(msg);
|
||||
} else {
|
||||
notifications.push(msg);
|
||||
}
|
||||
}
|
||||
|
||||
// 处理通知(仅日志,不返回响应)
|
||||
for (const notif of notifications) {
|
||||
handleNotification(runId, notif);
|
||||
}
|
||||
|
||||
// 如果没有需要响应的请求,返回 202 Accepted
|
||||
if (requests.length === 0) {
|
||||
return c.body(null, 202);
|
||||
}
|
||||
|
||||
// 处理请求
|
||||
const results = await Promise.all(requests.map((req) => handleRequest(runId, req)));
|
||||
|
||||
// 单个请求返回单个对象,批量返回数组
|
||||
if (!Array.isArray(body)) {
|
||||
return c.json(results[0]);
|
||||
}
|
||||
return c.json(results);
|
||||
});
|
||||
|
||||
/**
|
||||
* GET /mcp/gitea-review — SSE 端点(Streamable HTTP MCP 的 server-initiated 通道)
|
||||
*
|
||||
* Codex 在连接时会先 GET 此端点建立 SSE 连接。
|
||||
* 我们的 MCP server 不需要主动推送,所以保持连接存活即可。
|
||||
*/
|
||||
mcpRouter.get('/', (c) => {
|
||||
// 返回 SSE 流,保持连接(Codex 需要此端点存在)
|
||||
c.header('Content-Type', 'text/event-stream');
|
||||
c.header('Cache-Control', 'no-cache');
|
||||
c.header('Connection', 'keep-alive');
|
||||
|
||||
// 发送一个空的 SSE 注释保持连接
|
||||
return new Response(
|
||||
new ReadableStream({
|
||||
start(controller) {
|
||||
controller.enqueue(new TextEncoder().encode(': keepalive\n\n'));
|
||||
// 不关闭 — Codex 会自行断开
|
||||
},
|
||||
}),
|
||||
{
|
||||
headers: {
|
||||
'Content-Type': 'text/event-stream',
|
||||
'Cache-Control': 'no-cache',
|
||||
Connection: 'keep-alive',
|
||||
},
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
/**
|
||||
* DELETE /mcp/gitea-review — 会话结束信号
|
||||
*/
|
||||
mcpRouter.delete('/', (c) => {
|
||||
return c.json({ ok: true });
|
||||
});
|
||||
|
||||
/** 处理 JSON-RPC 通知(无 id,不返回响应) */
|
||||
function handleNotification(runId: string, notif: JsonRpcMessage): void {
|
||||
logger.debug('MCP JSON-RPC 通知', { method: notif.method, runId });
|
||||
// notifications/initialized 等通知仅记录日志,无需其他处理
|
||||
}
|
||||
|
||||
/** 处理 JSON-RPC 请求(有 id,需要返回响应) */
|
||||
async function handleRequest(runId: string, req: JsonRpcMessage) {
|
||||
if (!req.jsonrpc || req.jsonrpc !== '2.0') {
|
||||
return jsonRpcError(req.id ?? null, -32600, 'Invalid Request: not JSON-RPC 2.0');
|
||||
}
|
||||
|
||||
logger.debug('MCP JSON-RPC 请求', { method: req.method, runId, id: req.id });
|
||||
|
||||
switch (req.method) {
|
||||
case 'initialize':
|
||||
return jsonRpcResponse(req.id!, {
|
||||
protocolVersion: '2025-03-26',
|
||||
capabilities: SERVER_CAPABILITIES,
|
||||
serverInfo: SERVER_INFO,
|
||||
});
|
||||
|
||||
case 'tools/list':
|
||||
return jsonRpcResponse(req.id!, {
|
||||
tools: MCP_TOOLS,
|
||||
});
|
||||
|
||||
case 'tools/call': {
|
||||
const params = req.params as
|
||||
| { name: string; arguments?: Record<string, unknown> }
|
||||
| undefined;
|
||||
if (!params?.name) {
|
||||
return jsonRpcError(req.id!, -32602, 'Invalid params: missing tool name');
|
||||
}
|
||||
|
||||
if (!runId) {
|
||||
return jsonRpcError(req.id!, -32602, 'Missing X-Review-Run-Id header');
|
||||
}
|
||||
|
||||
const result = await mcpToolExecutor.callTool(runId, params.name, params.arguments || {});
|
||||
return jsonRpcResponse(req.id!, result);
|
||||
}
|
||||
|
||||
case 'ping':
|
||||
return jsonRpcResponse(req.id!, {});
|
||||
|
||||
default:
|
||||
return jsonRpcError(req.id!, -32601, `Method not found: ${req.method}`);
|
||||
}
|
||||
}
|
||||
271
src/review/codex/mcp-tools.ts
Normal file
271
src/review/codex/mcp-tools.ts
Normal file
@@ -0,0 +1,271 @@
|
||||
import { giteaService } from '../../services/gitea';
|
||||
import { logger } from '../../utils/logger';
|
||||
import type { FileReviewStore } from '../store/file-review-store';
|
||||
|
||||
/**
|
||||
* MCP 工具定义 — 描述 Codex 可以调用的工具
|
||||
*/
|
||||
export const MCP_TOOLS = [
|
||||
{
|
||||
name: 'get_pr_info',
|
||||
description:
|
||||
'获取当前 Pull Request 或 Commit 的元信息,包括 owner、repo、PR number、base SHA、head SHA 等。调用此工具了解审查目标的上下文。',
|
||||
inputSchema: {
|
||||
type: 'object' as const,
|
||||
properties: {},
|
||||
required: [] as string[],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'add_review_summary',
|
||||
description:
|
||||
'发布代码审查总结评论到 Pull Request 或 Commit。在完成所有代码分析后调用此工具,提交你的总体审查结论。',
|
||||
inputSchema: {
|
||||
type: 'object' as const,
|
||||
properties: {
|
||||
summary: {
|
||||
type: 'string',
|
||||
description: '审查总结内容(Markdown 格式)',
|
||||
},
|
||||
},
|
||||
required: ['summary'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'add_line_comment',
|
||||
description: '对代码的特定行添加审查评论。仅针对发现严重问题的代码行调用此工具。',
|
||||
inputSchema: {
|
||||
type: 'object' as const,
|
||||
properties: {
|
||||
path: {
|
||||
type: 'string',
|
||||
description: '文件路径(相对于仓库根目录)',
|
||||
},
|
||||
line: {
|
||||
type: 'number',
|
||||
description: '代码行号(新文件中的行号)',
|
||||
},
|
||||
comment: {
|
||||
type: 'string',
|
||||
description: '评论内容',
|
||||
},
|
||||
},
|
||||
required: ['path', 'line', 'comment'],
|
||||
},
|
||||
},
|
||||
];
|
||||
|
||||
/**
|
||||
* 审查上下文 — 由 CodexRunner 创建,传递给 MCP handler
|
||||
*/
|
||||
export interface ReviewRunContext {
|
||||
runId: string;
|
||||
owner: string;
|
||||
repo: string;
|
||||
prNumber?: number;
|
||||
relatedPrNumber?: number;
|
||||
commitSha?: string;
|
||||
baseSha?: string;
|
||||
headSha?: string;
|
||||
lastReviewedHead?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* MCP 工具执行器
|
||||
*/
|
||||
export class McpToolExecutor {
|
||||
/** 活跃的审查上下文,按 runId 索引 */
|
||||
private contexts = new Map<string, ReviewRunContext>();
|
||||
|
||||
constructor(private readonly store?: FileReviewStore) {}
|
||||
|
||||
registerContext(ctx: ReviewRunContext): void {
|
||||
this.contexts.set(ctx.runId, ctx);
|
||||
logger.debug('MCP 注册审查上下文', { runId: ctx.runId, owner: ctx.owner, repo: ctx.repo });
|
||||
}
|
||||
|
||||
unregisterContext(runId: string): void {
|
||||
this.contexts.delete(runId);
|
||||
logger.debug('MCP 注销审查上下文', { runId });
|
||||
}
|
||||
|
||||
getContext(runId: string): ReviewRunContext | undefined {
|
||||
return this.contexts.get(runId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行 MCP 工具调用
|
||||
*/
|
||||
async callTool(
|
||||
runId: string,
|
||||
toolName: string,
|
||||
args: Record<string, unknown>
|
||||
): Promise<{ content: Array<{ type: string; text: string }>; isError?: boolean }> {
|
||||
const ctx = this.contexts.get(runId);
|
||||
if (!ctx) {
|
||||
return {
|
||||
content: [{ type: 'text', text: `错误:找不到审查上下文 (runId=${runId})` }],
|
||||
isError: true,
|
||||
};
|
||||
}
|
||||
|
||||
try {
|
||||
switch (toolName) {
|
||||
case 'get_pr_info':
|
||||
return this.handleGetPrInfo(ctx);
|
||||
case 'add_review_summary':
|
||||
return await this.handleAddReviewSummary(ctx, args as { summary: string });
|
||||
case 'add_line_comment':
|
||||
return await this.handleAddLineComment(
|
||||
ctx,
|
||||
args as { path: string; line: number; comment: string }
|
||||
);
|
||||
default:
|
||||
return {
|
||||
content: [{ type: 'text', text: `未知工具: ${toolName}` }],
|
||||
isError: true,
|
||||
};
|
||||
}
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
logger.error('MCP 工具调用失败', { runId, toolName, error: message });
|
||||
return {
|
||||
content: [{ type: 'text', text: `工具执行失败: ${message}` }],
|
||||
isError: true,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
private handleGetPrInfo(ctx: ReviewRunContext) {
|
||||
const info: Record<string, unknown> = {
|
||||
owner: ctx.owner,
|
||||
repo: ctx.repo,
|
||||
prNumber: ctx.prNumber,
|
||||
baseSha: ctx.baseSha,
|
||||
headSha: ctx.headSha,
|
||||
commitSha: ctx.commitSha || ctx.headSha,
|
||||
};
|
||||
if (ctx.lastReviewedHead) {
|
||||
info.lastReviewedHead = ctx.lastReviewedHead;
|
||||
info.reviewMode = 'incremental';
|
||||
} else {
|
||||
info.reviewMode = 'full';
|
||||
}
|
||||
return {
|
||||
content: [{ type: 'text', text: JSON.stringify(info, null, 2) }],
|
||||
};
|
||||
}
|
||||
|
||||
private async handleAddReviewSummary(ctx: ReviewRunContext, args: { summary: string }) {
|
||||
const body = `## AI \u4ee3\u7801\u5ba1\u67e5\u7ed3\u679c\n\n${args.summary}`;
|
||||
|
||||
// \u4f18\u5148\u901a\u8fc7 PR \u53d1\u5e03\u8bc4\u8bba
|
||||
let prNumber = ctx.prNumber;
|
||||
|
||||
// \u5982\u679c\u6ca1\u6709\u76f4\u63a5\u7684 prNumber\uff0c\u5c1d\u8bd5\u901a\u8fc7 relatedPrNumber \u6216 API \u67e5\u627e\u5173\u8054 PR
|
||||
if (!prNumber) {
|
||||
prNumber = ctx.relatedPrNumber;
|
||||
if (!prNumber && ctx.commitSha) {
|
||||
const related = await giteaService.getRelatedPullRequest(ctx.owner, ctx.repo, ctx.commitSha);
|
||||
prNumber = related?.number;
|
||||
}
|
||||
}
|
||||
|
||||
if (prNumber) {
|
||||
await giteaService.addPullRequestComment(ctx.owner, ctx.repo, prNumber, body);
|
||||
logger.info('Codex MCP: \u5df2\u53d1\u5e03 PR \u5ba1\u67e5\u603b\u7ed3', { runId: ctx.runId, prNumber });
|
||||
} else if (ctx.commitSha) {
|
||||
await giteaService.addCommitComment(ctx.owner, ctx.repo, ctx.commitSha, body);
|
||||
logger.info('Codex MCP: \u5df2\u53d1\u5e03 Commit \u5ba1\u67e5\u603b\u7ed3', {
|
||||
runId: ctx.runId,
|
||||
commitSha: ctx.commitSha,
|
||||
});
|
||||
} else {
|
||||
return {
|
||||
content: [{ type: 'text', text: '\u65e0\u6cd5\u53d1\u5e03\uff1a\u7f3a\u5c11 PR number \u6216 commit SHA' }],
|
||||
isError: true,
|
||||
};
|
||||
}
|
||||
|
||||
// 记录到 store
|
||||
if (this.store) {
|
||||
try {
|
||||
await this.store.addCommentRecord({
|
||||
runId: ctx.runId,
|
||||
status: 'published',
|
||||
body,
|
||||
});
|
||||
} catch (storeError) {
|
||||
logger.warn('MCP: 持久化 summary comment record 失败(非致命)', {
|
||||
runId: ctx.runId,
|
||||
error: storeError instanceof Error ? storeError.message : String(storeError),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
content: [{ type: 'text', text: '审查总结已发布成功' }],
|
||||
};
|
||||
}
|
||||
|
||||
private async handleAddLineComment(
|
||||
ctx: ReviewRunContext,
|
||||
args: { path: string; line: number; comment: string }
|
||||
) {
|
||||
const commitId = ctx.headSha || ctx.commitSha;
|
||||
if (!commitId) {
|
||||
return {
|
||||
content: [{ type: 'text', text: '无法添加行评论:缺少 commit SHA' }],
|
||||
isError: true,
|
||||
};
|
||||
}
|
||||
|
||||
let prNumber = ctx.prNumber || ctx.relatedPrNumber;
|
||||
if (!prNumber) {
|
||||
const related = await giteaService.getRelatedPullRequest(ctx.owner, ctx.repo, commitId);
|
||||
prNumber = related?.number;
|
||||
}
|
||||
|
||||
if (!prNumber) {
|
||||
return {
|
||||
content: [{ type: 'text', text: '无法添加行评论:找不到关联的 Pull Request' }],
|
||||
isError: true,
|
||||
};
|
||||
}
|
||||
|
||||
await giteaService.addLineComments(ctx.owner, ctx.repo, prNumber, commitId, [
|
||||
{ path: args.path, line: args.line, comment: args.comment },
|
||||
]);
|
||||
|
||||
logger.info('Codex MCP: 已发布行评论', {
|
||||
runId: ctx.runId,
|
||||
path: args.path,
|
||||
line: args.line,
|
||||
});
|
||||
|
||||
// 记录到 store
|
||||
if (this.store) {
|
||||
try {
|
||||
await this.store.addCommentRecord({
|
||||
runId: ctx.runId,
|
||||
status: 'published',
|
||||
path: args.path,
|
||||
line: args.line,
|
||||
body: args.comment,
|
||||
});
|
||||
} catch (storeError) {
|
||||
logger.warn('MCP: 持久化 line comment record 失败(非致命)', {
|
||||
runId: ctx.runId,
|
||||
error: storeError instanceof Error ? storeError.message : String(storeError),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
content: [{ type: 'text', text: `行评论已添加: ${args.path}:${args.line}` }],
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/** 全局单例 */
|
||||
export const mcpToolExecutor = new McpToolExecutor();
|
||||
@@ -1,4 +1,4 @@
|
||||
export type ReviewEngineMode = 'legacy' | 'agent';
|
||||
export type ReviewEngineMode = 'legacy' | 'agent' | 'codex';
|
||||
|
||||
export type ReviewEventType = 'pull_request' | 'commit_status';
|
||||
|
||||
|
||||
Reference in New Issue
Block a user