581 lines
24 KiB
Python
Executable file
581 lines
24 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""Measure duplicate publisher media-hash convergence in production."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
import time
|
|
import urllib.parse
|
|
import urllib.request
|
|
from dataclasses import dataclass
|
|
from typing import Any, Callable
|
|
|
|
|
|
# User-Agent header value sent with every HTTP request issued by this tool.
USER_AGENT = "every-channel-measure-duplicate-publishers/1"

# Prometheus metric names queried and searched for in publisher /metrics
# output. The same five counters exist under two prefixes; the ladder-archive
# family is listed first, then the plain archive family.
DUPLICATE_PROMETHEUS_METRICS = [
    f"{family}_{counter}"
    for family in ("every_channel_ladder_archive", "every_channel_archive")
    for counter in (
        "duplicate_hash_source_records",
        "duplicate_hash_sequences",
        "hash_divergent_sequences",
        "missing_hash_records",
        "missing_source_identity_records",
    )
]

# Manifest record keys consulted, in this priority order, to identify the
# source that produced a record.
SOURCE_IDENTITY_KEYS = "source_node", "publisher_node", "source_id"
|
|
|
|
|
|
@dataclass
class FetchResult:
    """Outcome of a single HTTP fetch attempt, successful or not."""

    # URL that was requested.
    url: str
    # HTTP status code of the response.
    status: int
    # Response body decoded to text.
    body: str
    # Time spent on the fetch, in milliseconds.
    elapsed_ms: int
    # Failure description; None when no error was recorded.
    error: str | None = None

    @property
    def ok(self) -> bool:
        """Return True only when no error is recorded and the status is 2xx."""
        if self.error is not None:
            return False
        return self.status >= 200 and self.status < 300
|
|
|
|
|
|
def now_ms() -> int:
    """Return the current wall-clock Unix time truncated to whole milliseconds."""
    seconds_since_epoch = time.time()
    return int(seconds_since_epoch * 1000)
|
|
|
|
|
|
def fetch_text(url: str, timeout: float, max_bytes: int = 4 * 1024 * 1024) -> FetchResult:
    """Fetch ``url`` and return its body as text, keeping at most ``max_bytes``.

    When ``max_bytes`` is positive a suffix ``Range`` header is sent asking for
    the last ``max_bytes`` bytes, and the body read is capped. A non-positive
    ``max_bytes`` disables both the header and the cap.

    Failures raised while opening or reading the response are not propagated;
    they are returned as a ``FetchResult`` with status 0, an empty body and the
    exception text in ``error``.

    NOTE(review): if the server ignores ``Range`` and sends more than
    ``max_bytes``, only ``max_bytes + 1`` bytes are read from the start of the
    response and the first byte is dropped, so the result is not the tail.
    """
    began = now_ms()
    capped = max_bytes > 0
    request_headers = {"User-Agent": USER_AGENT}
    if capped:
        request_headers["Range"] = f"bytes=-{max_bytes}"
    request = urllib.request.Request(url, headers=request_headers)
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            # Read one byte past the cap so an oversized body can be detected.
            payload = response.read(max_bytes + 1) if capped else response.read(-1)
            if capped and len(payload) > max_bytes:
                payload = payload[-max_bytes:]
            status = int(response.status)
            text = payload.decode("utf-8", "replace")
            return FetchResult(url, status, text, now_ms() - began)
    except Exception as err:  # noqa: BLE001 - measurements preserve transport failures.
        return FetchResult(url, 0, "", now_ms() - began, str(err))
|
|
|
|
|
|
def fetch_json(url: str, timeout: float, max_bytes: int = 1024 * 1024) -> tuple[FetchResult, Any | None]:
    """Fetch ``url`` via ``fetch_text`` and decode the body as JSON.

    Returns ``(result, value)``. ``value`` is None when the fetch was not ok or
    the body is not valid JSON; in the latter case ``result.error`` is set to an
    ``invalid json: ...`` message, which also makes ``result.ok`` false.
    """
    result = fetch_text(url, timeout, max_bytes=max_bytes)
    decoded = None
    if result.ok:
        try:
            decoded = json.loads(result.body)
        except json.JSONDecodeError as err:
            result.error = f"invalid json: {err}"
    return result, decoded
