Skip to content

Commit e55a1f9

Browse files
Merge pull request #251 from rabbitmq/rabbitmq-java-client-246
Use lambdas for *Listener objects
2 parents aaa6f90 + a9c5c47 commit e55a1f9

18 files changed

+413
-6
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
package com.rabbitmq.client;
17+
18+
import java.io.IOException;
19+
20+
/**
21+
* Implement this interface in order to be notified of connection block events.
22+
* Prefer it over {@link BlockedListener} for a lambda-oriented syntax.
23+
* @see BlockedListener
24+
* @see UnblockedCallback
25+
*/
26+
@FunctionalInterface
27+
public interface BlockedCallback {
28+
29+
void handle(String reason) throws IOException;
30+
31+
}

src/main/java/com/rabbitmq/client/BlockedListener.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
/**
2222
* Implement this interface in order to be notified of connection block and
2323
* unblock events.
24+
* For a lambda-oriented syntax, use {@link BlockedCallback} and
25+
* {@link UnblockedCallback}.
2426
*/
2527
public interface BlockedListener {
2628
void handleBlocked(String reason) throws IOException;

src/main/java/com/rabbitmq/client/Channel.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,16 @@ public interface Channel extends ShutdownNotifier {
107107
*/
108108
void addReturnListener(ReturnListener listener);
109109

110+
/**
111+
* Add a lambda-based {@link ReturnListener}.
112+
* @see ReturnListener
113+
* @see ReturnCallback
114+
* @see Return
115+
* @param returnCallback the callback when the message is returned
116+
* @return the listener that wraps the callback
117+
*/
118+
ReturnListener addReturnListener(ReturnCallback returnCallback);
119+
110120
/**
111121
* Remove a {@link ReturnListener}.
112122
* @param listener the listener to remove
@@ -126,6 +136,16 @@ public interface Channel extends ShutdownNotifier {
126136
*/
127137
void addConfirmListener(ConfirmListener listener);
128138

139+
/**
140+
* Add a lambda-based {@link ConfirmListener}.
141+
* @see ConfirmListener
142+
* @see ConfirmCallback
143+
* @param ackCallback callback on ack
144+
* @param nackCallback call on nack (negative ack)
145+
* @return the listener that wraps the callbacks
146+
*/
147+
ConfirmListener addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback);
148+
129149
/**
130150
* Remove a {@link ConfirmListener}.
131151
* @param listener the listener to remove
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
17+
package com.rabbitmq.client;
18+
19+
import java.io.IOException;
20+
21+
/**
22+
* Implement this interface in order to be notified of Confirm events.
23+
* Acks represent messages handled successfully; Nacks represent
24+
* messages lost by the broker. Note, the lost messages could still
25+
* have been delivered to consumers, but the broker cannot guarantee
26+
* this.
27+
* Prefer this interface over {@link ConfirmListener} for
28+
* a lambda-oriented syntax.
29+
* @see ConfirmListener
30+
*/
31+
@FunctionalInterface
32+
public interface ConfirmCallback {
33+
34+
void handle(long deliveryTag, boolean multiple) throws IOException;
35+
36+
}

src/main/java/com/rabbitmq/client/ConfirmListener.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
* messages lost by the broker. Note, the lost messages could still
2525
* have been delivered to consumers, but the broker cannot guarantee
2626
* this.
27+
* For a lambda-oriented syntax, use {@link ConfirmCallback}.
2728
*/
2829
public interface ConfirmListener {
2930
void handleAck(long deliveryTag, boolean multiple)

src/main/java/com/rabbitmq/client/Connection.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,17 @@ public interface Connection extends ShutdownNotifier, Closeable { // rename to A
239239
*/
240240
void addBlockedListener(BlockedListener listener);
241241

242+
/**
243+
* Add a lambda-based {@link BlockedListener}.
244+
* @see BlockedListener
245+
* @see BlockedCallback
246+
* @see UnblockedCallback
247+
* @param blockedCallback the callback when the connection is blocked
248+
* @param unblockedCallback the callback when the connection is unblocked
249+
* @return the listener that wraps the callback
250+
*/
251+
BlockedListener addBlockedListener(BlockedCallback blockedCallback, UnblockedCallback unblockedCallback);
252+
242253
/**
243254
* Remove a {@link BlockedListener}.
244255
* @param listener the listener to remove
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
package com.rabbitmq.client;
17+
18+
/**
19+
*
20+
*/
21+
public class Return {
22+
23+
private final int replyCode;
24+
private final String replyText;
25+
private final String exchange;
26+
private final String routingKey;
27+
private final AMQP.BasicProperties properties;
28+
private final byte[] body;
29+
30+
public Return(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) {
31+
this.replyCode = replyCode;
32+
this.replyText = replyText;
33+
this.exchange = exchange;
34+
this.routingKey = routingKey;
35+
this.properties = properties;
36+
this.body = body;
37+
}
38+
39+
public int getReplyCode() {
40+
return replyCode;
41+
}
42+
43+
public String getReplyText() {
44+
return replyText;
45+
}
46+
47+
public String getExchange() {
48+
return exchange;
49+
}
50+
51+
public String getRoutingKey() {
52+
return routingKey;
53+
}
54+
55+
public AMQP.BasicProperties getProperties() {
56+
return properties;
57+
}
58+
59+
public byte[] getBody() {
60+
return body;
61+
}
62+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
package com.rabbitmq.client;
17+
18+
/**
19+
* Implement this interface in order to be notified of failed
20+
* deliveries when basicPublish is called with "mandatory" or
21+
* "immediate" flags set.
22+
* Prefer this interface over {@link ReturnListener} for
23+
* a simpler, lambda-oriented syntax.
24+
* @see Channel#basicPublish
25+
* @see ReturnListener
26+
* @see Return
27+
*/
28+
@FunctionalInterface
29+
public interface ReturnCallback {
30+
31+
void handle(Return returnMessage);
32+
33+
}

src/main/java/com/rabbitmq/client/ReturnListener.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
* Implement this interface in order to be notified of failed
2323
* deliveries when basicPublish is called with "mandatory" or
2424
* "immediate" flags set.
25+
* For a lambda-oriented syntax, use {@link ReturnCallback}.
2526
* @see Channel#basicPublish
2627
*/
2728
public interface ReturnListener {

src/main/java/com/rabbitmq/client/ShutdownListener.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
* @see ShutdownNotifier
3030
* @see ShutdownSignalException
3131
*/
32+
@FunctionalInterface
3233
public interface ShutdownListener extends EventListener {
33-
public void shutdownCompleted(ShutdownSignalException cause);
34+
void shutdownCompleted(ShutdownSignalException cause);
3435
}

src/main/java/com/rabbitmq/client/ShutdownNotifier.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,26 +30,26 @@ public interface ShutdownNotifier {
3030
*
3131
* @param listener {@link ShutdownListener} to the component
3232
*/
33-
public void addShutdownListener(ShutdownListener listener);
33+
void addShutdownListener(ShutdownListener listener);
3434

3535
/**
3636
* Remove shutdown listener for the component.
3737
*
3838
* @param listener {@link ShutdownListener} to be removed
3939
*/
40-
public void removeShutdownListener(ShutdownListener listener);
40+
void removeShutdownListener(ShutdownListener listener);
4141

4242
/**
4343
* Get the shutdown reason object
4444
* @return ShutdownSignalException if component is closed, null otherwise
4545
*/
46-
public ShutdownSignalException getCloseReason();
46+
ShutdownSignalException getCloseReason();
4747

4848
/**
4949
* Protected API - notify the listeners attached to the component
5050
* @see com.rabbitmq.client.ShutdownListener
5151
*/
52-
public void notifyListeners();
52+
void notifyListeners();
5353

5454
/**
5555
* Determine whether the component is currently open.
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
16+
package com.rabbitmq.client;
17+
18+
import java.io.IOException;
19+
20+
/**
21+
* Implement this interface in order to be notified of connection unblock events.
22+
* Prefer it over {@link BlockedListener} for a lambda-oriented syntax.
23+
* @see BlockedListener
24+
* @see BlockedCallback
25+
*/
26+
@FunctionalInterface
27+
public interface UnblockedCallback {
28+
29+
void handle() throws IOException;
30+
31+
}

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1031,6 +1031,24 @@ public void addBlockedListener(BlockedListener listener) {
10311031
blockedListeners.add(listener);
10321032
}
10331033

1034+
@Override
1035+
public BlockedListener addBlockedListener(BlockedCallback blockedCallback, UnblockedCallback unblockedCallback) {
1036+
BlockedListener blockedListener = new BlockedListener() {
1037+
1038+
@Override
1039+
public void handleBlocked(String reason) throws IOException {
1040+
blockedCallback.handle(reason);
1041+
}
1042+
1043+
@Override
1044+
public void handleUnblocked() throws IOException {
1045+
unblockedCallback.handle();
1046+
}
1047+
};
1048+
this.addBlockedListener(blockedListener);
1049+
return blockedListener;
1050+
}
1051+
10341052
@Override
10351053
public boolean removeBlockedListener(BlockedListener listener) {
10361054
return blockedListeners.remove(listener);

src/main/java/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.CountDownLatch;
2727
import java.util.concurrent.TimeoutException;
2828

29+
import com.rabbitmq.client.ConfirmCallback;
2930
import com.rabbitmq.client.*;
3031
import com.rabbitmq.client.AMQP.BasicProperties;
3132
import com.rabbitmq.client.Method;
@@ -135,6 +136,15 @@ public void addReturnListener(ReturnListener listener) {
135136
returnListeners.add(listener);
136137
}
137138

139+
@Override
140+
public ReturnListener addReturnListener(ReturnCallback returnCallback) {
141+
ReturnListener returnListener = (replyCode, replyText, exchange, routingKey, properties, body) -> returnCallback.handle(new Return(
142+
replyCode, replyText, exchange, routingKey, properties, body
143+
));
144+
this.addReturnListener(returnListener);
145+
return returnListener;
146+
}
147+
138148
@Override
139149
public boolean removeReturnListener(ReturnListener listener) {
140150
return returnListeners.remove(listener);
@@ -150,6 +160,24 @@ public void addConfirmListener(ConfirmListener listener) {
150160
confirmListeners.add(listener);
151161
}
152162

163+
@Override
164+
public ConfirmListener addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback) {
165+
ConfirmListener confirmListener = new ConfirmListener() {
166+
167+
@Override
168+
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
169+
ackCallback.handle(deliveryTag, multiple);
170+
}
171+
172+
@Override
173+
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
174+
nackCallback.handle(deliveryTag, multiple);
175+
}
176+
};
177+
this.addConfirmListener(confirmListener);
178+
return confirmListener;
179+
}
180+
153181
@Override
154182
public boolean removeConfirmListener(ConfirmListener listener) {
155183
return confirmListeners.remove(listener);

0 commit comments

Comments
 (0)