22
22
import uuid
23
23
import dill
24
24
import inspect
25
+ import shutil
25
26
from pathlib import Path
26
27
27
28
from typing import List , Dict , Optional , Tuple , Callable , Union
28
29
from cortex .binary import run_cli , get_cli_path
29
30
from cortex import util
30
31
32
+ # Change if PYTHONVERSION changes
33
+ EXPECTED_PYTHON_VERSION = "3.6.9"
34
+
31
35
32
36
class Client :
33
37
def __init__ (self , env : str ):
@@ -44,10 +48,10 @@ def deploy(
44
48
self ,
45
49
api_spec : dict ,
46
50
predictor = None ,
47
- pip_dependencies = [],
48
- conda_dependencies = [],
51
+ requirements = [],
52
+ conda_packages = [],
49
53
project_dir : Optional [str ] = None ,
50
- force : bool = False ,
54
+ force : bool = True ,
51
55
wait : bool = False ,
52
56
) -> list :
53
57
"""
@@ -61,8 +65,8 @@ def deploy(
61
65
predictor: A Cortex Predictor class implementation. Not required when deploying a traffic splitter.
62
66
→ Realtime API: https://docs.cortex.dev/v/master/deployments/realtime-api/predictors
63
67
→ Batch API: https://docs.cortex.dev/v/master/deployments/batch-api/predictors
64
- pip_dependencies : A list of PyPI dependencies that will be installed before the predictor class implementation is invoked.
65
- conda_dependencies : A list of Conda dependencies that will be installed before the predictor class implementation is invoked.
68
+ requirements : A list of PyPI dependencies that will be installed before the predictor class implementation is invoked.
69
+ conda_packages : A list of Conda dependencies that will be installed before the predictor class implementation is invoked.
66
70
project_dir: Path to a python project.
67
71
force: Override any in-progress api updates.
68
72
wait: Streams logs until the APIs are ready.
@@ -83,62 +87,67 @@ def deploy(
83
87
yaml .dump ([api_spec ], f ) # write a list
84
88
return self ._deploy (cortex_yaml_path , force , wait )
85
89
86
- project_dir = Path .home () / ".cortex" / "deployments" / str (uuid .uuid4 ())
87
- with util .open_tempdir (str (project_dir )):
88
- cortex_yaml_path = os .path .join (project_dir , "cortex.yaml" )
89
-
90
- if predictor is None :
91
- # for deploying a traffic splitter
92
- with open (cortex_yaml_path , "w" ) as f :
93
- yaml .dump ([api_spec ], f ) # write a list
94
- return self ._deploy (cortex_yaml_path , force = force , wait = wait )
95
-
96
- # Change if PYTHONVERSION changes
97
- expected_version = "3.6"
98
- actual_version = f"{ sys .version_info .major } .{ sys .version_info .minor } "
99
- if actual_version < expected_version :
100
- raise Exception ("cortex is only supported for python versions >= 3.6" ) # unexpected
101
- if actual_version > expected_version :
102
- is_python_set = any (
103
- conda_dep .startswith ("python=" ) or "::python=" in conda_dep
104
- for conda_dep in conda_dependencies
105
- )
106
-
107
- if not is_python_set :
108
- conda_dependencies = [
109
- f"conda-forge::python={ sys .version_info .major } .{ sys .version_info .minor } .{ sys .version_info .micro } "
110
- ] + conda_dependencies
111
-
112
- if len (pip_dependencies ) > 0 :
113
- with open (project_dir / "requirements.txt" , "w" ) as requirements_file :
114
- requirements_file .write ("\n " .join (pip_dependencies ))
115
-
116
- if len (conda_dependencies ) > 0 :
117
- with open (project_dir / "conda-packages.txt" , "w" ) as conda_file :
118
- conda_file .write ("\n " .join (conda_dependencies ))
119
-
120
- if not inspect .isclass (predictor ):
121
- raise ValueError ("predictor parameter must be a class definition" )
122
-
123
- with open (project_dir / "predictor.pickle" , "wb" ) as pickle_file :
124
- dill .dump (predictor , pickle_file )
125
- if api_spec .get ("predictor" ) is None :
126
- api_spec ["predictor" ] = {}
127
-
128
- if predictor .__name__ == "PythonPredictor" :
129
- predictor_type = "python"
130
- if predictor .__name__ == "TensorFlowPredictor" :
131
- predictor_type = "tensorflow"
132
- if predictor .__name__ == "ONNXPredictor" :
133
- predictor_type = "onnx"
134
-
135
- api_spec ["predictor" ]["path" ] = "predictor.pickle"
136
- api_spec ["predictor" ]["type" ] = predictor_type
90
+ if api_spec .get ("name" ) is None :
91
+ raise ValueError ("`api_spec` must have the `name` key set" )
92
+
93
+ project_dir = Path .home () / ".cortex" / "deployments" / api_spec ["name" ]
94
+
95
+ if project_dir .exists ():
96
+ shutil .rmtree (str (project_dir ))
97
+
98
+ project_dir .mkdir (parents = True )
99
+
100
+ cortex_yaml_path = os .path .join (project_dir , "cortex.yaml" )
137
101
102
+ if predictor is None :
103
+ # for deploying a traffic splitter
138
104
with open (cortex_yaml_path , "w" ) as f :
139
105
yaml .dump ([api_spec ], f ) # write a list
140
106
return self ._deploy (cortex_yaml_path , force = force , wait = wait )
141
107
108
+ actual_version = (
109
+ f"{ sys .version_info .major } .{ sys .version_info .minor } .{ sys .version_info .micro } "
110
+ )
111
+
112
+ if actual_version != EXPECTED_PYTHON_VERSION :
113
+ is_python_set = any (
114
+ conda_dep .startswith ("python=" ) or "::python=" in conda_dep
115
+ for conda_dep in conda_packages
116
+ )
117
+
118
+ if not is_python_set :
119
+ conda_packages = [f"python={ actual_version } " ] + conda_packages
120
+
121
+ if len (requirements ) > 0 :
122
+ with open (project_dir / "requirements.txt" , "w" ) as requirements_file :
123
+ requirements_file .write ("\n " .join (requirements ))
124
+
125
+ if len (conda_packages ) > 0 :
126
+ with open (project_dir / "conda-packages.txt" , "w" ) as conda_file :
127
+ conda_file .write ("\n " .join (conda_packages ))
128
+
129
+ if not inspect .isclass (predictor ):
130
+ raise ValueError ("predictor parameter must be a class definition" )
131
+
132
+ with open (project_dir / "predictor.pickle" , "wb" ) as pickle_file :
133
+ dill .dump (predictor , pickle_file )
134
+ if api_spec .get ("predictor" ) is None :
135
+ api_spec ["predictor" ] = {}
136
+
137
+ if predictor .__name__ == "PythonPredictor" :
138
+ predictor_type = "python"
139
+ if predictor .__name__ == "TensorFlowPredictor" :
140
+ predictor_type = "tensorflow"
141
+ if predictor .__name__ == "ONNXPredictor" :
142
+ predictor_type = "onnx"
143
+
144
+ api_spec ["predictor" ]["path" ] = "predictor.pickle"
145
+ api_spec ["predictor" ]["type" ] = predictor_type
146
+
147
+ with open (cortex_yaml_path , "w" ) as f :
148
+ yaml .dump ([api_spec ], f ) # write a list
149
+ return self ._deploy (cortex_yaml_path , force = force , wait = wait )
150
+
142
151
def _deploy (
143
152
self ,
144
153
config_file : str ,
@@ -164,6 +173,7 @@ def _deploy(
164
173
self .env ,
165
174
"-o" ,
166
175
"mixed" ,
176
+ "-y" ,
167
177
]
168
178
169
179
if force :
@@ -173,42 +183,44 @@ def _deploy(
173
183
174
184
deploy_results = json .loads (output .strip ())
175
185
186
+ deploy_result = deploy_results [0 ]
187
+
176
188
if not wait :
177
- return deploy_results
189
+ return deploy_result
178
190
179
191
def stream_to_stdout (process ):
180
192
for c in iter (lambda : process .stdout .read (1 ), "" ):
181
193
sys .stdout .write (c )
182
-
183
- for deploy_result in deploy_results :
184
- api_name = deploy_result ["api" ]["spec" ]["name" ]
185
- kind = deploy_result ["api" ]["spec" ]["kind" ]
186
- if kind != "RealtimeAPI" :
187
- continue
188
-
189
- env = os . environ . copy ()
190
- env [ "CORTEX_CLI_INVOKER" ] = "python"
191
- process = subprocess . Popen (
192
- [ get_cli_path (), "logs" , "--env" , self . env , api_name ] ,
193
- stderr = subprocess .STDOUT ,
194
- stdout = subprocess . PIPE ,
195
- encoding = " utf8" ,
196
- env = env ,
197
- )
198
-
199
- streamer = threading .Thread (target = stream_to_stdout , args = [process ])
200
- streamer .start ()
201
-
202
- while process .poll () is None :
203
- api = self .get_api (api_name )
204
- if api ["status" ]["status_code" ] != "status_updating" :
205
- if api [ "status" ][ "status_code" ] == "status_live" :
206
- time . sleep ( 2 )
207
- process . terminate ()
208
- break
209
- time . sleep ( 2 )
210
-
211
- return deploy_results
194
+ sys . stdout . flush ()
195
+
196
+ api_name = deploy_result ["api" ]["spec" ]["name" ]
197
+ if deploy_result ["api" ]["spec" ]["kind" ] != "RealtimeAPI" :
198
+ return deploy_result
199
+
200
+ env = os . environ . copy ()
201
+ env [ "CORTEX_CLI_INVOKER" ] = "python"
202
+ process = subprocess . Popen (
203
+ [ get_cli_path (), "logs" , "--env" , self . env , api_name ],
204
+ stderr = subprocess . STDOUT ,
205
+ stdout = subprocess .PIPE ,
206
+ encoding = "utf8" ,
207
+ errors = "replace" , # replace non- utf8 characters with `?` instead of failing
208
+ env = env ,
209
+ )
210
+
211
+ streamer = threading .Thread (target = stream_to_stdout , args = [process ])
212
+ streamer .start ()
213
+
214
+ while process .poll () is None :
215
+ api = self .get_api (api_name )
216
+ if api ["status" ]["status_code" ] != "status_updating" :
217
+ time . sleep ( 10 ) # wait for logs to stream
218
+ process . terminate ( )
219
+ break
220
+ time . sleep ( 5 )
221
+ streamer . join ( timeout = 10 )
222
+
223
+ return api
212
224
213
225
def get_api (self , api_name : str ) -> dict :
214
226
"""
0 commit comments