|
|
|
|
|
|
def parse_named_url(value: str) -> tuple[str, str]:
    """Split a ``NAME=URL`` argument into a ``(name, url)`` pair.

    The split happens at the first ``=``; both halves are stripped of
    surrounding whitespace.

    Raises:
        ValueError: if there is no ``=`` or either half is empty after stripping.
    """
    raw_name, separator, raw_url = value.partition("=")
    name = raw_name.strip()
    url = raw_url.strip()
    if not (separator and name and url):
        raise ValueError(f"expected NAME=URL: {value}")
    return name, url
|
|
|
|
|
|
def manifest_url(origin: str, broadcast: str, track: str) -> str:
    """Build the archive manifest URL ``<origin>/manifests/<broadcast>/<track>.jsonl``.

    ``broadcast`` and ``track`` are percent-encoded before being placed in the
    path, so characters such as spaces, ``?`` or ``#`` cannot truncate the path
    or turn part of a name into a query string or fragment. ``/`` is left
    unencoded, so names containing slashes still map to nested path segments.

    Args:
        origin: Archive origin root; trailing slashes are ignored.
        broadcast: Raw (not already percent-encoded) broadcast name.
        track: Raw (not already percent-encoded) track name.

    Returns:
        The absolute manifest URL.
    """
    base = origin.rstrip("/") + "/"
    quoted_broadcast = urllib.parse.quote(broadcast)
    quoted_track = urllib.parse.quote(track)
    return urllib.parse.urljoin(base, f"manifests/{quoted_broadcast}/{quoted_track}.jsonl")
|
|
|
|
|
|
def parse_manifest_jsonl(body: str) -> tuple[list[dict[str, Any]], int]:
    """Parse a JSON-lines manifest body into records plus a bad-line count.

    Blank lines are ignored. A line counts as invalid when it is not valid JSON
    or decodes to something other than an object. The single exception is an
    undecodable line at index 0, which is dropped without being counted.

    Returns:
        ``(records, invalid_lines)``.
    """
    parsed: list[dict[str, Any]] = []
    rejected = 0
    for line_number, raw_line in enumerate(body.splitlines()):
        text = raw_line.strip()
        if text == "":
            continue
        try:
            decoded = json.loads(text)
        except json.JSONDecodeError:
            # Tail range reads may start in the middle of a JSON line, so a
            # broken first line is expected and not counted.
            if line_number != 0:
                rejected += 1
            continue
        if not isinstance(decoded, dict):
            rejected += 1
            continue
        parsed.append(decoded)
    return parsed, rejected
|
|
|
|
|
|
def int_or_none(value: Any) -> int | None:
    """Coerce ``value`` to an int, or return None when that is not possible.

    Booleans are rejected. Real ints are returned unchanged. Anything else is
    converted through ``int(str(value))``, so integer-looking strings are
    accepted while floats such as ``1.0`` and ``None`` yield None.
    """
    if isinstance(value, int):
        # bool is a subclass of int but is not treated as a number here.
        return None if isinstance(value, bool) else value
    try:
        converted = int(str(value))
    except (TypeError, ValueError):
        return None
    return converted
|
|
|
|
|
|
def record_source_identity(record: dict[str, Any]) -> str | None:
    """Return the stripped source identity of ``record``, or None.

    The keys in ``SOURCE_IDENTITY_KEYS`` are tried in order; the first one
    holding a string that is non-empty after stripping wins.
    """
    for candidate in (record.get(key) for key in SOURCE_IDENTITY_KEYS):
        if not isinstance(candidate, str):
            continue
        trimmed = candidate.strip()
        if trimmed:
            return trimmed
    return None
