Skip to content

Commit a775625

Browse files
fix: subscribing to a message channel while unsubscribing is pending [MTT-3756] (#675)
* adding removal of pending sub/unsub when unsubscribing/subscribing * Adding tests * adding specificity to auto-injected GameObjects in MainMenu scene * Moving subscribing/unsubscribing to LifeStateChangedEventMessages in ServerBossRoomState to OnNetworkSpawn/Despawn
1 parent eb52c45 commit a775625

File tree

6 files changed

+148
-24
lines changed

6 files changed

+148
-24
lines changed

Assets/Scenes/MainMenu.unity

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
version https://git-lfs.github.com/spec/v1
2-
oid sha256:f9162f4f410f9bf87f75cb0b933e0902b542ef09f9b1c1b7a2a93ce01ec11bf6
3-
size 67373
2+
oid sha256:3ca89d724f7bc5f0ef930a26462369191017995509ae4cdaefafe43010e62491
3+
size 68542

Assets/Scripts/Gameplay/GameState/ServerBossRoomState.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,16 @@ void OnNetworkSpawn()
7171
return;
7272
}
7373

74+
m_Subscription = m_LifeStateChangedEventMessageSubscriber.Subscribe(OnLifeStateChangedEventMessage);
75+
7476
NetworkManager.Singleton.SceneManager.OnLoadComplete += OnServerLoadComplete;
7577
NetworkManager.Singleton.SceneManager.OnUnloadComplete += OnServerUnloadComplete;
7678
}
7779

7880
void OnNetworkDespawn()
7981
{
82+
m_Subscription?.Dispose();
83+
8084
NetworkManager.Singleton.SceneManager.OnLoadComplete -= OnServerLoadComplete;
8185
NetworkManager.Singleton.SceneManager.OnUnloadComplete -= OnServerUnloadComplete;
8286
}
@@ -96,7 +100,6 @@ void OnServerLoadComplete(ulong clientId, string sceneName, LoadSceneMode loadSc
96100
NetworkManager.Singleton.SceneManager.OnSynchronizeComplete += OnSynchronizeComplete;
97101

98102
SessionManager<SessionPlayerData>.Instance.OnSessionStarted();
99-
m_Subscription = m_LifeStateChangedEventMessageSubscriber.Subscribe(OnLifeStateChangedEventMessage);
100103
}
101104

102105
void OnServerUnloadComplete(ulong clientId, string sceneName)
@@ -109,7 +112,6 @@ void OnServerUnloadComplete(ulong clientId, string sceneName)
109112
NetworkManager.Singleton.OnClientDisconnectCallback -= OnClientDisconnect;
110113
NetworkManager.Singleton.SceneManager.OnLoadEventCompleted -= OnLoadEventCompleted;
111114
NetworkManager.Singleton.SceneManager.OnSynchronizeComplete -= OnSynchronizeComplete;
112-
m_Subscription?.Dispose();
113115
}
114116

115117
protected override void OnDestroy()

Assets/Scripts/Infrastructure/PubSub/MessageChannel.cs

