Skip to content

Commit 5a3aa8a

Browse files
Flag in the close method to abort running queries (#396)
1 parent 10fce4d commit 5a3aa8a

File tree

5 files changed

+133
-11
lines changed

5 files changed

+133
-11
lines changed

lib/postgres.dart

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,9 @@ abstract class SessionExecutor {
209209

210210
/// Closes this session, cleaning up resources and forbiding further calls to
211211
/// [prepare] and [execute].
212-
Future<void> close();
212+
/// If [force] is set to true, the session will be closed immediately, instead
213+
/// of waiting for any pending queries to finish.
214+
Future<void> close({bool force = false});
213215
}
214216

215217
abstract class Connection implements Session, SessionExecutor {

lib/src/pool/pool_impl.dart

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ class PoolImplementation<L> implements Pool<L> {
4141
Future<void> get closed => _semaphore.done;
4242

4343
@override
44-
Future<void> close() async {
44+
Future<void> close({bool force = false}) async {
45+
// TODO: Implement force close.
4546
await _semaphore.close();
4647

4748
// Connections are closed when they are returned to the pool if it's closed.
@@ -277,9 +278,11 @@ class _PoolConnection implements Connection {
277278
}
278279

279280
@override
280-
Future<void> close() async {
281+
Future<void> close({bool force = false}) async {
281282
// Don't forward the close call, the underlying connection should be re-used
282283
// when another pool connection is requested.
284+
285+
// TODO: Implement force close.
283286
}
284287

285288
@override

lib/src/v3/connection.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -582,8 +582,8 @@ class PgConnectionImplementation extends _PgSessionBase implements Connection {
582582
}
583583

584584
@override
585-
Future<void> close() async {
586-
await _close(false, null);
585+
Future<void> close({bool force = false}) async {
586+
await _close(force, null);
587587
}
588588

589589
Future<void> _close(bool interruptRunning, PgException? cause,

test/pool_test.dart

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,4 +209,61 @@ void main() {
209209
expect(name, 'myapp');
210210
});
211211
});
212+
213+
group(skip: 'not implemented', 'force close', () {
214+
Future<Pool> openPool(PostgresServer server) async {
215+
final pool = Pool.withEndpoints(
216+
[await server.endpoint()],
217+
settings: PoolSettings(maxConnectionCount: 1),
218+
);
219+
addTearDown(pool.close);
220+
return pool;
221+
}
222+
223+
Future<void> expectPoolClosesForcefully(Pool pool) async {
224+
await pool
225+
.close(force: true) //
226+
// If close takes too long, the test will fail (force=true would not be working correctly)
227+
// as it would be waiting for the query to finish
228+
.timeout(Duration(seconds: 1));
229+
expect(pool.isOpen, isFalse);
230+
}
231+
232+
Future<void> runLongQuery(Session session) {
233+
return session.execute('select pg_sleep(10) from pg_stat_activity;');
234+
}
235+
236+
withPostgresServer('pool session', (server) {
237+
test('', () async {
238+
final pool = await openPool(server);
239+
// ignore: unawaited_futures
240+
runLongQuery(pool);
241+
// let it start
242+
await Future.delayed(const Duration(milliseconds: 100));
243+
await expectPoolClosesForcefully(pool);
244+
});
245+
});
246+
247+
withPostgresServer('tx session', (server) {
248+
test('', () async {
249+
final pool = await openPool(server);
250+
// Ignore async error, it will fail when the connection is closed and it tries to do COMMIT
251+
pool.runTx(runLongQuery).ignore();
252+
// let it start
253+
await Future.delayed(const Duration(milliseconds: 100));
254+
await expectPoolClosesForcefully(pool);
255+
});
256+
});
257+
258+
withPostgresServer('run session', (server) {
259+
test('', () async {
260+
final pool = await openPool(server);
261+
// ignore: unawaited_futures
262+
pool.run(runLongQuery);
263+
// let it start
264+
await Future.delayed(const Duration(milliseconds: 100));
265+
await expectPoolClosesForcefully(pool);
266+
});
267+
});
268+
});
212269
}

test/v3_close_test.dart

Lines changed: 66 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,23 @@ void main() {
88
late Connection conn1;
99
late Connection conn2;
1010

11+
const conn1Name = 'conn1';
12+
const conn2Name = 'conn2';
13+
1114
setUp(() async {
1215
conn1 = await Connection.open(
1316
await server.endpoint(),
1417
settings: ConnectionSettings(
15-
//transformer: _loggingTransformer('c1'),
16-
),
18+
applicationName: conn1Name,
19+
//transformer: _loggingTransformer('c1'),
20+
),
1721
);
1822

1923
conn2 = await Connection.open(
2024
await server.endpoint(),
25+
settings: ConnectionSettings(
26+
applicationName: conn2Name,
27+
),
2128
);
2229
});
2330

@@ -30,9 +37,8 @@ void main() {
3037
test(
3138
'with concurrent query: $concurrentQuery',
3239
() async {
33-
final endpoint = await server.endpoint();
3440
final res = await conn2.execute(
35-
"SELECT pid FROM pg_stat_activity where usename = '${endpoint.username}';");
41+
"SELECT pid FROM pg_stat_activity where application_name = '$conn1Name';");
3642
final conn1PID = res.first.first as int;
3743

3844
// Simulate issue by terminating a connection during a query
@@ -49,10 +55,9 @@ void main() {
4955
}
5056

5157
test('with simple query protocol', () async {
52-
final endpoint = await server.endpoint();
5358
// Get the PID for conn1
5459
final res = await conn2.execute(
55-
"SELECT pid FROM pg_stat_activity where usename = '${endpoint.username}';");
60+
"SELECT pid FROM pg_stat_activity where application_name = '$conn1Name';");
5661
final conn1PID = res.first.first as int;
5762

5863
// ignore: unawaited_futures
@@ -65,6 +70,61 @@ void main() {
6570
'select pg_terminate_backend($conn1PID) from pg_stat_activity;');
6671
});
6772
});
73+
74+
group('force close', () {
75+
Future<Connection> openConnection(PostgresServer server) async {
76+
final conn = await Connection.open(await server.endpoint());
77+
addTearDown(conn.close);
78+
return conn;
79+
}
80+
81+
Future<void> expectConn1ClosesForcefully(Connection conn) async {
82+
await conn
83+
.close(force: true) //
84+
// If close takes too long, the test will fail (force=true would not be working correctly)
85+
// as it would be waiting for the query to finish
86+
.timeout(Duration(seconds: 1));
87+
expect(conn.isOpen, isFalse);
88+
}
89+
90+
Future<void> runLongQuery(Session session) {
91+
return session.execute('select pg_sleep(10) from pg_stat_activity;');
92+
}
93+
94+
withPostgresServer('connection session', (server) {
95+
test('', () async {
96+
final conn = await openConnection(server);
97+
// ignore: unawaited_futures
98+
runLongQuery(conn);
99+
// let it start
100+
await Future.delayed(const Duration(milliseconds: 100));
101+
await expectConn1ClosesForcefully(conn);
102+
});
103+
});
104+
105+
withPostgresServer('tx session', (server) {
106+
test('', () async {
107+
final conn = await openConnection(server);
108+
// ignore: unawaited_futures
109+
// Ignore async error, it will fail when the connection is closed and it tries to do COMMIT
110+
conn.runTx(runLongQuery).ignore();
111+
// let it start
112+
await Future.delayed(const Duration(milliseconds: 100));
113+
await expectConn1ClosesForcefully(conn);
114+
});
115+
});
116+
117+
withPostgresServer('run session', (server) {
118+
test('', () async {
119+
final conn = await openConnection(server);
120+
// ignore: unawaited_futures
121+
conn.run(runLongQuery);
122+
// let it start
123+
await Future.delayed(const Duration(milliseconds: 100));
124+
await expectConn1ClosesForcefully(conn);
125+
});
126+
});
127+
});
68128
}
69129

70130
final _isPostgresException = isA<PgException>();

0 commit comments

Comments
 (0)