Skip to content

Commit 5aa58d7

Browse files
committed
Turbopack: Change run once signature to avoid exposing TaskId
1 parent 2249574 commit 5aa58d7

File tree

10 files changed

+130
-145
lines changed

10 files changed

+130
-145
lines changed

crates/napi/src/next_api/project.rs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,8 @@ use tracing::Instrument;
3232
use tracing_subscriber::{Registry, layer::SubscriberExt, util::SubscriberInitExt};
3333
use turbo_rcstr::{RcStr, rcstr};
3434
use turbo_tasks::{
35-
Completion, Effects, FxIndexSet, NonLocalValue, OperationValue, OperationVc, ReadRef,
36-
ResolvedVc, TaskInput, TransientInstance, TryJoinIterExt, TurboTasksApi, UpdateInfo, Vc,
37-
get_effects,
35+
Effects, FxIndexSet, NonLocalValue, OperationValue, OperationVc, ReadRef, ResolvedVc,
36+
TaskInput, TransientInstance, TryJoinIterExt, TurboTasksApi, UpdateInfo, Vc, get_effects,
3837
message_queue::{CompilationEvent, Severity, TimingEvent},
3938
trace::TraceRawVcs,
4039
};
@@ -466,13 +465,18 @@ pub fn project_new(
466465
.or_else(|e| turbopack_ctx.throw_turbopack_internal_result(&e))
467466
.await?;
468467

469-
turbo_tasks.spawn_once_task({
468+
turbo_tasks.start_once_process({
470469
let tt = turbo_tasks.clone();
471-
async move {
472-
benchmark_file_io(tt, container.project().node_root().owned().await?)
473-
.await
474-
.inspect_err(|err| tracing::warn!(%err, "failed to benchmark file IO"))
475-
}
470+
Box::pin(async move {
471+
let future = async move {
472+
benchmark_file_io(tt, container.project().node_root().owned().await?).await
473+
};
474+
if let Err(err) = future.await {
475+
// TODO this tracing warn will go into the void. Should we change this to
476+
// stdout?
477+
tracing::warn!(%err, "failed to benchmark file IO");
478+
}
479+
})
476480
});
477481

478482
Ok(External::new(ProjectInstance {
@@ -520,10 +524,7 @@ impl CompilationEvent for SlowFilesystemEvent {
520524
/// - https://x.com/jarredsumner/status/1637549427677364224
521525
/// - https://github.com/oven-sh/bun/blob/06a9aa80c38b08b3148bfeabe560/src/install/install.zig#L3038
522526
#[tracing::instrument(skip(turbo_tasks))]
523-
async fn benchmark_file_io(
524-
turbo_tasks: NextTurboTasks,
525-
directory: FileSystemPath,
526-
) -> Result<Vc<Completion>> {
527+
async fn benchmark_file_io(turbo_tasks: NextTurboTasks, directory: FileSystemPath) -> Result<()> {
527528
// try to get the real file path on disk so that we can use it with tokio
528529
let fs = ResolvedVc::try_downcast_type::<DiskFileSystem>(directory.fs)
529530
.context(anyhow!(
@@ -568,7 +569,7 @@ async fn benchmark_file_io(
568569
}));
569570
}
570571

571-
Ok(Completion::new())
572+
Ok(())
572573
}
573574

574575
#[napi]

turbopack/crates/turbo-tasks-backend/benches/scope_stress.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use anyhow::Result;
22
use criterion::{BenchmarkId, Criterion};
3-
use turbo_tasks::{Completion, ReadConsistency, TryJoinIterExt, TurboTasks, Vc};
3+
use turbo_tasks::{Completion, TryJoinIterExt, TurboTasks, Vc};
44
use turbo_tasks_backend::{BackendOptions, TurboTasksBackend, noop_backing_storage};
55

66
pub fn scope_stress(c: &mut Criterion) {
@@ -47,12 +47,11 @@ pub fn scope_stress(c: &mut Criterion) {
4747
.map(|(a, b)| {
4848
let tt = &tt;
4949
async move {
50-
let task = tt.spawn_once_task(async move {
50+
tt.run_once(async move {
5151
rectangle(a, b).strongly_consistent().await?;
52-
Ok::<Vc<()>, _>(Default::default())
53-
});
54-
tt.wait_task_completion(task, ReadConsistency::Eventual)
55-
.await
52+
Ok(())
53+
})
54+
.await
5655
}
5756
})
5857
.try_join()

turbopack/crates/turbo-tasks-backend/benches/stress.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use anyhow::Result;
22
use criterion::{BenchmarkId, Criterion};
3-
use turbo_tasks::{ReadConsistency, TryJoinIterExt, TurboTasks, Vc};
3+
use turbo_tasks::{TryJoinIterExt, TurboTasks, Vc};
44
use turbo_tasks_backend::{BackendOptions, TurboTasksBackend, noop_backing_storage};
55

66
pub fn fibonacci(c: &mut Criterion) {
@@ -37,18 +37,16 @@ pub fn fibonacci(c: &mut Criterion) {
3737
noop_backing_storage(),
3838
));
3939
async move {
40-
let task = tt.spawn_once_task(async move {
40+
tt.run_once(async move {
4141
// Number of tasks:
4242
// 1 root task
4343
// size >= 1 => + fib(0) = 1
4444
// size >= 2 => + fib(1) = 2
4545
(0..size).map(|i| fib(i, i)).try_join().await?;
46-
Ok::<Vc<()>, _>(Default::default())
47-
});
48-
tt.wait_task_completion(task, ReadConsistency::Eventual)
49-
.await
50-
.unwrap();
51-
tt
46+
Ok(())
47+
})
48+
.await
49+
.unwrap();
5250
}
5351
})
5452
});

turbopack/crates/turbo-tasks-testing/src/lib.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::{
77
future::Future,
88
mem::replace,
99
panic::AssertUnwindSafe,
10+
pin::Pin,
1011
sync::{Arc, Mutex, Weak},
1112
};
1213

@@ -120,22 +121,30 @@ impl TurboTasksCallApi for VcStorage {
120121
fn run_once(
121122
&self,
122123
_future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
123-
) -> TaskId {
124+
) -> Pin<
125+
Box<
126+
(dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send + 'static),
127+
>,
128+
> {
124129
unreachable!()
125130
}
126131

127132
fn run_once_with_reason(
128133
&self,
129134
_reason: StaticOrArc<dyn InvalidationReason>,
130135
_future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
131-
) -> TaskId {
136+
) -> Pin<
137+
Box<
138+
(dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send + 'static),
139+
>,
140+
> {
132141
unreachable!()
133142
}
134143

135-
fn run_once_process(
144+
fn start_once_process(
136145
&self,
137-
_future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
138-
) -> TaskId {
146+
_future: std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
147+
) {
139148
unreachable!()
140149
}
141150
}

turbopack/crates/turbo-tasks/src/manager.rs

Lines changed: 33 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -77,16 +77,13 @@ pub trait TurboTasksCallApi: Sync + Send {
7777
fn run_once(
7878
&self,
7979
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
80-
) -> TaskId;
80+
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
8181
fn run_once_with_reason(
8282
&self,
8383
reason: StaticOrArc<dyn InvalidationReason>,
8484
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
85-
) -> TaskId;
86-
fn run_once_process(
87-
&self,
88-
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
89-
) -> TaskId;
85+
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
86+
fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);
9087
}
9188

9289
/// A type-erased subset of [`TurboTasks`] stored inside a thread local when we're in a turbo task
@@ -490,7 +487,7 @@ impl<B: Backend + 'static> TurboTasks<B> {
490487
/// Creates a new root task, that is only executed once.
491488
/// Dependencies will not invalidate the task.
492489
#[track_caller]
493-
pub fn spawn_once_task<T, Fut>(&self, future: Fut) -> TaskId
490+
fn spawn_once_task<T, Fut>(&self, future: Fut) -> TaskId
494491
where
495492
T: ?Sized,
496493
Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
@@ -530,6 +527,21 @@ impl<B: Backend + 'static> TurboTasks<B> {
530527
Ok(rx.await?)
531528
}
532529

530+
pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
531+
let this = self.pin();
532+
tokio::spawn(async move {
533+
this.pin()
534+
.run_once(async move {
535+
this.finish_foreground_job();
536+
future.await;
537+
this.begin_foreground_job();
538+
Ok(())
539+
})
540+
.await
541+
.unwrap()
542+
});
543+
}
544+
533545
pub(crate) fn native_call(
534546
&self,
535547
native_fn: &'static NativeFunction,
@@ -1106,41 +1118,28 @@ impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
11061118
fn run_once(
11071119
&self,
11081120
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1109-
) -> TaskId {
1110-
self.spawn_once_task(async move {
1111-
future.await?;
1112-
Ok(Completion::new())
1113-
})
1121+
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1122+
let this = self.pin();
1123+
Box::pin(async move { this.run_once(future).await })
11141124
}
11151125

11161126
#[track_caller]
11171127
fn run_once_with_reason(
11181128
&self,
11191129
reason: StaticOrArc<dyn InvalidationReason>,
11201130
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1121-
) -> TaskId {
1131+
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
11221132
{
11231133
let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
11241134
reason_set.insert(reason);
11251135
}
1126-
self.spawn_once_task(async move {
1127-
future.await?;
1128-
Ok(Completion::new())
1129-
})
1136+
let this = self.pin();
1137+
Box::pin(async move { this.run_once(future).await })
11301138
}
11311139

11321140
#[track_caller]
1133-
fn run_once_process(
1134-
&self,
1135-
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1136-
) -> TaskId {
1137-
let this = self.pin();
1138-
self.spawn_once_task(async move {
1139-
this.finish_foreground_job();
1140-
future.await?;
1141-
this.begin_foreground_job();
1142-
Ok(Completion::new())
1143-
})
1141+
fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1142+
self.start_once_process(future)
11441143
}
11451144
}
11461145

@@ -1421,18 +1420,13 @@ pub async fn run_once<T: Send + 'static>(
14211420
) -> Result<T> {
14221421
let (tx, rx) = tokio::sync::oneshot::channel();
14231422

1424-
let task_id = tt.run_once(Box::pin(async move {
1423+
tt.run_once(Box::pin(async move {
14251424
let result = future.await?;
14261425
tx.send(result)
14271426
.map_err(|_| anyhow!("unable to send result"))?;
14281427
Ok(())
1429-
}));
1430-
1431-
// INVALIDATION: A Once task will never invalidate, therefore we don't need to
1432-
// track a dependency
1433-
let raw_result = read_task_output_untracked(&*tt, task_id, ReadConsistency::Eventual).await?;
1434-
let raw_future = raw_result.into_read().untracked();
1435-
turbo_tasks_future_scope(tt, ReadVcFuture::<Completion>::from(raw_future)).await?;
1428+
}))
1429+
.await?;
14361430

14371431
Ok(rx.await?)
14381432
}
@@ -1444,21 +1438,16 @@ pub async fn run_once_with_reason<T: Send + 'static>(
14441438
) -> Result<T> {
14451439
let (tx, rx) = tokio::sync::oneshot::channel();
14461440

1447-
let task_id = tt.run_once_with_reason(
1441+
tt.run_once_with_reason(
14481442
(Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
14491443
Box::pin(async move {
14501444
let result = future.await?;
14511445
tx.send(result)
14521446
.map_err(|_| anyhow!("unable to send result"))?;
14531447
Ok(())
14541448
}),
1455-
);
1456-
1457-
// INVALIDATION: A Once task will never invalidate, therefore we don't need to
1458-
// track a dependency
1459-
let raw_result = read_task_output_untracked(&*tt, task_id, ReadConsistency::Eventual).await?;
1460-
let raw_future = raw_result.into_read().untracked();
1461-
turbo_tasks_future_scope(tt, ReadVcFuture::<Completion>::from(raw_future)).await?;
1449+
)
1450+
.await?;
14621451

14631452
Ok(rx.await?)
14641453
}

0 commit comments

Comments
 (0)