#!/usr/bin/env python3 """Measure duplicate publisher media-hash convergence in production.""" from __future__ import annotations import argparse import json import sys import time import urllib.parse import urllib.request from dataclasses import dataclass from typing import Any, Callable USER_AGENT = "every-channel-measure-duplicate-publishers/1" DUPLICATE_PROMETHEUS_METRICS = [ "every_channel_ladder_archive_duplicate_hash_source_records", "every_channel_ladder_archive_duplicate_hash_sequences", "every_channel_ladder_archive_hash_divergent_sequences", "every_channel_ladder_archive_missing_hash_records", "every_channel_ladder_archive_missing_source_identity_records", "every_channel_archive_duplicate_hash_source_records", "every_channel_archive_duplicate_hash_sequences", "every_channel_archive_hash_divergent_sequences", "every_channel_archive_missing_hash_records", "every_channel_archive_missing_source_identity_records", ] SOURCE_IDENTITY_KEYS = ("source_node", "publisher_node", "source_id") @dataclass class FetchResult: url: str status: int body: str elapsed_ms: int error: str | None = None @property def ok(self) -> bool: return self.error is None and 200 <= self.status < 300 def now_ms() -> int: return int(time.time() * 1000) def fetch_text(url: str, timeout: float, max_bytes: int = 4 * 1024 * 1024) -> FetchResult: started = now_ms() headers = {"User-Agent": USER_AGENT} if max_bytes > 0: headers["Range"] = f"bytes=-{max_bytes}" req = urllib.request.Request(url, headers=headers) try: with urllib.request.urlopen(req, timeout=timeout) as res: body = res.read(max_bytes + 1 if max_bytes > 0 else -1) if max_bytes > 0 and len(body) > max_bytes: body = body[-max_bytes:] return FetchResult(url, int(res.status), body.decode("utf-8", "replace"), now_ms() - started) except Exception as err: # noqa: BLE001 - measurements preserve transport failures. return FetchResult(url, 0, "", now_ms() - started, str(err)) def fetch_json(url: str, timeout: float, max_bytes: int = 1024 * 1024) -> tuple[FetchResult, Any | None]: fetched = fetch_text(url, timeout, max_bytes=max_bytes) if not fetched.ok: return fetched, None try: return fetched, json.loads(fetched.body) except json.JSONDecodeError as err: fetched.error = f"invalid json: {err}" return fetched, None def parse_named_url(value: str) -> tuple[str, str]: if "=" not in value: raise ValueError(f"expected NAME=URL: {value}") name, url = value.split("=", 1) name = name.strip() url = url.strip() if not name or not url: raise ValueError(f"expected NAME=URL: {value}") return name, url def manifest_url(origin: str, broadcast: str, track: str) -> str: base = origin.rstrip("/") + "/" return urllib.parse.urljoin(base, f"manifests/{broadcast}/{track}.jsonl") def parse_manifest_jsonl(body: str) -> tuple[list[dict[str, Any]], int]: records: list[dict[str, Any]] = [] invalid_lines = 0 for index, line in enumerate(body.splitlines()): raw = line.strip() if not raw: continue try: record = json.loads(raw) except json.JSONDecodeError: # Tail range reads may start in the middle of a JSON line. if index == 0: continue invalid_lines += 1 continue if isinstance(record, dict): records.append(record) else: invalid_lines += 1 return records, invalid_lines def int_or_none(value: Any) -> int | None: if isinstance(value, bool): return None if isinstance(value, int): return value try: return int(str(value)) except (TypeError, ValueError): return None def record_source_identity(record: dict[str, Any]) -> str | None: for key in SOURCE_IDENTITY_KEYS: value = record.get(key) if isinstance(value, str) and value.strip(): return value.strip() return None def manifest_hash_stats(records: list[dict[str, Any]], invalid_lines: int = 0) -> dict[str, Any]: hashes_by_sequence: dict[int, set[str]] = {} source_hashes_by_sequence: dict[int, dict[str, set[str]]] = {} missing_hash_records = 0 missing_source_identity_records = 0 source_identities: set[str] = set() received_values: list[int] = [] for record in records: received_ms = int_or_none(record.get("received_unix_ms")) if received_ms is not None: received_values.append(received_ms) sequence = int_or_none(record.get("group_sequence")) digest = record.get("blake3") if sequence is None: continue source_identity = record_source_identity(record) if source_identity: source_identities.add(source_identity) else: missing_source_identity_records += 1 if not isinstance(digest, str) or not digest.strip(): missing_hash_records += 1 continue clean_digest = digest.strip() hashes_by_sequence.setdefault(sequence, set()).add(clean_digest) if source_identity: source_hashes_by_sequence.setdefault(sequence, {}).setdefault(clean_digest, set()).add(source_identity) duplicate_hash_source_records = sum( max(0, len(source_identities_for_hash) - 1) for hashes in source_hashes_by_sequence.values() for source_identities_for_hash in hashes.values() ) duplicate_hash_sequences = sum( 1 for hashes in source_hashes_by_sequence.values() if any(len(source_identities_for_hash) > 1 for source_identities_for_hash in hashes.values()) ) hash_divergent_sequences = sum(1 for hashes in hashes_by_sequence.values() if len(hashes) > 1) return { "record_count": len(records), "invalid_lines": invalid_lines, "sequence_count": len(hashes_by_sequence), "source_identity_count": len(source_identities), "source_identities": sorted(source_identities), "missing_source_identity_records": missing_source_identity_records, "duplicate_hash_source_records": duplicate_hash_source_records, "duplicate_hash_sequences": duplicate_hash_sequences, "hash_divergent_sequences": hash_divergent_sequences, "missing_hash_records": missing_hash_records, "first_received_unix_ms": min(received_values) if received_values else None, "latest_received_unix_ms": max(received_values) if received_values else None, } def first_hash_by_sequence(records: list[dict[str, Any]]) -> dict[int, str]: out: dict[int, str] = {} for record in records: sequence = int_or_none(record.get("group_sequence")) digest = record.get("blake3") if sequence is None or not isinstance(digest, str) or not digest.strip(): continue out.setdefault(sequence, digest.strip()) return out def hash_sets_by_sequence(records: list[dict[str, Any]]) -> dict[int, set[str]]: out: dict[int, set[str]] = {} for record in records: sequence = int_or_none(record.get("group_sequence")) digest = record.get("blake3") if sequence is None or not isinstance(digest, str) or not digest.strip(): continue out.setdefault(sequence, set()).add(digest.strip()) return out def compare_manifest_hashes(named_records: dict[str, list[dict[str, Any]]]) -> dict[str, Any]: input_manifest_count = len(named_records) missing_source_identity_records = 0 source_records: dict[str, list[dict[str, Any]]] = {} for manifest_name, records in named_records.items(): for index, record in enumerate(records): source_identity = record_source_identity(record) if source_identity is None: missing_source_identity_records += 1 source_identity = f"manifest:{manifest_name}" source_records.setdefault(source_identity, []).append(record) names = sorted(source_records) per_name = {name: hash_sets_by_sequence(records) for name, records in source_records.items()} all_sequences = sorted(set().union(*(set(value) for value in per_name.values()))) if per_name else [] shared_sequences = [ sequence for sequence in all_sequences if all(sequence in per_name[name] for name in names) ] matching = 0 divergent = 0 examples: list[dict[str, Any]] = [] for sequence in shared_sequences: values = {name: per_name[name][sequence] for name in names} flattened = [next(iter(digests)) for digests in values.values() if len(digests) == 1] if len(flattened) == len(names) and len(set(flattened)) == 1: matching += 1 else: divergent += 1 if len(examples) < 5: examples.append( { "sequence": sequence, "hashes": { name: sorted(digests) for name, digests in values.items() }, } ) source_identity_ok = missing_source_identity_records == 0 and len(names) >= 2 return { "publisher_count": len(names), "publishers": names, "input_manifest_count": input_manifest_count, "source_identity_count": len(names), "source_identities": names, "missing_source_identity_records": missing_source_identity_records, "source_identity_ok": source_identity_ok, "sequence_count": len(all_sequences), "shared_sequence_count": len(shared_sequences), "matching_sequence_count": matching, "divergent_sequence_count": divergent, "missing_sequence_count": max(0, len(all_sequences) - len(shared_sequences)), "divergent_examples": examples, "byte_for_byte_hash_match": bool( source_identity_ok and shared_sequences and divergent == 0 and matching == len(shared_sequences) ), } def prometheus_query_url(prometheus_url: str, expr: str) -> str: return ( prometheus_url.rstrip() .rstrip("/") + "/api/v1/query?" + urllib.parse.urlencode({"query": expr}) ) def prometheus_metric_sum( prometheus_url: str, metric: str, *, broadcast: str, timeout: float, fetcher: Callable[[str, float, int], FetchResult] = fetch_text, ) -> dict[str, Any]: selector = f'{metric}{{broadcast="{broadcast}"}}' expr = f"sum({selector})" fetched = fetcher(prometheus_query_url(prometheus_url, expr), timeout, 1024 * 1024) if not fetched.ok: return {"metric": metric, "ok": False, "value": None, "error": fetched.error} try: payload = json.loads(fetched.body) result = payload.get("data", {}).get("result", []) if not result: return {"metric": metric, "ok": True, "value": None, "series_present": False} raw_value = result[0].get("value", [None, None])[1] value = float(raw_value) except Exception as err: # noqa: BLE001 - preserve malformed Prometheus replies. return {"metric": metric, "ok": False, "value": None, "error": f"invalid prometheus response: {err}"} return {"metric": metric, "ok": True, "value": value, "series_present": True} def agent_manifest_url(base_url: str, *, broadcast: str, track: str, role: str, max_bytes: int) -> str: query = { "broadcast": broadcast, "track": track, "max_bytes": str(max_bytes), } if role: query["role"] = role return base_url.rstrip("/") + "/v1/archive-manifest?" + urllib.parse.urlencode(query) def sample_publishers( publisher_urls: dict[str, str], *, timeout: float, fetcher: Callable[[str, float, int], FetchResult] = fetch_text, ) -> dict[str, Any]: out: dict[str, Any] = {} for name, base_url in publisher_urls.items(): base = base_url.rstrip("/") health = fetcher(f"{base}/health", timeout, 1024 * 1024) metrics = fetcher(f"{base}/metrics", timeout, 2 * 1024 * 1024) row: dict[str, Any] = { "agent_url": base, "health_ok": health.ok, "metrics_ok": metrics.ok, "health_error": health.error, "metrics_error": metrics.error, "duplicate_metrics_present": False, "node_modes": [], "unhealthy_processes": [], } if health.ok: try: payload = json.loads(health.body) row["node_modes"] = payload.get("node_modes") if isinstance(payload.get("node_modes"), list) else [] row["unhealthy_processes"] = ( payload.get("unhealthy_processes") if isinstance(payload.get("unhealthy_processes"), list) else [] ) system = payload.get("system") if isinstance(payload.get("system"), dict) else {} row["hostname"] = system.get("hostname") or payload.get("hostname") except json.JSONDecodeError: row["health_error"] = "invalid health json" if metrics.ok: row["duplicate_metrics_present"] = any(metric in metrics.body for metric in DUPLICATE_PROMETHEUS_METRICS) row["metrics_bytes"] = len(metrics.body.encode("utf-8")) out[name] = row return out def sample_once(args: argparse.Namespace) -> dict[str, Any]: manifests: dict[str, str] = dict(parse_named_url(item) for item in args.manifest) if not manifests and args.archive_origin and args.broadcast and args.track: manifests["archive-origin"] = manifest_url(args.archive_origin, args.broadcast, args.track) publisher_urls: dict[str, str] = dict(parse_named_url(item) for item in args.publisher) agent_manifest_urls: dict[str, str] = dict(parse_named_url(item) for item in args.agent_manifest) fetched_records: dict[str, list[dict[str, Any]]] = {} manifest_stats: dict[str, Any] = {} for name, url in manifests.items(): fetched = fetch_text(url, args.timeout, max_bytes=args.max_manifest_bytes) if not fetched.ok: manifest_stats[name] = {"url": url, "ok": False, "error": fetched.error} continue records, invalid_lines = parse_manifest_jsonl(fetched.body) fetched_records[name] = records manifest_stats[name] = { "url": url, "ok": True, "fetch_elapsed_ms": fetched.elapsed_ms, **manifest_hash_stats(records, invalid_lines), } if agent_manifest_urls and args.broadcast and args.track: for name, base_url in agent_manifest_urls.items(): url = agent_manifest_url( base_url, broadcast=args.broadcast, track=args.track, role=args.agent_manifest_role, max_bytes=args.max_manifest_bytes, ) fetched, payload = fetch_json(url, args.timeout, max_bytes=args.max_manifest_bytes + 1024 * 1024) if not fetched.ok or not isinstance(payload, dict) or payload.get("ok") is not True: manifest_stats[name] = { "url": url, "ok": False, "source": "node-agent", "error": fetched.error or (payload.get("error") if isinstance(payload, dict) else "invalid response"), } continue records = payload.get("records") if isinstance(payload.get("records"), list) else [] records = [record for record in records if isinstance(record, dict)] fetched_records[name] = records invalid_lines = int_or_none(payload.get("invalid_lines")) or 0 stats = payload.get("stats") if isinstance(payload.get("stats"), dict) else {} manifest_stats[name] = { "url": url, "ok": True, "source": "node-agent", "fetch_elapsed_ms": fetched.elapsed_ms, "role": payload.get("role"), "file_bytes": int_or_none(payload.get("file_bytes")), "partial_scan": payload.get("partial_scan") is True, **manifest_hash_stats(records, invalid_lines), "node_agent_stats": stats, } prometheus_metrics = [] if args.prometheus_url and args.broadcast: for metric in DUPLICATE_PROMETHEUS_METRICS: prometheus_metrics.append( prometheus_metric_sum(args.prometheus_url, metric, broadcast=args.broadcast, timeout=args.timeout) ) return { "sample_unix_ms": now_ms(), "broadcast": args.broadcast, "track": args.track, "publishers": sample_publishers(publisher_urls, timeout=args.timeout) if publisher_urls else {}, "manifests": manifest_stats, "manifest_comparison": compare_manifest_hashes(fetched_records) if len(fetched_records) >= 2 else None, "prometheus": prometheus_metrics, } def summarize(samples: list[dict[str, Any]]) -> dict[str, Any]: if not samples: return {"ok": False, "reasons": ["no_samples"]} reasons: list[str] = [] elapsed_ms = max(0, int(samples[-1]["sample_unix_ms"]) - int(samples[0]["sample_unix_ms"])) if len(samples) < 2 or elapsed_ms <= 0: reasons.append("insufficient_elapsed_samples") publisher_rows = [ row for sample in samples for row in (sample.get("publishers") or {}).values() if isinstance(row, dict) ] if publisher_rows and not all(row.get("health_ok") is True for row in publisher_rows): reasons.append("publisher_health_missing") if publisher_rows and not any(row.get("metrics_ok") is True for row in publisher_rows): reasons.append("publisher_metrics_missing") if publisher_rows and not any(row.get("duplicate_metrics_present") is True for row in publisher_rows): reasons.append("duplicate_metrics_not_deployed_to_publishers") comparisons = [ sample.get("manifest_comparison") for sample in samples if isinstance(sample.get("manifest_comparison"), dict) ] latest_comparison = comparisons[-1] if comparisons else None if latest_comparison is None: reasons.append("manifest_comparison_missing") elif latest_comparison.get("source_identity_ok") is not True: reasons.append("manifest_source_identity_missing_or_not_diverse") elif latest_comparison.get("matching_sequence_count", 0) <= 0: reasons.append("no_matching_duplicate_sequences") elif latest_comparison.get("divergent_sequence_count", 0) > 0: reasons.append("duplicate_hash_divergence_observed") manifest_rows = [ row for sample in samples for row in (sample.get("manifests") or {}).values() if isinstance(row, dict) ] if manifest_rows and any(row.get("ok") is not True for row in manifest_rows): reasons.append("manifest_fetch_missing") if manifest_rows and any(int_or_none(row.get("hash_divergent_sequences")) or 0 for row in manifest_rows): reasons.append("manifest_hash_divergence_observed") if manifest_rows and any(int_or_none(row.get("missing_hash_records")) or 0 for row in manifest_rows): reasons.append("manifest_hash_missing_records") if manifest_rows and any(int_or_none(row.get("missing_source_identity_records")) or 0 for row in manifest_rows): reasons.append("manifest_source_identity_missing") if manifest_rows and any(int_or_none(row.get("invalid_lines")) or 0 for row in manifest_rows): reasons.append("manifest_invalid_lines") prom_rows = [ row for sample in samples for row in (sample.get("prometheus") or []) if isinstance(row, dict) ] prom_series = [row for row in prom_rows if row.get("series_present") is True] if prom_rows and not prom_series: reasons.append("prometheus_duplicate_series_missing") divergent_values = [ float(row.get("value") or 0) for row in prom_series if str(row.get("metric", "")).endswith("hash_divergent_sequences") ] if any(value > 0 for value in divergent_values): reasons.append("prometheus_hash_divergence_nonzero") missing_source_values = [ float(row.get("value") or 0) for row in prom_series if str(row.get("metric", "")).endswith("missing_source_identity_records") ] if any(value > 0 for value in missing_source_values): reasons.append("prometheus_source_identity_missing_nonzero") return { "ok": not reasons, "elapsed_ms": elapsed_ms, "sample_count": len(samples), "reasons": reasons, "latest_manifest_comparison": latest_comparison, "prometheus_series_present_count": len(prom_series), "publisher_count": len(samples[-1].get("publishers") or {}), } def measure(args: argparse.Namespace) -> dict[str, Any]: samples: list[dict[str, Any]] = [] started = time.monotonic() while True: samples.append(sample_once(args)) if args.duration_seconds <= 0: break if time.monotonic() - started >= args.duration_seconds: break time.sleep(args.poll_interval_seconds) report = { "started_unix_ms": samples[0]["sample_unix_ms"] if samples else now_ms(), "duration_seconds": args.duration_seconds, "samples": samples, } report["summary"] = summarize(samples) return report def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--publisher", action="append", default=[], help="Named node-agent URL, NAME=http://IP:7799.") parser.add_argument("--manifest", action="append", default=[], help="Named archive JSONL URL, NAME=https://...") parser.add_argument( "--agent-manifest", action="append", default=[], help="Named node-agent URL to sample /v1/archive-manifest from, NAME=http://IP:7799.", ) parser.add_argument("--agent-manifest-role", default="publisher-buffer") parser.add_argument("--archive-origin", default="", help="Archive origin root for manifests//.jsonl.") parser.add_argument("--prometheus-url", default="", help="Prometheus base URL for Grafana-facing metrics.") parser.add_argument("--broadcast", default="", help="Logical broadcast name to measure.") parser.add_argument( "--track", default="publisher.m4s", help="Track name to compare. Defaults to publisher-origin proof fragments, not relay video.", ) parser.add_argument("--duration-seconds", type=float, default=0.0) parser.add_argument("--poll-interval-seconds", type=float, default=30.0) parser.add_argument("--timeout", type=float, default=10.0) parser.add_argument("--max-manifest-bytes", type=int, default=4 * 1024 * 1024) parser.add_argument("--pretty", action="store_true") parser.add_argument("--require-ok", action="store_true", help="Exit non-zero unless summary.ok is true.") return parser def main(argv: list[str] | None = None) -> int: parser = build_parser() args = parser.parse_args(argv) try: report = measure(args) except Exception as err: # noqa: BLE001 - command-line tool should preserve exact failure. print(json.dumps({"ok": False, "error": str(err)}, sort_keys=True), file=sys.stderr) return 1 if args.pretty: print(json.dumps(report, indent=2, sort_keys=True)) else: print(json.dumps(report, sort_keys=True)) if args.require_ok and not report.get("summary", {}).get("ok"): return 2 return 0 if __name__ == "__main__": raise SystemExit(main())