#![cfg_attr(not(debug_assertions), windows_subsystem = "windows")] use anyhow::{anyhow, Context, Result}; use axum::Router; use blake3; use ec_core::{ merkle_root_from_hashes, Manifest, ManifestVariant, MoqStreamDescriptor, SourceId, StreamCatalogEntry, StreamDescriptor, StreamEncryptionInfo, StreamId, StreamKey, StreamMetadata, }; use ec_crypto::{decrypt_stream_data, encrypt_stream_data, ENCRYPTION_ALG}; use ec_hdhomerun::{HdhomerunDevice, LineupEntry}; use ec_iroh; use ec_linux_iptv::LinuxDvbConfig; use ec_moq::{ chunk_duration_secs, GroupId, HlsWriter, MoqNode, ObjectMeta, ObjectPayload, TimingMeta, DEFAULT_MANIFEST_TRACK_NAME, DEFAULT_TRACK_NAME, }; use reqwest::blocking as reqwest_blocking; use reqwest::Url; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; use std::fs; use std::io::Read; use std::net::IpAddr; use std::path::{Path, PathBuf}; use std::process::{Child, Command, Stdio}; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use tauri::path::BaseDirectory; use tauri::{AppHandle, Manager, State}; use tokio::sync::Mutex; use tower_http::services::ServeDir; #[derive(Debug, Clone, Serialize, Deserialize)] struct PlaybackInfo { stream_id: String, url: String, } #[derive(Debug, Clone)] struct StreamSource { stream_url: String, title: String, number: Option, metadata: Vec, } struct StreamProcess { _child: Child, _output_dir: PathBuf, } struct MoqStreamProcess { _task: tauri::async_runtime::JoinHandle<()>, _node: MoqNode, _output_dir: PathBuf, _mdns: Option, } struct MoqPublishProcess { _task: tauri::async_runtime::JoinHandle<()>, _node: MoqNode, _mdns: Option, share: ShareInfo, } struct CatalogProcess { _task: tauri::async_runtime::JoinHandle<()>, } #[derive(Debug, Clone, Serialize, Deserialize)] struct SourceDescriptor { id: String, kind: String, name: String, ip: Option, tuner_count: Option, status: String, } struct StreamManager { port: u16, output_root: PathBuf, streams: Vec, manual_streams: Vec, sources: HashMap, manual_sources: HashMap, manual_source_descriptors: HashMap, manual_devices: HashMap, manual_entries: Vec, manual_entries_loaded: bool, processes: HashMap, moq_processes: HashMap, moq_publishes: HashMap, catalog_streams: HashMap, catalog_process: Option, } impl StreamManager { fn new(port: u16, output_root: PathBuf) -> Self { Self { port, output_root, streams: Vec::new(), manual_streams: Vec::new(), sources: HashMap::new(), manual_sources: HashMap::new(), manual_source_descriptors: HashMap::new(), manual_devices: HashMap::new(), manual_entries: Vec::new(), manual_entries_loaded: false, processes: HashMap::new(), moq_processes: HashMap::new(), moq_publishes: HashMap::new(), catalog_streams: HashMap::new(), catalog_process: None, } } } #[tauri::command] async fn list_streams( state: State<'_, Arc>>, ) -> Result, String> { let needs_refresh = { let manager = state.lock().await; manager.streams.is_empty() }; if needs_refresh { let (streams, sources) = tokio::task::spawn_blocking(discover_streams) .await .map_err(|err| err.to_string()) .and_then(|res| res.map_err(|err| err.to_string()))?; let mut manager = state.lock().await; manager.streams = streams; manager.sources = sources; } let manager = state.lock().await; let local = merge_local_streams(&manager.streams, &manager.manual_streams); Ok(merge_streams(&local, &manager.catalog_streams)) } #[tauri::command] async fn refresh_streams( state: State<'_, Arc>>, ) -> Result, String> { let (streams, sources) = tokio::task::spawn_blocking(discover_streams) .await .map_err(|err| err.to_string()) .and_then(|res| res.map_err(|err| err.to_string()))?; let mut manager = state.lock().await; manager.streams = streams; manager.sources = sources; let local = merge_local_streams(&manager.streams, &manager.manual_streams); Ok(merge_streams(&local, &manager.catalog_streams)) } #[tauri::command] async fn list_sources( state: State<'_, Arc>>, ) -> Result, String> { let sources = tokio::task::spawn_blocking(discover_sources) .await .map_err(|err| err.to_string()) .and_then(|res| res.map_err(|err| err.to_string()))?; let manager = state.lock().await; let mut merged = merge_source_descriptors(sources, manager.manual_devices.iter()); let mut seen: HashSet = merged.iter().map(|source| source.id.clone()).collect(); for source in manager.manual_source_descriptors.values() { if seen.insert(source.id.clone()) { merged.push(source.clone()); } } Ok(dedupe_source_descriptors(merged)) } fn dedupe_source_descriptors(sources: Vec) -> Vec { // Defensive dedupe: callers occasionally merge the same physical device via multiple paths // (mDNS, manual add, persisted aliases). Prefer "online" when duplicates exist. use std::collections::BTreeMap; fn key(s: &SourceDescriptor) -> (String, String, String) { ( s.kind.clone(), s.id.clone(), s.ip.clone().unwrap_or_default(), ) } let mut by_key: BTreeMap<(String, String, String), SourceDescriptor> = BTreeMap::new(); for source in sources { let k = key(&source); match by_key.get(&k) { None => { by_key.insert(k, source); } Some(existing) => { if existing.status != "online" && source.status == "online" { by_key.insert(k, source); } } } } by_key.into_values().collect() } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct AddHdhrArgs { host: String, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct AddYtdlpArgs { url: String, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] #[serde(rename_all = "camelCase")] struct ManualSourceOptions { ytdlp_format: Option, ytdlp_live_from_start: bool, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] #[serde(rename_all = "camelCase")] struct ManualSourceEntry { kind: String, input: String, options: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct AddStreamArgs { input: String, options: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct ProbeStreamArgs { input: String, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct LinuxDvbAdapterInfo { adapter: u32, dvrs: Vec, frontends: Vec, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct LinuxDvbChannelsInfo { channels_conf: Option, channels: Vec, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct LinuxDvbBuildUrlArgs { adapter: u32, dvr: u32, channel: Option, channels_conf: Option, tune_wait_ms: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct LinuxDvbListChannelsArgs { channels_conf: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct YtdlpFormatOption { format_id: String, label: String, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct YtdlpProbe { title: Option, formats: Vec, default_format: Option, supports_live_from_start: bool, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct StreamProbe { kind: String, live: bool, requires_options: bool, message: Option, ytdlp: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct AddStreamResult { kind: String, added: usize, title: Option, } #[tauri::command] async fn add_hdhr_source( args: AddHdhrArgs, app: AppHandle, state: State<'_, Arc>>, ) -> Result { let host = args.host.trim(); if host.is_empty() { return Err("host is required".to_string()); } let host_string = normalize_host(host); let host_for_task = host_string.clone(); let (device, streams, sources) = tokio::task::spawn_blocking(move || hydrate_hdhr_host(&host_for_task)) .await .map_err(|err| err.to_string()) .and_then(|res| res.map_err(|err| err.to_string()))?; let mut manager = state.lock().await; add_manual_entries( &mut manager, host_string.clone(), device.clone(), streams, sources, ); remember_manual_entry( &mut manager, &app, ManualSourceEntry { kind: "hdhr".to_string(), input: host_string.clone(), options: None, }, ); Ok(source_descriptor_for_device_with_id( &device, "manual", manual_source_id(&host_string, &device), )) } #[tauri::command] async fn add_ytdlp_source( args: AddYtdlpArgs, app: AppHandle, state: State<'_, Arc>>, ) -> Result { let url = args.url.trim(); if url.is_empty() { return Err("url is required".to_string()); } let url_string = url.to_string(); let url_for_task = url_string.clone(); let app_clone = app.clone(); let resolved = tokio::task::spawn_blocking(move || resolve_ytdlp_stream(&app_clone, &url_for_task, None)) .await .map_err(|err| err.to_string()) .and_then(|res| res.map_err(|err| err.to_string()))?; let mut manager = state.lock().await; if !manager .manual_streams .iter() .any(|existing| existing.id == resolved.descriptor.id) { manager.manual_streams.push(resolved.descriptor.clone()); } manager .manual_sources .insert(resolved.descriptor.id.0.clone(), resolved.source); manager.manual_source_descriptors.insert( resolved.source_descriptor.id.clone(), resolved.source_descriptor, ); remember_manual_entry( &mut manager, &app, ManualSourceEntry { kind: "ytdlp".to_string(), input: url_string, options: None, }, ); Ok(resolved.descriptor) } #[tauri::command] async fn probe_stream(args: ProbeStreamArgs, app: AppHandle) -> Result { let input = args.input.trim().to_string(); if input.is_empty() { return Err("input is required".to_string()); } tokio::task::spawn_blocking(move || probe_stream_input(&app, &input)) .await .map_err(|err| err.to_string()) .and_then(|res| res.map_err(|err| err.to_string())) } #[tauri::command] async fn linux_dvb_list_adapters() -> Result, String> { tokio::task::spawn_blocking(move || -> Result> { let adapters = ec_linux_iptv::list_adapters()?; Ok(adapters .into_iter() .map(|info| LinuxDvbAdapterInfo { adapter: info.adapter, dvrs: info.dvrs, frontends: info.frontends, }) .collect()) }) .await .map_err(|err| err.to_string()) .and_then(|res| res.map_err(|err| err.to_string())) } #[tauri::command] async fn linux_dvb_list_channels( args: LinuxDvbListChannelsArgs, ) -> Result { tokio::task::spawn_blocking(move || -> Result { let conf = args .channels_conf .as_deref() .and_then(|value| { let value = value.trim(); if value.is_empty() { None } else { Some(PathBuf::from(value)) } }) .or_else(ec_linux_iptv::find_channels_conf); let channels = if let Some(path) = conf.as_deref() { ec_linux_iptv::parse_channels_conf(path)? } else { Vec::new() }; Ok(LinuxDvbChannelsInfo { channels_conf: conf.map(|p| p.display().to_string()), channels, }) }) .await .map_err(|err| err.to_string()) .and_then(|res| res.map_err(|err| err.to_string())) } #[tauri::command] async fn linux_dvb_build_url(args: LinuxDvbBuildUrlArgs) -> Result { tokio::task::spawn_blocking(move || -> Result { let adapter = args.adapter; let dvr = args.dvr; let mut tune_cmd = Vec::new(); if let (Some(conf), Some(channel)) = (args.channels_conf.as_deref(), args.channel.as_deref()) { let conf_path = PathBuf::from(conf); tune_cmd = ec_linux_iptv::default_zap_tune_command(adapter, &conf_path, channel); } let config = LinuxDvbConfig { adapter, frontend: 0, dvr, tune_command: if tune_cmd.is_empty() { None } else { Some(tune_cmd) }, tune_timeout_ms: args.tune_wait_ms, }; Ok(linux_dvb_url(&config)) }) .await .map_err(|err| err.to_string()) .and_then(|res| res.map_err(|err| err.to_string())) } #[tauri::command] async fn add_stream( args: AddStreamArgs, app: AppHandle, state: State<'_, Arc>>, ) -> Result { let input = args.input.trim().to_string(); if input.is_empty() { return Err("input is required".to_string()); } let options = args.options.clone(); let app_clone = app.clone(); let input_clone = input.clone(); let resolved = tokio::task::spawn_blocking(move || { resolve_stream_input(&app_clone, &input_clone, options) }) .await .map_err(|err| err.to_string()) .and_then(|res| res.map_err(|err| err.to_string()))?; let mut manager = state.lock().await; let mut added = 0usize; let mut title = None; let kind = resolved.kind_name(); match resolved { ResolvedStream::Hdhr { host, device, streams, sources, } => { let before = manager.manual_streams.len(); add_manual_entries(&mut manager, host.clone(), device.clone(), streams, sources); remember_manual_entry( &mut manager, &app, ManualSourceEntry { kind: "hdhr".to_string(), input: host, options: None, }, ); let after = manager.manual_streams.len(); added = after.saturating_sub(before); } ResolvedStream::Direct { entry } => { title = Some(entry.descriptor.title.clone()); if !manager .manual_streams .iter() .any(|existing| existing.id == entry.descriptor.id) { manager.manual_streams.push(entry.descriptor.clone()); added = 1; } manager .manual_sources .insert(entry.descriptor.id.0.clone(), entry.source); manager .manual_source_descriptors .insert(entry.source_descriptor.id.clone(), entry.source_descriptor); remember_manual_entry(&mut manager, &app, entry.manual_entry); } } Ok(AddStreamResult { kind, added, title }) } #[tauri::command] async fn start_stream( stream_id: String, state: State<'_, Arc>>, ) -> Result { let (stream_url, output_dir, port) = { let mut manager = state.lock().await; if let Some(_process) = manager.processes.get(&stream_id) { let url = playback_url(manager.port, &stream_id); return Ok(PlaybackInfo { stream_id, url }); } let source = manager .sources .get(&stream_id) .or_else(|| manager.manual_sources.get(&stream_id)) .ok_or_else(|| format!("unknown stream {stream_id}")) .cloned()?; let output_dir = manager.output_root.join(stream_dir_name(&stream_id)); (source.stream_url, output_dir, manager.port) }; let stream_id_clone = stream_id.clone(); let output_dir_clone = output_dir.clone(); let process = tokio::task::spawn_blocking(move || { spawn_ffmpeg_cmaf_ladder(&stream_url, &output_dir_clone, DEFAULT_SEGMENT_MS, 6, true) }) .await .map_err(|err| err.to_string()) .and_then(|res| res.map_err(|err| err.to_string()))?; let mut manager = state.lock().await; manager.processes.insert( stream_id.clone(), StreamProcess { _child: process, _output_dir: output_dir, }, ); Ok(PlaybackInfo { stream_id: stream_id_clone.clone(), url: playback_url(port, &stream_id_clone), }) } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct MoqStartArgs { remote: String, broadcast_name: String, stream_id: Option, track_name: Option, /// When true, subscribe to all known variants and write a master playlist so the player can /// auto-pick quality. This increases inbound bandwidth. auto_quality: Option, /// When set, subscribe to a specific variant id (e.g. "720p"). variant: Option, network_secret: Option, discovery: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] struct CatalogWatchArgs { peers: Vec, discovery: Option, } #[tauri::command] async fn start_catalog_watch( args: CatalogWatchArgs, state: State<'_, Arc>>, ) -> Result<(), String> { { let manager = state.lock().await; if manager.catalog_process.is_some() { return Ok(()); } } let state = state.inner().clone(); let state_for_task = state.clone(); let peers = args.peers.clone(); let discovery = parse_discovery(args.discovery.as_deref())?; let task = tauri::async_runtime::spawn(async move { let node = match MoqNode::bind_with_discovery(None, discovery).await { Ok(node) => node, Err(err) => { tracing::error!("catalog gossip failed to start: {err:#}"); return; } }; let mdns = if discovery.mdns { match ec_iroh::MdnsDiscovery::start( node.endpoint(), Some(ec_iroh::MDNS_USER_DATA), true, ) .await { Ok(mdns) => Some(mdns), Err(err) => { tracing::warn!("mdns discovery unavailable: {err:#}"); None } } } else { None }; let mut peer_list = parse_gossip_peers(peers); if let Some(mdns) = mdns.as_ref() { match mdns.discover_peers(Duration::from_secs(2)).await { Ok(found) => { for addr in found { if let Ok(encoded) = serde_json::to_string(&addr) { peer_list.push(encoded); } } } Err(err) => { tracing::warn!("mdns peer discovery failed: {err:#}"); } } } let peer_list = merge_peer_strings(peer_list); let mut gossip = match ec_iroh::CatalogGossip::join(node.endpoint().clone(), &peer_list).await { Ok(gossip) => gossip, Err(err) => { tracing::error!("catalog gossip join failed: {err:#}"); return; } }; // Keep adding newly discovered peers over time so "nearby directory" can // come online without manual contact entry. This is intentionally best-effort. let mdns_for_loop = mdns.clone(); let mut last_refresh = Instant::now() - Duration::from_secs(10); loop { if let Some(mdns) = mdns_for_loop.as_ref() { if last_refresh.elapsed() >= Duration::from_secs(5) { last_refresh = Instant::now(); match mdns.discover_peers(Duration::from_millis(800)).await { Ok(found) => gossip.add_peers(found), Err(err) => tracing::debug!("mdns peer refresh failed: {err:#}"), } } } match gossip.next_entry().await { Ok(Some(entry)) => { let descriptor = catalog_entry_to_descriptor(entry); let mut manager = state_for_task.lock().await; manager .catalog_streams .insert(descriptor.id.0.clone(), descriptor); } Ok(None) => break, Err(err) => { tracing::warn!("catalog gossip error: {err:#}"); tokio::time::sleep(Duration::from_secs(1)).await; } } } let _mdns = mdns; }); let mut manager = state.lock().await; manager.catalog_process = Some(CatalogProcess { _task: task }); Ok(()) } #[tauri::command] async fn start_moq_stream( args: MoqStartArgs, state: State<'_, Arc>>, ) -> Result { let stream_id = args .stream_id .clone() .unwrap_or_else(|| args.broadcast_name.clone()); let (output_dir, port) = { let mut manager = state.lock().await; if manager.moq_processes.contains_key(&stream_id) { let url = playback_url(manager.port, &stream_id); return Ok(PlaybackInfo { stream_id, url }); } let output_dir = manager.output_root.join(stream_dir_name(&stream_id)); (output_dir, manager.port) }; let output_dir_clone = output_dir.clone(); let broadcast_name = args.broadcast_name.clone(); let base_track_name = args .track_name .clone() .unwrap_or_else(|| DEFAULT_TRACK_NAME.to_string()); let auto_quality = args.auto_quality.unwrap_or(false); let variant = args.variant.clone().and_then(|v| { let t = v.trim().to_string(); if t.is_empty() { None } else { Some(t) } }); let remote = ec_iroh::parse_endpoint_addr(&args.remote).map_err(|err| err.to_string())?; let network_secret = parse_network_secret(args.network_secret).map_err(|err| err.to_string())?; let stream_id_for_key = args.stream_id.clone(); let discovery = parse_discovery(args.discovery.as_deref())?; let node = MoqNode::bind_with_discovery(None, discovery) .await .map_err(|err| err.to_string())?; let mdns = if discovery.mdns { match ec_iroh::MdnsDiscovery::start(node.endpoint(), Some(ec_iroh::MDNS_USER_DATA), true) .await { Ok(mdns) => Some(mdns), Err(err) => { tracing::warn!("mdns discovery unavailable: {err:#}"); None } } } else { None }; struct VariantSub { id: String, stream: ec_moq::MoqObjectStream, init_stream: Option, output_dir: PathBuf, } let variants = if auto_quality { let variants = default_cmaf_variants(); let base = base_track_name .split('/') .next() .unwrap_or(DEFAULT_TRACK_NAME) .to_string(); let mut subs = Vec::new(); for v in &variants { let chunk_track = format!("{}/{}", base, v.id); let init_track = format!("init/{}", v.id); let init_stream = node .subscribe_objects(remote.clone(), &broadcast_name, &init_track) .await .ok(); let stream = match node .subscribe_objects(remote.clone(), &broadcast_name, &chunk_track) .await { Ok(s) => s, Err(err) => { tracing::warn!("variant {} subscribe failed: {err:#}", v.id); continue; } }; subs.push(VariantSub { id: v.id.to_string(), stream, init_stream, output_dir: output_dir_clone.join(v.id), }); } Some((variants, subs)) } else { None }; let (single_stream, single_init_stream) = if !auto_quality { let base = base_track_name .split('/') .next() .unwrap_or(DEFAULT_TRACK_NAME) .to_string(); let (track_name, init_track) = if let Some(v) = variant.as_deref() { (format!("{base}/{v}"), format!("init/{v}")) } else { let track = base_track_name.clone(); let init = if let Some(suffix) = track.split('/').last() { if track.contains('/') && !suffix.is_empty() { format!("init/{suffix}") } else { "init".to_string() } } else { "init".to_string() }; (track, init) }; let init_stream = node .subscribe_objects(remote.clone(), &broadcast_name, &init_track) .await .ok(); let stream = node .subscribe_objects(remote.clone(), &broadcast_name, &track_name) .await .map_err(|err| err.to_string())?; (Some(stream), init_stream) } else { (None, None) }; let task = tauri::async_runtime::spawn(async move { async fn run_variant( mut stream: ec_moq::MoqObjectStream, mut init_stream: Option, output_dir: PathBuf, broadcast_name: String, stream_id_for_key: Option, network_secret: Option>, ) { let mut hls = match HlsWriter::new_cmaf(&output_dir, 2.0, 6) { Ok(hls) => hls, Err(err) => { tracing::error!("failed to create hls writer: {err:#}"); return; } }; let fallback = Duration::from_millis(2000); let mut fallback_index = 0u64; let mut init_ready = false; let mut buffered: Vec<(u64, f64, Vec)> = Vec::new(); loop { tokio::select! { biased; init_obj = async { if let Some(s) = init_stream.as_mut() { s.recv().await } else { None } }, if !init_ready && init_stream.is_some() => { let Some(object) = init_obj else { init_stream = None; continue; }; let index = object.meta.timing.as_ref().map(|t| t.chunk_index).unwrap_or(0); let key_id = object.meta.encryption.as_ref().map(|enc| enc.key_id.as_str()).unwrap_or(&broadcast_name); let init = if let Some(enc) = &object.meta.encryption { if enc.alg != ENCRYPTION_ALG { tracing::warn!("init: unsupported encryption {}", enc.alg); continue; } match decrypt_stream_data(key_id, index, &object.data, network_secret.as_deref()) { Some(plaintext) => plaintext, None => { tracing::warn!("init: decryption failed"); continue; } } } else { object.data }; if let Err(err) = hls.write_init_segment(&init) { tracing::warn!("failed to write init segment: {err:#}"); continue; } init_ready = true; buffered.sort_by_key(|(idx, _, _)| *idx); for (idx, dur, bytes) in buffered.drain(..) { if let Err(err) = hls.write_segment(idx, dur, &bytes) { tracing::warn!("failed to write buffered segment: {err:#}"); } } continue; } obj = stream.recv() => { let Some(object) = obj else { break; }; let index = object .meta .timing .as_ref() .map(|t| t.chunk_index) .unwrap_or_else(|| { let current = fallback_index; fallback_index += 1; current }); let stream_id = stream_id_for_key .as_deref() .or_else(|| object.meta.encryption.as_ref().map(|enc| enc.key_id.as_str())) .unwrap_or(&broadcast_name); let data = if let Some(enc) = &object.meta.encryption { if enc.alg != ENCRYPTION_ALG { tracing::warn!("unsupported encryption {}", enc.alg); continue; } match decrypt_stream_data(stream_id, index, &object.data, network_secret.as_deref()) { Some(plaintext) => plaintext, None => { tracing::warn!("decryption failed for chunk {}", index); continue; } } } else { object.data }; let duration = chunk_duration_secs(&object.meta, fallback); if !init_ready { buffered.push((index, duration, data)); continue; } if let Err(err) = hls.write_segment(index, duration, &data) { tracing::warn!("failed to write hls segment: {err:#}"); } } } } } if auto_quality { let Some((variants, subs)) = variants else { tracing::warn!("auto quality enabled, but no variant subscriptions were created"); return; }; if let Err(err) = write_hls_master_playlist(&output_dir_clone, &variants, 128_000) { tracing::warn!("failed to write master playlist: {err:#}"); } let mut handles = Vec::new(); for sub in subs { let out = sub.output_dir; let b = broadcast_name.clone(); let sid = stream_id_for_key.clone(); let secret = network_secret.clone(); handles.push(tokio::spawn(async move { run_variant(sub.stream, sub.init_stream, out, b, sid, secret).await })); } for h in handles { let _ = h.await; } return; } let Some(stream) = single_stream else { return; }; run_variant( stream, single_init_stream, output_dir_clone, broadcast_name, stream_id_for_key, network_secret, ) .await; }); let mut manager = state.lock().await; manager.moq_processes.insert( stream_id.clone(), MoqStreamProcess { _task: task, _node: node, _output_dir: output_dir, _mdns: mdns, }, ); Ok(PlaybackInfo { stream_id: stream_id.clone(), url: playback_url(port, &stream_id), }) } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct MoqPublishArgs { stream_id: String, network_secret: Option, chunk_ms: Option, announce: bool, gossip_peers: Vec, discovery: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct ShareInfo { stream_id: String, endpoint_addr: String, endpoint_id: String, broadcast_name: String, track_name: String, discovery: Option, announce_status: Option, } #[tauri::command] async fn start_moq_publish( args: MoqPublishArgs, state: State<'_, Arc>>, ) -> Result { let (stream_url, output_dir, stream_id, chunk_ms, descriptor) = { let mut manager = state.lock().await; if let Some(existing) = manager.moq_publishes.get(&args.stream_id) { return Ok(existing.share.clone()); } let source = manager .sources .get(&args.stream_id) .or_else(|| manager.manual_sources.get(&args.stream_id)) .ok_or_else(|| format!("unknown stream {}", args.stream_id)) .cloned()?; let descriptor = manager .streams .iter() .chain(manager.manual_streams.iter()) .find(|stream| stream.id.0 == args.stream_id) .cloned() .unwrap_or_else(|| StreamDescriptor { id: StreamId(args.stream_id.clone()), title: source.title.clone(), number: source.number.clone(), source: "source".to_string(), metadata: source.metadata.clone(), }); let output_dir = manager .output_root .join("publish") .join(stream_dir_name(&args.stream_id)); ( source.stream_url, output_dir, args.stream_id.clone(), args.chunk_ms.unwrap_or(DEFAULT_SEGMENT_MS), descriptor, ) }; fs::create_dir_all(&output_dir) .with_context(|| format!("failed to create {}", output_dir.display())) .map_err(|err| err.to_string())?; let variants = default_cmaf_variants(); let init_track_prefix = "init".to_string(); let chunks_track_prefix = DEFAULT_TRACK_NAME.to_string(); let manifest_track_name = DEFAULT_MANIFEST_TRACK_NAME.to_string(); let discovery = parse_discovery(args.discovery.as_deref())?; let node = MoqNode::bind_with_discovery(None, discovery) .await .map_err(|err| err.to_string())?; let mdns = if discovery.mdns { match ec_iroh::MdnsDiscovery::start(node.endpoint(), Some(ec_iroh::MDNS_USER_DATA), true) .await { Ok(mdns) => Some(mdns), Err(err) => { tracing::warn!("mdns discovery unavailable: {err:#}"); None } } } else { None }; let endpoint = node.endpoint().clone(); let endpoint_id = node.endpoint().id().to_string(); let endpoint_addr = serde_json::to_string(&node.endpoint_addr()) .unwrap_or_else(|_| node.endpoint().id().to_string()); let broadcast_name = stream_id.clone(); let track_name = chunks_track_prefix.clone(); let mut object_tracks = Vec::new(); // Back-compat: also publish a single default variant on the base tracks so simple links // (track=chunks) still work. object_tracks.push(chunks_track_prefix.clone()); object_tracks.push(init_track_prefix.clone()); for variant in &variants { object_tracks.push(format!("{}/{}", chunks_track_prefix, variant.id)); object_tracks.push(format!("{}/{}", init_track_prefix, variant.id)); } let mut publish_set = node .publish_track_set( &broadcast_name, object_tracks, vec![manifest_track_name.clone()], ) .await .map_err(|err| err.to_string())?; let network_secret = parse_network_secret(args.network_secret).map_err(|err| err.to_string())?; let share = ShareInfo { stream_id: stream_id.clone(), endpoint_addr: endpoint_addr.clone(), endpoint_id, broadcast_name: broadcast_name.clone(), track_name: track_name.clone(), discovery: args.discovery.clone(), announce_status: None, }; let stream_id_for_key = stream_id.clone(); let share_for_task = share.clone(); let task = tauri::async_runtime::spawn_blocking(move || { let result: Result<(), String> = (|| { // Spawn FFmpeg ladder segmenter and publish init+segments as encrypted objects. let mut child = spawn_ffmpeg_cmaf_ladder(&stream_url, &output_dir, chunk_ms, 0, false) .map_err(|err| err.to_string())?; for variant in &variants { let init_path = output_dir.join(variant.id).join("init.mp4"); wait_for_stable_file(&init_path, Duration::from_secs(20)) .map_err(|err| err.to_string())?; let data = fs::read(&init_path).map_err(|err| err.to_string())?; let key_id = format!( "{}/init", derive_variant_stream_id(&stream_id_for_key, variant.id) ); let object = build_object_bytes( &key_id, 0, 0, "init", data, network_secret.as_deref(), "video/mp4", None, ) .map_err(|err| err.to_string())?; let base_copy = object.clone(); publish_set .publish_object( &format!("{}/{}", init_track_prefix, variant.id), GroupId(0), object, ) .map_err(|err| err.to_string())?; if variant.id == "720p" { publish_set .publish_object(&init_track_prefix, GroupId(0), base_copy) .map_err(|err| err.to_string())?; } } let mut manifest_seq: u64 = 0; let mut index: u64 = 0; loop { let mut per_variant_hash = Vec::new(); let mut per_variant_data = Vec::new(); for variant in &variants { let seg_path = output_dir .join(variant.id) .join(format!("segment_{index:06}.m4s")); match wait_for_stable_file(&seg_path, Duration::from_secs(30)) { Ok(()) => {} Err(err) => { if let Ok(Some(status)) = child.try_wait() { if status.success() { return Ok(()); } return Err(format!("ffmpeg exited with {status} ({err:#})")); } return Err(err.to_string()); } } let data = fs::read(&seg_path).map_err(|err| err.to_string())?; let hash = blake3::hash(&data).to_hex().to_string(); per_variant_hash.push((variant.id.to_string(), hash)); per_variant_data.push((variant, data)); } let manifest = build_multi_variant_manifest( &stream_id_for_key, chunk_ms, index, &variants, &per_variant_hash, ) .map_err(|err| err.to_string())?; publish_set .publish_manifest(&manifest_track_name, manifest_seq, &manifest) .map_err(|err| err.to_string())?; manifest_seq += 1; for (variant, data) in per_variant_data { let key_id = derive_variant_stream_id(&stream_id_for_key, variant.id); let object = build_object_bytes( &key_id, index, chunk_ms * 27_000, "cmaf", data, network_secret.as_deref(), "video/iso.segment", Some(&manifest.manifest_id), ) .map_err(|err| err.to_string())?; let base_copy = object.clone(); publish_set .publish_object( &format!("{}/{}", chunks_track_prefix, variant.id), GroupId(index + 1), object, ) .map_err(|err| err.to_string())?; if variant.id == "720p" { publish_set .publish_object(&chunks_track_prefix, GroupId(index + 1), base_copy) .map_err(|err| err.to_string())?; } } index += 1; } })(); if let Err(err) = result { tracing::warn!("moq publish task ended: {err}"); } }); let mut manager = state.lock().await; manager.moq_publishes.insert( stream_id.clone(), MoqPublishProcess { _task: task, _node: node, _mdns: mdns.clone(), share: share_for_task, }, ); let mut share = share; if args.announce { let mut peers = parse_gossip_peers(args.gossip_peers); if let Some(mdns) = mdns.as_ref() { match mdns.discover_peers(Duration::from_secs(2)).await { Ok(found) => { for addr in found { if let Ok(encoded) = serde_json::to_string(&addr) { peers.push(encoded); } } } Err(err) => { tracing::warn!("mdns peer discovery failed: {err:#}"); } } } let peers = merge_peer_strings(peers); if peers.is_empty() { share.announce_status = Some("no gossip peers configured".to_string()); return Ok(share); } let entry = build_catalog_entry(&descriptor, &endpoint_addr, &broadcast_name, &track_name); match ec_iroh::CatalogGossip::join(endpoint.clone(), &peers).await { Ok(mut gossip) => match gossip.announce(entry).await { Ok(_) => share.announce_status = Some("announced".to_string()), Err(err) => share.announce_status = Some(format!("announce failed: {err}")), }, Err(err) => { share.announce_status = Some(format!("gossip join failed: {err}")); } } } Ok(share) } async fn load_persisted_manual_sources( app: AppHandle, state: Arc>, ) -> Result<()> { let entries = load_manual_sources(&app)?; if entries.is_empty() { let mut manager = state.lock().await; manager.manual_entries_loaded = true; return Ok(()); } { let mut manager = state.lock().await; if manager.manual_entries_loaded { return Ok(()); } manager.manual_entries_loaded = true; manager.manual_entries = entries.clone(); } for entry in entries { match entry.kind.as_str() { "hdhr" => { let host = entry.input.clone(); let result = tokio::task::spawn_blocking(move || hydrate_hdhr_host(&host)) .await .map_err(|err| anyhow!("manual host task failed: {err}"))?; match result { Ok((device, streams, sources)) => { let mut manager = state.lock().await; add_manual_entries(&mut manager, entry.input, device, streams, sources); } Err(err) => { tracing::warn!("failed to load manual HDHomeRun {}: {err:#}", entry.input); } } } "ytdlp" => { let app_clone = app.clone(); let entry_clone = entry.clone(); let result = tokio::task::spawn_blocking(move || { resolve_ytdlp_stream( &app_clone, &entry_clone.input, entry_clone.options.clone(), ) }) .await .map_err(|err| anyhow!("manual yt-dlp task failed: {err}"))?; match result { Ok(resolved) => { let mut manager = state.lock().await; if !manager .manual_streams .iter() .any(|existing| existing.id == resolved.descriptor.id) { manager.manual_streams.push(resolved.descriptor.clone()); } manager .manual_sources .insert(resolved.descriptor.id.0.clone(), resolved.source); manager.manual_source_descriptors.insert( resolved.source_descriptor.id.clone(), resolved.source_descriptor, ); } Err(err) => { tracing::warn!("failed to load yt-dlp source {}: {err:#}", entry.input); } } } "hls" => { let entry_clone = entry.clone(); let result = tokio::task::spawn_blocking(move || resolve_hls_stream(&entry_clone.input)) .await .map_err(|err| anyhow!("manual hls task failed: {err}"))?; match result { Ok(resolved) => { let mut manager = state.lock().await; if !manager .manual_streams .iter() .any(|existing| existing.id == resolved.descriptor.id) { manager.manual_streams.push(resolved.descriptor.clone()); } manager .manual_sources .insert(resolved.descriptor.id.0.clone(), resolved.source); manager.manual_source_descriptors.insert( resolved.source_descriptor.id.clone(), resolved.source_descriptor, ); } Err(err) => { tracing::warn!("failed to load hls source {}: {err:#}", entry.input); } } } "linux-dvb" => { let entry_clone = entry.clone(); let result = tokio::task::spawn_blocking(move || { let url = Url::parse(&entry_clone.input).context("invalid linux-dvb url")?; resolve_linux_dvb_stream(&url) }) .await .map_err(|err| anyhow!("manual linux-dvb task failed: {err}"))?; match result { Ok(resolved) => { let mut manager = state.lock().await; if !manager .manual_streams .iter() .any(|existing| existing.id == resolved.descriptor.id) { manager.manual_streams.push(resolved.descriptor.clone()); } manager .manual_sources .insert(resolved.descriptor.id.0.clone(), resolved.source); manager.manual_source_descriptors.insert( resolved.source_descriptor.id.clone(), resolved.source_descriptor, ); } Err(err) => { tracing::warn!("failed to load linux-dvb source {}: {err:#}", entry.input); } } } other => { tracing::warn!("unknown manual source kind {other}"); } } } Ok(()) } fn main() -> Result<()> { let output_root = std::env::temp_dir().join("every.channel").join("streams"); fs::create_dir_all(&output_root)?; let port = tauri::async_runtime::block_on(start_http_server(output_root.clone()))?; let manager = StreamManager::new(port, output_root); tauri::Builder::default() .manage(Arc::new(Mutex::new(manager))) .setup(|app| { let app_handle = app.handle().clone(); let state = app.state::>>().inner().clone(); tauri::async_runtime::spawn(async move { if let Err(err) = load_persisted_manual_sources(app_handle, state).await { tracing::warn!("manual sources load failed: {err:#}"); } }); Ok(()) }) .invoke_handler(tauri::generate_handler![ list_streams, refresh_streams, list_sources, add_hdhr_source, add_ytdlp_source, probe_stream, linux_dvb_list_adapters, linux_dvb_list_channels, linux_dvb_build_url, add_stream, start_stream, start_moq_stream, start_moq_publish, start_catalog_watch ]) .run(tauri::generate_context!()) .expect("tauri runtime error"); Ok(()) } async fn start_http_server(output_root: PathBuf) -> Result { let router = Router::new().nest_service("/streams", ServeDir::new(output_root)); let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?; let port = listener.local_addr()?.port(); tauri::async_runtime::spawn(async move { let _ = axum::serve(listener, router).await; }); Ok(port) } fn playback_url(port: u16, stream_id: &str) -> String { let dir = stream_dir_name(stream_id); format!("http://127.0.0.1:{port}/streams/{dir}/index.m3u8") } fn stream_dir_name(stream_id: &str) -> String { stream_id .chars() .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' }) .collect::() } fn discover_streams() -> Result<(Vec, HashMap)> { let devices = ec_hdhomerun::discover()?; let mut streams = Vec::new(); let mut sources = HashMap::new(); let mut seen_devices = HashSet::new(); let mut seen_streams = HashSet::new(); for device in devices { let device_key = if !device.id.0.is_empty() && device.id.0 != "unknown" { format!("id:{}", device.id.0) } else { format!("ip:{}", device.ip) }; if !seen_devices.insert(device_key) { continue; } let lineup = ec_hdhomerun::fetch_lineup(&device)?; for entry in lineup { let (descriptor, source) = descriptor_from_lineup(&device, &entry); let id = descriptor.id.0.clone(); if !seen_streams.insert(id.clone()) { continue; } streams.push(descriptor); sources.insert(id, source); } } // Linux DVB: if adapters exist and we can find a channels.conf, expose each channel as a // stream without requiring manual "add stream" input. Actual scanning is out of scope here; // we only consume an existing channels.conf. if let (Ok(adapters), Some(conf_path)) = ( ec_linux_iptv::list_adapters(), ec_linux_iptv::find_channels_conf(), ) { if let Ok(channels) = ec_linux_iptv::parse_channels_conf(&conf_path) { for adapter in adapters { let dvr = adapter.dvrs.first().copied().unwrap_or(0); for channel in channels.iter() { let tune_cmd = ec_linux_iptv::default_zap_tune_command( adapter.adapter, &conf_path, channel, ); let config = LinuxDvbConfig { adapter: adapter.adapter, frontend: 0, dvr, tune_command: Some(tune_cmd), tune_timeout_ms: Some(800), }; let stream_url = linux_dvb_url(&config); let stream_id = StreamKey { version: 1, broadcast: None, source: Some(SourceId { kind: "linux-dvb".to_string(), device_id: Some(format!("adapter{}:dvr{}", adapter.adapter, dvr)), channel: Some(channel.clone()), }), profile: None, variant: None, } .to_stream_id(); let id = stream_id.0.clone(); if !seen_streams.insert(id.clone()) { continue; } let mut metadata = Vec::new(); metadata.push(StreamMetadata { key: "adapter".to_string(), value: adapter.adapter.to_string(), }); metadata.push(StreamMetadata { key: "dvr".to_string(), value: dvr.to_string(), }); metadata.push(StreamMetadata { key: "channel".to_string(), value: channel.clone(), }); metadata.push(StreamMetadata { key: "channels_conf".to_string(), value: conf_path.display().to_string(), }); let descriptor = StreamDescriptor { id: stream_id, title: channel.clone(), number: None, source: "linux-dvb".to_string(), metadata: metadata.clone(), }; let source = StreamSource { stream_url, title: descriptor.title.clone(), number: None, metadata, }; streams.push(descriptor); sources.insert(id, source); } } } } Ok((streams, sources)) } fn descriptor_from_lineup( device: &HdhomerunDevice, entry: &LineupEntry, ) -> (StreamDescriptor, StreamSource) { let stream_id = StreamKey { version: 1, broadcast: None, source: Some(SourceId { kind: "hdhr".to_string(), device_id: Some(device.id.0.clone()), channel: entry .channel .number .clone() .or_else(|| Some(entry.channel.id.0.clone())), }), profile: None, variant: None, } .to_stream_id(); let mut metadata = Vec::new(); metadata.push(StreamMetadata { key: "device_id".to_string(), value: device.id.0.clone(), }); metadata.push(StreamMetadata { key: "device_ip".to_string(), value: device.ip.clone(), }); for channel_meta in &entry.channel.metadata { match channel_meta { ec_core::ChannelMetadata::Callsign(value) => metadata.push(StreamMetadata { key: "callsign".to_string(), value: value.clone(), }), ec_core::ChannelMetadata::Network(value) => metadata.push(StreamMetadata { key: "network".to_string(), value: value.clone(), }), ec_core::ChannelMetadata::Region(value) => metadata.push(StreamMetadata { key: "region".to_string(), value: value.clone(), }), ec_core::ChannelMetadata::Frequency(value) => metadata.push(StreamMetadata { key: "frequency".to_string(), value: value.clone(), }), ec_core::ChannelMetadata::Extra(key, value) => metadata.push(StreamMetadata { key: key.clone(), value: value.clone(), }), } } if is_drm_entry(entry) { metadata.push(StreamMetadata { key: "drm".to_string(), value: "likely".to_string(), }); } let title = entry.channel.name.clone(); let descriptor = StreamDescriptor { id: stream_id.clone(), title: title.clone(), number: entry.channel.number.clone(), source: "hdhr".to_string(), metadata: metadata.clone(), }; let source = StreamSource { stream_url: entry.stream_url.clone(), title, number: entry.channel.number.clone(), metadata, }; (descriptor, source) } fn is_drm_entry(entry: &LineupEntry) -> bool { fn looks_drm(value: &str) -> bool { let value = value.to_lowercase(); value.contains("drm") || value.contains("encrypted") || value.contains("protected") || value.contains("copy") || value.contains("widevine") } if entry.tags.iter().any(|tag| looks_drm(tag)) { return true; } if let Some(obj) = entry.raw.as_object() { for (key, value) in obj.iter() { if looks_drm(key) || looks_drm(&value.to_string()) { return true; } } } false } fn discover_sources() -> Result> { let devices = ec_hdhomerun::discover()?; let mut by_key: HashMap = HashMap::new(); for device in devices { let key = device_key(&device); by_key.entry(key).or_insert(device); } let mut sources = by_key .into_values() .map(|device| source_descriptor_for_device(&device, "online")) .collect::>(); if let Ok(adapters) = ec_linux_iptv::list_adapters() { for info in adapters { sources.push(SourceDescriptor { id: format!("linux-dvb:adapter{}", info.adapter), kind: "linux-dvb".to_string(), name: format!("Linux DVB adapter{}", info.adapter), ip: None, tuner_count: Some(info.frontends.len().min(u8::MAX as usize) as u8), status: "online".to_string(), }); } } Ok(sources) } fn merge_source_descriptors<'a, I>( mut sources: Vec, devices: I, ) -> Vec where I: IntoIterator, { let mut seen: HashSet = sources .iter() .map(|source| { if !source.id.is_empty() && source.id != "unknown" { format!("id:{}", source.id) } else { format!("ip:{}", source.ip.clone().unwrap_or_default()) } }) .collect(); for (host, device) in devices { // Manual HDHomeRun entries are aliases, not distinct devices. Deduplicate by device id when // possible so the Sources panel does not show the same tuner multiple times. let key = device_key(device); if key.starts_with("id:") { if !seen.insert(key) { continue; } sources.push(source_descriptor_for_device_with_id( device, "manual", device.id.0.clone(), )); continue; } let key = manual_source_id(host, device); if seen.insert(format!("id:{key}")) { sources.push(source_descriptor_for_device_with_id(device, "manual", key)); } } sources } fn device_key(device: &HdhomerunDevice) -> String { if !device.id.0.is_empty() && device.id.0 != "unknown" { format!("id:{}", device.id.0) } else { format!("ip:{}", device.ip) } } fn source_descriptor_for_device(device: &HdhomerunDevice, status: &str) -> SourceDescriptor { source_descriptor_for_device_with_id(device, status, device.id.0.clone()) } fn source_descriptor_for_device_with_id( device: &HdhomerunDevice, status: &str, id: String, ) -> SourceDescriptor { SourceDescriptor { id, kind: "hdhr".to_string(), name: device.friendly_name.clone().unwrap_or_else(|| { device .model_number .clone() .unwrap_or_else(|| "HDHomeRun".to_string()) }), ip: Some(device.ip.clone()), tuner_count: Some(device.tuner_count), status: status.to_string(), } } fn manual_source_id(host: &str, device: &HdhomerunDevice) -> String { if !device.id.0.is_empty() && device.id.0 != "unknown" { format!("{}@{}", device.id.0, host) } else { host.to_string() } } fn normalize_host(host: &str) -> String { let trimmed = host.trim(); let stripped = trimmed .strip_prefix("http://") .or_else(|| trimmed.strip_prefix("https://")) .unwrap_or(trimmed); let stripped = stripped.trim_end_matches('/'); stripped .split('/') .next() .unwrap_or(stripped) .trim() .to_string() } fn remember_manual_entry(manager: &mut StreamManager, app: &AppHandle, entry: ManualSourceEntry) { let Some(entry) = normalize_manual_entry(entry) else { return; }; if !manager .manual_entries .iter() .any(|existing| existing == &entry) { manager.manual_entries.push(entry); if let Err(err) = save_manual_sources(app, &manager.manual_entries) { tracing::warn!("failed to persist manual sources: {err:#}"); } } } #[derive(Debug, Clone, Deserialize)] struct YtDlpFormat { format_id: Option, format: Option, format_note: Option, url: Option, protocol: Option, tbr: Option, height: Option, width: Option, fps: Option, ext: Option, vcodec: Option, acodec: Option, } #[derive(Debug, Clone, Deserialize)] struct YtDlpInfo { id: Option, title: Option, webpage_url: Option, is_live: Option, live_status: Option, extractor: Option, extractor_key: Option, formats: Option>, url: Option, } struct ResolvedDirectStream { descriptor: StreamDescriptor, source: StreamSource, source_descriptor: SourceDescriptor, manual_entry: ManualSourceEntry, } enum ResolvedStream { Hdhr { host: String, device: HdhomerunDevice, streams: Vec, sources: HashMap, }, Direct { entry: ResolvedDirectStream, }, } impl ResolvedStream { fn kind_name(&self) -> String { match self { ResolvedStream::Hdhr { .. } => "hdhr".to_string(), ResolvedStream::Direct { entry } => entry.manual_entry.kind.clone(), } } } fn resolve_ytdlp_stream( app: &AppHandle, url: &str, options: Option, ) -> Result { let info = run_ytdlp_json(app, url, options.as_ref())?; if !is_ytdlp_live(&info) { return Err(anyhow!("yt-dlp stream is not live")); } let stream_url = pick_ytdlp_stream_url(&info) .ok_or_else(|| anyhow!("yt-dlp did not return a usable stream url"))?; let stream_id = StreamKey { version: 1, broadcast: None, source: Some(SourceId { kind: "ytdlp".to_string(), device_id: info.id.clone(), channel: Some(info.webpage_url.clone().unwrap_or_else(|| url.to_string())), }), profile: Some("hls".to_string()), variant: None, } .to_stream_id(); let title = info .title .clone() .unwrap_or_else(|| "yt-dlp stream".to_string()); let mut metadata = Vec::new(); metadata.push(StreamMetadata { key: "source_kind".to_string(), value: "ytdlp".to_string(), }); metadata.push(StreamMetadata { key: "origin_url".to_string(), value: info.webpage_url.clone().unwrap_or_else(|| url.to_string()), }); if let Some(id) = info.id.clone() { metadata.push(StreamMetadata { key: "ytdlp_id".to_string(), value: id, }); } if let Some(live) = info.is_live { metadata.push(StreamMetadata { key: "is_live".to_string(), value: live.to_string(), }); } let descriptor = StreamDescriptor { id: stream_id.clone(), title: title.clone(), number: None, source: "ytdlp".to_string(), metadata: metadata.clone(), }; let source = StreamSource { stream_url: stream_url.clone(), title: title.clone(), number: None, metadata: metadata.clone(), }; let source_id = info .id .clone() .map(|id| format!("ytdlp:{id}")) .unwrap_or_else(|| format!("ytdlp:{}", stream_id.0)); let source_descriptor = SourceDescriptor { id: source_id, kind: "ytdlp".to_string(), name: title, ip: None, tuner_count: None, status: if info.is_live.unwrap_or(false) { "live".to_string() } else { "ready".to_string() }, }; Ok(ResolvedDirectStream { descriptor, source, source_descriptor, manual_entry: ManualSourceEntry { kind: "ytdlp".to_string(), input: url.to_string(), options, }, }) } fn run_ytdlp_json( app: &AppHandle, url: &str, options: Option<&ManualSourceOptions>, ) -> Result { let python = resolve_ytdlp_python(app)?; let mut cmd = Command::new(python); cmd.arg("-m") .arg("yt_dlp") .arg("-J") .arg("--no-playlist") .arg("--no-warnings") .arg("--no-progress"); if let Some(options) = options { if let Some(format_id) = options.ytdlp_format.as_ref() { if !format_id.trim().is_empty() { cmd.arg("-f").arg(format_id); } } if options.ytdlp_live_from_start { cmd.arg("--live-from-start"); } } let output = cmd .arg(url) .env("PYTHONNOUSERSITE", "1") .output() .context("failed to run yt-dlp")?; if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); return Err(anyhow!("yt-dlp failed: {stderr}")); } parse_ytdlp_json(&output.stdout) } fn parse_ytdlp_json(output: &[u8]) -> Result { let text = String::from_utf8_lossy(output); if let Ok(info) = serde_json::from_str::(text.trim()) { return Ok(info); } for line in text.lines().rev() { if let Ok(info) = serde_json::from_str::(line.trim()) { return Ok(info); } } Err(anyhow!("failed to parse yt-dlp json output")) } fn pick_ytdlp_stream_url(info: &YtDlpInfo) -> Option { let mut best: Option<(f64, String)> = None; if let Some(formats) = info.formats.as_ref() { for format in formats { let url = match format.url.as_ref() { Some(url) => url, None => continue, }; let mut score = 0.0; if let Some(protocol) = format.protocol.as_ref() { let protocol = protocol.to_lowercase(); if protocol.contains("m3u8") { score += 1000.0; } } if let Some(ext) = format.ext.as_ref() { if ext.eq_ignore_ascii_case("mp4") { score += 10.0; } } if let Some(height) = format.height { score += height as f64; } if let Some(tbr) = format.tbr { score += tbr; } match best { Some((best_score, _)) if best_score >= score => {} _ => best = Some((score, url.clone())), } } } if let Some((_, url)) = best { return Some(url); } info.url.clone() } fn resolve_ytdlp_python(app: &AppHandle) -> Result { if let Ok(path) = std::env::var("EVERY_CHANNEL_YTDLP_PYTHON") { return Ok(PathBuf::from(path)); } let target = match std::env::consts::OS { "macos" => "macos", "linux" => "linux", "windows" => "windows", other => other, }; let base = app .path() .resolve(format!("yt-dlp/{target}/venv"), BaseDirectory::Resource) .context("failed to resolve yt-dlp resource path")?; let python = if cfg!(windows) { base.join("Scripts").join("python.exe") } else { base.join("bin").join("python") }; if python.exists() { Ok(python) } else { Err(anyhow!( "yt-dlp runtime not bundled; run scripts/vendor-yt-dlp.sh" )) } } fn probe_stream_input(app: &AppHandle, input: &str) -> Result { let trimmed = input.trim(); if trimmed.eq_ignore_ascii_case("linux-dvb") || trimmed.eq_ignore_ascii_case("dvb") { return Ok(StreamProbe { kind: "linux-dvb".to_string(), live: true, requires_options: true, message: Some("Select adapter + channel".to_string()), ytdlp: None, }); } if !input.contains("://") { let host = normalize_host(input); if host.is_empty() { return Err(anyhow!("input is required")); } if ec_hdhomerun::discover_from_host(&host).is_ok() { return Ok(StreamProbe { kind: "hdhr".to_string(), live: true, requires_options: false, message: None, ytdlp: None, }); } return Err(anyhow!("input is not a valid URL or HDHomeRun host")); } let url = Url::parse(input).context("invalid url")?; if is_linux_dvb_scheme(&url) { let _config = parse_linux_dvb_url(&url)?; return Ok(StreamProbe { kind: "linux-dvb".to_string(), live: true, requires_options: false, message: None, ytdlp: None, }); } if let Some(host) = url.host_str() { if is_likely_local_host(host) && ec_hdhomerun::discover_from_host(host).is_ok() { return Ok(StreamProbe { kind: "hdhr".to_string(), live: true, requires_options: false, message: None, ytdlp: None, }); } } if url.as_str().contains(".m3u8") { let _resolved = probe_hls_live(url.as_str())?; return Ok(StreamProbe { kind: "hls".to_string(), live: true, requires_options: false, message: None, ytdlp: None, }); } let info = run_ytdlp_json(app, input, None)?; if !is_ytdlp_live(&info) { return Err(anyhow!("yt-dlp stream is not live")); } let probe = build_ytdlp_probe(&info); Ok(StreamProbe { kind: "ytdlp".to_string(), live: true, requires_options: !probe.formats.is_empty(), message: None, ytdlp: Some(probe), }) } fn resolve_stream_input( app: &AppHandle, input: &str, options: Option, ) -> Result { let trimmed = input.trim(); if trimmed.eq_ignore_ascii_case("linux-dvb") || trimmed.eq_ignore_ascii_case("dvb") { return Err(anyhow!( "linux-dvb requires options; use the linux DVB picker" )); } if !input.contains("://") { let host = normalize_host(input); let (device, streams, sources) = hydrate_hdhr_host(&host)?; return Ok(ResolvedStream::Hdhr { host, device, streams, sources, }); } let url = Url::parse(input).context("invalid url")?; if is_linux_dvb_scheme(&url) { let resolved = resolve_linux_dvb_stream(&url)?; return Ok(ResolvedStream::Direct { entry: resolved }); } if let Some(host) = url.host_str() { if is_likely_local_host(host) { let (device, streams, sources) = hydrate_hdhr_host(host)?; return Ok(ResolvedStream::Hdhr { host: host.to_string(), device, streams, sources, }); } } if url.as_str().contains(".m3u8") { let resolved = resolve_hls_stream(input)?; return Ok(ResolvedStream::Direct { entry: resolved }); } let resolved = resolve_ytdlp_stream(app, input, options)?; Ok(ResolvedStream::Direct { entry: resolved }) } fn is_linux_dvb_scheme(url: &Url) -> bool { matches!(url.scheme(), "dvb" | "linux-dvb") } fn resolve_linux_dvb_stream(url: &Url) -> Result { let config = parse_linux_dvb_url(url)?; let stream_id = StreamKey { version: 1, broadcast: None, source: Some(SourceId { kind: "linux-dvb".to_string(), device_id: Some(format!("adapter{}:dvr{}", config.adapter, config.dvr)), channel: None, }), profile: None, variant: None, } .to_stream_id(); let title = format!("Linux DVB adapter{} dvr{}", config.adapter, config.dvr); let descriptor = StreamDescriptor { id: stream_id.clone(), title: title.clone(), number: None, source: "linux-dvb".to_string(), metadata: vec![ StreamMetadata { key: "adapter".to_string(), value: config.adapter.to_string(), }, StreamMetadata { key: "dvr".to_string(), value: config.dvr.to_string(), }, ], }; let stream_url = linux_dvb_url(&config); let source = StreamSource { stream_url: stream_url.clone(), title, number: None, metadata: descriptor.metadata.clone(), }; let source_descriptor = SourceDescriptor { id: format!("linux-dvb:adapter{}:dvr{}", config.adapter, config.dvr), kind: "linux-dvb".to_string(), name: format!("Linux DVB adapter{} dvr{}", config.adapter, config.dvr), ip: None, tuner_count: None, status: "manual".to_string(), }; Ok(ResolvedDirectStream { descriptor, source, source_descriptor, manual_entry: ManualSourceEntry { kind: "linux-dvb".to_string(), input: stream_url, options: None, }, }) } fn parse_linux_dvb_url(url: &Url) -> Result { let mut adapter = None; let mut dvr = None; let mut tune_cmd = Vec::new(); let mut tune_wait_ms = None; for (key, value) in url.query_pairs() { match key.as_ref() { "adapter" => adapter = value.parse::().ok(), "dvr" => dvr = value.parse::().ok(), "tune" | "tune_cmd" => tune_cmd.push(value.to_string()), "tune_wait_ms" => tune_wait_ms = value.parse::().ok(), _ => {} } } if adapter.is_none() || dvr.is_none() { let segments = url .path_segments() .map(|segments| segments.collect::>()) .unwrap_or_default(); if segments.len() >= 2 && segments[0].starts_with("adapter") { adapter = segments[0] .trim_start_matches("adapter") .parse::() .ok(); } if segments.len() >= 3 && segments[1] == "dvr" { dvr = segments[2].parse::().ok(); } else if segments.len() >= 2 && segments[1].starts_with("dvr") { dvr = segments[1].trim_start_matches("dvr").parse::().ok(); } } let adapter = adapter.unwrap_or(0); let dvr = dvr.unwrap_or(0); Ok(LinuxDvbConfig { adapter, frontend: 0, dvr, tune_command: if tune_cmd.is_empty() { None } else { Some(tune_cmd) }, tune_timeout_ms: tune_wait_ms, }) } fn linux_dvb_url(config: &LinuxDvbConfig) -> String { let mut url = Url::parse("linux-dvb://localhost").expect("static url"); { let mut pairs = url.query_pairs_mut(); pairs.append_pair("adapter", &config.adapter.to_string()); pairs.append_pair("dvr", &config.dvr.to_string()); if let Some(cmd) = &config.tune_command { for part in cmd { pairs.append_pair("tune", part); } } if let Some(wait) = config.tune_timeout_ms { pairs.append_pair("tune_wait_ms", &wait.to_string()); } } url.to_string() } fn is_likely_local_host(host: &str) -> bool { if host.eq_ignore_ascii_case("localhost") || host.ends_with(".local") { return true; } is_private_ip(host) } fn is_private_ip(host: &str) -> bool { let Ok(ip) = host.parse::() else { return false; }; match ip { IpAddr::V4(addr) => { let octets = addr.octets(); match octets[0] { 10 => true, 172 => (16..=31).contains(&octets[1]), 192 => octets[1] == 168, 127 => true, _ => false, } } IpAddr::V6(addr) => addr.is_loopback() || addr.is_unique_local(), } } fn build_ytdlp_probe(info: &YtDlpInfo) -> YtdlpProbe { let mut formats = Vec::new(); if let Some(list) = info.formats.as_ref() { for format in list { let format_id = match format.format_id.as_ref() { Some(id) => id.clone(), None => continue, }; let mut parts = Vec::new(); if let Some(height) = format.height { if let Some(width) = format.width { parts.push(format!("{width}x{height}")); } else { parts.push(format!("{height}p")); } } if let Some(tbr) = format.tbr { parts.push(format!("{tbr:.0} kbps")); } if let Some(protocol) = format.protocol.as_ref() { parts.push(protocol.to_string()); } if let Some(ext) = format.ext.as_ref() { parts.push(ext.to_string()); } if let Some(note) = format.format_note.as_ref() { parts.push(note.to_string()); } if let Some(vcodec) = format.vcodec.as_ref() { if !vcodec.eq_ignore_ascii_case("none") { parts.push(vcodec.to_string()); } } if let Some(acodec) = format.acodec.as_ref() { if !acodec.eq_ignore_ascii_case("none") { parts.push(acodec.to_string()); } } let label = if let Some(format_label) = format.format.as_ref() { format_label.clone() } else if parts.is_empty() { format_id.clone() } else { parts.join(" • ") }; formats.push(YtdlpFormatOption { format_id, label }); } } let default_format = formats.first().map(|f| f.format_id.clone()); YtdlpProbe { title: info.title.clone(), formats, default_format, supports_live_from_start: supports_live_from_start(info), } } fn supports_live_from_start(info: &YtDlpInfo) -> bool { let mut key = String::new(); if let Some(value) = info.extractor_key.as_ref() { key.push_str(value); } else if let Some(value) = info.extractor.as_ref() { key.push_str(value); } let key = key.to_lowercase(); key.contains("youtube") || key.contains("twitch") } fn is_ytdlp_live(info: &YtDlpInfo) -> bool { if info.is_live == Some(true) { return true; } if let Some(status) = info.live_status.as_ref() { return status.eq_ignore_ascii_case("is_live"); } false } fn resolve_hls_stream(url: &str) -> Result { let resolved_url = probe_hls_live(url)?; let parsed = Url::parse(&resolved_url).context("invalid hls url")?; let host = parsed.host_str().unwrap_or("HLS").to_string(); let title = format!("HLS {host}"); let stream_id = StreamKey { version: 1, broadcast: None, source: Some(SourceId { kind: "hls".to_string(), device_id: None, channel: Some(url.to_string()), }), profile: Some("hls".to_string()), variant: None, } .to_stream_id(); let metadata = vec![ StreamMetadata { key: "source_kind".to_string(), value: "hls".to_string(), }, StreamMetadata { key: "origin_url".to_string(), value: url.to_string(), }, StreamMetadata { key: "resolved_url".to_string(), value: resolved_url.clone(), }, ]; let descriptor = StreamDescriptor { id: stream_id.clone(), title: title.clone(), number: None, source: "hls".to_string(), metadata: metadata.clone(), }; let source = StreamSource { stream_url: resolved_url, title: title.clone(), number: None, metadata: metadata.clone(), }; let source_descriptor = SourceDescriptor { id: format!("hls:{}", stream_id.0), kind: "hls".to_string(), name: title, ip: None, tuner_count: None, status: "live".to_string(), }; Ok(ResolvedDirectStream { descriptor, source, source_descriptor, manual_entry: ManualSourceEntry { kind: "hls".to_string(), input: url.to_string(), options: None, }, }) } fn probe_hls_live(url: &str) -> Result { let text = fetch_hls_text(url)?; if text.contains("#EXT-X-STREAM-INF") { let base = Url::parse(url).context("invalid hls url")?; let mut lines = text.lines(); while let Some(line) = lines.next() { if line.trim().starts_with("#EXT-X-STREAM-INF") { for candidate in lines.by_ref() { let candidate = candidate.trim(); if candidate.is_empty() || candidate.starts_with('#') { continue; } let resolved = base.join(candidate).context("invalid hls variant url")?; return probe_hls_live(resolved.as_str()); } } } } if text.contains("#EXT-X-ENDLIST") || text.contains("#EXT-X-PLAYLIST-TYPE:VOD") { return Err(anyhow!("HLS playlist is not live")); } Ok(url.to_string()) } fn fetch_hls_text(url: &str) -> Result { let resp = reqwest_blocking::get(url).context("failed to fetch hls url")?; if !resp.status().is_success() { return Err(anyhow!("hls request failed with {}", resp.status())); } Ok(resp.text().context("failed to read hls response")?) } fn manual_sources_path(app: &AppHandle) -> Result { app.path() .resolve("manual_sources.json", BaseDirectory::AppConfig) .context("failed to resolve app config path") } fn legacy_manual_hosts_path(app: &AppHandle) -> Result { app.path() .resolve("manual_hdhomerun.json", BaseDirectory::AppConfig) .context("failed to resolve app config path") } fn normalize_manual_entry(mut entry: ManualSourceEntry) -> Option { entry.input = match entry.kind.as_str() { "hdhr" => normalize_host(&entry.input), "linux-dvb" => { if let Ok(url) = Url::parse(&entry.input) { if is_linux_dvb_scheme(&url) { if let Ok(config) = parse_linux_dvb_url(&url) { linux_dvb_url(&config) } else { entry.input.trim().to_string() } } else { entry.input.trim().to_string() } } else { entry.input.trim().to_string() } } _ => entry.input.trim().to_string(), }; if entry.input.is_empty() { None } else { Some(entry) } } fn load_manual_sources(app: &AppHandle) -> Result> { let path = manual_sources_path(app)?; let entries: Vec = if path.exists() { let bytes = fs::read(&path).with_context(|| format!("failed to read {}", path.display()))?; serde_json::from_slice(&bytes).context("invalid manual_sources.json")? } else { let legacy_path = legacy_manual_hosts_path(app)?; if !legacy_path.exists() { Vec::new() } else { let bytes = fs::read(&legacy_path) .with_context(|| format!("failed to read {}", legacy_path.display()))?; let hosts: Vec = serde_json::from_slice(&bytes).context("invalid manual_hdhomerun.json")?; hosts .into_iter() .map(|host| ManualSourceEntry { kind: "hdhr".to_string(), input: host, options: None, }) .collect() } }; let mut seen = HashSet::new(); let mut normalized = Vec::new(); for entry in entries.into_iter() { let Some(entry) = normalize_manual_entry(entry) else { continue; }; if seen.insert(entry.clone()) { normalized.push(entry); } } Ok(normalized) } fn save_manual_sources(app: &AppHandle, entries: &[ManualSourceEntry]) -> Result<()> { let path = manual_sources_path(app)?; if let Some(parent) = path.parent() { fs::create_dir_all(parent) .with_context(|| format!("failed to create {}", parent.display()))?; } let data = serde_json::to_vec_pretty(entries)?; fs::write(&path, data).with_context(|| format!("failed to write {}", path.display()))?; Ok(()) } fn hydrate_hdhr_host( host: &str, ) -> Result<( HdhomerunDevice, Vec, HashMap, )> { let device = ec_hdhomerun::discover_from_host(host)?; let lineup = ec_hdhomerun::fetch_lineup(&device)?; let mut streams = Vec::new(); let mut sources = HashMap::new(); let mut seen = HashSet::new(); for entry in lineup { let (descriptor, source) = descriptor_from_lineup(&device, &entry); let id = descriptor.id.0.clone(); if !seen.insert(id.clone()) { continue; } streams.push(descriptor); sources.insert(id, source); } Ok((device, streams, sources)) } fn add_manual_entries( manager: &mut StreamManager, host: String, device: HdhomerunDevice, streams: Vec, sources: HashMap, ) { manager.manual_devices.insert(host, device); for stream in streams { if !manager .manual_streams .iter() .any(|existing| existing.id == stream.id) { manager.manual_streams.push(stream); } } for (id, source) in sources { manager.manual_sources.insert(id, source); } } fn merge_local_streams( local: &[StreamDescriptor], manual: &[StreamDescriptor], ) -> Vec { let mut merged = local.to_vec(); let mut seen: HashSet<_> = local.iter().map(|stream| stream.id.0.clone()).collect(); for stream in manual { if seen.insert(stream.id.0.clone()) { merged.push(stream.clone()); } } merged } fn merge_streams( local: &[StreamDescriptor], catalog: &HashMap, ) -> Vec { let mut merged = local.to_vec(); let existing: HashSet<_> = local.iter().map(|s| s.id.0.clone()).collect(); for (id, entry) in catalog.iter() { if !existing.contains(id) { merged.push(entry.clone()); } } merged } fn catalog_entry_to_descriptor(entry: StreamCatalogEntry) -> StreamDescriptor { let mut descriptor = entry.stream; if let Some(moq) = entry.moq { descriptor.source = "moq".to_string(); descriptor.metadata.push(StreamMetadata { key: "moq_endpoint".to_string(), value: moq.endpoint, }); descriptor.metadata.push(StreamMetadata { key: "moq_broadcast".to_string(), value: moq.broadcast_name, }); descriptor.metadata.push(StreamMetadata { key: "moq_track".to_string(), value: moq.track_name, }); if let Some(enc) = moq.encryption { descriptor.metadata.push(StreamMetadata { key: "moq_enc_alg".to_string(), value: enc.alg, }); descriptor.metadata.push(StreamMetadata { key: "moq_key_id".to_string(), value: enc.key_id, }); } } descriptor } #[derive(Debug, Clone, Copy)] struct CmafVariantSpec { id: &'static str, width: u32, height: u32, video_bitrate_kbps: u32, } fn default_cmaf_variants() -> Vec { vec![ CmafVariantSpec { id: "1080p", width: 1920, height: 1080, video_bitrate_kbps: 6000, }, CmafVariantSpec { id: "720p", width: 1280, height: 720, video_bitrate_kbps: 3000, }, CmafVariantSpec { id: "480p", width: 854, height: 480, video_bitrate_kbps: 1200, }, ] } fn write_hls_master_playlist( output_dir: &Path, variants: &[CmafVariantSpec], audio_bitrate_bps: u32, ) -> Result<()> { let mut text = String::new(); text.push_str("#EXTM3U\n#EXT-X-VERSION:7\n"); for v in variants { let bandwidth = (v.video_bitrate_kbps * 1000).saturating_add(audio_bitrate_bps); text.push_str(&format!( "#EXT-X-STREAM-INF:BANDWIDTH={bandwidth},RESOLUTION={}x{}\n{}/index.m3u8\n", v.width, v.height, v.id )); } fs::create_dir_all(output_dir) .with_context(|| format!("failed to create {}", output_dir.display()))?; fs::write(output_dir.join("index.m3u8"), text.as_bytes()) .with_context(|| format!("failed to write {}", output_dir.display()))?; Ok(()) } fn wait_for_stable_file(path: &Path, timeout: Duration) -> Result<()> { let start = Instant::now(); let mut last_len: Option = None; let mut stable_ms: u64 = 0; while start.elapsed() < timeout { if let Ok(meta) = fs::metadata(path) { let len = meta.len(); if len > 0 { if Some(len) == last_len { stable_ms += 100; if stable_ms >= 300 { return Ok(()); } } else { last_len = Some(len); stable_ms = 0; } } } std::thread::sleep(Duration::from_millis(100)); } Err(anyhow!( "timed out waiting for stable file {} after {:?}", path.display(), timeout )) } fn spawn_ffmpeg_cmaf_ladder( stream_url: &str, output_dir: &Path, chunk_ms: u64, hls_list_size: usize, delete_segments: bool, ) -> Result { let variants = default_cmaf_variants(); let segment_time = format!("{:.3}", chunk_ms as f64 / 1000.0); let _ = fs::remove_dir_all(output_dir); fs::create_dir_all(output_dir) .with_context(|| format!("failed to create {}", output_dir.display()))?; for v in &variants { fs::create_dir_all(output_dir.join(v.id))?; } // Keep playback URL stable: /index.m3u8 is always present. For multi-variant this is a master. write_hls_master_playlist(output_dir, &variants, 128_000)?; if stream_url.starts_with("linux-dvb://") || stream_url.starts_with("dvb://") { let url = Url::parse(stream_url).context("invalid linux-dvb url")?; let config = parse_linux_dvb_url(&url)?; let reader = ec_linux_iptv::open_stream(&config).context("failed to open linux dvb stream")?; return spawn_ffmpeg_cmaf_ladder_from_reader( reader, output_dir, &segment_time, &variants, hls_list_size, delete_segments, ); } spawn_ffmpeg_cmaf_ladder_with_input( vec!["-i".to_string(), stream_url.to_string()], None, output_dir, &segment_time, &variants, hls_list_size, delete_segments, ) } fn sanitize_component(value: &str) -> String { value .chars() .map(|c| match c { 'a'..='z' | '0'..='9' | '-' | '_' | '/' => c, 'A'..='Z' => c.to_ascii_lowercase(), _ => '_', }) .collect() } fn derive_variant_stream_id(base_stream_id: &str, variant_id: &str) -> String { let v = sanitize_component(variant_id); format!("{}/variant-{}", base_stream_id.trim_end_matches('/'), v) } fn build_multi_variant_manifest( base_stream_id: &str, chunk_ms: u64, chunk_index: u64, variants: &[CmafVariantSpec], per_variant_hash: &[(String, String)], ) -> Result { let created_unix_ms = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_millis() as u64; let epoch_id = format!("epoch-{created_unix_ms}"); let mut entries = Vec::with_capacity(variants.len()); for v in variants { let Some((_, hash)) = per_variant_hash.iter().find(|(id, _)| id == v.id) else { return Err(anyhow!("missing hash for variant {}", v.id)); }; let chunk_hashes = vec![hash.clone()]; let merkle_root = merkle_root_from_hashes(&chunk_hashes).map_err(|err| anyhow!("{err}"))?; entries.push(ManifestVariant { variant_id: v.id.to_string(), stream_id: StreamId(derive_variant_stream_id(base_stream_id, v.id)), chunk_start_index: chunk_index, total_chunks: 1, merkle_root, chunk_hashes, metadata: vec![ StreamMetadata { key: "width".to_string(), value: v.width.to_string(), }, StreamMetadata { key: "height".to_string(), value: v.height.to_string(), }, StreamMetadata { key: "video_bitrate_kbps".to_string(), value: v.video_bitrate_kbps.to_string(), }, ], }); } entries.sort_by(|a, b| a.variant_id.cmp(&b.variant_id)); let roots = entries .iter() .map(|v| v.merkle_root.clone()) .collect::>(); let body_root = merkle_root_from_hashes(&roots).map_err(|err| anyhow!("{err}"))?; let body = ec_core::ManifestBody { stream_id: StreamId(base_stream_id.to_string()), epoch_id, chunk_duration_ms: chunk_ms, total_chunks: 1, chunk_start_index: chunk_index, encoder_profile_id: "deterministic-h264-aac".to_string(), merkle_root: body_root, created_unix_ms, metadata: vec![], chunk_hashes: vec![], variants: Some(entries), }; let manifest_id = body.manifest_id()?; let mut signatures = Vec::new(); if let Some(keypair) = ec_crypto::load_manifest_keypair_from_env().map_err(|err| anyhow!(err))? { signatures.push(ec_crypto::sign_manifest_id(&manifest_id, &keypair)); } Ok(Manifest { body, manifest_id, signatures, }) } fn spawn_ffmpeg_cmaf_ladder_from_reader( reader: R, output_dir: &Path, segment_time: &str, variants: &[CmafVariantSpec], hls_list_size: usize, delete_segments: bool, ) -> Result { spawn_ffmpeg_cmaf_ladder_with_input( vec!["-i".to_string(), "pipe:0".to_string()], Some(Box::new(reader)), output_dir, segment_time, variants, hls_list_size, delete_segments, ) } fn spawn_ffmpeg_cmaf_ladder_with_input( input_args: Vec, reader: Option>, output_dir: &Path, segment_time: &str, variants: &[CmafVariantSpec], hls_list_size: usize, delete_segments: bool, ) -> Result { let mut cmd = Command::new("ffmpeg"); cmd.current_dir(output_dir); cmd.arg("-hide_banner") .arg("-loglevel") .arg("error") .arg("-nostdin") .arg("-y"); for arg in input_args { cmd.arg(arg); } // Reduce opportunities for non-deterministic scheduling in filters/decoders. cmd.arg("-filter_threads") .arg("1") .arg("-filter_complex_threads") .arg("1") .arg("-threads") .arg("1") // Keep only a simple A/V set (ignore subs/data, drop metadata). .arg("-map") .arg("0:v:0") .arg("-map") .arg("0:a:0?") .arg("-sn") .arg("-dn") .arg("-map_metadata") .arg("-1"); // Filter graph: split and scale into N variants. let mut filter = String::new(); filter.push_str(&format!("[0:v]split={}", variants.len())); for i in 0..variants.len() { filter.push_str(&format!("[v{i}]")); } filter.push(';'); for (i, v) in variants.iter().enumerate() { filter.push_str(&format!( "[v{i}]scale=w={}:h={}:flags=bicubic[v{i}o];", v.width, v.height )); } cmd.arg("-filter_complex").arg(filter); for (i, v) in variants.iter().enumerate() { let out_variant_dir = output_dir.join(v.id); let seg_template = out_variant_dir.join("segment_%06d.m4s"); let seg_template = seg_template .to_str() .ok_or_else(|| anyhow!("invalid segment template path"))? .to_string(); let v_bitrate = format!("{}k", v.video_bitrate_kbps); let bufsize = format!("{}k", v.video_bitrate_kbps.saturating_mul(2)); cmd.arg("-map") .arg(format!("[v{i}o]")) .arg("-map") .arg("0:a:0?") .arg("-c:v") .arg("libx264") .arg("-b:v") .arg(v_bitrate) .arg("-maxrate") .arg(format!("{}k", v.video_bitrate_kbps)) .arg("-bufsize") .arg(bufsize); for arg in default_encoder_args() { cmd.arg(arg); } cmd.arg("-f") .arg("hls") .arg("-hls_time") .arg(segment_time) .arg("-hls_list_size") .arg(hls_list_size.to_string()) .arg("-hls_flags") .arg(if delete_segments { "delete_segments+append_list+independent_segments" } else { "append_list+independent_segments" }) .arg("-hls_segment_type") .arg("fmp4") .arg("-hls_fmp4_init_filename") .arg("init.mp4") .arg("-hls_segment_filename") .arg(seg_template) .arg(out_variant_dir.join("index.m3u8")); } if reader.is_some() { cmd.stdin(Stdio::piped()); } else { cmd.stdin(Stdio::null()); } cmd.stdout(Stdio::null()).stderr(Stdio::inherit()); let mut child = cmd .spawn() .with_context(|| "failed to spawn ffmpeg".to_string())?; if let Some(reader) = reader { let mut stdin = child .stdin .take() .ok_or_else(|| anyhow!("ffmpeg stdin unavailable"))?; std::thread::spawn(move || { let mut reader = reader; let _ = std::io::copy(&mut reader, &mut stdin); }); } Ok(child) } fn parse_network_secret(value: Option) -> Result>> { let value = value.or_else(|| std::env::var("EVERY_CHANNEL_NETWORK_SECRET").ok()); let Some(value) = value else { return Ok(None) }; let bytes = hex::decode(value).context("network secret must be hex")?; Ok(Some(bytes)) } fn parse_discovery(value: Option<&str>) -> Result { if let Some(value) = value { let trimmed = value.trim(); if !trimmed.is_empty() { return ec_iroh::DiscoveryConfig::from_list(trimmed).map_err(|err| err.to_string()); } } ec_iroh::DiscoveryConfig::from_env().map_err(|err| err.to_string()) } const DEFAULT_SEGMENT_MS: u64 = 2000; fn parse_gossip_peers(mut peers: Vec) -> Vec { if peers.is_empty() { if let Ok(env_peers) = std::env::var("EVERY_CHANNEL_GOSSIP_PEERS") { peers = env_peers .split(',') .map(|peer| peer.trim().to_string()) .filter(|peer| !peer.is_empty()) .collect(); } } peers } fn merge_peer_strings(peers: Vec) -> Vec { let mut seen = HashSet::new(); let mut merged = Vec::new(); for peer in peers { let trimmed = peer.trim(); if trimmed.is_empty() { continue; } if seen.insert(trimmed.to_string()) { merged.push(trimmed.to_string()); } } merged } fn build_object_bytes( key_id: &str, chunk_index: u64, chunk_duration_27mhz: u64, sync_status: &str, plaintext: Vec, network_secret: Option<&[u8]>, content_type: &str, manifest_id: Option<&str>, ) -> Result { let chunk_hash = blake3::hash(&plaintext).to_hex().to_string(); let created_unix_ms = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_millis() as u64; let timing = TimingMeta { chunk_index, chunk_start_27mhz: 0, chunk_duration_27mhz, utc_start_unix: None, sync_status: sync_status.to_string(), }; let encrypted = encrypt_stream_data(key_id, chunk_index, &plaintext, network_secret); let meta = ObjectMeta { created_unix_ms, content_type: content_type.to_string(), size_bytes: encrypted.ciphertext.len() as u64, timing: Some(timing), encryption: Some(ec_moq::EncryptionMeta { alg: encrypted.alg.to_string(), key_id: key_id.to_string(), nonce_hex: hex::encode(encrypted.nonce), }), chunk_hash: Some(chunk_hash), chunk_hash_alg: Some("blake3".to_string()), chunk_proof: None, chunk_proof_alg: None, manifest_id: manifest_id.map(|s| s.to_string()), }; Ok(ObjectPayload { meta, data: encrypted.ciphertext, }) } fn build_catalog_entry( descriptor: &StreamDescriptor, endpoint_addr: &str, broadcast_name: &str, track_name: &str, ) -> StreamCatalogEntry { let encryption = StreamEncryptionInfo { alg: ENCRYPTION_ALG.to_string(), key_id: descriptor.id.0.clone(), nonce_scheme: "blake3(stream-id,chunk-index)".to_string(), }; let moq = MoqStreamDescriptor { endpoint: endpoint_addr.to_string(), broadcast_name: broadcast_name.to_string(), track_name: track_name.to_string(), encryption: Some(encryption), }; StreamCatalogEntry { stream: descriptor.clone(), moq: Some(moq), manifest: None, updated_unix_ms: SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_millis() as u64, } } fn default_encoder_args() -> Vec<&'static str> { vec![ "-c:a", "aac", "-b:a", "128k", "-ac", "2", "-ar", "48000", "-pix_fmt", "yuv420p", "-g", "60", "-keyint_min", "60", "-sc_threshold", "0", "-bf", "0", "-threads", "1", "-fflags", "+bitexact", "-flags:v", "+bitexact", "-flags:a", "+bitexact", ] } #[cfg(test)] mod tests { use super::*; #[test] fn normalize_host_strips_scheme_path_and_slash() { // Use documentation IPs (RFC 5737) in tests. assert_eq!(normalize_host("http://192.0.2.1/"), "192.0.2.1"); assert_eq!( normalize_host("https://example.local/foo/bar"), "example.local" ); assert_eq!(normalize_host(" 192.0.2.3 "), "192.0.2.3"); } #[test] fn linux_dvb_url_roundtrips_parse() { let config = LinuxDvbConfig { adapter: 1, frontend: 0, dvr: 2, tune_command: Some(vec![ "dvbv5-zap".to_string(), "-r".to_string(), "Channel Name".to_string(), ]), tune_timeout_ms: Some(800), }; let url = linux_dvb_url(&config); let parsed = Url::parse(&url).unwrap(); let out = parse_linux_dvb_url(&parsed).unwrap(); assert_eq!(out.adapter, 1); assert_eq!(out.dvr, 2); assert_eq!(out.tune_timeout_ms, Some(800)); assert_eq!(out.tune_command.unwrap()[0], "dvbv5-zap"); } #[test] fn stream_dir_name_sanitizes_non_alnum() { assert_eq!( stream_dir_name("ec/stream/v1/source/hdhr"), "ec_stream_v1_source_hdhr" ); assert_eq!(stream_dir_name("a b+c"), "a_b_c"); } #[test] fn merge_source_descriptors_dedupes_manual_hdhr_by_device_id() { let device = HdhomerunDevice { id: ec_core::DeviceId("ABCDEF01".to_string()), ip: "10.0.0.1".to_string(), tuner_count: 4, lineup_url: None, discover_url: None, base_url: None, device_auth: None, friendly_name: Some("HDHR".to_string()), model_number: None, firmware_name: None, firmware_version: None, device_type: None, discovery_tags: Vec::new(), raw_discover_json: None, }; let sources = vec![source_descriptor_for_device(&device, "online")]; let merged = merge_source_descriptors(sources, [(&"host".to_string(), &device)]); let count = merged.iter().filter(|s| s.kind == "hdhr").count(); assert_eq!(count, 1); } #[test] fn write_hls_master_playlist_includes_variants() { let dir = std::env::temp_dir().join(format!( "ec-tauri-master-{}", SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_millis() )); let _ = fs::remove_dir_all(&dir); let variants = default_cmaf_variants(); write_hls_master_playlist(&dir, &variants, 128_000).unwrap(); let text = fs::read_to_string(dir.join("index.m3u8")).unwrap(); assert!(text.contains("#EXT-X-STREAM-INF")); assert!(text.contains("1080p/index.m3u8")); assert!(text.contains("720p/index.m3u8")); assert!(text.contains("480p/index.m3u8")); let _ = fs::remove_dir_all(&dir); } #[test] fn derive_variant_stream_id_is_stable() { assert_eq!( derive_variant_stream_id("every.channel/x", "720p"), "every.channel/x/variant-720p" ); assert_eq!( derive_variant_stream_id("every.channel/x/", "A B"), "every.channel/x/variant-a_b" ); } }