Skip to content

Commit 8109856

Browse files
authored
DataChannels: allow for closing, apply negotiated stream parameters (#161)
1 parent 637f2d7 commit 8109856

File tree

6 files changed

+191
-23
lines changed

6 files changed

+191
-23
lines changed

examples/chat/lib/chat/peer_handler.ex

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,5 +103,10 @@ defmodule Chat.PeerHandler do
103103
{:ok, state}
104104
end
105105

106+
defp handle_webrtc_msg({:data_channel_state_change, ref, :closed}, %{channel_ref: ref} = state) do
107+
Logger.warning("Channel #{inspect(ref)} has been closed")
108+
{:stop, :channel_closed, state}
109+
end
110+
106111
defp handle_webrtc_msg(_msg, state), do: {:ok, state}
107112
end

examples/chat/mix.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
"ex_libsrtp": {:hex, :ex_libsrtp, "0.7.2", "211bd89c08026943ce71f3e2c0231795b99cee748808ed3ae7b97cd8d2450b6b", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2e20645d0d739a4ecdcf8d4810a0c198120c8a2f617f2b75b2e2e704d59f492a"},
1313
"ex_rtcp": {:hex, :ex_rtcp, "0.4.0", "f9e515462a9581798ff6413583a25174cfd2101c94a2ebee871cca7639886f0a", [:mix], [], "hexpm", "28956602cf210d692fcdaf3f60ca49681634e1deb28ace41246aee61ee22dc3b"},
1414
"ex_rtp": {:hex, :ex_rtp, "0.4.0", "1f1b5c1440a904706011e3afbb41741f5da309ce251cb986690ce9fd82636658", [:mix], [], "hexpm", "0f72d80d5953a62057270040f0f1ee6f955c08eeae82ac659c038001d7d5a790"},
15-
"ex_sctp": {:git, "https://github.com/elixir-webrtc/ex_sctp.git", "1576207bda0eba3634a2b0075899042e9b309e60", []},
15+
"ex_sctp": {:git, "https://github.com/elixir-webrtc/ex_sctp.git", "0e9f33fc220ee7f2b1f5843d5ab02cb7c229d837", []},
1616
"ex_sdp": {:hex, :ex_sdp, "1.0.0", "c66cd66d60ad03ff1eecdc6db6a1b8a7b89fec260fcc22e8d6703fc5bbf430a3", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "e165dff988b8ab9d93588636aa5f3f683e1f848fc63b78b12382c8fa3dd39216"},
1717
"ex_stun": {:hex, :ex_stun, "0.2.0", "feb1fc7db0356406655b2a617805e6c712b93308c8ea2bf0ba1197b1f0866deb", [:mix], [], "hexpm", "1e01ba8290082ccbf37acaa5190d1f69b51edd6de2026a8d6d51368b29d115d0"},
1818
"ex_turn": {:hex, :ex_turn, "0.1.0", "177405aadf3d754567d0d37cf881a83f9cacf8f45314d188633b04c4a9e7c1ec", [:mix], [{:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}], "hexpm", "d677737fb7d45274d5dac19fe3c26b9038b6effbc0a6b3e7417bccc76b6d1cd3"},

lib/ex_webrtc/peer_connection.ex

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,8 @@ defmodule ExWebRTC.PeerConnection do
173173
Sends data over DataChannel, using channel identified by `ref`.
174174
175175
Requires the channel to be in `:open` state.
176+
177+
If `ref` does not identify any DataChannel, this function behaves like no-op.
176178
"""
177179
@spec send_data(peer_connection(), DataChannel.ref(), binary()) :: :ok
178180
def send_data(peer_connection, channel_ref, data) do
@@ -435,6 +437,33 @@ defmodule ExWebRTC.PeerConnection do
435437
GenServer.call(peer_connection, {:create_data_channel, label, opts})
436438
end
437439

440+
@doc """
441+
Closes a DataChannel identified by `ref`.
442+
443+
As of now, the closed channel directly transitions to closed state,
444+
which is signaled with `{:data_channel_state_change, ref, :closed}` message.
445+
For more information, refer to the [RTCDataChannel: close() method](https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel/close).
446+
447+
If `ref` does not identify any DataChannel, this function behaves like no-op.
448+
"""
449+
@spec close_data_channel(peer_connection(), DataChannel.ref()) :: :ok
450+
def close_data_channel(peer_connection, channel_ref) do
451+
GenServer.call(peer_connection, {:close_data_channel, channel_ref})
452+
end
453+
454+
@doc """
455+
Returns `t:ExWebRTC.DataChannel.t()` identified by `channel_ref` if it exists, `nil` otherwise.
456+
457+
This function can be especially helpful when you want to obtain DataChannel `id`. Normally,
458+
before SCTP connection is established, `create_data_channel/3` will return DataChannel struct
459+
with `id` set to `nil`. After receiving `{:data_channel_state_change, ref, :open}` message,
460+
you can call this function to obtain the same struct, but with `id` set to proper value.
461+
"""
462+
@spec get_data_channel(peer_connection(), DataChannel.ref()) :: DataChannel.t() | nil
463+
def get_data_channel(peer_connection, channel_ref) do
464+
GenServer.call(peer_connection, {:get_data_channel, channel_ref})
465+
end
466+
438467
@doc """
439468
Closes the PeerConnection.
440469
@@ -912,6 +941,20 @@ defmodule ExWebRTC.PeerConnection do
912941
end
913942
end
914943

944+
@impl true
945+
def handle_call({:close_data_channel, channel_ref}, _from, state) do
946+
{events, sctp_transport} = SCTPTransport.close_channel(state.sctp_transport, channel_ref)
947+
handle_sctp_events(events, state)
948+
949+
{:reply, :ok, %{state | sctp_transport: sctp_transport}}
950+
end
951+
952+
@impl true
953+
def handle_call({:get_data_channel, channel_ref}, _from, state) do
954+
channel = SCTPTransport.get_channel(state.sctp_transport, channel_ref)
955+
{:reply, channel, state}
956+
end
957+
915958
@impl true
916959
def handle_call(:get_stats, _from, state) do
917960
timestamp = System.os_time(:millisecond)

lib/ex_webrtc/sctp_transport.ex

Lines changed: 101 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,33 @@ defmodule ExWebRTC.SCTPTransport do
8080
{events, channel, sctp_transport}
8181
end
8282

83-
# TODO: close channel
83+
@spec close_channel(t(), DataChannel.ref()) :: {[event()], t()}
84+
def close_channel(sctp_transport, ref) do
85+
# TODO: according to spec, this should move to `closing` state
86+
# and only then be closed, but oh well...
87+
case Map.pop(sctp_transport.channels, ref) do
88+
{nil, _channels} ->
89+
Logger.warning("Trying to close non-existent channel with ref #{inspect(ref)}")
90+
{[], sctp_transport}
91+
92+
{%DataChannel{id: id}, channels} ->
93+
sctp_transport = %{sctp_transport | channels: channels}
94+
95+
{events, sctp_transport} =
96+
if id != nil do
97+
:ok = ExSCTP.close_stream(sctp_transport.ref, id)
98+
handle_events(sctp_transport)
99+
else
100+
{[], sctp_transport}
101+
end
102+
103+
event = {:state_change, ref, :closed}
104+
{[event | events], sctp_transport}
105+
end
106+
end
107+
108+
@spec get_channel(t(), DataChannel.ref()) :: DataChannel.t() | nil
109+
def get_channel(sctp_transport, ref), do: Map.get(sctp_transport.channels, ref)
84110

85111
@spec send(t(), DataChannel.ref(), :string | :binary, binary()) :: {[event()], t()}
86112
def send(sctp_transport, ref, type, data) do
@@ -99,7 +125,10 @@ defmodule ExWebRTC.SCTPTransport do
99125
{[], sctp_transport}
100126

101127
:error ->
102-
Logger.warning("Trying to send data over non-existing DataChannel with ref #{ref}")
128+
Logger.warning(
129+
"Trying to send data over non-existent DataChannel with ref #{inspect(ref)}"
130+
)
131+
103132
{[], sctp_transport}
104133
end
105134
end
@@ -157,6 +186,7 @@ defmodule ExWebRTC.SCTPTransport do
157186
case handle_event(sctp_transport, event) do
158187
{:none, transport} -> {Enum.reverse(events), transport}
159188
{nil, transport} -> handle_events(transport, events)
189+
{other, transport} when is_list(other) -> handle_events(transport, other ++ events)
160190
{other, transport} -> handle_events(transport, [other | events])
161191
end
162192
end
@@ -171,9 +201,18 @@ defmodule ExWebRTC.SCTPTransport do
171201
{nil, sctp_transport}
172202
end
173203

174-
defp handle_event(sctp_transport, {:stream_closed, _id}) do
175-
# TODO: handle closing channels
176-
{nil, sctp_transport}
204+
defp handle_event(sctp_transport, {:stream_closed, id}) do
205+
Logger.debug("SCTP stream #{id} has been closed")
206+
207+
case Enum.find(sctp_transport.channels, fn {_k, v} -> v.id == id end) do
208+
{ref, %DataChannel{ref: ref}} ->
209+
channels = Map.delete(sctp_transport.channels, ref)
210+
event = {:state_change, ref, :closed}
211+
{event, %{sctp_transport | channels: channels}}
212+
213+
_other ->
214+
{nil, sctp_transport}
215+
end
177216
end
178217

179218
defp handle_event(sctp_transport, :connected) do
@@ -206,13 +245,23 @@ defmodule ExWebRTC.SCTPTransport do
206245

207246
defp handle_event(sctp_transport, {:data, id, @dcep_ppi, data}) do
208247
with {:ok, dcep} <- DCEP.decode(data),
209-
{:ok, sctp_transport, event} <- handle_dcep(sctp_transport, id, dcep) do
210-
{event, sctp_transport}
248+
{:ok, sctp_transport, events} <- handle_dcep(sctp_transport, id, dcep) do
249+
# events is either list or a single event
250+
{events, sctp_transport}
211251
else
212252
:error ->
213-
# TODO: close the channel
214253
Logger.warning("Received invalid DCEP message. Closing the stream with id #{id}")
215-
{nil, sctp_transport}
254+
255+
ExSCTP.close_stream(sctp_transport.ref, id)
256+
257+
case Enum.find_value(sctp_transport.channels, fn {_k, v} -> v.id == id end) do
258+
{ref, %DataChannel{}} ->
259+
channels = Map.delete(sctp_transport.channels, ref)
260+
{{:state_change, ref, :closed}, %{sctp_transport | channels: channels}}
261+
262+
nil ->
263+
{nil, sctp_transport}
264+
end
216265
end
217266
end
218267

@@ -228,7 +277,7 @@ defmodule ExWebRTC.SCTPTransport do
228277

229278
nil ->
230279
Logger.warning(
231-
"Received data over non-existing DataChannel on stream with id #{id}. Discarding"
280+
"Received data over non-existent DataChannel on stream with id #{id}. Discarding"
232281
)
233282

234283
{nil, sctp_transport}
@@ -258,28 +307,63 @@ defmodule ExWebRTC.SCTPTransport do
258307
}
259308

260309
# In theory, we should also send the :open event here (W3C 6.2.3)
310+
# TODO
261311
channels = Map.put(sctp_transport.channels, channel.ref, channel)
262-
{:ok, %{sctp_transport | channels: channels}, {:channel, channel}}
312+
sctp_transport = %{sctp_transport | channels: channels}
313+
314+
case ExSCTP.configure_stream(
315+
sctp_transport.ref,
316+
id,
317+
channel.ordered,
318+
dco.reliability,
319+
dco.param
320+
) do
321+
:ok ->
322+
# remote channels also result in open event
323+
# even tho they already have ready_state open in the {:data_channel, ...} message
324+
# W3C 6.2.3
325+
events = [{:state_change, channel.ref, :open}, {:channel, channel}]
326+
{:ok, sctp_transport, events}
327+
328+
{:error, _res} ->
329+
Logger.warning("Unable to set stream #{id} parameters")
330+
:error
331+
end
263332
else
264-
_other -> :error
333+
_other ->
334+
Logger.warning("Received invalid DCEP Open on stream #{id}")
335+
:error
265336
end
266337
end
267338

268339
defp handle_dcep(sctp_transport, id, %DCEP.DataChannelAck{}) do
269340
case Enum.find(sctp_transport.channels, fn {_k, v} -> v.id == id end) do
270341
{ref, %DataChannel{ready_state: :connecting} = channel} ->
271342
Logger.debug("Locally opened DataChannel #{id} has been negotiated succesfully")
272-
# TODO: set the parameters
343+
273344
channel = %DataChannel{channel | ready_state: :open}
274345
channels = Map.put(sctp_transport.channels, ref, channel)
275-
event = {:state_change, ref, :open}
346+
sctp_transport = %{sctp_transport | channels: channels}
347+
348+
{rel_type, rel_param} =
349+
case channel do
350+
%DataChannel{max_packet_life_time: nil, max_retransmits: nil} -> {:reliable, 0}
351+
%DataChannel{max_retransmits: v} when v != nil -> {:rexmit, v}
352+
%DataChannel{max_packet_life_time: v} when v != nil -> {:timed, v}
353+
end
354+
355+
case ExSCTP.configure_stream(sctp_transport.ref, id, channel.ordered, rel_type, rel_param) do
356+
:ok ->
357+
{:ok, sctp_transport, {:state_change, ref, :open}}
276358

277-
{:ok, %{sctp_transport | channels: channels}, event}
359+
{:error, _res} ->
360+
Logger.warning("Unable to set stream #{id} parameters")
361+
:error
362+
end
278363

279364
_other ->
280-
# TODO: close the channel
281365
Logger.warning("Received DCEP Ack without sending the DCEP Open message on stream #{id}")
282-
{:ok, sctp_transport, nil}
366+
:error
283367
end
284368
end
285369

mix.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
"ex_libsrtp": {:hex, :ex_libsrtp, "0.7.2", "211bd89c08026943ce71f3e2c0231795b99cee748808ed3ae7b97cd8d2450b6b", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "2e20645d0d739a4ecdcf8d4810a0c198120c8a2f617f2b75b2e2e704d59f492a"},
1818
"ex_rtcp": {:hex, :ex_rtcp, "0.4.0", "f9e515462a9581798ff6413583a25174cfd2101c94a2ebee871cca7639886f0a", [:mix], [], "hexpm", "28956602cf210d692fcdaf3f60ca49681634e1deb28ace41246aee61ee22dc3b"},
1919
"ex_rtp": {:hex, :ex_rtp, "0.4.0", "1f1b5c1440a904706011e3afbb41741f5da309ce251cb986690ce9fd82636658", [:mix], [], "hexpm", "0f72d80d5953a62057270040f0f1ee6f955c08eeae82ac659c038001d7d5a790"},
20-
"ex_sctp": {:git, "https://github.com/elixir-webrtc/ex_sctp.git", "3de986b3cd00796a1f2a5ac40b001682d5712264", []},
20+
"ex_sctp": {:git, "https://github.com/elixir-webrtc/ex_sctp.git", "0e9f33fc220ee7f2b1f5843d5ab02cb7c229d837", []},
2121
"ex_sdp": {:hex, :ex_sdp, "1.0.0", "c66cd66d60ad03ff1eecdc6db6a1b8a7b89fec260fcc22e8d6703fc5bbf430a3", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}], "hexpm", "e165dff988b8ab9d93588636aa5f3f683e1f848fc63b78b12382c8fa3dd39216"},
2222
"ex_stun": {:hex, :ex_stun, "0.2.0", "feb1fc7db0356406655b2a617805e6c712b93308c8ea2bf0ba1197b1f0866deb", [:mix], [], "hexpm", "1e01ba8290082ccbf37acaa5190d1f69b51edd6de2026a8d6d51368b29d115d0"},
2323
"ex_turn": {:hex, :ex_turn, "0.1.0", "177405aadf3d754567d0d37cf881a83f9cacf8f45314d188633b04c4a9e7c1ec", [:mix], [{:ex_stun, "~> 0.2.0", [hex: :ex_stun, repo: "hexpm", optional: false]}], "hexpm", "d677737fb7d45274d5dac19fe3c26b9038b6effbc0a6b3e7417bccc76b6d1cd3"},

