Skip to content

Commit 25f156b

Browse files
authored
Initial standalone Recorder+Converter implementation (#1)
* Changes from https://github.com/elixir-webrtc/ex_webrtc/pull/197/files * Review fixes * Update README and mix.exs * Update ex_webrtc dep
1 parent 61e5508 commit 25f156b

File tree

12 files changed

+1371
-1
lines changed

12 files changed

+1371
-1
lines changed

README.md

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,40 @@
11
# ExWebRTC Recorder
22

3-
TODO WRITEME
3+
[![Hex.pm](https://img.shields.io/hexpm/v/ex_webrtc_recorder.svg)](https://hex.pm/packages/ex_webrtc_recorder)
4+
[![API Docs](https://img.shields.io/badge/api-docs-yellow.svg?style=flat)](https://hexdocs.pm/ex_webrtc_recorder)
5+
6+
Records and processes RTP packets sent and received using [ExWebRTC](https://github.com/elixir-webrtc/ex_webrtc).
7+
8+
## Installation
9+
10+
Add `:ex_webrtc_recorder` to your list of dependencies
11+
12+
```elixir
13+
def deps do
14+
[
15+
{:ex_webrtc_recorder, "~> 0.1.0"}
16+
]
17+
end
18+
```
19+
20+
If you want to use Converter to generate WEBM files from the recordings,
21+
you need to have the `ffmpeg` binary with the relevant libraries present in `PATH`.
22+
23+
### S3
24+
25+
ExWebRTC Recorder comes with optional support for uploading the recordings to S3-compatible storage,
26+
but it must be explicitly turned on by adding the following dependencies:
27+
28+
```elixir
29+
def deps do
30+
[
31+
{:ex_webrtc_recorder, "~> 0.1.0"},
32+
{:ex_aws_s3, "~> 2.5"},
33+
{:ex_aws, "~> 2.5"},
34+
{:sweet_xml, "~> 0.7"},
35+
{:req, "~> 0.5"} # or any other HTTP client supported by `ex_aws`
36+
]
37+
end
38+
```
39+
40+
See `ExWebRTC.Recorder.S3` for more info.

lib/ex_webrtc_recorder.ex

Lines changed: 351 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,351 @@
1+
defmodule ExWebRTC.Recorder do
2+
@moduledoc """
3+
Saves received RTP packets to a file for later processing/analysis.
4+
5+
Dumps raw RTP packets fed to it in a custom format. Use `ExWebRTC.Recorder.Converter` to process them.
6+
7+
Can optionally upload the saved files to S3-compatible storage.
8+
See `ExWebRTC.Recorder.S3` and `t:options/0` for more info.
9+
"""
10+
11+
alias ExWebRTC.MediaStreamTrack
12+
alias __MODULE__.S3
13+
14+
require Logger
15+
16+
use GenServer
17+
18+
@default_base_dir "./recordings"
19+
20+
@type recorder :: GenServer.server()
21+
22+
@typedoc """
23+
Options that can be passed to `start_link/1`.
24+
25+
* `:base_dir` - Base directory where Recorder will save its artifacts. `#{@default_base_dir}` by default.
26+
* `:on_start` - Callback that will be executed just after the Recorder is (re)started.
27+
It should return the initial list of tracks to be added.
28+
* `:controlling_process` - PID of a process where all messages will be sent. `self()` by default.
29+
* `:s3_upload_config` - If passed, finished recordings will be uploaded to S3-compatible storage.
30+
See `t:ExWebRTC.Recorder.S3.upload_config/0` for more info.
31+
"""
32+
@type option ::
33+
{:base_dir, String.t()}
34+
| {:on_start, (-> [MediaStreamTrack.t()])}
35+
| {:controlling_process, Process.dest()}
36+
| {:s3_upload_config, S3.upload_config()}
37+
38+
@type options :: [option()]
39+
40+
@typedoc """
41+
Messages sent by the `ExWebRTC.Recorder` process to its controlling process.
42+
43+
* `:upload_complete`, `:upload_failed` - Sent after the completion of the upload task, identified by its reference.
44+
Contains the updated manifest with `s3://` scheme URLs to uploaded files.
45+
"""
46+
@type message ::
47+
{:ex_webrtc_recorder, pid(),
48+
{:upload_complete, S3.upload_task_ref(), __MODULE__.Manifest.t()}
49+
| {:upload_failed, S3.upload_task_ref(), __MODULE__.Manifest.t()}}
50+
51+
# Necessary to start Recorder under a supervisor using `{Recorder, [recorder_opts, gen_server_opts]}`
52+
@doc false
53+
@spec child_spec(list()) :: Supervisor.child_spec()
54+
def child_spec(args) do
55+
%{
56+
id: __MODULE__,
57+
start: {__MODULE__, :start_link, args}
58+
}
59+
end
60+
61+
@doc """
62+
Starts a new `ExWebRTC.Recorder` process.
63+
64+
`ExWebRTC.Recorder` is a `GenServer` under the hood, thus this function allows for
65+
passing the generic `t:GenServer.options/0` as an argument.
66+
"""
67+
@spec start(options(), GenServer.options()) :: GenServer.on_start()
68+
def start(recorder_opts \\ [], gen_server_opts \\ []) do
69+
config =
70+
recorder_opts
71+
|> Keyword.put_new(:controlling_process, self())
72+
73+
GenServer.start(__MODULE__, config, gen_server_opts)
74+
end
75+
76+
@doc """
77+
Starts a new `ExWebRTC.Recorder` process.
78+
79+
Works identically to `start/2`, but links to the calling process.
80+
"""
81+
@spec start_link(options(), GenServer.options()) :: GenServer.on_start()
82+
def start_link(recorder_opts \\ [], gen_server_opts \\ []) do
83+
config =
84+
recorder_opts
85+
|> Keyword.put_new(:controlling_process, self())
86+
87+
GenServer.start_link(__MODULE__, config, gen_server_opts)
88+
end
89+
90+
@doc """
91+
Adds new tracks to the recording.
92+
93+
Returns the part of the recording manifest that's relevant to the freshly added tracks.
94+
See `t:ExWebRTC.Recorder.Manifest.t/0` for more info.
95+
"""
96+
@spec add_tracks(recorder(), [MediaStreamTrack.t()]) :: {:ok, __MODULE__.Manifest.t()}
97+
def add_tracks(recorder, tracks) do
98+
GenServer.call(recorder, {:add_tracks, tracks})
99+
end
100+
101+
@doc """
102+
Records a received packet on the given track.
103+
"""
104+
@spec record(
105+
recorder(),
106+
MediaStreamTrack.id(),
107+
MediaStreamTrack.rid() | nil,
108+
ExRTP.Packet.t()
109+
) :: :ok
110+
def record(recorder, track_id, rid, %ExRTP.Packet{} = packet) do
111+
recv_time = System.monotonic_time(:millisecond)
112+
GenServer.cast(recorder, {:record, track_id, rid, recv_time, packet})
113+
end
114+
115+
@doc """
116+
Changes the controlling process of this `recorder` process.
117+
118+
Controlling process is a process that receives all of the messages (described
119+
by `t:message/0`) from this Recorder.
120+
"""
121+
@spec controlling_process(recorder(), Process.dest()) :: :ok
122+
def controlling_process(recorder, controlling_process) do
123+
GenServer.call(recorder, {:controlling_process, controlling_process})
124+
end
125+
126+
@doc """
127+
Finishes the recording for the given tracks and optionally uploads the result files.
128+
129+
Returns the part of the recording manifest that's relevant to the freshly ended tracks.
130+
See `t:ExWebRTC.Recorder.Manifest.t/0` for more info.
131+
132+
If uploads are configured:
133+
* Returns the reference to the upload task that was spawned.
134+
* Will send the `:upload_complete`/`:upload_failed` message with this reference
135+
to the controlling process when the task finishes.
136+
137+
Note that the manifest returned by this function always contains local paths to files.
138+
The updated manifest with `s3://` scheme URLs is sent in the aforementioned message.
139+
"""
140+
@spec end_tracks(recorder(), [MediaStreamTrack.id()]) ::
141+
{:ok, __MODULE__.Manifest.t(), S3.upload_task_ref() | nil} | {:error, :tracks_not_found}
142+
def end_tracks(recorder, track_ids) do
143+
GenServer.call(recorder, {:end_tracks, track_ids})
144+
end
145+
146+
@impl true
147+
def init(config) do
148+
base_dir =
149+
(config[:base_dir] || @default_base_dir)
150+
|> Path.join(current_datetime())
151+
|> Path.expand()
152+
153+
:ok = File.mkdir_p!(base_dir)
154+
Logger.info("Starting recorder. Recordings will be saved under: #{base_dir}")
155+
156+
upload_handler =
157+
if config[:s3_upload_config] do
158+
Logger.info("Recordings will be uploaded to S3")
159+
S3.UploadHandler.new(config[:s3_upload_config])
160+
end
161+
162+
state = %{
163+
owner: config[:controlling_process],
164+
base_dir: base_dir,
165+
manifest_path: Path.join(base_dir, "manifest.json"),
166+
track_data: %{},
167+
upload_handler: upload_handler
168+
}
169+
170+
case config[:on_start] do
171+
nil ->
172+
{:ok, state}
173+
174+
callback ->
175+
{:ok, state, {:continue, {:on_start, callback}}}
176+
end
177+
end
178+
179+
@impl true
180+
def handle_continue({:on_start, on_start}, state) do
181+
case on_start.() do
182+
[] ->
183+
{:noreply, state}
184+
185+
tracks ->
186+
{_manifest_diff, state} = do_add_tracks(tracks, state)
187+
{:noreply, state}
188+
end
189+
end
190+
191+
@impl true
192+
def handle_call({:controlling_process, controlling_process}, _from, state) do
193+
state = %{state | owner: controlling_process}
194+
{:reply, :ok, state}
195+
end
196+
197+
@impl true
198+
def handle_call({:add_tracks, tracks}, _from, state) do
199+
{manifest_diff, state} = do_add_tracks(tracks, state)
200+
{:reply, {:ok, manifest_diff}, state}
201+
end
202+
203+
@impl true
204+
def handle_call({:end_tracks, track_ids}, _from, state) do
205+
case Enum.filter(track_ids, &Map.has_key?(state.track_data, &1)) do
206+
[] ->
207+
{:reply, {:error, :tracks_not_found}, state}
208+
209+
track_ids ->
210+
{manifest_diff, ref, state} = do_end_tracks(track_ids, state)
211+
{:reply, {:ok, manifest_diff, ref}, state}
212+
end
213+
end
214+
215+
@impl true
216+
def handle_cast({:record, track_id, rid, recv_time, packet}, state)
217+
when is_map_key(state.track_data, track_id) do
218+
%{file: file, rid_map: rid_map} = state.track_data[track_id]
219+
220+
with {:ok, rid_idx} <- Map.fetch(rid_map, rid),
221+
false <- is_nil(file) do
222+
:ok = IO.binwrite(file, serialize_packet(packet, rid_idx, recv_time))
223+
else
224+
:error ->
225+
Logger.warning("""
226+
Tried to save packet for unknown rid. Ignoring. Track id: #{inspect(track_id)}, rid: #{inspect(rid)}.\
227+
""")
228+
229+
true ->
230+
Logger.warning("""
231+
Tried to save packet for track which has been ended. Ignoring. Track id: #{inspect(track_id)} \
232+
""")
233+
end
234+
235+
{:noreply, state}
236+
end
237+
238+
@impl true
239+
def handle_cast({:record, track_id, _rid, _recv_time, _packet}, state) do
240+
Logger.warning("""
241+
Tried to save packet for unknown track id. Ignoring. Track id: #{inspect(track_id)}.\
242+
""")
243+
244+
{:noreply, state}
245+
end
246+
247+
@impl true
248+
def handle_info({ref, _res} = task_result, state) when is_reference(ref) do
249+
if state.upload_handler do
250+
{result, manifest, handler} =
251+
S3.UploadHandler.process_result(state.upload_handler, task_result)
252+
253+
case result do
254+
:ok ->
255+
send(state.owner, {:ex_webrtc_recorder, self(), {:upload_complete, ref, manifest}})
256+
257+
{:error, :upload_failed} ->
258+
send(state.owner, {:ex_webrtc_recorder, self(), {:upload_failed, ref, manifest}})
259+
260+
{:error, :unknown_task} ->
261+
raise "Upload handler encountered result of unknown task"
262+
end
263+
264+
{:noreply, %{state | upload_handler: handler}}
265+
else
266+
{:noreply, state}
267+
end
268+
end
269+
270+
@impl true
271+
def handle_info(_msg, state) do
272+
{:noreply, state}
273+
end
274+
275+
defp do_add_tracks(tracks, state) do
276+
start_time = DateTime.utc_now()
277+
278+
new_track_data =
279+
Map.new(tracks, fn track ->
280+
file_path = Path.join(state.base_dir, "#{track.id}.rtpx")
281+
282+
track_entry = %{
283+
start_time: start_time,
284+
kind: track.kind,
285+
streams: track.streams,
286+
rid_map: (track.rids || [nil]) |> Enum.with_index() |> Map.new(),
287+
location: file_path,
288+
file: File.open!(file_path, [:write])
289+
}
290+
291+
{track.id, track_entry}
292+
end)
293+
294+
manifest_diff = to_manifest(new_track_data)
295+
296+
state = %{state | track_data: Map.merge(state.track_data, new_track_data)}
297+
298+
:ok = File.write!(state.manifest_path, state.track_data |> to_manifest() |> Jason.encode!())
299+
300+
{manifest_diff, state}
301+
end
302+
303+
defp do_end_tracks(track_ids, state) do
304+
# We're keeping entries from `track_data` for ended tracks
305+
# because they need to be present in the global manifest,
306+
# which gets recreated on each call to `add_tracks`
307+
state =
308+
Enum.reduce(track_ids, state, fn track_id, state ->
309+
%{file: file} = state.track_data[track_id]
310+
File.close(file)
311+
312+
put_in(state, [:track_data, track_id, :file], nil)
313+
end)
314+
315+
manifest_diff = to_manifest(state.track_data, track_ids)
316+
317+
case state.upload_handler do
318+
nil ->
319+
{manifest_diff, nil, state}
320+
321+
handler ->
322+
{ref, handler} = S3.UploadHandler.spawn_task(handler, manifest_diff)
323+
324+
{manifest_diff, ref, %{state | upload_handler: handler}}
325+
end
326+
end
327+
328+
defp to_manifest(track_data, track_ids) do
329+
track_data |> Map.take(track_ids) |> to_manifest()
330+
end
331+
332+
defp to_manifest(track_data) do
333+
Map.new(track_data, fn {id, track} ->
334+
{id, Map.delete(track, :file)}
335+
end)
336+
end
337+
338+
defp serialize_packet(packet, rid_idx, recv_time) do
339+
packet = ExRTP.Packet.encode(packet)
340+
packet_size = byte_size(packet)
341+
<<rid_idx::8, recv_time::64, packet_size::32, packet::binary>>
342+
end
343+
344+
defp current_datetime() do
345+
{{y, mo, d}, {h, m, s}} = :calendar.local_time()
346+
347+
# e.g. 20240130-120315
348+
:io_lib.format("~4..0w~2..0w~2..0w-~2..0w~2..0w~2..0w", [y, mo, d, h, m, s])
349+
|> to_string()
350+
end
351+
end

0 commit comments

Comments
 (0)