Skip to content

Commit f058520

Browse files
authored
Replace servo usage with spectator (#219)
__Problem__ Currently, servo is being used for metric. Spectator is a more generic library and has better APIs. __Modification__ Modified classes to use spectator-api which reduces the code considerably. This is a change required before we layer the events => metric changes. __Result__ Using recommended library at Netflix for metric and lesser code.
1 parent e5dc7e0 commit f058520

File tree

16 files changed

+233
-434
lines changed

16 files changed

+233
-434
lines changed

reactivesocket-examples/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ dependencies {
3636
compile project(':reactivesocket-core')
3737
compile project(':reactivesocket-client')
3838
compile project(':reactivesocket-discovery-eureka')
39-
compile project(':reactivesocket-stats-servo')
39+
compile project(':reactivesocket-spectator')
4040
compile project(':reactivesocket-transport-tcp')
4141
compile project(':reactivesocket-transport-local')
4242

reactivesocket-stats-servo/build.gradle renamed to reactivesocket-spectator/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
dependencies {
1818
compile project(':reactivesocket-core')
19-
compile 'com.netflix.servo:servo-core:latest.release'
19+
compile 'com.netflix.spectator:spectator-api:0.45.0'
2020
compile 'org.hdrhistogram:HdrHistogram:latest.release'
2121

2222
testCompile project(':reactivesocket-test')
Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,11 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.reactivesocket.loadbalancer.servo;
17+
package io.reactivesocket.spectator;
1818

19-
import com.google.common.util.concurrent.AtomicDouble;
20-
import com.netflix.servo.DefaultMonitorRegistry;
21-
import com.netflix.servo.monitor.DoubleGauge;
22-
import com.netflix.servo.monitor.MonitorConfig;
23-
import com.netflix.servo.tag.TagList;
19+
import com.netflix.spectator.api.Id;
20+
import com.netflix.spectator.api.Registry;
21+
import com.netflix.spectator.impl.AtomicDouble;
2422
import io.reactivesocket.Payload;
2523
import io.reactivesocket.ReactiveSocket;
2624
import org.reactivestreams.Publisher;
@@ -29,26 +27,17 @@
2927
* ReactiveSocket that delegates all calls to child reactive socket, and records the current availability as a servo metric
3028
*/
3129
public class AvailabilityMetricReactiveSocket implements ReactiveSocket {
32-
private final ReactiveSocket child;
33-
34-
private final DoubleGauge availabilityGauge;
3530

31+
private final ReactiveSocket child;
3632
private final AtomicDouble atomicDouble;
3733

38-
public AvailabilityMetricReactiveSocket(ReactiveSocket child, String name, TagList tags) {
34+
public AvailabilityMetricReactiveSocket(ReactiveSocket child, Registry registry, String name, String monitorId) {
35+
atomicDouble = new AtomicDouble();
3936
this.child = child;
40-
MonitorConfig.Builder builder = MonitorConfig.builder(name);
41-
42-
if (tags != null) {
43-
builder.withTags(tags);
44-
}
45-
MonitorConfig config = builder.build();
46-
availabilityGauge = new DoubleGauge(config);
47-
DefaultMonitorRegistry.getInstance().register(availabilityGauge);
48-
atomicDouble = availabilityGauge.getNumber();
37+
Id id = registry.createId(name, "id", monitorId);
38+
registry.gauge(id, this, socket -> socket.atomicDouble.get());
4939
}
5040

51-
5241
@Override
5342
public Publisher<Payload> requestResponse(Payload payload) {
5443
return child.requestResponse(payload);
Lines changed: 22 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,21 @@
11
/*
2-
* Copyright 2016 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
2+
* Copyright 2017 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
1512
*/
16-
package io.reactivesocket.loadbalancer.servo;
13+
package io.reactivesocket.spectator;
1714

1815
import io.reactivesocket.Payload;
1916
import io.reactivesocket.ReactiveSocket;
20-
import io.reactivesocket.loadbalancer.servo.internal.HdrHistogramServoTimer;
21-
import io.reactivesocket.loadbalancer.servo.internal.ThreadLocalAdderCounter;
17+
import io.reactivesocket.spectator.internal.HdrHistogramPercentileTimer;
18+
import io.reactivesocket.spectator.internal.ThreadLocalAdderCounter;
2219
import io.reactivesocket.util.ReactiveSocketProxy;
2320
import org.reactivestreams.Publisher;
2421
import org.reactivestreams.Subscriber;
@@ -29,10 +26,10 @@
2926
/**
3027
* An implementation of {@link ReactiveSocket} that sends metrics to Servo
3128
*/
32-
public class ServoMetricsReactiveSocket extends ReactiveSocketProxy {
29+
public class InstrumentedReactiveSocket extends ReactiveSocketProxy {
3330
final ThreadLocalAdderCounter success;
3431
final ThreadLocalAdderCounter failure;
35-
final HdrHistogramServoTimer timer;
32+
final HdrHistogramPercentileTimer timer;
3633

3734
private class RecordingSubscriber<T> implements Subscriber<T> {
3835
private final Subscriber<T> child;
@@ -66,11 +63,11 @@ public void onComplete() {
6663
}
6764
}
6865

69-
public ServoMetricsReactiveSocket(ReactiveSocket child, String prefix) {
66+
public InstrumentedReactiveSocket(ReactiveSocket child, String prefix) {
7067
super(child);
71-
this.success = ThreadLocalAdderCounter.newThreadLocalAdderCounter(prefix + "_success");
72-
this.failure = ThreadLocalAdderCounter.newThreadLocalAdderCounter(prefix + "_failure");
73-
this.timer = HdrHistogramServoTimer.newInstance(prefix + "_timer");
68+
success = new ThreadLocalAdderCounter("success", prefix);
69+
failure = new ThreadLocalAdderCounter("failure", prefix);
70+
timer = new HdrHistogramPercentileTimer("latency", prefix);
7471
}
7572

7673
@Override
@@ -118,13 +115,13 @@ public String histrogramToString() {
118115

119116
StringBuilder s = new StringBuilder();
120117
s.append(String.format("%-12s%-12s\n","Percentile","Latency"));
121-
s.append(String.format("=========================\n"));
118+
s.append("=========================\n");
122119
s.append(String.format("%-12s%dms\n","50%",NANOSECONDS.toMillis(timer.getP50())));
123120
s.append(String.format("%-12s%dms\n","90%",NANOSECONDS.toMillis(timer.getP90())));
124121
s.append(String.format("%-12s%dms\n","99%",NANOSECONDS.toMillis(timer.getP99())));
125122
s.append(String.format("%-12s%dms\n","99.9%",NANOSECONDS.toMillis(timer.getP99_9())));
126123
s.append(String.format("%-12s%dms\n","99.99%",NANOSECONDS.toMillis(timer.getP99_99())));
127-
s.append(String.format("-------------------------\n"));
124+
s.append("-------------------------\n");
128125
s.append(String.format("%-12s%dms\n","min",NANOSECONDS.toMillis(timer.getMin())));
129126
s.append(String.format("%-12s%dms\n","max",NANOSECONDS.toMillis(timer.getMax())));
130127
s.append(String.format("%-12s%d (%.0f%%)\n","success",successCount,100.0*successCount/totalCount));
@@ -133,7 +130,7 @@ public String histrogramToString() {
133130
return s.toString();
134131
}
135132

136-
private long recordStart() {
133+
private static long recordStart() {
137134
return System.nanoTime();
138135
}
139136

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.reactivesocket.spectator.internal;
17+
18+
import com.netflix.spectator.api.Id;
19+
import com.netflix.spectator.api.Registry;
20+
import com.netflix.spectator.api.Spectator;
21+
22+
import java.util.concurrent.TimeUnit;
23+
import java.util.function.ToDoubleFunction;
24+
25+
/**
26+
* Captures a HdrHistogram and sends it to pre-defined Server Counters.
27+
* The buckets are min, max, 50%, 90%, 99%, 99.9%, and 99.99%
28+
*/
29+
public class HdrHistogramPercentileTimer {
30+
private final SlidingWindowHistogram histogram = new SlidingWindowHistogram();
31+
32+
private static final long TIMEOUT = TimeUnit.MINUTES.toMillis(1);
33+
34+
private volatile long lastCleared = System.currentTimeMillis();
35+
36+
public HdrHistogramPercentileTimer(Registry registry, String name, String monitorId) {
37+
registerGauge(name, monitorId, registry, "min", timer -> getMin());
38+
registerGauge(name, monitorId, registry, "max", timer -> getMax());
39+
registerGauge(name, monitorId, registry, "50", timer -> getP50());
40+
registerGauge(name, monitorId, registry, "90", timer -> getP90());
41+
registerGauge(name, monitorId, registry, "99", timer -> getP99());
42+
registerGauge(name, monitorId, registry, "99.9", timer -> getP99_9());
43+
registerGauge(name, monitorId, registry, "99.99", timer -> getP99_99());
44+
}
45+
46+
public HdrHistogramPercentileTimer(String name, String monitorId) {
47+
this(Spectator.globalRegistry(), name, monitorId);
48+
}
49+
50+
/**
51+
* Records a value for to the histogram and updates the Servo counter buckets
52+
*
53+
* @param value the value to update
54+
*/
55+
public void record(long value) {
56+
histogram.recordValue(value);
57+
}
58+
59+
public Long getMin() {
60+
return histogram.aggregateHistogram().getMinValue();
61+
}
62+
63+
public Long getMax() {
64+
return histogram.aggregateHistogram().getMaxValue();
65+
}
66+
67+
public Long getP50() {
68+
return getPercentile(50);
69+
}
70+
71+
public Long getP90() {
72+
return getPercentile(90);
73+
}
74+
75+
public Long getP99() {
76+
return getPercentile(99);
77+
}
78+
79+
public Long getP99_9() {
80+
return getPercentile(99.9);
81+
}
82+
83+
public Long getP99_99() {
84+
return getPercentile(99.99);
85+
}
86+
87+
private synchronized void slide() {
88+
if (System.currentTimeMillis() - lastCleared > TIMEOUT) {
89+
histogram.rotateHistogram();
90+
lastCleared = System.currentTimeMillis();
91+
}
92+
}
93+
94+
private Long getPercentile(double percentile) {
95+
slide();
96+
return histogram.aggregateHistogram().getValueAtPercentile(percentile);
97+
}
98+
99+
private void registerGauge(String metricName, String monitorId, Registry registry, String percentileTag,
100+
ToDoubleFunction<HdrHistogramPercentileTimer> function) {
101+
Id id = registry.createId(metricName, "id", monitorId, "value", percentileTag);
102+
registry.gauge(id, this, function);
103+
}
104+
}
Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,17 @@
1-
package io.reactivesocket.loadbalancer.servo.internal;
1+
/*
2+
* Copyright 2017 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivesocket.spectator.internal;
215

316
import org.HdrHistogram.ConcurrentHistogram;
417
import org.HdrHistogram.Histogram;
@@ -15,7 +28,7 @@ public class SlidingWindowHistogram {
1528

1629
private final ArrayDeque<Histogram> histogramQueue;
1730

18-
private final Object LOCK = new Object();
31+
private final Object lock = new Object();
1932

2033
public SlidingWindowHistogram() {
2134
this(5);
@@ -25,8 +38,8 @@ public SlidingWindowHistogram(final int numOfWindows) {
2538
if (numOfWindows < 2) {
2639
throw new IllegalArgumentException("number of windows must be greater than 1");
2740
}
28-
this.histogramQueue = new ArrayDeque<>(numOfWindows - 1);
29-
this.liveHistogram = createHistogram();
41+
histogramQueue = new ArrayDeque<>(numOfWindows - 1);
42+
liveHistogram = createHistogram();
3043

3144
for (int i = 0; i < numOfWindows - 1; i++) {
3245
histogramQueue.offer(createHistogram());
@@ -53,7 +66,7 @@ public void recordValue(long value) {
5366
* on in the queue.
5467
*/
5568
public void rotateHistogram() {
56-
synchronized (LOCK) {
69+
synchronized (lock) {
5770
Histogram onDeck = histogramQueue.poll();
5871
if (onDeck != null) {
5972
onDeck.reset();
@@ -72,7 +85,7 @@ public void rotateHistogram() {
7285
public Histogram aggregateHistogram() {
7386
Histogram aggregate = createHistogram();
7487

75-
synchronized (LOCK) {
88+
synchronized (lock) {
7689
aggregate.add(liveHistogram);
7790
histogramQueue
7891
.forEach(aggregate::add);
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.reactivesocket.loadbalancer.servo.internal;
16+
package io.reactivesocket.spectator.internal;
1717

1818
import org.agrona.UnsafeAccess;
1919

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.reactivesocket.spectator.internal;
17+
18+
19+
import com.netflix.spectator.api.Counter;
20+
import com.netflix.spectator.api.Id;
21+
import com.netflix.spectator.api.Registry;
22+
import com.netflix.spectator.api.Spectator;
23+
import com.netflix.spectator.api.Tag;
24+
25+
import java.util.List;
26+
27+
/**
28+
* A {@link Counter} implementation that uses {@link ThreadLocalAdderCounter}
29+
*/
30+
public class ThreadLocalAdderCounter {
31+
32+
private final ThreadLocalAdder adder = new ThreadLocalAdder();
33+
private final Counter counter;
34+
35+
public ThreadLocalAdderCounter(String name, String monitorId) {
36+
this(Spectator.globalRegistry(), name, monitorId);
37+
}
38+
39+
public ThreadLocalAdderCounter(Registry registry, String name, String monitorId) {
40+
counter = registry.counter(name, "id", monitorId);
41+
}
42+
43+
public void increment() {
44+
adder.increment();
45+
}
46+
47+
public void increment(long amount) {
48+
adder.increment(amount);
49+
}
50+
51+
public long get() {
52+
return adder.get();
53+
}
54+
}

0 commit comments

Comments
 (0)