Skip to content

支持多区域上传 #522

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
May 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
339 changes: 182 additions & 157 deletions src/main/java/com/qiniu/storage/AutoRegion.java

Large diffs are not rendered by default.

76 changes: 76 additions & 0 deletions src/main/java/com/qiniu/storage/BaseUploader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.qiniu.storage;

import com.qiniu.common.QiniuException;
import com.qiniu.http.Client;
import com.qiniu.http.Response;

public abstract class BaseUploader {

protected final Client client;
protected final String key;
protected final String upToken;
protected final ConfigHelper configHelper;
protected final Configuration config;

BaseUploader(Client client, String upToken, String key, Configuration config) {
this.client = client;
this.key = key;
this.upToken = upToken;
if (config == null) {
this.config = new Configuration();
} else {
this.config = config.clone();
}
this.configHelper = new ConfigHelper(this.config);
}

public Response upload() throws QiniuException {
if (this.config == null) {
throw QiniuException.unrecoverable("config can't be empty");
}
return uploadWithRegionRetry();
}

private Response uploadWithRegionRetry() throws QiniuException {
Response response = null;
while (true) {
try {
response = uploadFlows();
if (!couldSwitchRegionAndRetry(response, null)
|| !couldReloadSource() || !reloadSource()
|| config.region == null || !config.region.switchRegion(new UploadToken(upToken))) {
break;
}
} catch (QiniuException e) {
if (!couldSwitchRegionAndRetry(null, e)
|| !couldReloadSource() || !reloadSource()
|| config.region == null || !config.region.switchRegion(new UploadToken(upToken))) {
throw e;
}
}
}
return response;
}

abstract Response uploadFlows() throws QiniuException;

abstract boolean couldReloadSource();

abstract boolean reloadSource();

private boolean couldSwitchRegionAndRetry(Response response, QiniuException exception) {
Response checkResponse = response;
if (checkResponse == null && exception != null) {
checkResponse = exception.response;
}

if (checkResponse != null) {
int statusCode = checkResponse.statusCode;
return (statusCode > -2 && statusCode < 200) || (statusCode > 299
&& statusCode != 401 && statusCode != 413 && statusCode != 419
&& statusCode != 608 && statusCode != 614 && statusCode != 630);
}

return exception == null || !exception.isUnrecoverable();
}
}
2 changes: 2 additions & 0 deletions src/main/java/com/qiniu/storage/FixBlockUploader.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

