
Commit 9561f35

WIP: address PR comments
1 parent 35ec8cb commit 9561f35

File tree

4 files changed
+24 -23 lines changed
Lines changed: 1 addition & 1 deletion

@@ -1 +1 @@
-pytest==5.3.2
+pytest==6.0.0
Lines changed: 4 additions & 5 deletions

@@ -1,7 +1,6 @@
-grpcio==1.29.0
-google-auth==1.16.0
-google-auth-httplib2==0.0.3
-google-cloud==0.34.0
+#grpcio==1.29.0
+#google-auth==1.16.0
+#google-auth-httplib2==0.0.3
 google-cloud-storage==1.28.1
-google-cloud-dataproc==0.8.0
+google-cloud-dataproc==2.0.0
 google-cloud-bigquery==1.25.0
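
Note: the google-cloud-dataproc bump from 0.8.0 to 2.0.0 crosses a major rewrite of the generated clients, which is what drives the keyword-argument call sites in setup_test.py below. A minimal guard against running with the old library still installed, assuming Python 3.8+ for importlib.metadata:

import importlib.metadata

# Fail fast if an older google-cloud-dataproc is still installed.
version = importlib.metadata.version("google-cloud-dataproc")
if int(version.split(".")[0]) < 2:
    raise RuntimeError(f"google-cloud-dataproc {version} found; 2.x is required")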

data-science-onramp/data-ingestion/setup.py

Lines changed: 13 additions & 12 deletions
@@ -18,9 +18,6 @@
 from pyspark.sql.functions import date_format, expr, UserDefinedFunction, when
 from pyspark.sql.types import FloatType, StringType, StructField, StructType
 
-
-BUCKET_NAME = sys.argv[1]
-DATASET_NAME = sys.argv[2]
 TABLE = "bigquery-public-data.new_york_citibike.citibike_trips"
 CITIBIKE_TABLE_NAME = "RAW_DATA"
 EXTERNAL_TABLES = {
@@ -96,7 +93,7 @@ def gender(s):
 
 def convert_angle(angle):
     """Converts long and lat to DMS notation"""
-    if angle is None:
+    if not angle:
         return None
     degrees = int(angle)
     minutes = int((angle - degrees) * 60)
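
Worth flagging on the `is None` to truthiness swap: `not angle` is also true for 0.0, so a longitude or latitude lying exactly on the prime meridian or equator is now skipped as well. A quick illustration:

# "not angle" treats both None and 0.0 as missing values.
for angle in (None, 0.0, 40.7):
    print(angle, "->", "skipped" if not angle else "converted")
# None -> skipped
# 0.0  -> skipped (was converted under "angle is None")
# 40.7 -> converted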
@@ -105,19 +102,19 @@
     return random.choices([str(angle), new_angle], weights=[0.55, 0.45])[0]
 
 
-def create_bigquery_dataset():
+def create_bigquery_dataset(dataset_name):
     # Create BigQuery Dataset
     client = bigquery.Client()
-    dataset_id = f"{client.project}.{DATASET_NAME}"
+    dataset_id = f"{client.project}.{dataset_name}"
     dataset = bigquery.Dataset(dataset_id)
     dataset.location = "US"
     dataset = client.create_dataset(dataset)
 
 
-def write_to_bigquery(df, table_name):
+def write_to_bigquery(df, table_name, dataset_name):
     """Write a dataframe to BigQuery"""
     client = bigquery.Client()
-    dataset_id = f"{client.project}.{DATASET_NAME}"
+    dataset_id = f"{client.project}.{dataset_name}"
 
     # Saving the data to BigQuery
     df.write.format("bigquery").option("table", f"{dataset_id}.{table_name}").save()
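
With the dataset name threaded through as a parameter, both helpers are now usable without module-level globals. A usage sketch, with a hypothetical dataset name:

# Hypothetical dataset name; df is a Spark DataFrame built earlier.
create_bigquery_dataset("citibike_dataset")
write_to_bigquery(df, "RAW_DATA", "citibike_dataset")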
@@ -126,12 +123,16 @@ def write_to_bigquery(df, table_name):
 
 
 def main():
-    # Create a SparkSession under the name "setup". Viewable via the Spark UI
+    # Get command line arguments
+    BUCKET_NAME = sys.argv[1]
+    DATASET_NAME = sys.argv[2]
+
+    # Create a SparkSession under the name "setup"
     spark = SparkSession.builder.appName("setup").getOrCreate()
 
     spark.conf.set("temporaryGcsBucket", BUCKET_NAME)
 
-    create_bigquery_dataset()
+    create_bigquery_dataset(DATASET_NAME)
 
     # Whether we are running the job as a test
     test = False
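
Moving the sys.argv reads inside main() means importing the module no longer requires command-line arguments; only running it does. A sketch of why that matters for the test file below, assuming the file imports as a module named setup:

import setup   # safe now: nothing reads sys.argv[1]/sys.argv[2] at import time
# setup.main() is the only entry point that needs the bucket and dataset args.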
@@ -147,7 +148,7 @@ def main():
     for table_name, data in EXTERNAL_TABLES.items():
         df = spark.createDataFrame(pd.read_csv(data["url"]), schema=data["schema"])
 
-        write_to_bigquery(df, table_name)
+        write_to_bigquery(df, table_name, DATASET_NAME)
 
     # Check if table exists
     try:
@@ -203,7 +204,7 @@ def main():
     df = df.union(dup_df)
 
     print("Uploading citibike dataset...")
-    write_to_bigquery(df, CITIBIKE_TABLE_NAME)
+    write_to_bigquery(df, CITIBIKE_TABLE_NAME, DATASET_NAME)
 
 
 if __name__ == "__main__":

data-science-onramp/data-ingestion/setup_test.py

Lines changed: 6 additions & 5 deletions
@@ -35,7 +35,7 @@
     "project_id": PROJECT_ID,
     "cluster_name": DATAPROC_CLUSTER,
     "config": {
-        "gce_cluster_config": {"zone_uri": "",},
+        "gce_cluster_config": {"zone_uri": ""},
         "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-8"},
         "worker_config": {"num_instances": 6, "machine_type_uri": "n1-standard-8"},
        "software_config": {
@@ -48,7 +48,7 @@
     "placement": {"cluster_name": DATAPROC_CLUSTER},
     "pyspark_job": {
         "main_python_file_uri": f"gs://{BUCKET_NAME}/{BUCKET_BLOB}",
-        "args": [BUCKET_NAME, BQ_DATASET, "--test",],
+        "args": [BUCKET_NAME, BQ_DATASET, "--test"],
         "jar_file_uris": ["gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"],
     },
 }
@@ -58,10 +58,11 @@
 def setup_and_teardown_cluster():
     # Create cluster using cluster client
     cluster_client = dataproc.ClusterControllerClient(
-        client_options={"api_endpoint": f"{CLUSTER_REGION}-dataproc.googleapis.com:443"}
+        #client_options={"api_endpoint": f"{CLUSTER_REGION}-dataproc.googleapis.com:443"}
     )
+
     operation = cluster_client.create_cluster(
-        PROJECT_ID, CLUSTER_REGION, CLUSTER_CONFIG
+        project_id=PROJECT_ID, region=CLUSTER_REGION, cluster=CLUSTER_CONFIG
     )
 
     # Wait for cluster to provision
7172

7273
# Delete cluster
7374
operation = cluster_client.delete_cluster(
74-
PROJECT_ID, CLUSTER_REGION, DATAPROC_CLUSTER
75+
project_id=PROJECT_ID, region=CLUSTER_REGION, name=DATAPROC_CLUSTER
7576
)
7677
operation.result()
7778
