Skip to content

Retry host #120

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jul 1, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions qiniu/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
RS_HOST = "rs.qbox.me"
RSF_HOST = "rsf.qbox.me"
UP_HOST = "up.qiniu.com"
UP_HOST2 = "up.qbox.me"

from . import __version__
import platform
Expand Down
7 changes: 6 additions & 1 deletion qiniu/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ def put(uptoken, key, data, extra=None):
files = [
{'filename': fname, 'data': data, 'mime_type': extra.mime_type},
]
return rpc.Client(conf.UP_HOST).call_with_multipart("/", fields, files)
ret, err, code = rpc.Client(conf.UP_HOST).call_with_multipart("/", fields, files)
if err is None or code == 571 or code == 614 or code == 301:
return ret, err

ret, err, code = rpc.Client(conf.UP_HOST2).call_with_multipart("/", fields, files)
return ret, err


def put_file(uptoken, key, localfile, extra=None):
Expand Down
41 changes: 27 additions & 14 deletions qiniu/resumable_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,23 +66,35 @@ def put_file(uptoken, key, localfile, extra):
""" 上传文件 """
f = open(localfile, "rb")
statinfo = os.stat(localfile)
ret = put(uptoken, key, f, statinfo.st_size, extra)
ret, err = put(uptoken, key, f, statinfo.st_size, extra)
f.close()
return ret
return ret, err


def put(uptoken, key, f, fsize, extra):
""" 上传二进制流, 通过将data "切片" 分段上传 """
if not isinstance(extra, PutExtra):
print("extra must the instance of PutExtra")
return
host = conf.UP_HOST
try:
ret, err, code = put_with_host(uptoken, key, f, fsize, extra, host)
if err is None or code == 571 or code == 614 or code == 301:
return ret, err
except:
pass

ret, err, code = put_with_host(uptoken, key, f, fsize, extra, conf.UP_HOST2)
return ret, err


def put_with_host(uptoken, key, f, fsize, extra, host):
block_cnt = block_count(fsize)
if extra.progresses is None:
extra.progresses = [None] * block_cnt
else:
if not len(extra.progresses) == block_cnt:
return None, err_invalid_put_progress
return None, err_invalid_put_progress, 0

if extra.try_times is None:
extra.try_times = _try_times
Expand All @@ -97,28 +109,29 @@ def put(uptoken, key, f, fsize, extra):
read_length = fsize - i * _block_size
data_slice = f.read(read_length)
while True:
err = resumable_block_put(data_slice, i, extra, uptoken)
err = resumable_block_put(data_slice, i, extra, uptoken, host)
if err is None:
break

try_time -= 1
if try_time <= 0:
return None, err_put_failed
return None, err_put_failed, 0
print err, ".. retry"

mkfile_host = extra.progresses[-1]["host"] if block_cnt else conf.UP_HOST
mkfile_host = extra.progresses[-1]["host"] if block_cnt else host
mkfile_client = auth_up.Client(uptoken, mkfile_host)
return mkfile(mkfile_client, key, fsize, extra)

return mkfile(mkfile_client, key, fsize, extra, host)


def resumable_block_put(block, index, extra, uptoken):
def resumable_block_put(block, index, extra, uptoken, host):
block_size = len(block)

mkblk_client = auth_up.Client(uptoken, conf.UP_HOST)
mkblk_client = auth_up.Client(uptoken, host)
if extra.progresses[index] is None or "ctx" not in extra.progresses[index]:
crc32 = gen_crc32(block)
block = bytearray(block)
extra.progresses[index], err = mkblock(mkblk_client, block_size, block)
extra.progresses[index], err, code = mkblock(mkblk_client, block_size, block, host)
if err is not None:
extra.notify_err(index, block_size, err)
return err
Expand All @@ -133,8 +146,8 @@ def block_count(size):
return (size + _block_mask) / _block_size


def mkblock(client, block_size, first_chunk):
url = "http://%s/mkblk/%s" % (conf.UP_HOST, block_size)
def mkblock(client, block_size, first_chunk, host):
url = "http://%s/mkblk/%s" % (host, block_size)
content_type = "application/octet-stream"
return client.call_with(url, first_chunk, content_type, len(first_chunk))

Expand All @@ -146,8 +159,8 @@ def putblock(client, block_ret, chunk):
return client.call_with(url, chunk, content_type, len(chunk))


def mkfile(client, key, fsize, extra):
url = ["http://%s/mkfile/%s" % (conf.UP_HOST, fsize)]
def mkfile(client, key, fsize, extra, host):
url = ["http://%s/mkfile/%s" % (host, fsize)]

if extra.mimetype:
url.append("mimeType/%s" % urlsafe_b64encode(extra.mimetype))
Expand Down
12 changes: 7 additions & 5 deletions qiniu/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ def merged_headers(self, header):
return _header

def call(self, path):
return self.call_with(path, None)
ret, err, code = self.call_with(path, None)
return ret, err

def call_with(self, path, body, content_type=None, content_length=None):
ret = None
Expand All @@ -52,16 +53,16 @@ def call_with(self, path, body, content_type=None, content_length=None):
except ValueError:
pass

if resp.status / 100 != 2:
if resp.status >= 400:
err_msg = ret if "error" not in ret else ret["error"]
reqid = resp.getheader("X-Reqid", None)
# detail = resp.getheader("x-log", None)
if reqid is not None:
err_msg += ", reqid:%s" % reqid

return None, err_msg
return None, err_msg, resp.status

return ret, None
return ret, None, resp.status

def call_with_multipart(self, path, fields=None, files=None):
"""
Expand All @@ -87,7 +88,8 @@ def call_with_form(self, path, ops):
body = '&'.join(body)

content_type = "application/x-www-form-urlencoded"
return self.call_with(path, body, content_type, len(body))
ret, err, code = self.call_with(path, body, content_type, len(body))
return ret, err

def set_header(self, field, value):
self._header[field] = value
Expand Down
2 changes: 1 addition & 1 deletion qiniu/rsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def list_prefix(self, bucket, prefix=None, marker=None, limit=None):
if prefix is not None:
ops['prefix'] = prefix
url = '%s?%s' % ('/list', urllib.urlencode(ops))
ret, err = self.conn.call_with(
ret, err, code = self.conn.call_with(
url, body=None, content_type='application/x-www-form-urlencoded')
if ret and not ret.get('marker'):
err = EOF
Expand Down
14 changes: 12 additions & 2 deletions qiniu/test/io_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ def test_put_quote_key():
data = r(100)
key = 'a\\b\\c"你好' + r(9)
ret, err = io.put(policy.token(), key, data)
print err
assert err is None
assert ret['key'].encode('utf8') == key

Expand Down Expand Up @@ -114,7 +113,7 @@ def test_put_StringIO():

def test_put_urlopen():
key = "test_%s" % r(9)
data = urllib.urlopen('http://cheneya.qiniudn.com/hello_jpg')
data = urllib.urlopen('http://pythonsdk.qiniudn.com/hello.jpg')
ret, err = io.put(policy.token(), key, data)
assert err is None
assert ret['key'] == key
Expand Down Expand Up @@ -178,6 +177,17 @@ def test_put_fail_reqid(self):
ret, err = io.put("", key, data, extra)
assert "reqid" in err

def test_put_with_uphost2(self):
conf.UP_HOST = "api.qiniu.com" # mistake up host
localfile = "%s" % __file__
key = "test_up2_%s" % r(9)

extra.check_crc = 1
ret, err = io.put_file(policy.token(), key, localfile, extra)
assert err is None
assert ret['key'] == key
conf.UP_HOST = "up.qiniu.com"


class Test_get_file_crc32(unittest.TestCase):

Expand Down
11 changes: 5 additions & 6 deletions qiniu/test/resumable_io_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,15 @@ class TestBlock(unittest.TestCase):
def test_block(self):
if is_travis:
return
host = conf.UP_HOST
policy = rs.PutPolicy(bucket)
uptoken = policy.token()
client = up.Client(uptoken)

# rets = [0, 0]
data_slice_2 = "\nbye!"
ret, err = resumable_io.mkblock(
client, len(data_slice_2), data_slice_2)
ret, err, code = resumable_io.mkblock(
client, len(data_slice_2), data_slice_2, host)
assert err is None, err
self.assertEqual(ret["crc32"], binascii.crc32(data_slice_2))

Expand All @@ -56,7 +57,7 @@ def test_block(self):
lens += extra.progresses[i]["offset"]

key = u"sdk_py_resumable_block_4_%s" % r(9)
ret, err = resumable_io.mkfile(client, key, lens, extra)
ret, err, code = resumable_io.mkfile(client, key, lens, extra, host)
assert err is None, err
self.assertEqual(
ret["hash"], "FtCFo0mQugW98uaPYgr54Vb1QsO0", "hash not match")
Expand All @@ -65,7 +66,7 @@ def test_block(self):
def test_put(self):
if is_travis:
return
src = urllib.urlopen("http://cheneya.qiniudn.com/hello_jpg")
src = urllib.urlopen("http://pythonsdk.qiniudn.com/hello.jpg")
ostype = platform.system()
if ostype.lower().find("windows") != -1:
tmpf = "".join([os.getcwd(), mktemp()])
Expand All @@ -84,7 +85,6 @@ def test_put(self):
ret, err = resumable_io.put_file(policy.token(), key, localfile, extra)
dst.close()
os.remove(tmpf)

assert err is None, err
assert ret.get("x:foo") == "test", "return data not contains 'x:foo'"
self.assertEqual(
Expand Down Expand Up @@ -112,7 +112,6 @@ def test_put_4m(self):
ret, err = resumable_io.put_file(policy.token(), key, localfile, extra)
dst.close()
os.remove(tmpf)

assert err is None, err
assert ret.get("x:foo") == "test", "return data not contains 'x:foo'"
self.assertEqual(
Expand Down