1
+ /*
2
+ * Copyright 2015-2018 the original author or authors.
3
+ *
4
+ * Licensed under the Apache License, Version 2.0 (the "License");
5
+ * you may not use this file except in compliance with the License.
6
+ * You may obtain a copy of the License at
7
+ *
8
+ * http://www.apache.org/licenses/LICENSE-2.0
9
+ *
10
+ * Unless required by applicable law or agreed to in writing, software
11
+ * distributed under the License is distributed on an "AS IS" BASIS,
12
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
+ * See the License for the specific language governing permissions and
14
+ * limitations under the License.
15
+ */
16
+
17
+ package io .rsocket .internal ;
18
+
19
+ import java .util .Objects ;
20
+ import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
21
+ import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
22
+ import java .util .function .BiFunction ;
23
+
24
+ import io .netty .util .ReferenceCountUtil ;
25
+ import org .reactivestreams .Publisher ;
26
+ import org .reactivestreams .Subscription ;
27
+ import reactor .core .CoreSubscriber ;
28
+ import reactor .core .Scannable ;
29
+ import reactor .core .publisher .Flux ;
30
+ import reactor .core .publisher .Operators ;
31
+ import reactor .util .annotation .Nullable ;
32
+
33
+ public final class SwitchTransformFlux <T , R > extends Flux <R > {
34
+
35
+ final Publisher <? extends T > source ;
36
+ final BiFunction <T , Flux <T >, Publisher <? extends R >> transformer ;
37
+
38
+ public SwitchTransformFlux (
39
+ Publisher <? extends T > source , BiFunction <T , Flux <T >, Publisher <? extends R >> transformer ) {
40
+ this .source = Objects .requireNonNull (source , "source" );
41
+ this .transformer = Objects .requireNonNull (transformer , "transformer" );
42
+ }
43
+
44
+ @ Override
45
+ public int getPrefetch () {
46
+ return 1 ;
47
+ }
48
+
49
+ @ Override
50
+ public void subscribe (CoreSubscriber <? super R > actual ) {
51
+ source .subscribe (new SwitchTransformMain <>(actual , transformer ));
52
+ }
53
+
54
+ static final class SwitchTransformMain <T , R > implements CoreSubscriber <T >, Scannable {
55
+
56
+ final CoreSubscriber <? super R > actual ;
57
+ final BiFunction <T , Flux <T >, Publisher <? extends R >> transformer ;
58
+ final SwitchTransformInner <T > inner ;
59
+
60
+ Subscription s ;
61
+
62
+ volatile int once ;
63
+ @ SuppressWarnings ("rawtypes" )
64
+ static final AtomicIntegerFieldUpdater <SwitchTransformMain > ONCE =
65
+ AtomicIntegerFieldUpdater .newUpdater (SwitchTransformMain .class , "once" );
66
+
67
+ SwitchTransformMain (
68
+ CoreSubscriber <? super R > actual ,
69
+ BiFunction <T , Flux <T >, Publisher <? extends R >> transformer
70
+ ) {
71
+ this .actual = actual ;
72
+ this .transformer = transformer ;
73
+ this .inner = new SwitchTransformInner <>(this );
74
+ }
75
+
76
+ @ Override
77
+ @ Nullable
78
+ public Object scanUnsafe (Attr key ) {
79
+ if (key == Attr .CANCELLED ) return s == Operators .cancelledSubscription ();
80
+ if (key == Attr .PREFETCH ) return 1 ;
81
+
82
+ return null ;
83
+ }
84
+
85
+ @ Override
86
+ public void onSubscribe (Subscription s ) {
87
+ if (Operators .validate (this .s , s )) {
88
+ this .s = s ;
89
+ s .request (1 );
90
+ }
91
+ }
92
+
93
+ @ Override
94
+ public void onNext (T t ) {
95
+ if (isCanceled ()) {
96
+ return ;
97
+ }
98
+
99
+ if (once == 0 && ONCE .compareAndSet (this , 0 , 1 )) {
100
+ try {
101
+ inner .first = t ;
102
+ Publisher <? extends R > result =
103
+ Objects .requireNonNull (transformer .apply (t , inner ), "The transformer returned a null value" );
104
+ result .subscribe (actual );
105
+ return ;
106
+ } catch (Throwable e ) {
107
+ onError (Operators .onOperatorError (s , e , t , actual .currentContext ()));
108
+ ReferenceCountUtil .safeRelease (t );
109
+ return ;
110
+ }
111
+ }
112
+
113
+ inner .onNext (t );
114
+ }
115
+
116
+ @ Override
117
+ public void onError (Throwable t ) {
118
+ if (isCanceled ()) {
119
+ return ;
120
+ }
121
+
122
+ if (once != 0 ) {
123
+ inner .onError (t );
124
+ } else {
125
+ actual .onSubscribe (Operators .emptySubscription ());
126
+ actual .onError (t );
127
+ }
128
+ }
129
+
130
+ @ Override
131
+ public void onComplete () {
132
+ if (isCanceled ()) {
133
+ return ;
134
+ }
135
+
136
+ if (once != 0 ) {
137
+ inner .onComplete ();
138
+ } else {
139
+ actual .onSubscribe (Operators .emptySubscription ());
140
+ actual .onComplete ();
141
+ }
142
+ }
143
+
144
+ boolean isCanceled () {
145
+ return s == Operators .cancelledSubscription ();
146
+ }
147
+
148
+ void cancel () {
149
+ s = Operators .cancelledSubscription ();
150
+ }
151
+ }
152
+
153
+ static final class SwitchTransformInner <V > extends Flux <V >
154
+ implements Scannable , Subscription {
155
+
156
+ final SwitchTransformMain <V , ?> parent ;
157
+
158
+ volatile CoreSubscriber <? super V > actual ;
159
+ @ SuppressWarnings ("rawtypes" )
160
+ static final AtomicReferenceFieldUpdater <SwitchTransformInner , CoreSubscriber > ACTUAL =
161
+ AtomicReferenceFieldUpdater .newUpdater (SwitchTransformInner .class , CoreSubscriber .class , "actual" );
162
+
163
+ volatile V first ;
164
+ @ SuppressWarnings ("rawtypes" )
165
+ static final AtomicReferenceFieldUpdater <SwitchTransformInner , Object > FIRST =
166
+ AtomicReferenceFieldUpdater .newUpdater (SwitchTransformInner .class , Object .class , "first" );
167
+
168
+ volatile int once ;
169
+ @ SuppressWarnings ("rawtypes" )
170
+ static final AtomicIntegerFieldUpdater <SwitchTransformInner > ONCE =
171
+ AtomicIntegerFieldUpdater .newUpdater (SwitchTransformInner .class , "once" );
172
+
173
+ SwitchTransformInner (SwitchTransformMain <V , ?> parent ) {
174
+ this .parent = parent ;
175
+ }
176
+
177
+ public void onNext (V t ) {
178
+ CoreSubscriber <? super V > a = actual ;
179
+
180
+ if (a != null ) {
181
+ a .onNext (t );
182
+ }
183
+ }
184
+
185
+ public void onError (Throwable t ) {
186
+ CoreSubscriber <? super V > a = actual ;
187
+
188
+ if (a != null ) {
189
+ a .onError (t );
190
+ }
191
+ }
192
+
193
+ public void onComplete () {
194
+ CoreSubscriber <? super V > a = actual ;
195
+
196
+ if (a != null ) {
197
+ a .onComplete ();
198
+ }
199
+ }
200
+
201
+ @ Override
202
+ public void subscribe (CoreSubscriber <? super V > actual ) {
203
+ if (once == 0 && ONCE .compareAndSet (this , 0 , 1 )) {
204
+ ACTUAL .lazySet (this , actual );
205
+ actual .onSubscribe (this );
206
+ }
207
+ else {
208
+ actual .onError (new IllegalStateException ("SwitchTransform allows only one Subscriber" ));
209
+ }
210
+ }
211
+
212
+ @ Override
213
+ public void request (long n ) {
214
+ V f = first ;
215
+
216
+ if (f != null && FIRST .compareAndSet (this , f , null )) {
217
+ actual .onNext (f );
218
+
219
+ long r = Operators .addCap (n , -1 );
220
+ if (r > 0 ) {
221
+ parent .s .request (r );
222
+ }
223
+ } else {
224
+ parent .s .request (n );
225
+ }
226
+ }
227
+
228
+ @ Override
229
+ public void cancel () {
230
+ actual = null ;
231
+ first = null ;
232
+ parent .cancel ();
233
+ }
234
+
235
+ @ Override
236
+ @ Nullable
237
+ public Object scanUnsafe (Attr key ) {
238
+ if (key == Attr .PARENT ) return parent ;
239
+ if (key == Attr .ACTUAL ) return actual ();
240
+
241
+ return null ;
242
+ }
243
+
244
+ public CoreSubscriber <? super V > actual () {
245
+ return actual ;
246
+ }
247
+ }
248
+ }
0 commit comments