|
1 | 1 | import logging
|
2 |
| -import re |
3 | 2 | import socket
|
4 | 3 | import socketserver
|
5 | 4 | import ssl
|
|
8 | 7 | import pytest
|
9 | 8 | from redis.connection import Connection, SSLConnection, UnixDomainSocketConnection
|
10 | 9 |
|
| 10 | +from . import resp |
11 | 11 | from .ssl_utils import get_ssl_filename
|
12 | 12 |
|
13 | 13 | _logger = logging.getLogger(__name__)
|
14 | 14 |
|
15 | 15 |
|
16 | 16 | _CLIENT_NAME = "test-suite-client"
|
17 |
| -_CMD_SEP = b"\r\n" |
18 |
| -_SUCCESS_RESP = b"+OK" + _CMD_SEP |
19 |
| -_ERROR_RESP = b"-ERR" + _CMD_SEP |
20 |
| -_SUPPORTED_CMDS = {f"CLIENT SETNAME {_CLIENT_NAME}": _SUCCESS_RESP} |
21 | 17 |
|
22 | 18 |
|
23 | 19 | @pytest.fixture
|
@@ -148,44 +144,31 @@ def finish(self):
|
148 | 144 | _logger.info("%s disconnected", self.client_address)
|
149 | 145 |
|
150 | 146 | def handle(self):
|
| 147 | + parser = resp.RespParser() |
| 148 | + server = resp.RespServer() |
151 | 149 | buffer = b""
|
152 |
| - command = None |
153 |
| - command_ptr = None |
154 |
| - fragment_length = None |
155 |
| - while self.server.is_serving() or buffer: |
156 |
| - try: |
157 |
| - buffer += self.request.recv(1024) |
158 |
| - except socket.timeout: |
159 |
| - continue |
160 |
| - if not buffer: |
161 |
| - continue |
162 |
| - parts = re.split(_CMD_SEP, buffer) |
163 |
| - buffer = parts[-1] |
164 |
| - for fragment in parts[:-1]: |
165 |
| - fragment = fragment.decode() |
166 |
| - _logger.info("Command fragment: %s", fragment) |
167 |
| - |
168 |
| - if fragment.startswith("*") and command is None: |
169 |
| - command = [None for _ in range(int(fragment[1:]))] |
170 |
| - command_ptr = 0 |
171 |
| - fragment_length = None |
172 |
| - continue |
173 |
| - |
174 |
| - if fragment.startswith("$") and command[command_ptr] is None: |
175 |
| - fragment_length = int(fragment[1:]) |
176 |
| - continue |
177 |
| - |
178 |
| - assert len(fragment) == fragment_length |
179 |
| - command[command_ptr] = fragment |
180 |
| - command_ptr += 1 |
181 |
| - |
182 |
| - if command_ptr < len(command): |
| 150 | + try: |
| 151 | + # if client performs pipelining, we may need |
| 152 | + # to adjust this code to not block when sending |
| 153 | + # responses. |
| 154 | + while self.server.is_serving(): |
| 155 | + try: |
| 156 | + command = parser.parse(buffer) |
| 157 | + buffer = b"" |
| 158 | + except resp.NeedMoreData: |
| 159 | + try: |
| 160 | + buffer = self.request.recv(1024) |
| 161 | + except socket.timeout: |
| 162 | + buffer = b"" |
| 163 | + continue |
| 164 | + if not buffer: |
| 165 | + break # EOF |
183 | 166 | continue
|
184 |
| - |
185 |
| - command = " ".join(command) |
186 | 167 | _logger.info("Command %s", command)
|
187 |
| - resp = _SUPPORTED_CMDS.get(command, _ERROR_RESP) |
188 |
| - _logger.info("Response %s", resp) |
189 |
| - self.request.sendall(resp) |
190 |
| - command = None |
191 |
| - _logger.info("Exit handler") |
| 168 | + response = server.command(command) |
| 169 | + _logger.info("Response %s", response) |
| 170 | + self.request.sendall(response) |
| 171 | + except Exception: |
| 172 | + _logger.exception("Exception in handler") |
| 173 | + finally: |
| 174 | + _logger.info("Exit handler") |
0 commit comments