Skip to content

Commit 59676e2

Browse files
authored
Merge 87a31b2 into dbaac0d
2 parents dbaac0d + 87a31b2 commit 59676e2

37 files changed

+646
-274
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
from typing import Any, Dict, List
2+
3+
from algoliasearch.http.base_config import BaseConfig
4+
from algoliasearch.http.hosts import Host
5+
from algoliasearch.http.request_options import RequestOptions
6+
from algoliasearch.http.retry import RetryStrategy
7+
8+
9+
class BaseTransporter:
10+
_config: BaseConfig
11+
_retry_strategy: RetryStrategy
12+
_hosts: List[Host]
13+
14+
def __init__(self, config: BaseConfig) -> None:
15+
self._config = config
16+
self._retry_strategy = RetryStrategy()
17+
self._hosts = []
18+
19+
def prepare(
20+
self,
21+
request_options: RequestOptions,
22+
use_read_transporter: bool,
23+
) -> Dict[str, Any]:
24+
query_parameters = dict(request_options.query_parameters)
25+
26+
if use_read_transporter:
27+
self._timeout = request_options.timeouts["read"]
28+
self._hosts = self._config.hosts.read()
29+
if isinstance(request_options.data, dict):
30+
query_parameters.update(request_options.data)
31+
return query_parameters
32+
33+
self._timeout = request_options.timeouts["write"]
34+
self._hosts = self._config.hosts.write()
35+
36+
def build_path(self, path, query_parameters):
37+
if query_parameters is not None and len(query_parameters) > 0:
38+
return "{}?{}".format(
39+
path,
40+
"&".join(
41+
[
42+
"{}={}".format(key, value)
43+
for key, value in query_parameters.items()
44+
]
45+
),
46+
)
47+
return path
48+
49+
def build_url(self, host, path):
50+
return "{}://{}{}".format(
51+
host.scheme,
52+
host.url + (":{}".format(host.port) if host.port else ""),
53+
path,
54+
)
55+
56+
def get_proxy(self, url):
57+
if url.startswith("https"):
58+
return self._config.proxies.get("https")
59+
elif url.startswith("http"):
60+
return self._config.proxies.get("http")
61+
else:
62+
return None

clients/algoliasearch-client-python/algoliasearch/http/helpers.py

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# coding: utf-8
22

3-
from asyncio import sleep
3+
import asyncio
4+
import time
45
from typing import Callable, TypeVar
56

67
T = TypeVar("T")
@@ -45,7 +46,39 @@ async def retry(prev: T = None) -> T:
4546
raise Exception("An error occurred")
4647
raise Exception(error_message(resp))
4748

48-
await sleep(timeout())
49+
await asyncio.sleep(timeout())
4950
return await retry(resp)
5051

5152
return await retry()
53+
54+
55+
def create_iterable_sync(
56+
func: Callable[[T], T],
57+
validate: Callable[[T], bool],
58+
aggregator: Callable[[T], None],
59+
timeout: Timeout = Timeout(),
60+
error_validate: Callable[[T], bool] = None,
61+
error_message: Callable[[T], str] = None,
62+
) -> T:
63+
"""
64+
Helper: Iterates until the given `func` until `timeout` or `validate`.
65+
"""
66+
67+
def retry(prev: T = None) -> T:
68+
resp = func(prev)
69+
70+
if aggregator:
71+
aggregator(resp)
72+
73+
if validate(resp):
74+
return resp
75+
76+
if error_validate is not None and error_validate(resp):
77+
if error_message is None:
78+
raise Exception("An error occurred")
79+
raise Exception(error_message(resp))
80+
81+
time.sleep(timeout())
82+
return retry(resp)
83+
84+
return retry()

clients/algoliasearch-client-python/algoliasearch/http/transporter.py

Lines changed: 7 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,30 @@
11
from asyncio import TimeoutError
22
from json import loads
3-
from typing import Any, Dict, List
43

54
from aiohttp import ClientSession, TCPConnector
65
from async_timeout import timeout
76

