diff --git a/unshackle/commands/kv.py b/unshackle/commands/kv.py index 28c870d..0722a8d 100644 --- a/unshackle/commands/kv.py +++ b/unshackle/commands/kv.py @@ -4,8 +4,12 @@ from pathlib import Path from typing import Optional import click +from rich.padding import Padding +from rich.text import Text +from rich.tree import Tree from unshackle.core.config import config +from unshackle.core.console import console from unshackle.core.constants import context_settings from unshackle.core.services import Services from unshackle.core.vault import Vault @@ -188,6 +192,72 @@ def add(file: Path, service: str, vaults: list[str]) -> None: log.info("Done!") +@kv.command() +@click.argument("kid", type=str) +@click.option("-s", "--service", type=str, default=None, help="Limit search to a specific service tag.") +@click.option( + "-v", "--vault", "vault_name", type=str, default=None, help="Limit search to a specific configured vault by name." +) +def search(kid: str, service: Optional[str], vault_name: Optional[str]) -> None: + """ + Search configured Key Vault(s) for a KID and report any matching KEY. + + KID must be 32 hex characters (no dashes). If --service is omitted, every + service table in each vault is scanned. If --vault is omitted, every + vault in the config is searched. + """ + log = logging.getLogger("kv") + + kid_norm = kid.replace("-", "").lower() + if not re.fullmatch(r"[0-9a-f]{32}", kid_norm): + raise click.ClickException(f"KID '{kid}' is not 32 hex characters.") + + if vault_name: + vault_names = [vault_name] + else: + vault_names = [v["name"] for v in config.key_vaults] + if not vault_names: + raise click.ClickException("No Key Vaults are configured.") + + vaults_ = load_vaults(vault_names) + + service_tag = Services.get_tag(service) if service else None + + hit: Optional[tuple[str, str, str]] = None + for vault in vaults_: + if service_tag: + services_to_check: list[str] = [service_tag] + else: + try: + services_to_check = list(vault.get_services()) + except Exception as e: + log.debug(f"{vault}: get_services() failed ({e})") + services_to_check = [] + if not services_to_check: + log.warning(f"{vault}: cannot search without a service (remote vault requires --service). Skipping.") + continue + + for svc in services_to_check: + try: + key = vault.get_key(kid_norm, svc) + except Exception as e: + log.debug(f"{vault} [{svc}]: lookup error ({e})") + continue + if key and key.count("0") != len(key): + hit = (vault.name, svc, key) + break + if hit: + break + + if hit: + vname, svc, key = hit + tree = Tree(Text.assemble((svc, "cyan"), (f"({vname})", "text"), overflow="fold")) + tree.add(f"[text2]{kid_norm}:{key}") + console.print(Padding(tree, (1, 5))) + else: + log.info(f"KID {kid_norm} not found in {len(vaults_)} vault(s).") + + @kv.command() @click.argument("vaults", nargs=-1, type=click.UNPROCESSED) def prepare(vaults: list[str]) -> None: