@@ -150,20 +150,32 @@ class FormData(_IXSSSafeFieldStorage):
150
150
_storage : Dict [str , List [Union [str , bytes ]]]
151
151
files : Files
152
152
153
- def __init__ (self , data : bytes , content_type : str ) -> None :
154
- self .content_type = content_type
153
+ def _check_is_supported_content_type (self , content_type : str ) -> None :
154
+ return content_type in (
155
+ "application/x-www-form-urlencoded" ,
156
+ "multipart/form-data" ,
157
+ "text/plain" ,
158
+ )
159
+
160
+ def __init__ (self , data : bytes , headers : Headers , * , debug : bool = False ) -> None :
155
161
self ._storage = {}
156
162
self .files = Files ()
157
163
158
- if content_type .startswith ("application/x-www-form-urlencoded" ):
159
- self ._parse_x_www_form_urlencoded (data )
164
+ self .content_type = headers .get_directive ("Content-Type" )
165
+ content_length = int (headers .get ("Content-Length" , 0 ))
166
+
167
+ if not self ._check_is_supported_content_type (self .content_type ):
168
+ debug and _debug_unsupported_form_content_type (self .content_type )
160
169
161
- elif content_type .startswith ("multipart/form-data" ):
162
- boundary = content_type .split ("boundary=" )[1 ]
163
- self ._parse_multipart_form_data (data , boundary )
170
+ if self .content_type == "application/x-www-form-urlencoded" :
171
+ self ._parse_x_www_form_urlencoded (data [:content_length ])
164
172
165
- elif content_type .startswith ("text/plain" ):
166
- self ._parse_text_plain (data )
173
+ elif self .content_type == "multipart/form-data" :
174
+ boundary = headers .get_parameter ("Content-Type" , "boundary" )
175
+ self ._parse_multipart_form_data (data [:content_length ], boundary )
176
+
177
+ elif self .content_type == "text/plain" :
178
+ self ._parse_text_plain (data [:content_length ])
167
179
168
180
def _parse_x_www_form_urlencoded (self , data : bytes ) -> None :
169
181
if not (decoded_data := data .decode ("utf-8" ).strip ("&" )):
@@ -393,7 +405,7 @@ def form_data(self) -> Union[FormData, None]:
393
405
request.form_data.get_list("baz") # ["qux"]
394
406
"""
395
407
if self ._form_data is None and self .method == "POST" :
396
- self ._form_data = FormData (self .body , self .headers [ "Content-Type" ] )
408
+ self ._form_data = FormData (self .body , self .headers , debug = self . server . debug )
397
409
return self ._form_data
398
410
399
411
def json (self ) -> Union [dict , None ]:
@@ -435,3 +447,11 @@ def _parse_request_header(
435
447
headers = Headers (headers_string )
436
448
437
449
return method , path , query_params , http_version , headers
450
+
451
+
452
+ def _debug_unsupported_form_content_type (content_type : str ) -> None :
453
+ """Warns when an unsupported form content type is used."""
454
+ print (
455
+ f"WARNING: Unsupported Content-Type: { content_type } . "
456
+ "Only `application/x-www-form-urlencoded`, `multipart/form-data` and `text/plain` are supported."
457
+ )
0 commit comments