Add duplicate publisher determinism proof
Some checks failed
deploy-cloudflare / checks (push) Failing after 3s
ci-gates / checks (push) Failing after 5s
deploy-cloudflare / deploy (push) Has been skipped

This commit is contained in:
every.channel 2026-06-10 03:28:55 -07:00
parent 5d0f3077d3
commit 91dad67fc2
No known key found for this signature in database
18 changed files with 21569 additions and 595 deletions

View file

@ -0,0 +1,320 @@
#!/usr/bin/env python3
from __future__ import annotations
import importlib.util
import json
import sys
import unittest
from pathlib import Path
# Directory one level above the directory that contains this test file.
REPO = Path(__file__).resolve().parents[1]
# Script under test. Its hyphenated filename cannot be named in a plain
# ``import`` statement, so it is addressed by path.
SCRIPT = REPO / "scripts" / "measure-duplicate-publishers.py"
def load_module():
    """Import the measurement script from ``SCRIPT`` and return the module.

    The module is registered in ``sys.modules`` under the name
    ``measure_duplicate_publishers`` before its body is executed.

    Raises:
        RuntimeError: if no import spec or loader can be built for ``SCRIPT``.
    """
    module_spec = importlib.util.spec_from_file_location("measure_duplicate_publishers", SCRIPT)
    loader = None if module_spec is None else module_spec.loader
    if loader is None:
        raise RuntimeError(f"unable to load {SCRIPT}")
    loaded = importlib.util.module_from_spec(module_spec)
    sys.modules[module_spec.name] = loaded
    loader.exec_module(loaded)
    return loaded
