Make G006 task policy state machine executable

Typed task packets, policy decisions, lane board status, and session liveness now have concrete runtime contracts and focused regressions for Stream 4.

Constraint: G006 requires task/lane operation without pane scraping while preserving legacy task packet callers.
Rejected: waiting on stale worker worktrees | all G006 worker worktrees remained at main with no commits, so the leader integrated the verified slice directly.
Confidence: high
Scope-risk: moderate
Directive: Keep task packet serde defaults when adding fields so older packets continue to deserialize.
Tested: git diff --check; cargo fmt --manifest-path rust/Cargo.toml --all -- --check; cargo check --manifest-path rust/Cargo.toml -p runtime -p tools -p rusty-claude-cli; cargo test --manifest-path rust/Cargo.toml -p runtime task_packet -- --nocapture; cargo test --manifest-path rust/Cargo.toml -p runtime policy_engine -- --nocapture; cargo test --manifest-path rust/Cargo.toml -p runtime task_registry -- --nocapture; cargo test --manifest-path rust/Cargo.toml -p runtime session_heartbeat -- --nocapture; cargo test --manifest-path rust/Cargo.toml -p tools run_task_packet_creates_packet_backed_task -- --nocapture; cargo test --manifest-path rust/Cargo.toml -p tools lane_completion -- --nocapture; cargo test --manifest-path rust/Cargo.toml -p rusty-claude-cli status_json_surfaces -- --nocapture
Not-tested: full workspace test suite; PR/issue reconciliation deferred to G011/G012

Co-authored-by: OmX <omx@oh-my-codex.dev>
bellman
2026-05-15 09:29:26 +09:00
parent 41b769fc5a
commit f7235ca932
9 changed files with 808 additions and 13 deletions

@@ -0,0 +1,34 @@
# G006 Task Policy Board Verification Map
Goal: `G006-task-policy-board` — Stream 4 task packets, executable policy engine, lane board/status JSON, and running-state liveness heartbeat.
## Prompt-to-artifact checklist
| Requirement | Artifact/evidence |
| --- | --- |
| Typed task packet schema with objective, scope, files/resources, acceptance criteria, model/provider, permission profile, recovery policy, verification plan, reporting targets | `rust/crates/runtime/src/task_packet.rs` extends `TaskPacket` with `acceptance_criteria`, `resources`, `model`, `provider`, `permission_profile`, `recovery_policy`, `verification_plan`, and `reporting_targets`; tests cover legacy defaulted JSON and rich CC2 roundtrip. |
| Backwards compatibility for existing task packets and tool callers | `serde(default)`/optional fields in `task_packet.rs`; `rust/crates/tools/src/lib.rs` `run_task_packet_creates_packet_backed_task` updated for rich schema; legacy packet test keeps old JSON accepted. |
| Executable policy decisions for retry/rebase/merge/escalate/stale cleanup/approval token | `rust/crates/runtime/src/policy_engine.rs` adds `RetryAvailable`, `RebaseRequired`, `StaleCleanupRequired`, approval-token conditions/actions, `PolicyEvaluation`, `PolicyDecisionEvent`, and decision-table tests. |
| Policy decisions are explainable and logged/emittable as typed events | `PolicyDecisionEvent` is a serializable typed event carrying `lane_id`, `rule_name`, `priority`, `kind`, `explanation`, and `approval_token_id`; `evaluate_with_events` emits one event per flattened action. |
| Active lane board/dashboard/status JSON over canonical state | `rust/crates/runtime/src/task_registry.rs` adds `LaneBoard`, `LaneBoardEntry`, `LaneFreshness`, `lane_board_at`, and `lane_status_json_at`; CLI status JSON advertises lane board contract in `rust/crates/rusty-claude-cli/src/main.rs`. |
| Heartbeats independent of terminal rendering with healthy/stalled/transport-dead cases | `rust/crates/runtime/src/session.rs` adds `SessionHeartbeat`/`SessionLiveness` from persisted session health state; `task_registry.rs` heartbeat freshness is computed from canonical heartbeat timestamps and transport state. |
| Task/lane status JSON shows active/blocked/finished lanes with heartbeat freshness | `task_registry::tests::lane_board_groups_active_blocked_finished_and_reports_freshness`; `status_json_surfaces_session_lifecycle_for_clawhip`; status JSON surfaces lane board metadata. |
| Leader-owned ultragoal audit remains separate from workers | No worker changed `.omx/ultragoal`; leader will checkpoint with fresh `get_goal` only after terminal verification. |
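
The heartbeat rows in the checklist above reduce to a small classification over a stored timestamp and a transport flag. The following is a minimal std-only sketch of that idea, not the code in `task_registry.rs`: the names `Freshness`, `Heartbeat`, and `classify` are hypothetical stand-ins chosen for illustration, and the ordering of the checks (missing heartbeat, then dead transport, then age) is an assumption modelled on the behaviour the tests describe.

```rust
/// Freshness buckets mirroring the four cases named in the checklist.
/// `Freshness` is a hypothetical stand-in, not the runtime's own type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Freshness {
    Healthy,
    Stalled,
    TransportDead,
    Unknown,
}

/// Hypothetical persisted heartbeat record (timestamps in seconds).
#[derive(Debug, Clone, Copy)]
pub struct Heartbeat {
    pub observed_at: u64,
    pub transport_alive: bool,
}

/// Classify a lane from stored state only; nothing here reads terminal output.
pub fn classify(heartbeat: Option<Heartbeat>, now: u64, stalled_after_secs: u64) -> Freshness {
    match heartbeat {
        // No heartbeat has ever been recorded for this lane.
        None => Freshness::Unknown,
        // A dead transport wins over any timestamp, however recent.
        Some(hb) if !hb.transport_alive => Freshness::TransportDead,
        // saturating_sub keeps a heartbeat stamped in the future from underflowing.
        Some(hb) if now.saturating_sub(hb.observed_at) > stalled_after_secs => Freshness::Stalled,
        Some(_) => Freshness::Healthy,
    }
}
```

Taking `now` as a parameter, as `lane_board_at` does, keeps the classification deterministic under test; a wrapper that reads the clock can delegate to it.
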
## Verification run
- `git diff --check` — PASS
- `cargo fmt --manifest-path rust/Cargo.toml --all -- --check` — PASS
- `cargo check --manifest-path rust/Cargo.toml -p runtime -p tools -p rusty-claude-cli` — PASS
- `cargo test --manifest-path rust/Cargo.toml -p runtime task_packet -- --nocapture` — PASS (5 task packet tests)
- `cargo test --manifest-path rust/Cargo.toml -p runtime policy_engine -- --nocapture` — PASS (12 unit + 1 integration match)
- `cargo test --manifest-path rust/Cargo.toml -p runtime task_registry -- --nocapture` — PASS (17 task registry tests)
- `cargo test --manifest-path rust/Cargo.toml -p runtime session_heartbeat -- --nocapture` — PASS (1 heartbeat test)
- `cargo test --manifest-path rust/Cargo.toml -p tools run_task_packet_creates_packet_backed_task -- --nocapture` — PASS
- `cargo test --manifest-path rust/Cargo.toml -p tools lane_completion -- --nocapture` — PASS (6 tests)
- `cargo test --manifest-path rust/Cargo.toml -p rusty-claude-cli status_json_surfaces -- --nocapture` — PASS
## Remaining gates
- G006 can be checkpointed once the team lifecycle is reconciled to a terminal state and this commit is pushed.
- Open PR/issue reconciliation remains explicitly deferred to G011/G012 via `docs/pr-issue-resolution-gate.md`.
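
For readers checking the policy-decision rows of the checklist, the "one event per flattened action" contract can be sketched in isolation. This is an illustrative std-only model under stated assumptions, not the `policy_engine.rs` implementation: `Lane`, `Rule`, `Action`, `DecisionEvent`, and `flatten_into` here are simplified hypothetical types, and the explanation format is invented for the example.

```rust
/// Simplified, hypothetical action set; `Chain` nests other actions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Action {
    Retry,
    Escalate,
    Chain(Vec<Action>),
}

/// Hypothetical typed event recording why an action was selected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DecisionEvent {
    pub rule_name: String,
    pub explanation: String,
}

/// Hypothetical lane state consulted by rule predicates.
pub struct Lane {
    pub retry_count: u32,
    pub retry_limit: u32,
}

pub struct Rule {
    pub name: &'static str,
    pub applies: fn(&Lane) -> bool,
    pub action: Action,
}

/// Expand nested chains so callers only ever see leaf actions.
fn flatten_into(action: &Action, out: &mut Vec<Action>) {
    match action {
        Action::Chain(inner) => {
            for nested in inner {
                flatten_into(nested, out);
            }
        }
        leaf => out.push(leaf.clone()),
    }
}

/// Evaluate rules in order, emitting one event per flattened action.
pub fn evaluate_with_events(rules: &[Rule], lane: &Lane) -> (Vec<Action>, Vec<DecisionEvent>) {
    let mut actions = Vec::new();
    let mut events = Vec::new();
    for rule in rules {
        if (rule.applies)(lane) {
            // Remember where this rule's actions start so events pair up 1:1.
            let before = actions.len();
            flatten_into(&rule.action, &mut actions);
            for action in &actions[before..] {
                events.push(DecisionEvent {
                    rule_name: rule.name.to_string(),
                    explanation: format!("rule '{}' selected {:?}", rule.name, action),
                });
            }
        }
    }
    (actions, events)
}
```

Because events are generated from the flattened slice rather than from the rule's top-level action, a chained rule yields one event per leaf action and never an event for the chain wrapper itself.
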

@@ -135,8 +135,9 @@ pub use plugin_lifecycle::{
PluginState, ResourceInfo, ServerHealth, ServerStatus, ToolInfo,
};
pub use policy_engine::{
evaluate, DiffScope, GreenLevel, LaneBlocker, LaneContext, PolicyAction, PolicyCondition,
PolicyEngine, PolicyRule, ReconcileReason, ReviewStatus,
evaluate, evaluate_with_events, ApprovalToken, DiffScope, GreenLevel, LaneBlocker, LaneContext,
PolicyAction, PolicyCondition, PolicyDecisionEvent, PolicyDecisionKind, PolicyEngine,
PolicyEvaluation, PolicyRule, ReconcileReason, ReviewStatus,
};
pub use prompt::{
load_system_prompt, prepend_bullets, ContextFile, ModelFamilyIdentity, ProjectContext,
@@ -167,7 +168,7 @@ pub use sandbox::{
};
pub use session::{
ContentBlock, ConversationMessage, MessageRole, Session, SessionCompaction, SessionError,
SessionFork, SessionPromptEntry,
SessionFork, SessionHeartbeat, SessionLiveness, SessionPromptEntry,
};
pub use sse::{IncrementalSseParser, SseEvent};
pub use stale_base::{
@@ -178,7 +179,10 @@ pub use stale_branch::{
apply_policy, check_freshness, BranchFreshness, StaleBranchAction, StaleBranchEvent,
StaleBranchPolicy,
};
pub use task_packet::{validate_packet, TaskPacket, TaskPacketValidationError, ValidatedPacket};
pub use task_packet::{
validate_packet, TaskPacket, TaskPacketValidationError, TaskResource, ValidatedPacket,
};
pub use task_registry::{LaneBoard, LaneBoardEntry, LaneFreshness, LaneHeartbeat};
#[cfg(test)]
pub use trust_resolver::{TrustConfig, TrustDecision, TrustEvent, TrustPolicy, TrustResolver};
pub use usage::{

@@ -1,5 +1,7 @@
use std::time::Duration;
use serde::{Deserialize, Serialize};
pub type GreenLevel = u8;
const STALE_BRANCH_THRESHOLD: Duration = Duration::from_hours(1);
@@ -46,6 +48,11 @@ pub enum PolicyCondition {
ReviewPassed,
ScopedDiff,
TimedOut { duration: Duration },
RetryAvailable,
RebaseRequired,
StaleCleanupRequired,
ApprovalTokenPresent,
ApprovalTokenMissing,
}
impl PolicyCondition {
@@ -68,6 +75,11 @@ impl PolicyCondition {
Self::ReviewPassed => context.review_status == ReviewStatus::Approved,
Self::ScopedDiff => context.diff_scope == DiffScope::Scoped,
Self::TimedOut { duration } => context.branch_freshness >= *duration,
Self::RetryAvailable => context.retry_count < context.retry_limit,
Self::RebaseRequired => context.rebase_required,
Self::StaleCleanupRequired => context.stale_cleanup_required,
Self::ApprovalTokenPresent => context.approval_token.is_some(),
Self::ApprovalTokenMissing => context.approval_token.is_none(),
}
}
}
@@ -77,11 +89,15 @@ pub enum PolicyAction {
MergeToDev,
MergeForward,
RecoverOnce,
Retry { reason: String },
Rebase { reason: String },
Escalate { reason: String },
CloseoutLane,
CleanupSession,
CleanupStale { reason: String },
Reconcile { reason: ReconcileReason },
Notify { channel: String },
RequireApprovalToken { operation: String },
Block { reason: String },
Chain(Vec<PolicyAction>),
}
@@ -132,6 +148,44 @@ pub enum DiffScope {
Scoped,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ApprovalToken {
pub token_id: String,
pub operation: String,
pub granted_by: String,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PolicyDecisionKind {
Retry,
Rebase,
Merge,
Escalate,
StaleCleanup,
ApprovalRequired,
Notify,
Block,
Closeout,
Reconcile,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PolicyDecisionEvent {
pub lane_id: String,
pub rule_name: String,
pub priority: u32,
pub kind: PolicyDecisionKind,
pub explanation: String,
pub approval_token_id: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PolicyEvaluation {
pub actions: Vec<PolicyAction>,
pub events: Vec<PolicyDecisionEvent>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LaneContext {
pub lane_id: String,
@@ -143,6 +197,11 @@ pub struct LaneContext {
pub diff_scope: DiffScope,
pub completed: bool,
pub reconciled: bool,
pub retry_count: u32,
pub retry_limit: u32,
pub rebase_required: bool,
pub stale_cleanup_required: bool,
pub approval_token: Option<ApprovalToken>,
}
impl LaneContext {
@@ -166,6 +225,11 @@ impl LaneContext {
diff_scope,
completed,
reconciled: false,
retry_count: 0,
retry_limit: 1,
rebase_required: false,
stale_cleanup_required: false,
approval_token: None,
}
}
@@ -182,6 +246,11 @@ impl LaneContext {
diff_scope: DiffScope::Full,
completed: true,
reconciled: true,
retry_count: 0,
retry_limit: 1,
rebase_required: false,
stale_cleanup_required: false,
approval_token: None,
}
}
@@ -190,6 +259,31 @@ impl LaneContext {
self.green_contract_satisfied = satisfied;
self
}
#[must_use]
pub fn with_retry_state(mut self, retry_count: u32, retry_limit: u32) -> Self {
self.retry_count = retry_count;
self.retry_limit = retry_limit;
self
}
#[must_use]
pub fn with_rebase_required(mut self, required: bool) -> Self {
self.rebase_required = required;
self
}
#[must_use]
pub fn with_stale_cleanup_required(mut self, required: bool) -> Self {
self.stale_cleanup_required = required;
self
}
#[must_use]
pub fn with_approval_token(mut self, token: ApprovalToken) -> Self {
self.approval_token = Some(token);
self
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -213,17 +307,119 @@ impl PolicyEngine {
pub fn evaluate(&self, context: &LaneContext) -> Vec<PolicyAction> {
evaluate(self, context)
}
#[must_use]
pub fn evaluate_with_events(&self, context: &LaneContext) -> PolicyEvaluation {
evaluate_with_events(self, context)
}
}
#[must_use]
pub fn evaluate(engine: &PolicyEngine, context: &LaneContext) -> Vec<PolicyAction> {
evaluate_with_events(engine, context).actions
}
#[must_use]
pub fn evaluate_with_events(engine: &PolicyEngine, context: &LaneContext) -> PolicyEvaluation {
let mut actions = Vec::new();
let mut events = Vec::new();
for rule in &engine.rules {
if rule.matches(context) {
let before = actions.len();
rule.action.flatten_into(&mut actions);
for action in &actions[before..] {
events.push(decision_event(rule, context, action));
}
}
}
actions
PolicyEvaluation { actions, events }
}
fn decision_event(
rule: &PolicyRule,
context: &LaneContext,
action: &PolicyAction,
) -> PolicyDecisionEvent {
let (kind, explanation) = match action {
PolicyAction::MergeToDev | PolicyAction::MergeForward => (
PolicyDecisionKind::Merge,
format!(
"rule '{}' allows merge action for lane {}",
rule.name, context.lane_id
),
),
PolicyAction::RecoverOnce | PolicyAction::Retry { reason: _ } => (
PolicyDecisionKind::Retry,
format!(
"rule '{}' allows retry {}/{} for lane {}",
rule.name, context.retry_count, context.retry_limit, context.lane_id
),
),
PolicyAction::Rebase { reason } => (
PolicyDecisionKind::Rebase,
format!("rule '{}' requires rebase: {reason}", rule.name),
),
PolicyAction::Escalate { reason } => (
PolicyDecisionKind::Escalate,
format!(
"rule '{}' escalates lane {}: {reason}",
rule.name, context.lane_id
),
),
PolicyAction::CleanupStale { reason } => (
PolicyDecisionKind::StaleCleanup,
format!("rule '{}' requests cleanup: {reason}", rule.name),
),
PolicyAction::CleanupSession => (
PolicyDecisionKind::StaleCleanup,
format!("rule '{}' requests session cleanup", rule.name),
),
PolicyAction::CloseoutLane => (
PolicyDecisionKind::Closeout,
format!("rule '{}' closes out lane {}", rule.name, context.lane_id),
),
PolicyAction::Reconcile { reason } => (
PolicyDecisionKind::Reconcile,
format!(
"rule '{}' reconciles lane {}: {reason:?}",
rule.name, context.lane_id
),
),
PolicyAction::Notify { channel } => (
PolicyDecisionKind::Notify,
format!("rule '{}' notifies {channel}", rule.name),
),
PolicyAction::RequireApprovalToken { operation } => (
PolicyDecisionKind::ApprovalRequired,
format!(
"rule '{}' requires approval token for {operation}",
rule.name
),
),
PolicyAction::Block { reason } => (
PolicyDecisionKind::Block,
format!(
"rule '{}' blocks lane {}: {reason}",
rule.name, context.lane_id
),
),
PolicyAction::Chain(_) => (
PolicyDecisionKind::Notify,
format!("rule '{}' expanded a chained action", rule.name),
),
};
PolicyDecisionEvent {
lane_id: context.lane_id.clone(),
rule_name: rule.name.clone(),
priority: rule.priority,
kind,
explanation,
approval_token_id: context
.approval_token
.as_ref()
.map(|token| token.token_id.clone()),
}
}
#[cfg(test)]
@@ -231,8 +427,9 @@ mod tests {
use std::time::Duration;
use super::{
evaluate, DiffScope, LaneBlocker, LaneContext, PolicyAction, PolicyCondition, PolicyEngine,
PolicyRule, ReconcileReason, ReviewStatus, STALE_BRANCH_THRESHOLD,
evaluate, ApprovalToken, DiffScope, LaneBlocker, LaneContext, PolicyAction,
PolicyCondition, PolicyDecisionKind, PolicyEngine, PolicyRule, ReconcileReason,
ReviewStatus, STALE_BRANCH_THRESHOLD,
};
fn default_context() -> LaneContext {
@@ -532,6 +729,120 @@ mod tests {
);
}
#[test]
fn executable_decision_table_emits_retry_rebase_merge_escalate_cleanup_and_approval_events() {
let engine = PolicyEngine::new(vec![
PolicyRule::new(
"retry-available",
PolicyCondition::RetryAvailable,
PolicyAction::Retry {
reason: "transient failure".to_string(),
},
1,
),
PolicyRule::new(
"rebase-required",
PolicyCondition::RebaseRequired,
PolicyAction::Rebase {
reason: "base branch moved".to_string(),
},
2,
),
PolicyRule::new(
"stale-cleanup",
PolicyCondition::StaleCleanupRequired,
PolicyAction::CleanupStale {
reason: "lease expired".to_string(),
},
3,
),
PolicyRule::new(
"approval-required",
PolicyCondition::ApprovalTokenMissing,
PolicyAction::RequireApprovalToken {
operation: "merge".to_string(),
},
4,
),
PolicyRule::new(
"merge-approved",
PolicyCondition::And(vec![
PolicyCondition::ApprovalTokenPresent,
PolicyCondition::GreenAt { level: 2 },
PolicyCondition::ScopedDiff,
PolicyCondition::ReviewPassed,
]),
PolicyAction::MergeToDev,
5,
),
PolicyRule::new(
"retry-exhausted",
PolicyCondition::TimedOut {
duration: Duration::from_secs(60),
},
PolicyAction::Escalate {
reason: "lane timed out".to_string(),
},
6,
),
]);
let missing_token_context = LaneContext::new(
"lane-cc2",
2,
Duration::from_secs(90),
LaneBlocker::None,
ReviewStatus::Approved,
DiffScope::Scoped,
false,
)
.with_green_contract_satisfied(true)
.with_retry_state(0, 1)
.with_rebase_required(true)
.with_stale_cleanup_required(true);
let missing = engine.evaluate_with_events(&missing_token_context);
assert!(missing.actions.contains(&PolicyAction::Retry {
reason: "transient failure".to_string()
}));
assert!(missing.actions.contains(&PolicyAction::Rebase {
reason: "base branch moved".to_string()
}));
assert!(missing.actions.contains(&PolicyAction::CleanupStale {
reason: "lease expired".to_string()
}));
assert!(missing
.actions
.contains(&PolicyAction::RequireApprovalToken {
operation: "merge".to_string()
}));
assert!(missing.actions.contains(&PolicyAction::Escalate {
reason: "lane timed out".to_string()
}));
assert!(missing
.events
.iter()
.any(|event| event.kind == PolicyDecisionKind::ApprovalRequired
&& event.explanation.contains("approval token")));
let approved_context = missing_token_context.with_approval_token(ApprovalToken {
token_id: "approval-123".to_string(),
operation: "merge".to_string(),
granted_by: "leader".to_string(),
});
let approved = engine.evaluate_with_events(&approved_context);
assert!(approved.actions.contains(&PolicyAction::MergeToDev));
let merge_event = approved
.events
.iter()
.find(|event| event.kind == PolicyDecisionKind::Merge)
.expect("merge event should be emitted");
assert_eq!(
merge_event.approval_token_id.as_deref(),
Some("approval-123")
);
}
#[test]
fn reconciled_lane_emits_reconcile_and_cleanup() {
// given — a lane where branch is already merged, no PR needed, session stale

@@ -8,6 +8,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
use crate::json::{JsonError, JsonValue};
use crate::usage::TokenUsage;
use serde::{Deserialize, Serialize};
const SESSION_VERSION: u32 = 1;
const ROTATE_AFTER_BYTES: u64 = 256 * 1024;
@@ -82,6 +83,25 @@ struct SessionPersistence {
path: PathBuf,
}
/// Running-state liveness classification for a session heartbeat.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SessionLiveness {
Healthy,
Stalled,
TransportDead,
Unknown,
}
/// Heartbeat emitted from canonical session state, independent of terminal rendering.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SessionHeartbeat {
pub session_id: String,
pub observed_at_ms: u64,
pub transport_alive: bool,
pub liveness: SessionLiveness,
}
/// Persisted conversational state for the runtime and CLI session manager.
///
/// `workspace_root` binds the session to the worktree it was created in. The
@@ -250,6 +270,35 @@ impl Session {
self.push_message(ConversationMessage::user_text(text))
}
pub fn record_health_check(&mut self, timestamp_ms: u64) {
self.last_health_check_ms = Some(timestamp_ms);
self.touch();
}
#[must_use]
pub fn heartbeat_at(
&self,
now_ms: u64,
stalled_after_ms: u64,
transport_alive: bool,
) -> SessionHeartbeat {
let liveness = match (transport_alive, self.last_health_check_ms) {
(false, _) => SessionLiveness::TransportDead,
(true, Some(last)) if now_ms.saturating_sub(last) <= stalled_after_ms => {
SessionLiveness::Healthy
}
(true, Some(_)) => SessionLiveness::Stalled,
(true, None) => SessionLiveness::Unknown,
};
SessionHeartbeat {
session_id: self.session_id.clone(),
observed_at_ms: now_ms,
transport_alive,
liveness,
}
}
pub fn record_compaction(&mut self, summary: impl Into<String>, removed_message_count: usize) {
self.touch();
let count = self.compaction.as_ref().map_or(1, |value| value.count + 1);
@@ -1599,4 +1648,26 @@ mod workspace_sessions_dir_tests {
fs::remove_dir_all(&tmp_a).ok();
fs::remove_dir_all(&tmp_b).ok();
}
#[test]
fn session_heartbeat_classifies_healthy_stalled_transport_dead_and_unknown() {
let mut session = Session::new();
assert_eq!(
session.heartbeat_at(1_000, 500, true).liveness,
SessionLiveness::Unknown
);
session.record_health_check(800);
assert_eq!(
session.heartbeat_at(1_000, 500, true).liveness,
SessionLiveness::Healthy
);
assert_eq!(
session.heartbeat_at(2_000, 500, true).liveness,
SessionLiveness::Stalled
);
assert_eq!(
session.heartbeat_at(1_000, 500, false).liveness,
SessionLiveness::TransportDead
);
}
}

@@ -38,10 +38,38 @@ pub struct TaskPacket {
#[serde(skip_serializing_if = "Option::is_none")]
pub worktree: Option<String>,
pub branch_policy: String,
/// Legacy verification commands kept for compatibility with existing task packets.
#[serde(default)]
pub acceptance_tests: Vec<String>,
/// Human-readable acceptance criteria for the task objective.
#[serde(default)]
pub acceptance_criteria: Vec<String>,
/// Files, directories, services, or other resources the task is allowed to touch.
#[serde(default)]
pub resources: Vec<TaskResource>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub model: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub provider: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub permission_profile: Option<String>,
pub commit_policy: String,
/// Legacy reporting contract kept for compatibility with existing task packets.
pub reporting_contract: String,
#[serde(default)]
pub reporting_targets: Vec<String>,
/// Legacy escalation policy kept for compatibility with existing task packets.
pub escalation_policy: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub recovery_policy: Option<String>,
#[serde(default)]
pub verification_plan: Vec<String>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TaskResource {
pub kind: String,
pub value: String,
}
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -91,16 +119,25 @@ pub fn validate_packet(packet: TaskPacket) -> Result<ValidatedPacket, TaskPacket
validate_required("repo", &packet.repo, &mut errors);
validate_required("branch_policy", &packet.branch_policy, &mut errors);
validate_required("commit_policy", &packet.commit_policy, &mut errors);
validate_required(
"reporting_contract",
&packet.reporting_contract,
&mut errors,
);
validate_required("escalation_policy", &packet.escalation_policy, &mut errors);
if packet.reporting_contract.trim().is_empty() && packet.reporting_targets.is_empty() {
errors.push("reporting_contract or reporting_targets must not be empty".to_string());
}
if packet.escalation_policy.trim().is_empty()
&& packet
.recovery_policy
.as_ref()
.is_none_or(|policy| policy.trim().is_empty())
{
errors.push("escalation_policy or recovery_policy must not be empty".to_string());
}
// Validate scope-specific requirements
validate_scope_requirements(&packet, &mut errors);
if packet.acceptance_tests.is_empty() && packet.acceptance_criteria.is_empty() {
errors.push("acceptance_tests or acceptance_criteria must not be empty".to_string());
}
for (index, test) in packet.acceptance_tests.iter().enumerate() {
if test.trim().is_empty() {
errors.push(format!(
@@ -109,6 +146,43 @@ pub fn validate_packet(packet: TaskPacket) -> Result<ValidatedPacket, TaskPacket
}
}
for (index, criterion) in packet.acceptance_criteria.iter().enumerate() {
if criterion.trim().is_empty() {
errors.push(format!(
"acceptance_criteria contains an empty value at index {index}"
));
}
}
for (index, resource) in packet.resources.iter().enumerate() {
if resource.kind.trim().is_empty() || resource.value.trim().is_empty() {
errors.push(format!(
"resources contains an incomplete entry at index {index}"
));
}
}
validate_optional("model", packet.model.as_deref(), &mut errors);
validate_optional("provider", packet.provider.as_deref(), &mut errors);
validate_optional(
"permission_profile",
packet.permission_profile.as_deref(),
&mut errors,
);
validate_optional(
"recovery_policy",
packet.recovery_policy.as_deref(),
&mut errors,
);
for (index, step) in packet.verification_plan.iter().enumerate() {
if step.trim().is_empty() {
errors.push(format!(
"verification_plan contains an empty value at index {index}"
));
}
}
if errors.is_empty() {
Ok(ValidatedPacket(packet))
} else {
@@ -142,6 +216,12 @@ fn validate_required(field: &str, value: &str, errors: &mut Vec<String>) {
}
}
fn validate_optional(field: &str, value: Option<&str>, errors: &mut Vec<String>) {
if value.is_some_and(|value| value.trim().is_empty()) {
errors.push(format!("{field} must not be empty when present"));
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -158,9 +238,20 @@ mod tests {
"cargo build --workspace".to_string(),
"cargo test --workspace".to_string(),
],
acceptance_criteria: vec!["packet can launch without pane scraping".to_string()],
resources: vec![TaskResource {
kind: "file".to_string(),
value: "rust/crates/runtime/src/task_packet.rs".to_string(),
}],
model: Some("gpt-5.5".to_string()),
provider: Some("openai".to_string()),
permission_profile: Some("workspace-write".to_string()),
commit_policy: "single verified commit".to_string(),
reporting_contract: "print build result, test result, commit sha".to_string(),
reporting_targets: vec!["leader".to_string()],
escalation_policy: "stop only on destructive ambiguity".to_string(),
recovery_policy: Some("retry once then escalate".to_string()),
verification_plan: vec!["cargo test -p runtime task_packet".to_string()],
}
}
@@ -183,9 +274,20 @@ mod tests {
repo: String::new(),
branch_policy: "\t".to_string(),
acceptance_tests: vec!["ok".to_string(), " ".to_string()],
acceptance_criteria: vec![" ".to_string()],
resources: vec![TaskResource {
kind: " ".to_string(),
value: "resource".to_string(),
}],
model: Some(" ".to_string()),
provider: Some("openai".to_string()),
permission_profile: Some("workspace-write".to_string()),
commit_policy: String::new(),
reporting_contract: String::new(),
reporting_targets: Vec::new(),
escalation_policy: String::new(),
recovery_policy: None,
verification_plan: vec![" ".to_string()],
};
let error = validate_packet(packet).expect_err("packet should be rejected");
@@ -202,6 +304,51 @@ mod tests {
.contains(&"acceptance_tests contains an empty value at index 1".to_string()));
}
#[test]
fn legacy_packet_json_deserializes_with_defaulted_cc2_fields() {
let legacy = r#"{
"objective": "Legacy packet",
"scope": "workspace",
"repo": "claw-code",
"branch_policy": "origin/main only",
"acceptance_tests": ["cargo test"],
"commit_policy": "single commit",
"reporting_contract": "report sha",
"escalation_policy": "ask leader"
}"#;
let packet: TaskPacket = serde_json::from_str(legacy).expect("legacy packet should load");
assert_eq!(packet.objective, "Legacy packet");
assert!(packet.acceptance_criteria.is_empty());
assert!(packet.resources.is_empty());
assert_eq!(packet.model, None);
validate_packet(packet).expect("legacy packet remains valid through defaulted fields");
}
#[test]
fn rich_cc2_packet_fields_roundtrip_and_validate() {
let packet = sample_packet();
let json = serde_json::to_value(&packet).expect("packet should serialize");
assert_eq!(
json["acceptance_criteria"][0],
"packet can launch without pane scraping"
);
assert_eq!(json["resources"][0]["kind"], "file");
assert_eq!(json["model"], "gpt-5.5");
assert_eq!(json["provider"], "openai");
assert_eq!(json["permission_profile"], "workspace-write");
assert_eq!(json["recovery_policy"], "retry once then escalate");
assert_eq!(
json["verification_plan"][0],
"cargo test -p runtime task_packet"
);
let roundtrip: TaskPacket = serde_json::from_value(json).expect("rich packet roundtrips");
validate_packet(roundtrip).expect("rich packet validates");
}
#[test]
fn serialization_roundtrip_preserves_packet() {
let packet = sample_packet();

@@ -14,6 +14,7 @@ use crate::{validate_packet, TaskPacket, TaskPacketValidationError};
pub enum TaskStatus {
Created,
Running,
Blocked,
Completed,
Failed,
Stopped,
@@ -24,6 +25,7 @@ impl std::fmt::Display for TaskStatus {
match self {
Self::Created => write!(f, "created"),
Self::Running => write!(f, "running"),
Self::Blocked => write!(f, "blocked"),
Self::Completed => write!(f, "completed"),
Self::Failed => write!(f, "failed"),
Self::Stopped => write!(f, "stopped"),
@@ -43,6 +45,54 @@ pub struct Task {
pub messages: Vec<TaskMessage>,
pub output: String,
pub team_id: Option<String>,
pub heartbeat: Option<LaneHeartbeat>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LaneFreshness {
Healthy,
Stalled,
TransportDead,
Unknown,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LaneHeartbeat {
pub observed_at: u64,
pub transport_alive: bool,
pub status: String,
}
impl LaneHeartbeat {
#[must_use]
pub fn freshness_at(&self, now: u64, stalled_after_secs: u64) -> LaneFreshness {
if !self.transport_alive {
return LaneFreshness::TransportDead;
}
if now.saturating_sub(self.observed_at) > stalled_after_secs {
return LaneFreshness::Stalled;
}
LaneFreshness::Healthy
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LaneBoardEntry {
pub task_id: String,
pub prompt: String,
pub status: TaskStatus,
pub team_id: Option<String>,
pub heartbeat: Option<LaneHeartbeat>,
pub freshness: LaneFreshness,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LaneBoard {
pub generated_at: u64,
pub active: Vec<LaneBoardEntry>,
pub blocked: Vec<LaneBoardEntry>,
pub finished: Vec<LaneBoardEntry>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -114,6 +164,7 @@ impl TaskRegistry {
messages: Vec::new(),
output: String::new(),
team_id: None,
heartbeat: None,
};
inner.tasks.insert(task_id, task.clone());
task
@@ -134,6 +185,67 @@ impl TaskRegistry {
.collect()
}
pub fn update_heartbeat(&self, task_id: &str, heartbeat: LaneHeartbeat) -> Result<(), String> {
let mut inner = self.inner.lock().expect("registry lock poisoned");
let task = inner
.tasks
.get_mut(task_id)
.ok_or_else(|| format!("task not found: {task_id}"))?;
task.heartbeat = Some(heartbeat);
task.updated_at = now_secs();
Ok(())
}
#[must_use]
pub fn lane_board(&self, stalled_after_secs: u64) -> LaneBoard {
let now = now_secs();
self.lane_board_at(now, stalled_after_secs)
}
#[must_use]
pub fn lane_board_at(&self, now: u64, stalled_after_secs: u64) -> LaneBoard {
let inner = self.inner.lock().expect("registry lock poisoned");
let mut board = LaneBoard {
generated_at: now,
active: Vec::new(),
blocked: Vec::new(),
finished: Vec::new(),
};
for task in inner.tasks.values() {
let freshness = task
.heartbeat
.as_ref()
.map_or(LaneFreshness::Unknown, |heartbeat| {
heartbeat.freshness_at(now, stalled_after_secs)
});
let entry = LaneBoardEntry {
task_id: task.task_id.clone(),
prompt: task.prompt.clone(),
status: task.status,
team_id: task.team_id.clone(),
heartbeat: task.heartbeat.clone(),
freshness,
};
match task.status {
TaskStatus::Running | TaskStatus::Created => board.active.push(entry),
TaskStatus::Blocked => board.blocked.push(entry),
TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Stopped => {
board.finished.push(entry);
}
}
}
board
}
#[must_use]
pub fn lane_status_json_at(&self, now: u64, stalled_after_secs: u64) -> serde_json::Value {
serde_json::to_value(self.lane_board_at(now, stalled_after_secs))
.expect("lane board should serialize")
}
pub fn stop(&self, task_id: &str) -> Result<Task, String> {
let mut inner = self.inner.lock().expect("registry lock poisoned");
let task = inner
@@ -260,9 +372,20 @@ mod tests {
repo: "claw-code-parity".to_string(),
branch_policy: "origin/main only".to_string(),
acceptance_tests: vec!["cargo test --workspace".to_string()],
acceptance_criteria: vec!["task is inspectable".to_string()],
resources: vec![crate::TaskResource {
kind: "module".to_string(),
value: "runtime/task system".to_string(),
}],
model: Some("gpt-5.5".to_string()),
provider: Some("openai".to_string()),
permission_profile: Some("workspace-write".to_string()),
commit_policy: "single commit".to_string(),
reporting_contract: "print commit sha".to_string(),
reporting_targets: vec!["leader".to_string()],
escalation_policy: "manual escalation".to_string(),
recovery_policy: Some("retry once".to_string()),
verification_plan: vec!["cargo test --workspace".to_string()],
};
let task = registry
@@ -340,6 +463,68 @@ mod tests {
assert_eq!(output, "line 1\nline 2\n");
}
#[test]
fn lane_board_groups_active_blocked_finished_and_reports_freshness() {
let registry = TaskRegistry::new();
let active = registry.create("active", None);
let blocked = registry.create("blocked", None);
let finished = registry.create("finished", None);
registry
.set_status(&active.task_id, TaskStatus::Running)
.expect("running status");
registry
.set_status(&blocked.task_id, TaskStatus::Blocked)
.expect("blocked status");
registry
.set_status(&finished.task_id, TaskStatus::Completed)
.expect("completed status");
registry
.update_heartbeat(
&active.task_id,
LaneHeartbeat {
observed_at: 100,
transport_alive: true,
status: "running".to_string(),
},
)
.expect("heartbeat");
registry
.update_heartbeat(
&blocked.task_id,
LaneHeartbeat {
observed_at: 10,
transport_alive: true,
status: "waiting".to_string(),
},
)
.expect("heartbeat");
registry
.update_heartbeat(
&finished.task_id,
LaneHeartbeat {
observed_at: 100,
transport_alive: false,
status: "done".to_string(),
},
)
.expect("heartbeat");
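// now = 110, stall threshold = 30s: the active heartbeat is 10s old,
// the blocked heartbeat is 100s old, and the finished lane reports a dead transport.
let board = registry.lane_board_at(110, 30);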
assert_eq!(board.active.len(), 1);
assert_eq!(board.active[0].freshness, LaneFreshness::Healthy);
assert_eq!(board.blocked.len(), 1);
assert_eq!(board.blocked[0].freshness, LaneFreshness::Stalled);
assert_eq!(board.finished.len(), 1);
assert_eq!(board.finished[0].freshness, LaneFreshness::TransportDead);
let json = registry.lane_status_json_at(110, 30);
assert_eq!(json["active"][0]["status"], "running");
assert_eq!(json["blocked"][0]["freshness"], "stalled");
assert_eq!(json["finished"][0]["freshness"], "transport_dead");
}
#[test]
fn assigns_team_and_removes_task() {
let registry = TaskRegistry::new();
@@ -375,6 +560,7 @@ mod tests {
let cases = [
(TaskStatus::Created, "created"),
(TaskStatus::Running, "running"),
(TaskStatus::Blocked, "blocked"),
(TaskStatus::Completed, "completed"),
(TaskStatus::Failed, "failed"),
(TaskStatus::Stopped, "stopped"),
@@ -392,6 +578,7 @@ mod tests {
vec![
("created".to_string(), "created"),
("running".to_string(), "running"),
("blocked".to_string(), "blocked"),
("completed".to_string(), "completed"),
("failed".to_string(), "failed"),
("stopped".to_string(), "stopped"),
@@ -478,6 +665,7 @@ mod tests {
assert!(task.messages.is_empty());
assert!(task.output.is_empty());
assert_eq!(task.team_id, None);
assert_eq!(task.heartbeat, None);
}
#[test]
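
The lane board test above pins three freshness outcomes (`Healthy`, `Stalled`, `TransportDead`), and the status JSON also advertises `unknown`. The classification logic itself is not part of the hunks shown here, so the following is only a minimal sketch of one rule set that would satisfy those assertions. The function name `classify_freshness`, the order of the checks, the strict `>` comparison at the threshold, and the mapping of a missing heartbeat to `Unknown` are all assumptions, and the two types are simplified stand-ins rather than the runtime crate's definitions.

```rust
/// Simplified stand-in for the heartbeat payload used in the test
/// (hypothetical mirror, not the runtime crate's type).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LaneHeartbeat {
    pub observed_at: u64,
    pub transport_alive: bool,
    pub status: String,
}

/// Simplified stand-in for the freshness states advertised in the status JSON.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LaneFreshness {
    Healthy,
    Stalled,
    TransportDead,
    Unknown,
}

/// Hypothetical classifier. Assumed rule order:
/// 1. no heartbeat recorded      -> Unknown
/// 2. transport reported dead    -> TransportDead
/// 3. heartbeat older than limit -> Stalled
/// 4. otherwise                  -> Healthy
pub fn classify_freshness(
    heartbeat: Option<&LaneHeartbeat>,
    now: u64,
    stalled_after_secs: u64,
) -> LaneFreshness {
    let hb = match heartbeat {
        Some(hb) => hb,
        None => return LaneFreshness::Unknown,
    };
    if !hb.transport_alive {
        return LaneFreshness::TransportDead;
    }
    // saturating_sub avoids underflow if a heartbeat is stamped after `now`.
    let age = now.saturating_sub(hb.observed_at);
    if age > stalled_after_secs {
        LaneFreshness::Stalled
    } else {
        LaneFreshness::Healthy
    }
}
```

With the values from the test (`now = 110`, `stalled_after_secs = 30`), a live heartbeat observed at 100 would classify as `Healthy`, one observed at 10 as `Stalled`, and any heartbeat with `transport_alive: false` as `TransportDead`. Checking the transport before the age keeps a dead lane from being reported as merely stalled, which is a design guess rather than something the diff confirms.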

@@ -6177,6 +6177,13 @@ fn status_json_value(
"cumulative_total": usage.cumulative.total_tokens(),
"estimated_tokens": usage.estimated_tokens,
},
"lane_board": {
"schema": "task_registry_v1",
"status_json_supported": true,
"heartbeat_freshness_supported": true,
"states": ["active", "blocked", "finished"],
"freshness_states": ["healthy", "stalled", "transport_dead", "unknown"],
},
"workspace": {
"cwd": context.cwd,
"project_root": context.project_root,
@@ -11253,6 +11260,18 @@ mod tests {
json.get("workspace").is_some(),
"workspace field still reported"
);
assert_eq!(
json.pointer("/lane_board/status_json_supported")
.and_then(|v| v.as_bool()),
Some(true),
"status JSON should advertise lane board support: {json}"
);
assert_eq!(
json.pointer("/lane_board/freshness_states/2")
.and_then(|v| v.as_str()),
Some("transport_dead"),
"status JSON should advertise transport-dead freshness: {json}"
);
assert!(
json.get("sandbox").is_some(),
"sandbox field still reported"

@@ -63,6 +63,11 @@ pub(crate) fn detect_lane_completion(
diff_scope: runtime::DiffScope::Scoped,
completed: true,
reconciled: false,
retry_count: 0,
retry_limit: 1,
rebase_required: false,
stale_cleanup_required: false,
approval_token: None,
})
}
@@ -173,6 +178,11 @@ mod tests {
diff_scope: DiffScope::Scoped,
completed: true,
reconciled: false,
retry_count: 0,
retry_limit: 1,
rebase_required: false,
stale_cleanup_required: false,
approval_token: None,
};
let actions = evaluate_completed_lane(&context);

@@ -10148,9 +10148,20 @@ printf 'pwsh:%s' "$1"
"cargo build --workspace".to_string(),
"cargo test --workspace".to_string(),
],
acceptance_criteria: vec!["task packet is accepted".to_string()],
resources: vec![runtime::TaskResource {
kind: "module".to_string(),
value: "runtime/task system".to_string(),
}],
model: Some("gpt-5.5".to_string()),
provider: Some("openai".to_string()),
permission_profile: Some("workspace-write".to_string()),
commit_policy: "single commit".to_string(),
reporting_contract: "print build/test result and sha".to_string(),
reporting_targets: vec!["leader".to_string()],
escalation_policy: "manual escalation".to_string(),
recovery_policy: Some("retry once".to_string()),
verification_plan: vec!["cargo test --workspace".to_string()],
})
.expect("task packet should create a task");