@@ -30,7 +30,7 @@ class OkHttpWebSocketWrapper extends WebSocketWrapper {
30
30
private WebSocketOnClosedCallback onClose ;
31
31
private CompletableSubject startSubject = CompletableSubject .create ();
32
32
private CompletableSubject closeSubject = CompletableSubject .create ();
33
- private final ReentrantLock closeLock = new ReentrantLock ();
33
+ private final ReentrantLock stateLock = new ReentrantLock ();
34
34
35
35
private final Logger logger = LoggerFactory .getLogger (OkHttpWebSocketWrapper .class );
36
36
@@ -82,7 +82,12 @@ public void setOnClose(WebSocketOnClosedCallback onClose) {
82
82
private class SignalRWebSocketListener extends WebSocketListener {
83
83
@ Override
84
84
public void onOpen (WebSocket webSocket , Response response ) {
85
- startSubject .onComplete ();
85
+ stateLock .lock ();
86
+ try {
87
+ startSubject .onComplete ();
88
+ } finally {
89
+ stateLock .unlock ();
90
+ }
86
91
}
87
92
88
93
@ Override
@@ -97,39 +102,64 @@ public void onMessage(WebSocket webSocket, ByteString bytes) {
97
102
98
103
@ Override
99
104
public void onClosing (WebSocket webSocket , int code , String reason ) {
100
- onClose .invoke (code , reason );
105
+ boolean isOpen = false ;
106
+ stateLock .lock ();
107
+ try {
108
+ isOpen = startSubject .hasComplete ();
109
+ } finally {
110
+ stateLock .unlock ();
111
+ }
112
+
113
+ logger .info ("WebSocket closing with status code '{}' and reason '{}'." , code , reason );
114
+
115
+ // Only call onClose if connection is open
116
+ if (isOpen ) {
117
+ onClose .invoke (code , reason );
118
+ }
119
+
101
120
try {
102
- closeLock .lock ();
121
+ stateLock .lock ();
103
122
closeSubject .onComplete ();
104
123
}
105
124
finally {
106
- closeLock .unlock ();
125
+ stateLock .unlock ();
107
126
}
108
- checkStartFailure ();
127
+ checkStartFailure (null );
109
128
}
110
129
111
130
@ Override
112
131
public void onFailure (WebSocket webSocket , Throwable t , Response response ) {
113
- logger .error ("WebSocket closed from an error: {} ." , t . getMessage () );
132
+ logger .error ("WebSocket closed from an error." , t );
114
133
134
+ boolean isOpen = false ;
115
135
try {
116
- closeLock .lock ();
136
+ stateLock .lock ();
117
137
if (!closeSubject .hasComplete ()) {
118
138
closeSubject .onError (new RuntimeException (t ));
119
139
}
140
+
141
+ isOpen = startSubject .hasComplete ();
120
142
}
121
143
finally {
122
- closeLock .unlock ();
144
+ stateLock .unlock ();
123
145
}
124
- onClose .invoke (null , t .getMessage ());
125
- checkStartFailure ();
146
+ // Only call onClose if connection is open
147
+ if (isOpen ) {
148
+ onClose .invoke (null , t .getMessage ());
149
+ }
150
+ checkStartFailure (t );
126
151
}
127
152
128
- private void checkStartFailure () {
129
- // If the start task hasn't completed yet, then we need to complete it
130
- // exceptionally.
131
- if (!startSubject .hasComplete ()) {
132
- startSubject .onError (new RuntimeException ("There was an error starting the WebSocket transport." ));
153
+ private void checkStartFailure (Throwable t ) {
154
+ stateLock .lock ();
155
+ try {
156
+ // If the start task hasn't completed yet, then we need to complete it
157
+ // exceptionally.
158
+ if (!startSubject .hasComplete ()) {
159
+ startSubject .onError (new RuntimeException ("There was an error starting the WebSocket transport." , t ));
160
+ }
161
+ } finally {
162
+ stateLock .unlock ();
133
163
}
134
164
}
135
165
}
0 commit comments