 """
 ParseNode Module
 """
-from typing import List, Optional
+import re
+from typing import List, Optional, Tuple
+from urllib.parse import urljoin
 from langchain_community.document_transformers import Html2TextTransformer
 from langchain_core.documents import Document
 from .base_node import BaseNode
 from ..utils.split_text_into_chunks import split_text_into_chunks
+from ..helpers import default_filters

 class ParseNode(BaseNode):
     """
@@ -40,6 +43,9 @@ def __init__(
         self.parse_html = (
             True if node_config is None else node_config.get("parse_html", True)
         )
+        self.parse_urls = (
+            False if node_config is None else node_config.get("parse_urls", False)
+        )

         self.llm_model = node_config.get("llm_model")
         self.chunk_size = node_config.get("chunk_size")
@@ -66,16 +72,21 @@ def execute(self, state: dict) -> dict:

         input_data = [state[key] for key in input_keys]
         docs_transformed = input_data[0]
+        source = input_data[1] if self.parse_urls else None

         if self.parse_html:
             docs_transformed = Html2TextTransformer(ignore_links=False).transform_documents(input_data[0])
             docs_transformed = docs_transformed[0]

+            link_urls, img_urls = self._extract_urls(docs_transformed.page_content, source)
+
             chunks = split_text_into_chunks(text=docs_transformed.page_content,
                                             chunk_size=self.chunk_size - 250, model=self.llm_model)
         else:
             docs_transformed = docs_transformed[0]

+            link_urls, img_urls = self._extract_urls(docs_transformed.page_content, source)
+
             chunk_size = self.chunk_size
             chunk_size = min(chunk_size - 500, int(chunk_size * 0.9))

@@ -89,5 +100,57 @@ def execute(self, state: dict) -> dict:
                                             model=self.llm_model)

         state.update({self.output[0]: chunks})
+        if self.parse_urls:
+            state.update({self.output[1]: link_urls})
+            state.update({self.output[2]: img_urls})

         return state
+
+    def _extract_urls(self, text: str, source: str) -> Tuple[List[str], List[str]]:
+        """
+        Extracts link and image URLs from the given text, resolving relative links against the source URL.
+
+        Args:
+            text (str): The text to extract URLs from.
+
+        Returns:
+            Tuple[List[str], List[str]]: A tuple containing the extracted link URLs and image URLs.
+        """
+        if not self.parse_urls:
+            return [], []
+
+        image_extensions = default_filters.filter_dict["img_exts"]
+        image_extension_seq = '|'.join(image_extensions).replace('.', '')
+        url_pattern = re.compile(r'(https?://[^\s]+|\S+\.(?:' + image_extension_seq + '))')
+
+        all_urls = url_pattern.findall(text)
+        all_urls = self._clean_urls(all_urls)
+
+        if not source.startswith("http"):
+            all_urls = [url for url in all_urls if url.startswith("http")]
+        else:
+            all_urls = [urljoin(source, url) for url in all_urls]
+
+        images = [url for url in all_urls if any(url.endswith(ext) for ext in image_extensions)]
+        links = [url for url in all_urls if url not in images]
+
+        return links, images
+
+    def _clean_urls(self, urls: List[str]) -> List[str]:
+        """
+        Cleans the extracted URLs by stripping Markdown link syntax and trailing punctuation.
+
+        Args:
+            urls (List[str]): The list of URLs to clean.
+
+        Returns:
+            List[str]: The cleaned URLs.
+        """
+        cleaned_urls = []
+        for url in urls:
+            url = re.sub(r'.*?\]\(', '', url)
+            url = url.rstrip(').')
+
+            cleaned_urls.append(url)
+
+        return cleaned_urls
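For context, here is a minimal standalone sketch of what the new URL extraction does to a scraped page, run outside the node. The image-extension list, sample text, and source URL are placeholders (the real extensions come from `default_filters.filter_dict["img_exts"]`), so treat this as an illustration of the regex and cleaning steps rather than the node's exact behaviour.

```python
import re
from urllib.parse import urljoin

# Stand-in for default_filters.filter_dict["img_exts"]; the real list may differ.
image_extensions = [".png", ".jpg", ".jpeg", ".gif", ".svg"]
image_extension_seq = "|".join(image_extensions).replace(".", "")
url_pattern = re.compile(r'(https?://[^\s]+|\S+\.(?:' + image_extension_seq + '))')

text = "See [docs](https://example.com/guide) and the logo at /assets/logo.png."
source = "https://example.com/page"  # hypothetical page URL

urls = url_pattern.findall(text)
# Mirror _clean_urls: drop Markdown "](..." prefixes, trim trailing ')' and '.'.
urls = [re.sub(r'.*?\]\(', '', u).rstrip(').') for u in urls]
# Mirror _extract_urls: resolve everything against the source when it is an HTTP(S) URL.
urls = [urljoin(source, u) for u in urls]

images = [u for u in urls if any(u.endswith(ext) for ext in image_extensions)]
links = [u for u in urls if u not in images]
print(links)   # ['https://example.com/guide']
print(images)  # ['https://example.com/assets/logo.png']
```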