//! Deterministic chunking and transcode scaffolding. use ac_ffmpeg::format::{ demuxer::Demuxer, io::IO, muxer::{Muxer, OutputFormat}, }; use anyhow::{anyhow, Context, Result}; use ec_core::{ merkle_root_from_hashes, DeterminismProfile, ManifestBody, StreamId, StreamMetadata, }; use ec_ts::{SectionAssembler, TimeSyncEngine, TimeSyncUpdate, TsReader}; use serde::{Deserialize, Serialize}; use std::fs; use std::io::{Read, Write}; use std::path::{Path, PathBuf}; use std::process::{Child, Command, Stdio}; use std::time::Duration; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StreamProbe { pub index: usize, pub kind: String, pub decoder: Option, pub width: Option, pub height: Option, pub sample_rate: Option, pub channels: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] pub enum ChunkFormat { Fmp4, MpegTs, Matroska, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ChunkerConfig { pub output_dir: PathBuf, pub segment_duration_ms: u64, pub segment_template: String, pub format: ChunkFormat, pub profile: DeterminismProfile, } impl ChunkerConfig { pub fn default_segment_template() -> String { "segment_%06d.m4s".to_string() } } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ChunkSegment { pub index: usize, pub path: PathBuf, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ChunkManifest { pub output_dir: PathBuf, pub segments: Vec, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TsChunk { pub index: u64, pub path: PathBuf, pub timing: ChunkTiming, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct HashedTsChunk { pub index: u64, pub path: PathBuf, pub timing: ChunkTiming, pub hash: String, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct HashedTsChunkManifest { pub output_dir: PathBuf, pub chunks: Vec, pub merkle_root: String, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ChunkTiming { pub chunk_index: u64, pub chunk_start_27mhz: Option, pub chunk_duration_27mhz: u64, pub utc_start_unix: Option, pub sync_status: String, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TsChunkManifest { pub output_dir: PathBuf, pub chunks: Vec, } #[derive(Debug, Clone)] pub enum ChunkerInput { Url(String), File(PathBuf), } #[derive(Debug)] pub struct SegmenterProcess { pub child: Child, pub output_dir: PathBuf, } #[derive(Debug, Clone)] pub struct FfmpegCliSegmenter { pub ffmpeg_bin: PathBuf, } impl Default for FfmpegCliSegmenter { fn default() -> Self { Self { ffmpeg_bin: PathBuf::from("ffmpeg"), } } } impl FfmpegCliSegmenter { pub fn spawn(&self, input: ChunkerInput, config: &ChunkerConfig) -> Result { fs::create_dir_all(&config.output_dir) .with_context(|| format!("failed to create {}", config.output_dir.display()))?; let input_arg = match input { ChunkerInput::Url(url) => url, ChunkerInput::File(path) => path .to_str() .ok_or_else(|| anyhow!("invalid input path"))? .to_string(), }; let segment_time = format!("{:.3}", config.segment_duration_ms as f64 / 1000.0); let output_template = config.output_dir.join(&config.segment_template); let output_template = output_template .to_str() .ok_or_else(|| anyhow!("invalid output template path"))? .to_string(); let mut cmd = Command::new(&self.ffmpeg_bin); cmd.arg("-hide_banner") .arg("-loglevel") .arg("error") .arg("-nostdin") .arg("-y") .arg("-i") .arg(&input_arg); for arg in ffmpeg_profile_args(&config.profile) { cmd.arg(arg); } cmd.arg("-f") .arg("segment") .arg("-segment_time") .arg(segment_time) .arg("-reset_timestamps") .arg("1") .arg("-segment_format") .arg(segment_format_arg(&config.format)) .arg(&output_template) .stdin(Stdio::null()) .stdout(Stdio::null()) .stderr(Stdio::inherit()); let child = cmd .spawn() .with_context(|| "failed to spawn ffmpeg".to_string())?; Ok(SegmenterProcess { child, output_dir: config.output_dir.clone(), }) } } pub fn collect_segments(output_dir: &Path) -> Result { let mut entries = fs::read_dir(output_dir)? .filter_map(Result::ok) .filter(|entry| entry.file_type().map(|t| t.is_file()).unwrap_or(false)) .map(|entry| entry.path()) .collect::>(); entries.sort(); let segments = entries .into_iter() .enumerate() .map(|(index, path)| ChunkSegment { index, path }) .collect(); Ok(ChunkManifest { output_dir: output_dir.to_path_buf(), segments, }) } pub fn probe_read_stream(stream: T) -> Result> { let io = IO::from_read_stream(stream); let demuxer = Demuxer::builder() .build(io) .map_err(|err| anyhow!(err.to_string()))?; let demuxer = demuxer .find_stream_info(Some(Duration::from_secs(2))) .map_err(|(_, err)| anyhow!(err.to_string()))?; let mut probes = Vec::new(); for (index, stream) in demuxer.streams().iter().enumerate() { let params = stream.codec_parameters(); let mut probe = StreamProbe { index, kind: if params.is_video_codec() { "video".to_string() } else if params.is_audio_codec() { "audio".to_string() } else if params.is_subtitle_codec() { "subtitle".to_string() } else { "data".to_string() }, decoder: params.decoder_name().map(|name| name.to_string()), width: None, height: None, sample_rate: None, channels: None, }; if let Some(video) = params.as_video_codec_parameters() { probe.width = Some(video.width()); probe.height = Some(video.height()); } if let Some(audio) = params.as_audio_codec_parameters() { probe.sample_rate = Some(audio.sample_rate()); probe.channels = Some(audio.channel_layout().channels()); } probes.push(probe); } Ok(probes) } pub fn analyze_ts_time( stream: T, chunk_duration_ms: u64, max_events: usize, ) -> Result> { let mut reader = TsReader::new(stream); let mut assembler = SectionAssembler::default(); let mut engine = TimeSyncEngine::new(chunk_duration_ms); let mut events = Vec::new(); while let Some(packet) = reader.read_packet()? { for update in engine.ingest_packet(&packet, &mut assembler) { events.push(update); if events.len() >= max_events { return Ok(events); } } } Ok(events) } pub fn chunk_ts_stream( stream: T, output_dir: &Path, chunk_duration_ms: u64, max_chunks: Option, ) -> Result { let mut chunks = Vec::new(); chunk_ts_stream_live(stream, output_dir, chunk_duration_ms, max_chunks, |chunk| { chunks.push(chunk); Ok(()) })?; Ok(TsChunkManifest { output_dir: output_dir.to_path_buf(), chunks, }) } pub fn chunk_ts_stream_live Result<()>>( stream: T, output_dir: &Path, chunk_duration_ms: u64, max_chunks: Option, mut on_chunk: F, ) -> Result<()> { fs::create_dir_all(output_dir) .with_context(|| format!("failed to create {}", output_dir.display()))?; let mut reader = TsReader::new(stream); let mut assembler = SectionAssembler::default(); let mut engine = TimeSyncEngine::new(chunk_duration_ms); let mut current_index: Option = None; let mut current_file: Option = None; let mut current_timing: Option = None; let mut emitted = 0usize; let mut close_and_emit = |index: u64, timing: ChunkTiming, file: std::fs::File| -> Result { drop(file); let path = chunk_path(output_dir, index); on_chunk(TsChunk { index, path, timing, })?; emitted += 1; Ok(max_chunks.map(|limit| emitted >= limit).unwrap_or(false)) }; while let Some(packet) = reader.read_packet()? { let updates = engine.ingest_packet(&packet, &mut assembler); for update in updates { if update.discontinuity { if let (Some(index), Some(timing), Some(file)) = ( current_index.take(), current_timing.take(), current_file.take(), ) { if close_and_emit(index, timing, file)? { return Ok(()); } } } if let Some(index) = update.chunk_index { if current_index != Some(index) { if let (Some(prev_index), Some(timing), Some(file)) = ( current_index.take(), current_timing.take(), current_file.take(), ) { if close_and_emit(prev_index, timing, file)? { return Ok(()); } } let path = chunk_path(output_dir, index); let file = std::fs::File::create(&path) .with_context(|| format!("failed to create {}", path.display()))?; current_file = Some(file); current_index = Some(index); current_timing = Some(ChunkTiming { chunk_index: index, chunk_start_27mhz: update.chunk_start_27mhz, chunk_duration_27mhz: chunk_duration_ms * 27_000, utc_start_unix: update.utc_start_unix, sync_status: if update.synced { "synced".to_string() } else { "unsynced".to_string() }, }); } } } if let Some(file) = current_file.as_mut() { file.write_all(packet.as_bytes())?; } } if let (Some(index), Some(timing), Some(file)) = ( current_index.take(), current_timing.take(), current_file.take(), ) { let _ = close_and_emit(index, timing, file); } Ok(()) } fn chunk_path(output_dir: &Path, index: u64) -> PathBuf { output_dir.join(format!("chunk_{index:010}.ts")) } pub fn hash_file_blake3(path: &Path) -> Result { let mut file = fs::File::open(path).with_context(|| format!("failed to open {}", path.display()))?; let mut hasher = blake3::Hasher::new(); let mut buffer = [0u8; 8192]; loop { let read = file.read(&mut buffer)?; if read == 0 { break; } hasher.update(&buffer[..read]); } Ok(hasher.finalize().to_hex().to_string()) } pub fn chunk_stream_ffmpeg( stream: T, output_dir: &Path, chunk_duration_ms: u64, max_chunks: Option, ) -> Result { fs::create_dir_all(output_dir) .with_context(|| format!("failed to create {}", output_dir.display()))?; let io = IO::from_read_stream(stream); let demuxer = Demuxer::builder() .build(io) .map_err(|err| anyhow!(err.to_string()))?; let demuxer = demuxer .find_stream_info(Some(Duration::from_secs(2))) .map_err(|(_, err)| anyhow!(err.to_string()))?; let stream_info = demuxer .streams() .iter() .map(|stream| (stream.codec_parameters(), stream.time_base())) .collect::>(); let mut demuxer = demuxer.into_demuxer(); let chunk_duration_micros = chunk_duration_ms as i64 * 1000; let mut chunks = Vec::new(); let mut current_index: Option = None; let mut current_muxer: Option> = None; let mut current_timing: Option = None; loop { let Some(packet) = demuxer.take().map_err(|err| anyhow!(err.to_string()))? else { break; }; let ts = packet .pts() .as_micros() .or_else(|| packet.dts().as_micros()); let chunk_index = ts .and_then(|micros| { if micros < 0 { None } else { Some((micros / chunk_duration_micros) as u64) } }) .or(current_index); if let Some(index) = chunk_index { if current_index != Some(index) { if let Some(mut muxer) = current_muxer.take() { muxer.flush().map_err(|err| anyhow!(err.to_string()))?; let _ = muxer.close(); } if let (Some(prev_index), Some(timing)) = (current_index.take(), current_timing.take()) { chunks.push(TsChunk { index: prev_index, path: chunk_path(output_dir, prev_index), timing, }); } let path = chunk_path(output_dir, index); let file = std::fs::File::create(&path) .with_context(|| format!("failed to create {}", path.display()))?; let io = IO::from_write_stream(file); let mut builder = Muxer::builder(); for (params, _) in &stream_info { builder .add_stream(params) .map_err(|err| anyhow!(err.to_string()))?; } for (stream, (_, tb)) in builder.streams_mut().iter_mut().zip(stream_info.iter()) { stream.set_time_base(*tb); } let format = OutputFormat::find_by_name("mpegts") .ok_or_else(|| anyhow!("mpegts format not found"))?; let muxer = builder .interleaved(true) .build(io, format) .map_err(|err| anyhow!(err.to_string()))?; current_muxer = Some(muxer); current_index = Some(index); current_timing = Some(ChunkTiming { chunk_index: index, chunk_start_27mhz: ts.map(|micros| (micros as u64) * 27), chunk_duration_27mhz: chunk_duration_ms * 27_000, utc_start_unix: None, sync_status: "pts".to_string(), }); if let Some(limit) = max_chunks { if chunks.len() >= limit { break; } } } } if let Some(muxer) = current_muxer.as_mut() { let packet = packet.with_time_base(ac_ffmpeg::time::TimeBase::MICROSECONDS); muxer.push(packet).map_err(|err| anyhow!(err.to_string()))?; } } if let Some(mut muxer) = current_muxer.take() { let _ = muxer.flush(); let _ = muxer.close(); } if let (Some(index), Some(timing)) = (current_index.take(), current_timing.take()) { chunks.push(TsChunk { index, path: chunk_path(output_dir, index), timing, }); } Ok(TsChunkManifest { output_dir: output_dir.to_path_buf(), chunks, }) } pub fn hash_ts_chunks(manifest: &TsChunkManifest) -> Result { let mut ordered = manifest.chunks.clone(); ordered.sort_by_key(|chunk| chunk.index); let mut hashes = Vec::with_capacity(ordered.len()); let mut chunks = Vec::with_capacity(ordered.len()); for chunk in ordered { let hash = hash_file_blake3(&chunk.path)?; hashes.push(hash.clone()); chunks.push(HashedTsChunk { index: chunk.index, path: chunk.path.clone(), timing: chunk.timing.clone(), hash, }); } let merkle_root = merkle_root_from_hashes(&hashes)?; Ok(HashedTsChunkManifest { output_dir: manifest.output_dir.clone(), chunks, merkle_root, }) } pub fn build_manifest_body_for_chunks( stream_id: StreamId, epoch_id: impl Into, chunk_duration_ms: u64, chunk_start_index: u64, encoder_profile_id: impl Into, created_unix_ms: u64, metadata: Vec, chunk_hashes: &[String], ) -> Result { let merkle_root = merkle_root_from_hashes(chunk_hashes)?; Ok(ManifestBody { stream_id, epoch_id: epoch_id.into(), chunk_duration_ms, total_chunks: chunk_hashes.len() as u64, chunk_start_index, encoder_profile_id: encoder_profile_id.into(), merkle_root, created_unix_ms, metadata, chunk_hashes: chunk_hashes.to_vec(), variants: None, }) } pub fn manifest_for_ts_chunks( stream_id: StreamId, epoch_id: impl Into, chunk_duration_ms: u64, chunk_start_index: u64, encoder_profile_id: impl Into, created_unix_ms: u64, metadata: Vec, manifest: &TsChunkManifest, ) -> Result<(ManifestBody, HashedTsChunkManifest)> { let hashed = hash_ts_chunks(manifest)?; let chunk_hashes = hashed .chunks .iter() .map(|chunk| chunk.hash.clone()) .collect::>(); let body = build_manifest_body_for_chunks( stream_id, epoch_id, chunk_duration_ms, chunk_start_index, encoder_profile_id, created_unix_ms, metadata, &chunk_hashes, )?; Ok((body, hashed)) } pub fn chunk_stream_ffmpeg_live Result<()>>( stream: T, output_dir: &Path, chunk_duration_ms: u64, max_chunks: Option, mut on_chunk: F, ) -> Result<()> { fs::create_dir_all(output_dir) .with_context(|| format!("failed to create {}", output_dir.display()))?; let io = IO::from_read_stream(stream); let demuxer = Demuxer::builder() .build(io) .map_err(|err| anyhow!(err.to_string()))?; let demuxer = demuxer .find_stream_info(Some(Duration::from_secs(2))) .map_err(|(_, err)| anyhow!(err.to_string()))?; let stream_info = demuxer .streams() .iter() .map(|stream| (stream.codec_parameters(), stream.time_base())) .collect::>(); let mut demuxer = demuxer.into_demuxer(); let chunk_duration_micros = chunk_duration_ms as i64 * 1000; let mut current_index: Option = None; let mut current_muxer: Option> = None; let mut current_timing: Option = None; let mut emitted = 0usize; loop { let Some(packet) = demuxer.take().map_err(|err| anyhow!(err.to_string()))? else { break; }; let ts = packet .pts() .as_micros() .or_else(|| packet.dts().as_micros()); let chunk_index = ts .and_then(|micros| { if micros < 0 { None } else { Some((micros / chunk_duration_micros) as u64) } }) .or(current_index); if let Some(index) = chunk_index { if current_index != Some(index) { if let Some(mut muxer) = current_muxer.take() { muxer.flush().map_err(|err| anyhow!(err.to_string()))?; let _ = muxer.close(); } if let (Some(prev_index), Some(timing)) = (current_index.take(), current_timing.take()) { let chunk = TsChunk { index: prev_index, path: chunk_path(output_dir, prev_index), timing, }; on_chunk(chunk)?; emitted += 1; if let Some(limit) = max_chunks { if emitted >= limit { return Ok(()); } } } let path = chunk_path(output_dir, index); let file = std::fs::File::create(&path) .with_context(|| format!("failed to create {}", path.display()))?; let io = IO::from_write_stream(file); let mut builder = Muxer::builder(); for (params, _) in &stream_info { builder .add_stream(params) .map_err(|err| anyhow!(err.to_string()))?; } for (stream, (_, tb)) in builder.streams_mut().iter_mut().zip(stream_info.iter()) { stream.set_time_base(*tb); } let format = OutputFormat::find_by_name("mpegts") .ok_or_else(|| anyhow!("mpegts format not found"))?; let muxer = builder .interleaved(true) .build(io, format) .map_err(|err| anyhow!(err.to_string()))?; current_muxer = Some(muxer); current_index = Some(index); current_timing = Some(ChunkTiming { chunk_index: index, chunk_start_27mhz: ts.map(|micros| (micros as u64) * 27), chunk_duration_27mhz: chunk_duration_ms * 27_000, utc_start_unix: None, sync_status: "pts".to_string(), }); } } if let Some(muxer) = current_muxer.as_mut() { let packet = packet.with_time_base(ac_ffmpeg::time::TimeBase::MICROSECONDS); muxer.push(packet).map_err(|err| anyhow!(err.to_string()))?; } } if let Some(mut muxer) = current_muxer.take() { let _ = muxer.flush(); let _ = muxer.close(); } if let (Some(index), Some(timing)) = (current_index.take(), current_timing.take()) { let chunk = TsChunk { index, path: chunk_path(output_dir, index), timing, }; on_chunk(chunk)?; } Ok(()) } fn segment_format_arg(format: &ChunkFormat) -> &'static str { match format { ChunkFormat::Fmp4 => "mp4", ChunkFormat::MpegTs => "mpegts", ChunkFormat::Matroska => "matroska", } } pub fn ffmpeg_profile_args(profile: &DeterminismProfile) -> Vec { let mut args = Vec::new(); if !profile.encoder.is_empty() { args.push("-c:v".to_string()); args.push(profile.encoder.clone()); } for arg in &profile.encoder_args { args.push(arg.clone()); } args } pub fn deterministic_h264_profile() -> DeterminismProfile { DeterminismProfile { name: "deterministic-h264-aac".to_string(), description: "Single-threaded H.264 + AAC with fixed GOP and bitexact flags".to_string(), encoder: "libx264".to_string(), encoder_args: vec![ "-c:a".to_string(), "aac".to_string(), "-b:a".to_string(), "128k".to_string(), "-ac".to_string(), "2".to_string(), "-ar".to_string(), "48000".to_string(), "-pix_fmt".to_string(), "yuv420p".to_string(), "-g".to_string(), "60".to_string(), "-keyint_min".to_string(), "60".to_string(), "-sc_threshold".to_string(), "0".to_string(), "-bf".to_string(), "0".to_string(), "-threads".to_string(), "1".to_string(), "-fflags".to_string(), "+bitexact".to_string(), "-flags:v".to_string(), "+bitexact".to_string(), "-flags:a".to_string(), "+bitexact".to_string(), ], chunk_duration_ms: 2000, } } #[cfg(test)] mod tests { use super::*; use std::io::Cursor; fn ts_packet_with_pcr(pid: u16, cc: u8, pcr_27mhz: u64) -> [u8; ec_ts::TS_PACKET_SIZE] { // Match ec_ts parser expectations. let base = pcr_27mhz / 300; let ext = pcr_27mhz % 300; let mut pcr = [0u8; 6]; pcr[0] = ((base >> 25) & 0xFF) as u8; pcr[1] = ((base >> 17) & 0xFF) as u8; pcr[2] = ((base >> 9) & 0xFF) as u8; pcr[3] = ((base >> 1) & 0xFF) as u8; pcr[4] = (((base & 0x1) << 7) as u8) | 0x7E | (((ext >> 8) & 0x1) as u8); pcr[5] = (ext & 0xFF) as u8; let mut data = [0u8; ec_ts::TS_PACKET_SIZE]; data[0] = 0x47; data[1] = ((pid >> 8) as u8) & 0x1F; data[2] = (pid & 0xFF) as u8; data[3] = (2 << 4) | (cc & 0x0F); // adaptation only data[4] = 7; data[5] = 0x10; data[6..12].copy_from_slice(&pcr); data } #[test] fn segment_format_mapping_is_correct() { assert_eq!(segment_format_arg(&ChunkFormat::Fmp4), "mp4"); assert_eq!(segment_format_arg(&ChunkFormat::MpegTs), "mpegts"); assert_eq!(segment_format_arg(&ChunkFormat::Matroska), "matroska"); } #[test] fn deterministic_profile_args_are_single_threaded_and_bitexact() { let profile = deterministic_h264_profile(); let args = ffmpeg_profile_args(&profile); assert!(args.iter().any(|a| a == "-threads")); assert!(args.iter().any(|a| a == "1")); assert!(args.iter().any(|a| a == "+bitexact")); assert!(args.iter().any(|a| a == "libx264")); } #[test] fn hash_file_blake3_matches_direct_hash() { let dir = std::env::temp_dir().join(format!("ec-chopper-hash-{}", std::process::id())); let _ = fs::create_dir_all(&dir); let path = dir.join("x.bin"); fs::write(&path, b"hello").unwrap(); let h = hash_file_blake3(&path).unwrap(); assert_eq!(h, blake3::hash(b"hello").to_hex().to_string()); let _ = fs::remove_file(&path); } #[test] fn chunk_ts_stream_emits_expected_chunk_indices() { let chunk_ms = 1000u64; let dir = std::env::temp_dir().join(format!("ec-chopper-chunks-{}", std::process::id())); let _ = fs::remove_dir_all(&dir); fs::create_dir_all(&dir).unwrap(); let mut bytes = Vec::new(); bytes.extend_from_slice(&ts_packet_with_pcr(0x0100, 0, 0)); bytes.extend_from_slice(&ts_packet_with_pcr(0x0100, 1, 27_000_000)); bytes.extend_from_slice(&ts_packet_with_pcr(0x0100, 2, 54_000_000)); let manifest = chunk_ts_stream(Cursor::new(bytes), &dir, chunk_ms, None).unwrap(); let indices = manifest.chunks.iter().map(|c| c.index).collect::>(); assert_eq!(indices, vec![0, 1, 2]); for chunk in &manifest.chunks { let data = fs::read(&chunk.path).unwrap(); assert_eq!(data.len() % ec_ts::TS_PACKET_SIZE, 0); } let _ = fs::remove_dir_all(&dir); } #[test] fn hashed_manifest_merkle_root_matches_core() { let dir = std::env::temp_dir().join(format!("ec-chopper-merkle-{}", std::process::id())); let _ = fs::remove_dir_all(&dir); fs::create_dir_all(&dir).unwrap(); let mut bytes = Vec::new(); bytes.extend_from_slice(&ts_packet_with_pcr(0x0100, 0, 0)); bytes.extend_from_slice(&ts_packet_with_pcr(0x0100, 1, 27_000_000)); let manifest = chunk_ts_stream(Cursor::new(bytes), &dir, 1000, None).unwrap(); let hashed = hash_ts_chunks(&manifest).unwrap(); let hashes = hashed .chunks .iter() .map(|c| c.hash.clone()) .collect::>(); let expected = ec_core::merkle_root_from_hashes(&hashes).unwrap(); assert_eq!(hashed.merkle_root, expected); let _ = fs::remove_dir_all(&dir); } }