Skip to content

Commit ff64db9

Browse files
committed
Turbopack: add turbo_tasks::run to run in turbo tasks scope without a task
use the new method in some test cases where it was possible
1 parent 8c8617a commit ff64db9

35 files changed

+321
-185
lines changed

turbopack/crates/turbo-tasks-backend/src/backend/mod.rs

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
415415
unsafe fn connect_child_with_tx<'l, 'tx: 'l>(
416416
&'l self,
417417
tx: Option<&'l B::ReadTransaction<'tx>>,
418-
parent_task: TaskId,
418+
parent_task: Option<TaskId>,
419419
child_task: TaskId,
420420
turbo_tasks: &'l dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
421421
) {
@@ -426,7 +426,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
426426

427427
fn connect_child(
428428
&self,
429-
parent_task: TaskId,
429+
parent_task: Option<TaskId>,
430430
child_task: TaskId,
431431
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
432432
) {
@@ -444,9 +444,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
444444
consistency: ReadConsistency,
445445
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
446446
) -> Result<Result<RawVc, EventListener>> {
447-
if let Some(reader) = reader {
448-
self.assert_not_persistent_calling_transient(reader, task_id, /* cell_id */ None);
449-
}
447+
self.assert_not_persistent_calling_transient(reader, task_id, /* cell_id */ None);
450448

451449
let mut ctx = self.execute_context(turbo_tasks);
452450
let mut task = ctx.task(task_id, TaskDataCategory::All);
@@ -754,9 +752,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
754752
options: ReadCellOptions,
755753
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
756754
) -> Result<Result<TypedCellContent, EventListener>> {
757-
if let Some(reader) = reader {
758-
self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
759-
}
755+
self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));
760756

