Skip to content

Commit a50c9e5

Browse files
committed
prepare 0.50.2-public
1 parent e323f86 commit a50c9e5

File tree

11 files changed

+158
-12
lines changed

11 files changed

+158
-12
lines changed

CHANGELOG.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,27 @@
11
# Changelog
2+
## [0.50.2-public] - 2024-10-23
3+
### Features
4+
- **SQLExecutor** Enhanced MCQA 2.0 functionality:
5+
- `isActive` will return false, indicating that there are no active Sessions in MCQA 2.0 mode.
6+
- Added a `cancel` method to terminate ongoing jobs.
7+
- `getExecutionLog` now returns a deep copy of the current log and clears the current log,
8+
preventing duplicates.
9+
- New `quota` method in `SQLExecutorBuilder` allows reusing already loaded `Quota`, reducing
10+
load times.
11+
- New `regionId` method in `SQLExecutorBuilder` allows specifying the region where the quota is
12+
located.
13+
- **Quotas** Added `getWlmQuota` method with `regionId` parameter to fetch quota for a specified regionId.
14+
- **Quota** Introduced `setMcqaConnHeader` method to allow users to override quota using a custom
15+
McqaConnHeader, supporting MCQA 2.0.
16+
- **Instances** Added `get` method applicable for MCQA 2.0 jobs, requiring additional parameters for QuotaName
17+
and RegionId.
18+
- **Instance** Further adapted for MCQA 2.0 jobs.
19+
- **TableSchema** `basicallyEquals` method will no longer strictly check for identical Class types.
20+
### Optimization
21+
- **SQLExecutor** The `run` method's hints will now be deep-copied, preserving the user-provided Map and
22+
supporting immutable types (e.g., `ImmutableMap`).
23+
### Fixes
24+
- **Stream** Fixed potential SQL syntax errors in the `create` method.
225

326
## [0.50.1-public] - 2024-10-11
427

CHANGELOG_CN.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,23 @@
11
# 更新日志
22

3+
## [0.50.2-public] - 2024-10-23
4+
### 功能
5+
- **SQLExecutor** 增强 MCQA 2.0 功能:
6+
- `isActive` 将返回 false,指示在 MCQA 2.0 模式下没有活跃的 Session。
7+
- 新增 `cancel` 方法,用于中止正在执行的作业。
8+
- `getExecutionLog` 现在返回当前日志的深拷贝并清空当前日志,避免重复获取。
9+
-`SQLExecutorBuilder` 新增 `quota` 方法,支持复用已加载的 `Quota`,减少加载时间。
10+
-`SQLExecutorBuilder` 新增 `regionId` 方法,允许指定 quota 所在的 region。
11+
- **Quotas** 新增带 `regionId` 参数的 `getWlmQuota` 方法,用于获取指定 regionId 的 quota。
12+
- **Quota** 新增 `setMcqaConnHeader` 方法,支持用户通过自定义的 McqaConnHeader 重载 quota,以适配 MCQA 2.0。
13+
- **Instances** 新增适用于 MCQA 2.0 的 `get` 方法,需额外传入 MCQA 2.0 的 QuotaName 和 RegionId。
14+
- **Instance** 进一步适配 MCQA 2.0 作业。
15+
- **TableSchema** `basicallyEquals` 方法将不再严格检查两个类的 Class 类型一致性。
16+
### 优化
17+
- **SQLExecutor** `run` 方法中的 hints 现在会进行深拷贝,保护用户传入的 Map,支持不可变类型(如 `ImmutableMap`)。
18+
### 修复
19+
- **Stream** 修复 `create` 方法中的潜在 SQL 语法错误。
20+
321
## [0.50.1-public] - 2024-10-11
422

523
### 修复

