|
33 | 33 | check_disallow_instantiation,
|
34 | 34 | threading_helper,
|
35 | 35 | )
|
| 36 | +from _testcapi import INT_MAX |
| 37 | +from os import SEEK_SET, SEEK_CUR, SEEK_END |
36 | 38 | from test.support.os_helper import TESTFN, unlink, temp_dir
|
37 | 39 |
|
38 | 40 |
|
@@ -1041,11 +1043,163 @@ def test_same_query_in_multiple_cursors(self):
|
1041 | 1043 | self.assertEqual(cu.fetchall(), [(1,)])
|
1042 | 1044 |
|
1043 | 1045 |
|
| 1046 | +class BlobTests(unittest.TestCase): |
| 1047 | + def setUp(self): |
| 1048 | + self.cx = sqlite.connect(":memory:") |
| 1049 | + self.cx.execute("create table test(b blob)") |
| 1050 | + self.data = b"this blob data string is exactly fifty bytes long!" |
| 1051 | + self.cx.execute("insert into test(b) values (?)", (self.data,)) |
| 1052 | + self.blob = self.cx.blobopen("test", "b", 1) |
| 1053 | + |
| 1054 | + def tearDown(self): |
| 1055 | + self.blob.close() |
| 1056 | + self.cx.close() |
| 1057 | + |
| 1058 | + def test_blob_seek_and_tell(self): |
| 1059 | + self.blob.seek(10) |
| 1060 | + self.assertEqual(self.blob.tell(), 10) |
| 1061 | + |
| 1062 | + self.blob.seek(10, SEEK_SET) |
| 1063 | + self.assertEqual(self.blob.tell(), 10) |
| 1064 | + |
| 1065 | + self.blob.seek(10, SEEK_CUR) |
| 1066 | + self.assertEqual(self.blob.tell(), 20) |
| 1067 | + |
| 1068 | + self.blob.seek(-10, SEEK_END) |
| 1069 | + self.assertEqual(self.blob.tell(), 40) |
| 1070 | + |
| 1071 | + def test_blob_seek_error(self): |
| 1072 | + msg_oor = "offset out of blob range" |
| 1073 | + msg_orig = "'origin' should be os.SEEK_SET, os.SEEK_CUR, or os.SEEK_END" |
| 1074 | + msg_of = "seek offset results in overflow" |
| 1075 | + |
| 1076 | + dataset = ( |
| 1077 | + (ValueError, msg_oor, lambda: self.blob.seek(1000)), |
| 1078 | + (ValueError, msg_oor, lambda: self.blob.seek(-10)), |
| 1079 | + (ValueError, msg_orig, lambda: self.blob.seek(10, -1)), |
| 1080 | + (ValueError, msg_orig, lambda: self.blob.seek(10, 3)), |
| 1081 | + ) |
| 1082 | + for exc, msg, fn in dataset: |
| 1083 | + with self.subTest(exc=exc, msg=msg, fn=fn): |
| 1084 | + self.assertRaisesRegex(exc, msg, fn) |
| 1085 | + |
| 1086 | + # Force overflow errors |
| 1087 | + self.blob.seek(1, SEEK_SET) |
| 1088 | + with self.assertRaisesRegex(OverflowError, msg_of): |
| 1089 | + self.blob.seek(INT_MAX, SEEK_CUR) |
| 1090 | + with self.assertRaisesRegex(OverflowError, msg_of): |
| 1091 | + self.blob.seek(INT_MAX, SEEK_END) |
| 1092 | + |
| 1093 | + def test_blob_read(self): |
| 1094 | + buf = self.blob.read() |
| 1095 | + self.assertEqual(buf, self.data) |
| 1096 | + |
| 1097 | + def test_blob_read_oversized(self): |
| 1098 | + buf = self.blob.read(len(self.data) * 2) |
| 1099 | + self.assertEqual(buf, self.data) |
| 1100 | + |
| 1101 | + def test_blob_read_advance_offset(self): |
| 1102 | + n = 10 |
| 1103 | + buf = self.blob.read(n) |
| 1104 | + self.assertEqual(buf, self.data[:n]) |
| 1105 | + self.assertEqual(self.blob.tell(), n) |
| 1106 | + |
| 1107 | + def test_blob_read_at_offset(self): |
| 1108 | + self.blob.seek(10) |
| 1109 | + self.assertEqual(self.blob.read(10), self.data[10:20]) |
| 1110 | + |
| 1111 | + def test_blob_read_error_row_changed(self): |
| 1112 | + self.cx.execute("update test set b='aaaa' where rowid=1") |
| 1113 | + with self.assertRaises(sqlite.OperationalError): |
| 1114 | + self.blob.read() |
| 1115 | + |
| 1116 | + def test_blob_write(self): |
| 1117 | + new_data = b"new data".ljust(50) |
| 1118 | + self.blob.write(new_data) |
| 1119 | + row = self.cx.execute("select b from test").fetchone() |
| 1120 | + self.assertEqual(row[0], new_data) |
| 1121 | + |
| 1122 | + def test_blob_write_at_offset(self): |
| 1123 | + new_data = b"c" * 25 |
| 1124 | + self.blob.seek(25) |
| 1125 | + self.blob.write(new_data) |
| 1126 | + row = self.cx.execute("select b from test").fetchone() |
| 1127 | + self.assertEqual(row[0], self.data[:25] + new_data) |
| 1128 | + |
| 1129 | + def test_blob_write_advance_offset(self): |
| 1130 | + self.blob.write(b"d"*10) |
| 1131 | + self.assertEqual(self.blob.tell(), 10) |
| 1132 | + |
| 1133 | + def test_blob_write_error_length(self): |
| 1134 | + with self.assertRaisesRegex(ValueError, "data longer than blob"): |
| 1135 | + self.blob.write(b"a" * 1000) |
| 1136 | + |
| 1137 | + def test_blob_write_error_row_changed(self): |
| 1138 | + self.cx.execute("update test set b='aaaa' where rowid=1") |
| 1139 | + with self.assertRaises(sqlite.OperationalError): |
| 1140 | + self.blob.write(b"aaa") |
| 1141 | + |
| 1142 | + def test_blob_write_error_readonly(self): |
| 1143 | + ro_blob = self.cx.blobopen("test", "b", 1, readonly=True) |
| 1144 | + with self.assertRaisesRegex(sqlite.OperationalError, "readonly"): |
| 1145 | + ro_blob.write(b"aaa") |
| 1146 | + ro_blob.close() |
| 1147 | + |
| 1148 | + def test_blob_open_error(self): |
| 1149 | + dataset = ( |
| 1150 | + (("test", "b", 1), {"name": "notexisting"}), |
| 1151 | + (("notexisting", "b", 1), {}), |
| 1152 | + (("test", "notexisting", 1), {}), |
| 1153 | + (("test", "b", 2), {}), |
| 1154 | + ) |
| 1155 | + regex = "no such" |
| 1156 | + for args, kwds in dataset: |
| 1157 | + with self.subTest(args=args, kwds=kwds): |
| 1158 | + with self.assertRaisesRegex(sqlite.OperationalError, regex): |
| 1159 | + self.cx.blobopen(*args, **kwds) |
| 1160 | + |
| 1161 | + def test_blob_sequence_not_supported(self): |
| 1162 | + with self.assertRaises(TypeError): |
| 1163 | + self.blob + self.blob |
| 1164 | + with self.assertRaises(TypeError): |
| 1165 | + self.blob * 5 |
| 1166 | + with self.assertRaises(TypeError): |
| 1167 | + b"a" in self.blob |
| 1168 | + |
| 1169 | + def test_blob_closed(self): |
| 1170 | + with memory_database() as cx: |
| 1171 | + cx.execute("create table test(b blob)") |
| 1172 | + cx.execute("insert into test values (zeroblob(100))") |
| 1173 | + blob = cx.blobopen("test", "b", 1) |
| 1174 | + blob.close() |
| 1175 | + |
| 1176 | + msg = "Cannot operate on a closed blob" |
| 1177 | + with self.assertRaisesRegex(sqlite.ProgrammingError, msg): |
| 1178 | + blob.read() |
| 1179 | + with self.assertRaisesRegex(sqlite.ProgrammingError, msg): |
| 1180 | + blob.write(b"") |
| 1181 | + with self.assertRaisesRegex(sqlite.ProgrammingError, msg): |
| 1182 | + blob.seek(0) |
| 1183 | + with self.assertRaisesRegex(sqlite.ProgrammingError, msg): |
| 1184 | + blob.tell() |
| 1185 | + |
| 1186 | + def test_blob_closed_db_read(self): |
| 1187 | + with memory_database() as cx: |
| 1188 | + cx.execute("create table test(b blob)") |
| 1189 | + cx.execute("insert into test(b) values (zeroblob(100))") |
| 1190 | + blob = cx.blobopen("test", "b", 1) |
| 1191 | + cx.close() |
| 1192 | + self.assertRaisesRegex(sqlite.ProgrammingError, |
| 1193 | + "Cannot operate on a closed database", |
| 1194 | + blob.read) |
| 1195 | + |
| 1196 | + |
1044 | 1197 | class ThreadTests(unittest.TestCase):
|
1045 | 1198 | def setUp(self):
|
1046 | 1199 | self.con = sqlite.connect(":memory:")
|
1047 | 1200 | self.cur = self.con.cursor()
|
1048 |
| - self.cur.execute("create table test(name text)") |
| 1201 | + self.cur.execute("create table test(name text, b blob)") |
| 1202 | + self.cur.execute("insert into test values('blob', zeroblob(1))") |
1049 | 1203 |
|
1050 | 1204 | def tearDown(self):
|
1051 | 1205 | self.cur.close()
|
@@ -1080,6 +1234,7 @@ def test_check_connection_thread(self):
|
1080 | 1234 | lambda: self.con.create_collation("foo", None),
|
1081 | 1235 | lambda: self.con.setlimit(sqlite.SQLITE_LIMIT_LENGTH, -1),
|
1082 | 1236 | lambda: self.con.getlimit(sqlite.SQLITE_LIMIT_LENGTH),
|
| 1237 | + lambda: self.con.blobopen("test", "b", 1), |
1083 | 1238 | ]
|
1084 | 1239 | if hasattr(sqlite.Connection, "serialize"):
|
1085 | 1240 | fns.append(lambda: self.con.serialize())
|
|
0 commit comments