|
|
|
|
|
|
def manifest_hash_stats(records: list[dict[str, Any]], invalid_lines: int = 0) -> dict[str, Any]:
    """Summarise hash and source-identity coverage for one manifest's records.

    Every record contributes its ``received_unix_ms`` (when it is an integer)
    to the first/latest timestamps. Records without an integer
    ``group_sequence`` are otherwise ignored. For the remaining records the
    source identity and ``blake3`` digest are tallied per sequence.

    Args:
        records: Parsed manifest records.
        invalid_lines: Count of unparseable lines, passed through to the result.

    Returns:
        A dict of counters: record/sequence/source counts, missing hash and
        identity counts, duplicate and divergent hash counts, and the
        first/latest received timestamps (None when no record had one).
    """
    digests_per_sequence: dict[int, set[str]] = {}
    sources_per_digest: dict[int, dict[str, set[str]]] = {}
    hashless_records = 0
    anonymous_records = 0
    seen_sources: set[str] = set()
    timestamps: list[int] = []

    for entry in records:
        stamp = int_or_none(entry.get("received_unix_ms"))
        if stamp is not None:
            timestamps.append(stamp)

        seq = int_or_none(entry.get("group_sequence"))
        if seq is None:
            continue

        identity = record_source_identity(entry)
        if identity:
            seen_sources.add(identity)
        else:
            anonymous_records += 1

        raw_digest = entry.get("blake3")
        digest = raw_digest.strip() if isinstance(raw_digest, str) else ""
        if not digest:
            hashless_records += 1
            continue

        digests_per_sequence.setdefault(seq, set()).add(digest)
        if identity:
            sources_per_digest.setdefault(seq, {}).setdefault(digest, set()).add(identity)

    # A digest reported by k distinct sources counts k - 1 duplicate records;
    # a sequence is "duplicated" when at least one of its digests has k > 1.
    duplicate_records = 0
    duplicate_sequences = 0
    for by_digest in sources_per_digest.values():
        extras = [len(sources) - 1 for sources in by_digest.values() if len(sources) > 1]
        duplicate_records += sum(extras)
        if extras:
            duplicate_sequences += 1

    divergent_sequences = 0
    for digests in digests_per_sequence.values():
        if len(digests) > 1:
            divergent_sequences += 1

    earliest = min(timestamps) if timestamps else None
    latest = max(timestamps) if timestamps else None
    return {
        "record_count": len(records),
        "invalid_lines": invalid_lines,
        "sequence_count": len(digests_per_sequence),
        "source_identity_count": len(seen_sources),
        "source_identities": sorted(seen_sources),
        "missing_source_identity_records": anonymous_records,
        "duplicate_hash_source_records": duplicate_records,
        "duplicate_hash_sequences": duplicate_sequences,
        "hash_divergent_sequences": divergent_sequences,
        "missing_hash_records": hashless_records,
        "first_received_unix_ms": earliest,
        "latest_received_unix_ms": latest,
    }
|
|
|
|
|
|
def first_hash_by_sequence(records: list[dict[str, Any]]) -> dict[int, str]:
    """Map each group sequence to the first ``blake3`` digest seen for it.

    Records lacking an integer ``group_sequence`` or a non-blank string
    ``blake3`` are skipped. Digests are stripped of surrounding whitespace.
    """
    firsts: dict[int, str] = {}
    for entry in records:
        seq = int_or_none(entry.get("group_sequence"))
        raw_digest = entry.get("blake3")
        if seq is None or not isinstance(raw_digest, str):
            continue
        cleaned = raw_digest.strip()
        # Later digests for an already-seen sequence never replace the first.
        if cleaned and seq not in firsts:
            firsts[seq] = cleaned
    return firsts
|
|
|
|
|
|
def hash_sets_by_sequence(records: list[dict[str, Any]]) -> dict[int, set[str]]:
    """Map each group sequence to the set of distinct ``blake3`` digests seen.

    Records lacking an integer ``group_sequence`` or a non-blank string
    ``blake3`` are skipped. Digests are stripped of surrounding whitespace.
    """
    grouped: dict[int, set[str]] = {}
    for entry in records:
        seq = int_or_none(entry.get("group_sequence"))
        raw_digest = entry.get("blake3")
        cleaned = raw_digest.strip() if isinstance(raw_digest, str) else ""
        if seq is None or not cleaned:
            continue
        grouped.setdefault(seq, set()).add(cleaned)
    return grouped
