diff --git a/projects/RabbitMQ.Client/Exceptions/PublishException.cs b/projects/RabbitMQ.Client/Exceptions/PublishException.cs index 42939ed23..b03366c06 100644 --- a/projects/RabbitMQ.Client/Exceptions/PublishException.cs +++ b/projects/RabbitMQ.Client/Exceptions/PublishException.cs @@ -42,7 +42,11 @@ public class PublishException : RabbitMQClientException private bool _isReturn = false; private ulong _publishSequenceNumber = ulong.MinValue; - public PublishException(ulong publishSequenceNumber, bool isReturn) : base() + public PublishException(ulong publishSequenceNumber, bool isReturn) : this(publishSequenceNumber, isReturn, "Message rejected by broker.") + { + } + + public PublishException(ulong publishSequenceNumber, bool isReturn, string message) : base(message) { if (publishSequenceNumber == ulong.MinValue) { @@ -63,4 +67,66 @@ public PublishException(ulong publishSequenceNumber, bool isReturn) : base() /// public ulong PublishSequenceNumber => _publishSequenceNumber; } + + /// + /// Class for exceptions related to publisher confirmations + /// or the mandatory flag, when basic.return is + /// sent from the broker. + /// + public class PublishReturnException : PublishException + { + private readonly string _exchange; + private readonly string _routingKey; + private readonly ushort _replyCode; + private readonly string _replyText; + + public PublishReturnException(ulong publishSequenceNumber, string message, + string? exchange = null, string? routingKey = null, + ushort? replyCode = null, string? replyText = null) + : base(publishSequenceNumber, true, message) + { + _exchange = exchange ?? string.Empty; + _routingKey = routingKey ?? string.Empty; + _replyCode = replyCode ?? 0; + _replyText = replyText ?? string.Empty; + } + + /// + /// Get the exchange associated with this basic.return + /// + public string Exchange => _exchange; + + /// + /// Get the routing key associated with this basic.return + /// + public string RoutingKey => _routingKey; + + /// + /// Get the reply code associated with this basic.return + /// + public ushort ReplyCode => _replyCode; + + /// + /// Get the reply text associated with this basic.return + /// + public string ReplyText => _replyText; + } + + internal static class PublishExceptionFactory + { + internal static PublishException Create(bool isReturn, + ulong deliveryTag, string? exchange = null, string? routingKey = null, + ushort? replyCode = null, string? replyText = null) + { + if (isReturn) + { + string message = $"{replyCode} {replyText} Exchange: {exchange} Routing Key: {routingKey}"; + return new PublishReturnException(deliveryTag, message, exchange, routingKey, replyCode, replyText); + } + else + { + return new PublishException(deliveryTag, isReturn); + } + } + } } diff --git a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs index d2ca45247..c694d2941 100644 --- a/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs +++ b/projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs @@ -200,7 +200,9 @@ private void HandleAck(ulong deliveryTag, bool multiple) } [MethodImpl(MethodImplOptions.AggressiveInlining)] - private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn) + private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn, + string? exchange = null, string? routingKey = null, + ushort? replyCode = null, string? replyText = null) { if (ShouldHandleAckOrNack(deliveryTag)) { @@ -210,7 +212,9 @@ private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn) { if (pair.Key <= deliveryTag) { - pair.Value.SetException(new PublishException(pair.Key, isReturn)); + PublishException ex = PublishExceptionFactory.Create(isReturn, pair.Key, + exchange, routingKey, replyCode, replyText); + pair.Value.SetException(ex); _confirmsTaskCompletionSources.Remove(pair.Key, out _); } } @@ -219,7 +223,9 @@ private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn) { if (_confirmsTaskCompletionSources.Remove(deliveryTag, out TaskCompletionSource? tcs)) { - tcs.SetException(new PublishException(deliveryTag, isReturn)); + PublishException ex = PublishExceptionFactory.Create(isReturn, deliveryTag, + exchange, routingKey, replyCode, replyText); + tcs.SetException(ex); } } } @@ -249,7 +255,9 @@ private void HandleReturn(BasicReturnEventArgs basicReturnEvent) } } - HandleNack(publishSequenceNumber, multiple: false, isReturn: true); + HandleNack(publishSequenceNumber, multiple: false, isReturn: true, + exchange: basicReturnEvent.Exchange, routingKey: basicReturnEvent.RoutingKey, + replyCode: basicReturnEvent.ReplyCode, replyText: basicReturnEvent.ReplyText); } } diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index e69de29bb..9a4615009 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -0,0 +1,7 @@ +RabbitMQ.Client.Exceptions.PublishException.PublishException(ulong publishSequenceNumber, bool isReturn, string! message) -> void +RabbitMQ.Client.Exceptions.PublishReturnException +RabbitMQ.Client.Exceptions.PublishReturnException.Exchange.get -> string! +RabbitMQ.Client.Exceptions.PublishReturnException.PublishReturnException(ulong publishSequenceNumber, string! message, string? exchange = null, string? routingKey = null, ushort? replyCode = null, string? replyText = null) -> void +RabbitMQ.Client.Exceptions.PublishReturnException.ReplyCode.get -> ushort +RabbitMQ.Client.Exceptions.PublishReturnException.ReplyText.get -> string! +RabbitMQ.Client.Exceptions.PublishReturnException.RoutingKey.get -> string! \ No newline at end of file diff --git a/projects/Test/Integration/TestBasicPublishAsync.cs b/projects/Test/Integration/TestBasicPublishAsync.cs index 25de62eb7..4ce2dfc2c 100644 --- a/projects/Test/Integration/TestBasicPublishAsync.cs +++ b/projects/Test/Integration/TestBasicPublishAsync.cs @@ -29,8 +29,10 @@ // Copyright (c) 2007-2025 Broadcom. All Rights Reserved. //--------------------------------------------------------------------------- +using System; using System.Threading.Tasks; using RabbitMQ.Client; +using RabbitMQ.Client.Exceptions; using Xunit; using Xunit.Abstractions; @@ -49,7 +51,6 @@ public async Task TestQueuePurgeAsync() var publishSyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true); var publishTask = Task.Run(async () => @@ -65,5 +66,23 @@ public async Task TestQueuePurgeAsync() Assert.True(await publishSyncSource.Task); Assert.Equal((uint)messageCount, await _channel.QueuePurgeAsync(q)); } + + [Fact] + public async Task TestBasicReturnAsync() + { + try + { + await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: Guid.NewGuid().ToString(), + mandatory: true, body: GetRandomBody()); + } + catch (PublishReturnException prex) + { + Assert.True(prex.IsReturn); + Assert.NotNull(prex.Exchange); + Assert.NotNull(prex.RoutingKey); + Assert.NotEqual(0, prex.ReplyCode); + Assert.NotNull(prex.ReplyText); + } + } } } diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs index b033839a1..36b66346b 100644 --- a/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs +++ b/projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs @@ -145,8 +145,13 @@ await TestConcurrentOperationsAsync(async () => } catch (PublishException ex) { - if (ex.IsReturn) + if (ex is PublishReturnException prex) { + Assert.True(prex.IsReturn); + Assert.NotNull(prex.Exchange); + Assert.NotNull(prex.RoutingKey); + Assert.NotEqual(0, prex.ReplyCode); + Assert.NotNull(prex.ReplyText); Interlocked.Increment(ref totalReturnCount); } else