7
7
import { v1 } from "@authzed/authzed-node" ;
8
8
import { log } from "@gitpod/gitpod-protocol/lib/util/logging" ;
9
9
import * as grpc from "@grpc/grpc-js_1_9_0" ;
10
+ import { Status } from "@grpc/grpc-js_1_9_0/build/src/constants" ;
11
+ import { inject , optional } from "inversify" ;
10
12
11
13
export const SpiceDBClientProvider = Symbol ( "SpiceDBClientProvider" ) ;
14
+ export const SpiceDBClientCallMetrics = Symbol ( "SpiceDBClientCallMetrics" ) ;
12
15
13
16
export interface SpiceDBClientConfig {
14
17
address : string ;
@@ -57,7 +60,17 @@ function spicedbClientFromConfig(config: SpiceDBClientConfig): Client {
57
60
export class CachingSpiceDBClientProvider implements SpiceDBClientProvider {
58
61
private client : Client | undefined ;
59
62
60
- constructor ( private readonly clientConfig : SpiceDBClientConfig ) { }
63
+ @inject ( SpiceDBClientCallMetrics )
64
+ @optional ( )
65
+ private readonly clientCallMetrics : IClientCallMetrics ;
66
+
67
+ private readonly interceptors : grpc . Interceptor [ ] = [ ] ;
68
+
69
+ constructor ( private readonly _clientConfig : SpiceDBClientConfig ) {
70
+ if ( this . clientCallMetrics ) {
71
+ this . interceptors . push ( createClientCallMetricsInterceptor ( this . clientCallMetrics ) ) ;
72
+ }
73
+ }
61
74
62
75
getClient ( ) : SpiceDBClient {
63
76
let client = this . client ;
@@ -76,4 +89,107 @@ export class CachingSpiceDBClientProvider implements SpiceDBClientProvider {
76
89
77
90
return client . promises ;
78
91
}
92
+
93
+ protected get clientConfig ( ) {
94
+ const config = this . _clientConfig ;
95
+ if ( this . interceptors ) {
96
+ return {
97
+ ...config ,
98
+ options : {
99
+ interceptors : [ ...this . interceptors ] ,
100
+ } ,
101
+ } ;
102
+ }
103
+ return config ;
104
+ }
105
+ }
106
+
107
+ // TODO(gpl): remove all of this and merge again with "util/grpc.ts" once we are beyond grpc-js 1.9.0
108
+ export type GrpcMethodType = "unary" | "client_stream" | "server_stream" | "bidi_stream" ;
109
+
110
+ export interface IGrpcCallMetricsLabels {
111
+ service : string ;
112
+ method : string ;
113
+ type : GrpcMethodType ;
114
+ }
115
+
116
+ export interface IGrpcCallMetricsLabelsWithCode extends IGrpcCallMetricsLabels {
117
+ code : string ;
118
+ }
119
+
120
+ export const IClientCallMetrics = Symbol ( "IClientCallMetrics" ) ;
121
+
122
+ export interface IClientCallMetrics {
123
+ started ( labels : IGrpcCallMetricsLabels ) : void ;
124
+ sent ( labels : IGrpcCallMetricsLabels ) : void ;
125
+ received ( labels : IGrpcCallMetricsLabels ) : void ;
126
+ handled ( labels : IGrpcCallMetricsLabelsWithCode ) : void ;
127
+ startHandleTimer (
128
+ labels : IGrpcCallMetricsLabels ,
129
+ ) : ( labels ?: Partial < Record < string , string | number > > | undefined ) => number ;
130
+ }
131
+
132
+ export function getGrpcMethodType ( requestStream : boolean , responseStream : boolean ) : GrpcMethodType {
133
+ if ( requestStream ) {
134
+ if ( responseStream ) {
135
+ return "bidi_stream" ;
136
+ } else {
137
+ return "client_stream" ;
138
+ }
139
+ } else {
140
+ if ( responseStream ) {
141
+ return "server_stream" ;
142
+ } else {
143
+ return "unary" ;
144
+ }
145
+ }
146
+ }
147
+
148
+ export function createClientCallMetricsInterceptor ( metrics : IClientCallMetrics ) : grpc . Interceptor {
149
+ return ( options , nextCall ) : grpc . InterceptingCall => {
150
+ const methodDef = options . method_definition ;
151
+ const method = methodDef . path . substring ( methodDef . path . lastIndexOf ( "/" ) + 1 ) ;
152
+ const service = methodDef . path . substring ( 1 , methodDef . path . length - method . length - 1 ) ;
153
+ const labels = {
154
+ service,
155
+ method,
156
+ type : getGrpcMethodType ( options . method_definition . requestStream , options . method_definition . responseStream ) ,
157
+ } ;
158
+ const requester = new grpc . RequesterBuilder ( )
159
+ . withStart ( ( metadata , listener , next ) => {
160
+ const newListener = new grpc . ListenerBuilder ( )
161
+ . withOnReceiveStatus ( ( status , next ) => {
162
+ try {
163
+ metrics . handled ( {
164
+ ...labels ,
165
+ code : Status [ status . code ] ,
166
+ } ) ;
167
+ } finally {
168
+ next ( status ) ;
169
+ }
170
+ } )
171
+ . withOnReceiveMessage ( ( message , next ) => {
172
+ try {
173
+ metrics . received ( labels ) ;
174
+ } finally {
175
+ next ( message ) ;
176
+ }
177
+ } )
178
+ . build ( ) ;
179
+ try {
180
+ metrics . started ( labels ) ;
181
+ } finally {
182
+ next ( metadata , newListener ) ;
183
+ }
184
+ } )
185
+ . withSendMessage ( ( message , next ) => {
186
+ try {
187
+ metrics . sent ( labels ) ;
188
+ } finally {
189
+ next ( message ) ;
190
+ }
191
+ } )
192
+ . build ( ) ;
193
+ return new grpc . InterceptingCall ( nextCall ( options ) , requester ) ;
194
+ } ;
79
195
}
0 commit comments