Skip to content

infra: use pytest fixtures in batch transform integ tests to train and upload to S3 only once #1410

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 14, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 40 additions & 140 deletions tests/integ/test_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,13 @@
from tests.integ.timeout import timeout, timeout_and_delete_model_with_transformer
from tests.integ.vpc_test_utils import get_or_create_vpc_resources

MXNET_MNIST_PATH = os.path.join(DATA_DIR, "mxnet_mnist")

@pytest.mark.canary_quick
def test_transform_mxnet(sagemaker_session, mxnet_full_version, cpu_instance_type):
data_path = os.path.join(DATA_DIR, "mxnet_mnist")
script_path = os.path.join(data_path, "mnist.py")

@pytest.fixture(scope="module")
def mxnet_estimator(sagemaker_session, mxnet_full_version, cpu_instance_type):
mx = MXNet(
entry_point=script_path,
entry_point=os.path.join(MXNET_MNIST_PATH, "mnist.py"),
role="SageMakerRole",
train_instance_count=1,
train_instance_type=cpu_instance_type,
Expand All @@ -52,29 +51,39 @@ def test_transform_mxnet(sagemaker_session, mxnet_full_version, cpu_instance_typ
)

train_input = mx.sagemaker_session.upload_data(
path=os.path.join(data_path, "train"), key_prefix="integ-test-data/mxnet_mnist/train"
path=os.path.join(MXNET_MNIST_PATH, "train"), key_prefix="integ-test-data/mxnet_mnist/train"
)
test_input = mx.sagemaker_session.upload_data(
path=os.path.join(data_path, "test"), key_prefix="integ-test-data/mxnet_mnist/test"
path=os.path.join(MXNET_MNIST_PATH, "test"), key_prefix="integ-test-data/mxnet_mnist/test"
)
job_name = unique_name_from_base("test-mxnet-transform")

job_name = unique_name_from_base("test-mxnet-transform")
with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
mx.fit({"train": train_input, "test": test_input}, job_name=job_name)

transform_input_path = os.path.join(data_path, "transform", "data.csv")
return mx


@pytest.fixture(scope="module")
def mxnet_transform_input(sagemaker_session):
transform_input_path = os.path.join(MXNET_MNIST_PATH, "transform", "data.csv")
transform_input_key_prefix = "integ-test-data/mxnet_mnist/transform"
transform_input = mx.sagemaker_session.upload_data(
return sagemaker_session.upload_data(
path=transform_input_path, key_prefix=transform_input_key_prefix
)


@pytest.mark.canary_quick
def test_transform_mxnet(
mxnet_estimator, mxnet_transform_input, sagemaker_session, cpu_instance_type
):
kms_key_arn = get_or_create_kms_key(sagemaker_session)
output_filter = "$"
input_filter = "$"

transformer = _create_transformer_and_transform_job(
mx,
transform_input,
mxnet_estimator,
mxnet_transform_input,
cpu_instance_type,
kms_key_arn,
input_filter=input_filter,
Expand Down Expand Up @@ -197,39 +206,13 @@ def test_transform_pytorch_vpc_custom_model_bucket(
assert custom_bucket_name == model_bucket


def test_transform_mxnet_tags(sagemaker_session, mxnet_full_version, cpu_instance_type):
data_path = os.path.join(DATA_DIR, "mxnet_mnist")
script_path = os.path.join(data_path, "mnist.py")
def test_transform_mxnet_tags(
mxnet_estimator, mxnet_transform_input, sagemaker_session, cpu_instance_type
):
tags = [{"Key": "some-tag", "Value": "value-for-tag"}]

mx = MXNet(
entry_point=script_path,
role="SageMakerRole",
train_instance_count=1,
train_instance_type=cpu_instance_type,
sagemaker_session=sagemaker_session,
framework_version=mxnet_full_version,
)

train_input = mx.sagemaker_session.upload_data(
path=os.path.join(data_path, "train"), key_prefix="integ-test-data/mxnet_mnist/train"
)
test_input = mx.sagemaker_session.upload_data(
path=os.path.join(data_path, "test"), key_prefix="integ-test-data/mxnet_mnist/test"
)
job_name = unique_name_from_base("test-mxnet-transform")

with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
mx.fit({"train": train_input, "test": test_input}, job_name=job_name)

transform_input_path = os.path.join(data_path, "transform", "data.csv")
transform_input_key_prefix = "integ-test-data/mxnet_mnist/transform"
transform_input = mx.sagemaker_session.upload_data(
path=transform_input_path, key_prefix=transform_input_key_prefix
)

transformer = mx.transformer(1, cpu_instance_type, tags=tags)
transformer.transform(transform_input, content_type="text/csv")
transformer = mxnet_estimator.transformer(1, cpu_instance_type, tags=tags)
transformer.transform(mxnet_transform_input, content_type="text/csv")

with timeout_and_delete_model_with_transformer(
transformer, sagemaker_session, minutes=TRANSFORM_DEFAULT_TIMEOUT_MINUTES
Expand Down Expand Up @@ -306,86 +289,29 @@ def test_transform_byo_estimator(sagemaker_session, cpu_instance_type):
assert tags == model_tags


def test_single_transformer_multiple_jobs(sagemaker_session, mxnet_full_version, cpu_instance_type):
data_path = os.path.join(DATA_DIR, "mxnet_mnist")
script_path = os.path.join(data_path, "mnist.py")

mx = MXNet(
entry_point=script_path,
role="SageMakerRole",
train_instance_count=1,
train_instance_type=cpu_instance_type,
sagemaker_session=sagemaker_session,
framework_version=mxnet_full_version,
)

train_input = mx.sagemaker_session.upload_data(
path=os.path.join(data_path, "train"), key_prefix="integ-test-data/mxnet_mnist/train"
)
test_input = mx.sagemaker_session.upload_data(
path=os.path.join(data_path, "test"), key_prefix="integ-test-data/mxnet_mnist/test"
)
job_name = unique_name_from_base("test-mxnet-transform")

with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
mx.fit({"train": train_input, "test": test_input}, job_name=job_name)

transform_input_path = os.path.join(data_path, "transform", "data.csv")
transform_input_key_prefix = "integ-test-data/mxnet_mnist/transform"
transform_input = mx.sagemaker_session.upload_data(
path=transform_input_path, key_prefix=transform_input_key_prefix
)

transformer = mx.transformer(1, cpu_instance_type)
def test_single_transformer_multiple_jobs(
mxnet_estimator, mxnet_transform_input, sagemaker_session, cpu_instance_type
):
transformer = mxnet_estimator.transformer(1, cpu_instance_type)

job_name = unique_name_from_base("test-mxnet-transform")
transformer.transform(transform_input, content_type="text/csv", job_name=job_name)
transformer.transform(mxnet_transform_input, content_type="text/csv", job_name=job_name)
with timeout_and_delete_model_with_transformer(
transformer, sagemaker_session, minutes=TRANSFORM_DEFAULT_TIMEOUT_MINUTES
):
assert transformer.output_path == "s3://{}/{}".format(
sagemaker_session.default_bucket(), job_name
)
job_name = unique_name_from_base("test-mxnet-transform")
transformer.transform(transform_input, content_type="text/csv", job_name=job_name)
transformer.transform(mxnet_transform_input, content_type="text/csv", job_name=job_name)
assert transformer.output_path == "s3://{}/{}".format(
sagemaker_session.default_bucket(), job_name
)


def test_stop_transform_job(sagemaker_session, mxnet_full_version, cpu_instance_type):
data_path = os.path.join(DATA_DIR, "mxnet_mnist")
script_path = os.path.join(data_path, "mnist.py")
tags = [{"Key": "some-tag", "Value": "value-for-tag"}]

mx = MXNet(
entry_point=script_path,
role="SageMakerRole",
train_instance_count=1,
train_instance_type=cpu_instance_type,
sagemaker_session=sagemaker_session,
framework_version=mxnet_full_version,
)

train_input = mx.sagemaker_session.upload_data(
path=os.path.join(data_path, "train"), key_prefix="integ-test-data/mxnet_mnist/train"
)
test_input = mx.sagemaker_session.upload_data(
path=os.path.join(data_path, "test"), key_prefix="integ-test-data/mxnet_mnist/test"
)
job_name = unique_name_from_base("test-mxnet-transform")

with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
mx.fit({"train": train_input, "test": test_input}, job_name=job_name)

transform_input_path = os.path.join(data_path, "transform", "data.csv")
transform_input_key_prefix = "integ-test-data/mxnet_mnist/transform"
transform_input = mx.sagemaker_session.upload_data(
path=transform_input_path, key_prefix=transform_input_key_prefix
)

transformer = mx.transformer(1, cpu_instance_type, tags=tags)
transformer.transform(transform_input, content_type="text/csv")
def test_stop_transform_job(mxnet_estimator, mxnet_transform_input, cpu_instance_type):
transformer = mxnet_estimator.transformer(1, cpu_instance_type)
transformer.transform(mxnet_transform_input, content_type="text/csv")

time.sleep(15)

Expand All @@ -401,39 +327,12 @@ def test_stop_transform_job(sagemaker_session, mxnet_full_version, cpu_instance_
assert desc["TransformJobStatus"] == "Stopped"


def test_transform_mxnet_logs(sagemaker_session, mxnet_full_version, cpu_instance_type):
data_path = os.path.join(DATA_DIR, "mxnet_mnist")
script_path = os.path.join(data_path, "mnist.py")

mx = MXNet(
entry_point=script_path,
role="SageMakerRole",
train_instance_count=1,
train_instance_type=cpu_instance_type,
sagemaker_session=sagemaker_session,
framework_version=mxnet_full_version,
)

train_input = mx.sagemaker_session.upload_data(
path=os.path.join(data_path, "train"), key_prefix="integ-test-data/mxnet_mnist/train"
)
test_input = mx.sagemaker_session.upload_data(
path=os.path.join(data_path, "test"), key_prefix="integ-test-data/mxnet_mnist/test"
)
job_name = unique_name_from_base("test-mxnet-transform")

with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
mx.fit({"train": train_input, "test": test_input}, job_name=job_name)

transform_input_path = os.path.join(data_path, "transform", "data.csv")
transform_input_key_prefix = "integ-test-data/mxnet_mnist/transform"
transform_input = mx.sagemaker_session.upload_data(
path=transform_input_path, key_prefix=transform_input_key_prefix
)

def test_transform_mxnet_logs(
mxnet_estimator, mxnet_transform_input, sagemaker_session, cpu_instance_type
):
with timeout(minutes=45):
transformer = _create_transformer_and_transform_job(
mx, transform_input, cpu_instance_type, wait=True, logs=True
mxnet_estimator, mxnet_transform_input, cpu_instance_type, wait=True, logs=True
)

with timeout_and_delete_model_with_transformer(
Expand Down Expand Up @@ -462,5 +361,6 @@ def _create_transformer_and_transform_job(
join_source=join_source,
wait=wait,
logs=logs,
job_name=unique_name_from_base("test-transform"),
)
return transformer