Skip to content

Recorder+Converter enhancements #197

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 191 additions & 36 deletions lib/ex_webrtc/recorder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,52 @@ defmodule ExWebRTC.Recorder do
@moduledoc """
Saves received RTP packets to a file for later processing/analysis.

Dumps raw RTP packets fed to it in a custom format. Use `Recorder.Converter` to process them.
"""
Dumps raw RTP packets fed to it in a custom format. Use `ExWebRTC.Recorder.Converter` to process them.

use GenServer
Can optionally upload the saved files to S3-compatible storage.
See `ExWebRTC.Recorder.S3` and `t:options/0` for more info.
"""

alias ExWebRTC.MediaStreamTrack
alias __MODULE__.S3

require Logger

use GenServer

@default_base_dir "./recordings"

@type recorder :: GenServer.server()

@typedoc """
Options that can be passed to `start_link/1`.

* `base_dir` - Base directory where Recorder will save its artifacts. `#{@default_base_dir}` by default.
* `on_start` - Callback that will be executed just after the Recorder is (re)started.
It should return the initial list of tracks to be added.
* `:base_dir` - Base directory where Recorder will save its artifacts. `#{@default_base_dir}` by default.
* `:on_start` - Callback that will be executed just after the Recorder is (re)started.
It should return the initial list of tracks to be added.
* `:controlling_process` - PID of a process where all messages will be sent. `self()` by default.
* `:s3_upload_config` - If passed, finished recordings will be uploaded to S3-compatible storage.
See `t:ExWebRTC.Recorder.S3.upload_config/0` for more info.
"""
@type option ::
{:base_dir, String.t()}
| {:on_start, (-> [MediaStreamTrack.t()])}
| {:controlling_process, Process.dest()}
| {:s3_upload_config, S3.upload_config()}

@type options :: [option()]

@typedoc """
Messages sent by the `ExWebRTC.Recorder` process to its controlling process.

* `:upload_complete`, `:upload_failed` - Sent after the completion of the upload task, identified by its reference.
Contains the updated manifest with `s3://` scheme URLs to uploaded files.
"""
@type message ::
{:ex_webrtc_recorder, pid(),
{:upload_complete, S3.upload_task_ref(), __MODULE__.Manifest.t()}
| {:upload_failed, S3.upload_task_ref(), __MODULE__.Manifest.t()}}

# Necessary to start Recorder under a supervisor using `{Recorder, [recorder_opts, gen_server_opts]}`
@doc false
@spec child_spec(list()) :: Supervisor.child_spec()
Expand All @@ -44,7 +66,11 @@ defmodule ExWebRTC.Recorder do
"""
@spec start(options(), GenServer.options()) :: GenServer.on_start()
def start(recorder_opts \\ [], gen_server_opts \\ []) do
GenServer.start(__MODULE__, recorder_opts, gen_server_opts)
config =
recorder_opts
|> Keyword.put_new(:controlling_process, self())

GenServer.start(__MODULE__, config, gen_server_opts)
end

@doc """
Expand All @@ -54,13 +80,20 @@ defmodule ExWebRTC.Recorder do
"""
@spec start_link(options(), GenServer.options()) :: GenServer.on_start()
def start_link(recorder_opts \\ [], gen_server_opts \\ []) do
GenServer.start_link(__MODULE__, recorder_opts, gen_server_opts)
config =
recorder_opts
|> Keyword.put_new(:controlling_process, self())

GenServer.start_link(__MODULE__, config, gen_server_opts)
end

