10
10
from kiota_abstractions .headers_collection import HeadersCollection
11
11
from kiota_abstractions .request_information import RequestInformation
12
12
from kiota_abstractions .serialization .additional_data_holder import AdditionalDataHolder
13
+ from kiota_abstractions .serialization .parsable_factory import ParsableFactory
13
14
14
15
from kiota_abstractions .request_adapter import RequestAdapter
15
16
@@ -23,7 +24,8 @@ def __init__(
23
24
upload_session : Parsable ,
24
25
request_adapter : RequestAdapter ,
25
26
stream : BytesIO ,
26
- max_chunk_size : int = 5 * 1024 * 1024
27
+ parsable_factory : Optional [ParsableFactory ] = None ,
28
+ max_chunk_size : int = 409600 #5 * 1024 * 1024
27
29
):
28
30
self ._upload_session = upload_session
29
31
self ._request_adapter = request_adapter
@@ -33,6 +35,7 @@ def __init__(
33
35
except AttributeError :
34
36
self .file_size = os .stat (stream .name ).st_size
35
37
self .max_chunk_size = max_chunk_size
38
+ self .factory = parsable_factory
36
39
cleaned_value = self .check_value_exists (
37
40
upload_session , 'get_next_expected_range' , ['next_expected_range' , 'NextExpectedRange' ]
38
41
)
@@ -97,8 +100,17 @@ async def upload(self, after_chunk_upload: Optional[Callable] = None):
97
100
range_parts = self .next_range [0 ].split ("-" ) if self .next_range else ['0' , '0' ]
98
101
end = min (int (range_parts [0 ]) + self .max_chunk_size - 1 , self .file_size )
99
102
uploaded_range = [range_parts [0 ], end ]
100
- while self .chunks > 0 :
103
+ response = None
104
+
105
+ while self .chunks >= 0 :
101
106
session = process_next
107
+ print (f"Chunks for upload : { self .chunks } " )
108
+ if self .chunks == 0 :
109
+ # last chunk
110
+ print (f"Last chunk: { self .chunks } upload stated" )
111
+ response = await self .last_chunk (self .stream )
112
+ print ("Last chunk response: received" )
113
+
102
114
try :
103
115
lfu_session : LargeFileUploadSession = session # type: ignore
104
116
if lfu_session is None :
@@ -115,17 +127,13 @@ async def upload(self, after_chunk_upload: Optional[Callable] = None):
115
127
uploaded_range = [range_parts [0 ], end ]
116
128
self .next_range = next_range [0 ] + "-"
117
129
process_next = await self .next_chunk (self .stream )
130
+
118
131
except Exception as error :
119
132
logging .error ("Error uploading chunk %s" , error )
120
133
finally :
121
134
self .chunks -= 1
122
135
upload_result = UploadResult ()
123
- upload_result .upload_session = UploadSessionDataHolder (
124
- expiration_date_time = self .upload_session .expiration_date_time ,
125
- next_expected_ranges = self .upload_session .next_expected_ranges ,
126
- upload_url = self .upload_session .upload_url
127
- )
128
- upload_result .item_response = session
136
+ upload_result .item_response = response
129
137
upload_result .location = self .upload_session .upload_url
130
138
return upload_result
131
139
@@ -172,6 +180,47 @@ async def next_chunk(self, file: BytesIO, range_start: int = 0, range_end: int =
172
180
parsable_factory = LargeFileUploadSession
173
181
return await self .request_adapter .send_async (info , parsable_factory , error_map )
174
182
183
async def last_chunk(
    self,
    file: BytesIO,
    range_start: int = 0,
    range_end: int = 0,
    parsable_factory: Optional[ParsableFactory] = None
) -> Future:
    """Upload the final chunk of the file to the session's upload URL.

    Determines the remaining byte span (from ``self.next_range`` when the
    server has reported one, otherwise from *range_start*/*range_end*),
    reads that span from *file*, and sends it as a PUT request with the
    appropriate ``Content-Range`` header.

    :param file: stream holding the content being uploaded.
    :param range_start: fallback start offset used when no next range is set.
    :param range_end: fallback end offset used when no next range is set.
    :param parsable_factory: factory used to deserialize the response;
        ``self.factory`` takes precedence when it is set.
    :return: the deserialized response produced by the request adapter.
    :raises ValueError: if the upload session has no upload URL.
    """
    url = self.get_validated_upload_url(self.upload_session)
    if not url:
        raise ValueError('The upload session URL must not be empty.')

    info = RequestInformation()
    info.url = url
    info.http_method = Method.PUT

    if not self.next_range:
        self.next_range = f'{range_start}-{range_end}'
    parts = self.next_range.split('-') if self.next_range else ['-']
    start = int(parts[0])
    end = int(parts[1]) if len(parts) > 1 else 0

    # Read the chunk for the computed span. When start is 0 the stream is
    # assumed to already be positioned at the beginning; otherwise seek first.
    if start == 0:
        if end == 0:
            # No explicit range at all: take one max-size chunk from the top.
            payload = file.read(self.max_chunk_size)
            end = min(self.max_chunk_size - 1, self.file_size - 1)
        else:
            payload = file.read(end + 1)
    else:
        file.seek(start)
        if end == 0:
            # Open-ended range: read up to a full chunk and derive the end.
            payload = file.read(self.max_chunk_size)
            end = start + len(payload) - 1
        else:
            # Bounded range: never exceed one chunk past the start offset.
            end = min(end, self.max_chunk_size + start)
            payload = file.read(end - start + 1)

    headers = HeadersCollection()
    headers.try_add('Content-Range', f'bytes {start}-{end}/{self.file_size}')
    headers.try_add('Content-Length', str(len(payload)))
    headers.try_add("Content-Type", "application/octet-stream")
    info.headers = headers
    info.set_stream_content(bytes(payload))

    error_map: Dict[str, int] = {}
    # An explicitly configured factory on the task wins over the argument.
    factory = self.factory or parsable_factory
    return await self.request_adapter.send_async(info, factory, error_map)
175
224
def get_file(self) -> BytesIO:
    """Return the byte stream backing this upload task."""
    return self.stream
177
226
0 commit comments