@@ -5,30 +5,20 @@ import java.util.Properties
5
5
import java .util .concurrent .Executors
6
6
7
7
import kafka .admin .AdminUtils
8
- import kafka .server .KafkaConfig ._
9
8
import kafka .server .{KafkaConfig , KafkaServer }
10
9
import kafka .utils .ZkUtils
11
10
import org .apache .kafka .clients .consumer .{KafkaConsumer , OffsetAndMetadata }
12
- import org .apache .kafka .clients .producer .{
13
- KafkaProducer ,
14
- ProducerConfig ,
15
- ProducerRecord
16
- }
11
+ import org .apache .kafka .clients .producer .{KafkaProducer , ProducerConfig , ProducerRecord }
12
+ import org .apache .kafka .common .serialization .{Deserializer , Serializer , StringDeserializer , StringSerializer }
17
13
import org .apache .kafka .common .{KafkaException , TopicPartition }
18
- import org .apache .kafka .common .serialization .{
19
- Deserializer ,
20
- Serializer ,
21
- StringDeserializer ,
22
- StringSerializer
23
- }
24
14
import org .apache .zookeeper .server .{ServerCnxnFactory , ZooKeeperServer }
25
15
import org .scalatest .Suite
26
16
27
17
import scala .collection .JavaConverters ._
28
18
import scala .collection .mutable
29
19
import scala .collection .mutable .ListBuffer
30
20
import scala .concurrent .duration ._
31
- import scala .concurrent .{ExecutionContext , TimeoutException }
21
+ import scala .concurrent .{ExecutionContext , ExecutionContextExecutorService , TimeoutException }
32
22
import scala .language .{higherKinds , postfixOps }
33
23
import scala .reflect .io .Directory
34
24
import scala .util .Try
@@ -118,7 +108,7 @@ object EmbeddedKafka extends EmbeddedKafkaSupport {
118
108
119
109
sealed trait EmbeddedKafkaSupport {
120
110
private val executorService = Executors .newFixedThreadPool(2 )
121
- implicit private val executionContext =
111
+ implicit private val executionContext : ExecutionContextExecutorService =
122
112
ExecutionContext .fromExecutorService(executorService)
123
113
124
114
val zkSessionTimeoutMs = 10000
@@ -136,7 +126,7 @@ sealed trait EmbeddedKafkaSupport {
136
126
withRunningZooKeeper(config.zooKeeperPort) { zkPort =>
137
127
withTempDir(" kafka" ) { kafkaLogsDir =>
138
128
val broker =
139
- startKafka(config.copy(zooKeeperPort = zkPort) , kafkaLogsDir)
129
+ startKafka(config.kafkaPort, zkPort, config.customBrokerProperties , kafkaLogsDir)
140
130
try {
141
131
body
142
132
} finally {
@@ -162,11 +152,11 @@ sealed trait EmbeddedKafkaSupport {
162
152
withRunningZooKeeper(config.zooKeeperPort) { zkPort =>
163
153
withTempDir(" kafka" ) { kafkaLogsDir =>
164
154
val broker : KafkaServer =
165
- startKafka(config.copy(zooKeeperPort = zkPort) , kafkaLogsDir)
155
+ startKafka(config.kafkaPort, zkPort, config.customBrokerProperties , kafkaLogsDir)
166
156
val kafkaPort =
167
157
broker.boundPort(broker.config.listeners.head.listenerName)
168
158
val actualConfig =
169
- config.copy (kafkaPort = kafkaPort, zooKeeperPort = zkPort )
159
+ EmbeddedKafkaConfigImpl (kafkaPort, zkPort, config.customBrokerProperties, config.customProducerProperties, config.customConsumerProperties )
170
160
try {
171
161
body(actualConfig)
172
162
} finally {
@@ -556,10 +546,12 @@ sealed trait EmbeddedKafkaSupport {
556
546
factory
557
547
}
558
548
559
- def startKafka (config : EmbeddedKafkaConfig ,
560
- kafkaLogDir : Directory ): KafkaServer = {
561
- val zkAddress = s " localhost: ${config.zooKeeperPort}"
562
- val listener = s " PLAINTEXT://localhost: ${config.kafkaPort}"
549
+ private def startKafka (kafkaPort : Int ,
550
+ zooKeeperPort : Int ,
551
+ customBrokerProperties : Map [String , String ],
552
+ kafkaLogDir : Directory ) = {
553
+ val zkAddress = s " localhost: $zooKeeperPort"
554
+ val listener = s " PLAINTEXT://localhost: $kafkaPort"
563
555
564
556
val properties = new Properties
565
557
properties.setProperty(" zookeeper.connect" , zkAddress)
@@ -577,7 +569,7 @@ sealed trait EmbeddedKafkaSupport {
577
569
// The total memory used for log deduplication across all cleaner threads, keep it small to not exhaust suite memory
578
570
properties.setProperty(" log.cleaner.dedupe.buffer.size" , " 1048577" )
579
571
580
- config. customBrokerProperties.foreach {
572
+ customBrokerProperties.foreach {
581
573
case (key, value) => properties.setProperty(key, value)
582
574
}
583
575
@@ -586,6 +578,11 @@ sealed trait EmbeddedKafkaSupport {
586
578
broker
587
579
}
588
580
581
+ def startKafka (config : EmbeddedKafkaConfig ,
582
+ kafkaLogDir : Directory ): KafkaServer = {
583
+ startKafka(config.kafkaPort, config.zooKeeperPort, config.customBrokerProperties, kafkaLogDir)
584
+ }
585
+
589
586
/**
590
587
* Creates a topic with a custom configuration
591
588
*
0 commit comments