5
5
It optionally opens a browser window to guide a human user to manually login.
6
6
After obtaining an auth code, the web server will automatically shut down.
7
7
"""
8
-
9
- import argparse
10
8
import webbrowser
11
9
import logging
10
+ import socket
11
+ from string import Template
12
12
13
13
try : # Python 3
14
14
from http .server import HTTPServer , BaseHTTPRequestHandler
18
18
from urlparse import urlparse , parse_qs
19
19
from urllib import urlencode
20
20
21
- from .oauth2 import Client
22
-
23
21
24
22
logger = logging .getLogger (__name__ )
25
23
26
- def obtain_auth_code (listen_port , auth_uri = None ):
27
- """This function will start a web server listening on http://localhost:port
28
- and then you need to open a browser on this device and visit your auth_uri.
29
- When interaction finishes, this function will return the auth code,
30
- and then shut down the local web server.
31
-
32
- :param listen_port:
33
- The local web server will listen at http://localhost:<listen_port>
34
- Unless the authorization server supports dynamic port,
35
- you need to use the same port when you register with your app.
36
- :param auth_uri: If provided, this function will try to open a local browser.
37
- :return: Hang indefinitely, until it receives and then return the auth code.
38
- """
39
- exit_hint = "Visit http://localhost:{p}?code=exit to abort" .format (p = listen_port )
40
- logger .warning (exit_hint )
41
- if auth_uri :
42
- page = "http://localhost:{p}?{q}" .format (p = listen_port , q = urlencode ({
43
- "text" : "Open this link to sign in. You may use incognito window" ,
44
- "link" : auth_uri ,
45
- "exit_hint" : exit_hint ,
46
- }))
47
- browse (page )
48
- server = HTTPServer (("" , int (listen_port )), AuthCodeReceiver )
49
- try :
50
- server .authcode = None
51
- while not server .authcode :
52
- # Derived from
53
- # https://docs.python.org/2/library/basehttpserver.html#more-examples
54
- server .handle_request ()
55
- return server .authcode
56
- finally :
57
- server .server_close ()
58
24
59
- def browse (auth_uri ):
25
+ def obtain_auth_code (listen_port , auth_uri = None ): # Historically only used in testing
26
+ with AuthCodeReceiver (port = listen_port ) as receiver :
27
+ return receiver .get_auth_response (
28
+ auth_uri = auth_uri ,
29
+ welcome_template = """<html><body>
30
+ Open this link to <a href='$auth_uri'>Sign In</a>
31
+ (You may want to use incognito window)
32
+ <hr><a href='$abort_uri'>Abort</a>
33
+ </body></html>""" ,
34
+ ).get ("code" )
35
+
36
+
37
+ def _browse (auth_uri ):
60
38
controller = webbrowser .get () # Get a default controller
61
39
# Some Linux Distro does not setup default browser properly,
62
40
# so we try to explicitly use some popular browser, if we found any.
@@ -69,23 +47,28 @@ def browse(auth_uri):
69
47
logger .info ("Please open a browser on THIS device to visit: %s" % auth_uri )
70
48
controller .open (auth_uri )
71
49
72
- class AuthCodeReceiver (BaseHTTPRequestHandler ):
50
+
51
+ def _qs2kv (qs ):
52
+ """Flatten parse_qs()'s single-item lists into the item itself"""
53
+ return {k : v [0 ] if isinstance (v , list ) and len (v ) == 1 else v
54
+ for k , v in qs .items ()}
55
+
56
+
57
+ class _AuthCodeHandler (BaseHTTPRequestHandler ):
73
58
def do_GET (self ):
74
59
# For flexibility, we choose to not check self.path matching redirect_uri
75
60
#assert self.path.startswith('/THE_PATH_REGISTERED_BY_THE_APP')
76
61
qs = parse_qs (urlparse (self .path ).query )
77
- if qs .get ('code' ): # Then store it into the server instance
78
- ac = self .server .authcode = qs [ 'code' ][ 0 ]
79
- self . _send_full_response ( 'Authcode: \n {}' . format ( ac ) )
80
- # NOTE: Don't do self.server.shutdown() here. It'll halt the server.
81
- elif qs . get ( 'text' ) and qs . get ( 'link' ): # Then display a landing page
62
+ if qs .get ('code' ) or qs . get ( "error" ) : # So, it is an auth response
63
+ self .server .auth_response = _qs2kv ( qs )
64
+ logger . debug ( "Got auth response: %s" , self . server . auth_response )
65
+ template = ( self .server .success_template
66
+ if "code" in qs else self . server . error_template )
82
67
self ._send_full_response (
83
- '<a href={link}>{text}</a><hr/>{exit_hint}' .format (
84
- link = qs ['link' ][0 ], text = qs ['text' ][0 ],
85
- exit_hint = qs .get ("exit_hint" , ['' ])[0 ],
86
- ))
68
+ template .safe_substitute (** self .server .auth_response ))
69
+ # NOTE: Don't do self.server.shutdown() here. It'll halt the server.
87
70
else :
88
- self ._send_full_response ("This web service serves your redirect_uri" )
71
+ self ._send_full_response (self . server . welcome_page )
89
72
90
73
def _send_full_response (self , body , is_ok = True ):
91
74
self .send_response (200 if is_ok else 400 )
@@ -95,17 +78,152 @@ def _send_full_response(self, body, is_ok=True):
95
78
self .wfile .write (body .encode ("utf-8" ))
96
79
97
80
81
+ class _AuthCodeHttpServer (HTTPServer ):
82
+ def handle_timeout (self ):
83
+ # It will be triggered when no request comes in self.timeout seconds.
84
+ # See https://docs.python.org/3/library/socketserver.html#socketserver.BaseServer.handle_timeout
85
+ raise RuntimeError ("Timeout. No auth response arrived." ) # Terminates this server
86
+ # We choose to not call self.server_close() here,
87
+ # because it would cause a socket.error exception in handle_request(),
88
+ # and likely end up the server being server_close() twice.
89
+
90
+
91
+ class _AuthCodeHttpServer6 (_AuthCodeHttpServer ):
92
+ address_family = socket .AF_INET6
93
+
94
+
95
+ class AuthCodeReceiver (object ):
96
+ # This class has (rather than is) an _AuthCodeHttpServer, so it does not leak API
97
+ def __init__ (self , port = None ):
98
+ """Create a Receiver waiting for incoming auth response.
99
+
100
+ :param port:
101
+ The local web server will listen at http://...:<port>
102
+ You need to use the same port when you register with your app.
103
+ If your Identity Provider supports dynamic port, you can use port=0 here.
104
+ Port 0 means to use an arbitrary unused port, per this official example:
105
+ https://docs.python.org/2.7/library/socketserver.html#asynchronous-mixins
106
+ """
107
+ address = "127.0.0.1" # Hardcode, for now, Not sure what to expose, yet.
108
+ # Per RFC 8252 (https://tools.ietf.org/html/rfc8252#section-8.3):
109
+ # * Clients should listen on the loopback network interface only.
110
+ # (It is not recommended to use "" shortcut to bind all addr.)
111
+ # * the use of localhost is NOT RECOMMENDED.
112
+ # (Use) the loopback IP literal
113
+ # rather than localhost avoids inadvertently listening on network
114
+ # interfaces other than the loopback interface.
115
+ # Note:
116
+ # When this server physically listens to a specific IP (as it should),
117
+ # you will still be able to specify your redirect_uri using either
118
+ # IP (e.g. 127.0.0.1) or localhost, whichever matches your registration.
119
+ Server = _AuthCodeHttpServer6 if ":" in address else _AuthCodeHttpServer
120
+ # TODO: But, it would treat "localhost" or "" as IPv4.
121
+ # If pressed, we might just expose a family parameter to caller.
122
+ self ._server = Server ((address , port or 0 ), _AuthCodeHandler )
123
+
124
+ def get_port (self ):
125
+ """The port this server actually listening to"""
126
+ # https://docs.python.org/2.7/library/socketserver.html#SocketServer.BaseServer.server_address
127
+ return self ._server .server_address [1 ]
128
+
129
+ def get_auth_response (self , auth_uri = None , text = None , timeout = None , state = None ,
130
+ welcome_template = None , success_template = None , error_template = None ):
131
+ """Wait and return the auth response, or None when timeout.
132
+
133
+ :param str auth_uri:
134
+ If provided, this function will try to open a local browser.
135
+ :param str text:
136
+ If provided (together with auth_uri),
137
+ this function will render a landing page with ``text`` in your browser.
138
+ This can be used to make testing more readable.
139
+ :param int timeout: In seconds. None means wait indefinitely.
140
+ :param str state:
141
+ You may provide the state you used in auth_url,
142
+ then we will use it to validate incoming response.
143
+ :param str welcome_template:
144
+ If provided, your end user will see it instead of the auth_uri.
145
+ When present, it shall be a plaintext or html template following
146
+ `Python Template string syntax <https://docs.python.org/3/library/string.html#template-strings>`_,
147
+ and include some of these placeholders: $auth_uri and $abort_uri.
148
+ :param str success_template:
149
+ The page will be displayed when authentication was largely successful.
150
+ Placeholders can be any of these:
151
+ https://tools.ietf.org/html/rfc6749#section-5.1
152
+ :param str error_template:
153
+ The page will be displayed when authentication encountered error.
154
+ Placeholders can be any of these:
155
+ https://tools.ietf.org/html/rfc6749#section-5.2
156
+ :return:
157
+ The auth response of the first leg of Auth Code flow,
158
+ typically {"code": "...", "state": "..."} or {"error": "...", ...}
159
+ See https://tools.ietf.org/html/rfc6749#section-4.1.2
160
+ and https://openid.net/specs/openid-connect-core-1_0.html#AuthResponse
161
+ Returns None when the state was mismatched, or when timeout occurred.
162
+ """
163
+ welcome_uri = "http://localhost:{p}" .format (p = self .get_port ())
164
+ abort_uri = "{loc}?error=abort" .format (loc = welcome_uri )
165
+ logger .debug ("Abort by visit %s" , abort_uri )
166
+ self ._server .welcome_page = Template (welcome_template or "" ).safe_substitute (
167
+ auth_uri = auth_uri , abort_uri = abort_uri )
168
+ if auth_uri :
169
+ _browse (welcome_uri if welcome_template else auth_uri )
170
+ self ._server .success_template = Template (success_template or
171
+ "Authentication completed. You can close this window now." )
172
+ self ._server .error_template = Template (error_template or
173
+ "Authentication failed. $error: $error_description. ($error_uri)" )
174
+
175
+ self ._server .timeout = timeout # Otherwise its handle_timeout() won't work
176
+ self ._server .auth_response = {} # Shared with _AuthCodeHandler
177
+ while True :
178
+ # Derived from
179
+ # https://docs.python.org/2/library/basehttpserver.html#more-examples
180
+ self ._server .handle_request ()
181
+ if self ._server .auth_response :
182
+ if state and state != self ._server .auth_response .get ("state" ):
183
+ logger .debug ("State mismatch. Ignoring this noise." )
184
+ else :
185
+ break
186
+ return self ._server .auth_response
187
+
188
+ def close (self ):
189
+ """Either call this eventually; or use the entire class as context manager"""
190
+ self ._server .server_close ()
191
+
192
+ def __enter__ (self ):
193
+ return self
194
+
195
+ def __exit__ (self , exc_type , exc_val , exc_tb ):
196
+ self .close ()
197
+
198
+ # Note: Manually use or test this module by:
199
+ # python -m path.to.this.file -h
98
200
if __name__ == '__main__' :
201
+ import argparse , json
202
+ from .oauth2 import Client
99
203
logging .basicConfig (level = logging .INFO )
100
204
p = parser = argparse .ArgumentParser (
205
+ formatter_class = argparse .ArgumentDefaultsHelpFormatter ,
101
206
description = __doc__ + "The auth code received will be shown at stdout." )
102
- p .add_argument ('endpoint' ,
103
- help = "The auth endpoint for your app. For example: "
104
- "https://login.microsoftonline.com/your_tenant /oauth2/authorize" )
207
+ p .add_argument (
208
+ '--endpoint' , help = "The auth endpoint for your app." ,
209
+ default = "https://login.microsoftonline.com/common /oauth2/v2.0 /authorize" )
105
210
p .add_argument ('client_id' , help = "The client_id of your application" )
106
- p .add_argument ('redirect_port' , type = int , help = "The port in redirect_uri" )
211
+ p .add_argument ('--port' , type = int , default = 0 , help = "The port in redirect_uri" )
212
+ p .add_argument ('--host' , default = "127.0.0.1" , help = "The host of redirect_uri" )
213
+ p .add_argument ('--scope' , default = None , help = "The scope list" )
107
214
args = parser .parse_args ()
108
- client = Client (args .client_id , authorization_endpoint = args .endpoint )
109
- auth_uri = client .build_auth_request_uri ("code" )
110
- print (obtain_auth_code (args .redirect_port , auth_uri ))
215
+ client = Client ({"authorization_endpoint" : args .endpoint }, args .client_id )
216
+ with AuthCodeReceiver (port = args .port ) as receiver :
217
+ auth_uri = client .build_auth_request_uri (
218
+ "code" ,
219
+ scope = args .scope .split () if args .scope else None ,
220
+ redirect_uri = "http://{h}:{p}" .format (h = args .host , p = receiver .get_port ()))
221
+ print (json .dumps (receiver .get_auth_response (
222
+ auth_uri = auth_uri ,
223
+ welcome_template =
224
+ "<a href='$auth_uri'>Sign In</a>, or <a href='$abort_uri'>Abort</a" ,
225
+ error_template = "Oh no. $error" ,
226
+ success_template = "Oh yeah. Got $code" ,
227
+ timeout = 60 ,
228
+ ), indent = 4 ))
111
229
0 commit comments