scanner: Print partial results as they come out of the parallel iterator.

This relies on a proposed serial bridge as per https://github.com/rayon-rs/rayon/issues/858. As that project is licensed the same as Ruffle itself and the submitter intended it to be included in Rayon, I believe it's legal to copy this code.
This commit is contained in:
David Wendt 2021-08-24 19:16:36 -04:00 committed by Mike Welsh
parent 46638b1be8
commit a18d13833c
3 changed files with 62 additions and 9 deletions

1
Cargo.lock generated
View File

@ -3174,6 +3174,7 @@ name = "ruffle_scanner"
version = "0.1.0"
dependencies = [
"clap",
"crossbeam-channel",
"csv",
"env_logger",
"indicatif",

View File

@ -16,4 +16,5 @@ csv = "1.1"
indicatif = "0.16"
path-slash = "0.1.4"
swf = { path = "../swf" }
rayon = "1.5.1"
rayon = "1.5.1"
crossbeam-channel = "0.5"

View File

@ -288,6 +288,55 @@ fn scan_file(file: DirEntry, name: String) -> FileResults {
}
}
/// Parallel-to-serial iterator bridge trait
///
/// Proposed in and copied from https://github.com/rayon-rs/rayon/issues/858
pub trait SerBridge<T>
where
T: Send + 'static,
Self: ParallelIterator<Item = T> + 'static,
{
fn ser_bridge(self) -> SerBridgeImpl<T> {
SerBridgeImpl::new(self)
}
}
impl<PI, T> SerBridge<T> for PI
where
T: Send + 'static,
PI: ParallelIterator<Item = T> + 'static,
{
}
/// Parallel-to-serial iterator bridge
///
/// Proposed in and copied from https://github.com/rayon-rs/rayon/issues/858
pub struct SerBridgeImpl<T> {
rx: crossbeam_channel::Receiver<T>,
}
impl<T: Send + 'static> SerBridgeImpl<T> {
pub fn new<PI>(par_iterable: impl IntoParallelIterator<Item = T, Iter = PI>) -> Self
where
PI: ParallelIterator<Item = T> + 'static,
{
let par_iter = par_iterable.into_par_iter();
let (tx, rx) = crossbeam_channel::bounded(0);
std::thread::spawn(move || {
let _ = par_iter.try_for_each(|item| tx.send(item));
});
SerBridgeImpl { rx }
}
}
impl<T> Iterator for SerBridgeImpl<T> {
type Item = T;
fn next(&mut self) -> Option<T> {
self.rx.recv().ok()
}
}
fn main() -> Result<(), std::io::Error> {
ThreadLocalScanLogger::init();
@ -314,25 +363,27 @@ fn main() -> Result<(), std::io::Error> {
writer.write_record(&["Filename", "Progress", "Error", "AVM Version"])?;
let mut results = Vec::new();
to_scan
let input_path = opt.input_path;
let closure_progress = progress.clone();
let result_iter = to_scan
.into_par_iter()
.map(|file| {
.map(move |file| {
let name = file
.path()
.strip_prefix(&opt.input_path)
.strip_prefix(&input_path)
.unwrap_or_else(|_| file.path())
.to_slash_lossy();
let result = scan_file(file, name.clone());
progress.inc(1);
progress.set_message(name);
closure_progress.inc(1);
closure_progress.set_message(name);
result
})
.collect_into_vec(&mut results);
.ser_bridge();
for result in results {
for result in result_iter {
if result.error.is_none() {
good += 1;
} else {