"""Edit commands for modifying plan data."""
import argparse
import contextlib
import json
import shutil
import sys
from pathlib import Path
from plan_view.commands.view import cmd_validate
from plan_view.decorators import require_plan
from plan_view.formatting import BOLD, DIM, RESET, VALID_STATUSES, now_iso
from plan_view.io import save_plan
from plan_view.state import (
find_phase,
find_task,
format_phase_suggestions,
format_task_suggestions,
)
def _is_dry_run(args: argparse.Namespace) -> bool:
    """Report whether the parsed arguments request dry-run mode.

    Returns the value of ``args.dry_run``, or ``False`` when the namespace
    does not define that attribute.
    """
    try:
        return args.dry_run
    except AttributeError:
        return False
def _prefix(args: argparse.Namespace) -> str:
    """Pick the marker that starts a status message.

    Dry runs are announced with "Would:"; real runs use a check mark.
    """
    if _is_dry_run(args):
        return "Would:"
    return "✅"
[docs]
def cmd_init(args: argparse.Namespace) -> None:
    """Create a new plan.json.

    Reads ``args.file`` (a path object), ``args.force`` and ``args.name``;
    ``args.quiet`` and ``args.dry_run`` are optional.

    When the file already exists and ``--force`` is given, the current file
    is copied to ``.claude/plan-view/<stem>.json.1`` (older backups are
    rotated first) and ``periodic.json`` in that directory is removed.

    In dry-run mode nothing is created, rotated, copied or deleted; only the
    "Would:" messages are printed.

    Raises:
        SystemExit: with status 1 if the file exists and ``--force`` is unset.
    """
    path = args.file
    dry_run = _is_dry_run(args)
    quiet = getattr(args, "quiet", False)
    already_exists = path.exists()

    if already_exists and not args.force:
        print(f"Error: {path} already exists. Use --force to overwrite.", file=sys.stderr)
        sys.exit(1)

    if already_exists:
        backup_dir = Path(".claude/plan-view")
        backup_path = backup_dir / f"{path.stem}.json.1"
        # A dry run must not touch the filesystem: the message below says
        # "Would:", so the backup must not actually be taken.
        if not dry_run:
            backup_dir.mkdir(parents=True, exist_ok=True)
            _rotate_backups(backup_dir, path.stem)
            shutil.copy2(path, backup_path)
            # Reset periodic backup counter
            periodic_path = backup_dir / "periodic.json"
            if periodic_path.exists():
                periodic_path.unlink()
        if not quiet:
            print(f"{_prefix(args)} Backed up existing plan to {backup_path}")

    # One timestamp for both fields so a fresh plan has created_at == updated_at.
    timestamp = now_iso()
    plan = {
        "meta": {
            "project": args.name,
            "version": "1.0.0",
            "created_at": timestamp,
            "updated_at": timestamp,
            "business_plan_path": ".claude/BUSINESS_PLAN.md",
        },
        "summary": {
            "total_phases": 2,
            "total_tasks": 0,
            "completed_tasks": 0,
            "overall_progress": 0,
        },
        "phases": [
            {
                "id": "deferred",
                "name": "Deferred",
                "description": "Tasks postponed for later consideration",
                "status": "pending",
                "progress": {"completed": 0, "total": 0, "percentage": 0},
                "tasks": [],
            },
            {
                "id": "bugs",
                "name": "Bugs",
                "description": "Tasks identified as bugs requiring fixes",
                "status": "pending",
                "progress": {"completed": 0, "total": 0, "percentage": 0},
                "tasks": [],
            },
        ],
    }

    if not dry_run:
        save_plan(path, plan)
    if not quiet:
        print(f"{_prefix(args)} Created {path} for '{args.name}'")
