Skip to content

Commit dbf52e0

Browse files
authored
bpo-31530: fix crash when multiple threads iterate over a file, round 2 (#5060)
Multiple threads iterating over a file can corrupt the file's internal readahead buffer, resulting in crashes. To fix this, cache the buffer state thread-locally for the duration of a file_iternext call and only update the file's internal state after reading completes. No attempt is made to define or provide "reasonable" semantics for iterating over a file on multiple threads. (Non-crashing) races are still present: duplicated, corrupt, and missing data will happen. This was originally fixed by 6401e56, which raised an exception from seek() and next() when concurrent operations were detected. Alas, that simpler solution breaks legitimate use cases such as capturing the standard streams when multiple threads are logging.
1 parent 0e0d101 commit dbf52e0

File tree

3 files changed

+78
-70
lines changed

3 files changed

+78
-70
lines changed

Lib/test/test_file2k.py

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -653,18 +653,15 @@ def io_func():
653653
self._test_close_open_io(io_func)
654654

655655
def test_iteration_torture(self):
656-
# bpo-31530: Crash when concurrently iterate over a file.
656+
# bpo-31530
657657
with open(self.filename, "wb") as fp:
658658
for i in xrange(2**20):
659659
fp.write(b"0"*50 + b"\n")
660660
with open(self.filename, "rb") as f:
661-
def iterate():
662-
try:
663-
for l in f:
664-
pass
665-
except IOError:
661+
def it():
662+
for l in f:
666663
pass
667-
self._run_workers(iterate, 10)
664+
self._run_workers(it, 10)
668665

669666
def test_iteration_seek(self):
670667
# bpo-31530: Crash when concurrently seek and iterate over a file.
@@ -674,17 +671,15 @@ def test_iteration_seek(self):
674671
with open(self.filename, "rb") as f:
675672
it = iter([1] + [0]*10) # one thread reads, others seek
676673
def iterate():
677-
try:
678-
if next(it):
679-
for l in f:
680-
pass
681-
else:
682-
for i in range(100):
683-
f.seek(i*100, 0)
684-
except IOError:
685-
pass
674+
if next(it):
675+
for l in f:
676+
pass
677+
else:
678+
for i in xrange(100):
679+
f.seek(i*100, 0)
686680
self._run_workers(iterate, 10)
687681

682+
688683
@unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
689684
class TestFileSignalEINTR(unittest.TestCase):
690685
def _test_reading(self, data_to_write, read_and_verify_code, method_name,
Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1 @@
11
Fixed crashes when iterating over a file on multiple threads.
2-
seek() and next() methods of file objects now raise an exception during
3-
concurrent operation on the same file object.
4-
A lock can be used to prevent the error.

Objects/fileobject.c

Lines changed: 67 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -609,7 +609,12 @@ err_iterbuffered(void)
609609
return NULL;
610610
}
611611

612-
static void drop_readahead(PyFileObject *);
612+
static void
613+
drop_file_readahead(PyFileObject *f)
614+
{
615+
PyMem_FREE(f->f_buf);
616+
f->f_buf = NULL;
617+
}
613618

614619
/* Methods */
615620

@@ -632,7 +637,7 @@ file_dealloc(PyFileObject *f)
632637
Py_XDECREF(f->f_mode);
633638
Py_XDECREF(f->f_encoding);
634639
Py_XDECREF(f->f_errors);
635-
drop_readahead(f);
640+
drop_file_readahead(f);
636641
Py_TYPE(f)->tp_free((PyObject *)f);
637642
}
638643

@@ -767,13 +772,7 @@ file_seek(PyFileObject *f, PyObject *args)
767772

768773
if (f->f_fp == NULL)
769774
return err_closed();
770-
if (f->unlocked_count > 0) {
771-
PyErr_SetString(PyExc_IOError,
772-
"seek() called during concurrent "
773-
"operation on the same file object");
774-
return NULL;
775-
}
776-
drop_readahead(f);
775+
drop_file_readahead(f);
777776
whence = 0;
778777
if (!PyArg_ParseTuple(args, "O|i:seek", &offobj, &whence))
779778
return NULL;
@@ -2242,49 +2241,51 @@ static PyGetSetDef file_getsetlist[] = {
22422241
{0},
22432242
};
22442243

