Skip to content

Commit 3cd5b9e

Browse files
committed
feat(tunnel-examples): 改进 Schema Evolution 示例
1 parent 9e7fca8 commit 3cd5b9e

File tree

4 files changed

+78
-5
lines changed

4 files changed

+78
-5
lines changed

odps-examples/tunnel-examples/src/main/java/SchemaEvolution/StreamUploadIfSchemaEvolutionExpectedSample.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package SchemaEvolution;
22

3+
import java.util.List;
34
import java.util.concurrent.TimeUnit;
5+
import java.util.stream.Collectors;
46

7+
import com.aliyun.odps.Column;
58
import com.aliyun.odps.Instance;
69
import com.aliyun.odps.Odps;
710
import com.aliyun.odps.OdpsException;
@@ -108,7 +111,8 @@ public static void main(String[] args) throws Exception {
108111
// IllegalArgumentException will throw when the data is not compatible with the session schema
109112
// then should rebuild the session
110113
recordPack.flush();
111-
rebuildSessionUtilSchemaEvolution(odps, null);
114+
session = rebuildSessionUtilSchemaEvolution(odps, null);
115+
recordPack = session.newRecordPack();
112116
}
113117
// Append the record to the record pack for uploading
114118
recordPack.append(record);
@@ -146,8 +150,11 @@ private static TableTunnel.StreamUploadSession rebuildSessionUtilSchemaEvolution
146150
.setPartitionSpec(partition == null ? null : new PartitionSpec(partition))
147151
.allowSchemaMismatch(false)
148152
.build();
149-
} while (!odps.tables().get(project, table).getSchema()
150-
.basicallyEquals(session.getSchema()));
153+
System.out.println("Session Schema: " + debugString(session.getSchema()));
154+
System.out.println("Table Schema: " + debugString(odps.tables().get(project, table).getSchema()));
155+
156+
} while (!basicallyEquals(odps.tables().get(project, table).getSchema()
157+
, session.getSchema()));
151158
return session;
152159
}
153160
}
@@ -179,4 +186,30 @@ private static void triggerSchemaEvolution() throws OdpsException {
179186
System.out.println(getOdps().logview().generateLogView(instance, 24));
180187
instance.waitForSuccess();
181188
}
189+
190+
private static String debugString(TableSchema schema) {
191+
return schema.getAllColumns().stream()
192+
.map(column -> column.getName() + "(" + column.getTypeInfo().getTypeName() + ")")
193+
.collect(Collectors.joining(", "));
194+
}
195+
196+
/**
197+
* Check if two schemas are basically equal
198+
*/
199+
private static boolean basicallyEquals(TableSchema a, TableSchema b) {
200+
List<Column> columnsA = a.getAllColumns();
201+
List<Column> columnsB = b.getAllColumns();
202+
if (columnsA.size() != columnsB.size()) {
203+
return false;
204+
}
205+
for (int i = 0; i < columnsA.size(); i++) {
206+
Column columnA = columnsA.get(i);
207+
Column columnB = columnsB.get(i);
208+
if (!columnA.getName().equals(columnB.getName()) || !columnA.getTypeInfo().getTypeName()
209+
.equals(columnB.getTypeInfo().getTypeName())) {
210+
return false;
211+
}
212+
}
213+
return true;
214+
}
182215
}

odps-examples/tunnel-examples/src/main/java/SchemaEvolution/StreamUploadIfSchemaEvolutionUnexpectedSample.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package SchemaEvolution;
22

3+
import java.util.List;
34
import java.util.concurrent.TimeUnit;
5+
import java.util.stream.Collectors;
46

7+
import com.aliyun.odps.Column;
58
import com.aliyun.odps.Instance;
69
import com.aliyun.odps.Odps;
710
import com.aliyun.odps.OdpsException;
@@ -158,8 +161,11 @@ private static TableTunnel.StreamUploadSession rebuildSessionUtilSchemaEvolution
158161
.setPartitionSpec(partition == null ? null : new PartitionSpec(partition))
159162
.allowSchemaMismatch(false)
160163
.build();
161-
} while (!odps.tables().get(project, table).getSchema()
162-
.basicallyEquals(session.getSchema()));
164+
System.out.println("Session Schema: " + debugString(session.getSchema()));
165+
System.out.println("Table Schema: " + debugString(odps.tables().get(project, table).getSchema()));
166+
167+
} while (!basicallyEquals(odps.tables().get(project, table).getSchema()
168+
, session.getSchema()));
163169
return session;
164170
}
165171
}
@@ -191,4 +197,30 @@ private static void triggerSchemaEvolution() throws OdpsException {
191197
System.out.println(getOdps().logview().generateLogView(instance, 24));
192198
instance.waitForSuccess();
193199
}
200+
201+
private static String debugString(TableSchema schema) {
202+
return schema.getAllColumns().stream()
203+
.map(column -> column.getName() + "(" + column.getTypeInfo().getTypeName() + ")")
204+
.collect(Collectors.joining(", "));
205+
}
206+
207+
/**
208+
* Check if two schemas are basically equal
209+
*/
210+
private static boolean basicallyEquals(TableSchema a, TableSchema b) {
211+
List<Column> columnsA = a.getAllColumns();
212+
List<Column> columnsB = b.getAllColumns();
213+
if (columnsA.size() != columnsB.size()) {
214+
return false;
215+
}
216+
for (int i = 0; i < columnsA.size(); i++) {
217+
Column columnA = columnsA.get(i);
218+
Column columnB = columnsB.get(i);
219+
if (!columnA.getName().equals(columnB.getName()) || !columnA.getTypeInfo().getTypeName()
220+
.equals(columnB.getTypeInfo().getTypeName())) {
221+
return false;
222+
}
223+
}
224+
return true;
225+
}
194226
}

odps-examples/tunnel-examples/src/main/java/TunnelStreamingUploadExample.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
import java.io.IOException;
22
import java.util.concurrent.TimeUnit;
33

4+
import com.aliyun.odps.Instance;
5+
import com.aliyun.odps.Odps;
6+
import com.aliyun.odps.OdpsException;
7+
import com.aliyun.odps.PartitionSpec;
8+
import com.aliyun.odps.TableSchema;
49
import com.aliyun.odps.account.Account;
510
import com.aliyun.odps.account.AliyunAccount;
611
import com.aliyun.odps.account.StsAccount;

odps-examples/tunnel-examples/src/main/java/TunnelTimeDataSample.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
import com.aliyun.odps.TableSchema;
1717
import com.aliyun.odps.account.Account;
1818
import com.aliyun.odps.account.AliyunAccount;
19+
import com.aliyun.odps.data.ArrayRecord;
20+
import com.aliyun.odps.data.RecordReader;
21+
import com.aliyun.odps.data.RecordWriter;
1922
import com.aliyun.odps.tunnel.TableTunnel;
2023
import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;
2124
import com.aliyun.odps.tunnel.TableTunnel.UploadSession;

0 commit comments

Comments
 (0)