frontend-utils: Replace async-net's `TcpStream` with tokio's

This commit is contained in:
sleepycatcoding 2024-06-20 00:22:12 +03:00
parent 9271618185
commit 9979631f8c
3 changed files with 22 additions and 25 deletions

2
Cargo.lock generated
View File

@ -4431,8 +4431,6 @@ version = "0.1.0"
dependencies = [
"async-channel",
"async-io",
"async-net",
"futures",
"futures-lite",
"macro_rules_attribute",
"reqwest",

View File

@ -21,13 +21,11 @@ ruffle_core = { path = "../core", default-features = false }
ruffle_render = { path = "../render", default-features = false }
async-channel = { workspace = true }
slotmap = { workspace = true }
futures = { workspace = true }
async-io = "2.3.2"
async-net = "2.0.0"
futures-lite = "2.3.0"
webbrowser = "1.0.1"
reqwest = { version = "0.12.5", default-features = false, features = ["rustls-tls", "cookies", "charset", "http2", "macos-system-configuration"] }
tokio = { workspace = true }
tokio = { workspace = true, features = ["net"] }
[dev-dependencies]
tempfile = "3"

View File

@ -5,9 +5,6 @@ use crate::backends::navigator::fetch::{Response, ResponseBody};
use crate::content::PlayingContent;
use async_channel::{Receiver, Sender, TryRecvError};
use async_io::Timer;
use async_net::TcpStream;
use futures::future::select;
use futures::{AsyncReadExt, AsyncWriteExt};
use futures_lite::FutureExt;
use reqwest::{cookie, header, Proxy};
use ruffle_core::backend::navigator::{
@ -25,10 +22,12 @@ use std::path::Path;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tracing::warn;
use url::{ParseError, Url};
pub trait NavigatorInterface: Clone + 'static {
pub trait NavigatorInterface: Clone + Send + 'static {
fn confirm_website_navigation(&self, url: &Url) -> bool;
fn open_file(&self, path: &Path) -> io::Result<File>;
@ -340,7 +339,7 @@ impl<F: FutureSpawner, I: NavigatorInterface> NavigatorBackend for ExternalNavig
"SWF tried to open a socket, but opening a socket is not allowed"
);
return Ok(());
return;
}
(false, SocketMode::Ask) => {
let attempt_sandbox_connect = interface.confirm_socket(&host, port).await;
@ -351,7 +350,7 @@ impl<F: FutureSpawner, I: NavigatorInterface> NavigatorBackend for ExternalNavig
.try_send(SocketAction::Connect(handle, ConnectionState::Failed))
.expect("working channel send");
return Ok(());
return;
}
}
}
@ -363,13 +362,13 @@ impl<F: FutureSpawner, I: NavigatorInterface> NavigatorBackend for ExternalNavig
Result::<TcpStream, io::Error>::Err(io::Error::new(ErrorKind::TimedOut, ""))
};
let stream = match TcpStream::connect((host, port)).or(timeout).await {
let mut stream = match TcpStream::connect((host, port)).or(timeout).await {
Err(e) if e.kind() == ErrorKind::TimedOut => {
warn!("Connection to {}:{} timed out", host2, port);
sender
.try_send(SocketAction::Connect(handle, ConnectionState::TimedOut))
.expect("working channel send");
return Ok(());
return;
}
Ok(stream) => {
sender
@ -383,7 +382,7 @@ impl<F: FutureSpawner, I: NavigatorInterface> NavigatorBackend for ExternalNavig
sender
.try_send(SocketAction::Connect(handle, ConnectionState::Failed))
.expect("working channel send");
return Ok(());
return;
}
};
@ -392,7 +391,7 @@ impl<F: FutureSpawner, I: NavigatorInterface> NavigatorBackend for ExternalNavig
let sender2 = sender.clone();
let (mut read, mut write) = stream.split();
let read = std::pin::pin!(async move {
let read = async move {
loop {
let mut buffer = [0; 4096];
@ -402,7 +401,6 @@ impl<F: FutureSpawner, I: NavigatorInterface> NavigatorBackend for ExternalNavig
sender
.try_send(SocketAction::Close(handle))
.expect("working channel send");
drop(read);
break;
}
Ok(read) => {
@ -414,9 +412,9 @@ impl<F: FutureSpawner, I: NavigatorInterface> NavigatorBackend for ExternalNavig
}
};
}
});
};
let write = std::pin::pin!(async move {
let write = async move {
let mut pending_write = vec![];
loop {
@ -442,7 +440,6 @@ impl<F: FutureSpawner, I: NavigatorInterface> NavigatorBackend for ExternalNavig
sender2
.try_send(SocketAction::Close(handle))
.expect("working channel send");
drop(write);
return;
}
Ok(written) => {
@ -450,7 +447,6 @@ impl<F: FutureSpawner, I: NavigatorInterface> NavigatorBackend for ExternalNavig
}
}
} else if close_connection {
drop(write);
return;
} else {
// Receiver is empty and there's no pending data,
@ -466,25 +462,30 @@ impl<F: FutureSpawner, I: NavigatorInterface> NavigatorBackend for ExternalNavig
}
}
}
});
};
//NOTE: If one future exits, this will take the other one down too.
select(read, write).await;
tokio::select! {
_ = read => {},
_ = write => {},
};
Ok(())
if let Err(e) = stream.shutdown().await {
tracing::warn!("Failed to shutdown write half of TcpStream: {e}");
}
});
self.spawn_future(future);
tokio::spawn(future);
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use async_net::TcpListener;
use ruffle_core::socket::SocketAction::{Close, Connect, Data};
use std::net::SocketAddr;
use std::str::FromStr;
use tokio::net::TcpListener;
use tokio::task;
use super::*;