Skip to content

Commit e1bc4cd

Browse files
committed
Merge branch '4.2.x-stable'
2 parents bf76e45 + 295688c commit e1bc4cd

File tree

1 file changed

+99
-17
lines changed
  • src/test/java/com/rabbitmq/client/test/functional

1 file changed

+99
-17
lines changed

src/test/java/com/rabbitmq/client/test/functional/Metrics.java

+99-17
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import java.util.Random;
3333
import java.util.concurrent.*;
3434

35-
import static org.awaitility.Awaitility.to;
3635
import static org.awaitility.Awaitility.waitAtMost;
3736
import static org.hamcrest.Matchers.equalTo;
3837
import static org.hamcrest.Matchers.is;
@@ -106,15 +105,15 @@ private void doMetrics(ConnectionFactory connectionFactory) throws IOException,
106105
assertThat(metrics.getConsumedMessages().getCount(), is(2L+1L));
107106

108107
channel.basicConsume(QUEUE, true, new DefaultConsumer(channel));
109-
waitAtMost(timeout()).untilCall(to(metrics.getConsumedMessages()).getCount(), equalTo(2L+1L+1L));
108+
waitAtMost(timeout()).until(new ConsumedMessagesMetricsCallable(metrics), equalTo(2L+1L+1L));
110109

111110
safeClose(connection1);
112-
waitAtMost(timeout()).untilCall(to(metrics.getConnections()).getCount(), equalTo(1L));
113-
waitAtMost(timeout()).untilCall(to(metrics.getChannels()).getCount(), equalTo(2L));
111+
waitAtMost(timeout()).until(new ConnectionsMetricsCallable(metrics), equalTo(1L));
112+
waitAtMost(timeout()).until(new ChannelsMetricsCallable(metrics), equalTo(2L));
114113

115114
safeClose(connection2);
116-
waitAtMost(timeout()).untilCall(to(metrics.getConnections()).getCount(), equalTo(0L));
117-
waitAtMost(timeout()).untilCall(to(metrics.getChannels()).getCount(), equalTo(0L));
115+
waitAtMost(timeout()).until(new ConnectionsMetricsCallable(metrics), equalTo(0L));
116+
waitAtMost(timeout()).until(new ChannelsMetricsCallable(metrics), equalTo(0L));
118117

119118
assertThat(metrics.getAcknowledgedMessages().getCount(), is(0L));
120119
assertThat(metrics.getRejectedMessages().getCount(), is(0L));
@@ -192,13 +191,13 @@ private void doMetricsAck(ConnectionFactory connectionFactory) throws IOExceptio
192191
sendMessage(i%2 == 0 ? channel1 : channel2);
193192
}
194193

