Commit b16629f

refactor to use helper method
1 parent 64b8892 commit b16629f

11 files changed, +134 −252 lines changed
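
The tests below replace their inline gzip/pickle loading of the one-percent MNIST sample with a shared helper, `datasets.one_p_mnist()`, imported from `tests.integ`. The helper itself is not part of this diff; based on the inline code it replaces, it presumably looks something like the sketch below (the module path, function body, and return value are assumptions reconstructed from the removed lines):

# tests/integ/datasets.py -- hypothetical sketch, not part of this commit
from __future__ import absolute_import

import gzip
import os
import pickle

from tests.integ import DATA_DIR


def one_p_mnist():
    # Mirrors the inline loading removed below: read the gzipped pickle and
    # return the training split (a pair of numpy arrays: features, labels).
    data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
    with gzip.open(data_path, "rb") as f:
        train_set, _, _ = pickle.load(f, encoding="latin1")
    return train_set

Call sites either invoke the helper directly (e.g. `datasets.one_p_mnist()[0][:100]`) or wrap it in a module-level `training_set` pytest fixture, as the hunks below show.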

tests/integ/test_airflow_config.py

Lines changed: 20 additions & 52 deletions
@@ -12,14 +12,17 @@
 # language governing permissions and limitations under the License.
 from __future__ import absolute_import
 
-import gzip
 import os
-import pickle
-import pytest
-import tests.integ
 
+import airflow
+import pytest
 import numpy as np
+from airflow import DAG
+from airflow.contrib.operators.sagemaker_training_operator import SageMakerTrainingOperator
+from airflow.contrib.operators.sagemaker_transform_operator import SageMakerTransformOperator
+from six.moves.urllib.parse import urlparse
 
+import tests.integ
 from sagemaker import (
     KMeans,
     FactorizationMachines,
@@ -39,21 +42,13 @@
 from sagemaker.pytorch.estimator import PyTorch
 from sagemaker.sklearn import SKLearn
 from sagemaker.tensorflow import TensorFlow
-from sagemaker.workflow import airflow as sm_airflow
 from sagemaker.utils import sagemaker_timestamp
-
-import airflow
-from airflow import DAG
-from airflow.contrib.operators.sagemaker_training_operator import SageMakerTrainingOperator
-from airflow.contrib.operators.sagemaker_transform_operator import SageMakerTransformOperator
-
+from sagemaker.workflow import airflow as sm_airflow
 from sagemaker.xgboost import XGBoost
-from tests.integ import DATA_DIR, PYTHON_VERSION
+from tests.integ import datasets, DATA_DIR, PYTHON_VERSION
 from tests.integ.record_set import prepare_record_set_from_local_files
 from tests.integ.timeout import timeout
 
-from six.moves.urllib.parse import urlparse
-
 PYTORCH_MNIST_DIR = os.path.join(DATA_DIR, "pytorch_mnist")
 PYTORCH_MNIST_SCRIPT = os.path.join(PYTORCH_MNIST_DIR, "mnist.py")
 AIRFLOW_CONFIG_TIMEOUT_IN_SECONDS = 10
@@ -100,12 +95,6 @@ def test_byo_airflow_config_uploads_data_source_to_s3_when_inputs_provided(
 @pytest.mark.canary_quick
 def test_kmeans_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_instance_type):
     with timeout(seconds=AIRFLOW_CONFIG_TIMEOUT_IN_SECONDS):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-
-        # Load the data into memory as numpy arrays
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, encoding="latin1")
-
         kmeans = KMeans(
             role=ROLE,
             train_instance_count=SINGLE_INSTANCE_COUNT,
@@ -124,7 +113,7 @@ def test_kmeans_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_
         kmeans.center_factor = 1
         kmeans.eval_metrics = ["ssd", "msd"]
 
-        records = kmeans.record_set(train_set[0][:100])
+        records = kmeans.record_set(datasets.one_p_mnist()[0][:100])
 
         training_config = _build_airflow_workflow(
             estimator=kmeans, instance_type=cpu_instance_type, inputs=records
@@ -138,12 +127,6 @@ def test_kmeans_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_
 
 def test_fm_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_instance_type):
     with timeout(seconds=AIRFLOW_CONFIG_TIMEOUT_IN_SECONDS):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-
-        # Load the data into memory as numpy arrays
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, encoding="latin1")
-
         fm = FactorizationMachines(
             role=ROLE,
             train_instance_count=SINGLE_INSTANCE_COUNT,
@@ -157,7 +140,8 @@ def test_fm_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_inst
             sagemaker_session=sagemaker_session,
         )
 
-        records = fm.record_set(train_set[0][:200], train_set[1][:200].astype("float32"))
+        training_set = datasets.one_p_mnist()
+        records = fm.record_set(training_set[0][:200], training_set[1][:200].astype("float32"))
 
         training_config = _build_airflow_workflow(
             estimator=fm, instance_type=cpu_instance_type, inputs=records
@@ -203,12 +187,6 @@ def test_ipinsights_airflow_config_uploads_data_source_to_s3(sagemaker_session,
 
 def test_knn_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_instance_type):
     with timeout(seconds=AIRFLOW_CONFIG_TIMEOUT_IN_SECONDS):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-
-        # Load the data into memory as numpy arrays
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, encoding="latin1")
-
         knn = KNN(
             role=ROLE,
             train_instance_count=SINGLE_INSTANCE_COUNT,
@@ -219,7 +197,8 @@ def test_knn_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_ins
             sagemaker_session=sagemaker_session,
         )
 
-        records = knn.record_set(train_set[0][:200], train_set[1][:200].astype("float32"))
+        training_set = datasets.one_p_mnist()
+        records = knn.record_set(training_set[0][:200], training_set[1][:200].astype("float32"))
 
         training_config = _build_airflow_workflow(
             estimator=knn, instance_type=cpu_instance_type, inputs=records
@@ -273,15 +252,10 @@ def test_linearlearner_airflow_config_uploads_data_source_to_s3(
     sagemaker_session, cpu_instance_type
 ):
     with timeout(seconds=AIRFLOW_CONFIG_TIMEOUT_IN_SECONDS):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-
-        # Load the data into memory as numpy arrays
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, encoding="latin1")
-
-        train_set[1][:100] = 1
-        train_set[1][100:200] = 0
-        train_set = train_set[0], train_set[1].astype(np.dtype("float32"))
+        training_set = datasets.one_p_mnist()
+        training_set[1][:100] = 1
+        training_set[1][100:200] = 0
+        training_set = training_set[0], training_set[1].astype(np.dtype("float32"))
 
         ll = LinearLearner(
             ROLE,
@@ -326,7 +300,7 @@ def test_linearlearner_airflow_config_uploads_data_source_to_s3(
         ll.early_stopping_tolerance = 0.0001
         ll.early_stopping_patience = 3
 
-        records = ll.record_set(train_set[0][:200], train_set[1][:200])
+        records = ll.record_set(training_set[0][:200], training_set[1][:200])
 
         training_config = _build_airflow_workflow(
             estimator=ll, instance_type=cpu_instance_type, inputs=records
@@ -375,12 +349,6 @@ def test_ntm_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_ins
 @pytest.mark.canary_quick
 def test_pca_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_instance_type):
     with timeout(seconds=AIRFLOW_CONFIG_TIMEOUT_IN_SECONDS):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-
-        # Load the data into memory as numpy arrays
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, encoding="latin1")
-
         pca = PCA(
             role=ROLE,
             train_instance_count=SINGLE_INSTANCE_COUNT,
@@ -393,7 +361,7 @@ def test_pca_airflow_config_uploads_data_source_to_s3(sagemaker_session, cpu_ins
         pca.subtract_mean = True
        pca.extra_components = 5
 
-        records = pca.record_set(train_set[0][:100])
+        records = pca.record_set(datasets.one_p_mnist()[0][:100])
 
        training_config = _build_airflow_workflow(
            estimator=pca, instance_type=cpu_instance_type, inputs=records

tests/integ/test_byo_estimator.py

Lines changed: 10 additions & 17 deletions
@@ -12,18 +12,16 @@
 # language governing permissions and limitations under the License.
 from __future__ import absolute_import
 
-import gzip
 import json
 import os
-import pickle
 
 import pytest
 
 import sagemaker
 from sagemaker.amazon.amazon_estimator import get_image_uri
 from sagemaker.estimator import Estimator
 from sagemaker.utils import unique_name_from_base
-from tests.integ import DATA_DIR, TRAINING_DEFAULT_TIMEOUT_MINUTES
+from tests.integ import DATA_DIR, TRAINING_DEFAULT_TIMEOUT_MINUTES, datasets
 from tests.integ.timeout import timeout, timeout_and_delete_endpoint_by_name
 
 
@@ -32,6 +30,11 @@ def region(sagemaker_session):
     return sagemaker_session.boto_session.region_name
 
 
+@pytest.fixture
+def training_set():
+    return datasets.one_p_mnist()
+
+
 def fm_serializer(data):
     js = {"instances": []}
     for row in data:
@@ -40,7 +43,7 @@ def fm_serializer(data):
 
 
 @pytest.mark.canary_quick
-def test_byo_estimator(sagemaker_session, region, cpu_instance_type):
+def test_byo_estimator(sagemaker_session, region, cpu_instance_type, training_set):
     """Use Factorization Machines algorithm as an example here.
 
     First we need to prepare data for training. We take standard data set, convert it to the
@@ -56,11 +59,6 @@ def test_byo_estimator(sagemaker_session, region, cpu_instance_type):
     job_name = unique_name_from_base("byo")
 
     with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, encoding="latin1")
-
         prefix = "test_byo_estimator"
         key = "recordio-pb-data"
 
@@ -90,25 +88,20 @@ def test_byo_estimator(sagemaker_session, region, cpu_instance_type):
         predictor.content_type = "application/json"
         predictor.deserializer = sagemaker.predictor.json_deserializer
 
-        result = predictor.predict(train_set[0][:10])
+        result = predictor.predict(training_set[0][:10])
 
         assert len(result["predictions"]) == 10
         for prediction in result["predictions"]:
             assert prediction["score"] is not None
 
 
-def test_async_byo_estimator(sagemaker_session, region, cpu_instance_type):
+def test_async_byo_estimator(sagemaker_session, region, cpu_instance_type, training_set):
     image_name = get_image_uri(region, "factorization-machines")
     endpoint_name = unique_name_from_base("byo")
     training_data_path = os.path.join(DATA_DIR, "dummy_tensor")
     job_name = unique_name_from_base("byo")
 
     with timeout(minutes=5):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, encoding="latin1")
-
         prefix = "test_byo_estimator"
         key = "recordio-pb-data"
 
@@ -141,7 +134,7 @@ def test_async_byo_estimator(sagemaker_session, region, cpu_instance_type):
         predictor.content_type = "application/json"
         predictor.deserializer = sagemaker.predictor.json_deserializer
 
-        result = predictor.predict(train_set[0][:10])
+        result = predictor.predict(training_set[0][:10])
 
         assert len(result["predictions"]) == 10
         for prediction in result["predictions"]:

tests/integ/test_factorization_machines.py

Lines changed: 14 additions & 22 deletions
@@ -12,27 +12,25 @@
 # language governing permissions and limitations under the License.
 from __future__ import absolute_import
 
-import gzip
-import os
-import pickle
 import time
 
+import pytest
+
 from sagemaker import FactorizationMachines, FactorizationMachinesModel
 from sagemaker.utils import unique_name_from_base
-from tests.integ import DATA_DIR, TRAINING_DEFAULT_TIMEOUT_MINUTES
+from tests.integ import datasets, TRAINING_DEFAULT_TIMEOUT_MINUTES
 from tests.integ.timeout import timeout, timeout_and_delete_endpoint_by_name
 
 
-def test_factorization_machines(sagemaker_session, cpu_instance_type):
-    job_name = unique_name_from_base("fm")
+@pytest.fixture
+def training_set():
+    return datasets.one_p_mnist()
 
-    with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
 
-        # Load the data into memory as numpy arrays
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, encoding="latin1")
+def test_factorization_machines(sagemaker_session, cpu_instance_type, training_set):
+    job_name = unique_name_from_base("fm")
 
+    with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
         fm = FactorizationMachines(
             role="SageMakerRole",
             train_instance_count=1,
@@ -48,7 +46,7 @@ def test_factorization_machines(sagemaker_session, cpu_instance_type):
 
         # training labels must be 'float32'
         fm.fit(
-            fm.record_set(train_set[0][:200], train_set[1][:200].astype("float32")),
+            fm.record_set(training_set[0][:200], training_set[1][:200].astype("float32")),
             job_name=job_name,
         )
 
@@ -57,23 +55,17 @@ def test_factorization_machines(sagemaker_session, cpu_instance_type):
             fm.model_data, role="SageMakerRole", sagemaker_session=sagemaker_session
        )
        predictor = model.deploy(1, cpu_instance_type, endpoint_name=job_name)
-        result = predictor.predict(train_set[0][:10])
+        result = predictor.predict(training_set[0][:10])
 
         assert len(result) == 10
         for record in result:
             assert record.label["score"] is not None
 
 
-def test_async_factorization_machines(sagemaker_session, cpu_instance_type):
+def test_async_factorization_machines(sagemaker_session, cpu_instance_type, training_set):
     job_name = unique_name_from_base("fm")
 
     with timeout(minutes=5):
-        data_path = os.path.join(DATA_DIR, "one_p_mnist", "mnist.pkl.gz")
-
-        # Load the data into memory as numpy arrays
-        with gzip.open(data_path, "rb") as f:
-            train_set, _, _ = pickle.load(f, encoding="latin1")
-
         fm = FactorizationMachines(
             role="SageMakerRole",
             train_instance_count=1,
@@ -89,7 +81,7 @@ def test_async_factorization_machines(sagemaker_session, cpu_instance_type):
 
         # training labels must be 'float32'
         fm.fit(
-            fm.record_set(train_set[0][:200], train_set[1][:200].astype("float32")),
+            fm.record_set(training_set[0][:200], training_set[1][:200].astype("float32")),
             job_name=job_name,
             wait=False,
         )
@@ -106,7 +98,7 @@ def test_async_factorization_machines(sagemaker_session, cpu_instance_type):
            estimator.model_data, role="SageMakerRole", sagemaker_session=sagemaker_session
        )
        predictor = model.deploy(1, cpu_instance_type, endpoint_name=job_name)
-        result = predictor.predict(train_set[0][:10])
+        result = predictor.predict(training_set[0][:10])
 
         assert len(result) == 10
         for record in result:
