@@ -87,6 +87,58 @@ public void testBufferedReplay() {
87
87
}
88
88
}
89
89
90
+ @ Test
91
+ public void testBufferedWindowReplay () {
92
+ PublishSubject <Integer > source = PublishSubject .create ();
93
+ TestScheduler scheduler = new TestScheduler ();
94
+ ConnectableObservable <Integer > co = source .replay (3 , 100 , TimeUnit .MILLISECONDS , scheduler );
95
+ co .connect ();
96
+
97
+ {
98
+ @ SuppressWarnings ("unchecked" )
99
+ Observer <Object > observer1 = mock (Observer .class );
100
+ InOrder inOrder = inOrder (observer1 );
101
+
102
+ co .subscribe (observer1 );
103
+
104
+ source .onNext (1 );
105
+ scheduler .advanceTimeBy (10 , TimeUnit .MILLISECONDS );
106
+ source .onNext (2 );
107
+ scheduler .advanceTimeBy (10 , TimeUnit .MILLISECONDS );
108
+ source .onNext (3 );
109
+ scheduler .advanceTimeBy (10 , TimeUnit .MILLISECONDS );
110
+
111
+ inOrder .verify (observer1 , times (1 )).onNext (1 );
112
+ inOrder .verify (observer1 , times (1 )).onNext (2 );
113
+ inOrder .verify (observer1 , times (1 )).onNext (3 );
114
+
115
+ source .onNext (4 );
116
+ source .onNext (5 );
117
+ scheduler .advanceTimeBy (90 , TimeUnit .MILLISECONDS );
118
+
119
+ inOrder .verify (observer1 , times (1 )).onNext (4 );
120
+
121
+ inOrder .verify (observer1 , times (1 )).onNext (5 );
122
+
123
+ inOrder .verifyNoMoreInteractions ();
124
+ verify (observer1 , never ()).onError (any (Throwable .class ));
125
+
126
+ }
127
+
128
+ {
129
+ @ SuppressWarnings ("unchecked" )
130
+ Observer <Object > observer1 = mock (Observer .class );
131
+ InOrder inOrder = inOrder (observer1 );
132
+
133
+ co .subscribe (observer1 );
134
+
135
+ inOrder .verify (observer1 , times (1 )).onNext (4 );
136
+ inOrder .verify (observer1 , times (1 )).onNext (5 );
137
+ inOrder .verifyNoMoreInteractions ();
138
+ verify (observer1 , never ()).onError (any (Throwable .class ));
139
+ }
140
+ }
141
+
90
142
@ Test
91
143
public void testWindowedReplay () {
92
144
TestScheduler scheduler = new TestScheduler ();
0 commit comments