Skip to content

Commit dcc55f8

Browse files
committed
rewrite
1 parent 243bc8b commit dcc55f8

File tree

1 file changed

+160
-0
lines changed

1 file changed

+160
-0
lines changed

qiniu/resumable_io_rewrite.py

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
#coding=utf-8
2+
import os
3+
try:
4+
import zlib as binascii
5+
except ImportError:
6+
import binascii
7+
from base64 import urlsafe_b64encode
8+
9+
from auth import up as auth_up
10+
import conf
11+
12+
_workers = 1
13+
_task_queue_size = _workers * 4
14+
_try_times = 3
15+
_block_bits = 22
16+
_block_size = 1 << _block_bits
17+
_block_mask = _block_size - 1
18+
_chunk_size = _block_size # 简化模式,弃用
19+
20+
21+
class ResumableIoError(object):
22+
value = None
23+
24+
def __init__(self, value):
25+
self.value = value
26+
return
27+
28+
def __str__(self):
29+
return self.value
30+
31+
32+
err_invalid_put_progress = ResumableIoError("invalid put progress")
33+
err_put_failed = ResumableIoError("resumable put failed")
34+
err_unmatched_checksum = ResumableIoError("unmatched checksum")
35+
err_putExtra_type = ResumableIoError("extra must the instance of PutExtra")
36+
37+
38+
def setup(chunk_size=0, try_times=0):
39+
global _chunk_size, _try_times
40+
_chunk_size = 1 << 22 if chunk_size <= 0 else chunk_size
41+
_try_times = 3 if try_times == 0 else try_times
42+
return
43+
44+
45+
def gen_crc32(data):
46+
return binascii.crc32(data) & 0xffffffff
47+
48+
49+
class PutExtra(object):
50+
params = None # 自定义用户变量, key需要x: 开头
51+
mimetype = None # 可选。在 uptoken 没有指定 DetectMime 时,用户客户端可自己指定 MimeType
52+
chunk_size = None # 可选。每次上传的Chunk大小 简化模式,弃用
53+
try_times = None # 可选。尝试次数
54+
progresses = None # 可选。上传进度
55+
notify = lambda self, idx, size, ret: None # 可选。进度提示
56+
notify_err = lambda self, idx, size, err: None
57+
58+
def __init__(self, bucket=None):
59+
self.bucket = bucket
60+
return
61+
62+
63+
def put_file(uptoken, key, localfile, extra):
64+
""" 上传文件 """
65+
f = open(localfile, "rb")
66+
statinfo = os.stat(localfile)
67+
ret = put(uptoken, key, f, statinfo.st_size, extra)
68+
f.close()
69+
return ret
70+
71+
72+
def put(uptoken, key, f, fsize, extra):
73+
""" 上传二进制流, 通过将data "切片" 分段上传 """
74+
if not isinstance(extra, PutExtra):
75+
print("extra must the instance of PutExtra")
76+
return
77+
78+
block_cnt = block_count(fsize)
79+
if extra.progresses is None:
80+
extra.progresses = [None] * block_cnt
81+
else:
82+
if not len(extra.progresses) == block_cnt:
83+
return None, err_invalid_put_progress
84+
85+
if extra.try_times is None:
86+
extra.try_times = _try_times
87+
88+
if extra.chunk_size is None:
89+
extra.chunk_size = _chunk_size
90+
91+
for i in xrange(block_cnt):
92+
try_time = extra.try_times
93+
read_length = _block_size
94+
if (i+1)*_block_size > fsize:
95+
read_length = fsize - i*_block_size
96+
data_slice = f.read(read_length)
97+
while True:
98+
err = resumable_block_put(data_slice, i, extra, uptoken)
99+
if err is None:
100+
break
101+
102+
try_time -= 1
103+
if try_time <= 0:
104+
return None, err_put_failed
105+
print err, ".. retry"
106+
107+
mkfile_client = auth_up.Client(uptoken, extra.progresses[-1]["host"])
108+
return mkfile(mkfile_client, key, fsize, extra)
109+
110+
111+
def resumable_block_put(block, index, extra, uptoken):
112+
block_size = len(block)
113+
114+
mkblk_client = auth_up.Client(uptoken, conf.UP_HOST)
115+
if extra.progresses[index] is None or "ctx" not in extra.progresses[index]:
116+
crc32 = gen_crc32(block)
117+
block = bytearray(block)
118+
extra.progresses[index], err = mkblock(mkblk_client, block_size, block)
119+
if err is not None:
120+
extra.notify_err(index, block_size, err)
121+
return err
122+
if not extra.progresses[index]["crc32"] == crc32:
123+
return err_unmatched_checksum
124+
extra.notify(index, block_size, extra.progresses[index])
125+
return
126+
127+
128+
def block_count(size):
129+
global _block_size
130+
return (size + _block_mask) / _block_size
131+
132+
133+
def mkblock(client, block_size, first_chunk):
134+
url = "http://%s/mkblk/%s" % (conf.UP_HOST, block_size)
135+
content_type = "application/octet-stream"
136+
return client.call_with(url, first_chunk, content_type, len(first_chunk))
137+
138+
139+
def putblock(client, block_ret, chunk):
140+
url = "%s/bput/%s/%s" % (block_ret["host"], block_ret["ctx"], block_ret["offset"])
141+
content_type = "application/octet-stream"
142+
return client.call_with(url, chunk, content_type, len(chunk))
143+
144+
145+
def mkfile(client, key, fsize, extra):
146+
url = ["http://%s/mkfile/%s" % (conf.UP_HOST, fsize)]
147+
148+
if extra.mimetype:
149+
url.append("mimeType/%s" % urlsafe_b64encode(extra.mimetype))
150+
151+
if key is not None:
152+
url.append("key/%s" % urlsafe_b64encode(key))
153+
154+
if extra.params:
155+
for k, v in extra.params.iteritems():
156+
url.append("%s/%s" % (k, urlsafe_b64encode(v)))
157+
158+
url = "/".join(url)
159+
body = ",".join([i["ctx"] for i in extra.progresses])
160+
return client.call_with(url, body, "text/plain", len(body))

0 commit comments

Comments
 (0)