feat(kv): add search subcommand to look up KID across vaults

Adds 'kv search <KID>' with optional -s/--service and -v/--vault
filters. Iterates configured vaults, short-circuits on first hit, and
renders results in the same Rich tree style used by the DRM key
display. Remote vaults that cannot enumerate services without a
service tag are skipped with a clear hint to re-run with --service.
This commit is contained in:
imSp4rky
2026-04-28 15:02:19 -06:00
parent 2f7a189c9c
commit 07881d78c2

View File

@@ -4,8 +4,12 @@ from pathlib import Path
from typing import Optional
import click
from rich.padding import Padding
from rich.text import Text
from rich.tree import Tree
from unshackle.core.config import config
from unshackle.core.console import console
from unshackle.core.constants import context_settings
from unshackle.core.services import Services
from unshackle.core.vault import Vault
@@ -188,6 +192,72 @@ def add(file: Path, service: str, vaults: list[str]) -> None:
log.info("Done!")
@kv.command()
@click.argument("kid", type=str)
@click.option("-s", "--service", type=str, default=None, help="Limit search to a specific service tag.")
@click.option(
"-v", "--vault", "vault_name", type=str, default=None, help="Limit search to a specific configured vault by name."
)
def search(kid: str, service: Optional[str], vault_name: Optional[str]) -> None:
"""
Search configured Key Vault(s) for a KID and report any matching KEY.
KID must be 32 hex characters (no dashes). If --service is omitted, every
service table in each vault is scanned. If --vault is omitted, every
vault in the config is searched.
"""
log = logging.getLogger("kv")
kid_norm = kid.replace("-", "").lower()
if not re.fullmatch(r"[0-9a-f]{32}", kid_norm):
raise click.ClickException(f"KID '{kid}' is not 32 hex characters.")
if vault_name:
vault_names = [vault_name]
else:
vault_names = [v["name"] for v in config.key_vaults]
if not vault_names:
raise click.ClickException("No Key Vaults are configured.")
vaults_ = load_vaults(vault_names)
service_tag = Services.get_tag(service) if service else None
hit: Optional[tuple[str, str, str]] = None
for vault in vaults_:
if service_tag:
services_to_check: list[str] = [service_tag]
else:
try:
services_to_check = list(vault.get_services())
except Exception as e:
log.debug(f"{vault}: get_services() failed ({e})")
services_to_check = []
if not services_to_check:
log.warning(f"{vault}: cannot search without a service (remote vault requires --service). Skipping.")
continue
for svc in services_to_check:
try:
key = vault.get_key(kid_norm, svc)
except Exception as e:
log.debug(f"{vault} [{svc}]: lookup error ({e})")
continue
if key and key.count("0") != len(key):
hit = (vault.name, svc, key)
break
if hit:
break
if hit:
vname, svc, key = hit
tree = Tree(Text.assemble((svc, "cyan"), (f"({vname})", "text"), overflow="fold"))
tree.add(f"[text2]{kid_norm}:{key}")
console.print(Padding(tree, (1, 5)))
else:
log.info(f"KID {kid_norm} not found in {len(vaults_)} vault(s).")
@kv.command()
@click.argument("vaults", nargs=-1, type=click.UNPROCESSED)
def prepare(vaults: list[str]) -> None: