Skip to content

Commit 461ad30

Browse files
committed
add more envelopes and tests, refactored utils to autocomplete event files
1 parent f08f064 commit 461ad30

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+900
-246
lines changed

packages/parser/src/envelopes/Envelope.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,13 @@ export abstract class Envelope {
1515
data: unknown,
1616
schema: T
1717
): z.infer<T>[] {
18-
if (typeof data !== 'object') {
19-
throw new Error('Data must be an object');
20-
}
21-
22-
return schema.parse(data);
18+
if (typeof data === 'string') {
19+
return schema.parse(JSON.parse(data));
20+
} else if (typeof data === 'object') {
21+
return schema.parse(data);
22+
} else
23+
throw new Error(
24+
`Invalid data type for envelope. Expected string or object, got ${typeof data}`
25+
);
2326
}
2427
}

packages/parser/src/envelopes/Envelopes.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import { LambdaFunctionUrlEnvelope } from './lambda.js';
99
import { SnsEnvelope, SnsSqsEnvelope } from './sns.js';
1010
import { VpcLatticeEnvelope } from './vpc-lattice.js';
1111
import { VpcLatticeV2Envelope } from './vpc-latticev2.js';
12+
import { DynamoDBStreamEnvelope } from './dynamodb.js';
13+
import { KinesisEnvelope } from './kinesis.js';
1214

1315
/**
1416
* A collection of envelopes to create new envelopes.
@@ -17,9 +19,11 @@ export class Envelopes {
1719
public static readonly API_GW_ENVELOPE = new ApiGatewayEnvelope();
1820
public static readonly API_GW_V2_ENVELOPE = new ApiGatwayV2Envelope();
1921
public static readonly CLOUDWATCH_ENVELOPE = new CloudWatchEnvelope();
22+
public static readonly DYNAMO_DB_STREAM_ENVELOPE =
23+
new DynamoDBStreamEnvelope();
2024
public static readonly EVENT_BRIDGE_ENVELOPE = new EventBridgeEnvelope();
2125
public static readonly KAFKA_ENVELOPE = new KafkaEnvelope();
22-
public static readonly KINESIS_ENVELOPE = new KafkaEnvelope();
26+
public static readonly KINESIS_ENVELOPE = new KinesisEnvelope();
2327
public static readonly KINESIS_FIREHOSE_ENVELOPE =
2428
new KinesisFirehoseEnvelope();
2529
public static readonly LAMBDA_FUCTION_URL_ENVELOPE =

packages/parser/src/envelopes/apigw.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { Envelope } from './Envelope.js';
22
import { z, ZodSchema } from 'zod';
33
import { APIGatewayProxyEventSchema } from '../schemas/apigw.js';
4+
import { type ApiGatewayProxyEvent } from '../types/schema.js';
45

56
/**
67
* API Gateway envelope to extract data within body key"
@@ -11,7 +12,11 @@ export class ApiGatewayEnvelope extends Envelope {
1112
}
1213

1314
public parse<T extends ZodSchema>(data: unknown, schema: T): z.infer<T> {
14-
const parsedEnvelope = APIGatewayProxyEventSchema.parse(data);
15+
const parsedEnvelope: ApiGatewayProxyEvent =
16+
APIGatewayProxyEventSchema.parse(data);
17+
if (parsedEnvelope.body === undefined) {
18+
throw new Error('Body field of API Gateway event is undefined');
19+
}
1520

1621
return this._parse(parsedEnvelope.body, schema);
1722
}

packages/parser/src/envelopes/apigwv2.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ export class ApiGatwayV2Envelope extends Envelope {
1212

1313
public parse<T extends ZodSchema>(data: unknown, schema: T): z.infer<T> {
1414
const parsedEnvelope = APIGatewayProxyEventV2Schema.parse(data);
15+
if (parsedEnvelope.body === undefined) {
16+
throw new Error('Body field of API Gateway V2 event is undefined');
17+
}
1518

1619
return this._parse(parsedEnvelope.body, schema);
1720
}

packages/parser/src/envelopes/cloudwatch.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ export class CloudWatchEnvelope extends Envelope {
2020
const parsedEnvelope = CloudWatchLogsSchema.parse(data);
2121

2222
return parsedEnvelope.awslogs.data.logEvents.map((record) => {
23-
return this._parse(record, schema);
23+
return this._parse(record.message, schema);
2424
});
2525
}
2626
}

packages/parser/src/envelopes/kafka.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,10 @@ export class KafkaEnvelope extends Envelope {
3333
? KafkaMskEventSchema.parse(data)
3434
: KafkaSelfManagedEventSchema.parse(data);
3535

36-
return parsedEnvelope.records.data.map((record: KafkaRecord) => {
37-
return this._parse(record.value, schema);
36+
return Object.values(parsedEnvelope.records).map((topicRecord) => {
37+
return topicRecord.map((record: KafkaRecord) => {
38+
return this._parse(record.value, schema);
39+
});
3840
});
3941
}
4042
}

packages/parser/src/envelopes/lambda.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ export class LambdaFunctionUrlEnvelope extends Envelope {
1313
public parse<T extends ZodSchema>(data: unknown, schema: T): z.infer<T> {
1414
const parsedEnvelope = LambdaFunctionUrlSchema.parse(data);
1515

16-
return this.parse(parsedEnvelope.body, schema);
16+
if (parsedEnvelope.body === undefined) {
17+
throw new Error('Body field of Lambda function URL event is undefined');
18+
}
19+
20+
return this._parse(parsedEnvelope.body, schema);
1721
}
1822
}

packages/parser/src/envelopes/sns.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { z, ZodSchema } from 'zod';
22
import { Envelope } from './Envelope.js';
3-
import { SnsNotificationSchema, SnsSchema } from '../schemas/sns.js';
3+
import { SnsSchema, SnsSqsNotificationSchema } from '../schemas/sns.js';
44
import { SqsSchema } from '../schemas/sqs.js';
55

66
/**
@@ -46,9 +46,11 @@ export class SnsSqsEnvelope extends Envelope {
4646
const parsedEnvelope = SqsSchema.parse(data);
4747

4848
return parsedEnvelope.Records.map((record) => {
49-
const snsNotification = SnsNotificationSchema.parse(record.body);
49+
const snsNotification = SnsSqsNotificationSchema.parse(
50+
JSON.parse(record.body)
51+
);
5052

51-
return this._parse(snsNotification, schema);
53+
return this._parse(snsNotification.Message, schema);
5254
});
5355
}
5456
}

packages/parser/src/envelopes/sqs.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@ export class SqsEnvelope extends Envelope {
2020
const parsedEnvelope = SqsSchema.parse(data);
2121

2222
return parsedEnvelope.Records.map((record) => {
23-
const body = JSON.parse(record.body);
24-
25-
return this._parse(body, schema);
23+
return this._parse(record.body, schema);
2624
});
2725
}
2826
}

packages/parser/src/middleware/parser.ts

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ import { MiddlewareObj } from '@middy/core';
33
import { ZodSchema } from 'zod';
44
import { Envelope } from '../envelopes/Envelope.js';
55

6-
interface ParserOptions {
7-
schema: ZodSchema;
8-
envelope?: Envelope;
6+
interface ParserOptions<S extends ZodSchema, E extends Envelope> {
7+
schema: S;
8+
envelope?: E;
99
}
1010

1111
/**
@@ -36,7 +36,9 @@ interface ParserOptions {
3636
*
3737
* @param options
3838
*/
39-
const parser = (options: ParserOptions): MiddlewareObj => {
39+
const parser = <S extends ZodSchema, E extends Envelope>(
40+
options: ParserOptions<S, E>
41+
): MiddlewareObj => {
4042
const before = (request: MiddyLikeRequest): void => {
4143
const { schema, envelope } = options;
4244
if (envelope) {
@@ -46,14 +48,8 @@ const parser = (options: ParserOptions): MiddlewareObj => {
4648
}
4749
};
4850

49-
const after = (_request: MiddyLikeRequest): void => {};
50-
51-
const onError = (_request: MiddyLikeRequest): void => {};
52-
5351
return {
5452
before,
55-
after,
56-
onError,
5753
};
5854
};
5955

packages/parser/src/schemas/cloudwatch.ts

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,9 @@ const CloudWatchLogsSchema = z.object({
3030
}),
3131
});
3232

