Skip to content

Commit 89c01b8

Browse files
committed
Rename query file and use async method.
Update to latest version of google-cloud-bigquery.
1 parent: 3554dad; commit: 89c01b8

File tree

4 files changed: 89 additions (+89) and 49 deletions (-49).

bigquery/cloud-client/sync_query_params.py renamed to bigquery/cloud-client/query_params.py

Lines changed: 65 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -14,22 +14,34 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616

17-
"""Command-line app to perform synchronous queries with parameters in BigQuery.
17+
"""Command-line app to perform queries with parameters in BigQuery.
1818
1919
For more information, see the README.md under /bigquery.
2020
2121
Example invocation:
22-
$ python sync_query_params.py --use-named-params 'romeoandjuliet' 100
23-
$ python sync_query_params.py --use-positional-params 'romeoandjuliet' 100
22+
$ python query_params.py --use-named-params 'romeoandjuliet' 100
23+
$ python query_params.py --use-positional-params 'romeoandjuliet' 100
2424
"""
2525

2626
import argparse
2727
import datetime
28+
import time
29+
import uuid
2830

2931
from google.cloud import bigquery
3032
import pytz
3133

3234

35+
def wait_for_job(job):
36+
while True:
37+
job.reload() # Refreshes the state via a GET request.
38+
if job.state == 'DONE':
39+
if job.error_result:
40+
raise RuntimeError(job.errors)
41+
return
42+
time.sleep(1)
43+
44+
3345
def print_results(query_results):
3446
"""Print the query results by requesting a page at a time."""
3547
page_token = None
@@ -46,15 +58,16 @@ def print_results(query_results):
4658
break
4759

4860

49-
def sync_query_positional_params(corpus, min_word_count):
61+
def query_positional_params(corpus, min_word_count):
5062
client = bigquery.Client()
5163
query = """SELECT word, word_count
5264
FROM `bigquery-public-data.samples.shakespeare`
5365
WHERE corpus = ?
5466
AND word_count >= ?
5567
ORDER BY word_count DESC;
5668
"""
57-
query_results = client.run_sync_query(
69+
query_job = client.run_async_query(
70+
str(uuid.uuid4()),
5871
query,
5972
query_parameters=(
6073
bigquery.ScalarQueryParameter(
@@ -68,33 +81,40 @@ def sync_query_positional_params(corpus, min_word_count):
6881

6982
# Only standard SQL syntax supports parameters in queries.
7083
# See: https://cloud.google.com/bigquery/sql-reference/
71-
query_results.use_legacy_sql = False
72-
query_results.run()
73-
print_results(query_results)
84+
query_job.use_legacy_sql = False
85+
86+
# Start the query and wait for the job to complete.
87+
query_job.begin()
88+
wait_for_job(query_job)
89+
print_results(query_job.results())
7490

7591

76-
def sync_query_named_params(corpus, min_word_count):
92+
def query_named_params(corpus, min_word_count):
7793
client = bigquery.Client()
7894
query = """SELECT word, word_count
7995
FROM `bigquery-public-data.samples.shakespeare`
8096
WHERE corpus = @corpus
8197
AND word_count >= @min_word_count
8298
ORDER BY word_count DESC;
8399
"""
84-
query_results = client.run_sync_query(
100+
query_job = client.run_async_query(
101+
str(uuid.uuid4()),
85102
query,
86103
query_parameters=(
87104
bigquery.ScalarQueryParameter('corpus', 'STRING', corpus),
88105
bigquery.ScalarQueryParameter(
89106
'min_word_count',
90107
'INT64',
91108
min_word_count)))
92-
query_results.use_legacy_sql = False
93-
query_results.run()
94-
print_results(query_results)
109+
query_job.use_legacy_sql = False
95110

111+
# Start the query and wait for the job to complete.
112+
query_job.begin()
113+
wait_for_job(query_job)
114+
print_results(query_job.results())
96115

97-
def sync_query_array_params(gender, states):
116+
117+
def query_array_params(gender, states):
98118
client = bigquery.Client()
99119
query = """SELECT name, sum(number) as count
100120
FROM `bigquery-public-data.usa_names.usa_1910_2013`
@@ -104,45 +124,57 @@ def sync_query_array_params(gender, states):
104124
ORDER BY count DESC
105125
LIMIT 10;
106126
"""
107-
query_results = client.run_sync_query(
127+
query_job = client.run_async_query(
128+
str(uuid.uuid4()),
108129
query,
109130
query_parameters=(
110131
bigquery.ScalarQueryParameter('gender', 'STRING', gender),
111132
bigquery.ArrayQueryParameter('states', 'STRING', states)))
112-
query_results.use_legacy_sql = False
113-
query_results.run()
114-
print_results(query_results)
133+
query_job.use_legacy_sql = False
134+
135+
# Start the query and wait for the job to complete.
136+
query_job.begin()
137+
wait_for_job(query_job)
138+
print_results(query_job.results())
115139

116140

117-
def sync_query_timestamp_params(year, month, day, hour, minute):
141+
def query_timestamp_params(year, month, day, hour, minute):
118142
client = bigquery.Client()
119143
query = 'SELECT TIMESTAMP_ADD(@ts_value, INTERVAL 1 HOUR);'
120-
query_results = client.run_sync_query(
144+
query_job = client.run_async_query(
145+
str(uuid.uuid4()),
121146
query,
122147
query_parameters=[
123148
bigquery.ScalarQueryParameter(
124149
'ts_value',
125150
'TIMESTAMP',
126151
datetime.datetime(
127152
year, month, day, hour, minute, tzinfo=pytz.UTC))])
128-
query_results.use_legacy_sql = False
129-
query_results.run()
130-
print_results(query_results)
153+
query_job.use_legacy_sql = False
131154

155+
# Start the query and wait for the job to complete.
156+
query_job.begin()
157+
wait_for_job(query_job)
158+
print_results(query_job.results())
132159

133-
def sync_query_struct_params(x, y):
160+
161+
def query_struct_params(x, y):
134162
client = bigquery.Client()
135163
query = 'SELECT @struct_value AS s;'
136-
query_results = client.run_sync_query(
164+
query_job = client.run_async_query(
165+
str(uuid.uuid4()),
137166
query,
138167
query_parameters=[
139168
bigquery.StructQueryParameter(
140169
'struct_value',
141170
bigquery.ScalarQueryParameter('x', 'INT64', x),
142171
bigquery.ScalarQueryParameter('y', 'STRING', y))])
143-
query_results.use_legacy_sql = False
144-
query_results.run()
145-
print_results(query_results)
172+
query_job.use_legacy_sql = False
173+
174+
# Start the query and wait for the job to complete.
175+
query_job.begin()
176+
wait_for_job(query_job)
177+
print_results(query_job.results())
146178

147179

148180
if __name__ == '__main__':
@@ -197,15 +229,15 @@ def sync_query_struct_params(x, y):
197229
args = parser.parse_args()
198230

199231
if args.sample == 'named':
200-
sync_query_named_params(args.corpus, args.min_word_count)
232+
query_named_params(args.corpus, args.min_word_count)
201233
elif args.sample == 'positional':
202-
sync_query_positional_params(args.corpus, args.min_word_count)
234+
query_positional_params(args.corpus, args.min_word_count)
203235
elif args.sample == 'array':
204-
sync_query_array_params(args.gender, args.states)
236+
query_array_params(args.gender, args.states)
205237
elif args.sample == 'timestamp':
206-
sync_query_timestamp_params(
238+
query_timestamp_params(
207239
args.year, args.month, args.day, args.hour, args.minute)
208240
elif args.sample == 'struct':
209-
sync_query_struct_params(args.x, args.y)
241+
query_struct_params(args.x, args.y)
210242
else:
211243
print('Unexpected value for sample')

bigquery/cloud-client/sync_query_params_test.py renamed to bigquery/cloud-client/query_params_test.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,33 +12,41 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
import sync_query_params
15+
import query_params
1616

1717

18-
def test_sync_query_named_params(capsys):
19-
sync_query_params.sync_query_array_params(
18+
def test_query_array_params(capsys):
19+
query_params.query_array_params(
2020
gender='M',
2121
states=['WA', 'WI', 'WV', 'WY'])
2222
out, _ = capsys.readouterr()
2323
assert 'James' in out
2424

2525

26-
def test_sync_query_positional_params(capsys):
27-
sync_query_params.sync_query_positional_params(
26+
def test_query_named_params(capsys):
27+
query_params.query_named_params(
2828
corpus='romeoandjuliet',
2929
min_word_count=100)
3030
out, _ = capsys.readouterr()
3131
assert 'love' in out
3232

3333

34-
def test_sync_query_struct_params(capsys):
35-
sync_query_params.sync_query_struct_params(765, "hello world")
34+
def test_query_positional_params(capsys):
35+
query_params.query_positional_params(
36+
corpus='romeoandjuliet',
37+
min_word_count=100)
38+
out, _ = capsys.readouterr()
39+
assert 'love' in out
40+
41+
42+
def test_query_struct_params(capsys):
43+
query_params.query_struct_params(765, "hello world")
3644
out, _ = capsys.readouterr()
3745
assert '765' in out
3846
assert 'hello world' in out
3947

4048

41-
def test_sync_query_timestamp_params(capsys):
42-
sync_query_params.sync_query_timestamp_params(2016, 12, 7, 8, 0)
49+
def test_query_timestamp_params(capsys):
50+
query_params.query_timestamp_params(2016, 12, 7, 8, 0)
4351
out, _ = capsys.readouterr()
44-
assert '2016-12-07 09:00:00' in out
52+
assert '2016, 12, 7, 9, 0' in out
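bigquery/cloud-client/requirements.txt (file name missing from the extracted page; inferred from the pinned packages below)

Lines changed: 1 addition & 1 deletion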
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
google-cloud-bigquery==0.22.1
1+
google-cloud-bigquery==0.24.0
22
pytz==2016.10

scripts/prepare-testing-project.sh

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,13 @@ gcloud alpha bigtable clusters create bigtable-test \
3131
--zone=us-central1-c
3232

3333
echo "Creating bigquery resources."
34-
gcloud alpha bigquery datasets create test_dataset
35-
gcloud alpha bigquery datasets create ephemeral_test_dataset
34+
bq mk test_dataset
35+
bq mk ephemeral_test_dataset
3636
gsutil cp bigquery/api/resources/data.csv gs://$GCLOUD_PROJECT/data.csv
37-
gcloud alpha bigquery import \
37+
bq load \
38+
test_dataset.test_table \
3839
gs://$GCLOUD_PROJECT/data.csv \
39-
test_dataset/test_table \
40-
--schema-file bigquery/api/resources/schema.json
40+
bigquery/api/resources/schema.json
4141

4242
echo "Creating datastore indexes."
4343
gcloud app deploy -q datastore/api/index.yaml

0 commit comments

Comments (0)