# Source code for plan_view.commands.edit

"""Edit commands for modifying plan data."""

import argparse
import contextlib
import json
import shutil
import sys
from pathlib import Path

from plan_view.commands.view import cmd_validate
from plan_view.decorators import require_plan
from plan_view.formatting import BOLD, DIM, RESET, VALID_STATUSES, now_iso
from plan_view.io import save_plan
from plan_view.state import (
    find_phase,
    find_task,
    format_phase_suggestions,
    format_task_suggestions,
)


def _is_dry_run(args: argparse.Namespace) -> bool:
    """Check if dry-run mode is enabled."""
    return getattr(args, "dry_run", False)


def _prefix(args: argparse.Namespace) -> str:
    """Choose the message prefix: hypothetical wording for dry runs."""
    if _is_dry_run(args):
        return "Would:"
    return "✅"

def cmd_init(args: argparse.Namespace) -> None:
    """Create a new plan.json.

    Refuses to overwrite an existing plan unless --force is given. With
    --force, the old plan is first rotated into the .claude/plan-view
    backup directory and the periodic-backup counter is reset.

    BUGFIX: the backup rotation, the copy, and the periodic-counter reset
    previously ran even under --dry-run; all filesystem writes are now
    guarded so a dry run is genuinely side-effect free.
    """
    path = args.file
    if path.exists() and not args.force:
        print(f"Error: {path} already exists. Use --force to overwrite.", file=sys.stderr)
        sys.exit(1)
    if path.exists() and args.force:
        backup_dir = Path(".claude/plan-view")
        plan_name = path.stem
        backup_path = backup_dir / f"{plan_name}.json.1"
        if not _is_dry_run(args):
            backup_dir.mkdir(parents=True, exist_ok=True)
            _rotate_backups(backup_dir, plan_name)
            shutil.copy2(path, backup_path)
            # Reset periodic backup counter
            periodic_path = backup_dir / "periodic.json"
            if periodic_path.exists():
                periodic_path.unlink()
        if not getattr(args, "quiet", False):
            print(f"{_prefix(args)} Backed up existing plan to {backup_path}")
    # Fresh plan skeleton: two built-in phases (deferred, bugs), no tasks.
    plan = {
        "meta": {
            "project": args.name,
            "version": "1.0.0",
            "created_at": now_iso(),
            "updated_at": now_iso(),
            "business_plan_path": ".claude/BUSINESS_PLAN.md",
        },
        "summary": {
            "total_phases": 2,
            "total_tasks": 0,
            "completed_tasks": 0,
            "overall_progress": 0,
        },
        "phases": [
            {
                "id": "deferred",
                "name": "Deferred",
                "description": "Tasks postponed for later consideration",
                "status": "pending",
                "progress": {"completed": 0, "total": 0, "percentage": 0},
                "tasks": [],
            },
            {
                "id": "bugs",
                "name": "Bugs",
                "description": "Tasks identified as bugs requiring fixes",
                "status": "pending",
                "progress": {"completed": 0, "total": 0, "percentage": 0},
                "tasks": [],
            },
        ],
    }
    if not _is_dry_run(args):
        save_plan(path, plan)
    if not getattr(args, "quiet", False):
        print(f"{_prefix(args)} Created {path} for '{args.name}'")
@require_plan
def cmd_add_phase(plan: dict, args: argparse.Namespace) -> None:
    """Add a new phase, assigning the next free numeric phase ID."""
    # Named phases such as "deferred"/"bugs" are skipped when numbering.
    numeric_ids = (int(p["id"]) for p in plan["phases"] if p["id"].isdigit())
    next_id = str(max(numeric_ids, default=-1) + 1)
    plan["phases"].append(
        {
            "id": next_id,
            "name": args.name,
            "description": args.desc or "",
            "status": "pending",
            "progress": {"completed": 0, "total": 0, "percentage": 0},
            "tasks": [],
        }
    )
    if not _is_dry_run(args):
        save_plan(args.file, plan)
    if not getattr(args, "quiet", False):
        print(f"{_prefix(args)} Added Phase {next_id}: {args.name}")
