web: Close sockets properly

This patch ensures that sockets are closed properly.
Without it, when the application-end of the socket was closed,
the other end was left intact, and the socket was not closed.
This commit is contained in:
Kamil Jarosz 2024-02-25 23:46:27 +01:00 committed by Nathan Adams
parent 6e84b05b66
commit bb79e8ad5d
1 changed files with 34 additions and 30 deletions

View File

@ -1,7 +1,8 @@
//! Navigator backend for web
use crate::SocketProxy;
use async_channel::{Receiver, Sender};
use futures_util::{SinkExt, StreamExt};
use futures_util::future::Either;
use futures_util::{future, SinkExt, StreamExt};
use gloo_net::websocket::{futures::WebSocket, Message};
use js_sys::{Array, Uint8Array};
use ruffle_core::backend::navigator::{
@ -289,7 +290,7 @@ impl NavigatorBackend for WebNavigatorBackend {
"Unable to create request for",
url.as_str(),
"",
)
);
}
};
@ -404,42 +405,45 @@ impl NavigatorBackend for WebNavigatorBackend {
}
};
let (mut sink, mut stream) = ws.split();
let (mut ws_write, mut ws_read) = ws.split();
sender
.try_send(SocketAction::Connect(handle, ConnectionState::Connected))
.expect("working channel send");
// Spawn future to handle incoming messages.
let stream_sender = sender.clone();
self.spawn_future(Box::pin(async move {
while let Some(msg) = stream.next().await {
match msg {
Ok(Message::Bytes(buf)) => stream_sender
.try_send(SocketAction::Data(handle, buf))
.expect("working channel send"),
Ok(_) => tracing::warn!("Server sent unexpected text message"),
Err(_) => {
stream_sender
.try_send(SocketAction::Close(handle))
.expect("working channel send");
return Ok(());
loop {
match future::select(ws_read.next(), std::pin::pin!(receiver.recv())).await {
// Handle incoming messages.
Either::Left((Some(msg), _)) => match msg {
Ok(Message::Bytes(buf)) => sender
.try_send(SocketAction::Data(handle, buf))
.expect("working channel send"),
Ok(_) => tracing::warn!("Server sent an unexpected text message"),
Err(_) => {
sender
.try_send(SocketAction::Close(handle))
.expect("working channel send");
break;
}
},
// Handle outgoing messages.
Either::Right((Ok(msg), _)) => {
if let Err(e) = ws_write.send(Message::Bytes(msg)).await {
tracing::warn!("Failed to send message to WebSocket {}", e);
sender
.try_send(SocketAction::Close(handle))
.expect("working channel send");
}
}
}
// The connection was closed.
_ => break,
};
}
Ok(())
}));
// Spawn future to handle outgoing messages.
self.spawn_future(Box::pin(async move {
while let Ok(msg) = receiver.recv().await {
if let Err(e) = sink.send(Message::Bytes(msg)).await {
tracing::warn!("Failed to send message to WebSocket {}", e);
sender
.try_send(SocketAction::Close(handle))
.expect("working channel send");
}
}
let ws = ws_write
.reunite(ws_read)
.expect("both originate from the same websocket");
let _ = ws.close(None, None);
Ok(())
}));