|
21 | 21 | import static org.junit.Assert.assertEquals;
|
22 | 22 | import static org.junit.Assert.assertNotNull;
|
23 | 23 | import static org.junit.Assert.assertNull;
|
| 24 | +import static org.junit.Assert.assertTrue; |
24 | 25 |
|
| 26 | +import com.google.api.gax.core.InstantiatingExecutorProvider; |
25 | 27 | import com.google.api.gax.rpc.ServerStream;
|
26 | 28 | import com.google.cloud.RetryOption;
|
27 | 29 | import com.google.cloud.ServiceOptions;
|
|
39 | 41 | import com.google.cloud.bigquery.TableInfo;
|
40 | 42 | import com.google.cloud.bigquery.TimePartitioning;
|
41 | 43 | import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
|
| 44 | +import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings; |
42 | 45 | import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
|
43 | 46 | import com.google.cloud.bigquery.storage.v1.DataFormat;
|
44 | 47 | import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
|
@@ -806,6 +809,56 @@ public void testStructAndArraySqlTypes() throws InterruptedException, IOExceptio
|
806 | 809 | assertEquals(rowAssertMessage, new Utf8("abc"), structRecord.get("str_field"));
|
807 | 810 | }
|
808 | 811 |
|
| 812 | + @Test |
| 813 | + public void testSimpleReadWithBackgroundExecutorProvider() throws IOException { |
| 814 | + BigQueryReadSettings bigQueryReadSettings = |
| 815 | + BigQueryReadSettings.newBuilder() |
| 816 | + .setBackgroundExecutorProvider( |
| 817 | + InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(14).build()) |
| 818 | + .build(); |
| 819 | + // Overriding the default client |
| 820 | + client = BigQueryReadClient.create(bigQueryReadSettings); |
| 821 | + assertTrue( |
| 822 | + client.getStub().getStubSettings().getBackgroundExecutorProvider() |
| 823 | + instanceof InstantiatingExecutorProvider); |
| 824 | + assertEquals( |
| 825 | + 14, |
| 826 | + ((InstantiatingExecutorProvider) |
| 827 | + client.getStub().getStubSettings().getBackgroundExecutorProvider()) |
| 828 | + .getExecutorThreadCount()); |
| 829 | + String table = |
| 830 | + BigQueryResource.FormatTableResource( |
| 831 | + /* projectId = */ "bigquery-public-data", |
| 832 | + /* datasetId = */ "samples", |
| 833 | + /* tableId = */ "shakespeare"); |
| 834 | + |
| 835 | + ReadSession session = |
| 836 | + client.createReadSession( |
| 837 | + /* parent = */ parentProjectId, |
| 838 | + /* readSession = */ ReadSession.newBuilder() |
| 839 | + .setTable(table) |
| 840 | + .setDataFormat(DataFormat.AVRO) |
| 841 | + .build(), |
| 842 | + /* maxStreamCount = */ 1); |
| 843 | + assertEquals( |
| 844 | + String.format( |
| 845 | + "Did not receive expected number of streams for table '%s' CreateReadSession response:%n%s", |
| 846 | + table, session.toString()), |
| 847 | + 1, |
| 848 | + session.getStreamsCount()); |
| 849 | + |
| 850 | + ReadRowsRequest readRowsRequest = |
| 851 | + ReadRowsRequest.newBuilder().setReadStream(session.getStreams(0).getName()).build(); |
| 852 | + |
| 853 | + long rowCount = 0; |
| 854 | + ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest); |
| 855 | + for (ReadRowsResponse response : stream) { |
| 856 | + rowCount += response.getRowCount(); |
| 857 | + } |
| 858 | + |
| 859 | + assertEquals(164_656, rowCount); |
| 860 | + } |
| 861 | + |
809 | 862 | /**
|
810 | 863 | * Reads to the specified row offset within the stream. If the stream does not have the desired
|
811 | 864 | * rows to read, it will read all of them.
|
|
0 commit comments