"""
-FetchNodelevelK Module
+FetchNodeLevelK Module
"""
from typing import List, Optional
from .base_node import BaseNode
+from ..docloaders import ChromiumLoader
+from ..utils.cleanup_html import cleanup_html
+from ..utils.convert_to_md import convert_to_md
+from langchain_core.documents import Document
+from bs4 import BeautifulSoup
+from urllib.parse import quote, urljoin

-class FetchNodelevelK(BaseNode):
+class FetchNodeLevelK(BaseNode):
    """
-    A node responsible for compressing the input tokens and storing the document
-    in a vector database for retrieval. Relevant chunks are stored in the state.
-
-    It allows scraping of big documents without exceeding the token limit of the language model.
+    A node responsible for fetching the HTML content of a specified URL and all its sub-links
+    recursively, up to a certain level of hyperlink depth in the graph. This content is then
+    used to update the graph's state. It uses ChromiumLoader to fetch the content from a web
+    page asynchronously (with proxy protection).

    Attributes:
        llm_model: An instance of a language model client, configured for generating answers.
@@ -27,16 +33,158 @@ def __init__(
        input: str,
        output: List[str],
        node_config: Optional[dict] = None,
-        node_name: str = "RAG",
+        node_name: str = "FetchLevelK",
    ):
        super().__init__(node_name, "node", input, output, 2, node_config)
-
-        self.llm_model = node_config["llm_model"]
+
        self.embedder_model = node_config.get("embedder_model", None)
+
        self.verbose = (
            False if node_config is None else node_config.get("verbose", False)
        )
+
        self.cache_path = node_config.get("cache_path", False)
+
+        self.headless = (
+            True if node_config is None else node_config.get("headless", True)
+        )
+
+        self.loader_kwargs = (
+            {} if node_config is None else node_config.get("loader_kwargs", {})
+        )
+
+        self.browser_base = (
+            None if node_config is None else node_config.get("browser_base", None)
+        )
+
+        self.depth = (
+            1 if node_config is None else node_config.get("depth", 1)
+        )
+
+        self.only_inside_links = (
+            False if node_config is None else node_config.get("only_inside_links", False)
+        )
+
+        self.min_input_len = 1

    def execute(self, state: dict) -> dict:
-        pass
+        """
+        Executes the node's logic to fetch the HTML content of a specified URL and all its
+        sub-links, and to update the graph's state with the fetched content.
+
+        Args:
+            state (dict): The current state of the graph. The input keys will be used
+                          to fetch the correct data types from the state.
+
+        Returns:
+            dict: The updated state with a new output key containing the fetched HTML content.
+
+        Raises:
+            KeyError: If the input key is not found in the state, indicating that the
+                      necessary information to perform the operation is missing.
+        """
+
+        self.logger.info(f"--- Executing {self.node_name} Node ---")
+
+        # Interpret input keys based on the provided input expression
+        input_keys = self.get_input_keys(state)
+        # Fetching data from the state based on the input keys
+        input_data = [state[key] for key in input_keys]
+
+        source = input_data[0]
+
+        documents = [{"source": source}]
+
+        loader_kwargs = {}
+
+        if self.node_config is not None:
+            loader_kwargs = self.node_config.get("loader_kwargs", {})
+
+        # Each pass expands the crawl frontier by one level of hyperlinks
+        for _ in range(self.depth):
+            documents = self.obtain_content(documents, loader_kwargs)
+
+        filtered_documents = [doc for doc in documents if 'document' in doc]
+
+        state.update({self.output[0]: filtered_documents})
+
+        return state
+
+    def fetch_content(self, source: str, loader_kwargs) -> List[Document]:
+        self.logger.info(f"--- (Fetching HTML from: {source}) ---")
+
+        if self.browser_base is not None:
+            try:
+                from ..docloaders.browser_base import browser_base_fetch
+            except ImportError:
+                raise ImportError("""The browserbase module is not installed.
+                                  Please install it using `pip install browserbase`.""")
+
+            data = browser_base_fetch(self.browser_base.get("api_key"),
+                                      self.browser_base.get("project_id"), [source])
+
+            document = [Document(page_content=content,
+                                 metadata={"source": source}) for content in data]
+
+        else:
+            loader = ChromiumLoader([source], headless=self.headless, **loader_kwargs)
+
+            document = loader.load()
+
+        return document
+
+    def extract_links(self, html_content: str) -> list:
+        soup = BeautifulSoup(html_content, 'html.parser')
+        links = [link['href'] for link in soup.find_all('a', href=True)]
+        self.logger.info(f"Extracted {len(links)} links.")
+        return links
+
+    def get_full_links(self, base_url: str, links: list) -> list:
+        full_links = []
+        for link in links:
+            if self.only_inside_links and link.startswith("http"):
+                continue
+            full_link = link if link.startswith("http") else urljoin(base_url, link)
+            full_links.append(full_link)
+        return full_links
+
+    def obtain_content(self, documents: List, loader_kwargs) -> List:
+        new_documents = []
+        # Iterate over a copy so that failed sources can be removed from `documents` safely
+        for doc in list(documents):
+            source = doc['source']
+            if 'document' not in doc:
+                document = self.fetch_content(source, loader_kwargs)
+
+                if not document or not document[0].page_content.strip():
+                    self.logger.warning(f"Failed to fetch content for {source}")
+                    documents.remove(doc)
+                    continue
+
+                #doc['document'] = document[0].page_content
+                doc['document'] = document
+
+                links = self.extract_links(doc['document'][0].page_content)
+                full_links = self.get_full_links(source, links)
+
+                # Check if the links are already present in other documents
+                for link in full_links:
+                    # Check if any document is from the same link
+                    if not any(d.get('source', '') == link for d in documents) and not any(d.get('source', '') == link for d in new_documents):
+                        # Add the document
+                        new_documents.append({"source": link})
+
+        documents.extend(new_documents)
+        return documents
+
+    def process_links(self, base_url: str, links: list,
+                      loader_kwargs, depth: int, current_depth: int = 1) -> dict:
+        content_dict = {}
+        for idx, link in enumerate(links, start=1):
+            full_link = link if link.startswith("http") else urljoin(base_url, link)
+            self.logger.info(f"Processing link {idx}: {full_link}")
+            link_content = self.fetch_content(full_link, loader_kwargs)
+
+            if not link_content:
+                self.logger.warning(f"Failed to fetch content for {full_link}")
+                continue
+
+            content_dict[full_link] = link_content
+
+            # Recurse into the fetched page's links until the requested depth is reached
+            if current_depth < depth:
+                new_links = self.extract_links(link_content[0].page_content)
+                content_dict.update(self.process_links(full_link, new_links,
+                                                       loader_kwargs, depth, current_depth + 1))
+        return content_dict
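
For anyone reviewing or trying the change locally, here is a minimal usage sketch. It is not part of the PR: the import path, config keys, and output shape are assumptions read off this diff, and the exact wiring inside a ScrapeGraphAI graph may differ.

# Minimal usage sketch -- not part of this PR; key names mirror the diff above.
from scrapegraphai.nodes.fetch_node_level_k import FetchNodeLevelK  # assumed module path

fetch_node = FetchNodeLevelK(
    input="url",
    output=["docs"],
    node_config={
        "verbose": True,
        "headless": True,
        "depth": 2,                 # follow hyperlinks two levels deep
        "only_inside_links": True,  # skip links that already start with "http"
        "loader_kwargs": {},        # extra kwargs forwarded to ChromiumLoader
    },
)

state = {"url": "https://example.com"}
state = fetch_node.execute(state)

# Each entry should look like {"source": <url>, "document": [Document, ...]}
for doc in state["docs"]:
    print(doc["source"], len(doc["document"][0].page_content))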