desktop: make socket connections async

This also fixes an unrelated bug where null bytes were added to data, this
broke data parsing.
This commit is contained in:
sleepycatcoding 2023-07-15 17:51:26 +03:00 committed by Nathan Adams
parent 5080c19d11
commit 41a63e3bef
3 changed files with 135 additions and 59 deletions

42
Cargo.lock generated
View File

@ -306,6 +306,24 @@ dependencies = [
"event-listener",
]
[[package]]
name = "async-net"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4051e67316bc7eff608fe723df5d32ed639946adcd69e07df41fd42a7b411f1f"
dependencies = [
"async-io",
"autocfg",
"blocking",
"futures-lite",
]
[[package]]
name = "async-task"
version = "4.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ecc7ab41815b3c653ccd2978ec3255c81349336702dfdf62ee6f7069b12a3aae"
[[package]]
name = "atk-sys"
version = "0.16.0"
@ -318,6 +336,12 @@ dependencies = [
"system-deps",
]
[[package]]
name = "atomic-waker"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1181e1e0d1fce796a03db1ae795d67167da795f9cf4a39c37589e85ef57f26d3"
[[package]]
name = "atomic_refcell"
version = "0.1.10"
@ -444,6 +468,21 @@ dependencies = [
"objc2-encode",
]
[[package]]
name = "blocking"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77231a1c8f801696fc0123ec6150ce92cffb8e164a02afb9c8ddee0e9b65ad65"
dependencies = [
"async-channel",
"async-lock",
"async-task",
"atomic-waker",
"fastrand",
"futures-lite",
"log",
]
[[package]]
name = "bstr"
version = "1.6.0"
@ -3850,6 +3889,8 @@ version = "0.1.0"
dependencies = [
"anyhow",
"arboard",
"async-io",
"async-net",
"bytemuck",
"chrono",
"clap",
@ -3862,6 +3903,7 @@ dependencies = [
"fluent-templates",
"fontdb",
"futures",
"futures-lite",
"generational-arena",
"isahc",
"os_info",

View File

@ -37,6 +37,9 @@ wgpu = { workspace = true }
futures = "0.3.28"
chrono = { version = "0.4", default-features = false, features = [] }
fluent-templates = "0.8.0"
futures-lite = "1.13.0"
async-io = "1.13.0"
async-net = "1.7.0"
# Deliberately held back to match tracy client used by profiling crate
tracing-tracy = { version = "=0.10.2", optional = true }

View File

@ -1,6 +1,10 @@
//! Navigator backend for web
use crate::custom_event::RuffleEvent;
use async_io::Timer;
use async_net::TcpStream;
use futures::{AsyncReadExt, AsyncWriteExt};
use futures_lite::FutureExt;
use isahc::http::{HeaderName, HeaderValue};
use isahc::{
config::RedirectPolicy, prelude::*, AsyncReadResponseExt, HttpClient, Request as IsahcRequest,
@ -12,13 +16,14 @@ use ruffle_core::backend::navigator::{
};
use ruffle_core::indexmap::IndexMap;
use ruffle_core::loader::Error;
use ruffle_core::socket::SocketConnection;
use std::collections::VecDeque;
use std::io::{ErrorKind, Read, Write};
use std::net::TcpStream;
use ruffle_core::socket::{SocketAction, SocketHandle};
use std::io;
use std::io::ErrorKind;
use std::rc::Rc;
use std::str::FromStr;
use std::sync::mpsc::Sender;
use std::sync::mpsc::{Receiver, Sender, TryRecvError};
use std::time::Duration;
use tracing::warn;
use url::{ParseError, Url};
use winit::event_loop::EventLoopProxy;
@ -312,73 +317,99 @@ impl NavigatorBackend for ExternalNavigatorBackend {
url
}
fn connect_socket(&mut self, host: &str, port: u16) -> Option<Box<dyn SocketConnection>> {
fn connect_socket(
&mut self,
host: String,
port: u16,
handle: SocketHandle,
receiver: Receiver<Vec<u8>>,
sender: Sender<SocketAction>,
) {
// FIXME: Add connection permissions
Some(Box::new(TcpSocket::connect(host, port)))
}
}
let future = Box::pin(async move {
let host2 = host.clone();
struct TcpSocket {
stream: Option<TcpStream>,
pending_write: Vec<u8>,
pending_read: VecDeque<u8>,
}
let mut pending_write = vec![];
impl TcpSocket {
fn connect(host: &str, port: u16) -> Self {
// FIXME: make connect asynchronous
Self {
stream: TcpStream::connect((host, port)).ok().and_then(|socket| {
if socket.set_nonblocking(true).is_ok() {
Some(socket)
} else {
None
let mut stream = match TcpStream::connect((host, port)).await {
Ok(stream) => {
sender
.send(SocketAction::Connect(handle, true))
.expect("working channel send");
stream
}
}),
pending_read: Default::default(),
pending_write: Default::default(),
}
}
}
Err(err) => {
warn!("Failed to connect to {}:{}, error: {}", host2, port, err);
sender
.send(SocketAction::Connect(handle, false))
.expect("working channel send");
return Ok(());
}
};
impl SocketConnection for TcpSocket {
fn is_connected(&self) -> Option<bool> {
Some(self.stream.is_some())
}
loop {
loop {
match receiver.try_recv() {
Ok(val) => {
pending_write.extend(val);
}
Err(TryRecvError::Disconnected) => {
//NOTE: Channel sender has been dropped.
// This means we have to close the connection.
drop(stream);
return Ok(());
}
Err(_) => break,
}
}
fn send(&mut self, buf: Vec<u8>) {
if self.stream.is_some() {
self.pending_write.extend(buf)
}
}
let mut buffer = [0; 4096];
fn poll(&mut self) -> Option<Vec<u8>> {
if let Some(stream) = &mut self.stream {
if !self.pending_write.is_empty() {
match stream.write(&self.pending_write) {
Err(e) if e.kind() == ErrorKind::WouldBlock => {} // just try later
match stream
.read(&mut buffer)
.or(async {
Timer::after(Duration::from_millis(50)).await;
Result::<usize, io::Error>::Err(io::Error::new(ErrorKind::TimedOut, ""))
})
.await
{
Err(e) if e.kind() == ErrorKind::TimedOut => {} // try again later.
Err(_) | Ok(0) => {
self.stream = None;
return None;
sender
.send(SocketAction::Close(handle))
.expect("working channel send");
drop(stream);
break;
}
Ok(written) => {
let _ = self.pending_write.drain(..written);
Ok(read) => {
let buffer = buffer.into_iter().take(read).collect::<Vec<_>>();
sender
.send(SocketAction::Data(handle, buffer))
.expect("working channel send");
}
};
if !pending_write.is_empty() {
match stream.write(&pending_write).await {
Err(_) | Ok(0) => {
sender
.send(SocketAction::Close(handle))
.expect("working channel send");
drop(stream);
break;
}
Ok(written) => {
let _ = pending_write.drain(..written);
}
}
}
}
let mut buffer = [0; 4096];
Ok(())
});
match stream.read(&mut buffer) {
Err(e) if e.kind() == ErrorKind::WouldBlock => return None, // just try later
Err(_) | Ok(0) => {
self.stream = None;
return None;
}
Ok(_read) => return Some(buffer.to_vec()),
}
} else {
None
}
self.spawn_future(future);
}
}