@@ -19,37 +19,34 @@ sidebar_position: 5
19
19
首先,您需通过 ` TableTunnel ` 的 ` buildUpsertSession ` 方法初始化一个 Upsert 会话,并配置必要的参数,如项目名、表名、模式名(如果适用)以及分区规格等。
20
20
``` java
21
21
TableTunnel tunnel = odps. tableTunnel();
22
- UpsertSession upsert = tunnel. buildUpsertSession(projectName, tableName)
22
+ UpsertSession upsert = tunnel. buildUpsertSession(projectName, tableName)
23
23
.setSchemaName(schemaName)
24
24
.setPartitionSpec(partitionSpec)
25
25
.build();
26
- System . out. println(" Upsert Session ID: " + upsert. getId());
26
+ System . out. println(" Upsert Session ID: " + upsert. getId());
27
27
```
28
28
若需要重新加载已有的 Upsert 会话,可以使用会话ID进行重建。
29
29
``` java
30
30
if (reload) {
31
31
String id = upsert. getId();
32
32
upsert = tunnel. buildUpsertSession(projectName, tableName)
33
- .setSchemaName(schemaName)
34
- .setPartitionSpec(partitionSpec)
35
- .setUpsertId(id)
36
- .build();
37
- }
33
+ .setSchemaName(schemaName)
34
+ .setPartitionSpec(partitionSpec)
35
+ .setUpsertId(id)
36
+ .build();
37
+ }
38
38
```
39
39
40
40
## 执行 Upsert 操作
41
41
接下来,创建 ` UpsertStream ` 实例,并利用它执行具体的插入或更新记录操作。
42
42
``` java
43
- try ( UpsertStream stream = upsert. buildUpsertStream(). setListener(listener) . build()) {
43
+ UpsertStream stream = upsert. buildUpsertStream(). build()
44
44
Record record = upsert. newRecord();
45
45
record. setString(" key" , " 0" );
46
46
record. setString(" value" , " v1" );
47
47
stream. upsert(record); // 执行 Upsert
48
48
stream. delete(record); // 删除记录示例
49
49
stream. flush(); // 刷新到缓冲区
50
- } catch (Exception e) {
51
- e. printStackTrace();
52
- }
53
50
```
54
51
55
52
## 提交并关闭会话
@@ -71,38 +68,38 @@ upsert.close();
71
68
import ... ; // 导入必要的包
72
69
73
70
public class UpsertDeltaTableDemo {
74
- public static void main (String [] args ) {
75
- try {
76
- TableTunnel tunnel = TableTunnel . newInstance(" <Your_ODPS_Instance>" );
77
- String projectName = " <Your_Project_Name>" ;
78
- String tableName = " <Your_Table_Name>" ;
79
- UpsertSession upsert = tunnel. buildUpsertSession(projectName, tableName)
80
- .setSchemaName(" <Your_Schema_Name>" )
81
- .setPartitionSpec(" <Your_Partition_Spec>" )
82
- .setLifecycle(1 )
83
- .build();
71
+ public static void main (String [] args ) {
72
+ try {
73
+ Account account = new AliyunAccount (" <Your_Access_ID>" , " <Your_Access_Key>" );
74
+ Odps odps = new Odps (account);
84
75
85
- try (UpsertStream stream = upsert. buildUpsertStream(). build()) {
86
- Record record = upsert. newRecord();
87
- record. setString(" key" , " exampleKey" );
88
- record. setString(" value" , " exampleValue" );
89
- stream. upsert(record);
90
- stream. flush();
91
- }
76
+ TableTunnel tunnel = odps. tableTunnel();
77
+ String projectName = " <Your_Project_Name>" ;
78
+ String tableName = " <Your_Table_Name>" ;
79
+ try (TableTunnel . UpsertSession upsert = tunnel. buildUpsertSession(projectName, tableName)
80
+ .setSchemaName(" <Your_Schema_Name>" )
81
+ .setPartitionSpec(" <Your_Partition_Spec>" )
82
+ .build()) {
83
+ UpsertStream stream = upsert. buildUpsertStream(). build();
84
+ Record record = upsert. newRecord();
85
+ record. setString(" key" , " exampleKey" );
86
+ record. setString(" value" , " exampleValue" );
87
+ stream. upsert(record);
88
+ stream. flush();
92
89
93
- upsert. commit(false );
94
- System . out. println(" Commit successful." );
95
- upsert. close();
96
- } catch (Exception e) {
97
- e. printStackTrace();
98
- }
90
+ upsert. commit(false );
91
+ System . out. println(" Commit successful." );
92
+ }
93
+ } catch (Exception e) {
94
+ e. printStackTrace();
99
95
}
96
+ }
100
97
}
101
98
```
102
99
103
100
## 注意事项
104
101
- 由于主键表的写入特性,我们在并发写入同一张表(分区)时,应当谨慎地控制写入逻辑。如果存在多个并发,同时对同一主键进行写入,则可能发生不可预期的行为。
105
- 常用的方案是使用 shuffle by pk 操作,将相同主键的记录分配到同一个线程中进行写入。
102
+ 常用的方案是使用 shuffle by pk 操作,将相同主键的记录分配到同一个线程中进行写入。
106
103
107
104
## 相关链接
108
105
- [ UpsertSession API 参考] ( ../api-reference/tunnel/UpsertSession.md )
0 commit comments