Skip to content

Commit bbc7aec

Browse files
committed
[Issue #449] multithread delete for WAL archive
1 parent 5dacdf5 commit bbc7aec

File tree

10 files changed

+332
-49
lines changed

10 files changed

+332
-49
lines changed

src/catalog.c

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1727,7 +1727,10 @@ catalog_get_timelines(InstanceState *instanceState, InstanceConfig *instance)
17271727
parray_walk(timelines, pfree);
17281728
parray_free(timelines);
17291729
}
1730-
/* add WAL archive subdirectories to filelist (used only in delete) */
1730+
/*
1731+
* Add WAL archive subdirectories to filelist (used only in delete)
1732+
* TODO: currently only directory with 8-character name is treated as WAL subdir, is it ok?
1733+
*/
17311734
else if (S_ISDIR(file->mode) && strspn(file->rel_path, "0123456789ABCDEF") == 8)
17321735
{
17331736
if (instanceState->wal_archive_subdirs == NULL)
@@ -1760,6 +1763,9 @@ catalog_get_timelines(InstanceState *instanceState, InstanceConfig *instance)
17601763
parray_append(tlinfo->backups, backup);
17611764
}
17621765
}
1766+
1767+
/* setup locks */
1768+
xlogfilearray_clear_locks(tlinfo->xlog_filelist);
17631769
}
17641770

17651771
/* determine oldest backup and closest backup for every timeline */

src/delete.c

Lines changed: 112 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,23 @@ static bool backup_deleted = false; /* At least one backup was deleted */
2929
static bool backup_merged = false; /* At least one merge was enacted */
3030
static bool wal_deleted = false; /* At least one WAL segments was deleted */
3131

32+
typedef struct
33+
{
34+
parray *xlog_filelist;
35+
int thread_num;
36+
bool purge_all;
37+
XLogSegNo OldestToKeepSegNo;
38+
const char *archive_root_dir;
39+
40+
/*
41+
* Return value from the thread.
42+
* 0 means there is no error, 1 - there is an error.
43+
*/
44+
int ret;
45+
} delete_files_arg;
46+
47+
static void *delete_walfiles_in_tli_internal(void *arg);
48+
3249
void
3350
do_delete(InstanceState *instanceState, time_t backup_id)
3451
{
@@ -782,7 +799,7 @@ delete_backup_files(pgBackup *backup)
782799
elog(INFO, "Progress: (%zd/%zd). Delete file \"%s\"",
783800
i + 1, num_files, full_path);
784801

785-
pgFileDelete(file->mode, full_path);
802+
pgFileDelete(file->mode, full_path, ERROR);
786803
}
787804