@require_plan
def cmd_add_task(plan: dict, args: argparse.Namespace) -> None:
    """Add a new task to a phase, deriving a phase.section.task ID."""
    phase = find_phase(plan, args.phase)
    if phase is None:
        print(f"Error: Phase '{args.phase}' not found\n", file=sys.stderr)
        print(format_phase_suggestions(plan), file=sys.stderr)
        sys.exit(1)
    assert phase is not None

    phase_id = phase["id"]
    existing_tasks = phase.get("tasks", [])

    # Locate the highest (section, task) pair among existing 3-part IDs.
    max_section, max_task = 0, 0
    for existing in existing_tasks:
        pieces = existing["id"].split(".")
        if len(pieces) >= 3:
            pair = (int(pieces[1]), int(pieces[2]))
            if pair > (max_section, max_task):
                max_section, max_task = pair

    # First task in an empty phase starts at section 1; otherwise bump the
    # task number within the highest section.
    if existing_tasks:
        next_id = f"{phase_id}.{max_section}.{max_task + 1}"
    else:
        next_id = f"{phase_id}.1.1"

    # Optional comma-separated file list.
    files_arg = getattr(args, "files", None)
    files_list = [entry.strip() for entry in files_arg.split(",")] if files_arg else []

    phase["tasks"].append(
        {
            "id": next_id,
            "title": args.title,
            "status": "pending",
            "agent_type": args.agent,
            "skill": getattr(args, "skill", None),
            "depends_on": args.deps.split(",") if args.deps else [],
            "files": files_list,
            "tracking": {},
        }
    )
    if not _is_dry_run(args):
        save_plan(args.file, plan)
    if not getattr(args, "quiet", False):
        print(f"{_prefix(args)} Added [{next_id}] {args.title}")
@require_plan
def cmd_set(plan: dict, args: argparse.Namespace) -> None:
    """Set a task field.

    Supported fields: status, agent, skill, title, files, research, plan.
    Status changes stamp tracking timestamps, and marking "completed"
    cascades to any subtasks.

    BUGFIX: the success message previously fused field and value together
    (f"{args.field}{args.value}" printed e.g. "statuscompleted"); a " = "
    separator is now emitted. Timestamp writes use setdefault so a task
    missing its "tracking" dict no longer raises KeyError.
    """
    result = find_task(plan, args.id)
    if result is None:
        print(f"Error: Task '{args.id}' not found\n", file=sys.stderr)
        print(format_task_suggestions(plan), file=sys.stderr)
        sys.exit(1)
    assert result is not None
    _, task = result
    if args.field == "status":
        if args.value not in VALID_STATUSES:
            print(f"Error: Invalid status. Use: {', '.join(VALID_STATUSES)}", file=sys.stderr)
            sys.exit(1)
        task["status"] = args.value
        if args.value == "in_progress":
            task.setdefault("tracking", {})["started_at"] = now_iso()
        elif args.value == "completed":
            task.setdefault("tracking", {})["completed_at"] = now_iso()
            # Cascade to subtasks
            for subtask in task.get("subtasks", []):
                subtask["status"] = "completed"
    elif args.field == "agent":
        task["agent_type"] = args.value if args.value != "none" else None
    elif args.field == "skill":
        task["skill"] = args.value if args.value != "none" else None
    elif args.field == "title":
        task["title"] = args.value
    elif args.field == "files":
        # Comma-separated list of file paths with optional line refs
        if args.value in ("none", ""):
            task["files"] = []
        else:
            task["files"] = [f.strip() for f in args.value.split(",")]
    elif args.field == "research":
        task["research"] = args.value if args.value != "none" else None
    elif args.field == "plan":
        task["plan"] = args.value if args.value != "none" else None
    else:
        valid_fields = "status, agent, skill, title, files, research, plan"
        print(f"Error: Unknown field '{args.field}'. Use: {valid_fields}", file=sys.stderr)
        sys.exit(1)
    if not _is_dry_run(args):
        save_plan(args.file, plan)
    if not getattr(args, "quiet", False):
        print(f"{_prefix(args)} [{args.id}] {args.field} = {args.value}")
def cmd_done(args: argparse.Namespace) -> None:
    """Mark task as completed."""
    args.field, args.value = "status", "completed"
    cmd_set(args)
def cmd_start(args: argparse.Namespace) -> None:
    """Mark task as in_progress."""
    args.field, args.value = "status", "in_progress"
    cmd_set(args)
def cmd_block(args: argparse.Namespace) -> None:
    """Mark task as blocked."""
    args.field, args.value = "status", "blocked"
    cmd_set(args)
