Skip to content

Commit 208d8c3

Browse files
committed
Merge pull request #94 from dtynn/wanglin/feature/simple_rio
Wanglin/feature/simple rio
2 parents 580f755 + b30d252 commit 208d8c3

File tree

1 file changed

+120
-132
lines changed

1 file changed

+120
-132
lines changed

qiniu/resumable_io.py

Lines changed: 120 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -1,172 +1,160 @@
1-
# -*- coding: utf-8 -*-
1+
#coding=utf-8
22
import os
33
try:
4-
import zlib as binascii
4+
import zlib as binascii
55
except ImportError:
6-
import binascii
6+
import binascii
77
from base64 import urlsafe_b64encode
88

9-
import auth.up
9+
from auth import up as auth_up
1010
import conf
1111

1212
# Module-private defaults for resumable uploads.
_workers = 1
_task_queue_size = _workers * 4
_try_times = 3  # default number of attempts per block
_block_bits = 22
_block_size = 1 << _block_bits  # 4 MiB
_block_mask = _block_size - 1
_chunk_size = _block_size  # simplified mode: deprecated, a chunk now equals a block
1819

19-
class Error(Exception):
20-
value = None
21-
def __init__(self, value):
22-
self.value = value
23-
def __str__(self):
24-
return self.value
2520

26-
err_invalid_put_progress = Error("invalid put progress")
27-
err_put_failed = Error("resumable put failed")
28-
err_unmatched_checksum = Error("unmatched checksum")
21+
class ResumableIoError(Exception):
    """Error value used by the resumable-upload functions.

    The upload functions hand instances back as the ``err`` half of a
    ``(ret, err)`` pair. Deriving from ``Exception`` (instead of ``object``)
    additionally allows callers to ``raise`` and ``except`` them.
    """

    value = None

    def __init__(self, value):
        Exception.__init__(self, value)
        self.value = value

    def __str__(self):
        # str() guards against a non-string value, which __str__ must not return.
        return str(self.value)
3630

37-
if chunk_size == 0:
38-
chunk_size = 1 << 18
3931

40-
if try_times == 0:
41-
try_times = 3
32+
# Shared error values returned by the upload functions in this module.
err_invalid_put_progress = ResumableIoError("invalid put progress")
err_put_failed = ResumableIoError("resumable put failed")
err_unmatched_checksum = ResumableIoError("unmatched checksum")
err_putExtra_type = ResumableIoError("extra must the instance of PutExtra")
36+
37+
38+
def setup(chunk_size=0, try_times=0):
    """Set the module-wide default chunk size and number of attempts.

    * chunk_size => default chunk size; any value <= 0 selects 1 << 22 (4 MiB)
    * try_times  => default number of attempts; any value <= 0 selects 3
    """
    global _chunk_size, _try_times
    _chunk_size = chunk_size if chunk_size > 0 else 1 << 22
    # Negative counts are rejected the same way as zero, matching chunk_size.
    _try_times = try_times if try_times > 0 else 3
4243

43-
_chunk_size, _try_times = chunk_size, try_times
4444

45-
# ----------------------------------------------------------
4645
def gen_crc32(data):
    """Return the CRC-32 checksum of ``data`` as an unsigned 32-bit integer."""
    # The mask normalises the result where crc32() may return a signed value.
    return binascii.crc32(data) & 0xffffffff
47+
4848

4949
class PutExtra(object):
    """Optional settings, callbacks and progress state for a resumable upload."""

    params = None      # custom user variables; keys must start with "x:"
    mimetype = None    # optional; client-chosen MIME type for when the uptoken does not set DetectMime
    chunk_size = None  # optional; size of each uploaded chunk (simplified mode: deprecated)
    try_times = None   # optional; number of attempts
    progresses = None  # optional; upload progress

    def __init__(self, bucket=None):
        self.bucket = bucket

    def notify(self, idx, size, ret):
        """Optional progress callback; the default does nothing."""
        return None

    def notify_err(self, idx, size, err):
        """Optional error callback; the default does nothing."""
        return None
6062

6163
def put_file(uptoken, key, localfile, extra):
    """Upload the local file at ``localfile`` and return the result of ``put``."""
    # The with-block closes the file even when os.stat() or put() raises.
    with open(localfile, "rb") as f:
        fsize = os.stat(localfile).st_size
        return put(uptoken, key, f, fsize, extra)
70+
6871

