Skip to content

Commit 334fa5a

Browse files
authored
Fix RC between publishers with duplicated ids (#15)
1 parent 726dd54 commit 334fa5a

File tree

2 files changed

+176
-39
lines changed

2 files changed

+176
-39
lines changed

lib/live_ex_webrtc/player.ex

Lines changed: 75 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ defmodule LiveExWebRTC.Player do
8080

8181
@type t() :: struct()
8282

83+
@check_lock_timeout_ms 3000
84+
@max_lock_timeout_ms 3000
85+
8386
defstruct id: nil,
8487
publisher_id: nil,
8588
publisher_audio_track: nil,
@@ -101,7 +104,10 @@ defmodule LiveExWebRTC.Player do
101104
target_layer: nil,
102105
video_layers: [],
103106
# codec that will be used for video sending
104-
video_send_codec: nil
107+
video_send_codec: nil,
108+
last_seen: nil,
109+
locked: false,
110+
lock_timer: nil
105111

106112
alias ExWebRTC.{ICECandidate, MediaStreamTrack, PeerConnection, RTP.Munger, SessionDescription}
107113
alias ExRTCP.Packet.PayloadFeedback.PLI
@@ -351,16 +357,34 @@ defmodule LiveExWebRTC.Player do
351357
publisher_audio_track: ^publisher_audio_track,
352358
publisher_video_track: ^publisher_video_track
353359
} ->
354-
# tracks are the same, do nothing
360+
# tracks are the same, update last_seen and do nothing
361+
player = %Player{player | last_seen: System.monotonic_time(:millisecond)}
362+
socket = assign(socket, player: player)
363+
{:noreply, socket}
364+
365+
%Player{locked: true} ->
366+
# Different tracks but we are still receiving updates from old publisher. Ignore.
355367
{:noreply, socket}
356368

357369
%Player{
358370
publisher_audio_track: old_publisher_audio_track,
359371
publisher_video_track: old_publisher_video_track,
360-
video_layers: old_layers
372+
video_layers: old_layers,
373+
locked: false
361374
} ->
362375
if player.pc, do: PeerConnection.close(player.pc)
363376

377+
if player.lock_timer do
378+
Process.cancel_timer(player.lock_timer)
379+
380+
# flush mailbox
381+
receive do
382+
:check_lock -> :ok
383+
after
384+
0 -> :ok
385+
end
386+
end
387+
364388
video_layers = (publisher_video_track && publisher_video_track.rids) || ["h"]
365389

366390
video_layers =
@@ -378,7 +402,10 @@ defmodule LiveExWebRTC.Player do
378402
layer: "h",
379403
target_layer: "h",
380404
video_layers: video_layers,
381-
munger: nil
405+
munger: nil,
406+
last_seen: System.monotonic_time(:millisecond),
407+
locked: true,
408+
lock_timer: Process.send_after(self(), :check_lock, @check_lock_timeout_ms)
382409
}
383410

384411
socket = assign(socket, :player, player)
@@ -406,6 +433,25 @@ defmodule LiveExWebRTC.Player do
406433
end
407434
end
408435

436+
@impl true
437+
def handle_info({:live_ex_webrtc, :bye, publisher_audio_track, publisher_video_track}, socket) do
438+
%{player: player} = socket.assigns
439+
440+
case player do
441+
%Player{
442+
publisher_audio_track: ^publisher_audio_track,
443+
publisher_video_track: ^publisher_video_track
444+
} ->
445+
player = %Player{player | locked: false}
446+
socket = assign(socket, player: player)
447+
{:noreply, socket}
448+
449+
_ ->
450+
{:noreply, socket}
451+
end
452+
end
453+
454+
@impl true
409455
def handle_info({:live_ex_webrtc, :audio, packet}, socket) do
410456
%{player: player} = socket.assigns
411457

@@ -418,6 +464,7 @@ defmodule LiveExWebRTC.Player do
418464
{:noreply, socket}
419465
end
420466

467+
@impl true
421468
def handle_info({:live_ex_webrtc, :video, rid, packet}, socket) do
422469
%{player: player} = socket.assigns
423470

@@ -461,6 +508,30 @@ defmodule LiveExWebRTC.Player do
461508
end
462509
end
463510

511+
@impl true
512+
def handle_info(:check_lock, %{assigns: %{player: %Player{locked: true} = player}} = socket) do
513+
now = System.monotonic_time(:millisecond)
514+
515+
if now - socket.assigns.player.last_seen > @max_lock_timeout_ms do
516+
# unlock i.e. allow for track update
517+
player = %Player{player | lock_timer: nil, locked: false}
518+
socket = assign(socket, :player, player)
519+
{:noreply, socket}
520+
else
521+
timer = Process.send_after(self(), :check_lock, @check_lock_timeout_ms)
522+
player = %Player{player | lock_timer: timer}
523+
socket = assign(socket, :player, player)
524+
{:noreply, socket}
525+
end
526+
end
527+
528+
@impl true
529+
def handle_info(:check_lock, socket) do
530+
player = %Player{socket.assigns.player | lock_timer: nil}
531+
socket = assign(socket, :player, player)
532+
{:noreply, socket}
533+
end
534+
464535
@impl true
465536
def handle_event("toggle-settings", _params, socket) do
466537
socket =

