Skip to content

Replace servo usage with spectator #219

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion reactivesocket-examples/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ dependencies {
compile project(':reactivesocket-core')
compile project(':reactivesocket-client')
compile project(':reactivesocket-discovery-eureka')
compile project(':reactivesocket-stats-servo')
compile project(':reactivesocket-spectator')
compile project(':reactivesocket-transport-tcp')
compile project(':reactivesocket-transport-local')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

dependencies {
compile project(':reactivesocket-core')
compile 'com.netflix.servo:servo-core:latest.release'
compile 'com.netflix.spectator:spectator-api:0.45.0'
compile 'org.hdrhistogram:HdrHistogram:latest.release'

testCompile project(':reactivesocket-test')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,11 @@
* limitations under the License.
*/

package io.reactivesocket.loadbalancer.servo;
package io.reactivesocket.spectator;

import com.google.common.util.concurrent.AtomicDouble;
import com.netflix.servo.DefaultMonitorRegistry;
import com.netflix.servo.monitor.DoubleGauge;
import com.netflix.servo.monitor.MonitorConfig;
import com.netflix.servo.tag.TagList;
import com.netflix.spectator.api.Id;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.impl.AtomicDouble;
import io.reactivesocket.Payload;
import io.reactivesocket.ReactiveSocket;
import org.reactivestreams.Publisher;
Expand All @@ -29,26 +27,17 @@
* ReactiveSocket that delegates all calls to child reactive socket, and records the current availability as a servo metric
*/
public class AvailabilityMetricReactiveSocket implements ReactiveSocket {
private final ReactiveSocket child;

private final DoubleGauge availabilityGauge;

private final ReactiveSocket child;
private final AtomicDouble atomicDouble;

public AvailabilityMetricReactiveSocket(ReactiveSocket child, String name, TagList tags) {
public AvailabilityMetricReactiveSocket(ReactiveSocket child, Registry registry, String name, String monitorId) {
atomicDouble = new AtomicDouble();
this.child = child;
MonitorConfig.Builder builder = MonitorConfig.builder(name);

if (tags != null) {
builder.withTags(tags);
}
MonitorConfig config = builder.build();
availabilityGauge = new DoubleGauge(config);
DefaultMonitorRegistry.getInstance().register(availabilityGauge);
atomicDouble = availabilityGauge.getNumber();
Id id = registry.createId(name, "id", monitorId);
registry.gauge(id, this, socket -> socket.atomicDouble.get());
}


@Override
public Publisher<Payload> requestResponse(Payload payload) {
return child.requestResponse(payload);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Copyright 2017 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.reactivesocket.loadbalancer.servo;
package io.reactivesocket.spectator;

import io.reactivesocket.Payload;
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.loadbalancer.servo.internal.HdrHistogramServoTimer;
import io.reactivesocket.loadbalancer.servo.internal.ThreadLocalAdderCounter;
import io.reactivesocket.spectator.internal.HdrHistogramPercentileTimer;
import io.reactivesocket.spectator.internal.ThreadLocalAdderCounter;
import io.reactivesocket.util.ReactiveSocketProxy;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
Expand All @@ -29,10 +26,10 @@
/**
* An implementation of {@link ReactiveSocket} that sends metrics to Servo
*/
public class ServoMetricsReactiveSocket extends ReactiveSocketProxy {
public class InstrumentedReactiveSocket extends ReactiveSocketProxy {
final ThreadLocalAdderCounter success;
final ThreadLocalAdderCounter failure;
final HdrHistogramServoTimer timer;
final HdrHistogramPercentileTimer timer;

private class RecordingSubscriber<T> implements Subscriber<T> {
private final Subscriber<T> child;
Expand Down Expand Up @@ -66,11 +63,11 @@ public void onComplete() {
}
}

public ServoMetricsReactiveSocket(ReactiveSocket child, String prefix) {
public InstrumentedReactiveSocket(ReactiveSocket child, String prefix) {
super(child);
this.success = ThreadLocalAdderCounter.newThreadLocalAdderCounter(prefix + "_success");
this.failure = ThreadLocalAdderCounter.newThreadLocalAdderCounter(prefix + "_failure");
this.timer = HdrHistogramServoTimer.newInstance(prefix + "_timer");
success = new ThreadLocalAdderCounter("success", prefix);
failure = new ThreadLocalAdderCounter("failure", prefix);
timer = new HdrHistogramPercentileTimer("latency", prefix);
}

@Override
Expand Down Expand Up @@ -118,13 +115,13 @@ public String histrogramToString() {

StringBuilder s = new StringBuilder();
s.append(String.format("%-12s%-12s\n","Percentile","Latency"));
s.append(String.format("=========================\n"));
s.append("=========================\n");
s.append(String.format("%-12s%dms\n","50%",NANOSECONDS.toMillis(timer.getP50())));
s.append(String.format("%-12s%dms\n","90%",NANOSECONDS.toMillis(timer.getP90())));
s.append(String.format("%-12s%dms\n","99%",NANOSECONDS.toMillis(timer.getP99())));
s.append(String.format("%-12s%dms\n","99.9%",NANOSECONDS.toMillis(timer.getP99_9())));
s.append(String.format("%-12s%dms\n","99.99%",NANOSECONDS.toMillis(timer.getP99_99())));
s.append(String.format("-------------------------\n"));
s.append("-------------------------\n");
s.append(String.format("%-12s%dms\n","min",NANOSECONDS.toMillis(timer.getMin())));
s.append(String.format("%-12s%dms\n","max",NANOSECONDS.toMillis(timer.getMax())));
s.append(String.format("%-12s%d (%.0f%%)\n","success",successCount,100.0*successCount/totalCount));
Expand All @@ -133,7 +130,7 @@ public String histrogramToString() {
return s.toString();
}

private long recordStart() {
private static long recordStart() {
return System.nanoTime();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.reactivesocket.spectator.internal;

import com.netflix.spectator.api.Id;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Spectator;

import java.util.concurrent.TimeUnit;
import java.util.function.ToDoubleFunction;

/**
* Captures a HdrHistogram and sends it to pre-defined Server Counters.
* The buckets are min, max, 50%, 90%, 99%, 99.9%, and 99.99%
*/
public class HdrHistogramPercentileTimer {
private final SlidingWindowHistogram histogram = new SlidingWindowHistogram();

private static final long TIMEOUT = TimeUnit.MINUTES.toMillis(1);

private volatile long lastCleared = System.currentTimeMillis();

public HdrHistogramPercentileTimer(Registry registry, String name, String monitorId) {
registerGauge(name, monitorId, registry, "min", timer -> getMin());
registerGauge(name, monitorId, registry, "max", timer -> getMax());
registerGauge(name, monitorId, registry, "50", timer -> getP50());
registerGauge(name, monitorId, registry, "90", timer -> getP90());
registerGauge(name, monitorId, registry, "99", timer -> getP99());
registerGauge(name, monitorId, registry, "99.9", timer -> getP99_9());
registerGauge(name, monitorId, registry, "99.99", timer -> getP99_99());
}

public HdrHistogramPercentileTimer(String name, String monitorId) {
this(Spectator.globalRegistry(), name, monitorId);
}

/**
* Records a value for to the histogram and updates the Servo counter buckets
*
* @param value the value to update
*/
public void record(long value) {
histogram.recordValue(value);
}

public Long getMin() {
return histogram.aggregateHistogram().getMinValue();
}

public Long getMax() {
return histogram.aggregateHistogram().getMaxValue();
}

public Long getP50() {
return getPercentile(50);
}

public Long getP90() {
return getPercentile(90);
}

public Long getP99() {
return getPercentile(99);
}

public Long getP99_9() {
return getPercentile(99.9);
}

public Long getP99_99() {
return getPercentile(99.99);
}

private synchronized void slide() {
if (System.currentTimeMillis() - lastCleared > TIMEOUT) {
histogram.rotateHistogram();
lastCleared = System.currentTimeMillis();
}
}

private Long getPercentile(double percentile) {
slide();
return histogram.aggregateHistogram().getValueAtPercentile(percentile);
}

private void registerGauge(String metricName, String monitorId, Registry registry, String percentileTag,
ToDoubleFunction<HdrHistogramPercentileTimer> function) {
Id id = registry.createId(metricName, "id", monitorId, "value", percentileTag);
registry.gauge(id, this, function);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,17 @@
package io.reactivesocket.loadbalancer.servo.internal;
/*
* Copyright 2017 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.reactivesocket.spectator.internal;

import org.HdrHistogram.ConcurrentHistogram;
import org.HdrHistogram.Histogram;
Expand All @@ -15,7 +28,7 @@ public class SlidingWindowHistogram {

private final ArrayDeque<Histogram> histogramQueue;

private final Object LOCK = new Object();
private final Object lock = new Object();

public SlidingWindowHistogram() {
this(5);
Expand All @@ -25,8 +38,8 @@ public SlidingWindowHistogram(final int numOfWindows) {
if (numOfWindows < 2) {
throw new IllegalArgumentException("number of windows must be greater than 1");
}
this.histogramQueue = new ArrayDeque<>(numOfWindows - 1);
this.liveHistogram = createHistogram();
histogramQueue = new ArrayDeque<>(numOfWindows - 1);
liveHistogram = createHistogram();

for (int i = 0; i < numOfWindows - 1; i++) {
histogramQueue.offer(createHistogram());
Expand All @@ -53,7 +66,7 @@ public void recordValue(long value) {
* on in the queue.
*/
public void rotateHistogram() {
synchronized (LOCK) {
synchronized (lock) {
Histogram onDeck = histogramQueue.poll();
if (onDeck != null) {
onDeck.reset();
Expand All @@ -72,7 +85,7 @@ public void rotateHistogram() {
public Histogram aggregateHistogram() {
Histogram aggregate = createHistogram();

synchronized (LOCK) {
synchronized (lock) {
aggregate.add(liveHistogram);
histogramQueue
.forEach(aggregate::add);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.reactivesocket.loadbalancer.servo.internal;
package io.reactivesocket.spectator.internal;

import org.agrona.UnsafeAccess;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.reactivesocket.spectator.internal;


import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Id;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Spectator;
import com.netflix.spectator.api.Tag;

import java.util.List;

/**
* A {@link Counter} implementation that uses {@link ThreadLocalAdderCounter}
*/
public class ThreadLocalAdderCounter {

private final ThreadLocalAdder adder = new ThreadLocalAdder();
private final Counter counter;

public ThreadLocalAdderCounter(String name, String monitorId) {
this(Spectator.globalRegistry(), name, monitorId);
}

public ThreadLocalAdderCounter(Registry registry, String name, String monitorId) {
counter = registry.counter(name, "id", monitorId);
}

public void increment() {
adder.increment();
}

public void increment(long amount) {
adder.increment(amount);
}

public long get() {
return adder.get();
}
}
Loading