Skip to content

Commit ffbbe80

Browse files
authored
Merge pull request #1225 from prometheus/beorn7/api
api: Extend and improve json-iterator usage
2 parents 66687e5 + 2236d78 commit ffbbe80

File tree

5 files changed

+555
-66
lines changed

5 files changed

+555
-66
lines changed

api/prometheus/v1/api.go

Lines changed: 263 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,15 @@ import (
3535
)
3636

3737
func init() {
38-
json.RegisterTypeEncoderFunc("model.SamplePair", marshalPointJSON, marshalPointJSONIsEmpty)
39-
json.RegisterTypeDecoderFunc("model.SamplePair", unMarshalPointJSON)
38+
json.RegisterTypeEncoderFunc("model.SamplePair", marshalSamplePairJSON, marshalJSONIsEmpty)
39+
json.RegisterTypeDecoderFunc("model.SamplePair", unmarshalSamplePairJSON)
40+
json.RegisterTypeEncoderFunc("model.SampleHistogramPair", marshalSampleHistogramPairJSON, marshalJSONIsEmpty)
41+
json.RegisterTypeDecoderFunc("model.SampleHistogramPair", unmarshalSampleHistogramPairJSON)
42+
json.RegisterTypeEncoderFunc("model.SampleStream", marshalSampleStreamJSON, marshalJSONIsEmpty) // Only needed for benchmark.
43+
json.RegisterTypeDecoderFunc("model.SampleStream", unmarshalSampleStreamJSON) // Only needed for benchmark.
4044
}
4145

