16
16
"""
17
17
18
18
from mbed_host_tests import BaseHostTest
19
+ import time
19
20
20
21
21
- class TimingDriftTest (BaseHostTest ):
22
- """ This test is reading single characters from stdio
23
- and measures time between their occurrences.
22
+ class TimingDriftSync (BaseHostTest ):
23
+ """
24
+ This works as master-slave fashion
25
+ 1) Device says its booted up and ready to run the test, wait for host to respond
26
+ 2) Host sends the message to get the device current time i.e base time
27
+
28
+ #
29
+ # *
30
+ # * |
31
+ #<---* DUT<- base_time | - round_trip_base_time ------
32
+ # * | |
33
+ # * - |
34
+ # - |
35
+ # | |
36
+ # | |
37
+ # | - measurement_stretch | - nominal_time
38
+ # | |
39
+ # | |
40
+ # - |
41
+ # * - |
42
+ # * | |
43
+ #<---* DUT <-final_time | - round_trip_final_time------
44
+ # * |
45
+ # * -
46
+ #
47
+ #
48
+ # As we increase the measurement_stretch, the error because of transport delay diminishes.
49
+ # The values of measurement_stretch is propotional to round_trip_base_time(transport delays)
50
+ # by factor time_measurement_multiplier.This multiplier is used is 80 to tolerate 2 sec of
51
+ # transport delay and test time ~ 180 secs
52
+ #
53
+ # Failure in timing can occur if we are ticking too fast or we are ticking too slow, hence we have
54
+ # min_range and max_range. if we cross on either side tests would be marked fail. The range is a function of
55
+ # tolerance/acceptable drift currently its 5%.
56
+ #
57
+
24
58
"""
25
59
__result = None
26
-
27
- # This is calculated later: average_drift_max * number of tick events
28
- total_drift_max = None
29
-
30
- average_drift_max = 0.05
31
- ticks = []
32
- start_time = None
33
- finish_time = None
34
- dut_seconds_passed = None
35
- total_time = None
36
- total_drift = None
37
- average_drift = None
38
-
39
- def _callback_result (self , key , value , timestamp ):
40
- # We should not see result data in this test
41
- self .__result = False
42
-
43
- def _callback_end (self , key , value , timestamp ):
44
- """ {{end;%s}}} """
45
- self .log ("Received end event, timestamp: %f" % timestamp )
46
- self .notify_complete (result = self .result (print_stats = True ))
47
-
48
-
49
- def _callback_tick (self , key , value , timestamp ):
50
- """ {{tick;%d}}} """
51
- self .log ("tick! %f" % timestamp )
52
- self .ticks .append ((key , value , timestamp ))
60
+ mega = 1000000.0
61
+ max_measurement_time = 180
53
62
63
+ # this value is obtained for measurements when there is 0 transport delay and we want accurancy of 5%
64
+ time_measurement_multiplier = 80
54
65
55
- def setup (self ):
56
- self .register_callback ( "end" , self . _callback_end )
57
- self .register_callback ( 'tick' , self . _callback_tick )
66
+ def _callback_timing_drift_check_start (self , key , value , timestamp ):
67
+ self .round_trip_base_start = timestamp
68
+ self .send_kv ( "base_time" , 0 )
58
69
70
+ def _callback_base_time (self , key , value , timestamp ):
71
+ self .round_trip_base_end = timestamp
72
+ self .device_time_base = float (value )
73
+ self .round_trip_base_time = self .round_trip_base_end - self .round_trip_base_start
59
74
60
- def result (self , print_stats = True ):
61
- self .dut_seconds_passed = len (self .ticks ) - 1
62
-
63
- if self .dut_seconds_passed < 1 :
64
- if print_stats :
65
- self .log ("FAIL: failed to receive at least two tick events" )
66
- self .__result = False
67
- return self .__result
68
-
69
- self .total_drift_max = self .dut_seconds_passed * self .average_drift_max
70
-
71
- self .start_time = self .ticks [0 ][2 ]
72
- self .finish_time = self .ticks [- 1 ][2 ]
73
- self .total_time = self .finish_time - self .start_time
74
- self .total_drift = self .total_time - self .dut_seconds_passed
75
- self .average_drift = self .total_drift / self .dut_seconds_passed
76
-
77
- if print_stats :
78
- self .log ("Start: %f" % self .start_time )
79
- self .log ("Finish: %f" % self .finish_time )
80
- self .log ("Total time taken: %f" % self .total_time )
81
-
82
- total_drift_ratio_string = "Total drift/Max total drift: %f/%f"
83
- self .log (total_drift_ratio_string % (self .total_drift ,
84
- self .total_drift_max ))
85
-
86
- average_drift_ratio_string = "Average drift/Max average drift: %f/%f"
87
- self .log (average_drift_ratio_string % (self .average_drift ,
88
- self .average_drift_max ))
89
-
90
-
91
- if abs (self .total_drift ) > self .total_drift_max :
92
- if print_stats :
93
- self .log ("FAIL: Total drift exceeded max total drift" )
94
- self .__result = False
95
- elif self .average_drift > self .average_drift_max :
96
- if print_stats :
97
- self .log ("FAIL: Average drift exceeded max average drift" )
98
- self .__result = False
75
+ self .log ("Device base time {}" .format (value ))
76
+ measurement_stretch = (self .round_trip_base_time * self .time_measurement_multiplier ) + 5
77
+
78
+ if measurement_stretch > self .max_measurement_time :
79
+ self .log ("Time required {} to determine device timer is too high due to transport delay, skipping" .format (measurement_stretch ))
99
80
else :
81
+ self .log ("sleeping for {} to measure drift accurately" .format (measurement_stretch ))
82
+ time .sleep (measurement_stretch )
83
+ self .round_trip_final_start = time .time ()
84
+ self .send_kv ("final_time" , 0 )
85
+
86
+ def _callback_final_time (self , key , value , timestamp ):
87
+ self .round_trip_final_end = timestamp
88
+ self .device_time_final = float (value )
89
+ self .round_trip_final_time = self .round_trip_final_end - self .round_trip_final_start
90
+ self .log ("Device final time {} " .format (value ))
91
+
92
+ # compute the test results and send to device
93
+ results = "pass" if self .compute_parameter () else "fail"
94
+ self .send_kv (results , "0" )
95
+
96
+ def setup (self ):
97
+ self .register_callback ('timing_drift_check_start' , self ._callback_timing_drift_check_start )
98
+ self .register_callback ('base_time' , self ._callback_base_time )
99
+ self .register_callback ('final_time' , self ._callback_final_time )
100
+
101
+ def compute_parameter (self , failure_criteria = 0.05 ):
102
+ t_max = self .round_trip_final_end - self .round_trip_base_start
103
+ t_min = self .round_trip_final_start - self .round_trip_base_end
104
+ t_max_hi = t_max * (1 + failure_criteria )
105
+ t_max_lo = t_max * (1 - failure_criteria )
106
+ t_min_hi = t_min * (1 + failure_criteria )
107
+ t_min_lo = t_min * (1 - failure_criteria )
108
+ device_time = (self .device_time_final - self .device_time_base ) / self .mega
109
+
110
+ self .log ("Compute host events" )
111
+ self .log ("Transport delay 0: {}" .format (self .round_trip_base_time ))
112
+ self .log ("Transport delay 1: {}" .format (self .round_trip_final_time ))
113
+ self .log ("DUT base time : {}" .format (self .device_time_base ))
114
+ self .log ("DUT end time : {}" .format (self .device_time_final ))
115
+
116
+ self .log ("min_pass : {} , max_pass : {} for {}%%" .format (t_max_lo , t_min_hi , failure_criteria * 100 ))
117
+ self .log ("min_inconclusive : {} , max_inconclusive : {}" .format (t_min_lo , t_max_hi ))
118
+ self .log ("Time reported by device: {}" .format (device_time ))
119
+
120
+ if t_max_lo <= device_time <= t_min_hi :
121
+ self .log ("Test passed !!!" )
100
122
self .__result = True
101
-
123
+ elif t_min_lo <= device_time <= t_max_hi :
124
+ self .log ("Test inconclusive due to transport delay, retrying" )
125
+ self .__result = False
126
+ else :
127
+ self .log ("Time outside of passing range. Timing drift seems to be present !!!" )
128
+ self .__result = False
102
129
return self .__result
103
130
131
+ def result (self ):
132
+ return self .__result
104
133
105
134
def teardown (self ):
106
- pass
135
+ pass
0 commit comments