|
21 | 21 | from test.support import os_helper
|
22 | 22 | from test.support import (
|
23 | 23 | STDLIB_DIR, is_jython, swap_attr, swap_item, cpython_only, is_emscripten,
|
24 |
| - is_wasi) |
| 24 | + is_wasi, run_in_subinterp_with_config) |
25 | 25 | from test.support.import_helper import (
|
26 | 26 | forget, make_legacy_pyc, unlink, unload, DirsOnSysPath, CleanImport)
|
27 | 27 | from test.support.os_helper import (
|
|
30 | 30 | from test.support import threading_helper
|
31 | 31 | from test.test_importlib.util import uncache
|
32 | 32 | from types import ModuleType
|
| 33 | +try: |
| 34 | + import _testsinglephase |
| 35 | +except ImportError: |
| 36 | + _testsinglephase = None |
| 37 | +try: |
| 38 | + import _testmultiphase |
| 39 | +except ImportError: |
| 40 | + _testmultiphase = None |
33 | 41 |
|
34 | 42 |
|
35 | 43 | skip_if_dont_write_bytecode = unittest.skipIf(
|
@@ -1395,6 +1403,134 @@ def test_unwritable_module(self):
|
1395 | 1403 | unwritable.x = 42
|
1396 | 1404 |
|
1397 | 1405 |
|
| 1406 | +class SubinterpImportTests(unittest.TestCase): |
| 1407 | + |
| 1408 | + RUN_KWARGS = dict( |
| 1409 | + allow_fork=False, |
| 1410 | + allow_exec=False, |
| 1411 | + allow_threads=True, |
| 1412 | + allow_daemon_threads=False, |
| 1413 | + ) |
| 1414 | + |
| 1415 | + @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") |
| 1416 | + def pipe(self): |
| 1417 | + r, w = os.pipe() |
| 1418 | + self.addCleanup(os.close, r) |
| 1419 | + self.addCleanup(os.close, w) |
| 1420 | + if hasattr(os, 'set_blocking'): |
| 1421 | + os.set_blocking(r, False) |
| 1422 | + return (r, w) |
| 1423 | + |
| 1424 | + def import_script(self, name, fd): |
| 1425 | + return textwrap.dedent(f''' |
| 1426 | + import os, sys |
| 1427 | + try: |
| 1428 | + import {name} |
| 1429 | + except ImportError as exc: |
| 1430 | + text = 'ImportError: ' + str(exc) |
| 1431 | + else: |
| 1432 | + text = 'okay' |
| 1433 | + os.write({fd}, text.encode('utf-8')) |
| 1434 | + ''') |
| 1435 | + |
| 1436 | + def check_compatible_shared(self, name): |
| 1437 | + # Verify that the named module may be imported in a subinterpreter. |
| 1438 | + # |
| 1439 | + # The subinterpreter will be in the current process. |
| 1440 | + # The module will have already been imported in the main interpreter. |
| 1441 | + # Thus, for extension/builtin modules, the module definition will |
| 1442 | + # have been loaded already and cached globally. |
| 1443 | + |
| 1444 | + # This check should always pass for all modules if not strict. |
| 1445 | + |
| 1446 | + __import__(name) |
| 1447 | + |
| 1448 | + r, w = self.pipe() |
| 1449 | + ret = run_in_subinterp_with_config( |
| 1450 | + self.import_script(name, w), |
| 1451 | + **self.RUN_KWARGS, |
| 1452 | + ) |
| 1453 | + self.assertEqual(ret, 0) |
| 1454 | + out = os.read(r, 100) |
| 1455 | + self.assertEqual(out, b'okay') |
| 1456 | + |
| 1457 | + def check_compatible_isolated(self, name): |
| 1458 | + # Differences from check_compatible_shared(): |
| 1459 | + # * subinterpreter in a new process |
| 1460 | + # * module has never been imported before in that process |
| 1461 | + # * this tests importing the module for the first time |
| 1462 | + _, out, err = script_helper.assert_python_ok('-c', textwrap.dedent(f''' |
| 1463 | + import _testcapi, sys |
| 1464 | + assert ( |
| 1465 | + {name!r} in sys.builtin_module_names or |
| 1466 | + {name!r} not in sys.modules |
| 1467 | + ), repr({name!r}) |
| 1468 | + ret = _testcapi.run_in_subinterp_with_config( |
| 1469 | + {self.import_script(name, "sys.stdout.fileno()")!r}, |
| 1470 | + **{self.RUN_KWARGS}, |
| 1471 | + ) |
| 1472 | + assert ret == 0, ret |
| 1473 | + ''')) |
| 1474 | + self.assertEqual(err, b'') |
| 1475 | + self.assertEqual(out, b'okay') |
| 1476 | + |
| 1477 | + def check_incompatible_isolated(self, name): |
| 1478 | + # Differences from check_compatible_isolated(): |
| 1479 | + # * verify that import fails |
| 1480 | + _, out, err = script_helper.assert_python_ok('-c', textwrap.dedent(f''' |
| 1481 | + import _testcapi, sys |
| 1482 | + assert {name!r} not in sys.modules, {name!r} |
| 1483 | + ret = _testcapi.run_in_subinterp_with_config( |
| 1484 | + {self.import_script(name, "sys.stdout.fileno()")!r}, |
| 1485 | + **{self.RUN_KWARGS}, |
| 1486 | + ) |
| 1487 | + assert ret == 0, ret |
| 1488 | + ''')) |
| 1489 | + self.assertEqual(err, b'') |
| 1490 | + self.assertEqual( |
| 1491 | + out.decode('utf-8'), |
| 1492 | + f'ImportError: module {name} does not support loading in subinterpreters', |
| 1493 | + ) |
| 1494 | + |
| 1495 | + def test_builtin_compat(self): |
| 1496 | + module = 'sys' |
| 1497 | + with self.subTest(f'{module}: shared'): |
| 1498 | + self.check_compatible_shared(module) |
| 1499 | + |
| 1500 | + @cpython_only |
| 1501 | + def test_frozen_compat(self): |
| 1502 | + module = '_frozen_importlib' |
| 1503 | + if __import__(module).__spec__.origin != 'frozen': |
| 1504 | + raise unittest.SkipTest(f'{module} is unexpectedly not frozen') |
| 1505 | + with self.subTest(f'{module}: shared'): |
| 1506 | + self.check_compatible_shared(module) |
| 1507 | + |
| 1508 | + @unittest.skipIf(_testsinglephase is None, "test requires _testsinglephase module") |
| 1509 | + def test_single_init_extension_compat(self): |
| 1510 | + module = '_testsinglephase' |
| 1511 | + with self.subTest(f'{module}: shared'): |
| 1512 | + self.check_compatible_shared(module) |
| 1513 | + with self.subTest(f'{module}: isolated'): |
| 1514 | + self.check_compatible_isolated(module) |
| 1515 | + |
| 1516 | + @unittest.skipIf(_testmultiphase is None, "test requires _testmultiphase module") |
| 1517 | + def test_multi_init_extension_compat(self): |
| 1518 | + module = '_testmultiphase' |
| 1519 | + with self.subTest(f'{module}: shared'): |
| 1520 | + self.check_compatible_shared(module) |
| 1521 | + with self.subTest(f'{module}: isolated'): |
| 1522 | + self.check_compatible_isolated(module) |
| 1523 | + |
| 1524 | + def test_python_compat(self): |
| 1525 | + module = 'threading' |
| 1526 | + if __import__(module).__spec__.origin == 'frozen': |
| 1527 | + raise unittest.SkipTest(f'{module} is unexpectedly frozen') |
| 1528 | + with self.subTest(f'{module}: shared'): |
| 1529 | + self.check_compatible_shared(module) |
| 1530 | + with self.subTest(f'{module}: isolated'): |
| 1531 | + self.check_compatible_isolated(module) |
| 1532 | + |
| 1533 | + |
1398 | 1534 | if __name__ == '__main__':
|
1399 | 1535 | # Test needs to be a package, so we can do relative imports.
|
1400 | 1536 | unittest.main()
|
0 commit comments