Skip to content

Commit bafd440

Browse files
Merge pull request #249 from jmhofer/timestamp
Timestamp operation
2 parents 1aa722d + f02257a commit bafd440

File tree

3 files changed

+126
-0
lines changed

3 files changed

+126
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import rx.operators.OperationTakeLast;
6666
import rx.operators.OperationTakeUntil;
6767
import rx.operators.OperationTakeWhile;
68+
import rx.operators.OperationTimestamp;
6869
import rx.operators.OperationToFuture;
6970
import rx.operators.OperationToIterator;
7071
import rx.operators.OperationToObservableFuture;
@@ -82,6 +83,7 @@
8283
import rx.util.AtomicObservableSubscription;
8384
import rx.util.AtomicObserver;
8485
import rx.util.Range;
86+
import rx.util.Timestamped;
8587
import rx.util.functions.Action0;
8688
import rx.util.functions.Action1;
8789
import rx.util.functions.Func0;
@@ -2093,6 +2095,14 @@ public Boolean call(T t, Integer integer)
20932095
}));
20942096
}
20952097

2098+
/**
2099+
* Adds a timestamp to each item emitted by this observable.
2100+
* @return An observable sequence of timestamped items.
2101+
*/
2102+
public Observable<Timestamped<T>> timestamp() {
2103+
return create(OperationTimestamp.timestamp(this));
2104+
}
2105+
20962106
/**
20972107
* Return a Future representing a single value of the Observable.
20982108
* <p>
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable;
19+
import rx.Observer;
20+
import rx.Subscription;
21+
import rx.util.Timestamped;
22+
import rx.util.functions.Func1;
23+
24+
public final class OperationTimestamp {
25+
26+
/**
27+
* Accepts a sequence and adds timestamps to each item in it.
28+
*
29+
* @param sequence
30+
* the input sequence.
31+
* @param <T>
32+
* the type of the input sequence.
33+
* @return a sequence of timestamped values created by adding timestamps to each item in the input sequence.
34+
*/
35+
public static <T> Func1<Observer<Timestamped<T>>, Subscription> timestamp(Observable<T> sequence) {
36+
return OperationMap.map(sequence, new Func1<T, Timestamped<T>>() {
37+
@Override
38+
public Timestamped<T> call(T value) {
39+
return new Timestamped<T>(System.currentTimeMillis(), value);
40+
}
41+
});
42+
}
43+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.util;
17+
18+
/**
19+
* Composite class that takes a value and a timestamp and wraps them.
20+
*/
21+
public final class Timestamped<T> {
22+
private final long timestampMillis;
23+
private final T value;
24+
25+
public Timestamped(long timestampMillis, T value) {
26+
this.value = value;
27+
this.timestampMillis = timestampMillis;
28+
}
29+
30+
public long getTimestampMillis() {
31+
return timestampMillis;
32+
}
33+
34+
public T getValue() {
35+
return value;
36+
}
37+
38+
@Override
39+
public boolean equals(Object obj) {
40+
if (this == obj) {
41+
return true;
42+
}
43+
if (!(obj instanceof Timestamped)) {
44+
return false;
45+
}
46+
Timestamped<?> other = (Timestamped<?>) obj;
47+
if (timestampMillis != other.timestampMillis) {
48+
return false;
49+
}
50+
if (value == null) {
51+
if (other.value != null) {
52+
return false;
53+
}
54+
} else if (!value.equals(other.value)) {
55+
return false;
56+
}
57+
return true;
58+
}
59+
60+
@Override
61+
public int hashCode() {
62+
final int prime = 31;
63+
int result = 1;
64+
result = prime * result + (int) (timestampMillis ^ (timestampMillis));
65+
result = prime * result + ((value == null) ? 0 : value.hashCode());
66+
return result;
67+
}
68+
69+
@Override
70+
public String toString() {
71+
return String.format("Timestamped(timestampMillis = %d, value = %s)", timestampMillis, value.toString());
72+
}
73+
}

0 commit comments

Comments
 (0)