Skip to content

Commit fc21940

Browse files
committed
Fix RabbitMqMessagePublisherShould test.
* Purge the queue before performing the test in order to avoid consuming previously published events such as the ones coming from the acceptance tests. * Since the `MessagePublisher` publishes out the messages asynchronously, we were validating that the queue was empty even before publishing the message, so we were not really waiting for the message to be consumed before asserting that the consumed message was the expected one. Now we wait until the queue has messages before starting to consume. * The previous assertion was running in another thread and, since the default exception handler used by the RabbitMQ library is an implementation which catches the exceptions and traduce them into a log error (rabbitmq/rabbitmq-java-client#74), the test was never failing (only logging errors). Now we inject from the test to the messages consumer a handler which extracts the event into a synchronized `mutable.Buffer` in order to perform the assertions proving that the consumed messages are the ones we're really expecting. Since that assertion is performed outside the consumer, the test will fail in case the expected messages doesn't match the consumed ones.
1 parent fb31205 commit fc21940

File tree

7 files changed

+57
-22
lines changed

7 files changed

+57
-22
lines changed

src/test/tv/codely/scala_http_api/module/IntegrationTestCase.scala

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,21 @@ import com.typesafe.config.ConfigFactory
44
import tv.codely.scala_http_api.module.shared.domain.MessagePublisher
55
import tv.codely.scala_http_api.module.shared.infrastructure.config.{DbConfig, MessageBrokerConfig}
66
import tv.codely.scala_http_api.module.shared.infrastructure.dependency_injection.SharedModuleDependencyContainer
7-
import tv.codely.scala_http_api.module.shared.infrastructure.message_broker.rabbitmq.{MessageConsumer, RabbitMqMessageConsumer}
87
import tv.codely.scala_http_api.module.shared.infrastructure.persistence.doobie.DoobieDbConnection
98

109
import scala.concurrent.ExecutionContext
1110

1211
protected[scala_http_api] trait IntegrationTestCase extends UnitTestCase {
1312
private val actorSystemName = "scala-http-api-integration-test"
1413

15-
private val appConfig = ConfigFactory.load("application")
16-
private val dbConfig = DbConfig(appConfig.getConfig("database"))
17-
private val publisherConfig = MessageBrokerConfig(appConfig.getConfig("message-publisher"))
14+
private val appConfig = ConfigFactory.load("application")
15+
private val dbConfig = DbConfig(appConfig.getConfig("database"))
16+
protected val publisherConfig = MessageBrokerConfig(appConfig.getConfig("message-publisher"))
1817

1918
private val sharedDependencies = new SharedModuleDependencyContainer(actorSystemName, dbConfig, publisherConfig)
2019

2120
implicit protected val executionContext: ExecutionContext = sharedDependencies.executionContext
2221

2322
protected val doobieDbConnection: DoobieDbConnection = sharedDependencies.doobieDbConnection
2423
protected val messagePublisher: MessagePublisher = sharedDependencies.messagePublisher
25-
26-
protected val videoCreatedQueueConsumer: MessageConsumer =
27-
new RabbitMqMessageConsumer(publisherConfig)("codelytv_scala_api.video_created")
2824
}

src/test/tv/codely/scala_http_api/module/shared/infrastructure/message_broker/rabbitmq/MessageConsumer.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@ import tv.codely.scala_http_api.module.shared.domain.Message
44

55
trait MessageConsumer {
66
def startConsuming(handler: Message => Boolean): Unit
7-
def hasMessages: Boolean
8-
def isEmpty: Boolean = !hasMessages
7+
def hasMessagesToConsume: Boolean
8+
def isEmpty: Boolean = !hasMessagesToConsume
99
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package tv.codely.scala_http_api.module.shared.infrastructure.message_broker.rabbitmq
2+
3+
trait MessagePurger {
4+
def purgeQueue(): Unit
5+
}

src/test/tv/codely/scala_http_api/module/shared/infrastructure/message_broker/rabbitmq/RabbitMqMessageConsumer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,5 @@ final class RabbitMqMessageConsumer(brokerConfig: MessageBrokerConfig)(queueName
3838
channel.basicConsume(queueName, autoAckAfterConsume, consumer).map(_ => ())
3939
}
4040

41-
override def hasMessages: Boolean = channel.messageCount(queueName) > 0
41+
override def hasMessagesToConsume: Boolean = channel.messageCount(queueName) > 0
4242
}

src/test/tv/codely/scala_http_api/module/shared/infrastructure/message_broker/rabbitmq/RabbitMqMessagePublisherShould.scala

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,31 +5,49 @@ import tv.codely.scala_http_api.module.IntegrationTestCase
55
import tv.codely.scala_http_api.module.shared.domain.Message
66
import tv.codely.scala_http_api.module.video.domain.VideoCreatedStub
77

8+
import scala.collection.mutable
89
import scala.concurrent.duration._
910

1011
final class RabbitMqMessagePublisherShould extends IntegrationTestCase with Eventually {
1112
override implicit val patienceConfig: PatienceConfig = PatienceConfig(timeout = 1.second, interval = 50.millis)
1213

13-
// @ToDo: Fix this test. Since the `MessagePublisher` publishes out the event asynchronously,
14-
// we validate that the queue is empty even before publishing the event.
15-
// Furthermore, the `assertConsumes` assertion could contain an invalid actual result that it will not fail.
14+
private val queueName = "codelytv_scala_api.video_created"
15+
private val videoCreatedQueuePurger: MessagePurger = new RabbitMqMessagePurger(publisherConfig)(queueName)
16+
private val videoCreatedQueueConsumer: MessageConsumer = new RabbitMqMessageConsumer(publisherConfig)(queueName)
17+
18+
private val consumedMessages: mutable.Buffer[Message] = mutable.Buffer.empty
19+
1620
"publish VideoCreated domain events" in {
17-
val videoCreated = VideoCreatedStub.random
21+
videoCreatedQueuePurger.purgeQueue()
22+
waitUntilQueueIsEmpty()
1823

24+
val videoCreated = VideoCreatedStub.random
1925
messagePublisher.publish(videoCreated)
26+
waitUntilQueueHasMessages()
2027

21-
videoCreatedQueueConsumer.startConsuming(handler = assertConsumes(videoCreated))
22-
28+
videoCreatedQueueConsumer.startConsuming(extractConsumedMessagesHandler)
2329
waitUntilQueueIsEmpty()
30+
31+
consumedMessages.synchronized(consumedMessages shouldBe Seq(videoCreated))
2432
}
2533

26-
private def assertConsumes(expectedMessage: Message)(consumedMessage: Message): Boolean = {
27-
consumedMessage shouldBe expectedMessage
28-
true
34+
private def extractConsumedMessagesHandler(consumedMessage: Message): Boolean = {
35+
consumedMessages.synchronized(consumedMessages += consumedMessage)
36+
val handledSuccessfully = true
37+
handledSuccessfully
2938
}
3039

40+
private def waitUntilQueueHasMessages(): Unit = eventually(
41+
if (videoCreatedQueueConsumer.hasMessagesToConsume) ()
42+
else throw new RuntimeException("Queue has no messages. Waiting a little bit more…")
43+
)
44+
3145
private def waitUntilQueueIsEmpty(): Unit = eventually(
32-
if (videoCreatedQueueConsumer.isEmpty) ()
33-
else throw new RuntimeException("Queue is not empty. Waiting a little bit more…")
46+
if (videoCreatedQueueConsumer.isEmpty) {
47+
// If the RabbitMQ queue doesn't has any message, it doesn't mean we're not processing the last ones.
48+
// Wait a little in order to let consuming and acknowledge these messages.
49+
// More info under the `message-count` domain concept: https://www.rabbitmq.com/amqp-0-9-1-reference.html#domains
50+
Thread.sleep(50)
51+
} else throw new RuntimeException("Queue is not empty. Waiting a little bit more…")
3452
)
3553
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package tv.codely.scala_http_api.module.shared.infrastructure.message_broker.rabbitmq
2+
3+
import com.newmotion.akka.rabbitmq.ConnectionFactory
4+
import tv.codely.scala_http_api.module.shared.infrastructure.config.MessageBrokerConfig
5+
6+
final class RabbitMqMessagePurger(brokerConfig: MessageBrokerConfig)(queueName: String) extends MessagePurger {
7+
private val factory = new ConnectionFactory()
8+
factory.setHost(brokerConfig.host)
9+
factory.setPort(brokerConfig.port)
10+
factory.setUsername(brokerConfig.user)
11+
factory.setPassword(brokerConfig.password)
12+
private val connection = factory.newConnection()
13+
private val channel = connection.createChannel()
14+
15+
override def purgeQueue(): Unit = channel.queuePurge(queueName)
16+
}

src/test/tv/codely/scala_http_api/module/user/application/register/UserRegistererShould.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ final class UserRegistererShould extends UserUnitTestCase with MessagePublisherM
88
private val registerer = new UserRegisterer(repository, messagePublisher)
99

1010
"register a user" in {
11-
val user = UserStub.random
11+
val user = UserStub.random
1212
val userRegistered = UserRegisteredStub(user)
1313

1414
repositoryShouldSave(user)

0 commit comments

Comments
 (0)