Lines changed: 42 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@ public class MessageChannel<T> : IMessageChannel<T>
88
{
99
readonly List<Action<T>> m_MessageHandlers = new List<Action<T>>();
1010

11-
/// <summary>
12-
/// This queue of actions that would either add or remove subscriber is used to prevent problems from immediate modification
13-
/// of the list of subscribers. It could happen if one decides to unsubscribe in a message handler etc.
14-
/// </summary>
15-
readonly Queue<Action> m_PendingHandlers = new Queue<Action>();
11+
/// This dictionary of handlers to be either added or removed is used to prevent problems from immediate
12+
/// modification of the list of subscribers. It could happen if one decides to unsubscribe in a message handler
13+
/// etc.A true value means this handler should be added, and a false one means it should be removed
14+
readonly Dictionary<Action<T>, bool> m_PendingHandlers = new Dictionary<Action<T>, bool>();
1615

1716
public bool IsDisposed { get; private set; } = false;
1817

@@ -28,10 +27,18 @@ public virtual void Dispose()
2827

2928
public virtual void Publish(T message)
3029
{
31-
while (m_PendingHandlers.Count > 0)
30+
foreach (var handler in m_PendingHandlers.Keys)
3231
{
33-
m_PendingHandlers.Dequeue()?.Invoke();
32+
if (m_PendingHandlers[handler])
33+
{
34+
m_MessageHandlers.Add(handler);
35+
}
36+
else
37+
{
38+
m_MessageHandlers.Remove(handler);
39+
}
3440
}
41+
m_PendingHandlers.Clear();
3542

3643
foreach (var messageHandler in m_MessageHandlers)
3744
{
@@ -41,30 +48,46 @@ public virtual void Publish(T message)
4148

4249
public virtual IDisposable Subscribe(Action<T> handler)
4350
{
44-
Assert.IsTrue(!m_MessageHandlers.Contains(handler), "Attempting to subscribe with the same handler more than once");
45-
// so we don't modify the handler list while iterating on it (which could happen if the handler unsubscribes). With this, we'd unsubscribe on next publish.
46-
m_PendingHandlers.Enqueue(() => { DoSubscribe(handler); });
47-
48-
var subscription = new DisposableSubscription<T>(this, handler);
49-
return subscription;
51+
Assert.IsTrue(!IsSubscribed(handler), "Attempting to subscribe with the same handler more than once");
5052

51-
void DoSubscribe(Action<T> h)
53+
if (m_PendingHandlers.ContainsKey(handler))
5254
{
53-
if (h != null && !m_MessageHandlers.Contains(h))
55+
if (!m_PendingHandlers[handler])
5456
{
55-
m_MessageHandlers.Add(h);
57+
m_PendingHandlers.Remove(handler);
5658
}
5759
}
60+
else
61+
{
62+
m_PendingHandlers[handler] = true;
63+
}
64+
65+
var subscription = new DisposableSubscription<T>(this, handler);
66+
return subscription;
5867
}
5968

6069
public void Unsubscribe(Action<T> handler)
6170
{
62-
m_PendingHandlers.Enqueue(() => { DoUnsubscribe(handler); });
71+
Assert.IsTrue(IsSubscribed(handler), "Attempting to unsubscribe with a handler that is not subscribed");
6372

64-
void DoUnsubscribe(Action<T> h)
73+
if (m_PendingHandlers.ContainsKey(handler))
6574
{
66-
m_MessageHandlers.Remove(h);
75+
if (m_PendingHandlers[handler])
76+
{
77+
m_PendingHandlers.Remove(handler);
78+
}
6779
}
80+
else
81+
{
82+
m_PendingHandlers[handler] = false;
83+
}
84+
}
85+
86+
bool IsSubscribed(Action<T> handler)
87+
{
88+
var isPendingRemoval = m_PendingHandlers.ContainsKey(handler) && !m_PendingHandlers[handler];
89+
var isPendingAdding = m_PendingHandlers.ContainsKey(handler) && m_PendingHandlers[handler];
90+
return m_MessageHandlers.Contains(handler) && !isPendingRemoval || isPendingAdding;
6891
}
6992
}
7093
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
using System;
2+
using NUnit.Framework;
3+
using Unity.Multiplayer.Samples.BossRoom.Shared.Infrastructure;
4+
using UnityEngine;
5+
6+
namespace Unity.Multiplayer.Samples.BossRoom.Tests.Runtime
7+
{
8+
public class MessageChannelTests
9+
{
10+
struct EmptyMessage { }
11+
12+
int m_NbSubscribers = 2;
13+
int m_NbMessagesReceived;
14+
15+
IDisposable SubscribeToChannel(MessageChannel<EmptyMessage> channel)
16+
{
17+
var subscriptions = new DisposableGroup();
18+
subscriptions.Add(channel.Subscribe(Subscription1));
19+
subscriptions.Add(channel.Subscribe(Subscription2));
20+
21+
return subscriptions;
22+
}
23+
24+
void Subscription1(EmptyMessage message)
25+
{
26+
m_NbMessagesReceived++;
27+
}
28+
29+
void Subscription2(EmptyMessage message)
30+
{
31+
m_NbMessagesReceived++;
32+
}
33+
34+
35+
[SetUp]
36+
public void Setup()
37+
{
38+
m_NbMessagesReceived = 0;
39+
}
40+
41+
[Test]
42+
public void MessagePublishedIsReceivedByAllSubscribers()
43+
{
44+
var messageChannel = new MessageChannel<EmptyMessage>();
45+
var subscriptions = SubscribeToChannel(messageChannel);
46+
47+
messageChannel.Publish(new EmptyMessage());
48+
Assert.AreEqual(m_NbSubscribers, m_NbMessagesReceived);
49+
subscriptions.Dispose();
50+
}
51+
52+
[Test]
53+
public void MessagePublishedIsNotReceivedByAllSubscribersAfterUnsubscribing()
54+
{
55+
var messageChannel = new MessageChannel<EmptyMessage>();
56+
var subscriptions = SubscribeToChannel(messageChannel);
57+
58+
messageChannel.Publish(new EmptyMessage());
59+
Assert.AreEqual(m_NbSubscribers, m_NbMessagesReceived);
60+
61+
m_NbMessagesReceived = 0;
62+
63+
subscriptions.Dispose();
64+
65+
messageChannel.Publish(new EmptyMessage());
66+
Assert.AreEqual(0, m_NbMessagesReceived);
67+
}
68+
69+
[Test]
70+
public void MessagePublishedIsReceivedByAllSubscribersAfterResubscribing()
71+
{
72+
var messageChannel = new MessageChannel<EmptyMessage>();
73+
var subscriptions = SubscribeToChannel(messageChannel);
74+
75+
messageChannel.Publish(new EmptyMessage());
76+
Assert.AreEqual(m_NbSubscribers, m_NbMessagesReceived);
77+
78+
m_NbMessagesReceived = 0;
79+
80+
subscriptions.Dispose();
81+
subscriptions = SubscribeToChannel(messageChannel);
82+
83+
messageChannel.Publish(new EmptyMessage());
84+
Assert.AreEqual(m_NbSubscribers, m_NbMessagesReceived);
85+
subscriptions.Dispose();
86+
}
87+
}
88+
}

Assets/Tests/Runtime/MessageChannelTest.cs.meta

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ Additional documentation and release notes are available at [Multiplayer Documen
1818
### Removed
1919
*
2020
### Fixed
21-
*
21+
* Subscribing to a message channel while unsubscribing is pending (#675)
2222

2323
## [v1.3.0-pre] - 2022-06-23
2424

0 commit comments

Comments
 (0)