[docs]
@require_plan
def cmd_add_phase(plan: dict, args: argparse.Namespace) -> None:
    """Append a new, empty phase to the plan.

    The phase ID is one more than the highest all-digit phase ID already in
    the plan ("0" when there is none); phases with non-numeric IDs are
    ignored when numbering. The name comes from ``args.name`` and the
    description from ``args.desc`` (empty string when falsy).
    """
    # Highest numeric phase ID seen so far; -1 makes the first phase "0".
    highest = -1
    for existing in plan["phases"]:
        if existing["id"].isdigit():
            highest = max(highest, int(existing["id"]))
    next_id = str(highest + 1)

    plan["phases"].append(
        {
            "id": next_id,
            "name": args.name,
            "description": args.desc or "",
            "status": "pending",
            "progress": {"completed": 0, "total": 0, "percentage": 0},
            "tasks": [],
        }
    )

    dry_run = _is_dry_run(args)
    if not dry_run:
        save_plan(args.file, plan)

    quiet = getattr(args, "quiet", False)
    if not quiet:
        print(f"{_prefix(args)} Added Phase {next_id}: {args.name}")
[docs]
@require_plan
def cmd_add_task(plan: dict, args: argparse.Namespace) -> None:
    """Add a new task to a phase.

    Looks up the phase given by ``args.phase`` and appends a pending task
    built from ``args.title``, ``args.agent``, ``args.deps`` (comma-separated
    task IDs) and the optional ``args.skill`` / ``args.files``
    (comma-separated paths).

    The new ID is ``<phase>.<section>.<n>``: it continues after the highest
    ``(section, task)`` pair found among the phase's existing task IDs, or is
    ``<phase>.1.1`` when no existing ID has a numeric section/task pair.

    Raises:
        SystemExit: with status 1 if the phase is not found.
    """
    phase = find_phase(plan, args.phase)
    if phase is None:
        print(f"Error: Phase '{args.phase}' not found\n", file=sys.stderr)
        print(format_phase_suggestions(plan), file=sys.stderr)
        sys.exit(1)

    phase_id = phase["id"]
    # setdefault (not .get) so a phase stored without "tasks" gains a list
    # that the append below actually lands in.
    tasks = phase.setdefault("tasks", [])

    # Highest (section, task) pair among well-formed IDs. IDs with fewer than
    # three parts or non-numeric parts are ignored rather than crashing.
    latest = None
    for existing in tasks:
        parts = str(existing["id"]).split(".")
        if len(parts) < 3:
            continue
        with contextlib.suppress(ValueError):
            candidate = (int(parts[1]), int(parts[2]))
            if latest is None or candidate > latest:
                latest = candidate

    if latest is None:
        # No usable numbering yet (empty phase or only malformed IDs).
        next_id = f"{phase_id}.1.1"
    else:
        next_id = f"{phase_id}.{latest[0]}.{latest[1] + 1}"

    # Comma-separated inputs: trim whitespace and drop empty entries so
    # "a, b" and "a,b," both yield clean values.
    files_arg = getattr(args, "files", None)
    files_list = [f.strip() for f in files_arg.split(",") if f.strip()] if files_arg else []
    deps_list = [d.strip() for d in args.deps.split(",") if d.strip()] if args.deps else []

    task = {
        "id": next_id,
        "title": args.title,
        "status": "pending",
        "agent_type": args.agent,
        "skill": getattr(args, "skill", None),
        "depends_on": deps_list,
        "files": files_list,
        "tracking": {},
    }
    tasks.append(task)

    if not _is_dry_run(args):
        save_plan(args.file, plan)
    if not getattr(args, "quiet", False):
        print(f"{_prefix(args)} Added [{next_id}] {args.title}")