788805
parray_walk(files, pgFileFree);
@@ -826,6 +843,10 @@ delete_walfiles_in_tli(InstanceState *instanceState, XLogRecPtr keep_lsn, timeli
826843
size_t wal_size_actual = 0;
827844
char wal_pretty_size[20];
828845
bool purge_all = false;
846+
// multi-thread stuff
847+
pthread_t *threads;
848+
delete_files_arg *threads_args;
849+
bool delete_isok = true;
829850

830851

831852
/* Timeline is completely empty */
@@ -925,22 +946,105 @@ delete_walfiles_in_tli(InstanceState *instanceState, XLogRecPtr keep_lsn, timeli
925946
if (dry_run)
926947
return;
927948

949+
/* init thread args with own file lists */
950+
threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
951+
threads_args = (delete_files_arg *) palloc(sizeof(delete_files_arg)*num_threads);
952+
953+
for (i = 0; i < num_threads; i++)
954+
{
955+
delete_files_arg *arg = &(threads_args[i]);
956+
957+
arg->purge_all = purge_all;
958+
arg->OldestToKeepSegNo = OldestToKeepSegNo;
959+
arg->archive_root_dir = instanceState->instance_wal_subdir_path;
960+
arg->xlog_filelist = tlinfo->xlog_filelist;
961+
arg->thread_num = i+1;
962+
/* By default there are some error */
963+
arg->ret = 1;
964+
}
965+
966+
/* Run threads */
967+
thread_interrupted = false;
968+
for (i = 0; i < num_threads; i++)
969+
{
970+
delete_files_arg *arg = &(threads_args[i]);
971+
972+
elog(VERBOSE, "Start thread num: %i", i);
973+
pthread_create(&threads[i], NULL, delete_walfiles_in_tli_internal, arg);
974+
}
975+
976+
/* Wait threads */
977+
for (i = 0; i < num_threads; i++)
978+
{
979+
pthread_join(threads[i], NULL);
980+
if (threads_args[i].ret == 1)
981+
delete_isok = false;
982+
}
983+
984+
/* TODO: */
985+
//if delete_isok
986+
987+
/* cleanup */
928988
for (i = 0; i < parray_num(tlinfo->xlog_filelist); i++)
929989
{
930990
xlogFile *wal_file = (xlogFile *) parray_get(tlinfo->xlog_filelist, i);
931991

932-
if (interrupted)
992+
if (wal_file->deleted)
993+
{
994+
pgXlogFileFree(wal_file);
995+
parray_remove(tlinfo->xlog_filelist, i);
996+
i--;
997+
}
998+
}
999+
pg_free(threads);
1000+
pg_free(threads_args);
1001+
1002+
/* Remove empty subdirectories */
1003+
if (!instanceState->wal_archive_subdirs)
1004+
return;
1005+
1006+
for (i = 0; i < parray_num(instanceState->wal_archive_subdirs); i++)
1007+
{
1008+
char fullpath[MAXPGPATH];
1009+
pgFile *file = (pgFile *) parray_get(instanceState->wal_archive_subdirs, i);
1010+
1011+
join_path_components(fullpath, instanceState->instance_wal_subdir_path, file->name);
1012+
1013+
if (dir_is_empty(fullpath, FIO_LOCAL_HOST))
1014+
{
1015+
pgFileDelete(file->mode, fullpath, WARNING); /* WARNING (not ERROR) due to possible race condition */
1016+
pgFileFree(file);
1017+
parray_remove(instanceState->wal_archive_subdirs, i);
1018+
i--;
1019+
}
1020+
}
1021+
}
1022+
1023+
void *
1024+
delete_walfiles_in_tli_internal(void *arg)
1025+
{
1026+
int i;
1027+
delete_files_arg *args = (delete_files_arg *) arg;
1028+
1029+
for (i = 0; i < parray_num(args->xlog_filelist); i++)
1030+
{
1031+
xlogFile *wal_file = (xlogFile *) parray_get(args->xlog_filelist, i);
1032+
1033+
if (interrupted || thread_interrupted)
9331034
elog(ERROR, "interrupted during WAL archive purge");
9341035

1036+
if (!pg_atomic_test_set_flag(&wal_file->lock))
1037+
continue;
1038+
9351039
/*
9361040
* Any segment equal or greater than EndSegNo must be kept
9371041
* unless it`s a 'purge all' scenario.
9381042
*/
939-
if (purge_all || wal_file->segno < OldestToKeepSegNo)
1043+
if (args->purge_all || wal_file->segno < args->OldestToKeepSegNo)
9401044
{
9411045
char wal_fullpath[MAXPGPATH];
9421046

943-
join_path_components(wal_fullpath, instanceState->instance_wal_subdir_path, wal_file->file.rel_path);
1047+
join_path_components(wal_fullpath, args->archive_root_dir, wal_file->file.rel_path);
9441048

9451049
/* save segment from purging */
9461050
if (instance_config.wal_depth >= 0 && wal_file->keep)
@@ -954,8 +1058,8 @@ delete_walfiles_in_tli(InstanceState *instanceState, XLogRecPtr keep_lsn, timeli
9541058
{
9551059
/* Missing file is not considered as error condition */
9561060
if (errno != ENOENT)
957-
elog(ERROR, "Could not remove file \"%s\": %s",
958-
wal_fullpath, strerror(errno));
1061+
elog(ERROR, "[Thread: %d] Could not remove file \"%s\": %s",
1062+
args->thread_num, wal_fullpath, strerror(errno));
9591063
}
9601064
else
9611065
{
@@ -970,33 +1074,11 @@ delete_walfiles_in_tli(InstanceState *instanceState, XLogRecPtr keep_lsn, timeli
9701074
}
9711075

9721076
wal_deleted = true;
973-
974-
/* cleanup */
975-
pgXlogFileFree(wal_file);
976-
parray_remove(tlinfo->xlog_filelist, i);
977-
i--;
1077+
wal_file->deleted = true;
9781078
}
9791079
}
9801080

981-
/* Remove empty subdirectories */
982-
if (!instanceState->wal_archive_subdirs)
983-
return;
984-
985-
for (i = 0; i < parray_num(instanceState->wal_archive_subdirs); i++)
986-
{
987-
char fullpath[MAXPGPATH];
988-
pgFile *file = (pgFile *) parray_get(instanceState->wal_archive_subdirs, i);
989-
990-
join_path_components(fullpath, instanceState->instance_wal_subdir_path, file->name);
991-
992-
if (dir_is_empty(fullpath, FIO_LOCAL_HOST))
993-
{
994-
pgFileDelete(file->mode, fullpath);
995-
pgFileFree(file);
996-
parray_remove(instanceState->wal_archive_subdirs, i);
997-
i--;
998-
}
999-
}
1081+
return NULL;
10001082
}
10011083

10021084

0 commit comments

Comments
 (0)