From 41a63e3befeeaff2a74a84c3e26421b47a470b33 Mon Sep 17 00:00:00 2001 From: sleepycatcoding <131554884+sleepycatcoding@users.noreply.github.com> Date: Sat, 15 Jul 2023 17:51:26 +0300 Subject: [PATCH] desktop: make socket connections async This also fixes an unrelated bug where null bytes were added to data, this broke data parsing. --- Cargo.lock | 42 +++++++++ desktop/Cargo.toml | 3 + desktop/src/backends/navigator.rs | 149 ++++++++++++++++++------------ 3 files changed, 135 insertions(+), 59 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a7d481932..4710ebda2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -306,6 +306,24 @@ dependencies = [ "event-listener", ] +[[package]] +name = "async-net" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4051e67316bc7eff608fe723df5d32ed639946adcd69e07df41fd42a7b411f1f" +dependencies = [ + "async-io", + "autocfg", + "blocking", + "futures-lite", +] + +[[package]] +name = "async-task" +version = "4.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae" + [[package]] name = "atk-sys" version = "0.16.0" @@ -318,6 +336,12 @@ dependencies = [ "system-deps", ] +[[package]] +name = "atomic-waker" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1181e1e0d1fce796a03db1ae795d67167da795f9cf4a39c37589e85ef57f26d3" + [[package]] name = "atomic_refcell" version = "0.1.10" @@ -444,6 +468,21 @@ dependencies = [ "objc2-encode", ] +[[package]] +name = "blocking" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77231a1c8f801696fc0123ec6150ce92cffb8e164a02afb9c8ddee0e9b65ad65" +dependencies = [ + "async-channel", + "async-lock", + "async-task", + "atomic-waker", + "fastrand", + "futures-lite", + "log", +] + [[package]] name = "bstr" version = "1.6.0" @@ -3850,6 +3889,8 @@ version = "0.1.0" dependencies = [ "anyhow", "arboard", + "async-io", + "async-net", "bytemuck", "chrono", "clap", @@ -3862,6 +3903,7 @@ dependencies = [ "fluent-templates", "fontdb", "futures", + "futures-lite", "generational-arena", "isahc", "os_info", diff --git a/desktop/Cargo.toml b/desktop/Cargo.toml index 3d8bbe5a0..2e6101804 100644 --- a/desktop/Cargo.toml +++ b/desktop/Cargo.toml @@ -37,6 +37,9 @@ wgpu = { workspace = true } futures = "0.3.28" chrono = { version = "0.4", default-features = false, features = [] } fluent-templates = "0.8.0" +futures-lite = "1.13.0" +async-io = "1.13.0" +async-net = "1.7.0" # Deliberately held back to match tracy client used by profiling crate tracing-tracy = { version = "=0.10.2", optional = true } diff --git a/desktop/src/backends/navigator.rs b/desktop/src/backends/navigator.rs index c9f559358..16a88a905 100644 --- a/desktop/src/backends/navigator.rs +++ b/desktop/src/backends/navigator.rs @@ -1,6 +1,10 @@ //! Navigator backend for web use crate::custom_event::RuffleEvent; +use async_io::Timer; +use async_net::TcpStream; +use futures::{AsyncReadExt, AsyncWriteExt}; +use futures_lite::FutureExt; use isahc::http::{HeaderName, HeaderValue}; use isahc::{ config::RedirectPolicy, prelude::*, AsyncReadResponseExt, HttpClient, Request as IsahcRequest, @@ -12,13 +16,14 @@ use ruffle_core::backend::navigator::{ }; use ruffle_core::indexmap::IndexMap; use ruffle_core::loader::Error; -use ruffle_core::socket::SocketConnection; -use std::collections::VecDeque; -use std::io::{ErrorKind, Read, Write}; -use std::net::TcpStream; +use ruffle_core::socket::{SocketAction, SocketHandle}; +use std::io; +use std::io::ErrorKind; use std::rc::Rc; use std::str::FromStr; -use std::sync::mpsc::Sender; +use std::sync::mpsc::{Receiver, Sender, TryRecvError}; +use std::time::Duration; +use tracing::warn; use url::{ParseError, Url}; use winit::event_loop::EventLoopProxy; @@ -312,73 +317,99 @@ impl NavigatorBackend for ExternalNavigatorBackend { url } - fn connect_socket(&mut self, host: &str, port: u16) -> Option> { + fn connect_socket( + &mut self, + host: String, + port: u16, + handle: SocketHandle, + receiver: Receiver>, + sender: Sender, + ) { // FIXME: Add connection permissions - Some(Box::new(TcpSocket::connect(host, port))) - } -} + let future = Box::pin(async move { + let host2 = host.clone(); -struct TcpSocket { - stream: Option, - pending_write: Vec, - pending_read: VecDeque, -} + let mut pending_write = vec![]; -impl TcpSocket { - fn connect(host: &str, port: u16) -> Self { - // FIXME: make connect asynchronous - Self { - stream: TcpStream::connect((host, port)).ok().and_then(|socket| { - if socket.set_nonblocking(true).is_ok() { - Some(socket) - } else { - None + let mut stream = match TcpStream::connect((host, port)).await { + Ok(stream) => { + sender + .send(SocketAction::Connect(handle, true)) + .expect("working channel send"); + + stream } - }), - pending_read: Default::default(), - pending_write: Default::default(), - } - } -} + Err(err) => { + warn!("Failed to connect to {}:{}, error: {}", host2, port, err); + sender + .send(SocketAction::Connect(handle, false)) + .expect("working channel send"); + return Ok(()); + } + }; -impl SocketConnection for TcpSocket { - fn is_connected(&self) -> Option { - Some(self.stream.is_some()) - } + loop { + loop { + match receiver.try_recv() { + Ok(val) => { + pending_write.extend(val); + } + Err(TryRecvError::Disconnected) => { + //NOTE: Channel sender has been dropped. + // This means we have to close the connection. + drop(stream); + return Ok(()); + } + Err(_) => break, + } + } - fn send(&mut self, buf: Vec) { - if self.stream.is_some() { - self.pending_write.extend(buf) - } - } + let mut buffer = [0; 4096]; - fn poll(&mut self) -> Option> { - if let Some(stream) = &mut self.stream { - if !self.pending_write.is_empty() { - match stream.write(&self.pending_write) { - Err(e) if e.kind() == ErrorKind::WouldBlock => {} // just try later + match stream + .read(&mut buffer) + .or(async { + Timer::after(Duration::from_millis(50)).await; + Result::::Err(io::Error::new(ErrorKind::TimedOut, "")) + }) + .await + { + Err(e) if e.kind() == ErrorKind::TimedOut => {} // try again later. Err(_) | Ok(0) => { - self.stream = None; - return None; + sender + .send(SocketAction::Close(handle)) + .expect("working channel send"); + drop(stream); + break; } - Ok(written) => { - let _ = self.pending_write.drain(..written); + Ok(read) => { + let buffer = buffer.into_iter().take(read).collect::>(); + + sender + .send(SocketAction::Data(handle, buffer)) + .expect("working channel send"); + } + }; + + if !pending_write.is_empty() { + match stream.write(&pending_write).await { + Err(_) | Ok(0) => { + sender + .send(SocketAction::Close(handle)) + .expect("working channel send"); + drop(stream); + break; + } + Ok(written) => { + let _ = pending_write.drain(..written); + } } } } - let mut buffer = [0; 4096]; + Ok(()) + }); - match stream.read(&mut buffer) { - Err(e) if e.kind() == ErrorKind::WouldBlock => return None, // just try later - Err(_) | Ok(0) => { - self.stream = None; - return None; - } - Ok(_read) => return Some(buffer.to_vec()), - } - } else { - None - } + self.spawn_future(future); } }