Skip to content

Commit 890efc8

Browse files
authored
feat(MPM-329): add subscription param (#59)
* feat(MPM-329): add subscription param * add test, remove from low level (legacy) * fix cs
1 parent c93d278 commit 890efc8

File tree

5 files changed

+36
-24
lines changed

5 files changed

+36
-24
lines changed

src/Conf/KafkaConfiguration.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class KafkaConfiguration extends RdKafkaConf
4141
* @param mixed[] $config
4242
* @param string $type
4343
*/
44-
public function __construct(array $brokers, array $topicSubscriptions, array $config = [], string $type = '')
44+
public function __construct(array $brokers, array $topicSubscriptions = [], array $config = [], string $type = '')
4545
{
4646
parent::__construct();
4747

src/Consumer/KafkaConsumerBuilder.php

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -292,10 +292,6 @@ public function build(): KafkaConsumerInterface
292292
throw new KafkaConsumerBuilderException(KafkaConsumerBuilderException::NO_BROKER_EXCEPTION_MESSAGE);
293293
}
294294

295-
if ([] === $this->topics) {
296-
throw new KafkaConsumerBuilderException(KafkaConsumerBuilderException::NO_TOPICS_EXCEPTION_MESSAGE);
297-
}
298-
299295
//set additional config
300296
$this->config['group.id'] = $this->consumerGroup;
301297

src/Consumer/KafkaHighLevelConsumer.php

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
use Jobcloud\Kafka\Message\KafkaConsumerMessageInterface;
1414
use RdKafka\Exception as RdKafkaException;
1515
use RdKafka\Message as RdKafkaMessage;
16+
use RdKafka\TopicPartition;
1617
use RdKafka\TopicPartition as RdKafkaTopicPartition;
1718
use RdKafka\KafkaConsumer as RdKafkaHighLevelConsumer;
1819

@@ -41,13 +42,14 @@ public function __construct(
4142
* Subscribes to all defined topics, if no partitions were set, subscribes to all partitions.
4243
* If partition(s) (and optionally offset(s)) were set, subscribes accordingly
4344
*
45+
* @param array<TopicSubscription> $topicSubscriptions
4446
* @throws KafkaConsumerSubscriptionException
4547
* @return void
4648
*/
47-
public function subscribe(): void
49+
public function subscribe(array $topicSubscriptions = []): void
4850
{
49-
$subscriptions = $this->getTopicSubscriptions();
50-
$assignments = $this->getTopicAssignments();
51+
$subscriptions = $this->getTopicSubscriptions($topicSubscriptions);
52+
$assignments = $this->getTopicAssignments($topicSubscriptions);
5153

5254
if ([] !== $subscriptions && [] !== $assignments) {
5355
throw new KafkaConsumerSubscriptionException(
@@ -239,13 +241,18 @@ private function getOffsetsToCommitForMessages(array $messages): array
239241
}
240242

241243
/**
244+
* @param array<TopicSubscription> $topicSubscriptions
242245
* @return array|string[]
243246
*/
244-
private function getTopicSubscriptions(): array
247+
private function getTopicSubscriptions(array $topicSubscriptions = []): array
245248
{
246249
$subscriptions = [];
247250

248-
foreach ($this->kafkaConfiguration->getTopicSubscriptions() as $topicSubscription) {
251+
if ([] === $topicSubscriptions) {
252+
$topicSubscriptions = $this->kafkaConfiguration->getTopicSubscriptions();
253+
}
254+
255+
foreach ($topicSubscriptions as $topicSubscription) {
249256
if (
250257
[] !== $topicSubscription->getPartitions()
251258
|| KafkaConsumerBuilderInterface::OFFSET_STORED !== $topicSubscription->getOffset()
@@ -259,13 +266,18 @@ private function getTopicSubscriptions(): array
259266
}
260267

261268
/**
269+
* @param array<TopicSubscription> $topicSubscriptions
262270
* @return array|RdKafkaTopicPartition[]
263271
*/
264-
private function getTopicAssignments(): array
272+
private function getTopicAssignments(array $topicSubscriptions = []): array
265273
{
266274
$assignments = [];
267275

268-
foreach ($this->kafkaConfiguration->getTopicSubscriptions() as $topicSubscription) {
276+
if ([] === $topicSubscriptions) {
277+
$topicSubscriptions = $this->kafkaConfiguration->getTopicSubscriptions();
278+
}
279+
280+
foreach ($topicSubscriptions as $topicSubscription) {
269281
if (
270282
[] === $topicSubscription->getPartitions()
271283
&& KafkaConsumerBuilderInterface::OFFSET_STORED === $topicSubscription->getOffset()

tests/Unit/Consumer/KafkaConsumerBuilderTest.php

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -288,18 +288,6 @@ public function testBuildFailMissingBrokers(): void
288288
$this->kafkaConsumerBuilder->build();
289289
}
290290

291-
/**
292-
* @return void
293-
* @throws KafkaConsumerBuilderException
294-
*/
295-
public function testBuildFailMissingTopics(): void
296-
{
297-
self::expectException(KafkaConsumerBuilderException::class);
298-
self::expectExceptionMessage(KafkaConsumerBuilderException::NO_TOPICS_EXCEPTION_MESSAGE);
299-
300-
$this->kafkaConsumerBuilder->withAdditionalBroker('localhost')->build();
301-
}
302-
303291
/**
304292
* @return void
305293
*/

tests/Unit/Consumer/KafkaHighLevelConsumerTest.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,22 @@ public function testSubscribeSuccess(): void
4747
$kafkaConsumer->subscribe();
4848
}
4949

50+
/**
51+
* @throws KafkaConsumerSubscriptionException
52+
*/
53+
public function testSubscribeSuccessWithParam(): void
54+
{
55+
$rdKafkaConsumerMock = $this->createMock(RdKafkaHighLevelConsumer::class);
56+
$kafkaConfigurationMock = $this->createMock(KafkaConfiguration::class);
57+
$kafkaConfigurationMock->expects(self::never())->method('getTopicSubscriptions');
58+
$decoderMock = $this->getMockForAbstractClass(DecoderInterface::class);
59+
$kafkaConsumer = new KafkaHighLevelConsumer($rdKafkaConsumerMock, $kafkaConfigurationMock, $decoderMock);
60+
61+
$rdKafkaConsumerMock->expects(self::once())->method('subscribe')->with(['testTopic3']);
62+
63+
$kafkaConsumer->subscribe([new TopicSubscription('testTopic3')]);
64+
}
65+
5066
/**
5167
* @throws KafkaConsumerSubscriptionException
5268
*/

0 commit comments

Comments
 (0)