C#使用rabbitmq在接收消息事件处理中报错:
Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=505, text='UNEXPECTED_frame - expected content header for class 60, got non content header frame instead', classId=60, methodId=40
解决办法是将接收事件代码里面末尾加个线程休眠“System.Threading.Thread.Sleep(1);”
////// 监听消息队里的消息 /// /// 外围业务方法 public static void Receive(Funcfunc) { try { //如果未连接则重连连接队列 AgainInitMessageQueue(); //创建消费者对象 var consumer = new EventingBasicConsumer(channel); //监听消费事件,如果执行shutdown了,此事件会执行失败 consumer.Received += (model, ea) => { //接收到的消息 var message = Encoding.UTF8.GetString(ea.Body.Span); //委托外围方法处理业务 var result = func(message); if (result) { //业务处理成功后单条通知生产者 channel.BasicAck(ea.DeliveryTag, true);//手动应答MQ已经成功接收,批量应答 LogHelper.Info(typeof(MessageQueueFactory), $"MessageQueueFactory.Receive()-消费成功,message={message}"); } else { //业务处理失败需要重回队列等待消费 //channel.BasicReject(ea.DeliveryTag, true);//拒绝接收单条消息,是否重回队列 LogHelper.Error(typeof(MessageQueueFactory), $"MessageQueueFactory.Receive()-业务处理失败,进入缓存等待重新入列,message={message}", true); //###为了防止队里一直处于等到出列,导致后面数据无法消费,此处自动消费成功,进入缓存等待重新入列消费#### //添加处理失败的数据进缓存中 //添加消息重试次数缓存 超过上限不再进行重试 var mqbaseModel = SerializerHelper.DeserializerJson (message); if (mqbaseModel != null) { int retryTimes = ConvertBasic.ToInt32(RedisClient.GetValue (CacheKey.SendMsgFailRetryKey + mqbaseModel.MQCode)); RedisClient.SetValue(CacheKey.SendMsgFailRetryKey + mqbaseModel.MQCode, retryTimes + 1); } var ret = RedisClient.SAdd(CacheKey.SendMsgFailListKey, message); if (!ret) { LogHelper.Error(typeof(MessageQueueFactory), $"MessageQueueFactory.Receive()-添加处理失败的数据进缓存中失败,message={message}", true); } channel.BasicAck(ea.DeliveryTag, true);//手动应答MQ已经成功接收,批量应答 } Thread.Sleep(1); //此行代码必须,不然写入队列消息会报错 //错误消息:Already closed: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=505, text='UNEXPECTED_frame - expected content header for class 60, got non content header frame instead', classId=60, methodId=40 }; //监听shutdown事件 consumer.Shutdown += (model, e) => { //记录日志 LogHelper.Error(typeof(MessageQueueFactory), $"MessageQueueFactory.Receive()-shutdown被执行,队列监听失败e={SerializerHelper.SerializerJson(e)}", true); }; //消费者开启监听,手动应答 channel.BasicQos(0, 1, false);//逐条消费 channel.BasicConsume(queue: MQNameConfig.CommentQueue, autoAck: false, consumer: consumer); } catch (Exception ex) { LogHelper.Error(typeof(MessageQueueFactory), $"MessageQueueFactory.Receive()-监听消息队里的消息出错,{ex.Message}", ex, true); } }



