Source code for plan_view.commands.edit

"""Edit commands for modifying plan data."""

import argparse
import contextlib
import shutil
import sys
from pathlib import Path

from plan_view.decorators import require_plan
from plan_view.formatting import VALID_STATUSES, now_iso
from plan_view.io import save_plan
from plan_view.state import (
    find_phase,
    find_task,
    format_phase_suggestions,
    format_task_suggestions,
)


def _is_dry_run(args: argparse.Namespace) -> bool:
    """Report whether the parsed arguments ask for dry-run mode.

    A namespace that has no ``dry_run`` attribute at all is treated as a
    normal (non dry-run) invocation.
    """
    try:
        return args.dry_run
    except AttributeError:
        return False


def _prefix(args: argparse.Namespace) -> str:
    """Pick the label that starts a status message.

    Dry runs are labelled ``"Would:"``; real runs get a check mark.
    """
    if _is_dry_run(args):
        return "Would:"
    return "✅"


def cmd_init(args: argparse.Namespace) -> None:
    """Create a fresh plan file seeded with the Deferred and Bugs phases.

    Reads ``args.file`` (target path), ``args.name`` (project name) and
    ``args.force``. Exits with status 1 when the target already exists and
    ``args.force`` is falsy. In dry-run mode the plan is built but not
    written; when ``quiet`` is set the confirmation line is not printed.
    """

    def empty_phase(phase_id: str, name: str, description: str) -> dict:
        # Each call builds its own progress dict and task list so the
        # phases never share mutable state.
        return {
            "id": phase_id,
            "name": name,
            "description": description,
            "status": "pending",
            "progress": {"completed": 0, "total": 0, "percentage": 0},
            "tasks": [],
        }

    target = args.file
    if target.exists() and not args.force:
        print(f"Error: {target} already exists. Use --force to overwrite.", file=sys.stderr)
        sys.exit(1)

    meta = {
        "project": args.name,
        "version": "1.0.0",
        "created_at": now_iso(),
        "updated_at": now_iso(),
        "business_plan_path": ".claude/BUSINESS_PLAN.md",
    }
    summary = {
        "total_phases": 2,
        "total_tasks": 0,
        "completed_tasks": 0,
        "overall_progress": 0,
    }
    phases = [
        empty_phase("deferred", "Deferred", "Tasks postponed for later consideration"),
        empty_phase("99", "Bugs", "Tasks identified as bugs requiring fixes"),
    ]
    plan = {"meta": meta, "summary": summary, "phases": phases}

    if not _is_dry_run(args):
        save_plan(target, plan)

    if getattr(args, "quiet", False):
        return
    print(f"{_prefix(args)} Created {target} for '{args.name}'")
@require_plan
def cmd_add_phase(plan: dict, args: argparse.Namespace) -> None:
    """Append a new, empty phase to the plan.

    The phase ID is one more than the highest ordinary numeric phase ID
    (``"0"`` when there is none). The Bugs phase uses the reserved ID
    ``"99"`` and is left out of that calculation; otherwise a freshly
    initialised plan, which already contains phase ``"99"``, would number
    its first real phase ``"100"``.

    Reads ``args.name``, ``args.desc`` and ``args.file``. In dry-run mode the
    plan is not saved; when ``quiet`` is set nothing is printed.
    """
    bugs_id = 99  # reserved for the Bugs phase (see cmd_init / cmd_bug)

    numeric_ids = [int(p["id"]) for p in plan["phases"] if p["id"].isdigit()]
    candidate = max((n for n in numeric_ids if n != bugs_id), default=-1) + 1
    if candidate == bugs_id:
        # Step over the reserved slot instead of colliding with Bugs.
        candidate += 1
    next_id = str(candidate)

    phase = {
        "id": next_id,
        "name": args.name,
        "description": args.desc or "",
        "status": "pending",
        "progress": {"completed": 0, "total": 0, "percentage": 0},
        "tasks": [],
    }
    plan["phases"].append(phase)

    if not _is_dry_run(args):
        save_plan(args.file, plan)
    if not getattr(args, "quiet", False):
        print(f"{_prefix(args)} Added Phase {next_id}: {args.name}")
