Advance forge rollout, Ethereum rails, and NBC sources

This commit is contained in:
every.channel 2026-04-01 15:58:49 -07:00
parent be26313225
commit 7d84510eac
No known key found for this signature in database
88 changed files with 11230 additions and 302 deletions

View file

@ -13,9 +13,11 @@ ec-crypto = { path = "../ec-crypto" }
ec-direct = { path = "../ec-direct" }
ec-moq = { path = "../ec-moq" }
ec-chopper = { path = "../ec-chopper" }
ec-eth = { path = "../ec-eth" }
ec-hdhomerun = { path = "../ec-hdhomerun" }
ec-iroh = { path = "../ec-iroh" }
ec-linux-iptv = { path = "../ec-linux-iptv" }
ec-ts = { path = "../ec-ts" }
hex = "0.4"
iroh = "0.96"
just-webrtc = "0.2"

View file

@ -8,15 +8,21 @@ use clap::ValueEnum;
use clap::{Parser, Subcommand};
use ec_chopper::{build_manifest_body_for_chunks, TsChunk};
use ec_core::{
merkle_proof_for_index, verify_merkle_proof, Manifest, ManifestSummary, ManifestVariant,
MoqStreamDescriptor, StreamCatalogEntry, StreamControlAnnouncement, StreamDescriptor,
StreamEncryptionInfo, StreamId, StreamKey, StreamMetadata, StreamTransportDescriptor,
merkle_proof_for_index, verify_merkle_proof, Manifest, ManifestBody, ManifestSummary,
ManifestVariant, MoqStreamDescriptor, StreamCatalogEntry, StreamControlAnnouncement,
StreamDescriptor, StreamEncryptionInfo, StreamId, StreamKey, StreamMetadata,
StreamTransportDescriptor, MERKLE_PROOF_ALG_BLAKE3,
};
use ec_crypto::{
decrypt_stream_data, encrypt_stream_data, load_manifest_keypair_from_env, sign_manifest_id,
verify_manifest_signature, ENCRYPTION_ALG,
decrypt_stream_data, encrypt_stream_data, load_ethereum_manifest_keypair_from_env,
load_manifest_keypair_from_env, sign_manifest_body_eip712, sign_manifest_id,
verify_manifest_signature_with_body, ENCRYPTION_ALG,
};
use ec_direct::{decode_direct_link, encode_direct_link, DirectCodeV1};
use ec_eth::{
control_announcement_commitments, manifest_commitments, manifest_commitments_match,
stream_descriptor_commitments,
};
use ec_iroh::DiscoveryConfig;
use ec_moq::{
chunk_duration_secs, decode_object_frame, encode_object_frame, FileRelay, GroupId, HlsWriter,
@ -779,7 +785,6 @@ fn ingest(args: IngestArgs) -> Result<()> {
};
let source_id = source.source_id();
let source_id_for_stream = source_id.clone();
let reader = source.open_stream()?;
let encoder_profile_id = if deterministic {
"deterministic-h264-aac".to_string()
@ -815,17 +820,12 @@ fn ingest(args: IngestArgs) -> Result<()> {
let relay = FileRelay::new(args.relay_dir);
let track = TrackName {
namespace: "every.channel".to_string(),
name: args.stream_id.unwrap_or_else(|| {
StreamKey {
version: 1,
broadcast: None,
source: Some(source_id_for_stream),
profile: Some(format!("chunk-{}ms", args.chunk_ms)),
variant: None,
name: match args.stream_id {
Some(stream_id) => stream_id,
None => {
default_stream_id_for_source(source.as_ref(), format!("chunk-{}ms", args.chunk_ms))?
}
.to_stream_id()
.0
}),
},
};
let stream_id = StreamId(track.name.clone());
let manifest_payload = build_manifest(
@ -1072,12 +1072,52 @@ fn deterministic_enabled(flag: bool) -> bool {
.unwrap_or(false)
}
fn default_stream_id_for_source(
source: &dyn StreamSource,
profile: impl Into<String>,
) -> Result<String> {
let source_id = source.source_id();
let profile = profile.into();
let stream_key = match source.broadcast_id()? {
Some(broadcast) => StreamKey {
version: 1,
broadcast: Some(broadcast),
source: None,
profile: Some(profile.clone()),
variant: None,
},
None => StreamKey {
version: 1,
broadcast: None,
source: Some(source_id),
profile: Some(profile),
variant: None,
},
};
Ok(stream_key.to_stream_id().0)
}
fn read_chunk_bytes_and_hash(path: &std::path::Path) -> Result<(Vec<u8>, String)> {
let data = fs::read(path).with_context(|| format!("failed to read {}", path.display()))?;
let hash = blake3::hash(&data).to_hex().to_string();
Ok((data, hash))
}
fn collect_manifest_signatures(
manifest_id: &str,
body: &ManifestBody,
) -> Result<Vec<ec_core::ManifestSignature>> {
let mut signatures = Vec::new();
if let Some(keypair) = load_manifest_keypair_from_env().map_err(|err| anyhow!(err))? {
signatures.push(sign_manifest_id(manifest_id, &keypair));
}
if let Some(keypair) = load_ethereum_manifest_keypair_from_env().map_err(|err| anyhow!(err))? {
signatures.push(sign_manifest_body_eip712(body, &keypair).map_err(|err| anyhow!(err))?);
}
Ok(signatures)
}
fn build_manifest(
stream_id: StreamId,
epoch_id: impl Into<String>,
@ -1099,15 +1139,15 @@ fn build_manifest(
&chunk_hashes,
)?;
let manifest_id = body.manifest_id()?;
let mut signatures = Vec::new();
if let Some(keypair) = load_manifest_keypair_from_env().map_err(|err| anyhow!(err))? {
signatures.push(sign_manifest_id(&manifest_id, &keypair));
}
Ok(Manifest {
let signatures = collect_manifest_signatures(&manifest_id, &body)?;
let mut manifest = Manifest {
body,
manifest_id,
signatures,
})
commitments: Vec::new(),
};
manifest.commitments = manifest_commitments(&manifest).map_err(|err| anyhow!("{err}"))?;
Ok(manifest)
}
#[derive(Debug, Clone)]
@ -1224,15 +1264,15 @@ fn build_multi_variant_manifest(
variants: Some(entries),
};
let manifest_id = body.manifest_id()?;
let mut signatures = Vec::new();
if let Some(keypair) = load_manifest_keypair_from_env().map_err(|err| anyhow!(err))? {
signatures.push(sign_manifest_id(&manifest_id, &keypair));
}
Ok(Manifest {
let signatures = collect_manifest_signatures(&manifest_id, &body)?;
let mut manifest = Manifest {
body,
manifest_id,
signatures,
})
commitments: Vec::new(),
};
manifest.commitments = manifest_commitments(&manifest).map_err(|err| anyhow!("{err}"))?;
Ok(manifest)
}
struct EpochBuffer {
@ -1343,10 +1383,13 @@ fn validate_manifest(manifest: &Manifest, allowlist: Option<&HashSet<String>>) -
_ => return false,
}
}
if !matches!(manifest_commitments_match(manifest), Ok(true)) {
return false;
}
if let Some(allowlist) = allowlist {
return manifest.signatures.iter().any(|sig| {
verify_manifest_signature(&manifest.manifest_id, sig)
verify_manifest_signature_with_body(&manifest.manifest_id, &manifest.body, sig)
&& allowlist.contains(&sig.signer_id)
});
}
@ -1363,7 +1406,7 @@ fn validate_manifest(manifest: &Manifest, allowlist: Option<&HashSet<String>>) -
manifest
.signatures
.iter()
.any(|sig| verify_manifest_signature(&manifest.manifest_id, sig))
.any(|sig| verify_manifest_signature_with_body(&manifest.manifest_id, &manifest.body, sig))
}
fn strip_init_suffix(key_id: &str) -> &str {
@ -1482,7 +1525,7 @@ fn build_object(
chunk_hash: Some(chunk_hash),
chunk_hash_alg: Some("blake3".to_string()),
chunk_proof,
chunk_proof_alg: Some("merkle+blake3".to_string()),
chunk_proof_alg: Some(MERKLE_PROOF_ALG_BLAKE3.to_string()),
manifest_id: manifest_id.map(|value| value.to_string()),
};
@ -1592,6 +1635,8 @@ fn flush_epoch_publish(
#[cfg(test)]
mod tests {
use super::*;
use ec_core::BroadcastId;
use std::io::Cursor;
#[test]
fn parse_manifest_allowlist_splits_and_trims() {
@ -1653,11 +1698,14 @@ mod tests {
vec![sig]
};
Manifest {
let mut manifest = Manifest {
body,
manifest_id,
signatures,
}
commitments: Vec::new(),
};
manifest.commitments = manifest_commitments(&manifest).unwrap();
manifest
}
#[test]
@ -1684,6 +1732,65 @@ mod tests {
assert!(!validate_manifest(&manifest, Some(&deny)));
}
#[test]
fn validate_manifest_accepts_ethereum_signature() {
let chunk_hashes = vec![blake3::hash(b"c0").to_hex().to_string()];
let body = build_manifest_body_for_chunks(
StreamId("s".to_string()),
"epoch-eth",
2000,
0,
"p",
1,
Vec::new(),
&chunk_hashes,
)
.unwrap();
let manifest_id = body.manifest_id().unwrap();
let prev = std::env::var(ec_crypto::ETH_MANIFEST_SIGNING_KEY_ENV).ok();
std::env::set_var(ec_crypto::ETH_MANIFEST_SIGNING_KEY_ENV, "22".repeat(32));
let keypair = load_ethereum_manifest_keypair_from_env().unwrap().unwrap();
let sig = sign_manifest_body_eip712(&body, &keypair).unwrap();
match prev {
Some(value) => std::env::set_var(ec_crypto::ETH_MANIFEST_SIGNING_KEY_ENV, value),
None => std::env::remove_var(ec_crypto::ETH_MANIFEST_SIGNING_KEY_ENV),
}
let mut manifest = Manifest {
body,
manifest_id,
signatures: vec![sig],
commitments: Vec::new(),
};
manifest.commitments = manifest_commitments(&manifest).unwrap();
assert!(validate_manifest(&manifest, None));
let allow = HashSet::from([manifest.signatures[0].signer_id.clone()]);
assert!(validate_manifest(&manifest, Some(&allow)));
}
#[test]
fn validate_manifest_rejects_bad_ethereum_commitment() {
let mut manifest = build_valid_manifest(true);
manifest.commitments[0].digest = "0xdeadbeef".to_string();
assert!(!validate_manifest(&manifest, None));
}
#[test]
fn control_announcement_carries_stream_and_announcement_commitments() {
let announcement = build_control_announcement(
"ec/stream/v1/broadcast/atsc/tsid-42/program-3".to_string(),
"KCBS".to_string(),
vec![StreamTransportDescriptor::IrohDirect {
endpoint: "ed25519:node".to_string(),
broadcast_name: "kcbs".to_string(),
track_name: "video0.m4s".to_string(),
}],
5_000,
);
assert!(!announcement.stream.commitments.is_empty());
assert!(!announcement.commitments.is_empty());
}
#[test]
fn manifest_hash_for_chunk_indexes_into_hash_list() {
let manifest = build_valid_manifest(true);
@ -1699,6 +1806,70 @@ mod tests {
);
assert!(manifest_hash_for_chunk(&manifest, sid, 12).is_none());
}
#[derive(Clone)]
struct DummySource {
source_id: ec_core::SourceId,
broadcast_id: Option<BroadcastId>,
}
impl StreamSource for DummySource {
fn open_stream(&self) -> Result<Box<dyn Read + Send>> {
Ok(Box::new(Cursor::new(Vec::<u8>::new())))
}
fn source_id(&self) -> ec_core::SourceId {
self.source_id.clone()
}
fn broadcast_id(&self) -> Result<Option<BroadcastId>> {
Ok(self.broadcast_id.clone())
}
}
#[test]
fn default_stream_id_for_source_uses_broadcast_identity_when_present() {
let source = DummySource {
source_id: ec_core::SourceId {
kind: "hdhr".to_string(),
device_id: Some("ABCDEF01".to_string()),
channel: Some("2.1".to_string()),
},
broadcast_id: Some(BroadcastId {
standard: "atsc".to_string(),
transport_stream_id: None,
program_number: Some(3),
virtual_channel: Some("2.1".to_string()),
callsign: Some("KCBS".to_string()),
region: None,
frequency: None,
}),
};
let stream_id = default_stream_id_for_source(&source, "chunk-2000ms").unwrap();
assert_eq!(
stream_id,
"ec/stream/v1/broadcast/atsc/program-3/channel-2_1/callsign-kcbs/profile-chunk-2000ms"
);
}
#[test]
fn default_stream_id_for_source_falls_back_to_source_identity() {
let source = DummySource {
source_id: ec_core::SourceId {
kind: "ts".to_string(),
device_id: None,
channel: Some("capture.ts".to_string()),
},
broadcast_id: None,
};
let stream_id = default_stream_id_for_source(&source, "chunk-2000ms").unwrap();
assert_eq!(
stream_id,
"ec/stream/v1/source/ts/channel-capture_ts/profile-chunk-2000ms"
);
}
}
async fn moq_publish(args: MoqPublishArgs) -> Result<()> {
@ -1747,19 +1918,13 @@ async fn moq_publish(args: MoqPublishArgs) -> Result<()> {
};
let source_id = source.source_id();
let source_id_for_stream = source_id.clone();
let stream_id = args.stream_id.unwrap_or_else(|| {
StreamKey {
version: 1,
broadcast: None,
source: Some(source_id_for_stream),
profile: Some(format!("chunk-{}ms", args.chunk_ms)),
variant: None,
let stream_id = match args.stream_id {
Some(stream_id) => stream_id,
None => {
default_stream_id_for_source(source.as_ref(), format!("chunk-{}ms", args.chunk_ms))?
}
.to_stream_id()
.0
});
};
let broadcast_name = args.broadcast_name.unwrap_or_else(|| stream_id.clone());
let track_name = args.track_name.clone();
@ -4409,7 +4574,10 @@ fn build_catalog_entry(
key: "broadcast".to_string(),
value: broadcast_name.to_string(),
}],
commitments: Vec::new(),
};
let mut stream = stream;
stream.commitments = stream_descriptor_commitments(&stream);
let encryption = StreamEncryptionInfo {
alg: ENCRYPTION_ALG.to_string(),
@ -4448,7 +4616,7 @@ fn build_control_announcement(
transports: Vec<StreamTransportDescriptor>,
ttl_ms: u64,
) -> StreamControlAnnouncement {
let stream = StreamDescriptor {
let mut stream = StreamDescriptor {
id: StreamId(stream_id.clone()),
title,
number: None,
@ -4457,14 +4625,19 @@ fn build_control_announcement(
key: "stream_id".to_string(),
value: stream_id,
}],
commitments: Vec::new(),
};
stream.commitments = stream_descriptor_commitments(&stream);
StreamControlAnnouncement {
let mut announcement = StreamControlAnnouncement {
stream,
transports,
updated_unix_ms: now_unix_ms(),
ttl_ms,
}
commitments: Vec::new(),
};
announcement.commitments = control_announcement_commitments(&announcement);
announcement
}
async fn spawn_control_announcer_task(

View file

@ -1,9 +1,10 @@
use anyhow::{anyhow, Result};
use clap::ValueEnum;
use ec_chopper::{deterministic_h264_profile, ffmpeg_profile_args};
use ec_core::SourceId;
use ec_hdhomerun::{find_lineup_entry_by_name, find_lineup_entry_by_number};
use ec_core::{BroadcastId, SourceId};
use ec_hdhomerun::{find_lineup_entry_by_name, find_lineup_entry_by_number, LineupEntry};
use ec_linux_iptv::LinuxDvbConfig;
use ec_ts::probe_transport_stream_identity;
use std::io::Read;
use std::process::{Child, Command, Stdio};
use std::thread;
@ -11,6 +12,9 @@ use std::thread;
pub trait StreamSource: Send {
fn open_stream(&self) -> Result<Box<dyn Read + Send>>;
fn source_id(&self) -> SourceId;
fn broadcast_id(&self) -> Result<Option<BroadcastId>> {
Ok(None)
}
}
#[derive(Debug, Clone)]
@ -24,20 +28,8 @@ pub struct HdhrSource {
impl StreamSource for HdhrSource {
fn open_stream(&self) -> Result<Box<dyn Read + Send>> {
let device = resolve_hdhr_device(self)?;
let lineup = ec_hdhomerun::fetch_lineup(&device)?;
let entry = if let Some(channel) = &self.channel {
find_lineup_entry_by_number(&lineup, channel)
.or_else(|| find_lineup_entry_by_name(&lineup, channel))
.ok_or_else(|| anyhow!("channel not found: {channel}"))?
} else if let Some(name) = &self.name {
find_lineup_entry_by_name(&lineup, name)
.ok_or_else(|| anyhow!("channel not found: {name}"))?
} else {
return Err(anyhow!("--channel or --name required for hdhr"));
};
Ok(Box::new(ec_hdhomerun::open_stream_entry(entry, None)?))
let entry = resolve_hdhr_lineup_entry(self)?;
Ok(Box::new(ec_hdhomerun::open_stream_entry(&entry, None)?))
}
fn source_id(&self) -> SourceId {
@ -48,6 +40,21 @@ impl StreamSource for HdhrSource {
channel: self.channel.clone().or_else(|| self.name.clone()),
}
}
fn broadcast_id(&self) -> Result<Option<BroadcastId>> {
let entry = resolve_hdhr_lineup_entry(self)?;
let mut broadcast = entry.channel.broadcast_id("atsc");
if broadcast.as_ref().is_none_or(|identity| {
identity.transport_stream_id.is_none() || identity.program_number.is_none()
}) {
let probe = ec_hdhomerun::open_stream_entry(&entry, Some(2))?;
broadcast = merge_broadcast_identity(
broadcast,
probe_transport_stream_broadcast(Box::new(probe), Some("atsc"))?,
);
}
Ok(broadcast)
}
}
fn resolve_hdhr_device(source: &HdhrSource) -> Result<ec_hdhomerun::HdhomerunDevice> {
@ -72,6 +79,23 @@ fn resolve_hdhr_device(source: &HdhrSource) -> Result<ec_hdhomerun::HdhomerunDev
.ok_or_else(|| anyhow!("no HDHomeRun devices found"))
}
fn resolve_hdhr_lineup_entry(source: &HdhrSource) -> Result<LineupEntry> {
let device = resolve_hdhr_device(source)?;
let lineup = ec_hdhomerun::fetch_lineup(&device)?;
let entry = if let Some(channel) = &source.channel {
find_lineup_entry_by_number(&lineup, channel)
.or_else(|| find_lineup_entry_by_name(&lineup, channel))
.ok_or_else(|| anyhow!("channel not found: {channel}"))?
} else if let Some(name) = &source.name {
find_lineup_entry_by_name(&lineup, name)
.ok_or_else(|| anyhow!("channel not found: {name}"))?
} else {
return Err(anyhow!("--channel or --name required for hdhr"));
};
Ok(entry.clone())
}
#[derive(Debug, Clone)]
pub struct LinuxDvbSource {
pub adapter: u32,
@ -126,6 +150,16 @@ impl StreamSource for TsSource {
channel: None,
}
}
fn broadcast_id(&self) -> Result<Option<BroadcastId>> {
if self.input.starts_with("http://") || self.input.starts_with("https://") {
let reader = ec_hdhomerun::open_stream_url(&self.input, Some(2))?;
probe_transport_stream_broadcast(Box::new(reader), None)
} else {
let reader = std::fs::File::open(&self.input)?;
probe_transport_stream_broadcast(Box::new(reader), None)
}
}
}
#[derive(Debug, Clone, Copy, ValueEnum)]
@ -197,6 +231,67 @@ impl StreamSource for HlsSource {
}
}
fn probe_transport_stream_broadcast(
reader: Box<dyn Read + Send>,
fallback_standard: Option<&str>,
) -> Result<Option<BroadcastId>> {
let Some(identity) = probe_transport_stream_identity(reader, 256)? else {
return Ok(None);
};
let virtual_channel = match (identity.major_channel_number, identity.minor_channel_number) {
(Some(major), Some(minor)) => Some(format!("{major}.{minor}")),
_ => None,
};
Ok(Some(BroadcastId {
standard: identity
.standard
.or_else(|| fallback_standard.map(|value| value.to_string()))
.unwrap_or_else(|| "mpegts".to_string()),
transport_stream_id: Some(identity.transport_stream_id),
program_number: identity.program_number,
virtual_channel,
callsign: identity.short_name,
region: None,
frequency: None,
}))
}
fn merge_broadcast_identity(
base: Option<BroadcastId>,
probed: Option<BroadcastId>,
) -> Option<BroadcastId> {
match (base, probed) {
(None, other) => other,
(some, None) => some,
(Some(mut base), Some(probed)) => {
if base.standard.trim().is_empty() {
base.standard = probed.standard;
}
if base.transport_stream_id.is_none() {
base.transport_stream_id = probed.transport_stream_id;
}
if base.program_number.is_none() {
base.program_number = probed.program_number;
}
if base.virtual_channel.is_none() {
base.virtual_channel = probed.virtual_channel;
}
if base.callsign.is_none() {
base.callsign = probed.callsign;
}
if base.region.is_none() {
base.region = probed.region;
}
if base.frequency.is_none() {
base.frequency = probed.frequency;
}
Some(base)
}
}
}
struct FfmpegChildStream {
child: Child,
stdout: std::process::ChildStdout,