@@ -176,46 +176,34 @@ class FirebaseFirestoreKtxRegistrar : ComponentRegistrar {
176
176
* Attach a snapshotListener to a DocumentReference and use it as a coroutine flow
177
177
* @param metadataChanges Indicates whether metadata-only changes
178
178
*/
179
- fun DocumentReference.toFlow (metadataChanges : MetadataChanges = MetadataChanges .EXCLUDE ) = flow {
180
- val channel = Channel <DocumentSnapshot >(Channel .CONFLATED )
181
- var listener: ListenerRegistration ? = null
182
- withContext(Dispatchers .Main .immediate) {
183
- listener = addSnapshotListener(metadataChanges) { value, error ->
184
- value?.let { channel.offer(it) }
185
- error?.let { channel.close(it) }
179
+ fun DocumentReference.toFlow (metadataChanges : MetadataChanges = MetadataChanges .EXCLUDE ) =
180
+ callbackFlow {
181
+ val registration = addSnapshotListener(metadataChanges) { snapshot, exception ->
182
+ if (exception != null ) {
183
+ cancel(message = " Error getting DocumentReference snapshot" , cause = exception)
184
+ }
185
+
186
+ if (snapshot != null ) {
187
+ trySend(snapshot)
188
+ }
186
189
}
187
- }
188
- try {
189
- for (value in channel) {
190
- emit(value)
191
- }
192
- } finally {
193
- withContext(Dispatchers .Main .immediate + NonCancellable ) {
194
- listener?.remove()
195
- }
196
- }
197
- }
190
+ awaitClose { registration.remove() }
191
+ }.buffer(Channel .CONFLATED )
198
192
199
193
/* *
200
194
* Attach a snapshotListener to a Query and use it as a coroutine flow
201
195
* @param metadataChanges Indicates whether metadata-only changes
202
196
*/
203
- fun Query.toFlow (metadataChanges : MetadataChanges = MetadataChanges .EXCLUDE ): Flow <QuerySnapshot > = flow {
204
- val channel = Channel <QuerySnapshot >(Channel .CONFLATED )
205
- var listener: ListenerRegistration ? = null
206
- withContext(Dispatchers .Main .immediate) {
207
- listener = addSnapshotListener(metadataChanges) { value, error ->
208
- value?.let { channel.offer(it) }
209
- error?.let { channel.close(it) }
210
- }
211
- }
212
- try {
213
- for (value in channel) {
214
- emit(value)
215
- }
216
- } finally {
217
- withContext(Dispatchers .Main .immediate + NonCancellable ) {
218
- listener?.remove()
197
+ fun Query.toFlow (metadataChanges : MetadataChanges = MetadataChanges .EXCLUDE ) =
198
+ callbackFlow {
199
+ val registration = addSnapshotListener(metadataChanges) { snapshot, exception ->
200
+ if (exception != null ) {
201
+ cancel(message = " Error getting Query snapshot" , cause = exception)
202
+ }
203
+
204
+ if (snapshot != null ) {
205
+ trySend(snapshot)
206
+ }
219
207
}
220
- }
221
- }
208
+ awaitClose { registration.remove() }
209
+ }.buffer( Channel . CONFLATED )
0 commit comments