core: Add method to read downloads in chunks instead of all-at-once.

This commit is contained in:
David Wendt 2023-04-01 00:18:36 -04:00 committed by kmeisthax
parent 06eb2e1ee8
commit 91dd9563bb
6 changed files with 218 additions and 28 deletions

15
Cargo.lock generated
View File

@ -4576,6 +4576,7 @@ dependencies = [
"base64",
"chrono",
"console_error_panic_hook",
"futures",
"futures-util",
"generational-arena",
"getrandom",
@ -4599,6 +4600,7 @@ dependencies = [
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"wasm-streams",
"web-sys",
]
@ -5821,6 +5823,19 @@ version = "0.2.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d91413b1c31d7539ba5ef2451af3f0b833a005eb27a631cec32bc0635a8602b"
[[package]]
name = "wasm-streams"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4609d447824375f43e1ffbc051b50ad8f4b3ae8219680c94452ea05eb240ac7"
dependencies = [
"futures-util",
"js-sys",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]]
name = "wayland-backend"
version = "0.3.2"

View File

@ -9,7 +9,9 @@ use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::fmt;
use std::fmt::Display;
use std::fs::File;
use std::future::Future;
use std::io::Read;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::mpsc::Sender;
@ -201,6 +203,18 @@ pub trait SuccessResponse {
/// Indicates if the request has been redirected.
fn redirected(&self) -> bool;
/// Read the next chunk of the response.
///
/// Repeated calls to `next_chunk` yield further bytes of the response body.
/// A response that has no data or no more data to yield will instead
/// yield None.
///
/// The size of yielded chunks is implementation-defined.
///
/// Mixing `next_chunk` and `body` is not supported and may yield errors.
/// Use one or the other.
fn next_chunk(&mut self) -> OwnedFuture<Option<Vec<u8>>, Error>;
}
/// A response to a non-successful fetch request.
@ -550,6 +564,7 @@ pub fn fetch_path<NavigatorType: NavigatorBackend>(
struct LocalResponse {
url: String,
path: PathBuf,
open_file: Option<File>,
status: u16,
redirected: bool,
}
@ -572,6 +587,33 @@ pub fn fetch_path<NavigatorType: NavigatorBackend>(
fn redirected(&self) -> bool {
self.redirected
}
fn next_chunk(&mut self) -> OwnedFuture<Option<Vec<u8>>, Error> {
if self.open_file.is_none() {
let result = std::fs::File::open(self.path.clone())
.map_err(|e| Error::FetchError(e.to_string()));
match result {
Ok(file) => self.open_file = Some(file),
Err(e) => return Box::pin(async move { Err(e) }),
}
}
let file = self.open_file.as_mut().unwrap();
let mut buf = vec![0; 4096];
let res = file.read(&mut buf);
Box::pin(async move {
match res {
Ok(count) if count > 0 => {
buf.resize(count, 0);
Ok(Some(buf))
}
Ok(_) => Ok(None),
Err(e) => Err(Error::FetchError(e.to_string())),
}
})
}
}
let url = match navigator.resolve_url(url) {
@ -617,6 +659,7 @@ pub fn fetch_path<NavigatorType: NavigatorBackend>(
let response: Box<dyn SuccessResponse> = Box::new(LocalResponse {
url: url.to_string(),
path,
open_file: None,
status: 0,
redirected: false,
});

View File

@ -9,7 +9,8 @@ use futures::{AsyncReadExt, AsyncWriteExt};
use futures_lite::FutureExt;
use isahc::http::{HeaderName, HeaderValue};
use isahc::{
config::RedirectPolicy, prelude::*, AsyncReadResponseExt, HttpClient, Request as IsahcRequest,
config::RedirectPolicy, prelude::*, AsyncBody, AsyncReadResponseExt, HttpClient,
Request as IsahcRequest, Response as IsahcResponse,
};
use rfd::{AsyncMessageDialog, MessageButtons, MessageDialog, MessageDialogResult, MessageLevel};
use ruffle_core::backend::navigator::{
@ -20,11 +21,13 @@ use ruffle_core::indexmap::IndexMap;
use ruffle_core::loader::Error;
use ruffle_core::socket::{ConnectionState, SocketAction, SocketHandle};
use std::collections::HashSet;
use std::io;
use std::fs::File;
use std::io::ErrorKind;
use std::io::{self, Read};
use std::rc::Rc;
use std::str::FromStr;
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tracing::warn;
use url::{ParseError, Url};
@ -172,9 +175,21 @@ impl NavigatorBackend for ExternalNavigatorBackend {
}
fn fetch(&self, request: Request) -> OwnedFuture<Box<dyn SuccessResponse>, ErrorResponse> {
enum DesktopResponseBody {
/// The response's body comes from a file.
File(File),
/// The response's body comes from the network.
///
/// This has to be stored in shared ownerhsip so that we can return
/// owned futures. A synchronous lock is used here as we do not
/// expect contention on this lock.
Network(Arc<Mutex<IsahcResponse<AsyncBody>>>),
}
struct DesktopResponse {
url: String,
body: Vec<u8>,
response_body: DesktopResponseBody,
status: u16,
redirected: bool,
}
@ -184,8 +199,28 @@ impl NavigatorBackend for ExternalNavigatorBackend {
std::borrow::Cow::Borrowed(&self.url)
}
#[allow(clippy::await_holding_lock)]
fn body(self: Box<Self>) -> OwnedFuture<Vec<u8>, Error> {
Box::pin(async { Ok(self.body) })
match self.response_body {
DesktopResponseBody::File(mut file) => Box::pin(async move {
let mut body = vec![];
file.read_to_end(&mut body)
.map_err(|e| Error::FetchError(e.to_string()))?;
Ok(body)
}),
DesktopResponseBody::Network(response) => Box::pin(async move {
let mut body = vec![];
response
.lock()
.expect("working lock during fetch body read")
.copy_to(&mut body)
.await
.map_err(|e| Error::FetchError(e.to_string()))?;
Ok(body)
}),
}
}
fn status(&self) -> u16 {
@ -195,6 +230,56 @@ impl NavigatorBackend for ExternalNavigatorBackend {
fn redirected(&self) -> bool {
self.redirected
}
#[allow(clippy::await_holding_lock)]
fn next_chunk(&mut self) -> OwnedFuture<Option<Vec<u8>>, Error> {
match &mut self.response_body {
DesktopResponseBody::File(file) => {
let mut buf = vec![0; 4096];
let res = file.read(&mut buf);
Box::pin(async move {
match res {
Ok(count) if count > 0 => {
buf.resize(count, 0);
Ok(Some(buf))
}
Ok(_) => Ok(None),
Err(e) => Err(Error::FetchError(e.to_string())),
}
})
}
DesktopResponseBody::Network(response) => {
let response = response.clone();
Box::pin(async move {
let mut buf = vec![0; 4096];
let lock = response.try_lock();
if matches!(lock, Err(std::sync::TryLockError::WouldBlock)) {
return Err(Error::FetchError(
"Concurrent read operations on the same stream are not supported."
.to_string(),
));
}
let result = lock
.expect("desktop network lock")
.body_mut()
.read(&mut buf)
.await;
match result {
Ok(count) if count > 0 => {
buf.resize(count, 0);
Ok(Some(buf))
}
Ok(_) => Ok(None),
Err(e) => Err(Error::FetchError(e.to_string())),
}
})
}
}
}
}
// TODO: honor sandbox type (local-with-filesystem, local-with-network, remote, ...)
@ -228,7 +313,7 @@ impl NavigatorBackend for ExternalNavigatorBackend {
}
};
let contents = std::fs::read(&path).or_else(|e| {
let contents = std::fs::File::open(&path).or_else(|e| {
if cfg!(feature = "sandbox") {
use rfd::FileDialog;
@ -242,7 +327,7 @@ impl NavigatorBackend for ExternalNavigatorBackend {
if attempt_sandbox_open {
FileDialog::new().set_directory(&path).pick_folder();
return std::fs::read(&path);
return std::fs::File::open(&path);
}
}
}
@ -250,8 +335,8 @@ impl NavigatorBackend for ExternalNavigatorBackend {
Err(e)
});
let body = match contents {
Ok(body) => body,
let file = match contents {
Ok(file) => file,
Err(e) => {
return create_specific_fetch_error(
"Can't open file",
@ -263,7 +348,7 @@ impl NavigatorBackend for ExternalNavigatorBackend {
let response: Box<dyn SuccessResponse> = Box::new(DesktopResponse {
url: response_url.to_string(),
body,
response_body: DesktopResponseBody::File(file),
status: 0,
redirected: false,
});
@ -308,7 +393,7 @@ impl NavigatorBackend for ExternalNavigatorBackend {
error: Error::FetchError(e.to_string()),
})?;
let mut response = client.send_async(body).await.map_err(|e| {
let response = client.send_async(body).await.map_err(|e| {
let inner = match e.kind() {
isahc::error::ErrorKind::NameResolution => {
Error::InvalidDomain(processed_url.to_string())
@ -339,18 +424,9 @@ impl NavigatorBackend for ExternalNavigatorBackend {
return Err(ErrorResponse { url, error });
}
let mut body = vec![];
response
.copy_to(&mut body)
.await
.map_err(|e| ErrorResponse {
url: url.clone(),
error: Error::FetchError(e.to_string()),
})?;
let response: Box<dyn SuccessResponse> = Box::new(DesktopResponse {
url,
body,
response_body: DesktopResponseBody::Network(Arc::new(Mutex::new(response))),
status,
redirected,
});

View File

@ -20,6 +20,7 @@ use vfs::VfsPath;
struct TestResponse {
url: String,
body: Vec<u8>,
chunk_gotten: bool,
status: u16,
redirected: bool,
}
@ -33,6 +34,16 @@ impl SuccessResponse for TestResponse {
Box::pin(async move { Ok(self.body) })
}
fn next_chunk(&mut self) -> OwnedFuture<Option<Vec<u8>>, Error> {
if !self.chunk_gotten {
self.chunk_gotten = true;
let body = self.body.clone();
Box::pin(async move { Ok(Some(body)) })
} else {
Box::pin(async move { Ok(None) })
}
}
fn status(&self) -> u16 {
self.status
}
@ -103,6 +114,7 @@ impl NavigatorBackend for TestNavigatorBackend {
let response: Box<dyn SuccessResponse> = Box::new(TestResponse {
url: request.url().to_string(),
body: b"Hello, World!".to_vec(),
chunk_gotten: false,
status: 200,
redirected: false,
});
@ -216,6 +228,7 @@ impl NavigatorBackend for TestNavigatorBackend {
let response: Box<dyn SuccessResponse> = Box::new(TestResponse {
url: url.to_string(),
body,
chunk_gotten: false,
status: 0,
redirected: false,
});

View File

@ -57,6 +57,8 @@ async-channel = "2.1.1"
futures-util = { version = "0.3.30", features = ["sink"] }
gloo-net = { version = "0.5.0", default-features = false, features = ["websocket"] }
rfd = { version = "0.13.0", features = ["file-handle-inner"] }
wasm-streams = "0.3.0"
futures = "0.3.0"
[dependencies.ruffle_core]
path = "../core"
@ -70,5 +72,5 @@ features = [
"ChannelMergerNode", "ChannelSplitterNode", "ClipboardEvent", "DataTransfer", "Element", "Event",
"EventTarget", "GainNode", "Headers", "HtmlCanvasElement", "HtmlDocument", "HtmlElement", "HtmlFormElement",
"HtmlInputElement", "HtmlTextAreaElement", "KeyboardEvent", "Location", "PointerEvent",
"Request", "RequestInit", "Response", "Storage", "WheelEvent", "Window", "RequestCredentials"
"Request", "RequestInit", "Response", "Storage", "WheelEvent", "Window", "ReadableStream", "RequestCredentials"
]

View File

@ -14,7 +14,7 @@ use ruffle_core::loader::Error;
use ruffle_core::socket::{ConnectionState, SocketAction, SocketHandle};
use std::borrow::Cow;
use std::sync::mpsc::Sender;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tracing_subscriber::layer::Layered;
use tracing_subscriber::Registry;
@ -22,6 +22,7 @@ use tracing_wasm::WASMLayer;
use url::{ParseError, Url};
use wasm_bindgen::JsCast;
use wasm_bindgen_futures::{spawn_local, JsFuture};
use wasm_streams::readable::ReadableStream;
use web_sys::{
window, Blob, BlobPropertyBag, HtmlFormElement, HtmlInputElement, Request as WebRequest,
RequestCredentials, RequestInit, Response as WebResponse,
@ -327,7 +328,10 @@ impl NavigatorBackend for WebNavigatorBackend {
return Err(ErrorResponse { url, error });
}
let wrapper: Box<dyn SuccessResponse> = Box::new(WebResponseWrapper(response));
let wrapper: Box<dyn SuccessResponse> = Box::new(WebResponseWrapper {
response,
body_stream: None,
});
Ok(wrapper)
})
@ -441,17 +445,20 @@ impl NavigatorBackend for WebNavigatorBackend {
}
}
struct WebResponseWrapper(WebResponse);
struct WebResponseWrapper {
response: WebResponse,
body_stream: Option<Arc<Mutex<ReadableStream>>>,
}
impl SuccessResponse for WebResponseWrapper {
fn url(&self) -> Cow<str> {
Cow::Owned(self.0.url())
Cow::Owned(self.response.url())
}
fn body(self: Box<Self>) -> OwnedFuture<Vec<u8>, Error> {
Box::pin(async move {
let body = JsFuture::from(
self.0
self.response
.array_buffer()
.map_err(|_| Error::FetchError("Got JS error".to_string()))?,
)
@ -470,10 +477,44 @@ impl SuccessResponse for WebResponseWrapper {
}
fn status(&self) -> u16 {
self.0.status()
self.response.status()
}
fn redirected(&self) -> bool {
self.0.redirected()
self.response.redirected()
}
#[allow(clippy::await_holding_lock)]
fn next_chunk(&mut self) -> OwnedFuture<Option<Vec<u8>>, Error> {
if self.body_stream.is_none() {
let body = self.response.body();
if body.is_none() {
return Box::pin(async move { Ok(None) });
}
self.body_stream = Some(Arc::new(Mutex::new(ReadableStream::from_raw(
body.expect("body").unchecked_into(),
))));
}
let body_stream = self.body_stream.clone().expect("web body stream");
Box::pin(async move {
let read_lock = body_stream.try_lock();
if matches!(read_lock, Err(std::sync::TryLockError::WouldBlock)) {
return Err(Error::FetchError(
"Concurrent read operations on the same stream are not supported.".to_string(),
));
}
let mut read_lock = read_lock.expect("web response reader");
let mut body_reader = read_lock.get_reader();
let chunk = body_reader.read();
match chunk.await {
Ok(Some(chunk)) => Ok(Some(Uint8Array::new(&chunk).to_vec())),
Ok(None) => Ok(None),
Err(_) => Err(Error::FetchError("Cannot read next chunk".to_string())), //TODO: JsValue to string?!
}
})
}
}