1
+ import contextlib
1
2
import faulthandler
2
3
import functools
3
4
import gc
4
5
import importlib
5
6
import io
6
7
import os
7
8
import sys
9
+ import tempfile
8
10
import time
9
11
import traceback
10
12
import unittest
@@ -173,6 +175,63 @@ def get_abs_module(ns: Namespace, test_name: str) -> str:
173
175
return 'test.' + test_name
174
176
175
177
178
+ @contextlib .contextmanager
179
+ def override_fd (fd , fd2 ):
180
+ fd2_copy = os .dup (fd2 )
181
+ try :
182
+ os .dup2 (fd , fd2 )
183
+ yield
184
+ finally :
185
+ os .dup2 (fd2_copy , fd2 )
186
+ os .close (fd2_copy )
187
+
188
+
189
+ def get_stream_fd (stream ):
190
+ if stream is None :
191
+ return None
192
+ try :
193
+ return stream .fileno ()
194
+ except io .UnsupportedOperation :
195
+ return None
196
+
197
+
198
+ @contextlib .contextmanager
199
+ def capture_std_streams ():
200
+ """
201
+ Redirect all standard streams to a temporary file:
202
+
203
+ * stdout and stderr file descriptors (fd 1 and fd 2)
204
+ * sys.stdout, sys.__stdout__
205
+ * sys.stderr, sys.__stderr__
206
+ """
207
+ try :
208
+ stderr_fd = sys .stderr .fileno ()
209
+ except io .UnsupportedOperation :
210
+ stderr_fd = None
211
+
212
+ # Use a temporary file to support fileno() operation
213
+ tmp_file = tempfile .TemporaryFile (mode = 'w+' ,
214
+ # line buffering
215
+ buffering = 1 ,
216
+ encoding = sys .stderr .encoding ,
217
+ errors = sys .stderr .errors )
218
+ with contextlib .ExitStack () as stack :
219
+ stack .enter_context (tmp_file )
220
+
221
+ # Override stdout and stderr file descriptors
222
+ tmp_fd = tmp_file .fileno ()
223
+ for stream in (sys .stdout , sys .stderr ):
224
+ fd = get_stream_fd (stream )
225
+ if fd is not None :
226
+ stack .enter_context (override_fd (tmp_fd , fd ))
227
+
228
+ # Override sys attributes
229
+ for name in ('stdout' , 'stderr' , '__stdout__' , '__stderr__' ):
230
+ stack .enter_context (support .swap_attr (sys , name , tmp_file ))
231
+
232
+ yield tmp_file
233
+
234
+
176
235
def _runtest (ns : Namespace , test_name : str ) -> TestResult :
177
236
# Handle faulthandler timeout, capture stdout+stderr, XML serialization
178
237
# and measure time.
@@ -193,21 +252,17 @@ def _runtest(ns: Namespace, test_name: str) -> TestResult:
193
252
if output_on_failure :
194
253
support .verbose = True
195
254
196
- stream = io .StringIO ()
197
- orig_stdout = sys .stdout
198
- orig_stderr = sys .stderr
199
- try :
200
- sys .stdout = stream
201
- sys .stderr = stream
255
+ output = None
256
+ with capture_std_streams () as stream :
202
257
result = _runtest_inner (ns , test_name ,
203
258
display_failure = False )
204
259
if not isinstance (result , Passed ):
205
- output = stream .getvalue ( )
206
- orig_stderr . write ( output )
207
- orig_stderr . flush ()
208
- finally :
209
- sys .stdout = orig_stdout
210
- sys .stderr = orig_stderr
260
+ stream .seek ( 0 )
261
+ output = stream . read ( )
262
+
263
+ if output is not None :
264
+ sys .stderr . write ( output )
265
+ sys .stderr . flush ()
211
266
else :
212
267
# Tell tests to be moderately quiet
213
268
support .verbose = ns .verbose
0 commit comments