every.channel/crates/ec-core/tests/simulation.rs
every.channel 91dad67fc2
Some checks failed
deploy-cloudflare / checks (push) Failing after 3s
ci-gates / checks (push) Failing after 5s
deploy-cloudflare / deploy (push) Has been skipped
Add duplicate publisher determinism proof
2026-06-10 03:33:46 -07:00

986 lines
34 KiB
Rust

use ec_core::sim::{
check_control_plane_propagation_invariants, check_duplicate_publisher_invariants,
check_system_duplicate_publisher_invariants, run_control_plane_propagation_campaign,
run_control_plane_propagation_simulation, run_duplicate_publisher_campaign,
run_duplicate_publisher_simulation, run_seeded_simulation_campaign,
run_system_duplicate_publisher_campaign, run_system_duplicate_publisher_simulation,
shrink_duplicate_publisher_failure, simulated_media_hash,
ControlPlanePropagationInvariantConfig, ControlPlanePropagationScenario,
ControlPlaneTraceEvent, DeterministicSimulation, DuplicatePublisherInvariantConfig,
DuplicatePublisherScenario, EncoderDriftFault, FoundationStyleSystemScenarioConfig,
PublisherSequenceClock, SimulationOutage, SimulationPartition, SimulationSeed,
SystemDuplicatePublisherInvariantConfig, SystemDuplicatePublisherScenario,
};
// Stream identifier passed to every observation and scenario built in this file.
const STREAM: &str = "la-kcop";
// Rendition label passed alongside STREAM wherever media is hashed, scheduled or summarized.
const RENDITION: &str = "720p";
// Track name passed alongside STREAM and RENDITION in those same calls.
const TRACK: &str = "0.m4s";
// Profile string fed to `simulated_media_hash`; the drift tests substitute a different profile
// for one observation and then expect that sequence to be reported as divergent.
const PROFILE: &str = "x264-hd3-v1";
/// Schedules one observation per sequence in `start_sequence..end_sequence` for `node`.
///
/// The first sequence is delivered at `first_delivery_ms` and each following sequence lands
/// `step_ms` later. Every observation carries the hash produced by `simulated_media_hash` for
/// the file-level stream, rendition and track constants combined with `profile`.
fn schedule_publisher_window(
    sim: &mut DeterministicSimulation,
    node: &str,
    start_sequence: u64,
    end_sequence: u64,
    first_delivery_ms: u64,
    step_ms: u64,
    profile: &str,
) {
    for sequence in start_sequence..end_sequence {
        let media_hash = simulated_media_hash(STREAM, RENDITION, TRACK, sequence, profile);
        // Number of whole steps between the start of the window and this sequence.
        let steps_into_window = sequence - start_sequence;
        let deliver_at_ms = first_delivery_ms + steps_into_window * step_ms;
        sim.schedule_observation(
            deliver_at_ms,
            node,
            STREAM,
            RENDITION,
            TRACK,
            sequence,
            &media_hash,
        );
    }
}
/// `nuc-a` publishes sequences 0..12 live while `nuc-b` publishes 0..4 live and 4..12 only from
/// 500 ms on. The summary is checked once at 250 ms and again after the simulation goes idle.
#[test]
fn duplicate_publishers_converge_after_delayed_backfill() {
    let mut sim = DeterministicSimulation::new();
    schedule_publisher_window(&mut sim, "nuc-a", 0, 12, 0, 10, PROFILE);
    schedule_publisher_window(&mut sim, "nuc-b", 0, 4, 30, 10, PROFILE);
    schedule_publisher_window(&mut sim, "nuc-b", 4, 12, 500, 10, PROFILE);

    // Snapshot taken before the delayed nuc-b window has been delivered.
    sim.run_until(250);
    let partial = sim.convergence().summarize(STREAM, RENDITION, TRACK, 0, 12);
    assert_eq!(partial.expected_sequences, 12);
    assert_eq!(partial.missing_sequences, Vec::<u64>::new());
    assert_eq!(partial.matching_duplicate_sequences, vec![0, 1, 2, 3]);
    assert!(partial.ok());

    // Drain the remaining events and inspect the final state.
    sim.run_to_idle();
    let converged = sim.convergence().summarize(STREAM, RENDITION, TRACK, 0, 12);
    let completed_at_ms = sim
        .convergence()
        .duplicate_complete_at_ms(STREAM, RENDITION, TRACK, 0, 12);
    let every_sequence: Vec<u64> = (0_u64..12).collect();
    assert_eq!(converged.missing_sequences, Vec::<u64>::new());
    assert_eq!(converged.divergent_sequences, Vec::<u64>::new());
    assert_eq!(converged.matching_duplicate_sequences, every_sequence);
    assert_eq!(converged.duplicate_source_records, 24);
    assert_eq!(completed_at_ms, Some(570));
    assert_eq!(sim.trace().len(), 24);

    // Adjacent trace entries must be non-decreasing in (time, order).
    let trace_is_ordered = sim.trace().windows(2).all(|pair| {
        let (earlier, later) = (&pair[0], &pair[1]);
        (earlier.at_ms, earlier.order) <= (later.at_ms, later.order)
    });
    assert!(
        trace_is_ordered,
        "trace should preserve deterministic event order"
    );
    assert!(converged.ok());
}
/// Two widely spaced sequences are observed by both publishers. The dense summary over the
/// enclosing range reports gaps, while the observed-sequence summary sees only the two
/// sequences that were actually delivered.
#[test]
fn media_convergence_can_summarize_sparse_observed_sequences() {
    const FIRST_SEQUENCE: u64 = 7_287_381_184_512;
    const SECOND_SEQUENCE: u64 = 7_287_381_188_608;
    // Exclusive upper bound that still includes SECOND_SEQUENCE.
    const RANGE_END: u64 = SECOND_SEQUENCE + 1;

    let mut sim = DeterministicSimulation::new();
    for sequence in [FIRST_SEQUENCE, SECOND_SEQUENCE] {
        let media_hash = simulated_media_hash(STREAM, RENDITION, TRACK, sequence, PROFILE);
        sim.schedule_observation(0, "nuc-a", STREAM, RENDITION, TRACK, sequence, &media_hash);
        sim.schedule_observation(1, "nuc-b", STREAM, RENDITION, TRACK, sequence, &media_hash);
    }
    sim.run_to_idle();

    let dense = sim
        .convergence()
        .summarize(STREAM, RENDITION, TRACK, FIRST_SEQUENCE, RANGE_END);
    let sparse = sim.convergence().summarize_observed_sequences(
        STREAM,
        RENDITION,
        TRACK,
        FIRST_SEQUENCE,
        RANGE_END,
    );

    assert!(!dense.missing_sequences.is_empty());
    assert_eq!(sparse.expected_sequences, 2);
    assert_eq!(sparse.missing_sequences, Vec::<u64>::new());
    assert_eq!(
        sparse.matching_duplicate_sequences,
        vec![FIRST_SEQUENCE, SECOND_SEQUENCE]
    );
    assert!(sparse.ok());
}
/// Both publishers deliver sequences 0..8 with matching hashes, then `nuc-b` delivers sequence 4
/// again with a hash built from a different profile. Only that sequence should be divergent.
#[test]
fn duplicate_publisher_simulation_detects_encoder_drift() {
    let mut sim = DeterministicSimulation::new();
    for (publisher, first_delivery_ms) in [("nuc-a", 0), ("nuc-b", 5)] {
        schedule_publisher_window(&mut sim, publisher, 0, 8, first_delivery_ms, 10, PROFILE);
    }

    // Late conflicting observation for sequence 4 from nuc-b.
    let drifted_hash = simulated_media_hash(STREAM, RENDITION, TRACK, 4, "x264-hd3-drift");
    sim.schedule_observation(90, "nuc-b", STREAM, RENDITION, TRACK, 4, &drifted_hash);
    sim.run_to_idle();

    let outcome = sim.convergence().summarize(STREAM, RENDITION, TRACK, 0, 8);
    assert_eq!(outcome.missing_sequences, Vec::<u64>::new());
    assert_eq!(outcome.divergent_sequences, vec![4]);
    assert!(!outcome.ok());
}
/// Running the same seeded scenario twice must yield equal reports, including equal traces.
#[test]
fn duplicate_publisher_fault_schedule_replays_from_seed() {
    let seed = SimulationSeed::new(0x6d6f_712d_6475_7021);
    let scenario = faulted_duplicate_scenario(seed);

    let original_run = run_duplicate_publisher_simulation(&scenario);
    let replayed_run = run_duplicate_publisher_simulation(&scenario);

    assert_eq!(original_run, replayed_run);
    assert!(
        original_run.duplicate_complete(),
        "replay {}",
        original_run.replay_hint
    );
    assert_eq!(original_run.summary.matching_duplicate_sequences.len(), 48);
    assert_eq!(
        original_run.trace, replayed_run.trace,
        "replayed reports should carry the same event history"
    );
}
/// Runs seeds 1..=96 through the faulted scenario. Every seed must converge, and across the
/// whole sweep each fault category must have been observed at least once.
#[test]
fn duplicate_publisher_many_seed_fault_schedules_converge() {
    let (mut any_drop, mut any_partition, mut any_outage) = (false, false, false);

    for raw_seed in 1..=96 {
        let scenario = faulted_duplicate_scenario(SimulationSeed::new(raw_seed));
        let report = run_duplicate_publisher_simulation(&scenario);

        // Accumulate fault coverage across seeds.
        let stats = &report.fault_stats;
        any_drop |= stats.transient_dropped_observations > 0;
        any_partition |= stats.partition_delayed_observations > 0;
        any_outage |= stats.publisher_outage_observations > 0;

        assert!(
            report.duplicate_complete(),
            "duplicate publisher convergence failed for {}: {:?}",
            report.replay_hint,
            report.summary
        );
        let summary = &report.summary;
        assert_eq!(summary.missing_sequences, Vec::<u64>::new());
        assert_eq!(summary.divergent_sequences, Vec::<u64>::new());
        assert_eq!(summary.duplicate_source_records, 96);
    }

    assert!(any_drop, "fault suite did not exercise transient drops");
    assert!(any_partition, "fault suite did not exercise partitions");
    assert!(
        any_outage,
        "fault suite did not exercise publisher outages"
    );
}
/// Adds one encoder-drift fault for `nuc-b` at sequence 17 to the faulted scenario and checks
/// that the report flags exactly that sequence and never reaches duplicate completion.
#[test]
fn seeded_fault_scenario_detects_encoder_drift() {
    let mut scenario = faulted_duplicate_scenario(SimulationSeed::new(0x6472_6966_7421));
    let drift = EncoderDriftFault::new("nuc-b", 17, "x264-hd3-drift");
    scenario.encoder_drifts.push(drift);

    let report = run_duplicate_publisher_simulation(&scenario);

    assert!(!report.duplicate_complete());
    assert_eq!(report.summary.divergent_sequences, vec![17]);
    assert_eq!(report.duplicate_complete_at_ms, None);
    assert_eq!(report.fault_stats.encoder_drift_observations, 1);
}
/// Gives `nuc-b` a sequence offset of 3 on an otherwise delay-free network and expects every
/// sequence in 0..8 to be divergent, with the invariant check listing the matching failures.
#[test]
fn duplicate_publisher_simulation_detects_unaligned_publisher_phase() {
    let mut scenario = DuplicatePublisherScenario::new(
        SimulationSeed::new(0x7068_6173_652d_6275),
        vec![String::from("nuc-a"), String::from("nuc-b")],
        STREAM,
        RENDITION,
        TRACK,
        PROFILE,
        0,
        8,
    );
    // Remove network noise so the phase offset is the only fault in play.
    scenario.max_jitter_ms = 0;
    scenario.base_network_delay_ms = 0;
    scenario
        .publisher_sequence_offsets
        .insert(String::from("nuc-b"), 3);

    let report = run_duplicate_publisher_simulation(&scenario);
    let deadline = DuplicatePublisherInvariantConfig::duplicate_complete_with_deadline(1_000);
    let invariant = check_duplicate_publisher_invariants(&report, &deadline);

    assert!(!report.duplicate_complete());
    assert_eq!(report.summary.missing_sequences, Vec::<u64>::new());
    assert_eq!(
        report.summary.matching_duplicate_sequences,
        Vec::<u64>::new()
    );
    let every_sequence: Vec<u64> = (0_u64..8).collect();
    assert_eq!(report.summary.divergent_sequences, every_sequence);
    assert_eq!(report.fault_stats.publisher_phase_offset_observations, 8);

    let expected_failures: Vec<String> = [
        "divergent_sequences",
        "media_timing_conflict_sequences",
        "duplicate_incomplete",
        "duplicate_complete_deadline_unreached",
    ]
    .iter()
    .copied()
    .map(String::from)
    .collect();
    assert_eq!(invariant.failures, expected_failures);
}
/// Marks `nuc-b` as a publisher without media timing and expects six missing-timing records
/// plus the corresponding invariant failures, with no hash divergence.
#[test]
fn duplicate_publisher_simulation_rejects_missing_media_timing() {
    let mut scenario = DuplicatePublisherScenario::new(
        SimulationSeed::new(0x7469_6d65_2d6d_6973),
        vec![String::from("nuc-a"), String::from("nuc-b")],
        STREAM,
        RENDITION,
        TRACK,
        PROFILE,
        0,
        6,
    );
    // Remove network noise so missing timing is the only fault in play.
    scenario.max_jitter_ms = 0;
    scenario.base_network_delay_ms = 0;
    scenario
        .missing_media_timing_publishers
        .insert(String::from("nuc-b"));

    let report = run_duplicate_publisher_simulation(&scenario);
    let deadline = DuplicatePublisherInvariantConfig::duplicate_complete_with_deadline(1_000);
    let invariant = check_duplicate_publisher_invariants(&report, &deadline);

    assert_eq!(report.summary.divergent_sequences, Vec::<u64>::new());
    assert_eq!(report.summary.media_timing_missing_records, 6);

    let expected_failures: Vec<String> = [
        "media_timing_missing_records",
        "duplicate_incomplete",
        "duplicate_complete_deadline_unreached",
    ]
    .iter()
    .copied()
    .map(String::from)
    .collect();
    assert_eq!(invariant.failures, expected_failures);
}
/// Shifts `nuc-b`'s media time by 17 ms and expects a timing conflict on every sequence in 0..6
/// while the hashes themselves stay non-divergent.
#[test]
fn duplicate_publisher_simulation_rejects_conflicting_media_timing() {
    let mut scenario = DuplicatePublisherScenario::new(
        SimulationSeed::new(0x7469_6d65_2d73_6b65),
        vec![String::from("nuc-a"), String::from("nuc-b")],
        STREAM,
        RENDITION,
        TRACK,
        PROFILE,
        0,
        6,
    );
    // Remove network noise so the media-time offset is the only fault in play.
    scenario.max_jitter_ms = 0;
    scenario.base_network_delay_ms = 0;
    scenario
        .publisher_media_time_offsets_ms
        .insert(String::from("nuc-b"), 17);

    let report = run_duplicate_publisher_simulation(&scenario);
    let deadline = DuplicatePublisherInvariantConfig::duplicate_complete_with_deadline(1_000);
    let invariant = check_duplicate_publisher_invariants(&report, &deadline);

    assert_eq!(report.summary.divergent_sequences, Vec::<u64>::new());
    let every_sequence: Vec<u64> = (0_u64..6).collect();
    assert_eq!(
        report.summary.media_timing_conflict_sequences,
        every_sequence
    );

    let expected_failures: Vec<String> = [
        "media_timing_conflict_sequences",
        "duplicate_incomplete",
        "duplicate_complete_deadline_unreached",
    ]
    .iter()
    .copied()
    .map(String::from)
    .collect();
    assert_eq!(invariant.failures, expected_failures);
}
/// Assigns `nuc-b` a different source-material label and expects every sequence in 0..6 to
/// diverge, with no timing conflicts and twelve source-material mismatch observations.
#[test]
fn duplicate_publisher_simulation_rejects_independent_source_material() {
    let mut scenario = DuplicatePublisherScenario::new(
        SimulationSeed::new(0x736f_7572_6365_6d61),
        vec![String::from("nuc-a"), String::from("nuc-b")],
        STREAM,
        RENDITION,
        TRACK,
        PROFILE,
        0,
        6,
    );
    // Remove network noise so the source mismatch is the only fault in play.
    scenario.max_jitter_ms = 0;
    scenario.base_network_delay_ms = 0;
    scenario.publisher_source_material.insert(
        String::from("nuc-b"),
        String::from("independent-rf-window"),
    );

    let report = run_duplicate_publisher_simulation(&scenario);
    let deadline = DuplicatePublisherInvariantConfig::duplicate_complete_with_deadline(1_000);
    let invariant = check_duplicate_publisher_invariants(&report, &deadline);

    let every_sequence: Vec<u64> = (0_u64..6).collect();
    assert_eq!(report.summary.divergent_sequences, every_sequence);
    assert_eq!(
        report.summary.media_timing_conflict_sequences,
        Vec::<u64>::new()
    );
    assert_eq!(report.fault_stats.source_material_mismatch_observations, 12);

    let expected_failures: Vec<String> = [
        "divergent_sequences",
        "source_material_mismatch_observations",
        "duplicate_incomplete",
        "duplicate_complete_deadline_unreached",
    ]
    .iter()
    .copied()
    .map(String::from)
    .collect();
    assert_eq!(invariant.failures, expected_failures);
}
/// Strips partitions and transient drops from the faulted scenario, installs a single `nuc-b`
/// outage, and checks that every outage observation is backfilled and that completion lands
/// between 940 ms and 3000 ms.
#[test]
fn duplicate_publisher_outage_backfills_after_restart() {
    let mut scenario = faulted_duplicate_scenario(SimulationSeed::new(0x6f75_7461_6765));
    scenario.transient_drop_per_million = 0;
    scenario.partitions.clear();
    scenario.publisher_outages = vec![SimulationOutage::new("nuc-b", 320, 760, 180)];

    let report = run_duplicate_publisher_simulation(&scenario);
    assert!(
        report.duplicate_complete(),
        "{} {:?}",
        report.replay_hint,
        report.summary
    );

    let stats = &report.fault_stats;
    assert!(stats.publisher_outage_observations > 0);
    assert_eq!(
        stats.backfill_observations,
        stats.publisher_outage_observations
    );

    let completed_at_ms = report.duplicate_complete_at_ms.unwrap();
    assert!(
        completed_at_ms >= 940,
        "outage restart should move convergence later than the live path"
    );
    assert!(completed_at_ms <= 3_000);
}
/// Checks the faulted scenario against a 3000 ms duplicate-completion deadline and verifies the
/// invariant result carries a completion time within that deadline.
#[test]
fn duplicate_publisher_simulation_checks_convergence_deadline() {
    let scenario = faulted_duplicate_scenario(SimulationSeed::new(0x6465_6164_6c69_6e65));
    let report = run_duplicate_publisher_simulation(&scenario);
    let deadline = DuplicatePublisherInvariantConfig::duplicate_complete_with_deadline(3_000);
    let invariant = check_duplicate_publisher_invariants(&report, &deadline);

    assert!(
        invariant.ok,
        "{} {:?}",
        invariant.replay_hint, invariant.failures
    );

    let completed_at_ms = invariant.duplicate_complete_at_ms;
    assert!(completed_at_ms.is_some());
    assert!(
        completed_at_ms.unwrap() <= 3_000,
        "{} completed too late: {:?}",
        invariant.replay_hint,
        completed_at_ms
    );
}
/// Runs an eight-seed campaign starting at seed 40 in which only seed 44 reports a failure, and
/// checks that the campaign counts one failure and keeps that seed's replay hint.
#[test]
fn seeded_simulation_campaign_preserves_first_failure() {
    let campaign = run_seeded_simulation_campaign(
        "generic-seeded-campaign",
        SimulationSeed::new(40),
        8,
        // `then` builds the replay hint only for the failing seed; `then_some` would construct
        // (and immediately discard) a hint for every passing seed as well.
        |seed| (seed.0 == 44).then(|| seed.replay_hint()),
    );

    assert!(!campaign.all_passed());
    assert_eq!(campaign.passed, 7);
    assert_eq!(campaign.failed, 1);
    // 44 decimal is 0x2c, rendered as a zero-padded 16-digit hex value in the hint.
    assert_eq!(
        campaign.first_failure.as_deref(),
        Some("EC_SIM_SEED=000000000000002c")
    );
}
/// Runs the faulted control-plane scenario twice from one seed and expects equal reports,
/// complete propagation to every node, and a trace containing both scheduling and learning
/// events.
#[test]
fn control_plane_propagation_replays_from_seed() {
    let scenario = faulted_control_plane_scenario(SimulationSeed::new(0x6374_726c_7265_706c));

    let original_run = run_control_plane_propagation_simulation(&scenario);
    let replayed_run = run_control_plane_propagation_simulation(&scenario);

    assert_eq!(original_run, replayed_run);
    assert!(
        original_run.propagation_complete(),
        "control propagation failed for {}: {:?}",
        original_run.replay_hint,
        original_run.missing_nodes
    );
    assert_eq!(original_run.known_count, scenario.nodes.len() as u64);
    assert_eq!(
        original_run.trace, replayed_run.trace,
        "replayed control-plane schedules should carry identical traces"
    );

    let saw_message_scheduled = original_run
        .trace
        .iter()
        .any(|entry| matches!(entry.event, ControlPlaneTraceEvent::MessageScheduled { .. }));
    assert!(saw_message_scheduled);

    let saw_node_learned = original_run
        .trace
        .iter()
        .any(|entry| matches!(entry.event, ControlPlaneTraceEvent::NodeLearned { .. }));
    assert!(saw_node_learned);
}
/// Runs 512 seeded control-plane schedules against a (7, 900) completion invariant. All must
/// pass, and the campaign totals must show that every fault category was exercised.
#[test]
fn control_plane_campaign_runs_many_fault_schedules() {
    let deadline = ControlPlanePropagationInvariantConfig::complete_with_deadline(7, 900);
    let campaign = run_control_plane_propagation_campaign(
        "control-plane-gossip-fault-campaign",
        SimulationSeed::new(1),
        512,
        &deadline,
        faulted_control_plane_scenario,
    );

    assert!(
        campaign.all_passed(),
        "campaign failed: {:?}",
        campaign.first_failure
    );
    assert_eq!(campaign.passed, 512);
    assert_eq!(campaign.failed, 0);

    // Fault coverage across the whole campaign.
    assert!(campaign.total_transient_dropped_messages > 0);
    assert!(campaign.total_partition_delayed_messages > 0);
    assert!(campaign.total_node_outage_delayed_messages > 0);
    assert!(campaign.total_duplicate_messages > 0);

    // The slowest observed propagation still fits the deadline.
    assert!(campaign.max_propagation_complete_ms_observed <= 900);
}
/// With fanout set to zero and all other faults removed, only the origin node should know the
/// message, and the invariant check should report incomplete propagation.
#[test]
fn control_plane_simulation_detects_dead_fanout() {
    let mut scenario = faulted_control_plane_scenario(SimulationSeed::new(0x6661_6e6f_7574));
    // Clear the remaining fault sources so a zero fanout is the only fault in play.
    scenario.node_outages.clear();
    scenario.partitions.clear();
    scenario.transient_drop_per_million = 0;
    scenario.fanout = 0;

    let report = run_control_plane_propagation_simulation(&scenario);
    let deadline = ControlPlanePropagationInvariantConfig::complete_with_deadline(7, 900);
    let invariant = check_control_plane_propagation_invariants(&report, &deadline);

    assert!(!report.propagation_complete());
    assert_eq!(report.known_nodes, vec![String::from("nuc-a")]);
    assert_eq!(report.missing_nodes.len(), 6);

    let expected_failures: Vec<String> = [
        "propagation_incomplete",
        "propagation_deadline_unreached",
    ]
    .iter()
    .copied()
    .map(String::from)
    .collect();
    assert_eq!(invariant.failures, expected_failures);
}
/// Runs 512 seeded duplicate-publisher schedules against a 3000 ms completion deadline. All
/// must pass, and the campaign totals must show that every fault category was exercised.
#[test]
fn duplicate_publisher_campaign_runs_many_seed_schedules() {
    let deadline = DuplicatePublisherInvariantConfig::duplicate_complete_with_deadline(3_000);
    let campaign = run_duplicate_publisher_campaign(
        "duplicate-publisher-fault-campaign",
        SimulationSeed::new(1),
        512,
        &deadline,
        faulted_duplicate_scenario,
    );

    assert!(
        campaign.all_passed(),
        "campaign failed: {:?}",
        campaign.first_failure
    );
    assert_eq!(campaign.passed, 512);
    assert_eq!(campaign.failed, 0);

    // Fault coverage across the whole campaign.
    assert!(campaign.total_transient_dropped_observations > 0);
    assert!(campaign.total_partition_delayed_observations > 0);
    assert!(campaign.total_publisher_outage_observations > 0);
    assert!(campaign.total_backfill_observations > 0);

    // The slowest observed completion still fits the deadline.
    assert!(campaign.max_duplicate_complete_ms_observed <= 3_000);
}
/// Injects an encoder drift at sequence 17 into the faulted seed-19 scenario and expects the
/// shrinker to return a scenario with 18 sequences and no partitions, outages, drops, jitter or
/// base delay, while keeping the same seed and the same divergence.
#[test]
fn duplicate_publisher_shrinker_minimizes_noisy_drift_failure() {
    let deadline = DuplicatePublisherInvariantConfig::duplicate_complete_with_deadline(3_000);
    let mut noisy = faulted_duplicate_scenario(SimulationSeed::new(19));
    noisy
        .encoder_drifts
        .push(EncoderDriftFault::new("nuc-b", 17, "x264-hd3-drift"));

    let minimized = shrink_duplicate_publisher_failure(&noisy, &deadline)
        .expect("drift should fail and be shrinkable");

    assert_eq!(minimized.seed, SimulationSeed::new(19));

    // The shrunk scenario keeps only what is needed to reproduce the failure.
    let reduced = &minimized.scenario;
    assert_eq!(reduced.expected_sequences(), 18);
    assert_eq!(reduced.partitions.len(), 0);
    assert_eq!(reduced.publisher_outages.len(), 0);
    assert_eq!(reduced.transient_drop_per_million, 0);
    assert_eq!(reduced.max_jitter_ms, 0);
    assert_eq!(reduced.base_network_delay_ms, 0);

    assert_eq!(minimized.report.summary.divergent_sequences, vec![17]);

    let expected_failures: Vec<String> = [
        "divergent_sequences",
        "duplicate_incomplete",
        "duplicate_complete_deadline_unreached",
    ]
    .iter()
    .copied()
    .map(String::from)
    .collect();
    assert_eq!(minimized.invariant.failures, expected_failures);

    let recorded_window_shrink = minimized
        .steps
        .iter()
        .any(|step| step.dimension == "sequence_count" && step.after == "18");
    assert!(
        recorded_window_shrink,
        "shrink steps should record the minimized failing media window"
    );
}
/// Runs a 32-seed campaign starting at seed 10 in which only seed 19 gets an encoder drift.
/// The campaign must keep that failure with a shrunk replay, and rebuilding the same scenario
/// by hand must reproduce the stored report exactly.
#[test]
fn duplicate_publisher_campaign_keeps_first_replayable_failure() {
    let deadline = DuplicatePublisherInvariantConfig::duplicate_complete_with_deadline(3_000);
    let campaign = run_duplicate_publisher_campaign(
        "duplicate-publisher-replayable-failure",
        SimulationSeed::new(10),
        32,
        &deadline,
        |seed| {
            let mut scenario = faulted_duplicate_scenario(seed);
            // Only seed 19 is given the drift fault.
            if seed.0 == 19 {
                let drift = EncoderDriftFault::new("nuc-b", 17, "x264-hd3-drift");
                scenario.encoder_drifts.push(drift);
            }
            scenario
        },
    );

    let first_failure = campaign
        .first_failure
        .as_ref()
        .expect("campaign should preserve first failure");
    let minimized = first_failure
        .shrunk_failure
        .as_ref()
        .expect("campaign should preserve a shrunk replay");

    assert_eq!(first_failure.seed, SimulationSeed::new(19));
    let expected_failures: Vec<String> = [
        "divergent_sequences",
        "duplicate_incomplete",
        "duplicate_complete_deadline_unreached",
    ]
    .iter()
    .copied()
    .map(String::from)
    .collect();
    assert_eq!(first_failure.invariant.failures, expected_failures);

    // Rebuild the failing scenario from the stored seed and replay it.
    let mut replay_scenario = faulted_duplicate_scenario(first_failure.seed);
    replay_scenario
        .encoder_drifts
        .push(EncoderDriftFault::new("nuc-b", 17, "x264-hd3-drift"));
    let replayed_report = run_duplicate_publisher_simulation(&replay_scenario);
    assert_eq!(replayed_report, first_failure.report);

    assert_eq!(minimized.scenario.expected_sequences(), 18);
    assert_eq!(minimized.report.summary.divergent_sequences, vec![17]);
}
/// Runs the combined control-plane + media scenario with the global sequence clock and expects
/// full system completion within 3500 ms, no divergent sequences, a matching duplicate for every
/// expected sequence, and `nuc-b` activating strictly later than `nuc-a`.
#[test]
fn system_duplicate_publishers_converge_with_global_sequence_clock() {
    let scenario = system_duplicate_scenario(
        SimulationSeed::new(0x7379_7374_656d_676c),
        PublisherSequenceClock::Global,
    );
    let report = run_system_duplicate_publisher_simulation(&scenario);
    let invariant = check_system_duplicate_publisher_invariants(
        &report,
        &SystemDuplicatePublisherInvariantConfig::complete_with_deadline(3_500),
    );

    assert!(
        report.system_complete(),
        "{} control={:?} media={:?}",
        report.replay_hint,
        report.control.missing_nodes,
        report.media.summary
    );
    assert!(
        invariant.ok,
        "{} {:?}",
        invariant.replay_hint, invariant.failures
    );
    assert_eq!(report.media.summary.divergent_sequences, Vec::<u64>::new());
    assert_eq!(
        report.media.summary.matching_duplicate_sequences.len() as u64,
        scenario.media.expected_sequences()
    );

    // Require both activation times to be recorded. Defaulting a missing entry to zero would let
    // the ordering check below pass even if nuc-a had never been activated.
    let nuc_a_activation_ms = report
        .publisher_activation_ms
        .get("nuc-a")
        .copied()
        .expect("nuc-a should have a recorded activation time");
    let nuc_b_activation_ms = report
        .publisher_activation_ms
        .get("nuc-b")
        .copied()
        .expect("nuc-b should have a recorded activation time");
    assert!(
        nuc_b_activation_ms > nuc_a_activation_ms,
        "faulted control plane should activate nuc-b later than nuc-a"
    );
}
/// Runs the combined scenario with the local-activation sequence clock. Control propagation
/// still completes, but the media side must diverge and the invariant check must list the
/// expected media and deadline failures.
#[test]
fn system_duplicate_publishers_reject_local_activation_sequence_clock() {
    let scenario = system_duplicate_scenario(
        SimulationSeed::new(0x7379_7374_656d_6c6f),
        PublisherSequenceClock::LocalActivation,
    );
    let report = run_system_duplicate_publisher_simulation(&scenario);
    let deadline = SystemDuplicatePublisherInvariantConfig::complete_with_deadline(3_500);
    let invariant = check_system_duplicate_publisher_invariants(&report, &deadline);

    assert!(report.control.propagation_complete());
    assert!(!report.media.duplicate_complete());

    let has_divergence = !report.media.summary.divergent_sequences.is_empty();
    assert!(
        has_divergence,
        "local activation clock should cause same advertised sequence to hash differently"
    );

    let expected_failures: Vec<String> = [
        "media_divergent_sequences",
        "media_timing_conflict_sequences",
        "media_duplicate_incomplete",
        "system_complete_deadline_unreached",
    ]
    .iter()
    .copied()
    .map(String::from)
    .collect();
    assert_eq!(invariant.failures, expected_failures);
}
/// Runs 256 seeded system schedules with the global sequence clock against a 3500 ms deadline.
/// All must pass; the campaign's timing, trace and fault counters must be populated, and the
/// slowest-run list must be bounded and ordered from slowest to fastest.
#[test]
fn system_duplicate_publisher_campaign_runs_many_seed_schedules() {
    let deadline = SystemDuplicatePublisherInvariantConfig::complete_with_deadline(3_500);
    let campaign = run_system_duplicate_publisher_campaign(
        "system-duplicate-publisher-fault-campaign",
        SimulationSeed::new(1),
        256,
        &deadline,
        |seed| system_duplicate_scenario(seed, PublisherSequenceClock::Global),
    );

    assert!(
        campaign.all_passed(),
        "campaign failed: {:?}",
        campaign.first_failure
    );
    assert_eq!(campaign.passed, 256);
    assert_eq!(campaign.failed, 0);

    // Timing aggregates.
    assert!(campaign.max_control_propagation_ms_observed > 0);
    assert!(campaign.max_media_duplicate_complete_ms_observed > 0);
    assert!(campaign.max_system_complete_ms_observed <= 3_500);
    assert!(campaign.total_system_complete_ms_observed > 0);

    // Trace accounting: the total is the sum of the control and media parts.
    assert!(campaign.total_control_trace_events > 0);
    assert!(campaign.total_media_trace_events > 0);
    assert_eq!(
        campaign.total_trace_events,
        campaign.total_control_trace_events + campaign.total_media_trace_events
    );

    // Fault totals and per-seed coverage.
    assert!(campaign.total_control_transient_drops > 0);
    assert!(campaign.total_media_transient_drops > 0);
    assert!(campaign.total_media_backfill_observations > 0);
    assert!(campaign.seeds_with_system_convergence_time > 0);
    assert!(campaign.seeds_with_control_transient_drops > 0);
    assert!(campaign.seeds_with_media_transient_drops > 0);
    assert!(campaign.seeds_with_media_backfill_observations > 0);

    // Slowest-run list: non-empty, at most 16 entries, non-increasing completion time.
    // A run without a completion time is compared as u64::MAX.
    assert!(!campaign.slowest_system_runs.is_empty());
    assert!(campaign.slowest_system_runs.len() <= 16);
    let slowest_first = campaign.slowest_system_runs.windows(2).all(|pair| {
        let earlier = pair[0].system_complete_at_ms.unwrap_or(u64::MAX);
        let later = pair[1].system_complete_at_ms.unwrap_or(u64::MAX);
        earlier >= later
    });
    assert!(slowest_first);

    assert_eq!(campaign.total_media_publisher_phase_offsets, 0);
}
/// Runs 512 seeds of the foundation-style system scenario (default config) against a 6000 ms
/// deadline. All must pass, every control and media fault counter must be non-zero both in
/// total and per seed, and the campaign's own fault-coverage check must succeed.
#[test]
fn foundation_style_system_campaign_runs_replayable_fault_schedules() {
    let deadline = SystemDuplicatePublisherInvariantConfig::complete_with_deadline(6_000);
    let defaults = FoundationStyleSystemScenarioConfig::default();
    let campaign = run_system_duplicate_publisher_campaign(
        "foundation-style-system-campaign",
        SimulationSeed::new(1),
        512,
        &deadline,
        |seed| ec_core::sim::foundation_style_system_duplicate_publisher_scenario(seed, &defaults),
    );

    assert!(
        campaign.all_passed(),
        "campaign failed: {:?}",
        campaign.first_failure
    );
    assert_eq!(campaign.passed, 512);
    assert_eq!(campaign.failed, 0);

    // Timing aggregates.
    assert!(campaign.max_system_complete_ms_observed <= 6_000);
    assert!(campaign.total_system_complete_ms_observed > 0);

    // Trace accounting: the total is the sum of the control and media parts.
    assert!(campaign.total_control_trace_events > 0);
    assert!(campaign.total_media_trace_events > 0);
    assert_eq!(
        campaign.total_trace_events,
        campaign.total_control_trace_events + campaign.total_media_trace_events
    );

    // Campaign-wide fault totals.
    assert!(campaign.total_control_transient_drops > 0);
    assert!(campaign.total_control_partition_delays > 0);
    assert!(campaign.total_control_node_outage_delays > 0);
    assert!(campaign.total_control_duplicate_messages > 0);
    assert!(campaign.total_media_transient_drops > 0);
    assert!(campaign.total_media_partition_delays > 0);
    assert!(campaign.total_media_publisher_outages > 0);
    assert!(campaign.total_media_backfill_observations > 0);

    // Number of seeds that exhibited each timing or fault category.
    assert!(campaign.seeds_with_system_convergence_time > 0);
    assert!(campaign.seeds_with_control_propagation_time > 0);
    assert!(campaign.seeds_with_media_duplicate_convergence_time > 0);
    assert!(campaign.seeds_with_control_transient_drops > 0);
    assert!(campaign.seeds_with_control_partition_delays > 0);
    assert!(campaign.seeds_with_control_node_outage_delays > 0);
    assert!(campaign.seeds_with_control_duplicate_messages > 0);
    assert!(campaign.seeds_with_media_transient_drops > 0);
    assert!(campaign.seeds_with_media_partition_delays > 0);
    assert!(campaign.seeds_with_media_publisher_outages > 0);
    assert!(campaign.seeds_with_media_backfill_observations > 0);
    assert!(campaign.fault_coverage_ok());

    // Slowest-run list is populated and bounded.
    assert!(!campaign.slowest_system_runs.is_empty());
    assert!(campaign.slowest_system_runs.len() <= 16);

    assert_eq!(campaign.total_media_publisher_phase_offsets, 0);
}
/// Switches the foundation-style config to the local-activation sequence clock and expects the
/// 32-seed campaign to record a failure with divergent media and publisher phase offsets.
#[test]
fn foundation_style_system_campaign_rejects_local_activation_sequence_clock() {
    let deadline = SystemDuplicatePublisherInvariantConfig::complete_with_deadline(6_000);
    let mut local_clock_config = FoundationStyleSystemScenarioConfig::default();
    local_clock_config.sequence_clock = PublisherSequenceClock::LocalActivation;

    let campaign = run_system_duplicate_publisher_campaign(
        "foundation-style-local-activation-failure",
        SimulationSeed::new(1),
        32,
        &deadline,
        |seed| {
            ec_core::sim::foundation_style_system_duplicate_publisher_scenario(
                seed,
                &local_clock_config,
            )
        },
    );

    let first_failure = campaign
        .first_failure
        .as_ref()
        .expect("local activation clock should fail under foundation-style faults");
    assert!(!campaign.all_passed());

    let divergence_label = "media_divergent_sequences".to_string();
    assert!(first_failure.invariant.failures.contains(&divergence_label));

    let failed_media = &first_failure.report.media;
    assert!(!failed_media.summary.divergent_sequences.is_empty());
    assert!(failed_media.fault_stats.publisher_phase_offset_observations > 0);

    assert!(campaign.total_media_publisher_phase_offsets > 0);
    assert!(campaign.seeds_with_media_publisher_phase_offsets > 0);
}
/// Runs a single-seed system campaign in which `nuc-b` uses a different source-material label
/// and expects the failure to be classified as a media source-material mismatch.
#[test]
fn system_duplicate_publisher_campaign_classifies_source_material_mismatch() {
    let deadline = SystemDuplicatePublisherInvariantConfig::complete_with_deadline(3_500);
    let campaign = run_system_duplicate_publisher_campaign(
        "system-source-material-failure",
        SimulationSeed::new(1),
        1,
        &deadline,
        |seed| {
            let mut scenario = system_duplicate_scenario(seed, PublisherSequenceClock::Global);
            scenario.media.publisher_source_material.insert(
                String::from("nuc-b"),
                String::from("independent-rf-window"),
            );
            scenario
        },
    );

    let first_failure = campaign
        .first_failure
        .as_ref()
        .expect("source material mismatch should fail");
    assert!(!campaign.all_passed());

    let mismatch_label = "media_source_material_mismatch_observations".to_string();
    assert!(first_failure.invariant.failures.contains(&mismatch_label));

    let failed_media = &first_failure.report.media;
    assert!(!failed_media.summary.divergent_sequences.is_empty());
    assert!(failed_media.fault_stats.source_material_mismatch_observations > 0);

    assert!(campaign.total_media_source_material_mismatches > 0);
    assert_eq!(campaign.seeds_with_media_source_material_mismatches, 1);
}
/// Builds the two-publisher (`nuc-a`, `nuc-b`) scenario over sequences 0..48 used by the
/// fault tests, with jitter, transient drops, two partitions and one `nuc-b` outage configured.
fn faulted_duplicate_scenario(seed: SimulationSeed) -> DuplicatePublisherScenario {
    let publishers = vec![String::from("nuc-a"), String::from("nuc-b")];
    let mut faulted = DuplicatePublisherScenario::new(
        seed, publishers, STREAM, RENDITION, TRACK, PROFILE, 0, 48,
    );

    // Pacing and network noise.
    faulted.segment_step_ms = 40;
    faulted.base_network_delay_ms = 5;
    faulted.max_jitter_ms = 75;
    faulted.transient_drop_per_million = 275_000;
    faulted.backfill_after_ms = 600;

    // Scheduled faults.
    let nuc_b_partition = SimulationPartition::new("nuc-b", 120, 520, 140);
    let nuc_a_partition = SimulationPartition::new("nuc-a", 940, 1_260, 90);
    faulted.partitions = vec![nuc_b_partition, nuc_a_partition];
    faulted.publisher_outages = vec![SimulationOutage::new("nuc-b", 1_360, 1_520, 220)];

    faulted
}
/// Builds the seven-node control-plane scenario originating at `nuc-a` used by the propagation
/// tests, with gossip parameters, transient drops, two partitions and one node outage
/// configured.
fn faulted_control_plane_scenario(seed: SimulationSeed) -> ControlPlanePropagationScenario {
    let nodes = vec![
        String::from("nuc-a"),
        String::from("nuc-b"),
        String::from("tower"),
        String::from("forge"),
        String::from("relay-lax"),
        String::from("relay-nyc"),
        String::from("relay-hel"),
    ];
    let mut faulted = ControlPlanePropagationScenario::new(
        seed,
        nodes,
        "nuc-a",
        "ec.control.broadcast.la-kcop",
        "la-kcop@42",
    );

    // Gossip parameters.
    faulted.fanout = 3;
    faulted.gossip_interval_ms = 35;
    faulted.max_gossip_rounds = 12;

    // Network noise.
    faulted.base_network_delay_ms = 6;
    faulted.max_jitter_ms = 45;
    faulted.transient_drop_per_million = 120_000;

    // Scheduled faults.
    let relay_hel_partition = SimulationPartition::new("relay-hel", 70, 190, 55);
    let tower_partition = SimulationPartition::new("tower", 220, 310, 40);
    faulted.partitions = vec![relay_hel_partition, tower_partition];
    faulted.node_outages = vec![SimulationOutage::new("relay-nyc", 105, 205, 45)];

    faulted
}
/// Builds the combined system scenario: a seven-node control plane originating at `forge` plus
/// a two-publisher media scenario over sequences 0..48, joined under `sequence_clock`.
///
/// The media scenario's seed is the system seed XOR-ed with a fixed constant; the control plane
/// and the system scenario itself use `seed` directly.
fn system_duplicate_scenario(
    seed: SimulationSeed,
    sequence_clock: PublisherSequenceClock,
) -> SystemDuplicatePublisherScenario {
    // Control plane.
    let control_nodes = vec![
        String::from("forge"),
        String::from("nuc-a"),
        String::from("nuc-b"),
        String::from("tower"),
        String::from("relay-lax"),
        String::from("relay-nyc"),
        String::from("relay-hel"),
    ];
    let mut control = ControlPlanePropagationScenario::new(
        seed,
        control_nodes,
        "forge",
        "ec.control.broadcast.la-kcop",
        "la-kcop@42",
    );
    control.fanout = 3;
    control.gossip_interval_ms = 35;
    control.max_gossip_rounds = 12;
    control.base_network_delay_ms = 6;
    control.max_jitter_ms = 45;
    control.transient_drop_per_million = 120_000;
    control.partitions = vec![
        SimulationPartition::new("nuc-b", 0, 180, 40),
        SimulationPartition::new("relay-hel", 70, 190, 55),
    ];
    control.node_outages = vec![SimulationOutage::new("relay-nyc", 105, 205, 45)];

    // Media plane.
    let media_seed = SimulationSeed::new(seed.0 ^ 0x6d65_6469_6121);
    let publishers = vec![String::from("nuc-a"), String::from("nuc-b")];
    let mut media = DuplicatePublisherScenario::new(
        media_seed, publishers, STREAM, RENDITION, TRACK, PROFILE, 0, 48,
    );
    media.segment_step_ms = 40;
    media.base_network_delay_ms = 5;
    media.max_jitter_ms = 75;
    media.transient_drop_per_million = 275_000;
    media.backfill_after_ms = 600;
    media.partitions = vec![SimulationPartition::new("nuc-a", 940, 1_260, 90)];
    media.publisher_outages = vec![SimulationOutage::new("nuc-b", 1_360, 1_520, 220)];

    // System-level wiring.
    let mut system = SystemDuplicatePublisherScenario::new(seed, control, media);
    system.publisher_activation_delay_ms = 25;
    system.publisher_backfill_delay_ms = 180;
    system.sequence_clock = sequence_clock;
    system
}