[docs]
@require_plan
def cmd_set(plan: dict, args: argparse.Namespace) -> None:
    """Set a task field.

    ``args.id`` selects the task, ``args.field`` the field and ``args.value``
    the new value. Supported fields:

    - ``status``: must be one of ``VALID_STATUSES``. "in_progress" records
      ``tracking.started_at``; "completed" records ``tracking.completed_at``
      and marks every subtask completed.
    - ``agent``, ``skill``, ``research``, ``plan``: stored as given; the
      value "none" clears the field to ``None``. ``agent`` is stored under
      the ``agent_type`` key.
    - ``title``: stored as given.
    - ``files``: comma-separated list; "none" or "" clears it.

    Raises:
        SystemExit: with status 1 for an unknown task, field or status.
    """
    result = find_task(plan, args.id)
    if result is None:
        print(f"Error: Task '{args.id}' not found\n", file=sys.stderr)
        print(format_task_suggestions(plan), file=sys.stderr)
        sys.exit(1)
    _, task = result

    field, value = args.field, args.value
    # CLI field name -> task key, for fields where "none" means "clear".
    clearable = {"agent": "agent_type", "skill": "skill", "research": "research", "plan": "plan"}

    if field == "status":
        if value not in VALID_STATUSES:
            print(f"Error: Invalid status. Use: {', '.join(VALID_STATUSES)}", file=sys.stderr)
            sys.exit(1)
        task["status"] = value
        # Tasks may lack a "tracking" dict, so create it on demand instead of
        # failing with KeyError.
        if value == "in_progress":
            task.setdefault("tracking", {})["started_at"] = now_iso()
        elif value == "completed":
            task.setdefault("tracking", {})["completed_at"] = now_iso()
            # Cascade to subtasks
            for subtask in task.get("subtasks", []):
                subtask["status"] = "completed"
    elif field in clearable:
        task[clearable[field]] = value if value != "none" else None
    elif field == "title":
        task["title"] = value
    elif field == "files":
        # Comma-separated list of file paths with optional line refs
        if value in ("none", ""):
            task["files"] = []
        else:
            # Trim whitespace and drop empty entries (e.g. from a trailing comma).
            task["files"] = [f.strip() for f in value.split(",") if f.strip()]
    else:
        valid_fields = "status, agent, skill, title, files, research, plan"
        print(f"Error: Unknown field '{field}'. Use: {valid_fields}", file=sys.stderr)
        sys.exit(1)

    if not _is_dry_run(args):
        save_plan(args.file, plan)
    if not getattr(args, "quiet", False):
        print(f"{_prefix(args)} [{args.id}] {field} → {value}")
[docs]
def cmd_done(args: argparse.Namespace) -> None:
    """Shortcut for setting a task's status to "completed".

    Fills in ``args.field`` and ``args.value`` on the namespace, then
    delegates to ``cmd_set``.
    """
    args.field, args.value = "status", "completed"
    cmd_set(args)
[docs]
def cmd_start(args: argparse.Namespace) -> None:
    """Shortcut for setting a task's status to "in_progress".

    Fills in ``args.field`` and ``args.value`` on the namespace, then
    delegates to ``cmd_set``.
    """
    args.field, args.value = "status", "in_progress"
    cmd_set(args)
[docs]
def cmd_block(args: argparse.Namespace) -> None:
    """Shortcut for setting a task's status to "blocked".

    Fills in ``args.field`` and ``args.value`` on the namespace, then
    delegates to ``cmd_set``.
    """
    args.field, args.value = "status", "blocked"
    cmd_set(args)
[docs]
def cmd_skip(args: argparse.Namespace) -> None:
    """Shortcut for setting a task's status to "skipped".

    Fills in ``args.field`` and ``args.value`` on the namespace, then
    delegates to ``cmd_set``.
    """
    args.field, args.value = "status", "skipped"
    cmd_set(args)
