Skip to content

Commit 24fcd47

Browse files
authored
Add raw_data query support to backend (#203)
1 parent 34a4fdd commit 24fcd47

File tree

6 files changed

+764
-35
lines changed

6 files changed

+764
-35
lines changed

pkg/opensearch/client/search_request.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,14 @@ func (b *SearchRequestBuilder) AddDocValueField(field string) *SearchRequestBuil
101101
return b
102102
}
103103

104+
// AddTimeFieldWithStandardizedFormat adds timeField as field with standardized time format to not receive
105+
// invalid formats that Elasticsearch/OpenSearch can parse, but our frontend can't (e.g. yyyy_MM_dd_HH_mm_ss)
106+
// https://opensearch.org/docs/latest/api-reference/search/#request-body
107+
// https://opensearch.org/docs/latest/field-types/supported-field-types/date/#full-date-formats
108+
func (b *SearchRequestBuilder) AddTimeFieldWithStandardizedFormat(timeField string) {
109+
b.customProps["fields"] = []map[string]string{{"field": timeField, "format": "strict_date_optional_time_nanos"}}
110+
}
111+
104112
// Query creates and return a query builder
105113
func (b *SearchRequestBuilder) Query() *QueryBuilder {
106114
if b.queryBuilder == nil {

pkg/opensearch/lucene_handler.go

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package opensearch
22

33
import (
4+
"fmt"
45
"strconv"
56
"time"
67

@@ -50,16 +51,33 @@ func (h *luceneHandler) processQuery(q *Query) error {
5051
}
5152

5253
if len(q.BucketAggs) == 0 {
53-
if len(q.Metrics) == 0 || q.Metrics[0].Type != "raw_document" {
54-
return nil
54+
// If no aggregations, only document and logs queries are valid
55+
if len(q.Metrics) == 0 || !(q.Metrics[0].Type == rawDataType || q.Metrics[0].Type == rawDocumentType) {
56+
return fmt.Errorf("invalid query, missing metrics and aggregations")
5557
}
56-
metric := q.Metrics[0]
57-
b.Size(metric.Settings.Get("size").MustInt(500))
58-
b.SortDesc("@timestamp", "boolean")
59-
b.AddDocValueField("@timestamp")
60-
return nil
6158
}
6259

60+
switch {
61+
case q.Metrics[0].Type == rawDataType:
62+
processRawDataQuery(q, b, h.client.GetTimeField())
63+
default:
64+
processTimeSeriesQuery(q, b, fromMs, toMs)
65+
}
66+
67+
return nil
68+
}
69+
70+
func processRawDataQuery(q *Query, b *es.SearchRequestBuilder, defaultTimeField string) {
71+
metric := q.Metrics[0]
72+
b.SortDesc(defaultTimeField, "boolean")
73+
b.SortDesc("_doc", "")
74+
b.AddTimeFieldWithStandardizedFormat(defaultTimeField)
75+
b.Size(metric.Settings.Get("size").MustInt(500))
76+
}
77+
78+
func processTimeSeriesQuery(q *Query, b *es.SearchRequestBuilder, fromMs int64, toMs int64) {
79+
metric := q.Metrics[0]
80+
b.Size(metric.Settings.Get("size").MustInt(500))
6381
aggBuilder := b.Agg()
6482

6583
// iterate backwards to create aggregations bottom-down
@@ -143,8 +161,6 @@ func (h *luceneHandler) processQuery(q *Query) error {
143161
})
144162
}
145163
}
146-
147-
return nil
148164
}
149165

150166
func getPipelineAggField(m *MetricAgg) string {
@@ -177,7 +193,7 @@ func (h *luceneHandler) executeQueries() (*backend.QueryDataResponse, error) {
177193
}
178194

179195
rp := newResponseParser(res.Responses, h.queries, res.DebugInfo)
180-
return rp.getTimeSeries()
196+
return rp.getTimeSeries(h.client.GetTimeField())
181197
}
182198