|
|
|
|
|
|
def compare_manifest_hashes(named_records: dict[str, list[dict[str, Any]]]) -> dict[str, Any]:
    """Compare per-sequence digests across the sources found in several manifests.

    Records are regrouped by their source identity. A record without one is
    counted as missing and attributed to ``manifest:<manifest name>`` instead.
    A sequence is "shared" when every source has a digest for it; a shared
    sequence "matches" when every source has exactly one digest and all of them
    are equal, otherwise it is "divergent". Up to five divergent sequences are
    returned as examples.

    Args:
        named_records: Manifest name mapped to that manifest's records.

    Returns:
        A dict of counts, the sorted source names, divergent examples and the
        ``source_identity_ok`` / ``byte_for_byte_hash_match`` verdicts.
    """
    manifest_total = len(named_records)
    anonymous_records = 0
    by_source: dict[str, list[dict[str, Any]]] = {}
    for manifest_name, manifest_records in named_records.items():
        for entry in manifest_records:
            identity = record_source_identity(entry)
            if identity is None:
                anonymous_records += 1
                identity = f"manifest:{manifest_name}"
            by_source.setdefault(identity, []).append(entry)

    names = sorted(by_source)
    digests_by_source = {
        source: hash_sets_by_sequence(entries) for source, entries in by_source.items()
    }

    known_sequences: set[int] = set()
    for per_sequence in digests_by_source.values():
        known_sequences.update(per_sequence)
    all_sequences = sorted(known_sequences)
    shared_sequences = [
        seq
        for seq in all_sequences
        if all(seq in digests_by_source[name] for name in names)
    ]

    matching = 0
    divergent = 0
    examples: list[dict[str, Any]] = []
    for seq in shared_sequences:
        found = {name: digests_by_source[name][seq] for name in names}
        singles = [next(iter(digests)) for digests in found.values() if len(digests) == 1]
        agreed = len(singles) == len(names) and len(set(singles)) == 1
        if agreed:
            matching += 1
            continue
        divergent += 1
        if len(examples) < 5:
            hashes = {name: sorted(digests) for name, digests in found.items()}
            examples.append({"sequence": seq, "hashes": hashes})

    # Identities only prove independence when none were missing and at least
    # two distinct sources were observed.
    source_identity_ok = anonymous_records == 0 and len(names) >= 2
    byte_for_byte = bool(
        source_identity_ok and shared_sequences and divergent == 0 and matching == len(shared_sequences)
    )
    return {
        "publisher_count": len(names),
        "publishers": names,
        "input_manifest_count": manifest_total,
        "source_identity_count": len(names),
        "source_identities": names,
        "missing_source_identity_records": anonymous_records,
        "source_identity_ok": source_identity_ok,
        "sequence_count": len(all_sequences),
        "shared_sequence_count": len(shared_sequences),
        "matching_sequence_count": matching,
        "divergent_sequence_count": divergent,
        "missing_sequence_count": max(0, len(all_sequences) - len(shared_sequences)),
        "divergent_examples": examples,
        "byte_for_byte_hash_match": byte_for_byte,
    }
|
|
|
|
|
|
def prometheus_query_url(prometheus_url: str, expr: str) -> str:
    """Return the ``/api/v1/query`` URL that evaluates ``expr``.

    Trailing whitespace and then trailing slashes are removed from
    ``prometheus_url``; ``expr`` is URL-encoded as the ``query`` parameter.
    """
    root = prometheus_url.rstrip().rstrip("/")
    encoded = urllib.parse.urlencode({"query": expr})
    return f"{root}/api/v1/query?{encoded}"
|
|
|
|
|
|
def prometheus_metric_sum(
    prometheus_url: str,
    metric: str,
    *,
    broadcast: str,
    timeout: float,
    fetcher: Callable[[str, float, int], FetchResult] = fetch_text,
) -> dict[str, Any]:
    """Query Prometheus for ``sum(<metric>{broadcast="<broadcast>"})``.

    The broadcast name is escaped for use inside a double-quoted PromQL string
    (backslash, double quote and newline), so names containing those characters
    select the intended label value instead of producing a malformed or
    different query.

    Args:
        prometheus_url: Prometheus base URL.
        metric: Metric name to sum.
        broadcast: Value the ``broadcast`` label must equal.
        timeout: Per-request timeout in seconds, passed to ``fetcher``.
        fetcher: Callable ``(url, timeout, max_bytes) -> FetchResult``.

    Returns:
        A dict with ``metric``, ``ok`` and ``value``. On success it also has
        ``series_present`` (False with a None value when the query returned no
        series); on failure it has ``error``.
    """
    escaped_broadcast = (
        broadcast.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")
    )
    selector = f'{metric}{{broadcast="{escaped_broadcast}"}}'
    expr = f"sum({selector})"
    fetched = fetcher(prometheus_query_url(prometheus_url, expr), timeout, 1024 * 1024)
    if not fetched.ok:
        return {"metric": metric, "ok": False, "value": None, "error": fetched.error}
    try:
        payload = json.loads(fetched.body)
        result = payload.get("data", {}).get("result", [])
        if not result:
            return {"metric": metric, "ok": True, "value": None, "series_present": False}
        # An instant-vector sample is [timestamp, "value"]; take the value.
        raw_value = result[0].get("value", [None, None])[1]
        value = float(raw_value)
    except Exception as err:  # noqa: BLE001 - preserve malformed Prometheus replies.
        return {"metric": metric, "ok": False, "value": None, "error": f"invalid prometheus response: {err}"}
    return {"metric": metric, "ok": True, "value": value, "series_present": True}