[docs]
@require_plan
def cmd_defer(plan: dict, args: argparse.Namespace) -> None:
    """Move task to deferred phase, or create a new deferred task if input is not an existing task ID.

    ``args.id`` is first treated as a task ID. If a task with that ID exists
    it is removed from its current phase, renumbered ``deferred.1.<n>`` and
    appended to the "deferred" phase. Otherwise a new pending task is created
    there with ``args.id`` as its title. The "deferred" phase is created when
    the plan does not have one.

    An optional ``args.reason`` containing non-whitespace text is stored as
    ``tracking.defer_reason`` on the task.
    """
    # Find or create deferred phase
    deferred = find_phase(plan, "deferred")
    if deferred is None:
        deferred = {
            "id": "deferred",
            "name": "Deferred",
            "description": "Tasks postponed for later consideration",
            "status": "pending",
            "progress": {"completed": 0, "total": 0, "percentage": 0},
            "tasks": [],
        }
        plan["phases"].append(deferred)

    # setdefault (not .get) so a phase stored without "tasks" gains a list,
    # and the list scanned for IDs is the same one appended to below.
    deferred_tasks = deferred.setdefault("tasks", [])

    # Next ID continues after the highest third ID component; IDs that are
    # too short or non-numeric are ignored.
    max_task = 0
    for existing in deferred_tasks:
        parts = str(existing["id"]).split(".")
        if len(parts) >= 3:
            with contextlib.suppress(ValueError):
                max_task = max(max_task, int(parts[2]))
    new_id = f"deferred.1.{max_task + 1}"

    # Get defer reason if provided (only store non-empty strings)
    defer_reason = getattr(args, "reason", None)
    if defer_reason and not defer_reason.strip():
        defer_reason = None

    result = find_task(plan, args.id)
    if result is not None:
        # Move existing task to deferred
        old_phase, task = result
        old_phase["tasks"].remove(task)
        old_id = task["id"]
        task["id"] = new_id
        if defer_reason:
            # The moved task may have no "tracking" dict yet; create it on demand.
            task.setdefault("tracking", {})["defer_reason"] = defer_reason
        message = f"[{old_id}] → [{new_id}] (deferred)"
    else:
        # Create new deferred task with input as title
        task = {
            "id": new_id,
            "title": args.id,  # Use input as title
            "status": "pending",
            "agent_type": None,
            "skill": None,
            "depends_on": [],
            "tracking": {"defer_reason": defer_reason} if defer_reason else {},
        }
        message = f"Added [{new_id}] {args.id} (deferred)"

    deferred_tasks.append(task)
    if not _is_dry_run(args):
        save_plan(args.file, plan)
    if not getattr(args, "quiet", False):
        print(f"{_prefix(args)} {message}")
[docs]
@require_plan
def cmd_bug(plan: dict, args: argparse.Namespace) -> None:
    """Move task to bugs phase, or create a new bug if input is not an existing task ID.

    ``args.id`` is first treated as a task ID. If a task with that ID exists
    it is removed from its current phase, renumbered ``bugs.1.<n>`` and
    appended to the "bugs" phase. Otherwise a new pending task is created
    there with ``args.id`` as its title. The "bugs" phase is created when the
    plan does not have one.
    """
    # Find or create bugs phase
    bugs = find_phase(plan, "bugs")
    if bugs is None:
        bugs = {
            "id": "bugs",
            "name": "Bugs",
            "description": "Tasks identified as bugs requiring fixes",
            "status": "pending",
            "progress": {"completed": 0, "total": 0, "percentage": 0},
            "tasks": [],
        }
        plan["phases"].append(bugs)

    # setdefault (not .get) so a phase stored without "tasks" gains a list,
    # and the list scanned for IDs is the same one appended to below.
    bug_tasks = bugs.setdefault("tasks", [])

    # Next ID continues after the highest third ID component; IDs that are
    # too short or non-numeric are ignored.
    max_task = 0
    for existing in bug_tasks:
        parts = str(existing["id"]).split(".")
        if len(parts) >= 3:
            with contextlib.suppress(ValueError):
                max_task = max(max_task, int(parts[2]))
    new_id = f"bugs.1.{max_task + 1}"

    result = find_task(plan, args.id)
    if result is not None:
        # Move existing task to bugs
        old_phase, task = result
        old_phase["tasks"].remove(task)
        old_id = task["id"]
        task["id"] = new_id
        message = f"[{old_id}] → [{new_id}] (bug)"
    else:
        # Create new bug task with input as title
        task = {
            "id": new_id,
            "title": args.id,  # Use input as title
            "status": "pending",
            "agent_type": None,
            "skill": None,
            "depends_on": [],
            "tracking": {},
        }
        message = f"Added [{new_id}] {args.id} (bug)"

    bug_tasks.append(task)
    if not _is_dry_run(args):
        save_plan(args.file, plan)
    if not getattr(args, "quiet", False):
        print(f"{_prefix(args)} {message}")
