Skip to content

Commit 7cad2e7

Browse files
committed
bump 0.50.0-public
1 parent 5326f9b commit 7cad2e7

File tree

10 files changed

+75
-9
lines changed

10 files changed

+75
-9
lines changed

CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,15 @@
11
# Changelog
22

3+
## [0.50.0-public] - 2024-10-09
4+
5+
### Features
6+
- Added `SchemaMismatchException`: This exception will be thrown when using `StreamUploadSession` if the Record structure uploaded by the user does not match the table structure. This exception will additionally carry the latest schema version to assist users in rebuilding the Session and performing retry operations.
7+
- Added `allowSchemaMismatch` method in `StreamUploadSession.Builder`: This method specifies whether to tolerate mismatches between the user's uploaded Record structure and the table structure without throwing an exception. The default value is `true`.
8+
9+
### Fixes
10+
- Fixed an issue where specifying `tunnelEndpoint` in Odps was ineffective when using `StreamUploadSession`.
11+
- Fixed a potential NPE issue in `TunnelRetryHandler`.
12+
313
## [0.50.0-rc1] - 2024-09-19
414

515
### Features

CHANGELOG_CN.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,15 @@
11
# 更新日志
2+
## [0.50.0-public] - 2024-10-09
3+
4+
### 功能
5+
- 新增 `SchemaMismatchException` 异常:当使用 `StreamUploadSession` 时,如果用户上传的 Record 结构与表结构不匹配,将抛出该异常。此异常将额外携带最新的 schema version,方便用户重建 Session 并进行重试操作。
6+
-`StreamUploadSession.Builder` 中新增 `allowSchemaMismatch` 方法,用于指定是否容忍用户上传的 Record 结构与表结构不匹配时是否抛出异常。默认值为 `true`
7+
8+
### 修复
9+
- 修复了在 Odps 中指定 `tunnelEndpoint` 时,使用 `StreamUploadSession` 无法生效的问题。
10+
- 修复了 `TunnelRetryHandler` 潜在的 NPE 问题。
11+
12+
213
## [0.50.0-rc1] - 2024-09-19
314
### 功能
415
- **SQLExecutor** 新增 `isUseInstanceTunnel` 方法:

odps-examples/tunnel-examples/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@
1111
<dependency>
1212
<groupId>com.aliyun.odps</groupId>
1313
<artifactId>odps-sdk-commons</artifactId>
14-
<version>0.48.8-public</version>
14+
<version>0.50.0-public</version>
1515
</dependency>
1616
<dependency>
1717
<groupId>com.aliyun.odps</groupId>
1818
<artifactId>odps-sdk-core</artifactId>
19-
<version>0.48.8-public</version>
19+
<version>0.50.0-public</version>
2020
</dependency>
2121
</dependencies>
2222
</project>
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.aliyun.odps.exceptions;
2+
3+
import java.io.IOException;
4+
5+
/**
6+
* @author dingxin ([email protected])
7+
*/
8+
public class SchemaMismatchException extends IOException {
9+
10+
private final String latestSchemaVersion;
11+
12+
public SchemaMismatchException(String msg, String latestSchemaVersion) {
13+
super(msg);
14+
this.latestSchemaVersion = latestSchemaVersion;
15+
}
16+
17+
public String getLatestSchemaVersion() {
18+
return latestSchemaVersion;
19+
}
20+
}

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/tunnel/Configuration.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ public class Configuration extends GeneralConfiguration {
5050

5151
public Configuration(Odps odps) {
5252
super(odps);
53+
if (!StringUtils.isNullOrEmpty(odps.getTunnelEndpoint())) {
54+
endpoint = URI.create(odps.getTunnelEndpoint());
55+
}
5356
}
5457

5558
public Configuration(Builder builder) {

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/tunnel/HttpHeaders.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,6 @@ public class HttpHeaders extends Headers {
3434
public static final String HEADER_ODPS_SLOT_NUM = "odps-tunnel-slot-num";
3535
public static final String HEADER_ODPS_TUNNEL_TAGS= "odps-tunnel-tags";
3636
public static final String HEADER_ODPS_TUNNEL_SDK_SUPPORT_SCHEMA_EVOLUTION = "odps-tunnel-sdk-support-schema-evolution";
37+
public static final String HEADER_ODPS_TUNNEL_LATEST_SCHEMA_VERSION = "odps-tunnel-latest-schema-version";
3738

3839
}

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/tunnel/TableTunnel.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1156,6 +1156,7 @@ abstract class Builder {
11561156
private long slotNum = 0;
11571157
private boolean createPartition = false;
11581158
private String schemaVersion;
1159+
protected boolean allowSchemaMismatch = true;
11591160
public String getSchemaName() {
11601161
return schemaName;
11611162
}
@@ -1206,6 +1207,11 @@ public Builder setCreatePartition(boolean createPartition) {
12061207
return this;
12071208
}
12081209

1210+
public Builder allowSchemaMismatch(boolean allowSchemaMismatch) {
1211+
this.allowSchemaMismatch = allowSchemaMismatch;
1212+
return this;
1213+
}
1214+
12091215
abstract public StreamUploadSession build() throws TunnelException;
12101216
}
12111217
}

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/tunnel/TunnelConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ public interface TunnelConstants {
8585
public static String PARAM_QUOTA_NAME= "quotaName";
8686
public static String PARAM_SHARING_QUOTA_TOKEN= "sharingQuotaToken";
8787
public static String PARAM_BLOCK_VERSION = "block_version";
88+
public static String PARAM_CHECK_LATEST_SCHEMA = "check_latest_schema";
8889
public static String ENABLE_PARTIAL_UPDATE = "enable_partial_update";
8990
public static String SCHEMA_VERSION = "schema_version";
9091
public static String PARAM_DISABLE_MODIFIED_CHECK = "disable_modified_check";

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/tunnel/impl/StreamUploadSessionImpl.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.aliyun.odps.commons.transport.Response;
2121
import com.aliyun.odps.data.ArrayRecord;
2222
import com.aliyun.odps.data.Record;
23+
import com.aliyun.odps.exceptions.SchemaMismatchException;
2324
import com.aliyun.odps.tunnel.Configuration;
2425
import com.aliyun.odps.tunnel.HttpHeaders;
2526
import com.aliyun.odps.tunnel.TableTunnel;
@@ -98,13 +99,15 @@ public TableTunnel.StreamUploadSession build() throws TunnelException {
9899
isCreatePartition(),
99100
getSlotNum(),
100101
zorderColumns,
101-
getSchemaVersion());
102+
getSchemaVersion(),
103+
allowSchemaMismatch);
102104
}
103105
}
104106

