omx(team): merge worker-2

This commit is contained in:
bellman
2026-05-14 18:24:51 +09:00
2 changed files with 167 additions and 6 deletions

View File

@@ -144,8 +144,8 @@ pub use prompt::{
};
pub use recovery_recipes::{
attempt_recovery, recipe_for, EscalationPolicy, FailureScenario, RecoveryAttemptState,
RecoveryContext, RecoveryEvent, RecoveryLedgerEntry, RecoveryRecipe, RecoveryResult,
RecoveryStep,
RecoveryAttemptType, RecoveryCommandResult, RecoveryContext, RecoveryEvent,
RecoveryLedgerEntry, RecoveryRecipe, RecoveryResult, RecoveryStatusReport, RecoveryStep,
};
pub use remote::{
inherited_upstream_proxy_env, no_proxy_list, read_token, upstream_proxy_ws_url,

View File

@@ -121,6 +121,21 @@ pub enum RecoveryResult {
},
}
/// Type of recovery execution represented in the ledger.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RecoveryAttemptType {
Automatic,
}
/// Result for one executable recovery command/step.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RecoveryCommandResult {
pub command: RecoveryStep,
pub status: RecoveryAttemptState,
pub result: String,
}
/// Structured event emitted during recovery.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
@@ -139,10 +154,16 @@ pub enum RecoveryEvent {
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RecoveryLedgerEntry {
pub recipe_id: String,
pub attempt_type: RecoveryAttemptType,
pub trigger: FailureScenario,
pub attempt_count: u32,
pub retry_limit: u32,
pub attempts_remaining: u32,
pub state: RecoveryAttemptState,
pub started_at: Option<String>,
pub finished_at: Option<String>,
pub command_results: Vec<RecoveryCommandResult>,
pub result: Option<RecoveryResult>,
pub last_failure_summary: Option<String>,
pub escalation_reason: Option<String>,
}
@@ -158,6 +179,19 @@ pub enum RecoveryAttemptState {
Exhausted,
}
/// Machine-readable status projection for callers that need to
/// distinguish an untouched scenario from an exhausted recovery.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RecoveryStatusReport {
pub scenario: FailureScenario,
pub attempted: bool,
pub state: Option<RecoveryAttemptState>,
pub attempt_count: u32,
pub retry_limit: Option<u32>,
pub attempts_remaining: Option<u32>,
pub escalation_reason: Option<String>,
}
/// Minimal context for tracking recovery state and emitting events.
///
/// Holds per-scenario attempt counts, a structured event log, a recovery
@@ -213,6 +247,32 @@ impl RecoveryContext {
entries
}
/// Returns a compact machine-readable recovery status for a scenario,
/// including `attempted = false` when no ledger entry exists yet.
#[must_use]
pub fn status_report(&self, scenario: &FailureScenario) -> RecoveryStatusReport {
self.ledger_entry(scenario).map_or(
RecoveryStatusReport {
scenario: *scenario,
attempted: false,
state: None,
attempt_count: 0,
retry_limit: None,
attempts_remaining: None,
escalation_reason: None,
},
|entry| RecoveryStatusReport {
scenario: *scenario,
attempted: entry.attempt_count > 0,
state: Some(entry.state),
attempt_count: entry.attempt_count,
retry_limit: Some(entry.retry_limit),
attempts_remaining: Some(entry.attempts_remaining),
escalation_reason: entry.escalation_reason.clone(),
},
)
}
fn next_timestamp(&mut self) -> String {
self.clock_tick += 1;
format!("recovery-ledger-tick-{}", self.clock_tick)
@@ -285,10 +345,16 @@ pub fn attempt_recovery(scenario: &FailureScenario, ctx: &mut RecoveryContext) -
.entry(*scenario)
.or_insert_with(|| RecoveryLedgerEntry {
recipe_id: recipe_id.clone(),
attempt_type: RecoveryAttemptType::Automatic,
trigger: *scenario,
attempt_count: 0,
retry_limit: recipe.max_attempts,
attempts_remaining: recipe.max_attempts,
state: RecoveryAttemptState::Queued,
started_at: None,
finished_at: None,
command_results: Vec::new(),
result: None,
last_failure_summary: None,
escalation_reason: None,
});
@@ -306,12 +372,15 @@ pub fn attempt_recovery(scenario: &FailureScenario, ctx: &mut RecoveryContext) -
let finished_at = ctx.next_timestamp();
if let Some(entry) = ctx.ledger.get_mut(scenario) {
entry.attempt_count = current_attempts;
entry.attempts_remaining = 0;
entry.state = RecoveryAttemptState::Exhausted;
entry.finished_at = Some(finished_at);
if let RecoveryResult::EscalationRequired { reason } = &result {
entry.last_failure_summary = Some(reason.clone());
entry.escalation_reason = Some(reason.clone());
}
entry.result = Some(result.clone());
let RecoveryResult::EscalationRequired { reason } = &result else {
unreachable!("exhaustion always produces escalation");
};
entry.last_failure_summary = Some(reason.clone());
entry.escalation_reason = Some(reason.clone());
}
ctx.events.push(RecoveryEvent::RecoveryAttempted {
scenario: *scenario,
@@ -328,9 +397,12 @@ pub fn attempt_recovery(scenario: &FailureScenario, ctx: &mut RecoveryContext) -
let started_at = ctx.next_timestamp();
if let Some(entry) = ctx.ledger.get_mut(scenario) {
entry.attempt_count = updated_attempts;
entry.attempts_remaining = recipe.max_attempts.saturating_sub(updated_attempts);
entry.state = RecoveryAttemptState::Running;
entry.started_at = Some(started_at);
entry.finished_at = None;
entry.command_results.clear();
entry.result = None;
entry.last_failure_summary = None;
entry.escalation_reason = None;
}
@@ -338,14 +410,25 @@ pub fn attempt_recovery(scenario: &FailureScenario, ctx: &mut RecoveryContext) -
// Execute steps, honoring the optional fail_at_step simulation.
let fail_index = ctx.fail_at_step;
let mut executed = Vec::new();
let mut command_results = Vec::new();
let mut failed = false;
for (i, step) in recipe.steps.iter().enumerate() {
if fail_index == Some(i) {
command_results.push(RecoveryCommandResult {
command: step.clone(),
status: RecoveryAttemptState::Failed,
result: format!("step {i} failed for {scenario}"),
});
failed = true;
break;
}
executed.push(step.clone());
command_results.push(RecoveryCommandResult {
command: step.clone(),
status: RecoveryAttemptState::Succeeded,
result: format!("step {i} succeeded for {scenario}"),
});
}
let result = if failed {
@@ -370,6 +453,8 @@ pub fn attempt_recovery(scenario: &FailureScenario, ctx: &mut RecoveryContext) -
let finished_at = ctx.next_timestamp();
if let Some(entry) = ctx.ledger.get_mut(scenario) {
entry.finished_at = Some(finished_at);
entry.command_results = command_results;
entry.result = Some(result.clone());
match &result {
RecoveryResult::Recovered { .. } => {
entry.state = RecoveryAttemptState::Succeeded;
@@ -613,10 +698,24 @@ mod tests {
.ledger_entry(&FailureScenario::StaleBranch)
.expect("stale branch ledger entry");
assert_eq!(entry.recipe_id, "stale_branch");
assert_eq!(entry.attempt_type, RecoveryAttemptType::Automatic);
assert_eq!(entry.trigger, FailureScenario::StaleBranch);
assert_eq!(entry.attempt_count, 1);
assert_eq!(entry.retry_limit, 1);
assert_eq!(entry.attempts_remaining, 0);
assert_eq!(entry.state, RecoveryAttemptState::Succeeded);
assert!(entry.started_at.is_some());
assert!(entry.finished_at.is_some());
assert_eq!(
entry.result,
Some(RecoveryResult::Recovered { steps_taken: 2 })
);
assert_eq!(entry.command_results.len(), 2);
assert_eq!(entry.command_results[0].command, RecoveryStep::RebaseBranch);
assert_eq!(
entry.command_results[0].status,
RecoveryAttemptState::Succeeded
);
assert_eq!(entry.last_failure_summary, None);
assert_eq!(entry.escalation_reason, None);
}
@@ -636,6 +735,11 @@ mod tests {
let entry = ctx.ledger_entry(&scenario).expect("ledger entry");
assert_eq!(entry.state, RecoveryAttemptState::Exhausted);
assert_eq!(entry.attempt_count, 1);
assert_eq!(entry.attempts_remaining, 0);
assert!(matches!(
entry.result,
Some(RecoveryResult::EscalationRequired { .. })
));
assert!(entry
.escalation_reason
.as_deref()
@@ -643,6 +747,63 @@ mod tests {
.contains("max recovery attempts"));
}
#[test]
fn recovery_status_report_distinguishes_not_attempted_from_exhausted() {
// given
let mut ctx = RecoveryContext::new();
let scenario = FailureScenario::PromptMisdelivery;
// then — no ledger entry is not the same as exhausted.
let not_attempted = ctx.status_report(&scenario);
assert!(!not_attempted.attempted);
assert_eq!(not_attempted.state, None);
assert_eq!(not_attempted.attempt_count, 0);
assert_eq!(not_attempted.retry_limit, None);
// when — one allowed attempt then one extra attempt.
let _ = attempt_recovery(&scenario, &mut ctx);
let _ = attempt_recovery(&scenario, &mut ctx);
// then
let exhausted = ctx.status_report(&scenario);
assert!(exhausted.attempted);
assert_eq!(exhausted.state, Some(RecoveryAttemptState::Exhausted));
assert_eq!(exhausted.attempt_count, 1);
assert_eq!(exhausted.retry_limit, Some(1));
assert_eq!(exhausted.attempts_remaining, Some(0));
assert!(exhausted
.escalation_reason
.as_deref()
.is_some_and(|reason| reason.contains("max recovery attempts")));
}
#[test]
fn recovery_ledger_records_failed_command_result() {
// given
let mut ctx = RecoveryContext::new().with_fail_at_step(1);
let scenario = FailureScenario::PartialPluginStartup;
// when
let result = attempt_recovery(&scenario, &mut ctx);
// then
assert!(matches!(result, RecoveryResult::PartialRecovery { .. }));
let entry = ctx.ledger_entry(&scenario).expect("ledger entry");
assert_eq!(entry.state, RecoveryAttemptState::Failed);
assert_eq!(entry.command_results.len(), 2);
assert_eq!(
entry.command_results[0].status,
RecoveryAttemptState::Succeeded
);
assert_eq!(
entry.command_results[1].status,
RecoveryAttemptState::Failed
);
assert!(entry.command_results[1]
.result
.contains("partial_plugin_startup"));
}
#[test]
fn stale_branch_recipe_has_rebase_then_clean_build() {
// given