def cmd_skip(args: argparse.Namespace) -> None:
    """Mark task as skipped."""
    args.field, args.value = "status", "skipped"
    cmd_set(args)
@require_plan
def cmd_defer(plan: dict, args: argparse.Namespace) -> None:
    """Move task to deferred phase, or create a new deferred task if input
    is not an existing task ID.

    An optional --reason is stored as tracking.defer_reason (blank strings
    are discarded).

    BUGFIX: when moving an existing task, the defer reason was written via
    task["tracking"], which raises KeyError for tasks that lack a tracking
    dict; setdefault is used instead.
    """
    # Find or create the deferred phase.
    deferred = find_phase(plan, "deferred")
    if deferred is None:
        deferred = {
            "id": "deferred",
            "name": "Deferred",
            "description": "Tasks postponed for later consideration",
            "status": "pending",
            "progress": {"completed": 0, "total": 0, "percentage": 0},
            "tasks": [],
        }
        plan["phases"].append(deferred)

    # Next free ID within the deferred phase (deferred.1.N).
    existing_tasks = deferred.get("tasks", [])
    assert isinstance(existing_tasks, list)
    max_task = 0
    for t in existing_tasks:
        assert isinstance(t, dict)
        parts = str(t["id"]).split(".")
        if len(parts) >= 3:
            with contextlib.suppress(ValueError):
                max_task = max(max_task, int(parts[2]))
    new_id = f"deferred.1.{max_task + 1}"

    # Get defer reason if provided (only store non-empty strings).
    defer_reason = getattr(args, "reason", None)
    if defer_reason and not defer_reason.strip():
        defer_reason = None

    # Try to find existing task to move.
    result = find_task(plan, args.id)
    if result is not None:
        # Move existing task to deferred
        old_phase, task = result
        old_phase["tasks"].remove(task)
        old_id = task["id"]
        task["id"] = new_id
        if defer_reason:
            tracking = task.setdefault("tracking", {})
            assert isinstance(tracking, dict)
            tracking["defer_reason"] = defer_reason
        task_list = deferred["tasks"]
        assert isinstance(task_list, list)
        task_list.append(task)
        if not _is_dry_run(args):
            save_plan(args.file, plan)
        if not getattr(args, "quiet", False):
            print(f"{_prefix(args)} [{old_id}] → [{new_id}] (deferred)")
    else:
        # Create new deferred task with input as title
        tracking: dict = {}
        if defer_reason:
            tracking["defer_reason"] = defer_reason
        task = {
            "id": new_id,
            "title": args.id,  # Use input as title
            "status": "pending",
            "agent_type": None,
            "skill": None,
            "depends_on": [],
            "tracking": tracking,
        }
        task_list = deferred["tasks"]
        assert isinstance(task_list, list)
        task_list.append(task)
        if not _is_dry_run(args):
            save_plan(args.file, plan)
        if not getattr(args, "quiet", False):
            print(f"{_prefix(args)} Added [{new_id}] {args.id} (deferred)")
@require_plan
def cmd_bug(plan: dict, args: argparse.Namespace) -> None:
    """Move task to bugs phase, or create a new bug if input is not an existing task ID."""
    # Locate the bugs phase, creating it on first use.
    bugs = find_phase(plan, "bugs")
    if bugs is None:
        bugs = {
            "id": "bugs",
            "name": "Bugs",
            "description": "Tasks identified as bugs requiring fixes",
            "status": "pending",
            "progress": {"completed": 0, "total": 0, "percentage": 0},
            "tasks": [],
        }
        plan["phases"].append(bugs)

    # Compute the next free bugs.1.N identifier.
    existing_tasks = bugs.get("tasks", [])
    assert isinstance(existing_tasks, list)
    highest = 0
    for entry in existing_tasks:
        assert isinstance(entry, dict)
        pieces = str(entry["id"]).split(".")
        if len(pieces) >= 3:
            with contextlib.suppress(ValueError):
                highest = max(highest, int(pieces[2]))
    new_id = f"bugs.1.{highest + 1}"

    result = find_task(plan, args.id)
    if result is None:
        # No matching task: record the input itself as a fresh bug entry.
        task = {
            "id": new_id,
            "title": args.id,  # Use input as title
            "status": "pending",
            "agent_type": None,
            "skill": None,
            "depends_on": [],
            "tracking": {},
        }
        task_list = bugs["tasks"]
        assert isinstance(task_list, list)
        task_list.append(task)
        if not _is_dry_run(args):
            save_plan(args.file, plan)
        if not getattr(args, "quiet", False):
            print(f"{_prefix(args)} Added [{new_id}] {args.id} (bug)")
    else:
        # Relocate the existing task into the bugs phase under its new ID.
        old_phase, task = result
        old_phase["tasks"].remove(task)
        old_id = task["id"]
        task["id"] = new_id
        task_list = bugs["tasks"]
        assert isinstance(task_list, list)
        task_list.append(task)
        if not _is_dry_run(args):
            save_plan(args.file, plan)
        if not getattr(args, "quiet", False):
            print(f"{_prefix(args)} [{old_id}] → [{new_id}] (bug)")