test/ex_webrtc/data_channel_test.exs

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ defmodule ExWebRTC.DataChannelTest do
1010
{:ok, pc2} = PeerConnection.start_link()
1111

1212
label1 = "my label 1"
13-
{:ok, %DataChannel{ref: ref1}} = PeerConnection.create_data_channel(pc1, label1)
13+
{:ok, %DataChannel{ref: ref1, id: nil}} = PeerConnection.create_data_channel(pc1, label1)
1414
assert_receive {:ex_webrtc, ^pc1, :negotiation_needed}
1515

1616
:ok = negotiate(pc1, pc2)
@@ -20,9 +20,12 @@ defmodule ExWebRTC.DataChannelTest do
2020
:ok = connect(pc1, pc2)
2121

2222
assert_receive {:ex_webrtc, ^pc2, {:data_channel, chan1}}
23-
assert %DataChannel{id: 1, label: ^label1, ordered: true} = chan1
23+
assert %DataChannel{ref: rem_ref1, id: 1, label: ^label1, ordered: true} = chan1
24+
assert_receive {:ex_webrtc, ^pc2, {:data_channel_state_change, ^rem_ref1, :open}}
2425
assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref1, :open}}
2526

27+
assert %DataChannel{id: 1} = PeerConnection.get_data_channel(pc1, ref1)
28+
2629
label2 = "my label 2"
2730
protocol = "my proto"
2831

