Skip to content

Commit 2ff2406

Browse files
committed
recent_senders: Introduce MessageIdTracker and RecentSenders data structures
MessageIdTracker data structure is used to keep track of message ids in an ascending sorted list. It is used in RecentSenders data structure. RecentSenders data structure is used to keep track of user messages in topics and streams. Much of this code is transcribed from Zulip web; in particular, from: https://github.com/zulip/zulip/blob/bd04a30bbc6dc5bd7c20940a3d1d34cf8c8c6f28/web/src/recent_senders.ts
1 parent e2b989d commit 2ff2406

File tree

5 files changed

+531
-0
lines changed

5 files changed

+531
-0
lines changed

lib/model/message_list.dart

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,7 @@ class MessageListView with ChangeNotifier, _MessageSequence {
403403
numAfter: 0,
404404
);
405405
store.reconcileMessages(result.messages);
406+
store.recentSenders.handleMessages(result.messages); // TODO(#824)
406407
for (final message in result.messages) {
407408
if (_messageVisible(message)) {
408409
_addMessage(message);
@@ -439,6 +440,7 @@ class MessageListView with ChangeNotifier, _MessageSequence {
439440
}
440441

441442
store.reconcileMessages(result.messages);
443+
store.recentSenders.handleMessages(result.messages); // TODO(#824)
442444

443445
final fetchedMessages = _allMessagesVisible
444446
? result.messages // Avoid unnecessarily copying the list.

lib/model/recent_senders.dart

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
import 'package:collection/collection.dart';
2+
import 'package:flutter/foundation.dart';
3+
4+
import '../api/model/events.dart';
5+
import '../api/model/model.dart';
6+
import 'algorithms.dart';
7+
8+
/// A data structure to keep track of stream and topic messages of users (senders).
9+
///
10+
/// Use [latestMessageIdOfSenderInStream] and [latestMessageIdOfSenderInTopic]
11+
/// to get the relevant data.
12+
class RecentSenders {
13+
// streamSenders[streamId][senderId] = MessageIdTracker
14+
@visibleForTesting
15+
final Map<int, Map<int, MessageIdTracker>> streamSenders = {};
16+
17+
// topicSenders[streamId][topic][senderId] = MessageIdTracker
18+
@visibleForTesting
19+
final Map<int, Map<String, Map<int, MessageIdTracker>>> topicSenders = {};
20+
21+
int? latestMessageIdOfSenderInStream({
22+
required int streamId,
23+
required int senderId,
24+
}) => streamSenders[streamId]?[senderId]?.maxId;
25+
26+
int? latestMessageIdOfSenderInTopic({
27+
required int streamId,
28+
required String topic,
29+
required int senderId,
30+
}) => topicSenders[streamId]?[topic]?[senderId]?.maxId;
31+
32+
/// Records the necessary data from each message if it is a [StreamMessage].
33+
///
34+
/// [messages] should be sorted by [id] ascendingly, which are, the way app
35+
/// receives and handles messages.
36+
void handleMessages(List<Message> messages) {
37+
final messagesByUserInStream = <(int, int), List<int>>{};
38+
final messagesByUserInTopic = <(int, String, int), List<int>>{};
39+
for (final message in messages) {
40+
if (message is! StreamMessage) continue;
41+
final StreamMessage(:streamId, :topic, :senderId, id: int messageId) = message;
42+
(messagesByUserInStream[(streamId, senderId)] ??= []).add(messageId);
43+
(messagesByUserInTopic[(streamId, topic, senderId)] ??= []).add(messageId);
44+
}
45+
46+
for (final entry in messagesByUserInStream.entries) {
47+
final (streamId, senderId) = entry.key;
48+
((streamSenders[streamId] ??= {})
49+
[senderId] ??= MessageIdTracker()).addAll(entry.value);
50+
}
51+
for (final entry in messagesByUserInTopic.entries) {
52+
final (streamId, topic, senderId) = entry.key;
53+
(((topicSenders[streamId] ??= {})[topic] ??= {})
54+
[senderId] ??= MessageIdTracker()).addAll(entry.value);
55+
}
56+
}
57+
58+
/// Records the necessary data from [message] if it is a [StreamMessage].
59+
///
60+
/// If [message] is not a [StreamMessage], this is a no-op.
61+
void handleMessage(Message message) {
62+
if (message is! StreamMessage) return;
63+
final StreamMessage(:streamId, :topic, :senderId, id: int messageId) = message;
64+
((streamSenders[streamId] ??= {})
65+
[senderId] ??= MessageIdTracker()).add(messageId);
66+
(((topicSenders[streamId] ??= {})[topic] ??= {})
67+
[senderId] ??= MessageIdTracker()).add(messageId);
68+
}
69+
70+
void handleDeleteMessageEvent(DeleteMessageEvent event, Map<int, Message> cachedMessages) {
71+
if (event.messageType != MessageType.stream) return;
72+
73+
final messagesByUser = <int, List<int>>{};
74+
for (final id in event.messageIds) {
75+
final message = cachedMessages[id] as StreamMessage?;
76+
if (message == null) continue;
77+
(messagesByUser[message.senderId] ??= []).add(id);
78+
}
79+
80+
final DeleteMessageEvent(:streamId!, :topic!) = event;
81+
final sendersByStream = streamSenders[streamId];
82+
final topicsByStream = topicSenders[streamId];
83+
final sendersByTopic = topicsByStream?[topic];
84+
for (final entry in messagesByUser.entries) {
85+
final MapEntry(key: senderId, value: messages) = entry;
86+
87+
final messagesBySenderInStream = sendersByStream?[senderId];
88+
messagesBySenderInStream?.removeAll(messages);
89+
if (messagesBySenderInStream?.maxId == null) sendersByStream?.remove(senderId);
90+
91+
final messagesBySenderInTopic = sendersByTopic?[senderId];
92+
messagesBySenderInTopic?.removeAll(messages);
93+
if (messagesBySenderInTopic?.maxId == null) sendersByTopic?.remove(senderId);
94+
}
95+
if (sendersByStream?.isEmpty ?? false) streamSenders.remove(streamId);
96+
if (sendersByTopic?.isEmpty ?? false) topicsByStream?.remove(topic);
97+
if (topicsByStream?.isEmpty ?? false) topicSenders.remove(streamId);
98+
}
99+
}
100+
101+
@visibleForTesting
102+
class MessageIdTracker {
103+
/// A list of distinct message IDs, sorted ascendingly.
104+
@visibleForTesting
105+
QueueList<int> ids = QueueList();
106+
107+
/// The maximum id in the tracker list, or `null` if the list is empty.
108+
int? get maxId => ids.lastOrNull;
109+
110+
/// Add the message ID to the tracker list at the proper place, if not present.
111+
///
112+
/// Optimized, taking O(1) time for the case where that place is the end,
113+
/// because that's the common case for a message that is received through
114+
/// [PerAccountStore.handleEvent]. May take O(n) time in some rare cases.
115+
void add(int id) {
116+
if (ids.isEmpty || id > ids.last) {
117+
ids.addLast(id);
118+
return;
119+
}
120+
final i = lowerBound(ids, id);
121+
if (i < ids.length && ids[i] == id) {
122+
// The ID is already present. Nothing to do.
123+
return;
124+
}
125+
ids.insert(i, id);
126+
}
127+
128+
/// Add the messages IDs to the tracker list at the proper place, if not present.
129+
///
130+
/// [newIds] should be sorted ascendingly.
131+
void addAll(List<int> newIds) {
132+
if (ids.isEmpty) {
133+
ids = QueueList.from(newIds);
134+
return;
135+
}
136+
ids = setUnion(ids, newIds);
137+
}
138+
139+
void removeAll(List<int> idsToRemove) {
140+
ids.removeWhere((id) {
141+
final i = lowerBound(idsToRemove, id);
142+
return i < idsToRemove.length && idsToRemove[i] == id;
143+
});
144+
}
145+
146+
@override
147+
String toString() => ids.toString();
148+
}

lib/model/store.dart

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import 'database.dart';
2323
import 'message.dart';
2424
import 'message_list.dart';
2525
import 'recent_dm_conversations.dart';
26+
import 'recent_senders.dart';
2627
import 'channel.dart';
2728
import 'typing_status.dart';
2829
import 'unreads.dart';
@@ -256,6 +257,7 @@ class PerAccountStore extends ChangeNotifier with ChannelStore, MessageStore {
256257
),
257258
recentDmConversationsView: RecentDmConversationsView(
258259
initial: initialSnapshot.recentPrivateConversations, selfUserId: account.userId),
260+
recentSenders: RecentSenders(),
259261
);
260262
}
261263