761757
fn add_cell_dependency<B: BackingStorage>(
762758
backend: &TurboTasksBackendInner<B>,
@@ -1268,7 +1264,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
12681264
fn get_or_create_persistent_task(
12691265
&self,
12701266
task_type: CachedTaskType,
1271-
parent_task: TaskId,
1267+
parent_task: Option<TaskId>,
12721268
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
12731269
) -> TaskId {
12741270
if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
@@ -1328,10 +1324,12 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
13281324
fn get_or_create_transient_task(
13291325
&self,
13301326
task_type: CachedTaskType,
1331-
parent_task: TaskId,
1327+
parent_task: Option<TaskId>,
13321328
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
13331329
) -> TaskId {
1334-
if !parent_task.is_transient() {
1330+
if let Some(parent_task) = parent_task
1331+
&& !parent_task.is_transient()
1332+
{
13351333
self.panic_persistent_calling_transient(
13361334
self.lookup_task_type(parent_task).as_deref(),
13371335
Some(&task_type),
@@ -2264,7 +2262,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
22642262
&self,
22652263
task_id: TaskId,
22662264
collectible_type: TraitTypeId,
2267-
reader_id: TaskId,
2265+
reader_id: Option<TaskId>,
22682266
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
22692267
) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
22702268
let mut ctx = self.execute_context(turbo_tasks);
@@ -2312,13 +2310,15 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
23122310
.entry(RawVc::TaskCell(collectible.task, collectible.cell))
23132311
.or_insert(0) += count;
23142312
}
2315-
let _ = task.add(CachedDataItem::CollectiblesDependent {
2316-
collectible_type,
2317-
task: reader_id,
2318-
value: (),
2319-
});
2313+
if let Some(reader_id) = reader_id {
2314+
let _ = task.add(CachedDataItem::CollectiblesDependent {
2315+
collectible_type,
2316+
task: reader_id,
2317+
value: (),
2318+
});
2319+
}
23202320
}
2321-
{
2321+
if let Some(reader_id) = reader_id {
23222322
let mut reader = ctx.task(reader_id, TaskDataCategory::Data);
23232323
let target = CollectiblesRef {
23242324
task: task_id,
@@ -2480,7 +2480,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
24802480
fn connect_task(
24812481
&self,
24822482
task: TaskId,
2483-
parent_task: TaskId,
2483+
parent_task: Option<TaskId>,
24842484
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
24852485
) {
24862486
self.assert_not_persistent_calling_transient(parent_task, task, None);
@@ -2737,13 +2737,15 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
27372737

27382738
fn assert_not_persistent_calling_transient(
27392739
&self,
2740-
parent_id: TaskId,
2740+
parent_id: Option<TaskId>,
27412741
child_id: TaskId,
27422742
cell_id: Option<CellId>,
27432743
) {
2744-
if !parent_id.is_transient() && child_id.is_transient() {
2744+
if !parent_id.is_none_or(|id| id.is_transient()) && child_id.is_transient() {
27452745
self.panic_persistent_calling_transient(
2746-
self.lookup_task_type(parent_id).as_deref(),
2746+
parent_id
2747+
.and_then(|id| self.lookup_task_type(id))
2748+
.as_deref(),
27472749
self.lookup_task_type(child_id).as_deref(),
27482750
cell_id,
27492751
);
@@ -2828,7 +2830,7 @@ impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
28282830
fn get_or_create_persistent_task(
28292831
&self,
28302832
task_type: CachedTaskType,
2831-
parent_task: TaskId,
2833+
parent_task: Option<TaskId>,
28322834
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
28332835
) -> TaskId {
28342836
self.0
@@ -2838,7 +2840,7 @@ impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
28382840
fn get_or_create_transient_task(
28392841
&self,
28402842
task_type: CachedTaskType,
2841-
parent_task: TaskId,
2843+
parent_task: Option<TaskId>,
28422844
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
28432845
) -> TaskId {
28442846
self.0
@@ -2963,7 +2965,7 @@ impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
29632965
&self,
29642966
task_id: TaskId,
29652967
collectible_type: TraitTypeId,
2966-
reader: TaskId,
2968+
reader: Option<TaskId>,
29672969
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
29682970
) -> AutoMap<RawVc, i32, BuildHasherDefault<FxHasher>, 1> {
29692971
self.0
@@ -3035,7 +3037,7 @@ impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
30353037
parent_task: TaskId,
30363038
turbo_tasks: &dyn TurboTasksBackendApi<Self>,
30373039
) {
3038-
self.0.connect_task(task, parent_task, turbo_tasks);
3040+
self.0.connect_task(task, Some(parent_task), turbo_tasks);
30393041
}
30403042

30413043
fn create_transient_task(

turbopack/crates/turbo-tasks-backend/src/backend/operation/connect_child.rs

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,31 +23,37 @@ pub enum ConnectChildOperation {
2323
}
2424

2525
impl ConnectChildOperation {
26-
pub fn run(parent_task_id: TaskId, child_task_id: TaskId, mut ctx: impl ExecuteContext) {
27-
let mut parent_task = ctx.task(parent_task_id, TaskDataCategory::All);
28-
let Some(InProgressState::InProgress(box InProgressStateInner { new_children, .. })) =
29-
get_mut!(parent_task, InProgress)
30-
else {
31-
panic!("Task is not in progress while calling another task");
32-
};
26+
pub fn run(
27+
parent_task_id: Option<TaskId>,
28+
child_task_id: TaskId,
29+
mut ctx: impl ExecuteContext,
30+
) {
31+
if let Some(parent_task_id) = parent_task_id {
32+
let mut parent_task = ctx.task(parent_task_id, TaskDataCategory::All);
33+
let Some(InProgressState::InProgress(box InProgressStateInner {
34+
new_children, ..
35+
})) = get_mut!(parent_task, InProgress)
36+
else {
37+
panic!("Task is not in progress while calling another task");
38+
};
3339

34-
// Quick skip if the child was already connected before
35-
if !new_children.insert(child_task_id) {
36-
return;
37-
}
40+
// Quick skip if the child was already connected before
41+
if !new_children.insert(child_task_id) {
42+
return;
43+
}
3844

39-
if parent_task.has_key(&CachedDataItemKey::Child {
40-
task: child_task_id,
41-
}) {
42-
// It is already connected, we can skip the rest
43-
return;
45+
if parent_task.has_key(&CachedDataItemKey::Child {
46+
task: child_task_id,
47+
}) {
48+
// It is already connected, we can skip the rest
49+
return;
50+
}
4451
}
45-
drop(parent_task);
4652

4753
let mut queue = AggregationUpdateQueue::new();
4854

4955
// Handle the transient to persistent boundary by making the persistent task a root task
50-
if parent_task_id.is_transient() && !child_task_id.is_transient() {
56+
if parent_task_id.is_none_or(|id| id.is_transient()) && !child_task_id.is_transient() {
5157
queue.push(AggregationUpdateJob::UpdateAggregationNumber {
5258
task_id: child_task_id,
5359
base_aggregation_number: u32::MAX,
@@ -56,7 +62,7 @@ impl ConnectChildOperation {
5662
}
5763

5864
// Immutable tasks cannot be invalidated, meaning that we never reschedule them.
59-
if ctx.should_track_activeness() {
65+
if ctx.should_track_activeness() && parent_task_id.is_some() {
6066
queue.push(AggregationUpdateJob::IncreaseActiveCount {
6167
task: child_task_id,
6268
});

turbopack/crates/turbo-tasks-backend/tests/all_in_one.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@
55
use anyhow::{Result, bail};
66
use turbo_rcstr::{RcStr, rcstr};
77
use turbo_tasks::{ResolvedVc, TaskInput, ValueToString, Vc};
8-
use turbo_tasks_testing::{Registration, register, run};
8+
use turbo_tasks_testing::{Registration, register, run_once};
99

1010
static REGISTRATION: Registration = register!();
1111

1212
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1313
async fn all_in_one() {
14-
run(&REGISTRATION, || async {
14+
run_once(&REGISTRATION, || async {
1515
let a: Vc<u32> = Vc::cell(4242);
1616
assert_eq!(*a.await?, 4242);
1717

turbopack/crates/turbo-tasks-backend/tests/basic.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@
44

55
use anyhow::Result;
66
use turbo_tasks::Vc;
7-
use turbo_tasks_testing::{Registration, register, run};
7+
use turbo_tasks_testing::{Registration, register, run_once};
88

99
static REGISTRATION: Registration = register!();
1010

1111
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1212
async fn basic() {
13-
run(&REGISTRATION, || async {
13+
run_once(&REGISTRATION, || async {
1414
let output1 = func_without_args();
1515
assert_eq!(output1.await?.value, 123);
1616

turbopack/crates/turbo-tasks-backend/tests/bug.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
use anyhow::Result;
66
use serde::{Deserialize, Serialize};
77
use turbo_tasks::{NonLocalValue, Vc, trace::TraceRawVcs};
8-
use turbo_tasks_testing::{Registration, register, run};
8+
use turbo_tasks_testing::{Registration, register, run_once};
99

1010
static REGISTRATION: Registration = register!();
1111

@@ -27,7 +27,7 @@ struct TasksSpec(Vec<TaskSpec>);
2727
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
2828
async fn graph_bug() {
2929
// see https://github.com/vercel/next.js/pull/79451
30-
run(&REGISTRATION, || async {
30+
run_once(&REGISTRATION, || async {
3131
let spec = vec![
3232
TaskSpec {
3333
references: vec![

turbopack/crates/turbo-tasks-backend/tests/bug2.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::sync::Arc;
77
use anyhow::Result;
88
use serde::{Deserialize, Serialize};
99
use turbo_tasks::{NonLocalValue, State, TaskInput, Vc, trace::TraceRawVcs};
10-
use turbo_tasks_testing::{Registration, register, run};
10+
use turbo_tasks_testing::{Registration, register, run_once};
1111

1212
static REGISTRATION: Registration = register!();
1313

@@ -35,7 +35,7 @@ struct Iteration(State<usize>);
3535

3636
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
3737
async fn graph_bug() {
38-
run(&REGISTRATION, move || async move {
38+
run_once(&REGISTRATION, move || async move {
3939
let spec = vec![
4040
TaskSpec {
4141
references: vec![TaskReferenceSpec {

turbopack/crates/turbo-tasks-backend/tests/call_types.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@
44

55
use anyhow::Result;
66
use turbo_tasks::Vc;
7-
use turbo_tasks_testing::{Registration, register, run};
7+
use turbo_tasks_testing::{Registration, register, run_once};
88

99
static REGISTRATION: Registration = register!();
1010

1111
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1212
async fn functions() {
13-
run(&REGISTRATION, || async {
13+
run_once(&REGISTRATION, || async {
1414
assert_eq!(*fn_plain().await?, 42);
1515
assert_eq!(*fn_arg(43).await?, 43);
1616
assert_eq!(*fn_vc_arg(Vc::cell(44)).await?, 44);
@@ -55,7 +55,7 @@ async fn async_fn_vc_arg(n: Vc<u32>) -> Result<Vc<u32>> {
5555

5656
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
5757
async fn methods() {
58-
run(&REGISTRATION, || async {
58+
run_once(&REGISTRATION, || async {
5959
assert_eq!(*Value::static_method().await?, 42);
6060
assert_eq!(*Value::async_static_method().await?, 42);
6161

@@ -108,7 +108,7 @@ impl Value {
108108

109109
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
110110
async fn trait_methods() {
111-
run(&REGISTRATION, || async {
111+
run_once(&REGISTRATION, || async {
112112
assert_eq!(*Value::static_trait_method().await?, 42);
113113
assert_eq!(*Value::async_static_trait_method().await?, 42);
114114

turbopack/crates/turbo-tasks-backend/tests/collectibles.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@ use rustc_hash::FxHashSet;
1010
use tokio::time::sleep;
1111
use turbo_rcstr::{RcStr, rcstr};
1212
use turbo_tasks::{CollectiblesSource, ResolvedVc, ValueToString, Vc, emit};
13-
use turbo_tasks_testing::{Registration, register, run};
13+
use turbo_tasks_testing::{Registration, register, run_once};
1414

1515
static REGISTRATION: Registration = register!();
1616

1717
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1818
async fn transitive_emitting() {
19-
run(&REGISTRATION, || async {
19+
run_once(&REGISTRATION, || async {
2020
let result_op = my_transitive_emitting_function(rcstr!(""), rcstr!(""));
2121
let result_val = result_op.connect().strongly_consistent().await?;
2222
let list = result_op.peek_collectibles::<Box<dyn ValueToString>>();
@@ -34,7 +34,7 @@ async fn transitive_emitting() {
3434

3535
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
3636
async fn transitive_emitting_indirect() {
37-
run(&REGISTRATION, || async {
37+
run_once(&REGISTRATION, || async {
3838
let result_op = my_transitive_emitting_function(rcstr!(""), rcstr!(""));
3939
let collectibles_op = my_transitive_emitting_function_collectibles(rcstr!(""), rcstr!(""));
4040
let list = collectibles_op.connect().strongly_consistent().await?;
@@ -52,7 +52,7 @@ async fn transitive_emitting_indirect() {
5252

5353
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
5454
async fn multi_emitting() {
55-
run(&REGISTRATION, || async {
55+
run_once(&REGISTRATION, || async {
5656
let result_op = my_multi_emitting_function();
5757
let result_val = result_op.connect().strongly_consistent().await?;
5858
let list = result_op.peek_collectibles::<Box<dyn ValueToString>>();
@@ -70,7 +70,7 @@ async fn multi_emitting() {
7070

7171
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
7272
async fn taking_collectibles() {
73-
run(&REGISTRATION, || async {
73+
run_once(&REGISTRATION, || async {
7474
let result_op = my_collecting_function();
7575
let result_val = result_op.connect().strongly_consistent().await?;
7676
let list = result_op.take_collectibles::<Box<dyn ValueToString>>();
@@ -86,7 +86,7 @@ async fn taking_collectibles() {
8686

8787
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
8888
async fn taking_collectibles_extra_layer() {
89-
run(&REGISTRATION, || async {
89+
run_once(&REGISTRATION, || async {
9090
let result_op = my_collecting_function_indirect();
9191
let result_val = result_op.connect().strongly_consistent().await?;
9292
let list = result_op.take_collectibles::<Box<dyn ValueToString>>();
@@ -102,7 +102,7 @@ async fn taking_collectibles_extra_layer() {
102102

103103
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
104104
async fn taking_collectibles_parallel() {
105-
run(&REGISTRATION, || async {
105+
run_once(&REGISTRATION, || async {
106106
let result_op = my_transitive_emitting_function(rcstr!(""), rcstr!("a"));
107107
let result_val = result_op.connect().strongly_consistent().await?;
108108
let list = result_op.take_collectibles::<Box<dyn ValueToString>>();
@@ -144,7 +144,7 @@ async fn taking_collectibles_parallel() {
144144

145145
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
146146
async fn taking_collectibles_with_resolve() {
147-
run(&REGISTRATION, || async {
147+
run_once(&REGISTRATION, || async {
148148
let result_op = my_transitive_emitting_function_with_resolve(rcstr!("resolve"));
149149
result_op.connect().strongly_consistent().await?;
150150
let list = result_op.take_collectibles::<Box<dyn ValueToString>>();

0 commit comments

Comments
 (0)