web: Use single-threaded shared ownership for body stream

This commit is contained in:
David Wendt 2024-01-07 20:16:03 -05:00 committed by kmeisthax
parent 899e02fd82
commit 9f70fab5b5
1 changed files with 8 additions and 6 deletions

View File

@ -13,8 +13,10 @@ use ruffle_core::indexmap::IndexMap;
use ruffle_core::loader::Error;
use ruffle_core::socket::{ConnectionState, SocketAction, SocketHandle};
use std::borrow::Cow;
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::time::Duration;
use tracing_subscriber::layer::Layered;
use tracing_subscriber::Registry;
@ -447,7 +449,7 @@ impl NavigatorBackend for WebNavigatorBackend {
struct WebResponseWrapper {
response: WebResponse,
body_stream: Option<Arc<Mutex<ReadableStream>>>,
body_stream: Option<Rc<RefCell<ReadableStream>>>,
}
impl SuccessResponse for WebResponseWrapper {
@ -484,7 +486,7 @@ impl SuccessResponse for WebResponseWrapper {
self.response.redirected()
}
#[allow(clippy::await_holding_lock)]
#[allow(clippy::await_holding_refcell_ref)]
fn next_chunk(&mut self) -> OwnedFuture<Option<Vec<u8>>, Error> {
if self.body_stream.is_none() {
let body = self.response.body();
@ -492,15 +494,15 @@ impl SuccessResponse for WebResponseWrapper {
return Box::pin(async move { Ok(None) });
}
self.body_stream = Some(Arc::new(Mutex::new(ReadableStream::from_raw(
self.body_stream = Some(Rc::new(RefCell::new(ReadableStream::from_raw(
body.expect("body").unchecked_into(),
))));
}
let body_stream = self.body_stream.clone().expect("web body stream");
Box::pin(async move {
let read_lock = body_stream.try_lock();
if matches!(read_lock, Err(std::sync::TryLockError::WouldBlock)) {
let read_lock = body_stream.try_borrow_mut();
if read_lock.is_err() {
return Err(Error::FetchError(
"Concurrent read operations on the same stream are not supported.".to_string(),
));