Skip to content

Commit 82e45b6

Browse files
committed
Events interface (#189)
__Problem__ Insights into the internals of `ReactiveSocket` is not available for users. eg: Load balancing internals, leases sent/received, keep-alives sent/received, etc. __Modification__ This change is first of a series of changes intended to address this problem. It only contains the core interfaces to support event publications from `ReactiveSocket` internals. The model is adopted from the tried and tested model in `RxNetty` to provide an event publishing infrastructure that can be used to implement metrics of event firehose. One of the primary guideline of this model is to reduce the object allocations that comes along with an event stream based on emitting an event as an object. Instead, this approach embraces distinct callbacks that any listener can choose to implement if interested. Primary classes are `EventListener` and `EventSource`. There are a few extensions of `EventListener` to provide events specific to a server, client or load balancer. __Result__ Better insight into `ReactiveSocket` internals.
1 parent f419554 commit 82e45b6

File tree

5 files changed

+437
-0
lines changed

5 files changed

+437
-0
lines changed
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright 2016 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivesocket.client.events;
15+
16+
import io.reactivesocket.client.LoadBalancingClient;
17+
import io.reactivesocket.events.ClientEventListener;
18+
19+
import java.net.SocketAddress;
20+
import java.util.concurrent.TimeUnit;
21+
22+
/**
23+
* A {@link ClientEventListener} for {@link LoadBalancingClient}
24+
*/
25+
public interface LoadBalancingClientListener extends ClientEventListener {
26+
27+
/**
28+
* Event when a new socket is added to the load balancer.
29+
*
30+
* @param socketAddress Address for the socket.
31+
*/
32+
default void socketAdded(SocketAddress socketAddress) {}
33+
34+
/**
35+
* Event when a socket is removed from the load balancer.
36+
*
37+
* @param socketAddress Address for the socket.
38+
*/
39+
default void socketRemoved(SocketAddress socketAddress) {}
40+
41+
/**
42+
* An event when a server is added to the load balancer.
43+
*
44+
* @param socketAddress Address for the server.
45+
*/
46+
default void serverAdded(SocketAddress socketAddress) {}
47+
48+
/**
49+
* An event when a server is removed from the load balancer.
50+
*
51+
* @param socketAddress Address for the server.
52+
*/
53+
default void serverRemoved(SocketAddress socketAddress) {}
54+
55+
/**
56+
* An event when the expected number of active sockets held by the load balancer changes.
57+
*
58+
* @param newAperture New aperture size, i.e. expected number of active sockets.
59+
*/
60+
default void apertureChanged(int newAperture) {}
61+
62+
/**
63+
* An event when the expected time period for refreshing active sockets in the load balancer changes.
64+
*
65+
* @param newPeriod New refresh period.
66+
* @param periodUnit {@link TimeUnit} for the refresh period.
67+
*/
68+
default void socketRefreshPeriodChanged(long newPeriod, TimeUnit periodUnit) {}
69+
70+
/**
71+
* An event to mark the start of the socket refresh cycle.
72+
*/
73+
default void socketsRefreshStart() {}
74+
75+
/**
76+
* An event to mark the end of the socket refresh cycle.
77+
*
78+
* @param duration Time taken to refresh sockets.
79+
* @param durationUnit {@code TimeUnit} for the duration.
80+
*/
81+
default void socketsRefreshCompleted(long duration, TimeUnit durationUnit) {}
82+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2016 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivesocket.events;
15+
16+
import java.util.concurrent.TimeUnit;
17+
import java.util.function.DoubleSupplier;
18+
19+
/**
20+
* {@link EventListener} for a client.
21+
*/
22+
public interface ClientEventListener extends EventListener {
23+
24+
/**
25+
* Event when a new connection is initiated.
26+
*/
27+
default void connectStart() {}
28+
29+
/**
30+
* Event when a connection is successfully completed.
31+
*
32+
* @param socketAvailabilitySupplier A supplier for the availability of the connected socket.
33+
* @param duration Time taken since connection initiation and completion.
34+
* @param durationUnit {@code TimeUnit} for the duration.
35+
*/
36+
default void connectCompleted(DoubleSupplier socketAvailabilitySupplier, long duration, TimeUnit durationUnit) {}
37+
38+
/**
39+
* Event when a connection attempt fails.
40+
*
41+
* @param duration Time taken since connection initiation and failure.
42+
* @param durationUnit {@code TimeUnit} for the duration.
43+
* @param cause Cause for the failure.
44+
*/
45+
default void connectFailed(long duration, TimeUnit durationUnit, Throwable cause) {}
46+
}
Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
/*
2+
* Copyright 2016 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivesocket.events;
15+
16+
import io.reactivesocket.DuplexConnection;
17+
import io.reactivesocket.FrameType;
18+
import io.reactivesocket.Payload;
19+
import io.reactivesocket.ReactiveSocket;
20+
import io.reactivesocket.lease.Lease;
21+
22+
import java.util.concurrent.TimeUnit;
23+
24+
/**
25+
* A listener of events for {@link ReactiveSocket}
26+
*/
27+
public interface EventListener {
28+
29+
/**
30+
* An enum to represent the various interaction models of {@code ReactiveSocket}.
31+
*/
32+
enum RequestType {
33+
RequestResponse,
34+
RequestStream,
35+
RequestChannel,
36+
MetadataPush,
37+
FireAndForget
38+
}
39+
40+
/**
41+
* Start event for receiving a new request from the peer. This callback will be invoked when the first frame for the
42+
* request is received.
43+
*
44+
* @param streamId Stream Id for the request.
45+
* @param type Request type.
46+
*/
47+
default void requestReceiveStart(int streamId, RequestType type) {}
48+
49+
/**
50+
* End event for receiving a new request from the peer. This callback will be invoked when the last frame for the
51+
* request is received. For single item requests like {@link ReactiveSocket#requestResponse(Payload)}, the two
52+
* events {@link #requestReceiveStart(int, RequestType)} and this will be emitted for the same frame. In case
53+
* request ends with an error, {@link #requestReceiveFailed(int, RequestType, long, TimeUnit, Throwable)} will be
54+
* called instead.
55+
*
56+
* @param streamId Stream Id for the request.
57+
* @param type Request type.
58+
* @param duration Time in the {@code durationUnit} since the start of the request receive.
59+
* @param durationUnit {@code TimeUnit} for the duration.
60+
*/
61+
default void requestReceiveComplete(int streamId, RequestType type, long duration, TimeUnit durationUnit) {}
62+
63+
/**
64+
* End event for receiving a new request from the peer. This callback will be invoked when an cause frame is
65+
* received on the request. If the request is successfully completed,
66+
* {@link #requestReceiveComplete(int, RequestType, long, TimeUnit)} will be called instead.
67+
*
68+
* @param streamId Stream Id for the request.
69+
* @param type Request type.
70+
* @param duration Time in the {@code durationUnit} since the start of the request receive.
71+
* @param durationUnit {@code TimeUnit} for the duration.
72+
* @param cause Cause for the failure.
73+
*/
74+
default void requestReceiveFailed(int streamId, RequestType type, long duration, TimeUnit durationUnit,
75+
Throwable cause) {}
76+
77+
/**
78+
* Start event for sending a new request to the peer. This callback will be invoked when first frame of the
79+
* request is successfully written to the underlying {@link DuplexConnection}. <p>
80+
* For latencies related to write and buffering of frames, the events must be exposed by the transport.
81+
*
82+
* @param streamId Stream Id for the request.
83+
* @param type Request type.
84+
*/
85+
default void requestSendStart(int streamId, RequestType type) {}
86+
87+
/**
88+
* End event for sending a new request to the peer. This callback will be invoked when last frame of the
89+
* request is successfully written to the underlying {@link DuplexConnection}.
90+
*
91+
* @param streamId Stream Id for the request.
92+
* @param type Request type.
93+
* @param duration Time between writing of the first request frame and last.
94+
* @param durationUnit {@code TimeUnit} for the duration.
95+
*/
96+
default void requestSendComplete(int streamId, RequestType type, long duration, TimeUnit durationUnit) {}
97+
98+
/**
99+
* End event for sending a new request to the peer. This callback will be invoked if the request itself emits an
100+
* error or the write to the underlying {@link DuplexConnection} failed.
101+
*
102+
* @param streamId Stream Id for the request.
103+
* @param type Request type.
104+
* @param duration Time between writing of the first request frame and error.
105+
* @param durationUnit {@code TimeUnit} for the duration.
106+
* @param cause Cause for the failure.
107+
*/
108+
default void requestSendFailed(int streamId, RequestType type, long duration, TimeUnit durationUnit,
109+
Throwable cause) {}
110+
111+
/**
112+
* Start event for sending a response to the peer. This callback will be invoked when first frame of the
113+
* response is written to the underlying {@link DuplexConnection}.
114+
*
115+
* @param streamId Stream Id for the response.
116+
* @param type Request type.
117+
* @param duration Time between event {@link #requestSendComplete(int, RequestType, long, TimeUnit)} and this.
118+
* @param durationUnit {@code TimeUnit} for the duration.
119+
*/
120+
default void responseSendStart(int streamId, RequestType type, long duration, TimeUnit durationUnit) {}
121+
122+
/**
123+
* End event for sending a response to the peer. This callback will be invoked when last frame of the
124+
* response is written to the underlying {@link DuplexConnection}.
125+
*
126+
* @param streamId Stream Id for the response.
127+
* @param type Request type.
128+
* @param duration Time between sending the first response frame and last.
129+
* @param durationUnit {@code TimeUnit} for the duration.
130+
*/
131+
default void responseSendComplete(int streamId, RequestType type, long duration, TimeUnit durationUnit) {}
132+
133+
/**
134+
* End event for sending a response to the peer. This callback will be invoked when the response terminates with
135+
* an error.
136+
*
137+
* @param streamId Stream Id for the response.
138+
* @param type Request type.
139+
* @param duration Time between sending the first response frame and error.
140+
* @param durationUnit {@code TimeUnit} for the duration.
141+
* @param cause Cause for the failure.
142+
*/
143+
default void responseSendFailed(int streamId, RequestType type, long duration, TimeUnit durationUnit,
144+
Throwable cause) {}
145+
146+
/**
147+
* Start event for receiving a response from the peer. This callback will be invoked when first frame of the
148+
* response is received from the underlying {@link DuplexConnection}.
149+
*
150+
* @param streamId Stream Id for the response.
151+
* @param type Request type.
152+
* @param duration Time between event {@link #requestSendComplete(int, RequestType, long, TimeUnit)} and this.
153+
* @param durationUnit {@code TimeUnit} for the duration.
154+
*/
155+
default void responseReceiveStart(int streamId, RequestType type, long duration, TimeUnit durationUnit) {}
156+
157+
/**
158+
* End event for receiving a response from the peer. This callback will be invoked when last frame of the
159+
* response is received from the underlying {@link DuplexConnection}.
160+
*
161+
* @param streamId Stream Id for the response.
162+
* @param type Request type.
163+
* @param duration Time between receiving the first response frame and last.
164+
* @param durationUnit {@code TimeUnit} for the duration.
165+
*/
166+
default void responseReceiveComplete(int streamId, RequestType type, long duration, TimeUnit durationUnit) {}
167+
168+
/**
169+
* End event for receiving a response from the peer. This callback will be invoked when the response terminates with
170+
* an error.
171+
*
172+
* @param streamId Stream Id for the response.
173+
* @param type Request type.
174+
* @param duration Time between receiving the first response frame and error.
175+
* @param durationUnit {@code TimeUnit} for the duration.
176+
* @param cause Cause for the failure.
177+
*/
178+
default void responseReceiveFailed(int streamId, RequestType type, long duration, TimeUnit durationUnit,
179+
Throwable cause) {}
180+
181+
/**
182+
* On {@code ReactiveSocket} close.
183+
*
184+
* @param duration Time for which the socket was active.
185+
* @param durationUnit {@code TimeUnit} for the duration.
186+
*/
187+
default void socketClosed(long duration, TimeUnit durationUnit) {}
188+
189+
/**
190+
* When a frame of type {@code frameType} is written.
191+
*
192+
* @param streamId Stream Id for the frame.
193+
* @param frameType Type of the frame.
194+
*/
195+
default void frameWritten(int streamId, FrameType frameType) {}
196+
197+
/**
198+
* When a frame of type {@code frameType} is read.
199+
*
200+
* @param streamId Stream Id for the frame.
201+
* @param frameType Type of the frame.
202+
*/
203+
default void frameRead(int streamId, FrameType frameType) {}
204+
205+
/**
206+
* When a lease is sent.
207+
*
208+
* @param lease Lease sent.
209+
*/
210+
default void leaseSent(Lease lease) {}
211+
212+
/**
213+
* When a lease is received.
214+
*
215+
* @param lease Lease received.
216+
*/
217+
default void leaseReceived(Lease lease) {}
218+
219+
/**
220+
* When an error is sent.
221+
*
222+
* @param streamId Stream Id for the error.
223+
* @param errorCode Error code.
224+
*/
225+
default void errorSent(int streamId, int errorCode) {}
226+
227+
/**
228+
* When an error is received.
229+
*
230+
* @param streamId Stream Id for the error.
231+
* @param errorCode Error code.
232+
*/
233+
default void errorReceived(int streamId, int errorCode) {}
234+
235+
}

0 commit comments

Comments
 (0)