Skip to content

Extend Firebase SDK with new APIs to consume streaming callable function response #6602

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 63 commits into from
Mar 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
535fb90
Extend Firebase SDK with new APIs to consume streaming callable funct…
MustafaJadid2025 Dec 16, 2024
9e13ef7
Extend Firebase SDK with new APIs to consume streaming callable funct…
MustafaJadid2025 Dec 16, 2024
6d3012d
Merge remote-tracking branch 'origin/stream-functions-api' into strea…
MustafaJadid2025 Dec 26, 2024
a4a442c
Merge branch 'firebase:main' into stream-functions-api
MustafaJadid2025 Dec 26, 2024
1fa85a6
Update the SSETaskListener implementation to conform to the org.react…
MustafaJadid2025 Dec 27, 2024
21b45b0
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Jan 22, 2025
5fda178
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Jan 28, 2025
a244078
Merge branch 'main' of https://github.com/MustafaJadid2025/firebase-a…
MustafaJadid2025 Jan 28, 2025
05b9772
Merge remote-tracking branch 'origin/stream-functions-api' into strea…
MustafaJadid2025 Jan 28, 2025
7034537
Refactor Stream Listener Implementation
MustafaJadid2025 Jan 30, 2025
7f0382b
Fix test cases on StreamTests.
MustafaJadid2025 Jan 30, 2025
e009960
Optimize streaming by introducing reactive.streams
MustafaJadid2025 Feb 4, 2025
8ea53bd
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Feb 4, 2025
1584847
Fix test case testGenStreamError.
MustafaJadid2025 Feb 4, 2025
ee964a5
Update api.txt.
MustafaJadid2025 Feb 4, 2025
895ee02
Remove StreamListener.kt and StreamFunctionsTask.kt
MustafaJadid2025 Feb 4, 2025
e2069d4
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Feb 6, 2025
0fd7894
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Feb 11, 2025
4f8f7d4
Introduce StreamResponse to accommodate Stream Message and Result.
MustafaJadid2025 Feb 11, 2025
38f3e8e
Refactor StreamResponse, PublisherStream.
MustafaJadid2025 Feb 12, 2025
e8d0f32
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Feb 12, 2025
5883572
Refactor StreamResponse.
MustafaJadid2025 Feb 13, 2025
da164ed
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Feb 14, 2025
d649ff4
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Feb 18, 2025
ca17db7
Add Copyright to StreamTests.
MustafaJadid2025 Feb 19, 2025
f2efd11
Update firebase-functions/src/main/java/com/google/firebase/functions…
MustafaJadid2025 Feb 19, 2025
c3fb9f1
Design StreamSubscriber class inside StreamTests to prevent calling e…
MustafaJadid2025 Feb 19, 2025
2a4591a
Remove duplicated code inside the stream method.
MustafaJadid2025 Feb 19, 2025
e29489e
Add Copyright to PublisherStream.
MustafaJadid2025 Feb 19, 2025
5d56561
Add Copyright to StreamResponse.
MustafaJadid2025 Feb 19, 2025
dd1bf7a
Modified the index.js stream functions by using sendChunk method.
MustafaJadid2025 Feb 19, 2025
e5fd3c0
Rename data to message inside the StreamResponse class.
MustafaJadid2025 Feb 19, 2025
22d723e
Rename data to message inside the StreamResponse class.
MustafaJadid2025 Feb 19, 2025
63356ab
Modify Kdoc on HttpsCallableReference and StreamResponse.
MustafaJadid2025 Feb 19, 2025
6215438
Optimized the overloaded method stream() on HttpCallableReference.
MustafaJadid2025 Feb 19, 2025
fff07fa
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Feb 19, 2025
ed21d7d
Fix test case genStreamError_receivesErrorAndStops.
MustafaJadid2025 Feb 19, 2025
d8aa361
Merge remote-tracking branch 'origin/stream-functions-api' into strea…
MustafaJadid2025 Feb 19, 2025
a6f661a
Format index.js
MustafaJadid2025 Feb 20, 2025
fdea72b
Format index.js
MustafaJadid2025 Feb 20, 2025
31a7120
Fix Subscription Handling: Ensure subscribe Does Not Trigger Backend …
MustafaJadid2025 Feb 20, 2025
db2d021
Add helper methods to check whether the 'StreamResponse' is a Message…
MustafaJadid2025 Feb 25, 2025
f6f1934
Merge branch 'main' into stream-functions-api
rlazo Feb 25, 2025
ef7132b
Run script to update the api.txt
MustafaJadid2025 Feb 25, 2025
89fa46a
Revert helper methods on StreamResponse and rename properties on Clas…
MustafaJadid2025 Feb 25, 2025
06b78c8
Remove the base property from the StreamResponse.
MustafaJadid2025 Feb 25, 2025
06b2e61
Make the private constructor at StreamResponse to prevent inheriting …
MustafaJadid2025 Feb 25, 2025
d839f6d
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Mar 3, 2025
3f313c7
Add a test case where the client input is processed.
MustafaJadid2025 Mar 4, 2025
e3c26a3
Optimize the stream() method on HttpCallableReference.
MustafaJadid2025 Mar 4, 2025
14ea9a6
Improve Subscription Handling: Ensure onError() Terminates Properly, …
MustafaJadid2025 Mar 4, 2025
37eed41
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Mar 4, 2025
fc2516c
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Mar 5, 2025
2fa63b8
Remove excessive synchronization in methods notifyError, notifyComplete.
MustafaJadid2025 Mar 5, 2025
2f2d66d
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Mar 5, 2025
b6d9b22
Add firebase functions genStreamEmpty, genStreamResultOnly, genStream…
MustafaJadid2025 Mar 6, 2025
5541852
Correct the KDoc format.
MustafaJadid2025 Mar 6, 2025
38876b5
Add test cases to cover genStreamEmpty, genStreamResultOnly, genStrea…
MustafaJadid2025 Mar 6, 2025
b74c68c
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Mar 7, 2025
76567b4
Add annotation @JvmOverloads to stream method on HttpsCallableReference.
MustafaJadid2025 Mar 10, 2025
2cf4099
Merge branch 'main' into stream-functions-api
MustafaJadid2025 Mar 10, 2025
005223c
Update api.txt.
MustafaJadid2025 Mar 10, 2025
4e87b0c
Merge remote-tracking branch 'origin/stream-functions-api' into strea…
MustafaJadid2025 Mar 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions firebase-functions/api.txt
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ package com.google.firebase.functions {
method public com.google.android.gms.tasks.Task<com.google.firebase.functions.HttpsCallableResult> call(Object? data);
method public long getTimeout();
method public void setTimeout(long timeout, java.util.concurrent.TimeUnit units);
method public org.reactivestreams.Publisher<com.google.firebase.functions.StreamResponse> stream();
method public org.reactivestreams.Publisher<com.google.firebase.functions.StreamResponse> stream(Object? data = null);
method public com.google.firebase.functions.HttpsCallableReference withTimeout(long timeout, java.util.concurrent.TimeUnit units);
property public final long timeout;
}
Expand All @@ -93,6 +95,21 @@ package com.google.firebase.functions {
field public final Object? data;
}