class MeasureDuplicatePublishersTest(unittest.TestCase):
    """Behavioral tests for the functions exposed by the measurement script."""

    DUPLICATE_RECORDS_METRIC = "every_channel_ladder_archive_duplicate_hash_source_records"
    DIVERGENT_SEQUENCES_METRIC = "every_channel_ladder_archive_hash_divergent_sequences"

    @staticmethod
    def _record(sequence, digest, source):
        """Build one manifest record carrying a hash and a source identity."""
        return {"group_sequence": sequence, "blake3": digest, "source_node": source}

    @staticmethod
    def _healthy_publishers():
        """Build the ``publishers`` section for two fully healthy publishers."""
        healthy = {"health_ok": True, "metrics_ok": True, "duplicate_metrics_present": True}
        return {"a": dict(healthy), "b": dict(healthy)}

    @staticmethod
    def _comparison(matching, *, identity_ok=True, hash_match=True):
        """Build a ``manifest_comparison`` section with no divergent sequences."""
        return {
            "source_identity_ok": identity_ok,
            "matching_sequence_count": matching,
            "divergent_sequence_count": 0,
            "byte_for_byte_hash_match": hash_match,
        }

    @staticmethod
    def _prom_row(metric, value):
        """Build one successful Prometheus row whose series is present."""
        return {"metric": metric, "ok": True, "series_present": True, "value": value}

    def test_manifest_hash_stats_counts_duplicates_divergence_and_missing_hashes(self) -> None:
        """Stats count duplicates, divergence, and records lacking hash or source."""
        module = load_module()
        records = [
            {"group_sequence": 10, "received_unix_ms": 1_000, "blake3": "same", "source_node": "nuc-a"},
            {"group_sequence": 10, "received_unix_ms": 1_001, "blake3": "same", "source_node": "nuc-b"},
            {"group_sequence": 11, "received_unix_ms": 2_000, "blake3": "left", "source_node": "nuc-a"},
            {"group_sequence": 11, "received_unix_ms": 2_001, "blake3": "right", "source_node": "nuc-b"},
            {"group_sequence": 12, "received_unix_ms": 3_000},
        ]
        stats = module.manifest_hash_stats(records, invalid_lines=2)
        expected = {
            "record_count": 5,
            "invalid_lines": 2,
            "sequence_count": 2,
            "source_identity_count": 2,
            "source_identities": ["nuc-a", "nuc-b"],
            "missing_source_identity_records": 1,
            "duplicate_hash_source_records": 1,
            "duplicate_hash_sequences": 1,
            "hash_divergent_sequences": 1,
            "missing_hash_records": 1,
            "first_received_unix_ms": 1_000,
            "latest_received_unix_ms": 3_000,
        }
        for key, value in expected.items():
            self.assertEqual(value, stats[key], key)

    def test_compare_manifest_hashes_proves_byte_for_byte_matches(self) -> None:
        """Two distinct sources with identical hashes per sequence match."""
        module = load_module()
        named_records = {
            name: [self._record(1, "a", name), self._record(2, "b", name)]
            for name in ("publisher-a", "publisher-b")
        }
        comparison = module.compare_manifest_hashes(named_records)
        self.assertTrue(comparison["byte_for_byte_hash_match"])
        self.assertTrue(comparison["source_identity_ok"])
        self.assertEqual(["publisher-a", "publisher-b"], comparison["source_identities"])
        self.assertEqual(2, comparison["matching_sequence_count"])
        self.assertEqual(0, comparison["divergent_sequence_count"])
        self.assertEqual(0, comparison["missing_sequence_count"])

    def test_compare_manifest_hashes_reports_divergent_sequences(self) -> None:
        """A differing hash and an unshared sequence are both reported."""
        module = load_module()
        named_records = {
            "publisher-a": [
                self._record(1, "a", "publisher-a"),
                self._record(2, "b", "publisher-a"),
            ],
            "publisher-b": [
                self._record(1, "a", "publisher-b"),
                self._record(2, "different", "publisher-b"),
                self._record(3, "extra", "publisher-b"),
            ],
        }
        comparison = module.compare_manifest_hashes(named_records)
        self.assertFalse(comparison["byte_for_byte_hash_match"])
        self.assertEqual(1, comparison["matching_sequence_count"])
        self.assertEqual(1, comparison["divergent_sequence_count"])
        self.assertEqual(1, comparison["missing_sequence_count"])
        first_example = comparison["divergent_examples"][0]
        self.assertEqual(2, first_example["sequence"])
        self.assertEqual(["different"], first_example["hashes"]["publisher-b"])

    def test_compare_manifest_hashes_rejects_intra_manifest_divergence(self) -> None:
        """One source holding two hashes for a sequence counts as divergent."""
        module = load_module()
        named_records = {
            "publisher-a": [self._record(1, "same", "publisher-a")],
            "publisher-b": [
                self._record(1, "same", "publisher-b"),
                self._record(1, "different", "publisher-b"),
            ],
        }
        comparison = module.compare_manifest_hashes(named_records)
        self.assertFalse(comparison["byte_for_byte_hash_match"])
        self.assertEqual(0, comparison["matching_sequence_count"])
        self.assertEqual(1, comparison["divergent_sequence_count"])
        self.assertEqual(["different", "same"], comparison["divergent_examples"][0]["hashes"]["publisher-b"])

    def test_compare_manifest_hashes_rejects_mirrored_same_source_records(self) -> None:
        """Two manifests mirroring one source identity do not prove a match."""
        module = load_module()
        named_records = {
            buffer_name: [self._record(1, "same", "archive-origin")]
            for buffer_name in ("nuc-a-buffer", "nuc-b-buffer")
        }
        comparison = module.compare_manifest_hashes(named_records)
        self.assertFalse(comparison["byte_for_byte_hash_match"])
        self.assertFalse(comparison["source_identity_ok"])
        self.assertEqual(["archive-origin"], comparison["source_identities"])

    def test_summary_requires_manifest_comparison_and_prometheus_series(self) -> None:
        """Two healthy samples 30 s apart with matching hashes summarize as ok."""
        module = load_module()
        samples = [
            {
                "sample_unix_ms": sample_unix_ms,
                "publishers": self._healthy_publishers(),
                "manifest_comparison": self._comparison(matching),
                "prometheus": [
                    self._prom_row(self.DUPLICATE_RECORDS_METRIC, matching),
                    self._prom_row(self.DIVERGENT_SEQUENCES_METRIC, 0),
                ],
            }
            for sample_unix_ms, matching in ((1_000, 2), (31_000, 4))
        ]
        summary = module.summarize(samples)
        self.assertTrue(summary["ok"])
        self.assertEqual(30_000, summary["elapsed_ms"])
        self.assertEqual(2, summary["sample_count"])
        self.assertEqual(4, summary["latest_manifest_comparison"]["matching_sequence_count"])

    def test_summary_rejects_single_sample_and_manifest_hash_errors(self) -> None:
        """A lone sample with manifest defects lists every matching reason."""
        module = load_module()
        sample = {
            "sample_unix_ms": 1_000,
            "publishers": self._healthy_publishers(),
            "manifests": {
                "a": {
                    "ok": True,
                    "hash_divergent_sequences": 1,
                    "missing_hash_records": 1,
                    "invalid_lines": 1,
                },
            },
            "manifest_comparison": self._comparison(2),
            "prometheus": [self._prom_row(self.DUPLICATE_RECORDS_METRIC, 2)],
        }
        summary = module.summarize([sample])
        self.assertFalse(summary["ok"])
        for reason in (
            "insufficient_elapsed_samples",
            "manifest_hash_divergence_observed",
            "manifest_hash_missing_records",
            "manifest_invalid_lines",
        ):
            self.assertIn(reason, summary["reasons"])

    def test_summary_rejects_missing_or_non_diverse_source_identity(self) -> None:
        """A failed source-identity check is reported from both data sources."""
        module = load_module()
        samples = [
            {
                "sample_unix_ms": 1_000,
                "manifest_comparison": self._comparison(2, identity_ok=False, hash_match=False),
            },
            {
                "sample_unix_ms": 31_000,
                "manifest_comparison": self._comparison(2, identity_ok=False, hash_match=False),
                "prometheus": [
                    self._prom_row("every_channel_archive_missing_source_identity_records", 2),
                ],
            },
        ]
        summary = module.summarize(samples)
        self.assertFalse(summary["ok"])
        self.assertIn("manifest_source_identity_missing_or_not_diverse", summary["reasons"])
        self.assertIn("prometheus_source_identity_missing_nonzero", summary["reasons"])

    def test_agent_manifest_url_builds_bounded_tailnet_endpoint(self) -> None:
        """The node-agent URL carries broadcast, track, byte bound, and role."""
        module = load_module()
        url = module.agent_manifest_url(
            "http://100.64.0.5:7799/",
            broadcast="la-kcop",
            track="0.m4s",
            role="publisher-buffer",
            max_bytes=4096,
        )
        self.assertEqual(
            "http://100.64.0.5:7799/v1/archive-manifest?broadcast=la-kcop&track=0.m4s&max_bytes=4096&role=publisher-buffer",
            url,
        )

    def test_parser_defaults_to_publisher_origin_proof_track(self) -> None:
        """With no arguments the parser selects the ``publisher.m4s`` track."""
        module = load_module()
        parsed = module.build_parser().parse_args([])
        self.assertEqual("publisher.m4s", parsed.track)

    def test_parse_manifest_jsonl_tolerates_partial_first_tail_line(self) -> None:
        """An unparseable first line is dropped without counting as invalid."""
        module = load_module()
        body = 'not-json-prefix{"group_sequence":1}\n{"group_sequence":2,"blake3":"b"}\n'
        records, invalid = module.parse_manifest_jsonl(body)
        self.assertEqual(0, invalid)
        self.assertEqual([2], [record["group_sequence"] for record in records])
