Files
archived-gitea-ai-assistant/src/review/orchestrator.ts
jeffusion d5deb75231 feat(repo): add project-level review prompt with UI redesign
- Add database migration and repository for project review prompts
- Add API endpoint for setting project-level prompts
- Integrate project prompts into Agent and Codex review flows
- Redesign repository management UI with dialog-based prompt editor
- Replace flat buttons with Switch for webhook toggle and dedicated prompt button
- Add Dialog and DropdownMenu UI components from Radix UI
- Add comprehensive tests for wiring and interactions
2026-03-26 13:35:05 +08:00

729 lines
26 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import { randomUUID } from 'node:crypto';
import config from '../config';
import { LLMGateway, llmGateway } from '../llm/gateway';
import { giteaService } from '../services/gitea';
import { logger } from '../utils/logger';
import { DebateOrchestrator } from './agents/debate-orchestrator';
import { JudgeAgent } from './agents/judge-agent';
import { ReflexionAgent } from './agents/reflexion-agent';
import { TriageAgent, type TriageResult } from './agents/triage-agent';
import { DiffExtractor } from './context/diff-extractor';
import { LocalRepoManager, LocalRepoPaths } from './context/local-repo-manager';
import { LearningSystem } from './learning/learning-system';
import { VectorMemoryStore } from './memory/vector-store';
import { applyPublishPolicy } from './policy/publish-policy';
import { resolveProjectReviewPrompt } from './project-review-prompt';
import { FileReviewStore } from './store/file-review-store';
import { createCodeSearchTool } from './tools/code-search-tool';
import { createFileReadTool } from './tools/file-read-tool';
import { createFunctionReferenceSearchTool } from './tools/function-reference-search-tool';
import { ToolRegistry } from './tools/registry';
import { Finding, FindingCategory, ReviewRun, ReviewTask } from './types';
interface LineCommentInput {
path: string;
line: number;
comment: string;
}
function findingToLineComment(
finding: Omit<Finding, 'id' | 'runId' | 'published'>
): LineCommentInput {
return {
path: finding.path,
line: finding.line,
comment: `**[${finding.severity.toUpperCase()}][${finding.category}]** ${finding.title}\n\n${finding.detail}\n\n建议: ${finding.suggestion}`,
};
}
function summarizeGatedCount(gatedCount: number): string {
if (gatedCount <= 0) {
return '';
}
return `\n\n> ${gatedCount} 条低置信或低优先级问题已进入人工审批队列。`;
}
export class ReviewOrchestrator {
private readonly gateway: LLMGateway;
private readonly toolRegistry: ToolRegistry;
private readonly agentMap: Record<string, ReflexionAgent>;
private readonly correctnessAgent: ReflexionAgent;
private readonly securityAgent: ReflexionAgent;
private readonly reliabilityAgent: ReflexionAgent;
private readonly maintainabilityAgent: ReflexionAgent;
private readonly judgeAgent: JudgeAgent;
private readonly debateOrchestrator: DebateOrchestrator;
private readonly triageAgent: TriageAgent;
private readonly memoryStore?: VectorMemoryStore;
private readonly learningSystem?: LearningSystem;
constructor(
private readonly store: FileReviewStore,
private readonly localRepoManager: LocalRepoManager,
private readonly diffExtractor: DiffExtractor
) {
this.gateway = llmGateway;
// 初始化工具注册表
this.toolRegistry = new ToolRegistry();
this.toolRegistry.register(createCodeSearchTool(this.diffExtractor.getSandbox()));
this.toolRegistry.register(createFunctionReferenceSearchTool(this.diffExtractor.getSandbox()));
this.toolRegistry.register(createFileReadTool());
logger.info('已注册工具(支持所有编程语言)', {
tools: this.toolRegistry.getAll().map((t) => t.name),
});
// 初始化记忆和学习系统(可选)
if (config.review.qdrantUrl && config.review.enableMemory) {
this.memoryStore = new VectorMemoryStore(config.review.qdrantUrl);
this.learningSystem = new LearningSystem(this.memoryStore, this.store);
this.memoryStore.initialize().catch((err) => {
logger.warn('向量记忆系统初始化失败', { error: err.message });
});
logger.info('向量记忆系统已启用', { qdrantUrl: config.review.qdrantUrl });
}
// 创建Reflexion-wrapped agents并传递工具注册表和学习系统
this.correctnessAgent = new ReflexionAgent(
this.gateway,
'correctness',
'Correctness Agent',
'业务逻辑正确性、边界条件、空值处理和明显bug',
this.toolRegistry,
this.learningSystem
);
this.securityAgent = new ReflexionAgent(
this.gateway,
'security',
'Security Agent',
'注入漏洞、权限绕过、敏感信息泄露、反序列化和输入校验缺失',
this.toolRegistry,
this.learningSystem
);
this.reliabilityAgent = new ReflexionAgent(
this.gateway,
'reliability',
'Reliability Agent',
'错误处理、重试策略、幂等性、并发一致性和资源释放',
this.toolRegistry,
this.learningSystem
);
this.maintainabilityAgent = new ReflexionAgent(
this.gateway,
'maintainability',
'Maintainability Agent',
'可维护性、复杂度、接口破坏风险和可测试性不足',
this.toolRegistry,
this.learningSystem
);
this.judgeAgent = new JudgeAgent();
this.debateOrchestrator = new DebateOrchestrator(this.gateway);
this.triageAgent = new TriageAgent(this.gateway);
// Build agent map for dynamic dispatch
this.agentMap = {
correctness: this.correctnessAgent,
security: this.securityAgent,
reliability: this.reliabilityAgent,
maintainability: this.maintainabilityAgent,
};
}
async execute(run: ReviewRun): Promise<void> {
const targetSha = run.headSha || run.commitSha;
if (!targetSha) {
await this.store.markRunIgnored(run.id, '缺少目标 sha');
return;
}
const workspaceStepStart = Date.now();
await this.store.addStep({
runId: run.id,
stepName: 'prepare_workspace',
status: 'started',
startedAt: new Date(workspaceStepStart).toISOString(),
});
let repoPaths: LocalRepoPaths | null = null;
try {
repoPaths = await this.localRepoManager.prepareWorkspace(
run.owner,
run.repo,
run.cloneUrl,
targetSha,
run.id,
run.headCloneUrl
);
await this.store.addStep({
runId: run.id,
stepName: 'prepare_workspace',
status: 'succeeded',
startedAt: new Date(workspaceStepStart).toISOString(),
finishedAt: new Date().toISOString(),
latencyMs: Date.now() - workspaceStepStart,
});
// ── 增量审查基线解析 ─────────────────────────────────────────────
let lastReviewedHead: string | undefined;
if (run.eventType === 'pull_request' && run.prNumber) {
const snapshot = await this.localRepoManager.resolveReviewedRef(
repoPaths.mirrorPath,
run.prNumber
);
if (snapshot && targetSha) {
if (snapshot.baseSha === run.baseSha) {
// base 未变(追加 commit 或 force-push 修改 commit→ 增量审查
lastReviewedHead = snapshot.headSha;
logger.info('增量审查模式:使用上次审查快照', {
runId: run.id,
lastReviewedHead: snapshot.headSha,
currentHead: targetSha,
baseSha: run.baseSha,
});
} else {
// base 变了PR 分支做了 rebase→ 全量审查
logger.info('PR base 已变更(可能 rebase回退全量审查', {
runId: run.id,
savedBaseSha: snapshot.baseSha,
currentBaseSha: run.baseSha,
});
}
}
}
const contextStart = Date.now();
await this.store.addStep({
runId: run.id,
stepName: 'build_context',
status: 'started',
startedAt: new Date(contextStart).toISOString(),
});
const context = await this.diffExtractor.buildContext(
run,
repoPaths.mirrorPath,
repoPaths.workspacePath,
lastReviewedHead
);
await this.store.addStep({
runId: run.id,
stepName: 'build_context',
status: 'succeeded',
startedAt: new Date(contextStart).toISOString(),
finishedAt: new Date().toISOString(),
latencyMs: Date.now() - contextStart,
});
if (!context.diff.trim()) {
await this.publishSummary(run, '本次变更无可审查差异内容,已跳过自动行级评论。', 0);
await this.store.markRunIgnored(run.id, '无可审查差异');
return;
}
const projectPrompt = resolveProjectReviewPrompt(run.owner, run.repo);
// ── Triage: 决定哪些 specialist 需要参与 ─────────────────────────
let triage: TriageResult | null = null;
const enableTriage = config.review.enableTriage ?? true;
if (enableTriage) {
const triageStart = Date.now();
await this.store.addStep({
runId: run.id,
stepName: 'triage',
status: 'started',
startedAt: new Date(triageStart).toISOString(),
});
triage = await this.triageAgent.analyze(context, { projectPrompt });
await this.store.addStep({
runId: run.id,
stepName: 'triage',
status: 'succeeded',
startedAt: new Date(triageStart).toISOString(),
finishedAt: new Date().toISOString(),
latencyMs: Date.now() - triageStart,
});
}
// ── 按 triage 结果选择性派发 specialists ─────────────────────────
const agentStart = Date.now();
await this.store.addStep({
runId: run.id,
stepName: 'run_specialists',
status: 'started',
startedAt: new Date(agentStart).toISOString(),
});
const enableReflection = config.review.enableReflection ?? false;
const maxReflectionRounds = config.review.maxReflectionRounds ?? 2;
const defaultDomains: FindingCategory[] = [
'correctness',
'security',
'reliability',
'maintainability',
];
const defaultTasks: ReviewTask[] = defaultDomains.map((domain) => ({
domain,
paths: context.changedFiles.map((f) => f.path),
riskTags: [],
mode: 'full',
tokenBudget: config.review.tokenBudgetLarge,
maxIterations: 2,
allowTools: true,
allowReflection: true,
allowDebate: true,
}));
const triageTasks = triage?.tasks ?? defaultTasks;
const tasksByDomain = new Map<ReviewTask['domain'], ReviewTask>();
for (const task of triageTasks) {
const existing = tasksByDomain.get(task.domain);
if (!existing) {
tasksByDomain.set(task.domain, {
...task,
paths: [...new Set(task.paths)],
});
continue;
}
tasksByDomain.set(task.domain, {
...existing,
paths: [...new Set([...existing.paths, ...task.paths])],
riskTags: [...new Set([...existing.riskTags, ...task.riskTags])],
maxIterations: Math.max(existing.maxIterations, task.maxIterations),
tokenBudget: Math.max(existing.tokenBudget, task.tokenBudget),
allowTools: existing.allowTools || task.allowTools,
allowReflection: existing.allowReflection || task.allowReflection,
allowDebate: existing.allowDebate || task.allowDebate,
mode: existing.mode === 'full' || task.mode === 'full' ? 'full' : 'light',
});
}
const domainTasks = [...tasksByDomain.values()];
logger.info('Specialist 派发决策', {
runId: run.id,
triageComplexity: triage?.complexity ?? 'disabled',
reviewMode: triage?.mode ?? 'full',
taskCount: triageTasks.length,
domainCount: domainTasks.length,
domains: domainTasks.map((task) => task.domain),
});
const agentResults = await Promise.all(
domainTasks.map(async (task) => {
const agent = this.agentMap[task.domain];
const reviewOptions = {
scopePaths: task.paths,
allowTools: task.allowTools,
maxIterations: task.maxIterations,
mode: task.mode,
maxContextTokens: Math.max(1500, Math.floor(task.tokenBudget * 0.7)),
projectPrompt,
} as const;
const useReflection =
enableReflection &&
task.allowReflection &&
task.mode !== 'light' &&
triage?.complexity !== 'trivial';
if (useReflection) {
return agent.reviewWithReflection(run, context, maxReflectionRounds, reviewOptions);
}
return agent.reviewWithOptions(run, context, reviewOptions);
})
);
await this.store.addStep({
runId: run.id,
stepName: 'run_specialists',
status: 'succeeded',
startedAt: new Date(agentStart).toISOString(),
finishedAt: new Date().toISOString(),
latencyMs: Date.now() - agentStart,
});
let allFindings = agentResults.flatMap((result) => result.findings);
// 对高严重性findings启动Debatetrivial 变更跳过 debate
const enableDebate = config.review.enableDebate ?? false;
const debateThreshold = config.review.debateThreshold ?? 'high';
if (
enableDebate &&
allFindings.length > 0 &&
triage?.mode === 'full' &&
triage?.complexity !== 'trivial'
) {
const debateStart = Date.now();
await this.store.addStep({
runId: run.id,
stepName: 'debate_high_severity',
status: 'started',
startedAt: new Date(debateStart).toISOString(),
});
const debatableFindings = allFindings.filter((f) => {
if (debateThreshold === 'high') return f.severity === 'high';
if (debateThreshold === 'medium') return f.severity === 'high' || f.severity === 'medium';
return false;
});
logger.info('启动Debate阶段', {
runId: run.id,
totalFindings: allFindings.length,
debatableFindings: debatableFindings.length,
threshold: debateThreshold,
});
const allowDebateDomains = new Set(
domainTasks.filter((task) => task.allowDebate).map((task) => task.domain)
);
const debatedFindings: typeof allFindings = [];
for (const finding of debatableFindings) {
if (!allowDebateDomains.has(finding.category)) {
debatedFindings.push(finding);
continue;
}
const debateAgents = [this.agentMap[finding.category]];
if (finding.category !== 'correctness' && allowDebateDomains.has('correctness')) {
debateAgents.push(this.correctnessAgent);
}
if (finding.category !== 'security' && allowDebateDomains.has('security')) {
debateAgents.push(this.securityAgent);
}
const uniqueDebateAgents = [...new Set(debateAgents)];
const debatedFinding = await this.debateOrchestrator.conductDebate(
finding,
uniqueDebateAgents,
2,
projectPrompt
);
debatedFindings.push(debatedFinding);
}
// 替换原findings
allFindings = [
...debatedFindings,
...allFindings.filter((f) => !debatableFindings.includes(f)),
];
await this.store.addStep({
runId: run.id,
stepName: 'debate_high_severity',
status: 'succeeded',
startedAt: new Date(debateStart).toISOString(),
finishedAt: new Date().toISOString(),
latencyMs: Date.now() - debateStart,
});
}
const decision = this.judgeAgent.judge(allFindings);
const policyResult = applyPublishPolicy(
decision.findings,
config.review.autoPublishMinConfidence,
config.review.enableHumanGate
);
// 检查是否重试检测summary或line comments是否已发布避免重复发布
// summary comment特征status='published' 且 path字段为空
// line comment特征status='published' 且 path字段存在
const runDetails = await this.store.getRunDetails(run.id);
const summaryPublished =
runDetails?.comments.some((comment) => comment.status === 'published' && !comment.path) ||
false;
const lineCommentsPublished =
runDetails?.comments.some((comment) => comment.status === 'published' && comment.path) ||
false;
if (lineCommentsPublished) {
logger.info('检测到重试且line comments已发布跳过line comments和findings标记', {
runId: run.id,
existingLineComments: runDetails?.comments.filter((c) => c.path).length,
});
// 重试场景line comments已发布跳过line comments发布步骤
// 注意不能return需要继续执行summary和pending gate记录即使summary已存在
}
// 只持久化publishable和gated的findingshuman gate禁用时丢弃低质量findings
// 避免将不会发布也不会人工审批的findings加入pending队列
const findingsToStore = [...policyResult.publishable, ...policyResult.gated];
// 创建fingerprint -> published状态的映射用于在retry时恢复published状态
// 防止addFindings覆盖时将已发布的findings重置为unpublished
const existingPublishedStatus = new Map<string, boolean>();
if (runDetails?.findings) {
for (const f of runDetails.findings) {
existingPublishedStatus.set(f.fingerprint, f.published);
}
}
const persistedFindings: Finding[] = findingsToStore.map((finding) => ({
...finding,
id: randomUUID(),
runId: run.id,
// 如果finding已经publishedretry场景保留published状态否则设为false
published: existingPublishedStatus.get(finding.fingerprint) || false,
}));
await this.store.addFindings(run.id, persistedFindings);
// 先发布line comments可重试步骤成功后再发布summary
// 顺序重要如果publishLineComments失败导致重试不会重复发布summary
if (!lineCommentsPublished) {
// 首次执行发布line comments并标记findings
const lineComments = policyResult.publishable.map(findingToLineComment);
const lineCommentsPublishedSuccessfully = await this.publishLineComments(run, lineComments);
// 只有实际发布了line comments才标记findings为published
// 避免在无PR number等场景下findings消失但开发者没收到评论
if (lineCommentsPublishedSuccessfully) {
for (const finding of policyResult.publishable) {
await this.store.markFindingPublished(run.id, finding.fingerprint);
}
}
} else {
// Retry场景line comments已发布reconcile所有publishable findings的published状态
// 防止crash/store write失败发生在markFindingPublished中间时部分findings永远保持unpublished
for (const finding of policyResult.publishable) {
await this.store.markFindingPublished(run.id, finding.fingerprint);
}
}
// Summary放在最后line comments和markFindingPublished都成功后才发布
// 如果前面步骤失败重试不会产生重复summary
if (!summaryPublished) {
await this.publishSummary(run, decision.summaryMarkdown, policyResult.gated.length);
} else {
logger.info('Summary已发布跳过重复发布', { runId: run.id });
}
// 关键即使summary已存在仍需添加gated findings到pending队列
// 防止crash发生在publishSummary之后、addCommentRecord之前时丢失待审批findings
// 使用幂等性检查防止retry时重复添加
const existingPendingComments =
runDetails?.comments.filter((c) => c.status === 'pending') || [];
// 跟踪本次循环中已添加的location防止同一run中多个findings在同一位置导致重复pending记录
const addedLocations = new Set<string>();
for (const finding of policyResult.gated) {
const locationKey = `${finding.path}:${finding.line}`;
// 检查是否已存在相同的pending记录通过runId + path + line去重
// 需要同时检查1) 之前run的记录 2) 本次循环已添加的记录
const alreadyPending =
existingPendingComments.some((c) => c.path === finding.path && c.line === finding.line) ||
addedLocations.has(locationKey);
if (!alreadyPending) {
await this.store.addCommentRecord({
runId: run.id,
status: 'pending',
body: `PENDING: ${finding.title}`,
path: finding.path,
line: finding.line,
fingerprint: finding.fingerprint,
});
addedLocations.add(locationKey);
} else {
logger.debug('跳过已存在的pending记录', {
runId: run.id,
path: finding.path,
line: finding.line,
});
}
}
// 将已发布的findings存储到向量记忆自动标记为已批准
if (this.memoryStore && policyResult.publishable.length > 0) {
for (const finding of policyResult.publishable) {
const persistedFinding = persistedFindings.find(
(f) => f.fingerprint === finding.fingerprint
);
if (persistedFinding) {
try {
await this.memoryStore.storeFinding(
persistedFinding as Finding,
true,
run.owner,
run.repo
);
} catch (error) {
logger.warn('存储finding到向量记忆失败', {
findingId: persistedFinding.id,
error: error instanceof Error ? error.message : String(error),
});
}
}
}
logger.debug('已发布findings已存储到向量记忆', {
count: policyResult.publishable.length,
});
}
logger.info('Agent 审查流程完成', {
runId: run.id,
owner: run.owner,
repo: run.repo,
findings: decision.findings.length,
published: policyResult.publishable.length,
gated: policyResult.gated.length,
dropped: policyResult.dropped.length,
});
// ── 审查成功:保存审查快照 ref ──────────────────────────────────
if (run.eventType === 'pull_request' && run.prNumber && targetSha) {
try {
await this.localRepoManager.saveReviewedRef(
repoPaths!.mirrorPath,
run.prNumber,
run.baseSha!,
targetSha
);
} catch (refError) {
logger.warn('保存审查快照 ref 失败(非致命)', {
runId: run.id,
error: refError instanceof Error ? refError.message : String(refError),
});
}
}
} catch (error) {
await this.store.addStep({
runId: run.id,
stepName: 'orchestrator',
status: 'failed',
startedAt: new Date().toISOString(),
finishedAt: new Date().toISOString(),
error: error instanceof Error ? error.message : String(error),
});
throw error;
} finally {
if (repoPaths) {
await this.localRepoManager.cleanupWorkspace(repoPaths);
}
}
}
private async publishSummary(run: ReviewRun, summary: string, gatedCount: number): Promise<void> {
const body = `## AI Agent代码审查结果\n\n${summary}${summarizeGatedCount(gatedCount)}`;
if (run.eventType === 'pull_request' && run.prNumber) {
await giteaService.addPullRequestComment(run.owner, run.repo, run.prNumber, body);
// 尝试写入本地record失败不抛出避免阻塞整个审查流程
// 如果失败retry时会因缺少record重复发布summary可接受的权衡
try {
await this.store.addCommentRecord({
runId: run.id,
status: 'published',
body,
});
} catch (storeError) {
logger.error(
'Failed to persist summary comment record (non-fatal, may cause duplicate on retry)',
{
runId: run.id,
error: storeError instanceof Error ? storeError.message : String(storeError),
}
);
// 不抛出,允许审查流程继续
}
return;
}
if (run.commitSha) {
await giteaService.addCommitComment(run.owner, run.repo, run.commitSha, body);
try {
await this.store.addCommentRecord({
runId: run.id,
status: 'published',
body,
});
} catch (storeError) {
logger.error(
'Failed to persist summary comment record (non-fatal, may cause duplicate on retry)',
{
runId: run.id,
error: storeError instanceof Error ? storeError.message : String(storeError),
}
);
// 不抛出,允许审查流程继续
}
}
}
private async publishLineComments(
run: ReviewRun,
comments: LineCommentInput[]
): Promise<boolean> {
if (comments.length === 0) {
return false;
}
const commitId = run.commitSha || run.headSha;
if (!commitId) {
return false;
}
let prNumber = run.prNumber || run.relatedPrNumber;
if (!prNumber) {
const related = await giteaService.getRelatedPullRequest(run.owner, run.repo, commitId);
prNumber = related?.number;
}
if (!prNumber) {
return false;
}
await giteaService.addLineComments(run.owner, run.repo, prNumber, commitId, comments);
// 尝试为每个comment写入本地record失败不抛出避免阻塞整个审查流程
// 如果部分失败retry时lineCommentsPublished可能为false/partial导致重复发布可接受的权衡
for (const comment of comments) {
try {
await this.store.addCommentRecord({
runId: run.id,
status: 'published',
path: comment.path,
line: comment.line,
body: comment.comment,
});
} catch (storeError) {
logger.error(
'Failed to persist line comment record (non-fatal, may cause duplicate on retry)',
{
runId: run.id,
path: comment.path,
line: comment.line,
error: storeError instanceof Error ? storeError.message : String(storeError),
}
);
// 不抛出继续处理下一条comment
}
}
return true; // 成功发布
}
}