@@ -1202,9 +1202,42 @@ def _set_encoding(self) -> None:
1202
1202
else :
1203
1203
self ._encoding = "utf-8"
1204
1204
1205
def _read_int8(self) -> int:
    """
    Read one byte from ``self.path_or_buf`` and decode it as a signed
    8-bit integer.

    Byte order is irrelevant for a single byte, so ``self.byteorder`` is
    not consulted.
    """
    raw = self.path_or_buf.read(1)
    (value,) = struct.unpack("b", raw)
    return value
1207
+
1208
def _read_uint8(self) -> int:
    """
    Read one byte from ``self.path_or_buf`` and decode it as an unsigned
    8-bit integer.
    """
    raw = self.path_or_buf.read(1)
    (value,) = struct.unpack("B", raw)
    return value
1210
+
1211
def _read_uint16(self) -> int:
    """
    Read two bytes from ``self.path_or_buf`` and decode them as an
    unsigned 16-bit integer using ``self.byteorder`` as the struct
    byte-order prefix.
    """
    fmt = f"{self.byteorder}H"
    raw = self.path_or_buf.read(2)
    (value,) = struct.unpack(fmt, raw)
    return value
1213
+
1214
def _read_uint32(self) -> int:
    """
    Read four bytes from ``self.path_or_buf`` and decode them as an
    unsigned 32-bit integer using ``self.byteorder`` as the struct
    byte-order prefix.
    """
    fmt = f"{self.byteorder}I"
    raw = self.path_or_buf.read(4)
    (value,) = struct.unpack(fmt, raw)
    return value
1216
+
1217
def _read_uint64(self) -> int:
    """
    Read eight bytes from ``self.path_or_buf`` and decode them as an
    unsigned 64-bit integer using ``self.byteorder`` as the struct
    byte-order prefix.
    """
    fmt = f"{self.byteorder}Q"
    raw = self.path_or_buf.read(8)
    (value,) = struct.unpack(fmt, raw)
    return value
1219
+
1220
def _read_int16(self) -> int:
    """
    Read two bytes from ``self.path_or_buf`` and decode them as a signed
    16-bit integer using ``self.byteorder`` as the struct byte-order
    prefix.
    """
    fmt = f"{self.byteorder}h"
    raw = self.path_or_buf.read(2)
    (value,) = struct.unpack(fmt, raw)
    return value
1222
+
1223
def _read_int32(self) -> int:
    """
    Read four bytes from ``self.path_or_buf`` and decode them as a signed
    32-bit integer using ``self.byteorder`` as the struct byte-order
    prefix.
    """
    fmt = f"{self.byteorder}i"
    raw = self.path_or_buf.read(4)
    (value,) = struct.unpack(fmt, raw)
    return value
1225
+
1226
def _read_int64(self) -> int:
    """
    Read eight bytes from ``self.path_or_buf`` and decode them as a
    signed 64-bit integer using ``self.byteorder`` as the struct
    byte-order prefix.
    """
    fmt = f"{self.byteorder}q"
    raw = self.path_or_buf.read(8)
    (value,) = struct.unpack(fmt, raw)
    return value
1228
+
1229
def _read_char8(self) -> bytes:
    """
    Read one byte from ``self.path_or_buf`` and return it as a length-1
    ``bytes`` object (struct format ``"c"``).
    """
    raw = self.path_or_buf.read(1)
    (char,) = struct.unpack("c", raw)
    return char
1231
+
1232
def _read_int16_count(self, count: int) -> tuple[int, ...]:
    """
    Read ``count`` consecutive signed 16-bit integers from
    ``self.path_or_buf``.

    Parameters
    ----------
    count : int
        Number of 16-bit values to read; ``2 * count`` bytes are consumed.

    Returns
    -------
    tuple of int
        The decoded values in file order, interpreted with
        ``self.byteorder`` as the struct byte-order prefix.
    """
    # One "h" code per value, prefixed by the byte-order marker.
    fmt = f"{self.byteorder}{'h' * count}"
    raw = self.path_or_buf.read(2 * count)
    return struct.unpack(fmt, raw)
