Skip to content

Commit e2d5e63

Browse files
feat: Enable webauthn plugin for security keys (#1528)
* feat: Enable webauthn handling when plugin is installed. * Minor code cleanup. * feat: Enable webauthn plugin for security keys Move key press prompt and remove TODO question. * feat: Enable webauthn plugin for security keys Fix lint and mypy errors. * feat: Enable webauthn plugin for security keys Check dict accesses for None. Remove commented out line. * feat: Enable webauthn plugin for security keys Change _urlsafe_b64recode to _unpadded_urlsafe_b64recode for clarity. * feat: Enable webauthn plugin for security keys Fix broken test and add test clauses to bring coverage to 100%. --------- Co-authored-by: arithmetic1728 <[email protected]>
1 parent adb94f7 commit e2d5e63

File tree

4 files changed

+276
-0
lines changed

4 files changed

+276
-0
lines changed

google/oauth2/challenges.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,19 @@
2222

2323
from google.auth import _helpers
2424
from google.auth import exceptions
25+
from google.oauth2 import webauthn_handler_factory
26+
from google.oauth2.webauthn_types import (
27+
AuthenticationExtensionsClientInputs,
28+
GetRequest,
29+
PublicKeyCredentialDescriptor,
30+
)
2531

2632

2733
REAUTH_ORIGIN = "https://accounts.google.com"
2834
SAML_CHALLENGE_MESSAGE = (
2935
"Please run `gcloud auth login` to complete reauthentication with SAML."
3036
)
37+
WEBAUTHN_TIMEOUT_MS = 120000 # Two minute timeout
3138

3239

3340
def get_user_password(text):
@@ -110,6 +117,17 @@ def is_locally_eligible(self):
110117

111118
@_helpers.copy_docstring(ReauthChallenge)
112119
def obtain_challenge_input(self, metadata):
120+
# Check if there is an available Webauthn Handler, if not use pyu2f
121+
try:
122+
factory = webauthn_handler_factory.WebauthnHandlerFactory()
123+
webauthn_handler = factory.get_handler()
124+
if webauthn_handler is not None:
125+
sys.stderr.write("Please insert and touch your security key\n")
126+
return self._obtain_challenge_input_webauthn(metadata, webauthn_handler)
127+
except Exception:
128+
# Attempt pyu2f if exception in webauthn flow
129+
pass
130+
113131
try:
114132
import pyu2f.convenience.authenticator # type: ignore
115133
import pyu2f.errors # type: ignore
@@ -173,6 +191,66 @@ def obtain_challenge_input(self, metadata):
173191
sys.stderr.write("No security key found.\n")
174192
return None
175193

194+
def _obtain_challenge_input_webauthn(self, metadata, webauthn_handler):
195+
sk = metadata.get("securityKey")
196+
if sk is None:
197+
raise exceptions.InvalidValue("securityKey is None")
198+
challenges = sk.get("challenges")
199+
application_id = sk.get("applicationId")
200+
relying_party_id = sk.get("relyingPartyId")
201+
if challenges is None or len(challenges) < 1:
202+
raise exceptions.InvalidValue("challenges is None or empty")
203+
if application_id is None:
204+
raise exceptions.InvalidValue("application_id is None")
205+
if relying_party_id is None:
206+
raise exceptions.InvalidValue("relying_party_id is None")
207+
208+
allow_credentials = []
209+
for challenge in challenges:
210+
kh = challenge.get("keyHandle")
211+
if kh is None:
212+
raise exceptions.InvalidValue("keyHandle is None")
213+
key_handle = self._unpadded_urlsafe_b64recode(kh)
214+
allow_credentials.append(PublicKeyCredentialDescriptor(id=key_handle))
215+
216+
extension = AuthenticationExtensionsClientInputs(appid=application_id)
217+
218+
challenge = challenges[0].get("challenge")
219+
if challenge is None:
220+
raise exceptions.InvalidValue("challenge is None")
221+
222+
get_request = GetRequest(
223+
origin=REAUTH_ORIGIN,
224+
rpid=relying_party_id,
225+
challenge=self._unpadded_urlsafe_b64recode(challenge),
226+
timeout_ms=WEBAUTHN_TIMEOUT_MS,
227+
allow_credentials=allow_credentials,
228+
user_verification="required",
229+
extensions=extension,
230+
)
231+
232+
try:
233+
get_response = webauthn_handler.get(get_request)
234+
except Exception as e:
235+
sys.stderr.write("Webauthn Error: {}.\n".format(e))
236+
raise e
237+
238+
response = {
239+
"clientData": get_response.response.client_data_json,
240+
"authenticatorData": get_response.response.authenticator_data,
241+
"signatureData": get_response.response.signature,
242+
"applicationId": application_id,
243+
"keyHandle": get_response.id,
244+
"securityKeyReplyType": 2,
245+
}
246+
return {"securityKey": response}
247+
248+
def _unpadded_urlsafe_b64recode(self, s):
249+
"""Converts standard b64 encoded string to url safe b64 encoded string
250+
with no padding."""
251+
b = base64.urlsafe_b64decode(s)
252+
return base64.urlsafe_b64encode(b).decode().rstrip("=")
253+
176254

177255
class SamlChallenge(ReauthChallenge):
178256
"""Challenge that asks the users to browse to their ID Providers.
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
from typing import List, Optional
2+
3+
from google.oauth2.webauthn_handler import PluginHandler, WebAuthnHandler
4+
5+
6+
class WebauthnHandlerFactory:
7+
handlers: List[WebAuthnHandler]
8+
9+
def __init__(self):
10+
self.handlers = [PluginHandler()]
11+
12+
def get_handler(self) -> Optional[WebAuthnHandler]:
13+
for handler in self.handlers:
14+
if handler.is_available():
15+
return handler
16+
return None

tests/oauth2/test_challenges.py

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
"""Tests for the reauth module."""
1616

1717
import base64
18+
import os
1819
import sys
1920

2021
import mock
@@ -23,6 +24,13 @@
2324

2425
from google.auth import exceptions
2526
from google.oauth2 import challenges
27+
from google.oauth2.webauthn_types import (
28+
AuthenticationExtensionsClientInputs,
29+
AuthenticatorAssertionResponse,
30+
GetRequest,
31+
GetResponse,
32+
PublicKeyCredentialDescriptor,
33+
)
2634

2735

2836
def test_get_user_password():
@@ -54,6 +62,8 @@ def test_security_key():
5462

5563
# Test the case that security key challenge is passed with applicationId and
5664
# relyingPartyId the same.
65+
os.environ.pop('"GOOGLE_AUTH_WEBAUTHN_PLUGIN"', None)
66+
5767
with mock.patch("pyu2f.model.RegisteredKey", return_value=mock_key):
5868
with mock.patch(
5969
"pyu2f.convenience.authenticator.CompositeAuthenticator.Authenticate"
@@ -70,6 +80,19 @@ def test_security_key():
7080
print_callback=sys.stderr.write,
7181
)
7282

83+
# Test the case that webauthn plugin is available
84+
os.environ["GOOGLE_AUTH_WEBAUTHN_PLUGIN"] = "plugin"
85+
86+
with mock.patch(
87+
"google.oauth2.challenges.SecurityKeyChallenge._obtain_challenge_input_webauthn",
88+
return_value={"securityKey": "security key response"},
89+
):
90+
91+
assert challenge.obtain_challenge_input(metadata) == {
92+
"securityKey": "security key response"
93+
}
94+
os.environ.pop('"GOOGLE_AUTH_WEBAUTHN_PLUGIN"', None)
95+
7396
# Test the case that security key challenge is passed with applicationId and
7497
# relyingPartyId different, first call works.
7598
metadata["securityKey"]["relyingPartyId"] = "security_key_relying_party_id"
@@ -173,6 +196,136 @@ def test_security_key():
173196
assert excinfo.match(r"pyu2f dependency is required")
174197

175198

199+
def test_security_key_webauthn():
200+
metadata = {
201+
"status": "READY",
202+
"challengeId": 2,
203+
"challengeType": "SECURITY_KEY",
204+
"securityKey": {
205+
"applicationId": "security_key_application_id",
206+
"challenges": [
207+
{
208+
"keyHandle": "some_key",
209+
"challenge": base64.urlsafe_b64encode(
210+
"some_challenge".encode("ascii")
211+
).decode("ascii"),
212+
}
213+
],
214+
"relyingPartyId": "security_key_application_id",
215+
},
216+
}
217+
218+
challenge = challenges.SecurityKeyChallenge()
219+
220+
sk = metadata["securityKey"]
221+
sk_challenges = sk["challenges"]
222+
223+
application_id = sk["applicationId"]
224+
225+
allow_credentials = []
226+
for sk_challenge in sk_challenges:
227+
allow_credentials.append(
228+
PublicKeyCredentialDescriptor(id=sk_challenge["keyHandle"])
229+
)
230+
231+
extension = AuthenticationExtensionsClientInputs(appid=application_id)
232+
233+
get_request = GetRequest(
234+
origin=challenges.REAUTH_ORIGIN,
235+
rpid=application_id,
236+
challenge=challenge._unpadded_urlsafe_b64recode(sk_challenge["challenge"]),
237+
timeout_ms=challenges.WEBAUTHN_TIMEOUT_MS,
238+
allow_credentials=allow_credentials,
239+
user_verification="required",
240+
extensions=extension,
241+
)
242+
243+
assertion_resp = AuthenticatorAssertionResponse(
244+
client_data_json="clientDataJSON",
245+
authenticator_data="authenticatorData",
246+
signature="signature",
247+
user_handle="userHandle",
248+
)
249+
get_response = GetResponse(
250+
id="id",
251+
response=assertion_resp,
252+
authenticator_attachment="authenticatorAttachment",
253+
client_extension_results="clientExtensionResults",
254+
)
255+
response = {
256+
"clientData": get_response.response.client_data_json,
257+
"authenticatorData": get_response.response.authenticator_data,
258+
"signatureData": get_response.response.signature,
259+
"applicationId": "security_key_application_id",
260+
"keyHandle": get_response.id,
261+
"securityKeyReplyType": 2,
262+
}
263+
264+
mock_handler = mock.Mock()
265+
mock_handler.get.return_value = get_response
266+
267+
# Test success case
268+
assert challenge._obtain_challenge_input_webauthn(metadata, mock_handler) == {
269+
"securityKey": response
270+
}
271+
mock_handler.get.assert_called_with(get_request)
272+
273+
# Test exceptions
274+
275+
# Missing Values
276+
sk = metadata["securityKey"]
277+
metadata["securityKey"] = None
278+
with pytest.raises(exceptions.InvalidValue):
279+
challenge._obtain_challenge_input_webauthn(metadata, mock_handler)
280+
metadata["securityKey"] = sk
281+
282+
c = metadata["securityKey"]["challenges"]
283+
metadata["securityKey"]["challenges"] = None
284+
with pytest.raises(exceptions.InvalidValue):
285+
challenge._obtain_challenge_input_webauthn(metadata, mock_handler)
286+
metadata["securityKey"]["challenges"] = []
287+
with pytest.raises(exceptions.InvalidValue):
288+
challenge._obtain_challenge_input_webauthn(metadata, mock_handler)
289+
metadata["securityKey"]["challenges"] = c
290+
291+
aid = metadata["securityKey"]["applicationId"]
292+
metadata["securityKey"]["applicationId"] = None
293+
with pytest.raises(exceptions.InvalidValue):
294+
challenge._obtain_challenge_input_webauthn(metadata, mock_handler)
295+
metadata["securityKey"]["applicationId"] = aid
296+
297+
rpi = metadata["securityKey"]["relyingPartyId"]
298+
metadata["securityKey"]["relyingPartyId"] = None
299+
with pytest.raises(exceptions.InvalidValue):
300+
challenge._obtain_challenge_input_webauthn(metadata, mock_handler)
301+
metadata["securityKey"]["relyingPartyId"] = rpi
302+
303+
kh = metadata["securityKey"]["challenges"][0]["keyHandle"]
304+
metadata["securityKey"]["challenges"][0]["keyHandle"] = None
305+
with pytest.raises(exceptions.InvalidValue):
306+
challenge._obtain_challenge_input_webauthn(metadata, mock_handler)
307+
metadata["securityKey"]["challenges"][0]["keyHandle"] = kh
308+
309+
ch = metadata["securityKey"]["challenges"][0]["challenge"]
310+
metadata["securityKey"]["challenges"][0]["challenge"] = None
311+
with pytest.raises(exceptions.InvalidValue):
312+
challenge._obtain_challenge_input_webauthn(metadata, mock_handler)
313+
metadata["securityKey"]["challenges"][0]["challenge"] = ch
314+
315+
# Handler Exceptions
316+
mock_handler.get.side_effect = exceptions.MalformedError
317+
with pytest.raises(exceptions.MalformedError):
318+
challenge._obtain_challenge_input_webauthn(metadata, mock_handler)
319+
320+
mock_handler.get.side_effect = exceptions.InvalidResource
321+
with pytest.raises(exceptions.InvalidResource):
322+
challenge._obtain_challenge_input_webauthn(metadata, mock_handler)
323+
324+
mock_handler.get.side_effect = exceptions.ReauthFailError
325+
with pytest.raises(exceptions.ReauthFailError):
326+
challenge._obtain_challenge_input_webauthn(metadata, mock_handler)
327+
328+
176329
@mock.patch("getpass.getpass", return_value="foo")
177330
def test_password_challenge(getpass_mock):
178331
challenge = challenges.PasswordChallenge()
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import mock
2+
import pytest # type: ignore
3+
4+
from google.oauth2 import webauthn_handler
5+
from google.oauth2 import webauthn_handler_factory
6+
7+
8+
@pytest.fixture
9+
def os_get_stub():
10+
with mock.patch.object(
11+
webauthn_handler.os.environ,
12+
"get",
13+
return_value="gcloud_webauthn_plugin",
14+
name="fake os.environ.get",
15+
) as mock_os_environ_get:
16+
yield mock_os_environ_get
17+
18+
19+
# Check that get_handler returns a value when env is set,
20+
# that type is PluginHandler, and that no value is returned
21+
# if env not set.
22+
def test_WebauthHandlerFactory_get(os_get_stub):
23+
factory = webauthn_handler_factory.WebauthnHandlerFactory()
24+
assert factory.get_handler() is not None
25+
26+
assert isinstance(factory.get_handler(), webauthn_handler.PluginHandler)
27+
28+
os_get_stub.return_value = None
29+
assert factory.get_handler() is None

0 commit comments

Comments
 (0)