Skip to content

Commit 7042ff5

Browse files
committed
Rewrite authcode.py to support dynamic port
Details: No longer display auth code in the result page Adds timeout behavior Use optional text parameter to toggle landing page Supports state validation Supports dynamic port Returns auth_response instead of auth_code Refactor internal API layers Carefully choose the address to listen to Use RuntimeError for timeout, and let it bubble up Conclude the research on IPv6 Expose minimal API from authcode module
1 parent e6be9ec commit 7042ff5

File tree

1 file changed

+135
-42
lines changed

1 file changed

+135
-42
lines changed

oauth2cli/authcode.py

Lines changed: 135 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
"""
88
import webbrowser
99
import logging
10+
import socket
1011

1112
try: # Python 3
1213
from http.server import HTTPServer, BaseHTTPRequestHandler
@@ -19,40 +20,16 @@
1920

2021
logger = logging.getLogger(__name__)
2122

22-
def obtain_auth_code(listen_port, auth_uri=None):
23-
"""This function will start a web server listening on http://localhost:port
24-
and then you need to open a browser on this device and visit your auth_uri.
25-
When interaction finishes, this function will return the auth code,
26-
and then shut down the local web server.
27-
28-
:param listen_port:
29-
The local web server will listen at http://localhost:<listen_port>
30-
Unless the authorization server supports dynamic port,
31-
you need to use the same port when you register with your app.
32-
:param auth_uri: If provided, this function will try to open a local browser.
33-
:return: Hang indefinitely, until it receives and then return the auth code.
34-
"""
35-
exit_hint = "Visit http://localhost:{p}?code=exit to abort".format(p=listen_port)
36-
logger.warning(exit_hint)
37-
if auth_uri:
38-
page = "http://localhost:{p}?{q}".format(p=listen_port, q=urlencode({
39-
"text": "Open this link to sign in. You may use incognito window",
40-
"link": auth_uri,
41-
"exit_hint": exit_hint,
42-
}))
43-
browse(page)
44-
server = HTTPServer(("", int(listen_port)), AuthCodeReceiver)
45-
try:
46-
server.authcode = None
47-
while not server.authcode:
48-
# Derived from
49-
# https://docs.python.org/2/library/basehttpserver.html#more-examples
50-
server.handle_request()
51-
return server.authcode
52-
finally:
53-
server.server_close()
5423

55-
def browse(auth_uri):
24+
def obtain_auth_code(listen_port, auth_uri=None): # For backward compatibility
25+
with AuthCodeReceiver(port=listen_port) as receiver:
26+
return receiver.get_auth_response(
27+
auth_uri=auth_uri,
28+
text="Open this link to sign in. You may use incognito window",
29+
).get("code")
30+
31+
32+
def _browse(auth_uri):
5633
controller = webbrowser.get() # Get a default controller
5734
# Some Linux Distro does not setup default browser properly,
5835
# so we try to explicitly use some popular browser, if we found any.
@@ -65,14 +42,18 @@ def browse(auth_uri):
6542
logger.info("Please open a browser on THIS device to visit: %s" % auth_uri)
6643
controller.open(auth_uri)
6744

68-
class AuthCodeReceiver(BaseHTTPRequestHandler):
45+
46+
class _AuthCodeHandler(BaseHTTPRequestHandler):
6947
def do_GET(self):
7048
# For flexibility, we choose to not check self.path matching redirect_uri
7149
#assert self.path.startswith('/THE_PATH_REGISTERED_BY_THE_APP')
7250
qs = parse_qs(urlparse(self.path).query)
73-
if qs.get('code'): # Then store it into the server instance
74-
ac = self.server.authcode = qs['code'][0]
75-
self._send_full_response('Authcode:\n{}'.format(ac))
51+
if qs.get('code') or qs.get("error"): # So, it is an auth response
52+
# Then store it into the server instance
53+
self.server.auth_response = qs
54+
logger.debug("Got auth response: %s", qs)
55+
self._send_full_response(
56+
'Authentication complete. You can close this window.')
7657
# NOTE: Don't do self.server.shutdown() here. It'll halt the server.
7758
elif qs.get('text') and qs.get('link'): # Then display a landing page
7859
self._send_full_response(
@@ -91,10 +72,114 @@ def _send_full_response(self, body, is_ok=True):
9172
self.wfile.write(body.encode("utf-8"))
9273

9374

75+
class _AuthCodeHttpServer(HTTPServer):
76+
def handle_timeout(self):
77+
# It will be triggered when no request comes in self.timeout seconds.
78+
# See https://docs.python.org/3/library/socketserver.html#socketserver.BaseServer.handle_timeout
79+
raise RuntimeError("Timeout. No auth response arrived.") # Terminates this server
80+
# We choose to not call self.server_close() here,
81+
# because it would cause a socket.error exception in handle_request(),
82+
# and likely end up the server being server_close() twice.
83+
84+
85+
class _AuthCodeHttpServer6(_AuthCodeHttpServer):
86+
address_family = socket.AF_INET6
87+
88+
89+
class AuthCodeReceiver(object):
90+
# This class has (rather than is) an _AuthCodeHttpServer, so it does not leak API
91+
def __init__(self, port=None):
92+
"""Create a Receiver waiting for incoming auth response.
93+
94+
:param port:
95+
The local web server will listen at http://...:<port>
96+
You need to use the same port when you register with your app.
97+
If your Identity Provider supports dynamic port, you can use port=0 here.
98+
Port 0 means to use an arbitrary unused port, per this official example:
99+
https://docs.python.org/2.7/library/socketserver.html#asynchronous-mixins
100+
"""
101+
address = "127.0.0.1" # Hardcode, for now, Not sure what to expose, yet.
102+
# Per RFC 8252 (https://tools.ietf.org/html/rfc8252#section-8.3):
103+
# * Clients should listen on the loopback network interface only.
104+
# (It is not recommended to use "" shortcut to bind all addr.)
105+
# * the use of localhost is NOT RECOMMENDED.
106+
# (Use) the loopback IP literal
107+
# rather than localhost avoids inadvertently listening on network
108+
# interfaces other than the loopback interface.
109+
# Note:
110+
# When this server physically listens to a specific IP (as it should),
111+
# you will still be able to specify your redirect_uri using either
112+
# IP (e.g. 127.0.0.1) or localhost, whichever matches your registration.
113+
Server = _AuthCodeHttpServer6 if ":" in address else _AuthCodeHttpServer
114+
# TODO: But, it would treat "localhost" or "" as IPv4.
115+
# If pressed, we might just expose a family parameter to caller.
116+
self._server = Server((address, port or 0), _AuthCodeHandler)
117+
118+
def get_port(self):
119+
"""The port this server actually listening to"""
120+
# https://docs.python.org/2.7/library/socketserver.html#SocketServer.BaseServer.server_address
121+
return self._server.server_address[1]
122+
123+
def get_auth_response(self, auth_uri=None, text=None, timeout=None, state=None):
124+
"""Wait and return the auth response, or None when timeout.
125+
126+
:param str auth_uri:
127+
If provided, this function will try to open a local browser.
128+
:param str text:
129+
If provided (together with auth_uri),
130+
this function will render a landing page with ``text`` in your browser.
131+
This can be used to make testing more readable.
132+
:param int timeout: In seconds. None means wait indefinitely.
133+
:param str state:
134+
You may provide the state you used in auth_url,
135+
then we will use it to validate incoming response.
136+
:return:
137+
The auth response of the first leg of Auth Code flow,
138+
typically {"code": "...", "state": "..."} or {"error": "...", ...}
139+
See https://tools.ietf.org/html/rfc6749#section-4.1.2
140+
and https://openid.net/specs/openid-connect-core-1_0.html#AuthResponse
141+
Returns None when the state was mismatched, or when timeout occurred.
142+
"""
143+
location = "http://localhost:{p}".format(p=self.get_port()) # For testing
144+
exit_hint = "Abort by visit {loc}?error=abort".format(loc=location)
145+
logger.debug(exit_hint)
146+
if auth_uri:
147+
page = "{loc}?{q}".format(loc=location, q=urlencode({
148+
"text": text,
149+
"link": auth_uri,
150+
"exit_hint": exit_hint,
151+
})) if text else auth_uri
152+
_browse(page)
153+
154+
self._server.timeout = timeout # Otherwise its handle_timeout() won't work
155+
self._server.auth_response = {} # Shared with _AuthCodeHandler
156+
while True:
157+
# Derived from
158+
# https://docs.python.org/2/library/basehttpserver.html#more-examples
159+
self._server.handle_request()
160+
if self._server.auth_response:
161+
if state and state != self._server.auth_response.get("state", [None])[0]:
162+
logger.debug("State mismatch. Ignoring this noise.")
163+
else:
164+
break
165+
return { # Normalize unnecessary lists into single values
166+
k: v[0] if isinstance(v, list) and len(v) == 1 else v
167+
for k, v in self._server.auth_response.items()}
168+
169+
def close(self):
170+
"""Either call this eventually; or use the entire class as context manager"""
171+
self._server.server_close()
172+
173+
def __enter__(self):
174+
return self
175+
176+
def __exit__(self, exc_type, exc_val, exc_tb):
177+
self.close()
178+
94179
# Note: Manually use or test this module by:
95180
# python -m path.to.this.file -h
96181
if __name__ == '__main__':
97-
import argparse
182+
import argparse, json
98183
from .oauth2 import Client
99184
logging.basicConfig(level=logging.INFO)
100185
p = parser = argparse.ArgumentParser(
@@ -104,11 +189,19 @@ def _send_full_response(self, body, is_ok=True):
104189
'--endpoint', help="The auth endpoint for your app.",
105190
default="https://login.microsoftonline.com/common/oauth2/v2.0/authorize")
106191
p.add_argument('client_id', help="The client_id of your application")
107-
p.add_argument('--port', type=int, default=8000, help="The port in redirect_uri")
192+
p.add_argument('--port', type=int, default=0, help="The port in redirect_uri")
193+
p.add_argument('--host', default="127.0.0.1", help="The host of redirect_uri")
108194
p.add_argument('--scope', default=None, help="The scope list")
109195
args = parser.parse_args()
110196
client = Client({"authorization_endpoint": args.endpoint}, args.client_id)
111-
auth_uri = client.build_auth_request_uri(
112-
"code", scope=args.scope, redirect_uri="http://localhost:%d" % args.port)
113-
print(obtain_auth_code(args.port, auth_uri))
197+
with AuthCodeReceiver(port=args.port) as receiver:
198+
auth_uri = client.build_auth_request_uri(
199+
"code",
200+
scope=args.scope.split() if args.scope else None,
201+
redirect_uri="http://{h}:{p}".format(h=args.host, p=receiver.get_port()))
202+
print(json.dumps(receiver.get_auth_response(
203+
auth_uri=auth_uri,
204+
text="Open this link to sign in. You may use incognito window",
205+
timeout=60,
206+
), indent=4))
114207

0 commit comments

Comments
 (0)