Skip to content

Commit c4a2762

Browse files
Pillage Concat from RxJava v2
RxJava v2 isn't ready to depend on, so this pillages the necessary code for OperatorConcatMap to work which I'll adjust to work for the fragmentation requirements.
1 parent 8fa38c5 commit c4a2762

14 files changed

+1922
-0
lines changed
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivesocket.internal.rx;
15+
16+
import java.util.function.*;
17+
18+
/**
19+
* A linked-array-list implementation that only supports appending and consumption.
20+
*
21+
* @param <T> the value type
22+
*/
23+
public class AppendOnlyLinkedArrayList<T> {
24+
final int capacity;
25+
Object[] head;
26+
Object[] tail;
27+
int offset;
28+
29+
/**
30+
* Constructs an empty list with a per-link capacity
31+
* @param capacity the capacity of each link
32+
*/
33+
public AppendOnlyLinkedArrayList(int capacity) {
34+
this.capacity = capacity;
35+
this.head = new Object[capacity + 1];
36+
this.tail = head;
37+
}
38+
39+
/**
40+
* Append a non-null value to the list.
41+
* <p>Don't add null to the list!
42+
* @param value the value to append
43+
*/
44+
public void add(T value) {
45+
final int c = capacity;
46+
int o = offset;
47+
if (o == c) {
48+
Object[] next = new Object[c + 1];
49+
tail[c] = next;
50+
tail = next;
51+
o = 0;
52+
}
53+
tail[o] = value;
54+
offset = o + 1;
55+
}
56+
57+
/**
58+
* Set a value as the first element of the list.
59+
* @param value the value to set
60+
*/
61+
public void setFirst(T value) {
62+
head[0] = value;
63+
}
64+
65+
/**
66+
* Loops through all elements of the list.
67+
* @param consumer the consumer of elements
68+
*/
69+
@SuppressWarnings("unchecked")
70+
public void forEach(Consumer<? super T> consumer) {
71+
Object[] a = head;
72+
final int c = capacity;
73+
while (a != null) {
74+
for (int i = 0; i < c; i++) {
75+
Object o = a[i];
76+
if (o == null) {
77+
return;
78+
}
79+
consumer.accept((T)o);
80+
}
81+
a = (Object[])a[c];
82+
}
83+
}
84+
85+
/**
86+
* Loops over all elements of the array until a null element is encountered or
87+
* the given predicate returns true.
88+
* @param consumer the consumer of values that returns true if the forEach should terminate
89+
*/
90+
@SuppressWarnings("unchecked")
91+
public void forEachWhile(Predicate<? super T> consumer) {
92+
Object[] a = head;
93+
final int c = capacity;
94+
while (a != null) {
95+
for (int i = 0; i < c; i++) {
96+
Object o = a[i];
97+
if (o == null) {
98+
return;
99+
}
100+
if (consumer.test((T)o)) {
101+
return;
102+
}
103+
}
104+
a = (Object[])a[c];
105+
}
106+
}
107+
108+
@SuppressWarnings("unchecked")
109+
public <S> void forEachWhile(S state, BiPredicate<? super S, ? super T> consumer) {
110+
Object[] a = head;
111+
final int c = capacity;
112+
while (a != null) {
113+
for (int i = 0; i < c; i++) {
114+
Object o = a[i];
115+
if (o == null) {
116+
return;
117+
}
118+
if (consumer.test(state, (T)o)) {
119+
return;
120+
}
121+
}
122+
a = (Object[])a[c];
123+
}
124+
}
125+
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
/*
15+
* The code was inspired by the similarly named JCTools class:
16+
* https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic
17+
*/
18+
19+
package io.reactivesocket.internal.rx;
20+
21+
import java.util.*;
22+
import java.util.concurrent.atomic.AtomicReferenceArray;
23+
24+
abstract class BaseArrayQueue<E> extends AtomicReferenceArray<E> implements Queue<E> {
25+
/** */
26+
private static final long serialVersionUID = 5238363267841964068L;
27+
protected final int mask;
28+
public BaseArrayQueue(int capacity) {
29+
super(Pow2.roundToPowerOfTwo(capacity));
30+
this.mask = length() - 1;
31+
}
32+
@Override
33+
public Iterator<E> iterator() {
34+
throw new UnsupportedOperationException();
35+
}
36+
@Override
37+
public void clear() {
38+
// we have to test isEmpty because of the weaker poll() guarantee
39+
while (poll() != null || !isEmpty())
40+
;
41+
}
42+
protected final int calcElementOffset(long index, int mask) {
43+
return (int)index & mask;
44+
}
45+
protected final int calcElementOffset(long index) {
46+
return (int)index & mask;
47+
}
48+
protected final E lvElement(AtomicReferenceArray<E> buffer, int offset) {
49+
return buffer.get(offset);
50+
}
51+
protected final E lpElement(AtomicReferenceArray<E> buffer, int offset) {
52+
return buffer.get(offset); // no weaker form available
53+
}
54+
protected final E lpElement(int offset) {
55+
return get(offset); // no weaker form available
56+
}
57+
protected final void spElement(AtomicReferenceArray<E> buffer, int offset, E value) {
58+
buffer.lazySet(offset, value); // no weaker form available
59+
}
60+
protected final void spElement(int offset, E value) {
61+
lazySet(offset, value); // no weaker form available
62+
}
63+
protected final void soElement(AtomicReferenceArray<E> buffer, int offset, E value) {
64+
buffer.lazySet(offset, value);
65+
}
66+
protected final void soElement(int offset, E value) {
67+
lazySet(offset, value);
68+
}
69+
protected final void svElement(AtomicReferenceArray<E> buffer, int offset, E value) {
70+
buffer.set(offset, value);
71+
}
72+
protected final E lvElement(int offset) {
73+
return get(offset);
74+
}
75+
76+
@Override
77+
public boolean add(E e) {
78+
throw new UnsupportedOperationException();
79+
}
80+
81+
@Override
82+
public E remove() {
83+
throw new UnsupportedOperationException();
84+
}
85+
86+
@Override
87+
public E element() {
88+
throw new UnsupportedOperationException();
89+
}
90+
91+
@Override
92+
public boolean contains(Object o) {
93+
throw new UnsupportedOperationException();
94+
}
95+
96+
@Override
97+
public Object[] toArray() {
98+
throw new UnsupportedOperationException();
99+
}
100+
101+
@Override
102+
public <T> T[] toArray(T[] a) {
103+
throw new UnsupportedOperationException();
104+
}
105+
106+
@Override
107+
public boolean remove(Object o) {
108+
throw new UnsupportedOperationException();
109+
}
110+
111+
@Override
112+
public boolean containsAll(Collection<?> c) {
113+
throw new UnsupportedOperationException();
114+
}
115+
116+
@Override
117+
public boolean addAll(Collection<? extends E> c) {
118+
throw new UnsupportedOperationException();
119+
}
120+
121+
@Override
122+
public boolean removeAll(Collection<?> c) {
123+
throw new UnsupportedOperationException();
124+
}
125+
126+
@Override
127+
public boolean retainAll(Collection<?> c) {
128+
throw new UnsupportedOperationException();
129+
}
130+
}
131+
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
/*
15+
* The code was inspired by the similarly named JCTools class:
16+
* https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic
17+
*/
18+
19+
package io.reactivesocket.internal.rx;
20+
21+
import java.util.*;
22+
import java.util.concurrent.atomic.AtomicReference;
23+
24+
abstract class BaseLinkedQueue<E> extends AbstractQueue<E> {
25+
private final AtomicReference<LinkedQueueNode<E>> producerNode;
26+
private final AtomicReference<LinkedQueueNode<E>> consumerNode;
27+
public BaseLinkedQueue() {
28+
producerNode = new AtomicReference<>();
29+
consumerNode = new AtomicReference<>();
30+
}
31+
protected final LinkedQueueNode<E> lvProducerNode() {
32+
return producerNode.get();
33+
}
34+
protected final LinkedQueueNode<E> lpProducerNode() {
35+
return producerNode.get();
36+
}
37+
protected final void spProducerNode(LinkedQueueNode<E> node) {
38+
producerNode.lazySet(node);
39+
}
40+
protected final LinkedQueueNode<E> xchgProducerNode(LinkedQueueNode<E> node) {
41+
return producerNode.getAndSet(node);
42+
}
43+
protected final LinkedQueueNode<E> lvConsumerNode() {
44+
return consumerNode.get();
45+
}
46+
47+
protected final LinkedQueueNode<E> lpConsumerNode() {
48+
return consumerNode.get();
49+
}
50+
protected final void spConsumerNode(LinkedQueueNode<E> node) {
51+
consumerNode.lazySet(node);
52+
}
53+
@Override
54+
public final Iterator<E> iterator() {
55+
throw new UnsupportedOperationException();
56+
}
57+
58+
/**
59+
* {@inheritDoc} <br>
60+
* <p>
61+
* IMPLEMENTATION NOTES:<br>
62+
* This is an O(n) operation as we run through all the nodes and count them.<br>
63+
*
64+
* @see java.util.Queue#size()
65+
*/
66+
@Override
67+
public final int size() {
68+
LinkedQueueNode<E> chaserNode = lvConsumerNode();
69+
final LinkedQueueNode<E> producerNode = lvProducerNode();
70+
int size = 0;
71+
// must chase the nodes all the way to the producer node, but there's no need to chase a moving target.
72+
while (chaserNode != producerNode && size < Integer.MAX_VALUE) {
73+
LinkedQueueNode<E> next;
74+
while((next = chaserNode.lvNext()) == null);
75+
chaserNode = next;
76+
size++;
77+
}
78+
return size;
79+
}
80+
/**
81+
* {@inheritDoc} <br>
82+
* <p>
83+
* IMPLEMENTATION NOTES:<br>
84+
* Queue is empty when producerNode is the same as consumerNode. An alternative implementation would be to observe
85+
* the producerNode.value is null, which also means an empty queue because only the consumerNode.value is allowed to
86+
* be null.
87+
*
88+
* @see MessagePassingQueue#isEmpty()
89+
*/
90+
@Override
91+
public final boolean isEmpty() {
92+
return lvConsumerNode() == lvProducerNode();
93+
}
94+
}

0 commit comments

Comments
 (0)