@@ -2,9 +2,18 @@ use std::fmt;
2
2
use std:: future:: Future ;
3
3
use std:: marker:: PhantomData ;
4
4
use std:: pin:: Pin ;
5
+ use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
6
+ use std:: sync:: Arc ;
5
7
use std:: task:: Context ;
8
+ use std:: time:: Duration ;
6
9
7
10
use tokio:: sync:: { mpsc, oneshot} ;
11
+ use tokio:: time:: MissedTickBehavior ;
12
+
13
+ use crate :: statsd:: SystemGauges ;
14
+
15
/// How often a service channel's receiver records its backlog metric.
const BACKLOG_INTERVAL: Duration = Duration::from_millis(1_000);
8
17
9
18
/// A message interface for [services](Service).
10
19
///
@@ -305,12 +314,14 @@ pub trait FromMessage<M>: Interface {
305
314
/// long as the service is running. It can be freely cloned.
306
315
pub struct Addr < I : Interface > {
307
316
tx : mpsc:: UnboundedSender < I > ,
317
+ queue_size : Arc < AtomicU64 > ,
308
318
}
309
319
310
320
impl < I : Interface > fmt:: Debug for Addr < I > {
311
321
fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
312
322
f. debug_struct ( "Addr" )
313
323
. field ( "open" , & !self . tx . is_closed ( ) )
324
+ . field ( "queue_size" , & self . queue_size . load ( Ordering :: Relaxed ) )
314
325
. finish ( )
315
326
}
316
327
}
@@ -321,6 +332,7 @@ impl<I: Interface> Clone for Addr<I> {
321
332
fn clone ( & self ) -> Self {
322
333
Self {
323
334
tx : self . tx . clone ( ) ,
335
+ queue_size : self . queue_size . clone ( ) ,
324
336
}
325
337
}
326
338
}
@@ -341,6 +353,7 @@ impl<I: Interface> Addr<I> {
341
353
I : FromMessage < M > ,
342
354
{
343
355
let ( tx, rx) = I :: Response :: channel ( ) ;
356
+ self . queue_size . fetch_add ( 1 , Ordering :: SeqCst ) ;
344
357
self . tx . send ( I :: from_message ( message, tx) ) . ok ( ) ; // it's ok to drop, the response will fail
345
358
rx
346
359
}
@@ -352,15 +365,78 @@ impl<I: Interface> Addr<I> {
352
365
///
353
366
/// Instances are created automatically when [spawning](Service::spawn_handler) a service, or can be
354
367
/// created through [`channel`]. The channel closes when all associated [`Addr`]s are dropped.
355
- pub type Receiver < I > = mpsc:: UnboundedReceiver < I > ;
368
+ pub struct Receiver < I : Interface > {
369
+ rx : mpsc:: UnboundedReceiver < I > ,
370
+ name : & ' static str ,
371
+ interval : tokio:: time:: Interval ,
372
+ queue_size : Arc < AtomicU64 > ,
373
+ }
374
+
375
+ impl < I : Interface > Receiver < I > {
376
+ /// Receives the next value for this receiver.
377
+ ///
378
+ /// This method returns `None` if the channel has been closed and there are
379
+ /// no remaining messages in the channel's buffer. This indicates that no
380
+ /// further values can ever be received from this `Receiver`. The channel is
381
+ /// closed when all senders have been dropped.
382
+ ///
383
+ /// If there are no messages in the channel's buffer, but the channel has
384
+ /// not yet been closed, this method will sleep until a message is sent or
385
+ /// the channel is closed.
386
+ pub async fn recv ( & mut self ) -> Option < I > {
387
+ loop {
388
+ tokio:: select! {
389
+ biased;
390
+
391
+ _ = self . interval. tick( ) => {
392
+ let backlog = self . queue_size. load( Ordering :: Relaxed ) ;
393
+ relay_statsd:: metric!(
394
+ gauge( SystemGauges :: ServiceBackPressure ) = backlog,
395
+ service = self . name
396
+ ) ;
397
+ } ,
398
+ message = self . rx. recv( ) => {
399
+ self . queue_size. fetch_sub( 1 , Ordering :: SeqCst ) ;
400
+ return message;
401
+ } ,
402
+ }
403
+ }
404
+ }
405
+ }
406
+
407
+ impl < I : Interface > fmt:: Debug for Receiver < I > {
408
+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
409
+ f. debug_struct ( "Receiver" )
410
+ . field ( "name" , & self . name )
411
+ . field ( "queue_size" , & self . queue_size . load ( Ordering :: Relaxed ) )
412
+ . finish ( )
413
+ }
414
+ }
356
415
357
416
/// Creates an unbounded channel for communicating with a [`Service`].
358
417
///
359
418
/// The `Addr` as the sending part provides public access to the service, while the `Receiver`
360
419
/// should remain internal to the service.
361
- pub fn channel < I : Interface > ( ) -> ( Addr < I > , Receiver < I > ) {
420
+ pub fn channel < I : Interface > ( name : & ' static str ) -> ( Addr < I > , Receiver < I > ) {
421
+ let queue_size = Arc :: new ( AtomicU64 :: new ( 0 ) ) ;
362
422
let ( tx, rx) = mpsc:: unbounded_channel ( ) ;
363
- ( Addr { tx } , rx)
423
+
424
+ let addr = Addr {
425
+ tx,
426
+ queue_size : queue_size. clone ( ) ,
427
+ } ;
428
+
429
+ let mut interval = tokio:: time:: interval ( BACKLOG_INTERVAL ) ;
430
+ interval. set_missed_tick_behavior ( MissedTickBehavior :: Skip ) ;
431
+
432
+ let receiver = Receiver {
433
+ rx,
434
+ name,
435
+ interval,
436
+ queue_size,
437
+ } ;
438
+
439
+ ( addr, receiver)
364
440
}
365
441
366
442
/// An asynchronous unit responding to messages.
@@ -426,8 +502,105 @@ pub trait Service: Sized {
426
502
427
503
/// Starts the service in the current runtime and returns an address for it.
428
504
fn start ( self ) -> Addr < Self :: Interface > {
429
- let ( addr, rx) = channel ( ) ;
505
+ let ( addr, rx) = channel ( Self :: name ( ) ) ;
430
506
self . spawn_handler ( rx) ;
431
507
addr
432
508
}
509
+
510
+ /// Returns a unique name for this service implementation.
511
+ ///
512
+ /// This is used for internal diagnostics and uses the fully qualified type name of the service
513
+ /// implementor by default.
514
+ fn name ( ) -> & ' static str {
515
+ std:: any:: type_name :: < Self > ( )
516
+ }
517
+ }
518
+
519
#[cfg(test)]
mod tests {
    use super::*;

    /// Message without a response, used to fill the queue of the mock service.
    struct MockMessage;

    impl Interface for MockMessage {}

    impl FromMessage<Self> for MockMessage {
        type Response = NoResponse;

        fn from_message(message: Self, _: ()) -> Self {
            message
        }
    }

    /// Service that spends two backlog intervals on every message it receives.
    struct MockService;

    impl Service for MockService {
        type Interface = MockMessage;

        fn spawn_handler(self, mut rx: Receiver<Self::Interface>) {
            tokio::spawn(async move {
                loop {
                    if rx.recv().await.is_none() {
                        break;
                    }

                    // Simulate slow handling so that messages pile up in the queue.
                    tokio::time::sleep(BACKLOG_INTERVAL * 2).await;
                }
            });
        }

        fn name() -> &'static str {
            "mock"
        }
    }

    #[test]
    fn test_backpressure_metrics() {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_time()
            .build()
            .unwrap();

        // Enter the runtime and pause its clock so that time only moves when the test sleeps.
        let _guard = runtime.enter();
        tokio::time::pause();

        // The mock service needs 2 * BACKLOG_INTERVAL to handle each message.
        let addr = MockService.start();

        // A tiny step forward is enough to trigger the first metric emission.
        let initial = relay_statsd::with_capturing_test_client(|| {
            runtime.block_on(async {
                tokio::time::sleep(Duration::from_millis(10)).await;
            })
        });

        assert_eq!(initial, ["service.back_pressure:0|g|#service:mock"]);

        // Send three messages and advance to 0.5 * INTERVAL. No metrics are expected yet.
        let while_sending = relay_statsd::with_capturing_test_client(|| {
            runtime.block_on(async {
                addr.send(MockMessage); // will be pulled immediately
                addr.send(MockMessage);
                addr.send(MockMessage);

                tokio::time::sleep(BACKLOG_INTERVAL / 2).await;
            })
        });

        assert!(while_sending.is_empty());

        // Advance to 6.5 * INTERVAL. The service pulls the first message immediately and one
        // more every 2 INTERVALS. Handling finishes after 6 INTERVALS, but that is not
        // observable since the last message leaves the queue at 4.
        let while_draining = relay_statsd::with_capturing_test_client(|| {
            runtime.block_on(async {
                tokio::time::sleep(BACKLOG_INTERVAL * 6).await;
            })
        });

        assert_eq!(
            while_draining,
            [
                "service.back_pressure:2|g|#service:mock", // 2 * INTERVAL
                "service.back_pressure:1|g|#service:mock", // 4 * INTERVAL
                "service.back_pressure:0|g|#service:mock", // 6 * INTERVAL
            ]
        );
    }
}
0 commit comments