@@ -1,6 +1,6 @@
 import json
+import logging
 import os
-import sys
 import time
 import warnings
 from datetime import datetime
@@ -11,5 +11,7 @@
 from pandas import DataFrame, compat
 from pandas.compat import lzip
 
+logger = logging.getLogger(__name__)
+
 
 def _check_google_client_version():
@@ -162,15 +164,14 @@ class TableCreationError(ValueError):
 class GbqConnector(object):
     scope = 'https://www.googleapis.com/auth/bigquery'
 
-    def __init__(self, project_id, reauth=False, verbose=False,
+    def __init__(self, project_id, reauth=False,
                  private_key=None, auth_local_webserver=False,
                  dialect='legacy'):
         from google.api_core.exceptions import GoogleAPIError
         from google.api_core.exceptions import ClientError
         self.http_error = (ClientError, GoogleAPIError)
         self.project_id = project_id
         self.reauth = reauth
-        self.verbose = verbose
         self.private_key = private_key
         self.auth_local_webserver = auth_local_webserver
         self.dialect = dialect
@@ -324,7 +325,7 @@ def save_user_account_credentials(self, credentials):
                 }
                 json.dump(credentials_json, credentials_file)
         except IOError:
-            self._print('Unable to save credentials.')
+            logger.warning('Unable to save credentials.')
 
     def get_user_account_credentials(self):
         """Gets user account credentials.
@@ -410,22 +411,17 @@ def get_service_account_credentials(self):
                 "Can be obtained from: https://console.developers.google."
                 "com/permissions/serviceaccounts")
 
-    def _print(self, msg, end='\n'):
-        if self.verbose:
-            sys.stdout.write(msg + end)
-            sys.stdout.flush()
-
     def _start_timer(self):
         self.start = time.time()
 
     def get_elapsed_seconds(self):
         return round(time.time() - self.start, 2)
 
-    def print_elapsed_seconds(self, prefix='Elapsed', postfix='s.',
-                              overlong=7):
+    def log_elapsed_seconds(self, prefix='Elapsed', postfix='s.',
+                            overlong=7):
         sec = self.get_elapsed_seconds()
         if sec > overlong:
-            self._print('{} {} {}'.format(prefix, sec, postfix))
+            logger.info('{} {} {}'.format(prefix, sec, postfix))
 
     # http://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size
     @staticmethod
@@ -481,11 +477,12 @@ def run_query(self, query, **kwargs):
 
         self._start_timer()
         try:
-            self._print('Requesting query... ', end="")
+
+            logger.info('Requesting query... ')
             query_reply = self.client.query(
                 query,
                 job_config=QueryJobConfig.from_api_repr(job_config['query']))
-            self._print('ok.')
+            logger.info('ok.\nQuery running...')
         except (RefreshError, ValueError):
             if self.private_key:
                 raise AccessDenied(
@@ -498,10 +495,10 @@ def run_query(self, query, **kwargs):
             self.process_http_error(ex)
 
         job_id = query_reply.job_id
-        self._print('Job ID: %s\nQuery running...' % job_id)
+        logger.info('Job ID: %s\nQuery running...' % job_id)
 
         while query_reply.state != 'DONE':
-            self.print_elapsed_seconds(' Elapsed', 's. Waiting...')
+            self.log_elapsed_seconds(' Elapsed', 's. Waiting...')
 
             timeout_ms = job_config['query'].get('timeoutMs')
             if timeout_ms and timeout_ms < self.get_elapsed_seconds() * 1000:
@@ -520,19 +517,16 @@ def run_query(self, query, **kwargs):
         except self.http_error as ex:
             self.process_http_error(ex)
 
-        if self.verbose:
-            if query_reply.cache_hit:
-                self._print('Query done.\nCache hit.\n')
-            else:
-                bytes_processed = query_reply.total_bytes_processed or 0
-                bytes_billed = query_reply.total_bytes_billed or 0
-                self._print('Query done.\nProcessed: {} Billed: {}'.format(
-                    self.sizeof_fmt(bytes_processed),
-                    self.sizeof_fmt(bytes_billed)))
-                self._print('Standard price: ${:,.2f} USD\n'.format(
-                    bytes_billed * self.query_price_for_TB))
-
-            self._print('Retrieving results...')
+        if query_reply.cache_hit:
+            logger.debug('Query done.\nCache hit.\n')
+        else:
+            bytes_processed = query_reply.total_bytes_processed or 0
+            bytes_billed = query_reply.total_bytes_billed or 0
+            logger.debug('Query done.\nProcessed: {} Billed: {}'.format(
+                self.sizeof_fmt(bytes_processed),
+                self.sizeof_fmt(bytes_billed)))
+            logger.debug('Standard price: ${:,.2f} USD\n'.format(
+                bytes_billed * self.query_price_for_TB))
 
         try:
             rows_iter = query_reply.result()
@@ -546,8 +540,8 @@ def run_query(self, query, **kwargs):
                       for field in rows_iter.schema],
         }
 
-        # print basic query stats
-        self._print('Got {} rows.\n'.format(total_rows))
+        # log basic query stats
+        logger.info('Got {} rows.\n'.format(total_rows))
 
         return schema, result_rows
 
@@ -557,18 +551,18 @@ def load_data(
         from pandas_gbq import _load
 
         total_rows = len(dataframe)
-        self._print("\n\n")
+        logger.info("\n\n")
 
         try:
             for remaining_rows in _load.load_chunks(
                     self.client, dataframe, dataset_id, table_id,
                     chunksize=chunksize, schema=schema):
-                self._print("\rLoad is {0}% Complete".format(
+                logger.info("\rLoad is {0}% Complete".format(
                     ((total_rows - remaining_rows) * 100) / total_rows))
         except self.http_error as ex:
             self.process_http_error(ex)
 
-        self._print("\n")
+        logger.info("\n")
 
     def schema(self, dataset_id, table_id):
         """Retrieve the schema of the table
@@ -687,7 +681,7 @@ def delete_and_recreate_table(self, dataset_id, table_id, table_schema):
         # be a 120 second delay
 
         if not self.verify_schema(dataset_id, table_id, table_schema):
-            self._print('The existing table has a different schema. Please '
+            logger.info('The existing table has a different schema. Please '
                         'wait 2 minutes. See Google BigQuery issue #191')
             delay = 120
 
@@ -729,7 +723,7 @@ def _parse_data(schema, rows):
 
 
 def read_gbq(query, project_id=None, index_col=None, col_order=None,
-             reauth=False, verbose=True, private_key=None,
+             reauth=False, verbose=None, private_key=None,
              auth_local_webserver=False, dialect='legacy', **kwargs):
     r"""Load data from Google BigQuery using google-cloud-python
 
@@ -768,8 +762,6 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
     reauth : boolean (default False)
         Force Google BigQuery to reauthenticate the user. This is useful
        if multiple accounts are used.
-    verbose : boolean (default True)
-        Verbose output
     private_key : str (optional)
         Service account private key in JSON format. Can be file path
         or string contents. This is useful for remote server
@@ -793,6 +785,7 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
         compliant with the SQL 2011 standard. For more information
         see `BigQuery SQL Reference
         <https://cloud.google.com/bigquery/sql-reference/>`__
+    verbose : None, deprecated
 
     **kwargs : Arbitrary keyword arguments
         configuration (dict): query config parameters for job processing.
@@ -809,6 +802,11 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
         DataFrame representing results of query
 
     """
+    if verbose is not None:
+        warnings.warn(
+            "verbose is deprecated and will be removed in "
+            "a future version. Set logging level in order to vary "
+            "verbosity", FutureWarning, stacklevel=1)
 
     _test_google_api_imports()
 
@@ -819,7 +817,7 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
         raise ValueError("'{0}' is not valid for dialect".format(dialect))
 
     connector = GbqConnector(
-        project_id, reauth=reauth, verbose=verbose, private_key=private_key,
+        project_id, reauth=reauth, private_key=private_key,
         dialect=dialect, auth_local_webserver=auth_local_webserver)
     schema, rows = connector.run_query(query, **kwargs)
     final_df = _parse_data(schema, rows)
@@ -853,7 +851,7 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
             final_df[field['name']] = \
                 final_df[field['name']].astype(type_map[field['type'].upper()])
 
-    connector.print_elapsed_seconds(
+    connector.log_elapsed_seconds(
         'Total time taken',
         datetime.now().strftime('s.\nFinished at %Y-%m-%d %H:%M:%S.'),
         0
@@ -863,7 +861,7 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
 
 
 def to_gbq(dataframe, destination_table, project_id, chunksize=None,
-           verbose=True, reauth=False, if_exists='fail', private_key=None,
+           verbose=None, reauth=False, if_exists='fail', private_key=None,
            auth_local_webserver=False, table_schema=None):
     """Write a DataFrame to a Google BigQuery table.
 
@@ -899,8 +897,6 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=None,
     chunksize : int (default None)
         Number of rows to be inserted in each chunk from the dataframe. Use
         ``None`` to load the dataframe in a single chunk.
-    verbose : boolean (default True)
-        Show percentage complete
     reauth : boolean (default False)
         Force Google BigQuery to reauthenticate the user. This is useful
         if multiple accounts are used.
@@ -930,10 +926,17 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=None,
         of DataFrame columns. See BigQuery API documentation on available
         names of a field.
         .. versionadded:: 0.3.1
+    verbose : None, deprecated
     """
 
     _test_google_api_imports()
 
+    if verbose is not None:
+        warnings.warn(
+            "verbose is deprecated and will be removed in "
+            "a future version. Set logging level in order to vary "
+            "verbosity", FutureWarning, stacklevel=1)
+
     if if_exists not in ('fail', 'replace', 'append'):
         raise ValueError("'{0}' is not valid for if_exists".format(if_exists))
 
@@ -942,7 +945,7 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=None,
             "Invalid Table Name. Should be of the form 'datasetId.tableId' ")
 
     connector = GbqConnector(
-        project_id, reauth=reauth, verbose=verbose, private_key=private_key,
+        project_id, reauth=reauth, private_key=private_key,
         auth_local_webserver=auth_local_webserver)
     dataset_id, table_id = destination_table.rsplit('.', 1)
 
@@ -1004,10 +1007,9 @@ def _generate_bq_schema(df, default_type='STRING'):
 
 class _Table(GbqConnector):
 
-    def __init__(self, project_id, dataset_id, reauth=False, verbose=False,
-                 private_key=None):
+    def __init__(self, project_id, dataset_id, reauth=False, private_key=None):
         self.dataset_id = dataset_id
-        super(_Table, self).__init__(project_id, reauth, verbose, private_key)
+        super(_Table, self).__init__(project_id, reauth, private_key)
 
     def exists(self, table_id):
         """ Check if a table exists in Google BigQuery
@@ -1101,10 +1103,8 @@ def delete(self, table_id):
 
 class _Dataset(GbqConnector):
 
-    def __init__(self, project_id, reauth=False, verbose=False,
-                 private_key=None):
-        super(_Dataset, self).__init__(project_id, reauth, verbose,
-                                       private_key)
+    def __init__(self, project_id, reauth=False, private_key=None):
+        super(_Dataset, self).__init__(project_id, reauth, private_key)
 
     def exists(self, dataset_id):
         """ Check if a dataset exists in Google BigQuery
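
Similarly, passing verbose now only emits the FutureWarning added above and is otherwise ignored; a hypothetical check of that behaviour (valid credentials are still needed for the query itself, hence the broad except):

    import warnings

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter('always')
        try:
            pandas_gbq.read_gbq('SELECT 1 AS x', project_id='my-project',
                                verbose=True)
        except Exception:
            pass  # auth/network failures do not matter here; the warning fires first
    assert any(issubclass(w.category, FutureWarning) for w in caught)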