@require_plan
def cmd_idea(plan: dict, args: argparse.Namespace) -> None:
    """Move task to ideas phase, or create a new idea if input is not an existing task ID."""
    # Locate the ideas phase, creating it on first use.
    ideas = find_phase(plan, "ideas")
    if ideas is None:
        ideas = {
            "id": "ideas",
            "name": "Ideas",
            "description": "Tasks stored as future ideas or concepts",
            "status": "pending",
            "progress": {"completed": 0, "total": 0, "percentage": 0},
            "tasks": [],
        }
        plan["phases"].append(ideas)

    # Compute the next free ideas.1.N identifier.
    existing_tasks = ideas.get("tasks", [])
    assert isinstance(existing_tasks, list)
    highest = 0
    for entry in existing_tasks:
        assert isinstance(entry, dict)
        pieces = str(entry["id"]).split(".")
        if len(pieces) >= 3:
            with contextlib.suppress(ValueError):
                highest = max(highest, int(pieces[2]))
    new_id = f"ideas.1.{highest + 1}"

    result = find_task(plan, args.id)
    if result is None:
        # No matching task: record the input itself as a fresh idea.
        task = {
            "id": new_id,
            "title": args.id,  # Use input as title
            "status": "pending",
            "agent_type": None,
            "skill": None,
            "depends_on": [],
            "tracking": {},
        }
        task_list = ideas["tasks"]
        assert isinstance(task_list, list)
        task_list.append(task)
        if not _is_dry_run(args):
            save_plan(args.file, plan)
        if not getattr(args, "quiet", False):
            print(f"{_prefix(args)} Added [{new_id}] {args.id} (idea)")
    else:
        # Relocate the existing task into the ideas phase under its new ID.
        old_phase, task = result
        old_phase["tasks"].remove(task)
        old_id = task["id"]
        task["id"] = new_id
        task_list = ideas["tasks"]
        assert isinstance(task_list, list)
        task_list.append(task)
        if not _is_dry_run(args):
            save_plan(args.file, plan)
        if not getattr(args, "quiet", False):
            print(f"{_prefix(args)} [{old_id}] → [{new_id}] (idea)")
@require_plan
def cmd_rm(plan: dict, args: argparse.Namespace) -> None:
    """Remove a phase or task."""
    if args.type == "task":
        result = find_task(plan, args.id)
        if result is None:
            print(f"Error: Task '{args.id}' not found\n", file=sys.stderr)
            print(format_task_suggestions(plan), file=sys.stderr)
            sys.exit(1)
        assert result is not None
        owner, victim = result
        owner["tasks"].remove(victim)
        success_msg = f"{_prefix(args)} Removed task [{args.id}]"
    else:  # args.type == "phase" (argparse enforces this)
        target = find_phase(plan, args.id)
        if target is None:
            print(f"Error: Phase '{args.id}' not found\n", file=sys.stderr)
            print(format_phase_suggestions(plan), file=sys.stderr)
            sys.exit(1)
        assert target is not None
        plan["phases"].remove(target)
        success_msg = f"{_prefix(args)} Removed phase {args.id}"
    if not _is_dry_run(args):
        save_plan(args.file, plan)
    if not getattr(args, "quiet", False):
        print(success_msg)
