134 lines
4.5 KiB
Rust
134 lines
4.5 KiB
Rust
use anyhow::{anyhow, Result};
|
|
use bytes::Bytes;
|
|
use ec_direct::{decode_direct_link, encode_direct_link, DirectCodeV1};
|
|
use just_webrtc::types::{
|
|
DataChannelOptions, PeerConfiguration, PeerConnectionState, SessionDescription,
|
|
};
|
|
use just_webrtc::{DataChannelExt, PeerConnectionBuilder, PeerConnectionExt};
|
|
|
|
async fn wait_connected(pc: &impl PeerConnectionExt) -> Result<()> {
|
|
tokio::time::timeout(std::time::Duration::from_secs(20), async {
|
|
loop {
|
|
match pc.state_change().await {
|
|
PeerConnectionState::Connected => break Ok(()),
|
|
PeerConnectionState::Failed => break Err(anyhow!("peer connection failed")),
|
|
PeerConnectionState::Closed => break Err(anyhow!("peer connection closed")),
|
|
_ => {}
|
|
}
|
|
}
|
|
})
|
|
.await
|
|
.map_err(|_| anyhow!("timed out waiting for peer connection"))?
|
|
}
|
|
|
|
// Ignored by default: WebRTC can be timing-sensitive on some hosts.
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
#[ignore]
|
|
async fn e2e_direct_connect_loopback_sends_bytes() -> Result<()> {
|
|
// Avoid depending on external STUN servers in tests: use host candidates only.
|
|
let cfg = PeerConfiguration {
|
|
ice_servers: vec![],
|
|
..Default::default()
|
|
};
|
|
|
|
let offerer = PeerConnectionBuilder::new()
|
|
.set_config(cfg.clone())
|
|
.with_channel_options(vec![(
|
|
"simple_channel_".to_string(),
|
|
DataChannelOptions::default(),
|
|
)])
|
|
.map_err(|e| anyhow!("{e:#}"))?
|
|
.build()
|
|
.await
|
|
.map_err(|e| anyhow!("{e:#}"))?;
|
|
|
|
let offer_desc: SessionDescription = offerer
|
|
.get_local_description()
|
|
.await
|
|
.ok_or_else(|| anyhow!("missing offer local description"))?;
|
|
let offer_candidates = offerer
|
|
.collect_ice_candidates()
|
|
.await
|
|
.map_err(|e| anyhow!("{e:#}"))?;
|
|
let offer_link = encode_direct_link(&DirectCodeV1 {
|
|
v: 1,
|
|
desc: offer_desc,
|
|
candidates: offer_candidates,
|
|
label: Some("every.channel0".to_string()),
|
|
})?;
|
|
|
|
let offer_code = decode_direct_link(&offer_link)?;
|
|
let answerer = PeerConnectionBuilder::new()
|
|
.set_config(cfg.clone())
|
|
.with_remote_offer(Some(offer_code.desc.clone()))
|
|
.map_err(|e| anyhow!("{e:#}"))?
|
|
.build()
|
|
.await
|
|
.map_err(|e| anyhow!("{e:#}"))?;
|
|
answerer
|
|
.add_ice_candidates(offer_code.candidates.clone())
|
|
.await
|
|
.map_err(|e| anyhow!("{e:#}"))?;
|
|
let answer_desc = answerer
|
|
.get_local_description()
|
|
.await
|
|
.ok_or_else(|| anyhow!("missing answer local description"))?;
|
|
let answer_candidates = answerer
|
|
.collect_ice_candidates()
|
|
.await
|
|
.map_err(|e| anyhow!("{e:#}"))?;
|
|
let answer_link = encode_direct_link(&DirectCodeV1 {
|
|
v: 1,
|
|
desc: answer_desc,
|
|
candidates: answer_candidates,
|
|
label: Some("every.channel0".to_string()),
|
|
})?;
|
|
|
|
let answer_code = decode_direct_link(&answer_link)?;
|
|
offerer
|
|
.set_remote_description(answer_code.desc.clone())
|
|
.await
|
|
.map_err(|e| anyhow!("{e:#}"))?;
|
|
offerer
|
|
.add_ice_candidates(answer_code.candidates.clone())
|
|
.await
|
|
.map_err(|e| anyhow!("{e:#}"))?;
|
|
|
|
// Wait for both peers to report a full connection before waiting for the data channel.
|
|
wait_connected(&offerer).await?;
|
|
wait_connected(&answerer).await?;
|
|
|
|
let offerer_ch = offerer
|
|
.receive_channel()
|
|
.await
|
|
.map_err(|e| anyhow!("{e:#}"))?;
|
|
let answerer_ch = answerer
|
|
.receive_channel()
|
|
.await
|
|
.map_err(|e| anyhow!("{e:#}"))?;
|
|
offerer_ch.wait_ready().await;
|
|
answerer_ch.wait_ready().await;
|
|
|
|
let payload = Bytes::from_static(b"hello");
|
|
offerer_ch
|
|
.send(&payload)
|
|
.await
|
|
.map_err(|e| anyhow!("{e:#}"))?;
|
|
let got = tokio::time::timeout(std::time::Duration::from_secs(10), answerer_ch.receive())
|
|
.await
|
|
.map_err(|_| anyhow!("timed out waiting for receive"))?
|
|
.map_err(|e| anyhow!("{e:#}"))?;
|
|
assert_eq!(&got[..], b"hello");
|
|
|
|
// Confirm the reverse direction works too (this also guards against one-way readiness bugs).
|
|
answerer_ch
|
|
.send(&Bytes::from_static(b"world"))
|
|
.await
|
|
.map_err(|e| anyhow!("{e:#}"))?;
|
|
let got = tokio::time::timeout(std::time::Duration::from_secs(10), offerer_ch.receive())
|
|
.await
|
|
.map_err(|_| anyhow!("timed out waiting for receive"))?
|
|
.map_err(|e| anyhow!("{e:#}"))?;
|
|
assert_eq!(&got[..], b"world");
|
|
Ok(())
|
|
}
|