Skip to content

Improve python client cross python version compatibility #1640

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Dec 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions dev/deploy_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Copyright 2020 Cortex Labs, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# USAGE: python ./dev/deploy_test.py <env_name>
# e.g.: python ./dev/deploy_test.py aws

import os
import cortex
import sys
import requests

# connect to the cortex environment named on the command line (e.g. "aws")
cx = cortex.client(sys.argv[1])
# minimal realtime API spec; the predictor implementation is supplied
# separately to cx.deploy() below
api_config = {
    "name": "text-generator",
    "kind": "RealtimeAPI",
}


class PythonPredictor:
    """Cortex predictor that wraps a transformers text-generation pipeline."""

    def __init__(self, config):
        # imported lazily so transformers is only required inside the API container
        from transformers import pipeline

        self.model = pipeline(task="text-generation")

    def predict(self, payload):
        # generate continuations for the input text and return the top candidate
        candidates = self.model(payload["text"])
        return candidates[0]


# deploy the predictor and block until the API is ready to serve traffic
api = cx.deploy(
    api_config,
    predictor=PythonPredictor,
    requirements=["torch", "transformers"],
    wait=True,
)

# exercise the live endpoint with a sample prompt
sample_payload = {"text": "machine learning is great because"}
response = requests.post(api["endpoint"], json=sample_payload)

print(response.status_code)
print(response.text)

# tear the API back down so the test leaves nothing running
cx.delete_api(api_config["name"])
47 changes: 47 additions & 0 deletions dev/python_version_test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#!/bin/bash

# Copyright 2020 Cortex Labs, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


# USAGE: ./dev/python_version_test.sh <python version> <env_name>
# e.g.: ./dev/python_version_test.sh 3.6.9 aws

set -e

ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")"/.. >/dev/null && pwd)"

# create a new conda environment based on the supplied python version
conda create -n env -y
CONDA_BASE="$(conda info --base)"
# shell-level conda integration is needed before `conda activate` works
source "$CONDA_BASE/etc/profile.d/conda.sh"
conda activate env
# look in conda-forge as well so the requested python version can be found
conda config --append channels conda-forge
# quote the expansion so an empty/odd argument fails loudly instead of silently
conda install "python=$1" -y

pip install requests

export CORTEX_CLI_PATH="$ROOT/bin/cortex"

# install the cortex python client from source into the new environment
cd "$ROOT/pkg/workloads/cortex/client"
pip install -e .

# run deploy_test.py against the requested cortex environment
python "$ROOT/dev/deploy_test.py" "$2"

# clean up conda and build artifacts
conda deactivate
conda env remove -n env
rm -rf "$ROOT/pkg/workloads/cortex/client/cortex.egg-info"
6 changes: 3 additions & 3 deletions docs/miscellaneous/python-client.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ Delete an environment configured on this machine.
<!-- CORTEX_VERSION_MINOR x5 -->

```python
| deploy(api_spec: dict, predictor=None, pip_dependencies=[], conda_dependencies=[], project_dir: Optional[str] = None, force: bool = False, wait: bool = False) -> list
| deploy(api_spec: dict, predictor=None, requirements=[], conda_packages=[], project_dir: Optional[str] = None, force: bool = True, wait: bool = False) -> list
```

Deploy an API.
Expand All @@ -121,8 +121,8 @@ Deploy an API.
- `predictor` - A Cortex Predictor class implementation. Not required when deploying a traffic splitter.
→ Realtime API: https://docs.cortex.dev/v/master/deployments/realtime-api/predictors
→ Batch API: https://docs.cortex.dev/v/master/deployments/batch-api/predictors
- `pip_dependencies` - A list of PyPI dependencies that will be installed before the predictor class implementation is invoked.
- `conda_dependencies` - A list of Conda dependencies that will be installed before the predictor class implementation is invoked.
- `requirements` - A list of PyPI dependencies that will be installed before the predictor class implementation is invoked.
- `conda_packages` - A list of Conda dependencies that will be installed before the predictor class implementation is invoked.
- `project_dir` - Path to a python project.
- `force` - Override any in-progress api updates.
- `wait` - Streams logs until the APIs are ready.
Expand Down
186 changes: 99 additions & 87 deletions pkg/workloads/cortex/client/cortex/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@
import uuid
import dill
import inspect
import shutil
from pathlib import Path

from typing import List, Dict, Optional, Tuple, Callable, Union
from cortex.binary import run_cli, get_cli_path
from cortex import util