105107
protected StreamUploadSessionImpl.Slots slots;
106108
private boolean p2pMode = false;
107109
private List<Column> columns;
110+
private boolean checkLatestSchema;
108111

109112
public StreamUploadSessionImpl(Configuration conf,
110113
String projectName,
@@ -114,7 +117,8 @@ public StreamUploadSessionImpl(Configuration conf,
114117
boolean cretaPartition,
115118
long slotNum,
116119
List<Column> zorderColumns,
117-
String schemaVersion) throws TunnelException {
120+
String schemaVersion,
121+
boolean allowSchemaMismatch) throws TunnelException {
118122
this.config = conf;
119123
this.projectName = projectName;
120124
this.schemaName = schemaName;
@@ -123,6 +127,7 @@ public StreamUploadSessionImpl(Configuration conf,
123127
this.columns = zorderColumns;
124128
this.httpClient = Util.newRestClient(conf, projectName);
125129
this.schemaVersion = schemaVersion;
130+
this.checkLatestSchema = !allowSchemaMismatch;
126131
initiate(slotNum, cretaPartition);
127132
}
128133

@@ -275,6 +280,7 @@ private Connection getConnection(CompressOption compress, Slot slot, long size,
275280
if (!StringUtils.isNullOrEmpty(config.getQuotaName())) {
276281
params.put(TunnelConstants.PARAM_QUOTA_NAME, config.getQuotaName());
277282
}
283+
params.put(TunnelConstants.PARAM_CHECK_LATEST_SCHEMA, String.valueOf(checkLatestSchema));
278284

279285
switch (compress.algorithm) {
280286
case ODPS_RAW: {
@@ -352,7 +358,7 @@ public String writeBlock(ProtobufRecordPack pack, long timeout)
352358
}
353359
}
354360
});
355-
} catch (RuntimeException re) {
361+
} catch (RuntimeException | IOException re) {
356362
throw re;
357363
} catch (Exception e) {
358364
throw new IOException(e.getMessage(), e);
@@ -385,9 +391,15 @@ private String sendBlock(ProtobufRecordPack pack, Connection conn, Slot slot, lo
385391
}
386392
}
387393
if (!response.isOK()) {
388-
TunnelException exception =
389-
new TunnelException(response.getHeader(HEADER_ODPS_REQUEST_ID), conn.getInputStream(),
390-
response.getStatus());
394+
TunnelException exception = new TunnelException(response.getHeader(HEADER_ODPS_REQUEST_ID),
395+
conn.getInputStream(),
396+
response.getStatus());
397+
if (exception.getErrorCode().equals("SchemaModified") &&
398+
exception.getStatus().equals(412)) {
399+
throw new SchemaMismatchException("SchemaModified",
400+
response.getHeader(
401+
HttpHeaders.HEADER_ODPS_TUNNEL_LATEST_SCHEMA_VERSION));
402+
}
391403
throw new IOException(exception.getMessage(), exception);
392404
}
393405

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/tunnel/io/TunnelRetryHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,9 @@ public <T> T executeWithRetry(Callable<T> action, IntConsumer errorCodeHandler)
113113
if (!policy.shouldRetry(e, attempt)) {
114114
throw e;
115115
}
116-
retryLogger.onRetryLog(e, attempt, policy.getRetryWaitTime(attempt));
116+
if (retryLogger != null) {
117+
retryLogger.onRetryLog(e, attempt, policy.getRetryWaitTime(attempt));
118+
}
117119
try {
118120
policy.waitForNextRetry(attempt);
119121
if (errorCodeHandler != null && e instanceof TunnelException) {

0 commit comments

Comments
 (0)