odps-sdk/odps-sdk-commons/src/main/java/com/aliyun/odps/TableSchema.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ public boolean basicallyEquals(Object o) {
253253
if (this == o) {
254254
return true;
255255
}
256-
if (o == null || getClass() != o.getClass()) {
256+
if (!(o instanceof TableSchema)) {
257257
return false;
258258
}
259259
TableSchema that = (TableSchema) o;

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/Instance.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ public class SetInformationResult {
130130
private String project;
131131
private Map<String, TaskResult> results;
132132
private boolean isSync = false;
133+
private boolean isMcqaV2 = false;
133134

134135
private TaskStatusModel model;
135136

@@ -157,6 +158,10 @@ public class SetInformationResult {
157158

158159
this.odps = odps;
159160
this.client = odps.getRestClient();
161+
162+
if (getId().endsWith("_mcqa")) {
163+
setMcqaV2(true);
164+
}
160165
}
161166

162167
@Root(name = "Instance", strict = false)
@@ -297,7 +302,8 @@ private void reload(boolean isBlock) throws OdpsException {
297302
params.put("instancestatus", null);
298303
}
299304

300-
Response resp = client.request(getResource(), "GET", params, userDefinedHeaders, null);
305+
String resource = isMcqaV2 ? "/mcqa" + getResource() : getResource();
306+
Response resp = client.request(resource, "GET", params, userDefinedHeaders, null);
301307
model.owner = resp.getHeaders().get(Headers.ODPS_OWNER);
302308
String startTimeStr = resp.getHeaders().get(Headers.ODPS_START_TIME);
303309
String endTimeStr = resp.getHeaders().get(Headers.ODPS_END_TIME);
@@ -670,7 +676,7 @@ public TaskSummary getTaskSummary(String taskName) throws OdpsException {
670676
params.put("instancesummary", null);
671677
params.put("taskname", taskName);
672678
Response result = client.request(getResource(), "GET", params, null, null);
673-
679+
System.out.println(new String(result.getBody()));
674680
TaskSummary summary = null;
675681
try {
676682
Gson gson = GsonObjectBuilder.get();
@@ -855,6 +861,9 @@ public Map<String, TaskStatus> getTaskStatus() throws OdpsException {
855861
}
856862

857863
private boolean hasTaskStatus(TaskStatusModel model) {
864+
if (model.tasks == null || model.tasks.isEmpty()) {
865+
return false;
866+
}
858867
for (InstanceTaskModel taskModel : model.tasks) {
859868
TaskStatus status = new TaskStatus(taskModel);
860869
if (status.model.status == null) {
@@ -1391,6 +1400,9 @@ public String getLog(String workerId, LogType logType, int size) throws OdpsExce
13911400

13921401
/* Un-document */
13931402
public String getTaskDetailJson(String taskName) throws OdpsException {
1403+
if (isMcqaV2) {
1404+
return getTaskDetailJson2(taskName);
1405+
}
13941406
Map<String, String> params = new HashMap<String, String>();
13951407
params.put("instancedetail", null);
13961408
params.put("taskname", taskName);
@@ -1422,7 +1434,8 @@ public String getTaskDetailJson2(String taskName) throws OdpsException {
14221434
Map<String, String> params = new HashMap<String, String>();
14231435
params.put("detail", null);
14241436
params.put("taskname", taskName);
1425-
Response result = client.request(getResource(), "GET", params, null, null);
1437+
String resource = isMcqaV2 ? "/mcqa" + getResource() : getResource();
1438+
Response result = client.request(resource, "GET", params, null, null);
14261439
return new String(result.getBody());
14271440
}
14281441

@@ -1723,7 +1736,7 @@ public Status getStatus() {
17231736
* @throws OdpsException
17241737
*/
17251738
public InstanceQueueingInfo getQueueingInfo() throws OdpsException {
1726-
Map<String, String> params = new HashMap<String, String>();
1739+
Map<String, String> params = new HashMap<>();
17271740
params.put("cached", null);
17281741

17291742
Response resp = client.request(getResource(), "GET", params, null, null);
@@ -1744,6 +1757,12 @@ public void addUserDefinedHeaders(Map<String, String> headers) {
17441757
userDefinedHeaders.putAll(headers);
17451758
}
17461759

1760+
public void setMcqaV2(boolean mcqaV2) {
1761+
isMcqaV2 = mcqaV2;
1762+
client = odps.clone().getRestClient();
1763+
client.setPrefix("");
1764+
}
1765+
17471766
private Map<String, String> getCommonHeaders() {
17481767
Map<String, String> headers = new HashMap<>();
17491768
headers.put(Headers.CONTENT_TYPE, "application/xml");

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/Instances.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,35 @@ public Instance get(String projectName, String id) {
197197
return new Instance(projectName, model, null, odps);
198198
}
199199

200+
/**
201+
* 获取指定Instance(适用于 MCQA2.0)
202+
*
203+
* @param projectName
204+
* {@link Instance}所在的{@link Project}名称
205+
* @param id
206+
* Instance ID
207+
* @param quotaName
208+
* 指定的交互式quota
209+
* @param regionId
210+
* 指定的region,如果为null,则使用project的region
211+
* @return {@link Instance}对象
212+
* @throws OdpsException
213+
*/
214+
public Instance get(String projectName, String id, String quotaName, String regionId) throws OdpsException {
215+
Instance instance = get(projectName, id);
216+
if (id.endsWith("_mcqa") && StringUtils.isNullOrEmpty(quotaName)) {
217+
throw new IllegalArgumentException("quotaName cannot be null when get MCQA 2.0 instance");
218+
}
219+
if (!StringUtils.isNullOrEmpty(quotaName)) {
220+
Quota quota = odps.quotas().getWlmQuota(projectName, quotaName, regionId);
221+
if (quota.isInteractiveQuota()) {
222+
String mcqaConnHeader = quota.getMcqaConnHeader();
223+
instance.addUserDefinedHeaders(ImmutableMap.of(Headers.ODPS_MCQA_CONN, mcqaConnHeader));
224+
}
225+
}
226+
return instance;
227+
}
228+
200229

201230
/**
202231
* 判断指定 Instance 是否存在

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/Quota.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,18 @@ public String getMcqaConnHeader() {
612612
return mcqaConnHeader;
613613
}
614614

615+
public void setMcqaConnHeader(String mcqaConnHeader) {
616+
if (StringUtils.isNullOrEmpty(mcqaConnHeader)) {
617+
throw new IllegalArgumentException("McqaConnHeader cannot be null or empty.");
618+
}
619+
setLoaded(true);
620+
if (model == null) {
621+
model = new QuotaModel();
622+
}
623+
this.model.resourceSystemType = ResourceSystemType.FUXI_VW.name();
624+
this.mcqaConnHeader = mcqaConnHeader;
625+
}
626+
615627
public Map<String, String> getProperties() {
616628
lazyLoad();
617629
if (model.properties != null) {

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/Quotas.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,14 +92,18 @@ public Quota get(String regionId, String name) {
9292
}
9393

9494
public Quota getWlmQuota(String project, String name) throws OdpsException {
95+
return getWlmQuota(project, name, null);
96+
}
97+
98+
public Quota getWlmQuota(String project, String name, String regionId) throws OdpsException {
9599
if (StringUtils.isNullOrEmpty(name)) {
96100
throw new IllegalArgumentException("Argument 'name' cannot be null or empty");
97101
}
98102
if (StringUtils.isNullOrEmpty(project)) {
99103
throw new IllegalArgumentException("Argument 'project' cannot be null or empty");
100104
}
101105
String tenantId = odps.projects().get(project).getTenantId();
102-
return new Quota(odps, null, name, tenantId);
106+
return new Quota(odps, regionId, name, tenantId);
103107
}
104108

105109
public Quota get(String name) {

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/Streams.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public void create(StreamIdentifier identifier, TableIdentifier refTable, boolea
130130
StringBuilder sql = new StringBuilder();
131131
sql.append("CREATE STREAM ");
132132
if (ifNotExists) {
133-
sql.append("IF NOTEXISTS ");
133+
sql.append("IF NOT EXISTS ");
134134
}
135135
sql.append(identifier).append(" ON TABLE ");
136136
sql.append(refTable);
@@ -140,7 +140,7 @@ public void create(StreamIdentifier identifier, TableIdentifier refTable, boolea
140140
if (version != null) {
141141
sql.append(" VERSION AS OF ").append(version);
142142
}
143-
sql.append(" STMPROPERTIES(\"read_mode\"=\"").append(readMode.name().toLowerCase())
143+
sql.append(" STRMPROPERTIES(\"read_mode\"=\"").append(readMode.name().toLowerCase())
144144
.append("\")");
145145
if (StringUtils.isNotBlank(comment)) {
146146
sql.append(" COMMENT ").append(comment);

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/sqa/SQLExecutorBuilder.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.aliyun.odps.Instance;
77
import com.aliyun.odps.Odps;
88
import com.aliyun.odps.OdpsException;
9+
import com.aliyun.odps.Quota;
910
import com.aliyun.odps.table.utils.Preconditions;
1011

1112
/**
@@ -41,6 +42,8 @@ public class SQLExecutorBuilder {
4142
private boolean sessionSupportNonSelect = false;
4243
private boolean useMcqaV2 = false;
4344
private Integer offlineJobPriority = null;
45+
private String regionId = null;
46+
private Quota quota = null;
4447

4548
public static SQLExecutorBuilder builder() {
4649
return new SQLExecutorBuilder();
@@ -90,6 +93,12 @@ public SQLExecutorBuilder tunnelEndpoint(String tunnelEndpoint) {
9093
return this;
9194
}
9295

96+
public SQLExecutorBuilder quota(Quota quota) {
97+
this.quota = quota;
98+
this.quotaName = quota.getNickname();
99+
return this;
100+
}
101+
93102
public SQLExecutorBuilder quotaName(String quotaName) {
94103
this.quotaName = quotaName;
95104
return this;
@@ -175,6 +184,11 @@ public SQLExecutorBuilder offlineJobPriority(Integer offlineJobPriority) {
175184
return this;
176185
}
177186

187+
public SQLExecutorBuilder regionId(String regionId) {
188+
this.regionId = regionId;
189+
return this;
190+
}
191+
178192
public ExecuteMode getExecuteMode() {
179193
return executeMode;
180194
}
@@ -211,6 +225,10 @@ public String getQuotaName() {
211225
return quotaName;
212226
}
213227

228+
public Quota getQuota() {
229+
return quota;
230+
}
231+
214232
public SQLExecutorPool getPool() {
215233
return pool;
216234
}
@@ -262,4 +280,8 @@ public boolean isUseMcqaV2() {
262280
public Integer getOfflineJobPriority() {
263281
return offlineJobPriority;
264282
}
283+
284+
public String getRegionId() {
285+
return regionId;
286+
}
265287
}

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/sqa/SQLExecutorImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -779,6 +779,8 @@ public ResultSet getResultSet(Long offset, Long countLimit, Long sizeLimit, bool
779779
public void run(String sql, Map<String, String> hint) throws OdpsException {
780780
if (hint == null) {
781781
hint = new HashMap<>();
782+
} else {
783+
hint = new HashMap<>(hint);
782784
}
783785
queryInfo = new QueryInfo(sql, hint, executeMode);
784786
queryInfo.setCommandInfo(new CommandInfo(sql, hint));

odps-sdk/odps-sdk-core/src/main/java/com/aliyun/odps/sqa/v2/SQLExecutorImpl.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,11 @@ public SQLExecutorImpl(SQLExecutorBuilder builder)
8585
// each executor has a uuid
8686
this.id = UUID.randomUUID().toString();
8787
mcqaOdps.getRestClient().setPrefix(MCQA_PREFIX);
88-
Quota quota = odps.quotas().getWlmQuota(odps.getDefaultProject(), quotaNickName);
88+
Quota quota = builder.getQuota();
89+
if (quota == null) {
90+
quota = odps.quotas()
91+
.getWlmQuota(odps.getDefaultProject(), quotaNickName, builder.getRegionId());
92+
}
8993
String mcqaConnectionHeader = quota.getMcqaConnHeader();
9094

9195
if (!quota.isInteractiveQuota()) {
@@ -113,6 +117,8 @@ public SQLExecutorImpl(SQLExecutorBuilder builder)
113117
public void run(String sql, Map<String, String> hint) throws OdpsException {
114118
if (hint == null) {
115119
hint = new HashMap<>();
120+
} else {
121+
hint = new HashMap<>(hint);
116122
}
117123
queryInfo = new QueryInfo(sql, hint, ExecuteMode.INTERACTIVE);
118124
queryInfo.setCommandInfo(new CommandInfo(sql, hint));
@@ -133,8 +139,10 @@ public void run(String sql, Map<String, String> hint) throws OdpsException {
133139
Instance currentInstance =
134140
SQLTask.run(mcqaOdps, mcqaOdps.getDefaultProject(), sql, taskName, hint,
135141
null);
142+
currentInstance.setMcqaV2(true);
136143
queryInfo.setInstance(currentInstance, ExecuteMode.INTERACTIVE, null, null);
137144
queryInfo.setSelect(isSelect(sql));
145+
log.add("Successfully submitted MCQA 2.0 Job, ID: " + currentInstance.getId());
138146
}
139147

140148
@Override
@@ -174,12 +182,19 @@ public String getLogView() {
174182

175183
@Override
176184
public boolean isActive() {
177-
return true;
185+
// in mcqa 1.0, this method will check session status,
186+
// however, in mcqa 2.0, no session is created, and this method will always return false.
187+
return false;
178188
}
179189

180190
@Override
181191
public void cancel() throws OdpsException {
182-
// do nothing in mcqa2.0
192+
if (queryInfo != null) {
193+
Instance instance = queryInfo.getInstance();
194+
if (instance.getStatus() == Instance.Status.RUNNING) {
195+
instance.stop();
196+
}
197+
}
183198
}
184199

185200
@Override
@@ -202,7 +217,9 @@ public List<Instance.StageProgress> getProgress() throws OdpsException {
202217

203218
@Override
204219
public List<String> getExecutionLog() {
205-
return log;
220+
List<String> executionLog = new ArrayList<>(log);
221+
log.clear();
222+
return executionLog;
206223
}
207224

208225
@Override

0 commit comments

Comments
 (0)