@@ -23,7 +23,7 @@ use rt::io::net::ip::{SocketAddr, IpAddr};
23
23
use rt:: io:: { standard_error, OtherIoError } ;
24
24
use rt:: local:: Local ;
25
25
use rt:: rtio:: * ;
26
- use rt:: sched:: Scheduler ;
26
+ use rt:: sched:: { Scheduler , SchedHandle } ;
27
27
use rt:: tube:: Tube ;
28
28
use rt:: uv:: * ;
29
29
use rt:: uv:: idle:: IdleWatcher ;
@@ -239,6 +239,27 @@ impl UvIoFactory {
239
239
pub fn uv_loop < ' a > ( & ' a mut self ) -> & ' a mut Loop {
240
240
match self { & UvIoFactory ( ref mut ptr) => ptr }
241
241
}
242
+
243
+ pub fn homed_udp_bind ( & mut self , addr : SocketAddr ) -> Result < ~HomedUvUdpSocket , IoError > {
244
+ let mut watcher = UdpWatcher :: new ( self . uv_loop ( ) ) ;
245
+ match watcher. bind ( addr) {
246
+ Ok ( _) => {
247
+ let home = do Local :: borrow :: < Scheduler , SchedHandle > |sched| { sched. make_handle ( ) } ;
248
+ Ok ( ~HomedUvUdpSocket { watcher : watcher, home : home } )
249
+ }
250
+ Err ( uverr) => {
251
+ let scheduler = Local :: take :: < Scheduler > ( ) ;
252
+ do scheduler. deschedule_running_task_and_then |_, task| {
253
+ let task_cell = Cell :: new ( task) ;
254
+ do watcher. close {
255
+ let scheduler = Local :: take :: < Scheduler > ( ) ;
256
+ scheduler. resume_blocked_task_immediately ( task_cell. take ( ) ) ;
257
+ }
258
+ }
259
+ Err ( uv_error_to_io_error ( uverr) )
260
+ }
261
+ }
262
+ }
242
263
}
243
264
244
265
impl IoFactory for UvIoFactory {
@@ -582,6 +603,135 @@ impl RtioTcpStream for UvTcpStream {
582
603
}
583
604
}
584
605
606
+ pub struct HomedUvUdpSocket {
607
+ watcher : UdpWatcher ,
608
+ home : SchedHandle ,
609
+ }
610
+
611
+ impl HomedUvUdpSocket {
612
+ fn go_home ( & mut self ) {
613
+ use rt:: sched:: PinnedTask ;
614
+ let scheduler = Local :: take :: < Scheduler > ( ) ;
615
+ do scheduler. deschedule_running_task_and_then |_, task| {
616
+ do task. wake ( ) . map_move |task| { self . home . send ( PinnedTask ( task) ) ; } ;
617
+ }
618
+ }
619
+ }
620
+
621
+ impl Drop for HomedUvUdpSocket {
622
+ fn drop ( & self ) {
623
+ rtdebug ! ( "closing homed udp socket" ) ;
624
+ // first go home
625
+ // XXX need mutable finalizer
626
+ let this = unsafe { transmute :: < & HomedUvUdpSocket , & mut HomedUvUdpSocket > ( self ) } ;
627
+ this. go_home ( ) ;
628
+ // now we're home so block the task and start IO
629
+ let scheduler = Local :: take :: < Scheduler > ( ) ;
630
+ do scheduler. deschedule_running_task_and_then |_, task| {
631
+ let task_cell = Cell :: new ( task) ;
632
+ do this. watcher . close {
633
+ // now IO is finished so resume the blocked task
634
+ let scheduler = Local :: take :: < Scheduler > ( ) ;
635
+ scheduler. resume_blocked_task_immediately ( task_cell. take ( ) ) ;
636
+ }
637
+ }
638
+ }
639
+ }
640
+
641
+ impl RtioSocket for HomedUvUdpSocket {
642
+ fn socket_name ( & mut self ) -> Result < SocketAddr , IoError > {
643
+ self . go_home ( ) ;
644
+ socket_name ( Udp , self . watcher )
645
+ }
646
+ }
647
+
648
+ #[ test]
649
+ fn test_simple_homed_udp_io_bind_only ( ) {
650
+ do run_in_newsched_task {
651
+ unsafe {
652
+ let io = Local :: unsafe_borrow :: < IoFactoryObject > ( ) ;
653
+ let addr = next_test_ip4 ( ) ;
654
+ let maybe_socket = ( * io) . homed_udp_bind ( addr) ;
655
+ assert ! ( maybe_socket. is_ok( ) ) ;
656
+ }
657
+ }
658
+ }
659
+
660
+ #[ test]
661
+ fn test_simple_homed_udp_io_bind_then_move_then_home_and_close ( ) {
662
+ use rt:: sleeper_list:: SleeperList ;
663
+ use rt:: work_queue:: WorkQueue ;
664
+ use rt:: thread:: Thread ;
665
+ use rt:: task:: Task ;
666
+ use rt:: sched:: { Shutdown , TaskFromFriend } ;
667
+ do run_in_bare_thread {
668
+ let sleepers = SleeperList :: new ( ) ;
669
+ let work_queue1 = WorkQueue :: new ( ) ;
670
+ let work_queue2 = WorkQueue :: new ( ) ;
671
+ let queues = ~[ work_queue1. clone ( ) , work_queue2. clone ( ) ] ;
672
+
673
+ let mut sched1 = ~Scheduler :: new ( ~UvEventLoop :: new ( ) , work_queue1, queues. clone ( ) ,
674
+ sleepers. clone ( ) ) ;
675
+ let mut sched2 = ~Scheduler :: new ( ~UvEventLoop :: new ( ) , work_queue2, queues. clone ( ) ,
676
+ sleepers. clone ( ) ) ;
677
+
678
+ let handle1 = Cell :: new ( sched1. make_handle ( ) ) ;
679
+ let handle2 = Cell :: new ( sched2. make_handle ( ) ) ;
680
+ let tasksFriendHandle = Cell :: new ( sched2. make_handle ( ) ) ;
681
+
682
+ let on_exit: ~fn ( bool ) = |exit_status| {
683
+ handle1. take ( ) . send ( Shutdown ) ;
684
+ handle2. take ( ) . send ( Shutdown ) ;
685
+ rtassert ! ( exit_status) ;
686
+ } ;
687
+
688
+ let test_function: ~fn ( ) = || {
689
+ let io = unsafe { Local :: unsafe_borrow :: < IoFactoryObject > ( ) } ;
690
+ let addr = next_test_ip4 ( ) ;
691
+ let maybe_socket = unsafe { ( * io) . homed_udp_bind ( addr) } ;
692
+ // this socket is bound to this event loop
693
+ assert ! ( maybe_socket. is_ok( ) ) ;
694
+
695
+ // block self on sched1
696
+ let scheduler = Local :: take :: < Scheduler > ( ) ;
697
+ do scheduler. deschedule_running_task_and_then |_, task| {
698
+ // unblock task
699
+ do task. wake ( ) . map_move |task| {
700
+ // send self to sched2
701
+ tasksFriendHandle. take ( ) . send ( TaskFromFriend ( task) ) ;
702
+ } ;
703
+ // sched1 should now sleep since it has nothing else to do
704
+ }
705
+ // sched2 will wake up and get the task
706
+ // as we do nothing else, the function ends and the socket goes out of scope
707
+ // sched2 will start to run the destructor
708
+ // the destructor will first block the task, set it's home as sched1, then enqueue it
709
+ // sched2 will dequeue the task, see that it has a home, and send it to sched1
710
+ // sched1 will wake up, execute the close function on the correct loop, and then we're done
711
+ } ;
712
+
713
+ let mut main_task = ~Task :: new_root ( & mut sched1. stack_pool , None , test_function) ;
714
+ main_task. death . on_exit = Some ( on_exit) ;
715
+ let main_task = Cell :: new ( main_task) ;
716
+
717
+ let null_task = Cell :: new ( ~do Task :: new_root ( & mut sched2. stack_pool , None ) || { } ) ;
718
+
719
+ let sched1 = Cell :: new ( sched1) ;
720
+ let sched2 = Cell :: new ( sched2) ;
721
+
722
+ // XXX could there be a race on the threads that causes a crash?
723
+ let thread1 = do Thread :: start {
724
+ sched1. take ( ) . bootstrap ( main_task. take ( ) ) ;
725
+ } ;
726
+ let thread2 = do Thread :: start {
727
+ sched2. take ( ) . bootstrap ( null_task. take ( ) ) ;
728
+ } ;
729
+
730
+ thread1. join ( ) ;
731
+ thread2. join ( ) ;
732
+ }
733
+ }
734
+
585
735
pub struct UvUdpSocket ( UdpWatcher ) ;
586
736
587
737
impl Drop for UvUdpSocket {
0 commit comments