Skip to content

feat(python): add sync client #3609

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
dd5cf21
feat(python): add sync client
shortcuts Aug 28, 2024
6d1a5d0
chore: gen cts sync tests
shortcuts Aug 28, 2024
874e4f2
chore(javascript): add license to the root of the repo (#3610)
shortcuts Aug 28, 2024
cc9cd24
chore(javascript): add license to the root of the repo (#3610) (gener…
algolia-bot Aug 28, 2024
2ff422b
chore: some type improvements
shortcuts Aug 29, 2024
f6a3248
Merge branch 'main' into feat/python-sync
shortcuts Sep 5, 2024
d305a02
feat: async transporter
shortcuts Sep 5, 2024
dc0de70
Merge branch 'main' into feat/python-sync
shortcuts Sep 5, 2024
30fb81f
chore: generation file is wrong
shortcuts Sep 5, 2024
617ff38
chore: generation file is wrong
shortcuts Sep 5, 2024
2a92677
chore: sync and async modes
shortcuts Sep 5, 2024
b9f85eb
fix: template
shortcuts Sep 5, 2024
76390a4
fix: sync requester
shortcuts Sep 5, 2024
574cb23
chore: wouhou
shortcuts Sep 5, 2024
4e27ce1
Merge branch 'main' into feat/python-sync
shortcuts Sep 6, 2024
747b0bc
fix: test server
shortcuts Sep 6, 2024
f7fe3be
Merge branch 'main' into feat/python-sync
shortcuts Sep 6, 2024
25456a9
fix: e2e
shortcuts Sep 6, 2024
e83557e
chore: swifty wrongy
shortcuts Sep 6, 2024
2b8106d
Merge branch 'main' into feat/python-sync
shortcuts Sep 9, 2024
335212f
chore: more explicit templates
shortcuts Sep 9, 2024
626a235
chore: revert generated files
shortcuts Sep 9, 2024
5b55d8e
chore: revert generated files
shortcuts Sep 9, 2024
85177d4
Merge branch 'main' into feat/python-sync
shortcuts Sep 9, 2024
87a31b2
chore: lock
shortcuts Sep 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from typing import Any, Dict, List

from algoliasearch.http.base_config import BaseConfig
from algoliasearch.http.hosts import Host
from algoliasearch.http.request_options import RequestOptions
from algoliasearch.http.retry import RetryStrategy


class BaseTransporter:
_config: BaseConfig
_retry_strategy: RetryStrategy
_hosts: List[Host]

def __init__(self, config: BaseConfig) -> None:
self._config = config
self._retry_strategy = RetryStrategy()
self._hosts = []

def prepare(
self,
request_options: RequestOptions,
use_read_transporter: bool,
) -> Dict[str, Any]:
query_parameters = dict(request_options.query_parameters)

if use_read_transporter:
self._timeout = request_options.timeouts["read"]
self._hosts = self._config.hosts.read()
if isinstance(request_options.data, dict):
query_parameters.update(request_options.data)
return query_parameters

self._timeout = request_options.timeouts["write"]
self._hosts = self._config.hosts.write()

def build_path(self, path, query_parameters):
if query_parameters is not None and len(query_parameters) > 0:
return "{}?{}".format(
path,
"&".join(
[
"{}={}".format(key, value)
for key, value in query_parameters.items()
]
),
)
return path

def build_url(self, host, path):
return "{}://{}{}".format(
host.scheme,
host.url + (":{}".format(host.port) if host.port else ""),
path,
)

def get_proxy(self, url):
if url.startswith("https"):
return self._config.proxies.get("https")
elif url.startswith("http"):
return self._config.proxies.get("http")
else:
return None
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# coding: utf-8

from asyncio import sleep
import asyncio
import time
from typing import Callable, TypeVar

T = TypeVar("T")
Expand Down Expand Up @@ -45,7 +46,39 @@ async def retry(prev: T = None) -> T:
raise Exception("An error occurred")
raise Exception(error_message(resp))

await sleep(timeout())
await asyncio.sleep(timeout())
return await retry(resp)

return await retry()


def create_iterable_sync(
func: Callable[[T], T],
validate: Callable[[T], bool],
aggregator: Callable[[T], None],
timeout: Timeout = Timeout(),
error_validate: Callable[[T], bool] = None,
error_message: Callable[[T], str] = None,
) -> T:
"""
Helper: Iterates until the given `func` until `timeout` or `validate`.
"""

def retry(prev: T = None) -> T:
resp = func(prev)

if aggregator:
aggregator(resp)

if validate(resp):
return resp

if error_validate is not None and error_validate(resp):
if error_message is None:
raise Exception("An error occurred")
raise Exception(error_message(resp))

time.sleep(timeout())
return retry(resp)

return retry()
Original file line number Diff line number Diff line change
@@ -1,70 +1,30 @@
from asyncio import TimeoutError
from json import loads
from typing import Any, Dict, List

from aiohttp import ClientSession, TCPConnector
from async_timeout import timeout

from algoliasearch.http.api_response import ApiResponse
from algoliasearch.http.base_config import BaseConfig
from algoliasearch.http.base_transporter import BaseTransporter
from algoliasearch.http.exceptions import (
AlgoliaUnreachableHostException,
RequestException,
)
from algoliasearch.http.hosts import Host
from algoliasearch.http.request_options import RequestOptions
from algoliasearch.http.retry import RetryOutcome, RetryStrategy
from algoliasearch.http.verb import Verb


class Transporter:
_config: BaseConfig
_retry_strategy: RetryStrategy
class Transporter(BaseTransporter):
_session: ClientSession
_hosts: List[Host]

def __init__(self, config: BaseConfig) -> None:
self._session = None
self._config = config
self._retry_strategy = RetryStrategy()
self._session = None
self._hosts = []

async def __aenter__(self) -> None:
return self

async def __aexit__(self, exc_type, exc_value, traceback) -> None:
await self.close()

async def close(self) -> None:
if self._session is not None:
session = self._session
self._session = None

await session.close()

def __enter__(self) -> None:
return self

def __exit__(self, exc_type, exc_value, traceback) -> None:
pass

def prepare(
self,
request_options: RequestOptions,
use_read_transporter: bool,
) -> Dict[str, Any]:
query_parameters = dict(request_options.query_parameters)

if use_read_transporter:
self._timeout = request_options.timeouts["read"]
self._hosts = self._config.hosts.read()
if isinstance(request_options.data, dict):
query_parameters.update(request_options.data)
return query_parameters

self._timeout = request_options.timeouts["write"]
self._hosts = self._config.hosts.write()

async def request(
self,
verb: Verb,
Expand All @@ -81,29 +41,11 @@ async def request(
request_options, verb == Verb.GET or use_read_transporter
)

if query_parameters is not None and len(query_parameters) > 0:
path = "{}?{}".format(
path,
"&".join(
[
"{}={}".format(key, value)
for key, value in query_parameters.items()
]
),
)
path = self.build_path(path, query_parameters)

for host in self._retry_strategy.valid_hosts(self._hosts):
url = "{}://{}{}".format(
host.scheme,
host.url + (":{}".format(host.port) if host.port else ""),
path,
)

proxy = None
if url.startswith("https"):
proxy = self._config.proxies.get("https")
elif url.startswith("http"):
proxy = self._config.proxies.get("http")
url = self.build_url(host, path)
proxy = self.get_proxy(url)

try:
async with timeout(self._timeout / 1000):
Expand Down Expand Up @@ -178,5 +120,5 @@ async def request(
query_parameters=request_options.query_parameters,
headers=dict(request_options.headers),
data=request_options.data,
raw_data=request_options.data,
raw_data=request_options.data, # type: ignore
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
from json import loads
from sys import version_info

from requests import Request, Session, Timeout

if version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self

from requests.adapters import HTTPAdapter
from urllib3.util import Retry

from algoliasearch.http.api_response import ApiResponse
from algoliasearch.http.base_config import BaseConfig
from algoliasearch.http.base_transporter import BaseTransporter
from algoliasearch.http.exceptions import (
AlgoliaUnreachableHostException,
RequestException,
)
from algoliasearch.http.request_options import RequestOptions
from algoliasearch.http.retry import RetryOutcome, RetryStrategy
from algoliasearch.http.verb import Verb


class TransporterSync(BaseTransporter):
_session: Session

def __init__(self, config: BaseConfig) -> None:
self._session = None
self._config = config
self._retry_strategy = RetryStrategy()
self._hosts = []

def __enter__(self) -> Self:
return self

def __exit__(self, exc_type, exc_value, traceback) -> None:
pass

def close(self) -> None:
if self._session is not None:
_session = self._session
self._session = None

_session.close()

def request(
self,
verb: Verb,
path: str,
request_options: RequestOptions,
use_read_transporter: bool,
) -> ApiResponse:
if self._session is None:
self._session = Session()
self._session.mount("https://", HTTPAdapter(max_retries=Retry(connect=0)))

query_parameters = self.prepare(
request_options, verb == Verb.GET or use_read_transporter
)

path = self.build_path(path, query_parameters)

for host in self._retry_strategy.valid_hosts(self._hosts):
url = self.build_url(host, path)
proxy = self.get_proxy(url)

req = Request(
method=verb,
url=url,
headers=request_options.headers,
data=request_options.data,
).prepare()

try:
resp = self._session.send(
req,
timeout=self._timeout / 1000,
proxies=proxy,
)

response = ApiResponse(
verb=verb,
path=path,
url=url,
host=host.url,
status_code=resp.status_code,
headers=resp.headers, # type: ignore -- insensitive dict is still a dict
data=resp.text,
raw_data=resp.text,
error_message=str(resp.reason),
)
except Timeout as e:
response = ApiResponse(
verb=verb,
path=path,
url=url,
host=host.url,
error_message=str(e),
is_timed_out_error=True,
)

decision = self._retry_strategy.decide(host, response)

if decision == RetryOutcome.SUCCESS:
return response
elif decision == RetryOutcome.FAIL:
content = response.error_message
if response.data and "message" in response.data:
content = loads(response.data)["message"]

raise RequestException(content, response.status_code)

raise AlgoliaUnreachableHostException("Unreachable hosts")


class EchoTransporterSync(TransporterSync):
def __init__(self, config: BaseConfig) -> None:
self._config = config
self._retry_strategy = RetryStrategy()

def request(
self,
verb: Verb,
path: str,
request_options: RequestOptions,
use_read_transporter: bool,
) -> ApiResponse:
self.prepare(request_options, verb == Verb.GET or use_read_transporter)

return ApiResponse(
verb=verb,
path=path,
status_code=200,
host=self._retry_strategy.valid_hosts(self._hosts)[0].url,
timeouts={
"connect": request_options.timeouts["connect"],
"response": self._timeout,
},
query_parameters=request_options.query_parameters,
headers=dict(request_options.headers),
data=request_options.data,
raw_data=request_options.data, # type: ignore
)
Loading
Loading