[docs]
@require_plan
def cmd_idea(plan: dict, args: argparse.Namespace) -> None:
    """Move task to ideas phase, or create a new idea if input is not an existing task ID.

    ``args.id`` is first treated as a task ID. If a task with that ID exists
    it is removed from its current phase, renumbered ``ideas.1.<n>`` and
    appended to the "ideas" phase. Otherwise a new pending task is created
    there with ``args.id`` as its title. The "ideas" phase is created when
    the plan does not have one.
    """
    # Find or create ideas phase
    ideas = find_phase(plan, "ideas")
    if ideas is None:
        ideas = {
            "id": "ideas",
            "name": "Ideas",
            "description": "Tasks stored as future ideas or concepts",
            "status": "pending",
            "progress": {"completed": 0, "total": 0, "percentage": 0},
            "tasks": [],
        }
        plan["phases"].append(ideas)

    # setdefault (not .get) so a phase stored without "tasks" gains a list,
    # and the list scanned for IDs is the same one appended to below.
    idea_tasks = ideas.setdefault("tasks", [])

    # Next ID continues after the highest third ID component; IDs that are
    # too short or non-numeric are ignored.
    max_task = 0
    for existing in idea_tasks:
        parts = str(existing["id"]).split(".")
        if len(parts) >= 3:
            with contextlib.suppress(ValueError):
                max_task = max(max_task, int(parts[2]))
    new_id = f"ideas.1.{max_task + 1}"

    result = find_task(plan, args.id)
    if result is not None:
        # Move existing task to ideas
        old_phase, task = result
        old_phase["tasks"].remove(task)
        old_id = task["id"]
        task["id"] = new_id
        message = f"[{old_id}] → [{new_id}] (idea)"
    else:
        # Create new idea task with input as title
        task = {
            "id": new_id,
            "title": args.id,  # Use input as title
            "status": "pending",
            "agent_type": None,
            "skill": None,
            "depends_on": [],
            "tracking": {},
        }
        message = f"Added [{new_id}] {args.id} (idea)"

    idea_tasks.append(task)
    if not _is_dry_run(args):
        save_plan(args.file, plan)
    if not getattr(args, "quiet", False):
        print(f"{_prefix(args)} {message}")
[docs]
@require_plan
def cmd_rm(plan: dict, args: argparse.Namespace) -> None:
    """Delete one task or one whole phase from the plan.

    ``args.type`` chooses what ``args.id`` refers to: "task" removes that
    task from the phase holding it; any other value is handled as a phase
    and removes it from ``plan["phases"]``. An unknown ID prints an error
    with suggestions to stderr and exits with status 1.
    """
    if args.type == "task":
        found = find_task(plan, args.id)
        if found is None:
            print(f"Error: Task '{args.id}' not found\n", file=sys.stderr)
            print(format_task_suggestions(plan), file=sys.stderr)
            sys.exit(1)
        assert found is not None
        parent, doomed = found
        parent["tasks"].remove(doomed)
        summary = f"Removed task [{args.id}]"
    else:  # args.type == "phase" (argparse enforces this)
        doomed = find_phase(plan, args.id)
        if doomed is None:
            print(f"Error: Phase '{args.id}' not found\n", file=sys.stderr)
            print(format_phase_suggestions(plan), file=sys.stderr)
            sys.exit(1)
        assert doomed is not None
        plan["phases"].remove(doomed)
        summary = f"Removed phase {args.id}"

    # Shared tail: save unless this is a dry run, then report unless quiet.
    if not _is_dry_run(args):
        save_plan(args.file, plan)
    if not getattr(args, "quiet", False):
        print(f"{_prefix(args)} {summary}")
