Skip to content

Commit be46322

Browse files
committed
add "bufferCapacity" as a parameter
1 parent 295ffc8 commit be46322

File tree

3 files changed

+52
-26
lines changed

3 files changed

+52
-26
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import com.google.firebase.gradle.plugins.license.LicenseResolverPlugin
1616
import com.google.firebase.gradle.MultiProjectReleasePlugin
1717

1818
buildscript {
19-
ext.kotlinVersion = '1.3.72'
19+
ext.kotlinVersion = '1.4.32'
2020
repositories {
2121
google()
2222
mavenCentral()

firebase-firestore/ktx/ktx.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ dependencies {
5151
implementation project(':firebase-common:ktx')
5252
implementation project(':firebase-firestore')
5353
implementation 'androidx.annotation:annotation:1.1.0'
54-
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.3'
54+
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.1'
5555
implementation 'com.google.android.gms:play-services-base:17.0.0'
5656
testImplementation project(':firebase-database-collection')
5757
testImplementation 'org.mockito:mockito-core:2.25.0'

firebase-firestore/ktx/src/main/kotlin/com/google/firebase/firestore/ktx/Firestore.kt

Lines changed: 50 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -14,27 +14,21 @@
1414

1515
package com.google.firebase.firestore.ktx
1616

17+
import android.support.multidex.BuildConfig
1718
import androidx.annotation.Keep
1819
import com.google.firebase.FirebaseApp
1920
import com.google.firebase.components.Component
2021
import com.google.firebase.components.ComponentRegistrar
21-
import com.google.firebase.firestore.DocumentSnapshot
22-
import com.google.firebase.firestore.FieldPath
23-
import com.google.firebase.firestore.FirebaseFirestore
24-
import com.google.firebase.firestore.FirebaseFirestoreSettings
25-
import com.google.firebase.firestore.QueryDocumentSnapshot
26-
import com.google.firebase.firestore.QuerySnapshot
27-
import com.google.firebase.firestore.FirebaseFirestoreSettings
28-
import com.google.firebase.firestore.MetadataChanges
29-
import com.google.firebase.firestore.ListenerRegistration
22+
import com.google.firebase.firestore.*
3023
import com.google.firebase.ktx.Firebase
3124
import com.google.firebase.platforminfo.LibraryVersionComponent
32-
import kotlinx.coroutines.Dispatchers
33-
import kotlinx.coroutines.NonCancellable
25+
import kotlinx.coroutines.*
3426
import kotlinx.coroutines.channels.Channel
27+
import kotlinx.coroutines.channels.awaitClose
3528
import kotlinx.coroutines.flow.Flow
29+
import kotlinx.coroutines.flow.buffer
30+
import kotlinx.coroutines.flow.callbackFlow
3631
import kotlinx.coroutines.flow.flow
37-
import kotlinx.coroutines.withContext
3832

3933
/** Returns the [FirebaseFirestore] instance of the default [FirebaseApp]. */
4034
val Firebase.firestore: FirebaseFirestore
@@ -173,37 +167,69 @@ class FirebaseFirestoreKtxRegistrar : ComponentRegistrar {
173167
}
174168

175169
/**
176-
* Attach a snapshotListener to a DocumentReference and use it as a coroutine flow
177-
* @param metadataChanges Indicates whether metadata-only changes
170+
* Transforms a [DocumentReference] into a coroutine [Flow]
171+
*
172+
* **Backpressure handling**: by default this method conflates items. If the consumer isn't fast enough,
173+
* it might miss some values but is always guaranteed to get the latest value. Use [bufferCapacity]
174+
* to change that behaviour
175+
*
176+
* @param metadataChanges controls metadata-only changes. Default: [MetadataChanges.EXCLUDE]
177+
* @param bufferCapacity the buffer capacity as in [Flow.buffer] or null to not buffer at all
178178
*/
179-
fun DocumentReference.toFlow(metadataChanges: MetadataChanges = MetadataChanges.EXCLUDE) =
180-
callbackFlow {
179+
fun DocumentReference.toFlow(
180+
metadataChanges: MetadataChanges = MetadataChanges.EXCLUDE,
181+
bufferCapacity: Int? = Channel.CONFLATED
182+
): Flow<DocumentSnapshot?> {
183+
val flow = callbackFlow {
181184
val registration = addSnapshotListener(metadataChanges) { snapshot, exception ->
182185
if (exception != null) {
183-
cancel(message = "Error getting DocumentReference snapshot", cause = exception)
186+
cancel(CancellationException("Error getting DocumentReference snapshot", exception))
184187
}
185188

186189
if (snapshot != null) {
187190
trySend(snapshot)
188191
}
189192
}
190193
awaitClose { registration.remove() }
191-
}.buffer(Channel.CONFLATED)
194+
}
195+
196+
return if (bufferCapacity != null) {
197+
flow.buffer(bufferCapacity)
198+
} else {
199+
flow
200+
}
201+
}
192202

193203
/**
194-
* Attach a snapshotListener to a Query and use it as a coroutine flow
195-
* @param metadataChanges Indicates whether metadata-only changes
204+
* Transforms a [Query] into a coroutine [Flow]
205+
*
206+
* **Backpressure handling**: by default this method conflates items. If the consumer isn't fast enough,
207+
* it might miss some values but is always guaranteed to get the latest value. Use [bufferCapacity]
208+
* to change that behaviour
209+
*
210+
* @param metadataChanges controls metadata-only changes. Default: [MetadataChanges.EXCLUDE]
211+
* @param bufferCapacity the buffer capacity as in [Flow.buffer] or null to not buffer at all
196212
*/
197-
fun Query.toFlow(metadataChanges: MetadataChanges = MetadataChanges.EXCLUDE) =
198-
callbackFlow {
213+
fun Query.toFlow(
214+
metadataChanges: MetadataChanges = MetadataChanges.EXCLUDE,
215+
bufferCapacity: Int? = Channel.CONFLATED
216+
): Flow<QuerySnapshot?> {
217+
val flow = callbackFlow {
199218
val registration = addSnapshotListener(metadataChanges) { snapshot, exception ->
200219
if (exception != null) {
201-
cancel(message = "Error getting Query snapshot", cause = exception)
220+
cancel(CancellationException("Error getting Query snapshot", exception))
202221
}
203222

204223
if (snapshot != null) {
205224
trySend(snapshot)
206225
}
207226
}
208227
awaitClose { registration.remove() }
209-
}.buffer(Channel.CONFLATED)
228+
}
229+
230+
return if (bufferCapacity != null) {
231+
flow.buffer(bufferCapacity)
232+
} else {
233+
flow
234+
}
235+
}

0 commit comments

Comments
 (0)