@@ -97,9 +97,7 @@ object EmbeddedKafka extends EmbeddedKafkaSupport {
97
97
* Stops all in memory Zookeeper instances, preserving the logs directories.
98
98
*/
99
99
def stopZooKeeper (): Unit = {
100
- val factories = servers
101
- .filter(_.isInstanceOf [EmbeddedZ ])
102
- .asInstanceOf [Seq [EmbeddedZ ]]
100
+ val factories : Seq [EmbeddedZ ] = servers.toFilteredSeq(isEmbeddedZ)
103
101
104
102
factories
105
103
.foreach(_.stop(false ))
@@ -111,9 +109,7 @@ object EmbeddedKafka extends EmbeddedKafkaSupport {
111
109
* Stops all in memory Kafka instances, preserving the logs directories.
112
110
*/
113
111
def stopKafka (): Unit = {
114
- val brokers = servers
115
- .filter(_.isInstanceOf [EmbeddedK ])
116
- .asInstanceOf [Seq [EmbeddedK ]]
112
+ val brokers : Seq [EmbeddedK ] = servers.toFilteredSeq(isEmbeddedK)
117
113
118
114
brokers
119
115
.foreach(_.stop(false ))
@@ -123,12 +119,21 @@ object EmbeddedKafka extends EmbeddedKafkaSupport {
123
119
124
120
/**
125
121
* Returns whether the in memory Kafka and Zookeeper are running.
122
+ *
123
+ * `brokers` val is required for type information.
126
124
*/
127
- def isRunning : Boolean =
128
- servers
129
- .filter(_.isInstanceOf [EmbeddedK ])
130
- .asInstanceOf [Seq [EmbeddedK ]]
131
- .exists(_.factory.isDefined)
125
+ def isRunning : Boolean = {
126
+ val brokers : Seq [EmbeddedK ] = servers.toFilteredSeq(isEmbeddedK)
127
+ brokers.exists(_.factory.isDefined)
128
+ }
129
+
130
+ private def isEmbeddedK (server : EmbeddedServer ): Boolean = server.isInstanceOf [EmbeddedK ]
131
+ private def isEmbeddedZ (server : EmbeddedServer ): Boolean = server.isInstanceOf [EmbeddedZ ]
132
+
133
+ implicit class ServerOps [T <: EmbeddedServer ](servers : Seq [EmbeddedServer ]) {
134
+ def toFilteredSeq (implicit filter : EmbeddedServer => Boolean ): Seq [T ] =
135
+ servers.filter(filter).asInstanceOf [Seq [T ]]
136
+ }
132
137
}
133
138
134
139
sealed trait EmbeddedKafkaSupport {
0 commit comments