mirror of
https://github.com/instructkr/claude-code.git
synced 2026-05-14 09:56:44 +00:00
omx(team): auto-checkpoint worker-1 [1]
This commit is contained in:
@@ -23,6 +23,7 @@ class PortingModule:
|
||||
class PermissionDenial:
|
||||
tool_name: str
|
||||
reason: str
|
||||
status: str = 'blocked'
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
|
||||
@@ -1,20 +1,49 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
|
||||
from .path_scope import PathScopeDecision, WorkspacePathScope
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ToolPermissionContext:
|
||||
deny_names: frozenset[str] = field(default_factory=frozenset)
|
||||
deny_prefixes: tuple[str, ...] = ()
|
||||
workspace_scope: WorkspacePathScope | None = None
|
||||
cwd: Path | None = None
|
||||
|
||||
@classmethod
|
||||
def from_iterables(cls, deny_names: list[str] | None = None, deny_prefixes: list[str] | None = None) -> 'ToolPermissionContext':
|
||||
def from_iterables(
|
||||
cls,
|
||||
deny_names: list[str] | None = None,
|
||||
deny_prefixes: list[str] | None = None,
|
||||
workspace_root: str | Path | None = None,
|
||||
workspace_roots: list[str | Path] | tuple[str | Path, ...] | None = None,
|
||||
cwd: str | Path | None = None,
|
||||
) -> 'ToolPermissionContext':
|
||||
roots: list[str | Path] = []
|
||||
if workspace_roots:
|
||||
roots.extend(workspace_roots)
|
||||
if workspace_root is not None:
|
||||
roots.append(workspace_root)
|
||||
return cls(
|
||||
deny_names=frozenset(name.lower() for name in (deny_names or [])),
|
||||
deny_prefixes=tuple(prefix.lower() for prefix in (deny_prefixes or [])),
|
||||
workspace_scope=WorkspacePathScope.from_roots(roots) if roots else None,
|
||||
cwd=Path(cwd).expanduser().resolve(strict=False) if cwd is not None else None,
|
||||
)
|
||||
|
||||
def blocks(self, tool_name: str) -> bool:
|
||||
lowered = tool_name.lower()
|
||||
return lowered in self.deny_names or any(lowered.startswith(prefix) for prefix in self.deny_prefixes)
|
||||
|
||||
def validate_payload_scope(self, tool_name: str, payload: str) -> PathScopeDecision:
|
||||
if self.workspace_scope is None or not _scope_checked_tool(tool_name):
|
||||
return PathScopeDecision(True, 'workspace path scope not required for this tool')
|
||||
return self.workspace_scope.validate_payload(payload, cwd=self.cwd)
|
||||
|
||||
|
||||
def _scope_checked_tool(tool_name: str) -> bool:
|
||||
lowered = tool_name.lower()
|
||||
return any(marker in lowered for marker in ('bash', 'shell', 'powershell', 'fileread', 'filewrite', 'fileedit'))
|
||||
|
||||
@@ -82,6 +82,7 @@ class QueryEnginePort:
|
||||
f'Matched commands: {", ".join(matched_commands) if matched_commands else "none"}',
|
||||
f'Matched tools: {", ".join(matched_tools) if matched_tools else "none"}',
|
||||
f'Permission denials: {len(denied_tools)}',
|
||||
*(f'Permission denial: {denial.tool_name} status={denial.status} reason={denial.reason}' for denial in denied_tools),
|
||||
]
|
||||
output = self._format_output(summary_lines)
|
||||
projected_usage = self.total_usage.add_turn(prompt, output)
|
||||
@@ -116,7 +117,13 @@ class QueryEnginePort:
|
||||
if matched_tools:
|
||||
yield {'type': 'tool_match', 'tools': matched_tools}
|
||||
if denied_tools:
|
||||
yield {'type': 'permission_denial', 'denials': [denial.tool_name for denial in denied_tools]}
|
||||
yield {
|
||||
'type': 'permission_denial',
|
||||
'denials': [
|
||||
{'tool_name': denial.tool_name, 'reason': denial.reason, 'status': denial.status}
|
||||
for denial in denied_tools
|
||||
],
|
||||
}
|
||||
result = self.submit_message(prompt, matched_commands, matched_tools, denied_tools)
|
||||
yield {'type': 'message_delta', 'text': result.output}
|
||||
yield {
|
||||
|
||||
17
src/tools.py
17
src/tools.py
@@ -78,10 +78,25 @@ def find_tools(query: str, limit: int = 20) -> list[PortingModule]:
|
||||
return matches[:limit]
|
||||
|
||||
|
||||
def execute_tool(name: str, payload: str = '') -> ToolExecution:
|
||||
def execute_tool(name: str, payload: str = '', permission_context: ToolPermissionContext | None = None) -> ToolExecution:
|
||||
module = get_tool(name)
|
||||
if module is None:
|
||||
return ToolExecution(name=name, source_hint='', payload=payload, handled=False, message=f'Unknown mirrored tool: {name}')
|
||||
if permission_context and permission_context.blocks(module.name):
|
||||
return ToolExecution(name=module.name, source_hint=module.source_hint, payload=payload, handled=False, message=f"Permission denied for mirrored tool '{module.name}'.")
|
||||
if permission_context:
|
||||
scope_decision = permission_context.validate_payload_scope(module.name, payload)
|
||||
if not scope_decision.allowed:
|
||||
return ToolExecution(
|
||||
name=module.name,
|
||||
source_hint=module.source_hint,
|
||||
payload=payload,
|
||||
handled=False,
|
||||
message=(
|
||||
f"Permission denied for mirrored tool '{module.name}': {scope_decision.reason}"
|
||||
f" (candidate={scope_decision.candidate!r}, resolved={scope_decision.resolved!r})."
|
||||
),
|
||||
)
|
||||
action = f"Mirrored tool '{module.name}' from {module.source_hint} would handle payload {payload!r}."
|
||||
return ToolExecution(name=module.name, source_hint=module.source_hint, payload=payload, handled=True, message=action)
|
||||
|
||||
|
||||
135
tests/test_security_scope.py
Normal file
135
tests/test_security_scope.py
Normal file
@@ -0,0 +1,135 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
from src.models import PermissionDenial
|
||||
from src.path_scope import WorkspacePathScope, extract_path_candidates
|
||||
from src.permissions import ToolPermissionContext
|
||||
from src.query_engine import QueryEnginePort
|
||||
from src.tools import execute_tool
|
||||
|
||||
|
||||
class WorkspacePathScopeTests(unittest.TestCase):
|
||||
def test_direct_parent_escape_is_denied(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
workspace = Path(tmp) / 'workspace'
|
||||
workspace.mkdir()
|
||||
decision = WorkspacePathScope.from_root(workspace).validate_payload('cat ../secret.txt')
|
||||
self.assertFalse(decision.allowed)
|
||||
self.assertIn('outside workspace scope', decision.reason)
|
||||
|
||||
def test_issue_3007_symlink_escape_is_denied(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
root = Path(tmp)
|
||||
workspace = root / 'workspace'
|
||||
outside = root / 'outside'
|
||||
workspace.mkdir()
|
||||
outside.mkdir()
|
||||
(outside / 'secret.txt').write_text('secret')
|
||||
link = workspace / 'linked-outside'
|
||||
link.symlink_to(outside, target_is_directory=True)
|
||||
|
||||
decision = WorkspacePathScope.from_root(workspace).validate_payload('cat linked-outside/secret.txt')
|
||||
|
||||
self.assertFalse(decision.allowed)
|
||||
self.assertIn(str(outside.resolve()), decision.resolved or '')
|
||||
|
||||
def test_glob_expansion_must_stay_inside_workspace(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
root = Path(tmp)
|
||||
workspace = root / 'workspace'
|
||||
outside = root / 'outside'
|
||||
workspace.mkdir()
|
||||
outside.mkdir()
|
||||
(outside / 'secret.txt').write_text('secret')
|
||||
|
||||
decision = WorkspacePathScope.from_root(workspace).validate_payload(f'cat {outside}/*.txt')
|
||||
|
||||
self.assertFalse(decision.allowed)
|
||||
self.assertEqual(str((outside / 'secret.txt').resolve()), decision.resolved)
|
||||
|
||||
def test_shell_environment_expansion_is_validated(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
root = Path(tmp)
|
||||
workspace = root / 'workspace'
|
||||
outside = root / 'outside'
|
||||
workspace.mkdir()
|
||||
outside.mkdir()
|
||||
previous = os.environ.get('CLAW_SCOPE_OUTSIDE')
|
||||
os.environ['CLAW_SCOPE_OUTSIDE'] = str(outside)
|
||||
try:
|
||||
self.assertEqual((f'{outside}/secret.txt',), extract_path_candidates('cat $CLAW_SCOPE_OUTSIDE/secret.txt'))
|
||||
decision = WorkspacePathScope.from_root(workspace).validate_payload('cat $CLAW_SCOPE_OUTSIDE/secret.txt')
|
||||
finally:
|
||||
if previous is None:
|
||||
os.environ.pop('CLAW_SCOPE_OUTSIDE', None)
|
||||
else:
|
||||
os.environ['CLAW_SCOPE_OUTSIDE'] = previous
|
||||
|
||||
self.assertFalse(decision.allowed)
|
||||
self.assertIn(str(outside.resolve()), decision.resolved or '')
|
||||
|
||||
def test_explicit_worktree_roots_are_allowed(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
root = Path(tmp)
|
||||
workspace = root / 'workspace'
|
||||
worktree = root / 'worktree'
|
||||
workspace.mkdir()
|
||||
worktree.mkdir()
|
||||
(worktree / 'file.txt').write_text('ok')
|
||||
|
||||
decision = WorkspacePathScope.from_roots((workspace, worktree)).validate_payload(f'cat {worktree}/file.txt')
|
||||
|
||||
self.assertTrue(decision.allowed, decision.reason)
|
||||
|
||||
def test_windows_absolute_paths_are_denied_for_posix_workspace(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
workspace = Path(tmp) / 'workspace'
|
||||
workspace.mkdir()
|
||||
|
||||
drive_decision = WorkspacePathScope.from_root(workspace).validate_payload(r'type C:\Users\other\secret.txt')
|
||||
unc_decision = WorkspacePathScope.from_root(workspace).validate_payload(r'type \\server\share\secret.txt')
|
||||
|
||||
self.assertFalse(drive_decision.allowed)
|
||||
self.assertIn('windows absolute path', drive_decision.reason)
|
||||
self.assertFalse(unc_decision.allowed)
|
||||
self.assertIn('windows absolute path', unc_decision.reason)
|
||||
|
||||
def test_file_and_shell_tools_use_workspace_scope_context(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
root = Path(tmp)
|
||||
workspace = root / 'workspace'
|
||||
outside = root / 'outside'
|
||||
workspace.mkdir()
|
||||
outside.mkdir()
|
||||
context = ToolPermissionContext.from_iterables(workspace_root=workspace, cwd=workspace)
|
||||
|
||||
file_result = execute_tool('FileReadTool', f'{outside}/secret.txt', permission_context=context)
|
||||
shell_result = execute_tool('BashTool', f'cat {outside}/secret.txt', permission_context=context)
|
||||
inside_result = execute_tool('FileReadTool', './allowed.txt', permission_context=context)
|
||||
|
||||
self.assertFalse(file_result.handled)
|
||||
self.assertIn('Permission denied', file_result.message)
|
||||
self.assertFalse(shell_result.handled)
|
||||
self.assertIn('Permission denied', shell_result.message)
|
||||
self.assertTrue(inside_result.handled)
|
||||
|
||||
def test_permission_denial_stream_events_expose_status_and_reason(self) -> None:
|
||||
engine = QueryEnginePort.from_workspace()
|
||||
denial = PermissionDenial('BashTool', 'path resolves outside workspace scope')
|
||||
|
||||
events = list(engine.stream_submit_message('cat ../secret.txt', matched_tools=('BashTool',), denied_tools=(denial,)))
|
||||
permission_event = next(event for event in events if event['type'] == 'permission_denial')
|
||||
result = engine.submit_message('cat ../secret.txt', matched_tools=('BashTool',), denied_tools=(denial,))
|
||||
|
||||
self.assertEqual('blocked', permission_event['denials'][0]['status'])
|
||||
self.assertEqual('path resolves outside workspace scope', permission_event['denials'][0]['reason'])
|
||||
self.assertIn('status=blocked', result.output)
|
||||
self.assertIn('path resolves outside workspace scope', result.output)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user