@@ -4,13 +4,16 @@
 from asyncio import Future
 from datetime import datetime, timedelta, timezone
 import logging
+import httpx
 
 from kiota_abstractions.serialization.parsable import Parsable
 from kiota_abstractions.method import Method
 from kiota_abstractions.headers_collection import HeadersCollection
 from kiota_abstractions.request_information import RequestInformation
+from kiota_abstractions.native_response_handler import NativeResponseHandler
 from kiota_abstractions.serialization.additional_data_holder import AdditionalDataHolder
 from kiota_abstractions.serialization.parsable_factory import ParsableFactory
+from msgraph.generated.models.attachment_item import AttachmentItem
 
 from kiota_abstractions.request_adapter import RequestAdapter
 
@@ -25,7 +28,7 @@ def __init__(
         request_adapter: RequestAdapter,
         stream: BytesIO,
         parsable_factory: Optional[ParsableFactory] = None,
-        max_chunk_size: int = 5 * 1024 * 1024
+        max_chunk_size: int = 3 * 1024 * 1024  # 5 - 409600
     ):
         self._upload_session = upload_session
         self._request_adapter = request_adapter
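
Review note on the new default: the drop from 5 MiB to 3 MiB (and the cryptic `# 5 - 409600` marker) suggests the old default was tripping a service-side chunk limit. If so, computing an explicit, aligned chunk size at the call site may be more robust than hard-coding another guess. A minimal sketch; the 320 KiB alignment is borrowed from the OneDrive resumable-upload guidance and is an assumption here, since the PR does not say which limit the new default targets:

```python
# Sketch: round a requested chunk size down to a 320 KiB boundary, capped at
# the 3 MiB default introduced above. The 320 KiB alignment is an assumption
# taken from the OneDrive resumable-upload guidance, not enforced by this PR.
CHUNK_ALIGNMENT = 320 * 1024  # 327680 bytes

def aligned_chunk_size(requested: int, max_size: int = 3 * 1024 * 1024) -> int:
    size = min(requested, max_size)
    return max(CHUNK_ALIGNMENT, (size // CHUNK_ALIGNMENT) * CHUNK_ALIGNMENT)

print(aligned_chunk_size(5 * 1024 * 1024))  # 2949120 (9 x 320 KiB; 3 MiB itself is not aligned)
print(aligned_chunk_size(500_000))          # 327680 (one 320 KiB unit)
```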
@@ -101,40 +104,45 @@ async def upload(self, after_chunk_upload: Optional[Callable] = None):
         end = min(int(range_parts[0]) + self.max_chunk_size - 1, self.file_size)
         uploaded_range = [range_parts[0], end]
         response = None
+        upload_result = UploadResult()
 
         while self.chunks >= 0:
             session = process_next
-            print(f"Chunks for upload : {self.chunks}")
             if self.chunks == 0:
                 # last chunk
-                print(f"Last chunk: {self.chunks} upload stated")
                 response = await self.last_chunk(self.stream)
-                print("Last chunk response: received")
-
-            try:
-                lfu_session: LargeFileUploadSession = session  # type: ignore
-                if lfu_session is None:
-                    continue
-                next_range = lfu_session.next_expected_ranges
-                old_url = self.get_validated_upload_url(self.upload_session)
-                lfu_session.upload_url = old_url
-                if self.on_chunk_upload_complete is not None:
-                    self.on_chunk_upload_complete(uploaded_range)
-                if not next_range:
-                    continue
-                range_parts = str(next_range[0]).split("-")
-                end = min(int(range_parts[0]) + self.max_chunk_size, self.file_size)
-                uploaded_range = [range_parts[0], end]
-                self.next_range = next_range[0] + "-"
-                process_next = await self.next_chunk(self.stream)
-
-            except Exception as error:
-                logging.error("Error uploading chunk %s", error)
-            finally:
-                self.chunks -= 1
+                # upload_result.location = response.headers[
+                #     'Location']  # to be fixed for attachment item response
+
+            if isinstance(session, LargeFileUploadSession):
+                try:
+                    lfu_session: LargeFileUploadSession = session  # type: ignore
+                    if lfu_session is None:
+                        continue
+                    next_range = lfu_session.next_expected_ranges
+                    old_url = self.get_validated_upload_url(self.upload_session)
+                    lfu_session.upload_url = old_url
+                    if self.on_chunk_upload_complete is not None:
+                        self.on_chunk_upload_complete(uploaded_range)
+                    if not next_range:
+                        continue
+                    range_parts = str(next_range[0]).split("-")
+                    end = min(int(range_parts[0]) + self.max_chunk_size, self.file_size)
+                    uploaded_range = [range_parts[0], end]
+                    self.next_range = next_range[0] + "-"
+                    process_next = await self.next_chunk(self.stream)
+
+                except Exception as error:
+                    logging.error("Error uploading chunk %s", error)
+                finally:
+                    self.chunks -= 1
+            else:
+                response = session
+                logging.info(f"Response headers: {response.headers}")
+                logging.info(f"Response content: {response.content}")
+                break
         upload_result = UploadResult()
         upload_result.item_response = response
-        upload_result.location = self.upload_session.upload_url
         return upload_result
 
     @property
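
Review note: the loop no longer assumes every chunk response is a `LargeFileUploadSession`; it branches on the type of `process_next`, treating anything else as the terminal response produced by the httpx path added below. The branching logic in isolation (a minimal sketch, not SDK code):

```python
import asyncio

# Control-flow sketch of the reshaped loop above. `LargeFileUploadSession` is a
# stub standing in for the class this module already uses, and `send_chunk` is a
# hypothetical stand-in for the next_chunk/last_chunk calls; neither is SDK code.
class LargeFileUploadSession:
    next_expected_ranges = ["3145728-"]

async def drive_upload(send_chunk, chunks: int):
    result = None
    while chunks >= 0:
        step = await send_chunk()
        if isinstance(step, LargeFileUploadSession):
            chunks -= 1  # service still expects more ranges: keep looping
            continue
        result = step    # anything else is the terminal response, e.g. the
        break            # raw httpx.Response from the AttachmentItem branch
    return result

async def fake_send_chunk(state={"left": 2}):
    # Returns two continuation sessions, then a terminal string response.
    if state["left"] > 0:
        state["left"] -= 1
        return LargeFileUploadSession()
    return "final-response"

print(asyncio.run(drive_upload(fake_send_chunk, chunks=2)))  # final-response
```

Because the `else` branch breaks immediately, a terminal response short-circuits the remaining range bookkeeping, which is why `upload_result.item_response` above can end up carrying a raw `httpx.Response`.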
@@ -177,6 +185,13 @@ async def next_chunk(self, file: BytesIO, range_start: int = 0, range_end: int =
         info.headers.try_add("Content-Type", "application/octet-stream")
         info.set_stream_content(bytes(chunk_data))
         error_map: Dict[str, int] = {}
+        if self.factory is AttachmentItem:
+            headers = {key: ', '.join(value) for key, value in info.headers.get_all().items()}
+            async with httpx.AsyncClient() as client:
+                response = await client.put(
+                    self.upload_session.upload_url, headers=headers, data=info.content
+                )
+            return response
         parsable_factory = LargeFileUploadSession
         return await self.request_adapter.send_async(info, parsable_factory, error_map)
 
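
Review note: this branch sidesteps the request adapter and sends the ranged PUT straight through httpx. The dict comprehension assumes each `HeadersCollection` value is an iterable of strings; flattened, it looks like this (a standalone sketch with literal data mimicking `get_all()`, not an SDK call):

```python
# Standalone sketch of the header flattening above; the literal dict mimics
# the str -> set-of-str shape assumed for HeadersCollection.get_all().
raw_headers = {
    "Content-Range": {"bytes 0-3145727/10485760"},
    "Content-Type": {"application/octet-stream"},
}
flat = {key: ", ".join(value) for key, value in raw_headers.items()}
print(flat["Content-Range"])  # bytes 0-3145727/10485760
```

One nit: recent httpx versions deprecate `data=` for raw bytes in favor of `content=`, so `client.put(..., content=info.content)` would avoid a deprecation warning.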
@@ -218,6 +233,14 @@ async def last_chunk(
         info.headers.try_add("Content-Type", "application/octet-stream")
         info.set_stream_content(bytes(chunk_data))
         error_map: Dict[str, int] = {}
+        if self.factory is AttachmentItem:
+            headers = {key: ', '.join(value) for key, value in info.headers.get_all().items()}
+
+            async with httpx.AsyncClient() as client:
+                response = await client.put(
+                    self.upload_session.upload_url, headers=headers, data=info.content
+                )
+            return response
         parsable_factory = self.factory or parsable_factory
         return await self.request_adapter.send_async(info, parsable_factory, error_map)
 
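
Review note: with both chunk methods patched, the task can be exercised end to end. A hypothetical usage sketch under this diff's assumptions; `make_client_and_session()` is illustrative shorthand (not in the PR), and `LargeFileUploadTask` is assumed to be the enclosing class of these methods:

```python
import asyncio
from io import BytesIO

async def main() -> None:
    # Hypothetical setup helper, not part of this PR: yields a GraphServiceClient,
    # an attachment upload session, and the file bytes to upload.
    client, session, file_bytes = await make_client_and_session()

    task = LargeFileUploadTask(           # assumed enclosing class of this diff
        upload_session=session,
        request_adapter=client.request_adapter,
        stream=BytesIO(file_bytes),
        parsable_factory=AttachmentItem,  # routes every chunk through the httpx branch
        max_chunk_size=3 * 1024 * 1024,   # the new default from this diff
    )
    result = await task.upload()
    # On the AttachmentItem path, item_response is the raw httpx.Response from the
    # final PUT; the Location header is still commented out as "to be fixed" above.
    print(result.item_response)

asyncio.run(main())
```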