@@ -23,10 +23,10 @@ public async Task PublisherMetricsShouldBeIncrementedWhenMessageIsSendWithSucces
23
23
Assert . NotNull ( _management ) ;
24
24
Assert . NotNull ( _meterFactory ) ;
25
25
Assert . NotNull ( _metricsReporter ) ;
26
- var messageSendCollector =
27
- new MetricCollector < int > ( _meterFactory , "RabbitMQ.Amqp" , "messaging.client.sent.messages" ) ;
28
- var messageSendDurationCollector =
29
- new MetricCollector < double > ( _meterFactory , "RabbitMQ.Amqp" , "messaging.client.operation.duration" ) ;
26
+ MetricCollector < int > clientSendMessageCounterCollector =
27
+ new ( _meterFactory , "RabbitMQ.Amqp" , "messaging.client.sent.messages" ) ;
28
+ MetricCollector < double > clientSendDurationCollector =
29
+ new ( _meterFactory , "RabbitMQ.Amqp" , "messaging.client.operation.duration" ) ;
30
30
31
31
IQueueSpecification queueSpecification = _management . Queue ( _queueName ) ;
32
32
await queueSpecification . DeclareAsync ( ) ;
@@ -37,34 +37,122 @@ public async Task PublisherMetricsShouldBeIncrementedWhenMessageIsSendWithSucces
37
37
38
38
await SystemUtils . WaitUntilQueueMessageCount ( queueSpecification , 1 ) ;
39
39
40
- var consumedMessagesMeasurements = messageSendCollector . GetMeasurementSnapshot ( ) ;
41
- Assert . NotEmpty ( consumedMessagesMeasurements ) ;
42
- Assert . Equal ( 1 , consumedMessagesMeasurements [ 0 ] . Value ) ;
43
- Assert . Equal ( consumedMessagesMeasurements [ 0 ] . Tags [ "messaging.system" ] , "rabbitmq" ) ;
44
- Assert . Equal ( consumedMessagesMeasurements [ 0 ] . Tags [ "messaging.operation.name" ] , "publish" ) ;
45
- Assert . Equal ( consumedMessagesMeasurements [ 0 ] . Tags [ "messaging.operation.type" ] , "send" ) ;
46
- Assert . Equal ( consumedMessagesMeasurements [ 0 ] . Tags [ "messaging.destination.name" ] ,
40
+ var clientSendMessagesMeasurements =
41
+ clientSendMessageCounterCollector
42
+ . GetMeasurementSnapshot ( ) ;
43
+ Assert . NotEmpty ( clientSendMessagesMeasurements ) ;
44
+ Assert . Equal ( 1 , clientSendMessagesMeasurements [ 0 ] . Value ) ;
45
+ Assert . Equal ( clientSendMessagesMeasurements [ 0 ] . Tags [ "messaging.system" ] , "rabbitmq" ) ;
46
+ Assert . Equal ( clientSendMessagesMeasurements [ 0 ] . Tags [ "messaging.operation.name" ] , "publish" ) ;
47
+ Assert . Equal ( clientSendMessagesMeasurements [ 0 ] . Tags [ "messaging.operation.type" ] , "send" ) ;
48
+ Assert . Equal ( clientSendMessagesMeasurements [ 0 ] . Tags [ "messaging.destination.name" ] ,
47
49
$ "/queues/{ Utils . EncodePathSegment ( queueSpecification . QueueName ) } ") ;
48
- Assert . Equal ( consumedMessagesMeasurements [ 0 ] . Tags [ "server.port" ] ,
50
+ Assert . Equal ( clientSendMessagesMeasurements [ 0 ] . Tags [ "server.port" ] ,
49
51
_connectionSettings ! . Port ) ;
50
- Assert . Equal ( consumedMessagesMeasurements [ 0 ] . Tags [ "server.address" ] ,
52
+ Assert . Equal ( clientSendMessagesMeasurements [ 0 ] . Tags [ "server.address" ] ,
51
53
_connectionSettings ! . Host ) ;
52
54
53
- var sendDurationsMeasurements = messageSendDurationCollector . GetMeasurementSnapshot ( ) ;
54
- Assert . NotEmpty ( sendDurationsMeasurements ) ;
55
- Assert . True ( sendDurationsMeasurements [ 0 ] . Value > 0 ) ;
56
- Assert . Equal ( sendDurationsMeasurements [ 0 ] . Tags [ "messaging.system" ] , "rabbitmq" ) ;
57
- Assert . Equal ( sendDurationsMeasurements [ 0 ] . Tags [ "messaging.operation.name" ] , "publish" ) ;
58
- Assert . Equal ( sendDurationsMeasurements [ 0 ] . Tags [ "messaging.operation.type" ] , "send" ) ;
59
- Assert . Equal ( sendDurationsMeasurements [ 0 ] . Tags [ "messaging.destination.name" ] ,
55
+ var clientSendDurationsMeasurements =
56
+ clientSendDurationCollector
57
+ . GetMeasurementSnapshot ( ) ;
58
+ Assert . NotEmpty ( clientSendDurationsMeasurements ) ;
59
+ Assert . True ( clientSendDurationsMeasurements [ 0 ] . Value > 0 ) ;
60
+ Assert . Equal ( clientSendDurationsMeasurements [ 0 ] . Tags [ "messaging.system" ] , "rabbitmq" ) ;
61
+ Assert . Equal ( clientSendDurationsMeasurements [ 0 ] . Tags [ "messaging.operation.name" ] , "publish" ) ;
62
+ Assert . Equal ( clientSendDurationsMeasurements [ 0 ] . Tags [ "messaging.operation.type" ] , "send" ) ;
63
+ Assert . Equal ( clientSendDurationsMeasurements [ 0 ] . Tags [ "messaging.destination.name" ] ,
60
64
$ "/queues/{ Utils . EncodePathSegment ( queueSpecification . QueueName ) } ") ;
61
- Assert . Equal ( sendDurationsMeasurements [ 0 ] . Tags [ "server.port" ] ,
65
+ Assert . Equal ( clientSendDurationsMeasurements [ 0 ] . Tags [ "server.port" ] ,
62
66
_connectionSettings ! . Port ) ;
63
- Assert . Equal ( sendDurationsMeasurements [ 0 ] . Tags [ "server.address" ] ,
67
+ Assert . Equal ( clientSendDurationsMeasurements [ 0 ] . Tags [ "server.address" ] ,
64
68
_connectionSettings ! . Host ) ;
65
69
66
70
await publisher . CloseAsync ( ) ;
67
71
publisher . Dispose ( ) ;
68
72
}
73
+
74
+ [ Fact ]
75
+ public async Task PublisherShouldRecordAFailureWhenSendingThrowAnException ( )
76
+ {
77
+ Assert . NotNull ( _connection ) ;
78
+ Assert . NotNull ( _management ) ;
79
+ Assert . NotNull ( _meterFactory ) ;
80
+ Assert . NotNull ( _metricsReporter ) ;
81
+ MetricCollector < int > clientSendMessageCounterCollector =
82
+ new ( _meterFactory , "RabbitMQ.Amqp" , "messaging.client.sent.messages" ) ;
83
+ MetricCollector < double > clientSendDurationCollector =
84
+ new ( _meterFactory , "RabbitMQ.Amqp" , "messaging.client.operation.duration" ) ;
85
+
86
+ IMessage message = new AmqpMessage ( Encoding . ASCII . GetBytes ( "hello" ) ) ;
87
+
88
+ IExchangeSpecification exchangeSpecification = _management . Exchange ( _exchangeName ) . Type ( ExchangeType . FANOUT ) ;
89
+ await exchangeSpecification . DeclareAsync ( ) ;
90
+
91
+ IPublisherBuilder publisherBuilder = _connection . PublisherBuilder ( ) ;
92
+ // TODO implement Listeners
93
+ IPublisher publisher = await publisherBuilder . Exchange ( exchangeSpecification ) . BuildAsync ( ) ;
94
+
95
+ try
96
+ {
97
+ IQueueSpecification queueSpecification = _management . Queue ( ) . Exclusive ( true ) ;
98
+ IQueueInfo queueInfo = await queueSpecification . DeclareAsync ( ) ;
99
+ IBindingSpecification bindingSpecification = _management . Binding ( )
100
+ . SourceExchange ( _exchangeName )
101
+ . DestinationQueue ( queueInfo . Name ( ) ) ;
102
+ await bindingSpecification . BindAsync ( ) ;
103
+
104
+ PublishResult publishResult = await publisher . PublishAsync ( message ) ;
105
+ Assert . Equal ( OutcomeState . Accepted , publishResult . Outcome . State ) ;
106
+ }
107
+ finally
108
+ {
109
+ await exchangeSpecification . DeleteAsync ( ) ;
110
+ }
111
+
112
+ for ( int i = 0 ; i < 100 ; i ++ )
113
+ {
114
+ PublishResult nextPublishResult = await publisher . PublishAsync ( message ) ;
115
+ if ( OutcomeState . Rejected == nextPublishResult . Outcome . State )
116
+ {
117
+ break ;
118
+ }
119
+
120
+ await Task . Delay ( TimeSpan . FromMilliseconds ( 100 ) ) ;
121
+ }
122
+
123
+ var failedSendMeasure =
124
+ clientSendMessageCounterCollector
125
+ . LastMeasurement ! ;
126
+ Assert . Equal ( 1 , failedSendMeasure . Value ) ;
127
+ Assert . Equal ( failedSendMeasure . Tags [ "messaging.system" ] , "rabbitmq" ) ;
128
+ Assert . Equal ( failedSendMeasure . Tags [ "messaging.operation.name" ] , "publish" ) ;
129
+ Assert . Equal ( failedSendMeasure . Tags [ "messaging.operation.type" ] , "send" ) ;
130
+ Assert . Equal ( failedSendMeasure . Tags [ "messaging.destination.name" ] ,
131
+ $ "/exchanges/{ Utils . EncodePathSegment ( exchangeSpecification . ExchangeName ) } ") ;
132
+ Assert . Equal ( failedSendMeasure . Tags [ "server.port" ] ,
133
+ _connectionSettings ! . Port ) ;
134
+ Assert . Equal ( failedSendMeasure . Tags [ "server.address" ] ,
135
+ _connectionSettings ! . Host ) ;
136
+ Assert . Equal ( failedSendMeasure . Tags [ "error.type" ] ,
137
+ "amqp:not-found" ) ;
138
+
139
+ var failedMessageSendDuration =
140
+ clientSendDurationCollector
141
+ . LastMeasurement ! ;
142
+ Assert . True ( failedMessageSendDuration . Value > 0 ) ;
143
+ Assert . Equal ( failedMessageSendDuration . Tags [ "messaging.system" ] , "rabbitmq" ) ;
144
+ Assert . Equal ( failedMessageSendDuration . Tags [ "messaging.operation.name" ] , "publish" ) ;
145
+ Assert . Equal ( failedMessageSendDuration . Tags [ "messaging.operation.type" ] , "send" ) ;
146
+ Assert . Equal ( failedMessageSendDuration . Tags [ "messaging.destination.name" ] ,
147
+ $ "/exchanges/{ Utils . EncodePathSegment ( exchangeSpecification . ExchangeName ) } ") ;
148
+ Assert . Equal ( failedMessageSendDuration . Tags [ "server.port" ] ,
149
+ _connectionSettings ! . Port ) ;
150
+ Assert . Equal ( failedMessageSendDuration . Tags [ "server.address" ] ,
151
+ _connectionSettings ! . Host ) ;
152
+ Assert . Equal ( failedMessageSendDuration . Tags [ "error.type" ] ,
153
+ "amqp:not-found" ) ;
154
+ await publisher . CloseAsync ( ) ;
155
+ publisher . Dispose ( ) ;
156
+ }
69
157
}
70
158
#endif
0 commit comments