lib/live_ex_webrtc/publisher.ex

Lines changed: 101 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ defmodule LiveExWebRTC.Publisher do
2525
`{:live_ex_webrtc, :audio, ExRTP.Packet.t()}`. Packets for non-simulcast video tracks are always
2626
sent with "h" identifier.
2727
* `streams:info:#{publisher.id}"` - for receiving information about publisher tracks and their layers.
28-
The message is in form of: `{:live_ex_webrtc, :info, audio_track :: ExWebRTC.MediaStreamTrack.t(), video_track :: ExWebRTC.MediaStreamTrack.t()}`.
28+
The message is in form of: `{:live_ex_webrtc, :info | :bye, audio_track :: ExWebRTC.MediaStreamTrack.t(), video_track :: ExWebRTC.MediaStreamTrack.t()}`.
2929
* `publishers:#{publisher_id}` for sending keyframe request.
3030
The message must be in form of `{:live_ex_webrtc, :keyframe_req, "l" | "m" | "h"}`
3131
E.g.
@@ -139,6 +139,7 @@ defmodule LiveExWebRTC.Publisher do
139139

140140
defstruct id: nil,
141141
pc: nil,
142+
pc_mref: nil,
142143
streaming?: false,
143144
simulcast_supported?: nil,
144145
# record checkbox status
@@ -160,7 +161,8 @@ defmodule LiveExWebRTC.Publisher do
160161
ice_port_range: nil,
161162
audio_codecs: nil,
162163
video_codecs: nil,
163-
pc_genserver_opts: nil
164+
pc_genserver_opts: nil,
165+
info_timer: nil
164166

165167
attr(:socket, Phoenix.LiveView.Socket, required: true, doc: "Parent live view socket")
166168

@@ -507,7 +509,6 @@ defmodule LiveExWebRTC.Publisher do
507509
socket =
508510
receive do
509511
{^ref, %Publisher{id: ^pub_id} = publisher} ->
510-
Process.send_after(self(), :streams_info, 1000)
511512
codecs = publisher.video_codecs || PeerConnection.Configuration.default_video_codecs()
512513
publisher = %Publisher{publisher | simulcast_supported?: simulcast_supported?(codecs)}
513514
assign(socket, publisher: publisher)
@@ -522,7 +523,11 @@ defmodule LiveExWebRTC.Publisher do
522523
end
523524

524525
@impl true
525-
def handle_info({:live_ex_webrtc, :keyframe_req, layer}, socket) do
526+
def handle_info(
527+
{:live_ex_webrtc, :keyframe_req, layer},
528+
%{assigns: %{publisher: %{pc: pc}}} = socket
529+
)
530+
when pc != nil do
526531
%{publisher: publisher} = socket.assigns
527532

528533
# Non-simulcast tracks are always sent with "h" identifier
@@ -535,9 +540,7 @@ defmodule LiveExWebRTC.Publisher do
535540
layer
536541
end
537542

538-
if pc = publisher.pc do
539-
:ok = PeerConnection.send_pli(pc, publisher.video_track.id, layer)
540-
end
543+
:ok = PeerConnection.send_pli(pc, publisher.video_track.id, layer)
541544

542545
{:noreply, socket}
543546
end
@@ -575,6 +578,10 @@ defmodule LiveExWebRTC.Publisher do
575578
def handle_info({:ex_webrtc, _pid, {:connection_state_change, :connected}}, socket) do
576579
%{publisher: pub} = socket.assigns
577580

