desktop: Make WinitAsyncExecutor thread-safe and prevent deadlocks

Not thread-safe WinitAsyncExecutor was wrapped with a Mutex, making
any access to the instance exclusive. It was problematic, because
when a task was awoken from within another task, it resulted in
a deadlock, because the mutex was not reentrant.

This patch makes WinitAsyncExecutor thread-safe and changes how tasks
are locked. Specifically, instead of locking all tasks, only
the currently executed task is being locked. This allows waking any
other task from within it.
This commit is contained in:
Kamil Jarosz 2024-03-04 00:12:55 +01:00 committed by Nathan Adams
parent de8f63ec88
commit 3d04971ae4
2 changed files with 50 additions and 39 deletions

View File

@ -6,7 +6,8 @@ use async_channel::{unbounded, Receiver, Sender};
use ruffle_core::backend::navigator::OwnedFuture;
use ruffle_core::loader::Error;
use slotmap::{new_key_type, SlotMap};
use std::sync::{Arc, Mutex, Weak};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard, Weak};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use winit::event_loop::EventLoopProxy;
@ -28,12 +29,12 @@ struct TaskHandle {
///
/// Weak reference ensures that the executor along
/// with its tasks is dropped properly.
executor: Weak<Mutex<WinitAsyncExecutor>>,
executor: Weak<WinitAsyncExecutor>,
}
impl TaskHandle {
/// Construct a handle to a given task.
fn for_task(task: TaskKey, executor: Weak<Mutex<WinitAsyncExecutor>>) -> Self {
fn for_task(task: TaskKey, executor: Weak<WinitAsyncExecutor>) -> Self {
Self {
handle: task,
executor,
@ -56,10 +57,7 @@ impl TaskHandle {
/// Wake the task this context refers to.
fn wake(&self) {
if let Some(executor) = self.executor.upgrade() {
executor
.lock()
.expect("able to lock executor")
.wake(self.handle);
executor.wake(self.handle);
}
}
@ -133,19 +131,19 @@ impl TaskHandle {
pub struct WinitAsyncExecutor {
/// List of all spawned tasks.
task_queue: SlotMap<TaskKey, Task>,
task_queue: RwLock<SlotMap<TaskKey, Mutex<Task>>>,
/// Source of tasks sent to us by the `NavigatorBackend`.
channel: Receiver<OwnedFuture<(), Error>>,
/// Weak reference to ourselves.
self_ref: Weak<Mutex<Self>>,
self_ref: Weak<Self>,
/// Event injector for the main thread event loop.
event_loop: EventLoopProxy<RuffleEvent>,
/// Whether or not we have already queued a `TaskPoll` event.
waiting_for_poll: bool,
/// Whether we have already queued a `TaskPoll` event.
waiting_for_poll: AtomicBool,
}
impl WinitAsyncExecutor {
@ -153,16 +151,14 @@ impl WinitAsyncExecutor {
///
/// This function returns the executor itself, plus the `Sender` necessary
/// to spawn new tasks.
pub fn new(event_loop: EventLoopProxy<RuffleEvent>) -> (Arc<Mutex<Self>>, WinitFutureSpawner) {
pub fn new(event_loop: EventLoopProxy<RuffleEvent>) -> (Arc<Self>, WinitFutureSpawner) {
let (send, recv) = unbounded();
let new_self = Arc::new_cyclic(|self_ref| {
Mutex::new(Self {
task_queue: SlotMap::with_key(),
channel: recv,
self_ref: self_ref.clone(),
event_loop: event_loop.clone(),
waiting_for_poll: false,
})
let new_self = Arc::new_cyclic(|self_ref| Self {
task_queue: RwLock::new(SlotMap::with_key()),
channel: recv,
self_ref: self_ref.clone(),
event_loop: event_loop.clone(),
waiting_for_poll: AtomicBool::new(false),
});
(
new_self,
@ -171,16 +167,15 @@ impl WinitAsyncExecutor {
}
/// Poll all `Ready` futures.
pub fn poll_all(&mut self) {
self.waiting_for_poll = false;
pub fn poll_all(&self) {
self.waiting_for_poll.store(false, Ordering::SeqCst);
while let Ok(fut) = self.channel.try_recv() {
self.task_queue.insert(Task::from_future(fut));
}
self.insert_tasks();
let mut completed_tasks = vec![];
for (index, task) in self.task_queue.iter_mut() {
for (index, task) in self.read_task_queue().iter() {
let mut task = task.lock().expect("non-poisoned task");
if task.is_ready() {
let handle = TaskHandle::for_task(index, self.self_ref.clone());
let waker = handle.waker();
@ -199,18 +194,16 @@ impl WinitAsyncExecutor {
}
}
for index in completed_tasks {
self.task_queue.remove(index);
}
self.remove_tasks(completed_tasks);
}
/// Mark a task as ready to proceed.
fn wake(&mut self, task: TaskKey) {
if let Some(task) = self.task_queue.get_mut(task) {
fn wake(&self, task: TaskKey) {
if let Some(task) = self.read_task_queue().get(task) {
let mut task = task.lock().expect("non-poisoned task");
if !task.is_completed() {
task.set_ready();
if !self.waiting_for_poll {
self.waiting_for_poll = true;
if !self.waiting_for_poll.swap(true, Ordering::SeqCst) {
if self.event_loop.send_event(RuffleEvent::TaskPoll).is_err() {
tracing::warn!("A task was queued on an event loop that has already ended. It will not be polled.");
}
@ -226,6 +219,28 @@ impl WinitAsyncExecutor {
tracing::warn!("Attempted to wake an already-finished task");
}
}
fn insert_tasks(&self) {
let mut queue = self.write_task_queue();
while let Ok(fut) = self.channel.try_recv() {
queue.insert(Mutex::new(Task::from_future(fut)));
}
}
fn remove_tasks(&self, completed_tasks: Vec<TaskKey>) {
let mut queue = self.write_task_queue();
for index in completed_tasks {
queue.remove(index);
}
}
fn write_task_queue(&self) -> RwLockWriteGuard<'_, SlotMap<TaskKey, Mutex<Task>>> {
self.task_queue.write().expect("non-poisoned task queue")
}
fn read_task_queue(&self) -> RwLockReadGuard<'_, SlotMap<TaskKey, Mutex<Task>>> {
self.task_queue.read().expect("non-poisoned task queue")
}
}
pub trait FutureSpawner {

View File

@ -93,7 +93,7 @@ impl From<&GlobalPreferences> for PlayerOptions {
/// which may be lost when this Player is closed (dropped)
struct ActivePlayer {
player: Arc<Mutex<Player>>,
executor: Arc<Mutex<WinitAsyncExecutor>>,
executor: Arc<WinitAsyncExecutor>,
}
impl ActivePlayer {
@ -342,11 +342,7 @@ impl PlayerController {
pub fn poll(&self) {
if let Some(player) = &self.player {
player
.executor
.lock()
.expect("Executor lock must be available")
.poll_all()
player.executor.poll_all()
}
}
}