desktop: Make WinitAsyncExecutor thread-safe and prevent deadlocks
Not thread-safe WinitAsyncExecutor was wrapped with a Mutex, making any access to the instance exclusive. It was problematic, because when a task was awoken from within another task, it resulted in a deadlock, because the mutex was not reentrant. This patch makes WinitAsyncExecutor thread-safe and changes how tasks are locked. Specifically, instead of locking all tasks, only the currently executed task is being locked. This allows waking any other task from within it.
This commit is contained in:
parent
de8f63ec88
commit
3d04971ae4
|
@ -6,7 +6,8 @@ use async_channel::{unbounded, Receiver, Sender};
|
|||
use ruffle_core::backend::navigator::OwnedFuture;
|
||||
use ruffle_core::loader::Error;
|
||||
use slotmap::{new_key_type, SlotMap};
|
||||
use std::sync::{Arc, Mutex, Weak};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard, Weak};
|
||||
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
|
||||
use winit::event_loop::EventLoopProxy;
|
||||
|
||||
|
@ -28,12 +29,12 @@ struct TaskHandle {
|
|||
///
|
||||
/// Weak reference ensures that the executor along
|
||||
/// with its tasks is dropped properly.
|
||||
executor: Weak<Mutex<WinitAsyncExecutor>>,
|
||||
executor: Weak<WinitAsyncExecutor>,
|
||||
}
|
||||
|
||||
impl TaskHandle {
|
||||
/// Construct a handle to a given task.
|
||||
fn for_task(task: TaskKey, executor: Weak<Mutex<WinitAsyncExecutor>>) -> Self {
|
||||
fn for_task(task: TaskKey, executor: Weak<WinitAsyncExecutor>) -> Self {
|
||||
Self {
|
||||
handle: task,
|
||||
executor,
|
||||
|
@ -56,10 +57,7 @@ impl TaskHandle {
|
|||
/// Wake the task this context refers to.
|
||||
fn wake(&self) {
|
||||
if let Some(executor) = self.executor.upgrade() {
|
||||
executor
|
||||
.lock()
|
||||
.expect("able to lock executor")
|
||||
.wake(self.handle);
|
||||
executor.wake(self.handle);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -133,19 +131,19 @@ impl TaskHandle {
|
|||
|
||||
pub struct WinitAsyncExecutor {
|
||||
/// List of all spawned tasks.
|
||||
task_queue: SlotMap<TaskKey, Task>,
|
||||
task_queue: RwLock<SlotMap<TaskKey, Mutex<Task>>>,
|
||||
|
||||
/// Source of tasks sent to us by the `NavigatorBackend`.
|
||||
channel: Receiver<OwnedFuture<(), Error>>,
|
||||
|
||||
/// Weak reference to ourselves.
|
||||
self_ref: Weak<Mutex<Self>>,
|
||||
self_ref: Weak<Self>,
|
||||
|
||||
/// Event injector for the main thread event loop.
|
||||
event_loop: EventLoopProxy<RuffleEvent>,
|
||||
|
||||
/// Whether or not we have already queued a `TaskPoll` event.
|
||||
waiting_for_poll: bool,
|
||||
/// Whether we have already queued a `TaskPoll` event.
|
||||
waiting_for_poll: AtomicBool,
|
||||
}
|
||||
|
||||
impl WinitAsyncExecutor {
|
||||
|
@ -153,16 +151,14 @@ impl WinitAsyncExecutor {
|
|||
///
|
||||
/// This function returns the executor itself, plus the `Sender` necessary
|
||||
/// to spawn new tasks.
|
||||
pub fn new(event_loop: EventLoopProxy<RuffleEvent>) -> (Arc<Mutex<Self>>, WinitFutureSpawner) {
|
||||
pub fn new(event_loop: EventLoopProxy<RuffleEvent>) -> (Arc<Self>, WinitFutureSpawner) {
|
||||
let (send, recv) = unbounded();
|
||||
let new_self = Arc::new_cyclic(|self_ref| {
|
||||
Mutex::new(Self {
|
||||
task_queue: SlotMap::with_key(),
|
||||
channel: recv,
|
||||
self_ref: self_ref.clone(),
|
||||
event_loop: event_loop.clone(),
|
||||
waiting_for_poll: false,
|
||||
})
|
||||
let new_self = Arc::new_cyclic(|self_ref| Self {
|
||||
task_queue: RwLock::new(SlotMap::with_key()),
|
||||
channel: recv,
|
||||
self_ref: self_ref.clone(),
|
||||
event_loop: event_loop.clone(),
|
||||
waiting_for_poll: AtomicBool::new(false),
|
||||
});
|
||||
(
|
||||
new_self,
|
||||
|
@ -171,16 +167,15 @@ impl WinitAsyncExecutor {
|
|||
}
|
||||
|
||||
/// Poll all `Ready` futures.
|
||||
pub fn poll_all(&mut self) {
|
||||
self.waiting_for_poll = false;
|
||||
pub fn poll_all(&self) {
|
||||
self.waiting_for_poll.store(false, Ordering::SeqCst);
|
||||
|
||||
while let Ok(fut) = self.channel.try_recv() {
|
||||
self.task_queue.insert(Task::from_future(fut));
|
||||
}
|
||||
self.insert_tasks();
|
||||
|
||||
let mut completed_tasks = vec![];
|
||||
|
||||
for (index, task) in self.task_queue.iter_mut() {
|
||||
for (index, task) in self.read_task_queue().iter() {
|
||||
let mut task = task.lock().expect("non-poisoned task");
|
||||
if task.is_ready() {
|
||||
let handle = TaskHandle::for_task(index, self.self_ref.clone());
|
||||
let waker = handle.waker();
|
||||
|
@ -199,18 +194,16 @@ impl WinitAsyncExecutor {
|
|||
}
|
||||
}
|
||||
|
||||
for index in completed_tasks {
|
||||
self.task_queue.remove(index);
|
||||
}
|
||||
self.remove_tasks(completed_tasks);
|
||||
}
|
||||
|
||||
/// Mark a task as ready to proceed.
|
||||
fn wake(&mut self, task: TaskKey) {
|
||||
if let Some(task) = self.task_queue.get_mut(task) {
|
||||
fn wake(&self, task: TaskKey) {
|
||||
if let Some(task) = self.read_task_queue().get(task) {
|
||||
let mut task = task.lock().expect("non-poisoned task");
|
||||
if !task.is_completed() {
|
||||
task.set_ready();
|
||||
if !self.waiting_for_poll {
|
||||
self.waiting_for_poll = true;
|
||||
if !self.waiting_for_poll.swap(true, Ordering::SeqCst) {
|
||||
if self.event_loop.send_event(RuffleEvent::TaskPoll).is_err() {
|
||||
tracing::warn!("A task was queued on an event loop that has already ended. It will not be polled.");
|
||||
}
|
||||
|
@ -226,6 +219,28 @@ impl WinitAsyncExecutor {
|
|||
tracing::warn!("Attempted to wake an already-finished task");
|
||||
}
|
||||
}
|
||||
|
||||
fn insert_tasks(&self) {
|
||||
let mut queue = self.write_task_queue();
|
||||
while let Ok(fut) = self.channel.try_recv() {
|
||||
queue.insert(Mutex::new(Task::from_future(fut)));
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_tasks(&self, completed_tasks: Vec<TaskKey>) {
|
||||
let mut queue = self.write_task_queue();
|
||||
for index in completed_tasks {
|
||||
queue.remove(index);
|
||||
}
|
||||
}
|
||||
|
||||
fn write_task_queue(&self) -> RwLockWriteGuard<'_, SlotMap<TaskKey, Mutex<Task>>> {
|
||||
self.task_queue.write().expect("non-poisoned task queue")
|
||||
}
|
||||
|
||||
fn read_task_queue(&self) -> RwLockReadGuard<'_, SlotMap<TaskKey, Mutex<Task>>> {
|
||||
self.task_queue.read().expect("non-poisoned task queue")
|
||||
}
|
||||
}
|
||||
|
||||
pub trait FutureSpawner {
|
||||
|
|
|
@ -93,7 +93,7 @@ impl From<&GlobalPreferences> for PlayerOptions {
|
|||
/// which may be lost when this Player is closed (dropped)
|
||||
struct ActivePlayer {
|
||||
player: Arc<Mutex<Player>>,
|
||||
executor: Arc<Mutex<WinitAsyncExecutor>>,
|
||||
executor: Arc<WinitAsyncExecutor>,
|
||||
}
|
||||
|
||||
impl ActivePlayer {
|
||||
|
@ -342,11 +342,7 @@ impl PlayerController {
|
|||
|
||||
pub fn poll(&self) {
|
||||
if let Some(player) = &self.player {
|
||||
player
|
||||
.executor
|
||||
.lock()
|
||||
.expect("Executor lock must be available")
|
||||
.poll_all()
|
||||
player.executor.poll_all()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue