Skip to content

Commit c6da63f

Browse files
committed
Rewrite toMultimap
1 parent 06fee6e commit c6da63f

File tree

2 files changed

+31
-23
lines changed

2 files changed

+31
-23
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import java.io.IOException
1919
import java.util.concurrent.CountDownLatch
2020
import java.util.concurrent.TimeUnit
2121

22+
import scala.collection.mutable
2223
import scala.concurrent.duration.Duration
2324
import scala.concurrent.duration.DurationInt
2425
import scala.concurrent.duration.DurationLong
@@ -881,10 +882,10 @@ class RxScalaDemo extends JUnitSuite {
881882
}
882883

883884
@Test def toMultimapExample3(): Unit = {
884-
val o : Observable[String] = List("alice", "bob", "carol", "allen", "clarke").toObservable
885+
val o: Observable[String] = List("alice", "bob", "carol", "allen", "clarke").toObservable
885886
val keySelector = (s: String) => s.head
886887
val valueSelector = (s: String) => s.tail
887-
val mapFactory = () => Map('d' -> List("oug"))
888+
val mapFactory: () => mutable.Map[Char, mutable.Buffer[String]] = () => mutable.Map('d' -> mutable.ListBuffer("oug"))
888889
val m = o.toMultimap(keySelector, valueSelector, mapFactory)
889890
println(m.toBlocking.single.mapValues(_.toList))
890891
}
@@ -893,9 +894,9 @@ class RxScalaDemo extends JUnitSuite {
893894
val o : Observable[String] = List("alice", "bob", "carol", "allen", "clarke").toObservable
894895
val keySelector = (s: String) => s.head
895896
val valueSelector = (s: String) => s.tail
896-
val mapFactory = () => Map('d' -> List("oug"))
897-
val valueFactor = (k: Char) => List[String]()
898-
val m = o.toMultimap(keySelector, valueSelector, mapFactory, valueFactor)
897+
val mapFactory: () => mutable.Map[Char, mutable.Buffer[String]] = () => mutable.Map('d' -> mutable.ListBuffer("oug"))
898+
val valueFactory = (k: Char) => mutable.ListBuffer[String]()
899+
val m = o.toMultimap(keySelector, valueSelector, mapFactory, valueFactory)
899900
println(m.toBlocking.single)
900901
}
901902

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ trait Observable[+T]
102102
import scala.collection.JavaConverters._
103103
import scala.collection.Seq
104104
import scala.concurrent.duration.{Duration, TimeUnit, MILLISECONDS}
105+
import scala.collection.mutable
105106
import rx.functions._
106107
import rx.lang.scala.observables.BlockingObservable
107108
import ImplicitFunctionConversions._
@@ -3891,10 +3892,8 @@ trait Observable[+T]
38913892
* @return an Observable that emits a single item: a `Map` that contains an `Seq` of items mapped from
38923893
* the source Observable
38933894
*/
3894-
def toMultimap[K](keySelector: T => K): Observable[Map[K, Seq[T]]] = {
3895-
val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]]
3896-
val o: rx.Observable[java.util.Map[K, java.util.Collection[T]]] = thisJava.toMultimap[K](keySelector)
3897-
toScalaObservable[java.util.Map[K, java.util.Collection[T]]](o).map(m => m.toMap.mapValues(_.toSeq))
3895+
def toMultimap[K](keySelector: T => K): Observable[scala.collection.Map[K, Seq[T]]] = {
3896+
toMultimap(keySelector, k => k)
38983897
}
38993898

39003899
/**
@@ -3909,10 +3908,8 @@ trait Observable[+T]
39093908
* @return an Observable that emits a single item: a `Map` that contains an `Seq` of items mapped from
39103909
* the source Observable
39113910
*/
3912-
def toMultimap[K, V](keySelector: T => K, valueSelector: T => V): Observable[Map[K, Seq[V]]] = {
3913-
val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]]
3914-
val o: rx.Observable[java.util.Map[K, java.util.Collection[V]]] = thisJava.toMultimap[K, V](keySelector, valueSelector)
3915-
toScalaObservable[java.util.Map[K, java.util.Collection[V]]](o).map(m => m.toMap.mapValues(_.toSeq))
3911+
def toMultimap[K, V](keySelector: T => K, valueSelector: T => V): Observable[scala.collection.Map[K, Seq[V]]] = {
3912+
toMultimap(keySelector, valueSelector, () => mutable.Map[K, mutable.Buffer[V]]())
39163913
}
39173914

39183915
/**
@@ -3928,10 +3925,8 @@ trait Observable[+T]
39283925
* @return an Observable that emits a single item: a `Map` that contains a `Seq` items mapped from the source
39293926
* Observable
39303927
*/
3931-
def toMultimap[K, V](keySelector: T => K, valueSelector: T => V, mapFactory: () => Map[K, Seq[V]]): Observable[Map[K, Seq[V]]] = {
3932-
val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]]
3933-
val o: rx.Observable[java.util.Map[K, java.util.Collection[V]]] = thisJava.toMultimap[K, V](keySelector, valueSelector)
3934-
toScalaObservable[java.util.Map[K, java.util.Collection[V]]](o).map(m => mapFactory() ++ m.toMap.mapValues(_.toSeq))
3928+
def toMultimap[K, V](keySelector: T => K, valueSelector: T => V, mapFactory: () => mutable.Map[K, mutable.Buffer[V]]): Observable[scala.collection.Map[K, Seq[V]]] = {
3929+
toMultimap(keySelector, valueSelector, mapFactory, k => mutable.ListBuffer[V]())
39353930
}
39363931

39373932
/**
@@ -3948,12 +3943,24 @@ trait Observable[+T]
39483943
* @return an Observable that emits a single item: a `Map` that contains the `Seq` of mapped items from
39493944
* the source Observable
39503945
*/
3951-
def toMultimap[K, V](keySelector: T => K, valueSelector: T => V, mapFactory: () => Map[K, Seq[V]], collectionFactory: K => Seq[V]): Observable[Map[K, Seq[V]]] = {
3952-
val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]]
3953-
val o: rx.Observable[java.util.Map[K, java.util.Collection[V]]] = thisJava.toMultimap[K, V](keySelector, valueSelector)
3954-
toScalaObservable[java.util.Map[K, java.util.Collection[V]]](o).map {
3955-
m => mapFactory() ++ m.toMap.map {
3956-
case (k: K, v: java.util.Collection[V]) => (k, collectionFactory(k) ++ v)
3946+
def toMultimap[K, V](keySelector: T => K, valueSelector: T => V, mapFactory: () => mutable.Map[K, mutable.Buffer[V]], collectionFactory: K => mutable.Buffer[V]): Observable[scala.collection.Map[K, Seq[V]]] = {
3947+
lift {
3948+
(subscriber: Subscriber[scala.collection.Map[K, Seq[V]]]) => {
3949+
val map = mapFactory().withDefault(collectionFactory)
3950+
Subscriber[T](
3951+
subscriber,
3952+
(t: T) => {
3953+
val key = keySelector(t)
3954+
val value = map(key)
3955+
value += valueSelector(t)
3956+
map += key -> value: Unit
3957+
},
3958+
e => subscriber.onError(e),
3959+
() => {
3960+
subscriber.onNext(map)
3961+
subscriber.onCompleted()
3962+
}
3963+
)
39573964
}
39583965
}
39593966
}

0 commit comments

Comments
 (0)