You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Home-AssistantConfig/codex_skills/homeassistant-yaml-dry-veri.../scripts/verify_ha_yaml_dry.py

494 lines
17 KiB

#!/usr/bin/env python
"""
Detect DRY violations in Home Assistant YAML by finding repeated structures.
Focus areas:
- Repeated trigger/condition/action blocks across automations
- Repeated sequence blocks across scripts
- Repeated entries inside those blocks
- Duplicate entries within a single block
"""
from __future__ import annotations
import argparse
import json
import os
import re
import sys
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Iterable
import yaml
class _Tagged(str):
"""Opaque holder for unknown YAML tags such as !include or !secret."""
class _Loader(yaml.SafeLoader):
pass
def _construct_undefined(loader: _Loader, node: yaml.Node) -> Any:
if isinstance(node, yaml.ScalarNode):
return _Tagged(f"{node.tag} {node.value}")
if isinstance(node, yaml.SequenceNode):
seq = loader.construct_sequence(node)
return _Tagged(f"{node.tag} {seq!r}")
if isinstance(node, yaml.MappingNode):
mapping = loader.construct_mapping(node)
return _Tagged(f"{node.tag} {mapping!r}")
return _Tagged(f"{node.tag}")
_Loader.add_constructor(None, _construct_undefined) # type: ignore[arg-type]
AUTOMATION_KEYS: dict[str, tuple[str, ...]] = {
"trigger": ("trigger", "triggers"),
"condition": ("condition", "conditions"),
"action": ("action", "actions"),
}
SCRIPT_KEYS: dict[str, tuple[str, ...]] = {
"sequence": ("sequence",),
}
SKIP_DIRS = {".git", ".venv", ".codex_tmp", "__pycache__", ".mypy_cache"}
@dataclass(frozen=True)
class Candidate:
kind: str # "automation" | "script"
name: str
file_path: str
path: str
data: dict[str, Any]
@dataclass(frozen=True)
class Occurrence:
file_path: str
candidate_name: str
candidate_path: str
block_path: str
@dataclass(frozen=True)
class ParseError:
file_path: str
error: str
@dataclass(frozen=True)
class CentralScriptFinding:
script_id: str
definition_files: tuple[str, ...]
caller_files: tuple[str, ...]
def _discover_yaml_files(paths: Iterable[str]) -> list[Path]:
found: set[Path] = set()
for raw in paths:
p = Path(raw)
if not p.exists():
continue
if p.is_file() and p.suffix.lower() in {".yaml", ".yml"}:
found.add(p.resolve())
continue
if p.is_dir():
for root, dirs, files in os.walk(p):
dirs[:] = [d for d in dirs if d not in SKIP_DIRS]
root_path = Path(root)
for name in files:
if Path(name).suffix.lower() in {".yaml", ".yml"}:
found.add((root_path / name).resolve())
return sorted(found)
def _load_yaml_docs(path: Path) -> list[Any]:
text = path.read_text(encoding="utf-8")
return list(yaml.load_all(text, Loader=_Loader))
def _looks_like_automation(v: Any) -> bool:
if not isinstance(v, dict):
return False
has_trigger = "trigger" in v or "triggers" in v
has_action = "action" in v or "actions" in v
return has_trigger and has_action
def _looks_like_script(v: Any) -> bool:
return isinstance(v, dict) and "sequence" in v
def _candidate_name(kind: str, v: dict[str, Any], fallback: str) -> str:
alias = v.get("alias")
if isinstance(alias, str) and alias.strip():
return alias.strip()
cid = v.get("id")
if isinstance(cid, str) and cid.strip():
return cid.strip()
return f"{kind}:{fallback}"
def _iter_container_items(container: Any) -> Iterable[tuple[str, dict[str, Any]]]:
if isinstance(container, list):
for idx, item in enumerate(container):
if isinstance(item, dict):
yield f"[{idx}]", item
return
if isinstance(container, dict):
for key, item in container.items():
if isinstance(item, dict):
yield f".{key}", item
def _extract_candidates_from_doc(doc: Any, file_path: str, doc_idx: int) -> list[Candidate]:
out: list[Candidate] = []
root_path = "$" if doc_idx == 0 else f"$doc[{doc_idx}]"
if isinstance(doc, list):
for suffix, item in _iter_container_items(doc):
if _looks_like_automation(item):
name = _candidate_name("automation", item, f"{root_path}{suffix}")
out.append(
Candidate(
kind="automation",
name=name,
file_path=file_path,
path=f"{root_path}{suffix}",
data=item,
)
)
return out
if not isinstance(doc, dict):
return out
if _looks_like_automation(doc):
name = _candidate_name("automation", doc, root_path)
out.append(Candidate("automation", name, file_path, root_path, doc))
if _looks_like_script(doc):
name = _candidate_name("script", doc, root_path)
out.append(Candidate("script", name, file_path, root_path, doc))
if "automation" in doc:
for suffix, item in _iter_container_items(doc["automation"]):
if _looks_like_automation(item):
name = _candidate_name("automation", item, f"{root_path}.automation{suffix}")
out.append(
Candidate(
kind="automation",
name=name,
file_path=file_path,
path=f"{root_path}.automation{suffix}",
data=item,
)
)
if "script" in doc:
for suffix, item in _iter_container_items(doc["script"]):
if _looks_like_script(item):
name = _candidate_name("script", item, f"{root_path}.script{suffix}")
out.append(
Candidate(
kind="script",
name=name,
file_path=file_path,
path=f"{root_path}.script{suffix}",
data=item,
)
)
if out:
return out
for key, item in doc.items():
if isinstance(item, dict) and _looks_like_automation(item):
name = _candidate_name("automation", item, f"{root_path}.{key}")
out.append(Candidate("automation", name, file_path, f"{root_path}.{key}", item))
if isinstance(item, dict) and _looks_like_script(item):
name = _candidate_name("script", item, f"{root_path}.{key}")
out.append(Candidate("script", name, file_path, f"{root_path}.{key}", item))
return out
def _normalize(value: Any) -> Any:
if isinstance(value, _Tagged):
return str(value)
if isinstance(value, dict):
normalized_items: list[tuple[str, Any]] = []
for k, v in value.items():
normalized_items.append((str(k), _normalize(v)))
normalized_items.sort(key=lambda i: i[0])
return {k: v for k, v in normalized_items}
if isinstance(value, list):
return [_normalize(v) for v in value]
return value
def _fingerprint(value: Any) -> str:
return json.dumps(_normalize(value), sort_keys=True, separators=(",", ":"), ensure_ascii=True)
def _first_present_key(mapping: dict[str, Any], aliases: tuple[str, ...]) -> str | None:
for key in aliases:
if key in mapping:
return key
return None
def _iter_entries(value: Any) -> Iterable[tuple[str, Any]]:
if isinstance(value, list):
for idx, entry in enumerate(value):
yield f"[{idx}]", entry
return
yield "", value
def _block_keys_for_candidate(candidate: Candidate) -> dict[str, tuple[str, ...]]:
if candidate.kind == "automation":
return AUTOMATION_KEYS
return SCRIPT_KEYS
def _recommendation(block_label: str) -> str:
if block_label in {"action", "sequence"}:
return "Move repeated logic to a shared script and call it with variables."
if block_label == "condition":
return "Extract shared condition logic into helpers/template sensors or merge condition blocks."
return "Consolidate repeated trigger patterns where behavior is equivalent."
def _render_occurrences(occurrences: list[Occurrence], max_rows: int = 6) -> str:
lines: list[str] = []
for occ in occurrences[:max_rows]:
lines.append(
f" - {occ.file_path} :: {occ.block_path} ({occ.candidate_name})"
)
if len(occurrences) > max_rows:
lines.append(f" - ... {len(occurrences) - max_rows} more")
return "\n".join(lines)
def _normalize_path(path: str) -> str:
return path.replace("\\", "/").lower()
def _infer_script_id(candidate: Candidate) -> str | None:
if candidate.kind != "script":
return None
marker = ".script."
if marker in candidate.path:
return candidate.path.split(marker, 1)[1]
if "/config/script/" in _normalize_path(candidate.file_path):
if candidate.path.startswith("$."):
return candidate.path[2:]
match = re.match(r"^\$doc\[\d+\]\.(.+)$", candidate.path)
if match:
return match.group(1)
return None
def _collect_script_service_calls(node: Any, script_ids: set[str]) -> None:
if isinstance(node, dict):
for key, value in node.items():
if key in {"service", "action"} and isinstance(value, str):
service_name = value.strip()
if service_name.startswith("script."):
script_id = service_name.split(".", 1)[1].strip()
if script_id:
script_ids.add(script_id)
_collect_script_service_calls(value, script_ids)
return
if isinstance(node, list):
for item in node:
_collect_script_service_calls(item, script_ids)
def main(argv: list[str]) -> int:
ap = argparse.ArgumentParser(description="Detect duplicated Home Assistant YAML structures.")
ap.add_argument("paths", nargs="+", help="YAML file(s) or directory path(s) to scan")
ap.add_argument("--min-occurrences", type=int, default=2, help="Minimum duplicate count to report (default: 2)")
ap.add_argument("--max-groups", type=int, default=50, help="Maximum duplicate groups to print (default: 50)")
ap.add_argument("--strict", action="store_true", help="Return non-zero when duplicates are found")
args = ap.parse_args(argv)
if args.min_occurrences < 2:
print("ERROR: --min-occurrences must be >= 2", file=sys.stderr)
return 2
files = _discover_yaml_files(args.paths)
if not files:
print("ERROR: no YAML files found for the provided paths", file=sys.stderr)
return 2
parse_errors: list[ParseError] = []
candidates: list[Candidate] = []
script_calls_by_id: dict[str, set[str]] = defaultdict(set)
for path in files:
try:
docs = _load_yaml_docs(path)
except Exception as exc:
parse_errors.append(ParseError(file_path=str(path), error=str(exc)))
continue
script_calls_in_file: set[str] = set()
for doc_idx, doc in enumerate(docs):
candidates.extend(_extract_candidates_from_doc(doc, str(path), doc_idx))
_collect_script_service_calls(doc, script_calls_in_file)
for script_id in script_calls_in_file:
script_calls_by_id[script_id].add(str(path))
full_index: dict[tuple[str, str, str], list[Occurrence]] = defaultdict(list)
entry_index: dict[tuple[str, str, str], list[Occurrence]] = defaultdict(list)
intra_duplicate_notes: list[str] = []
for candidate in candidates:
block_key_map = _block_keys_for_candidate(candidate)
for block_label, aliases in block_key_map.items():
source_key = _first_present_key(candidate.data, aliases)
if not source_key:
continue
block_value = candidate.data[source_key]
if block_value in (None, [], {}):
continue
block_fp = _fingerprint(block_value)
full_index[(candidate.kind, block_label, block_fp)].append(
Occurrence(
file_path=candidate.file_path,
candidate_name=candidate.name,
candidate_path=candidate.path,
block_path=f"{candidate.path}.{source_key}",
)
)
seen_in_candidate: dict[str, list[str]] = defaultdict(list)
for suffix, entry in _iter_entries(block_value):
entry_fp = _fingerprint(entry)
entry_occ = Occurrence(
file_path=candidate.file_path,
candidate_name=candidate.name,
candidate_path=candidate.path,
block_path=f"{candidate.path}.{source_key}{suffix}",
)
entry_index[(candidate.kind, block_label, entry_fp)].append(entry_occ)
seen_in_candidate[entry_fp].append(entry_occ.block_path)
for entry_fp, block_paths in seen_in_candidate.items():
if len(block_paths) >= args.min_occurrences:
intra_duplicate_notes.append(
(
f"INTRA {candidate.kind}.{block_label}: {candidate.name} has "
f"{len(block_paths)} duplicated entries in {candidate.path}.{source_key}"
)
)
def _filter_groups(index: dict[tuple[str, str, str], list[Occurrence]]) -> list[tuple[tuple[str, str, str], list[Occurrence]]]:
groups = [(k, v) for k, v in index.items() if len(v) >= args.min_occurrences]
groups.sort(key=lambda item: (-len(item[1]), item[0][0], item[0][1]))
return groups
full_groups = _filter_groups(full_index)
entry_groups = _filter_groups(entry_index)
intra_duplicate_notes = sorted(set(intra_duplicate_notes))
script_definitions_by_id: dict[str, set[str]] = defaultdict(set)
for candidate in candidates:
script_id = _infer_script_id(candidate)
if script_id:
script_definitions_by_id[script_id].add(candidate.file_path)
central_script_findings: list[CentralScriptFinding] = []
for script_id, definition_files in script_definitions_by_id.items():
normalized_definitions = {_normalize_path(path): path for path in definition_files}
if not any("/config/packages/" in n for n in normalized_definitions):
continue
if any("/config/script/" in n for n in normalized_definitions):
continue
caller_files = sorted(script_calls_by_id.get(script_id, set()))
if len(caller_files) < 2:
continue
central_script_findings.append(
CentralScriptFinding(
script_id=script_id,
definition_files=tuple(sorted(definition_files)),
caller_files=tuple(caller_files),
)
)
central_script_findings.sort(key=lambda item: (-len(item.caller_files), item.script_id))
print(f"Scanned files: {len(files)}")
print(f"Parsed candidates: {len(candidates)}")
print(f"Parse errors: {len(parse_errors)}")
print(f"Duplicate full-block groups: {len(full_groups)}")
print(f"Duplicate entry groups: {len(entry_groups)}")
print(f"Intra-block duplicates: {len(intra_duplicate_notes)}")
print(f"Central-script findings: {len(central_script_findings)}")
if parse_errors:
print("\nParse errors:")
for err in parse_errors:
print(f" - {err.file_path}: {err.error}")
if full_groups:
print("\nFULL_BLOCK findings:")
for idx, ((kind, block_label, _), occurrences) in enumerate(full_groups[: args.max_groups], start=1):
print(f"{idx}. {kind}.{block_label} repeated {len(occurrences)} times")
print(_render_occurrences(occurrences))
print(f" suggestion: {_recommendation(block_label)}")
if entry_groups:
print("\nENTRY findings:")
for idx, ((kind, block_label, _), occurrences) in enumerate(entry_groups[: args.max_groups], start=1):
print(f"{idx}. {kind}.{block_label} entry repeated {len(occurrences)} times")
print(_render_occurrences(occurrences))
print(f" suggestion: {_recommendation(block_label)}")
if intra_duplicate_notes:
print("\nINTRA findings:")
for idx, note in enumerate(intra_duplicate_notes[: args.max_groups], start=1):
print(f"{idx}. {note}")
if central_script_findings:
print("\nCENTRAL_SCRIPT findings:")
for idx, finding in enumerate(central_script_findings[: args.max_groups], start=1):
print(
f"{idx}. script.{finding.script_id} is package-defined and called from "
f"{len(finding.caller_files)} files"
)
for definition_file in finding.definition_files:
print(f" - definition: {definition_file}")
for caller_file in finding.caller_files[:6]:
print(f" - caller: {caller_file}")
if len(finding.caller_files) > 6:
print(f" - ... {len(finding.caller_files) - 6} more callers")
print(f" suggestion: Move definition to config/script/{finding.script_id}.yaml")
finding_count = (
len(full_groups)
+ len(entry_groups)
+ len(intra_duplicate_notes)
+ len(central_script_findings)
)
if args.strict and finding_count > 0:
return 1
if parse_errors:
return 2
return 0
if __name__ == "__main__":
raise SystemExit(main(sys.argv[1:]))

Powered by TurnKey Linux.