18
18
*/
19
19
//! HTTP transport and connection components
20
20
21
+ #[ cfg( all( target_arch = "wasm32" , any( feature = "native-tls" , feature = "rustls-tls" ) ) ) ]
22
+ compile_error ! ( "TLS features are not compatible with the wasm target" ) ;
23
+
21
24
#[ cfg( any( feature = "native-tls" , feature = "rustls-tls" ) ) ]
22
25
use crate :: auth:: ClientCertificate ;
23
26
#[ cfg( any( feature = "native-tls" , feature = "rustls-tls" ) ) ]
@@ -130,6 +133,8 @@ fn build_meta() -> String {
130
133
meta. push_str ( ",tls=n" ) ;
131
134
} else if cfg ! ( feature = "rustls-tls" ) {
132
135
meta. push_str ( ",tls=r" ) ;
136
+ } else if cfg ! ( target_arch = "wasm32" ) {
137
+ meta. push_str ( ",tls=w" ) ;
133
138
}
134
139
135
140
meta
@@ -138,15 +143,19 @@ fn build_meta() -> String {
138
143
/// Builds a HTTP transport to make API calls to Elasticsearch
139
144
pub struct TransportBuilder {
140
145
client_builder : reqwest:: ClientBuilder ,
141
- conn_pool : Box < dyn ConnectionPool > ,
146
+ conn_pool : Arc < dyn ConnectionPool > ,
142
147
credentials : Option < Credentials > ,
143
148
#[ cfg( any( feature = "native-tls" , feature = "rustls-tls" ) ) ]
144
149
cert_validation : Option < CertificateValidation > ,
150
+ #[ cfg( not( target_arch = "wasm32" ) ) ]
145
151
proxy : Option < Url > ,
152
+ #[ cfg( not( target_arch = "wasm32" ) ) ]
146
153
proxy_credentials : Option < Credentials > ,
154
+ #[ cfg( not( target_arch = "wasm32" ) ) ]
147
155
disable_proxy : bool ,
148
156
headers : HeaderMap ,
149
157
meta_header : bool ,
158
+ #[ cfg( not( target_arch = "wasm32" ) ) ]
150
159
timeout : Option < Duration > ,
151
160
}
152
161
@@ -159,15 +168,19 @@ impl TransportBuilder {
159
168
{
160
169
Self {
161
170
client_builder : reqwest:: ClientBuilder :: new ( ) ,
162
- conn_pool : Box :: new ( conn_pool) ,
171
+ conn_pool : Arc :: new ( conn_pool) ,
163
172
credentials : None ,
164
173
#[ cfg( any( feature = "native-tls" , feature = "rustls-tls" ) ) ]
165
174
cert_validation : None ,
175
+ #[ cfg( not( target_arch = "wasm32" ) ) ]
166
176
proxy : None ,
177
+ #[ cfg( not( target_arch = "wasm32" ) ) ]
167
178
proxy_credentials : None ,
179
+ #[ cfg( not( target_arch = "wasm32" ) ) ]
168
180
disable_proxy : false ,
169
181
headers : HeaderMap :: new ( ) ,
170
182
meta_header : true ,
183
+ #[ cfg( not( target_arch = "wasm32" ) ) ]
171
184
timeout : None ,
172
185
}
173
186
}
@@ -176,6 +189,7 @@ impl TransportBuilder {
176
189
///
177
190
/// An optional username and password will be used to set the
178
191
/// `Proxy-Authorization` header using Basic Authentication.
192
+ #[ cfg( not( target_arch = "wasm32" ) ) ]
179
193
pub fn proxy ( mut self , url : Url , username : Option < & str > , password : Option < & str > ) -> Self {
180
194
self . proxy = Some ( url) ;
181
195
if let Some ( u) = username {
@@ -189,6 +203,7 @@ impl TransportBuilder {
189
203
/// Whether to disable proxies, including system proxies.
190
204
///
191
205
/// NOTE: System proxies are enabled by default.
206
+ #[ cfg( not( target_arch = "wasm32" ) ) ]
192
207
pub fn disable_proxy ( mut self ) -> Self {
193
208
self . disable_proxy = true ;
194
209
self
@@ -241,6 +256,7 @@ impl TransportBuilder {
241
256
///
242
257
/// The timeout is applied from when the request starts connecting until the response body has finished.
243
258
/// Default is no timeout.
259
+ #[ cfg( not( target_arch = "wasm32" ) ) ]
244
260
pub fn timeout ( mut self , timeout : Duration ) -> Self {
245
261
self . timeout = Some ( timeout) ;
246
262
self
@@ -254,6 +270,7 @@ impl TransportBuilder {
254
270
client_builder = client_builder. default_headers ( self . headers ) ;
255
271
}
256
272
273
+ #[ cfg( not( target_arch = "wasm32" ) ) ]
257
274
if let Some ( t) = self . timeout {
258
275
client_builder = client_builder. timeout ( t) ;
259
276
}
@@ -300,6 +317,7 @@ impl TransportBuilder {
300
317
}
301
318
}
302
319
320
+ #[ cfg( not( target_arch = "wasm32" ) ) ]
303
321
if self . disable_proxy {
304
322
client_builder = client_builder. no_proxy ( ) ;
305
323
} else if let Some ( url) = self . proxy {
@@ -316,7 +334,7 @@ impl TransportBuilder {
316
334
let client = client_builder. build ( ) ?;
317
335
Ok ( Transport {
318
336
client,
319
- conn_pool : Arc :: new ( self . conn_pool ) ,
337
+ conn_pool : self . conn_pool ,
320
338
credentials : self . credentials ,
321
339
send_meta : self . meta_header ,
322
340
} )
@@ -363,7 +381,7 @@ impl Connection {
363
381
pub struct Transport {
364
382
client : reqwest:: Client ,
365
383
credentials : Option < Credentials > ,
366
- conn_pool : Arc < Box < dyn ConnectionPool > > ,
384
+ conn_pool : Arc < dyn ConnectionPool > ,
367
385
send_meta : bool ,
368
386
}
369
387
@@ -463,6 +481,7 @@ impl Transport {
463
481
headers : HeaderMap ,
464
482
query_string : Option < & Q > ,
465
483
body : Option < B > ,
484
+ #[ allow( unused_variables) ]
466
485
timeout : Option < Duration > ,
467
486
) -> Result < reqwest:: RequestBuilder , Error >
468
487
where
@@ -473,6 +492,7 @@ impl Transport {
473
492
let url = connection. url . join ( path. trim_start_matches ( '/' ) ) ?;
474
493
let mut request_builder = self . client . request ( reqwest_method, url) ;
475
494
495
+ #[ cfg( not( target_arch = "wasm32" ) ) ]
476
496
if let Some ( t) = timeout {
477
497
request_builder = request_builder. timeout ( t) ;
478
498
}
@@ -564,6 +584,47 @@ impl Transport {
564
584
) ?)
565
585
}
566
586
587
+ async fn reseed ( & self ) {
588
+ // Requests will execute against old connection pool during reseed
589
+ let connection = self . conn_pool . next ( ) ;
590
+
591
+ // Build node info request
592
+ let node_request = self . request_builder (
593
+ & connection,
594
+ Method :: Get ,
595
+ "_nodes/http?filter_path=nodes.*.http" ,
596
+ HeaderMap :: default ( ) ,
597
+ None :: < & ( ) > ,
598
+ None :: < ( ) > ,
599
+ None ,
600
+ ) . unwrap ( ) ;
601
+
602
+ let scheme = connection. url . scheme ( ) ;
603
+ let resp = node_request. send ( ) . await . unwrap ( ) ;
604
+ let json: Value = resp. json ( ) . await . unwrap ( ) ;
605
+ let connections: Vec < Connection > = json[ "nodes" ]
606
+ . as_object ( )
607
+ . unwrap ( )
608
+ . iter ( )
609
+ . map ( |( _, node) | {
610
+ let address = node[ "http" ] [ "publish_address" ]
611
+ . as_str ( )
612
+ . or_else ( || {
613
+ Some (
614
+ node[ "http" ] [ "bound_address" ] . as_array ( ) . unwrap ( ) [ 0 ]
615
+ . as_str ( )
616
+ . unwrap ( ) ,
617
+ )
618
+ } )
619
+ . unwrap ( ) ;
620
+ let url = Self :: parse_to_url ( address, scheme) . unwrap ( ) ;
621
+ Connection :: new ( url)
622
+ } )
623
+ . collect ( ) ;
624
+
625
+ self . conn_pool . reseed ( connections) ;
626
+ }
627
+
567
628
/// Creates an asynchronous request that can be awaited
568
629
pub async fn send < B , Q > (
569
630
& self ,
@@ -578,47 +639,19 @@ impl Transport {
578
639
B : Body ,
579
640
Q : Serialize + ?Sized ,
580
641
{
581
- // Requests will execute against old connection pool during reseed
582
642
if self . conn_pool . reseedable ( ) {
583
- let conn_pool = self . conn_pool . clone ( ) ;
584
- let connection = conn_pool. next ( ) ;
585
-
586
- // Build node info request
587
- let node_request = self . request_builder (
588
- & connection,
589
- Method :: Get ,
590
- "_nodes/http?filter_path=nodes.*.http" ,
591
- headers. clone ( ) ,
592
- None :: < & Q > ,
593
- None :: < B > ,
594
- timeout,
595
- ) ?;
596
-
597
- tokio:: spawn ( async move {
598
- let scheme = connection. url . scheme ( ) ;
599
- let resp = node_request. send ( ) . await . unwrap ( ) ;
600
- let json: Value = resp. json ( ) . await . unwrap ( ) ;
601
- let connections: Vec < Connection > = json[ "nodes" ]
602
- . as_object ( )
603
- . unwrap ( )
604
- . iter ( )
605
- . map ( |( _, node) | {
606
- let address = node[ "http" ] [ "publish_address" ]
607
- . as_str ( )
608
- . or_else ( || {
609
- Some (
610
- node[ "http" ] [ "bound_address" ] . as_array ( ) . unwrap ( ) [ 0 ]
611
- . as_str ( )
612
- . unwrap ( ) ,
613
- )
614
- } )
615
- . unwrap ( ) ;
616
- let url = Self :: parse_to_url ( address, scheme) . unwrap ( ) ;
617
- Connection :: new ( url)
618
- } )
619
- . collect ( ) ;
620
- conn_pool. reseed ( connections) ;
621
- } ) ;
643
+ #[ cfg( not( target_arch = "wasm32" ) ) ]
644
+ {
645
+ let transport = self . clone ( ) ;
646
+ tokio:: spawn ( async move { transport. reseed ( ) . await } ) ;
647
+ }
648
+ #[ cfg( target_arch = "wasm32" ) ]
649
+ {
650
+ // Reseed synchronously (i.e. do not spawn a background task) in WASM.
651
+ // Running in the background is platform-dependent (web-sys / wasi), we'll
652
+ // address this if synchronous reseed is an issue.
653
+ self . reseed ( ) . await
654
+ }
622
655
}
623
656
624
657
let connection = self . conn_pool . next ( ) ;
0 commit comments