Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 15 additions & 14 deletions crates/napi/src/next_api/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ use tracing::Instrument;
use tracing_subscriber::{Registry, layer::SubscriberExt, util::SubscriberInitExt};
use turbo_rcstr::{RcStr, rcstr};
use turbo_tasks::{
Completion, Effects, FxIndexSet, NonLocalValue, OperationValue, OperationVc, ReadRef,
ResolvedVc, TaskInput, TransientInstance, TryJoinIterExt, TurboTasksApi, UpdateInfo, Vc,
get_effects,
Effects, FxIndexSet, NonLocalValue, OperationValue, OperationVc, ReadRef, ResolvedVc,
TaskInput, TransientInstance, TryJoinIterExt, TurboTasksApi, UpdateInfo, Vc, get_effects,
message_queue::{CompilationEvent, Severity, TimingEvent},
trace::TraceRawVcs,
};
Expand Down Expand Up @@ -465,13 +464,18 @@ pub fn project_new(
.or_else(|e| turbopack_ctx.throw_turbopack_internal_result(&e))
.await?;

turbo_tasks.spawn_once_task({
turbo_tasks.start_once_process({
let tt = turbo_tasks.clone();
async move {
benchmark_file_io(tt, container.project().node_root().owned().await?)
.await
.inspect_err(|err| tracing::warn!(%err, "failed to benchmark file IO"))
}
Box::pin(async move {
let future = async move {
benchmark_file_io(tt, container.project().node_root().owned().await?).await
};
if let Err(err) = future.await {
// TODO Not ideal to print directly to stdout.
// We should use a compilation event instead to report async errors.
println!("Failed to benchmark file IO: {err}");
}
})
});

Ok(External::new(ProjectInstance {
Expand Down Expand Up @@ -519,10 +523,7 @@ impl CompilationEvent for SlowFilesystemEvent {
/// - https://x.com/jarredsumner/status/1637549427677364224
/// - https://github.com/oven-sh/bun/blob/06a9aa80c38b08b3148bfeabe560/src/install/install.zig#L3038
#[tracing::instrument(skip(turbo_tasks))]
async fn benchmark_file_io(
turbo_tasks: NextTurboTasks,
directory: FileSystemPath,
) -> Result<Vc<Completion>> {
async fn benchmark_file_io(turbo_tasks: NextTurboTasks, directory: FileSystemPath) -> Result<()> {
// try to get the real file path on disk so that we can use it with tokio
let fs = ResolvedVc::try_downcast_type::<DiskFileSystem>(directory.fs)
.context(anyhow!(
Expand Down Expand Up @@ -567,7 +568,7 @@ async fn benchmark_file_io(
}));
}

Ok(Completion::new())
Ok(())
}

#[napi]
Expand Down
11 changes: 5 additions & 6 deletions turbopack/crates/turbo-tasks-backend/benches/scope_stress.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Result;
use criterion::{BenchmarkId, Criterion};
use turbo_tasks::{Completion, ReadConsistency, TryJoinIterExt, TurboTasks, Vc};
use turbo_tasks::{Completion, TryJoinIterExt, TurboTasks, Vc};
use turbo_tasks_backend::{BackendOptions, TurboTasksBackend, noop_backing_storage};

pub fn scope_stress(c: &mut Criterion) {
Expand Down Expand Up @@ -47,12 +47,11 @@ pub fn scope_stress(c: &mut Criterion) {
.map(|(a, b)| {
let tt = &tt;
async move {
let task = tt.spawn_once_task(async move {
tt.run_once(async move {
rectangle(a, b).strongly_consistent().await?;
Ok::<Vc<()>, _>(Default::default())
});
tt.wait_task_completion(task, ReadConsistency::Eventual)
.await
Ok(())
})
.await
}
})
.try_join()
Expand Down
14 changes: 6 additions & 8 deletions turbopack/crates/turbo-tasks-backend/benches/stress.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Result;
use criterion::{BenchmarkId, Criterion};
use turbo_tasks::{ReadConsistency, TryJoinIterExt, TurboTasks, Vc};
use turbo_tasks::{TryJoinIterExt, TurboTasks, Vc};
use turbo_tasks_backend::{BackendOptions, TurboTasksBackend, noop_backing_storage};

pub fn fibonacci(c: &mut Criterion) {
Expand Down Expand Up @@ -37,18 +37,16 @@ pub fn fibonacci(c: &mut Criterion) {
noop_backing_storage(),
));
async move {
let task = tt.spawn_once_task(async move {
tt.run_once(async move {
// Number of tasks:
// 1 root task
// size >= 1 => + fib(0) = 1
// size >= 2 => + fib(1) = 2
(0..size).map(|i| fib(i, i)).try_join().await?;
Ok::<Vc<()>, _>(Default::default())
});
tt.wait_task_completion(task, ReadConsistency::Eventual)
.await
.unwrap();
tt
Ok(())
})
.await
.unwrap();
}
})
});
Expand Down
19 changes: 14 additions & 5 deletions turbopack/crates/turbo-tasks-testing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
future::Future,
mem::replace,
panic::AssertUnwindSafe,
pin::Pin,
sync::{Arc, Mutex, Weak},
};

Expand Down Expand Up @@ -120,22 +121,30 @@ impl TurboTasksCallApi for VcStorage {
fn run_once(
&self,
_future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
) -> TaskId {
) -> Pin<
Box<
(dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send + 'static),
>,
> {
unreachable!()
}

fn run_once_with_reason(
&self,
_reason: StaticOrArc<dyn InvalidationReason>,
_future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
) -> TaskId {
) -> Pin<
Box<
(dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send + 'static),
>,
> {
unreachable!()
}

fn run_once_process(
fn start_once_process(
&self,
_future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
) -> TaskId {
_future: std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
) {
unreachable!()
}
}
Expand Down
77 changes: 33 additions & 44 deletions turbopack/crates/turbo-tasks/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,13 @@ pub trait TurboTasksCallApi: Sync + Send {
fn run_once(
&self,
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
) -> TaskId;
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
fn run_once_with_reason(
&self,
reason: StaticOrArc<dyn InvalidationReason>,
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
) -> TaskId;
fn run_once_process(
&self,
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
) -> TaskId;
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);
}

/// A type-erased subset of [`TurboTasks`] stored inside a thread local when we're in a turbo task
Expand Down Expand Up @@ -493,7 +490,7 @@ impl<B: Backend + 'static> TurboTasks<B> {
/// Creates a new root task, that is only executed once.
/// Dependencies will not invalidate the task.
#[track_caller]
pub fn spawn_once_task<T, Fut>(&self, future: Fut) -> TaskId
fn spawn_once_task<T, Fut>(&self, future: Fut) -> TaskId
where
T: ?Sized,
Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
Expand Down Expand Up @@ -533,6 +530,21 @@ impl<B: Backend + 'static> TurboTasks<B> {
Ok(rx.await?)
}

pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
let this = self.pin();
tokio::spawn(async move {
this.pin()
.run_once(async move {
this.finish_foreground_job();
future.await;
this.begin_foreground_job();
Ok(())
})
.await
.unwrap()
});
}

pub(crate) fn native_call(
&self,
native_fn: &'static NativeFunction,
Expand Down Expand Up @@ -1109,41 +1121,28 @@ impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
fn run_once(
&self,
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
) -> TaskId {
self.spawn_once_task(async move {
future.await?;
Ok(Completion::new())
})
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
let this = self.pin();
Box::pin(async move { this.run_once(future).await })
}

#[track_caller]
fn run_once_with_reason(
&self,
reason: StaticOrArc<dyn InvalidationReason>,
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
) -> TaskId {
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
{
let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
reason_set.insert(reason);
}
self.spawn_once_task(async move {
future.await?;
Ok(Completion::new())
})
let this = self.pin();
Box::pin(async move { this.run_once(future).await })
}

#[track_caller]
fn run_once_process(
&self,
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
) -> TaskId {
let this = self.pin();
self.spawn_once_task(async move {
this.finish_foreground_job();
future.await?;
this.begin_foreground_job();
Ok(Completion::new())
})
fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
self.start_once_process(future)
}
}

Expand Down Expand Up @@ -1428,18 +1427,13 @@ pub async fn run_once<T: Send + 'static>(
) -> Result<T> {
let (tx, rx) = tokio::sync::oneshot::channel();

let task_id = tt.run_once(Box::pin(async move {
tt.run_once(Box::pin(async move {
let result = future.await?;
tx.send(result)
.map_err(|_| anyhow!("unable to send result"))?;
Ok(())
}));

// INVALIDATION: A Once task will never invalidate, therefore we don't need to
// track a dependency
let raw_result = read_task_output_untracked(&*tt, task_id, ReadConsistency::Eventual).await?;
let raw_future = raw_result.into_read().untracked();
turbo_tasks_future_scope(tt, ReadVcFuture::<Completion>::from(raw_future)).await?;
}))
.await?;

Ok(rx.await?)
}
Expand All @@ -1451,21 +1445,16 @@ pub async fn run_once_with_reason<T: Send + 'static>(
) -> Result<T> {
let (tx, rx) = tokio::sync::oneshot::channel();

let task_id = tt.run_once_with_reason(
tt.run_once_with_reason(
(Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
Box::pin(async move {
let result = future.await?;
tx.send(result)
.map_err(|_| anyhow!("unable to send result"))?;
Ok(())
}),
);

// INVALIDATION: A Once task will never invalidate, therefore we don't need to
// track a dependency
let raw_result = read_task_output_untracked(&*tt, task_id, ReadConsistency::Eventual).await?;
let raw_future = raw_result.into_read().untracked();
turbo_tasks_future_scope(tt, ReadVcFuture::<Completion>::from(raw_future)).await?;
)
.await?;

Ok(rx.await?)
}
Expand Down
Loading
Loading