
bpo-31530: fix crash when multiple threads iterate over a file, round 2 #5060

Merged
merged 4 commits on Jan 2, 2018
Changes from all commits
27 changes: 11 additions & 16 deletions Lib/test/test_file2k.py
@@ -653,18 +653,15 @@ def io_func():
         self._test_close_open_io(io_func)

     def test_iteration_torture(self):
-        # bpo-31530: Crash when concurrently iterate over a file.
+        # bpo-31530
         with open(self.filename, "wb") as fp:
             for i in xrange(2**20):
                 fp.write(b"0"*50 + b"\n")
         with open(self.filename, "rb") as f:
-            def iterate():
-                try:
-                    for l in f:
-                        pass
-                except IOError:
-                    pass
-            self._run_workers(iterate, 10)
+            def it():
+                for l in f:
+                    pass
+            self._run_workers(it, 10)

     def test_iteration_seek(self):
         # bpo-31530: Crash when concurrently seek and iterate over a file.
@@ -674,17 +671,15 @@ def test_iteration_seek(self):
         with open(self.filename, "rb") as f:
             it = iter([1] + [0]*10)  # one thread reads, others seek
             def iterate():
-                try:
-                    if next(it):
-                        for l in f:
-                            pass
-                    else:
-                        for i in range(100):
-                            f.seek(i*100, 0)
-                except IOError:
-                    pass
+                if next(it):
+                    for l in f:
+                        pass
+                else:
+                    for i in xrange(100):
+                        f.seek(i*100, 0)
             self._run_workers(iterate, 10)


 @unittest.skipUnless(os.name == 'posix', 'test requires a posix system.')
 class TestFileSignalEINTR(unittest.TestCase):
     def _test_reading(self, data_to_write, read_and_verify_code, method_name,
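
Both tests lean on the _run_workers() helper defined elsewhere in Lib/test/test_file2k.py (not shown in this diff). A rough, hypothetical equivalent, just to make the shape of the torture tests clear:

import threading

def _run_workers(func, nthreads):
    # Hypothetical sketch of the helper: start `nthreads` threads that all run
    # `func` concurrently, then wait for every one of them to finish.
    threads = [threading.Thread(target=func) for _ in range(nthreads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
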
@@ -1,4 +1 @@
-seek() and next() methods of file objects now raise an exception during
-concurrent operation on the same file object.
-A lock can be used to prevent the error.
+Fixed crashes when iterating over a file on multiple threads.
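
As a rough illustration of the behavior change this NEWS entry describes, here is a minimal sketch (assuming CPython 2.7 with this patch applied; the file name and thread count are arbitrary). Iterating over one file object from several threads no longer raises IOError or crashes; the threads simply race for the data, so each one sees only a subset of the lines.

import threading

def worker(f):
    # With this fix, concurrent iteration is merely racy: the lines are split
    # among the racing threads instead of triggering IOError or a crash.
    for line in f:
        pass

# "data.txt" is a placeholder for any reasonably large text file.
with open("data.txt", "rb") as f:
    threads = [threading.Thread(target=worker, args=(f,)) for _ in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
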
118 changes: 67 additions & 51 deletions Objects/fileobject.c
@@ -609,7 +609,12 @@ err_iterbuffered(void)
     return NULL;
 }

-static void drop_readahead(PyFileObject *);
+static void
+drop_file_readahead(PyFileObject *f)
+{
+    PyMem_FREE(f->f_buf);
+    f->f_buf = NULL;
+}

 /* Methods */

@@ -632,7 +637,7 @@ file_dealloc(PyFileObject *f)
     Py_XDECREF(f->f_mode);
     Py_XDECREF(f->f_encoding);
     Py_XDECREF(f->f_errors);
-    drop_readahead(f);
+    drop_file_readahead(f);
     Py_TYPE(f)->tp_free((PyObject *)f);
 }

@@ -767,13 +772,7 @@ file_seek(PyFileObject *f, PyObject *args)

     if (f->f_fp == NULL)
         return err_closed();
-    if (f->unlocked_count > 0) {
-        PyErr_SetString(PyExc_IOError,
-                        "seek() called during concurrent "
-                        "operation on the same file object");
-        return NULL;
-    }
-    drop_readahead(f);
+    drop_file_readahead(f);
     whence = 0;
     if (!PyArg_ParseTuple(args, "O|i:seek", &offobj, &whence))
         return NULL;
@@ -2242,49 +2241,51 @@ static PyGetSetDef file_getsetlist[] = {
     {0},
 };

+typedef struct {
+    char *buf, *bufptr, *bufend;
+} readaheadbuffer;
+
 static void
-drop_readahead(PyFileObject *f)
+drop_readaheadbuffer(readaheadbuffer *rab)
 {
-    if (f->f_buf != NULL) {
-        PyMem_Free(f->f_buf);
-        f->f_buf = NULL;
+    if (rab->buf != NULL) {
+        PyMem_FREE(rab->buf);
+        rab->buf = NULL;
     }
 }

 /* Make sure that file has a readahead buffer with at least one byte
    (unless at EOF) and no more than bufsize. Returns negative value on
    error, will set MemoryError if bufsize bytes cannot be allocated. */
 static int
-readahead(PyFileObject *f, Py_ssize_t bufsize)
+readahead(PyFileObject *f, readaheadbuffer *rab, Py_ssize_t bufsize)
 {
     Py_ssize_t chunksize;

-    assert(f->unlocked_count == 0);
-    if (f->f_buf != NULL) {
-        if( (f->f_bufend - f->f_bufptr) >= 1)
+    if (rab->buf != NULL) {
+        if ((rab->bufend - rab->bufptr) >= 1)
             return 0;
         else
-            drop_readahead(f);
+            drop_readaheadbuffer(rab);
     }
-    if ((f->f_buf = (char *)PyMem_Malloc(bufsize)) == NULL) {
+    if ((rab->buf = PyMem_MALLOC(bufsize)) == NULL) {
Member:
Is changing PyMem_Malloc to PyMem_MALLOC required?

Contributor Author:
No, but I prefer them over the functions.

         PyErr_NoMemory();
         return -1;
     }
     FILE_BEGIN_ALLOW_THREADS(f)
     errno = 0;
-    chunksize = Py_UniversalNewlineFread(
-        f->f_buf, bufsize, f->f_fp, (PyObject *)f);
+    chunksize = Py_UniversalNewlineFread(rab->buf, bufsize, f->f_fp, (PyObject *)f);
     FILE_END_ALLOW_THREADS(f)
     if (chunksize == 0) {
         if (ferror(f->f_fp)) {
             PyErr_SetFromErrno(PyExc_IOError);
             clearerr(f->f_fp);
-            drop_readahead(f);
+            drop_readaheadbuffer(rab);
             return -1;
         }
     }
-    f->f_bufptr = f->f_buf;
-    f->f_bufend = f->f_buf + chunksize;
+    rab->bufptr = rab->buf;
+    rab->bufend = rab->buf + chunksize;
     return 0;
 }
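
For context on the PyMem exchange in the review thread above: in CPython 2.x the upper-case spellings are the preprocessor-macro forms and the mixed-case spellings are the exported-function forms of the same raw allocator, so either works here. A minimal sketch (not part of the patch; the helper name is hypothetical):

#include "Python.h"

static int
pymem_spelling_demo(void)
{
    /* Macro form: expands inline to the underlying allocator call. */
    char *a = (char *)PyMem_MALLOC(64);
    if (a == NULL)
        return -1;
    PyMem_FREE(a);

    /* Function form: same semantics, through an exported function. */
    char *b = (char *)PyMem_Malloc(64);
    if (b == NULL)
        return -1;
    PyMem_Free(b);
    return 0;
}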

@@ -2294,51 +2295,43 @@ readahead(PyFileObject *f, Py_ssize_t bufsize)
    logarithmic buffer growth to about 50 even when reading a 1gb line. */

 static PyStringObject *
-readahead_get_line_skip(PyFileObject *f, Py_ssize_t skip, Py_ssize_t bufsize)
+readahead_get_line_skip(PyFileObject *f, readaheadbuffer *rab, Py_ssize_t skip, Py_ssize_t bufsize)
 {
     PyStringObject* s;
     char *bufptr;
     char *buf;
     Py_ssize_t len;

-    if (f->unlocked_count > 0) {
-        PyErr_SetString(PyExc_IOError,
-                        "next() called during concurrent "
-                        "operation on the same file object");
-        return NULL;
-    }
-    if (f->f_buf == NULL)
-        if (readahead(f, bufsize) < 0)
+    if (rab->buf == NULL)
+        if (readahead(f, rab, bufsize) < 0)
             return NULL;

-    len = f->f_bufend - f->f_bufptr;
+    len = rab->bufend - rab->bufptr;
     if (len == 0)
-        return (PyStringObject *)
-            PyString_FromStringAndSize(NULL, skip);
-    bufptr = (char *)memchr(f->f_bufptr, '\n', len);
+        return (PyStringObject *)PyString_FromStringAndSize(NULL, skip);
+    bufptr = (char *)memchr(rab->bufptr, '\n', len);
     if (bufptr != NULL) {
         bufptr++; /* Count the '\n' */
-        len = bufptr - f->f_bufptr;
-        s = (PyStringObject *)
-            PyString_FromStringAndSize(NULL, skip + len);
+        len = bufptr - rab->bufptr;
+        s = (PyStringObject *)PyString_FromStringAndSize(NULL, skip + len);
         if (s == NULL)
             return NULL;
-        memcpy(PyString_AS_STRING(s) + skip, f->f_bufptr, len);
-        f->f_bufptr = bufptr;
-        if (bufptr == f->f_bufend)
-            drop_readahead(f);
+        memcpy(PyString_AS_STRING(s) + skip, rab->bufptr, len);
+        rab->bufptr = bufptr;
+        if (bufptr == rab->bufend)
+            drop_readaheadbuffer(rab);
     } else {
-        bufptr = f->f_bufptr;
-        buf = f->f_buf;
-        f->f_buf = NULL; /* Force new readahead buffer */
+        bufptr = rab->bufptr;
+        buf = rab->buf;
+        rab->buf = NULL; /* Force new readahead buffer */
         assert(len <= PY_SSIZE_T_MAX - skip);
-        s = readahead_get_line_skip(f, skip + len, bufsize + (bufsize>>2));
+        s = readahead_get_line_skip(f, rab, skip + len, bufsize + (bufsize>>2));
         if (s == NULL) {
-            PyMem_Free(buf);
+            PyMem_FREE(buf);
             return NULL;
         }
         memcpy(PyString_AS_STRING(s) + skip, bufptr, len);
-        PyMem_Free(buf);
+        PyMem_FREE(buf);
     }
     return s;
 }
@@ -2356,7 +2349,30 @@ file_iternext(PyFileObject *f)
     if (!f->readable)
         return err_mode("reading");

-    l = readahead_get_line_skip(f, 0, READAHEAD_BUFSIZE);
+    {
+        /*
+          Multiple threads can enter this method while the GIL is released
+          during file read and wreak havoc on the file object's readahead
+          buffer. To avoid dealing with cross-thread coordination issues, we
+          cache the file buffer state locally and only set it back on the file
+          object when we're done.
+        */
+        readaheadbuffer rab = {f->f_buf, f->f_bufptr, f->f_bufend};
Member:
drop_readaheadbuffer() frees rab->buf which is f->f_buf. If two threads call drop_readaheadbuffer() simultaneously f->f_buf will be freed twice.

Contributor Author:
Other threads can't access f->f_buf because it's set to NULL for the duration of the method.

+        f->f_buf = NULL;
+        l = readahead_get_line_skip(f, &rab, 0, READAHEAD_BUFSIZE);
+        /*
+          Make sure the file's internal read buffer is cleared out. This will
+          only do anything if some other thread interleaved with us during
+          readahead. We want to drop any changeling buffer, so we don't leak
+          memory. We may lose data, but that's what you get for reading the same
+          file object in multiple threads.
+        */
+        drop_file_readahead(f);
+        f->f_buf = rab.buf;
+        f->f_bufptr = rab.bufptr;
+        f->f_bufend = rab.bufend;
+    }

     if (l == NULL || PyString_GET_SIZE(l) == 0) {
         Py_XDECREF(l);
         return NULL;
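
As a closing note on the double-free question in the review thread above: the pattern file_iternext() now uses can be modeled outside CPython. The following standalone C sketch is only an analogy (a pthread mutex stands in for the GIL, malloc/free for PyMem_MALLOC/PyMem_FREE, and all names are hypothetical). Each thread steals the shared buffer pointer while holding the lock, works on a private copy, and on the way out frees only whatever is currently published before publishing its own, so no pointer is ever freed twice.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

static pthread_mutex_t gil = PTHREAD_MUTEX_INITIALIZER;  /* stands in for the GIL */
static char *shared_buf = NULL;                           /* stands in for f->f_buf */

static void *
worker(void *arg)
{
    char *local;
    (void)arg;

    pthread_mutex_lock(&gil);
    local = shared_buf;        /* cache the readahead state locally */
    shared_buf = NULL;         /* other threads now see NULL and build their own buffer */
    pthread_mutex_unlock(&gil);

    if (local == NULL)
        local = malloc(64);    /* "readahead": allocate a private buffer */
    /* ... read into `local` with the lock released ... */

    pthread_mutex_lock(&gil);
    free(shared_buf);          /* drop any buffer another thread left behind */
    shared_buf = local;        /* publish our buffer back on the "file object" */
    pthread_mutex_unlock(&gil);
    return NULL;
}

int
main(void)
{
    pthread_t threads[4];
    int i;

    for (i = 0; i < 4; i++)
        pthread_create(&threads[i], NULL, worker, NULL);
    for (i = 0; i < 4; i++)
        pthread_join(threads[i], NULL);
    free(shared_buf);
    printf("done\n");
    return 0;
}

In the patch itself the same role is played by drop_file_readahead(f) followed by restoring f->f_buf, f->f_bufptr, and f->f_bufend from the local readaheadbuffer.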