Skip to content

Commit b0fccb6

Browse files
authored
Added support for asynchronous usage via ApifyClientAsync (#82)
The Python SDK will use async methods by default, and for it to be effective, we need to support async usage in the Python Client as well. This PR adds `ApifyClientAsync`, a sibling to `ApifyClient` with all methods being `async`. The usage and structure is the same as in `ApifyClient`, but unfortunately, a lot of the code had to be mostly duplicated, because we had to add an `await` here and there, and we had to adjust the typings a bit. At least the docs don't have to be duplicated, I made a helper decorator for that. The downside is that now, if we add features to the client, we will have to add it to both the sync and the async version. Hopefully in the future we will remove the sync client and have only the async one.
1 parent 25338b7 commit b0fccb6

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+6086
-1172
lines changed

.flake8

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ ignore =
1919
D408
2020
D409
2121
D413
22+
ignore-decorators = _make_async_docs
2223
per-file-ignores =
2324
docs/*: D
2425
scripts/*: D

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ Changelog
1212

1313
### Added
1414

15+
- added support for asynchronous usage via `ApifyClientAsync`
1516
- added configurable socket timeout for requests to the Apify API
1617
- added `py.typed` file to signal type checkers that this package is typed
1718
- added method to update status message for a run

docs/docs.md

Lines changed: 3708 additions & 925 deletions
Large diffs are not rendered by default.

docs/res/format_docs.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,14 @@
5151
if match is not None:
5252
current_class = match.group(1)
5353

54-
match = re.match(r'#### (\w+)\([^: ]', line)
54+
match = re.match(r'#### (async )?(\w+)\([^: ]', line)
5555
if match is not None:
56-
method = match.group(1)
57-
method = re.sub('_', '\\_', method)
56+
is_async = match.group(1) is not None
57+
method_name = match.group(2)
58+
method_name = re.sub('_', '\\_', method_name)
5859
if current_class not in toc_methods:
5960
toc_methods[current_class] = []
60-
toc_methods[current_class].append(method)
61+
toc_methods[current_class].append((method_name, is_async))
6162

6263
match = re.match(r'#### (\w+)\( =', line)
6364
if match is not None:
@@ -137,8 +138,9 @@
137138
transformed_lines.append('')
138139

139140
if current_class in toc_methods:
140-
for method in toc_methods[current_class]:
141-
transformed_lines.append(f'* [{method}()](#{current_class.lower()}-{method.lower()})')
141+
for (method_name, is_async) in toc_methods[current_class]:
142+
async_prefix = 'async ' if is_async else ''
143+
transformed_lines.append(f'* [{async_prefix}{method_name}()](#{current_class.lower()}-{method_name.lower()})')
142144
transformed_lines.append('')
143145

144146
if current_class in toc_enum_items:
@@ -168,11 +170,16 @@
168170
line = re.sub(r'### class', f'### [](#{current_class.lower()})', line)
169171

170172
# Add special fragment link marker to each function header (will get used in Apify docs to display "Copy link" link)
171-
match = re.match(r'#### (\w+)\([^: ]', line)
173+
match = re.match(r'#### (async )?(\w+)\([^: ]', line)
172174
if match is not None:
173-
method = match.group(1)
175+
is_async = match.group(1) is not None
176+
method_name = match.group(2)
174177
line = re.sub(r'(#### .*)\\\*(.*)', r'\1*\2', line)
175-
line = re.sub(r'#### (\w+)(\([^)]*\))', f'#### [](#{current_class.lower()}-{method.lower()}) `{current_class}.\\1\\2`', line)
178+
line = re.sub(
179+
r'#### (async )?(\w+)(\([^)]*\))',
180+
f'#### [](#{current_class.lower()}-{method_name.lower()}) `\\1{current_class}.\\2\\3`',
181+
line,
182+
)
176183

177184
# Add special fragment link marker to each enum item header (will get used in Apify docs to display "Copy link" link)
178185
match = re.match(r'#### (\w+)\( =', line)

docs/res/intro.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ By default, it will retry up to 8 times. First retry will be attempted after ~50
5454
and so on. You can configure those parameters using the `max_retries` and `min_delay_between_retries_millis`
5555
options of the `ApifyClient` constructor.
5656

57+
### Support for asynchronous usage
58+
59+
Starting with version 0.7.0, the package offers an asynchronous version of the client, [`ApifyClientAsync`](#ApifyClientAsync),
60+
which allows you to work with the Apify API in an asynchronous way, using the standard `async`/`await` syntax.
61+
5762
### Convenience functions and options
5863

5964
Some actions can't be performed by the API itself, such as indefinite waiting for an actor run to finish
@@ -158,3 +163,25 @@ with apify_client.run('MY-RUN-ID').log().stream() as log_stream:
158163
for line in log_stream.iter_lines():
159164
print(line, end='')
160165
```
166+
167+
### Asynchronous usage
168+
169+
To use the asynchronous [`ApifyClientAsync`](#ApifyClientAsync) in your async code,
170+
you can use the standard `async`/`await` syntax [offered by Python](https://docs.python.org/3/library/asyncio-task.html).
171+
172+
For example, to run an actor and asynchronously stream its log while it's running, you can use this snippet:
173+
174+
```python
175+
from apify_client import ApifyClientAsync
176+
apify_client_async = ApifyClientAsync('MY-APIFY-TOKEN')
177+
178+
async def main():
179+
run = await apify_client_async.actor('my-actor').start()
180+
181+
async with apify_client_async.run(run['id']).log().stream() as async_log_stream:
182+
if async_log_stream:
183+
async for line in async_log_stream.aiter_lines():
184+
print(line, end='')
185+
186+
asyncio.run(main())
187+
```

docs/res/sphinx-config/index.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22
.. autoclass:: ApifyClient
33
:special-members: __init__
44
:members:
5+
.. autoclass:: ApifyClientAsync
6+
:special-members: __init__
7+
:members:
58
.. automodule:: apify_client.clients.resource_clients
69
:members:
710
.. autoclass:: apify_client._utils.ListPage

src/apify_client/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
from ._version import __version__
2-
from .client import ApifyClient
2+
from .client import ApifyClient, ApifyClientAsync
33

4-
__all__ = ['ApifyClient', '__version__']
4+
__all__ = ['ApifyClient', 'ApifyClientAsync', '__version__']

src/apify_client/_http_client.py

Lines changed: 129 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,20 @@
33
import os
44
import sys
55
from http import HTTPStatus
6-
from typing import Any, Callable, Dict, Optional
6+
from typing import Any, Callable, Dict, Optional, Tuple
77

88
import httpx
99

1010
from ._errors import ApifyApiError, InvalidResponseBodyError, _is_retryable_error
1111
from ._types import JSONSerializable
12-
from ._utils import _is_content_type_json, _is_content_type_text, _is_content_type_xml, _retry_with_exp_backoff
12+
from ._utils import _is_content_type_json, _is_content_type_text, _is_content_type_xml, _retry_with_exp_backoff, _retry_with_exp_backoff_async
1313
from ._version import __version__
1414

1515
DEFAULT_BACKOFF_EXPONENTIAL_FACTOR = 2
1616
DEFAULT_BACKOFF_RANDOM_FACTOR = 1
1717

1818

19-
class _HTTPClient:
19+
class _BaseHTTPClient:
2020
def __init__(
2121
self,
2222
*,
@@ -41,28 +41,52 @@ def __init__(
4141
headers['Authorization'] = f'Bearer {token}'
4242

4343
self.httpx_client = httpx.Client(headers=headers, follow_redirects=True, timeout=timeout_secs)
44+
self.httpx_async_client = httpx.AsyncClient(headers=headers, follow_redirects=True, timeout=timeout_secs)
4445

45-
def call(
46+
@staticmethod
47+
def _maybe_parse_response(response: httpx.Response) -> Any:
48+
if response.status_code == HTTPStatus.NO_CONTENT:
49+
return None
50+
51+
content_type = ''
52+
if 'content-type' in response.headers:
53+
content_type = response.headers['content-type'].split(';')[0].strip()
54+
55+
try:
56+
if _is_content_type_json(content_type):
57+
return response.json()
58+
elif _is_content_type_xml(content_type) or _is_content_type_text(content_type):
59+
return response.text
60+
else:
61+
return response.content
62+
except ValueError as err:
63+
raise InvalidResponseBodyError(response) from err
64+
65+
@staticmethod
66+
def _parse_params(params: Optional[Dict]) -> Optional[Dict]:
67+
if params is None:
68+
return None
69+
70+
parsed_params = {}
71+
for key, value in params.items():
72+
# Our API needs to have boolean parameters passed as 0 or 1, therefore we have to replace them
73+
if isinstance(value, bool):
74+
parsed_params[key] = int(value)
75+
elif value is not None:
76+
parsed_params[key] = value
77+
78+
return parsed_params
79+
80+
def _prepare_request_call(
4681
self,
47-
*,
48-
method: str,
49-
url: str,
5082
headers: Optional[Dict] = None,
5183
params: Optional[Dict] = None,
5284
data: Optional[Any] = None,
5385
json: Optional[JSONSerializable] = None,
54-
stream: Optional[bool] = None,
55-
parse_response: Optional[bool] = True,
56-
) -> httpx.Response:
57-
if stream and parse_response:
58-
raise ValueError('Cannot stream response and parse it at the same time!')
59-
86+
) -> Tuple[Dict, Optional[Dict], Any]:
6087
if json and data:
6188
raise ValueError('Cannot pass both "json" and "data" parameters at the same time!')
6289

63-
request_params = self._parse_params(params)
64-
httpx_client = self.httpx_client
65-
6690
if not headers:
6791
headers = {}
6892

@@ -77,23 +101,48 @@ def call(
77101
data = gzip.compress(data)
78102
headers['Content-Encoding'] = 'gzip'
79103

80-
# httpx uses `content` instead of `data` for binary content, let's rename it here to be clear about it
81-
content = data
104+
return (
105+
headers,
106+
self._parse_params(params),
107+
data,
108+
)
109+
110+
111+
class _HTTPClient(_BaseHTTPClient):
112+
def call(
113+
self,
114+
*,
115+
method: str,
116+
url: str,
117+
headers: Optional[Dict] = None,
118+
params: Optional[Dict] = None,
119+
data: Optional[Any] = None,
120+
json: Optional[JSONSerializable] = None,
121+
stream: Optional[bool] = None,
122+
parse_response: Optional[bool] = True,
123+
) -> httpx.Response:
124+
if stream and parse_response:
125+
raise ValueError('Cannot stream response and parse it at the same time!')
126+
127+
headers, params, content = self._prepare_request_call(headers, params, data, json)
128+
129+
httpx_client = self.httpx_client
82130

83131
def _make_request(stop_retrying: Callable, attempt: int) -> httpx.Response:
84132
try:
85133
request = httpx_client.build_request(
86134
method=method,
87135
url=url,
88136
headers=headers,
89-
params=request_params,
137+
params=params,
90138
content=content,
91139
)
92140
response = httpx_client.send(
93141
request=request,
94142
stream=stream or False,
95143
)
96144

145+
# If response status is < 300, the request was successful, and we can return the result
97146
if response.status_code < 300:
98147
if not stream:
99148
if parse_response:
@@ -109,6 +158,8 @@ def _make_request(stop_retrying: Callable, attempt: int) -> httpx.Response:
109158
stop_retrying()
110159
raise e
111160

161+
# We want to retry only requests which are server errors (status >= 500) and could resolve on their own,
162+
# and also retry rate limited requests that throw 429 Too Many Requests errors
112163
if response.status_code < 500 and response.status_code != HTTPStatus.TOO_MANY_REQUESTS:
113164
stop_retrying()
114165
raise ApifyApiError(response, attempt)
@@ -121,36 +172,67 @@ def _make_request(stop_retrying: Callable, attempt: int) -> httpx.Response:
121172
random_factor=DEFAULT_BACKOFF_RANDOM_FACTOR,
122173
)
123174

124-
@staticmethod
125-
def _maybe_parse_response(response: httpx.Response) -> Any:
126-
if response.status_code == HTTPStatus.NO_CONTENT:
127-
return None
128175

129-
content_type = ''
130-
if 'content-type' in response.headers:
131-
content_type = response.headers['content-type'].split(';')[0].strip()
176+
class _HTTPClientAsync(_BaseHTTPClient):
177+
async def call(
178+
self,
179+
*,
180+
method: str,
181+
url: str,
182+
headers: Optional[Dict] = None,
183+
params: Optional[Dict] = None,
184+
data: Optional[Any] = None,
185+
json: Optional[JSONSerializable] = None,
186+
stream: Optional[bool] = None,
187+
parse_response: Optional[bool] = True,
188+
) -> httpx.Response:
189+
if stream and parse_response:
190+
raise ValueError('Cannot stream response and parse it at the same time!')
132191

133-
try:
134-
if _is_content_type_json(content_type):
135-
return response.json()
136-
elif _is_content_type_xml(content_type) or _is_content_type_text(content_type):
137-
return response.text
138-
else:
139-
return response.content
140-
except ValueError as err:
141-
raise InvalidResponseBodyError(response) from err
192+
headers, params, content = self._prepare_request_call(headers, params, data, json)
142193

143-
@staticmethod
144-
def _parse_params(params: Optional[Dict]) -> Optional[Dict]:
145-
if params is None:
146-
return None
194+
httpx_async_client = self.httpx_async_client
147195

148-
parsed_params = {}
149-
for key, value in params.items():
150-
# Our API needs to have boolean parameters passed as 0 or 1, therefore we have to replace them
151-
if isinstance(value, bool):
152-
parsed_params[key] = int(value)
153-
elif value is not None:
154-
parsed_params[key] = value
196+
async def _make_request(stop_retrying: Callable, attempt: int) -> httpx.Response:
197+
try:
198+
request = httpx_async_client.build_request(
199+
method=method,
200+
url=url,
201+
headers=headers,
202+
params=params,
203+
content=content,
204+
)
205+
response = await httpx_async_client.send(
206+
request=request,
207+
stream=stream or False,
208+
)
155209

156-
return parsed_params
210+
# If response status is < 300, the request was successful, and we can return the result
211+
if response.status_code < 300:
212+
if not stream:
213+
if parse_response:
214+
_maybe_parsed_body = self._maybe_parse_response(response)
215+
else:
216+
_maybe_parsed_body = response.content
217+
setattr(response, '_maybe_parsed_body', _maybe_parsed_body) # noqa: B010
218+
219+
return response
220+
221+
except Exception as e:
222+
if not _is_retryable_error(e):
223+
stop_retrying()
224+
raise e
225+
226+
# We want to retry only requests which are server errors (status >= 500) and could resolve on their own,
227+
# and also retry rate limited requests that throw 429 Too Many Requests errors
228+
if response.status_code < 500 and response.status_code != HTTPStatus.TOO_MANY_REQUESTS:
229+
stop_retrying()
230+
raise ApifyApiError(response, attempt)
231+
232+
return await _retry_with_exp_backoff_async(
233+
_make_request,
234+
max_retries=self.max_retries,
235+
backoff_base_millis=self.min_delay_between_retries_millis,
236+
backoff_factor=DEFAULT_BACKOFF_EXPONENTIAL_FACTOR,
237+
random_factor=DEFAULT_BACKOFF_RANDOM_FACTOR,
238+
)

0 commit comments

Comments
 (0)