From f7235ca9323e431b2b7679ec63d64c0bfee2bbfb Mon Sep 17 00:00:00 2001 From: bellman Date: Fri, 15 May 2026 09:29:26 +0900 Subject: [PATCH] Make G006 task policy state machine executable Typed task packets, policy decisions, lane board status, and session liveness now have concrete runtime contracts and focused regressions for Stream 4. Constraint: G006 requires task/lane operation without pane scraping while preserving legacy task packet callers. Rejected: waiting on stale worker worktrees | all G006 worker worktrees remained at main with no commits, so leader integrated the verified slice directly. Confidence: high Scope-risk: moderate Directive: Keep task packet serde defaults when adding fields so older packets continue to deserialize. Tested: git diff --check; cargo fmt --manifest-path rust/Cargo.toml --all -- --check; cargo check --manifest-path rust/Cargo.toml -p runtime -p tools -p rusty-claude-cli; cargo test --manifest-path rust/Cargo.toml -p runtime task_packet -- --nocapture; cargo test --manifest-path rust/Cargo.toml -p runtime policy_engine -- --nocapture; cargo test --manifest-path rust/Cargo.toml -p runtime task_registry -- --nocapture; cargo test --manifest-path rust/Cargo.toml -p runtime session_heartbeat -- --nocapture; cargo test --manifest-path rust/Cargo.toml -p tools run_task_packet_creates_packet_backed_task -- --nocapture; cargo test --manifest-path rust/Cargo.toml -p tools lane_completion -- --nocapture; cargo test --manifest-path rust/Cargo.toml -p rusty-claude-cli status_json_surfaces -- --nocapture Not-tested: full workspace test suite; PR/issue reconciliation deferred to G011/G012 Co-authored-by: OmX --- ...g006-task-policy-board-verification-map.md | 34 ++ rust/crates/runtime/src/lib.rs | 12 +- rust/crates/runtime/src/policy_engine.rs | 317 +++++++++++++++++- rust/crates/runtime/src/session.rs | 71 ++++ rust/crates/runtime/src/task_packet.rs | 159 ++++++++- rust/crates/runtime/src/task_registry.rs | 188 +++++++++++ 
rust/crates/rusty-claude-cli/src/main.rs | 19 ++ rust/crates/tools/src/lane_completion.rs | 10 + rust/crates/tools/src/lib.rs | 11 + 9 files changed, 808 insertions(+), 13 deletions(-) create mode 100644 docs/g006-task-policy-board-verification-map.md diff --git a/docs/g006-task-policy-board-verification-map.md b/docs/g006-task-policy-board-verification-map.md new file mode 100644 index 00000000..0cdd64d8 --- /dev/null +++ b/docs/g006-task-policy-board-verification-map.md @@ -0,0 +1,34 @@ +# G006 Task Policy Board Verification Map + +Goal: `G006-task-policy-board` — Stream 4 task packets, executable policy engine, lane board/status JSON, and running-state liveness heartbeat. + +## Prompt-to-artifact checklist + +| Requirement | Artifact/evidence | +| --- | --- | +| Typed task packet schema with objective, scope, files/resources, acceptance criteria, model/provider, permission profile, recovery policy, verification plan, reporting targets | `rust/crates/runtime/src/task_packet.rs` extends `TaskPacket` with `acceptance_criteria`, `resources`, `model`, `provider`, `permission_profile`, `recovery_policy`, `verification_plan`, and `reporting_targets`; tests cover legacy defaulted JSON and rich CC2 roundtrip. | +| Backwards compatibility for existing task packets and tool callers | `serde(default)`/optional fields in `task_packet.rs`; `rust/crates/tools/src/lib.rs` `run_task_packet_creates_packet_backed_task` updated for rich schema; legacy packet test keeps old JSON accepted. | +| Executable policy decisions for retry/rebase/merge/escalate/stale cleanup/approval token | `rust/crates/runtime/src/policy_engine.rs` adds `RetryAvailable`, `RebaseRequired`, `StaleCleanupRequired`, approval-token conditions/actions, `PolicyEvaluation`, `PolicyDecisionEvent`, and decision-table tests. 
| +| Policy decisions explainable and typed-event logged/emittable | `PolicyDecisionEvent` serializable typed event with `rule_name`, `priority`, `kind`, `explanation`, `approval_token_id`; `evaluate_with_events` emits event per flattened action. | +| Active lane board/dashboard/status JSON over canonical state | `rust/crates/runtime/src/task_registry.rs` adds `LaneBoard`, `LaneBoardEntry`, `LaneFreshness`, `lane_board_at`, and `lane_status_json_at`; CLI status JSON advertises lane board contract in `rust/crates/rusty-claude-cli/src/main.rs`. | +| Heartbeats independent of terminal rendering with healthy/stalled/transport-dead cases | `rust/crates/runtime/src/session.rs` adds `SessionHeartbeat`/`SessionLiveness` from persisted session health state; `task_registry.rs` heartbeat freshness is computed from canonical heartbeat timestamps and transport state. | +| Task/lane status JSON shows active/blocked/finished lanes with heartbeat freshness | `task_registry::tests::lane_board_groups_active_blocked_finished_and_reports_freshness`; `status_json_surfaces_session_lifecycle_for_clawhip`/status JSON surfaces lane board metadata. | +| Leader-owned ultragoal audit remains separate from workers | No worker changed `.omx/ultragoal`; leader will checkpoint with fresh `get_goal` only after terminal verification. 
| + +## Verification run + +- `git diff --check` — PASS +- `cargo fmt --manifest-path rust/Cargo.toml --all -- --check` — PASS +- `cargo check --manifest-path rust/Cargo.toml -p runtime -p tools -p rusty-claude-cli` — PASS +- `cargo test --manifest-path rust/Cargo.toml -p runtime task_packet -- --nocapture` — PASS (5 task packet tests) +- `cargo test --manifest-path rust/Cargo.toml -p runtime policy_engine -- --nocapture` — PASS (12 unit + 1 integration match) +- `cargo test --manifest-path rust/Cargo.toml -p runtime task_registry -- --nocapture` — PASS (17 task registry tests) +- `cargo test --manifest-path rust/Cargo.toml -p runtime session_heartbeat -- --nocapture` — PASS (1 heartbeat test) +- `cargo test --manifest-path rust/Cargo.toml -p tools run_task_packet_creates_packet_backed_task -- --nocapture` — PASS +- `cargo test --manifest-path rust/Cargo.toml -p tools lane_completion -- --nocapture` — PASS (6 tests) +- `cargo test --manifest-path rust/Cargo.toml -p rusty-claude-cli status_json_surfaces -- --nocapture` — PASS + +## Remaining gates + +- G006 can be checkpointed after the team lifecycle is reconciled to a terminal state and this commit is pushed. +- Open PR/issue reconciliation remains explicitly deferred to G011/G012 via `docs/pr-issue-resolution-gate.md`. 
diff --git a/rust/crates/runtime/src/lib.rs b/rust/crates/runtime/src/lib.rs index 5b1a8aff..d201f2f2 100644 --- a/rust/crates/runtime/src/lib.rs +++ b/rust/crates/runtime/src/lib.rs @@ -135,8 +135,9 @@ pub use plugin_lifecycle::{ PluginState, ResourceInfo, ServerHealth, ServerStatus, ToolInfo, }; pub use policy_engine::{ - evaluate, DiffScope, GreenLevel, LaneBlocker, LaneContext, PolicyAction, PolicyCondition, - PolicyEngine, PolicyRule, ReconcileReason, ReviewStatus, + evaluate, evaluate_with_events, ApprovalToken, DiffScope, GreenLevel, LaneBlocker, LaneContext, + PolicyAction, PolicyCondition, PolicyDecisionEvent, PolicyDecisionKind, PolicyEngine, + PolicyEvaluation, PolicyRule, ReconcileReason, ReviewStatus, }; pub use prompt::{ load_system_prompt, prepend_bullets, ContextFile, ModelFamilyIdentity, ProjectContext, @@ -167,7 +168,7 @@ pub use sandbox::{ }; pub use session::{ ContentBlock, ConversationMessage, MessageRole, Session, SessionCompaction, SessionError, - SessionFork, SessionPromptEntry, + SessionFork, SessionHeartbeat, SessionLiveness, SessionPromptEntry, }; pub use sse::{IncrementalSseParser, SseEvent}; pub use stale_base::{ @@ -178,7 +179,10 @@ pub use stale_branch::{ apply_policy, check_freshness, BranchFreshness, StaleBranchAction, StaleBranchEvent, StaleBranchPolicy, }; -pub use task_packet::{validate_packet, TaskPacket, TaskPacketValidationError, ValidatedPacket}; +pub use task_packet::{ + validate_packet, TaskPacket, TaskPacketValidationError, TaskResource, ValidatedPacket, +}; +pub use task_registry::{LaneBoard, LaneBoardEntry, LaneFreshness, LaneHeartbeat}; #[cfg(test)] pub use trust_resolver::{TrustConfig, TrustDecision, TrustEvent, TrustPolicy, TrustResolver}; pub use usage::{ diff --git a/rust/crates/runtime/src/policy_engine.rs b/rust/crates/runtime/src/policy_engine.rs index f96c0feb..c3e4b1b8 100644 --- a/rust/crates/runtime/src/policy_engine.rs +++ b/rust/crates/runtime/src/policy_engine.rs @@ -1,5 +1,7 @@ use std::time::Duration; 
+use serde::{Deserialize, Serialize}; + pub type GreenLevel = u8; const STALE_BRANCH_THRESHOLD: Duration = Duration::from_hours(1); @@ -46,6 +48,11 @@ pub enum PolicyCondition { ReviewPassed, ScopedDiff, TimedOut { duration: Duration }, + RetryAvailable, + RebaseRequired, + StaleCleanupRequired, + ApprovalTokenPresent, + ApprovalTokenMissing, } impl PolicyCondition { @@ -68,6 +75,11 @@ impl PolicyCondition { Self::ReviewPassed => context.review_status == ReviewStatus::Approved, Self::ScopedDiff => context.diff_scope == DiffScope::Scoped, Self::TimedOut { duration } => context.branch_freshness >= *duration, + Self::RetryAvailable => context.retry_count < context.retry_limit, + Self::RebaseRequired => context.rebase_required, + Self::StaleCleanupRequired => context.stale_cleanup_required, + Self::ApprovalTokenPresent => context.approval_token.is_some(), + Self::ApprovalTokenMissing => context.approval_token.is_none(), } } } @@ -77,11 +89,15 @@ pub enum PolicyAction { MergeToDev, MergeForward, RecoverOnce, + Retry { reason: String }, + Rebase { reason: String }, Escalate { reason: String }, CloseoutLane, CleanupSession, + CleanupStale { reason: String }, Reconcile { reason: ReconcileReason }, Notify { channel: String }, + RequireApprovalToken { operation: String }, Block { reason: String }, Chain(Vec), } @@ -132,6 +148,44 @@ pub enum DiffScope { Scoped, } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ApprovalToken { + pub token_id: String, + pub operation: String, + pub granted_by: String, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum PolicyDecisionKind { + Retry, + Rebase, + Merge, + Escalate, + StaleCleanup, + ApprovalRequired, + Notify, + Block, + Closeout, + Reconcile, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PolicyDecisionEvent { + pub lane_id: String, + pub rule_name: String, + pub priority: u32, + pub kind: 
PolicyDecisionKind, + pub explanation: String, + pub approval_token_id: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PolicyEvaluation { + pub actions: Vec, + pub events: Vec, +} + #[derive(Debug, Clone, PartialEq, Eq)] pub struct LaneContext { pub lane_id: String, @@ -143,6 +197,11 @@ pub struct LaneContext { pub diff_scope: DiffScope, pub completed: bool, pub reconciled: bool, + pub retry_count: u32, + pub retry_limit: u32, + pub rebase_required: bool, + pub stale_cleanup_required: bool, + pub approval_token: Option, } impl LaneContext { @@ -166,6 +225,11 @@ impl LaneContext { diff_scope, completed, reconciled: false, + retry_count: 0, + retry_limit: 1, + rebase_required: false, + stale_cleanup_required: false, + approval_token: None, } } @@ -182,6 +246,11 @@ impl LaneContext { diff_scope: DiffScope::Full, completed: true, reconciled: true, + retry_count: 0, + retry_limit: 1, + rebase_required: false, + stale_cleanup_required: false, + approval_token: None, } } @@ -190,6 +259,31 @@ impl LaneContext { self.green_contract_satisfied = satisfied; self } + + #[must_use] + pub fn with_retry_state(mut self, retry_count: u32, retry_limit: u32) -> Self { + self.retry_count = retry_count; + self.retry_limit = retry_limit; + self + } + + #[must_use] + pub fn with_rebase_required(mut self, required: bool) -> Self { + self.rebase_required = required; + self + } + + #[must_use] + pub fn with_stale_cleanup_required(mut self, required: bool) -> Self { + self.stale_cleanup_required = required; + self + } + + #[must_use] + pub fn with_approval_token(mut self, token: ApprovalToken) -> Self { + self.approval_token = Some(token); + self + } } #[derive(Debug, Clone, PartialEq, Eq)] @@ -213,17 +307,119 @@ impl PolicyEngine { pub fn evaluate(&self, context: &LaneContext) -> Vec { evaluate(self, context) } + + #[must_use] + pub fn evaluate_with_events(&self, context: &LaneContext) -> PolicyEvaluation { + evaluate_with_events(self, context) + } } #[must_use] pub fn 
evaluate(engine: &PolicyEngine, context: &LaneContext) -> Vec { + evaluate_with_events(engine, context).actions +} + +#[must_use] +pub fn evaluate_with_events(engine: &PolicyEngine, context: &LaneContext) -> PolicyEvaluation { let mut actions = Vec::new(); + let mut events = Vec::new(); for rule in &engine.rules { if rule.matches(context) { + let before = actions.len(); rule.action.flatten_into(&mut actions); + for action in &actions[before..] { + events.push(decision_event(rule, context, action)); + } } } - actions + PolicyEvaluation { actions, events } +} + +fn decision_event( + rule: &PolicyRule, + context: &LaneContext, + action: &PolicyAction, +) -> PolicyDecisionEvent { + let (kind, explanation) = match action { + PolicyAction::MergeToDev | PolicyAction::MergeForward => ( + PolicyDecisionKind::Merge, + format!( + "rule '{}' allows merge action for lane {}", + rule.name, context.lane_id + ), + ), + PolicyAction::RecoverOnce | PolicyAction::Retry { reason: _ } => ( + PolicyDecisionKind::Retry, + format!( + "rule '{}' allows retry {}/{} for lane {}", + rule.name, context.retry_count, context.retry_limit, context.lane_id + ), + ), + PolicyAction::Rebase { reason } => ( + PolicyDecisionKind::Rebase, + format!("rule '{}' requires rebase: {reason}", rule.name), + ), + PolicyAction::Escalate { reason } => ( + PolicyDecisionKind::Escalate, + format!( + "rule '{}' escalates lane {}: {reason}", + rule.name, context.lane_id + ), + ), + PolicyAction::CleanupStale { reason } => ( + PolicyDecisionKind::StaleCleanup, + format!("rule '{}' requests cleanup: {reason}", rule.name), + ), + PolicyAction::CleanupSession => ( + PolicyDecisionKind::StaleCleanup, + format!("rule '{}' requests session cleanup", rule.name), + ), + PolicyAction::CloseoutLane => ( + PolicyDecisionKind::Closeout, + format!("rule '{}' closes out lane {}", rule.name, context.lane_id), + ), + PolicyAction::Reconcile { reason } => ( + PolicyDecisionKind::Reconcile, + format!( + "rule '{}' reconciles lane {}: 
{reason:?}", + rule.name, context.lane_id + ), + ), + PolicyAction::Notify { channel } => ( + PolicyDecisionKind::Notify, + format!("rule '{}' notifies {channel}", rule.name), + ), + PolicyAction::RequireApprovalToken { operation } => ( + PolicyDecisionKind::ApprovalRequired, + format!( + "rule '{}' requires approval token for {operation}", + rule.name + ), + ), + PolicyAction::Block { reason } => ( + PolicyDecisionKind::Block, + format!( + "rule '{}' blocks lane {}: {reason}", + rule.name, context.lane_id + ), + ), + PolicyAction::Chain(_) => ( + PolicyDecisionKind::Notify, + format!("rule '{}' expanded a chained action", rule.name), + ), + }; + + PolicyDecisionEvent { + lane_id: context.lane_id.clone(), + rule_name: rule.name.clone(), + priority: rule.priority, + kind, + explanation, + approval_token_id: context + .approval_token + .as_ref() + .map(|token| token.token_id.clone()), + } } #[cfg(test)] @@ -231,8 +427,9 @@ mod tests { use std::time::Duration; use super::{ - evaluate, DiffScope, LaneBlocker, LaneContext, PolicyAction, PolicyCondition, PolicyEngine, - PolicyRule, ReconcileReason, ReviewStatus, STALE_BRANCH_THRESHOLD, + evaluate, ApprovalToken, DiffScope, LaneBlocker, LaneContext, PolicyAction, + PolicyCondition, PolicyDecisionKind, PolicyEngine, PolicyRule, ReconcileReason, + ReviewStatus, STALE_BRANCH_THRESHOLD, }; fn default_context() -> LaneContext { @@ -532,6 +729,120 @@ mod tests { ); } + #[test] + fn executable_decision_table_emits_retry_rebase_merge_escalate_cleanup_and_approval_events() { + let engine = PolicyEngine::new(vec![ + PolicyRule::new( + "retry-available", + PolicyCondition::RetryAvailable, + PolicyAction::Retry { + reason: "transient failure".to_string(), + }, + 1, + ), + PolicyRule::new( + "rebase-required", + PolicyCondition::RebaseRequired, + PolicyAction::Rebase { + reason: "base branch moved".to_string(), + }, + 2, + ), + PolicyRule::new( + "stale-cleanup", + PolicyCondition::StaleCleanupRequired, + PolicyAction::CleanupStale { 
+ reason: "lease expired".to_string(), + }, + 3, + ), + PolicyRule::new( + "approval-required", + PolicyCondition::ApprovalTokenMissing, + PolicyAction::RequireApprovalToken { + operation: "merge".to_string(), + }, + 4, + ), + PolicyRule::new( + "merge-approved", + PolicyCondition::And(vec![ + PolicyCondition::ApprovalTokenPresent, + PolicyCondition::GreenAt { level: 2 }, + PolicyCondition::ScopedDiff, + PolicyCondition::ReviewPassed, + ]), + PolicyAction::MergeToDev, + 5, + ), + PolicyRule::new( + "retry-exhausted", + PolicyCondition::TimedOut { + duration: Duration::from_secs(60), + }, + PolicyAction::Escalate { + reason: "lane timed out".to_string(), + }, + 6, + ), + ]); + + let missing_token_context = LaneContext::new( + "lane-cc2", + 2, + Duration::from_secs(90), + LaneBlocker::None, + ReviewStatus::Approved, + DiffScope::Scoped, + false, + ) + .with_green_contract_satisfied(true) + .with_retry_state(0, 1) + .with_rebase_required(true) + .with_stale_cleanup_required(true); + + let missing = engine.evaluate_with_events(&missing_token_context); + assert!(missing.actions.contains(&PolicyAction::Retry { + reason: "transient failure".to_string() + })); + assert!(missing.actions.contains(&PolicyAction::Rebase { + reason: "base branch moved".to_string() + })); + assert!(missing.actions.contains(&PolicyAction::CleanupStale { + reason: "lease expired".to_string() + })); + assert!(missing + .actions + .contains(&PolicyAction::RequireApprovalToken { + operation: "merge".to_string() + })); + assert!(missing.actions.contains(&PolicyAction::Escalate { + reason: "lane timed out".to_string() + })); + assert!(missing + .events + .iter() + .any(|event| event.kind == PolicyDecisionKind::ApprovalRequired + && event.explanation.contains("approval token"))); + + let approved_context = missing_token_context.with_approval_token(ApprovalToken { + token_id: "approval-123".to_string(), + operation: "merge".to_string(), + granted_by: "leader".to_string(), + }); + let approved = 
engine.evaluate_with_events(&approved_context); + assert!(approved.actions.contains(&PolicyAction::MergeToDev)); + let merge_event = approved + .events + .iter() + .find(|event| event.kind == PolicyDecisionKind::Merge) + .expect("merge event should be emitted"); + assert_eq!( + merge_event.approval_token_id.as_deref(), + Some("approval-123") + ); + } + #[test] fn reconciled_lane_emits_reconcile_and_cleanup() { // given — a lane where branch is already merged, no PR needed, session stale diff --git a/rust/crates/runtime/src/session.rs b/rust/crates/runtime/src/session.rs index 6b62444d..6d6ff8fb 100644 --- a/rust/crates/runtime/src/session.rs +++ b/rust/crates/runtime/src/session.rs @@ -8,6 +8,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use crate::json::{JsonError, JsonValue}; use crate::usage::TokenUsage; +use serde::{Deserialize, Serialize}; const SESSION_VERSION: u32 = 1; const ROTATE_AFTER_BYTES: u64 = 256 * 1024; @@ -82,6 +83,25 @@ struct SessionPersistence { path: PathBuf, } +/// Running-state liveness classification for a session heartbeat. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum SessionLiveness { + Healthy, + Stalled, + TransportDead, + Unknown, +} + +/// Heartbeat emitted from canonical session state, independent of terminal rendering. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct SessionHeartbeat { + pub session_id: String, + pub observed_at_ms: u64, + pub transport_alive: bool, + pub liveness: SessionLiveness, +} + /// Persisted conversational state for the runtime and CLI session manager. /// /// `workspace_root` binds the session to the worktree it was created in. 
The @@ -250,6 +270,35 @@ impl Session { self.push_message(ConversationMessage::user_text(text)) } + pub fn record_health_check(&mut self, timestamp_ms: u64) { + self.last_health_check_ms = Some(timestamp_ms); + self.touch(); + } + + #[must_use] + pub fn heartbeat_at( + &self, + now_ms: u64, + stalled_after_ms: u64, + transport_alive: bool, + ) -> SessionHeartbeat { + let liveness = match (transport_alive, self.last_health_check_ms) { + (false, _) => SessionLiveness::TransportDead, + (true, Some(last)) if now_ms.saturating_sub(last) <= stalled_after_ms => { + SessionLiveness::Healthy + } + (true, Some(_)) => SessionLiveness::Stalled, + (true, None) => SessionLiveness::Unknown, + }; + + SessionHeartbeat { + session_id: self.session_id.clone(), + observed_at_ms: now_ms, + transport_alive, + liveness, + } + } + pub fn record_compaction(&mut self, summary: impl Into, removed_message_count: usize) { self.touch(); let count = self.compaction.as_ref().map_or(1, |value| value.count + 1); @@ -1599,4 +1648,26 @@ mod workspace_sessions_dir_tests { fs::remove_dir_all(&tmp_a).ok(); fs::remove_dir_all(&tmp_b).ok(); } + #[test] + fn session_heartbeat_classifies_healthy_stalled_transport_dead_and_unknown() { + let mut session = Session::new(); + assert_eq!( + session.heartbeat_at(1_000, 500, true).liveness, + SessionLiveness::Unknown + ); + + session.record_health_check(800); + assert_eq!( + session.heartbeat_at(1_000, 500, true).liveness, + SessionLiveness::Healthy + ); + assert_eq!( + session.heartbeat_at(2_000, 500, true).liveness, + SessionLiveness::Stalled + ); + assert_eq!( + session.heartbeat_at(1_000, 500, false).liveness, + SessionLiveness::TransportDead + ); + } } diff --git a/rust/crates/runtime/src/task_packet.rs b/rust/crates/runtime/src/task_packet.rs index 405c073e..a97ffefb 100644 --- a/rust/crates/runtime/src/task_packet.rs +++ b/rust/crates/runtime/src/task_packet.rs @@ -38,10 +38,38 @@ pub struct TaskPacket { #[serde(skip_serializing_if = "Option::is_none")] pub 
worktree: Option, pub branch_policy: String, + /// Legacy verification commands kept for compatibility with existing task packets. + #[serde(default)] pub acceptance_tests: Vec, + /// Human-readable acceptance criteria for the task objective. + #[serde(default)] + pub acceptance_criteria: Vec, + /// Files, directories, services, or other resources the task is allowed to touch. + #[serde(default)] + pub resources: Vec, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub model: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub provider: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub permission_profile: Option, pub commit_policy: String, + /// Legacy reporting contract kept for compatibility with existing task packets. pub reporting_contract: String, + #[serde(default)] + pub reporting_targets: Vec, + /// Legacy escalation policy kept for compatibility with existing task packets. pub escalation_policy: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub recovery_policy: Option, + #[serde(default)] + pub verification_plan: Vec, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct TaskResource { + pub kind: String, + pub value: String, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -91,16 +119,25 @@ pub fn validate_packet(packet: TaskPacket) -> Result Result) { } } +fn validate_optional(field: &str, value: Option<&str>, errors: &mut Vec) { + if value.is_some_and(|value| value.trim().is_empty()) { + errors.push(format!("{field} must not be empty when present")); + } +} + #[cfg(test)] mod tests { use super::*; @@ -158,9 +238,20 @@ mod tests { "cargo build --workspace".to_string(), "cargo test --workspace".to_string(), ], + acceptance_criteria: vec!["packet can launch without pane scraping".to_string()], + resources: vec![TaskResource { + kind: "file".to_string(), + value: "rust/crates/runtime/src/task_packet.rs".to_string(), + }], + model: 
Some("gpt-5.5".to_string()), + provider: Some("openai".to_string()), + permission_profile: Some("workspace-write".to_string()), commit_policy: "single verified commit".to_string(), reporting_contract: "print build result, test result, commit sha".to_string(), + reporting_targets: vec!["leader".to_string()], escalation_policy: "stop only on destructive ambiguity".to_string(), + recovery_policy: Some("retry once then escalate".to_string()), + verification_plan: vec!["cargo test -p runtime task_packet".to_string()], } } @@ -183,9 +274,20 @@ mod tests { repo: String::new(), branch_policy: "\t".to_string(), acceptance_tests: vec!["ok".to_string(), " ".to_string()], + acceptance_criteria: vec![" ".to_string()], + resources: vec![TaskResource { + kind: " ".to_string(), + value: "resource".to_string(), + }], + model: Some(" ".to_string()), + provider: Some("openai".to_string()), + permission_profile: Some("workspace-write".to_string()), commit_policy: String::new(), reporting_contract: String::new(), + reporting_targets: Vec::new(), escalation_policy: String::new(), + recovery_policy: None, + verification_plan: vec![" ".to_string()], }; let error = validate_packet(packet).expect_err("packet should be rejected"); @@ -202,6 +304,51 @@ mod tests { .contains(&"acceptance_tests contains an empty value at index 1".to_string())); } + #[test] + fn legacy_packet_json_deserializes_with_defaulted_cc2_fields() { + let legacy = r#"{ + "objective": "Legacy packet", + "scope": "workspace", + "repo": "claw-code", + "branch_policy": "origin/main only", + "acceptance_tests": ["cargo test"], + "commit_policy": "single commit", + "reporting_contract": "report sha", + "escalation_policy": "ask leader" + }"#; + + let packet: TaskPacket = serde_json::from_str(legacy).expect("legacy packet should load"); + + assert_eq!(packet.objective, "Legacy packet"); + assert!(packet.acceptance_criteria.is_empty()); + assert!(packet.resources.is_empty()); + assert_eq!(packet.model, None); + 
validate_packet(packet).expect("legacy packet remains valid through aliases"); + } + + #[test] + fn rich_cc2_packet_fields_roundtrip_and_validate() { + let packet = sample_packet(); + let json = serde_json::to_value(&packet).expect("packet should serialize"); + + assert_eq!( + json["acceptance_criteria"][0], + "packet can launch without pane scraping" + ); + assert_eq!(json["resources"][0]["kind"], "file"); + assert_eq!(json["model"], "gpt-5.5"); + assert_eq!(json["provider"], "openai"); + assert_eq!(json["permission_profile"], "workspace-write"); + assert_eq!(json["recovery_policy"], "retry once then escalate"); + assert_eq!( + json["verification_plan"][0], + "cargo test -p runtime task_packet" + ); + + let roundtrip: TaskPacket = serde_json::from_value(json).expect("rich packet roundtrips"); + validate_packet(roundtrip).expect("rich packet validates"); + } + #[test] fn serialization_roundtrip_preserves_packet() { let packet = sample_packet(); diff --git a/rust/crates/runtime/src/task_registry.rs b/rust/crates/runtime/src/task_registry.rs index 7e6f65c6..f88895d3 100644 --- a/rust/crates/runtime/src/task_registry.rs +++ b/rust/crates/runtime/src/task_registry.rs @@ -14,6 +14,7 @@ use crate::{validate_packet, TaskPacket, TaskPacketValidationError}; pub enum TaskStatus { Created, Running, + Blocked, Completed, Failed, Stopped, @@ -24,6 +25,7 @@ impl std::fmt::Display for TaskStatus { match self { Self::Created => write!(f, "created"), Self::Running => write!(f, "running"), + Self::Blocked => write!(f, "blocked"), Self::Completed => write!(f, "completed"), Self::Failed => write!(f, "failed"), Self::Stopped => write!(f, "stopped"), @@ -43,6 +45,54 @@ pub struct Task { pub messages: Vec, pub output: String, pub team_id: Option, + pub heartbeat: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum LaneFreshness { + Healthy, + Stalled, + TransportDead, + Unknown, +} + +#[derive(Debug, Clone, 
PartialEq, Eq, Serialize, Deserialize)] +pub struct LaneHeartbeat { + pub observed_at: u64, + pub transport_alive: bool, + pub status: String, +} + +impl LaneHeartbeat { + #[must_use] + pub fn freshness_at(&self, now: u64, stalled_after_secs: u64) -> LaneFreshness { + if !self.transport_alive { + return LaneFreshness::TransportDead; + } + if now.saturating_sub(self.observed_at) > stalled_after_secs { + return LaneFreshness::Stalled; + } + LaneFreshness::Healthy + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct LaneBoardEntry { + pub task_id: String, + pub prompt: String, + pub status: TaskStatus, + pub team_id: Option, + pub heartbeat: Option, + pub freshness: LaneFreshness, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct LaneBoard { + pub generated_at: u64, + pub active: Vec, + pub blocked: Vec, + pub finished: Vec, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -114,6 +164,7 @@ impl TaskRegistry { messages: Vec::new(), output: String::new(), team_id: None, + heartbeat: None, }; inner.tasks.insert(task_id, task.clone()); task @@ -134,6 +185,67 @@ impl TaskRegistry { .collect() } + pub fn update_heartbeat(&self, task_id: &str, heartbeat: LaneHeartbeat) -> Result<(), String> { + let mut inner = self.inner.lock().expect("registry lock poisoned"); + let task = inner + .tasks + .get_mut(task_id) + .ok_or_else(|| format!("task not found: {task_id}"))?; + task.heartbeat = Some(heartbeat); + task.updated_at = now_secs(); + Ok(()) + } + + #[must_use] + pub fn lane_board(&self, stalled_after_secs: u64) -> LaneBoard { + let now = now_secs(); + self.lane_board_at(now, stalled_after_secs) + } + + #[must_use] + pub fn lane_board_at(&self, now: u64, stalled_after_secs: u64) -> LaneBoard { + let inner = self.inner.lock().expect("registry lock poisoned"); + let mut board = LaneBoard { + generated_at: now, + active: Vec::new(), + blocked: Vec::new(), + finished: Vec::new(), + }; + + for task in 
inner.tasks.values() { + let freshness = task + .heartbeat + .as_ref() + .map_or(LaneFreshness::Unknown, |heartbeat| { + heartbeat.freshness_at(now, stalled_after_secs) + }); + let entry = LaneBoardEntry { + task_id: task.task_id.clone(), + prompt: task.prompt.clone(), + status: task.status, + team_id: task.team_id.clone(), + heartbeat: task.heartbeat.clone(), + freshness, + }; + + match task.status { + TaskStatus::Running | TaskStatus::Created => board.active.push(entry), + TaskStatus::Blocked => board.blocked.push(entry), + TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Stopped => { + board.finished.push(entry); + } + } + } + + board + } + + #[must_use] + pub fn lane_status_json_at(&self, now: u64, stalled_after_secs: u64) -> serde_json::Value { + serde_json::to_value(self.lane_board_at(now, stalled_after_secs)) + .expect("lane board should serialize") + } + pub fn stop(&self, task_id: &str) -> Result { let mut inner = self.inner.lock().expect("registry lock poisoned"); let task = inner @@ -260,9 +372,20 @@ mod tests { repo: "claw-code-parity".to_string(), branch_policy: "origin/main only".to_string(), acceptance_tests: vec!["cargo test --workspace".to_string()], + acceptance_criteria: vec!["task is inspectable".to_string()], + resources: vec![crate::TaskResource { + kind: "module".to_string(), + value: "runtime/task system".to_string(), + }], + model: Some("gpt-5.5".to_string()), + provider: Some("openai".to_string()), + permission_profile: Some("workspace-write".to_string()), commit_policy: "single commit".to_string(), reporting_contract: "print commit sha".to_string(), + reporting_targets: vec!["leader".to_string()], escalation_policy: "manual escalation".to_string(), + recovery_policy: Some("retry once".to_string()), + verification_plan: vec!["cargo test --workspace".to_string()], }; let task = registry @@ -340,6 +463,68 @@ mod tests { assert_eq!(output, "line 1\nline 2\n"); } + #[test] + fn 
lane_board_groups_active_blocked_finished_and_reports_freshness() { + let registry = TaskRegistry::new(); + let active = registry.create("active", None); + let blocked = registry.create("blocked", None); + let finished = registry.create("finished", None); + + registry + .set_status(&active.task_id, TaskStatus::Running) + .expect("running status"); + registry + .set_status(&blocked.task_id, TaskStatus::Blocked) + .expect("blocked status"); + registry + .set_status(&finished.task_id, TaskStatus::Completed) + .expect("completed status"); + registry + .update_heartbeat( + &active.task_id, + LaneHeartbeat { + observed_at: 100, + transport_alive: true, + status: "running".to_string(), + }, + ) + .expect("heartbeat"); + registry + .update_heartbeat( + &blocked.task_id, + LaneHeartbeat { + observed_at: 10, + transport_alive: true, + status: "waiting".to_string(), + }, + ) + .expect("heartbeat"); + registry + .update_heartbeat( + &finished.task_id, + LaneHeartbeat { + observed_at: 100, + transport_alive: false, + status: "done".to_string(), + }, + ) + .expect("heartbeat"); + + let board = registry.lane_board_at(110, 30); + + assert_eq!(board.active.len(), 1); + assert_eq!(board.active[0].freshness, LaneFreshness::Healthy); + assert_eq!(board.blocked.len(), 1); + assert_eq!(board.blocked[0].freshness, LaneFreshness::Stalled); + assert_eq!(board.finished.len(), 1); + assert_eq!(board.finished[0].freshness, LaneFreshness::TransportDead); + + let json = registry.lane_status_json_at(110, 30); + assert_eq!(json["active"][0]["status"], "running"); + assert_eq!(json["blocked"][0]["freshness"], "stalled"); + assert_eq!(json["finished"][0]["freshness"], "transport_dead"); + } + #[test] fn assigns_team_and_removes_task() { let registry = TaskRegistry::new(); @@ -375,6 +560,7 @@ mod tests { let cases = [ (TaskStatus::Created, "created"), (TaskStatus::Running, "running"), + (TaskStatus::Blocked, "blocked"), (TaskStatus::Completed, "completed"), (TaskStatus::Failed, "failed"), 
(TaskStatus::Stopped, "stopped"), @@ -392,6 +578,7 @@ mod tests { vec![ ("created".to_string(), "created"), ("running".to_string(), "running"), + ("blocked".to_string(), "blocked"), ("completed".to_string(), "completed"), ("failed".to_string(), "failed"), ("stopped".to_string(), "stopped"), @@ -478,6 +665,7 @@ mod tests { assert!(task.messages.is_empty()); assert!(task.output.is_empty()); assert_eq!(task.team_id, None); + assert_eq!(task.heartbeat, None); } #[test] diff --git a/rust/crates/rusty-claude-cli/src/main.rs b/rust/crates/rusty-claude-cli/src/main.rs index 188990bb..6e7b47f0 100644 --- a/rust/crates/rusty-claude-cli/src/main.rs +++ b/rust/crates/rusty-claude-cli/src/main.rs @@ -6177,6 +6177,13 @@ fn status_json_value( "cumulative_total": usage.cumulative.total_tokens(), "estimated_tokens": usage.estimated_tokens, }, + "lane_board": { + "schema": "task_registry_v1", + "status_json_supported": true, + "heartbeat_freshness_supported": true, + "states": ["active", "blocked", "finished"], + "freshness_states": ["healthy", "stalled", "transport_dead", "unknown"], + }, "workspace": { "cwd": context.cwd, "project_root": context.project_root, @@ -11253,6 +11260,18 @@ mod tests { json.get("workspace").is_some(), "workspace field still reported" ); + assert_eq!( + json.pointer("/lane_board/status_json_supported") + .and_then(|v| v.as_bool()), + Some(true), + "status JSON should advertise lane board support: {json}" + ); + assert_eq!( + json.pointer("/lane_board/freshness_states/2") + .and_then(|v| v.as_str()), + Some("transport_dead"), + "status JSON should advertise transport-dead freshness: {json}" + ); assert!( json.get("sandbox").is_some(), "sandbox field still reported" diff --git a/rust/crates/tools/src/lane_completion.rs b/rust/crates/tools/src/lane_completion.rs index 5ca68fe2..947d58d9 100644 --- a/rust/crates/tools/src/lane_completion.rs +++ b/rust/crates/tools/src/lane_completion.rs @@ -63,6 +63,11 @@ pub(crate) fn detect_lane_completion( diff_scope: 
runtime::DiffScope::Scoped, completed: true, reconciled: false, + retry_count: 0, + retry_limit: 1, + rebase_required: false, + stale_cleanup_required: false, + approval_token: None, }) } @@ -173,6 +178,11 @@ mod tests { diff_scope: DiffScope::Scoped, completed: true, reconciled: false, + retry_count: 0, + retry_limit: 1, + rebase_required: false, + stale_cleanup_required: false, + approval_token: None, }; let actions = evaluate_completed_lane(&context); diff --git a/rust/crates/tools/src/lib.rs b/rust/crates/tools/src/lib.rs index 6a1fdc56..e7016f71 100644 --- a/rust/crates/tools/src/lib.rs +++ b/rust/crates/tools/src/lib.rs @@ -10148,9 +10148,20 @@ printf 'pwsh:%s' "$1" "cargo build --workspace".to_string(), "cargo test --workspace".to_string(), ], + acceptance_criteria: vec!["task packet is accepted".to_string()], + resources: vec![runtime::TaskResource { + kind: "module".to_string(), + value: "runtime/task system".to_string(), + }], + model: Some("gpt-5.5".to_string()), + provider: Some("openai".to_string()), + permission_profile: Some("workspace-write".to_string()), commit_policy: "single commit".to_string(), reporting_contract: "print build/test result and sha".to_string(), + reporting_targets: vec!["leader".to_string()], escalation_policy: "manual escalation".to_string(), + recovery_policy: Some("retry once".to_string()), + verification_plan: vec!["cargo test --workspace".to_string()], }) .expect("task packet should create a task");