|
|
|
|
|
|
def agent_manifest_url(base_url: str, *, broadcast: str, track: str, role: str, max_bytes: int) -> str:
    """Return the node-agent ``/v1/archive-manifest`` URL for a broadcast/track.

    ``broadcast``, ``track`` and ``max_bytes`` are always sent as query
    parameters; ``role`` is appended only when it is non-empty. Trailing
    slashes on ``base_url`` are removed first.
    """
    params = [
        ("broadcast", broadcast),
        ("track", track),
        ("max_bytes", str(max_bytes)),
    ]
    if role:
        params.append(("role", role))
    root = base_url.rstrip("/")
    return f"{root}/v1/archive-manifest?{urllib.parse.urlencode(params)}"
|
|
|
|
|
|
def sample_publishers(
    publisher_urls: dict[str, str],
    *,
    timeout: float,
    fetcher: Callable[[str, float, int], FetchResult] = fetch_text,
) -> dict[str, Any]:
    """Probe ``/health`` and ``/metrics`` on each named publisher node-agent.

    A health body that is not valid JSON, or that is valid JSON but not an
    object (for example ``[]`` or ``null``), is reported through
    ``health_error`` instead of raising, so one odd publisher cannot abort the
    whole sample.

    Args:
        publisher_urls: Publisher name mapped to its node-agent base URL.
        timeout: Per-request timeout in seconds, passed to ``fetcher``.
        fetcher: Callable ``(url, timeout, max_bytes) -> FetchResult``.

    Returns:
        Publisher name mapped to a row with fetch status, errors, the reported
        ``node_modes`` / ``unhealthy_processes`` lists, the hostname (when the
        health payload was an object) and whether any duplicate-hash metric
        name appears in the metrics text.
    """
    out: dict[str, Any] = {}
    for name, base_url in publisher_urls.items():
        base = base_url.rstrip("/")
        health = fetcher(f"{base}/health", timeout, 1024 * 1024)
        metrics = fetcher(f"{base}/metrics", timeout, 2 * 1024 * 1024)
        row: dict[str, Any] = {
            "agent_url": base,
            "health_ok": health.ok,
            "metrics_ok": metrics.ok,
            "health_error": health.error,
            "metrics_error": metrics.error,
            "duplicate_metrics_present": False,
            "node_modes": [],
            "unhealthy_processes": [],
        }
        if health.ok:
            try:
                payload = json.loads(health.body)
            except json.JSONDecodeError:
                row["health_error"] = "invalid health json"
            else:
                if isinstance(payload, dict):
                    node_modes = payload.get("node_modes")
                    row["node_modes"] = node_modes if isinstance(node_modes, list) else []
                    unhealthy = payload.get("unhealthy_processes")
                    row["unhealthy_processes"] = unhealthy if isinstance(unhealthy, list) else []
                    system = payload.get("system")
                    if not isinstance(system, dict):
                        system = {}
                    # Prefer the nested system hostname, then the top-level one.
                    row["hostname"] = system.get("hostname") or payload.get("hostname")
                else:
                    # Valid JSON, but there are no fields to read from it.
                    row["health_error"] = "health json is not an object"
        if metrics.ok:
            row["duplicate_metrics_present"] = any(metric in metrics.body for metric in DUPLICATE_PROMETHEUS_METRICS)
            row["metrics_bytes"] = len(metrics.body.encode("utf-8"))
        out[name] = row
    return out