def _rotate_backups(backup_dir: Path, plan_name: str, max_backups: int = 5) -> None: """Rotate backup files: .json.1 → .json.2, etc. Deletes oldest if at max.""" for i in range(max_backups, 0, -1): old = backup_dir / f"{plan_name}.json.{i}" new = backup_dir / f"{plan_name}.json.{i + 1}" if old.exists(): if i == max_backups: old.unlink() # Delete oldest else: old.rename(new) def _compact_task(task: dict) -> bool: """Strip completed task to minimal fields. Returns True if modified.""" keep = {"id", "title", "status", "tracking"} removed = [k for k in list(task.keys()) if k not in keep] for key in removed: del task[key] # Keep only completed_at in tracking modified = bool(removed) if task.get("tracking"): completed_at = task["tracking"].get("completed_at") old_tracking = task["tracking"] task["tracking"] = {"completed_at": completed_at} if completed_at else {} if task["tracking"] != old_tracking: modified = True return modified
@require_plan
def cmd_compact(plan: dict, args: argparse.Namespace) -> None:
    """Backup plan and compact completed tasks to minimal fields.

    BUGFIX: the backup rotation and copy previously ran even under
    --dry-run (while printing "Would:"); backup writes are now skipped in
    dry-run mode along with the save.
    """
    plan_path = Path(args.file)
    max_backups = getattr(args, "max_backups", 5)

    # 1. Rotate existing backups and save the current plan as .json.1
    #    (real runs only).
    backup_dir = Path(".claude/plan-view")
    plan_name = plan_path.stem  # "plan" from "plan.json"
    backup_path = backup_dir / f"{plan_name}.json.1"
    if not _is_dry_run(args):
        backup_dir.mkdir(parents=True, exist_ok=True)
        _rotate_backups(backup_dir, plan_name, max_backups)
        shutil.copy2(plan_path, backup_path)

    # 2. Compact completed tasks.
    compacted = 0
    for phase in plan["phases"]:
        for task in phase["tasks"]:
            if task["status"] == "completed" and _compact_task(task):
                compacted += 1

    # 3. Save compacted plan.
    if not _is_dry_run(args):
        save_plan(args.file, plan)
    if not getattr(args, "quiet", False):
        print(f"{_prefix(args)} Backed up to {backup_path}")
        print(f"{_prefix(args)} Compacted {compacted} completed task{'s' if compacted != 1 else ''}")
@require_plan
def cmd_reconcile(plan: dict, args: argparse.Namespace) -> None:
    """Reconcile plan data: fix inconsistencies and validate."""

    def every_task():
        # Flat iteration over all tasks across all phases.
        for phase in plan["phases"]:
            yield from phase["tasks"]

    fixes: list[str] = []

    # 1. Cascade subtask status for completed tasks
    for task in every_task():
        if task["status"] == "completed":
            for subtask in task.get("subtasks", []):
                if subtask["status"] != "completed":
                    subtask["status"] = "completed"
                    fixes.append(
                        f" Marked subtask {subtask['id']} as completed (parent {task['id']} is completed)"
                    )

    # 2. Ensure completed tasks have completed_at timestamp
    for task in every_task():
        if task["status"] == "completed":
            tracking = task.get("tracking", {})
            if not tracking.get("completed_at"):
                task.setdefault("tracking", {})["completed_at"] = now_iso()
                fixes.append(f" Added completed_at timestamp to task {task['id']}")

    # 3. Ensure in_progress tasks have started_at timestamp
    for task in every_task():
        if task["status"] == "in_progress":
            tracking = task.get("tracking", {})
            if not tracking.get("started_at"):
                task.setdefault("tracking", {})["started_at"] = now_iso()
                fixes.append(f" Added started_at timestamp to task {task['id']}")

    # 4. Ensure all tasks have tracking and depends_on fields
    for task in every_task():
        if "tracking" not in task:
            task["tracking"] = {}
            fixes.append(f" Added tracking field to task {task['id']}")
        if "depends_on" not in task:
            task["depends_on"] = []
            fixes.append(f" Added depends_on field to task {task['id']}")

    # Report fixes
    if not getattr(args, "quiet", False):
        if fixes:
            print(f"Reconciled {len(fixes)} issue{'s' if len(fixes) != 1 else ''}:")
            for fix in fixes:
                print(fix)
        else:
            print("No issues found.")

    # Save if there were fixes
    if fixes and not _is_dry_run(args):
        save_plan(args.file, plan)
        if not getattr(args, "quiet", False):
            print(f"\nSaved changes to {args.file}")

    # Run validation
    if not getattr(args, "quiet", False):
        print("\nValidating...")
    cmd_validate(plan, args.file, as_json=getattr(args, "json", False))