def _rotate_backups(backup_dir: Path, plan_name: str, max_backups: int = 5) -> None:
    """Shift numbered backups up one slot and drop the one in the last slot.

    ``<plan_name>.json.N`` is renamed to ``<plan_name>.json.N+1`` for every
    existing N below ``max_backups``; an existing file at slot
    ``max_backups`` is deleted instead.
    """

    def slot(number: int) -> Path:
        return backup_dir / f"{plan_name}.json.{number}"

    # Highest slot first, so each rename targets a slot already cleared.
    for number in reversed(range(1, max_backups + 1)):
        current = slot(number)
        if not current.exists():
            continue
        if number == max_backups:
            current.unlink()  # oldest backup falls off the end
        else:
            current.rename(slot(number + 1))
def _compact_task(task: dict) -> bool:
    """Reduce a task dict, in place, to its id/title/status/tracking keys.

    Every other key is deleted, and a non-empty ``tracking`` dict is replaced
    by one holding only ``completed_at`` (or by an empty dict when that
    timestamp is missing or falsy).

    Returns True when the call changed the task, False otherwise.
    """
    essential = ("id", "title", "status", "tracking")
    extras = [name for name in task if name not in essential]
    for name in extras:
        task.pop(name)
    changed = len(extras) > 0

    previous = task.get("tracking")
    if not previous:
        # Missing or already-empty tracking: nothing more to trim.
        return changed

    stamp = previous.get("completed_at")
    slimmed = {"completed_at": stamp} if stamp else {}
    task["tracking"] = slimmed
    return changed or slimmed != previous
[docs]
@require_plan
def cmd_compact(plan: dict, args: argparse.Namespace) -> None:
    """Backup plan and compact completed tasks to minimal fields.

    The plan file at ``args.file`` is copied to
    ``.claude/plan-view/<stem>.json.1`` after rotating older backups (at most
    ``args.max_backups``, default 5). Every task whose status is "completed"
    is then stripped with ``_compact_task`` and the plan is saved.

    In dry-run mode no backup is rotated or written and the plan is not
    saved; only the "Would:" messages are printed.
    """
    plan_path = Path(args.file)
    max_backups = getattr(args, "max_backups", 5)
    dry_run = _is_dry_run(args)

    # 1. Rotate existing backups and save the current plan as .json.1.
    #    Skipped on a dry run: the message below says "Would:", so the
    #    filesystem must stay untouched.
    backup_dir = Path(".claude/plan-view")
    plan_name = plan_path.stem  # "plan" from "plan.json"
    backup_path = backup_dir / f"{plan_name}.json.1"
    if not dry_run:
        backup_dir.mkdir(parents=True, exist_ok=True)
        _rotate_backups(backup_dir, plan_name, max_backups)
        shutil.copy2(plan_path, backup_path)

    # 2. Compact completed tasks (in memory), counting the ones that changed.
    compacted = 0
    for phase in plan["phases"]:
        for task in phase["tasks"]:
            if task["status"] == "completed" and _compact_task(task):
                compacted += 1

    # 3. Save compacted plan
    if not dry_run:
        save_plan(args.file, plan)
    if not getattr(args, "quiet", False):
        print(f"{_prefix(args)} Backed up to {backup_path}")
        print(f"{_prefix(args)} Compacted {compacted} completed task{'s' if compacted != 1 else ''}")
