1
- use std:: { borrow:: Borrow , cmp:: max, path:: PathBuf , sync:: Arc } ;
1
+ use std:: {
2
+ borrow:: Borrow ,
3
+ cmp:: max,
4
+ env,
5
+ path:: PathBuf ,
6
+ sync:: { Arc , LazyLock , Mutex , PoisonError , Weak } ,
7
+ } ;
2
8
3
9
use anyhow:: { Context , Result , anyhow} ;
4
10
use rayon:: iter:: { IndexedParallelIterator , IntoParallelIterator , ParallelIterator } ;
5
11
use serde:: { Deserialize , Serialize } ;
6
12
use smallvec:: SmallVec ;
7
13
use tracing:: Span ;
8
- use turbo_tasks:: { SessionId , TaskId , backend:: CachedTaskType , turbo_tasks_scope} ;
14
+ use turbo_tasks:: {
15
+ SessionId , TaskId ,
16
+ backend:: CachedTaskType ,
17
+ panic_hooks:: { PanicHookGuard , register_panic_hook} ,
18
+ turbo_tasks_scope,
19
+ } ;
9
20
10
21
use crate :: {
11
22
GitVersionInfo ,
@@ -83,18 +94,48 @@ fn as_u32(bytes: impl Borrow<[u8]>) -> Result<u32> {
83
94
Ok ( n)
84
95
}
85
96
86
- pub struct KeyValueDatabaseBackingStorage < T : KeyValueDatabase > {
97
+ // We want to invalidate the cache on panic for most users, but this is a band-aid to underlying
98
+ // problems in turbo-tasks.
99
+ //
100
+ // If we invalidate the cache upon panic and it "fixes" the issue upon restart, users typically
101
+ // won't report bugs to us, and we'll never find root-causes for these problems.
102
+ //
103
+ // These overrides let us avoid the cache invalidation / error suppression within Vercel so that we
104
+ // feel these pain points and fix the root causes of bugs.
105
+ fn should_invalidate_on_panic ( ) -> bool {
106
+ fn env_is_falsy ( key : & str ) -> bool {
107
+ env:: var_os ( key)
108
+ . is_none_or ( |value| [ "" . as_ref ( ) , "0" . as_ref ( ) , "false" . as_ref ( ) ] . contains ( & & * value) )
109
+ }
110
+ static SHOULD_INVALIDATE : LazyLock < bool > = LazyLock :: new ( || {
111
+ env_is_falsy ( "TURBO_ENGINE_SKIP_INVALIDATE_ON_PANIC" ) && env_is_falsy ( "__NEXT_TEST_MODE" )
112
+ } ) ;
113
+ * SHOULD_INVALIDATE
114
+ }
115
+
116
+ pub struct KeyValueDatabaseBackingStorageInner < T : KeyValueDatabase > {
87
117
database : T ,
88
118
/// Used when calling [`BackingStorage::invalidate`]. Can be `None` in the memory-only/no-op
89
119
/// storage case.
90
120
base_path : Option < PathBuf > ,
121
+ /// Used to skip calling [`invalidate_db`] when the database has already been invalidated.
122
+ invalidated : Mutex < bool > ,
123
+ _panic_hook_guard : Option < PanicHookGuard > ,
124
+ }
125
+
126
+ pub struct KeyValueDatabaseBackingStorage < T : KeyValueDatabase > {
127
+ inner : Arc < KeyValueDatabaseBackingStorageInner < T > > ,
91
128
}
92
129
93
130
impl < T : KeyValueDatabase > KeyValueDatabaseBackingStorage < T > {
94
131
pub fn new_in_memory ( database : T ) -> Self {
95
132
Self {
96
- database,
97
- base_path : None ,
133
+ inner : Arc :: new ( KeyValueDatabaseBackingStorageInner {
134
+ database,
135
+ base_path : None ,
136
+ invalidated : Mutex :: new ( false ) ,
137
+ _panic_hook_guard : None ,
138
+ } ) ,
98
139
}
99
140
}
100
141
@@ -103,15 +144,44 @@ impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorage<T> {
103
144
version_info : & GitVersionInfo ,
104
145
is_ci : bool ,
105
146
database : impl FnOnce ( PathBuf ) -> Result < T > ,
106
- ) -> Result < Self > {
147
+ ) -> Result < Self >
148
+ where
149
+ T : Send + Sync + ' static ,
150
+ {
107
151
check_db_invalidation_and_cleanup ( & base_path) ?;
108
152
let versioned_path = handle_db_versioning ( & base_path, version_info, is_ci) ?;
153
+ let database = ( database) ( versioned_path) ?;
109
154
Ok ( Self {
110
- database : ( database) ( versioned_path) ?,
111
- base_path : Some ( base_path) ,
155
+ inner : Arc :: new_cyclic (
156
+ move |weak_inner : & Weak < KeyValueDatabaseBackingStorageInner < T > > | {
157
+ let panic_hook_guard = if should_invalidate_on_panic ( ) {
158
+ let weak_inner = weak_inner. clone ( ) ;
159
+ Some ( register_panic_hook ( Box :: new ( move |_| {
160
+ let Some ( inner) = weak_inner. upgrade ( ) else {
161
+ return ;
162
+ } ;
163
+ // If a panic happened that must mean something deep inside of turbopack
164
+ // or turbo-tasks failed, and it may be hard to recover. We don't want
165
+ // the cache to stick around, as that may persist bugs. Make a
166
+ // best-effort attempt to invalidate the database (ignoring failures).
167
+ let _ = inner. invalidate ( ) ;
168
+ } ) ) )
169
+ } else {
170
+ None
171
+ } ;
172
+ KeyValueDatabaseBackingStorageInner {
173
+ database,
174
+ base_path : Some ( base_path) ,
175
+ invalidated : Mutex :: new ( false ) ,
176
+ _panic_hook_guard : panic_hook_guard,
177
+ }
178
+ } ,
179
+ ) ,
112
180
} )
113
181
}
182
+ }
114
183
184
+ impl < T : KeyValueDatabase > KeyValueDatabaseBackingStorageInner < T > {
115
185
fn with_tx < R > (
116
186
& self ,
117
187
tx : Option < & T :: ReadTransaction < ' _ > > ,
@@ -126,14 +196,38 @@ impl<T: KeyValueDatabase> KeyValueDatabaseBackingStorage<T> {
126
196
Ok ( r)
127
197
}
128
198
}
129
- }
130
199
131
- fn get_infra_u32 ( database : & impl KeyValueDatabase , key : u32 ) -> Result < Option < u32 > > {
132
- let tx = database. begin_read_transaction ( ) ?;
133
- database
134
- . get ( & tx, KeySpace :: Infra , IntKey :: new ( key) . as_ref ( ) ) ?
135
- . map ( as_u32)
136
- . transpose ( )
200
+ fn invalidate ( & self ) -> Result < ( ) > {
201
+ // `base_path` can be `None` for a `NoopKvDb`
202
+ if let Some ( base_path) = & self . base_path {
203
+ // Invalidation could happen frequently if there's a bunch of panics. We only need to
204
+ // invalidate once, so grab a lock.
205
+ let mut invalidated_guard = self
206
+ . invalidated
207
+ . lock ( )
208
+ . unwrap_or_else ( PoisonError :: into_inner) ;
209
+ if * invalidated_guard {
210
+ return Ok ( ( ) ) ;
211
+ }
212
+ // Invalidate first, as it's a very fast atomic operation. `prevent_writes` is allowed
213
+ // to be slower (e.g. wait for a lock) and is allowed to corrupt the database with
214
+ // partial writes.
215
+ invalidate_db ( base_path) ?;
216
+ self . database . prevent_writes ( ) ;
217
+ // Avoid redundant invalidations from future panics
218
+ * invalidated_guard = true ;
219
+ }
220
+ Ok ( ( ) )
221
+ }
222
+
223
+ /// Used to read the previous session id and the next free task ID from the database.
224
+ fn get_infra_u32 ( & self , key : u32 ) -> Result < Option < u32 > > {
225
+ let tx = self . database . begin_read_transaction ( ) ?;
226
+ self . database
227
+ . get ( & tx, KeySpace :: Infra , IntKey :: new ( key) . as_ref ( ) ) ?
228
+ . map ( as_u32)
229
+ . transpose ( )
230
+ }
137
231
}
138
232
139
233
impl < T : KeyValueDatabase + Send + Sync + ' static > BackingStorage
@@ -149,15 +243,17 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
149
243
150
244
fn next_free_task_id ( & self ) -> Result < TaskId > {
151
245
Ok ( TaskId :: try_from (
152
- get_infra_u32 ( & self . database , META_KEY_NEXT_FREE_TASK_ID )
246
+ self . inner
247
+ . get_infra_u32 ( META_KEY_NEXT_FREE_TASK_ID )
153
248
. context ( "Unable to read next free task id from database" ) ?
154
249
. unwrap_or ( 1 ) ,
155
250
) ?)
156
251
}
157
252
158
253
fn next_session_id ( & self ) -> Result < SessionId > {
159
254
Ok ( SessionId :: try_from (
160
- get_infra_u32 ( & self . database , META_KEY_SESSION_ID )
255
+ self . inner
256
+ . get_infra_u32 ( META_KEY_SESSION_ID )
161
257
. context ( "Unable to read session id from database" ) ?
162
258
. unwrap_or ( 0 )
163
259
+ 1 ,
@@ -178,7 +274,7 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
178
274
let operations = deserialize_with_good_error ( operations. borrow ( ) ) ?;
179
275
Ok ( operations)
180
276
}
181
- get ( & self . database ) . context ( "Unable to read uncompleted operations from database" )
277
+ get ( & self . inner . database ) . context ( "Unable to read uncompleted operations from database" )
182
278
}
183
279
184
280
fn serialize ( task : TaskId , data : & Vec < CachedDataItem > ) -> Result < SmallVec < [ u8 ; 16 ] > > {
@@ -203,7 +299,7 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
203
299
+ Sync ,
204
300
{
205
301
let _span = tracing:: trace_span!( "save snapshot" , session_id = ?session_id, operations = operations. len( ) ) ;
206
- let mut batch = self . database . write_batch ( ) ?;
302
+ let mut batch = self . inner . database . write_batch ( ) ?;
207
303
208
304
// Start organizing the updates in parallel
209
305
match & mut batch {
@@ -380,14 +476,15 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
380
476
}
381
477
382
478
fn start_read_transaction ( & self ) -> Option < Self :: ReadTransaction < ' _ > > {
383
- self . database . begin_read_transaction ( ) . ok ( )
479
+ self . inner . database . begin_read_transaction ( ) . ok ( )
384
480
}
385
481
386
482
unsafe fn forward_lookup_task_cache (
387
483
& self ,
388
484
tx : Option < & T :: ReadTransaction < ' _ > > ,
389
485
task_type : & CachedTaskType ,
390
486
) -> Result < Option < TaskId > > {
487
+ let inner = & * self . inner ;
391
488
fn lookup < D : KeyValueDatabase > (
392
489
database : & D ,
393
490
tx : & D :: ReadTransaction < ' _ > ,
@@ -401,12 +498,13 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
401
498
let id = TaskId :: try_from ( u32:: from_le_bytes ( bytes) ) . unwrap ( ) ;
402
499
Ok ( Some ( id) )
403
500
}
404
- if self . database . is_empty ( ) {
501
+ if inner . database . is_empty ( ) {
405
502
// Checking if the database is empty is a performance optimization
406
503
// to avoid serializing the task type.
407
504
return Ok ( None ) ;
408
505
}
409
- self . with_tx ( tx, |tx| lookup ( & self . database , tx, task_type) )
506
+ inner
507
+ . with_tx ( tx, |tx| lookup ( & self . inner . database , tx, task_type) )
410
508
. with_context ( || format ! ( "Looking up task id for {task_type:?} from database failed" ) )
411
509
}
412
510
@@ -415,6 +513,7 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
415
513
tx : Option < & T :: ReadTransaction < ' _ > > ,
416
514
task_id : TaskId ,
417
515
) -> Result < Option < Arc < CachedTaskType > > > {
516
+ let inner = & * self . inner ;
418
517
fn lookup < D : KeyValueDatabase > (
419
518
database : & D ,
420
519
tx : & D :: ReadTransaction < ' _ > ,
@@ -430,7 +529,8 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
430
529
} ;
431
530
Ok ( Some ( deserialize_with_good_error ( bytes. borrow ( ) ) ?) )
432
531
}
433
- self . with_tx ( tx, |tx| lookup ( & self . database , tx, task_id) )
532
+ inner
533
+ . with_tx ( tx, |tx| lookup ( & inner. database , tx, task_id) )
434
534
. with_context ( || format ! ( "Looking up task type for {task_id} from database failed" ) )
435
535
}
436
536
@@ -440,6 +540,7 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
440
540
task_id : TaskId ,
441
541
category : TaskDataCategory ,
442
542
) -> Result < Vec < CachedDataItem > > {
543
+ let inner = & * self . inner ;
443
544
fn lookup < D : KeyValueDatabase > (
444
545
database : & D ,
445
546
tx : & D :: ReadTransaction < ' _ > ,
@@ -461,24 +562,17 @@ impl<T: KeyValueDatabase + Send + Sync + 'static> BackingStorage
461
562
let result: Vec < CachedDataItem > = deserialize_with_good_error ( bytes. borrow ( ) ) ?;
462
563
Ok ( result)
463
564
}
464
- self . with_tx ( tx, |tx| lookup ( & self . database , tx, task_id, category) )
565
+ inner
566
+ . with_tx ( tx, |tx| lookup ( & inner. database , tx, task_id, category) )
465
567
. with_context ( || format ! ( "Looking up data for {task_id} from database failed" ) )
466
568
}
467
569
468
570
fn invalidate ( & self ) -> Result < ( ) > {
469
- // `base_path` can be `None` for a `NoopKvDb`
470
- if let Some ( base_path) = & self . base_path {
471
- // Invalidate first, as it's a very fast atomic operation. `prevent_writes` is allowed
472
- // to be slower (e.g. wait for a lock) and is allowed to corrupt the database with
473
- // partial writes.
474
- invalidate_db ( base_path) ?;
475
- self . database . prevent_writes ( )
476
- }
477
- Ok ( ( ) )
571
+ self . inner . invalidate ( )
478
572
}
479
573
480
574
fn shutdown ( & self ) -> Result < ( ) > {
481
- self . database . shutdown ( )
575
+ self . inner . database . shutdown ( )
482
576
}
483
577
}
484
578
0 commit comments