25
25
import com .rabbitmq .model .Management ;
26
26
import com .rabbitmq .model .Publisher ;
27
27
import com .rabbitmq .model .observation .micrometer .MicrometerObservationCollector ;
28
+ import io .micrometer .tracing .exporter .FinishedSpan ;
28
29
import io .micrometer .tracing .test .SampleTestRunner ;
30
+ import io .micrometer .tracing .test .reporter .BuildingBlocks ;
29
31
import io .micrometer .tracing .test .simple .SpanAssert ;
30
32
import io .micrometer .tracing .test .simple .SpansAssert ;
31
33
import java .nio .charset .StandardCharsets ;
34
+ import java .util .List ;
32
35
import java .util .UUID ;
33
36
import java .util .concurrent .CountDownLatch ;
34
37
import java .util .concurrent .atomic .AtomicReference ;
@@ -101,7 +104,7 @@ public SampleTestRunner.SampleTestRunnerConsumer yourCode() {
101
104
102
105
waitAtMost (() -> buildingBlocks .getFinishedSpans ().size () == 2 );
103
106
SpansAssert .assertThat (buildingBlocks .getFinishedSpans ()).haveSameTraceId ().hasSize (2 );
104
- SpanAssert .assertThat (buildingBlocks . getFinishedSpans (). get ( 0 ))
107
+ SpanAssert .assertThat (lastPublish ( buildingBlocks ))
105
108
.hasNameEqualTo (e + " publish" )
106
109
.hasTag ("messaging.rabbitmq.destination.routing_key" , "foo" )
107
110
.hasTag ("messaging.destination.name" , e )
@@ -113,7 +116,7 @@ public SampleTestRunner.SampleTestRunnerConsumer yourCode() {
113
116
.hasTag ("net.protocol.name" , "amqp" )
114
117
.hasTag ("net.protocol.version" , "1.0" );
115
118
116
- SpanAssert .assertThat (buildingBlocks . getFinishedSpans (). get ( 1 ))
119
+ SpanAssert .assertThat (lastProcess ( buildingBlocks ))
117
120
.hasNameEqualTo (q + " process" )
118
121
.hasTag ("messaging.rabbitmq.destination.routing_key" , "foo" )
119
122
.hasTag ("messaging.destination.name" , e )
@@ -142,16 +145,19 @@ public SampleTestRunner.SampleTestRunnerConsumer yourCode() {
142
145
publisher = publisherConnection .publisherBuilder ().exchange (e ).build ();
143
146
144
147
consumeLatch .set (new CountDownLatch (1 ));
145
- publisher .publish (publisher .message (PAYLOAD ), ctx -> {});
148
+ messageId = UUID .randomUUID ();
149
+ publisher .publish (publisher .message (PAYLOAD ).messageId (messageId ), ctx -> {});
146
150
assertThat (consumeLatch ).completes ();
147
151
waitAtMost (() -> buildingBlocks .getFinishedSpans ().size () == 4 );
148
152
SpansAssert .assertThat (buildingBlocks .getFinishedSpans ()).haveSameTraceId ().hasSize (4 );
149
- SpanAssert .assertThat (buildingBlocks . getFinishedSpans (). get ( 2 ))
153
+ SpanAssert .assertThat (lastPublish ( buildingBlocks ))
150
154
.hasNameEqualTo (e + " publish" )
155
+ .hasTag ("messaging.message.id" , messageId .toString ())
151
156
.hasTag ("messaging.rabbitmq.destination.routing_key" , "" )
152
157
.hasTag ("messaging.destination.name" , e );
153
- SpanAssert .assertThat (buildingBlocks . getFinishedSpans (). get ( 3 ))
158
+ SpanAssert .assertThat (lastProcess ( buildingBlocks ))
154
159
.hasNameEqualTo (q + " process" )
160
+ .hasTag ("messaging.message.id" , messageId .toString ())
155
161
.hasTag ("messaging.rabbitmq.destination.routing_key" , "" )
156
162
.hasTag ("messaging.destination.name" , e )
157
163
.hasTag ("messaging.source.name" , q );
@@ -160,16 +166,19 @@ public SampleTestRunner.SampleTestRunnerConsumer yourCode() {
160
166
publisher = publisherConnection .publisherBuilder ().queue (q ).build ();
161
167
162
168
consumeLatch .set (new CountDownLatch (1 ));
163
- publisher .publish (publisher .message (PAYLOAD ), ctx -> {});
169
+ messageId = UUID .randomUUID ();
170
+ publisher .publish (publisher .message (PAYLOAD ).messageId (messageId ), ctx -> {});
164
171
assertThat (consumeLatch ).completes ();
165
172
waitAtMost (() -> buildingBlocks .getFinishedSpans ().size () == 6 );
166
173
SpansAssert .assertThat (buildingBlocks .getFinishedSpans ()).haveSameTraceId ().hasSize (6 );
167
- SpanAssert .assertThat (buildingBlocks . getFinishedSpans (). get ( 4 ))
174
+ SpanAssert .assertThat (lastPublish ( buildingBlocks ))
168
175
.hasNameEqualTo ("amq.default publish" )
176
+ .hasTag ("messaging.message.id" , messageId .toString ())
169
177
.hasTag ("messaging.rabbitmq.destination.routing_key" , q )
170
178
.hasTag ("messaging.destination.name" , "" );
171
- SpanAssert .assertThat (buildingBlocks . getFinishedSpans (). get ( 5 ))
179
+ SpanAssert .assertThat (lastProcess ( buildingBlocks ))
172
180
.hasNameEqualTo (q + " process" )
181
+ .hasTag ("messaging.message.id" , messageId .toString ())
173
182
.hasTag ("messaging.rabbitmq.destination.routing_key" , q )
174
183
.hasTag ("messaging.destination.name" , "" )
175
184
.hasTag ("messaging.source.name" , q );
@@ -178,22 +187,51 @@ public SampleTestRunner.SampleTestRunnerConsumer yourCode() {
178
187
publisher = publisherConnection .publisherBuilder ().build ();
179
188
180
189
consumeLatch .set (new CountDownLatch (1 ));
190
+ messageId = UUID .randomUUID ();
181
191
publisher .publish (
182
- publisher .message (PAYLOAD ).toAddress ().exchange (e ).key ("foo" ).message (), ctx -> {});
192
+ publisher
193
+ .message (PAYLOAD )
194
+ .messageId (messageId )
195
+ .toAddress ()
196
+ .exchange (e )
197
+ .key ("foo" )
198
+ .message (),
199
+ ctx -> {});
183
200
assertThat (consumeLatch ).completes ();
184
201
waitAtMost (() -> buildingBlocks .getFinishedSpans ().size () == 8 );
185
202
SpansAssert .assertThat (buildingBlocks .getFinishedSpans ()).haveSameTraceId ().hasSize (8 );
186
- SpanAssert .assertThat (buildingBlocks . getFinishedSpans (). get ( 6 ))
203
+ SpanAssert .assertThat (lastPublish ( buildingBlocks ))
187
204
.hasNameEqualTo (e + " publish" )
205
+ .hasTag ("messaging.message.id" , messageId .toString ())
188
206
.hasTag ("messaging.rabbitmq.destination.routing_key" , "foo" )
189
207
.hasTag ("messaging.destination.name" , e );
190
- SpanAssert .assertThat (buildingBlocks . getFinishedSpans (). get ( 7 ))
208
+ SpanAssert .assertThat (lastProcess ( buildingBlocks ))
191
209
.hasNameEqualTo (q + " process" )
210
+ .hasTag ("messaging.message.id" , messageId .toString ())
192
211
.hasTag ("messaging.rabbitmq.destination.routing_key" , "foo" )
193
212
.hasTag ("messaging.destination.name" , e )
194
213
.hasTag ("messaging.source.name" , q );
195
214
}
196
215
};
197
216
}
198
217
}
218
+
219
+ private static FinishedSpan lastPublish (BuildingBlocks blocks ) {
220
+ return lastWithNameEnding (blocks , "publish" );
221
+ }
222
+
223
+ private static FinishedSpan lastProcess (BuildingBlocks blocks ) {
224
+ return lastWithNameEnding (blocks , "process" );
225
+ }
226
+
227
+ private static FinishedSpan lastWithNameEnding (BuildingBlocks blocks , String nameEnding ) {
228
+ List <FinishedSpan > spans = blocks .getFinishedSpans ();
229
+ for (int i = spans .size () - 1 ; i >= 0 ; i --) {
230
+ FinishedSpan span = spans .get (i );
231
+ if (span .getName ().endsWith (nameEnding )) {
232
+ return span ;
233
+ }
234
+ }
235
+ throw new IllegalStateException ();
236
+ }
199
237
}
0 commit comments