Skip to content

Add PeerConneciton.set_packet_loss #182

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 32 additions & 9 deletions lib/ex_webrtc/dtls_transport.ex
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ defmodule ExWebRTC.DTLSTransport do
GenServer.cast(dtls_transport, {:send_data, data})
end

@spec set_packet_loss(dtls_transport(), 0..100) :: :ok
def set_packet_loss(dtls_transport, packet_loss) do
GenServer.cast(dtls_transport, {:set_packet_loss, packet_loss})
end

@spec stop(dtls_transport()) :: :ok
def stop(dtls_transport) do
GenServer.stop(dtls_transport)
Expand Down Expand Up @@ -127,7 +132,8 @@ defmodule ExWebRTC.DTLSTransport do
peer_fingerprint: nil,
dtls_state: :new,
dtls: nil,
mode: nil
mode: nil,
packet_loss: 0
}

notify(state.owner, {:state_change, :new})
Expand All @@ -142,7 +148,7 @@ defmodule ExWebRTC.DTLSTransport do
if state.mode == :active do
{packets, timeout} = ExDTLS.do_handshake(state.dtls)
Process.send_after(self(), :dtls_timeout, timeout)
:ok = state.ice_transport.send_data(state.ice_pid, packets)
:ok = do_send(state, packets)
state = update_dtls_state(state, :connecting)
Logger.debug("Started DTLS handshake")
{:reply, :ok, state}
Expand All @@ -157,7 +163,7 @@ defmodule ExWebRTC.DTLSTransport do

if state.buffered_local_packets do
Logger.debug("Sending buffered DTLS packets")
:ok = state.ice_transport.send_data(state.ice_pid, state.buffered_local_packets)
:ok = do_send(state, state.buffered_local_packets)
state = %{state | buffered_local_packets: nil}
{:reply, :ok, state}
else
Expand Down Expand Up @@ -236,7 +242,7 @@ defmodule ExWebRTC.DTLSTransport do
@impl true
def handle_cast({:send_rtp, data}, %{dtls_state: :connected, ice_connected: true} = state) do
case ExLibSRTP.protect(state.out_srtp, data) do
{:ok, protected} -> state.ice_transport.send_data(state.ice_pid, protected)
{:ok, protected} -> do_send(state, protected)
{:error, reason} -> Logger.warning("Unable to protect RTP: #{inspect(reason)}")
end

Expand All @@ -252,7 +258,7 @@ defmodule ExWebRTC.DTLSTransport do
@impl true
def handle_cast({:send_rtcp, data}, state) do
case ExLibSRTP.protect_rtcp(state.out_srtp, data) do
{:ok, protected} -> state.ice_transport.send_data(state.ice_pid, protected)
{:ok, protected} -> do_send(state, protected)
{:error, reason} -> Logger.warning("Unable to protect RTCP: #{inspect(reason)}")
end

Expand All @@ -262,18 +268,24 @@ defmodule ExWebRTC.DTLSTransport do
@impl true
def handle_cast({:send_data, data}, state) do
case ExDTLS.write_data(state.dtls, data) do
{:ok, protected} -> state.ice_transport.send_data(state.ice_pid, protected)
{:ok, protected} -> do_send(state, protected)
{:error, reason} -> Logger.warning("Unable to protect data: #{inspect(reason)}")
end

{:noreply, state}
end

@impl true
def handle_cast({:set_packet_loss, value}, state) do
state = %{state | packet_loss: value}
{:noreply, state}
end

@impl true
def handle_info(:dtls_timeout, %{buffered_local_packets: buffered_local_packets} = state) do
case ExDTLS.handle_timeout(state.dtls) do
{:retransmit, packets, timeout} when state.ice_connected ->
state.ice_transport.send_data(state.ice_pid, packets)
do_send(state, packets)
Logger.debug("Retransmitted DTLS packets")
Process.send_after(self(), :dtls_timeout, timeout)

Expand Down Expand Up @@ -327,7 +339,7 @@ defmodule ExWebRTC.DTLSTransport do
defp handle_ice_data({:data, <<f, _rest::binary>> = data}, state) when f in 20..63 do
case ExDTLS.handle_data(state.dtls, data) do
{:handshake_packets, packets, timeout} when state.ice_connected ->
:ok = state.ice_transport.send_data(state.ice_pid, packets)
:ok = do_send(state, packets)
Process.send_after(self(), :dtls_timeout, timeout)
state = update_dtls_state(state, :connecting)
{:ok, state}
Expand All @@ -346,7 +358,7 @@ defmodule ExWebRTC.DTLSTransport do
{:handshake_finished, lkm, rkm, profile, packets} ->
Logger.debug("DTLS handshake finished")
state = update_remote_cert_info(state)
state.ice_transport.send_data(state.ice_pid, packets)
do_send(state, packets)

peer_fingerprint =
state.dtls
Expand Down Expand Up @@ -466,5 +478,16 @@ defmodule ExWebRTC.DTLSTransport do
%{state | buffered_remote_rtp_packets: []}
end

defp do_send(%{packet_loss: 0} = state, data),
do: state.ice_transport.send_data(state.ice_pid, data)

defp do_send(state, data) do
if Enum.random(1..100) > state.packet_loss do
state.ice_transport.send_data(state.ice_pid, data)
else
:ok
end
end

defp notify(dst, msg), do: send(dst, {:dtls_transport, self(), msg})
end
16 changes: 16 additions & 0 deletions lib/ex_webrtc/peer_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,16 @@ defmodule ExWebRTC.PeerConnection do
GenServer.cast(peer_connection, {:send_data, channel_ref, data})
end

