@@ -34,17 +34,19 @@ const EVENT_UPDATE_THRESHOLD_WINDOW_IN_MSECS = 5 * 1000; // 5 seconds
34
34
35
35
export class IngestSendEvent {
36
36
#prismaClient: PrismaClientOrTransaction ;
37
- #rateLimiter: RateLimiter ;
37
+ #rateLimiter: RateLimiter | undefined ;
38
38
39
39
constructor ( prismaClient : PrismaClientOrTransaction = prisma , private deliverEvents = true ) {
40
40
this . #prismaClient = prismaClient ;
41
- this . #rateLimiter = new RateLimiter ( {
42
- keyPrefix : "ingestsendevent" ,
43
- limiter : Ratelimit . slidingWindow (
44
- env . INGEST_EVENT_RATE_LIMIT_MAX ,
45
- env . INGEST_EVENT_RATE_LIMIT_WINDOW as Duration
46
- ) ,
47
- } ) ;
41
+ this . #rateLimiter = env . INGEST_EVENT_RATE_LIMIT_MAX
42
+ ? new RateLimiter ( {
43
+ keyPrefix : "ingestsendevent" ,
44
+ limiter : Ratelimit . slidingWindow (
45
+ env . INGEST_EVENT_RATE_LIMIT_MAX ,
46
+ env . INGEST_EVENT_RATE_LIMIT_WINDOW as Duration
47
+ ) ,
48
+ } )
49
+ : undefined ;
48
50
}
49
51
50
52
#calculateDeliverAt( options ?: SendEventOptions ) {
@@ -76,7 +78,7 @@ export class IngestSendEvent {
76
78
return ;
77
79
}
78
80
79
- return await $transaction ( this . #prismaClient, async ( tx ) => {
81
+ const createdEvent = await $transaction ( this . #prismaClient, async ( tx ) => {
80
82
const externalAccount = options ?. accountId
81
83
? await tx . externalAccount . upsert ( {
82
84
where : {
@@ -115,23 +117,25 @@ export class IngestSendEvent {
115
117
eventSource,
116
118
} ) ) ;
117
119
118
- //rate limit
119
- const { success, reset, limit } = await this . #rateLimiter. limit ( environment . organizationId ) ;
120
-
121
- if ( success ) {
122
- await this . enqueueWorkerEvent ( tx , eventLog ) ;
123
- } else {
124
- logger . info ( "IngestSendEvent: Rate limit exceeded" , {
125
- eventRecordId : eventLog . id ,
126
- organizationId : environment . organizationId ,
127
- reset,
128
- limit,
129
- } ) ;
130
- return ;
131
- }
132
-
133
120
return eventLog ;
134
121
} ) ;
122
+
123
+ if ( ! createdEvent ) return ;
124
+
125
+ //rate limit
126
+ const result = await this . #rateLimiter?. limit ( environment . organizationId ) ;
127
+ if ( result && ! result . success ) {
128
+ logger . info ( "IngestSendEvent: Rate limit exceeded" , {
129
+ eventRecordId : createdEvent . id ,
130
+ organizationId : environment . organizationId ,
131
+ reset : result . reset ,
132
+ limit : result . limit ,
133
+ } ) ;
134
+ return createdEvent ;
135
+ }
136
+
137
+ await this . enqueueWorkerEvent ( this . #prismaClient, createdEvent ) ;
138
+ return createdEvent ;
135
139
} catch ( error ) {
136
140
const prismaError = PrismaErrorSchema . safeParse ( error ) ;
137
141
0 commit comments