frontend-utils: Get rid of deadlocks in AsyncExecutor

3d04971a was the first approach to eliminating deadlocks caused by
waking a task from another one. A task may also try to wake itself,
and that case was unfortunately not covered.

This patch rewrites the AsyncExecutor to use an async
channel for queuing tasks instead of a flag.
Using a flag required holding a lock on the task, which was
problematic when a task needed to be awoken.
A flag also required a full search through all tasks on
each poll, which could be inefficient.
Kamil Jarosz 2024-04-14 20:26:51 +02:00 committed by Nathan Adams
parent 31d8d98a10
commit f30c04e8b4
2 changed files with 61 additions and 83 deletions
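
Before the diff itself, here is a minimal, self-contained sketch of the idea,
written for this write-up rather than taken from the patch: MiniExecutor,
KeyWaker, TaskKey, and the Mutex<VecDeque<_>> wake queue are illustrative
stand-ins for the real SlotMap, handle-based waker, and async channel.

use std::collections::{HashMap, VecDeque};
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Wake, Waker};

type TaskKey = usize;
type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
type WakeQueue = Arc<Mutex<VecDeque<TaskKey>>>;

/// Stand-in for the executor: `tasks` plays the role of the SlotMap,
/// `wake_queue` plays the role of the async channel.
struct MiniExecutor {
    tasks: Mutex<HashMap<TaskKey, BoxFuture>>,
    wake_queue: WakeQueue,
}

/// A waker that only pushes the task's key onto the wake queue.
/// It never locks the task itself, so a task may wake itself mid-poll.
struct KeyWaker {
    key: TaskKey,
    wake_queue: WakeQueue,
}

impl Wake for KeyWaker {
    fn wake(self: Arc<Self>) {
        self.wake_queue.lock().unwrap().push_back(self.key);
    }
}

impl MiniExecutor {
    fn poll_all(&self) {
        // Snapshot the keys queued so far; tasks woken during this pass are
        // left in the queue for the next poll, mirroring the patch's rule of
        // polling only as many tasks as were queued at the start.
        let ready: Vec<TaskKey> = self.wake_queue.lock().unwrap().drain(..).collect();
        for key in ready {
            // Take the future out of the map so no lock is held while polling.
            let Some(mut future) = self.tasks.lock().unwrap().remove(&key) else {
                continue; // The task was woken after it had already completed.
            };
            let waker = Waker::from(Arc::new(KeyWaker {
                key,
                wake_queue: Arc::clone(&self.wake_queue),
            }));
            let mut context = Context::from_waker(&waker);
            if future.as_mut().poll(&mut context).is_pending() {
                // Still in progress: keep the task until its waker fires again.
                self.tasks.lock().unwrap().insert(key, future);
            }
        }
    }
}

Because waking only appends a key to the queue, neither the waker nor
poll_all ever needs a lock on the task being polled, which is what removes
both the cross-task and the self-wake deadlocks.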


@@ -7,7 +7,7 @@ use ruffle_core::loader::Error;
 use slotmap::{new_key_type, SlotMap};
 use std::future::Future;
 use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard, Weak};
+use std::sync::{Arc, Mutex, MutexGuard, Weak};
 use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

 pub trait PollRequester: Clone {
@@ -60,7 +60,7 @@ impl<R: PollRequester> TaskHandle<R> {
     /// Wake the task this context refers to.
     fn wake(&self) {
        if let Some(executor) = self.executor.upgrade() {
-            executor.wake(self.handle);
+            executor.wake(self.handle, true);
         }
     }
@@ -135,10 +135,16 @@ impl<R: PollRequester> TaskHandle<R> {
 pub struct AsyncExecutor<R: PollRequester> {
     /// List of all spawned tasks.
-    task_queue: RwLock<SlotMap<TaskKey, Mutex<Task>>>,
+    tasks: Mutex<SlotMap<TaskKey, Task>>,
+
+    /// Task execution queue represented by a channel.
+    ///
+    /// In order to wake a task it should be sent to this channel,
+    /// whereas reading from this channel is used for task polling.
+    task_queue: (Sender<TaskKey>, Receiver<TaskKey>),

     /// Source of tasks sent to us by the `NavigatorBackend`.
-    channel: Receiver<OwnedFuture<(), Error>>,
+    task_spawner: Receiver<OwnedFuture<(), Error>>,

     /// Weak reference to ourselves.
     self_ref: Weak<Self>,
@@ -158,8 +164,9 @@ impl<R: PollRequester> AsyncExecutor<R> {
     pub fn new(poll_requester: R) -> (Arc<Self>, AsyncFutureSpawner<R>) {
         let (send, recv) = unbounded();
         let new_self = Arc::new_cyclic(|self_ref| Self {
-            task_queue: RwLock::new(SlotMap::with_key()),
-            channel: recv,
+            tasks: Mutex::new(SlotMap::with_key()),
+            task_queue: unbounded(),
+            task_spawner: recv,
             self_ref: self_ref.clone(),
             poll_requester: poll_requester.clone(),
             waiting_for_poll: AtomicBool::new(false),
@@ -170,77 +177,67 @@ impl<R: PollRequester> AsyncExecutor<R> {
         )
     }

-    /// Poll all `Ready` futures.
+    /// Poll all ready tasks.
     pub fn poll_all(&self) {
         self.waiting_for_poll.store(false, Ordering::SeqCst);

-        self.insert_tasks();
-
-        let mut completed_tasks = vec![];
-
-        for (index, task) in self.read_task_queue().iter() {
-            let mut task = task.lock().expect("non-poisoned task");
-            if task.is_ready() {
-                let handle = TaskHandle::for_task(index, self.self_ref.clone());
-                let waker = handle.waker();
-                let mut context = Context::from_waker(&waker);
-
-                match task.poll(&mut context) {
-                    Poll::Pending => {}
-                    Poll::Ready(r) => {
-                        if let Err(e) = r {
-                            tracing::error!("Async error: {}", e);
-                        }
-
-                        completed_tasks.push(index);
-                    }
-                }
-            }
-        }
-
-        self.remove_tasks(completed_tasks);
+        let mut tasks = self.tasks.lock().expect("non-poisoned tasks");
+        self.insert_tasks(&mut tasks);
+
+        // We want only to poll as many tasks as there were at the beginning.
+        // Newly added tasks will be polled later at the next iteration.
+        let tasks_to_poll = self.task_queue.1.len();
+        for _ in 0..tasks_to_poll {
+            let Ok(key) = self.task_queue.1.try_recv() else {
+                break;
+            };
+
+            let Some(task) = tasks.get_mut(key) else {
+                // Tried to wake a nonexistent task.
+                continue;
+            };
+
+            if task.is_completed() {
+                // Tried to wake a completed task.
+                tasks.remove(key);
+                continue;
+            }
+
+            let handle = TaskHandle::for_task(key, self.self_ref.clone());
+            let waker = handle.waker();
+            let mut context = Context::from_waker(&waker);
+
+            match task.poll(&mut context) {
+                Poll::Pending => {}
+                Poll::Ready(r) => {
+                    if let Err(e) = r {
+                        tracing::error!("Async error: {}", e);
+                    }
+
+                    tasks.remove(key);
+                }
+            }
+        }
     }

     /// Mark a task as ready to proceed.
-    fn wake(&self, task: TaskKey) {
-        if let Some(task) = self.read_task_queue().get(task) {
-            let mut task = task.lock().expect("non-poisoned task");
-            if !task.is_completed() {
-                task.set_ready();
-                if !self.waiting_for_poll.swap(true, Ordering::SeqCst) {
-                    self.poll_requester.request_poll();
-                }
-            } else {
-                tracing::warn!(
-                    "A Waker was invoked after the task it was attached to was completed."
-                );
-            }
-        } else {
-            tracing::warn!("Attempted to wake an already-finished task");
-        }
+    fn wake(&self, task: TaskKey, poll: bool) {
+        self.task_queue.0.try_send(task).expect("wake a task");
+        if poll && !self.waiting_for_poll.swap(true, Ordering::SeqCst) {
+            self.poll_requester.request_poll();
+        }
     }

-    fn insert_tasks(&self) {
-        let mut queue = self.write_task_queue();
-        while let Ok(fut) = self.channel.try_recv() {
-            queue.insert(Mutex::new(Task::from_future(fut)));
-        }
-    }
-
-    fn remove_tasks(&self, completed_tasks: Vec<TaskKey>) {
-        let mut queue = self.write_task_queue();
-        for index in completed_tasks {
-            queue.remove(index);
-        }
-    }
-
-    fn write_task_queue(&self) -> RwLockWriteGuard<'_, SlotMap<TaskKey, Mutex<Task>>> {
-        self.task_queue.write().expect("non-poisoned task queue")
-    }
-
-    fn read_task_queue(&self) -> RwLockReadGuard<'_, SlotMap<TaskKey, Mutex<Task>>> {
-        self.task_queue.read().expect("non-poisoned task queue")
+    fn insert_tasks(&self, tasks: &mut MutexGuard<SlotMap<TaskKey, Task>>) {
+        while let Ok(fut) = self.task_spawner.try_recv() {
+            let key = tasks.insert(Task::from_future(fut));
+            // Start executing the newly added task by waking it.
+            // We do not poll here, as we are inserting tasks during a poll already.
+            self.wake(key, false);
+        }
     }
 }

 pub trait FutureSpawner {
@@ -255,11 +252,11 @@ pub struct AsyncFutureSpawner<R: PollRequester> {
 impl<R: PollRequester> AsyncFutureSpawner<R> {
     pub fn send_and_poll(
         channel: Sender<OwnedFuture<(), Error>>,
-        request_poll: R,
+        poll_requester: R,
     ) -> AsyncFutureSpawner<R> {
         AsyncFutureSpawner {
             channel,
-            poll_requester: request_poll,
+            poll_requester,
         }
     }
 }


@@ -7,11 +7,8 @@ use std::task::{Context, Poll};
 /// Indicates the state of a given task.
 #[derive(Eq, PartialEq)]
 enum TaskState {
-    /// Indicates that a task is ready to be polled to make progress.
-    Ready,
-
-    /// Indicates that a task is blocked on another event source.
-    Blocked,
+    /// Indicates that a task is being executed and is waiting to be awoken.
+    InProgress,

     /// Indicates that a task is complete and should not be awoken again.
     Completed,
@@ -30,27 +27,11 @@ impl Task {
     /// Box an owned future into a task structure.
     pub fn from_future(future: OwnedFuture<(), Error>) -> Self {
         Self {
-            state: TaskState::Ready,
+            state: TaskState::InProgress,
             future,
         }
     }

-    /// Returns `true` if the task is ready to be polled.
-    pub fn is_ready(&self) -> bool {
-        self.state == TaskState::Ready
-    }
-
-    /// Marks this task to as ready to make progress.
-    pub fn set_ready(&mut self) {
-        self.state = TaskState::Ready
-    }
-
-    /// Returns `true` if the task is awaiting further progress.
-    #[allow(dead_code)]
-    pub fn is_blocked(&self) -> bool {
-        self.state == TaskState::Blocked
-    }
-
     /// Returns `true` if the task has completed and should not be polled again.
     pub fn is_completed(&self) -> bool {
         self.state == TaskState::Completed
@@ -68,7 +49,7 @@ impl Task {
         let poll = self.future.as_mut().poll(context);

         self.state = match poll {
-            Poll::Pending => TaskState::Blocked,
+            Poll::Pending => TaskState::InProgress,
             Poll::Ready(_) => TaskState::Completed,
         };
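
To make the motivating failure mode concrete, here is a hypothetical
self-waking future of the kind the commit message refers to; the type is
illustrative and not part of the patch. When such a future calls wake from
inside its own poll, an executor that needs the task's own lock both to poll
and to wake will deadlock, whereas queueing the task's key on a channel, as
this patch does, does not.

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Illustrative only: a future that wakes itself before returning Pending,
/// i.e. the self-wake case that 3d04971a did not cover.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            // The wake happens while the executor is still inside poll() for
            // this very task, so waking must not require the task's own lock.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}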