@require_plan
def cmd_add_task(plan: dict, args: argparse.Namespace) -> None:
    """Add a new pending task to the phase named by ``args.phase``.

    Task IDs have the form ``<phase>.<section>.<task>``. The new task
    continues the highest ``(section, task)`` pair found among the phase's
    existing tasks; when the phase has no task with a parsable ID it starts
    at ``<phase>.1.1``.

    Reads ``args.phase``, ``args.title``, ``args.agent``, ``args.deps``
    (comma-separated task IDs) and ``args.file``. Exits with status 1 when
    the phase cannot be found. In dry-run mode the plan is not saved; when
    ``quiet`` is set nothing is printed.
    """
    phase = find_phase(plan, args.phase)
    if phase is None:
        print(f"Error: Phase '{args.phase}' not found\n", file=sys.stderr)
        print(format_phase_suggestions(plan), file=sys.stderr)
        sys.exit(1)
    assert phase is not None  # type narrowing only; sys.exit does not return

    phase_id = phase["id"]
    # setdefault so a phase stored without a "tasks" list can still grow one.
    tasks = phase.setdefault("tasks", [])

    # Collect every (section, task) pair that parses; IDs with non-numeric
    # parts are skipped, matching how cmd_defer/cmd_bug/cmd_idea treat them.
    numbered: list[tuple[int, int]] = []
    for existing in tasks:
        parts = str(existing["id"]).split(".")
        if len(parts) < 3:
            continue
        with contextlib.suppress(ValueError):
            numbered.append((int(parts[1]), int(parts[2])))

    # Tuple ordering compares section first, then task number. The default
    # makes an empty or unparsable phase start at section 1, task 1.
    section, last_task = max(numbered, default=(1, 0))
    next_id = f"{phase_id}.{section}.{last_task + 1}"

    # Tolerate "a, b" and trailing commas in the dependency list.
    raw_deps = args.deps.split(",") if args.deps else []
    depends_on = [dep.strip() for dep in raw_deps if dep.strip()]

    task = {
        "id": next_id,
        "title": args.title,
        "status": "pending",
        "agent_type": args.agent,
        "depends_on": depends_on,
        "tracking": {},
    }
    tasks.append(task)

    if not _is_dry_run(args):
        save_plan(args.file, plan)
    if not getattr(args, "quiet", False):
        print(f"{_prefix(args)} Added [{next_id}] {args.title}")
@require_plan
def cmd_set(plan: dict, args: argparse.Namespace) -> None:
    """Set one field (``status``, ``agent`` or ``title``) on a task.

    Reads ``args.id``, ``args.field``, ``args.value`` and ``args.file``.

    * ``status`` must be one of ``VALID_STATUSES``. Moving to
      ``in_progress`` stamps ``tracking.started_at`` and moving to
      ``completed`` stamps ``tracking.completed_at``.
    * ``agent`` stores the value as ``agent_type``; the literal ``"none"``
      clears it to ``None``.
    * ``title`` replaces the task title.

    Exits with status 1 when the task is not found, the status is invalid,
    or the field is unknown. In dry-run mode the plan is not saved; when
    ``quiet`` is set nothing is printed.
    """
    result = find_task(plan, args.id)
    if result is None:
        print(f"Error: Task '{args.id}' not found\n", file=sys.stderr)
        print(format_task_suggestions(plan), file=sys.stderr)
        sys.exit(1)
    assert result is not None  # type narrowing only; sys.exit does not return
    _, task = result

    if args.field == "status":
        if args.value not in VALID_STATUSES:
            print(f"Error: Invalid status. Use: {', '.join(VALID_STATUSES)}", file=sys.stderr)
            sys.exit(1)
        task["status"] = args.value

        stamp_key = {"in_progress": "started_at", "completed": "completed_at"}.get(args.value)
        if stamp_key is not None:
            # A task may have been stored without a usable tracking dict;
            # create one rather than failing with KeyError/TypeError.
            tracking = task.get("tracking")
            if not isinstance(tracking, dict):
                tracking = task["tracking"] = {}
            tracking[stamp_key] = now_iso()
    elif args.field == "agent":
        task["agent_type"] = args.value if args.value != "none" else None
    elif args.field == "title":
        task["title"] = args.value
    else:
        print(f"Error: Unknown field '{args.field}'. Use: status, agent, title", file=sys.stderr)
        sys.exit(1)

    if not _is_dry_run(args):
        save_plan(args.file, plan)
    if not getattr(args, "quiet", False):
        # Separate field and value so the line does not read "statuscompleted".
        print(f"{_prefix(args)} [{args.id}] {args.field} → {args.value}")
def cmd_done(args: argparse.Namespace) -> None:
    """Shortcut that sets a task's status to ``completed``.

    Fills in ``args.field`` and ``args.value`` and hands off to ``cmd_set``.
    """
    args.field, args.value = "status", "completed"
    cmd_set(args)
def cmd_start(args: argparse.Namespace) -> None:
    """Shortcut that sets a task's status to ``in_progress``.

    Fills in ``args.field`` and ``args.value`` and hands off to ``cmd_set``.
    """
    args.field, args.value = "status", "in_progress"
    cmd_set(args)
def cmd_block(args: argparse.Namespace) -> None:
    """Shortcut that sets a task's status to ``blocked``.

    Fills in ``args.field`` and ``args.value`` and hands off to ``cmd_set``.
    """
    args.field, args.value = "status", "blocked"
    cmd_set(args)
def cmd_skip(args: argparse.Namespace) -> None:
    """Shortcut that sets a task's status to ``skipped``.

    Fills in ``args.field`` and ``args.value`` and hands off to ``cmd_set``.
    """
    args.field, args.value = "status", "skipped"
    cmd_set(args)
@require_plan
def cmd_defer(plan: dict, args: argparse.Namespace) -> None:
    """Move a task into the Deferred phase, or create a new deferred task.

    ``args.id`` is first looked up as a task ID. If a task is found it is
    removed from its current phase, renumbered ``deferred.1.<n>`` and
    appended to the Deferred phase. Otherwise ``args.id`` is used as the
    title of a brand-new pending task in that phase. The Deferred phase is
    created when the plan does not have one yet.

    An optional ``args.reason`` is stored as ``tracking.defer_reason``;
    empty or whitespace-only reasons are ignored. In dry-run mode the plan
    is not saved; when ``quiet`` is set nothing is printed.
    """
    deferred = find_phase(plan, "deferred")
    if deferred is None:
        deferred = {
            "id": "deferred",
            "name": "Deferred",
            "description": "Tasks postponed for later consideration",
            "status": "pending",
            "progress": {"completed": 0, "total": 0, "percentage": 0},
            "tasks": [],
        }
        plan["phases"].append(deferred)

    # setdefault so a phase stored without a "tasks" list does not raise
    # KeyError when the task is appended below.
    deferred_tasks = deferred.setdefault("tasks", [])
    assert isinstance(deferred_tasks, list)

    # Next free task number within the phase; unparsable IDs are skipped.
    max_task = 0
    for existing in deferred_tasks:
        assert isinstance(existing, dict)
        parts = str(existing["id"]).split(".")
        if len(parts) >= 3:
            with contextlib.suppress(ValueError):
                max_task = max(max_task, int(parts[2]))
    new_id = f"deferred.1.{max_task + 1}"

    # Only keep a reason that has visible content.
    defer_reason = getattr(args, "reason", None)
    if not (defer_reason and defer_reason.strip()):
        defer_reason = None

    result = find_task(plan, args.id)
    if result is not None:
        # Move the existing task across.
        old_phase, task = result
        old_phase["tasks"].remove(task)
        old_id = task["id"]
        task["id"] = new_id
        if defer_reason:
            # The task may have been stored without a usable tracking dict.
            tracking = task.get("tracking")
            if not isinstance(tracking, dict):
                tracking = task["tracking"] = {}
            tracking["defer_reason"] = defer_reason
        outcome = f"[{old_id}] → [{new_id}] (deferred)"
    else:
        # No such task: treat the input as the title of a new deferred task.
        task = {
            "id": new_id,
            "title": args.id,
            "status": "pending",
            "agent_type": None,
            "depends_on": [],
            "tracking": {"defer_reason": defer_reason} if defer_reason else {},
        }
        outcome = f"Added [{new_id}] {args.id} (deferred)"

    deferred_tasks.append(task)

    if not _is_dry_run(args):
        save_plan(args.file, plan)
    if not getattr(args, "quiet", False):
        print(f"{_prefix(args)} {outcome}")
