Skip to content

Supporting inter-container traffic encryption flag #595

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
CHANGELOG
=========

1.17.1.dev
==========
1.17.1
======

* enhancement: Workflow: Specify tasks from which training/tuning operator to transform/deploy in related operators
* feature: Supporting inter-container traffic encryption flag

1.17.0
======
Expand Down
20 changes: 20 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -748,6 +748,26 @@ To train a model using your own VPC, set the optional parameters ``subnets`` and
# SageMaker Training Job will set VpcConfig and container instances will run in your VPC
mxnet_vpc_estimator.fit('s3://my_bucket/my_training_data/')

To train a model with the inter-container traffic encrypted, set the optional parameters ``subnets`` and ``security_group_ids`` and
the flag ``encrypt_inter_container_traffic`` as ``True`` on an Estimator (Note: This flag can be used only if you specify that the training
job runs in a VPC):

.. code:: python

from sagemaker.mxnet import MXNet

# Configure an MXNet Estimator with subnets and security groups from your VPC
mxnet_vpc_estimator = MXNet('train.py',
train_instance_type='ml.p2.xlarge',
train_instance_count=1,
framework_version='1.2.1',
subnets=['subnet-1', 'subnet-2'],
security_group_ids=['sg-1'],
encrypt_inter_container_traffic=True)

# The SageMaker training job sets the VpcConfig, and training container instances run in your VPC with traffic between the containers encrypted
mxnet_vpc_estimator.fit('s3://my_bucket/my_training_data/')

When you create a ``Predictor`` from the ``Estimator`` using ``deploy()``, the same VPC configurations will be set on the SageMaker Model:

.. code:: python
Expand Down
2 changes: 1 addition & 1 deletion doc/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __getattr__(cls, name):
'numpy', 'scipy', 'scipy.sparse']
sys.modules.update((mod_name, Mock()) for mod_name in MOCK_MODULES)

version = '1.17.0'
version = '1.17.1'
project = u'sagemaker'

# Add any Sphinx extension module names here, as strings. They can be extensions
Expand Down
2 changes: 1 addition & 1 deletion src/sagemaker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@
from sagemaker.session import s3_input # noqa: F401
from sagemaker.session import get_execution_role # noqa: F401

