core: Use async channels in NavigatorBackend

This has several advantages:
1. it allows using async variants of send and recv,
2. it adds consistency as until now Receiver was async,
   and Sender was not.
This commit is contained in:
Kamil Jarosz 2024-02-27 19:03:22 +01:00 committed by Nathan Adams
parent 3e99dd3999
commit cc8b0aa5dd
6 changed files with 36 additions and 41 deletions

View File

@ -3,7 +3,7 @@
use crate::loader::Error; use crate::loader::Error;
use crate::socket::{ConnectionState, SocketAction, SocketHandle}; use crate::socket::{ConnectionState, SocketAction, SocketHandle};
use crate::string::WStr; use crate::string::WStr;
use async_channel::Receiver; use async_channel::{Receiver, Sender};
use indexmap::IndexMap; use indexmap::IndexMap;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::borrow::Cow; use std::borrow::Cow;
@ -14,7 +14,6 @@ use std::future::Future;
use std::io::Read; use std::io::Read;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::pin::Pin; use std::pin::Pin;
use std::sync::mpsc::Sender;
use std::time::Duration; use std::time::Duration;
use swf::avm1::types::SendVarsMethod; use swf::avm1::types::SendVarsMethod;
use url::{ParseError, Url}; use url::{ParseError, Url};
@ -455,7 +454,7 @@ impl NavigatorBackend for NullNavigatorBackend {
sender: Sender<SocketAction>, sender: Sender<SocketAction>,
) { ) {
sender sender
.send(SocketAction::Connect(handle, ConnectionState::Failed)) .try_send(SocketAction::Connect(handle, ConnectionState::Failed))
.expect("working channel send"); .expect("working channel send");
} }
} }

View File

