
Commit 9e7fca8

committed
feat(tunnel-examples): add Schema Evolution examples
1 parent 1376809 commit 9e7fca8

3 files changed: 378 additions & 2 deletions

odps-examples/tunnel-examples/pom.xml

Lines changed: 2 additions & 2 deletions
@@ -11,12 +11,12 @@
     <dependency>
       <groupId>com.aliyun.odps</groupId>
       <artifactId>odps-sdk-commons</artifactId>
-      <version>0.50.0-public</version>
+      <version>0.50.1-public</version>
     </dependency>
     <dependency>
       <groupId>com.aliyun.odps</groupId>
       <artifactId>odps-sdk-core</artifactId>
-      <version>0.50.0-public</version>
+      <version>0.50.1-public</version>
     </dependency>
   </dependencies>
 </project>

StreamUploadIfSchemaEvolutionExpectedSample.java (new file)

Lines changed: 182 additions & 0 deletions
@@ -0,0 +1,182 @@
package SchemaEvolution;

import java.util.concurrent.TimeUnit;

import com.aliyun.odps.Instance;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.account.StsAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.task.SQLTask;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.utils.StringUtils;

/**
 * Demonstrates how to upload data to ODPS with the streaming upload API when schema evolution
 * is expected by the writer.
 * <p>
 * The scenario covered here: while client A is continuously writing, client B triggers schema
 * evolution, and the upstream data switches to the new schema.
 * <p>
 * This example uses Access Key authentication.
 * <p>
 * Note: Make sure to replace the placeholders with your actual ODPS credentials and parameters.
 * <p>
 * Available only in odps-sdk >= 0.51.0.
 */
public class StreamUploadIfSchemaEvolutionExpectedSample {

  // Replace placeholders with your own credentials and parameters
  private static String accessId;   // Your Access ID
  private static String accessKey;  // Your Access Key
  private static String odpsUrl;    // Your ODPS endpoint URL
  private static String project;    // Your ODPS project name
  private static String table;      // The table to upload data to
  // Specify the partition if the target table is partitioned.
  // If the table is not partitioned, set this to null or an empty string.
  private static String partition;

  /**
   * Initializes the ODPS client using Access Key authentication.
   * You can also use STS authentication via {@link StsAccount}.
   *
   * @return the configured ODPS client instance.
   */
  private static Odps getOdps() {
    // Create an AliyunAccount instance with your Access ID and Access Key
    Account account = new AliyunAccount(accessId, accessKey);
    // Initialize ODPS with the account credentials
    Odps odps = new Odps(account);
    // Set the endpoint for the ODPS client
    odps.setEndpoint(odpsUrl);
    // Set the default project for operations
    odps.setDefaultProject(project);
    // Return the configured ODPS client instance
    return odps;
  }

  public static void main(String[] args) throws Exception {
    // Get the initialized ODPS client
    Odps odps = getOdps();
    createTestTable();
    try {
      // Initialize a stream upload session for the specified table
      TableTunnel.StreamUploadSession session =
          odps.tableTunnel()
              .buildStreamUploadSession(project, table)
              .setPartitionSpec(partition == null ? null : new PartitionSpec(partition))
              .allowSchemaMismatch(false)
              .build();
      // Record the schema version
      System.out.println(session.getSchemaVersion());
      // Create a new record pack for batch operations
      TableTunnel.StreamRecordPack recordPack = session.newRecordPack();

      // Loop to create and append multiple records
      for (int i = 0; i < 100; i++) {
        // Create a new record instance for the session
        Record record = session.newRecord();
        // Set values for each column in the record
        record.setBigint("c1", (long) i);        // First column
        record.setBigint("c2", (long) (i * 2));  // Second column
        // Append the record to the record pack for uploading
        recordPack.append(record);
      }
      // Wait 3 seconds so as not to take up too many resources
      TimeUnit.SECONDS.sleep(3);

      // Upstream triggers schema evolution and starts sending new data (three columns) downstream
      triggerSchemaEvolution();

      // If the client can detect that schema evolution has occurred, for example because it
      // receives a 'SchemaChangeEvent', it can rebuild the session directly.
      // TODO: rebuildSessionUtilSchemaEvolution(odps, null);

      // Loop to create and append multiple records
      for (int i = 0; i < 100; i++) {
        // Create a new record instance for the session
        Record record = session.newRecord();
        // Set values for each column in the record
        try {
          record.setBigint("c1", (long) i);        // First column
          record.setBigint("c2", (long) (i * 2));  // Second column
          record.setBigint("c3", (long) (i * 2));  // New column
        } catch (IllegalArgumentException e) {
          // An IllegalArgumentException is thrown when the data is not compatible with the
          // session schema; flush what has been buffered, then rebuild the session.
          recordPack.flush();
          session = rebuildSessionUtilSchemaEvolution(odps, null);
          recordPack = session.newRecordPack();
          // Re-create the record against the rebuilt session and set all three columns
          record = session.newRecord();
          record.setBigint("c1", (long) i);
          record.setBigint("c2", (long) (i * 2));
          record.setBigint("c3", (long) (i * 2));
        }
        // Append the record to the record pack for uploading
        recordPack.append(record);
      }
      recordPack.flush();
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      // Any cleanup code can be placed here.
      // In this example there is no specific resource to close, but it's good practice.
    }
  }

  private static TableTunnel.StreamUploadSession rebuildSessionUtilSchemaEvolution(Odps odps,
                                                                                   String latestSchemaVersion)
      throws Exception {
    if (!StringUtils.isNullOrEmpty(latestSchemaVersion)) {
      return odps.tableTunnel()
          .buildStreamUploadSession(project, table)
          .setPartitionSpec(partition == null ? null : new PartitionSpec(partition))
          .allowSchemaMismatch(false)
          .setSchemaVersion(latestSchemaVersion)
          .build();
    } else {
      /*
       * If the schema has changed but no new schema version is available (this may occur when
       * the server has not been updated; this branch is usually not taken), re-create the
       * session and check whether it has picked up the new schema.
       */
      TableTunnel.StreamUploadSession session;
      do {
        session =
            odps.tableTunnel()
                .buildStreamUploadSession(project, table)
                .setPartitionSpec(partition == null ? null : new PartitionSpec(partition))
                .allowSchemaMismatch(false)
                .build();
      } while (!odps.tables().get(project, table).getSchema()
          .basicallyEquals(session.getSchema()));
      return session;
    }
  }

  /**
   * Creates a test table with two columns: c1 bigint and c2 bigint.
   */
  private static void createTestTable() throws OdpsException {
    getOdps().tables().delete(project, table, true);
    getOdps()
        .tables()
        .newTableCreator(project, table, TableSchema.builder()
            .withBigintColumn("c1")
            .withBigintColumn("c2")
            .build())
        .ifNotExists()
        .debug()
        .create();
  }

  /**
   * Triggers schema evolution on the table. Here we run a SQL statement to add a column.
   */
  private static void triggerSchemaEvolution() throws OdpsException {
    Instance instance =
        SQLTask.run(getOdps(), "alter table " + project + "." + table + " add column c3 bigint;");
    // Print the Logview URL to check the progress of the schema evolution
    System.out.println(getOdps().logview().generateLogView(instance, 24));
    instance.waitForSuccess();
  }
}
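
The sample above (and the one that follows) leaves the credential fields as hard-coded placeholders to be edited in the source. As a convenience, a minimal sketch of reading them from environment variables instead is shown below; the class name, environment variable names, and the idea itself are illustrative assumptions and not part of this commit, and only Odps/AliyunAccount calls already used in the samples appear here.

package SchemaEvolution;

import com.aliyun.odps.Odps;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;

// Sketch only: builds the ODPS client from environment variables instead of the
// hard-coded placeholders used by the samples. Variable names are assumptions.
public class OdpsFromEnv {

  public static Odps build() {
    // Read connection parameters from the environment (names are illustrative)
    String accessId = System.getenv("ALIYUN_ACCESS_ID");
    String accessKey = System.getenv("ALIYUN_ACCESS_KEY");
    String odpsUrl = System.getenv("ODPS_ENDPOINT");
    String project = System.getenv("ODPS_PROJECT");

    // Same initialization sequence as getOdps() in the samples
    Account account = new AliyunAccount(accessId, accessKey);
    Odps odps = new Odps(account);
    odps.setEndpoint(odpsUrl);
    odps.setDefaultProject(project);
    return odps;
  }
}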

StreamUploadIfSchemaEvolutionUnexpectedSample.java (new file)

Lines changed: 194 additions & 0 deletions
@@ -0,0 +1,194 @@
package SchemaEvolution;

import java.util.concurrent.TimeUnit;

import com.aliyun.odps.Instance;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.account.StsAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.exceptions.SchemaMismatchException;
import com.aliyun.odps.task.SQLTask;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.utils.StringUtils;

/**
 * Demonstrates how to upload data to ODPS with the streaming upload API when schema evolution
 * is not expected by the writer.
 * <p>
 * The scenario covered here: while client A is continuously writing, client B triggers schema
 * evolution, but the upstream data keeps using the original schema (usually an unintended
 * situation, for example another user running a DDL task).
 * <p>
 * This example uses Access Key authentication.
 * <p>
 * Note: Make sure to replace the placeholders with your actual ODPS credentials and parameters.
 * <p>
 * Available only in odps-sdk >= 0.51.0.
 */
public class StreamUploadIfSchemaEvolutionUnexpectedSample {

  // Replace placeholders with your own credentials and parameters
  private static String accessId;   // Your Access ID
  private static String accessKey;  // Your Access Key
  private static String odpsUrl;    // Your ODPS endpoint URL
  private static String project;    // Your ODPS project name
  private static String table;      // The table to upload data to
  // Specify the partition if the target table is partitioned.
  // If the table is not partitioned, set this to null or an empty string.
  private static String partition;

  /**
   * Initializes the ODPS client using Access Key authentication.
   * You can also use STS authentication via {@link StsAccount}.
   *
   * @return the configured ODPS client instance.
   */
  private static Odps getOdps() {
    // Create an AliyunAccount instance with your Access ID and Access Key
    Account account = new AliyunAccount(accessId, accessKey);
    // Initialize ODPS with the account credentials
    Odps odps = new Odps(account);
    // Set the endpoint for the ODPS client
    odps.setEndpoint(odpsUrl);
    // Set the default project for operations
    odps.setDefaultProject(project);
    // Return the configured ODPS client instance
    return odps;
  }

  public static void main(String[] args) throws Exception {
    // Get the initialized ODPS client
    Odps odps = getOdps();
    createTestTable();
    try {
      // Suppose we have a data import job that continuously writes data to the table.
      Thread dataWritingThread = new Thread(() -> {
        try {
          // Initialize a stream upload session for the specified table
          TableTunnel.StreamUploadSession session =
              odps.tableTunnel()
                  .buildStreamUploadSession(project, table)
                  .setPartitionSpec(partition == null ? null : new PartitionSpec(partition))
                  .allowSchemaMismatch(false)
                  .build();
          // Record the schema version
          System.out.println(session.getSchemaVersion());
          // Create a new record pack for batch operations
          TableTunnel.StreamRecordPack recordPack = session.newRecordPack();
          // Loop 30 times
          for (int j = 0; j < 30; j++) {
            // Loop to create and append multiple records
            for (int i = 0; i < 100; i++) {
              // Create a new record instance for the session
              Record record = session.newRecord();
              // Set values for each column in the record
              record.setBigint("c1", (long) i);        // First column
              record.setBigint("c2", (long) (i * 2));  // Second column
              // Append the record to the record pack for uploading
              recordPack.append(record);
            }
            // Wait 3 seconds so as not to take up too many resources
            TimeUnit.SECONDS.sleep(3);
            try {
              // Flush the record pack to upload all records at once.
              // When the table schema has changed, this method throws the exception below.
              recordPack.flush();
              System.out.println("flush success");
            } catch (SchemaMismatchException sme) {
              // If the schema has changed, the user gets a SchemaMismatchException.
              // FIXME: User intervention is actually required here; the sample only demonstrates
              // how to resume writing. The user should re-create the session using the new
              // schema version. In this example, we simply continue the loop.
              String newSchemaVersion = sme.getLatestSchemaVersion();
              session = rebuildSessionUtilSchemaEvolution(odps, newSchemaVersion);
              recordPack = session.newRecordPack();
            } catch (Exception e) {
              e.printStackTrace();
              break;
            }
          }
        } catch (Exception e) {
          e.printStackTrace();
        }
      });
      // At the same time, another user changes the schema of the table...
      Thread schemaEvolutionThread = new Thread(() -> {
        try {
          TimeUnit.SECONDS.sleep(3);
          triggerSchemaEvolution();
        } catch (Exception e) {
          e.printStackTrace();
        }
      });
      dataWritingThread.start();
      schemaEvolutionThread.start();
      dataWritingThread.join();
      schemaEvolutionThread.join();
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      // Any cleanup code can be placed here.
      // In this example there is no specific resource to close, but it's good practice.
    }
  }

  private static TableTunnel.StreamUploadSession rebuildSessionUtilSchemaEvolution(Odps odps,
                                                                                   String latestSchemaVersion)
      throws Exception {
    if (!StringUtils.isNullOrEmpty(latestSchemaVersion)) {
      return odps.tableTunnel()
          .buildStreamUploadSession(project, table)
          .setPartitionSpec(partition == null ? null : new PartitionSpec(partition))
          .allowSchemaMismatch(false)
          .setSchemaVersion(latestSchemaVersion)
          .build();
    } else {
      /*
       * If the schema has changed but no new schema version is available (this may occur when
       * the server has not been updated; this branch is usually not taken), re-create the
       * session and check whether it has picked up the new schema.
       */
      TableTunnel.StreamUploadSession session;
      do {
        session =
            odps.tableTunnel()
                .buildStreamUploadSession(project, table)
                .setPartitionSpec(partition == null ? null : new PartitionSpec(partition))
                .allowSchemaMismatch(false)
                .build();
      } while (!odps.tables().get(project, table).getSchema()
          .basicallyEquals(session.getSchema()));
      return session;
    }
  }

  /**
   * Creates a test table with two columns: c1 bigint and c2 bigint.
   */
  private static void createTestTable() throws OdpsException {
    getOdps().tables().delete(project, table, true);
    getOdps()
        .tables()
        .newTableCreator(project, table, TableSchema.builder()
            .withBigintColumn("c1")
            .withBigintColumn("c2")
            .build())
        .ifNotExists()
        .debug()
        .create();
  }

  /**
   * Triggers schema evolution on the table. Here we run a SQL statement to add a column.
   */
  private static void triggerSchemaEvolution() throws OdpsException {
    Instance instance =
        SQLTask.run(getOdps(), "alter table " + project + "." + table + " add column c3 bigint;");
    // Print the Logview URL to check the progress of the schema evolution
    System.out.println(getOdps().logview().generateLogView(instance, 24));
    instance.waitForSuccess();
  }
}
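
Both samples react to schema evolution by rebuilding the stream upload session. A minimal sketch of factoring that flush-and-rebuild step into a standalone helper is shown below; the class and method names are illustrative assumptions (not part of this commit), and only SDK calls that already appear in the samples are used. Note that records buffered in a pack whose flush failed are not re-sent automatically; the caller decides whether to re-append them to a pack created from the returned session.

package SchemaEvolution;

import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.exceptions.SchemaMismatchException;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.utils.StringUtils;

// Sketch only: wraps the flush-and-rebuild pattern used by the samples above in one helper.
public class FlushOrRebuildHelper {

  public static TableTunnel.StreamUploadSession flushOrRebuild(
      Odps odps, String project, String table, String partition,
      TableTunnel.StreamUploadSession session,
      TableTunnel.StreamRecordPack recordPack) throws Exception {
    try {
      // Upload the buffered records; throws SchemaMismatchException if the table schema changed
      recordPack.flush();
      return session;
    } catch (SchemaMismatchException sme) {
      String latest = sme.getLatestSchemaVersion();
      if (!StringUtils.isNullOrEmpty(latest)) {
        // Bind a fresh session to the schema version reported by the server
        return odps.tableTunnel()
            .buildStreamUploadSession(project, table)
            .setPartitionSpec(partition == null ? null : new PartitionSpec(partition))
            .allowSchemaMismatch(false)
            .setSchemaVersion(latest)
            .build();
      }
      // No version reported: re-create the session until it reflects the current table schema,
      // mirroring rebuildSessionUtilSchemaEvolution in the samples above
      TableTunnel.StreamUploadSession rebuilt;
      do {
        rebuilt = odps.tableTunnel()
            .buildStreamUploadSession(project, table)
            .setPartitionSpec(partition == null ? null : new PartitionSpec(partition))
            .allowSchemaMismatch(false)
            .build();
      } while (!odps.tables().get(project, table).getSchema()
          .basicallyEquals(rebuilt.getSchema()));
      return rebuilt;
    }
  }
}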
