Skip to content

Utility that converts async stream to sync stream #8162

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 4, 2025

Conversation

chenmoneygithub
Copy link
Collaborator

Introduce a util function dspy.streaming.apply_sync_streaming() which converts the async stream generator to a sync generator, for ease of use in scenarios that the caller is sync function. The implementation spins up a child thread to consume the async generator and write to a queue to create the sync generator.

Sample usage:

import dspy
from dspy.streaming import StatusMessage
from dspy.streaming.streamify import apply_sync_streaming

lm = dspy.LM("openai/gpt-4o-mini", cache=False)
dspy.settings.configure(lm=lm)


class MyModule(dspy.Module):
    def __init__(self):
        super().__init__()

        self.predict1 = dspy.Predict("question->answer")
        self.predict2 = dspy.Predict("answer->simplified_answer")

    def forward(self, question: str, **kwargs):
        answer = self.predict1(question=question)
        simplified_answer = self.predict2(answer=answer)
        return simplified_answer


predict = MyModule()
stream_listeners = [
    dspy.streaming.StreamListener(signature_field_name="answer"),
    dspy.streaming.StreamListener(signature_field_name="simplified_answer"),
]
stream_predict = dspy.streamify(
    predict, stream_listeners=stream_listeners, include_final_prediction_in_output_stream=False
)
output = stream_predict(question="why did a chicken cross the kitchen?")
sync_output = apply_sync_streaming(output)

import time

start = time.time()

for value in sync_output:
    if isinstance(value, StatusMessage):
        print(value.message)
    elif isinstance(value, dspy.Prediction):
        print(value)
    elif isinstance(value, dspy.streaming.StreamResponse):
        print(value)

end = time.time()
print(f"Time taken: {end - start} seconds")

Sample output:

StreamResponse(predict_name='predict1', signature_field_name='answer', chunk='To')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' get')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' to')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' the')
StreamResponse(predict_name='predict1', signature_field_name='answer', chunk=' other side of the recipe!')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk='The')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk=' joke')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk=' is')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk=' that')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk=' the')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk=' chicken')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk=' crossed')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk=' the')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk=' road')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk=' to')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk=' reach')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk=' the')
StreamResponse(predict_name='predict2', signature_field_name='simplified_answer', chunk=' other side of the recipe!')
Time taken: 2.294755697250366 seconds

@chenmoneygithub chenmoneygithub requested a review from okhat May 2, 2025 18:57
@okhat okhat merged commit d89028f into stanfordnlp:main May 4, 2025
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants