Skip to content

Commit 23ee8d7

Browse files
committed
Turbopack: Change run once signature to avoid exposing TaskId
1 parent 485f796 commit 23ee8d7

File tree

11 files changed

+156
-161
lines changed

11 files changed

+156
-161
lines changed

crates/napi/src/next_api/project.rs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,8 @@ use tracing::Instrument;
3232
use tracing_subscriber::{Registry, layer::SubscriberExt, util::SubscriberInitExt};
3333
use turbo_rcstr::{RcStr, rcstr};
3434
use turbo_tasks::{
35-
Completion, Effects, FxIndexSet, NonLocalValue, OperationValue, OperationVc, ReadRef,
36-
ResolvedVc, TaskInput, TransientInstance, TryJoinIterExt, TurboTasksApi, UpdateInfo, Vc,
37-
get_effects,
35+
Effects, FxIndexSet, NonLocalValue, OperationValue, OperationVc, ReadRef, ResolvedVc,
36+
TaskInput, TransientInstance, TryJoinIterExt, TurboTasksApi, UpdateInfo, Vc, get_effects,
3837
message_queue::{CompilationEvent, Severity, TimingEvent},
3938
trace::TraceRawVcs,
4039
};
@@ -466,13 +465,18 @@ pub fn project_new(
466465
.or_else(|e| turbopack_ctx.throw_turbopack_internal_result(&e))
467466
.await?;
468467

469-
turbo_tasks.spawn_once_task({
468+
turbo_tasks.start_once_process({
470469
let tt = turbo_tasks.clone();
471-
async move {
472-
benchmark_file_io(tt, container.project().node_root().owned().await?)
473-
.await
474-
.inspect_err(|err| tracing::warn!(%err, "failed to benchmark file IO"))
475-
}
470+
Box::pin(async move {
471+
let future = async move {
472+
benchmark_file_io(tt, container.project().node_root().owned().await?).await
473+
};
474+
if let Err(err) = future.await {
475+
// TODO this tracing warn will go into the void. Should we change this to
476+
// stdout?
477+
tracing::warn!(%err, "failed to benchmark file IO");
478+
}
479+
})
476480
});
477481

478482
Ok(External::new(ProjectInstance {
@@ -520,10 +524,7 @@ impl CompilationEvent for SlowFilesystemEvent {
520524
/// - https://x.com/jarredsumner/status/1637549427677364224
521525
/// - https://github.com/oven-sh/bun/blob/06a9aa80c38b08b3148bfeabe560/src/install/install.zig#L3038
522526
#[tracing::instrument(skip(turbo_tasks))]
523-
async fn benchmark_file_io(
524-
turbo_tasks: NextTurboTasks,
525-
directory: FileSystemPath,
526-
) -> Result<Vc<Completion>> {
527+
async fn benchmark_file_io(turbo_tasks: NextTurboTasks, directory: FileSystemPath) -> Result<()> {
527528
// try to get the real file path on disk so that we can use it with tokio
528529
let fs = ResolvedVc::try_downcast_type::<DiskFileSystem>(directory.fs)
529530
.context(anyhow!(
@@ -568,7 +569,7 @@ async fn benchmark_file_io(
568569
}));
569570
}
570571

571-
Ok(Completion::new())
572+
Ok(())
572573
}
573574

574575
#[napi]

crates/next-api/benches/hmr.rs

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -435,28 +435,38 @@ fn bench_update(bencher: divan::Bencher, module_count: usize, num_updates: usize
435435
let setup = s.clone();
436436

437437
setup.clone().rt.block_on(async move {
438-
setup.clone().tt.run_once(Box::pin(async move {
439-
let _ = setup
440-
.benchmark
441-
.benchmark_initial_compilation()
442-
.await
443-
.unwrap();
444-
Ok(())
445-
}));
438+
setup
439+
.clone()
440+
.tt
441+
.run_once(Box::pin(async move {
442+
let _ = setup
443+
.benchmark
444+
.benchmark_initial_compilation()
445+
.await
446+
.unwrap();
447+
Ok(())
448+
}))
449+
.await
450+
.unwrap();
446451
});
447452

448453
s.clone()
449454
})
450455
.bench_values(|setup| {
451456
setup.clone().rt.block_on(async move {
452-
setup.clone().tt.run_once(Box::pin(async move {
453-
setup
454-
.benchmark
455-
.benchmark_hmr_update(num_updates)
456-
.await
457-
.unwrap();
458-
Ok(())
459-
}));
457+
setup
458+
.clone()
459+
.tt
460+
.run_once(Box::pin(async move {
461+
setup
462+
.benchmark
463+
.benchmark_hmr_update(num_updates)
464+
.await
465+
.unwrap();
466+
Ok(())
467+
}))
468+
.await
469+
.unwrap();
460470
})
461471
});
462472
}

turbopack/crates/turbo-tasks-backend/benches/scope_stress.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use anyhow::Result;
22
use criterion::{BenchmarkId, Criterion};
3-
use turbo_tasks::{Completion, ReadConsistency, TryJoinIterExt, TurboTasks, Vc};
3+
use turbo_tasks::{Completion, TryJoinIterExt, TurboTasks, Vc};
44
use turbo_tasks_backend::{BackendOptions, TurboTasksBackend, noop_backing_storage};
55

66
pub fn scope_stress(c: &mut Criterion) {
@@ -47,12 +47,11 @@ pub fn scope_stress(c: &mut Criterion) {
4747
.map(|(a, b)| {
4848
let tt = &tt;
4949
async move {
50-
let task = tt.spawn_once_task(async move {
50+
tt.run_once(async move {
5151
rectangle(a, b).strongly_consistent().await?;
52-
Ok::<Vc<()>, _>(Default::default())
53-
});
54-
tt.wait_task_completion(task, ReadConsistency::Eventual)
55-
.await
52+
Ok(())
53+
})
54+
.await
5655
}
5756
})
5857
.try_join()

turbopack/crates/turbo-tasks-backend/benches/stress.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use anyhow::Result;
22
use criterion::{BenchmarkId, Criterion};
3-
use turbo_tasks::{ReadConsistency, TryJoinIterExt, TurboTasks, Vc};
3+
use turbo_tasks::{TryJoinIterExt, TurboTasks, Vc};
44
use turbo_tasks_backend::{BackendOptions, TurboTasksBackend, noop_backing_storage};
55

66
pub fn fibonacci(c: &mut Criterion) {
@@ -37,18 +37,16 @@ pub fn fibonacci(c: &mut Criterion) {
3737
noop_backing_storage(),
3838
));
3939
async move {
40-
let task = tt.spawn_once_task(async move {
40+
tt.run_once(async move {
4141
// Number of tasks:
4242
// 1 root task
4343
// size >= 1 => + fib(0) = 1
4444
// size >= 2 => + fib(1) = 2
4545
(0..size).map(|i| fib(i, i)).try_join().await?;
46-
Ok::<Vc<()>, _>(Default::default())
47-
});
48-
tt.wait_task_completion(task, ReadConsistency::Eventual)
49-
.await
50-
.unwrap();
51-
tt
46+
Ok(())
47+
})
48+
.await
49+
.unwrap();
5250
}
5351
})
5452
});

turbopack/crates/turbo-tasks-testing/src/lib.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::{
77
future::Future,
88
mem::replace,
99
panic::AssertUnwindSafe,
10+
pin::Pin,
1011
sync::{Arc, Mutex, Weak},
1112
};
1213

@@ -120,22 +121,30 @@ impl TurboTasksCallApi for VcStorage {
120121
fn run_once(
121122
&self,
122123
_future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
123-
) -> TaskId {
124+
) -> Pin<
125+
Box<
126+
(dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send + 'static),
127+
>,
128+
> {
124129
unreachable!()
125130
}
126131

127132
fn run_once_with_reason(
128133
&self,
129134
_reason: StaticOrArc<dyn InvalidationReason>,
130135
_future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
131-
) -> TaskId {
136+
) -> Pin<
137+
Box<
138+
(dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send + 'static),
139+
>,
140+
> {
132141
unreachable!()
133142
}
134143

135-
fn run_once_process(
144+
fn start_once_process(
136145
&self,
137-
_future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
138-
) -> TaskId {
146+
_future: std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
147+
) {
139148
unreachable!()
140149
}
141150
}

turbopack/crates/turbo-tasks/src/manager.rs

Lines changed: 33 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -77,16 +77,13 @@ pub trait TurboTasksCallApi: Sync + Send {
7777
fn run_once(
7878
&self,
7979
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
80-
) -> TaskId;
80+
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
8181
fn run_once_with_reason(
8282
&self,
8383
reason: StaticOrArc<dyn InvalidationReason>,
8484
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
85-
) -> TaskId;
86-
fn run_once_process(
87-
&self,
88-
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
89-
) -> TaskId;
85+
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
86+
fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);
9087
}
9188

9289
/// A type-erased subset of [`TurboTasks`] stored inside a thread local when we're in a turbo task
@@ -490,7 +487,7 @@ impl<B: Backend + 'static> TurboTasks<B> {
490487
/// Creates a new root task, that is only executed once.
491488
/// Dependencies will not invalidate the task.
492489
#[track_caller]
493-
pub fn spawn_once_task<T, Fut>(&self, future: Fut) -> TaskId
490+
fn spawn_once_task<T, Fut>(&self, future: Fut) -> TaskId
494491
where
495492
T: ?Sized,
496493
Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
@@ -530,6 +527,21 @@ impl<B: Backend + 'static> TurboTasks<B> {
530527
Ok(rx.await?)
531528
}
532529

530+
pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
531+
let this = self.pin();
532+
tokio::spawn(async move {
533+
this.pin()
534+
.run_once(async move {
535+
this.finish_foreground_job();
536+
future.await;
537+
this.begin_foreground_job();
538+
Ok(())
539+
})
540+
.await
541+
.unwrap()
542+
});
543+
}
544+
533545
pub(crate) fn native_call(
534546
&self,
535547
native_fn: &'static NativeFunction,
@@ -1107,41 +1119,28 @@ impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
11071119
fn run_once(
11081120
&self,
11091121
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1110-
) -> TaskId {
1111-
self.spawn_once_task(async move {
1112-
future.await?;
1113-
Ok(Completion::new())
1114-
})
1122+
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1123+
let this = self.pin();
1124+
Box::pin(async move { this.run_once(future).await })
11151125
}
11161126

11171127
#[track_caller]
11181128
fn run_once_with_reason(
11191129
&self,
11201130
reason: StaticOrArc<dyn InvalidationReason>,
11211131
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1122-
) -> TaskId {
1132+
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
11231133
{
11241134
let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
11251135
reason_set.insert(reason);
11261136
}
1127-
self.spawn_once_task(async move {
1128-
future.await?;
1129-
Ok(Completion::new())
1130-
})
1137+
let this = self.pin();
1138+
Box::pin(async move { this.run_once(future).await })
11311139
}
11321140

11331141
#[track_caller]
1134-
fn run_once_process(
1135-
&self,
1136-
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1137-
) -> TaskId {
1138-
let this = self.pin();
1139-
self.spawn_once_task(async move {
1140-
this.finish_foreground_job();
1141-
future.await?;
1142-
this.begin_foreground_job();
1143-
Ok(Completion::new())
1144-
})
1142+
fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1143+
self.start_once_process(future)
11451144
}
11461145
}
11471146

@@ -1422,18 +1421,13 @@ pub async fn run_once<T: Send + 'static>(
14221421
) -> Result<T> {
14231422
let (tx, rx) = tokio::sync::oneshot::channel();
14241423

1425-
let task_id = tt.run_once(Box::pin(async move {
1424+
tt.run_once(Box::pin(async move {
14261425
let result = future.await?;
14271426
tx.send(result)
14281427
.map_err(|_| anyhow!("unable to send result"))?;
14291428
Ok(())
1430-
}));
1431-
1432-
// INVALIDATION: A Once task will never invalidate, therefore we don't need to
1433-
// track a dependency
1434-
let raw_result = read_task_output_untracked(&*tt, task_id, ReadConsistency::Eventual).await?;
1435-
let raw_future = raw_result.into_read().untracked();
1436-
turbo_tasks_future_scope(tt, ReadVcFuture::<Completion>::from(raw_future)).await?;
1429+
}))
1430+
.await?;
14371431

14381432
Ok(rx.await?)
14391433
}
@@ -1445,21 +1439,16 @@ pub async fn run_once_with_reason<T: Send + 'static>(
14451439
) -> Result<T> {
14461440
let (tx, rx) = tokio::sync::oneshot::channel();
14471441

1448-
let task_id = tt.run_once_with_reason(
1442+
tt.run_once_with_reason(
14491443
(Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
14501444
Box::pin(async move {
14511445
let result = future.await?;
14521446
tx.send(result)
14531447
.map_err(|_| anyhow!("unable to send result"))?;
14541448
Ok(())
14551449
}),
1456-
);
1457-
1458-
// INVALIDATION: A Once task will never invalidate, therefore we don't need to
1459-
// track a dependency
1460-
let raw_result = read_task_output_untracked(&*tt, task_id, ReadConsistency::Eventual).await?;
1461-
let raw_future = raw_result.into_read().untracked();
1462-
turbo_tasks_future_scope(tt, ReadVcFuture::<Completion>::from(raw_future)).await?;
1450+
)
1451+
.await?;
14631452

14641453
Ok(rx.await?)
14651454
}

0 commit comments

Comments
 (0)