Skip to content

Commit e0ff731

Browse files
authored
Track recordings (#180)
1 parent d74a937 commit e0ff731

File tree

4 files changed

+381
-3
lines changed

4 files changed

+381
-3
lines changed

lib/ex_webrtc/media/opus.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ defmodule ExWebRTC.Media.Opus do
77
88
Returns the duration in milliseconds.
99
"""
10-
@spec duration(binary()) :: {:ok, float()} | {:error, term()}
10+
@spec duration(binary()) :: {:ok, number()} | {:error, term()}
1111
def duration(<<config::5, rest::bitstring>>) do
1212
with {:ok, frame_count} <- get_frame_count(rest) do
1313
{:ok, frame_count * get_frame_duration(config)}

lib/ex_webrtc/recorder.ex

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
defmodule ExWebRTC.Recorder do
2+
@moduledoc """
3+
Saves received RTP packets to a file for later processing/analysis.
4+
5+
Dumps raw RTP packets fed to it in a custom format. Use `Recorder.Converter` to process them.
6+
"""
7+
8+
use GenServer
9+
10+
alias ExWebRTC.MediaStreamTrack
11+
12+
require Logger
13+
14+
@default_base_dir "./recordings"
15+
16+
@typedoc """
17+
Options that can be passed to `start_link/1`.
18+
19+
* `base_dir` - Base directory where Recorder will save its artifacts. `#{@default_base_dir}` by default.
20+
* `on_start` - Callback that will be executed just after the Recorder is (re)started.
21+
It should return the initial list of tracks to be added.
22+
"""
23+
@type option ::
24+
{:base_dir, String.t()}
25+
| {:on_start, (-> [MediaStreamTrack.t()])}
26+
27+
@type options :: [option()]
28+
29+
# Necessary to start Recorder under a supervisor using `{Recorder, [recorder_opts, gen_server_opts]}`
30+
@doc false
31+
@spec child_spec(list()) :: Supervisor.child_spec()
32+
def child_spec(args) do
33+
%{
34+
id: __MODULE__,
35+
start: {__MODULE__, :start_link, args}
36+
}
37+
end
38+
39+
@doc """
40+
Starts a new `ExWebRTC.Recorder` process.
41+
42+
`ExWebRTC.Recorder` is a `GenServer` under the hood, thus this function allows for
43+
passing the generic `t:GenServer.options/0` as an argument.
44+
"""
45+
@spec start(options(), GenServer.options()) :: GenServer.on_start()
46+
def start(recorder_opts \\ [], gen_server_opts \\ []) do
47+
GenServer.start(__MODULE__, recorder_opts, gen_server_opts)
48+
end
49+
50+
@doc """
51+
Starts a new `ExWebRTC.Recorder` process.
52+
53+
Works identically to `start/2`, but links to the calling process.
54+
"""
55+
@spec start_link(options(), GenServer.options()) :: GenServer.on_start()
56+
def start_link(recorder_opts \\ [], gen_server_opts \\ []) do
57+
GenServer.start_link(__MODULE__, recorder_opts, gen_server_opts)
58+
end
59+
60+
@doc """
61+
Adds new tracks to the recording.
62+
"""
63+
@spec add_tracks(GenServer.server(), [MediaStreamTrack.t()]) :: :ok
64+
def add_tracks(recorder, tracks) do
65+
GenServer.call(recorder, {:add_tracks, tracks})
66+
end
67+
68+
@doc """
69+
Records a received packet on the given track.
70+
"""
71+
@spec record(
72+
GenServer.server(),
73+
MediaStreamTrack.id(),
74+
MediaStreamTrack.rid() | nil,
75+
ExRTP.Packet.t()
76+
) :: :ok
77+
def record(recorder, track_id, rid, %ExRTP.Packet{} = packet) do
78+
recv_time = System.monotonic_time(:millisecond)
79+
GenServer.cast(recorder, {:record, track_id, rid, recv_time, packet})
80+
end
81+
82+
@impl true
83+
def init(config) do
84+
base_dir =
85+
(config[:base_dir] || @default_base_dir)
86+
|> Path.join(current_datetime())
87+
|> Path.expand()
88+
89+
:ok = File.mkdir_p!(base_dir)
90+
Logger.info("Starting recorder. Recordings will be saved under: #{base_dir}")
91+
92+
state = %{
93+
base_dir: base_dir,
94+
tracks: %{}
95+
}
96+
97+
case config[:on_start] do
98+
nil ->
99+
{:ok, state}
100+
101+
callback ->
102+
{:ok, state, {:continue, {:on_start, callback}}}
103+
end
104+
end
105+
106+
@impl true
107+
def handle_continue({:on_start, on_start}, state) do
108+
case on_start.() do
109+
[] ->
110+
{:noreply, state}
111+
112+
tracks ->
113+
state = do_add_tracks(tracks, state)
114+
{:noreply, state}
115+
end
116+
end
117+
118+
@impl true
119+
def handle_call({:add_tracks, tracks}, _from, state) do
120+
state = do_add_tracks(tracks, state)
121+
{:reply, :ok, state}
122+
end
123+
124+
@impl true
125+
def handle_cast({:record, track_id, rid, recv_time, packet}, state)
126+
when is_map_key(state.tracks, track_id) do
127+
%{file: file, rid_map: rid_map} = state.tracks[track_id]
128+
129+
case rid_map do
130+
%{^rid => rid_idx} ->
131+
:ok = IO.binwrite(file, serialize_packet(packet, rid_idx, recv_time))
132+
133+
_other ->
134+
Logger.warning("""
135+
Tried to save packet for unknown rid. Ignoring. Track id: #{inspect(track_id)}, rid: #{inspect(rid)}.\
136+
""")
137+
end
138+
139+
{:noreply, state}
140+
end
141+
142+
@impl true
143+
def handle_cast({:record, track_id, _rid, _recv_time, _packet}, state) do
144+
Logger.warning("""
145+
Tried to save packet for unknown track id. Ignoring. Track id: #{inspect(track_id)}.\
146+
""")
147+
148+
{:noreply, state}
149+
end
150+
151+
@impl true
152+
def handle_info(_msg, state) do
153+
{:noreply, state}
154+
end
155+
156+
defp do_add_tracks(tracks, state) do
157+
start_time = DateTime.utc_now()
158+
159+
tracks =
160+
Map.new(tracks, fn track ->
161+
path = Path.join(state.base_dir, "#{track.id}.rtpx")
162+
file = File.open!(path, [:write])
163+
rid_map = (track.rids || [nil]) |> Enum.with_index() |> Map.new()
164+
165+
{track.id,
166+
%{kind: track.kind, rid_map: rid_map, path: path, file: file, start_time: start_time}}
167+
end)
168+
169+
state = %{state | tracks: Map.merge(state.tracks, tracks)}
170+
report_path = Path.join(state.base_dir, "report.json")
171+
172+
report =
173+
Map.new(state.tracks, fn {id, track} ->
174+
track = Map.delete(track, :file)
175+
{id, track}
176+
end)
177+
178+
:ok = File.write!(report_path, Jason.encode!(report))
179+
180+
%{state | tracks: tracks}
181+
end
182+
183+
defp serialize_packet(packet, rid_idx, recv_time) do
184+
packet = ExRTP.Packet.encode(packet)
185+
packet_size = byte_size(packet)
186+
<<rid_idx::8, recv_time::64, packet_size::32, packet::binary>>
187+
end
188+
189+
defp current_datetime() do
190+
{{y, mo, d}, {h, m, s}} = :calendar.local_time()
191+
192+
# e.g. 20240130-120315
193+
:io_lib.format("~4..0w~2..0w~2..0w-~2..0w~2..0w~2..0w", [y, mo, d, h, m, s])
194+
|> to_string()
195+
end
196+
end

lib/ex_webrtc/recorder/converter.ex

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
defmodule ExWebRTC.Recorder.Converter do
2+
@moduledoc """
3+
Processes RTP packet files saved by `Recorder`.
4+
5+
At the moment, `Converter` works only with VP8 video and Opus audio.
6+
"""
7+
8+
require Logger
9+
10+
alias ExWebRTC.RTP.JitterBuffer.PacketStore
11+
alias ExWebRTC.RTPCodecParameters
12+
alias ExWebRTC.RTP.Depayloader
13+
alias ExWebRTC.Media.{IVF, Ogg}
14+
15+
# TODO: Allow changing these values
16+
@ivf_header_opts [
17+
# <<fourcc::little-32>> = "VP80"
18+
fourcc: 808_996_950,
19+
height: 720,
20+
width: 1280,
21+
num_frames: 1024,
22+
timebase_denum: 24,
23+
timebase_num: 1
24+
]
25+
26+
# TODO: Support codecs other than VP8/Opus
27+
@video_codec_params %RTPCodecParameters{
28+
payload_type: 96,
29+
mime_type: "video/VP8",
30+
clock_rate: 90_000
31+
}
32+
33+
@audio_codec_params %RTPCodecParameters{
34+
payload_type: 111,
35+
mime_type: "audio/opus",
36+
clock_rate: 48_000,
37+
channels: 2
38+
}
39+
40+
@default_output_path "./converter_output"
41+
42+
@doc """
43+
Convert the saved dumps of tracks in the report to IVF and Ogg files.
44+
"""
45+
@spec convert!(Path.t(), Path.t()) :: :ok | no_return()
46+
def convert!(report_path, output_path \\ @default_output_path) do
47+
report_path =
48+
report_path
49+
|> Path.expand()
50+
|> then(
51+
&if(File.dir?(&1),
52+
do: Path.join(&1, "report.json"),
53+
else: &1
54+
)
55+
)
56+
57+
output_path = Path.expand(output_path)
58+
File.mkdir_p!(output_path)
59+
60+
report =
61+
report_path
62+
|> File.read!()
63+
|> Jason.decode!()
64+
65+
for {id, track} <- report do
66+
%{
67+
"path" => path,
68+
"kind" => kind,
69+
"rid_map" => rid_map
70+
} = track
71+
72+
file = File.open!(path)
73+
74+
packets =
75+
read_packets(file, Map.new(rid_map, fn {_rid, rid_idx} -> {rid_idx, %PacketStore{}} end))
76+
77+
case kind do
78+
"video" ->
79+
convert_video_track(id, rid_map, output_path, packets)
80+
81+
"audio" ->
82+
convert_audio_track(id, output_path, packets |> Map.values() |> hd())
83+
end
84+
end
85+
86+
:ok
87+
end
88+
89+
defp convert_video_track(id, rid_map, output_path, packets) do
90+
for {rid, rid_idx} <- rid_map do
91+
filename = if rid == "nil", do: "#{id}.ivf", else: "#{id}_#{rid}.ivf"
92+
93+
{:ok, writer} =
94+
output_path
95+
|> Path.join(filename)
96+
|> IVF.Writer.open(@ivf_header_opts)
97+
98+
{:ok, depayloader} = Depayloader.new(@video_codec_params)
99+
do_convert_video_track(packets[rid_idx], depayloader, writer)
100+
end
101+
end
102+
103+
defp do_convert_video_track(packets, depayloader, writer, frames_cnt \\ 0)
104+
defp do_convert_video_track([], _depayloader, writer, _frames_cnt), do: IVF.Writer.close(writer)
105+
106+
defp do_convert_video_track([packet | rest], depayloader, writer, frames_cnt) do
107+
case Depayloader.depayload(depayloader, packet) do
108+
{nil, depayloader} ->
109+
do_convert_video_track(rest, depayloader, writer, frames_cnt)
110+
111+
{vp8_frame, depayloader} ->
112+
frame = %IVF.Frame{timestamp: frames_cnt, data: vp8_frame}
113+
{:ok, writer} = IVF.Writer.write_frame(writer, frame)
114+
do_convert_video_track(rest, depayloader, writer, frames_cnt + 1)
115+
end
116+
end
117+
118+
defp convert_audio_track(id, output_path, packets) do
119+
{:ok, writer} =
120+
output_path
121+
|> Path.join("#{id}.ogg")
122+
|> Ogg.Writer.open()
123+
124+
{:ok, depayloader} = Depayloader.new(@audio_codec_params)
125+
do_convert_audio_track(packets, depayloader, writer)
126+
end
127+
128+
defp do_convert_audio_track([], _depayloader, writer), do: Ogg.Writer.close(writer)
129+
130+
defp do_convert_audio_track([packet | rest], depayloader, writer) do
131+
{opus_packet, depayloader} = Depayloader.depayload(depayloader, packet)
132+
{:ok, writer} = Ogg.Writer.write_packet(writer, opus_packet)
133+
do_convert_audio_track(rest, depayloader, writer)
134+
end
135+
136+
defp read_packets(file, stores) do
137+
case read_packet(file) do
138+
{:ok, rid_idx, packet} ->
139+
stores = Map.update!(stores, rid_idx, &insert_packet_to_store(&1, packet))
140+
read_packets(file, stores)
141+
142+
{:error, reason} ->
143+
Logger.warning("Error decoding RTP packet: #{inspect(reason)}")
144+
read_packets(file, stores)
145+
146+
:eof ->
147+
Map.new(stores, fn {rid_idx, store} ->
148+
{rid_idx, store |> PacketStore.dump() |> Enum.reject(&is_nil/1)}
149+
end)
150+
end
151+
end
152+
153+
defp read_packet(file) do
154+
with {:ok, <<rid_idx::8, _recv_time::64, packet_size::32>>} <- read_exactly_n_bytes(file, 13),
155+
{:ok, packet_data} <- read_exactly_n_bytes(file, packet_size),
156+
{:ok, packet} <- ExRTP.Packet.decode(packet_data) do
157+
{:ok, rid_idx, packet}
158+
end
159+
end
160+
161+
defp read_exactly_n_bytes(file, byte_cnt) do
162+
with data when is_binary(data) <- IO.binread(file, byte_cnt),
163+
true <- byte_cnt == byte_size(data) do
164+
{:ok, data}
165+
else
166+
:eof -> :eof
167+
false -> {:error, :not_enough_data}
168+
{:error, _reason} = error -> error
169+
end
170+
end
171+
172+
defp insert_packet_to_store(store, packet) do
173+
case PacketStore.insert(store, packet) do
174+
{:ok, store} ->
175+
store
176+
177+
{:error, :late_packet} ->
178+
Logger.warning("Decoded late RTP packet")
179+
store
180+
end
181+
end
182+
end

0 commit comments

Comments
 (0)