__version__ = '1.17.0'
__version__ = '1.17.1'
6 changes: 5 additions & 1 deletion src/sagemaker/algorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def __init__(
model_uri=None,
model_channel_name='model',
metric_definitions=None,
encrypt_inter_container_traffic=False
):
"""Initialize an ``AlgorithmEstimator`` instance.

Expand All @@ -75,7 +76,7 @@ def __init__(
* 'Pipe' - Amazon SageMaker streams data directly from S3 to the container via a Unix-named pipe.

This argument can be overriden on a per-channel basis using ``sagemaker.session.s3_input.input_mode``.
output_path (str): S3 location for saving the trainig result (model artifacts and output files).
output_path (str): S3 location for saving the training result (model artifacts and output files).
If not specified, results are stored to a default bucket. If the bucket with the specific name
does not exist, the estimator creates the bucket during the
:meth:`~sagemaker.estimator.EstimatorBase.fit` method execution.
Expand All @@ -100,6 +101,8 @@ def __init__(
metric_definitions (list[dict]): A list of dictionaries that defines the metric(s) used to evaluate the
training jobs. Each dictionary contains two keys: 'Name' for the name of the metric, and 'Regex' for
the regular expression used to extract the metric from the logs.
encrypt_inter_container_traffic (bool): Specifies whether traffic between training containers is encrypted
for the training job (default: ``False``).
"""
self.algorithm_arn = algorithm_arn
super(AlgorithmEstimator, self).__init__(
Expand All @@ -120,6 +123,7 @@ def __init__(
model_uri=model_uri,
model_channel_name=model_channel_name,
metric_definitions=metric_definitions,
encrypt_inter_container_traffic=encrypt_inter_container_traffic
)

self.algorithm_spec = self.sagemaker_session.sagemaker_client.describe_algorithm(
Expand Down
23 changes: 19 additions & 4 deletions src/sagemaker/estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def __init__(self, role, train_instance_count, train_instance_type,
train_volume_size=30, train_volume_kms_key=None, train_max_run=24 * 60 * 60, input_mode='File',
output_path=None, output_kms_key=None, base_job_name=None, sagemaker_session=None, tags=None,
subnets=None, security_group_ids=None, model_uri=None, model_channel_name='model',
metric_definitions=None):
metric_definitions=None, encrypt_inter_container_traffic=False):
"""Initialize an ``EstimatorBase`` instance.

Args:
Expand Down Expand Up @@ -103,6 +103,8 @@ def __init__(self, role, train_instance_count, train_instance_type,
training jobs. Each dictionary contains two keys: 'Name' for the name of the metric, and 'Regex' for
the regular expression used to extract the metric from the logs. This should be defined only
for jobs that don't use an Amazon algorithm.
encrypt_inter_container_traffic (bool): Specifies whether traffic between training containers is encrypted
for the training job (default: ``False``).
"""
self.role = role
self.train_instance_count = train_instance_count
Expand Down Expand Up @@ -138,6 +140,8 @@ def __init__(self, role, train_instance_count, train_instance_type,
self.subnets = subnets
self.security_group_ids = security_group_ids

self.encrypt_inter_container_traffic = encrypt_inter_container_traffic

@abstractmethod
def train_image(self):
"""Return the Docker image to use for training.
Expand Down Expand Up @@ -429,6 +433,10 @@ def _prepare_init_params_from_job_description(cls, job_details, model_channel_na
if 'MetricDefinitons' in job_details['AlgorithmSpecification']:
init_params['metric_definitions'] = job_details['AlgorithmSpecification']['MetricsDefinition']

if 'EnableInterContainerTrafficEncryption' in job_details:
init_params['encrypt_inter_container_traffic'] = \
job_details['EnableInterContainerTrafficEncryption']

subnets, security_group_ids = vpc_utils.from_dict(job_details.get(vpc_utils.VPC_CONFIG_KEY))
if subnets:
init_params['subnets'] = subnets
Expand Down Expand Up @@ -555,6 +563,9 @@ def start_new(cls, estimator, inputs):
if estimator.enable_network_isolation():
train_args['enable_network_isolation'] = True

if estimator.encrypt_inter_container_traffic:
train_args['encrypt_inter_container_traffic'] = True

if isinstance(estimator, sagemaker.algorithm.AlgorithmEstimator):
train_args['algorithm_arn'] = estimator.algorithm_arn
else:
Expand Down Expand Up @@ -585,7 +596,8 @@ def __init__(self, image_name, role, train_instance_count, train_instance_type,
train_volume_size=30, train_volume_kms_key=None, train_max_run=24 * 60 * 60,
input_mode='File', output_path=None, output_kms_key=None, base_job_name=None,
sagemaker_session=None, hyperparameters=None, tags=None, subnets=None, security_group_ids=None,
model_uri=None, model_channel_name='model', metric_definitions=None):
model_uri=None, model_channel_name='model', metric_definitions=None,
encrypt_inter_container_traffic=False):
"""Initialize an ``Estimator`` instance.

Args:
Expand Down Expand Up @@ -640,14 +652,17 @@ def __init__(self, image_name, role, train_instance_count, train_instance_type,
training jobs. Each dictionary contains two keys: 'Name' for the name of the metric, and 'Regex' for
the regular expression used to extract the metric from the logs. This should be defined only
for jobs that don't use an Amazon algorithm.
encrypt_inter_container_traffic (bool): Specifies whether traffic between training containers is encrypted
for the training job (default: ``False``).
"""
self.image_name = image_name
self.hyperparam_dict = hyperparameters.copy() if hyperparameters else {}
super(Estimator, self).__init__(role, train_instance_count, train_instance_type,
train_volume_size, train_volume_kms_key, train_max_run, input_mode,
output_path, output_kms_key, base_job_name, sagemaker_session,
tags, subnets, security_group_ids, model_uri=model_uri,
model_channel_name=model_channel_name, metric_definitions=metric_definitions)
model_channel_name=model_channel_name, metric_definitions=metric_definitions,
encrypt_inter_container_traffic=encrypt_inter_container_traffic)

def train_image(self):
"""
Expand Down Expand Up @@ -743,7 +758,7 @@ def __init__(self, entry_point, source_dir=None, hyperparameters=None, enable_cl
entry_point (str): Path (absolute or relative) to the local Python source file which should be executed
as the entry point to training. This should be compatible with either Python 2.7 or Python 3.5.
source_dir (str): Path (absolute or relative) to a directory with any other training
source code dependencies aside from tne entry point file (default: None). Structure within this
source code dependencies aside from the entry point file (default: None). Structure within this
directory are preserved when training on Amazon SageMaker.
dependencies (list[str]): A list of paths to directories (absolute or relative) with
any additional libraries that will be exported to the container (default: []).
Expand Down
17 changes: 15 additions & 2 deletions src/sagemaker/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ def default_bucket(self):

def train(self, input_mode, input_config, role, job_name, output_config, # noqa: C901
resource_config, vpc_config, hyperparameters, stop_condition, tags, metric_definitions,
enable_network_isolation=False, image=None, algorithm_arn=None):
enable_network_isolation=False, image=None, algorithm_arn=None,
encrypt_inter_container_traffic=False):
"""Create an Amazon SageMaker training job.

Args:
Expand Down Expand Up @@ -261,6 +262,8 @@ def train(self, input_mode, input_config, role, job_name, output_config, # noqa
network isolation or not.
image (str): Docker image containing training code.
algorithm_arn (str): Algorithm Arn from Marketplace.
encrypt_inter_container_traffic (bool): Specifies whether traffic between training containers is
encrypted for the training job (default: ``False``).

Returns:
str: ARN of the training job, if it is created.
Expand Down Expand Up @@ -308,6 +311,10 @@ def train(self, input_mode, input_config, role, job_name, output_config, # noqa
if enable_network_isolation:
train_request['EnableNetworkIsolation'] = enable_network_isolation

if encrypt_inter_container_traffic:
train_request['EnableInterContainerTrafficEncryption'] = \
encrypt_inter_container_traffic

LOGGER.info('Creating training-job with name: {}'.format(job_name))
LOGGER.debug('train request: {}'.format(json.dumps(train_request, indent=4)))
self.sagemaker_client.create_training_job(**train_request)
Expand Down Expand Up @@ -351,7 +358,7 @@ def tune(self, job_name, strategy, objective_type, objective_metric_name,
static_hyperparameters, input_mode, metric_definitions,
role, input_config, output_config, resource_config, stop_condition, tags,
warm_start_config, enable_network_isolation=False, image=None, algorithm_arn=None,
early_stopping_type='Off'):
early_stopping_type='Off', encrypt_inter_container_traffic=False):
"""Create an Amazon SageMaker hyperparameter tuning job

Args:
Expand Down Expand Up @@ -400,6 +407,9 @@ def tune(self, job_name, strategy, objective_type, objective_metric_name,
early_stopping_type (str): Specifies whether early stopping is enabled for the job.
Can be either 'Auto' or 'Off'. If set to 'Off', early stopping will not be attempted.
If set to 'Auto', early stopping of some training jobs may happen, but is not guaranteed to.
encrypt_inter_container_traffic (bool): Specifies whether traffic between training containers
is encrypted for the training jobs started for this hyperparameter tuning job. Set to ``False``
by default.
"""
tune_request = {
'HyperParameterTuningJobName': job_name,
Expand Down Expand Up @@ -450,6 +460,9 @@ def tune(self, job_name, strategy, objective_type, objective_metric_name,
if enable_network_isolation:
tune_request['TrainingJobDefinition']['EnableNetworkIsolation'] = True

if encrypt_inter_container_traffic:
tune_request['TrainingJobDefinition']['EnableInterContainerTrafficEncryption'] = True

LOGGER.info('Creating hyperparameter tuning job with name: {}'.format(job_name))
LOGGER.debug('tune request: {}'.format(json.dumps(tune_request, indent=4)))
self.sagemaker_client.create_hyper_parameter_tuning_job(**tune_request)
Expand Down
2 changes: 2 additions & 0 deletions src/sagemaker/tuner.py
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,8 @@ def start_new(cls, tuner, inputs):
tuner_args['image'] = tuner.estimator.train_image()

tuner_args['enable_network_isolation'] = tuner.estimator.enable_network_isolation()
tuner_args['encrypt_inter_container_traffic'] = \
tuner.estimator.encrypt_inter_container_traffic

tuner.estimator.sagemaker_session.tune(**tuner_args)

Expand Down
8 changes: 6 additions & 2 deletions tests/integ/test_tf.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from sagemaker.utils import sagemaker_timestamp
from tests.integ import DATA_DIR, TRAINING_DEFAULT_TIMEOUT_MINUTES, PYTHON_VERSION
from tests.integ.timeout import timeout_and_delete_endpoint_by_name, timeout
from tests.integ.vpc_test_utils import get_or_create_vpc_resources
from tests.integ.vpc_test_utils import get_or_create_vpc_resources, setup_security_group_for_encryption

DATA_PATH = os.path.join(DATA_DIR, 'iris', 'data')

Expand Down Expand Up @@ -149,6 +149,8 @@ def test_tf_vpc_multi(sagemaker_session, tf_full_version):
subnet_ids, security_group_id = get_or_create_vpc_resources(ec2_client,
sagemaker_session.boto_session.region_name)

setup_security_group_for_encryption(ec2_client, security_group_id)

estimator = TensorFlow(entry_point=script_path,
role='SageMakerRole',
framework_version=tf_full_version,
Expand All @@ -160,7 +162,8 @@ def test_tf_vpc_multi(sagemaker_session, tf_full_version):
sagemaker_session=sagemaker_session,
base_job_name='test-vpc-tf',
subnets=subnet_ids,
security_group_ids=[security_group_id])
security_group_ids=[security_group_id],
encrypt_inter_container_traffic=True)

with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
estimator.fit(train_input)
Expand All @@ -170,6 +173,7 @@ def test_tf_vpc_multi(sagemaker_session, tf_full_version):
TrainingJobName=estimator.latest_training_job.name)
assert set(subnet_ids) == set(job_desc['VpcConfig']['Subnets'])
assert [security_group_id] == job_desc['VpcConfig']['SecurityGroupIds']
assert job_desc['EnableInterContainerTrafficEncryption'] is True

endpoint_name = estimator.latest_training_job.name
with timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session):
Expand Down
14 changes: 14 additions & 0 deletions tests/integ/vpc_test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,17 @@ def get_or_create_vpc_resources(ec2_client, region, name=VPC_NAME):
else:
print('creating new vpc: {}'.format(name))
return _create_vpc_with_name(ec2_client, region, name)


def setup_security_group_for_encryption(ec2_client, security_group_id):
sg_desc = ec2_client.describe_security_groups(GroupIds=[security_group_id])
ingress_perms = sg_desc['SecurityGroups'][0]['IpPermissions']
if len(ingress_perms) == 1:
ec2_client.\
authorize_security_group_ingress(GroupId=security_group_id,
IpPermissions=[{'IpProtocol': '50',
'UserIdGroupPairs': [{'GroupId': security_group_id}]},
{'IpProtocol': 'udp',
'FromPort': 500,
'ToPort': 500,
'UserIdGroupPairs': [{'GroupId': security_group_id}]}])
19 changes: 19 additions & 0 deletions tests/unit/test_algorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -894,3 +894,22 @@ def test_algorithm_enable_network_isolation_with_product_id(sagemaker_session):

network_isolation = estimator.enable_network_isolation()
assert network_isolation is True


def test_algorithm_encrypt_inter_container_traffic(sagemaker_session):
response = copy.deepcopy(DESCRIBE_ALGORITHM_RESPONSE)
response['encrypt_inter_container_traffic'] = True
sagemaker_session.sagemaker_client.describe_algorithm = Mock(
return_value=response)

estimator = AlgorithmEstimator(
algorithm_arn='arn:aws:sagemaker:us-east-2:1234:algorithm/scikit-decision-trees',
role='SageMakerRole',
train_instance_type='ml.m4.xlarge',
train_instance_count=1,
sagemaker_session=sagemaker_session,
encrypt_inter_container_traffic=True
)

encrypt_inter_container_traffic = estimator.encrypt_inter_container_traffic
assert encrypt_inter_container_traffic is True
Loading