def _list_restore_points(backup_dir: Path) -> list[dict]: """Discover available restore points in backup_dir. Returns list of dicts with keys: path, label, size, modified, type. Sorted newest-first. """ points: list[dict] = [] # Full backups: plan.json.1, plan.json.2, ... for f in sorted(backup_dir.glob("plan.json.[0-9]*")): suffix = f.name.split(".")[-1] stat = f.stat() try: data = json.loads(f.read_text()) project = data.get("meta", {}).get("project", "?") updated = data.get("meta", {}).get("updated_at", "?") label = f"Full backup #{suffix}{project} (updated {updated})" except (json.JSONDecodeError, KeyError): label = f"Full backup #{suffix}" points.append({ "path": f, "label": label, "size": stat.st_size, "modified": stat.st_mtime, "type": "full", "index": int(suffix), }) # Delta backups: plan.delta.1.json, plan.delta.2.json, ... for f in sorted(backup_dir.glob("plan.delta.[0-9]*.json")): parts = f.stem.split(".") suffix = parts[2] if len(parts) >= 3 else "?" stat = f.stat() try: patch_ops = json.loads(f.read_text()) op_count = len(patch_ops) if isinstance(patch_ops, list) else 0 label = f"Delta patch #{suffix}{op_count} operation{'s' if op_count != 1 else ''}" except (json.JSONDecodeError, KeyError): label = f"Delta patch #{suffix}" points.append({ "path": f, "label": label, "size": stat.st_size, "modified": stat.st_mtime, "type": "delta", "index": int(suffix), }) # Sort by modification time, newest first points.sort(key=lambda p: p["modified"], reverse=True) return points
def cmd_restore(args: argparse.Namespace) -> None:
    """List restore points and restore a chosen backup.

    Without --point, lists the available restore points. With --point N,
    restores the Nth listed full backup, first rotating the current plan
    into the backup directory.

    BUGFIX: the pre-restore rotation/copy and the periodic-counter reset
    previously ran even under --dry-run; all filesystem writes are now
    guarded so a dry run only reports what would happen.
    """
    plan_path = Path(args.file)
    backup_dir = Path(".claude/plan-view")
    if not backup_dir.exists():
        print("No backup directory found (.claude/plan-view/)", file=sys.stderr)
        sys.exit(1)
    points = _list_restore_points(backup_dir)
    if not points:
        print("No restore points found.", file=sys.stderr)
        sys.exit(1)

    # List mode (no --point given)
    choice = getattr(args, "point", None)
    if choice is None:
        print(f"{BOLD}Available restore points:{RESET}\n")
        for idx, pt in enumerate(points, 1):
            type_tag = "full" if pt["type"] == "full" else "delta"
            print(f" {BOLD}{idx}{RESET}) [{DIM}{type_tag}{RESET}] {pt['label']}")
        print(f"\n{DIM}Usage: pv restore <number>{RESET}")
        return

    # Restore mode
    try:
        idx = int(choice) - 1
    except ValueError:
        print(f"Error: '{choice}' is not a valid number", file=sys.stderr)
        sys.exit(1)
    if idx < 0 or idx >= len(points):
        print(f"Error: choice must be between 1 and {len(points)}", file=sys.stderr)
        sys.exit(1)
    selected = points[idx]
    if selected["type"] == "delta":
        print("Error: Delta patches cannot be restored directly. Choose a full backup.", file=sys.stderr)
        sys.exit(1)

    # Read restore content before rotation moves the file
    restore_content = selected["path"].read_bytes()

    # Back up current plan before restoring
    if plan_path.exists():
        plan_name = plan_path.stem
        pre_restore_path = backup_dir / f"{plan_name}.json.1"
        if not _is_dry_run(args):
            backup_dir.mkdir(parents=True, exist_ok=True)
            _rotate_backups(backup_dir, plan_name)
            shutil.copy2(plan_path, pre_restore_path)
        if not getattr(args, "quiet", False):
            print(f"Backed up current plan to {pre_restore_path}")

    # Write the restore content over the current plan
    if not _is_dry_run(args):
        plan_path.write_bytes(restore_content)
        # Reset periodic counter since we changed the plan
        periodic_path = backup_dir / "periodic.json"
        if periodic_path.exists():
            periodic_path.unlink()
    if not getattr(args, "quiet", False):
        print(f"{_prefix(args)} Restored from: {selected['label']}")