# Change if PYTHONVERSION changes
EXPECTED_PYTHON_VERSION = "3.6.9"


class Client:
def __init__(self, env: str):
Expand All @@ -44,10 +48,10 @@ def deploy(
self,
api_spec: dict,
predictor=None,
pip_dependencies=[],
conda_dependencies=[],
requirements=[],
conda_packages=[],
project_dir: Optional[str] = None,
force: bool = False,
force: bool = True,
wait: bool = False,
) -> list:
"""
Expand All @@ -61,8 +65,8 @@ def deploy(
predictor: A Cortex Predictor class implementation. Not required when deploying a traffic splitter.
→ Realtime API: https://docs.cortex.dev/v/master/deployments/realtime-api/predictors
→ Batch API: https://docs.cortex.dev/v/master/deployments/batch-api/predictors
pip_dependencies: A list of PyPI dependencies that will be installed before the predictor class implementation is invoked.
conda_dependencies: A list of Conda dependencies that will be installed before the predictor class implementation is invoked.
requirements: A list of PyPI dependencies that will be installed before the predictor class implementation is invoked.
conda_packages: A list of Conda dependencies that will be installed before the predictor class implementation is invoked.
project_dir: Path to a python project.
force: Override any in-progress api updates.
wait: Streams logs until the APIs are ready.
Expand All @@ -83,62 +87,67 @@ def deploy(
yaml.dump([api_spec], f) # write a list
return self._deploy(cortex_yaml_path, force, wait)

project_dir = Path.home() / ".cortex" / "deployments" / str(uuid.uuid4())
with util.open_tempdir(str(project_dir)):
cortex_yaml_path = os.path.join(project_dir, "cortex.yaml")

if predictor is None:
# for deploying a traffic splitter
with open(cortex_yaml_path, "w") as f:
yaml.dump([api_spec], f) # write a list
return self._deploy(cortex_yaml_path, force=force, wait=wait)

# Change if PYTHONVERSION changes
expected_version = "3.6"
actual_version = f"{sys.version_info.major}.{sys.version_info.minor}"
if actual_version < expected_version:
raise Exception("cortex is only supported for python versions >= 3.6") # unexpected
if actual_version > expected_version:
is_python_set = any(
conda_dep.startswith("python=") or "::python=" in conda_dep
for conda_dep in conda_dependencies
)

if not is_python_set:
conda_dependencies = [
f"conda-forge::python={sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
] + conda_dependencies

if len(pip_dependencies) > 0:
with open(project_dir / "requirements.txt", "w") as requirements_file:
requirements_file.write("\n".join(pip_dependencies))

if len(conda_dependencies) > 0:
with open(project_dir / "conda-packages.txt", "w") as conda_file:
conda_file.write("\n".join(conda_dependencies))

if not inspect.isclass(predictor):
raise ValueError("predictor parameter must be a class definition")

with open(project_dir / "predictor.pickle", "wb") as pickle_file:
dill.dump(predictor, pickle_file)
if api_spec.get("predictor") is None:
api_spec["predictor"] = {}

if predictor.__name__ == "PythonPredictor":
predictor_type = "python"
if predictor.__name__ == "TensorFlowPredictor":
predictor_type = "tensorflow"
if predictor.__name__ == "ONNXPredictor":
predictor_type = "onnx"

api_spec["predictor"]["path"] = "predictor.pickle"
api_spec["predictor"]["type"] = predictor_type
if api_spec.get("name") is None:
raise ValueError("`api_spec` must have the `name` key set")

project_dir = Path.home() / ".cortex" / "deployments" / api_spec["name"]

if project_dir.exists():
shutil.rmtree(str(project_dir))

project_dir.mkdir(parents=True)

cortex_yaml_path = os.path.join(project_dir, "cortex.yaml")

if predictor is None:
# for deploying a traffic splitter
with open(cortex_yaml_path, "w") as f:
yaml.dump([api_spec], f) # write a list
return self._deploy(cortex_yaml_path, force=force, wait=wait)

actual_version = (
f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
)

if actual_version != EXPECTED_PYTHON_VERSION:
is_python_set = any(
conda_dep.startswith("python=") or "::python=" in conda_dep
for conda_dep in conda_packages
)

if not is_python_set:
conda_packages = [f"python={actual_version}"] + conda_packages

if len(requirements) > 0:
with open(project_dir / "requirements.txt", "w") as requirements_file:
requirements_file.write("\n".join(requirements))

if len(conda_packages) > 0:
with open(project_dir / "conda-packages.txt", "w") as conda_file:
conda_file.write("\n".join(conda_packages))

if not inspect.isclass(predictor):
raise ValueError("predictor parameter must be a class definition")

with open(project_dir / "predictor.pickle", "wb") as pickle_file:
dill.dump(predictor, pickle_file)
if api_spec.get("predictor") is None:
api_spec["predictor"] = {}

if predictor.__name__ == "PythonPredictor":
predictor_type = "python"
if predictor.__name__ == "TensorFlowPredictor":
predictor_type = "tensorflow"
if predictor.__name__ == "ONNXPredictor":
predictor_type = "onnx"

api_spec["predictor"]["path"] = "predictor.pickle"
api_spec["predictor"]["type"] = predictor_type

with open(cortex_yaml_path, "w") as f:
yaml.dump([api_spec], f) # write a list
return self._deploy(cortex_yaml_path, force=force, wait=wait)

def _deploy(
self,
config_file: str,
Expand All @@ -164,6 +173,7 @@ def _deploy(
self.env,
"-o",
"mixed",
"-y",
]

if force:
Expand All @@ -173,42 +183,44 @@ def _deploy(

deploy_results = json.loads(output.strip())

deploy_result = deploy_results[0]

if not wait:
return deploy_results
return deploy_result

def stream_to_stdout(process):
for c in iter(lambda: process.stdout.read(1), ""):
sys.stdout.write(c)

for deploy_result in deploy_results:
api_name = deploy_result["api"]["spec"]["name"]
kind = deploy_result["api"]["spec"]["kind"]
if kind != "RealtimeAPI":
continue

env = os.environ.copy()
env["CORTEX_CLI_INVOKER"] = "python"
process = subprocess.Popen(
[get_cli_path(), "logs", "--env", self.env, api_name],
stderr=subprocess.STDOUT,
stdout=subprocess.PIPE,
encoding="utf8",
env=env,
)

streamer = threading.Thread(target=stream_to_stdout, args=[process])
streamer.start()

while process.poll() is None:
api = self.get_api(api_name)
if api["status"]["status_code"] != "status_updating":
if api["status"]["status_code"] == "status_live":
time.sleep(2)
process.terminate()
break
time.sleep(2)

return deploy_results
sys.stdout.flush()

api_name = deploy_result["api"]["spec"]["name"]
if deploy_result["api"]["spec"]["kind"] != "RealtimeAPI":
return deploy_result

env = os.environ.copy()
env["CORTEX_CLI_INVOKER"] = "python"
process = subprocess.Popen(
[get_cli_path(), "logs", "--env", self.env, api_name],
stderr=subprocess.STDOUT,
stdout=subprocess.PIPE,
encoding="utf8",
errors="replace", # replace non-utf8 characters with `?` instead of failing
env=env,
)

streamer = threading.Thread(target=stream_to_stdout, args=[process])
streamer.start()

while process.poll() is None:
api = self.get_api(api_name)
if api["status"]["status_code"] != "status_updating":
time.sleep(10) # wait for logs to stream
process.terminate()
break
time.sleep(5)
streamer.join(timeout=10)

return api

def get_api(self, api_name: str) -> dict:
"""
Expand Down
5 changes: 5 additions & 0 deletions pkg/workloads/cortex/serve/init/bootloader.sh
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,19 @@ if [ -f "/mnt/project/conda-packages.txt" ]; then
py_version_cmd='echo $(python -c "import sys; v=sys.version_info[:2]; print(\"{}.{}\".format(*v));")'
old_py_version=$(eval $py_version_cmd)

# look for packages in defaults and then conda-forge to improve chances of finding the package (specifically for python reinstalls)
conda config --append channels conda-forge

conda install -y --file /mnt/project/conda-packages.txt

new_py_version=$(eval $py_version_cmd)

# reinstall core packages if Python version has changed
if [ $old_py_version != $new_py_version ]; then
echo "warning: you have changed the Python version from $old_py_version to $new_py_version; this may break Cortex's web server"
echo "reinstalling core packages ..."
pip --no-cache-dir install -r /src/cortex/serve/requirements.txt

rm -rf $CONDA_PREFIX/lib/python${old_py_version} # previous python is no longer needed
fi
fi
Expand Down