6972
def put(uptoken, key, f, fsize, extra):
    """Upload a binary stream by slicing it into blocks and sending them in order.

    * uptoken => upload token handed to the upload clients
    * key     => target key, or None
    * f       => readable binary stream positioned at the start of the data
    * fsize   => total number of bytes to read from ``f``
    * extra   => a PutExtra instance; its ``progresses``, ``try_times`` and
                 ``chunk_size`` are filled in when they are None

    Returns the result of ``mkfile`` on success, or ``(None, err)`` where
    ``err`` is one of this module's error values.
    """
    if not isinstance(extra, PutExtra):
        # Return a pair like every other exit so callers can always unpack.
        return None, err_putExtra_type

    block_cnt = block_count(fsize)
    if extra.progresses is None:
        extra.progresses = [None] * block_cnt
    elif len(extra.progresses) != block_cnt:
        return None, err_invalid_put_progress

    if extra.try_times is None:
        extra.try_times = _try_times

    if extra.chunk_size is None:
        extra.chunk_size = _chunk_size

    for i in range(block_cnt):
        # The final block may be shorter than a full block.
        read_length = min(_block_size, fsize - i * _block_size)
        # Always read, even for blocks that are already uploaded, so the
        # stream stays aligned with the block index.
        data_slice = f.read(read_length)

        attempts_left = extra.try_times
        while True:
            err = resumable_block_put(data_slice, i, extra, uptoken)
            if err is None:
                break

            attempts_left -= 1
            if attempts_left <= 0:
                return None, err_put_failed
            print("%s .. retry" % (err,))

    # NOTE(review): with zero blocks (fsize == 0) there is no block response
    # to take the host from; fall back to conf.UP_HOST instead of raising
    # IndexError. Confirm the server accepts an empty mkfile request.
    if extra.progresses:
        mkfile_host = extra.progresses[-1]["host"]
    else:
        mkfile_host = conf.UP_HOST
    mkfile_client = auth_up.Client(uptoken, mkfile_host)
    return mkfile(mkfile_client, key, fsize, extra)
109+
108110

109111
def resumable_block_put(block, index, extra, uptoken):
    """Upload one whole block with a single mkblk request.

    A block whose progress entry already carries a "ctx" is skipped.

    Returns None on success or skip, the error reported by ``mkblock``, or
    ``err_unmatched_checksum`` when the returned crc32 differs from the
    locally computed one.
    """
    progress = extra.progresses[index]
    if progress is not None and "ctx" in progress:
        return None

    block_size = len(block)
    crc32 = gen_crc32(block)
    mkblk_client = auth_up.Client(uptoken, conf.UP_HOST)
    extra.progresses[index], err = mkblock(mkblk_client, block_size, bytearray(block))
    if err is not None:
        extra.notify_err(index, block_size, err)
        return err
    if extra.progresses[index]["crc32"] != crc32:
        # Drop the rejected result; otherwise its "ctx" would make a retry
        # skip this block as if it had been uploaded correctly.
        extra.progresses[index] = None
        return err_unmatched_checksum
    extra.notify(index, block_size, extra.progresses[index])
    return None
126+
142127

143128
def block_count(size):
    """Return the number of blocks needed to hold ``size`` bytes, rounded up."""
    # Floor division keeps the result an int even where `/` is true division.
    return (size + _block_mask) // _block_size
131+
146132

147133
def mkblock(client, block_size, first_chunk):
    """Send ``first_chunk`` to the mkblk URL for a block of ``block_size``.

    Returns whatever ``client.call_with`` returns.
    """
    url = "http://%s/mkblk/%s" % (conf.UP_HOST, block_size)
    content_type = "application/octet-stream"
    return client.call_with(url, first_chunk, content_type, len(first_chunk))
137+
151138

152139
def putblock(client, block_ret, chunk):
    """Send ``chunk`` to the bput URL built from a previous block response.

    ``block_ret`` supplies the "host", "ctx" and "offset" used in the URL.
    Returns whatever ``client.call_with`` returns.
    """
    url = "%s/bput/%s/%s" % (block_ret["host"], block_ret["ctx"], block_ret["offset"])
    content_type = "application/octet-stream"
    return client.call_with(url, chunk, content_type, len(chunk))
143+
156144

157145
def mkfile(client, key, fsize, extra):
    """Call the mkfile URL with the comma-joined "ctx" of every block.

    The URL carries ``fsize`` plus, when set, the base64-encoded mimetype,
    key and each entry of ``extra.params``.
    Returns whatever ``client.call_with`` returns.
    """
    parts = ["http://%s/mkfile/%s" % (conf.UP_HOST, fsize)]

    if extra.mimetype:
        parts.append("mimeType/%s" % urlsafe_b64encode(extra.mimetype))

    if key is not None:
        parts.append("key/%s" % urlsafe_b64encode(key))

    if extra.params:
        for k, v in extra.params.items():
            parts.append("%s/%s" % (k, urlsafe_b64encode(v)))

    url = "/".join(parts)
    body = ",".join(i["ctx"] for i in extra.progresses)
    return client.call_with(url, body, "text/plain", len(body))

0 commit comments

Comments
 (0)