# Run the test suite when this file is executed directly.
if __name__ == "__main__":
    unittest.main()

View file

@ -0,0 +1,581 @@
#!/usr/bin/env python3
"""Measure duplicate publisher media-hash convergence in production."""
from __future__ import annotations
import argparse
import json
import sys
import time
import urllib.parse
import urllib.request
from dataclasses import dataclass
from typing import Any, Callable
# User-Agent header value sent with every HTTP request made by fetch_text.
USER_AGENT = "every-channel-measure-duplicate-publishers/1"
# Metric names that are queried from Prometheus per broadcast and also searched
# for (as substrings) in each publisher's /metrics body.
DUPLICATE_PROMETHEUS_METRICS = [
    "every_channel_ladder_archive_duplicate_hash_source_records",
    "every_channel_ladder_archive_duplicate_hash_sequences",
    "every_channel_ladder_archive_hash_divergent_sequences",
    "every_channel_ladder_archive_missing_hash_records",
    "every_channel_ladder_archive_missing_source_identity_records",
    "every_channel_archive_duplicate_hash_source_records",
    "every_channel_archive_duplicate_hash_sequences",
    "every_channel_archive_hash_divergent_sequences",
    "every_channel_archive_missing_hash_records",
    "every_channel_archive_missing_source_identity_records",
]
# Record keys consulted, in this order, to find a record's source identity.
SOURCE_IDENTITY_KEYS = ("source_node", "publisher_node", "source_id")
@dataclass
class FetchResult:
    """Outcome of a single HTTP fetch, successful or not."""

    url: str  # URL that was requested
    status: int  # HTTP status code
    body: str  # decoded response body
    elapsed_ms: int  # duration of the fetch in milliseconds
    error: str | None = None  # failure description, or None when none was recorded

    @property
    def ok(self) -> bool:
        """Return True when no error is recorded and the status is in 200-299."""
        if self.error is not None:
            return False
        return self.status >= 200 and self.status < 300
def now_ms() -> int:
    """Return the current ``time.time()`` value as whole milliseconds."""
    seconds = time.time()
    return int(seconds * 1000)
def fetch_text(url: str, timeout: float, max_bytes: int = 4 * 1024 * 1024) -> FetchResult:
    """Fetch ``url`` and return its body as text, bounded to ``max_bytes``.

    Args:
        url: Address to request.
        timeout: Timeout in seconds handed to ``urllib.request.urlopen``.
        max_bytes: When positive, a suffix ``Range`` header asks for only the
            last ``max_bytes`` bytes and at most that many bytes are kept.
            When zero or negative, the whole body is read.

    Returns:
        A ``FetchResult``. Any exception raised while requesting or reading is
        captured as ``status=0``, an empty body, and ``error=str(exception)``
        rather than propagated.
    """
    started = now_ms()
    headers = {"User-Agent": USER_AGENT}
    bounded = max_bytes > 0
    if bounded:
        # Suffix byte range: request only the tail of the resource.
        headers["Range"] = f"bytes=-{max_bytes}"
    req = urllib.request.Request(url, headers=headers)
    try:
        with urllib.request.urlopen(req, timeout=timeout) as res:
            # An unbounded read passes no size at all: a negative size is
            # mishandled for chunked responses by some http.client versions.
            body = res.read(max_bytes + 1) if bounded else res.read()
            if bounded and len(body) > max_bytes:
                # NOTE(review): this branch is only reachable when the server
                # returned more than was asked for (presumably it ignored the
                # Range header); the bytes kept are then the end of the first
                # max_bytes + 1 bytes read, not the end of the resource.
                body = body[-max_bytes:]
            return FetchResult(url, int(res.status), body.decode("utf-8", "replace"), now_ms() - started)
    except Exception as err:  # noqa: BLE001 - measurements preserve transport failures.
        return FetchResult(url, 0, "", now_ms() - started, str(err))
def fetch_json(url: str, timeout: float, max_bytes: int = 1024 * 1024) -> tuple[FetchResult, Any | None]:
    """Fetch ``url`` with ``fetch_text`` and decode the body as JSON.

    Returns:
        ``(result, decoded)``. ``decoded`` is ``None`` when the fetch failed or
        the body is not valid JSON; in the latter case ``result.error`` is set
        to an ``"invalid json: ..."`` message.
    """
    result = fetch_text(url, timeout, max_bytes=max_bytes)
    if result.ok:
        try:
            decoded = json.loads(result.body)
        except json.JSONDecodeError as err:
            # Recording the error also makes ``result.ok`` False from here on.
            result.error = f"invalid json: {err}"
        else:
            return result, decoded
    return result, None
def parse_named_url(value: str) -> tuple[str, str]:
    """Split a ``NAME=URL`` argument at its first ``=``.

    Returns:
        ``(name, url)``, each stripped of surrounding whitespace.

    Raises:
        ValueError: if there is no ``=`` or either side is empty after
            stripping.
    """
    raw_name, separator, raw_url = value.partition("=")
    name, url = raw_name.strip(), raw_url.strip()
    if not (separator and name and url):
        raise ValueError(f"expected NAME=URL: {value}")
    return name, url