@@ -276,6 +278,7 @@ class PerAccountStore extends ChangeNotifier with ChannelStore, MessageStore {
276278
required MessageStoreImpl messages,
277279
required this.unreads,
278280
required this.recentDmConversationsView,
281+
required this.recentSenders,
279282
}) : assert(selfUserId == globalStore.getAccount(accountId)!.userId),
280283
assert(realmUrl == globalStore.getAccount(accountId)!.realmUrl),
281284
assert(realmUrl == connection.realmUrl),
@@ -369,6 +372,8 @@ class PerAccountStore extends ChangeNotifier with ChannelStore, MessageStore {
369372

370373
final RecentDmConversationsView recentDmConversationsView;
371374

375+
final RecentSenders recentSenders;
376+
372377
////////////////////////////////
373378
// Other digests of data.
374379

@@ -492,6 +497,7 @@ class PerAccountStore extends ChangeNotifier with ChannelStore, MessageStore {
492497
_messages.handleMessageEvent(event);
493498
unreads.handleMessageEvent(event);
494499
recentDmConversationsView.handleMessageEvent(event);
500+
recentSenders.handleMessage(event.message); // TODO(#824)
495501
// When adding anything here (to handle [MessageEvent]),
496502
// it probably belongs in [reconcileMessages] too.
497503

@@ -502,6 +508,11 @@ class PerAccountStore extends ChangeNotifier with ChannelStore, MessageStore {
502508

503509
case DeleteMessageEvent():
504510
assert(debugLog("server event: delete_message ${event.messageIds}"));
511+
// This should be called before [_messages.handleDeleteMessageEvent(event)],
512+
// as we need to know about each message for [event.messageIds],
513+
// specifically, their `senderId`s. By calling this after the
514+
// aforementioned line, we'll lose reference to those messages.
515+
recentSenders.handleDeleteMessageEvent(event, messages);
505516
_messages.handleDeleteMessageEvent(event);
506517
unreads.handleDeleteMessageEvent(event);
507518

test/model/message_list_test.dart

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import '../api/model/model_checks.dart';
1818
import '../example_data.dart' as eg;
1919
import '../stdlib_checks.dart';
2020
import 'content_checks.dart';
21+
import 'recent_senders_test.dart' as recent_senders_test;
2122
import 'test_store.dart';
2223

2324
void main() {
@@ -141,6 +142,25 @@ void main() {
141142
..haveOldest.isTrue();
142143
});
143144

145+
// TODO(#824): move this test
146+
test('fetchInitial, recent senders track all the messages', () async {
147+
const narrow = CombinedFeedNarrow();
148+
await prepare(narrow: narrow);
149+
final messages = [
150+
eg.streamMessage(),
151+
// Not subscribed to the stream with id 10.
152+
eg.streamMessage(stream: eg.stream(streamId: 10)),
153+
];
154+
connection.prepare(json: newestResult(
155+
foundOldest: false,
156+
messages: messages,
157+
).toJson());
158+
await model.fetchInitial();
159+
160+
check(model).messages.length.equals(1);
161+
recent_senders_test.checkMatchesMessages(store.recentSenders, messages);
162+
});
163+
144164
test('fetchOlder', () async {
145165
const narrow = CombinedFeedNarrow();
146166
await prepare(narrow: narrow);
@@ -233,6 +253,27 @@ void main() {
233253
..messages.length.equals(200);
234254
});
235255

256+
// TODO(#824): move this test
257+
test('fetchOlder, recent senders track all the messages', () async {
258+
const narrow = CombinedFeedNarrow();
259+
await prepare(narrow: narrow);
260+
final initialMessages = List.generate(10, (i) => eg.streamMessage(id: 100 + i));
261+
await prepareMessages(foundOldest: false, messages: initialMessages);
262+
263+
final oldMessages = List.generate(10, (i) => eg.streamMessage(id: 89 + i))
264+
// Not subscribed to the stream with id 10.
265+
..add(eg.streamMessage(id: 99, stream: eg.stream(streamId: 10)));
266+
connection.prepare(json: olderResult(
267+
anchor: 100, foundOldest: false,
268+
messages: oldMessages,
269+
).toJson());
270+
await model.fetchOlder();
271+
272+
check(model).messages.length.equals(20);
273+
recent_senders_test.checkMatchesMessages(store.recentSenders,
274+
[...initialMessages, ...oldMessages]);
275+
});
276+
236277
test('MessageEvent', () async {
237278
final stream = eg.stream();
238279
await prepare(narrow: StreamNarrow(stream.streamId));

0 commit comments

Comments
 (0)