42-
func unMarshalPointJSON(ptr unsafe.Pointer, iter *json.Iterator) {
46+
func unmarshalSamplePairJSON(ptr unsafe.Pointer, iter *json.Iterator) {
4347
p := (*model.SamplePair)(ptr)
4448
if !iter.ReadArray() {
4549
iter.ReportError("unmarshal model.SamplePair", "SamplePair must be [timestamp, value]")
@@ -68,12 +72,165 @@ func unMarshalPointJSON(ptr unsafe.Pointer, iter *json.Iterator) {
6872
}
6973
}
7074

71-
func marshalPointJSON(ptr unsafe.Pointer, stream *json.Stream) {
75+
func marshalSamplePairJSON(ptr unsafe.Pointer, stream *json.Stream) {
7276
p := *((*model.SamplePair)(ptr))
7377
stream.WriteArrayStart()
78+
marshalTimestamp(p.Timestamp, stream)
79+
stream.WriteMore()
80+
marshalFloat(float64(p.Value), stream)
81+
stream.WriteArrayEnd()
82+
}
83+
84+
func unmarshalSampleHistogramPairJSON(ptr unsafe.Pointer, iter *json.Iterator) {
85+
p := (*model.SampleHistogramPair)(ptr)
86+
if !iter.ReadArray() {
87+
iter.ReportError("unmarshal model.SampleHistogramPair", "SampleHistogramPair must be [timestamp, {histogram}]")
88+
return
89+
}
90+
t := iter.ReadNumber()
91+
if err := p.Timestamp.UnmarshalJSON([]byte(t)); err != nil {
92+
iter.ReportError("unmarshal model.SampleHistogramPair", err.Error())
93+
return
94+
}
95+
if !iter.ReadArray() {
96+
iter.ReportError("unmarshal model.SampleHistogramPair", "SamplePair missing histogram")
97+
return
98+
}
99+
h := &model.SampleHistogram{}
100+
p.Histogram = h
101+
for key := iter.ReadObject(); key != ""; key = iter.ReadObject() {
102+
switch key {
103+
case "count":
104+
f, err := strconv.ParseFloat(iter.ReadString(), 64)
105+
if err != nil {
106+
iter.ReportError("unmarshal model.SampleHistogramPair", "count of histogram is not a float")
107+
return
108+
}
109+
h.Count = model.FloatString(f)
110+
case "sum":
111+
f, err := strconv.ParseFloat(iter.ReadString(), 64)
112+
if err != nil {
113+
iter.ReportError("unmarshal model.SampleHistogramPair", "sum of histogram is not a float")
114+
return
115+
}
116+
h.Sum = model.FloatString(f)
117+
case "buckets":
118+
for iter.ReadArray() {
119+
b, err := unmarshalHistogramBucket(iter)
120+
if err != nil {
121+
iter.ReportError("unmarshal model.HistogramBucket", err.Error())
122+
return
123+
}
124+
h.Buckets = append(h.Buckets, b)
125+
}
126+
default:
127+
iter.ReportError("unmarshal model.SampleHistogramPair", fmt.Sprint("unexpected key in histogram:", key))
128+
return
129+
}
130+
}
131+
if iter.ReadArray() {
132+
iter.ReportError("unmarshal model.SampleHistogramPair", "SampleHistogramPair has too many values, must be [timestamp, {histogram}]")
133+
return
134+
}
135+
}
136+
137+
func marshalSampleHistogramPairJSON(ptr unsafe.Pointer, stream *json.Stream) {
138+
p := *((*model.SampleHistogramPair)(ptr))
139+
stream.WriteArrayStart()
140+
marshalTimestamp(p.Timestamp, stream)
141+
stream.WriteMore()
142+
marshalHistogram(*p.Histogram, stream)
143+
stream.WriteArrayEnd()
144+
}
145+
146+
func unmarshalSampleStreamJSON(ptr unsafe.Pointer, iter *json.Iterator) {
147+
ss := (*model.SampleStream)(ptr)
148+
for key := iter.ReadObject(); key != ""; key = iter.ReadObject() {
149+
switch key {
150+
case "metric":
151+
metricString := iter.ReadAny().ToString()
152+
if err := json.UnmarshalFromString(metricString, &ss.Metric); err != nil {
153+
iter.ReportError("unmarshal model.SampleStream", err.Error())
154+
return
155+
}
156+
case "values":
157+
for iter.ReadArray() {
158+
v := model.SamplePair{}
159+
unmarshalSamplePairJSON(unsafe.Pointer(&v), iter)
160+
ss.Values = append(ss.Values, v)
161+
}
162+
case "histograms":
163+
for iter.ReadArray() {
164+
h := model.SampleHistogramPair{}
165+
unmarshalSampleHistogramPairJSON(unsafe.Pointer(&h), iter)
166+
ss.Histograms = append(ss.Histograms, h)
167+
}
168+
default:
169+
iter.ReportError("unmarshal model.SampleStream", fmt.Sprint("unexpected key:", key))
170+
return
171+
}
172+
}
173+
}
174+
175+
func marshalSampleStreamJSON(ptr unsafe.Pointer, stream *json.Stream) {
176+
ss := *((*model.SampleStream)(ptr))
177+
stream.WriteObjectStart()
178+
stream.WriteObjectField(`metric`)
179+
m, err := json.ConfigCompatibleWithStandardLibrary.Marshal(ss.Metric)
180+
if err != nil {
181+
stream.Error = err
182+
return
183+
}
184+
stream.SetBuffer(append(stream.Buffer(), m...))
185+
if len(ss.Values) > 0 {
186+
stream.WriteMore()
187+
stream.WriteObjectField(`values`)
188+
stream.WriteArrayStart()
189+
for i, v := range ss.Values {
190+
if i > 0 {
191+
stream.WriteMore()
192+
}
193+
marshalSamplePairJSON(unsafe.Pointer(&v), stream)
194+
}
195+
stream.WriteArrayEnd()
196+
}
197+
if len(ss.Histograms) > 0 {
198+
stream.WriteMore()
199+
stream.WriteObjectField(`histograms`)
200+
stream.WriteArrayStart()
201+
for i, h := range ss.Histograms {
202+
if i > 0 {
203+
stream.WriteMore()
204+
}
205+
marshalSampleHistogramPairJSON(unsafe.Pointer(&h), stream)
206+
}
207+
stream.WriteArrayEnd()
208+
}
209+
stream.WriteObjectEnd()
210+
}
211+
212+
func marshalFloat(v float64, stream *json.Stream) {
213+
stream.WriteRaw(`"`)
214+
// Taken from https://github.com/json-iterator/go/blob/master/stream_float.go#L71 as a workaround
215+
// to https://github.com/json-iterator/go/issues/365 (json-iterator, to follow json standard, doesn't allow inf/nan).
216+
buf := stream.Buffer()
217+
abs := math.Abs(v)
218+
fmt := byte('f')
219+
// Note: Must use float32 comparisons for underlying float32 value to get precise cutoffs right.
220+
if abs != 0 {
221+
if abs < 1e-6 || abs >= 1e21 {
222+
fmt = 'e'
223+
}
224+
}
225+
buf = strconv.AppendFloat(buf, v, fmt, -1, 64)
226+
stream.SetBuffer(buf)
227+
stream.WriteRaw(`"`)
228+
}
229+
230+
func marshalTimestamp(timestamp model.Time, stream *json.Stream) {
231+
t := int64(timestamp)
74232
// Write out the timestamp as a float divided by 1000.
75233
// This is ~3x faster than converting to a float.
76-
t := int64(p.Timestamp)
77234
if t < 0 {
78235
stream.WriteRaw(`-`)
79236
t = -t
@@ -90,28 +247,113 @@ func marshalPointJSON(ptr unsafe.Pointer, stream *json.Stream) {
90247
}
91248
stream.WriteInt64(fraction)
92249
}
93-
stream.WriteMore()
94-
stream.WriteRaw(`"`)
250+
}
95251

96-
// Taken from https://github.com/json-iterator/go/blob/master/stream_float.go#L71 as a workaround
97-
// to https://github.com/json-iterator/go/issues/365 (jsoniter, to follow json standard, doesn't allow inf/nan)
98-
buf := stream.Buffer()
99-
abs := math.Abs(float64(p.Value))
100-
fmt := byte('f')
101-
// Note: Must use float32 comparisons for underlying float32 value to get precise cutoffs right.
102-
if abs != 0 {
103-
if abs < 1e-6 || abs >= 1e21 {
104-
fmt = 'e'
105-
}
252+
func unmarshalHistogramBucket(iter *json.Iterator) (*model.HistogramBucket, error) {
253+
b := model.HistogramBucket{}
254+
if !iter.ReadArray() {
255+
return nil, errors.New("HistogramBucket must be [boundaries, lower, upper, count]")
106256
}
107-
buf = strconv.AppendFloat(buf, float64(p.Value), fmt, -1, 64)
108-
stream.SetBuffer(buf)
257+
boundaries, err := iter.ReadNumber().Int64()
258+
if err != nil {
259+
return nil, err
260+
}
261+
b.Boundaries = int32(boundaries)
262+
if !iter.ReadArray() {
263+
return nil, errors.New("HistogramBucket must be [boundaries, lower, upper, count]")
264+
}
265+
f, err := strconv.ParseFloat(iter.ReadString(), 64)
266+
if err != nil {
267+
return nil, err
268+
}
269+
b.Lower = model.FloatString(f)
270+
if !iter.ReadArray() {
271+
return nil, errors.New("HistogramBucket must be [boundaries, lower, upper, count]")
272+
}
273+
f, err = strconv.ParseFloat(iter.ReadString(), 64)
274+
if err != nil {
275+
return nil, err
276+
}
277+
b.Upper = model.FloatString(f)
278+
if !iter.ReadArray() {
279+
return nil, errors.New("HistogramBucket must be [boundaries, lower, upper, count]")
280+
}
281+
f, err = strconv.ParseFloat(iter.ReadString(), 64)
282+
if err != nil {
283+
return nil, err
284+
}
285+
b.Count = model.FloatString(f)
286+
if iter.ReadArray() {
287+
return nil, errors.New("HistogramBucket has too many values, must be [boundaries, lower, upper, count]")
288+
}
289+
return &b, nil
290+
}
109291

110-
stream.WriteRaw(`"`)
292+
// marshalHistogramBucket writes something like: [ 3, "-0.25", "0.25", "3"]
293+
// See marshalHistogram to understand what the numbers mean
294+
func marshalHistogramBucket(b model.HistogramBucket, stream *json.Stream) {
295+
stream.WriteArrayStart()
296+
stream.WriteInt32(b.Boundaries)
297+
stream.WriteMore()
298+
marshalFloat(float64(b.Lower), stream)
299+
stream.WriteMore()
300+
marshalFloat(float64(b.Upper), stream)
301+
stream.WriteMore()
302+
marshalFloat(float64(b.Count), stream)
111303
stream.WriteArrayEnd()
112304
}
113305

114-
func marshalPointJSONIsEmpty(ptr unsafe.Pointer) bool {
306+
// marshalHistogram writes something like:
307+
//
308+
// {
309+
// "count": "42",
310+
// "sum": "34593.34",
311+
// "buckets": [
312+
// [ 3, "-0.25", "0.25", "3"],
313+
// [ 0, "0.25", "0.5", "12"],
314+
// [ 0, "0.5", "1", "21"],
315+
// [ 0, "2", "4", "6"]
316+
// ]
317+
// }
318+
//
319+
// The 1st element in each bucket array determines if the boundaries are
320+
// inclusive (AKA closed) or exclusive (AKA open):
321+
//
322+
// 0: lower exclusive, upper inclusive
323+
// 1: lower inclusive, upper exclusive
324+
// 2: both exclusive
325+
// 3: both inclusive
326+
//
327+
// The 2nd and 3rd elements are the lower and upper boundary. The 4th element is
328+
// the bucket count.
329+
func marshalHistogram(h model.SampleHistogram, stream *json.Stream) {
330+
stream.WriteObjectStart()
331+
stream.WriteObjectField(`count`)
332+
marshalFloat(float64(h.Count), stream)
333+
stream.WriteMore()
334+
stream.WriteObjectField(`sum`)
335+
marshalFloat(float64(h.Sum), stream)
336+
337+
bucketFound := false
338+
for _, bucket := range h.Buckets {
339+
if bucket.Count == 0 {
340+
continue // No need to expose empty buckets in JSON.
341+
}
342+
stream.WriteMore()
343+
if !bucketFound {
344+
stream.WriteObjectField(`buckets`)
345+
stream.WriteArrayStart()
346+
}
347+
bucketFound = true
348+
marshalHistogramBucket(*bucket, stream)
349+
}
350+
if bucketFound {
351+
stream.WriteArrayEnd()
352+
}
353+
stream.WriteObjectEnd()
354+
}
355+
356+
func marshalJSONIsEmpty(ptr unsafe.Pointer) bool {
115357
return false
116358
}
117359

0 commit comments

Comments
 (0)