183199
func addDateHistogramAgg(aggBuilder es.AggBuilder, bucketAgg *BucketAgg, timeFrom, timeTo int64) es.AggBuilder {

pkg/opensearch/response_parser.go

Lines changed: 208 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package opensearch
22

33
import (
4+
"encoding/json"
45
"errors"
56
"regexp"
67
"sort"
@@ -26,6 +27,8 @@ const (
2627
filtersType = "filters"
2728
termsType = "terms"
2829
geohashGridType = "geohash_grid"
30+
rawDataType = "raw_data"
31+
rawDocumentType = "raw_document"
2932
)
3033

3134
type responseParser struct {
@@ -42,7 +45,7 @@ var newResponseParser = func(responses []*es.SearchResponse, targets []*Query, d
4245
}
4346
}
4447

45-
func (rp *responseParser) getTimeSeries() (*backend.QueryDataResponse, error) {
48+
func (rp *responseParser) getTimeSeries(timeField string) (*backend.QueryDataResponse, error) {
4649
result := backend.NewQueryDataResponse()
4750

4851
if rp.Responses == nil {
@@ -74,19 +77,217 @@ func (rp *responseParser) getTimeSeries() (*backend.QueryDataResponse, error) {
7477
queryRes := backend.DataResponse{
7578
Frames: data.Frames{},
7679
}
77-
props := make(map[string]string)
78-
err := rp.processBuckets(res.Aggregations, target, &queryRes, props, 0)
79-
if err != nil {
80-
return nil, err
80+
81+
switch {
82+
case target.Metrics[0].Type == rawDataType:
83+
queryRes = processRawDataResponse(res, timeField, queryRes)
84+
default:
85+
props := make(map[string]string)
86+
err := rp.processBuckets(res.Aggregations, target, &queryRes, props, 0)
87+
if err != nil {
88+
return nil, err
89+
}
90+
rp.nameFields(&queryRes.Frames, target)
91+
rp.trimDatapoints(&queryRes.Frames, target)
8192
}
82-
rp.nameFields(&queryRes.Frames, target)
83-
rp.trimDatapoints(&queryRes.Frames, target)
8493

8594
result.Responses[target.RefID] = queryRes
8695
}
8796
return result, nil
8897
}
8998

99+
func processRawDataResponse(res *es.SearchResponse, timeField string, queryRes backend.DataResponse) backend.DataResponse {
100+
propNames := make(map[string]bool)
101+
docs := make([]map[string]interface{}, len(res.Hits.Hits))
102+
103+
for hitIdx, hit := range res.Hits.Hits {
104+
var flattenedSource map[string]interface{}
105+
if hit["_source"] != nil {
106+
// On frontend maxDepth wasn't used but as we are processing on backend
107+
// let's put a limit to avoid infinite loop. 10 was chosen arbitrarily.
108+
flattenedSource = flatten(hit["_source"].(map[string]interface{}), 10)
109+
}
110+
111+
flattenedSource["_id"] = hit["_id"]
112+
flattenedSource["_type"] = hit["_type"]
113+
flattenedSource["_index"] = hit["_index"]
114+
if timestamp, ok := getTimestamp(hit, flattenedSource, timeField); ok {
115+
flattenedSource[timeField] = timestamp
116+
}
117+
118+
for key := range flattenedSource {
119+
propNames[key] = true
120+
}
121+
122+
docs[hitIdx] = flattenedSource
123+
}
124+
fields := processDocsToDataFrameFields(docs, propNames)
125+
126+
frames := data.Frames{}
127+
frame := data.NewFrame("", fields...)
128+
frames = append(frames, frame)
129+
130+
queryRes.Frames = frames
131+
return queryRes
132+
}
133+
134+
// getTimestamp extracts the hit's timestamp, preferring the formatted value
// under "fields" (requested by AddTimeFieldWithStandardizedFormat) and falling
// back to the raw value in _source. It returns (nil, false) when no timestamp
// is present or the value does not parse as RFC3339(Nano).
func getTimestamp(hit, source map[string]interface{}, timeField string) (*time.Time, bool) {
	raw, found := "", false

	// "fields" carries the time field as a single-element array of strings.
	if fieldsMap, ok := hit["fields"].(map[string]interface{}); ok {
		if values, ok := fieldsMap[timeField].([]interface{}); ok && len(values) == 1 {
			raw, found = values[0].(string)
		}
	}

	// Fall back to a string timestamp stored directly in _source.
	if !found {
		raw, found = source[timeField].(string)
	}
	if !found {
		return nil, false
	}

	parsed, err := time.Parse(time.RFC3339Nano, raw)
	if err != nil {
		// Unparseable format: treat the same as a missing timestamp.
		return nil, false
	}
	return &parsed, true
}
154+
155+
// lookForTimeFieldInFields returns the timestamp string found under the hit's
// "fields" key, where OpenSearch puts requested fields as arrays of values.
// It succeeds only when the array holds exactly one string element.
func lookForTimeFieldInFields(hit map[string]interface{}, timeField string) (string, bool) {
	// A failed assertion also covers the nil / absent "fields" case.
	fieldsMap, ok := hit["fields"].(map[string]interface{})
	if !ok {
		return "", false
	}
	values, ok := fieldsMap[timeField].([]interface{})
	if !ok || len(values) != 1 {
		return "", false
	}
	timeString, ok := values[0].(string)
	return timeString, ok
}
172+
173+
// lookForTimeFieldInSource returns the timestamp string stored directly under
// timeField in the (flattened) _source document, if it exists and is a string.
func lookForTimeFieldInSource(source map[string]interface{}, timeField string) (string, bool) {
	// The assertion fails for nil/absent values as well as non-strings.
	timeString, ok := source[timeField].(string)
	return timeString, ok
}
182+
183+
// flatten converts a nested document into a flat map whose keys are
// dot-separated paths ("a.b.c"). maxDepth bounds the recursion: the frontend
// implementation had no such limit, but on the backend we cap it to avoid
// unbounded recursion; 10 was chosen arbitrarily. Non-map values, empty maps,
// and maps below the depth limit are stored as-is.
func flatten(target map[string]interface{}, maxDepth int) map[string]interface{} {
	output := make(map[string]interface{})

	var walk func(depth int, node map[string]interface{}, prefix string)
	walk = func(depth int, node map[string]interface{}, prefix string) {
		for key, value := range node {
			flatKey := strings.Trim(prefix+"."+key, ".")
			if child, ok := value.(map[string]interface{}); ok && len(child) > 0 && depth < maxDepth {
				walk(depth+1, child, flatKey)
				continue
			}
			output[flatKey] = value
		}
	}

	walk(0, target, "")
	return output
}
190+
191+
// step is the recursive worker for flatten: it walks target, prefixing each
// key with prev (dot-separated) and writing leaves into output. Recursion
// stops at non-map values, empty maps, or once currentDepth reaches maxDepth.
func step(currentDepth, maxDepth int, target map[string]interface{}, prev string, output map[string]interface{}) {
	for name, val := range target {
		// Trim handles the top level, where prev is "" and the key would
		// otherwise start with a dot.
		flatKey := strings.Trim(prev+"."+name, ".")

		nested, isMap := val.(map[string]interface{})
		if !isMap || len(nested) == 0 || currentDepth >= maxDepth {
			output[flatKey] = val
			continue
		}
		step(currentDepth+1, maxDepth, nested, flatKey, output)
	}
}
204+
205+
func processDocsToDataFrameFields(docs []map[string]interface{}, propNames map[string]bool) []*data.Field {
206+
allFields := make([]*data.Field, 0, len(propNames))
207+
var timeDataField *data.Field
208+
for propName := range propNames {
209+
propNameValue := findTheFirstNonNilDocValueForPropName(docs, propName)
210+
switch propNameValue.(type) {
211+
// We are checking for default data types values (float64, int, bool, string)
212+
// and default to json.RawMessage if we cannot find any of them
213+
case *time.Time:
214+
timeDataField = createTimeField(docs, propName)
215+
case float64:
216+
allFields = append(allFields, createFieldOfType[float64](docs, propName))
217+
case int:
218+
allFields = append(allFields, createFieldOfType[int](docs, propName))
219+
case string:
220+
allFields = append(allFields, createFieldOfType[string](docs, propName))
221+
case bool:
222+
allFields = append(allFields, createFieldOfType[bool](docs, propName))
223+
default:
224+
fieldVector := make([]*json.RawMessage, len(docs))
225+
for i, doc := range docs {
226+
bytes, err := json.Marshal(doc[propName])
227+
if err != nil {
228+
// We skip values that cannot be marshalled
229+
continue
230+
}
231+
value := json.RawMessage(bytes)
232+
fieldVector[i] = &value
233+
}
234+
field := data.NewField(propName, nil, fieldVector)
235+
isFilterable := true
236+
field.Config = &data.FieldConfig{Filterable: &isFilterable}
237+
allFields = append(allFields, field)
238+
}
239+
}
240+
241+
sort.Slice(allFields, func(i, j int) bool {
242+
return allFields[i].Name < allFields[j].Name
243+
})
244+
245+
if timeDataField != nil {
246+
allFields = append([]*data.Field{timeDataField}, allFields...)
247+
}
248+
249+
return allFields
250+
}
251+
252+
// findTheFirstNonNilDocValueForPropName returns the first non-nil value stored
// under propName across docs, or nil when every doc holds nil (or lacks the
// key) — including the empty-docs case.
func findTheFirstNonNilDocValueForPropName(docs []map[string]interface{}, propName string) interface{} {
	for _, doc := range docs {
		if doc[propName] != nil {
			return doc[propName]
		}
	}
	// BUG FIX: the original returned docs[0][propName] here, which panics when
	// docs is empty; after the loop that expression is always nil anyway.
	return nil
}
260+
261+
func createTimeField(docs []map[string]interface{}, timeField string) *data.Field {
262+
isFilterable := true
263+
fieldVector := make([]*time.Time, len(docs))
264+
for i, doc := range docs {
265+
value, ok := doc[timeField].(*time.Time) // cannot use generic function below because the type is already a pointer
266+
if !ok {
267+
continue
268+
}
269+
fieldVector[i] = value
270+
}
271+
field := data.NewField(timeField, nil, fieldVector)
272+
field.Config = &data.FieldConfig{Filterable: &isFilterable}
273+
return field
274+
}
275+
276+
func createFieldOfType[T int | float64 | bool | string](docs []map[string]interface{}, propName string) *data.Field {
277+
isFilterable := true
278+
fieldVector := make([]*T, len(docs))
279+
for i, doc := range docs {
280+
value, ok := doc[propName].(T)
281+
if !ok {
282+
continue
283+
}
284+
fieldVector[i] = &value
285+
}
286+
field := data.NewField(propName, nil, fieldVector)
287+
field.Config = &data.FieldConfig{Filterable: &isFilterable}
288+
return field
289+
}
290+
90291
func (rp *responseParser) processBuckets(aggs map[string]interface{}, target *Query, queryResult *backend.DataResponse, props map[string]string, depth int) error {
91292
var err error
92293
maxDepth := len(target.BucketAggs) - 1

0 commit comments

Comments
 (0)