omx(team): merge worker-1

This commit is contained in:
bellman
2026-05-14 18:21:26 +09:00
2 changed files with 153 additions and 7 deletions

View File

@@ -143,8 +143,9 @@ pub use prompt::{
PromptBuildError, SystemPromptBuilder, FRONTIER_MODEL_NAME, SYSTEM_PROMPT_DYNAMIC_BOUNDARY,
};
pub use recovery_recipes::{
attempt_recovery, recipe_for, EscalationPolicy, FailureScenario, RecoveryContext,
RecoveryEvent, RecoveryRecipe, RecoveryResult, RecoveryStep,
attempt_recovery, recipe_for, EscalationPolicy, FailureScenario, RecoveryAttemptState,
RecoveryContext, RecoveryEvent, RecoveryLedgerEntry, RecoveryRecipe, RecoveryResult,
RecoveryStep,
};
pub use remote::{
inherited_upstream_proxy_env, no_proxy_list, read_token, upstream_proxy_ws_url,

View File

@@ -135,14 +135,40 @@ pub enum RecoveryEvent {
Escalated,
}
/// Machine-readable recovery progress for one failure scenario.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RecoveryLedgerEntry {
pub recipe_id: String,
pub attempt_count: u32,
pub state: RecoveryAttemptState,
pub started_at: Option<String>,
pub finished_at: Option<String>,
pub last_failure_summary: Option<String>,
pub escalation_reason: Option<String>,
}
/// Current state of a recovery recipe attempt.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RecoveryAttemptState {
Queued,
Running,
Succeeded,
Failed,
Exhausted,
}
/// Minimal context for tracking recovery state and emitting events.
///
/// Holds per-scenario attempt counts, a structured event log, and an
/// optional simulation knob for controlling step outcomes during tests.
/// Holds per-scenario attempt counts, a structured event log, a recovery
/// attempt ledger, and an optional simulation knob for controlling step
/// outcomes during tests.
#[derive(Debug, Clone, Default)]
pub struct RecoveryContext {
attempts: HashMap<FailureScenario, u32>,
events: Vec<RecoveryEvent>,
ledger: HashMap<FailureScenario, RecoveryLedgerEntry>,
clock_tick: u64,
/// Optional step index at which simulated execution fails.
/// `None` means all steps succeed.
fail_at_step: Option<usize>,
@@ -172,6 +198,25 @@ impl RecoveryContext {
pub fn attempt_count(&self, scenario: &FailureScenario) -> u32 {
self.attempts.get(scenario).copied().unwrap_or(0)
}
/// Returns the machine-readable recovery ledger entry for a scenario.
#[must_use]
pub fn ledger_entry(&self, scenario: &FailureScenario) -> Option<&RecoveryLedgerEntry> {
self.ledger.get(scenario)
}
/// Returns all recovery ledger entries currently tracked by this context.
#[must_use]
pub fn ledger_entries(&self) -> Vec<&RecoveryLedgerEntry> {
let mut entries: Vec<_> = self.ledger.values().collect();
entries.sort_by(|left, right| left.recipe_id.cmp(&right.recipe_id));
entries
}
fn next_timestamp(&mut self) -> String {
self.clock_tick += 1;
format!("recovery-ledger-tick-{}", self.clock_tick)
}
}
/// Returns the known recovery recipe for the given failure scenario.
@@ -235,16 +280,39 @@ pub fn recipe_for(scenario: &FailureScenario) -> RecoveryRecipe {
/// emits structured [`RecoveryEvent`]s for every attempt.
pub fn attempt_recovery(scenario: &FailureScenario, ctx: &mut RecoveryContext) -> RecoveryResult {
let recipe = recipe_for(scenario);
let attempt_count = ctx.attempts.entry(*scenario).or_insert(0);
let recipe_id = scenario.to_string();
ctx.ledger
.entry(*scenario)
.or_insert_with(|| RecoveryLedgerEntry {
recipe_id: recipe_id.clone(),
attempt_count: 0,
state: RecoveryAttemptState::Queued,
started_at: None,
finished_at: None,
last_failure_summary: None,
escalation_reason: None,
});
let current_attempts = ctx.attempt_count(scenario);
// Enforce one automatic recovery attempt before escalation.
if *attempt_count >= recipe.max_attempts {
if current_attempts >= recipe.max_attempts {
let result = RecoveryResult::EscalationRequired {
reason: format!(
"max recovery attempts ({}) exceeded for {}",
recipe.max_attempts, scenario
),
};
let finished_at = ctx.next_timestamp();
if let Some(entry) = ctx.ledger.get_mut(scenario) {
entry.attempt_count = current_attempts;
entry.state = RecoveryAttemptState::Exhausted;
entry.finished_at = Some(finished_at);
if let RecoveryResult::EscalationRequired { reason } = &result {
entry.last_failure_summary = Some(reason.clone());
entry.escalation_reason = Some(reason.clone());
}
}
ctx.events.push(RecoveryEvent::RecoveryAttempted {
scenario: *scenario,
recipe,
@@ -254,7 +322,18 @@ pub fn attempt_recovery(scenario: &FailureScenario, ctx: &mut RecoveryContext) -
return result;
}
*attempt_count += 1;
let updated_attempts = ctx.attempts.entry(*scenario).or_insert(0);
*updated_attempts += 1;
let updated_attempts = *updated_attempts;
let started_at = ctx.next_timestamp();
if let Some(entry) = ctx.ledger.get_mut(scenario) {
entry.attempt_count = updated_attempts;
entry.state = RecoveryAttemptState::Running;
entry.started_at = Some(started_at);
entry.finished_at = None;
entry.last_failure_summary = None;
entry.escalation_reason = None;
}
// Execute steps, honoring the optional fail_at_step simulation.
let fail_index = ctx.fail_at_step;
@@ -288,6 +367,25 @@ pub fn attempt_recovery(scenario: &FailureScenario, ctx: &mut RecoveryContext) -
};
// Emit the attempt as structured event data.
let finished_at = ctx.next_timestamp();
if let Some(entry) = ctx.ledger.get_mut(scenario) {
entry.finished_at = Some(finished_at);
match &result {
RecoveryResult::Recovered { .. } => {
entry.state = RecoveryAttemptState::Succeeded;
}
RecoveryResult::PartialRecovery { remaining, .. } => {
entry.state = RecoveryAttemptState::Failed;
entry.last_failure_summary =
Some(format!("{} step(s) remaining after partial recovery", remaining.len()));
}
RecoveryResult::EscalationRequired { reason } => {
entry.state = RecoveryAttemptState::Exhausted;
entry.last_failure_summary = Some(reason.clone());
entry.escalation_reason = Some(reason.clone());
}
}
}
ctx.events.push(RecoveryEvent::RecoveryAttempted {
scenario: *scenario,
recipe,
@@ -499,6 +597,53 @@ mod tests {
assert_eq!(ctx.attempt_count(&FailureScenario::PromptMisdelivery), 0);
}
#[test]
fn recovery_context_exposes_machine_readable_ledger() {
// given
let mut ctx = RecoveryContext::new();
// when
let result = attempt_recovery(&FailureScenario::StaleBranch, &mut ctx);
// then
assert_eq!(result, RecoveryResult::Recovered { steps_taken: 2 });
let entry = ctx
.ledger_entry(&FailureScenario::StaleBranch)
.expect("stale branch ledger entry");
assert_eq!(entry.recipe_id, "stale_branch");
assert_eq!(entry.attempt_count, 1);
assert_eq!(entry.state, RecoveryAttemptState::Succeeded);
assert!(entry.started_at.is_some());
assert!(entry.finished_at.is_some());
assert_eq!(entry.last_failure_summary, None);
assert_eq!(entry.escalation_reason, None);
}
#[test]
fn recovery_ledger_records_exhausted_escalation_reason() {
// given
let mut ctx = RecoveryContext::new();
let scenario = FailureScenario::PromptMisdelivery;
// when
let _ = attempt_recovery(&scenario, &mut ctx);
let result = attempt_recovery(&scenario, &mut ctx);
// then
assert!(matches!(
result,
RecoveryResult::EscalationRequired { .. }
));
let entry = ctx.ledger_entry(&scenario).expect("ledger entry");
assert_eq!(entry.state, RecoveryAttemptState::Exhausted);
assert_eq!(entry.attempt_count, 1);
assert!(entry
.escalation_reason
.as_deref()
.expect("escalation reason")
.contains("max recovery attempts"));
}
#[test]
fn stale_branch_recipe_has_rebase_then_clean_build() {
// given