4
4
import time
5
5
import warnings
6
6
from datetime import datetime
7
- from distutils .version import StrictVersion
8
7
from time import sleep
9
8
10
9
import numpy as np
@@ -23,17 +22,15 @@ def _check_google_client_version():
23
22
raise ImportError ('Could not import pkg_resources (setuptools).' )
24
23
25
24
# https://github.com/GoogleCloudPlatform/google-cloud-python/blob/master/bigquery/CHANGELOG.md
26
- bigquery_client_minimum_version = '0.29.0'
25
+ bigquery_minimum_version = pkg_resources .parse_version ('0.32.0.dev1' )
26
+ bigquery_installed_version = pkg_resources .get_distribution (
27
+ 'google-cloud-bigquery' ).parsed_version
27
28
28
- _BIGQUERY_CLIENT_VERSION = pkg_resources .get_distribution (
29
- 'google-cloud-bigquery' ).version
30
-
31
- if (StrictVersion (_BIGQUERY_CLIENT_VERSION ) <
32
- StrictVersion (bigquery_client_minimum_version )):
33
- raise ImportError ('pandas-gbq requires google-cloud-bigquery >= {0}, '
34
- 'current version {1}'
35
- .format (bigquery_client_minimum_version ,
36
- _BIGQUERY_CLIENT_VERSION ))
29
+ if bigquery_installed_version < bigquery_minimum_version :
30
+ raise ImportError (
31
+ 'pandas-gbq requires google-cloud-bigquery >= {0}, '
32
+ 'current version {1}' .format (
33
+ bigquery_minimum_version , bigquery_installed_version ))
37
34
38
35
39
36
def _test_google_api_imports ():
@@ -459,29 +456,23 @@ def run_query(self, query, **kwargs):
459
456
}
460
457
config = kwargs .get ('configuration' )
461
458
if config is not None :
462
- if len (config ) != 1 :
463
- raise ValueError ("Only one job type must be specified, but "
464
- "given {}" .format (',' .join (config .keys ())))
465
- if 'query' in config :
466
- if 'query' in config ['query' ]:
467
- if query is not None :
468
- raise ValueError ("Query statement can't be specified "
469
- "inside config while it is specified "
470
- "as parameter" )
471
- query = config ['query' ]['query' ]
472
- del config ['query' ]['query' ]
473
-
474
- job_config ['query' ].update (config ['query' ])
475
- else :
476
- raise ValueError ("Only 'query' job type is supported" )
459
+ job_config .update (config )
460
+
461
+ if 'query' in config and 'query' in config ['query' ]:
462
+ if query is not None :
463
+ raise ValueError ("Query statement can't be specified "
464
+ "inside config while it is specified "
465
+ "as parameter" )
466
+ query = config ['query' ]['query' ]
467
+ del config ['query' ]['query' ]
477
468
478
469
self ._start_timer ()
479
470
try :
480
471
481
472
logger .info ('Requesting query... ' )
482
473
query_reply = self .client .query (
483
474
query ,
484
- job_config = QueryJobConfig .from_api_repr (job_config [ 'query' ] ))
475
+ job_config = QueryJobConfig .from_api_repr (job_config ))
485
476
logger .info ('ok.\n Query running...' )
486
477
except (RefreshError , ValueError ):
487
478
if self .private_key :
@@ -598,6 +589,15 @@ def schema(self, dataset_id, table_id):
598
589
except self .http_error as ex :
599
590
self .process_http_error (ex )
600
591
592
+ def _clean_schema_fields (self , fields ):
593
+ """Return a sanitized version of the schema for comparisons."""
594
+ fields_sorted = sorted (fields , key = lambda field : field ['name' ])
595
+ # Ignore mode and description when comparing schemas.
596
+ return [
597
+ {'name' : field ['name' ], 'type' : field ['type' ]}
598
+ for field in fields_sorted
599
+ ]
600
+
601
601
def verify_schema (self , dataset_id , table_id , schema ):
602
602
"""Indicate whether schemas match exactly
603
603
@@ -621,17 +621,9 @@ def verify_schema(self, dataset_id, table_id, schema):
621
621
Whether the schemas match
622
622
"""
623
623
624
- fields_remote = sorted (self .schema (dataset_id , table_id ),
625
- key = lambda x : x ['name' ])
626
- fields_local = sorted (schema ['fields' ], key = lambda x : x ['name' ])
627
-
628
- # Ignore mode when comparing schemas.
629
- for field in fields_local :
630
- if 'mode' in field :
631
- del field ['mode' ]
632
- for field in fields_remote :
633
- if 'mode' in field :
634
- del field ['mode' ]
624
+ fields_remote = self ._clean_schema_fields (
625
+ self .schema (dataset_id , table_id ))
626
+ fields_local = self ._clean_schema_fields (schema ['fields' ])
635
627
636
628
return fields_remote == fields_local
637
629
@@ -658,16 +650,9 @@ def schema_is_subset(self, dataset_id, table_id, schema):
658
650
Whether the passed schema is a subset
659
651
"""
660
652
661
- fields_remote = self .schema (dataset_id , table_id )
662
- fields_local = schema ['fields' ]
663
-
664
- # Ignore mode when comparing schemas.
665
- for field in fields_local :
666
- if 'mode' in field :
667
- del field ['mode' ]
668
- for field in fields_remote :
669
- if 'mode' in field :
670
- del field ['mode' ]
653
+ fields_remote = self ._clean_schema_fields (
654
+ self .schema (dataset_id , table_id ))
655
+ fields_local = self ._clean_schema_fields (schema ['fields' ])
671
656
672
657
return all (field in fields_remote for field in fields_local )
673
658
@@ -709,7 +694,7 @@ def _parse_data(schema, rows):
709
694
col_names = [str (field ['name' ]) for field in fields ]
710
695
col_dtypes = [
711
696
dtype_map .get (field ['type' ].upper (), object )
712
- if field ['mode' ] != 'repeated'
697
+ if field ['mode' ]. lower () != 'repeated'
713
698
else object
714
699
for field in fields
715
700
]
@@ -847,7 +832,7 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
847
832
for field in schema ['fields' ]:
848
833
if field ['type' ].upper () in type_map and \
849
834
final_df [field ['name' ]].notnull ().all () and \
850
- field ['mode' ] != 'repeated' :
835
+ field ['mode' ]. lower () != 'repeated' :
851
836
final_df [field ['name' ]] = \
852
837
final_df [field ['name' ]].astype (type_map [field ['type' ].upper ()])
853
838
0 commit comments