@@ -32,7 +35,11 @@ defmodule ExWebRTC.DataChannelTest do
3235
refute_receive {:ex_webrtc, ^pc1, :negotiation_needed}
3336

3437
assert_receive {:ex_webrtc, ^pc2, {:data_channel, chan2}}
35-
assert %DataChannel{id: 3, label: ^label2, protocol: ^protocol, ordered: false} = chan2
38+
39+
assert %DataChannel{ref: rem_ref2, id: 3, label: ^label2, protocol: ^protocol, ordered: false} =
40+
chan2
41+
42+
assert_receive {:ex_webrtc, ^pc2, {:data_channel_state_change, ^rem_ref2, :open}}
3643
assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref2, :open}}
3744

3845
label3 = "my label 3"
@@ -41,8 +48,37 @@ defmodule ExWebRTC.DataChannelTest do
4148
refute_receive {:ex_webrtc, ^pc2, :negotiation_needed}
4249

4350
assert_receive {:ex_webrtc, ^pc1, {:data_channel, chan3}}
44-
assert %DataChannel{id: 4, label: ^label3} = chan3
51+
assert %DataChannel{ref: rem_ref3, id: 4, label: ^label3} = chan3
4552
assert_receive {:ex_webrtc, ^pc2, {:data_channel_state_change, ^ref3, :open}}
53+
assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^rem_ref3, :open}}
54+
end
55+
56+
describe "closing the channel" do
57+
setup do
58+
{:ok, pc1} = PeerConnection.start_link()
59+
{:ok, pc2} = PeerConnection.start_link()
60+
{:ok, %DataChannel{ref: ref1}} = PeerConnection.create_data_channel(pc1, "label")
61+
62+
:ok = negotiate(pc1, pc2)
63+
:ok = connect(pc1, pc2)
64+
65+
assert_receive {:ex_webrtc, ^pc2, {:data_channel, %DataChannel{ref: ref2}}}
66+
assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref1, :open}}
67+
68+
%{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2}
69+
end
70+
71+
test "by initiating peer", %{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2} do
72+
assert :ok = PeerConnection.close_data_channel(pc1, ref1)
73+
assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref1, :closed}}
74+
assert_receive {:ex_webrtc, ^pc2, {:data_channel_state_change, ^ref2, :closed}}
75+
end
76+
77+
test "by receiving peer", %{pc1: pc1, pc2: pc2, ref1: ref1, ref2: ref2} do
78+
assert :ok = PeerConnection.close_data_channel(pc2, ref2)
79+
assert_receive {:ex_webrtc, ^pc1, {:data_channel_state_change, ^ref1, :closed}}
80+
assert_receive {:ex_webrtc, ^pc2, {:data_channel_state_change, ^ref2, :closed}}
81+
end
4682
end
4783

4884
describe "negotiating" do

0 commit comments

Comments
 (0)