33-
const extractCloudWatchLogFromEvent = (
34-
data: string
35-
): z.infer<typeof CloudWatchLogsDecodeSchema> => {
36-
return decompressRecordToJSON(data);
37-
};
38-
3933
export {
4034
CloudWatchLogsSchema,
4135
CloudWatchLogsDecodeSchema,
4236
decompressRecordToJSON,
43-
extractCloudWatchLogFromEvent,
37+
CloudWatchLogEventSchema,
4438
};

packages/parser/src/schemas/kinesis.ts

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,31 @@
11
import { z } from 'zod';
2+
import { gunzipSync } from 'node:zlib';
23

34
const KinesisDataStreamRecordPayload = z.object({
45
kinesisSchemaVersion: z.string(),
56
partitionKey: z.string(),
67
sequenceNumber: z.string(),
78
approximateArrivalTimestamp: z.number(),
8-
data: z.string(),
9+
data: z.string().transform((data) => {
10+
const decompresed = decompress(data);
11+
const decoded = Buffer.from(data, 'base64').toString('utf-8');
12+
try {
13+
// If data was not compressed, try to parse it as JSON otherwise it must be string
14+
return decompresed === data ? JSON.parse(decoded) : decompresed;
15+
} catch (e) {
16+
return decoded;
17+
}
18+
}),
919
});
1020