@require_plan
def cmd_bug(plan: dict, args: argparse.Namespace) -> None:
    """Move a task into the Bugs phase, or create a new bug task.

    ``args.id`` is first looked up as a task ID. If a task is found it is
    removed from its current phase, renumbered ``99.1.<n>`` and appended to
    the Bugs phase (phase ID ``"99"``). Otherwise ``args.id`` is used as the
    title of a brand-new pending task in that phase. The Bugs phase is
    created when the plan does not have one yet.

    In dry-run mode the plan is not saved; when ``quiet`` is set nothing is
    printed.
    """
    bugs = find_phase(plan, "99")
    if bugs is None:
        bugs = {
            "id": "99",
            "name": "Bugs",
            "description": "Tasks identified as bugs requiring fixes",
            "status": "pending",
            "progress": {"completed": 0, "total": 0, "percentage": 0},
            "tasks": [],
        }
        plan["phases"].append(bugs)

    # setdefault so a phase stored without a "tasks" list does not raise
    # KeyError when the task is appended below.
    bug_tasks = bugs.setdefault("tasks", [])
    assert isinstance(bug_tasks, list)

    # Next free task number within the phase; unparsable IDs are skipped.
    max_task = 0
    for existing in bug_tasks:
        assert isinstance(existing, dict)
        parts = str(existing["id"]).split(".")
        if len(parts) >= 3:
            with contextlib.suppress(ValueError):
                max_task = max(max_task, int(parts[2]))
    new_id = f"99.1.{max_task + 1}"

    result = find_task(plan, args.id)
    if result is not None:
        # Move the existing task across.
        old_phase, task = result
        old_phase["tasks"].remove(task)
        old_id = task["id"]
        task["id"] = new_id
        outcome = f"[{old_id}] → [{new_id}] (bug)"
    else:
        # No such task: treat the input as the title of a new bug.
        task = {
            "id": new_id,
            "title": args.id,
            "status": "pending",
            "agent_type": None,
            "depends_on": [],
            "tracking": {},
        }
        outcome = f"Added [{new_id}] {args.id} (bug)"

    bug_tasks.append(task)

    if not _is_dry_run(args):
        save_plan(args.file, plan)
    if not getattr(args, "quiet", False):
        print(f"{_prefix(args)} {outcome}")
@require_plan
def cmd_idea(plan: dict, args: argparse.Namespace) -> None:
    """Move a task into the Ideas phase, or create a new idea task.

    ``args.id`` is first looked up as a task ID. If a task is found it is
    removed from its current phase, renumbered ``ideas.1.<n>`` and appended
    to the Ideas phase. Otherwise ``args.id`` is used as the title of a
    brand-new pending task in that phase. The Ideas phase is created when
    the plan does not have one yet.

    In dry-run mode the plan is not saved; when ``quiet`` is set nothing is
    printed.
    """
    ideas = find_phase(plan, "ideas")
    if ideas is None:
        ideas = {
            "id": "ideas",
            "name": "Ideas",
            "description": "Tasks stored as future ideas or concepts",
            "status": "pending",
            "progress": {"completed": 0, "total": 0, "percentage": 0},
            "tasks": [],
        }
        plan["phases"].append(ideas)

    # setdefault so a phase stored without a "tasks" list does not raise
    # KeyError when the task is appended below.
    idea_tasks = ideas.setdefault("tasks", [])
    assert isinstance(idea_tasks, list)

    # Next free task number within the phase; unparsable IDs are skipped.
    max_task = 0
    for existing in idea_tasks:
        assert isinstance(existing, dict)
        parts = str(existing["id"]).split(".")
        if len(parts) >= 3:
            with contextlib.suppress(ValueError):
                max_task = max(max_task, int(parts[2]))
    new_id = f"ideas.1.{max_task + 1}"

    result = find_task(plan, args.id)
    if result is not None:
        # Move the existing task across.
        old_phase, task = result
        old_phase["tasks"].remove(task)
        old_id = task["id"]
        task["id"] = new_id
        outcome = f"[{old_id}] → [{new_id}] (idea)"
    else:
        # No such task: treat the input as the title of a new idea.
        task = {
            "id": new_id,
            "title": args.id,
            "status": "pending",
            "agent_type": None,
            "depends_on": [],
            "tracking": {},
        }
        outcome = f"Added [{new_id}] {args.id} (idea)"

    idea_tasks.append(task)

    if not _is_dry_run(args):
        save_plan(args.file, plan)
    if not getattr(args, "quiet", False):
        print(f"{_prefix(args)} {outcome}")
