@@ -10,32 +10,47 @@ use std::path::{Path, PathBuf};
10
10
use std:: sync:: atomic:: Ordering ;
11
11
use std:: time:: { Duration , Instant } ;
12
12
13
+ pub type ProgressItem = gix:: progress:: DoOrDiscard < gix:: progress:: prodash:: tree:: Item > ;
14
+
15
+ pub struct State {
16
+ pub progress : ProgressItem ,
17
+ pub gitoxide_version : String ,
18
+ pub trace_to_progress : bool ,
19
+ pub reverse_trace_lines : bool ,
20
+ }
21
+
13
22
impl Engine {
14
23
/// Open the corpus DB or create it.
15
- pub fn open_or_create ( db : PathBuf , gitoxide_version : String , progress : corpus :: Progress ) -> anyhow:: Result < Engine > {
24
+ pub fn open_or_create ( db : PathBuf , state : State ) -> anyhow:: Result < Engine > {
16
25
let con = crate :: corpus:: db:: create ( db) . context ( "Could not open or create database" ) ?;
17
- Ok ( Engine {
18
- progress,
19
- con,
20
- gitoxide_version,
21
- } )
26
+ Ok ( Engine { con, state } )
22
27
}
23
28
24
29
/// Run on the existing set of repositories we have already seen or obtain them from `path` if there is none yet.
25
- pub fn run ( & mut self , corpus_path : PathBuf , threads : Option < usize > ) -> anyhow:: Result < ( ) > {
30
+ pub fn run (
31
+ & mut self ,
32
+ corpus_path : PathBuf ,
33
+ threads : Option < usize > ,
34
+ dry_run : bool ,
35
+ repo_sql_suffix : Option < String > ,
36
+ allowed_task_names : Vec < String > ,
37
+ ) -> anyhow:: Result < ( ) > {
38
+ let tasks = self . tasks_or_insert ( & allowed_task_names) ?;
39
+ if tasks. is_empty ( ) {
40
+ bail ! ( "Cannot run without any task to perform on the repositories" ) ;
41
+ }
26
42
let ( corpus_path, corpus_id) = self . prepare_corpus_path ( corpus_path) ?;
27
43
let gitoxide_id = self . gitoxide_version_id_or_insert ( ) ?;
28
44
let runner_id = self . runner_id_or_insert ( ) ?;
29
- let repos = self . find_repos_or_insert ( & corpus_path, corpus_id) ?;
30
- let tasks = self . tasks_or_insert ( ) ?;
31
- self . perform_run ( & corpus_path, gitoxide_id, runner_id, & tasks, repos, threads)
45
+ let repos = self . find_repos_or_insert ( & corpus_path, corpus_id, repo_sql_suffix) ?;
46
+ self . perform_run ( & corpus_path, gitoxide_id, runner_id, & tasks, repos, threads, dry_run)
32
47
}
33
48
34
49
pub fn refresh ( & mut self , corpus_path : PathBuf ) -> anyhow:: Result < ( ) > {
35
50
let ( corpus_path, corpus_id) = self . prepare_corpus_path ( corpus_path) ?;
36
51
let repos = self . refresh_repos ( & corpus_path, corpus_id) ?;
37
- self . progress . set_name ( "refresh repos" ) ;
38
- self . progress . info ( format ! (
52
+ self . state . progress . set_name ( "refresh repos" ) ;
53
+ self . state . progress . info ( format ! (
39
54
"Added or updated {} repositories under {corpus_path:?}" ,
40
55
repos. len( )
41
56
) ) ;
@@ -44,6 +59,7 @@ impl Engine {
44
59
}
45
60
46
61
impl Engine {
62
+ #[ allow( clippy:: too_many_arguments) ]
47
63
fn perform_run (
48
64
& mut self ,
49
65
corpus_path : & Path ,
@@ -52,21 +68,42 @@ impl Engine {
52
68
tasks : & [ ( db:: Id , & ' static Task ) ] ,
53
69
mut repos : Vec < db:: Repo > ,
54
70
threads : Option < usize > ,
71
+ dry_run : bool ,
55
72
) -> anyhow:: Result < ( ) > {
56
73
let start = Instant :: now ( ) ;
57
- let task_progress = & mut self . progress ;
58
- task_progress. set_name ( "run" ) ;
59
- task_progress. init ( Some ( tasks. len ( ) ) , gix:: progress:: count ( "tasks" ) ) ;
74
+ let repo_progress = & mut self . state . progress ;
60
75
let threads = gix:: parallel:: num_threads ( threads) ;
61
76
let db_path = self . con . path ( ) . expect ( "opened from path on disk" ) . to_owned ( ) ;
62
- for ( task_id, task) in tasks {
77
+ ' tasks_loop : for ( task_id, task) in tasks {
63
78
let task_start = Instant :: now ( ) ;
64
- let mut repo_progress = task_progress. add_child ( format ! ( "run '{}'" , task. short_name) ) ;
79
+ let task_info = format ! ( "run '{}'" , task. short_name) ;
80
+ repo_progress. set_name ( task_info. clone ( ) ) ;
65
81
repo_progress. init ( Some ( repos. len ( ) ) , gix:: progress:: count ( "repos" ) ) ;
66
-
67
- if task. execute_exclusive || threads == 1 {
82
+ if task. execute_exclusive || threads == 1 || dry_run {
83
+ if dry_run {
84
+ repo_progress. set_name ( "WOULD run" ) ;
85
+ for repo in & repos {
86
+ repo_progress. info ( format ! (
87
+ "{}" ,
88
+ repo. path
89
+ . strip_prefix( corpus_path)
90
+ . expect( "corpus contains repo" )
91
+ . display( )
92
+ ) ) ;
93
+ repo_progress. inc ( ) ;
94
+ }
95
+ repo_progress. info ( format ! ( "with {} tasks" , tasks. len( ) ) ) ;
96
+ for ( _, task) in tasks {
97
+ repo_progress. info ( format ! ( "task '{}' ({})" , task. description, task. short_name) )
98
+ }
99
+ break ' tasks_loop;
100
+ }
68
101
let mut run_progress = repo_progress. add_child ( "set later" ) ;
69
- let ( _guard, current_id) = corpus:: trace:: override_thread_subscriber ( db_path. as_str ( ) ) ?;
102
+ let ( _guard, current_id) = corpus:: trace:: override_thread_subscriber (
103
+ db_path. as_str ( ) ,
104
+ self . state . trace_to_progress . then ( || repo_progress. add_child ( "trace" ) ) ,
105
+ self . state . reverse_trace_lines ,
106
+ ) ?;
70
107
71
108
for repo in & repos {
72
109
if gix:: interrupt:: is_triggered ( ) {
@@ -80,7 +117,7 @@ impl Engine {
80
117
. display( )
81
118
) ) ;
82
119
83
- // TODO: wait for new release to be able to provide run_id via span attributes
120
+ // TODO: wait for new release of `tracing-forest` to be able to provide run_id via span attributes
84
121
let mut run = Self :: insert_run ( & self . con , gitoxide_id, runner_id, * task_id, repo. id ) ?;
85
122
current_id. store ( run. id , Ordering :: SeqCst ) ;
86
123
tracing:: info_span!( "run" , run_id = run. id) . in_scope ( || {
@@ -98,17 +135,21 @@ impl Engine {
98
135
repo_progress. show_throughput ( task_start) ;
99
136
} else {
100
137
let counter = repo_progress. counter ( ) ;
101
- let repo_progress = gix:: threading:: OwnShared :: new ( gix:: threading:: Mutable :: new ( repo_progress) ) ;
138
+ let repo_progress = gix:: threading:: OwnShared :: new ( gix:: threading:: Mutable :: new (
139
+ repo_progress. add_child ( "will be changed" ) ,
140
+ ) ) ;
102
141
gix:: parallel:: in_parallel_with_slice (
103
142
& mut repos,
104
143
Some ( threads) ,
105
144
{
106
145
let shared_repo_progress = repo_progress. clone ( ) ;
107
146
let db_path = db_path. clone ( ) ;
108
147
move |tid| {
148
+ let mut progress = gix:: threading:: lock ( & shared_repo_progress) ;
109
149
(
110
- corpus:: trace:: override_thread_subscriber ( db_path. as_str ( ) ) ,
111
- gix:: threading:: lock ( & shared_repo_progress) . add_child ( format ! ( "{tid}" ) ) ,
150
+ // threaded printing is usually spammy, and lines interleave so it's useless.
151
+ corpus:: trace:: override_thread_subscriber ( db_path. as_str ( ) , None , false ) ,
152
+ progress. add_child ( format ! ( "{tid}" ) ) ,
112
153
rusqlite:: Connection :: open ( & db_path) ,
113
154
)
114
155
}
@@ -154,9 +195,9 @@ impl Engine {
154
195
gix:: threading:: lock ( & repo_progress) . show_throughput ( task_start) ;
155
196
}
156
197
157
- task_progress . inc ( ) ;
198
+ repo_progress . inc ( ) ;
158
199
}
159
- task_progress . show_throughput ( start) ;
200
+ repo_progress . show_throughput ( start) ;
160
201
Ok ( ( ) )
161
202
}
162
203
@@ -166,13 +207,21 @@ impl Engine {
166
207
Ok ( ( corpus_path, corpus_id) )
167
208
}
168
209
169
- fn find_repos ( & mut self , corpus_path : & Path , corpus_id : db:: Id ) -> anyhow:: Result < Vec < db:: Repo > > {
170
- self . progress . set_name ( "query db-repos" ) ;
171
- self . progress . init ( None , gix:: progress:: count ( "repos" ) ) ;
210
+ fn find_repos (
211
+ & mut self ,
212
+ corpus_path : & Path ,
213
+ corpus_id : db:: Id ,
214
+ sql_suffix : Option < & str > ,
215
+ ) -> anyhow:: Result < Vec < db:: Repo > > {
216
+ self . state . progress . set_name ( "query db-repos" ) ;
217
+ self . state . progress . init ( None , gix:: progress:: count ( "repos" ) ) ;
172
218
173
219
Ok ( self
174
220
. con
175
- . prepare ( "SELECT id, rela_path, odb_size, num_objects, num_references FROM repository WHERE corpus = ?1" ) ?
221
+ . prepare ( & format ! (
222
+ "SELECT id, rela_path, odb_size, num_objects, num_references FROM repository WHERE corpus = ?1 {}" ,
223
+ sql_suffix. unwrap_or_default( )
224
+ ) ) ?
176
225
. query_map ( [ corpus_id] , |r| {
177
226
Ok ( db:: Repo {
178
227
id : r. get ( 0 ) ?,
@@ -182,17 +231,17 @@ impl Engine {
182
231
num_references : r. get ( 4 ) ?,
183
232
} )
184
233
} ) ?
185
- . inspect ( |_| self . progress . inc ( ) )
234
+ . inspect ( |_| self . state . progress . inc ( ) )
186
235
. collect :: < Result < _ , _ > > ( ) ?)
187
236
}
188
237
189
238
fn refresh_repos ( & mut self , corpus_path : & Path , corpus_id : db:: Id ) -> anyhow:: Result < Vec < db:: Repo > > {
190
239
let start = Instant :: now ( ) ;
191
- self . progress . set_name ( "refresh" ) ;
192
- self . progress . init ( None , gix:: progress:: count ( "repos" ) ) ;
240
+ self . state . progress . set_name ( "refresh" ) ;
241
+ self . state . progress . init ( None , gix:: progress:: count ( "repos" ) ) ;
193
242
194
243
let repos = std:: thread:: scope ( {
195
- let progress = & mut self . progress ;
244
+ let progress = & mut self . state . progress ;
196
245
let con = & mut self . con ;
197
246
|scope| -> anyhow:: Result < _ > {
198
247
let threads = std:: thread:: available_parallelism ( )
@@ -264,13 +313,23 @@ impl Engine {
264
313
Ok ( repos)
265
314
}
266
315
267
- fn find_repos_or_insert ( & mut self , corpus_path : & Path , corpus_id : db:: Id ) -> anyhow:: Result < Vec < db:: Repo > > {
316
+ fn find_repos_or_insert (
317
+ & mut self ,
318
+ corpus_path : & Path ,
319
+ corpus_id : db:: Id ,
320
+ sql_suffix : Option < String > ,
321
+ ) -> anyhow:: Result < Vec < db:: Repo > > {
268
322
let start = Instant :: now ( ) ;
269
- let repos = self . find_repos ( corpus_path, corpus_id) ?;
323
+ let repos = self . find_repos ( corpus_path, corpus_id, sql_suffix . as_deref ( ) ) ?;
270
324
if repos. is_empty ( ) {
271
- self . refresh_repos ( corpus_path, corpus_id)
325
+ let res = self . refresh_repos ( corpus_path, corpus_id) ;
326
+ if sql_suffix. is_some ( ) {
327
+ self . find_repos ( corpus_path, corpus_id, sql_suffix. as_deref ( ) )
328
+ } else {
329
+ res
330
+ }
272
331
} else {
273
- self . progress . show_throughput ( start) ;
332
+ self . state . progress . show_throughput ( start) ;
274
333
Ok ( repos)
275
334
}
276
335
}
0 commit comments