@doc """
Adds new tracks to the recording.

Returns the part of the recording manifest that's relevant to the freshly added tracks.
See `t:ExWebRTC.Recorder.Manifest.t/0` for more info.
"""
@spec add_tracks(GenServer.server(), [MediaStreamTrack.t()]) :: :ok
@spec add_tracks(recorder(), [MediaStreamTrack.t()]) :: {:ok, __MODULE__.Manifest.t()}
def add_tracks(recorder, tracks) do
GenServer.call(recorder, {:add_tracks, tracks})
end
Expand All @@ -69,7 +102,7 @@ defmodule ExWebRTC.Recorder do
Records a received packet on the given track.
"""
@spec record(
GenServer.server(),
recorder(),
MediaStreamTrack.id(),
MediaStreamTrack.rid() | nil,
ExRTP.Packet.t()
Expand All @@ -79,6 +112,37 @@ defmodule ExWebRTC.Recorder do
GenServer.cast(recorder, {:record, track_id, rid, recv_time, packet})
end

@doc """
Changes the controlling process of this `recorder` process.

Controlling process is a process that receives all of the messages (described
by `t:message/0`) from this Recorder.
"""
@spec controlling_process(recorder(), Process.dest()) :: :ok
def controlling_process(recorder, controlling_process) do
GenServer.call(recorder, {:controlling_process, controlling_process})
end

@doc """
Finishes the recording for the given tracks and optionally uploads the result files.

Returns the part of the recording manifest that's relevant to the freshly ended tracks.
See `t:ExWebRTC.Recorder.Manifest.t/0` for more info.

If uploads are configured:
* Returns the reference to the upload task that was spawned.
* Will send the `:upload_complete`/`:upload_failed` message with this reference
to the controlling process when the task finishes.

