Skip to content

Commit 6810d0c

Browse files
authored
feat: support sending metrics before handler initialization (#524)
* add `warning` level to logger * export `logWarning` from `utils` * add `queue.ts` for `MetricsQueue` * export `MetricsQueue` from `metrics` * use `MetricsQueue` for global metrics queue for later processing when using on handler not initialized * remove unused import * format and lint * remove unused import * format and lint * add entry to integration tets * fix `return` to `continue` * update `shift` to `pop` reducing time complexity to `O(2n)`
1 parent dadad52 commit 6810d0c

File tree

12 files changed

+269
-26
lines changed

12 files changed

+269
-26
lines changed

integration_tests/send-metrics.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
const { datadog, sendDistributionMetric } = require("datadog-lambda-js");
22

3+
sendDistributionMetric(
4+
"serverless.integration_test.outside_handler",
5+
1,
6+
"tagkey:tagvalue",
7+
`eventsource:outside_handler`,
8+
);
9+
310
async function handle(event, context) {
411
const responsePayload = { message: "hello, dog!" };
512

integration_tests/snapshots/logs/async-metrics_node16.log

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,17 @@ START
1515
],
1616
"v": 1
1717
}
18+
{
19+
"e": XXXX,
20+
"m": "serverless.integration_test.outside_handler",
21+
"t": [
22+
"tagkey:tagvalue",
23+
"eventsource:outside_handler",
24+
"dd_lambda_layer:datadog-nodev16.XX.X"
25+
],
26+
"v": 1
27+
}
28+
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGateway request
1829
{
1930
"e": XXXX,
2031
"m": "serverless.integration_test.execution",
@@ -25,7 +36,6 @@ START
2536
],
2637
"v": 1
2738
}
28-
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGateway request
2939
END Duration: XXXX ms (init: XXXX ms) Memory Used: XXXX MB
3040
START
3141
{
@@ -43,6 +53,7 @@ START
4353
],
4454
"v": 1
4555
}
56+
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request
4657
{
4758
"e": XXXX,
4859
"m": "serverless.integration_test.records_processed",
@@ -63,9 +74,9 @@ START
6374
],
6475
"v": 1
6576
}
66-
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request
6777
END Duration: XXXX ms Memory Used: XXXX MB
6878
START
79+
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request
6980
{
7081
"e": XXXX,
7182
"m": "aws.lambda.enhanced.invocations",
@@ -111,5 +122,4 @@ START
111122
],
112123
"v": 1
113124
}
114-
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request
115125
END Duration: XXXX ms Memory Used: XXXX MB

integration_tests/snapshots/logs/async-metrics_node18.log

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,17 @@ START
1515
],
1616
"v": 1
1717
}
18+
{
19+
"e": XXXX,
20+
"m": "serverless.integration_test.outside_handler",
21+
"t": [
22+
"tagkey:tagvalue",
23+
"eventsource:outside_handler",
24+
"dd_lambda_layer:datadog-nodev18.XX.X"
25+
],
26+
"v": 1
27+
}
28+
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGateway request
1829
{
1930
"e": XXXX,
2031
"m": "serverless.integration_test.execution",
@@ -25,7 +36,6 @@ START
2536
],
2637
"v": 1
2738
}
28-
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGateway request
2939
END Duration: XXXX ms (init: XXXX ms) Memory Used: XXXX MB
3040
START
3141
{
@@ -43,6 +53,7 @@ START
4353
],
4454
"v": 1
4555
}
56+
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request
4657
{
4758
"e": XXXX,
4859
"m": "serverless.integration_test.records_processed",
@@ -63,7 +74,6 @@ START
6374
],
6475
"v": 1
6576
}
66-
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request
6777
END Duration: XXXX ms Memory Used: XXXX MB
6878
START
6979
{
@@ -81,6 +91,7 @@ START
8191
],
8292
"v": 1
8393
}
94+
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request
8495
{
8596
"e": XXXX,
8697
"m": "serverless.integration_test.records_processed",
@@ -111,5 +122,4 @@ START
111122
],
112123
"v": 1
113124
}
114-
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request
115125
END Duration: XXXX ms Memory Used: XXXX MB

integration_tests/snapshots/logs/async-metrics_node20.log

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,16 @@ START
1515
],
1616
"v": 1
1717
}
18-
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGateway request
18+
{
19+
"e": XXXX,
20+
"m": "serverless.integration_test.outside_handler",
21+
"t": [
22+
"tagkey:tagvalue",
23+
"eventsource:outside_handler",
24+
"dd_lambda_layer:datadog-nodev20.XX.X"
25+
],
26+
"v": 1
27+
}
1928
{
2029
"e": XXXX,
2130
"m": "serverless.integration_test.execution",
@@ -26,8 +35,10 @@ XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGat
2635
],
2736
"v": 1
2837
}
38+
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed APIGateway request
2939
END Duration: XXXX ms (init: XXXX ms) Memory Used: XXXX MB
3040
START
41+
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request
3142
{
3243
"e": XXXX,
3344
"m": "aws.lambda.enhanced.invocations",
@@ -43,7 +54,6 @@ START
4354
],
4455
"v": 1
4556
}
46-
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS request
4757
{
4858
"e": XXXX,
4959
"m": "serverless.integration_test.records_processed",
@@ -66,7 +76,6 @@ XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SNS re
6676
}
6777
END Duration: XXXX ms Memory Used: XXXX MB
6878
START
69-
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request
7079
{
7180
"e": XXXX,
7281
"m": "aws.lambda.enhanced.invocations",
@@ -82,6 +91,7 @@ XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS re
8291
],
8392
"v": 1
8493
}
94+
XXXX-XX-XX XX:XX:XX.XXX INFO [dd.trace_id=XXXX dd.span_id=XXXX] Processed SQS request
8595
{
8696
"e": XXXX,
8797
"m": "serverless.integration_test.records_processed",

src/index.spec.ts

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,13 @@ import http from "http";
22
import nock from "nock";
33

44
import { Context, Handler } from "aws-lambda";
5-
import { datadog, getTraceHeaders, sendDistributionMetric, sendDistributionMetricWithDate } from "./index";
5+
import {
6+
datadog,
7+
getTraceHeaders,
8+
sendDistributionMetric,
9+
sendDistributionMetricWithDate,
10+
_metricsQueue,
11+
} from "./index";
612
import { incrementErrorsMetric, incrementInvocationsMetric } from "./metrics/enhanced-metrics";
713
import { LogLevel, setLogLevel } from "./utils";
814
import { HANDLER_STREAMING, STREAM_RESPONSE } from "./constants";
@@ -407,6 +413,7 @@ describe("datadog", () => {
407413
const logger = {
408414
debug: jest.fn(),
409415
error: jest.fn(),
416+
warn: jest.fn(),
410417
};
411418

412419
const wrapped = datadog(handler, { forceWrap: true, logger: logger, debugLogging: true });
@@ -440,3 +447,25 @@ describe("datadog", () => {
440447
expect(wrapped[HANDLER_STREAMING]).toBe(undefined);
441448
});
442449
});
450+
451+
describe("sendDistributionMetric", () => {
452+
beforeEach(() => {
453+
_metricsQueue.reset();
454+
setLogLevel(LogLevel.NONE);
455+
});
456+
it("enqueues a metric for later processing when metrics listener is not initialized", () => {
457+
sendDistributionMetric("metric", 1, "first-tag", "second-tag");
458+
expect(_metricsQueue.length).toBe(1);
459+
});
460+
});
461+
462+
describe("sendDistributionMetricWithDate", () => {
463+
beforeEach(() => {
464+
_metricsQueue.reset();
465+
setLogLevel(LogLevel.NONE);
466+
});
467+
it("enqueues a metric for later processing when metrics listener is not initialized", () => {
468+
sendDistributionMetricWithDate("metric", 1, new Date(), "first-tag", "second-tag");
469+
expect(_metricsQueue.length).toBe(1);
470+
});
471+
});

src/index.ts

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,11 @@ import {
66
KMSService,
77
MetricsConfig,
88
MetricsListener,
9+
MetricsQueue,
910
} from "./metrics";
1011
import { TraceConfig, TraceListener } from "./trace";
1112
import { subscribeToDC } from "./runtime";
12-
import {
13-
logDebug,
14-
logError,
15-
Logger,
16-
LogLevel,
17-
promisifiedHandler,
18-
setSandboxInit,
19-
setLogger,
20-
setLogLevel,
21-
} from "./utils";
13+
import { logDebug, Logger, LogLevel, promisifiedHandler, setSandboxInit, setLogger, setLogLevel } from "./utils";
2214
import { getEnhancedMetricTags } from "./metrics/enhanced-metrics";
2315
import { DatadogTraceHeaders } from "./trace/context/extractor";
2416

@@ -90,6 +82,8 @@ export const defaultConfig: Config = {
9082
localTesting: false,
9183
} as const;
9284

85+
export const _metricsQueue: MetricsQueue = new MetricsQueue();
86+
9387
let currentMetricsListener: MetricsListener | undefined;
9488
let currentTraceListener: TraceListener | undefined;
9589

@@ -150,6 +144,8 @@ export function datadog<TEvent, TResult>(
150144
if (finalConfig.enhancedMetrics) {
151145
incrementInvocationsMetric(metricsListener, context);
152146
}
147+
148+
sendQueueMetrics(metricsListener);
153149
} catch (err) {
154150
if (err instanceof Error) {
155151
logDebug("Failed to start listeners", err);
@@ -259,9 +255,10 @@ export function sendDistributionMetricWithDate(name: string, value: number, metr
259255

260256
if (currentMetricsListener !== undefined) {
261257
currentMetricsListener.sendDistributionMetricWithDate(name, value, metricTime, false, ...tags);
262-
} else {
263-
logError("handler not initialized");
258+
return;
264259
}
260+
261+
_metricsQueue.push({ name, value, metricTime, tags });
265262
}
266263

267264
/**
@@ -275,8 +272,26 @@ export function sendDistributionMetric(name: string, value: number, ...tags: str
275272

276273
if (currentMetricsListener !== undefined) {
277274
currentMetricsListener.sendDistributionMetric(name, value, false, ...tags);
278-
} else {
279-
logError("handler not initialized");
275+
return;
276+
}
277+
278+
_metricsQueue.push({ name, value, tags });
279+
}
280+
281+
function sendQueueMetrics(listener: MetricsListener) {
282+
// Reverse the queue to send metrics in order.
283+
// This is necessary because the "queue" is a stack,
284+
// and we want to send metrics in the order they were added.
285+
_metricsQueue.reverse();
286+
while (_metricsQueue.length > 0) {
287+
const metric = _metricsQueue.pop()!; // This will always exist.
288+
const { name, value, metricTime, tags } = metric;
289+
if (metricTime !== undefined) {
290+
listener.sendDistributionMetricWithDate(name, value, metricTime, false, ...tags);
291+
continue;
292+
}
293+
294+
listener.sendDistributionMetric(name, value, false, ...tags);
280295
}
281296
}
282297

src/metrics/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
export { MetricsConfig, MetricsListener } from "./listener";
2+
export { MetricsQueue } from "./queue";
23
export { KMSService } from "./kms-service";
34
export { incrementErrorsMetric, incrementInvocationsMetric } from "./enhanced-metrics";

src/metrics/queue.spec.ts

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import { LogLevel, logDebug, setLogLevel, setLogger } from "../utils";
2+
import { METRICS_QUEUE_LIMIT, MetricsQueue } from "./queue";
3+
4+
describe("MetricsQueue", () => {
5+
const logger = {
6+
debug: jest.fn(),
7+
error: jest.fn(),
8+
warn: jest.fn(),
9+
};
10+
describe("push", () => {
11+
beforeEach(() => {
12+
setLogLevel(LogLevel.NONE);
13+
setLogger(logger);
14+
});
15+
it("resets metrics queue when its full", () => {
16+
setLogLevel(LogLevel.WARNING);
17+
const queue = new MetricsQueue();
18+
for (let i = 0; i < METRICS_QUEUE_LIMIT + 1; i++) {
19+
queue.push({ name: "metric", tags: [], value: i });
20+
}
21+
22+
// The queue should have been reset and only contain the last metric
23+
expect(queue.length).toBe(1);
24+
expect(logger.warn).toHaveBeenLastCalledWith(
25+
'{"status":"warning","message":"datadog:Metrics queue is full, dropping all metrics."}',
26+
);
27+
});
28+
29+
it("enqueue metric", () => {
30+
setLogLevel(LogLevel.DEBUG);
31+
const queue = new MetricsQueue();
32+
queue.push({ name: "metric", tags: [], value: 1 });
33+
expect(queue.length).toBe(1);
34+
expect(logger.debug).toHaveBeenLastCalledWith(
35+
'{"status":"debug","message":"datadog:Metrics Listener was not initialized. Enqueuing metric for later processing."}',
36+
);
37+
});
38+
});
39+
40+
describe("pop", () => {
41+
it("returns undefined when queue is empty", () => {
42+
const queue = new MetricsQueue();
43+
expect(queue.pop()).toBeUndefined();
44+
});
45+
46+
it("returns the first element in the queue", () => {
47+
const queue = new MetricsQueue();
48+
queue.push({ name: "metric", tags: [], value: 1 });
49+
queue.push({ name: "metric", tags: [], value: 2 });
50+
expect(queue.pop()).toEqual({ name: "metric", tags: [], value: 2 });
51+
});
52+
});
53+
54+
it("reverses the queue", () => {
55+
const queue = new MetricsQueue();
56+
queue.push({ name: "metric", tags: [], value: 1 });
57+
queue.push({ name: "metric", tags: [], value: 2 });
58+
queue.reverse();
59+
expect(queue.pop()).toEqual({ name: "metric", tags: [], value: 1 });
60+
});
61+
62+
it("resets the queue", () => {
63+
const queue = new MetricsQueue();
64+
queue.push({ name: "metric", tags: [], value: 1 });
65+
queue.push({ name: "metric", tags: [], value: 2 });
66+
queue.reset();
67+
expect(queue.length).toBe(0);
68+
});
69+
70+
it("returns the length of the queue", () => {
71+
const queue = new MetricsQueue();
72+
queue.push({ name: "metric", tags: [], value: 1 });
73+
queue.push({ name: "metric", tags: [], value: 2 });
74+
expect(queue.length).toBe(2);
75+
});
76+
});

0 commit comments

Comments
 (0)