Skip to content

Extend Firebase SDK with new APIs to consume streaming callable function response #6602

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 63 commits into from
Mar 10, 2025
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
535fb90
Extend Firebase SDK with new APIs to consume streaming callable funct…
MustafaJadid2025 Dec 16, 2024
9e13ef7
Extend Firebase SDK with new APIs to consume streaming callable funct…
MustafaJadid2025 Dec 16, 2024
6d3012d
Merge remote-tracking branch 'origin/stream-functions-api' into strea…
MustafaJadid2025 Dec 26, 2024
a4a442c
Merge branch 'firebase:main' into stream-functions-api
MustafaJadid2025 Dec 26, 2024
1fa85a6
Update the SSETaskListener implementation to conform to the org.react…
MustafaJadid2025 Dec 27, 2024
21b45b0
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Jan 22, 2025
5fda178
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Jan 28, 2025
a244078
Merge branch 'main' of https://github.com/MustafaJadid2025/firebase-a…
MustafaJadid2025 Jan 28, 2025
05b9772
Merge remote-tracking branch 'origin/stream-functions-api' into strea…
MustafaJadid2025 Jan 28, 2025
7034537
Refactor Stream Listener Implementation
MustafaJadid2025 Jan 30, 2025
7f0382b
Fix test cases on StreamTests.
MustafaJadid2025 Jan 30, 2025
e009960
Optimize streaming by introducing reactive.streams
MustafaJadid2025 Feb 4, 2025
8ea53bd
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Feb 4, 2025
1584847
Fix test case testGenStreamError.
MustafaJadid2025 Feb 4, 2025
ee964a5
Update api.txt.
MustafaJadid2025 Feb 4, 2025
895ee02
Remove StreamListener.kt and StreamFunctionsTask.kt
MustafaJadid2025 Feb 4, 2025
e2069d4
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Feb 6, 2025
0fd7894
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Feb 11, 2025
4f8f7d4
Introduce StreamResponse to accommodate Stream Message and Result.
MustafaJadid2025 Feb 11, 2025
38f3e8e
Refactor StreamResponse, PublisherStream.
MustafaJadid2025 Feb 12, 2025
e8d0f32
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Feb 12, 2025
5883572
Refactor StreamResponse.
MustafaJadid2025 Feb 13, 2025
da164ed
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Feb 14, 2025
d649ff4
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Feb 18, 2025
ca17db7
Add Copyright to StreamTests.
MustafaJadid2025 Feb 19, 2025
f2efd11
Update firebase-functions/src/main/java/com/google/firebase/functions…
MustafaJadid2025 Feb 19, 2025
c3fb9f1
Design StreamSubscriber class inside StreamTests to prevent calling e…
MustafaJadid2025 Feb 19, 2025
2a4591a
Remove duplicated code inside the stream method.
MustafaJadid2025 Feb 19, 2025
e29489e
Add Copyright to PublisherStream.
MustafaJadid2025 Feb 19, 2025
5d56561
Add Copyright to StreamResponse.
MustafaJadid2025 Feb 19, 2025
dd1bf7a
Modified the index.js stream functions by using sendChunk method.
MustafaJadid2025 Feb 19, 2025
e5fd3c0
Rename data to message inside the StreamResponse class.
MustafaJadid2025 Feb 19, 2025
22d723e
Rename data to message inside the StreamResponse class.
MustafaJadid2025 Feb 19, 2025
63356ab
Modify Kdoc on HttpsCallableReference and StreamResponse.
MustafaJadid2025 Feb 19, 2025
6215438
Optimized the overloaded method stream() on HttpCallableReference.
MustafaJadid2025 Feb 19, 2025
fff07fa
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Feb 19, 2025
ed21d7d
Fix test case genStreamError_receivesErrorAndStops.
MustafaJadid2025 Feb 19, 2025
d8aa361
Merge remote-tracking branch 'origin/stream-functions-api' into strea…
MustafaJadid2025 Feb 19, 2025
a6f661a
Format index.js
MustafaJadid2025 Feb 20, 2025
fdea72b
Format index.js
MustafaJadid2025 Feb 20, 2025
31a7120
Fix Subscription Handling: Ensure subscribe Does Not Trigger Backend …
MustafaJadid2025 Feb 20, 2025
db2d021
Add helper methods to check whether the 'StreamResponse' is a Message…
MustafaJadid2025 Feb 25, 2025
f6f1934
Merge branch 'main' into stream-functions-api
rlazo Feb 25, 2025
ef7132b
Run script to update the api.txt
MustafaJadid2025 Feb 25, 2025
89fa46a
Revert helper methods on StreamResponse and rename properties on Clas…
MustafaJadid2025 Feb 25, 2025
06b78c8
Remove the base property from the StreamResponse.
MustafaJadid2025 Feb 25, 2025
06b2e61
Make the private constructor at StreamResponse to prevent inheriting …
MustafaJadid2025 Feb 25, 2025
d839f6d
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Mar 3, 2025
3f313c7
Add a test case where the client input is processed.
MustafaJadid2025 Mar 4, 2025
e3c26a3
Optimize the stream() method on HttpCallableReference.
MustafaJadid2025 Mar 4, 2025
14ea9a6
Improve Subscription Handling: Ensure onError() Terminates Properly, …
MustafaJadid2025 Mar 4, 2025
37eed41
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Mar 4, 2025
fc2516c
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Mar 5, 2025
2fa63b8
Remove excessive synchronization in methods notifyError, notifyComplete.
MustafaJadid2025 Mar 5, 2025
2f2d66d
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Mar 5, 2025
b6d9b22
Add firebase functions genStreamEmpty, genStreamResultOnly, genStream…
MustafaJadid2025 Mar 6, 2025
5541852
Correct the KDoc format.
MustafaJadid2025 Mar 6, 2025
38876b5
Add test cases to cover genStreamEmpty, genStreamResultOnly, genStrea…
MustafaJadid2025 Mar 6, 2025
b74c68c
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Mar 7, 2025
76567b4
Add annotation @JvmOverloads to stream method on HttpsCallableReference.
MustafaJadid2025 Mar 10, 2025
2cf4099
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Mar 10, 2025
005223c
Update api.txt.
MustafaJadid2025 Mar 10, 2025
4e87b0c
Merge remote-tracking branch 'origin/stream-functions-api' into strea…
MustafaJadid2025 Mar 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions firebase-functions/api.txt
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ package com.google.firebase.functions {
public final class HttpsCallableReference {
method public com.google.android.gms.tasks.Task<com.google.firebase.functions.HttpsCallableResult> call();
method public com.google.android.gms.tasks.Task<com.google.firebase.functions.HttpsCallableResult> call(Object? data);
method @NonNull public org.reactivestreams.Publisher stream(@Nullable Object data = null);
method @NonNull public org.reactivestreams.Publisher stream();
method public long getTimeout();
method public void setTimeout(long timeout, java.util.concurrent.TimeUnit units);
method public com.google.firebase.functions.HttpsCallableReference withTimeout(long timeout, java.util.concurrent.TimeUnit units);
Expand Down
2 changes: 2 additions & 0 deletions firebase-functions/firebase-functions.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ dependencies {
implementation(libs.okhttp)
implementation(libs.playservices.base)
implementation(libs.playservices.basement)
implementation(libs.reactive.streams)

api(libs.playservices.tasks)

kapt(libs.autovalue)
Expand Down
55 changes: 55 additions & 0 deletions firebase-functions/src/androidTest/backend/functions/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,58 @@ exports.timeoutTest = functions.https.onRequest((request, response) => {
// Wait for longer than 500ms.
setTimeout(() => response.send({data: true}), 500);
});

const data = ["hello", "world", "this", "is", "cool"];

/**
* Pauses the execution for a specified amount of time.
* @param {number} ms - The number of milliseconds to sleep.
* @return {Promise<void>} A promise that resolves after the specified time.
*/
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
* Generates chunks of text asynchronously, yielding one chunk at a time.
* @async
* @generator
* @yields {string} A chunk of text from the data array.
*/
async function* generateText() {
for (const chunk of data) {
yield chunk;
await sleep(1000);
}
}

exports.genStream = functions.https.onCall(async (request, response) => {
if (response && response.acceptsStreaming) {
for await (const chunk of generateText()) {
console.log("got chunk", chunk);
response.write({chunk});
}
}
return data.join(" ");
});

exports.genStreamError = functions.https.onCall(async (request, response) => {
if (response && response.acceptsStreaming) {
for await (const chunk of generateText()) {
console.log("got chunk", chunk);
response.write({chunk});
}
throw new Error("BOOM");
}
});

exports.genStreamNoReturn = functions.https.onCall(
async (request, response) => {
if (response && response.acceptsStreaming) {
for await (const chunk of generateText()) {
console.log("got chunk", chunk);
response.write({chunk});
}
}
},
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package com.google.firebase.functions.ktx

import androidx.test.core.app.ApplicationProvider
import androidx.test.ext.junit.runners.AndroidJUnit4
import com.google.common.truth.Truth.assertThat
import com.google.firebase.Firebase
import com.google.firebase.functions.FirebaseFunctions
import com.google.firebase.functions.StreamResponse
import com.google.firebase.functions.functions
import com.google.firebase.initialize
import java.util.concurrent.TimeUnit
import org.junit.After
import org.junit.Before
import org.junit.Test
import org.junit.runner.RunWith
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

@RunWith(AndroidJUnit4::class)
class StreamTests {

private lateinit var functions: FirebaseFunctions
var onNextList = mutableListOf<StreamResponse>()
private lateinit var subscriber: Subscriber<StreamResponse>
private var throwable: Throwable? = null
private var isComplete = false

@Before
fun setup() {
Firebase.initialize(ApplicationProvider.getApplicationContext())
functions = Firebase.functions
subscriber =
object : Subscriber<StreamResponse> {
override fun onSubscribe(subscription: Subscription?) {
subscription?.request(1)
}

override fun onNext(streamResponse: StreamResponse) {
onNextList.add(streamResponse)
}

override fun onError(t: Throwable?) {
throwable = t
}

override fun onComplete() {
isComplete = true
}
}
}

@After
fun clear() {
onNextList.clear()
throwable = null
isComplete = false
}

@Test
fun testGenStream() {
val input = hashMapOf("data" to "Why is the sky blue")
val function = functions.getHttpsCallable("genStream")

function.stream(input).subscribe(subscriber)

Thread.sleep(8000)
val onNextStringList = onNextList.map { it.data.toString() }
assertThat(onNextStringList)
.containsExactly(
"{chunk=hello}",
"{chunk=world}",
"{chunk=this}",
"{chunk=is}",
"{chunk=cool}",
"hello world this is cool"
)
assertThat(throwable).isNull()
assertThat(isComplete).isTrue()
}

@Test
fun testGenStreamError() {
val input = hashMapOf("data" to "Why is the sky blue")
val function =
functions.getHttpsCallable("genStreamError").withTimeout(800, TimeUnit.MILLISECONDS)

function.stream(input).subscribe(subscriber)
Thread.sleep(2000)

val onNextStringList = onNextList.map { it.data.toString() }
assertThat(onNextStringList)
.containsExactly(
"{chunk=hello}",
)
assertThat(throwable).isNotNull()
assertThat(isComplete).isFalse()
}

@Test
fun testGenStreamNoReturn() {
val input = hashMapOf("data" to "Why is the sky blue")
val function = functions.getHttpsCallable("genStreamNoReturn")

function.stream(input).subscribe(subscriber)
Thread.sleep(8000)

val onNextStringList = onNextList.map { it.data.toString() }
assertThat(onNextStringList)
.containsExactly(
"{chunk=hello}",
"{chunk=world}",
"{chunk=this}",
"{chunk=is}",
"{chunk=cool}"
)
assertThat(throwable).isNull()
assertThat(isComplete).isFalse()
}

@Test
fun testGenStream_cancelStream() {
val input = hashMapOf("data" to "Why is the sky blue")
val function = functions.getHttpsCallable("genStreamNoReturn")
val publisher = function.stream(input)
var subscription: Subscription? = null
val cancelableSubscriber =
object : Subscriber<StreamResponse> {
override fun onSubscribe(s: Subscription?) {
subscription = s
s?.request(1)
}

override fun onNext(streamResponse: StreamResponse) {
onNextList.add(streamResponse)
}

override fun onError(t: Throwable?) {
throwable = t
}

override fun onComplete() {
isComplete = true
}
}

publisher.subscribe(cancelableSubscriber)
Thread.sleep(2000)
subscription?.cancel()
Thread.sleep(6000)

val onNextStringList = onNextList.map { it.data.toString() }
assertThat(onNextStringList)
.containsExactly(
"{chunk=hello}",
"{chunk=world}",
)
assertThat(throwable).isNotNull()
assertThat(requireNotNull(throwable).message).isEqualTo("Stream was canceled")
assertThat(isComplete).isFalse()
}
}
Loading