25
25
import co .elastic .clients .elasticsearch ._helpers .esql .jdbc .ResultSetEsqlAdapter ;
26
26
import co .elastic .clients .elasticsearch ._helpers .esql .objects .ObjectsEsqlAdapter ;
27
27
import co .elastic .clients .json .jackson .JacksonJsonpMapper ;
28
+ import co .elastic .clients .transport .rest5_client .Rest5ClientTransport ;
29
+ import co .elastic .clients .transport .rest5_client .low_level .ESRequest ;
30
+ import co .elastic .clients .transport .rest5_client .low_level .Rest5Client ;
28
31
import co .elastic .clients .transport .rest_client .RestClientTransport ;
29
32
import com .fasterxml .jackson .annotation .JsonIgnoreProperties ;
30
33
import com .fasterxml .jackson .databind .PropertyNamingStrategies ;
@@ -55,26 +58,42 @@ public class EsqlAdapterEndToEndTest extends Assertions {
55
58
@ BeforeAll
56
59
public static void setup () throws Exception {
57
60
ElasticsearchClient global = ElasticsearchTestServer .global ().client ();
58
- RestClient restClient = ((RestClientTransport ) global ._transport ()).restClient ();
59
- esClient = new ElasticsearchClient (new RestClientTransport (restClient , new JacksonJsonpMapper ()));
61
+ if (global ._transport () instanceof RestClientTransport ) {
62
+ RestClient restClient = ((RestClientTransport ) global ._transport ()).restClient ();
63
+ esClient = new ElasticsearchClient (new RestClientTransport (restClient , new JacksonJsonpMapper ()));
60
64
61
- esClient .indices ().delete (d -> d .index ("employees" ).ignoreUnavailable (true ));
65
+ esClient .indices ().delete (d -> d .index ("employees" ).ignoreUnavailable (true ));
62
66
63
- Request request = new Request ("POST" , "/employees/_bulk?refresh=true" );
67
+ Request request = new Request ("POST" , "/employees/_bulk?refresh=true" );
64
68
65
- InputStream resourceAsStream = EsqlAdapterTest .class .getResourceAsStream ("employees.ndjson" );
66
- byte [] bytes = IOUtils .toByteArray (resourceAsStream );
67
- request .setEntity (new ByteArrayEntity (bytes , ContentType .APPLICATION_JSON ));
69
+ InputStream resourceAsStream = EsqlAdapterTest .class .getResourceAsStream ("employees.ndjson" );
70
+ byte [] bytes = IOUtils .toByteArray (resourceAsStream );
71
+ request .setEntity (new ByteArrayEntity (bytes , ContentType .APPLICATION_JSON ));
68
72
69
- restClient .performRequest (request );
73
+ restClient .performRequest (request );
74
+ } else if (global ._transport () instanceof Rest5ClientTransport ) {
75
+ Rest5Client restClient = ((Rest5ClientTransport ) global ._transport ()).restClient ();
76
+ esClient = new ElasticsearchClient (new Rest5ClientTransport (restClient , new JacksonJsonpMapper ()));
77
+
78
+ esClient .indices ().delete (d -> d .index ("employees" ).ignoreUnavailable (true ));
79
+
80
+ ESRequest request = new ESRequest ("POST" , "/employees/_bulk?refresh=true" );
81
+
82
+ InputStream resourceAsStream = EsqlAdapterTest .class .getResourceAsStream ("employees.ndjson" );
83
+ byte [] bytes = IOUtils .toByteArray (resourceAsStream );
84
+ request .setEntity (new org .apache .hc .core5 .http .io .entity .ByteArrayEntity (bytes , org .apache .hc .core5 .http .ContentType .APPLICATION_JSON ));
85
+
86
+ restClient .performRequest (request );
87
+ }
70
88
}
71
89
72
90
@ Test
73
91
public void resultSetTest () throws Exception {
74
92
75
93
ResultSet rs = esClient .esql ().query (
76
94
ResultSetEsqlAdapter .INSTANCE ,
77
- "FROM employees | WHERE emp_no == ? or emp_no == ? | KEEP emp_no, job_positions, hire_date | SORT emp_no | LIMIT 300" ,
95
+ "FROM employees | WHERE emp_no == ? or emp_no == ? | KEEP emp_no, job_positions, hire_date | " +
96
+ "SORT emp_no | LIMIT 300" ,
78
97
// Testing parameters. Note that FROM and LIMIT do not accept parameters
79
98
"10042" , "10002"
80
99
);
@@ -116,7 +135,8 @@ public void resultSetTest() throws Exception {
116
135
public void objectsTest () throws Exception {
117
136
Iterable <EmpData > result = esClient .esql ().query (
118
137
ObjectsEsqlAdapter .of (EmpData .class ),
119
- "FROM employees | WHERE emp_no == ? or emp_no == ? | KEEP emp_no, job_positions, hire_date | SORT emp_no | LIMIT 300" ,
138
+ "FROM employees | WHERE emp_no == ? or emp_no == ? | KEEP emp_no, job_positions, hire_date | " +
139
+ "SORT emp_no | LIMIT 300" ,
120
140
// Testing parameters. Note that FROM and LIMIT do not accept parameters
121
141
"10042" , "10002"
122
142
);
@@ -152,12 +172,14 @@ public void objectsTest() throws Exception {
152
172
@ Test
153
173
public void asyncObjects () throws Exception {
154
174
155
- ElasticsearchAsyncClient asyncClient = new ElasticsearchAsyncClient (esClient ._transport (), esClient ._transportOptions ());
175
+ ElasticsearchAsyncClient asyncClient = new ElasticsearchAsyncClient (esClient ._transport (),
176
+ esClient ._transportOptions ());
156
177
157
178
158
179
CompletableFuture <Iterable <EmpData >> future = asyncClient .esql ().query (
159
180
ObjectsEsqlAdapter .of (EmpData .class ),
160
- "FROM employees | WHERE emp_no == ? or emp_no == ? | KEEP emp_no, job_positions, hire_date | SORT emp_no | LIMIT 300" ,
181
+ "FROM employees | WHERE emp_no == ? or emp_no == ? | KEEP emp_no, job_positions, hire_date | " +
182
+ "SORT emp_no | LIMIT 300" ,
161
183
// Testing parameters. Note that FROM and LIMIT do not accept parameters
162
184
"10042" , "10002"
163
185
);
@@ -169,7 +191,8 @@ public void asyncObjects() throws Exception {
169
191
EmpData emp = it .next ();
170
192
assertEquals ("10002" , emp .empNo );
171
193
List <String > jobPositions = emp .jobPositions ;
172
- // In addition to the value, this tests that single strings are correctly deserialized as a list
194
+ // In addition to the value, this tests that single strings are correctly deserialized
195
+ // as a list
173
196
assertEquals (Arrays .asList ("Senior Team Lead" ), emp .jobPositions );
174
197
}
175
198
@@ -183,7 +206,8 @@ public void asyncObjects() throws Exception {
183
206
assertTrue (emp .jobPositions .contains ("Junior Developer" ));
184
207
185
208
assertEquals ("1993-03-21T00:00:00Z[UTC]" ,
186
- DateTimeFormatter .ISO_DATE_TIME .format (emp .hireDate .toInstant ().atZone (ZoneId .of ("UTC" )))
209
+ DateTimeFormatter .ISO_DATE_TIME .format (emp .hireDate .toInstant ().atZone (ZoneId .of (
210
+ "UTC" )))
187
211
);
188
212
}
189
213
0 commit comments