Skip to content

Commit 7038e00

Browse files
committed
add cwe 404 test
1 parent 0b599bd commit 7038e00

File tree

1 file changed

+147
-0
lines changed

1 file changed

+147
-0
lines changed

tests/test_asyncio/test_cwe_404.py

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
import asyncio
2+
import sys
3+
4+
import pytest
5+
6+
from redis.asyncio import Redis
7+
from redis.asyncio.cluster import RedisCluster
8+
9+
10+
async def pipe(
11+
reader: asyncio.StreamReader, writer: asyncio.StreamWriter, delay: float, name=""
12+
):
13+
while True:
14+
data = await reader.read(1000)
15+
if not data:
16+
break
17+
await asyncio.sleep(delay)
18+
writer.write(data)
19+
await writer.drain()
20+
21+
22+
class DelayProxy:
23+
def __init__(self, addr, redis_addr, delay: float):
24+
self.addr = addr
25+
self.redis_addr = redis_addr
26+
self.delay = delay
27+
28+
async def start(self):
29+
self.server = await asyncio.start_server(self.handle, *self.addr)
30+
self.ROUTINE = asyncio.create_task(self.server.serve_forever())
31+
32+
async def handle(self, reader, writer):
33+
# establish connection to redis
34+
redis_reader, redis_writer = await asyncio.open_connection(*self.redis_addr)
35+
pipe1 = asyncio.create_task(pipe(reader, redis_writer, self.delay, "to redis:"))
36+
pipe2 = asyncio.create_task(
37+
pipe(redis_reader, writer, self.delay, "from redis:")
38+
)
39+
await asyncio.gather(pipe1, pipe2)
40+
41+
async def stop(self):
42+
# clean up enough so that we can reuse the looper
43+
self.ROUTINE.cancel()
44+
loop = self.server.get_loop()
45+
await loop.shutdown_asyncgens()
46+
47+
48+
@pytest.mark.onlynoncluster
49+
@pytest.mark.parametrize("delay", argvalues=[0.05, 0.5, 1, 2])
50+
async def test_standalone(delay):
51+
52+
# create a tcp socket proxy that relays data to Redis and back,
53+
# inserting 0.1 seconds of delay
54+
dp = DelayProxy(
55+
addr=("localhost", 5380), redis_addr=("localhost", 6379), delay=delay * 2
56+
)
57+
await dp.start()
58+
59+
for b in [True, False]:
60+
# note that we connect to proxy, rather than to Redis directly
61+
async with Redis(host="localhost", port=5380, single_connection_client=b) as r:
62+
63+
await r.set("foo", "foo")
64+
await r.set("bar", "bar")
65+
66+
t = asyncio.create_task(r.get("foo"))
67+
await asyncio.sleep(delay)
68+
t.cancel()
69+
try:
70+
await t
71+
sys.stderr.write("try again, we did not cancel the task in time\n")
72+
except asyncio.CancelledError:
73+
sys.stderr.write(
74+
"canceled task, connection is left open with unread response\n"
75+
)
76+
77+
assert await r.get("bar") == b"bar"
78+
assert await r.ping()
79+
assert await r.get("foo") == b"foo"
80+
81+
await dp.stop()
82+
83+
84+
@pytest.mark.onlynoncluster
85+
@pytest.mark.parametrize("delay", argvalues=[0.05, 0.5, 1, 2])
86+
async def test_standalone_pipeline(delay):
87+
dp = DelayProxy(
88+
addr=("localhost", 5380), redis_addr=("localhost", 6379), delay=delay * 2
89+
)
90+
await dp.start()
91+
for b in [True, False]:
92+
async with Redis(host="localhost", port=5380, single_connection_client=b) as r:
93+
await r.set("foo", "foo")
94+
await r.set("bar", "bar")
95+
96+
pipe = r.pipeline()
97+
98+
pipe2 = r.pipeline()
99+
pipe2.get("bar")
100+
pipe2.ping()
101+
pipe2.get("foo")
102+
103+
t = asyncio.create_task(pipe.get("foo").execute())
104+
await asyncio.sleep(delay)
105+
t.cancel()
106+
107+
pipe.get("bar")
108+
pipe.ping()
109+
pipe.get("foo")
110+
pipe.reset()
111+
112+
assert await pipe.execute() is None
113+
114+
# validating that the pipeline can be used as it could previously
115+
pipe.get("bar")
116+
pipe.ping()
117+
pipe.get("foo")
118+
assert await pipe.execute() == [b"bar", True, b"foo"]
119+
assert await pipe2.execute() == [b"bar", True, b"foo"]
120+
121+
await dp.stop()
122+
123+
124+
@pytest.mark.onlycluster
125+
async def test_cluster(request):
126+
127+
dp = DelayProxy(addr=("localhost", 5381), redis_addr=("localhost", 6372), delay=0.1)
128+
await dp.start()
129+
130+
r = RedisCluster.from_url("redis://localhost:5381")
131+
await r.initialize()
132+
await r.set("foo", "foo")
133+
await r.set("bar", "bar")
134+
135+
t = asyncio.create_task(r.get("foo"))
136+
await asyncio.sleep(0.050)
137+
t.cancel()
138+
try:
139+
await t
140+
except asyncio.CancelledError:
141+
pytest.fail("connection is left open with unread response")
142+
143+
assert await r.get("bar") == b"bar"
144+
assert await r.ping()
145+
assert await r.get("foo") == b"foo"
146+
147+
await dp.stop()

0 commit comments

Comments
 (0)