OpenFire源码学习之二十三:关于消息的优化处理

消息处理

之前有说过,openfire的消息处理策略本人并不是很喜欢。先看下openfire上脱机消息策略。


个人认为消息关于会话的消息,用户的存储量应该无限大。服务器不应该被消息吃撑了。所谓聊天通讯,这一关很重要。

Openfire的消息是什么流程呢。

1、当用户登陆连接的时候。握手、认证、绑定资源、获取花名册、获取离线消息。

2、服务端会查找关系型数据库。经本人测试离线消息在数据库表中达到100万条以上的时候,查询速度非常慢,甚至会导致openfire奔溃。

.....

那么openfire在消息这块会有哪些不足之处呢。本人认为有一下几点:

1)用户登陆的时候需要验证用户的可效性。又一次需要查询数据库,而本人认为这一步没

   必要。可以删掉

2)用户每次登陆都需要重新获取自己的好友花名册。这样导致数据访问次数过多,服务端

push消息的量也增多。导致登陆流程很慢

3)用户发送的消息,没有回执。就是说A发送消息给B。而B又一致不回复。所以对于客

   户端A来讲压根不知道消息是否发送成。然而S(服务端)接收到A的消息转发给B的时

   候,B也不做回执。所以S也不知道消息到底是否成功到达。

4)用户的离线消息表访问峰值时,系统可能会奔溃。

针对上面的问题,下面一一做解答。

用户校验

问题1:比较简单。在DefaultLockOutProvider中有一个getDisabledStatus方法。该方法判断用户的可效行。禁用这个方法则可。

源码如下:

    public LockOutFlag getDisabledStatus(String username) {
        if (username == null) {
            throw new UnsupportedOperationException("Null username not allowed!");
        }
        if (provider.shouldNotBeCached()) {
            return provider.getDisabledStatus(username);
        }
        LockOutFlag flag = lockOutCache.get(username);
        // If ID wan't found in cache, load it up and put it there.
        if (flag == null) {
            synchronized (username.intern()) {
                flag = lockOutCache.get(username);
                // If group wan't found in cache, load it up and put it there.
                if (flag == null) {
                    flag = provider.getDisabledStatus(username);
                    lockOutCache.put(username, flag);
                }
            }
        }
        return flag;
    }

用户数据同步

问题2:将用户花名册、group、MUC等相关信息预知Redis中。设置用户的数据版本标志。用户每次登陆后只需要跟服务端记录的用户版本进行匹配。版本一致的时候则不需要每次都要重新同步了。本人针对用户聊天室重新做了XMPP的拓展。

A.用户发送请求获取自己所拥有的房间:

<iq id="Ho4CO-2" type="get">
    <query xmlns="jabber:iq:room"></query>
</iq>

B.服务端返回消息:

<iq type="result" id="GYIEb-8" to="1aaa@8ntmorv1ep4wgcy/Smack#1aaa1387704207688">
  <query xmlns="jabber:iq:room">
    <item>
      <room>
        <serviceid>1</serviceid>
        <name>eds</name>
        <roomid>16632</roomid>
        <naturalName>dd</naturalName>
        <description>dd</description>
        <subject>admin</subject>
        <affiliation>member</affiliation>
      </room>
      <room>
       .......
      </room>
    </item>
  </query>
</iq>	

C.用户获取房间内成员:

<iq id="hF4p7-9" to="srcs_room0@conference.8ntmorv1ep4wgcy" type="get">
     <query xmlns="http://jabber.org/protocol/muc#members"></query>
</iq>

D.服务端返回消息

<iq type="result" id="S4Q6V-8" from="srcs_room0@conference.8ntmorv1ep4wgcy" to="user1@8ntmorv1ep4wgcy/Smack#user11387159214678">
  <query xmlns="http://jabber.org/protocol/muc#members">
    <item affiliation="owner">
      <user jid="56@8ntmorv1ep4wgcy"/>
      <user jid="58@8ntmorv1ep4wgcy"/>
      <user jid="admin@8ntmorv1ep4wgcy"/>
    </item>
    <item affiliation="admin">
      <user jid="user1@8ntmorv1ep4wgcy"/>
      <user jid="user62@8ntmorv1ep4wgcy"/>
      <user jid="user63@8ntmorv1ep4wgcy"/>
    </item>
    <item affiliation="members">
      <user jid="user32@8ntmorv1ep4wgcy"/>
      .......
    </item>
  </query>
</iq>

下面代码主要描述服务端对用户聊天室请求的处理

public class IQMucMembersHandler extends IQHandler {

	private IQHandlerInfo info;
	private XMPPServer localServer;
	
	public IQMucMembersHandler() {
		super("XMPP MucMembers Handler");
        info = new IQHandlerInfo("query", "http://jabber.org/protocol/muc#members");
	}

	@Override
	public IQ handleIQ(IQ packet)  {
		IQ reply = IQ.createResultIQ(packet);
		reply.setType(IQ.Type.result);
		reply.setID(packet.getID());
		reply.setTo(packet.getFrom());
		reply.setFrom(packet.getTo());
		
		if (IQ.Type.get.equals(packet.getType())) {
			JID roomJID = packet.getTo();
			MUCRoom room = localServer.getMultiUserChatManager().
							getMultiUserChatService(roomJID).getChatRoom(packet.getTo().getNode());
			Element item = reply.setChildElement("query","http://jabber.org/protocol/muc#members");
			if (room != null) {
				Element owner = item.addElement("item");
				owner.addAttribute("affiliation", "owner");
				Collection<JID> owners  = room.getOwners();
				if (!owners.isEmpty()) {
					for (JID userJID : owners) {
						if ("admin".equals(userJID.getNode())){
							continue;
						}
						Element jid = owner.addElement("user");
						jid.addAttribute("jid", userJID.toBareJID());
					}
				}
				
				Element admin = item.addElement("item");
				admin.addAttribute("affiliation", "admin");
				Collection<JID> admins  = room.getAdmins();
				if (!admins.isEmpty() ) {
					for (JID userJID : admins) {
						Element jid = admin.addElement("user");
						jid.addAttribute("jid", userJID.toBareJID());
					}
				}
				
				Element member = item.addElement("item");
				member.addAttribute("affiliation", "members");
				Collection<JID> members  = room.getMembers();
				if (!members.isEmpty()) {
					for (JID userJID : members) {
						Element jid = member.addElement("user");
						jid.addAttribute("jid", userJID.toBareJID());
					}
				}
			}
			
		}
		return reply;
	}

	@Override
	public void initialize(XMPPServer server) {
        super.initialize(server);
        localServer = server;
     }

    @Override
	public IQHandlerInfo getInfo() {
        return info;
    }
}
这里回答了提出来的2点问题。第3、4个问下会在下面章节中回答
原文地址:https://www.cnblogs.com/huwf/p/4273345.html