Skip to content

Commit 038f973

Browse files
authored
[bq] 0.2 refactoring
1 parent 760cde4 commit 038f973

File tree

9 files changed

+187
-78
lines changed

9 files changed

+187
-78
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright 2002-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.batch.extensions.bigquery.common;
18+
19+
import com.google.cloud.bigquery.BigQuery;
20+
import com.google.cloud.bigquery.FormatOptions;
21+
import com.google.cloud.bigquery.Job;
22+
import com.google.cloud.bigquery.TableId;
23+
import com.google.cloud.bigquery.WriteChannelConfiguration;
24+
import org.springframework.batch.extensions.bigquery.writer.BigQueryCsvItemWriter;
25+
import org.springframework.batch.extensions.bigquery.writer.BigQueryJsonItemWriter;
26+
import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryCsvItemWriterBuilder;
27+
import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryJsonItemWriterBuilder;
28+
import org.springframework.batch.item.Chunk;
29+
30+
import java.util.concurrent.atomic.AtomicReference;
31+
32+
public class BigQueryDataLoader {
33+
34+
public static final Chunk<PersonDto> CHUNK = Chunk.of(
35+
new PersonDto("Volodymyr", 27), new PersonDto("Oleksandra", 26)
36+
);
37+
38+
private final BigQuery bigQuery;
39+
40+
public BigQueryDataLoader(BigQuery bigQuery) {
41+
this.bigQuery = bigQuery;
42+
}
43+
44+
45+
public void loadCsvSample() throws Exception {
46+
loadCsvSample(TestConstants.PERSONS_TABLE);
47+
}
48+
49+
public void loadCsvSample(String tableName) throws Exception {
50+
AtomicReference<Job> job = new AtomicReference<>();
51+
52+
WriteChannelConfiguration channelConfiguration = WriteChannelConfiguration
53+
.newBuilder(TableId.of(TestConstants.DATASET, tableName))
54+
.setSchema(PersonDto.getBigQuerySchema())
55+
.setAutodetect(false)
56+
.setFormatOptions(FormatOptions.csv())
57+
.build();
58+
59+
BigQueryCsvItemWriter<PersonDto> writer = new BigQueryCsvItemWriterBuilder<PersonDto>()
60+
.bigQuery(bigQuery)
61+
.writeChannelConfig(channelConfiguration)
62+
.jobConsumer(job::set)
63+
.build();
64+
65+
writer.afterPropertiesSet();
66+
writer.write(CHUNK);
67+
job.get().waitFor();
68+
}
69+
70+
public void loadJsonSample(String tableName) throws Exception {
71+
AtomicReference<Job> job = new AtomicReference<>();
72+
73+
WriteChannelConfiguration channelConfiguration = WriteChannelConfiguration
74+
.newBuilder(TableId.of(TestConstants.DATASET, tableName))
75+
.setSchema(PersonDto.getBigQuerySchema())
76+
.setAutodetect(false)
77+
.setFormatOptions(FormatOptions.json())
78+
.build();
79+
80+
BigQueryJsonItemWriter<PersonDto> writer = new BigQueryJsonItemWriterBuilder<PersonDto>()
81+
.bigQuery(bigQuery)
82+
.writeChannelConfig(channelConfiguration)
83+
.jobConsumer(job::set)
84+
.build();
85+
86+
writer.afterPropertiesSet();
87+
writer.write(CHUNK);
88+
job.get().waitFor();
89+
}
90+
91+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2002-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.batch.extensions.bigquery.common;
18+
19+
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
20+
import com.google.cloud.bigquery.Field;
21+
import com.google.cloud.bigquery.Schema;
22+
import com.google.cloud.bigquery.StandardSQLTypeName;
23+
24+
@JsonPropertyOrder(value = {"name", "age"})
25+
public record PersonDto(String name, Integer age) {
26+
27+
public static Schema getBigQuerySchema() {
28+
Field nameField = Field.newBuilder("name", StandardSQLTypeName.STRING).build();
29+
Field ageField = Field.newBuilder("age", StandardSQLTypeName.INT64).build();
30+
return Schema.of(nameField, ageField);
31+
}
32+
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright 2002-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.batch.extensions.bigquery.common;
18+
19+
/**
 * Constants shared by the BigQuery test suite.
 * Declared {@code final} because it is a non-instantiable utility holder.
 */
public final class TestConstants {

    private TestConstants() {}

    /** Name of the BigQuery dataset every test reads from and writes to. */
    public static final String DATASET = "spring_batch_extensions";

