MQTT大消息失败原因排查的过程

mqtt"/>Background小组内使用 MQTT 协议搭建了一个聊天服务器,前天在测大消息(超过5000汉字)时,连接直接变得不可用,后续发送的消息全部都收不到回复。服务器环境: Netty :4.1.32.Fin

本文将详细讲解MQTT新闻故障原因的排除过程。文章内容质量很高,我就分享给大家作为参考。希望大家看完这篇文章后对相关知识有一定的了解。

使用MQTT协议在

Background

组中设置了聊天服务器。前天测大消息(5000多个汉字)时,直接连接不上了,后续所有消息都没有回复。

服务器环境:

Netty :4.1.32.Final

使用Netty包中包含的MqttDecoder。

客户端:安卓。

排查过程

由于所有消息都打印在日志中,我们首先搜索了服务器日志,发现日志中没有发送任何消息。

是不是因为太长的时候客户端没有发消息?用tcpdump抓包,发现客户端正常发送,所有的包服务器都有ack,但是后续的服务器没有发回响应,猜测是服务器处理大消息失败。

Tcump使用-nn打印出ip和端口,-X打印网络包的内容,或者-w选项保存到文件中,然后使用tcpdump或wireshark进行分析。

然后我查了一下MQTT支持的最大有效载荷,MQTT的官方文件说是256M,肯定不会超过。

在服务器上抓取了包,收到了确认消息,但没有返回确认消息。

启动了联机调试,发现收到了一条发布类型的消息,但消息的类不是MqttPublishMessage,有效负载中没有数据,但消息中有一条错误消息太大消息: 56234字节。

谷歌方面,有网友遇到了同样的问题,虽然MQTT是用C语言在解决这个问题。

查看MqttDecoder,发现解码器的有效载荷限制最长(以下为部分代码),启动代码中调用默认构造函数,因此默认最大数据为8092字节。

public finalclashmqttdecoderextendsdreplayingdecoderdecoderstate {

privatedstaticfinaintdefault _ MAX _ BYTES _ IN _ MESSAGE=8092;

publicMqttDecoder(){ 0

这(DEFAULT _ MAX _ BYTES _ IN _ MEssage);

}

publicMqttDecoder(intmaxbytesimessage){ 0

super(DecoderState。READ _ FIXED _ HEADER);

this . maxbytes message=maxbytes message;

}

@覆盖

protected void decode(ChannelHandlerContextctx,ByteBufbuffer,ListObjectout)throwsException{

开关(状态()){ 0

caseREAD_FIXED_HEADER:tr

y {
                mqttFixedHeader = decodeFixedHeader(buffer);
                bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
                checkpoint(DecoderState.READ_VARIABLE_HEADER);
                // fall through
            } catch (Exception cause) {
                out.add(invalidMessage(cause));
                return;
            }
            case READ_VARIABLE_HEADER:  try {
                final Result<?> decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader);
                variableHeader = decodedVariableHeader.value;
                if (bytesRemainingInVariablePart > maxBytesInMessage) {
                    throw new DecoderException("too large message: " + bytesRemainingInVariablePart + " bytes");
                }
                bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
                checkpoint(DecoderState.READ_PAYLOAD);
                // fall through
            } catch (Exception cause) {
                out.add(invalidMessage(cause));
                return;
            }
            case READ_PAYLOAD: try {
                final Result<?> decodedPayload =
                        decodePayload(
                                buffer,
                                mqttFixedHeader.messageType(),
                                bytesRemainingInVariablePart,
                                variableHeader);
                bytesRemainingInVariablePart -= decodedPayload.numberOfBytesConsumed;
                if (bytesRemainingInVariablePart != 0) {
                    throw new DecoderException(
                            "non-zero remaining payload bytes: " +
                                    bytesRemainingInVariablePart + " (" + mqttFixedHeader.messageType() + ')');
                }
                checkpoint(DecoderState.READ_FIXED_HEADER);
                MqttMessage message = MqttMessageFactory.newMessage(
                        mqttFixedHeader, variableHeader, decodedPayload.value);
                mqttFixedHeader = null;
                variableHeader = null;
                out.add(message);
                break;
            } catch (Exception cause) {
                out.add(invalidMessage(cause));
                return;
            }
            case BAD_MESSAGE:
                // Keep discarding until disconnection.
                buffer.skipBytes(actualReadableBytes());
                break;
            default:
                // Shouldn't reach here.
                throw new Error();
        }
    }
    private MqttMessage invalidMessage(Throwable cause) {
      checkpoint(DecoderState.BAD_MESSAGE);
      return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause);
    }
}

  1. 长消息的原因找到了,还剩一个问题,为什么后续的消息包括 ping 消息就再也发不出去了?经过查看代码,这与 MqttDecoder 的父类 ReplayingDecoder 有关系,查看源码有详尽的类说明, 在读取可变长度头部时,如果payload 超过了最大限制,那么直接抛出异常。摘出代码如下:

case READ_VARIABLE_HEADER:  try {
    final Result<?> decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader);
    variableHeader = decodedVariableHeader.value;
    if (bytesRemainingInVariablePart > maxBytesInMessage) {
        throw new DecoderException("too large message: " + bytesRemainingInVariablePart + " bytes");
    }
    bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
    checkpoint(DecoderState.READ_PAYLOAD);
    // fall through
} catch (Exception cause) {
    out.add(invalidMessage(cause));
    return;
}

在异常处理中,调用了 invalidMessage 方法,这个方法将 状态设为 DecoderState.BAD_MESSAGE, 在这个状态下,所有的字节都直接被丢弃。

case BAD_MESSAGE:
    // Keep discarding until disconnection.
    buffer.skipBytes(actualReadableBytes());
    break;

也就是说此后的消息都不会进入到业务处理逻辑,这条长连接废掉了。

解决方案

  1. 客户端对长消息做字数限制和拆分,保证单条消息不超过最大限制

  2. 服务端增大最大载荷长度,MqttDecoder 提供了构造函数(不建议使用,这样会增大服务器处理时间和内存负担)

关于MQTT大消息失败原因排查的过程就分享到这里了,希望

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/36010.html

(0)