Csharp/C#教程:RabbitMQ C#驱动程序停止接收消息分享


RabbitMQ C#驱动程序停止接收消息

您是否有任何指示如何确定订阅问题何时发生,以便重新连接?

我的服务使用RabbitMQ.Client.MessagePatterns.Subscription进行订阅。 一段时间后,我的客户端默默地停止接收消息。 我怀疑网络问题,因为我的VPN连接不是最可靠的。

我已经通过文档阅读了一段时间,寻找一个关键,以找出这个订阅何时可能由于网络问题而没有太多运气而被打破。 我已经尝试检查连接和通道是否仍然打开,但似乎总是报告它仍处于打开状态。

它确实处理的消息工作得很好,并被确认回队列,因此我认为这不是“ack”的问题。

我确信我一定只是错过了一些简单的东西,但我还没有找到它。

public void Run(string brokerUri, Action handler) { log.Debug("Connecting to broker: {0}".Fill(brokerUri)); ConnectionFactory factory = new ConnectionFactory { Uri = brokerUri }; using (IConnection connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { channel.QueueDeclare(queueName, true, false, false, null); using (Subscription subscription = new Subscription(channel, queueName, false)) { while (!Cancelled) { BasicDeliverEventArgs args; if (!channel.IsOpen) { log.Error("The channel is no longer open, but we are still trying to process messages."); throw new InvalidOperationException("Channel is closed."); } else if (!connection.IsOpen) { log.Error("The connection is no longer open, but we are still trying to process message."); throw new InvalidOperationException("Connection is closed."); } bool gotMessage = subscription.Next(250, out args); if (gotMessage) { log.Debug("Received message"); try { handler(args.Body); } catch (Exception e) { log.Debug("Exception caught while processing message. Will be bubbled up.", e); throw; } log.Debug("Acknowledging message completion"); subscription.Ack(args); } } } } } } 

更新:

我通过在虚拟机中运行服务器来模拟网络故障,并且当我断开连接足够长的时间时,我确实得到exception(RabbitMQ.Client.Exceptions.OperationInterruptedException:AMQP操作被中断),所以它可能不是网络问题。 现在我不知道它会是什么但是在运行几个小时后就失败了。

编辑:由于我已经开始投票了,我应该指出.NET RabbitMQ客户端现在内置了这个function: https : //www.rabbitmq.com/dotnet-api-guide.html#connection-recovery

理想情况下,您应该能够使用它并避免手动实现重新连接逻辑。


我最近不得不实施几乎相同的事情。 据我所知,RabbitMQ上的大多数可用信息都假定您的网络非常可靠,或者您在与任何发送或接收消息的客户端相同的机器上运行RabbitMQ代理,从而允许Rabbit处理任何连接问题。

设置Rabbit客户端以防止断开连接并不是那么难,但是您需要处理一些特性。

你需要做的第一件事就是打开心跳:

 ConnectionFactory factory = new ConnectionFactory() { Uri = brokerUri, RequestedHeartbeat = 30, }; 

将“RequestedHeartbeat”设置为30将使客户端每30秒检查一次连接是否仍然存在。 如果没有打开,消息订阅者将愉快地坐在那里等待另一条消息进来,而不知道其连接已变坏。

打开心跳也会使服务器检查连接是否仍然正常,这可能非常重要。 如果在订户接收到消息之后但在确认消息之前连接变坏,则服务器只是假定客户端花费很长时间,并且消息在死连接上“卡住”直到它被关闭。 打开心跳后,服务器将识别连接变坏并关闭它,将消息放回队列中,以便其他用户可以处理它。 如果没有心跳,我必须手动进入并关闭Rabbit管理UI中的连接,以便将卡住的消息传递给订阅者。

其次,您需要处理OperationInterruptedException 。 正如您所注意到的,这通常是Rabbit客户端在注意到连接被中断时将抛出的exception。 如果在连接中断时调用IModel.QueueDeclare() ,则这是您将获得的exception。 处理此例外处理您的订阅,渠道和连接并创建新的例外。

最后,您必须处理消费者在尝试使用已关闭连接的消息时所执行的操作。 不幸的是,从Rabbit客户端队列中消费消息的每种不同方式似乎都有不同的反应。 如果在已关闭的连接上调用QueueingBasicConsumer.Queue.DequeueQueueingBasicConsumer将抛出EndOfStreamExceptionEventingBasicConsumer什么都不做,因为它只是在等待消息。 从我尝试它可以看出,您正在使用的Subscription类似乎从对Subscription.Next的调用返回true,但args值为null。 再一次,通过处理您的连接,通道和订阅并重新创建它们来处理这个问题。

当连接因心跳开启而失败时,connection.IsOpen的值将更新为False,因此您可以根据需要进行检查。 但是,由于心跳在单独的线程上运行,因此您仍需要处理检查时连接打开的情况,但在调用subscription.Next()之前关闭。

最后要注意的是IConnection.Dispose() 。 如果在关闭连接后调用dispose,则此调用将抛出EndOfStreamException 。 这对我来说似乎是一个错误,我不喜欢不在IDisposable对象上调用dispose,所以我调用它并吞下exception。

将这些全部放在一个快速而肮脏的例子中:

上述就是C#学习教程:RabbitMQ C#驱动程序停止接收消息分享的全部内容,如果对大家有所用处且需要了解更多关于C#学习教程,希望大家多多关注—计算机技术网(www.ctvol.com)!

 public bool Cancelled { get; set; } IConnection _connection = null; IModel _channel = null; Subscription _subscription = null; public void Run(string brokerUri, string queueName, Action handler) { ConnectionFactory factory = new ConnectionFactory() { Uri = brokerUri, RequestedHeartbeat = 30, }; while (!Cancelled) { try { if(_subscription == null) { try { _connection = factory.CreateConnection(); } catch(BrokerUnreachableException) { //You probably want to log the error and cancel after N tries, //otherwise start the loop over to try to connect again after a second or so. continue; } _channel = _connection.CreateModel(); _channel.QueueDeclare(queueName, true, false, false, null); _subscription = new Subscription(_channel, queueName, false); } BasicDeliverEventArgs args; bool gotMessage = _subscription.Next(250, out args); if (gotMessage) { if(args == null) { //This means the connection is closed. DisposeAllConnectionObjects(); continue; } handler(args.Body); _subscription.Ack(args); } } catch(OperationInterruptedException ex) { DisposeAllConnectionObjects(); } } DisposeAllConnectionObjects(); } private void DisposeAllConnectionObjects() { if(_subscription != null) { //IDisposable is implemented explicitly for some reason. ((IDisposable)_subscription).Dispose(); _subscription = null; } if(_channel != null) { _channel.Dispose(); _channel = null; } if(_connection != null) { try { _connection.Dispose(); } catch(EndOfStreamException) { } _connection = null; } } 

本文来自网络收集,不代表计算机技术网立场,如涉及侵权请联系管理员删除。

ctvol管理联系方式QQ:251552304

本文章地址:https://www.ctvol.com/cdevelopment/1010291.html

(0)
上一篇 2021年12月30日
下一篇 2021年12月30日

精彩推荐