Skip to content

Commit f6db44a

Browse files
committed
docs: 更新 UpsertSession 初始化文档并添加示例代码
1 parent f91534f commit f6db44a

File tree

6 files changed

+310
-47
lines changed

6 files changed

+310
-47
lines changed

.gitignore

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,34 @@
22
.classpath
33
.project
44
*.iml
5-
.idea
65
target/
76
.DS_Store
87
*.pyc
98
replace_config.sh
109
ajcore.*
1110
.vscode/
11+
12+
# Dependencies
13+
/docs/node_modules
14+
15+
# Production
16+
/docs/build
17+
18+
# Generated files
19+
.docusaurus
20+
.cache-loader
21+
22+
# Misc
23+
.DS_Store
24+
.env.local
25+
.env.development.local
26+
.env.test.local
27+
.env.production.local
28+
29+
npm-debug.log*
30+
yarn-debug.log*
31+
yarn-error.log*
32+
33+
# IDE
34+
.idea
35+
.idea/*

docs/docs/api-reference/tunnel/TableTunnel.md

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ sidebar_position: 3
1515

1616
- `StreamUploadSession`
1717
- `UploadSession`
18-
- `UpsertSession`
18+
- [UpsertSession](#初始化upsertsession)
1919

2020
用户可以根据业务需求选择不同的写入类,以满足不同场景下的需求。下面提供了一个快速选择写入类的流程图。
2121

@@ -81,15 +81,29 @@ sidebar_position: 3
8181
### 待完善
8282

8383
## 初始化
84-
### 待完善
8584

86-
在使用 `TableTunnel` 之前,需要进行初始化操作,包括设置访问ID、访问密钥、ODPS和Tunnel的端点等。
85+
在使用 `TableTunnel` 之前,需要进行 Odps 对象的初始化操作,对 Odps 对象的初始化操作参考[初始化 Odps 对象](../../example-code/init-odps-client.md)
86+
87+
根据是否需要自定义TableTunnel的配置,可以使用如下两个接口来初始化TableTunnel。
88+
89+
**无参初始化**
90+
```java
91+
TableTunnel tunnel = odps.tableTunnel();
92+
```
93+
94+
**使用配置文件进行初始化**
95+
```java
96+
// com.aliyun.odps.tunnel.Configuration;
97+
Configuration configuration=Configuration.builder(odps).build();
98+
TableTunnel tunnel = odps.tableTunnel(configuration);
99+
```
100+
有关配置文件的配置项,请参考[Configuration 文档](Configuration.md)
101+
102+
103+
## 初始化UpsertSession
87104

105+
`TableTunnel` 提供了 `buildUpsertSession` 方法来初始化一个 Upsert 会话的 Builder。
88106
```java
89-
Account account = new AliyunAccount("<your access id>", "<your access key>");
90-
Odps odps = new Odps(account);
91-
odps.setEndpoint("<your odps endpoint>");
92-
odps.setDefaultProject("<your project>");
93-
TableTunnel tunnel = new TableTunnel(odps);
94-
tunnel.setEndpoint("<your tunnel endpoint>");
107+
TableTunnel.UpsertSession.Builder buildUpsertSession(String projectName, String tableName);
95108
```
109+
有关 Upsert 会话 Builder 的使用方法和全部配置项,请参考[UpsertSession 初始化](UpsertSession.md#builder-接口)

docs/docs/api-reference/tunnel/UpsertSession.md

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,29 +3,32 @@ title: UpsertSession
33
sidebar_position: 4
44
---
55

6-
# UpsertSession 接口文档
6+
# UpsertSession
77

88
## 概述
99

1010
`UpsertSession` 接口用于管理和操作数据插入或更新的会话。通过该接口,用户可以获取会话的相关信息、提交或中止会话、创建新的记录对象等。
1111

12-
## 方法
12+
## 初始化
13+
通常由[TableTunnel#buildUpsertSession](TableTunnel.md#初始化upsertsession)方法创建 Builder,
14+
[Builder接口](UpsertSession.md#builder-接口)的build方法来构建 UpsertSession。
1315

16+
## 方法
1417
### 获取Session ID
1518

1619
```java
1720
String getId();
1821
```
1922

20-
返回当前会话的ID。
23+
返回当前会话的ID,可以基于这个ID进行重建会话操作
2124

2225
### 获取当前 Quota
2326

2427
```java
2528
public String getQuotaName();
2629
```
2730

28-
返回当前的配额名称
31+
返回当前使用的Quota名称,注意不是QuotaNickName
2932

3033
### 获取Session状态
3134

@@ -34,20 +37,20 @@ String getStatus() throws TunnelException;
3437
```
3538

3639
返回当前会话的状态码,可能的状态包括:
37-
- normal
38-
- committing
39-
- committed
40-
- expired
41-
- critical
42-
- aborted
40+
- normal 正常
41+
- committing 提交中
42+
- committed 已提交
43+
- expired 过期
44+
- critical 错误
45+
- aborted 已中止
4346

4447
### 获取表结构
4548

4649
```java
4750
TableSchema getSchema();
4851
```
4952

50-
返回当前会话的表结构
53+
返回当前会话写入的表的表结构
5154

5255
### 提交UpsertSession
5356

@@ -56,6 +59,7 @@ void commit(boolean async) throws TunnelException;
5659
```
5760

5861
提交当前会话。可以选择是否异步提交。
62+
当选择异步提交时,提交操作像服务端发送提交请求后立即返回,不会等待数据提交完成,这也意味着数据不会立即可见。
5963

6064
### 中止UpsertSession
6165

@@ -81,6 +85,9 @@ Record newRecord();
8185

8286
创建并返回一个新的 `Record` 对象。
8387

88+
这个`Record``UpsertRecord`的实例,这个实例包含执行upsert操作必要的一些信息。
89+
因此当使用Upsert操作时,永远记得使用这个方法,来获取`Record`的实例。
90+
8491
### 构建UpsertStream
8592

8693
```java
@@ -89,10 +96,14 @@ UpsertStream.Builder buildUpsertStream();
8996

9097
返回一个用于构建 `UpsertStream``Builder` 对象。
9198

99+
`UpsertStream` 是用于执行数据插入或更新操作的核心接口,详见 [UpsertStream 接口文档](UpsertStream.md)
100+
92101
## Builder 接口
93102

94103
`UpsertSession.Builder` 接口用于构建 `UpsertSession` 对象。
95104

105+
通常由[TableTunnel#buildUpsertSession](TableTunnel.md#初始化upsertsession) 方法创建。
106+
96107
### 获取和设置Upsert ID
97108

98109
```java
@@ -139,7 +150,7 @@ UpsertSession.Builder setCommitTimeout(long commitTimeoutMs);
139150

140151
获取和设置提交超时时间(毫秒)。
141152

142-
### 设置网络线程数
153+
### 设置Netty进行网络IO的线程数
143154

144155
```java
145156
UpsertSession.Builder setNetworkThreadNum(int threadNum);
@@ -178,7 +189,7 @@ long getLifecycle();
178189
UpsertSession.Builder setLifecycle(long lifecycle);
179190
```
180191

181-
获取和设置Session生命周期(小时),有效值域为1 - 24,指定有效值域以外的值该参数会被忽略,使用服务端默认值。
192+
获取和设置Session生命周期(小时),有效取值范围为1 - 24,指定有效取值范围以外的值该参数会被忽略,使用服务端默认值。
182193

183194
### 构建UpsertSession
184195

docs/docs/api-reference/tunnel/UpsertStream.md

Lines changed: 120 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
---
2-
title: UpsertStreamImpl
2+
title: UpsertStream
33
sidebar_position: 5
44
---
55

6-
# UpsertStream 类文档
6+
# UpsertStream
77

88
## 概述
99

10-
在当前版本,`UpsertStreamImpl` 类是 `UpsertStream`接口唯一的实现类,因此本文主要介绍`UpsertStreamImpl`的类接口。
10+
在当前版本,`UpsertStreamImpl` 类是 `UpsertStream`接口唯一的实现类,因此本文主要介绍`UpsertStreamImpl`
11+
的类接口。
1112
通过该类,用户可以对`Delta Table`进行数据的插入、更新、删除操作,并支持数据的缓冲和批量提交。
1213

1314
## 构造方法
@@ -27,16 +28,18 @@ public UpsertStreamImpl(Builder builder)throws IOException,TunnelException
2728

2829
```java
2930
@Override
30-
public void upsert(Record record) throws IOException,TunnelException
31+
public void upsert(Record record)throws IOException,TunnelException
3132
```
3233

3334
插入或更新一条记录。
3435

3536
参数列表:
36-
record:**注:参数`record`应当是`UpsertRecord`的实例**,这个实例可以通过 [`UpsertSession#newRecord()`](UpsertSession.md#创建一个record对象) 方法创建。
37+
record:**注:参数`record`应当是`UpsertRecord`的实例**
38+
,这个实例可以通过 [`UpsertSession#newRecord()`](UpsertSession.md#创建一个record对象) 方法创建。
3739

40+
:::info
3841
`UpsertRecord`内部维护了一些元数据隐藏列,因此用户不应当手动创建`ArrayRecord`对象将其传入。
39-
42+
:::
4043

4144
### 插入或更新部分列
4245

@@ -47,7 +50,8 @@ public void upsert(Record record,List<String> upsertCols)throws IOException,Tunn
4750

4851
插入或更新一条记录的部分列。
4952

50-
record : `UpsertRecord`的实例,这个实例可以通过 [`UpsertSession#newRecord()`](UpsertSession.md#创建一个record对象) 方法创建。
53+
record : `UpsertRecord`
54+
的实例,这个实例可以通过 [`UpsertSession#newRecord()`](UpsertSession.md#创建一个record对象) 方法创建。
5155

5256
upsertCols : 一个包含要更新的列名的列表。
5357

@@ -60,7 +64,8 @@ public void delete(Record record)throws IOException,TunnelException
6064

6165
删除一条记录。
6266

63-
record : `UpsertRecord`的实例,这个实例可以通过 [`UpsertSession#newRecord()`](UpsertSession.md#创建一个record对象) 方法创建。
67+
record : `UpsertRecord`
68+
的实例,这个实例可以通过 [`UpsertSession#newRecord()`](UpsertSession.md#创建一个record对象) 方法创建。
6469

6570
### 刷新缓冲区
6671

@@ -89,24 +94,116 @@ public void reset()throws IOException
8994

9095
重置缓冲区,清空所有未提交的数据。
9196

97+
## 重试逻辑
98+
99+
UpsertStream 通过用户传入 `Listener` 监听器来处理重试逻辑。当用户不传入 `Listener` 时,UpsertStream
100+
将使用一个默认的 `Listener` 实例(`UpsertSessionImpl.DefaultUpsertSteamListener`)
101+
102+
这个默认的`Listener`实例,遵循`TableTunnel`默认的[重试逻辑](RetryLogic.md)
103+
104+
:::tip
105+
十分推荐当用户实现自定义`Listener`时,继承`UpsertSessionImpl.DefaultUpsertSteamListener`
106+
,以使`TableTunnel`的重试逻辑生效。
107+
:::
108+
92109

93110
## Builder 类
94111

95-
`UpsertStreamImpl.Builder` 类用于构建 `UpsertStreamImpl` 对象。通常由 [`UpsertSession#buildUpsertStream()`](UpsertSession.md#构建upsertstream) 方法调用。
96-
97-
### 方法
98-
99-
- `public Builder setSession(UpsertSessionImpl session)`
100-
- `public UpsertSessionImpl getSession()`
101-
- `public long getMaxBufferSize()`
102-
- `public Builder setMaxBufferSize(long maxBufferSize)`
103-
- `public long getSlotBufferSize()`
104-
- `public Builder setSlotBufferSize(long slotBufferSize)`
105-
- `@Override public CompressOption getCompressOption()`
106-
- `@Override public Builder setCompressOption(CompressOption compressOption)`
107-
- `@Override public Listener getListener()`
108-
- `@Override public Builder setListener(Listener listener)`
109-
- `@Override public UpsertStream build() throws IOException, TunnelException`
112+
`UpsertStreamImpl.Builder` 类是构建 `UpsertStreamImpl` 实例的基石,它提供了灵活的配置选项以便用户根据需求定制 `UpsertStream` 的行为。
113+
此构造器主要通过 [`UpsertSession#buildUpsertStream()`](UpsertSession.md#构建upsertstream) 方法被调用,允许用户在创建 `UpsertStream` 时精确控制多个参数,
114+
以优化数据插入或更新的性能和资源使用。
115+
116+
### 方法详情
117+
118+
#### 设置会话对象
119+
120+
- **方法签名**:
121+
```java
122+
public Builder setSession(UpsertSessionImpl session)
123+
```
124+
- **功能说明**:
125+
`UpsertStream` 指定关联的 `UpsertSessionImpl` 实例。这是UpsertSession#buildUpsertStream()的默认步骤,通常不需要用户手动调用。
126+
127+
#### 获取会话对象
128+
129+
- **方法签名**:
130+
```java
131+
public UpsertSessionImpl getSession()
132+
```
133+
- **功能说明**:
134+
返回已设置的 `UpsertSessionImpl` 实例,供外部查询或验证使用。
135+
136+
#### 设置最大缓冲区大小
137+
138+
- **方法签名**:
139+
```java
140+
public Builder setMaxBufferSize(long maxBufferSize)
141+
```
142+
- **功能说明**:
143+
允许用户设定数据缓冲的最大容量(单位:字节)。这直接影响到内存使用和批量写入的效率,较大的缓冲区可以减少I/O操作次数,但会占用更多内存。
144+
145+
#### 获取最大缓冲区大小
146+
147+
- **方法签名**:
148+
```java
149+
public long getMaxBufferSize()
150+
```
151+
- **功能说明**:
152+
查询当前设置的最大缓冲区大小。
153+
154+
#### 设置槽缓冲区大小
155+
156+
- **方法签名**:
157+
```java
158+
public Builder setSlotBufferSize(long slotBufferSize)
159+
```
160+
- **功能说明**:
161+
为每个桶设置缓冲区大小(单位:字节)。
162+
163+
#### 获取槽缓冲区大小
164+
165+
- **方法签名**:
166+
```java
167+
public long getSlotBufferSize()
168+
```
169+
- **功能说明**:
170+
获取当前设置的每个桶的缓冲区大小。
171+
172+
#### 压缩选项配置
173+
174+
- **获取压缩选项**:
175+
```java
176+
@Override public CompressOption getCompressOption()
177+
```
178+
- **设置压缩选项**:
179+
```java
180+
@Override public Builder setCompressOption(CompressOption compressOption)
181+
```
182+
- **功能说明**:
183+
控制数据在传输前的压缩策略。通过选择合适的压缩算法,可以在减少网络传输成本和存储空间的同时,可能会增加一定的CPU使用。
184+
185+
#### 设置监听器
186+
187+
- **获取监听器**:
188+
```java
189+
@Override public Listener getListener()
190+
```
191+
- **设置监听器**:
192+
```java
193+
@Override public Builder setListener(Listener listener)
194+
```
195+
- **功能说明**:
196+
注册一个监听器,用于接收 `UpsertStream` 在数据写入过程中的事件通知,和重试逻辑的配置。UpsertSession#buildUpsertStream() 方法会为 UpsertStream 配置一个默认的 Listener 实例。详情参考[重试逻辑](#重试逻辑)
197+
198+
199+
#### 构建 UpsertStream 实例
200+
201+
- **方法签名**:
202+
```java
203+
@Override public UpsertStream build() throws IOException, TunnelException
204+
```
205+
- **功能说明**:
206+
根据之前设置的所有配置项构建并返回一个 `UpsertStream` 实例。
110207

111208
## 总结
112209

0 commit comments

Comments
 (0)