@@ -18,15 +18,17 @@ package discovery
18
18
import (
19
19
"context"
20
20
"fmt"
21
+ "io"
21
22
"net"
23
+ "net/http"
24
+ "net/url"
22
25
"strconv"
23
26
"time"
24
27
25
28
"github.com/haproxytech/client-native/v4/configuration"
26
29
"github.com/haproxytech/client-native/v4/models"
27
30
"github.com/haproxytech/dataplaneapi/log"
28
-
29
- "github.com/hashicorp/consul/api"
31
+ jsoniter "github.com/json-iterator/go"
30
32
)
31
33
32
34
type consulService struct {
@@ -54,7 +56,7 @@ func (c *consulService) GetServers() []configuration.ServiceServer {
54
56
55
57
type consulInstance struct {
56
58
params * models.Consul
57
- api * api .Client
59
+ httpClient * http .Client
58
60
discoveryConfig * ServiceDiscoveryInstance
59
61
prevIndexes map [string ]uint64
60
62
timeout time.Duration
@@ -75,15 +77,9 @@ func (c *consulInstance) start() error {
75
77
}
76
78
77
79
func (c * consulInstance ) setAPIClient () error {
78
- address := net .JoinHostPort (* c .params .Address , strconv .FormatInt (* c .params .Port , 10 ))
79
- consulConfig := api .DefaultConfig ()
80
- consulConfig .Address = address
81
- consulConfig .Token = c .params .Token
82
- cc , err := api .NewClient (consulConfig )
83
- if err != nil {
84
- return err
80
+ c .httpClient = & http.Client {
81
+ Timeout : time .Minute ,
85
82
}
86
- c .api = cc
87
83
return nil
88
84
}
89
85
@@ -131,18 +127,18 @@ func (c *consulInstance) watch() {
131
127
132
128
func (c * consulInstance ) stop () {
133
129
c .logDebug ("discovery job stopping" )
134
- c .api = nil
130
+ c .httpClient = nil
135
131
c .prevEnabled = false
136
132
close (c .update )
137
133
}
138
134
139
135
func (c * consulInstance ) updateServices () error {
140
136
services := make ([]ServiceInstance , 0 )
141
- params := & api. QueryOptions {}
137
+ params := & queryParams {}
142
138
if c .params .Namespace != "" {
143
139
params .Namespace = c .params .Namespace
144
140
}
145
- cServices , _ , err := c .api . Catalog (). Services (params )
141
+ cServices , _ , err := c .queryCatalogServices (params )
146
142
if err != nil {
147
143
return err
148
144
}
@@ -151,7 +147,7 @@ func (c *consulInstance) updateServices() error {
151
147
if se == "consul" {
152
148
continue
153
149
}
154
- nodes , meta , err := c .api . Health (). Service ( se , "" , false , & api. QueryOptions {})
150
+ nodes , meta , err := c .queryHealthService ( se , & queryParams {})
155
151
if err != nil {
156
152
continue
157
153
}
@@ -167,7 +163,7 @@ func (c *consulInstance) updateServices() error {
167
163
return c .discoveryConfig .UpdateServices (services )
168
164
}
169
165
170
- func (c * consulInstance ) convertToServers (nodes []* api. ServiceEntry ) []configuration.ServiceServer {
166
+ func (c * consulInstance ) convertToServers (nodes []* serviceEntry ) []configuration.ServiceServer {
171
167
servers := make ([]configuration.ServiceServer , 0 )
172
168
for _ , node := range nodes {
173
169
if node .Service .Address != "" {
@@ -236,3 +232,102 @@ func (c *consulInstance) logDebug(message string) {
236
232
func (c * consulInstance ) logErrorf (format string , args ... interface {}) {
237
233
log .WithFieldsf (c .logFields , log .ErrorLevel , format , args ... )
238
234
}
235
+
236
+ func (c * consulInstance ) queryCatalogServices (params * queryParams ) (map [string ][]string , * queryMetadata , error ) {
237
+ nodes := map [string ][]string {}
238
+ meta , err := c .doConsulQuery (http .MethodGet , "/v1/catalog/services" , params , & nodes )
239
+ if err != nil {
240
+ return nil , nil , err
241
+ }
242
+ return nodes , meta , nil
243
+ }
244
+
245
+ func (c * consulInstance ) queryHealthService (se string , params * queryParams ) ([]* serviceEntry , * queryMetadata , error ) {
246
+ services := []* serviceEntry {}
247
+ path , err := url .JoinPath ("/v1/health/service/" , se )
248
+ if err != nil {
249
+ return nil , nil , err
250
+ }
251
+ meta , err := c .doConsulQuery (http .MethodGet , path , params , & services )
252
+ if err != nil {
253
+ return nil , nil , err
254
+ }
255
+ return services , meta , nil
256
+ }
257
+
258
+ func (c * consulInstance ) doConsulQuery (method string , path string , params * queryParams , resp interface {}) (* queryMetadata , error ) {
259
+ fullPath , err := url .JoinPath (
260
+ "http://" ,
261
+ net .JoinHostPort (* c .params .Address , strconv .FormatInt (* c .params .Port , 10 )),
262
+ path ,
263
+ )
264
+ if err != nil {
265
+ return nil , err
266
+ }
267
+ req , err := http .NewRequest (method , fullPath , nil )
268
+ if err != nil {
269
+ return nil , err
270
+ }
271
+
272
+ // Global Consul parameters.
273
+ if c .params .Token != "" {
274
+ req .Header .Add ("X-Consul-Token" , c .params .Token )
275
+ }
276
+
277
+ // Request's parameters.
278
+ if params .Namespace != "" {
279
+ req .Header .Add ("ns" , c .params .Namespace )
280
+ }
281
+
282
+ httpResp , err := c .httpClient .Do (req )
283
+ if err != nil {
284
+ return nil , err
285
+ }
286
+ raw , err := io .ReadAll (httpResp .Body )
287
+ if err != nil {
288
+ return nil , err
289
+ }
290
+ json := jsoniter .ConfigCompatibleWithStandardLibrary
291
+ err = json .Unmarshal (raw , resp )
292
+ if err != nil {
293
+ return nil , err
294
+ }
295
+
296
+ meta , err := extractMetadata (httpResp )
297
+ if err != nil {
298
+ return nil , err
299
+ }
300
+
301
+ return meta , nil
302
+ }
303
+
304
+ type serviceEntry struct {
305
+ Node * struct {
306
+ Address string
307
+ }
308
+ Service * struct {
309
+ Address string
310
+ Port int
311
+ }
312
+ }
313
+
314
+ type queryParams struct {
315
+ Namespace string
316
+ }
317
+
318
+ type queryMetadata struct {
319
+ LastIndex uint64
320
+ }
321
+
322
+ func extractMetadata (resp * http.Response ) (* queryMetadata , error ) {
323
+ meta := queryMetadata {}
324
+ indexStr := resp .Header .Get ("X-Consul-Index" )
325
+ if indexStr != "" {
326
+ lastIndex , err := strconv .ParseUint (indexStr , 10 , 64 )
327
+ if err != nil {
328
+ return nil , err
329
+ }
330
+ meta .LastIndex = lastIndex
331
+ }
332
+ return & meta , nil
333
+ }
0 commit comments