581+
info_timer = Process.send_after(self(), :streams_info, 1000)
582+
pub = %Publisher{pub | info_timer: info_timer}
583+
socket = assign(socket, :publisher, pub)
584+
578585
if pub.record? do
579586
[
580587
%{kind: :audio, receiver: %{track: audio_track}},
@@ -589,6 +596,11 @@ defmodule LiveExWebRTC.Publisher do
589596
{:noreply, socket}
590597
end
591598

599+
@impl true
600+
def handle_info({:ex_webrtc, _pid, {:connection_state_change, :failed}}, socket) do
601+
{:noreply, bye(socket)}
602+
end
603+
592604
@impl true
593605
def handle_info({:ex_webrtc, _, _}, socket) do
594606
{:noreply, socket}
@@ -598,31 +610,33 @@ defmodule LiveExWebRTC.Publisher do
598610
def handle_info(:streams_info, socket) do
599611
%{publisher: publisher} = socket.assigns
600612

601-
PubSub.broadcast(
602-
publisher.pubsub,
603-
"streams:info:#{publisher.id}",
604-
{:live_ex_webrtc, :info, publisher.audio_track, publisher.video_track}
605-
)
606-
607-
Process.send_after(self(), :streams_info, 1_000)
608-
609-
{:noreply, socket}
613+
if publisher.audio_track != nil or publisher.video_track != nil do
614+
PubSub.broadcast(
615+
publisher.pubsub,
616+
"streams:info:#{publisher.id}",
617+
{:live_ex_webrtc, :info, publisher.audio_track, publisher.video_track}
618+
)
619+
620+
info_timer = Process.send_after(self(), :streams_info, 1_000)
621+
publisher = %Publisher{publisher | info_timer: info_timer}
622+
socket = assign(socket, :publisher, publisher)
623+
{:noreply, socket}
624+
else
625+
{:noreply, socket}
626+
end
610627
end
611628

629+
@impl true
612630
def handle_info(
613631
{:DOWN, _ref, :process, pc, _reason},
614-
%{assigns: %{publisher: %{pc: pc} = pub}} = socket
632+
%{assigns: %{publisher: %{pc: pc}}} = socket
615633
) do
616-
if pub.record? do
617-
recorder_result =
618-
Recorder.end_tracks(pub.recorder, [pub.audio_track.id, pub.video_track.id])
619-
620-
if pub.on_recording_finished, do: pub.on_recording_finished.(pub.id, recorder_result)
621-
end
622-
623-
if pub.on_disconnected, do: pub.on_disconnected.(pub.id)
634+
{:noreply, bye(socket)}
635+
end
624636

625-
{:noreply, assign(socket, publisher: %Publisher{pub | streaming?: false})}
637+
@impl true
638+
def handle_info(_msg, socket) do
639+
{:noreply, socket}
626640
end
627641

628642
@impl true
@@ -647,27 +661,21 @@ defmodule LiveExWebRTC.Publisher do
647661

648662
@impl true
649663
def handle_event("stop-streaming", _, socket) do
650-
{:noreply,
651-
socket
652-
|> assign(publisher: %Publisher{socket.assigns.publisher | streaming?: false})
653-
|> push_event("stop-streaming", %{})}
664+
{:noreply, bye(socket)}
654665
end
655666

656667
@impl true
657668
def handle_event("record-stream-change", params, socket) do
658669
record? = params["value"] == "on"
659-
660-
{:noreply,
661-
socket
662-
|> assign(publisher: %Publisher{socket.assigns.publisher | record?: record?})}
670+
{:noreply, assign(socket, publisher: %Publisher{socket.assigns.publisher | record?: record?})}
663671
end
664672

665673
@impl true
666674
def handle_event("offer", unsigned_params, socket) do
667675
%{publisher: publisher} = socket.assigns
668676
offer = SessionDescription.from_json(unsigned_params)
669677
{:ok, pc} = spawn_peer_connection(socket)
670-
Process.monitor(pc)
678+
pc_mref = Process.monitor(pc)
671679

672680
:ok = PeerConnection.set_remote_description(pc, offer)
673681

@@ -687,6 +695,7 @@ defmodule LiveExWebRTC.Publisher do
687695
new_publisher = %Publisher{
688696
publisher
689697
| pc: pc,
698+
pc_mref: pc_mref,
690699
audio_track: audio_track,
691700
video_track: video_track
692701
}
@@ -731,6 +740,9 @@ defmodule LiveExWebRTC.Publisher do
731740
end
732741
end
733742

743+
@impl true
744+
def terminate(_reason, socket), do: bye(socket)
745+
734746
defp spawn_peer_connection(socket) do
735747
%{publisher: publisher} = socket.assigns
736748

@@ -770,4 +782,58 @@ defmodule LiveExWebRTC.Publisher do
770782
false
771783
end)
772784
end
785+
786+
defp bye(socket) do
787+
%{publisher: publisher} = socket.assigns
788+
789+
if publisher.audio_track != nil or publisher.video_track != nil do
790+
PubSub.broadcast(
791+
publisher.pubsub,
792+
"streams:info:#{publisher.id}",
793+
{:live_ex_webrtc, :bye, publisher.audio_track, publisher.video_track}
794+
)
795+
end
796+
797+
if publisher.info_timer != nil, do: Process.cancel_timer(publisher.info_timer)
798+
799+
receive do
800+
:streams_info -> :ok
801+
after
802+
0 -> :ok
803+
end
804+
805+
try do
806+
Process.demonitor(publisher.pc_mref)
807+
PeerConnection.close(publisher.pc)
808+
catch
809+
_, _ -> :ok
810+
end
811+
812+
if publisher.record? and publisher.audio_track != nil and publisher.video_track != nil do
813+
recorder_result =
814+
Recorder.end_tracks(publisher.recorder, [
815+
publisher.audio_track.id,
816+
publisher.video_track.id
817+
])
818+
819+
if publisher.on_recording_finished,
820+
do: publisher.on_recording_finished.(publisher.id, recorder_result)
821+
end
822+
823+
if publisher.on_disconnected, do: publisher.on_disconnected.(publisher.id)
824+
825+
publisher = %Publisher{
826+
publisher
827+
| info_timer: nil,
828+
audio_track: nil,
829+
video_track: nil,
830+
streaming?: false,
831+
pc: nil,
832+
pc_mref: nil
833+
}
834+
835+
socket
836+
|> assign(:publisher, publisher)
837+
|> push_event("stop-streaming", %{})
838+
end
773839
end

0 commit comments

Comments
 (0)