Skip to content

Commit f8f9883

Browse files
committed
Getting rid of silly double queue
1 parent ccabf32 commit f8f9883

File tree

6 files changed

+44
-95
lines changed

6 files changed

+44
-95
lines changed

src/Commands/VaporHandle.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,10 @@ protected function handleWebsocketEvent(array $event, Context $context): void
6767
*/
6868
protected function message()
6969
{
70-
return tap(json_decode(base64_decode($this->argument('message')), true), function ($message) {
70+
/** @var string $message */
71+
$message = $this->argument('message');
72+
73+
return tap(json_decode(base64_decode($message), true), function ($message) {
7174
if ($message === false) {
7275
throw new InvalidArgumentException('Unable to unserialize message.');
7376
}

src/Driver.php

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
namespace Georgeboot\LaravelEchoApiGateway;
44

5-
use Georgeboot\LaravelEchoApiGateway\Jobs\QueueMessageToChannels;
5+
use Aws\ApiGatewayManagementApi\Exception\ApiGatewayManagementApiException;
66
use Illuminate\Broadcasting\Broadcasters\Broadcaster;
77
use Illuminate\Broadcasting\Broadcasters\UsePusherChannelConventions;
88
use Illuminate\Support\Arr;
@@ -13,6 +13,15 @@ class Driver extends Broadcaster
1313
{
1414
use UsePusherChannelConventions;
1515

16+
protected SubscriptionRepository $subscriptionRepository;
17+
protected ConnectionRepository $connectionRepository;
18+
19+
public function __construct(SubscriptionRepository $subscriptionRepository, ConnectionRepository $connectionRepository)
20+
{
21+
$this->subscriptionRepository = $subscriptionRepository;
22+
$this->connectionRepository = $connectionRepository;
23+
}
24+
1625
/**
1726
* Authenticate the incoming request for a given channel.
1827
*
@@ -108,11 +117,11 @@ public function broadcast(array $channels, $event, array $payload = [])
108117
'data' => $payload,
109118
], JSON_THROW_ON_ERROR);
110119

111-
dispatch(new QueueMessageToChannels(
112-
$channels,
113-
$data,
114-
Arr::pull($payload, 'socket')
115-
));
120+
$skipConnectionId = Arr::pull($payload, 'socket');
121+
122+
$this->subscriptionRepository->getConnectionIdsForChannels(...$channels)
123+
->reject(fn($connectionId) => $connectionId === $skipConnectionId)
124+
->each(fn(string $connectionId) => $this->sendMessage($connectionId, $data));
116125

117126
return;
118127

@@ -127,4 +136,26 @@ public function broadcast(array $channels, $event, array $payload = [])
127136
// : 'Failed to connect to Pusher.'
128137
// );
129138
}
139+
140+
protected function sendMessage(string $connectionId, string $data): void
141+
{
142+
try {
143+
$this->connectionRepository->sendMessage($connectionId, $data);
144+
} catch (ApiGatewayManagementApiException $exception) {
145+
if ($exception->getAwsErrorCode() === 'GoneException') {
146+
$this->subscriptionRepository->clearConnection($connectionId);
147+
return;
148+
}
149+
150+
throw $exception;
151+
152+
// $exception->getErrorCode() is one of:
153+
// GoneException
154+
// LimitExceededException
155+
// PayloadTooLargeException
156+
// ForbiddenException
157+
158+
// otherwise: call $exception->getPrevious() which is a guzzle exception
159+
}
160+
}
130161
}

src/Handler.php

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,16 @@
66
use Bref\Event\ApiGateway\WebsocketEvent;
77
use Bref\Event\ApiGateway\WebsocketHandler;
88
use Bref\Event\Http\HttpResponse;
9-
use Illuminate\Contracts\Debug\ExceptionHandler;
109
use Illuminate\Support\Str;
1110
use Throwable;
1211

1312
class Handler extends WebsocketHandler
1413
{
15-
protected ExceptionHandler $exceptionHandler;
1614
protected SubscriptionRepository $subscriptionRepository;
1715
protected ConnectionRepository $connectionRepository;
1816

19-
public function __construct(ExceptionHandler $exceptionHandler, SubscriptionRepository $subscriptionRepository, ConnectionRepository $connectionRepository)
17+
public function __construct(SubscriptionRepository $subscriptionRepository, ConnectionRepository $connectionRepository)
2018
{
21-
$this->exceptionHandler = $exceptionHandler;
2219
$this->subscriptionRepository = $subscriptionRepository;
2320
$this->connectionRepository = $connectionRepository;
2421
}
@@ -41,8 +38,7 @@ public function handleWebsocket(WebsocketEvent $event, Context $context): HttpRe
4138
app('sentry')->captureException($throwable);
4239
}
4340

44-
45-
$this->exceptionHandler->report($throwable);
41+
report($throwable);
4642

4743
throw $throwable;
4844
}

src/Jobs/QueueMessageToChannels.php

Lines changed: 0 additions & 38 deletions
This file was deleted.

src/Jobs/SendMessageToConnection.php

Lines changed: 0 additions & 43 deletions
This file was deleted.

src/ServiceProvider.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public function register()
4646
public function boot(BroadcastManager $broadcastManager): void
4747
{
4848
$broadcastManager->extend('laravel-echo-api-gateway', function (): Broadcaster {
49-
return new Driver();
49+
return $this->app->make(Driver::class);
5050
});
5151
}
5252
}

0 commit comments

Comments
 (0)