//! Core types shared across every.channel. use serde::{Deserialize, Serialize}; use sha3::{Digest, Keccak256}; use std::fmt; pub const MANIFEST_ID_ALG_BLAKE3: &str = "blake3"; pub const MANIFEST_ID_ALG_KECCAK256: &str = "keccak256"; pub const MERKLE_PROOF_ALG_BLAKE3: &str = "merkle+blake3"; pub const MERKLE_PROOF_ALG_KECCAK256: &str = "merkle+keccak256"; #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct ChannelId(pub String); #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct DeviceId(pub String); #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct StreamId(pub String); #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct ChainCommitment { pub chain: String, pub scheme: String, pub digest: String, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StreamDescriptor { pub id: StreamId, pub title: String, pub number: Option, pub source: String, pub metadata: Vec, #[serde(default, skip_serializing_if = "Vec::is_empty")] pub commitments: Vec, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StreamMetadata { pub key: String, pub value: String, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct BroadcastId { pub standard: String, pub transport_stream_id: Option, pub program_number: Option, pub virtual_channel: Option, pub callsign: Option, pub region: Option, pub frequency: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SourceId { pub kind: String, pub device_id: Option, pub channel: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StreamKey { pub version: u16, pub broadcast: Option, pub source: Option, pub profile: Option, pub variant: Option, } impl StreamKey { pub fn for_channel_or_source( channel: Option<&Channel>, standard: Option<&str>, source: SourceId, profile: Option, variant: Option, ) -> Self { let broadcast = channel .and_then(|channel| standard.and_then(|standard| channel.broadcast_id(standard))); let source = if broadcast.is_some() { None } else { Some(source) }; Self { version: 1, broadcast, source, profile, variant, } } pub fn to_stream_id(&self) -> StreamId { let mut parts = vec![ "ec".to_string(), "stream".to_string(), format!("v{}", self.version), ]; if let Some(broadcast) = &self.broadcast { parts.push("broadcast".to_string()); parts.push(sanitize(&broadcast.standard)); if let Some(tsid) = broadcast.transport_stream_id { parts.push(format!("tsid-{tsid}")); } if let Some(program) = broadcast.program_number { parts.push(format!("program-{program}")); } if let Some(channel) = &broadcast.virtual_channel { parts.push(format!("channel-{}", sanitize(channel))); } if let Some(callsign) = &broadcast.callsign { parts.push(format!("callsign-{}", sanitize(callsign))); } if let Some(region) = &broadcast.region { parts.push(format!("region-{}", sanitize(region))); } if let Some(freq) = &broadcast.frequency { parts.push(format!("freq-{}", sanitize(freq))); } } else if let Some(source) = &self.source { parts.push("source".to_string()); parts.push(sanitize(&source.kind)); if let Some(device) = &source.device_id { parts.push(format!("device-{}", sanitize(device))); } if let Some(channel) = &source.channel { parts.push(format!("channel-{}", sanitize(channel))); } } else { parts.push("unknown".to_string()); } if let Some(profile) = &self.profile { parts.push(format!("profile-{}", sanitize(profile))); } if let Some(variant) = &self.variant { parts.push(format!("variant-{}", sanitize(variant))); } StreamId(parts.join("/")) } } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Channel { pub id: ChannelId, pub name: String, pub number: Option, pub program_id: Option, pub metadata: Vec, } fn sanitize(value: &str) -> String { value .chars() .map(|c| match c { 'a'..='z' | '0'..='9' | '-' | '_' => c, 'A'..='Z' => c.to_ascii_lowercase(), _ => '_', }) .collect() } #[derive(Debug, Clone, Serialize, Deserialize)] pub enum ChannelMetadata { Callsign(String), Network(String), Region(String), Frequency(String), Extra(String, String), } impl BroadcastId { pub fn is_usable(&self) -> bool { self.transport_stream_id.is_some() || self.program_number.is_some() || self .virtual_channel .as_ref() .is_some_and(|value| !value.trim().is_empty()) || self .callsign .as_ref() .is_some_and(|value| !value.trim().is_empty()) } } impl Channel { pub fn broadcast_id(&self, standard: &str) -> Option { let standard = standard.trim().to_ascii_lowercase(); if standard.is_empty() { return None; } let callsign = channel_metadata_value(&self.metadata, "callsign").or_else(|| { let name = self.name.trim(); (!name.is_empty()).then(|| name.to_string()) }); let region = channel_metadata_value(&self.metadata, "region"); let frequency = channel_metadata_value(&self.metadata, "frequency"); let transport_stream_id = channel_metadata_u16(&self.metadata, "transport_stream_id") .or_else(|| channel_metadata_u16(&self.metadata, "tsid")); let program_number = self .program_id .or_else(|| channel_metadata_u16(&self.metadata, "program_number")) .or_else(|| channel_metadata_u16(&self.metadata, "program_id")); let virtual_channel = self.number.as_ref().and_then(|value| { let trimmed = value.trim(); (!trimmed.is_empty()).then(|| trimmed.to_string()) }); let broadcast = BroadcastId { standard, transport_stream_id, program_number, virtual_channel, callsign, region, frequency, }; broadcast.is_usable().then_some(broadcast) } } fn channel_metadata_value(metadata: &[ChannelMetadata], key: &str) -> Option { for item in metadata { match item { ChannelMetadata::Callsign(value) if key == "callsign" => { return Some(value.trim().to_string()) } ChannelMetadata::Region(value) if key == "region" => { return Some(value.trim().to_string()) } ChannelMetadata::Frequency(value) if key == "frequency" => { return Some(value.trim().to_string()) } ChannelMetadata::Network(value) if key == "network" => { return Some(value.trim().to_string()) } ChannelMetadata::Extra(extra_key, value) if extra_key.eq_ignore_ascii_case(key) => { let trimmed = value.trim_matches('"').trim().to_string(); if !trimmed.is_empty() { return Some(trimmed); } } _ => {} } } None } fn channel_metadata_u16(metadata: &[ChannelMetadata], key: &str) -> Option { channel_metadata_value(metadata, key).and_then(|value| value.parse().ok()) } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PacketDigest { pub algorithm: String, pub hex: String, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DeterminismProfile { pub name: String, pub description: String, pub encoder: String, pub encoder_args: Vec, pub chunk_duration_ms: u64, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NodeDescriptor { pub node_id: String, pub human_name: String, pub location_hint: Option, pub capabilities: Vec, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StreamEncryptionInfo { pub alg: String, pub key_id: String, pub nonce_scheme: String, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct MoqStreamDescriptor { pub endpoint: String, pub broadcast_name: String, pub track_name: String, pub encryption: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StreamCatalogEntry { pub stream: StreamDescriptor, pub moq: Option, pub manifest: Option, pub updated_unix_ms: u64, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StreamCatalog { pub entries: Vec, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "kind", rename_all = "snake_case")] pub enum StreamTransportDescriptor { /// Stream is available via a MoQ relay over WebTransport/WebSocket. RelayMoq { url: String, broadcast_name: String, track_name: String, }, /// Stream is available via iroh direct MoQ. IrohDirect { endpoint: String, broadcast_name: String, track_name: String, }, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StreamControlAnnouncement { pub stream: StreamDescriptor, pub transports: Vec, pub updated_unix_ms: u64, /// Suggested freshness window for this announcement. pub ttl_ms: u64, #[serde(default, skip_serializing_if = "Vec::is_empty")] pub commitments: Vec, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ManifestSummary { pub manifest_id: String, pub merkle_root: String, pub epoch_id: String, pub total_chunks: u64, pub chunk_start_index: u64, pub encoder_profile_id: String, pub signed_by: Vec, #[serde(default, skip_serializing_if = "Vec::is_empty")] pub commitments: Vec, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ChunkId { pub stream_id: StreamId, pub epoch_id: String, pub chunk_index: u64, pub chunk_hash: String, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ManifestVariant { pub variant_id: String, pub stream_id: StreamId, pub chunk_start_index: u64, pub total_chunks: u64, pub merkle_root: String, pub chunk_hashes: Vec, #[serde(default)] pub metadata: Vec, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ManifestBody { pub stream_id: StreamId, pub epoch_id: String, pub chunk_duration_ms: u64, pub total_chunks: u64, pub chunk_start_index: u64, pub encoder_profile_id: String, pub merkle_root: String, pub created_unix_ms: u64, pub metadata: Vec, pub chunk_hashes: Vec, #[serde(default)] pub variants: Option>, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ManifestSignature { pub signer_id: String, pub alg: String, pub signature: String, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Manifest { pub body: ManifestBody, pub manifest_id: String, pub signatures: Vec, #[serde(default, skip_serializing_if = "Vec::is_empty")] pub commitments: Vec, } impl Manifest { pub fn summary(&self) -> ManifestSummary { ManifestSummary { manifest_id: self.manifest_id.clone(), merkle_root: self.body.merkle_root.clone(), epoch_id: self.body.epoch_id.clone(), total_chunks: self.body.total_chunks, chunk_start_index: self.body.chunk_start_index, encoder_profile_id: self.body.encoder_profile_id.clone(), signed_by: self .signatures .iter() .map(|sig| sig.signer_id.clone()) .collect(), commitments: self.commitments.clone(), } } } #[derive(Debug, Clone)] pub enum ManifestError { Empty, InvalidHash(String), } impl fmt::Display for ManifestError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { ManifestError::Empty => write!(f, "no chunk hashes supplied"), ManifestError::InvalidHash(value) => write!(f, "invalid chunk hash: {value}"), } } } impl std::error::Error for ManifestError {} impl ManifestBody { pub fn manifest_id(&self) -> Result { self.manifest_id_blake3() } pub fn manifest_id_blake3(&self) -> Result { let bytes = serde_json::to_vec(self)?; Ok(blake3::hash(&bytes).to_hex().to_string()) } pub fn manifest_id_keccak256(&self) -> Result { let bytes = serde_json::to_vec(self)?; Ok(hex::encode(keccak256(&bytes))) } } fn parse_hash32(value: &str) -> Result<[u8; 32], ManifestError> { let trimmed = value.trim().strip_prefix("0x").unwrap_or(value.trim()); let bytes = hex::decode(trimmed).map_err(|_| ManifestError::InvalidHash(value.to_string()))?; if bytes.len() != 32 { return Err(ManifestError::InvalidHash(value.to_string())); } let mut out = [0u8; 32]; out.copy_from_slice(&bytes); Ok(out) } fn keccak256(bytes: &[u8]) -> [u8; 32] { let mut hasher = Keccak256::new(); hasher.update(bytes); let digest = hasher.finalize(); let mut out = [0u8; 32]; out.copy_from_slice(&digest); out } fn blake3_pair_hash(left: &[u8; 32], right: &[u8; 32]) -> [u8; 32] { let mut hasher = blake3::Hasher::new(); hasher.update(left); hasher.update(right); *hasher.finalize().as_bytes() } fn keccak_pair_hash(left: &[u8; 32], right: &[u8; 32]) -> [u8; 32] { let mut merged = [0u8; 64]; merged[..32].copy_from_slice(left); merged[32..].copy_from_slice(right); keccak256(&merged) } fn merkle_root_from_hashes_with( hashes: &[String], pair_hash: fn(&[u8; 32], &[u8; 32]) -> [u8; 32], ) -> Result { if hashes.is_empty() { return Err(ManifestError::Empty); } let mut nodes: Vec<[u8; 32]> = Vec::with_capacity(hashes.len()); for hash in hashes { nodes.push(parse_hash32(hash)?); } while nodes.len() > 1 { if nodes.len() % 2 == 1 { if let Some(last) = nodes.last().copied() { nodes.push(last); } } let mut parents = Vec::with_capacity(nodes.len() / 2); for pair in nodes.chunks(2) { parents.push(pair_hash(&pair[0], &pair[1])); } nodes = parents; } Ok(hex::encode(nodes[0])) } pub fn merkle_root_from_hashes(hashes: &[String]) -> Result { blake3_merkle_root_from_hashes(hashes) } pub fn blake3_merkle_root_from_hashes(hashes: &[String]) -> Result { merkle_root_from_hashes_with(hashes, blake3_pair_hash) } pub fn keccak_merkle_root_from_hashes(hashes: &[String]) -> Result { merkle_root_from_hashes_with(hashes, keccak_pair_hash) } fn merkle_proof_for_index_with( hashes: &[String], index: usize, pair_hash: fn(&[u8; 32], &[u8; 32]) -> [u8; 32], ) -> Result, ManifestError> { if hashes.is_empty() { return Err(ManifestError::Empty); } if index >= hashes.len() { return Err(ManifestError::InvalidHash(format!( "index {index} out of bounds" ))); } let mut nodes: Vec<[u8; 32]> = Vec::with_capacity(hashes.len()); for hash in hashes { nodes.push(parse_hash32(hash)?); } let mut proof = Vec::new(); let mut pos = index; while nodes.len() > 1 { if nodes.len() % 2 == 1 { if let Some(last) = nodes.last().copied() { nodes.push(last); } } let sibling_index = if pos % 2 == 0 { pos + 1 } else { pos - 1 }; let sibling = nodes .get(sibling_index) .ok_or_else(|| ManifestError::InvalidHash("missing sibling".to_string()))?; proof.push(hex::encode(sibling)); let mut parents = Vec::with_capacity(nodes.len() / 2); for pair in nodes.chunks(2) { parents.push(pair_hash(&pair[0], &pair[1])); } nodes = parents; pos /= 2; } Ok(proof) } pub fn merkle_proof_for_index( hashes: &[String], index: usize, ) -> Result, ManifestError> { blake3_merkle_proof_for_index(hashes, index) } pub fn blake3_merkle_proof_for_index( hashes: &[String], index: usize, ) -> Result, ManifestError> { merkle_proof_for_index_with(hashes, index, blake3_pair_hash) } pub fn keccak_merkle_proof_for_index( hashes: &[String], index: usize, ) -> Result, ManifestError> { merkle_proof_for_index_with(hashes, index, keccak_pair_hash) } fn verify_merkle_proof_with( leaf_hash: &str, mut index: usize, branch: &[String], expected_root: &str, pair_hash: fn(&[u8; 32], &[u8; 32]) -> [u8; 32], ) -> bool { let Ok(mut acc) = parse_hash32(leaf_hash) else { return false; }; for sibling_hex in branch { let Ok(sibling) = parse_hash32(sibling_hex) else { return false; }; let (left, right) = if index % 2 == 0 { (acc, sibling) } else { (sibling, acc) }; acc = pair_hash(&left, &right); index /= 2; } match parse_hash32(expected_root) { Ok(root) => acc == root, Err(_) => false, } } pub fn verify_merkle_proof( leaf_hash: &str, index: usize, branch: &[String], expected_root: &str, ) -> bool { verify_blake3_merkle_proof(leaf_hash, index, branch, expected_root) } pub fn verify_blake3_merkle_proof( leaf_hash: &str, index: usize, branch: &[String], expected_root: &str, ) -> bool { verify_merkle_proof_with(leaf_hash, index, branch, expected_root, blake3_pair_hash) } pub fn verify_keccak_merkle_proof( leaf_hash: &str, index: usize, branch: &[String], expected_root: &str, ) -> bool { verify_merkle_proof_with(leaf_hash, index, branch, expected_root, keccak_pair_hash) } #[cfg(test)] mod tests { use super::*; #[test] fn manifest_id_changes_with_body() { let body = ManifestBody { stream_id: StreamId("s".to_string()), epoch_id: "e".to_string(), chunk_duration_ms: 2000, total_chunks: 1, chunk_start_index: 0, encoder_profile_id: "p".to_string(), merkle_root: "00".repeat(32), created_unix_ms: 1, metadata: Vec::new(), chunk_hashes: vec!["11".repeat(32)], variants: None, }; let id1 = body.manifest_id().unwrap(); let mut body2 = body.clone(); body2.created_unix_ms = 2; let id2 = body2.manifest_id().unwrap(); assert_ne!(id1, id2); } #[test] fn manifest_id_defaults_to_blake3() { let body = ManifestBody { stream_id: StreamId("s".to_string()), epoch_id: "e".to_string(), chunk_duration_ms: 2000, total_chunks: 1, chunk_start_index: 0, encoder_profile_id: "p".to_string(), merkle_root: "00".repeat(32), created_unix_ms: 1, metadata: Vec::new(), chunk_hashes: vec!["11".repeat(32)], variants: None, }; let bytes = serde_json::to_vec(&body).unwrap(); assert_eq!( body.manifest_id().unwrap(), blake3::hash(&bytes).to_hex().to_string() ); assert_ne!( body.manifest_id().unwrap(), body.manifest_id_keccak256().unwrap() ); } #[test] fn merkle_root_single_is_leaf() { let leaf = blake3::hash(b"leaf").to_hex().to_string(); let root = merkle_root_from_hashes(&[leaf.clone()]).unwrap(); assert_eq!(root, leaf); let keccak_root = keccak_merkle_root_from_hashes(&[leaf.clone()]).unwrap(); assert_eq!(keccak_root, leaf); } #[test] fn merkle_root_rejects_invalid_hash() { let err = merkle_root_from_hashes(&["not-hex".to_string()]).unwrap_err(); assert!(matches!(err, ManifestError::InvalidHash(_))); } #[test] fn merkle_proof_roundtrip_small_sets() { for size in 1..=9usize { let leaves = (0..size) .map(|i| blake3::hash(&[i as u8]).to_hex().to_string()) .collect::>(); let root = merkle_root_from_hashes(&leaves).unwrap(); for idx in 0..size { let proof = merkle_proof_for_index(&leaves, idx).unwrap(); assert!( verify_merkle_proof(&leaves[idx], idx, &proof, &root), "size {size} idx {idx} failed" ); } } } #[test] fn keccak_merkle_proof_roundtrip_small_sets() { for size in 1..=9usize { let leaves = (0..size) .map(|i| blake3::hash(&[i as u8]).to_hex().to_string()) .collect::>(); let root = keccak_merkle_root_from_hashes(&leaves).unwrap(); for idx in 0..size { let proof = keccak_merkle_proof_for_index(&leaves, idx).unwrap(); assert!( verify_keccak_merkle_proof(&leaves[idx], idx, &proof, &root), "size {size} idx {idx} failed" ); } } } #[test] fn merkle_proof_detects_tampering() { let leaves = (0..4usize) .map(|i| blake3::hash(&[i as u8]).to_hex().to_string()) .collect::>(); let root = merkle_root_from_hashes(&leaves).unwrap(); let mut proof = merkle_proof_for_index(&leaves, 2).unwrap(); proof[0] = blake3::hash(b"evil").to_hex().to_string(); assert!(!verify_merkle_proof(&leaves[2], 2, &proof, &root)); } #[test] fn channel_broadcast_id_uses_typed_and_extra_metadata() { let channel = Channel { id: ChannelId("kcbs".to_string()), name: "KCBS-HD".to_string(), number: Some("2.1".to_string()), program_id: Some(3), metadata: vec![ ChannelMetadata::Callsign("KCBS".to_string()), ChannelMetadata::Region("los-angeles".to_string()), ChannelMetadata::Extra("tsid".to_string(), "42".to_string()), ChannelMetadata::Extra("frequency".to_string(), "573000000".to_string()), ], }; let broadcast = channel.broadcast_id("ATSC").unwrap(); assert_eq!(broadcast.standard, "atsc"); assert_eq!(broadcast.transport_stream_id, Some(42)); assert_eq!(broadcast.program_number, Some(3)); assert_eq!(broadcast.virtual_channel.as_deref(), Some("2.1")); assert_eq!(broadcast.callsign.as_deref(), Some("KCBS")); assert_eq!(broadcast.region.as_deref(), Some("los-angeles")); assert_eq!(broadcast.frequency.as_deref(), Some("573000000")); } #[test] fn stream_key_prefers_broadcast_scope_when_channel_identity_exists() { let channel = Channel { id: ChannelId("kcbs".to_string()), name: "KCBS-HD".to_string(), number: Some("2.1".to_string()), program_id: None, metadata: vec![ChannelMetadata::Callsign("KCBS".to_string())], }; let source = SourceId { kind: "hdhr".to_string(), device_id: Some("ABCDEF01".to_string()), channel: Some("2.1".to_string()), }; let key = StreamKey::for_channel_or_source( Some(&channel), Some("atsc"), source, Some("chunk-2000ms".to_string()), None, ); assert!(key.broadcast.is_some()); assert!(key.source.is_none()); assert_eq!( key.to_stream_id().0, "ec/stream/v1/broadcast/atsc/channel-2_1/callsign-kcbs/profile-chunk-2000ms" ); } #[test] fn stream_key_falls_back_to_source_scope_without_channel_identity() { let channel = Channel { id: ChannelId("unknown".to_string()), name: "".to_string(), number: None, program_id: None, metadata: Vec::new(), }; let source = SourceId { kind: "ts".to_string(), device_id: None, channel: Some("file.ts".to_string()), }; let key = StreamKey::for_channel_or_source( Some(&channel), Some("atsc"), source, Some("chunk-2000ms".to_string()), None, ); assert!(key.broadcast.is_none()); assert_eq!( key.to_stream_id().0, "ec/stream/v1/source/ts/channel-file_ts/profile-chunk-2000ms" ); } }