Skip to content

Commit 3a5b2bd

Browse files
derrickstolee authored and Git for Windows Build Agent committed
pack-objects: thread the path-based compression
Adapting the implementation of ll_find_deltas(), create a threaded version of the --path-walk compression step in 'git pack-objects'.

This involves adding a 'regions' member to the thread_params struct, allowing each thread to own a section of paths. We can simplify the way jobs are split because there is no value in extending the batch based on name-hash the way sections of the object entry array are attempted to be grouped. We re-use the 'list_size' and 'remaining' items for the purpose of borrowing work in progress from other "victim" threads when a thread has finished its batch of work more quickly.

Using the Git repository as a test repo, the p5313 performance test shows that the resulting size of the repo is the same, but the threaded implementation gives gains of varying degrees depending on the number of objects being packed. (This was tested on a 16-core machine.)

Test                                      HEAD~1    HEAD
-------------------------------------------------------------
5313.6: thin pack with --path-walk          0.01    0.01  +0.0%
5313.7: thin pack size with --path-walk      475     475  +0.0%
5313.12: big pack with --path-walk          1.99    1.87  -6.0%
5313.13: big pack size with --path-walk    14.4M   14.3M  -0.4%
5313.18: repack with --path-walk           98.14   41.46 -57.8%
5313.19: repack size with --path-walk     197.2M  197.3M  +0.0%

Signed-off-by: Derrick Stolee <[email protected]>
1 parent c4d2f36 commit 3a5b2bd

File tree

2 files changed

+167
-6
lines changed

2 files changed

+167
-6
lines changed

builtin/pack-objects.c