87
from algoliasearch.http.api_response import ApiResponse
98
from algoliasearch.http.base_config import BaseConfig
9+
from algoliasearch.http.base_transporter import BaseTransporter
1010
from algoliasearch.http.exceptions import (
1111
AlgoliaUnreachableHostException,
1212
RequestException,
1313
)
14-
from algoliasearch.http.hosts import Host
1514
from algoliasearch.http.request_options import RequestOptions
1615
from algoliasearch.http.retry import RetryOutcome, RetryStrategy
1716
from algoliasearch.http.verb import Verb
1817

1918

20-
class Transporter:
21-
_config: BaseConfig
22-
_retry_strategy: RetryStrategy
19+
class Transporter(BaseTransporter):
2320
_session: ClientSession
24-
_hosts: List[Host]
2521

2622
def __init__(self, config: BaseConfig) -> None:
23+
self._session = None
2724
self._config = config
2825
self._retry_strategy = RetryStrategy()
29-
self._session = None
3026
self._hosts = []
3127

32-
async def __aenter__(self) -> None:
33-
return self
34-
35-
async def __aexit__(self, exc_type, exc_value, traceback) -> None:
36-
await self.close()
37-
38-
async def close(self) -> None:
39-
if self._session is not None:
40-
session = self._session
41-
self._session = None
42-
43-
await session.close()
44-
45-
def __enter__(self) -> None:
46-
return self
47-
48-
def __exit__(self, exc_type, exc_value, traceback) -> None:
49-
pass
50-
51-
def prepare(
52-
self,
53-
request_options: RequestOptions,
54-
use_read_transporter: bool,
55-
) -> Dict[str, Any]:
56-
query_parameters = dict(request_options.query_parameters)
57-
58-
if use_read_transporter:
59-
self._timeout = request_options.timeouts["read"]
60-
self._hosts = self._config.hosts.read()
61-
if isinstance(request_options.data, dict):
62-
query_parameters.update(request_options.data)
63-
return query_parameters
64-
65-
self._timeout = request_options.timeouts["write"]
66-
self._hosts = self._config.hosts.write()
67-
6828
async def request(
6929
self,
7030
verb: Verb,
@@ -81,29 +41,11 @@ async def request(
8141
request_options, verb == Verb.GET or use_read_transporter
8242
)
8343

84-
if query_parameters is not None and len(query_parameters) > 0:
85-
path = "{}?{}".format(
86-
path,
87-
"&".join(
88-
[
89-
"{}={}".format(key, value)
90-
for key, value in query_parameters.items()
91-
]
92-
),
93-
)
44+
path = self.build_path(path, query_parameters)
9445

9546
for host in self._retry_strategy.valid_hosts(self._hosts):
96-
url = "{}://{}{}".format(
97-
host.scheme,
98-
host.url + (":{}".format(host.port) if host.port else ""),
99-
path,
100-
)
101-
102-
proxy = None
103-
if url.startswith("https"):
104-
proxy = self._config.proxies.get("https")
105-
elif url.startswith("http"):
106-
proxy = self._config.proxies.get("http")
47+
url = self.build_url(host, path)
48+
proxy = self.get_proxy(url)
10749

10850
try:
10951
async with timeout(self._timeout / 1000):
@@ -178,5 +120,5 @@ async def request(
178120
query_parameters=request_options.query_parameters,
179121
headers=dict(request_options.headers),
180122
data=request_options.data,
181-
raw_data=request_options.data,
123+
raw_data=request_options.data, # type: ignore
182124
)
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
from json import loads
2+
from sys import version_info
3+
4+
from requests import Request, Session, Timeout
5+
6+
if version_info >= (3, 11):
7+
from typing import Self
8+
else:
9+
from typing_extensions import Self
10+
11+
from requests.adapters import HTTPAdapter
12+
from urllib3.util import Retry
13+
14+
from algoliasearch.http.api_response import ApiResponse
15+
from algoliasearch.http.base_config import BaseConfig
16+
from algoliasearch.http.base_transporter import BaseTransporter
17+
from algoliasearch.http.exceptions import (
18+
AlgoliaUnreachableHostException,
19+
RequestException,
20+
)
21+
from algoliasearch.http.request_options import RequestOptions
22+
from algoliasearch.http.retry import RetryOutcome, RetryStrategy
23+
from algoliasearch.http.verb import Verb
24+
25+
26+
class TransporterSync(BaseTransporter):
27+
_session: Session
28+
29+
def __init__(self, config: BaseConfig) -> None:
30+
self._session = None
31+
self._config = config
32+
self._retry_strategy = RetryStrategy()
33+
self._hosts = []
34+
35+
def __enter__(self) -> Self:
36+
return self
37+
38+
def __exit__(self, exc_type, exc_value, traceback) -> None:
39+
pass
40+
41+
def close(self) -> None:
42+
if self._session is not None:
43+
_session = self._session
44+
self._session = None
45+
46+
_session.close()
47+
48+
def request(
49+
self,
50+
verb: Verb,
51+
path: str,
52+
request_options: RequestOptions,
53+
use_read_transporter: bool,
54+
) -> ApiResponse:
55+
if self._session is None:
56+
self._session = Session()
57+
self._session.mount("https://", HTTPAdapter(max_retries=Retry(connect=0)))
58+
59+
query_parameters = self.prepare(
60+
request_options, verb == Verb.GET or use_read_transporter
61+
)
62+
63+
path = self.build_path(path, query_parameters)
64+
65+
for host in self._retry_strategy.valid_hosts(self._hosts):
66+
url = self.build_url(host, path)
67+
proxy = self.get_proxy(url)
68+
69+
req = Request(
70+
method=verb,
71+
url=url,
72+
headers=request_options.headers,
73+
data=request_options.data,
74+
).prepare()
75+
76+
try:
77+
resp = self._session.send(
78+
req,
79+
timeout=self._timeout / 1000,
80+
proxies=proxy,
81+
)
82+
83+
response = ApiResponse(
84+
verb=verb,
85+
path=path,
86+
url=url,
87+
host=host.url,
88+
status_code=resp.status_code,
89+
headers=resp.headers, # type: ignore -- insensitive dict is still a dict
90+
data=resp.text,
91+
raw_data=resp.text,
92+
error_message=str(resp.reason),
93+
)
94+
except Timeout as e:
95+
response = ApiResponse(
96+
verb=verb,
97+
path=path,
98+
url=url,
99+
host=host.url,
100+
error_message=str(e),
101+
is_timed_out_error=True,
102+
)
103+
104+
decision = self._retry_strategy.decide(host, response)
105+
106+
if decision == RetryOutcome.SUCCESS:
107+
return response
108+
elif decision == RetryOutcome.FAIL:
109+
content = response.error_message
110+
if response.data and "message" in response.data:
111+
content = loads(response.data)["message"]
112+
113+
raise RequestException(content, response.status_code)
114+
115+
raise AlgoliaUnreachableHostException("Unreachable hosts")
116+
117+
118+
class EchoTransporterSync(TransporterSync):
119+
def __init__(self, config: BaseConfig) -> None:
120+
self._config = config
121+
self._retry_strategy = RetryStrategy()
122+
123+
def request(
124+
self,
125+
verb: Verb,
126+
path: str,
127+
request_options: RequestOptions,
128+
use_read_transporter: bool,
129+
) -> ApiResponse:
130+
self.prepare(request_options, verb == Verb.GET or use_read_transporter)
131+
132+
return ApiResponse(
133+
verb=verb,
134+
path=path,
135+
status_code=200,
136+
host=self._retry_strategy.valid_hosts(self._hosts)[0].url,
137+
timeouts={
138+
"connect": request_options.timeouts["connect"],
139+
"response": self._timeout,
140+
},
141+
query_parameters=request_options.query_parameters,
142+
headers=dict(request_options.headers),
143+
data=request_options.data,
144+
raw_data=request_options.data, # type: ignore
145+
)

0 commit comments

Comments
 (0)