|
|
|
|
|
|
def sample_once(args: argparse.Namespace) -> dict[str, Any]:
    """Collect one measurement sample from every configured source.

    In order: fetch and analyse each archive manifest (falling back to the
    archive-origin manifest when no ``--manifest`` was given), fetch manifests
    from node-agents, query the Prometheus duplicate metrics, then probe the
    publishers. Fetch failures are recorded per source rather than raised.

    Returns:
        A dict with the sample timestamp, broadcast, track, per-publisher
        rows, per-manifest stats, the cross-manifest comparison (None unless
        at least two manifests yielded records) and the Prometheus rows.

    Raises:
        ValueError: if a ``NAME=URL`` argument is malformed.
    """
    manifest_sources: dict[str, str] = dict(parse_named_url(entry) for entry in args.manifest)
    if not manifest_sources and args.archive_origin and args.broadcast and args.track:
        manifest_sources["archive-origin"] = manifest_url(args.archive_origin, args.broadcast, args.track)
    publisher_urls: dict[str, str] = dict(parse_named_url(entry) for entry in args.publisher)
    agent_sources: dict[str, str] = dict(parse_named_url(entry) for entry in args.agent_manifest)

    records_by_name: dict[str, list[dict[str, Any]]] = {}
    stats_by_name: dict[str, Any] = {}

    # Archive manifests served as raw JSONL.
    for label, url in manifest_sources.items():
        result = fetch_text(url, args.timeout, max_bytes=args.max_manifest_bytes)
        if result.ok:
            parsed, bad_lines = parse_manifest_jsonl(result.body)
            records_by_name[label] = parsed
            stats_by_name[label] = {
                "url": url,
                "ok": True,
                "fetch_elapsed_ms": result.elapsed_ms,
                **manifest_hash_stats(parsed, bad_lines),
            }
        else:
            stats_by_name[label] = {"url": url, "ok": False, "error": result.error}

    # Manifests served by node-agents as a JSON envelope.
    if agent_sources and args.broadcast and args.track:
        for label, base_url in agent_sources.items():
            url = agent_manifest_url(
                base_url,
                broadcast=args.broadcast,
                track=args.track,
                role=args.agent_manifest_role,
                max_bytes=args.max_manifest_bytes,
            )
            result, payload = fetch_json(url, args.timeout, max_bytes=args.max_manifest_bytes + 1024 * 1024)
            usable = result.ok and isinstance(payload, dict) and payload.get("ok") is True
            if not usable:
                if result.error:
                    problem = result.error
                elif isinstance(payload, dict):
                    problem = payload.get("error")
                else:
                    problem = "invalid response"
                stats_by_name[label] = {
                    "url": url,
                    "ok": False,
                    "source": "node-agent",
                    "error": problem,
                }
                continue

            listed = payload.get("records")
            if isinstance(listed, list):
                parsed = [item for item in listed if isinstance(item, dict)]
            else:
                parsed = []
            records_by_name[label] = parsed
            bad_lines = int_or_none(payload.get("invalid_lines")) or 0
            agent_stats = payload.get("stats")
            if not isinstance(agent_stats, dict):
                agent_stats = {}
            stats_by_name[label] = {
                "url": url,
                "ok": True,
                "source": "node-agent",
                "fetch_elapsed_ms": result.elapsed_ms,
                "role": payload.get("role"),
                "file_bytes": int_or_none(payload.get("file_bytes")),
                "partial_scan": payload.get("partial_scan") is True,
                **manifest_hash_stats(parsed, bad_lines),
                "node_agent_stats": agent_stats,
            }

    prometheus_rows: list[dict[str, Any]] = []
    if args.prometheus_url and args.broadcast:
        prometheus_rows = [
            prometheus_metric_sum(args.prometheus_url, metric, broadcast=args.broadcast, timeout=args.timeout)
            for metric in DUPLICATE_PROMETHEUS_METRICS
        ]

    # The timestamp is taken before the publishers are probed.
    sampled_at = now_ms()
    publishers = sample_publishers(publisher_urls, timeout=args.timeout) if publisher_urls else {}
    comparison = compare_manifest_hashes(records_by_name) if len(records_by_name) >= 2 else None
    return {
        "sample_unix_ms": sampled_at,
        "broadcast": args.broadcast,
        "track": args.track,
        "publishers": publishers,
        "manifests": stats_by_name,
        "manifest_comparison": comparison,
        "prometheus": prometheus_rows,
    }
