File tree Expand file tree Collapse file tree 2 files changed +30
-0
lines changed
language-adaptors/rxjava-clojure/src
main/clojure/rx/lang/clojure
test/clojure/rx/lang/clojure Expand file tree Collapse file tree 2 files changed +30
-0
lines changed Original file line number Diff line number Diff line change 17
17
(let [f-name (gensym " rc" )]
18
18
`(let [~f-name ~f]
19
19
(reify
20
+ ; If they want Func1, give them onSubscribe as well so Observable/create can be
21
+ ; used seemlessly with rx/fn.
22
+ ~@(if (and (= prefix " rx.util.functions.Func" )
23
+ (some #{1 } arities))
24
+ `(rx.Observable$OnSubscribeFunc
25
+ (~'onSubscribe [~'this observer#]
26
+ (~f-name observer#))))
27
+
20
28
~@(mapcat (clojure.core/fn [n]
21
29
(let [ifc-sym (symbol (str prefix n))
22
30
arg-syms (map #(symbol (str " v" %)) (range n))]
31
39
32
40
If the f has the wrong arity, an ArityException will be thrown at runtime.
33
41
42
+ This will also implement rx.Observable$OnSubscribeFunc.onSubscribe for use with
43
+ Observable/create. In this case, the function must take an Observable as its single
44
+ argument and return a subscription object.
45
+
34
46
Example:
35
47
36
48
(.reduce my-numbers (rx/fn* +))
64
76
65
77
(.map my-observable (rx/fn [a] (* 2 a)))
66
78
79
+ or, to create an Observable:
80
+
81
+ (Observable/create (rx/fn [observer]
82
+ (.onNext observer 10)
83
+ (.onCompleted observer)
84
+ (Subscriptions/empty)))
85
+
86
+ See:
87
+ rx.lang.clojure.interop/fn*
67
88
"
68
89
[& fn-form]
69
90
; preserve metadata so type hints work
Original file line number Diff line number Diff line change 8
8
(deftest test-fn*
9
9
(testing " implements Func0-9"
10
10
(let [f (rx/fn* vector)]
11
+ (is (instance? rx.Observable$OnSubscribeFunc f))
11
12
(is (instance? rx.util.functions.Func0 f))
12
13
(is (instance? rx.util.functions.Func1 f))
13
14
(is (instance? rx.util.functions.Func2 f))
113
114
114
115
(deftest test-basic-usage
115
116
117
+ (testing " can create an observable"
118
+ (is (= 99
119
+ (-> (Observable/create (rx/fn [^rx.Observer o]
120
+ (.onNext o 99 )
121
+ (.onCompleted o)
122
+ (rx.subscriptions.Subscriptions/empty )))
123
+ (BlockingObservable/single )))))
124
+
116
125
(testing " can pass rx/fn to map and friends"
117
126
(is (= (+ 1 4 9 )
118
127
(-> (Observable/from [1 2 3 ])
You can’t perform that action at this time.
0 commit comments