change: improving Chainer integ tests #872

Merged
9 commits merged on Jul 22, 2019
9 changes: 3 additions & 6 deletions tests/data/chainer_mnist/distributed_mnist.py
@@ -46,7 +46,7 @@ def __call__(self, x):


def _preprocess_mnist(raw, withlabel, ndim, scale, image_dtype, label_dtype, rgb_format):
images = raw["x"]
images = raw["x"][-100:]
if ndim == 2:
images = images.reshape(-1, 28, 28)
elif ndim == 3:
@@ -59,7 +59,7 @@ def _preprocess_mnist(raw, withlabel, ndim, scale, image_dtype, label_dtype, rgb
images *= scale / 255.0

if withlabel:
labels = raw["y"].astype(label_dtype)
labels = raw["y"][-100:].astype(label_dtype)
return tuple_dataset.TupleDataset(images, labels)
return images

@@ -111,9 +111,6 @@ def _preprocess_mnist(raw, withlabel, ndim, scale, image_dtype, label_dtype, rgb
optimizer = chainermn.create_multi_node_optimizer(chainer.optimizers.Adam(), comm)
optimizer.setup(model)

train_file = np.load(os.path.join(args.train, "train.npz"))
test_file = np.load(os.path.join(args.test, "test.npz"))

preprocess_mnist_options = {
"withlabel": True,
"ndim": 1,
@@ -173,7 +170,7 @@ def _preprocess_mnist(raw, withlabel, ndim, scale, image_dtype, label_dtype, rgb
trainer.run()

# only save the model in the master node
if args.host == "algo-1":
if args.host == env.hosts[0]:
serializers.save_npz(os.path.join(env.model_dir, "model.npz"), model)


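Note on the last hunk above: the model-saving guard no longer hard-codes the master host name "algo-1" but takes the first entry of the training environment's host list, which also works in local mode where hosts are named differently. A minimal sketch of the same guard, assuming the standard SM_HOSTS / SM_CURRENT_HOST environment variables that SageMaker training containers expose (those names are an assumption, not part of this diff):

```python
import json
import os

from chainer import serializers


def save_on_master(model, model_dir):
    # Assumption: SM_HOSTS holds a JSON list of host names and SM_CURRENT_HOST
    # this container's own name; both are set by the SageMaker training toolkit.
    hosts = json.loads(os.environ.get("SM_HOSTS", '["localhost"]'))
    current_host = os.environ.get("SM_CURRENT_HOST", hosts[0])

    # Only the first host persists the trained model, mirroring the
    # `if args.host == env.hosts[0]` check in distributed_mnist.py.
    if current_host == hosts[0]:
        serializers.save_npz(os.path.join(model_dir, "model.npz"), model)
```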
4 changes: 2 additions & 2 deletions tests/data/chainer_mnist/mnist.py
@@ -42,7 +42,7 @@ def __call__(self, x):


def _preprocess_mnist(raw, withlabel, ndim, scale, image_dtype, label_dtype, rgb_format):
images = raw["x"]
images = raw["x"][-100:]
if ndim == 2:
images = images.reshape(-1, 28, 28)
elif ndim == 3:
@@ -55,7 +55,7 @@ def _preprocess_mnist(raw, withlabel, ndim, scale, image_dtype, label_dtype, rgb
images *= scale / 255.0

if withlabel:
labels = raw["y"].astype(label_dtype)
labels = raw["y"][-100:].astype(label_dtype)
return tuple_dataset.TupleDataset(images, labels)
else:
return images
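Both training scripts now slice the raw MNIST arrays with [-100:], so every channel holds only the last 100 samples and a one-epoch run finishes in seconds. A standalone illustration of the effect (the shapes below are made up, not read from the real train.npz / test.npz files):

```python
import numpy as np

# Stand-in for the dict that np.load("train.npz") returns:
# "x" holds flattened images, "y" the integer labels.
raw = {
    "x": np.random.rand(60000, 784).astype(np.float32),
    "y": np.random.randint(0, 10, size=60000),
}

# The patched _preprocess_mnist keeps only the last 100 samples.
images = raw["x"][-100:]
labels = raw["y"][-100:]
print(images.shape, labels.shape)  # (100, 784) (100,)
```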
174 changes: 80 additions & 94 deletions tests/integ/test_chainer_train.py
@@ -13,152 +13,138 @@
from __future__ import absolute_import

import os
import time

import pytest
import numpy
import pytest

from sagemaker.chainer.defaults import CHAINER_VERSION
from sagemaker.chainer.estimator import Chainer
from sagemaker.chainer.model import ChainerModel
from sagemaker.utils import unique_name_from_base
import tests.integ
from tests.integ import DATA_DIR, PYTHON_VERSION, TRAINING_DEFAULT_TIMEOUT_MINUTES
from tests.integ.timeout import timeout, timeout_and_delete_endpoint_by_name


@pytest.fixture(scope="module")
def chainer_training_job(sagemaker_session, chainer_full_version):
return _run_mnist_training_job(sagemaker_session, "ml.c4.xlarge", 1, chainer_full_version)
def chainer_local_training_job(sagemaker_local_session, chainer_full_version):
return _run_mnist_training_job(sagemaker_local_session, "local", 1, chainer_full_version)


@pytest.mark.local_mode
def test_distributed_cpu_training(sagemaker_local_session, chainer_full_version):
_run_mnist_training_job(sagemaker_local_session, "local", 2, chainer_full_version)


def test_distributed_cpu_training(sagemaker_session, chainer_full_version):
_run_mnist_training_job(sagemaker_session, "ml.c4.xlarge", 2, chainer_full_version)
@pytest.mark.local_mode
def test_training_with_additional_hyperparameters(sagemaker_local_session, chainer_full_version):
script_path = os.path.join(DATA_DIR, "chainer_mnist", "mnist.py")
data_path = os.path.join(DATA_DIR, "chainer_mnist")

chainer = Chainer(
entry_point=script_path,
role="SageMakerRole",
train_instance_count=1,
train_instance_type="local",
framework_version=chainer_full_version,
py_version=PYTHON_VERSION,
sagemaker_session=sagemaker_local_session,
hyperparameters={"epochs": 1},
use_mpi=True,
num_processes=2,
process_slots_per_host=2,
additional_mpi_options="-x NCCL_DEBUG=INFO",
)

@pytest.mark.skipif(
tests.integ.test_region() in tests.integ.HOSTING_NO_P2_REGIONS
or tests.integ.test_region() in tests.integ.TRAINING_NO_P2_REGIONS,
reason="no ml.p2 instances in these regions",
)
def test_distributed_gpu_training(sagemaker_session, chainer_full_version):
_run_mnist_training_job(sagemaker_session, "ml.p2.xlarge", 2, chainer_full_version)
train_input = "file://" + os.path.join(data_path, "train")
test_input = "file://" + os.path.join(data_path, "test")

chainer.fit({"train": train_input, "test": test_input})

def test_training_with_additional_hyperparameters(sagemaker_session, chainer_full_version):

@pytest.mark.canary_quick
@pytest.mark.regional_testing
def test_attach_deploy(sagemaker_session, chainer_full_version):
with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
script_path = os.path.join(DATA_DIR, "chainer_mnist", "mnist.py")
data_path = os.path.join(DATA_DIR, "chainer_mnist")

chainer = Chainer(
entry_point=script_path,
role="SageMakerRole",
train_instance_count=1,
train_instance_type="ml.c4.xlarge",
framework_version=chainer_full_version,
py_version=PYTHON_VERSION,
train_instance_count=1,
train_instance_type="ml.c4.xlarge",
sagemaker_session=sagemaker_session,
hyperparameters={"epochs": 1},
use_mpi=True,
num_processes=2,
process_slots_per_host=2,
additional_mpi_options="-x NCCL_DEBUG=INFO",
)

train_input = chainer.sagemaker_session.upload_data(
train_input = sagemaker_session.upload_data(
path=os.path.join(data_path, "train"), key_prefix="integ-test-data/chainer_mnist/train"
)
test_input = chainer.sagemaker_session.upload_data(

test_input = sagemaker_session.upload_data(
path=os.path.join(data_path, "test"), key_prefix="integ-test-data/chainer_mnist/test"
)

job_name = unique_name_from_base("test-chainer-training")
chainer.fit({"train": train_input, "test": test_input}, job_name=job_name)
return chainer.latest_training_job.name

chainer.fit({"train": train_input, "test": test_input}, wait=False, job_name=job_name)

@pytest.mark.canary_quick
@pytest.mark.regional_testing
def test_attach_deploy(chainer_training_job, sagemaker_session):
endpoint_name = unique_name_from_base("test-chainer-attach-deploy")

with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session):
estimator = Chainer.attach(chainer_training_job, sagemaker_session=sagemaker_session)
estimator = Chainer.attach(
chainer.latest_training_job.name, sagemaker_session=sagemaker_session
)
predictor = estimator.deploy(1, "ml.m4.xlarge", endpoint_name=endpoint_name)
_predict_and_assert(predictor)


def test_deploy_model(chainer_training_job, sagemaker_session):
endpoint_name = unique_name_from_base("test-chainer-deploy-model")
with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session):
desc = sagemaker_session.sagemaker_client.describe_training_job(
TrainingJobName=chainer_training_job
)
model_data = desc["ModelArtifacts"]["S3ModelArtifacts"]
script_path = os.path.join(DATA_DIR, "chainer_mnist", "mnist.py")
model = ChainerModel(
model_data,
"SageMakerRole",
entry_point=script_path,
sagemaker_session=sagemaker_session,
)
predictor = model.deploy(1, "ml.m4.xlarge", endpoint_name=endpoint_name)
_predict_and_assert(predictor)

@pytest.mark.local_mode
def test_deploy_model(chainer_local_training_job, sagemaker_local_session):
script_path = os.path.join(DATA_DIR, "chainer_mnist", "mnist.py")

def test_async_fit(sagemaker_session):
with timeout(minutes=5):
training_job_name = _run_mnist_training_job(
sagemaker_session, "ml.c4.xlarge", 1, chainer_full_version=CHAINER_VERSION, wait=False
)
model = ChainerModel(
chainer_local_training_job.model_data,
"SageMakerRole",
entry_point=script_path,
sagemaker_session=sagemaker_local_session,
)

print("Waiting to re-attach to the training job: %s" % training_job_name)
time.sleep(20)

endpoint_name = unique_name_from_base("test-chainer-async-fit")
with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session):
print("Re-attaching now to: %s" % training_job_name)
estimator = Chainer.attach(
training_job_name=training_job_name, sagemaker_session=sagemaker_session
)
predictor = estimator.deploy(1, "ml.c4.xlarge", endpoint_name=endpoint_name)
predictor = model.deploy(1, "local")
try:
_predict_and_assert(predictor)
finally:
predictor.delete_endpoint()


def _run_mnist_training_job(
sagemaker_session, instance_type, instance_count, chainer_full_version, wait=True
):
with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):

script_path = (
os.path.join(DATA_DIR, "chainer_mnist", "mnist.py")
if instance_type == 1
else os.path.join(DATA_DIR, "chainer_mnist", "distributed_mnist.py")
)

data_path = os.path.join(DATA_DIR, "chainer_mnist")

chainer = Chainer(
entry_point=script_path,
role="SageMakerRole",
framework_version=chainer_full_version,
py_version=PYTHON_VERSION,
train_instance_count=instance_count,
train_instance_type=instance_type,
sagemaker_session=sagemaker_session,
hyperparameters={"epochs": 1},
)

train_input = chainer.sagemaker_session.upload_data(
path=os.path.join(data_path, "train"), key_prefix="integ-test-data/chainer_mnist/train"
)
test_input = chainer.sagemaker_session.upload_data(
path=os.path.join(data_path, "test"), key_prefix="integ-test-data/chainer_mnist/test"
)

job_name = unique_name_from_base("test-chainer-training")
chainer.fit({"train": train_input, "test": test_input}, wait=wait, job_name=job_name)
return chainer.latest_training_job.name
script_path = (
os.path.join(DATA_DIR, "chainer_mnist", "mnist.py")
if instance_type == 1
else os.path.join(DATA_DIR, "chainer_mnist", "distributed_mnist.py")
)

data_path = os.path.join(DATA_DIR, "chainer_mnist")

chainer = Chainer(
entry_point=script_path,
role="SageMakerRole",
framework_version=chainer_full_version,
py_version=PYTHON_VERSION,
train_instance_count=instance_count,
train_instance_type=instance_type,
sagemaker_session=sagemaker_session,
hyperparameters={"epochs": 1},
)

train_input = "file://" + os.path.join(data_path, "train")
test_input = "file://" + os.path.join(data_path, "test")

job_name = unique_name_from_base("test-chainer-training")
chainer.fit({"train": train_input, "test": test_input}, wait=wait, job_name=job_name)
return chainer


def _predict_and_assert(predictor):
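Overall, the rewritten tests lean on local mode: single-node training, distributed CPU training, and model deployment now run in local containers fed by file:// input channels instead of uploading data to S3 and launching remote instances. A hedged sketch of that pattern with the 1.x SDK (the paths, role string, and framework version below are illustrative, not taken from the test configuration):

```python
import os

from sagemaker.chainer.estimator import Chainer
from sagemaker.local import LocalSession

data_path = "tests/data/chainer_mnist"  # illustrative path

session = LocalSession()
session.config = {"local": {"local_code": True}}  # assumption: run the local copy of the code

estimator = Chainer(
    entry_point=os.path.join(data_path, "mnist.py"),
    role="SageMakerRole",          # local mode does not assume this role
    train_instance_count=1,
    train_instance_type="local",   # "local" selects the Docker-based backend
    framework_version="5.0.0",     # illustrative Chainer version
    py_version="py3",
    hyperparameters={"epochs": 1},
    sagemaker_session=session,
)

# file:// channels are mounted straight from disk; nothing is uploaded to S3.
estimator.fit(
    {
        "train": "file://" + os.path.join(data_path, "train"),
        "test": "file://" + os.path.join(data_path, "test"),
    }
)
```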