|
|
|
|
|
|
def summarize(samples: list[dict[str, Any]]) -> dict[str, Any]:
    """Reduce collected samples to an overall verdict with failure reasons.

    Publisher, manifest and Prometheus rows are pooled across all samples; the
    manifest comparison used is the last one present in any sample. Each failed
    check appends a reason string, and ``ok`` is true only when there are none.

    Returns:
        ``{"ok": False, "reasons": ["no_samples"]}`` for an empty input,
        otherwise a dict with ``ok``, ``elapsed_ms``, ``sample_count``,
        ``reasons``, ``latest_manifest_comparison``,
        ``prometheus_series_present_count`` and ``publisher_count``.
    """
    if not samples:
        return {"ok": False, "reasons": ["no_samples"]}

    reasons: list[str] = []

    last_ms = int(samples[-1]["sample_unix_ms"])
    first_ms = int(samples[0]["sample_unix_ms"])
    elapsed_ms = max(0, last_ms - first_ms)
    if len(samples) < 2 or elapsed_ms <= 0:
        reasons.append("insufficient_elapsed_samples")

    # Publisher checks: health must hold everywhere, metrics anywhere.
    publisher_rows = []
    for sample in samples:
        for row in (sample.get("publishers") or {}).values():
            if isinstance(row, dict):
                publisher_rows.append(row)
    if publisher_rows:
        if any(row.get("health_ok") is not True for row in publisher_rows):
            reasons.append("publisher_health_missing")
        if not any(row.get("metrics_ok") is True for row in publisher_rows):
            reasons.append("publisher_metrics_missing")
        if not any(row.get("duplicate_metrics_present") is True for row in publisher_rows):
            reasons.append("duplicate_metrics_not_deployed_to_publishers")

    # Only the most recent comparison is judged, and only its first failure
    # is reported.
    latest_comparison = None
    for sample in samples:
        candidate = sample.get("manifest_comparison")
        if isinstance(candidate, dict):
            latest_comparison = candidate
    if latest_comparison is None:
        reasons.append("manifest_comparison_missing")
    elif latest_comparison.get("source_identity_ok") is not True:
        reasons.append("manifest_source_identity_missing_or_not_diverse")
    elif latest_comparison.get("matching_sequence_count", 0) <= 0:
        reasons.append("no_matching_duplicate_sequences")
    elif latest_comparison.get("divergent_sequence_count", 0) > 0:
        reasons.append("duplicate_hash_divergence_observed")

    manifest_rows = []
    for sample in samples:
        for row in (sample.get("manifests") or {}).values():
            if isinstance(row, dict):
                manifest_rows.append(row)
    if manifest_rows:
        if any(row.get("ok") is not True for row in manifest_rows):
            reasons.append("manifest_fetch_missing")
        counter_checks = (
            ("hash_divergent_sequences", "manifest_hash_divergence_observed"),
            ("missing_hash_records", "manifest_hash_missing_records"),
            ("missing_source_identity_records", "manifest_source_identity_missing"),
            ("invalid_lines", "manifest_invalid_lines"),
        )
        for field, reason in counter_checks:
            if any(int_or_none(row.get(field)) or 0 for row in manifest_rows):
                reasons.append(reason)

    prom_rows = []
    for sample in samples:
        for row in sample.get("prometheus") or []:
            if isinstance(row, dict):
                prom_rows.append(row)
    prom_series = [row for row in prom_rows if row.get("series_present") is True]
    if prom_rows and not prom_series:
        reasons.append("prometheus_duplicate_series_missing")

    def series_values(suffix: str) -> list[float]:
        # Values of present series whose metric name ends with the suffix.
        return [
            float(row.get("value") or 0)
            for row in prom_series
            if str(row.get("metric", "")).endswith(suffix)
        ]

    divergent_values = series_values("hash_divergent_sequences")
    if any(value > 0 for value in divergent_values):
        reasons.append("prometheus_hash_divergence_nonzero")
    missing_source_values = series_values("missing_source_identity_records")
    if any(value > 0 for value in missing_source_values):
        reasons.append("prometheus_source_identity_missing_nonzero")

    return {
        "ok": not reasons,
        "elapsed_ms": elapsed_ms,
        "sample_count": len(samples),
        "reasons": reasons,
        "latest_manifest_comparison": latest_comparison,
        "prometheus_series_present_count": len(prom_series),
        "publisher_count": len(samples[-1].get("publishers") or {}),
    }
|
|
|
|
|
|
def measure(args: argparse.Namespace) -> dict[str, Any]:
    """Sample repeatedly for the configured duration and build the report.

    At least one sample is always taken. With a non-positive
    ``duration_seconds`` exactly one is taken; otherwise sampling repeats,
    sleeping ``poll_interval_seconds`` between samples, until the elapsed
    monotonic time reaches the duration.

    Returns:
        A dict with ``started_unix_ms``, ``duration_seconds``, the list of
        ``samples`` and their ``summary``.
    """
    collected: list[dict[str, Any]] = []
    began = time.monotonic()
    while True:
        collected.append(sample_once(args))
        window = args.duration_seconds
        if window <= 0 or time.monotonic() - began >= window:
            break
        time.sleep(args.poll_interval_seconds)

    started_unix_ms = collected[0]["sample_unix_ms"] if collected else now_ms()
    return {
        "started_unix_ms": started_unix_ms,
        "duration_seconds": args.duration_seconds,
        "samples": collected,
        "summary": summarize(collected),
    }
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser, using the module docstring as description."""
    # (flags, add_argument keyword settings), in the order they are registered.
    option_table = (
        (
            ("--publisher",),
            {"action": "append", "default": [], "help": "Named node-agent URL, NAME=http://IP:7799."},
        ),
        (
            ("--manifest",),
            {"action": "append", "default": [], "help": "Named archive JSONL URL, NAME=https://..."},
        ),
        (
            ("--agent-manifest",),
            {
                "action": "append",
                "default": [],
                "help": "Named node-agent URL to sample /v1/archive-manifest from, NAME=http://IP:7799.",
            },
        ),
        (("--agent-manifest-role",), {"default": "publisher-buffer"}),
        (
            ("--archive-origin",),
            {"default": "", "help": "Archive origin root for manifests/<broadcast>/<track>.jsonl."},
        ),
        (
            ("--prometheus-url",),
            {"default": "", "help": "Prometheus base URL for Grafana-facing metrics."},
        ),
        (("--broadcast",), {"default": "", "help": "Logical broadcast name to measure."}),
        (
            ("--track",),
            {
                "default": "publisher.m4s",
                "help": "Track name to compare. Defaults to publisher-origin proof fragments, not relay video.",
            },
        ),
        (("--duration-seconds",), {"type": float, "default": 0.0}),
        (("--poll-interval-seconds",), {"type": float, "default": 30.0}),
        (("--timeout",), {"type": float, "default": 10.0}),
        (("--max-manifest-bytes",), {"type": int, "default": 4 * 1024 * 1024}),
        (("--pretty",), {"action": "store_true"}),
        (
            ("--require-ok",),
            {"action": "store_true", "help": "Exit non-zero unless summary.ok is true."},
        ),
    )
    parser = argparse.ArgumentParser(description=__doc__)
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)
    return parser
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Run the measurement from the command line and return the exit code.

    The report is printed to stdout as JSON with sorted keys (indented when
    ``--pretty`` is given).

    Returns:
        0 on success; 1 when the measurement raised (the error is printed to
        stderr as JSON); 2 when ``--require-ok`` is set and the summary is
        not ok.
    """
    args = build_parser().parse_args(argv)
    try:
        report = measure(args)
    except Exception as err:  # noqa: BLE001 - command-line tool should preserve exact failure.
        failure = {"ok": False, "error": str(err)}
        print(json.dumps(failure, sort_keys=True), file=sys.stderr)
        return 1

    indent = 2 if args.pretty else None
    print(json.dumps(report, indent=indent, sort_keys=True))

    summary_ok = report.get("summary", {}).get("ok")
    if args.require_ok and not summary_ok:
        return 2
    return 0
|
|
|
|
|
|
# Script entry point: the process exit status is whatever main() returns.
if __name__ == "__main__":
    sys.exit(main())
|