public abstract class StreamResponse {
}

public static final class StreamResponse.Message extends com.google.firebase.functions.StreamResponse {
ctor public StreamResponse.Message(com.google.firebase.functions.HttpsCallableResult message);
method public com.google.firebase.functions.HttpsCallableResult getMessage();
property public final com.google.firebase.functions.HttpsCallableResult message;
}

public static final class StreamResponse.Result extends com.google.firebase.functions.StreamResponse {
ctor public StreamResponse.Result(com.google.firebase.functions.HttpsCallableResult result);
method public com.google.firebase.functions.HttpsCallableResult getResult();
property public final com.google.firebase.functions.HttpsCallableResult result;
}

}

package com.google.firebase.functions.ktx {
Expand Down
3 changes: 3 additions & 0 deletions firebase-functions/firebase-functions.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ dependencies {
implementation(libs.okhttp)
implementation(libs.playservices.base)
implementation(libs.playservices.basement)
implementation(libs.reactive.streams)

api(libs.playservices.tasks)

kapt(libs.autovalue)
Expand All @@ -131,6 +133,7 @@ dependencies {
androidTestImplementation(libs.truth)
androidTestImplementation(libs.androidx.test.runner)
androidTestImplementation(libs.androidx.test.junit)
androidTestImplementation(libs.kotlinx.coroutines.reactive)
androidTestImplementation(libs.mockito.core)
androidTestImplementation(libs.mockito.dexmaker)
kapt("com.google.dagger:dagger-android-processor:2.43.2")
Expand Down
107 changes: 107 additions & 0 deletions firebase-functions/src/androidTest/backend/functions/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@

const assert = require('assert');
const functions = require('firebase-functions');
const functionsV2 = require('firebase-functions/v2');

/**
* Pauses the execution for a specified amount of time.
* @param {number} ms - The number of milliseconds to sleep.
* @return {Promise<void>} A promise that resolves after the specified time.
*/
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}

exports.dataTest = functions.https.onRequest((request, response) => {
assert.deepEqual(request.body, {
Expand Down Expand Up @@ -122,3 +132,100 @@ exports.timeoutTest = functions.https.onRequest((request, response) => {
// Wait for longer than 500ms.
setTimeout(() => response.send({data: true}), 500);
});

const streamData = ['hello', 'world', 'this', 'is', 'cool'];

/**
* Generates chunks of text asynchronously, yielding one chunk at a time.
* @async
* @generator
* @yields {string} A chunk of text from the data array.
*/
async function* generateText() {
for (const chunk of streamData) {
yield chunk;
await sleep(100);
}
}

exports.genStream = functionsV2.https.onCall(async (request, response) => {
if (request.acceptsStreaming) {
for await (const chunk of generateText()) {
response.sendChunk(chunk);
}
}
return streamData.join(' ');
});

exports.genStreamError = functionsV2.https.onCall(
async (request, response) => {
// Note: The functions backend does not pass the error message to the
// client at this time.
throw Error("BOOM")
});

const weatherForecasts = {
Toronto: { conditions: 'snowy', temperature: 25 },
London: { conditions: 'rainy', temperature: 50 },
Dubai: { conditions: 'sunny', temperature: 75 }
};

/**
* Generates weather forecasts asynchronously for the given locations.
* @async
* @generator
* @param {Array<{name: string}>} locations - An array of location objects.
*/
async function* generateForecast(locations) {
for (const location of locations) {
yield { 'location': location, ...weatherForecasts[location.name] };
await sleep(100);
}
};

exports.genStreamWeather = functionsV2.https.onCall(
async (request, response) => {
const locations = request.data && request.data.data?
request.data.data: [];
const forecasts = [];
if (request.acceptsStreaming) {
for await (const chunk of generateForecast(locations)) {
forecasts.push(chunk);
response.sendChunk(chunk);
}
}
return {forecasts};
});

exports.genStreamEmpty = functionsV2.https.onCall(
async (request, response) => {
if (request.acceptsStreaming) {
// Send no chunks
}
// Implicitly return null.
}
);

exports.genStreamResultOnly = functionsV2.https.onCall(
async (request, response) => {
if (request.acceptsStreaming) {
// Do not send any chunks.
}
return "Only a result";
}
);

exports.genStreamLargeData = functionsV2.https.onCall(
async (request, response) => {
if (request.acceptsStreaming) {
const largeString = 'A'.repeat(10000);
const chunkSize = 1024;
for (let i = 0; i < largeString.length; i += chunkSize) {
const chunk = largeString.substring(i, i + chunkSize);
response.sendChunk(chunk);
await sleep(100);
}
}
return "Stream Completed";
}
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
/*
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.firebase.functions

import androidx.test.core.app.ApplicationProvider
import androidx.test.ext.junit.runners.AndroidJUnit4
import com.google.common.truth.Truth.assertThat
import com.google.firebase.Firebase
import com.google.firebase.initialize
import java.util.concurrent.TimeUnit
import kotlinx.coroutines.delay
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import org.junit.Before
import org.junit.Test
import org.junit.runner.RunWith
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

@RunWith(AndroidJUnit4::class)
class StreamTests {

private lateinit var functions: FirebaseFunctions

@Before
fun setup() {
Firebase.initialize(ApplicationProvider.getApplicationContext())
functions = Firebase.functions
}

internal class StreamSubscriber : Subscriber<StreamResponse> {
internal val messages = mutableListOf<StreamResponse.Message>()
internal var result: StreamResponse.Result? = null
internal var throwable: Throwable? = null
internal var isComplete = false
internal lateinit var subscription: Subscription

override fun onSubscribe(subscription: Subscription) {
this.subscription = subscription
subscription.request(Long.MAX_VALUE)
}

override fun onNext(streamResponse: StreamResponse) {
if (streamResponse is StreamResponse.Message) {
messages.add(streamResponse)
} else {
result = streamResponse as StreamResponse.Result
}
}

override fun onError(t: Throwable?) {
throwable = t
}

override fun onComplete() {
isComplete = true
}
}

@Test
fun genStream_withPublisher_receivesMessagesAndFinalResult() = runBlocking {
val input = mapOf("data" to "Why is the sky blue")
val function = functions.getHttpsCallable("genStream")
val subscriber = StreamSubscriber()

function.stream(input).subscribe(subscriber)

while (!subscriber.isComplete) {
delay(100)
}
assertThat(subscriber.messages.map { it.message.data.toString() })
.containsExactly("hello", "world", "this", "is", "cool")
assertThat(subscriber.result).isNotNull()
assertThat(subscriber.result!!.result.data.toString()).isEqualTo("hello world this is cool")
assertThat(subscriber.throwable).isNull()
assertThat(subscriber.isComplete).isTrue()
}

@Test
fun genStream_withFlow_receivesMessagesAndFinalResult() = runBlocking {
val input = mapOf("data" to "Why is the sky blue")
val function = functions.getHttpsCallable("genStream")
var isComplete = false
var throwable: Throwable? = null
val messages = mutableListOf<StreamResponse.Message>()
var result: StreamResponse.Result? = null

val flow = function.stream(input).asFlow()
try {
withTimeout(1000) {
flow.collect { response ->
if (response is StreamResponse.Message) {
messages.add(response)
} else {
result = response as StreamResponse.Result
}
}
}
isComplete = true
} catch (e: Throwable) {
throwable = e
}

assertThat(messages.map { it.message.data.toString() })
.containsExactly("hello", "world", "this", "is", "cool")
assertThat(result).isNotNull()
assertThat(result!!.result.data.toString()).isEqualTo("hello world this is cool")
assertThat(throwable).isNull()
assertThat(isComplete).isTrue()
}

@Test
fun genStreamError_receivesError() = runBlocking {
val input = mapOf("data" to "test error")
val function =
functions.getHttpsCallable("genStreamError").withTimeout(2000, TimeUnit.MILLISECONDS)
val subscriber = StreamSubscriber()

function.stream(input).subscribe(subscriber)

withTimeout(2000) {
while (subscriber.throwable == null) {
delay(100)
}
}

assertThat(subscriber.throwable).isNotNull()
assertThat(subscriber.throwable).isInstanceOf(FirebaseFunctionsException::class.java)
}

@Test
fun genStreamWeather_receivesWeatherForecasts() = runBlocking {
val inputData = listOf(mapOf("name" to "Toronto"), mapOf("name" to "London"))
val input = mapOf("data" to inputData)

val function = functions.getHttpsCallable("genStreamWeather")
val subscriber = StreamSubscriber()

function.stream(input).subscribe(subscriber)

while (!subscriber.isComplete) {
delay(100)
}

assertThat(subscriber.messages.map { it.message.data.toString() })
.containsExactly(
"{temperature=25, location={name=Toronto}, conditions=snowy}",
"{temperature=50, location={name=London}, conditions=rainy}"
)
assertThat(subscriber.result).isNotNull()
assertThat(subscriber.result!!.result.data.toString()).contains("forecasts")
assertThat(subscriber.throwable).isNull()
assertThat(subscriber.isComplete).isTrue()
}

@Test
fun genStreamEmpty_receivesNoMessages() = runBlocking {
val function = functions.getHttpsCallable("genStreamEmpty")
val subscriber = StreamSubscriber()

function.stream(mapOf("data" to "test")).subscribe(subscriber)

withTimeout(2000) { delay(500) }
assertThat(subscriber.messages).isEmpty()
assertThat(subscriber.result).isNull()
}

@Test
fun genStreamResultOnly_receivesOnlyResult() = runBlocking {
val function = functions.getHttpsCallable("genStreamResultOnly")
val subscriber = StreamSubscriber()

function.stream(mapOf("data" to "test")).subscribe(subscriber)

while (!subscriber.isComplete) {
delay(100)
}
assertThat(subscriber.messages).isEmpty()
assertThat(subscriber.result).isNotNull()
assertThat(subscriber.result!!.result.data.toString()).isEqualTo("Only a result")
}

@Test
fun genStreamLargeData_receivesMultipleChunks() = runBlocking {
val function = functions.getHttpsCallable("genStreamLargeData")
val subscriber = StreamSubscriber()

function.stream(mapOf("data" to "test large data")).subscribe(subscriber)

while (!subscriber.isComplete) {
delay(100)
}
assertThat(subscriber.messages).isNotEmpty()
assertThat(subscriber.messages.size).isEqualTo(10)
val receivedString =
subscriber.messages.joinToString(separator = "") { it.message.data.toString() }
val expectedString = "A".repeat(10000)
assertThat(receivedString.length).isEqualTo(10000)
assertThat(receivedString).isEqualTo(expectedString)
assertThat(subscriber.result).isNotNull()
assertThat(subscriber.result!!.result.data.toString()).isEqualTo("Stream Completed")
}
}
Loading
Loading