def manifest_url(origin: str, broadcast: str, track: str) -> str:
    """Return the ``manifests/<broadcast>/<track>.jsonl`` URL under ``origin``.

    Trailing slashes on ``origin`` are collapsed to exactly one before joining.
    """
    relative_path = f"manifests/{broadcast}/{track}.jsonl"
    root = f'{origin.rstrip("/")}/'
    return urllib.parse.urljoin(root, relative_path)
def parse_manifest_jsonl(body: str) -> tuple[list[dict[str, Any]], int]:
    """Parse a JSON-lines manifest body into its object records.

    Blank lines are ignored. A line counts as invalid when it is not valid
    JSON or when it decodes to something other than an object.

    Returns:
        ``(records, invalid_lines)``.
    """
    parsed: list[dict[str, Any]] = []
    rejected = 0
    for line_number, text in enumerate(body.splitlines()):
        text = text.strip()
        if text == "":
            continue
        try:
            candidate = json.loads(text)
        except json.JSONDecodeError:
            # Tail range reads may start in the middle of a JSON line, so an
            # unparseable very first line is dropped without being counted.
            if line_number > 0:
                rejected += 1
            continue
        if not isinstance(candidate, dict):
            rejected += 1
            continue
        parsed.append(candidate)
    return parsed, rejected
def int_or_none(value: Any) -> int | None:
    """Coerce ``value`` to an ``int``, or return ``None`` when that fails.

    Integers pass through unchanged, booleans are rejected, and anything else
    is converted by parsing its ``str()`` form as an integer.
    """
    if isinstance(value, int):
        # bool is a subclass of int; booleans are deliberately not numbers here.
        return None if isinstance(value, bool) else value
    try:
        parsed = int(str(value))
    except (TypeError, ValueError):
        return None
    return parsed
def record_source_identity(record: dict[str, Any]) -> str | None:
    """Return the first non-blank source identity found in ``record``.

    The keys in ``SOURCE_IDENTITY_KEYS`` are tried in order; only string
    values that are non-empty after stripping qualify. Returns ``None`` when
    no key qualifies.
    """
    for field in SOURCE_IDENTITY_KEYS:
        candidate = record.get(field)
        if not isinstance(candidate, str):
            continue
        trimmed = candidate.strip()
        if trimmed:
            return trimmed
    return None
def manifest_hash_stats(records: list[dict[str, Any]], invalid_lines: int = 0) -> dict[str, Any]:
    """Summarize hash agreement and completeness for one manifest's records.

    Records without an integer ``group_sequence`` contribute only to the
    received-timestamp range. Records without a non-blank ``blake3`` string
    are counted as missing a hash and take no part in the hash tallies.

    Args:
        records: Manifest records.
        invalid_lines: Count of rejected lines, copied into the result as is.

    Returns:
        A dict of counters, the sorted source identities seen, and the first
        and latest ``received_unix_ms`` values (``None`` when none parsed).
    """
    digests_per_sequence: dict[int, set[str]] = {}
    # sequence -> digest -> source identities that reported that digest
    sources_per_digest: dict[int, dict[str, set[str]]] = {}
    seen_sources: set[str] = set()
    timestamps: list[int] = []
    records_without_hash = 0
    records_without_source = 0

    for entry in records:
        stamp = int_or_none(entry.get("received_unix_ms"))
        if stamp is not None:
            timestamps.append(stamp)

        sequence = int_or_none(entry.get("group_sequence"))
        if sequence is None:
            continue

        source = record_source_identity(entry)
        if source:
            seen_sources.add(source)
        else:
            records_without_source += 1

        raw_digest = entry.get("blake3")
        digest = raw_digest.strip() if isinstance(raw_digest, str) else ""
        if not digest:
            records_without_hash += 1
            continue

        digests_per_sequence.setdefault(sequence, set()).add(digest)
        if source:
            sources_per_digest.setdefault(sequence, {}).setdefault(digest, set()).add(source)

    # Every additional source that reported an already-seen (sequence, digest)
    # pair counts as one duplicate record.
    duplicate_hash_source_records = sum(
        max(0, len(sources) - 1)
        for digest_sources in sources_per_digest.values()
        for sources in digest_sources.values()
    )
    duplicate_hash_sequences = sum(
        1
        for digest_sources in sources_per_digest.values()
        if any(len(sources) > 1 for sources in digest_sources.values())
    )
    hash_divergent_sequences = sum(1 for digests in digests_per_sequence.values() if len(digests) > 1)

    return {
        "record_count": len(records),
        "invalid_lines": invalid_lines,
        "sequence_count": len(digests_per_sequence),
        "source_identity_count": len(seen_sources),
        "source_identities": sorted(seen_sources),
        "missing_source_identity_records": records_without_source,
        "duplicate_hash_source_records": duplicate_hash_source_records,
        "duplicate_hash_sequences": duplicate_hash_sequences,
        "hash_divergent_sequences": hash_divergent_sequences,
        "missing_hash_records": records_without_hash,
        "first_received_unix_ms": min(timestamps) if timestamps else None,
        "latest_received_unix_ms": max(timestamps) if timestamps else None,
    }
