Skip to content

bpo-31530: stop crashes when iterating over a file on multiple threads #3670

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Lib/test/test_file2k.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,16 @@ def io_func():
self.f.writelines('')
self._test_close_open_io(io_func)

def test_iteration_torture(self):
with open(self.filename, "wb") as fp:
for i in xrange(2**20):
fp.write(b"0"*50 + b"\n")
with open(self.filename, "rb") as f:
def it():
for l in f:
pass
self._run_workers(it, 10)


@unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
class TestFileSignalEINTR(unittest.TestCase):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Prevent crashes when multiple threads iterate of a file at once.
105 changes: 67 additions & 38 deletions Objects/fileobject.c
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,12 @@ err_iterbuffered(void)
return NULL;
}

static void drop_readahead(PyFileObject *);
static void
drop_file_readahead(PyFileObject *f)
{
PyMem_FREE(f->f_buf);
f->f_buf = NULL;
}

/* Methods */

Expand All @@ -627,7 +632,7 @@ file_dealloc(PyFileObject *f)
Py_XDECREF(f->f_mode);
Py_XDECREF(f->f_encoding);
Py_XDECREF(f->f_errors);
drop_readahead(f);
drop_file_readahead(f);
Py_TYPE(f)->tp_free((PyObject *)f);
}

Expand Down Expand Up @@ -762,7 +767,7 @@ file_seek(PyFileObject *f, PyObject *args)

if (f->f_fp == NULL)
return err_closed();
drop_readahead(f);
drop_file_readahead(f);
whence = 0;
if (!PyArg_ParseTuple(args, "O|i:seek", &offobj, &whence))
return NULL;
Expand Down Expand Up @@ -2221,48 +2226,51 @@ static PyGetSetDef file_getsetlist[] = {
{0},
};

typedef struct {
char *buf, *bufptr, *bufend;
} readaheadbuffer;

