Skip to content

Commit 06357bd

Browse files
committed
Merge branch 'develop' into sam/feat/dedicated-server-support
* develop: fix: subscribing to a message channel while unsubscribing is pending [MTT-3756] (#675) # Conflicts: # Assets/Scripts/Gameplay/GameState/ServerBossRoomState.cs
2 parents 2456c2a + a775625 commit 06357bd

File tree

5 files changed

+144
-22
lines changed

5 files changed

+144
-22
lines changed

Assets/Scenes/MainMenu.unity

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
version https://git-lfs.github.com/spec/v1
2-
oid sha256:f9162f4f410f9bf87f75cb0b933e0902b542ef09f9b1c1b7a2a93ce01ec11bf6
3-
size 67373
2+
oid sha256:3ca89d724f7bc5f0ef930a26462369191017995509ae4cdaefafe43010e62491
3+
size 68542

Assets/Scripts/Infrastructure/PubSub/MessageChannel.cs

Lines changed: 42 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@ public class MessageChannel<T> : IMessageChannel<T>
88
{
99
readonly List<Action<T>> m_MessageHandlers = new List<Action<T>>();
1010

11-
/// <summary>
12-
/// This queue of actions that would either add or remove subscriber is used to prevent problems from immediate modification
13-
/// of the list of subscribers. It could happen if one decides to unsubscribe in a message handler etc.
14-
/// </summary>
15-
readonly Queue<Action> m_PendingHandlers = new Queue<Action>();
11+
/// This dictionary of handlers to be either added or removed is used to prevent problems from immediate
12+
/// modification of the list of subscribers. It could happen if one decides to unsubscribe in a message handler
13+
/// etc.A true value means this handler should be added, and a false one means it should be removed
14+
readonly Dictionary<Action<T>, bool> m_PendingHandlers = new Dictionary<Action<T>, bool>();
1615

1716
public bool IsDisposed { get; private set; } = false;
1817

@@ -28,10 +27,18 @@ public virtual void Dispose()
2827

2928
public virtual void Publish(T message)
3029
{
31-
while (m_PendingHandlers.Count > 0)
30+
foreach (var handler in m_PendingHandlers.Keys)
3231
{
33-
m_PendingHandlers.Dequeue()?.Invoke();
32+
if (m_PendingHandlers[handler])
33+
{
34+
m_MessageHandlers.Add(handler);
35+
}
36+
else
37+
{
38+
m_MessageHandlers.Remove(handler);
39+
}
3440
}
41+
m_PendingHandlers.Clear();
3542

3643
foreach (var messageHandler in m_MessageHandlers)
3744
{
@@ -41,30 +48,46 @@ public virtual void Publish(T message)
4148

4249
public virtual IDisposable Subscribe(Action<T> handler)
4350
{
44-
Assert.IsTrue(!m_MessageHandlers.Contains(handler), "Attempting to subscribe with the same handler more than once");
45-
// so we don't modify the handler list while iterating on it (which could happen if the handler unsubscribes). With this, we'd unsubscribe on next publish.
46-
m_PendingHandlers.Enqueue(() => { DoSubscribe(handler); });
47-
48-
var subscription = new DisposableSubscription<T>(this, handler);
49-
return subscription;
51+
Assert.IsTrue(!IsSubscribed(handler), "Attempting to subscribe with the same handler more than once");
5052

51-
void DoSubscribe(Action<T> h)
53+
if (m_PendingHandlers.ContainsKey(handler))
5254
{
53-
if (h != null && !m_MessageHandlers.Contains(h))
55+
if (!m_PendingHandlers[handler])
5456
{
55-
m_MessageHandlers.Add(h);
57+
m_PendingHandlers.Remove(handler);
5658
}
5759
}
60+
else
61+
{
62+
m_PendingHandlers[handler] = true;
63+
}
64+
65+
var subscription = new DisposableSubscription<T>(this, handler);
66+
return subscription;
5867
}
5968

6069
public void Unsubscribe(Action<T> handler)
6170
{
62-
m_PendingHandlers.Enqueue(() => { DoUnsubscribe(handler); });
71+
Assert.IsTrue(IsSubscribed(handler), "Attempting to unsubscribe with a handler that is not subscribed");
6372

64-
void DoUnsubscribe(Action<T> h)
73+
if (m_PendingHandlers.ContainsKey(handler))
6574
{
66-
m_MessageHandlers.Remove(h);
75+
if (m_PendingHandlers[handler])
76+
{
77+
m_PendingHandlers.Remove(handler);
78+
}
6779
}
80+
else
81+
{
82+
m_PendingHandlers[handler] = false;
83+
}
84+
}
85+
86+
bool IsSubscribed(Action<T> handler)
87+
{
88+
var isPendingRemoval = m_PendingHandlers.ContainsKey(handler) && !m_PendingHandlers[handler];
89+
var isPendingAdding = m_PendingHandlers.ContainsKey(handler) && m_PendingHandlers[handler];
90+
return m_MessageHandlers.Contains(handler) && !isPendingRemoval || isPendingAdding;
6891
}
6992
}
7093
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
using System;
2+
using NUnit.Framework;
3+
using Unity.Multiplayer.Samples.BossRoom.Shared.Infrastructure;
4+
using UnityEngine;
5+
6+
namespace Unity.Multiplayer.Samples.BossRoom.Tests.Runtime
7+
{
8+
public class MessageChannelTests
9+
{
10+
struct EmptyMessage { }
11+
12+
int m_NbSubscribers = 2;
13+
int m_NbMessagesReceived;
14+
15+
IDisposable SubscribeToChannel(MessageChannel<EmptyMessage> channel)
16+
{
17+
var subscriptions = new DisposableGroup();
18+
subscriptions.Add(channel.Subscribe(Subscription1));
19+
subscriptions.Add(channel.Subscribe(Subscription2));
20+
21+
return subscriptions;
22+
}
23+
24+
void Subscription1(EmptyMessage message)
25+
{
26+
m_NbMessagesReceived++;
27+
}
28+
29+
void Subscription2(EmptyMessage message)
30+
{
31+
m_NbMessagesReceived++;
32+
}
33+
34+
35+
[SetUp]
36+
public void Setup()
37+
{
38+
m_NbMessagesReceived = 0;
39+
}
40+
41+
[Test]
42+
public void MessagePublishedIsReceivedByAllSubscribers()
43+
{
44+
var messageChannel = new MessageChannel<EmptyMessage>();
45+
var subscriptions = SubscribeToChannel(messageChannel);
46+
47+
messageChannel.Publish(new EmptyMessage());
48+
Assert.AreEqual(m_NbSubscribers, m_NbMessagesReceived);
49+
subscriptions.Dispose();
50+
}
51+
52+
[Test]
53+
public void MessagePublishedIsNotReceivedByAllSubscribersAfterUnsubscribing()
54+
{
55+
var messageChannel = new MessageChannel<EmptyMessage>();
56+
var subscriptions = SubscribeToChannel(messageChannel);
57+
58+
messageChannel.Publish(new EmptyMessage());
59+
Assert.AreEqual(m_NbSubscribers, m_NbMessagesReceived);
60+
61+
m_NbMessagesReceived = 0;
62+
63+
subscriptions.Dispose();
64+
65+
messageChannel.Publish(new EmptyMessage());
66+
Assert.AreEqual(0, m_NbMessagesReceived);
67+
}
68+
69+
[Test]
70+
public void MessagePublishedIsReceivedByAllSubscribersAfterResubscribing()
71+
{
72+
var messageChannel = new MessageChannel<EmptyMessage>();
73+
var subscriptions = SubscribeToChannel(messageChannel);
74+
75+
messageChannel.Publish(new EmptyMessage());
76+
Assert.AreEqual(m_NbSubscribers, m_NbMessagesReceived);
77+
78+
m_NbMessagesReceived = 0;
79+
80+
subscriptions.Dispose();
81+
subscriptions = SubscribeToChannel(messageChannel);
82+
83+
messageChannel.Publish(new EmptyMessage());
84+
Assert.AreEqual(m_NbSubscribers, m_NbMessagesReceived);
85+
subscriptions.Dispose();
86+
}
87+
}
88+
}

Assets/Tests/Runtime/MessageChannelTest.cs.meta

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ Additional documentation and release notes are available at [Multiplayer Documen
1818
### Removed
1919
*
2020
### Fixed
21-
*
21+
* Subscribing to a message channel while unsubscribing is pending (#675)
2222

2323
## [v1.3.0-pre] - 2022-06-23
2424

0 commit comments

Comments
 (0)