Skip to content

Commit f5d4d77

Browse files
committed
Turbopack: Change run once signature to avoid exposing TaskId
1 parent de42be7 commit f5d4d77

File tree

11 files changed

+156
-161
lines changed

11 files changed

+156
-161
lines changed

crates/napi/src/next_api/project.rs

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,8 @@ use tracing::Instrument;
3232
use tracing_subscriber::{Registry, layer::SubscriberExt, util::SubscriberInitExt};
3333
use turbo_rcstr::{RcStr, rcstr};
3434
use turbo_tasks::{
35-
Completion, Effects, FxIndexSet, NonLocalValue, OperationValue, OperationVc, ReadRef,
36-
ResolvedVc, TaskInput, TransientInstance, TryJoinIterExt, TurboTasksApi, UpdateInfo, Vc,
37-
get_effects,
35+
Effects, FxIndexSet, NonLocalValue, OperationValue, OperationVc, ReadRef, ResolvedVc,
36+
TaskInput, TransientInstance, TryJoinIterExt, TurboTasksApi, UpdateInfo, Vc, get_effects,
3837
message_queue::{CompilationEvent, Severity, TimingEvent},
3938
trace::TraceRawVcs,
4039
};
@@ -466,13 +465,18 @@ pub fn project_new(
466465
.or_else(|e| turbopack_ctx.throw_turbopack_internal_result(&e))
467466
.await?;
468467

469-
turbo_tasks.spawn_once_task({
468+
turbo_tasks.start_once_process({
470469
let tt = turbo_tasks.clone();
471-
async move {
472-
benchmark_file_io(tt, container.project().node_root().owned().await?)
473-
.await
474-
.inspect_err(|err| tracing::warn!(%err, "failed to benchmark file IO"))
475-
}
470+
Box::pin(async move {
471+
let future = async move {
472+
benchmark_file_io(tt, container.project().node_root().owned().await?).await
473+
};
474+
if let Err(err) = future.await {
475+
// TODO this tracing warn will go into the void. Should we change this to
476+
// stdout?
477+
tracing::warn!(%err, "failed to benchmark file IO");
478+
}
479+
})
476480
});
477481

478482
Ok(External::new(ProjectInstance {
@@ -520,10 +524,7 @@ impl CompilationEvent for SlowFilesystemEvent {
520524
/// - https://x.com/jarredsumner/status/1637549427677364224
521525
/// - https://github.com/oven-sh/bun/blob/06a9aa80c38b08b3148bfeabe560/src/install/install.zig#L3038
522526
#[tracing::instrument(skip(turbo_tasks))]
523-
async fn benchmark_file_io(
524-
turbo_tasks: NextTurboTasks,
525-
directory: FileSystemPath,
526-
) -> Result<Vc<Completion>> {
527+
async fn benchmark_file_io(turbo_tasks: NextTurboTasks, directory: FileSystemPath) -> Result<()> {
527528
// try to get the real file path on disk so that we can use it with tokio
528529
let fs = ResolvedVc::try_downcast_type::<DiskFileSystem>(directory.fs)
529530
.context(anyhow!(
@@ -568,7 +569,7 @@ async fn benchmark_file_io(
568569
}));
569570
}
570571

571-
Ok(Completion::new())
572+
Ok(())
572573
}
573574

574575
#[napi]

crates/next-api/benches/hmr.rs

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -435,28 +435,38 @@ fn bench_update(bencher: divan::Bencher, module_count: usize, num_updates: usize
435435
let setup = s.clone();
436436

437437
setup.clone().rt.block_on(async move {
438-
setup.clone().tt.run_once(Box::pin(async move {
439-
let _ = setup
440-
.benchmark
441-
.benchmark_initial_compilation()
442-
.await
443-
.unwrap();
444-
Ok(())
445-
}));
438+
setup
439+
.clone()
440+
.tt
441+
.run_once(Box::pin(async move {
442+
let _ = setup
443+
.benchmark
444+
.benchmark_initial_compilation()
445+
.await
446+
.unwrap();
447+
Ok(())
448+
}))
449+
.await
450+
.unwrap();
446451
});
447452

448453
s.clone()
449454
})
450455
.bench_values(|setup| {
451456
setup.clone().rt.block_on(async move {
452-
setup.clone().tt.run_once(Box::pin(async move {
453-
setup
454-
.benchmark
455-
.benchmark_hmr_update(num_updates)
456-
.await
457-
.unwrap();
458-
Ok(())
459-
}));
457+
setup
458+
.clone()
459+
.tt
460+
.run_once(Box::pin(async move {
461+
setup
462+
.benchmark
463+
.benchmark_hmr_update(num_updates)
464+
.await
465+
.unwrap();
466+
Ok(())
467+
}))
468+
.await
469+
.unwrap();
460470
})
461471
});
462472
}

turbopack/crates/turbo-tasks-backend/benches/scope_stress.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use anyhow::Result;
22
use criterion::{BenchmarkId, Criterion};
3-
use turbo_tasks::{Completion, ReadConsistency, TryJoinIterExt, TurboTasks, Vc};
3+
use turbo_tasks::{Completion, TryJoinIterExt, TurboTasks, Vc};
44
use turbo_tasks_backend::{BackendOptions, TurboTasksBackend, noop_backing_storage};
55

66
pub fn scope_stress(c: &mut Criterion) {
@@ -47,12 +47,11 @@ pub fn scope_stress(c: &mut Criterion) {
4747
.map(|(a, b)| {
4848
let tt = &tt;
4949
async move {
50-
let task = tt.spawn_once_task(async move {
50+
tt.run_once(async move {
5151
rectangle(a, b).strongly_consistent().await?;
52-
Ok::<Vc<()>, _>(Default::default())
53-
});
54-
tt.wait_task_completion(task, ReadConsistency::Eventual)
55-
.await
52+
Ok(())
53+
})
54+
.await
5655
}
5756
})
5857
.try_join()

turbopack/crates/turbo-tasks-backend/benches/stress.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use anyhow::Result;
22
use criterion::{BenchmarkId, Criterion};
3-
use turbo_tasks::{ReadConsistency, TryJoinIterExt, TurboTasks, Vc};
3+
use turbo_tasks::{TryJoinIterExt, TurboTasks, Vc};
44
use turbo_tasks_backend::{BackendOptions, TurboTasksBackend, noop_backing_storage};
55

66
pub fn fibonacci(c: &mut Criterion) {
@@ -37,18 +37,16 @@ pub fn fibonacci(c: &mut Criterion) {
3737
noop_backing_storage(),
3838
));
3939
async move {
40-
let task = tt.spawn_once_task(async move {
40+
tt.run_once(async move {
4141
// Number of tasks:
4242
// 1 root task
4343
// size >= 1 => + fib(0) = 1
4444
// size >= 2 => + fib(1) = 2
4545
(0..size).map(|i| fib(i, i)).try_join().await?;
46-
Ok::<Vc<()>, _>(Default::default())
47-
});
48-
tt.wait_task_completion(task, ReadConsistency::Eventual)
49-
.await
50-
.unwrap();
51-
tt
46+
Ok(())
47+
})
48+
.await
49+
.unwrap();
5250
}
5351
})
5452
});

turbopack/crates/turbo-tasks-testing/src/lib.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::{
77
future::Future,
88
mem::replace,
99
panic::AssertUnwindSafe,
10+
pin::Pin,
1011
sync::{Arc, Mutex, Weak},
1112
};
1213

@@ -120,22 +121,30 @@ impl TurboTasksCallApi for VcStorage {
120121
fn run_once(
121122
&self,
122123
_future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
123-
) -> TaskId {
124+
) -> Pin<
125+
Box<
126+
(dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send + 'static),
127+
>,
128+
> {
124129
unreachable!()
125130
}
126131

127132
fn run_once_with_reason(
128133
&self,
129134
_reason: StaticOrArc<dyn InvalidationReason>,
130135
_future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
131-
) -> TaskId {
136+
) -> Pin<
137+
Box<
138+
(dyn futures::Future<Output = Result<(), anyhow::Error>> + std::marker::Send + 'static),
139+
>,
140+
> {
132141
unreachable!()
133142
}
134143

135-
fn run_once_process(
144+
fn start_once_process(
136145
&self,
137-
_future: std::pin::Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
138-
) -> TaskId {
146+
_future: std::pin::Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
147+
) {
139148
unreachable!()
140149
}
141150
}

turbopack/crates/turbo-tasks/src/manager.rs

Lines changed: 33 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -79,16 +79,13 @@ pub trait TurboTasksCallApi: Sync + Send {
7979
fn run_once(
8080
&self,
8181
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
82-
) -> TaskId;
82+
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
8383
fn run_once_with_reason(
8484
&self,
8585
reason: StaticOrArc<dyn InvalidationReason>,
8686
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
87-
) -> TaskId;
88-
fn run_once_process(
89-
&self,
90-
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
91-
) -> TaskId;
87+
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>>;
88+
fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>);
9289
}
9390

9491
/// A type-erased subset of [`TurboTasks`] stored inside a thread local when we're in a turbo task
@@ -558,7 +555,7 @@ impl<B: Backend + 'static> TurboTasks<B> {
558555
/// Creates a new root task, that is only executed once.
559556
/// Dependencies will not invalidate the task.
560557
#[track_caller]
561-
pub fn spawn_once_task<T, Fut>(&self, future: Fut) -> TaskId
558+
fn spawn_once_task<T, Fut>(&self, future: Fut) -> TaskId
562559
where
563560
T: ?Sized,
564561
Fut: Future<Output = Result<Vc<T>>> + Send + 'static,
@@ -598,6 +595,21 @@ impl<B: Backend + 'static> TurboTasks<B> {
598595
Ok(rx.await?)
599596
}
600597

598+
pub fn start_once_process(&self, future: impl Future<Output = ()> + Send + 'static) {
599+
let this = self.pin();
600+
tokio::spawn(async move {
601+
this.pin()
602+
.run_once(async move {
603+
this.finish_foreground_job();
604+
future.await;
605+
this.begin_foreground_job();
606+
Ok(())
607+
})
608+
.await
609+
.unwrap()
610+
});
611+
}
612+
601613
pub(crate) fn native_call(
602614
&self,
603615
native_fn: &'static NativeFunction,
@@ -1185,41 +1197,28 @@ impl<B: Backend + 'static> TurboTasksCallApi for TurboTasks<B> {
11851197
fn run_once(
11861198
&self,
11871199
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1188-
) -> TaskId {
1189-
self.spawn_once_task(async move {
1190-
future.await?;
1191-
Ok(Completion::new())
1192-
})
1200+
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
1201+
let this = self.pin();
1202+
Box::pin(async move { this.run_once(future).await })
11931203
}
11941204

11951205
#[track_caller]
11961206
fn run_once_with_reason(
11971207
&self,
11981208
reason: StaticOrArc<dyn InvalidationReason>,
11991209
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1200-
) -> TaskId {
1210+
) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
12011211
{
12021212
let (_, reason_set) = &mut *self.aggregated_update.lock().unwrap();
12031213
reason_set.insert(reason);
12041214
}
1205-
self.spawn_once_task(async move {
1206-
future.await?;
1207-
Ok(Completion::new())
1208-
})
1215+
let this = self.pin();
1216+
Box::pin(async move { this.run_once(future).await })
12091217
}
12101218

12111219
#[track_caller]
1212-
fn run_once_process(
1213-
&self,
1214-
future: Pin<Box<dyn Future<Output = Result<()>> + Send + 'static>>,
1215-
) -> TaskId {
1216-
let this = self.pin();
1217-
self.spawn_once_task(async move {
1218-
this.finish_foreground_job();
1219-
future.await?;
1220-
this.begin_foreground_job();
1221-
Ok(Completion::new())
1222-
})
1220+
fn start_once_process(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
1221+
self.start_once_process(future)
12231222
}
12241223
}
12251224

@@ -1555,18 +1554,13 @@ pub async fn run_once<T: Send + 'static>(
15551554
) -> Result<T> {
15561555
let (tx, rx) = tokio::sync::oneshot::channel();
15571556

1558-
let task_id = tt.run_once(Box::pin(async move {
1557+
tt.run_once(Box::pin(async move {
15591558
let result = future.await?;
15601559
tx.send(result)
15611560
.map_err(|_| anyhow!("unable to send result"))?;
15621561
Ok(())
1563-
}));
1564-
1565-
// INVALIDATION: A Once task will never invalidate, therefore we don't need to
1566-
// track a dependency
1567-
let raw_result = read_task_output_untracked(&*tt, task_id, ReadConsistency::Eventual).await?;
1568-
let raw_future = raw_result.into_read().untracked();
1569-
turbo_tasks_future_scope(tt, ReadVcFuture::<Completion>::from(raw_future)).await?;
1562+
}))
1563+
.await?;
15701564

15711565
Ok(rx.await?)
15721566
}
@@ -1578,21 +1572,16 @@ pub async fn run_once_with_reason<T: Send + 'static>(
15781572
) -> Result<T> {
15791573
let (tx, rx) = tokio::sync::oneshot::channel();
15801574

1581-
let task_id = tt.run_once_with_reason(
1575+
tt.run_once_with_reason(
15821576
(Arc::new(reason) as Arc<dyn InvalidationReason>).into(),
15831577
Box::pin(async move {
15841578
let result = future.await?;
15851579
tx.send(result)
15861580
.map_err(|_| anyhow!("unable to send result"))?;
15871581
Ok(())
15881582
}),
1589-
);
1590-
1591-
// INVALIDATION: A Once task will never invalidate, therefore we don't need to
1592-
// track a dependency
1593-
let raw_result = read_task_output_untracked(&*tt, task_id, ReadConsistency::Eventual).await?;
1594-
let raw_future = raw_result.into_read().untracked();
1595-
turbo_tasks_future_scope(tt, ReadVcFuture::<Completion>::from(raw_future)).await?;
1583+
)
1584+
.await?;
15961585

15971586
Ok(rx.await?)
15981587
}

0 commit comments

Comments
 (0)