#!/usr/bin/env python
"""
Detect DRY violations in Home Assistant YAML by finding repeated structures.

Focus areas:
- Repeated trigger/condition/action blocks across automations
- Repeated sequence blocks across scripts
- Repeated entries inside those blocks
- Duplicate entries within a single block
"""

from __future__ import annotations

import argparse
import json
import os
import re
import sys
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Iterable

import yaml


class _Tagged(str):
    """Opaque holder for unknown YAML tags such as !include or !secret."""


class _Loader(yaml.SafeLoader):
    """SafeLoader subclass so the catch-all constructor stays local to this tool."""


def _construct_undefined(loader: _Loader, node: yaml.Node) -> Any:
    """Turn a node carrying an unregistered tag into a ``_Tagged`` string.

    The resulting text is ``"<tag> <value>"`` for scalars and
    ``"<tag> <repr of the constructed children>"`` for sequences and mappings,
    so the tag and its payload both take part in later comparisons.

    Args:
        loader: The loader that is constructing the document.
        node: The YAML node whose tag has no registered constructor.

    Returns:
        A ``_Tagged`` instance describing the node.
    """
    if isinstance(node, yaml.ScalarNode):
        return _Tagged(f"{node.tag} {node.value}")
    if isinstance(node, yaml.SequenceNode):
        # deep=True: nested containers are otherwise filled in lazily by
        # PyYAML, so the repr below would be taken while they are still empty.
        seq = loader.construct_sequence(node, deep=True)
        return _Tagged(f"{node.tag} {seq!r}")
    if isinstance(node, yaml.MappingNode):
        # Same reasoning as for sequences: build children before taking repr.
        mapping = loader.construct_mapping(node, deep=True)
        return _Tagged(f"{node.tag} {mapping!r}")
    return _Tagged(f"{node.tag}")


# Registering under ``None`` makes this the fallback for every unknown tag.
_Loader.add_constructor(None, _construct_undefined)  # type: ignore[arg-type]

# Canonical block label -> accepted key spellings inside an automation.
AUTOMATION_KEYS: dict[str, tuple[str, ...]] = {
    "trigger": ("trigger", "triggers"),
    "condition": ("condition", "conditions"),
    "action": ("action", "actions"),
}

# Canonical block label -> accepted key spellings inside a script.
SCRIPT_KEYS: dict[str, tuple[str, ...]] = {
    "sequence": ("sequence",),
}

# Directory names that are pruned while walking a directory tree.
SKIP_DIRS = {".git", ".venv", ".codex_tmp", "__pycache__", ".mypy_cache"}


@dataclass(frozen=True)
class Candidate:
    """An automation or script mapping found in a YAML document."""

    kind: str  # "automation" | "script"
    name: str  # alias, id, or a "<kind>:<path>" fallback
    file_path: str  # file the candidate was read from
    path: str  # location inside the document, e.g. "$.script.foo"
    data: dict[str, Any]  # the candidate mapping itself


@dataclass(frozen=True)
class Occurrence:
    """One place where a fingerprinted block or entry was seen."""

    file_path: str
    candidate_name: str
    candidate_path: str
    block_path: str


@dataclass(frozen=True)
class ParseError:
    """A file that could not be loaded, with the error text."""

    file_path: str
    error: str


@dataclass(frozen=True)
class CentralScriptFinding:
    """A script ID together with the files defining it and the files calling it."""

    script_id: str
    definition_files: tuple[str, ...]
    caller_files: tuple[str, ...]
def _discover_yaml_files(paths: Iterable[str]) -> list[Path]:
    """Return the sorted, de-duplicated YAML files reachable from *paths*.

    Each path may be a file or a directory. Missing paths are ignored,
    directories are walked recursively with ``SKIP_DIRS`` pruned, and only
    ``.yaml`` / ``.yml`` files (case-insensitive) are kept. Results are
    resolved paths.
    """
    yaml_suffixes = {".yaml", ".yml"}
    found: set[Path] = set()
    for raw in paths:
        p = Path(raw)
        if not p.exists():
            continue
        if p.is_file() and p.suffix.lower() in yaml_suffixes:
            found.add(p.resolve())
            continue
        if p.is_dir():
            for root, dirs, files in os.walk(p):
                # In-place assignment so os.walk does not descend into them.
                dirs[:] = [d for d in dirs if d not in SKIP_DIRS]
                root_path = Path(root)
                for name in files:
                    if Path(name).suffix.lower() in yaml_suffixes:
                        found.add((root_path / name).resolve())
    return sorted(found)


