@@ -64,30 +64,17 @@ class EmbeddedKafkaObjectSpec extends EmbeddedKafkaSpecSupport {
64
64
val someOtherBroker = EmbeddedKafka .start()(someOtherConfig)
65
65
66
66
val topic = " publish_test_topic_1"
67
- val someMessage = " hello world!"
68
67
val someOtherMessage = " another message!"
69
68
70
69
val serializer = new StringSerializer
71
70
val deserializer = new StringDeserializer
72
71
73
- publishToKafka(topic, someMessage )(someConfig, serializer)
72
+ publishToKafka(topic, " hello world! " )(someConfig, serializer)
74
73
publishToKafka(topic, someOtherMessage)(someOtherConfig, serializer)
75
74
76
- // first
77
-
78
- val consumer = kafkaConsumer(someConfig, deserializer, deserializer)
79
- consumer.subscribe(List (topic).asJava)
80
-
81
- val records = consumer.poll(consumerPollTimeout)
82
- records.count shouldBe 1
83
-
84
- val record = records.iterator().next
85
- record.value shouldBe someMessage
86
-
75
+ kafkaIsAvailable(someConfig.kafkaPort)
87
76
EmbeddedKafka .stop(someBroker)
88
77
89
- // second
90
-
91
78
val anotherConsumer = kafkaConsumer(someOtherConfig, deserializer, deserializer)
92
79
anotherConsumer.subscribe(List (topic).asJava)
93
80
0 commit comments