@require_plan
def cmd_rm(plan: dict, args: argparse.Namespace) -> None:
    """Delete a task or a whole phase from the plan.

    ``args.type`` selects what ``args.id`` refers to: ``"task"`` removes
    that task from its phase, anything else is handled as a phase removal.
    Exits with status 1 (after printing suggestions) when the target is not
    found. In dry-run mode the plan is not saved; when ``quiet`` is set
    nothing is printed.
    """
    if args.type == "task":
        found = find_task(plan, args.id)
        if found is None:
            print(f"Error: Task '{args.id}' not found\n", file=sys.stderr)
            print(format_task_suggestions(plan), file=sys.stderr)
            sys.exit(1)
        assert found is not None
        owner, doomed = found
        owner["tasks"].remove(doomed)
        summary = f"Removed task [{args.id}]"
    else:
        # Any other type is treated as "phase".
        target = find_phase(plan, args.id)
        if target is None:
            print(f"Error: Phase '{args.id}' not found\n", file=sys.stderr)
            print(format_phase_suggestions(plan), file=sys.stderr)
            sys.exit(1)
        assert target is not None
        plan["phases"].remove(target)
        summary = f"Removed phase {args.id}"

    if not _is_dry_run(args):
        save_plan(args.file, plan)

    if getattr(args, "quiet", False):
        return
    print(f"{_prefix(args)} {summary}")
def _rotate_backups(backup_dir: Path, plan_name: str, max_backups: int = 5) -> None:
    """Shift numbered backups up by one slot, dropping the oldest.

    Backups are named ``<plan_name>.json.<n>``. Slots are visited from
    ``max_backups`` down to 1: a file in the last slot is deleted, and every
    other existing file is renamed to the next higher number, which leaves
    slot 1 free.
    """

    def slot(number: int) -> Path:
        return backup_dir / f"{plan_name}.json.{number}"

    for index in reversed(range(1, max_backups + 1)):
        current = slot(index)
        if not current.exists():
            continue
        if index == max_backups:
            # The last slot has nowhere to go, so it is removed.
            current.unlink()
            continue
        current.rename(slot(index + 1))


def _compact_task(task: dict) -> bool:
    """Reduce a task dict in place to its minimal fields.

    Only ``id``, ``title``, ``status`` and ``tracking`` are kept, and a
    non-empty ``tracking`` is replaced by a dict holding just
    ``completed_at`` (or an empty dict when that value is missing or falsy).

    Returns True when anything was removed or the tracking content changed.
    """
    kept_fields = {"id", "title", "status", "tracking"}

    extra_fields = task.keys() - kept_fields
    for name in extra_fields:
        task.pop(name)
    changed = bool(extra_fields)

    previous = task.get("tracking")
    if not previous:
        # Missing or empty tracking is left untouched.
        return changed

    finished_at = previous.get("completed_at")
    slimmed = {"completed_at": finished_at} if finished_at else {}
    task["tracking"] = slimmed
    return changed or slimmed != previous
@require_plan
def cmd_compact(plan: dict, args: argparse.Namespace) -> None:
    """Back up the plan file, then strip completed tasks to minimal fields.

    The backup is written to ``.claude/plan-view/<stem>.json.1`` (relative
    to the current working directory) after older backups have been rotated
    up by one slot; at most ``args.max_backups`` (default 5) are kept.
    Every task whose status is ``completed`` is then compacted in place
    with ``_compact_task``.

    In dry-run mode nothing on disk is touched: no backup directory is
    created, no backup is rotated or deleted, and the plan is not saved.
    Only the "Would:" messages are printed. When ``quiet`` is set nothing
    is printed.
    """
    plan_path = Path(args.file)
    max_backups = getattr(args, "max_backups", 5)
    dry_run = _is_dry_run(args)

    backup_dir = Path(".claude/plan-view")
    plan_name = plan_path.stem  # "plan" from "plan.json"
    backup_path = backup_dir / f"{plan_name}.json.1"

    # 1-2. Rotate existing backups and store the current file as .json.1.
    # Rotation deletes the oldest backup, so it must not run in a dry run.
    if not dry_run:
        backup_dir.mkdir(parents=True, exist_ok=True)
        _rotate_backups(backup_dir, plan_name, max_backups)
        shutil.copy2(plan_path, backup_path)

    # 3. Compact completed tasks, counting only those actually changed.
    compacted = 0
    for phase in plan["phases"]:
        for task in phase["tasks"]:
            if task["status"] == "completed" and _compact_task(task):
                compacted += 1

    # 4. Persist the compacted plan.
    if not dry_run:
        save_plan(args.file, plan)

    if not getattr(args, "quiet", False):
        print(f"{_prefix(args)} Backed up to {backup_path}")
        print(f"{_prefix(args)} Compacted {compacted} completed task{'s' if compacted != 1 else ''}")