def _load_yaml_docs(path: Path) -> list[Any]:
    """Read *path* as UTF-8 and return every YAML document it contains."""
    text = path.read_text(encoding="utf-8")
    return list(yaml.load_all(text, Loader=_Loader))


def _looks_like_automation(v: Any) -> bool:
    """Return True for a mapping that has both a trigger key and an action key."""
    if not isinstance(v, dict):
        return False
    has_trigger = "trigger" in v or "triggers" in v
    has_action = "action" in v or "actions" in v
    return has_trigger and has_action


def _looks_like_script(v: Any) -> bool:
    """Return True for a mapping that has a ``sequence`` key."""
    return isinstance(v, dict) and "sequence" in v


def _candidate_name(kind: str, v: dict[str, Any], fallback: str) -> str:
    """Pick a display name: non-blank ``alias``, else non-blank ``id``, else ``kind:fallback``."""
    alias = v.get("alias")
    if isinstance(alias, str) and alias.strip():
        return alias.strip()
    cid = v.get("id")
    if isinstance(cid, str) and cid.strip():
        return cid.strip()
    return f"{kind}:{fallback}"


def _iter_container_items(container: Any) -> Iterable[tuple[str, dict[str, Any]]]:
    """Yield ``(path_suffix, item)`` for each mapping item of a list or dict.

    List items get an ``[idx]`` suffix and dict items a ``.key`` suffix.
    Non-mapping items and other container types yield nothing.
    """
    if isinstance(container, list):
        for idx, item in enumerate(container):
            if isinstance(item, dict):
                yield f"[{idx}]", item
        return
    if isinstance(container, dict):
        for key, item in container.items():
            if isinstance(item, dict):
                yield f".{key}", item


def _extract_candidates_from_doc(doc: Any, file_path: str, doc_idx: int) -> list[Candidate]:
    """Collect automation and script candidates from one YAML document.

    Lookup order for a mapping document: the document itself, then the
    ``automation`` and ``script`` sections, and only if none of those
    produced anything, every top-level value. A list document is scanned
    for automations only.

    Args:
        doc: The loaded YAML document.
        file_path: File the document came from, stored on each candidate.
        doc_idx: Index of the document in the file; index 0 is rooted at
            ``$`` and later documents at ``$doc[<idx>]``.
    """
    out: list[Candidate] = []
    root_path = "$" if doc_idx == 0 else f"$doc[{doc_idx}]"

    if isinstance(doc, list):
        for suffix, item in _iter_container_items(doc):
            if _looks_like_automation(item):
                name = _candidate_name("automation", item, f"{root_path}{suffix}")
                out.append(
                    Candidate(
                        kind="automation",
                        name=name,
                        file_path=file_path,
                        path=f"{root_path}{suffix}",
                        data=item,
                    )
                )
        return out

    if not isinstance(doc, dict):
        return out

    if _looks_like_automation(doc):
        name = _candidate_name("automation", doc, root_path)
        out.append(Candidate("automation", name, file_path, root_path, doc))

    if _looks_like_script(doc):
        name = _candidate_name("script", doc, root_path)
        out.append(Candidate("script", name, file_path, root_path, doc))

    if "automation" in doc:
        for suffix, item in _iter_container_items(doc["automation"]):
            if _looks_like_automation(item):
                name = _candidate_name("automation", item, f"{root_path}.automation{suffix}")
                out.append(
                    Candidate(
                        kind="automation",
                        name=name,
                        file_path=file_path,
                        path=f"{root_path}.automation{suffix}",
                        data=item,
                    )
                )

    if "script" in doc:
        for suffix, item in _iter_container_items(doc["script"]):
            if _looks_like_script(item):
                name = _candidate_name("script", item, f"{root_path}.script{suffix}")
                out.append(
                    Candidate(
                        kind="script",
                        name=name,
                        file_path=file_path,
                        path=f"{root_path}.script{suffix}",
                        data=item,
                    )
                )

    if out:
        return out

    # Fallback: nothing matched above, so inspect each top-level value.
    for key, item in doc.items():
        if isinstance(item, dict) and _looks_like_automation(item):
            name = _candidate_name("automation", item, f"{root_path}.{key}")
            out.append(Candidate("automation", name, file_path, f"{root_path}.{key}", item))
        if isinstance(item, dict) and _looks_like_script(item):
            name = _candidate_name("script", item, f"{root_path}.{key}")
            out.append(Candidate("script", name, file_path, f"{root_path}.{key}", item))

    return out


