8
8
import webbrowser
9
9
import logging
10
10
import socket
11
+ from string import Template
11
12
12
13
try : # Python 3
13
14
from http .server import HTTPServer , BaseHTTPRequestHandler
21
22
logger = logging .getLogger (__name__ )
22
23
23
24
24
- def obtain_auth_code (listen_port , auth_uri = None ): # For backward compatibility
25
+ def obtain_auth_code (listen_port , auth_uri = None ): # Historically only used in testing
25
26
with AuthCodeReceiver (port = listen_port ) as receiver :
26
27
return receiver .get_auth_response (
27
28
auth_uri = auth_uri ,
28
- text = "Open this link to sign in. You may use incognito window" ,
29
+ welcome_template = """<html><body>
30
+ Open this link to <a href='$auth_uri'>Sign In</a>
31
+ (You may want to use incognito window)
32
+ <hr><a href='$abort_uri'>Abort</a>
33
+ </body></html>""" ,
29
34
).get ("code" )
30
35
31
36
@@ -43,26 +48,27 @@ def _browse(auth_uri):
43
48
controller .open (auth_uri )
44
49
45
50
51
+ def _qs2kv (qs ):
52
+ """Flatten parse_qs()'s single-item lists into the item itself"""
53
+ return {k : v [0 ] if isinstance (v , list ) and len (v ) == 1 else v
54
+ for k , v in qs .items ()}
55
+
56
+
46
57
class _AuthCodeHandler (BaseHTTPRequestHandler ):
47
58
def do_GET (self ):
48
59
# For flexibility, we choose to not check self.path matching redirect_uri
49
60
#assert self.path.startswith('/THE_PATH_REGISTERED_BY_THE_APP')
50
61
qs = parse_qs (urlparse (self .path ).query )
51
62
if qs .get ('code' ) or qs .get ("error" ): # So, it is an auth response
52
- # Then store it into the server instance
53
- self .server .auth_response = qs
54
- logger .debug ("Got auth response: %s" , qs )
63
+ self .server .auth_response = _qs2kv (qs )
64
+ logger .debug ("Got auth response: %s" , self .server .auth_response )
65
+ template = (self .server .success_template
66
+ if "code" in qs else self .server .error_template )
55
67
self ._send_full_response (
56
- 'Authentication complete. You can close this window.' )
68
+ template . safe_substitute ( ** self . server . auth_response ) )
57
69
# NOTE: Don't do self.server.shutdown() here. It'll halt the server.
58
- elif qs .get ('text' ) and qs .get ('link' ): # Then display a landing page
59
- self ._send_full_response (
60
- '<a href={link}>{text}</a><hr/>{exit_hint}' .format (
61
- link = qs ['link' ][0 ], text = qs ['text' ][0 ],
62
- exit_hint = qs .get ("exit_hint" , ['' ])[0 ],
63
- ))
64
70
else :
65
- self ._send_full_response ("This web service serves your redirect_uri" )
71
+ self ._send_full_response (self . server . welcome_page )
66
72
67
73
def _send_full_response (self , body , is_ok = True ):
68
74
self .send_response (200 if is_ok else 400 )
@@ -120,7 +126,8 @@ def get_port(self):
120
126
# https://docs.python.org/2.7/library/socketserver.html#SocketServer.BaseServer.server_address
121
127
return self ._server .server_address [1 ]
122
128
123
- def get_auth_response (self , auth_uri = None , text = None , timeout = None , state = None ):
129
+ def get_auth_response (self , auth_uri = None , text = None , timeout = None , state = None ,
130
+ welcome_template = None , success_template = None , error_template = None ):
124
131
"""Wait and return the auth response, or None when timeout.
125
132
126
133
:param str auth_uri:
@@ -133,23 +140,37 @@ def get_auth_response(self, auth_uri=None, text=None, timeout=None, state=None):
133
140
:param str state:
134
141
You may provide the state you used in auth_url,
135
142
then we will use it to validate incoming response.
143
+ :param str welcome_template:
144
+ If provided, your end user will see it instead of the auth_uri.
145
+ When present, it shall be a plaintext or html template following
146
+ `Python Template string syntax <https://docs.python.org/3/library/string.html#template-strings>`_,
147
+ and include some of these placeholders: $auth_uri and $abort_uri.
148
+ :param str success_template:
149
+ The page will be displayed when authentication was largely successful.
150
+ Placeholders can be any of these:
151
+ https://tools.ietf.org/html/rfc6749#section-5.1
152
+ :param str error_template:
153
+ The page will be displayed when authentication encountered error.
154
+ Placeholders can be any of these:
155
+ https://tools.ietf.org/html/rfc6749#section-5.2
136
156
:return:
137
157
The auth response of the first leg of Auth Code flow,
138
158
typically {"code": "...", "state": "..."} or {"error": "...", ...}
139
159
See https://tools.ietf.org/html/rfc6749#section-4.1.2
140
160
and https://openid.net/specs/openid-connect-core-1_0.html#AuthResponse
141
161
Returns None when the state was mismatched, or when timeout occurred.
142
162
"""
143
- location = "http://localhost:{p}" .format (p = self .get_port ()) # For testing
144
- exit_hint = "Abort by visit {loc}?error=abort" .format (loc = location )
145
- logger .debug (exit_hint )
163
+ welcome_uri = "http://localhost:{p}" .format (p = self .get_port ())
164
+ abort_uri = "{loc}?error=abort" .format (loc = welcome_uri )
165
+ logger .debug ("Abort by visit %s" , abort_uri )
166
+ self ._server .welcome_page = Template (welcome_template or "" ).safe_substitute (
167
+ auth_uri = auth_uri , abort_uri = abort_uri )
146
168
if auth_uri :
147
- page = "{loc}?{q}" .format (loc = location , q = urlencode ({
148
- "text" : text ,
149
- "link" : auth_uri ,
150
- "exit_hint" : exit_hint ,
151
- })) if text else auth_uri
152
- _browse (page )
169
+ _browse (welcome_uri if welcome_template else auth_uri )
170
+ self ._server .success_template = Template (success_template or
171
+ "Authentication completed. You can close this window now." )
172
+ self ._server .error_template = Template (error_template or
173
+ "Authentication failed. $error: $error_description. ($error_uri)" )
153
174
154
175
self ._server .timeout = timeout # Otherwise its handle_timeout() won't work
155
176
self ._server .auth_response = {} # Shared with _AuthCodeHandler
@@ -158,13 +179,11 @@ def get_auth_response(self, auth_uri=None, text=None, timeout=None, state=None):
158
179
# https://docs.python.org/2/library/basehttpserver.html#more-examples
159
180
self ._server .handle_request ()
160
181
if self ._server .auth_response :
161
- if state and state != self ._server .auth_response .get ("state" , [ None ])[ 0 ] :
182
+ if state and state != self ._server .auth_response .get ("state" ) :
162
183
logger .debug ("State mismatch. Ignoring this noise." )
163
184
else :
164
185
break
165
- return { # Normalize unnecessary lists into single values
166
- k : v [0 ] if isinstance (v , list ) and len (v ) == 1 else v
167
- for k , v in self ._server .auth_response .items ()}
186
+ return self ._server .auth_response
168
187
169
188
def close (self ):
170
189
"""Either call this eventually; or use the entire class as context manager"""
@@ -201,7 +220,10 @@ def __exit__(self, exc_type, exc_val, exc_tb):
201
220
redirect_uri = "http://{h}:{p}" .format (h = args .host , p = receiver .get_port ()))
202
221
print (json .dumps (receiver .get_auth_response (
203
222
auth_uri = auth_uri ,
204
- text = "Open this link to sign in. You may use incognito window" ,
223
+ welcome_template =
224
+ "<a href='$auth_uri'>Sign In</a>, or <a href='$abort_uri'>Abort</a" ,
225
+ error_template = "Oh no. $error" ,
226
+ success_template = "Oh yeah. Got $code" ,
205
227
timeout = 60 ,
206
228
), indent = 4 ))
207
229
0 commit comments