def first_hash_by_sequence(records: list[dict[str, Any]]) -> dict[int, str]:
    """Map each sequence to the first stripped ``blake3`` value seen for it.

    Records lacking an integer ``group_sequence`` or a non-blank string
    ``blake3`` are ignored; later hashes for a sequence never overwrite the
    first one.
    """
    first_seen: dict[int, str] = {}
    for entry in records:
        sequence = int_or_none(entry.get("group_sequence"))
        raw_digest = entry.get("blake3")
        usable = sequence is not None and isinstance(raw_digest, str) and bool(raw_digest.strip())
        if usable and sequence not in first_seen:
            first_seen[sequence] = raw_digest.strip()
    return first_seen
def hash_sets_by_sequence(records: list[dict[str, Any]]) -> dict[int, set[str]]:
    """Map each sequence to the set of distinct stripped ``blake3`` values.

    Records lacking an integer ``group_sequence`` or a non-blank string
    ``blake3`` are ignored, so every set in the result is non-empty.
    """
    digests: dict[int, set[str]] = {}
    for entry in records:
        sequence = int_or_none(entry.get("group_sequence"))
        if sequence is None:
            continue
        raw_digest = entry.get("blake3")
        if not isinstance(raw_digest, str):
            continue
        cleaned = raw_digest.strip()
        if cleaned:
            digests.setdefault(sequence, set()).add(cleaned)
    return digests
def compare_manifest_hashes(named_records: dict[str, list[dict[str, Any]]]) -> dict[str, Any]:
    """Compare per-sequence hashes across distinct source identities.

    Records are regrouped by their source identity rather than by the manifest
    they arrived in; a record with no identity is grouped under
    ``manifest:<manifest name>`` and counted as missing one. A sequence shared
    by every source matches only when each source holds exactly one hash for
    it and all those hashes are equal.

    Args:
        named_records: Manifest name mapped to that manifest's records.

    Returns:
        A dict of counts, up to five divergent examples, and the
        ``source_identity_ok`` / ``byte_for_byte_hash_match`` verdicts.
    """
    records_without_source = 0
    by_source: dict[str, list[dict[str, Any]]] = {}
    for manifest_name, records in named_records.items():
        fallback_identity = f"manifest:{manifest_name}"
        for record in records:
            identity = record_source_identity(record)
            if identity is None:
                records_without_source += 1
                identity = fallback_identity
            by_source.setdefault(identity, []).append(record)

    names = sorted(by_source)
    per_name = {name: hash_sets_by_sequence(grouped) for name, grouped in by_source.items()}

    every_sequence: set[int] = set()
    for table in per_name.values():
        every_sequence.update(table)
    all_sequences = sorted(every_sequence)
    shared_sequences = [
        sequence
        for sequence in all_sequences
        if all(sequence in per_name[name] for name in names)
    ]

    matching = 0
    divergent = 0
    examples: list[dict[str, Any]] = []
    for sequence in shared_sequences:
        values = {name: per_name[name][sequence] for name in names}
        each_source_has_one_hash = all(len(digests) == 1 for digests in values.values())
        distinct_hashes = set().union(*values.values())
        if each_source_has_one_hash and len(distinct_hashes) == 1:
            matching += 1
            continue
        divergent += 1
        if len(examples) < 5:
            examples.append(
                {
                    "sequence": sequence,
                    "hashes": {name: sorted(digests) for name, digests in values.items()},
                }
            )

    # At least two distinct, explicitly declared sources are required.
    source_identity_ok = records_without_source == 0 and len(names) >= 2
    hash_match = bool(
        source_identity_ok and shared_sequences and divergent == 0 and matching == len(shared_sequences)
    )
    return {
        "publisher_count": len(names),
        "publishers": names,
        "input_manifest_count": len(named_records),
        "source_identity_count": len(names),
        "source_identities": names,
        "missing_source_identity_records": records_without_source,
        "source_identity_ok": source_identity_ok,
        "sequence_count": len(all_sequences),
        "shared_sequence_count": len(shared_sequences),
        "matching_sequence_count": matching,
        "divergent_sequence_count": divergent,
        "missing_sequence_count": max(0, len(all_sequences) - len(shared_sequences)),
        "divergent_examples": examples,
        "byte_for_byte_hash_match": hash_match,
    }
def prometheus_query_url(prometheus_url: str, expr: str) -> str:
    """Return the ``/api/v1/query`` URL under ``prometheus_url`` for ``expr``.

    Trailing whitespace and trailing slashes are removed from the base URL and
    ``expr`` is URL-encoded as the ``query`` parameter.
    """
    base = prometheus_url.rstrip().rstrip("/")
    encoded_query = urllib.parse.urlencode({"query": expr})
    return f"{base}/api/v1/query?{encoded_query}"