def _normalize(value: Any) -> Any:
    """Convert *value* into a JSON-serializable, order-independent structure.

    Mapping keys are stringified and sorted, sequences are normalized
    element-wise, and sets become lists sorted by ``repr``. Any scalar that
    JSON cannot represent (for example a date produced by the YAML loader)
    is replaced by a ``"<type name>:<repr>"`` string so that fingerprinting
    never raises on it.
    """
    if isinstance(value, _Tagged):
        return str(value)
    if isinstance(value, dict):
        normalized_items: list[tuple[str, Any]] = [
            (str(k), _normalize(v)) for k, v in value.items()
        ]
        normalized_items.sort(key=lambda i: i[0])
        return dict(normalized_items)
    if isinstance(value, (list, tuple)):
        return [_normalize(v) for v in value]
    if isinstance(value, (set, frozenset)):
        # Sets have no stable iteration order; sort for a repeatable result.
        return sorted((_normalize(v) for v in value), key=repr)
    if value is None or isinstance(value, (str, int, float, bool)):
        return value
    # Not JSON-native: keep the type name so distinct types stay distinct.
    return f"{type(value).__name__}:{value!r}"


def _fingerprint(value: Any) -> str:
    """Return a compact JSON string that is equal for structurally equal values."""
    return json.dumps(_normalize(value), sort_keys=True, separators=(",", ":"), ensure_ascii=True)


def _first_present_key(mapping: dict[str, Any], aliases: tuple[str, ...]) -> str | None:
    """Return the first alias that is a key of *mapping*, or None."""
    for key in aliases:
        if key in mapping:
            return key
    return None


def _iter_entries(value: Any) -> Iterable[tuple[str, Any]]:
    """Yield ``(suffix, entry)`` pairs for a block value.

    A list yields one ``[idx]`` pair per element; anything else is treated
    as a single entry with an empty suffix.
    """
    if isinstance(value, list):
        for idx, entry in enumerate(value):
            yield f"[{idx}]", entry
        return
    yield "", value


def _block_keys_for_candidate(candidate: Candidate) -> dict[str, tuple[str, ...]]:
    """Return ``AUTOMATION_KEYS`` for automations and ``SCRIPT_KEYS`` otherwise."""
    if candidate.kind == "automation":
        return AUTOMATION_KEYS
    return SCRIPT_KEYS


def _recommendation(block_label: str) -> str:
    """Return the suggestion text printed for a duplicated block of *block_label*."""
    if block_label in {"action", "sequence"}:
        return (
            "Move repeated logic to config/script/<script_id>.yaml and call it "
            "via service: script.<script_id> with variables."
        )
    if block_label == "condition":
        return (
            "Extract shared condition logic into helper/template entities or "
            "merge condition blocks when behavior is equivalent."
        )
    return (
        "Consolidate equivalent trigger patterns and keep shared actions in a "
        "single reusable script when possible."
    )


def _render_occurrences(occurrences: list[Occurrence], max_rows: int = 6) -> str:
    """Format up to *max_rows* occurrences, one per line, plus a "more" line if truncated."""
    lines: list[str] = [
        f" - {occ.file_path} :: {occ.block_path} ({occ.candidate_name})"
        for occ in occurrences[:max_rows]
    ]
    if len(occurrences) > max_rows:
        lines.append(f" - ... {len(occurrences) - max_rows} more")
    return "\n".join(lines)


def _normalize_path(path: str) -> str:
    """Return *path* lower-cased with backslashes replaced by forward slashes."""
    return path.replace("\\", "/").lower()


