67
67
#include "task_net.h"
68
68
#include "task_os.h"
69
69
#include "xcom_cfg.h"
70
+ #ifndef _WIN32
70
71
#ifndef USE_SELECT
71
72
#include <poll.h>
72
73
#endif
74
+ #endif
73
75
74
76
#include "retry.h"
77
+ #include "xdr_utils.h"
75
78
76
79
extern char * pax_op_to_str (int x );
77
80
@@ -89,17 +92,33 @@ struct iotasks {
89
92
linkage tasks ; /* OHKFIX Should be one each for read and write */
90
93
};
91
94
#else
95
+ typedef struct {
96
+ u_int pollfd_array_len ;
97
+ pollfd * pollfd_array_val ;
98
+ } pollfd_array ;
99
+
100
+ typedef task_env * task_env_p ;
101
+
102
+ typedef struct {
103
+ u_int task_env_p_array_len ;
104
+ task_env_p * task_env_p_array_val ;
105
+ } task_env_p_array ;
106
+
107
+ define_xdr_funcs (pollfd )
108
+ define_xdr_funcs (task_env_p )
109
+
92
110
struct iotasks {
93
111
int nwait ;
94
- struct pollfd fd [ MAXFILES ] ;
95
- task_env * tasks [ MAXFILES ] ;
112
+ pollfd_array fd ;
113
+ task_env_p_array tasks ;
96
114
};
97
115
#endif
98
116
int task_errno = 0 ;
99
117
static task_env * extract_first_delayed ();
100
118
static task_env * task_ref (task_env * t );
101
119
static task_env * task_unref (task_env * t );
102
120
static void wake_all_io ();
121
+ static void task_sys_deinit ();
103
122
104
123
/* Return time as seconds */
105
124
static double _now = 0.0 ;
@@ -506,6 +525,7 @@ static void task_delete(task_env *t) {
506
525
#if 1
507
526
free (deactivate (t )); /* Deactivate and free task */
508
527
#else
528
+ deactivate (t );
509
529
link_into (& t -> l , & free_tasks );
510
530
#endif
511
531
active_tasks -- ;
@@ -536,7 +556,7 @@ task_env *task_deactivate(task_env *t) { return deactivate(t); }
536
556
/* Set terminate flag and activate task */
537
557
task_env * task_terminate (task_env * t ) {
538
558
if (t ) {
539
- MAY_DBG (FN ; PTREXP (t ); STREXP (t -> name ));
559
+ DBGOUT (FN ; PTREXP (t ); STREXP (t -> name ); NDBG ( t -> refcnt , d ));
540
560
t -> terminate = KILL ; /* Set terminate flag */
541
561
activate (t ); /* and get it running */
542
562
}
@@ -577,6 +597,11 @@ static void iotasks_init(iotasks *iot) {
577
597
link_init (& iot -> tasks , type_hash ("task_env" ));
578
598
}
579
599
600
+ static void iotasks_deinit (iotasks * iot )
601
+ {
602
+ DBGOUT (FN );
603
+ }
604
+
580
605
#if TASK_DBUG_ON
581
606
static void poll_debug () MY_ATTRIBUTE ((unused ));
582
607
static void poll_debug () {
@@ -706,14 +731,20 @@ void remove_and_wakeup(int fd) {
706
731
}
707
732
708
733
#else
709
- static int active_io () { return iot .nwait > 0 ; }
734
+ static void iotasks_init (iotasks * iot )
735
+ {
736
+ DBGOUT (FN );
737
+ iot -> nwait = 0 ;
738
+ init_pollfd_array (& iot -> fd );
739
+ init_task_env_p_array (& iot -> tasks );
740
+ }
710
741
711
- static void iotasks_init (iotasks * iot ) {
712
- int i ;
742
+ static void iotasks_deinit (iotasks * iot )
743
+ {
744
+ DBGOUT (FN );
713
745
iot -> nwait = 0 ;
714
- for (i = 0 ; i < MAXFILES ; i ++ ) {
715
- iot -> tasks [i ] = 0 ;
716
- }
746
+ free_pollfd_array (& iot -> fd );
747
+ free_task_env_p_array (& iot -> tasks );
717
748
}
718
749
719
750
#if TASK_DBUG_ON
@@ -729,26 +760,27 @@ static void poll_debug() {
729
760
}
730
761
#endif
731
762
732
- static void poll_wakeup (int i ) {
733
- activate (task_unref (iot .tasks [i ]));
734
- iot .tasks [i ] = NULL ;
763
+ static void poll_wakeup (int i )
764
+ {
765
+ activate (task_unref (get_task_env_p (& iot .tasks ,i )));
766
+ set_task_env_p (& iot .tasks , NULL ,i );
735
767
iot .nwait -- ; /* Shrink array of pollfds */
736
- iot .fd [ i ] = iot .fd [ iot .nwait ] ;
737
- iot .tasks [ i ] = iot .tasks [ iot .nwait ] ;
768
+ set_pollfd ( & iot .fd , get_pollfd ( & iot .fd , iot .nwait ), i ) ;
769
+ set_task_env_p ( & iot .tasks , get_task_env_p ( & iot .tasks , iot .nwait ), i ) ;
738
770
}
739
771
740
772
static int poll_wait (int ms ) {
741
773
result nfds = {0 , 0 };
774
+ int wake = 0 ;
742
775
743
776
/* Wait at most ms milliseconds */
744
777
MAY_DBG (FN ; NDBG (ms , d ));
745
- int wake = 0 ;
746
778
if (ms < 0 || ms > 1000 ) ms = 1000 ; /* Wait at most 1000 ms */
747
779
SET_OS_ERR (0 );
748
- while ((nfds .val = poll (iot .fd , iot .nwait , ms )) == -1 ) {
749
- nfds .err = to_errno (GET_OS_ERR );
750
- if (nfds .err != SOCK_EINTR ) {
751
- task_dump_err (nfds .err );
780
+ while ((nfds .val = poll (iot .fd . pollfd_array_val , iot .nwait , ms )) == -1 ) {
781
+ nfds .funerr = to_errno (GET_OS_ERR );
782
+ if (nfds .funerr != SOCK_EINTR ) {
783
+ task_dump_err (nfds .funerr );
752
784
MAY_DBG (FN ; STRLIT ("poll failed" ));
753
785
abort ();
754
786
}
@@ -760,11 +792,12 @@ static int poll_wait(int ms) {
760
792
int interrupt = 0 ;
761
793
while (i < iot .nwait ) {
762
794
interrupt =
763
- (iot .tasks [i ]-> time != 0.0 && iot .tasks [i ]-> time < task_now ());
795
+ (get_task_env_p (& iot .tasks ,i )-> time != 0.0 &&
796
+ get_task_env_p (& iot .tasks ,i )-> time < task_now ());
764
797
if (interrupt || /* timeout ? */
765
- iot .fd [ i ] .revents ) {
798
+ get_pollfd ( & iot .fd , i ) .revents ) {
766
799
/* if(iot.fd[i].revents & POLLERR) abort(); */
767
- iot .tasks [ i ] -> interrupt = interrupt ;
800
+ get_task_env_p ( & iot .tasks , i ) -> interrupt = interrupt ;
768
801
poll_wakeup (i );
769
802
wake = 1 ;
770
803
} else {
@@ -779,28 +812,37 @@ static void add_fd(task_env *t, int fd, int op) {
779
812
int events = 'r' == op ? POLLIN | POLLRDNORM : POLLOUT ;
780
813
MAY_DBG (FN ; PTREXP (t ); NDBG (fd , d ); NDBG (op , d ));
781
814
assert (fd >= 0 );
782
- assert (fd < MAXFILES );
783
815
t -> waitfd = fd ;
784
816
deactivate (t );
785
817
task_ref (t );
786
- iot .tasks [iot .nwait ] = t ;
787
- iot .fd [iot .nwait ].fd = fd ;
788
- iot .fd [iot .nwait ].events = events ;
789
- iot .fd [iot .nwait ].revents = 0 ;
818
+ set_task_env_p (& iot .tasks , t , iot .nwait );
819
+ {
820
+ pollfd x ;
821
+ x .fd = fd ;
822
+ x .events = events ;
823
+ x .revents = 0 ;
824
+ set_pollfd (& iot .fd , x , iot .nwait );
825
+ }
790
826
iot .nwait ++ ;
791
827
}
792
828
793
829
void unpoll (int i ) {
794
- assert (i < MAXFILES );
795
- iot .tasks [i ] = NULL ;
796
- iot .fd [i ].fd = -1 ;
830
+ task_unref (get_task_env_p (& iot .tasks , i ));
831
+ set_task_env_p (& iot .tasks , NULL ,i );
832
+ {
833
+ pollfd x ;
834
+ x .fd = -1 ;
835
+ x .events = 0 ;
836
+ x .revents = 0 ;
837
+ set_pollfd (& iot .fd , x , i );
838
+ }
797
839
}
798
840
799
841
static void wake_all_io () {
800
842
int i ;
801
843
for (i = 0 ; i < iot .nwait ; i ++ ) {
844
+ activate (get_task_env_p (& iot .tasks ,i ));
802
845
unpoll (i );
803
- activate (task_unref (iot .tasks [i ]));
804
846
}
805
847
iot .nwait = 0 ;
806
848
}
@@ -809,7 +851,7 @@ void remove_and_wakeup(int fd) {
809
851
int i = 0 ;
810
852
MAY_DBG (FN ; NDBG (fd , d ));
811
853
while (i < iot .nwait ) {
812
- if (iot .fd [ i ] .fd == fd ) {
854
+ if (get_pollfd ( & iot .fd , i ) .fd == fd ) {
813
855
poll_wakeup (i );
814
856
} else {
815
857
i ++ ;
@@ -819,36 +861,6 @@ void remove_and_wakeup(int fd) {
819
861
820
862
#endif
821
863
task_env * stack = NULL ;
822
- /* Locks needed to get atomic reads and writes by protecting file descriptors
823
- while the task has been suspended by wait_io */
824
- static task_env * io_wait_locks [MAXFILES ][2 ];
825
- /* purecov: begin deadcode */
826
- int is_locked (int fd ) { return io_wait_locks [fd ][0 ] || io_wait_locks [fd ][1 ]; }
827
-
828
- int lock_fd (int fd , task_env * t , int lock ) {
829
- if (fd < 0 ) return 0 ;
830
- lock = lock != 'r' ;
831
- if (io_wait_locks [fd ][lock ]) {
832
- DBGOUT (FN ; NDBG (fd , d ); PTREXP (t ); STRLIT (" failed" ));
833
- return 0 ;
834
- } else {
835
- io_wait_locks [fd ][lock ] = t ;
836
- return 1 ;
837
- }
838
- }
839
-
840
- int unlock_fd (int fd , task_env * t , int lock ) {
841
- if (fd < 0 ) return 0 ;
842
- lock = lock != 'r' ;
843
- if (io_wait_locks [fd ][lock ] != t ) {
844
- DBGOUT (FN ; NDBG (fd , d ); PTREXP (t ); STRLIT (" failed" ));
845
- return 0 ;
846
- } else {
847
- io_wait_locks [fd ][lock ] = NULL ;
848
- return 1 ;
849
- }
850
- }
851
- /* purecov: end */
852
864
853
865
task_env * wait_io (task_env * t , int fd , int op ) {
854
866
t -> time = 0.0 ;
@@ -1158,6 +1170,7 @@ void task_loop() {
1158
1170
idle_time += seconds () - time ;
1159
1171
}
1160
1172
}
1173
+ task_sys_deinit ();
1161
1174
}
1162
1175
1163
1176
static int init_sockaddr (char * server , struct sockaddr_in * sock_addr ,
@@ -1471,6 +1484,13 @@ void task_sys_init() {
1471
1484
/* task_new(statistics_task, null_arg, "statistics_task", 1); */
1472
1485
}
1473
1486
1487
+
1488
+ static void task_sys_deinit ()
1489
+ {
1490
+ DBGOUT (FN );
1491
+ iotasks_deinit (& iot );
1492
+ }
1493
+
1474
1494
/* purecov: begin deadcode */
1475
1495
int is_running (task_env * t ) { return t && t -> terminate == RUN ; }
1476
1496
/* purecov: end */
0 commit comments