2244+
typedef struct {
2245+
char *buf, *bufptr, *bufend;
2246+
} readaheadbuffer;
2247+
22452248
static void
2246-
drop_readahead(PyFileObject *f)
2249+
drop_readaheadbuffer(readaheadbuffer *rab)
22472250
{
2248-
if (f->f_buf != NULL) {
2249-
PyMem_Free(f->f_buf);
2250-
f->f_buf = NULL;
2251+
if (rab->buf != NULL) {
2252+
PyMem_FREE(rab->buf);
2253+
rab->buf = NULL;
22512254
}
22522255
}
22532256

22542257
/* Make sure that file has a readahead buffer with at least one byte
22552258
(unless at EOF) and no more than bufsize. Returns negative value on
22562259
error, will set MemoryError if bufsize bytes cannot be allocated. */
22572260
static int
2258-
readahead(PyFileObject *f, Py_ssize_t bufsize)
2261+
readahead(PyFileObject *f, readaheadbuffer *rab, Py_ssize_t bufsize)
22592262
{
22602263
Py_ssize_t chunksize;
22612264

2262-
assert(f->unlocked_count == 0);
2263-
if (f->f_buf != NULL) {
2264-
if( (f->f_bufend - f->f_bufptr) >= 1)
2265+
if (rab->buf != NULL) {
2266+
if ((rab->bufend - rab->bufptr) >= 1)
22652267
return 0;
22662268
else
2267-
drop_readahead(f);
2269+
drop_readaheadbuffer(rab);
22682270
}
2269-
if ((f->f_buf = (char *)PyMem_Malloc(bufsize)) == NULL) {
2271+
if ((rab->buf = PyMem_MALLOC(bufsize)) == NULL) {
22702272
PyErr_NoMemory();
22712273
return -1;
22722274
}
22732275
FILE_BEGIN_ALLOW_THREADS(f)
22742276
errno = 0;
2275-
chunksize = Py_UniversalNewlineFread(
2276-
f->f_buf, bufsize, f->f_fp, (PyObject *)f);
2277+
chunksize = Py_UniversalNewlineFread(rab->buf, bufsize, f->f_fp, (PyObject *)f);
22772278
FILE_END_ALLOW_THREADS(f)
22782279
if (chunksize == 0) {
22792280
if (ferror(f->f_fp)) {
22802281
PyErr_SetFromErrno(PyExc_IOError);
22812282
clearerr(f->f_fp);
2282-
drop_readahead(f);
2283+
drop_readaheadbuffer(rab);
22832284
return -1;
22842285
}
22852286
}
2286-
f->f_bufptr = f->f_buf;
2287-
f->f_bufend = f->f_buf + chunksize;
2287+
rab->bufptr = rab->buf;
2288+
rab->bufend = rab->buf + chunksize;
22882289
return 0;
22892290
}
22902291

@@ -2294,51 +2295,43 @@ readahead(PyFileObject *f, Py_ssize_t bufsize)
22942295
logarithmic buffer growth to about 50 even when reading a 1gb line. */
22952296

22962297
static PyStringObject *
2297-
readahead_get_line_skip(PyFileObject *f, Py_ssize_t skip, Py_ssize_t bufsize)
2298+
readahead_get_line_skip(PyFileObject *f, readaheadbuffer *rab, Py_ssize_t skip, Py_ssize_t bufsize)
22982299
{
22992300
PyStringObject* s;
23002301
char *bufptr;
23012302
char *buf;
23022303
Py_ssize_t len;
23032304

2304-
if (f->unlocked_count > 0) {
2305-
PyErr_SetString(PyExc_IOError,
2306-
"next() called during concurrent "
2307-
"operation on the same file object");
2308-
return NULL;
2309-
}
2310-
if (f->f_buf == NULL)
2311-
if (readahead(f, bufsize) < 0)
2305+
if (rab->buf == NULL)
2306+
if (readahead(f, rab, bufsize) < 0)
23122307
return NULL;
23132308

2314-
len = f->f_bufend - f->f_bufptr;
2309+
len = rab->bufend - rab->bufptr;
23152310
if (len == 0)
2316-
return (PyStringObject *)
2317-
PyString_FromStringAndSize(NULL, skip);
2318-
bufptr = (char *)memchr(f->f_bufptr, '\n', len);
2311+
return (PyStringObject *)PyString_FromStringAndSize(NULL, skip);
2312+
bufptr = (char *)memchr(rab->bufptr, '\n', len);
23192313
if (bufptr != NULL) {
23202314
bufptr++; /* Count the '\n' */
2321-
len = bufptr - f->f_bufptr;
2322-
s = (PyStringObject *)
2323-
PyString_FromStringAndSize(NULL, skip + len);
2315+
len = bufptr - rab->bufptr;
2316+
s = (PyStringObject *)PyString_FromStringAndSize(NULL, skip + len);
23242317
if (s == NULL)
23252318
return NULL;
2326-
memcpy(PyString_AS_STRING(s) + skip, f->f_bufptr, len);
2327-
f->f_bufptr = bufptr;
2328-
if (bufptr == f->f_bufend)
2329-
drop_readahead(f);
2319+
memcpy(PyString_AS_STRING(s) + skip, rab->bufptr, len);
2320+
rab->bufptr = bufptr;
2321+
if (bufptr == rab->bufend)
2322+
drop_readaheadbuffer(rab);
23302323
} else {
2331-
bufptr = f->f_bufptr;
2332-
buf = f->f_buf;
2333-
f->f_buf = NULL; /* Force new readahead buffer */
2324+
bufptr = rab->bufptr;
2325+
buf = rab->buf;
2326+
rab->buf = NULL; /* Force new readahead buffer */
23342327
assert(len <= PY_SSIZE_T_MAX - skip);
2335-
s = readahead_get_line_skip(f, skip + len, bufsize + (bufsize>>2));
2328+
s = readahead_get_line_skip(f, rab, skip + len, bufsize + (bufsize>>2));
23362329
if (s == NULL) {
2337-
PyMem_Free(buf);
2330+
PyMem_FREE(buf);
23382331
return NULL;
23392332
}
23402333
memcpy(PyString_AS_STRING(s) + skip, bufptr, len);
2341-
PyMem_Free(buf);
2334+
PyMem_FREE(buf);
23422335
}
23432336
return s;
23442337
}
@@ -2356,7 +2349,30 @@ file_iternext(PyFileObject *f)
23562349
if (!f->readable)
23572350
return err_mode("reading");
23582351

2359-
l = readahead_get_line_skip(f, 0, READAHEAD_BUFSIZE);
2352+
{
2353+
/*
2354+
Multiple threads can enter this method while the GIL is released
2355+
during file read and wreak havoc on the file object's readahead
2356+
buffer. To avoid dealing with cross-thread coordination issues, we
2357+
cache the file buffer state locally and only set it back on the file
2358+
object when we're done.
2359+
*/
2360+
readaheadbuffer rab = {f->f_buf, f->f_bufptr, f->f_bufend};
2361+
f->f_buf = NULL;
2362+
l = readahead_get_line_skip(f, &rab, 0, READAHEAD_BUFSIZE);
2363+
/*
2364+
Make sure the file's internal read buffer is cleared out. This will
2365+
only do anything if some other thread interleaved with us during
2366+
readahead. We want to drop any changeling buffer, so we don't leak
2367+
memory. We may lose data, but that's what you get for reading the same
2368+
file object in multiple threads.
2369+
*/
2370+
drop_file_readahead(f);
2371+
f->f_buf = rab.buf;
2372+
f->f_bufptr = rab.bufptr;
2373+
f->f_bufend = rab.bufend;
2374+
}
2375+
23602376
if (l == NULL || PyString_GET_SIZE(l) == 0) {
23612377
Py_XDECREF(l);
23622378
return NULL;

0 commit comments

Comments
 (0)