Lines changed: 163 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3045,6 +3045,7 @@ static void find_deltas(struct object_entry **list, unsigned *list_size,
30453045
struct thread_params {
30463046
pthread_t thread;
30473047
struct object_entry **list;
3048+
struct packing_region *regions;
30483049
unsigned list_size;
30493050
unsigned remaining;
30503051
int window;
@@ -3346,7 +3347,8 @@ static void find_deltas_by_region(struct object_entry *list,
33463347
progress_nr = regions[nr - 1].start + regions[nr - 1].nr;
33473348

33483349
if (progress)
3349-
progress_state = start_progress(_("Compressing objects by path"),
3350+
progress_state = start_progress(the_repository,
3351+
_("Compressing objects by path"),
33503352
progress_nr);
33513353

33523354
while (nr--)
@@ -3358,6 +3360,164 @@ static void find_deltas_by_region(struct object_entry *list,
33583360
stop_progress(&progress_state);
33593361
}
33603362

3363+
static void *threaded_find_deltas_by_path(void *arg)
3364+
{
3365+
struct thread_params *me = arg;
3366+
3367+
progress_lock();
3368+
while (me->remaining) {
3369+
while (me->remaining) {
3370+
progress_unlock();
3371+
find_deltas_for_region(to_pack.objects,
3372+
me->regions,
3373+
me->processed);
3374+
progress_lock();
3375+
me->remaining--;
3376+
me->regions++;
3377+
}
3378+
3379+
me->working = 0;
3380+
pthread_cond_signal(&progress_cond);
3381+
progress_unlock();
3382+
3383+
/*
3384+
* We must not set ->data_ready before we wait on the
3385+
* condition because the main thread may have set it to 1
3386+
* before we get here. In order to be sure that new
3387+
* work is available if we see 1 in ->data_ready, it
3388+
* was initialized to 0 before this thread was spawned
3389+
* and we reset it to 0 right away.
3390+
*/
3391+
pthread_mutex_lock(&me->mutex);
3392+
while (!me->data_ready)
3393+
pthread_cond_wait(&me->cond, &me->mutex);
3394+
me->data_ready = 0;
3395+
pthread_mutex_unlock(&me->mutex);
3396+
3397+
progress_lock();
3398+
}
3399+
progress_unlock();
3400+
/* leave ->working 1 so that this doesn't get more work assigned */
3401+
return NULL;
3402+
}
3403+
3404+
static void ll_find_deltas_by_region(struct object_entry *list,
3405+
struct packing_region *regions,
3406+
uint32_t start, uint32_t nr)
3407+
{
3408+
struct thread_params *p;
3409+
int i, ret, active_threads = 0;
3410+
unsigned int processed = 0;
3411+
uint32_t progress_nr;
3412+
init_threaded_search();
3413+
3414+
if (!nr)
3415+
return;
3416+
3417+
progress_nr = regions[nr - 1].start + regions[nr - 1].nr;
3418+
if (delta_search_threads <= 1) {
3419+
find_deltas_by_region(list, regions, start, nr);
3420+
cleanup_threaded_search();
3421+
return;
3422+
}
3423+
3424+
if (progress > pack_to_stdout)
3425+
fprintf_ln(stderr, _("Path-based delta compression using up to %d threads"),
3426+
delta_search_threads);
3427+
CALLOC_ARRAY(p, delta_search_threads);
3428+
3429+
if (progress)
3430+
progress_state = start_progress(the_repository,
3431+
_("Compressing objects by path"),
3432+
progress_nr);
3433+
/* Partition the work amongst work threads. */
3434+
for (i = 0; i < delta_search_threads; i++) {
3435+
unsigned sub_size = nr / (delta_search_threads - i);
3436+
3437+
p[i].window = window;
3438+
p[i].depth = depth;
3439+
p[i].processed = &processed;
3440+
p[i].working = 1;
3441+
p[i].data_ready = 0;
3442+
3443+
p[i].regions = regions;
3444+
p[i].list_size = sub_size;
3445+
p[i].remaining = sub_size;
3446+
3447+
regions += sub_size;
3448+
nr -= sub_size;
3449+
}
3450+
3451+
/* Start work threads. */
3452+
for (i = 0; i < delta_search_threads; i++) {
3453+
if (!p[i].list_size)
3454+
continue;
3455+
pthread_mutex_init(&p[i].mutex, NULL);
3456+
pthread_cond_init(&p[i].cond, NULL);
3457+
ret = pthread_create(&p[i].thread, NULL,
3458+
threaded_find_deltas_by_path, &p[i]);
3459+
if (ret)
3460+
die(_("unable to create thread: %s"), strerror(ret));
3461+
active_threads++;
3462+
}
3463+
3464+
/*
3465+
* Now let's wait for work completion. Each time a thread is done
3466+
* with its work, we steal half of the remaining work from the
3467+
* thread with the largest number of unprocessed objects and give
3468+
* it to that newly idle thread. This ensure good load balancing
3469+
* until the remaining object list segments are simply too short
3470+
* to be worth splitting anymore.
3471+
*/
3472+
while (active_threads) {
3473+
struct thread_params *target = NULL;
3474+
struct thread_params *victim = NULL;
3475+
unsigned sub_size = 0;
3476+
3477+
progress_lock();
3478+
for (;;) {
3479+
for (i = 0; !target && i < delta_search_threads; i++)
3480+
if (!p[i].working)
3481+
target = &p[i];
3482+
if (target)
3483+
break;
3484+
pthread_cond_wait(&progress_cond, &progress_mutex);
3485+
}
3486+
3487+
for (i = 0; i < delta_search_threads; i++)
3488+
if (p[i].remaining > 2*window &&
3489+
(!victim || victim->remaining < p[i].remaining))
3490+
victim = &p[i];
3491+
if (victim) {
3492+
sub_size = victim->remaining / 2;
3493+
target->regions = victim->regions + victim->remaining - sub_size;
3494+
victim->list_size -= sub_size;
3495+
victim->remaining -= sub_size;
3496+
}
3497+
target->list_size = sub_size;
3498+
target->remaining = sub_size;
3499+
target->working = 1;
3500+
progress_unlock();
3501+
3502+
pthread_mutex_lock(&target->mutex);
3503+
target->data_ready = 1;
3504+
pthread_cond_signal(&target->cond);
3505+
pthread_mutex_unlock(&target->mutex);
3506+
3507+
if (!sub_size) {
3508+
pthread_join(target->thread, NULL);
3509+
pthread_cond_destroy(&target->cond);
3510+
pthread_mutex_destroy(&target->mutex);
3511+
active_threads--;
3512+
}
3513+
}
3514+
cleanup_threaded_search();
3515+
free(p);
3516+
3517+
display_progress(progress_state, progress_nr);
3518+
stop_progress(&progress_state);
3519+
}
3520+
33613521
static void prepare_pack(int window, int depth)
33623522
{
33633523
struct object_entry **delta_list;
@@ -3383,8 +3543,8 @@ static void prepare_pack(int window, int depth)
33833543
return;
33843544

33853545
if (path_walk)
3386-
find_deltas_by_region(to_pack.objects, to_pack.regions,
3387-
0, to_pack.nr_regions);
3546+
ll_find_deltas_by_region(to_pack.objects, to_pack.regions,
3547+
0, to_pack.nr_regions);
33883548

33893549
ALLOC_ARRAY(delta_list, to_pack.nr_objects);
33903550
nr_deltas = n = 0;

t/perf/p5313-pack-objects.sh

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,23 +69,24 @@ test_perf 'thin pack with --path-walk' '
6969
'
7070

7171
test_size 'thin pack size with --path-walk' '
72-
wc -c <out
72+
test_file_size out
7373
'
7474

7575
test_perf 'big pack with --path-walk' '
7676
git pack-objects --stdout --revs --sparse --path-walk <in-big >out
7777
'
7878

7979
test_size 'big pack size with --path-walk' '
80-
wc -c <out
80+
test_file_size out
8181
'
8282

8383
test_perf 'repack with --path-walk' '
8484
git repack -adf --path-walk
8585
'
8686

8787
test_size 'repack size with --path-walk' '
88-
wc -c <.git/objects/pack/pack-*.pack
88+
pack=$(ls .git/objects/pack/pack-*.pack) &&
89+
test_file_size "$pack"
8990
'
9091

9192
test_done

0 commit comments

Comments
 (0)