@@ -97,9 +97,7 @@ object EmbeddedKafka extends EmbeddedKafkaSupport {
97
97
* Stops all in memory Zookeeper instances, preserving the logs directories.
98
98
*/
99
99
def stopZooKeeper (): Unit = {
100
- val factories = servers
101
- .filter(_.isInstanceOf [EmbeddedZ ])
102
- .asInstanceOf [Seq [EmbeddedZ ]]
100
+ val factories = servers.toFilteredSeq[EmbeddedZ ](isEmbeddedZ)
103
101
104
102
factories
105
103
.foreach(_.stop(false ))
@@ -111,9 +109,7 @@ object EmbeddedKafka extends EmbeddedKafkaSupport {
111
109
* Stops all in memory Kafka instances, preserving the logs directories.
112
110
*/
113
111
def stopKafka (): Unit = {
114
- val brokers = servers
115
- .filter(_.isInstanceOf [EmbeddedK ])
116
- .asInstanceOf [Seq [EmbeddedK ]]
112
+ val brokers = servers.toFilteredSeq[EmbeddedK ](isEmbeddedK)
117
113
118
114
brokers
119
115
.foreach(_.stop(false ))
@@ -122,13 +118,17 @@ object EmbeddedKafka extends EmbeddedKafkaSupport {
122
118
}
123
119
124
120
/**
125
- * Returns whether the in memory Kafka and Zookeeper are running.
121
+ * Returns whether the in memory Kafka and Zookeeper are both running.
126
122
*/
127
- def isRunning : Boolean =
128
- servers
129
- .filter(_.isInstanceOf [EmbeddedK ])
130
- .asInstanceOf [Seq [EmbeddedK ]]
131
- .exists(_.factory.isDefined)
123
+ def isRunning : Boolean = servers.toFilteredSeq[EmbeddedK ](isEmbeddedK).exists(_.factory.isDefined)
124
+
125
+ private def isEmbeddedK (server : EmbeddedServer ): Boolean = server.isInstanceOf [EmbeddedK ]
126
+ private def isEmbeddedZ (server : EmbeddedServer ): Boolean = server.isInstanceOf [EmbeddedZ ]
127
+
128
+ implicit class ServerOps (servers : Seq [EmbeddedServer ]) {
129
+ def toFilteredSeq [T <: EmbeddedServer ](filter : EmbeddedServer => Boolean ): Seq [T ] =
130
+ servers.filter(filter).asInstanceOf [Seq [T ]]
131
+ }
132
132
}
133
133
134
134
sealed trait EmbeddedKafkaSupport {
0 commit comments