12
12
from abc import abstractmethod
13
13
from dataclasses import dataclass
14
14
from pathlib import Path
15
- from typing import Any , Iterator , NamedTuple , Pattern , Union
15
+ from typing import Any , Iterator , NamedTuple , NoReturn , Pattern , Union
16
16
from typing_extensions import Final , TypeAlias as _TypeAlias
17
17
18
18
import pytest
@@ -77,11 +77,19 @@ def parse_test_case(case: DataDrivenTestCase) -> None:
77
77
targets : dict [int , list [str ]] = {} # Fine-grained targets (per fine-grained update)
78
78
test_modules : list [str ] = [] # Modules which are deemed "test" (vs "fixture")
79
79
80
+ def _case_fail (msg : str ) -> NoReturn :
81
+ pytest .fail (f"{ case .file } :{ case .line } : { msg } " , pytrace = False )
82
+
80
83
# Process the parsed items. Each item has a header of form [id args],
81
84
# optionally followed by lines of text.
82
85
item = first_item = test_items [0 ]
83
86
test_modules .append ("__main__" )
84
87
for item in test_items [1 :]:
88
+
89
+ def _item_fail (msg : str ) -> NoReturn :
90
+ item_abs_line = case .line + item .line - 2
91
+ pytest .fail (f"{ case .file } :{ item_abs_line } : { msg } " , pytrace = False )
92
+
85
93
if item .id in {"file" , "fixture" , "outfile" , "outfile-re" }:
86
94
# Record an extra file needed for the test case.
87
95
assert item .arg is not None
@@ -132,9 +140,11 @@ def parse_test_case(case: DataDrivenTestCase) -> None:
132
140
# File/directory to delete during a multi-step test case
133
141
assert item .arg is not None
134
142
m = re .match (r"(.*)\.([0-9]+)$" , item .arg )
135
- assert m , f"Invalid delete section: { item .arg } "
143
+ if m is None :
144
+ _item_fail (f"Invalid delete section { item .arg !r} " )
136
145
num = int (m .group (2 ))
137
- assert num >= 2 , f"Can't delete during step { num } "
146
+ if num < 2 :
147
+ _item_fail (f"Can't delete during step { num } " )
138
148
full = join (base_path , m .group (1 ))
139
149
deleted_paths .setdefault (num , set ()).add (full )
140
150
elif re .match (r"out[0-9]*$" , item .id ):
@@ -150,29 +160,18 @@ def parse_test_case(case: DataDrivenTestCase) -> None:
150
160
if arg .startswith ("version" ):
151
161
compare_op = arg [7 :9 ]
152
162
if compare_op not in {">=" , "==" }:
153
- raise ValueError (
154
- "{}, line {}: Only >= and == version checks are currently supported" .format (
155
- case .file , item .line
156
- )
157
- )
163
+ _item_fail ("Only >= and == version checks are currently supported" )
158
164
version_str = arg [9 :]
159
165
try :
160
166
version = tuple (int (x ) for x in version_str .split ("." ))
161
167
except ValueError :
162
- raise ValueError (
163
- '{}, line {}: "{}" is not a valid python version' .format (
164
- case .file , item .line , version_str
165
- )
166
- )
168
+ _item_fail (f"{ version_str !r} is not a valid python version" )
167
169
if compare_op == ">=" :
168
170
version_check = sys .version_info >= version
169
171
elif compare_op == "==" :
170
172
if not 1 < len (version ) < 4 :
171
- raise ValueError (
172
- "{}, line {}: Only minor or patch version checks "
173
- 'are currently supported with "==": "{}"' .format (
174
- case .file , item .line , version_str
175
- )
173
+ _item_fail (
174
+ f'Only minor or patch version checks are currently supported with "==": { version_str !r} '
176
175
)
177
176
version_check = sys .version_info [: len (version )] == version
178
177
if version_check :
@@ -189,10 +188,11 @@ def parse_test_case(case: DataDrivenTestCase) -> None:
189
188
elif item .id == "triggered" and item .arg is None :
190
189
triggered = item .data
191
190
else :
192
- raise ValueError (f"Invalid section header { item .id } in { case .file } :{ item .line } " )
191
+ section_str = item .id + (f" { item .arg } " if item .arg else "" )
192
+ _item_fail (f"Invalid section header [{ section_str } ] in case { case .name !r} " )
193
193
194
194
if out_section_missing :
195
- raise ValueError (f"{ case . file } , line { first_item . line } : Required output section not found" )
195
+ _case_fail (f"Required output section not found in case { case . name !r } " )
196
196
197
197
for passnum in stale_modules .keys ():
198
198
if passnum not in rechecked_modules :
@@ -204,11 +204,7 @@ def parse_test_case(case: DataDrivenTestCase) -> None:
204
204
and passnum in rechecked_modules
205
205
and not stale_modules [passnum ].issubset (rechecked_modules [passnum ])
206
206
):
207
- raise ValueError (
208
- (
209
- "Stale modules after pass {} must be a subset of rechecked modules ({}:{})"
210
- ).format (passnum , case .file , first_item .line )
211
- )
207
+ _case_fail (f"Stale modules after pass { passnum } must be a subset of rechecked modules" )
212
208
213
209
output_inline_start = len (output )
214
210
input = first_item .data
@@ -219,10 +215,7 @@ def parse_test_case(case: DataDrivenTestCase) -> None:
219
215
seen_files = set ()
220
216
for file , _ in files :
221
217
if file in seen_files :
222
- raise ValueError (
223
- f"{ case .file } , line { first_item .line } : Duplicated filename { file } . Did you include"
224
- " it multiple times?"
225
- )
218
+ _case_fail (f"Duplicated filename { file } . Did you include it multiple times?" )
226
219
227
220
seen_files .add (file )
228
221
@@ -367,12 +360,13 @@ def setup(self) -> None:
367
360
self .steps = [steps .get (num , []) for num in range (2 , max_step + 1 )]
368
361
369
362
def teardown (self ) -> None :
370
- assert self .old_cwd is not None and self .tmpdir is not None , "test was not properly set up"
371
- os .chdir (self .old_cwd )
372
- try :
373
- self .tmpdir .cleanup ()
374
- except OSError :
375
- pass
363
+ if self .old_cwd is not None :
364
+ os .chdir (self .old_cwd )
365
+ if self .tmpdir is not None :
366
+ try :
367
+ self .tmpdir .cleanup ()
368
+ except OSError :
369
+ pass
376
370
self .old_cwd = None
377
371
self .tmpdir = None
378
372
@@ -634,6 +628,16 @@ def pytest_pycollect_makeitem(collector: Any, name: str, obj: object) -> Any | N
634
628
return None
635
629
636
630
631
+ _case_name_pattern = re .compile (
632
+ r"(?P<name>[a-zA-Z_0-9]+)"
633
+ r"(?P<writescache>-writescache)?"
634
+ r"(?P<only_when>-only_when_cache|-only_when_nocache)?"
635
+ r"(-(?P<platform>posix|windows))?"
636
+ r"(?P<skip>-skip)?"
637
+ r"(?P<xfail>-xfail)?"
638
+ )
639
+
640
+
637
641
def split_test_cases (
638
642
parent : DataFileCollector , suite : DataSuite , file : str
639
643
) -> Iterator [DataDrivenTestCase ]:
@@ -644,40 +648,33 @@ def split_test_cases(
644
648
"""
645
649
with open (file , encoding = "utf-8" ) as f :
646
650
data = f .read ()
647
- # number of groups in the below regex
648
- NUM_GROUPS = 7
649
- cases = re .split (
650
- r"^\[case ([a-zA-Z_0-9]+)"
651
- r"(-writescache)?"
652
- r"(-only_when_cache|-only_when_nocache)?"
653
- r"(-posix|-windows)?"
654
- r"(-skip)?"
655
- r"(-xfail)?"
656
- r"\][ \t]*$\n" ,
657
- data ,
658
- flags = re .DOTALL | re .MULTILINE ,
659
- )
660
- line_no = cases [0 ].count ("\n " ) + 1
651
+ cases = re .split (r"^\[case ([^]+)]+)\][ \t]*$\n" , data , flags = re .DOTALL | re .MULTILINE )
652
+ cases_iter = iter (cases )
653
+ line_no = next (cases_iter ).count ("\n " ) + 1
661
654
test_names = set ()
662
- for i in range (1 , len (cases ), NUM_GROUPS ):
663
- name , writescache , only_when , platform_flag , skip , xfail , data = cases [i : i + NUM_GROUPS ]
655
+ for case_id in cases_iter :
656
+ data = next (cases_iter )
657
+
658
+ m = _case_name_pattern .fullmatch (case_id )
659
+ if not m :
660
+ raise RuntimeError (f"Invalid testcase id { case_id !r} " )
661
+ name = m .group ("name" )
664
662
if name in test_names :
665
663
raise RuntimeError (
666
664
'Found a duplicate test name "{}" in {} on line {}' .format (
667
665
name , parent .name , line_no
668
666
)
669
667
)
670
- platform = platform_flag [1 :] if platform_flag else None
671
668
yield DataDrivenTestCase .from_parent (
672
669
parent = parent ,
673
670
suite = suite ,
674
671
file = file ,
675
672
name = add_test_name_suffix (name , suite .test_name_suffix ),
676
- writescache = bool (writescache ),
677
- only_when = only_when ,
678
- platform = platform ,
679
- skip = bool (skip ),
680
- xfail = bool (xfail ),
673
+ writescache = bool (m . group ( " writescache" ) ),
674
+ only_when = m . group ( " only_when" ) ,
675
+ platform = m . group ( " platform" ) ,
676
+ skip = bool (m . group ( " skip" ) ),
677
+ xfail = bool (m . group ( " xfail" ) ),
681
678
data = data ,
682
679
line = line_no ,
683
680
)
0 commit comments