[docs]
@require_plan
def cmd_reconcile(plan: dict, args: argparse.Namespace) -> None:
    """Repair inconsistent task data in the plan, then run validation.

    Four passes run in order over every task of every phase:

    1. subtasks of completed tasks are marked completed;
    2. completed tasks without ``tracking.completed_at`` get one;
    3. in-progress tasks without ``tracking.started_at`` get one;
    4. tasks missing ``tracking`` or ``depends_on`` get empty ones.

    Each repair is reported unless ``args.quiet`` is set. The plan is saved
    only if something was repaired and this is not a dry run. ``cmd_validate``
    is called at the end in every case.
    """
    quiet = getattr(args, "quiet", False)
    all_tasks = [task for phase in plan["phases"] for task in phase["tasks"]]
    fixes = []

    # 1. Cascade subtask status for completed tasks
    for task in all_tasks:
        if task["status"] != "completed":
            continue
        for subtask in task.get("subtasks", []):
            if subtask["status"] == "completed":
                continue
            subtask["status"] = "completed"
            fixes.append(f" Marked subtask {subtask['id']} as completed (parent {task['id']} is completed)")

    # 2 + 3. Completed tasks need completed_at; in-progress tasks need started_at
    for status, stamp_key in (("completed", "completed_at"), ("in_progress", "started_at")):
        for task in all_tasks:
            if task["status"] != status:
                continue
            if task.get("tracking", {}).get(stamp_key):
                continue
            task.setdefault("tracking", {})[stamp_key] = now_iso()
            fixes.append(f" Added {stamp_key} timestamp to task {task['id']}")

    # 4. Ensure all tasks have tracking and depends_on fields.
    #    Factories, not literals, so every task gets its own empty container.
    for task in all_tasks:
        for key, make_empty in (("tracking", dict), ("depends_on", list)):
            if key in task:
                continue
            task[key] = make_empty()
            fixes.append(f" Added {key} field to task {task['id']}")

    # Report fixes
    if not quiet:
        if fixes:
            count = len(fixes)
            print(f"Reconciled {count} issue{'s' if count != 1 else ''}:")
            print(*fixes, sep="\n")
        else:
            print("No issues found.")

    # Save if there were fixes
    if fixes and not _is_dry_run(args):
        save_plan(args.file, plan)
        if not quiet:
            print(f"\nSaved changes to {args.file}")

    # Run validation
    if not quiet:
        print("\nValidating...")
    cmd_validate(plan, args.file, as_json=getattr(args, "json", False))
def _list_restore_points(backup_dir: Path) -> list[dict]:
    """Discover available restore points in backup_dir.

    Two kinds of files are recognised:

    - full backups named ``plan.json.<n>``;
    - delta patches named ``plan.delta.<n>.json``.

    Files matched by the glob whose ``<n>`` is not an integer (for example
    ``plan.json.1.bak``) are skipped. A file whose content is not valid JSON,
    or not a JSON object with the expected shape, is still listed but gets a
    generic label.

    NOTE(review): the patterns are fixed to the "plan" stem, while the
    commands in this module write backups under the plan file's own stem;
    confirm whether other plan file names need to be supported here.

    Returns list of dicts with keys: path, label, size, modified, type, index.
    Sorted newest-first by modification time.
    """
    points: list[dict] = []

    # Full backups: plan.json.1, plan.json.2, ...
    for path in sorted(backup_dir.glob("plan.json.[0-9]*")):
        suffix = path.name.split(".")[-1]
        try:
            index = int(suffix)
        except ValueError:
            # "[0-9]*" also matches names such as "plan.json.1.bak".
            continue
        stat = path.stat()
        try:
            meta = json.loads(path.read_text(encoding="utf-8")).get("meta", {})
            project = meta.get("project", "?")
            updated = meta.get("updated_at", "?")
        except (ValueError, AttributeError):
            # ValueError: invalid JSON or undecodable bytes.
            # AttributeError: top level or "meta" is not a JSON object.
            label = f"Full backup #{suffix}"
        else:
            label = f"Full backup #{suffix} — {project} (updated {updated})"
        points.append({
            "path": path,
            "label": label,
            "size": stat.st_size,
            "modified": stat.st_mtime,
            "type": "full",
            "index": index,
        })

    # Delta backups: plan.delta.1.json, plan.delta.2.json, ...
    for path in sorted(backup_dir.glob("plan.delta.[0-9]*.json")):
        parts = path.stem.split(".")
        try:
            suffix = parts[2]
            index = int(suffix)
        except (IndexError, ValueError):
            # Not a "plan.delta.<integer>.json" name after all.
            continue
        stat = path.stat()
        try:
            patch_ops = json.loads(path.read_text(encoding="utf-8"))
        except ValueError:
            label = f"Delta patch #{suffix}"
        else:
            op_count = len(patch_ops) if isinstance(patch_ops, list) else 0
            label = f"Delta patch #{suffix} — {op_count} operation{'s' if op_count != 1 else ''}"
        points.append({
            "path": path,
            "label": label,
            "size": stat.st_size,
            "modified": stat.st_mtime,
            "type": "delta",
            "index": index,
        })

    # Sort by modification time, newest first
    points.sort(key=lambda p: p["modified"], reverse=True)
    return points