Note that the manifest returned by this function always contains local paths to files.
The updated manifest with `s3://` scheme URLs is sent in the aforementioned message.
"""
@spec end_tracks(recorder(), [MediaStreamTrack.id()]) ::
{:ok, __MODULE__.Manifest.t(), S3.upload_task_ref() | nil} | {:error, :tracks_not_found}
def end_tracks(recorder, track_ids) do
GenServer.call(recorder, {:end_tracks, track_ids})
end

@impl true
def init(config) do
base_dir =
Expand All @@ -89,9 +153,18 @@ defmodule ExWebRTC.Recorder do
:ok = File.mkdir_p!(base_dir)
Logger.info("Starting recorder. Recordings will be saved under: #{base_dir}")

upload_handler =
if config[:s3_upload_config] do
Logger.info("Recordings will be uploaded to S3")
S3.UploadHandler.new(config[:s3_upload_config])
end

state = %{
owner: config[:controlling_process],
base_dir: base_dir,
tracks: %{}
manifest_path: Path.join(base_dir, "manifest.json"),
track_data: %{},
upload_handler: upload_handler
}

case config[:on_start] do
Expand All @@ -110,30 +183,53 @@ defmodule ExWebRTC.Recorder do
{:noreply, state}

tracks ->
state = do_add_tracks(tracks, state)
{_manifest_diff, state} = do_add_tracks(tracks, state)
{:noreply, state}
end
end

@impl true
def handle_call({:add_tracks, tracks}, _from, state) do
state = do_add_tracks(tracks, state)
def handle_call({:controlling_process, controlling_process}, _from, state) do
state = %{state | owner: controlling_process}
{:reply, :ok, state}
end

@impl true
def handle_cast({:record, track_id, rid, recv_time, packet}, state)
when is_map_key(state.tracks, track_id) do
%{file: file, rid_map: rid_map} = state.tracks[track_id]
def handle_call({:add_tracks, tracks}, _from, state) do
{manifest_diff, state} = do_add_tracks(tracks, state)
{:reply, {:ok, manifest_diff}, state}
end

@impl true
def handle_call({:end_tracks, track_ids}, _from, state) do
case Enum.filter(track_ids, &Map.has_key?(state.track_data, &1)) do
[] ->
{:reply, {:error, :tracks_not_found}, state}

case rid_map do
%{^rid => rid_idx} ->
:ok = IO.binwrite(file, serialize_packet(packet, rid_idx, recv_time))
track_ids ->
{manifest_diff, ref, state} = do_end_tracks(track_ids, state)
{:reply, {:ok, manifest_diff, ref}, state}
end
end

_other ->
@impl true
def handle_cast({:record, track_id, rid, recv_time, packet}, state)
when is_map_key(state.track_data, track_id) do
%{file: file, rid_map: rid_map} = state.track_data[track_id]

with {:ok, rid_idx} <- Map.fetch(rid_map, rid),
false <- is_nil(file) do
:ok = IO.binwrite(file, serialize_packet(packet, rid_idx, recv_time))
else
:error ->
Logger.warning("""
Tried to save packet for unknown rid. Ignoring. Track id: #{inspect(track_id)}, rid: #{inspect(rid)}.\
""")

true ->
Logger.warning("""
Tried to save packet for track which has been ended. Ignoring. Track id: #{inspect(track_id)} \
""")
end

{:noreply, state}
Expand All @@ -148,6 +244,29 @@ defmodule ExWebRTC.Recorder do
{:noreply, state}
end

@impl true
def handle_info({ref, _res} = task_result, state) when is_reference(ref) do
if state.upload_handler do
{result, manifest, handler} =
S3.UploadHandler.process_result(state.upload_handler, task_result)

case result do
:ok ->
send(state.owner, {:ex_webrtc_recorder, self(), {:upload_complete, ref, manifest}})

{:error, :upload_failed} ->
send(state.owner, {:ex_webrtc_recorder, self(), {:upload_failed, ref, manifest}})

{:error, :unknown_task} ->
raise "Upload handler encountered result of unknown task"
end

{:noreply, %{state | upload_handler: handler}}
else
{:noreply, state}
end
end

@impl true
def handle_info(_msg, state) do
{:noreply, state}
Expand All @@ -156,28 +275,64 @@ defmodule ExWebRTC.Recorder do
defp do_add_tracks(tracks, state) do
start_time = DateTime.utc_now()

tracks =
new_track_data =
Map.new(tracks, fn track ->
path = Path.join(state.base_dir, "#{track.id}.rtpx")
file = File.open!(path, [:write])
rid_map = (track.rids || [nil]) |> Enum.with_index() |> Map.new()

{track.id,
%{kind: track.kind, rid_map: rid_map, path: path, file: file, start_time: start_time}}
file_path = Path.join(state.base_dir, "#{track.id}.rtpx")

track_entry = %{
start_time: start_time,
kind: track.kind,
streams: track.streams,
rid_map: (track.rids || [nil]) |> Enum.with_index() |> Map.new(),
location: file_path,
file: File.open!(file_path, [:write])
}

{track.id, track_entry}
end)

state = %{state | tracks: Map.merge(state.tracks, tracks)}
report_path = Path.join(state.base_dir, "report.json")
manifest_diff = to_manifest(new_track_data)

state = %{state | track_data: Map.merge(state.track_data, new_track_data)}

:ok = File.write!(state.manifest_path, state.track_data |> to_manifest() |> Jason.encode!())

report =
Map.new(state.tracks, fn {id, track} ->
track = Map.delete(track, :file)
{id, track}
{manifest_diff, state}
end

defp do_end_tracks(track_ids, state) do
# We're keeping entries from `track_data` for ended tracks
# because they need to be present in the global manifest,
# which gets recreated on each call to `add_tracks`
state =
Enum.reduce(track_ids, state, fn track_id, state ->
%{file: file} = state.track_data[track_id]
File.close(file)

put_in(state, [:track_data, track_id, :file], nil)
end)

:ok = File.write!(report_path, Jason.encode!(report))
manifest_diff = to_manifest(state.track_data, track_ids)

case state.upload_handler do
nil ->
{manifest_diff, nil, state}

handler ->
{ref, handler} = S3.UploadHandler.spawn_task(handler, manifest_diff)

{manifest_diff, ref, %{state | upload_handler: handler}}
end
end

defp to_manifest(track_data, track_ids) do
track_data |> Map.take(track_ids) |> to_manifest()
end

state
defp to_manifest(track_data) do
Map.new(track_data, fn {id, track} ->
{id, Map.delete(track, :file)}
end)
end

defp serialize_packet(packet, rid_idx, recv_time) do
Expand Down
Loading