From f30c04e8b4d77bae221b2cfc3ce03faf1f06eeb2 Mon Sep 17 00:00:00 2001 From: Kamil Jarosz Date: Sun, 14 Apr 2024 20:26:51 +0200 Subject: [PATCH] frontend-utils: Get rid of deadlocks in AsyncExecutor 3d04971a was the first approach to eliminating deadlocks caused by waking a task from another one. A task may also try to wake itself, and that case was unfortunately not covered. This patch rewrites the AsyncExecutor to use an async channel for queuing tasks instead of a flag. Using a flag required holding a lock for a task which was problematic when a task needed to be awoken. A flag also required doing a full search through tasks each poll, which could be ineffective. --- frontend-utils/src/backends/executor.rs | 117 +++++++++---------- frontend-utils/src/backends/executor/task.rs | 27 +---- 2 files changed, 61 insertions(+), 83 deletions(-) diff --git a/frontend-utils/src/backends/executor.rs b/frontend-utils/src/backends/executor.rs index 10d932d21..8d97e8fe6 100644 --- a/frontend-utils/src/backends/executor.rs +++ b/frontend-utils/src/backends/executor.rs @@ -7,7 +7,7 @@ use ruffle_core::loader::Error; use slotmap::{new_key_type, SlotMap}; use std::future::Future; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard, Weak}; +use std::sync::{Arc, Mutex, MutexGuard, Weak}; use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; pub trait PollRequester: Clone { @@ -60,7 +60,7 @@ impl TaskHandle { /// Wake the task this context refers to. fn wake(&self) { if let Some(executor) = self.executor.upgrade() { - executor.wake(self.handle); + executor.wake(self.handle, true); } } @@ -135,10 +135,16 @@ impl TaskHandle { pub struct AsyncExecutor { /// List of all spawned tasks. - task_queue: RwLock>>, + tasks: Mutex>, + + /// Task execution queue represented by a channel. + /// + /// In order to wake a task it should be sent to this channel, + /// whereas reading from this channel is used for task polling. + task_queue: (Sender, Receiver), /// Source of tasks sent to us by the `NavigatorBackend`. - channel: Receiver>, + task_spawner: Receiver>, /// Weak reference to ourselves. self_ref: Weak, @@ -158,8 +164,9 @@ impl AsyncExecutor { pub fn new(poll_requester: R) -> (Arc, AsyncFutureSpawner) { let (send, recv) = unbounded(); let new_self = Arc::new_cyclic(|self_ref| Self { - task_queue: RwLock::new(SlotMap::with_key()), - channel: recv, + tasks: Mutex::new(SlotMap::with_key()), + task_queue: unbounded(), + task_spawner: recv, self_ref: self_ref.clone(), poll_requester: poll_requester.clone(), waiting_for_poll: AtomicBool::new(false), @@ -170,77 +177,67 @@ impl AsyncExecutor { ) } - /// Poll all `Ready` futures. + /// Poll all ready tasks. pub fn poll_all(&self) { self.waiting_for_poll.store(false, Ordering::SeqCst); - self.insert_tasks(); + let mut tasks = self.tasks.lock().expect("non-poisoned tasks"); + self.insert_tasks(&mut tasks); - let mut completed_tasks = vec![]; + // We want only to poll as many tasks as there were at the beginning. + // Newly added tasks will be polled later at the next iteration. + let tasks_to_poll = self.task_queue.1.len(); - for (index, task) in self.read_task_queue().iter() { - let mut task = task.lock().expect("non-poisoned task"); - if task.is_ready() { - let handle = TaskHandle::for_task(index, self.self_ref.clone()); - let waker = handle.waker(); - let mut context = Context::from_waker(&waker); + for _ in 0..tasks_to_poll { + let Ok(key) = self.task_queue.1.try_recv() else { + break; + }; - match task.poll(&mut context) { - Poll::Pending => {} - Poll::Ready(r) => { - if let Err(e) = r { - tracing::error!("Async error: {}", e); - } + let Some(task) = tasks.get_mut(key) else { + // Tried to wake a nonexistent task. + continue; + }; - completed_tasks.push(index); + if task.is_completed() { + // Tried to wake a completed task. + tasks.remove(key); + continue; + } + + let handle = TaskHandle::for_task(key, self.self_ref.clone()); + let waker = handle.waker(); + let mut context = Context::from_waker(&waker); + + match task.poll(&mut context) { + Poll::Pending => {} + Poll::Ready(r) => { + if let Err(e) = r { + tracing::error!("Async error: {}", e); } + + tasks.remove(key); } } } - - self.remove_tasks(completed_tasks); } /// Mark a task as ready to proceed. - fn wake(&self, task: TaskKey) { - if let Some(task) = self.read_task_queue().get(task) { - let mut task = task.lock().expect("non-poisoned task"); - if !task.is_completed() { - task.set_ready(); - if !self.waiting_for_poll.swap(true, Ordering::SeqCst) { - self.poll_requester.request_poll(); - } - } else { - tracing::warn!( - "A Waker was invoked after the task it was attached to was completed." - ); - } - } else { - tracing::warn!("Attempted to wake an already-finished task"); + fn wake(&self, task: TaskKey, poll: bool) { + self.task_queue.0.try_send(task).expect("wake a task"); + if poll && !self.waiting_for_poll.swap(true, Ordering::SeqCst) { + self.poll_requester.request_poll(); } } - fn insert_tasks(&self) { - let mut queue = self.write_task_queue(); - while let Ok(fut) = self.channel.try_recv() { - queue.insert(Mutex::new(Task::from_future(fut))); + fn insert_tasks(&self, tasks: &mut MutexGuard>) { + while let Ok(fut) = self.task_spawner.try_recv() { + let key = tasks.insert(Task::from_future(fut)); + + // Start executing the newly added task by waking it. + // We do not poll here, as we are inserting tasks during a poll already. + self.wake(key, false); } } - - fn remove_tasks(&self, completed_tasks: Vec) { - let mut queue = self.write_task_queue(); - for index in completed_tasks { - queue.remove(index); - } - } - - fn write_task_queue(&self) -> RwLockWriteGuard<'_, SlotMap>> { - self.task_queue.write().expect("non-poisoned task queue") - } - - fn read_task_queue(&self) -> RwLockReadGuard<'_, SlotMap>> { - self.task_queue.read().expect("non-poisoned task queue") - } } pub trait FutureSpawner { @@ -255,11 +252,11 @@ pub struct AsyncFutureSpawner { impl AsyncFutureSpawner { pub fn send_and_poll( channel: Sender>, - request_poll: R, + poll_requester: R, ) -> AsyncFutureSpawner { AsyncFutureSpawner { channel, - poll_requester: request_poll, + poll_requester, } } } diff --git a/frontend-utils/src/backends/executor/task.rs b/frontend-utils/src/backends/executor/task.rs index d61c5366a..e9a18bd0f 100644 --- a/frontend-utils/src/backends/executor/task.rs +++ b/frontend-utils/src/backends/executor/task.rs @@ -7,11 +7,8 @@ use std::task::{Context, Poll}; /// Indicates the state of a given task. #[derive(Eq, PartialEq)] enum TaskState { - /// Indicates that a task is ready to be polled to make progress. - Ready, - - /// Indicates that a task is blocked on another event source. - Blocked, + /// Indicates that a task is being executed and is waiting to be awoken. + InProgress, /// Indicates that a task is complete and should not be awoken again. Completed, @@ -30,27 +27,11 @@ impl Task { /// Box an owned future into a task structure. pub fn from_future(future: OwnedFuture<(), Error>) -> Self { Self { - state: TaskState::Ready, + state: TaskState::InProgress, future, } } - /// Returns `true` if the task is ready to be polled. - pub fn is_ready(&self) -> bool { - self.state == TaskState::Ready - } - - /// Marks this task to as ready to make progress. - pub fn set_ready(&mut self) { - self.state = TaskState::Ready - } - - /// Returns `true` if the task is awaiting further progress. - #[allow(dead_code)] - pub fn is_blocked(&self) -> bool { - self.state == TaskState::Blocked - } - /// Returns `true` if the task has completed and should not be polled again. pub fn is_completed(&self) -> bool { self.state == TaskState::Completed @@ -68,7 +49,7 @@ impl Task { let poll = self.future.as_mut().poll(context); self.state = match poll { - Poll::Pending => TaskState::Blocked, + Poll::Pending => TaskState::InProgress, Poll::Ready(_) => TaskState::Completed, };