21+
const decompress = (data: string): string => {
22+
try {
23+
return JSON.parse(gunzipSync(Buffer.from(data, 'base64')).toString('utf8'));
24+
} catch (e) {
25+
return data;
26+
}
27+
};
28+
1129
const KinesisDataStreamRecord = z.object({
1230
eventSource: z.literal('aws:kinesis'),
1331
eventVersion: z.string(),

packages/parser/src/schemas/sns.ts

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,26 @@ const SnsNotificationSchema = z.object({
99
Subject: z.string().optional(),
1010
TopicArn: z.string(),
1111
UnsubscribeUrl: z.string().url(),
12+
UnsubscribeURL: z.string().url().optional(),
13+
SigningCertUrl: z.string().url().optional(),
14+
SigningCertURL: z.string().url().optional(),
1215
Type: z.literal('Notification'),
1316
MessageAttributes: z.record(z.string(), SnsMsgAttribute).optional(),
1417
Message: z.string(),
1518
MessageId: z.string(),
1619
Signature: z.string().optional(),
1720
SignatureVersion: z.string().optional(),
18-
SigningCertUrl: z.string().url().optional(),
1921
Timestamp: z.string().datetime(),
2022
});
2123

24+
const SnsSqsNotificationSchema = SnsNotificationSchema.extend({
25+
UnsubscribeURL: z.string().optional(),
26+
SigningCertURL: z.string().url().optional(),
27+
}).omit({
28+
UnsubscribeUrl: true,
29+
SigningCertUrl: true,
30+
});
31+
2232
const SnsRecordSchema = z.object({
2333
EventSource: z.literal('aws:sns'),
2434
EventVersion: z.string(),
@@ -30,4 +40,10 @@ const SnsSchema = z.object({
3040
Records: z.array(SnsRecordSchema),
3141
});
3242

33-
export { SnsSchema, SnsRecordSchema, SnsNotificationSchema, SnsMsgAttribute };
43+
export {
44+
SnsSchema,
45+
SnsRecordSchema,
46+
SnsNotificationSchema,
47+
SnsMsgAttribute,
48+
SnsSqsNotificationSchema,
49+
};

packages/parser/src/types/schema.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import { KafkaRecordSchema } from '../schemas/kafka.js';
2+
import { z } from 'zod';
3+
import {
4+
KinesisDataStreamRecord,
5+
KinesisDataStreamRecordPayload,
6+
} from '../schemas/kinesis.js';
7+
import { APIGatewayProxyEventSchema } from '../schemas/apigw.js';
8+
9+
export type KafkaRecord = z.infer<typeof KafkaRecordSchema>;
10+
11+
export type KinesisDataStreamRecord = z.infer<typeof KinesisDataStreamRecord>;
12+
13+
export type KinesisDataStreamRecordPayload = z.infer<
14+
typeof KinesisDataStreamRecordPayload
15+
>;
16+
17+
export type ApiGatewayProxyEvent = z.infer<typeof APIGatewayProxyEventSchema>;
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/**
2+
* Test built in schema envelopes for api gateway
3+
*
4+
* @group unit/parser/envelopes
5+
*/
6+
7+
import { generateMock } from '@anatine/zod-mock';
8+
import { Envelopes } from '../../../src/envelopes/Envelopes.js';
9+
import { TestEvents, TestSchema } from '../schema/utils.js';
10+
import { ApiGatewayProxyEvent } from '../../../src/types/schema.js';
11+
12+
describe('ApigwEnvelope ', () => {
13+
const envelope = Envelopes.API_GW_ENVELOPE;
14+
it('should parse custom schema in envelope', () => {
15+
const testCustomSchemaObject = generateMock(TestSchema);
16+
const testEvent = TestEvents.apiGatewayProxyEvent as ApiGatewayProxyEvent;
17+
18+
testEvent.body = JSON.stringify(testCustomSchemaObject);
19+
const resp = envelope.parse(testEvent, TestSchema);
20+
expect(resp).toEqual(testCustomSchemaObject);
21+
});
22+
23+
it('should throw no body provided', () => {
24+
const testEvent = TestEvents.apiGatewayProxyEvent as ApiGatewayProxyEvent;
25+
testEvent.body = undefined;
26+
27+
expect(() => envelope.parse(testEvent, TestSchema)).toThrow();
28+
});
29+
});
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/**
2+
* Test built in schema envelopes for api gateway v2
3+
*
4+
* @group unit/parser/envelopes
5+
*/
6+
7+
import { Envelopes } from '../../../src/envelopes/Envelopes.js';
8+
import { TestEvents, TestSchema } from '../schema/utils.js';
9+
import { generateMock } from '@anatine/zod-mock';
10+
import { APIGatewayProxyEventV2 } from 'aws-lambda';
11+
12+
describe('ApiGwV2Envelope ', () => {
13+
const envelope = Envelopes.API_GW_V2_ENVELOPE;
14+
15+
it('should parse custom schema in envelope', () => {
16+
const testEvent =
17+
TestEvents.apiGatewayProxyV2Event as APIGatewayProxyEventV2;
18+
const data = generateMock(TestSchema);
19+
20+
testEvent.body = JSON.stringify(data);
21+
22+
expect(envelope.parse(testEvent, TestSchema)).toEqual(data);
23+
});
24+
25+
it('should throw when no body provided', () => {
26+
const testEvent =
27+
TestEvents.apiGatewayProxyV2Event as APIGatewayProxyEventV2;
28+
testEvent.body = undefined;
29+
30+
expect(() => envelope.parse(testEvent, TestSchema)).toThrow();
31+
});
32+
});

0 commit comments

Comments
 (0)