@doc """
Sets very simple packet loss.

Can be used for experimental purposes.
"""
@spec set_packet_loss(peer_connection(), 0..100) :: :ok
def set_packet_loss(peer_connection, value) when value in 0..100 do
GenServer.cast(peer_connection, {:set_packet_loss, value})
end

#### MDN-API ####

@doc """
Expand Down Expand Up @@ -1162,6 +1172,12 @@ defmodule ExWebRTC.PeerConnection do
{:noreply, %{state | sctp_transport: sctp_transport}}
end

@impl true
def handle_cast({:set_packet_loss, packet_loss}, state) do
DTLSTransport.set_packet_loss(state.dtls_transport, packet_loss)
{:noreply, state}
end

@impl true
def handle_info({:ex_ice, _from, {:connection_state_change, new_ice_state}}, state) do
state = %{state | ice_state: new_ice_state}
Expand Down
58 changes: 57 additions & 1 deletion test/ex_webrtc/dtls_transport_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ defmodule ExWebRTC.DTLSTransportTest do
|> ExDTLS.get_cert_fingerprint()
|> Utils.hex_dump()

@rtp_header <<1::1, 0::1, 0::1, 0::1, 0::4, 0::1, 96::7, 1::16, 1::32, 1::32>>
@rtp_payload <<0>>
@rtp_packet <<@rtp_header::binary, @rtp_payload::binary>>

# empty rr packet
@rtcp_rr_header <<2::2, 0::1, 0::5, 201::8, 1::16, 1::32>>
@rtcp_rr_packet <<@rtcp_rr_header::binary>>

defmodule MockICETransport do
@behaviour ExWebRTC.ICETransport

Expand Down Expand Up @@ -87,7 +95,7 @@ defmodule ExWebRTC.DTLSTransportTest do
end

test "cannot send data when handshake not finished", %{dtls: dtls} do
DTLSTransport.send_rtp(dtls, <<1, 2, 3>>)
DTLSTransport.send_rtp(dtls, @rtp_packet)

refute_receive {:mock_ice, _data}
end
Expand Down Expand Up @@ -175,6 +183,14 @@ defmodule ExWebRTC.DTLSTransportTest do
assert :ok = check_handshake(dtls, ice_transport, ice_pid, remote_dtls)
assert_receive {:dtls_transport, ^dtls, {:state_change, :connecting}}
assert_receive {:dtls_transport, ^dtls, {:state_change, :connected}}

# assert we can send data
assert :ok = DTLSTransport.send_rtp(dtls, @rtp_packet)
assert_receive {:mock_ice, <<@rtp_header::binary, _payload::binary>>}
assert :ok = DTLSTransport.send_rtcp(dtls, @rtcp_rr_packet)
assert_receive {:mock_ice, <<@rtcp_rr_header::binary, _payload::binary>>}
assert :ok = DTLSTransport.send_data(dtls, <<1, 2, 3>>)
assert_receive {:mock_ice, _datachannel_packet}
end

test "finishes handshake in passive mode", %{
Expand All @@ -200,6 +216,46 @@ defmodule ExWebRTC.DTLSTransportTest do
assert :ok == check_handshake(dtls, ice_transport, ice_pid, remote_dtls)
assert_receive {:dtls_transport, ^dtls, {:state_change, :connecting}}
assert_receive {:dtls_transport, ^dtls, {:state_change, :connected}}

# assert we can send data
assert :ok = DTLSTransport.send_rtp(dtls, @rtp_packet)
assert_receive {:mock_ice, <<@rtp_header::binary, _payload::binary>>}
assert :ok = DTLSTransport.send_rtcp(dtls, @rtcp_rr_packet)
assert_receive {:mock_ice, <<@rtcp_rr_header::binary, _payload::binary>>}
assert :ok = DTLSTransport.send_data(dtls, <<1, 2, 3>>)
assert_receive {:mock_ice, _datachannel_packet}
end

test "drops packets when packet loss is set", %{
dtls: dtls,
ice_transport: ice_transport,
ice_pid: ice_pid
} do
:ok = DTLSTransport.start_dtls(dtls, :active, @fingerprint)
remote_dtls = ExDTLS.init(mode: :server, dtls_srtp: true)

:ok = DTLSTransport.set_ice_connected(dtls)

assert :ok = check_handshake(dtls, ice_transport, ice_pid, remote_dtls)
assert_receive {:dtls_transport, ^dtls, {:state_change, :connecting}}
assert_receive {:dtls_transport, ^dtls, {:state_change, :connected}}

# assert we can send data
DTLSTransport.send_rtp(dtls, @rtp_packet)
assert_receive {:mock_ice, <<@rtp_header::binary, _payload::binary>>}
DTLSTransport.send_rtcp(dtls, @rtcp_rr_packet)
assert_receive {:mock_ice, <<@rtcp_rr_packet::binary, _rest::binary>>}
DTLSTransport.send_data(dtls, <<1, 2, 3>>)
assert_receive {:mock_ice, _datachannel_packet}

# now set packet-loss
DTLSTransport.set_packet_loss(dtls, 100)
DTLSTransport.send_rtp(dtls, @rtp_packet)
refute_receive {:mock_ice, _rtp_packet}
DTLSTransport.send_rtcp(dtls, @rtcp_rr_packet)
refute_receive {:mock_ice, _rtcp_rr_packet}
DTLSTransport.send_data(dtls, <<1, 2, 3>>)
refute_receive {:mock_ice, _datachannel_packet}
end

defp check_handshake(dtls, ice_transport, ice_pid, remote_dtls) do
Expand Down
Loading