def prometheus_metric_sum(
    prometheus_url: str,
    metric: str,
    *,
    broadcast: str,
    timeout: float,
    fetcher: Callable[[str, float, int], FetchResult] = fetch_text,
) -> dict[str, Any]:
    """Query Prometheus for ``sum(<metric>{broadcast="<broadcast>"})``.

    Args:
        prometheus_url: Prometheus base URL.
        metric: Metric name to sum.
        broadcast: Value matched against the ``broadcast`` label.
        timeout: Timeout in seconds passed to ``fetcher``.
        fetcher: Callable invoked as ``fetcher(url, timeout, max_bytes)``.

    Returns:
        A dict that always has ``metric``, ``ok`` and ``value``. A failed
        fetch or malformed reply yields ``ok=False`` plus ``error``; an empty
        result yields ``ok=True``, ``value=None``, ``series_present=False``;
        otherwise ``value`` is a float and ``series_present`` is True.
    """
    # Escape the label value so a backslash, double quote or newline in the
    # broadcast name cannot terminate or alter the quoted PromQL matcher.
    label_value = broadcast.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")
    selector = f'{metric}{{broadcast="{label_value}"}}'
    expr = f"sum({selector})"
    fetched = fetcher(prometheus_query_url(prometheus_url, expr), timeout, 1024 * 1024)
    if not fetched.ok:
        return {"metric": metric, "ok": False, "value": None, "error": fetched.error}
    try:
        payload = json.loads(fetched.body)
        result = payload.get("data", {}).get("result", [])
        if not result:
            return {"metric": metric, "ok": True, "value": None, "series_present": False}
        # Only the first result entry is read; its "value" is indexed at [1].
        raw_value = result[0].get("value", [None, None])[1]
        value = float(raw_value)
    except Exception as err:  # noqa: BLE001 - preserve malformed Prometheus replies.
        return {"metric": metric, "ok": False, "value": None, "error": f"invalid prometheus response: {err}"}
    return {"metric": metric, "ok": True, "value": value, "series_present": True}
def agent_manifest_url(base_url: str, *, broadcast: str, track: str, role: str, max_bytes: int) -> str:
    """Return the ``/v1/archive-manifest`` URL under ``base_url``.

    The query carries ``broadcast``, ``track`` and ``max_bytes`` in that
    order; ``role`` is appended only when it is non-empty.
    """
    params = [
        ("broadcast", broadcast),
        ("track", track),
        ("max_bytes", str(max_bytes)),
    ]
    if role:
        params.append(("role", role))
    encoded = urllib.parse.urlencode(params)
    return f'{base_url.rstrip("/")}/v1/archive-manifest?{encoded}'
def sample_publishers(
    publisher_urls: dict[str, str],
    *,
    timeout: float,
    fetcher: Callable[[str, float, int], FetchResult] = fetch_text,
) -> dict[str, Any]:
    """Probe ``/health`` and ``/metrics`` on every publisher node agent.

    Args:
        publisher_urls: Publisher name mapped to its node-agent base URL.
        timeout: Timeout in seconds passed to ``fetcher`` for each request.
        fetcher: Callable invoked as ``fetcher(url, timeout, max_bytes)``.

    Returns:
        Publisher name mapped to a row with fetch outcomes, the ``node_modes``
        and ``unhealthy_processes`` lists from the health payload, the
        hostname, and whether any duplicate metric name appears in the
        metrics body.
    """
    out: dict[str, Any] = {}
    for name, base_url in publisher_urls.items():
        base = base_url.rstrip("/")
        health = fetcher(f"{base}/health", timeout, 1024 * 1024)
        metrics = fetcher(f"{base}/metrics", timeout, 2 * 1024 * 1024)
        row: dict[str, Any] = {
            "agent_url": base,
            "health_ok": health.ok,
            "metrics_ok": metrics.ok,
            "health_error": health.error,
            "metrics_error": metrics.error,
            "duplicate_metrics_present": False,
            "node_modes": [],
            "unhealthy_processes": [],
        }
        if health.ok:
            try:
                payload = json.loads(health.body)
            except json.JSONDecodeError:
                payload = None
            if isinstance(payload, dict):
                node_modes = payload.get("node_modes")
                row["node_modes"] = node_modes if isinstance(node_modes, list) else []
                unhealthy = payload.get("unhealthy_processes")
                row["unhealthy_processes"] = unhealthy if isinstance(unhealthy, list) else []
                system = payload.get("system") if isinstance(payload.get("system"), dict) else {}
                row["hostname"] = system.get("hostname") or payload.get("hostname")
            else:
                # Covers both undecodable bodies and valid JSON that is not an
                # object (e.g. a list or null), which has no fields to read.
                row["health_error"] = "invalid health json"
        if metrics.ok:
            # Plain substring search over the raw metrics text.
            row["duplicate_metrics_present"] = any(metric in metrics.body for metric in DUPLICATE_PROMETHEUS_METRICS)
            row["metrics_bytes"] = len(metrics.body.encode("utf-8"))
        out[name] = row
    return out
