-
Notifications
You must be signed in to change notification settings - Fork 619
Extend Firebase SDK with new APIs to consume streaming callable function response #6602
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
emilypgoogle
merged 63 commits into
firebase:main
from
MustafaJadid2025:stream-functions-api
Mar 10, 2025
Merged
Changes from 59 commits
Commits
Show all changes
63 commits
Select commit
Hold shift + click to select a range
535fb90
Extend Firebase SDK with new APIs to consume streaming callable funct…
MustafaJadid2025 9e13ef7
Extend Firebase SDK with new APIs to consume streaming callable funct…
MustafaJadid2025 6d3012d
Merge remote-tracking branch 'origin/stream-functions-api' into strea…
MustafaJadid2025 a4a442c
Merge branch 'firebase:main' into stream-functions-api
MustafaJadid2025 1fa85a6
Update the SSETaskListener implementation to conform to the org.react…
MustafaJadid2025 21b45b0
Merge branch 'main' into stream-functions-api
MustafaJadid2025 5fda178
Merge branch 'main' into stream-functions-api
MustafaJadid2025 a244078
Merge branch 'main' of https://github.com/MustafaJadid2025/firebase-a…
MustafaJadid2025 05b9772
Merge remote-tracking branch 'origin/stream-functions-api' into strea…
MustafaJadid2025 7034537
Refactor Stream Listener Implementation
MustafaJadid2025 7f0382b
Fix test cases on StreamTests.
MustafaJadid2025 e009960
Optimize streaming by introducing reactive.streams
MustafaJadid2025 8ea53bd
Merge branch 'main' into stream-functions-api
MustafaJadid2025 1584847
Fix test case testGenStreamError.
MustafaJadid2025 ee964a5
Update api.txt.
MustafaJadid2025 895ee02
Remove StreamListener.kt and StreamFunctionsTask.kt
MustafaJadid2025 e2069d4
Merge branch 'main' into stream-functions-api
MustafaJadid2025 0fd7894
Merge branch 'main' into stream-functions-api
MustafaJadid2025 4f8f7d4
Introduce StreamResponse to accommodate Stream Message and Result.
MustafaJadid2025 38f3e8e
Refactor StreamResponse, PublisherStream.
MustafaJadid2025 e8d0f32
Merge branch 'main' into stream-functions-api
MustafaJadid2025 5883572
Refactor StreamResponse.
MustafaJadid2025 da164ed
Merge branch 'main' into stream-functions-api
MustafaJadid2025 d649ff4
Merge branch 'main' into stream-functions-api
MustafaJadid2025 ca17db7
Add Copyright to StreamTests.
MustafaJadid2025 f2efd11
Update firebase-functions/src/main/java/com/google/firebase/functions…
MustafaJadid2025 c3fb9f1
Design StreamSubscriber class inside StreamTests to prevent calling e…
MustafaJadid2025 2a4591a
Remove duplicated code inside the stream method.
MustafaJadid2025 e29489e
Add Copyright to PublisherStream.
MustafaJadid2025 5d56561
Add Copyright to StreamResponse.
MustafaJadid2025 dd1bf7a
Modified the index.js stream functions by using sendChunk method.
MustafaJadid2025 e5fd3c0
Rename data to message inside the StreamResponse class.
MustafaJadid2025 22d723e
Rename data to message inside the StreamResponse class.
MustafaJadid2025 63356ab
Modify Kdoc on HttpsCallableReference and StreamResponse.
MustafaJadid2025 6215438
Optimized the overloaded method stream() on HttpCallableReference.
MustafaJadid2025 fff07fa
Merge branch 'main' into stream-functions-api
MustafaJadid2025 ed21d7d
Fix test case genStreamError_receivesErrorAndStops.
MustafaJadid2025 d8aa361
Merge remote-tracking branch 'origin/stream-functions-api' into strea…
MustafaJadid2025 a6f661a
Format index.js
MustafaJadid2025 fdea72b
Format index.js
MustafaJadid2025 31a7120
Fix Subscription Handling: Ensure subscribe Does Not Trigger Backend …
MustafaJadid2025 db2d021
Add helper methods to check whether the 'StreamResponse' is a Message…
MustafaJadid2025 f6f1934
Merge branch 'main' into stream-functions-api
rlazo ef7132b
Run script to update the api.txt
MustafaJadid2025 89fa46a
Revert helper methods on StreamResponse and rename properties on Clas…
MustafaJadid2025 06b78c8
Remove the base property from the StreamResponse.
MustafaJadid2025 06b2e61
Make the private constructor at StreamResponse to prevent inheriting …
MustafaJadid2025 d839f6d
Merge branch 'main' into stream-functions-api
MustafaJadid2025 3f313c7
Add a test case where the client input is processed.
MustafaJadid2025 e3c26a3
Optimize the stream() method on HttpCallableReference.
MustafaJadid2025 14ea9a6
Improve Subscription Handling: Ensure onError() Terminates Properly, …
MustafaJadid2025 37eed41
Merge branch 'main' into stream-functions-api
MustafaJadid2025 fc2516c
Merge branch 'main' into stream-functions-api
MustafaJadid2025 2fa63b8
Remove excessive synchronization in methods notifyError, notifyComplete.
MustafaJadid2025 2f2d66d
Merge branch 'main' into stream-functions-api
MustafaJadid2025 b6d9b22
Add firebase functions genStreamEmpty, genStreamResultOnly, genStream…
MustafaJadid2025 5541852
Correct the KDoc format.
MustafaJadid2025 38876b5
Add test cases to cover genStreamEmpty, genStreamResultOnly, genStrea…
MustafaJadid2025 b74c68c
Merge branch 'main' into stream-functions-api
MustafaJadid2025 76567b4
Add annotation @JvmOverloads to stream method on HttpsCallableReference.
MustafaJadid2025 2cf4099
Merge branch 'main' into stream-functions-api
MustafaJadid2025 005223c
Update api.txt.
MustafaJadid2025 4e87b0c
Merge remote-tracking branch 'origin/stream-functions-api' into strea…
MustafaJadid2025 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
218 changes: 218 additions & 0 deletions
218
firebase-functions/src/androidTest/java/com/google/firebase/functions/StreamTests.kt
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,218 @@ | ||
/* | ||
* Copyright 2025 Google LLC | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.google.firebase.functions | ||
|
||
import androidx.test.core.app.ApplicationProvider | ||
import androidx.test.ext.junit.runners.AndroidJUnit4 | ||
import com.google.common.truth.Truth.assertThat | ||
import com.google.firebase.Firebase | ||
import com.google.firebase.initialize | ||
import java.util.concurrent.TimeUnit | ||
import kotlinx.coroutines.delay | ||
import kotlinx.coroutines.reactive.asFlow | ||
import kotlinx.coroutines.runBlocking | ||
import kotlinx.coroutines.withTimeout | ||
import org.junit.Before | ||
import org.junit.Test | ||
import org.junit.runner.RunWith | ||
import org.reactivestreams.Subscriber | ||
import org.reactivestreams.Subscription | ||
|
||
@RunWith(AndroidJUnit4::class) | ||
class StreamTests { | ||
MustafaJadid2025 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
private lateinit var functions: FirebaseFunctions | ||
|
||
@Before | ||
fun setup() { | ||
Firebase.initialize(ApplicationProvider.getApplicationContext()) | ||
functions = Firebase.functions | ||
} | ||
|
||
internal class StreamSubscriber : Subscriber<StreamResponse> { | ||
internal val messages = mutableListOf<StreamResponse.Message>() | ||
internal var result: StreamResponse.Result? = null | ||
internal var throwable: Throwable? = null | ||
internal var isComplete = false | ||
internal lateinit var subscription: Subscription | ||
|
||
override fun onSubscribe(subscription: Subscription) { | ||
this.subscription = subscription | ||
subscription.request(Long.MAX_VALUE) | ||
} | ||
|
||
override fun onNext(streamResponse: StreamResponse) { | ||
if (streamResponse is StreamResponse.Message) { | ||
messages.add(streamResponse) | ||
} else { | ||
result = streamResponse as StreamResponse.Result | ||
} | ||
} | ||
|
||
override fun onError(t: Throwable?) { | ||
throwable = t | ||
} | ||
|
||
override fun onComplete() { | ||
isComplete = true | ||
} | ||
} | ||
|
||
@Test | ||
fun genStream_withPublisher_receivesMessagesAndFinalResult() = runBlocking { | ||
val input = mapOf("data" to "Why is the sky blue") | ||
val function = functions.getHttpsCallable("genStream") | ||
val subscriber = StreamSubscriber() | ||
|
||
function.stream(input).subscribe(subscriber) | ||
|
||
while (!subscriber.isComplete) { | ||
delay(100) | ||
} | ||
assertThat(subscriber.messages.map { it.message.data.toString() }) | ||
.containsExactly("hello", "world", "this", "is", "cool") | ||
assertThat(subscriber.result).isNotNull() | ||
assertThat(subscriber.result!!.result.data.toString()).isEqualTo("hello world this is cool") | ||
assertThat(subscriber.throwable).isNull() | ||
assertThat(subscriber.isComplete).isTrue() | ||
} | ||
|
||
@Test | ||
fun genStream_withFlow_receivesMessagesAndFinalResult() = runBlocking { | ||
val input = mapOf("data" to "Why is the sky blue") | ||
val function = functions.getHttpsCallable("genStream") | ||
var isComplete = false | ||
var throwable: Throwable? = null | ||
val messages = mutableListOf<StreamResponse.Message>() | ||
var result: StreamResponse.Result? = null | ||
|
||
val flow = function.stream(input).asFlow() | ||
try { | ||
withTimeout(1000) { | ||
flow.collect { response -> | ||
if (response is StreamResponse.Message) { | ||
messages.add(response) | ||
} else { | ||
result = response as StreamResponse.Result | ||
} | ||
} | ||
} | ||
isComplete = true | ||
} catch (e: Throwable) { | ||
throwable = e | ||
} | ||
|
||
assertThat(messages.map { it.message.data.toString() }) | ||
.containsExactly("hello", "world", "this", "is", "cool") | ||
assertThat(result).isNotNull() | ||
assertThat(result!!.result.data.toString()).isEqualTo("hello world this is cool") | ||
assertThat(throwable).isNull() | ||
assertThat(isComplete).isTrue() | ||
} | ||
|
||
@Test | ||
fun genStreamError_receivesError() = runBlocking { | ||
val input = mapOf("data" to "test error") | ||
val function = | ||
functions.getHttpsCallable("genStreamError").withTimeout(2000, TimeUnit.MILLISECONDS) | ||
val subscriber = StreamSubscriber() | ||
|
||
function.stream(input).subscribe(subscriber) | ||
|
||
withTimeout(2000) { | ||
while (subscriber.throwable == null) { | ||
delay(100) | ||
} | ||
} | ||
|
||
assertThat(subscriber.throwable).isNotNull() | ||
emilypgoogle marked this conversation as resolved.
Show resolved
Hide resolved
|
||
assertThat(subscriber.throwable).isInstanceOf(FirebaseFunctionsException::class.java) | ||
} | ||
|
||
@Test | ||
fun genStreamWeather_receivesWeatherForecasts() = runBlocking { | ||
val inputData = listOf(mapOf("name" to "Toronto"), mapOf("name" to "London")) | ||
val input = mapOf("data" to inputData) | ||
|
||
val function = functions.getHttpsCallable("genStreamWeather") | ||
val subscriber = StreamSubscriber() | ||
|
||
function.stream(input).subscribe(subscriber) | ||
|
||
while (!subscriber.isComplete) { | ||
delay(100) | ||
} | ||
|
||
assertThat(subscriber.messages.map { it.message.data.toString() }) | ||
.containsExactly( | ||
"{temperature=25, location={name=Toronto}, conditions=snowy}", | ||
"{temperature=50, location={name=London}, conditions=rainy}" | ||
) | ||
assertThat(subscriber.result).isNotNull() | ||
assertThat(subscriber.result!!.result.data.toString()).contains("forecasts") | ||
assertThat(subscriber.throwable).isNull() | ||
assertThat(subscriber.isComplete).isTrue() | ||
} | ||
|
||
@Test | ||
fun genStreamEmpty_receivesNoMessages() = runBlocking { | ||
val function = functions.getHttpsCallable("genStreamEmpty") | ||
val subscriber = StreamSubscriber() | ||
|
||
function.stream(mapOf("data" to "test")).subscribe(subscriber) | ||
|
||
withTimeout(2000) { delay(500) } | ||
assertThat(subscriber.messages).isEmpty() | ||
assertThat(subscriber.result).isNull() | ||
} | ||
|
||
@Test | ||
fun genStreamResultOnly_receivesOnlyResult() = runBlocking { | ||
val function = functions.getHttpsCallable("genStreamResultOnly") | ||
val subscriber = StreamSubscriber() | ||
|
||
function.stream(mapOf("data" to "test")).subscribe(subscriber) | ||
|
||
while (!subscriber.isComplete) { | ||
delay(100) | ||
} | ||
assertThat(subscriber.messages).isEmpty() | ||
assertThat(subscriber.result).isNotNull() | ||
assertThat(subscriber.result!!.result.data.toString()).isEqualTo("Only a result") | ||
} | ||
|
||
@Test | ||
fun genStreamLargeData_receivesMultipleChunks() = runBlocking { | ||
val function = functions.getHttpsCallable("genStreamLargeData") | ||
val subscriber = StreamSubscriber() | ||
|
||
function.stream(mapOf("data" to "test large data")).subscribe(subscriber) | ||
|
||
while (!subscriber.isComplete) { | ||
delay(100) | ||
} | ||
assertThat(subscriber.messages).isNotEmpty() | ||
assertThat(subscriber.messages.size).isEqualTo(10) | ||
val receivedString = | ||
subscriber.messages.joinToString(separator = "") { it.message.data.toString() } | ||
val expectedString = "A".repeat(10000) | ||
assertThat(receivedString.length).isEqualTo(10000) | ||
assertThat(receivedString).isEqualTo(expectedString) | ||
assertThat(subscriber.result).isNotNull() | ||
assertThat(subscriber.result!!.result.data.toString()).isEqualTo("Stream Completed") | ||
} | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.