    /** Default table that the person sample data is loaded into. */
    public static final String PERSONS_TABLE = "persons";

}

spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/integration/writer/BigQueryCsvItemWriterTest.java

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818

1919
import com.google.cloud.bigquery.BigQuery;
2020
import com.google.cloud.bigquery.Dataset;
21-
import com.google.cloud.bigquery.FormatOptions;
22-
import com.google.cloud.bigquery.JobId;
2321
import com.google.cloud.bigquery.Table;
2422
import com.google.cloud.bigquery.TableId;
2523
import com.google.cloud.bigquery.TableResult;
@@ -28,35 +26,23 @@
2826
import org.junit.jupiter.api.Tag;
2927
import org.junit.jupiter.api.Test;
3028
import org.junit.jupiter.api.TestInfo;
29+
import org.springframework.batch.extensions.bigquery.common.BigQueryDataLoader;
30+
import org.springframework.batch.extensions.bigquery.common.PersonDto;
31+
import org.springframework.batch.extensions.bigquery.common.TestConstants;
3132
import org.springframework.batch.extensions.bigquery.integration.writer.base.BaseBigQueryItemWriterTest;
32-
import org.springframework.batch.extensions.bigquery.writer.BigQueryCsvItemWriter;
33-
import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryCsvItemWriterBuilder;
3433
import org.springframework.batch.item.Chunk;
3534

36-
import java.util.concurrent.atomic.AtomicReference;
37-
3835
@Tag("csv")
3936
public class BigQueryCsvItemWriterTest extends BaseBigQueryItemWriterTest {
4037

4138
@Test
4239
void test1(TestInfo testInfo) throws Exception {
43-
AtomicReference<JobId> jobId = new AtomicReference<>();
44-
45-
BigQueryCsvItemWriter writer = new BigQueryCsvItemWriterBuilder<PersonDto>()
46-
.bigQuery(bigQuery)
47-
.writeChannelConfig(generateConfiguration(testInfo, FormatOptions.csv()))
48-
.jobConsumer(j -> jobId.set(j.getJobId()))
49-
.build();
50-
51-
writer.afterPropertiesSet();
52-
53-
Chunk<PersonDto> chunk = Chunk.of(new PersonDto("Volodymyr", 27), new PersonDto("Oleksandra", 26));
54-
writer.write(chunk);
55-
56-
waitForJobToFinish(jobId.get());
40+
String tableName = getTableName(testInfo);
41+
new BigQueryDataLoader(bigQuery).loadCsvSample(tableName);
42+
Chunk<PersonDto> chunk = BigQueryDataLoader.CHUNK;
5743

58-
Dataset dataset = bigQuery.getDataset(DATASET);
59-
Table table = bigQuery.getTable(TableId.of(DATASET, getTableName(testInfo)));
44+
Dataset dataset = bigQuery.getDataset(TestConstants.DATASET);
45+
Table table = bigQuery.getTable(TableId.of(TestConstants.DATASET, tableName));
6046
TableId tableId = table.getTableId();
6147
TableResult tableResult = bigQuery.listTableData(tableId, BigQuery.TableDataListOption.pageSize(2L));
6248

spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/integration/writer/BigQueryJsonItemWriterTest.java

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818

1919
import com.google.cloud.bigquery.BigQuery;
2020
import com.google.cloud.bigquery.Dataset;
21-
import com.google.cloud.bigquery.FormatOptions;
22-
import com.google.cloud.bigquery.JobId;
2321
import com.google.cloud.bigquery.Table;
2422
import com.google.cloud.bigquery.TableId;
2523
import com.google.cloud.bigquery.TableResult;
@@ -28,35 +26,23 @@
2826
import org.junit.jupiter.api.Tag;
2927
import org.junit.jupiter.api.Test;
3028
import org.junit.jupiter.api.TestInfo;
29+
import org.springframework.batch.extensions.bigquery.common.BigQueryDataLoader;
30+
import org.springframework.batch.extensions.bigquery.common.PersonDto;
31+
import org.springframework.batch.extensions.bigquery.common.TestConstants;
3132
import org.springframework.batch.extensions.bigquery.integration.writer.base.BaseBigQueryItemWriterTest;
32-
import org.springframework.batch.extensions.bigquery.writer.BigQueryJsonItemWriter;
33-
import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryJsonItemWriterBuilder;
3433
import org.springframework.batch.item.Chunk;
3534

36-
import java.util.concurrent.atomic.AtomicReference;
37-
3835
@Tag("json")
3936
public class BigQueryJsonItemWriterTest extends BaseBigQueryItemWriterTest {
4037

4138
@Test
4239
void test1(TestInfo testInfo) throws Exception {
43-
AtomicReference<JobId> jobId = new AtomicReference<>();
44-
45-
BigQueryJsonItemWriter<PersonDto> writer = new BigQueryJsonItemWriterBuilder<PersonDto>()
46-
.bigQuery(bigQuery)
47-
.writeChannelConfig(generateConfiguration(testInfo, FormatOptions.json()))
48-
.jobConsumer(j -> jobId.set(j.getJobId()))
49-
.build();
50-
51-
writer.afterPropertiesSet();
52-
53-
Chunk<PersonDto> chunk = Chunk.of(new PersonDto("Viktor", 57), new PersonDto("Nina", 57));
54-
writer.write(chunk);
55-
56-
waitForJobToFinish(jobId.get());
40+
String tableName = getTableName(testInfo);
41+
new BigQueryDataLoader(bigQuery).loadJsonSample(tableName);
42+
Chunk<PersonDto> chunk = BigQueryDataLoader.CHUNK;
5743

58-
Dataset dataset = bigQuery.getDataset(DATASET);
59-
Table table = bigQuery.getTable(TableId.of(DATASET, getTableName(testInfo)));
44+
Dataset dataset = bigQuery.getDataset(TestConstants.DATASET);
45+
Table table = bigQuery.getTable(TableId.of(TestConstants.DATASET, tableName));
6046
TableId tableId = table.getTableId();
6147
TableResult tableResult = bigQuery.listTableData(tableId, BigQuery.TableDataListOption.pageSize(2L));
6248

spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/integration/writer/base/BaseBigQueryItemWriterTest.java

Lines changed: 11 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,13 @@
1616

1717
package org.springframework.batch.extensions.bigquery.integration.writer.base;
1818

19-
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
19+
import com.google.cloud.RetryOption;
2020
import com.google.cloud.bigquery.BigQuery;
2121
import com.google.cloud.bigquery.BigQueryOptions;
2222
import com.google.cloud.bigquery.DatasetInfo;
23-
import com.google.cloud.bigquery.Field;
2423
import com.google.cloud.bigquery.FormatOptions;
2524
import com.google.cloud.bigquery.JobId;
2625
import com.google.cloud.bigquery.JobStatus;
27-
import com.google.cloud.bigquery.Schema;
28-
import com.google.cloud.bigquery.StandardSQLTypeName;
2926
import com.google.cloud.bigquery.StandardTableDefinition;
3027
import com.google.cloud.bigquery.TableDefinition;
3128
import com.google.cloud.bigquery.TableId;
@@ -35,33 +32,33 @@
3532
import org.junit.jupiter.api.AfterEach;
3633
import org.junit.jupiter.api.BeforeEach;
3734
import org.junit.jupiter.api.TestInfo;
35+
import org.springframework.batch.extensions.bigquery.common.PersonDto;
36+
import org.springframework.batch.extensions.bigquery.common.TestConstants;
3837

3938
import java.lang.reflect.Method;
4039
import java.util.Objects;
4140

4241
public abstract class BaseBigQueryItemWriterTest {
4342

44-
protected static final String DATASET = "spring_extensions";
43+
private static final String TABLE_PATTERN = "%s_%s";
4544

4645
protected final BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();
4746

48-
private static final String TABLE_PATTERN = "%s_%s";
49-
5047
@BeforeEach
5148
void prepareTest(TestInfo testInfo) {
52-
if (Objects.isNull(bigQuery.getDataset(DATASET))) {
53-
bigQuery.create(DatasetInfo.of(DATASET));
49+
if (Objects.isNull(bigQuery.getDataset(TestConstants.DATASET))) {
50+
bigQuery.create(DatasetInfo.of(TestConstants.DATASET));
5451
}
5552

56-
if (Objects.isNull(bigQuery.getTable(DATASET, getTableName(testInfo)))) {
53+
if (Objects.isNull(bigQuery.getTable(TestConstants.DATASET, getTableName(testInfo)))) {
5754
TableDefinition tableDefinition = StandardTableDefinition.of(PersonDto.getBigQuerySchema());
58-
bigQuery.create(TableInfo.of(TableId.of(DATASET, getTableName(testInfo)), tableDefinition));
55+
bigQuery.create(TableInfo.of(TableId.of(TestConstants.DATASET, getTableName(testInfo)), tableDefinition));
5956
}
6057
}
6158

6259
@AfterEach
6360
void cleanupTest(TestInfo testInfo) {
64-
bigQuery.delete(TableId.of(DATASET, getTableName(testInfo)));
61+
bigQuery.delete(TableId.of(TestConstants.DATASET, getTableName(testInfo)));
6562
}
6663

6764
protected String getTableName(TestInfo testInfo) {
@@ -74,13 +71,14 @@ protected String getTableName(TestInfo testInfo) {
7471

7572
protected WriteChannelConfiguration generateConfiguration(TestInfo testInfo, FormatOptions formatOptions) {
7673
return WriteChannelConfiguration
77-
.newBuilder(TableId.of(DATASET, getTableName(testInfo)))
74+
.newBuilder(TableId.of(TestConstants.DATASET, getTableName(testInfo)))
7875
.setSchema(PersonDto.getBigQuerySchema())
7976
.setAutodetect(false)
8077
.setFormatOptions(formatOptions)
8178
.build();
8279
}
8380

81+
/** TODO check {@link com.google.cloud.bigquery.Job#waitFor(RetryOption...)} */
8482
protected void waitForJobToFinish(JobId jobId) {
8583
JobStatus status = bigQuery.getJob(jobId).getStatus();
8684

@@ -89,15 +87,4 @@ protected void waitForJobToFinish(JobId jobId) {
8987
}
9088
}
9189

92-
@JsonPropertyOrder(value = {"name", "age"})
93-
public record PersonDto(String name, Integer age) {
94-
95-
public static Schema getBigQuerySchema() {
96-
Field nameField = Field.newBuilder("name", StandardSQLTypeName.STRING).build();
97-
Field ageField = Field.newBuilder("age", StandardSQLTypeName.INT64).build();
98-
return Schema.of(nameField, ageField);
99-
}
100-
101-
}
102-
10390
}

spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/base/AbstractBigQueryTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,4 @@ protected BigQuery prepareMockedBigQuery() {
1919
return mockedBigQuery;
2020
}
2121

22-
public record PersonDto(String name) {}
23-
2422
}

spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/unit/writer/builder/BigQueryCsvItemWriterBuilderTests.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@
2727
import org.apache.commons.logging.LogFactory;
2828
import org.junit.jupiter.api.Assertions;
2929
import org.junit.jupiter.api.Test;
30+
import org.springframework.batch.extensions.bigquery.common.PersonDto;
31+
import org.springframework.batch.extensions.bigquery.common.TestConstants;
3032
import org.springframework.batch.extensions.bigquery.unit.base.AbstractBigQueryTest;
3133
import org.springframework.batch.extensions.bigquery.writer.BigQueryCsvItemWriter;
3234
import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryCsvItemWriterBuilder;
3335

3436
class BigQueryCsvItemWriterBuilderTests extends AbstractBigQueryTest {
3537

36-
private static final String DATASET_NAME = "my_dataset";
38+
private static final String TABLE = "persons_csv";
3739

3840
private final Log logger = LogFactory.getLog(getClass());
3941

@@ -44,10 +46,10 @@ class BigQueryCsvItemWriterBuilderTests extends AbstractBigQueryTest {
4446
void testCsvWriterWithRowMapper() {
4547
BigQuery mockedBigQuery = prepareMockedBigQuery();
4648
CsvMapper csvMapper = new CsvMapper();
47-
DatasetInfo datasetInfo = DatasetInfo.newBuilder(DATASET_NAME).setLocation("europe-west-2").build();
49+
DatasetInfo datasetInfo = DatasetInfo.newBuilder(TestConstants.DATASET).setLocation("europe-west-2").build();
4850

4951
WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration
50-
.newBuilder(TableId.of(datasetInfo.getDatasetId().getDataset(), "csv_table"))
52+
.newBuilder(TableId.of(datasetInfo.getDatasetId().getDataset(), TABLE))
5153
.setAutodetect(true)
5254
.setFormatOptions(FormatOptions.csv())
5355
.build();
@@ -70,7 +72,7 @@ void testCsvWriterWithCsvMapper() {
7072
BigQuery mockedBigQuery = prepareMockedBigQuery();
7173

7274
WriteChannelConfiguration writeConfiguration = WriteChannelConfiguration
73-
.newBuilder(TableId.of(DATASET_NAME, "csv_table"))
75+
.newBuilder(TableId.of(TestConstants.DATASET, TABLE))
7476
.setAutodetect(true)
7577
.setFormatOptions(FormatOptions.csv())
7678
.build();

0 commit comments

Comments
 (0)