@ -11,12 +11,11 @@ use crate::{
context::UpdateContext, context::UpdateContext,
string::AvmString, string::AvmString,
}; };
use async_channel::{unbounded, Sender as AsyncSender}; use async_channel::{unbounded, Receiver, Sender as AsyncSender, Sender};
use gc_arena::Collect; use gc_arena::Collect;
use generational_arena::{Arena, Index}; use generational_arena::{Arena, Index};
use std::{ use std::{
cell::{Cell, RefCell}, cell::{Cell, RefCell},
sync::mpsc::{channel, Receiver, Sender},
time::Duration, time::Duration,
}; };
@ -79,7 +78,7 @@ unsafe impl<'gc> Collect for Sockets<'gc> {
impl<'gc> Sockets<'gc> { impl<'gc> Sockets<'gc> {
pub fn empty() -> Self { pub fn empty() -> Self {
let (sender, receiver) = channel(); let (sender, receiver) = unbounded();
Self { Self {
sockets: Arena::new(), sockets: Arena::new(),

View File

@ -1,7 +1,6 @@
//! Navigator backend for web //! Navigator backend for web
use crate::custom_event::RuffleEvent; use async_channel::{Receiver, Sender, TryRecvError};
use async_channel::{Receiver, TryRecvError};
use async_io::Timer; use async_io::Timer;
use async_net::TcpStream; use async_net::TcpStream;
use futures::future::select; use futures::future::select;
@ -26,12 +25,12 @@ use std::io::ErrorKind;
use std::io::{self, Read}; use std::io::{self, Read};
use std::rc::Rc; use std::rc::Rc;
use std::str::FromStr; use std::str::FromStr;
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
use tracing::warn; use tracing::warn;
use url::{ParseError, Url}; use url::{ParseError, Url};
use winit::event_loop::EventLoopProxy; use winit::event_loop::EventLoopProxy;
use crate::custom_event::RuffleEvent;
/// Implementation of `NavigatorBackend` for non-web environments that can call /// Implementation of `NavigatorBackend` for non-web environments that can call
/// out to a web browser. /// out to a web browser.
@ -329,7 +328,7 @@ impl NavigatorBackend for ExternalNavigatorBackend {
"Unable to create path out of URL", "Unable to create path out of URL",
response_url.as_str(), response_url.as_str(),
"", "",
) );
} }
}; };
@ -362,7 +361,7 @@ impl NavigatorBackend for ExternalNavigatorBackend {
"Can't open file", "Can't open file",
response_url.as_str(), response_url.as_str(),
e, e,
) );
} }
}; };
@ -463,7 +462,7 @@ impl NavigatorBackend for ExternalNavigatorBackend {
} }
fn spawn_future(&mut self, future: OwnedFuture<(), Error>) { fn spawn_future(&mut self, future: OwnedFuture<(), Error>) {
self.channel.send(future).expect("working channel send"); self.channel.try_send(future).expect("working channel send");
if self.event_loop.send_event(RuffleEvent::TaskPoll).is_err() { if self.event_loop.send_event(RuffleEvent::TaskPoll).is_err() {
tracing::warn!( tracing::warn!(
@ -498,7 +497,7 @@ impl NavigatorBackend for ExternalNavigatorBackend {
(false, SocketMode::Deny) => { (false, SocketMode::Deny) => {
// Just fail the connection. // Just fail the connection.
sender sender
.send(SocketAction::Connect(handle, ConnectionState::Failed)) .try_send(SocketAction::Connect(handle, ConnectionState::Failed))
.expect("working channel send"); .expect("working channel send");
tracing::warn!( tracing::warn!(
@ -509,13 +508,13 @@ impl NavigatorBackend for ExternalNavigatorBackend {
} }
(false, SocketMode::Ask) => { (false, SocketMode::Ask) => {
let attempt_sandbox_connect = AsyncMessageDialog::new().set_level(MessageLevel::Warning).set_description(format!("The current movie is attempting to connect to {:?} (port {}).\n\nTo allow it to do so, click Yes to grant network access to that host.\n\nOtherwise, click No to deny access.", host, port)).set_buttons(MessageButtons::YesNo) let attempt_sandbox_connect = AsyncMessageDialog::new().set_level(MessageLevel::Warning).set_description(format!("The current movie is attempting to connect to {:?} (port {}).\n\nTo allow it to do so, click Yes to grant network access to that host.\n\nOtherwise, click No to deny access.", host, port)).set_buttons(MessageButtons::YesNo)
.show() .show()
.await == MessageDialogResult::Yes; .await == MessageDialogResult::Yes;
if !attempt_sandbox_connect { if !attempt_sandbox_connect {
// fail the connection. // fail the connection.
sender sender
.send(SocketAction::Connect(handle, ConnectionState::Failed)) .try_send(SocketAction::Connect(handle, ConnectionState::Failed))
.expect("working channel send"); .expect("working channel send");
return Ok(()); return Ok(());
@ -534,13 +533,13 @@ impl NavigatorBackend for ExternalNavigatorBackend {
Err(e) if e.kind() == ErrorKind::TimedOut => { Err(e) if e.kind() == ErrorKind::TimedOut => {
warn!("Connection to {}:{} timed out", host2, port); warn!("Connection to {}:{} timed out", host2, port);
sender sender
.send(SocketAction::Connect(handle, ConnectionState::TimedOut)) .try_send(SocketAction::Connect(handle, ConnectionState::TimedOut))
.expect("working channel send"); .expect("working channel send");
return Ok(()); return Ok(());
} }
Ok(stream) => { Ok(stream) => {
sender sender
.send(SocketAction::Connect(handle, ConnectionState::Connected)) .try_send(SocketAction::Connect(handle, ConnectionState::Connected))
.expect("working channel send"); .expect("working channel send");
stream stream
@ -548,7 +547,7 @@ impl NavigatorBackend for ExternalNavigatorBackend {
Err(err) => { Err(err) => {
warn!("Failed to connect to {}:{}, error: {}", host2, port, err); warn!("Failed to connect to {}:{}, error: {}", host2, port, err);
sender sender
.send(SocketAction::Connect(handle, ConnectionState::Failed)) .try_send(SocketAction::Connect(handle, ConnectionState::Failed))
.expect("working channel send"); .expect("working channel send");
return Ok(()); return Ok(());
} }
@ -567,7 +566,7 @@ impl NavigatorBackend for ExternalNavigatorBackend {
Err(e) if e.kind() == ErrorKind::TimedOut => {} // try again later. Err(e) if e.kind() == ErrorKind::TimedOut => {} // try again later.
Err(_) | Ok(0) => { Err(_) | Ok(0) => {
sender sender
.send(SocketAction::Close(handle)) .try_send(SocketAction::Close(handle))
.expect("working channel send"); .expect("working channel send");
drop(read); drop(read);
break; break;
@ -576,7 +575,7 @@ impl NavigatorBackend for ExternalNavigatorBackend {
let buffer = buffer.into_iter().take(read).collect::<Vec<_>>(); let buffer = buffer.into_iter().take(read).collect::<Vec<_>>();
sender sender
.send(SocketAction::Data(handle, buffer)) .try_send(SocketAction::Data(handle, buffer))
.expect("working channel send"); .expect("working channel send");
} }
}; };
@ -607,7 +606,7 @@ impl NavigatorBackend for ExternalNavigatorBackend {
Err(e) if e.kind() == ErrorKind::TimedOut => {} // try again later. Err(e) if e.kind() == ErrorKind::TimedOut => {} // try again later.
Err(_) => { Err(_) => {
sender2 sender2
.send(SocketAction::Close(handle)) .try_send(SocketAction::Close(handle))
.expect("working channel send"); .expect("working channel send");
drop(write); drop(write);
return; return;

View File

@ -2,10 +2,10 @@
use crate::custom_event::RuffleEvent; use crate::custom_event::RuffleEvent;
use crate::task::Task; use crate::task::Task;
use async_channel::{unbounded, Receiver, Sender};
use generational_arena::{Arena, Index}; use generational_arena::{Arena, Index};
use ruffle_core::backend::navigator::OwnedFuture; use ruffle_core::backend::navigator::OwnedFuture;
use ruffle_core::loader::Error; use ruffle_core::loader::Error;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex, Weak}; use std::sync::{Arc, Mutex, Weak};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use winit::event_loop::EventLoopProxy; use winit::event_loop::EventLoopProxy;
@ -151,7 +151,7 @@ impl WinitAsyncExecutor {
pub fn new( pub fn new(
event_loop: EventLoopProxy<RuffleEvent>, event_loop: EventLoopProxy<RuffleEvent>,
) -> (Arc<Mutex<Self>>, Sender<OwnedFuture<(), Error>>) { ) -> (Arc<Mutex<Self>>, Sender<OwnedFuture<(), Error>>) {
let (send, recv) = channel(); let (send, recv) = unbounded();
let new_self = Arc::new_cyclic(|self_ref| { let new_self = Arc::new_cyclic(|self_ref| {
Mutex::new(Self { Mutex::new(Self {
task_queue: Arena::new(), task_queue: Arena::new(),

View File

@ -1,6 +1,6 @@
use crate::backends::TestLogBackend; use crate::backends::TestLogBackend;
use crate::util::read_bytes; use crate::util::read_bytes;
use async_channel::Receiver; use async_channel::{Receiver, Sender};
use percent_encoding::percent_decode_str; use percent_encoding::percent_decode_str;
use ruffle_core::backend::log::LogBackend; use ruffle_core::backend::log::LogBackend;
use ruffle_core::backend::navigator::{ use ruffle_core::backend::navigator::{
@ -12,7 +12,6 @@ use ruffle_core::loader::Error;
use ruffle_core::socket::{ConnectionState, SocketAction, SocketHandle}; use ruffle_core::socket::{ConnectionState, SocketAction, SocketHandle};
use ruffle_socket_format::SocketEvent; use ruffle_socket_format::SocketEvent;
use std::borrow::Cow; use std::borrow::Cow;
use std::sync::mpsc::Sender;
use std::time::Duration; use std::time::Duration;
use url::{ParseError, Url}; use url::{ParseError, Url};
use vfs::VfsPath; use vfs::VfsPath;
@ -282,22 +281,22 @@ impl NavigatorBackend for TestNavigatorBackend {
if let Some(events) = self.socket_events.clone() { if let Some(events) = self.socket_events.clone() {
self.spawn_future(Box::pin(async move { self.spawn_future(Box::pin(async move {
sender sender
.send(SocketAction::Connect(handle, ConnectionState::Connected)) .try_send(SocketAction::Connect(handle, ConnectionState::Connected))
.expect("working channel send"); .expect("working channel send");
for event in events { for event in events {
match event { match event {
SocketEvent::Disconnect => { SocketEvent::Disconnect => {
sender sender
.send(SocketAction::Close(handle)) .try_send(SocketAction::Close(handle))
.expect("working channel send"); .expect("working channel send");
}, }
SocketEvent::WaitForDisconnect => { SocketEvent::WaitForDisconnect => {
match receiver.recv().await { match receiver.recv().await {
Err(_) => break, Err(_) => break,
Ok(_) => panic!("Expected client to disconnect, data was sent instead"), Ok(_) => panic!("Expected client to disconnect, data was sent instead"),
} }
}, }
SocketEvent::Receive { expected } => { SocketEvent::Receive { expected } => {
match receiver.recv().await { match receiver.recv().await {
Ok(val) => { Ok(val) => {
@ -307,9 +306,9 @@ impl NavigatorBackend for TestNavigatorBackend {
} }
Err(_) => panic!("Expected client to send data, but connection was closed instead"), Err(_) => panic!("Expected client to send data, but connection was closed instead"),
} }
}, }
SocketEvent::Send { payload } => { SocketEvent::Send { payload } => {
sender.send(SocketAction::Data(handle, payload)).expect("working channel send"); sender.try_send(SocketAction::Data(handle, payload)).expect("working channel send");
} }
} }
} }

View File

@ -1,6 +1,6 @@
//! Navigator backend for web //! Navigator backend for web
use crate::SocketProxy; use crate::SocketProxy;
use async_channel::Receiver; use async_channel::{Receiver, Sender};
use futures_util::{SinkExt, StreamExt}; use futures_util::{SinkExt, StreamExt};
use gloo_net::websocket::{futures::WebSocket, Message}; use gloo_net::websocket::{futures::WebSocket, Message};
use js_sys::{Array, Uint8Array}; use js_sys::{Array, Uint8Array};
@ -15,7 +15,6 @@ use ruffle_core::socket::{ConnectionState, SocketAction, SocketHandle};
use std::borrow::Cow; use std::borrow::Cow;
use std::cell::RefCell; use std::cell::RefCell;
use std::rc::Rc; use std::rc::Rc;
use std::sync::mpsc::Sender;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tracing_subscriber::layer::Layered; use tracing_subscriber::layer::Layered;
@ -387,7 +386,7 @@ impl NavigatorBackend for WebNavigatorBackend {
else { else {
tracing::warn!("Missing WebSocket proxy for host {}, port {}", host, port); tracing::warn!("Missing WebSocket proxy for host {}, port {}", host, port);
sender sender
.send(SocketAction::Connect(handle, ConnectionState::Failed)) .try_send(SocketAction::Connect(handle, ConnectionState::Failed))
.expect("working channel send"); .expect("working channel send");
return; return;
}; };
@ -399,7 +398,7 @@ impl NavigatorBackend for WebNavigatorBackend {
Err(e) => { Err(e) => {
tracing::error!("Failed to create WebSocket, reason {:?}", e); tracing::error!("Failed to create WebSocket, reason {:?}", e);
sender sender
.send(SocketAction::Connect(handle, ConnectionState::Failed)) .try_send(SocketAction::Connect(handle, ConnectionState::Failed))
.expect("working channel send"); .expect("working channel send");
return; return;
} }
@ -407,7 +406,7 @@ impl NavigatorBackend for WebNavigatorBackend {
let (mut sink, mut stream) = ws.split(); let (mut sink, mut stream) = ws.split();
sender sender
.send(SocketAction::Connect(handle, ConnectionState::Connected)) .try_send(SocketAction::Connect(handle, ConnectionState::Connected))
.expect("working channel send"); .expect("working channel send");
// Spawn future to handle incoming messages. // Spawn future to handle incoming messages.
@ -416,12 +415,12 @@ impl NavigatorBackend for WebNavigatorBackend {
while let Some(msg) = stream.next().await { while let Some(msg) = stream.next().await {
match msg { match msg {
Ok(Message::Bytes(buf)) => stream_sender Ok(Message::Bytes(buf)) => stream_sender
.send(SocketAction::Data(handle, buf)) .try_send(SocketAction::Data(handle, buf))
.expect("working channel send"), .expect("working channel send"),
Ok(_) => tracing::warn!("Server sent unexpected text message"), Ok(_) => tracing::warn!("Server sent unexpected text message"),
Err(_) => { Err(_) => {
stream_sender stream_sender
.send(SocketAction::Close(handle)) .try_send(SocketAction::Close(handle))
.expect("working channel send"); .expect("working channel send");
return Ok(()); return Ok(());
} }
@ -437,7 +436,7 @@ impl NavigatorBackend for WebNavigatorBackend {
if let Err(e) = sink.send(Message::Bytes(msg)).await { if let Err(e) = sink.send(Message::Bytes(msg)).await {
tracing::warn!("Failed to send message to WebSocket {}", e); tracing::warn!("Failed to send message to WebSocket {}", e);
sender sender
.send(SocketAction::Close(handle)) .try_send(SocketAction::Close(handle))
.expect("working channel send"); .expect("working channel send");
} }
} }