def _entry_parent_block_path(block_path: str) -> str:
    """Return parent block path for entry occurrences (strip trailing [idx])."""
    return re.sub(r"\[\d+\]$", "", block_path)


def _occurrence_key(
    occurrence: Occurrence, *, treat_as_entry: bool = False
) -> tuple[str, str, str]:
    """Return ``(file_path, candidate_path, block_path)`` identifying an occurrence.

    With ``treat_as_entry=True`` the trailing ``[idx]`` is stripped from the
    block path so that an entry maps onto the block that contains it.
    """
    block_path = (
        _entry_parent_block_path(occurrence.block_path)
        if treat_as_entry
        else occurrence.block_path
    )
    return (occurrence.file_path, occurrence.candidate_path, block_path)


def _infer_script_id(candidate: Candidate) -> str | None:
    """Derive a script ID from a script candidate's document path.

    Uses the text after ``.script.`` when present. Otherwise, for files
    under ``/config/script/``, uses the path with its ``$.`` or
    ``$doc[n].`` root removed. Returns None for non-script candidates and
    when neither rule applies.
    """
    if candidate.kind != "script":
        return None

    marker = ".script."
    if marker in candidate.path:
        return candidate.path.split(marker, 1)[1]

    if "/config/script/" in _normalize_path(candidate.file_path):
        if candidate.path.startswith("$."):
            return candidate.path[2:]
        match = re.match(r"^\$doc\[\d+\]\.(.+)$", candidate.path)
        if match:
            return match.group(1)

    return None


def _collect_script_service_calls(node: Any, script_ids: set[str]) -> None:
    """Collect called script IDs from common HA service invocation patterns.

    Walks *node* recursively and adds to *script_ids* in place:

    - ``service`` / ``action`` values of the form ``script.<id>``, where
      ``<id>`` is not one of the script-domain services themselves;
    - for those script-domain services, every ``script.<id>`` found in
      ``entity_id`` directly or under ``target`` / ``data`` /
      ``service_data``.
    """
    # Services of the script domain itself; the part after "script." is a
    # service name here, not a script ID.
    script_domain_meta_services = {"turn_on", "turn_off", "toggle", "reload", "stop"}

    def _add_script_entity_ids(value: Any) -> None:
        """Add the ID part of every ``script.<id>`` entity in *value*."""
        if isinstance(value, str):
            # A single string may list several entities separated by commas.
            for part in value.split(","):
                entity_id = part.strip()
                if entity_id.startswith("script."):
                    entity_script_id = entity_id.split(".", 1)[1].strip()
                    if entity_script_id:
                        script_ids.add(entity_script_id)
            return
        if isinstance(value, list):
            for item in value:
                _add_script_entity_ids(item)

    if isinstance(node, dict):
        service_name_raw = node.get("service")
        action_name_raw = node.get("action")
        service_name = None
        if isinstance(service_name_raw, str):
            service_name = service_name_raw.strip()
        elif isinstance(action_name_raw, str):
            service_name = action_name_raw.strip()

        if service_name and service_name.startswith("script."):
            tail = service_name.split(".", 1)[1].strip()
            if tail and tail not in script_domain_meta_services:
                script_ids.add(tail)
            else:
                # Meta service: the scripts involved are named by entity ID.
                _add_script_entity_ids(node.get("entity_id"))
                for key in ("target", "data", "service_data"):
                    container = node.get(key)
                    if isinstance(container, dict):
                        _add_script_entity_ids(container.get("entity_id"))

        for value in node.values():
            _collect_script_service_calls(value, script_ids)
        return

    if isinstance(node, list):
        for item in node:
            _collect_script_service_calls(item, script_ids)


