-
Notifications
You must be signed in to change notification settings - Fork 1.2k
add hub and hubcontent support in retrieval function for jumpstart model cache #4438
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
evakravi
merged 6 commits into
aws:master-jumpstart-curated-hub
from
bencrabtree:feat/jsch-retrieval-cache-update
Feb 21, 2024
Merged
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
032cb80
add hub and hubcontent support in retrieval function for jumpstart mo…
bencrabtree ef042d9
update types and var names
bencrabtree 49ae11b
update linter
bencrabtree 6175087
linter
bencrabtree 4c9b2d0
linter
bencrabtree 63345ea
flake8 check
bencrabtree File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,7 +15,7 @@ | |
import datetime | ||
from difflib import get_close_matches | ||
import os | ||
from typing import List, Optional, Tuple, Union | ||
from typing import Any, Dict, List, Optional, Tuple, Union | ||
import json | ||
import boto3 | ||
import botocore | ||
|
@@ -29,6 +29,7 @@ | |
JUMPSTART_LOGGER, | ||
MODEL_ID_LIST_WEB_URL, | ||
) | ||
from sagemaker.jumpstart.curated_hub.curated_hub import CuratedHub | ||
from sagemaker.jumpstart.exceptions import get_wildcard_model_version_msg | ||
from sagemaker.jumpstart.parameters import ( | ||
JUMPSTART_DEFAULT_MAX_S3_CACHE_ITEMS, | ||
|
@@ -37,12 +38,13 @@ | |
JUMPSTART_DEFAULT_SEMANTIC_VERSION_CACHE_EXPIRATION_HORIZON, | ||
) | ||
from sagemaker.jumpstart.types import ( | ||
JumpStartCachedS3ContentKey, | ||
JumpStartCachedS3ContentValue, | ||
JumpStartCachedContentKey, | ||
JumpStartCachedContentValue, | ||
JumpStartModelHeader, | ||
JumpStartModelSpecs, | ||
JumpStartS3FileType, | ||
JumpStartVersionedModelId, | ||
HubDataType, | ||
) | ||
from sagemaker.jumpstart import utils | ||
from sagemaker.utilities.cache import LRUCache | ||
|
@@ -95,7 +97,7 @@ def __init__( | |
""" | ||
|
||
self._region = region | ||
self._s3_cache = LRUCache[JumpStartCachedS3ContentKey, JumpStartCachedS3ContentValue]( | ||
self._content_cache = LRUCache[JumpStartCachedContentKey, JumpStartCachedContentValue]( | ||
max_cache_items=max_s3_cache_items, | ||
expiration_horizon=s3_cache_expiration_horizon, | ||
retrieval_function=self._retrieval_function, | ||
|
@@ -172,8 +174,8 @@ def _get_manifest_key_from_model_id_semantic_version( | |
|
||
model_id, version = key.model_id, key.version | ||
|
||
manifest = self._s3_cache.get( | ||
JumpStartCachedS3ContentKey(JumpStartS3FileType.MANIFEST, self._manifest_file_s3_key) | ||
manifest = self._content_cache.get( | ||
JumpStartCachedContentKey(JumpStartS3FileType.MANIFEST, self._manifest_file_s3_key) | ||
)[0].formatted_content | ||
|
||
sm_version = utils.get_sagemaker_version() | ||
|
@@ -301,50 +303,71 @@ def _get_json_file_from_local_override( | |
|
||
def _retrieval_function(
    self,
    key: JumpStartCachedContentKey,
    value: Optional[JumpStartCachedContentValue],
) -> JumpStartCachedContentValue:
    """Return JumpStart content for the data type and identifier held in ``key``.

    If a manifest file is being fetched, we only download the object if the md5 hash in
    ``head_object`` does not match the current md5 hash for the stored value. This prevents
    unnecessarily downloading the full manifest when it hasn't changed.

    Args:
        key (JumpStartCachedContentKey): key for which to fetch JumpStart content.
        value (Optional[JumpStartCachedContentValue]): Current value of old cached
            content. This is used for the manifest file, so that it is only
            downloaded when its content changes.

    Raises:
        ValueError: If ``key.data_type`` is not one of the supported data types.
    """

    data_type, id_info = key.data_type, key.id_info

    if data_type == JumpStartS3FileType.MANIFEST:
        # Skip the download when the stored hash still matches the remote one.
        if value is not None and not self._is_local_metadata_mode():
            etag = self._get_json_md5_hash(id_info)
            if etag == value.md5_hash:
                return value
        formatted_body, etag = self._get_json_file(id_info, data_type)
        return JumpStartCachedContentValue(
            formatted_content=utils.get_formatted_manifest(formatted_body),
            md5_hash=etag,
        )
    if data_type == JumpStartS3FileType.SPECS:
        formatted_body, _ = self._get_json_file(id_info, data_type)
        model_specs = JumpStartModelSpecs(formatted_body)
        utils.emit_logs_based_on_model_specs(model_specs, self.get_region(), self._s3_client)
        return JumpStartCachedContentValue(formatted_content=model_specs)
    if data_type == HubDataType.MODEL:
        hub_name, region, model_name, model_version = utils.extract_info_from_hub_content_arn(
            id_info
        )
        # NOTE(review): PR feedback called it "weird" to instantiate an object here
        # (the remark is truncated; presumably it refers to CuratedHub) -- revisit.
        hub = CuratedHub(hub_name=hub_name, region=region)
        hub_content = hub.describe_model(model_name=model_name, model_version=model_version)
        # NOTE(review): ``describe_model`` is annotated as returning ``Dict[str, Any]``;
        # confirm that ``content_document`` is reachable via attribute access.
        # Build the specs first so the logging helper receives the same type as in the
        # SPECS branch above, rather than the raw content document.
        model_specs = JumpStartModelSpecs(hub_content.content_document, is_hub_content=True)
        utils.emit_logs_based_on_model_specs(model_specs, self.get_region(), self._s3_client)
        return JumpStartCachedContentValue(formatted_content=model_specs)
    if data_type == HubDataType.HUB:
        hub_name, region, _, _ = utils.extract_info_from_hub_content_arn(id_info)
        hub = CuratedHub(hub_name=hub_name, region=region)
        hub_info = hub.describe()
        return JumpStartCachedContentValue(formatted_content=hub_info)

    supported_data_types = [
        JumpStartS3FileType.MANIFEST,
        JumpStartS3FileType.SPECS,
        HubDataType.HUB,
        HubDataType.MODEL,
    ]
    # Single message string: a stray comma here would pass two separate args to ValueError.
    raise ValueError(f"Bad value for key '{key}': must be in {supported_data_types}")
|
||
def get_manifest(self) -> List[JumpStartModelHeader]:
    """Return all model headers listed in the JumpStart models manifest."""

    manifest_key = JumpStartCachedContentKey(
        JumpStartS3FileType.MANIFEST, self._manifest_file_s3_key
    )
    # The cache lookup returns a sequence whose first element is the cached value.
    cached_manifest = self._content_cache.get(manifest_key)[0]
    headers_by_id = cached_manifest.formatted_content
    return list(headers_by_id.values())  # type: ignore
|
@@ -407,8 +430,8 @@ def _get_header_impl( | |
JumpStartVersionedModelId(model_id, semantic_version_str) | ||
)[0] | ||
|
||
manifest = self._s3_cache.get( | ||
JumpStartCachedS3ContentKey(JumpStartS3FileType.MANIFEST, self._manifest_file_s3_key) | ||
manifest = self._content_cache.get( | ||
JumpStartCachedContentKey(JumpStartS3FileType.MANIFEST, self._manifest_file_s3_key) | ||
)[0].formatted_content | ||
try: | ||
header = manifest[versioned_model_id] # type: ignore | ||
|
@@ -430,8 +453,8 @@ def get_specs(self, model_id: str, semantic_version_str: str) -> JumpStartModelS | |
|
||
header = self.get_header(model_id, semantic_version_str) | ||
spec_key = header.spec_key | ||
specs, cache_hit = self._s3_cache.get( | ||
JumpStartCachedS3ContentKey(JumpStartS3FileType.SPECS, spec_key) | ||
specs, cache_hit = self._content_cache.get( | ||
JumpStartCachedContentKey(JumpStartS3FileType.SPECS, spec_key) | ||
) | ||
if not cache_hit and "*" in semantic_version_str: | ||
JUMPSTART_LOGGER.warning( | ||
|
@@ -443,7 +466,29 @@ def get_specs(self, model_id: str, semantic_version_str: str) -> JumpStartModelS | |
) | ||
return specs.formatted_content | ||
|
||
def get_hub_model(self, hub_model_arn: str) -> JumpStartModelSpecs:
    """Return JumpStart-compatible specs for a given Hub model.

    Args:
        hub_model_arn (str): Arn for the Hub model to get specs for.
    """

    cache_key = JumpStartCachedContentKey(HubDataType.MODEL, hub_model_arn)
    # The second element of the lookup result is not needed here.
    cached_value, _ = self._content_cache.get(cache_key)
    return cached_value.formatted_content
|
||
def get_hub(self, hub_arn: str) -> Dict[str, Any]:
    """Return descriptive info for a given Hub.

    Args:
        hub_arn (str): Arn for the Hub to get info for.
    """

    cache_key = JumpStartCachedContentKey(HubDataType.HUB, hub_arn)
    # The second element of the lookup result is not needed here.
    cached_value, _ = self._content_cache.get(cache_key)
    return cached_value.formatted_content
|
||
def clear(self) -> None:
    """Empty the content cache and the model ID/semantic version manifest key cache."""
    caches = (
        self._content_cache,
        self._model_id_semantic_version_manifest_key_cache,
    )
    for cache in caches:
        cache.clear()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"). You | ||
# may not use this file except in compliance with the License. A copy of | ||
# the License is located at | ||
# | ||
# http://aws.amazon.com/apache2.0/ | ||
# | ||
# or in the "license" file accompanying this file. This file is | ||
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF | ||
# ANY KIND, either express or implied. See the License for the specific | ||
# language governing permissions and limitations under the License. | ||
"""This module provides the JumpStart Curated Hub class.""" | ||
from __future__ import absolute_import | ||
|
||
from typing import Optional, Dict, Any | ||
|
||
from sagemaker.session import Session | ||
|
||
|
||
class CuratedHub:
    """Creates and manages a curated JumpStart hub."""

    def __init__(self, hub_name: str, region: str, session: Optional[Session] = None):
        """Store the hub identity and pick the session used for describe calls.

        Args:
            hub_name (str): Name of the hub.
            region (str): Region recorded on this object.
            session (Optional[Session]): Session to use. When it is falsy, a new
                default ``Session()`` is created for internal calls.
        """
        self.session = session
        self.region = region
        self.hub_name = hub_name
        # ``self.session`` keeps the caller's value verbatim (possibly None);
        # ``self._sm_session`` is what the describe methods actually call.
        self._sm_session = session if session else Session()

    def describe_model(self, model_name: str, model_version: str = "*") -> Dict[str, Any]:
        """Return descriptive information about a Model in this Hub.

        Args:
            model_name (str): Name of the hub model to describe.
            model_version (str): Version of the hub model. Defaults to ``"*"``.
        """

        # TODO: Parse HubContent
        # TODO: Parse HubContentDocument
        return self._sm_session.describe_hub_content(
            model_name, "Model", self.hub_name, model_version
        )

    def describe(self) -> Dict[str, Any]:
        """Return descriptive information about this Hub."""

        # TODO: Validations?
        return self._sm_session.describe_hub(hub_name=self.hub_name)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
Empty file.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.