Skip to content

[Broadcaster] Add recordings #77

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions broadcaster/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,7 @@ npm-debug.log
/node_modules
package.json
package-lock.json

# default recordings & converter output path
/recordings/
/converter_output/
27 changes: 26 additions & 1 deletion broadcaster/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,23 @@ read_ice_port_range! = fn ->
end
end

read_boolean! = fn env, default ->
if value = System.get_env(env) do
case String.downcase(value) do
"true" ->
true

"false" ->
false

_other ->
raise "Bad #{env} environment variable value. Expected true or false, got: #{value}"
end
else
default
end
end

dist_config =
case System.get_env("DISTRIBUTION_MODE") do
"k8s" -> read_k8s_dist_config!.()
Expand Down Expand Up @@ -89,9 +106,17 @@ pc_config = [
ice_port_range: read_ice_port_range!.()
]

recordings_config =
if read_boolean!.("RECORDINGS_ENABLED", false) do
[base_dir: System.get_env("RECORDINGS_BASE_DIR") || "./recordings"]
else
nil
end

config :broadcaster,
dist_config: dist_config,
pc_config: pc_config
pc_config: pc_config,
recordings_config: recordings_config

if System.get_env("PHX_SERVER") do
config :broadcaster, BroadcasterWeb.Endpoint, server: true
Expand Down
16 changes: 13 additions & 3 deletions broadcaster/lib/broadcaster/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,17 @@ defmodule Broadcaster.Application do
[{Cluster.Supervisor, [[cluster: config], [name: Broadcaster.ClusterSupervisor]]}]
end

# Start dist_config before starting Forwarder,
{recordings_enabled?, recordings_config} =
case Application.fetch_env!(:broadcaster, :recordings_config) do
nil ->
{false, []}

config ->
config = Keyword.put(config, :on_start, &Broadcaster.Forwarder.on_recorder_start/0)
{true, [{ExWebRTC.Recorder, [config, [name: Broadcaster.Recorder]]}]}
end

# Start dist_config before starting Forwarder,
# as Forwarder asks other nodes for their inputs.
children =
[
Expand All @@ -33,10 +43,10 @@ defmodule Broadcaster.Application do
dist_config ++
[
Broadcaster.PeerSupervisor,
Broadcaster.Forwarder,
{Broadcaster.Forwarder, [recordings_enabled?: recordings_enabled?]},
Broadcaster.ChatHistory,
{Registry, name: Broadcaster.ChatNicknamesRegistry, keys: :unique}
]
] ++ recordings_config

# See https://hexdocs.pm/elixir/Supervisor.html
# for other strategies and supported options
Expand Down
41 changes: 37 additions & 4 deletions broadcaster/lib/broadcaster/forwarder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ defmodule Broadcaster.Forwarder do
alias ExWebRTC.PeerConnection
alias ExWebRTC.RTP.H264
alias ExWebRTC.RTP.Munger
alias ExWebRTC.Recorder

alias Broadcaster.PeerSupervisor
alias BroadcasterWeb.Channel
Expand Down Expand Up @@ -39,6 +40,9 @@ defmodule Broadcaster.Forwarder do
}

@type state :: %{
# Options
recordings_enabled?: boolean(),

# WHIP
pending_inputs: %{pid() => id()},
local_inputs: %{pid() => input_spec()},
Expand All @@ -50,9 +54,11 @@ defmodule Broadcaster.Forwarder do
outputs: %{pid() => output_spec()}
}

@spec start_link(any()) :: GenServer.on_start()
def start_link(_arg) do
GenServer.start_link(__MODULE__, nil, name: __MODULE__)
@type options :: [recordings_enabled?: boolean()]

@spec start_link(options()) :: GenServer.on_start()
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

@spec set_layer(pid(), String.t()) :: :ok | :error
Expand Down Expand Up @@ -85,9 +91,15 @@ defmodule Broadcaster.Forwarder do
GenServer.call(__MODULE__, :local_inputs)
end

@spec on_recorder_start() :: [ExWebRTC.MediaStreamTrack.t()]
def on_recorder_start() do
GenServer.call(__MODULE__, :on_recorder_start)
end

@impl true
def init(_arg) do
def init(opts) do
state = %{
recordings_enabled?: Keyword.get(opts, :recordings_enabled?, false),
pending_inputs: %{},
local_inputs: %{},
remote_inputs: %{},
Expand Down Expand Up @@ -182,6 +194,16 @@ defmodule Broadcaster.Forwarder do
{:reply, :ok, %{state | pending_outputs: pending_outputs}}
end

@impl true
def handle_call(:on_recorder_start, _from, state) do
tracks =
state.local_inputs
|> Map.keys()
|> Enum.flat_map(&(get_tracks(&1, :receiver) |> Tuple.to_list()))

{:reply, tracks, state}
end

@impl true
def handle_info({:connect_timeout, pc}, state) do
direction =
Expand Down Expand Up @@ -210,6 +232,9 @@ defmodule Broadcaster.Forwarder do

{audio_track, video_track} = get_tracks(pc, :receiver)

if state.recordings_enabled?,
do: Recorder.add_tracks(Broadcaster.Recorder, [audio_track, video_track])

input = %{
pc: pc,
id: id,
Expand Down Expand Up @@ -272,10 +297,18 @@ defmodule Broadcaster.Forwarder do
cond do
input_track == input.audio and rid == nil ->
PubSub.broadcast(@pubsub, "input:#{input.id}", {:input, input_pc, :audio, nil, packet})

if state.recordings_enabled?,
do: Recorder.record(Broadcaster.Recorder, input_track, nil, packet)

forward_audio_packet(packet, input.id, state)

input_track == input.video ->
PubSub.broadcast(@pubsub, "input:#{input.id}", {:input, input_pc, :video, rid, packet})

if state.recordings_enabled?,
do: Recorder.record(Broadcaster.Recorder, input_track, rid, packet)

forward_video_packet(packet, input.id, rid, state)

true ->
Expand Down
2 changes: 1 addition & 1 deletion broadcaster/lib/broadcaster/peer_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ defmodule Broadcaster.PeerSupervisor do
@video_codecs [
%RTPCodecParameters{
payload_type: 96,
mime_type: "video/H264",
mime_type: "video/VP8",
clock_rate: 90_000
}
]
Expand Down
5 changes: 3 additions & 2 deletions broadcaster/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Broadcaster.MixProject do
def project do
[
app: :broadcaster,
version: "0.8.1",
version: "0.9.0-dev",
elixir: "~> 1.14",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down Expand Up @@ -58,7 +58,8 @@ defmodule Broadcaster.MixProject do
{:jason, "~> 1.2"},
{:bandit, "~> 1.2"},
{:corsica, "~> 2.1.3"},
{:ex_webrtc, "~> 0.7.0"},
# {:ex_webrtc, "~> 0.7.0"},
{:ex_webrtc, github: "elixir-webrtc/ex_webrtc", override: true},
{:ex_webrtc_dashboard, "~> 0.7.0"},
{:earmark, "~> 1.4"},
{:libcluster, "~> 3.4"},
Expand Down
Loading
Loading