core: Re-implement `NullExecutor`

Make it a thin abstraction layer over either the `futures` or `wasm-bindgen-futures`
crates, as already done in `render/wgpu/src/uniform_buffer.rs`,
instead of a hand-made single-thread executor.

Ideally this would also be usable on desktop, but I didn't manage to
get `LocalPool` working with `winit` (it needs to post a task to the
`EventLoopProxy` as a wake procedure).
This commit is contained in:
relrelb 2022-03-15 23:28:13 +02:00 committed by Mike Welsh
parent bb63ac2de7
commit 161071e8c4
5 changed files with 73 additions and 115 deletions

2
Cargo.lock generated
View File

@ -2971,6 +2971,7 @@ dependencies = [
"flash-lso",
"flate2",
"fnv",
"futures",
"gc-arena",
"generational-arena",
"gif",
@ -3001,6 +3002,7 @@ dependencies = [
"symphonia",
"thiserror",
"url",
"wasm-bindgen-futures",
"weak-table",
]

View File

@ -53,6 +53,12 @@ nihav_duck = { git = "https://github.com/ruffle-rs/nihav-vp6", rev = "9416fcc9fc
version = "0.2.2"
default-features = false # can't use rayon on web
[target.'cfg(not(target_family = "wasm"))'.dependencies.futures]
version = "0.3.21"
[target.'cfg(target_family = "wasm")'.dependencies.wasm-bindgen-futures]
version = "0.4"
[dev-dependencies]
approx = "0.5.1"

View File

@ -4,14 +4,10 @@ use crate::loader::Error;
use crate::string::WStr;
use indexmap::IndexMap;
use std::borrow::Cow;
use std::collections::VecDeque;
use std::fs;
use std::future::Future;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::ptr::null;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use swf::avm1::types::SendVarsMethod;
use url::{ParseError, Url};
@ -204,103 +200,72 @@ pub trait NavigatorBackend {
fn pre_process_url(&self, url: Url) -> Url;
}
/// A null implementation of an event loop that only supports blocking.
pub struct NullExecutor {
/// The list of outstanding futures spawned on this executor.
futures_queue: VecDeque<OwnedFuture<(), Error>>,
/// The source of any additional futures.
channel: Receiver<OwnedFuture<(), Error>>,
}
unsafe fn do_nothing(_data: *const ()) {}
unsafe fn clone(_data: *const ()) -> RawWaker {
NullExecutor::raw_waker()
}
const NULL_VTABLE: RawWakerVTable = RawWakerVTable::new(clone, do_nothing, do_nothing, do_nothing);
#[cfg(not(target_family = "wasm"))]
pub struct NullExecutor(futures::executor::LocalPool);
#[cfg(not(target_family = "wasm"))]
impl NullExecutor {
/// Construct a new executor.
///
/// The sender yielded as part of construction should be given to a
/// `NullNavigatorBackend` so that it can spawn futures on this executor.
pub fn new() -> (Self, Sender<OwnedFuture<(), Error>>) {
let (send, recv) = channel();
(
Self {
futures_queue: VecDeque::new(),
channel: recv,
},
send,
)
pub fn new() -> Self {
Self(futures::executor::LocalPool::new())
}
/// Construct a do-nothing raw waker.
///
/// The RawWaker, because the RawWaker
/// interface normally deals with unchecked pointers. We instead just hand
/// it a null pointer and do nothing with it, which is trivially sound.
fn raw_waker() -> RawWaker {
RawWaker::new(null(), &NULL_VTABLE)
pub fn spawner(&self) -> NullSpawner {
NullSpawner(self.0.spawner())
}
/// Copy all outstanding futures into the local queue.
fn flush_channel(&mut self) {
for future in self.channel.try_iter() {
self.futures_queue.push_back(future);
pub fn run(&mut self) {
self.0.run();
}
}
impl Default for NullExecutor {
fn default() -> Self {
Self::new()
}
}
#[cfg(not(target_family = "wasm"))]
pub struct NullSpawner(futures::executor::LocalSpawner);
#[cfg(not(target_family = "wasm"))]
impl NullSpawner {
pub fn spawn_local(&self, future: OwnedFuture<(), Error>) {
use futures::task::LocalSpawnExt;
let _ = self.0.spawn_local(async move {
if let Err(e) = future.await {
log::error!("Asynchronous error occurred: {}", e);
}
});
}
}
#[cfg(target_family = "wasm")]
pub struct NullExecutor;
#[cfg(target_family = "wasm")]
impl NullExecutor {
pub fn new() -> Self {
Self
}
/// Poll all in-progress futures.
///
/// If any task in the executor yields an error, then this function will
/// stop polling futures and return that error. Otherwise, it will yield
/// `Ok`, indicating that no errors occurred. More work may still be
/// available,
pub fn poll_all(&mut self) -> Result<(), Error> {
self.flush_channel();
let mut unfinished_futures = VecDeque::new();
let mut result = Ok(());
while let Some(mut future) = self.futures_queue.pop_front() {
let waker = unsafe { Waker::from_raw(Self::raw_waker()) };
let mut context = Context::from_waker(&waker);
match future.as_mut().poll(&mut context) {
Poll::Ready(v) if v.is_err() => {
result = v;
break;
}
Poll::Ready(_) => continue,
Poll::Pending => unfinished_futures.push_back(future),
}
pub fn spawner(&self) -> NullSpawner {
NullSpawner
}
for future in unfinished_futures {
self.futures_queue.push_back(future);
pub fn run(&mut self) {}
}
#[cfg(target_family = "wasm")]
pub struct NullSpawner;
#[cfg(target_family = "wasm")]
impl NullSpawner {
pub fn spawn_local(&self, future: OwnedFuture<(), Error>) {
wasm_bindgen_futures::spawn_local(async move {
if let Err(e) = future.await {
log::error!("Asynchronous error occurred: {}", e);
}
result
}
/// Check if work remains in the executor.
pub fn has_work(&mut self) -> bool {
self.flush_channel();
!self.futures_queue.is_empty()
}
/// Block until all futures complete or an error occurs.
pub fn block_all(&mut self) -> Result<(), Error> {
while self.has_work() {
self.poll_all()?;
}
Ok(())
});
}
}
@ -309,35 +274,25 @@ impl NullExecutor {
/// The NullNavigatorBackend includes a trivial executor that holds owned
/// futures and runs them to completion, blockingly.
pub struct NullNavigatorBackend {
/// The channel upon which all spawned futures will be sent.
channel: Option<Sender<OwnedFuture<(), Error>>>,
spawner: NullSpawner,
/// The base path for all relative fetches.
relative_base_path: PathBuf,
}
impl NullNavigatorBackend {
/// Construct a default navigator backend with no async or fetch
/// capability.
pub fn new() -> Self {
NullNavigatorBackend {
channel: None,
let executor = NullExecutor::new();
Self {
spawner: executor.spawner(),
relative_base_path: PathBuf::new(),
}
}
/// Construct a navigator backend with fetch and async capability.
pub fn with_base_path<P: AsRef<Path>>(
path: P,
channel: Sender<OwnedFuture<(), Error>>,
) -> Self {
let mut relative_base_path = PathBuf::new();
relative_base_path.push(path);
NullNavigatorBackend {
channel: Some(channel),
relative_base_path,
pub fn with_base_path(path: &Path, executor: &NullExecutor) -> Self {
Self {
spawner: executor.spawner(),
relative_base_path: path.to_path_buf(),
}
}
}
@ -365,9 +320,7 @@ impl NavigatorBackend for NullNavigatorBackend {
}
fn spawn_future(&mut self, future: OwnedFuture<(), Error>) {
if let Some(channel) = self.channel.as_ref() {
channel.send(future).unwrap();
}
self.spawner.spawn_local(future);
}
fn resolve_relative_url<'a>(&self, url: &'a str) -> Cow<'a, str> {

View File

@ -13,24 +13,21 @@ use ruffle_core::swf::{decompress_swf, parse_swf};
use ruffle_core::tag_utils::SwfMovie;
use ruffle_core::Player;
use sha2::{Digest, Sha256};
use std::path::Path;
use std::panic::catch_unwind;
use std::io::{stdout, Write};
use std::panic::catch_unwind;
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant};
fn execute_swf(file: &Path) {
let base_path = file.parent().unwrap();
let (_executor, channel) = NullExecutor::new();
let executor = NullExecutor::new();
let movie = SwfMovie::from_path(file, None).unwrap();
let frame_time = 1000.0 / movie.frame_rate().to_f64();
let player = Player::new(
Box::new(NullRenderer::new()),
Box::new(NullAudioBackend::new()),
Box::new(NullNavigatorBackend::with_base_path(base_path, channel)),
Box::new(NullNavigatorBackend::with_base_path(base_path, &executor)),
Box::new(MemoryStorageBackend::default()),
Box::new(NullVideoBackend::new()),
Box::new(ScanLogBackend::new()),

View File

@ -1073,7 +1073,7 @@ fn run_swf(
check_img &= RUN_IMG_TESTS;
let base_path = Path::new(swf_path).parent().unwrap();
let (mut executor, channel) = NullExecutor::new();
let mut executor = NullExecutor::new();
let movie = SwfMovie::from_path(swf_path, None)?;
let frame_time = 1000.0 / movie.frame_rate().to_f64();
let trace_output = Rc::new(RefCell::new(Vec::new()));
@ -1115,7 +1115,7 @@ fn run_swf(
let player = Player::new(
render_backend,
Box::new(NullAudioBackend::new()),
Box::new(NullNavigatorBackend::with_base_path(base_path, channel)),
Box::new(NullNavigatorBackend::with_base_path(base_path, &executor)),
Box::new(MemoryStorageBackend::default()),
video_backend,
Box::new(TestLogBackend::new(trace_output.clone())),
@ -1132,7 +1132,7 @@ fn run_swf(
for _ in 0..num_frames {
player.lock().unwrap().run_frame();
player.lock().unwrap().update_timers(frame_time);
executor.poll_all().unwrap();
executor.run();
}
// Render the image to disk
@ -1193,7 +1193,7 @@ fn run_swf(
before_end(player)?;
executor.block_all().unwrap();
executor.run();
let trace = trace_output.borrow().join("\n");
Ok(trace)