@@ -26,10 +26,15 @@ import com.google.firebase.firestore.QueryDocumentSnapshot
26
26
import com.google.firebase.firestore.QuerySnapshot
27
27
import com.google.firebase.firestore.FirebaseFirestoreSettings
28
28
import com.google.firebase.firestore.MetadataChanges
29
+ import com.google.firebase.firestore.ListenerRegistration
29
30
import com.google.firebase.ktx.Firebase
30
31
import com.google.firebase.platforminfo.LibraryVersionComponent
31
- import kotlinx.coroutines.channels.awaitClose
32
- import kotlinx.coroutines.flow.callbackFlow
32
+ import kotlinx.coroutines.Dispatchers
33
+ import kotlinx.coroutines.NonCancellable
34
+ import kotlinx.coroutines.channels.Channel
35
+ import kotlinx.coroutines.flow.Flow
36
+ import kotlinx.coroutines.flow.flow
37
+ import kotlinx.coroutines.withContext
33
38
34
39
/* * Returns the [FirebaseFirestore] instance of the default [FirebaseApp]. */
35
40
val Firebase .firestore: FirebaseFirestore
@@ -171,46 +176,46 @@ class FirebaseFirestoreKtxRegistrar : ComponentRegistrar {
171
176
* Attach a snapshotListener to a DocumentReference and use it as a coroutine flow
172
177
* @param metadataChanges Indicates whether metadata-only changes
173
178
*/
174
- fun DocumentReference.toFlow (metadataChanges : MetadataChanges = MetadataChanges .EXCLUDE ) = callbackFlow {
175
- val listener = addSnapshotListener(metadataChanges) { value, error ->
176
- if (value != null && value.exists()) {
177
- /* *
178
- * Offer will throw if the channel is canceled.
179
- * To avoid that, we wrap the call in runCatching.
180
- * See https://github.com/Kotlin/kotlinx.coroutines/issues/974
181
- */
182
- runCatching {
183
- offer(value)
184
- }
185
- } else if (error != null ) {
186
- close(error)
179
+ fun DocumentReference.toFlow (metadataChanges : MetadataChanges = MetadataChanges .EXCLUDE ) = flow {
180
+ val channel = Channel <DocumentSnapshot >(Channel .CONFLATED )
181
+ var listener: ListenerRegistration ? = null
182
+ withContext(Dispatchers .Main .immediate) {
183
+ listener = addSnapshotListener(metadataChanges) { value, error ->
184
+ value?.let { channel.offer(it) }
185
+ error?.let { channel.close(it) }
187
186
}
188
187
}
189
- awaitClose {
190
- listener.remove()
188
+ try {
189
+ for (value in channel) {
190
+ emit(value)
191
+ }
192
+ } finally {
193
+ withContext(Dispatchers .Main .immediate + NonCancellable ) {
194
+ listener?.remove()
195
+ }
191
196
}
192
197
}
193
198
194
199
/* *
195
200
* Attach a snapshotListener to a Query and use it as a coroutine flow
196
201
* @param metadataChanges Indicates whether metadata-only changes
197
202
*/
198
- fun Query.toFlow (metadataChanges : MetadataChanges = MetadataChanges .EXCLUDE ) = callbackFlow {
199
- val listener = addSnapshotListener(metadataChanges) { value, error ->
200
- if (value != null ) {
201
- /* *
202
- * Offer will throw if the channel is canceled.
203
- * To avoid that, we wrap the call in runCatching.
204
- * See https://github.com/Kotlin/kotlinx.coroutines/issues/974
205
- */
206
- runCatching {
207
- offer(value)
208
- }
209
- } else if (error != null ) {
210
- close(error)
203
+ fun Query.toFlow (metadataChanges : MetadataChanges = MetadataChanges .EXCLUDE ): Flow <QuerySnapshot > = flow {
204
+ val channel = Channel <QuerySnapshot >(Channel .CONFLATED )
205
+ var listener: ListenerRegistration ? = null
206
+ withContext(Dispatchers .Main .immediate) {
207
+ listener = addSnapshotListener(metadataChanges) { value, error ->
208
+ value?.let { channel.offer(it) }
209
+ error?.let { channel.close(it) }
211
210
}
212
211
}
213
- awaitClose {
214
- listener.remove()
212
+ try {
213
+ for (value in channel) {
214
+ emit(value)
215
+ }
216
+ } finally {
217
+ withContext(Dispatchers .Main .immediate + NonCancellable ) {
218
+ listener?.remove()
219
+ }
215
220
}
216
221
}
0 commit comments