@@ -1,9 +1,7 @@
-"""
-generate answer from image module
-"""
 import base64
+import asyncio
 from typing import List, Optional
-import requests
+import aiohttp
 from .base_node import BaseNode
 from ..utils.logging import get_logger
 
@@ -22,10 +20,46 @@ def __init__(
     ):
         super().__init__(node_name, "node", input, output, 2, node_config)
 
-    def execute(self, state: dict) -> dict:
+    async def process_image(self, session, api_key, image_data, user_prompt):
+        # Convert image data to base64
+        base64_image = base64.b64encode(image_data).decode('utf-8')
+
+        headers = {
+            "Content-Type": "application/json",
+            "Authorization": f"Bearer {api_key}"
+        }
+
+        payload = {
+            "model": self.node_config["config"]["llm"]["model"],
+            "messages": [
+                {
+                    "role": "user",
+                    "content": [
+                        {
+                            "type": "text",
+                            "text": user_prompt
+                        },
+                        {
+                            "type": "image_url",
+                            "image_url": {
+                                "url": f"data:image/jpeg;base64,{base64_image}"
+                            }
+                        }
+                    ]
+                }
+            ],
+            "max_tokens": 300
+        }
+
+        async with session.post("https://api.openai.com/v1/chat/completions",
+                                headers=headers, json=payload) as response:
+            result = await response.json()
+            return result.get('choices', [{}])[0].get('message', {}).get('content', 'No response')
+
+    async def execute_async(self, state: dict) -> dict:
         """
         Processes images from the state, generates answers,
-        consolidates the results, and updates the state.
+        consolidates the results, and updates the state asynchronously.
         """
         self.logger.info(f"--- Executing {self.node_name} Node ---")
 
@@ -39,54 +73,27 @@ def execute(self, state: dict) -> dict:
                              is not supported. Supported models are:
                              {', '.join(supported_models)}.""")
 
-        if self.node_config["config"]["llm"]["model"].startswith("gpt"):
-            api_key = self.node_config.get("config", {}).get("llm", {}).get("api_key", "")
+        api_key = self.node_config.get("config", {}).get("llm", {}).get("api_key", "")
 
-            for image_data in images:
-                base64_image = base64.b64encode(image_data).decode('utf-8')
+        async with aiohttp.ClientSession() as session:
+            tasks = [
+                self.process_image(session, api_key, image_data,
+                                   state.get("user_prompt", "Extract information from the image"))
+                for image_data in images
+            ]
 
-                headers = {
-                    "Content-Type": "application/json",
-                    "Authorization": f"Bearer {api_key}"
-                }
-
-                payload = {
-                    "model": self.node_config["config"]["llm"]["model"],
-                    "messages": [
-                        {
-                            "role": "user",
-                            "content": [
-                                {
-                                    "type": "text",
-                                    "text": state.get("user_prompt",
-                                                      "Extract information from the image")
-                                },
-                                {
-                                    "type": "image_url",
-                                    "image_url": {
-                                        "url": f"data:image/jpeg;base64,{base64_image}"
-                                    }
-                                }
-                            ]
-                        }
-                    ],
-                    "max_tokens": 300
-                }
+            analyses = await asyncio.gather(*tasks)
 
-                response = requests.post("https://api.openai.com/v1/chat/completions",
-                                         headers=headers,
-                                         json=payload,
-                                         timeout=10)
-                result = response.json()
+        consolidated_analysis = " ".join(analyses)
 
-                response_text = result.get('choices',
-                                           [{}])[0].get('message', {}).get('content', 'No response')
-                analyses.append(response_text)
+        state['answer'] = {
+            "consolidated_analysis": consolidated_analysis
+        }
 
-        consolidated_analysis = " ".join(analyses)
+        return state
 
-        state['answer'] = {
-            "consolidated_analysis": consolidated_analysis
-        }
-
-        return state
+    def execute(self, state: dict) -> dict:
+        """
+        Wrapper to run the asynchronous execute_async function in a synchronous context.
+        """
+        return asyncio.run(self.execute_async(state))
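
For context on the sync-over-async shape introduced at the end of the diff: `execute()` keeps its original blocking signature by handing the coroutine to `asyncio.run()`, which creates a fresh event loop, runs `execute_async()` to completion (with the per-image requests fanned out concurrently through `asyncio.gather`), and then closes the loop. Below is a minimal, self-contained sketch of the same pattern; the names `fetch_all`/`fetch_all_sync` and the example URL are illustrative only and not part of this change.

```python
import asyncio

import aiohttp


async def fetch_all(urls):
    # One shared ClientSession; the per-URL coroutines run concurrently via
    # asyncio.gather, mirroring how execute_async fans process_image out over
    # the list of images.
    async with aiohttp.ClientSession() as session:

        async def fetch(url):
            async with session.get(url) as resp:
                return await resp.text()

        return await asyncio.gather(*(fetch(u) for u in urls))


def fetch_all_sync(urls):
    # Synchronous wrapper in the same style as the new execute() method:
    # asyncio.run() owns the event loop for the duration of the call, so it
    # must not be invoked from code that is already running inside a loop.
    return asyncio.run(fetch_all(urls))


if __name__ == "__main__":
    print(fetch_all_sync(["https://example.com"])[0][:80])
```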