1237
+
1205
1238
def _read_header (self ) -> None :
1206
- first_char = self .path_or_buf . read ( 1 )
1207
- if struct . unpack ( "c" , first_char )[ 0 ] == b"<" :
1239
+ first_char = self ._read_char8 ( )
1240
+ if first_char == b"<" :
1208
1241
self ._read_new_header ()
1209
1242
else :
1210
1243
self ._read_old_header (first_char )
@@ -1224,11 +1257,9 @@ def _read_new_header(self) -> None:
1224
1257
self .path_or_buf .read (21 ) # </release><byteorder>
1225
1258
self .byteorder = ">" if self .path_or_buf .read (3 ) == b"MSF" else "<"
1226
1259
self .path_or_buf .read (15 ) # </byteorder><K>
1227
- nvar_type = "H" if self .format_version <= 118 else "I"
1228
- nvar_size = 2 if self .format_version <= 118 else 4
1229
- self .nvar = struct .unpack (
1230
- self .byteorder + nvar_type , self .path_or_buf .read (nvar_size )
1231
- )[0 ]
1260
+ self .nvar = (
1261
+ self ._read_uint16 () if self .format_version <= 118 else self ._read_uint32 ()
1262
+ )
1232
1263
self .path_or_buf .read (7 ) # </K><N>
1233
1264
1234
1265
self .nobs = self ._get_nobs ()
@@ -1240,46 +1271,27 @@ def _read_new_header(self) -> None:
1240
1271
self .path_or_buf .read (8 ) # 0x0000000000000000
1241
1272
self .path_or_buf .read (8 ) # position of <map>
1242
1273
1243
- self ._seek_vartypes = (
1244
- struct .unpack (self .byteorder + "q" , self .path_or_buf .read (8 ))[0 ] + 16
1245
- )
1246
- self ._seek_varnames = (
1247
- struct .unpack (self .byteorder + "q" , self .path_or_buf .read (8 ))[0 ] + 10
1248
- )
1249
- self ._seek_sortlist = (
1250
- struct .unpack (self .byteorder + "q" , self .path_or_buf .read (8 ))[0 ] + 10
1251
- )
1252
- self ._seek_formats = (
1253
- struct .unpack (self .byteorder + "q" , self .path_or_buf .read (8 ))[0 ] + 9
1254
- )
1255
- self ._seek_value_label_names = (
1256
- struct .unpack (self .byteorder + "q" , self .path_or_buf .read (8 ))[0 ] + 19
1257
- )
1274
+ self ._seek_vartypes = self ._read_int64 () + 16
1275
+ self ._seek_varnames = self ._read_int64 () + 10
1276
+ self ._seek_sortlist = self ._read_int64 () + 10
1277
+ self ._seek_formats = self ._read_int64 () + 9
1278
+ self ._seek_value_label_names = self ._read_int64 () + 19
1258
1279
1259
1280
# Requires version-specific treatment
1260
1281
self ._seek_variable_labels = self ._get_seek_variable_labels ()
1261
1282
1262
1283
self .path_or_buf .read (8 ) # <characteristics>
1263
- self .data_location = (
1264
- struct .unpack (self .byteorder + "q" , self .path_or_buf .read (8 ))[0 ] + 6
1265
- )
1266
- self .seek_strls = (
1267
- struct .unpack (self .byteorder + "q" , self .path_or_buf .read (8 ))[0 ] + 7
1268
- )
1269
- self .seek_value_labels = (
1270
- struct .unpack (self .byteorder + "q" , self .path_or_buf .read (8 ))[0 ] + 14
1271
- )
1284
+ self .data_location = self ._read_int64 () + 6
1285
+ self .seek_strls = self ._read_int64 () + 7
1286
+ self .seek_value_labels = self ._read_int64 () + 14
1272
1287
1273
1288
self .typlist , self .dtyplist = self ._get_dtypes (self ._seek_vartypes )
1274
1289
1275
1290
self .path_or_buf .seek (self ._seek_varnames )
1276
1291
self .varlist = self ._get_varlist ()
1277
1292
1278
1293
self .path_or_buf .seek (self ._seek_sortlist )
1279
- self .srtlist = struct .unpack (
1280
- self .byteorder + ("h" * (self .nvar + 1 )),
1281
- self .path_or_buf .read (2 * (self .nvar + 1 )),
1282
- )[:- 1 ]
1294
+ self .srtlist = self ._read_int16_count (self .nvar + 1 )[:- 1 ]
1283
1295
1284
1296
self .path_or_buf .seek (self ._seek_formats )
1285
1297
self .fmtlist = self ._get_fmtlist ()
@@ -1296,10 +1308,7 @@ def _get_dtypes(
1296
1308
) -> tuple [list [int | str ], list [str | np .dtype ]]:
1297
1309
1298
1310
self .path_or_buf .seek (seek_vartypes )
1299
- raw_typlist = [
1300
- struct .unpack (self .byteorder + "H" , self .path_or_buf .read (2 ))[0 ]
1301
- for _ in range (self .nvar )
1302
- ]
1311
+ raw_typlist = [self ._read_uint16 () for _ in range (self .nvar )]
1303
1312
1304
1313
def f (typ : int ) -> int | str :
1305
1314
if typ <= 2045 :
@@ -1368,16 +1377,16 @@ def _get_variable_labels(self) -> list[str]:
1368
1377
1369
1378
def _get_nobs(self) -> int:
    """
    Read the count that the header readers store as ``self.nobs``.

    The on-disk width depends on ``self.format_version``: versions below
    118 use an unsigned 32-bit field, versions 118 and above an unsigned
    64-bit field.
    """
    if self.format_version < 118:
        return self._read_uint32()
    return self._read_uint64()
1374
1383
1375
1384
def _get_data_label (self ) -> str :
1376
1385
if self .format_version >= 118 :
1377
- strlen = struct . unpack ( self .byteorder + "H" , self . path_or_buf . read ( 2 ))[ 0 ]
1386
+ strlen = self ._read_uint16 ()
1378
1387
return self ._decode (self .path_or_buf .read (strlen ))
1379
1388
elif self .format_version == 117 :
1380
- strlen = struct . unpack ( "b" , self .path_or_buf . read ( 1 ))[ 0 ]
1389
+ strlen = self ._read_int8 ()
1381
1390
return self ._decode (self .path_or_buf .read (strlen ))
1382
1391
elif self .format_version > 105 :
1383
1392
return self ._decode (self .path_or_buf .read (81 ))
@@ -1386,10 +1395,10 @@ def _get_data_label(self) -> str:
1386
1395
1387
1396
def _get_time_stamp (self ) -> str :
1388
1397
if self .format_version >= 118 :
1389
- strlen = struct . unpack ( "b" , self .path_or_buf . read ( 1 ))[ 0 ]
1398
+ strlen = self ._read_int8 ()
1390
1399
return self .path_or_buf .read (strlen ).decode ("utf-8" )
1391
1400
elif self .format_version == 117 :
1392
- strlen = struct . unpack ( "b" , self .path_or_buf . read ( 1 ))[ 0 ]
1401
+ strlen = self ._read_int8 ()
1393
1402
return self ._decode (self .path_or_buf .read (strlen ))
1394
1403
elif self .format_version > 104 :
1395
1404
return self ._decode (self .path_or_buf .read (18 ))
@@ -1404,22 +1413,20 @@ def _get_seek_variable_labels(self) -> int:
1404
1413
# variable, 20 for the closing tag and 17 for the opening tag
1405
1414
return self ._seek_value_label_names + (33 * self .nvar ) + 20 + 17
1406
1415
elif self .format_version >= 118 :
1407
- return struct . unpack ( self .byteorder + "q" , self . path_or_buf . read ( 8 ))[ 0 ] + 17
1416
+ return self ._read_int64 () + 17
1408
1417
else :
1409
1418
raise ValueError ()
1410
1419
1411
1420
def _read_old_header (self , first_char : bytes ) -> None :
1412
- self .format_version = struct . unpack ( "b" , first_char ) [0 ]
1421
+ self .format_version = int ( first_char [0 ])
1413
1422
if self .format_version not in [104 , 105 , 108 , 111 , 113 , 114 , 115 ]:
1414
1423
raise ValueError (_version_error .format (version = self .format_version ))
1415
1424
self ._set_encoding ()
1416
- self .byteorder = (
1417
- ">" if struct .unpack ("b" , self .path_or_buf .read (1 ))[0 ] == 0x1 else "<"
1418
- )
1419
- self .filetype = struct .unpack ("b" , self .path_or_buf .read (1 ))[0 ]
1425
+ self .byteorder = (">" if self ._read_int8 () == 0x1 else "<" )
1426
+ self .filetype = self ._read_int8 ()
1420
1427
self .path_or_buf .read (1 ) # unused
1421
1428
1422
- self .nvar = struct . unpack ( self .byteorder + "H" , self . path_or_buf . read ( 2 ))[ 0 ]
1429
+ self .nvar = self ._read_uint16 ()
1423
1430
self .nobs = self ._get_nobs ()
1424
1431
1425
1432
self ._data_label = self ._get_data_label ()
@@ -1428,7 +1435,7 @@ def _read_old_header(self, first_char: bytes) -> None:
1428
1435
1429
1436
# descriptors
1430
1437
if self .format_version > 108 :
1431
- typlist = [ord ( self . path_or_buf . read ( 1 )) for _ in range (self .nvar )]
1438
+ typlist = [int ( c ) for c in self . path_or_buf . read (self .nvar )]
1432
1439
else :
1433
1440
buf = self .path_or_buf .read (self .nvar )
1434
1441
typlistb = np .frombuffer (buf , dtype = np .uint8 )
@@ -1458,10 +1465,7 @@ def _read_old_header(self, first_char: bytes) -> None:
1458
1465
self .varlist = [
1459
1466
self ._decode (self .path_or_buf .read (9 )) for _ in range (self .nvar )
1460
1467
]
1461
- self .srtlist = struct .unpack (
1462
- self .byteorder + ("h" * (self .nvar + 1 )),
1463
- self .path_or_buf .read (2 * (self .nvar + 1 )),
1464
- )[:- 1 ]
1468
+ self .srtlist = self ._read_int16_count (self .nvar + 1 )[:- 1 ]
1465
1469
1466
1470
self .fmtlist = self ._get_fmtlist ()
1467
1471
@@ -1476,17 +1480,11 @@ def _read_old_header(self, first_char: bytes) -> None:
1476
1480
1477
1481
if self .format_version > 104 :
1478
1482
while True :
1479
- data_type = struct .unpack (
1480
- self .byteorder + "b" , self .path_or_buf .read (1 )
1481
- )[0 ]
1483
+ data_type = self ._read_int8 ()
1482
1484
if self .format_version > 108 :
1483
- data_len = struct .unpack (
1484
- self .byteorder + "i" , self .path_or_buf .read (4 )
1485
- )[0 ]
1485
+ data_len = self ._read_int32 ()
1486
1486
else :
1487
- data_len = struct .unpack (
1488
- self .byteorder + "h" , self .path_or_buf .read (2 )
1489
- )[0 ]
1487
+ data_len = self ._read_int16 ()
1490
1488
if data_type == 0 :
1491
1489
break
1492
1490
self .path_or_buf .read (data_len )
@@ -1570,8 +1568,8 @@ def _read_value_labels(self) -> None:
1570
1568
labname = self ._decode (self .path_or_buf .read (129 ))
1571
1569
self .path_or_buf .read (3 ) # padding
1572
1570
1573
- n = struct . unpack ( self .byteorder + "I" , self . path_or_buf . read ( 4 ))[ 0 ]
1574
- txtlen = struct . unpack ( self .byteorder + "I" , self . path_or_buf . read ( 4 ))[ 0 ]
1571
+ n = self ._read_uint32 ()
1572
+ txtlen = self ._read_uint32 ()
1575
1573
off = np .frombuffer (
1576
1574
self .path_or_buf .read (4 * n ), dtype = self .byteorder + "i4" , count = n
1577
1575
)
@@ -1599,7 +1597,7 @@ def _read_strls(self) -> None:
1599
1597
break
1600
1598
1601
1599
if self .format_version == 117 :
1602
- v_o = struct . unpack ( self .byteorder + "Q" , self . path_or_buf . read ( 8 ))[ 0 ]
1600
+ v_o = self ._read_uint64 ()
1603
1601
else :
1604
1602
buf = self .path_or_buf .read (12 )
1605
1603
# Only tested on little endian file on little endian machine.
@@ -1610,8 +1608,8 @@ def _read_strls(self) -> None:
1610
1608
# This path may not be correct, impossible to test
1611
1609
buf = buf [0 :v_size ] + buf [(4 + v_size ) :]
1612
1610
v_o = struct .unpack ("Q" , buf )[0 ]
1613
- typ = struct . unpack ( "B" , self .path_or_buf . read ( 1 ))[ 0 ]
1614
- length = struct . unpack ( self .byteorder + "I" , self . path_or_buf . read ( 4 ))[ 0 ]
1611
+ typ = self ._read_uint8 ()
1612
+ length = self ._read_uint32 ()
1615
1613
va = self .path_or_buf .read (length )
1616
1614
if typ == 130 :
1617
1615
decoded_va = va [0 :- 1 ].decode (self ._encoding )
0 commit comments