From 4af5664ff83a3130e949ce97c9e525a01f8c6002 Mon Sep 17 00:00:00 2001 From: bellman Date: Thu, 14 May 2026 17:18:50 +0900 Subject: [PATCH] omx(team): auto-checkpoint worker-1 [1] --- src/models.py | 1 + src/permissions.py | 31 +++++++- src/query_engine.py | 9 ++- src/tools.py | 17 ++++- tests/test_security_scope.py | 135 +++++++++++++++++++++++++++++++++++ 5 files changed, 190 insertions(+), 3 deletions(-) create mode 100644 tests/test_security_scope.py diff --git a/src/models.py b/src/models.py index d9fc2773..9ce4f9d0 100644 --- a/src/models.py +++ b/src/models.py @@ -23,6 +23,7 @@ class PortingModule: class PermissionDenial: tool_name: str reason: str + status: str = 'blocked' @dataclass(frozen=True) diff --git a/src/permissions.py b/src/permissions.py index 477fd3ec..026448b9 100644 --- a/src/permissions.py +++ b/src/permissions.py @@ -1,20 +1,49 @@ from __future__ import annotations from dataclasses import dataclass, field +from pathlib import Path + +from .path_scope import PathScopeDecision, WorkspacePathScope @dataclass(frozen=True) class ToolPermissionContext: deny_names: frozenset[str] = field(default_factory=frozenset) deny_prefixes: tuple[str, ...] = () + workspace_scope: WorkspacePathScope | None = None + cwd: Path | None = None @classmethod - def from_iterables(cls, deny_names: list[str] | None = None, deny_prefixes: list[str] | None = None) -> 'ToolPermissionContext': + def from_iterables( + cls, + deny_names: list[str] | None = None, + deny_prefixes: list[str] | None = None, + workspace_root: str | Path | None = None, + workspace_roots: list[str | Path] | tuple[str | Path, ...] | None = None, + cwd: str | Path | None = None, + ) -> 'ToolPermissionContext': + roots: list[str | Path] = [] + if workspace_roots: + roots.extend(workspace_roots) + if workspace_root is not None: + roots.append(workspace_root) return cls( deny_names=frozenset(name.lower() for name in (deny_names or [])), deny_prefixes=tuple(prefix.lower() for prefix in (deny_prefixes or [])), + workspace_scope=WorkspacePathScope.from_roots(roots) if roots else None, + cwd=Path(cwd).expanduser().resolve(strict=False) if cwd is not None else None, ) def blocks(self, tool_name: str) -> bool: lowered = tool_name.lower() return lowered in self.deny_names or any(lowered.startswith(prefix) for prefix in self.deny_prefixes) + + def validate_payload_scope(self, tool_name: str, payload: str) -> PathScopeDecision: + if self.workspace_scope is None or not _scope_checked_tool(tool_name): + return PathScopeDecision(True, 'workspace path scope not required for this tool') + return self.workspace_scope.validate_payload(payload, cwd=self.cwd) + + +def _scope_checked_tool(tool_name: str) -> bool: + lowered = tool_name.lower() + return any(marker in lowered for marker in ('bash', 'shell', 'powershell', 'fileread', 'filewrite', 'fileedit')) diff --git a/src/query_engine.py b/src/query_engine.py index bb14c2e3..dcf2008e 100644 --- a/src/query_engine.py +++ b/src/query_engine.py @@ -82,6 +82,7 @@ class QueryEnginePort: f'Matched commands: {", ".join(matched_commands) if matched_commands else "none"}', f'Matched tools: {", ".join(matched_tools) if matched_tools else "none"}', f'Permission denials: {len(denied_tools)}', + *(f'Permission denial: {denial.tool_name} status={denial.status} reason={denial.reason}' for denial in denied_tools), ] output = self._format_output(summary_lines) projected_usage = self.total_usage.add_turn(prompt, output) @@ -116,7 +117,13 @@ class QueryEnginePort: if matched_tools: yield {'type': 'tool_match', 'tools': matched_tools} if denied_tools: - yield {'type': 'permission_denial', 'denials': [denial.tool_name for denial in denied_tools]} + yield { + 'type': 'permission_denial', + 'denials': [ + {'tool_name': denial.tool_name, 'reason': denial.reason, 'status': denial.status} + for denial in denied_tools + ], + } result = self.submit_message(prompt, matched_commands, matched_tools, denied_tools) yield {'type': 'message_delta', 'text': result.output} yield { diff --git a/src/tools.py b/src/tools.py index 580a078b..e4e43405 100644 --- a/src/tools.py +++ b/src/tools.py @@ -78,10 +78,25 @@ def find_tools(query: str, limit: int = 20) -> list[PortingModule]: return matches[:limit] -def execute_tool(name: str, payload: str = '') -> ToolExecution: +def execute_tool(name: str, payload: str = '', permission_context: ToolPermissionContext | None = None) -> ToolExecution: module = get_tool(name) if module is None: return ToolExecution(name=name, source_hint='', payload=payload, handled=False, message=f'Unknown mirrored tool: {name}') + if permission_context and permission_context.blocks(module.name): + return ToolExecution(name=module.name, source_hint=module.source_hint, payload=payload, handled=False, message=f"Permission denied for mirrored tool '{module.name}'.") + if permission_context: + scope_decision = permission_context.validate_payload_scope(module.name, payload) + if not scope_decision.allowed: + return ToolExecution( + name=module.name, + source_hint=module.source_hint, + payload=payload, + handled=False, + message=( + f"Permission denied for mirrored tool '{module.name}': {scope_decision.reason}" + f" (candidate={scope_decision.candidate!r}, resolved={scope_decision.resolved!r})." + ), + ) action = f"Mirrored tool '{module.name}' from {module.source_hint} would handle payload {payload!r}." return ToolExecution(name=module.name, source_hint=module.source_hint, payload=payload, handled=True, message=action) diff --git a/tests/test_security_scope.py b/tests/test_security_scope.py new file mode 100644 index 00000000..55cba20a --- /dev/null +++ b/tests/test_security_scope.py @@ -0,0 +1,135 @@ +from __future__ import annotations + +import os +import tempfile +import unittest +from pathlib import Path + +from src.models import PermissionDenial +from src.path_scope import WorkspacePathScope, extract_path_candidates +from src.permissions import ToolPermissionContext +from src.query_engine import QueryEnginePort +from src.tools import execute_tool + + +class WorkspacePathScopeTests(unittest.TestCase): + def test_direct_parent_escape_is_denied(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + workspace = Path(tmp) / 'workspace' + workspace.mkdir() + decision = WorkspacePathScope.from_root(workspace).validate_payload('cat ../secret.txt') + self.assertFalse(decision.allowed) + self.assertIn('outside workspace scope', decision.reason) + + def test_issue_3007_symlink_escape_is_denied(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + workspace = root / 'workspace' + outside = root / 'outside' + workspace.mkdir() + outside.mkdir() + (outside / 'secret.txt').write_text('secret') + link = workspace / 'linked-outside' + link.symlink_to(outside, target_is_directory=True) + + decision = WorkspacePathScope.from_root(workspace).validate_payload('cat linked-outside/secret.txt') + + self.assertFalse(decision.allowed) + self.assertIn(str(outside.resolve()), decision.resolved or '') + + def test_glob_expansion_must_stay_inside_workspace(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + workspace = root / 'workspace' + outside = root / 'outside' + workspace.mkdir() + outside.mkdir() + (outside / 'secret.txt').write_text('secret') + + decision = WorkspacePathScope.from_root(workspace).validate_payload(f'cat {outside}/*.txt') + + self.assertFalse(decision.allowed) + self.assertEqual(str((outside / 'secret.txt').resolve()), decision.resolved) + + def test_shell_environment_expansion_is_validated(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + workspace = root / 'workspace' + outside = root / 'outside' + workspace.mkdir() + outside.mkdir() + previous = os.environ.get('CLAW_SCOPE_OUTSIDE') + os.environ['CLAW_SCOPE_OUTSIDE'] = str(outside) + try: + self.assertEqual((f'{outside}/secret.txt',), extract_path_candidates('cat $CLAW_SCOPE_OUTSIDE/secret.txt')) + decision = WorkspacePathScope.from_root(workspace).validate_payload('cat $CLAW_SCOPE_OUTSIDE/secret.txt') + finally: + if previous is None: + os.environ.pop('CLAW_SCOPE_OUTSIDE', None) + else: + os.environ['CLAW_SCOPE_OUTSIDE'] = previous + + self.assertFalse(decision.allowed) + self.assertIn(str(outside.resolve()), decision.resolved or '') + + def test_explicit_worktree_roots_are_allowed(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + workspace = root / 'workspace' + worktree = root / 'worktree' + workspace.mkdir() + worktree.mkdir() + (worktree / 'file.txt').write_text('ok') + + decision = WorkspacePathScope.from_roots((workspace, worktree)).validate_payload(f'cat {worktree}/file.txt') + + self.assertTrue(decision.allowed, decision.reason) + + def test_windows_absolute_paths_are_denied_for_posix_workspace(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + workspace = Path(tmp) / 'workspace' + workspace.mkdir() + + drive_decision = WorkspacePathScope.from_root(workspace).validate_payload(r'type C:\Users\other\secret.txt') + unc_decision = WorkspacePathScope.from_root(workspace).validate_payload(r'type \\server\share\secret.txt') + + self.assertFalse(drive_decision.allowed) + self.assertIn('windows absolute path', drive_decision.reason) + self.assertFalse(unc_decision.allowed) + self.assertIn('windows absolute path', unc_decision.reason) + + def test_file_and_shell_tools_use_workspace_scope_context(self) -> None: + with tempfile.TemporaryDirectory() as tmp: + root = Path(tmp) + workspace = root / 'workspace' + outside = root / 'outside' + workspace.mkdir() + outside.mkdir() + context = ToolPermissionContext.from_iterables(workspace_root=workspace, cwd=workspace) + + file_result = execute_tool('FileReadTool', f'{outside}/secret.txt', permission_context=context) + shell_result = execute_tool('BashTool', f'cat {outside}/secret.txt', permission_context=context) + inside_result = execute_tool('FileReadTool', './allowed.txt', permission_context=context) + + self.assertFalse(file_result.handled) + self.assertIn('Permission denied', file_result.message) + self.assertFalse(shell_result.handled) + self.assertIn('Permission denied', shell_result.message) + self.assertTrue(inside_result.handled) + + def test_permission_denial_stream_events_expose_status_and_reason(self) -> None: + engine = QueryEnginePort.from_workspace() + denial = PermissionDenial('BashTool', 'path resolves outside workspace scope') + + events = list(engine.stream_submit_message('cat ../secret.txt', matched_tools=('BashTool',), denied_tools=(denial,))) + permission_event = next(event for event in events if event['type'] == 'permission_denial') + result = engine.submit_message('cat ../secret.txt', matched_tools=('BashTool',), denied_tools=(denial,)) + + self.assertEqual('blocked', permission_event['denials'][0]['status']) + self.assertEqual('path resolves outside workspace scope', permission_event['denials'][0]['reason']) + self.assertIn('status=blocked', result.output) + self.assertIn('path resolves outside workspace scope', result.output) + + +if __name__ == '__main__': + unittest.main()