@Deprecated
public class FixBlockUploader {
private final int blockSize;
private final ConfigHelper configHelper;
Expand Down Expand Up @@ -883,6 +884,7 @@ class Record {
long size;
long blockSize;
List<EtagIdx> etagIdxes;

// 用于区分记录是 V1 还是 V2
boolean isValid() {
return uploadId != null && etagIdxes != null && etagIdxes.size() > 0;
Expand Down
42 changes: 24 additions & 18 deletions src/main/java/com/qiniu/storage/FormUploader.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@
* 该类封装了七牛提供的表单上传机制
* 参考文档:<a href="https://developer.qiniu.com/kodo/manual/form-upload">表单上传</a>
*/
public final class FormUploader {
public final class FormUploader extends BaseUploader {

private final String token;
private final String key;
private final File file;
private final byte[] data;
private final String mime;
Expand Down Expand Up @@ -45,9 +43,9 @@ public FormUploader(Client client, String upToken, String key, File file, String

private FormUploader(Client client, String upToken, String key, byte[] data, File file, StringMap params,
String mime, boolean checkCrc, Configuration configuration) {
super(client, upToken, key, configuration);

this.client = client;
token = upToken;
this.key = key;
this.file = file;
this.data = data;
this.params = params;
Expand All @@ -56,23 +54,21 @@ private FormUploader(Client client, String upToken, String key, byte[] data, Fil
this.configHelper = new ConfigHelper(configuration);
}

/**
* 同步上传文件
*/
public Response upload() throws QiniuException {
@Override
Response uploadFlows() throws QiniuException {
buildParams();
String host = configHelper.upHost(token);
String host = configHelper.upHost(upToken);
try {
if (data != null) {
return client.multipartPost(configHelper.upHost(token), params, "file", filename, data,
return client.multipartPost(configHelper.upHost(upToken), params, "file", filename, data,
mime, new StringMap());
} else {
return client.multipartPost(configHelper.upHost(token), params, "file", filename, file,
return client.multipartPost(configHelper.upHost(upToken), params, "file", filename, file,
mime, new StringMap());
}
} catch (QiniuException e) {
if (e.response == null || e.response.needSwitchServer()) {
changeHost(token, host);
changeHost(upToken, host);
}
throw e;
}
Expand All @@ -83,32 +79,42 @@ public Response upload() throws QiniuException {
*/
public void asyncUpload(final UpCompletionHandler handler) throws IOException {
buildParams();
final String host = configHelper.upHost(token);
final String host = configHelper.upHost(upToken);
if (data != null) {
client.asyncMultipartPost(host, params, "file", filename,
data, mime, new StringMap(), new AsyncCallback() {
@Override
public void complete(Response res) {
if (res != null && res.needSwitchServer()) {
changeHost(token, host);
changeHost(upToken, host);
}
handler.complete(key, res);
}
});
return;
}
client.asyncMultipartPost(configHelper.upHost(token), params, "file", filename,
client.asyncMultipartPost(configHelper.upHost(upToken), params, "file", filename,
file, mime, new StringMap(), new AsyncCallback() {
@Override
public void complete(Response res) {
if (res != null && res.needSwitchServer()) {
changeHost(token, host);
changeHost(upToken, host);
}
handler.complete(key, res);
}
});
}

@Override
boolean couldReloadSource() {
return true;
}

@Override
boolean reloadSource() {
return true;
}

private void changeHost(String upToken, String host) {
try {
configHelper.tryChangeUpHost(upToken, host);
Expand All @@ -120,7 +126,7 @@ private void changeHost(String upToken, String host) {

private void buildParams() throws QiniuException {
if (params == null) return;
params.put("token", token);
params.put("token", upToken);

if (key != null) {
params.put("key", key);
Expand Down
35 changes: 35 additions & 0 deletions src/main/java/com/qiniu/storage/Region.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

public class Region {

// 有效时间戳,过了有效期,region会无效,此处只在取时缓存判断; -1 为无限期
private long timestamp = -1;
// 区域名称:z0 华东 z1 华北 z2 华南 na0 北美 as0 东南亚
private String region = "z0";

Expand All @@ -25,6 +27,23 @@ public class Region {
private String rsHost = "rs.qbox.me";
private String rsfHost = "rsf.qbox.me";
private String apiHost = "api.qiniu.com";
private String ucHost = "uc.qbox.me";

Region() {
}

Region(long timestamp, String region, List<String> srcUpHosts, List<String> accUpHosts, String iovipHost,
String rsHost, String rsfHost, String apiHost, String ucHost) {
this.timestamp = timestamp;
this.region = region;
this.srcUpHosts = srcUpHosts;
this.accUpHosts = accUpHosts;
this.iovipHost = iovipHost;
this.rsHost = rsHost;
this.rsfHost = rsfHost;
this.apiHost = apiHost;
this.ucHost = ucHost;
}

/**
* 华东机房相关域名
Expand Down Expand Up @@ -211,10 +230,18 @@ public static Region autoRegion(String ucServer) {
return new Builder().autoRegion(ucServer);
}

boolean switchRegion(RegionReqInfo regionReqInfo) {
return false;
}

String getRegion(RegionReqInfo regionReqInfo) {
return this.region;
}

Region getCurrentRegion(RegionReqInfo regionReqInfo) {
return this;
}

List<String> getSrcUpHost(RegionReqInfo regionReqInfo) throws QiniuException {
return this.srcUpHosts;
}
Expand All @@ -239,6 +266,14 @@ String getApiHost(RegionReqInfo regionReqInfo) throws QiniuException {
return apiHost;
}

boolean isValid() {
if (timestamp < 0) {
return true;
} else {
return System.currentTimeMillis() < timestamp * 1000;
}
}

/**
* 域名构造器
*/
Expand Down
128 changes: 128 additions & 0 deletions src/main/java/com/qiniu/storage/RegionGroup.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package com.qiniu.storage;

import com.qiniu.common.QiniuException;

import java.util.ArrayList;
import java.util.List;

public class RegionGroup extends Region {

private Region currentRegion = null;
private int currentRegionIndex = 0;
private final List<Region> regionList = new ArrayList<>();


public boolean addRegion(Region region) {
if (region == null) {
return false;
}

regionList.add(region);

if (currentRegion == null) {
updateCurrentRegion();
}

return true;
}

@Override
boolean switchRegion(RegionReqInfo regionReqInfo) {
if (currentRegion != null && currentRegion.isValid() && currentRegion.switchRegion(regionReqInfo)) {
return true;
}

if ((currentRegionIndex + 1) < regionList.size()) {
currentRegionIndex += 1;
updateCurrentRegion();
return true;
} else {
return false;
}
}

String getRegion(RegionReqInfo regionReqInfo) {
if (currentRegion == null) {
return "";
} else {
return currentRegion.getRegion(regionReqInfo);
}
}

List<String> getSrcUpHost(RegionReqInfo regionReqInfo) throws QiniuException {
if (currentRegion == null) {
return null;
} else {
return currentRegion.getSrcUpHost(regionReqInfo);
}
}

List<String> getAccUpHost(RegionReqInfo regionReqInfo) throws QiniuException {
if (currentRegion == null) {
return null;
} else {
return currentRegion.getAccUpHost(regionReqInfo);
}
}

String getIovipHost(RegionReqInfo regionReqInfo) throws QiniuException {
if (currentRegion == null) {
return null;
} else {
return currentRegion.getIovipHost(regionReqInfo);
}
}

String getRsHost(RegionReqInfo regionReqInfo) throws QiniuException {
if (currentRegion == null) {
return null;
} else {
return currentRegion.getRsHost(regionReqInfo);
}
}

String getRsfHost(RegionReqInfo regionReqInfo) throws QiniuException {
if (currentRegion == null) {
return null;
} else {
return currentRegion.getRsfHost(regionReqInfo);
}
}

String getApiHost(RegionReqInfo regionReqInfo) throws QiniuException {
if (currentRegion == null) {
return null;
} else {
return currentRegion.getApiHost(regionReqInfo);
}
}

Region getCurrentRegion(RegionReqInfo regionReqInfo) {
if (currentRegion == null) {
return null;
} else if (currentRegion instanceof AutoRegion || currentRegion instanceof RegionGroup) {
return currentRegion.getCurrentRegion(regionReqInfo);
} else {
return currentRegion;
}
}

@Override
boolean isValid() {
if (currentRegion == null) {
return false;
}
// 只判断当前的
return currentRegion.isValid();
}

private void updateCurrentRegion() {
if (regionList.size() == 0) {
return;
}

if (currentRegionIndex < regionList.size()) {
currentRegion = regionList.get(currentRegionIndex);
}
}
}
Loading