Skip to content

Commit ab2e681

Browse files
committed
Add code samples for website documentation
References rabbitmq/rabbitmq-server#2059
1 parent dabf005 commit ab2e681

File tree

1 file changed

+231
-0
lines changed

1 file changed

+231
-0
lines changed
Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,231 @@
1+
package com.rabbitmq.client.amqp.docs;
2+
3+
import com.rabbitmq.client.amqp.*;
4+
import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder;
5+
6+
import java.nio.charset.StandardCharsets;
7+
import java.time.Duration;
8+
9+
public class WebsiteDocumentation {
10+
11+
void environment() {
12+
Environment environment = new AmqpEnvironmentBuilder()
13+
.build();
14+
// ...
15+
// close the environment when the application stops
16+
environment.close();
17+
}
18+
19+
void connection() {
20+
Environment environment = null;
21+
22+
// open a connection from the environment
23+
Connection connection = environment.connectionBuilder()
24+
.uri("amqp://admin:admin@localhost:5672/%2f")
25+
.build();
26+
// ...
27+
// close the connection when it is no longer necessary
28+
connection.close();
29+
}
30+
31+
void publishing() {
32+
Connection connection = null;
33+
34+
Publisher publisher = connection.publisherBuilder()
35+
.exchange("foo").key("bar")
36+
.build();
37+
// ...
38+
// close the publisher when it is no longer necessary
39+
publisher.close();
40+
41+
// create the message
42+
Message message = publisher
43+
.message("hello".getBytes(StandardCharsets.UTF_8))
44+
.messageId(1L);
45+
46+
// publish the message and deal with broker feedback
47+
publisher.publish(message, context -> {
48+
// the broker confirmation
49+
if (context.status() == Publisher.Status.ACCEPTED) {
50+
// the broker accepted (confirmed) the message
51+
} else {
52+
// deal with possible failure
53+
}
54+
});
55+
}
56+
57+
void publisherAddressFormat() {
58+
Connection connection = null;
59+
60+
// publish to an exchange with a routing key
61+
Publisher publisher1 = connection.publisherBuilder()
62+
.exchange("foo").key("bar") // /exchanges/foo/bar
63+
.build();
64+
65+
// publish to an exchange without a routing key
66+
Publisher publisher2 = connection.publisherBuilder()
67+
.exchange("foo") // /exchanges/foo
68+
.build();
69+
70+
// publish to a queue
71+
Publisher publisher3 = connection.publisherBuilder()
72+
.queue("some-queue") // /queues/some-queue
73+
.build();
74+
}
75+
76+
void publishAddressFormatInMessages() {
77+
Connection connection = null;
78+
79+
// no target defined on publisher creation
80+
Publisher publisher = connection.publisherBuilder()
81+
.build();
82+
83+
// publish to an exchange with a routing key
84+
Message message1 = publisher.message()
85+
.toAddress().exchange("foo").key("bar")
86+
.message();
87+
88+
// publish to an exchange without a routing key
89+
Message message2 = publisher.message()
90+
.toAddress().exchange("foo")
91+
.message();
92+
93+
// publish to a queue
94+
Message message3 = publisher.message()
95+
.toAddress().queue("my-queue")
96+
.message();
97+
}
98+
99+
void consuming() {
100+
Connection connection = null;
101+
Consumer consumer = connection.consumerBuilder()
102+
.queue("some-queue")
103+
.messageHandler((context, message) -> {
104+
byte[] body = message.body();
105+
// ...
106+
context.accept(); // settle the message
107+
})
108+
.build(); // do not forget to build the instance!
109+
110+
// pause the delivery of messages
111+
consumer.pause();
112+
// ensure the number of unsettled messages reaches 0
113+
long unsettledMessageCount = consumer.unsettledMessageCount();
114+
// close the consumer
115+
consumer.close();
116+
117+
}
118+
119+
void consumingSupportForStreams() {
120+
Connection connection = null;
121+
122+
Consumer consumer = connection.consumerBuilder()
123+
.queue("some-stream")
124+
.stream()
125+
.offset(ConsumerBuilder.StreamOffsetSpecification.FIRST)
126+
.builder()
127+
.messageHandler((context, message) -> {
128+
// message processing
129+
})
130+
.build();
131+
}
132+
133+
void consumingStreamFiltering() {
134+
Connection connection = null;
135+
136+
Consumer consumer = connection.consumerBuilder()
137+
.queue("some-stream")
138+
.stream()
139+
.filterValues("invoices", "orders")
140+
.filterMatchUnfiltered(true)
141+
.builder()
142+
.messageHandler((context, message) -> {
143+
// message processing
144+
})
145+
.build();
146+
147+
}
148+
149+
void management() {
150+
Connection connection = null;
151+
152+
Management management = connection.management();
153+
// ...
154+
// close the management instance when it is no longer needed
155+
management.close();
156+
}
157+
158+
void managementExchange() {
159+
Management management = null;
160+
161+
management.exchange()
162+
.name("my-exchange")
163+
.type(Management.ExchangeType.FANOUT) // enum for built-in type
164+
.declare();
165+
166+
management.exchange()
167+
.name("my-exchange")
168+
.type("x-delayed-message") // non-built-in type
169+
.autoDelete(false)
170+
.argument("x-delayed-type", "direct")
171+
.declare();
172+
173+
management.exchangeDeletion().delete("my-exchange");
174+
}
175+
176+
void managementQueues() {
177+
Management management = null;
178+
179+
management.queue()
180+
.name("my-queue")
181+
.exclusive(true)
182+
.autoDelete(false)
183+
.declare();
184+
185+
management
186+
.queue()
187+
.name("my-queue")
188+
.type(Management.QueueType.CLASSIC)
189+
.messageTtl(Duration.ofMinutes(10))
190+
.maxLengthBytes(ByteCapacity.MB(100))
191+
.declare();
192+
193+
management
194+
.queue()
195+
.name("my-quorum-queue")
196+
.quorum() // set queue type to 'quorum'
197+
.quorumInitialGroupSize(3) // specific to quorum queues
198+
.deliveryLimit(3) // specific to quorum queues
199+
.queue()
200+
.declare();
201+
202+
Management.QueueInfo info = management.queueInfo("my-queue");
203+
long messageCount = info.messageCount();
204+
int consumerCount = info.consumerCount();
205+
String leaderNode = info.leader();
206+
207+
management.queueDeletion().delete("my-queue");
208+
}
209+
210+
void binding() {
211+
Management management = null;
212+
213+
management.binding()
214+
.sourceExchange("my-exchange")
215+
.destinationQueue("my-queue")
216+
.key("foo")
217+
.bind();
218+
219+
management.binding()
220+
.sourceExchange("my-exchange")
221+
.destinationExchange("my-other-exchange")
222+
.key("foo")
223+
.bind();
224+
225+
management.unbind()
226+
.sourceExchange("my-exchange")
227+
.destinationQueue("my-queue")
228+
.key("foo")
229+
.unbind();
230+
}
231+
}

0 commit comments

Comments
 (0)