def sample_once(args: argparse.Namespace) -> dict[str, Any]:
    """Take one measurement sample from every configured data source.

    Manifests named with ``--manifest`` are fetched as JSONL. When none are
    given, a single ``archive-origin`` manifest is derived from
    ``--archive-origin``, ``--broadcast`` and ``--track``. Node-agent
    manifests are then fetched as JSON, and Prometheus is queried for each
    duplicate metric when a Prometheus URL and broadcast are set.

    Args:
        args: Parsed command-line arguments as produced by ``build_parser``.

    Returns:
        A sample dict with the timestamp, publisher rows, per-manifest stats,
        the cross-manifest comparison (``None`` unless at least two manifests
        were fetched successfully), and the Prometheus rows.

    Raises:
        ValueError: if a ``NAME=URL`` argument is malformed.
    """
    manifests: dict[str, str] = dict(parse_named_url(item) for item in args.manifest)
    if not manifests and args.archive_origin and args.broadcast and args.track:
        manifests["archive-origin"] = manifest_url(args.archive_origin, args.broadcast, args.track)
    publisher_urls: dict[str, str] = dict(parse_named_url(item) for item in args.publisher)
    agent_manifest_urls: dict[str, str] = dict(parse_named_url(item) for item in args.agent_manifest)

    fetched_records: dict[str, list[dict[str, Any]]] = {}
    manifest_stats: dict[str, Any] = {}

    for name, url in manifests.items():
        fetched = fetch_text(url, args.timeout, max_bytes=args.max_manifest_bytes)
        if not fetched.ok:
            manifest_stats[name] = {"url": url, "ok": False, "error": fetched.error}
            continue
        records, invalid_lines = parse_manifest_jsonl(fetched.body)
        fetched_records[name] = records
        manifest_stats[name] = {
            "url": url,
            "ok": True,
            "fetch_elapsed_ms": fetched.elapsed_ms,
            **manifest_hash_stats(records, invalid_lines),
        }

    if agent_manifest_urls and args.broadcast and args.track:
        for name, base_url in agent_manifest_urls.items():
            url = agent_manifest_url(
                base_url,
                broadcast=args.broadcast,
                track=args.track,
                role=args.agent_manifest_role,
                max_bytes=args.max_manifest_bytes,
            )
            # The JSON reply is allowed 1 MiB more than the manifest byte bound.
            fetched, payload = fetch_json(url, args.timeout, max_bytes=args.max_manifest_bytes + 1024 * 1024)
            payload_ok = isinstance(payload, dict) and payload.get("ok") is True
            if not fetched.ok or not payload_ok:
                reported = payload.get("error") if isinstance(payload, dict) else None
                manifest_stats[name] = {
                    "url": url,
                    "ok": False,
                    "source": "node-agent",
                    # Never leave a failed row without an explanation.
                    "error": fetched.error or reported or "invalid response",
                }
                continue
            records = payload.get("records") if isinstance(payload.get("records"), list) else []
            records = [record for record in records if isinstance(record, dict)]
            fetched_records[name] = records
            invalid_lines = int_or_none(payload.get("invalid_lines")) or 0
            stats = payload.get("stats") if isinstance(payload.get("stats"), dict) else {}
            manifest_stats[name] = {
                "url": url,
                "ok": True,
                "source": "node-agent",
                "fetch_elapsed_ms": fetched.elapsed_ms,
                "role": payload.get("role"),
                "file_bytes": int_or_none(payload.get("file_bytes")),
                "partial_scan": payload.get("partial_scan") is True,
                # Stats are recomputed locally; the agent's own are kept beside them.
                **manifest_hash_stats(records, invalid_lines),
                "node_agent_stats": stats,
            }

    prometheus_metrics = []
    if args.prometheus_url and args.broadcast:
        for metric in DUPLICATE_PROMETHEUS_METRICS:
            prometheus_metrics.append(
                prometheus_metric_sum(args.prometheus_url, metric, broadcast=args.broadcast, timeout=args.timeout)
            )

    return {
        "sample_unix_ms": now_ms(),
        "broadcast": args.broadcast,
        "track": args.track,
        "publishers": sample_publishers(publisher_urls, timeout=args.timeout) if publisher_urls else {},
        "manifests": manifest_stats,
        "manifest_comparison": compare_manifest_hashes(fetched_records) if len(fetched_records) >= 2 else None,
        "prometheus": prometheus_metrics,
    }
def summarize(samples: list[dict[str, Any]]) -> dict[str, Any]:
    """Reduce the collected samples to a pass/fail verdict with reasons.

    Args:
        samples: Sample dicts in collection order; each must carry
            ``sample_unix_ms``.

    Returns:
        ``{"ok": False, "reasons": ["no_samples"]}`` for an empty list.
        Otherwise a dict whose ``ok`` is True only when ``reasons`` is empty,
        plus the elapsed time, sample count, the most recent manifest
        comparison, the number of Prometheus rows with a series present, and
        the publisher count of the last sample.
    """
    if not samples:
        return {"ok": False, "reasons": ["no_samples"]}

    reasons: list[str] = []
    elapsed_ms = max(0, int(samples[-1]["sample_unix_ms"]) - int(samples[0]["sample_unix_ms"]))
    if len(samples) < 2 or elapsed_ms <= 0:
        reasons.append("insufficient_elapsed_samples")

    # Publisher checks: health must hold on every row, while metrics only need
    # to be present on at least one row.
    publisher_rows: list[dict[str, Any]] = []
    for sample in samples:
        publisher_rows.extend(
            row for row in (sample.get("publishers") or {}).values() if isinstance(row, dict)
        )
    if publisher_rows:
        if any(row.get("health_ok") is not True for row in publisher_rows):
            reasons.append("publisher_health_missing")
        if all(row.get("metrics_ok") is not True for row in publisher_rows):
            reasons.append("publisher_metrics_missing")
        if all(row.get("duplicate_metrics_present") is not True for row in publisher_rows):
            reasons.append("duplicate_metrics_not_deployed_to_publishers")

    # Only the most recent sample that has a comparison is judged, and at most
    # one comparison reason is reported.
    latest_comparison = None
    for sample in samples:
        candidate = sample.get("manifest_comparison")
        if isinstance(candidate, dict):
            latest_comparison = candidate
    if latest_comparison is None:
        reasons.append("manifest_comparison_missing")
    elif latest_comparison.get("source_identity_ok") is not True:
        reasons.append("manifest_source_identity_missing_or_not_diverse")
    elif latest_comparison.get("matching_sequence_count", 0) <= 0:
        reasons.append("no_matching_duplicate_sequences")
    elif latest_comparison.get("divergent_sequence_count", 0) > 0:
        reasons.append("duplicate_hash_divergence_observed")

    manifest_rows: list[dict[str, Any]] = []
    for sample in samples:
        manifest_rows.extend(
            row for row in (sample.get("manifests") or {}).values() if isinstance(row, dict)
        )
    if manifest_rows:
        if any(row.get("ok") is not True for row in manifest_rows):
            reasons.append("manifest_fetch_missing")
        # Counter field -> reason emitted when any manifest row has it nonzero.
        counter_reasons = (
            ("hash_divergent_sequences", "manifest_hash_divergence_observed"),
            ("missing_hash_records", "manifest_hash_missing_records"),
            ("missing_source_identity_records", "manifest_source_identity_missing"),
            ("invalid_lines", "manifest_invalid_lines"),
        )
        for field, reason in counter_reasons:
            if any(int_or_none(row.get(field)) or 0 for row in manifest_rows):
                reasons.append(reason)

    prom_rows: list[dict[str, Any]] = []
    for sample in samples:
        prom_rows.extend(row for row in (sample.get("prometheus") or []) if isinstance(row, dict))
    prom_series = [row for row in prom_rows if row.get("series_present") is True]
    if prom_rows and not prom_series:
        reasons.append("prometheus_duplicate_series_missing")

    def series_values(metric_suffix: str) -> list[float]:
        """Collect values of present series whose metric ends with the suffix."""
        return [
            float(row.get("value") or 0)
            for row in prom_series
            if str(row.get("metric", "")).endswith(metric_suffix)
        ]

    if any(value > 0 for value in series_values("hash_divergent_sequences")):
        reasons.append("prometheus_hash_divergence_nonzero")
    if any(value > 0 for value in series_values("missing_source_identity_records")):
        reasons.append("prometheus_source_identity_missing_nonzero")

    return {
        "ok": not reasons,
        "elapsed_ms": elapsed_ms,
        "sample_count": len(samples),
        "reasons": reasons,
        "latest_manifest_comparison": latest_comparison,
        "prometheus_series_present_count": len(prom_series),
        "publisher_count": len(samples[-1].get("publishers") or {}),
    }