195-
waitAtMost(timeout()).untilCall(
196-
to(metrics.getConsumedMessages()).getCount(),
194+
waitAtMost(timeout()).until(
195+
new ConsumedMessagesMetricsCallable(metrics),
197196
equalTo(alreadySentMessages+nbMessages)
198197
);
199198

200-
waitAtMost(timeout()).untilCall(
201-
to(metrics.getAcknowledgedMessages()).getCount(),
199+
waitAtMost(timeout()).until(
200+
new AcknowledgedMessagesMetricsCallable(metrics),
202201
equalTo(alreadySentMessages+nbMessages)
203202
);
204203

@@ -296,7 +295,7 @@ private void doMultiThreadedMetrics(ConnectionFactory connectionFactory) throws
296295
executorService.invokeAll(tasks);
297296

298297
assertThat(metrics.getPublishedMessages().getCount(), is(nbOfMessages));
299-
waitAtMost(timeout()).untilCall(to(metrics.getConsumedMessages()).getCount(), equalTo(nbOfMessages));
298+
waitAtMost(timeout()).until(new ConsumedMessagesMetricsCallable(metrics), equalTo(nbOfMessages));
300299
assertThat(metrics.getAcknowledgedMessages().getCount(), is(0L));
301300

302301
// to remove the listeners
@@ -325,8 +324,8 @@ private void doMultiThreadedMetrics(ConnectionFactory connectionFactory) throws
325324
executorService.invokeAll(tasks);
326325

327326
assertThat(metrics.getPublishedMessages().getCount(), is(2*nbOfMessages));
328-
waitAtMost(timeout()).untilCall(to(metrics.getConsumedMessages()).getCount(), equalTo(2*nbOfMessages));
329-
waitAtMost(timeout()).untilCall(to(metrics.getAcknowledgedMessages()).getCount(), equalTo(nbOfMessages));
327+
waitAtMost(timeout()).until(new ConsumedMessagesMetricsCallable(metrics), equalTo(2*nbOfMessages));
328+
waitAtMost(timeout()).until(new AcknowledgedMessagesMetricsCallable(metrics), equalTo(nbOfMessages));
330329

331330
// to remove the listeners
332331
for(int i = 0; i < nbChannels; i++) {
@@ -354,9 +353,9 @@ private void doMultiThreadedMetrics(ConnectionFactory connectionFactory) throws
354353
executorService.invokeAll(tasks);
355354

356355
assertThat(metrics.getPublishedMessages().getCount(), is(3*nbOfMessages));
357-
waitAtMost(timeout()).untilCall(to(metrics.getConsumedMessages()).getCount(), equalTo(3*nbOfMessages));
358-
waitAtMost(timeout()).untilCall(to(metrics.getAcknowledgedMessages()).getCount(), equalTo(nbOfMessages));
359-
waitAtMost(timeout()).untilCall(to(metrics.getRejectedMessages()).getCount(), equalTo(nbOfMessages));
356+
waitAtMost(timeout()).until(new ConsumedMessagesMetricsCallable(metrics), equalTo(3*nbOfMessages));
357+
waitAtMost(timeout()).until(new AcknowledgedMessagesMetricsCallable(metrics), equalTo(nbOfMessages));
358+
waitAtMost(timeout()).until(new RejectedMessagesMetricsCallable(metrics), equalTo(nbOfMessages));
360359
} finally {
361360
for (Connection connection : connections) {
362361
safeClose(connection);
@@ -390,7 +389,7 @@ private void errorInChannel(ConnectionFactory connectionFactory) throws IOExcept
390389

391390
channel.basicPublish("unlikelynameforanexchange", "", null, "msg".getBytes("UTF-8"));
392391

393-
waitAtMost(timeout()).untilCall(to(metrics.getChannels()).getCount(), is(0L));
392+
waitAtMost(timeout()).until(new ChannelsMetricsCallable(metrics), is(0L));
394393
assertThat(metrics.getConnections().getCount(), is(1L));
395394
} finally {
396395
safeClose(connection);
@@ -590,4 +589,87 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
590589
}
591590
}
592591

592+
static abstract class MetricsCallable implements Callable<Long> {
593+
594+
final StandardMetricsCollector metrics;
595+
596+
protected MetricsCallable(StandardMetricsCollector metrics) {
597+
this.metrics = metrics;
598+
}
599+
600+
601+
}
602+
603+
static class ConnectionsMetricsCallable extends MetricsCallable {
604+
605+
ConnectionsMetricsCallable(StandardMetricsCollector metrics) {
606+
super(metrics);
607+
}
608+
609+
@Override
610+
public Long call() throws Exception {
611+
return metrics.getConnections().getCount();
612+
}
613+
}
614+
615+
static class ChannelsMetricsCallable extends MetricsCallable {
616+
617+
ChannelsMetricsCallable(StandardMetricsCollector metrics) {
618+
super(metrics);
619+
}
620+
621+
@Override
622+
public Long call() throws Exception {
623+
return metrics.getChannels().getCount();
624+
}
625+
}
626+
627+
static class PublishedMessagesMetricsCallable extends MetricsCallable {
628+
629+
PublishedMessagesMetricsCallable(StandardMetricsCollector metrics) {
630+
super(metrics);
631+
}
632+
633+
@Override
634+
public Long call() throws Exception {
635+
return metrics.getPublishedMessages().getCount();
636+
}
637+
}
638+
639+
static class ConsumedMessagesMetricsCallable extends MetricsCallable {
640+
641+
ConsumedMessagesMetricsCallable(StandardMetricsCollector metrics) {
642+
super(metrics);
643+
}
644+
645+
@Override
646+
public Long call() throws Exception {
647+
return metrics.getConsumedMessages().getCount();
648+
}
649+
}
650+
651+
static class AcknowledgedMessagesMetricsCallable extends MetricsCallable {
652+
653+
AcknowledgedMessagesMetricsCallable(StandardMetricsCollector metrics) {
654+
super(metrics);
655+
}
656+
657+
@Override
658+
public Long call() throws Exception {
659+
return metrics.getAcknowledgedMessages().getCount();
660+
}
661+
}
662+
663+
static class RejectedMessagesMetricsCallable extends MetricsCallable {
664+
665+
RejectedMessagesMetricsCallable(StandardMetricsCollector metrics) {
666+
super(metrics);
667+
}
668+
669+
@Override
670+
public Long call() throws Exception {
671+
return metrics.getRejectedMessages().getCount();
672+
}
673+
}
674+
593675
}

0 commit comments

Comments
 (0)