24
24
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
25
25
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
26
27
+ import queue
28
+ import threading
27
29
from typing import Dict , List , Union
28
30
29
31
import triton_python_backend_utils as pb_utils
@@ -170,11 +172,18 @@ def __init__(self, labels: List[str], max_model_len: int):
170
172
class VllmStatLogger (VllmStatLoggerBase ):
171
173
"""StatLogger is used as an adapter between vLLM stats collector and Triton metrics provider."""
172
174
173
- # local_interval not used here. It's for vLLM logs to stdout.
174
- def __init__ (self , labels : Dict , max_model_len : int ) -> None :
175
+ def __init__ (self , labels : Dict , max_model_len : int , log_logger ) -> None :
175
176
# Tracked stats over current local logging interval.
177
+ # local_interval not used here. It's for vLLM logs to stdout.
176
178
super ().__init__ (local_interval = 0 )
177
179
self .metrics = TritonMetrics (labels , max_model_len )
180
+ self .log_logger = log_logger
181
+
182
+ # Starting the metrics thread. It allows vLLM to keep making progress
183
+ # while reporting metrics to triton metrics service.
184
+ self ._logger_queue = queue .Queue ()
185
+ self ._logger_thread = threading .Thread (target = self .logger_loop )
186
+ self ._logger_thread .start ()
178
187
179
188
def info (self , type : str , obj : SupportsMetricsInfo ) -> None :
180
189
pass
@@ -190,7 +199,7 @@ def _log_counter(self, counter, data: Union[int, float]) -> None:
190
199
None
191
200
"""
192
201
if data != 0 :
193
- counter . increment ( data )
202
+ self . _logger_queue . put_nowait (( counter , "increment" , data ) )
194
203
195
204
def _log_histogram (self , histogram , data : Union [List [int ], List [float ]]) -> None :
196
205
"""Convenience function for logging list to histogram.
@@ -203,7 +212,7 @@ def _log_histogram(self, histogram, data: Union[List[int], List[float]]) -> None
203
212
None
204
213
"""
205
214
for datum in data :
206
- histogram . observe ( datum )
215
+ self . _logger_queue . put_nowait (( histogram , "observe" , datum ) )
207
216
208
217
def log (self , stats : VllmStats ) -> None :
209
218
"""Report stats to Triton metrics server.
@@ -246,3 +255,24 @@ def log(self, stats: VllmStats) -> None:
246
255
self ._log_counter (metric , data )
247
256
for metric , data in histogram_metrics :
248
257
self ._log_histogram (metric , data )
258
+
259
+ def logger_loop (self ):
260
+ while True :
261
+ item = self ._logger_queue .get ()
262
+ # To signal shutdown a None item will be added to the queue.
263
+ if item is None :
264
+ break
265
+ metric , command , data = item
266
+ if command == "increment" :
267
+ metric .increment (data )
268
+ elif command == "observe" :
269
+ metric .observe (data )
270
+ else :
271
+ self .log_logger .log_error (f"Undefined command name: { command } " )
272
+
273
+ def finalize (self ):
274
+ # Shutdown the logger thread.
275
+ self ._logger_queue .put (None )
276
+ if self ._logger_thread is not None :
277
+ self ._logger_thread .join ()
278
+ self ._logger_thread = None
0 commit comments