Skip to content

Commit c3294b1

Browse files
committed
Major changes to implementation of BindingSubscriber, but how to dispose binding?
1 parent 4b3cf2d commit c3294b1

File tree

2 files changed

+76
-41
lines changed

2 files changed

+76
-41
lines changed

src/main/java/rx/subscribers/BindingSubscriber.java

Lines changed: 67 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,44 +16,40 @@
1616

1717
package rx.subscribers;
1818

19+
import com.sun.javafx.binding.ExpressionHelper;
20+
import javafx.beans.InvalidationListener;
1921
import javafx.beans.binding.Binding;
20-
import javafx.beans.value.ObservableValueBase;
22+
import javafx.beans.value.ChangeListener;
23+
import javafx.beans.value.ObservableValue;
2124
import javafx.collections.ObservableList;
22-
import rx.Observable;
2325
import rx.Subscriber;
26+
import rx.Subscription;
2427
import rx.functions.Action1;
2528

26-
import java.util.Optional;
2729

30+
public final class BindingSubscriber<T> extends Subscriber<T> implements ObservableValue<T>, Binding<T>, Subscription {
2831

29-
public final class BindingSubscriber<T> extends ObservableValueBase<T> implements Binding<T> {
30-
31-
private final Subscriber<T> subscriber;
32-
32+
private final Action1<Throwable> onError;
33+
private ExpressionHelper<T> helper;
3334
private T value;
3435

35-
BindingSubscriber(Observable<T> observable, Optional<T> initialValue, final Action1<Throwable> onError) {
36-
37-
this.subscriber = new Subscriber<T>() {
38-
@Override
39-
public void onCompleted() {
40-
//do nothing
41-
}
42-
43-
@Override
44-
public void onError(Throwable e) {
45-
onError.call(e);
46-
}
47-
48-
@Override
49-
public void onNext(T item) {
50-
value = item;
51-
fireValueChangedEvent();
52-
}
53-
};
36+
BindingSubscriber(final Action1<Throwable> onError) {
37+
this.onError = onError;
38+
}
39+
@Override
40+
public void onCompleted() {
41+
//do nothing
42+
}
5443

55-
observable.subscribe(subscriber);
44+
@Override
45+
public void onError(Throwable e) {
46+
onError.call(e);
47+
}
5648

49+
@Override
50+
public void onNext(T t) {
51+
value = t;
52+
fireValueChangedEvent();
5753
}
5854
@Override
5955
public T getValue() {
@@ -76,6 +72,49 @@ public ObservableList<?> getDependencies() {
7672

7773
@Override
7874
public void dispose() {
79-
subscriber.unsubscribe();
75+
this.unsubscribe();
76+
}
77+
78+
/**
79+
* {@inheritDoc}
80+
*/
81+
@Override
82+
public void addListener(InvalidationListener listener) {
83+
helper = ExpressionHelper.addListener(helper, this, listener);
84+
}
85+
86+
/**
87+
* {@inheritDoc}
88+
*/
89+
@Override
90+
public void addListener(ChangeListener<? super T> listener) {
91+
helper = ExpressionHelper.addListener(helper, this, listener);
92+
}
93+
94+
/**
95+
* {@inheritDoc}
96+
*/
97+
@Override
98+
public void removeListener(InvalidationListener listener) {
99+
helper = ExpressionHelper.removeListener(helper, listener);
100+
}
101+
102+
/**
103+
* {@inheritDoc}
104+
*/
105+
@Override
106+
public void removeListener(ChangeListener<? super T> listener) {
107+
helper = ExpressionHelper.removeListener(helper, listener);
108+
}
109+
110+
/**
111+
* Notify the currently registered observers of a value change.
112+
*
113+
* This implementation will ignore all adds and removes of observers that
114+
* are done while a notification is processed. The changes take effect in
115+
* the following call to fireValueChangedEvent.
116+
*/
117+
protected void fireValueChangedEvent() {
118+
ExpressionHelper.fireValueChangedEvent(helper);
80119
}
81120
}

src/main/java/rx/subscribers/JavaFxSubscriber.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,20 @@
1616

1717
package rx.subscribers;
1818

19-
import rx.Observable;
19+
import javafx.beans.property.Property;
2020
import rx.functions.Action1;
2121

22-
import java.util.Optional;
23-
2422
public enum JavaFxSubscriber {
2523
;//no instances
2624

27-
public static <T> BindingSubscriber<T> toBinding(Observable<T> observable) {
28-
return new BindingSubscriber<T>(observable, Optional.empty(), e -> {});
29-
}
30-
public static <T> BindingSubscriber<T> toBinding(Observable<T> observable,Action1<Throwable> onErrorAction ) {
31-
return new BindingSubscriber<T>(observable, Optional.empty(), onErrorAction);
32-
}
33-
public static <T> BindingSubscriber<T> toBinding(Observable<T> observable, T initialValue, Action1<Throwable> onErrorAction ) {
34-
return new BindingSubscriber<T>(observable, Optional.of(initialValue), onErrorAction);
25+
public static <T> BindingSubscriber<T> toBinding(Property<? super T> t) {
26+
BindingSubscriber<T> bindingSubscriber = new BindingSubscriber<>(e -> {});
27+
t.bind(bindingSubscriber);
28+
return bindingSubscriber;
3529
}
36-
public static <T> BindingSubscriber<T> toBinding(Observable<T> observable, T initialValue) {
37-
return new BindingSubscriber<T>(observable, Optional.of(initialValue), e -> {});
30+
public static <T> BindingSubscriber<T> toBinding(Property<? super T> t, Action1<Throwable> onErrorAction ) {
31+
BindingSubscriber<T> bindingSubscriber = new BindingSubscriber<>(onErrorAction);
32+
t.bind(bindingSubscriber);
33+
return bindingSubscriber;
3834
}
3935
}

0 commit comments

Comments
 (0)