1
1
import asyncio
2
+ import contextlib
2
3
import sys
3
4
4
5
import pytest
8
9
9
10
10
11
async def pipe (
11
- reader : asyncio .StreamReader , writer : asyncio .StreamWriter , delay : float , name = ""
12
+ reader : asyncio .StreamReader ,
13
+ writer : asyncio .StreamWriter ,
14
+ proxy : "DelayProxy" ,
15
+ name = "" ,
16
+ event : asyncio .Event = None ,
12
17
):
13
18
while True :
14
19
data = await reader .read (1000 )
15
20
if not data :
16
21
break
17
- await asyncio .sleep (delay )
22
+ if event :
23
+ event .set ()
24
+ await asyncio .sleep (proxy .delay )
18
25
writer .write (data )
19
26
await writer .drain ()
20
27
@@ -24,18 +31,32 @@ def __init__(self, addr, redis_addr, delay: float):
24
31
self .addr = addr
25
32
self .redis_addr = redis_addr
26
33
self .delay = delay
34
+ self .send_event = asyncio .Event ()
27
35
28
36
async def start (self ):
29
37
self .server = await asyncio .start_server (self .handle , * self .addr )
30
38
self .ROUTINE = asyncio .create_task (self .server .serve_forever ())
31
39
40
+ @contextlib .contextmanager
41
+ def override (self , delay : float = 0.0 ):
42
+ """
43
+ Allow to override the delay for parts of tests which aren't time dependent,
44
+ to speed up execution.
45
+ """
46
+ old = self .delay
47
+ self .delay = delay
48
+ try :
49
+ yield
50
+ finally :
51
+ self .delay = old
52
+
32
53
async def handle (self , reader , writer ):
33
54
# establish connection to redis
34
55
redis_reader , redis_writer = await asyncio .open_connection (* self .redis_addr )
35
- pipe1 = asyncio .create_task (pipe (reader , redis_writer , self .delay , "to redis:" ))
36
- pipe2 = asyncio .create_task (
37
- pipe (redis_reader , writer , self .delay , "from redis:" )
56
+ pipe1 = asyncio .create_task (
57
+ pipe (reader , redis_writer , self , "to redis:" , self .send_event )
38
58
)
59
+ pipe2 = asyncio .create_task (pipe (redis_reader , writer , self , "from redis:" ))
39
60
await asyncio .gather (pipe1 , pipe2 )
40
61
41
62
async def stop (self ):
@@ -60,23 +81,26 @@ async def test_standalone(delay):
60
81
# note that we connect to proxy, rather than to Redis directly
61
82
async with Redis (host = "localhost" , port = 5380 , single_connection_client = b ) as r :
62
83
63
- await r .set ("foo" , "foo" )
64
- await r .set ("bar" , "bar" )
84
+ with dp .override ():
85
+ await r .set ("foo" , "foo" )
86
+ await r .set ("bar" , "bar" )
65
87
88
+ dp .send_event .clear ()
66
89
t = asyncio .create_task (r .get ("foo" ))
67
- await asyncio .sleep (delay )
90
+ # wait until the task has sent, and then some, to make sure it has settled on
91
+ # reading.
92
+ await dp .send_event .wait ()
93
+ await asyncio .sleep (0.05 )
68
94
t .cancel ()
69
- try :
95
+ with pytest . raises ( asyncio . CancelledError ) :
70
96
await t
71
- sys .stderr .write ("try again, we did not cancel the task in time\n " )
72
- except asyncio .CancelledError :
73
- sys .stderr .write (
74
- "canceled task, connection is left open with unread response\n "
75
- )
76
97
77
- assert await r .get ("bar" ) == b"bar"
78
- assert await r .ping ()
79
- assert await r .get ("foo" ) == b"foo"
98
+ # make sure that our previous request, cancelled while waiting for a repsponse,
99
+ # didn't leave the connection in a bad state
100
+ with dp .override ():
101
+ assert await r .get ("bar" ) == b"bar"
102
+ assert await r .ping ()
103
+ assert await r .get ("foo" ) == b"foo"
80
104
81
105
await dp .stop ()
82
106
@@ -90,8 +114,9 @@ async def test_standalone_pipeline(delay):
90
114
await dp .start ()
91
115
for b in [True , False ]:
92
116
async with Redis (host = "localhost" , port = 5380 , single_connection_client = b ) as r :
93
- await r .set ("foo" , "foo" )
94
- await r .set ("bar" , "bar" )
117
+ with dp .override ():
118
+ await r .set ("foo" , "foo" )
119
+ await r .set ("bar" , "bar" )
95
120
96
121
pipe = r .pipeline ()
97
122
@@ -100,23 +125,32 @@ async def test_standalone_pipeline(delay):
100
125
pipe2 .ping ()
101
126
pipe2 .get ("foo" )
102
127
128
+ dp .send_event .clear ()
103
129
t = asyncio .create_task (pipe .get ("foo" ).execute ())
104
- await asyncio .sleep (delay )
130
+ # wait until task has settled on the read
131
+ await dp .send_event .wait ()
132
+ await asyncio .sleep (0.05 )
105
133
t .cancel ()
134
+ with pytest .raises (asyncio .CancelledError ):
135
+ await t
106
136
107
- pipe .get ("bar" )
108
- pipe .ping ()
109
- pipe .get ("foo" )
110
- pipe .reset ()
137
+ # we have now cancelled the pieline in the middle of a request, make sure
138
+ # that the connection is still usable
139
+ with dp .override ():
140
+ pipe .get ("bar" )
141
+ pipe .ping ()
142
+ pipe .get ("foo" )
143
+ await pipe .reset ()
111
144
112
- assert await pipe .execute () is None
145
+ # check that the pipeline is empty after reset
146
+ assert await pipe .execute () == []
113
147
114
- # validating that the pipeline can be used as it could previously
115
- pipe .get ("bar" )
116
- pipe .ping ()
117
- pipe .get ("foo" )
118
- assert await pipe .execute () == [b"bar" , True , b"foo" ]
119
- assert await pipe2 .execute () == [b"bar" , True , b"foo" ]
148
+ # validating that the pipeline can be used as it could previously
149
+ pipe .get ("bar" )
150
+ pipe .ping ()
151
+ pipe .get ("foo" )
152
+ assert await pipe .execute () == [b"bar" , True , b"foo" ]
153
+ assert await pipe2 .execute () == [b"bar" , True , b"foo" ]
120
154
121
155
await dp .stop ()
122
156
@@ -129,19 +163,21 @@ async def test_cluster(request):
129
163
130
164
r = RedisCluster .from_url ("redis://localhost:5381" )
131
165
await r .initialize ()
132
- await r .set ("foo" , "foo" )
133
- await r .set ("bar" , "bar" )
166
+ with dp .override ():
167
+ await r .set ("foo" , "foo" )
168
+ await r .set ("bar" , "bar" )
134
169
170
+ dp .send_event .clear ()
135
171
t = asyncio .create_task (r .get ("foo" ))
136
- await asyncio .sleep (0.050 )
172
+ await dp .send_event .wait ()
173
+ await asyncio .sleep (0.05 )
137
174
t .cancel ()
138
- try :
175
+ with pytest . raises ( asyncio . CancelledError ) :
139
176
await t
140
- except asyncio .CancelledError :
141
- pytest .fail ("connection is left open with unread response" )
142
177
143
- assert await r .get ("bar" ) == b"bar"
144
- assert await r .ping ()
145
- assert await r .get ("foo" ) == b"foo"
178
+ with dp .override ():
179
+ assert await r .get ("bar" ) == b"bar"
180
+ assert await r .ping ()
181
+ assert await r .get ("foo" ) == b"foo"
146
182
147
183
await dp .stop ()
0 commit comments