From 07ff2bc3e4901f5dd500f0722c73ba1b13b4f99d Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 16 Sep 2025 15:00:29 +0200 Subject: [PATCH 1/2] Turbopack: Change run once signature to avoid exposing TaskId --- crates/napi/src/next_api/project.rs | 29 ++++--- .../benches/scope_stress.rs | 11 ++- .../turbo-tasks-backend/benches/stress.rs | 14 ++- .../crates/turbo-tasks-testing/src/lib.rs | 19 ++-- turbopack/crates/turbo-tasks/src/manager.rs | 77 +++++++---------- .../crates/turbopack-cli/src/build/mod.rs | 86 +++++++++---------- .../turbopack-dev-server/src/update/server.rs | 3 +- turbopack/crates/turbopack-nft/src/main.rs | 12 ++- .../crates/turbopack-tests/tests/snapshot.rs | 11 ++- .../benches/node_file_trace.rs | 13 ++- 10 files changed, 130 insertions(+), 145 deletions(-) diff --git a/crates/napi/src/next_api/project.rs b/crates/napi/src/next_api/project.rs index 121ef83428114..9e1ad7a5c0aca 100644 --- a/crates/napi/src/next_api/project.rs +++ b/crates/napi/src/next_api/project.rs @@ -32,9 +32,8 @@ use tracing::Instrument; use tracing_subscriber::{Registry, layer::SubscriberExt, util::SubscriberInitExt}; use turbo_rcstr::{RcStr, rcstr}; use turbo_tasks::{ - Completion, Effects, FxIndexSet, NonLocalValue, OperationValue, OperationVc, ReadRef, - ResolvedVc, TaskInput, TransientInstance, TryJoinIterExt, TurboTasksApi, UpdateInfo, Vc, - get_effects, + Effects, FxIndexSet, NonLocalValue, OperationValue, OperationVc, ReadRef, ResolvedVc, + TaskInput, TransientInstance, TryJoinIterExt, TurboTasksApi, UpdateInfo, Vc, get_effects, message_queue::{CompilationEvent, Severity, TimingEvent}, trace::TraceRawVcs, }; @@ -465,13 +464,18 @@ pub fn project_new( .or_else(|e| turbopack_ctx.throw_turbopack_internal_result(&e)) .await?; - turbo_tasks.spawn_once_task({ + turbo_tasks.start_once_process({ let tt = turbo_tasks.clone(); - async move { - benchmark_file_io(tt, container.project().node_root().owned().await?) - .await - .inspect_err(|err| tracing::warn!(%err, "failed to benchmark file IO")) - } + Box::pin(async move { + let future = async move { + benchmark_file_io(tt, container.project().node_root().owned().await?).await + }; + if let Err(err) = future.await { + // TODO this tracing warn will go into the void. Should we change this to + // stdout? + tracing::warn!(%err, "failed to benchmark file IO"); + } + }) }); Ok(External::new(ProjectInstance { @@ -519,10 +523,7 @@ impl CompilationEvent for SlowFilesystemEvent { /// - https://x.com/jarredsumner/status/1637549427677364224 /// - https://github.com/oven-sh/bun/blob/06a9aa80c38b08b3148bfeabe560/src/install/install.zig#L3038 #[tracing::instrument(skip(turbo_tasks))] -async fn benchmark_file_io( - turbo_tasks: NextTurboTasks, - directory: FileSystemPath, -) -> Result> { +async fn benchmark_file_io(turbo_tasks: NextTurboTasks, directory: FileSystemPath) -> Result<()> { // try to get the real file path on disk so that we can use it with tokio let fs = ResolvedVc::try_downcast_type::(directory.fs) .context(anyhow!( @@ -567,7 +568,7 @@ async fn benchmark_file_io( })); } - Ok(Completion::new()) + Ok(()) } #[napi] diff --git a/turbopack/crates/turbo-tasks-backend/benches/scope_stress.rs b/turbopack/crates/turbo-tasks-backend/benches/scope_stress.rs index 0e2a1791e366d..e532eae1a394a 100644 --- a/turbopack/crates/turbo-tasks-backend/benches/scope_stress.rs +++ b/turbopack/crates/turbo-tasks-backend/benches/scope_stress.rs @@ -1,6 +1,6 @@ use anyhow::Result; use criterion::{BenchmarkId, Criterion}; -use turbo_tasks::{Completion, ReadConsistency, TryJoinIterExt, TurboTasks, Vc}; +use turbo_tasks::{Completion, TryJoinIterExt, TurboTasks, Vc}; use turbo_tasks_backend::{BackendOptions, TurboTasksBackend, noop_backing_storage}; pub fn scope_stress(c: &mut Criterion) { @@ -47,12 +47,11 @@ pub fn scope_stress(c: &mut Criterion) { .map(|(a, b)| { let tt = &tt; async move { - let task = tt.spawn_once_task(async move { + tt.run_once(async move { rectangle(a, b).strongly_consistent().await?; - Ok::, _>(Default::default()) - }); - tt.wait_task_completion(task, ReadConsistency::Eventual) - .await + Ok(()) + }) + .await } }) .try_join() diff --git a/turbopack/crates/turbo-tasks-backend/benches/stress.rs b/turbopack/crates/turbo-tasks-backend/benches/stress.rs index 82159ec1b93d9..c5c53077bfbd5 100644 --- a/turbopack/crates/turbo-tasks-backend/benches/stress.rs +++ b/turbopack/crates/turbo-tasks-backend/benches/stress.rs @@ -1,6 +1,6 @@ use anyhow::Result; use criterion::{BenchmarkId, Criterion}; -use turbo_tasks::{ReadConsistency, TryJoinIterExt, TurboTasks, Vc}; +use turbo_tasks::{TryJoinIterExt, TurboTasks, Vc}; use turbo_tasks_backend::{BackendOptions, TurboTasksBackend, noop_backing_storage}; pub fn fibonacci(c: &mut Criterion) { @@ -37,18 +37,16 @@ pub fn fibonacci(c: &mut Criterion) { noop_backing_storage(), )); async move { - let task = tt.spawn_once_task(async move { + tt.run_once(async move { // Number of tasks: // 1 root task // size >= 1 => + fib(0) = 1 // size >= 2 => + fib(1) = 2 (0..size).map(|i| fib(i, i)).try_join().await?; - Ok::, _>(Default::default()) - }); - tt.wait_task_completion(task, ReadConsistency::Eventual) - .await - .unwrap(); - tt + Ok(()) + }) + .await + .unwrap(); } }) }); diff --git a/turbopack/crates/turbo-tasks-testing/src/lib.rs b/turbopack/crates/turbo-tasks-testing/src/lib.rs index 03c93d962bcd7..e5795a27eca27 100644 --- a/turbopack/crates/turbo-tasks-testing/src/lib.rs +++ b/turbopack/crates/turbo-tasks-testing/src/lib.rs @@ -7,6 +7,7 @@ use std::{ future::Future, mem::replace, panic::AssertUnwindSafe, + pin::Pin, sync::{Arc, Mutex, Weak}, }; @@ -120,7 +121,11 @@ impl TurboTasksCallApi for VcStorage { fn run_once( &self, _future: std::pin::Pin> + Send + 'static>>, - ) -> TaskId { + ) -> Pin< + Box< + (dyn futures::Future> + std::marker::Send + 'static), + >, + > { unreachable!() } @@ -128,14 +133,18 @@ impl TurboTasksCallApi for VcStorage { &self, _reason: StaticOrArc, _future: std::pin::Pin> + Send + 'static>>, - ) -> TaskId { + ) -> Pin< + Box< + (dyn futures::Future> + std::marker::Send + 'static), + >, + > { unreachable!() } - fn run_once_process( + fn start_once_process( &self, - _future: std::pin::Pin> + Send + 'static>>, - ) -> TaskId { + _future: std::pin::Pin + Send + 'static>>, + ) { unreachable!() } } diff --git a/turbopack/crates/turbo-tasks/src/manager.rs b/turbopack/crates/turbo-tasks/src/manager.rs index bad7772f2223f..fb9cfed8bce08 100644 --- a/turbopack/crates/turbo-tasks/src/manager.rs +++ b/turbopack/crates/turbo-tasks/src/manager.rs @@ -77,16 +77,13 @@ pub trait TurboTasksCallApi: Sync + Send { fn run_once( &self, future: Pin> + Send + 'static>>, - ) -> TaskId; + ) -> Pin> + Send>>; fn run_once_with_reason( &self, reason: StaticOrArc, future: Pin> + Send + 'static>>, - ) -> TaskId; - fn run_once_process( - &self, - future: Pin> + Send + 'static>>, - ) -> TaskId; + ) -> Pin> + Send>>; + fn start_once_process(&self, future: Pin + Send + 'static>>); } /// A type-erased subset of [`TurboTasks`] stored inside a thread local when we're in a turbo task @@ -493,7 +490,7 @@ impl TurboTasks { /// Creates a new root task, that is only executed once. /// Dependencies will not invalidate the task. #[track_caller] - pub fn spawn_once_task(&self, future: Fut) -> TaskId + fn spawn_once_task(&self, future: Fut) -> TaskId where T: ?Sized, Fut: Future>> + Send + 'static, @@ -533,6 +530,21 @@ impl TurboTasks { Ok(rx.await?) } + pub fn start_once_process(&self, future: impl Future + Send + 'static) { + let this = self.pin(); + tokio::spawn(async move { + this.pin() + .run_once(async move { + this.finish_foreground_job(); + future.await; + this.begin_foreground_job(); + Ok(()) + }) + .await + .unwrap() + }); + } + pub(crate) fn native_call( &self, native_fn: &'static NativeFunction, @@ -1109,11 +1121,9 @@ impl TurboTasksCallApi for TurboTasks { fn run_once( &self, future: Pin> + Send + 'static>>, - ) -> TaskId { - self.spawn_once_task(async move { - future.await?; - Ok(Completion::new()) - }) + ) -> Pin> + Send>> { + let this = self.pin(); + Box::pin(async move { this.run_once(future).await }) } #[track_caller] @@ -1121,29 +1131,18 @@ impl TurboTasksCallApi for TurboTasks { &self, reason: StaticOrArc, future: Pin> + Send + 'static>>, - ) -> TaskId { + ) -> Pin> + Send>> { { let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap(); reason_set.insert(reason); } - self.spawn_once_task(async move { - future.await?; - Ok(Completion::new()) - }) + let this = self.pin(); + Box::pin(async move { this.run_once(future).await }) } #[track_caller] - fn run_once_process( - &self, - future: Pin> + Send + 'static>>, - ) -> TaskId { - let this = self.pin(); - self.spawn_once_task(async move { - this.finish_foreground_job(); - future.await?; - this.begin_foreground_job(); - Ok(Completion::new()) - }) + fn start_once_process(&self, future: Pin + Send + 'static>>) { + self.start_once_process(future) } } @@ -1428,18 +1427,13 @@ pub async fn run_once( ) -> Result { let (tx, rx) = tokio::sync::oneshot::channel(); - let task_id = tt.run_once(Box::pin(async move { + tt.run_once(Box::pin(async move { let result = future.await?; tx.send(result) .map_err(|_| anyhow!("unable to send result"))?; Ok(()) - })); - - // INVALIDATION: A Once task will never invalidate, therefore we don't need to - // track a dependency - let raw_result = read_task_output_untracked(&*tt, task_id, ReadConsistency::Eventual).await?; - let raw_future = raw_result.into_read().untracked(); - turbo_tasks_future_scope(tt, ReadVcFuture::::from(raw_future)).await?; + })) + .await?; Ok(rx.await?) } @@ -1451,7 +1445,7 @@ pub async fn run_once_with_reason( ) -> Result { let (tx, rx) = tokio::sync::oneshot::channel(); - let task_id = tt.run_once_with_reason( + tt.run_once_with_reason( (Arc::new(reason) as Arc).into(), Box::pin(async move { let result = future.await?; @@ -1459,13 +1453,8 @@ pub async fn run_once_with_reason( .map_err(|_| anyhow!("unable to send result"))?; Ok(()) }), - ); - - // INVALIDATION: A Once task will never invalidate, therefore we don't need to - // track a dependency - let raw_result = read_task_output_untracked(&*tt, task_id, ReadConsistency::Eventual).await?; - let raw_future = raw_result.into_read().untracked(); - turbo_tasks_future_scope(tt, ReadVcFuture::::from(raw_future)).await?; + ) + .await?; Ok(rx.await?) } diff --git a/turbopack/crates/turbopack-cli/src/build/mod.rs b/turbopack/crates/turbopack-cli/src/build/mod.rs index bd98caa78a134..9a8c8663716f2 100644 --- a/turbopack/crates/turbopack-cli/src/build/mod.rs +++ b/turbopack/crates/turbopack-cli/src/build/mod.rs @@ -9,9 +9,7 @@ use anyhow::{Context, Result, bail}; use rustc_hash::FxHashSet; use tracing::Instrument; use turbo_rcstr::RcStr; -use turbo_tasks::{ - ReadConsistency, ResolvedVc, TransientInstance, TryJoinIterExt, TurboTasks, Vc, apply_effects, -}; +use turbo_tasks::{ResolvedVc, TransientInstance, TryJoinIterExt, TurboTasks, Vc, apply_effects}; use turbo_tasks_backend::{ BackendOptions, NoopBackingStorage, TurboTasksBackend, noop_backing_storage, }; @@ -143,51 +141,47 @@ impl TurbopackBuildBuilder { } pub async fn build(self) -> Result<()> { - let task = self.turbo_tasks.spawn_once_task::<(), _>(async move { - let build_result_op = build_internal( - self.project_dir.clone(), - self.root_dir, - self.entry_requests.clone(), - self.browserslist_query, - self.source_maps_type, - self.minify_type, - self.target, - self.scope_hoist, - ); - - // Await the result to propagate any errors. - build_result_op.read_strongly_consistent().await?; - - apply_effects(build_result_op) - .instrument(tracing::info_span!("apply effects")) - .await?; - - let issue_reporter: Vc> = - Vc::upcast(ConsoleUi::new(TransientInstance::new(LogOptions { - project_dir: PathBuf::from(self.project_dir), - current_dir: current_dir().unwrap(), - show_all: self.show_all, - log_detail: self.log_detail, - log_level: self.log_level, - }))); - - handle_issues( - build_result_op, - issue_reporter, - IssueSeverity::Error, - None, - None, - ) - .await?; - - Ok(Default::default()) - }); - self.turbo_tasks - .wait_task_completion(task, ReadConsistency::Strong) - .await?; + .run_once(async move { + let build_result_op = build_internal( + self.project_dir.clone(), + self.root_dir, + self.entry_requests.clone(), + self.browserslist_query, + self.source_maps_type, + self.minify_type, + self.target, + self.scope_hoist, + ); + + // Await the result to propagate any errors. + build_result_op.read_strongly_consistent().await?; + + apply_effects(build_result_op) + .instrument(tracing::info_span!("apply effects")) + .await?; + + let issue_reporter: Vc> = + Vc::upcast(ConsoleUi::new(TransientInstance::new(LogOptions { + project_dir: PathBuf::from(self.project_dir), + current_dir: current_dir().unwrap(), + show_all: self.show_all, + log_detail: self.log_detail, + log_level: self.log_level, + }))); + + handle_issues( + build_result_op, + issue_reporter, + IssueSeverity::Error, + None, + None, + ) + .await?; - Ok(()) + Ok(()) + }) + .await } } diff --git a/turbopack/crates/turbopack-dev-server/src/update/server.rs b/turbopack/crates/turbopack-dev-server/src/update/server.rs index c1ba9621fb4c2..4b2e51bb24adf 100644 --- a/turbopack/crates/turbopack-dev-server/src/update/server.rs +++ b/turbopack/crates/turbopack-dev-server/src/update/server.rs @@ -52,11 +52,10 @@ where /// Run the update server loop. pub fn run(self, tt: &dyn TurboTasksApi, ws: HyperWebsocket) { - tt.run_once_process(Box::pin(async move { + tt.start_once_process(Box::pin(async move { if let Err(err) = self.run_internal(ws).await { println!("[UpdateServer]: error {err:#}"); } - Ok(()) })); } diff --git a/turbopack/crates/turbopack-nft/src/main.rs b/turbopack/crates/turbopack-nft/src/main.rs index 36491e5880ae3..e6e3e32b63eda 100644 --- a/turbopack/crates/turbopack-nft/src/main.rs +++ b/turbopack/crates/turbopack-nft/src/main.rs @@ -6,7 +6,7 @@ use std::env::current_dir; use anyhow::Result; use clap::Parser; use tracing_subscriber::{Registry, layer::SubscriberExt, util::SubscriberInitExt}; -use turbo_tasks::{ReadConsistency, TurboTasks}; +use turbo_tasks::TurboTasks; use turbo_tasks_backend::{BackendOptions, TurboTasksBackend, noop_backing_storage}; use turbo_tasks_malloc::TurboMalloc; use turbopack_nft::nft::node_file_trace; @@ -90,7 +90,7 @@ async fn main_inner(args: Arguments) -> Result<()> { noop_backing_storage(), )); - let task = tt.spawn_once_task::<(), _>(async move { + tt.run_once(async move { node_file_trace( current_dir()?.to_str().unwrap().into(), args.entry.into(), @@ -99,11 +99,9 @@ async fn main_inner(args: Arguments) -> Result<()> { args.depth, ) .await?; - Ok(Default::default()) - }); - - tt.wait_task_completion(task, ReadConsistency::Strong) - .await?; + Ok(()) + }) + .await?; // Intentionally leak this `Arc`. Otherwise we'll waste time during process exit performing a // ton of drop calls. diff --git a/turbopack/crates/turbopack-tests/tests/snapshot.rs b/turbopack/crates/turbopack-tests/tests/snapshot.rs index e04a7acd74c79..105fb1004c21d 100644 --- a/turbopack/crates/turbopack-tests/tests/snapshot.rs +++ b/turbopack/crates/turbopack-tests/tests/snapshot.rs @@ -11,7 +11,7 @@ use rustc_hash::FxHashSet; use serde::Deserialize; use serde_json::json; use turbo_rcstr::{RcStr, rcstr}; -use turbo_tasks::{ReadConsistency, ResolvedVc, TurboTasks, ValueToString, Vc, apply_effects}; +use turbo_tasks::{ResolvedVc, TurboTasks, ValueToString, Vc, apply_effects}; use turbo_tasks_backend::{BackendOptions, TurboTasksBackend, noop_backing_storage}; use turbo_tasks_env::DotenvProcessEnv; use turbo_tasks_fs::{ @@ -201,15 +201,14 @@ async fn run(resource: PathBuf) -> Result<()> { }, noop_backing_storage(), )); - let task = tt.spawn_once_task(async move { + tt.run_once(async move { let emit_op = run_inner_operation(resource.to_str().unwrap().into()); emit_op.read_strongly_consistent().await?; apply_effects(emit_op).await?; - Ok(Vc::<()>::default()) - }); - tt.wait_task_completion(task, ReadConsistency::Strong) - .await?; + Ok(()) + }) + .await?; Ok(()) } diff --git a/turbopack/crates/turbopack-tracing/benches/node_file_trace.rs b/turbopack/crates/turbopack-tracing/benches/node_file_trace.rs index de94a62ef548f..f9aa9a3d78483 100644 --- a/turbopack/crates/turbopack-tracing/benches/node_file_trace.rs +++ b/turbopack/crates/turbopack-tracing/benches/node_file_trace.rs @@ -3,7 +3,7 @@ use std::{fs, path::PathBuf}; use criterion::{Bencher, BenchmarkId, Criterion}; use regex::Regex; use turbo_rcstr::{RcStr, rcstr}; -use turbo_tasks::{ReadConsistency, ResolvedVc, TurboTasks, Vc, apply_effects}; +use turbo_tasks::{ResolvedVc, TurboTasks, Vc, apply_effects}; use turbo_tasks_backend::{BackendOptions, TurboTasksBackend, noop_backing_storage}; use turbo_tasks_fs::{DiskFileSystem, FileSystem, NullFileSystem}; use turbopack::{ @@ -75,7 +75,7 @@ fn bench_emit(b: &mut Bencher, bench_input: &BenchInput) { let tests_root: RcStr = bench_input.tests_root.clone().into(); let input: RcStr = bench_input.input.clone().into(); async move { - let task = tt.spawn_once_task(async move { + tt.run_once(async move { let input_fs = DiskFileSystem::new(rcstr!("tests"), tests_root.clone()); let input = input_fs.root().await?.join(&input)?; @@ -125,11 +125,10 @@ fn bench_emit(b: &mut Bencher, bench_input: &BenchInput) { emit_op.read_strongly_consistent().await?; apply_effects(emit_op).await?; - Ok::, _>(Default::default()) - }); - tt.wait_task_completion(task, ReadConsistency::Strong) - .await - .unwrap(); + Ok(()) + }) + .await + .unwrap(); } }) } From 11edbd2bfa5fa4b7bc0bce706f3ac8adbe0002d1 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Wed, 17 Sep 2025 13:35:42 +0200 Subject: [PATCH 2/2] Print error to stdout --- crates/napi/src/next_api/project.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/napi/src/next_api/project.rs b/crates/napi/src/next_api/project.rs index 9e1ad7a5c0aca..21613df1a97ac 100644 --- a/crates/napi/src/next_api/project.rs +++ b/crates/napi/src/next_api/project.rs @@ -471,9 +471,9 @@ pub fn project_new( benchmark_file_io(tt, container.project().node_root().owned().await?).await }; if let Err(err) = future.await { - // TODO this tracing warn will go into the void. Should we change this to - // stdout? - tracing::warn!(%err, "failed to benchmark file IO"); + // TODO Not ideal to print directly to stdout. + // We should use a compilation event instead to report async errors. + println!("Failed to benchmark file IO: {err}"); } }) });