def measure(args: argparse.Namespace) -> dict[str, Any]:
    """Collect samples for the requested duration and build the full report.

    One sample is always taken. With a positive ``duration_seconds``, sampling
    repeats every ``poll_interval_seconds`` until the duration has elapsed on
    the monotonic clock.

    Returns:
        A report dict with ``started_unix_ms``, ``duration_seconds``,
        ``samples`` and ``summary``.
    """
    samples: list[dict[str, Any]] = []
    started = time.monotonic()
    while True:
        samples.append(sample_once(args))
        finished = args.duration_seconds <= 0 or time.monotonic() - started >= args.duration_seconds
        if finished:
            break
        time.sleep(args.poll_interval_seconds)

    started_unix_ms = samples[0]["sample_unix_ms"] if samples else now_ms()
    report = {
        "started_unix_ms": started_unix_ms,
        "duration_seconds": args.duration_seconds,
        "samples": samples,
    }
    report["summary"] = summarize(samples)
    return report
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser, described by the module docstring."""
    parser = argparse.ArgumentParser(description=__doc__)
    add = parser.add_argument

    # Repeatable NAME=URL sources.
    add("--publisher", action="append", default=[], help="Named node-agent URL, NAME=http://IP:7799.")
    add("--manifest", action="append", default=[], help="Named archive JSONL URL, NAME=https://...")
    add(
        "--agent-manifest",
        action="append",
        default=[],
        help="Named node-agent URL to sample /v1/archive-manifest from, NAME=http://IP:7799.",
    )
    add("--agent-manifest-role", default="publisher-buffer")

    # What to measure and where to find it.
    add("--archive-origin", default="", help="Archive origin root for manifests/<broadcast>/<track>.jsonl.")
    add("--prometheus-url", default="", help="Prometheus base URL for Grafana-facing metrics.")
    add("--broadcast", default="", help="Logical broadcast name to measure.")
    add(
        "--track",
        default="publisher.m4s",
        help="Track name to compare. Defaults to publisher-origin proof fragments, not relay video.",
    )

    # Timing and size limits.
    add("--duration-seconds", type=float, default=0.0)
    add("--poll-interval-seconds", type=float, default=30.0)
    add("--timeout", type=float, default=10.0)
    add("--max-manifest-bytes", type=int, default=4 * 1024 * 1024)

    # Output and exit-status behavior.
    add("--pretty", action="store_true")
    add("--require-ok", action="store_true", help="Exit non-zero unless summary.ok is true.")
    return parser
def main(argv: list[str] | None = None) -> int:
    """Run the measurement from the command line and print the JSON report.

    Args:
        argv: Argument list handed to the parser; ``None`` lets argparse use
            the process arguments.

    Returns:
        ``0`` on success, ``1`` when measuring raised (the error is printed
        as JSON on stderr), and ``2`` when ``--require-ok`` is set and the
        summary is not ok.
    """
    args = build_parser().parse_args(argv)
    try:
        report = measure(args)
    except Exception as err:  # noqa: BLE001 - command-line tool should preserve exact failure.
        failure = {"ok": False, "error": str(err)}
        print(json.dumps(failure, sort_keys=True), file=sys.stderr)
        return 1

    indent = 2 if args.pretty else None
    print(json.dumps(report, indent=indent, sort_keys=True))

    if args.require_ok and not report.get("summary", {}).get("ok"):
        return 2
    return 0
# Script entry point: exit the process with the status code returned by main().
if __name__ == "__main__":
    raise SystemExit(main())