@@ -1,21 +1,24 @@
"""
2
2
FetchNode Module
3
3
"""
4
- import pandas as pd
4
+
5
5
import json
6
6
from typing import List , Optional
7
- from langchain_community . document_loaders import AsyncChromiumLoader
8
- from langchain_core . documents import Document
7
+
8
+ import pandas as pd
9
9
from langchain_community .document_loaders import PyPDFLoader
10
- from .base_node import BaseNode
10
+ from langchain_core .documents import Document
11
+
12
+ from ..docloaders import ChromiumLoader
11
13
from ..utils .remover import remover
14
+ from .base_node import BaseNode
12
15
13
16
14
17
class FetchNode (BaseNode ):
15
18
"""
16
19
A node responsible for fetching the HTML content of a specified URL and updating
17
- the graph's state with this content. It uses the AsyncChromiumLoader to fetch the
18
- content asynchronously.
20
+ the graph's state with this content. It uses ChromiumLoader to fetch
21
+ the content from a web page asynchronously (with proxy protection) .
19
22
20
23
This node acts as a starting point in many scraping workflows, preparing the state
21
24
with the necessary HTML content for further processing by subsequent nodes in the graph.
@@ -31,13 +34,21 @@ class FetchNode(BaseNode):
         node_name (str): The unique identifier name for the node, defaulting to "Fetch".
     """
 
-    def __init__(self, input: str, output: List[str], node_config: Optional[dict] = None, node_name: str = "Fetch"):
+    def __init__(
+        self,
+        input: str,
+        output: List[str],
+        node_config: Optional[dict] = None,
+        node_name: str = "Fetch",
+    ):
         super().__init__(node_name, "node", input, output, 1)
 
-        self.headless = True if node_config is None else node_config.get(
-            "headless", True)
-        self.verbose = False if node_config is None else node_config.get(
-            "verbose", False)
+        self.headless = (
+            True if node_config is None else node_config.get("headless", True)
+        )
+        self.verbose = (
+            False if node_config is None else node_config.get("verbose", False)
+        )
 
     def execute(self, state):
         """
@@ -64,10 +75,14 @@ def execute(self, state):
         input_data = [state[key] for key in input_keys]
 
         source = input_data[0]
-        if self.input == "json_dir" or self.input == "xml_dir" or self.input == "csv_dir":
-            compressed_document = [Document(page_content=source, metadata={
-                "source": "local_dir"
-            })]
+        if (
+            self.input == "json_dir"
+            or self.input == "xml_dir"
+            or self.input == "csv_dir"
+        ):
+            compressed_document = [
+                Document(page_content=source, metadata={"source": "local_dir"})
+            ]
         # if it is a local directory
 
         # handling for pdf
@@ -76,45 +91,42 @@ def execute(self, state):
             compressed_document = loader.load()
 
         elif self.input == "csv":
-            compressed_document = [Document(page_content=str(pd.read_csv(source)), metadata={
-                "source": "csv"
-            })]
+            compressed_document = [
+                Document(
+                    page_content=str(pd.read_csv(source)), metadata={"source": "csv"}
+                )
+            ]
         elif self.input == "json":
             f = open(source)
-            compressed_document = [Document(page_content=str(json.load(f)), metadata={
-                "source": "json"
-            })]
+            compressed_document = [
+                Document(page_content=str(json.load(f)), metadata={"source": "json"})
+            ]
         elif self.input == "xml":
-            with open(source, 'r', encoding='utf-8') as f:
+            with open(source, "r", encoding="utf-8") as f:
                 data = f.read()
-            compressed_document = [Document(page_content=data, metadata={
-                "source": "xml"
-            })]
+            compressed_document = [
+                Document(page_content=data, metadata={"source": "xml"})
+            ]
         elif self.input == "pdf_dir":
             pass
 
         elif not source.startswith("http"):
-            compressed_document = [Document(page_content=remover(source), metadata={
-                "source": "local_dir"
-            })]
+            compressed_document = [
+                Document(page_content=remover(source), metadata={"source": "local_dir"})
+            ]
 
         else:
-            if self.node_config is not None and self.node_config.get("endpoint") is not None:
+            loader_kwargs = {}
 
-                loader = AsyncChromiumLoader(
-                    [source],
-                    proxies={"http": self.node_config["endpoint"]},
-                    headless=self.headless,
-                )
-            else:
-                loader = AsyncChromiumLoader(
-                    [source],
-                    headless=self.headless,
-                )
+            if self.node_config is not None:
+                loader_kwargs = self.node_config.get("loader_kwargs", {})
+
+            loader = ChromiumLoader([source], headless=self.headless, **loader_kwargs)
 
         document = loader.load()
         compressed_document = [
-            Document(page_content=remover(str(document[0].page_content)))]
+            Document(page_content=remover(str(document[0].page_content)))
+        ]
 
         state.update({self.output[0]: compressed_document})
         return state
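
For context, a minimal usage sketch of the node after this change (plain Python, not part of the patch). The scrapegraphai.nodes import path, the "url" state key, and the proxy entry inside loader_kwargs are illustrative assumptions; the diff only shows that node_config["loader_kwargs"] is forwarded verbatim to ChromiumLoader, replacing the old single-purpose "endpoint" proxy option.

# Hedged usage sketch; the import path and config keys below are
# assumptions, not confirmed by this diff.
from scrapegraphai.nodes import FetchNode

fetch_node = FetchNode(
    input="url",
    output=["doc"],
    node_config={
        "headless": True,
        # Forwarded as **kwargs to ChromiumLoader; the old code instead
        # read a single "endpoint" value and built an http proxies mapping.
        "loader_kwargs": {"proxy": {"server": "http://localhost:8899"}},
    },
)

# execute() reads the source from the state's input key and writes the
# fetched, cleaned Document list back under the first output key.
state = fetch_node.execute({"url": "https://example.com"})
print(state["doc"][0].page_content[:200])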