static void
drop_readahead(PyFileObject *f)
drop_readaheadbuffer(readaheadbuffer *rab)
{
if (f->f_buf != NULL) {
PyMem_Free(f->f_buf);
f->f_buf = NULL;
if (rab->buf != NULL) {
PyMem_FREE(rab->buf);
rab->buf = NULL;
}
}

/* Make sure that file has a readahead buffer with at least one byte
(unless at EOF) and no more than bufsize. Returns negative value on
error, will set MemoryError if bufsize bytes cannot be allocated. */
static int
readahead(PyFileObject *f, Py_ssize_t bufsize)
readahead(PyFileObject *f, readaheadbuffer *rab, Py_ssize_t bufsize)
{
Py_ssize_t chunksize;

if (f->f_buf != NULL) {
if( (f->f_bufend - f->f_bufptr) >= 1)
if (rab->buf != NULL) {
if ((rab->bufend - rab->bufptr) >= 1)
return 0;
else
drop_readahead(f);
drop_readaheadbuffer(rab);
}
if ((f->f_buf = (char *)PyMem_Malloc(bufsize)) == NULL) {
if ((rab->buf = PyMem_MALLOC(bufsize)) == NULL) {
PyErr_NoMemory();
return -1;
}
FILE_BEGIN_ALLOW_THREADS(f)
errno = 0;
chunksize = Py_UniversalNewlineFread(
f->f_buf, bufsize, f->f_fp, (PyObject *)f);
chunksize = Py_UniversalNewlineFread(rab->buf, bufsize, f->f_fp, (PyObject *)f);
FILE_END_ALLOW_THREADS(f)
if (chunksize == 0) {
if (ferror(f->f_fp)) {
PyErr_SetFromErrno(PyExc_IOError);
clearerr(f->f_fp);
drop_readahead(f);
drop_readaheadbuffer(rab);
return -1;
}
}
f->f_bufptr = f->f_buf;
f->f_bufend = f->f_buf + chunksize;
rab->bufptr = rab->buf;
rab->bufend = rab->buf + chunksize;
return 0;
}

Expand All @@ -2272,45 +2280,43 @@ readahead(PyFileObject *f, Py_ssize_t bufsize)
logarithmic buffer growth to about 50 even when reading a 1gb line. */

static PyStringObject *
readahead_get_line_skip(PyFileObject *f, Py_ssize_t skip, Py_ssize_t bufsize)
readahead_get_line_skip(PyFileObject *f, readaheadbuffer *rab, Py_ssize_t skip, Py_ssize_t bufsize)
{
PyStringObject* s;
char *bufptr;
char *buf;
Py_ssize_t len;

if (f->f_buf == NULL)
if (readahead(f, bufsize) < 0)
if (rab->buf == NULL)
if (readahead(f, rab, bufsize) < 0)
return NULL;

len = f->f_bufend - f->f_bufptr;
len = rab->bufend - rab->bufptr;
if (len == 0)
return (PyStringObject *)
PyString_FromStringAndSize(NULL, skip);
bufptr = (char *)memchr(f->f_bufptr, '\n', len);
return (PyStringObject *)PyString_FromStringAndSize(NULL, skip);
bufptr = (char *)memchr(rab->bufptr, '\n', len);
if (bufptr != NULL) {
bufptr++; /* Count the '\n' */
len = bufptr - f->f_bufptr;
s = (PyStringObject *)
PyString_FromStringAndSize(NULL, skip + len);
len = bufptr - rab->bufptr;
s = (PyStringObject *)PyString_FromStringAndSize(NULL, skip + len);
if (s == NULL)
return NULL;
memcpy(PyString_AS_STRING(s) + skip, f->f_bufptr, len);
f->f_bufptr = bufptr;
if (bufptr == f->f_bufend)
drop_readahead(f);
memcpy(PyString_AS_STRING(s) + skip, rab->bufptr, len);
rab->bufptr = bufptr;
if (bufptr == rab->bufend)
drop_readaheadbuffer(rab);
} else {
bufptr = f->f_bufptr;
buf = f->f_buf;
f->f_buf = NULL; /* Force new readahead buffer */
bufptr = rab->bufptr;
buf = rab->buf;
rab->buf = NULL; /* Force new readahead buffer */
assert(len <= PY_SSIZE_T_MAX - skip);
s = readahead_get_line_skip(f, skip + len, bufsize + (bufsize>>2));
s = readahead_get_line_skip(f, rab, skip + len, bufsize + (bufsize>>2));
if (s == NULL) {
PyMem_Free(buf);
PyMem_FREE(buf);
return NULL;
}
memcpy(PyString_AS_STRING(s) + skip, bufptr, len);
PyMem_Free(buf);
PyMem_FREE(buf);
}
return s;
}
Expand All @@ -2328,7 +2334,30 @@ file_iternext(PyFileObject *f)
if (!f->readable)
return err_mode("reading");

l = readahead_get_line_skip(f, 0, READAHEAD_BUFSIZE);
{
/*
Multiple threads can enter this method while the GIL is released
during file read and wreak havoc on the file object's readahead
buffer. To avoid dealing with cross-thread coordination issues, we
cache the file buffer state locally and only set it back on the file
object when we're done.
*/
readaheadbuffer rab = {f->f_buf, f->f_bufptr, f->f_bufend};
f->f_buf = NULL;
l = readahead_get_line_skip(f, &rab, 0, READAHEAD_BUFSIZE);
/*
Make sure the file's internal read buffer is cleared out. This will
only do anything if some other thread interleaved with us during
readahead. We want to drop any changeling buffer, so we don't leak
memory. We may lose data, but that's what you get for reading the same
file object in multiple threads.
*/
drop_file_readahead(f);
f->f_buf = rab.buf;
f->f_bufptr = rab.bufptr;
f->f_bufend = rab.bufend;
}

if (l == NULL || PyString_GET_SIZE(l) == 0) {
Py_XDECREF(l);
return NULL;
Expand Down