|
33 | 33 | */
|
34 | 34 | public final class OnSubscribeUsing<T, Resource> implements OnSubscribe<T> {
|
35 | 35 |
|
36 |
| - private final Func0<Resource> resourceFactory; |
37 |
| - private final Func1<? super Resource, ? extends Observable<? extends T>> observableFactory; |
38 |
| - private final Action1<? super Resource> dispose; |
39 |
| - private final boolean disposeEagerly; |
| 36 | + private final Func0<Resource> resourceFactory; |
| 37 | + private final Func1<? super Resource, ? extends Observable<? extends T>> observableFactory; |
| 38 | + private final Action1<? super Resource> dispose; |
| 39 | + private final boolean disposeEagerly; |
40 | 40 |
|
41 |
| - public OnSubscribeUsing( |
42 |
| - Func0<Resource> resourceFactory, |
43 |
| - Func1<? super Resource, ? extends Observable<? extends T>> observableFactory, |
44 |
| - Action1<? super Resource> dispose, boolean disposeEagerly) { |
45 |
| - this.resourceFactory = resourceFactory; |
46 |
| - this.observableFactory = observableFactory; |
47 |
| - this.dispose = dispose; |
48 |
| - this.disposeEagerly = disposeEagerly; |
49 |
| - } |
| 41 | + public OnSubscribeUsing(Func0<Resource> resourceFactory, |
| 42 | + Func1<? super Resource, ? extends Observable<? extends T>> observableFactory, |
| 43 | + Action1<? super Resource> dispose, boolean disposeEagerly) { |
| 44 | + this.resourceFactory = resourceFactory; |
| 45 | + this.observableFactory = observableFactory; |
| 46 | + this.dispose = dispose; |
| 47 | + this.disposeEagerly = disposeEagerly; |
| 48 | + } |
50 | 49 |
|
51 |
| - @Override |
52 |
| - public void call(Subscriber<? super T> subscriber) { |
| 50 | + @Override |
| 51 | + public void call(Subscriber<? super T> subscriber) { |
53 | 52 |
|
54 |
| - try { |
55 |
| - |
56 |
| - // create the resource |
57 |
| - final Resource resource = resourceFactory.call(); |
58 |
| - // create an action/subscription that disposes only once |
59 |
| - final DisposeAction<Resource> disposeOnceOnly = new DisposeAction<Resource>(dispose, resource); |
60 |
| - // dispose on unsubscription |
61 |
| - subscriber.add(disposeOnceOnly); |
62 |
| - // create the observable |
63 |
| - final Observable<? extends T> source = observableFactory |
64 |
| - // create the observable |
65 |
| - .call(resource); |
66 |
| - final Observable<? extends T> observable; |
67 |
| - // supplement with on termination disposal if requested |
68 |
| - if (disposeEagerly) |
69 |
| - observable = source |
70 |
| - // dispose on completion or error |
71 |
| - .doOnTerminate(disposeOnceOnly); |
72 |
| - else |
73 |
| - observable = source; |
74 |
| - try { |
75 |
| - // start |
76 |
| - observable.unsafeSubscribe(subscriber); |
77 |
| - } catch (Throwable e) { |
78 |
| - Throwable disposeError = disposeEagerlyIfRequested(disposeOnceOnly); |
79 |
| - if (disposeError != null) |
80 |
| - subscriber.onError(new CompositeException(Arrays.asList(e, |
81 |
| - disposeError))); |
82 |
| - else |
83 |
| - // propagate error |
84 |
| - subscriber.onError(e); |
85 |
| - } |
86 |
| - } catch (Throwable e) { |
87 |
| - // then propagate error |
88 |
| - subscriber.onError(e); |
89 |
| - } |
90 |
| - } |
| 53 | + try { |
91 | 54 |
|
92 |
| - private Throwable disposeEagerlyIfRequested(final Action0 disposeOnceOnly) { |
93 |
| - if (disposeEagerly) |
94 |
| - try { |
95 |
| - disposeOnceOnly.call(); |
96 |
| - return null; |
97 |
| - } catch (Throwable e) { |
98 |
| - return e; |
99 |
| - } |
100 |
| - else |
101 |
| - return null; |
102 |
| - } |
| 55 | + // create the resource |
| 56 | + final Resource resource = resourceFactory.call(); |
| 57 | + // create an action/subscription that disposes only once |
| 58 | + final DisposeAction<Resource> disposeOnceOnly = new DisposeAction<Resource>(dispose, |
| 59 | + resource); |
| 60 | + // dispose on unsubscription |
| 61 | + subscriber.add(disposeOnceOnly); |
| 62 | + // create the observable |
| 63 | + final Observable<? extends T> source = observableFactory |
| 64 | + // create the observable |
| 65 | + .call(resource); |
| 66 | + final Observable<? extends T> observable; |
| 67 | + // supplement with on termination disposal if requested |
| 68 | + if (disposeEagerly) |
| 69 | + observable = source |
| 70 | + // dispose on completion or error |
| 71 | + .doOnTerminate(disposeOnceOnly); |
| 72 | + else |
| 73 | + observable = source; |
| 74 | + try { |
| 75 | + // start |
| 76 | + observable.unsafeSubscribe(subscriber); |
| 77 | + } catch (Throwable e) { |
| 78 | + Throwable disposeError = disposeEagerlyIfRequested(disposeOnceOnly); |
| 79 | + if (disposeError != null) |
| 80 | + subscriber.onError(new CompositeException(Arrays.asList(e, disposeError))); |
| 81 | + else |
| 82 | + // propagate error |
| 83 | + subscriber.onError(e); |
| 84 | + } |
| 85 | + } catch (Throwable e) { |
| 86 | + // then propagate error |
| 87 | + subscriber.onError(e); |
| 88 | + } |
| 89 | + } |
103 | 90 |
|
104 |
| - private static final class DisposeAction<Resource> extends AtomicBoolean implements Action0, Subscription { |
| 91 | + private Throwable disposeEagerlyIfRequested(final Action0 disposeOnceOnly) { |
| 92 | + if (disposeEagerly) |
| 93 | + try { |
| 94 | + disposeOnceOnly.call(); |
| 95 | + return null; |
| 96 | + } catch (Throwable e) { |
| 97 | + return e; |
| 98 | + } |
| 99 | + else |
| 100 | + return null; |
| 101 | + } |
| 102 | + |
| 103 | + private static final class DisposeAction<Resource> extends AtomicBoolean implements Action0, |
| 104 | + Subscription { |
105 | 105 | private static final long serialVersionUID = 4262875056400218316L;
|
106 |
| - |
| 106 | + |
107 | 107 | private Action1<? super Resource> dispose;
|
108 |
| - private Resource resource; |
109 |
| - |
110 |
| - private DisposeAction(Action1<? super Resource> dispose, Resource resource) { |
111 |
| - this.dispose = dispose; |
112 |
| - this.resource = resource; |
113 |
| - lazySet(false); // StoreStore barrier |
114 |
| - } |
115 |
| - @Override |
116 |
| - public void call() { |
117 |
| - if (compareAndSet(false, true)) { |
118 |
| - try { |
119 |
| - dispose.call(resource); |
120 |
| - } finally { |
121 |
| - resource = null; |
122 |
| - dispose = null; |
123 |
| - } |
124 |
| - } |
125 |
| - } |
126 |
| - @Override |
127 |
| - public boolean isUnsubscribed() { |
128 |
| - return get(); |
129 |
| - } |
130 |
| - public void unsubscribe() { |
131 |
| - call(); |
132 |
| - } |
133 |
| - } |
| 108 | + private Resource resource; |
| 109 | + |
| 110 | + private DisposeAction(Action1<? super Resource> dispose, Resource resource) { |
| 111 | + this.dispose = dispose; |
| 112 | + this.resource = resource; |
| 113 | + lazySet(false); // StoreStore barrier |
| 114 | + } |
| 115 | + |
| 116 | + @Override |
| 117 | + public void call() { |
| 118 | + if (compareAndSet(false, true)) { |
| 119 | + try { |
| 120 | + dispose.call(resource); |
| 121 | + } finally { |
| 122 | + resource = null; |
| 123 | + dispose = null; |
| 124 | + } |
| 125 | + } |
| 126 | + } |
| 127 | + |
| 128 | + @Override |
| 129 | + public boolean isUnsubscribed() { |
| 130 | + return get(); |
| 131 | + } |
| 132 | + |
| 133 | + @Override |
| 134 | + public void unsubscribe() { |
| 135 | + call(); |
| 136 | + } |
| 137 | + } |
134 | 138 | }
|
0 commit comments