def main(argv: list[str]) -> int:
    """Scan the given paths and print a duplicate-structure report.

    Args:
        argv: Command-line arguments without the program name.

    Returns:
        2 for invalid ``--min-occurrences``, when no YAML files are found,
        or when any file failed to parse; 1 when ``--strict`` is set and at
        least one finding exists (this takes precedence over parse errors);
        0 otherwise.
    """
    ap = argparse.ArgumentParser(description="Detect duplicated Home Assistant YAML structures.")
    ap.add_argument("paths", nargs="+", help="YAML file(s) or directory path(s) to scan")
    ap.add_argument(
        "--min-occurrences",
        type=int,
        default=2,
        help="Minimum duplicate count to report (default: 2)",
    )
    ap.add_argument(
        "--max-groups",
        type=int,
        default=50,
        help="Maximum duplicate groups to print (default: 50)",
    )
    ap.add_argument("--strict", action="store_true", help="Return non-zero when duplicates are found")
    args = ap.parse_args(argv)

    if args.min_occurrences < 2:
        print("ERROR: --min-occurrences must be >= 2", file=sys.stderr)
        return 2

    files = _discover_yaml_files(args.paths)
    if not files:
        print("ERROR: no YAML files found for the provided paths", file=sys.stderr)
        return 2

    parse_errors: list[ParseError] = []
    candidates: list[Candidate] = []
    script_calls_by_id: dict[str, set[str]] = defaultdict(set)

    for path in files:
        try:
            docs = _load_yaml_docs(path)
        except Exception as exc:
            # Boundary handler: record the failure and keep scanning.
            parse_errors.append(ParseError(file_path=str(path), error=str(exc)))
            continue

        script_calls_in_file: set[str] = set()
        for doc_idx, doc in enumerate(docs):
            candidates.extend(_extract_candidates_from_doc(doc, str(path), doc_idx))
            _collect_script_service_calls(doc, script_calls_in_file)
        for script_id in script_calls_in_file:
            script_calls_by_id[script_id].add(str(path))

    # Both indexes are keyed by (candidate kind, block label, fingerprint).
    full_index: dict[tuple[str, str, str], list[Occurrence]] = defaultdict(list)
    entry_index: dict[tuple[str, str, str], list[Occurrence]] = defaultdict(list)
    intra_duplicate_notes: list[str] = []

    for candidate in candidates:
        block_key_map = _block_keys_for_candidate(candidate)
        for block_label, aliases in block_key_map.items():
            source_key = _first_present_key(candidate.data, aliases)
            if not source_key:
                continue

            block_value = candidate.data[source_key]
            if block_value in (None, [], {}):
                continue

            block_fp = _fingerprint(block_value)
            full_index[(candidate.kind, block_label, block_fp)].append(
                Occurrence(
                    file_path=candidate.file_path,
                    candidate_name=candidate.name,
                    candidate_path=candidate.path,
                    block_path=f"{candidate.path}.{source_key}",
                )
            )

            # Entry fingerprint -> block paths seen inside this one block.
            seen_in_candidate: dict[str, list[str]] = defaultdict(list)
            for suffix, entry in _iter_entries(block_value):
                entry_fp = _fingerprint(entry)
                entry_occ = Occurrence(
                    file_path=candidate.file_path,
                    candidate_name=candidate.name,
                    candidate_path=candidate.path,
                    block_path=f"{candidate.path}.{source_key}{suffix}",
                )
                entry_index[(candidate.kind, block_label, entry_fp)].append(entry_occ)
                seen_in_candidate[entry_fp].append(entry_occ.block_path)

            for block_paths in seen_in_candidate.values():
                if len(block_paths) >= args.min_occurrences:
                    intra_duplicate_notes.append(
                        (
                            f"INTRA {candidate.kind}.{block_label}: {candidate.name} has "
                            f"{len(block_paths)} duplicated entries in {candidate.path}.{source_key}"
                        )
                    )

    def _filter_groups(
        index: dict[tuple[str, str, str], list[Occurrence]],
    ) -> list[tuple[tuple[str, str, str], list[Occurrence]]]:
        """Keep groups meeting --min-occurrences, largest first, then by kind and label."""
        groups = [(k, v) for k, v in index.items() if len(v) >= args.min_occurrences]
        groups.sort(key=lambda item: (-len(item[1]), item[0][0], item[0][1]))
        return groups

    full_groups = _filter_groups(full_index)
    entry_groups = _filter_groups(entry_index)

    # Drop ENTRY groups that are fully subsumed by an identical FULL_BLOCK group.
    full_group_member_sets: dict[tuple[str, str], list[set[tuple[str, str, str]]]] = defaultdict(list)
    for (kind, block_label, _), occurrences in full_groups:
        full_group_member_sets[(kind, block_label)].append(
            {_occurrence_key(occ) for occ in occurrences}
        )

    filtered_entry_groups: list[tuple[tuple[str, str, str], list[Occurrence]]] = []
    for entry_group_key, entry_occurrences in entry_groups:
        kind, block_label, _ = entry_group_key
        entry_member_set = {
            _occurrence_key(occ, treat_as_entry=True) for occ in entry_occurrences
        }
        full_sets = full_group_member_sets.get((kind, block_label), [])
        is_subsumed = any(entry_member_set.issubset(full_set) for full_set in full_sets)
        if not is_subsumed:
            filtered_entry_groups.append((entry_group_key, entry_occurrences))
    entry_groups = filtered_entry_groups

    intra_duplicate_notes = sorted(set(intra_duplicate_notes))

    script_definitions_by_id: dict[str, set[str]] = defaultdict(set)
    for candidate in candidates:
        script_id = _infer_script_id(candidate)
        if script_id:
            script_definitions_by_id[script_id].add(candidate.file_path)

    # Scripts defined under /config/packages/ (and not under /config/script/)
    # that are called from at least two files.
    central_script_findings: list[CentralScriptFinding] = []
    for script_id, definition_files in script_definitions_by_id.items():
        normalized_definitions = [_normalize_path(p) for p in definition_files]
        if not any("/config/packages/" in n for n in normalized_definitions):
            continue
        if any("/config/script/" in n for n in normalized_definitions):
            continue

        caller_files = sorted(script_calls_by_id.get(script_id, set()))
        if len(caller_files) < 2:
            continue

        central_script_findings.append(
            CentralScriptFinding(
                script_id=script_id,
                definition_files=tuple(sorted(definition_files)),
                caller_files=tuple(caller_files),
            )
        )
    central_script_findings.sort(key=lambda item: (-len(item.caller_files), item.script_id))

    print(f"Scanned files: {len(files)}")
    print(f"Parsed candidates: {len(candidates)}")
    print(f"Parse errors: {len(parse_errors)}")
    print(f"Duplicate full-block groups: {len(full_groups)}")
    print(f"Duplicate entry groups: {len(entry_groups)}")
    print(f"Intra-block duplicates: {len(intra_duplicate_notes)}")
    print(f"Central-script findings: {len(central_script_findings)}")

    if parse_errors:
        print("\nParse errors:")
        for err in parse_errors:
            print(f" - {err.file_path}: {err.error}")

    if full_groups:
        print("\nFULL_BLOCK findings:")
        for idx, ((kind, block_label, _), occurrences) in enumerate(
            full_groups[: args.max_groups], start=1
        ):
            print(f"{idx}. {kind}.{block_label} repeated {len(occurrences)} times")
            print(_render_occurrences(occurrences))
            print(f" suggestion: {_recommendation(block_label)}")

    if entry_groups:
        print("\nENTRY findings:")
        for idx, ((kind, block_label, _), occurrences) in enumerate(
            entry_groups[: args.max_groups], start=1
        ):
            print(f"{idx}. {kind}.{block_label} entry repeated {len(occurrences)} times")
            print(_render_occurrences(occurrences))
            print(f" suggestion: {_recommendation(block_label)}")

    if intra_duplicate_notes:
        print("\nINTRA findings:")
        for idx, note in enumerate(intra_duplicate_notes[: args.max_groups], start=1):
            print(f"{idx}. {note}")

    if central_script_findings:
        print("\nCENTRAL_SCRIPT findings:")
        for idx, finding in enumerate(central_script_findings[: args.max_groups], start=1):
            print(
                f"{idx}. script.{finding.script_id} is package-defined and called from "
                f"{len(finding.caller_files)} files"
            )
            for definition_file in finding.definition_files:
                print(f" - definition: {definition_file}")
            for caller_file in finding.caller_files[:6]:
                print(f" - caller: {caller_file}")
            if len(finding.caller_files) > 6:
                print(f" - ... {len(finding.caller_files) - 6} more callers")
            print(f" suggestion: Move definition to config/script/{finding.script_id}.yaml")

    finding_count = (
        len(full_groups)
        + len(entry_groups)
        + len(intra_duplicate_notes)
        + len(central_script_findings)
    )
    if args.strict and finding_count > 0:
        return 1
    if parse_errors:
        return 2
    return 0


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))