[docs]
def cmd_restore(args: argparse.Namespace) -> None:
    """List restore points and restore a chosen backup.

    Without ``args.point`` the restore points found in ``.claude/plan-view``
    are printed, numbered from 1, and nothing is changed. With ``args.point``
    the selected full backup replaces the plan file at ``args.file``; the
    current plan file, if any, is first copied to ``<stem>.json.1`` in the
    backup directory (older backups are rotated) and ``periodic.json`` is
    removed afterwards.

    In dry-run mode no file is rotated, copied, written or deleted; only the
    "Would:" messages are printed.

    Raises:
        SystemExit: with status 1 when there is no backup directory or
            restore point, when the choice is not a number in range, or when
            the chosen point is a delta patch.
    """
    plan_path = Path(args.file)
    backup_dir = Path(".claude/plan-view")
    quiet = getattr(args, "quiet", False)
    dry_run = _is_dry_run(args)

    if not backup_dir.exists():
        print("No backup directory found (.claude/plan-view/)", file=sys.stderr)
        sys.exit(1)
    points = _list_restore_points(backup_dir)
    if not points:
        print("No restore points found.", file=sys.stderr)
        sys.exit(1)

    # List mode (no --point given)
    choice = getattr(args, "point", None)
    if choice is None:
        print(f"{BOLD}Available restore points:{RESET}\n")
        for idx, pt in enumerate(points, 1):
            print(f" {BOLD}{idx}{RESET}) [{DIM}{pt['type']}{RESET}] {pt['label']}")
        print(f"\n{DIM}Usage: pv restore <number>{RESET}")
        return

    # Restore mode: the choice is the 1-based number shown in list mode.
    try:
        idx = int(choice) - 1
    except ValueError:
        print(f"Error: '{choice}' is not a valid number", file=sys.stderr)
        sys.exit(1)
    if idx < 0 or idx >= len(points):
        print(f"Error: choice must be between 1 and {len(points)}", file=sys.stderr)
        sys.exit(1)
    selected = points[idx]
    if selected["type"] == "delta":
        print("Error: Delta patches cannot be restored directly. Choose a full backup.", file=sys.stderr)
        sys.exit(1)

    # Read restore content before rotation moves the file
    restore_content = selected["path"].read_bytes()

    # Back up current plan before restoring
    if plan_path.exists():
        plan_name = plan_path.stem
        pre_restore_path = backup_dir / f"{plan_name}.json.1"
        if dry_run:
            # Rotating on a dry run would renumber the restore points without
            # restoring anything, so only announce what would happen.
            backup_note = f"{_prefix(args)} Backed up current plan to {pre_restore_path}"
        else:
            backup_dir.mkdir(parents=True, exist_ok=True)
            _rotate_backups(backup_dir, plan_name)
            shutil.copy2(plan_path, pre_restore_path)
            backup_note = f"Backed up current plan to {pre_restore_path}"
        if not quiet:
            print(backup_note)

    # Write the restore content over the current plan
    if not dry_run:
        plan_path.write_bytes(restore_content)
        # Reset periodic counter since we changed the plan
        periodic_path = backup_dir / "periodic.json"
        if periodic_path.exists():
            periodic_path.unlink()
    if not quiet:
        print(f"{_prefix(args)} Restored from: {selected['label']}")